Skip to content

Commit 6d8b5c3

Browse files
authored
ai/live: Reuse HTTP connections for trickle requests (#3470)
Initially we were re-creating connections to avoid being stuck in a degraded state over time, but it seems that we may have trouble even establishing a connection sometimes. Attempt to re-use established connections to mitigate this problem. Both scenarios should be caught by the slow orchestrator checker, but hopefully connection reuse makes the checker trip less often.
1 parent a4b76be commit 6d8b5c3

File tree

2 files changed

+25
-25
lines changed

2 files changed

+25
-25
lines changed

trickle/trickle_publisher.go

+20 -18
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ var StreamNotFoundErr = errors.New("stream not found")
1414

1515
// TricklePublisher represents a trickle streaming client
1616
type TricklePublisher struct {
17+
client *http.Client
1718
baseURL string
1819
index int // Current index for segments
1920
writeLock sync.Mutex // Mutex to manage concurrent access
@@ -47,6 +48,7 @@ func NewTricklePublisher(url string) (*TricklePublisher, error) {
4748
c := &TricklePublisher{
4849
baseURL: url,
4950
contentType: "video/MP2T",
51+
client: httpClient(),
5052
}
5153
p, err := c.preconnect()
5254
if err != nil {
@@ -76,13 +78,7 @@ func (c *TricklePublisher) preconnect() (*pendingPost, error) {
7678

7779
// Start the POST request in a background goroutine
7880
go func() {
79-
// Createa new client to prevent connection reuse
80-
client := http.Client{Transport: &http.Transport{
81-
DisableKeepAlives: true,
82-
// ignore orch certs for now
83-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
84-
}}
85-
resp, err := client.Do(req)
81+
resp, err := c.client.Do(req)
8682
if err != nil {
8783
slog.Error("Failed to complete POST for segment", "url", url, "err", err)
8884
errCh <- err
@@ -126,11 +122,8 @@ func (c *TricklePublisher) Close() error {
126122
if err != nil {
127123
return err
128124
}
129-
resp, err := (&http.Client{Transport: &http.Transport{
130-
DisableKeepAlives: true,
131-
// ignore orch certs for now
132-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
133-
}}).Do(req)
125+
// Use a new client for a fresh connection
126+
resp, err := httpClient().Do(req)
134127
if err != nil {
135128
return err
136129
}
@@ -174,11 +167,12 @@ func (p *pendingPost) reconnect() (*pendingPost, error) {
174167
// Set the publisher's sequence to the intended reconnect
175168
// Call publisher's preconnect (which increments its sequence)
176169
// then reset publisher's sequence back to the original
177-
//slog.Info("Re-connecting", "url", p.client.baseURL, "seq", p.client.index)
170+
// Also recreate the client to force a fresh connection
178171
p.client.writeLock.Lock()
179172
defer p.client.writeLock.Unlock()
180173
currentSeq := p.client.index
181174
p.client.index = p.index
175+
p.client.client = httpClient()
182176
pp, err := p.client.preconnect()
183177
p.client.index = currentSeq
184178
return pp, err
@@ -261,11 +255,10 @@ func (p *pendingPost) Close() error {
261255
if err != nil {
262256
return err
263257
}
264-
resp, err := (&http.Client{Transport: &http.Transport{
265-
DisableKeepAlives: true,
266-
// ignore orch certs for now
267-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
268-
}}).Do(req)
258+
// Since this method typically gets invoked when
259+
// there is a problem sending the segment, use a
260+
// new client for a fresh connection just in case
261+
resp, err := httpClient().Do(req)
269262
if err != nil {
270263
return err
271264
}
@@ -287,6 +280,15 @@ func (c *TricklePublisher) Write(data io.Reader) error {
287280
return err
288281
}
289282

283+
func httpClient() *http.Client {
284+
return &http.Client{Transport: &http.Transport{
285+
// Leave keepalives enabled so connections are pooled and reused
286+
// DisableKeepAlives: true,
287+
// ignore orch certs for now
288+
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
289+
}}
290+
}
291+
290292
func humanBytes(bytes int64) string {
291293
var unit int64 = 1024
292294
if bytes < unit {

trickle/trickle_subscriber.go

+5 -7
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@ package trickle
22

33
import (
44
"context"
5-
"crypto/tls"
65
"errors"
76
"fmt"
87
"io"
@@ -28,6 +27,7 @@ const preconnectRefreshTimeout = 20 * time.Second
2827

2928
// TrickleSubscriber represents a trickle streaming reader that always fetches from index -1
3029
type TrickleSubscriber struct {
30+
client *http.Client
3131
url string
3232
mu sync.Mutex // Mutex to manage concurrent access
3333
pendingGet *http.Response // Pre-initialized GET request
@@ -41,8 +41,9 @@ type TrickleSubscriber struct {
4141
func NewTrickleSubscriber(url string) *TrickleSubscriber {
4242
// No preconnect needed here; it will be handled by the first Read call.
4343
return &TrickleSubscriber{
44-
url: url,
45-
idx: -1, // shortcut for 'latest'
44+
client: httpClient(),
45+
url: url,
46+
idx: -1, // shortcut for 'latest'
4647
}
4748
}
4849

@@ -93,10 +94,7 @@ func (c *TrickleSubscriber) connect(ctx context.Context) (*http.Response, error)
9394
}
9495

9596
// Execute the GET request
96-
resp, err := (&http.Client{Transport: &http.Transport{
97-
DisableKeepAlives: true,
98-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
99-
}}).Do(req)
97+
resp, err := c.client.Do(req)
10098
if err != nil {
10199
return nil, fmt.Errorf("failed to complete GET for next segment: %w", err)
102100
}

0 commit comments

Comments
 (0)