
feat(mango): rolling execution statistics (exploration) #4735

Closed
wants to merge 1 commit

Conversation


@pgj pgj commented Aug 22, 2023

Overview

In case of map-reduce views, the arrival of the complete message is not guaranteed for the view callback (at the shard) when a stop is issued during the aggregation (at the coordinator). Because of that, internally collected shard-level statistics may not be fed back to the coordinator, which can cause data loss and hence inaccuracy in the overall execution statistics.

Address this issue by switching to a "rolling" model where row-level statistics are immediately streamed back to the coordinator. Support mixed-version cluster upgrades by activating this model only if it is requested through the map-reduce arguments and the given shard supports it.
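
As a rough illustration of the rolling model (this is not the actual patch; the callback shape, the execution_stats message format, and the use of rexi:stream2/1 below are assumptions made for the sketch), the shard-side view callback could stream a per-row statistics delta instead of holding everything back until complete:

-module(rolling_stats_sketch).
-export([view_cb/2]).

%% Illustrative shard-side view callback. With rolling statistics requested
%% through the map-reduce arguments, every row immediately streams a small
%% stats delta to the coordinator; otherwise the old accumulate-and-report-
%% on-complete behaviour is kept, which preserves mixed-version compatibility.
view_cb({row, _Row}, {Rolling, Stats}) when Rolling =:= true ->
    rexi:stream2({execution_stats, #{docs_examined => 1}}),
    {ok, {Rolling, Stats}};
view_cb({row, _Row}, {Rolling, Stats}) ->
    Stats1 = maps:update_with(docs_examined, fun(N) -> N + 1 end, 1, Stats),
    {ok, {Rolling, Stats1}};
view_cb(complete, {Rolling, Stats}) ->
    %% under the old model this is the only point where statistics leave the
    %% shard, and this message may never be processed after a coordinator stop
    rexi:stream2({execution_stats, Stats}),
    {ok, {Rolling, Stats}}.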

This is only a proposal and a way to explore the approach; comments and feedback are welcome. Remarks:

  • It clearly uses more bandwidth as it will send more execution_stats messages. Is this acceptable?
  • It does not require changes in fabric itself; the whole logic is kept on the Mango side and it is compatible with the previous solution.
  • Are there other ways to stop the aggregation as soon as the limit is reached?
  • Are there other kinds of messages coming from the shards that could be reliably used to signal the end of statistics collection (alternatives to capturing complete)?

Testing recommendations

Running the respective Mango unit and integration test suites might suffice (the CI runs these as well):

make eunit apps=mango
make mango-test MANGO_TEST_OPTS="15-execution-stats-test"

But there is a detailed description in the related ticket (see below) on how to trigger the problem. Feel free to kick the tires.

Related Issues or Pull Requests

Fixes #4560

Checklist

  • Code is written and works correctly
  • Changes are covered by tests


@rnewson rnewson left a comment


seems neat and tidy to me. I'm surprised we're losing this information today or that 'completion' is ambiguous but perhaps I don't understand your concerns in the ticket info.

src/mango/src/mango_cursor_view.erl (outdated review thread, resolved)
@pgj pgj force-pushed the feat/mango/rolling-execution-stats branch from 292639e to 29d9b85 on September 12, 2023 at 11:38

pgj commented Sep 12, 2023

Thanks for the comments @rnewson!

If I understand correctly, the problem comes from the way fabric and rexi are currently implemented and the way mango uses them for view queries. The stop reply at the coordinator level makes fabric stop all the workers immediately, so there is no chance for the complete message to arrive. This decision happens when no more rows are needed and the coordinator already has all the required data at hand. To record the execution statistics properly, it would have to wait a little longer for the complete message to arrive, but that is not something fabric supports on receiving a stop. The whole execution therefore becomes a race: if the worker is not quick enough to send the complete message before it is stopped, the statistics data is lost.
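
To make the race tangible, here is a tiny self-contained Erlang toy (not CouchDB code; the message names are made up) in which the parent reads only the rows it needs and then kills the worker, much like the coordinator does via fabric_streams:cleanup/1:

-module(stats_race_demo).
-export([run/0]).

%% The worker's statistics travel only in the final `complete` message.
%% If the parent kills the worker before that message is sent, the
%% statistics are lost; whether it arrives is purely a matter of timing.
run() ->
    Parent = self(),
    Worker = spawn(fun() ->
        Parent ! {row, 1},
        Parent ! {row, 2},
        timer:sleep(rand:uniform(5)),
        Parent ! {complete, #{rows_read => 2}}
    end),
    receive {row, 1} -> ok end,   %% limit reached: no more rows needed
    exit(Worker, kill),           %% untrappable, like the real cleanup
    receive
        {complete, Stats} -> {got_stats, Stats}
    after 10 ->
        stats_lost
    end.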

The complete message is sent only if the limit for fabric reaches zero:

handle_message(#view_row{}, {_, _}, #collector{sorted = false, limit = 0} = State) ->
    #collector{callback = Callback} = State,
    {_, Acc} = Callback(complete, State#collector.user_acc),
    {stop, State#collector{user_acc = Acc}};
handle_message(#view_row{} = Row, {_, From}, #collector{sorted = false} = St) ->
    #collector{callback = Callback, user_acc = AccIn, limit = Limit} = St,
    {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
    rexi:stream_ack(From),
    {Go, St#collector{user_acc = Acc, limit = Limit - 1}};
handle_message(#view_row{} = Row, {Worker, From}, State) ->
    #collector{
        query_args = #mrargs{direction = Dir},
        counters = Counters0,
        rows = Rows0,
        keys = KeyDict0,
        collation = Collation
    } = State,
    {Rows, KeyDict} = merge_row(
        Dir,
        Collation,
        KeyDict0,
        Row#view_row{worker = {Worker, From}},
        Rows0
    ),
    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
    State1 = State#collector{rows = Rows, counters = Counters1, keys = KeyDict},
    fabric_view:maybe_send_row(State1);

But mango implements its own limit on top of views, which is independent of that:

-spec handle_doc(#cursor{}, doc()) -> Response when
    Response :: {ok, #cursor{}} | {stop, #cursor{}}.
handle_doc(#cursor{skip = S} = C, _) when S > 0 ->
    {ok, C#cursor{skip = S - 1}};
handle_doc(#cursor{limit = L, execution_stats = Stats} = C, Doc) when L > 0 ->
    UserFun = C#cursor.user_fun,
    UserAcc = C#cursor.user_acc,
    {Go, NewAcc} = UserFun({row, Doc}, UserAcc),
    {Go, C#cursor{
        user_acc = NewAcc,
        limit = L - 1,
        execution_stats = mango_execution_stats:incr_results_returned(Stats)
    }};
handle_doc(C, _Doc) ->
    {stop, C}.

That is, in terms of fabric, mango's occasional stop is technically unexpected. In such cases, it is basically passed down to rexi:

    case
        rexi_utils:recv(
            Workers,
            #shard.ref,
            fun handle_message/3,
            State,
            fabric_util:view_timeout(Args),
            fabric_util:timeout("view_permsg", "3600000")
        )
    of
        {ok, NewState} ->
            {ok, NewState#collector.user_acc};
        {timeout, NewState} ->
            Callback({error, timeout}, NewState#collector.user_acc);
        {error, Resp} ->
            {ok, Resp}
    end.

There, stop means that no further messages are read from the coordinator's mailbox.

-spec recv([any()], integer(), function(), any(), timeout(), timeout()) ->
    {ok, any()} | {timeout, any()} | {error, atom()} | {error, atom(), any()}.
recv(Refs, Keypos, Fun, Acc0, infinity, PerMsgTO) ->
    process_mailbox(Refs, Keypos, Fun, Acc0, nil, PerMsgTO);
recv(Refs, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
    TimeoutRef = erlang:make_ref(),
    TRef = erlang:send_after(GlobalTimeout, self(), {timeout, TimeoutRef}),
    try
        process_mailbox(Refs, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO)
    after
        erlang:cancel_timer(TRef)
    end.

process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
    case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of
        {ok, Acc} ->
            process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
        {new_refs, NewRefList, Acc} ->
            process_mailbox(NewRefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
        {stop, Acc} ->
            {ok, Acc};
        Error ->
            Error
    end.

Then the main loop in fabric ends and the workers are cleaned up:

        {ok, Workers} ->
            try
                go(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
            after
                fabric_streams:cleanup(Workers)
            end;

That is why I was unsure whether the approach in this PR is the right one, or whether it only tackles the symptoms without fixing the underlying root cause. But I am afraid that fabric cannot offer guarantees about complete in such cases, because that is not how it was originally designed...?

My other concern was the bandwidth usage itself. With this approach, the number of messages sent per row doubles, which may have its own performance implications. But I cannot judge how much this matters in practice, or whether there is an optimization somewhere down the stack that helps with it and makes the associated costs amortized.
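
For a rough, illustrative sense of scale (these numbers are made up, not measurements): if a query is answered by 3 shard replicas and each streams 1,000 rows before the limit is hit, the current model sends about 3,000 row messages plus at most 3 execution_stats messages, whereas the rolling model sends an execution_stats message per row on top of that, so roughly 3,000 additional messages.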


rnewson commented Sep 12, 2023

That's very helpful background. You are right that it wasn't anticipated that we'd need information back from a worker once we know we no longer need it to calculate the response.

It would be better to address that directly if we can.


rnewson commented Sep 12, 2023

To expand on that last comment, perhaps we alter how workers are killed? Today we do exit(Pid, kill) which is obviously untrappable. As an option to fabric_util:cleanup/1 perhaps we could instead do exit(Pid, finish) (or some other word). I think the workers would get a {rexi_EXIT, finish} message. On receipt of that they'd "complete" whatever made sense for them (in our case, simply messaging out the execution stats up to that point, but not performing any new work). The fabric coordinator would have to wait for either that reply or notification that the worker processes terminated for some other reason, before finishing up.

This way ensures we send no more messages than today, at the small cost of making the coordinator wait for one message from each worker it would normally have unilaterally and asynchronously killed.
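
To make the shape of that protocol concrete, here is a plain-Erlang sketch under the assumption that workers trap the softer exit reason and report back before terminating; the actual fabric/rexi plumbing (and the exact finish / {rexi_EXIT, finish} handling) would look different, so all names here are illustrative:

-module(graceful_finish_sketch).
-export([start_worker/1, finish_workers/1]).

%% Worker: traps exits so a `finish` signal can be observed, flushes the
%% statistics collected so far, and only then terminates.
start_worker(Coordinator) ->
    spawn(fun() ->
        process_flag(trap_exit, true),
        worker_loop(Coordinator, #{docs_examined => 0})
    end).

worker_loop(Coordinator, Stats) ->
    receive
        {'EXIT', _From, finish} ->
            Coordinator ! {execution_stats, self(), Stats};
        {'EXIT', _From, _Reason} ->
            ok;
        {row, _Doc} ->
            worker_loop(Coordinator, maps:update_with(docs_examined, fun(N) -> N + 1 end, Stats))
    end.

%% Coordinator: sends `finish` instead of `kill`, then waits for one final
%% stats message (or the worker's death) before finishing up.
finish_workers(Pids) ->
    [begin
         MRef = erlang:monitor(process, Pid),
         exit(Pid, finish),
         Stats =
             receive
                 {execution_stats, Pid, S} -> S;
                 {'DOWN', MRef, process, Pid, _Reason} -> #{}
             after 5000 -> #{}
             end,
         erlang:demonitor(MRef, [flush]),
         Stats
     end
     || Pid <- Pids].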

@pgj pgj force-pushed the feat/mango/rolling-execution-stats branch 2 times, most recently from abd0850 to fbfe7b5 on January 9, 2024 at 23:12
In case of map-reduce views, the arrival of the `complete` message
is not guaranteed for the view callback (at the shard) when a
`stop` is issued during the aggregation (at the coordinator).  Due
to that, internally collected shard-level statistics may not be
fed back to the coordinator which can cause data loss hence
inaccuracy in the overall execution statistics.

Address this issue by switching to a "rolling" model where
row-level statistics are immediately streamed back to the
coordinator.  Support mixed-version cluster upgrades by activating
this model only if requested through the map-reduce arguments and
the given shard supports that.

Fixes apache#4560
@pgj pgj force-pushed the feat/mango/rolling-execution-stats branch from fbfe7b5 to 2cb7139 on January 10, 2024 at 17:02
@pgj pgj changed the title feat(mango): rolling execution statistics feat(mango): rolling execution statistics (exploration) Jan 10, 2024

pgj commented Jan 10, 2024

Because I did not want to lose the original description of this PR along with the discussion here, and I wanted to put the change in a different perspective in light of #4812, I forked it as #4958. After talking with @chewbranca about the problem and its solution, and given that he has been working on a model that follows a similar approach, the current code seems feasible.

I have studied @rnewson's suggestions, but neither of them brought a clear solution to the problem. I am inclined to believe (perhaps I am wrong here) that fixing the issue from the fabric side would be a more complicated change and, as such, would carry some risks.


pgj commented Mar 27, 2024

Closing this in favor of #4958.

@pgj pgj closed this Mar 27, 2024
@pgj pgj deleted the feat/mango/rolling-execution-stats branch March 27, 2024 14:18
Development

Successfully merging this pull request may close these issues.

Inconsistent execution statistics for Mango queries