/* Creation date: 2005-07-26 08:20:59
 * Authors: Don
 * Change log:
 *
 */
|
7 |
|
8 |
#include "cfuthread_queue.h"
|
9 |
#include "cfulist.h"
|
10 |
|
11 |
#include <pthread.h>
|
12 |
#include <stdlib.h>
|
13 |
#include <stdio.h>
|
14 |
|
15 |
#ifdef CFU_DEBUG
|
16 |
#ifdef NDEBUG
|
17 |
#undef NDEBUG
|
18 |
#endif
|
19 |
#else
|
20 |
#ifndef NDEBUG
|
21 |
#define NDEBUG 1
|
22 |
#endif
|
23 |
#endif
|
24 |
#include <assert.h>
|
25 |
|
26 |
struct cfuthread_queue {
|
27 |
pthread_mutex_t mutex;
|
28 |
pthread_cond_t cv;
|
29 |
cfulist_t *request_queue;
|
30 |
cfuthread_queue_fn_t fn;
|
31 |
pthread_t thread;
|
32 |
cfuthread_queue_init_t init_fn;
|
33 |
void *init_arg;
|
34 |
cfuthread_queue_cleanup_t cleanup_fn;
|
35 |
void *cleanup_arg;
|
36 |
};
|
37 |
|
38 |
/*
 * One in-flight request. The requesting thread creates it, enqueues it and
 * waits on cv; the worker fills in data_out and signals cv.
 */
typedef struct cfuthread_queue_entry {
    /* Held by the worker while it computes and stores data_out. */
    pthread_mutex_t mutex;
    /* Signalled by the worker once data_out has been stored. */
    pthread_cond_t cv;
    /* Input supplied by the requester; passed to the queue's handler. */
    void *data_in;
    /* Value returned by the queue's handler for this request. */
    void *data_out;
} cfuthread_queue_entry;
|
44 |
|
45 |
static cfuthread_queue_entry *
|
46 |
_new_cfuthread_entry(void *data) {
|
47 |
cfuthread_queue_entry *entry =
|
48 |
(cfuthread_queue_entry *)calloc(1, sizeof(cfuthread_queue_entry));
|
49 |
pthread_mutex_init(&entry->mutex, NULL);
|
50 |
pthread_cond_init(&entry->cv, NULL);
|
51 |
entry->data_in = data;
|
52 |
return entry;
|
53 |
}
|
54 |
|
55 |
static void
|
56 |
_destroy_cfuthread_entry(cfuthread_queue_entry *entry) {
|
57 |
pthread_mutex_destroy(&entry->mutex);
|
58 |
pthread_cond_destroy(&entry->cv);
|
59 |
free(entry);
|
60 |
}
|
61 |
|
62 |
static void *
|
63 |
_run_queue(void *arg) {
|
64 |
cfuthread_queue_t *tq = (cfuthread_queue_t *)arg;
|
65 |
cfuthread_queue_entry *request = NULL;
|
66 |
|
67 |
if (tq->init_fn) {
|
68 |
tq->init_fn(tq->init_arg);
|
69 |
}
|
70 |
|
71 |
pthread_cleanup_push((void *)tq->cleanup_fn, tq->cleanup_arg);
|
72 |
|
73 |
while (1) {
|
74 |
pthread_mutex_lock(&tq->mutex);
|
75 |
while (cfulist_num_entries(tq->request_queue) == 0) {
|
76 |
pthread_cond_wait(&tq->cv, &tq->mutex);
|
77 |
}
|
78 |
|
79 |
request = (cfuthread_queue_entry *)cfulist_dequeue(tq->request_queue);
|
80 |
pthread_mutex_unlock(&tq->mutex);
|
81 |
if (!request) continue;
|
82 |
|
83 |
pthread_mutex_lock(&request->mutex);
|
84 |
request->data_out = tq->fn(request->data_in);
|
85 |
pthread_cond_signal(&request->cv);
|
86 |
pthread_mutex_unlock(&request->mutex);
|
87 |
}
|
88 |
pthread_exit((void *)0);
|
89 |
|
90 |
pthread_cleanup_pop(0);
|
91 |
|
92 |
}
|
93 |
|
94 |
extern cfuthread_queue_t *
|
95 |
cfuthread_queue_new_with_cleanup(cfuthread_queue_fn_t fn, cfuthread_queue_init_t init_fn,
|
96 |
void *init_arg, cfuthread_queue_cleanup_t cleanup_fn,
|
97 |
void *cleanup_arg) {
|
98 |
cfuthread_queue_t *tq = (cfuthread_queue_t *)calloc(1, sizeof(cfuthread_queue_t));
|
99 |
pthread_mutex_init(&tq->mutex, NULL);
|
100 |
pthread_cond_init(&tq->cv, NULL);
|
101 |
tq->fn = fn;
|
102 |
tq->request_queue = cfulist_new();
|
103 |
tq->init_fn = init_fn;
|
104 |
tq->init_arg = init_arg;
|
105 |
tq->cleanup_fn = cleanup_fn;
|
106 |
tq->cleanup_arg = cleanup_arg;
|
107 |
|
108 |
/* FIXME: do pthread_create() here to run a function that waits on
|
109 |
an entry to be put into the queue, then call fn().
|
110 |
*/
|
111 |
if ( (0 != pthread_create(&tq->thread, NULL, _run_queue, (void *)tq)) ) {
|
112 |
return NULL;
|
113 |
}
|
114 |
|
115 |
return tq;
|
116 |
}
|
117 |
|
118 |
extern cfuthread_queue_t *
|
119 |
cfuthread_queue_new(cfuthread_queue_fn_t fn) {
|
120 |
return cfuthread_queue_new_with_cleanup(fn, NULL, NULL, NULL, NULL);
|
121 |
}
|
122 |
|
123 |
extern void *
|
124 |
cfuthread_queue_make_request(cfuthread_queue_t * tq, void *data) {
|
125 |
cfuthread_queue_entry *request = _new_cfuthread_entry(data);
|
126 |
|
127 |
pthread_mutex_lock(&tq->mutex);
|
128 |
pthread_mutex_lock(&request->mutex);
|
129 |
cfulist_enqueue(tq->request_queue, (void *)request);
|
130 |
pthread_cond_signal(&tq->cv);
|
131 |
pthread_mutex_unlock(&tq->mutex);
|
132 |
|
133 |
pthread_cond_wait(&request->cv, &request->mutex);
|
134 |
pthread_mutex_unlock(&request->mutex);
|
135 |
|
136 |
data = request->data_out;
|
137 |
_destroy_cfuthread_entry(request);
|
138 |
|
139 |
return data;
|
140 |
}
|
141 |
|
142 |
extern void
|
143 |
cfuthread_queue_destroy(cfuthread_queue_t *tq) {
|
144 |
void *rv = NULL;
|
145 |
|
146 |
pthread_cancel(tq->thread);
|
147 |
pthread_join(tq->thread, &rv);
|
148 |
pthread_mutex_destroy(&tq->mutex);
|
149 |
pthread_cond_destroy(&tq->cv);
|
150 |
cfulist_destroy(tq->request_queue);
|
151 |
free(tq);
|
152 |
}
|