Skip to content

Commit

Permalink
task: Update go-api-client and stream-tester with fixes (#43)
Browse files Browse the repository at this point in the history
* go.mod: Update go-api-client to v0.2.0-beta

* go.mod: Update stream-tester to v0.12.11

Fixes error handlign on segmenter

* task: Update usage of go-api-client

* task: Fix cancellation handling on segmenter

If the loop finishes because ctx was cancelled, we'll
have no indication about the cancellation. Need to
check that explicitly.

Also notice that even if the segmenter sent the error on the
output channel, it would not be sufficient. That because if
the ctx is cancelled, it might end up never sending the error
anyway so we'd still need to check.

* task/transcode: Handle errors writing trailer

* go.mod: Use merged version of go-api-client (0.2.0)

* imports: Update all go-api-client imports to api
  • Loading branch information
victorges authored Jun 14, 2022
1 parent 46adaf6 commit 3783a26
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 75 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ go 1.16

require (
github.com/golang/glog v1.0.0
github.com/livepeer/go-api-client v0.1.5
github.com/livepeer/go-api-client v0.2.0
github.com/livepeer/go-livepeer v0.5.31
github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07
github.com/livepeer/livepeer-data v0.4.14
github.com/livepeer/stream-tester v0.12.10
github.com/livepeer/stream-tester v0.12.11
github.com/peterbourgon/ff v1.7.1
github.com/rabbitmq/amqp091-go v1.1.0
github.com/stretchr/testify v1.7.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,8 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/livepeer/go-api-client v0.1.5 h1:eSiYokcg11L3uexhXefV8OJs2oB2dvsI1gGZcGnf+TM=
github.com/livepeer/go-api-client v0.1.5/go.mod h1:FPgo6hPAEQkWS/Bmepdm+O/LWfurbmym93ghzLueTUo=
github.com/livepeer/go-api-client v0.2.0 h1:s4uR619KRNZsVLYGNBIp2wKr7ErE2GEGdfpljcACao4=
github.com/livepeer/go-api-client v0.2.0/go.mod h1:FPgo6hPAEQkWS/Bmepdm+O/LWfurbmym93ghzLueTUo=
github.com/livepeer/go-livepeer v0.5.31 h1:LcN+qDnqWRws7fdVYc4ucZPVcLQRs2tehUYCQVnlnRw=
github.com/livepeer/go-livepeer v0.5.31/go.mod h1:cpBikcGWApkx0cyR0Ht+uAym7j3uAwXGpPbvaOA8XUU=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
Expand All @@ -708,8 +708,8 @@ github.com/livepeer/lpms v0.0.0-20220523122311-fc32eb80248c h1:LFwabjsjQU/bxVdUL
github.com/livepeer/lpms v0.0.0-20220523122311-fc32eb80248c/go.mod h1:Hr/JhxxPDipOVd4ZrGYWrdJfpVF8/SEI0nNr2ctAlkM=
github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU=
github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04=
github.com/livepeer/stream-tester v0.12.10 h1:M7W4TPO7tGvOaQHkgx207FeCg7D45SstHdq3kzBMiMk=
github.com/livepeer/stream-tester v0.12.10/go.mod h1:UJG9vkEJuBcbEFl0AUQYu1mahBOiVRZkF/FAcvJ4nJQ=
github.com/livepeer/stream-tester v0.12.11 h1:RuHOIC5pFZy9XUbGqWLuiFoqh4UHBUp6q+//IxEOme0=
github.com/livepeer/stream-tester v0.12.11/go.mod h1:UJG9vkEJuBcbEFl0AUQYu1mahBOiVRZkF/FAcvJ4nJQ=
github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down
4 changes: 2 additions & 2 deletions task/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package task
import (
"errors"

livepeerAPI "github.com/livepeer/go-api-client"
api "github.com/livepeer/go-api-client"
)

type UnretriableError struct{ error }
Expand All @@ -13,6 +13,6 @@ func (e UnretriableError) Error() string { return e.error.Error() }
func (e UnretriableError) Unwrap() error { return e.error }

func IsUnretriable(err error) bool {
return errors.Is(err, livepeerAPI.ErrNotExists) ||
return errors.Is(err, api.ErrNotExists) ||
errors.As(err, &UnretriableError{})
}
18 changes: 9 additions & 9 deletions task/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strings"

"github.com/golang/glog"
livepeerAPI "github.com/livepeer/go-api-client"
api "github.com/livepeer/go-api-client"
"github.com/livepeer/livepeer-data/pkg/data"
"github.com/livepeer/task-runner/clients"
)
Expand Down Expand Up @@ -60,7 +60,7 @@ type internalMetadata struct {
Pinata interface{} `json:"pinata"`
}

func uploadFile(tctx *TaskContext, asset *livepeerAPI.Asset, content io.Reader) (*data.ExportTaskOutput, error) {
func uploadFile(tctx *TaskContext, asset *api.Asset, content io.Reader) (*data.ExportTaskOutput, error) {
params := tctx.Task.Params.Export
contentType := "video/" + asset.VideoSpec.Format
if c := params.Custom; c != nil {
Expand Down Expand Up @@ -118,16 +118,16 @@ func uploadFile(tctx *TaskContext, asset *livepeerAPI.Asset, content io.Reader)
}, nil
}

func saveNFTMetadata(tctx *TaskContext, ipfs clients.IPFS, asset *livepeerAPI.Asset, videoCID string) (string, error) {
func saveNFTMetadata(tctx *TaskContext, ipfs clients.IPFS, asset *api.Asset, videoCID string) (string, error) {
params := tctx.Task.Params.Export.IPFS
template := params.NFTMetadataTemplate
if template == livepeerAPI.NFTMetadataTemplatePlayer && asset.PlaybackRecordingID == "" {
if template == api.NFTMetadataTemplatePlayer && asset.PlaybackRecordingID == "" {
return "", fmt.Errorf("cannot create player NFT for asset without playback URL")
}
if template == "" {
template = livepeerAPI.NFTMetadataTemplatePlayer
template = api.NFTMetadataTemplatePlayer
if asset.PlaybackRecordingID == "" {
template = livepeerAPI.NFTMetadataTemplateFile
template = api.NFTMetadataTemplateFile
}
}
nftMetadata := nftMetadata(asset, videoCID, template, tctx.ExportTaskConfig)
Expand All @@ -146,12 +146,12 @@ func saveNFTMetadata(tctx *TaskContext, ipfs clients.IPFS, asset *livepeerAPI.As
return cid, nil
}

func nftMetadata(asset *livepeerAPI.Asset, videoCID string, template livepeerAPI.NFTMetadataTemplate, config ExportTaskConfig) map[string]interface{} {
func nftMetadata(asset *api.Asset, videoCID string, template api.NFTMetadataTemplate, config ExportTaskConfig) map[string]interface{} {
videoUrl := "ipfs://" + videoCID
switch template {
default:
fallthrough
case livepeerAPI.NFTMetadataTemplatePlayer:
case api.NFTMetadataTemplatePlayer:
return map[string]interface{}{
"name": asset.Name,
"description": fmt.Sprintf("Livepeer video from asset %q", asset.Name),
Expand All @@ -164,7 +164,7 @@ func nftMetadata(asset *livepeerAPI.Asset, videoCID string, template livepeerAPI
"com.livepeer.playbackId": asset.PlaybackID,
},
}
case livepeerAPI.NFTMetadataTemplateFile:
case api.NFTMetadataTemplateFile:
return map[string]interface{}{
"name": asset.Name,
"description": fmt.Sprintf("Livepeer video from asset %q", asset.Name),
Expand Down
4 changes: 2 additions & 2 deletions task/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"strings"

"github.com/golang/glog"
livepeerAPI "github.com/livepeer/go-api-client"
api "github.com/livepeer/go-api-client"
"github.com/livepeer/go-livepeer/drivers"
"github.com/livepeer/livepeer-data/pkg/data"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -96,7 +96,7 @@ func TaskImport(tctx *TaskContext) (*data.TaskOutput, error) {
}}, nil
}

func getFile(ctx context.Context, osSess drivers.OSSession, params livepeerAPI.ImportTaskParams) (name string, size uint64, content io.ReadCloser, err error) {
func getFile(ctx context.Context, osSess drivers.OSSession, params api.ImportTaskParams) (name string, size uint64, content io.ReadCloser, err error) {
if upedObjKey := params.UploadedObjectKey; upedObjKey != "" {
// TODO: We should simply "move" the file in case of direct import since we
// know the file is already in the object store. Independently, we also have
Expand Down
6 changes: 3 additions & 3 deletions task/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
"time"

livepeerAPI "github.com/livepeer/go-api-client"
api "github.com/livepeer/go-api-client"
"github.com/livepeer/go-livepeer/drivers"
"github.com/stretchr/testify/assert"
)
Expand All @@ -31,7 +31,7 @@ func TestImport(t *testing.T) {

url := "https://eric-test-livepeer.s3.amazonaws.com/bbbx3_720.mp4"

var task *livepeerAPI.Task
var task *api.Task
err := json.Unmarshal([]byte(`{"params":{"import":{"url":"`+url+`"}}}`), &task)
assert.NoError(err)
os, err := drivers.ParseOSURL(osPath, true)
Expand All @@ -40,7 +40,7 @@ func TestImport(t *testing.T) {
result, err := TaskImport(&TaskContext{
Context: ctx,
Task: task,
OutputAsset: &livepeerAPI.Asset{PlaybackID: "test-playback-id"},
OutputAsset: &api.Asset{PlaybackID: "test-playback-id"},
outputOS: os.NewSession("test_import_bbb"),
})
assert.NoError(err)
Expand Down
31 changes: 17 additions & 14 deletions task/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/golang/glog"
livepeerAPI "github.com/livepeer/go-api-client"
api "github.com/livepeer/go-api-client"
"github.com/livepeer/stream-tester/model"
"github.com/livepeer/stream-tester/segmenter"
)
Expand All @@ -17,7 +17,7 @@ const (
absoluteMinVideoBitrate = 5_000
)

var allProfiles = []livepeerAPI.Profile{
var allProfiles = []api.Profile{
{
Name: "240p0",
Fps: 0,
Expand Down Expand Up @@ -52,7 +52,7 @@ var allProfiles = []livepeerAPI.Profile{
},
}

func Prepare(tctx *TaskContext, assetSpec *livepeerAPI.AssetSpec, file io.ReadSeekCloser) (string, error) {
func Prepare(tctx *TaskContext, assetSpec *api.AssetSpec, file io.ReadSeekCloser) (string, error) {
var (
ctx = tctx.Context
lapi = tctx.lapi
Expand All @@ -64,16 +64,16 @@ func Prepare(tctx *TaskContext, assetSpec *livepeerAPI.AssetSpec, file io.ReadSe
if err != nil {
return "", nil
}
stream, err := lapi.CreateStreamEx2(streamName, true, "", nil, profiles...)
stream, err := lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, Profiles: profiles})
if err != nil {
return "", nil
}
defer lapi.DeleteStream(stream.ID)

gctx, gcancel := context.WithCancel(ctx)
defer gcancel()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
segmentsIn := make(chan *model.HlsSegment)
if err = segmenter.StartSegmentingR(gctx, file, true, 0, 0, segLen, false, segmentsIn); err != nil {
if err = segmenter.StartSegmentingR(ctx, file, true, 0, 0, segLen, false, segmentsIn); err != nil {
return "", err
}
var transcoded [][]byte
Expand All @@ -98,21 +98,24 @@ func Prepare(tctx *TaskContext, assetSpec *livepeerAPI.AssetSpec, file io.ReadSe
}
glog.V(model.VERBOSE).Infof("Got segment seqNo=%d pts=%s dur=%s data len bytes=%d\n", seg.SeqNo, seg.Pts, seg.Duration, len(seg.Data))
started := time.Now()
_, err = lapi.PushSegment(stream.ID, seg.SeqNo, seg.Duration, seg.Data, contentResolution)
_, err = lapi.PushSegmentR(stream.ID, seg.SeqNo, seg.Duration, seg.Data, contentResolution)
if err != nil {
glog.Errorf("Error while segment push for prepare err=%v\n", err)
break
}
glog.V(model.VERBOSE).Infof("Transcode %d took %s\n", len(transcoded), time.Since(started))
}
if ctxErr := ctx.Err(); err == nil && ctxErr != nil {
err = ctxErr
}
if err != nil && err != io.EOF {
return "", err
}
return stream.ID, nil
}

func getPlaybackProfiles(assetVideoSpec *livepeerAPI.AssetVideoSpec) ([]livepeerAPI.Profile, error) {
var video *livepeerAPI.AssetTrack
func getPlaybackProfiles(assetVideoSpec *api.AssetVideoSpec) ([]api.Profile, error) {
var video *api.AssetTrack
for _, track := range assetVideoSpec.Tracks {
if track.Type == "video" {
video = track
Expand All @@ -121,26 +124,26 @@ func getPlaybackProfiles(assetVideoSpec *livepeerAPI.AssetVideoSpec) ([]livepeer
if video == nil {
return nil, fmt.Errorf("no video track found in asset spec")
}
filtered := make([]livepeerAPI.Profile, 0, len(allProfiles))
filtered := make([]api.Profile, 0, len(allProfiles))
for _, profile := range allProfiles {
if profile.Height <= video.Height && profile.Bitrate < int(video.Bitrate) {
filtered = append(filtered, profile)
}
}
if len(filtered) == 0 {
return []livepeerAPI.Profile{lowBitrateProfile(video)}, nil
return []api.Profile{lowBitrateProfile(video)}, nil
}
return filtered, nil
}

func lowBitrateProfile(video *livepeerAPI.AssetTrack) livepeerAPI.Profile {
func lowBitrateProfile(video *api.AssetTrack) api.Profile {
bitrate := int(video.Bitrate / 3)
if bitrate < minVideoBitrate && video.Bitrate > minVideoBitrate {
bitrate = minVideoBitrate
} else if bitrate < absoluteMinVideoBitrate {
bitrate = absoluteMinVideoBitrate
}
return livepeerAPI.Profile{
return api.Profile{
Name: "low-bitrate",
Fps: 0,
Bitrate: bitrate,
Expand Down
24 changes: 12 additions & 12 deletions task/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strconv"
"strings"

livepeerAPI "github.com/livepeer/go-api-client"
api "github.com/livepeer/go-api-client"
ffprobe "gopkg.in/vansante/go-ffprobe.v2"
)

Expand All @@ -18,10 +18,10 @@ var (
)

type FileMetadata struct {
MD5 string `json:"md5"`
SHA256 string `json:"sha256"`
Ffprobe *ffprobe.ProbeData `json:"ffprobe"`
AssetSpec *livepeerAPI.AssetSpec `json:"assetSpec"`
MD5 string `json:"md5"`
SHA256 string `json:"sha256"`
Ffprobe *ffprobe.ProbeData `json:"ffprobe"`
AssetSpec *api.AssetSpec `json:"assetSpec"`
}

func Probe(ctx context.Context, filename string, data *ReadCounter) (*FileMetadata, error) {
Expand All @@ -34,7 +34,7 @@ func Probe(ctx context.Context, filename string, data *ReadCounter) (*FileMetada
return nil, fmt.Errorf("error reading input: %w", err)
}
size, md5, sha256 := data.Count(), hasher.MD5(), hasher.SHA256()
assetSpec, err := toAssetSpec(filename, probeData, size, []livepeerAPI.AssetHash{
assetSpec, err := toAssetSpec(filename, probeData, size, []api.AssetHash{
{Hash: md5, Algorithm: "md5"},
{Hash: sha256, Algorithm: "sha256"}})
if err != nil {
Expand All @@ -48,7 +48,7 @@ func Probe(ctx context.Context, filename string, data *ReadCounter) (*FileMetada
}, nil
}

func toAssetSpec(filename string, probeData *ffprobe.ProbeData, size uint64, hash []livepeerAPI.AssetHash) (*livepeerAPI.AssetSpec, error) {
func toAssetSpec(filename string, probeData *ffprobe.ProbeData, size uint64, hash []api.AssetHash) (*api.AssetSpec, error) {
if filename == "" && probeData.Format.Filename != "pipe:" {
filename = probeData.Format.Filename
}
Expand All @@ -60,16 +60,16 @@ func toAssetSpec(filename string, probeData *ffprobe.ProbeData, size uint64, has
if probeData.Format.BitRate != "" && err != nil {
return nil, fmt.Errorf("error parsing file bitrate: %w", err)
}
spec := &livepeerAPI.AssetSpec{
spec := &api.AssetSpec{
Name: filename,
Type: "video",
Hash: hash,
Size: size,
VideoSpec: &livepeerAPI.AssetVideoSpec{
VideoSpec: &api.AssetVideoSpec{
Format: format,
DurationSec: probeData.Format.DurationSeconds,
Bitrate: bitrate,
Tracks: make([]*livepeerAPI.AssetTrack, 0, len(probeData.Streams)),
Tracks: make([]*api.AssetTrack, 0, len(probeData.Streams)),
},
}
var hasVideo bool
Expand Down Expand Up @@ -115,7 +115,7 @@ func containsStr(slc []string, val string) bool {
return false
}

func toAssetTrack(stream *ffprobe.Stream) (*livepeerAPI.AssetTrack, error) {
func toAssetTrack(stream *ffprobe.Stream) (*api.AssetTrack, error) {
if stream.CodecType != "video" && stream.CodecType != "audio" {
return nil, fmt.Errorf("unsupported codec type: %s", stream.CodecType)
} else if stream.CodecType == "video" && !supportedVideoCodecs[stream.CodecName] {
Expand All @@ -136,7 +136,7 @@ func toAssetTrack(stream *ffprobe.Stream) (*livepeerAPI.AssetTrack, error) {
if stream.BitRate != "" && err != nil {
return nil, fmt.Errorf("error parsing bitrate from track %d: %w", stream.Index, err)
}
track := &livepeerAPI.AssetTrack{
track := &api.AssetTrack{
Type: stream.CodecType,
Codec: stream.CodecName,
StartTime: startTime,
Expand Down
4 changes: 2 additions & 2 deletions task/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (
"time"

"github.com/golang/glog"
livepeerAPI "github.com/livepeer/go-api-client"
api "github.com/livepeer/go-api-client"
)

var progressReportBuckets = []float64{0, 0.25, 0.5, 0.75, 1}

const minProgressReportInterval = 10 * time.Second
const progressCheckInterval = 1 * time.Second

func ReportProgress(ctx context.Context, lapi *livepeerAPI.Client, taskID string, size uint64, getCount func() uint64) {
func ReportProgress(ctx context.Context, lapi *api.Client, taskID string, size uint64, getCount func() uint64) {
defer func() {
if r := recover(); r != nil {
glog.Errorf("Panic reporting task progress: value=%q stack:\n%s", r, string(debug.Stack()))
Expand Down
Loading

0 comments on commit 3783a26

Please sign in to comment.