diff options
Diffstat (limited to 'src')
3 files changed, 87 insertions, 0 deletions
diff --git a/src/main/java/com/chaoc/flink/serialization/JsonTest.java b/src/main/java/com/chaoc/flink/serialization/JsonTest.java new file mode 100644 index 0000000..f00fabe --- /dev/null +++ b/src/main/java/com/chaoc/flink/serialization/JsonTest.java @@ -0,0 +1,42 @@ +package com.chaoc.flink.serialization; + +import org.apache.flink.formats.json.JsonNodeDeserializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import com.chaoc.flink.serialization.formats.JsonNodeSerializationSchema; + +import java.util.Properties; + +public class JsonTest { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final Properties srcProps = new Properties(); + srcProps.setProperty("bootstrap.servers", "192.168.41.32:9092"); + srcProps.setProperty("group.id", "jackson-serialization"); + + final FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>( + "SESSION-RECORD-JSON", + new JsonNodeDeserializationSchema(), + srcProps); + + final Properties dstProps = new Properties(); + dstProps.setProperty("bootstrap.servers", "192.168.41.32:9092"); + + final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>( + "SESSION-RECORD-JSON-COMPLETED", + new JsonNodeSerializationSchema(), + dstProps); + + env.addSource(consumer) + .name("SessionRecordJacksonReader") + .addSink(producer) + .name("SessionRecordJacksonWriter"); + + env.execute("Jackson Serialization Test"); + } +} diff --git a/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java new file mode 100644 index 0000000..0de2f56 --- /dev/null +++ b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java @@ -0,0 +1,20 @@ +package com.chaoc.flink.serialization.formats; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> { + + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public byte[] serialize(ObjectNode element) { + try { + return mapper.writeValueAsBytes(element); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties new file mode 100644 index 0000000..e7c4ef9 --- /dev/null +++ b/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = WARN +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n |
