Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 48b0853

Browse files
authored
Merge pull request #591 from tgroh/backport_bzip
Backport apache/beam #3669
2 parents ab51974 + 0adfeed commit 48b0853

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public boolean matches(String fileName) {
144144
public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
145145
throws IOException {
146146
return Channels.newChannel(
147-
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
147+
new BZip2CompressorInputStream(Channels.newInputStream(channel), true));
148148
}
149149
};
150150

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java

+40-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
2020
import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
21-
2221
import static org.hamcrest.Matchers.containsString;
2322
import static org.hamcrest.Matchers.instanceOf;
2423
import static org.hamcrest.Matchers.not;
@@ -203,6 +202,38 @@ public void testReadConcatenatedGzip() throws IOException {
203202
p.run();
204203
}
205204

205+
/**
206+
* Test a bzip2 file containing multiple streams is correctly decompressed.
207+
*
208+
* <p>A bzip2 file may contain multiple streams and should decompress as the concatenation of
209+
* those streams.
210+
*/
211+
@Test
212+
public void testReadMultiStreamBzip2() throws IOException {
213+
CompressionMode mode = CompressionMode.BZIP2;
214+
byte[] input1 = generateInput(5, 587973);
215+
byte[] input2 = generateInput(5, 387374);
216+
217+
ByteArrayOutputStream stream1 = new ByteArrayOutputStream();
218+
try (OutputStream os = getOutputStreamForMode(mode, stream1)) {
219+
os.write(input1);
220+
}
221+
222+
ByteArrayOutputStream stream2 = new ByteArrayOutputStream();
223+
try (OutputStream os = getOutputStreamForMode(mode, stream2)) {
224+
os.write(input2);
225+
}
226+
227+
File tmpFile = tmpFolder.newFile();
228+
try (OutputStream os = new FileOutputStream(tmpFile)) {
229+
os.write(stream1.toByteArray());
230+
os.write(stream2.toByteArray());
231+
}
232+
233+
byte[] output = Bytes.concat(input1, input2);
234+
verifyReadContents(output, tmpFile, mode);
235+
}
236+
206237
/**
207238
* Test reading empty input with bzip2.
208239
*/
@@ -416,7 +447,14 @@ public void populateDisplayData(DisplayData.Builder builder) {
416447
*/
417448
private byte[] generateInput(int size) {
418449
// Arbitrary but fixed seed
419-
Random random = new Random(285930);
450+
return generateInput(size, 285930);
451+
}
452+
453+
/**
454+
* Generate byte array of given size.
455+
*/
456+
private byte[] generateInput(int size, int seed) {
457+
Random random = new Random(seed);
420458
byte[] buff = new byte[size];
421459
random.nextBytes(buff);
422460
return buff;

0 commit comments

Comments
 (0)