summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/etl/LogService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/LogService.java')
-rw-r--r--src/main/java/com/zdjizhi/etl/LogService.java38
1 files changed, 38 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java
new file mode 100644
index 0000000..56989b1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/LogService.java
@@ -0,0 +1,38 @@
+package com.zdjizhi.etl;
+
+import com.zdjizhi.etl.connection.ArangodbBatchIPWindow;
+import com.zdjizhi.utils.arangodb.ArangoDBSink;
+import com.zdjizhi.utils.ck.ClickhouseSink;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.util.Map;
+
+import static com.zdjizhi.common.FlowWriteConfig.*;
+
+public interface LogService {
+
+ public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{
+
+ sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new CKBatchWindow())
+ .addSink(new ClickhouseSink(sink))
+ .setParallelism(SINK_PARALLELISM)
+ .name(sink)
+ .setParallelism(SINK_PARALLELISM);
+
+ }
+
+ public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{
+ sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
+ .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
+ .apply(new ArangodbBatchIPWindow())
+ .addSink(new ArangoDBSink(sink))
+ .setParallelism(SINK_PARALLELISM)
+ .name(sink)
+ .setParallelism(SINK_PARALLELISM);
+ }
+}