Skip to content

Commit b6fc13b

Browse files
committed
jpeg in a separate thread
1 parent f2f560a commit b6fc13b

File tree

3 files changed

+126
-100
lines changed

3 files changed

+126
-100
lines changed

src/libs/device.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) {
366366
} while (true);
367367

368368
*hw = &run->hw_bufs[buf.index];
369-
atomic_store(&(*hw)->busy, 0);
369+
atomic_store(&(*hw)->refs, 0);
370370
(*hw)->raw.dma_fd = (*hw)->dma_fd;
371371
(*hw)->raw.used = buf.bytesused;
372372
(*hw)->raw.width = run->width;
@@ -383,17 +383,25 @@ int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) {
383383
}
384384

385385
int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw) {
386+
assert(atomic_load(&hw->refs) == 0);
386387
const uint index = hw->buf.index;
387388
_D_LOG_DEBUG("Releasing device buffer=%u ...", index);
388389
if (us_xioctl(dev->run->fd, VIDIOC_QBUF, &hw->buf) < 0) {
389390
_D_LOG_PERROR("Can't release device buffer=%u", index);
390391
return -1;
391392
}
392393
hw->grabbed = false;
393-
atomic_store(&hw->busy, 0);
394394
return 0;
395395
}
396396

397+
void us_device_buffer_incref(us_hw_buffer_s *hw) {
398+
atomic_fetch_add(&hw->refs, 1);
399+
}
400+
401+
void us_device_buffer_decref(us_hw_buffer_s *hw) {
402+
atomic_fetch_sub(&hw->refs, 1);
403+
}
404+
397405
int _device_wait_buffer(us_device_s *dev) {
398406
us_device_runtime_s *const run = dev->run;
399407

@@ -834,7 +842,7 @@ static int _device_open_io_method_mmap(us_device_s *dev) {
834842
}
835843

836844
us_hw_buffer_s *hw = &run->hw_bufs[run->n_bufs];
837-
atomic_init(&hw->busy, false);
845+
atomic_init(&hw->refs, 0);
838846
const uz buf_size = (run->capture_mplane ? buf.m.planes[0].length : buf.length);
839847
const off_t buf_offset = (run->capture_mplane ? buf.m.planes[0].m.mem_offset : buf.m.offset);
840848

src/libs/device.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ typedef struct {
4848
struct v4l2_buffer buf;
4949
int dma_fd;
5050
bool grabbed;
51-
atomic_int busy;
51+
atomic_int refs;
5252
} us_hw_buffer_s;
5353

5454
typedef struct {
@@ -132,3 +132,6 @@ void us_device_close(us_device_s *dev);
132132

133133
int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw);
134134
int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw);
135+
136+
void us_device_buffer_incref(us_hw_buffer_s *hw);
137+
void us_device_buffer_decref(us_hw_buffer_s *hw);

src/ustreamer/stream.c

Lines changed: 111 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
#include <stdatomic.h>
2727
#include <unistd.h>
2828
#include <errno.h>
29-
#include <assert.h>
3029

3130
#include <pthread.h>
3231

@@ -49,6 +48,13 @@
4948
#endif
5049

5150

51+
typedef struct {
52+
pthread_t tid;
53+
us_queue_s *queue;
54+
us_stream_s *stream;
55+
atomic_bool *stop;
56+
} _jpeg_context_s;
57+
5258
typedef struct {
5359
pthread_t tid;
5460
us_device_s *dev;
@@ -66,11 +72,11 @@ typedef struct {
6672
} _releaser_context_s;
6773

6874

75+
static void *_jpeg_thread(void *v_ctx);
6976
static void *_h264_thread(void *v_ctx);
7077
static void *_releaser_thread(void *v_ctx);
7178

7279
static bool _stream_has_any_clients(us_stream_s *stream);
73-
static bool _stream_slowdown(us_stream_s *stream);
7480
static int _stream_init_loop(us_stream_s *stream);
7581
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
7682
static void _stream_check_suicide(us_stream_s *stream);
@@ -142,6 +148,13 @@ void us_stream_loop(us_stream_s *stream) {
142148
US_THREAD_CREATE(releasers[index].tid, _releaser_thread, &releasers[index]);
143149
}
144150

151+
_jpeg_context_s jpeg_ctx = {
152+
.queue = us_queue_init(stream->dev->run->n_bufs),
153+
.stream = stream,
154+
.stop = &threads_stop,
155+
};
156+
US_THREAD_CREATE(jpeg_ctx.tid, _jpeg_thread, &jpeg_ctx);
157+
145158
_h264_context_s h264_ctx;
146159
if (run->h264 != NULL) {
147160
h264_ctx.dev = stream->dev;
@@ -151,94 +164,51 @@ void us_stream_loop(us_stream_s *stream) {
151164
US_THREAD_CREATE(h264_ctx.tid, _h264_thread, &h264_ctx);
152165
}
153166

154-
ldf grab_after = 0;
155-
uint fluency_passed = 0;
156167
uint captured_fps_accum = 0;
157168
sll captured_fps_ts = 0;
158169

159170
US_LOG_INFO("Capturing ...");
160171

172+
uint slowdown_count = 0;
161173
while (!atomic_load(&run->stop) && !atomic_load(&threads_stop)) {
162174
_stream_check_suicide(stream);
163-
164-
US_SEP_DEBUG('-');
165-
US_LOG_DEBUG("Waiting for worker ...");
166-
167-
us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool);
168-
us_encoder_job_s *const ready_job = ready_wr->job;
169-
170-
if (ready_job->hw != NULL) {
171-
assert(!us_queue_put(releasers[ready_job->hw->buf.index].queue, ready_job->hw, 0));
172-
atomic_fetch_sub(&ready_job->hw->busy, 1);
173-
ready_job->hw = NULL;
174-
if (ready_wr->job_failed) {
175-
// pass
176-
} else if (ready_wr->job_timely) {
177-
_stream_expose_frame(stream, ready_job->dest);
178-
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
179-
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
180-
} else {
181-
US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
182-
}
183-
}
184-
185-
const bool h264_force_key = _stream_slowdown(stream);
186-
if (atomic_load(&run->stop) || atomic_load(&threads_stop)) {
187-
goto close;
175+
if (stream->slowdown && slowdown_count > 0 && !_stream_has_any_clients(stream)) {
176+
usleep(100 * 1000);
177+
slowdown_count = (slowdown_count + 1) % 10;
178+
continue;
188179
}
189180

190181
us_hw_buffer_s *hw;
191182
const int buf_index = us_device_grab_buffer(stream->dev, &hw);
192183
switch (buf_index) {
193184
case -3: continue; // Broken frame
194185
case -2: // Persistent timeout
195-
case -1: goto close; // Any error
186+
case -1: goto close; // Error
196187
}
197188
assert(buf_index >= 0);
198189

190+
const sll now_sec_ts = us_floor_ms(us_get_now_monotonic());
191+
if (now_sec_ts != captured_fps_ts) {
192+
US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum);
193+
atomic_store(&run->http_captured_fps, captured_fps_accum);
194+
captured_fps_accum = 0;
195+
captured_fps_ts = now_sec_ts;
196+
}
197+
captured_fps_accum += 1;
198+
199199
# ifdef WITH_GPIO
200200
us_gpio_set_stream_online(true);
201201
# endif
202202

203-
const ldf now_ts = us_get_now_monotonic();
204-
205-
if (now_ts < grab_after) {
206-
fluency_passed += 1;
207-
US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf",
208-
fluency_passed, now_ts, grab_after);
209-
assert(!us_queue_put(releasers[hw->buf.index].queue, hw, 0));
210-
} else {
211-
int hw_busy = 1;
212-
if (run->h264 != NULL) {
213-
hw_busy += 1;
214-
}
215-
atomic_store(&hw->busy, hw_busy);
216-
217-
fluency_passed = 0;
218-
219-
const sll now_sec_ts = us_floor_ms(now_ts);
220-
if (now_sec_ts != captured_fps_ts) {
221-
US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum);
222-
atomic_store(&run->http_captured_fps, captured_fps_accum);
223-
captured_fps_accum = 0;
224-
captured_fps_ts = now_sec_ts;
225-
}
226-
captured_fps_accum += 1;
227-
228-
const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr);
229-
grab_after = now_ts + fluency_delay;
230-
US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
231-
232-
ready_job->hw = hw;
233-
us_workers_pool_assign(stream->enc->run->pool, ready_wr);
234-
US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", buf_index, ready_wr->name);
235-
236-
_SINK_PUT(raw_sink, &hw->raw);
237-
238-
if (run->h264 != NULL) {
239-
us_queue_put(h264_ctx.queue, hw, h264_force_key);
240-
}
203+
us_device_buffer_incref(hw); // JPEG
204+
us_queue_put(jpeg_ctx.queue, hw, 0);
205+
if (run->h264 != NULL) {
206+
us_device_buffer_incref(hw); // H264
207+
us_queue_put(h264_ctx.queue, hw, 0);
241208
}
209+
us_queue_put(releasers[hw->buf.index].queue, hw, 0); // Plan to release
210+
211+
_SINK_PUT(raw_sink, &hw->raw);
242212
}
243213

244214
close:
@@ -249,6 +219,9 @@ void us_stream_loop(us_stream_s *stream) {
249219
us_queue_destroy(h264_ctx.queue);
250220
}
251221

222+
US_THREAD_JOIN(jpeg_ctx.tid);
223+
us_queue_destroy(jpeg_ctx.queue);
224+
252225
for (uint index = 0; index < n_releasers; ++index) {
253226
US_THREAD_JOIN(releasers[index].tid);
254227
us_queue_destroy(releasers[index].queue);
@@ -273,14 +246,66 @@ void us_stream_loop_break(us_stream_s *stream) {
273246
atomic_store(&stream->run->stop, true);
274247
}
275248

249+
static void *_jpeg_thread(void *v_ctx) {
250+
_jpeg_context_s *ctx = v_ctx;
251+
us_stream_s *stream = ctx->stream;
252+
253+
ldf grab_after = 0;
254+
uint fluency_passed = 0;
255+
256+
while (!atomic_load(ctx->stop)) {
257+
us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool);
258+
us_encoder_job_s *const ready_job = ready_wr->job;
259+
260+
if (ready_job->hw != NULL) {
261+
us_device_buffer_decref(ready_job->hw);
262+
ready_job->hw = NULL;
263+
if (ready_wr->job_failed) {
264+
// pass
265+
} else if (ready_wr->job_timely) {
266+
_stream_expose_frame(stream, ready_job->dest);
267+
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
268+
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
269+
} else {
270+
US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
271+
}
272+
}
273+
274+
us_hw_buffer_s *hw;
275+
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
276+
continue;
277+
}
278+
279+
const ldf now_ts = us_get_now_monotonic();
280+
if (now_ts < grab_after) {
281+
fluency_passed += 1;
282+
US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf",
283+
fluency_passed, now_ts, grab_after);
284+
us_device_buffer_decref(hw);
285+
continue;
286+
}
287+
fluency_passed = 0;
288+
289+
const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr);
290+
grab_after = now_ts + fluency_delay;
291+
US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
292+
293+
ready_job->hw = hw;
294+
us_workers_pool_assign(stream->enc->run->pool, ready_wr);
295+
US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", hw->buf.index, ready_wr->name);
296+
}
297+
return NULL;
298+
}
299+
276300
static void *_h264_thread(void *v_ctx) {
277301
_h264_context_s *ctx = v_ctx;
278302
while (!atomic_load(ctx->stop)) {
279303
us_hw_buffer_s *hw;
280-
if (!us_queue_get(ctx->queue, (void**)&hw, 0.1)) {
281-
us_h264_stream_process(ctx->h264, &hw->raw, false);
282-
atomic_fetch_sub(&hw->busy, 1);
304+
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
305+
continue;
283306
}
307+
us_h264_stream_process(ctx->h264, &hw->raw, false);
308+
us_device_buffer_decref(hw);
284309
}
285310
return NULL;
286311
}
@@ -289,21 +314,23 @@ static void *_releaser_thread(void *v_ctx) {
289314
_releaser_context_s *ctx = v_ctx;
290315
while (!atomic_load(ctx->stop)) {
291316
us_hw_buffer_s *hw;
292-
if (!us_queue_get(ctx->queue, (void**)&hw, 0.1)) {
293-
while (atomic_load(&hw->busy) > 0) {
294-
if (atomic_load(ctx->stop)) {
295-
break;
296-
}
297-
usleep(5 * 1000);
298-
}
299-
US_MUTEX_LOCK(*ctx->mutex);
300-
const int released = us_device_release_buffer(ctx->dev, hw);
301-
US_MUTEX_UNLOCK(*ctx->mutex);
302-
if (released < 0) {
303-
break;
317+
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
318+
continue;
319+
}
320+
while (atomic_load(&hw->refs) > 0) {
321+
if (atomic_load(ctx->stop)) {
322+
goto done;
304323
}
324+
usleep(5 * 1000);
325+
}
326+
US_MUTEX_LOCK(*ctx->mutex);
327+
const int released = us_device_release_buffer(ctx->dev, hw);
328+
US_MUTEX_UNLOCK(*ctx->mutex);
329+
if (released < 0) {
330+
goto done;
305331
}
306332
}
333+
done:
307334
atomic_store(ctx->stop, true); // Stop all other guys on error
308335
return NULL;
309336
}
@@ -318,18 +345,6 @@ static bool _stream_has_any_clients(us_stream_s *stream) {
318345
);
319346
}
320347

321-
static bool _stream_slowdown(us_stream_s *stream) {
322-
if (stream->slowdown) {
323-
unsigned count = 0;
324-
while (count < 10 && !atomic_load(&stream->run->stop) && !_stream_has_any_clients(stream)) {
325-
usleep(100000);
326-
++count;
327-
}
328-
return (count >= 10);
329-
}
330-
return false;
331-
}
332-
333348
static int _stream_init_loop(us_stream_s *stream) {
334349
us_stream_runtime_s *const run = stream->run;
335350

@@ -448,7 +463,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
448463

449464
static void _stream_check_suicide(us_stream_s *stream) {
450465
us_stream_runtime_s *const run = stream->run;
451-
if (stream->exit_on_no_clients <= 0) {
466+
if (stream->exit_on_no_clients > 0) {
452467
return;
453468
}
454469
const ldf now_ts = us_get_now_monotonic();

0 commit comments

Comments
 (0)