Skip to content

Commit 9a1f15b

Browse files
committed
ai/live: Simplify payments API around readers
1 parent dc70dc3 commit 9a1f15b

File tree

4 files changed

+11
-20
lines changed

4 files changed

+11
-20
lines changed

server/ai_http.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
214214
clog.Infof(ctx, "Error getting local trickle segment err=%v", err)
215215
return
216216
}
217-
reader := segment.Reader
218217
if paymentProcessor != nil {
219-
reader = paymentProcessor.process(ctx, segment.Reader)
218+
paymentProcessor.process(ctx, segment.Reader)
220219
}
221-
io.Copy(io.Discard, reader)
222220
}
223221
}()
224222

server/ai_live_video.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
7878
defer slowOrchChecker.EndSegment()
7979
var r io.Reader = reader
8080
if paymentProcessor != nil {
81-
r = paymentProcessor.process(ctx, reader)
81+
paymentProcessor.process(ctx, reader)
8282
}
8383

8484
clog.V(8).Infof(ctx, "trickle publish writing data seq=%d", seq)

server/live_payment_processor.go

+4-13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/livepeer/go-livepeer/clog"
13+
"github.com/livepeer/go-livepeer/media"
1314
"github.com/livepeer/lpms/ffmpeg"
1415
)
1516

@@ -103,20 +104,13 @@ func (p *LivePaymentProcessor) processOne(ctx context.Context, timestamp time.Ti
103104
p.lastProcessedAt = timestamp
104105
}
105106

106-
func (p *LivePaymentProcessor) process(ctx context.Context, reader io.Reader) io.Reader {
107+
func (p *LivePaymentProcessor) process(ctx context.Context, reader media.CloneableReader) {
107108
timestamp := time.Now()
108109
if p.shouldSkip(timestamp) {
109110
// We don't process every segment, because it's too compute-expensive
110-
return reader
111-
}
112-
113-
pipeReader, pipeWriter, err := os.Pipe()
114-
if err != nil {
115-
clog.InfofErr(ctx, "Error creating pipe", err)
116-
return reader
111+
return
117112
}
118113

119-
resReader := io.TeeReader(reader, pipeWriter)
120114
go func() {
121115
select {
122116
case p.processCh <- timestamp:
@@ -126,8 +120,7 @@ func (p *LivePaymentProcessor) process(ctx context.Context, reader io.Reader) io
126120

127121
// read the segment into the buffer, because the direct use of the reader causes Broken pipe
128122
// it's probably related to different pace of reading by trickle and ffmpeg.GetCodecInfo()
129-
defer pipeReader.Close()
130-
segData, err := io.ReadAll(pipeReader)
123+
segData, err := io.ReadAll(reader.Clone())
131124
if err != nil {
132125
clog.InfofErr(ctx, "Error reading segment data", err)
133126
return
@@ -139,8 +132,6 @@ func (p *LivePaymentProcessor) process(ctx context.Context, reader io.Reader) io
139132
// We process one segment at the time, no need to buffer them
140133
}
141134
}()
142-
143-
return resReader
144135
}
145136

146137
func (p *LivePaymentProcessor) shouldSkip(timestamp time.Time) bool {

trickle/local_subscriber.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ package trickle
22

33
import (
44
"errors"
5-
"io"
65
"log/slog"
76
"strconv"
87
"sync"
8+
9+
"github.com/livepeer/go-livepeer/media"
910
)
1011

1112
// local (in-memory) subscriber for trickle protocol
1213

1314
type TrickleData struct {
14-
Reader io.Reader
15+
Reader media.CloneableReader
1516
Metadata map[string]string
1617
}
1718

@@ -44,7 +45,8 @@ func (c *TrickleLocalSubscriber) Read() (*TrickleData, error) {
4445
return nil, errors.New("seq not found")
4546
}
4647
c.seq++
47-
r, w := io.Pipe()
48+
w := media.NewMediaWriter()
49+
r := w.MakeReader()
4850
go func() {
4951
subscriber := &SegmentSubscriber{
5052
segment: segment,

0 commit comments

Comments
 (0)