
Workflow Processor - DAG Execution Ordering #2599

Open
jem-davies opened this issue May 18, 2024 · 13 comments
Labels: enhancement, processors

Comments

@jem-davies

jem-davies commented May 18, 2024

Issue Description

The Benthos-provided workflow processor executes a DAG of nodes, "performing them in parallel where possible".

However, the current implementation uses this dependency solver, which resolves the DAG into a series of steps: the steps are performed sequentially, and the nodes within each step are performed in parallel.

This means a node can be left waiting for all the nodes in the previous step to complete, even though all of its own dependencies are already satisfied.

Consider the following DAG, from the workflow processor docs:

      /--> B -------------|--> D
     /                   /
A --|          /--> E --|
     \--> C --|          \
               \----------|--> F

The dependency solver would resolve the DAG into: [ [ A ], [ B, C ], [ E ], [ D, F ] ].
When we consider node E, we can see that its full dependency chain is A -> C -> E. However, the step before [ E ] also contains node B, so in the current Benthos workflow implementation E will not execute until B has completed, even though E has no dependency on B.
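
One way to avoid this stall is to schedule per node rather than per step: each node keeps a count of unmet dependencies and is started the moment that count reaches zero, so E is released as soon as A and C have finished, regardless of B. A rough sketch of the idea in Go (not the actual Benthos/redpanda-connect implementation; running a node is stubbed with a print):

package main

import (
	"fmt"
	"sync"
)

// deps maps each node to the nodes it depends on (the DAG above).
var deps = map[string][]string{
	"A": {},
	"B": {"A"},
	"C": {"A"},
	"D": {"B", "E"},
	"E": {"C"},
	"F": {"C", "E"},
}

func main() {
	// dependents is the reverse adjacency list; remaining counts each node's unmet dependencies.
	dependents := map[string][]string{}
	remaining := map[string]int{}
	for node, ds := range deps {
		remaining[node] = len(ds)
		for _, d := range ds {
			dependents[d] = append(dependents[d], node)
		}
	}

	var mu sync.Mutex
	var wg sync.WaitGroup
	var run func(node string)
	run = func(node string) {
		defer wg.Done()
		fmt.Println("running", node) // stand-in for executing this branch's processors

		// Release any dependent whose dependencies are now all complete.
		mu.Lock()
		var ready []string
		for _, dep := range dependents[node] {
			remaining[dep]--
			if remaining[dep] == 0 {
				ready = append(ready, dep)
			}
		}
		mu.Unlock()
		for _, next := range ready {
			wg.Add(1)
			go run(next)
		}
	}

	// Kick off every node that has no dependencies; the rest start as soon as theirs complete.
	var roots []string
	for node, n := range remaining {
		if n == 0 {
			roots = append(roots, node)
		}
	}
	for _, node := range roots {
		wg.Add(1)
		go run(node)
	}
	wg.Wait()
}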

@jem-davies
Author

jem-davies commented May 18, 2024

Added Draft MR #2600 to resolve / gather feedback.

@mihaitodor added the enhancement and processors labels on May 23, 2024
@jem-davies
Author

jem-davies commented May 25, 2024

Closed Draft MR #2600 as the commit messages didn't contain a signed-off-by line.

Opened new Draft MR #2607 for feedback.

@jem-davies
Author

The MR is a draft because I am making breaking changes to the way the workflow processor is configured, so rather than:

pipeline:
  processors:
    - workflow:
        order: [ [ A ], [ B] ]

        branches:
          A:
            processors:
              #...

          B:
            processors:
              #...
the configuration for the new workflow_v2 processor would be:

pipeline:
  processors:
    - workflow_v2:

        branches:
          A:
            processors:
              #...

          B:
            dependency_list: ["A"]
            processors:
              #...

@AndreasBergmeier6176
Contributor

Can we then please also get rid of request_map and result_map?

@jem-davies
Author

@AndreasBergmeier6176 - won't we still want to be able to use request_map & result_map to work on a subset of the message in that branch?

@AndreasBergmeier6176
Contributor

> @AndreasBergmeier6176 - won't we still want to be able to use request_map & result_map to work on a subset of the message in that branch?

So far I do not really understand why you would want request_map in the first place. Why not simply use mapping?

@jem-davies
Author

The original workflow processor "executes a topology of branch processors".

In a branch, request_map and result_map enable you to work on a new message built from those mappings - it's kind of like a separate context from the main message being passed along the stream.

A mapping processor, by contrast, replaces the whole message with the result of the mapping.
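
For illustration, a minimal branch sketch showing the two mappings (the field names document.text and text_upper are made up for the example):

pipeline:
  processors:
    - branch:
        request_map: 'root = this.document.text'      # copy just this field into the branch's own context
        processors:
          - mapping: 'root = this.uppercase()'        # operates on the branch's message, not the original
        result_map: 'root.document.text_upper = this' # map the branch result back onto the original message

The rest of the original message carries on through the stream untouched, apart from whatever result_map writes back.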

I don't wish to alter this as part of this PR.

@AndreasBergmeier6176
Contributor

> In a branch, request_map and result_map enable you to work on a new message built from those mappings - it's kind of like a separate context from the main message being passed along the stream.
> A mapping processor, by contrast, replaces the whole message with the result of the mapping.

Maybe it is me not being a native speaker, but I did not get that difference from the docs, nor from the attribute names.

Would it make sense to rename e.g. request_map to local_message_map?

@jem-davies
Author

jem-davies commented Jun 4, 2024

I think you might find a number of names that perhaps don't make sense - one in particular is the unarchive processor, which I feel is poorly named.

You could submit a PR to do a renaming.

EDIT: Thanks for checking out my PR 😄

@AndreasBergmeier6176
Contributor

Within this new design you are now explicitly "drawing" the graph edges via dependency_list.
If you are doing that anyway, I think it would be more transparent to state what data is passed and where:

pipeline:
  processors:
    - workflow_v2:

        branches:
          A:
            processors:
              #...
            push_mapping: |
              branch("B") = this.foo

          B:
            dependency_list: ["A"]
            processors:
              #...
            push_branch: ["C"] # Passes message as is to C
          C:
            processors:
              #...

@jem-davies
Author

jem-davies commented Jun 4, 2024

@AndreasBergmeier6176

I think you might be confused about how the original workflow processor works - and how this one works as well - as I haven't changed anything about what data is passed where.

In both the original workflow processor and this v2, every node gets the whole message passed to it - the message is not passed along the edges of the DAG in the way you are perhaps thinking.

What you can do with the original workflow processor is use the request_map field to let redpanda-connect infer the DAG - but doing this still resolves it into a 2D array, and for certain DAGs you will still find that a node isn't executed as soon as it could be.

When a node is ready to run, it is given the entire message as it stands at that time - you can use request_map and result_map to create a separate "context": fields from the incoming message are mapped into that context, and afterwards fields are explicitly mapped back onto the message in the data stream.

Fixing the DAG execution therefore requires a different data structure than the one used by the original workflow processor. The proposed structure is a mapping of nodes -> dependency_list, which could sit separately from the branches part of the config - however, to avoid duplication I have added the new field to the definition of a branch.

@jem-davies
Author

I am also thinking it would be better - though I am happy to be told otherwise - to get rid of inferring the DAG from request_maps in the new workflow_v2 processor.

I think it makes more sense to have the user explicitly state each node's dependencies in the config.

@jem-davies
Author

I have started to think that making this alteration to the way the config is specified is the better approach, and that the PR should be resolved with a new workflow_v2 processor.
