Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit d47c1e6

Browse files
authored
Merge pull request #428 from peihe/disable-unbounded-read
Revert PR-427 to re-enable streaming bounded read
2 parents a7e516d + bf6fed3 commit d47c1e6

File tree

2 files changed

+1
-38
lines changed

2 files changed

+1
-38
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java

+1-9
Original file line numberDiff line numberDiff line change
@@ -361,15 +361,7 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
361361
builder.put(View.AsList.class, StreamingViewAsList.class);
362362
builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
363363
builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
364-
if (options.getExperiments() == null
365-
|| !options.getExperiments().contains("enable_streaming_bounded_read")) {
366-
builder.put(Read.Bounded.class, UnsupportedIO.class);
367-
builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class);
368-
builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class);
369-
builder.put(TextIO.Read.Bound.class, UnsupportedIO.class);
370-
} else {
371-
builder.put(Read.Bounded.class, StreamingBoundedRead.class);
372-
}
364+
builder.put(Read.Bounded.class, StreamingBoundedRead.class);
373365
builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
374366
builder.put(Window.Bound.class, AssignWindows.class);
375367
// In streaming mode must use either the custom Pubsub unbounded source/sink or

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java

-29
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@
4646
import com.google.cloud.dataflow.sdk.coders.Coder;
4747
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
4848
import com.google.cloud.dataflow.sdk.io.AvroIO;
49-
import com.google.cloud.dataflow.sdk.io.AvroSource;
50-
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
5149
import com.google.cloud.dataflow.sdk.io.Read;
5250
import com.google.cloud.dataflow.sdk.io.TextIO;
5351
import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
@@ -930,33 +928,6 @@ private void testUnsupportedSource(PTransform<PInput, ?> source, String name, bo
930928
p.run();
931929
}
932930

933-
@Test
934-
public void testBoundedSourceUnsupportedInStreaming() throws Exception {
935-
testUnsupportedSource(
936-
AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true);
937-
}
938-
939-
@Test
940-
public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception {
941-
testUnsupportedSource(
942-
BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true);
943-
}
944-
945-
@Test
946-
public void testAvroIOSourceUnsupportedInStreaming() throws Exception {
947-
testUnsupportedSource(AvroIO.Read.from("foo"), "AvroIO.Read", true);
948-
}
949-
950-
@Test
951-
public void testTextIOSourceUnsupportedInStreaming() throws Exception {
952-
testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true);
953-
}
954-
955-
@Test
956-
public void testReadBoundedSourceUnsupportedInStreaming() throws Exception {
957-
testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true);
958-
}
959-
960931
@Test
961932
public void testReadUnboundedUnsupportedInBatch() throws Exception {
962933
testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);

0 commit comments

Comments
 (0)