diff --git a/pkg/manager/impl/metrics_manager.go b/pkg/manager/impl/metrics_manager.go index a6d010b1e..5c0a09684 100644 --- a/pkg/manager/impl/metrics_manager.go +++ b/pkg/manager/impl/metrics_manager.go @@ -20,6 +20,9 @@ import ( "github.com/golang/protobuf/ptypes/timestamp" "google.golang.org/protobuf/types/known/timestamppb" + + dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces" + "github.com/flyteorg/flytestdlib/storage" ) const ( @@ -59,6 +62,8 @@ type MetricsManager struct { nodeExecutionManager interfaces.NodeExecutionInterface taskExecutionManager interfaces.TaskExecutionInterface metrics metrics + urlData dataInterfaces.RemoteURLInterface + storageClient *storage.DataStore } // createOperationSpan returns a Span defined by the provided arguments. @@ -663,13 +668,39 @@ func (m *MetricsManager) GetExecutionMetrics(ctx context.Context, return &admin.WorkflowExecutionGetMetricsResponse{Span: span}, nil } +func (m *MetricsManager) GetTaskMetrics(ctx context.Context, + request admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsResponse, error) { + + nodeExecution, err := m.nodeExecutionManager.GetNodeExecution(ctx, admin.NodeExecutionGetRequest{ + Id: request.Id, + }, + ) + if err != nil { + return nil, err + } + + blob, err := m.urlData.Get(ctx, nodeExecution.Closure.SpanUri) + if err != nil { + return nil, err + } + + var flyteKitSpan core.Span + err = m.storageClient.ReadProtobuf(ctx, storage.DataReference(blob.Url), &flyteKitSpan) + if err != nil { + return nil, err + } + + return &admin.GetTaskMetricsResponse{Span: &flyteKitSpan}, nil +} + // NewMetricsManager returns a new MetricsManager constructed with the provided arguments. func NewMetricsManager( workflowManager interfaces.WorkflowInterface, executionManager interfaces.ExecutionInterface, nodeExecutionManager interfaces.NodeExecutionInterface, taskExecutionManager interfaces.TaskExecutionInterface, - scope promutils.Scope) interfaces.MetricsInterface { + scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface, storageClient *storage.DataStore, +) interfaces.MetricsInterface { metrics := metrics{ Scope: scope, } @@ -680,5 +711,7 @@ func NewMetricsManager( nodeExecutionManager: nodeExecutionManager, taskExecutionManager: taskExecutionManager, metrics: metrics, + urlData: urlData, + storageClient: storageClient, } } diff --git a/pkg/manager/interfaces/metrics.go b/pkg/manager/interfaces/metrics.go index d726cdc99..c07d8adb9 100644 --- a/pkg/manager/interfaces/metrics.go +++ b/pkg/manager/interfaces/metrics.go @@ -12,4 +12,7 @@ import ( type MetricsInterface interface { GetExecutionMetrics(ctx context.Context, request admin.WorkflowExecutionGetMetricsRequest) ( *admin.WorkflowExecutionGetMetricsResponse, error) + + GetTaskMetrics(ctx context.Context, request admin.GetTaskMetricsRequest) ( + *admin.GetTaskMetricsResponse, error) } diff --git a/pkg/manager/mocks/metrics_interface.go b/pkg/manager/mocks/metrics_interface.go index 2e292593e..e9c95a62b 100644 --- a/pkg/manager/mocks/metrics_interface.go +++ b/pkg/manager/mocks/metrics_interface.go @@ -55,3 +55,44 @@ func (_m *MetricsInterface) GetExecutionMetrics(ctx context.Context, request adm return r0, r1 } + +type MetricsInterface_GetTaskMetrics struct { + *mock.Call +} + +func (_m MetricsInterface_GetTaskMetrics) Return(_a0 *admin.GetTaskMetricsRequest, _a1 error) *MetricsInterface_GetTaskMetrics { + return &MetricsInterface_GetTaskMetrics{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *MetricsInterface) OnGetTaskMetrics(ctx context.Context, request admin.GetTaskMetricsRequest) *MetricsInterface_GetTaskMetrics { + c_call := _m.On("GetTaskMetrics", ctx, request) + return &MetricsInterface_GetTaskMetrics{Call: c_call} +} + +func (_m *MetricsInterface) OnGetTaskMetricsMatch(matchers ...interface{}) *MetricsInterface_GetTaskMetrics { + c_call := _m.On("GetTaskMetrics", matchers...) + return &MetricsInterface_GetTaskMetrics{Call: c_call} +} + +// GetTaskMetrics provides a mock function with given fields: ctx, request +func (_m *MetricsInterface) GetTaskMetrics(ctx context.Context, request admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsRequest, error) { + ret := _m.Called(ctx, request) + + var r0 *admin.GetTaskMetricsRequest + if rf, ok := ret.Get(0).(func(context.Context, admin.GetTaskMetricsRequest) *admin.GetTaskMetricsRequest); ok { + r0 = rf(ctx, request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*admin.GetTaskMetricsRequest) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, admin.GetTaskMetricsRequest) error); ok { + r1 = rf(ctx, request) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index f1d90361f..4da95907d 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -98,6 +98,7 @@ func addTerminalState( nodeExecutionModel.ErrorCode = &request.Event.GetError().Code } closure.DeckUri = request.Event.DeckUri + closure.SpanUri = request.Event.SpanUri return nil } diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 77c78f480..abca0b8e5 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -178,7 +178,7 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi ProjectManager: manager.NewProjectManager(repo, configuration), ResourceManager: resources.NewResourceManager(repo, configuration.ApplicationConfiguration()), MetricsManager: manager.NewMetricsManager(workflowManager, executionManager, nodeExecutionManager, - taskExecutionManager, adminScope.NewSubScope("metrics_manager")), + taskExecutionManager, adminScope.NewSubScope("metrics_manager"), urlData, dataStorageClient), Metrics: InitMetrics(adminScope), } } diff --git a/pkg/rpc/adminservice/execution.go b/pkg/rpc/adminservice/execution.go index 87e2fe487..2db3d1766 100644 --- a/pkg/rpc/adminservice/execution.go +++ b/pkg/rpc/adminservice/execution.go @@ -159,6 +159,24 @@ func (m *AdminService) GetExecutionMetrics( return response, nil } +func (m *AdminService) GetTaskMetrics( + ctx context.Context, request *admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsResponse, error) { + defer m.interceptPanic(ctx, request) + if request == nil { + return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") + } + var response *admin.GetTaskMetricsResponse + var err error + m.Metrics.executionEndpointMetrics.GetTaskMetrics.Time(func() { + response, err = m.MetricsManager.GetTaskMetrics(ctx, *request) + }) + if err != nil { + return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.GetTaskMetrics) + } + m.Metrics.executionEndpointMetrics.GetTaskMetrics.Success() + return response, nil +} + func (m *AdminService) ListExecutions( ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error) { defer m.interceptPanic(ctx, request) diff --git a/pkg/rpc/adminservice/metrics.go b/pkg/rpc/adminservice/metrics.go index b2bab4514..68d9296cc 100644 --- a/pkg/rpc/adminservice/metrics.go +++ b/pkg/rpc/adminservice/metrics.go @@ -10,16 +10,17 @@ import ( type executionEndpointMetrics struct { scope promutils.Scope - create util.RequestMetrics - relaunch util.RequestMetrics - recover util.RequestMetrics - createEvent util.RequestMetrics - get util.RequestMetrics - update util.RequestMetrics - getData util.RequestMetrics - getMetrics util.RequestMetrics - list util.RequestMetrics - terminate util.RequestMetrics + create util.RequestMetrics + relaunch util.RequestMetrics + recover util.RequestMetrics + createEvent util.RequestMetrics + get util.RequestMetrics + update util.RequestMetrics + getData util.RequestMetrics + getMetrics util.RequestMetrics + GetTaskMetrics util.RequestMetrics + list util.RequestMetrics + terminate util.RequestMetrics } type launchPlanEndpointMetrics struct { @@ -131,17 +132,18 @@ func InitMetrics(adminScope promutils.Scope) AdminMetrics { "panics encountered while handling requests to the admin service"), executionEndpointMetrics: executionEndpointMetrics{ - scope: adminScope, - create: util.NewRequestMetrics(adminScope, "create_execution"), - relaunch: util.NewRequestMetrics(adminScope, "relaunch_execution"), - recover: util.NewRequestMetrics(adminScope, "recover_execution"), - createEvent: util.NewRequestMetrics(adminScope, "create_execution_event"), - get: util.NewRequestMetrics(adminScope, "get_execution"), - update: util.NewRequestMetrics(adminScope, "update_execution"), - getData: util.NewRequestMetrics(adminScope, "get_execution_data"), - getMetrics: util.NewRequestMetrics(adminScope, "get_execution_metrics"), - list: util.NewRequestMetrics(adminScope, "list_execution"), - terminate: util.NewRequestMetrics(adminScope, "terminate_execution"), + scope: adminScope, + create: util.NewRequestMetrics(adminScope, "create_execution"), + relaunch: util.NewRequestMetrics(adminScope, "relaunch_execution"), + recover: util.NewRequestMetrics(adminScope, "recover_execution"), + createEvent: util.NewRequestMetrics(adminScope, "create_execution_event"), + get: util.NewRequestMetrics(adminScope, "get_execution"), + update: util.NewRequestMetrics(adminScope, "update_execution"), + getData: util.NewRequestMetrics(adminScope, "get_execution_data"), + getMetrics: util.NewRequestMetrics(adminScope, "get_execution_metrics"), + GetTaskMetrics: util.NewRequestMetrics(adminScope, "get_flytekit_metrics"), + list: util.NewRequestMetrics(adminScope, "list_execution"), + terminate: util.NewRequestMetrics(adminScope, "terminate_execution"), }, launchPlanEndpointMetrics: launchPlanEndpointMetrics{ scope: adminScope,