Skip to content

[#555] Add option to commit offsets #556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

bradskuse
Copy link

@bradskuse bradskuse commented Mar 18, 2025

Resolves #555

This adds an option so that offsets can be committed after processing of events, instead of using enable auto commit.

Edit: Added ability to use auto, async or sync

@CLAassistant
Copy link

CLAassistant commented Mar 18, 2025

CLA assistant check
All committers have signed the CLA.

@smcvb smcvb requested review from a team, CodeDrivenMitch, smcvb and MateuszNaKodach and removed request for a team March 19, 2025 10:04
@smcvb smcvb added this to the Release 4.11.1 milestone Mar 19, 2025
@smcvb smcvb changed the title Add option to commit offsets [#555] Add option to commit offsets Mar 19, 2025
Copy link
Member

@smcvb smcvb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually I think there's nothing wrong with the changes. So, thanks for the effort there, @bradskuse! However, I do have some minor concerns we need to address. So, if you could take a look at those once you have found the time for that, that would be great.

Comment on lines 7 to 9
AUTO,
COMMIT_ASYNC,
COMMIT_SYNC
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Each enumeration option should have some JavaDoc! Would you mind adding that, @bradskuse?

package org.axonframework.extensions.kafka.eventhandling.consumer;

/**
* Enum to define how the consumer will handle committing offsets.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to add an @author tag with your name, and an @since tag referring to 4.11.1 to the class-level JavaDoc! :-)

@@ -0,0 +1,10 @@
package org.axonframework.extensions.kafka.eventhandling.consumer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New files should have a copyright notice to state all is Apache 2 licensed. You can find the copyright notice template here. Or, you can copy it from another file in this repository and adjust the date accordingly.

* @return the current Builder instance, for fluent interfacing
*/
public AsyncFetcher.Builder<K, V, E> offsetCommitType(OffsetCommitType offsetCommitType) {
this.offsetCommitType = offsetCommitType;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Please verify that the given offsetCommitType is not null. You can use the assertNonNull method from the Builder#executorService method, for that, for example.

Comment on lines 171 to 175
* Set the {@code offsetCommitType}, options are:
* {@link OffsetCommitType#AUTO} let the Kafka consumer commit offsets automatically in background
* {@link OffsetCommitType#COMMIT_SYNC} let the Kafka consumer commit offsets synchronously after processing
* {@link OffsetCommitType#COMMIT_ASYNC} let the Kafka consumer commit offsets asynchronously after processing
* Defaults to {@code OffsetCommitType#AUTO}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor restructuring suggestion:

Suggested change
* Set the {@code offsetCommitType}, options are:
* {@link OffsetCommitType#AUTO} let the Kafka consumer commit offsets automatically in background
* {@link OffsetCommitType#COMMIT_SYNC} let the Kafka consumer commit offsets synchronously after processing
* {@link OffsetCommitType#COMMIT_ASYNC} let the Kafka consumer commit offsets asynchronously after processing
* Defaults to {@code OffsetCommitType#AUTO}
* Sets the {@code offsetCommitType} defining how the {@link FetchEventsTask} will commit offsets during processing of events.
*
* Options are:
* <ul>
* <li>{@link OffsetCommitType#AUTO} - let the Kafka consumer commit offsets automatically in background.</li>
* <li>{@link OffsetCommitType#COMMIT_SYNC} - let the Kafka consumer commit offsets synchronously after processing.</li>
* <li>{@link OffsetCommitType#COMMIT_ASYNC} - let the Kafka consumer commit offsets asynchronously after processing.</li>
* </ul>
*
* Defaults to {@code OffsetCommitType#AUTO}, meaning the offset commit task happens in the background.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes to this file should update the copyright notice to this year. More specifically, that means line 2 of this file should state the following:

Copyright (c) 2010-2025. Axon Framework

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes to this file should update the copyright notice to this year. More specifically, that means line 2 of this file should state the following:

Copyright (c) 2010-2025. Axon Framework

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes to this file should update the copyright notice to this year. More specifically, that means line 2 of this file should state the following:

Copyright (c) 2010-2025. Axon Framework

@bradskuse
Copy link
Author

Hey @smcvb

Thank you for reviewing this PR. I have just pushed up all the changes requested 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow SubscribableKafkaMessageSource option of manually commiting offsets
3 participants