diff options
| -rw-r--r-- | .gitignore | 35 | ||||
| -rw-r--r-- | pom.xml | 155 | ||||
| -rw-r--r-- | src/main/java/com/chaoc/flink/serialization/JsonTest.java | 42 | ||||
| -rw-r--r-- | src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java | 20 | ||||
| -rw-r--r-- | src/main/resources/log4j2.properties | 25 |
5 files changed, 277 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d769462 --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store
\ No newline at end of file @@ -0,0 +1,155 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.chaoc.flink</groupId> + <artifactId>test-json-serialization</artifactId> + <version>1.0-SNAPSHOT</version> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <java.version>1.8</java.version> + <maven.compiler.source>${java.version}</maven.compiler.source> + <maven.compiler.target>${java.version}</maven.compiler.target> + <flink.version>1.13.6</flink.version> + <scala.binary.version>2.12</scala.binary.version> + <slf4j.version>1.7.32</slf4j.version> + <log4j.version>2.17.1</log4j.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.binary.version}</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> + + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + </dependency> + </dependencies> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <version>${log4j.version}</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${log4j.version}</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${log4j.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>${java.version}</source> + <target>${java.version}</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.1.1</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <createDependencyReducedPom>false</createDependencyReducedPom> + <artifactSet> + <excludes> + <exclude>org.apache.flink:force-shading</exclude> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>org.apache.logging.log4j:*</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project>
\ No newline at end of file diff --git a/src/main/java/com/chaoc/flink/serialization/JsonTest.java b/src/main/java/com/chaoc/flink/serialization/JsonTest.java new file mode 100644 index 0000000..f00fabe --- /dev/null +++ b/src/main/java/com/chaoc/flink/serialization/JsonTest.java @@ -0,0 +1,42 @@ +package com.chaoc.flink.serialization; + +import org.apache.flink.formats.json.JsonNodeDeserializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import com.chaoc.flink.serialization.formats.JsonNodeSerializationSchema; + +import java.util.Properties; + +public class JsonTest { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final Properties srcProps = new Properties(); + srcProps.setProperty("bootstrap.servers", "192.168.41.32:9092"); + srcProps.setProperty("group.id", "jackson-serialization"); + + final FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>( + "SESSION-RECORD-JSON", + new JsonNodeDeserializationSchema(), + srcProps); + + final Properties dstProps = new Properties(); + dstProps.setProperty("bootstrap.servers", "192.168.41.32:9092"); + + final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>( + "SESSION-RECORD-JSON-COMPLETED", + new JsonNodeSerializationSchema(), + dstProps); + + env.addSource(consumer) + .name("SessionRecordJacksonReader") + .addSink(producer) + .name("SessionRecordJacksonWriter"); + + env.execute("Jackson Serialization Test"); + } +} diff --git a/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java new file mode 100644 index 0000000..0de2f56 --- /dev/null +++ b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java @@ -0,0 +1,20 @@ +package com.chaoc.flink.serialization.formats; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> { + + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public byte[] serialize(ObjectNode element) { + try { + return mapper.writeValueAsBytes(element); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties new file mode 100644 index 0000000..e7c4ef9 --- /dev/null +++ b/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = WARN +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n |
