diff options
| author | zhanghongqing <[email protected]> | 2022-09-23 17:10:05 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-09-23 17:10:05 +0800 |
| commit | 9cdfe060cfeda37e04fa43563569efae53641eb4 (patch) | |
| tree | fabf1a170248666f1587cffd1c0f3d6e724de0d9 /src/main/java/com/zdjizhi/etl/LogService.java | |
| parent | 25e5b51766540d8c1b238a1e28a96fdff45024d3 (diff) | |
1. Filter out abnormal data 2. Optimize sink write code 3. Optimize ClickHouse configuration
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/LogService.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/LogService.java | 31 |
1 file changed, 1 insertion, 30 deletions
diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java index 052a8b3..de5141a 100644 --- a/src/main/java/com/zdjizhi/etl/LogService.java +++ b/src/main/java/com/zdjizhi/etl/LogService.java @@ -1,45 +1,16 @@ package com.zdjizhi.etl; import cn.hutool.json.JSONUtil; -import com.zdjizhi.etl.connection.ArangodbBatchIPWindow; -import com.zdjizhi.utils.arangodb.ArangoDBSink; import com.zdjizhi.utils.ck.CKSink; import com.zdjizhi.utils.kafka.KafkaProducer; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; import java.util.Map; -import static com.zdjizhi.common.FlowWriteConfig.*; +import static com.zdjizhi.common.FlowWriteConfig.SINK_PARALLELISM; public class LogService { - -// @Deprecated -// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { -// -// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) -// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime)) -// .apply(new CKBatchWindow()) -// .addSink(new ClickhouseSink(sink)) -// .setParallelism(SINK_PARALLELISM) -// .name(sink) -// .setParallelism(SINK_PARALLELISM); -// -// } - - public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { - sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new ArangodbBatchIPWindow()) - .addSink(new ArangoDBSink(sink)) - .setParallelism(SINK_PARALLELISM) - .name(sink) - .setParallelism(SINK_PARALLELISM); - } - public static void 
getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception { sourceStream.map(JSONUtil::toJsonStr) .setParallelism(SINK_PARALLELISM) |
