summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorhoujinchuan <[email protected]>2024-03-14 16:31:54 +0800
committerhoujinchuan <[email protected]>2024-03-14 16:31:54 +0800
commit0243e2b6867c35889af8471122be996f22aeada4 (patch)
tree4256e9b3464840d0898a627fe2578da15ebc0173 /src
parent38e1049fa06dd51b7d9d8f610b7d100b1e38bb42 (diff)
限流时支持根据表达式忽略重要文件
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/zdjizhi/FileChunkCombiner.java2
-rw-r--r--src/main/java/com/zdjizhi/config/Configs.java7
-rw-r--r--src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java40
-rw-r--r--src/main/resources/common.properties9
-rw-r--r--src/test/java/com/zdjizhi/FileChunkCombinerTests.java4
5 files changed, 44 insertions, 18 deletions
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index 0bcbb19..e470d42 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -40,7 +40,7 @@ public class FileChunkCombiner {
SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment
.addSource(KafkaConsumer.byteArrayConsumer(configuration))
.name("Kafka Source")
- .map(new ParseMessagePackMapFunction(configuration.get(Configs.MAP_ENABLE_RATE_LIMIT), configuration.get(Configs.MAP_RATE_LIMIT_THRESHOLD)))
+ .map(new ParseMessagePackMapFunction(configuration.get(Configs.ENABLE_RATE_LIMIT), configuration.get(Configs.RATE_LIMIT_THRESHOLD), configuration.get(Configs.RATE_LIMIT_EXCLUSION_EXPRESSION)))
.name("Map: Parse Message Pack")
.filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
.assignTimestampsAndWatermarks(watermarkStrategy);
diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java
index eaa7765..118ab0e 100644
--- a/src/main/java/com/zdjizhi/config/Configs.java
+++ b/src/main/java/com/zdjizhi/config/Configs.java
@@ -43,12 +43,15 @@ public class Configs {
.stringType()
.noDefaultValue();
- public static final ConfigOption<Boolean> MAP_ENABLE_RATE_LIMIT = ConfigOptions.key("map.enable.rate.limit")
+ public static final ConfigOption<Boolean> ENABLE_RATE_LIMIT = ConfigOptions.key("enable.rate.limit")
.booleanType()
.defaultValue(false);
- public static final ConfigOption<Long> MAP_RATE_LIMIT_THRESHOLD = ConfigOptions.key("map.rate.limit.threshold")
+ public static final ConfigOption<Long> RATE_LIMIT_THRESHOLD = ConfigOptions.key("rate.limit.threshold")
.longType()
.defaultValue(Long.MAX_VALUE);
+ public static final ConfigOption<String> RATE_LIMIT_EXCLUSION_EXPRESSION = ConfigOptions.key("rate.limit.exclusion.expression")
+ .stringType()
+ .defaultValue("");
public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism")
.intType()
diff --git a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
index 2535421..4aeefef 100644
--- a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
+++ b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java
@@ -1,10 +1,12 @@
package com.zdjizhi.function;
+import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.pojo.FileChunk;
+import org.apache.commons.jexl3.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.*;
@@ -18,18 +20,21 @@ import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND;
public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChunk> {
private static final Log LOG = LogFactory.get();
-
+ private final boolean enableRateLimit;
+ private final long rateLimitThreshold;
+ private final String rateLimitExpression;
public transient Counter parseMessagePackCounter;
public transient Counter parseMessagePackErrorCounter;
public transient Counter rateLimitDropCounter;
- private final boolean enableRateLimit;
- private final long rateLimitThreshold;
private long timestamp;
private long count;
+ private JexlExpression jexlExpression;
+ private JexlContext jexlContext;
- public ParseMessagePackMapFunction(boolean enableRateLimit, long rateLimitThreshold) {
+ public ParseMessagePackMapFunction(boolean enableRateLimit, long rateLimitThreshold, String rateLimitExpression) {
this.rateLimitThreshold = rateLimitThreshold;
this.enableRateLimit = enableRateLimit;
+ this.rateLimitExpression = rateLimitExpression;
}
@Override
@@ -44,24 +49,37 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
metricGroup.meter("rateLimitDropPerSecond", new MeterView(rateLimitDropCounter));
timestamp = System.currentTimeMillis();
count = 0;
+ JexlEngine jexlEngine = new JexlBuilder().create();
+ jexlExpression = jexlEngine.createExpression(rateLimitExpression);
+ jexlContext = new MapContext();
}
@Override
public FileChunk map(byte[] messagePackData) {
- FileChunk fileChunk = null;
+ FileChunk fileChunk = parseMessagePack(messagePackData);
if (enableRateLimit) {
count++;
- if (System.currentTimeMillis() - timestamp < 1000 && count <= rateLimitThreshold) {
- fileChunk = parseMessagePack(messagePackData);
- } else if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) {
+ if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) {
+ if (StrUtil.isNotEmpty(rateLimitExpression)) {
+ jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
+ if (Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
+ return fileChunk;
+ }
+ }
rateLimitDropCounter.inc();
- } else {
+ fileChunk = null;
+ } else if (System.currentTimeMillis() - timestamp >= 1000) {
+ if (StrUtil.isNotEmpty(rateLimitExpression)) {
+ jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
+ if (Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
+ return fileChunk;
+ }
+ }
rateLimitDropCounter.inc();
+ fileChunk = null;
timestamp = System.currentTimeMillis();
count = 0;
}
- } else {
- fileChunk = parseMessagePack(messagePackData);
}
return fileChunk;
}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index 51fee46..ceea19f 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -18,8 +18,13 @@ source.kafka.user=admin
source.kafka.pin=galaxy2019
#SSL需要
source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
-map.enable.rate.limit=false
-map.rate.limit.threshold=10000
+enable.rate.limit=false
+rate.limit.threshold=10000
+rate.limit.exclusion.expression=FileChunk.fileType == "eml"
+file.max.chunk.count=100000
+file.max.size=1073741824
+#�����ֶι��ˣ�java����ʽ
+#filter.expression=FileChunk.fileType == "txt" || FileChunk.fileType == "eml"
#�����������
combiner.window.parallelism=2
combiner.window.time=10
diff --git a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
index 28cc429..acf4926 100644
--- a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
+++ b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java
@@ -122,7 +122,7 @@ public class FileChunkCombinerTests {
@Test
public void testParseMessagePackMapFunction() throws Exception {
- ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE);
+ ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE,"");
OneInputStreamOperatorTestHarness<byte[], FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(mapFunction));
testHarness.setup();
testHarness.open();
@@ -617,7 +617,7 @@ public class FileChunkCombinerTests {
triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000));
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
env.addSource(source)
- .map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE))
+ .map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE,""))
.filter(new FileChunkFilterFunction(Long.MAX_VALUE, ""))
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO)