/[zanavi_public1]/navit/navit/maptool/libcfu-0.03-zanavi/src/cfuthread_queue.c
ZANavi

Contents of /navit/navit/maptool/libcfu-0.03-zanavi/src/cfuthread_queue.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 31 - (show annotations) (download)
Mon Feb 4 17:41:59 2013 UTC (11 years, 1 month ago) by zoff99
File MIME type: text/plain
File size: 3587 byte(s)
new map version, lots of fixes and experimental new features
1 /* Creation date: 2005-07-26 08:20:59
2
3 * Authors: Don
4 * Change log:
5
6 */
7
8 #include "cfuthread_queue.h"
9 #include "cfulist.h"
10
11 #include <pthread.h>
12 #include <stdlib.h>
13 #include <stdio.h>
14
15 #ifdef CFU_DEBUG
16 #ifdef NDEBUG
17 #undef NDEBUG
18 #endif
19 #else
20 #ifndef NDEBUG
21 #define NDEBUG 1
22 #endif
23 #endif
24 #include <assert.h>
25
26 struct cfuthread_queue {
27 pthread_mutex_t mutex;
28 pthread_cond_t cv;
29 cfulist_t *request_queue;
30 cfuthread_queue_fn_t fn;
31 pthread_t thread;
32 cfuthread_queue_init_t init_fn;
33 void *init_arg;
34 cfuthread_queue_cleanup_t cleanup_fn;
35 void *cleanup_arg;
36 };
37
38 typedef struct cfuthread_queue_entry {
39 pthread_mutex_t mutex;
40 pthread_cond_t cv;
41 void *data_in;
42 void *data_out;
43 } cfuthread_queue_entry;
44
45 static cfuthread_queue_entry *
46 _new_cfuthread_entry(void *data) {
47 cfuthread_queue_entry *entry =
48 (cfuthread_queue_entry *)calloc(1, sizeof(cfuthread_queue_entry));
49 pthread_mutex_init(&entry->mutex, NULL);
50 pthread_cond_init(&entry->cv, NULL);
51 entry->data_in = data;
52 return entry;
53 }
54
55 static void
56 _destroy_cfuthread_entry(cfuthread_queue_entry *entry) {
57 pthread_mutex_destroy(&entry->mutex);
58 pthread_cond_destroy(&entry->cv);
59 free(entry);
60 }
61
62 static void *
63 _run_queue(void *arg) {
64 cfuthread_queue_t *tq = (cfuthread_queue_t *)arg;
65 cfuthread_queue_entry *request = NULL;
66
67 if (tq->init_fn) {
68 tq->init_fn(tq->init_arg);
69 }
70
71 pthread_cleanup_push((void *)tq->cleanup_fn, tq->cleanup_arg);
72
73 while (1) {
74 pthread_mutex_lock(&tq->mutex);
75 while (cfulist_num_entries(tq->request_queue) == 0) {
76 pthread_cond_wait(&tq->cv, &tq->mutex);
77 }
78
79 request = (cfuthread_queue_entry *)cfulist_dequeue(tq->request_queue);
80 pthread_mutex_unlock(&tq->mutex);
81 if (!request) continue;
82
83 pthread_mutex_lock(&request->mutex);
84 request->data_out = tq->fn(request->data_in);
85 pthread_cond_signal(&request->cv);
86 pthread_mutex_unlock(&request->mutex);
87 }
88 pthread_exit((void *)0);
89
90 pthread_cleanup_pop(0);
91
92 }
93
94 extern cfuthread_queue_t *
95 cfuthread_queue_new_with_cleanup(cfuthread_queue_fn_t fn, cfuthread_queue_init_t init_fn,
96 void *init_arg, cfuthread_queue_cleanup_t cleanup_fn,
97 void *cleanup_arg) {
98 cfuthread_queue_t *tq = (cfuthread_queue_t *)calloc(1, sizeof(cfuthread_queue_t));
99 pthread_mutex_init(&tq->mutex, NULL);
100 pthread_cond_init(&tq->cv, NULL);
101 tq->fn = fn;
102 tq->request_queue = cfulist_new();
103 tq->init_fn = init_fn;
104 tq->init_arg = init_arg;
105 tq->cleanup_fn = cleanup_fn;
106 tq->cleanup_arg = cleanup_arg;
107
108 /* FIXME: do pthread_create() here to run a function that waits on
109 an entry to be put into the queue, then call fn().
110 */
111 if ( (0 != pthread_create(&tq->thread, NULL, _run_queue, (void *)tq)) ) {
112 return NULL;
113 }
114
115 return tq;
116 }
117
118 extern cfuthread_queue_t *
119 cfuthread_queue_new(cfuthread_queue_fn_t fn) {
120 return cfuthread_queue_new_with_cleanup(fn, NULL, NULL, NULL, NULL);
121 }
122
123 extern void *
124 cfuthread_queue_make_request(cfuthread_queue_t * tq, void *data) {
125 cfuthread_queue_entry *request = _new_cfuthread_entry(data);
126
127 pthread_mutex_lock(&tq->mutex);
128 pthread_mutex_lock(&request->mutex);
129 cfulist_enqueue(tq->request_queue, (void *)request);
130 pthread_cond_signal(&tq->cv);
131 pthread_mutex_unlock(&tq->mutex);
132
133 pthread_cond_wait(&request->cv, &request->mutex);
134 pthread_mutex_unlock(&request->mutex);
135
136 data = request->data_out;
137 _destroy_cfuthread_entry(request);
138
139 return data;
140 }
141
142 extern void
143 cfuthread_queue_destroy(cfuthread_queue_t *tq) {
144 void *rv = NULL;
145
146 pthread_cancel(tq->thread);
147 pthread_join(tq->thread, &rv);
148 pthread_mutex_destroy(&tq->mutex);
149 pthread_cond_destroy(&tq->cv);
150 cfulist_destroy(tq->request_queue);
151 free(tq);
152 }

   
Visit the ZANavi Wiki