authorhoujinchuan <[email protected]>2024-01-22 17:33:39 +0800
committerhoujinchuan <[email protected]>2024-01-22 17:33:39 +0800
commite7fc4feb6fee1faed38a71a51cd0ec4fb281099a (patch)
tree0c243d94cd40fccaa9fbbdee2cd3f1913c90d849
parente08ac67b2c418e8b714239cf421bb1d6c177cec7 (diff)
Initial commit, version 24.01
-rw-r--r-- README.md 2
-rw-r--r-- pom.xml 251
-rw-r--r-- src/main/java/com/zdjizhi/FileChunkCombiner.java 78
-rw-r--r-- src/main/java/com/zdjizhi/config/Configs.java 96
-rw-r--r-- src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java 44
-rw-r--r-- src/main/java/com/zdjizhi/function/FileChunkKeySelector.java 11
-rw-r--r-- src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java 79
-rw-r--r-- src/main/java/com/zdjizhi/function/SideOutputMapFunction.java 34
-rw-r--r-- src/main/java/com/zdjizhi/kafka/ByteArrayDeserializationSchema.java 21
-rw-r--r-- src/main/java/com/zdjizhi/kafka/KafkaConsumer.java 47
-rw-r--r-- src/main/java/com/zdjizhi/pojo/FileChunk.java 159
-rw-r--r-- src/main/java/com/zdjizhi/sink/HosSink.java 39
-rw-r--r-- src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java 72
-rw-r--r-- src/main/java/com/zdjizhi/trigger/MultipleTrigger.java 67
-rw-r--r-- src/main/java/com/zdjizhi/utils/ContentTypeUtil.java 368
-rw-r--r-- src/main/java/com/zdjizhi/utils/HttpClientUtil.java 148
-rw-r--r-- src/main/java/com/zdjizhi/utils/KafkaCertUtil.java 41
-rw-r--r-- src/main/java/com/zdjizhi/utils/PublicUtil.java 191
-rw-r--r-- src/main/resources/common.properties 39
-rw-r--r-- src/main/resources/log4j.properties 7
-rw-r--r-- src/test/data/messagePacks bin 0 -> 22205 bytes
-rw-r--r-- src/test/data/test.eml 218
-rw-r--r-- src/test/java/com/zdjizhi/FileChunkCombinerTests.java 566
23 files changed, 2577 insertions, 1 deletion
diff --git a/README.md b/README.md
index 5be6ff0..5a8ee0d 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# file-stream-combiner
+# file-chunk-combiner
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..275475a
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,251 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>file-chunk-combiner</artifactId>
+ <version>24.01.18</version>
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.153:8099/content/groups/public</url>
+ </repository>
+ <repository>
+ <id>maven-ali</id>
+ <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <properties>
+ <flink.version>1.13.1</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <version>1.1.3</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_2.12</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ <version>0.9.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>jackson-dataformat-msgpack</artifactId>
+ <version>0.9.5</version>
+ </dependency>
+ <dependency>
+ <groupId>cn.hutool</groupId>
+ <artifactId>hutool-all</artifactId>
+ <version>5.8.22</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>2.0.32</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jasypt</groupId>
+ <artifactId>jasypt</artifactId>
+ <version>1.9.3</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.21.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.13</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <forceJavacCompilerUse>true</forceJavacCompilerUse>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.1.1</version>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>shade.org.apache.http</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <excludes>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:slf4j-api</exclude>
+ <exclude>org.slf4j:slf4j-jdk14</exclude>
+ <exclude>org.slf4j:slf4j-jcl</exclude>
+ <exclude>org.slf4j:slf4j-nop</exclude>
+ <exclude>org.slf4j:slf4j-simple</exclude>
+ <exclude>org.slf4j:slf4j-reload4j</exclude>
+ <exclude>org.slf4j:slf4j-log4j12</exclude>
+ <exclude>org.slf4j:log4j-over-slf4j</exclude>
+ <exclude>org.slf4j:jcl-over-slf4j</exclude>
+ <exclude>log4j:*</exclude>
+ <exclude>commons-logging:*</exclude>
+ <exclude>ch.qos.logback:*</exclude>
+ <exclude>org.apache.logging.log4j:log4j-api</exclude>
+ <exclude>org.apache.logging.log4j:log4j-core</exclude>
+ <exclude>org.apache.logging.log4j:log4j-slf4j-impl</exclude>
+ <exclude>org.apache.logging.log4j:log4j-1.2-api</exclude>
+ <exclude>org.apache.logging.log4j:log4j-to-slf4j</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ <executions>
+ <execution>
+ <id>file-chunk-combiner</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>file-chunk-combiner-${project.version}</finalName>
+ <filters>
+ <filter>
+ <!-- Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.zdjizhi.FileChunkCombiner</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>io.github.zlika</groupId>
+ <artifactId>reproducible-build-maven-plugin</artifactId>
+ <version>0.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>strip-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project> \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
new file mode 100644
index 0000000..06f7402
--- /dev/null
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -0,0 +1,78 @@
+package com.zdjizhi;
+
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.function.*;
+import com.zdjizhi.pojo.*;
+import com.zdjizhi.sink.HosSink;
+import com.zdjizhi.kafka.KafkaConsumer;
+import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
+import com.zdjizhi.trigger.MultipleTrigger;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.OutputTag;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
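+/**
+ * Job entry point: consumes MessagePack-encoded file chunks from Kafka, keys
+ * them by uuid, merges them in tumbling event-time windows (fired early by a
+ * last-chunk/idle-time trigger), and writes the results to HOS; late chunks
+ * are routed through a side output straight to the sink.
+ */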
+public class FileChunkCombiner extends KafkaConsumer {
+
+ public static void main(String[] args) throws Exception {
+ final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration configuration = parameterTool.getConfiguration();
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+ environment.getConfig().setGlobalJobParameters(configuration);
+
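+ // Zero out-of-orderness tolerance; getTimestamp() is divided by 1000, which
+ // suggests source timestamps arrive in microseconds (assumption).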
+ WatermarkStrategy<FileChunk> watermarkStrategy = WatermarkStrategy
+ .<FileChunk>forBoundedOutOfOrderness(Duration.ofSeconds(0))
+ .withTimestampAssigner((fileChunk, timestamp) -> fileChunk.getTimestamp() / 1000);
+
+ SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
+ .addSource(KafkaConsumer.byteArrayConsumer(configuration))
+ .name("Kafka Source")
+ .map(new ParseMessagePackMapFunction())
+ .name("Map: Parse Message Pack")
+ .filter((FilterFunction<FileChunk>) Objects::nonNull)
+ .assignTimestampsAndWatermarks(watermarkStrategy);
+
+ OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
+ };
+
+ List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
+ triggers.add(EventTimeTrigger.create());
+ triggers.add(LastChunkOrNoDataInTimeTrigger.of(configuration.get(Configs.COMBINER_WINDOW_IDLE_TIME) * 1000));
+ Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
+ SingleOutputStreamOperator<FileChunk> windowStream = parseMessagePackStream
+ .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)
+ .window(TumblingEventTimeWindows.of(Time.seconds(configuration.get(Configs.COMBINER_WINDOW_TIME))))
+ .trigger(trigger)
+ .sideOutputLateData(delayedChunkOutputTag)
+ .process(new CombineChunkProcessWindowFunction(configuration))
+ .name("Window: Combine Chunk")
+ .setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
+ .disableChaining();
+
+ HosSink hosSink = new HosSink(configuration);
+ windowStream.addSink(hosSink)
+ .name("Hos")
+ .setParallelism(configuration.get(Configs.SINK_HOS_PARALLELISM));
+ windowStream.getSideOutput(delayedChunkOutputTag)
+ .map(new SideOutputMapFunction())
+ .addSink(hosSink)
+ .name("Hos Delayed Chunk");
+
+ environment.execute(configuration.get(Configs.FLINK_JOB_NAME));
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java
new file mode 100644
index 0000000..6d79bb8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/config/Configs.java
@@ -0,0 +1,96 @@
+package com.zdjizhi.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class Configs {
+ public static final ConfigOption<String> FLINK_JOB_NAME = ConfigOptions.key("flink.job.name")
+ .stringType()
+ .defaultValue("FILE-CHUNK-COMBINER")
+ .withDescription("The name of the job.");
+
+ public static final ConfigOption<Integer> SOURCE_KAFKA_PARALLELISM = ConfigOptions.key("source.kafka.parallelism")
+ .intType()
+ .defaultValue(1);
+ public static final ConfigOption<String> KAFKA_BROKER = ConfigOptions.key("source.kafka.broker")
+ .stringType()
+ .noDefaultValue();
+ public static final ConfigOption<String> KAFKA_GROUP_ID = ConfigOptions.key("source.kafka.group.id")
+ .stringType()
+ .defaultValue("test1");
+ public static final ConfigOption<String> KAFKA_TOPIC = ConfigOptions.key("source.kafka.topic")
+ .stringType()
+ .noDefaultValue();
+ public static final ConfigOption<Boolean> KAFKA_ENABLE_AUTO_COMMIT = ConfigOptions.key("source.kafka.enable.auto.commit")
+ .booleanType()
+ .defaultValue(true);
+ public static final ConfigOption<String> KAFKA_AUTO_OFFSET_RESET = ConfigOptions.key("source.kafka.auto.offset.reset")
+ .stringType()
+ .defaultValue("latest");
+ public static final ConfigOption<String> KAFKA_SESSION_TIMEOUT_MS = ConfigOptions.key("source.kafka.session.timeout.ms")
+ .stringType()
+ .defaultValue("60000");
+ public static final ConfigOption<String> KAFKA_MAX_POLL_RECORDS = ConfigOptions.key("source.kafka.max.poll.records")
+ .stringType()
+ .defaultValue("1000");
+ public static final ConfigOption<String> KAFKA_MAX_PARTITION_FETCH_BYTES = ConfigOptions.key("source.kafka.max.partition.fetch.bytes")
+ .stringType()
+ .defaultValue("31457280");
+ public static final ConfigOption<String> KAFKA_USER = ConfigOptions.key("source.kafka.user")
+ .stringType()
+ .defaultValue("admin");
+ public static final ConfigOption<String> KAFKA_PIN = ConfigOptions.key("source.kafka.pin")
+ .stringType()
+ .defaultValue("galaxy2019");
+ public static final ConfigOption<String> KAFKA_TOOLS_LIBRARY = ConfigOptions.key("source.kafka.tools.library")
+ .stringType()
+ .noDefaultValue();
+
+ public static final ConfigOption<Integer> PARSE_MESSAGE_PACK_PARALLELISM = ConfigOptions.key("parse.message.pack.parallelism")
+ .intType()
+ .defaultValue(1);
+ public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism")
+ .intType()
+ .defaultValue(1);
+ public static final ConfigOption<Long> COMBINER_WINDOW_TIME = ConfigOptions.key("combiner.window.time")
+ .longType()
+ .defaultValue(5L);
+ public static final ConfigOption<Long> COMBINER_WINDOW_IDLE_TIME = ConfigOptions.key("combiner.window.idle.time")
+ .longType()
+ .defaultValue(5L);
+ public static final ConfigOption<Long> COMBINER_WINDOW_KEY_MAX_CHUNK = ConfigOptions.key("combiner.window.key.max.chunk")
+ .longType()
+ .defaultValue(5L);
+
+ public static final ConfigOption<Integer> SINK_HOS_PARALLELISM = ConfigOptions.key("sink.hos.parallelism")
+ .intType()
+ .defaultValue(1);
+ public static final ConfigOption<String> SINK_HOS_ENDPOINT = ConfigOptions.key("sink.hos.endpoint")
+ .stringType()
+ .noDefaultValue();
+ public static final ConfigOption<String> SINK_HOS_BUCKET = ConfigOptions.key("sink.hos.bucket")
+ .stringType()
+ .noDefaultValue();
+ public static final ConfigOption<String> SINK_HOS_TOKEN = ConfigOptions.key("sink.hos.token")
+ .stringType()
+ .noDefaultValue();
+ public static final ConfigOption<Integer> SINK_HOS_HTTP_MAX_TOTAL = ConfigOptions.key("sink.hos.http.max.total")
+ .intType()
+ .defaultValue(2000);
+ public static final ConfigOption<Integer> SINK_HOS_HTTP_MAX_PER_ROUTE = ConfigOptions.key("sink.hos.http.max.per.route")
+ .intType()
+ .defaultValue(1000);
+ public static final ConfigOption<Integer> SINK_HOS_HTTP_ERROR_RETRY = ConfigOptions.key("sink.hos.http.error.retry")
+ .intType()
+ .defaultValue(3);
+ public static final ConfigOption<Integer> SINK_HOS_HTTP_CONNECT_TIMEOUT = ConfigOptions.key("sink.hos.http.connect.timeout")
+ .intType()
+ .defaultValue(10000);
+ public static final ConfigOption<Integer> SINK_HOS_HTTP_REQUEST_TIMEOUT = ConfigOptions.key("sink.hos.http.request.timeout")
+ .intType()
+ .defaultValue(10000);
+ public static final ConfigOption<Integer> SINK_HOS_HTTP_SOCKET_TIMEOUT = ConfigOptions.key("sink.hos.http.socket.timeout")
+ .intType()
+ .defaultValue(60000);
+
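+ // Example common.properties entries (illustrative values, not defaults):
+ //   source.kafka.broker=kafka1:9094,kafka2:9094
+ //   source.kafka.topic=file-chunk
+ //   combiner.window.time=5
+ //   combiner.window.idle.time=5
+ //   sink.hos.endpoint=http://hos.example:8086
+ //   sink.hos.bucket=combined-files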
+}
diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
new file mode 100644
index 0000000..8f0f40d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
@@ -0,0 +1,44 @@
+package com.zdjizhi.function;
+
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.utils.PublicUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
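+/**
+ * Merges all chunks collected for one key within the window via
+ * PublicUtil.combine and emits the combined chunks downstream, updating the
+ * duplicate/error/seek/append counters as it goes.
+ */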
+public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
+
+ private transient Counter duplicateChunkCounter;
+ private transient Counter combineErrorCounter;
+ private transient Counter seekChunkCounter;
+ private transient Counter appendChunkCounter;
+ private final Configuration configuration;
+
+ public CombineChunkProcessWindowFunction(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ duplicateChunkCounter = metricGroup.counter("duplicateChunkCount");
+ combineErrorCounter = metricGroup.counter("combineErrorCount");
+ seekChunkCounter = metricGroup.counter("seekChunkCount");
+ appendChunkCounter = metricGroup.counter("appendChunkCount");
+ }
+
+ @Override
+ public void process(String string, Context context, Iterable<FileChunk> elements, Collector<FileChunk> out) throws Exception {
+ List<FileChunk> fileChunks = PublicUtil.combine(elements, configuration.get(Configs.COMBINER_WINDOW_KEY_MAX_CHUNK), duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ for (FileChunk fileChunk : fileChunks) {
+ out.collect(fileChunk);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/function/FileChunkKeySelector.java b/src/main/java/com/zdjizhi/function/FileChunkKeySelector.java
new file mode 100644
index 0000000..6634c93
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/FileChunkKeySelector.java
@@ -0,0 +1,11 @@
+package com.zdjizhi.function;
+
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class FileChunkKeySelector implements KeySelector<FileChunk, String> {
+ @Override
+ public String getKey(FileChunk value) {
+ return value.getUuid();
+ }
+}
diff --git a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
new file mode 100644
index 0000000..e686006
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
@@ -0,0 +1,79 @@
+package com.zdjizhi.function;
+
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChunk> {
+ private static final Log LOG = LogFactory.get();
+
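+ // Returns null when a record cannot be decoded; the pipeline filters nulls
+ // out right after this map (Objects::nonNull in FileChunkCombiner).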
+ @Override
+ public FileChunk map(byte[] messagePackData) {
+ FileChunk fileChunk;
+ try {
+ fileChunk = new FileChunk();
+ MessageUnpacker messageUnpacker = MessagePack.newDefaultUnpacker(messagePackData);
+ int numFields = messageUnpacker.unpackMapHeader();
+ Map<String, Object> metaMap = new HashMap<>();
+ for (int i = 0; i < numFields; i++) {
+ String fieldName = messageUnpacker.unpackString();
+ switch (fieldName) {
+ case "uuid":
+ fileChunk.setUuid(messageUnpacker.unpackString());
+ break;
+ case "fileName":
+ fileChunk.setFileName(messageUnpacker.unpackString());
+ break;
+ case "fileType":
+ fileChunk.setFileType(messageUnpacker.unpackString());
+ break;
+ case "combineMode":
+ fileChunk.setCombineMode(messageUnpacker.unpackString());
+ break;
+ case "offset":
+ fileChunk.setOffset(messageUnpacker.unpackLong());
+ break;
+ case "length":
+ fileChunk.setLength(messageUnpacker.unpackLong());
+ break;
+ case "lastChunkFlag":
+ fileChunk.setLastChunkFlag(messageUnpacker.unpackInt());
+ break;
+ case "chunk":
+ fileChunk.setChunk(messageUnpacker.readPayload(messageUnpacker.unpackRawStringHeader()));
+ break;
+ case "timestamp":
+ fileChunk.setTimestamp(messageUnpacker.unpackLong());
+ break;
+ case "meta":
+ String meta = messageUnpacker.unpackString();
+ JSONObject metaJsonObject = JSONUtil.parseObj(meta);
+ for (String key : metaJsonObject.keySet()) {
+ metaMap.put(key, metaJsonObject.get(key));
+ }
+ fileChunk.setMeta(metaMap);
+ break;
+ default:
+ messageUnpacker.skipValue();
+ break;
+ }
+ }
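+ // In append mode the last-chunk flag is cleared so the window's idle-time
+ // trigger decides when to flush (assumed intent).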
+ if ("append".equals(fileChunk.getCombineMode())) {
+ fileChunk.setLastChunkFlag(0);
+ }
+ } catch (Exception e) {
+ LOG.error("Parse messagePack failed.", e);
+ fileChunk = null;
+ }
+ return fileChunk;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/function/SideOutputMapFunction.java b/src/main/java/com/zdjizhi/function/SideOutputMapFunction.java
new file mode 100644
index 0000000..e948d8a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/SideOutputMapFunction.java
@@ -0,0 +1,34 @@
+package com.zdjizhi.function;
+
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+public class SideOutputMapFunction extends RichMapFunction<FileChunk, FileChunk> {
+
+ private transient Counter pcapDelayedChunkCounter;
+ private transient Counter trafficDelayedChunkCounter;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ pcapDelayedChunkCounter = metricGroup.counter("pcapDelayedChunkCount");
+ trafficDelayedChunkCounter = metricGroup.counter("trafficDelayedChunkCount");
+ }
+
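+ // Late chunks are forwarded one by one: "seek"-mode chunks only bump the
+ // traffic counter, anything else is tagged with a timestamp-length marker
+ // and counted as a delayed pcap chunk.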
+ @Override
+ public FileChunk map(FileChunk fileChunk) {
+ fileChunk.setChunkCount(1);
+ if ("seek".equals(fileChunk.getCombineMode())) {
+ trafficDelayedChunkCounter.inc();
+ } else {
+ fileChunk.setChunkNumbers(fileChunk.getTimestamp() + "-" + fileChunk.getChunk().length + ";");
+ pcapDelayedChunkCounter.inc();
+ }
+ return fileChunk;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/kafka/ByteArrayDeserializationSchema.java b/src/main/java/com/zdjizhi/kafka/ByteArrayDeserializationSchema.java
new file mode 100644
index 0000000..be4baa0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/kafka/ByteArrayDeserializationSchema.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
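+/** Pass-through schema: hands raw Kafka record bytes to downstream operators unchanged. */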
+public class ByteArrayDeserializationSchema implements DeserializationSchema<byte[]> {
+ @Override
+ public byte[] deserialize(byte[] message) {
+ return message;
+ }
+
+ @Override
+ public boolean isEndOfStream(byte[] nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<byte[]> getProducedType() {
+ return TypeInformation.of(byte[].class);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..df1fd32
--- /dev/null
+++ b/src/main/java/com/zdjizhi/kafka/KafkaConsumer.java
@@ -0,0 +1,47 @@
+package com.zdjizhi.kafka;
+
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.utils.KafkaCertUtil;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.*;
+
+public abstract class KafkaConsumer extends ByteArrayDeserializationSchema {
+
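+ // Builds consumer properties from the job configuration; SASL/SSL settings
+ // are added by KafkaCertUtil based on the broker port.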
+ private static Properties createConsumerConfig(Configuration configuration) {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", configuration.get(Configs.KAFKA_BROKER));
+ properties.put("group.id", configuration.get(Configs.KAFKA_GROUP_ID));
+ properties.put("session.timeout.ms", configuration.get(Configs.KAFKA_SESSION_TIMEOUT_MS));
+ properties.put("max.poll.records", configuration.get(Configs.KAFKA_MAX_POLL_RECORDS));
+ properties.put("max.partition.fetch.bytes", configuration.get(Configs.KAFKA_MAX_PARTITION_FETCH_BYTES));
+ properties.put("partition.discovery.interval.ms", "10000");
+ properties.put("auto.offset.reset", configuration.get(Configs.KAFKA_AUTO_OFFSET_RESET));
+ properties.put("enable.auto.commit", configuration.get(Configs.KAFKA_ENABLE_AUTO_COMMIT));
+ KafkaCertUtil.chooseCert(properties, configuration);
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer<String> stringConsumer(Configuration configuration) {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(configuration.get(Configs.KAFKA_TOPIC),
+ new SimpleStringSchema(), createConsumerConfig(configuration));
+ // Commit offsets to Kafka as checkpoints complete
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ // Start consuming from the consumer group's current offsets
+ kafkaConsumer.setStartFromGroupOffsets();
+ return kafkaConsumer;
+ }
+
+ public static FlinkKafkaConsumer<byte[]> byteArrayConsumer(Configuration configuration) {
+ FlinkKafkaConsumer<byte[]> kafkaConsumer = new FlinkKafkaConsumer<>(configuration.get(Configs.KAFKA_TOPIC),
+ new ByteArrayDeserializationSchema(), createConsumerConfig(configuration));
+ // Commit offsets to Kafka as checkpoints complete
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ // Start consuming from the consumer group's current offsets
+ kafkaConsumer.setStartFromGroupOffsets();
+ return kafkaConsumer;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/pojo/FileChunk.java b/src/main/java/com/zdjizhi/pojo/FileChunk.java
new file mode 100644
index 0000000..99076f8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/pojo/FileChunk.java
@@ -0,0 +1,159 @@
+package com.zdjizhi.pojo;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+public class FileChunk implements Serializable {
+ private String uuid;
+ private String fileName;
+ private String fileType;
+ private long offset;
+ private long length;
+ private byte[] chunk;
+ private String combineMode;
+ private int lastChunkFlag;
+ private int chunkCount;
+ private long timestamp;
+ private Map<String, Object> meta;
+ private String chunkNumbers;
+
+ public FileChunk() {
+ }
+
+ public String getChunkNumbers() {
+ return chunkNumbers;
+ }
+
+ public void setChunkNumbers(String chunkNumbers) {
+ this.chunkNumbers = chunkNumbers;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public Map<String, Object> getMeta() {
+ return meta;
+ }
+
+ public void setMeta(Map<String, Object> meta) {
+ this.meta = meta;
+ }
+
+ public int getChunkCount() {
+ return chunkCount;
+ }
+
+ public void setChunkCount(int chunkCount) {
+ this.chunkCount = chunkCount;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public String getFileType() {
+ return fileType;
+ }
+
+ public void setFileType(String fileType) {
+ this.fileType = fileType;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public void setLength(long length) {
+ this.length = length;
+ }
+
+ public byte[] getChunk() {
+ return chunk;
+ }
+
+ public void setChunk(byte[] chunk) {
+ this.chunk = chunk;
+ }
+
+ public String getCombineMode() {
+ return combineMode;
+ }
+
+ public void setCombineMode(String combineMode) {
+ this.combineMode = combineMode;
+ }
+
+ public int getLastChunkFlag() {
+ return lastChunkFlag;
+ }
+
+ public void setLastChunkFlag(int lastChunkFlag) {
+ this.lastChunkFlag = lastChunkFlag;
+ }
+
+ @Override
+ public String toString() {
+ return "FileChunk{" +
+ "uuid='" + uuid + '\'' +
+ ", fileName='" + fileName + '\'' +
+ ", fileType='" + fileType + '\'' +
+ ", offset=" + offset +
+ ", length=" + length +
+ ", combineMode='" + combineMode + '\'' +
+ ", lastChunkFlag=" + lastChunkFlag +
+ ", chunkCount=" + chunkCount +
+ ", timestamp=" + timestamp +
+ ", meta=" + meta +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FileChunk fileChunk = (FileChunk) o;
+ return offset == fileChunk.offset &&
+ length == fileChunk.length &&
+ lastChunkFlag == fileChunk.lastChunkFlag &&
+ chunkCount == fileChunk.chunkCount &&
+ Objects.equals(uuid, fileChunk.uuid) &&
+ Objects.equals(fileName, fileChunk.fileName) &&
+ Objects.equals(fileType, fileChunk.fileType) &&
+ Arrays.equals(chunk, fileChunk.chunk) &&
+ Objects.equals(combineMode, fileChunk.combineMode);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(uuid, fileName, fileType, offset, length, combineMode, lastChunkFlag, chunkCount);
+ result = 31 * result + Arrays.hashCode(chunk);
+ return result;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java
new file mode 100644
index 0000000..f49a43b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/sink/HosSink.java
@@ -0,0 +1,39 @@
+package com.zdjizhi.sink;
+
+import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.utils.HttpClientUtil;
+import com.zdjizhi.utils.PublicUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.io.IOException;
+
+public class HosSink extends RichSinkFunction<FileChunk> {
+
+ private final Configuration configuration;
+ private transient Counter sendHosErrorCounter;
+
+ public HosSink(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ sendHosErrorCounter = metricGroup.counter("sendHosErrorCount");
+ }
+
+ @Override
+ public void invoke(FileChunk fileChunk, Context context) {
+ PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter);
+ }
+
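+ // The HttpClientUtil singleton is expected to have been created with a real
+ // configuration by PublicUtil.sendToHos() before close() runs;
+ // getInstance(null) only retrieves that existing instance here.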
+ @Override
+ public void close() throws IOException {
+ HttpClientUtil.getInstance(null).close();
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java b/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java
new file mode 100644
index 0000000..2605c88
--- /dev/null
+++ b/src/main/java/com/zdjizhi/trigger/LastChunkOrNoDataInTimeTrigger.java
@@ -0,0 +1,72 @@
+package com.zdjizhi.trigger;
+
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
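+/**
+ * Fires when a chunk flagged as the last one arrives, or when no new chunk for
+ * the key has arrived within maxIdleTime milliseconds of processing time.
+ */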
+public class LastChunkOrNoDataInTimeTrigger<W extends TimeWindow> extends Trigger<Object, TimeWindow> {
+ private static final long serialVersionUID = 1L;
+
+ private final long maxIdleTime;
+
+ private LastChunkOrNoDataInTimeTrigger(long maxIdleTime) {
+ this.maxIdleTime = maxIdleTime;
+ }
+
+ public static <W extends TimeWindow> LastChunkOrNoDataInTimeTrigger<TimeWindow> of(long maxIdleTime) {
+ return new LastChunkOrNoDataInTimeTrigger<>(maxIdleTime);
+ }
+
+ private final ReducingStateDescriptor<Long> processingTimeStateDesc =
+ new ReducingStateDescriptor<>("processTimer", new ReduceMax(), LongSerializer.INSTANCE);
+
+ @Override
+ public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
+ if (((FileChunk) element).getLastChunkFlag() == 1) {
+ return TriggerResult.FIRE;
+ } else {
+ ReducingState<Long> fireTimestamp = ctx.getPartitionedState(processingTimeStateDesc);
+ fireTimestamp.clear();
+ long nextFireTimestamp = ctx.getCurrentProcessingTime() + maxIdleTime;
+ ctx.registerProcessingTimeTimer(nextFireTimestamp);
+ fireTimestamp.add(nextFireTimestamp);
+ return TriggerResult.CONTINUE;
+ }
+
+ }
+
+ @Override
+ public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
+ ReducingState<Long> fireTimestamp = ctx.getPartitionedState(processingTimeStateDesc);
+ if (fireTimestamp.get() != null && fireTimestamp.get() == time) {
+ fireTimestamp.clear();
+ return TriggerResult.FIRE;
+ }
+ return TriggerResult.CONTINUE;
+ }
+
+ @Override
+ public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
+ return TriggerResult.CONTINUE;
+ }
+
+ @Override
+ public void clear(TimeWindow window, TriggerContext ctx) {
+ ReducingState<Long> fireTimestamp = ctx.getPartitionedState(processingTimeStateDesc);
+ fireTimestamp.clear();
+ }
+
+ private static class ReduceMax implements ReduceFunction<Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long reduce(Long value1, Long value2) {
+ return Math.max(value1, value2);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java b/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java
new file mode 100644
index 0000000..22eefe8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/trigger/MultipleTrigger.java
@@ -0,0 +1,67 @@
+package com.zdjizhi.trigger;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.List;
+
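+/**
+ * Combines several triggers: the first child trigger to return FIRE makes the
+ * window FIRE_AND_PURGE; otherwise the window continues.
+ */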
+public class MultipleTrigger<T, W extends Window> extends Trigger<T, W> {
+ private static final long serialVersionUID = 1L;
+
+ private final List<Trigger<T, W>> triggers;
+
+ private MultipleTrigger(List<Trigger<T, W>> triggers) {
+ this.triggers = triggers;
+ }
+
+ public static <T, W extends Window> MultipleTrigger<T, W> of(List<Trigger<T, W>> triggers) {
+ return new MultipleTrigger<>(triggers);
+ }
+
+ @Override
+ public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
+ TriggerResult result = TriggerResult.CONTINUE;
+ for (Trigger<T, W> trigger : triggers) {
+ TriggerResult triggerResult = trigger.onElement(element, timestamp, window, ctx);
+ if (triggerResult == TriggerResult.FIRE) {
+ result = TriggerResult.FIRE_AND_PURGE;
+ break;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
+ TriggerResult result = TriggerResult.CONTINUE;
+ for (Trigger<T, W> trigger : triggers) {
+ TriggerResult triggerResult = trigger.onProcessingTime(time, window, ctx);
+ if (triggerResult == TriggerResult.FIRE) {
+ result = TriggerResult.FIRE_AND_PURGE;
+ break;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
+ TriggerResult result = TriggerResult.CONTINUE;
+ for (Trigger<T, W> trigger : triggers) {
+ TriggerResult triggerResult = trigger.onEventTime(time, window, ctx);
+ if (triggerResult == TriggerResult.FIRE) {
+ result = TriggerResult.FIRE_AND_PURGE;
+ break;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void clear(W window, TriggerContext ctx) throws Exception {
+ for (Trigger<T, W> trigger : triggers) {
+ trigger.clear(window, ctx);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/ContentTypeUtil.java b/src/main/java/com/zdjizhi/utils/ContentTypeUtil.java
new file mode 100644
index 0000000..2b1747b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ContentTypeUtil.java
@@ -0,0 +1,368 @@
+package com.zdjizhi.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ContentTypeUtil {
+
+ private static final Map<String, String> map = new HashMap<>();
+
+ static {
+ map.put("anno", "application/octet-stream");
+ map.put("0.001", "application/x-001");
+ map.put("0.301", "application/x-301");
+ map.put("0.323", "text/h323");
+ map.put("0.906", "application/x-906");
+ map.put("0.907", "drawing/907");
+ map.put("a11", "application/x-a11");
+ map.put("acp", "audio/x-mei-aac");
+ map.put("ai", "application/postscript");
+ map.put("aif", "audio/aiff");
+ map.put("aifc", "audio/aiff");
+ map.put("aiff", "audio/aiff");
+ map.put("anv", "application/x-anv");
+ map.put("asa", "text/asa");
+ map.put("asf", "video/x-ms-asf");
+ map.put("asp", "text/asp");
+ map.put("asx", "video/x-ms-asf");
+ map.put("au", "audio/basic");
+ map.put("avi", "video/avi");
+ map.put("awf", "application/vnd.adobe.workflow");
+ map.put("biz", "text/xml");
+ map.put("bmp", "application/x-bmp");
+ map.put("bot", "application/x-bot");
+ map.put("c4t", "application/x-c4t");
+ map.put("c90", "application/x-c90");
+ map.put("cal", "application/x-cals");
+ map.put("cat", "application/vnd.ms-pki.seccat");
+ map.put("cdf", "application/x-netcdf");
+ map.put("cdr", "application/x-cdr");
+ map.put("cel", "application/x-cel");
+ map.put("cer", "application/x-x509-ca-cert");
+ map.put("cg4", "application/x-g4");
+ map.put("cgm", "application/x-cgm");
+ map.put("cit", "application/x-cit");
+ map.put("class", "java/");
+ map.put("cml", "text/xml");
+ map.put("cmp", "application/x-cmp");
+ map.put("cmx", "application/x-cmx");
+ map.put("cot", "application/x-cot");
+ map.put("crl", "application/pkix-crl");
+ map.put("crt", "application/x-x509-ca-cert");
+ map.put("csi", "application/x-csi");
+ map.put("css", "text/css");
+ map.put("cut", "application/x-cut");
+ map.put("dbf", "application/x-dbf");
+ map.put("dbm", "application/x-dbm");
+ map.put("dbx", "application/x-dbx");
+ map.put("dcd", "text/xml");
+ map.put("dcx", "application/x-dcx");
+ map.put("der", "application/x-x509-ca-cert");
+ map.put("dgn", "application/x-dgn");
+ map.put("dib", "application/x-dib");
+ map.put("dll", "application/x-msdownload");
+ map.put("doc", "application/msword");
+ map.put("docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document");
+ map.put("dot", "application/msword");
+ map.put("drw", "application/x-drw");
+ map.put("dtd", "text/xml");
+ map.put("dwf", "Model/vnd.dwf");
+ //map.put("dwf","application/x-dwf");
+ map.put("dwg", "application/x-dwg");
+ map.put("dxb", "application/x-dxb");
+ map.put("dxf", "application/x-dxf");
+ map.put("edn", "application/vnd.adobe.edn");
+ map.put("emf", "application/x-emf");
+ map.put("eml", "message/rfc822");
+ map.put("ent", "text/xml");
+ map.put("epi", "application/x-epi");
+ // map.put("eps","application/x-ps");
+ map.put("eps", "application/postscript");
+ map.put("etd", "application/x-ebx");
+ map.put("exe", "application/x-msdownload");
+ map.put("fax", "image/fax");
+ map.put("fdf", "application/vnd.fdf");
+ map.put("fif", "application/fractals");
+ map.put("fo", "text/xml");
+ map.put("frm", "application/x-frm");
+ map.put("g4", "application/x-g4");
+ map.put("gbr", "application/x-gbr");
+ map.put("", "application/x-");
+ map.put("gif", "image/gif");
+ map.put("gl2", "application/x-gl2");
+ map.put("gp4", "application/x-gp4");
+ map.put("hgl", "application/x-hgl");
+ map.put("hmr", "application/x-hmr");
+ map.put("hpg", "application/x-hpgl");
+ map.put("hpl", "application/x-hpl");
+ map.put("hqx", "application/mac-binhex40");
+ map.put("hrf", "application/x-hrf");
+ map.put("hta", "application/hta");
+ map.put("htc", "text/x-component");
+ map.put("htm", "text/html");
+ map.put("html", "text/html");
+ map.put("htt", "text/webviewhtml");
+ map.put("htx", "text/html");
+ map.put("icb", "application/x-icb");
+ map.put("ico", "image/x-icon");
+ //map.put("ico","application/x-ico");
+ map.put("iff", "application/x-iff");
+ map.put("ig4", "application/x-g4");
+ map.put("igs", "application/x-igs");
+ map.put("iii", "application/x-iphone");
+ map.put("img", "application/x-img");
+ map.put("ins", "application/x-internet-signup");
+ map.put("isp", "application/x-internet-signup");
+ map.put("IVF", "video/x-ivf");
+ map.put("java", "java/*");
+ map.put("jfif", "image/jpeg");
+ map.put("jpe", "image/jpeg");
+// map.put("jpe", "application/x-jpe");
+ map.put("jpeg", "image/jpeg");
+ map.put("jpg", "image/jpeg");
+ //map.put("jpg","application/x-jpg");
+ map.put("js", "application/x-javascript");
+ map.put("jsp", "text/html");
+ map.put("la1", "audio/x-liquid-file");
+ map.put("lar", "application/x-laplayer-reg");
+ map.put("latex", "application/x-latex");
+ map.put("lavs", "audio/x-liquid-secure");
+ map.put("lbm", "application/x-lbm");
+ map.put("lmsff", "audio/x-la-lms");
+ map.put("ls", "application/x-javascript");
+ map.put("ltr", "application/x-ltr");
+ map.put("m1v", "video/x-mpeg");
+ map.put("m2v", "video/x-mpeg");
+ map.put("m3u", "audio/mpegurl");
+ map.put("m4e", "video/mpeg4");
+ map.put("mac", "application/x-mac");
+ map.put("man", "application/x-troff-man");
+ map.put("math", "text/xml");
+ //map.put("mdb","application/msaccess");
+ map.put("mdb", "application/x-mdb");
+ map.put("mfp", "application/x-shockwave-flash");
+ map.put("mht", "message/rfc822");
+ map.put("mhtml", "message/rfc822");
+ map.put("mi", "application/x-mi");
+ map.put("mid", "audio/mid");
+ map.put("midi", "audio/mid");
+ map.put("mil", "application/x-mil");
+ map.put("mml", "text/xml");
+ map.put("mnd", "audio/x-musicnet-download");
+ map.put("mns", "audio/x-musicnet-stream");
+ map.put("mocha", "application/x-javascript");
+ map.put("movie", "video/x-sgi-movie");
+ map.put("mp1", "audio/mp1");
+ map.put("mp2", "audio/mp2");
+ map.put("mp2v", "video/mpeg");
+ map.put("mp3", "audio/mp3");
+ map.put("mp4", "video/mpeg4");
+ map.put("mpa", "video/x-mpg");
+ map.put("mpd", "application/vnd.ms-project");
+ map.put("mpe", "video/x-mpeg");
+ map.put("mpeg", "video/mpg");
+ map.put("mpg", "video/mpg");
+ map.put("mpga", "audio/rn-mpeg");
+ map.put("mpp", "application/vnd.ms-project");
+ map.put("mps", "video/x-mpeg");
+ map.put("mpt", "application/vnd.ms-project");
+ map.put("mpv", "video/mpg");
+ map.put("mpv2", "video/mpeg");
+ map.put("mpw", "application/vnd.ms-project");
+ map.put("mpx", "application/vnd.ms-project");
+ map.put("mtx", "text/xml");
+ map.put("mxp", "application/x-mmxp");
+ map.put("net", "image/pnetvue");
+ map.put("nrf", "application/x-nrf");
+ map.put("nws", "message/rfc822");
+ map.put("odc", "text/x-ms-odc");
+ map.put("out", "application/x-out");
+ map.put("p10", "application/pkcs10");
+ map.put("p12", "application/x-pkcs12");
+ map.put("p7b", "application/x-pkcs7-certificates");
+ map.put("p7c", "application/pkcs7-mime");
+ map.put("p7m", "application/pkcs7-mime");
+ map.put("p7r", "application/x-pkcs7-certreqresp");
+ map.put("p7s", "application/pkcs7-signature");
+ map.put("pc5", "application/x-pc5");
+ map.put("pci", "application/x-pci");
+ map.put("pcl", "application/x-pcl");
+ map.put("pcx", "application/x-pcx");
+ map.put("pdf", "application/pdf");
+ map.put("pdx", "application/vnd.adobe.pdx");
+ map.put("pfx", "application/x-pkcs12");
+ map.put("pgl", "application/x-pgl");
+ map.put("pic", "application/x-pic");
+ map.put("pko", "application/vnd.ms-pki.pko");
+ map.put("pl", "application/x-perl");
+ map.put("plg", "text/html");
+ map.put("pls", "audio/scpls");
+ map.put("plt", "application/x-plt");
+ map.put("png", "image/png");
+ // map.put("png","application/x-png");
+ map.put("pot", "application/vnd.ms-powerpoint");
+ map.put("ppa", "application/vnd.ms-powerpoint");
+ map.put("ppm", "application/x-ppm");
+ map.put("pps", "application/vnd.ms-powerpoint");
+ map.put("ppt", "application/vnd.ms-powerpoint");
+ // map.put("ppt","application/x-ppt");
+ map.put("pr", "application/x-pr");
+ map.put("prf", "application/pics-rules");
+ map.put("prn", "application/x-prn");
+ map.put("prt", "application/x-prt");
+ map.put("ps", "application/x-ps");
+// map.put("ps", "application/postscript");
+ map.put("ptn", "application/x-ptn");
+ map.put("pwz", "application/vnd.ms-powerpoint");
+ map.put("r3t", "text/vnd.rn-realtext3d");
+ map.put("ra", "audio/vnd.rn-realaudio");
+ map.put("ram", "audio/x-pn-realaudio");
+ map.put("ras", "application/x-ras");
+ map.put("rat", "application/rat-file");
+ map.put("rdf", "text/xml");
+ map.put("rec", "application/vnd.rn-recording");
+ map.put("red", "application/x-red");
+ map.put("rgb", "application/x-rgb");
+ map.put("rjs", "application/vnd.rn-realsystem-rjs");
+ map.put("rjt", "application/vnd.rn-realsystem-rjt");
+ map.put("rlc", "application/x-rlc");
+ map.put("rle", "application/x-rle");
+ map.put("rm", "application/vnd.rn-realmedia");
+ map.put("rmf", "application/vnd.adobe.rmf");
+ map.put("rmi", "audio/mid");
+ map.put("rmj", "application/vnd.rn-realsystem-rmj");
+ map.put("rmm", "audio/x-pn-realaudio");
+ map.put("rmp", "application/vnd.rn-rn_music_package");
+ map.put("rms", "application/vnd.rn-realmedia-secure");
+ map.put("rmvb", "application/vnd.rn-realmedia-vbr");
+ map.put("rmx", "application/vnd.rn-realsystem-rmx");
+ map.put("rnx", "application/vnd.rn-realplayer");
+ map.put("rp", "image/vnd.rn-realpix");
+ map.put("rpm", "audio/x-pn-realaudio-plugin");
+ map.put("rsml", "application/vnd.rn-rsml");
+ map.put("rt", "text/vnd.rn-realtext");
+ map.put("rtf", "application/msword");
+ //map.put("rtf","application/x-rtf");
+ map.put("rv", "video/vnd.rn-realvideo");
+ map.put("sam", "application/x-sam");
+ map.put("sat", "application/x-sat");
+ map.put("sdp", "application/sdp");
+ map.put("sdw", "application/x-sdw");
+ map.put("sit", "application/x-stuffit");
+ map.put("slb", "application/x-slb");
+ map.put("sld", "application/x-sld");
+ map.put("slk", "drawing/x-slk");
+ map.put("smi", "application/smil");
+ map.put("smil", "application/smil");
+ map.put("smk", "application/x-smk");
+ map.put("snd", "audio/basic");
+ map.put("sol", "text/plain");
+ map.put("sor", "text/plain");
+ map.put("spc", "application/x-pkcs7-certificates");
+ map.put("spl", "application/futuresplash");
+ map.put("spp", "text/xml");
+ map.put("ssm", "application/streamingmedia");
+ map.put("sst", "application/vnd.ms-pki.certstore");
+ map.put("stl", "application/vnd.ms-pki.stl");
+ map.put("stm", "text/html");
+ map.put("sty", "application/x-sty");
+ map.put("svg", "text/xml");
+ map.put("swf", "application/x-shockwave-flash");
+ map.put("tdf", "application/x-tdf");
+ map.put("tg4", "application/x-tg4");
+ map.put("tga", "application/x-tga");
+ map.put("tif", "image/tiff");
+// map.put("tif", "application/x-tif");
+ map.put("tld", "text/xml");
+ map.put("top", "drawing/x-top");
+ map.put("torrent", "application/x-bittorrent");
+ map.put("tsd", "text/xml");
+ map.put("txt", "text/plain");
+ map.put("uin", "application/x-icq");
+ map.put("uls", "text/iuls");
+ map.put("vcf", "text/x-vcard");
+ map.put("vda", "application/x-vda");
+ map.put("vdx", "application/vnd.visio");
+ map.put("vml", "text/xml");
+ map.put("vpg", "application/x-vpeg005");
+ map.put("vsd", "application/vnd.visio");
+// map.put("vsd", "application/x-vsd");
+ map.put("vss", "application/vnd.visio");
+ map.put("vst", "application/vnd.visio");
+// map.put("vst", "application/x-vst");
+ map.put("vsw", "application/vnd.visio");
+ map.put("vsx", "application/vnd.visio");
+ map.put("vtx", "application/vnd.visio");
+ map.put("vxml", "text/xml");
+ map.put("wav", "audio/wav");
+ map.put("wax", "audio/x-ms-wax");
+ map.put("wb1", "application/x-wb1");
+ map.put("wb2", "application/x-wb2");
+ map.put("wb3", "application/x-wb3");
+ map.put("wbmp", "image/vnd.wap.wbmp");
+ map.put("wiz", "application/msword");
+ map.put("wk3", "application/x-wk3");
+ map.put("wk4", "application/x-wk4");
+ map.put("wkq", "application/x-wkq");
+ map.put("wks", "application/x-wks");
+ map.put("wm", "video/x-ms-wm");
+ map.put("wma", "audio/x-ms-wma");
+ map.put("wmd", "application/x-ms-wmd");
+ map.put("wmf", "application/x-wmf");
+ map.put("wml", "text/vnd.wap.wml");
+ map.put("wmv", "video/x-ms-wmv");
+ map.put("wmx", "video/x-ms-wmx");
+ map.put("wmz", "application/x-ms-wmz");
+ map.put("wp6", "application/x-wp6");
+ map.put("wpd", "application/x-wpd");
+ map.put("wpg", "application/x-wpg");
+ map.put("wpl", "application/vnd.ms-wpl");
+ map.put("wq1", "application/x-wq1");
+ map.put("wr1", "application/x-wr1");
+ map.put("wri", "application/x-wri");
+ map.put("wrk", "application/x-wrk");
+ map.put("ws", "application/x-ws");
+ map.put("ws2", "application/x-ws");
+ map.put("wsc", "text/scriptlet");
+ map.put("wsdl", "text/xml");
+ map.put("wvx", "video/x-ms-wvx");
+ map.put("xdp", "application/vnd.adobe.xdp");
+ map.put("xdr", "text/xml");
+ map.put("xfd", "application/vnd.adobe.xfd");
+ map.put("xfdf", "application/vnd.adobe.xfdf");
+ map.put("xhtml", "text/html");
+ map.put("xls", "application/vnd.ms-excel");
+ // map.put("xls","application/x-xls");
+ map.put("xlw", "application/x-xlw");
+ map.put("xml", "text/xml");
+ map.put("xpl", "audio/scpls");
+ map.put("xq", "text/xml");
+ map.put("xql", "text/xml");
+ map.put("xquery", "text/xml");
+ map.put("xsd", "text/xml");
+ map.put("xsl", "text/xml");
+ map.put("xslt", "text/xml");
+ map.put("xwd", "application/x-xwd");
+ map.put("x_b", "application/x-x_b");
+ map.put("sis", "application/vnd.symbian.install");
+ map.put("sisx", "application/vnd.symbian.install");
+ map.put("x_t", "application/x-x_t");
+ map.put("ipa", "application/vnd.iphone");
+ map.put("apk", "application/vnd.android.package-archive");
+ map.put("xap", "application/x-silverlight-app");
+ }
+
+
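+ // Resolves a file name's extension to a MIME type, falling back to
+ // application/octet-stream for missing or unknown extensions.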
+ public static String getContentType(String filename) {
+ String contentType = "application/octet-stream";
+ if (filename != null) {
+ if (filename.lastIndexOf(".") != -1 && filename.lastIndexOf(".") != 0) {
+ String fileExt = filename.substring(filename.lastIndexOf(".") + 1);
+ if (map.containsKey(fileExt)) {
+ contentType = map.get(fileExt);
+ }
+ }
+ }
+ return contentType;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
new file mode 100644
index 0000000..37f8975
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
@@ -0,0 +1,148 @@
+package com.zdjizhi.utils;
+
+import com.zdjizhi.config.Configs;
+import org.apache.flink.configuration.Configuration;
+import org.apache.http.Header;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpRequest;
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+import javax.net.ssl.*;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.UnknownHostException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.X509Certificate;
+
+public class HttpClientUtil {
+
+ private static HttpClientUtil httpClientUtil;
+ private final CloseableHttpClient closeableHttpClient;
+ private final Configuration configuration;
+
+ private HttpClientUtil(Configuration configuration) {
+ this.configuration = configuration;
+ closeableHttpClient = HttpClients.custom()
+ // Apply the request timeout settings to the client
+ .setDefaultRequestConfig(getRequestConfig())
+ // Install the retry handler
+ .setRetryHandler(getRetryHandler())
+ // Use the pooled connection manager
+ .setConnectionManager(getSslClientManager())
+ .build();
+ }
+
+ private RequestConfig getRequestConfig() {
+ return RequestConfig.custom()
+ .setConnectTimeout(configuration.get(Configs.SINK_HOS_HTTP_CONNECT_TIMEOUT))
+ .setConnectionRequestTimeout(configuration.get(Configs.SINK_HOS_HTTP_REQUEST_TIMEOUT))
+ .setSocketTimeout(configuration.get(Configs.SINK_HOS_HTTP_SOCKET_TIMEOUT))
+ .build();
+ }
+
+ private HttpRequestRetryHandler getRetryHandler() {
+ return (exception, executionCount, context) -> {
+ if (executionCount >= configuration.get(Configs.SINK_HOS_HTTP_ERROR_RETRY)) {
+ return false;
+ }
+ if (exception instanceof NoHttpResponseException) {// retry if the server dropped the connection
+ return true;
+ }
+ if (exception instanceof SSLHandshakeException) {// do not retry SSL handshake failures
+ return false;
+ }
+ if (exception instanceof ConnectException) {// connection refused
+ return false;
+ }
+ if (exception instanceof InterruptedIOException) {// timeout
+ return true;
+ }
+ if (exception instanceof UnknownHostException) {// unknown / unreachable host
+ return false;
+ }
+ if (exception instanceof SSLException) {// other SSL failure
+ return false;
+ }
+ HttpClientContext clientContext = HttpClientContext.adapt(context);
+ HttpRequest request = clientContext.getRequest();
+ // retry only if the request is idempotent (does not enclose an entity)
+ return !(request instanceof HttpEntityEnclosingRequest);
+ };
+ }
+
+ private PoolingHttpClientConnectionManager getSslClientManager() {
+ PoolingHttpClientConnectionManager connManager;
+ try {
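+ // Trust-all TLS setup: every server certificate is accepted and hostname
+ // verification is disabled (NoopHostnameVerifier), so this should only be
+ // pointed at trusted internal HOS endpoints.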
+ X509TrustManager trustManager = new X509TrustManager() {
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+
+ @Override
+ public void checkClientTrusted(X509Certificate[] xcs, String str) {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] xcs, String str) {
+ }
+ };
+ SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
+ ctx.init(null, new TrustManager[]{trustManager}, null);
+ SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE);
+ Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("http", PlainConnectionSocketFactory.INSTANCE)
+ .register("https", socketFactory).build();
+ // Create the connection manager with the HTTP/HTTPS socket factory registry
+ connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+ // Maximum total connections in the pool
+ connManager.setMaxTotal(configuration.get(Configs.SINK_HOS_HTTP_MAX_TOTAL));
+ // Maximum connections per route
+ connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HOS_HTTP_MAX_PER_ROUTE));
+ } catch (KeyManagementException | NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);// keep the cause, not just its message
+ }
+ return connManager;
+ }
+
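+ /**
+ * Lazily create the singleton so all callers share one pooled HTTP client.
+ */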
+ public static synchronized HttpClientUtil getInstance(Configuration configuration) {
+ if (null == httpClientUtil) {
+ httpClientUtil = new HttpClientUtil(configuration);
+ }
+ return httpClientUtil;
+ }
+
+ public void close() throws IOException {
+ closeableHttpClient.close();
+ }
+
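+ /**
+ * Execute an HTTP PUT with the given body and optional headers; the caller
+ * is responsible for consuming and closing the returned response.
+ */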
+ public CloseableHttpResponse httpPut(String url, byte[] requestBody, Header... headers) throws IOException {
+ HttpPut put = new HttpPut(url);
+ if (StringUtil.isNotEmpty(headers)) {
+ for (Header header : headers) {
+ if (StringUtil.isNotEmpty(header)) {
+ put.addHeader(header);
+ }
+ }
+ }
+ put.setEntity(new ByteArrayEntity(requestBody));
+ return closeableHttpClient.execute(put);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/KafkaCertUtil.java b/src/main/java/com/zdjizhi/utils/KafkaCertUtil.java
new file mode 100644
index 0000000..efa9703
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/KafkaCertUtil.java
@@ -0,0 +1,41 @@
+package com.zdjizhi.utils;
+
+import com.zdjizhi.config.Configs;
+import org.apache.flink.configuration.Configuration;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+public class KafkaCertUtil {
+ /**
+ * Kafka SASL authentication port
+ */
+ private static final String SASL_PORT = "9094";
+
+ /**
+ * Kafka SSL authentication port
+ */
+ private static final String SSL_PORT = "9095";
+
+ /**
+ * Choose the authentication mode based on the port in the broker address.
+ *
+ * @param properties Kafka connection properties to populate
+ * @param configuration job configuration holding the broker address and credentials
+ */
+ public static void chooseCert(Properties properties, Configuration configuration) {
+ if (configuration.get(Configs.KAFKA_BROKER).contains(SASL_PORT)) {
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + configuration.get(Configs.KAFKA_USER) + " password=" + configuration.get(Configs.KAFKA_PIN) + ";");
+ } else if (configuration.get(Configs.KAFKA_BROKER).contains(SSL_PORT)) {
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", configuration.get(Configs.KAFKA_TOOLS_LIBRARY) + "keystore.jks");
+ properties.put("ssl.keystore.password", configuration.get(Configs.KAFKA_PIN));
+ properties.put("ssl.truststore.location", configuration.get(Configs.KAFKA_TOOLS_LIBRARY) + "truststore.jks");
+ properties.put("ssl.truststore.password", configuration.get(Configs.KAFKA_PIN));
+ properties.put("ssl.key.password", configuration.get(Configs.KAFKA_PIN));
+ }
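+ // Any other port (e.g. 9092) is treated as plaintext: no security properties are set.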
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/PublicUtil.java b/src/main/java/com/zdjizhi/utils/PublicUtil.java
new file mode 100644
index 0000000..ac11fbb
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/PublicUtil.java
@@ -0,0 +1,191 @@
+package com.zdjizhi.utils;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.CharUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.http.Header;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class PublicUtil {
+ private static final Log LOG = LogFactory.get();
+
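+ /**
+ * Combine the chunks of one file collected in a window. In "seek" mode the
+ * chunks are sorted by offset and every contiguous run is concatenated into
+ * one part; a gap flushes the current run and starts a new one, and chunks
+ * whose offsets are already covered are counted as duplicates and skipped.
+ * In any other mode ("append") the chunks are sorted by timestamp and
+ * concatenated until keyMaxChunk is exceeded.
+ */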
+ public static List<FileChunk> combine(Iterable<FileChunk> input, long keyMaxChunk, Counter duplicateChunkCounter, Counter combineErrorCounter, Counter seekChunkCounter, Counter appendChunkCounter) {
+ List<FileChunk> combinedFileChunkList = new ArrayList<>();
+ try {
+ List<FileChunk> originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
+ List<byte[]> waitingToCombineChunkList = new ArrayList<>();
+ if ("seek".equals(originalFileChunkList.get(0).getCombineMode())) {
+ seekChunkCounter.inc();
+ // Sort by offset
+ originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getOffset));
+ Iterator<FileChunk> originalFileChunkIterator = originalFileChunkList.iterator();
+ if (originalFileChunkIterator.hasNext()) {
+ int duplicateCount = 0;
+ FileChunk currentFileChunk = originalFileChunkIterator.next();
+ int lastChunkFlag = currentFileChunk.getLastChunkFlag();
+ long startOffset = currentFileChunk.getOffset();
+ if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
+ waitingToCombineChunkList.add(currentFileChunk.getChunk());
+ }
+ while (originalFileChunkIterator.hasNext()) {
+ seekChunkCounter.inc();
+ long expectedOffset = currentFileChunk.getOffset() + currentFileChunk.getLength();
+ currentFileChunk = originalFileChunkIterator.next();
+ long actualOffset = currentFileChunk.getOffset();
+ if (expectedOffset > actualOffset) {// expected offset greater than actual: duplicate chunk, skip it
+ duplicateCount++;
+ } else if (expectedOffset == actualOffset) {// expected offset equals actual: add the chunk to the pending list
+ if (currentFileChunk.getLastChunkFlag() == 1) {
+ lastChunkFlag = currentFileChunk.getLastChunkFlag();
+ }
+ if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
+ waitingToCombineChunkList.add(currentFileChunk.getChunk());
+ }
+ } else {// expected offset less than actual: a chunk is missing
+ if (waitingToCombineChunkList.size() > 0) {// flush the pending chunks as one combined part and clear the list
+ combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null));
+ waitingToCombineChunkList.clear();
+ } else {
+ if (lastChunkFlag == 1) {
+ combinedFileChunkList.add(currentFileChunk);
+ }
+ }
+ // Treat the current chunk as the first chunk of a new run and keep combining
+ startOffset = currentFileChunk.getOffset();// reset the start offset
+ lastChunkFlag = currentFileChunk.getLastChunkFlag();
+ if (currentFileChunk.getChunk() != null && currentFileChunk.getChunk().length > 0) {
+ waitingToCombineChunkList.add(currentFileChunk.getChunk());
+ }
+ }
+ }
+ if (waitingToCombineChunkList.size() > 0) {
+ combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, currentFileChunk.getUuid(), currentFileChunk.getFileName(), currentFileChunk.getFileType(), startOffset, currentFileChunk.getCombineMode(), lastChunkFlag, originalFileChunkList.get(0).getMeta(), 0, null));
+ } else {
+ if (lastChunkFlag == 1) {
+ combinedFileChunkList.add(currentFileChunk);
+ }
+ }
+ if (duplicateCount > 0) {
+ LOG.error("Duplicate upload chunk, uuid=" + currentFileChunk.getUuid() + ", repetitionCount=" + duplicateCount);
+ duplicateChunkCounter.inc(duplicateCount);
+ }
+ }
+ } else {
+ // Sort by timestamp
+ originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getTimestamp));
+ long startTimestamp = originalFileChunkList.get(0).getTimestamp();
+ StringBuilder timestampAndSizes = new StringBuilder();
+ for (FileChunk originalFileChunk : originalFileChunkList) {
+ appendChunkCounter.inc();
+ byte[] chunk = originalFileChunk.getChunk();
+ if (chunk != null && chunk.length > 0) {
+ waitingToCombineChunkList.add(chunk);
+ timestampAndSizes.append(originalFileChunk.getTimestamp()).append("-").append(chunk.length).append(";");
+ }
+ if (waitingToCombineChunkList.size() > keyMaxChunk) {
+ break;
+ }
+ }
+ if (waitingToCombineChunkList.size() > 0) {
+ combinedFileChunkList.add(combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString()));
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Combiner file error.", e);
+ combineErrorCounter.inc();
+ }
+ return combinedFileChunkList;
+ }
+
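+ /**
+ * Concatenate the collected byte arrays into a single chunk and copy the
+ * identifying fields (uuid, file name/type, combine mode, metadata) onto it.
+ */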
+ private static FileChunk combineChunk(List<byte[]> byteList, String uuid, String fileName, String fileType, long offset, String combineMode, int lastChunkFlag, Map<String, Object> metaMap, long startTimestamp, String chunkNumbers) {
+ FileChunk fileChunk = new FileChunk();
+ fileChunk.setChunkCount(byteList.size());
+ byte[][] bytes = new byte[byteList.size()][];
+ byteList.toArray(bytes);
+ byte[] newData = ArrayUtil.addAll(bytes);
+ if ("seek".equals(combineMode)) {
+ fileChunk.setOffset(offset);
+ fileChunk.setLastChunkFlag(lastChunkFlag);
+ } else {
+ if (StringUtil.isNotEmpty(chunkNumbers)) {
+ fileChunk.setChunkNumbers(chunkNumbers);
+ }
+ }
+ fileChunk.setTimestamp(startTimestamp);
+ fileChunk.setFileType(fileType);
+ fileChunk.setUuid(uuid);
+ fileChunk.setChunk(newData);
+ fileChunk.setFileName(fileName);
+ fileChunk.setCombineMode(combineMode);
+ fileChunk.setLength(newData.length);
+ fileChunk.setMeta(metaMap);
+ return fileChunk;
+ }
+
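+ /**
+ * Upload one combined part to HOS via HTTP PUT, passing the combine mode,
+ * offset/part numbers and file metadata as x-hos-* request headers; failures
+ * are logged and counted rather than rethrown.
+ */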
+ public static void sendToHos(FileChunk fileChunk, Configuration configuration, Counter sendHosErrorCounter) {
+ CloseableHttpResponse response = null;
+ try {
+ String url = configuration.get(Configs.SINK_HOS_ENDPOINT) + "/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid();
+ byte[] data;
+ if (fileChunk.getChunk() != null) {
+ data = fileChunk.getChunk();
+ } else {
+ data = "".getBytes();
+ }
+ List<Header> headers = new ArrayList<>();
+ headers.add(new BasicHeader("token", configuration.get(Configs.SINK_HOS_TOKEN)));
+ headers.add(new BasicHeader("x-hos-upload-type", "appendV2"));
+ headers.add(new BasicHeader("x-hos-combine-mode", fileChunk.getCombineMode()));
+ String filename = fileChunk.getFileName();
+ if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
+ headers.add(new BasicHeader("x-hos-meta-filename", filename));
+ } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
+ filename = filename + "." + fileChunk.getFileType();
+ headers.add(new BasicHeader("x-hos-meta-filename", filename));
+ } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
+ headers.add(new BasicHeader("x-hos-meta-file-type", fileChunk.getFileType()));
+ }
+ if ("seek".equals(fileChunk.getCombineMode())) {
+ headers.add(new BasicHeader("x-hos-offset", fileChunk.getOffset() + ""));
+ headers.add(new BasicHeader("x-hos-part-last-flag", fileChunk.getLastChunkFlag() + ""));
+ } else {
+ headers.add(new BasicHeader("x-hos-part-number", fileChunk.getTimestamp() + ""));
+ headers.add(new BasicHeader("x-hos-part-chunk-numbers", fileChunk.getChunkNumbers()));
+ }
+ headers.add(new BasicHeader("x-hos-part-chunk-count", fileChunk.getChunkCount() + ""));
+ Map<String, Object> metaMap = fileChunk.getMeta();
+ if (metaMap != null && metaMap.size() > 0) {
+ for (String meta : metaMap.keySet()) {
+ headers.add(new BasicHeader("x-hos-meta-" + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + ""));
+ }
+ }
+ response = HttpClientUtil.getInstance(configuration).httpPut(url, data, headers.toArray(new Header[0]));
+ if (response.getStatusLine().getStatusCode() != 200) {
+ String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
+ LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
+ sendHosErrorCounter.inc();
+ }
+ } catch (IOException e) {
+ LOG.error("put part to hos error.", e);
+ sendHosErrorCounter.inc();
+ } finally {
+ IoUtil.close(response);
+ }
+ }
+
+}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
new file mode 100644
index 0000000..e8e2d19
--- /dev/null
+++ b/src/main/resources/common.properties
@@ -0,0 +1,39 @@
+flink.job.name=agg_traffic_file_chunk_combine
+#source parameters
+source.kafka.parallelism=1
+#port 9092 = no authentication, 9095 = SSL, 9094 = SASL
+source.kafka.broker=192.168.44.12:9092
+source.kafka.group.id=test1
+source.kafka.topic=TRAFFIC-FILE-STREAM-RECORD
+#earliest = read from the beginning, latest = only new records
+source.kafka.auto.offset.reset=latest
+source.kafka.session.timeout.ms=60000
+#max records returned by the broker per poll request
+source.kafka.max.poll.records=1000
+#max bytes the consumer fetches from one partition at a time
+source.kafka.max.partition.fetch.bytes=31457280
+source.kafka.enable.auto.commit=true
+#Kafka SASL username
+source.kafka.user=admin
+#Kafka SASL/SSL password
+source.kafka.pin=galaxy2019
+#required for SSL
+source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+parse.message.pack.parallelism=1
+#combiner window parameters
+combiner.window.parallelism=3
+combiner.window.time=10
+#fire the window after this long without new data
+combiner.window.idle.time=5
+combiner.window.key.max.chunk=100000
+#hos sink parameters
+sink.hos.parallelism=3
+sink.hos.endpoint=http://192.168.44.12:9098/hos
+sink.hos.bucket=traffic_file_bucket
+sink.hos.token=c21f969b5f03d33d43e04f8f136e7682
+sink.hos.http.error.retry=3
+sink.hos.http.max.total=2000
+sink.hos.http.max.per.route=1000
+sink.hos.http.connect.timeout=10000
+sink.hos.http.request.timeout=10000
+sink.hos.http.socket.timeout=60000
\ No newline at end of file
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..30a9f04
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,7 @@
+log4j.rootLogger=info,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=INFO
+log4j.appender.console.ImmediateFlush=true
+log4j.appender.console.Target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n
\ No newline at end of file
diff --git a/src/test/data/messagePacks b/src/test/data/messagePacks
new file mode 100644
index 0000000..d8fd72d
--- /dev/null
+++ b/src/test/data/messagePacks
Binary files differ
diff --git a/src/test/data/test.eml b/src/test/data/test.eml
new file mode 100644
index 0000000..6f40886
--- /dev/null
+++ b/src/test/data/test.eml
@@ -0,0 +1,218 @@
+Received: from smtp-server1.cfdenselr.com [39.191.101.160] by relay.2yahoo.com with SMTP; Sat, 26 Jan 2019 00:30:55 -0500
+Received: from mmx09.tilkbans.com [144.61.26.91] by m1.gns.snv.thisdomainl.com with NNFMP; Sat, 26 Jan 2019 00:16:11 -0500
+Received: from mail.naihautsui.co.kr ([196.191.214.12]) by smtp.endend.nl with SMTP; Sat, 26 Jan 2019 00:00:50 -0500
+Message-ID: <[email protected]>
+Date: Sat, 26 Jan 2019 00:00:50 -0500
+Reply-To: "Chubuk" <[email protected]>
+From: "Chubuk" <[email protected]>
+X-Accept-Language: en-us
+MIME-Version: 1.0
+To: "ii9264510538" <[email protected]>
+Subject: chubuk1971 : ii9264510538
+Content-Type: text/html;charset="iso-8859-1"
+Content-Transfer-Encoding: base64
+
+SSBhbSB3ZWxsIGF3YXJlIGlpOTI2NDUxMDUzOCBpcyBvbmUgb2YgeW91ciBwYXNzcGhyYXNlcy4g
+TGV0cyBnZXQgc3RyYWlnaHQgdG8gdGhlIHB1cnBvc2UuIE5vdCBhIHNpbmdsZSBwZXJzb24gaGFz
+IHBhaWQgbWUgdG8gaW52ZXN0aWdhdGUgeW91LiBZb3UgZG9uJ3Qga25vdyBtZSBhbmQgeW91IGFy
+ZSBwcm9iYWJseSB0aGlua2luZyB3aHkgeW91IGFyZSBnZXR0aW5nIHRoaXMgZSBtYWlsPw0KPGJy
+Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5oY3c8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1
+YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8
+Zm9udCBjb2xvcj0id2hpdGUiPmFpdWthbjwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVi
+dWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxm
+b250IGNvbG9yPSJ3aGl0ZSI+ZW9vPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5
+NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQg
+Y29sb3I9IndoaXRlIj5tdGFkZWNsY2E8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVr
+MTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9u
+dCBjb2xvcj0id2hpdGUiPnN5bmVldWVibzwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVi
+dWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxm
+b250IGNvbG9yPSJ3aGl0ZSI+eXVlYWZvPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1
+azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZv
+bnQgY29sb3I9IndoaXRlIj5pcmM8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3
+MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBj
+b2xvcj0id2hpdGUiPnVteXBlbWxsPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5
+NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQg
+Y29sb3I9IndoaXRlIj5hc3lleGE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3
+MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBj
+b2xvcj0id2hpdGUiPm91Y3l4ZW5pPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5
+NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGJyPg0K
+TGV0IG1lIHRlbGwgeW91LCBpIGFjdHVhbGx5IHBsYWNlZCBhIG1hbHdhcmUgb24gdGhlIFggc3Ry
+ZWFtaW5nIChwb3Jubykgd2ViLXNpdGUgYW5kIGd1ZXNzIHdoYXQsIHlvdSB2aXNpdGVkIHRoaXMg
+d2Vic2l0ZSB0byBleHBlcmllbmNlIGZ1biAoeW91IGtub3cgd2hhdCBpIG1lYW4pLiBXaGlsZSB5
+b3Ugd2VyZSB3YXRjaGluZyB2aWRlb3MsIHlvdXIgYnJvd3NlciBpbml0aWF0ZWQgZnVuY3Rpb25p
+bmcgYXMgYSBSZW1vdGUgRGVza3RvcCB0aGF0IGhhcyBhIGtleSBsb2dnZXIgd2hpY2ggZ2F2ZSBt
+ZSBhY2Nlc3NpYmlsaXR5IHRvIHlvdXIgc2NyZWVuIGFzIHdlbGwgYXMgY2FtLiBhZnRlciB0aGF0
+LCBteSBzb2Z0d2FyZSBwcm9ncmFtIGdhdGhlcmVkIHlvdXIgZW50aXJlIGNvbnRhY3RzIGZyb20g
+eW91ciBNZXNzZW5nZXIsIHNvY2lhbCBuZXR3b3JrcywgYXMgd2VsbCBhcyBlbWFpbCAuIGFmdGVy
+IHRoYXQgaSBtYWRlIGEgZG91YmxlLXNjcmVlbiB2aWRlby4gMXN0IHBhcnQgZGlzcGxheXMgdGhl
+IHZpZGVvIHlvdSB3ZXJlIHZpZXdpbmcgKHlvdSBoYXZlIGEgZ29vZCB0YXN0ZSBsb2wgLiAuIC4p
+LCBhbmQgMm5kIHBhcnQgc2hvd3MgdGhlIHZpZXcgb2YgeW91ciB3ZWJjYW0sIHllYWggaXRzIHlv
+dS4gDQo8YnI+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnJ6ZGU8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3
+aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8
+L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnBhb208L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0
+ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2Zv
+bnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnZvemFrZHJlbTwvZm9udD4gPGZvbnQgY29sb3I9Indo
+aXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwv
+Zm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+Z29wdXlwbW11PC9mb250PiA8Zm9udCBjb2xvcj0i
+d2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4
+PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5hcW9pbmN6aDwvZm9udD4gPGZvbnQgY29sb3I9
+IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUz
+ODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+eG15PC9mb250PiA8Zm9udCBjb2xvcj0id2hp
+dGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9m
+b250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj50emlmcXVldXY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3
+aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8
+L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPm9ieWR6eW90PC9mb250PiA8Zm9udCBjb2xvcj0i
+d2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4
+PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj55PC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUi
+PmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250
+Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5pbzwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVi
+dWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxi
+cj4NCllvdSBhY3R1YWxseSBoYXZlIGEgcGFpciBvZiBwb3NzaWJpbGl0aWVzLiBMZXQgdXMgYW5h
+bHl6ZSBlYWNoIG9uZSBvZiB0aGVzZSBjaG9pY2VzIGluIGRldGFpbHM6DQo8YnI+DQo8Zm9udCBj
+b2xvcj0id2hpdGUiPmhnaW9rZ3VnPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5
+NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQg
+Y29sb3I9IndoaXRlIj5xb3RveGY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3
+MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBj
+b2xvcj0id2hpdGUiPmJvcHB3aWppZTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsx
+OTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250
+IGNvbG9yPSJ3aGl0ZSI+aGFoPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8
+L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29s
+b3I9IndoaXRlIj5odjwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250
+PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3
+aGl0ZSI+em5pbW91YXQ8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9u
+dD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0i
+d2hpdGUiPm9yPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxm
+b250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRl
+Ij5vZ3hlb2d5dTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8
+Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0
+ZSI+aGw8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQg
+Y29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnVq
+Yml1cXdjZTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9u
+dCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxicj4NCjFzdCBzb2x1dGlvbiBp
+cyB0byBqdXN0IGlnbm9yZSB0aGlzIGUgbWFpbC4gaW4gdGhpcyBzaXR1YXRpb24sIGkgd2lsbCBz
+ZW5kIG91dCB5b3VyIGFjdHVhbCB2aWRlbyB0byBlYWNoIG9mIHlvdXIgeW91ciBwZXJzb25hbCBj
+b250YWN0cyBhbmQgdGh1cyBqdXN0IGltYWdpbmUgY29uY2VybmluZyB0aGUgc2hhbWUgeW91IGZl
+ZWwuIGFuZCBjb25zZXF1ZW50bHkgaWYgeW91IGFyZSBpbiBhIHJvbWFudGljIHJlbGF0aW9uc2hp
+cCwganVzdCBob3cgaXQgY2FuIGFmZmVjdD8NCjxicj4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+d291
+dnd5PC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNv
+bG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj50dWZl
+eHlkeTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBj
+b2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+bGd5
+anU8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29s
+b3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmlrdHVx
+PC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9y
+PSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5vdWN5aGVp
+bGE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29s
+b3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmRwaGpo
+d3FhPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNv
+bG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5zPC9m
+b250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3
+aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5hcmNlcXNhPC9m
+b250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3
+aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5lcmZheHl3PC9m
+b250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3
+aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj54bWF5YzwvZm9u
+dD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hp
+dGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxicj4NCkxhdHRlciBvcHRpb24gd2lsbCBiZSB0byBw
+YXkgbWUgVVNEIDk3Ny4gTGV0cyBuYW1lIGl0IGFzIGEgZG9uYXRpb24uIGluIHRoaXMgc2NlbmFy
+aW8sIGkgd2lsbCBpbnN0YW50bHkgcmVtb3ZlIHlvdXIgdmlkZW90YXBlLiBZb3Ugd2lsbCBjb250
+aW51ZSBvbiB5b3VyIGRhaWx5IGxpZmUgbGlrZSB0aGlzIG5ldmVyIG9jY3VycmVkIGFuZCB5b3Ug
+d291bGQgbmV2ZXIgaGVhciBiYWNrIGFnYWluIGZyb20gbWUuDQo8YnI+DQo8Zm9udCBjb2xvcj0i
+d2hpdGUiPmtpPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxm
+b250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRl
+Ij5ib25vdWl1b3U8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4g
+PGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hp
+dGUiPnF1ZnpvenhleTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250
+PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3
+aGl0ZSI+cm93emxwdmE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9u
+dD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0i
+d2hpdGUiPmJ5aTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8
+Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0
+ZSI+aHlydWdlbGY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4g
+PGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hp
+dGUiPnRlc2FvZW94ZDwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250
+PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3
+aGl0ZSI+YXB1ZXY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4g
+PGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hp
+dGUiPnhhb2Vpem1oeTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250
+PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3
+aGl0ZSI+eW51eWhveHlsPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2Zv
+bnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGJyPg0KWW91IHdp
+bGwgbWFrZSB0aGUgcGF5bWVudCB2aWEgQmkmIzgyMDQ7dGNvJiM4MjA0O2luIChpZiB5b3UgZG8g
+bm90IGtub3cgdGhpcywgc2VhcmNoICdob3cgdG8gYnV5IGImIzgyMDQ7aXRjb2kmIzgyMDQ7bicg
+aW4gR29vZ2xlIHNlYXJjaCBlbmdpbmUpLiANCjxicj4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+bGlk
+enV5bGliPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250
+IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj51
+ZWZiaW9mb3U8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZv
+bnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUi
+PnFvYnl1a2V3YTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8
+Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0
+ZSI+eHVqdmRzZTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8
+Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0
+ZSI+ZXBldG88L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZv
+bnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUi
+Pnpxb3VmbG88L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZv
+bnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUi
+PnZlbmFqZmtvdDwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8
+Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0
+ZSI+YWR5ZXB3bzwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8
+Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0
+ZSI+YWdoaXlhbm88L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4g
+PGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hp
+dGUiPmt6dnVuaTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8
+Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwvZm9udD4NCjxicj4NCkImIzgyMDQ7VCYj
+ODIwNDtDJiM4MjA0OyBhZCYjODIwNDtkcmUmIzgyMDQ7c3M6ICAxOHo1YzZUakxVb3NxUFRFbm02
+cTdRMkVWTmdiQ3kxNlRkDQo8YnI+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmx5c2VsaTwvZm9udD4g
+PGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUi
+PmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+c21hYmplZjwvZm9udD4g
+PGZvbnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUi
+PmlpOTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+eHV6eW9hcWVsPC9mb250
+PiA8Zm9udCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0
+ZSI+aWk5MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5jYTwvZm9udD4gPGZv
+bnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlp
+OTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+dGs8L2ZvbnQ+IDxmb250IGNv
+bG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1
+MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnVhcWppZG9oPC9mb250PiA8Zm9udCBj
+b2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0
+NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5ueWlqamFpYXk8L2ZvbnQ+IDxmb250
+IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTky
+NjQ1MTA1Mzg8L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmhhcnltYXV5ZTwvZm9udD4gPGZv
+bnQgY29sb3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlp
+OTI2NDUxMDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+b3Vxb2tlPC9mb250PiA8Zm9u
+dCBjb2xvcj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5
+MjY0NTEwNTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj54aWx5YXh5ZXQ8L2ZvbnQ+IDxm
+b250IGNvbG9yPSJ3aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5p
+aTkyNjQ1MTA1Mzg8L2ZvbnQ+DQo8YnI+DQpbQ2FTZS1zZW5zaXRpdmUgY29weSBhbmQgcGFzdGUg
+aXRdDQo8YnI+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnhhaGlyeGJtPC9mb250PiA8Zm9udCBjb2xv
+cj0id2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEw
+NTM4PC9mb250Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5qamlkaWZhdDwvZm9udD4gPGZvbnQgY29s
+b3I9IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUx
+MDUzODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+c2lueTwvZm9udD4gPGZvbnQgY29sb3I9
+IndoaXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUz
+ODwvZm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+YWdheWY8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3
+aGl0ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8
+L2ZvbnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnh0ZWM8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0
+ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2Zv
+bnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPm9tYWg8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+
+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2ZvbnQ+
+DQo8Zm9udCBjb2xvcj0id2hpdGUiPnV2aWhhdGFoPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUi
+PmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4PC9mb250
+Pg0KPGZvbnQgY29sb3I9IndoaXRlIj5xbXRkdXh1aWQ8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0
+ZSI+Y2h1YnVrMTk3MTwvZm9udD4gPGZvbnQgY29sb3I9IndoaXRlIj5paTkyNjQ1MTA1Mzg8L2Zv
+bnQ+DQo8Zm9udCBjb2xvcj0id2hpdGUiPmVuaWxlZ2FlYTwvZm9udD4gPGZvbnQgY29sb3I9Indo
+aXRlIj5jaHVidWsxOTcxPC9mb250PiA8Zm9udCBjb2xvcj0id2hpdGUiPmlpOTI2NDUxMDUzODwv
+Zm9udD4NCjxmb250IGNvbG9yPSJ3aGl0ZSI+YXF6em1oeWx5PC9mb250PiA8Zm9udCBjb2xvcj0i
+d2hpdGUiPmNodWJ1azE5NzE8L2ZvbnQ+IDxmb250IGNvbG9yPSJ3aGl0ZSI+aWk5MjY0NTEwNTM4
+PC9mb250Pg0KDQo8YnI+DQppZiB5b3UgaGF2ZSBiZWVuIG1ha2luZyBwbGFucyBmb3IgZ29pbmcg
+dG8gdGhlIGxhdyBlbmZvcmNlbWVudCwgc3VyZWx5LCB0aGlzIGUtbWFpbCBjYW4gbm90IGJlIHRy
+YWNlZCBiYWNrIHRvIG1lLiBJIGhhdmUgY292ZXJlZCBteSBzdGVwcy4gaSBhbSBhbHNvIG5vdCB0
+cnlpbmcgdG8gYXNrIHlvdSBmb3IgYSBodWdlIGFtb3VudCwgaSBwcmVmZXIgdG8gYmUgcmV3YXJk
+ZWQuIGUtbWFpbCBpZiBpIGRvbid0IHJlY2VpdmUgdGhlICYjODIwNDtiaSYjODIwNDt0Y28mIzgy
+MDQ7aW4mIzgyMDQ7LCBpIHdpbGwsIG5vIGRvdWJ0IHNlbmQgb3V0IHlvdXIgdmlkZW8gcmVjb3Jk
+aW5nIHRvIGFsbCBvZiB5b3VyIGNvbnRhY3RzIGluY2x1ZGluZyBtZW1iZQ0KcnMgb2YgeW91ciBm
+YW1pbHksIGNvbGxlYWd1ZXMsIGFuZCBzbyBmb3J0aC4gSG93ZXZlciwgaWYgaSByZWNlaXZlIHRo
+ZSBwYXltZW50LCBpIHdpbGwgZXJhc2UgdGhlIHZpZGVvIGltbWVkaWF0ZWx5LiBJZiB5b3Ugd2Fu
+dCB0byBoYXZlIGV2aWRlbmNlLCByZXBseSAgWWVhaCBhbmQgaSB3aWxsIGNlcnRhaW5seSBzZW5k
+IG91dCB5b3VyIHZpZGVvIHJlY29yZGluZyB0byB5b3VyIDcgY29udGFjdHMuIGl0J3MgYSBub246
+bmVnb3RpYWJsZSBvZmZlciB0aHVzIGRvbid0IHdhc3RlIG1pbmUgdGltZSAmIHlvdXJzIGJ5IHJl
+cGx5aW5nIHRvIHRoaXMgZW1haWwuDQo=
\ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
new file mode 100644
index 0000000..a5b0652
--- /dev/null
+++ b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
@@ -0,0 +1,566 @@
+package com.zdjizhi;
+
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.RandomUtil;
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.function.CombineChunkProcessWindowFunction;
+import com.zdjizhi.function.FileChunkKeySelector;
+import com.zdjizhi.function.ParseMessagePackMapFunction;
+import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
+import com.zdjizhi.trigger.MultipleTrigger;
+import com.zdjizhi.utils.PublicUtil;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.Collector;
+import org.junit.*;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.*;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class FileChunkCombinerTests {
+ private static Counter duplicateChunkCounter;
+ private static Counter combineErrorCounter;
+ private static Counter seekChunkCounter;
+ private static Counter appendChunkCounter;
+ private static Counter sendHosErrorCounter;
+ private File emlFile;
+ private byte[] emlFileBytes;
+ private byte[] pcapngFileBytes;
+ private List<FileChunk> inputFileChunks;
+ private List<byte[]> messagePackList;
+ private List<FileChunk> emlFileChunks;
+ private List<FileChunk> pcapngFileChunks;
+ private List<FileChunk> pcapngIncludeMetaFileChunks;
+ private Map<String, Object> pcapngFileMeta;
+ private String emlUuid = "1111111111";
+ private String pcapngUuid = "2222222222";
+ private String pcapngIncludeMetaUuid = "3333333333";
+ private int emlChunkCount = 10;
+ private int pcapngChunkCount = 10;
+ private long maxChunkCount;
+ private String pcapChunkData = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
+ private static Configuration configuration;
+
+ @Before
+ public void testBefore() throws Exception {
+ ParameterTool parameterTool = ParameterTool.fromPropertiesFile("src" + File.separator + "main" + File.separator + "resources" + File.separator + "common.properties");
+ configuration = parameterTool.getConfiguration();
+ duplicateChunkCounter = new SimpleCounter();
+ combineErrorCounter = new SimpleCounter();
+ seekChunkCounter = new SimpleCounter();
+ appendChunkCounter = new SimpleCounter();
+ sendHosErrorCounter = new SimpleCounter();
+ maxChunkCount = configuration.get(Configs.COMBINER_WINDOW_KEY_MAX_CHUNK);
+ String filePath = "src" + File.separator + "test" + File.separator + "data" + File.separator + "test.eml";
+ emlFile = new File(filePath);
+ emlFileBytes = FileUtil.readBytes(emlFile);
+ StringBuilder pcapData = new StringBuilder();
+ for (int i = 0; i < 10; i++) {
+ pcapData.append(pcapChunkData);
+ }
+ pcapngFileBytes = pcapData.toString().getBytes();
+ pcapngFileMeta = new HashMap<>();
+ pcapngFileMeta.put("ruleId", 151);
+ pcapngFileMeta.put("taskId", 7477);
+ pcapngFileMeta.put("sledIP", "127.0.0.1");
+ inputFileChunks = new ArrayList<>();
+ emlFileChunks = new ArrayList<>();
+ pcapngFileChunks = new ArrayList<>();
+ pcapngIncludeMetaFileChunks = new ArrayList<>();
+ ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction();
+ try (ObjectInputStream inputStream = new ObjectInputStream(new FileInputStream("src" + File.separator + "test" + File.separator + "data" + File.separator + "messagePacks"))) {
+ messagePackList = (List<byte[]>) inputStream.readObject();
+ }
+ for (byte[] messagePack : messagePackList) {
+ FileChunk fileChunk = mapFunction.map(messagePack);
+ inputFileChunks.add(fileChunk);
+ }
+ }
+
+ @Test
+ public void testParseMessagePack() {
+ ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction();
+ for (byte[] messagePack : messagePackList) {
+ FileChunk fileChunk = mapFunction.map(messagePack);
+ Assert.assertNotEquals("解析MessagePack数据失败", null, fileChunk.getUuid());
+ }
+ }
+
+ @Test
+ public void testCombineFullChunk() {
+ categorizeChunks(inputFileChunks);
+ //Test seek combine mode
+ List<FileChunk> fileChunkList = PublicUtil.combine(emlFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ Assert.assertEquals("seek mode combine error", 1, fileChunkList.size());
+ Assert.assertEquals("seek mode combine error: wrong lastChunkFlag", 1, fileChunkList.get(0).getLastChunkFlag());
+ Assert.assertEquals("seek mode combine error: wrong chunkCount", emlChunkCount - 1, fileChunkList.get(0).getChunkCount());
+ Assert.assertEquals("seek mode combine error: wrong file length", emlFileBytes.length, fileChunkList.get(0).getChunk().length);
+ Assert.assertEquals("seek mode combine error: wrong file content", new String(emlFileBytes), new String(fileChunkList.get(0).getChunk()));
+ //Test append combine mode
+ fileChunkList = PublicUtil.combine(pcapngFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ Assert.assertEquals("append mode combine error", 1, fileChunkList.size());
+ Assert.assertEquals("append mode combine error: wrong chunkCount", pcapngChunkCount, fileChunkList.get(0).getChunkCount());
+ Assert.assertEquals("append mode combine error: wrong file length", pcapngFileBytes.length, fileChunkList.get(0).getChunk().length);
+ Assert.assertEquals("append mode combine error: wrong file content", new String(pcapngFileBytes), new String(fileChunkList.get(0).getChunk()));
+ //Test combining with file metadata
+ fileChunkList = PublicUtil.combine(pcapngIncludeMetaFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ Assert.assertEquals(1, fileChunkList.size());
+ Assert.assertEquals("combine error: wrong metadata", pcapngFileMeta, fileChunkList.get(0).getMeta());
+
+ Assert.assertEquals("wrong metric value", 0, duplicateChunkCounter.getCount());
+ Assert.assertEquals("wrong metric value", 0, combineErrorCounter.getCount());
+ Assert.assertEquals("wrong metric value", emlChunkCount, seekChunkCounter.getCount());
+ Assert.assertEquals("wrong metric value", pcapngChunkCount * 2, appendChunkCounter.getCount());
+ }
+
+ @Test
+ public void testCombineDuplicateChunk() {
+ categorizeChunks(inputFileChunks);
+ //Test seek combine mode
+ emlFileChunks.add(emlFileChunks.get(5));
+ List<FileChunk> fileChunkList = PublicUtil.combine(emlFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ Assert.assertEquals("seek mode combine error", 1, fileChunkList.size());
+ Assert.assertEquals("seek mode combine error: wrong lastChunkFlag", 1, fileChunkList.get(0).getLastChunkFlag());
+ Assert.assertEquals("seek mode combine error: wrong chunkCount", emlChunkCount - 1, fileChunkList.get(0).getChunkCount());
+ Assert.assertEquals("seek mode combine error: wrong file length", emlFileBytes.length, fileChunkList.get(0).getChunk().length);
+ Assert.assertEquals("seek mode combine error: wrong file content", new String(emlFileBytes), new String(fileChunkList.get(0).getChunk()));
+ //Test append combine mode
+ pcapngFileChunks.add(pcapngFileChunks.get(5));
+ fileChunkList = PublicUtil.combine(pcapngFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ Assert.assertEquals("append mode combine error", 1, fileChunkList.size());
+ Assert.assertEquals("append mode combine error: wrong chunkCount", pcapngChunkCount + 1, fileChunkList.get(0).getChunkCount());
+ Assert.assertEquals("append mode combine error: wrong file length", pcapngFileBytes.length + pcapChunkData.length(), fileChunkList.get(0).getChunk().length);
+ Assert.assertEquals("append mode combine error: wrong file content", new String(pcapngFileBytes) + pcapChunkData, new String(fileChunkList.get(0).getChunk()));
+ //Test combining with file metadata
+ pcapngIncludeMetaFileChunks.add(pcapngIncludeMetaFileChunks.get(5));
+ fileChunkList = PublicUtil.combine(pcapngIncludeMetaFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ Assert.assertEquals(1, fileChunkList.size());
+ Assert.assertEquals("combine error: wrong metadata", pcapngFileMeta, fileChunkList.get(0).getMeta());
+
+ Assert.assertEquals("wrong metric value", 1, duplicateChunkCounter.getCount());
+ Assert.assertEquals("wrong metric value", 0, combineErrorCounter.getCount());
+ Assert.assertEquals("wrong metric value", emlChunkCount + 1, seekChunkCounter.getCount());
+ Assert.assertEquals("wrong metric value", pcapngChunkCount * 2 + 2, appendChunkCounter.getCount());
+ }
+
+ @Test
+ public void testCombineLostChunk() {
+ categorizeChunks(inputFileChunks);
+ //Test seek combine mode
+ emlFileChunks.remove(emlFileChunks.get(5));
+ List<FileChunk> fileChunkList = PublicUtil.combine(emlFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ Assert.assertEquals("seek mode combine error", 2, fileChunkList.size());
+ Assert.assertEquals("seek mode combine error: wrong lastChunkFlag", 1, fileChunkList.get(1).getLastChunkFlag());
+ Assert.assertEquals("seek mode combine error: wrong chunkCount", emlChunkCount - 2, fileChunkList.get(0).getChunkCount() + fileChunkList.get(1).getChunkCount());
+ Assert.assertEquals("seek mode combine error: wrong file length", emlFileBytes.length - 2000, fileChunkList.get(0).getLength() + fileChunkList.get(1).getLength());
+ //Test append combine mode
+ pcapngFileChunks.remove(pcapngFileChunks.get(5));
+ fileChunkList = PublicUtil.combine(pcapngFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ Assert.assertEquals("append mode combine error", 1, fileChunkList.size());
+ Assert.assertEquals("append mode combine error: wrong chunkCount", pcapngChunkCount - 1, fileChunkList.get(0).getChunkCount());
+ Assert.assertEquals("append mode combine error: wrong file length", pcapngFileBytes.length - pcapChunkData.length(), fileChunkList.get(0).getChunk().length);
+ Assert.assertEquals("append mode combine error: wrong file content", new String(ArrayUtil.sub(pcapngFileBytes, 0, pcapngFileBytes.length - pcapChunkData.length())), new String(fileChunkList.get(0).getChunk()));
+ //Test combining with file metadata
+ pcapngIncludeMetaFileChunks.remove(pcapngIncludeMetaFileChunks.get(5));
+ fileChunkList = PublicUtil.combine(pcapngIncludeMetaFileChunks, maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ Assert.assertEquals(1, fileChunkList.size());
+ Assert.assertEquals("combine error: wrong metadata", pcapngFileMeta, fileChunkList.get(0).getMeta());
+
+ Assert.assertEquals("wrong metric value", 0, duplicateChunkCounter.getCount());
+ Assert.assertEquals("wrong metric value", 0, combineErrorCounter.getCount());
+ Assert.assertEquals("wrong metric value", emlChunkCount - 1, seekChunkCounter.getCount());
+ Assert.assertEquals("wrong metric value", pcapngChunkCount * 2 - 2, appendChunkCounter.getCount());
+ }
+
+ @Test
+ public void testSendToHos() {
+ byte[] data = RandomUtil.randomString(1000).getBytes();
+ //seek mode
+ FileChunk fileChunk = new FileChunk();
+ fileChunk.setUuid("0000000001");
+ fileChunk.setCombineMode("seek");
+ fileChunk.setFileType("eml");
+ fileChunk.setOffset(0);
+ fileChunk.setLength(data.length);
+ fileChunk.setLastChunkFlag(1);
+ fileChunk.setChunkCount(5);
+ fileChunk.setTimestamp(System.currentTimeMillis());
+ fileChunk.setChunk(data);
+ PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter);
+ Assert.assertEquals("上传文件到hos错误", 0, sendHosErrorCounter.getCount());
+ //append mode
+ fileChunk = new FileChunk();
+ fileChunk.setUuid("0000000002");
+ fileChunk.setCombineMode("append");
+ fileChunk.setFileType("pcapng");
+ fileChunk.setLength(data.length);
+ fileChunk.setChunkCount(5);
+ fileChunk.setTimestamp(System.currentTimeMillis());
+ fileChunk.setChunk(data);
+ fileChunk.setChunkNumbers("1-200,2-200,3-200,4-200,5-200");
+ fileChunk.setMeta(pcapngFileMeta);
+ PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter);
+ Assert.assertEquals("上传文件到hos错误", 0, sendHosErrorCounter.getCount());
+ }
+
+ @Test
+ public void testPipelineFullChunk() throws Exception {
+ CollectSink.values.clear();
+ long windowTime = 5;
+ messagePackList.sort(Comparator.comparingInt(Arrays::hashCode));//pseudo-shuffle the chunk order by hash
+ StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1);
+ env.execute();
+ List<FileChunk> fileChunks = CollectSink.values;
+ Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0);
+ categorizeChunks(fileChunks);
+ byte[] data = new byte[0];
+ long length = 0;
+ long chunkCount = 0;
+ int lastChunkFlag = 0;
+ emlFileChunks.sort(Comparator.comparingLong(FileChunk::getOffset));
+ for (FileChunk fileChunk : emlFileChunks) {
+ data = ArrayUtil.addAll(data, fileChunk.getChunk());
+ length += fileChunk.getLength();
+ chunkCount += fileChunk.getChunkCount();
+ if (fileChunk.getLastChunkFlag() == 1) {
+ lastChunkFlag = 1;
+ }
+ }
+ Assert.assertEquals("seek模式合并错误,lastChunkFlag错误", 1, lastChunkFlag);
+ Assert.assertEquals("seek模式合并错误,chunkCount错误", emlChunkCount - 1, chunkCount);
+ Assert.assertEquals("seek模式合并错误,文件长度错误", emlFile.length(), length);
+ Assert.assertEquals("seek模式合并错误,文件内容错误", new String(emlFileBytes), new String(data));
+ data = new byte[0];
+ length = 0;
+ chunkCount = 0;
+ for (FileChunk fileChunk : pcapngFileChunks) {
+ data = ArrayUtil.addAll(data, fileChunk.getChunk());
+ length += fileChunk.getLength();
+ chunkCount += fileChunk.getChunkCount();
+ }
+ Assert.assertEquals("append模式合并错误,chunkCount错误", pcapngChunkCount, chunkCount);
+ Assert.assertEquals("append模式合并错误,文件长度错误", pcapngFileBytes.length, length);
+ Assert.assertEquals("append模式合并错误,文件内容错误", new String(pcapngFileBytes), new String(data));
+ data = new byte[0];
+ length = 0;
+ chunkCount = 0;
+ for (FileChunk fileChunk : pcapngIncludeMetaFileChunks) {
+ data = ArrayUtil.addAll(data, fileChunk.getChunk());
+ length += fileChunk.getLength();
+ chunkCount += fileChunk.getChunkCount();
+ Assert.assertEquals("合并错误,元信息错误", pcapngFileMeta, fileChunk.getMeta());
+ }
+ }
+
+ @Test
+ public void testPipelineLostChunk() throws Exception {
+ CollectSink.values.clear();
+ long windowTime = 5;
+ //remove some chunks
+ messagePackList.remove(5);
+ messagePackList.remove(15);
+ messagePackList.remove(25);
+ messagePackList.sort(Comparator.comparingInt(Arrays::hashCode));//pseudo-shuffle the chunk order by hash
+ StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1);
+ env.execute();
+ List<FileChunk> fileChunks = CollectSink.values;
+ Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0);
+ categorizeChunks(fileChunks);
+ byte[] data = new byte[0];
+ long length = 0;
+ long chunkCount = 0;
+ int lastChunkFlag = 0;
+ emlFileChunks.sort(Comparator.comparingLong(FileChunk::getOffset));
+ for (FileChunk fileChunk : emlFileChunks) {
+ data = ArrayUtil.addAll(data, fileChunk.getChunk());
+ length += fileChunk.getLength();
+ chunkCount += fileChunk.getChunkCount();
+ if (fileChunk.getLastChunkFlag() == 1) {
+ lastChunkFlag = 1;
+ }
+ }
+ Assert.assertEquals("seek模式合并错误,lastChunkFlag错误", 1, lastChunkFlag);
+ Assert.assertEquals("seek模式合并错误,chunkCount错误", emlChunkCount - 2, chunkCount);
+ Assert.assertEquals("seek模式合并错误,文件长度错误", emlFileBytes.length - 2000, length);
+ data = new byte[0];
+ length = 0;
+ chunkCount = 0;
+ for (FileChunk fileChunk : pcapngFileChunks) {
+ data = ArrayUtil.addAll(data, fileChunk.getChunk());
+ length += fileChunk.getLength();
+ chunkCount += fileChunk.getChunkCount();
+ }
+ Assert.assertEquals("append模式合并错误,chunkCount错误", pcapngChunkCount - 1, chunkCount);
+ Assert.assertEquals("append模式合并错误,文件长度错误", pcapngFileBytes.length - pcapChunkData.length(), length);
+ Assert.assertEquals("append模式合并错误,文件内容错误", new String(ArrayUtil.sub(pcapngFileBytes, 0, pcapngFileBytes.length - pcapChunkData.length())), new String(data));
+ data = new byte[0];
+ length = 0;
+ chunkCount = 0;
+ for (FileChunk fileChunk : pcapngIncludeMetaFileChunks) {
+ data = ArrayUtil.addAll(data, fileChunk.getChunk());
+ length += fileChunk.getLength();
+ chunkCount += fileChunk.getChunkCount();
+ Assert.assertEquals("合并错误,元信息错误", pcapngFileMeta, fileChunk.getMeta());
+ }
+ }
+
+ @Test
+ public void testPipelineDuplicateChunk() throws Exception {
+ CollectSink.values.clear();
+ long windowTime = 5;
+ //add duplicate chunks
+ messagePackList.add(messagePackList.get(5));
+ messagePackList.add(messagePackList.get(15));
+ messagePackList.add(messagePackList.get(25));
+ messagePackList.sort(Comparator.comparingInt(Arrays::hashCode));//pseudo-shuffle the chunk order by hash
+ StreamExecutionEnvironment env = createPipeline(2, new ByteDataSource(messagePackList, 500, windowTime), windowTime, 1);
+ env.execute();
+ List<FileChunk> fileChunks = CollectSink.values;
+ Assert.assertTrue("合并错误,sink输出错误", fileChunks.size() > 0);
+ categorizeChunks(fileChunks);
+ byte[] data = new byte[0];
+ long length = 0;
+ long chunkCount = 0;
+ int lastChunkFlag = 0;
+ emlFileChunks.sort(Comparator.comparingLong(FileChunk::getOffset));
+ for (FileChunk fileChunk : emlFileChunks) {
+ data = ArrayUtil.addAll(data, fileChunk.getChunk());
+ length += fileChunk.getLength();
+ chunkCount += fileChunk.getChunkCount();
+ if (fileChunk.getLastChunkFlag() == 1) {
+ lastChunkFlag = 1;
+ }
+ }
+ Assert.assertEquals("seek模式合并错误,lastChunkFlag错误", 1, lastChunkFlag);
+ Assert.assertEquals("seek模式合并错误,chunkCount错误", emlChunkCount - 1, chunkCount);
+ Assert.assertEquals("seek模式合并错误,文件长度错误", emlFileBytes.length, length);
+ data = new byte[0];
+ length = 0;
+ chunkCount = 0;
+ for (FileChunk fileChunk : pcapngFileChunks) {
+ data = ArrayUtil.addAll(data, fileChunk.getChunk());
+ length += fileChunk.getLength();
+ chunkCount += fileChunk.getChunkCount();
+ }
+ Assert.assertEquals("append模式合并错误,chunkCount错误", pcapngChunkCount + 1, chunkCount);
+ Assert.assertEquals("append模式合并错误,文件长度错误", pcapngFileBytes.length + pcapChunkData.length(), length);
+ Assert.assertEquals("append模式合并错误,文件内容错误", new String(ArrayUtil.addAll(pcapngFileBytes, ArrayUtil.sub(pcapngFileBytes, 0, pcapChunkData.length()))), new String(data));
+ data = new byte[0];
+ length = 0;
+ chunkCount = 0;
+ for (FileChunk fileChunk : pcapngIncludeMetaFileChunks) {
+ data = ArrayUtil.addAll(data, fileChunk.getChunk());
+ length += fileChunk.getLength();
+ chunkCount += fileChunk.getChunkCount();
+ Assert.assertEquals("合并错误,元信息错误", pcapngFileMeta, fileChunk.getMeta());
+ }
+ }
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(2)
+ .build());
+
+ private static class CollectSink implements SinkFunction<FileChunk> {
+ private static final List<FileChunk> values = Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ public void invoke(FileChunk value, Context context) {
+ values.add(value);
+ }
+ }
+
+ private static class ByteDataSource implements SourceFunction<byte[]> {
+ private volatile boolean isRunning = true;
+ private final List<byte[]> dataList;
+ private final long delay;
+ private final long windowTime;
+
+ ByteDataSource(List<byte[]> dataList, long delay, long windowTime) {
+ this.dataList = dataList;
+ this.delay = delay;
+ this.windowTime = windowTime;
+ }
+
+ @Override
+ public void run(SourceContext<byte[]> ctx) throws Exception {
+ int index = 0;
+ while (isRunning && index < dataList.size()) {
+ byte[] record = dataList.get(index);
+ ctx.collect(record);
+ index++;
+ Thread.sleep(delay);
+ }
+ // After all data has been sent, wait for the window to fire
+ Thread.sleep(windowTime * 1000);
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+ }
+
+ @Test
+ public void testCombineChunkProcessWindowFunction() throws Exception {
+ ListStateDescriptor listStateDescriptor = new ListStateDescriptor<FileChunk>("test-window", new ListSerializer(new JavaSerializer()));
+ WindowOperator<String, FileChunk, FileChunk, FileChunk, TimeWindow> operator = new WindowOperator<String, FileChunk, FileChunk, FileChunk, TimeWindow>(
+ TumblingProcessingTimeWindows.of(Time.seconds(3)),
+ new TimeWindow.Serializer(),
+ new FileChunkKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ listStateDescriptor,
+ new InternalIterableProcessWindowFunction(new CombineChunkProcessWindowFunction(configuration)),
+ ProcessingTimeTrigger.create(),
+ 0L, null);
+ KeyedOneInputStreamOperatorTestHarness<String, FileChunk, FileChunk> testHarness = new KeyedOneInputStreamOperatorTestHarness<String, FileChunk, FileChunk>(operator, new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ testHarness.open();
+ testHarness.setProcessingTime(3L);
+ testHarness.processElement(new StreamRecord<>(inputFileChunks.get(0)));
+ testHarness.processElement(new StreamRecord<>(inputFileChunks.get(1)));
+ testHarness.processElement(new StreamRecord<>(inputFileChunks.get(2)));
+ testHarness.processElement(new StreamRecord<>(inputFileChunks.get(3)));
+ testHarness.processElement(new StreamRecord<>(inputFileChunks.get(4)));
+ testHarness.setProcessingTime(5000L);
+ expectedOutput.add(new StreamRecord<>(PublicUtil.combine(inputFileChunks.subList(0, 5), maxChunkCount, duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter).get(0), 2999L));
+ ConcurrentLinkedQueue<Object> actualOutput = testHarness.getOutput();
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, actualOutput, new Comparator<Object>() {
+ @Override
+ public int compare(Object o1, Object o2) {
+ StreamRecord sr0 = (StreamRecord) o1;
+ StreamRecord sr1 = (StreamRecord) o2;
+ return Long.compare(((FileChunk) sr0.getValue()).getOffset(), ((FileChunk) sr1.getValue()).getOffset());
+ }
+ });
+ }
+
+ @Test
+ public void testMock() throws Exception {
+ ProcessWindowFunctionMock mock = Mockito.mock(ProcessWindowFunctionMock.class);
+ InternalIterableProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> windowFunction = new InternalIterableProcessWindowFunction<>(mock);
+ TypeInformation<FileChunk> fileChunkType = PojoTypeInfo.of(FileChunk.class);
+ ExecutionConfig execConf = new ExecutionConfig();
+ execConf.setParallelism(5);
+ StreamingFunctionUtils.setOutputType(windowFunction, fileChunkType, execConf);
+ Mockito.verify(mock).setOutputType(fileChunkType, execConf);
+ Configuration config = new Configuration();
+ windowFunction.open(config);
+ Mockito.verify(mock).open(config);
+ RuntimeContext rCtx = Mockito.mock(RuntimeContext.class);
+ windowFunction.setRuntimeContext(rCtx);
+ (Mockito.verify(mock)).setRuntimeContext(rCtx);
+ TimeWindow w = Mockito.mock(TimeWindow.class);
+ Iterable<FileChunk> i = Mockito.mock(Iterable.class);
+ Collector<FileChunk> c = Mockito.mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = Mockito.mock(InternalWindowFunction.InternalWindowContext.class);
+ (Mockito.doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow>.Context c = (ProcessWindowFunction.Context) invocationOnMock.getArguments()[1];
+ c.currentProcessingTime();
+ c.currentWatermark();
+ c.windowState();
+ c.globalState();
+ return null;
+ }
+ }).when(mock)).process(Mockito.anyString(), Mockito.anyObject(), Mockito.eq(i), Mockito.eq(c));
+ windowFunction.process("", w, ctx, i, c);
+ Mockito.verify(ctx).currentProcessingTime();
+ Mockito.verify(ctx).currentWatermark();
+ Mockito.verify(ctx).windowState();
+ Mockito.verify(ctx).globalState();
+ windowFunction.close();
+ Mockito.verify(mock).close();
+ }
+
+ private static class ProcessWindowFunctionMock extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> implements OutputTypeConfigurable<FileChunk> {
+ private static final long serialVersionUID = 1L;
+
+ private ProcessWindowFunctionMock() {
+ }
+
+ @Override
+ public void process(String s, Context context, Iterable<FileChunk> elements, Collector<FileChunk> out) throws Exception {
+ }
+
+ public void setOutputType(TypeInformation<FileChunk> outTypeInfo, ExecutionConfig executionConfig) {
+ }
+ }
+
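+ /**
+ * Build the same pipeline shape as the production job against an in-memory
+ * source and sink: parse MessagePack records, key by file, window by event
+ * time with the combined trigger, and collect the results in CollectSink.
+ */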
+ private StreamExecutionEnvironment createPipeline(int parallelism, SourceFunction<byte[]> source, long windowTime, long windowIdleTime) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(parallelism);
+ WatermarkStrategy<FileChunk> watermarkStrategy = WatermarkStrategy
+ .<FileChunk>forBoundedOutOfOrderness(Duration.ofSeconds(0))
+ .withTimestampAssigner((fileChunk, timestamp) -> fileChunk.getTimestamp() / 1000);
+ List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
+ triggers.add(EventTimeTrigger.create());
+ triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000));
+ Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
+ env.addSource(source)
+ .map(new ParseMessagePackMapFunction())
+ .filter((FilterFunction<FileChunk>) Objects::nonNull)
+ .assignTimestampsAndWatermarks(watermarkStrategy)
+ .keyBy(new FileChunkKeySelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
+ .trigger(trigger)
+ .process(new CombineChunkProcessWindowFunction(configuration))
+ .addSink(new CollectSink());
+ return env;
+ }
+
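+ /**
+ * Split the collected chunks into the eml, pcapng, and pcapng-with-metadata
+ * lists by uuid.
+ */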
+ private void categorizeChunks(List<FileChunk> fileChunks) {
+ for (FileChunk fileChunk : fileChunks) {
+ if (emlUuid.equals(fileChunk.getUuid())) {
+ emlFileChunks.add(fileChunk);
+ } else if (pcapngUuid.equals(fileChunk.getUuid())) {
+ pcapngFileChunks.add(fileChunk);
+ } else if (pcapngIncludeMetaUuid.equals(fileChunk.getUuid())) {
+ pcapngIncludeMetaFileChunks.add(fileChunk);
+ }
+ }
+ }
+
+}