Skip to content

Commit 55c950a

Browse files
committed
add test program tst_mpio_pthread.c
1 parent 7098921 commit 55c950a

File tree

1 file changed

+241
-0
lines changed

1 file changed

+241
-0
lines changed

darshan-test/tst_mpio_pthread.c

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* (C) 2025 by Argonne National Laboratory.
3+
* See COPYRIGHT in top-level directory.
4+
*/
5+
6+
#include <stdio.h>
7+
#include <stdlib.h>
8+
#include <string.h> /* strcpy(), strncpy() */
9+
#include <unistd.h> /* _POSIX_BARRIERS */
10+
#include <sys/types.h> /* open() */
11+
#include <sys/stat.h> /* open() */
12+
#include <fcntl.h> /* open() */
13+
#include <errno.h> /* errno */
14+
15+
#include <pthread.h>
16+
17+
#include <mpi.h>
18+
19+
#define NTHREADS 3
20+
#define LEN 100
21+
22+
#define ERR \
23+
if (err != MPI_SUCCESS) { \
24+
int errorStringLen; \
25+
char errorString[MPI_MAX_ERROR_STRING]; \
26+
MPI_Error_string(err, errorString, &errorStringLen); \
27+
printf("Error at line %d: %s\n",__LINE__, errorString); \
28+
nerrs++; \
29+
}
30+
31+
32+
#if !defined(_POSIX_BARRIERS) || _POSIX_BARRIERS <= 0
33+
/* According to opengroup.org, barriers are defined in the optional part of
34+
* POSIX standard. For example, Mac OSX does not have pthread_barrier. If
35+
* barriers were implemented, the _POSIX_BARRIERS macro is defined as a
36+
* positive number.
37+
*/
38+
39+
typedef int pthread_barrierattr_t;
40+
typedef struct {
41+
pthread_mutex_t mutex;
42+
pthread_cond_t cond;
43+
int count;
44+
int numThreads;
45+
} pthread_barrier_t;
46+
47+
static int pthread_barrier_init(pthread_barrier_t *barrier,
48+
const pthread_barrierattr_t *attr,
49+
unsigned int count)
50+
{
51+
if (count == 0) {
52+
errno = EINVAL;
53+
return -1;
54+
}
55+
56+
if (pthread_mutex_init(&barrier->mutex, 0) < 0)
57+
return -1;
58+
59+
if (pthread_cond_init(&barrier->cond, 0) < 0) {
60+
pthread_mutex_destroy(&barrier->mutex);
61+
return -1;
62+
}
63+
barrier->numThreads = count;
64+
barrier->count = 0;
65+
66+
return 0;
67+
}
68+
69+
static int pthread_barrier_destroy(pthread_barrier_t *barrier)
70+
{
71+
pthread_cond_destroy(&barrier->cond);
72+
pthread_mutex_destroy(&barrier->mutex);
73+
return 0;
74+
}
75+
76+
static int pthread_barrier_wait(pthread_barrier_t *barrier)
77+
{
78+
int ret;
79+
pthread_mutex_lock(&barrier->mutex);
80+
++(barrier->count);
81+
if (barrier->count >= barrier->numThreads) {
82+
barrier->count = 0;
83+
pthread_cond_broadcast(&barrier->cond);
84+
ret = 1;
85+
} else {
86+
pthread_cond_wait(&barrier->cond, &barrier->mutex);
87+
ret = 0;
88+
}
89+
pthread_mutex_unlock(&barrier->mutex);
90+
return ret;
91+
}
92+
#endif
93+
94+
/* pthread barrier object */
95+
static pthread_barrier_t barr;
96+
97+
typedef struct {
98+
int id; /* globally unique thread ID */
99+
MPI_File fh; /* file handler */
100+
int nprocs; /* number of MPI processes */
101+
int rank; /* MPI rank ID */
102+
size_t count; /* write length */
103+
char fname[256]; /* output file name base */
104+
} thread_arg;
105+
106+
pthread_mutex_t env_mutex = PTHREAD_MUTEX_INITIALIZER;
107+
108+
static int setenv_thread_safe(const char *name, const char *value, int overwrite) {
109+
int err;
110+
pthread_mutex_lock(&env_mutex);
111+
err = setenv(name, value, overwrite);
112+
pthread_mutex_unlock(&env_mutex);
113+
return err;
114+
}
115+
116+
/*----< thread_func() >------------------------------------------------------*/
117+
static
118+
void* thread_func(void *arg)
119+
{
120+
char filename[512];
121+
int i, id, err, nerrs=0, nprocs, rank, *ret;
122+
size_t count;
123+
off_t off;
124+
char buf[LEN], annotation[64];
125+
MPI_File fh;
126+
MPI_Status status;
127+
128+
/* make a unique file name for each thread */
129+
id = ((thread_arg*)arg)->id;
130+
fh = ((thread_arg*)arg)->fh;
131+
count = ((thread_arg*)arg)->count;
132+
nprocs = ((thread_arg*)arg)->nprocs;
133+
rank = ((thread_arg*)arg)->rank;
134+
135+
for (i=0; i<LEN; i++) buf[i] = rank;
136+
137+
/* Note Darshan will randomly pick annotation from one of the thread only */
138+
snprintf(annotation, 64, "annotation of rank %d thread %d", rank, id);
139+
// err = setenv_thread_safe("DARSHAN_DXT_EXTRA_INFO", annotation, 0);
140+
err = setenv("DARSHAN_DXT_EXTRA_INFO", annotation, 0);
141+
if (err == -1)
142+
printf("Error: rank %s thread %d failed to call setenv (%s)\n",
143+
rank, id, strerror(errno));
144+
145+
off = rank * NTHREADS * LEN + id * LEN;
146+
147+
err = MPI_File_read_at_all(fh, off, buf, count, MPI_BYTE, &status);
148+
ERR
149+
150+
off += nprocs * NTHREADS * LEN;
151+
152+
err = MPI_File_write_at_all(fh, off, buf, count, MPI_BYTE, &status);
153+
ERR
154+
155+
pthread_t pid = pthread_self();
156+
printf("Thread %d has pthread_self() returned ID %lu\n", id, pid);
157+
158+
/* return number of errors encountered */
159+
ret = (int*)malloc(sizeof(int));
160+
*ret = nerrs;
161+
162+
return ret; /* same as pthread_exit(ret); */
163+
}
164+
165+
/*----< main() >-------------------------------------------------------------*/
166+
int main(int argc, char **argv) {
167+
extern int optind;
168+
char filename[256];
169+
int i, err, nerrs=0, rank=0, nprocs, providedT;
170+
MPI_File fh;
171+
MPI_Status status;
172+
pthread_t threads[NTHREADS];
173+
174+
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &providedT);
175+
176+
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
177+
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
178+
179+
if (providedT != MPI_THREAD_MULTIPLE) {
180+
if (!rank)
181+
printf("Error: MPI does not support MPI_THREAD_MULTIPLE\n");
182+
MPI_Finalize();
183+
return 0;
184+
}
185+
186+
if (argc == 1) strcpy(filename, "testfile");
187+
else strcpy(filename, argv[1]);
188+
189+
/* create a file */
190+
err = MPI_File_open(MPI_COMM_SELF, filename, MPI_MODE_CREATE | MPI_MODE_RDWR,
191+
MPI_INFO_NULL, &fh);
192+
ERR
193+
194+
char buf[LEN*NTHREADS];
195+
size_t count = LEN * NTHREADS;
196+
MPI_Offset off = rank * count;
197+
for (i=0; i<count; i++) buf[i] = '0'+rank;
198+
err = MPI_File_write_at_all(fh, off, buf, count, MPI_BYTE, &status);
199+
ERR
200+
201+
/* initialize thread barrier */
202+
pthread_barrier_init(&barr, NULL, NTHREADS);
203+
204+
/* create threads, each calls thread_func() */
205+
for (i=0; i<NTHREADS; i++) {
206+
thread_arg t_arg[NTHREADS]; /* must be unique to each thread */
207+
t_arg[i].id = i + rank * NTHREADS;
208+
t_arg[i].fh = fh;
209+
t_arg[i].nprocs = nprocs;
210+
t_arg[i].rank = rank;
211+
t_arg[i].count = LEN;
212+
sprintf(t_arg[i].fname, "%s",filename);
213+
if (pthread_create(&threads[i], NULL, thread_func, &t_arg[i])) {
214+
fprintf(stderr, "Error in %s line %d creating thread %d\n",
215+
__FILE__, __LINE__, i);
216+
nerrs++;
217+
}
218+
else
219+
printf("Success create pthread %d with ID %lu\n",i, threads[i]);
220+
}
221+
222+
/* wait for all threads to finish */
223+
for (i=0; i<NTHREADS; i++) {
224+
void *ret;
225+
if (pthread_join(threads[i], (void**)&ret)) {
226+
fprintf(stderr, "Error in %s line %d joining thread %d\n",
227+
__FILE__, __LINE__, i);
228+
}
229+
nerrs += *(int*)ret;
230+
free(ret);
231+
}
232+
233+
pthread_barrier_destroy(&barr);
234+
235+
err = MPI_File_close(&fh); ERR
236+
237+
MPI_Finalize();
238+
239+
return (nerrs > 0);
240+
}
241+

0 commit comments

Comments
 (0)