ARGOBOTS  dce6e727ffc4ca5b3ffc04cb9517c6689be51ec5
abtd_stream.c
Go to the documentation of this file.
1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  * See COPYRIGHT in top-level directory.
4  */
5 
6 #include "abti.h"
7 
8 static void *xstream_context_thread_func(void *arg)
9 {
10  ABTD_xstream_context *p_ctx = (ABTD_xstream_context *)arg;
11  void *(*thread_f)(void *) = p_ctx->thread_f;
12  void *p_arg = p_ctx->p_arg;
13  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_RUNNING);
14  while (1) {
15  /* Execute a main execution stream function. */
16  thread_f(p_arg);
17  /* This thread has finished. */
18  ABT_bool restart;
19  pthread_mutex_lock(&p_ctx->state_lock);
20  /* If another execution stream is waiting for this thread completion,
21  * let's wake it up. */
22  if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN) {
23  pthread_cond_signal(&p_ctx->state_cond);
24  }
25  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_WAITING;
26  /* Wait for a request from ABTD_xstream_context_free() or
27  * ABTD_xstream_context_restart().
28  * The following loop is to deal with spurious wakeup. */
29  do {
30  pthread_cond_wait(&p_ctx->state_cond, &p_ctx->state_lock);
31  } while (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
32  if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_TERMINATE) {
33  /* ABTD_xstream_context_free() terminates this thread. */
34  restart = ABT_FALSE;
35  } else {
36  /* ABTD_xstream_context_restart() restarts this thread */
37  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_RUNNING ||
38  p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN);
39  restart = ABT_TRUE;
40  }
41  pthread_mutex_unlock(&p_ctx->state_lock);
42  if (!restart)
43  break;
44  }
45  return NULL;
46 }
47 
48 ABTU_ret_err int ABTD_xstream_context_create(void *(*f_xstream)(void *),
49  void *p_arg,
50  ABTD_xstream_context *p_ctx)
51 {
52  p_ctx->thread_f = f_xstream;
53  p_ctx->p_arg = p_arg;
54  /* Coverity thinks p_ctx->state must be updated with a lock since it is
55  * updated with a lock in the other places. This assumption is wrong. The
56  * following suppresses a false positive. */
57  /* coverity[missing_lock] */
58  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_RUNNING;
59  int ret, init_stage = 0;
60  ret = pthread_mutex_init(&p_ctx->state_lock, NULL);
61  if (ret != 0)
62  goto FAILED;
63  init_stage = 1;
64 
65  ret = pthread_cond_init(&p_ctx->state_cond, NULL);
66  if (ret != 0)
67  goto FAILED;
68  init_stage = 2;
69 
70  ret = pthread_create(&p_ctx->native_thread, NULL,
72  if (ret != 0)
73  goto FAILED;
74  init_stage = 3;
75 
76  return ABT_SUCCESS;
77 FAILED:
78  if (init_stage >= 2) {
79  ret = pthread_cond_destroy(&p_ctx->state_cond);
80  ABTI_ASSERT(ret == 0);
81  }
82  if (init_stage >= 1) {
83  ret = pthread_mutex_destroy(&p_ctx->state_lock);
84  ABTI_ASSERT(ret == 0);
85  }
86  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_UNINIT;
87  ABTI_HANDLE_ERROR(ABT_ERR_SYS);
88 }
89 
90 void ABTD_xstream_context_free(ABTD_xstream_context *p_ctx)
91 {
92  /* Request termination */
93  if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_UNINIT) {
94  /* Do nothing. */
95  } else {
96  pthread_mutex_lock(&p_ctx->state_lock);
97  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
98  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_REQ_TERMINATE;
99  pthread_cond_signal(&p_ctx->state_cond);
100  pthread_mutex_unlock(&p_ctx->state_lock);
101  /* Join the target thread. */
102  int ret;
103  ret = pthread_join(p_ctx->native_thread, NULL);
104  ABTI_ASSERT(ret == 0);
105  ret = pthread_cond_destroy(&p_ctx->state_cond);
106  ABTI_ASSERT(ret == 0);
107  ret = pthread_mutex_destroy(&p_ctx->state_lock);
108  ABTI_ASSERT(ret == 0);
109  }
110 }
111 
112 void ABTD_xstream_context_join(ABTD_xstream_context *p_ctx)
113 {
114  /* If not finished, sleep this thread. */
115  pthread_mutex_lock(&p_ctx->state_lock);
116  if (p_ctx->state != ABTD_XSTREAM_CONTEXT_STATE_WAITING) {
117  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_RUNNING);
118  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN;
119  /* The following loop is to deal with spurious wakeup. */
120  do {
121  pthread_cond_wait(&p_ctx->state_cond, &p_ctx->state_lock);
122  } while (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN);
123  }
124  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
125  pthread_mutex_unlock(&p_ctx->state_lock);
126 }
127 
128 void ABTD_xstream_context_revive(ABTD_xstream_context *p_ctx)
129 {
130  /* Request restart */
131  pthread_mutex_lock(&p_ctx->state_lock);
132  ABTI_ASSERT(p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING);
133  p_ctx->state = ABTD_XSTREAM_CONTEXT_STATE_RUNNING;
134  pthread_cond_signal(&p_ctx->state_cond);
135  pthread_mutex_unlock(&p_ctx->state_lock);
136 }
137 
138 void ABTD_xstream_context_set_self(ABTD_xstream_context *p_ctx)
139 {
140  p_ctx->native_thread = pthread_self();
141 }
142 
143 void ABTD_xstream_context_print(ABTD_xstream_context *p_ctx, FILE *p_os,
144  int indent)
145 {
146  if (p_ctx == NULL) {
147  fprintf(p_os, "%*s== NULL XSTREAM CONTEXT ==\n", indent, "");
148  } else {
149  const char *state;
150  if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_RUNNING) {
151  state = "RUNNING";
152  } else if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_WAITING) {
153  state = "WAITING";
154  } else if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_JOIN) {
155  state = "REQ_JOIN";
156  } else if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_REQ_TERMINATE) {
157  state = "REQ_TERMINATE";
158  } else if (p_ctx->state == ABTD_XSTREAM_CONTEXT_STATE_UNINIT) {
159  state = "UNINIT";
160  } else {
161  state = "UNKNOWN";
162  }
163  fprintf(p_os,
164  "%*s== XSTREAM CONTEXT (%p) ==\n"
165  "%*sstate : %s\n",
166  indent, "", (void *)p_ctx, indent, "", state);
167  }
168  fflush(p_os);
169 }
ABT_ERR_SYS
#define ABT_ERR_SYS
Error code: error related to system calls and standard libraries.
Definition: abt.h:403
ABT_bool
int ABT_bool
Boolean type.
Definition: abt.h:1043
xstream_context_thread_func
static void * xstream_context_thread_func(void *arg)
Definition: abtd_stream.c:8
abti.h
ABT_SUCCESS
#define ABT_SUCCESS
Error code: the routine returns successfully.
Definition: abt.h:92
ABTU_ret_err
#define ABTU_ret_err
Definition: abtu.h:155
ABT_TRUE
#define ABT_TRUE
True constant for ABT_bool.
Definition: abt.h:784
ABT_FALSE
#define ABT_FALSE
False constant for ABT_bool.
Definition: abt.h:786