From f60c0c5a267b75aab0276166bc6e1896e2642ac8 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Wed, 17 Jul 2024 17:51:09 +0200 Subject: [PATCH] feat(ai): add AI gateway metrics (#3087) * Add gateway metric for roundtrip ai times by model and pipeline * Rename metrics and add unique manifest * Fix name mismatch * modelsRequested not working correctly * feat: add initial POC AI gateway metrics This commit adds the initial AI gateway metrics so that they can be reviewed by others. The code still needs to be cleaned up and the buckets adjusted. * feat: improve AI metrics This commit improves the AI metrics so that they are easier to work with. * feat(ai): log no capacity error to metrics This commit ensures that an error is logged when the Gateway could not find orchestrators for a given model and capability. * feat(ai): add TicketValueSent and TicketsSent metrics This commit ensures that the `ticket_value_sent` and `tickets_sent` metrics are also created for an AI Gateway. * fix(ai): ensure that AI metrics have orch address label This commit ensures that the AI gateway metrics contain the orch address label. * fix(ai): fix incorrect Gateway pricing metric This commit ensures that the AI job pricing is calculated correctly and cleans up the codebase. * refactor(ai): remove Orch label from ai_request_price metric This commit removes the Orch label from the ai_request_price metrics since that information is better retrieved from another endpoint. 
--------- Co-authored-by: Elite Encoder --- monitor/census.go | 107 ++++++++++++++++++++++++++++++++++++++ server/ai_process.go | 121 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 226 insertions(+), 2 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index f89d1a41ae..373220122b 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -113,6 +113,8 @@ type ( kOrchestratorURI tag.Key kOrchestratorAddress tag.Key kFVErrorType tag.Key + kPipeline tag.Key + kModelName tag.Key mSegmentSourceAppeared *stats.Int64Measure mSegmentEmerged *stats.Int64Measure mSegmentEmergedUnprocessed *stats.Int64Measure @@ -190,6 +192,12 @@ type ( mSegmentClassProb *stats.Float64Measure mSceneClassification *stats.Int64Measure + // Metrics for AI jobs + mAIModelsRequested *stats.Int64Measure + mAIRequestLatencyScore *stats.Float64Measure + mAIRequestPrice *stats.Float64Measure + mAIRequestError *stats.Int64Measure + lock sync.Mutex emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo success map[uint64]*segmentsAverager @@ -217,6 +225,11 @@ type ( removedAt time.Time tries map[uint64]tryData // seqNo:try } + + AIJobInfo struct { + LatencyScore float64 + PricePerUnit float64 + } ) // Exporter Prometheus exporter that handles `/metrics` endpoint @@ -254,6 +267,8 @@ func InitCensus(nodeType NodeType, version string) { census.kOrchestratorAddress = tag.MustNewKey("orchestrator_address") census.kFVErrorType = tag.MustNewKey("fverror_type") census.kSegClassName = tag.MustNewKey("seg_class_name") + census.kModelName = tag.MustNewKey("model_name") + census.kPipeline = tag.MustNewKey("pipeline") census.ctx, err = tag.New(ctx, tag.Insert(census.kNodeType, string(nodeType)), tag.Insert(census.kNodeID, NodeID)) if err != nil { glog.Exit("Error creating context", err) @@ -339,6 +354,12 @@ func InitCensus(nodeType NodeType, version string) { census.mSegmentClassProb = stats.Float64("segment_class_prob", "SegmentClassProb", "tot") census.mSceneClassification = 
stats.Int64("scene_classification_done", "SceneClassificationDone", "tot") + // Metrics for AI jobs + census.mAIModelsRequested = stats.Int64("ai_models_requested", "Number of AI models requested over time", "tot") + census.mAIRequestLatencyScore = stats.Float64("ai_request_latency_score", "AI request latency score, based on smallest pipeline unit", "") + census.mAIRequestPrice = stats.Float64("ai_request_price", "AI request price per unit, based on smallest pipeline unit", "") + census.mAIRequestError = stats.Int64("ai_request_errors", "Errors during AI request processing", "tot") + glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version()) glog.Infof("Livepeer version: %s", version) glog.Infof("Node type %s node ID %s", nodeType, NodeID) @@ -855,6 +876,36 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: baseTags, Aggregation: view.Count(), }, + + // Metrics for AI jobs + { + Name: "ai_models_requested", + Measure: census.mAIModelsRequested, + Description: "Number of AI models requested over time", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...), + Aggregation: view.Count(), + }, + { + Name: "ai_request_latency_score", + Measure: census.mAIRequestLatencyScore, + Description: "AI request latency score", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), + Aggregation: view.LastValue(), + }, + { + Name: "ai_request_price", + Measure: census.mAIRequestPrice, + Description: "AI request price per unit", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...), + Aggregation: view.LastValue(), + }, + { + Name: "ai_request_errors", + Measure: census.mAIRequestError, + Description: "Errors when processing AI requests", + TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), + Aggregation: view.Sum(), + }, } // Register the views @@ -1709,6 +1760,62 
@@ func RewardCallError(sender string) { } } +// AIRequestFinished records gateway AI job request metrics. +func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) { + census.recordModelRequested(pipeline, model) + census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo) + census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo) +} + +// recordModelRequested increments request count for a specific AI model and pipeline. +func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) { + cen.lock.Lock() + defer cen.lock.Unlock() + + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, modelName)}, cen.mAIModelsRequested.M(1)); err != nil { + glog.Errorf("Failed to record metrics with tags: %v", err) + } +} + +// recordAIRequestLatencyScore records the latency score for a AI job request. +func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) { + cen.lock.Lock() + defer cen.lock.Unlock() + + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}, + cen.mAIRequestLatencyScore.M(latencyScore)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + +// recordAIRequestPricePerUnit records the price per unit for a AI job request. 
+func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) { + cen.lock.Lock() + defer cen.lock.Unlock() + + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}, + cen.mAIRequestPrice.M(pricePerUnit)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + +// AIRequestError logs an error in a gateway AI job request. +func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.OrchestratorInfo) { + orchAddr := "" + if addr := orchInfo.GetAddress(); addr != nil { + orchAddr = common.BytesToAddress(addr).String() + } + + if err := stats.RecordWithTags(census.ctx, + []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}, + census.mAIRequestError.M(1)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + // Convert wei to gwei func wei2gwei(wei *big.Int) float64 { gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64() diff --git a/server/ai_process.go b/server/ai_process.go index 0f5ab335f4..14b746f7de 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -19,6 +19,7 @@ import ( "github.com/livepeer/go-livepeer/clog" "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" + "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-tools/drivers" "github.com/livepeer/lpms/stream" ) @@ -85,7 +86,11 @@ func processTextToImage(ctx context.Context, params aiRequestParams, req worker. 
func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISession, req worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error) { client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) + if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "text-to-image", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } @@ -101,6 +106,9 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess outPixels := int64(*req.Height) * int64(*req.Width) setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "text-to-image", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } defer completeBalanceUpdate(sess.BroadcastSession, balUpdate) @@ -109,6 +117,9 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess resp, err := client.TextToImageWithResponse(ctx, req, setHeaders) took := time.Since(start) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "text-to-image", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } @@ -140,6 +151,15 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess sess.LatencyScore = took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) + if monitor.Enabled { + var pricePerAIUnit float64 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 { + pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit) + } + + monitor.AIRequestFinished(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo) + } + return resp.JSON200, nil } @@ -178,26 +198,41 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes var buf bytes.Buffer mw, err := 
worker.NewImageToImageMultipartWriter(&buf, req) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } imageRdr, err := req.Image.Reader() if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } config, _, err := image.DecodeConfig(imageRdr) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } outPixels := int64(config.Height) * int64(config.Width) setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } defer completeBalanceUpdate(sess.BroadcastSession, balUpdate) @@ -206,6 +241,9 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes resp, err := client.ImageToImageWithBodyWithResponse(ctx, mw.FormDataContentType(), &buf, setHeaders) took := time.Since(start) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } @@ -237,6 +275,15 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes sess.LatencyScore = took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) + if monitor.Enabled { + var pricePerAIUnit float64 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 { + pricePerAIUnit = float64(priceInfo.PricePerUnit) / 
float64(priceInfo.PixelsPerUnit) + } + + monitor.AIRequestFinished(ctx, "image-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo) + } + return resp.JSON200, nil } @@ -280,11 +327,17 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes var buf bytes.Buffer mw, err := worker.NewImageToVideoMultipartWriter(&buf, req) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } @@ -301,6 +354,9 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes outPixels := int64(*req.Height) * int64(*req.Width) * frames setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } defer completeBalanceUpdate(sess.BroadcastSession, balUpdate) @@ -309,12 +365,18 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes resp, err := client.ImageToVideoWithBody(ctx, mw.FormDataContentType(), &buf, setHeaders) took := time.Since(start) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } defer resp.Body.Close() data, err := io.ReadAll(resp.Body) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } @@ -329,6 +391,9 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes var res worker.ImageResponse if 
err := json.Unmarshal(data, &res); err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } @@ -341,6 +406,15 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes } sess.LatencyScore = took.Seconds() / float64(outPixels) / numInferenceSteps + if monitor.Enabled { + var pricePerAIUnit float64 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 { + pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit) + } + + monitor.AIRequestFinished(ctx, "image-to-video", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo) + } + return &res, nil } @@ -379,20 +453,32 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, var buf bytes.Buffer mw, err := worker.NewUpscaleMultipartWriter(&buf, req) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } imageRdr, err := req.Image.Reader() if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } config, _, err := image.DecodeConfig(imageRdr) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } outPixels := int64(config.Height) * int64(config.Width) @@ -407,6 +493,9 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, resp, err := client.UpscaleWithBodyWithResponse(ctx, mw.FormDataContentType(), 
&buf, setHeaders) took := time.Since(start) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) + } return nil, err } @@ -429,6 +518,15 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, } sess.LatencyScore = took.Seconds() / float64(outPixels) / numInferenceSteps + if monitor.Enabled { + var pricePerAIUnit float64 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 { + pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit) + } + + monitor.AIRequestFinished(ctx, "upscale", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo) + } + return resp.JSON200, nil } @@ -554,6 +652,7 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface default: return nil, fmt.Errorf("unsupported request type %T", req) } + capName, _ := core.CapabilityToName(cap) var resp interface{} @@ -564,7 +663,11 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface for { select { case <-cctx.Done(): - return nil, &ServiceUnavailableError{err: fmt.Errorf("no orchestrators available within %v timeout", processingRetryTimeout)} + err := fmt.Errorf("no orchestrators available within %v timeout", processingRetryTimeout) + if monitor.Enabled { + monitor.AIRequestError(err.Error(), capName, modelID, nil) + } + return nil, &ServiceUnavailableError{err: err} default: } @@ -594,7 +697,11 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface } if resp == nil { - return nil, &ServiceUnavailableError{err: errors.New("no orchestrators available")} + errMsg := "no orchestrators available" + if monitor.Enabled { + monitor.AIRequestError(errMsg, capName, modelID, nil) + } + return nil, &ServiceUnavailableError{err: errors.New(errMsg)} } return resp, nil } @@ -627,11 +734,21 @@ 
func prepareAIPayment(ctx context.Context, sess *AISession, outPixels int64) (wo payment, err := genPayment(ctx, sess.BroadcastSession, balUpdate.NumTickets) if err != nil { + clog.Errorf(ctx, "Could not create payment err=%q", err) + + if monitor.Enabled { + monitor.PaymentCreateError(ctx) + } + return nil, nil, err } // As soon as the request is sent to the orch consider the balance update's credit as spent balUpdate.Status = CreditSpent + if monitor.Enabled { + monitor.TicketValueSent(ctx, balUpdate.NewCredit) + monitor.TicketsSent(ctx, balUpdate.NumTickets) + } setHeaders := func(_ context.Context, req *http.Request) error { req.Header.Set(segmentHeader, segCreds)