Skip to content

Commit ef47fa4

Browse files
committed
jpeg in a separate thread
1 parent f2f560a commit ef47fa4

File tree

3 files changed

+128
-100
lines changed

3 files changed

+128
-100
lines changed

src/libs/device.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) {
366366
} while (true);
367367

368368
*hw = &run->hw_bufs[buf.index];
369-
atomic_store(&(*hw)->busy, 0);
369+
atomic_store(&(*hw)->refs, 0);
370370
(*hw)->raw.dma_fd = (*hw)->dma_fd;
371371
(*hw)->raw.used = buf.bytesused;
372372
(*hw)->raw.width = run->width;
@@ -383,17 +383,25 @@ int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) {
383383
}
384384

385385
int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw) {
386+
assert(atomic_load(&hw->refs) == 0);
386387
const uint index = hw->buf.index;
387388
_D_LOG_DEBUG("Releasing device buffer=%u ...", index);
388389
if (us_xioctl(dev->run->fd, VIDIOC_QBUF, &hw->buf) < 0) {
389390
_D_LOG_PERROR("Can't release device buffer=%u", index);
390391
return -1;
391392
}
392393
hw->grabbed = false;
393-
atomic_store(&hw->busy, 0);
394394
return 0;
395395
}
396396

397+
void us_device_buffer_incref(us_hw_buffer_s *hw) {
398+
atomic_fetch_add(&hw->refs, 1);
399+
}
400+
401+
void us_device_buffer_decref(us_hw_buffer_s *hw) {
402+
atomic_fetch_sub(&hw->refs, 1);
403+
}
404+
397405
int _device_wait_buffer(us_device_s *dev) {
398406
us_device_runtime_s *const run = dev->run;
399407

@@ -834,7 +842,7 @@ static int _device_open_io_method_mmap(us_device_s *dev) {
834842
}
835843

836844
us_hw_buffer_s *hw = &run->hw_bufs[run->n_bufs];
837-
atomic_init(&hw->busy, false);
845+
atomic_init(&hw->refs, 0);
838846
const uz buf_size = (run->capture_mplane ? buf.m.planes[0].length : buf.length);
839847
const off_t buf_offset = (run->capture_mplane ? buf.m.planes[0].m.mem_offset : buf.m.offset);
840848

src/libs/device.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ typedef struct {
4848
struct v4l2_buffer buf;
4949
int dma_fd;
5050
bool grabbed;
51-
atomic_int busy;
51+
atomic_int refs;
5252
} us_hw_buffer_s;
5353

5454
typedef struct {
@@ -132,3 +132,6 @@ void us_device_close(us_device_s *dev);
132132

133133
int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw);
134134
int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw);
135+
136+
void us_device_buffer_incref(us_hw_buffer_s *hw);
137+
void us_device_buffer_decref(us_hw_buffer_s *hw);

src/ustreamer/stream.c

Lines changed: 113 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
#include <stdatomic.h>
2727
#include <unistd.h>
2828
#include <errno.h>
29-
#include <assert.h>
3029

3130
#include <pthread.h>
3231

@@ -49,6 +48,13 @@
4948
#endif
5049

5150

51+
typedef struct {
52+
pthread_t tid;
53+
us_queue_s *queue;
54+
us_stream_s *stream;
55+
atomic_bool *stop;
56+
} _jpeg_context_s;
57+
5258
typedef struct {
5359
pthread_t tid;
5460
us_device_s *dev;
@@ -66,11 +72,11 @@ typedef struct {
6672
} _releaser_context_s;
6773

6874

75+
static void *_jpeg_thread(void *v_ctx);
6976
static void *_h264_thread(void *v_ctx);
7077
static void *_releaser_thread(void *v_ctx);
7178

7279
static bool _stream_has_any_clients(us_stream_s *stream);
73-
static bool _stream_slowdown(us_stream_s *stream);
7480
static int _stream_init_loop(us_stream_s *stream);
7581
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
7682
static void _stream_check_suicide(us_stream_s *stream);
@@ -142,6 +148,13 @@ void us_stream_loop(us_stream_s *stream) {
142148
US_THREAD_CREATE(releasers[index].tid, _releaser_thread, &releasers[index]);
143149
}
144150

151+
_jpeg_context_s jpeg_ctx = {
152+
.queue = us_queue_init(stream->dev->run->n_bufs),
153+
.stream = stream,
154+
.stop = &threads_stop,
155+
};
156+
US_THREAD_CREATE(jpeg_ctx.tid, _jpeg_thread, &jpeg_ctx);
157+
145158
_h264_context_s h264_ctx;
146159
if (run->h264 != NULL) {
147160
h264_ctx.dev = stream->dev;
@@ -151,94 +164,53 @@ void us_stream_loop(us_stream_s *stream) {
151164
US_THREAD_CREATE(h264_ctx.tid, _h264_thread, &h264_ctx);
152165
}
153166

154-
ldf grab_after = 0;
155-
uint fluency_passed = 0;
156167
uint captured_fps_accum = 0;
157168
sll captured_fps_ts = 0;
158169

159170
US_LOG_INFO("Capturing ...");
160171

172+
uint slowdown_count = 0;
161173
while (!atomic_load(&run->stop) && !atomic_load(&threads_stop)) {
162174
_stream_check_suicide(stream);
163-
164-
US_SEP_DEBUG('-');
165-
US_LOG_DEBUG("Waiting for worker ...");
166-
167-
us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool);
168-
us_encoder_job_s *const ready_job = ready_wr->job;
169-
170-
if (ready_job->hw != NULL) {
171-
assert(!us_queue_put(releasers[ready_job->hw->buf.index].queue, ready_job->hw, 0));
172-
atomic_fetch_sub(&ready_job->hw->busy, 1);
173-
ready_job->hw = NULL;
174-
if (ready_wr->job_failed) {
175-
// pass
176-
} else if (ready_wr->job_timely) {
177-
_stream_expose_frame(stream, ready_job->dest);
178-
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
179-
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
180-
} else {
181-
US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
175+
if (stream->slowdown && !_stream_has_any_clients(stream)) {
176+
usleep(100 * 1000);
177+
slowdown_count = (slowdown_count + 1) % 10;
178+
if (slowdown_count > 0) {
179+
continue;
182180
}
183181
}
184182

185-
const bool h264_force_key = _stream_slowdown(stream);
186-
if (atomic_load(&run->stop) || atomic_load(&threads_stop)) {
187-
goto close;
188-
}
189-
190183
us_hw_buffer_s *hw;
191184
const int buf_index = us_device_grab_buffer(stream->dev, &hw);
192185
switch (buf_index) {
193186
case -3: continue; // Broken frame
194187
case -2: // Persistent timeout
195-
case -1: goto close; // Any error
188+
case -1: goto close; // Error
196189
}
197190
assert(buf_index >= 0);
198191

192+
const sll now_sec_ts = us_floor_ms(us_get_now_monotonic());
193+
if (now_sec_ts != captured_fps_ts) {
194+
US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum);
195+
atomic_store(&run->http_captured_fps, captured_fps_accum);
196+
captured_fps_accum = 0;
197+
captured_fps_ts = now_sec_ts;
198+
}
199+
captured_fps_accum += 1;
200+
199201
# ifdef WITH_GPIO
200202
us_gpio_set_stream_online(true);
201203
# endif
202204

203-
const ldf now_ts = us_get_now_monotonic();
204-
205-
if (now_ts < grab_after) {
206-
fluency_passed += 1;
207-
US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf",
208-
fluency_passed, now_ts, grab_after);
209-
assert(!us_queue_put(releasers[hw->buf.index].queue, hw, 0));
210-
} else {
211-
int hw_busy = 1;
212-
if (run->h264 != NULL) {
213-
hw_busy += 1;
214-
}
215-
atomic_store(&hw->busy, hw_busy);
216-
217-
fluency_passed = 0;
218-
219-
const sll now_sec_ts = us_floor_ms(now_ts);
220-
if (now_sec_ts != captured_fps_ts) {
221-
US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum);
222-
atomic_store(&run->http_captured_fps, captured_fps_accum);
223-
captured_fps_accum = 0;
224-
captured_fps_ts = now_sec_ts;
225-
}
226-
captured_fps_accum += 1;
227-
228-
const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr);
229-
grab_after = now_ts + fluency_delay;
230-
US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
231-
232-
ready_job->hw = hw;
233-
us_workers_pool_assign(stream->enc->run->pool, ready_wr);
234-
US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", buf_index, ready_wr->name);
235-
236-
_SINK_PUT(raw_sink, &hw->raw);
237-
238-
if (run->h264 != NULL) {
239-
us_queue_put(h264_ctx.queue, hw, h264_force_key);
240-
}
205+
us_device_buffer_incref(hw); // JPEG
206+
us_queue_put(jpeg_ctx.queue, hw, 0);
207+
if (run->h264 != NULL) {
208+
us_device_buffer_incref(hw); // H264
209+
us_queue_put(h264_ctx.queue, hw, 0);
241210
}
211+
us_queue_put(releasers[buf_index].queue, hw, 0); // Plan to release
212+
213+
_SINK_PUT(raw_sink, &hw->raw);
242214
}
243215

244216
close:
@@ -249,6 +221,9 @@ void us_stream_loop(us_stream_s *stream) {
249221
us_queue_destroy(h264_ctx.queue);
250222
}
251223

224+
US_THREAD_JOIN(jpeg_ctx.tid);
225+
us_queue_destroy(jpeg_ctx.queue);
226+
252227
for (uint index = 0; index < n_releasers; ++index) {
253228
US_THREAD_JOIN(releasers[index].tid);
254229
us_queue_destroy(releasers[index].queue);
@@ -273,14 +248,66 @@ void us_stream_loop_break(us_stream_s *stream) {
273248
atomic_store(&stream->run->stop, true);
274249
}
275250

251+
static void *_jpeg_thread(void *v_ctx) {
252+
_jpeg_context_s *ctx = v_ctx;
253+
us_stream_s *stream = ctx->stream;
254+
255+
ldf grab_after = 0;
256+
uint fluency_passed = 0;
257+
258+
while (!atomic_load(ctx->stop)) {
259+
us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool);
260+
us_encoder_job_s *const ready_job = ready_wr->job;
261+
262+
if (ready_job->hw != NULL) {
263+
us_device_buffer_decref(ready_job->hw);
264+
ready_job->hw = NULL;
265+
if (ready_wr->job_failed) {
266+
// pass
267+
} else if (ready_wr->job_timely) {
268+
_stream_expose_frame(stream, ready_job->dest);
269+
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
270+
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
271+
} else {
272+
US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
273+
}
274+
}
275+
276+
us_hw_buffer_s *hw;
277+
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
278+
continue;
279+
}
280+
281+
const ldf now_ts = us_get_now_monotonic();
282+
if (now_ts < grab_after) {
283+
fluency_passed += 1;
284+
US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf",
285+
fluency_passed, now_ts, grab_after);
286+
us_device_buffer_decref(hw);
287+
continue;
288+
}
289+
fluency_passed = 0;
290+
291+
const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr);
292+
grab_after = now_ts + fluency_delay;
293+
US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
294+
295+
ready_job->hw = hw;
296+
us_workers_pool_assign(stream->enc->run->pool, ready_wr);
297+
US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", hw->buf.index, ready_wr->name);
298+
}
299+
return NULL;
300+
}
301+
276302
static void *_h264_thread(void *v_ctx) {
277303
_h264_context_s *ctx = v_ctx;
278304
while (!atomic_load(ctx->stop)) {
279305
us_hw_buffer_s *hw;
280-
if (!us_queue_get(ctx->queue, (void**)&hw, 0.1)) {
281-
us_h264_stream_process(ctx->h264, &hw->raw, false);
282-
atomic_fetch_sub(&hw->busy, 1);
306+
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
307+
continue;
283308
}
309+
us_h264_stream_process(ctx->h264, &hw->raw, false);
310+
us_device_buffer_decref(hw);
284311
}
285312
return NULL;
286313
}
@@ -289,21 +316,23 @@ static void *_releaser_thread(void *v_ctx) {
289316
_releaser_context_s *ctx = v_ctx;
290317
while (!atomic_load(ctx->stop)) {
291318
us_hw_buffer_s *hw;
292-
if (!us_queue_get(ctx->queue, (void**)&hw, 0.1)) {
293-
while (atomic_load(&hw->busy) > 0) {
294-
if (atomic_load(ctx->stop)) {
295-
break;
296-
}
297-
usleep(5 * 1000);
298-
}
299-
US_MUTEX_LOCK(*ctx->mutex);
300-
const int released = us_device_release_buffer(ctx->dev, hw);
301-
US_MUTEX_UNLOCK(*ctx->mutex);
302-
if (released < 0) {
303-
break;
319+
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
320+
continue;
321+
}
322+
while (atomic_load(&hw->refs) > 0) {
323+
if (atomic_load(ctx->stop)) {
324+
goto done;
304325
}
326+
usleep(5 * 1000);
327+
}
328+
US_MUTEX_LOCK(*ctx->mutex);
329+
const int released = us_device_release_buffer(ctx->dev, hw);
330+
US_MUTEX_UNLOCK(*ctx->mutex);
331+
if (released < 0) {
332+
goto done;
305333
}
306334
}
335+
done:
307336
atomic_store(ctx->stop, true); // Stop all other guys on error
308337
return NULL;
309338
}
@@ -318,18 +347,6 @@ static bool _stream_has_any_clients(us_stream_s *stream) {
318347
);
319348
}
320349

321-
static bool _stream_slowdown(us_stream_s *stream) {
322-
if (stream->slowdown) {
323-
unsigned count = 0;
324-
while (count < 10 && !atomic_load(&stream->run->stop) && !_stream_has_any_clients(stream)) {
325-
usleep(100000);
326-
++count;
327-
}
328-
return (count >= 10);
329-
}
330-
return false;
331-
}
332-
333350
static int _stream_init_loop(us_stream_s *stream) {
334351
us_stream_runtime_s *const run = stream->run;
335352

@@ -447,10 +464,10 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
447464
}
448465

449466
static void _stream_check_suicide(us_stream_s *stream) {
450-
us_stream_runtime_s *const run = stream->run;
451-
if (stream->exit_on_no_clients <= 0) {
467+
if (stream->exit_on_no_clients == 0) {
452468
return;
453469
}
470+
us_stream_runtime_s *const run = stream->run;
454471
const ldf now_ts = us_get_now_monotonic();
455472
const ull http_last_request_ts = atomic_load(&run->http_last_request_ts); // Seconds
456473
if (_stream_has_any_clients(stream)) {

0 commit comments

Comments
 (0)