Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ai/live: Reuse HTTP connections for trickle requests #3470

Merged
merged 3 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions trickle/trickle_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var StreamNotFoundErr = errors.New("stream not found")

// TricklePublisher represents a trickle streaming client
type TricklePublisher struct {
client *http.Client
baseURL string
index int // Current index for segments
writeLock sync.Mutex // Mutex to manage concurrent access
Expand Down Expand Up @@ -47,6 +48,7 @@ func NewTricklePublisher(url string) (*TricklePublisher, error) {
c := &TricklePublisher{
baseURL: url,
contentType: "video/MP2T",
client: httpClient(),
}
p, err := c.preconnect()
if err != nil {
Expand Down Expand Up @@ -76,13 +78,7 @@ func (c *TricklePublisher) preconnect() (*pendingPost, error) {

// Start the POST request in a background goroutine
go func() {
// Createa new client to prevent connection reuse
client := http.Client{Transport: &http.Transport{
DisableKeepAlives: true,
// ignore orch certs for now
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}}
resp, err := client.Do(req)
resp, err := c.client.Do(req)
if err != nil {
slog.Error("Failed to complete POST for segment", "url", url, "err", err)
errCh <- err
Expand Down Expand Up @@ -126,11 +122,8 @@ func (c *TricklePublisher) Close() error {
if err != nil {
return err
}
resp, err := (&http.Client{Transport: &http.Transport{
DisableKeepAlives: true,
// ignore orch certs for now
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}}).Do(req)
// Use a new client for a fresh connection
resp, err := httpClient().Do(req)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just checking it was intentional to create a new client on this one

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep - this method is not on the hot path and actually only gets invoked if there is a problem sending the segment. So it seems best to have a fresh client here to mitigate any issues with the existing connection, similar to why we reset the client when we retry (here)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments clarifying this, thanks - c3330b0

if err != nil {
return err
}
Expand Down Expand Up @@ -174,11 +167,12 @@ func (p *pendingPost) reconnect() (*pendingPost, error) {
// Set the publisher's sequence sequence to the intended reconnect
// Call publisher's preconnect (which increments its sequence)
// then reset publisher's sequence back to the original
//slog.Info("Re-connecting", "url", p.client.baseURL, "seq", p.client.index)
// Also recreate the client to force a fresh connection
p.client.writeLock.Lock()
defer p.client.writeLock.Unlock()
currentSeq := p.client.index
p.client.index = p.index
p.client.client = httpClient()
pp, err := p.client.preconnect()
p.client.index = currentSeq
return pp, err
Expand Down Expand Up @@ -261,11 +255,10 @@ func (p *pendingPost) Close() error {
if err != nil {
return err
}
resp, err := (&http.Client{Transport: &http.Transport{
DisableKeepAlives: true,
// ignore orch certs for now
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}}).Do(req)
// Since this method typically gets invoked when
// there is a problem sending the segment, use a
// new client for a fresh connection just in case
resp, err := httpClient().Do(req)
if err != nil {
return err
}
Expand All @@ -287,6 +280,15 @@ func (c *TricklePublisher) Write(data io.Reader) error {
return err
}

func httpClient() *http.Client {
return &http.Client{Transport: &http.Transport{
// Re-enable keepalives to avoid connection pooling
// DisableKeepAlives: true,
// ignore orch certs for now
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}}
}

func humanBytes(bytes int64) string {
var unit int64 = 1024
if bytes < unit {
Expand Down
12 changes: 5 additions & 7 deletions trickle/trickle_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package trickle

import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
Expand All @@ -28,6 +27,7 @@ const preconnectRefreshTimeout = 20 * time.Second

// TrickleSubscriber represents a trickle streaming reader that always fetches from index -1
type TrickleSubscriber struct {
client *http.Client
url string
mu sync.Mutex // Mutex to manage concurrent access
pendingGet *http.Response // Pre-initialized GET request
Expand All @@ -41,8 +41,9 @@ type TrickleSubscriber struct {
func NewTrickleSubscriber(url string) *TrickleSubscriber {
// No preconnect needed here; it will be handled by the first Read call.
return &TrickleSubscriber{
url: url,
idx: -1, // shortcut for 'latest'
client: httpClient(),
url: url,
idx: -1, // shortcut for 'latest'
}
}

Expand Down Expand Up @@ -93,10 +94,7 @@ func (c *TrickleSubscriber) connect(ctx context.Context) (*http.Response, error)
}

// Execute the GET request
resp, err := (&http.Client{Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}}).Do(req)
resp, err := c.client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to complete GET for next segment: %w", err)
}
Expand Down
Loading