Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pricing): expand gateway max price to set per capability/model id #3116

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f2e8d47
enable setting max price per capability by gateway
ad-astra-video Aug 2, 2024
6bf4da5
fix capability price log line on startup
ad-astra-video Aug 3, 2024
3fa5448
fix setting max price from cli and webserver endpoint
ad-astra-video Aug 6, 2024
2a94ffa
fix price convert to int64 when numerator or denominator overflows int64
ad-astra-video Aug 6, 2024
055416d
Apply suggestions from code review
ad-astra-video Aug 16, 2024
931d84a
Apply suggestions from code review (missed one)
ad-astra-video Aug 16, 2024
ac5e3b1
updates for review comments: add error return to PipelineToCapability…
ad-astra-video Aug 17, 2024
cdd7462
Merge branch 'ai-video' into ai-video-set-max-price-per-capability-re…
ad-astra-video Aug 17, 2024
9e5189c
update capabilities created for selectors
ad-astra-video Aug 19, 2024
c235c7f
update GetCapabilitiesMaxPrice
ad-astra-video Aug 19, 2024
e007090
update max price per capability handlers test to initialize bradcastcfg
ad-astra-video Aug 19, 2024
c54b99f
fix server tests for api changes
ad-astra-video Aug 19, 2024
0916059
refactor: remove BroadcastConfig init method
rickstaa Aug 22, 2024
7f99222
fixup! refactor: remove BroadcastConfig init method
rickstaa Aug 22, 2024
fe75050
update for review comments
ad-astra-video Aug 22, 2024
cb71410
fix newBroadCastConfig
ad-astra-video Aug 22, 2024
d2c04bf
Merge branch 'ai-video' into ai-video-set-max-price-per-capability-re…
ad-astra-video Aug 22, 2024
9a99d63
fix price update log line to not overwrite the capName with last set
ad-astra-video Aug 22, 2024
db0e15d
set model id for price change log
ad-astra-video Aug 22, 2024
d7d0065
move capName and modelID below err check
ad-astra-video Aug 22, 2024
29334a8
feat(ai): improve maxPricePerCapability parsing
rickstaa Aug 23, 2024
625bbd5
refactor(ai): refactor PipelineToCapability method
rickstaa Aug 23, 2024
9ea6441
fixup! refactor(ai): refactor PipelineToCapability method
rickstaa Aug 23, 2024
caa9037
refactor(ai): apply some small code improvements
rickstaa Aug 23, 2024
ebe032a
refactor(ai): fix 'price_per_capability' typo
rickstaa Aug 23, 2024
2eaf8ae
refactor(ai): improve setBroadcastMaxPricePerCapability defaults
rickstaa Aug 23, 2024
a4ae9e5
fix(ai): handle priceFeed subscription memory leak
rickstaa Aug 23, 2024
15fcdd4
refactor(ai): apply code conventions
rickstaa Aug 23, 2024
a655f45
fix GetCapabilitiesMaxPrice to check for nil capabilities
ad-astra-video Aug 23, 2024
174cf32
refactor(ai): optimize code
rickstaa Aug 23, 2024
649d6fd
Merge branch 'ai-video-set-max-price-per-capability-rebase' of github…
rickstaa Aug 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.OrchPerfStatsURL = flag.String("orchPerfStatsUrl", *cfg.OrchPerfStatsURL, "URL of Orchestrator Performance Stream Tester")
cfg.Region = flag.String("region", *cfg.Region, "Region in which a broadcaster is deployed; used to select the region while using the orchestrator's performance stats")
cfg.MaxPricePerUnit = flag.String("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price. Can be specified in wei or a custom currency in the format <price><currency> (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr")
cfg.MaxPricePerCapability = flag.String("maxPricePerCapability", *cfg.MaxPricePerCapability, `json list of prices per capability/model or path to json config file. Use "model_id": "default" to price all models in a pipeline the same. Example: {"capabilities_prices": [{"pipeline": "text-to-image", "model_id": "stabilityai/sd-turbo", "price_per_unit": 1000, "pixels_per_unit": 1}, {"pipeline": "upscale", "model_id": "default", price_per_unit": 1200, "pixels_per_unit": 1}]}`)
cfg.IgnoreMaxPriceIfNeeded = flag.Bool("ignoreMaxPriceIfNeeded", *cfg.IgnoreMaxPriceIfNeeded, "Set to true to allow exceeding max price condition if there is no O that meets this requirement")
cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept")

Expand Down
99 changes: 98 additions & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type LivepeerConfig struct {
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MaxPricePerCapability *string
IgnoreMaxPriceIfNeeded *bool
MinPerfScore *float64
MaxSessions *string
Expand Down Expand Up @@ -219,6 +220,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultMaxTotalEV := "20000000000000"
defaultDepositMultiplier := 1
defaultMaxPricePerUnit := "0"
defaultMaxPricePerCapability := ""
defaultIgnoreMaxPriceIfNeeded := false
defaultPixelsPerUnit := "1"
defaultPriceFeedAddr := "0x639Fe6ab55C921f74e7fac1ee960C0B6293ba612" // ETH / USD price feed address on Arbitrum Mainnet
Expand Down Expand Up @@ -316,6 +318,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
MaxTotalEV: &defaultMaxTotalEV,
DepositMultiplier: &defaultDepositMultiplier,
MaxPricePerUnit: &defaultMaxPricePerUnit,
MaxPricePerCapability: &defaultMaxPricePerCapability,
IgnoreMaxPriceIfNeeded: &defaultIgnoreMaxPriceIfNeeded,
PixelsPerUnit: &defaultPixelsPerUnit,
PriceFeedAddr: &defaultPriceFeedAddr,
Expand Down Expand Up @@ -962,6 +965,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if err != nil {
panic(fmt.Errorf("The maximum price per unit must be a valid integer with an optional currency, provided %v instead\n", *cfg.MaxPricePerUnit))
}

if maxPricePerUnit.Sign() > 0 {
pricePerPixel := new(big.Rat).Quo(maxPricePerUnit, pixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
Expand All @@ -978,6 +982,48 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *cfg.MaxPricePerUnit)
glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values")
}

if *cfg.MaxPricePerCapability != "" {
maxCapabilityPrices := getCapabilityPrices(*cfg.MaxPricePerCapability)
for _, p := range maxCapabilityPrices {
if p.PixelsPerUnit == nil {
p.PixelsPerUnit = pixelsPerUnit
} else if p.PixelsPerUnit.Sign() <= 0 {
glog.Infof("Pixels per unit for capability=%v model_id=%v in 'maxPricePerCapability' config is not greater than 0, using default pixelsPerUnit=%v.\n", p.Pipeline, p.ModelID, *cfg.PixelsPerUnit)
p.PixelsPerUnit = pixelsPerUnit
}

if p.PricePerUnit == nil || p.PricePerUnit.Sign() <= 0 {
if maxPricePerUnit.Sign() > 0 {
glog.Infof("Maximum price per unit not set for capability=%v model_id=%v in 'maxPricePerCapability' config, using maxPricePerUnit=%v.\n", p.Pipeline, p.ModelID, *cfg.MaxPricePerUnit)
p.PricePerUnit = maxPricePerUnit
} else {
glog.Warningf("Maximum price per unit for capability=%v model_id=%v in 'maxPricePerCapability' config is not greater than 0, and 'maxPricePerUnit' not set, gateway is currently set to accept ANY price.\n", p.Pipeline, p.ModelID)
continue
}
}

maxCapabilityPrice := new(big.Rat).Quo(p.PricePerUnit, p.PixelsPerUnit)

cap, err := core.PipelineToCapability(p.Pipeline)
if err != nil {
panic(fmt.Errorf("Pipeline in 'maxPricePerCapability' config is not valid capability: %v\n", p.Pipeline))
}
capName := core.CapabilityNameLookup[cap]
modelID := p.ModelID
autoCapPrice, err := core.NewAutoConvertedPrice(p.Currency, maxCapabilityPrice, func(price *big.Rat) {
if monitor.Enabled {
monitor.MaxPriceForCapability(capName, modelID, price)
}
glog.Infof("Maximum price per unit set to %v wei for capability=%v model_id=%v", price.FloatString(3), p.Pipeline, p.ModelID)
})
if err != nil {
panic(fmt.Errorf("Error converting price: %v", err))
}

server.BroadcastCfg.SetCapabilityMaxPrice(cap, p.ModelID, autoCapPrice)
}
}
}

if n.NodeType == core.RedeemerNode {
Expand Down Expand Up @@ -1781,8 +1827,8 @@ func getGatewayPrices(gatewayPrices string) []GatewayPrice {
Currency string `json:"currency"`
} `json:"broadcasters"`
}
pricesFileContent, _ := common.ReadFromFile(gatewayPrices)

pricesFileContent, _ := common.ReadFromFile(gatewayPrices)
err := json.Unmarshal([]byte(pricesFileContent), &pricesSet)
if err != nil {
glog.Errorf("gateway prices could not be parsed: %s", err)
Expand Down Expand Up @@ -1810,6 +1856,57 @@ func getGatewayPrices(gatewayPrices string) []GatewayPrice {
return prices
}

type ModelPrice struct {
Pipeline string
ModelID string
PricePerUnit *big.Rat
PixelsPerUnit *big.Rat
Currency string
}

func getCapabilityPrices(capabilitiesPrices string) []ModelPrice {
if capabilitiesPrices == "" {
return nil
}

// Format of modelPrices json
// Model_id will be set to "default" to price all models in the pipeline if not specified.
// {"capabilities_prices": [ {"pipeline": "text-to-image", "model_id": "stabilityai/sd-turbo", "price_per_unit": 1000, "pixels_per_unit": 1}, {"pipeline": "image-to-video", "model_id": "default", "price_per_unit": 2000, "pixels_per_unit": 3} ] }
var pricesSet struct {
CapabilitiesPrices []struct {
Pipeline string `json:"pipeline"`
ModelID string `json:"model_id"`
PixelsPerUnit core.JSONRat `json:"pixels_per_unit"`
PricePerUnit core.JSONRat `json:"price_per_unit"`
Currency string `json:"currency"`
} `json:"capabilities_prices"`
}

pricesFileContent, _ := common.ReadFromFile(capabilitiesPrices)
err := json.Unmarshal([]byte(pricesFileContent), &pricesSet)
if err != nil {
glog.Errorf("model prices could not be parsed: %s", err)
return nil
}

prices := make([]ModelPrice, len(pricesSet.CapabilitiesPrices))
for i, p := range pricesSet.CapabilitiesPrices {
if p.ModelID == "" {
p.ModelID = "default"
}

prices[i] = ModelPrice{
Pipeline: p.Pipeline,
ModelID: p.ModelID,
PricePerUnit: p.PricePerUnit.Rat,
PixelsPerUnit: p.PixelsPerUnit.Rat,
Currency: p.Currency,
}
}

return prices
}

func createSelectionAlgorithm(cfg LivepeerConfig) (common.SelectionAlgorithm, error) {
sumWeight := *cfg.SelectStakeWeight + *cfg.SelectPriceWeight + *cfg.SelectRandWeight
if math.Abs(sumWeight-1.0) > 0.0001 {
Expand Down
28 changes: 28 additions & 0 deletions cmd/livepeer/starter/starter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,34 @@ func TestParseGetGatewayPrices(t *testing.T) {
}
}

func TestMaxPricePerCapability(t *testing.T) {
assert := assert.New(t)

jsonTemplate := `{"capabilities_prices": [ {"pipeline": "text-to-image", "model_id": "stabilityai/sd-turbo", "price_per_unit": 1000, "pixels_per_unit": 1}, {"pipeline": "image-to-video", "model_id": "default", "price_per_unit": 2000, "pixels_per_unit": 3}, {"pipeline": "image-to-image", "price_per_unit": 3000, "pixels_per_unit": 1} ] }`

prices := getCapabilityPrices(jsonTemplate)
assert.NotNil(prices)
assert.Equal(3, len(prices))

// Confirm Pipeline and ModelID is parsed correctly
assert.Equal(prices[0].Pipeline, "text-to-image")
assert.Equal(prices[1].Pipeline, "image-to-video")
assert.Equal(prices[0].ModelID, "stabilityai/sd-turbo")
assert.Equal(prices[1].ModelID, "default")

// Confirm prices are parsed correctly
price1 := new(big.Rat).Quo(prices[0].PricePerUnit, prices[0].PixelsPerUnit)
price2 := new(big.Rat).Quo(prices[1].PricePerUnit, prices[1].PixelsPerUnit)
assert.NotEqual(price1, price2)
assert.Equal(big.NewRat(1000, 1), price1)
assert.Equal(big.NewRat(2000, 3), price2)

// Confirm modelID is "default" if not set and price set correctly
assert.Equal(prices[2].ModelID, "default")
price3 := new(big.Rat).Quo(prices[2].PricePerUnit, prices[2].PixelsPerUnit)
assert.Equal(big.NewRat(3000, 1), price3)
}

// Address provided to keystore file
func TestParse_ParseEthKeystorePathValidFile(t *testing.T) {
assert := assert.New(t)
Expand Down
1 change: 1 addition & 0 deletions cmd/livepeer_cli/livepeer_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (w *wizard) initializeOptions() []wizardOpt {
{desc: "Invoke \"cancel unlock of broadcasting funds\"", invoke: w.cancelUnlock, notOrchestrator: true},
{desc: "Invoke \"withdraw broadcasting funds\"", invoke: w.withdraw, notOrchestrator: true},
{desc: "Set broadcast config", invoke: w.setBroadcastConfig, notOrchestrator: true},
{desc: "Set max price per capability", invoke: w.setBroadcastMaxPricePerCapability, notOrchestrator: true},
{desc: "Set maximum Ethereum gas price", invoke: w.setMaxGasPrice},
{desc: "Set minimum Ethereum gas price", invoke: w.setMinGasPrice},
{desc: "Get test LPT", invoke: w.requestTokens, testnet: true},
Expand Down
32 changes: 32 additions & 0 deletions cmd/livepeer_cli/wizard_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,38 @@ func (w *wizard) setBroadcastConfig() {
}
}

func (w *wizard) setBroadcastMaxPricePerCapability() {
fmt.Printf("Enter the pipeline to set price for - ")
pipeline := w.readString()
fmt.Printf("Enter the model id to set price for (default: default) - ")
modelID := w.readDefaultString("default")
fmt.Printf("Enter the maximum price to pay (default: 0) - ")
maxPricePerUnit := w.readDefaultString("0")
fmt.Printf("Enter the price currency (default: Wei) - ")
currency := w.readDefaultString("Wei")
pixelsPerUnit := "1"

// Make default case insensitive.
if strings.EqualFold(modelID, "default") {
modelID = "default"
}

val := url.Values{
"maxPricePerUnit": {fmt.Sprintf("%v", maxPricePerUnit)},
"pixelsPerUnit": {fmt.Sprintf("%v", pixelsPerUnit)},
"currency": {fmt.Sprintf("%v", currency)},
"pipeline": {fmt.Sprintf("%v", pipeline)},
"modelID": {fmt.Sprintf("%v", modelID)},
}

resp, ok := httpPostWithParams(fmt.Sprintf("http://%v:%v/setMaxPriceForCapability", w.host, w.httpPort), val)
if !ok {
fmt.Printf("Error setting max price for capability: %v\n", resp)
} else {
fmt.Printf("Max price per capability set successfully\n")
}
}

func (w *wizard) idListToVideoProfileList(idList string, opts map[int]string) (string, error) {
ids := strings.Split(idList, ",")

Expand Down
9 changes: 9 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,15 @@ func FixedToPrice(price int64) *big.Rat {
return big.NewRat(price, priceScalingFactor)
}

func PriceToInt64(price *big.Rat) (*big.Rat, error) {
fixed := new(big.Int).Div(price.Num(), price.Denom())
if fixed.IsInt64() {
return big.NewRat(fixed.Int64(), int64(1)), nil
} else {
return nil, errors.New("price cannot convert to int64")
}
}

// BaseTokenAmountToFixed converts the base amount of a token (i.e. ETH/LPT) represented as a big.Int into a fixed point number represented
// as a int64 using a scalingFactor of 100000 resulting in max decimal places of 5
func BaseTokenAmountToFixed(baseAmount *big.Int) (int64, error) {
Expand Down
20 changes: 20 additions & 0 deletions core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/livepeer/ai-worker/worker"
)

var errPipelineNotAvailable = errors.New("pipeline not available")

type AI interface {
TextToImage(context.Context, worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error)
ImageToImage(context.Context, worker.ImageToImageMultipartRequestBody) (*worker.ImageResponse, error)
Expand Down Expand Up @@ -41,6 +43,24 @@ func (s JSONRat) String() string {
return s.FloatString(2)
}

// parsePipelineFromModelID converts a pipeline name to a capability enum.
func PipelineToCapability(pipeline string) (Capability, error) {
if pipeline == "" {
return Capability_Unused, errPipelineNotAvailable
}

pipelineName := strings.ToUpper(pipeline[:1]) + strings.ReplaceAll(pipeline[1:], "-", " ")

for cap, desc := range CapabilityNameLookup {
if pipelineName == desc {
return cap, nil
}
}

// No capability description matches name.
return Capability_Unused, errPipelineNotAvailable
}

type AIModelConfig struct {
Pipeline string `json:"pipeline"`
ModelID string `json:"model_id"`
Expand Down
20 changes: 20 additions & 0 deletions core/ai_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package core

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestPipelineToCapability(t *testing.T) {
good := "audio-to-text"
bad := "i-love-tests"

cap, err := PipelineToCapability(good)
assert.Nil(t, err)
assert.Equal(t, cap, Capability_AudioToText)

cap, err = PipelineToCapability(bad)
assert.Error(t, err)
assert.Equal(t, cap, Capability_Unused)
}
8 changes: 8 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,14 @@ func (orch *orchestrator) PriceInfoForCaps(sender ethcommon.Address, manifestID
return nil, err
}

if !price.Num().IsInt64() || !price.Denom().IsInt64() {
fixedPrice, err := common.PriceToInt64(price)
if err != nil {
return nil, errors.New("price cannot be converted to int64")
}
price = fixedPrice
}

return &net.PriceInfo{
PricePerUnit: price.Num().Int64(),
PixelsPerUnit: price.Denom().Int64(),
Expand Down
20 changes: 19 additions & 1 deletion monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ type (
mMinGasPrice *stats.Float64Measure
mMaxGasPrice *stats.Float64Measure
mTranscodingPrice *stats.Float64Measure

mPricePerCapability *stats.Float64Measure
// Metrics for calling rewards
mRewardCallError *stats.Int64Measure

Expand Down Expand Up @@ -335,6 +335,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mMinGasPrice = stats.Float64("min_gas_price", "MinGasPrice", "gwei")
census.mMaxGasPrice = stats.Float64("max_gas_price", "MaxGasPrice", "gwei")
census.mTranscodingPrice = stats.Float64("transcoding_price", "TranscodingPrice", "wei")
census.mPricePerCapability = stats.Float64("price_per_capability", "PricePerCapability", "wei")

// Metrics for calling rewards
census.mRewardCallError = stats.Int64("reward_call_errors", "RewardCallError", "tot")
Expand Down Expand Up @@ -833,6 +834,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: baseTagsWithEthAddr,
Aggregation: view.LastValue(),
},
{
Name: "price_per_capability",
Measure: census.mPricePerCapability,
Description: "price per unit per capability",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...),
Aggregation: view.LastValue(),
},

// Metrics for calling rewards
{
Expand Down Expand Up @@ -1638,6 +1646,16 @@ func MaxTranscodingPrice(maxPrice *big.Rat) {
}
}

func MaxPriceForCapability(cap string, modelName string, maxPrice *big.Rat) {
floatWei, _ := maxPrice.Float64()
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kPipeline, cap), tag.Insert(census.kModelName, modelName)},
census.mPricePerCapability.M(floatWei)); err != nil {

glog.Errorf("Error recording metrics err=%q", err)
}
}

// TicketValueRecv records the ticket value received from a sender for a manifestID
func TicketValueRecv(ctx context.Context, sender string, value *big.Rat) {
if value.Cmp(big.NewRat(0, 1)) <= 0 {
Expand Down
Loading
Loading