Skip to content

[FLINK-37605][runtime] Infer checkpoint id on endInput in sink [1.19] #26458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: release-1.19
Choose a base branch
from

Conversation

AHeise
Copy link
Contributor

@AHeise AHeise commented Apr 14, 2025

Backport of #26433.

Unchanged compared to 1.20 backport. Prod code unchanged. Test refactor skipped and new test moved to SinkWriterOperatorTestBase.

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 14, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

So far, we used a special value for the final checkpoint on endInput. However, as shown in the description of this ticket, final doesn't mean final. Hence, multiple committables with EOI could be created at different times.

With this commit, we stop using a special value for such committables and instead try to guess the checkpoint id of the next checkpoint. There are various factors that influence the checkpoint id but we can mostly ignore them all because we just need to pick a checkpoint id that is
- higher than all checkpoint ids of the previous, successful checkpoints of this attempt
- higher than the checkpoint id of the restored checkpoint
- lower than any future checkpoint id.

Hence, we just remember the last observed checkpoint id (initialized with max(0, restored id)), and use last id + 1 for endInput. Naturally, multiple endInput calls happening through restarts will result in unique checkpoint ids. Note that aborted checkpoints before endInput may result in diverged checkpoint ids across subtasks. However, each of the id satisfies above requirements and any id of endInput1 will be smaller than any id of endInput2. Thus, diverged checkpoint ids will not impact correctness at all.

(cherry picked from commit 9302545)
@AHeise AHeise force-pushed the FLINK-37605-fix-eoi-sink-1.19 branch from 5c19e4a to ac34334 Compare April 16, 2025 21:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants