@@ -15,6 +15,7 @@ import (
1515 "github.com/facebookincubator/go-belt/tool/experimental/errmon"
1616 "github.com/facebookincubator/go-belt/tool/logger"
1717 "github.com/hashicorp/go-multierror"
18+ "github.com/xaionaro-go/avpipeline/indicator"
1819 "github.com/xaionaro-go/avpipeline/kernel"
1920 "github.com/xaionaro-go/avpipeline/packet"
2021 "github.com/xaionaro-go/avpipeline/processor"
@@ -28,14 +29,14 @@ import (
2829)
2930
3031const (
31- enableSeekOnStart = true
32- enableTracksRotation = false
33- enableSlowDown = true
34- minSpeed = 0.975
35- minSpeedDifferenceSlowDown = 0.001
36- minJitterBufDuration = 100 * time .Millisecond
37- jitterBufDecayHalftime = 5 * time .Minute
38- playerCheckInterval = 100 * time .Millisecond
32+ enableSeekOnStart = true
33+ enableTracksRotation = false
34+ enableSlowDown = true
35+ minSpeed = 0.975
36+ minSpeedDifference = 0.01
37+ minJitterBufDuration = 500 * time .Millisecond
38+ jitterBufDecayHalftime = 5 * time .Minute
39+ playerCheckInterval = 100 * time .Millisecond
3940)
4041
4142type Publisher interface {
@@ -94,6 +95,7 @@ type StreamPlayer struct {
9495 Backend player.Backend
9596 Config Config
9697
98+ WantSpeedAverage * indicator.MAMA [float64 ]
9799 CurrentJitterBufDuration time.Duration
98100}
99101
@@ -129,9 +131,10 @@ func (sp *StreamPlayers) Create(
129131 Parent : sp ,
130132 Cancel : cancel ,
131133 StreamPlayer : StreamPlayer {
132- Backend : backend ,
133- Config : resultingOpts .Config (ctx ),
134- StreamID : streamID ,
134+ Backend : backend ,
135+ Config : resultingOpts .Config (ctx ),
136+ StreamID : streamID ,
137+ WantSpeedAverage : indicator .NewMAMA [float64 ](10 , 0.3 , 0.05 ),
135138 },
136139 }
137140
@@ -672,7 +675,8 @@ func (p *StreamPlayerHandler) controllerLoop(
672675 err = fmt .Errorf ("unable to get position: %w" , err )
673676 }
674677 if enableSeekOnStart && protocol != streamtypes .ServerTypeRTMP && ! triedToSeek {
675- l , err := player .GetLength (ctx )
678+ var l time.Duration
679+ l , err = player .GetLength (ctx )
676680 if err != nil {
677681 err = fmt .Errorf ("unable to get length: %w" , err )
678682 }
@@ -801,9 +805,10 @@ func (p *StreamPlayerHandler) controllerLoop(
801805 p .Config .JitterBufDuration ,
802806 )
803807 logger .Debugf (ctx ,
804- "StreamPlayer[%s].controllerLoop: increased jitter buffer duration to %v" ,
808+ "StreamPlayer[%s].controllerLoop: increased jitter buffer duration to %v (increased by %v) " ,
805809 p .StreamID ,
806810 p .CurrentJitterBufDuration ,
811+ jitterBufDurationIncrease ,
807812 )
808813 jitterBufDurationIncrease = 0
809814 }
@@ -934,14 +939,15 @@ func (p *StreamPlayerHandler) controllerLoop(
934939 if enableSlowDown && protocol == streamtypes .ServerTypeRTMP && p .CurrentJitterBufDuration > time .Second && minBufDuration > lag {
935940 jitterBufDurationIncrease = max (jitterBufDurationIncrease , minBufDuration - lag )
936941 k := lag .Seconds () / minBufDuration .Seconds ()
937- speed := 1 - (1 - k )* (1 - minSpeed )
938- if speed <= 0 {
942+ wantSpeed := 1 - (1 - k )* (1 - minSpeed )
943+ if wantSpeed <= 0 {
939944 return
940945 }
941- if math .Abs (speed - curSpeed ) < minSpeedDifferenceSlowDown {
946+ setSpeed := p .WantSpeedAverage .Update (wantSpeed )
947+ if math .Abs (setSpeed - curSpeed ) < minSpeedDifference {
942948 return
943949 }
944- curSpeed = speed
950+ curSpeed = setSpeed
945951 logger .Debugf (ctx ,
946952 "StreamPlayer[%s].controllerLoop: slowing down to %f" ,
947953 p .StreamID , curSpeed ,
@@ -958,6 +964,7 @@ func (p *StreamPlayerHandler) controllerLoop(
958964 commitJitterBufferIncrease ()
959965 }
960966 if lag <= p .CurrentJitterBufDuration {
967+ p .WantSpeedAverage .Update (1 )
961968 if curSpeed == 1 {
962969 return
963970 }
@@ -979,29 +986,31 @@ func (p *StreamPlayerHandler) controllerLoop(
979986 // log(x) = log(0.5) / (halftime/interval)
980987 // x = e^(log(0.5)/(halftime/interval))
981988 jitterBufFactor := math .Exp (math .Log (2 ) / (jitterBufDecayHalftime .Seconds () / playerCheckInterval .Seconds ()))
989+ p .CurrentJitterBufDuration = max (
990+ time .Duration (float64 (p .CurrentJitterBufDuration )* jitterBufFactor ),
991+ minJitterBufDuration ,
992+ )
982993 logger .Tracef (ctx ,
983- "StreamPlayer[%s].controllerLoop: increasing jitter buffer duration factor: %v (halftime: %v, interval: %v)" ,
994+ "StreamPlayer[%s].controllerLoop: increasing jitter buffer duration factor: %v (halftime: %v, interval: %v); new duration: %v " ,
984995 p .StreamID ,
985996 jitterBufFactor ,
986997 jitterBufDecayHalftime ,
987998 playerCheckInterval ,
988- )
989- p .CurrentJitterBufDuration = max (
990- time .Duration (float64 (p .CurrentJitterBufDuration )* jitterBufFactor ),
991- minJitterBufDuration ,
999+ p .CurrentJitterBufDuration ,
9921000 )
9931001
994- speed := float64 (1 ) +
1002+ wantSpeed := float64 (1 ) +
9951003 (p .Config .CatchupMaxSpeedFactor - float64 (1 ))*
9961004 (lag - p .CurrentJitterBufDuration ).Seconds ()/
9971005 (p .Config .MaxCatchupAtLag - p .CurrentJitterBufDuration ).Seconds ()
9981006
999- speed = float64 (uint (speed * 100 )) / 100 // to avoid flickering (for example between 1.0001 and 1.0)
1007+ setSpeed := p .WantSpeedAverage .Update (wantSpeed )
1008+ setSpeed = float64 (uint (setSpeed * 50 )) / 50 // to avoid flickering (for example between 1.0001 and 1.0000)
10001009
1001- if speed > p .Config .CatchupMaxSpeedFactor {
1010+ if setSpeed > p .Config .CatchupMaxSpeedFactor {
10021011 logger .Warnf (ctx ,
10031012 "speed is calculated higher than the maximum: %v > %v: (%v-1)*(%v-%v)/(%v-%v); lag calculation: %v - %v" ,
1004- speed ,
1013+ setSpeed ,
10051014 p .Config .CatchupMaxSpeedFactor ,
10061015 p .Config .CatchupMaxSpeedFactor ,
10071016 lag .Seconds (),
@@ -1010,22 +1019,24 @@ func (p *StreamPlayerHandler) controllerLoop(
10101019 p .CurrentJitterBufDuration .Seconds (),
10111020 l , pos ,
10121021 )
1013- speed = p .Config .CatchupMaxSpeedFactor
1022+ setSpeed = p .Config .CatchupMaxSpeedFactor
10141023 }
10151024
1016- if speed != curSpeed {
1017- logger .Debugf (
1018- ctx ,
1019- "StreamPlayer[%s].controllerLoop: setting the speed to %v: lag: %v - %v == %v" ,
1020- p .StreamID , speed , l , pos , lag ,
1021- )
1022- err = player .SetSpeed (ctx , speed )
1023- if err != nil {
1024- logger .Errorf (ctx , "unable to set the speed to %v: %v" , speed , err )
1025- return
1026- }
1027- curSpeed = speed
1025+ if setSpeed == curSpeed {
1026+ return
1027+ }
1028+
1029+ logger .Debugf (
1030+ ctx ,
1031+ "StreamPlayer[%s].controllerLoop: setting the speed to %v: lag: %v - %v == %v" ,
1032+ p .StreamID , setSpeed , l , pos , lag ,
1033+ )
1034+ err = player .SetSpeed (ctx , setSpeed )
1035+ if err != nil {
1036+ logger .Errorf (ctx , "unable to set the speed to %v: %v" , setSpeed , err )
1037+ return
10281038 }
1039+ curSpeed = setSpeed
10291040 })
10301041 if err != nil {
10311042 logger .Error (ctx , "unable to get the player: %v" , err )
0 commit comments