Skip to content

Commit

Permalink
feat(ai): add minLivepeerVersion constraint and IgnorePreReleaseVersions
Browse files Browse the repository at this point in the history
- Adds minLivepeerVersion constraint from
  #3050 to the AI codebase.
- Introduces `IgnorePreReleaseVersions` flag to filter out pre-release versions.

This update is essential for distinguishing AI subnet versions, which use pre-release
suffixes, from standard transcoding releases. This new flag can be
removed when merging in the main branch.
  • Loading branch information
rickstaa committed Jul 31, 2024
1 parent 6292df1 commit 71089b3
Show file tree
Hide file tree
Showing 11 changed files with 357 additions and 304 deletions.
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.OrchWebhookURL = flag.String("orchWebhookUrl", *cfg.OrchWebhookURL, "Orchestrator discovery callback URL")
cfg.OrchBlacklist = flag.String("orchBlocklist", "", "Comma-separated list of blocklisted orchestrators")
cfg.OrchMinLivepeerVersion = flag.String("orchMinLivepeerVersion", *cfg.OrchMinLivepeerVersion, "Minimal go-livepeer version orchestrator should have to be selected")
cfg.IgnorePreReleaseVersions = flag.Bool("ignorePreReleaseVersions", *cfg.IgnorePreReleaseVersions, "Ignore pre-release version suffix when validating the minimum go-livepeer version (e.g., v0.7.5-beta, v0.7.5-0.20231004073737-06f1f383fb18)")
cfg.SelectRandWeight = flag.Float64("selectRandFreq", *cfg.SelectRandWeight, "Weight of the random factor in the orchestrator selection algorithm")
cfg.SelectStakeWeight = flag.Float64("selectStakeWeight", *cfg.SelectStakeWeight, "Weight of the stake factor in the orchestrator selection algorithm")
cfg.SelectPriceWeight = flag.Float64("selectPriceWeight", *cfg.SelectPriceWeight, "Weight of the price factor in the orchestrator selection algorithm")
Expand Down
168 changes: 87 additions & 81 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,84 +78,85 @@ const (
)

type LivepeerConfig struct {
Network *string
RtmpAddr *string
CliAddr *string
HttpAddr *string
ServiceAddr *string
OrchAddr *string
VerifierURL *string
EthController *string
VerifierPath *string
LocalVerify *bool
HttpIngest *bool
Orchestrator *bool
Transcoder *bool
AIWorker *bool
Gateway *bool
Broadcaster *bool
OrchSecret *string
TranscodingOptions *string
AIModels *string
MaxAttempts *int
SelectRandWeight *float64
SelectStakeWeight *float64
SelectPriceWeight *float64
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
TestTranscoder *bool
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
EthOrchAddr *string
EthUrl *string
TxTimeout *time.Duration
MaxTxReplacements *int
GasLimit *int
MinGasPrice *int64
MaxGasPrice *int
InitializeRound *bool
InitializeRoundMaxDelay *time.Duration
TicketEV *string
MaxFaceValue *string
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerGateway *string
PricePerBroadcaster *string
BlockPollingInterval *int
Redeemer *bool
RedeemerAddr *string
Reward *bool
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
MetadataQueueUri *string
MetadataAmqpExchange *string
MetadataPublishTimeout *time.Duration
Datadir *string
AIModelsDir *string
Objectstore *string
Recordstore *string
FVfailGsBucket *string
FVfailGsKey *string
AuthWebhookURL *string
OrchWebhookURL *string
OrchBlacklist *string
OrchMinLivepeerVersion *string
TestOrchAvail *bool
AIRunnerImage *string
Network *string
RtmpAddr *string
CliAddr *string
HttpAddr *string
ServiceAddr *string
OrchAddr *string
VerifierURL *string
EthController *string
VerifierPath *string
LocalVerify *bool
HttpIngest *bool
Orchestrator *bool
Transcoder *bool
AIWorker *bool
Gateway *bool
Broadcaster *bool
OrchSecret *string
TranscodingOptions *string
AIModels *string
MaxAttempts *int
SelectRandWeight *float64
SelectStakeWeight *float64
SelectPriceWeight *float64
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
TestTranscoder *bool
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
EthOrchAddr *string
EthUrl *string
TxTimeout *time.Duration
MaxTxReplacements *int
GasLimit *int
MinGasPrice *int64
MaxGasPrice *int
InitializeRound *bool
InitializeRoundMaxDelay *time.Duration
TicketEV *string
MaxFaceValue *string
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerGateway *string
PricePerBroadcaster *string
BlockPollingInterval *int
Redeemer *bool
RedeemerAddr *string
Reward *bool
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
MetadataQueueUri *string
MetadataAmqpExchange *string
MetadataPublishTimeout *time.Duration
Datadir *string
AIModelsDir *string
Objectstore *string
Recordstore *string
FVfailGsBucket *string
FVfailGsKey *string
AuthWebhookURL *string
OrchWebhookURL *string
OrchBlacklist *string
OrchMinLivepeerVersion *string
IgnorePreReleaseVersions *bool
TestOrchAvail *bool
AIRunnerImage *string
}

// DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process.
Expand Down Expand Up @@ -250,6 +251,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultAuthWebhookURL := ""
defaultOrchWebhookURL := ""
defaultMinLivepeerVersion := ""
defaultIgnorePreReleaseVersions := true

// Flags
defaultTestOrchAvail := true
Expand Down Expand Up @@ -342,9 +344,12 @@ func DefaultLivepeerConfig() LivepeerConfig {
FVfailGsKey: &defaultFVfailGsKey,

// API
AuthWebhookURL: &defaultAuthWebhookURL,
OrchWebhookURL: &defaultOrchWebhookURL,
OrchMinLivepeerVersion: &defaultMinLivepeerVersion,
AuthWebhookURL: &defaultAuthWebhookURL,
OrchWebhookURL: &defaultOrchWebhookURL,

// Versioning constraints
OrchMinLivepeerVersion: &defaultMinLivepeerVersion,
IgnorePreReleaseVersions: &defaultIgnorePreReleaseVersions,

// Flags
TestOrchAvail: &defaultTestOrchAvail,
Expand Down Expand Up @@ -1379,6 +1384,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
n.Capabilities = core.NewCapabilitiesWithConstraints(append(transcoderCaps, aiCaps...), core.MandatoryOCapabilities(), core.Constraints{}, capabilityConstraints)
if cfg.OrchMinLivepeerVersion != nil {
n.Capabilities.SetMinVersionConstraint(*cfg.OrchMinLivepeerVersion)
n.Capabilities.SetIgnorePreReleaseVersions(*cfg.IgnorePreReleaseVersions)
}

if drivers.NodeStorage == nil {
Expand Down
32 changes: 25 additions & 7 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type ModelConstraint struct {
type Capability int
type CapabilityString []uint64
type Constraints struct {
minVersion string
minVersion string
ignorePreReleaseVersions bool
}
type PerCapabilityConstraints struct {
// Models contains a *ModelConstraint for each supported model ID
Expand Down Expand Up @@ -405,11 +406,15 @@ func (bcast *Capabilities) LivepeerVersionCompatibleWith(orch *net.Capabilities)
return false
}

// Ignore prerelease versions as in go-livepeer we actually define post-release suffixes
minVerNoSuffix, _ := minVer.SetPrerelease("")
verNoSuffix, _ := ver.SetPrerelease("")
// By default ignore prerelease versions as in go-livepeer we actually define post-release suffixes
if bcast.constraints.ignorePreReleaseVersions {
minVerNoSuffix, _ := minVer.SetPrerelease("")
verNoSuffix, _ := ver.SetPrerelease("")
minVer = &minVerNoSuffix
ver = &verNoSuffix
}

return !verNoSuffix.LessThan(&minVerNoSuffix)
return !ver.LessThan(minVer)
}

func (bcast *Capabilities) CompatibleWith(orch *net.Capabilities) bool {
Expand Down Expand Up @@ -450,7 +455,7 @@ func (c *Capabilities) ToNetCapabilities() *net.Capabilities {
}
c.mutex.Lock()
defer c.mutex.Unlock()
netCaps := &net.Capabilities{Bitstring: c.bitstring, Mandatories: c.mandatories, Version: c.version, Capacities: make(map[uint32]uint32), Constraints: &net.Capabilities_Constraints{MinVersion: c.constraints.minVersion}, CapabilityConstraints: make(map[uint32]*net.Capabilities_CapabilityConstraints)}
netCaps := &net.Capabilities{Bitstring: c.bitstring, Mandatories: c.mandatories, Version: c.version, Capacities: make(map[uint32]uint32), Constraints: &net.Capabilities_Constraints{MinVersion: c.constraints.minVersion, IgnorePreReleaseVersions: c.constraints.ignorePreReleaseVersions}, CapabilityConstraints: make(map[uint32]*net.Capabilities_CapabilityConstraints)}
for capability, capacity := range c.capacities {
netCaps.Capacities[uint32(capability)] = uint32(capacity)
}
Expand Down Expand Up @@ -478,7 +483,7 @@ func CapabilitiesFromNetCapabilities(caps *net.Capabilities) *Capabilities {
mandatories: caps.Mandatories,
capacities: make(map[Capability]int),
version: caps.Version,
constraints: Constraints{minVersion: caps.Constraints.GetMinVersion()},
constraints: Constraints{minVersion: caps.Constraints.GetMinVersion(), ignorePreReleaseVersions: caps.Constraints.GetIgnorePreReleaseVersions()},
capabilityConstraints: make(CapabilityConstraints),
}
if caps.Capacities == nil || len(caps.Capacities) == 0 {
Expand Down Expand Up @@ -718,3 +723,16 @@ func (bcast *Capabilities) MinVersionConstraint() string {
}
return ""
}

func (bcast *Capabilities) SetIgnorePreReleaseVersions(ignorePreReleaseVersions bool) {
if bcast != nil {
bcast.constraints.ignorePreReleaseVersions = ignorePreReleaseVersions
}
}

func (bcast *Capabilities) IgnorePreReleaseVersions() bool {
if bcast != nil {
return bcast.constraints.ignorePreReleaseVersions
}
return false
}
20 changes: 9 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/ethereum/go-ethereum v1.13.5
github.com/getkin/kin-openapi v0.124.0
github.com/golang/glog v1.1.1
github.com/golang/glog v1.2.1
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.4
github.com/jaypipes/ghw v0.10.0
Expand All @@ -32,15 +32,14 @@ require (
go.opencensus.io v0.24.0
go.uber.org/goleak v1.3.0
golang.org/x/net v0.25.0
google.golang.org/grpc v1.57.1
google.golang.org/protobuf v1.33.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.1
pgregory.net/rapid v1.1.0
)

require (
cloud.google.com/go v0.110.2 // indirect
cloud.google.com/go/compute v1.20.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.0 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
Expand All @@ -54,7 +53,7 @@ require (
github.com/bits-and-blooms/bitset v1.7.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/cp v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.8.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593 // indirect
Expand Down Expand Up @@ -102,10 +101,9 @@ require (
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.10.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
Expand Down Expand Up @@ -220,7 +218,7 @@ require (
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
Expand All @@ -230,8 +228,8 @@ require (
google.golang.org/api v0.125.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
Loading

0 comments on commit 71089b3

Please sign in to comment.