summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/etl/LogService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/LogService.java')
-rw-r--r--src/main/java/com/zdjizhi/etl/LogService.java47
1 files changed, 34 insertions, 13 deletions
diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java
index 56989b1..052a8b3 100644
--- a/src/main/java/com/zdjizhi/etl/LogService.java
+++ b/src/main/java/com/zdjizhi/etl/LogService.java
@@ -1,8 +1,10 @@
package com.zdjizhi.etl;
+import cn.hutool.json.JSONUtil;
import com.zdjizhi.etl.connection.ArangodbBatchIPWindow;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
-import com.zdjizhi.utils.ck.ClickhouseSink;
+import com.zdjizhi.utils.ck.CKSink;
+import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
@@ -12,27 +14,46 @@ import java.util.Map;
import static com.zdjizhi.common.FlowWriteConfig.*;
-public interface LogService {
+public class LogService {
- public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{
- sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new CKBatchWindow())
- .addSink(new ClickhouseSink(sink))
+// @Deprecated
+// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
+//
+// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
+// .apply(new CKBatchWindow())
+// .addSink(new ClickhouseSink(sink))
+// .setParallelism(SINK_PARALLELISM)
+// .name(sink)
+// .setParallelism(SINK_PARALLELISM);
+//
+// }
+
+ public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
+ sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new ArangodbBatchIPWindow())
+ .addSink(new ArangoDBSink(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);
+ }
+ public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
+ sourceStream.map(JSONUtil::toJsonStr)
+ .setParallelism(SINK_PARALLELISM)
+ .addSink(KafkaProducer.getKafkaProducer(sink))
+ .setParallelism(SINK_PARALLELISM)
+ .name(sink)
+ .setParallelism(SINK_PARALLELISM);
}
- public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{
- sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new ArangodbBatchIPWindow())
- .addSink(new ArangoDBSink(sink))
+ public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
+ sourceStream.addSink(new CKSink(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);
+
}
-}
+} \ No newline at end of file