From 2a830396acf9e79683f9e5fbfffc7bf3d158a614 Mon Sep 17 00:00:00 2001
From: lifengchao
Date: Fri, 10 May 2024 10:42:39 +0800
Subject: [feature][connector-kafka] GAL-557 Groot Stream Kafka Sink 支持限速配置:超速阻塞时一直阻塞不drop修复,添加窗口大小参数
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Reviewer notes (this section sits after the "---" separator, so it is not
part of the commit message):

* Layout: this patch was restored from a whitespace-collapsed copy. Line
  breaks and blank lines were re-derived from the @@ hunk counts and the
  diffstat. Indentation is inferred, and in the first three
  RateLimitingStrategyTest hunks the single blank context line could sit
  either before "int times" or after "long startTs". Apply with whitespace
  tolerance (for example "git apply --ignore-whitespace") and verify against
  the repository.
* What changes: adds the int option "rate.limiting.window.size" (default 5),
  passes it through BlockDropRateLimitingStrategy into SlidingWindowRateStat,
  which now accepts 1..60, uses fixed 100 ms buckets and returns
  totalValue / windowSecond. The "currentTs - lastBlockTs <= 300L" guard
  around the lastBlockTs update is removed, and the per-block timestamp list
  is only appended to the log message when blockEnable is true.
* Review: the no-arg SlidingWindowRateStat() now delegates to this(5), while
  the removed constant was windowSize = 1000, so callers of the no-arg
  constructor move from a 1 second to a 5 second window.
* Review: the context comment above SlidingWindowRateStat still describes a
  "last one second" rate, and no @param line for windowSecond is added to the
  BlockDropRateLimitingStrategy constructor Javadoc in these hunks.

 .../connectors/kafka/KafkaConnectorOptions.java    |  6 ++++
 .../connectors/kafka/KafkaTableFactory.java        |  2 ++
 .../kafka/rate/BlockDropRateLimitingStrategy.java  | 35 ++++++++++++----------
 .../kafka/rate/SlidingWindowRateStat.java          | 32 ++++++++++++++------
 .../kafka/rate/RateLimitingStrategyTest.java       |  8 ++---
 5 files changed, 54 insertions(+), 29 deletions(-)

diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
index 8ab6bc8..1339283 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
@@ -36,6 +36,12 @@ public class KafkaConnectorOptions {
                     .defaultValue("10Mbps")
                     .withDescription("Limit rate, examples: 10Mbps, 10Kbps, 10240bps.");

+    public static final ConfigOption RATE_LIMITING_WINDOW_SIZE =
+            ConfigOptions.key("rate.limiting.window.size")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription("window size, unit second, default 5 second.");
+
     public static final ConfigOption RATE_LIMITING_BLOCK_DURATION =
             ConfigOptions.key("rate.limiting.block.duration")
                     .durationType()
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index c3b483f..fbbaed2 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -78,6 +78,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
         options.add(LOG_FAILURES_ONLY);
         options.add(RATE_LIMITING_STRATEGY);
         options.add(RATE_LIMITING_LIMIT_RATE);
+        options.add(RATE_LIMITING_WINDOW_SIZE);
         options.add(RATE_LIMITING_BLOCK_DURATION);
         options.add(RATE_LIMITING_BLOCK_RESET_DURATION);
         return options;
@@ -90,6 +91,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
                 return new NoRateLimitingStrategy();
             case SLIDING_WINDOW:
                 return new BlockDropRateLimitingStrategy(
+                        config.get(RATE_LIMITING_WINDOW_SIZE),
                         parseRateLimitingRate(config.get(RATE_LIMITING_LIMIT_RATE)),
                         config.get(RATE_LIMITING_BLOCK_DURATION).toMillis(),
                         config.get(RATE_LIMITING_BLOCK_RESET_DURATION).toMillis());
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java
index bb59486..889084c 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java
@@ -5,12 +5,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.sql.Timestamp;
-import java.util.Arrays;

 public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(BlockDropRateLimitingStrategy.class);
     private static final int blockTsLastSize = 8;
     private static final int blockTsLastMask = 7;
+    private final int windowSecond;
     private final long maxRate;
     private final boolean blockEnable;
     private final long maxBlockDuration; // 毫秒
@@ -33,15 +33,16 @@ public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
      * @param maxBlockDuration 最长阻塞时长(毫秒), 值为0代表超速直接丢弃不执行阻塞
      * @param resetBlockDuration 阻塞重置恢复时长, 超速阻塞后速率恢复正常多长时间后重置超速阻塞状态
      */
-    public BlockDropRateLimitingStrategy(long maxRate, long maxBlockDuration, long resetBlockDuration) {
+    public BlockDropRateLimitingStrategy(int windowSecond, long maxRate, long maxBlockDuration, long resetBlockDuration) {
         Preconditions.checkArgument(maxRate > 0);
         Preconditions.checkArgument(maxBlockDuration >= 0);
         Preconditions.checkArgument(resetBlockDuration >= 0);
+        this.windowSecond = windowSecond;
         this.maxRate = maxRate;
         this.blockEnable = maxBlockDuration > 0;
         this.maxBlockDuration = maxBlockDuration;
         this.resetBlockDuration = resetBlockDuration;
-        this.stat = new SlidingWindowRateStat();
+        this.stat = new SlidingWindowRateStat(windowSecond);
         this.firstBlockTs = 0;
         this.lastBlockTs = 0;
         this.firstBlockTsLast = new long[blockTsLastSize];
@@ -56,7 +57,7 @@ public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {

     @Override
     public BlockDropRateLimitingStrategy withMaxRate(long maxRate) {
-        return new BlockDropRateLimitingStrategy(maxRate, maxBlockDuration, resetBlockDuration);
+        return new BlockDropRateLimitingStrategy(windowSecond, maxRate, maxBlockDuration, resetBlockDuration);
     }

     @Override
@@ -100,10 +101,9 @@ public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
                 firstBlockTsLast[blockTsLastIndex] = firstBlockTs;
                 lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
             } else {
-                if(currentTs - lastBlockTs <= 300L){
-                    lastBlockTs = currentTs;
-                    lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
-                }
+                lastBlockTs = currentTs;
+                lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
+
                 // drop立马return
                 if (lastBlockTs - firstBlockTs > maxBlockDuration) {
                     limitingDropCount += 1;
@@ -136,16 +136,19 @@ public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
         if (hasLimitingStat && currentTs - limitingStatStartTs > 120000L) {
             hasLimitingStat = false;
             StringBuilder blockTsInfos = new StringBuilder();
-            int idx;
-            for (int i = 0; i < blockTsLastSize; i++) {
-                idx = (blockTsLastIndex - i) & blockTsLastMask;
-                blockTsInfos.append(new Timestamp(firstBlockTsLast[idx]).toString())
-                        .append("_").append(new Timestamp(lastBlockTsLast[idx]).toString()).append("_").append(lastBlockTsLast[idx] - firstBlockTsLast[idx]);
-                if(i != blockTsLastSize - 1){
-                    blockTsInfos.append(",\n");
+            if(blockEnable){
+                blockTsInfos.append('\n');
+                int idx;
+                for (int i = 0; i < blockTsLastSize; i++) {
+                    idx = (blockTsLastIndex - i) & blockTsLastMask;
+                    blockTsInfos.append(new Timestamp(firstBlockTsLast[idx]).toString())
+                            .append("_").append(new Timestamp(lastBlockTsLast[idx]).toString()).append("_").append(lastBlockTsLast[idx] - firstBlockTsLast[idx]);
+                    if(i != blockTsLastSize - 1){
+                        blockTsInfos.append(",\n");
+                    }
                 }
             }
-            LOG.info("from {} to {} limiting info for max rate({} bytes), blockCount: {}, blockMs: {}, dropCount: {}, dropSize: {},blockTsInfos:\n{}.", new Timestamp(limitingStatStartTs).toString(), new Timestamp(currentTs).toString(),
+            LOG.info("from {} to {} limiting info for max rate({} bytes), blockCount: {}, blockMs: {}, dropCount: {}, dropSize: {},blockTsInfos:{}.", new Timestamp(limitingStatStartTs).toString(), new Timestamp(currentTs).toString(),
                     maxRate,limitingBlockCount, limitingBlockMs, limitingDropCount, limitingDropSize, blockTsInfos.toString()
             );
         }
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java
index 660ab60..4f5e7c5 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java
@@ -1,21 +1,32 @@
 package com.geedgenetworks.connectors.kafka.rate;

+import org.apache.flink.util.Preconditions;
+
 import java.io.Serializable;

 // 专门用于实时统计发送字节速率, 输出最近一秒内速率
 public class SlidingWindowRateStat implements Serializable {
-    private static final long windowSize = 1000;
-    private static final int bucketSize = 8;
-    private static final int bucketMask = bucketSize - 1;
-    private static final long bucketWindowSize = windowSize / bucketSize;
+    private final long windowSecond;
+    private final long windowSize;
+    private final int bucketSize;
+    private final long bucketWindowSize;
     private final long[] buckets;
     private int current;
     private long currentWindowEndMs;
     private long totalValue;

     public SlidingWindowRateStat() {
+        this(5);
+    }
+
+    public SlidingWindowRateStat(int windowSecond) {
+        Preconditions.checkArgument(windowSecond > 0 && windowSecond <= 60);
+        this.windowSecond = windowSecond;
+        this.windowSize = windowSecond * 1000;
+        this.bucketWindowSize = 100;
+        this.bucketSize = (int) (windowSize / bucketWindowSize);
         this.buckets = new long[bucketSize];
-        this.current = (bucketSize - 1) & bucketMask;
+        this.current = bucketSize - 1;
         this.currentWindowEndMs = System.currentTimeMillis() / bucketWindowSize * bucketWindowSize;
     }

@@ -31,24 +42,27 @@ public class SlidingWindowRateStat implements Serializable {
         currentUpdate(timeMs);
         buckets[current] += value;
         totalValue += value;
-        return totalValue;
+        return totalValue / windowSecond;
     }

     public long getCurrentRate() {
         long timeMs = System.currentTimeMillis();
         currentUpdate(timeMs);
-        return totalValue;
+        return totalValue / windowSecond;
     }

     public long getCurrentRate(long timeMs) {
         currentUpdate(timeMs);
-        return totalValue;
+        return totalValue / windowSecond;
     }

     private void currentUpdate(long timeMs){
         if (timeMs > currentWindowEndMs) {
             do {
-                current = (current + 1) & bucketMask;
+                current = current + 1;
+                if(current >= bucketSize){
+                    current = 0;
+                }
                 totalValue -= buckets[current];
                 buckets[current] = 0;
                 currentWindowEndMs += bucketWindowSize;
diff --git a/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java b/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java
index ebf902f..b25f60e 100644
--- a/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java
+++ b/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java
@@ -50,7 +50,7 @@ public class RateLimitingStrategyTest {
     }

     public static void testBlockDropRateLimitingStrategyForDrop() throws Exception{
-        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 0, 0 );
+        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(1,600000, 0, 0 );
         int times = 500 * 60;
         long startTs = System.currentTimeMillis();

@@ -86,7 +86,7 @@ public class RateLimitingStrategyTest {

     public static void testBlockDropRateLimitingStrategyForBlockDrop() throws Exception{
         //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 10000, 1000 );
-        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 40000, 1000 );
+        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(1,600000, 40000, 1000 );
         int times = 500 * 60;
         long startTs = System.currentTimeMillis();

@@ -127,7 +127,7 @@ public class RateLimitingStrategyTest {

     public static void testBlockDropRateLimitingStrategyForBlock() throws Exception{
         //RateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 10000, 1000 );
-        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, TimeUtils.parseDuration("1d").toMillis(), 1000 );
+        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(1,600000, TimeUtils.parseDuration("1d").toMillis(), 1000 );
         int times = 500 * 60;
         long startTs = System.currentTimeMillis();

@@ -203,7 +203,7 @@ public class RateLimitingStrategyTest {

         Thread consumerThread = new Thread(() -> {
             //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 40000, 1000 );
-            BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 30000, 1000 );
+            BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(1,600000, 30000, 1000 );
             //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 0, 1000 );
             //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, TimeUtils.parseDuration("1d").toMillis(), 1000 );
             System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ",poll_before");
--
cgit v1.2.3