ARGOBOTS  dce6e727ffc4ca5b3ffc04cb9517c6689be51ec5
fifo.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 #include "thread_queue.h"
8 #include <time.h>
9 
10 /* FIFO pool implementation */
11 
12 static int pool_init(ABT_pool pool, ABT_pool_config config);
13 static void pool_free(ABT_pool pool);
14 static ABT_bool pool_is_empty(ABT_pool pool);
15 static size_t pool_get_size(ABT_pool pool);
16 static void pool_push_shared(ABT_pool pool, ABT_unit unit,
17  ABT_pool_context context);
18 static void pool_push_private(ABT_pool pool, ABT_unit unit,
19  ABT_pool_context context);
22 static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs,
23  ABT_pool_context context);
24 static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units,
25  size_t num_units, ABT_pool_context context);
26 static void pool_push_many_private(ABT_pool pool, const ABT_unit *units,
27  size_t num_units, ABT_pool_context context);
28 static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads,
29  size_t max_threads, size_t *num_popped,
30  ABT_pool_context context);
31 static void pool_pop_many_private(ABT_pool pool, ABT_thread *threads,
32  size_t max_threads, size_t *num_popped,
33  ABT_pool_context context);
34 static void pool_print_all(ABT_pool pool, void *arg,
35  void (*print_fn)(void *, ABT_thread));
36 static ABT_unit pool_create_unit(ABT_pool pool, ABT_thread thread);
37 static void pool_free_unit(ABT_pool pool, ABT_unit unit);
38 
39 /* For backward compatibility */
40 static int pool_remove_shared(ABT_pool pool, ABT_unit unit);
41 static int pool_remove_private(ABT_pool pool, ABT_unit unit);
42 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs);
44 
45 struct data {
46  ABTD_spinlock mutex;
47  thread_queue_t queue;
48 };
49 typedef struct data data_t;
50 
51 static inline data_t *pool_get_data_ptr(void *p_data)
52 {
53  return (data_t *)p_data;
54 }
55 
56 /* Obtain the FIFO pool definition according to the access type */
57 ABTU_ret_err int
58 ABTI_pool_get_fifo_def(ABT_pool_access access,
59  ABTI_pool_required_def *p_required_def,
60  ABTI_pool_optional_def *p_optional_def,
61  ABTI_pool_deprecated_def *p_deprecated_def)
62 {
63  /* Definitions according to the access type */
64  /* FIXME: need better implementation, e.g., lock-free one */
65  switch (access) {
67  p_required_def->p_push = pool_push_private;
68  p_required_def->p_pop = pool_pop_private;
69  p_optional_def->p_push_many = pool_push_many_private;
70  p_optional_def->p_pop_many = pool_pop_many_private;
71  p_deprecated_def->p_remove = pool_remove_private;
72  break;
73 
78  p_required_def->p_push = pool_push_shared;
79  p_required_def->p_pop = pool_pop_shared;
80  p_optional_def->p_push_many = pool_push_many_shared;
81  p_optional_def->p_pop_many = pool_pop_many_shared;
82  p_deprecated_def->p_remove = pool_remove_shared;
83  break;
84 
85  default:
86  ABTI_HANDLE_ERROR(ABT_ERR_INV_POOL_ACCESS);
87  }
88 
89  /* Common definitions regardless of the access type */
90  p_optional_def->p_init = pool_init;
91  p_optional_def->p_free = pool_free;
92  p_required_def->p_is_empty = pool_is_empty;
93  p_optional_def->p_get_size = pool_get_size;
94  p_optional_def->p_pop_wait = pool_pop_wait;
95  p_optional_def->p_print_all = pool_print_all;
96  p_required_def->p_create_unit = pool_create_unit;
97  p_required_def->p_free_unit = pool_free_unit;
98 
99  p_deprecated_def->p_pop_timedwait = pool_pop_timedwait;
100  p_deprecated_def->u_is_in_pool = pool_unit_is_in_pool;
101  return ABT_SUCCESS;
102 }
103 
104 /* Pool functions */
105 
106 static int pool_init(ABT_pool pool, ABT_pool_config config)
107 {
108  ABTI_UNUSED(config);
109  int abt_errno = ABT_SUCCESS;
110  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
111  ABT_pool_access access;
112 
113  data_t *p_data;
114  abt_errno = ABTU_malloc(sizeof(data_t), (void **)&p_data);
115  ABTI_CHECK_ERROR(abt_errno);
116 
117  access = p_pool->access;
118  if (access != ABT_POOL_ACCESS_PRIV) {
119  /* Initialize the mutex */
120  ABTD_spinlock_clear(&p_data->mutex);
121  }
122  thread_queue_init(&p_data->queue);
123 
124  p_pool->data = p_data;
125  return abt_errno;
126 }
127 
128 static void pool_free(ABT_pool pool)
129 {
130  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
131  data_t *p_data = pool_get_data_ptr(p_pool->data);
132  thread_queue_free(&p_data->queue);
133  ABTU_free(p_data);
134 }
135 
137 {
138  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
139  data_t *p_data = pool_get_data_ptr(p_pool->data);
140  return thread_queue_is_empty(&p_data->queue);
141 }
142 
143 static size_t pool_get_size(ABT_pool pool)
144 {
145  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
146  data_t *p_data = pool_get_data_ptr(p_pool->data);
147  return thread_queue_get_size(&p_data->queue);
148 }
149 
150 static void pool_push_shared(ABT_pool pool, ABT_unit unit,
151  ABT_pool_context context)
152 {
153  (void)context;
154  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
155  data_t *p_data = pool_get_data_ptr(p_pool->data);
156  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
157  ABTD_spinlock_acquire(&p_data->mutex);
158  thread_queue_push_tail(&p_data->queue, p_thread);
159  ABTD_spinlock_release(&p_data->mutex);
160 }
161 
162 static void pool_push_private(ABT_pool pool, ABT_unit unit,
163  ABT_pool_context context)
164 {
165  (void)context;
166  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
167  data_t *p_data = pool_get_data_ptr(p_pool->data);
168  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
169  thread_queue_push_tail(&p_data->queue, p_thread);
170 }
171 
172 static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units,
173  size_t num_units, ABT_pool_context context)
174 {
175  (void)context;
176  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
177  data_t *p_data = pool_get_data_ptr(p_pool->data);
178  if (num_units > 0) {
179  ABTD_spinlock_acquire(&p_data->mutex);
180  size_t i;
181  for (i = 0; i < num_units; i++) {
182  ABTI_thread *p_thread =
183  ABTI_unit_get_thread_from_builtin_unit(units[i]);
184  thread_queue_push_tail(&p_data->queue, p_thread);
185  }
186  ABTD_spinlock_release(&p_data->mutex);
187  }
188 }
189 
190 static void pool_push_many_private(ABT_pool pool, const ABT_unit *units,
191  size_t num_units, ABT_pool_context context)
192 {
193  (void)context;
194  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
195  data_t *p_data = pool_get_data_ptr(p_pool->data);
196  size_t i;
197  for (i = 0; i < num_units; i++) {
198  ABTI_thread *p_thread =
199  ABTI_unit_get_thread_from_builtin_unit(units[i]);
200  thread_queue_push_tail(&p_data->queue, p_thread);
201  }
202 }
203 
204 static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs,
205  ABT_pool_context context)
206 {
207  (void)context;
208  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
209  data_t *p_data = pool_get_data_ptr(p_pool->data);
210  double time_start = 0.0;
211  while (1) {
213  &p_data->mutex) == 0) {
214  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
215  ABTD_spinlock_release(&p_data->mutex);
216  if (p_thread)
217  return ABTI_thread_get_handle(p_thread);
218  }
219  if (time_start == 0.0) {
220  time_start = ABTI_get_wtime();
221  } else {
222  double elapsed = ABTI_get_wtime() - time_start;
223  if (elapsed > time_secs)
224  return ABT_THREAD_NULL;
225  }
226  /* Sleep. */
227  const int sleep_nsecs = 100;
228  struct timespec ts = { 0, sleep_nsecs };
229  nanosleep(&ts, NULL);
230  }
231 }
232 
233 static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
234 {
235  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
236  data_t *p_data = pool_get_data_ptr(p_pool->data);
237  while (1) {
239  &p_data->mutex) == 0) {
240  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
241  ABTD_spinlock_release(&p_data->mutex);
242  if (p_thread) {
243  return ABTI_unit_get_builtin_unit(p_thread);
244  }
245  }
246  const int sleep_nsecs = 100;
247  struct timespec ts = { 0, sleep_nsecs };
248  nanosleep(&ts, NULL);
249 
250  if (ABTI_get_wtime() > abstime_secs)
251  return ABT_UNIT_NULL;
252  }
253 }
254 
256 {
257  (void)context;
258  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
259  data_t *p_data = pool_get_data_ptr(p_pool->data);
261  &p_data->mutex) == 0) {
262  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
263  ABTD_spinlock_release(&p_data->mutex);
264  return ABTI_thread_get_handle(p_thread);
265  } else {
266  return ABT_THREAD_NULL;
267  }
268 }
269 
271 {
272  (void)context;
273  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
274  data_t *p_data = pool_get_data_ptr(p_pool->data);
275  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
276  return ABTI_thread_get_handle(p_thread);
277 }
278 
279 static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads,
280  size_t max_threads, size_t *num_popped,
281  ABT_pool_context context)
282 {
283  (void)context;
284  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
285  data_t *p_data = pool_get_data_ptr(p_pool->data);
286  if (max_threads != 0 &&
288  &p_data->mutex) == 0) {
289  size_t i;
290  for (i = 0; i < max_threads; i++) {
291  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
292  if (!p_thread)
293  break;
294  threads[i] = ABTI_thread_get_handle(p_thread);
295  }
296  *num_popped = i;
297  ABTD_spinlock_release(&p_data->mutex);
298  } else {
299  *num_popped = 0;
300  }
301 }
302 
303 static void pool_pop_many_private(ABT_pool pool, ABT_thread *threads,
304  size_t max_threads, size_t *num_popped,
305  ABT_pool_context context)
306 {
307  (void)context;
308  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
309  data_t *p_data = pool_get_data_ptr(p_pool->data);
310  size_t i;
311  for (i = 0; i < max_threads; i++) {
312  ABTI_thread *p_thread = thread_queue_pop_head(&p_data->queue);
313  if (!p_thread)
314  break;
315  threads[i] = ABTI_thread_get_handle(p_thread);
316  }
317  *num_popped = i;
318 }
319 
320 static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
321 {
322  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
323  data_t *p_data = pool_get_data_ptr(p_pool->data);
324  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
325  ABTD_spinlock_acquire(&p_data->mutex);
326  int abt_errno = thread_queue_remove(&p_data->queue, p_thread);
327  ABTD_spinlock_release(&p_data->mutex);
328  return abt_errno;
329 }
330 
331 static int pool_remove_private(ABT_pool pool, ABT_unit unit)
332 {
333  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
334  data_t *p_data = pool_get_data_ptr(p_pool->data);
335  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
336  return thread_queue_remove(&p_data->queue, p_thread);
337 }
338 
339 static void pool_print_all(ABT_pool pool, void *arg,
340  void (*print_fn)(void *, ABT_thread))
341 {
342  ABT_pool_access access;
343  ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
344  data_t *p_data = pool_get_data_ptr(p_pool->data);
345 
346  access = p_pool->access;
347  if (access != ABT_POOL_ACCESS_PRIV) {
348  ABTD_spinlock_acquire(&p_data->mutex);
349  }
350  thread_queue_print_all(&p_data->queue, arg, print_fn);
351  if (access != ABT_POOL_ACCESS_PRIV) {
352  ABTD_spinlock_release(&p_data->mutex);
353  }
354 }
355 
356 /* Unit functions */
357 
359 {
360  ABTI_thread *p_thread = ABTI_unit_get_thread_from_builtin_unit(unit);
361  return ABTD_atomic_acquire_load_int(&p_thread->is_in_pool) ? ABT_TRUE
362  : ABT_FALSE;
363 }
364 
366 {
367  /* Call ABTI_unit_init_builtin() instead. */
368  ABTI_ASSERT(0);
369  return ABT_UNIT_NULL;
370 }
371 
372 static void pool_free_unit(ABT_pool pool, ABT_unit unit)
373 {
374  /* A built-in unit does not need to be freed. This function may not be
375  * called. */
376  ABTI_ASSERT(0);
377 }
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1043
thread_queue_free
static void thread_queue_free(thread_queue_t *p_queue)
Definition: thread_queue.h:29
ABT_pool_context
uint64_t ABT_pool_context
A pool context value.
Definition: abt.h:1566
ABT_thread
struct ABT_thread_opaque * ABT_thread
Work unit handle type.
Definition: abt.h:932
pool_pop_wait
static ABT_thread pool_pop_wait(ABT_pool pool, double time_secs, ABT_pool_context context)
Definition: fifo.c:204
pool_unit_is_in_pool
static ABT_bool pool_unit_is_in_pool(ABT_unit unit)
Definition: fifo.c:358
thread_queue_acquire_spinlock_if_not_empty
static ABTU_ret_err int thread_queue_acquire_spinlock_if_not_empty(thread_queue_t *p_queue, ABTD_spinlock *p_lock)
Definition: thread_queue.h:35
ABT_POOL_ACCESS_MPMC
@ ABT_POOL_ACCESS_MPMC
Definition: abt.h:575
ABT_THREAD_NULL
#define ABT_THREAD_NULL
Definition: abt.h:1105
ABT_pool
struct ABT_pool_opaque * ABT_pool
Pool handle type.
Definition: abt.h:878
pool_pop_many_private
static void pool_pop_many_private(ABT_pool pool, ABT_thread *threads, size_t max_threads, size_t *num_popped, ABT_pool_context context)
Definition: fifo.c:303
ABT_POOL_ACCESS_MPSC
@ ABT_POOL_ACCESS_MPSC
Definition: abt.h:569
pool_remove_private
static int pool_remove_private(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:331
ABT_POOL_ACCESS_PRIV
@ ABT_POOL_ACCESS_PRIV
Definition: abt.h:560
pool_get_size
static size_t pool_get_size(ABT_pool pool)
Definition: fifo.c:143
pool_pop_timedwait
static ABT_unit pool_pop_timedwait(ABT_pool pool, double abstime_secs)
Definition: fifo.c:233
abti.h
pool_is_empty
static ABT_bool pool_is_empty(ABT_pool pool)
Definition: fifo.c:136
pool_free
static void pool_free(ABT_pool pool)
Definition: fifo.c:128
ABT_pool_config
struct ABT_pool_config_opaque * ABT_pool_config
Pool configuration handle type.
Definition: abt.h:885
ABTU_malloc
static ABTU_ret_err int ABTU_malloc(size_t size, void **p_ptr)
Definition: abtu.h:235
ABT_ERR_INV_POOL_ACCESS
#define ABT_ERR_INV_POOL_ACCESS
Error code: invalid pool access type.
Definition: abt.h:166
pool_remove_shared
static int pool_remove_shared(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:320
ABT_unit
struct ABT_unit_opaque * ABT_unit
Work unit handle type for scheduling.
Definition: abt.h:911
pool_push_many_private
static void pool_push_many_private(ABT_pool pool, const ABT_unit *units, size_t num_units, ABT_pool_context context)
Definition: fifo.c:190
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
thread_queue_is_empty
static ABT_bool thread_queue_is_empty(const thread_queue_t *p_queue)
Definition: thread_queue.h:58
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
pool_free_unit
static void pool_free_unit(ABT_pool pool, ABT_unit unit)
Definition: fifo.c:372
pool_push_shared
static void pool_push_shared(ABT_pool pool, ABT_unit unit, ABT_pool_context context)
Definition: fifo.c:150
thread_queue_remove
static ABTU_ret_err int thread_queue_remove(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:165
pool_pop_shared
static ABT_thread pool_pop_shared(ABT_pool pool, ABT_pool_context context)
Definition: fifo.c:255
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:784
ABT_POOL_ACCESS_SPMC
@ ABT_POOL_ACCESS_SPMC
Definition: abt.h:573
pool_get_data_ptr
static data_t * pool_get_data_ptr(void *p_data)
Definition: fifo.c:51
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:786
thread_queue_t
Definition: thread_queue.h:12
pool_push_many_shared
static void pool_push_many_shared(ABT_pool pool, const ABT_unit *units, size_t num_units, ABT_pool_context context)
Definition: fifo.c:172
ABT_POOL_ACCESS_SPSC
@ ABT_POOL_ACCESS_SPSC
Definition: abt.h:565
ABTU_free
static void ABTU_free(void *ptr)
Definition: abtu.h:228
data_t
struct data data_t
Definition: fifo.c:49
thread_queue_push_tail
static void thread_queue_push_tail(thread_queue_t *p_queue, ABTI_thread *p_thread)
Definition: thread_queue.h:92
pool_print_all
static void pool_print_all(ABT_pool pool, void *arg, void(*print_fn)(void *, ABT_thread))
Definition: fifo.c:339
pool_init
static int pool_init(ABT_pool pool, ABT_pool_config config)
Definition: fifo.c:106
thread_queue.h
thread_queue_pop_head
static ABTI_thread * thread_queue_pop_head(thread_queue_t *p_queue)
Definition: thread_queue.h:115
pool_pop_many_shared
static void pool_pop_many_shared(ABT_pool pool, ABT_thread *threads, size_t max_threads, size_t *num_popped, ABT_pool_context context)
Definition: fifo.c:279
ABT_UNIT_NULL
#define ABT_UNIT_NULL
Definition: abt.h:1104
thread_queue_print_all
static void thread_queue_print_all(const thread_queue_t *p_queue, void *arg, void(*print_fn)(void *, ABT_thread))
Definition: thread_queue.h:193
pool_pop_private
static ABT_thread pool_pop_private(ABT_pool pool, ABT_pool_context context)
Definition: fifo.c:270
thread_queue_init
static void thread_queue_init(thread_queue_t *p_queue)
Definition: thread_queue.h:21
pool_push_private
static void pool_push_private(ABT_pool pool, ABT_unit unit, ABT_pool_context context)
Definition: fifo.c:162
pool_create_unit
static ABT_unit pool_create_unit(ABT_pool pool, ABT_thread thread)
Definition: fifo.c:365
thread_queue_get_size
static size_t thread_queue_get_size(const thread_queue_t *p_queue)
Definition: thread_queue.h:64
ABT_pool_access
ABT_pool_access
Pool access type.
Definition: abt.h:556