Skip to content

Commit

Permalink
[#3214] Decouple readiness checks from verticle deployment
Browse files Browse the repository at this point in the history
The Kafka based clients have been adapted to provide a readiness check
that is independent from the clients' startup. This is to better
resemble the behavior of Hono's other components and also to not block
the deployment of verticles during startup of Hono services and
adapters.

Fixes #3214

Signed-off-by: Kai Hudalla <[email protected]>
  • Loading branch information
sophokles73 authored May 5, 2022
1 parent d7343ba commit 02b0a91
Show file tree
Hide file tree
Showing 32 changed files with 1,476 additions and 886 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -782,20 +782,20 @@ public final Map<String, Object> getDownstreamMessageProperties(final TelemetryE
@Override
public void registerReadinessChecks(final HealthCheckHandler handler) {

if (commandConsumerFactory instanceof ServiceClient) {
((ServiceClient) commandConsumerFactory).registerReadinessChecks(handler);
if (commandConsumerFactory instanceof ServiceClient client) {
client.registerReadinessChecks(handler);
}
if (tenantClient instanceof ServiceClient) {
((ServiceClient) tenantClient).registerReadinessChecks(handler);
if (tenantClient instanceof ServiceClient client) {
client.registerReadinessChecks(handler);
}
if (registrationClient instanceof ServiceClient) {
((ServiceClient) registrationClient).registerReadinessChecks(handler);
if (registrationClient instanceof ServiceClient client) {
client.registerReadinessChecks(handler);
}
if (credentialsClient instanceof ServiceClient) {
((ServiceClient) credentialsClient).registerReadinessChecks(handler);
if (credentialsClient instanceof ServiceClient client) {
client.registerReadinessChecks(handler);
}
if (commandRouterClient instanceof ServiceClient) {
((ServiceClient) commandRouterClient).registerReadinessChecks(handler);
if (commandRouterClient instanceof ServiceClient client) {
client.registerReadinessChecks(handler);
}
messagingClientProviders.registerReadinessChecks(handler);
}
Expand All @@ -810,20 +810,20 @@ public void registerReadinessChecks(final HealthCheckHandler handler) {
public void registerLivenessChecks(final HealthCheckHandler handler) {
registerEventLoopBlockedCheck(handler);

if (commandConsumerFactory instanceof ServiceClient) {
((ServiceClient) commandConsumerFactory).registerLivenessChecks(handler);
if (commandConsumerFactory instanceof ServiceClient client) {
client.registerLivenessChecks(handler);
}
if (tenantClient instanceof ServiceClient) {
((ServiceClient) tenantClient).registerLivenessChecks(handler);
if (tenantClient instanceof ServiceClient client) {
client.registerLivenessChecks(handler);
}
if (registrationClient instanceof ServiceClient) {
((ServiceClient) registrationClient).registerLivenessChecks(handler);
if (registrationClient instanceof ServiceClient client) {
client.registerLivenessChecks(handler);
}
if (credentialsClient instanceof ServiceClient) {
((ServiceClient) credentialsClient).registerLivenessChecks(handler);
if (credentialsClient instanceof ServiceClient client) {
client.registerLivenessChecks(handler);
}
if (commandRouterClient instanceof ServiceClient) {
((ServiceClient) commandRouterClient).registerLivenessChecks(handler);
if (commandRouterClient instanceof ServiceClient client) {
client.registerLivenessChecks(handler);
}
messagingClientProviders.registerLivenessChecks(handler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@
import org.eclipse.hono.client.telemetry.kafka.KafkaBasedEventSender;
import org.eclipse.hono.client.telemetry.kafka.KafkaBasedTelemetrySender;
import org.eclipse.hono.client.util.MessagingClientProvider;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.notification.NotificationConstants;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.service.cache.Caches;
import org.eclipse.hono.service.quarkus.AbstractServiceApplication;
import org.eclipse.hono.service.util.ServiceClientAdapter;
import org.eclipse.hono.util.CredentialsObject;
import org.eclipse.hono.util.CredentialsResult;
import org.eclipse.hono.util.MessagingType;
Expand Down Expand Up @@ -322,10 +324,10 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {

final DeviceRegistrationClient registrationClient = registrationClient();

final MessagingClientProvider<TelemetrySender> telemetrySenderProvider = new MessagingClientProvider<>();
final MessagingClientProvider<EventSender> eventSenderProvider = new MessagingClientProvider<>();
final MessagingClientProvider<CommandResponseSender> commandResponseSenderProvider = new MessagingClientProvider<>();
final KafkaClientMetricsSupport kafkaClientMetricsSupport = kafkaClientMetricsSupport(kafkaMetricsOptions);
final var telemetrySenderProvider = new MessagingClientProvider<TelemetrySender>();
final var eventSenderProvider = new MessagingClientProvider<EventSender>();
final var commandResponseSenderProvider = new MessagingClientProvider<CommandResponseSender>();
final var kafkaClientMetricsSupport = kafkaClientMetricsSupport(kafkaMetricsOptions);
final var tenantClient = tenantClient();

if (kafkaEventConfig.isConfigured()) {
Expand All @@ -334,10 +336,18 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
final KafkaProducerFactory<String, Buffer> factory = CachingKafkaProducerFactory.sharedFactory(vertx);
factory.setMetricsSupport(kafkaClientMetricsSupport);

telemetrySenderProvider.setClient(new KafkaBasedTelemetrySender(vertx, factory, kafkaTelemetryConfig,
protocolAdapterProperties.isDefaultsEnabled(), tracer));
eventSenderProvider.setClient(new KafkaBasedEventSender(vertx, factory, kafkaEventConfig,
protocolAdapterProperties.isDefaultsEnabled(), tracer));
telemetrySenderProvider.setClient(new KafkaBasedTelemetrySender(
vertx,
factory,
kafkaTelemetryConfig,
protocolAdapterProperties.isDefaultsEnabled(),
tracer));
eventSenderProvider.setClient(new KafkaBasedEventSender(
vertx,
factory,
kafkaEventConfig,
protocolAdapterProperties.isDefaultsEnabled(),
tracer));
commandResponseSenderProvider.setClient(new KafkaBasedCommandResponseSender(
vertx,
factory,
Expand All @@ -354,8 +364,10 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
protocolAdapterProperties.isJmsVendorPropsEnabled()));
}

final MessagingClientProviders messagingClientProviders = new MessagingClientProviders(telemetrySenderProvider,
eventSenderProvider, commandResponseSenderProvider);
final MessagingClientProviders messagingClientProviders = new MessagingClientProviders(
telemetrySenderProvider,
eventSenderProvider,
commandResponseSenderProvider);

if (commandRouterConfig.isHostConfigured()) {
final CommandRouterClient commandRouterClient = commandRouterClient();
Expand Down Expand Up @@ -609,6 +621,9 @@ public NotificationReceiver notificationReceiver() {
notificationConfig.setServerRole("Notification");
notificationReceiver = new ProtonBasedNotificationReceiver(HonoConnection.newConnection(vertx, notificationConfig, tracer));
}
if (notificationReceiver instanceof ServiceClient serviceClient) {
healthCheckServer.registerHealthCheckResources(ServiceClientAdapter.forClient(serviceClient));
}
final var notificationSender = NotificationEventBusSupport.getNotificationSender(vertx);
NotificationConstants.DEVICE_REGISTRY_NOTIFICATION_TYPES.forEach(notificationType ->
notificationReceiver.registerConsumer(notificationType, notificationSender::handle));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
Expand Down Expand Up @@ -106,7 +107,8 @@ public KafkaApplicationClientImpl(
public Future<Void> stop() {
// stop created consumers
final List<Future> closeKafkaClientsTracker = consumersToCloseOnStop.stream()
.map(MessageConsumer::close).collect(Collectors.toList());
.map(MessageConsumer::close)
.collect(Collectors.toList());
// add command sender related clients
closeKafkaClientsTracker.add(super.stop());
return CompositeFuture.join(closeKafkaClientsTracker)
Expand Down Expand Up @@ -165,12 +167,15 @@ private Future<MessageConsumer> createKafkaBasedDownstreamMessageConsumer(
final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler = record -> {
messageHandler.handle(new KafkaDownstreamMessage(record));
};
final Promise<Void> readyTracker = Promise.promise();
final HonoKafkaConsumer<Buffer> consumer = new HonoKafkaConsumer<>(vertx, Set.of(topic), recordHandler,
consumerConfig.getConsumerConfig(type.toString()));
consumer.setPollTimeout(Duration.ofMillis(consumerConfig.getPollTimeout()));
consumer.addOnKafkaConsumerReadyHandler(readyTracker);
Optional.ofNullable(kafkaConsumerSupplier)
.ifPresent(consumer::setKafkaConsumerSupplier);
return consumer.start()
.compose(ok -> readyTracker.future())
.map(v -> (MessageConsumer) new MessageConsumer() {
@Override
public Future<Void> close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,18 @@ public KafkaBasedCommandSender(
@SuppressWarnings("rawtypes")
@Override
public Future<Void> stop() {
// assemble futures for closing the command response consumers
final List<Future> stopKafkaClientsTracker = commandResponseConsumers.values().stream()
.map(HonoKafkaConsumer::stop)
.collect(Collectors.toList());
commandResponseConsumers.clear();

// add future for closing command producer
stopKafkaClientsTracker.add(super.stop());

return CompositeFuture.join(stopKafkaClientsTracker)
return lifecycleStatus.runStopAttempt(() -> {
// assemble futures for closing the command response consumers
final List<Future> stopConsumersTracker = commandResponseConsumers.values().stream()
.map(HonoKafkaConsumer::stop)
.collect(Collectors.toList());
commandResponseConsumers.clear();
return CompositeFuture.join(
stopProducer(),
CompositeFuture.join(stopConsumersTracker))
.mapEmpty();
});
}

/**
Expand Down Expand Up @@ -424,11 +425,14 @@ private Future<Void> subscribeForCommandResponse(final String tenantId, final Sp
getCommandResponseHandler(tenantId)
.handle(new KafkaDownstreamMessage(record));
};
final Promise<Void> readyTracker = Promise.promise();
final HonoKafkaConsumer<Buffer> consumer = new HonoKafkaConsumer<>(vertx, Set.of(topic), recordHandler, consumerConfig);
consumer.setPollTimeout(Duration.ofMillis(this.consumerConfig.getPollTimeout()));
Optional.ofNullable(kafkaConsumerSupplier)
.ifPresent(consumer::setKafkaConsumerSupplier);
consumer.addOnKafkaConsumerReadyHandler(readyTracker);
return consumer.start()
.compose(ok -> readyTracker.future())
.recover(error -> {
LOGGER.debug("error creating command response consumer for tenant [{}]", tenantId, error);
TracingHelper.logError(span, "error creating command response consumer", error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.junit5.Timeout;
Expand All @@ -57,6 +58,7 @@
public class KafkaApplicationClientImplTest {

private static final String PARAMETERIZED_TEST_NAME_PATTERN = "{displayName} [{index}]; parameters: {argumentsWithNames}";
private Promise<Void> clientReadyTracker;
private KafkaApplicationClientImpl client;
private KafkaMockConsumer<String, Buffer> mockConsumer;
private String tenantId;
Expand Down Expand Up @@ -94,8 +96,10 @@ void setUp(final Vertx vertx) {
producerConfig.setCommonClientConfig(commonConfig);
producerConfig.setProducerConfig(Map.of("client.id", "application-test-sender"));

clientReadyTracker = Promise.promise();
client = new KafkaApplicationClientImpl(vertx, consumerConfig, producerFactory, producerConfig);
client.setKafkaConsumerFactory(() -> mockConsumer);
client.addOnKafkaProducerReadyHandler(clientReadyTracker);
}

/**
Expand All @@ -119,11 +123,13 @@ void shutDown(final VertxTestContext context) {
public void testCreateConsumer(final Type msgType, final VertxTestContext ctx) {

// Verify that the consumer for the given tenant and the message type is successfully created
createConsumer(tenantId, msgType, m -> {}, t -> {})
.onComplete(ctx.succeeding(consumer -> ctx.verify(() -> {
assertThat(consumer).isNotNull();
ctx.completeNow();
})));
client.start()
.compose(ok -> clientReadyTracker.future())
.compose(ok -> createConsumer(tenantId, msgType, m -> {}, t -> {}))
.onComplete(ctx.succeeding(consumer -> ctx.verify(() -> {
assertThat(consumer).isNotNull();
ctx.completeNow();
})));
}

/**
Expand All @@ -138,14 +144,15 @@ public void testCreateConsumer(final Type msgType, final VertxTestContext ctx) {
public void testStopClosesConsumer(final Type msgType, final VertxTestContext ctx) {

// Verify that the consumer for the given tenant and the message type is successfully created
createConsumer(tenantId, msgType, m -> {}, t -> {})
// stop the application client
.compose(c -> client.stop())
.onComplete(ctx.succeeding(v -> ctx.verify(() -> {
// verify that the Kafka mock consumer is closed
assertThat(mockConsumer.closed()).isTrue();
ctx.completeNow();
})));
client.start()
.compose(ok -> clientReadyTracker.future())
.compose(ok -> createConsumer(tenantId, msgType, m -> {}, t -> {}))
// stop the application client
.compose(c -> client.stop())
.onComplete(ctx.succeeding(v -> {
ctx.verify(() -> assertThat(mockConsumer.closed()).isTrue());
ctx.completeNow();
}));
}

/**
Expand All @@ -157,19 +164,24 @@ public void testStopClosesConsumer(final Type msgType, final VertxTestContext ct
@ParameterizedTest(name = PARAMETERIZED_TEST_NAME_PATTERN)
@MethodSource("messageTypes")
public void testCloseConsumer(final Type msgType, final VertxTestContext ctx) {

// Given a consumer for the given tenant and the message type
createConsumer(tenantId, msgType, m -> {}, t -> {})
// When the message consumer is closed
.compose(MessageConsumer::close)
.onComplete(ctx.succeeding(consumer -> ctx.verify(() -> {
// verify that the Kafka mock consumer is also closed
assertThat(mockConsumer.closed()).isTrue();
ctx.completeNow();
})));
client.start()
.compose(ok -> clientReadyTracker.future())
.compose(ok -> createConsumer(tenantId, msgType, m -> {}, t -> {}))
// When the message consumer is closed
.compose(MessageConsumer::close)
.onComplete(ctx.succeeding(ok -> {
ctx.verify(() -> assertThat(mockConsumer.closed()).isTrue());
ctx.completeNow();
}));
}

private Future<MessageConsumer> createConsumer(final String tenantId, final Type type,
final Handler<DownstreamMessage<KafkaMessageContext>> msgHandler, final Handler<Throwable> closeHandler) {
private Future<MessageConsumer> createConsumer(
final String tenantId,
final Type type,
final Handler<DownstreamMessage<KafkaMessageContext>> msgHandler,
final Handler<Throwable> closeHandler) {

final String topic = new HonoTopic(type, tenantId).toString();
final TopicPartition topicPartition = new TopicPartition(topic, 0);
Expand Down
Loading

0 comments on commit 02b0a91

Please sign in to comment.