diff options
| author | houjinchuan <[email protected]> | 2024-03-14 16:31:54 +0800 |
|---|---|---|
| committer | houjinchuan <[email protected]> | 2024-03-14 16:31:54 +0800 |
| commit | 0243e2b6867c35889af8471122be996f22aeada4 (patch) | |
| tree | 4256e9b3464840d0898a627fe2578da15ebc0173 /src | |
| parent | 38e1049fa06dd51b7d9d8f610b7d100b1e38bb42 (diff) | |
限流时支持根据表达式忽略重要文件
Diffstat (limited to 'src')
5 files changed, 44 insertions, 18 deletions
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java index 0bcbb19..e470d42 100644 --- a/src/main/java/com/zdjizhi/FileChunkCombiner.java +++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java @@ -40,7 +40,7 @@ public class FileChunkCombiner { SingleOutputStreamOperator<FileChunk> parseMessagePackStream = environment .addSource(KafkaConsumer.byteArrayConsumer(configuration)) .name("Kafka Source") - .map(new ParseMessagePackMapFunction(configuration.get(Configs.MAP_ENABLE_RATE_LIMIT), configuration.get(Configs.MAP_RATE_LIMIT_THRESHOLD))) + .map(new ParseMessagePackMapFunction(configuration.get(Configs.ENABLE_RATE_LIMIT), configuration.get(Configs.RATE_LIMIT_THRESHOLD), configuration.get(Configs.RATE_LIMIT_EXCLUSION_EXPRESSION))) .name("Map: Parse Message Pack") .filter(new FileChunkFilterFunction(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION))) .assignTimestampsAndWatermarks(watermarkStrategy); diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java index eaa7765..118ab0e 100644 --- a/src/main/java/com/zdjizhi/config/Configs.java +++ b/src/main/java/com/zdjizhi/config/Configs.java @@ -43,12 +43,15 @@ public class Configs { .stringType() .noDefaultValue(); - public static final ConfigOption<Boolean> MAP_ENABLE_RATE_LIMIT = ConfigOptions.key("map.enable.rate.limit") + public static final ConfigOption<Boolean> ENABLE_RATE_LIMIT = ConfigOptions.key("enable.rate.limit") .booleanType() .defaultValue(false); - public static final ConfigOption<Long> MAP_RATE_LIMIT_THRESHOLD = ConfigOptions.key("map.rate.limit.threshold") + public static final ConfigOption<Long> RATE_LIMIT_THRESHOLD = ConfigOptions.key("rate.limit.threshold") .longType() .defaultValue(Long.MAX_VALUE); + public static final ConfigOption<String> RATE_LIMIT_EXCLUSION_EXPRESSION = ConfigOptions.key("rate.limit.exclusion.expression") + .stringType() + 
.defaultValue(""); public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism") .intType() diff --git a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java index 2535421..4aeefef 100644 --- a/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java +++ b/src/main/java/com/zdjizhi/function/ParseMessagePackMapFunction.java @@ -1,10 +1,12 @@ package com.zdjizhi.function; +import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.pojo.FileChunk; +import org.apache.commons.jexl3.*; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.*; @@ -18,18 +20,21 @@ import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_APPEND; public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChunk> { private static final Log LOG = LogFactory.get(); - + private final boolean enableRateLimit; + private final long rateLimitThreshold; + private final String rateLimitExpression; public transient Counter parseMessagePackCounter; public transient Counter parseMessagePackErrorCounter; public transient Counter rateLimitDropCounter; - private final boolean enableRateLimit; - private final long rateLimitThreshold; private long timestamp; private long count; + private JexlExpression jexlExpression; + private JexlContext jexlContext; - public ParseMessagePackMapFunction(boolean enableRateLimit, long rateLimitThreshold) { + public ParseMessagePackMapFunction(boolean enableRateLimit, long rateLimitThreshold, String rateLimitExpression) { this.rateLimitThreshold = rateLimitThreshold; this.enableRateLimit = enableRateLimit; + this.rateLimitExpression = rateLimitExpression; } @Override @@ -44,24 +49,37 @@ public 
class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu metricGroup.meter("rateLimitDropPerSecond", new MeterView(rateLimitDropCounter)); timestamp = System.currentTimeMillis(); count = 0; + JexlEngine jexlEngine = new JexlBuilder().create(); + jexlExpression = jexlEngine.createExpression(rateLimitExpression); + jexlContext = new MapContext(); } @Override public FileChunk map(byte[] messagePackData) { - FileChunk fileChunk = null; + FileChunk fileChunk = parseMessagePack(messagePackData); if (enableRateLimit) { count++; - if (System.currentTimeMillis() - timestamp < 1000 && count <= rateLimitThreshold) { - fileChunk = parseMessagePack(messagePackData); - } else if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) { + if (System.currentTimeMillis() - timestamp < 1000 && count > rateLimitThreshold) { + if (StrUtil.isNotEmpty(rateLimitExpression)) { + jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk); + if (Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) { + return fileChunk; + } + } rateLimitDropCounter.inc(); - } else { + fileChunk = null; + } else if (System.currentTimeMillis() - timestamp >= 1000) { + if (StrUtil.isNotEmpty(rateLimitExpression)) { + jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk); + if (Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) { + return fileChunk; + } + } rateLimitDropCounter.inc(); + fileChunk = null; timestamp = System.currentTimeMillis(); count = 0; } - } else { - fileChunk = parseMessagePack(messagePackData); } return fileChunk; } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 51fee46..ceea19f 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -18,8 +18,13 @@ source.kafka.user=admin source.kafka.pin=galaxy2019 #SSL需要 source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ -map.enable.rate.limit=false 
-map.rate.limit.threshold=10000 +enable.rate.limit=false +rate.limit.threshold=10000 +rate.limit.exclusion.expression=FileChunk.fileType == "eml" +file.max.chunk.count=100000 +file.max.size=1073741824 +#根据字段过滤，java表达式 +#filter.expression=FileChunk.fileType == "txt" || FileChunk.fileType == "eml" #����������� combiner.window.parallelism=2 combiner.window.time=10 diff --git a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java index 28cc429..acf4926 100644 --- a/src/test/java/com/zdjizhi/FileChunkCombinerTests.java +++ b/src/test/java/com/zdjizhi/FileChunkCombinerTests.java @@ -122,7 +122,7 @@ public class FileChunkCombinerTests { @Test public void testParseMessagePackMapFunction() throws Exception { - ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE); + ParseMessagePackMapFunction mapFunction = new ParseMessagePackMapFunction(false, Long.MAX_VALUE,""); OneInputStreamOperatorTestHarness<byte[], FileChunk> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamMap<>(mapFunction)); testHarness.setup(); testHarness.open(); @@ -617,7 +617,7 @@ public class FileChunkCombinerTests { triggers.add(LastChunkOrNoDataInTimeTrigger.of(windowIdleTime * 1000)); Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers); env.addSource(source) - .map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE)) + .map(new ParseMessagePackMapFunction(false, Long.MAX_VALUE,"")) .filter(new FileChunkFilterFunction(Long.MAX_VALUE, "")) .assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(new FileChunkKeySelector(), BasicTypeInfo.STRING_TYPE_INFO) |
