summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorchaoc <[email protected]>2023-08-11 13:42:39 +0800
committerchaoc <[email protected]>2023-08-11 13:42:39 +0800
commitc084f808c593cdddddc4256d8d1c2b66eba7d07d (patch)
tree69384bcc72fea74df718f8015bdfb90312d39452 /src
feat: initfeature
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/chaoc/flink/serialization/JsonTest.java42
-rw-r--r--src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java20
-rw-r--r--src/main/resources/log4j2.properties25
3 files changed, 87 insertions, 0 deletions
diff --git a/src/main/java/com/chaoc/flink/serialization/JsonTest.java b/src/main/java/com/chaoc/flink/serialization/JsonTest.java
new file mode 100644
index 0000000..f00fabe
--- /dev/null
+++ b/src/main/java/com/chaoc/flink/serialization/JsonTest.java
@@ -0,0 +1,42 @@
+package com.chaoc.flink.serialization;
+
+import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import com.chaoc.flink.serialization.formats.JsonNodeSerializationSchema;
+
+import java.util.Properties;
+
+public class JsonTest {
+
+ public static void main(String[] args) throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final Properties srcProps = new Properties();
+ srcProps.setProperty("bootstrap.servers", "192.168.41.32:9092");
+ srcProps.setProperty("group.id", "jackson-serialization");
+
+ final FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(
+ "SESSION-RECORD-JSON",
+ new JsonNodeDeserializationSchema(),
+ srcProps);
+
+ final Properties dstProps = new Properties();
+ dstProps.setProperty("bootstrap.servers", "192.168.41.32:9092");
+
+ final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
+ "SESSION-RECORD-JSON-COMPLETED",
+ new JsonNodeSerializationSchema(),
+ dstProps);
+
+ env.addSource(consumer)
+ .name("SessionRecordJacksonReader")
+ .addSink(producer)
+ .name("SessionRecordJacksonWriter");
+
+ env.execute("Jackson Serialization Test");
+ }
+}
diff --git a/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java
new file mode 100644
index 0000000..0de2f56
--- /dev/null
+++ b/src/main/java/com/chaoc/flink/serialization/formats/JsonNodeSerializationSchema.java
@@ -0,0 +1,20 @@
+package com.chaoc.flink.serialization.formats;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> {
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @Override
+ public byte[] serialize(ObjectNode element) {
+ try {
+ return mapper.writeValueAsBytes(element);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..e7c4ef9
--- /dev/null
+++ b/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = WARN
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n