feat(pricing): expand gateway max price to set per capability/model id #3116

Merged
Changes from 4 commits
31 commits
f2e8d47
enable setting max price per capability by gateway
ad-astra-video Aug 2, 2024
6bf4da5
fix capability price log line on startup
ad-astra-video Aug 3, 2024
3fa5448
fix setting max price from cli and webserver endpoint
ad-astra-video Aug 6, 2024
2a94ffa
fix price convert to int64 when numerator or denominator overflows int64
ad-astra-video Aug 6, 2024
055416d
Apply suggestions from code review
ad-astra-video Aug 16, 2024
931d84a
Apply suggestions from code review (missed one)
ad-astra-video Aug 16, 2024
ac5e3b1
updates for review comments: add error return to PipelineToCapability…
ad-astra-video Aug 17, 2024
cdd7462
Merge branch 'ai-video' into ai-video-set-max-price-per-capability-re…
ad-astra-video Aug 17, 2024
9e5189c
update capabilities created for selectors
ad-astra-video Aug 19, 2024
c235c7f
update GetCapabilitiesMaxPrice
ad-astra-video Aug 19, 2024
e007090
update max price per capability handlers test to initialize bradcastcfg
ad-astra-video Aug 19, 2024
c54b99f
fix server tests for api changes
ad-astra-video Aug 19, 2024
0916059
refactor: remove BroadcastConfig init method
rickstaa Aug 22, 2024
7f99222
fixup! refactor: remove BroadcastConfig init method
rickstaa Aug 22, 2024
fe75050
update for review comments
ad-astra-video Aug 22, 2024
cb71410
fix newBroadCastConfig
ad-astra-video Aug 22, 2024
d2c04bf
Merge branch 'ai-video' into ai-video-set-max-price-per-capability-re…
ad-astra-video Aug 22, 2024
9a99d63
fix price update log line to not overwrite the capName with last set
ad-astra-video Aug 22, 2024
db0e15d
set model id for price change log
ad-astra-video Aug 22, 2024
d7d0065
move capName and modelID below err check
ad-astra-video Aug 22, 2024
29334a8
feat(ai): improve maxPricePerCapability parsing
rickstaa Aug 23, 2024
625bbd5
refactor(ai): refactor PipelineToCapability method
rickstaa Aug 23, 2024
9ea6441
fixup! refactor(ai): refactor PipelineToCapability method
rickstaa Aug 23, 2024
caa9037
refactor(ai): apply some small code improvements
rickstaa Aug 23, 2024
ebe032a
refactor(ai): fix 'price_per_capability' typo
rickstaa Aug 23, 2024
2eaf8ae
refactor(ai): improve setBroadcastMaxPricePerCapability defaults
rickstaa Aug 23, 2024
a4ae9e5
fix(ai): handle priceFeed subscription memory leak
rickstaa Aug 23, 2024
15fcdd4
refactor(ai): apply code conventions
rickstaa Aug 23, 2024
a655f45
fix GetCapabilitiesMaxPrice to check for nil capabilities
ad-astra-video Aug 23, 2024
174cf32
refactor(ai): optimize code
rickstaa Aug 23, 2024
649d6fd
Merge branch 'ai-video-set-max-price-per-capability-rebase' of github…
rickstaa Aug 23, 2024
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Expand Up @@ -135,6 +135,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.OrchPerfStatsURL = flag.String("orchPerfStatsUrl", *cfg.OrchPerfStatsURL, "URL of Orchestrator Performance Stream Tester")
cfg.Region = flag.String("region", *cfg.Region, "Region in which a broadcaster is deployed; used to select the region while using the orchestrator's performance stats")
cfg.MaxPricePerUnit = flag.String("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price. Can be specified in wei or a custom currency in the format <price><currency> (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr")
cfg.MaxPricePerCapability = flag.String("maxPricePerCapability", *cfg.MaxPricePerCapability, `json list of prices per capability/model or path to json config file. Use "model_id": "default" to price all models in a pipeline the same. Example: {"capabilities_prices": [{"pipeline": "text-to-image", "model_id": "stabilityai/sd-turbo", "priceperunit": 1000, "pixelsperunit": 1}, {"pipeline": "upscale", "model_id": "default", "priceperunit": 1200, "pixelsperunit": 1}]}`)
Contributor
Like the format! Do we want to add the currency field in there as well as an example?

Contributor
Can we change this to snake case to follow the JSON convention? We can also change PricePerGateway to the same format (see https://linear.app/livepeer-ai/issue/AI-574/change-priceperbroadcaster-json-input-keys-to-recommended-conventions).

Contributor Author
Updated. Will update pricePerGateway to same in separate PR.

cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept")

// Transcoding:
Expand Down
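The JSON format described in the `maxPricePerCapability` flag, including the optional `currency` field raised in the review, can be exercised with a short standard-library sketch. Everything here is illustrative: `capabilityPrice` and `summarize` are hypothetical names, `json.Number` stands in for the PR's `core.JSONRat`, and the key names are the ones shown at this point in the review (they may have changed after the snake-case request).

```go
package main

import (
	"encoding/json"
	"fmt"
	"math/big"
)

// exampleConfig uses the key names from the flag description in this diff.
const exampleConfig = `{"capabilities_prices": [
  {"pipeline": "text-to-image", "model_id": "stabilityai/sd-turbo", "priceperunit": 0.5, "pixelsperunit": 1, "currency": "USD"},
  {"pipeline": "upscale", "model_id": "default", "priceperunit": 1200, "pixelsperunit": 1}
]}`

// capabilityPrice is a hypothetical stand-in for one list entry.
// json.Number replaces core.JSONRat so the sketch needs only the standard library.
type capabilityPrice struct {
	Pipeline      string      `json:"pipeline"`
	ModelID       string      `json:"model_id"`
	PricePerUnit  json.Number `json:"priceperunit"`
	PixelsPerUnit json.Number `json:"pixelsperunit"`
	Currency      string      `json:"currency"`
}

// summarize parses raw and returns one line per entry with its per-pixel price.
func summarize(raw string) ([]string, error) {
	var cfg struct {
		CapabilitiesPrices []capabilityPrice `json:"capabilities_prices"`
	}
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		return nil, fmt.Errorf("parsing capability prices: %w", err)
	}
	lines := make([]string, 0, len(cfg.CapabilitiesPrices))
	for _, p := range cfg.CapabilitiesPrices {
		price, ok := new(big.Rat).SetString(p.PricePerUnit.String())
		if !ok {
			return nil, fmt.Errorf("invalid priceperunit %q", p.PricePerUnit)
		}
		pixels, ok := new(big.Rat).SetString(p.PixelsPerUnit.String())
		if !ok || pixels.Sign() <= 0 {
			return nil, fmt.Errorf("invalid pixelsperunit %q", p.PixelsPerUnit)
		}
		perPixel := new(big.Rat).Quo(price, pixels)
		lines = append(lines, fmt.Sprintf("%s %s %s currency=%q",
			p.Pipeline, p.ModelID, perPixel.FloatString(3), p.Currency))
	}
	return lines, nil
}

func main() {
	lines, err := summarize(exampleConfig)
	if err != nil {
		panic(err)
	}
	for _, l := range lines {
		fmt.Println(l)
	}
}
```

Returning an error on malformed input, rather than logging and continuing, means a single missing quote in the JSON is reported instead of leaving the gateway without the price limits the operator intended.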
81 changes: 79 additions & 2 deletions cmd/livepeer/starter/starter.go
Expand Up @@ -105,6 +105,7 @@ type LivepeerConfig struct {
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MaxPricePerCapability *string
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Expand Down Expand Up @@ -216,6 +217,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultMaxTotalEV := "20000000000000"
defaultDepositMultiplier := 1
defaultMaxPricePerUnit := "0"
defaultMaxPricePerCapability := ""
defaultPixelsPerUnit := "1"
defaultPriceFeedAddr := "0x639Fe6ab55C921f74e7fac1ee960C0B6293ba612" // ETH / USD price feed address on Arbitrum Mainnet
defaultAutoAdjustPrice := true
Expand Down Expand Up @@ -311,6 +313,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
MaxTotalEV: &defaultMaxTotalEV,
DepositMultiplier: &defaultDepositMultiplier,
MaxPricePerUnit: &defaultMaxPricePerUnit,
MaxPricePerCapability: &defaultMaxPricePerCapability,
PixelsPerUnit: &defaultPixelsPerUnit,
PriceFeedAddr: &defaultPriceFeedAddr,
AutoAdjustPrice: &defaultAutoAdjustPrice,
Expand Down Expand Up @@ -936,6 +939,8 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if err != nil {
panic(fmt.Errorf("The maximum price per unit must be a valid integer with an optional currency, provided %v instead\n", *cfg.MaxPricePerUnit))
}

server.BroadcastCfg.Initialize()
if maxPricePerUnit.Sign() > 0 {
pricePerPixel := new(big.Rat).Quo(maxPricePerUnit, pixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
Expand All @@ -952,6 +957,24 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *cfg.MaxPricePerUnit)
glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values")
}
if *cfg.MaxPricePerCapability != "" {
capabilityPrices := getCapabilityPrices(*cfg.MaxPricePerCapability)
for _, p := range capabilityPrices {
capPrice := new(big.Rat).Quo(p.PricePerUnit, p.PixelsPerUnit)
cap := core.PipelineToCapability(p.Pipeline)
autoCapPrice, err := core.NewAutoConvertedPrice(p.Currency, capPrice, func(price *big.Rat) {
if monitor.Enabled {
monitor.MaxPriceForCapability(core.CapabilityNameLookup[cap], p.ModelID, price)
}
glog.Infof("Maximum price per unit set to %v wei for capability=%v model_id=%v", price.FloatString(3), p.Pipeline, p.ModelID)
})
if err != nil {
panic(fmt.Errorf("Error converting price: %v", err))
}

server.BroadcastCfg.SetCapabilityMaxPrice(core.PipelineToCapability(p.Pipeline), p.ModelID, autoCapPrice)
}
}
}

if n.NodeType == core.RedeemerNode {
Expand Down Expand Up @@ -1231,8 +1254,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
capability := aiCaps[len(aiCaps)-1]
price := n.GetBasePriceForCap("default", capability, config.ModelID)
if *cfg.Network != "offchain" {
- pricePerUnit := price.Num().Int64() / price.Denom().Int64()
- glog.V(6).Infof("Capability %s (ID: %v) advertised with model constraint %s at price %d wei per compute unit", config.Pipeline, capability, config.ModelID, pricePerUnit)
+ glog.V(6).Infof("Capability %s (ID: %v) advertised with model constraint %s at price %v wei per compute unit", config.Pipeline, capability, config.ModelID, price.FloatString(3))
} else {
glog.V(6).Infof("Capability %s (ID: %v) advertised with model constraint %s", config.Pipeline, capability, config.ModelID)
}
Expand Down Expand Up @@ -1768,6 +1790,61 @@ func getGatewayPrices(gatewayPrices string) []GatewayPrice {
return prices
}

type ModelPrice struct {
Gateway string
Contributor
I think this field is a leftover from another feature 🤔? I couldn't see it used.

Pipeline string
ModelID string
PricePerUnit *big.Rat
PixelsPerUnit *big.Rat
Currency string
}

func getCapabilityPrices(capabilitiesPrices string) []ModelPrice {
if capabilitiesPrices == "" {
return nil
}

// Format of modelPrices json
// model_id can be set to "default" to price all models in the pipeline
// same format is used for gateway and orchestrator. Setting maxPricePerCapability will not use the gateway field.
// {"capabilities_prices": [ {"gateway": "default", "pipeline": "text-to-image", "model_id": "stabilityai/sd-turbo", "priceperunit": 1000, "pixelsperunit": 1}, {"gateway": "0x0", "pipeline": "image-to-video", "model_id": "default", "priceperunit": 2000, "pixelsperunit": 3} ] }
var pricesSet struct {
CapabilitiesPrices []struct {
Gateway string `json:"gateway"`
Pipeline string `json:"pipeline"`
ModelID string `json:"model_id"`
PixelsPerUnit core.JSONRat `json:"pixelsperunit"`
PricePerUnit core.JSONRat `json:"priceperunit"`
Currency string `json:"currency"`
} `json:"capabilities_prices"`
}

pricesFileContent, _ := common.ReadFromFile(capabilitiesPrices)
err := json.Unmarshal([]byte(pricesFileContent), &pricesSet)
if err != nil {
glog.Errorf("model prices could not be parsed: %s", err)
return nil
}

prices := make([]ModelPrice, len(pricesSet.CapabilitiesPrices))
for i, p := range pricesSet.CapabilitiesPrices {
if p.Gateway == "" {
p.Gateway = "default"
Member
nit: Could be nice to support this defaulting for the model_id field as well. Then one can specify only the pipeline for it to be the default price for the pipeline

Contributor Author
This is a good idea. I removed the p.Gateway check and changed it to setting p.ModelID to 'default' if not set specifically.

}
Member
What is this field used for if, from what I understood, this is a config on the gateway itself? I thought this would make sense on an orchestrator config to set prices for specific gateways instead.

Contributor Author
ad-astra-video Aug 17, 2024
Removed Gateway field for now. There was a time that this logic was going to be re-used for the Orchestrator as well to set price per gateway/pipeline/model_id but I think we found a way to re-use the aiModels.json config parsing to set price per gateway at the Orchestrator. Can always add it back in if needed.

Confirmed test still passed.


prices[i] = ModelPrice{
Gateway: p.Gateway,
Pipeline: p.Pipeline,
ModelID: p.ModelID,
PricePerUnit: p.PricePerUnit.Rat,
PixelsPerUnit: p.PixelsPerUnit.Rat,
Currency: p.Currency,
}
}

return prices
}

func createSelectionAlgorithm(cfg LivepeerConfig) (common.SelectionAlgorithm, error) {
sumWeight := *cfg.SelectStakeWeight + *cfg.SelectPriceWeight + *cfg.SelectRandWeight
if math.Abs(sumWeight-1.0) > 0.0001 {
Expand Down
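The `"default"` model ID discussed in the review (an unset `model_id` falls back to `"default"`, which prices every model in a pipeline) implies a two-step lookup. The sketch below is one way that could look; `priceTable`, `set`, `maxPrice`, `demoTable`, and `describe` are hypothetical names, and the precedence (specific model first, then the pipeline default) is an assumption rather than something this diff shows.

```go
package main

import (
	"fmt"
	"math/big"
)

// priceTable is a hypothetical in-memory store keyed by pipeline, then model ID.
type priceTable map[string]map[string]*big.Rat

// set stores a price; an empty modelID is stored under "default".
func (t priceTable) set(pipeline, modelID string, price *big.Rat) {
	if modelID == "" {
		modelID = "default"
	}
	if t[pipeline] == nil {
		t[pipeline] = make(map[string]*big.Rat)
	}
	t[pipeline][modelID] = price
}

// maxPrice returns the model-specific price if present, otherwise the
// pipeline's "default" price. A nil result means no limit is configured.
func (t priceTable) maxPrice(pipeline, modelID string) *big.Rat {
	models, ok := t[pipeline]
	if !ok {
		return nil
	}
	if p, ok := models[modelID]; ok {
		return p
	}
	return models["default"]
}

// demoTable builds a table matching the two entries used in the PR's test JSON.
func demoTable() priceTable {
	t := priceTable{}
	t.set("text-to-image", "stabilityai/sd-turbo", big.NewRat(1000, 1))
	t.set("image-to-video", "", big.NewRat(2000, 3))
	return t
}

// describe renders a lookup result as a string for printing.
func describe(t priceTable, pipeline, modelID string) string {
	if p := t.maxPrice(pipeline, modelID); p != nil {
		return p.RatString()
	}
	return "unset"
}

func main() {
	t := demoTable()
	fmt.Println(describe(t, "text-to-image", "stabilityai/sd-turbo"))
	fmt.Println(describe(t, "image-to-video", "some/model"))
	fmt.Println(describe(t, "upscale", "some/model"))
}
```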
25 changes: 25 additions & 0 deletions cmd/livepeer/starter/starter_test.go
Expand Up @@ -109,6 +109,31 @@ func TestParseGetGatewayPrices(t *testing.T) {
}
}

func TestMaxPricePerCapability(t *testing.T) {
assert := assert.New(t)

jsonTemplate := `{"capabilities_prices": [ {"pipeline": "text-to-image", "model_id": "stabilityai/sd-turbo", "priceperunit": 1000, "pixelsperunit": 1}, {"pipeline": "image-to-video", "model_id": "default", "priceperunit": 2000, "pixelsperunit": 3} ] }`

prices := getCapabilityPrices(jsonTemplate)
assert.NotNil(prices)
assert.Equal(2, len(prices))

//confirm gateway field is initialized to empty string
assert.Equal(prices[0].Gateway, "")
ad-astra-video marked this conversation as resolved.
//confirm Pipeline and ModelID is parsed correctly
assert.Equal(prices[0].Pipeline, "text-to-image")
assert.Equal(prices[1].Pipeline, "image-to-video")
assert.Equal(prices[0].ModelID, "stabilityai/sd-turbo")
assert.Equal(prices[1].ModelID, "default")

//confirm prices are parsed correctly
price1 := new(big.Rat).Quo(prices[0].PricePerUnit, prices[0].PixelsPerUnit)
price2 := new(big.Rat).Quo(prices[1].PricePerUnit, prices[1].PixelsPerUnit)
assert.NotEqual(price1, price2)
assert.Equal(big.NewRat(1000, 1), price1)
assert.Equal(big.NewRat(2000, 3), price2)
}

// Address provided to keystore file
func TestParse_ParseEthKeystorePathValidFile(t *testing.T) {
assert := assert.New(t)
Expand Down
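The test above compares prices as exact rationals, so `2000/3` is never rounded before the comparison. A minimal sketch of the same arithmetic, using only `math/big` (the helper names `perPixel` and `perPixelStrings` are hypothetical):

```go
package main

import (
	"fmt"
	"math/big"
)

// perPixel divides a price per unit by pixels per unit and keeps the exact rational.
// pixelsPerUnit must be non-zero; big.Rat.Quo panics on division by zero.
func perPixel(pricePerUnit, pixelsPerUnit int64) *big.Rat {
	return new(big.Rat).Quo(big.NewRat(pricePerUnit, 1), big.NewRat(pixelsPerUnit, 1))
}

// perPixelStrings returns the exact fraction and the three-decimal rendering
// used by the log lines in this PR (FloatString(3)).
func perPixelStrings(pricePerUnit, pixelsPerUnit int64) (exact, rounded string) {
	r := perPixel(pricePerUnit, pixelsPerUnit)
	return r.RatString(), r.FloatString(3)
}

func main() {
	exact, rounded := perPixelStrings(2000, 3)
	fmt.Println(exact, rounded)
}
```

Rounding happens only at display time, which is why the log output switched from integer division to `FloatString(3)` elsewhere in this change.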
1 change: 1 addition & 0 deletions cmd/livepeer_cli/livepeer_cli.go
Expand Up @@ -102,6 +102,7 @@ func (w *wizard) initializeOptions() []wizardOpt {
{desc: "Invoke \"cancel unlock of broadcasting funds\"", invoke: w.cancelUnlock, notOrchestrator: true},
{desc: "Invoke \"withdraw broadcasting funds\"", invoke: w.withdraw, notOrchestrator: true},
{desc: "Set broadcast config", invoke: w.setBroadcastConfig, notOrchestrator: true},
{desc: "Set max price per capability", invoke: w.setBroadcastMaxPricePerCapability, notOrchestrator: true},
{desc: "Set maximum Ethereum gas price", invoke: w.setMaxGasPrice},
{desc: "Set minimum Ethereum gas price", invoke: w.setMinGasPrice},
{desc: "Get test LPT", invoke: w.requestTokens, testnet: true},
Expand Down
27 changes: 27 additions & 0 deletions cmd/livepeer_cli/wizard_broadcast.go
Expand Up @@ -95,6 +95,33 @@ func (w *wizard) setBroadcastConfig() {
}
}

func (w *wizard) setBroadcastMaxPricePerCapability() {
fmt.Printf("Enter the pipeline to set price for: ")
pipeline := w.readString()
fmt.Printf("Enter the model id to set price for: ")
modelID := w.readString()
fmt.Printf("Enter the maximum price to pay: ")
maxPricePerUnit := w.readDefaultString("0")
fmt.Printf("Enter the price currency: ")
currency := w.readString()
pixelsPerUnit := "1"

val := url.Values{
"pixelsPerUnit": {fmt.Sprintf("%v", pixelsPerUnit)},
"maxPricePerUnit": {fmt.Sprintf("%v", maxPricePerUnit)},
"currency": {fmt.Sprintf("%v", currency)},
"pipeline": {fmt.Sprintf("%v", pipeline)},
"modelID": {fmt.Sprintf("%v", modelID)},
}

resp, ok := httpPostWithParams(fmt.Sprintf("http://%v:%v/setMaxPriceForCapability", w.host, w.httpPort), val)
if !ok {
fmt.Printf("Error setting max price for capability: %v\n", resp)
} else {
fmt.Printf("max price per capability set successfully\n")
}
}

func (w *wizard) idListToVideoProfileList(idList string, opts map[int]string) (string, error) {
ids := strings.Split(idList, ",")

Expand Down
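The wizard posts five form fields to `/setMaxPriceForCapability`. The sketch below builds the same form body with `net/url` so the encoding can be inspected without a running node; `capabilityPriceParams` and `encodedParams` are hypothetical helpers, and the values are strings already, so the `fmt.Sprintf("%v", ...)` wrappers in the diff are not needed here.

```go
package main

import (
	"fmt"
	"net/url"
)

// capabilityPriceParams builds the form values with the same keys the wizard sends.
// pixelsPerUnit is fixed to "1", as in the wizard code in this diff.
func capabilityPriceParams(pipeline, modelID, maxPricePerUnit, currency string) url.Values {
	return url.Values{
		"pixelsPerUnit":   {"1"},
		"maxPricePerUnit": {maxPricePerUnit},
		"currency":        {currency},
		"pipeline":        {pipeline},
		"modelID":         {modelID},
	}
}

// encodedParams returns the URL-encoded body; Encode sorts keys alphabetically.
func encodedParams(pipeline, modelID, maxPricePerUnit, currency string) string {
	return capabilityPriceParams(pipeline, modelID, maxPricePerUnit, currency).Encode()
}

func main() {
	fmt.Println(encodedParams("text-to-image", "stabilityai/sd-turbo", "0.5", "USD"))
}
```

Note that the slash in a model ID such as `stabilityai/sd-turbo` is percent-encoded in the body, so the handler receives it intact after form decoding.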
9 changes: 9 additions & 0 deletions common/util.go
Expand Up @@ -359,6 +359,15 @@ func FixedToPrice(price int64) *big.Rat {
return big.NewRat(price, priceScalingFactor)
}

func PriceToInt64(price *big.Rat) (*big.Rat, error) {
fixed := new(big.Int).Div(price.Num(), price.Denom())
if fixed.IsInt64() {
return big.NewRat(fixed.Int64(), int64(1)), nil
} else {
return nil, errors.New("price cannot convert to int64")
}
}

// BaseTokenAmountToFixed converts the base amount of a token (i.e. ETH/LPT) represented as a big.Int into a fixed point number represented
// as a int64 using a scalingFactor of 100000 resulting in max decimal places of 5
func BaseTokenAmountToFixed(baseAmount *big.Int) (int64, error) {
Expand Down
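`PriceToInt64` integer-divides the numerator by the denominator and then checks that the quotient fits in an `int64`. The sketch below reproduces that logic in a standalone form to show its two consequences: the fractional part is truncated, and a quotient beyond the `int64` range is rejected. `priceToInt64` mirrors the function in the diff; `convert` is a hypothetical helper that takes decimal strings so that values larger than `int64` can be written down.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// priceToInt64 follows the diff's PriceToInt64: integer-divide, then range-check.
func priceToInt64(price *big.Rat) (*big.Rat, error) {
	fixed := new(big.Int).Div(price.Num(), price.Denom())
	if !fixed.IsInt64() {
		return nil, errors.New("price cannot convert to int64")
	}
	return big.NewRat(fixed.Int64(), 1), nil
}

// convert parses num/denom as base-10 integers and reports the conversion result.
func convert(num, denom string) string {
	n, ok1 := new(big.Int).SetString(num, 10)
	d, ok2 := new(big.Int).SetString(denom, 10)
	if !ok1 || !ok2 || d.Sign() == 0 {
		return "invalid input"
	}
	out, err := priceToInt64(new(big.Rat).SetFrac(n, d))
	if err != nil {
		return "error: " + err.Error()
	}
	return out.RatString()
}

func main() {
	// (2^70 + 1) / 1024: the numerator overflows int64, the quotient does not.
	fmt.Println(convert("1180591620717411303425", "1024"))
	// 2^70 / 1: the quotient itself overflows int64.
	fmt.Println(convert("1180591620717411303424", "1"))
	// 7 / 2: the fraction is dropped.
	fmt.Println(convert("7", "2"))
}
```

In the orchestrator change later in this diff the conversion is applied only when the numerator or denominator does not fit in an `int64`, so ordinary prices keep their exact fraction.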
23 changes: 23 additions & 0 deletions core/ai.go
Expand Up @@ -10,6 +10,7 @@ import (
"regexp"
"strconv"
"strings"
"unicode"

"github.com/livepeer/ai-worker/worker"
)
Expand Down Expand Up @@ -41,6 +42,28 @@ func (s JSONRat) String() string {
return s.FloatString(2)
}

func PipelineToCapability(pipeline string) Capability {
if len(pipeline) == 0 {
return Capability_Unused
}
runes := []rune(pipeline)
runes[0] = unicode.ToUpper(runes[0])
for i, r := range runes {
if r == '-' {
runes[i] = ' '
}
}
pipelineName := string(runes)
for cap, desc := range CapabilityNameLookup {
if pipelineName == desc {
return cap
}
}

//no capability description matches name
return Capability_Unused
Member
I wonder if this should be an explicit error instead (or the caller should check for this Unused value) to avoid silent bad configurations. Otherwise one might be missing a configuration that they swear they are making in their JSON config file, which could have bad implications (e.g. paying more than they were willing to). In general I think it's better to fail explicitly for bad configs than to ignore them.

Contributor Author
I agree that it would be better to update this to return (Capability, error) to make it more obvious it should be checked by the caller.

Updated and added a test for this function in ai_test.go

}

type AIModelConfig struct {
Pipeline string `json:"pipeline"`
ModelID string `json:"model_id"`
Expand Down
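The review above settles on returning `(Capability, error)` so that an unknown pipeline name cannot be silently ignored. The updated function is not part of the four commits shown here, so the following is only a sketch of that shape: the `capability` type, its constants, and `nameLookup` are hypothetical stand-ins for `core.Capability` and `core.CapabilityNameLookup`, with illustrative IDs and descriptions.

```go
package main

import (
	"errors"
	"fmt"
	"unicode"
)

// capability is a hypothetical stand-in for core.Capability.
type capability int

const (
	capUnused       capability = -1
	capTextToImage  capability = 1
	capImageToVideo capability = 2
)

// nameLookup is a hypothetical stand-in for core.CapabilityNameLookup.
var nameLookup = map[capability]string{
	capTextToImage:  "Text to image",
	capImageToVideo: "Image to video",
}

// pipelineToCapability maps "text-to-image" to the capability described as
// "Text to image": upper-case the first rune, replace hyphens with spaces,
// then search the lookup table. Unknown names produce an error.
func pipelineToCapability(pipeline string) (capability, error) {
	if pipeline == "" {
		return capUnused, errors.New("pipeline name is empty")
	}
	runes := []rune(pipeline)
	runes[0] = unicode.ToUpper(runes[0])
	for i, r := range runes {
		if r == '-' {
			runes[i] = ' '
		}
	}
	name := string(runes)
	for c, desc := range nameLookup {
		if desc == name {
			return c, nil
		}
	}
	return capUnused, fmt.Errorf("unknown pipeline %q", pipeline)
}

func main() {
	c, err := pipelineToCapability("text-to-image")
	fmt.Println(c, err)
	_, err = pipelineToCapability("text-2-image")
	fmt.Println(err)
}
```

With this signature a caller at startup can abort on a misspelled pipeline instead of registering a price under the unused capability.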
8 changes: 8 additions & 0 deletions core/orchestrator.go
Expand Up @@ -317,6 +317,14 @@ func (orch *orchestrator) PriceInfoForCaps(sender ethcommon.Address, manifestID
return nil, err
}

if !price.Num().IsInt64() || !price.Denom().IsInt64() {
fixedPrice, err := common.PriceToInt64(price)
if err != nil {
return nil, errors.New("price cannot be converted to int64")
}
price = fixedPrice
}

return &net.PriceInfo{
PricePerUnit: price.Num().Int64(),
PixelsPerUnit: price.Denom().Int64(),
Expand Down
40 changes: 29 additions & 11 deletions monitor/census.go
Expand Up @@ -164,17 +164,17 @@ type (
mDeposit *stats.Float64Measure
mReserve *stats.Float64Measure
// Metrics for receiving payments
- mTicketValueRecv *stats.Float64Measure
- mTicketsRecv *stats.Int64Measure
- mPaymentRecvErr *stats.Int64Measure
- mWinningTicketsRecv *stats.Int64Measure
- mValueRedeemed *stats.Float64Measure
- mTicketRedemptionError *stats.Int64Measure
- mSuggestedGasPrice *stats.Float64Measure
- mMinGasPrice *stats.Float64Measure
- mMaxGasPrice *stats.Float64Measure
- mTranscodingPrice *stats.Float64Measure

+ mTicketValueRecv *stats.Float64Measure
+ mTicketsRecv *stats.Int64Measure
+ mPaymentRecvErr *stats.Int64Measure
+ mWinningTicketsRecv *stats.Int64Measure
+ mValueRedeemed *stats.Float64Measure
+ mTicketRedemptionError *stats.Int64Measure
+ mSuggestedGasPrice *stats.Float64Measure
+ mMinGasPrice *stats.Float64Measure
+ mMaxGasPrice *stats.Float64Measure
+ mTranscodingPrice *stats.Float64Measure
+ mTranscodingPricePerCapability *stats.Float64Measure
Contributor
Can we name it PricePerCapability?

Contributor Author
Updated

// Metrics for calling rewards
mRewardCallError *stats.Int64Measure

Expand Down Expand Up @@ -335,6 +335,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mMinGasPrice = stats.Float64("min_gas_price", "MinGasPrice", "gwei")
census.mMaxGasPrice = stats.Float64("max_gas_price", "MaxGasPrice", "gwei")
census.mTranscodingPrice = stats.Float64("transcoding_price", "TranscodingPrice", "wei")
census.mTranscodingPricePerCapability = stats.Float64("transcoding_price_per_capability", "TranscodingPricePerCapability", "wei")

// Metrics for calling rewards
census.mRewardCallError = stats.Int64("reward_call_errors", "RewardCallError", "tot")
Expand Down Expand Up @@ -833,6 +834,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: baseTagsWithEthAddr,
Aggregation: view.LastValue(),
},
{
Name: "transcoding_price_per_capability",
Measure: census.mTranscodingPricePerCapability,
Description: "Transcoding price per unit per capability",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...),
Aggregation: view.LastValue(),
},

// Metrics for calling rewards
{
Expand Down Expand Up @@ -1638,6 +1646,16 @@ func MaxTranscodingPrice(maxPrice *big.Rat) {
}
}

func MaxPriceForCapability(cap string, modelName string, maxPrice *big.Rat) {
floatWei, _ := maxPrice.Float64()
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kPipeline, cap), tag.Insert(census.kModelName, modelName)},
census.mTranscodingPricePerCapability.M(floatWei)); err != nil {

glog.Errorf("Error recording metrics err=%q", err)
}
}

// TicketValueRecv records the ticket value received from a sender for a manifestID
func TicketValueRecv(ctx context.Context, sender string, value *big.Rat) {
if value.Cmp(big.NewRat(0, 1)) <= 0 {
Expand Down
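`MaxPriceForCapability` converts the rational price to a `float64` for the metric and discards the second return value of `big.Rat.Float64`, which reports whether the conversion was exact. A small sketch of what that flag indicates (`weiAsFloat` is a hypothetical helper, not part of the PR):

```go
package main

import (
	"fmt"
	"math/big"
)

// weiAsFloat converts num/denom to the nearest float64 and reports exactness.
// denom must be non-zero; big.NewRat panics otherwise.
func weiAsFloat(num, denom int64) (float64, bool) {
	return big.NewRat(num, denom).Float64()
}

func main() {
	f, exact := weiAsFloat(1000, 1)
	fmt.Println(f, exact)
	f, exact = weiAsFloat(2000, 3)
	fmt.Println(f, exact)
}
```

For a gauge metric the nearest `float64` is usually adequate, which is presumably why the flag is ignored; the exact rational remains available wherever prices are compared.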
19 changes: 16 additions & 3 deletions server/ai_session.go
Expand Up @@ -172,12 +172,14 @@ func NewAISessionSelector(cap core.Capability, modelID string, node *core.Livepe
}

suspender := newSuspender()

//create caps for selector to get maxPrice
warmCaps := createAISelectorCapabilities(cap, modelID, true)
coldCaps := createAISelectorCapabilities(cap, modelID, false)
// The latency score in this context is just the latency of the last completed request for a session
// The "good enough" latency score is set to 0.0 so the selector will always select unknown sessions first
minLS := 0.0
- warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender)
- coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore), suspender)
+ warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, warmCaps), suspender)
+ coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, coldCaps), suspender)
sel := &AISessionSelector{
warmPool: warmPool,
coldPool: coldPool,
Expand All @@ -196,6 +198,17 @@ func NewAISessionSelector(cap core.Capability, modelID string, node *core.Livepe
return sel, nil
}

func createAISelectorCapabilities(cap core.Capability, modelID string, warm bool) *core.Capabilities {
var aiCaps []core.Capability
capabilityConstraints := make(map[core.Capability]*core.PerCapabilityConstraints)
aiCaps = append(aiCaps, cap)
capabilityConstraints[cap] = &core.PerCapabilityConstraints{
Models: make(map[string]*core.ModelConstraint),
}
capabilityConstraints[cap].Models[modelID] = &core.ModelConstraint{Warm: warm}
return core.NewCapabilitiesWithConstraints(aiCaps, aiCaps, core.Constraints{}, capabilityConstraints)
}

func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
shouldRefreshSelector := func() bool {
// Refresh if the # of sessions across warm and cold pools falls below the smaller of the maxRefreshSessionsThreshold and
Expand Down