diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index e50d049f59c29..383624b569bb6 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -76,7 +76,6 @@ import type { IPollFunctions, IRequestOptions, IRunExecutionData, - ISourceData, ITaskData, ITaskDataConnections, ITriggerFunctions, @@ -166,7 +165,13 @@ import { extractValue } from './ExtractValue'; import { InstanceSettings } from './InstanceSettings'; import type { ExtendedValidationResult, IResponseError } from './Interfaces'; // eslint-disable-next-line import/no-cycle -import { HookContext, PollContext, TriggerContext, WebhookContext } from './node-execution-context'; +import { + ExecuteSingleContext, + HookContext, + PollContext, + TriggerContext, + WebhookContext, +} from './node-execution-context'; import { getSecretsProxy } from './Secrets'; import { SSHClientsManager } from './SSHClientsManager'; @@ -4180,145 +4185,19 @@ export function getExecuteSingleFunctions( mode: WorkflowExecuteMode, abortSignal?: AbortSignal, ): IExecuteSingleFunctions { - return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => { - return { - ...getCommonWorkflowFunctions(workflow, node, additionalData), - ...executionCancellationFunctions(abortSignal), - continueOnFail: () => continueOnFail(node), - evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => { - evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex; - return workflow.expression.resolveSimpleParameterValue( - `=${expression}`, - {}, - runExecutionData, - runIndex, - evaluateItemIndex, - node.name, - connectionInputData, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ); - }, - getContext(type: ContextType): IContextObject { - return NodeHelpers.getContext(runExecutionData, type, node); - }, - getCredentials: async (type) => - await getCredentials( - workflow, - node, - type, - additionalData, - mode, - executeData, - runExecutionData, - runIndex, - connectionInputData, - itemIndex, - ), - getInputData: (inputIndex = 0, inputName = 'main') => { - if (!inputData.hasOwnProperty(inputName)) { - // Return empty array because else it would throw error when nothing is connected to input - return { json: {} }; - } - - // TODO: Check if nodeType has input with that index defined - if (inputData[inputName].length < inputIndex) { - throw new ApplicationError('Could not get input index', { - extra: { inputIndex, inputName }, - }); - } - - const allItems = inputData[inputName][inputIndex]; - - if (allItems === null) { - throw new ApplicationError('Input index was not set', { - extra: { inputIndex, inputName }, - }); - } - - if (allItems[itemIndex] === null) { - throw new ApplicationError('Value of input with given index was not set', { - extra: { inputIndex, inputName, itemIndex }, - }); - } - - return allItems[itemIndex]; - }, - getInputSourceData: (inputIndex = 0, inputName = 'main') => { - if (executeData?.source === null) { - // Should never happen as n8n sets it automatically - throw new ApplicationError('Source data is missing'); - } - return executeData.source[inputName][inputIndex] as ISourceData; - }, - getItemIndex: () => itemIndex, - getMode: () => mode, - getExecuteData: () => executeData, - getNodeParameter: ( - parameterName: string, - fallbackValue?: any, - options?: IGetNodeParameterOptions, - ): NodeParameterValueType | object => { - return getNodeParameter( - workflow, - runExecutionData, - runIndex, - connectionInputData, - node, - parameterName, - itemIndex, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - fallbackValue, - options, - ); - }, - getWorkflowDataProxy: (): IWorkflowDataProxyData => { - const dataProxy = new WorkflowDataProxy( - workflow, - runExecutionData, - runIndex, - itemIndex, - node.name, - connectionInputData, - {}, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ); - return dataProxy.getDataProxy(); - }, - helpers: { - createDeferredPromise, - returnJsonArray, - ...getRequestHelperFunctions( - workflow, - node, - additionalData, - runExecutionData, - connectionInputData, - ), - ...getBinaryHelperFunctions(additionalData, workflow.id), - - assertBinaryData: (propertyName, inputIndex = 0) => - assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex), - getBinaryDataBuffer: async (propertyName, inputIndex = 0) => - await getBinaryDataBuffer(inputData, itemIndex, propertyName, inputIndex), - }, - logAiEvent: (eventName: AiEvent, msg: string) => { - return additionalData.logAiEvent(eventName, { - executionId: additionalData.executionId ?? 'unsaved-execution', - nodeName: node.name, - workflowName: workflow.name ?? 'Unnamed workflow', - nodeType: node.type, - workflowId: workflow.id ?? 'unsaved-workflow', - msg, - }); - }, - }; - })(workflow, runExecutionData, connectionInputData, inputData, node, itemIndex); + return new ExecuteSingleContext( + workflow, + node, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + itemIndex, + executeData, + abortSignal, + ); } export function getCredentialTestFunctions(): ICredentialTestFunctions { diff --git a/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts b/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts new file mode 100644 index 0000000000000..dcd8509c1d644 --- /dev/null +++ b/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts @@ -0,0 +1,301 @@ +import { mock } from 'jest-mock-extended'; +import type { + INode, + IWorkflowExecuteAdditionalData, + IRunExecutionData, + INodeExecutionData, + ITaskDataConnections, + IExecuteData, + Workflow, + WorkflowExecuteMode, + ICredentialsHelper, + Expression, + INodeType, + INodeTypes, + OnError, + ContextType, + IContextObject, + ICredentialDataDecryptedObject, + ISourceData, +} from 'n8n-workflow'; +import { ApplicationError, NodeHelpers } from 'n8n-workflow'; + +import { ExecuteSingleContext } from '../execute-single-context'; + +describe('ExecuteSingleContext', () => { + const testCredentialType = 'testCredential'; + const nodeType = mock({ + description: { + credentials: [ + { + name: testCredentialType, + required: true, + }, + ], + properties: [ + { + name: 'testParameter', + required: true, + }, + ], + }, + }); + const nodeTypes = mock(); + const expression = mock(); + const workflow = mock({ expression, nodeTypes }); + const node = mock({ + credentials: { + [testCredentialType]: { + id: 'testCredentialId', + }, + }, + }); + node.parameters = { + testParameter: 'testValue', + }; + const credentialsHelper = mock(); + const additionalData = mock({ credentialsHelper }); + const mode: WorkflowExecuteMode = 'manual'; + const runExecutionData = mock(); + const connectionInputData = mock(); + const inputData: ITaskDataConnections = { main: [[{ json: { test: 'data' } }]] }; + const executeData = mock(); + const runIndex = 0; + const itemIndex = 0; + const abortSignal = mock(); + + const executeSingleContext = new ExecuteSingleContext( + workflow, + node, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + itemIndex, + executeData, + abortSignal, + ); + + beforeEach(() => { + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + expression.getParameterValue.mockImplementation((value) => value); + }); + + describe('getExecutionCancelSignal', () => { + it('should return the abort signal', () => { + expect(executeSingleContext.getExecutionCancelSignal()).toBe(abortSignal); + }); + }); + + describe('continueOnFail', () => { + afterEach(() => { + node.onError = undefined; + node.continueOnFail = false; + }); + + it('should return false for nodes by default', () => { + expect(executeSingleContext.continueOnFail()).toEqual(false); + }); + + it('should return true if node has continueOnFail set to true', () => { + node.continueOnFail = true; + expect(executeSingleContext.continueOnFail()).toEqual(true); + }); + + test.each([ + ['continueRegularOutput', true], + ['continueErrorOutput', true], + ['stopWorkflow', false], + ])('if node has onError set to %s, it should return %s', (onError, expected) => { + node.onError = onError as OnError; + expect(executeSingleContext.continueOnFail()).toEqual(expected); + }); + }); + + describe('evaluateExpression', () => { + it('should evaluate the expression correctly', () => { + const expression = '$json.test'; + const expectedResult = 'data'; + const resolveSimpleParameterValueSpy = jest.spyOn( + workflow.expression, + 'resolveSimpleParameterValue', + ); + resolveSimpleParameterValueSpy.mockReturnValue(expectedResult); + + expect(executeSingleContext.evaluateExpression(expression, itemIndex)).toEqual( + expectedResult, + ); + + expect(resolveSimpleParameterValueSpy).toHaveBeenCalledWith( + `=${expression}`, + {}, + runExecutionData, + runIndex, + itemIndex, + node.name, + connectionInputData, + mode, + expect.objectContaining({}), + executeData, + ); + + resolveSimpleParameterValueSpy.mockRestore(); + }); + }); + + describe('getContext', () => { + it('should return the context object', () => { + const contextType: ContextType = 'node'; + const expectedContext = mock(); + const getContextSpy = jest.spyOn(NodeHelpers, 'getContext'); + getContextSpy.mockReturnValue(expectedContext); + + expect(executeSingleContext.getContext(contextType)).toEqual(expectedContext); + + expect(getContextSpy).toHaveBeenCalledWith(runExecutionData, contextType, node); + + getContextSpy.mockRestore(); + }); + }); + + describe('getInputData', () => { + const inputIndex = 0; + const inputName = 'main'; + + afterEach(() => { + inputData[inputName] = [[{ json: { test: 'data' } }]]; + }); + + it('should return the input data correctly', () => { + const expectedData = { json: { test: 'data' } }; + + expect(executeSingleContext.getInputData(inputIndex, inputName)).toEqual(expectedData); + }); + + it('should return an empty object if the input name does not exist', () => { + const inputName = 'nonExistent'; + const expectedData = { json: {} }; + + expect(executeSingleContext.getInputData(inputIndex, inputName)).toEqual(expectedData); + }); + + it('should throw an error if the input index is out of range', () => { + const inputIndex = 1; + + expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow( + ApplicationError, + ); + }); + + it('should throw an error if the input index was not set', () => { + inputData.main[inputIndex] = null; + + expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow( + ApplicationError, + ); + }); + + it('should throw an error if the value of input with given index was not set', () => { + delete inputData.main[inputIndex]![itemIndex]; + + expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow( + ApplicationError, + ); + }); + }); + + describe('getItemIndex', () => { + it('should return the item index correctly', () => { + expect(executeSingleContext.getItemIndex()).toEqual(itemIndex); + }); + }); + + describe('getNodeParameter', () => { + beforeEach(() => { + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + expression.getParameterValue.mockImplementation((value) => value); + }); + + it('should return parameter value when it exists', () => { + const parameter = executeSingleContext.getNodeParameter('testParameter'); + + expect(parameter).toBe('testValue'); + }); + + it('should return the fallback value when the parameter does not exist', () => { + const parameter = executeSingleContext.getNodeParameter('otherParameter', 'fallback'); + + expect(parameter).toBe('fallback'); + }); + }); + + describe('getCredentials', () => { + it('should get decrypted credentials', async () => { + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + credentialsHelper.getDecrypted.mockResolvedValue({ secret: 'token' }); + + const credentials = + await executeSingleContext.getCredentials( + testCredentialType, + ); + + expect(credentials).toEqual({ secret: 'token' }); + }); + }); + + describe('getExecuteData', () => { + it('should return the execute data correctly', () => { + expect(executeSingleContext.getExecuteData()).toEqual(executeData); + }); + }); + + describe('getWorkflowDataProxy', () => { + it('should return the workflow data proxy correctly', () => { + const workflowDataProxy = executeSingleContext.getWorkflowDataProxy(); + expect(workflowDataProxy.isProxy).toBe(true); + expect(Object.keys(workflowDataProxy.$input)).toEqual([ + 'all', + 'context', + 'first', + 'item', + 'last', + 'params', + ]); + }); + }); + + describe('getInputSourceData', () => { + it('should return the input source data correctly', () => { + const inputSourceData = mock(); + executeData.source = { main: [inputSourceData] }; + + expect(executeSingleContext.getInputSourceData()).toEqual(inputSourceData); + }); + + it('should throw an error if the source data is missing', () => { + executeData.source = null; + + expect(() => executeSingleContext.getInputSourceData()).toThrow(ApplicationError); + }); + }); + + describe('logAiEvent', () => { + it('should log the AI event correctly', () => { + const eventName = 'ai-tool-called'; + const msg = 'test message'; + + executeSingleContext.logAiEvent(eventName, msg); + + expect(additionalData.logAiEvent).toHaveBeenCalledWith(eventName, { + executionId: additionalData.executionId, + nodeName: node.name, + workflowName: workflow.name, + nodeType: node.type, + workflowId: workflow.id, + msg, + }); + }); + }); +}); diff --git a/packages/core/src/node-execution-context/execute-single-context.ts b/packages/core/src/node-execution-context/execute-single-context.ts new file mode 100644 index 0000000000000..6d8ef2a083afd --- /dev/null +++ b/packages/core/src/node-execution-context/execute-single-context.ts @@ -0,0 +1,212 @@ +import type { + ICredentialDataDecryptedObject, + IGetNodeParameterOptions, + INode, + INodeExecutionData, + IRunExecutionData, + IExecuteSingleFunctions, + IWorkflowExecuteAdditionalData, + Workflow, + WorkflowExecuteMode, + ITaskDataConnections, + IExecuteData, + ContextType, + AiEvent, + ISourceData, +} from 'n8n-workflow'; +import { + ApplicationError, + createDeferredPromise, + NodeHelpers, + WorkflowDataProxy, +} from 'n8n-workflow'; + +// eslint-disable-next-line import/no-cycle +import { + assertBinaryData, + continueOnFail, + getAdditionalKeys, + getBinaryDataBuffer, + getCredentials, + getNodeParameter, + returnJsonArray, +} from '@/NodeExecuteFunctions'; + +import { BinaryHelpers } from './helpers/binary-helpers'; +import { RequestHelpers } from './helpers/request-helpers'; +import { NodeExecutionContext } from './node-execution-context'; + +export class ExecuteSingleContext extends NodeExecutionContext implements IExecuteSingleFunctions { + readonly helpers: IExecuteSingleFunctions['helpers']; + + constructor( + workflow: Workflow, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + private readonly runExecutionData: IRunExecutionData, + private readonly runIndex: number, + private readonly connectionInputData: INodeExecutionData[], + private readonly inputData: ITaskDataConnections, + private readonly itemIndex: number, + private readonly executeData: IExecuteData, + private readonly abortSignal?: AbortSignal, + ) { + super(workflow, node, additionalData, mode); + + this.helpers = { + createDeferredPromise, + returnJsonArray, + ...new BinaryHelpers(workflow, additionalData).exported, + ...new RequestHelpers(this, workflow, node, additionalData).exported, + + assertBinaryData: (propertyName, inputIndex = 0) => + assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex), + getBinaryDataBuffer: async (propertyName, inputIndex = 0) => + await getBinaryDataBuffer(inputData, itemIndex, propertyName, inputIndex), + }; + } + + getExecutionCancelSignal() { + return this.abortSignal; + } + + onExecutionCancellation(handler: () => unknown) { + const fn = () => { + this.abortSignal?.removeEventListener('abort', fn); + handler(); + }; + this.abortSignal?.addEventListener('abort', fn); + } + + continueOnFail() { + return continueOnFail(this.node); + } + + evaluateExpression(expression: string, evaluateItemIndex: number | undefined) { + evaluateItemIndex = evaluateItemIndex ?? this.itemIndex; + return this.workflow.expression.resolveSimpleParameterValue( + `=${expression}`, + {}, + this.runExecutionData, + this.runIndex, + evaluateItemIndex, + this.node.name, + this.connectionInputData, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ); + } + + getContext(type: ContextType) { + return NodeHelpers.getContext(this.runExecutionData, type, this.node); + } + + getInputData(inputIndex = 0, inputName = 'main') { + if (!this.inputData.hasOwnProperty(inputName)) { + // Return empty array because else it would throw error when nothing is connected to input + return { json: {} }; + } + + // TODO: Check if nodeType has input with that index defined + if (this.inputData[inputName].length < inputIndex) { + throw new ApplicationError('Could not get input index', { + extra: { inputIndex, inputName }, + }); + } + + const allItems = this.inputData[inputName][inputIndex]; + + if (allItems === null || allItems === undefined) { + throw new ApplicationError('Input index was not set', { + extra: { inputIndex, inputName }, + }); + } + + const data = allItems[this.itemIndex]; + if (data === null || data === undefined) { + throw new ApplicationError('Value of input with given index was not set', { + extra: { inputIndex, inputName, itemIndex: this.itemIndex }, + }); + } + + return data; + } + + getItemIndex() { + return this.itemIndex; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + getNodeParameter(parameterName: string, fallbackValue?: any, options?: IGetNodeParameterOptions) { + return getNodeParameter( + this.workflow, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.node, + parameterName, + this.itemIndex, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + fallbackValue, + options, + ); + } + + // TODO: extract out in a BaseExecutionContext + async getCredentials(type: string) { + return await getCredentials( + this.workflow, + this.node, + type, + this.additionalData, + this.mode, + this.executeData, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.itemIndex, + ); + } + + getExecuteData() { + return this.executeData; + } + + getWorkflowDataProxy() { + return new WorkflowDataProxy( + this.workflow, + this.runExecutionData, + this.runIndex, + this.itemIndex, + this.node.name, + this.connectionInputData, + {}, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ).getDataProxy(); + } + + getInputSourceData(inputIndex = 0, inputName = 'main'): ISourceData { + if (this.executeData?.source === null) { + // Should never happen as n8n sets it automatically + throw new ApplicationError('Source data is missing'); + } + return this.executeData.source[inputName][inputIndex] as ISourceData; + } + + logAiEvent(eventName: AiEvent, msg: string) { + return this.additionalData.logAiEvent(eventName, { + executionId: this.additionalData.executionId ?? 'unsaved-execution', + nodeName: this.node.name, + workflowName: this.workflow.name ?? 'Unnamed workflow', + nodeType: this.node.type, + workflowId: this.workflow.id ?? 'unsaved-workflow', + msg, + }); + } +} diff --git a/packages/core/src/node-execution-context/index.ts b/packages/core/src/node-execution-context/index.ts index a6397c60ced9e..d5c663b2abe28 100644 --- a/packages/core/src/node-execution-context/index.ts +++ b/packages/core/src/node-execution-context/index.ts @@ -1,4 +1,5 @@ // eslint-disable-next-line import/no-cycle +export { ExecuteSingleContext } from './execute-single-context'; export { HookContext } from './hook-context'; export { LoadOptionsContext } from './load-options-context'; export { PollContext } from './poll-context';