source: trunk/server/source3/lib/pthreadpool/pthreadpool.c@ 898

Last change on this file since 898 was 751, checked in by Silvan Scherrer, 13 years ago

Samba Server: updated trunk to 3.6.9

File size: 12.2 KB
Line 
1/*
2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#include <errno.h>
21#include <stdio.h>
22#include <unistd.h>
23#include <stdlib.h>
24#include <string.h>
25#include <pthread.h>
26#include <signal.h>
27#include <assert.h>
28#include <fcntl.h>
29#include <sys/time.h>
30
31#include "pthreadpool.h"
32#include "lib/util/dlinklist.h"
33#ifdef __OS2__
34#define pipe(A) os2_pipe(A)
35#endif
36
37struct pthreadpool_job {
38 struct pthreadpool_job *next;
39 int id;
40 void (*fn)(void *private_data);
41 void *private_data;
42};
43
44struct pthreadpool {
45 /*
46 * List pthreadpools for fork safety
47 */
48 struct pthreadpool *prev, *next;
49
50 /*
51 * Control access to this struct
52 */
53 pthread_mutex_t mutex;
54
55 /*
56 * Threads waiting for work do so here
57 */
58 pthread_cond_t condvar;
59
60 /*
61 * List of work jobs
62 */
63 struct pthreadpool_job *jobs, *last_job;
64
65 /*
66 * pipe for signalling
67 */
68 int sig_pipe[2];
69
70 /*
71 * indicator to worker threads that they should shut down
72 */
73 int shutdown;
74
75 /*
76 * maximum number of threads
77 */
78 int max_threads;
79
80 /*
81 * Number of threads
82 */
83 int num_threads;
84
85 /*
86 * Number of idle threads
87 */
88 int num_idle;
89
90 /*
91 * An array of threads that require joining.
92 */
93 int num_exited;
94 pthread_t *exited; /* We alloc more */
95};
96
97static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
98static struct pthreadpool *pthreadpools = NULL;
99static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
100
101static void pthreadpool_prep_atfork(void);
102
103/*
104 * Initialize a thread pool
105 */
106
107int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
108{
109 struct pthreadpool *pool;
110 int ret;
111
112 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
113 if (pool == NULL) {
114 return ENOMEM;
115 }
116
117 ret = pipe(pool->sig_pipe);
118 if (ret == -1) {
119 int err = errno;
120 free(pool);
121 return err;
122 }
123
124 ret = pthread_mutex_init(&pool->mutex, NULL);
125 if (ret != 0) {
126 close(pool->sig_pipe[0]);
127 close(pool->sig_pipe[1]);
128 free(pool);
129 return ret;
130 }
131
132 ret = pthread_cond_init(&pool->condvar, NULL);
133 if (ret != 0) {
134 pthread_mutex_destroy(&pool->mutex);
135 close(pool->sig_pipe[0]);
136 close(pool->sig_pipe[1]);
137 free(pool);
138 return ret;
139 }
140
141 pool->shutdown = 0;
142 pool->jobs = pool->last_job = NULL;
143 pool->num_threads = 0;
144 pool->num_exited = 0;
145 pool->exited = NULL;
146 pool->max_threads = max_threads;
147 pool->num_idle = 0;
148
149 ret = pthread_mutex_lock(&pthreadpools_mutex);
150 if (ret != 0) {
151 pthread_cond_destroy(&pool->condvar);
152 pthread_mutex_destroy(&pool->mutex);
153 close(pool->sig_pipe[0]);
154 close(pool->sig_pipe[1]);
155 free(pool);
156 return ret;
157 }
158 DLIST_ADD(pthreadpools, pool);
159
160 ret = pthread_mutex_unlock(&pthreadpools_mutex);
161 assert(ret == 0);
162
163 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
164
165 *presult = pool;
166
167 return 0;
168}
169
170static void pthreadpool_prepare(void)
171{
172 int ret;
173 struct pthreadpool *pool;
174
175 ret = pthread_mutex_lock(&pthreadpools_mutex);
176 assert(ret == 0);
177
178 pool = pthreadpools;
179
180 while (pool != NULL) {
181 ret = pthread_mutex_lock(&pool->mutex);
182 assert(ret == 0);
183 pool = pool->next;
184 }
185}
186
187static void pthreadpool_parent(void)
188{
189 int ret;
190 struct pthreadpool *pool;
191
192 pool = DLIST_TAIL(pthreadpools);
193
194 while (1) {
195 ret = pthread_mutex_unlock(&pool->mutex);
196 assert(ret == 0);
197
198 if (pool == pthreadpools) {
199 break;
200 }
201 pool = pool->prev;
202 }
203
204 ret = pthread_mutex_unlock(&pthreadpools_mutex);
205 assert(ret == 0);
206}
207
208static void pthreadpool_child(void)
209{
210 int ret;
211 struct pthreadpool *pool;
212
213 pool = DLIST_TAIL(pthreadpools);
214
215 while (1) {
216 close(pool->sig_pipe[0]);
217 close(pool->sig_pipe[1]);
218
219 ret = pipe(pool->sig_pipe);
220 assert(ret == 0);
221
222 pool->num_threads = 0;
223
224 pool->num_exited = 0;
225 free(pool->exited);
226 pool->exited = NULL;
227
228 pool->num_idle = 0;
229
230 while (pool->jobs != NULL) {
231 struct pthreadpool_job *job;
232 job = pool->jobs;
233 pool->jobs = job->next;
234 free(job);
235 }
236 pool->last_job = NULL;
237
238 ret = pthread_mutex_unlock(&pool->mutex);
239 assert(ret == 0);
240
241 if (pool == pthreadpools) {
242 break;
243 }
244 pool = pool->prev;
245 }
246
247 ret = pthread_mutex_unlock(&pthreadpools_mutex);
248 assert(ret == 0);
249}
250
251static void pthreadpool_prep_atfork(void)
252{
253 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
254 pthreadpool_child);
255}
256
257/*
258 * Return the file descriptor which becomes readable when a job has
259 * finished
260 */
261
262int pthreadpool_signal_fd(struct pthreadpool *pool)
263{
264 return pool->sig_pipe[0];
265}
266
267/*
268 * Do a pthread_join() on all children that have exited, pool->mutex must be
269 * locked
270 */
271static void pthreadpool_join_children(struct pthreadpool *pool)
272{
273 int i;
274
275 for (i=0; i<pool->num_exited; i++) {
276 pthread_join(pool->exited[i], NULL);
277 }
278 pool->num_exited = 0;
279
280 /*
281 * Deliberately not free and NULL pool->exited. That will be
282 * re-used by realloc later.
283 */
284}
285
286/*
287 * Fetch a finished job number from the signal pipe
288 */
289
290int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
291{
292 int ret_jobid;
293 ssize_t nread;
294
295 nread = -1;
296 errno = EINTR;
297
298 while ((nread == -1) && (errno == EINTR)) {
299 nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
300 }
301 if (nread == -1) {
302 return errno;
303 }
304 if (nread != sizeof(int)) {
305 return EINVAL;
306 }
307 *jobid = ret_jobid;
308 return 0;
309}
310
311/*
312 * Destroy a thread pool, finishing all threads working for it
313 */
314
315int pthreadpool_destroy(struct pthreadpool *pool)
316{
317 int ret, ret1;
318
319 ret = pthread_mutex_lock(&pool->mutex);
320 if (ret != 0) {
321 return ret;
322 }
323
324 if ((pool->jobs != NULL) || pool->shutdown) {
325 ret = pthread_mutex_unlock(&pool->mutex);
326 assert(ret == 0);
327 return EBUSY;
328 }
329
330 if (pool->num_threads > 0) {
331 /*
332 * We have active threads, tell them to finish, wait for that.
333 */
334
335 pool->shutdown = 1;
336
337 if (pool->num_idle > 0) {
338 /*
339 * Wake the idle threads. They will find pool->quit to
340 * be set and exit themselves
341 */
342 ret = pthread_cond_broadcast(&pool->condvar);
343 if (ret != 0) {
344 pthread_mutex_unlock(&pool->mutex);
345 return ret;
346 }
347 }
348
349 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
350
351 if (pool->num_exited > 0) {
352 pthreadpool_join_children(pool);
353 continue;
354 }