diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/LogService.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/LogService.java | 38 |
1 files changed, 38 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java new file mode 100644 index 0000000..56989b1 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/LogService.java @@ -0,0 +1,38 @@ +package com.zdjizhi.etl; + +import com.zdjizhi.etl.connection.ArangodbBatchIPWindow; +import com.zdjizhi.utils.arangodb.ArangoDBSink; +import com.zdjizhi.utils.ck.ClickhouseSink; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.util.Map; + +import static com.zdjizhi.common.FlowWriteConfig.*; + +public interface LogService { + + public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{ + + sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) + .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime)) + .apply(new CKBatchWindow()) + .addSink(new ClickhouseSink(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + + } + + public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{ + sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) + .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) + .apply(new ArangodbBatchIPWindow()) + .addSink(new ArangoDBSink(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + } +} |
