From 792b7b42af9661d23b6b07474b9d0e73445c2a2f Mon Sep 17 00:00:00 2001 From: lianghongjia Date: Tue, 18 Mar 2025 17:01:58 +0800 Subject: [PATCH 1/4] [FLINK-18590][json] Support json array explode to multi messages --- .../docs/connectors/table/formats/json.md | 31 +++++++++++ .../docs/connectors/table/formats/json.md | 31 +++++++++++ .../AbstractJsonDeserializationSchema.java | 37 +++++++++++++ ...sonParserRowDataDeserializationSchema.java | 31 ++++++++--- .../JsonRowDataDeserializationSchema.java | 37 ++++++++++--- .../json/JsonRowDataSerDeSchemaTest.java | 52 +++++++++++++++++++ 6 files changed, 204 insertions(+), 15 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/formats/json.md b/docs/content.zh/docs/connectors/table/formats/json.md index 005485a7a0a8e..7ed093f5a4a11 100644 --- a/docs/content.zh/docs/connectors/table/formats/json.md +++ b/docs/content.zh/docs/connectors/table/formats/json.md @@ -243,3 +243,34 @@ Format 参数 + +特性 +-------- + +### 允许 json array 直接展开成多行数据 + +通常,我们假设 JSON 的最外层数据是一个 JSON Object。所以一条 JSON 会转换成一行结果。 + +但是在某些情况下 JSON 的最外层数据可能是一个 JSON Array,我们期望它可以被展开成多条结果,Array 的每个元素都被转成一行结果。Flink JSON Format 支持对这种情况的默认处理。 + +例如,对于如下 DDL: +```sql +CREATE TABLE user_behavior ( + col1 BIGINT, + col2 VARCHAR +) WITH ( + 'format' = 'json', + ... +) +``` + +以下两种情况下 Flink JSON Format 都将会产生两条数据 `(123, "a")` 和 `(456, "b")`. +最外层是一个 JSON Array: +```json lines +[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}] +``` +最外层是一个 JSON Object: +```json lines +{"col1": 123, "col2": "a"} +{"col1": 456, "col2": "b"} +``` diff --git a/docs/content/docs/connectors/table/formats/json.md b/docs/content/docs/connectors/table/formats/json.md index 64592ac28bea7..0b381c44735e9 100644 --- a/docs/content/docs/connectors/table/formats/json.md +++ b/docs/content/docs/connectors/table/formats/json.md @@ -257,6 +257,37 @@ The following table lists the type mapping from Flink type to JSON type. +Features +-------- +### Allow top-level JSON Arrays + +Usually, we assume the top-level of json string is a json object. Then the json object is converted to one SQL row. + +There are some cases that, the top-level of json string is a json array, and we want to explode the array to +multiple records, each one of the array is a json object which is converted to one row. Flink JSON Format supports +read such data implicitly. + +For example, for the following SQL DDL: +```sql +CREATE TABLE user_behavior ( + col1 BIGINT, + col2 VARCHAR +) WITH ( + 'format' = 'json', + ... +) +``` + +Flink JSON Format will produce 2 rows `(123, "a")` and `(456, "b")` with both of following two json string. 
+The top-level is JSON Array: +```json lines +[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}] +``` +The top-level is JSON Object: +```json lines +{"col1": 123, "col2": "a"} +{"col1": 456, "col2": "b"} +``` diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java index aa62e0d5f8711..97a994a729792 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.json; +import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.common.TimestampFormat; @@ -25,12 +26,19 @@ import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -62,6 +70,10 @@ public abstract class AbstractJsonDeserializationSchema implements Deserializati private final boolean hasDecimalType; + private transient Collector collector; + + private transient List reusableCollectList; + public AbstractJsonDeserializationSchema( RowType rowType, TypeInformation resultTypeInfo, @@ -89,6 +101,31 @@ public void open(InitializationContext context) throws Exception { if (hasDecimalType) { objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); } + reusableCollectList = new ArrayList<>(); + collector = new ListCollector<>(reusableCollectList); + } + + /** + * NOTICE: We prefer to keep only {@link DeserializationSchema#deserialize(byte[], Collector)}, + * however, {@link DeserializationSchema#deserialize(byte[])} has not been deprecated now, hence + * it's better to keep {@link DeserializationSchema#deserialize(byte[])} usable until we remove + * {@link DeserializationSchema#deserialize(byte[])}. + * + *

Please do not change this method when you add new features! + */ + @Override + public RowData deserialize(@Nullable byte[] message) throws IOException { + reusableCollectList.clear(); + deserialize(message, collector); + if (reusableCollectList.size() > 1) { + throw new FlinkRuntimeException( + "Please invoke " + + "DeserializationSchema#deserialize(byte[], Collector) instead."); + } + if (reusableCollectList.isEmpty()) { + return null; + } + return reusableCollectList.get(0); } @Override diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java index 22df48f2ac209..eea1f607f1c58 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java @@ -23,6 +23,7 @@ import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; @@ -72,10 +73,10 @@ public JsonParserRowDataDeserializationSchema( } @Override - public RowData deserialize(byte[] message) throws IOException { + public void deserialize(byte[] message, Collector out) throws IOException { // return null when there is no token if (message == null || message.length == 0) { - return null; + return; } try (JsonParser root = objectMapper.getFactory().createParser(message)) { /* First: must point to a token; if not pointing to one, advance. 
@@ -85,16 +86,30 @@ public RowData deserialize(byte[] message) throws IOException { if (root.currentToken() == null) { root.nextToken(); } - if (root.currentToken() != JsonToken.START_OBJECT) { + if (root.currentToken() != JsonToken.START_OBJECT + && root.currentToken() != JsonToken.START_ARRAY) { throw JsonMappingException.from(root, "No content to map due to end-of-input"); } - return (RowData) runtimeConverter.convert(root); + if (root.currentToken() == JsonToken.START_ARRAY) { + processArray(root, out); + } else { + processObject(root, out); + } } catch (Throwable t) { - if (ignoreParseErrors) { - return null; + if (!ignoreParseErrors) { + throw new IOException( + format("Failed to deserialize JSON '%s'.", new String(message)), t); } - throw new IOException( - format("Failed to deserialize JSON '%s'.", new String(message)), t); } } + + private void processArray(JsonParser root, Collector out) throws IOException { + while (root.nextToken() != JsonToken.END_ARRAY) { + out.collect((RowData) runtimeConverter.convert(root)); + } + } + + private void processObject(JsonParser root, Collector out) throws IOException { + out.collect((RowData) runtimeConverter.convert(root)); + } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java index 5a3fe22b308c8..85c33a562fa33 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java @@ -23,8 +23,10 @@ import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; import javax.annotation.Nullable; @@ -63,18 +65,39 @@ public JsonRowDataDeserializationSchema( } @Override - public RowData deserialize(@Nullable byte[] message) throws IOException { + public void deserialize(@Nullable byte[] message, Collector out) throws IOException { if (message == null) { - return null; + return; } try { - return convertToRowData(deserializeToJsonNode(message)); + final JsonNode root = objectMapper.readTree(message); + + if (root != null && root.isArray()) { + ArrayNode arrayNode = (ArrayNode) root; + for (int i = 0; i < arrayNode.size(); i++) { + try { + RowData result = convertToRowData(arrayNode.get(i)); + if (result != null) { + out.collect(result); + } + } catch (Throwable t) { + if (!ignoreParseErrors) { + // will be caught by outer try-catch + throw t; + } + } + } + } else { + RowData result = convertToRowData(root); + if (result != null) { + out.collect(result); + } + } } catch (Throwable t) { - if (ignoreParseErrors) { - return null; + if (!ignoreParseErrors) { + throw new IOException( + format("Failed to deserialize JSON '%s'.", new String(message)), t); } - throw new IOException( - format("Failed to deserialize JSON '%s'.", new String(message)), t); } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index 916b04f50f8be..18d01c9c083ae 100644 --- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.json; +import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.core.testutils.FlinkAssertions; @@ -34,6 +35,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -51,6 +53,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -224,6 +227,55 @@ void testSerDe() throws Exception { assertThat(serializedJson).containsExactly(actualBytes); } + @Test + public void testJsonArrayToMultiRecords() throws Exception { + DataType dataType = ROW(FIELD("f1", INT()), FIELD("f2", BOOLEAN()), FIELD("f3", STRING())); + RowType rowType = (RowType) dataType.getLogicalType(); + + ObjectMapper objectMapper = new ObjectMapper(); + + ObjectNode element1 = objectMapper.createObjectNode(); + element1.put("f1", 1); + element1.put("f2", true); + element1.put("f3", "str"); + + ObjectNode element2 = objectMapper.createObjectNode(); + element2.put("f1", 10); + element2.put("f2", false); + element2.put("f3", "newStr"); + + ArrayNode arrayNode = objectMapper.createArrayNode(); + arrayNode.add(element1); + arrayNode.add(element2); + + DeserializationSchema deserializationSchema = + createDeserializationSchema( + isJsonParser, rowType, false, false, TimestampFormat.ISO_8601); + + open(deserializationSchema); + + // test serialization + JsonRowDataSerializationSchema serializationSchema = + new JsonRowDataSerializationSchema( + rowType, + TimestampFormat.ISO_8601, + JsonFormatOptions.MapNullKeyMode.LITERAL, + "null", + true, + false); + open(serializationSchema); + + List result = new ArrayList<>(); + Collector collector = new ListCollector<>(result); + deserializationSchema.deserialize(objectMapper.writeValueAsBytes(arrayNode), collector); + assertThat(result).hasSize(2); + + byte[] result1 = serializationSchema.serialize(result.get(0)); + byte[] result2 = serializationSchema.serialize(result.get(1)); + assertThat(result1).isEqualTo(objectMapper.writeValueAsBytes(element1)); + assertThat(result2).isEqualTo(objectMapper.writeValueAsBytes(element2)); + } + /** * Tests the deserialization slow path, e.g. convert into string and use {@link * Double#parseDouble(String)}. 
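A caller-side sketch may help make the new contract concrete before the follow-up commits below. It is not part of the patch: it assumes the five-argument `JsonRowDataDeserializationSchema` constructor, the `DummyInitializationContext` test utility that the patched test already imports, and `InternalTypeInfo` from the table runtime, purely for illustration. Fed a top-level JSON array, the `Collector`-based `deserialize(byte[], Collector<RowData>)` added above emits one row per array element.

```java
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;

public class JsonArrayExplodeSketch {

    public static void main(String[] args) throws Exception {
        // Row schema matching the DDL in the docs: (col1 BIGINT, col2 VARCHAR).
        RowType rowType =
                (RowType) ROW(FIELD("col1", BIGINT()), FIELD("col2", STRING())).getLogicalType();

        JsonRowDataDeserializationSchema schema =
                new JsonRowDataDeserializationSchema(
                        rowType,
                        InternalTypeInfo.of(rowType), // produced type info
                        false,                        // failOnMissingField
                        false,                        // ignoreParseErrors
                        TimestampFormat.ISO_8601);
        // The runtime normally opens the schema; the test utility context is reused here.
        schema.open(new DummyInitializationContext());

        byte[] topLevelArray =
                "[{\"col1\": 123, \"col2\": \"a\"}, {\"col1\": 456, \"col2\": \"b\"}]"
                        .getBytes(StandardCharsets.UTF_8);

        // Collector-based path added by this patch: each array element becomes one RowData.
        List<RowData> rows = new ArrayList<>();
        schema.deserialize(topLevelArray, new ListCollector<>(rows));

        System.out.println(rows.size());            // 2
        System.out.println(rows.get(0).getLong(0)); // 123
    }
}
```

Note that the single-record `deserialize(byte[])` bridge kept in `AbstractJsonDeserializationSchema` would throw a `FlinkRuntimeException` for this two-element input, since it cannot return more than one row; callers that may receive top-level arrays should prefer the `Collector` variant.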
From 58aa28b6eecaf33befdc5eb5646705efb65405d9 Mon Sep 17 00:00:00 2001 From: lianghongjia Date: Tue, 22 Apr 2025 17:01:53 +0800 Subject: [PATCH 2/4] fix --- docs/content/docs/connectors/table/formats/json.md | 8 +++----- .../formats/json/AbstractJsonDeserializationSchema.java | 4 ++-- .../formats/json/JsonRowDataDeserializationSchema.java | 3 +-- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/docs/content/docs/connectors/table/formats/json.md b/docs/content/docs/connectors/table/formats/json.md index 0b381c44735e9..e3ff1f2377937 100644 --- a/docs/content/docs/connectors/table/formats/json.md +++ b/docs/content/docs/connectors/table/formats/json.md @@ -262,11 +262,9 @@ Features ### Allow top-level JSON Arrays -Usually, we assume the top-level of json string is a json object. Then the json object is converted to one SQL row. +Usually, we assume the top-level of JSON string is a stringified JSON object. Then this stringified JSON object can be converted into one SQL row. -There are some cases that, the top-level of json string is a json array, and we want to explode the array to -multiple records, each one of the array is a json object which is converted to one row. Flink JSON Format supports -read such data implicitly. +There are some cases that, the top-level of JSON string is a stringified JSON array, and we want to explode the array into multiple records. Every element within the array is a JSON object, and each of these JSON objects can be converted into one row. Flink JSON Format supports reading such data. For example, for the following SQL DDL: ```sql @@ -279,7 +277,7 @@ CREATE TABLE user_behavior ( ) ``` -Flink JSON Format will produce 2 rows `(123, "a")` and `(456, "b")` with both of following two json string. +Flink JSON Format will produce 2 rows `(123, "a")` and `(456, "b")` with both of following two JSON string. The top-level is JSON Array: ```json lines [{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}] diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java index 97a994a729792..ca64f1f85abd2 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java @@ -107,8 +107,8 @@ public void open(InitializationContext context) throws Exception { /** * NOTICE: We prefer to keep only {@link DeserializationSchema#deserialize(byte[], Collector)}, - * however, {@link DeserializationSchema#deserialize(byte[])} has not been deprecated now, hence - * it's better to keep {@link DeserializationSchema#deserialize(byte[])} usable until we remove + * however, {@link DeserializationSchema#deserialize(byte[])} is not deprecated now, hence it's + * better to keep {@link DeserializationSchema#deserialize(byte[])} usable until we remove * {@link DeserializationSchema#deserialize(byte[])}. * *

Please do not change this method when you add new features! diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java index 85c33a562fa33..e8f76eca52aba 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java @@ -70,8 +70,7 @@ public void deserialize(@Nullable byte[] message, Collector out) throws return; } try { - final JsonNode root = objectMapper.readTree(message); - + final JsonNode root = deserializeToJsonNode(message); if (root != null && root.isArray()) { ArrayNode arrayNode = (ArrayNode) root; for (int i = 0; i < arrayNode.size(); i++) { From 9f96344d70a41d809ba5860531a1cd6bfd0d55b4 Mon Sep 17 00:00:00 2001 From: lianghongjia Date: Tue, 22 Apr 2025 17:35:39 +0800 Subject: [PATCH 3/4] fix doc --- docs/content.zh/docs/connectors/table/formats/json.md | 2 +- docs/content/docs/connectors/table/formats/json.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/formats/json.md b/docs/content.zh/docs/connectors/table/formats/json.md index 7ed093f5a4a11..76f4593f1d8f9 100644 --- a/docs/content.zh/docs/connectors/table/formats/json.md +++ b/docs/content.zh/docs/connectors/table/formats/json.md @@ -251,7 +251,7 @@ Format 参数 通常,我们假设 JSON 的最外层数据是一个 JSON Object。所以一条 JSON 会转换成一行结果。 -但是在某些情况下 JSON 的最外层数据可能是一个 JSON Array,我们期望它可以被展开成多条结果,Array 的每个元素都被转成一行结果。Flink JSON Format 支持对这种情况的默认处理。 +但是在某些情况下 JSON 的最外层数据可能是一个 JSON Array,我们期望它可以被展开成多条结果。 JSON Array 的每个元素都是一个 JSON Object, 这些 JSON Object 的 schema 需要和 SQL 定义一致。然后每个 JSON Object 会被转成一行结果。Flink JSON Format 支持对这种情况的默认处理。 例如,对于如下 DDL: ```sql diff --git a/docs/content/docs/connectors/table/formats/json.md b/docs/content/docs/connectors/table/formats/json.md index e3ff1f2377937..7ead1ab90d825 100644 --- a/docs/content/docs/connectors/table/formats/json.md +++ b/docs/content/docs/connectors/table/formats/json.md @@ -264,7 +264,7 @@ Features Usually, we assume the top-level of JSON string is a stringified JSON object. Then this stringified JSON object can be converted into one SQL row. -There are some cases that, the top-level of JSON string is a stringified JSON array, and we want to explode the array into multiple records. Every element within the array is a JSON object, and each of these JSON objects can be converted into one row. Flink JSON Format supports reading such data. +There are some cases that, the top-level of JSON string is a stringified JSON array, and we want to explode the array into multiple records. Each element within the array is a JSON object, the schema of every such JSON object is the same as defined in SQL, and each of these JSON objects can be converted into one row. Flink JSON Format supports reading such data. 
For example, for the following SQL DDL: ```sql From 0f0a646ae3daa7e0d8960722d3afdda53998921e Mon Sep 17 00:00:00 2001 From: lianghongjia Date: Tue, 22 Apr 2025 18:56:10 +0800 Subject: [PATCH 4/4] remove comments --- .../formats/json/AbstractJsonDeserializationSchema.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java index ca64f1f85abd2..a89eb042d38ff 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java @@ -105,14 +105,6 @@ public void open(InitializationContext context) throws Exception { collector = new ListCollector<>(reusableCollectList); } - /** - * NOTICE: We prefer to keep only {@link DeserializationSchema#deserialize(byte[], Collector)}, - * however, {@link DeserializationSchema#deserialize(byte[])} is not deprecated now, hence it's - * better to keep {@link DeserializationSchema#deserialize(byte[])} usable until we remove - * {@link DeserializationSchema#deserialize(byte[])}. - * - *

Please do not change this method when you add new features! - */ @Override public RowData deserialize(@Nullable byte[] message) throws IOException { reusableCollectList.clear();
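For reference, the two deserializers changed in this series reach the same result by different routes: `JsonRowDataDeserializationSchema` materializes the whole message as a tree and iterates the `ArrayNode`, while `JsonParserRowDataDeserializationSchema` walks the token stream and converts each `START_OBJECT` in place. The sketch below is illustrative only; it uses vanilla Jackson rather than Flink's shaded copy, and the lines that merely collect a `JsonNode` stand in for the patch's `convertToRowData(...)` and `runtimeConverter.convert(...)` calls.

```java
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TopLevelArrayParsingSketch {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        byte[] message =
                "[{\"col1\": 123, \"col2\": \"a\"}, {\"col1\": 456, \"col2\": \"b\"}]"
                        .getBytes(StandardCharsets.UTF_8);

        // Tree-based strategy (JsonRowDataDeserializationSchema): materialize the whole
        // document, then hand each array element to the row converter.
        List<JsonNode> viaTree = new ArrayList<>();
        JsonNode root = mapper.readTree(message);
        if (root != null && root.isArray()) {
            for (JsonNode element : root) {
                viaTree.add(element); // stands in for convertToRowData(element)
            }
        } else if (root != null) {
            viaTree.add(root);
        }

        // Streaming strategy (JsonParserRowDataDeserializationSchema): walk the token
        // stream and convert each START_OBJECT without building the full array in memory.
        List<JsonNode> viaParser = new ArrayList<>();
        try (JsonParser parser = mapper.getFactory().createParser(message)) {
            if (parser.nextToken() == JsonToken.START_ARRAY) {
                while (parser.nextToken() != JsonToken.END_ARRAY) {
                    viaParser.add(mapper.readTree(parser)); // stands in for runtimeConverter.convert(parser)
                }
            } else {
                viaParser.add(mapper.readTree(parser));
            }
        }

        System.out.println(viaTree.size() + " / " + viaParser.size()); // 2 / 2
    }
}
```

The streaming route never holds the full array as a tree, which matters for large array messages, while the tree route makes it straightforward to apply `ignoreParseErrors` per element, as the per-element try/catch in the patched `JsonRowDataDeserializationSchema` does.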