diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/LogService.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/LogService.java | 47 |
1 file changed, 34 insertions, 13 deletions
diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java index 56989b1..052a8b3 100644 --- a/src/main/java/com/zdjizhi/etl/LogService.java +++ b/src/main/java/com/zdjizhi/etl/LogService.java @@ -1,8 +1,10 @@ package com.zdjizhi.etl; +import cn.hutool.json.JSONUtil; import com.zdjizhi.etl.connection.ArangodbBatchIPWindow; import com.zdjizhi.utils.arangodb.ArangoDBSink; -import com.zdjizhi.utils.ck.ClickhouseSink; +import com.zdjizhi.utils.ck.CKSink; +import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; @@ -12,27 +14,46 @@ import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.*; -public interface LogService { +public class LogService { - public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{ - sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new CKBatchWindow()) - .addSink(new ClickhouseSink(sink)) +// @Deprecated +// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { +// +// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) +// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime)) +// .apply(new CKBatchWindow()) +// .addSink(new ClickhouseSink(sink)) +// .setParallelism(SINK_PARALLELISM) +// .name(sink) +// .setParallelism(SINK_PARALLELISM); +// +// } + + public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { + 
sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) + .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) + .apply(new ArangodbBatchIPWindow()) + .addSink(new ArangoDBSink(sink)) .setParallelism(SINK_PARALLELISM) .name(sink) .setParallelism(SINK_PARALLELISM); + } + public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { + sourceStream.map(JSONUtil::toJsonStr) + .setParallelism(SINK_PARALLELISM) + .addSink(KafkaProducer.getKafkaProducer(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); } - public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{ - sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new ArangodbBatchIPWindow()) - .addSink(new ArangoDBSink(sink)) + public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { + sourceStream.addSink(new CKSink(sink)) .setParallelism(SINK_PARALLELISM) .name(sink) .setParallelism(SINK_PARALLELISM); + } -} +}
\ No newline at end of file
