Skip to content

Commit 478e72a

Browse files
committed
Add TODO unsound zstd POOL_add test
1 parent dc432a6 commit 478e72a

File tree

1 file changed

+312
-0
lines changed

1 file changed

+312
-0
lines changed
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
// PARAM: --set ana.activated[+] symb_locks --set ana.activated[+] var_eq --set exp.extraspecials[+] ZSTD_customMalloc --set exp.extraspecials[+] ZSTD_customCalloc
2+
/* SPDX-License-Identifier: BSD-3-Clause */
3+
/*
4+
* Copyright (c) Facebook, Inc.
5+
* All rights reserved.
6+
*
7+
* This code is a challenging example for race detection extracted from zstd.
8+
* Copyright (c) The Goblint Contributors
9+
*/
10+
11+
#include<stdlib.h>
12+
#include<pthread.h>
13+
#define ZSTD_pthread_mutex_t pthread_mutex_t
14+
#define ZSTD_pthread_mutex_init(a, b) pthread_mutex_init((a), (b))
15+
#define ZSTD_pthread_mutex_destroy(a) pthread_mutex_destroy((a))
16+
#define ZSTD_pthread_mutex_lock(a) pthread_mutex_lock((a))
17+
#define ZSTD_pthread_mutex_unlock(a) pthread_mutex_unlock((a))
18+
19+
#define ZSTD_pthread_cond_t pthread_cond_t
20+
#define ZSTD_pthread_cond_init(a, b) pthread_cond_init((a), (b))
21+
#define ZSTD_pthread_cond_destroy(a) pthread_cond_destroy((a))
22+
#define ZSTD_pthread_cond_wait(a, b) pthread_cond_wait((a), (b))
23+
#define ZSTD_pthread_cond_signal(a) pthread_cond_signal((a))
24+
#define ZSTD_pthread_cond_broadcast(a) pthread_cond_broadcast((a))
25+
26+
#define ZSTD_pthread_t pthread_t
27+
#define ZSTD_pthread_create(a, b, c, d) pthread_create((a), (b), (c), (d))
28+
#define ZSTD_pthread_join(a, b) pthread_join((a),(b))
29+
30+
#define ZSTD_malloc(s) malloc(s)
31+
#define ZSTD_calloc(n,s) calloc((n), (s))
32+
#define ZSTD_free(p) free((p))
33+
#define ZSTD_memset(d,s,n) __builtin_memset((d),(s),(n))
34+
35+
typedef struct POOL_ctx_s POOL_ctx;
36+
37+
typedef void* (*ZSTD_allocFunction) (void* opaque, size_t size);
38+
typedef void (*ZSTD_freeFunction) (void* opaque, void* address);
39+
typedef struct { ZSTD_allocFunction customAlloc; ZSTD_freeFunction customFree; void* opaque; } ZSTD_customMem;
40+
typedef struct POOL_ctx_s ZSTD_threadPool;
41+
42+
43+
void* ZSTD_customMalloc(size_t size, ZSTD_customMem customMem)
44+
{
45+
if (customMem.customAlloc)
46+
return customMem.customAlloc(customMem.opaque, size);
47+
return ZSTD_malloc(size);
48+
}
49+
50+
void* ZSTD_customCalloc(size_t size, ZSTD_customMem customMem)
51+
{
52+
if (customMem.customAlloc) {
53+
/* calloc implemented as malloc+memset;
54+
* not as efficient as calloc, but next best guess for custom malloc */
55+
void* const ptr = customMem.customAlloc(customMem.opaque, size);
56+
ZSTD_memset(ptr, 0, size);
57+
return ptr;
58+
}
59+
return ZSTD_calloc(1, size);
60+
}
61+
62+
void ZSTD_customFree(void* ptr, ZSTD_customMem customMem)
63+
{
64+
if (ptr!=NULL) {
65+
if (customMem.customFree)
66+
customMem.customFree(customMem.opaque, ptr);
67+
else
68+
ZSTD_free(ptr);
69+
}
70+
}
71+
72+
73+
74+
/*! POOL_create() :
75+
* Create a thread pool with at most `numThreads` threads.
76+
* `numThreads` must be at least 1.
77+
* The maximum number of queued jobs before blocking is `queueSize`.
78+
* @return : POOL_ctx pointer on success, else NULL.
79+
*/
80+
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize);
81+
82+
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem);
83+
84+
/*! POOL_free() :
85+
* Free a thread pool returned by POOL_create().
86+
*/
87+
void POOL_free(POOL_ctx* ctx);
88+
89+
90+
/*! POOL_function :
91+
* The function type that can be added to a thread pool.
92+
*/
93+
typedef void (*POOL_function)(void*);
94+
95+
96+
static
97+
#ifdef __GNUC__
98+
__attribute__((__unused__))
99+
#endif
100+
ZSTD_customMem const ZSTD_defaultCMem = { NULL, NULL, NULL }; /**< this constant defers to stdlib's functions */
101+
102+
103+
/* A job is a function and an opaque argument */
104+
typedef struct POOL_job_s {
105+
POOL_function function;
106+
void *opaque;
107+
} POOL_job;
108+
109+
struct POOL_ctx_s {
110+
ZSTD_customMem customMem;
111+
/* Keep track of the threads */
112+
ZSTD_pthread_t* threads;
113+
size_t threadCapacity;
114+
size_t threadLimit;
115+
116+
/* The queue is a circular buffer */
117+
POOL_job *queue;
118+
size_t queueHead;
119+
size_t queueTail;
120+
size_t queueSize;
121+
122+
/* The number of threads working on jobs */
123+
size_t numThreadsBusy;
124+
/* Indicates if the queue is empty */
125+
int queueEmpty;
126+
127+
/* The mutex protects the queue */
128+
ZSTD_pthread_mutex_t queueMutex;
129+
/* Condition variable for pushers to wait on when the queue is full */
130+
ZSTD_pthread_cond_t queuePushCond;
131+
/* Condition variables for poppers to wait on when the queue is empty */
132+
ZSTD_pthread_cond_t queuePopCond;
133+
/* Indicates if the queue is shutting down */
134+
int shutdown;
135+
};
136+
137+
/* POOL_thread() :
138+
* Work thread for the thread pool.
139+
* Waits for jobs and executes them.
140+
* @returns : NULL on failure else non-null.
141+
*/
142+
static void* POOL_thread(void* opaque) {
143+
POOL_ctx* const ctx = (POOL_ctx*)opaque;
144+
if (!ctx) { return NULL; }
145+
for (;;) {
146+
/* Lock the mutex and wait for a non-empty queue or until shutdown */
147+
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
148+
149+
while ( ctx->queueEmpty // RACE! (threadLimit)
150+
|| (ctx->numThreadsBusy >= ctx->threadLimit) ) {
151+
if (ctx->shutdown) {
152+
/* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
153+
* a few threads will be shutdown while !queueEmpty,
154+
* but enough threads will remain active to finish the queue */
155+
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
156+
return opaque;
157+
}
158+
ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
159+
}
160+
/* Pop a job off the queue */
161+
{ POOL_job const job = ctx->queue[ctx->queueHead]; //NORACE
162+
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; //NORACE
163+
ctx->numThreadsBusy++; //NORACE
164+
ctx->queueEmpty = (ctx->queueHead == ctx->queueTail); //NORACE
165+
/* Unlock the mutex, signal a pusher, and run the job */
166+
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
167+
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
168+
169+
job.function(job.opaque);
170+
171+
/* If the intended queue size was 0, signal after finishing job */
172+
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
173+
ctx->numThreadsBusy--; //NORACE
174+
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
175+
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
176+
}
177+
} /* for (;;) */
178+
assert(0); //NOWARN (unreachable)
179+
}
180+
181+
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
182+
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
183+
}
184+
185+
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
186+
ZSTD_customMem customMem)
187+
{
188+
POOL_ctx* ctx;
189+
/* Check parameters */
190+
if (!numThreads) { return NULL; }
191+
/* Allocate the context and zero initialize */
192+
ctx = (POOL_ctx*)ZSTD_customCalloc(sizeof(POOL_ctx), customMem);
193+
if (!ctx) { return NULL; }
194+
/* Initialize the job queue.
195+
* It needs one extra space since one space is wasted to differentiate
196+
* empty and full queues.
197+
*/
198+
ctx->queueSize = queueSize + 1;
199+
ctx->queue = (POOL_job*)ZSTD_customMalloc(ctx->queueSize * sizeof(POOL_job), customMem);
200+
ctx->queueHead = 0;
201+
ctx->queueTail = 0;
202+
ctx->numThreadsBusy = 0;
203+
ctx->queueEmpty = 1;
204+
{
205+
int error = 0;
206+
error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
207+
error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
208+
error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
209+
if (error) { POOL_free(ctx); return NULL; }
210+
}
211+
ctx->shutdown = 0;
212+
/* Allocate space for the thread handles */
213+
ctx->threads = (ZSTD_pthread_t*)ZSTD_customMalloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
214+
ctx->threadCapacity = 0;
215+
ctx->customMem = customMem;
216+
/* Check for errors */
217+
if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
218+
/* Initialize the threads */
219+
{ size_t i;
220+
for (i = 0; i < numThreads; ++i) {
221+
if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
222+
ctx->threadCapacity = i;
223+
POOL_free(ctx);
224+
return NULL;
225+
} }
226+
ctx->threadCapacity = numThreads;
227+
ctx->threadLimit = numThreads; // RACE!
228+
}
229+
return ctx;
230+
}
231+
232+
/*! POOL_join() :
233+
Shutdown the queue, wake any sleeping threads, and join all of the threads.
234+
*/
235+
static void POOL_join(POOL_ctx* ctx) {
236+
/* Shut down the queue */
237+
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
238+
ctx->shutdown = 1; //NORACE
239+
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
240+
/* Wake up sleeping threads */
241+
ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
242+
ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
243+
/* Join all of the threads */
244+
{ size_t i;
245+
for (i = 0; i < ctx->threadCapacity; ++i) {
246+
ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */
247+
} }
248+
}
249+
250+
void POOL_free(POOL_ctx *ctx) {
251+
if (!ctx) { return; }
252+
POOL_join(ctx);
253+
ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
254+
ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
255+
ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
256+
ZSTD_customFree(ctx->queue, ctx->customMem);
257+
ZSTD_customFree(ctx->threads, ctx->customMem);
258+
ZSTD_customFree(ctx, ctx->customMem);
259+
}
260+
261+
static int isQueueFull(POOL_ctx const* ctx) {
262+
if (ctx->queueSize > 1) {
263+
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
264+
} else {
265+
return (ctx->numThreadsBusy == ctx->threadLimit) ||
266+
!ctx->queueEmpty;
267+
}
268+
}
269+
270+
271+
static void
272+
POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
273+
{
274+
POOL_job const job = {function, opaque};
275+
assert(ctx != NULL);
276+
if (ctx->shutdown) return;
277+
278+
ctx->queueEmpty = 0;
279+
ctx->queue[ctx->queueTail] = job;
280+
ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
281+
ZSTD_pthread_cond_signal(&ctx->queuePopCond);
282+
}
283+
284+
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
285+
{
286+
assert(ctx != NULL);
287+
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
288+
/* Wait until there is space in the queue for the new job */
289+
while (isQueueFull(ctx) && (!ctx->shutdown)) {
290+
ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
291+
}
292+
POOL_add_internal(ctx, function, opaque);
293+
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
294+
}
295+
296+
void foo(void *arg) {
297+
assert(1); // TODO reachable
298+
}
299+
300+
int g;
301+
302+
void bar(void *arg) {
303+
g++; // TODO RACE!
304+
}
305+
306+
int main() {
307+
POOL_ctx* const ctx = POOL_create(20, 10);
308+
POOL_add(ctx, foo, NULL);
309+
POOL_add(ctx, bar, NULL);
310+
POOL_add(ctx, bar, NULL);
311+
return 0;
312+
}

0 commit comments

Comments
 (0)