Go to the documentation of this file.
33 #ifndef ABT_CONFIG_DISABLE_MIGRATION
89 #ifndef ABT_CONFIG_ENABLE_VER_20_API
105 #ifndef ABT_CONFIG_ENABLE_VER_20_API
183 #ifndef ABT_CONFIG_ENABLE_VER_20_API
205 for (i = 0; i < num_pools; i++) {
275 #ifndef ABT_CONFIG_ENABLE_VER_20_API
293 #ifndef ABT_CONFIG_ENABLE_VER_20_API
370 &p_main_sched_ythread->
thread);
436 "The current xstream cannot be freed.");
440 "The primary xstream cannot be freed explicitly.");
534 #ifndef ABT_CONFIG_ENABLE_VER_20_API
628 #ifndef ABT_CONFIG_ENABLE_VER_20_API
671 #ifndef ABT_CONFIG_ENABLE_VER_20_API
676 *rank = (int)p_local_xstream->
rank;
765 *rank = (int)p_xstream->
rank;
859 #ifndef ABT_CONFIG_ENABLE_VER_20_API
862 p_local_xstream == p_xstream,
867 p_local_xstream == p_xstream,
877 #ifndef ABT_CONFIG_ENABLE_VER_20_API
1048 memcpy(pools, p_sched->
pools,
sizeof(
ABT_pool) * max_pools);
1275 #ifndef ABT_CONFIG_ENABLE_VER_20_API
1423 affinity.
cpuids = cpuids;
1486 cpuids, num_cpuids);
1515 *pp_xstream = p_newxstream;
1546 (*pp_local_xstream)->p_thread = &p_ythread->
thread;
1592 if (p_cursched != NULL) {
1618 if (p_xstream == NULL) {
1619 fprintf(p_os,
"%*s== NULL ES ==\n", indent,
"");
1621 const char *type, *state;
1622 switch (p_xstream->
type) {
1638 state =
"TERMINATED";
1646 "%*s== ES (%p) ==\n"
1650 "%*sroot_ythread : %p\n"
1651 "%*sroot_pool : %p\n"
1653 "%*smain_sched : %p\n",
1654 indent,
"", (
void *)p_xstream, indent,
"", p_xstream->rank,
1655 indent,
"", type, indent,
"", state, indent,
"",
1656 (
void *)p_xstream->p_root_ythread, indent,
"",
1657 (
void *)p_xstream->p_root_pool, indent,
"",
1658 (
void *)p_xstream->p_thread, indent,
"",
1659 (
void *)p_xstream->p_main_sched);
1665 fprintf(p_os,
"%*sctx :\n", indent,
"");
1702 int abt_errno, init_stage = 0;
1708 p_newxstream->p_prev = NULL;
1709 p_newxstream->p_next = NULL;
1717 p_newxstream->type = xstream_type;
1720 p_newxstream->p_main_sched = NULL;
1721 p_newxstream->p_thread = NULL;
1733 p_newxstream, &p_newxstream->p_root_ythread);
1750 p_newxstream->p_main_sched);
1762 (
void *)p_newxstream,
1763 &p_newxstream->ctx);
1771 p_newxstream->rank);
1775 LOG_DEBUG(
"[E%d] created\n", p_newxstream->rank);
1778 *pp_xstream = p_newxstream;
1781 if (init_stage >= 5) {
1783 &p_newxstream->p_main_sched->p_ythread->thread);
1784 p_newxstream->p_main_sched->p_ythread = NULL;
1786 if (init_stage >= 4) {
1789 if (init_stage >= 3) {
1791 p_newxstream->p_root_ythread);
1793 if (init_stage >= 2) {
1797 if (init_stage >= 1) {
1834 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1837 LOG_DEBUG(
"[U%" PRIu64
":E%d] canceled\n",
1839 p_local_xstream->
rank);
1848 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1870 LOG_DEBUG(
"[U%" PRIu64
":E%d] start running\n",
1881 p_local_xstream = *pp_local_xstream;
1883 LOG_DEBUG(
"[U%" PRIu64
":E%d] stopped\n",
1894 LOG_DEBUG(
"[U%" PRIu64
":E%d] finished\n",
1896 p_local_xstream->
rank);
1900 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL
1902 LOG_DEBUG(
"[U%" PRIu64
":E%d] canceled\n",
1904 p_local_xstream->
rank);
1916 LOG_DEBUG(
"[U%" PRIu64
":E%d] check blocked\n",
1918 p_local_xstream->
rank);
1920 #ifndef ABT_CONFIG_DISABLE_MIGRATION
1933 LOG_DEBUG(
"[U%" PRIu64
":E%d] orphaned\n",
1935 p_local_xstream->
rank);
1948 #ifndef ABT_CONFIG_DISABLE_TASK_CANCEL
1967 p_local_xstream->
rank);
1969 ABTI_thread *p_sched_thread = p_local_xstream->p_thread;
1970 p_local_xstream->p_thread = p_task;
1977 p_local_xstream->
rank);
1981 p_local_xstream->
rank);
1984 p_local_xstream->p_thread = p_sched_thread;
1992 #ifndef ABT_CONFIG_DISABLE_MIGRATION
2050 if (p_main_sched == NULL) {
2056 }
else if (*pp_local_xstream != p_xstream) {
2083 ABTI_thread *p_thread = (*pp_local_xstream)->p_thread;
2091 for (p = 0; p < p_main_sched->
num_pools; p++) {
2140 static int max_xstreams_warning_once = 0;
2141 if (max_xstreams_warning_once == 0) {
2149 char *warning_message;
2151 ABTU_malloc(
sizeof(
char) * 1024, (
void **)&warning_message);
2153 snprintf(warning_message, 1024,
2154 "Warning: the number of execution streams exceeds "
2155 "ABT_MAX_NUM_XSTREAMS (=%d). This may cause an error.",
2159 max_xstreams_warning_once = 1;
2171 int rank = p_newxstream->
rank;
2177 if (p_xstream->
rank > rank) {
2181 p_prev_xstream = p_xstream;
2182 p_xstream = p_xstream->
p_next;
2187 if (p_prev_xstream) {
2188 p_prev_xstream->
p_next = p_newxstream;
2189 p_newxstream->
p_prev = p_prev_xstream;
2190 p_newxstream->
p_next = NULL;
2193 p_newxstream->
p_prev = NULL;
2194 p_newxstream->
p_next = NULL;
2208 p_xstream->
p_prev = p_newxstream;
2209 p_newxstream->
p_next = p_xstream;
2217 if (!p_xstream->
p_prev) {
2239 if (p_xstream->
rank == rank) {
2245 p_xstream = p_xstream->
p_next;
2251 if (p_xstream->
rank == rank) {
2254 }
else if (p_xstream->
rank > rank) {
2257 p_xstream = p_xstream->
p_next;
2261 p_newxstream->
rank = rank;
2274 if (p_xstream->
rank == rank) {
2284 if (p_next->
rank == rank) {
2287 }
else if (p_next->
rank > rank) {
2295 p_xstream->
rank = rank;
int ABT_xstream_set_affinity(ABT_xstream xstream, int num_cpuids, int *cpuids)
Bind an execution stream to target CPUs.
@ ABT_THREAD_STATE_TERMINATED
#define HANDLE_WARNING(msg)
static int ABTU_min_int(int a, int b)
#define ABTI_CHECK_NULL_SCHED_PTR(p)
#define ABT_ERR_INV_THREAD
Error code: invalid work unit.
ABT_sched_predef
Predefined scheduler type.
static ABTI_sched * ABTI_sched_get_ptr(ABT_sched sched)
static ABTU_ret_err int xstream_join(ABTI_local **pp_local, ABTI_xstream *p_xstream)
void(* f_migration_cb)(ABT_thread, void *)
int ABT_bool
Boolean type.
@ ABTD_XSTREAM_CONTEXT_STATE_WAITING
@ ABTI_XSTREAM_TYPE_PRIMARY
#define ABT_ERR_CPUID
Error code: error related to CPU ID.
#define ABTI_THREAD_REQ_TERMINATE
static void xstream_update_max_xstreams(ABTI_global *p_global, int newrank)
ABTU_ret_err int ABTI_sched_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABTI_sched_config *p_config, ABTI_sched **pp_newsched)
#define ABTI_SETUP_LOCAL_XSTREAM(pp_local_xstream)
static uint32_t ABTD_atomic_acquire_load_uint32(const ABTD_atomic_uint32 *ptr)
ABTU_ret_err int ABTD_affinity_cpuset_apply(ABTD_xstream_context *p_ctx, const ABTD_affinity_cpuset *p_cpuset)
struct ABT_thread_opaque * ABT_thread
Work unit handle type.
ABTD_atomic_ptr p_migration_pool
ABTU_ret_err int ABTI_xstream_create_primary(ABTI_global *p_global, ABTI_xstream **pp_xstream)
#define ABTI_SETUP_GLOBAL(pp_global)
ABTD_spinlock xstream_list_lock
static ABTU_ret_err int ABTI_unit_set_associated_pool(ABTI_global *p_global, ABT_unit unit, ABTI_pool *p_pool, ABTI_thread **pp_thread)
int ABT_xstream_self(ABT_xstream *xstream)
Get an execution stream that is running the calling work unit.
void ABTD_xstream_context_print(ABTD_xstream_context *p_ctx, FILE *p_os, int indent)
#define ABT_ERR_INV_XSTREAM
Error code: invalid execution stream.
ABTU_ret_err int ABTI_thread_revive(ABTI_global *p_global, ABTI_local *p_local, ABTI_pool *p_pool, void(*thread_func)(void *), void *arg, ABTI_thread *p_thread)
static ABT_bool xstream_change_rank(ABTI_global *p_global, ABTI_xstream *p_xstream, int rank)
void ABTI_sched_exit(ABTI_sched *p_sched)
static ABTI_global * ABTI_global_get_global(void)
int ABT_xstream_set_rank(ABT_xstream xstream, int rank)
Set a rank for an execution stream.
int ABT_xstream_get_num(int *num_xstreams)
Get the number of current existing execution streams.
int ABT_xstream_get_affinity(ABT_xstream xstream, int max_cpuids, int *cpuids, int *num_cpuids)
Get CPU IDs of CPUs to which an execution stream is bound.
#define ABTI_CHECK_ERROR(abt_errno)
#define ABTI_THREAD_REQ_ORPHAN
static ABT_sched ABTI_sched_get_handle(ABTI_sched *p_sched)
struct ABT_sched_config_opaque * ABT_sched_config
Scheduler configuration handle type.
#define ABTI_THREAD_TYPE_YIELDABLE
ABTD_atomic_uint32 request
static ABTI_ythread * ABTI_thread_get_ythread(ABTI_thread *p_thread)
static void xstream_schedule_ythread(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_ythread *p_ythread)
void ABTD_xstream_context_free(ABTD_xstream_context *p_ctx)
#define ABTI_CHECK_TRUE_MSG(cond, abt_errno, msg)
ABT_unit_id ABTI_thread_get_id(ABTI_thread *p_thread)
static void xstream_return_rank(ABTI_global *p_global, ABTI_xstream *p_xstream)
#define ABTI_THREAD_REQ_NON_YIELD
static void ABTI_pool_push(ABTI_pool *p_pool, ABT_unit unit)
void ABTI_thread_free(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread)
void ABTI_xstream_run_thread(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_thread *p_thread)
static ABTU_ret_err int ABTI_thread_set_associated_pool(ABTI_global *p_global, ABTI_thread *p_thread, ABTI_pool *p_pool)
static void xstream_remove_xstream_list(ABTI_global *p_global, ABTI_xstream *p_xstream)
int ABT_xstream_cancel(ABT_xstream xstream)
Send a cancellation request to an execution stream.
#define ABTI_IS_ERROR_CHECK_ENABLED
int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
Create a new execution stream.
void ABTI_xstream_check_events(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
static ABTU_ret_err int xstream_migrate_thread(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread)
int ABT_xstream_get_main_sched(ABT_xstream xstream, ABT_sched *sched)
Retrieve the main scheduler of an execution stream.
struct ABT_pool_opaque * ABT_pool
Pool handle type.
int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
Retrieve a rank of an execution stream.
int ABT_xstream_self_rank(int *rank)
Return a rank of an execution stream associated with a caller.
static ABTU_ret_err int xstream_update_main_sched(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_sched *p_sched)
void ABTD_xstream_context_revive(ABTD_xstream_context *p_ctx)
int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *is_primary)
Check if the target execution stream is primary.
struct ABT_sched_opaque * ABT_sched
Scheduler handle type.
#define ABT_ERR_INV_SCHED
Error code: invalid scheduler.
#define ABTI_THREAD_TYPE_PRIMARY
static ABT_xstream ABTI_xstream_get_handle(ABTI_xstream *p_xstream)
static void xstream_init_main_sched(ABTI_xstream *p_xstream, ABTI_sched *p_sched)
void ABTI_pool_free(ABTI_pool *p_pool)
#define ABTI_THREAD_REQ_MIGRATE
void ABTI_info_check_print_all_thread_stacks(void)
ABTU_ret_err int ABTD_xstream_context_create(void *(*f_xstream)(void *), void *p_arg, ABTD_xstream_context *p_ctx)
#define ABTI_SCHED_REQ_REPLACE
static ABTU_ret_err int xstream_create(ABTI_global *p_global, ABTI_sched *p_sched, ABTI_xstream_type xstream_type, int rank, ABT_bool start, ABTI_xstream **pp_xstream)
static void ABTD_atomic_relaxed_store_uint32(ABTD_atomic_uint32 *ptr, uint32_t val)
ABTU_noreturn void ABTI_ythread_exit(ABTI_xstream *p_local_xstream, ABTI_ythread *p_ythread)
static void ABTI_sched_discard_and_free(ABTI_global *p_global, ABTI_local *p_local, ABTI_sched *p_sched, ABT_bool force_free)
static int ABTD_atomic_relaxed_load_int(const ABTD_atomic_int *ptr)
void ABTI_ythread_free_root(ABTI_global *p_global, ABTI_local *p_local, ABTI_ythread *p_ythread)
struct ABT_xstream_opaque * ABT_xstream
Execution stream handle type.
static ABTI_ythread * ABTI_thread_get_ythread_or_null(ABTI_thread *p_thread)
static void ABTI_pool_add_thread(ABTI_thread *p_thread)
static void ABTD_spinlock_acquire(ABTD_spinlock *p_lock)
#define ABTI_HANDLE_ERROR(n)
int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_xstream *newxstream)
Create a new execution stream with a predefined scheduler.
void ABTD_xstream_context_set_self(ABTD_xstream_context *p_ctx)
int ABT_xstream_create_with_rank(ABT_sched sched, int rank, ABT_xstream *newxstream)
Create a new execution stream with a specific rank.
static ABT_bool xstream_set_new_rank(ABTI_global *p_global, ABTI_xstream *p_newxstream, int rank)
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
#define LOG_DEBUG(fmt,...)
void ABTI_sched_finish(ABTI_sched *p_sched)
static void ABTD_atomic_relaxed_store_int(ABTD_atomic_int *ptr, int val)
int ABT_xstream_check_events(ABT_sched sched)
Process events associated with a scheduler.
#define ABT_ERR_INV_UNIT
Error code: invalid work unit for scheduling.
#define ABT_ERR_XSTREAM_STATE
Error code: error related to an execution stream state.
ABTU_ret_err int ABTI_pool_create_basic(ABT_pool_kind kind, ABT_pool_access access, ABT_bool automatic, ABTI_pool **pp_newpool)
ABTI_sched * p_main_sched
void * p_migration_cb_arg
int ABT_xstream_set_main_sched_basic(ABT_xstream xstream, ABT_sched_predef predef, int num_pools, ABT_pool *pools)
Set the main scheduler of an execution stream to a predefined scheduler.
int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools, ABT_pool *pools)
Get pools associated with the main scheduler of an execution stream.
@ ABTI_XSTREAM_TYPE_SECONDARY
int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
Bind an execution stream to a target CPU.
void ABTI_thread_join(ABTI_local **pp_local, ABTI_thread *p_thread)
static void xstream_schedule_task(ABTI_global *p_global, ABTI_xstream *p_local_xstream, ABTI_thread *p_task)
struct ABT_unit_opaque * ABT_unit
Work unit handle type for scheduling.
#define ABTI_ASSERT(cond)
static void ABTI_sched_set_request(ABTI_sched *p_sched, uint32_t req)
static void ABTI_xstream_terminate_thread(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread)
ABTI_sched * p_replace_sched
int ABT_xstream_exit(void)
Terminate an execution stream that is running the calling ULT.
static ABTI_local * ABTI_local_get_local(void)
int ABT_xstream_join(ABT_xstream xstream)
Wait for an execution stream to terminate.
static void ABTI_local_set_xstream(ABTI_xstream *p_local_xstream)
#define ABT_SUCCESS
Error code: the routine returns successfully.
static ABTI_local * ABTI_local_get_local_uninlined(void)
void ABTI_xstream_free(ABTI_global *p_global, ABTI_local *p_local, ABTI_xstream *p_xstream, ABT_bool force_free)
#define ABTI_SETUP_LOCAL_YTHREAD(pp_local_xstream, pp_ythread)
ABTI_xstream * p_xstream_head
static int ABTD_atomic_acquire_load_int(const ABTD_atomic_int *ptr)
int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
Get CPU ID of a CPU to which an execution stream is bound.
void ABTI_ythread_set_blocked(ABTI_ythread *p_ythread)
#define ABTI_THREAD_REQ_CANCEL
ABTD_xstream_context_state state
static ABTI_xstream * ABTI_local_get_xstream_or_null(ABTI_local *p_local)
ABTU_ret_err int ABTI_mem_init_local(ABTI_global *p_global, ABTI_xstream *p_local_xstream)
static ABTI_local * ABTI_xstream_get_local(ABTI_xstream *p_xstream)
#define ABT_TRUE
True constant for ABT_bool.
static ABTI_pool * ABTI_pool_get_ptr(ABT_pool pool)
ABTD_atomic_uint32 request
@ ABT_XSTREAM_STATE_RUNNING
ABTU_ret_err int ABTI_ythread_create_root(ABTI_global *p_global, ABTI_local *p_local, ABTI_xstream *p_xstream, ABTI_ythread **pp_root_ythread)
ABTI_ythread * p_root_ythread
#define ABT_FALSE
False constant for ABT_bool.
int ABT_xstream_free(ABT_xstream *xstream)
Free an execution stream.
void ABTI_ythread_set_ready(ABTI_local *p_local, ABTI_ythread *p_ythread)
int ABTD_affinity_cpuset_apply_default(ABTD_xstream_context *p_ctx, int rank)
void ABTI_ythread_suspend(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_ythread, ABT_sync_event_type sync_event_type, void *p_sync)
static void ABTD_ythread_context_switch(ABTD_ythread_context *p_old, ABTD_ythread_context *p_new)
#define ABT_ERR_INV_ARG
Error code: invalid user argument.
void ABTI_xstream_start_primary(ABTI_global *p_global, ABTI_xstream **pp_local_xstream, ABTI_xstream *p_xstream, ABTI_ythread *p_ythread)
#define ABTI_CHECK_NULL_POOL_PTR(p)
#define ABTI_CHECK_NULL_XSTREAM_PTR(p)
int ABT_xstream_get_state(ABT_xstream xstream, ABT_xstream_state *state)
Get a state of an execution stream.
static void ABTU_free(void *ptr)
static uint32_t ABTD_atomic_fetch_or_uint32(ABTD_atomic_uint32 *ptr, uint32_t v)
static void ABTD_spinlock_release(ABTD_spinlock *p_lock)
static ABTI_ythread * ABTI_ythread_context_switch_to_child(ABTI_xstream **pp_local_xstream, ABTI_ythread *p_old, ABTI_ythread *p_new)
ABTU_ret_err int ABTD_affinity_cpuset_read(ABTD_xstream_context *p_ctx, int max_cpuids, int *cpuids, int *p_num_cpuids)
void ABTD_xstream_context_join(ABTD_xstream_context *p_ctx)
static ABTI_xstream * ABTI_xstream_get_ptr(ABT_xstream xstream)
int ABT_xstream_set_main_sched(ABT_xstream xstream, ABT_sched sched)
Set the main scheduler of an execution stream.
void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent, ABT_bool print_sub)
ABTU_ret_err int ABTI_ythread_create_main_sched(ABTI_global *p_global, ABTI_local *p_local, ABTI_xstream *p_xstream, ABTI_sched *p_sched)
ABTI_xstream * p_last_xstream
void ABTI_sched_free(ABTI_global *p_global, ABTI_local *p_local, ABTI_sched *p_sched, ABT_bool force_free)
static void xstream_add_xstream_list(ABTI_global *p_global, ABTI_xstream *p_newxstream)
ABTU_ret_err int ABTI_thread_get_mig_data(ABTI_global *p_global, ABTI_local *p_local, ABTI_thread *p_thread, ABTI_thread_mig_data **pp_mig_data)
struct ABTI_local ABTI_local
static void ABTI_thread_unset_associated_pool(ABTI_global *p_global, ABTI_thread *p_thread)
#define ABTI_THREAD_REQ_BLOCK
void ABTD_ythread_cancel(ABTI_xstream *p_local_xstream, ABTI_ythread *p_ythread)
#define ABTI_CHECK_TRUE(cond, abt_errno)
static void ABTI_thread_unset_request(ABTI_thread *p_thread, uint32_t req)
static ABTI_sched_config * ABTI_sched_config_get_ptr(ABT_sched_config config)
static void * xstream_launch_root_ythread(void *p_xstream)
#define ABT_ERR_INV_XSTREAM_RANK
Error code: invalid execution stream rank.
@ ABT_THREAD_STATE_RUNNING
int ABT_xstream_revive(ABT_xstream xstream)
Revive a terminated execution stream.
int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool)
Execute a work unit.
static int32_t ABTI_pool_release(ABTI_pool *p_pool)
static ABTI_xstream * ABTI_local_get_xstream(ABTI_local *p_local)
int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2, ABT_bool *result)
Compare two execution stream handles for equality.
void ABTI_mem_finalize_local(ABTI_xstream *p_local_xstream)
static ABTU_noreturn void ABTU_unreachable(void)
ABTI_ythread * p_replace_waiter
static void ABTD_atomic_release_store_int(ABTD_atomic_int *ptr, int val)
@ ABT_XSTREAM_STATE_TERMINATED
static void * ABTD_atomic_relaxed_load_ptr(const ABTD_atomic_ptr *ptr)
ABT_xstream_state
State of an execution stream.
void ABTI_sched_print(ABTI_sched *p_sched, FILE *p_os, int indent, ABT_bool print_sub)
#define ABTI_THREAD_REQ_JOIN