Today's applications and IoT devices generate vast amounts of data, and most of it is unstructured. For a long time, relational database management systems have been used to store structured data, which they organize into tables of rows and columns. Unstructured data, however, does not fit neatly into rows and columns, so relational databases are not the best option for storing it. NoSQL databases were developed to fill this gap: they make it easy to store unstructured data. GridDB is a good example of a NoSQL database, and it is well optimized for IoT and big data. When using GridDB, you will need to stream data in and out, and Apache Kafka is a data streaming platform that can help you achieve this. In this article, we walk you through how to organize communication between the different elements of a software system using Kafka and the GridDB database.
Apache Kafka is a distributed event streaming platform used for data integration, data pipelines, and streaming analytics. It helps you ingest and process streaming data in real time. Streaming data is data generated continuously by many data sources, which typically send their records simultaneously. Apache Kafka provides its users with the following functions:
- Publish and subscribe to record streams.
- Effective storage of record streams in the same order they were generated.
- Real-time processing of record streams.
Apache Kafka combines storage, messaging, and stream processing to facilitate the analysis of real-time and historical data.
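As a quick illustration of the publish/subscribe model (assuming a broker is already running on localhost:9092; the topic name "demo" is arbitrary), Kafka ships console tools for producing and consuming record streams:

$ kafka-topics.sh --create --topic demo --bootstrap-server localhost:9092
$ echo "hello griddb" | kafka-console-producer.sh --topic demo --bootstrap-server localhost:9092
$ kafka-console-consumer.sh --topic demo --from-beginning --bootstrap-server localhost:9092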
- GridDB is a database for IoT with both a NoSQL interface and an SQL interface.
- The GridDB Kafka sink connector reads data from Apache Kafka topics and persists it to GridDB database tables.
- The GridDB Kafka source connector reads data from GridDB database tables and publishes it to Apache Kafka topics.
- This guide explains how to configure and use both the GridDB Kafka sink and source connectors.
Data mapping in the source connector is automatic. The type mapping is as follows:
| Apache Kafka data type | GridDB data type |
|---|---|
| INT8 | GSType.BYTE |
| INT16 | GSType.SHORT |
| INT32 | GSType.INTEGER |
| INT64 | GSType.LONG |
| FLOAT32 | GSType.FLOAT |
| FLOAT64 | GSType.DOUBLE |
| BOOLEAN | GSType.BOOL |
| STRING | GSType.STRING |
| BYTES | GSType.BLOB |
| Timestamp | GSType.TIMESTAMP |
Type mapping in sink connector:
| Apache Kafka data type | GridDB data type |
|---|---|
| INT8 | GSType.BYTE |
| INT16 | GSType.SHORT |
| INT32 | GSType.INTEGER |
| INT64 | GSType.LONG |
| FLOAT32 | GSType.FLOAT |
| FLOAT64 | GSType.DOUBLE |
| BOOLEAN | GSType.BOOL |
| STRING | GSType.STRING |
| BYTES | GSType.BLOB |
| Decimal, Date, Time, Timestamp | GSType.TIMESTAMP |
Note: In GridDB, the rowkey is always the first column, so it is not configurable with the GridDB Kafka sink/source connector.
Concept
| Concept of Apache Kafka | Concept of GridDB |
|---|---|
| topic | container (table) |
| record | row |
| field | column |

Note: Please refer to the GridDB write example below.
Mode
- From GridDB database tables to Apache Kafka (GridDB Kafka source connector)

| Container type of GridDB | Mode of Apache Kafka |
|---|---|
| COLLECTION, TIMESERIES | bulk |
| COLLECTION, TIMESERIES | timestamp (requires a timestamp column) |

Suggestion: If the GridDB database tables have a column of type timestamp, use timestamp mode (set in the GridDB source connector configuration) to get data from them.
Note: Please refer to the GridDB examples for the two modes below.
Building the library and running the sample programs has been verified in the following environment:
- OS: Ubuntu 24.04 (x64)
- Java: 17
- Maven: 3.9.5
- Kafka: 2.13-3.7.1
- GridDB server: V5.6 CE, Ubuntu 22.04 (x64)
$ wget https://dlcdn.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz
$ tar xzvf kafka_2.13-3.7.1.tgz
$ cd kafka_2.13-3.7.1
$ export PATH=$PATH:/path/to/kafka_2.13-3.7.1/bin
$ zookeeper-server-start.sh -daemon config/zookeeper.properties # Start zookeeper server
$ kafka-server-start.sh config/server.properties # Start Apache Kafka server
Note:
- Keep the terminal open so that the Apache Kafka server keeps running.
- If the Apache Kafka broker is not available when starting the Apache Kafka server, fix the error as follows:
- Open the server.properties file and uncomment the line "#listeners=PLAINTEXT://:9092"
- Change it to "listeners=PLAINTEXT://127.0.0.1:9092"
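For reference, the relevant excerpt of config/server.properties after the change:

# config/server.properties (excerpt)
# The listener line ships commented out:
#listeners=PLAINTEXT://:9092
# Uncomment it and bind the broker explicitly to localhost:
listeners=PLAINTEXT://127.0.0.1:9092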
- Using package or source
- Using Docker
- Quick Docker commands for starting the GridDB server:
$ docker pull griddb/griddb
$ docker run --network="host" -e GRIDDB_CLUSTER_NAME=griddb griddb/griddb
Note:
- If using Docker, please open a new terminal to install and start the GridDB server.
Build GridDB Kafka sink/source connector
$ cd GRIDDB_KAFKA_CONNECTOR_FOLDER
$ mvn clean install
After running the command
$ mvn clean install
the file griddb-kafka-connector-X.X.jar will be created in the "target/" folder.
Copy the griddb-kafka-connector-X.X.jar file to /path/to/kafka_2.13-3.7.1/libs/
- Prepare data
- The "rawdata.txt" file in samples/sink is a sample data file.
- Prepare the script for running Apache Kafka
- The "script_sink.sh" file in samples/sink is a sample script file (a rough sketch of it follows the output below).
- Run the script
- Set execute permission for script_sink.sh:
$ chmod a+x script_sink.sh
- Export PATH so that script_sink.sh can find the Kafka tools:
$ export PATH=$PATH:/path/to/kafka_2.13-3.7.1/bin
- Run script_sink.sh for Apache Kafka:
$ ./script_sink.sh
=> output:
./rawdata.txt
Creating topic topic_D001
>>Creating topic topic_D002
...
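For orientation, here is a minimal sketch of what a script like script_sink.sh does, judging from the output above (the topic names and the transformation step are assumptions; see the actual script in samples/sink):

#!/bin/bash
# Hypothetical sketch of script_sink.sh: create one topic per device
# and feed the prepared records to the Kafka console producer.
for topic in topic_D001 topic_D002; do
  echo "Creating topic $topic"
  kafka-topics.sh --create --topic "$topic" --bootstrap-server localhost:9092
done
# Produce the sample records (the real script first reshapes rawdata.txt)
kafka-console-producer.sh --topic topic_D001 --bootstrap-server localhost:9092 < rawdata.txt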
- Open the griddb-sink.properties file in GRIDDB_KAFKA_CONNECTOR_FOLDER and configure one of the following properties:
- Using the topics.regex property:
topics.regex=topic.(.*)
- Using the topics property:
topics=topic_D001,topic_D002
- Open a new terminal and execute these commands:
$ cd kafka_2.13-3.7.1
$ ./bin/connect-standalone.sh config/connect-standalone.properties GRIDDB_KAFKA_CONNECTOR_FOLDER/config/griddb-sink.properties
- After the command above finishes, the topic data is pushed into the GridDB database.
$ git clone https://github.com/griddb/cli.git
$ cd cli && ant # Refer to https://github.com/griddb/cli to build it
$ CP=.
$ CP=$CP:common/lib/commons-io-2.4.jar:release/griddb-cli.jar:common/lib/gridstore.jar:common/lib/gridstore-jdbc.jar:common/lib/jackson-annotations-2.2.3.jar:common/lib/jackson-core-2.2.3.jar:common/lib/jackson-databind-2.2.3.jar:common/lib/javax.json-1.0.jar:common/lib/jersey-client-1.17.1.jar:common/lib/jersey-core-1.17.1.jar:common/lib/orion-ssh2-214.jar:lib/commons-beanutils-1.9.3.jar:lib/commons-cli-1.2.jar:lib/commons-collections-3.2.2.jar:lib/commons-lang3-3.5.jar:lib/commons-logging-1.2.jar:lib/jline-3.17.1.jar:lib/logback-classic-1.0.13.jar:lib/logback-core-1.0.13.jar:lib/opencsv-3.9.jar:lib/slf4j-api-1.7.5.jar
$ java -Xmx1024m -Dlogback.configurationFile=gs_sh_logback.xml -classpath "$CP:$CLASSPATH" com.toshiba.mwcloud.gs.tools.shell.GridStoreShell $*
$ setcluster cluster0 griddb 239.0.0.1 31999
$ setclustersql cluster0 griddb 239.0.0.1 41999
$ setuser admin admin
$ connect $cluster0
gs[public]> sql select * from topic_D001;
1 results. (2 ms)
gs[public]> get
datetime,sensor,translate01,translate02,message,sensoractivity
2012-07-18T00:54:45.000Z,D001,Ignore,Ignore,CLOSE,Control4-Door
The 1 results had been acquired.
GridDB Kafka sink connector config parameters in the config file

| Parameter | Description | Default Value |
|---|---|---|
| connector.class | the sink connector class | com.github.griddb.kafka.connect.GriddbSinkConnector |
| name | the connector name | |
| topics.regex | a regex matching the topics used by the sink connector | |
| topics | the name of the topics used by the sink connector | |
| host | GridDB host or multicast address | |
| port | GridDB port or multicast port | |
| cluster.name | GridDB cluster name | |
| user | GridDB username | |
| password | GridDB user password | |
| notification.member | GridDB notification member list in fixed list method | |
| batch.size | the size of the write buffer to GridDB | 3000 |
| multiput | use multiput or single put in the write buffer | true |
| container.name.format | used to derive the GridDB container name from the topic name | $(topic): the default container name is the topic name |
| container.type | the GridDB container type, either COLLECTION or TIME_SERIES. A TIME_SERIES container is created when container.type is TIME_SERIES and the first column is TIMESTAMP; otherwise a COLLECTION container is created. | COLLECTION |

Note:
- In the file config/griddb-sink.properties, the config values connector.class, name, topics.regex or topics, and transforms are properties used by Apache Kafka, not the connector.
- Configure only one of the "topics.regex" and "topics" properties.
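Putting the pieces together, a minimal griddb-sink.properties could look like this (the connection values match the multicast defaults used elsewhere in this article; adjust them to your environment):

name=griddb-kafka-sink-connector
connector.class=com.github.griddb.kafka.connect.GriddbSinkConnector
topics=topic_D001,topic_D002
host=239.0.0.1
port=31999
cluster.name=griddb
user=admin
password=admin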
- Install the GridDB Python API
- Run the sample "topic.py"
- The "topic.py" file in samples/source is a sample that pushes data into the GridDB database with the GridDB Python API.
$ python3 topic.py 239.0.0.1 31999 griddb admin admin
Simple relationship in the source code between Apache Kafka and the GridDB database:

| Apache Kafka | GridDB database | Value |
|---|---|---|
| topic | container (table) | "D001", "D002", "T102", ... |
| record | row | ['2020-10-01T15:00:00.000Z', False, 1, "griddb"], ... |
| field | column | "name", "status", "count", "sign" |
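For reference, here is a minimal sketch of what a script like topic.py does with the GridDB Python API, using the container name, schema, and row values from the relationship table above (the exact contents of topic.py may differ):

# Minimal sketch: push one row into the GridDB database with the GridDB Python API.
import sys
import datetime
import griddb_python as griddb

factory = griddb.StoreFactory.get_instance()
# Arguments: multicast address, port, cluster name, user, password
store = factory.get_store(host=sys.argv[1], port=int(sys.argv[2]),
                          cluster_name=sys.argv[3], username=sys.argv[4],
                          password=sys.argv[5])

# Create (or open) a container whose schema matches the fields above
conInfo = griddb.ContainerInfo("D001",
                               [["name", griddb.Type.TIMESTAMP],
                                ["status", griddb.Type.BOOL],
                                ["count", griddb.Type.LONG],
                                ["sign", griddb.Type.STRING]],
                               griddb.ContainerType.COLLECTION, True)
con = store.put_container(conInfo)

# Insert one row; the first column (the rowkey) is the timestamp
con.put([datetime.datetime(2020, 10, 1, 15, 0, 0), False, 1, "griddb"])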
- Open the griddb-source.properties file in GRIDDB_KAFKA_CONNECTOR_FOLDER, find the corresponding properties, and edit them as below:
- Timestamp mode configuration:
containers=D001,D002,T102,BATP102,T103,BATP103,T101,MA016,D003,M009
mode=timestamp
timestamp.column.name=name
- Bulk mode configuration:
containers=D001,D002,T102,BATP102,T103,BATP103,T101,MA016,D003,M009
mode=bulk
Note:
- The differences between the two modes are:
- Timestamp mode: needs at least one column of type timestamp (the "timestamp.column.name" property must be configured) to get data from the GridDB database tables. It gets the data just one time.
- Bulk mode: does not need "timestamp.column.name"; it loops and updates the data many times.
- Open a new terminal and execute these commands:
$ cd kafka_2.13-3.7.1
$ ./bin/connect-standalone.sh config/connect-standalone.properties GRIDDB_KAFKA_CONNECTOR_FOLDER/config/griddb-source.properties
Note:
- By default the GridDB Kafka sink connector and source connector run on the same port (9092), so if you want to run both the sink and source connectors, configure the port for Apache Kafka before running them.
- Also, stop the current process before starting another Apache Kafka Connect process.
- List the available topics in Apache Kafka:
$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
BATP102
BATP103
...
- Check and get data from the topic:
- Timestamp mode
$ ./bin/kafka-console-consumer.sh --topic BATP102 --from-beginning --bootstrap-server localhost:9092
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"name"},{"type":"boolean","optional":true,"field":"status"},{"type":"int64","optional":true,"field":"count"},{"type":"string","optional":true,"field":"sign"}],"optional":false,"name":"D001"},"payload":{"name":1601564400000,"status":false,"count":1,"sign":"griddb"}}
- Bulk mode
$ ./bin/kafka-console-consumer.sh --topic BATP102 --from-beginning --bootstrap-server localhost:9092
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"name"},{"type":"boolean","optional":true,"field":"status"},{"type":"int64","optional":true,"field":"count"},{"type":"string","optional":true,"field":"sign"}],"optional":false,"name":"D002"},"payload":{"name":1601564400000,"status":false,"count":1,"sign":"griddb"}}
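Note that the logical Timestamp field arrives as epoch milliseconds in the payload (1601564400000 above). A quick, hypothetical check in Python confirms it matches the row inserted earlier:

# Convert the epoch-millisecond "name" value from the payload to a UTC datetime
from datetime import datetime, timezone

payload_name = 1601564400000  # value taken from the consumer output above
print(datetime.fromtimestamp(payload_name / 1000, tz=timezone.utc))
# -> 2020-10-01 15:00:00+00:00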
GridDB Kafka source connector config parameters in the config file

| Parameter | Description | Default Value |
|---|---|---|
| connector.class | the source connector class | com.github.griddb.kafka.connect.GriddbSourceConnector |
| name | the connector name | |
| host | GridDB host or multicast address | |
| port | GridDB port or multicast port | |
| cluster.name | GridDB cluster name | |
| user | GridDB username | |
| password | GridDB user password | |
| notification.member | GridDB notification member list in fixed list method | |
| containers | list of GridDB containers used by the source connector | |
| mode | the mode to import (bulk/timestamp) | |
| timestamp.column.name | the list of timestamp columns in timestamp mode | |
| batch.max.rows | the maximum rows per batch | 100 |
| topic.prefix | the prefix of the output topic | |
| polling.interval.ms | interval time for the GridDB Kafka source connector to poll data | 5000 |

Note:
- In the file config/griddb-source.properties, the config values connector.class and name are properties used by Apache Kafka, not the connector.
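As with the sink connector, a minimal griddb-source.properties could look like this (the connection values are the multicast defaults used throughout this article; adjust them to your environment):

name=griddb-kafka-source-connector
connector.class=com.github.griddb.kafka.connect.GriddbSourceConnector
host=239.0.0.1
port=31999
cluster.name=griddb
user=admin
password=admin
containers=D001,D002,T102,BATP102,T103,BATP103,T101,MA016,D003,M009
mode=timestamp
timestamp.column.name=name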