
# [FLINK-18590][json] Support json array explode to multi messages #26473


Status: Open. Wants to merge 4 commits into base: master.
31 changes: 31 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/json.md
@@ -243,3 +243,34 @@ Format Options
</tr>
</tbody>
</table>

Features
--------

### Allow a JSON array to explode into multiple rows

Usually, we assume that the outermost JSON data is a JSON object, so one JSON record is converted into one result row.

In some cases, however, the outermost JSON data may be a JSON array, and we want it to be exploded into multiple results. Each element of the array is a JSON object whose schema must match the one defined in SQL; each such object is then converted into one result row. The Flink JSON Format handles this case by default.

For example, given the following DDL:
```sql
CREATE TABLE user_behavior (
col1 BIGINT,
col2 VARCHAR
) WITH (
'format' = 'json',
...
)
```

In both of the following cases, the Flink JSON Format will produce the two rows `(123, "a")` and `(456, "b")`.
The outermost data is a JSON array:
```json lines
[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}]
```
The outermost data is a JSON object:
```json lines
{"col1": 123, "col2": "a"}
{"col1": 456, "col2": "b"}
```
29 changes: 29 additions & 0 deletions docs/content/docs/connectors/table/formats/json.md
@@ -257,6 +257,35 @@ The following table lists the type mapping from Flink type to JSON type.
</tbody>
</table>

Features
--------

### Allow top-level JSON Arrays

Usually, we assume that the top level of a JSON string is a stringified JSON object, which is converted into one SQL row.

In some cases, however, the top level of the JSON string is a stringified JSON array, and we want to explode the array into multiple records. Each element within the array is a JSON object whose schema matches the one defined in SQL, and each such object is converted into one row. The Flink JSON Format supports reading such data.

For example, for the following SQL DDL:
```sql
CREATE TABLE user_behavior (
col1 BIGINT,
col2 VARCHAR
) WITH (
'format' = 'json',
...
)
```

With either of the following two inputs, the Flink JSON Format will produce the two rows `(123, "a")` and `(456, "b")`.
The top level is a JSON array:
```json lines
[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}]
```
The top level is a JSON object:
```json lines
{"col1": 123, "col2": "a"}
{"col1": 456, "col2": "b"}
```
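
For reference, a complete table definition wired to a real source might look like the following sketch. The Kafka connector and its option values are illustrative assumptions, not something this format feature requires.

```sql
-- Hypothetical end-to-end example; the connector options are assumptions.
CREATE TABLE user_behavior (
  col1 BIGINT,
  col2 VARCHAR
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

With such a table, a message whose payload is a top-level JSON array contributes one row per array element to the query result.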


AbstractJsonDeserializationSchema.java
@@ -18,19 +18,27 @@

package org.apache.flink.formats.json;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.jackson.JacksonMapperFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,6 +70,10 @@ public abstract class AbstractJsonDeserializationSchema implements DeserializationSchema<RowData>

private final boolean hasDecimalType;

private transient Collector<RowData> collector;

private transient List<RowData> reusableCollectList;

public AbstractJsonDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
@@ -89,6 +101,23 @@ public void open(InitializationContext context) throws Exception {
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
reusableCollectList = new ArrayList<>();
collector = new ListCollector<>(reusableCollectList);
}

@Override
public RowData deserialize(@Nullable byte[] message) throws IOException {
reusableCollectList.clear();
deserialize(message, collector);
if (reusableCollectList.size() > 1) {
throw new FlinkRuntimeException(
"Please invoke "
+ "DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
}
if (reusableCollectList.isEmpty()) {
return null;
}
return reusableCollectList.get(0);
}

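The bridge method above, together with the subclass changes below, gives the format two entry points. The following self-contained sketch (the driver class and its setup are assumptions for illustration; `DummyInitializationContext` is the test helper also used in the test file at the end of this diff) shows how the two entry points behave on a top-level JSON array:

```java
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import java.util.ArrayList;
import java.util.List;

// Hypothetical driver class, not part of this PR.
public class JsonArrayExplodeSketch {
    public static void main(String[] args) throws Exception {
        RowType rowType =
                RowType.of(
                        new LogicalType[] {
                            new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)
                        },
                        new String[] {"col1", "col2"});
        JsonRowDataDeserializationSchema schema =
                new JsonRowDataDeserializationSchema(
                        rowType,
                        InternalTypeInfo.of(rowType),
                        false, // failOnMissingField
                        false, // ignoreParseErrors
                        TimestampFormat.ISO_8601);
        schema.open(new DummyInitializationContext());

        byte[] json =
                "[{\"col1\": 123, \"col2\": \"a\"}, {\"col1\": 456, \"col2\": \"b\"}]".getBytes();

        // Collector variant: the array explodes into one RowData per element.
        List<RowData> rows = new ArrayList<>();
        schema.deserialize(json, new ListCollector<>(rows));
        System.out.println(rows.size()); // expected: 2

        // Single-record bridge: more than one produced row triggers the
        // FlinkRuntimeException from AbstractJsonDeserializationSchema above.
        try {
            schema.deserialize(json);
        } catch (Exception e) {
            System.out.println("single-record API rejected multi-row input: " + e.getMessage());
        }
    }
}
```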
JsonParserRowDataDeserializationSchema.java
@@ -23,6 +23,7 @@
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
@@ -72,10 +73,10 @@ public JsonParserRowDataDeserializationSchema(
}

@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
// nothing to emit when there is no token
if (message == null || message.length == 0) {
return;
}
}
try (JsonParser root = objectMapper.getFactory().createParser(message)) {
/* First: must point to a token; if not pointing to one, advance.
@@ -85,16 +86,30 @@
if (root.currentToken() == null) {
root.nextToken();
}
if (root.currentToken() != JsonToken.START_OBJECT
&& root.currentToken() != JsonToken.START_ARRAY) {
throw JsonMappingException.from(root, "No content to map due to end-of-input");
}
if (root.currentToken() == JsonToken.START_ARRAY) {
processArray(root, out);
} else {
processObject(root, out);
}
} catch (Throwable t) {
if (!ignoreParseErrors) {
throw new IOException(
format("Failed to deserialize JSON '%s'.", new String(message)), t);
}
}
}

private void processArray(JsonParser root, Collector<RowData> out) throws IOException {
while (root.nextToken() != JsonToken.END_ARRAY) {
out.collect((RowData) runtimeConverter.convert(root));
}
}

private void processObject(JsonParser root, Collector<RowData> out) throws IOException {
out.collect((RowData) runtimeConverter.convert(root));
}
}
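
Since the token walk is easy to lose inside the class fragment, here is a standalone sketch of the same pattern in plain (unshaded) Jackson; the class name is an assumption. The point is that each array element is handed off as soon as its `START_OBJECT` token is reached, without materializing the whole array first:

```java
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

// Hypothetical sketch of the streaming token walk, not part of this PR.
public class TokenWalkSketch {
    public static void main(String[] args) throws Exception {
        byte[] message = "[{\"col1\": 123}, {\"col1\": 456}]".getBytes();
        try (JsonParser root = new JsonFactory().createParser(message)) {
            if (root.currentToken() == null) {
                root.nextToken(); // advance to the first token
            }
            if (root.currentToken() == JsonToken.START_ARRAY) {
                while (root.nextToken() != JsonToken.END_ARRAY) {
                    // in the real code, runtimeConverter.convert(root) consumes
                    // this element and produces a RowData; here we just skip it
                    System.out.println("element token: " + root.currentToken());
                    root.skipChildren();
                }
            }
        }
    }
}
```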
JsonRowDataDeserializationSchema.java
@@ -23,8 +23,10 @@
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;

import javax.annotation.Nullable;

@@ -63,18 +65,38 @@ public JsonRowDataDeserializationSchema(
}

@Override
public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
if (message == null) {
return;
}
}
try {
final JsonNode root = deserializeToJsonNode(message);
if (root != null && root.isArray()) {
ArrayNode arrayNode = (ArrayNode) root;
for (int i = 0; i < arrayNode.size(); i++) {
try {
RowData result = convertToRowData(arrayNode.get(i));
if (result != null) {
out.collect(result);
}
} catch (Throwable t) {
if (!ignoreParseErrors) {
// will be caught by outer try-catch
throw t;
}
}
}
} else {
RowData result = convertToRowData(root);
if (result != null) {
out.collect(result);
}
}
} catch (Throwable t) {
if (!ignoreParseErrors) {
throw new IOException(
format("Failed to deserialize JSON '%s'.", new String(message)), t);
}
}
}

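For contrast with the streaming-parser path, the following standalone sketch shows the tree-model shape of the loop above in plain (unshaded) Jackson; the class name is an assumption. A design consequence visible in the diff: because the whole document is materialized into a `JsonNode` first, a malformed element can be skipped under ignore-parse-errors without losing its siblings, which the token-streaming path cannot easily do.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

// Hypothetical sketch of the tree-model iteration, not part of this PR.
public class TreeModelSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree("[{\"col1\": 123}, {\"col1\": 456}]");
        if (root != null && root.isArray()) {
            ArrayNode arrayNode = (ArrayNode) root;
            for (int i = 0; i < arrayNode.size(); i++) {
                // the real code converts each element to RowData here, wrapping
                // the conversion in its own try-catch for per-element tolerance
                System.out.println(arrayNode.get(i));
            }
        } else {
            System.out.println(root); // single object: one row
        }
    }
}
```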
Test file for the JSON row-data SerDe:
@@ -18,6 +18,7 @@

package org.apache.flink.formats.json;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.core.testutils.FlinkAssertions;
@@ -34,6 +35,7 @@
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.jackson.JacksonMapperFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -51,6 +53,7 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -224,6 +227,55 @@ void testSerDe() throws Exception {
assertThat(serializedJson).containsExactly(actualBytes);
}

@Test
public void testJsonArrayToMultiRecords() throws Exception {
DataType dataType = ROW(FIELD("f1", INT()), FIELD("f2", BOOLEAN()), FIELD("f3", STRING()));
RowType rowType = (RowType) dataType.getLogicalType();

ObjectMapper objectMapper = new ObjectMapper();

ObjectNode element1 = objectMapper.createObjectNode();
element1.put("f1", 1);
element1.put("f2", true);
element1.put("f3", "str");

ObjectNode element2 = objectMapper.createObjectNode();
element2.put("f1", 10);
element2.put("f2", false);
element2.put("f3", "newStr");

ArrayNode arrayNode = objectMapper.createArrayNode();
> **Inline review thread on this line:**
>
> **Contributor:** Can we test arrays of arrays of arrays, and arrays of objects of arrays?
>
> **Contributor:** I am curious: with arrays of arrays, do we create multiple records only at the top level, or should we have an option to expand nested arrays into multiple records as well? Should this be a format configuration option?
>
> **Contributor:** I assume that is a more complicated structure and not supported yet. I'm fine with leaving it unsupported and adding a related description in the doc.
>
> **Author:** I have added the related description in the doc: "Each element within the array is a JSON object whose schema matches the one defined in SQL, and each such object is converted into one row."

arrayNode.add(element1);
arrayNode.add(element2);

DeserializationSchema<RowData> deserializationSchema =
createDeserializationSchema(
isJsonParser, rowType, false, false, TimestampFormat.ISO_8601);

open(deserializationSchema);

// test serialization
JsonRowDataSerializationSchema serializationSchema =
new JsonRowDataSerializationSchema(
rowType,
TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL,
"null",
true,
false);
open(serializationSchema);

List<RowData> result = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(result);
deserializationSchema.deserialize(objectMapper.writeValueAsBytes(arrayNode), collector);
assertThat(result).hasSize(2);

byte[] result1 = serializationSchema.serialize(result.get(0));
byte[] result2 = serializationSchema.serialize(result.get(1));
assertThat(result1).isEqualTo(objectMapper.writeValueAsBytes(element1));
assertThat(result2).isEqualTo(objectMapper.writeValueAsBytes(element2));
}
