Skip to content

Commit 06fdbfd

Browse files
authored
sip: route state updates through a public StateHandler (#714)
* sip: route state updates through a public StateHandler CallState's update stream now flows through a StateHandler interface that callers in cloud-sip can hook into. A default rpcStateHandler forwards UpdateSIPCallState / RecordCallContext to an upstream RPC client, preserving today's behavior for non-cloud builds. Other changes that fall out: - Handler.OnSessionEnd takes *CallState instead of *livekit.SIPCallInfo so handlers can route post-call writes (PCAP append) through the state's RecordCallContext passthrough rather than the raw RPC client. - CallState owns the proto + transferInfos + dirty bit and serializes every flush under its mutex. Ctx-free API. - Service.NewService takes a GetStateHandler factory so cloud builds can inject their own (legacy RPC + local observability fork). - internalProvidergetSIPTrunkAuthentication parallelizes the auth/ observability lookups and threads SIPCallObservability through the dispatch wrappers (auth response, create-participant request). * deps: bump protocol to pick up SIPCallObservability fields * deps: bump protocol * ci: re-trigger * sip: rename mutate to appendInfo in RecordCallContext Address PR feedback — `appendInfo` reads more clearly than `mutate` for the late-arriving call-context write (e.g. PCAP links). * deps: bump protocol * deps: bump server-sdk-go for new protocol compat protocol main removed the legacy Scenario_Create_* / ScenarioGroup_* types after #1609 (agent_simulation yaml refactor); server-sdk-go@v2.16.6 still references them and fails to build. Bump to the matching server-sdk-go main pseudo-version.
1 parent 2cb4bc1 commit 06fdbfd

16 files changed

Lines changed: 203 additions & 154 deletions

File tree

cmd/livekit-sip/main.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,21 @@ import (
2121
"os/signal"
2222
"syscall"
2323

24-
"github.com/livekit/protocol/tracer/jaeger"
25-
"github.com/livekit/psrpc/pkg/middleware/otelpsrpc"
2624
"github.com/urfave/cli/v3"
2725

26+
"github.com/livekit/protocol/livekit"
2827
"github.com/livekit/protocol/logger"
2928
"github.com/livekit/protocol/redis"
3029
"github.com/livekit/protocol/rpc"
30+
"github.com/livekit/protocol/tracer/jaeger"
3131
"github.com/livekit/psrpc"
32-
33-
"github.com/livekit/sip/pkg/stats"
32+
"github.com/livekit/psrpc/pkg/middleware/otelpsrpc"
3433

3534
"github.com/livekit/sip/pkg/config"
3635
"github.com/livekit/sip/pkg/errors"
3736
"github.com/livekit/sip/pkg/service"
3837
"github.com/livekit/sip/pkg/sip"
38+
"github.com/livekit/sip/pkg/stats"
3939
"github.com/livekit/sip/version"
4040
)
4141

@@ -99,7 +99,7 @@ func runService(ctx context.Context, c *cli.Command) error {
9999
return err
100100
}
101101

102-
sipsrv, err := sip.NewService("", conf, mon, log, func(projectID string) rpc.IOInfoClient { return psrpcClient })
102+
sipsrv, err := sip.NewService("", conf, mon, log, func(projectID string, _ *rpc.SIPCallObservability, _ *livekit.SIPCallInfo) sip.StateHandler { return sip.NewRPCStateHandler(psrpcClient) })
103103
if err != nil {
104104
return err
105105
}

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ require (
1111
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
1212
github.com/livekit/media-sdk v0.0.0-20260608233105-27d39f4e1900
1313
github.com/livekit/mediatransportutil v0.0.0-20260608063931-a3417d38cda0
14-
github.com/livekit/protocol v1.46.7-0.20260605212020-c0615a2f6f84
14+
github.com/livekit/protocol v1.46.7-0.20260610064410-e286afe70eb0
1515
github.com/livekit/psrpc v0.7.2
16-
github.com/livekit/server-sdk-go/v2 v2.16.6
16+
github.com/livekit/server-sdk-go/v2 v2.16.7-0.20260608025623-a5da15b13baa
1717
github.com/livekit/sipgo v0.13.2-0.20260519205735-a5b4a38b6ceb
1818
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
1919
github.com/ory/dockertest/v3 v3.12.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,12 @@ github.com/livekit/media-sdk v0.0.0-20260608233105-27d39f4e1900 h1:8tHAXEeGZsqkg
134134
github.com/livekit/media-sdk v0.0.0-20260608233105-27d39f4e1900/go.mod h1:uWrLXY4JeLYynX39htMG49Dl4BhFYY+RCeoXaLdU+Lw=
135135
github.com/livekit/mediatransportutil v0.0.0-20260608063931-a3417d38cda0 h1:XHNNzebIKZRkLimla/hFGrAIX5EMWHctrgt3hLw7s+I=
136136
github.com/livekit/mediatransportutil v0.0.0-20260608063931-a3417d38cda0/go.mod h1:o8CFmAdrVwzJNOCsQCLUzXRjokkufNshnQHOe4fRaqU=
137-
github.com/livekit/protocol v1.46.7-0.20260605212020-c0615a2f6f84 h1:dkHHthyor9dwxxdBmbeG1ZUI4bPHpTEk9DjYJSSSIl4=
138-
github.com/livekit/protocol v1.46.7-0.20260605212020-c0615a2f6f84/go.mod h1:jO+y05AU9Ec4JswDyuzKCZ4bhziOS0CzMqgnbj60Dzs=
137+
github.com/livekit/protocol v1.46.7-0.20260610064410-e286afe70eb0 h1:aNazCl+gTEmF88tVsISOvtnfZM/K9IbqAn2WvZVmh4Y=
138+
github.com/livekit/protocol v1.46.7-0.20260610064410-e286afe70eb0/go.mod h1:jO+y05AU9Ec4JswDyuzKCZ4bhziOS0CzMqgnbj60Dzs=
139139
github.com/livekit/psrpc v0.7.2 h1:6oZ+NODJ2pLyaT6VqDq1F4Qc/3TpDUSpyphj/P9MhQc=
140140
github.com/livekit/psrpc v0.7.2/go.mod h1:rAI+m2+/cb4x9RXhLRtUx5ZwdfjjXOl4zi46IjEetaw=
141-
github.com/livekit/server-sdk-go/v2 v2.16.6 h1:NBKw5l1AAOHsHAZKzuzAGzILRmSm+4E+/YZ9ZiaqudI=
142-
github.com/livekit/server-sdk-go/v2 v2.16.6/go.mod h1:1+duFCDFpAvHqZ6mHQe7IwjecLIBFv/keJcexuFhD+0=
141+
github.com/livekit/server-sdk-go/v2 v2.16.7-0.20260608025623-a5da15b13baa h1:B19yilP7+JjekKMD0WejMh1Kvypdxpr5yxQZiFStRD0=
142+
github.com/livekit/server-sdk-go/v2 v2.16.7-0.20260608025623-a5da15b13baa/go.mod h1:SWJD68Rfcwrhze09EYaRiur7ESCBuu0u4fpK+0BGEYo=
143143
github.com/livekit/sipgo v0.13.2-0.20260519205735-a5b4a38b6ceb h1:HmgaJMGs0Nco/Z+XMc9f+xFgrbood9yJsIBtl1OY76M=
144144
github.com/livekit/sipgo v0.13.2-0.20260519205735-a5b4a38b6ceb/go.mod h1:aDa6mbFktNzA1D917RhFlIB5IOfNBTmrwt+/lX960j0=
145145
github.com/mackerelio/go-osstat v0.2.7 h1:TCavZi10wF49bT6iQZ9eT2keGZQpC69MTDfdJej5e94=

pkg/service/psrpc.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,26 @@ func GetAuthCredentials(ctx context.Context, psrpcClient rpc.IOInfoClient, call
3232
switch resp.ErrorCode {
3333
case rpc.SIPTrunkAuthenticationError_SIP_TRUNK_AUTH_ERROR_QUOTA_EXCEEDED:
3434
return sip.AuthInfo{
35-
ProjectID: resp.ProjectId,
36-
Result: sip.AuthQuotaExceeded,
37-
ProviderInfo: resp.ProviderInfo,
35+
ProjectID: resp.ProjectId,
36+
Result: sip.AuthQuotaExceeded,
37+
ProviderInfo: resp.ProviderInfo,
38+
Observability: resp.Observability,
3839
}, nil
3940
case rpc.SIPTrunkAuthenticationError_SIP_TRUNK_AUTH_ERROR_NO_TRUNK_FOUND:
4041
return sip.AuthInfo{
41-
ProjectID: resp.ProjectId,
42-
Result: sip.AuthNoTrunkFound,
43-
ProviderInfo: resp.ProviderInfo,
42+
ProjectID: resp.ProjectId,
43+
Result: sip.AuthNoTrunkFound,
44+
ProviderInfo: resp.ProviderInfo,
45+
Observability: resp.Observability,
4446
}, nil
4547
}
4648

4749
if resp.Drop {
4850
return sip.AuthInfo{
49-
ProjectID: resp.ProjectId,
50-
Result: sip.AuthDrop,
51-
ProviderInfo: resp.ProviderInfo,
51+
ProjectID: resp.ProjectId,
52+
Result: sip.AuthDrop,
53+
ProviderInfo: resp.ProviderInfo,
54+
Observability: resp.Observability,
5255
}, nil
5356
}
5457
if resp.Username != "" && resp.Password != "" {
@@ -61,14 +64,16 @@ func GetAuthCredentials(ctx context.Context, psrpcClient rpc.IOInfoClient, call
6164
Password: resp.Password,
6265
Realm: resp.Realm,
6366
},
64-
ProviderInfo: resp.ProviderInfo,
67+
ProviderInfo: resp.ProviderInfo,
68+
Observability: resp.Observability,
6569
}, nil
6670
}
6771
return sip.AuthInfo{
68-
ProjectID: resp.ProjectId,
69-
TrunkID: resp.SipTrunkId,
70-
Result: sip.AuthAccept,
71-
ProviderInfo: resp.ProviderInfo,
72+
ProjectID: resp.ProjectId,
73+
TrunkID: resp.SipTrunkId,
74+
Result: sip.AuthAccept,
75+
ProviderInfo: resp.ProviderInfo,
76+
Observability: resp.Observability,
7277
}, nil
7378
}
7479

pkg/service/service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,6 @@ func (s *Service) OnInboundInfo(log logger.Logger, callInfo *rpc.SIPCall, header
257257

258258
}
259259

260-
func (s *Service) OnSessionEnd(ctx context.Context, callIdentifier *sip.CallIdentifier, callInfo *livekit.SIPCallInfo, reason string) {
261-
s.log.Infow("SIP call ended", "callID", callInfo.CallId, "reason", reason)
260+
func (s *Service) OnSessionEnd(ctx context.Context, callIdentifier *sip.CallIdentifier, state *sip.CallState, reason string) {
261+
s.log.Infow("SIP call ended", "callID", callIdentifier.CallID, "reason", reason)
262262
}

pkg/sip/analytics.go

Lines changed: 85 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -30,28 +30,77 @@ import (
3030
"github.com/livekit/psrpc"
3131
)
3232

33+
// StateUpdater is the upstream RPC surface CallState's default StateHandler
34+
// forwards to.
3335
type StateUpdater interface {
3436
UpdateSIPCallState(ctx context.Context, req *rpc.UpdateSIPCallStateRequest, opts ...psrpc.RequestOption) (*emptypb.Empty, error)
37+
RecordCallContext(ctx context.Context, req *rpc.RecordCallContextRequest, opts ...psrpc.RequestOption) (*emptypb.Empty, error)
3538
}
3639

37-
func NewCallState(cli StateUpdater, initial *livekit.SIPCallInfo) *CallState {
40+
// StateHandler receives outgoing CallState changes. CallState invokes the
41+
// handler whenever the proto needs to be sent upstream (HandleUpdate) or a
42+
// transfer transitions (HandleTransfer). Implementations forward to an RPC
43+
// sink, drive in-process observability, or both.
44+
//
45+
// Methods are called while CallState holds its internal mutex. Implementations
46+
// must not call back into the same CallState. The supplied protos remain owned
47+
// by CallState; implementations that retain them past the call must clone.
48+
//
49+
// Resend / retry semantics belong to the implementation — CallState clears
50+
// dirty after every call regardless of upstream outcome.
51+
type StateHandler interface {
52+
HandleUpdate(info *livekit.SIPCallInfo)
53+
HandleTransfer(ti *livekit.SIPTransferInfo)
54+
HandleCallContextRecorded(info *livekit.SIPCallInfo)
55+
}
56+
57+
// NewRPCStateHandler returns a StateHandler that forwards updates to a
58+
// StateUpdater. nil cli yields a no-op handler — useful for tests that don't
59+
// care about the upstream sink.
60+
func NewRPCStateHandler(cli StateUpdater) StateHandler {
61+
return &rpcStateHandler{cli: cli}
62+
}
63+
64+
type rpcStateHandler struct{ cli StateUpdater }
65+
66+
func (h *rpcStateHandler) HandleUpdate(info *livekit.SIPCallInfo) {
67+
if h.cli == nil {
68+
return
69+
}
70+
_, _ = h.cli.UpdateSIPCallState(context.Background(), &rpc.UpdateSIPCallStateRequest{CallInfo: info})
71+
}
72+
73+
func (h *rpcStateHandler) HandleTransfer(ti *livekit.SIPTransferInfo) {
74+
if h.cli == nil {
75+
return
76+
}
77+
_, _ = h.cli.UpdateSIPCallState(context.Background(), &rpc.UpdateSIPCallStateRequest{TransferInfo: ti})
78+
}
79+
80+
func (h *rpcStateHandler) HandleCallContextRecorded(info *livekit.SIPCallInfo) {
81+
if h.cli == nil {
82+
return
83+
}
84+
_, _ = h.cli.RecordCallContext(context.Background(), &rpc.RecordCallContextRequest{CallInfo: info})
85+
}
86+
87+
func NewCallState(handler StateHandler, initial *livekit.SIPCallInfo) *CallState {
3888
if initial == nil {
3989
initial = &livekit.SIPCallInfo{}
4090
} else {
4191
initial = proto.CloneOf(initial)
4292
}
43-
s := &CallState{
44-
cli: cli,
93+
return &CallState{
94+
handler: handler,
4595
callInfo: initial,
4696
transferInfos: make(map[string]*livekit.SIPTransferInfo),
4797
dirty: true,
4898
}
49-
return s
5099
}
51100

52101
type CallState struct {
53102
mu sync.Mutex
54-
cli StateUpdater
103+
handler StateHandler
55104
callInfo *livekit.SIPCallInfo
56105
transferInfos map[string]*livekit.SIPTransferInfo
57106
dirty bool
@@ -78,98 +127,83 @@ func (s *CallState) DeferUpdate(update func(info *livekit.SIPCallInfo)) {
78127
update(s.callInfo)
79128
}
80129

81-
func (s *CallState) Update(ctx context.Context, update func(info *livekit.SIPCallInfo)) {
82-
ctx, span := Tracer.Start(ctx, "sip.CallState.Update")
83-
defer span.End()
130+
func (s *CallState) Update(update func(info *livekit.SIPCallInfo)) {
84131
s.mu.Lock()
85132
defer s.mu.Unlock()
86133
s.dirty = true
87134
update(s.callInfo)
88-
s.flush(ctx)
135+
s.flushLocked()
89136
}
90137

91-
func (s *CallState) StartTransfer(ctx context.Context, transferTo string) string {
138+
func (s *CallState) StartTransfer(transferTo string) string {
139+
s.mu.Lock()
140+
defer s.mu.Unlock()
92141
ti := &livekit.SIPTransferInfo{
93142
TransferId: guid.New(utils.SIPTransferPrefix),
94143
CallId: s.callInfo.CallId,
95144
TransferTo: transferTo,
96145
TransferInitiatedAtNs: time.Now().UnixNano(),
97146
TransferStatus: livekit.SIPTransferStatus_STS_TRANSFER_ONGOING,
98147
}
99-
100-
req := &rpc.UpdateSIPCallStateRequest{
101-
TransferInfo: ti,
102-
}
103-
104-
s.cli.UpdateSIPCallState(ctx, req)
105-
106-
s.mu.Lock()
107-
defer s.mu.Unlock()
108-
148+
s.handler.HandleTransfer(ti)
109149
s.transferInfos[ti.TransferId] = ti
110-
111150
return ti.TransferId
112151
}
113152

114-
func (s *CallState) EndTransfer(ctx context.Context, transferID string, inErr error) {
153+
func (s *CallState) EndTransfer(transferID string, inErr error) {
115154
s.mu.Lock()
155+
defer s.mu.Unlock()
116156
ti := s.transferInfos[transferID]
117157
delete(s.transferInfos, transferID)
118-
s.mu.Unlock()
119-
120158
if ti == nil {
121159
return
122160
}
123-
124161
ti.TransferCompletedAtNs = time.Now().UnixNano()
125162
if inErr != nil {
126163
ti.Error = inErr.Error()
127164
ti.TransferStatus = livekit.SIPTransferStatus_STS_TRANSFER_FAILED
128165
} else {
129166
ti.TransferStatus = livekit.SIPTransferStatus_STS_TRANSFER_SUCCESSFUL
130167
}
131-
132168
var sipStatus *livekit.SIPStatus
133169
if errors.As(inErr, &sipStatus) {
134170
ti.TransferStatusCode = sipStatus
135171
}
136-
137-
req := &rpc.UpdateSIPCallStateRequest{
138-
TransferInfo: ti,
139-
}
140-
141-
s.cli.UpdateSIPCallState(ctx, req)
172+
s.handler.HandleTransfer(ti)
142173
}
143174

144-
func (s *CallState) flush(ctx context.Context) {
145-
ctx = context.WithoutCancel(ctx)
146-
if s.cli == nil {
147-
s.dirty = false
148-
return
149-
}
150-
_, err := s.cli.UpdateSIPCallState(ctx, &rpc.UpdateSIPCallStateRequest{
151-
CallInfo: s.callInfo,
152-
})
153-
if err == nil {
154-
s.dirty = false
175+
// RecordCallContext appends late-arriving context to the canonical callInfo
176+
// (e.g. PCAP links published after the call has ended) and signals the
177+
// handler that the post-call context has been recorded. Does not touch the
178+
// dirty bit: this is a terminal post-call signal, not a regular flush.
179+
func (s *CallState) RecordCallContext(appendInfo func(info *livekit.SIPCallInfo)) {
180+
s.mu.Lock()
181+
defer s.mu.Unlock()
182+
if appendInfo != nil {
183+
appendInfo(s.callInfo)
155184
}
185+
s.handler.HandleCallContextRecorded(s.callInfo)
156186
}
157187

158-
func (s *CallState) Flush(ctx context.Context) {
159-
ctx, span := Tracer.Start(ctx, "sip.CallState.Flush")
160-
defer span.End()
188+
func (s *CallState) Flush() {
161189
s.mu.Lock()
162190
defer s.mu.Unlock()
163191
if !s.dirty {
164192
return
165193
}
166-
s.flush(ctx)
194+
s.flushLocked()
167195
}
168196

169-
func (s *CallState) ForceFlush(ctx context.Context) {
170-
ctx, span := Tracer.Start(ctx, "sip.CallState.ForceFlush")
171-
defer span.End()
197+
func (s *CallState) ForceFlush() {
172198
s.mu.Lock()
173199
defer s.mu.Unlock()
174-
s.flush(ctx)
200+
s.flushLocked()
201+
}
202+
203+
// flushLocked sends the current callInfo through the handler. Caller must
204+
// hold s.mu. Retry/resend semantics live in the handler; CallState clears
205+
// dirty unconditionally.
206+
func (s *CallState) flushLocked() {
207+
s.handler.HandleUpdate(s.callInfo)
208+
s.dirty = false
175209
}

0 commit comments

Comments
 (0)