diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java index efbfcf83..4b80b8e4 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2025. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,7 @@ public class AsyncFetcher implements Fetcher { private final ExecutorService executorService; private final boolean requirePoolShutdown; private final Set> activeFetchers = ConcurrentHashMap.newKeySet(); + private final OffsetCommitType offsetCommitType; /** * Instantiate a Builder to be able to create a {@link AsyncFetcher}. @@ -85,6 +86,7 @@ protected AsyncFetcher(Builder builder) { this.pollTimeout = builder.pollTimeout; this.executorService = builder.executorService; this.requirePoolShutdown = builder.requirePoolShutdown; + this.offsetCommitType = builder.offsetCommitType; } /** @@ -92,10 +94,10 @@ protected AsyncFetcher(Builder builder) { */ @Override public Registration poll(Consumer consumer, - RecordConverter recordConverter, - EventConsumer eventConsumer) { + RecordConverter recordConverter, + EventConsumer eventConsumer) { return poll(consumer, recordConverter, eventConsumer, - e -> logger.warn("Error from fetching thread, should be handled properly", e)); + e -> logger.warn("Error from fetching thread, should be handled properly", e)); } /** @@ -103,14 +105,15 @@ public Registration poll(Consumer consumer, */ @Override public Registration poll(Consumer consumer, RecordConverter recordConverter, - EventConsumer eventConsumer, RuntimeErrorHandler runtimeErrorHandler) { + EventConsumer eventConsumer, RuntimeErrorHandler runtimeErrorHandler) { FetchEventsTask fetcherTask = new FetchEventsTask<>(consumer, - pollTimeout, - recordConverter, - eventConsumer, - activeFetchers::remove, - runtimeErrorHandler); + pollTimeout, + recordConverter, + eventConsumer, + activeFetchers::remove, + runtimeErrorHandler, + offsetCommitType); activeFetchers.add(fetcherTask); executorService.execute(fetcherTask); @@ -148,6 +151,7 @@ public static final class Builder { private Duration pollTimeout = Duration.ofMillis(DEFAULT_POLL_TIMEOUT_MS); private ExecutorService executorService = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher")); private boolean requirePoolShutdown = true; + private OffsetCommitType offsetCommitType = OffsetCommitType.AUTO; /** * Set the {@code pollTimeout} in milliseconds for polling records from a topic. Defaults to {@code 5000} @@ -158,11 +162,36 @@ public static final class Builder { */ public Builder pollTimeout(long timeoutMillis) { assertThat(timeoutMillis, timeout -> timeout > 0, - "The poll timeout may not be negative [" + timeoutMillis + "]"); + "The poll timeout may not be negative [" + timeoutMillis + "]"); this.pollTimeout = Duration.ofMillis(timeoutMillis); return this; } + /** + * Sets the {@code offsetCommitType} defining how the {@link FetchEventsTask} will commit offsets during + * processing of events. + *

+ * Options are:
+ * <ul>
+ *     <li>{@link OffsetCommitType#AUTO} - let the Kafka consumer commit offsets automatically in the background.</li>
+ *     <li>{@link OffsetCommitType#COMMIT_SYNC} - let the Kafka consumer commit offsets synchronously after processing.</li>
+ *     <li>{@link OffsetCommitType#COMMIT_ASYNC} - let the Kafka consumer commit offsets asynchronously after processing.</li>
+ * </ul>
+ * <p>
+ * Defaults to {@code OffsetCommitType#AUTO}, meaning the offset commit task happens in the background. + * + * @param offsetCommitType {@link OffsetCommitType} enum to specify the offset commit type + * @return the current Builder instance, for fluent interfacing + */ + public AsyncFetcher.Builder offsetCommitType(OffsetCommitType offsetCommitType) { + assertNonNull(offsetCommitType, "OffsetCommitType may not be null"); + this.offsetCommitType = offsetCommitType; + return this; + } + /** * Sets the {@link ExecutorService} used to start {@link FetchEventsTask} instances to poll for Kafka consumer * records. Note that the {@code executorService} should contain sufficient threads to run the necessary fetcher diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java index 67f09e5a..4fa7cd1a 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2025. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ class FetchEventsTask implements Runnable { private final EventConsumer eventConsumer; private final java.util.function.Consumer> closeHandler; private final RuntimeErrorHandler runtimeErrorHandler; + private final OffsetCommitType offsetCommitType; /** @@ -72,7 +73,8 @@ class FetchEventsTask implements Runnable { RecordConverter recordConverter, EventConsumer eventConsumer, java.util.function.Consumer> closeHandler, - RuntimeErrorHandler runtimeErrorHandler) { + RuntimeErrorHandler runtimeErrorHandler, + OffsetCommitType offsetCommitType) { this.consumer = nonNull(consumer, () -> "Consumer may not be null"); assertThat(pollTimeout, time -> !time.isNegative(), "The poll timeout may not be negative [" + pollTimeout + "]"); @@ -81,6 +83,7 @@ class FetchEventsTask implements Runnable { this.eventConsumer = eventConsumer; this.closeHandler = getOrDefault(closeHandler, task -> { /* no-op */ }); this.runtimeErrorHandler = nonNull(runtimeErrorHandler, () -> "Runtime error handler may not be null"); + this.offsetCommitType = offsetCommitType; } @Override @@ -109,6 +112,7 @@ private void processRecords(ConsumerRecords records) { try { if (!convertedMessages.isEmpty()) { eventConsumer.consume(convertedMessages); + handleOffsetsIfRequired(convertedMessages.size()); } } catch (InterruptedException e) { logger.debug("Event Consumer thread was interrupted. Shutting down", e); @@ -117,6 +121,18 @@ private void processRecords(ConsumerRecords records) { } } + private void handleOffsetsIfRequired(int messageCount) { + if (OffsetCommitType.COMMIT_SYNC == offsetCommitType) { + consumer.commitSync(); + logger.debug("Committed offsets synchronously for {} messages", messageCount); + } else if (OffsetCommitType.COMMIT_ASYNC == offsetCommitType) { + consumer.commitAsync(); + logger.debug("Committed offsets asynchronously for {} messages", messageCount); + } else { + logger.debug("Consumer will commit offsets in background"); + } + } + /** * Shutdown this {@link FetchEventsTask}. 
*/ diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java new file mode 100644 index 00000000..19c49dc2 --- /dev/null +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2010-2025. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.kafka.eventhandling.consumer; + +import org.apache.kafka.clients.consumer.KafkaConsumer; + +/** + * Enum to define how the consumer will handle committing offsets. + * + * @author Bradley Skuse + * @since 4.11.1 + */ +public enum OffsetCommitType { + + /** + * Kafka consumer will commit offsets automatically in the background. + */ + AUTO, + + /** + * Kafka consumer will commit offsets asynchronously after processing + * + * @see KafkaConsumer#commitAsync() + */ + COMMIT_ASYNC, + + /** + * Kafka consumer will commit offsets synchronously after processing + * + * @see KafkaConsumer#commitSync() + */ + COMMIT_SYNC +} diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java index 05451ba1..aa9f3797 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2025. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -76,7 +76,8 @@ void setUp() { testRecordConverter, testEventConsumer, testCloseHandler, - runtimeErrorHandler + runtimeErrorHandler, + OffsetCommitType.AUTO ); when(testConsumer.poll(testPollTimeout)).thenReturn(consumerRecords); @@ -95,7 +96,8 @@ void testTaskConstructionWithInvalidConsumerShouldThrowException() { testRecordConverter, testEventConsumer, testCloseHandler, - runtimeErrorHandler + runtimeErrorHandler, + OffsetCommitType.AUTO ) ); } @@ -111,7 +113,8 @@ void testTaskConstructionWithNegativeTimeoutShouldThrowException() { testRecordConverter, testEventConsumer, testCloseHandler, - runtimeErrorHandler + runtimeErrorHandler, + OffsetCommitType.AUTO ) ); } @@ -127,7 +130,8 @@ void testFetchEventsTaskInterruptionClosesAsExpected() { testRecordConverter, failingEventConsumer, testCloseHandler, - runtimeErrorHandler + runtimeErrorHandler, + OffsetCommitType.AUTO ); Thread taskRunner = new Thread(testSubjectWithFailingEventConsumer); @@ -148,6 +152,8 @@ void testFetchEventsTaskPollsConvertsAndConsumesRecords() throws InterruptedExce verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout); verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords); verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage)); + verify(testConsumer, never()).commitSync(); + verify(testConsumer, never()).commitAsync(); taskRunner.interrupt(); } @@ -163,6 +169,8 @@ void testFetchEventsTaskPollsDoesNotCallEventConsumerForZeroConvertedEvents() { verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout); verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords); verifyNoMoreInteractions(testEventConsumer); + verify(testConsumer, never()).commitSync(); + verify(testConsumer, never()).commitAsync(); taskRunner.interrupt(); } @@ -177,4 +185,54 @@ void testCloseCallsProvidedCloseHandler() { assertWithin(Duration.ofMillis(TIMEOUT_MILLIS), () -> assertTrue(expectedToBeClosed.get())); verify(testConsumer, timeout(TIMEOUT_MILLIS)).close(); } + + @Test + void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsetsSync() throws InterruptedException { + VerificationMode atLeastOnceWithTimeout = timeout(TIMEOUT_MILLIS).atLeastOnce(); + + FetchEventsTask testSubjectWithCommitOffsets = new FetchEventsTask<>( + testConsumer, + testPollTimeout, + testRecordConverter, + testEventConsumer, + testCloseHandler, + runtimeErrorHandler, + OffsetCommitType.COMMIT_SYNC + ); + + Thread taskRunner = new Thread(testSubjectWithCommitOffsets); + taskRunner.start(); + + verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout); + verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords); + verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage)); + verify(testConsumer, atLeastOnceWithTimeout).commitSync(); + + taskRunner.interrupt(); + } + + @Test + void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsetsAsync() throws InterruptedException { + VerificationMode atLeastOnceWithTimeout = timeout(TIMEOUT_MILLIS).atLeastOnce(); + + FetchEventsTask testSubjectWithCommitOffsets = new FetchEventsTask<>( + testConsumer, + testPollTimeout, + testRecordConverter, + testEventConsumer, + testCloseHandler, + runtimeErrorHandler, + OffsetCommitType.COMMIT_ASYNC + ); + + Thread taskRunner = new Thread(testSubjectWithCommitOffsets); + taskRunner.start(); + + verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout); + verify(testRecordConverter, 
atLeastOnceWithTimeout).convert(consumerRecords); + verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage)); + verify(testConsumer, atLeastOnceWithTimeout).commitAsync(); + + taskRunner.interrupt(); + } }
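
For reviewers, a minimal usage sketch of the new builder option follows. The generic type parameters, the KafkaEventMessage element type, and the rest of the builder chain are illustrative assumptions; only offsetCommitType(...) and the OffsetCommitType constants are introduced by this change.

    // Sketch only: generic parameters and surrounding configuration are assumed for illustration.
    AsyncFetcher<String, byte[], KafkaEventMessage> fetcher =
            AsyncFetcher.<String, byte[], KafkaEventMessage>builder()
                        .pollTimeout(5_000)                              // keep the 5s default poll timeout
                        .offsetCommitType(OffsetCommitType.COMMIT_SYNC)  // commit offsets after each consumed batch
                        .build();

COMMIT_SYNC blocks the fetch thread until the broker acknowledges the commit, trading some throughput for a stronger guarantee that already processed records are not re-polled after a restart. COMMIT_ASYNC issues the commit without blocking, accepting a larger window of possible re-delivery. AUTO preserves the existing behaviour and leaves committing to the consumer's enable.auto.commit mechanism.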