summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-04-28 21:58:30 +0800
committerdoufenghu <[email protected]>2024-04-28 21:58:30 +0800
commit08b9bbbf2f47e704611420e5735c38dd0dcec868 (patch)
treec2ff21a30a4711acb164e8551c2711109d87491b
parent09e2f6d23f5257213bcddbae67bee12b65bcf3dd (diff)
parent8f3b1bb2c58267df0c646a5bcb16555f37b6fdea (diff)
Merge branch 'develop' of https://git.mesalab.cn/galaxy/platform/groot-stream into develop
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java26
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java7
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java74
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java162
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/NoRateLimitingStrategy.java28
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStatus.java7
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategy.java15
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyType.java5
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java58
-rw-r--r--groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java28
-rw-r--r--groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java266
-rw-r--r--groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStatTest.java76
12 files changed, 744 insertions, 8 deletions
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
index b827b42..8ab6bc8 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java
@@ -1,8 +1,10 @@
package com.geedgenetworks.connectors.kafka;
+import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import java.time.Duration;
import java.util.List;
import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
@@ -22,6 +24,30 @@ public class KafkaConnectorOptions {
.withDescription("Optional flag to whether the producer should fail on errors, or only log them;\n"
+ "If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default.");
+ public static final ConfigOption<RateLimitingStrategyType> RATE_LIMITING_STRATEGY =
+ ConfigOptions.key("rate.limiting.strategy")
+ .enumType(RateLimitingStrategyType.class)
+ .defaultValue(RateLimitingStrategyType.NONE)
+ .withDescription("Rate limiting strategy, optionals:none,sliding_window, default none.");
+
+ public static final ConfigOption<String> RATE_LIMITING_LIMIT_RATE =
+ ConfigOptions.key( "rate.limiting.limit.rate")
+ .stringType()
+ .defaultValue("10Mbps")
+ .withDescription("Limit rate, examples: 10Mbps, 10Kbps, 10240bps.");
+
+ public static final ConfigOption<Duration> RATE_LIMITING_BLOCK_DURATION =
+ ConfigOptions.key("rate.limiting.block.duration")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(5))
+ .withDescription("对首次超出限流数据阻塞,最长阻塞多长时间后超出限流数据全部丢弃");
+
+ public static final ConfigOption<Duration> RATE_LIMITING_BLOCK_RESET_DURATION =
+ ConfigOptions.key("rate.limiting.block.reset.duration")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription("超速阻塞后速率恢复正常多长时间后重置超速阻塞状态");
+
public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS =
ConfigOptions.key( PROPERTIES_PREFIX + "bootstrap.servers")
.stringType()
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index 761d1ad..964aa94 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.connectors.kafka;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
import com.geedgenetworks.core.connector.format.EncodingFormat;
import com.geedgenetworks.core.connector.sink.SinkProvider;
import com.geedgenetworks.core.types.StructType;
@@ -19,19 +20,22 @@ public class KafkaSinkProvider implements SinkProvider {
private final String topic;
private final Properties properties;
private final boolean logFailuresOnly;
+ private final RateLimitingStrategy rateLimitingStrategy;
public KafkaSinkProvider(
StructType dataType,
EncodingFormat valueEncodingFormat,
String topic,
Properties properties,
- boolean logFailuresOnly
+ boolean logFailuresOnly,
+ RateLimitingStrategy rateLimitingStrategy
) {
this.dataType = dataType;
this.valueSerialization = valueEncodingFormat.createRuntimeEncoder(dataType);
this.topic = topic;
this.properties = properties;
this.logFailuresOnly = logFailuresOnly;
+ this.rateLimitingStrategy = rateLimitingStrategy;
}
@Override
@@ -43,6 +47,7 @@ public class KafkaSinkProvider implements SinkProvider {
Optional.empty()
);
kafkaProducer.setLogFailuresOnly(logFailuresOnly);
+ kafkaProducer.setRateLimitingStrategy(rateLimitingStrategy);
return dataStream.addSink(kafkaProducer);
}
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index 8829076..c3b483f 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -1,5 +1,9 @@
package com.geedgenetworks.connectors.kafka;
+import com.geedgenetworks.connectors.kafka.rate.BlockDropRateLimitingStrategy;
+import com.geedgenetworks.connectors.kafka.rate.NoRateLimitingStrategy;
+import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
+import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType;
import com.geedgenetworks.core.connector.format.DecodingFormat;
import com.geedgenetworks.core.connector.format.EncodingFormat;
import com.geedgenetworks.core.connector.sink.SinkProvider;
@@ -9,11 +13,9 @@ import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
import com.geedgenetworks.core.types.StructType;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.Preconditions;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*;
import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
@@ -58,7 +60,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
boolean logFailuresOnly = config.get(LOG_FAILURES_ONLY);
final Properties properties = getKafkaProperties(context.getOptions());
- return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, logFailuresOnly);
+ return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, logFailuresOnly, getRateLimitingStrategy(config));
}
@Override
@@ -74,7 +76,69 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(LOG_FAILURES_ONLY);
+ options.add(RATE_LIMITING_STRATEGY);
+ options.add(RATE_LIMITING_LIMIT_RATE);
+ options.add(RATE_LIMITING_BLOCK_DURATION);
+ options.add(RATE_LIMITING_BLOCK_RESET_DURATION);
return options;
}
+ private RateLimitingStrategy getRateLimitingStrategy(ReadableConfig config){
+ RateLimitingStrategyType strategyType = config.get(RATE_LIMITING_STRATEGY);
+ switch (strategyType){
+ case NONE:
+ return new NoRateLimitingStrategy();
+ case SLIDING_WINDOW:
+ return new BlockDropRateLimitingStrategy(
+ parseRateLimitingRate(config.get(RATE_LIMITING_LIMIT_RATE)),
+ config.get(RATE_LIMITING_BLOCK_DURATION).toMillis(),
+ config.get(RATE_LIMITING_BLOCK_RESET_DURATION).toMillis());
+ default:
+ throw new IllegalArgumentException("not supported strategy:" + strategyType);
+ }
+ }
+
+ private long parseRateLimitingRate(String text){
+ Preconditions.checkNotNull(text);
+ final String trimmed = text.trim();
+ Preconditions.checkArgument(!trimmed.isEmpty());
+
+ final int len = trimmed.length();
+ int pos = 0;
+
+ char current;
+ while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+ pos++;
+ }
+
+ final String number = trimmed.substring(0, pos);
+ final String unit = trimmed.substring(pos).trim().toLowerCase();
+
+ if (number.isEmpty()) {
+ throw new NumberFormatException("text does not start with a number");
+ }
+
+ final long value;
+ try {
+ value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("The value '" + number+ "' cannot be re represented as long.");
+ }
+
+ long multiplier;
+ if("mbps".equals(unit)){
+ multiplier = 1 << 20;
+ }else if("kbps".equals(unit)){
+ multiplier = 1 << 10;
+ }else if("bps".equals(unit)){
+ multiplier = 1;
+ }else if(unit.isEmpty()){
+ multiplier = 1;
+ }else{
+ throw new IllegalArgumentException(text);
+ }
+
+ // bit单位转为byte单位
+ return value * multiplier / 8;
+ }
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java
new file mode 100644
index 0000000..bb59486
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java
@@ -0,0 +1,162 @@
package com.geedgenetworks.connectors.kafka.rate;

import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;

/**
 * {@link RateLimitingStrategy} that blocks the sending thread when the observed
 * rate first exceeds {@code maxRate}, and degrades to dropping records once the
 * current block episode has lasted longer than {@code maxBlockDuration}.
 *
 * <p>Not thread-safe: intended for use by a single producer thread.
 */
public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(BlockDropRateLimitingStrategy.class);
    // Ring-buffer capacity (power of two) remembering recent block episodes for diagnostics.
    private static final int BLOCK_TS_LAST_SIZE = 8;
    private static final int BLOCK_TS_LAST_MASK = BLOCK_TS_LAST_SIZE - 1;
    private final long maxRate;                // bytes per second
    private final boolean blockEnable;         // false => drop immediately on overspeed
    private final long maxBlockDuration;       // ms; longest blocking per episode before dropping
    private final long resetBlockDuration;     // ms; quiet time after which the block state resets
    private final SlidingWindowRateStat stat;  // rolling one-second byte counter
    private int blockTsLastIndex;              // write position in the episode ring buffers
    private final long[] firstBlockTsLast;     // start timestamps of recent block episodes
    private final long[] lastBlockTsLast;      // end timestamps of recent block episodes
    private long firstBlockTs;                 // start of the current episode (0 = none yet)
    private long lastBlockTs;                  // last time the current episode was extended
    private boolean hasLimitingStat;           // true while limiting statistics await output
    private long limitingStatStartTs;
    private long limitingBlockCount;
    private long limitingBlockMs;
    private long limitingDropCount;
    private long limitingDropSize;

    /**
     * @param maxRate max rate in bytes per second; must be positive
     * @param maxBlockDuration longest blocking duration (ms) for records that first
     *        exceed the limit; 0 means overspeed records are dropped without blocking
     * @param resetBlockDuration how long (ms) the rate must stay normal after an
     *        overspeed block before the block state is reset
     */
    public BlockDropRateLimitingStrategy(long maxRate, long maxBlockDuration, long resetBlockDuration) {
        Preconditions.checkArgument(maxRate > 0);
        Preconditions.checkArgument(maxBlockDuration >= 0);
        Preconditions.checkArgument(resetBlockDuration >= 0);
        this.maxRate = maxRate;
        this.blockEnable = maxBlockDuration > 0;
        this.maxBlockDuration = maxBlockDuration;
        this.resetBlockDuration = resetBlockDuration;
        this.stat = new SlidingWindowRateStat();
        this.firstBlockTs = 0;
        this.lastBlockTs = 0;
        this.firstBlockTsLast = new long[BLOCK_TS_LAST_SIZE];
        this.lastBlockTsLast = new long[BLOCK_TS_LAST_SIZE];
        this.blockTsLastIndex = BLOCK_TS_LAST_MASK;
    }

    @Override
    public long maxRate() {
        return maxRate;
    }

    /** Returns a new strategy with the same durations but a different rate cap. */
    @Override
    public BlockDropRateLimitingStrategy withMaxRate(long maxRate) {
        return new BlockDropRateLimitingStrategy(maxRate, maxBlockDuration, resetBlockDuration);
    }

    /**
     * Accounts {@code value} bytes against the rate limit.
     *
     * @return NONE when under the limit; BLOCK when the caller was blocked until
     *         the rate recovered; DROP when the record should be discarded
     */
    @Override
    public RateLimitingStatus record(final long value) {
        final long currentTs = System.currentTimeMillis();
        outLimitingStat(currentTs);

        long currentRate = stat.getCurrentRate(currentTs);
        // ---- under the limit ----
        if (currentRate < maxRate) {
            stat.record(value);
            return RateLimitingStatus.NONE;
        }

        // Start a new limiting-statistics period on the first overspeed event.
        if (!hasLimitingStat) {
            hasLimitingStat = true;
            limitingStatStartTs = currentTs;
            limitingBlockCount = 0;
            limitingBlockMs = 0;
            limitingDropCount = 0;
            limitingDropSize = 0;
        }

        // ---- over the limit ----
        // -- blocking disabled: drop right away --
        if (!blockEnable) {
            limitingDropCount += 1;
            limitingDropSize += value;
            return RateLimitingStatus.DROP;
        }

        // -- block first, then drop --

        // First trigger, or first trigger after the reset window has elapsed.
        if (firstBlockTs == 0L || currentTs - lastBlockTs > resetBlockDuration) {
            firstBlockTs = currentTs;
            lastBlockTs = firstBlockTs;
            blockTsLastIndex = (blockTsLastIndex + 1) & BLOCK_TS_LAST_MASK;
            firstBlockTsLast[blockTsLastIndex] = firstBlockTs;
            lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
        } else {
            // Only extend the episode when overspeed events are nearly back-to-back.
            if (currentTs - lastBlockTs <= 300L) {
                lastBlockTs = currentTs;
                lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
            }
            // Episode already blocked for too long: drop without blocking again.
            if (lastBlockTs - firstBlockTs > maxBlockDuration) {
                limitingDropCount += 1;
                limitingDropSize += value;
                return RateLimitingStatus.DROP;
            }
        }

        // Block (polling in 5 ms steps) until the rate falls below the limit.
        long ts = currentTs;
        while (currentRate >= maxRate) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                // Restore the interrupt status before propagating so callers can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            ts = System.currentTimeMillis();
            currentRate = stat.getCurrentRate(ts);
        }
        limitingBlockCount += 1;
        limitingBlockMs += ts - currentTs;

        stat.record(value);
        return RateLimitingStatus.BLOCK;
    }

    /**
     * Emits the accumulated limiting statistics once the current statistics
     * period is older than two minutes, then clears the pending flag.
     * Package-private for tests.
     */
    void outLimitingStat(long currentTs) {
        if (hasLimitingStat && currentTs - limitingStatStartTs > 120000L) {
            hasLimitingStat = false;
            StringBuilder blockTsInfos = new StringBuilder();
            int idx;
            // Walk the episode ring buffer from the newest entry backwards.
            for (int i = 0; i < BLOCK_TS_LAST_SIZE; i++) {
                idx = (blockTsLastIndex - i) & BLOCK_TS_LAST_MASK;
                blockTsInfos.append(new Timestamp(firstBlockTsLast[idx]).toString())
                        .append("_").append(new Timestamp(lastBlockTsLast[idx]).toString()).append("_").append(lastBlockTsLast[idx] - firstBlockTsLast[idx]);
                if (i != BLOCK_TS_LAST_SIZE - 1) {
                    blockTsInfos.append(",\n");
                }
            }
            LOG.info("from {} to {} limiting info for max rate({} bytes), blockCount: {}, blockMs: {}, dropCount: {}, dropSize: {},blockTsInfos:\n{}.", new Timestamp(limitingStatStartTs).toString(), new Timestamp(currentTs).toString(),
                    maxRate, limitingBlockCount, limitingBlockMs, limitingDropCount, limitingDropSize, blockTsInfos.toString()
            );
        }
    }

    @Override
    public String toString() {
        return "BlockDropRateLimitingStrategy{" +
                "maxRate=" + maxRate +
                " bytes/second, maxBlockDuration=" + maxBlockDuration +
                " ms, resetBlockDuration=" + resetBlockDuration +
                " ms}";
    }
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/NoRateLimitingStrategy.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/NoRateLimitingStrategy.java
new file mode 100644
index 0000000..25d34a2
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/NoRateLimitingStrategy.java
@@ -0,0 +1,28 @@
+package com.geedgenetworks.connectors.kafka.rate;
+
+public class NoRateLimitingStrategy implements RateLimitingStrategy {
+ @Override
+ public boolean rateLimited() {
+ return false; // 实际不限速
+ }
+
+ @Override
+ public long maxRate() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public NoRateLimitingStrategy withMaxRate(long maxRate) {
+ return this;
+ }
+
+ @Override
+ public RateLimitingStatus record(long value) {
+ return RateLimitingStatus.NONE;
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName();
+ }
+}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStatus.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStatus.java
new file mode 100644
index 0000000..fa852f8
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStatus.java
@@ -0,0 +1,7 @@
package com.geedgenetworks.connectors.kafka.rate;

/**
 * Outcome of offering one record to a {@link RateLimitingStrategy}.
 */
public enum RateLimitingStatus {
    NONE,   // under the limit: send the record immediately
    BLOCK,  // over the limit: the caller was blocked until the rate recovered
    DROP    // over the limit for too long: the record should be discarded
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategy.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategy.java
new file mode 100644
index 0000000..d179a42
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategy.java
@@ -0,0 +1,15 @@
package com.geedgenetworks.connectors.kafka.rate;

import java.io.Serializable;

/**
 * Strategy for limiting the byte rate of a Kafka producer.
 * Serializable so it can be shipped to sink subtasks with the operator.
 */
public interface RateLimitingStrategy extends Serializable {
    /** Whether this strategy actually limits; returning false disables limiting in the sink. */
    default boolean rateLimited(){
        return true;
    }

    /** Maximum allowed rate in bytes per second. */
    long maxRate();

    /** Returns a strategy with the same behavior but the given max rate (used to split the cap per subtask). */
    RateLimitingStrategy withMaxRate(long maxRate);

    /** Accounts {@code value} bytes and reports how the record should be handled. */
    RateLimitingStatus record(long value);
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyType.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyType.java
new file mode 100644
index 0000000..23241e3
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyType.java
@@ -0,0 +1,5 @@
package com.geedgenetworks.connectors.kafka.rate;

/**
 * Selects the rate limiting implementation for the Kafka sink:
 * NONE disables limiting; SLIDING_WINDOW uses the block-then-drop strategy
 * backed by a one-second sliding window.
 */
public enum RateLimitingStrategyType {
    NONE, SLIDING_WINDOW;
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java
new file mode 100644
index 0000000..660ab60
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java
@@ -0,0 +1,58 @@
import java.io.Serializable;
import java.util.Arrays;

/**
 * Tracks the number of bytes recorded during the most recent one-second
 * window, split into eight 125 ms buckets that are recycled as time advances.
 * Used to measure the instantaneous send rate in bytes per second.
 *
 * <p>Not thread-safe: intended for use by a single producer thread.
 */
public class SlidingWindowRateStat implements Serializable {
    private static final long WINDOW_SIZE = 1000;                            // ms covered by the whole window
    private static final int BUCKET_SIZE = 8;                                // bucket count (power of two)
    private static final int BUCKET_MASK = BUCKET_SIZE - 1;
    private static final long BUCKET_WINDOW_SIZE = WINDOW_SIZE / BUCKET_SIZE; // ms covered per bucket
    private final long[] buckets;      // per-bucket byte counters
    private int current;               // index of the bucket receiving new records
    private long currentWindowEndMs;   // end timestamp of the current bucket's time slice
    private long totalValue;           // running sum over all live buckets

    public SlidingWindowRateStat() {
        this.buckets = new long[BUCKET_SIZE];
        this.current = BUCKET_SIZE - 1;
        // Align the first bucket boundary to a multiple of the bucket duration.
        this.currentWindowEndMs = System.currentTimeMillis() / BUCKET_WINDOW_SIZE * BUCKET_WINDOW_SIZE;
    }

    /** Adds {@code value} bytes at the current wall-clock time. */
    public void record(long value) {
        long timeMs = System.currentTimeMillis();
        currentUpdate(timeMs);
        buckets[current] += value;
        totalValue += value;
    }

    /** Adds {@code value} bytes and returns the total bytes seen in the last second. */
    public long recordAndGetCurrentRate(long value) {
        long timeMs = System.currentTimeMillis();
        currentUpdate(timeMs);
        buckets[current] += value;
        totalValue += value;
        return totalValue;
    }

    /** Returns the bytes recorded during the last second. */
    public long getCurrentRate() {
        long timeMs = System.currentTimeMillis();
        currentUpdate(timeMs);
        return totalValue;
    }

    /** Returns the bytes recorded during the second preceding {@code timeMs}. */
    public long getCurrentRate(long timeMs) {
        currentUpdate(timeMs);
        return totalValue;
    }

    /**
     * Rotates expired buckets so that {@code current} points at the bucket
     * containing {@code timeMs}. After an idle gap spanning the whole window
     * the state is cleared in O(1) instead of rotating bucket by bucket
     * (the old do/while rotated once per elapsed 125 ms slice, which could
     * mean hundreds of thousands of iterations after a long pause).
     */
    private void currentUpdate(long timeMs) {
        if (timeMs <= currentWindowEndMs) {
            return;
        }
        // Number of bucket boundaries crossed since the last update (>= 1 here).
        final long steps = (timeMs - currentWindowEndMs + BUCKET_WINDOW_SIZE - 1) / BUCKET_WINDOW_SIZE;
        if (steps >= BUCKET_SIZE) {
            // The entire window has expired: reset everything in one shot.
            Arrays.fill(buckets, 0L);
            totalValue = 0;
            current = (int) ((current + steps) & BUCKET_MASK);
            currentWindowEndMs += steps * BUCKET_WINDOW_SIZE;
        } else {
            for (long i = 0; i < steps; i++) {
                current = (current + 1) & BUCKET_MASK;
                totalValue -= buckets[current];
                buckets[current] = 0;
                currentWindowEndMs += BUCKET_WINDOW_SIZE;
            }
        }
    }
}
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
index ff0bb34..09e8190 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
@@ -17,6 +17,8 @@
package org.apache.flink.streaming.connectors.kafka;
+import com.geedgenetworks.connectors.kafka.rate.RateLimitingStatus;
+import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
import com.geedgenetworks.core.metrics.InternalMetrics;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
@@ -253,6 +255,8 @@ public class GrootFlinkKafkaProducer<IN>
* Cache of metrics to replace already registered metrics instead of overwriting existing ones.
*/
private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
+ private RateLimitingStrategy rateLimitingStrategy;
+ private boolean rateLimitingEnable;
private transient InternalMetrics internalMetrics;
@@ -760,6 +764,11 @@ public class GrootFlinkKafkaProducer<IN>
this.logFailuresOnly = logFailuresOnly;
}
    /**
     * Installs the rate limiting strategy applied when sending records.
     * Limiting is enabled only when the strategy is non-null and reports
     * {@code rateLimited() == true}; otherwise records are sent unthrottled.
     */
    public void setRateLimitingStrategy(RateLimitingStrategy rateLimitingStrategy) {
        this.rateLimitingStrategy = rateLimitingStrategy;
        this.rateLimitingEnable = rateLimitingStrategy != null && rateLimitingStrategy.rateLimited();
    }
+
/**
* Disables the propagation of exceptions thrown when committing presumably timed out Kafka
* transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
@@ -781,6 +790,12 @@ public class GrootFlinkKafkaProducer<IN>
/** Initializes the connection to Kafka. */
@Override
public void open(Configuration configuration) throws Exception {
+ if(rateLimitingEnable){
+ int subtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ long subTaskMaxRate = rateLimitingStrategy.maxRate() / subtasks;
+ rateLimitingStrategy = rateLimitingStrategy.withMaxRate(subTaskMaxRate);
+ LOG.error("rateLimitingStrategy: {}", rateLimitingStrategy);
+ }
internalMetrics = new InternalMetrics(getRuntimeContext());
if (logFailuresOnly) {
callback =
@@ -914,8 +929,17 @@ public class GrootFlinkKafkaProducer<IN>
record = kafkaSchema.serialize(next, context.timestamp());
}
- pendingRecords.incrementAndGet();
- transaction.producer.send(record, callback);
+ if (!rateLimitingEnable) {
+ pendingRecords.incrementAndGet();
+ transaction.producer.send(record, callback);
+ } else {
+ if (rateLimitingStrategy.record(record.value().length) == RateLimitingStatus.DROP) {
+ internalMetrics.incrementDroppedEvents();
+ } else {
+ pendingRecords.incrementAndGet();
+ transaction.producer.send(record, callback);
+ }
+ }
} catch (Exception e) {
internalMetrics.incrementErrorEvents();
LOG.error("serialize error", e);
diff --git a/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java b/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java
new file mode 100644
index 0000000..ebf902f
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java
@@ -0,0 +1,266 @@
package com.geedgenetworks.connectors.kafka.rate;

import org.apache.flink.util.TimeUtils;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Manual (main-driven) test harness for the rate limiting strategies.
 * Each scenario feeds ~1 MB/s (2000 bytes every 2 ms) for 30 s against a
 * 600 KB/s limit, then idles for another 30 s, printing pass/drop byte
 * counters roughly every 100 ms. Not a unit test: results are timing
 * dependent and verified by eye.
 */
public class RateLimitingStrategyTest {

    public static void main(String[] args) throws Exception {
        //testNoRateLimitingStrategy();
        //testBlockDropRateLimitingStrategyForDrop();
        //testBlockDropRateLimitingStrategyForBlockDrop();
        //testBlockDropRateLimitingStrategyForBlock();

        testBlockDropRateLimitingStrategyForDropMultiThread();
    }

    /** Baseline: with no limiting, everything passes and nothing is dropped. */
    public static void testNoRateLimitingStrategy() throws Exception{
        RateLimitingStrategy strategy = new NoRateLimitingStrategy();

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long lastTs = startTs;
        long diff;
        long pass = 0;
        long drop = 0;
        for (int i = 0; i < times; i++) {
            // Only produce during the first 30 s of the 60 s run.
            if(i < 500 * 30){
                long value = 2000;
                RateLimitingStatus status = strategy.record(value);
                if(status == RateLimitingStatus.DROP){
                    drop += value;
                }else{
                    pass += value;
                }
            }
            ts = System.currentTimeMillis();
            if(ts - lastTs >= 100){
                lastTs = ts;
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            }
            // Pace the loop to one iteration per 2 ms of wall-clock time.
            diff = startTs + i * 2 - ts;
            if(diff > 0 ){
                Thread.sleep(diff);
            }
        }

    }

    /** maxBlockDuration=0: overspeed records are dropped immediately, never blocked. */
    public static void testBlockDropRateLimitingStrategyForDrop() throws Exception{
        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 0, 0 );

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long lastTs = startTs;
        long diff;
        long pass = 0;
        long drop = 0;
        for (int i = 0; i < times; i++) {
            if(i < 500 * 30){
                long value = 2000;
                RateLimitingStatus status = strategy.record(value);
                if(status == RateLimitingStatus.DROP){
                    drop += value;
                }else{
                    pass += value;
                }
            }
            ts = System.currentTimeMillis();
            if(ts - lastTs >= 100){
                lastTs = ts;
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            }
            diff = startTs + i * 2 - ts;
            if(diff > 0 ){
                Thread.sleep(diff);
            }
        }


        // Force the stats flush by pretending 5 minutes have passed.
        strategy.outLimitingStat(System.currentTimeMillis() + 300000L);
    }

    /** 40 s block budget: expect blocking first, then drops once the budget is spent. */
    public static void testBlockDropRateLimitingStrategyForBlockDrop() throws Exception{
        //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 10000, 1000 );
        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 40000, 1000 );

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long lastTs = startTs;
        long diff;
        long diffSum = 0;
        long pass = 0;
        long drop = 0;
        for (int i = 0; i < times; i++) {
            ts = System.currentTimeMillis();
            if(ts - lastTs >= 100){
                lastTs = ts;
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            }
            diff = startTs + (i + 1) * 2 - ts;
            if(diff > 0 ){
                if(i < 500 * 30){
                    // Accumulate slack time during the producing phase for diagnostics.
                    diffSum += diff;
                }
                Thread.sleep(diff);
            }

            if(i < 500 * 30){
                long value = 2000;
                RateLimitingStatus status = strategy.record(value);
                if(status == RateLimitingStatus.DROP){
                    drop += value;
                }else{
                    pass += value;
                }
            }
        }

        System.out.println("diffSum:" + diffSum);
        strategy.outLimitingStat(System.currentTimeMillis() + 300000L);
    }

    /** Effectively unlimited block budget (1 day): expect blocking only, no drops. */
    public static void testBlockDropRateLimitingStrategyForBlock() throws Exception{
        //RateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 10000, 1000 );
        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, TimeUtils.parseDuration("1d").toMillis(), 1000 );

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long lastTs = startTs;
        long diff;
        long diffSum = 0;
        long pass = 0;
        long drop = 0;
        for (int i = 0; i < times; i++) {
            ts = System.currentTimeMillis();
            if(ts - lastTs >= 100){
                lastTs = ts;
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            }
            diff = startTs + (i + 1) * 2 - ts;
            if(diff > 0 ){
                if(i < 500 * 30){
                    diffSum += diff;
                }
                Thread.sleep(diff);
            }

            if(i < 500 * 30){
                long value = 2000;
                RateLimitingStatus status = strategy.record(value);
                if(status == RateLimitingStatus.DROP){
                    drop += value;
                }else{
                    pass += value;
                }
            }

        }

        System.out.println("diffSum:" + diffSum);
        strategy.outLimitingStat(System.currentTimeMillis() + 300000L);
    }


    /**
     * Producer/consumer variant: one thread enqueues at ~1 MB/s for 30 s, a
     * second thread drains the queue (at a slower pace) through the strategy.
     * Checks blocking in the consumer does not distort accounting.
     */
    public static void testBlockDropRateLimitingStrategyForDropMultiThread() throws Exception{
        final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>(1000000);

        Thread producerThread = new Thread(() -> {
            int times = 500 * 60;
            long startTs = System.currentTimeMillis();
            long ts;
            long diff;
            for (int i = 0; i < times; i++) {
                ts = System.currentTimeMillis();
                diff = startTs + (i + 1) * 2 - ts;
                if(diff > 0 ){
                    try {
                        Thread.sleep(diff);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

                if(i < 500 * 30){
                    long value = 2000;
                    try {
                        queue.put(value);
                        //System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ",put");
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }

            System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + "producerThread end");
        });

        Thread consumerThread = new Thread(() -> {
            //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 40000, 1000 );
            BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 30000, 1000 );
            //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 0, 1000 );
            //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, TimeUtils.parseDuration("1d").toMillis(), 1000 );
            System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ",poll_before");
            long startTs = System.currentTimeMillis();
            long ts;
            long lastTs = startTs;
            long pass = 0;
            long drop = 0;
            int i = 1;
            long diff;
            while (true){
                try {
                    // NOTE(review): 60 MICROSECONDS is a very short poll timeout used as
                    // the end-of-input signal; possibly MILLISECONDS was intended — confirm.
                    Long value = queue.poll(60, TimeUnit.MICROSECONDS);
                    //System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ",poll");
                    if(value == null){
                        break;
                    }

                    ts = System.currentTimeMillis();

                    // Consume at half the producing speed (4 ms per record) to build backlog.
                    diff = startTs + (i + 1) * 2 * 12 / 6 - ts;
                    //diff = startTs + (i + 1) * 2 - ts;
                    //diff = startTs + (i + 1) * 2* 11 / 6 - ts;
                    if(diff > 0 ){
                        try {
                            Thread.sleep(diff);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    i++;

                    if(ts - lastTs >= 100){
                        lastTs = ts;
                        System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
                    }

                    RateLimitingStatus status = strategy.record(value);
                    if(status == RateLimitingStatus.DROP){
                        drop += value;
                    }else{
                        pass += value;
                    }
                } catch (InterruptedException e) {
                    break;
                }
            }

            System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            strategy.outLimitingStat(System.currentTimeMillis() + 300000L);
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();
    }

}
diff --git a/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStatTest.java b/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStatTest.java
new file mode 100644
index 0000000..65c3641
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStatTest.java
@@ -0,0 +1,76 @@
package com.geedgenetworks.connectors.kafka.rate;

// NOTE(review): this import is redundant — SlidingWindowRateStat is in the same package.
import com.geedgenetworks.connectors.kafka.rate.SlidingWindowRateStat;

/**
 * Manual (main-driven) harness for {@link SlidingWindowRateStat}: feeds
 * 2000 bytes every 2 ms (~1 MB/s) for 30 s then idles for 30 s, printing
 * the measured one-second rate. Results are timing dependent and verified
 * by eye, not asserted.
 */
public class SlidingWindowRateStatTest {

    public static void main(String[] args) throws Exception{
        //testRate();
        testRateLimit();
    }

    /** Prints the raw measured rate every ~100 ms while producing, then while idle. */
    public static void testRate() throws Exception{
        SlidingWindowRateStat stat = new SlidingWindowRateStat();

        System.out.println(new java.sql.Timestamp(System.currentTimeMillis()));

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long diff;
        for (int i = 0; i < times; i++) {
            // Only produce during the first 30 s of the 60 s run.
            if(i < 500 * 30){
                stat.record(2000);
            }
            ts = System.currentTimeMillis();
            if(i % 50 == 0){
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + stat.getCurrentRate());
            }
            // Pace the loop to one iteration per 2 ms of wall-clock time.
            diff = startTs + i * 2 - ts;
            if(diff > 0 ){
                Thread.sleep(diff);
            }
        }

    }

    /**
     * Simulates a 600 KB/s limiter on top of the stat: records are only
     * counted while under the limit, and transitions into/out of the limited
     * state are printed with their iteration index.
     */
    public static void testRateLimit() throws Exception{
        SlidingWindowRateStat stat = new SlidingWindowRateStat();

        System.out.println(new java.sql.Timestamp(System.currentTimeMillis()));

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long diff;
        long rate;
        boolean limit = false;
        for (int i = 0; i < times; i++) {
            rate = stat.getCurrentRate();
            if(rate < 600000){
                // Leaving the limited state: log the transition.
                if(limit == true){
                    limit = false;
                    System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + "," + rate +", limit:" + i);
                }
                if(i < 500 * 30){
                    stat.record(2000);
                }
            }else{
                // Entering the limited state: log the transition.
                if(limit == false){
                    limit = true;
                    System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + "," + rate +", limit:" + i);
                }
            }

            ts = System.currentTimeMillis();
            if(i % 50 == 0){
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + stat.getCurrentRate());
            }
            diff = startTs + i * 2 - ts;
            if(diff > 0 ){
                Thread.sleep(diff);
            }
        }

    }
}