reintroduce flow control for async index update
rnewson committed Sep 11, 2023
1 parent 5b8c23b commit 6898d6e
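The commit caps how far the index updater can run ahead of the nouveau server: the ibrowse request id of every asynchronous index update is kept in a FIFO queue, and the updater blocks until the queue is back under the configured max_pipeline_size before issuing the next request, then drains the queue to zero before recording the new update sequence. Below is a minimal, self-contained sketch of that pattern, for orientation only; the module name, SendFun and the {done, ReqId} completion message are stand-ins and are not code from this commit.

%% Illustrative flow-control sketch (not from the repository): request ids
%% go into a FIFO queue and the sender blocks whenever more than Max are
%% outstanding, always waiting on the oldest request first.
-module(bounded_pipeline_sketch).
-export([send_all/3]).

send_all(Items, SendFun, Max) ->
    Queue = lists:foldl(
        fun(Item, Q0) ->
            %% never run more than Max requests ahead
            Q1 = drain(Q0, Max),
            %% SendFun issues an async request and returns its id
            queue:in(SendFun(Item), Q1)
        end,
        queue:new(),
        Items
    ),
    %% wait for everything outstanding before finishing
    drain(Queue, 0),
    ok.

drain(Q0, Min) ->
    case queue:len(Q0) > Min of
        true ->
            {{value, ReqId}, Q1} = queue:out(Q0),
            receive
                {done, ReqId} -> ok
            end,
            drain(Q1, Min);
        false ->
            Q0
    end.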
Showing 4 changed files with 43 additions and 39 deletions.
49 changes: 21 additions & 28 deletions src/nouveau/src/nouveau_api.erl
@@ -29,7 +29,7 @@
     search/2,
     set_purge_seq/3,
     set_update_seq/3,
-    drain_async_responses/1,
+    drain_async_responses/2,
     jaxrs_error/2
 ]).

@@ -201,39 +201,32 @@ set_seq(#index{} = Index, ReqBody) ->
             send_error(Reason)
     end.

-drain_async_responses(List) when is_list(List) ->
-    drain_async_responses_list(List);
-drain_async_responses(Timeout) when is_integer(Timeout); Timeout == infinity ->
-    drain_async_responses_timeout(Timeout, []).
-
-drain_async_responses_list([]) ->
-    ok;
-drain_async_responses_list([ReqId | Rest]) ->
-    receive
-        {ibrowse_async_headers, ReqId, Code, Headers} ->
-            case drain_async_response(ReqId, Code, Headers, undefined) of
-                {ok, "204", _Headers, _Body} ->
-                    drain_async_responses_list(Rest);
-                {ok, StatusCode, _Headers, RespBody} ->
-                    exit({error, jaxrs_error(StatusCode, RespBody)})
-            end
-    end.
-
-drain_async_responses_timeout(Timeout, ReqIds) when is_integer(Timeout); Timeout == infinity ->
-    receive
-        {ibrowse_async_headers, ReqId, Code0, Headers0} ->
-            case drain_async_response(ReqId, Code0, Headers0, undefined) of
-                {ok, "204", _Headers, _Body} ->
-                    drain_async_responses_timeout(Timeout, [ReqId | ReqIds]);
-                {ok, StatusCode, _Headers, RespBody} ->
-                    exit({error, jaxrs_error(StatusCode, RespBody)})
-            end
-    after Timeout ->
-        ReqIds
-    end.
+%% wait for enough async responses to reduce the Queue to Min length.
+drain_async_responses(Queue0, Min) when Min >= 0 ->
+    case queue:len(Queue0) > Min of
+        true ->
+            {{value, ReqId}, Queue1} = queue:out(Queue0),
+            wait_for_response(ReqId),
+            drain_async_responses(Queue1, Min);
+        false ->
+            Queue0
+    end.
+
+wait_for_response(ReqId) ->
+    case drain_async_response(ReqId) of
+        {ok, "204", _Headers, _Body} ->
+            ok;
+        {ok, StatusCode, _Headers, RespBody} ->
+            exit({error, jaxrs_error(StatusCode, RespBody)})
+    end.
+
+drain_async_response(ReqId) ->
+    drain_async_response(ReqId, undefined, undefined, undefined).

 drain_async_response(ReqId, Code0, Headers0, Body0) ->
     receive
         {ibrowse_async_headers, ReqId, Code1, Headers1} ->
             drain_async_response(ReqId, Code1, Headers1, Body0);
         {ibrowse_async_response, ReqId, Body1} ->
             drain_async_response(ReqId, Code0, Headers0, Body1);
         {ibrowse_async_response_end, ReqId} ->
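A quick way to see the new arity-2 semantics is the snippet below, which is not part of the commit: it pre-delivers fake ibrowse messages to the calling process and then drains the queue down to one outstanding id. It assumes the elided tail of drain_async_response/4 returns {ok, Code, Headers, Body} once it sees ibrowse_async_response_end, which is what the pattern match in wait_for_response/1 above suggests.

%% Hypothetical demo: three fake "completed" requests already in the
%% mailbox, drained down to a single outstanding id.
drain_demo() ->
    ReqIds = [make_ref(), make_ref(), make_ref()],
    lists:foreach(
        fun(ReqId) ->
            self() ! {ibrowse_async_headers, ReqId, "204", []},
            self() ! {ibrowse_async_response, ReqId, <<>>},
            self() ! {ibrowse_async_response_end, ReqId}
        end,
        ReqIds
    ),
    %% drop from three outstanding requests to at most one
    Queue1 = nouveau_api:drain_async_responses(queue:from_list(ReqIds), 1),
    1 = queue:len(Queue1),
    ok.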
4 changes: 2 additions & 2 deletions src/nouveau/src/nouveau_index_manager.erl
@@ -152,10 +152,10 @@ configure_ibrowse(URL) ->
     ibrowse:set_max_sessions(
         Host,
         Port,
-        config:get_integer("nouveau", "max_sessions", 100)
+        nouveau_util:max_sessions()
     ),
     ibrowse:set_max_pipeline_size(
         Host,
         Port,
-        config:get_integer("nouveau", "max_pipeline_size", 1000)
+        nouveau_util:max_pipeline_size()
     ).
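With this change both the ibrowse limits and the updater's flow-control cap read the same [nouveau] configuration keys, so an operator tunes them in one place. A hedged example of raising the pipeline cap from a remote shell follows; it assumes config:set/4 behaves here as it does for other CouchDB sections (persist = false keeps the change out of the ini file), and the new value takes effect the next time configure_ibrowse/1 and the index updater run.

%% Assumed remsh snippet, not part of the commit.
ok = config:set("nouveau", "max_pipeline_size", "2000", false),
2000 = nouveau_util:max_pipeline_size().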
19 changes: 11 additions & 8 deletions src/nouveau/src/nouveau_index_updater.erl
@@ -33,9 +33,10 @@
     changes_done,
     total_changes,
     exclude_idrevs,
-    reqids = [],
+    reqids,
     conn_pid,
-    update_seq
+    update_seq,
+    max_pipeline_size
 }).

 -record(purge_acc, {
@@ -97,13 +98,15 @@ update(#index{} = Index) ->
             changes_done = 0,
             total_changes = TotalChanges,
             exclude_idrevs = PurgeAcc1#purge_acc.exclude_list,
+            reqids = queue:new(),
             conn_pid = ConnPid,
-            update_seq = PurgeAcc1#purge_acc.index_update_seq
+            update_seq = PurgeAcc1#purge_acc.index_update_seq,
+            max_pipeline_size = nouveau_util:max_pipeline_size()
         },
         {ok, Acc1} = couch_db:fold_changes(
             Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, []
         ),
-        nouveau_api:drain_async_responses(lists:reverse(Acc1#acc.reqids)),
+        nouveau_api:drain_async_responses(Acc1#acc.reqids, 0),
        ibrowse:stop_worker_process(ConnPid),
         ok = nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq)
     after
@@ -117,9 +120,9 @@ update(#index{} = Index) ->
 load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) ->
     {ok, Acc};
 load_docs(FDI, #acc{} = Acc0) ->
-    %% collect completed requests without blocking
-    ReqIds = nouveau_api:drain_async_responses(0),
-    Acc1 = Acc0#acc{reqids = Acc0#acc.reqids -- ReqIds},
+    %% block for responses so we stay under the max pipeline size
+    ReqIds1 = nouveau_api:drain_async_responses(Acc0#acc.reqids, Acc0#acc.max_pipeline_size),
+    Acc1 = Acc0#acc{reqids = ReqIds1},

     couch_task_status:update([
         {changes_done, Acc1#acc.changes_done},
@@ -146,7 +149,7 @@ load_docs(FDI, #acc{} = Acc0) ->
         {ibrowse_req_id, ReqId} ->
             Acc1#acc{
                 update_seq = DI#doc_info.high_seq,
-                reqids = [ReqId | Acc1#acc.reqids]
+                reqids = queue:in(ReqId, Acc1#acc.reqids)
             };
         {error, Reason} ->
             exit({error, Reason})
10 changes: 9 additions & 1 deletion src/nouveau/src/nouveau_util.erl
@@ -27,7 +27,9 @@
     maybe_create_local_purge_doc/2,
     get_local_purge_doc_id/1,
     get_local_purge_doc_body/3,
-    nouveau_url/0
+    nouveau_url/0,
+    max_sessions/0,
+    max_pipeline_size/0
 ]).

 index_name(Path) when is_binary(Path) ->
@@ -196,3 +198,9 @@ get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) ->

 nouveau_url() ->
     config:get("nouveau", "url", "http://127.0.0.1:8080").
+
+max_sessions() ->
+    config:get_integer("nouveau", "max_sessions", 100).
+
+max_pipeline_size() ->
+    config:get_integer("nouveau", "max_pipeline_size", 1000).
