From f4346688f6eb2a44ffcaef285854caa8d7509f44 Mon Sep 17 00:00:00 2001 From: Brad Skuse <998188+bradskuse@users.noreply.github.com> Date: Tue, 18 Mar 2025 16:52:54 +0800 Subject: [PATCH 1/3] Add option to commit offsets --- .../eventhandling/consumer/AsyncFetcher.java | 19 ++++++++- .../consumer/FetchEventsTask.java | 9 ++++- .../consumer/FetchEventsTaskTest.java | 39 +++++++++++++++++-- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java index efbfcf83..8d0b0a04 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java @@ -56,6 +56,7 @@ public class AsyncFetcher implements Fetcher { private final ExecutorService executorService; private final boolean requirePoolShutdown; private final Set> activeFetchers = ConcurrentHashMap.newKeySet(); + private final boolean commitOnProcessed; /** * Instantiate a Builder to be able to create a {@link AsyncFetcher}. @@ -85,6 +86,7 @@ protected AsyncFetcher(Builder builder) { this.pollTimeout = builder.pollTimeout; this.executorService = builder.executorService; this.requirePoolShutdown = builder.requirePoolShutdown; + this.commitOnProcessed = builder.commitOnProcessed; } /** @@ -110,7 +112,8 @@ public Registration poll(Consumer consumer, RecordConverter recor recordConverter, eventConsumer, activeFetchers::remove, - runtimeErrorHandler); + runtimeErrorHandler, + commitOnProcessed); activeFetchers.add(fetcherTask); executorService.execute(fetcherTask); @@ -148,6 +151,7 @@ public static final class Builder { private Duration pollTimeout = Duration.ofMillis(DEFAULT_POLL_TIMEOUT_MS); private ExecutorService executorService = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher")); private boolean requirePoolShutdown = true; + private boolean commitOnProcessed = false; /** * Set the {@code pollTimeout} in milliseconds for polling records from a topic. Defaults to {@code 5000} @@ -163,6 +167,19 @@ public Builder pollTimeout(long timeoutMillis) { return this; } + /** + * Set the {@code commitOnProcessed}, if false expects auto commit to be used, + * if true will commit the offsets after processing all the records in the poll. + * Defaults to {@code false} + * + * @param commitOnProcessed boolean flag to commit the offsets after processing all the records in the poll + * @return the current Builder instance, for fluent interfacing + */ + public AsyncFetcher.Builder commitOnProcessed(boolean commitOnProcessed) { + this.commitOnProcessed = commitOnProcessed; + return this; + } + /** * Sets the {@link ExecutorService} used to start {@link FetchEventsTask} instances to poll for Kafka consumer * records. Note that the {@code executorService} should contain sufficient threads to run the necessary fetcher diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java index 67f09e5a..4aad3e31 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java @@ -52,6 +52,7 @@ class FetchEventsTask implements Runnable { private final EventConsumer eventConsumer; private final java.util.function.Consumer> closeHandler; private final RuntimeErrorHandler runtimeErrorHandler; + private final boolean commitOnProcessed; /** @@ -72,7 +73,8 @@ class FetchEventsTask implements Runnable { RecordConverter recordConverter, EventConsumer eventConsumer, java.util.function.Consumer> closeHandler, - RuntimeErrorHandler runtimeErrorHandler) { + RuntimeErrorHandler runtimeErrorHandler, + boolean commitOnProcessed) { this.consumer = nonNull(consumer, () -> "Consumer may not be null"); assertThat(pollTimeout, time -> !time.isNegative(), "The poll timeout may not be negative [" + pollTimeout + "]"); @@ -81,6 +83,7 @@ class FetchEventsTask implements Runnable { this.eventConsumer = eventConsumer; this.closeHandler = getOrDefault(closeHandler, task -> { /* no-op */ }); this.runtimeErrorHandler = nonNull(runtimeErrorHandler, () -> "Runtime error handler may not be null"); + this.commitOnProcessed = commitOnProcessed; } @Override @@ -109,6 +112,10 @@ private void processRecords(ConsumerRecords records) { try { if (!convertedMessages.isEmpty()) { eventConsumer.consume(convertedMessages); + if (commitOnProcessed) { + consumer.commitSync(); + logger.debug("Committed offsets after processing {} messages", convertedMessages.size()); + } } } catch (InterruptedException e) { logger.debug("Event Consumer thread was interrupted. Shutting down", e); diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java index 05451ba1..e41183e0 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java @@ -76,7 +76,8 @@ void setUp() { testRecordConverter, testEventConsumer, testCloseHandler, - runtimeErrorHandler + runtimeErrorHandler, + false ); when(testConsumer.poll(testPollTimeout)).thenReturn(consumerRecords); @@ -95,7 +96,8 @@ void testTaskConstructionWithInvalidConsumerShouldThrowException() { testRecordConverter, testEventConsumer, testCloseHandler, - runtimeErrorHandler + runtimeErrorHandler, + false ) ); } @@ -111,7 +113,8 @@ void testTaskConstructionWithNegativeTimeoutShouldThrowException() { testRecordConverter, testEventConsumer, testCloseHandler, - runtimeErrorHandler + runtimeErrorHandler, + false ) ); } @@ -127,7 +130,8 @@ void testFetchEventsTaskInterruptionClosesAsExpected() { testRecordConverter, failingEventConsumer, testCloseHandler, - runtimeErrorHandler + runtimeErrorHandler, + false ); Thread taskRunner = new Thread(testSubjectWithFailingEventConsumer); @@ -148,6 +152,7 @@ void testFetchEventsTaskPollsConvertsAndConsumesRecords() throws InterruptedExce verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout); verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords); verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage)); + verify(testConsumer, never()).commitSync(); taskRunner.interrupt(); } @@ -163,6 +168,7 @@ void testFetchEventsTaskPollsDoesNotCallEventConsumerForZeroConvertedEvents() { verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout); verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords); verifyNoMoreInteractions(testEventConsumer); + verify(testConsumer, never()).commitSync(); taskRunner.interrupt(); } @@ -177,4 +183,29 @@ void testCloseCallsProvidedCloseHandler() { assertWithin(Duration.ofMillis(TIMEOUT_MILLIS), () -> assertTrue(expectedToBeClosed.get())); verify(testConsumer, timeout(TIMEOUT_MILLIS)).close(); } + + @Test + void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsets() throws InterruptedException { + VerificationMode atLeastOnceWithTimeout = timeout(TIMEOUT_MILLIS).atLeastOnce(); + + FetchEventsTask testSubjectWithCommitOffsets = new FetchEventsTask<>( + testConsumer, + testPollTimeout, + testRecordConverter, + testEventConsumer, + testCloseHandler, + runtimeErrorHandler, + true + ); + + Thread taskRunner = new Thread(testSubjectWithCommitOffsets); + taskRunner.start(); + + verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout); + verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords); + verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage)); + verify(testConsumer, atLeastOnceWithTimeout).commitSync(); + + taskRunner.interrupt(); + } } From b8a5b61dd782e92c3797d9a66de8c74cb9b4efea Mon Sep 17 00:00:00 2001 From: Brad Skuse <998188+bradskuse@users.noreply.github.com> Date: Wed, 26 Mar 2025 12:13:19 +0800 Subject: [PATCH 2/3] Add option to commit offsets async --- .../eventhandling/consumer/AsyncFetcher.java | 22 ++++++----- .../consumer/FetchEventsTask.java | 23 +++++++---- .../consumer/OffsetCommitType.java | 10 +++++ .../consumer/FetchEventsTaskTest.java | 39 ++++++++++++++++--- 4 files changed, 71 insertions(+), 23 deletions(-) create mode 100644 kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java index 8d0b0a04..5d5b4e7c 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java @@ -56,7 +56,7 @@ public class AsyncFetcher implements Fetcher { private final ExecutorService executorService; private final boolean requirePoolShutdown; private final Set> activeFetchers = ConcurrentHashMap.newKeySet(); - private final boolean commitOnProcessed; + private final OffsetCommitType offsetCommitType; /** * Instantiate a Builder to be able to create a {@link AsyncFetcher}. @@ -86,7 +86,7 @@ protected AsyncFetcher(Builder builder) { this.pollTimeout = builder.pollTimeout; this.executorService = builder.executorService; this.requirePoolShutdown = builder.requirePoolShutdown; - this.commitOnProcessed = builder.commitOnProcessed; + this.offsetCommitType = builder.offsetCommitType; } /** @@ -113,7 +113,7 @@ public Registration poll(Consumer consumer, RecordConverter recor eventConsumer, activeFetchers::remove, runtimeErrorHandler, - commitOnProcessed); + offsetCommitType); activeFetchers.add(fetcherTask); executorService.execute(fetcherTask); @@ -151,7 +151,7 @@ public static final class Builder { private Duration pollTimeout = Duration.ofMillis(DEFAULT_POLL_TIMEOUT_MS); private ExecutorService executorService = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher")); private boolean requirePoolShutdown = true; - private boolean commitOnProcessed = false; + private OffsetCommitType offsetCommitType = OffsetCommitType.AUTO; /** * Set the {@code pollTimeout} in milliseconds for polling records from a topic. Defaults to {@code 5000} @@ -168,15 +168,17 @@ public Builder pollTimeout(long timeoutMillis) { } /** - * Set the {@code commitOnProcessed}, if false expects auto commit to be used, - * if true will commit the offsets after processing all the records in the poll. - * Defaults to {@code false} + * Set the {@code offsetCommitType}, options are: + * {@link OffsetCommitType#AUTO} let the Kafka consumer commit offsets automatically in background + * {@link OffsetCommitType#COMMIT_SYNC} let the Kafka consumer commit offsets synchronously after processing + * {@link OffsetCommitType#COMMIT_ASYNC} let the Kafka consumer commit offsets asynchronously after processing + * Defaults to {@code OffsetCommitType#AUTO} * - * @param commitOnProcessed boolean flag to commit the offsets after processing all the records in the poll + * @param offsetCommitType {@link OffsetCommitType} enum to specify the offset commit type * @return the current Builder instance, for fluent interfacing */ - public AsyncFetcher.Builder commitOnProcessed(boolean commitOnProcessed) { - this.commitOnProcessed = commitOnProcessed; + public AsyncFetcher.Builder offsetCommitType(OffsetCommitType offsetCommitType) { + this.offsetCommitType = offsetCommitType; return this; } diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java index 4aad3e31..c97cbe7e 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java @@ -52,7 +52,7 @@ class FetchEventsTask implements Runnable { private final EventConsumer eventConsumer; private final java.util.function.Consumer> closeHandler; private final RuntimeErrorHandler runtimeErrorHandler; - private final boolean commitOnProcessed; + private final OffsetCommitType offsetCommitType; /** @@ -74,7 +74,7 @@ class FetchEventsTask implements Runnable { EventConsumer eventConsumer, java.util.function.Consumer> closeHandler, RuntimeErrorHandler runtimeErrorHandler, - boolean commitOnProcessed) { + OffsetCommitType offsetCommitType) { this.consumer = nonNull(consumer, () -> "Consumer may not be null"); assertThat(pollTimeout, time -> !time.isNegative(), "The poll timeout may not be negative [" + pollTimeout + "]"); @@ -83,7 +83,7 @@ class FetchEventsTask implements Runnable { this.eventConsumer = eventConsumer; this.closeHandler = getOrDefault(closeHandler, task -> { /* no-op */ }); this.runtimeErrorHandler = nonNull(runtimeErrorHandler, () -> "Runtime error handler may not be null"); - this.commitOnProcessed = commitOnProcessed; + this.offsetCommitType = offsetCommitType; } @Override @@ -112,10 +112,7 @@ private void processRecords(ConsumerRecords records) { try { if (!convertedMessages.isEmpty()) { eventConsumer.consume(convertedMessages); - if (commitOnProcessed) { - consumer.commitSync(); - logger.debug("Committed offsets after processing {} messages", convertedMessages.size()); - } + handleOffsetsIfRequired(convertedMessages.size()); } } catch (InterruptedException e) { logger.debug("Event Consumer thread was interrupted. Shutting down", e); @@ -124,6 +121,18 @@ private void processRecords(ConsumerRecords records) { } } + private void handleOffsetsIfRequired(int messageCount) { + if (OffsetCommitType.COMMIT_SYNC == offsetCommitType) { + consumer.commitSync(); + logger.debug("Committed offsets synchronously for {} messages", messageCount); + } else if (OffsetCommitType.COMMIT_ASYNC == offsetCommitType) { + consumer.commitAsync(); + logger.debug("Committed offsets asynchronously for {} messages", messageCount); + } else { + logger.debug("Consumer will commit offsets in background"); + } + } + /** * Shutdown this {@link FetchEventsTask}. */ diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java new file mode 100644 index 00000000..dbc34df1 --- /dev/null +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java @@ -0,0 +1,10 @@ +package org.axonframework.extensions.kafka.eventhandling.consumer; + +/** + * Enum to define how the consumer will handle committing offsets. + */ +public enum OffsetCommitType { + AUTO, + COMMIT_ASYNC, + COMMIT_SYNC +} diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java index e41183e0..10b905c7 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java @@ -77,7 +77,7 @@ void setUp() { testEventConsumer, testCloseHandler, runtimeErrorHandler, - false + OffsetCommitType.AUTO ); when(testConsumer.poll(testPollTimeout)).thenReturn(consumerRecords); @@ -97,7 +97,7 @@ void testTaskConstructionWithInvalidConsumerShouldThrowException() { testEventConsumer, testCloseHandler, runtimeErrorHandler, - false + OffsetCommitType.AUTO ) ); } @@ -114,7 +114,7 @@ void testTaskConstructionWithNegativeTimeoutShouldThrowException() { testEventConsumer, testCloseHandler, runtimeErrorHandler, - false + OffsetCommitType.AUTO ) ); } @@ -131,7 +131,7 @@ void testFetchEventsTaskInterruptionClosesAsExpected() { failingEventConsumer, testCloseHandler, runtimeErrorHandler, - false + OffsetCommitType.AUTO ); Thread taskRunner = new Thread(testSubjectWithFailingEventConsumer); @@ -153,6 +153,7 @@ void testFetchEventsTaskPollsConvertsAndConsumesRecords() throws InterruptedExce verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords); verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage)); verify(testConsumer, never()).commitSync(); + verify(testConsumer, never()).commitAsync(); taskRunner.interrupt(); } @@ -169,6 +170,7 @@ void testFetchEventsTaskPollsDoesNotCallEventConsumerForZeroConvertedEvents() { verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords); verifyNoMoreInteractions(testEventConsumer); verify(testConsumer, never()).commitSync(); + verify(testConsumer, never()).commitAsync(); taskRunner.interrupt(); } @@ -185,7 +187,7 @@ void testCloseCallsProvidedCloseHandler() { } @Test - void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsets() throws InterruptedException { + void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsetsSync() throws InterruptedException { VerificationMode atLeastOnceWithTimeout = timeout(TIMEOUT_MILLIS).atLeastOnce(); FetchEventsTask testSubjectWithCommitOffsets = new FetchEventsTask<>( @@ -195,7 +197,7 @@ void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsets() throws testEventConsumer, testCloseHandler, runtimeErrorHandler, - true + OffsetCommitType.COMMIT_SYNC ); Thread taskRunner = new Thread(testSubjectWithCommitOffsets); @@ -208,4 +210,29 @@ void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsets() throws taskRunner.interrupt(); } + + @Test + void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsetsAsync() throws InterruptedException { + VerificationMode atLeastOnceWithTimeout = timeout(TIMEOUT_MILLIS).atLeastOnce(); + + FetchEventsTask testSubjectWithCommitOffsets = new FetchEventsTask<>( + testConsumer, + testPollTimeout, + testRecordConverter, + testEventConsumer, + testCloseHandler, + runtimeErrorHandler, + OffsetCommitType.COMMIT_ASYNC + ); + + Thread taskRunner = new Thread(testSubjectWithCommitOffsets); + taskRunner.start(); + + verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout); + verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords); + verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage)); + verify(testConsumer, atLeastOnceWithTimeout).commitAsync(); + + taskRunner.interrupt(); + } } From 17486817730fee8a86a86839808fc00b9f5134c4 Mon Sep 17 00:00:00 2001 From: Brad Skuse <998188+bradskuse@users.noreply.github.com> Date: Thu, 24 Apr 2025 15:52:08 +0800 Subject: [PATCH 3/3] Address PR comments --- .../eventhandling/consumer/AsyncFetcher.java | 44 ++++++++++++------- .../consumer/FetchEventsTask.java | 2 +- .../consumer/OffsetCommitType.java | 37 ++++++++++++++++ .../consumer/FetchEventsTaskTest.java | 2 +- 4 files changed, 66 insertions(+), 19 deletions(-) diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java index 5d5b4e7c..4b80b8e4 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2025. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -94,10 +94,10 @@ protected AsyncFetcher(Builder builder) { */ @Override public Registration poll(Consumer consumer, - RecordConverter recordConverter, - EventConsumer eventConsumer) { + RecordConverter recordConverter, + EventConsumer eventConsumer) { return poll(consumer, recordConverter, eventConsumer, - e -> logger.warn("Error from fetching thread, should be handled properly", e)); + e -> logger.warn("Error from fetching thread, should be handled properly", e)); } /** @@ -105,15 +105,15 @@ public Registration poll(Consumer consumer, */ @Override public Registration poll(Consumer consumer, RecordConverter recordConverter, - EventConsumer eventConsumer, RuntimeErrorHandler runtimeErrorHandler) { + EventConsumer eventConsumer, RuntimeErrorHandler runtimeErrorHandler) { FetchEventsTask fetcherTask = new FetchEventsTask<>(consumer, - pollTimeout, - recordConverter, - eventConsumer, - activeFetchers::remove, - runtimeErrorHandler, - offsetCommitType); + pollTimeout, + recordConverter, + eventConsumer, + activeFetchers::remove, + runtimeErrorHandler, + offsetCommitType); activeFetchers.add(fetcherTask); executorService.execute(fetcherTask); @@ -162,22 +162,32 @@ public static final class Builder { */ public Builder pollTimeout(long timeoutMillis) { assertThat(timeoutMillis, timeout -> timeout > 0, - "The poll timeout may not be negative [" + timeoutMillis + "]"); + "The poll timeout may not be negative [" + timeoutMillis + "]"); this.pollTimeout = Duration.ofMillis(timeoutMillis); return this; } /** - * Set the {@code offsetCommitType}, options are: - * {@link OffsetCommitType#AUTO} let the Kafka consumer commit offsets automatically in background - * {@link OffsetCommitType#COMMIT_SYNC} let the Kafka consumer commit offsets synchronously after processing - * {@link OffsetCommitType#COMMIT_ASYNC} let the Kafka consumer commit offsets asynchronously after processing - * Defaults to {@code OffsetCommitType#AUTO} + * Sets the {@code offsetCommitType} defining how the {@link FetchEventsTask} will commit offsets during + * processing of events. + *

+ * Options are: + *

    + *
  • {@link OffsetCommitType#AUTO} - let the Kafka consumer commit offsets automatically in background. + *
  • + *
  • {@link OffsetCommitType#COMMIT_SYNC} - let the Kafka consumer commit offsets synchronously after + * processing.
  • + *
  • {@link OffsetCommitType#COMMIT_ASYNC} - let the Kafka consumer commit offsets asynchronously after + * processing.
  • + *
+ *

+ * Defaults to {@code OffsetCommitType#AUTO}, meaning the offset commit task happens in the background. * * @param offsetCommitType {@link OffsetCommitType} enum to specify the offset commit type * @return the current Builder instance, for fluent interfacing */ public AsyncFetcher.Builder offsetCommitType(OffsetCommitType offsetCommitType) { + assertNonNull(offsetCommitType, "OffsetCommitType may not be null"); this.offsetCommitType = offsetCommitType; return this; } diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java index c97cbe7e..4fa7cd1a 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2025. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java index dbc34df1..19c49dc2 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/OffsetCommitType.java @@ -1,10 +1,47 @@ +/* + * Copyright (c) 2010-2025. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.axonframework.extensions.kafka.eventhandling.consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; + /** * Enum to define how the consumer will handle committing offsets. + * + * @author Bradley Skuse + * @since 4.11.1 */ public enum OffsetCommitType { + + /** + * Kafka consumer will commit offsets automatically in the background. + */ AUTO, + + /** + * Kafka consumer will commit offsets asynchronously after processing + * + * @see KafkaConsumer#commitAsync() + */ COMMIT_ASYNC, + + /** + * Kafka consumer will commit offsets synchronously after processing + * + * @see KafkaConsumer#commitSync() + */ COMMIT_SYNC } diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java index 10b905c7..aa9f3797 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2025. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.