Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add node version and orch addr to transcoded metadata #3165

Merged
merged 6 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## v0.X.X

- [#3165](https://github.com/livepeer/go-livepeer/pull/3165) Add node version and orch addr to transcoded metadata

### Breaking Changes 🚨🚨

### Features ⚒
Expand All @@ -22,6 +24,8 @@

#### Broadcaster

- [#3164](https://github.com/livepeer/go-livepeer/pull/3164) Fix video compatibility check

#### Orchestrator

#### Transcoder
1 change: 1 addition & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@
glog.Errorf("Error setting up orchestrator: %v", err)
return
}
n.RecipientAddr = recipientAddr.Hex()

Check warning on line 851 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L851

Added line #L851 was not covered by tests

sigVerifier := &pm.DefaultSigVerifier{}
validator := pm.NewValidator(sigVerifier, timeWatcher)
Expand Down
1 change: 1 addition & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type LivepeerNode struct {
// Transcoder public fields
SegmentChans map[ManifestID]SegmentChan
Recipient pm.Recipient
RecipientAddr string
SelectionAlgorithm common.SelectionAlgorithm
OrchestratorPool common.OrchestratorPool
OrchPerfScore *common.PerfScore
Expand Down
12 changes: 12 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,17 @@
}
md.Fname = url

orchId := "offchain"
if n.RecipientAddr != "" {
orchId = n.RecipientAddr

Check warning on line 587 in core/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/orchestrator.go#L587

Added line #L587 was not covered by tests
}
if isRemote {
// huge hack to thread the orch id down to the transcoder
md.Metadata = map[string]string{"orchId": orchId}

Check warning on line 591 in core/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/orchestrator.go#L591

Added line #L591 was not covered by tests
} else {
md.Metadata = MakeMetadata(orchId)
}

//Do the transcoding
start := time.Now()
tData, err := transcoder.Transcode(ctx, md)
Expand Down Expand Up @@ -816,6 +827,7 @@
msg := &net.NotifySegment{
Url: fname,
TaskId: taskID,
OrchId: md.Metadata["orchId"],
SegData: segData,
// Triggers failure on Os that don't know how to use SegData
Profiles: []byte("invalid"),
Expand Down
9 changes: 9 additions & 0 deletions core/streamdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type SegTranscodingMetadata struct {
AuthToken *net.AuthToken
CalcPerceptualHash bool
SegmentParameters *SegmentParameters
Metadata map[string]string
}

func (md *SegTranscodingMetadata) Flatten() []byte {
Expand Down Expand Up @@ -192,3 +193,11 @@ func (id StreamID) String() string {
func RandomManifestID() ManifestID {
return ManifestID(common.RandomIDGenerator(DefaultManifestIDLength))
}

func MakeMetadata(id string) map[string]string {
s := fmt.Sprintf("Livepeer Transcoder %s (%s)", LivepeerVersion, id)
return map[string]string{
"service_provider": s, // for mpegts
"comment": "Processed by " + s, // for mp4
}
}
16 changes: 11 additions & 5 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
Accel: ffmpeg.Software,
}
profiles := md.Profiles
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, profiles, md.CalcPerceptualHash, md.SegmentParameters)
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -100,7 +100,7 @@
Device: nv.device,
}
profiles := md.Profiles
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Netint, profiles, md.CalcPerceptualHash, md.SegmentParameters)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Netint, md)

Check warning on line 103 in core/transcoder.go

View check run for this annotation

Codecov / codecov/patch

core/transcoder.go#L103

Added line #L103 was not covered by tests

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -135,7 +135,7 @@
Device: nv.device,
}
profiles := md.Profiles
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, profiles, md.CalcPerceptualHash, md.SegmentParameters)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, md)

Check warning on line 138 in core/transcoder.go

View check run for this annotation

Codecov / codecov/patch

core/transcoder.go#L138

Added line #L138 was not covered by tests

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -429,8 +429,13 @@
}, nil
}

func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profiles []ffmpeg.VideoProfile, calcPHash bool,
segPar *SegmentParameters) []ffmpeg.TranscodeOptions {
func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, md *SegTranscodingMetadata) []ffmpeg.TranscodeOptions {
var (
profiles []ffmpeg.VideoProfile = md.Profiles
calcPHash bool = md.CalcPerceptualHash
segPar *SegmentParameters = md.SegmentParameters
metadata map[string]string = md.Metadata
)

opts := make([]ffmpeg.TranscodeOptions, len(profiles))
for i := range profiles {
Expand All @@ -440,6 +445,7 @@
Accel: accel,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
CalcSign: calcPHash,
Metadata: metadata,
}
if segPar != nil && segPar.Clip != nil {
o.From = segPar.Clip.From
Expand Down
23 changes: 18 additions & 5 deletions core/transcoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,24 @@ func TestProfilesToTranscodeOptions(t *testing.T) {
}
defer func() { common.RandomIDGenerator = oldRandIDFunc }()

makeMeta := func(p []ffmpeg.VideoProfile, c bool) *SegTranscodingMetadata {
return &SegTranscodingMetadata{
Profiles: p,
CalcPerceptualHash: c,
Metadata: map[string]string{
"meta": "data",
},
}
}

// Test 0 profiles
profiles := []ffmpeg.VideoProfile{}
opts := profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles, false, nil)
opts := profilesToTranscodeOptions(workDir, ffmpeg.Software, makeMeta(profiles, false))
assert.Equal(0, len(opts))

// Test 1 profile
profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles, false, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, makeMeta(profiles, false))
assert.Equal(1, len(opts))
assert.Equal("foo/out_bar.tempfile", opts[0].Oname)
assert.Equal(ffmpeg.Software, opts[0].Accel)
Expand All @@ -194,22 +204,25 @@ func TestProfilesToTranscodeOptions(t *testing.T) {

// Test > 1 profile
profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9, ffmpeg.P240p30fps16x9}
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles, false, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, makeMeta(profiles, false))
assert.Equal(2, len(opts))

for i, p := range profiles {
assert.Equal("foo/out_bar.tempfile", opts[i].Oname)
assert.Equal(ffmpeg.Software, opts[i].Accel)
assert.Equal(p, opts[i].Profile)
assert.Equal("copy", opts[i].AudioEncoder.Name)
assert.Equal(opts[i].Metadata, map[string]string{
"meta": "data",
})
}

// Test different acceleration value
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, profiles, false, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, makeMeta(profiles, false))
assert.Equal(2, len(opts))

// Test signature calculation
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, profiles, true, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, makeMeta(profiles, true))
assert.True(opts[0].CalcSign)
assert.True(opts[1].CalcSign)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/jaypipes/pcidb v1.0.0
github.com/livepeer/go-tools v0.0.0-20220805063103-76df6beb6506
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18
github.com/livepeer/lpms v0.0.0-20240819180416-f87352959b85
github.com/livepeer/lpms v0.0.0-20240909171057-fe5aff1fa6a2
github.com/livepeer/m3u8 v0.11.1
github.com/mattn/go-sqlite3 v1.14.18
github.com/olekukonko/tablewriter v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded h1:ZQlvR5RB4nfT+cO
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18 h1:4oH3NqV0NvcdS44Ld3zK2tO8IUiNozIggm74yobQeZg=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18/go.mod h1:Jpf4jHK+fbWioBHRDRM1WadNT1qmY27g2YicTdO0Rtc=
github.com/livepeer/lpms v0.0.0-20240819180416-f87352959b85 h1:E8hJhT1nEW1jneK+Re3KyJcsITFYS9oa0vgyA6bLmKE=
github.com/livepeer/lpms v0.0.0-20240819180416-f87352959b85/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw=
github.com/livepeer/lpms v0.0.0-20240909171057-fe5aff1fa6a2 h1:UYVfhBuJ2h6eYOCBaCzjoWoj3onhZ+6wFhXNllELYDA=
github.com/livepeer/lpms v0.0.0-20240909171057-fe5aff1fa6a2/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw=
github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU=
github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
Expand Down
Loading
Loading