Skip to content

Commit

Permalink
feat(ai): add AI orchestrator metrics (livepeer#3097)
Browse files Browse the repository at this point in the history
* Add gateway metric for roundtrip ai times by model and pipeline

* Rename metrics and add unique manifest

* Fix name mismatch

* modelsRequested not working correctly

* feat: add initial POC AI gateway metrics

This commit adds the initial AI gateway metrics so that they can be
reviewed by others. The code still needs to be cleaned up and the buckets
adjusted.

* feat: improve AI metrics

This commit improves the AI metrics so that they are easier to work
with.

* feat(ai): log no capacity error to metrics

This commit ensures that an error is logged when the Gateway could not
find orchestrators for a given model and capability.

* feat(ai): add TicketValueSent and TicketsSent metrics

This commit ensures that the `ticket_value_sent` and `tickets_sent`
metrics are also created for an AI Gateway.

* fix(ai): ensure that AI metrics have orch address label

This commit ensures that the AI gateway metrics contain the orch address
label.

* feat(ai): add orchestrator AI census metrics

This commit introduces a suite of AI orchestrator metrics to the census
module, mirroring those received by the Gateway. The newly added metrics
include `ai_models_requested`, `ai_request_latency_score`,
`ai_request_price`, and `ai_request_errors`, facilitating comprehensive
tracking and analysis of AI request handling performance on the orchestrator side.

* refactor: improve orchestrator metrics tags

This commit ensures that the right tags are attached to the Orchestrator
AI metrics.

* refactor(ai): improve latency score calculations

This commit ensures that no divide-by-zero errors can occur in the
latency score calculations.

---------

Co-authored-by: Elite Encoder <[email protected]>
  • Loading branch information
rickstaa and eliteprox committed Jul 26, 2024
1 parent c57a53b commit 97f3d7c
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 69 deletions.
74 changes: 63 additions & 11 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ func InitCensus(nodeType NodeType, version string) {
baseTagsWithEthAddr := baseTags
baseTagsWithManifestIDAndEthAddr := baseTags
baseTagsWithOrchInfo := baseTags
baseTagsWithGatewayInfo := baseTags
if PerStreamMetrics {
baseTagsWithManifestID = []tag.Key{census.kNodeID, census.kNodeType, census.kManifestID}
baseTagsWithEthAddr = []tag.Key{census.kNodeID, census.kNodeType, census.kSender}
Expand All @@ -391,8 +392,19 @@ func InitCensus(nodeType NodeType, version string) {
}
baseTagsWithManifestIDAndOrchInfo := baseTagsWithManifestID
baseTagsWithOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTags...)
baseTagsWithGatewayInfo = append([]tag.Key{census.kSender}, baseTags...)
baseTagsWithManifestIDAndOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTagsWithManifestID...)

// Add node type specific tags.
baseTagsWithNodeInfo := baseTags
aiRequestLatencyScoreTags := baseTags
if nodeType == Orchestrator {
baseTagsWithNodeInfo = baseTagsWithGatewayInfo
} else {
baseTagsWithNodeInfo = baseTagsWithOrchInfo
aiRequestLatencyScoreTags = baseTagsWithOrchInfo
}

views := []*view.View{
{
Name: "versions",
Expand Down Expand Up @@ -889,7 +901,7 @@ func InitCensus(nodeType NodeType, version string) {
Name: "ai_request_latency_score",
Measure: census.mAIRequestLatencyScore,
Description: "AI request latency score",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, aiRequestLatencyScoreTags...),
Aggregation: view.LastValue(),
},
{
Expand All @@ -903,7 +915,7 @@ func InitCensus(nodeType NodeType, version string) {
Name: "ai_request_errors",
Measure: census.mAIRequestError,
Description: "Errors when processing AI requests",
TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
Aggregation: view.Sum(),
},
}
Expand Down Expand Up @@ -1760,13 +1772,6 @@ func RewardCallError(sender string) {
}
}

// AIRequestFinished records gateway AI job request metrics.
// It increments the per-pipeline/model request counter and records the
// request's latency score and price per unit, tagged with the serving
// orchestrator's info.
func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
	census.recordModelRequested(pipeline, model)
	census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
	census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo)
}

// recordModelRequested increments request count for a specific AI model and pipeline.
func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) {
cen.lock.Lock()
Expand All @@ -1778,6 +1783,13 @@ func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string
}
}

// AIRequestFinished records gateway AI job request metrics: the
// per-pipeline/model request count, the request's latency score (tagged with
// the serving orchestrator's info), and the price per unit.
func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
	score, price := jobInfo.LatencyScore, jobInfo.PricePerUnit
	census.recordModelRequested(pipeline, model)
	census.recordAIRequestLatencyScore(pipeline, model, score, orchInfo)
	census.recordAIRequestPricePerUnit(pipeline, model, price)
}

// recordAIRequestLatencyScore records the latency score for a AI job request.
func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) {
cen.lock.Lock()
Expand All @@ -1791,12 +1803,12 @@ func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Mo
}

// recordAIRequestPricePerUnit records the price per unit for a AI job request.
func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)},
cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
Expand All @@ -1816,6 +1828,46 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.
}
}

// AIJobProcessed records orchestrator AI job processing metrics: the
// per-pipeline/model request count, the job's latency score, and the price
// per unit charged for the job.
func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo) {
	score, price := jobInfo.LatencyScore, jobInfo.PricePerUnit
	census.recordModelRequested(pipeline, model)
	census.recordAIJobLatencyScore(pipeline, model, score)
	census.recordAIJobPricePerUnit(pipeline, model, price)
}

// recordAIJobLatencyScore records the latency score for a processed AI job,
// tagged with the pipeline and model that handled it. Errors from the stats
// backend are logged rather than propagated, since metrics recording is
// best-effort.
func (cen *censusMetricsCounter) recordAIJobLatencyScore(pipeline string, model string, latencyScore float64) {
	cen.lock.Lock()
	defer cen.lock.Unlock()

	if err := stats.RecordWithTags(cen.ctx,
		[]tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, model)},
		cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// recordAIJobPricePerUnit records the price per unit charged for a processed
// AI job, tagged with the pipeline and model that handled it. Errors from the
// stats backend are logged rather than propagated, since metrics recording is
// best-effort.
func (cen *censusMetricsCounter) recordAIJobPricePerUnit(pipeline string, model string, pricePerUnit float64) {
	cen.lock.Lock()
	defer cen.lock.Unlock()

	if err := stats.RecordWithTags(cen.ctx,
		[]tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, model)},
		cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// AIProcessingError records an error that occurred while the orchestrator was
// processing an AI job, tagged with the error code, pipeline, model and sender
// address. Errors from the stats backend are logged rather than propagated.
func AIProcessingError(code string, pipeline string, model string, sender string) {
	if err := stats.RecordWithTags(census.ctx,
		[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, model), tag.Insert(census.kSender, sender)},
		census.mAIRequestError.M(1)); err != nil {
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
Expand Down
30 changes: 30 additions & 0 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/monitor"
middleware "github.com/oapi-codegen/nethttp-middleware"
"github.com/oapi-codegen/runtime"
)
Expand Down Expand Up @@ -314,6 +315,9 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
start := time.Now()
resp, err := submitFn(ctx)
if err != nil {
if monitor.Enabled {
monitor.AIProcessingError(err.Error(), pipeline, modelID, sender.Hex())
}
respondWithError(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -326,6 +330,32 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
// If additional parameters that influence compute cost become configurable, then the formula should be reconsidered
orch.DebitFees(sender, manifestID, payment.GetExpectedPrice(), outPixels)

if monitor.Enabled {
var latencyScore float64
switch v := req.(type) {
case worker.TextToImageJSONRequestBody:
latencyScore = CalculateTextToImageLatencyScore(took, v, outPixels)
case worker.ImageToImageMultipartRequestBody:
latencyScore = CalculateImageToImageLatencyScore(took, v, outPixels)
case worker.ImageToVideoMultipartRequestBody:
latencyScore = CalculateImageToVideoLatencyScore(took, v, outPixels)
case worker.UpscaleMultipartRequestBody:
latencyScore = CalculateUpscaleLatencyScore(took, v, outPixels)
case worker.AudioToTextMultipartRequestBody:
durationSeconds, err := common.CalculateAudioDuration(v.Audio)
if err == nil {
latencyScore = CalculateAudioToTextLatencyScore(took, durationSeconds)
}
}

var pricePerAIUnit float64
if priceInfo := payment.GetExpectedPrice(); priceInfo != nil && priceInfo.GetPixelsPerUnit() != 0 {
pricePerAIUnit = float64(priceInfo.GetPricePerUnit()) / float64(priceInfo.GetPixelsPerUnit())
}

monitor.AIJobProcessed(ctx, pipeline, modelID, monitor.AIJobInfo{LatencyScore: latencyScore, PricePerUnit: pricePerAIUnit})
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(resp)
Expand Down
Loading

0 comments on commit 97f3d7c

Please sign in to comment.