Skip to content

Commit bdee5b2

Browse files
authored
recordtester: Allow overriding catalyst strategy on VOD tests (#231)
* vodtester: Allow overiding catalyst pipeline * recordtester: Receive catalyst pipeline config in cli arg * recordtester: Allow running only vod tests * vodtester: Remove redundant ticker We already use test duration for the ctx and continuousTest for the pause, no need for that. * vodtester: Improve log on waiting task * vodtester: Fix progress log
1 parent 7d72d3b commit bdee5b2

File tree

5 files changed

+31
-25
lines changed

5 files changed

+31
-25
lines changed

cmd/recordtester/recordtester.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ func main() {
5959
useHttp := fs.Bool("http", false, "Do HTTP tests instead of RTMP")
6060
testMP4 := fs.Bool("mp4", false, "Download MP4 of recording")
6161
testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test")
62+
testLive := fs.Bool("live", false, "Check Live workflow")
6263
testVod := fs.Bool("vod", false, "Check VOD workflow")
64+
catalystPipelineStrategy := fs.String("catalyst-pipeline-strategy", "", "Which catalyst pipeline strategy to use regarding. The appropriate values are defined by catalyst-api itself.")
6365
recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API")
6466
discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel")
6567
discordUserName := fs.String("discord-user-name", "", "User name to use when sending messages to Discord")
@@ -218,7 +220,8 @@ func main() {
218220
TestStreamHealth: *testStreamHealth,
219221
}
220222
vtOpts := vodtester.VodTesterOptions{
221-
API: lapi,
223+
API: lapi,
224+
CatalystPipelineStrategy: *catalystPipelineStrategy,
222225
}
223226
if *sim > 1 {
224227
var testers []recordtester.IRecordTester
@@ -264,16 +267,18 @@ func main() {
264267
metricServer := server.NewMetricsServer()
265268
go metricServer.Start(gctx, *bind)
266269
eg, egCtx := errgroup.WithContext(gctx)
267-
eg.Go(func() error {
268-
crtOpts := recordtester.ContinuousRecordTesterOptions{
269-
PagerDutyIntegrationKey: *pagerDutyIntegrationKey,
270-
PagerDutyComponent: *pagerDutyComponent,
271-
PagerDutyLowUrgency: *pagerDutyLowUrgency,
272-
RecordTesterOptions: rtOpts,
273-
}
274-
crt := recordtester.NewContinuousRecordTester(egCtx, crtOpts)
275-
return crt.Start(fileName, *testDuration, *pauseDuration, *continuousTest)
276-
})
270+
if *testLive {
271+
eg.Go(func() error {
272+
crtOpts := recordtester.ContinuousRecordTesterOptions{
273+
PagerDutyIntegrationKey: *pagerDutyIntegrationKey,
274+
PagerDutyComponent: *pagerDutyComponent,
275+
PagerDutyLowUrgency: *pagerDutyLowUrgency,
276+
RecordTesterOptions: rtOpts,
277+
}
278+
crt := recordtester.NewContinuousRecordTester(egCtx, crtOpts)
279+
return crt.Start(fileName, *testDuration, *pauseDuration, *continuousTest)
280+
})
281+
}
277282
if *testVod {
278283
eg.Go(func() error {
279284
cvtOpts := vodtester.ContinuousVodTesterOptions{

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/golang/glog v1.0.0
1313
github.com/gosuri/uilive v0.0.3 // indirect
1414
github.com/gosuri/uiprogress v0.0.1
15-
github.com/livepeer/go-api-client v0.4.0
15+
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5
1616
github.com/livepeer/go-livepeer v0.5.31
1717
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07
1818
github.com/livepeer/leaderboard-serverless v1.0.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -710,8 +710,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
710710
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
711711
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
712712
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
713-
github.com/livepeer/go-api-client v0.4.0 h1:uOd8ztWrbJwmf1HrmeX3fLXHWUuTmLOqYKxmpUdmxNI=
714-
github.com/livepeer/go-api-client v0.4.0/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
713+
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5 h1:sxyLVN5lD4JB5THu2+48BbhNTifJK67YvW8DyNuPBJI=
714+
github.com/livepeer/go-api-client v0.4.1-0.20221207101406-c3675c55eed5/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
715715
github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw=
716716
github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU=
717717
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=

internal/app/vodtester/continuous_vod_tester.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,7 @@ func NewContinuousVodTester(gctx context.Context, opts ContinuousVodTesterOption
5858

5959
func (cvt *continuousVodTester) Start(fileName string, vodImportUrl string, testDuration, taskPollDuration, pauseBetweenTests time.Duration) error {
6060
messenger.SendMessage(fmt.Sprintf("Starting continuous vod test of %s", cvt.host))
61-
ticker := time.NewTicker(testDuration)
62-
defer ticker.Stop()
63-
for range ticker.C {
61+
for {
6462
msg := fmt.Sprintf(":arrow_right: Starting %s vod test to %s", testDuration, cvt.host)
6563
messenger.SendMessage(msg)
6664

internal/app/vodtester/vodtester_app.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ type (
2323
}
2424

2525
VodTesterOptions struct {
26-
API *api.Client
26+
API *api.Client
27+
CatalystPipelineStrategy string
2728
}
2829

2930
vodTester struct {
30-
ctx context.Context
31-
cancel context.CancelFunc
32-
lapi *api.Client
31+
ctx context.Context
32+
cancel context.CancelFunc
33+
lapi *api.Client
34+
catalystPipelineStrategy string
3335
}
3436
)
3537

@@ -127,7 +129,7 @@ func (vt *vodTester) Start(fileName string, vodImportUrl string, taskPollDuratio
127129

128130
func (vt *vodTester) uploadViaUrlTester(vodImportUrl string, taskPollDuration time.Duration, assetName string) (*api.Asset, error) {
129131

130-
importAsset, importTask, err := vt.lapi.ImportAsset(vodImportUrl, assetName)
132+
importAsset, importTask, err := vt.lapi.UploadViaURL(vodImportUrl, assetName, vt.catalystPipelineStrategy)
131133
if err != nil {
132134
glog.Errorf("Error importing asset err=%v", err)
133135
return nil, fmt.Errorf("error importing asset: %w", err)
@@ -145,7 +147,7 @@ func (vt *vodTester) uploadViaUrlTester(vodImportUrl string, taskPollDuration ti
145147
func (vt *vodTester) directUploadTester(fileName string, taskPollDuration time.Duration) error {
146148
hostName, _ := os.Hostname()
147149
assetName := fmt.Sprintf("vod_test_upload_direct_%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
148-
requestUpload, err := vt.lapi.RequestUpload(assetName)
150+
requestUpload, err := vt.lapi.RequestUpload(assetName, vt.catalystPipelineStrategy)
149151

150152
if err != nil {
151153
glog.Errorf("Error requesting upload for assetName=%s err=%v", assetName, err)
@@ -184,7 +186,7 @@ func (vt *vodTester) resumableUploadTester(fileName string, taskPollDuration tim
184186

185187
hostName, _ := os.Hostname()
186188
assetName := fmt.Sprintf("vod_test_upload_resumable_%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
187-
requestUpload, err := vt.lapi.RequestUpload(assetName)
189+
requestUpload, err := vt.lapi.RequestUpload(assetName, vt.catalystPipelineStrategy)
188190

189191
if err != nil {
190192
glog.Errorf("Error requesting upload for assetName=%s err=%v", assetName, err)
@@ -223,7 +225,6 @@ func (vt *vodTester) resumableUploadTester(fileName string, taskPollDuration tim
223225
func (vt *vodTester) checkTaskProcessing(taskPollDuration time.Duration, processingTask api.Task) error {
224226
startTime := time.Now()
225227
for {
226-
glog.Infof("Waiting %s for task id=%s to be processed, elapsed=%s", taskPollDuration, processingTask.ID, time.Since(startTime))
227228
time.Sleep(taskPollDuration)
228229

229230
if err := vt.isCancelled(); err != nil {
@@ -244,6 +245,8 @@ func (vt *vodTester) checkTaskProcessing(taskPollDuration time.Duration, process
244245
glog.Errorf("Error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage)
245246
return fmt.Errorf("error processing task, taskId=%s status=%s error=%v", task.ID, task.Status.Phase, task.Status.ErrorMessage)
246247
}
248+
249+
glog.Infof("Waiting for task to be processed id=%s pollWait=%s elapsed=%s progressPct=%.1f%%", task.ID, taskPollDuration, time.Since(startTime), 100*task.Status.Progress)
247250
}
248251
}
249252

0 commit comments

Comments
 (0)