Skip to content

[#555] Add option to commit offsets #556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2025. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,6 +56,7 @@ public class AsyncFetcher<K, V, E> implements Fetcher<K, V, E> {
private final ExecutorService executorService;
private final boolean requirePoolShutdown;
private final Set<FetchEventsTask<K, V, E>> activeFetchers = ConcurrentHashMap.newKeySet();
private final OffsetCommitType offsetCommitType;

/**
* Instantiate a Builder to be able to create a {@link AsyncFetcher}.
Expand Down Expand Up @@ -85,32 +86,34 @@ protected AsyncFetcher(Builder<K, V, E> builder) {
this.pollTimeout = builder.pollTimeout;
this.executorService = builder.executorService;
this.requirePoolShutdown = builder.requirePoolShutdown;
this.offsetCommitType = builder.offsetCommitType;
}

/**
* {@inheritDoc}
*/
@Override
public Registration poll(Consumer<K, V> consumer,
RecordConverter<K, V, E> recordConverter,
EventConsumer<E> eventConsumer) {
RecordConverter<K, V, E> recordConverter,
EventConsumer<E> eventConsumer) {
return poll(consumer, recordConverter, eventConsumer,
e -> logger.warn("Error from fetching thread, should be handled properly", e));
e -> logger.warn("Error from fetching thread, should be handled properly", e));
}

/**
* {@inheritDoc}
*/
@Override
public Registration poll(Consumer<K, V> consumer, RecordConverter<K, V, E> recordConverter,
EventConsumer<E> eventConsumer, RuntimeErrorHandler runtimeErrorHandler) {
EventConsumer<E> eventConsumer, RuntimeErrorHandler runtimeErrorHandler) {
FetchEventsTask<K, V, E> fetcherTask =
new FetchEventsTask<>(consumer,
pollTimeout,
recordConverter,
eventConsumer,
activeFetchers::remove,
runtimeErrorHandler);
pollTimeout,
recordConverter,
eventConsumer,
activeFetchers::remove,
runtimeErrorHandler,
offsetCommitType);

activeFetchers.add(fetcherTask);
executorService.execute(fetcherTask);
Expand Down Expand Up @@ -148,6 +151,7 @@ public static final class Builder<K, V, E> {
private Duration pollTimeout = Duration.ofMillis(DEFAULT_POLL_TIMEOUT_MS);
private ExecutorService executorService = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher"));
private boolean requirePoolShutdown = true;
private OffsetCommitType offsetCommitType = OffsetCommitType.AUTO;

/**
* Set the {@code pollTimeout} in milliseconds for polling records from a topic. Defaults to {@code 5000}
Expand All @@ -158,11 +162,36 @@ public static final class Builder<K, V, E> {
*/
public Builder<K, V, E> pollTimeout(long timeoutMillis) {
assertThat(timeoutMillis, timeout -> timeout > 0,
"The poll timeout may not be negative [" + timeoutMillis + "]");
"The poll timeout may not be negative [" + timeoutMillis + "]");
this.pollTimeout = Duration.ofMillis(timeoutMillis);
return this;
}

/**
* Sets the {@code offsetCommitType} defining how the {@link FetchEventsTask} will commit offsets during
* processing of events.
* <p>
* Options are:
* <ul>
* <li>{@link OffsetCommitType#AUTO} - let the Kafka consumer commit offsets automatically in background.
* </li>
* <li>{@link OffsetCommitType#COMMIT_SYNC} - let the Kafka consumer commit offsets synchronously after
* processing.</li>
* <li>{@link OffsetCommitType#COMMIT_ASYNC} - let the Kafka consumer commit offsets asynchronously after
* processing.</li>
* </ul>
* <p>
* Defaults to {@code OffsetCommitType#AUTO}, meaning the offset commit task happens in the background.
*
* @param offsetCommitType {@link OffsetCommitType} enum to specify the offset commit type
* @return the current Builder instance, for fluent interfacing
*/
public AsyncFetcher.Builder<K, V, E> offsetCommitType(OffsetCommitType offsetCommitType) {
assertNonNull(offsetCommitType, "OffsetCommitType may not be null");
this.offsetCommitType = offsetCommitType;
return this;
}

/**
* Sets the {@link ExecutorService} used to start {@link FetchEventsTask} instances to poll for Kafka consumer
* records. Note that the {@code executorService} should contain sufficient threads to run the necessary fetcher
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2025. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,6 +52,7 @@ class FetchEventsTask<K, V, E> implements Runnable {
private final EventConsumer<E> eventConsumer;
private final java.util.function.Consumer<FetchEventsTask<K, V, E>> closeHandler;
private final RuntimeErrorHandler runtimeErrorHandler;
private final OffsetCommitType offsetCommitType;


/**
Expand All @@ -72,7 +73,8 @@ class FetchEventsTask<K, V, E> implements Runnable {
RecordConverter<K, V, E> recordConverter,
EventConsumer<E> eventConsumer,
java.util.function.Consumer<FetchEventsTask<K, V, E>> closeHandler,
RuntimeErrorHandler runtimeErrorHandler) {
RuntimeErrorHandler runtimeErrorHandler,
OffsetCommitType offsetCommitType) {
this.consumer = nonNull(consumer, () -> "Consumer may not be null");
assertThat(pollTimeout, time -> !time.isNegative(),
"The poll timeout may not be negative [" + pollTimeout + "]");
Expand All @@ -81,6 +83,7 @@ class FetchEventsTask<K, V, E> implements Runnable {
this.eventConsumer = eventConsumer;
this.closeHandler = getOrDefault(closeHandler, task -> { /* no-op */ });
this.runtimeErrorHandler = nonNull(runtimeErrorHandler, () -> "Runtime error handler may not be null");
this.offsetCommitType = offsetCommitType;
}

@Override
Expand Down Expand Up @@ -109,6 +112,7 @@ private void processRecords(ConsumerRecords<K, V> records) {
try {
if (!convertedMessages.isEmpty()) {
eventConsumer.consume(convertedMessages);
handleOffsetsIfRequired(convertedMessages.size());
}
} catch (InterruptedException e) {
logger.debug("Event Consumer thread was interrupted. Shutting down", e);
Expand All @@ -117,6 +121,18 @@ private void processRecords(ConsumerRecords<K, V> records) {
}
}

private void handleOffsetsIfRequired(int messageCount) {
if (OffsetCommitType.COMMIT_SYNC == offsetCommitType) {
consumer.commitSync();
logger.debug("Committed offsets synchronously for {} messages", messageCount);
} else if (OffsetCommitType.COMMIT_ASYNC == offsetCommitType) {
consumer.commitAsync();
logger.debug("Committed offsets asynchronously for {} messages", messageCount);
} else {
logger.debug("Consumer will commit offsets in background");
}
}

/**
* Shutdown this {@link FetchEventsTask}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2010-2025. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.extensions.kafka.eventhandling.consumer;

import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
* Enum to define how the consumer will handle committing offsets.
*
* @author Bradley Skuse
* @since 4.11.1
*/
public enum OffsetCommitType {

/**
* Kafka consumer will commit offsets automatically in the background.
*/
AUTO,

/**
* Kafka consumer will commit offsets asynchronously after processing
*
* @see KafkaConsumer#commitAsync()
*/
COMMIT_ASYNC,

/**
* Kafka consumer will commit offsets synchronously after processing
*
* @see KafkaConsumer#commitSync()
*/
COMMIT_SYNC
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2025. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,7 +76,8 @@ void setUp() {
testRecordConverter,
testEventConsumer,
testCloseHandler,
runtimeErrorHandler
runtimeErrorHandler,
OffsetCommitType.AUTO
);

when(testConsumer.poll(testPollTimeout)).thenReturn(consumerRecords);
Expand All @@ -95,7 +96,8 @@ void testTaskConstructionWithInvalidConsumerShouldThrowException() {
testRecordConverter,
testEventConsumer,
testCloseHandler,
runtimeErrorHandler
runtimeErrorHandler,
OffsetCommitType.AUTO
)
);
}
Expand All @@ -111,7 +113,8 @@ void testTaskConstructionWithNegativeTimeoutShouldThrowException() {
testRecordConverter,
testEventConsumer,
testCloseHandler,
runtimeErrorHandler
runtimeErrorHandler,
OffsetCommitType.AUTO
)
);
}
Expand All @@ -127,7 +130,8 @@ void testFetchEventsTaskInterruptionClosesAsExpected() {
testRecordConverter,
failingEventConsumer,
testCloseHandler,
runtimeErrorHandler
runtimeErrorHandler,
OffsetCommitType.AUTO
);

Thread taskRunner = new Thread(testSubjectWithFailingEventConsumer);
Expand All @@ -148,6 +152,8 @@ void testFetchEventsTaskPollsConvertsAndConsumesRecords() throws InterruptedExce
verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout);
verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords);
verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage));
verify(testConsumer, never()).commitSync();
verify(testConsumer, never()).commitAsync();

taskRunner.interrupt();
}
Expand All @@ -163,6 +169,8 @@ void testFetchEventsTaskPollsDoesNotCallEventConsumerForZeroConvertedEvents() {
verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout);
verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords);
verifyNoMoreInteractions(testEventConsumer);
verify(testConsumer, never()).commitSync();
verify(testConsumer, never()).commitAsync();

taskRunner.interrupt();
}
Expand All @@ -177,4 +185,54 @@ void testCloseCallsProvidedCloseHandler() {
assertWithin(Duration.ofMillis(TIMEOUT_MILLIS), () -> assertTrue(expectedToBeClosed.get()));
verify(testConsumer, timeout(TIMEOUT_MILLIS)).close();
}

@Test
void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsetsSync() throws InterruptedException {
VerificationMode atLeastOnceWithTimeout = timeout(TIMEOUT_MILLIS).atLeastOnce();

FetchEventsTask<String, String, KafkaEventMessage> testSubjectWithCommitOffsets = new FetchEventsTask<>(
testConsumer,
testPollTimeout,
testRecordConverter,
testEventConsumer,
testCloseHandler,
runtimeErrorHandler,
OffsetCommitType.COMMIT_SYNC
);

Thread taskRunner = new Thread(testSubjectWithCommitOffsets);
taskRunner.start();

verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout);
verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords);
verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage));
verify(testConsumer, atLeastOnceWithTimeout).commitSync();

taskRunner.interrupt();
}

@Test
void testFetchEventsTaskPollsConvertsAndConsumesRecordsAndCommitOffsetsAsync() throws InterruptedException {
VerificationMode atLeastOnceWithTimeout = timeout(TIMEOUT_MILLIS).atLeastOnce();

FetchEventsTask<String, String, KafkaEventMessage> testSubjectWithCommitOffsets = new FetchEventsTask<>(
testConsumer,
testPollTimeout,
testRecordConverter,
testEventConsumer,
testCloseHandler,
runtimeErrorHandler,
OffsetCommitType.COMMIT_ASYNC
);

Thread taskRunner = new Thread(testSubjectWithCommitOffsets);
taskRunner.start();

verify(testConsumer, atLeastOnceWithTimeout).poll(testPollTimeout);
verify(testRecordConverter, atLeastOnceWithTimeout).convert(consumerRecords);
verify(testEventConsumer, atLeastOnceWithTimeout).consume(Collections.singletonList(kafkaEventMessage));
verify(testConsumer, atLeastOnceWithTimeout).commitAsync();

taskRunner.interrupt();
}
}
Loading