summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java6
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java2
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java35
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java32
-rw-r--r--groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java8
5 files changed, 54 insertions, 29 deletions
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
index 8ab6bc8..1339283 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
@@ -36,6 +36,12 @@ public class KafkaConnectorOptions {
.defaultValue("10Mbps")
.withDescription("Limit rate, examples: 10Mbps, 10Kbps, 10240bps.");
+ public static final ConfigOption<Integer> RATE_LIMITING_WINDOW_SIZE =
+ ConfigOptions.key("rate.limiting.window.size")
+ .intType()
+ .defaultValue(5)
+ .withDescription("window size, unit second, default 5 second.");
+
public static final ConfigOption<Duration> RATE_LIMITING_BLOCK_DURATION =
ConfigOptions.key("rate.limiting.block.duration")
.durationType()
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index c3b483f..fbbaed2 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -78,6 +78,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
options.add(LOG_FAILURES_ONLY);
options.add(RATE_LIMITING_STRATEGY);
options.add(RATE_LIMITING_LIMIT_RATE);
+ options.add(RATE_LIMITING_WINDOW_SIZE);
options.add(RATE_LIMITING_BLOCK_DURATION);
options.add(RATE_LIMITING_BLOCK_RESET_DURATION);
return options;
@@ -90,6 +91,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
return new NoRateLimitingStrategy();
case SLIDING_WINDOW:
return new BlockDropRateLimitingStrategy(
+ config.get(RATE_LIMITING_WINDOW_SIZE),
parseRateLimitingRate(config.get(RATE_LIMITING_LIMIT_RATE)),
config.get(RATE_LIMITING_BLOCK_DURATION).toMillis(),
config.get(RATE_LIMITING_BLOCK_RESET_DURATION).toMillis());
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java
index bb59486..889084c 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java
@@ -5,12 +5,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
-import java.util.Arrays;
public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
private static final Logger LOG = LoggerFactory.getLogger(BlockDropRateLimitingStrategy.class);
private static final int blockTsLastSize = 8;
private static final int blockTsLastMask = 7;
+ private final int windowSecond;
private final long maxRate;
private final boolean blockEnable;
private final long maxBlockDuration; // 毫秒
@@ -33,15 +33,16 @@ public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
* @param maxBlockDuration 最长阻塞时长(毫秒), 值为0代表超速直接丢弃不执行阻塞
* @param resetBlockDuration 阻塞重置恢复时长, 超速阻塞后速率恢复正常多长时间后重置超速阻塞状态
*/
- public BlockDropRateLimitingStrategy(long maxRate, long maxBlockDuration, long resetBlockDuration) {
+ public BlockDropRateLimitingStrategy(int windowSecond, long maxRate, long maxBlockDuration, long resetBlockDuration) {
Preconditions.checkArgument(maxRate > 0);
Preconditions.checkArgument(maxBlockDuration >= 0);
Preconditions.checkArgument(resetBlockDuration >= 0);
+ this.windowSecond = windowSecond;
this.maxRate = maxRate;
this.blockEnable = maxBlockDuration > 0;
this.maxBlockDuration = maxBlockDuration;
this.resetBlockDuration = resetBlockDuration;
- this.stat = new SlidingWindowRateStat();
+ this.stat = new SlidingWindowRateStat(windowSecond);
this.firstBlockTs = 0;
this.lastBlockTs = 0;
this.firstBlockTsLast = new long[blockTsLastSize];
@@ -56,7 +57,7 @@ public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
@Override
public BlockDropRateLimitingStrategy withMaxRate(long maxRate) {
- return new BlockDropRateLimitingStrategy(maxRate, maxBlockDuration, resetBlockDuration);
+ return new BlockDropRateLimitingStrategy(windowSecond, maxRate, maxBlockDuration, resetBlockDuration);
}
@Override
@@ -100,10 +101,9 @@ public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
firstBlockTsLast[blockTsLastIndex] = firstBlockTs;
lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
} else {
- if(currentTs - lastBlockTs <= 300L){
- lastBlockTs = currentTs;
- lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
- }
+ lastBlockTs = currentTs;
+ lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
+
// drop立马return
if (lastBlockTs - firstBlockTs > maxBlockDuration) {
limitingDropCount += 1;
@@ -136,16 +136,19 @@ public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
if (hasLimitingStat && currentTs - limitingStatStartTs > 120000L) {
hasLimitingStat = false;
StringBuilder blockTsInfos = new StringBuilder();
- int idx;
- for (int i = 0; i < blockTsLastSize; i++) {
- idx = (blockTsLastIndex - i) & blockTsLastMask;
- blockTsInfos.append(new Timestamp(firstBlockTsLast[idx]).toString())
- .append("_").append(new Timestamp(lastBlockTsLast[idx]).toString()).append("_").append(lastBlockTsLast[idx] - firstBlockTsLast[idx]);
- if(i != blockTsLastSize - 1){
- blockTsInfos.append(",\n");
+ if(blockEnable){
+ blockTsInfos.append('\n');
+ int idx;
+ for (int i = 0; i < blockTsLastSize; i++) {
+ idx = (blockTsLastIndex - i) & blockTsLastMask;
+ blockTsInfos.append(new Timestamp(firstBlockTsLast[idx]).toString())
+ .append("_").append(new Timestamp(lastBlockTsLast[idx]).toString()).append("_").append(lastBlockTsLast[idx] - firstBlockTsLast[idx]);
+ if(i != blockTsLastSize - 1){
+ blockTsInfos.append(",\n");
+ }
}
}
- LOG.info("from {} to {} limiting info for max rate({} bytes), blockCount: {}, blockMs: {}, dropCount: {}, dropSize: {},blockTsInfos:\n{}.", new Timestamp(limitingStatStartTs).toString(), new Timestamp(currentTs).toString(),
+ LOG.info("from {} to {} limiting info for max rate({} bytes), blockCount: {}, blockMs: {}, dropCount: {}, dropSize: {},blockTsInfos:{}.", new Timestamp(limitingStatStartTs).toString(), new Timestamp(currentTs).toString(),
maxRate,limitingBlockCount, limitingBlockMs, limitingDropCount, limitingDropSize, blockTsInfos.toString()
);
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java
index 660ab60..4f5e7c5 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java
@@ -1,21 +1,32 @@
package com.geedgenetworks.connectors.kafka.rate;
+import org.apache.flink.util.Preconditions;
+
import java.io.Serializable;
// 专门用于实时统计发送字节速率, 输出最近一秒内速率
public class SlidingWindowRateStat implements Serializable {
- private static final long windowSize = 1000;
- private static final int bucketSize = 8;
- private static final int bucketMask = bucketSize - 1;
- private static final long bucketWindowSize = windowSize / bucketSize;
+ private final long windowSecond;
+ private final long windowSize;
+ private final int bucketSize;
+ private final long bucketWindowSize;
private final long[] buckets;
private int current;
private long currentWindowEndMs;
private long totalValue;
public SlidingWindowRateStat() {
+ this(5);
+ }
+
+ public SlidingWindowRateStat(int windowSecond) {
+ Preconditions.checkArgument(windowSecond > 0 && windowSecond <= 60);
+ this.windowSecond = windowSecond;
+ this.windowSize = windowSecond * 1000;
+ this.bucketWindowSize = 100;
+ this.bucketSize = (int) (windowSize / bucketWindowSize);
this.buckets = new long[bucketSize];
- this.current = (bucketSize - 1) & bucketMask;
+ this.current = bucketSize - 1;
this.currentWindowEndMs = System.currentTimeMillis() / bucketWindowSize * bucketWindowSize;
}
@@ -31,24 +42,27 @@ public class SlidingWindowRateStat implements Serializable {
currentUpdate(timeMs);
buckets[current] += value;
totalValue += value;
- return totalValue;
+ return totalValue / windowSecond;
}
public long getCurrentRate() {
long timeMs = System.currentTimeMillis();
currentUpdate(timeMs);
- return totalValue;
+ return totalValue / windowSecond;
}
public long getCurrentRate(long timeMs) {
currentUpdate(timeMs);
- return totalValue;
+ return totalValue / windowSecond;
}
private void currentUpdate(long timeMs){
if (timeMs > currentWindowEndMs) {
do {
- current = (current + 1) & bucketMask;
+ current = current + 1;
+ if(current >= bucketSize){
+ current = 0;
+ }
totalValue -= buckets[current];
buckets[current] = 0;
currentWindowEndMs += bucketWindowSize;
diff --git a/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java b/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java
index ebf902f..b25f60e 100644
--- a/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java
+++ b/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java
@@ -50,7 +50,7 @@ public class RateLimitingStrategyTest {
}
public static void testBlockDropRateLimitingStrategyForDrop() throws Exception{
- BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 0, 0 );
+ BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(1,600000, 0, 0 );
int times = 500 * 60;
long startTs = System.currentTimeMillis();
@@ -86,7 +86,7 @@ public class RateLimitingStrategyTest {
public static void testBlockDropRateLimitingStrategyForBlockDrop() throws Exception{
//BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 10000, 1000 );
- BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 40000, 1000 );
+ BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(1,600000, 40000, 1000 );
int times = 500 * 60;
long startTs = System.currentTimeMillis();
@@ -127,7 +127,7 @@ public class RateLimitingStrategyTest {
public static void testBlockDropRateLimitingStrategyForBlock() throws Exception{
//RateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 10000, 1000 );
- BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, TimeUtils.parseDuration("1d").toMillis(), 1000 );
+ BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(1,600000, TimeUtils.parseDuration("1d").toMillis(), 1000 );
int times = 500 * 60;
long startTs = System.currentTimeMillis();
@@ -203,7 +203,7 @@ public class RateLimitingStrategyTest {
Thread consumerThread = new Thread(() -> {
//BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 40000, 1000 );
- BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 30000, 1000 );
+ BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(1,600000, 30000, 1000 );
//BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 0, 1000 );
//BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, TimeUtils.parseDuration("1d").toMillis(), 1000 );
System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ",poll_before");