Skip to content

Commit

Permalink
Fix JULIA_EXCLUSIVE setting affinity on non-worker threads (#57136)
Browse files Browse the repository at this point in the history
With JULIA_EXCLUSIVE=1, we would try to give both worker and interactive
threads an exclusive CPU, causing --threads=auto to produce a "Too many
threads requested" error.

Fixes #50702.
  • Loading branch information
xal-0 authored Jan 25, 2025
1 parent 899d2f5 commit 246e408
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 22 deletions.
7 changes: 5 additions & 2 deletions doc/src/manual/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,11 @@ during image compilation. Defaults to 0.
### [`JULIA_EXCLUSIVE`](@id JULIA_EXCLUSIVE)

If set to anything besides `0`, then Julia's thread policy is consistent with
running on a dedicated machine: the master thread is on proc 0, and threads are
affinitized. Otherwise, Julia lets the operating system handle thread policy.
running on a dedicated machine: each thread in the default threadpool is
affinitized. [Interactive threads](@ref man-threadpools) remain under the
control of the operating system scheduler.

Otherwise, Julia lets the operating system handle thread policy.

## Garbage Collection

Expand Down
4 changes: 2 additions & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -907,8 +907,8 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_
jl_n_markthreads = 0;
jl_n_sweepthreads = 0;
jl_n_gcthreads = 0;
jl_n_threads_per_pool[0] = 0; // Interactive threadpool
jl_n_threads_per_pool[1] = 1; // Default threadpool
jl_n_threads_per_pool[JL_THREADPOOL_ID_INTERACTIVE] = 0;
jl_n_threads_per_pool[JL_THREADPOOL_ID_DEFAULT] = 1;
} else {
post_image_load_hooks();
}
Expand Down
3 changes: 3 additions & 0 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,9 @@ extern JL_DLLIMPORT _Atomic(int) jl_n_threads;
extern JL_DLLIMPORT int jl_n_gcthreads;
extern int jl_n_markthreads;
extern int jl_n_sweepthreads;

#define JL_THREADPOOL_ID_INTERACTIVE 0
#define JL_THREADPOOL_ID_DEFAULT 1
extern JL_DLLIMPORT int *jl_n_threads_per_pool;

// environment entries
Expand Down
42 changes: 26 additions & 16 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -778,9 +778,9 @@ void jl_init_threading(void)
}

jl_all_tls_states_size = nthreads + nthreadsi + ngcthreads;
jl_n_threads_per_pool = (int*)malloc_s(2 * sizeof(int));
jl_n_threads_per_pool[0] = nthreadsi;
jl_n_threads_per_pool[1] = nthreads;
jl_n_threads_per_pool = (int*)calloc_s(jl_n_threadpools * sizeof(int));
jl_n_threads_per_pool[JL_THREADPOOL_ID_INTERACTIVE] = nthreadsi;
jl_n_threads_per_pool[JL_THREADPOOL_ID_DEFAULT] = nthreads;
assert(jl_all_tls_states_size > 0);
jl_atomic_store_release(&jl_all_tls_states, (jl_ptls_t*)calloc(jl_all_tls_states_size, sizeof(jl_ptls_t)));
jl_atomic_store_release(&jl_n_threads, jl_all_tls_states_size);
Expand All @@ -793,7 +793,10 @@ uv_barrier_t thread_init_done;
void jl_start_threads(void)
{
int nthreads = jl_atomic_load_relaxed(&jl_n_threads);
int ngcthreads = jl_n_gcthreads;
int ninteractive_threads = jl_n_threads_per_pool[JL_THREADPOOL_ID_INTERACTIVE];
int ndefault_threads = jl_n_threads_per_pool[JL_THREADPOOL_ID_DEFAULT];
int nmutator_threads = nthreads - jl_n_gcthreads;

int cpumasksize = uv_cpumask_size();
char *cp;
int i, exclusive;
Expand All @@ -808,36 +811,43 @@ void jl_start_threads(void)
if (cp && strcmp(cp, "0") != 0)
exclusive = 1;

// exclusive use: affinitize threads, master thread on proc 0, rest
// according to a 'compact' policy
// exclusive use: affinitize threads, master thread on proc 0, threads in
// default pool according to a 'compact' policy
// non-exclusive: no affinity settings; let the kernel move threads about
if (exclusive) {
if (nthreads > jl_cpu_threads()) {
if (ndefault_threads > jl_cpu_threads()) {
jl_printf(JL_STDERR, "ERROR: Too many threads requested for %s option.\n", MACHINE_EXCLUSIVE_NAME);
exit(1);
}
memset(mask, 0, cpumasksize);
mask[0] = 1;
uvtid = uv_thread_self();
uv_thread_setaffinity(&uvtid, mask, NULL, cpumasksize);
mask[0] = 0;

// If there are no interactive threads, the master thread is in the
// default pool and we must affinitize it
if (ninteractive_threads == 0) {
mask[0] = 1;
uvtid = uv_thread_self();
uv_thread_setaffinity(&uvtid, mask, NULL, cpumasksize);
mask[0] = 0;
}
}

// create threads
uv_barrier_init(&thread_init_done, nthreads);

// GC/System threads need to be after the worker threads.
int nmutator_threads = nthreads - ngcthreads;

for (i = 1; i < nmutator_threads; ++i) {
jl_threadarg_t *t = (jl_threadarg_t *)malloc_s(sizeof(jl_threadarg_t)); // ownership will be passed to the thread
t->tid = i;
t->barrier = &thread_init_done;
uv_thread_create(&uvtid, jl_threadfun, t);
if (exclusive) {
mask[i] = 1;

// Interactive pool threads get the low IDs, so check if this is a
// default pool thread. The master thread is already on CPU 0.
if (exclusive && i >= ninteractive_threads) {
assert(i - ninteractive_threads < cpumasksize);
mask[i - ninteractive_threads] = 1;
uv_thread_setaffinity(&uvtid, mask, NULL, cpumasksize);
mask[i] = 0;
mask[i - ninteractive_threads] = 0;
}
uv_thread_detach(&uvtid);
}
Expand Down
6 changes: 4 additions & 2 deletions test/threads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ if AFFINITY_SUPPORTED
end
end

function get_nthreads(options = ``; cpus = nothing)
function get_nthreads(options = ``; cpus = nothing, exclusive = false)
cmd = `$(Base.julia_cmd()) --startup-file=no $(options)`
cmd = `$cmd -e "print(Threads.threadpoolsize())"`
cmd = addenv(cmd, "JULIA_EXCLUSIVE" => "0", "JULIA_NUM_THREADS" => "auto")
cmd = addenv(cmd, "JULIA_EXCLUSIVE" => exclusive ? "1" : "0",
"JULIA_NUM_THREADS" => "auto")
if cpus !== nothing
cmd = setcpuaffinity(cmd, cpus)
end
Expand All @@ -138,6 +139,7 @@ end
allowed_cpus = findall(uv_thread_getaffinity())
if length(allowed_cpus) 2
@test get_nthreads() 2
@test get_nthreads(exclusive = true) 2
@test get_nthreads(cpus = allowed_cpus[1:1]) == 1
@test get_nthreads(cpus = allowed_cpus[2:2]) == 1
@test get_nthreads(cpus = allowed_cpus[1:2]) == 2
Expand Down

0 comments on commit 246e408

Please sign in to comment.