Skip to content

Commit

Permalink
ClientRequestContext#cancel cancels the associated request immediat…
Browse files Browse the repository at this point in the history
…ely (#5800)

Motivation:

In order to handle #4591, I
propose that we first define an API which allows users to cancel a
request. Currently, `ClientRequestContext#cancel` is invoked once
`CancellationScheduler#start` is invoked. I propose to change the
behavior of `ClientRequestContext#cancel` such that the associated
request is aborted immediately. Once this API is available, it would be
trivial to implement `ResponseTimeoutMode` by adjusting where to call
`CancellationScheduler#start`. Additionally, users may easily implement
their own timeout mechanism if they would like.

Modifications:

- `CancellationScheduler` related changes
- `DefaultCancellationScheduler` is refactored to be lock based instead
of event loop based. The reasoning for this change was for the scenario
where the request execution didn't reach the event loop yet. i.e. If a
user calls `ctx.cancel` in the decorator, a connection shouldn't be
opened.
- e.g.
https://github.com/line/armeria/blob/59aa40a59e1f1122716e70f9f1d6f1402a6aae0e/core/src/test/java/com/linecorp/armeria/client/ContextCancellationTest.java#L90-L116
- `CancellationScheduler#updateTask` is introduced. This API updates the
cancellation task if the scheduler isn't invoked yet. If the scheduler
is invoked already, the cancellation task will be executed eventually.
This API allows `ctx.cancel` to attempt cancellation depending on which
stage the request is at. For instance, at the decorators only
`req.abort` needs to be called but at the request write stage, the
cancellation task may need to send a reset signal.
- Misc. an infinite timeout is internally represented as
`Long.MAX_VALUE` instead of `0`
- `AbstractHttpRequestHandler` related changes
- `CancellationTask` in `AbstractHttpRequestHandler`,
`HttpResponseWrapper`, `AbstractHttpResponseHandler` is modified to be
scheduled inside an event loop. The reasoning is that `ctx.cancel`, and
hence `CancellationTask#run` can be invoked from any thread.
- There is a higher chance of `AbstractHttpRequestHandler` calling
`fail` or `failAndReset` multiple times. There is no point in doing so,
so added a boolean flag `failed` for safety.
- `HttpResponseWrapper` related changes
- The original intention of `cancelTimeoutAndLog` was to not log if the
response is unexpected. Modified so that if the response is cancelled or
the context is cancelled, no logging is done
- There is probably no reason to not call `close` when a timeout occurs.
Unified the fragmented logic of closing the `HttpResponseWrapper`.

Result:

- Users can call `ClientRequestContext#cancel` to cancel the ongoing
request easily.
- #5793 can be prepared for

<!--
Visit this URL to learn more about how to write a pull request
description:

https://armeria.dev/community/developer-guide#how-to-write-pull-request-description
-->

---------

Co-authored-by: Ikhun Um <[email protected]>
  • Loading branch information
jrhee17 and ikhoon authored Aug 9, 2024
1 parent faa886f commit 9a29b39
Show file tree
Hide file tree
Showing 16 changed files with 1,063 additions and 576 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
import com.linecorp.armeria.internal.client.HttpSession;
import com.linecorp.armeria.internal.common.CancellationScheduler;
import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.unsafe.PooledObjects;

Expand Down Expand Up @@ -89,6 +91,7 @@ enum State {
private ScheduledFuture<?> timeoutFuture;
private State state = State.NEEDS_TO_WRITE_FIRST_HEADER;
private boolean loggedRequestFirstBytesTransferred;
private boolean failed;

AbstractHttpRequestHandler(Channel ch, ClientHttpObjectEncoder encoder, HttpResponseDecoder responseDecoder,
DecodedHttpResponse originalRes,
Expand Down Expand Up @@ -192,9 +195,30 @@ final boolean tryInitialize() {
() -> failAndReset(WriteTimeoutException.get()),
timeoutMillis, TimeUnit.MILLISECONDS);
}
final CancellationScheduler scheduler = cancellationScheduler();
if (scheduler != null) {
scheduler.updateTask(newCancellationTask());
}
if (ctx.isCancelled()) {
// The previous cancellation task wraps the cause with an UnprocessedRequestException
// so we return early
return false;
}
return true;
}

private CancellationTask newCancellationTask() {
return cause -> {
if (ch.eventLoop().inEventLoop()) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
failAndReset(cause);
}
} else {
ch.eventLoop().execute(() -> failAndReset(cause));
}
};
}

RequestHeaders mergedRequestHeaders(RequestHeaders headers) {
final HttpHeaders internalHeaders;
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
Expand Down Expand Up @@ -354,6 +378,10 @@ final void failRequest(Throwable cause) {
}

private void fail(Throwable cause) {
if (failed) {
return;
}
failed = true;
state = State.DONE;
cancel();
logBuilder.endRequest(cause);
Expand All @@ -368,9 +396,20 @@ private void fail(Throwable cause) {
logBuilder.endResponse(cause);
originalRes.close(cause);
}

final CancellationScheduler scheduler = cancellationScheduler();
if (scheduler != null) {
// best-effort attempt to cancel the scheduled timeout task so that RequestContext#cause
// isn't set unnecessarily
scheduler.cancelScheduled();
}
}

final void failAndReset(Throwable cause) {
if (failed) {
return;
}

if (cause instanceof WriteTimeoutException) {
final HttpSession session = HttpSession.get(ch);
// Mark the session as unhealthy so that subsequent requests do not use it.
Expand All @@ -394,7 +433,7 @@ final void failAndReset(Throwable cause) {
error = Http2Error.INTERNAL_ERROR;
}

if (error.code() != Http2Error.CANCEL.code()) {
if (error.code() != Http2Error.CANCEL.code() && cause != ctx.cancellationCause()) {
Exceptions.logIfUnexpected(logger, ch,
HttpSession.get(ch).protocol(),
"a request publisher raised an exception", cause);
Expand All @@ -415,4 +454,13 @@ final boolean cancelTimeout() {
this.timeoutFuture = null;
return timeoutFuture.cancel(false);
}

@Nullable
private CancellationScheduler cancellationScheduler() {
final ClientRequestContextExtension ctxExt = ctx.as(ClientRequestContextExtension.class);
if (ctxExt != null) {
return ctxExt.responseCancellationScheduler();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.logging.ClientConnectionTimings;
import com.linecorp.armeria.common.logging.ClientConnectionTimingsBuilder;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil;
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
import com.linecorp.armeria.internal.client.HttpSession;
import com.linecorp.armeria.internal.client.PooledChannel;
Expand Down Expand Up @@ -63,13 +63,13 @@ final class HttpClientDelegate implements HttpClient {
public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception {
final Throwable throwable = ClientPendingThrowableUtil.pendingThrowable(ctx);
if (throwable != null) {
return earlyFailedResponse(throwable, ctx, req);
return earlyFailedResponse(throwable, ctx);
}
if (req != ctx.request()) {
return earlyFailedResponse(
new IllegalStateException("ctx.request() does not match the actual request; " +
"did you forget to call ctx.updateRequest() in your decorator?"),
ctx, req);
ctx);
}

final Endpoint endpoint = ctx.endpoint();
Expand All @@ -84,21 +84,27 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
// and response created here will be exposed only when `EndpointGroup.select()` returned `null`.
//
// See `DefaultClientRequestContext.init()` for more information.
return earlyFailedResponse(EmptyEndpointGroupException.get(ctx.endpointGroup()), ctx, req);
return earlyFailedResponse(EmptyEndpointGroupException.get(ctx.endpointGroup()), ctx);
}

final SessionProtocol protocol = ctx.sessionProtocol();
final ProxyConfig proxyConfig;
try {
proxyConfig = getProxyConfig(protocol, endpoint);
} catch (Throwable t) {
return earlyFailedResponse(t, ctx, req);
return earlyFailedResponse(t, ctx);
}

final Throwable cancellationCause = ctx.cancellationCause();
if (cancellationCause != null) {
return earlyFailedResponse(cancellationCause, ctx);
}

final Endpoint endpointWithPort = endpoint.withDefaultPort(ctx.sessionProtocol());
final EventLoop eventLoop = ctx.eventLoop().withoutContext();
// TODO(ikhoon) Use ctx.exchangeType() to create an optimized HttpResponse for non-streaming response.
final DecodedHttpResponse res = new DecodedHttpResponse(eventLoop);
updateCancellationTask(ctx, req, res);

final ClientConnectionTimingsBuilder timingsBuilder = ClientConnectionTimings.builder();

Expand All @@ -115,14 +121,31 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
acquireConnectionAndExecute(ctx, resolved, req, res, timingsBuilder, proxyConfig);
} else {
ctx.logBuilder().session(null, ctx.sessionProtocol(), timingsBuilder.build());
earlyFailedResponse(cause, ctx, req, res);
ctx.cancel(cause);
}
});
}

return res;
}

private static void updateCancellationTask(ClientRequestContext ctx, HttpRequest req,
DecodedHttpResponse res) {
final ClientRequestContextExtension ctxExt = ctx.as(ClientRequestContextExtension.class);
if (ctxExt == null) {
return;
}
ctxExt.responseCancellationScheduler().updateTask(cause -> {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
final UnprocessedRequestException ure = UnprocessedRequestException.of(cause);
req.abort(ure);
ctx.logBuilder().endRequest(ure);
res.close(ure);
ctx.logBuilder().endResponse(ure);
}
});
}

private void resolveAddress(Endpoint endpoint, ClientRequestContext ctx,
BiConsumer<@Nullable Endpoint, @Nullable Throwable> onComplete) {

Expand Down Expand Up @@ -169,7 +192,7 @@ private void acquireConnectionAndExecute0(ClientRequestContext ctx, Endpoint end
try {
pool = factory.pool(ctx.eventLoop().withoutContext());
} catch (Throwable t) {
earlyFailedResponse(t, ctx, req, res);
ctx.cancel(t);
return;
}
final SessionProtocol protocol = ctx.sessionProtocol();
Expand All @@ -185,7 +208,7 @@ private void acquireConnectionAndExecute0(ClientRequestContext ctx, Endpoint end
if (cause == null) {
doExecute(newPooledChannel, ctx, req, res);
} else {
earlyFailedResponse(cause, ctx, req, res);
ctx.cancel(cause);
}
return null;
});
Expand Down Expand Up @@ -224,30 +247,12 @@ private static void logSession(ClientRequestContext ctx, @Nullable PooledChannel
}
}

private static HttpResponse earlyFailedResponse(Throwable t, ClientRequestContext ctx, HttpRequest req) {
private static HttpResponse earlyFailedResponse(Throwable t, ClientRequestContext ctx) {
final UnprocessedRequestException cause = UnprocessedRequestException.of(t);
handleEarlyRequestException(ctx, req, cause);
ctx.cancel(cause);
return HttpResponse.ofFailure(cause);
}

private static void earlyFailedResponse(Throwable t, ClientRequestContext ctx, HttpRequest req,
DecodedHttpResponse res) {
final UnprocessedRequestException cause = UnprocessedRequestException.of(t);
handleEarlyRequestException(ctx, req, cause);
res.close(cause);
}

private static void handleEarlyRequestException(ClientRequestContext ctx,
HttpRequest req, Throwable cause) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
req.abort(cause);
final RequestLogBuilder logBuilder = ctx.logBuilder();
logBuilder.endRequest(cause);
logBuilder.endResponse(cause);
ctx.cancel(cause);
}
}

private static void doExecute(PooledChannel pooledChannel, ClientRequestContext ctx,
HttpRequest req, DecodedHttpResponse res) {
final Channel channel = pooledChannel.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import com.linecorp.armeria.common.stream.StreamWriter;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
import com.linecorp.armeria.internal.common.CancellationScheduler;
import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.unsafe.PooledObjects;

import io.netty.channel.EventLoop;
Expand Down Expand Up @@ -213,7 +215,7 @@ void close(@Nullable Throwable cause, boolean cancel) {
}
done = true;
closed = true;
cancelTimeoutOrLog(cause, cancel);
cancelTimeoutAndLog(cause, cancel);
final HttpRequest request = ctx.request();
assert request != null;
if (cause != null) {
Expand Down Expand Up @@ -250,32 +252,24 @@ private void cancelAction(@Nullable Throwable cause) {
}
}

private void cancelTimeoutOrLog(@Nullable Throwable cause, boolean cancel) {
CancellationScheduler responseCancellationScheduler = null;
private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) {
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
if (ctxExtension != null) {
responseCancellationScheduler = ctxExtension.responseCancellationScheduler();
// best-effort attempt to cancel the scheduled timeout task so that RequestContext#cause
// isn't set unnecessarily
ctxExtension.responseCancellationScheduler().cancelScheduled();
}

if (responseCancellationScheduler == null || !responseCancellationScheduler.isFinished()) {
if (responseCancellationScheduler != null) {
responseCancellationScheduler.clearTimeout(false);
}
// There's no timeout or the response has not been timed out.
if (cancel) {
cancelAction(cause);
} else {
closeAction(cause);
}
if (cancel) {
cancelAction(cause);
return;
}
if (delegate.isOpen()) {
closeAction(cause);
}

// Response has been timed out already.
// Log only when it's not a ResponseTimeoutException.
if (cause instanceof ResponseTimeoutException) {
// the context has been cancelled either by timeout or by user invocation
if (cause == ctx.cancellationCause()) {
return;
}

Expand All @@ -299,7 +293,8 @@ void initTimeout() {
if (ctxExtension != null) {
final CancellationScheduler responseCancellationScheduler =
ctxExtension.responseCancellationScheduler();
responseCancellationScheduler.start(newCancellationTask());
responseCancellationScheduler.updateTask(newCancellationTask());
responseCancellationScheduler.start();
}
}

Expand All @@ -312,11 +307,13 @@ public boolean canSchedule() {

@Override
public void run(Throwable cause) {
delegate.close(cause);
final HttpRequest request = ctx.request();
assert request != null;
request.abort(cause);
ctx.logBuilder().endResponse(cause);
if (ctx.eventLoop().inEventLoop()) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
close(cause);
}
} else {
ctx.eventLoop().withoutContext().execute(() -> close(cause));
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ default void setRequestAutoAbortDelay(Duration delay) {

/**
* Returns the cause of cancellation, {@code null} if the request has not been cancelled.
* Note that there is no guarantee that the cancellation cause is equivalent to the cause of failure
* for {@link HttpRequest} or {@link HttpResponse}. Refer to {@link RequestLog#requestCause()}
* or {@link RequestLog#responseCause()} for the exact reason why a request or response failed.
*/
@Nullable
Throwable cancellationCause();
Expand Down
Loading

0 comments on commit 9a29b39

Please sign in to comment.