summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/etl/LogService.java
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-09-23 17:10:05 +0800
committerzhanghongqing <[email protected]>2022-09-23 17:10:05 +0800
commit9cdfe060cfeda37e04fa43563569efae53641eb4 (patch)
treefabf1a170248666f1587cffd1c0f3d6e724de0d9 /src/main/java/com/zdjizhi/etl/LogService.java
parent25e5b51766540d8c1b238a1e28a96fdff45024d3 (diff)
1.过滤异常数据 2.优化sink写入代码 3.优化clickhouse配置
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/LogService.java')
-rw-r--r--src/main/java/com/zdjizhi/etl/LogService.java31
1 files changed, 1 insertions, 30 deletions
diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java
index 052a8b3..de5141a 100644
--- a/src/main/java/com/zdjizhi/etl/LogService.java
+++ b/src/main/java/com/zdjizhi/etl/LogService.java
@@ -1,45 +1,16 @@
package com.zdjizhi.etl;
import cn.hutool.json.JSONUtil;
-import com.zdjizhi.etl.connection.ArangodbBatchIPWindow;
-import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.CKSink;
import com.zdjizhi.utils.kafka.KafkaProducer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Map;
-import static com.zdjizhi.common.FlowWriteConfig.*;
+import static com.zdjizhi.common.FlowWriteConfig.SINK_PARALLELISM;
public class LogService {
-
-// @Deprecated
-// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
-//
-// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
-// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
-// .apply(new CKBatchWindow())
-// .addSink(new ClickhouseSink(sink))
-// .setParallelism(SINK_PARALLELISM)
-// .name(sink)
-// .setParallelism(SINK_PARALLELISM);
-//
-// }
-
- public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
- sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
- .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
- .apply(new ArangodbBatchIPWindow())
- .addSink(new ArangoDBSink(sink))
- .setParallelism(SINK_PARALLELISM)
- .name(sink)
- .setParallelism(SINK_PARALLELISM);
- }
-
public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
sourceStream.map(JSONUtil::toJsonStr)
.setParallelism(SINK_PARALLELISM)