Skip to content

[FLINK-37605][runtime] Infer checkpoint id on endInput in sink [1.19] #26458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: release-1.19
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
Expand Down Expand Up @@ -106,6 +107,8 @@ public class CompactorOperator
// submitted again while restoring
private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;

private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;

public CompactorOperator(
FileCompactStrategy strategy,
SimpleVersionedSerializer<FileSinkCommittable> committableSerializer,
Expand Down Expand Up @@ -136,15 +139,16 @@ public void processElement(StreamRecord<CompactorRequest> element) throws Except
@Override
public void endInput() throws Exception {
// add collecting requests into the final snapshot
checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
long checkpointId = lastKnownCheckpointId + 1;
checkpointRequests.put(checkpointId, collectingRequests);
collectingRequests = new ArrayList<>();

// submit all requests and wait until they are done
submitUntil(CommittableMessage.EOI);
submitUntil(checkpointId);
assert checkpointRequests.isEmpty();

getAllTasksFuture().join();
emitCompacted(CommittableMessage.EOI);
emitCompacted(checkpointId);
assert compactingRequests.isEmpty();
}

Expand Down Expand Up @@ -222,6 +226,8 @@ private void submitUntil(long checkpointId) {
}

private void emitCompacted(long checkpointId) throws Exception {
lastKnownCheckpointId = checkpointId;

List<FileSinkCommittable> compacted = new ArrayList<>();
Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
compactingRequests.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ public interface CommittableMessage<CommT> {
/**
* Special value for checkpointId for the end of input in case of batch commit or final
* checkpoint.
*
* @deprecated the special value is not used anymore at all (remove with Flink 2.2)
*/
long EOI = Long.MAX_VALUE;
@Deprecated long EOI = Long.MAX_VALUE;

/** The subtask that created this committable. */
int getSubtaskId();
Expand All @@ -49,6 +51,8 @@ default OptionalLong getCheckpointId() {
/**
* Returns the checkpoint id or EOI if this message belongs to the final checkpoint or the
* batch commit.
*
* @deprecated the special value EOI is not used anymore
*/
@Deprecated
long getCheckpointIdOrEOI();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ public interface BoundedOneInput {
/**
* It is notified that no more data will arrive from the input.
*
* <p>Stateful operators need to be aware that a restart with rescaling may occur after
* receiving this notification. Because the source split assignment may change, the same subtask
* of this operator that received endInput, and whose state was snapshotted after endInput, may
* receive new data after the restart. Hence, the state should not contain any finalization
* that would make it impossible to process new data.
*
* <p><b>WARNING:</b> It is not safe to use this method to commit any transactions or other side
* effects! You can use this method to flush any buffered data that can later on be committed
* e.g. in a {@link StreamOperator#notifyCheckpointComplete(long)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
Expand All @@ -51,7 +52,6 @@
import java.util.Collections;
import java.util.OptionalLong;

import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
import static org.apache.flink.util.IOUtils.closeAll;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -76,11 +76,9 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
private SinkCommitterMetricGroup metricGroup;
private Committer<CommT> committer;
private CommittableCollector<CommT> committableCollector;
private long lastCompletedCheckpointId = -1;
private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
private int maxRetries;

private boolean endInput = false;

/** The operator's state descriptor. */
private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
new ListStateDescriptor<>(
Expand Down Expand Up @@ -131,11 +129,11 @@ public void initializeState(StateInitializationContext context) throws Exception
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
metricGroup));
if (context.isRestored()) {
if (checkpointId.isPresent()) {
committableCollectorState.get().forEach(cc -> committableCollector.merge(cc));
lastCompletedCheckpointId = checkpointId.getAsLong();
// try to re-commit recovered transactions as quickly as possible
commitAndEmitCheckpoints();
commitAndEmitCheckpoints(lastCompletedCheckpointId);
}
}

Expand All @@ -148,24 +146,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception {

@Override
public void endInput() throws Exception {
endInput = true;
if (!isCheckpointingEnabled || isBatchMode) {
// There will be no final checkpoint, all committables should be committed here
commitAndEmitCheckpoints();
commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
}
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
commitAndEmitCheckpoints();
commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
}

private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
private void commitAndEmitCheckpoints(long checkpointId)
throws IOException, InterruptedException {
lastCompletedCheckpointId = checkpointId;
for (CheckpointCommittableManager<CommT> checkpointManager :
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
// ensure that all committables of the first checkpoint are fully committed before
// attempting the next committable
commitAndEmit(checkpointManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
Expand Down Expand Up @@ -52,8 +51,6 @@
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.UserCodeClassLoader;

import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;

import javax.annotation.Nullable;

import java.io.IOException;
Expand All @@ -62,6 +59,7 @@
import java.util.List;
import java.util.OptionalLong;

import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
import static org.apache.flink.util.IOUtils.closeAll;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -91,13 +89,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
@Nullable private final SimpleVersionedSerializer<CommT> committableSerializer;
private final List<CommT> legacyCommittables = new ArrayList<>();

/**
* Used to remember that EOI has already happened so that we don't emit the last committables of
* the final checkpoints twice.
*/
private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE);

/** The runtime information of the input element. */
private final Context<InputT> context;

Expand All @@ -115,10 +106,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
private final MailboxExecutor mailboxExecutor;

private boolean endOfInput = false;
/**
* Remembers the endOfInput state for (final) checkpoints iff the operator emits committables.
*/
@Nullable private ListState<Boolean> endOfInputState;
private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;

SinkWriterOperator(
Sink<InputT> sink,
Expand Down Expand Up @@ -146,8 +134,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId());
if (context.isRestored()) {
OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
WriterInitContext initContext = createInitContext(restoredCheckpointId);
if (restoredCheckpointId.isPresent()) {
lastKnownCheckpointId = restoredCheckpointId.getAsLong();
if (committableSerializer != null) {
final ListState<List<CommT>> legacyCommitterState =
new SimpleVersionedListState<>(
Expand All @@ -161,41 +151,12 @@ public void initializeState(StateInitializationContext context) throws Exception
}

sinkWriter = writerStateHandler.createWriter(initContext, context);

if (emitDownstream) {
// Figure out if we have seen end of input before and if we can suppress creating
// transactions and sending them downstream to the CommitterOperator. We have the
// following
// cases:
// 1. state is empty:
// - First time initialization
// - Restoring from a previous version of Flink that didn't handle EOI
// - Upscaled from a final or regular checkpoint
// In all cases, we regularly handle EOI, potentially resulting in duplicate summaries
// that the CommitterOperator needs to handle.
// 2. state is not empty:
// - This implies Flink restores from a version that handles EOI.
// - If there is one entry, no rescaling happened (for this subtask), so if it's true,
// we recover from a final checkpoint (for this subtask) and can ignore another EOI
// else we have a regular checkpoint.
// - If there are multiple entries, Flink downscaled, and we need to check if all are
// true and do the same as above. As soon as one entry is false, we regularly start
// the writer and potentially emit duplicate summaries if we indeed recovered from a
// final checkpoint.
endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
endOfInput = !previousState.isEmpty() && !previousState.contains(false);
}
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
writerStateHandler.snapshotState(context.getCheckpointId());
if (endOfInputState != null) {
endOfInputState.clear();
endOfInputState.add(this.endOfInput);
}
}

@Override
Expand Down Expand Up @@ -225,17 +186,16 @@ public void processWatermark(Watermark mark) throws Exception {

@Override
public void endInput() throws Exception {
LOG.info("Received endInput");
if (!endOfInput) {
endOfInput = true;
if (endOfInputState != null) {
endOfInputState.add(true);
}
sinkWriter.flush(true);
emitCommittables(CommittableMessage.EOI);
emitCommittables(lastKnownCheckpointId + 1);
}
}

private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
lastKnownCheckpointId = checkpointId;
if (!emitDownstream) {
// To support SinkV1 topologies with only a writer we have to call prepareCommit
// although no committables are forwarded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ void addSummary(CommittableSummary<CommT> summary) {
summary.getSubtaskId(),
checkpointId,
metricGroup);
// Remove branch once CommittableMessage.EOI has been removed (earliest 2.2)
if (checkpointId == CommittableMessage.EOI) {
SubtaskCommittableManager<CommT> merged =
subtasksCommittableManagers.merge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;

Expand All @@ -49,7 +48,6 @@
*/
@Internal
public class CommittableCollector<CommT> {
private static final long EOI = Long.MAX_VALUE;
/** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */
private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
checkpointCommittables;
Expand Down Expand Up @@ -143,15 +141,6 @@ public Collection<? extends CheckpointCommittableManager<CommT>> getCheckpointCo
return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values());
}

/**
* Returns {@link CheckpointCommittableManager} belonging to the last input.
*
* @return {@link CheckpointCommittableManager}
*/
public Optional<CheckpointCommittableManager<CommT>> getEndOfInputCommittable() {
return Optional.ofNullable(checkpointCommittables.get(EOI));
}

/**
* Returns whether all {@link CheckpointCommittableManager} currently held by the collector are
* either committed or failed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Collection;
import java.util.List;

import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
import static org.assertj.core.api.Assertions.assertThat;

class GlobalCommitterOperatorTest {
Expand Down Expand Up @@ -138,38 +137,6 @@ void testStateRestore() throws Exception {
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception {
final MockCommitter committer = new MockCommitter();
final OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness =
createTestHarness(committer, commitOnInput);
testHarness.open();

final CommittableSummary<Integer> committableSummary =
new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
testHarness.processElement(new StreamRecord<>(committableSummary));
final CommittableSummary<Integer> committableSummary2 =
new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
testHarness.processElement(new StreamRecord<>(committableSummary2));

final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, EOI, 1);
testHarness.processElement(new StreamRecord<>(first));
final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, EOI, 2);
testHarness.processElement(new StreamRecord<>(second));

// commitOnInput implies that the global committer is not using notifyCheckpointComplete
if (commitOnInput) {
assertThat(committer.committed).containsExactly(1, 2);
} else {
assertThat(committer.committed).isEmpty();
testHarness.notifyOfCompletedCheckpoint(EOI);
assertThat(committer.committed).containsExactly(1, 2);
}

assertThat(testHarness.getOutput()).isEmpty();
}

private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness(
Committer<Integer> committer, boolean commitOnInput) throws Exception {
return new OneInputStreamOperatorTestHarness<>(
Expand Down
Loading