50
50
51
51
52
52
typedef struct {
53
+ pthread_t tid ;
54
+ us_device_s * dev ;
55
+ us_queue_s * queue ;
56
+ us_h264_stream_s * h264 ;
57
+ atomic_bool * stop ;
58
+ } _h264_context_s ;
59
+
60
+ typedef struct {
61
+ pthread_t tid ;
53
62
us_device_s * dev ;
54
63
us_queue_s * queue ;
55
64
pthread_mutex_t * mutex ;
56
65
atomic_bool * stop ;
57
66
} _releaser_context_s ;
58
67
59
68
69
+ static void * _h264_thread (void * v_ctx );
60
70
static void * _releaser_thread (void * v_ctx );
61
71
62
- static void _stream_release_buffer (us_stream_s * stream , us_hw_buffer_s * hw );
63
- static bool _stream_is_stopped (us_stream_s * stream );
64
72
static bool _stream_has_any_clients (us_stream_s * stream );
65
73
static bool _stream_slowdown (us_stream_s * stream );
66
74
static int _stream_init_loop (us_stream_s * stream );
67
75
static void _stream_expose_frame (us_stream_s * stream , us_frame_s * frame );
76
+ static void _stream_check_suicide (us_stream_s * stream );
68
77
69
78
70
79
#define _SINK_PUT (x_sink , x_frame ) { \
@@ -74,18 +83,10 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
74
83
} \
75
84
}
76
85
77
- #define _H264_PUT (x_frame , x_force_key ) { \
78
- if (stream->run->h264) { \
79
- us_h264_stream_process(stream->run->h264, x_frame, x_force_key); \
80
- } \
81
- }
82
-
83
86
84
87
us_stream_s * us_stream_init (us_device_s * dev , us_encoder_s * enc ) {
85
88
us_stream_runtime_s * run ;
86
89
US_CALLOC (run , 1 );
87
- US_MUTEX_INIT (run -> release_mutex );
88
- atomic_init (& run -> release_stop , false);
89
90
US_RING_INIT_WITH_ITEMS (run -> http_jpeg_ring , 4 , us_frame_init );
90
91
atomic_init (& run -> http_has_clients , false);
91
92
atomic_init (& run -> http_last_request_ts , 0 );
@@ -108,7 +109,6 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
108
109
void us_stream_destroy (us_stream_s * stream ) {
109
110
us_blank_destroy (stream -> run -> blank );
110
111
US_RING_DELETE_WITH_ITEMS (stream -> run -> http_jpeg_ring , us_frame_destroy );
111
- US_MUTEX_DESTROY (stream -> run -> release_mutex );
112
112
free (stream -> run );
113
113
free (stream );
114
114
}
@@ -126,17 +126,29 @@ void us_stream_loop(us_stream_s *stream) {
126
126
}
127
127
128
128
while (!_stream_init_loop (stream )) {
129
+ atomic_bool threads_stop ;
130
+ atomic_init (& threads_stop , false);
131
+
132
+ pthread_mutex_t release_mutex ;
133
+ US_MUTEX_INIT (release_mutex );
129
134
const uint n_releasers = stream -> dev -> run -> n_bufs ;
130
- US_CALLOC (run -> releasers , n_releasers );
135
+ _releaser_context_s * releasers ;
136
+ US_CALLOC (releasers , n_releasers );
131
137
for (uint index = 0 ; index < n_releasers ; ++ index ) {
132
- run -> releasers [index ].queue = us_queue_init (1 );
133
- _releaser_context_s * ctx ;
134
- US_CALLOC (ctx , 1 );
135
- ctx -> dev = stream -> dev ;
136
- ctx -> queue = run -> releasers [index ].queue ;
137
- ctx -> mutex = & run -> release_mutex ;
138
- ctx -> stop = & run -> release_stop ;
139
- US_THREAD_CREATE (run -> releasers [index ].tid , _releaser_thread , ctx );
138
+ releasers [index ].dev = stream -> dev ;
139
+ releasers [index ].queue = us_queue_init (1 );
140
+ releasers [index ].mutex = & release_mutex ;
141
+ releasers [index ].stop = & threads_stop ;
142
+ US_THREAD_CREATE (releasers [index ].tid , _releaser_thread , & releasers [index ]);
143
+ }
144
+
145
+ _h264_context_s h264_ctx ;
146
+ if (run -> h264 != NULL ) {
147
+ h264_ctx .dev = stream -> dev ;
148
+ h264_ctx .queue = us_queue_init (stream -> dev -> run -> n_bufs );
149
+ h264_ctx .h264 = run -> h264 ;
150
+ h264_ctx .stop = & threads_stop ;
151
+ US_THREAD_CREATE (h264_ctx .tid , _h264_thread , & h264_ctx );
140
152
}
141
153
142
154
ldf grab_after = 0 ;
@@ -146,15 +158,18 @@ void us_stream_loop(us_stream_s *stream) {
146
158
147
159
US_LOG_INFO ("Capturing ..." );
148
160
149
- while (!_stream_is_stopped (stream ) && !atomic_load (& run -> release_stop )) {
161
+ while (!atomic_load (& run -> stop ) && !atomic_load (& threads_stop )) {
162
+ _stream_check_suicide (stream );
163
+
150
164
US_SEP_DEBUG ('-' );
151
165
US_LOG_DEBUG ("Waiting for worker ..." );
152
166
153
167
us_worker_s * const ready_wr = us_workers_pool_wait (stream -> enc -> run -> pool );
154
168
us_encoder_job_s * const ready_job = ready_wr -> job ;
155
169
156
170
if (ready_job -> hw != NULL ) {
157
- _stream_release_buffer (stream , ready_job -> hw );
171
+ assert (!us_queue_put (releasers [ready_job -> hw -> buf .index ].queue , ready_job -> hw , 0 ));
172
+ atomic_fetch_sub (& ready_job -> hw -> busy , 1 );
158
173
ready_job -> hw = NULL ;
159
174
if (ready_wr -> job_failed ) {
160
175
// pass
@@ -168,7 +183,7 @@ void us_stream_loop(us_stream_s *stream) {
168
183
}
169
184
170
185
const bool h264_force_key = _stream_slowdown (stream );
171
- if (_stream_is_stopped ( stream )) {
186
+ if (atomic_load ( & run -> stop ) || atomic_load ( & threads_stop )) {
172
187
goto close ;
173
188
}
174
189
@@ -191,8 +206,14 @@ void us_stream_loop(us_stream_s *stream) {
191
206
fluency_passed += 1 ;
192
207
US_LOG_VERBOSE ("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf" ,
193
208
fluency_passed , now_ts , grab_after );
194
- _stream_release_buffer ( stream , hw );
209
+ assert (! us_queue_put ( releasers [ hw -> buf . index ]. queue , hw , 0 ) );
195
210
} else {
211
+ int hw_busy = 1 ;
212
+ if (run -> h264 != NULL ) {
213
+ hw_busy += 1 ;
214
+ }
215
+ atomic_store (& hw -> busy , hw_busy );
216
+
196
217
fluency_passed = 0 ;
197
218
198
219
const sll now_sec_ts = us_floor_ms (now_ts );
@@ -213,18 +234,29 @@ void us_stream_loop(us_stream_s *stream) {
213
234
US_LOG_DEBUG ("Assigned new frame in buffer=%d to worker=%s" , buf_index , ready_wr -> name );
214
235
215
236
_SINK_PUT (raw_sink , & hw -> raw );
216
- _H264_PUT (& hw -> raw , h264_force_key );
237
+
238
+ if (run -> h264 != NULL ) {
239
+ us_queue_put (h264_ctx .queue , hw , h264_force_key );
240
+ }
217
241
}
218
242
}
219
243
220
244
close :
221
- atomic_store (& run -> release_stop , true);
245
+ atomic_store (& threads_stop , true);
246
+
247
+ if (run -> h264 != NULL ) {
248
+ US_THREAD_JOIN (h264_ctx .tid );
249
+ us_queue_destroy (h264_ctx .queue );
250
+ }
251
+
222
252
for (uint index = 0 ; index < n_releasers ; ++ index ) {
223
- US_THREAD_JOIN (run -> releasers [index ].tid );
224
- us_queue_destroy (run -> releasers [index ].queue );
253
+ US_THREAD_JOIN (releasers [index ].tid );
254
+ us_queue_destroy (releasers [index ].queue );
225
255
}
226
- free (run -> releasers );
227
- atomic_store (& run -> release_stop , false);
256
+ free (releasers );
257
+ US_MUTEX_DESTROY (release_mutex );
258
+
259
+ atomic_store (& threads_stop , false);
228
260
229
261
us_encoder_close (stream -> enc );
230
262
us_device_close (stream -> dev );
@@ -241,11 +273,29 @@ void us_stream_loop_break(us_stream_s *stream) {
241
273
atomic_store (& stream -> run -> stop , true);
242
274
}
243
275
276
+ static void * _h264_thread (void * v_ctx ) {
277
+ _h264_context_s * ctx = v_ctx ;
278
+ while (!atomic_load (ctx -> stop )) {
279
+ us_hw_buffer_s * hw ;
280
+ if (!us_queue_get (ctx -> queue , (void * * )& hw , 0.1 )) {
281
+ us_h264_stream_process (ctx -> h264 , & hw -> raw , false);
282
+ atomic_fetch_sub (& hw -> busy , 1 );
283
+ }
284
+ }
285
+ return NULL ;
286
+ }
287
+
244
288
static void * _releaser_thread (void * v_ctx ) {
245
289
_releaser_context_s * ctx = v_ctx ;
246
290
while (!atomic_load (ctx -> stop )) {
247
291
us_hw_buffer_s * hw ;
248
292
if (!us_queue_get (ctx -> queue , (void * * )& hw , 0.1 )) {
293
+ while (atomic_load (& hw -> busy ) > 0 ) {
294
+ if (atomic_load (ctx -> stop )) {
295
+ break ;
296
+ }
297
+ usleep (5 * 1000 );
298
+ }
249
299
US_MUTEX_LOCK (* ctx -> mutex );
250
300
const int released = us_device_release_buffer (ctx -> dev , hw );
251
301
US_MUTEX_UNLOCK (* ctx -> mutex );
@@ -254,36 +304,10 @@ static void *_releaser_thread(void *v_ctx) {
254
304
}
255
305
}
256
306
}
257
- atomic_store (ctx -> stop , true); // Stop all other guys
258
- free (ctx );
307
+ atomic_store (ctx -> stop , true); // Stop all other guys on error
259
308
return NULL ;
260
309
}
261
310
262
- static void _stream_release_buffer (us_stream_s * stream , us_hw_buffer_s * hw ) {
263
- assert (!us_queue_put (stream -> run -> releasers [hw -> buf .index ].queue , hw , 0 ));
264
- }
265
-
266
- static bool _stream_is_stopped (us_stream_s * stream ) {
267
- us_stream_runtime_s * const run = stream -> run ;
268
- const bool stop = atomic_load (& run -> stop );
269
- if (stop ) {
270
- return true;
271
- }
272
- if (stream -> exit_on_no_clients > 0 ) {
273
- const ldf now_ts = us_get_now_monotonic ();
274
- const ull http_last_request_ts = atomic_load (& run -> http_last_request_ts ); // Seconds
275
- if (_stream_has_any_clients (stream )) {
276
- atomic_store (& run -> http_last_request_ts , now_ts );
277
- } else if (http_last_request_ts + stream -> exit_on_no_clients < now_ts ) {
278
- US_LOG_INFO ("No requests or HTTP/sink clients found in last %u seconds, exiting ..." ,
279
- stream -> exit_on_no_clients );
280
- us_process_suicide ();
281
- atomic_store (& run -> http_last_request_ts , now_ts );
282
- }
283
- }
284
- return false;
285
- }
286
-
287
311
static bool _stream_has_any_clients (us_stream_s * stream ) {
288
312
const us_stream_runtime_s * const run = stream -> run ;
289
313
return (
@@ -297,7 +321,7 @@ static bool _stream_has_any_clients(us_stream_s *stream) {
297
321
static bool _stream_slowdown (us_stream_s * stream ) {
298
322
if (stream -> slowdown ) {
299
323
unsigned count = 0 ;
300
- while (count < 10 && !_stream_is_stopped ( stream ) && !_stream_has_any_clients (stream )) {
324
+ while (count < 10 && !atomic_load ( & stream -> run -> stop ) && !_stream_has_any_clients (stream )) {
301
325
usleep (100000 );
302
326
++ count ;
303
327
}
@@ -310,7 +334,9 @@ static int _stream_init_loop(us_stream_s *stream) {
310
334
us_stream_runtime_s * const run = stream -> run ;
311
335
312
336
int access_errno = 0 ;
313
- while (!_stream_is_stopped (stream )) {
337
+ while (!atomic_load (& stream -> run -> stop )) {
338
+ _stream_check_suicide (stream );
339
+
314
340
unsigned width = stream -> dev -> run -> width ;
315
341
unsigned height = stream -> dev -> run -> height ;
316
342
if (width == 0 || height == 0 ) {
@@ -323,7 +349,10 @@ static int _stream_init_loop(us_stream_s *stream) {
323
349
_stream_expose_frame (stream , NULL );
324
350
325
351
_SINK_PUT (raw_sink , run -> blank -> raw );
326
- _H264_PUT (run -> blank -> raw , false);
352
+
353
+ if (run -> h264 != NULL ) {
354
+ us_h264_stream_process (run -> h264 , run -> blank -> raw , false);
355
+ }
327
356
328
357
if (access (stream -> dev -> path , R_OK |W_OK ) < 0 ) {
329
358
if (access_errno != errno ) {
@@ -394,7 +423,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
394
423
395
424
int ri = -1 ;
396
425
while (
397
- !_stream_is_stopped ( stream )
426
+ !atomic_load ( & run -> stop )
398
427
&& ((ri = us_ring_producer_acquire (run -> http_jpeg_ring , 0 )) < 0 )
399
428
) {
400
429
US_LOG_ERROR ("Can't push JPEG to HTTP ring (no free slots)" );
@@ -416,3 +445,20 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
416
445
417
446
_SINK_PUT (jpeg_sink , (frame != NULL ? frame : run -> blank -> jpeg ));
418
447
}
448
+
449
+ static void _stream_check_suicide (us_stream_s * stream ) {
450
+ us_stream_runtime_s * const run = stream -> run ;
451
+ if (stream -> exit_on_no_clients <= 0 ) {
452
+ return ;
453
+ }
454
+ const ldf now_ts = us_get_now_monotonic ();
455
+ const ull http_last_request_ts = atomic_load (& run -> http_last_request_ts ); // Seconds
456
+ if (_stream_has_any_clients (stream )) {
457
+ atomic_store (& run -> http_last_request_ts , now_ts );
458
+ } else if (http_last_request_ts + stream -> exit_on_no_clients < now_ts ) {
459
+ US_LOG_INFO ("No requests or HTTP/sink clients found in last %u seconds, exiting ..." ,
460
+ stream -> exit_on_no_clients );
461
+ us_process_suicide ();
462
+ atomic_store (& run -> http_last_request_ts , now_ts );
463
+ }
464
+ }
0 commit comments