diff --git a/.github/workflows/buildtest.yaml b/.github/workflows/buildtest.yaml index ad6f5bf50a6..c41f870e6e8 100644 --- a/.github/workflows/buildtest.yaml +++ b/.github/workflows/buildtest.yaml @@ -45,7 +45,7 @@ jobs: go get github.com/sasha-s/go-deadlock grep -rl sync.Mutex ./pkg | xargs sed -i 's/sync\.Mutex/deadlock\.Mutex/g' grep -rl sync.RWMutex ./pkg | xargs sed -i 's/sync\.RWMutex/deadlock\.RWMutex/g' - go install golang.org/x/tools/cmd/goimports + go install golang.org/x/tools/cmd/goimports@latest grep -rl deadlock.Mutex ./pkg | xargs goimports -w grep -rl deadlock.RWMutex ./pkg | xargs goimports -w go mod tidy diff --git a/go.mod b/go.mod index 5758574cd60..7df71d5600a 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade - github.com/livekit/protocol v1.41.1-0.20250911034601-84953643320b + github.com/livekit/protocol v1.41.1-0.20250911214555-7e8f7f1f9435 github.com/livekit/psrpc v0.7.0 github.com/mackerelio/go-osstat v0.2.6 github.com/magefile/mage v1.15.0 @@ -36,7 +36,7 @@ require ( github.com/pion/ice/v4 v4.0.10 github.com/pion/interceptor v0.1.40 github.com/pion/rtcp v1.2.15 - github.com/pion/rtp v1.8.21 + github.com/pion/rtp v1.8.22 github.com/pion/sctp v1.8.39 github.com/pion/sdp/v3 v3.0.16 github.com/pion/transport/v3 v3.0.7 @@ -44,7 +44,7 @@ require ( github.com/pion/webrtc/v4 v4.1.5-0.20250828044558-c376d0edf977 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.23.0 - github.com/redis/go-redis/v9 v9.12.1 + github.com/redis/go-redis/v9 v9.14.0 github.com/rs/cors v1.11.1 github.com/stretchr/testify v1.11.1 github.com/thoas/go-funk v0.9.3 @@ -55,10 +55,10 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b - golang.org/x/mod v0.27.0 - golang.org/x/sync v0.16.0 - google.golang.org/protobuf v1.36.8 + golang.org/x/exp v0.0.0-20250911091902-df9299821621 + golang.org/x/mod v0.28.0 + golang.org/x/sync v0.17.0 + google.golang.org/protobuf v1.36.9 gopkg.in/yaml.v3 v3.0.1 ) @@ -66,6 +66,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.1.0 // indirect github.com/moby/sys/user v0.3.0 // indirect + github.com/nyaruka/phonenumbers v1.6.5 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel v1.37.0 // indirect go.opentelemetry.io/otel/metric v1.37.0 // indirect @@ -73,7 +74,7 @@ require ( ) require ( - buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 // indirect + buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1 // indirect buf.build/go/protovalidate v0.14.0 // indirect buf.build/go/protoyaml v0.6.0 // indirect cel.dev/expr v0.24.0 // indirect @@ -139,13 +140,13 @@ require ( github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/zap/exp v0.3.0 // indirect - golang.org/x/crypto v0.41.0 // indirect - golang.org/x/net v0.43.0 // indirect - golang.org/x/sys v0.35.0 // indirect - golang.org/x/text v0.28.0 // indirect - golang.org/x/tools v0.36.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250826171959-ef028d996bc1 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 // indirect - google.golang.org/grpc v1.75.0 // indirect + golang.org/x/crypto v0.42.0 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sys v0.36.0 // indirect + golang.org/x/text v0.29.0 // indirect + golang.org/x/tools v0.37.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 // indirect + google.golang.org/grpc v1.75.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index e811df54f8e..fe171485a13 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 h1:sjY1k5uszbIZfv11HO2keV4SLhNA47SabPO886v7Rvo= -buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1/go.mod h1:8EQ5GzyGJQ5tEIwMSxCl8RKJYsjCpAwkdcENoioXT6g= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1 h1:u98oQG8CHYBrOWrYdqbyNpKz4Pw02ssv03DsTInnXn8= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250717185734-6c6e0d3c608e.1/go.mod h1:aY3zbkNan5F+cGm9lITDP6oxJIwu0dn9KjJuJjWaHkg= buf.build/go/protovalidate v0.14.0 h1:kr/rC/no+DtRyYX+8KXLDxNnI1rINz0imk5K44ZpZ3A= buf.build/go/protovalidate v0.14.0/go.mod h1:+F/oISho9MO7gJQNYC2VWLzcO1fTPmaTA08SDYJZncA= buf.build/go/protoyaml v0.6.0 h1:Nzz1lvcXF8YgNZXk+voPPwdU8FjDPTUV4ndNTXN0n2w= @@ -171,8 +171,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade h1:lpxPcglwzUWNB4J0S2qZuyMehzmR7vW9whzSwV4IGoI= github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= -github.com/livekit/protocol v1.41.1-0.20250911034601-84953643320b h1:xa2nqUsAnDip5YfzaEhdGqfsyHlJJfhxs2MTw0DJst0= -github.com/livekit/protocol v1.41.1-0.20250911034601-84953643320b/go.mod h1:sZNL/CLRtAEy4AcuUsab/pNSQ/StM0KstU5foHMk1jM= +github.com/livekit/protocol v1.41.1-0.20250911214555-7e8f7f1f9435 h1:Kb7ezfpKQsL9c4xZmME40or6nnbpemmsr/VEviZzbKQ= +github.com/livekit/protocol v1.41.1-0.20250911214555-7e8f7f1f9435/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A= github.com/livekit/psrpc v0.7.0 h1:rtfqfjYN06WJYloE/S0nmkJ/Y04x4pxLQLe8kQ4FVHU= github.com/livekit/psrpc v0.7.0/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk= github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0= @@ -222,6 +222,8 @@ github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nyaruka/phonenumbers v1.6.5 h1:aBCaUhfpRA7hU6fsXk+p7KF1aNx4nQlq9hGeo2qdFg8= +github.com/nyaruka/phonenumbers v1.6.5/go.mod h1:7gjs+Lchqm49adhAKB5cdcng5ZXgt6x7Jgvi0ZorUtU= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y= @@ -250,8 +252,8 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= -github.com/pion/rtp v1.8.21 h1:3yrOwmZFyUpcIosNcWRpQaU+UXIJ6yxLuJ8Bx0mw37Y= -github.com/pion/rtp v1.8.21/go.mod h1:bAu2UFKScgzyFqvUKmbvzSdPr+NGbZtv6UB2hesqXBk= +github.com/pion/rtp v1.8.22 h1:8NCVDDF+uSJmMUkjLJVnIr/HX7gPesyMV1xFt5xozXc= +github.com/pion/rtp v1.8.22/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM= github.com/pion/sctp v1.8.39 h1:PJma40vRHa3UTO3C4MyeJDQ+KIobVYRZQZ0Nt7SjQnE= github.com/pion/sctp v1.8.39/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= github.com/pion/sdp/v3 v3.0.16 h1:0dKzYO6gTAvuLaAKQkC02eCPjMIi4NuAr/ibAwrGDCo= @@ -280,8 +282,8 @@ github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzM github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++5Fg= github.com/puzpuzpuz/xsync/v3 v3.5.1/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= -github.com/redis/go-redis/v9 v9.12.1 h1:k5iquqv27aBtnTm2tIkROUDp8JBXhXZIVu1InSgvovg= -github.com/redis/go-redis/v9 v9.12.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE= +github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= @@ -363,18 +365,18 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= -golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= +golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= +golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/exp v0.0.0-20250911091902-df9299821621 h1:2id6c1/gto0kaHYyrixvknJ8tUK/Qs5IsmBtrc+FtgU= +golang.org/x/exp v0.0.0-20250911091902-df9299821621/go.mod h1:TwQYMMnGpvZyc+JpB/UAuTNIsVJifOlSkrZkhcvpVUk= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= -golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ= -golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc= +golang.org/x/mod v0.28.0 h1:gQBtGhjxykdjY9YhZpSlZIsbnaE2+PgjfLWUQTnoZ1U= +golang.org/x/mod v0.28.0/go.mod h1:yfB/L0NOf/kmEbXjzCPOx1iK1fRutOydrCMsqRhEBxI= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -397,8 +399,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= -golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -407,8 +409,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= -golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -442,8 +444,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -459,8 +461,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= -golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -469,22 +471,22 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= -golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= -golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= +golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE= +golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= -google.golang.org/genproto/googleapis/api v0.0.0-20250826171959-ef028d996bc1 h1:APHvLLYBhtZvsbnpkfknDZ7NyH4z5+ub/I0u8L3Oz6g= -google.golang.org/genproto/googleapis/api v0.0.0-20250826171959-ef028d996bc1/go.mod h1:xUjFWUnWDpZ/C0Gu0qloASKFb6f8/QXiiXhSPFsD668= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1 h1:pmJpJEvT846VzausCQ5d7KreSROcDqmO388w5YbnltA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250826171959-ef028d996bc1/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og= -google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4= -google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= -google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= -google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 h1:d8Nakh1G+ur7+P3GcMjpRDEkoLUcLW2iU92XVqR+XMQ= +google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090/go.mod h1:U8EXRNSd8sUYyDfs/It7KVWodQr+Hf9xtxyxWudSwEw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 h1:/OQuEa4YWtDt7uQWHd3q3sUMb+QOLQUg1xa8CEsRv5w= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090/go.mod h1:GmFNa4BdJZ2a8G+wCe9Bg3wwThLrJun751XstdJt5Og= +google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= +google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= +google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/pkg/rtc/dynacast/dynacastmanager_test.go b/pkg/rtc/dynacast/dynacastmanager_test.go index 8a30d7e9791..9bb876db0be 100644 --- a/pkg/rtc/dynacast/dynacastmanager_test.go +++ b/pkg/rtc/dynacast/dynacastmanager_test.go @@ -27,16 +27,37 @@ import ( "github.com/livekit/protocol/livekit" ) -func TestSubscribedMaxQuality(t *testing.T) { +type testDynacastManagerListener struct { + onSubscribedMaxQualityChange func(subscribedQualties []*livekit.SubscribedCodec) + onSubscribedAudioCodecChange func(subscribedCodecs []*livekit.SubscribedAudioCodec) +} + +func (t *testDynacastManagerListener) OnDynacastSubscribedMaxQualityChange( + subscribedQualities []*livekit.SubscribedCodec, + _maxSubscribedQualities []types.SubscribedCodecQuality, +) { + t.onSubscribedMaxQualityChange(subscribedQualities) +} + +func (t *testDynacastManagerListener) OnDynacastSubscribedAudioCodecChange( + codecs []*livekit.SubscribedAudioCodec, +) { + t.onSubscribedAudioCodecChange(codecs) +} +func TestSubscribedMaxQuality(t *testing.T) { t.Run("subscribers muted", func(t *testing.T) { - dm := NewDynacastManager(DynacastManagerParams{}) var lock sync.Mutex actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0) - dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) { - lock.Lock() - actualSubscribedQualities = subscribedQualities - lock.Unlock() + + dm := NewDynacastManagerVideo(DynacastManagerVideoParams{ + Listener: &testDynacastManagerListener{ + onSubscribedMaxQualityChange: func(subscribedQualities []*livekit.SubscribedCodec) { + lock.Lock() + actualSubscribedQualities = subscribedQualities + lock.Unlock() + }, + }, }) dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeVP8, livekit.VideoQuality_HIGH) @@ -72,21 +93,20 @@ func TestSubscribedMaxQuality(t *testing.T) { }) t.Run("subscribers max quality", func(t *testing.T) { - dm := NewDynacastManager(DynacastManagerParams{ - DynacastPauseDelay: 100 * time.Millisecond, - }) - lock := sync.RWMutex{} - lock.Lock() actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0) - lock.Unlock() - dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) { - lock.Lock() - actualSubscribedQualities = subscribedQualities - lock.Unlock() + + dm := NewDynacastManagerVideo(DynacastManagerVideoParams{ + Listener: &testDynacastManagerListener{ + onSubscribedMaxQualityChange: func(subscribedQualities []*livekit.SubscribedCodec) { + lock.Lock() + actualSubscribedQualities = subscribedQualities + lock.Unlock() + }, + }, }) - dm.maxSubscribedQuality = map[mime.MimeType]livekit.VideoQuality{ + dm.(*dynacastManagerVideo).maxSubscribedQuality = map[mime.MimeType]livekit.VideoQuality{ mime.MimeTypeVP8: livekit.VideoQuality_LOW, mime.MimeTypeAV1: livekit.VideoQuality_LOW, } @@ -279,91 +299,202 @@ func TestSubscribedMaxQuality(t *testing.T) { } func TestCodecRegression(t *testing.T) { - dm := NewDynacastManager(DynacastManagerParams{}) - var lock sync.Mutex - actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0) - dm.OnSubscribedMaxQualityChange(func(subscribedQualities []*livekit.SubscribedCodec, _maxSubscribedQualities []types.SubscribedCodecQuality) { - lock.Lock() - actualSubscribedQualities = subscribedQualities - lock.Unlock() - }) + t.Run("codec regression video", func(t *testing.T) { + var lock sync.Mutex + actualSubscribedQualities := make([]*livekit.SubscribedCodec, 0) - dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeAV1, livekit.VideoQuality_HIGH) + dm := NewDynacastManagerVideo(DynacastManagerVideoParams{ + Listener: &testDynacastManagerListener{ + onSubscribedMaxQualityChange: func(subscribedQualities []*livekit.SubscribedCodec) { + lock.Lock() + actualSubscribedQualities = subscribedQualities + lock.Unlock() + }, + }, + }) - expectedSubscribedQualities := []*livekit.SubscribedCodec{ - { - Codec: mime.MimeTypeAV1.String(), - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: true}, + dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeAV1, livekit.VideoQuality_HIGH) + + expectedSubscribedQualities := []*livekit.SubscribedCodec{ + { + Codec: mime.MimeTypeAV1.String(), + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: true}, + }, }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - - dm.HandleCodecRegression(mime.MimeTypeAV1, mime.MimeTypeVP8) - - expectedSubscribedQualities = []*livekit.SubscribedCodec{ - { - Codec: mime.MimeTypeAV1.String(), - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: false}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + + dm.HandleCodecRegression(mime.MimeTypeAV1, mime.MimeTypeVP8) + + expectedSubscribedQualities = []*livekit.SubscribedCodec{ + { + Codec: mime.MimeTypeAV1.String(), + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: false}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, }, - }, - { - Codec: mime.MimeTypeVP8.String(), - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: true}, + { + Codec: mime.MimeTypeVP8.String(), + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: true}, + }, }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() - - return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) - - // av1 quality change should be forwarded to vp8 - // av1 quality change of node should be ignored - dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeAV1, livekit.VideoQuality_MEDIUM) - dm.NotifySubscriberNodeMaxQuality("n1", []types.SubscribedCodecQuality{ - {CodecMime: mime.MimeTypeAV1, Quality: livekit.VideoQuality_HIGH}, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) + + // av1 quality change should be forwarded to vp8 + // av1 quality change of node should be ignored + dm.NotifySubscriberMaxQuality("s1", mime.MimeTypeAV1, livekit.VideoQuality_MEDIUM) + dm.NotifySubscriberNodeMaxQuality("n1", []types.SubscribedCodecQuality{ + {CodecMime: mime.MimeTypeAV1, Quality: livekit.VideoQuality_HIGH}, + }) + expectedSubscribedQualities = []*livekit.SubscribedCodec{ + { + Codec: mime.MimeTypeAV1.String(), + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: false}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + { + Codec: mime.MimeTypeVP8.String(), + Qualities: []*livekit.SubscribedQuality{ + {Quality: livekit.VideoQuality_LOW, Enabled: true}, + {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, + {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) + }, 10*time.Second, 100*time.Millisecond) }) - expectedSubscribedQualities = []*livekit.SubscribedCodec{ - { - Codec: mime.MimeTypeAV1.String(), - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: false}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + + t.Run("codec regression audio", func(t *testing.T) { + var lock sync.Mutex + actualSubscribedCodecs := make([]*livekit.SubscribedAudioCodec, 0) + + dm := NewDynacastManagerAudio(DynacastManagerAudioParams{ + Listener: &testDynacastManagerListener{ + onSubscribedAudioCodecChange: func(subscribedCodecs []*livekit.SubscribedAudioCodec) { + lock.Lock() + actualSubscribedCodecs = subscribedCodecs + lock.Unlock() + }, }, - }, - { - Codec: mime.MimeTypeVP8.String(), - Qualities: []*livekit.SubscribedQuality{ - {Quality: livekit.VideoQuality_LOW, Enabled: true}, - {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, - {Quality: livekit.VideoQuality_HIGH, Enabled: false}, + }) + + dm.NotifySubscription("s1", mime.MimeTypeRED, true) + + expectedSubscribedCodecs := []*livekit.SubscribedAudioCodec{ + { + Codec: mime.MimeTypeRED.String(), + Enabled: true, }, - }, - } - require.Eventually(t, func() bool { - lock.Lock() - defer lock.Unlock() + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs) + }, 10*time.Second, 100*time.Millisecond) + + dm.HandleCodecRegression(mime.MimeTypeRED, mime.MimeTypeOpus) + + expectedSubscribedCodecs = []*livekit.SubscribedAudioCodec{ + { + Codec: mime.MimeTypeRED.String(), + Enabled: false, + }, + { + Codec: mime.MimeTypeOpus.String(), + Enabled: true, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs) + }, 10*time.Second, 100*time.Millisecond) - return subscribedCodecsAsString(expectedSubscribedQualities) == subscribedCodecsAsString(actualSubscribedQualities) - }, 10*time.Second, 100*time.Millisecond) + // RED disable as subscriber or subscriber node should be ignored as it has been regressed + dm.NotifySubscription("s1", mime.MimeTypeRED, false) + dm.NotifySubscriptionNode("n1", []*livekit.SubscribedAudioCodec{ + {Codec: mime.MimeTypeRED.String(), Enabled: false}, + }) + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs) + }, 10*time.Second, 100*time.Millisecond) + + // `s1` unsubscription should turn off `opus` + dm.NotifySubscription("s1", mime.MimeTypeOpus, false) + expectedSubscribedCodecs = []*livekit.SubscribedAudioCodec{ + { + Codec: mime.MimeTypeRED.String(), + Enabled: false, + }, + { + Codec: mime.MimeTypeOpus.String(), + Enabled: false, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs) + }, 10*time.Second, 100*time.Millisecond) + + // a node subscription should turn `opus` back on + dm.NotifySubscriptionNode("n1", []*livekit.SubscribedAudioCodec{ + { + Codec: mime.MimeTypeOpus.String(), + Enabled: true, + }, + }) + expectedSubscribedCodecs = []*livekit.SubscribedAudioCodec{ + { + Codec: mime.MimeTypeRED.String(), + Enabled: false, + }, + { + Codec: mime.MimeTypeOpus.String(), + Enabled: true, + }, + } + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + + return subscribedAudioCodecsAsString(expectedSubscribedCodecs) == subscribedAudioCodecsAsString(actualSubscribedCodecs) + }, 10*time.Second, 100*time.Millisecond) + + }) } func subscribedCodecsAsString(c1 []*livekit.SubscribedCodec) string { @@ -374,3 +505,12 @@ func subscribedCodecsAsString(c1 []*livekit.SubscribedCodec) string { } return s1 } + +func subscribedAudioCodecsAsString(c1 []*livekit.SubscribedAudioCodec) string { + sort.Slice(c1, func(i, j int) bool { return c1[i].Codec < c1[j].Codec }) + var s1 string + for _, c := range c1 { + s1 += c.String() + } + return s1 +} diff --git a/pkg/rtc/dynacast/dynacastmanageraudio.go b/pkg/rtc/dynacast/dynacastmanageraudio.go new file mode 100644 index 00000000000..542a58a1d83 --- /dev/null +++ b/pkg/rtc/dynacast/dynacastmanageraudio.go @@ -0,0 +1,198 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dynacast + +import ( + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + + "github.com/livekit/livekit-server/pkg/sfu/mime" +) + +var _ DynacastManager = (*dynacastManagerAudio)(nil) +var _ dynacastQualityListener = (*dynacastManagerAudio)(nil) + +type DynacastManagerAudioParams struct { + Listener DynacastManagerListener + Logger logger.Logger +} + +type dynacastManagerAudio struct { + params DynacastManagerAudioParams + + subscribedCodecs map[mime.MimeType]bool + committedSubscribedCodecs map[mime.MimeType]bool + + isClosed bool + + *dynacastManagerBase +} + +func NewDynacastManagerAudio(params DynacastManagerAudioParams) DynacastManager { + if params.Logger == nil { + params.Logger = logger.GetLogger() + } + d := &dynacastManagerAudio{ + params: params, + subscribedCodecs: make(map[mime.MimeType]bool), + committedSubscribedCodecs: make(map[mime.MimeType]bool), + } + d.dynacastManagerBase = newDynacastManagerBase(dynacastManagerBaseParams{ + Logger: params.Logger, + OpsQueueDepth: 4, + OnRestart: func() { + d.committedSubscribedCodecs = make(map[mime.MimeType]bool) + }, + OnDynacastQualityCreate: func(mimeType mime.MimeType) dynacastQuality { + dq := newDynacastQualityAudio(dynacastQualityAudioParams{ + MimeType: mimeType, + Listener: d, + Logger: d.params.Logger, + }) + return dq + }, + OnRegressCodec: func(fromMime, toMime mime.MimeType) { + d.subscribedCodecs[fromMime] = false + + // if the new codec is not added, notify the publisher to start publishing + if _, ok := d.subscribedCodecs[toMime]; !ok { + d.subscribedCodecs[toMime] = true + } + }, + OnUpdateNeeded: d.update, + }) + return d +} + +// It is possible for tracks to be in pending close state. When track +// is waiting to be closed, a node is not streaming a track. This can +// be used to force an update announcing that subscribed codec is disabled, +// i.e. indicating not pulling track any more. +func (d *dynacastManagerAudio) ForceEnable(enabled bool) { + d.lock.Lock() + defer d.lock.Unlock() + + for mime := range d.committedSubscribedCodecs { + d.committedSubscribedCodecs[mime] = enabled + } + + d.enqueueSubscribedChange() +} + +func (d *dynacastManagerAudio) NotifySubscription( + subscriberID livekit.ParticipantID, + mime mime.MimeType, + enabled bool, +) { + dq := d.getOrCreateDynacastQuality(mime) + if dq != nil { + dq.NotifySubscription(subscriberID, enabled) + } +} + +func (d *dynacastManagerAudio) NotifySubscriptionNode( + nodeID livekit.NodeID, + codecs []*livekit.SubscribedAudioCodec, +) { + for _, codec := range codecs { + dq := d.getOrCreateDynacastQuality(mime.NormalizeMimeType(codec.Codec)) + if dq != nil { + dq.NotifySubscriptionNode(nodeID, codec.Enabled) + } + } +} + +func (d *dynacastManagerAudio) OnUpdateAudioCodecForMime(mime mime.MimeType, enabled bool) { + d.lock.Lock() + if _, ok := d.regressedCodec[mime]; !ok { + d.subscribedCodecs[mime] = enabled + } + d.lock.Unlock() + + d.update(false) +} + +func (d *dynacastManagerAudio) update(force bool) { + d.lock.Lock() + + d.params.Logger.Debugw( + "processing subscribed codec change", + "force", force, + "committedSubscribedCodecs", d.committedSubscribedCodecs, + "subscribedCodecs", d.subscribedCodecs, + ) + + if len(d.subscribedCodecs) == 0 { + // no mime has been added, nothing to update + d.lock.Unlock() + return + } + + // add or remove of a mime triggers an update + changed := len(d.subscribedCodecs) != len(d.committedSubscribedCodecs) + if !changed { + for mime, enabled := range d.subscribedCodecs { + if ce, ok := d.committedSubscribedCodecs[mime]; ok { + if ce != enabled { + changed = true + break + } + } + } + } + + if !force && !changed { + d.lock.Unlock() + return + } + + d.params.Logger.Debugw( + "committing subscribed codec change", + "force", force, + "committedSubscribedCoecs", d.committedSubscribedCodecs, + "subscribedcodecs", d.subscribedCodecs, + ) + + // commit change + d.committedSubscribedCodecs = make(map[mime.MimeType]bool, len(d.subscribedCodecs)) + for mime, enabled := range d.subscribedCodecs { + d.committedSubscribedCodecs[mime] = enabled + } + + d.enqueueSubscribedChange() + d.lock.Unlock() +} + +func (d *dynacastManagerAudio) enqueueSubscribedChange() { + if d.isClosed || d.params.Listener == nil { + return + } + + subscribedCodecs := make([]*livekit.SubscribedAudioCodec, 0, len(d.committedSubscribedCodecs)) + for mime, enabled := range d.committedSubscribedCodecs { + subscribedCodecs = append(subscribedCodecs, &livekit.SubscribedAudioCodec{ + Codec: mime.String(), + Enabled: enabled, + }) + } + + d.params.Logger.Debugw( + "subscribedAudioCodecChange", + "subscribedCodecs", logger.ProtoSlice(subscribedCodecs), + ) + d.notifyOpsQueue.Enqueue(func() { + d.params.Listener.OnDynacastSubscribedAudioCodecChange(subscribedCodecs) + }) +} diff --git a/pkg/rtc/dynacast/dynacastmanagerbase.go b/pkg/rtc/dynacast/dynacastmanagerbase.go new file mode 100644 index 00000000000..94812acbcb0 --- /dev/null +++ b/pkg/rtc/dynacast/dynacastmanagerbase.go @@ -0,0 +1,165 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dynacast + +import ( + "sync" + + "golang.org/x/exp/maps" + + "github.com/livekit/protocol/logger" + + "github.com/livekit/livekit-server/pkg/sfu/mime" + "github.com/livekit/livekit-server/pkg/utils" +) + +type dynacastManagerBaseParams struct { + Logger logger.Logger + OpsQueueDepth uint + OnRestart func() + OnDynacastQualityCreate func(mimeType mime.MimeType) dynacastQuality + OnRegressCodec func(fromMime, toMime mime.MimeType) + OnUpdateNeeded func(force bool) +} + +type dynacastManagerBase struct { + params dynacastManagerBaseParams + + lock sync.RWMutex + regressedCodec map[mime.MimeType]struct{} + dynacastQuality map[mime.MimeType]dynacastQuality + + notifyOpsQueue *utils.OpsQueue + + isClosed bool + + dynacastManagerNull + dynacastQualityListenerNull +} + +func newDynacastManagerBase(params dynacastManagerBaseParams) *dynacastManagerBase { + if params.OpsQueueDepth == 0 { + params.OpsQueueDepth = 4 + } + d := &dynacastManagerBase{ + params: params, + regressedCodec: make(map[mime.MimeType]struct{}), + dynacastQuality: make(map[mime.MimeType]dynacastQuality), + notifyOpsQueue: utils.NewOpsQueue(utils.OpsQueueParams{ + Name: "dynacast-notify", + MinSize: params.OpsQueueDepth, + FlushOnStop: true, + Logger: params.Logger, + }), + } + d.notifyOpsQueue.Start() + return d +} + +func (d *dynacastManagerBase) AddCodec(mime mime.MimeType) { + d.getOrCreateDynacastQuality(mime) +} + +func (d *dynacastManagerBase) HandleCodecRegression(fromMime, toMime mime.MimeType) { + fromDq := d.getOrCreateDynacastQuality(fromMime) + + d.lock.Lock() + if d.isClosed { + d.lock.Unlock() + return + } + + if fromDq == nil { + // should not happen as we have added the codec on setup receiver + d.params.Logger.Warnw("regression from codec not found", nil, "mime", fromMime, "toMime", toMime) + d.lock.Unlock() + return + } + d.regressedCodec[fromMime] = struct{}{} + d.params.OnRegressCodec(fromMime, toMime) + d.lock.Unlock() + + d.params.OnUpdateNeeded(false) + + fromDq.Stop() + fromDq.RegressTo(d.getOrCreateDynacastQuality(toMime)) +} + +func (d *dynacastManagerBase) Restart() { + d.lock.Lock() + d.params.OnRestart() + + dqs := d.getDynacastQualitiesLocked() + d.lock.Unlock() + + for _, dq := range dqs { + dq.Restart() + } +} + +func (d *dynacastManagerBase) Close() { + d.notifyOpsQueue.Stop() + + d.lock.Lock() + dqs := d.getDynacastQualitiesLocked() + d.dynacastQuality = make(map[mime.MimeType]dynacastQuality) + + d.isClosed = true + d.lock.Unlock() + + for _, dq := range dqs { + dq.Stop() + } +} + +// There are situations like track unmute or streaming from a different node +// where subscription changes needs to sent to the provider immediately. +// This bypasses any debouncing and forces a subscription change update +// with immediate effect. +func (d *dynacastManagerBase) ForceUpdate() { + d.params.OnUpdateNeeded(true) +} + +func (d *dynacastManagerBase) ClearSubscriberNodes() { + d.lock.Lock() + dqs := d.getDynacastQualitiesLocked() + d.lock.Unlock() + for _, dq := range dqs { + dq.ClearSubscriberNodes() + } +} + +func (d *dynacastManagerBase) getOrCreateDynacastQuality(mimeType mime.MimeType) dynacastQuality { + d.lock.Lock() + defer d.lock.Unlock() + + if d.isClosed || mimeType == mime.MimeTypeUnknown { + return nil + } + + if dq := d.dynacastQuality[mimeType]; dq != nil { + return dq + } + + dq := d.params.OnDynacastQualityCreate(mimeType) + dq.Start() + + d.dynacastQuality[mimeType] = dq + return dq +} + +func (d *dynacastManagerBase) getDynacastQualitiesLocked() []dynacastQuality { + return maps.Values(d.dynacastQuality) +} diff --git a/pkg/rtc/dynacast/dynacastmanager.go b/pkg/rtc/dynacast/dynacastmanagervideo.go similarity index 55% rename from pkg/rtc/dynacast/dynacastmanager.go rename to pkg/rtc/dynacast/dynacastmanagervideo.go index 4f7d4e50b21..93120c84e5a 100644 --- a/pkg/rtc/dynacast/dynacastmanager.go +++ b/pkg/rtc/dynacast/dynacastmanagervideo.go @@ -15,148 +15,84 @@ package dynacast import ( - "sync" "time" "github.com/bep/debounce" - "golang.org/x/exp/maps" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu/mime" - "github.com/livekit/livekit-server/pkg/utils" ) -type DynacastManagerParams struct { +var _ DynacastManager = (*dynacastManagerVideo)(nil) +var _ dynacastQualityListener = (*dynacastManagerVideo)(nil) + +type DynacastManagerVideoParams struct { DynacastPauseDelay time.Duration + Listener DynacastManagerListener Logger logger.Logger } -type DynacastManager struct { - params DynacastManagerParams +type dynacastManagerVideo struct { + params DynacastManagerVideoParams - lock sync.RWMutex - regressedCodec map[mime.MimeType]struct{} - dynacastQuality map[mime.MimeType]*DynacastQuality maxSubscribedQuality map[mime.MimeType]livekit.VideoQuality committedMaxSubscribedQuality map[mime.MimeType]livekit.VideoQuality maxSubscribedQualityDebounce func(func()) maxSubscribedQualityDebouncePending bool - qualityNotifyOpQueue *utils.OpsQueue - isClosed bool - onSubscribedMaxQualityChange func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) + *dynacastManagerBase } -func NewDynacastManager(params DynacastManagerParams) *DynacastManager { +func NewDynacastManagerVideo(params DynacastManagerVideoParams) DynacastManager { if params.Logger == nil { params.Logger = logger.GetLogger() } - d := &DynacastManager{ + d := &dynacastManagerVideo{ params: params, - regressedCodec: make(map[mime.MimeType]struct{}), - dynacastQuality: make(map[mime.MimeType]*DynacastQuality), maxSubscribedQuality: make(map[mime.MimeType]livekit.VideoQuality), committedMaxSubscribedQuality: make(map[mime.MimeType]livekit.VideoQuality), - qualityNotifyOpQueue: utils.NewOpsQueue(utils.OpsQueueParams{ - Name: "quality-notify", - MinSize: 64, - FlushOnStop: true, - Logger: params.Logger, - }), } if params.DynacastPauseDelay > 0 { d.maxSubscribedQualityDebounce = debounce.New(params.DynacastPauseDelay) } - d.qualityNotifyOpQueue.Start() + d.dynacastManagerBase = newDynacastManagerBase(dynacastManagerBaseParams{ + Logger: params.Logger, + OpsQueueDepth: 64, + OnRestart: func() { + d.committedMaxSubscribedQuality = make(map[mime.MimeType]livekit.VideoQuality) + }, + OnDynacastQualityCreate: func(mimeType mime.MimeType) dynacastQuality { + dq := newDynacastQualityVideo(dynacastQualityVideoParams{ + MimeType: mimeType, + Listener: d, + Logger: d.params.Logger, + }) + return dq + }, + OnRegressCodec: func(fromMime, toMime mime.MimeType) { + d.maxSubscribedQuality[fromMime] = livekit.VideoQuality_OFF + + // if the new codec is not added, notify the publisher to start publishing + if _, ok := d.maxSubscribedQuality[toMime]; !ok { + d.maxSubscribedQuality[toMime] = livekit.VideoQuality_HIGH + } + }, + OnUpdateNeeded: d.update, + }) return d } -func (d *DynacastManager) OnSubscribedMaxQualityChange(f func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality)) { - d.lock.Lock() - d.onSubscribedMaxQualityChange = f - d.lock.Unlock() -} - -func (d *DynacastManager) AddCodec(mime mime.MimeType) { - d.getOrCreateDynacastQuality(mime) -} - -func (d *DynacastManager) HandleCodecRegression(fromMime, toMime mime.MimeType) { - fromDq := d.getOrCreateDynacastQuality(fromMime) - - d.lock.Lock() - if d.isClosed { - d.lock.Unlock() - return - } - - if fromDq == nil { - // should not happen as we have added the codec on setup receiver - d.params.Logger.Warnw("regression from codec not found", nil, "mime", fromMime) - d.lock.Unlock() - return - } - d.regressedCodec[fromMime] = struct{}{} - d.maxSubscribedQuality[fromMime] = livekit.VideoQuality_OFF - - // if the new codec is not added, notify the publisher to start publishing - if _, ok := d.maxSubscribedQuality[toMime]; !ok { - d.maxSubscribedQuality[toMime] = livekit.VideoQuality_HIGH - } - - d.lock.Unlock() - d.update(false) - - fromDq.Stop() - fromDq.RegressTo(d.getOrCreateDynacastQuality(toMime)) -} - -func (d *DynacastManager) Restart() { - d.lock.Lock() - d.committedMaxSubscribedQuality = make(map[mime.MimeType]livekit.VideoQuality) - - dqs := d.getDynacastQualitiesLocked() - d.lock.Unlock() - - for _, dq := range dqs { - dq.Restart() - } -} - -func (d *DynacastManager) Close() { - d.qualityNotifyOpQueue.Stop() - - d.lock.Lock() - dqs := d.getDynacastQualitiesLocked() - d.dynacastQuality = make(map[mime.MimeType]*DynacastQuality) - - d.isClosed = true - d.lock.Unlock() - - for _, dq := range dqs { - dq.Stop() - } -} - -// THere are situations like track unmute or streaming from a different node -// where subscribed quality needs to sent to the provider immediately. -// This bypasses any debouncing and forces a subscribed quality update -// with immediate effect. -func (d *DynacastManager) ForceUpdate() { - d.update(true) -} - // It is possible for tracks to be in pending close state. When track // is waiting to be closed, a node is not streaming a track. This can // be used to force an update announcing that subscribed quality is OFF, // i.e. indicating not pulling track any more. -func (d *DynacastManager) ForceQuality(quality livekit.VideoQuality) { +func (d *dynacastManagerVideo) ForceQuality(quality livekit.VideoQuality) { d.lock.Lock() defer d.lock.Unlock() @@ -167,14 +103,21 @@ func (d *DynacastManager) ForceQuality(quality livekit.VideoQuality) { d.enqueueSubscribedQualityChange() } -func (d *DynacastManager) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, mime mime.MimeType, quality livekit.VideoQuality) { +func (d *dynacastManagerVideo) NotifySubscriberMaxQuality( + subscriberID livekit.ParticipantID, + mime mime.MimeType, + quality livekit.VideoQuality, +) { dq := d.getOrCreateDynacastQuality(mime) if dq != nil { dq.NotifySubscriberMaxQuality(subscriberID, quality) } } -func (d *DynacastManager) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) { +func (d *dynacastManagerVideo) NotifySubscriberNodeMaxQuality( + nodeID livekit.NodeID, + qualities []types.SubscribedCodecQuality, +) { for _, quality := range qualities { dq := d.getOrCreateDynacastQuality(quality.CodecMime) if dq != nil { @@ -183,43 +126,10 @@ func (d *DynacastManager) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, } } -func (d *DynacastManager) ClearSubscriberNodesMaxQuality() { - d.lock.Lock() - dqs := d.getDynacastQualitiesLocked() - d.lock.Unlock() - for _, dq := range dqs { - dq.ClearSubscriberNodesMaxQuality() - } -} - -func (d *DynacastManager) getOrCreateDynacastQuality(mimeType mime.MimeType) *DynacastQuality { - d.lock.Lock() - defer d.lock.Unlock() - - if d.isClosed || mimeType == mime.MimeTypeUnknown { - return nil - } - - if dq := d.dynacastQuality[mimeType]; dq != nil { - return dq - } - - dq := NewDynacastQuality(DynacastQualityParams{ - MimeType: mimeType, - Logger: d.params.Logger, - }) - dq.OnSubscribedMaxQualityChange(d.updateMaxQualityForMime) - dq.Start() - - d.dynacastQuality[mimeType] = dq - return dq -} - -func (d *DynacastManager) getDynacastQualitiesLocked() []*DynacastQuality { - return maps.Values(d.dynacastQuality) -} - -func (d *DynacastManager) updateMaxQualityForMime(mime mime.MimeType, maxQuality livekit.VideoQuality) { +func (d *dynacastManagerVideo) OnUpdateMaxQualityForMime( + mime mime.MimeType, + maxQuality livekit.VideoQuality, +) { d.lock.Lock() if _, ok := d.regressedCodec[mime]; !ok { d.maxSubscribedQuality[mime] = maxQuality @@ -229,10 +139,11 @@ func (d *DynacastManager) updateMaxQualityForMime(mime mime.MimeType, maxQuality d.update(false) } -func (d *DynacastManager) update(force bool) { +func (d *dynacastManagerVideo) update(force bool) { d.lock.Lock() - d.params.Logger.Debugw("processing quality change", + d.params.Logger.Debugw( + "processing quality change", "force", force, "committedMaxSubscribedQuality", d.committedMaxSubscribedQuality, "maxSubscribedQuality", d.maxSubscribedQuality, @@ -269,7 +180,8 @@ func (d *DynacastManager) update(force bool) { if downgradesOnly && d.maxSubscribedQualityDebounce != nil { if !d.maxSubscribedQualityDebouncePending { - d.params.Logger.Debugw("debouncing quality downgrade", + d.params.Logger.Debugw( + "debouncing quality downgrade", "committedMaxSubscribedQuality", d.committedMaxSubscribedQuality, "maxSubscribedQuality", d.maxSubscribedQuality, ) @@ -278,7 +190,8 @@ func (d *DynacastManager) update(force bool) { }) d.maxSubscribedQualityDebouncePending = true } else { - d.params.Logger.Debugw("quality downgrade waiting for debounce", + d.params.Logger.Debugw( + "quality downgrade waiting for debounce", "committedMaxSubscribedQuality", d.committedMaxSubscribedQuality, "maxSubscribedQuality", d.maxSubscribedQuality, ) @@ -294,7 +207,8 @@ func (d *DynacastManager) update(force bool) { d.maxSubscribedQualityDebouncePending = false } - d.params.Logger.Debugw("committing quality change", + d.params.Logger.Debugw( + "committing quality change", "force", force, "committedMaxSubscribedQuality", d.committedMaxSubscribedQuality, "maxSubscribedQuality", d.maxSubscribedQuality, @@ -310,8 +224,8 @@ func (d *DynacastManager) update(force bool) { d.lock.Unlock() } -func (d *DynacastManager) enqueueSubscribedQualityChange() { - if d.isClosed || d.onSubscribedMaxQualityChange == nil { +func (d *dynacastManagerVideo) enqueueSubscribedQualityChange() { + if d.isClosed || d.params.Listener == nil { return } @@ -352,7 +266,7 @@ func (d *DynacastManager) enqueueSubscribedQualityChange() { "subscribedCodecs", subscribedCodecs, "maxSubscribedQualities", maxSubscribedQualities, ) - d.qualityNotifyOpQueue.Enqueue(func() { - d.onSubscribedMaxQualityChange(subscribedCodecs, maxSubscribedQualities) + d.notifyOpsQueue.Enqueue(func() { + d.params.Listener.OnDynacastSubscribedMaxQualityChange(subscribedCodecs, maxSubscribedQualities) }) } diff --git a/pkg/rtc/dynacast/dynacastqualityaudio.go b/pkg/rtc/dynacast/dynacastqualityaudio.go new file mode 100644 index 00000000000..2414b5b6630 --- /dev/null +++ b/pkg/rtc/dynacast/dynacastqualityaudio.go @@ -0,0 +1,168 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dynacast + +import ( + "sync" + + "github.com/livekit/livekit-server/pkg/sfu/mime" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +var _ dynacastQuality = (*dynacastQualityAudio)(nil) + +type dynacastQualityAudioParams struct { + MimeType mime.MimeType + Listener dynacastQualityListener + Logger logger.Logger +} + +// dynacastQualityAudio manages enable a single receiver of a media track +type dynacastQualityAudio struct { + params dynacastQualityAudioParams + + // quality level enable/disable + lock sync.RWMutex + initialized bool + subscriberEnables map[livekit.ParticipantID]bool + subscriberNodeEnables map[livekit.NodeID]bool + enabled bool + regressTo dynacastQuality + + dynacastQualityNull +} + +func newDynacastQualityAudio(params dynacastQualityAudioParams) dynacastQuality { + return &dynacastQualityAudio{ + params: params, + subscriberEnables: make(map[livekit.ParticipantID]bool), + subscriberNodeEnables: make(map[livekit.NodeID]bool), + } +} + +func (d *dynacastQualityAudio) Start() { + d.reset() +} + +func (d *dynacastQualityAudio) Restart() { + d.reset() +} + +func (d *dynacastQualityAudio) Stop() { +} + +func (d *dynacastQualityAudio) NotifySubscription(subscriberID livekit.ParticipantID, enabled bool) { + d.params.Logger.Debugw( + "setting subscriber codec enable", + "mime", d.params.MimeType, + "subscriberID", subscriberID, + "enabled", enabled, + ) + + d.lock.Lock() + if r := d.regressTo; r != nil { + d.lock.Unlock() + return + } + + if !enabled { + delete(d.subscriberEnables, subscriberID) + } else { + d.subscriberEnables[subscriberID] = true + } + d.lock.Unlock() + + d.updateQualityChange(false) +} + +func (d *dynacastQualityAudio) NotifySubscriptionNode(nodeID livekit.NodeID, enabled bool) { + d.params.Logger.Debugw( + "setting subscriber node codec enabled", + "mime", d.params.MimeType, + "subscriberNodeID", nodeID, + "enabled", enabled, + ) + + d.lock.Lock() + if r := d.regressTo; r != nil { + // the downstream node will synthesize correct enable (its dynacast manager has codec regression), just ignore it + d.params.Logger.Debugw( + "ignoring node codec change, regressed to another dynacast quality", + "mime", d.params.MimeType, + "regressedMime", d.regressTo.Mime(), + ) + d.lock.Unlock() + return + } + + if !enabled { + delete(d.subscriberNodeEnables, nodeID) + } else { + d.subscriberNodeEnables[nodeID] = true + } + d.lock.Unlock() + + d.updateQualityChange(false) +} + +func (d *dynacastQualityAudio) ClearSubscriberNodes() { + d.lock.Lock() + d.subscriberNodeEnables = make(map[livekit.NodeID]bool) + d.lock.Unlock() + + d.updateQualityChange(false) +} + +func (d *dynacastQualityAudio) Mime() mime.MimeType { + return d.params.MimeType +} + +func (d *dynacastQualityAudio) RegressTo(other dynacastQuality) { + d.lock.Lock() + d.regressTo = other + d.lock.Unlock() + + other.Restart() +} + +func (d *dynacastQualityAudio) reset() { + d.lock.Lock() + d.initialized = false + d.lock.Unlock() +} + +func (d *dynacastQualityAudio) updateQualityChange(force bool) { + d.lock.Lock() + enabled := len(d.subscriberEnables) != 0 || len(d.subscriberNodeEnables) != 0 + if enabled == d.enabled && d.initialized && !force { + d.lock.Unlock() + return + } + + d.initialized = true + d.enabled = enabled + d.params.Logger.Debugw( + "notifying enabled change", + "mime", d.params.MimeType, + "enabled", d.enabled, + "subscriberNodeEnables", d.subscriberNodeEnables, + "subscribedEnables", d.subscriberEnables, + "force", force, + ) + d.lock.Unlock() + + d.params.Listener.OnUpdateAudioCodecForMime(d.params.MimeType, enabled) +} diff --git a/pkg/rtc/dynacast/dynacastquality.go b/pkg/rtc/dynacast/dynacastqualityvideo.go similarity index 67% rename from pkg/rtc/dynacast/dynacastquality.go rename to pkg/rtc/dynacast/dynacastqualityvideo.go index 3e038374e63..47f9f388ce9 100644 --- a/pkg/rtc/dynacast/dynacastquality.go +++ b/pkg/rtc/dynacast/dynacastqualityvideo.go @@ -23,18 +23,21 @@ import ( "github.com/livekit/protocol/logger" ) +var _ dynacastQuality = (*dynacastQualityVideo)(nil) + const ( initialQualityUpdateWait = 10 * time.Second ) -type DynacastQualityParams struct { +type dynacastQualityVideoParams struct { MimeType mime.MimeType + Listener dynacastQualityListener Logger logger.Logger } -// DynacastQuality manages max subscribed quality of a single receiver of a media track -type DynacastQuality struct { - params DynacastQualityParams +// dynacastQualityVideo manages max subscribed quality of a single receiver of a media track +type dynacastQualityVideo struct { + params dynacastQualityVideoParams // quality level enable/disable lock sync.RWMutex @@ -43,38 +46,32 @@ type DynacastQuality struct { maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality maxSubscribedQuality livekit.VideoQuality maxQualityTimer *time.Timer - regressTo *DynacastQuality + regressTo dynacastQuality - onSubscribedMaxQualityChange func(mimeType mime.MimeType, maxSubscribedQuality livekit.VideoQuality) + dynacastQualityNull } -func NewDynacastQuality(params DynacastQualityParams) *DynacastQuality { - return &DynacastQuality{ +func newDynacastQualityVideo(params dynacastQualityVideoParams) dynacastQuality { + return &dynacastQualityVideo{ params: params, maxSubscriberQuality: make(map[livekit.ParticipantID]livekit.VideoQuality), maxSubscriberNodeQuality: make(map[livekit.NodeID]livekit.VideoQuality), } } -func (d *DynacastQuality) Start() { +func (d *dynacastQualityVideo) Start() { d.reset() } -func (d *DynacastQuality) Restart() { +func (d *dynacastQualityVideo) Restart() { d.reset() } -func (d *DynacastQuality) Stop() { +func (d *dynacastQualityVideo) Stop() { d.stopMaxQualityTimer() } -func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(mimeType mime.MimeType, maxSubscribedQuality livekit.VideoQuality)) { - d.lock.Lock() - defer d.lock.Unlock() - d.onSubscribedMaxQualityChange = f -} - -func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) { +func (d *dynacastQualityVideo) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) { d.params.Logger.Debugw( "setting subscriber max quality", "mime", d.params.MimeType, @@ -99,14 +96,7 @@ func (d *DynacastQuality) NotifySubscriberMaxQuality(subscriberID livekit.Partic d.updateQualityChange(false) } -func (d *DynacastQuality) ClearSubscriberNodesMaxQuality() { - d.lock.Lock() - d.maxSubscriberNodeQuality = make(map[livekit.NodeID]livekit.VideoQuality) - d.lock.Unlock() - d.updateQualityChange(false) -} - -func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) { +func (d *dynacastQualityVideo) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) { d.params.Logger.Debugw( "setting subscriber node max quality", "mime", d.params.MimeType, @@ -117,8 +107,12 @@ func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, d.lock.Lock() if r := d.regressTo; r != nil { // the downstream node will synthesize correct quality notify (its dynacast manager has codec regression), just ignore it + d.params.Logger.Debugw( + "ignoring node quality change, regressed to another dynacast quality", + "mime", d.params.MimeType, + "regressedMime", d.regressTo.Mime(), + ) d.lock.Unlock() - r.params.Logger.Debugw("ignoring node quality change, regressed to another dynacast quality", "mime", d.params.MimeType) return } @@ -132,7 +126,19 @@ func (d *DynacastQuality) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, d.updateQualityChange(false) } -func (d *DynacastQuality) RegressTo(other *DynacastQuality) { +func (d *dynacastQualityVideo) ClearSubscriberNodes() { + d.lock.Lock() + d.maxSubscriberNodeQuality = make(map[livekit.NodeID]livekit.VideoQuality) + d.lock.Unlock() + + d.updateQualityChange(false) +} + +func (d *dynacastQualityVideo) Mime() mime.MimeType { + return d.params.MimeType +} + +func (d *dynacastQualityVideo) RegressTo(other dynacastQuality) { d.lock.Lock() d.regressTo = other maxSubscriberQuality := d.maxSubscriberQuality @@ -141,33 +147,41 @@ func (d *DynacastQuality) RegressTo(other *DynacastQuality) { d.maxSubscriberNodeQuality = make(map[livekit.NodeID]livekit.VideoQuality) d.lock.Unlock() - other.lock.Lock() + other.Replace(maxSubscriberQuality, maxSubscriberNodeQuality) +} + +func (d *dynacastQualityVideo) Replace( + maxSubscriberQuality map[livekit.ParticipantID]livekit.VideoQuality, + maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality, +) { + d.lock.Lock() for subID, quality := range maxSubscriberQuality { - if otherQuality, ok := other.maxSubscriberQuality[subID]; ok { + if oldQuality, ok := d.maxSubscriberQuality[subID]; ok { // no QUALITY_OFF in the map - if quality > otherQuality { - other.maxSubscriberQuality[subID] = quality + if quality > oldQuality { + d.maxSubscriberQuality[subID] = quality } } else { - other.maxSubscriberQuality[subID] = quality + d.maxSubscriberQuality[subID] = quality } } for nodeID, quality := range maxSubscriberNodeQuality { - if otherQuality, ok := other.maxSubscriberNodeQuality[nodeID]; ok { + if oldQuality, ok := d.maxSubscriberNodeQuality[nodeID]; ok { // no QUALITY_OFF in the map - if quality > otherQuality { - other.maxSubscriberNodeQuality[nodeID] = quality + if quality > oldQuality { + d.maxSubscriberNodeQuality[nodeID] = quality } } else { - other.maxSubscriberNodeQuality[nodeID] = quality + d.maxSubscriberNodeQuality[nodeID] = quality } } - other.lock.Unlock() - other.Restart() + d.lock.Unlock() + + d.Restart() } -func (d *DynacastQuality) reset() { +func (d *dynacastQualityVideo) reset() { d.lock.Lock() d.initialized = false d.lock.Unlock() @@ -175,7 +189,7 @@ func (d *DynacastQuality) reset() { d.startMaxQualityTimer() } -func (d *DynacastQuality) updateQualityChange(force bool) { +func (d *dynacastQualityVideo) updateQualityChange(force bool) { d.lock.Lock() maxSubscribedQuality := livekit.VideoQuality_OFF for _, subQuality := range d.maxSubscriberQuality { @@ -196,22 +210,20 @@ func (d *DynacastQuality) updateQualityChange(force bool) { d.initialized = true d.maxSubscribedQuality = maxSubscribedQuality - d.params.Logger.Debugw("notifying quality change", + d.params.Logger.Debugw( + "notifying quality change", "mime", d.params.MimeType, "maxSubscriberQuality", d.maxSubscriberQuality, "maxSubscriberNodeQuality", d.maxSubscriberNodeQuality, "maxSubscribedQuality", d.maxSubscribedQuality, "force", force, ) - onSubscribedMaxQualityChange := d.onSubscribedMaxQualityChange d.lock.Unlock() - if onSubscribedMaxQualityChange != nil { - onSubscribedMaxQualityChange(d.params.MimeType, maxSubscribedQuality) - } + d.params.Listener.OnUpdateMaxQualityForMime(d.params.MimeType, maxSubscribedQuality) } -func (d *DynacastQuality) startMaxQualityTimer() { +func (d *dynacastQualityVideo) startMaxQualityTimer() { d.lock.Lock() defer d.lock.Unlock() @@ -226,7 +238,7 @@ func (d *DynacastQuality) startMaxQualityTimer() { }) } -func (d *DynacastQuality) stopMaxQualityTimer() { +func (d *dynacastQualityVideo) stopMaxQualityTimer() { d.lock.Lock() defer d.lock.Unlock() diff --git a/pkg/rtc/dynacast/interfaces.go b/pkg/rtc/dynacast/interfaces.go new file mode 100644 index 00000000000..42d1147aaef --- /dev/null +++ b/pkg/rtc/dynacast/interfaces.go @@ -0,0 +1,185 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dynacast + +import ( + "github.com/livekit/protocol/livekit" + + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/sfu/mime" +) + +type DynacastManagerListener interface { + OnDynacastSubscribedMaxQualityChange( + subscribedQualities []*livekit.SubscribedCodec, + maxSubscribedQualities []types.SubscribedCodecQuality, + ) + + OnDynacastSubscribedAudioCodecChange(codecs []*livekit.SubscribedAudioCodec) +} + +var _ DynacastManagerListener = (*DynacastManagerListenerNull)(nil) + +type DynacastManagerListenerNull struct { +} + +func (d *DynacastManagerListenerNull) OnDynacastSubscribedMaxQualityChange( + subscribedQualities []*livekit.SubscribedCodec, + maxSubscribedQualities []types.SubscribedCodecQuality, +) { +} +func (d *DynacastManagerListenerNull) OnDynacastSubscribedAudioCodecChange( + codecs []*livekit.SubscribedAudioCodec, +) { +} + +// ----------------------------------------- + +type DynacastManager interface { + AddCodec(mime mime.MimeType) + HandleCodecRegression(fromMime, toMime mime.MimeType) + Restart() + Close() + ForceUpdate() + ForceQuality(quality livekit.VideoQuality) + ForceEnable(enabled bool) + + NotifySubscriberMaxQuality( + subscriberID livekit.ParticipantID, + mime mime.MimeType, + quality livekit.VideoQuality, + ) + NotifySubscription( + subscriberID livekit.ParticipantID, + mime mime.MimeType, + enabled bool, + ) + + NotifySubscriberNodeMaxQuality( + nodeID livekit.NodeID, + qualities []types.SubscribedCodecQuality, + ) + NotifySubscriptionNode( + nodeID livekit.NodeID, + codecs []*livekit.SubscribedAudioCodec, + ) + ClearSubscriberNodes() +} + +var _ DynacastManager = (*dynacastManagerNull)(nil) + +type dynacastManagerNull struct { +} + +func (d *dynacastManagerNull) AddCodec(mime mime.MimeType) {} +func (d *dynacastManagerNull) HandleCodecRegression(fromMime, toMime mime.MimeType) {} +func (d *dynacastManagerNull) Restart() {} +func (d *dynacastManagerNull) Close() {} +func (d *dynacastManagerNull) ForceUpdate() {} +func (d *dynacastManagerNull) ForceQuality(quality livekit.VideoQuality) {} +func (d *dynacastManagerNull) ForceEnable(enabled bool) {} +func (d *dynacastManagerNull) NotifySubscriberMaxQuality( + subscriberID livekit.ParticipantID, + mime mime.MimeType, + quality livekit.VideoQuality, +) { +} +func (d *dynacastManagerNull) NotifySubscription( + subscriberID livekit.ParticipantID, + mime mime.MimeType, + enabled bool, +) { +} +func (d *dynacastManagerNull) NotifySubscriberNodeMaxQuality( + nodeID livekit.NodeID, + qualities []types.SubscribedCodecQuality, +) { +} +func (d *dynacastManagerNull) NotifySubscriptionNode( + nodeID livekit.NodeID, + codecs []*livekit.SubscribedAudioCodec, +) { +} +func (d *dynacastManagerNull) ClearSubscriberNodes() {} + +// ------------------------------------------------ + +type dynacastQualityListener interface { + OnUpdateMaxQualityForMime(mimeType mime.MimeType, maxQuality livekit.VideoQuality) + OnUpdateAudioCodecForMime(mimeType mime.MimeType, enabled bool) +} + +var _ dynacastQualityListener = (*dynacastQualityListenerNull)(nil) + +type dynacastQualityListenerNull struct { +} + +func (d *dynacastQualityListenerNull) OnUpdateMaxQualityForMime( + mimeType mime.MimeType, + maxQuality livekit.VideoQuality, +) { +} + +func (d *dynacastQualityListenerNull) OnUpdateAudioCodecForMime( + mimeType mime.MimeType, + enabled bool, +) { +} + +// ------------------------------------------------ + +type dynacastQuality interface { + Start() + Restart() + Stop() + + NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) + NotifySubscription(subscriberID livekit.ParticipantID, enabled bool) + + NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) + NotifySubscriptionNode(nodeID livekit.NodeID, enabled bool) + ClearSubscriberNodes() + + Replace( + maxSubscriberQuality map[livekit.ParticipantID]livekit.VideoQuality, + maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality, + ) + + Mime() mime.MimeType + RegressTo(other dynacastQuality) +} + +var _ dynacastQuality = (*dynacastQualityNull)(nil) + +type dynacastQualityNull struct { +} + +func (d *dynacastQualityNull) Start() {} +func (d *dynacastQualityNull) Restart() {} +func (d *dynacastQualityNull) Stop() {} +func (d *dynacastQualityNull) NotifySubscriberMaxQuality(subscriberID livekit.ParticipantID, quality livekit.VideoQuality) { +} +func (d *dynacastQualityNull) NotifySubscription(subscriberID livekit.ParticipantID, enabled bool) {} +func (d *dynacastQualityNull) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quality livekit.VideoQuality) { +} +func (d *dynacastQualityNull) NotifySubscriptionNode(nodeID livekit.NodeID, enabled bool) {} +func (d *dynacastQualityNull) ClearSubscriberNodes() {} +func (d *dynacastQualityNull) Replace( + maxSubscriberQuality map[livekit.ParticipantID]livekit.VideoQuality, + maxSubscriberNodeQuality map[livekit.NodeID]livekit.VideoQuality, +) { +} +func (d *dynacastQualityNull) Mime() mime.MimeType { return mime.MimeTypeUnknown } +func (d *dynacastQualityNull) RegressTo(other dynacastQuality) {} diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index df8e3587ec9..08becb859be 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -51,7 +51,7 @@ type MediaTrack struct { *MediaTrackReceiver *MediaLossProxy - dynacastManager *dynacast.DynacastManager + dynacastManager dynacast.DynacastManager lock sync.RWMutex @@ -60,6 +60,17 @@ type MediaTrack struct { backupCodecPolicy livekit.BackupCodecPolicy regressionTargetCodec mime.MimeType regressionTargetCodecReceived bool + + onSubscribedMaxQualityChange func( + trackID livekit.TrackID, + trackInfo *livekit.TrackInfo, + subscribedQualities []*livekit.SubscribedCodec, + maxSubscribedQualities []types.SubscribedCodecQuality, + ) error + onSubscribedAudioCodecChange func( + trackID livekit.TrackID, + codecs []*livekit.SubscribedAudioCodec, + ) error } type MediaTrackParams struct { @@ -122,27 +133,57 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack { t.MediaTrackReceiver.OnMediaLossFeedback(t.MediaLossProxy.HandleMaxLossFeedback) } - if ti.Type == livekit.TrackType_VIDEO { - t.dynacastManager = dynacast.NewDynacastManager(dynacast.DynacastManagerParams{ + switch ti.Type { + case livekit.TrackType_VIDEO: + t.dynacastManager = dynacast.NewDynacastManagerVideo(dynacast.DynacastManagerVideoParams{ DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay, + Listener: t, Logger: params.Logger, }) - t.MediaTrackReceiver.OnSetupReceiver(func(mime mime.MimeType) { + + case livekit.TrackType_AUDIO: + if len(ti.Codecs) > 1 { + t.dynacastManager = dynacast.NewDynacastManagerAudio(dynacast.DynacastManagerAudioParams{ + Listener: t, + Logger: params.Logger, + }) + } + } + t.MediaTrackReceiver.OnSetupReceiver(func(mime mime.MimeType) { + if t.dynacastManager != nil { t.dynacastManager.AddCodec(mime) - }) - t.MediaTrackReceiver.OnSubscriberMaxQualityChange( - func(subscriberID livekit.ParticipantID, mimeType mime.MimeType, layer int32) { + } + }) + t.MediaTrackReceiver.OnSubscriberMaxQualityChange( + func(subscriberID livekit.ParticipantID, mimeType mime.MimeType, layer int32) { + if t.dynacastManager != nil { t.dynacastManager.NotifySubscriberMaxQuality( subscriberID, mimeType, - buffer.GetVideoQualityForSpatialLayer(mimeType, layer, t.MediaTrackReceiver.TrackInfo()), + buffer.GetVideoQualityForSpatialLayer( + mimeType, + layer, + t.MediaTrackReceiver.TrackInfo(), + ), ) - }, - ) - t.MediaTrackReceiver.OnCodecRegression(func(old, new webrtc.RTPCodecParameters) { - t.dynacastManager.HandleCodecRegression(mime.NormalizeMimeType(old.MimeType), mime.NormalizeMimeType(new.MimeType)) - }) - } + } + }, + ) + t.MediaTrackReceiver.OnSubscriberAudioCodecChange( + func(subscriberID livekit.ParticipantID, mimeType mime.MimeType, enabled bool) { + if t.dynacastManager != nil { + t.dynacastManager.NotifySubscription(subscriberID, mimeType, enabled) + } + }, + ) + t.MediaTrackReceiver.OnCodecRegression(func(old, new webrtc.RTPCodecParameters) { + if t.dynacastManager != nil { + t.dynacastManager.HandleCodecRegression( + mime.NormalizeMimeType(old.MimeType), + mime.NormalizeMimeType(new.MimeType), + ) + } + }) t.SetMuted(ti.Muted) return t @@ -156,26 +197,20 @@ func (t *MediaTrack) OnSubscribedMaxQualityChange( maxSubscribedQualities []types.SubscribedCodecQuality, ) error, ) { - if t.dynacastManager == nil { - return - } - - handler := func(subscribedQualities []*livekit.SubscribedCodec, maxSubscribedQualities []types.SubscribedCodecQuality) { - if f != nil && !t.IsMuted() { - _ = f(t.ID(), t.ToProto(), subscribedQualities, maxSubscribedQualities) - } - - for _, q := range maxSubscribedQualities { - receiver := t.Receiver(q.CodecMime) - if receiver != nil { - receiver.SetMaxExpectedSpatialLayer( - buffer.GetSpatialLayerForVideoQuality(q.CodecMime, q.Quality, t.MediaTrackReceiver.TrackInfo()), - ) - } - } - } + t.lock.Lock() + t.onSubscribedMaxQualityChange = f + t.lock.Unlock() +} - t.dynacastManager.OnSubscribedMaxQualityChange(handler) +func (t *MediaTrack) OnSubscribedAudioCodecChange( + f func( + trackID livekit.TrackID, + codecs []*livekit.SubscribedAudioCodec, + ) error, +) { + t.lock.Lock() + t.onSubscribedAudioCodecChange = f + t.lock.Unlock() } func (t *MediaTrack) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []types.SubscribedCodecQuality) { @@ -184,9 +219,15 @@ func (t *MediaTrack) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, quali } } -func (t *MediaTrack) ClearSubscriberNodesMaxQuality() { +func (t *MediaTrack) NotifySubscriptionNode(nodeID livekit.NodeID, codecs []*livekit.SubscribedAudioCodec) { if t.dynacastManager != nil { - t.dynacastManager.ClearSubscriberNodesMaxQuality() + t.dynacastManager.NotifySubscriptionNode(nodeID, codecs) + } +} + +func (t *MediaTrack) ClearSubscriberNodes() { + if t.dynacastManager != nil { + t.dynacastManager.ClearSubscriberNodes() } } @@ -582,3 +623,47 @@ func (t *MediaTrack) enableRegression() bool { func (t *MediaTrack) Logger() logger.Logger { return t.params.Logger } + +// dynacast.DynacastManagerListtener implementation +var _ dynacast.DynacastManagerListener = (*MediaTrack)(nil) + +func (t *MediaTrack) OnDynacastSubscribedMaxQualityChange( + subscribedQualities []*livekit.SubscribedCodec, + maxSubscribedQualities []types.SubscribedCodecQuality, +) { + t.lock.RLock() + onSubscribedMaxQualityChange := t.onSubscribedMaxQualityChange + t.lock.RUnlock() + + if onSubscribedMaxQualityChange != nil && !t.IsMuted() { + _ = onSubscribedMaxQualityChange( + t.ID(), + t.ToProto(), + subscribedQualities, + maxSubscribedQualities, + ) + } + + for _, q := range maxSubscribedQualities { + receiver := t.Receiver(q.CodecMime) + if receiver != nil { + receiver.SetMaxExpectedSpatialLayer( + buffer.GetSpatialLayerForVideoQuality( + q.CodecMime, + q.Quality, + t.MediaTrackReceiver.TrackInfo(), + ), + ) + } + } +} + +func (t *MediaTrack) OnDynacastSubscribedAudioCodecChange(codecs []*livekit.SubscribedAudioCodec) { + t.lock.RLock() + onSubscribedAudioCodecChange := t.onSubscribedAudioCodecChange + t.lock.RUnlock() + + if onSubscribedAudioCodecChange != nil { + _ = onSubscribedAudioCodecChange(t.ID(), codecs) + } +} diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index a31c40a2f72..0281cee99c8 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -305,7 +305,11 @@ func (t *MediaTrackReceiver) HandleReceiverCodecChange(r sfu.TrackReceiver, code return } - t.params.Logger.Infow("regressing codec", "from", codec.MimeType, "to", backupCodecReceiver.Codec().MimeType) + t.params.Logger.Infow( + "regressing codec", + "from", codec.MimeType, + "to", backupCodecReceiver.Codec().MimeType, + ) // remove old codec from potential codecs for i, c := range t.potentialCodecs { @@ -578,6 +582,7 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su UpstreamCodecs: potentialCodecs, Logger: tLogger, DisableRed: t.TrackInfo().GetDisableRed() || !t.params.AudioConfig.ActiveREDEncoding, + IsEncrypted: t.IsEncrypted(), }) subID := sub.ID() subTrack, err := t.MediaTrackSubscriptions.AddSubscriber(sub, wr) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 59fd4663193..de1781c5334 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -44,6 +44,7 @@ type MediaTrackSubscriptions struct { onDownTrackCreated func(downTrack *sfu.DownTrack) onSubscriberMaxQualityChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, layer int32) + onSubscriberAudioCodecChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, enabled bool) } type MediaTrackSubscriptionsParams struct { @@ -73,6 +74,10 @@ func (t *MediaTrackSubscriptions) OnSubscriberMaxQualityChange(f func(subscriber t.onSubscriberMaxQualityChange = f } +func (t *MediaTrackSubscriptions) OnSubscriberAudioCodecChange(f func(subscriberID livekit.ParticipantID, mime mime.MimeType, enabled bool)) { + t.onSubscriberAudioCodecChange = f +} + func (t *MediaTrackSubscriptions) SetMuted(muted bool) { // update mute of all subscribed tracks for _, st := range t.getAllSubscribedTracks() { @@ -117,6 +122,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * t.subscribedTracksMu.Unlock() }, OnSubscriberMaxQualityChange: t.onSubscriberMaxQualityChange, + OnSubscriberAudioCodecChange: t.onSubscriberAudioCodecChange, }) if err != nil { return nil, err diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index f2866a411e0..bb4f4e9bc46 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -2798,6 +2798,35 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange( return p.sendSubscribedQualityUpdate(subscribedQualityUpdate) } +func (p *ParticipantImpl) onSubscribedAudioCodecChange( + trackID livekit.TrackID, + codecs []*livekit.SubscribedAudioCodec, +) error { + if p.params.DisableDynacast { + return nil + } + + if len(codecs) == 0 { + return nil + } + + // normalize the codec name + for _, codec := range codecs { + codec.Codec = strings.ToLower(strings.TrimPrefix(codec.Codec, mime.MimeTypePrefixAudio)) + } + + subscribedAudioCodecUpdate := &livekit.SubscribedAudioCodecUpdate{ + TrackSid: string(trackID), + SubscribedAudioCodecs: codecs, + } + p.pubLogger.Debugw( + "sending subscribed audio codec update", + "trackID", trackID, + "update", logger.Proto(subscribedAudioCodecUpdate), + ) + return p.sendSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate) +} + func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *livekit.TrackInfo { if req.Sid != "" { track := p.GetPublishedTrack(livekit.TrackID(req.Sid)) @@ -3317,6 +3346,7 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, ti *livekit.TrackInfo) }, ti) mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange) + mt.OnSubscribedAudioCodecChange(p.onSubscribedAudioCodecChange) // add to published and clean up pending if p.supervisor != nil { @@ -3726,6 +3756,17 @@ func (p *ParticipantImpl) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID return nil } +func (p *ParticipantImpl) UpdateSubscribedAudioCodecs(nodeID livekit.NodeID, trackID livekit.TrackID, codecs []*livekit.SubscribedAudioCodec) error { + track := p.GetPublishedTrack(trackID) + if track == nil { + p.pubLogger.Debugw("could not find track", "trackID", trackID) + return errors.New("could not find published track") + } + + track.(types.LocalMediaTrack).NotifySubscriptionNode(nodeID, codecs) + return nil +} + func (p *ParticipantImpl) UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error { track := p.GetPublishedTrack(trackID) if track == nil { @@ -4006,9 +4047,9 @@ func (p *ParticipantImpl) MoveToRoom(params types.MoveToRoomParams) { for _, track := range p.GetPublishedTracks() { for _, sub := range track.GetAllSubscribers() { track.RemoveSubscriber(sub, false) - // clear the subscriber node max quality as the remote quality notify + // clear the subscriber node max quality/audio codecs as the remote quality notify // from source room would not reach the moving out participant. - track.(types.LocalMediaTrack).ClearSubscriberNodesMaxQuality() + track.(types.LocalMediaTrack).ClearSubscriberNodes() } trackInfo := track.ToProto() p.params.Telemetry.TrackUnpublished( diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index 72bd1cd3541..cd894ab9fd0 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -311,6 +311,10 @@ func (p *ParticipantImpl) sendSubscribedQualityUpdate(subscribedQualityUpdate *l return p.signaller.WriteMessage(p.signalling.SignalSubscribedQualityUpdate(subscribedQualityUpdate)) } +func (p *ParticipantImpl) sendSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) error { + return p.signaller.WriteMessage(p.signalling.SignalSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate)) +} + func (p *ParticipantImpl) sendSubscriptionResponse(trackID livekit.TrackID, subErr livekit.SubscriptionError) error { return p.signaller.WriteMessage(p.signalling.SignalSubscriptionResponse(&livekit.SubscriptionResponse{ TrackSid: string(trackID), diff --git a/pkg/rtc/signalling/interfaces.go b/pkg/rtc/signalling/interfaces.go index 73b35d5b090..e4983245285 100644 --- a/pkg/rtc/signalling/interfaces.go +++ b/pkg/rtc/signalling/interfaces.go @@ -58,4 +58,5 @@ type ParticipantSignalling interface { SignalSubscriptionResponse(subscriptionResponse *livekit.SubscriptionResponse) proto.Message SignalSubscriptionPermissionUpdate(subscriptionPermissionUpdate *livekit.SubscriptionPermissionUpdate) proto.Message SignalMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) proto.Message + SignalSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) proto.Message } diff --git a/pkg/rtc/signalling/signalling.go b/pkg/rtc/signalling/signalling.go index 2e4ca69a707..8b08a6d029c 100644 --- a/pkg/rtc/signalling/signalling.go +++ b/pkg/rtc/signalling/signalling.go @@ -228,3 +228,11 @@ func (u *signalling) SignalMediaSectionsRequirement(mediaSectionsRequirement *li }, } } + +func (s *signalling) SignalSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) proto.Message { + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_SubscribedAudioCodecUpdate{ + SubscribedAudioCodecUpdate: subscribedAudioCodecUpdate, + }, + } +} diff --git a/pkg/rtc/signalling/signallingunimplemented.go b/pkg/rtc/signalling/signallingunimplemented.go index 5aa3a04cfc2..595877b02af 100644 --- a/pkg/rtc/signalling/signallingunimplemented.go +++ b/pkg/rtc/signalling/signallingunimplemented.go @@ -111,3 +111,7 @@ func (u *signallingUnimplemented) SignalSubscriptionPermissionUpdate(subscriptio func (u *signallingUnimplemented) SignalMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) proto.Message { return nil } + +func (u *signallingUnimplemented) SignalSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) proto.Message { + return nil +} diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index 372e772d414..ac1c60e93c7 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -54,6 +54,7 @@ type SubscribedTrackParams struct { OnDownTrackCreated func(downTrack *sfu.DownTrack) OnDownTrackClosed func(subscriberID livekit.ParticipantID) OnSubscriberMaxQualityChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, layer int32) + OnSubscriberAudioCodecChange func(subscriberID livekit.ParticipantID, mime mime.MimeType, enabled bool) } type SubscribedTrack struct { @@ -436,15 +437,21 @@ func (t *SubscribedTrack) OnCodecNegotiated(codec webrtc.RTPCodecCapability) { return } - if t.params.OnSubscriberMaxQualityChange != nil { + if t.params.OnSubscriberMaxQualityChange != nil || t.params.OnSubscriberAudioCodecChange != nil { go func() { mimeType := mime.NormalizeMimeType(codec.MimeType) - spatial := buffer.GetSpatialLayerForVideoQuality( - mimeType, - livekit.VideoQuality_HIGH, - t.params.MediaTrack.ToProto(), - ) - t.params.OnSubscriberMaxQualityChange(t.downTrack.SubscriberID(), mimeType, spatial) + switch t.params.MediaTrack.Kind() { + case livekit.TrackType_VIDEO: + spatial := buffer.GetSpatialLayerForVideoQuality( + mimeType, + livekit.VideoQuality_HIGH, + t.params.MediaTrack.ToProto(), + ) + t.params.OnSubscriberMaxQualityChange(t.downTrack.SubscriberID(), mimeType, spatial) + + case livekit.TrackType_AUDIO: + t.params.OnSubscriberAudioCodecChange(t.downTrack.SubscriberID(), mimeType, true) + } }() } } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index e732672e96d..91ad3ae6f67 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -518,6 +518,7 @@ type LocalParticipant interface { OnICEConfigChanged(callback func(participant LocalParticipant, iceConfig *livekit.ICEConfig)) UpdateSubscribedQuality(nodeID livekit.NodeID, trackID livekit.TrackID, maxQualities []SubscribedCodecQuality) error + UpdateSubscribedAudioCodecs(nodeID livekit.NodeID, trackID livekit.TrackID, codecs []*livekit.SubscribedAudioCodec) error UpdateMediaLoss(nodeID livekit.NodeID, trackID livekit.TrackID, fractionalLoss uint32) error // down stream bandwidth management @@ -627,7 +628,8 @@ type LocalMediaTrack interface { SetRTT(rtt uint32) NotifySubscriberNodeMaxQuality(nodeID livekit.NodeID, qualities []SubscribedCodecQuality) - ClearSubscriberNodesMaxQuality() + NotifySubscriptionNode(nodeID livekit.NodeID, codecs []*livekit.SubscribedAudioCodec) + ClearSubscriberNodes() NotifySubscriberNodeMediaLoss(nodeID livekit.NodeID, fractionalLoss uint8) } diff --git a/pkg/rtc/types/typesfakes/fake_local_media_track.go b/pkg/rtc/types/typesfakes/fake_local_media_track.go index 115e0ac4549..a327509de78 100644 --- a/pkg/rtc/types/typesfakes/fake_local_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_local_media_track.go @@ -35,9 +35,9 @@ type FakeLocalMediaTrack struct { clearAllReceiversArgsForCall []struct { arg1 bool } - ClearSubscriberNodesMaxQualityStub func() - clearSubscriberNodesMaxQualityMutex sync.RWMutex - clearSubscriberNodesMaxQualityArgsForCall []struct { + ClearSubscriberNodesStub func() + clearSubscriberNodesMutex sync.RWMutex + clearSubscriberNodesArgsForCall []struct { } CloseStub func(bool) closeMutex sync.RWMutex @@ -239,6 +239,12 @@ type FakeLocalMediaTrack struct { arg1 livekit.NodeID arg2 uint8 } + NotifySubscriptionNodeStub func(livekit.NodeID, []*livekit.SubscribedAudioCodec) + notifySubscriptionNodeMutex sync.RWMutex + notifySubscriptionNodeArgsForCall []struct { + arg1 livekit.NodeID + arg2 []*livekit.SubscribedAudioCodec + } OnTrackSubscribedStub func() onTrackSubscribedMutex sync.RWMutex onTrackSubscribedArgsForCall []struct { @@ -491,28 +497,28 @@ func (fake *FakeLocalMediaTrack) ClearAllReceiversArgsForCall(i int) bool { return argsForCall.arg1 } -func (fake *FakeLocalMediaTrack) ClearSubscriberNodesMaxQuality() { - fake.clearSubscriberNodesMaxQualityMutex.Lock() - fake.clearSubscriberNodesMaxQualityArgsForCall = append(fake.clearSubscriberNodesMaxQualityArgsForCall, struct { +func (fake *FakeLocalMediaTrack) ClearSubscriberNodes() { + fake.clearSubscriberNodesMutex.Lock() + fake.clearSubscriberNodesArgsForCall = append(fake.clearSubscriberNodesArgsForCall, struct { }{}) - stub := fake.ClearSubscriberNodesMaxQualityStub - fake.recordInvocation("ClearSubscriberNodesMaxQuality", []interface{}{}) - fake.clearSubscriberNodesMaxQualityMutex.Unlock() + stub := fake.ClearSubscriberNodesStub + fake.recordInvocation("ClearSubscriberNodes", []interface{}{}) + fake.clearSubscriberNodesMutex.Unlock() if stub != nil { - fake.ClearSubscriberNodesMaxQualityStub() + fake.ClearSubscriberNodesStub() } } -func (fake *FakeLocalMediaTrack) ClearSubscriberNodesMaxQualityCallCount() int { - fake.clearSubscriberNodesMaxQualityMutex.RLock() - defer fake.clearSubscriberNodesMaxQualityMutex.RUnlock() - return len(fake.clearSubscriberNodesMaxQualityArgsForCall) +func (fake *FakeLocalMediaTrack) ClearSubscriberNodesCallCount() int { + fake.clearSubscriberNodesMutex.RLock() + defer fake.clearSubscriberNodesMutex.RUnlock() + return len(fake.clearSubscriberNodesArgsForCall) } -func (fake *FakeLocalMediaTrack) ClearSubscriberNodesMaxQualityCalls(stub func()) { - fake.clearSubscriberNodesMaxQualityMutex.Lock() - defer fake.clearSubscriberNodesMaxQualityMutex.Unlock() - fake.ClearSubscriberNodesMaxQualityStub = stub +func (fake *FakeLocalMediaTrack) ClearSubscriberNodesCalls(stub func()) { + fake.clearSubscriberNodesMutex.Lock() + defer fake.clearSubscriberNodesMutex.Unlock() + fake.ClearSubscriberNodesStub = stub } func (fake *FakeLocalMediaTrack) Close(arg1 bool) { @@ -1569,6 +1575,44 @@ func (fake *FakeLocalMediaTrack) NotifySubscriberNodeMediaLossArgsForCall(i int) return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeLocalMediaTrack) NotifySubscriptionNode(arg1 livekit.NodeID, arg2 []*livekit.SubscribedAudioCodec) { + var arg2Copy []*livekit.SubscribedAudioCodec + if arg2 != nil { + arg2Copy = make([]*livekit.SubscribedAudioCodec, len(arg2)) + copy(arg2Copy, arg2) + } + fake.notifySubscriptionNodeMutex.Lock() + fake.notifySubscriptionNodeArgsForCall = append(fake.notifySubscriptionNodeArgsForCall, struct { + arg1 livekit.NodeID + arg2 []*livekit.SubscribedAudioCodec + }{arg1, arg2Copy}) + stub := fake.NotifySubscriptionNodeStub + fake.recordInvocation("NotifySubscriptionNode", []interface{}{arg1, arg2Copy}) + fake.notifySubscriptionNodeMutex.Unlock() + if stub != nil { + fake.NotifySubscriptionNodeStub(arg1, arg2) + } +} + +func (fake *FakeLocalMediaTrack) NotifySubscriptionNodeCallCount() int { + fake.notifySubscriptionNodeMutex.RLock() + defer fake.notifySubscriptionNodeMutex.RUnlock() + return len(fake.notifySubscriptionNodeArgsForCall) +} + +func (fake *FakeLocalMediaTrack) NotifySubscriptionNodeCalls(stub func(livekit.NodeID, []*livekit.SubscribedAudioCodec)) { + fake.notifySubscriptionNodeMutex.Lock() + defer fake.notifySubscriptionNodeMutex.Unlock() + fake.NotifySubscriptionNodeStub = stub +} + +func (fake *FakeLocalMediaTrack) NotifySubscriptionNodeArgsForCall(i int) (livekit.NodeID, []*livekit.SubscribedAudioCodec) { + fake.notifySubscriptionNodeMutex.RLock() + defer fake.notifySubscriptionNodeMutex.RUnlock() + argsForCall := fake.notifySubscriptionNodeArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeLocalMediaTrack) OnTrackSubscribed() { fake.onTrackSubscribedMutex.Lock() fake.onTrackSubscribedArgsForCall = append(fake.onTrackSubscribedArgsForCall, struct { diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index abde499ad7c..a1dcca9471c 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -1332,6 +1332,19 @@ type FakeLocalParticipant struct { updateSignalingRTTArgsForCall []struct { arg1 uint32 } + UpdateSubscribedAudioCodecsStub func(livekit.NodeID, livekit.TrackID, []*livekit.SubscribedAudioCodec) error + updateSubscribedAudioCodecsMutex sync.RWMutex + updateSubscribedAudioCodecsArgsForCall []struct { + arg1 livekit.NodeID + arg2 livekit.TrackID + arg3 []*livekit.SubscribedAudioCodec + } + updateSubscribedAudioCodecsReturns struct { + result1 error + } + updateSubscribedAudioCodecsReturnsOnCall map[int]struct { + result1 error + } UpdateSubscribedQualityStub func(livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) error updateSubscribedQualityMutex sync.RWMutex updateSubscribedQualityArgsForCall []struct { @@ -8554,6 +8567,74 @@ func (fake *FakeLocalParticipant) UpdateSignalingRTTArgsForCall(i int) uint32 { return argsForCall.arg1 } +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecs(arg1 livekit.NodeID, arg2 livekit.TrackID, arg3 []*livekit.SubscribedAudioCodec) error { + var arg3Copy []*livekit.SubscribedAudioCodec + if arg3 != nil { + arg3Copy = make([]*livekit.SubscribedAudioCodec, len(arg3)) + copy(arg3Copy, arg3) + } + fake.updateSubscribedAudioCodecsMutex.Lock() + ret, specificReturn := fake.updateSubscribedAudioCodecsReturnsOnCall[len(fake.updateSubscribedAudioCodecsArgsForCall)] + fake.updateSubscribedAudioCodecsArgsForCall = append(fake.updateSubscribedAudioCodecsArgsForCall, struct { + arg1 livekit.NodeID + arg2 livekit.TrackID + arg3 []*livekit.SubscribedAudioCodec + }{arg1, arg2, arg3Copy}) + stub := fake.UpdateSubscribedAudioCodecsStub + fakeReturns := fake.updateSubscribedAudioCodecsReturns + fake.recordInvocation("UpdateSubscribedAudioCodecs", []interface{}{arg1, arg2, arg3Copy}) + fake.updateSubscribedAudioCodecsMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsCallCount() int { + fake.updateSubscribedAudioCodecsMutex.RLock() + defer fake.updateSubscribedAudioCodecsMutex.RUnlock() + return len(fake.updateSubscribedAudioCodecsArgsForCall) +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsCalls(stub func(livekit.NodeID, livekit.TrackID, []*livekit.SubscribedAudioCodec) error) { + fake.updateSubscribedAudioCodecsMutex.Lock() + defer fake.updateSubscribedAudioCodecsMutex.Unlock() + fake.UpdateSubscribedAudioCodecsStub = stub +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsArgsForCall(i int) (livekit.NodeID, livekit.TrackID, []*livekit.SubscribedAudioCodec) { + fake.updateSubscribedAudioCodecsMutex.RLock() + defer fake.updateSubscribedAudioCodecsMutex.RUnlock() + argsForCall := fake.updateSubscribedAudioCodecsArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsReturns(result1 error) { + fake.updateSubscribedAudioCodecsMutex.Lock() + defer fake.updateSubscribedAudioCodecsMutex.Unlock() + fake.UpdateSubscribedAudioCodecsStub = nil + fake.updateSubscribedAudioCodecsReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) UpdateSubscribedAudioCodecsReturnsOnCall(i int, result1 error) { + fake.updateSubscribedAudioCodecsMutex.Lock() + defer fake.updateSubscribedAudioCodecsMutex.Unlock() + fake.UpdateSubscribedAudioCodecsStub = nil + if fake.updateSubscribedAudioCodecsReturnsOnCall == nil { + fake.updateSubscribedAudioCodecsReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateSubscribedAudioCodecsReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeLocalParticipant) UpdateSubscribedQuality(arg1 livekit.NodeID, arg2 livekit.TrackID, arg3 []types.SubscribedCodecQuality) error { var arg3Copy []types.SubscribedCodecQuality if arg3 != nil { diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 37c7947b17e..d3a2aeb9506 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -40,6 +40,7 @@ type WrappedReceiverParams struct { UpstreamCodecs []webrtc.RTPCodecParameters Logger logger.Logger DisableRed bool + IsEncrypted bool } type WrappedReceiver struct { @@ -59,7 +60,7 @@ func NewWrappedReceiver(params WrappedReceiverParams) *WrappedReceiver { } codecs := params.UpstreamCodecs - if len(codecs) == 1 { + if len(codecs) == 1 && !params.IsEncrypted { normalizedMimeType := mime.NormalizeMimeType(codecs[0].MimeType) if normalizedMimeType == mime.MimeTypeRED { // if upstream is opus/red, then add opus to match clients that don't support red @@ -98,13 +99,17 @@ func (r *WrappedReceiver) DetermineReceiver(codec webrtc.RTPCodecCapability) boo if receiverMimeType == codecMimeType { trackReceiver = receiver break - } else if receiverMimeType == mime.MimeTypeRED && codecMimeType == mime.MimeTypeOpus { - // audio opus/red can match opus only - trackReceiver = receiver.GetPrimaryReceiverForRed() - break - } else if receiverMimeType == mime.MimeTypeOpus && codecMimeType == mime.MimeTypeRED { - trackReceiver = receiver.GetRedReceiver() - break + } + + if !r.params.IsEncrypted { + if receiverMimeType == mime.MimeTypeRED && codecMimeType == mime.MimeTypeOpus { + // audio opus/red can match opus only + trackReceiver = receiver.GetPrimaryReceiverForRed() + break + } else if receiverMimeType == mime.MimeTypeOpus && codecMimeType == mime.MimeTypeRED { + trackReceiver = receiver.GetRedReceiver() + break + } } } if trackReceiver == nil {