8 static void ABTI_xstream_set_new_rank(ABTI_xstream *);
9 static ABT_bool ABTI_xstream_take_rank(ABTI_xstream *,
int);
10 static void ABTI_xstream_return_rank(ABTI_xstream *);
30 ABTI_local *p_local = ABTI_local_get_local();
32 ABTI_xstream *p_newxstream;
37 ABTI_CHECK_ERROR(abt_errno);
39 p_sched = ABTI_sched_get_ptr(sched);
40 ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
44 abt_errno = ABTI_xstream_create(&p_local, p_sched, &p_newxstream);
45 ABTI_CHECK_ERROR(abt_errno);
48 abt_errno = ABTI_xstream_start(p_local, p_newxstream);
49 ABTI_CHECK_ERROR(abt_errno);
52 *newxstream = ABTI_xstream_get_handle(p_newxstream);
63 int ABTI_xstream_create(ABTI_local **pp_local, ABTI_sched *p_sched,
64 ABTI_xstream **pp_xstream)
67 ABTI_xstream *p_newxstream;
69 p_newxstream = (ABTI_xstream *)
ABTU_malloc(
sizeof(ABTI_xstream));
71 ABTI_xstream_set_new_rank(p_newxstream);
73 p_newxstream->type = ABTI_XSTREAM_TYPE_SECONDARY;
74 ABTD_atomic_relaxed_store_int(&p_newxstream->state,
76 p_newxstream->scheds = NULL;
77 p_newxstream->num_scheds = 0;
78 p_newxstream->max_scheds = 0;
79 ABTD_atomic_relaxed_store_uint32(&p_newxstream->request, 0);
80 p_newxstream->p_req_arg = NULL;
81 p_newxstream->p_main_sched = NULL;
84 ABTI_spinlock_clear(&p_newxstream->sched_lock);
87 abt_errno = ABTI_xstream_set_main_sched(pp_local, p_newxstream, p_sched);
88 ABTI_CHECK_ERROR(abt_errno);
90 LOG_EVENT(
"[E%d] created\n", p_newxstream->rank);
93 *pp_xstream = p_newxstream;
103 int ABTI_xstream_create_primary(ABTI_local **pp_local,
104 ABTI_xstream **pp_xstream)
107 ABTI_xstream *p_newxstream;
113 ABTI_CHECK_ERROR(abt_errno);
115 abt_errno = ABTI_xstream_create(pp_local, p_sched, &p_newxstream);
116 ABTI_CHECK_ERROR(abt_errno);
118 p_newxstream->type = ABTI_XSTREAM_TYPE_PRIMARY;
120 *pp_xstream = p_newxstream;
151 ABTI_local *p_local = ABTI_local_get_local();
152 ABTI_xstream *p_newxstream;
156 ABTI_sched_create_basic(predef, num_pools, pools, config, &p_sched);
157 ABTI_CHECK_ERROR(abt_errno);
159 abt_errno = ABTI_xstream_create(&p_local, p_sched, &p_newxstream);
160 ABTI_CHECK_ERROR(abt_errno);
163 abt_errno = ABTI_xstream_start(p_local, p_newxstream);
164 ABTI_CHECK_ERROR(abt_errno);
166 *newxstream = ABTI_xstream_get_handle(p_newxstream);
193 ABTI_local *p_local = ABTI_local_get_local();
194 ABTI_xstream *p_newxstream;
199 p_newxstream = (ABTI_xstream *)
ABTU_malloc(
sizeof(ABTI_xstream));
201 if (ABTI_xstream_take_rank(p_newxstream, rank) ==
ABT_FALSE) {
211 ABTI_CHECK_ERROR(abt_errno);
213 p_sched = ABTI_sched_get_ptr(sched);
214 ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
218 p_newxstream->type = ABTI_XSTREAM_TYPE_SECONDARY;
219 ABTD_atomic_relaxed_store_int(&p_newxstream->state,
221 p_newxstream->scheds = NULL;
222 p_newxstream->num_scheds = 0;
223 p_newxstream->max_scheds = 0;
224 ABTD_atomic_relaxed_store_uint32(&p_newxstream->request, 0);
225 p_newxstream->p_req_arg = NULL;
226 p_newxstream->p_main_sched = NULL;
229 ABTI_spinlock_clear(&p_newxstream->sched_lock);
232 abt_errno = ABTI_xstream_set_main_sched(&p_local, p_newxstream, p_sched);
233 ABTI_CHECK_ERROR(abt_errno);
235 LOG_EVENT(
"[E%d] created\n", p_newxstream->rank);
238 abt_errno = ABTI_xstream_start(p_local, p_newxstream);
239 ABTI_CHECK_ERROR(abt_errno);
242 *newxstream = ABTI_xstream_get_handle(p_newxstream);
253 int ABTI_xstream_start(ABTI_local *p_local, ABTI_xstream *p_xstream)
258 ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
262 ABTI_xstream_push_sched(p_xstream, p_xstream->p_main_sched);
264 if (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) {
265 LOG_EVENT(
"[E%d] start\n", p_xstream->rank);
267 abt_errno = ABTD_xstream_context_set_self(&p_xstream->ctx);
268 ABTI_CHECK_ERROR_MSG(abt_errno,
"ABTD_xstream_context_set_self");
271 ABTI_sched *p_sched = p_xstream->p_main_sched;
272 abt_errno = ABTI_thread_create_main_sched(p_local, p_xstream, p_sched);
273 ABTI_CHECK_ERROR(abt_errno);
274 p_sched->p_thread->p_last_xstream = p_xstream;
279 ABTD_xstream_context_create(ABTI_xstream_launch_main_sched,
280 (
void *)p_xstream, &p_xstream->ctx);
281 ABTI_CHECK_ERROR_MSG(abt_errno,
"ABTD_xstream_context_create");
286 ABTD_affinity_set(&p_xstream->ctx, p_xstream->rank);
308 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
309 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
312 ABTD_atomic_relaxed_store_uint32(&p_xstream->request, 0);
313 p_xstream->p_req_arg = NULL;
314 abt_errno = ABTD_xstream_context_revive(&p_xstream->ctx);
315 ABTI_CHECK_ERROR(abt_errno);
329 int ABTI_xstream_start_primary(ABTI_local **pp_local, ABTI_xstream *p_xstream,
330 ABTI_thread *p_thread)
335 ABTI_xstream_push_sched(p_xstream, p_xstream->p_main_sched);
338 ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
341 LOG_EVENT(
"[E%d] start\n", p_xstream->rank);
343 abt_errno = ABTD_xstream_context_set_self(&p_xstream->ctx);
344 ABTI_CHECK_ERROR_MSG(abt_errno,
"ABTD_xstream_context_set_self");
348 ABTD_affinity_set(&p_xstream->ctx, p_xstream->rank);
352 ABTI_sched *p_sched = p_xstream->p_main_sched;
353 abt_errno = ABTI_thread_create_main_sched(*pp_local, p_xstream, p_sched);
354 ABTI_CHECK_ERROR(abt_errno);
355 p_sched->p_thread->p_last_xstream = p_xstream;
358 LOG_EVENT(
"[U%" PRIu64
":E%d] yield\n", ABTI_thread_get_id(p_thread),
359 p_thread->p_last_xstream->rank);
360 ABTI_thread_context_switch_thread_to_sched(pp_local, p_thread, p_sched);
363 LOG_EVENT(
"[U%" PRIu64
":E%d] resume\n", ABTI_thread_get_id(p_thread),
364 p_thread->p_last_xstream->rank);
391 ABTI_local *p_local = ABTI_local_get_local();
394 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(h_xstream);
395 if (p_xstream == NULL)
400 ABTI_CHECK_TRUE_MSG(p_local == NULL || p_xstream != p_local->p_xstream,
402 "The current xstream cannot be freed.");
404 ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
406 "The primary xstream cannot be freed explicitly.");
409 if (ABTD_atomic_acquire_load_int(&p_xstream->state) !=
411 abt_errno = ABTI_xstream_join(&p_local, p_xstream);
412 ABTI_CHECK_ERROR(abt_errno);
416 abt_errno = ABTI_xstream_free(p_local, p_xstream);
417 ABTI_CHECK_ERROR(abt_errno);
446 ABTI_local *p_local = ABTI_local_get_local();
447 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
448 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
450 abt_errno = ABTI_xstream_join(&p_local, p_xstream);
451 ABTI_CHECK_ERROR(abt_errno);
476 ABTI_local *p_local = ABTI_local_get_local();
485 if (p_local == NULL) {
490 ABTI_xstream *p_xstream = p_local->p_xstream;
491 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
494 ABTI_xstream_set_request(p_xstream, ABTI_XSTREAM_REQ_EXIT);
498 #ifndef ABT_CONFIG_DISABLE_EXT_THREAD 504 ABTI_thread_yield(&p_local, p_local->p_thread);
505 }
while (ABTD_atomic_acquire_load_int(&p_xstream->state) !=
527 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
529 ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
531 "The primary xstream cannot be canceled.");
534 ABTI_xstream_set_request(p_xstream, ABTI_XSTREAM_REQ_CANCEL);
560 ABTI_local *p_local = ABTI_local_get_local();
570 if (p_local == NULL) {
576 ABTI_xstream *p_xstream = p_local->p_xstream;
577 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
580 *xstream = ABTI_xstream_get_handle(p_xstream);
603 ABTI_local *p_local = ABTI_local_get_local();
612 if (p_local == NULL) {
617 ABTI_xstream *p_xstream = p_local->p_xstream;
618 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
621 *rank = (int)p_xstream->rank;
643 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
644 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
646 p_xstream->rank = rank;
650 ABTD_affinity_set(&p_xstream->ctx, p_xstream->rank);
673 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
674 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
675 *rank = (int)p_xstream->rank;
717 ABTI_local *p_local = ABTI_local_get_local();
722 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
723 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
725 ABTI_thread *p_thread = p_local->p_thread;
733 if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
735 if (p_thread->p_last_xstream != p_xstream) {
743 if (p_xstream->p_main_sched) {
746 if (ABTI_sched_get_effective_size(p_local, p_xstream->p_main_sched) >
756 ABTI_CHECK_ERROR(abt_errno);
758 p_sched = ABTI_sched_get_ptr(sched);
759 ABTI_CHECK_TRUE(p_sched->used == ABTI_SCHED_NOT_USED,
763 abt_errno = ABTI_xstream_set_main_sched(&p_local, p_xstream, p_sched);
764 ABTI_CHECK_ERROR(abt_errno);
791 ABTI_local *p_local = ABTI_local_get_local();
794 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
795 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
798 abt_errno = ABTI_sched_create_basic(predef, num_pools, pools,
800 ABTI_CHECK_ERROR(abt_errno);
802 abt_errno = ABTI_xstream_set_main_sched(&p_local, p_xstream, p_sched);
803 ABTI_CHECK_ERROR(abt_errno);
829 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
830 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
832 *sched = ABTI_sched_get_handle(p_xstream->p_main_sched);
860 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
861 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
862 ABTI_sched *p_sched = p_xstream->p_main_sched;
863 max_pools = p_sched->num_pools > max_pools ? max_pools : p_sched->num_pools;
864 memcpy(pools, p_sched->pools,
sizeof(
ABT_pool) * max_pools);
887 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
888 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
919 ABTI_xstream *p_xstream1 = ABTI_xstream_get_ptr(xstream1);
920 ABTI_xstream *p_xstream2 = ABTI_xstream_get_ptr(xstream2);
972 ABTI_xstream *p_xstream;
974 p_xstream = ABTI_xstream_get_ptr(xstream);
975 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1004 ABTI_local *p_local = ABTI_local_get_local();
1005 ABTI_xstream *p_xstream = p_local->p_xstream;
1006 ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
1008 abt_errno = ABTI_xstream_run_unit(&p_local, p_xstream, unit, p_pool);
1009 ABTI_CHECK_ERROR(abt_errno);
1019 int ABTI_xstream_run_unit(ABTI_local **pp_local, ABTI_xstream *p_xstream,
1027 ABT_thread thread = p_pool->u_get_thread(unit);
1028 ABTI_thread *p_thread = ABTI_thread_get_ptr(thread);
1030 abt_errno = ABTI_xstream_schedule_thread(pp_local, p_xstream, p_thread);
1031 ABTI_CHECK_ERROR(abt_errno);
1034 ABT_task task = p_pool->u_get_task(unit);
1035 ABTI_task *p_task = ABTI_task_get_ptr(task);
1037 ABTI_xstream_schedule_task(*pp_local, p_xstream, p_task);
1066 ABTI_local *p_local = ABTI_local_get_local();
1075 if (p_local == NULL) {
1080 ABTI_xstream *p_xstream = p_local->p_xstream;
1082 abt_errno = ABTI_xstream_check_events(p_xstream, sched);
1083 ABTI_CHECK_ERROR(abt_errno);
1093 int ABTI_xstream_check_events(ABTI_xstream *p_xstream,
ABT_sched sched)
1096 ABTI_sched *p_sched = ABTI_sched_get_ptr(sched);
1097 ABTI_CHECK_NULL_SCHED_PTR(p_sched);
1099 ABTI_info_check_print_all_thread_stacks();
1101 uint32_t request = ABTD_atomic_acquire_load_uint32(&p_xstream->request);
1102 if (request & ABTI_XSTREAM_REQ_JOIN) {
1103 ABTI_sched_finish(p_sched);
1106 if ((request & ABTI_XSTREAM_REQ_EXIT) ||
1107 (request & ABTI_XSTREAM_REQ_CANCEL)) {
1108 ABTI_sched_exit(p_sched);
1135 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1136 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1138 abt_errno = ABTD_affinity_set_cpuset(&p_xstream->ctx, 1, &cpuid);
1139 ABTI_CHECK_ERROR(abt_errno);
1165 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1166 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1168 abt_errno = ABTD_affinity_get_cpuset(&p_xstream->ctx, 1, cpuid, NULL);
1169 ABTI_CHECK_ERROR(abt_errno);
1196 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1197 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1199 abt_errno = ABTD_affinity_set_cpuset(&p_xstream->ctx, cpuset_size, cpuset);
1200 ABTI_CHECK_ERROR(abt_errno);
1234 ABTI_xstream *p_xstream = ABTI_xstream_get_ptr(xstream);
1235 ABTI_CHECK_NULL_XSTREAM_PTR(p_xstream);
1237 if (cpuset == NULL && num_cpus == NULL) {
1241 abt_errno = ABTD_affinity_get_cpuset(&p_xstream->ctx, cpuset_size, cpuset,
1243 ABTI_CHECK_ERROR(abt_errno);
1257 int ABTI_xstream_join(ABTI_local **pp_local, ABTI_xstream *p_xstream)
1260 ABTI_local *p_local;
1261 ABTI_thread *p_thread;
1264 ABTI_CHECK_TRUE_MSG(p_xstream->type != ABTI_XSTREAM_TYPE_PRIMARY,
1266 "The primary ES cannot be joined.");
1272 p_local = *pp_local;
1273 p_thread = p_local ? p_local->p_thread : NULL;
1283 ABTI_CHECK_TRUE_MSG(p_xstream != p_local->p_xstream,
1285 "The target ES should be different.");
1289 if (ABTD_atomic_acquire_load_int(&p_xstream->state) ==
1296 ABTI_POOL_SET_CONSUMER(p_thread->p_pool,
1297 ABTI_self_get_native_thread_id(p_local));
1300 p_xstream->p_req_arg = (
void *)p_thread;
1301 ABTI_thread_set_blocked(p_thread);
1304 ABTI_xstream_set_request(p_xstream, ABTI_XSTREAM_REQ_JOIN);
1307 ABTI_thread_suspend(pp_local, p_thread);
1310 ABTI_xstream_set_request(p_xstream, ABTI_XSTREAM_REQ_JOIN);
1312 while (ABTD_atomic_acquire_load_int(&p_xstream->state) !=
1314 #ifndef ABT_CONFIG_DISABLE_EXT_THREAD 1316 ABTD_atomic_pause();
1320 ABTI_thread_yield(pp_local, p_local->p_thread);
1321 p_local = *pp_local;
1327 abt_errno = ABTD_xstream_context_join(&p_xstream->ctx);
1328 ABTI_CHECK_ERROR_MSG(abt_errno,
"ABTD_xstream_context_join");
1337 int ABTI_xstream_free(ABTI_local *p_local, ABTI_xstream *p_xstream)
1341 LOG_EVENT(
"[E%d] freed\n", p_xstream->rank);
1346 ABTI_xstream_return_rank(p_xstream);
1349 ABTI_sched *p_cursched = p_xstream->p_main_sched;
1350 if (p_cursched != NULL) {
1351 abt_errno = ABTI_sched_discard_and_free(p_local, p_cursched);
1352 ABTI_CHECK_ERROR(abt_errno);
1359 if (p_xstream->type == ABTI_XSTREAM_TYPE_SECONDARY) {
1360 abt_errno = ABTD_xstream_context_free(&p_xstream->ctx);
1361 ABTI_CHECK_ERROR(abt_errno);
1375 void ABTI_xstream_schedule(
void *p_arg)
1377 ABTI_local *p_local = ABTI_local_get_local();
1378 ABTI_xstream *p_xstream = (ABTI_xstream *)p_arg;
1380 ABTI_ASSERT(ABTD_atomic_relaxed_load_int(&p_xstream->state) ==
1386 ABTI_sched *p_sched = p_xstream->p_main_sched;
1390 ABTI_LOG_SET_SCHED(p_sched);
1392 LOG_EVENT(
"[S%" PRIu64
"] start\n", p_sched->id);
1393 p_sched->run(ABTI_sched_get_handle(p_sched));
1394 LOG_EVENT(
"[S%" PRIu64
"] end\n", p_sched->id);
1397 ABTI_spinlock_release(&p_xstream->sched_lock);
1399 request = ABTD_atomic_acquire_load_uint32(&p_xstream->request);
1403 if ((request & ABTI_XSTREAM_REQ_EXIT) ||
1404 (request & ABTI_XSTREAM_REQ_CANCEL))
1409 if (request & ABTI_XSTREAM_REQ_JOIN) {
1410 if (ABTI_sched_get_effective_size(p_local,
1411 p_xstream->p_main_sched) == 0) {
1414 if (p_xstream->p_req_arg) {
1415 ABTI_thread_set_ready(p_local,
1416 (ABTI_thread *)p_xstream->p_req_arg);
1417 p_xstream->p_req_arg = NULL;
1425 ABTD_atomic_release_store_int(&p_xstream->state,
1427 LOG_EVENT(
"[E%d] terminated\n", p_xstream->rank);
1430 int ABTI_xstream_schedule_thread(ABTI_local **pp_local, ABTI_xstream *p_xstream,
1431 ABTI_thread *p_thread)
1434 ABTI_local *p_local = *pp_local;
1436 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL 1437 if (ABTD_atomic_acquire_load_uint32(&p_thread->request) &
1438 ABTI_THREAD_REQ_CANCEL) {
1439 LOG_EVENT(
"[U%" PRIu64
":E%d] canceled\n", ABTI_thread_get_id(p_thread),
1441 ABTD_thread_cancel(p_local, p_thread);
1442 ABTI_xstream_terminate_thread(p_local, p_thread);
1447 #ifndef ABT_CONFIG_DISABLE_MIGRATION 1448 if (ABTD_atomic_acquire_load_uint32(&p_thread->request) &
1449 ABTI_THREAD_REQ_MIGRATE) {
1450 abt_errno = ABTI_xstream_migrate_thread(p_local, p_thread);
1451 ABTI_CHECK_ERROR(abt_errno);
1456 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED 1458 if (p_thread->is_sched != NULL) {
1459 p_thread->is_sched->p_ctx = &p_thread->ctx;
1460 ABTI_xstream_push_sched(p_xstream, p_thread->is_sched);
1466 p_thread->p_last_xstream = p_xstream;
1472 LOG_EVENT(
"[U%" PRIu64
":E%d] start running\n",
1473 ABTI_thread_get_id(p_thread), p_xstream->rank);
1474 ABTI_sched *p_sched = ABTI_xstream_get_top_sched(p_xstream);
1475 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED 1476 if (p_thread->is_sched != NULL) {
1477 ABTI_thread_context_switch_sched_to_sched(pp_local, p_sched,
1478 p_thread->is_sched);
1480 p_local = *pp_local;
1485 ABTI_thread_context_switch_sched_to_thread(pp_local, p_sched, p_thread);
1487 p_local = *pp_local;
1490 p_thread = p_local->p_thread;
1491 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED 1495 p_xstream = p_thread->p_last_xstream;
1496 LOG_EVENT(
"[U%" PRIu64
":E%d] stopped\n", ABTI_thread_get_id(p_thread),
1499 #ifndef ABT_CONFIG_DISABLE_STACKABLE_SCHED 1501 if (p_thread->is_sched != NULL) {
1502 ABTI_xstream_pop_sched(p_xstream);
1506 ABTI_spinlock_release(&p_xstream->sched_lock);
1514 uint32_t request = ABTD_atomic_acquire_load_uint32(&p_thread->request);
1515 if (request & ABTI_THREAD_REQ_STOP) {
1517 LOG_EVENT(
"[U%" PRIu64
":E%d] %s\n", ABTI_thread_get_id(p_thread),
1519 (request & ABTI_THREAD_REQ_TERMINATE
1521 : ((request & ABTI_THREAD_REQ_EXIT) ?
"exit called" 1523 ABTI_xstream_terminate_thread(p_local, p_thread);
1524 #ifndef ABT_CONFIG_DISABLE_THREAD_CANCEL 1525 }
else if (request & ABTI_THREAD_REQ_CANCEL) {
1526 LOG_EVENT(
"[U%" PRIu64
":E%d] canceled\n", ABTI_thread_get_id(p_thread),
1528 ABTD_thread_cancel(p_local, p_thread);
1529 ABTI_xstream_terminate_thread(p_local, p_thread);
1531 }
else if (!(request & ABTI_THREAD_REQ_NON_YIELD)) {
1535 ABTI_POOL_ADD_THREAD(p_thread, ABTI_self_get_native_thread_id(p_local));
1536 }
else if (request & ABTI_THREAD_REQ_BLOCK) {
1537 LOG_EVENT(
"[U%" PRIu64
":E%d] check blocked\n",
1538 ABTI_thread_get_id(p_thread), p_xstream->rank);
1539 ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_BLOCK);
1540 #ifndef ABT_CONFIG_DISABLE_MIGRATION 1541 }
else if (request & ABTI_THREAD_REQ_MIGRATE) {
1543 abt_errno = ABTI_xstream_migrate_thread(p_local, p_thread);
1544 ABTI_CHECK_ERROR(abt_errno);
1546 }
else if (request & ABTI_THREAD_REQ_ORPHAN) {
1549 LOG_EVENT(
"[U%" PRIu64
":E%d] orphaned\n", ABTI_thread_get_id(p_thread),
1551 ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_ORPHAN);
1552 p_thread->p_pool->u_free(&p_thread->unit);
1553 p_thread->p_pool = NULL;
1554 }
else if (request & ABTI_THREAD_REQ_NOPUSH) {
1556 LOG_EVENT(
"[U%" PRIu64
":E%d] not pushed\n",
1557 ABTI_thread_get_id(p_thread), p_xstream->rank);
1558 ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_NOPUSH);
1572 void ABTI_xstream_schedule_task(ABTI_local *p_local, ABTI_xstream *p_xstream,
1575 #ifndef ABT_CONFIG_DISABLE_TASK_CANCEL 1576 if (ABTD_atomic_acquire_load_uint32(&p_task->request) &
1577 ABTI_TASK_REQ_CANCEL) {
1578 ABTI_xstream_terminate_task(p_local, p_task);
1584 p_local->p_task = p_task;
1585 p_local->p_thread = NULL;
1591 p_task->p_xstream = p_xstream;
1593 #ifdef ABT_CONFIG_DISABLE_STACKABLE_SCHED 1595 LOG_EVENT(
"[T%" PRIu64
":E%d] running\n", ABTI_task_get_id(p_task),
1597 ABTI_LOG_SET_SCHED(NULL);
1598 p_task->f_task(p_task->p_arg);
1601 if (p_task->is_sched != NULL) {
1602 ABTI_sched *current_sched = ABTI_xstream_get_top_sched(p_xstream);
1603 ABTI_thread *p_last_thread = current_sched->p_thread;
1605 p_task->is_sched->p_ctx = current_sched->p_ctx;
1606 ABTI_xstream_push_sched(p_xstream, p_task->is_sched);
1608 p_task->is_sched->p_thread = p_last_thread;
1609 LOG_EVENT(
"[S%" PRIu64
":E%d] stacked sched start\n",
1610 p_task->is_sched->id, p_xstream->rank);
1614 LOG_EVENT(
"[T%" PRIu64
":E%d] running\n", ABTI_task_get_id(p_task),
1616 ABTI_LOG_SET_SCHED(p_task->is_sched ? p_task->is_sched : NULL);
1618 p_task->f_task(p_task->p_arg);
1621 if (p_task->is_sched != NULL) {
1622 ABTI_xstream_pop_sched(p_xstream);
1625 ABTI_spinlock_release(&p_xstream->sched_lock);
1626 ABTI_LOG_SET_SCHED(ABTI_xstream_get_top_sched(p_xstream));
1627 LOG_EVENT(
"[S%" PRIu64
":E%d] stacked sched end\n",
1628 p_task->is_sched->id, p_xstream->rank);
1632 ABTI_LOG_SET_SCHED(ABTI_xstream_get_top_sched(p_xstream));
1633 LOG_EVENT(
"[T%" PRIu64
":E%d] stopped\n", ABTI_task_get_id(p_task),
1637 ABTI_xstream_terminate_task(p_local, p_task);
1640 int ABTI_xstream_migrate_thread(ABTI_local *p_local, ABTI_thread *p_thread)
1642 #ifdef ABT_CONFIG_DISABLE_MIGRATION 1649 if (p_thread->attr.f_cb) {
1650 ABT_thread thread = ABTI_thread_get_handle(p_thread);
1651 p_thread->attr.f_cb(thread, p_thread->attr.p_cb_arg);
1654 ABTI_spinlock_acquire(&p_thread->lock);
1658 (ABTI_pool *)ABTI_thread_extract_req_arg(p_thread,
1659 ABTI_THREAD_REQ_MIGRATE);
1660 ABTI_thread_unset_request(p_thread, ABTI_THREAD_REQ_MIGRATE);
1662 LOG_EVENT(
"[U%" PRIu64
"] migration: E%d -> NT %p\n",
1663 ABTI_thread_get_id(p_thread), p_thread->p_last_xstream->rank,
1664 (
void *)p_pool->consumer_id);
1667 p_thread->p_pool = p_pool;
1670 ABTI_POOL_PUSH(p_pool, p_thread->unit,
1671 ABTI_self_get_native_thread_id(p_local));
1673 ABTI_spinlock_release(&p_thread->lock);
1675 ABTI_pool_dec_num_migrations(p_pool);
1678 ABTI_CHECK_ERROR(abt_errno);
1689 int ABTI_xstream_set_main_sched(ABTI_local **pp_local, ABTI_xstream *p_xstream,
1690 ABTI_sched *p_sched)
1693 ABTI_thread *p_thread = NULL;
1694 ABTI_sched *p_main_sched;
1695 ABTI_pool *p_tar_pool = NULL;
1698 #ifndef ABT_CONFIG_DISABLE_POOL_CONSUMER_CHECK 1701 ABTI_native_thread_id consumer_id =
1702 ABTI_xstream_get_native_thread_id(p_xstream);
1703 for (p = 0; p < p_sched->num_pools; p++) {
1704 ABTI_pool *p_pool = ABTI_pool_get_ptr(p_sched->pools[p]);
1705 abt_errno = ABTI_pool_set_consumer(p_pool, consumer_id);
1706 ABTI_CHECK_ERROR(abt_errno);
1714 p_sched->used = ABTI_SCHED_MAIN;
1716 p_main_sched = p_xstream->p_main_sched;
1717 if (p_main_sched == NULL) {
1719 p_xstream->p_main_sched = p_sched;
1725 p_thread = (*pp_local)->p_thread;
1726 ABTI_ASSERT(p_thread != NULL);
1728 p_tar_pool = ABTI_pool_get_ptr(p_sched->pools[0]);
1732 for (p = 0; p < p_main_sched->num_pools; p++) {
1733 if (p_thread->p_pool == ABTI_pool_get_ptr(p_main_sched->pools[p])) {
1735 p_thread->p_pool->u_free(&p_thread->unit);
1736 ABT_thread h_thread = ABTI_thread_get_handle(p_thread);
1737 p_thread->unit = p_tar_pool->u_create_from_thread(h_thread);
1738 p_thread->p_pool = p_tar_pool;
1743 if (p_xstream->type == ABTI_XSTREAM_TYPE_PRIMARY) {
1744 ABTI_CHECK_TRUE(p_thread->type == ABTI_THREAD_TYPE_MAIN,
1748 abt_errno = ABTI_sched_discard_and_free(*pp_local, p_main_sched);
1749 ABTI_CHECK_ERROR(abt_errno);
1756 ABTI_POOL_PUSH(p_tar_pool, p_thread->unit,
1757 ABTI_self_get_native_thread_id(*pp_local));
1760 ABTI_xstream_pop_sched(p_xstream);
1763 p_xstream->p_main_sched = p_sched;
1767 abt_errno = ABTI_xstream_start_primary(pp_local, p_xstream, p_thread);
1768 ABTI_CHECK_ERROR_MSG(abt_errno,
"ABTI_xstream_start");
1771 ABTI_sched_set_request(p_main_sched, ABTI_SCHED_REQ_FINISH);
1775 p_sched->p_thread = p_main_sched->p_thread;
1776 p_sched->p_ctx = p_main_sched->p_ctx;
1777 p_main_sched->p_thread = NULL;
1783 ABTI_POOL_PUSH(p_tar_pool, p_thread->unit,
1784 ABTI_self_get_native_thread_id(*pp_local));
1787 p_xstream->p_main_sched = p_sched;
1790 ABTI_xstream_replace_top_sched(p_xstream, p_sched);
1793 ABTI_thread_set_request(p_thread, ABTI_THREAD_REQ_NOPUSH);
1794 ABTI_thread_context_switch_thread_to_sched(pp_local, p_thread,
1798 abt_errno = ABTI_sched_discard_and_free(*pp_local, p_main_sched);
1799 ABTI_CHECK_ERROR(abt_errno);
1810 void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os,
int indent,
1815 if (p_xstream == NULL) {
1816 fprintf(p_os,
"%s== NULL ES ==\n", prefix);
1825 switch (p_xstream->type) {
1826 case ABTI_XSTREAM_TYPE_PRIMARY:
1829 case ABTI_XSTREAM_TYPE_SECONDARY:
1836 switch (ABTD_atomic_acquire_load_int(&p_xstream->state)) {
1841 state =
"TERMINATED";
1848 size =
sizeof(char) * (p_xstream->num_scheds * 20 + 4);
1850 scheds_str[0] =
'[';
1851 scheds_str[1] =
' ';
1853 for (i = 0; i < p_xstream->num_scheds; i++) {
1854 sprintf(&scheds_str[pos],
"%p ", (
void *)p_xstream->scheds[i]);
1855 pos = strlen(scheds_str);
1857 scheds_str[pos] =
']';
1864 "%srequest : 0x%x\n" 1865 "%smax_scheds: %d\n" 1866 "%snum_scheds: %d\n" 1868 "%smain_sched: %p\n",
1869 prefix, (
void *)p_xstream, prefix, p_xstream->rank, prefix, type,
1870 prefix, state, prefix,
1871 ABTD_atomic_acquire_load_uint32(&p_xstream->request), prefix,
1872 p_xstream->max_scheds, prefix, p_xstream->num_scheds, prefix,
1873 scheds_str, prefix, (
void *)p_xstream->p_main_sched);
1877 ABTI_sched_print(p_xstream->p_main_sched, p_os, indent + ABTI_INDENT,
1886 void *ABTI_xstream_launch_main_sched(
void *p_arg)
1889 ABTI_xstream *p_xstream = (ABTI_xstream *)p_arg;
1892 ABTI_local *p_local = NULL;
1893 abt_errno = ABTI_local_init(&p_local);
1894 ABTI_local_set_local(p_local);
1895 ABTI_CHECK_ERROR(abt_errno);
1896 p_local->p_xstream = p_xstream;
1899 ABTI_sched *p_sched = p_xstream->p_main_sched;
1900 if (!p_sched->p_thread) {
1901 abt_errno = ABTI_thread_create_main_sched(p_local, p_xstream, p_sched);
1902 ABTI_CHECK_ERROR(abt_errno);
1903 p_sched->p_thread->p_last_xstream = p_xstream;
1907 p_local->p_thread = p_sched->p_thread;
1910 LOG_EVENT(
"[E%d] start\n", p_xstream->rank);
1911 ABTI_xstream_schedule(p_arg);
1912 LOG_EVENT(
"[E%d] end\n", p_xstream->rank);
1915 ABTI_local_finalize(&p_local);
1930 static void ABTI_xstream_set_new_rank(ABTI_xstream *p_xstream)
1937 ABTI_global_update_max_xstreams(0);
1955 p_xstream->rank = rank;
1958 static ABT_bool ABTI_xstream_take_rank(ABTI_xstream *p_xstream,
int rank)
1963 ABTI_global_update_max_xstreams(rank + 1);
1980 p_xstream->rank = rank;
1986 static void ABTI_xstream_return_rank(ABTI_xstream *p_xstream)
#define HANDLE_ERROR(msg)
struct ABT_unit_opaque * ABT_unit
int ABT_xstream_self(ABT_xstream *xstream)
Return the ES handle associated with the caller work unit.
struct ABT_xstream_opaque * ABT_xstream
struct ABT_sched_opaque * ABT_sched
char * ABTU_get_indent_str(int indent)
int ABT_xstream_get_num(int *num_xstreams)
Return the number of currently existing ESs.
#define ABT_ERR_INV_THREAD
int ABT_xstream_create(ABT_sched sched, ABT_xstream *newxstream)
Create a new ES and return its handle through newxstream.
struct ABT_task_opaque * ABT_task
int ABT_xstream_self_rank(int *rank)
Return the rank of the ES associated with the caller work unit.
static void * ABTU_malloc(size_t size)
int ABT_xstream_get_rank(ABT_xstream xstream, int *rank)
Return the rank of the ES.
int ABT_xstream_cancel(ABT_xstream xstream)
Request the cancellation of the target ES.
int ABT_xstream_get_main_sched(ABT_xstream xstream, ABT_sched *sched)
Get the main scheduler of the target ES.
int ABT_xstream_is_primary(ABT_xstream xstream, ABT_bool *flag)
Check if the target ES is the primary ES.
struct ABT_pool_opaque * ABT_pool
int ABT_xstream_create_with_rank(ABT_sched sched, int rank, ABT_xstream *newxstream)
Create a new ES with a specific rank.
int ABT_xstream_set_main_sched_basic(ABT_xstream xstream, ABT_sched_predef predef, int num_pools, ABT_pool *pools)
Set the main scheduler for xstream with a predefined scheduler.
struct ABT_thread_opaque * ABT_thread
int ABT_xstream_get_main_pools(ABT_xstream xstream, int max_pools, ABT_pool *pools)
Get the pools of the main scheduler of the target ES.
#define HANDLE_ERROR_FUNC_WITH_CODE(n)
int ABT_xstream_create_basic(ABT_sched_predef predef, int num_pools, ABT_pool *pools, ABT_sched_config config, ABT_xstream *newxstream)
Create a new ES with a predefined scheduler and return its handle through newxstream.
int ABT_xstream_join(ABT_xstream xstream)
Wait for xstream to terminate.
int ABT_xstream_check_events(ABT_sched sched)
Check the events and process them.
int ABT_xstream_set_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset)
Set the CPU affinity of the target ES.
int ABT_xstream_set_cpubind(ABT_xstream xstream, int cpuid)
Bind the target ES to a target CPU.
ABTI_global * gp_ABTI_global
int ABT_xstream_get_cpubind(ABT_xstream xstream, int *cpuid)
Get the CPU binding for the target ES.
int ABT_xstream_get_affinity(ABT_xstream xstream, int cpuset_size, int *cpuset, int *num_cpus)
Get the CPU affinity for the target ES.
#define LOG_EVENT(fmt,...)
int ABT_xstream_exit(void)
Terminate the ES associated with the calling ULT.
#define ABT_ERR_UNINITIALIZED
int ABT_xstream_free(ABT_xstream *xstream)
Release the ES object associated with the ES handle.
#define ABT_ERR_XSTREAM_STATE
#define ABT_ERR_MIGRATION_NA
int ABT_xstream_get_state(ABT_xstream xstream, ABT_xstream_state *state)
Return the state of xstream.
#define ABT_ERR_INV_XSTREAM_RANK
struct ABT_sched_config_opaque * ABT_sched_config
int ABT_xstream_revive(ABT_xstream xstream)
Restart an ES that has been joined by ABT_xstream_join().
int ABT_xstream_set_main_sched(ABT_xstream xstream, ABT_sched sched)
Set the main scheduler of the target ES.
#define ABT_ERR_INV_SCHED
int ABT_xstream_set_rank(ABT_xstream xstream, const int rank)
Set the rank for the target ES.
#define ABT_SCHED_CONFIG_NULL
#define ABT_ERR_INV_XSTREAM
static void ABTU_free(void *ptr)
int ABT_xstream_run_unit(ABT_unit unit, ABT_pool pool)
Execute a unit on the local ES.
int ABT_xstream_equal(ABT_xstream xstream1, ABT_xstream xstream2, ABT_bool *result)
Compare two ES handles for equality.
static void * ABTU_calloc(size_t num, size_t size)