Force Reconnect API #1100
scottf commented Mar 20, 2024
- Implement the Force Reconnect API (Add reconnect method nats-architecture-and-design#259)
- Better handling when a socket write timeout is triggered; it now uses force reconnect
@Override
public void socketWriteTimeout(Connection conn) {
    Output.controlMessage(this.id, supplyMessage("SEVERE socketWriteTimeout", conn, null, null));
}
This was added since the class is an ErrorListener and socketWriteTimeout was added.
// .delay(1)
// .reportFrequency(1000);
// END CORE PUBLISHER EXAMPLE
This is an example class that demonstrates resilient publishing. I added some functionality to it: for instance, it previously only demoed JetStream publishing, but it can now also support core publishing. There are also some other improvements.
@@ -57,7 +57,7 @@ public static void main(String[] args) {
 }

 System.out.println("Starting publish...");
-ResilientPublisher publisher = ResilientPublisher.newInstanceQuietAndJitter(jsm, STREAM, SUBJECT, MESSAGE_PREFIX, 10);
+ResilientPublisher publisher = new ResilientPublisher(nc, jsm, STREAM, SUBJECT).basicDataPrefix(MESSAGE_PREFIX).jitter(10);
This was changed because ResilientPublisher was changed and this example uses that class.
@@ -43,7 +43,7 @@ public static void main(String[] args) {
 createOrReplaceStream(jsm, STREAM, SUBJECT);

 System.out.println("Starting publish...");
-ResilientPublisher publisher = ResilientPublisher.newInstanceQuietAndJitter(jsm, STREAM, SUBJECT, MESSAGE_PREFIX, 10);
+ResilientPublisher publisher = new ResilientPublisher(nc, jsm, STREAM, SUBJECT).basicDataPrefix(MESSAGE_PREFIX).jitter(10);
This was changed because ResilientPublisher was changed and this example uses that class.
 * @throws IOException
 * @throws InterruptedException
 */
void forceReconnect() throws IOException, InterruptedException;
This is the new API in the Connection object.
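For illustration, a caller might wrap the new method roughly as follows. This is only a minimal sketch: `ReconnectableConnection` is a hypothetical stand-in that mirrors nothing but the `forceReconnect()` signature shown in the diff (it is not the real `io.nats.client.Connection`), and `FakeConnection` is an assumed test double.

```java
import java.io.IOException;

class ForceReconnectSketch {
    /** Hypothetical stand-in mirroring only the signature added in this PR. */
    interface ReconnectableConnection {
        void forceReconnect() throws IOException, InterruptedException;
    }

    /** Assumed test double that just counts how often a reconnect was requested. */
    static final class FakeConnection implements ReconnectableConnection {
        int reconnects = 0;

        @Override
        public void forceReconnect() {
            reconnects++;
        }
    }

    /** Requests a reconnect and reports whether the request went through. */
    static boolean tryForceReconnect(ReconnectableConnection conn) {
        try {
            conn.forceReconnect();
            return true;
        } catch (IOException e) {
            return false; // the reconnect attempt itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        FakeConnection conn = new FakeConnection();
        System.out.println("reconnected: " + tryForceReconnect(conn));
    }
}
```

Both checked exceptions come straight from the declared signature, so a caller has to decide what a failed or interrupted reconnect means for it.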
 /** Server Sent a lame duck mode. */
-LAME_DUCK("nats: lame duck mode");
+LAME_DUCK(false, "lame duck mode");
I modified this class to provide more granular information about the event and to allow for different types of reporting. It is backward compatible, meaning the text returned by toString did not change.
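One way such a change can stay backward compatible is sketched below. This is a simplified stand-in, not the actual enum: the meaning of the boolean (treated here as "is a connection event"), the `RECONNECTED` entry, the accessor names, and the `"nats: connection "` prefix are all assumptions. Only the `LAME_DUCK` arguments and its unchanged `"nats: lame duck mode"` text come from the diff.

```java
class EventsSketch {
    /** Simplified stand-in for the events enum; the boolean's meaning is an assumption. */
    enum Event {
        RECONNECTED(true, "reconnected"),   // hypothetical entry, for contrast only
        LAME_DUCK(false, "lame duck mode"); // arguments as shown in the diff

        private final boolean connectionEvent; // assumed meaning of the flag
        private final String event;            // short text without any prefix
        private final String legacyText;       // what toString() keeps returning

        Event(boolean connectionEvent, String event) {
            this.connectionEvent = connectionEvent;
            this.event = event;
            // Assumed prefixes; chosen so LAME_DUCK matches the old literal.
            this.legacyText = (connectionEvent ? "nats: connection " : "nats: ") + event;
        }

        boolean isConnectionEvent() { return connectionEvent; }

        String getEvent() { return event; }

        @Override
        public String toString() { return legacyText; }
    }

    public static void main(String[] args) {
        System.out.println(Event.LAME_DUCK);            // prints "nats: lame duck mode"
        System.out.println(Event.LAME_DUCK.getEvent()); // prints "lame duck mode"
    }
}
```

Existing code that logs or compares `toString()` keeps working, while new code can read the finer-grained parts separately.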
 *
 * @param conn The connection that had the issue
 */
default void socketWriteTimeout(Connection conn) {}
This is the new ErrorListener event. It has a default implementation to be backward compatible.
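The backward-compatibility argument can be sketched like this. `Listener`, `OldListener`, and `NewListener` are hypothetical names, and the callback takes a plain `String` instead of a `Connection` to keep the example self-contained.

```java
class DefaultMethodSketch {
    /** Hypothetical listener interface standing in for ErrorListener. */
    interface Listener {
        void errorOccurred(String error); // assumed pre-existing abstract method

        // Newly added event; the empty default keeps old implementations compiling.
        default void socketWriteTimeout(String connectionName) {}
    }

    /** Written before the new event existed; it needs no changes. */
    static final class OldListener implements Listener {
        @Override
        public void errorOccurred(String error) {}
    }

    /** Opts in to the new event by overriding the default. */
    static final class NewListener implements Listener {
        String lastTimeout;

        @Override
        public void errorOccurred(String error) {}

        @Override
        public void socketWriteTimeout(String connectionName) {
            lastTimeout = connectionName;
        }
    }

    public static void main(String[] args) {
        new OldListener().socketWriteTimeout("conn-1"); // silently ignored
        NewListener listener = new NewListener();
        listener.socketWriteTimeout("conn-1");
        System.out.println(listener.lastTimeout); // prints "conn-1"
    }
}
```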
 * to the message like ", foo: <fooValue>, bar-<barValue>".
 * @return
 */
default String supplyMessage(String label, Connection conn, Consumer consumer, Subscription sub, Object... pairs) {
This function used to be part of the ErrorListenerLoggerImpl class, but it was moved into the interface so it can be more easily used in ErrorListener implementations.
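A rough sketch of the idea follows, assuming a much simpler signature than the real one: the `conn`, `consumer`, and `sub` parameters are omitted, and the way an unpaired trailing value is handled is a guess. It only shows how a shared default method lets every implementation format label/pair messages the same way.

```java
class SupplyMessageSketch {
    /** Hypothetical listener with a shared message-building helper. */
    interface Listener {
        // Simplified: the real method also takes conn, consumer and sub.
        default String supplyMessage(String label, Object... pairs) {
            StringBuilder sb = new StringBuilder(label);
            // Walk the varargs two at a time: name, then value.
            for (int i = 0; i + 1 < pairs.length; i += 2) {
                sb.append(", ").append(pairs[i]).append(": ").append(pairs[i + 1]);
            }
            return sb.toString(); // an unpaired trailing value is ignored (assumption)
        }
    }

    public static void main(String[] args) {
        Listener listener = new Listener() {};
        // prints "socketWriteTimeout, server: localhost:4222"
        System.out.println(listener.supplyMessage("socketWriteTimeout", "server", "localhost:4222"));
    }
}
```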
import io.nats.client.*;
import io.nats.client.support.Status;

public class ErrorListenerConsoleImpl implements ErrorListener {
The console implementation is less verbose than the logger impl, nothing really exciting.
@@ -23,30 +22,6 @@ public class ErrorListenerLoggerImpl implements ErrorListener {

 private final static Logger LOGGER = Logger.getLogger(ErrorListenerLoggerImpl.class.getName());

-private String supplyMessage(String label, Connection conn, Consumer consumer, Subscription sub, Object... pairs) {
As already noted, this was moved down into the interface.
@Override
public void socketWriteTimeout(Connection conn) {
    LOGGER.severe(() -> supplyMessage("socketWriteTimeout", conn, null, null));
}
An implementation-specific override, since the interface only provides a no-op default implementation.
    this.requestCleanupInterval = requestCleanupInterval;
}

MessageQueue(MessageQueue source) {
This allows a message queue to be created from another message queue. It is used when a writer is being replaced, either because it is blocked or because of a force reconnect: the write queue is taken from the writer being discarded, and all of its items (messages) are moved to the queue in the new writer so they are not lost.
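The hand-off described here can be sketched as follows. `SimpleQueue` is a hypothetical, much reduced stand-in for `MessageQueue`, and using `drainTo` is an assumption about how the items could be moved, not a claim about what the PR actually does.

```java
import java.util.concurrent.LinkedBlockingQueue;

class QueueHandoffSketch {
    /** Hypothetical, much reduced stand-in for MessageQueue. */
    static final class SimpleQueue {
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final boolean discardWhenFull; // example of configuration carried across

        SimpleQueue(boolean discardWhenFull) {
            this.discardWhenFull = discardWhenFull;
        }

        /** Copy constructor: reuse the source's settings and take over its pending items. */
        SimpleQueue(SimpleQueue source) {
            this.discardWhenFull = source.discardWhenFull;
            source.queue.drainTo(this.queue); // moves items in FIFO order, emptying the source
        }
    }

    public static void main(String[] args) {
        SimpleQueue old = new SimpleQueue(true);
        old.queue.add("msg-1");
        old.queue.add("msg-2");

        SimpleQueue replacement = new SimpleQueue(old);
        System.out.println(replacement.queue.size()); // prints 2
        System.out.println(old.queue.size());         // prints 0
    }
}
```

This is also why the queue has to keep its construction parameters as fields: the copy constructor can only reuse settings that the source still holds.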
@@ -33,11 +33,13 @@ class MessageQueue {
 protected final AtomicLong length;
 protected final AtomicLong sizeInBytes;
 protected final AtomicInteger running;
-protected final boolean singleThreadedReader;
+protected final boolean singleReaderMode;
Made the variable name more consistent with its usage and to match its getter.
protected final LinkedBlockingQueue<NatsMessage> queue;
protected final Lock filterLock;
protected final int publishHighwaterMark;
State needed when constructing a queue from another queue.
protected final boolean discardWhenFull;
protected final long offerTimeoutMillis;
protected final Duration requestCleanupInterval;
State needed when constructing a queue from another queue.
-private final NatsConnectionReader reader;
-private final NatsConnectionWriter writer;
+private NatsConnectionReader reader;
+private NatsConnectionWriter writer;
These are no longer final; they will be swapped out on a force reconnect.
-this.writer.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
+if (writer.isRunning()) {
+    this.writer.stop().get(timeoutNanos, TimeUnit.NANOSECONDS);
+}
Sometimes stop times out even when the writer is not running. This guard skips the call in that case, since the writer has already been stopped.
dataPort.shutdownInput();
} catch (IOException e) {
// we don't care, we are shutting down anyway
if (running.get()) {
If it's not running, this has already been called, no need to go into the logic.
// if already not running, an IOE is not unreasonable in a transition state
if (running.get()) {
    this.connection.handleCommunicationIssue(io);
}
If it is already not running, an IOException is not unreasonable in a transition state.
@@ -75,6 +75,27 @@ class NatsConnectionWriter implements Runnable {
    reconnectBufferSize = options.getReconnectBufferSize();
}

NatsConnectionWriter(NatsConnectionWriter sourceWriter) {
Added to replace an existing writer while keeping its relevant state, including the message queue.
this.outgoing.filter((msg) ->
    msg.isProtocol() &&
    (msg.protocolBab.equals(OP_PING_BYTES) || msg.protocolBab.equals(OP_PONG_BYTES)));
if (running.get()) {
If it's not running, there is no need to go into the stop logic.
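The guard pattern used in these spots can be sketched in isolation. `Worker` and its `cleanups` counter are hypothetical; the sketch uses the same `running.get()` check as the diff and only illustrates why a second `stop()` becomes a no-op.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class StopGuardSketch {
    /** Hypothetical worker whose stop logic should run at most once per start. */
    static final class Worker {
        private final AtomicBoolean running = new AtomicBoolean(false);
        int cleanups = 0; // counts how often the real stop logic ran

        void start() {
            running.set(true);
        }

        boolean isRunning() {
            return running.get();
        }

        void stop() {
            // Same shape as the guard in the diff: skip everything if already stopped.
            if (running.get()) {
                running.set(false);
                cleanups++; // stands in for closing ports, filtering queues, etc.
            }
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.start();
        worker.stop();
        worker.stop(); // second call does nothing
        System.out.println(worker.cleanups); // prints 1
    }
}
```

If two threads could race on `stop()`, `running.compareAndSet(true, false)` would be the stricter variant of the same check; whether that matters here depends on how the callers are serialized.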
@@ -36,12 +36,16 @@ class WriteWatcherTask extends TimerTask {
public void run() {
    // if now is after when it was supposed to be done by
    if (System.nanoTime() > writeMustBeDoneBy) {
        writeWatcherTimer.cancel(); // we don't need to repeat this
        connection.executeCallback((c, el) -> el.socketWriteTimeout(c));
This is the callback to the new ErrorListener method.
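The overall shape of such a watcher can be sketched as below. `WatcherTask` and its `onTimeout` callback are hypothetical simplifications: the real task calls into the connection and its ErrorListener, whereas this one just runs a `Runnable`. The deadline check is written as a subtraction, which is the overflow-safe comparison the `System.nanoTime` documentation recommends.

```java
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

class WriteWatcherSketch {
    /** Hypothetical watcher: fires a callback once a write deadline has passed. */
    static final class WatcherTask extends TimerTask {
        private final long writeMustBeDoneBy; // deadline on the System.nanoTime() scale
        private final Runnable onTimeout;     // stands in for the listener callback

        WatcherTask(long writeMustBeDoneBy, Runnable onTimeout) {
            this.writeMustBeDoneBy = writeMustBeDoneBy;
            this.onTimeout = onTimeout;
        }

        @Override
        public void run() {
            // Subtraction form stays correct even if nanoTime values wrap around.
            if (System.nanoTime() - writeMustBeDoneBy > 0) {
                cancel();        // no need to keep checking after a timeout
                onTimeout.run(); // notify, e.g. report a socket write timeout
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger timeouts = new AtomicInteger();
        long alreadyPassed = System.nanoTime() - 1;
        new WatcherTask(alreadyPassed, timeouts::incrementAndGet).run();
        System.out.println(timeouts.get()); // prints 1
    }
}
```

In real use the task would be scheduled on a `java.util.Timer` at a fixed period rather than run directly.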
}
catch (InterruptedException e) {
// do nothing
catch (IOException ignore) {}
Try closing the output. This will most likely fail since the write was blocked, hence the catch, but it could also have been freed up in the meantime. Close it anyway, since its state is bad.
Thank you for placing the comments throughout, it helped a lot!
Pretty much only have one comment, but am happy to approve afterward.
updateStatus(Status.DISCONNECTED);
reader.stop();
writer.stop();
writer = new NatsConnectionWriter(writer);
This comment mentions making both reader and writer non-final to allow swapping them on forceReconnect.
https://github.com/nats-io/nats.java/pull/1100/files#r1532862943
Is it correct that only the writer is overwritten here, and not reader? Or is that done as part of the reconnect() call?
Yeah, at one point I was going to swap both the reader and writer. I'll change the reader back; I can always swap it in the future if needed.
}

@Override
public void afterContruct(Options options) {
Just wondering, the interface was not changed, but could this be a small typo for afterConstruct?
Yikes. This was already released as public, but I'm fixing it anyway as a bug since I'm 99.9999% sure no one is even aware of it; it's essentially internal.
LGTM, nice work!