summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore35
-rw-r--r--pom.xml155
-rw-r--r--src/main/java/com/chaoc/flink/serialization/JsonTest.java42
-rw-r--r--src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java20
-rw-r--r--src/main/resources/log4j2.properties25
5 files changed, 277 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d769462
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,35 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store \ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..cb134e0
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.chaoc.flink</groupId>
+ <artifactId>test-json-serialization</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <java.version>1.8</java.version>
+ <maven.compiler.source>${java.version}</maven.compiler.source>
+ <maven.compiler.target>${java.version}</maven.compiler.target>
+ <flink.version>1.13.6</flink.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <slf4j.version>1.7.32</slf4j.version>
+ <log4j.version>2.17.1</log4j.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ </dependency>
+ </dependencies>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.1.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <artifactSet>
+ <excludes>
+ <exclude>org.apache.flink:force-shading</exclude>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>org.apache.logging.log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project> \ No newline at end of file
diff --git a/src/main/java/com/chaoc/flink/serialization/JsonTest.java b/src/main/java/com/chaoc/flink/serialization/JsonTest.java
new file mode 100644
index 0000000..f00fabe
--- /dev/null
+++ b/src/main/java/com/chaoc/flink/serialization/JsonTest.java
@@ -0,0 +1,42 @@
+package com.chaoc.flink.serialization;
+
+import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import com.chaoc.flink.serialization.formats.JsonNodeSerializationSchema;
+
+import java.util.Properties;
+
+public class JsonTest {
+
+ public static void main(String[] args) throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final Properties srcProps = new Properties();
+ srcProps.setProperty("bootstrap.servers", "192.168.41.32:9092");
+ srcProps.setProperty("group.id", "jackson-serialization");
+
+ final FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(
+ "SESSION-RECORD-JSON",
+ new JsonNodeDeserializationSchema(),
+ srcProps);
+
+ final Properties dstProps = new Properties();
+ dstProps.setProperty("bootstrap.servers", "192.168.41.32:9092");
+
+ final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
+ "SESSION-RECORD-JSON-COMPLETED",
+ new JsonNodeSerializationSchema(),
+ dstProps);
+
+ env.addSource(consumer)
+ .name("SessionRecordJacksonReader")
+ .addSink(producer)
+ .name("SessionRecordJacksonWriter");
+
+ env.execute("Jackson Serialization Test");
+ }
+}
diff --git a/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java
new file mode 100644
index 0000000..0de2f56
--- /dev/null
+++ b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java
@@ -0,0 +1,20 @@
+package com.chaoc.flink.serialization.formats;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> {
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @Override
+ public byte[] serialize(ObjectNode element) {
+ try {
+ return mapper.writeValueAsBytes(element);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..e7c4ef9
--- /dev/null
+++ b/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = WARN
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n