Skip to content

Commit 39a8081

Browse files
committed
dequeue segments on ffmpeg error #42.
1 parent 954fd53 commit 39a8081

File tree

1 file changed

+28
-6
lines changed

1 file changed

+28
-6
lines changed

hlsvod/manager.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,19 @@ func (m *ManagerCtx) enqueueSegments(offset, limit int) {
344344
}
345345
}
346346

347+
func (m *ManagerCtx) dequeueSegments(offset, limit int) {
348+
m.segmentQueueMu.Lock()
349+
defer m.segmentQueueMu.Unlock()
350+
351+
// remove segment signaling channels queue
352+
for i := offset; i < offset+limit; i++ {
353+
if res, ok := m.segmentQueue[i]; ok {
354+
close(res)
355+
delete(m.segmentQueue, i)
356+
}
357+
}
358+
}
359+
347360
func (m *ManagerCtx) dequeueSegment(index int) {
348361
m.segmentQueueMu.Lock()
349362
defer m.segmentQueueMu.Unlock()
@@ -388,15 +401,14 @@ func (m *ManagerCtx) transcodeSegments(offset, limit int) error {
388401
// create new segment signaling channels queue
389402
m.enqueueSegments(offset, limit)
390403

391-
index := offset
392-
logger.Info().Msg("transcode process started")
393-
394404
go func() {
405+
index := offset
406+
logger.Info().Msg("transcode process started")
407+
395408
for {
396409
segmentName, ok := <-segments
397410
if !ok {
398-
logger.Info().Int("index", index).Msg("transcode process finished")
399-
return
411+
break
400412
}
401413

402414
logger.Info().
@@ -413,6 +425,16 @@ func (m *ManagerCtx) transcodeSegments(offset, limit int) error {
413425
// expect new segment to come
414426
index++
415427
}
428+
429+
// check if all segments were transcoded
430+
if index < offset+limit {
431+
// clear segments queue if not all segments were transcoded
432+
m.dequeueSegments(offset, limit)
433+
434+
logger.Warn().Msg("transcode process finished, but not all segments were transcoded")
435+
} else {
436+
logger.Info().Msg("transcode process finished")
437+
}
416438
}()
417439

418440
return nil
@@ -559,7 +581,7 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) {
559581
// now segment should be available
560582
segmentPath, ok = m.getSegment(index)
561583
if !ok || segmentPath == "" {
562-
// this should never happen
584+
// can happen if transcode failed
563585
m.logger.Error().Int("index", index).Msg("segment not found even after transcoding")
564586
http.Error(w, "409 segment not found even after transcoding", http.StatusConflict)
565587
return

0 commit comments

Comments
 (0)