Skip to content

Commit 6e85e11

Browse files
committed
reworked pool logic
1 parent fab4c47 commit 6e85e11

File tree

2 files changed

+36
-64
lines changed

2 files changed

+36
-64
lines changed

src/ustreamer/workers.c

Lines changed: 31 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "../libs/tools.h"
3131
#include "../libs/threading.h"
3232
#include "../libs/logging.h"
33+
#include "../libs/list.h"
3334

3435

3536
static void *_worker_thread(void *v_worker);
@@ -53,13 +54,13 @@ us_workers_pool_s *us_workers_pool_init(
5354
atomic_init(&pool->stop, false);
5455

5556
pool->n_workers = n_workers;
56-
US_CALLOC(pool->workers, pool->n_workers);
5757

5858
US_MUTEX_INIT(pool->free_workers_mutex);
5959
US_COND_INIT(pool->free_workers_cond);
6060

6161
for (uint index = 0; index < pool->n_workers; ++index) {
62-
us_worker_s *const wr = &pool->workers[index];
62+
us_worker_s *wr;
63+
US_CALLOC(wr, 1);
6364

6465
wr->number = index;
6566
US_ASPRINTF(wr->name, "%s-%u", wr_prefix, index);
@@ -73,6 +74,8 @@ us_workers_pool_s *us_workers_pool_init(
7374

7475
US_THREAD_CREATE(wr->tid, _worker_thread, (void*)wr);
7576
pool->free_workers += 1;
77+
78+
US_LIST_APPEND(pool->workers, wr);
7679
}
7780
return pool;
7881
}
@@ -81,9 +84,7 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
8184
US_LOG_INFO("Destroying workers pool %s ...", pool->name);
8285

8386
atomic_store(&pool->stop, true);
84-
for (uint index = 0; index < pool->n_workers; ++index) {
85-
us_worker_s *const wr = &pool->workers[index];
86-
87+
US_LIST_ITERATE(pool->workers, wr, { // cppcheck-suppress constStatement
8788
US_MUTEX_LOCK(wr->has_job_mutex);
8889
atomic_store(&wr->has_job, true); // Final job: die
8990
US_MUTEX_UNLOCK(wr->has_job_mutex);
@@ -93,83 +94,56 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
9394
US_MUTEX_DESTROY(wr->has_job_mutex);
9495
US_COND_DESTROY(wr->has_job_cond);
9596

96-
free(wr->name);
97-
9897
pool->job_destroy(wr->job);
99-
}
98+
99+
free(wr->name);
100+
free(wr);
101+
});
100102

101103
US_MUTEX_DESTROY(pool->free_workers_mutex);
102104
US_COND_DESTROY(pool->free_workers_cond);
103105

104-
free(pool->workers);
105106
free(pool);
106107
}
107108

108109
us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) {
109-
us_worker_s *ready_wr = NULL;
110-
111110
US_MUTEX_LOCK(pool->free_workers_mutex);
112111
US_COND_WAIT_FOR(pool->free_workers, pool->free_workers_cond, pool->free_workers_mutex);
113112
US_MUTEX_UNLOCK(pool->free_workers_mutex);
114113

115-
if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
116-
ready_wr = pool->oldest_wr;
117-
ready_wr->job_timely = true;
118-
} else {
119-
for (uint index = 0; index < pool->n_workers; ++index) {
120-
us_worker_s *const wr = &pool->workers[index];
121-
if (
122-
!atomic_load(&wr->has_job) && (
123-
ready_wr == NULL
124-
|| ready_wr->job_start_ts < wr->job_start_ts
125-
)
126-
) {
127-
ready_wr = wr;
128-
break;
129-
}
114+
us_worker_s *found = NULL;
115+
US_LIST_ITERATE(pool->workers, wr, {
116+
if (!atomic_load(&wr->has_job) && (found == NULL || found->job_start_ts <= wr->job_start_ts)) {
117+
found = wr;
130118
}
131-
assert(ready_wr != NULL);
132-
ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
119+
});
120+
assert(found != NULL);
121+
US_LIST_REMOVE(pool->workers, found);
122+
US_LIST_APPEND(pool->workers, found); // Перемещаем в конец списка
123+
124+
found->job_timely = (found->job_start_ts > pool->job_timely_ts);
125+
if (found->job_timely) {
126+
pool->job_timely_ts = found->job_start_ts;
133127
}
134-
return ready_wr;
128+
return found;
135129
}
136130

137-
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/) {
138-
if (pool->oldest_wr == ready_wr) {
139-
pool->oldest_wr = pool->oldest_wr->next_wr;
140-
}
141-
if (pool->oldest_wr == NULL) {
142-
pool->oldest_wr = ready_wr;
143-
pool->latest_wr = pool->oldest_wr;
144-
} else {
145-
if (ready_wr->next_wr != NULL) {
146-
ready_wr->next_wr->prev_wr = ready_wr->prev_wr;
147-
}
148-
if (ready_wr->prev_wr != NULL) {
149-
ready_wr->prev_wr->next_wr = ready_wr->next_wr;
150-
}
151-
ready_wr->prev_wr = pool->latest_wr;
152-
pool->latest_wr->next_wr = ready_wr;
153-
pool->latest_wr = ready_wr;
154-
}
155-
pool->latest_wr->next_wr = NULL;
156-
157-
US_MUTEX_LOCK(ready_wr->has_job_mutex);
158-
//ready_wr->job = job;
159-
atomic_store(&ready_wr->has_job, true);
160-
US_MUTEX_UNLOCK(ready_wr->has_job_mutex);
161-
US_COND_SIGNAL(ready_wr->has_job_cond);
131+
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *wr) {
132+
US_MUTEX_LOCK(wr->has_job_mutex);
133+
atomic_store(&wr->has_job, true);
134+
US_MUTEX_UNLOCK(wr->has_job_mutex);
135+
US_COND_SIGNAL(wr->has_job_cond);
162136

163137
US_MUTEX_LOCK(pool->free_workers_mutex);
164138
pool->free_workers -= 1;
165139
US_MUTEX_UNLOCK(pool->free_workers_mutex);
166140
}
167141

168-
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) {
169-
const ldf approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;
142+
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *wr) {
143+
const ldf approx_job_time = pool->approx_job_time * 0.9 + wr->last_job_time * 0.1;
170144

171145
US_LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)",
172-
pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time);
146+
pool->name, pool->approx_job_time, approx_job_time, wr->last_job_time);
173147

174148
pool->approx_job_time = approx_job_time;
175149

@@ -203,7 +177,6 @@ static void *_worker_thread(void *v_worker) {
203177
wr->job_start_ts = job_start_ts;
204178
wr->last_job_time = us_get_now_monotonic() - wr->job_start_ts;
205179
}
206-
//wr->job = NULL;
207180
atomic_store(&wr->has_job, false);
208181
}
209182

src/ustreamer/workers.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <pthread.h>
2828

2929
#include "../libs/types.h"
30+
#include "../libs/list.h"
3031

3132

3233
typedef struct us_worker_sx {
@@ -44,10 +45,9 @@ typedef struct us_worker_sx {
4445
ldf job_start_ts;
4546
pthread_cond_t has_job_cond;
4647

47-
struct us_worker_sx *prev_wr;
48-
struct us_worker_sx *next_wr;
49-
5048
struct us_workers_pool_sx *pool;
49+
50+
US_LIST_DECLARE;
5151
} us_worker_s;
5252

5353
typedef void *(*us_workers_pool_job_init_f)(void *arg);
@@ -63,8 +63,7 @@ typedef struct us_workers_pool_sx {
6363

6464
uint n_workers;
6565
us_worker_s *workers;
66-
us_worker_s *oldest_wr;
67-
us_worker_s *latest_wr;
66+
ldf job_timely_ts;
6867

6968
ldf approx_job_time;
7069

@@ -85,6 +84,6 @@ us_workers_pool_s *us_workers_pool_init(
8584
void us_workers_pool_destroy(us_workers_pool_s *pool);
8685

8786
us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool);
88-
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/);
87+
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr);
8988

9089
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);

0 commit comments

Comments
 (0)