Skip to content

Commit

Permalink
feat(ai): add AI gateway metrics (#3087)
Browse files Browse the repository at this point in the history
* Add gateway metric for roundtrip ai times by model and pipeline

* Rename metrics and add unique manifest

* Fix name mismatch

* modelsRequested not working correctly

* feat: add initial POC AI gateway metrics

This commit adds the initial AI gateway metrics so that they can
reviewed by others. The code still need to be cleaned up and the buckets
adjusted.

* feat: improve AI metrics

This commit improves the AI metrics so that they are easier to work
with.

* feat(ai): log no capacity error to metrics

This commit ensures that an error is logged when the Gateway could not
find orchestrators for a given model and capability.

* feat(ai): add TicketValueSent and TicketsSent metrics

This commit ensure that the `ticket_value_sent` abd `tickets_sent`
metrics are also created for a AI Gateway.

* fix(ai): ensure that AI metrics have orch address label

This commit ensures that the AI gateway metrics contain the orch address
label.

* fix(ai): fix incorrect Gateway pricing metric

This commit ensures that the AI job pricing is calculated correctly and
cleans up the codebase.

* refactor(ai): remove Orch label from ai_request_price metric

This commit removes the Orch label from the ai_request_price metrics
since that information is better to be retrieved from another endpoint.

---------

Co-authored-by: Elite Encoder <[email protected]>
  • Loading branch information
rickstaa and eliteprox authored Jul 17, 2024
1 parent 2ab10c6 commit f60c0c5
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 2 deletions.
107 changes: 107 additions & 0 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type (
kOrchestratorURI tag.Key
kOrchestratorAddress tag.Key
kFVErrorType tag.Key
kPipeline tag.Key
kModelName tag.Key
mSegmentSourceAppeared *stats.Int64Measure
mSegmentEmerged *stats.Int64Measure
mSegmentEmergedUnprocessed *stats.Int64Measure
Expand Down Expand Up @@ -190,6 +192,12 @@ type (
mSegmentClassProb *stats.Float64Measure
mSceneClassification *stats.Int64Measure

// Metrics for AI jobs
mAIModelsRequested *stats.Int64Measure
mAIRequestLatencyScore *stats.Float64Measure
mAIRequestPrice *stats.Float64Measure
mAIRequestError *stats.Int64Measure

lock sync.Mutex
emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
success map[uint64]*segmentsAverager
Expand Down Expand Up @@ -217,6 +225,11 @@ type (
removedAt time.Time
tries map[uint64]tryData // seqNo:try
}

AIJobInfo struct {
LatencyScore float64
PricePerUnit float64
}
)

// Exporter Prometheus exporter that handles `/metrics` endpoint
Expand Down Expand Up @@ -254,6 +267,8 @@ func InitCensus(nodeType NodeType, version string) {
census.kOrchestratorAddress = tag.MustNewKey("orchestrator_address")
census.kFVErrorType = tag.MustNewKey("fverror_type")
census.kSegClassName = tag.MustNewKey("seg_class_name")
census.kModelName = tag.MustNewKey("model_name")
census.kPipeline = tag.MustNewKey("pipeline")
census.ctx, err = tag.New(ctx, tag.Insert(census.kNodeType, string(nodeType)), tag.Insert(census.kNodeID, NodeID))
if err != nil {
glog.Exit("Error creating context", err)
Expand Down Expand Up @@ -339,6 +354,12 @@ func InitCensus(nodeType NodeType, version string) {
census.mSegmentClassProb = stats.Float64("segment_class_prob", "SegmentClassProb", "tot")
census.mSceneClassification = stats.Int64("scene_classification_done", "SceneClassificationDone", "tot")

// Metrics for AI jobs
census.mAIModelsRequested = stats.Int64("ai_models_requested", "Number of AI models requested over time", "tot")
census.mAIRequestLatencyScore = stats.Float64("ai_request_latency_score", "AI request latency score, based on smallest pipeline unit", "")
census.mAIRequestPrice = stats.Float64("ai_request_price", "AI request price per unit, based on smallest pipeline unit", "")
census.mAIRequestError = stats.Int64("ai_request_errors", "Errors during AI request processing", "tot")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
glog.Infof("Node type %s node ID %s", nodeType, NodeID)
Expand Down Expand Up @@ -855,6 +876,36 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: baseTags,
Aggregation: view.Count(),
},

// Metrics for AI jobs
{
Name: "ai_models_requested",
Measure: census.mAIModelsRequested,
Description: "Number of AI models requested over time",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...),
Aggregation: view.Count(),
},
{
Name: "ai_request_latency_score",
Measure: census.mAIRequestLatencyScore,
Description: "AI request latency score",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
Aggregation: view.LastValue(),
},
{
Name: "ai_request_price",
Measure: census.mAIRequestPrice,
Description: "AI request price per unit",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...),
Aggregation: view.LastValue(),
},
{
Name: "ai_request_errors",
Measure: census.mAIRequestError,
Description: "Errors when processing AI requests",
TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
Aggregation: view.Sum(),
},
}

// Register the views
Expand Down Expand Up @@ -1709,6 +1760,62 @@ func RewardCallError(sender string) {
}
}

// AIRequestFinished records gateway AI job request metrics.
func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
census.recordModelRequested(pipeline, model)
census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo)
}

// recordModelRequested increments request count for a specific AI model and pipeline.
func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, modelName)}, cen.mAIModelsRequested.M(1)); err != nil {
glog.Errorf("Failed to record metrics with tags: %v", err)
}
}

// recordAIRequestLatencyScore records the latency score for a AI job request.
func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// recordAIRequestPricePerUnit records the price per unit for a AI job request.
func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// AIRequestError logs an error in a gateway AI job request.
func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.OrchestratorInfo) {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
orchAddr = common.BytesToAddress(addr).String()
}

if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)},
census.mAIRequestError.M(1)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
Expand Down
Loading

0 comments on commit f60c0c5

Please sign in to comment.