Skip to content

Commit

Permalink
record-tester: Further improvements for Catalyst E2E tests (#357)
Browse files Browse the repository at this point in the history
* record-tester: Allow skipping source playback check

* record-tester: Inline options field in recordTester struct

No need for that indirection, just another layer of copying around.

* record-tester: Make profiles check required

But enforce output profiles if not provided

* record-tester: Fix num profiles check

Remove default recordingSpec if not provided.

Instead, just check there's at least 2 in response.
  • Loading branch information
victorges authored Jun 19, 2024
1 parent 9005dda commit f98674f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 62 deletions.
2 changes: 2 additions & 0 deletions cmd/recordtester/recordtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func main() {
catalystPipelineStrategy := fs.String("catalyst-pipeline-strategy", "", "Which catalyst pipeline strategy to use regarding. The appropriate values are defined by catalyst-api itself.")
recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API")
recordingSpecStr := fs.String("recording-spec", "", "JSON object with the `recordingSpec` field to use in the test streams. Forwarded to the streams created in the API")
skipSourcePlayback := fs.Bool("skip-source-playback", false, "Whether to skip the source playback check on recordings processing validation")

// Discord related flags
discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel")
Expand Down Expand Up @@ -347,6 +348,7 @@ func main() {
Ingest: ingest,
RecordObjectStoreId: *recordObjectStoreId,
RecordingSpec: recordingSpec,
SkipSourcePlayback: *skipSourcePlayback,
UseForceURL: *forceRecordingUrl,
RecordingWaitTime: *recordingWaitTime,
UseHTTP: *useHttp,
Expand Down
115 changes: 53 additions & 62 deletions internal/app/recordtester/recordtester_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type (
Ingest *api.Ingest
RecordObjectStoreId string
RecordingSpec *api.RecordingSpec
SkipSourcePlayback bool
UseForceURL bool
RecordingWaitTime time.Duration
UseHTTP bool
Expand All @@ -61,19 +62,11 @@ type (
}

recordTester struct {
ctx context.Context
cancel context.CancelFunc
lapi *api.Client
lanalyzers testers.AnalyzerByRegion
ingest *api.Ingest
recordObjectStoreId string
recordingSpec *api.RecordingSpec
useForceURL bool
recordingWaitTime time.Duration
useHTTP bool
mp4 bool
streamHealth bool
serfOpts SerfOptions
RecordTesterOptions
serfOpts SerfOptions

ctx context.Context
cancel context.CancelFunc

// mutable fields
streamID string
Expand All @@ -86,18 +79,9 @@ type (
func NewRecordTester(gctx context.Context, opts RecordTesterOptions, serfOpts SerfOptions) IRecordTester {
ctx, cancel := context.WithCancel(gctx)
rt := &recordTester{
lapi: opts.API,
lanalyzers: opts.Analyzers,
ingest: opts.Ingest,
RecordTesterOptions: opts,
ctx: ctx,
cancel: cancel,
recordObjectStoreId: opts.RecordObjectStoreId,
recordingSpec: opts.RecordingSpec,
useForceURL: opts.UseForceURL,
recordingWaitTime: opts.RecordingWaitTime,
useHTTP: opts.UseHTTP,
mp4: opts.TestMP4,
streamHealth: opts.TestStreamHealth,
serfOpts: serfOpts,
}
return rt
Expand All @@ -112,7 +96,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}
apiTry := 0
for {
broadcasters, err = rt.lapi.Broadcasters()
broadcasters, err = rt.API.Broadcasters()
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand All @@ -126,21 +110,21 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
glog.V(model.DEBUG).Infof("Got broadcasters: %+v", broadcasters)
glog.V(model.DEBUG).Infof("Streaming video file '%s'\n", fileName)

if rt.useHTTP && len(broadcasters) == 0 {
if rt.UseHTTP && len(broadcasters) == 0 {
return 254, errors.New("empty list of broadcasters")
} else if (!rt.useHTTP && ingest.Ingest == "") || ingest.Playback == "" {
} else if (!rt.UseHTTP && ingest.Ingest == "") || ingest.Playback == "" {
return 254, errors.New("empty ingest URLs")
}

hostName, _ := os.Hostname()
streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00"))
var stream *api.Stream
for {
stream, err = rt.lapi.CreateStream(api.CreateStreamReq{
stream, err = rt.API.CreateStream(api.CreateStreamReq{
Name: streamName,
Record: true,
RecordingSpec: rt.recordingSpec,
RecordObjectStoreId: rt.recordObjectStoreId,
RecordingSpec: rt.RecordingSpec,
RecordObjectStoreId: rt.RecordObjectStoreId,
})
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
Expand All @@ -166,9 +150,9 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
rtmpURL := fmt.Sprintf("%s/%s", ingest.Ingest, stream.StreamKey)

testerFuncs := []testers.StartTestFunc{}
if rt.streamHealth {
if rt.TestStreamHealth {
testerFuncs = append(testerFuncs, func(ctx context.Context, mediaURL string, waitForTarget time.Duration, opts testers.Streamer2Options) testers.Finite {
return testers.NewStreamHealth(ctx, stream.ID, rt.lanalyzers, 2*time.Minute)
return testers.NewStreamHealth(ctx, stream.ID, rt.Analyzers, 2*time.Minute)
})
}

Expand All @@ -187,7 +171,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}
glog.V(model.SHORT).Infof("RTMP: %s streamId=%s playbackId=%s", rtmpURL, stream.ID, stream.PlaybackID)
glog.V(model.SHORT).Infof("MEDIA: %s streamId=%s playbackId=%s", mediaURL, stream.ID, stream.PlaybackID)
if rt.useHTTP {
if rt.UseHTTP {
sterr := rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream)
if sterr != nil {
glog.Warningf("Streaming returned error err=%v streamId=%s playbackId=%s", sterr, stream.ID, stream.PlaybackID)
Expand Down Expand Up @@ -258,18 +242,18 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}

lapiNoAPIKey := api.NewAPIClient(api.ClientOptions{
Server: rt.lapi.GetServer(),
Server: rt.API.GetServer(),
AccessToken: "", // test playback info call without API key
Timeout: 8 * time.Second,
})
if code, err := checkPlaybackInfo(stream.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
if code, err := checkPlaybackInfo(stream.PlaybackID, rt.API, lapiNoAPIKey); err != nil {
return code, err
}

glog.Infof("Waiting 10 seconds. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)
time.Sleep(10 * time.Second)
// now get sessions
sessions, err := rt.lapi.GetSessionsNew(stream.ID, false)
sessions, err := rt.API.GetSessionsNew(stream.ID, false)
if err != nil {
glog.Errorf("Error getting sessions err=%v streamId=%s playbackId=%s", err, stream.ID, stream.PlaybackID)
return 252, err
Expand Down Expand Up @@ -304,8 +288,8 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.

glog.Infof("Streaming done, waiting for recording URL to appear. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)

deadline := time.Now().Add(rt.recordingWaitTime)
if rt.useForceURL {
deadline := time.Now().Add(rt.RecordingWaitTime)
if rt.UseForceURL {
deadline = time.Now().Add(5 * time.Second)
}

Expand All @@ -329,7 +313,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
for _, sess := range sessions {
// currently the assetID is the same as the sessionID so we could just query on that but just in case that
// ever changes, we can use the ListAssets call to find the asset
assets, _, err := rt.lapi.ListAssets(api.ListOptions{
assets, _, err := rt.API.ListAssets(api.ListOptions{
Limit: 1,
Filters: map[string]interface{}{
"sourceSessionId": sess.ID,
Expand All @@ -347,7 +331,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}
asset := assets[0]

if code, err := checkPlaybackInfo(asset.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil {
if code, err := checkPlaybackInfo(asset.PlaybackID, rt.API, lapiNoAPIKey); err != nil {
errCode, errs = code, append(errs, err)
} else {
// if we get playback before the processing is done it means source playback was provided
Expand All @@ -362,12 +346,12 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
}
}
}
if !sourcePlayback {
if !sourcePlayback && !rt.SkipSourcePlayback {
return 246, errors.New("source playback was not provided")
}

// check actual recordings playback
sessions, err = rt.lapi.GetSessionsNew(stream.ID, false)
sessions, err = rt.API.GetSessionsNew(stream.ID, false)
if err != nil {
glog.Errorf("Error getting sessions err=%v streamId=%s playbackId=%s", err, stream.ID, stream.PlaybackID)
return 252, err
Expand All @@ -383,7 +367,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.

for _, sess := range sessions {
statusShould := api.RecordingStatusReady
if rt.useForceURL {
if rt.UseForceURL {
statusShould = api.RecordingStatusWaiting
}
if sess.RecordingStatus != statusShould {
Expand All @@ -399,7 +383,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.
if err = rt.isCancelled(); err != nil {
return 0, err
}
if rt.mp4 {
if rt.TestMP4 {
es, err := rt.checkRecordingMp4(stream, sess.Mp4Url, testDuration)
if err != nil {
return es, err
Expand All @@ -417,7 +401,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time.

glog.Infof("Done Record Test. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)

rt.lapi.DeleteStream(stream.ID)
rt.API.DeleteStream(stream.ID)
return 0, nil
}

Expand All @@ -440,14 +424,14 @@ func checkPlaybackInfo(playbackID string, withKey, withoutKey *api.Client) (int,
}

func (rt *recordTester) getIngestInfo() (*api.Ingest, error) {
if rt.ingest != nil {
return rt.ingest, nil
if rt.Ingest != nil {
return rt.Ingest, nil
}
var ingests []api.Ingest
apiTry := 0
for {
var err error
ingests, err = rt.lapi.Ingest(false)
ingests, err = rt.API.Ingest(false)
if err != nil {
if testers.Timedout(err) && apiTry < 3 {
apiTry++
Expand All @@ -469,11 +453,11 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str
var err error
apiTry := 0
for {
session, err = rt.lapi.CreateStream(api.CreateStreamReq{
session, err = rt.API.CreateStream(api.CreateStreamReq{
Name: streamName,
Record: true,
RecordingSpec: rt.recordingSpec,
RecordObjectStoreId: rt.recordObjectStoreId,
RecordingSpec: rt.RecordingSpec,
RecordObjectStoreId: rt.RecordObjectStoreId,
ParentID: stream.ID,
})
if err != nil {
Expand Down Expand Up @@ -558,7 +542,6 @@ func (rt *recordTester) checkRecordingMp4(stream *api.Stream, url string, stream
}

func (rt *recordTester) checkRecordingHls(stream *api.Stream, url string, streamDuration time.Duration) (int, error) {
es := 0
started := time.Now()
downloader := testers.NewM3utester2(rt.ctx, url, false, false, false, false, 5*time.Second, nil, false)
<-downloader.Done()
Expand All @@ -568,24 +551,32 @@ func (rt *recordTester) checkRecordingHls(stream *api.Stream, url string, stream
}
vs := downloader.VODStats()
rt.vodStats = vs
expectedProfiles := len(api.StandardProfiles) + 1
if rt.recordingSpec != nil && rt.recordingSpec.Profiles != nil {
expectedProfiles = len(*rt.recordingSpec.Profiles) + 1
}
if len(vs.SegmentsNum) != expectedProfiles {
glog.Warningf("Number of renditions doesn't match! Has %d should %d. streamId=%s playbackId=%s", len(vs.SegmentsNum), len(api.StandardProfiles)+1, stream.ID, stream.PlaybackID)
es = 35

numProfiles := len(vs.SegmentsNum)
if rt.RecordingSpec != nil && rt.RecordingSpec.Profiles != nil {
expectedProfiles := len(*rt.RecordingSpec.Profiles) + 1
if numProfiles != expectedProfiles {
glog.Warningf("Number of renditions doesn't match! Has %d should %d. streamId=%s playbackId=%s", numProfiles, expectedProfiles, stream.ID, stream.PlaybackID)
return 35, fmt.Errorf("number of renditions doesn't match (expected: %d actual: %d)", expectedProfiles, numProfiles)
}
} else {
// if there's no explicit recording spec we can only expect there's at least 2 profiles (source and transcoded)
expectedProfiles := 2
if numProfiles < expectedProfiles {
glog.Warningf("Number of renditions too low! Has %d should have at least %d. streamId=%s playbackId=%s", numProfiles, expectedProfiles, stream.ID, stream.PlaybackID)
return 35, fmt.Errorf("number of renditions too low (expected at least: %d actual: %d)", expectedProfiles, numProfiles)
}
}

glog.V(model.DEBUG).Infof("Stats: %s streamId=%s playbackId=%s", vs.String(), stream.ID, stream.PlaybackID)
glog.V(model.DEBUG).Infof("Stats raw: %+v streamId=%s playbackId=%s", vs, stream.ID, stream.PlaybackID)
if ok, ers := vs.IsOk(streamDuration, false); !ok {
glog.Warningf("NOT OK! (%s) streamId=%s playbackId=%s", ers, stream.ID, stream.PlaybackID)
es = 36
return es, errors.New(ers)
return 36, errors.New(ers)
} else {
glog.Infof("All ok! streamId=%s playbackId=%s", stream.ID, stream.PlaybackID)
}
return es, nil
return 0, nil
}

func (rt *recordTester) Cancel() {
Expand All @@ -602,7 +593,7 @@ func (rt *recordTester) VODStats() model.VODStats {

func (rt *recordTester) Clean() {
if rt.streamID != "" {
rt.lapi.DeleteStream(rt.streamID)
rt.API.DeleteStream(rt.streamID)
}
}

Expand Down

0 comments on commit f98674f

Please sign in to comment.