Skip to content

Commit

Permalink
Add node version and orch addr to transcoded metadata (#3165)
Browse files Browse the repository at this point in the history
* Update LPMS to latest.

* protoc: Regenerate
  • Loading branch information
j0sh authored Sep 9, 2024
1 parent a99a631 commit eec6ed3
Show file tree
Hide file tree
Showing 13 changed files with 1,958 additions and 1,088 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## v0.X.X

- [#3165](https://github.com/livepeer/go-livepeer/pull/3165) Add node version and orch addr to transcoded metadata

### Breaking Changes 🚨🚨

### Features ⚒
Expand All @@ -22,6 +24,8 @@

#### Broadcaster

- [#3164](https://github.com/livepeer/go-livepeer/pull/3164) Fix video compatibility check

#### Orchestrator

#### Transcoder
1 change: 1 addition & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Errorf("Error setting up orchestrator: %v", err)
return
}
n.RecipientAddr = recipientAddr.Hex()

sigVerifier := &pm.DefaultSigVerifier{}
validator := pm.NewValidator(sigVerifier, timeWatcher)
Expand Down
1 change: 1 addition & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type LivepeerNode struct {
// Transcoder public fields
SegmentChans map[ManifestID]SegmentChan
Recipient pm.Recipient
RecipientAddr string
SelectionAlgorithm common.SelectionAlgorithm
OrchestratorPool common.OrchestratorPool
OrchPerfScore *common.PerfScore
Expand Down
12 changes: 12 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,17 @@ func (n *LivepeerNode) transcodeSeg(ctx context.Context, config transcodeConfig,
}
md.Fname = url

orchId := "offchain"
if n.RecipientAddr != "" {
orchId = n.RecipientAddr
}
if isRemote {
// huge hack to thread the orch id down to the transcoder
md.Metadata = map[string]string{"orchId": orchId}
} else {
md.Metadata = MakeMetadata(orchId)
}

//Do the transcoding
start := time.Now()
tData, err := transcoder.Transcode(ctx, md)
Expand Down Expand Up @@ -816,6 +827,7 @@ func (rt *RemoteTranscoder) Transcode(logCtx context.Context, md *SegTranscoding
msg := &net.NotifySegment{
Url: fname,
TaskId: taskID,
OrchId: md.Metadata["orchId"],
SegData: segData,
// Triggers failure on Os that don't know how to use SegData
Profiles: []byte("invalid"),
Expand Down
9 changes: 9 additions & 0 deletions core/streamdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type SegTranscodingMetadata struct {
AuthToken *net.AuthToken
CalcPerceptualHash bool
SegmentParameters *SegmentParameters
Metadata map[string]string
}

func (md *SegTranscodingMetadata) Flatten() []byte {
Expand Down Expand Up @@ -192,3 +193,11 @@ func (id StreamID) String() string {
func RandomManifestID() ManifestID {
return ManifestID(common.RandomIDGenerator(DefaultManifestIDLength))
}

func MakeMetadata(id string) map[string]string {
s := fmt.Sprintf("Livepeer Transcoder %s (%s)", LivepeerVersion, id)
return map[string]string{
"service_provider": s, // for mpegts
"comment": "Processed by " + s, // for mp4
}
}
16 changes: 11 additions & 5 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMeta
Accel: ffmpeg.Software,
}
profiles := md.Profiles
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, profiles, md.CalcPerceptualHash, md.SegmentParameters)
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -100,7 +100,7 @@ func (nv *NetintTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
Device: nv.device,
}
profiles := md.Profiles
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Netint, profiles, md.CalcPerceptualHash, md.SegmentParameters)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Netint, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -135,7 +135,7 @@ func (nv *NvidiaTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
Device: nv.device,
}
profiles := md.Profiles
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, profiles, md.CalcPerceptualHash, md.SegmentParameters)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -429,8 +429,13 @@ func resToTranscodeData(ctx context.Context, res *ffmpeg.TranscodeResults, opts
}, nil
}

func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profiles []ffmpeg.VideoProfile, calcPHash bool,
segPar *SegmentParameters) []ffmpeg.TranscodeOptions {
func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, md *SegTranscodingMetadata) []ffmpeg.TranscodeOptions {
var (
profiles []ffmpeg.VideoProfile = md.Profiles
calcPHash bool = md.CalcPerceptualHash
segPar *SegmentParameters = md.SegmentParameters
metadata map[string]string = md.Metadata
)

opts := make([]ffmpeg.TranscodeOptions, len(profiles))
for i := range profiles {
Expand All @@ -440,6 +445,7 @@ func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profi
Accel: accel,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
CalcSign: calcPHash,
Metadata: metadata,
}
if segPar != nil && segPar.Clip != nil {
o.From = segPar.Clip.From
Expand Down
23 changes: 18 additions & 5 deletions core/transcoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,24 @@ func TestProfilesToTranscodeOptions(t *testing.T) {
}
defer func() { common.RandomIDGenerator = oldRandIDFunc }()

makeMeta := func(p []ffmpeg.VideoProfile, c bool) *SegTranscodingMetadata {
return &SegTranscodingMetadata{
Profiles: p,
CalcPerceptualHash: c,
Metadata: map[string]string{
"meta": "data",
},
}
}

// Test 0 profiles
profiles := []ffmpeg.VideoProfile{}
opts := profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles, false, nil)
opts := profilesToTranscodeOptions(workDir, ffmpeg.Software, makeMeta(profiles, false))
assert.Equal(0, len(opts))

// Test 1 profile
profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles, false, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, makeMeta(profiles, false))
assert.Equal(1, len(opts))
assert.Equal("foo/out_bar.tempfile", opts[0].Oname)
assert.Equal(ffmpeg.Software, opts[0].Accel)
Expand All @@ -194,22 +204,25 @@ func TestProfilesToTranscodeOptions(t *testing.T) {

// Test > 1 profile
profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9, ffmpeg.P240p30fps16x9}
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles, false, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, makeMeta(profiles, false))
assert.Equal(2, len(opts))

for i, p := range profiles {
assert.Equal("foo/out_bar.tempfile", opts[i].Oname)
assert.Equal(ffmpeg.Software, opts[i].Accel)
assert.Equal(p, opts[i].Profile)
assert.Equal("copy", opts[i].AudioEncoder.Name)
assert.Equal(opts[i].Metadata, map[string]string{
"meta": "data",
})
}

// Test different acceleration value
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, profiles, false, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, makeMeta(profiles, false))
assert.Equal(2, len(opts))

// Test signature calculation
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, profiles, true, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, makeMeta(profiles, true))
assert.True(opts[0].CalcSign)
assert.True(opts[1].CalcSign)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/jaypipes/pcidb v1.0.0
github.com/livepeer/go-tools v0.0.0-20220805063103-76df6beb6506
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18
github.com/livepeer/lpms v0.0.0-20240819180416-f87352959b85
github.com/livepeer/lpms v0.0.0-20240909171057-fe5aff1fa6a2
github.com/livepeer/m3u8 v0.11.1
github.com/mattn/go-sqlite3 v1.14.18
github.com/olekukonko/tablewriter v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded h1:ZQlvR5RB4nfT+cO
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18 h1:4oH3NqV0NvcdS44Ld3zK2tO8IUiNozIggm74yobQeZg=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18/go.mod h1:Jpf4jHK+fbWioBHRDRM1WadNT1qmY27g2YicTdO0Rtc=
github.com/livepeer/lpms v0.0.0-20240819180416-f87352959b85 h1:E8hJhT1nEW1jneK+Re3KyJcsITFYS9oa0vgyA6bLmKE=
github.com/livepeer/lpms v0.0.0-20240819180416-f87352959b85/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw=
github.com/livepeer/lpms v0.0.0-20240909171057-fe5aff1fa6a2 h1:UYVfhBuJ2h6eYOCBaCzjoWoj3onhZ+6wFhXNllELYDA=
github.com/livepeer/lpms v0.0.0-20240909171057-fe5aff1fa6a2/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw=
github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU=
github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
Expand Down
Loading

0 comments on commit eec6ed3

Please sign in to comment.