Skip to content

Commit

Permalink
feat(ai): add A2T gateway metrics (livepeer#3100)
Browse files Browse the repository at this point in the history
This commit adds the gateway metrics to the Audio-to-text pipeline.
  • Loading branch information
rickstaa authored and eliteprox committed Jul 26, 2024
1 parent 97c38dc commit f8a1cd0
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession,

setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
defer completeBalanceUpdate(sess.BroadcastSession, balUpdate)
Expand Down Expand Up @@ -534,22 +537,34 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess
var buf bytes.Buffer
mw, err := worker.NewAudioToTextMultipartWriter(&buf, req)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "audio-to-text", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}

client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "audio-to-text", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}

durationSeconds, err := common.CalculateAudioDuration(req.Audio)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "audio-to-text", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}

clog.V(common.VERBOSE).Infof(ctx, "Submitting audio-to-text media with duration: %d seconds", durationSeconds)
setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, durationSeconds*1000)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "audio-to-text", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
defer completeBalanceUpdate(sess.BroadcastSession, balUpdate)
Expand All @@ -558,12 +573,18 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess
resp, err := client.AudioToTextWithBody(ctx, mw.FormDataContentType(), &buf, setHeaders)
took := time.Since(start)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "audio-to-text", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
defer resp.Body.Close()

data, err := io.ReadAll(resp.Body)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "audio-to-text", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}

Expand All @@ -578,12 +599,24 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess

var res worker.TextResponse
if err := json.Unmarshal(data, &res); err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "audio-to-text", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}

// TODO: Refine this rough estimate in future iterations
sess.LatencyScore = took.Seconds() / float64(durationSeconds)

if monitor.Enabled {
var pricePerAIUnit float64
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 {
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}

monitor.AIRequestFinished(ctx, "audio-to-text", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
}

return &res, nil
}

Expand Down

0 comments on commit f8a1cd0

Please sign in to comment.