Skip to content

Commit

Permalink
Use ChannelFutureListener in Netty code to reduce capturing lambdas (#…
Browse files Browse the repository at this point in the history
…112967)

Mainly motivated by simplifying the reference chains for Netty buffers
and have easier to analyze heap dumps in some spots but also a small
performance win in and of itself.
  • Loading branch information
original-brownbear committed Sep 18, 2024
1 parent f437f13 commit 90e343c
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 22 deletions.
4 changes: 4 additions & 0 deletions modules/transport-netty4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,7 @@ tasks.named("thirdPartyAudit").configure {
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5'
)
}

tasks.named('forbiddenApisMain').configure {
signaturesFiles += files('forbidden/netty-signatures.txt')
}
9 changes: 9 additions & 0 deletions modules/transport-netty4/forbidden/netty-signatures.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the "Elastic License
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
# Public License v 1"; you may not use this file except in compliance with, at
# your election, the "Elastic License 2.0", the "GNU Affero General Public
# License v3.0 only", or the "Server Side Public License, v 1".

@defaultMessage Use org.elasticsearch.transport.netty4.Netty4Utils.addListener(io.netty.channel.ChannelFuture, io.netty.channel.ChannelFutureListener) instead
io.netty.channel.ChannelFuture#addListener(io.netty.util.concurrent.GenericFutureListener)
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ public void onFailure(Exception e) {
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
e
);
channel.close().addListener(ignored -> {
finishingWrite.combiner().add(channel.newFailedFuture(e));
Netty4Utils.addListener(channel.close(), f -> {
finishingWrite.combiner().add(f.channel().newFailedFuture(e));
finishingWrite.combiner().finish(finishingWrite.onDone());
});
checkShutdown();
Expand Down Expand Up @@ -417,7 +417,7 @@ private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite)
final boolean isPartComplete = bodyPart.isPartComplete();
final boolean isBodyComplete = isPartComplete && bodyPart.isLastPart();
final ChannelFuture f = ctx.write(isBodyComplete ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
f.addListener(ignored -> bytes.close());
Netty4Utils.addListener(f, ignored -> bytes.close());
combiner.add(f);
return isPartComplete;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,9 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
}

private static void addClosedExceptionLogger(Channel channel) {
channel.closeFuture().addListener(f -> {
if (f.isSuccess() == false) {
logger.debug(() -> format("exception while closing channel: %s", channel), f.cause());
Netty4Utils.addListener(channel.closeFuture(), channelFuture -> {
if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) {
logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.Future;
Expand All @@ -28,6 +31,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.transport.TransportException;

import java.io.IOException;
Expand Down Expand Up @@ -141,7 +145,7 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList
// can only be completed by some network event from this point on. However...
final var promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
addListener(promise, listener);
assert assertCorrectPromiseListenerThreading(channel, promise);
assert assertCorrectPromiseListenerThreading(promise);
channel.writeAndFlush(message, promise);
if (channel.eventLoop().isShuttingDown()) {
// ... if we get here then the event loop may already have terminated, and https://github.com/netty/netty/issues/8007 means that
Expand All @@ -156,10 +160,10 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList
}
}

private static boolean assertCorrectPromiseListenerThreading(Channel channel, Future<?> promise) {
final var eventLoop = channel.eventLoop();
promise.addListener(future -> {
assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || channel.eventLoop().isTerminated()
private static boolean assertCorrectPromiseListenerThreading(ChannelPromise promise) {
addListener(promise, future -> {
var eventLoop = future.channel().eventLoop();
assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || eventLoop.isTerminated()
: future.cause();
});
return true;
Expand All @@ -183,4 +187,9 @@ public static void addListener(Future<Void> future, ActionListener<Void> listene
}
});
}

@SuppressForbidden(reason = "single point for adding listeners that enforces use of ChannelFutureListener")
public static void addListener(ChannelFuture channelFuture, ChannelFutureListener listener) {
channelFuture.addListener(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.PromiseCombiner;

import org.apache.lucene.util.BytesRef;
Expand Down Expand Up @@ -94,13 +94,13 @@ private void writeInSlices(ChannelHandlerContext ctx, ChannelPromise promise, By
final int bufferSize = Math.min(readableBytes, MAX_BYTES_PER_WRITE);
if (readableBytes == bufferSize) {
// last write for this chunk we're done
ctx.write(buf).addListener(forwardResultListener(ctx, promise));
Netty4Utils.addListener(ctx.write(buf), forwardResultListener(promise));
return;
}
final int readerIndex = buf.readerIndex();
final ByteBuf writeBuffer = buf.retainedSlice(readerIndex, bufferSize);
buf.readerIndex(readerIndex + bufferSize);
ctx.write(writeBuffer).addListener(forwardFailureListener(ctx, promise));
Netty4Utils.addListener(ctx.write(writeBuffer), forwardFailureListener(promise));
if (ctx.channel().isWritable() == false) {
// channel isn't writable any longer -> move to queuing
queueWrite(buf, promise);
Expand Down Expand Up @@ -164,9 +164,9 @@ private boolean doFlush(ChannelHandlerContext ctx) {
final ChannelFuture writeFuture = ctx.write(writeBuffer);
if (sliced == false) {
currentWrite = null;
writeFuture.addListener(forwardResultListener(ctx, write.promise));
Netty4Utils.addListener(writeFuture, forwardResultListener(write.promise));
} else {
writeFuture.addListener(forwardFailureListener(ctx, write.promise));
Netty4Utils.addListener(writeFuture, forwardFailureListener(write.promise));
}
}
ctx.flush();
Expand All @@ -176,18 +176,18 @@ private boolean doFlush(ChannelHandlerContext ctx) {
return true;
}

private static GenericFutureListener<Future<Void>> forwardFailureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
private static ChannelFutureListener forwardFailureListener(ChannelPromise promise) {
return future -> {
assert ctx.executor().inEventLoop();
assert future.channel().eventLoop().inEventLoop();
if (future.isSuccess() == false) {
promise.tryFailure(future.cause());
}
};
}

private static GenericFutureListener<Future<Void>> forwardResultListener(ChannelHandlerContext ctx, ChannelPromise promise) {
private static ChannelFutureListener forwardResultListener(ChannelPromise promise) {
return future -> {
assert ctx.executor().inEventLoop();
assert future.channel().eventLoop().inEventLoop();
if (future.isSuccess()) {
promise.trySuccess();
} else {
Expand Down
7 changes: 6 additions & 1 deletion x-pack/plugin/security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,12 @@ tasks.named("forbiddenPatterns").configure {
}

tasks.named('forbiddenApisMain').configure {
signaturesFiles += files('forbidden/ldap-signatures.txt', 'forbidden/xml-signatures.txt', 'forbidden/oidc-signatures.txt')
signaturesFiles += files(
'forbidden/ldap-signatures.txt',
'forbidden/xml-signatures.txt',
'forbidden/oidc-signatures.txt',
project(':modules:transport-netty4').file('forbidden/netty-signatures.txt')
)
}

tasks.named('forbiddenApisTest').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty4.Netty4Transport;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.SharedGroupFactory;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.security.transport.ProfileConfigurations;
Expand Down Expand Up @@ -341,7 +342,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock
final SslHandler sslHandler = new SslHandler(sslEngine);
ctx.pipeline().replace(this, "ssl", sslHandler);
final Future<?> handshakePromise = sslHandler.handshakeFuture();
connectPromise.addListener(result -> {
Netty4Utils.addListener(connectPromise, result -> {
if (result.isSuccess() == false) {
promise.tryFailure(result.cause());
} else {
Expand Down

0 comments on commit 90e343c

Please sign in to comment.