summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhoujinchuan <[email protected]>2024-04-18 16:36:34 +0800
committerhoujinchuan <[email protected]>2024-04-18 16:36:34 +0800
commitaf97ab0a6ddd8258dfc1b4035910fc70ea06b813 (patch)
treecbccc4341f91c0f5fac41630a333311064a398f6
parentede6ebbb583d489c8dedfff81a6b96b2caf3452f (diff)
优化监控指标,添加oss sink,优化过滤和限流
-rw-r--r--pom.xml52
-rw-r--r--src/main/java/com/zdjizhi/FileChunkCombiner.java257
-rw-r--r--src/main/java/com/zdjizhi/config/Configs.java83
-rw-r--r--src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java139
-rw-r--r--src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java29
-rw-r--r--src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java223
-rw-r--r--src/main/java/com/zdjizhi/function/TestKeyedCoProcessFunction.java101
-rw-r--r--src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java385
-rw-r--r--src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java98
-rw-r--r--src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java117
-rw-r--r--src/main/java/com/zdjizhi/function/map/SideOutputMapFunction.java (renamed from src/main/java/com/zdjizhi/function/SideOutputMapFunction.java)4
-rw-r--r--src/main/java/com/zdjizhi/kafka/FileMetaKafkaConsumer.java36
-rw-r--r--src/main/java/com/zdjizhi/pojo/FileChunk.java7
-rw-r--r--src/main/java/com/zdjizhi/sink/HBaseSink.java236
-rw-r--r--src/main/java/com/zdjizhi/sink/HosSink.java407
-rw-r--r--src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java392
-rw-r--r--src/main/java/com/zdjizhi/sink/OssSinkByEhcache.java396
-rw-r--r--src/main/java/com/zdjizhi/trigger/LastChunkTrigger.java37
-rw-r--r--src/main/java/com/zdjizhi/utils/CaffeineCacheUtil.java46
-rw-r--r--src/main/java/com/zdjizhi/utils/EhcacheUtil.java35
-rw-r--r--src/main/java/com/zdjizhi/utils/HttpClientUtil.java16
-rw-r--r--src/main/resources/common.properties65
-rw-r--r--src/main/resources/ehcache.xml36
23 files changed, 2654 insertions, 543 deletions
diff --git a/pom.xml b/pom.xml
index 5d943cd..aeefdca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>file-chunk-combiner</artifactId>
- <version>1.2.1</version>
+ <version>1.3.0</version>
<repositories>
<repository>
@@ -199,6 +199,56 @@
<artifactId>commons-jexl3</artifactId>
<version>3.2.1</version>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>2.9.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.ehcache</groupId>
+ <artifactId>ehcache</artifactId>
+ <version>3.10.8</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.glassfish.jaxb</groupId>
+ <artifactId>jaxb-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.cache</groupId>
+ <artifactId>cache-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jaxb</groupId>
+ <artifactId>jaxb-runtime</artifactId>
+ <version>2.3.9</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.cache</groupId>
+ <artifactId>cache-api</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.xml.bind</groupId>
+ <artifactId>jaxb-core</artifactId>
+ <version>2.3.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>2.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.xml.bind</groupId>
+ <artifactId>jaxb-impl</artifactId>
+ <version>2.3.1</version>
+ </dependency>
</dependencies>
<build>
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index f64b6bd..705f673 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -1,29 +1,38 @@
package com.zdjizhi;
+import cn.hutool.core.util.StrUtil;
import com.zdjizhi.config.Configs;
import com.zdjizhi.function.*;
+import com.zdjizhi.function.map.ParseMessagePackMapFunction;
+import com.zdjizhi.function.map.ParseProxyFileMetaFlatMapFunction;
+import com.zdjizhi.function.map.ParseSessionFileMetaFlatMapFunction;
+import com.zdjizhi.function.map.SideOutputMapFunction;
+import com.zdjizhi.kafka.FileMetaKafkaConsumer;
import com.zdjizhi.pojo.*;
-import com.zdjizhi.sink.HBaseSink;
-import com.zdjizhi.sink.HosSink;
+import com.zdjizhi.sink.*;
import com.zdjizhi.kafka.KafkaConsumer;
+import com.zdjizhi.trigger.IdleTimeTrigger;
import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
+import com.zdjizhi.trigger.LastChunkTrigger;
import com.zdjizhi.trigger.MultipleTrigger;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
public class FileChunkCombiner {
@@ -33,53 +42,211 @@ public class FileChunkCombiner {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().setGlobalJobParameters(configuration);
- WatermarkStrategy<FileChunk> watermarkStrategy = WatermarkStrategy
- .<FileChunk>forBoundedOutOfOrderness(Duration.ofSeconds(0))
- .withTimestampAssigner((FileChunk, timestamp) -> FileChunk.getTimestamp() / 1000);
-
- SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
- .addSource(KafkaConsumer.byteArrayConsumer(configuration))
- .name("Kafka Source")
- .map(new ParseMessagePackMapFunction(configuration.get(Configs.ENABLE_RATE_LIMIT), configuration.get(Configs.RATE_LIMIT_THRESHOLD), configuration.get(Configs.RATE_LIMIT_EXCLUSION_EXPRESSION)))
- .name("Map: Parse Message Pack")
- .filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
- .assignTimestampsAndWatermarks(watermarkStrategy);
-
+ SingleOutputStreamOperator<FileChunk> windowStream;
OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
};
+ if (configuration.getInteger(Configs.COMBINER_WINDOW_TYPE) == 0) {
+ WatermarkStrategy<FileChunk> watermarkStrategy = WatermarkStrategy
+ .<FileChunk>forBoundedOutOfOrderness(Duration.ofSeconds(configuration.get(Configs.COMBINER_WINDOW_ALLOWED_LATENESS)))
+ .withTimestampAssigner((FileChunk, timestamp) -> FileChunk.getTimestamp() / 1000);
- List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
- triggers.add(EventTimeTrigger.create());
- triggers.add(LastChunkOrNoDataInTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000));
- Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
- SingleOutputStreamOperator<FileChunk> windowStream = parseMessagePackStream
- .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
- .window(TumblingEventTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME))))
- .trigger(trigger)
- .sideOutputLateData(delayedChunkOutputTag)
- .process(new CombineChunkProcessWindowFunction(configuration.get(Configs.FILE_MAX_CHUNK_COUNT)))
- .name("Window: Combine Chunk")
- .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
- .disableChaining();
+ SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
+ .addSource(KafkaConsumer.byteArrayConsumer(configuration))
+ .name(configuration.get(Configs.KAFKA_TOPIC))
+ .map(new ParseMessagePackMapFunction())
+ .name("Map: Parse Message Pack")
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.MAP_FILTER_EXPRESSION), "map_parse_message_pack"))
+ .name("Filter: Map")
+ .assignTimestampsAndWatermarks(watermarkStrategy);
- if ("hos".equals(configuration.get(Configs.SINK_TYPE))) {
- windowStream.addSink(new HosSink(configuration))
- .name("Hos")
- .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
- windowStream.getSideOutput(delayedChunkOutputTag)
- .map(new SideOutputMapFunction())
- .addSink(new HosSink(configuration))
- .name("Delayed Chunk");
+ List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
+ triggers.add(EventTimeTrigger.create());
+ if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
+ triggers.add(LastChunkOrNoDataInTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000));
+ } else {
+ triggers.add(IdleTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000));
+ }
+ Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
+ windowStream = parseMessagePackStream
+ .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
+ .window(TumblingEventTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME))))
+ .trigger(trigger)
+ .sideOutputLateData(delayedChunkOutputTag)
+ .allowedLateness(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_ALLOWED_LATENESS)))
+ .process(new CombineChunkProcessWindowFunction(configuration.get(Configs.FILE_MAX_CHUNK_COUNT)))
+ .name("Window: Combine Chunk")
+ .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM));
} else {
- windowStream.addSink(new HBaseSink(configuration))
- .name("HBase")
- .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
- windowStream.getSideOutput(delayedChunkOutputTag)
- .map(new SideOutputMapFunction())
- .addSink(new HBaseSink(configuration))
- .name("Delayed Chunk");
+ SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
+ .addSource(KafkaConsumer.byteArrayConsumer(configuration))
+ .name(configuration.get(Configs.KAFKA_TOPIC))
+ .map(new ParseMessagePackMapFunction())
+ .name("Map: Parse Message Pack")
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.MAP_FILTER_EXPRESSION), "map_parse_message_pack"))
+ .name("Filter: Map");
+
+ List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
+ triggers.add(ProcessingTimeTrigger.create());
+ if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
+ triggers.add(LastChunkTrigger.create());
+ }
+ Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
+ windowStream = parseMessagePackStream
+ .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME))))
+ .trigger(trigger)
+ .process(new CombineChunkProcessWindowFunction(configuration.get(Configs.FILE_MAX_CHUNK_COUNT)))
+ .name("Window: Combine Chunk")
+ .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM));
}
+ for (String sinkType : configuration.get(Configs.SINK_TYPE).split(",")) {
+ switch (sinkType) {
+ case "hos":
+ DataStream<FileChunk> sideOutput = windowStream.getSideOutput(delayedChunkOutputTag);
+ if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
+ windowStream
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hos"))
+ .name("Filter: Hos")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
+ .addSink(new HosSink(configuration))
+ .name("Hos")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ sideOutput = sideOutput
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "side_out_put_hos"))
+ .name("Filter: Delayed Chunk")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ } else {
+ windowStream
+ .addSink(new HosSink(configuration))
+ .name("Hos")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ }
+ sideOutput.map(new SideOutputMapFunction())
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
+ .addSink(new HosSink(configuration))
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
+ .name("Delayed Chunk");
+ break;
+ case "hbase":
+ sideOutput = windowStream.getSideOutput(delayedChunkOutputTag);
+ if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
+ windowStream
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hbase"))
+ .name("Filter: HBase")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
+ .addSink(new HBaseSink(configuration))
+ .name("HBase")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ sideOutput = sideOutput
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "side_out_put_hbase"))
+ .name("Filter: Delayed Chunk")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ } else {
+ windowStream
+ .addSink(new HBaseSink(configuration))
+ .name("HBase")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ }
+ sideOutput
+ .map(new SideOutputMapFunction())
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
+ .addSink(new HBaseSink(configuration))
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM))
+ .name("Delayed Chunk");
+ break;
+ case "oss":
+ SingleOutputStreamOperator<FileChunk> fileMetaSessionSingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC)))
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name(configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC))
+ .flatMap(new ParseSessionFileMetaFlatMapFunction())
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name("Map: Parse Session File Meta")
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_session_file_meta"))
+ .name("Filter: Map")
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
+ SingleOutputStreamOperator<FileChunk> fileMetaProxySingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC)))
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name(configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC))
+ .flatMap(new ParseProxyFileMetaFlatMapFunction())
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name("Map: Parse Proxy File Meta")
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_proxy_file_meta"))
+ .name("Filter: Map")
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
+ windowStream
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_OSS_FILTER_EXPRESSION), "sink_oss"))
+ .name("Filter: Oss")
+ .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
+ .union(fileMetaSessionSingleOutputStreamOperator, fileMetaProxySingleOutputStreamOperator)
+ .keyBy(new FileChunkKeySelector())
+ .addSink(new OssSinkByEhcache(configuration))
+ .name("Oss")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ break;
+ case "oss-caffeine":
+ fileMetaSessionSingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC)))
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name(configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC))
+ .flatMap(new ParseSessionFileMetaFlatMapFunction())
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name("Map: Parse Session File Meta")
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_session_file_meta"))
+ .name("Filter: Map")
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
+ fileMetaProxySingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC)))
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name(configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC))
+ .flatMap(new ParseProxyFileMetaFlatMapFunction())
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name("Map: Parse Proxy File Meta")
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_proxy_file_meta"))
+ .name("Filter: Map")
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
+ windowStream
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_OSS_FILTER_EXPRESSION), "sink_oss"))
+ .name("Filter: Oss")
+ .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
+ .union(fileMetaSessionSingleOutputStreamOperator, fileMetaProxySingleOutputStreamOperator)
+ .keyBy(new FileChunkKeySelector())
+ .addSink(new OssSinkByCaffeineCache(configuration))
+ .name("Oss")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ break;
+ case "test":
+ fileMetaSessionSingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC)))
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name(configuration.get(Configs.KAFKA_FILE_META_SESSION_TOPIC))
+ .flatMap(new ParseSessionFileMetaFlatMapFunction())
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name("Map: Parse Session File Meta")
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_session_file_meta"))
+ .name("Filter: Map")
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
+ fileMetaProxySingleOutputStreamOperator = environment.addSource(FileMetaKafkaConsumer.stringConsumer(configuration, configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC)))
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name(configuration.get(Configs.KAFKA_FILE_META_PROXY_TOPIC))
+ .flatMap(new ParseProxyFileMetaFlatMapFunction())
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM))
+ .name("Map: Parse Proxy File Meta")
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.FILE_META_FILTER_EXPRESSION), "map_parse_proxy_file_meta"))
+ .name("Filter: Map")
+ .setParallelism(configuration.get(Configs.MAP_PARSE_FILE_META_PARALLELISM));
+ KeyedStream<FileChunk, String> fileMetaStringKeyedStream = fileMetaSessionSingleOutputStreamOperator
+ .union(fileMetaProxySingleOutputStreamOperator)
+ .keyBy((KeySelector<FileChunk, String>) FileChunk::getUuid);
+ windowStream
+ .filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_OSS_FILTER_EXPRESSION), "sink_oss"))
+ .name("Filter: Oss")
+ .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
+ .keyBy((KeySelector<FileChunk, String>) FileChunk::getUuid)
+ .connect(fileMetaStringKeyedStream)
+ .process(new TestKeyedCoProcessFunction(configuration))
+ .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
+ .name("Oss");
+ break;
+ }
+ }
environment.execute(configuration.get(Configs.FLINK_JOB_NAME));
}
}
diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java
index 118ab0e..0f642bf 100644
--- a/src/main/java/com/zdjizhi/config/Configs.java
+++ b/src/main/java/com/zdjizhi/config/Configs.java
@@ -14,7 +14,7 @@ public class Configs {
.noDefaultValue();
public static final ConfigOption<String> KAFKA_GROUP_ID = ConfigOptions.key("source.kafka.group.id")
.stringType()
- .defaultValue("test1");
+ .defaultValue("agg_traffic_file_chunk_combine");
public static final ConfigOption<String> KAFKA_TOPIC = ConfigOptions.key("source.kafka.topic")
.stringType()
.noDefaultValue();
@@ -43,25 +43,28 @@ public class Configs {
.stringType()
.noDefaultValue();
- public static final ConfigOption<Boolean> ENABLE_RATE_LIMIT = ConfigOptions.key("enable.rate.limit")
- .booleanType()
- .defaultValue(false);
- public static final ConfigOption<Long> RATE_LIMIT_THRESHOLD = ConfigOptions.key("rate.limit.threshold")
- .longType()
- .defaultValue(Long.MAX_VALUE);
- public static final ConfigOption<String> RATE_LIMIT_EXCLUSION_EXPRESSION = ConfigOptions.key("rate.limit.exclusion.expression")
- .stringType()
- .defaultValue("");
+ public static final ConfigOption<Integer> MAP_PARSE_FILE_META_PARALLELISM = ConfigOptions.key("map.parse.file.meta.parallelism")
+ .intType()
+ .defaultValue(1);
+ public static final ConfigOption<Integer> COMBINER_WINDOW_TYPE = ConfigOptions.key("combiner.window.type")
+ .intType()
+ .defaultValue(0);
public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism")
.intType()
.defaultValue(1);
public static final ConfigOption<Long> COMBINER_WINDOW_TIME = ConfigOptions.key("combiner.window.time")
.longType()
.defaultValue(5L);
+ public static final ConfigOption<Long> COMBINER_WINDOW_ALLOWED_LATENESS = ConfigOptions.key("combiner.window.allowed.lateness")
+ .longType()
+ .defaultValue(0L);
public static final ConfigOption<Long> COMBINER_WINDOW_IDLE_TIME = ConfigOptions.key("combiner.window.idle.time")
.longType()
.defaultValue(5L);
+ public static final ConfigOption<Boolean> COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER = ConfigOptions.key("combiner.window.enable.last.chunk.trigger")
+ .booleanType()
+ .defaultValue(true);
public static final ConfigOption<String> SINK_TYPE = ConfigOptions.key("sink.type")
.stringType()
@@ -81,9 +84,16 @@ public class Configs {
public static final ConfigOption<Long> SINK_BATCH_SIZE = ConfigOptions.key("sink.batch.size")
.longType()
.defaultValue(Long.MAX_VALUE);
- public static final ConfigOption<Integer> SINK_HOS_LOAD_BALANCE_MODE = ConfigOptions.key("sink.hos.load.balance.mode")
- .intType()
- .defaultValue(0);
+ public static final ConfigOption<String> SINK_FILTER_EXPRESSION = ConfigOptions.key("sink.filter.expression")
+ .stringType()
+ .defaultValue("");
+ public static final ConfigOption<Long> SINK_RATE_LIMIT_THRESHOLD = ConfigOptions.key("sink.rate.limit.threshold")
+ .longType()
+ .defaultValue(0L);
+ public static final ConfigOption<String> SINK_RATE_LIMIT_EXCLUSION_EXPRESSION = ConfigOptions.key("sink.rate.limit.exclusion.expression")
+ .stringType()
+ .defaultValue("");
+
public static final ConfigOption<String> SINK_HOS_ENDPOINT = ConfigOptions.key("sink.hos.endpoint")
.stringType()
.noDefaultValue();
@@ -93,25 +103,36 @@ public class Configs {
public static final ConfigOption<String> SINK_HOS_TOKEN = ConfigOptions.key("sink.hos.token")
.stringType()
.defaultValue("");
- public static final ConfigOption<Integer> SINK_HOS_HTTP_MAX_TOTAL = ConfigOptions.key("sink.hos.http.max.total")
+
+ public static final ConfigOption<Integer> SINK_HTTP_MAX_TOTAL = ConfigOptions.key("sink.http.max.total")
.intType()
.defaultValue(2000);
- public static final ConfigOption<Integer> SINK_HOS_HTTP_MAX_PER_ROUTE = ConfigOptions.key("sink.hos.http.max.per.route")
+ public static final ConfigOption<Integer> SINK_HTTP_MAX_PER_ROUTE = ConfigOptions.key("sink.http.max.per.route")
.intType()
.defaultValue(1000);
- public static final ConfigOption<Integer> SINK_HOS_HTTP_ERROR_RETRY = ConfigOptions.key("sink.hos.http.error.retry")
+ public static final ConfigOption<Integer> SINK_HTTP_ERROR_RETRY = ConfigOptions.key("sink.http.error.retry")
.intType()
.defaultValue(3);
- public static final ConfigOption<Integer> SINK_HOS_HTTP_CONNECT_TIMEOUT = ConfigOptions.key("sink.hos.http.connect.timeout")
+ public static final ConfigOption<Integer> SINK_HTTP_CONNECT_TIMEOUT = ConfigOptions.key("sink.http.connect.timeout")
.intType()
.defaultValue(10000);
- public static final ConfigOption<Integer> SINK_HOS_HTTP_REQUEST_TIMEOUT = ConfigOptions.key("sink.hos.http.request.timeout")
+ public static final ConfigOption<Integer> SINK_HTTP_REQUEST_TIMEOUT = ConfigOptions.key("sink.http.request.timeout")
.intType()
.defaultValue(10000);
- public static final ConfigOption<Integer> SINK_HOS_HTTP_SOCKET_TIMEOUT = ConfigOptions.key("sink.hos.http.socket.timeout")
+ public static final ConfigOption<Integer> SINK_HTTP_SOCKET_TIMEOUT = ConfigOptions.key("sink.http.socket.timeout")
.intType()
.defaultValue(60000);
+ public static final ConfigOption<Boolean> SINK_OSS_ASYNC = ConfigOptions.key("sink.oss.async")
+ .booleanType()
+ .defaultValue(false);
+ public static final ConfigOption<String> SINK_OSS_ENDPOINT = ConfigOptions.key("sink.oss.endpoint")
+ .stringType()
+ .noDefaultValue();
+ public static final ConfigOption<String> SINK_OSS_FILTER_EXPRESSION = ConfigOptions.key("sink.oss.filter.expression")
+ .stringType()
+ .defaultValue("");
+
public static final ConfigOption<String> SINK_HBASE_ZOOKEEPER = ConfigOptions.key("sink.hbase.zookeeper")
.stringType()
.defaultValue("");
@@ -131,10 +152,26 @@ public class Configs {
public static final ConfigOption<Integer> FILE_MAX_CHUNK_COUNT = ConfigOptions.key("file.max.chunk.count")
.intType()
.defaultValue(100000);
- public static final ConfigOption<Long> FILE_MAX_SIZE = ConfigOptions.key("file.max.size")
- .longType()
- .defaultValue(10737418240L);
- public static final ConfigOption<String> FILTER_EXPRESSION = ConfigOptions.key("filter.expression")
+ public static final ConfigOption<String> MAP_FILTER_EXPRESSION = ConfigOptions.key("map.filter.expression")
.stringType()
.defaultValue("");
+ public static final ConfigOption<String> FILE_META_FILTER_EXPRESSION = ConfigOptions.key("file.meta.filter.expression")
+ .stringType()
+ .defaultValue("");
+
+ public static final ConfigOption<String> KAFKA_FILE_META_SESSION_TOPIC = ConfigOptions.key("source.kafka.file.meta.session.topic")
+ .stringType()
+ .noDefaultValue();
+ public static final ConfigOption<String> KAFKA_FILE_META_PROXY_TOPIC = ConfigOptions.key("source.kafka.file.meta.proxy.topic")
+ .stringType()
+ .noDefaultValue();
+ public static final ConfigOption<String> KAFKA_FILE_META_GROUP_ID = ConfigOptions.key("source.kafka.file.meta.group.id")
+ .stringType()
+ .defaultValue("file_chunk_combine_1");
+ public static final ConfigOption<Long> FILE_META_CACHE_TIME = ConfigOptions.key("file.meta.cache.time")
+ .longType()
+ .defaultValue(0L);
+ public static final ConfigOption<Long> FILE_META_CACHE_SIZE = ConfigOptions.key("file.meta.cache.size")
+ .longType()
+ .defaultValue(0L);
}
diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
index 9797b1d..cd5b0df 100644
--- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
+++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
@@ -22,8 +22,20 @@ import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_SEEK;
public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
private static final Log LOG = LogFactory.get();
+ public transient Counter completeFilesCounter;
+ public transient Counter completeEmlFilesCounter;
+ public transient Counter completeTxtFilesCounter;
+ public transient Counter startChunksCounter;
+ public transient Counter endChunksCounter;
+ public transient Counter chunksInCounter;
+ public transient Counter chunksOutCounter;
+ public transient Counter bytesInCounter;
+ public transient Counter bytesOutCounter;
public transient Counter duplicateChunksCounter;
- public transient Counter combineErrorChunksCounter;
+ public transient Counter errorChunksCounter;
+ public transient Counter nullChunksCounter;
+ public transient Counter nullTxtChunksCounter;
+ public transient Counter nullEmlChunksCounter;
private final int fileMaxChunkCount;
public CombineChunkProcessWindowFunction(int fileMaxChunkCount) {
@@ -33,17 +45,42 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "window_combine_chunk");
+ startChunksCounter = metricGroup.counter("startChunksCount");
+ endChunksCounter = metricGroup.counter("endChunksCount");
duplicateChunksCounter = metricGroup.counter("duplicateChunksCount");
- combineErrorChunksCounter = metricGroup.counter("combineErrorChunksCount");
+ errorChunksCounter = metricGroup.counter("errorChunksCount");
+ chunksInCounter = metricGroup.counter("chunksInCount");
+ chunksOutCounter = metricGroup.counter("chunksOutCount");
+ bytesInCounter = metricGroup.counter("bytesInCount");
+ bytesOutCounter = metricGroup.counter("bytesOutCount");
+ completeFilesCounter = metricGroup.counter("completeFilesCount");
+ completeEmlFilesCounter = metricGroup.counter("completeEmlFilesCount");
+ completeTxtFilesCounter = metricGroup.counter("completeTxtFilesCount");
+ metricGroup.meter("numCompleteFilesOutPerSecond", new MeterView(completeFilesCounter));
+ metricGroup.meter("numCompleteEmlFilesOutPerSecond", new MeterView(completeEmlFilesCounter));
+ metricGroup.meter("numCompleteTxtFilesOutPerSecond", new MeterView(completeTxtFilesCounter));
+ metricGroup.meter("numStartChunksOutPerSecond", new MeterView(startChunksCounter));
+ metricGroup.meter("numEndChunksOutPerSecond", new MeterView(endChunksCounter));
+ metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
+ metricGroup.meter("numChunksOutPerSecond", new MeterView(chunksOutCounter));
+ metricGroup.meter("numBytesInPerSecond", new MeterView(bytesInCounter));
+ metricGroup.meter("numBytesOutPerSecond", new MeterView(bytesOutCounter));
+ metricGroup.meter("numErrorChunksPerSecond", new MeterView(errorChunksCounter));
metricGroup.meter("numDuplicateChunksInPerSecond", new MeterView(duplicateChunksCounter));
- metricGroup.meter("numChunksCombineErrorPerSecond", new MeterView(combineErrorChunksCounter));
+ nullChunksCounter = metricGroup.counter("nullChunksCount");
+ nullEmlChunksCounter = metricGroup.counter("nullTxtChunksCount");
+ nullTxtChunksCounter = metricGroup.counter("nullEmlChunksCount");
+ metricGroup.meter("numNullFilesOutPerSecond", new MeterView(nullChunksCounter));
+ metricGroup.meter("numNullEmlFilesOutPerSecond", new MeterView(nullEmlChunksCounter));
+ metricGroup.meter("numNullTxtFilesOutPerSecond", new MeterView(nullTxtChunksCounter));
}
@Override
public void process(String string, Context context, Iterable<FileChunk> elements, Collector<FileChunk> out) {
List<FileChunk> fileChunks = combine(elements);
for (FileChunk fileChunk : fileChunks) {
+ calculateFileChunkMetrics(fileChunk);
out.collect(fileChunk);
}
}
@@ -51,6 +88,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
private List<FileChunk> combine(Iterable<FileChunk> input) {
List<FileChunk> combinedFileChunkList = new ArrayList<>();
List<FileChunk> originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
+ chunksInCounter.inc(originalFileChunkList.size());
try {
List<byte[]> waitingToCombineChunkList = new ArrayList<>();
if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) {
@@ -60,6 +98,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
if (originalFileChunkIterator.hasNext()) {
int duplicateCount = 0;
FileChunk currentFileChunk = originalFileChunkIterator.next();
+ bytesInCounter.inc(currentFileChunk.getLength());
int lastChunkFlag = currentFileChunk.getLastChunkFlag();
long startOffset = currentFileChunk.getOffset();
if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
@@ -68,6 +107,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
while (originalFileChunkIterator.hasNext()) {
long expectedOffset = currentFileChunk.getOffset() + currentFileChunk.getLength();
currentFileChunk = originalFileChunkIterator.next();
+ bytesInCounter.inc(currentFileChunk.getLength());
long actualOffset = currentFileChunk.getOffset();
if (expectedOffset > actualOffset) {// 期望offset大于当前offset,该块为重复块,跳过该块
duplicateCount++;
@@ -81,7 +121,10 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
}
} else {// 期望offset小于当前offset,说明缺块
if (waitingToCombineChunkList.size() > 0) {//将可合并的chunk合并,清空集合
- combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null));
+ FileChunk fileChunk = combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), originalFileChunkList.get(0).getTimestamp(), null);
+ if (fileChunk != null) {
+ combinedFileChunkList.add(fileChunk);
+ }
waitingToCombineChunkList.clear();
} else {
if (lastChunkFlag == 1) {
@@ -97,7 +140,10 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
}
}
if (waitingToCombineChunkList.size() > 0) {
- combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null));
+ FileChunk fileChunk = combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), originalFileChunkList.get(0).getTimestamp(), null);
+ if (fileChunk != null) {
+ combinedFileChunkList.add(fileChunk);
+ }
} else {
if (lastChunkFlag == 1) {
combinedFileChunkList.add(currentFileChunk);
@@ -113,6 +159,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
long startTimestamp = originalFileChunkList.get(0).getTimestamp();
StringBuilder timestampAndSizes = new StringBuilder();
for (FileChunk originalFileChunk : originalFileChunkList) {
+ bytesInCounter.inc(originalFileChunk.getLength());
byte[] chunk = originalFileChunk.getChunk();
if (chunk != null && chunk.length > 0) {
chunk = originalFileChunk.getChunk();
@@ -124,38 +171,76 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
}
}
if (waitingToCombineChunkList.size() > 0) {
- combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString()));
+ FileChunk fileChunk = combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString());
+ if (fileChunk != null) {
+ combinedFileChunkList.add(fileChunk);
+ }
}
}
+ chunksOutCounter.inc(combinedFileChunkList.size());
} catch (Exception e) {
- LOG.error("Combiner chunk error.", e);
- combineErrorChunksCounter.inc(originalFileChunkList.size());
+ LOG.error("Combiner chunk error. uuid: " + originalFileChunkList.get(0).getUuid() + ", chunk count: " + originalFileChunkList.size() + ". ", e);
+ errorChunksCounter.inc(originalFileChunkList.size());
}
return combinedFileChunkList;
}
private FileChunk combineChunk(List<byte[]> byteList, String uuid, String fileName, String fileType, long offset, String combineMode, int lastChunkFlag, Map<String, Object> metaMap, long startTimestamp, String chunkNumbers) {
FileChunk fileChunk = new FileChunk();
- fileChunk.setChunkCount(byteList.size());
- byte[][] bytes = new byte[byteList.size()][];
- byteList.toArray(bytes);
- byte[] newData = ArrayUtil.addAll(bytes);
- if (COMBINE_MODE_SEEK.equals(combineMode)) {
- fileChunk.setOffset(offset);
- fileChunk.setLastChunkFlag(lastChunkFlag);
- } else {
- if (StringUtil.isNotEmpty(chunkNumbers)) {
- fileChunk.setChunkNumbers(chunkNumbers);
+ try {
+ fileChunk.setChunkCount(byteList.size());
+ byte[][] bytes = new byte[byteList.size()][];
+ byteList.toArray(bytes);
+ byte[] newData = ArrayUtil.addAll(bytes);
+ if (COMBINE_MODE_SEEK.equals(combineMode)) {
+ fileChunk.setOffset(offset);
+ fileChunk.setLastChunkFlag(lastChunkFlag);
+ } else {
+ if (StringUtil.isNotEmpty(chunkNumbers)) {
+ fileChunk.setChunkNumbers(chunkNumbers);
+ }
}
+ fileChunk.setTimestamp(startTimestamp);
+ fileChunk.setFileType(fileType);
+ fileChunk.setUuid(uuid);
+ fileChunk.setChunk(newData);
+ fileChunk.setFileName(fileName);
+ fileChunk.setCombineMode(combineMode);
+ fileChunk.setLength(newData.length);
+ fileChunk.setMeta(metaMap);
+ bytesOutCounter.inc(newData.length);
+ } catch (Exception e) {
+ LOG.error("Combiner chunk error. uuid: " + uuid + ", chunk count: " + byteList.size() + ". ", e);
+ errorChunksCounter.inc(byteList.size());
+ fileChunk = null;
}
- fileChunk.setTimestamp(startTimestamp);
- fileChunk.setFileType(fileType);
- fileChunk.setUuid(uuid);
- fileChunk.setChunk(newData);
- fileChunk.setFileName(fileName);
- fileChunk.setCombineMode(combineMode);
- fileChunk.setLength(newData.length);
- fileChunk.setMeta(metaMap);
return fileChunk;
}
+
+ private void calculateFileChunkMetrics(FileChunk fileChunk) {
+ long offset = fileChunk.getOffset();
+ int lastChunkFlag = fileChunk.getLastChunkFlag();
+ String fileType = fileChunk.getFileType();
+ if (offset == 0 && lastChunkFlag == 1) {
+ completeFilesCounter.inc();
+ if ("eml".equals(fileType)) {
+ completeEmlFilesCounter.inc();
+ } else if ("txt".equals(fileType)) {
+ completeTxtFilesCounter.inc();
+ }
+ if (fileChunk.getChunk() == null) {
+ nullChunksCounter.inc();
+ if ("eml".equals(fileType)) {
+ nullEmlChunksCounter.inc();
+ } else if ("txt".equals(fileType)) {
+ nullTxtChunksCounter.inc();
+ }
+ LOG.info("send file data is null. " + fileChunk.toString());
+ }
+ } else if (offset == 0) {
+ startChunksCounter.inc();
+ } else if (lastChunkFlag == 1) {
+ endChunksCounter.inc();
+ }
+ }
}
diff --git a/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java b/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java
index 4c39cb2..41f46f7 100644
--- a/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java
+++ b/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java
@@ -1,6 +1,8 @@
package com.zdjizhi.function;
import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import com.zdjizhi.pojo.FileChunk;
import org.apache.commons.jexl3.*;
import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -10,22 +12,30 @@ import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
- private final long maxFileSize;
+ private static final Log LOG = LogFactory.get();
+
private final String filterExpression;
+ private final String functionName;
public transient Counter filterChunksCounter;
+ public transient Counter emlChunksCounter;
+ public transient Counter txtChunksCounter;
private JexlExpression jexlExpression;
private JexlContext jexlContext;
- public FileChunkFilterFunction(long maxFileSize, String filterExpression) {
- this.maxFileSize = maxFileSize;
+ public FileChunkFilterFunction(String filterExpression, String functionName) {
this.filterExpression = filterExpression;
+ this.functionName = functionName;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "filter_" + functionName);
filterChunksCounter = metricGroup.counter("filterChunksCount");
+ emlChunksCounter = metricGroup.counter("emlChunksCount");
+ txtChunksCounter = metricGroup.counter("txtChunksCount");
+ metricGroup.meter("numEmlChunksInPerSecond", new MeterView(emlChunksCounter));
+ metricGroup.meter("numTxtChunksInPerSecond", new MeterView(txtChunksCounter));
metricGroup.meter("numChunksFilterPerSecond", new MeterView(filterChunksCounter));
JexlEngine jexlEngine = new JexlBuilder().create();
jexlExpression = jexlEngine.createExpression(filterExpression);
@@ -34,14 +44,21 @@ public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
@Override
public boolean filter(FileChunk value) {
- if (value == null || value.getOffset() > maxFileSize) {
+ if (value == null) {
filterChunksCounter.inc();
return false;
}
if (StrUtil.isNotEmpty(filterExpression)) {
- jexlContext.set(value.getClass().getSimpleName(), value);
+ jexlContext.set("FileChunk", value);
if (!Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
filterChunksCounter.inc();
+ String fileType = value.getFileType();
+ if ("eml".equals(fileType)) {
+ emlChunksCounter.inc();
+ } else if ("txt".equals(fileType)) {
+ txtChunksCounter.inc();
+ }
+ LOG.info(functionName + " filter: " + value.toString());
return false;
}
}
diff --git a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
deleted file mode 100644
index c7add8d..0000000
--- a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package com.zdjizhi.function;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.pojo.FileChunk;
-import org.apache.commons.jexl3.*;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.*;
-import org.msgpack.core.MessagePack;
-import org.msgpack.core.MessageUnpacker;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
-
-public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChunk> {
- private static final Log LOG = LogFactory.get();
- private final boolean enableRateLimit;
- private final long rateLimitThreshold;
- private final String rateLimitExpression;
- public transient Counter parseMessagePacksCounter;
- public transient Counter parseErrorMessagePacksCounter;
- public transient Counter rateLimitDropChunksCounter;
- public transient Counter equal0BChunksCounter;
- public transient Counter lessThan1KBChunksCounter;
- public transient Counter between1KBAnd5KBChunksCounter;
- public transient Counter between5KBAnd10KBChunksCounter;
- public transient Counter between10KBAnd50KBChunksCounter;
- public transient Counter between50KBAnd100KBChunksCounter;
- public transient Counter greaterThan100KBChunksCounter;
- public transient Counter emlChunksCounter;
- public transient Counter txtChunksCounter;
- public transient Counter htmlChunksCounter;
- public transient Counter pcapngChunksCounter;
- public transient Counter mediaChunksCounter;
- private long timestamp;
- private long count;
- private JexlExpression jexlExpression;
- private JexlContext jexlContext;
-
- public ParseMessagePackMapFunction(boolean enableRateLimit, long rateLimitThreshold, String rateLimitExpression) {
- this.rateLimitThreshold = rateLimitThreshold;
- this.enableRateLimit = enableRateLimit;
- this.rateLimitExpression = rateLimitExpression;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
- equal0BChunksCounter = metricGroup.counter("equal0BChunksCount");
- lessThan1KBChunksCounter = metricGroup.counter("lessThan1KBChunksCount");
- between1KBAnd5KBChunksCounter = metricGroup.counter("between1KBAnd5KBChunksCount");
- between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
- between10KBAnd50KBChunksCounter = metricGroup.counter("between10KBAnd50KBChunksCount");
- between50KBAnd100KBChunksCounter = metricGroup.counter("between50KBAnd100KBChunksCount");
- greaterThan100KBChunksCounter = metricGroup.counter("greaterThan100KBChunksCount");
- metricGroup.meter("numEqual0BChunksInPerSecond", new MeterView(equal0BChunksCounter));
- metricGroup.meter("numLessThan1KBChunksInPerSecond", new MeterView(lessThan1KBChunksCounter));
- metricGroup.meter("numBetween1KBAnd5KBChunksInPerSecond", new MeterView(between1KBAnd5KBChunksCounter));
- metricGroup.meter("numBetween5KBAnd10KBChunksInPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
- metricGroup.meter("numBetween10KBAnd50KBChunksInPerSecond", new MeterView(between10KBAnd50KBChunksCounter));
- metricGroup.meter("numBetween50KBAnd100KBChunksInPerSecond", new MeterView(between50KBAnd100KBChunksCounter));
- metricGroup.meter("numGreaterThan100KBChunksInPerSecond", new MeterView(greaterThan100KBChunksCounter));
- emlChunksCounter = metricGroup.counter("emlChunksCount");
- txtChunksCounter = metricGroup.counter("txtChunksCount");
- htmlChunksCounter = metricGroup.counter("htmlChunksCount");
- pcapngChunksCounter = metricGroup.counter("pcapngChunksCount");
- mediaChunksCounter = metricGroup.counter("mediaChunksCount");
- metricGroup.meter("numEmlChunksInPerSecond", new MeterView(emlChunksCounter));
- metricGroup.meter("numTxtChunksInPerSecond", new MeterView(txtChunksCounter));
- metricGroup.meter("numHtmlChunksInPerSecond", new MeterView(htmlChunksCounter));
- metricGroup.meter("numPcapngChunksInPerSecond", new MeterView(pcapngChunksCounter));
- metricGroup.meter("numMediaChunksInPerSecond", new MeterView(mediaChunksCounter));
- parseMessagePacksCounter = metricGroup.counter("parseMessagePacksCount");
- parseErrorMessagePacksCounter = metricGroup.counter("parseErrorMessagePacksCount");
- rateLimitDropChunksCounter = metricGroup.counter("rateLimitDropChunksCount");
- metricGroup.meter("numMessagePacksParsePerSecond", new MeterView(parseMessagePacksCounter));
- metricGroup.meter("numMessagePacksParseErrorPerSecond", new MeterView(parseErrorMessagePacksCounter));
- metricGroup.meter("numChunksRateLimitDropPerSecond", new MeterView(rateLimitDropChunksCounter));
- timestamp = System.currentTimeMillis();
- count = 0;
- JexlEngine jexlEngine = new JexlBuilder().create();
- jexlExpression = jexlEngine.createExpression(rateLimitExpression);
- jexlContext = new MapContext();
- }
-
- @Override
- public FileChunk map(byte[] messagePackData) {
- FileChunk fileChunk = parseMessagePack(messagePackData);
- if (enableRateLimit) {
- count++;
- if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) {
- if (StrUtil.isNotEmpty(rateLimitExpression)) {
- jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
- if (Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
- return fileChunk;
- }
- }
- rateLimitDropChunksCounter.inc();
- fileChunk = null;
- } else if (System.currentTimeMillis() - timestamp >= 1000) {
- if (StrUtil.isNotEmpty(rateLimitExpression)) {
- jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
- if (Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
- return fileChunk;
- }
- }
- rateLimitDropChunksCounter.inc();
- fileChunk = null;
- timestamp = System.currentTimeMillis();
- count = 0;
- }
- }
- return fileChunk;
- }
-
- private FileChunk parseMessagePack(byte[] messagePackData) {
- parseMessagePacksCounter.inc();
- FileChunk fileChunk;
- try {
- fileChunk = new FileChunk();
- MessageUnpacker messageUnpacker = MessagePack.newDefaultUnpacker(messagePackData);
- int numFields = messageUnpacker.unpackMapHeader();
- Map<String, Object> metaMap = new HashMap<>();
- for (int i = 0; i < numFields; i++) {
- String fieldName = messageUnpacker.unpackString();
- switch (fieldName) {
- case "uuid":
- fileChunk.setUuid(messageUnpacker.unpackString());
- break;
- case "fileName":
- fileChunk.setFileName(messageUnpacker.unpackString());
- break;
- case "fileType":
- fileChunk.setFileType(messageUnpacker.unpackString());
- break;
- case "combineMode":
- fileChunk.setCombineMode(messageUnpacker.unpackString());
- break;
- case "offset":
- fileChunk.setOffset(messageUnpacker.unpackLong());
- break;
- case "length":
- fileChunk.setLength(messageUnpacker.unpackLong());
- break;
- case "lastChunkFlag":
- fileChunk.setLastChunkFlag(messageUnpacker.unpackInt());
- break;
- case "chunk":
- fileChunk.setChunk(messageUnpacker.readPayload(messageUnpacker.unpackRawStringHeader()));
- break;
- case "timestamp":
- fileChunk.setTimestamp(messageUnpacker.unpackLong());
- break;
- case "meta":
- String meta = messageUnpacker.unpackString();
- JSONObject metaJsonObject = JSONUtil.parseObj(meta);
- for (String key : metaJsonObject.keySet()) {
- metaMap.put(key, metaJsonObject.get(key));
- }
- fileChunk.setMeta(metaMap);
- break;
- default:
- messageUnpacker.skipValue();
- break;
- }
- }
- if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
- fileChunk.setLastChunkFlag(0);
- }
- calculateChunkSize(fileChunk.getLength());
- calculateFileType(fileChunk.getFileType());
- } catch (Exception e) {
- parseErrorMessagePacksCounter.inc();
- LOG.error("Parse messagePack failed.", e);
- fileChunk = null;
- }
- return fileChunk;
- }
-
- private void calculateChunkSize(long length) {
- if (length == 0) {
- equal0BChunksCounter.inc();
- } else if (length <= 1024) {
- lessThan1KBChunksCounter.inc();
- } else if (length <= 5 * 1024) {
- between1KBAnd5KBChunksCounter.inc();
- } else if (length <= 10 * 1024) {
- between5KBAnd10KBChunksCounter.inc();
- } else if (length <= 50 * 1024) {
- between10KBAnd50KBChunksCounter.inc();
- } else if (length <= 100 * 1024) {
- between50KBAnd100KBChunksCounter.inc();
- } else {
- greaterThan100KBChunksCounter.inc();
- }
- }
-
- private void calculateFileType(String fileType) {
- switch (fileType) {
- case "eml":
- emlChunksCounter.inc();
- break;
- case "html":
- htmlChunksCounter.inc();
- break;
- case "txt":
- txtChunksCounter.inc();
- break;
- case "pcapng":
- pcapngChunksCounter.inc();
- break;
- default:
- mediaChunksCounter.inc();
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/function/TestKeyedCoProcessFunction.java b/src/main/java/com/zdjizhi/function/TestKeyedCoProcessFunction.java
new file mode 100644
index 0000000..fb71fd3
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/TestKeyedCoProcessFunction.java
@@ -0,0 +1,101 @@
+package com.zdjizhi.function;
+
+import cn.hutool.core.io.IoUtil;
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.utils.HBaseConnectionUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class TestKeyedCoProcessFunction extends KeyedCoProcessFunction<String, FileChunk, FileChunk, FileChunk> {
+
+ private final Configuration configuration;
+ public transient Counter chunksInCounter;
+ public transient Counter fileMetasInCounter;
+
+ private boolean isAsync;
+ private Connection syncHBaseConnection;
+ private AsyncConnection AsyncHBaseConnection;
+ private Table table;
+ private AsyncTable<AdvancedScanResultConsumer> asyncTable;
+ private List<Put> dataPutList;
+ private List<Put> metaPutList;
+ private long maxBatchCount;
+
+ public TestKeyedCoProcessFunction(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "add_file_meta");
+ chunksInCounter = metricGroup.counter("chunksInCount");
+ fileMetasInCounter = metricGroup.counter("fileMetasInCount");
+ metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
+ metricGroup.meter("numFileMetasInPerSecond", new MeterView(fileMetasInCounter));
+ isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
+ if (isAsync) {
+ AsyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
+ asyncTable = AsyncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ } else {
+ syncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getSyncHBaseConnection();
+ table = syncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ }
+ maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
+ dataPutList = new ArrayList<>();
+ metaPutList = new ArrayList<>();
+ }
+
+ @Override
+ public void processElement1(FileChunk value, Context ctx, Collector<FileChunk> out) throws IOException, InterruptedException {
+ chunksInCounter.inc();
+ Put dataPut = new Put(value.getUuid().getBytes());
+ dataPut.addColumn("meta".getBytes(), "data".getBytes(), (value.toString()).getBytes());
+ dataPutList.add(dataPut);
+ if (dataPutList.size() >= maxBatchCount) {
+ if (isAsync) {
+ asyncTable.batch(dataPutList);
+ dataPutList.clear();
+ } else {
+ table.batch(dataPutList, null);
+ dataPutList.clear();
+ }
+ }
+ }
+
+ @Override
+ public void processElement2(FileChunk value, Context ctx, Collector<FileChunk> out) throws IOException, InterruptedException {
+ fileMetasInCounter.inc();
+ Put metaPut = new Put(value.getUuid().getBytes());
+ metaPut.addColumn("meta".getBytes(), "meta".getBytes(), (value.getMeta().toString()).getBytes());
+ metaPutList.add(metaPut);
+ if (metaPutList.size() >= maxBatchCount) {
+ if (isAsync) {
+ asyncTable.batch(metaPutList);
+ metaPutList.clear();
+ } else {
+ table.batch(metaPutList, null);
+ metaPutList.clear();
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ IoUtil.close(table);
+ IoUtil.close(syncHBaseConnection);
+ IoUtil.close(AsyncHBaseConnection);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java
new file mode 100644
index 0000000..06c1e5c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java
@@ -0,0 +1,385 @@
+package com.zdjizhi.function.map;
+
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.*;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
+
+public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChunk> {
+ private static final Log LOG = LogFactory.get();
+
+ public transient Counter chunksInCounter;
+ public transient Counter chunksOutCounter;
+ public transient Counter bytesInCounter;
+ public transient Counter bytesOutCounter;
+ public transient Counter errorChunksCounter;
+ public transient Counter lessThan1KBChunksCounter;
+ public transient Counter between1KBAnd3KBChunksCounter;
+ public transient Counter between3KBAnd5KBChunksCounter;
+ public transient Counter between5KBAnd10KBChunksCounter;
+ public transient Counter between10KBAnd50KBChunksCounter;
+ public transient Counter between50KBAnd100KBChunksCounter;
+ public transient Counter greaterThan100KBChunksCounter;
+ public transient Counter offsetLessThan10KBChunksCounter;
+ public transient Counter offsetBetween10KBAnd100KBChunksCounter;
+ public transient Counter offsetBetween100KBAnd1MBChunksCounter;
+ public transient Counter offsetBetween1MBAnd10MBChunksCounter;
+ public transient Counter offsetBetween10MBAnd100MBChunksCounter;
+ public transient Counter offsetBetween100MBAnd1GBChunksCounter;
+ public transient Counter offsetGreaterThan1GBChunksCounter;
+ public transient Counter lessThan10KBFilesCounter;
+ public transient Counter between10KBAnd100KBFilesCounter;
+ public transient Counter between100KBAnd1MBFilesCounter;
+ public transient Counter between1MBAnd10MBFilesCounter;
+ public transient Counter between10MBAnd100MBFilesCounter;
+ public transient Counter between100MBAnd1GBFilesCounter;
+ public transient Counter greaterThan1GBFilesCounter;
+ public transient Counter lessThan10KBEmlChunksCounter;
+ public transient Counter between10KBAnd100KBEmlChunksCounter;
+ public transient Counter between100KBAnd1MBEmlChunksCounter;
+ public transient Counter between1MBAnd10MBEmlChunksCounter;
+ public transient Counter greaterThan10MBEmlChunksCounter;
+ public transient Counter lessThan10KBTxtChunksCounter;
+ public transient Counter between10KBAnd100KBTxtChunksCounter;
+ public transient Counter between100KBAnd1MBTxtChunksCounter;
+ public transient Counter between1MBAnd10MBTxtChunksCounter;
+ public transient Counter greaterThan10MBTxtChunksCounter;
+ public transient Counter emlChunksCounter;
+ public transient Counter txtChunksCounter;
+ public transient Counter htmlChunksCounter;
+ public transient Counter pcapngChunksCounter;
+ public transient Counter mediaChunksCounter;
+ public transient Counter startChunksCounter;
+ public transient Counter endChunksCounter;
+ public transient Counter nullChunksCounter;
+ public transient Counter nullTxtChunksCounter;
+ public transient Counter nullEmlChunksCounter;
+ public transient Counter emlFilesCounter;
+ public transient Counter txtFilesCounter;
+ public transient Counter htmlFilesCounter;
+ public transient Counter pcapngFilesCounter;
+ public transient Counter mediaFilesCounter;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "map_message_pack");
+ lessThan1KBChunksCounter = metricGroup.counter("lessThan1KBChunksCount");
+ between1KBAnd3KBChunksCounter = metricGroup.counter("between1KBAnd3KBChunksCount");
+ between3KBAnd5KBChunksCounter = metricGroup.counter("between3KBAnd5KBChunksCount");
+ between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
+ between10KBAnd50KBChunksCounter = metricGroup.counter("between10KBAnd50KBChunksCount");
+ between50KBAnd100KBChunksCounter = metricGroup.counter("between50KBAnd100KBChunksCount");
+ greaterThan100KBChunksCounter = metricGroup.counter("greaterThan100KBChunksCount");
+ metricGroup.meter("numLessThan1KBChunksInPerSecond", new MeterView(lessThan1KBChunksCounter));
+ metricGroup.meter("numBetween1KBAnd3KBChunksInPerSecond", new MeterView(between1KBAnd3KBChunksCounter));
+ metricGroup.meter("numBetween3KBAnd5KBChunksInPerSecond", new MeterView(between3KBAnd5KBChunksCounter));
+ metricGroup.meter("numBetween5KBAnd10KBChunksInPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
+ metricGroup.meter("numBetween10KBAnd50KBChunksInPerSecond", new MeterView(between10KBAnd50KBChunksCounter));
+ metricGroup.meter("numBetween50KBAnd100KBChunksInPerSecond", new MeterView(between50KBAnd100KBChunksCounter));
+ metricGroup.meter("numGreaterThan100KBChunksInPerSecond", new MeterView(greaterThan100KBChunksCounter));
+ offsetLessThan10KBChunksCounter = metricGroup.counter("offsetLessThan10KBChunksCount");
+ offsetBetween10KBAnd100KBChunksCounter = metricGroup.counter("offsetBetween10KBAnd100KBChunksCount");
+ offsetBetween100KBAnd1MBChunksCounter = metricGroup.counter("offsetBetween100KBAnd1MBChunksCount");
+ offsetBetween1MBAnd10MBChunksCounter = metricGroup.counter("offsetBetween1MBAnd10MBChunksCount");
+ offsetBetween10MBAnd100MBChunksCounter = metricGroup.counter("offsetBetween10MBAnd100MBChunksCount");
+ offsetBetween100MBAnd1GBChunksCounter = metricGroup.counter("offsetBetween100MBAnd1GBChunksCount");
+ offsetGreaterThan1GBChunksCounter = metricGroup.counter("offsetGreaterThan1GBChunksCount");
+ metricGroup.meter("numOffsetLessThan10KBChunksInPerSecond", new MeterView(offsetLessThan10KBChunksCounter));
+ metricGroup.meter("numOffsetBetween10KBAnd100KBChunksInPerSecond", new MeterView(offsetBetween10KBAnd100KBChunksCounter));
+ metricGroup.meter("numOffsetBetween100KBAnd1MBChunksInPerSecond", new MeterView(offsetBetween100KBAnd1MBChunksCounter));
+ metricGroup.meter("numOffsetBetween1MBAnd10MBChunksInPerSecond", new MeterView(offsetBetween1MBAnd10MBChunksCounter));
+ metricGroup.meter("numOffsetBetween10MBAnd100MBChunksInPerSecond", new MeterView(offsetBetween10MBAnd100MBChunksCounter));
+ metricGroup.meter("numOffsetBetween100MBAnd1GBChunksInPerSecond", new MeterView(offsetBetween100MBAnd1GBChunksCounter));
+ metricGroup.meter("numOffsetGreaterThan1GBChunksInPerSecond", new MeterView(offsetGreaterThan1GBChunksCounter));
+ lessThan10KBFilesCounter = metricGroup.counter("lessThan10KBFilesCount");
+ between10KBAnd100KBFilesCounter = metricGroup.counter("between10KBAnd100KBFilesCount");
+ between100KBAnd1MBFilesCounter = metricGroup.counter("between100KBAnd1MBFilesCount");
+ between1MBAnd10MBFilesCounter = metricGroup.counter("between1MBAnd10MBFilesCount");
+ between10MBAnd100MBFilesCounter = metricGroup.counter("between10MBAnd100MBFilesCount");
+ between100MBAnd1GBFilesCounter = metricGroup.counter("between100MBAnd1GBFilesCount");
+ greaterThan1GBFilesCounter = metricGroup.counter("greaterThan1GBFilesCount");
+ metricGroup.meter("numLessThan10KBFilesInPerSecond", new MeterView(lessThan10KBFilesCounter));
+ metricGroup.meter("numBetween10KBAnd100KBFilesInPerSecond", new MeterView(between10KBAnd100KBFilesCounter));
+ metricGroup.meter("numBetween100KBAnd1MBFilesInPerSecond", new MeterView(between100KBAnd1MBFilesCounter));
+ metricGroup.meter("numBetween1MBAnd10MBFilesInPerSecond", new MeterView(between1MBAnd10MBFilesCounter));
+ metricGroup.meter("numBetween10MBAnd100MBFilesInPerSecond", new MeterView(between10MBAnd100MBFilesCounter));
+ metricGroup.meter("numBetween100MBAnd1GBFilesInPerSecond", new MeterView(between100MBAnd1GBFilesCounter));
+ metricGroup.meter("numGreaterThan1GBFilesInPerSecond", new MeterView(greaterThan1GBFilesCounter));
+ lessThan10KBEmlChunksCounter = metricGroup.counter("lessThan10KBEmlChunksCount");
+ between10KBAnd100KBEmlChunksCounter = metricGroup.counter("between10KBAnd100KBEmlChunksCount");
+ between100KBAnd1MBEmlChunksCounter = metricGroup.counter("between100KBAnd1MBEmlChunksCount");
+ between1MBAnd10MBEmlChunksCounter = metricGroup.counter("between1MBAnd10MBEmlChunksCount");
+ greaterThan10MBEmlChunksCounter = metricGroup.counter("greaterThan10MBEmlChunksCount");
+ metricGroup.meter("numLessThan10KBEmlFilesInPerSecond", new MeterView(lessThan10KBEmlChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBEmlFilesInPerSecond", new MeterView(between10KBAnd100KBEmlChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBEmlFilesInPerSecond", new MeterView(between100KBAnd1MBEmlChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBEmlFilesInPerSecond", new MeterView(between1MBAnd10MBEmlChunksCounter));
+ metricGroup.meter("numGreaterThan10MBEmlFilesInPerSecond", new MeterView(greaterThan10MBEmlChunksCounter));
+ lessThan10KBTxtChunksCounter = metricGroup.counter("lessThan10KBTxtChunksCount");
+ between10KBAnd100KBTxtChunksCounter = metricGroup.counter("between10KBAnd100KBTxtChunksCount");
+ between100KBAnd1MBTxtChunksCounter = metricGroup.counter("between100KBAnd1MBTxtChunksCount");
+ between1MBAnd10MBTxtChunksCounter = metricGroup.counter("between1MBAnd10MBTxtChunksCount");
+ greaterThan10MBTxtChunksCounter = metricGroup.counter("greaterThan10MBTxtChunksCount");
+ metricGroup.meter("numLessThan10KBTxtFilesInPerSecond", new MeterView(lessThan10KBTxtChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBTxtFilesInPerSecond", new MeterView(between10KBAnd100KBTxtChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBTxtFilesInPerSecond", new MeterView(between100KBAnd1MBTxtChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBTxtFilesInPerSecond", new MeterView(between1MBAnd10MBTxtChunksCounter));
+ metricGroup.meter("numGreaterThan10MBTxtFilesInPerSecond", new MeterView(greaterThan10MBTxtChunksCounter));
+ emlChunksCounter = metricGroup.counter("emlChunksCount");
+ txtChunksCounter = metricGroup.counter("txtChunksCount");
+ htmlChunksCounter = metricGroup.counter("htmlChunksCount");
+ pcapngChunksCounter = metricGroup.counter("pcapngChunksCount");
+ mediaChunksCounter = metricGroup.counter("mediaChunksCount");
+ metricGroup.meter("numEmlChunksInPerSecond", new MeterView(emlChunksCounter));
+ metricGroup.meter("numTxtChunksInPerSecond", new MeterView(txtChunksCounter));
+ metricGroup.meter("numHtmlChunksInPerSecond", new MeterView(htmlChunksCounter));
+ metricGroup.meter("numPcapngChunksInPerSecond", new MeterView(pcapngChunksCounter));
+ metricGroup.meter("numMediaChunksInPerSecond", new MeterView(mediaChunksCounter));
+ startChunksCounter = metricGroup.counter("startChunksCount");
+ endChunksCounter = metricGroup.counter("endChunksCount");
+ metricGroup.meter("numStartChunksInPerSecond", new MeterView(startChunksCounter));
+ metricGroup.meter("numEndChunksInPerSecond", new MeterView(endChunksCounter));
+ nullChunksCounter = metricGroup.counter("nullChunksCount");
+ nullEmlChunksCounter = metricGroup.counter("nullTxtChunksCount");
+ nullTxtChunksCounter = metricGroup.counter("nullEmlChunksCount");
+ metricGroup.meter("numNullFilesInPerSecond", new MeterView(nullChunksCounter));
+ metricGroup.meter("numNullEmlFilesInPerSecond", new MeterView(nullEmlChunksCounter));
+ metricGroup.meter("numNullTxtFilesInPerSecond", new MeterView(nullTxtChunksCounter));
+ emlFilesCounter = metricGroup.counter("emlFilesCount");
+ txtFilesCounter = metricGroup.counter("txtFilesCount");
+ htmlFilesCounter = metricGroup.counter("htmlFilesCount");
+ pcapngFilesCounter = metricGroup.counter("pcapngFilesCount");
+ mediaFilesCounter = metricGroup.counter("mediaFilesCount");
+ metricGroup.meter("numEmlFilesInPerSecond", new MeterView(emlFilesCounter));
+ metricGroup.meter("numTxtFilesInPerSecond", new MeterView(txtFilesCounter));
+ metricGroup.meter("numHtmlFilesInPerSecond", new MeterView(htmlFilesCounter));
+ metricGroup.meter("numPcapngFilesInPerSecond", new MeterView(pcapngFilesCounter));
+ metricGroup.meter("numMediaFilesInPerSecond", new MeterView(mediaFilesCounter));
+ chunksInCounter = metricGroup.counter("chunksInCount");
+ chunksOutCounter = metricGroup.counter("chunksOutCount");
+ bytesInCounter = metricGroup.counter("bytesInCount");
+ bytesOutCounter = metricGroup.counter("bytesOutCount");
+ errorChunksCounter = metricGroup.counter("errorChunksCount");
+ metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
+ metricGroup.meter("numChunksOutPerSecond", new MeterView(chunksOutCounter));
+ metricGroup.meter("numBytesInPerSecond", new MeterView(bytesInCounter));
+ metricGroup.meter("numBytesOutPerSecond", new MeterView(bytesOutCounter));
+ metricGroup.meter("numErrorChunksPerSecond", new MeterView(errorChunksCounter));
+ }
+
+ @Override
+ public FileChunk map(byte[] messagePackData) {
+ return parseMessagePack(messagePackData);
+ }
+
+ private FileChunk parseMessagePack(byte[] messagePackData) {
+ chunksInCounter.inc();
+ bytesInCounter.inc(messagePackData.length);
+ FileChunk fileChunk;
+ try {
+ fileChunk = new FileChunk();
+ MessageUnpacker messageUnpacker = MessagePack.newDefaultUnpacker(messagePackData);
+ int numFields = messageUnpacker.unpackMapHeader();
+ Map<String, Object> metaMap = new HashMap<>();
+ for (int i = 0; i < numFields; i++) {
+ String fieldName = messageUnpacker.unpackString();
+ switch (fieldName) {
+ case "uuid":
+ fileChunk.setUuid(messageUnpacker.unpackString());
+ break;
+ case "fileName":
+ fileChunk.setFileName(messageUnpacker.unpackString());
+ break;
+ case "fileType":
+ fileChunk.setFileType(messageUnpacker.unpackString());
+ break;
+ case "combineMode":
+ fileChunk.setCombineMode(messageUnpacker.unpackString());
+ break;
+ case "offset":
+ fileChunk.setOffset(messageUnpacker.unpackLong());
+ break;
+ case "length":
+ fileChunk.setLength(messageUnpacker.unpackLong());
+ break;
+ case "lastChunkFlag":
+ fileChunk.setLastChunkFlag(messageUnpacker.unpackInt());
+ break;
+ case "chunk":
+ fileChunk.setChunk(messageUnpacker.readPayload(messageUnpacker.unpackRawStringHeader()));
+ break;
+ case "timestamp":
+ fileChunk.setTimestamp(messageUnpacker.unpackLong());
+ break;
+ case "meta":
+ String meta = messageUnpacker.unpackString();
+ JSONObject metaJsonObject = JSONUtil.parseObj(meta);
+ for (String key : metaJsonObject.keySet()) {
+ metaMap.put(key, metaJsonObject.get(key));
+ }
+ fileChunk.setMeta(metaMap);
+ break;
+ default:
+ messageUnpacker.skipValue();
+ break;
+ }
+ }
+ if (COMBINE_MODE_APPEND.equals(fileChunk.getCombineMode())) {
+ fileChunk.setLastChunkFlag(0);
+ }
+ chunksOutCounter.inc();
+ bytesOutCounter.inc(fileChunk.getLength());
+ calculateFileChunkMetrics(fileChunk);
+ } catch (Exception e) {
+ errorChunksCounter.inc();
+ LOG.error("Parse messagePack failed.", e);
+ fileChunk = null;
+ }
+ return fileChunk;
+ }
+
+ private void calculateFileChunkMetrics(FileChunk fileChunk) {
+ String fileType = fileChunk.getFileType();
+ long length = fileChunk.getLength();
+ long offset = fileChunk.getOffset();
+ int lastChunkFlag = fileChunk.getLastChunkFlag();
+ if (length <= 1024) {
+ lessThan1KBChunksCounter.inc();
+ } else if (length <= 3 * 1024) {
+ between1KBAnd3KBChunksCounter.inc();
+ } else if (length <= 5 * 1024) {
+ between3KBAnd5KBChunksCounter.inc();
+ } else if (length <= 10 * 1024) {
+ between5KBAnd10KBChunksCounter.inc();
+ } else if (length <= 50 * 1024) {
+ between10KBAnd50KBChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between50KBAnd100KBChunksCounter.inc();
+ } else {
+ greaterThan100KBChunksCounter.inc();
+ }
+ switch (fileType) {
+ case "eml":
+ emlChunksCounter.inc();
+ break;
+ case "html":
+ htmlChunksCounter.inc();
+ break;
+ case "txt":
+ txtChunksCounter.inc();
+ break;
+ case "pcapng":
+ pcapngChunksCounter.inc();
+ break;
+ default:
+ mediaChunksCounter.inc();
+ }
+ if (offset == 0 && lastChunkFlag == 1 && fileChunk.getChunk() == null) {
+ nullChunksCounter.inc();
+ if ("eml".equals(fileType)) {
+ nullEmlChunksCounter.inc();
+ } else if ("txt".equals(fileType)) {
+ nullTxtChunksCounter.inc();
+ }
+ LOG.info("map file data is null: " + fileChunk.toString());
+ }
+ if (offset == 0) {
+ startChunksCounter.inc();
+ switch (fileType) {
+ case "eml":
+ emlFilesCounter.inc();
+ break;
+ case "html":
+ htmlFilesCounter.inc();
+ break;
+ case "txt":
+ txtFilesCounter.inc();
+ break;
+ case "pcapng":
+ pcapngFilesCounter.inc();
+ break;
+ default:
+ mediaFilesCounter.inc();
+ }
+ }
+ if (lastChunkFlag == 1) {
+ endChunksCounter.inc();
+ if (offset <= 10 * 1024) {
+ lessThan10KBFilesCounter.inc();
+ } else if (offset <= 100 * 1024) {
+ between10KBAnd100KBFilesCounter.inc();
+ } else if (offset <= 1024 * 1024) {
+ between100KBAnd1MBFilesCounter.inc();
+ } else if (offset <= 10 * 1024 * 1024) {
+ between1MBAnd10MBFilesCounter.inc();
+ } else if (offset <= 100 * 1024 * 1024) {
+ between10MBAnd100MBFilesCounter.inc();
+ } else if (offset <= 1024 * 1024 * 1024) {
+ between100MBAnd1GBFilesCounter.inc();
+ } else {
+ greaterThan1GBFilesCounter.inc();
+ }
+ if ("eml".equals(fileType)) {
+ calculateEmlFileSize(offset);
+ }
+ if ("txt".equals(fileType)) {
+ calculateTxtFileSize(offset);
+ }
+ }
+ if (offset <= 10 * 1024) {
+ offsetLessThan10KBChunksCounter.inc();
+ } else if (offset <= 100 * 1024) {
+ offsetBetween10KBAnd100KBChunksCounter.inc();
+ } else if (offset <= 1024 * 1024) {
+ offsetBetween100KBAnd1MBChunksCounter.inc();
+ } else if (offset <= 10 * 1024 * 1024) {
+ offsetBetween1MBAnd10MBChunksCounter.inc();
+ } else if (offset <= 100 * 1024 * 1024) {
+ offsetBetween10MBAnd100MBChunksCounter.inc();
+ } else if (offset <= 1024 * 1024 * 1024) {
+ offsetBetween100MBAnd1GBChunksCounter.inc();
+ } else {
+ offsetGreaterThan1GBChunksCounter.inc();
+ }
+ }
+
+ private void calculateEmlFileSize(long length) {
+ if (length <= 10 * 1024) {
+ lessThan10KBEmlChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBEmlChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBEmlChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBEmlChunksCounter.inc();
+ } else {
+ greaterThan10MBEmlChunksCounter.inc();
+ }
+ }
+
+ private void calculateTxtFileSize(long length) {
+ if (length <= 10 * 1024) {
+ lessThan10KBTxtChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBTxtChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBTxtChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBTxtChunksCounter.inc();
+ } else {
+ greaterThan10MBTxtChunksCounter.inc();
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java
new file mode 100644
index 0000000..8b7ce56
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java
@@ -0,0 +1,98 @@
+package com.zdjizhi.function.map;
+
+import cn.hutool.crypto.digest.DigestUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONObject;
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParseProxyFileMetaFlatMapFunction extends RichFlatMapFunction<String, FileChunk> {
+ private static final Log LOG = LogFactory.get();
+
+ public transient Counter chunksInCounter;
+ public transient Counter chunksOutCounter;
+ public transient Counter errorChunksCounter;
+ public transient Counter httpRequestFilesCounter;
+ public transient Counter httpResponseFilesCounter;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "map_proxy_file_meta");
+ chunksInCounter = metricGroup.counter("chunksInCount");
+ chunksOutCounter = metricGroup.counter("chunksOutCount");
+ errorChunksCounter = metricGroup.counter("errorChunksCount");
+ httpRequestFilesCounter = metricGroup.counter("httpRequestFilesInCount");
+ httpResponseFilesCounter = metricGroup.counter("httpResponseFilesInCount");
+ metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
+ metricGroup.meter("numChunksOutPerSecond", new MeterView(chunksOutCounter));
+ metricGroup.meter("numErrorChunksPerSecond", new MeterView(errorChunksCounter));
+ metricGroup.meter("numHttpRequestFilesInPerSecond", new MeterView(httpRequestFilesCounter));
+ metricGroup.meter("numHttpResponseFilesInPerSecond", new MeterView(httpResponseFilesCounter));
+ }
+
+ @Override
+ public void flatMap(String value, Collector<FileChunk> out) {
+ try {
+ chunksInCounter.inc();
+ JSONObject record = JSONObject.parseObject(value);
+ if (record.containsKey("proxy_rule_list") && record.getJSONArray("proxy_rule_list").size() > 0) {
+ if (record.containsKey("http_request_body")) {
+ FileChunk fileChunk = new FileChunk();
+ Map<String, Object> metaMap = new HashMap<>();
+ String uuid = record.getString("http_request_body");
+ fileChunk.setUuid(uuid);
+ metaMap.put("fileId", DigestUtil.md5Hex(uuid) + "_1");
+ metaMap.put("contentType", record.getString("http_request_content_type"));
+ metaMap.put("contentLength", record.getOrDefault("http_request_content_length", 0));
+ metaMap.put("http_session_duration_ms", record.getOrDefault("http_session_duration_ms", 0));
+ getFileMeta(record, metaMap);
+ fileChunk.setMeta(metaMap);
+ out.collect(fileChunk);
+ chunksOutCounter.inc();
+ httpRequestFilesCounter.inc();
+ }
+ if (record.containsKey("http_response_body")) {
+ FileChunk fileChunk = new FileChunk();
+ Map<String, Object> metaMap = new HashMap<>();
+ String uuid = record.getString("http_response_body");
+ fileChunk.setUuid(uuid);
+ metaMap.put("fileId", DigestUtil.md5Hex(uuid) + "_2");
+ metaMap.put("contentType", record.getString("http_response_content_type"));
+ metaMap.put("contentLength", record.getOrDefault("http_response_content_length", 0));
+ metaMap.put("http_session_duration_ms", record.getOrDefault("http_session_duration_ms", 0));
+ getFileMeta(record, metaMap);
+ fileChunk.setMeta(metaMap);
+ out.collect(fileChunk);
+ chunksOutCounter.inc();
+ httpResponseFilesCounter.inc();
+ }
+ }
+ } catch (Exception e) {
+ errorChunksCounter.inc();
+ LOG.error("Parse proxy file meta failed.", e);
+ }
+ }
+
+ private void getFileMeta(JSONObject record, Map<String, Object> metaMap) {
+ metaMap.put("policyId", record.containsKey("monitor_rule_list") && record.getJSONArray("monitor_rule_list").size() > 0 ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
+ metaMap.put("serverIP", record.getString("server_ip"));
+ metaMap.put("serverPort", record.getInteger("server_port"));
+ metaMap.put("clientIP", record.getString("client_ip"));
+ metaMap.put("clientPort", record.getInteger("client_port"));
+ metaMap.put("httpHost", record.getOrDefault("http_host", "").toString());
+ metaMap.put("subscriberId", record.getOrDefault("subscriber_id", "").toString());
+ metaMap.put("foundTime", record.getLong("end_timestamp_ms"));
+ metaMap.put("sled_ip", record.getOrDefault("sled_ip", "").toString());
+ metaMap.put("duration_ms", record.getInteger("duration_ms"));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java
new file mode 100644
index 0000000..cdbb131
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java
@@ -0,0 +1,117 @@
+package com.zdjizhi.function.map;
+
+import cn.hutool.crypto.digest.DigestUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONObject;
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class ParseSessionFileMetaFlatMapFunction extends RichFlatMapFunction<String, FileChunk> {
+ private static final Log LOG = LogFactory.get();
+
+ public transient Counter chunksInCounter;
+ public transient Counter chunksOutCounter;
+ public transient Counter emlFilesCounter;
+ public transient Counter errorChunksCounter;
+ public transient Counter httpRequestFilesCounter;
+ public transient Counter httpResponseFilesCounter;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "map_session_file_meta");
+ chunksInCounter = metricGroup.counter("chunksInCount");
+ chunksOutCounter = metricGroup.counter("chunksOutCount");
+ errorChunksCounter = metricGroup.counter("errorChunksCount");
+ emlFilesCounter = metricGroup.counter("emlFilesInCount");
+ httpRequestFilesCounter = metricGroup.counter("httpRequestFilesInCount");
+ httpResponseFilesCounter = metricGroup.counter("httpResponseFilesInCount");
+ metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
+ metricGroup.meter("numChunksOutPerSecond", new MeterView(chunksOutCounter));
+ metricGroup.meter("numErrorChunksPerSecond", new MeterView(errorChunksCounter));
+ metricGroup.meter("numEmlFilesInPerSecond", new MeterView(emlFilesCounter));
+ metricGroup.meter("numHttpRequestFilesInPerSecond", new MeterView(httpRequestFilesCounter));
+ metricGroup.meter("numHttpResponseFilesInPerSecond", new MeterView(httpResponseFilesCounter));
+ }
+
+ @Override
+ public void flatMap(String value, Collector<FileChunk> out) {
+ try {
+ chunksInCounter.inc();
+ JSONObject record = JSONObject.parseObject(value);
+ if (record.containsKey("security_rule_list")
+ && record.getJSONArray("security_rule_list").size() > 0
+ || record.containsKey("monitor_rule_list")
+ && record.getJSONArray("monitor_rule_list").size() > 0) {
+ if (record.containsKey("http_request_body")) {
+ FileChunk fileChunk = new FileChunk();
+ Map<String, Object> metaMap = new HashMap<>();
+ String uuid = record.getString("http_request_body");
+ fileChunk.setUuid(uuid);
+ metaMap.put("fileId", DigestUtil.md5Hex(uuid) + "_1");
+ metaMap.put("contentType", record.getString("http_request_content_type"));
+ metaMap.put("contentLength", record.getOrDefault("http_request_content_length", 0));
+ metaMap.put("http_session_duration_ms", record.getOrDefault("http_session_duration_ms", 0));
+ getFileMeta(record, metaMap);
+ fileChunk.setMeta(metaMap);
+ out.collect(fileChunk);
+ chunksOutCounter.inc();
+ httpRequestFilesCounter.inc();
+ }
+ if (record.containsKey("http_response_body")) {
+ FileChunk fileChunk = new FileChunk();
+ Map<String, Object> metaMap = new HashMap<>();
+ String uuid = record.getString("http_response_body");
+ fileChunk.setUuid(uuid);
+ metaMap.put("fileId", DigestUtil.md5Hex(uuid) + "_2");
+ metaMap.put("contentType", record.getString("http_response_content_type"));
+ metaMap.put("contentLength", record.getOrDefault("http_response_content_length", 0));
+ metaMap.put("http_session_duration_ms", record.getOrDefault("http_session_duration_ms", 0));
+ getFileMeta(record, metaMap);
+ fileChunk.setMeta(metaMap);
+ out.collect(fileChunk);
+ chunksOutCounter.inc();
+ httpResponseFilesCounter.inc();
+ }
+ if (record.containsKey("mail_eml_file")) {
+ FileChunk fileChunk = new FileChunk();
+ Map<String, Object> metaMap = new HashMap<>();
+ String uuid = record.getString("mail_eml_file");
+ fileChunk.setUuid(uuid);
+ metaMap.put("fileId", DigestUtil.md5Hex(uuid) + "_9");
+ getFileMeta(record, metaMap);
+ fileChunk.setMeta(metaMap);
+ out.collect(fileChunk);
+ chunksOutCounter.inc();
+ emlFilesCounter.inc();
+ }
+ }
+ } catch (Exception e) {
+ errorChunksCounter.inc();
+ LOG.error("Parse session file meta failed.", e);
+ }
+ }
+
+ private void getFileMeta(JSONObject record, Map<String, Object> metaMap) {
+ metaMap.put("policyId", record.containsKey("monitor_rule_list") && record.getJSONArray("monitor_rule_list").size() > 0 ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
+ metaMap.put("serverIP", record.getString("server_ip"));
+ metaMap.put("serverPort", record.getInteger("server_port"));
+ metaMap.put("clientIP", record.getString("client_ip"));
+ metaMap.put("clientPort", record.getInteger("client_port"));
+ metaMap.put("httpHost", record.getOrDefault("http_host", "").toString());
+ metaMap.put("subscriberId", record.getOrDefault("subscriber_id", "").toString());
+ metaMap.put("foundTime", record.getLong("end_timestamp_ms"));
+ metaMap.put("sled_ip", record.getOrDefault("sled_ip", "").toString());
+ metaMap.put("duration_ms", record.getInteger("duration_ms"));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/function/SideOutputMapFunction.java b/src/main/java/com/zdjizhi/function/map/SideOutputMapFunction.java
index c422fee..58236fc 100644
--- a/src/main/java/com/zdjizhi/function/SideOutputMapFunction.java
+++ b/src/main/java/com/zdjizhi/function/map/SideOutputMapFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.function;
+package com.zdjizhi.function.map;
import com.zdjizhi.pojo.FileChunk;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -16,7 +16,7 @@ public class SideOutputMapFunction extends RichMapFunction<FileChunk, FileChunk>
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "side_out_put");
delayedChunksCounter = metricGroup.counter("delayedChunksCount");
metricGroup.meter("numChunksDelayPerSecond", new MeterView(delayedChunksCounter));
}
diff --git a/src/main/java/com/zdjizhi/kafka/FileMetaKafkaConsumer.java b/src/main/java/com/zdjizhi/kafka/FileMetaKafkaConsumer.java
new file mode 100644
index 0000000..305e042
--- /dev/null
+++ b/src/main/java/com/zdjizhi/kafka/FileMetaKafkaConsumer.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.kafka;
+
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.utils.KafkaCertUtil;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.Properties;
+
+public abstract class FileMetaKafkaConsumer extends ByteArrayDeserializationSchema {
+
+ private static Properties createConsumerConfig(Configuration configuration) {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", configuration.get(Configs.KAFKA_BROKER));
+ properties.put("group.id", configuration.get(Configs.KAFKA_FILE_META_GROUP_ID));
+ properties.put("session.timeout.ms", configuration.get(Configs.KAFKA_SESSION_TIMEOUT_MS));
+ properties.put("max.poll.records", configuration.get(Configs.KAFKA_MAX_POLL_RECORDS));
+ properties.put("max.partition.fetch.bytes", configuration.get(Configs.KAFKA_MAX_PARTITION_FETCH_BYTES));
+ properties.put("partition.discovery.interval.ms", "10000");
+ properties.put("auto.offset.reset", configuration.get(Configs.KAFKA_AUTO_OFFSET_RESET));
+ properties.put("enable.auto.commit", configuration.get(Configs.KAFKA_ENABLE_AUTO_COMMIT));
+ KafkaCertUtil.chooseCert(properties, configuration);
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer<String> stringConsumer(Configuration configuration, String topic) {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+ new SimpleStringSchema(), createConsumerConfig(configuration));
+ //随着checkpoint提交,将offset提交到kafka
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ //从消费组当前的offset开始消费
+ kafkaConsumer.setStartFromGroupOffsets();
+ return kafkaConsumer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/pojo/FileChunk.java b/src/main/java/com/zdjizhi/pojo/FileChunk.java
index c2282aa..1586986 100644
--- a/src/main/java/com/zdjizhi/pojo/FileChunk.java
+++ b/src/main/java/com/zdjizhi/pojo/FileChunk.java
@@ -22,6 +22,10 @@ public class FileChunk implements Serializable {
public FileChunk() {
}
+ public FileChunk(String uuid) {
+ this.uuid = uuid;
+ }
+
public FileChunk(String uuid, String fileType, long length, byte[] chunk, String combineMode, int chunkCount, long timestamp, Map<String, Object> meta, String chunkNumbers) {
this.uuid = uuid;
this.fileType = fileType;
@@ -144,17 +148,20 @@ public class FileChunk implements Serializable {
@Override
public String toString() {
+ String chunkData = chunk == null ? "null" : chunk.length + "";
return "FileChunk{" +
"uuid='" + uuid + '\'' +
", fileName='" + fileName + '\'' +
", fileType='" + fileType + '\'' +
", offset=" + offset +
", length=" + length +
+ ", chunk=" + chunkData +
", combineMode='" + combineMode + '\'' +
", lastChunkFlag=" + lastChunkFlag +
", chunkCount=" + chunkCount +
", timestamp=" + timestamp +
", meta=" + meta +
+ ", chunkNumbers='" + chunkNumbers + '\'' +
'}';
}
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
index 66ec2c6..3bab6aa 100644
--- a/src/main/java/com/zdjizhi/sink/HBaseSink.java
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -1,6 +1,7 @@
package com.zdjizhi.sink;
import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.config.Configs;
@@ -9,6 +10,7 @@ import com.zdjizhi.utils.HBaseColumnConstants;
import com.zdjizhi.utils.HBaseConnectionUtil;
import com.zdjizhi.utils.PublicConstants;
import com.zdjizhi.utils.PublicUtil;
+import org.apache.commons.jexl3.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
@@ -29,16 +31,34 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
private static final Log LOG = LogFactory.get();
private final Configuration configuration;
- public transient Counter sinkRequestsCounter;
- public transient Counter sinkErrorRequestsCounter;
- public transient Counter sinkFilesCounter;
- public transient Counter sinkChunksCounter;
- public transient Counter lessThan5KBChunksCounter;
+ public transient Counter chunksInCounter;
+ public transient Counter chunksOutCounter;
+ public transient Counter bytesInCounter;
+ public transient Counter bytesOutCounter;
+ public transient Counter errorChunksCounter;
+ public transient Counter filesCounter;
+ public transient Counter rateLimitDropChunksCounter;
+ public transient Counter lessThan1KBChunksCounter;
+ public transient Counter between1KBAnd5KBChunksCounter;
public transient Counter between5KBAnd10KBChunksCounter;
- public transient Counter between10KBAnd50KBChunksCounter;
- public transient Counter between50KBAnd100KBChunksCounter;
+ public transient Counter between10KBAnd100KBChunksCounter;
public transient Counter between100KBAnd1MBChunksCounter;
public transient Counter greaterThan1MBChunksCounter;
+ public transient Counter lessThan10KBEmlChunksCounter;
+ public transient Counter between1MBAnd10MBEmlChunksCounter;
+ public transient Counter between10KBAnd100KBEmlChunksCounter;
+ public transient Counter between100KBAnd1MBEmlChunksCounter;
+ public transient Counter greaterThan10MBEmlChunksCounter;
+ public transient Counter lessThan10KBTxtChunksCounter;
+ public transient Counter between1MBAnd10MBTxtChunksCounter;
+ public transient Counter between10KBAnd100KBTxtChunksCounter;
+ public transient Counter between100KBAnd1MBTxtChunksCounter;
+ public transient Counter greaterThan10MBTxtChunksCounter;
+ public transient Counter emlChunksCounter;
+ public transient Counter txtChunksCounter;
+ public transient Counter htmlChunksCounter;
+ public transient Counter pcapngChunksCounter;
+ public transient Counter mediaChunksCounter;
private boolean isAsync;
private Connection syncHBaseConnection;
private AsyncConnection AsyncHBaseConnection;
@@ -55,6 +75,12 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
private int chunkCount;
private long maxBatchSize;
private long maxBatchCount;
+ private long rateLimitThreshold;
+ private String rateLimitExpression;
+ private long timestamp;
+ private long count;
+ private JexlExpression jexlExpression;
+ private JexlContext jexlContext;
public HBaseSink(Configuration configuration) {
this.configuration = configuration;
@@ -63,28 +89,63 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
- lessThan5KBChunksCounter = metricGroup.counter("lessThan5KBChunksCount");
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "sink_hbase");
+ errorChunksCounter = metricGroup.counter("errorChunksCount");
+ chunksInCounter = metricGroup.counter("chunksInCount");
+ chunksOutCounter = metricGroup.counter("chunksOutCount");
+ bytesInCounter = metricGroup.counter("bytesInCount");
+ bytesOutCounter = metricGroup.counter("bytesOutCount");
+ filesCounter = metricGroup.counter("filesCount");
+ rateLimitDropChunksCounter = metricGroup.counter("rateLimitDropChunksCount");
+ metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
+ metricGroup.meter("numChunksOutPerSecond", new MeterView(chunksOutCounter));
+ metricGroup.meter("numBytesInPerSecond", new MeterView(bytesInCounter));
+ metricGroup.meter("numBytesOutPerSecond", new MeterView(bytesOutCounter));
+ metricGroup.meter("numErrorChunksPerSecond", new MeterView(errorChunksCounter));
+ metricGroup.meter("numFilesOutPerSecond", new MeterView(filesCounter));
+ metricGroup.meter("numChunksRateLimitDropPerSecond", new MeterView(rateLimitDropChunksCounter));
+ lessThan1KBChunksCounter = metricGroup.counter("lessThan1KBChunksCount");
+ between1KBAnd5KBChunksCounter = metricGroup.counter("between1KBAnd5KBChunksCount");
between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
- between10KBAnd50KBChunksCounter = metricGroup.counter("between10KBAnd50KBChunksCount");
- between50KBAnd100KBChunksCounter = metricGroup.counter("between50KBAnd100KBChunksCount");
+ between10KBAnd100KBChunksCounter = metricGroup.counter("between10KBAnd100KBChunksCount");
between100KBAnd1MBChunksCounter = metricGroup.counter("between100KBAnd1MBChunksCount");
greaterThan1MBChunksCounter = metricGroup.counter("greaterThan1MBChunksCount");
- metricGroup.meter("numLessThan5KBChunksOutPerSecond", new MeterView(lessThan5KBChunksCounter));
+ metricGroup.meter("numLessThan1KBChunksOutPerSecond", new MeterView(lessThan1KBChunksCounter));
+ metricGroup.meter("numBetween1KBAnd5KBChunksOutPerSecond", new MeterView(between1KBAnd5KBChunksCounter));
metricGroup.meter("numBetween5KBAnd10KBChunksOutPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
- metricGroup.meter("numBetween10KBAnd50KBChunksOutPerSecond", new MeterView(between10KBAnd50KBChunksCounter));
- metricGroup.meter("numBetween50KBAnd100KBChunkPsOuterSecond", new MeterView(between50KBAnd100KBChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBChunksOutPerSecond", new MeterView(between10KBAnd100KBChunksCounter));
metricGroup.meter("numBetween100KBAnd1MBChunksOutPerSecond", new MeterView(between100KBAnd1MBChunksCounter));
metricGroup.meter("numGreaterThan1MBChunksOutPerSecond", new MeterView(greaterThan1MBChunksCounter));
- sinkRequestsCounter = metricGroup.counter("sinkRequestsCount");
- sinkErrorRequestsCounter = metricGroup.counter("sinkErrorRequestsCount");
- sinkFilesCounter = metricGroup.counter("sinkFilesCount");
- sinkChunksCounter = metricGroup.counter("sinkChunksCount");
- metricGroup.meter("numRequestsSinkPerSecond", new MeterView(sinkRequestsCounter, 5));
- metricGroup.meter("numErrorRequestsSinkPerSecond", new MeterView(sinkErrorRequestsCounter));
- metricGroup.meter("numFilesSinkPerSecond", new MeterView(sinkFilesCounter));
- metricGroup.meter("numChunksSinkPerSecond", new MeterView(sinkChunksCounter));
-
+ lessThan10KBEmlChunksCounter = metricGroup.counter("lessThan10KBEmlChunksCount");
+ between10KBAnd100KBEmlChunksCounter = metricGroup.counter("between10KBAnd100KBEmlChunksCount");
+ between100KBAnd1MBEmlChunksCounter = metricGroup.counter("between100KBAnd1MBEmlChunksCount");
+ between1MBAnd10MBEmlChunksCounter = metricGroup.counter("between1MBAnd10MBEmlChunksCount");
+ greaterThan10MBEmlChunksCounter = metricGroup.counter("greaterThan10MBEmlChunksCount");
+ metricGroup.meter("numLessThan10KBEmlChunksOutPerSecond", new MeterView(lessThan10KBEmlChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBEmlChunksOutPerSecond", new MeterView(between10KBAnd100KBEmlChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBEmlChunksOutPerSecond", new MeterView(between100KBAnd1MBEmlChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBEmlChunksOutPerSecond", new MeterView(between1MBAnd10MBEmlChunksCounter));
+ metricGroup.meter("numGreaterThan10MBEmlChunksOutPerSecond", new MeterView(greaterThan10MBEmlChunksCounter));
+ lessThan10KBTxtChunksCounter = metricGroup.counter("lessThan10KBTxtChunksCount");
+ between10KBAnd100KBTxtChunksCounter = metricGroup.counter("between10KBAnd100KBTxtChunksCount");
+ between100KBAnd1MBTxtChunksCounter = metricGroup.counter("between100KBAnd1MBTxtChunksCount");
+ between1MBAnd10MBTxtChunksCounter = metricGroup.counter("between1MBAnd10MBTxtChunksCount");
+ greaterThan10MBTxtChunksCounter = metricGroup.counter("greaterThan10MBTxtChunksCount");
+ metricGroup.meter("numLessThan10KBTxtChunksOutPerSecond", new MeterView(lessThan10KBTxtChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBTxtChunksOutPerSecond", new MeterView(between10KBAnd100KBTxtChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBTxtChunksOutPerSecond", new MeterView(between100KBAnd1MBTxtChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBTxtChunksOutPerSecond", new MeterView(between1MBAnd10MBTxtChunksCounter));
+ metricGroup.meter("numGreaterThan10MBTxtChunksOutPerSecond", new MeterView(greaterThan10MBTxtChunksCounter));
+ emlChunksCounter = metricGroup.counter("emlChunksCount");
+ txtChunksCounter = metricGroup.counter("txtChunksCount");
+ htmlChunksCounter = metricGroup.counter("htmlChunksCount");
+ pcapngChunksCounter = metricGroup.counter("pcapngChunksCount");
+ mediaChunksCounter = metricGroup.counter("mediaChunksCount");
+ metricGroup.meter("numEmlChunksOutPerSecond", new MeterView(emlChunksCounter));
+ metricGroup.meter("numTxtChunksOutPerSecond", new MeterView(txtChunksCounter));
+ metricGroup.meter("numHtmlChunksOutPerSecond", new MeterView(htmlChunksCounter));
+ metricGroup.meter("numPcapngChunksOutPerSecond", new MeterView(pcapngChunksCounter));
+ metricGroup.meter("numMediaChunksOutPerSecond", new MeterView(mediaChunksCounter));
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
if (isAsync) {
AsyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
@@ -104,12 +165,54 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
indexFilenamePutList = new ArrayList<>();
chunkSize = 0;
chunkCount = 0;
+ rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
+ rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
+ timestamp = System.currentTimeMillis();
+ count = 0;
+ JexlEngine jexlEngine = new JexlBuilder().create();
+ jexlExpression = jexlEngine.createExpression(rateLimitExpression);
+ jexlContext = new MapContext();
}
@Override
public void invoke(FileChunk fileChunk, Context context) {
+ chunksInCounter.inc();
+ bytesInCounter.inc(fileChunk.getLength());
+ if (rateLimitThreshold > 0) {
+ count++;
+ if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) {
+ if (checkFileChunk(fileChunk)) {
+ sendFileChunk(fileChunk);
+ } else {
+ rateLimitDropChunksCounter.inc();
+ }
+ } else if (System.currentTimeMillis() - timestamp >= 1000) {
+ if (checkFileChunk(fileChunk)) {
+ sendFileChunk(fileChunk);
+ } else {
+ rateLimitDropChunksCounter.inc();
+ timestamp = System.currentTimeMillis();
+ count = 0;
+ }
+ } else {
+ sendFileChunk(fileChunk);
+ }
+ } else {
+ sendFileChunk(fileChunk);
+ }
+ }
+
+ @Override
+ public void close() {
+ IoUtil.close(table);
+ IoUtil.close(indexTimeTable);
+ IoUtil.close(indexFilenameTable);
+ IoUtil.close(syncHBaseConnection);
+ IoUtil.close(AsyncHBaseConnection);
+ }
+
+ private void sendFileChunk(FileChunk fileChunk) {
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode()) && configuration.get(Configs.SINK_BATCH)) {
- sinkChunksCounter.inc();
byte[] data = "".getBytes();
if (fileChunk.getChunk() != null) {
data = fileChunk.getChunk();
@@ -146,7 +249,6 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
Put indexFilenamePut = new Put(Bytes.toBytes(indexFilenameKey));
indexFilenamePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
indexFilenamePutList.add(indexFilenamePut);
- sinkFilesCounter.inc();
} else {
Put metaPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid())));
metaPut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
@@ -154,18 +256,19 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}
chunkCount++;
chunkSize += chunkLength;
- calculateChunkSize(chunkLength);
+ chunksOutCounter.inc();
+ bytesOutCounter.inc(chunkLength);
+ calculateFileChunkMetrics(fileChunk);
if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
if (isAsync) {
if (dataPutList.size() > 0) {
List<CompletableFuture<Object>> futures = asyncTable.batch(dataPutList);
- sinkRequestsCounter.inc();
CompletableFuture.supplyAsync(() -> {
for (CompletableFuture<Object> completableFuture : futures) {
completableFuture.whenCompleteAsync((result, error) -> {
if (error != null) {
LOG.error("put chunk to hbase error. ", error.getMessage());
- sinkErrorRequestsCounter.inc();
+ errorChunksCounter.inc();
}
});
}
@@ -175,44 +278,37 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}
if (indexTimePutList.size() > 0) {
asyncIndexTimeTable.batch(indexTimePutList);
- sinkRequestsCounter.inc();
indexTimePutList.clear();
}
if (indexFilenamePutList.size() > 0) {
asyncIndexFilenameTable.batch(indexFilenamePutList);
- sinkRequestsCounter.inc();
indexFilenamePutList.clear();
}
} else {
if (dataPutList.size() > 0) {
try {
- sinkRequestsCounter.inc();
table.batch(dataPutList, null);
} catch (IOException | InterruptedException e) {
LOG.error("put chunk to hbase data table error. ", e.getMessage());
- sinkErrorRequestsCounter.inc();
+ errorChunksCounter.inc(dataPutList.size());
} finally {
dataPutList.clear();
}
}
if (indexTimePutList.size() > 0) {
try {
- sinkRequestsCounter.inc();
indexTimeTable.batch(indexTimePutList, null);
} catch (IOException | InterruptedException e) {
LOG.error("put chunk to hbase index time table error. ", e.getMessage());
- sinkErrorRequestsCounter.inc();
} finally {
indexTimePutList.clear();
}
}
if (indexFilenamePutList.size() > 0) {
try {
- sinkRequestsCounter.inc();
indexFilenameTable.batch(indexFilenamePutList, null);
} catch (IOException | InterruptedException e) {
LOG.error("put chunk to hbase index filename table error. ", e.getMessage());
- sinkErrorRequestsCounter.inc();
} finally {
indexFilenamePutList.clear();
}
@@ -224,28 +320,70 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}
}
- @Override
- public void close() {
- IoUtil.close(table);
- IoUtil.close(indexTimeTable);
- IoUtil.close(indexFilenameTable);
- IoUtil.close(syncHBaseConnection);
- IoUtil.close(AsyncHBaseConnection);
+ private boolean checkFileChunk(FileChunk fileChunk) {
+ if (StrUtil.isNotEmpty(rateLimitExpression)) {
+ jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
+ return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString());
+ }
+ return false;
}
- private void calculateChunkSize(long length) {
- if (length <= 5 * 1024) {
- lessThan5KBChunksCounter.inc();
+ private void calculateFileChunkMetrics(FileChunk fileChunk) {
+ long length = fileChunk.getLength();
+ String fileType = fileChunk.getFileType();
+ if (length <= 1024) {
+ lessThan1KBChunksCounter.inc();
+ } else if (length <= 5 * 1024) {
+ between1KBAnd5KBChunksCounter.inc();
} else if (length <= 10 * 1024) {
between5KBAnd10KBChunksCounter.inc();
- } else if (length <= 50 * 1024) {
- between10KBAnd50KBChunksCounter.inc();
} else if (length <= 100 * 1024) {
- between50KBAnd100KBChunksCounter.inc();
+ between10KBAnd100KBChunksCounter.inc();
} else if (length <= 1024 * 1024) {
between100KBAnd1MBChunksCounter.inc();
} else {
greaterThan1MBChunksCounter.inc();
}
+ switch (fileType) {
+ case "eml":
+ emlChunksCounter.inc();
+ if (length <= 10 * 1024) {
+ lessThan10KBEmlChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBEmlChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBEmlChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBEmlChunksCounter.inc();
+ } else {
+ greaterThan10MBEmlChunksCounter.inc();
+ }
+ break;
+ case "html":
+ htmlChunksCounter.inc();
+ break;
+ case "txt":
+ txtChunksCounter.inc();
+ if (length <= 10 * 1024) {
+ lessThan10KBTxtChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBTxtChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBTxtChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBTxtChunksCounter.inc();
+ } else {
+ greaterThan10MBTxtChunksCounter.inc();
+ }
+ break;
+ case "pcapng":
+ pcapngChunksCounter.inc();
+ break;
+ default:
+ mediaChunksCounter.inc();
+ }
+ if (fileChunk.getLastChunkFlag() == 1) {
+ filesCounter.inc();
+ }
}
}
diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java
index 3256cef..ac25022 100644
--- a/src/main/java/com/zdjizhi/sink/HosSink.java
+++ b/src/main/java/com/zdjizhi/sink/HosSink.java
@@ -8,7 +8,7 @@ import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.utils.HttpClientUtil;
import com.zdjizhi.utils.PublicUtil;
-import org.apache.commons.lang.CharEncoding;
+import org.apache.commons.jexl3.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
@@ -36,23 +36,40 @@ public class HosSink extends RichSinkFunction<FileChunk> {
private static final Log LOG = LogFactory.get();
private final Configuration configuration;
- public transient Counter sinkRequestsCounter;
- public transient Counter sinkErrorRequestsCounter;
- public transient Counter sinkFilesCounter;
- public transient Counter sinkChunksCounter;
- public transient Counter lessThan5KBChunksCounter;
+ public transient Counter chunksInCounter;
+ public transient Counter chunksOutCounter;
+ public transient Counter bytesInCounter;
+ public transient Counter bytesOutCounter;
+ public transient Counter errorChunksCounter;
+ public transient Counter filesCounter;
+ public transient Counter rateLimitDropChunksCounter;
+ public transient Counter lessThan1KBChunksCounter;
+ public transient Counter between1KBAnd5KBChunksCounter;
public transient Counter between5KBAnd10KBChunksCounter;
- public transient Counter between10KBAnd50KBChunksCounter;
- public transient Counter between50KBAnd100KBChunksCounter;
+ public transient Counter between10KBAnd100KBChunksCounter;
public transient Counter between100KBAnd1MBChunksCounter;
public transient Counter greaterThan1MBChunksCounter;
+ public transient Counter lessThan10KBEmlChunksCounter;
+ public transient Counter between1MBAnd10MBEmlChunksCounter;
+ public transient Counter between10KBAnd100KBEmlChunksCounter;
+ public transient Counter between100KBAnd1MBEmlChunksCounter;
+ public transient Counter greaterThan10MBEmlChunksCounter;
+ public transient Counter lessThan10KBTxtChunksCounter;
+ public transient Counter between1MBAnd10MBTxtChunksCounter;
+ public transient Counter between10KBAnd100KBTxtChunksCounter;
+ public transient Counter between100KBAnd1MBTxtChunksCounter;
+ public transient Counter greaterThan10MBTxtChunksCounter;
+ public transient Counter emlChunksCounter;
+ public transient Counter txtChunksCounter;
+ public transient Counter htmlChunksCounter;
+ public transient Counter pcapngChunksCounter;
+ public transient Counter mediaChunksCounter;
private boolean isAsync;
private CloseableHttpClient syncHttpClient;
private CloseableHttpAsyncClient asyncHttpClient;
private int loadBalanceMode;
+ private List<String> endpointList;
private volatile String endpoint;
- private List<String> ipList;
- private List<String> portList;
private String token;
private volatile String bathPutUrl;
private HashMap<String, String> hosMessage;
@@ -63,6 +80,12 @@ public class HosSink extends RichSinkFunction<FileChunk> {
private long maxBatchCount;
private long chunkSize = 0;
private int chunkCount = 0;
+ private long rateLimitThreshold;
+ private String rateLimitExpression;
+ private long timestamp;
+ private long count;
+ private JexlExpression jexlExpression;
+ private JexlContext jexlContext;
public HosSink(Configuration configuration) {
this.configuration = configuration;
@@ -71,36 +94,70 @@ public class HosSink extends RichSinkFunction<FileChunk> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
- lessThan5KBChunksCounter = metricGroup.counter("lessThan5KBChunksCount");
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "sink_hos");
+ errorChunksCounter = metricGroup.counter("errorChunksCount");
+ chunksInCounter = metricGroup.counter("chunksInCount");
+ bytesInCounter = metricGroup.counter("bytesInCount");
+ chunksOutCounter = metricGroup.counter("chunksOutCount");
+ bytesOutCounter = metricGroup.counter("bytesOutCount");
+ filesCounter = metricGroup.counter("filesCount");
+ rateLimitDropChunksCounter = metricGroup.counter("rateLimitDropChunksCount");
+ metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
+ metricGroup.meter("numChunksOutPerSecond", new MeterView(chunksOutCounter));
+ metricGroup.meter("numBytesInPerSecond", new MeterView(bytesInCounter));
+ metricGroup.meter("numBytesOutPerSecond", new MeterView(bytesOutCounter));
+ metricGroup.meter("numErrorChunksPerSecond", new MeterView(errorChunksCounter));
+ metricGroup.meter("numFilesOutPerSecond", new MeterView(filesCounter));
+ metricGroup.meter("numChunksRateLimitDropPerSecond", new MeterView(rateLimitDropChunksCounter));
+ lessThan1KBChunksCounter = metricGroup.counter("lessThan1KBChunksCount");
+ between1KBAnd5KBChunksCounter = metricGroup.counter("between1KBAnd5KBChunksCount");
between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
- between10KBAnd50KBChunksCounter = metricGroup.counter("between10KBAnd50KBChunksCount");
- between50KBAnd100KBChunksCounter = metricGroup.counter("between50KBAnd100KBChunksCount");
+ between10KBAnd100KBChunksCounter = metricGroup.counter("between10KBAnd100KBChunksCount");
between100KBAnd1MBChunksCounter = metricGroup.counter("between100KBAnd1MBChunksCount");
greaterThan1MBChunksCounter = metricGroup.counter("greaterThan1MBChunksCount");
- metricGroup.meter("numLessThan5KBChunksOutPerSecond", new MeterView(lessThan5KBChunksCounter));
+ metricGroup.meter("numLessThan1KBChunksOutPerSecond", new MeterView(lessThan1KBChunksCounter));
+ metricGroup.meter("numBetween1KBAnd5KBChunksOutPerSecond", new MeterView(between1KBAnd5KBChunksCounter));
metricGroup.meter("numBetween5KBAnd10KBChunksOutPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
- metricGroup.meter("numBetween10KBAnd50KBChunksOutPerSecond", new MeterView(between10KBAnd50KBChunksCounter));
- metricGroup.meter("numBetween50KBAnd100KBChunkPsOuterSecond", new MeterView(between50KBAnd100KBChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBChunksOutPerSecond", new MeterView(between10KBAnd100KBChunksCounter));
metricGroup.meter("numBetween100KBAnd1MBChunksOutPerSecond", new MeterView(between100KBAnd1MBChunksCounter));
metricGroup.meter("numGreaterThan1MBChunksOutPerSecond", new MeterView(greaterThan1MBChunksCounter));
- sinkRequestsCounter = metricGroup.counter("sinkRequestsCount");
- sinkErrorRequestsCounter = metricGroup.counter("sinkErrorRequestsCount");
- sinkFilesCounter = metricGroup.counter("sinkFilesCount");
- sinkChunksCounter = metricGroup.counter("sinkChunksCount");
- metricGroup.meter("numRequestsSinkPerSecond", new MeterView(sinkRequestsCounter, 5));
- metricGroup.meter("numErrorRequestsSinkPerSecond", new MeterView(sinkErrorRequestsCounter));
- metricGroup.meter("numFilesSinkPerSecond", new MeterView(sinkFilesCounter));
- metricGroup.meter("numChunksSinkPerSecond", new MeterView(sinkChunksCounter));
-
- loadBalanceMode = configuration.getInteger(Configs.SINK_HOS_LOAD_BALANCE_MODE);
- if (loadBalanceMode == 0) {
- endpoint = configuration.getString(Configs.SINK_HOS_ENDPOINT);
- } else if (loadBalanceMode == 1) {
- String[] ipPortArr = configuration.get(Configs.SINK_HOS_ENDPOINT).split(":");
- ipList = Arrays.asList(ipPortArr[0].split(","));
- portList = Arrays.asList(ipPortArr[1].split(","));
- endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
+ lessThan10KBEmlChunksCounter = metricGroup.counter("lessThan10KBEmlChunksCount");
+ between10KBAnd100KBEmlChunksCounter = metricGroup.counter("between10KBAnd100KBEmlChunksCount");
+ between100KBAnd1MBEmlChunksCounter = metricGroup.counter("between100KBAnd1MBEmlChunksCount");
+ between1MBAnd10MBEmlChunksCounter = metricGroup.counter("between1MBAnd10MBEmlChunksCount");
+ greaterThan10MBEmlChunksCounter = metricGroup.counter("greaterThan10MBEmlChunksCount");
+ metricGroup.meter("numLessThan10KBEmlChunksOutPerSecond", new MeterView(lessThan10KBEmlChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBEmlChunksOutPerSecond", new MeterView(between10KBAnd100KBEmlChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBEmlChunksOutPerSecond", new MeterView(between100KBAnd1MBEmlChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBEmlChunksOutPerSecond", new MeterView(between1MBAnd10MBEmlChunksCounter));
+ metricGroup.meter("numGreaterThan10MBEmlChunksOutPerSecond", new MeterView(greaterThan10MBEmlChunksCounter));
+ lessThan10KBTxtChunksCounter = metricGroup.counter("lessThan10KBTxtChunksCount");
+ between10KBAnd100KBTxtChunksCounter = metricGroup.counter("between10KBAnd100KBTxtChunksCount");
+ between100KBAnd1MBTxtChunksCounter = metricGroup.counter("between100KBAnd1MBTxtChunksCount");
+ between1MBAnd10MBTxtChunksCounter = metricGroup.counter("between1MBAnd10MBTxtChunksCount");
+ greaterThan10MBTxtChunksCounter = metricGroup.counter("greaterThan10MBTxtChunksCount");
+ metricGroup.meter("numLessThan10KBTxtChunksOutPerSecond", new MeterView(lessThan10KBTxtChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBTxtChunksOutPerSecond", new MeterView(between10KBAnd100KBTxtChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBTxtChunksOutPerSecond", new MeterView(between100KBAnd1MBTxtChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBTxtChunksOutPerSecond", new MeterView(between1MBAnd10MBTxtChunksCounter));
+ metricGroup.meter("numGreaterThan10MBTxtChunksOutPerSecond", new MeterView(greaterThan10MBTxtChunksCounter));
+ emlChunksCounter = metricGroup.counter("emlChunksCount");
+ txtChunksCounter = metricGroup.counter("txtChunksCount");
+ htmlChunksCounter = metricGroup.counter("htmlChunksCount");
+ pcapngChunksCounter = metricGroup.counter("pcapngChunksCount");
+ mediaChunksCounter = metricGroup.counter("mediaChunksCount");
+ metricGroup.meter("numEmlChunksOutPerSecond", new MeterView(emlChunksCounter));
+ metricGroup.meter("numTxtChunksOutPerSecond", new MeterView(txtChunksCounter));
+ metricGroup.meter("numHtmlChunksOutPerSecond", new MeterView(htmlChunksCounter));
+ metricGroup.meter("numPcapngChunksOutPerSecond", new MeterView(pcapngChunksCounter));
+ metricGroup.meter("numMediaChunksOutPerSecond", new MeterView(mediaChunksCounter));
+ endpointList = Arrays.asList(configuration.get(Configs.SINK_HOS_ENDPOINT).split(","));
+ if (endpointList.size() == 1) {
+ loadBalanceMode = 0;
+ endpoint = endpointList.get(0);
+ } else {
+ loadBalanceMode = 1;
+ endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
}
token = configuration.get(Configs.SINK_HOS_TOKEN);
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
@@ -117,96 +174,40 @@ public class HosSink extends RichSinkFunction<FileChunk> {
objectsMeta = "";
objectsOffset = "";
byteList = new ArrayList<>();
+ rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
+ rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
+ timestamp = System.currentTimeMillis();
+ count = 0;
+ JexlEngine jexlEngine = new JexlBuilder().create();
+ jexlExpression = jexlEngine.createExpression(rateLimitExpression);
+ jexlContext = new MapContext();
}
@Override
public void invoke(FileChunk fileChunk, Context context) {
- byte[] data = "".getBytes();
- if (fileChunk.getChunk() != null) {
- data = fileChunk.getChunk();
- }
- long chunkLength = data.length;
- sinkChunksCounter.inc();
- if (configuration.get(Configs.SINK_BATCH)) {
- hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType());
- hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid());
- if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
- hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + "");
- hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
- if (fileChunk.getOffset() == 0) {
- sinkFilesCounter.inc();
- }
- } else {
- hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
- hosMessage.put(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
- }
- hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
- Map<String, Object> metaMap = fileChunk.getMeta();
- if (metaMap != null && metaMap.size() > 0) {
- for (String meta : metaMap.keySet()) {
- hosMessage.put(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
+ chunksInCounter.inc();
+ bytesInCounter.inc(fileChunk.getLength());
+ if (rateLimitThreshold > 0) {
+ count++;
+ if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) {
+ if (checkFileChunk(fileChunk)) {
+ sendFileChunk(fileChunk);
+ } else {
+ rateLimitDropChunksCounter.inc();
}
- }
- objectsMeta += hosMessage.toString() + ";";
- hosMessage.clear();
- objectsOffset += chunkLength + ";";
- byteList.add(data);
- chunkCount++;
- chunkSize += chunkLength;
- calculateChunkSize(chunkLength);
- if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
- HttpPut httpPut = new HttpPut(bathPutUrl);
- httpPut.setHeader(TOKEN, token);
- httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
- httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
- httpPut.setHeader(HOS_OBJECTS_META, objectsMeta);
- httpPut.setHeader(HOS_OBJECTS_OFFSET, objectsOffset);
- byte[][] bytes = new byte[byteList.size()][];
- byteList.toArray(bytes);
- byte[] newData = ArrayUtil.addAll(bytes);
- httpPut.setEntity(new ByteArrayEntity(newData));
- byteList.clear();
- executeRequest(httpPut);
- objectsMeta = "";
- objectsOffset = "";
- chunkSize = 0;
- chunkCount = 0;
- }
- } else {
- String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
- HttpPut httpPut = new HttpPut(url);
- httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN));
- httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
- httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
- String filename = fileChunk.getFileName();
- if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
- httpPut.setHeader(HOS_META_FILENAME, filename);
- } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
- filename = filename + "." + fileChunk.getFileType();
- httpPut.setHeader(HOS_META_FILENAME, filename);
- } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
- httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType());
- }
- if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
- httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + "");
- httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
- if (fileChunk.getOffset() == 0) {
- sinkFilesCounter.inc();
+ } else if (System.currentTimeMillis() - timestamp >= 1000) {
+ if (checkFileChunk(fileChunk)) {
+ sendFileChunk(fileChunk);
+ } else {
+ rateLimitDropChunksCounter.inc();
+ timestamp = System.currentTimeMillis();
+ count = 0;
}
} else {
- httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
- httpPut.setHeader(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
- }
- httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
- Map<String, Object> metaMap = fileChunk.getMeta();
- if (metaMap != null && metaMap.size() > 0) {
- for (String meta : metaMap.keySet()) {
- httpPut.setHeader(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
- }
+ sendFileChunk(fileChunk);
}
- httpPut.setEntity(new ByteArrayEntity(data));
- calculateChunkSize(chunkLength);
- executeRequest(httpPut);
+ } else {
+ sendFileChunk(fileChunk);
}
}
@@ -216,30 +217,121 @@ public class HosSink extends RichSinkFunction<FileChunk> {
IoUtil.close(asyncHttpClient);
}
+ private void sendFileChunk(FileChunk fileChunk) {
+ try {
+ byte[] data = "".getBytes();
+ if (fileChunk.getChunk() != null) {
+ data = fileChunk.getChunk();
+ }
+ long chunkLength = data.length;
+ if (configuration.get(Configs.SINK_BATCH)) {
+ hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType());
+ hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid());
+ if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
+ hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + "");
+ hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
+ } else {
+ hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
+ hosMessage.put(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
+ }
+ hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
+ Map<String, Object> metaMap = fileChunk.getMeta();
+ if (metaMap != null && metaMap.size() > 0) {
+ for (String meta : metaMap.keySet()) {
+ hosMessage.put(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
+ }
+ }
+ objectsMeta += hosMessage.toString() + ";";
+ hosMessage.clear();
+ objectsOffset += chunkLength + ";";
+ byteList.add(data);
+ chunkCount++;
+ chunkSize += chunkLength;
+ chunksOutCounter.inc();
+ bytesOutCounter.inc(chunkLength);
+ calculateFileChunkMetrics(fileChunk);
+ if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
+ HttpPut httpPut = new HttpPut(bathPutUrl);
+ httpPut.setHeader(TOKEN, token);
+ httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
+ httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
+ httpPut.setHeader(HOS_OBJECTS_META, objectsMeta);
+ httpPut.setHeader(HOS_OBJECTS_OFFSET, objectsOffset);
+ byte[][] bytes = new byte[byteList.size()][];
+ byteList.toArray(bytes);
+ byte[] newData = ArrayUtil.addAll(bytes);
+ httpPut.setEntity(new ByteArrayEntity(newData));
+ byteList.clear();
+ executeRequest(httpPut);
+ objectsMeta = "";
+ objectsOffset = "";
+ chunkSize = 0;
+ chunkCount = 0;
+ }
+ } else {
+ String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
+ HttpPut httpPut = new HttpPut(url);
+ httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN));
+ httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
+ httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
+ String filename = fileChunk.getFileName();
+ if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
+ httpPut.setHeader(HOS_META_FILENAME, filename);
+ } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
+ filename = filename + "." + fileChunk.getFileType();
+ httpPut.setHeader(HOS_META_FILENAME, filename);
+ } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
+ httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType());
+ }
+ if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
+ httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + "");
+ httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
+ } else {
+ httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
+ httpPut.setHeader(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
+ }
+ httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
+ Map<String, Object> metaMap = fileChunk.getMeta();
+ if (metaMap != null && metaMap.size() > 0) {
+ for (String meta : metaMap.keySet()) {
+ httpPut.setHeader(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
+ }
+ }
+ httpPut.setEntity(new ByteArrayEntity(data));
+ executeRequest(httpPut);
+ chunksOutCounter.inc();
+ bytesOutCounter.inc(chunkLength);
+ calculateFileChunkMetrics(fileChunk);
+ }
+ } catch (Exception e) {
+ LOG.error("put part to hos error.", e);
+ errorChunksCounter.inc();
+ }
+ }
+
private void executeRequest(HttpPut httpPut) {
- sinkRequestsCounter.inc();
if (isAsync) {
asyncHttpClient.execute(httpPut, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
if (httpResponse.getStatusLine().getStatusCode() != 200) {
- String responseEntity = EntityUtils.toString(httpResponse.getEntity(), CharEncoding.UTF_8);
+ String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8");
LOG.error("put part to hos error. code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
- sinkErrorRequestsCounter.inc();
+ errorChunksCounter.inc();
}
} catch (IOException e) {
LOG.error("put part to hos error.", e);
- sinkErrorRequestsCounter.inc();
+ errorChunksCounter.inc();
}
}
@Override
public void failed(Exception ex) {
LOG.error("put part to hos error.", ex);
- sinkErrorRequestsCounter.inc();
+ errorChunksCounter.inc();
if (loadBalanceMode == 1 && ex instanceof ConnectException) {
- endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
+ endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
}
}
@@ -254,15 +346,15 @@ public class HosSink extends RichSinkFunction<FileChunk> {
try {
response = syncHttpClient.execute(httpPut);
if (response.getStatusLine().getStatusCode() != 200) {
- String responseEntity = EntityUtils.toString(response.getEntity(), CharEncoding.UTF_8);
+ String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
- sinkErrorRequestsCounter.inc();
+ errorChunksCounter.inc();
}
} catch (IOException e) {
LOG.error("put part to hos error.", e);
- sinkErrorRequestsCounter.inc();
+ errorChunksCounter.inc();
if (loadBalanceMode == 1 && (e instanceof HttpHostConnectException || e instanceof ConnectTimeoutException)) {
- endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
+ endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
}
} finally {
IoUtil.close(response);
@@ -270,19 +362,70 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}
}
- private void calculateChunkSize(long length) {
- if (length <= 5 * 1024) {
- lessThan5KBChunksCounter.inc();
+ private boolean checkFileChunk(FileChunk fileChunk) {
+ if (StrUtil.isNotEmpty(rateLimitExpression)) {
+ jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
+ return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString());
+ }
+ return false;
+ }
+
+ private void calculateFileChunkMetrics(FileChunk fileChunk) {
+ long length = fileChunk.getLength();
+ String fileType = fileChunk.getFileType();
+ if (length <= 1024) {
+ lessThan1KBChunksCounter.inc();
+ } else if (length <= 5 * 1024) {
+ between1KBAnd5KBChunksCounter.inc();
} else if (length <= 10 * 1024) {
between5KBAnd10KBChunksCounter.inc();
- } else if (length <= 50 * 1024) {
- between10KBAnd50KBChunksCounter.inc();
} else if (length <= 100 * 1024) {
- between50KBAnd100KBChunksCounter.inc();
+ between10KBAnd100KBChunksCounter.inc();
} else if (length <= 1024 * 1024) {
between100KBAnd1MBChunksCounter.inc();
} else {
greaterThan1MBChunksCounter.inc();
}
+ switch (fileType) {
+ case "eml":
+ emlChunksCounter.inc();
+ if (length <= 10 * 1024) {
+ lessThan10KBEmlChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBEmlChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBEmlChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBEmlChunksCounter.inc();
+ } else {
+ greaterThan10MBEmlChunksCounter.inc();
+ }
+ break;
+ case "html":
+ htmlChunksCounter.inc();
+ break;
+ case "txt":
+ txtChunksCounter.inc();
+ if (length <= 10 * 1024) {
+ lessThan10KBTxtChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBTxtChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBTxtChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBTxtChunksCounter.inc();
+ } else {
+ greaterThan10MBTxtChunksCounter.inc();
+ }
+ break;
+ case "pcapng":
+ pcapngChunksCounter.inc();
+ break;
+ default:
+ mediaChunksCounter.inc();
+ }
+ if (fileChunk.getLastChunkFlag() == 1) {
+ filesCounter.inc();
+ }
}
}
diff --git a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
new file mode 100644
index 0000000..056d793
--- /dev/null
+++ b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
@@ -0,0 +1,392 @@
+package com.zdjizhi.sink;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.RandomUtil;
+import cn.hutool.core.util.URLUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.utils.*;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
+ private static final Log LOG = LogFactory.get();
+
+ private final Configuration configuration;
+ public transient Counter chunksInCounter;
+ public transient Counter chunksOutCounter;
+ public transient Counter bytesInCounter;
+ public transient Counter bytesOutCounter;
+ public transient Counter errorChunksCounter;
+ public transient Counter fileMetasCounter;
+ public transient Counter requestFileMetasCounter;
+ public transient Counter responseFileMetasCounter;
+ public transient Counter requestFilesCounter;
+ public transient Counter responseFilesCounter;
+ public transient Counter emlChunksCounter;
+ public transient Counter txtChunksCounter;
+ public transient Counter completeFilesCounter;
+ public transient Counter completeEmlFilesCounter;
+ public transient Counter completeTxtFilesCounter;
+ public transient Counter completeRequestFilesCounter;
+ public transient Counter completeResponseFilesCounter;
+ public transient Counter nullChunksCounter;
+ public transient Counter nullTxtChunksCounter;
+ public transient Counter nullEmlChunksCounter;
+ public transient Counter lessThan1KBChunksCounter;
+ public transient Counter between1KBAnd5KBChunksCounter;
+ public transient Counter between5KBAnd10KBChunksCounter;
+ public transient Counter between10KBAnd100KBChunksCounter;
+ public transient Counter between100KBAnd1MBChunksCounter;
+ public transient Counter greaterThan1MBChunksCounter;
+ public transient Counter lessThan10KBEmlChunksCounter;
+ public transient Counter between1MBAnd10MBEmlChunksCounter;
+ public transient Counter between10KBAnd100KBEmlChunksCounter;
+ public transient Counter between100KBAnd1MBEmlChunksCounter;
+ public transient Counter greaterThan10MBEmlChunksCounter;
+ public transient Counter lessThan10KBTxtChunksCounter;
+ public transient Counter between1MBAnd10MBTxtChunksCounter;
+ public transient Counter between10KBAnd100KBTxtChunksCounter;
+ public transient Counter between100KBAnd1MBTxtChunksCounter;
+ public transient Counter greaterThan10MBTxtChunksCounter;
+ private boolean isAsync;
+ private CloseableHttpClient syncHttpClient;
+ private CloseableHttpAsyncClient asyncHttpClient;
+ private List<String> endpointList;
+ private CaffeineCacheUtil caffeineCacheUtil;
+ private Cache<String, FileChunk> cache;
+
+ public OssSinkByCaffeineCache(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "sink_oss");
+ endpointList = Arrays.asList(configuration.get(Configs.SINK_OSS_ENDPOINT).split(","));
+ isAsync = configuration.getBoolean(Configs.SINK_OSS_ASYNC);
+ if (isAsync) {
+ asyncHttpClient = HttpClientUtil.getInstance(configuration).getAsyncHttpClient();
+ asyncHttpClient.start();
+ } else {
+ syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
+ }
+ caffeineCacheUtil = CaffeineCacheUtil.getInstance(configuration);
+ cache = caffeineCacheUtil.getCaffeineCache();
+ metricGroup.gauge("cacheLength", (Gauge<Long>) () -> cache.estimatedSize());
+ lessThan1KBChunksCounter = metricGroup.counter("lessThan1KBChunksCount");
+ between1KBAnd5KBChunksCounter = metricGroup.counter("between1KBAnd5KBChunksCount");
+ between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
+ between10KBAnd100KBChunksCounter = metricGroup.counter("between10KBAnd100KBChunksCount");
+ between100KBAnd1MBChunksCounter = metricGroup.counter("between100KBAnd1MBChunksCount");
+ greaterThan1MBChunksCounter = metricGroup.counter("greaterThan1MBChunksCount");
+ metricGroup.meter("numLessThan1KBFilesOutPerSecond", new MeterView(lessThan1KBChunksCounter));
+ metricGroup.meter("numBetween1KBAnd5KBFilesOutPerSecond", new MeterView(between1KBAnd5KBChunksCounter));
+ metricGroup.meter("numBetween5KBAnd10KBFilesOutPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBFilesOutPerSecond", new MeterView(between10KBAnd100KBChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBFilesOutPerSecond", new MeterView(between100KBAnd1MBChunksCounter));
+ metricGroup.meter("numGreaterThan1MBFilesOutPerSecond", new MeterView(greaterThan1MBChunksCounter));
+ lessThan10KBEmlChunksCounter = metricGroup.counter("lessThan10KBEmlChunksCount");
+ between10KBAnd100KBEmlChunksCounter = metricGroup.counter("between10KBAnd100KBEmlChunksCount");
+ between100KBAnd1MBEmlChunksCounter = metricGroup.counter("between100KBAnd1MBEmlChunksCount");
+ between1MBAnd10MBEmlChunksCounter = metricGroup.counter("between1MBAnd10MBEmlChunksCount");
+ greaterThan10MBEmlChunksCounter = metricGroup.counter("greaterThan10MBEmlChunksCount");
+ metricGroup.meter("numLessThan10KBEmlFilesOutPerSecond", new MeterView(lessThan10KBEmlChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBEmlFilesOutPerSecond", new MeterView(between10KBAnd100KBEmlChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBEmlFilesOutPerSecond", new MeterView(between100KBAnd1MBEmlChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBEmlFilesOutPerSecond", new MeterView(between1MBAnd10MBEmlChunksCounter));
+ metricGroup.meter("numGreaterThan10MBEmlFilesOutPerSecond", new MeterView(greaterThan10MBEmlChunksCounter));
+ lessThan10KBTxtChunksCounter = metricGroup.counter("lessThan10KBTxtChunksCount");
+ between10KBAnd100KBTxtChunksCounter = metricGroup.counter("between10KBAnd100KBTxtChunksCount");
+ between100KBAnd1MBTxtChunksCounter = metricGroup.counter("between100KBAnd1MBTxtChunksCount");
+ between1MBAnd10MBTxtChunksCounter = metricGroup.counter("between1MBAnd10MBTxtChunksCount");
+ greaterThan10MBTxtChunksCounter = metricGroup.counter("greaterThan10MBTxtChunksCount");
+ metricGroup.meter("numLessThan10KBTxtChunksOutPerSecond", new MeterView(lessThan10KBTxtChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBTxtChunksOutPerSecond", new MeterView(between10KBAnd100KBTxtChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBTxtChunksOutPerSecond", new MeterView(between100KBAnd1MBTxtChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBTxtChunksOutPerSecond", new MeterView(between1MBAnd10MBTxtChunksCounter));
+ metricGroup.meter("numGreaterThan10MBTxtChunksOutPerSecond", new MeterView(greaterThan10MBTxtChunksCounter));
+ emlChunksCounter = metricGroup.counter("emlChunksCount");
+ txtChunksCounter = metricGroup.counter("txtChunksCount");
+ metricGroup.meter("numEmlChunksOutPerSecond", new MeterView(emlChunksCounter));
+ metricGroup.meter("numTxtChunksOutPerSecond", new MeterView(txtChunksCounter));
+ fileMetasCounter = metricGroup.counter("fileMetasCount");
+ metricGroup.meter("numFileMetasInPerSecond", new MeterView(fileMetasCounter));
+ requestFileMetasCounter = metricGroup.counter("requestFileMetasCount");
+ responseFileMetasCounter = metricGroup.counter("responseFileMetasCount");
+ requestFilesCounter = metricGroup.counter("requestFilesCount");
+ responseFilesCounter = metricGroup.counter("responseFilesCount");
+ metricGroup.meter("numRequestFileMetasInPerSecond", new MeterView(requestFileMetasCounter));
+ metricGroup.meter("numResponseFileMetasInPerSecond", new MeterView(responseFileMetasCounter));
+ metricGroup.meter("numRequestFilesOutPerSecond", new MeterView(requestFilesCounter));
+ metricGroup.meter("numResponseFilesOutPerSecond", new MeterView(responseFilesCounter));
+ errorChunksCounter = metricGroup.counter("errorChunksCount");
+ chunksInCounter = metricGroup.counter("chunksInCount");
+ chunksOutCounter = metricGroup.counter("chunksOutCount");
+ bytesInCounter = metricGroup.counter("bytesInCount");
+ bytesOutCounter = metricGroup.counter("bytesOutCount");
+ metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
+ metricGroup.meter("numChunksOutPerSecond", new MeterView(chunksOutCounter));
+ metricGroup.meter("numBytesInPerSecond", new MeterView(bytesInCounter));
+ metricGroup.meter("numBytesOutPerSecond", new MeterView(bytesOutCounter));
+ metricGroup.meter("numErrorChunksPerSecond", new MeterView(errorChunksCounter));
+ completeFilesCounter = metricGroup.counter("completeFilesCount");
+ completeEmlFilesCounter = metricGroup.counter("completeEmlFilesCount");
+ completeTxtFilesCounter = metricGroup.counter("completeTxtFilesCount");
+ completeRequestFilesCounter = metricGroup.counter("completeRequestFilesCount");
+ completeResponseFilesCounter = metricGroup.counter("completeResponseFilesCount");
+ metricGroup.meter("numCompleteFilesOutPerSecond", new MeterView(completeFilesCounter));
+ metricGroup.meter("numCompleteEmlFilesOutPerSecond", new MeterView(completeEmlFilesCounter));
+ metricGroup.meter("numCompleteTxtFilesOutPerSecond", new MeterView(completeTxtFilesCounter));
+ metricGroup.meter("numCompleteRequestFilesOutPerSecond", new MeterView(completeRequestFilesCounter));
+ metricGroup.meter("numCompleteResponseFilesOutPerSecond", new MeterView(completeResponseFilesCounter));
+ nullChunksCounter = metricGroup.counter("nullChunksCount");
+ nullEmlChunksCounter = metricGroup.counter("nullTxtChunksCount");
+ nullTxtChunksCounter = metricGroup.counter("nullEmlChunksCount");
+ metricGroup.meter("numNullFilesOutPerSecond", new MeterView(nullChunksCounter));
+ metricGroup.meter("numNullEmlFilesOutPerSecond", new MeterView(nullEmlChunksCounter));
+ metricGroup.meter("numNullTxtFilesOutPerSecond", new MeterView(nullTxtChunksCounter));
+ }
+
+ @Override
+ public void invoke(FileChunk fileChunk, Context context) {
+ String uuid = fileChunk.getUuid();
+ if (fileChunk.getMeta() != null) { //日志
+ fileMetasCounter.inc();
+ Map<String, Object> meta = fileChunk.getMeta();
+ String fileId = meta.get("fileId").toString();
+ if (fileId.contains("_1")) {
+ requestFileMetasCounter.inc();
+ } else if (fileId.contains("_2")) {
+ responseFileMetasCounter.inc();
+ }
+ FileChunk data = cache.getIfPresent(uuid + "_data");
+ if (data != null) {
+ sendFile(data, meta);
+ cache.invalidate(uuid + "_data");
+ } else {
+ cache.put(fileChunk.getUuid() + "_meta", fileChunk);
+ }
+ } else { //文件
+ chunksInCounter.inc();
+ bytesInCounter.inc(fileChunk.getLength());
+ FileChunk meta = cache.getIfPresent(uuid + "_meta");
+ if (meta != null) {
+ sendFile(fileChunk, meta.getMeta());
+ cache.invalidate(uuid + "_meta");
+ } else {
+ cache.put(fileChunk.getUuid() + "_data", fileChunk);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ IoUtil.close(syncHttpClient);
+ IoUtil.close(asyncHttpClient);
+ caffeineCacheUtil.close();
+ }
+
+ private void sendFile(FileChunk fileChunk, Map<String, Object> metaMap) {
+ String url = "";
+ try {
+ byte[] data;
+ String fileType = fileChunk.getFileType();
+ if (fileChunk.getChunk() != null) {
+ data = fileChunk.getChunk();
+ } else {
+ data = "".getBytes();
+ }
+ String fileId = metaMap != null && metaMap.containsKey("fileId") ? metaMap.get("fileId").toString() : "";
+ String policyId = metaMap != null && metaMap.containsKey("policyId") ? metaMap.get("policyId").toString() : "0";
+ String serverIP = metaMap != null && metaMap.containsKey("serverIP") ? metaMap.get("serverIP").toString() : "";
+ String serverPort = metaMap != null && metaMap.containsKey("serverPort") ? metaMap.get("serverPort").toString() : "";
+ String clientIP = metaMap != null && metaMap.containsKey("clientIP") ? metaMap.get("clientIP").toString() : "";
+ String clientPort = metaMap != null && metaMap.containsKey("clientPort") ? metaMap.get("clientPort").toString() : "";
+ String domain = metaMap != null && metaMap.containsKey("httpHost") ? FormatUtils.getTopPrivateDomain(metaMap.get("httpHost").toString()) : "";
+ String subscriberId = metaMap != null && metaMap.containsKey("subscriberId") ? metaMap.get("subscriberId").toString() : "";
+ String foundTime = metaMap != null && metaMap.containsKey("foundTime") ? metaMap.get("foundTime").toString() : "0";
+ url = URLUtil.normalize(endpointList.get(RandomUtil.randomInt(endpointList.size())) + "/v3/upload?" +
+ "cfg_id=" + policyId +
+ "&file_id=" + fileId +
+ "&file_type=" + fileType +
+ "&found_time=" + foundTime +
+ "&s_ip=" + serverIP +
+ "&s_port=" + serverPort +
+ "&d_ip=" + clientIP +
+ "&d_port=" + clientPort +
+ "&domain=" + domain +
+ "&account=" + subscriberId);
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.setEntity(new ByteArrayEntity(data));
+ executeRequest(httpPost, url);
+ chunksOutCounter.inc();
+ bytesOutCounter.inc(data.length);
+ calculateFileChunkMetrics(fileChunk, fileId);
+ } catch (Exception e) {
+ LOG.error("post file error. current url: " + url, e);
+ errorChunksCounter.inc();
+ }
+ }
+
+ private void executeRequest(HttpPost httpPost, String url) {
+ if (isAsync) {
+ asyncHttpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
+ @Override
+ public void completed(HttpResponse httpResponse) {
+ try {
+ String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8");
+ if (httpResponse.getStatusLine().getStatusCode() == 200) {
+ if (!responseEntity.contains("\"code\":200")) {
+ LOG.error("post file error. current url: {}, msg: {}", url, responseEntity);
+ errorChunksCounter.inc();
+ }
+ } else {
+ LOG.error("post file error. current url: {}, code: {}, msg: {}", url, httpResponse.getStatusLine().getStatusCode(), responseEntity);
+ errorChunksCounter.inc();
+ }
+ } catch (IOException e) {
+ LOG.error("post file error. current url: " + url, e);
+ errorChunksCounter.inc();
+ }
+ }
+
+ @Override
+ public void failed(Exception ex) {
+ LOG.error("post file error. current url: " + url, ex);
+ errorChunksCounter.inc();
+ }
+
+ @Override
+ public void cancelled() {
+
+ }
+ });
+ } else {
+ CloseableHttpResponse response = null;
+ try {
+ response = syncHttpClient.execute(httpPost);
+ String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
+ if (response.getStatusLine().getStatusCode() == 200) {
+ if (!responseEntity.contains("\"code\":200")) {
+ LOG.error("post file error. current url: {}, msg: {}", url, responseEntity);
+ errorChunksCounter.inc();
+ }
+ } else {
+ LOG.error("post file error. current url: {}, code: {}, msg: {}", url, response.getStatusLine().getStatusCode(), responseEntity);
+ errorChunksCounter.inc();
+ }
+ } catch (IOException e) {
+ LOG.error("post file error. current url: " + url, e);
+ errorChunksCounter.inc();
+ } finally {
+ IoUtil.close(response);
+ }
+ }
+ }
+
+ private void calculateFileChunkMetrics(FileChunk fileChunk, String fileId) {
+ String fileType = fileChunk.getFileType();
+ long length = fileChunk.getLength();
+ calculateChunkSize(length);
+ if ("eml".equals(fileType)) {
+ emlChunksCounter.inc();
+ calculateEmlChunkSize(length);
+ } else if ("txt".equals(fileType)) {
+ txtChunksCounter.inc();
+ calculateTxtChunkSize(length);
+ }
+ if (fileId.contains("_1")) {
+ requestFilesCounter.inc();
+ } else if (fileId.contains("_2")) {
+ responseFilesCounter.inc();
+ }
+ if (fileChunk.getOffset() == 0 && fileChunk.getLastChunkFlag() == 1) {
+ completeFilesCounter.inc();
+ if ("eml".equals(fileType)) {
+ completeEmlFilesCounter.inc();
+ } else if ("txt".equals(fileType)) {
+ completeTxtFilesCounter.inc();
+ }
+ if (fileChunk.getChunk() == null) {
+ nullChunksCounter.inc();
+ if ("eml".equals(fileType)) {
+ nullEmlChunksCounter.inc();
+ } else if ("txt".equals(fileType)) {
+ nullTxtChunksCounter.inc();
+ }
+ LOG.info("send file data is null. " + fileChunk.toString());
+ }
+ if (fileId.contains("_1")) {
+ completeRequestFilesCounter.inc();
+ } else if (fileId.contains("_2")) {
+ completeResponseFilesCounter.inc();
+ }
+ }
+ }
+
+ private void calculateChunkSize(long length) {
+ if (length <= 1024) {
+ lessThan1KBChunksCounter.inc();
+ } else if (length <= 5 * 1024) {
+ between1KBAnd5KBChunksCounter.inc();
+ } else if (length <= 10 * 1024) {
+ between5KBAnd10KBChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBChunksCounter.inc();
+ } else {
+ greaterThan1MBChunksCounter.inc();
+ }
+ }
+
+ private void calculateEmlChunkSize(long length) {
+ if (length <= 10 * 1024) {
+ lessThan10KBEmlChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBEmlChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBEmlChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBEmlChunksCounter.inc();
+ } else {
+ greaterThan10MBEmlChunksCounter.inc();
+ }
+ }
+
+ private void calculateTxtChunkSize(long length) {
+ if (length <= 10 * 1024) {
+ lessThan10KBTxtChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBTxtChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBTxtChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBTxtChunksCounter.inc();
+ } else {
+ greaterThan10MBTxtChunksCounter.inc();
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/sink/OssSinkByEhcache.java b/src/main/java/com/zdjizhi/sink/OssSinkByEhcache.java
new file mode 100644
index 0000000..7c4209e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/sink/OssSinkByEhcache.java
@@ -0,0 +1,396 @@
+package com.zdjizhi.sink;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.RandomUtil;
+import cn.hutool.core.util.URLUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.utils.EhcacheUtil;
+import com.zdjizhi.utils.FormatUtils;
+import com.zdjizhi.utils.HttpClientUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.util.EntityUtils;
+import org.ehcache.Cache;
+import org.ehcache.CacheManager;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class OssSinkByEhcache extends RichSinkFunction<FileChunk> {
+ private static final Log LOG = LogFactory.get();
+
+ private final Configuration configuration;
+ public transient Counter chunksInCounter;
+ public transient Counter chunksOutCounter;
+ public transient Counter bytesInCounter;
+ public transient Counter bytesOutCounter;
+ public transient Counter errorChunksCounter;
+ public transient Counter fileMetasCounter;
+ public transient Counter requestFileMetasCounter;
+ public transient Counter responseFileMetasCounter;
+ public transient Counter requestFilesCounter;
+ public transient Counter responseFilesCounter;
+ public transient Counter emlChunksCounter;
+ public transient Counter txtChunksCounter;
+ public transient Counter completeFilesCounter;
+ public transient Counter completeEmlFilesCounter;
+ public transient Counter completeTxtFilesCounter;
+ public transient Counter completeRequestFilesCounter;
+ public transient Counter completeResponseFilesCounter;
+ public transient Counter nullChunksCounter;
+ public transient Counter nullTxtChunksCounter;
+ public transient Counter nullEmlChunksCounter;
+ public transient Counter lessThan1KBChunksCounter;
+ public transient Counter between1KBAnd5KBChunksCounter;
+ public transient Counter between5KBAnd10KBChunksCounter;
+ public transient Counter between10KBAnd100KBChunksCounter;
+ public transient Counter between100KBAnd1MBChunksCounter;
+ public transient Counter greaterThan1MBChunksCounter;
+ public transient Counter lessThan10KBEmlChunksCounter;
+ public transient Counter between1MBAnd10MBEmlChunksCounter;
+ public transient Counter between10KBAnd100KBEmlChunksCounter;
+ public transient Counter between100KBAnd1MBEmlChunksCounter;
+ public transient Counter greaterThan10MBEmlChunksCounter;
+ public transient Counter lessThan10KBTxtChunksCounter;
+ public transient Counter between1MBAnd10MBTxtChunksCounter;
+ public transient Counter between10KBAnd100KBTxtChunksCounter;
+ public transient Counter between100KBAnd1MBTxtChunksCounter;
+ public transient Counter greaterThan10MBTxtChunksCounter;
+ private boolean isAsync;
+ private CloseableHttpClient syncHttpClient;
+ private CloseableHttpAsyncClient asyncHttpClient;
+ private List<String> endpointList;
+ private EhcacheUtil ehcacheUtil;
+ private Cache<String, FileChunk> dataCache;
+ private Cache<String, FileChunk> metaCache;
+
+ public OssSinkByEhcache(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup("file_chunk_combiner", "sink_oss");
+ endpointList = Arrays.asList(configuration.get(Configs.SINK_OSS_ENDPOINT).split(","));
+ isAsync = configuration.getBoolean(Configs.SINK_OSS_ASYNC);
+ if (isAsync) {
+ asyncHttpClient = HttpClientUtil.getInstance(configuration).getAsyncHttpClient();
+ asyncHttpClient.start();
+ } else {
+ syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
+ }
+ ehcacheUtil = EhcacheUtil.getInstance();
+ CacheManager ehcacheManager = EhcacheUtil.getInstance().getEhcacheManager();
+ dataCache = ehcacheManager.getCache("data", String.class, FileChunk.class);
+ metaCache = ehcacheManager.getCache("meta", String.class, FileChunk.class);
+ lessThan1KBChunksCounter = metricGroup.counter("lessThan1KBChunksCount");
+ between1KBAnd5KBChunksCounter = metricGroup.counter("between1KBAnd5KBChunksCount");
+ between5KBAnd10KBChunksCounter = metricGroup.counter("between5KBAnd10KBChunksCount");
+ between10KBAnd100KBChunksCounter = metricGroup.counter("between10KBAnd100KBChunksCount");
+ between100KBAnd1MBChunksCounter = metricGroup.counter("between100KBAnd1MBChunksCount");
+ greaterThan1MBChunksCounter = metricGroup.counter("greaterThan1MBChunksCount");
+ metricGroup.meter("numLessThan1KBFilesOutPerSecond", new MeterView(lessThan1KBChunksCounter));
+ metricGroup.meter("numBetween1KBAnd5KBFilesOutPerSecond", new MeterView(between1KBAnd5KBChunksCounter));
+ metricGroup.meter("numBetween5KBAnd10KBFilesOutPerSecond", new MeterView(between5KBAnd10KBChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBFilesOutPerSecond", new MeterView(between10KBAnd100KBChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBFilesOutPerSecond", new MeterView(between100KBAnd1MBChunksCounter));
+ metricGroup.meter("numGreaterThan1MBFilesOutPerSecond", new MeterView(greaterThan1MBChunksCounter));
+ lessThan10KBEmlChunksCounter = metricGroup.counter("lessThan10KBEmlChunksCount");
+ between10KBAnd100KBEmlChunksCounter = metricGroup.counter("between10KBAnd100KBEmlChunksCount");
+ between100KBAnd1MBEmlChunksCounter = metricGroup.counter("between100KBAnd1MBEmlChunksCount");
+ between1MBAnd10MBEmlChunksCounter = metricGroup.counter("between1MBAnd10MBEmlChunksCount");
+ greaterThan10MBEmlChunksCounter = metricGroup.counter("greaterThan10MBEmlChunksCount");
+ metricGroup.meter("numLessThan10KBEmlFilesOutPerSecond", new MeterView(lessThan10KBEmlChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBEmlFilesOutPerSecond", new MeterView(between10KBAnd100KBEmlChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBEmlFilesOutPerSecond", new MeterView(between100KBAnd1MBEmlChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBEmlFilesOutPerSecond", new MeterView(between1MBAnd10MBEmlChunksCounter));
+ metricGroup.meter("numGreaterThan10MBEmlFilesOutPerSecond", new MeterView(greaterThan10MBEmlChunksCounter));
+ lessThan10KBTxtChunksCounter = metricGroup.counter("lessThan10KBTxtChunksCount");
+ between10KBAnd100KBTxtChunksCounter = metricGroup.counter("between10KBAnd100KBTxtChunksCount");
+ between100KBAnd1MBTxtChunksCounter = metricGroup.counter("between100KBAnd1MBTxtChunksCount");
+ between1MBAnd10MBTxtChunksCounter = metricGroup.counter("between1MBAnd10MBTxtChunksCount");
+ greaterThan10MBTxtChunksCounter = metricGroup.counter("greaterThan10MBTxtChunksCount");
+ metricGroup.meter("numLessThan10KBTxtChunksOutPerSecond", new MeterView(lessThan10KBTxtChunksCounter));
+ metricGroup.meter("numBetween10KBAnd100KBTxtChunksOutPerSecond", new MeterView(between10KBAnd100KBTxtChunksCounter));
+ metricGroup.meter("numBetween100KBAnd1MBTxtChunksOutPerSecond", new MeterView(between100KBAnd1MBTxtChunksCounter));
+ metricGroup.meter("numBetween1MBAnd10MBTxtChunksOutPerSecond", new MeterView(between1MBAnd10MBTxtChunksCounter));
+ metricGroup.meter("numGreaterThan10MBTxtChunksOutPerSecond", new MeterView(greaterThan10MBTxtChunksCounter));
+ emlChunksCounter = metricGroup.counter("emlChunksCount");
+ txtChunksCounter = metricGroup.counter("txtChunksCount");
+ metricGroup.meter("numEmlChunksOutPerSecond", new MeterView(emlChunksCounter));
+ metricGroup.meter("numTxtChunksOutPerSecond", new MeterView(txtChunksCounter));
+ fileMetasCounter = metricGroup.counter("fileMetasCount");
+ metricGroup.meter("numFileMetasInPerSecond", new MeterView(fileMetasCounter));
+ requestFileMetasCounter = metricGroup.counter("requestFileMetasCount");
+ responseFileMetasCounter = metricGroup.counter("responseFileMetasCount");
+ requestFilesCounter = metricGroup.counter("requestFilesCount");
+ responseFilesCounter = metricGroup.counter("responseFilesCount");
+ metricGroup.meter("numRequestFileMetasInPerSecond", new MeterView(requestFileMetasCounter));
+ metricGroup.meter("numResponseFileMetasInPerSecond", new MeterView(responseFileMetasCounter));
+ metricGroup.meter("numRequestFilesOutPerSecond", new MeterView(requestFilesCounter));
+ metricGroup.meter("numResponseFilesOutPerSecond", new MeterView(responseFilesCounter));
+ errorChunksCounter = metricGroup.counter("errorChunksCount");
+ chunksInCounter = metricGroup.counter("chunksInCount");
+ chunksOutCounter = metricGroup.counter("chunksOutCount");
+ bytesInCounter = metricGroup.counter("bytesInCount");
+ bytesOutCounter = metricGroup.counter("bytesOutCount");
+ metricGroup.meter("numChunksInPerSecond", new MeterView(chunksInCounter));
+ metricGroup.meter("numChunksOutPerSecond", new MeterView(chunksOutCounter));
+ metricGroup.meter("numBytesInPerSecond", new MeterView(bytesInCounter));
+ metricGroup.meter("numBytesOutPerSecond", new MeterView(bytesOutCounter));
+ metricGroup.meter("numErrorChunksPerSecond", new MeterView(errorChunksCounter));
+ completeFilesCounter = metricGroup.counter("completeFilesCount");
+ completeEmlFilesCounter = metricGroup.counter("completeEmlFilesCount");
+ completeTxtFilesCounter = metricGroup.counter("completeTxtFilesCount");
+ completeRequestFilesCounter = metricGroup.counter("completeRequestFilesCount");
+ completeResponseFilesCounter = metricGroup.counter("completeResponseFilesCount");
+ metricGroup.meter("numCompleteFilesOutPerSecond", new MeterView(completeFilesCounter));
+ metricGroup.meter("numCompleteEmlFilesOutPerSecond", new MeterView(completeEmlFilesCounter));
+ metricGroup.meter("numCompleteTxtFilesOutPerSecond", new MeterView(completeTxtFilesCounter));
+ metricGroup.meter("numCompleteRequestFilesOutPerSecond", new MeterView(completeRequestFilesCounter));
+ metricGroup.meter("numCompleteResponseFilesOutPerSecond", new MeterView(completeResponseFilesCounter));
+ nullChunksCounter = metricGroup.counter("nullChunksCount");
+ nullEmlChunksCounter = metricGroup.counter("nullTxtChunksCount");
+ nullTxtChunksCounter = metricGroup.counter("nullEmlChunksCount");
+ metricGroup.meter("numNullFilesOutPerSecond", new MeterView(nullChunksCounter));
+ metricGroup.meter("numNullEmlFilesOutPerSecond", new MeterView(nullEmlChunksCounter));
+ metricGroup.meter("numNullTxtFilesOutPerSecond", new MeterView(nullTxtChunksCounter));
+ }
+
+ @Override
+ public void invoke(FileChunk fileChunk, Context context) {
+ String uuid = fileChunk.getUuid();
+ if (fileChunk.getMeta() != null) { //日志
+ fileMetasCounter.inc();
+ Map<String, Object> meta = fileChunk.getMeta();
+ String fileId = meta.get("fileId").toString();
+ if (fileId.contains("_1")) {
+ requestFileMetasCounter.inc();
+ } else if (fileId.contains("_2")) {
+ responseFileMetasCounter.inc();
+ }
+ FileChunk data = dataCache.get(uuid);
+ if (data != null) {
+ sendFile(data, meta);
+ dataCache.remove(uuid);
+ } else {
+ metaCache.put(fileChunk.getUuid(), fileChunk);
+ }
+ } else { //文件
+ chunksInCounter.inc();
+ bytesInCounter.inc(fileChunk.getLength());
+ FileChunk meta = metaCache.get(uuid);
+ if (meta != null) {
+ sendFile(fileChunk, meta.getMeta());
+ metaCache.remove(uuid);
+ } else {
+ dataCache.put(fileChunk.getUuid(), fileChunk);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ IoUtil.close(syncHttpClient);
+ IoUtil.close(asyncHttpClient);
+ ehcacheUtil.close();
+ }
+
+ private void sendFile(FileChunk fileChunk, Map<String, Object> metaMap) {
+ String url = "";
+ try {
+ byte[] data;
+ String fileType = fileChunk.getFileType();
+ if (fileChunk.getChunk() != null) {
+ data = fileChunk.getChunk();
+ } else {
+ data = "".getBytes();
+ }
+ String fileId = metaMap != null && metaMap.containsKey("fileId") ? metaMap.get("fileId").toString() : "";
+ String policyId = metaMap != null && metaMap.containsKey("policyId") ? metaMap.get("policyId").toString() : "0";
+ String serverIP = metaMap != null && metaMap.containsKey("serverIP") ? metaMap.get("serverIP").toString() : "";
+ String serverPort = metaMap != null && metaMap.containsKey("serverPort") ? metaMap.get("serverPort").toString() : "";
+ String clientIP = metaMap != null && metaMap.containsKey("clientIP") ? metaMap.get("clientIP").toString() : "";
+ String clientPort = metaMap != null && metaMap.containsKey("clientPort") ? metaMap.get("clientPort").toString() : "";
+ String domain = metaMap != null && metaMap.containsKey("httpHost") ? FormatUtils.getTopPrivateDomain(metaMap.get("httpHost").toString()) : "";
+ String subscriberId = metaMap != null && metaMap.containsKey("subscriberId") ? metaMap.get("subscriberId").toString() : "";
+ String foundTime = metaMap != null && metaMap.containsKey("foundTime") ? metaMap.get("foundTime").toString() : "0";
+ url = URLUtil.normalize(endpointList.get(RandomUtil.randomInt(endpointList.size())) + "/v3/upload?" +
+ "cfg_id=" + policyId +
+ "&file_id=" + fileId +
+ "&file_type=" + fileType +
+ "&found_time=" + foundTime +
+ "&s_ip=" + serverIP +
+ "&s_port=" + serverPort +
+ "&d_ip=" + clientIP +
+ "&d_port=" + clientPort +
+ "&domain=" + domain +
+ "&account=" + subscriberId);
+ HttpPost httpPost = new HttpPost(url);
+ httpPost.setEntity(new ByteArrayEntity(data));
+ executeRequest(httpPost, url);
+ chunksOutCounter.inc();
+ bytesOutCounter.inc(data.length);
+ calculateFileChunkMetrics(fileChunk, fileId);
+ } catch (Exception e) {
+ LOG.error("post file error. current url: " + url, e);
+ errorChunksCounter.inc();
+ }
+ }
+
+ private void executeRequest(HttpPost httpPost, String url) {
+ if (isAsync) {
+ asyncHttpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
+ @Override
+ public void completed(HttpResponse httpResponse) {
+ try {
+ String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8");
+ if (httpResponse.getStatusLine().getStatusCode() == 200) {
+ if (!responseEntity.contains("\"code\":200")) {
+ LOG.error("post file error. current url: {}, msg: {}", url, responseEntity);
+ errorChunksCounter.inc();
+ }
+ } else {
+ LOG.error("post file error. current url: {}, code: {}, msg: {}", url, httpResponse.getStatusLine().getStatusCode(), responseEntity);
+ errorChunksCounter.inc();
+ }
+ } catch (IOException e) {
+ LOG.error("post file error. current url: " + url, e);
+ errorChunksCounter.inc();
+ }
+ }
+
+ @Override
+ public void failed(Exception ex) {
+ LOG.error("post file error. current url: " + url, ex);
+ errorChunksCounter.inc();
+ }
+
+ @Override
+ public void cancelled() {
+
+ }
+ });
+ } else {
+ CloseableHttpResponse response = null;
+ try {
+ response = syncHttpClient.execute(httpPost);
+ String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
+ if (response.getStatusLine().getStatusCode() == 200) {
+ if (!responseEntity.contains("\"code\":200")) {
+ LOG.error("post file error. current url: {}, msg: {}", url, responseEntity);
+ errorChunksCounter.inc();
+ }
+ } else {
+ LOG.error("post file error. current url: {}, code: {}, msg: {}", url, response.getStatusLine().getStatusCode(), responseEntity);
+ errorChunksCounter.inc();
+ }
+ } catch (IOException e) {
+ LOG.error("post file error. current url: " + url, e);
+ errorChunksCounter.inc();
+ } finally {
+ IoUtil.close(response);
+ }
+ }
+ }
+
+ private void calculateFileChunkMetrics(FileChunk fileChunk, String fileId) {
+ String fileType = fileChunk.getFileType();
+ long length = fileChunk.getLength();
+ calculateChunkSize(length);
+ if ("eml".equals(fileType)) {
+ emlChunksCounter.inc();
+ calculateEmlChunkSize(length);
+ } else if ("txt".equals(fileType)) {
+ txtChunksCounter.inc();
+ calculateTxtChunkSize(length);
+ }
+ if (fileId.contains("_1")) {
+ requestFilesCounter.inc();
+ } else if (fileId.contains("_2")) {
+ responseFilesCounter.inc();
+ }
+ if (fileChunk.getOffset() == 0 && fileChunk.getLastChunkFlag() == 1) {
+ completeFilesCounter.inc();
+ if ("eml".equals(fileType)) {
+ completeEmlFilesCounter.inc();
+ } else if ("txt".equals(fileType)) {
+ completeTxtFilesCounter.inc();
+ }
+ if (fileChunk.getChunk() == null) {
+ nullChunksCounter.inc();
+ if ("eml".equals(fileType)) {
+ nullEmlChunksCounter.inc();
+ } else if ("txt".equals(fileType)) {
+ nullTxtChunksCounter.inc();
+ }
+ LOG.info("send file data is null. " + fileChunk.toString());
+ }
+ if (fileId.contains("_1")) {
+ completeRequestFilesCounter.inc();
+ } else if (fileId.contains("_2")) {
+ completeResponseFilesCounter.inc();
+ }
+ }
+ }
+
+ private void calculateChunkSize(long length) {
+ if (length <= 1024) {
+ lessThan1KBChunksCounter.inc();
+ } else if (length <= 5 * 1024) {
+ between1KBAnd5KBChunksCounter.inc();
+ } else if (length <= 10 * 1024) {
+ between5KBAnd10KBChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBChunksCounter.inc();
+ } else {
+ greaterThan1MBChunksCounter.inc();
+ }
+ }
+
+ private void calculateEmlChunkSize(long length) {
+ if (length <= 10 * 1024) {
+ lessThan10KBEmlChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBEmlChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBEmlChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBEmlChunksCounter.inc();
+ } else {
+ greaterThan10MBEmlChunksCounter.inc();
+ }
+ }
+
+ private void calculateTxtChunkSize(long length) {
+ if (length <= 10 * 1024) {
+ lessThan10KBTxtChunksCounter.inc();
+ } else if (length <= 100 * 1024) {
+ between10KBAnd100KBTxtChunksCounter.inc();
+ } else if (length <= 1024 * 1024) {
+ between100KBAnd1MBTxtChunksCounter.inc();
+ } else if (length <= 10 * 1024 * 1024) {
+ between1MBAnd10MBTxtChunksCounter.inc();
+ } else {
+ greaterThan10MBTxtChunksCounter.inc();
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/trigger/LastChunkTrigger.java b/src/main/java/com/zdjizhi/trigger/LastChunkTrigger.java
new file mode 100644
index 0000000..58b0d9b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/trigger/LastChunkTrigger.java
@@ -0,0 +1,37 @@
+package com.zdjizhi.trigger;
+
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+public class LastChunkTrigger extends Trigger<Object, TimeWindow> {
+ private static final long serialVersionUID = 1L;
+
+ public static LastChunkTrigger create() {
+ return new LastChunkTrigger();
+ }
+
+ @Override
+ public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
+ if (((FileChunk) element).getLastChunkFlag() == 1) {
+ return TriggerResult.FIRE;
+ } else {
+ return TriggerResult.CONTINUE;
+ }
+ }
+
+ @Override
+ public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
+ return TriggerResult.CONTINUE;
+ }
+
+ @Override
+ public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
+ return TriggerResult.CONTINUE;
+ }
+
+ @Override
+ public void clear(TimeWindow window, TriggerContext ctx) {
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/CaffeineCacheUtil.java b/src/main/java/com/zdjizhi/utils/CaffeineCacheUtil.java
new file mode 100644
index 0000000..75c946e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/CaffeineCacheUtil.java
@@ -0,0 +1,46 @@
+package com.zdjizhi.utils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Weigher;
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.configuration.Configuration;
+import org.checkerframework.checker.index.qual.NonNegative;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+import java.util.concurrent.TimeUnit;
+
+public class CaffeineCacheUtil {
+ private static CaffeineCacheUtil caffeineCacheUtil = null;
+ private static Cache<String, FileChunk> caffeineCache = null;
+
+ private CaffeineCacheUtil(Configuration configuration) {
+ caffeineCache = Caffeine.newBuilder()
+// .initialCapacity(configuration.getLong(Configs.FILE_META_CACHE_SIZE))
+ .maximumWeight(configuration.getLong(Configs.FILE_META_CACHE_SIZE))
+ .weigher(new Weigher<String, FileChunk>() {
+ @Override
+ public @NonNegative int weigh(@NonNull String key, @NonNull FileChunk value) {
+ return (int) value.getLength();
+ }
+ })
+ .expireAfterWrite(configuration.get(Configs.FILE_META_CACHE_TIME), TimeUnit.SECONDS)
+ .build();
+ }
+
+ public static synchronized CaffeineCacheUtil getInstance(Configuration configuration) {
+ if (null == caffeineCacheUtil) {
+ caffeineCacheUtil = new CaffeineCacheUtil(configuration);
+ }
+ return caffeineCacheUtil;
+ }
+
+ public Cache<String, FileChunk> getCaffeineCache() {
+ return caffeineCache;
+ }
+
+ public void close() {
+ caffeineCache.cleanUp();
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/EhcacheUtil.java b/src/main/java/com/zdjizhi/utils/EhcacheUtil.java
new file mode 100644
index 0000000..3f28c01
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/EhcacheUtil.java
@@ -0,0 +1,35 @@
+package com.zdjizhi.utils;
+
+import org.ehcache.CacheManager;
+import org.ehcache.config.builders.CacheManagerBuilder;
+import org.ehcache.xml.XmlConfiguration;
+
+import java.net.URL;
+
+public class EhcacheUtil {
+ private static EhcacheUtil ehcacheUtil = null;
+ private static CacheManager cacheManager = null;
+
+ private EhcacheUtil() {
+ URL xmlConfigUrl = this.getClass().getClassLoader().getResource("ehcache.xml");
+ XmlConfiguration xmlConfiguration = new XmlConfiguration(xmlConfigUrl);
+ cacheManager = CacheManagerBuilder.newCacheManager(xmlConfiguration);
+ cacheManager.init();
+ }
+
+ public static synchronized EhcacheUtil getInstance() {
+ if (null == ehcacheUtil) {
+ ehcacheUtil = new EhcacheUtil();
+ }
+ return ehcacheUtil;
+ }
+
+ public CacheManager getEhcacheManager() {
+ return cacheManager;
+ }
+
+ public void close() {
+ cacheManager.close();
+ ehcacheUtil = null;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
index 89640ea..cafd193 100644
--- a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
+++ b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
@@ -44,15 +44,15 @@ public class HttpClientUtil {
private RequestConfig getRequestConfig() {
return RequestConfig.custom()
- .setConnectTimeout(configuration.get(Configs.SINK_HOS_HTTP_CONNECT_TIMEOUT))
- .setConnectionRequestTimeout(configuration.get(Configs.SINK_HOS_HTTP_REQUEST_TIMEOUT))
- .setSocketTimeout(configuration.get(Configs.SINK_HOS_HTTP_SOCKET_TIMEOUT))
+ .setConnectTimeout(configuration.get(Configs.SINK_HTTP_CONNECT_TIMEOUT))
+ .setConnectionRequestTimeout(configuration.get(Configs.SINK_HTTP_REQUEST_TIMEOUT))
+ .setSocketTimeout(configuration.get(Configs.SINK_HTTP_SOCKET_TIMEOUT))
.build();
}
private HttpRequestRetryHandler getRetryHandler() {
return (exception, executionCount, context) -> {
- if (executionCount >= configuration.get(Configs.SINK_HOS_HTTP_ERROR_RETRY)) {
+ if (executionCount >= configuration.get(Configs.SINK_HTTP_ERROR_RETRY)) {
return false;
}
if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
@@ -106,9 +106,9 @@ public class HttpClientUtil {
// 创建ConnectionManager,添加Connection配置信息
connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
// 设置最大连接数
- connManager.setMaxTotal(configuration.get(Configs.SINK_HOS_HTTP_MAX_TOTAL));
+ connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_MAX_TOTAL));
// 设置每个连接的路由数
- connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HOS_HTTP_MAX_PER_ROUTE));
+ connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_MAX_PER_ROUTE));
} catch (KeyManagementException | NoSuchAlgorithmException e) {
throw new RuntimeException(e.getMessage());
}
@@ -146,8 +146,8 @@ public class HttpClientUtil {
.build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
connManager = new PoolingNHttpClientConnectionManager(ioReactor);
- connManager.setMaxTotal(configuration.get(Configs.SINK_HOS_HTTP_MAX_TOTAL));
- connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HOS_HTTP_MAX_PER_ROUTE));
+ connManager.setMaxTotal(configuration.get(Configs.SINK_HTTP_MAX_TOTAL));
+ connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HTTP_MAX_PER_ROUTE));
} catch (IOReactorException e) {
throw new RuntimeException(e.getMessage());
}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index e3efb43..d08d918 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -1,11 +1,10 @@
flink.job.name=agg_traffic_file_chunk_combine
-#source�������
#9092Ϊ����֤ 9095Ϊssl 9094Ϊsasl
-source.kafka.broker=192.168.40.151:9092,192.168.40.152:9092,192.168.40.203:9092
-source.kafka.group.id=test1
+source.kafka.broker=192.168.41.29:9092
source.kafka.topic=TRAFFIC-FILE-STREAM-RECORD
+source.kafka.group.id=test1
#earliest��ͷ��ʼ latest����
-source.kafka.auto.offset.reset=latest
+source.kafka.auto.offset.reset=earliest
source.kafka.session.timeout.ms=60000
#ÿ����ȡ�����ӷ����л�ȡ������¼��
source.kafka.max.poll.records=1000
@@ -18,40 +17,52 @@ source.kafka.user=admin
source.kafka.pin=galaxy2019
#SSL��Ҫ
source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
-enable.rate.limit=false
-rate.limit.threshold=10000
-rate.limit.exclusion.expression=FileChunk.fileType == "eml"
-file.max.chunk.count=100000
-file.max.size=1073741824
-#�����ֶι��ˣ�java����ʽ
-#filter.expression=FileChunk.fileType == "txt" || FileChunk.fileType == "eml"
+source.kafka.file.meta.session.topic=SESSION-RECORD
+source.kafka.file.meta.proxy.topic=PROXY-EVENT
+source.kafka.file.meta.group.id=file_chunk_combine_1
+map.filter.expression=FileChunk.fileType == "eml" || (FileChunk.offset <= 10737 && FileChunk.fileType != "eml")
+map.parse.file.meta.parallelism=1
#�����������
-combiner.window.parallelism=2
+combiner.window.type=0
+combiner.window.parallelism=1
combiner.window.time=10
+combiner.window.allowed.lateness=10
#�೤ʱ��δд�������򴥷�����
-combiner.window.idle.time=5
-#sink�������
-sink.parallelism=2
+combiner.window.idle.time=10
+combiner.window.enable.last.chunk.trigger=true
+file.max.chunk.count=100000
+file.meta.cache.time=30
+file.meta.cache.size=1073741824
+#file.meta.filter.expression=(FileChunk.meta.sled_ip == "172.18.10.168" && FileChunk.meta.fileId.contains("_9")) || (FileChunk.meta.sled_ip == "172.18.10.168" && FileChunk.meta.duration_ms < 60 && FileChunk.meta.contentLength < 1048576)
+sink.parallelism=1
+#��ѡhos��oss��hbase
sink.type=hos
sink.async=false
sink.batch=false
-sink.batch.count=100
-sink.batch.size=102400
+sink.batch.count=1000
+sink.batch.size=1048576
+#sink.filter.expression=
+#sink.rate.limit.threshold=0
+#sink.rate.limit.exclusion.expression=FileChunk.fileType == "eml"
#hos sink�������
-#0����nginx��1��ѯ����hos��Ĭ��0
-sink.hos.load.balance.mode=1
-#����nginx�򵥸�hos����Ϊip:port�����ʶ��hos������Ϊip1,ip2:port1,port2
+#����nginx�򵥸�hos����Ϊip:port�����ʶ��hos������Ϊip1:port,ip2:port...
sink.hos.endpoint=192.168.41.29:8186
sink.hos.bucket=traffic_file_bucket
sink.hos.token=c21f969b5f03d33d43e04f8f136e7682
-sink.hos.http.error.retry=3
-sink.hos.http.max.total=10
-sink.hos.http.max.per.route=10
-sink.hos.http.connect.timeout=1000
-sink.hos.http.request.timeout=5000
-sink.hos.http.socket.timeout=60000
+#oss sink�������
+#���Զ����ַ��ip1:port,ip2:port...
+sink.oss.endpoint=192.168.41.29:8186
+#sink.oss.filter.expression=FileChunk.offset == 0 && FileChunk.lastChunkFlag == 1
+sink.oss.async=false
+#http �������
+sink.http.error.retry=3
+sink.http.max.total=10
+sink.http.max.per.route=10
+sink.http.connect.timeout=1000
+sink.http.request.timeout=5000
+sink.http.socket.timeout=60000
#hbase sink�������
-sink.hbase.zookeeper=192.168.44.12
+sink.hbase.zookeeper=192.168.41.29
sink.hbase.retries.number=10
sink.hbase.rpc.timeout=600000
sink.hbase.client.write.buffer=10971520
diff --git a/src/main/resources/ehcache.xml b/src/main/resources/ehcache.xml
new file mode 100644
index 0000000..85ee37e
--- /dev/null
+++ b/src/main/resources/ehcache.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<config xmlns="http://www.ehcache.org/v3"
+ xmlns:jsr107="http://www.ehcache.org/v3/jsr107"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.ehcache.org/v3
+ http://www.ehcache.org/schema/ehcache-core-3.10.xsd
+ http://www.ehcache.org/v3/jsr107
+ http://www.ehcache.org/schema/ehcache-107-ext-3.10.xsd">
+
+<!-- <persistence directory="D:\myCache"/>-->
+ <cache alias="data">
+ <key-type>java.lang.String</key-type>
+ <value-type>com.zdjizhi.pojo.FileChunk</value-type>
+ <expiry>
+ <ttl unit="seconds">600</ttl>
+ </expiry>
+ <resources>
+ <heap unit="entries">100000</heap>
+<!-- <offheap unit="GB">15</offheap>-->
+<!-- <disk persistent="true" unit="MB">500</disk>-->
+ </resources>
+ </cache>
+
+ <cache alias="meta">
+ <key-type>java.lang.String</key-type>
+ <value-type>com.zdjizhi.pojo.FileChunk</value-type>
+ <expiry>
+ <ttl unit="seconds">1200</ttl>
+ </expiry>
+ <resources>
+ <heap unit="entries">100000</heap>
+<!-- <offheap unit="GB">5</offheap>-->
+<!-- <disk persistent="true" unit="MB">500</disk>-->
+ </resources>
+ </cache>
+</config> \ No newline at end of file