feat(ai): add AI orchestrator metrics #3097

Merged

merged 13 commits on Jul 18, 2024
74 changes: 63 additions & 11 deletions monitor/census.go
@@ -380,6 +380,7 @@ func InitCensus(nodeType NodeType, version string) {
baseTagsWithEthAddr := baseTags
baseTagsWithManifestIDAndEthAddr := baseTags
baseTagsWithOrchInfo := baseTags
baseTagsWithGatewayInfo := baseTags
if PerStreamMetrics {
baseTagsWithManifestID = []tag.Key{census.kNodeID, census.kNodeType, census.kManifestID}
baseTagsWithEthAddr = []tag.Key{census.kNodeID, census.kNodeType, census.kSender}
@@ -391,8 +392,19 @@ func InitCensus(nodeType NodeType, version string) {
}
baseTagsWithManifestIDAndOrchInfo := baseTagsWithManifestID
baseTagsWithOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTags...)
baseTagsWithGatewayInfo = append([]tag.Key{census.kSender}, baseTags...)
baseTagsWithManifestIDAndOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTagsWithManifestID...)

// Add node type specific tags.
baseTagsWithNodeInfo := baseTags
aiRequestLatencyScoreTags := baseTags
if nodeType == Orchestrator {
baseTagsWithNodeInfo = baseTagsWithGatewayInfo
} else {
baseTagsWithNodeInfo = baseTagsWithOrchInfo
aiRequestLatencyScoreTags = baseTagsWithOrchInfo
}

views := []*view.View{
{
Name: "versions",
@@ -889,7 +901,7 @@ func InitCensus(nodeType NodeType, version string) {
Name: "ai_request_latency_score",
Measure: census.mAIRequestLatencyScore,
Description: "AI request latency score",
-TagKeys:     append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
+TagKeys:     append([]tag.Key{census.kPipeline, census.kModelName}, aiRequestLatencyScoreTags...),
Aggregation: view.LastValue(),
},
{
@@ -903,7 +915,7 @@ func InitCensus(nodeType NodeType, version string) {
Name: "ai_request_errors",
Measure: census.mAIRequestError,
Description: "Errors when processing AI requests",
-TagKeys:     append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
+TagKeys:     append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
Aggregation: view.Sum(),
},
}
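
For context on the tag plumbing above: on a gateway, both ai_request_latency_score and ai_request_errors keep the orchestrator's URI and address as tags, while on an orchestrator ai_request_errors is tagged with the gateway's sender address and ai_request_latency_score falls back to the base tags only. The elided remainder of InitCensus presumably registers these views and wires up an exporter; for readers unfamiliar with OpenCensus, here is a standalone sketch of that flow (not code from this repo; the "livepeer" namespace and the port are assumed for illustration). With such an exporter, the new series would surface under names like livepeer_ai_request_latency_score, assuming that namespace.

```go
package main

import (
	"log"
	"net/http"

	ocprom "contrib.go.opencensus.io/exporter/prometheus"
	"go.opencensus.io/stats/view"
)

// aiViews stands in for the []*view.View slice built in InitCensus above.
var aiViews []*view.View

func main() {
	// Register the views so recorded measurements are aggregated.
	if err := view.Register(aiViews...); err != nil {
		log.Fatalf("failed to register views: %v", err)
	}

	// Expose the aggregated data in Prometheus format.
	exporter, err := ocprom.NewExporter(ocprom.Options{Namespace: "livepeer"})
	if err != nil {
		log.Fatalf("failed to create Prometheus exporter: %v", err)
	}

	// The exporter implements http.Handler; Prometheus scrapes /metrics.
	http.Handle("/metrics", exporter)
	log.Fatal(http.ListenAndServe(":7935", nil))
}
```
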
@@ -1760,13 +1772,6 @@ func RewardCallError(sender string) {
}
}

-// AIRequestFinished records gateway AI job request metrics.
-func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
-census.recordModelRequested(pipeline, model)
-census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
-census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo)
-}

// recordModelRequested increments request count for a specific AI model and pipeline.
func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) {
cen.lock.Lock()
@@ -1778,6 +1783,13 @@ func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string
}
}

+// AIRequestFinished records gateway AI job request metrics.
+func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
+census.recordModelRequested(pipeline, model)
+census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
+census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit)
+}
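
For reference, a gateway-side call site for the relocated AIRequestFinished might look roughly like the sketch below; the wrapper name, its arguments, and the wiring are hypothetical placeholders, not code from this PR.

```go
package server

import (
	"context"

	"github.com/livepeer/go-livepeer/monitor"
	lpnet "github.com/livepeer/go-livepeer/net"
)

// reportAIRequest is a hypothetical gateway-side helper; the name and
// arguments are assumptions for illustration only.
func reportAIRequest(ctx context.Context, pipeline, modelID string, latencyScore, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
	if !monitor.Enabled {
		return
	}
	// Records the request count, latency score, and price per unit for the
	// selected orchestrator.
	monitor.AIRequestFinished(ctx, pipeline, modelID,
		monitor.AIJobInfo{LatencyScore: latencyScore, PricePerUnit: pricePerUnit},
		orchInfo)
}
```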

// recordAIRequestLatencyScore records the latency score for an AI job request.
func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) {
cen.lock.Lock()
@@ -1791,12 +1803,12 @@ func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Mo
}

// recordAIRequestPricePerUnit records the price per unit for an AI job request.
-func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
+func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
-[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
+[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)},
cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
@@ -1816,6 +1828,46 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.
}
}

// AIJobProcessed records orchestrator AI job processing metrics.
func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo) {
census.recordModelRequested(pipeline, model)
census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore)
census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit)
}

// recordAIJobLatencyScore records the latency score for a processed AI job.
func (cen *censusMetricsCounter) recordAIJobLatencyScore(Pipeline string, Model string, latencyScore float64) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)},
cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// recordAIJobPricePerUnit logs the cost per unit of a processed AI job.
func (cen *censusMetricsCounter) recordAIJobPricePerUnit(Pipeline string, Model string, pricePerUnit float64) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)},
cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// AIProcessingError logs errors in orchestrator AI job processing.
func AIProcessingError(code string, Pipeline string, Model string, sender string) {
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)},
census.mAIRequestError.M(1)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}
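
Taken together, the orchestrator-side surface added here is AIJobProcessed on success and AIProcessingError on failure; both are keyed by pipeline and model, and errors additionally carry the error code and the gateway's sender address. A hedged sketch of how an orchestrator handler might wire the two follows (the wrapper, the doJob callback, and the literal score/price values are invented for illustration; the real call sites are in server/ai_http.go below).

```go
package server

import (
	"context"

	"github.com/livepeer/go-livepeer/monitor"
)

// processAIJob is a hypothetical wrapper showing when each hook fires; doJob,
// sender, and the literal score/price values are placeholders.
func processAIJob(ctx context.Context, pipeline, modelID, sender string, doJob func() error) error {
	if err := doJob(); err != nil {
		if monitor.Enabled {
			// Failure path: counted in ai_request_errors, tagged with the
			// error code and the gateway's sender address.
			monitor.AIProcessingError(err.Error(), pipeline, modelID, sender)
		}
		return err
	}
	if monitor.Enabled {
		// Success path: records the latency score and price per unit,
		// tagged by pipeline and model.
		monitor.AIJobProcessed(ctx, pipeline, modelID,
			monitor.AIJobInfo{LatencyScore: 0.9, PricePerUnit: 0.003})
	}
	return nil
}
```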

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
30 changes: 30 additions & 0 deletions server/ai_http.go
@@ -14,6 +14,7 @@ import (
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/monitor"
middleware "github.com/oapi-codegen/nethttp-middleware"
"github.com/oapi-codegen/runtime"
)
@@ -309,6 +310,9 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
start := time.Now()
resp, err := submitFn(ctx)
if err != nil {
if monitor.Enabled {
monitor.AIProcessingError(err.Error(), pipeline, modelID, sender.Hex())
}
respondWithError(w, err.Error(), http.StatusInternalServerError)
return
}
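
The elapsed time `took` used in the next hunk is not visible in this excerpt; presumably it is derived from the `start` timestamp captured above, along these lines (an assumption, not a line from the diff):

```go
took := time.Since(start) // assumed: wall-clock time spent in submitFn, from the elided lines
```
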
@@ -321,6 +325,32 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
// If additional parameters that influence compute cost become configurable, then the formula should be reconsidered
orch.DebitFees(sender, manifestID, payment.GetExpectedPrice(), outPixels)

if monitor.Enabled {
var latencyScore float64
switch v := req.(type) {
case worker.TextToImageJSONRequestBody:
latencyScore = CalculateTextToImageLatencyScore(took, v, outPixels)
case worker.ImageToImageMultipartRequestBody:
latencyScore = CalculateImageToImageLatencyScore(took, v, outPixels)
case worker.ImageToVideoMultipartRequestBody:
latencyScore = CalculateImageToVideoLatencyScore(took, v, outPixels)
case worker.UpscaleMultipartRequestBody:
latencyScore = CalculateUpscaleLatencyScore(took, v, outPixels)
case worker.AudioToTextMultipartRequestBody:
durationSeconds, err := common.CalculateAudioDuration(v.Audio)
if err == nil {
latencyScore = CalculateAudioToTextLatencyScore(took, durationSeconds)
}
}

var pricePerAIUnit float64
if priceInfo := payment.GetExpectedPrice(); priceInfo != nil && priceInfo.GetPixelsPerUnit() != 0 {
pricePerAIUnit = float64(priceInfo.GetPricePerUnit()) / float64(priceInfo.GetPixelsPerUnit())
}

monitor.AIJobProcessed(ctx, pipeline, modelID, monitor.AIJobInfo{LatencyScore: latencyScore, PricePerUnit: pricePerAIUnit})
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(resp)
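
On the success path above, the handler derives a pipeline-specific latency score from the processing time and the request parameters, and a price per AI unit from the expected price (price per unit divided by pixels per unit; for example, 3000 wei per 1,000,000 pixels works out to 0.003 wei per pixel, figures chosen purely for illustration). The sketch below captures the general shape of those two computations; the function names and the normalization are illustrative stand-ins, not the actual Calculate*LatencyScore implementations.

```go
package server

import "time"

// exampleLatencyScore shows the general shape of the Calculate*LatencyScore
// helpers: processing time normalized by the amount of output produced, so
// scores stay comparable across request sizes. The real per-pipeline formulas
// live elsewhere in the server package; this is an illustrative stand-in.
func exampleLatencyScore(took time.Duration, outUnits float64) float64 {
	if outUnits <= 0 {
		return 0
	}
	return took.Seconds() / outUnits
}

// examplePricePerAIUnit mirrors the pricePerAIUnit computation above: price
// per unit divided by pixels per unit, guarding against a zero divisor.
func examplePricePerAIUnit(pricePerUnit, pixelsPerUnit int64) float64 {
	if pixelsPerUnit == 0 {
		return 0
	}
	return float64(pricePerUnit) / float64(pixelsPerUnit)
}
```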