Skip to content

Commit 85b0a50

Browse files
committed
Rework apps/examples/workers/main.c
1 parent f44e300 commit 85b0a50

File tree

1 file changed

+77
-51
lines changed

1 file changed

+77
-51
lines changed

apps/examples/workers/main.c

Lines changed: 77 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
#include <stdlib.h>
3434
#include <string.h>
3535

36-
#include "common/compiler.h"
3736
#include "common/alignment.h"
3837
#include "common/macros.h"
3938
#include "event/event.h"
@@ -43,8 +42,8 @@
4342
#include "hsm/hsm.h"
4443

4544
#define AM_WORKERS_NUM_MAX 64
46-
#define AM_WORKER_LOAD_CYCLES 100000
47-
#define AM_TIMEOUT_MS (60 * 1000)
45+
#define AM_WORKER_LOAD_CYCLES 10000
46+
#define AM_TIMEOUT_MS (10 * 1000)
4847

4948
enum fork_evt {
5049
EVT_JOB_DONE = AM_EVT_USER,
@@ -72,11 +71,11 @@ struct stopped {
7271
int worker;
7372
};
7473

75-
union events {
76-
struct job_req req;
77-
struct job_done done;
78-
struct stopped stopped;
79-
};
74+
typedef union events {
75+
struct job_req req; /* cppcheck-suppress unusedStructMember */
76+
struct job_done done; /* cppcheck-suppress unusedStructMember */
77+
struct stopped stopped; /* cppcheck-suppress unusedStructMember */
78+
} events_t;
8079

8180
static struct am_ao_subscribe_list m_pubsub_list[EVT_PUB_MAX];
8281
static union events m_event_pool[AM_WORKERS_NUM_MAX];
@@ -90,26 +89,30 @@ struct worker {
9089
int id;
9190
};
9291

93-
static struct worker m_worker[AM_WORKERS_NUM_MAX];
92+
static struct worker m_workers[AM_WORKERS_NUM_MAX];
9493

95-
static const struct am_event m_evt_job_done = {.id = EVT_JOB_DONE};
9694
static const struct am_event m_evt_stop = {.id = EVT_STOP};
9795

9896
static int worker_proc(struct worker *me, const struct am_event *event) {
9997
switch (event->id) {
100-
case EVT_JOB: {
101-
struct job_req *req = (struct job_req*)event;
98+
case EVT_JOB_REQ: {
99+
const struct job_req *req = (const struct job_req *)event;
102100
for (volatile int i = 0; i < req->cycles; ++i) {
103101
;
104102
}
105-
struct job_done *done = am_event_allocate(EVT_JOB_DONE, sizeof(struct job_done));
106-
am_ao_publish(done);
103+
struct job_done *done = (struct job_done *)am_event_allocate(
104+
EVT_JOB_DONE, sizeof(struct job_done)
105+
);
106+
done->worker = me->id;
107+
am_ao_publish(&done->event);
107108
return AM_HSM_HANDLED();
108109
}
109110
case EVT_STOP: {
110-
struct stopped *done = am_event_allocate(EVT_JOB_DONE, sizeof(struct job_done));
111-
done->worker = me->id;
112-
am_ao_publish(done);
111+
struct stopped *stopped = (struct stopped *)am_event_allocate(
112+
EVT_STOPPED, sizeof(struct stopped)
113+
);
114+
stopped->worker = me->id;
115+
am_ao_publish(&stopped->event);
113116
am_ao_stop(&me->ao);
114117
return AM_HSM_HANDLED();
115118
}
@@ -126,41 +129,38 @@ static int worker_init(struct worker *me, const struct am_event *event) {
126129
return AM_HSM_TRAN(worker_proc);
127130
}
128131

129-
static void worker_ctor(int index) {
130-
struct worker *me = &m_worker[index];
132+
static void worker_ctor(struct worker *me, int id) {
131133
memset(me, 0, sizeof(*me));
132134
am_ao_ctor(&me->ao, AM_HSM_STATE_CTOR(worker_init));
133-
me->index = index;
135+
me->id = id;
134136
}
135137

136138
struct balancer {
137139
struct am_ao ao;
138140
struct am_timer timeout;
139-
int workers;
141+
int nworkers;
140142
int nstops;
143+
int stats[AM_WORKERS_NUM_MAX];
141144
};
142145

143146
static struct balancer m_balancer;
144147

145-
static int balancer_proc(struct balancer *me, const struct am_event *event) {
148+
static int balancer_stopping(
149+
struct balancer *me, const struct am_event *event
150+
) {
146151
switch (event->id) {
147-
case AM_EVT_ENTRY:
148-
am_timer_arm_ms(me->timeout, AM_TIMEOUT_MS, /*interval=*/0);
149-
struct job_req *req = am_event_allocate(EVT_JOB_REQ, sizeof(struct job_req));
150-
req->cycles = AM_WORKER_LOAD_CYCLES;
151-
am_ao_publish(req);
152+
case AM_EVT_HSM_ENTRY:
153+
am_ao_publish_exclude(&m_evt_stop, &me->ao);
152154
return AM_HSM_HANDLED();
153155

154-
case EVT_TIMEOUT:
155-
return AM_HSM_TRAN(balancer_stopping);
156-
157-
case EVT_JOB_DONE: {
158-
struct job_done *done = (struct job_done*)event;
159-
AM_ASSERT(done->worker >= 0);
160-
AM_ASSERT(done->worker < AM_COUNTOF(m_workers));
161-
struct job_req *req = am_event_allocate(EVT_JOB_REQ, sizeof(struct job_req));
162-
req->cycles = AM_WORKER_LOAD_CYCLES;
163-
am_ao_post_fifo(&m_workers[done->worker].ao, req);
156+
case EVT_STOPPED: {
157+
++me->nstops;
158+
if (me->nstops == me->nworkers) {
159+
for (int i = 0; i < me->nworkers; ++i) {
160+
am_pal_printf("worker: %d jobs done: %d\n", i, me->stats[i]);
161+
}
162+
am_ao_stop(&me->ao);
163+
}
164164
return AM_HSM_HANDLED();
165165
}
166166
default:
@@ -169,17 +169,30 @@ static int balancer_proc(struct balancer *me, const struct am_event *event) {
169169
return AM_HSM_SUPER(am_hsm_top);
170170
}
171171

172-
static int balancer_stopping(struct balancer *me, const struct am_event *event) {
172+
static int balancer_proc(struct balancer *me, const struct am_event *event) {
173173
switch (event->id) {
174-
case AM_EVT_ENTRY:
175-
am_ao_publish(&m_evt_stop);
176-
return AM_HSM_DONE();
174+
case AM_EVT_HSM_ENTRY: {
175+
am_timer_arm_ms(&me->timeout, AM_TIMEOUT_MS, /*interval=*/0);
176+
struct job_req *req = (struct job_req *)am_event_allocate(
177+
EVT_JOB_REQ, sizeof(struct job_req)
178+
);
179+
req->cycles = AM_WORKER_LOAD_CYCLES;
180+
am_ao_publish_exclude(&req->event, &me->ao);
181+
return AM_HSM_HANDLED();
182+
}
183+
case EVT_TIMEOUT:
184+
return AM_HSM_TRAN(balancer_stopping);
177185

178-
case EVT_STOPPED: {
179-
++me->nstops;
180-
if (me->nstops == me->workers) {
181-
am_ao_stop(&me->ao);
182-
}
186+
case EVT_JOB_DONE: {
187+
const struct job_done *done = (const struct job_done *)event;
188+
AM_ASSERT(done->worker >= 0);
189+
AM_ASSERT(done->worker < AM_COUNTOF(m_workers));
190+
struct job_req *req = (struct job_req *)am_event_allocate(
191+
EVT_JOB_REQ, sizeof(struct job_req)
192+
);
193+
req->cycles = AM_WORKER_LOAD_CYCLES;
194+
am_ao_post_fifo(&m_workers[done->worker].ao, &req->event);
195+
++me->stats[done->worker];
183196
return AM_HSM_HANDLED();
184197
}
185198
default:
@@ -203,19 +216,30 @@ static void balancer_ctor(int nworkers) {
203216
am_ao_ctor(&me->ao, AM_HSM_STATE_CTOR(balancer_init));
204217
}
205218

206-
AM_NORETURN static void ticker_task(void *param) {
219+
static void ticker_task(void *param) {
207220
(void)param;
208221

209222
am_ao_wait_start_all();
210223

211224
uint32_t now_ticks = am_pal_time_get_tick(AM_PAL_TICK_DOMAIN_DEFAULT);
212-
for (;;) {
225+
while (am_ao_get_cnt() > 0) {
213226
am_pal_sleep_till_ticks(AM_PAL_TICK_DOMAIN_DEFAULT, now_ticks + 1);
214227
now_ticks += 1;
215228
am_timer_tick(AM_PAL_TICK_DOMAIN_DEFAULT);
216229
}
217230
}
218231

232+
AM_ALIGNOF_DEFINE(events_t);
233+
234+
static int prio_map_fn(int prio) {
235+
AM_ASSERT(prio >= 0);
236+
AM_ASSERT(prio <= AM_AO_PRIO_MAX);
237+
if (prio == AM_AO_PRIO_MAX) {
238+
return prio;
239+
}
240+
return 0;
241+
}
242+
219243
int main(void) {
220244
struct am_ao_state_cfg cfg = {
221245
.on_idle = am_pal_on_idle,
@@ -224,11 +248,13 @@ int main(void) {
224248
};
225249
am_ao_state_ctor(&cfg);
226250

251+
am_pal_register_prio_map_cb(prio_map_fn);
252+
227253
am_event_add_pool(
228254
m_event_pool,
229255
sizeof(m_event_pool),
230256
sizeof(m_event_pool[0]),
231-
AM_ALIGNOF(union events)
257+
AM_ALIGNOF(events_t)
232258
);
233259

234260
am_ao_init_subscribe_list(m_pubsub_list, AM_COUNTOF(m_pubsub_list));
@@ -240,7 +266,7 @@ int main(void) {
240266

241267
balancer_ctor(/*nworkers=*/ncpus);
242268
for (int i = 0; i < ncpus; ++i) {
243-
worker_ctor(&m_worker[i], /*worker=*/i);
269+
worker_ctor(&m_workers[i], /*id=*/i);
244270
}
245271

246272
am_ao_start(
@@ -257,7 +283,7 @@ int main(void) {
257283
int prio_min = AM_AO_PRIO_MIN;
258284
for (int i = 0; i < ncpus; ++i) {
259285
am_ao_start(
260-
&m_worker[i].ao,
286+
&m_workers[i].ao,
261287
prio_min + i,
262288
/*queue=*/m_queue_worker[i],
263289
/*nqueue=*/AM_COUNTOF(m_queue_worker[i]),

0 commit comments

Comments
 (0)