From cb6fc7984ae0714b41c96e1835872d7649db9857 Mon Sep 17 00:00:00 2001 From: lifengchao Date: Sun, 28 Apr 2024 18:25:48 +0800 Subject: GAL-557 Groot Stream Kafka Sink 支持限速配置 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../connectors/kafka/KafkaConnectorOptions.java | 26 ++ .../connectors/kafka/KafkaSinkProvider.java | 7 +- .../connectors/kafka/KafkaTableFactory.java | 74 +++++- .../kafka/rate/BlockDropRateLimitingStrategy.java | 162 +++++++++++++ .../kafka/rate/NoRateLimitingStrategy.java | 28 +++ .../connectors/kafka/rate/RateLimitingStatus.java | 7 + .../kafka/rate/RateLimitingStrategy.java | 15 ++ .../kafka/rate/RateLimitingStrategyType.java | 5 + .../kafka/rate/SlidingWindowRateStat.java | 58 +++++ .../connectors/kafka/GrootFlinkKafkaProducer.java | 28 ++- .../kafka/rate/RateLimitingStrategyTest.java | 266 +++++++++++++++++++++ .../kafka/rate/SlidingWindowRateStatTest.java | 76 ++++++ 12 files changed, 744 insertions(+), 8 deletions(-) create mode 100644 groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java create mode 100644 groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/NoRateLimitingStrategy.java create mode 100644 groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStatus.java create mode 100644 groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategy.java create mode 100644 groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyType.java create mode 100644 groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStat.java create mode 100644 groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/RateLimitingStrategyTest.java create mode 100644 
groot-connectors/connector-kafka/src/test/java/com/geedgenetworks/connectors/kafka/rate/SlidingWindowRateStatTest.java diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java index b827b42..8ab6bc8 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptions.java @@ -1,8 +1,10 @@ package com.geedgenetworks.connectors.kafka; +import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import java.time.Duration; import java.util.List; import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX; @@ -22,6 +24,30 @@ public class KafkaConnectorOptions { .withDescription("Optional flag to whether the producer should fail on errors, or only log them;\n" + "If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default."); + public static final ConfigOption RATE_LIMITING_STRATEGY = + ConfigOptions.key("rate.limiting.strategy") + .enumType(RateLimitingStrategyType.class) + .defaultValue(RateLimitingStrategyType.NONE) + .withDescription("Rate limiting strategy, optionals:none,sliding_window, default none."); + + public static final ConfigOption RATE_LIMITING_LIMIT_RATE = + ConfigOptions.key( "rate.limiting.limit.rate") + .stringType() + .defaultValue("10Mbps") + .withDescription("Limit rate, examples: 10Mbps, 10Kbps, 10240bps."); + + public static final ConfigOption RATE_LIMITING_BLOCK_DURATION = + ConfigOptions.key("rate.limiting.block.duration") + .durationType() + .defaultValue(Duration.ofMinutes(5)) + 
.withDescription("对首次超出限流数据阻塞,最长阻塞多长时间后超出限流数据全部丢弃"); + + public static final ConfigOption RATE_LIMITING_BLOCK_RESET_DURATION = + ConfigOptions.key("rate.limiting.block.reset.duration") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription("超速阻塞后速率恢复正常多长时间后重置超速阻塞状态"); + public static final ConfigOption PROPS_BOOTSTRAP_SERVERS = ConfigOptions.key( PROPERTIES_PREFIX + "bootstrap.servers") .stringType() diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java index 761d1ad..964aa94 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java @@ -1,6 +1,7 @@ package com.geedgenetworks.connectors.kafka; import com.geedgenetworks.common.Event; +import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy; import com.geedgenetworks.core.connector.format.EncodingFormat; import com.geedgenetworks.core.connector.sink.SinkProvider; import com.geedgenetworks.core.types.StructType; @@ -19,19 +20,22 @@ public class KafkaSinkProvider implements SinkProvider { private final String topic; private final Properties properties; private final boolean logFailuresOnly; + private final RateLimitingStrategy rateLimitingStrategy; public KafkaSinkProvider( StructType dataType, EncodingFormat valueEncodingFormat, String topic, Properties properties, - boolean logFailuresOnly + boolean logFailuresOnly, + RateLimitingStrategy rateLimitingStrategy ) { this.dataType = dataType; this.valueSerialization = valueEncodingFormat.createRuntimeEncoder(dataType); this.topic = topic; this.properties = properties; this.logFailuresOnly = logFailuresOnly; + this.rateLimitingStrategy = rateLimitingStrategy; } @Override @@ -43,6 +47,7 @@ public 
class KafkaSinkProvider implements SinkProvider { Optional.empty() ); kafkaProducer.setLogFailuresOnly(logFailuresOnly); + kafkaProducer.setRateLimitingStrategy(rateLimitingStrategy); return dataStream.addSink(kafkaProducer); } } diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java index 8829076..c3b483f 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java @@ -1,5 +1,9 @@ package com.geedgenetworks.connectors.kafka; +import com.geedgenetworks.connectors.kafka.rate.BlockDropRateLimitingStrategy; +import com.geedgenetworks.connectors.kafka.rate.NoRateLimitingStrategy; +import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy; +import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType; import com.geedgenetworks.core.connector.format.DecodingFormat; import com.geedgenetworks.core.connector.format.EncodingFormat; import com.geedgenetworks.core.connector.sink.SinkProvider; @@ -9,11 +13,9 @@ import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper; import com.geedgenetworks.core.types.StructType; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.Preconditions; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; +import java.util.*; import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*; import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX; @@ -58,7 +60,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory { boolean logFailuresOnly = 
config.get(LOG_FAILURES_ONLY); final Properties properties = getKafkaProperties(context.getOptions()); - return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, logFailuresOnly); + return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, logFailuresOnly, getRateLimitingStrategy(config)); } @Override @@ -74,7 +76,69 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory { public Set> optionalOptions() { final Set> options = new HashSet<>(); options.add(LOG_FAILURES_ONLY); + options.add(RATE_LIMITING_STRATEGY); + options.add(RATE_LIMITING_LIMIT_RATE); + options.add(RATE_LIMITING_BLOCK_DURATION); + options.add(RATE_LIMITING_BLOCK_RESET_DURATION); return options; } + private RateLimitingStrategy getRateLimitingStrategy(ReadableConfig config){ + RateLimitingStrategyType strategyType = config.get(RATE_LIMITING_STRATEGY); + switch (strategyType){ + case NONE: + return new NoRateLimitingStrategy(); + case SLIDING_WINDOW: + return new BlockDropRateLimitingStrategy( + parseRateLimitingRate(config.get(RATE_LIMITING_LIMIT_RATE)), + config.get(RATE_LIMITING_BLOCK_DURATION).toMillis(), + config.get(RATE_LIMITING_BLOCK_RESET_DURATION).toMillis()); + default: + throw new IllegalArgumentException("not supported strategy:" + strategyType); + } + } + + private long parseRateLimitingRate(String text){ + Preconditions.checkNotNull(text); + final String trimmed = text.trim(); + Preconditions.checkArgument(!trimmed.isEmpty()); + + final int len = trimmed.length(); + int pos = 0; + + char current; + while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { + pos++; + } + + final String number = trimmed.substring(0, pos); + final String unit = trimmed.substring(pos).trim().toLowerCase(); + + if (number.isEmpty()) { + throw new NumberFormatException("text does not start with a number"); + } + + final long value; + try { + value = Long.parseLong(number); // this throws a NumberFormatException on 
overflow + } catch (NumberFormatException e) { + throw new IllegalArgumentException("The value '" + number+ "' cannot be re represented as long."); + } + + long multiplier; + if("mbps".equals(unit)){ + multiplier = 1 << 20; + }else if("kbps".equals(unit)){ + multiplier = 1 << 10; + }else if("bps".equals(unit)){ + multiplier = 1; + }else if(unit.isEmpty()){ + multiplier = 1; + }else{ + throw new IllegalArgumentException(text); + } + + // bit单位转为byte单位 + return value * multiplier / 8; + } } diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java new file mode 100644 index 0000000..bb59486 --- /dev/null +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/rate/BlockDropRateLimitingStrategy.java @@ -0,0 +1,162 @@ +package com.geedgenetworks.connectors.kafka.rate; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Timestamp; +import java.util.Arrays; + +public class BlockDropRateLimitingStrategy implements RateLimitingStrategy { + private static final Logger LOG = LoggerFactory.getLogger(BlockDropRateLimitingStrategy.class); + private static final int blockTsLastSize = 8; + private static final int blockTsLastMask = 7; + private final long maxRate; + private final boolean blockEnable; + private final long maxBlockDuration; // 毫秒 + private final long resetBlockDuration; // 毫秒 + private final SlidingWindowRateStat stat; + private int blockTsLastIndex; + private long[] firstBlockTsLast; + private long[] lastBlockTsLast; + private long firstBlockTs; + private long lastBlockTs; + private boolean hasLimitingStat; // 触发超速限制后, 输出统计信息 + private long limitingStatStartTs; + private long limitingBlockCount; + private long limitingBlockMs; + private long limitingDropCount; 
package com.geedgenetworks.connectors.kafka.rate;

import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;

/**
 * Rate limiting strategy that, once the measured rate reaches {@code maxRate},
 * first blocks the calling thread until the rate drops, and starts dropping
 * records outright when a blocking episode lasts longer than
 * {@code maxBlockDuration}. With {@code maxBlockDuration == 0} over-rate
 * records are dropped immediately without blocking.
 *
 * <p>Not thread-safe; intended to be called from a single producing thread.
 */
public class BlockDropRateLimitingStrategy implements RateLimitingStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(BlockDropRateLimitingStrategy.class);
    /** Capacity of the ring buffers remembering recent block episodes (power of two). */
    private static final int BLOCK_TS_LAST_SIZE = 8;
    private static final int BLOCK_TS_LAST_MASK = BLOCK_TS_LAST_SIZE - 1;

    private final long maxRate;               // bytes per second
    private final boolean blockEnable;        // false => over-rate records are dropped immediately
    private final long maxBlockDuration;      // milliseconds
    private final long resetBlockDuration;    // milliseconds
    private final SlidingWindowRateStat stat;

    // Ring buffers holding the first/last timestamps of the most recent block
    // episodes, reported in the periodic limiting log line.
    private int blockTsLastIndex;
    private long[] firstBlockTsLast;
    private long[] lastBlockTsLast;
    private long firstBlockTs;
    private long lastBlockTs;

    // Once limiting triggers, statistics accumulate and are logged periodically.
    private boolean hasLimitingStat;
    private long limitingStatStartTs;
    private long limitingBlockCount;
    private long limitingBlockMs;
    private long limitingDropCount;
    private long limitingDropSize;

    /**
     * @param maxRate            rate limit in bytes per second
     * @param maxBlockDuration   longest blocking time in ms; 0 means drop over-rate
     *                           records immediately without blocking
     * @param resetBlockDuration ms the rate must stay normal after a block episode
     *                           before the episode state is reset
     */
    public BlockDropRateLimitingStrategy(long maxRate, long maxBlockDuration, long resetBlockDuration) {
        Preconditions.checkArgument(maxRate > 0);
        Preconditions.checkArgument(maxBlockDuration >= 0);
        Preconditions.checkArgument(resetBlockDuration >= 0);
        this.maxRate = maxRate;
        this.blockEnable = maxBlockDuration > 0;
        this.maxBlockDuration = maxBlockDuration;
        this.resetBlockDuration = resetBlockDuration;
        this.stat = new SlidingWindowRateStat();
        this.firstBlockTs = 0;
        this.lastBlockTs = 0;
        this.firstBlockTsLast = new long[BLOCK_TS_LAST_SIZE];
        this.lastBlockTsLast = new long[BLOCK_TS_LAST_SIZE];
        this.blockTsLastIndex = BLOCK_TS_LAST_MASK;
    }

    @Override
    public long maxRate() {
        return maxRate;
    }

    @Override
    public BlockDropRateLimitingStrategy withMaxRate(long maxRate) {
        return new BlockDropRateLimitingStrategy(maxRate, maxBlockDuration, resetBlockDuration);
    }

    @Override
    public RateLimitingStatus record(final long value) {
        final long currentTs = System.currentTimeMillis();
        outLimitingStat(currentTs);

        long currentRate = stat.getCurrentRate(currentTs);
        // ---- under the limit ----
        if (currentRate < maxRate) {
            stat.record(value);
            return RateLimitingStatus.NONE;
        }

        // Arm the limiting statistics accumulator on the first over-rate record.
        if (!hasLimitingStat) {
            hasLimitingStat = true;
            limitingStatStartTs = currentTs;
            limitingBlockCount = 0;
            limitingBlockMs = 0;
            limitingDropCount = 0;
            limitingDropSize = 0;
        }

        // ---- over the limit ----
        // -- blocking disabled: drop immediately --
        if (!blockEnable) {
            limitingDropCount += 1;
            limitingDropSize += value;
            return RateLimitingStatus.DROP;
        }

        // -- block first, drop once the episode lasts too long --

        // First trigger, or first trigger after the episode state was reset.
        if (firstBlockTs == 0L || currentTs - lastBlockTs > resetBlockDuration) {
            firstBlockTs = currentTs;
            lastBlockTs = firstBlockTs;
            blockTsLastIndex = (blockTsLastIndex + 1) & BLOCK_TS_LAST_MASK;
            firstBlockTsLast[blockTsLastIndex] = firstBlockTs;
            lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
        } else {
            // Only extend the episode if the previous block was recent (<= 300 ms ago).
            if (currentTs - lastBlockTs <= 300L) {
                lastBlockTs = currentTs;
                lastBlockTsLast[blockTsLastIndex] = lastBlockTs;
            }
            // Episode exceeded the allowed blocking time: drop and return right away.
            if (lastBlockTs - firstBlockTs > maxBlockDuration) {
                limitingDropCount += 1;
                limitingDropSize += value;
                return RateLimitingStatus.DROP;
            }
        }

        // Block the caller until the measured rate falls below the limit.
        long ts = currentTs;
        while (currentRate >= maxRate) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            ts = System.currentTimeMillis();
            currentRate = stat.getCurrentRate(ts);
        }
        limitingBlockCount += 1;
        limitingBlockMs += ts - currentTs;

        stat.record(value);
        return RateLimitingStatus.BLOCK;
    }

    /**
     * Logs the accumulated limiting statistics once at least two minutes have
     * passed since limiting first triggered, then re-arms the accumulator.
     */
    void outLimitingStat(long currentTs) {
        if (hasLimitingStat && currentTs - limitingStatStartTs > 120000L) {
            hasLimitingStat = false;
            StringBuilder blockTsInfos = new StringBuilder();
            int idx;
            for (int i = 0; i < BLOCK_TS_LAST_SIZE; i++) {
                idx = (blockTsLastIndex - i) & BLOCK_TS_LAST_MASK;
                blockTsInfos.append(new Timestamp(firstBlockTsLast[idx]).toString())
                        .append("_").append(new Timestamp(lastBlockTsLast[idx]).toString())
                        .append("_").append(lastBlockTsLast[idx] - firstBlockTsLast[idx]);
                if (i != BLOCK_TS_LAST_SIZE - 1) {
                    blockTsInfos.append(",\n");
                }
            }
            LOG.info("from {} to {} limiting info for max rate({} bytes), blockCount: {}, blockMs: {}, dropCount: {}, dropSize: {},blockTsInfos:\n{}.",
                    new Timestamp(limitingStatStartTs).toString(), new Timestamp(currentTs).toString(),
                    maxRate, limitingBlockCount, limitingBlockMs, limitingDropCount, limitingDropSize, blockTsInfos.toString()
            );
        }
    }

    @Override
    public String toString() {
        return "BlockDropRateLimitingStrategy{" +
                "maxRate=" + maxRate +
                " bytes/second, maxBlockDuration=" + maxBlockDuration +
                " ms, resetBlockDuration=" + resetBlockDuration +
                " ms}";
    }
}

// --- The following small types live in their own files in the same package in
// --- the original patch; they are reproduced here because the mangled source
// --- interleaves them on the same physical lines.

/** Strategy that never limits: every record passes through untouched. */
class NoRateLimitingStrategy implements RateLimitingStrategy {
    @Override
    public boolean rateLimited() {
        return false; // no actual limiting
    }

    @Override
    public long maxRate() {
        return Long.MAX_VALUE;
    }

    @Override
    public NoRateLimitingStrategy withMaxRate(long maxRate) {
        return this;
    }

    @Override
    public RateLimitingStatus record(long value) {
        return RateLimitingStatus.NONE;
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName();
    }
}

/** Outcome of recording a value against a rate limiting strategy. */
enum RateLimitingStatus {
    NONE,   // under the limit; record was accepted
    BLOCK,  // over the limit; caller was blocked until the rate recovered
    DROP    // over the limit; record must be discarded
}

/** Contract for sink-side rate limiting; implementations must be serializable. */
interface RateLimitingStrategy extends java.io.Serializable {
    /** Whether this strategy actually enforces a limit. */
    default boolean rateLimited() {
        return true;
    }

    /** The configured maximum rate in bytes per second. */
    long maxRate();

    /** Returns a copy of this strategy with a different maximum rate. */
    RateLimitingStrategy withMaxRate(long maxRate);

    /** Records {@code value} bytes and reports how the record was handled. */
    RateLimitingStatus record(long value);
}

/** Strategy selector used by the {@code rate.limiting.strategy} config option. */
enum RateLimitingStrategyType {
    NONE, SLIDING_WINDOW;
}
/**
 * Tracks the number of bytes recorded within (approximately) the last second,
 * using a ring of 8 buckets of 125 ms each. Designed for real-time send-rate
 * measurement; not thread-safe.
 */
public class SlidingWindowRateStat implements Serializable {
    /** Length of the full sliding window in milliseconds. */
    private static final long WINDOW_SIZE = 1000;
    /** Number of buckets; a power of two so masking can replace modulo. */
    private static final int BUCKET_SIZE = 8;
    private static final int BUCKET_MASK = BUCKET_SIZE - 1;
    /** Time span covered by a single bucket (125 ms). */
    private static final long BUCKET_WINDOW_SIZE = WINDOW_SIZE / BUCKET_SIZE;

    private final long[] buckets;
    private int current;             // index of the bucket currently being filled
    private long currentWindowEndMs; // bucket-aligned time bound of the current bucket
    private long totalValue;         // running sum of all live buckets

    public SlidingWindowRateStat() {
        this.buckets = new long[BUCKET_SIZE];
        this.current = (BUCKET_SIZE - 1) & BUCKET_MASK;
        // Aligned down to a bucket boundary; the first update rotates once,
        // which is harmless because all buckets start empty.
        this.currentWindowEndMs = System.currentTimeMillis() / BUCKET_WINDOW_SIZE * BUCKET_WINDOW_SIZE;
    }

    /** Adds {@code value} bytes at the current wall-clock time. */
    public void record(long value) {
        currentUpdate(System.currentTimeMillis());
        buckets[current] += value;
        totalValue += value;
    }

    /** Adds {@code value} bytes and returns the rate including this record. */
    public long recordAndGetCurrentRate(long value) {
        currentUpdate(System.currentTimeMillis());
        buckets[current] += value;
        totalValue += value;
        return totalValue;
    }

    /** Bytes recorded within the last second, measured at the current time. */
    public long getCurrentRate() {
        return getCurrentRate(System.currentTimeMillis());
    }

    /** Bytes recorded within the last second, measured at {@code timeMs}. */
    public long getCurrentRate(long timeMs) {
        currentUpdate(timeMs);
        return totalValue;
    }

    /**
     * Advances the bucket ring so that the bucket at {@code current} covers
     * {@code timeMs}, expiring buckets that fell out of the window.
     */
    private void currentUpdate(long timeMs) {
        if (timeMs <= currentWindowEndMs) {
            return;
        }
        if (timeMs - currentWindowEndMs >= WINDOW_SIZE) {
            // The entire window has expired (e.g. after a long idle gap):
            // reset in O(buckets) instead of rotating once per elapsed 125 ms,
            // which would otherwise loop an unbounded number of times.
            for (int i = 0; i < BUCKET_SIZE; i++) {
                buckets[i] = 0;
            }
            totalValue = 0;
            // Smallest bucket-aligned value >= timeMs, matching the value the
            // incremental rotation would have reached.
            currentWindowEndMs = (timeMs + BUCKET_WINDOW_SIZE - 1) / BUCKET_WINDOW_SIZE * BUCKET_WINDOW_SIZE;
            return;
        }
        do {
            current = (current + 1) & BUCKET_MASK;
            totalValue -= buckets[current];
            buckets[current] = 0;
            currentWindowEndMs += BUCKET_WINDOW_SIZE;
        } while (currentWindowEndMs < timeMs);
    }
}
    /**
     * Injects the rate limiting strategy applied when sending records to Kafka.
     *
     * @param rateLimitingStrategy the strategy, or {@code null} to disable limiting
     */
    public void setRateLimitingStrategy(RateLimitingStrategy rateLimitingStrategy) {
        this.rateLimitingStrategy = rateLimitingStrategy;
        // Limiting is active only when a strategy is present AND it actually
        // limits (NoRateLimitingStrategy reports rateLimited() == false).
        this.rateLimitingEnable = rateLimitingStrategy != null && rateLimitingStrategy.rateLimited();
    }
package com.geedgenetworks.connectors.kafka.rate;

import org.apache.flink.util.TimeUtils;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Manual, main-driven exercises for the rate limiting strategies. Each scenario
 * simulates ~500 events per 2 ms tick: 2000 bytes are offered every 2 ms for
 * the first 30 seconds of a 60-second run, printing pass/drop byte counters
 * roughly every 100 ms. Not executed by the build; run a scenario by
 * uncommenting it in {@link #main}.
 */
public class RateLimitingStrategyTest {

    public static void main(String[] args) throws Exception {
        //testNoRateLimitingStrategy();
        //testBlockDropRateLimitingStrategyForDrop();
        //testBlockDropRateLimitingStrategyForBlockDrop();
        //testBlockDropRateLimitingStrategyForBlock();

        testBlockDropRateLimitingStrategyForDropMultiThread();
    }

    /** Baseline: the no-op strategy should never drop anything. */
    public static void testNoRateLimitingStrategy() throws Exception {
        RateLimitingStrategy strategy = new NoRateLimitingStrategy();

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long lastTs = startTs;
        long diff;
        long pass = 0;
        long drop = 0;
        for (int i = 0; i < times; i++) {
            if (i < 500 * 30) {
                long value = 2000;
                RateLimitingStatus status = strategy.record(value);
                if (status == RateLimitingStatus.DROP) {
                    drop += value;
                } else {
                    pass += value;
                }
            }
            ts = System.currentTimeMillis();
            if (ts - lastTs >= 100) {
                lastTs = ts;
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            }
            // Pace the loop to one iteration per ~2 ms.
            diff = startTs + i * 2 - ts;
            if (diff > 0) {
                Thread.sleep(diff);
            }
        }

    }

    /** maxBlockDuration = 0: over-rate records are dropped immediately. */
    public static void testBlockDropRateLimitingStrategyForDrop() throws Exception {
        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 0, 0);

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long lastTs = startTs;
        long diff;
        long pass = 0;
        long drop = 0;
        for (int i = 0; i < times; i++) {
            if (i < 500 * 30) {
                long value = 2000;
                RateLimitingStatus status = strategy.record(value);
                if (status == RateLimitingStatus.DROP) {
                    drop += value;
                } else {
                    pass += value;
                }
            }
            ts = System.currentTimeMillis();
            if (ts - lastTs >= 100) {
                lastTs = ts;
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            }
            diff = startTs + i * 2 - ts;
            if (diff > 0) {
                Thread.sleep(diff);
            }
        }

        // Timestamp 5 minutes ahead forces the pending statistics to flush.
        strategy.outLimitingStat(System.currentTimeMillis() + 300000L);
    }

    /** Blocks first, then drops once a block episode exceeds 40 s. */
    public static void testBlockDropRateLimitingStrategyForBlockDrop() throws Exception {
        //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 10000, 1000 );
        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 40000, 1000);

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long lastTs = startTs;
        long diff;
        long diffSum = 0;
        long pass = 0;
        long drop = 0;
        for (int i = 0; i < times; i++) {
            ts = System.currentTimeMillis();
            if (ts - lastTs >= 100) {
                lastTs = ts;
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            }
            diff = startTs + (i + 1) * 2 - ts;
            if (diff > 0) {
                if (i < 500 * 30) {
                    diffSum += diff;
                }
                Thread.sleep(diff);
            }

            if (i < 500 * 30) {
                long value = 2000;
                RateLimitingStatus status = strategy.record(value);
                if (status == RateLimitingStatus.DROP) {
                    drop += value;
                } else {
                    pass += value;
                }
            }
        }

        System.out.println("diffSum:" + diffSum);
        strategy.outLimitingStat(System.currentTimeMillis() + 300000L);
    }

    /** Effectively block-only: maxBlockDuration of one day is never exceeded. */
    public static void testBlockDropRateLimitingStrategyForBlock() throws Exception {
        //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 10000, 1000 );
        BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, TimeUtils.parseDuration("1d").toMillis(), 1000);

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long lastTs = startTs;
        long diff;
        long diffSum = 0;
        long pass = 0;
        long drop = 0;
        for (int i = 0; i < times; i++) {
            ts = System.currentTimeMillis();
            if (ts - lastTs >= 100) {
                lastTs = ts;
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            }
            diff = startTs + (i + 1) * 2 - ts;
            if (diff > 0) {
                if (i < 500 * 30) {
                    diffSum += diff;
                }
                Thread.sleep(diff);
            }

            if (i < 500 * 30) {
                long value = 2000;
                RateLimitingStatus status = strategy.record(value);
                if (status == RateLimitingStatus.DROP) {
                    drop += value;
                } else {
                    pass += value;
                }
            }

        }

        System.out.println("diffSum:" + diffSum);
        strategy.outLimitingStat(System.currentTimeMillis() + 300000L);
    }


    /**
     * Producer/consumer variant: one thread enqueues 2000-byte events every
     * 2 ms for 30 s, another consumes them through the strategy at a slightly
     * different pace. The consumer exits once the queue stays empty for 60 µs.
     */
    public static void testBlockDropRateLimitingStrategyForDropMultiThread() throws Exception {
        final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>(1000000);

        Thread producerThread = new Thread(() -> {
            int times = 500 * 60;
            long startTs = System.currentTimeMillis();
            long ts;
            long diff;
            for (int i = 0; i < times; i++) {
                ts = System.currentTimeMillis();
                diff = startTs + (i + 1) * 2 - ts;
                if (diff > 0) {
                    try {
                        Thread.sleep(diff);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

                if (i < 500 * 30) {
                    long value = 2000;
                    try {
                        queue.put(value);
                        //System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ",put");
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }

            System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + "producerThread end");
        });

        Thread consumerThread = new Thread(() -> {
            //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 40000, 1000 );
            BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 30000, 1000);
            //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, 0, 1000 );
            //BlockDropRateLimitingStrategy strategy = new BlockDropRateLimitingStrategy(600000, TimeUtils.parseDuration("1d").toMillis(), 1000 );
            System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ",poll_before");
            long startTs = System.currentTimeMillis();
            long ts;
            long lastTs = startTs;
            long pass = 0;
            long drop = 0;
            int i = 1;
            long diff;
            while (true) {
                try {
                    Long value = queue.poll(60, TimeUnit.MICROSECONDS);
                    //System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ",poll");
                    if (value == null) {
                        break;
                    }

                    ts = System.currentTimeMillis();

                    // Consume at 2x the producer tick (4 ms per event) to build back-pressure.
                    diff = startTs + (i + 1) * 2 * 12 / 6 - ts;
                    //diff = startTs + (i + 1) * 2 - ts;
                    //diff = startTs + (i + 1) * 2* 11 / 6 - ts;
                    if (diff > 0) {
                        try {
                            Thread.sleep(diff);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    i++;

                    if (ts - lastTs >= 100) {
                        lastTs = ts;
                        System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
                    }

                    RateLimitingStatus status = strategy.record(value);
                    if (status == RateLimitingStatus.DROP) {
                        drop += value;
                    } else {
                        pass += value;
                    }
                } catch (InterruptedException e) {
                    break;
                }
            }

            System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + pass + ", " + drop);
            strategy.outLimitingStat(System.currentTimeMillis() + 300000L);
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();
    }

}
package com.geedgenetworks.connectors.kafka.rate;

import com.geedgenetworks.connectors.kafka.rate.SlidingWindowRateStat;

/**
 * Manual, main-driven exercises for {@link SlidingWindowRateStat}: a 60-second
 * run recording 2000 bytes every ~2 ms for the first 30 seconds, printing the
 * measured rate every 50 iterations. Not executed by the build.
 */
public class SlidingWindowRateStatTest {

    public static void main(String[] args) throws Exception {
        //testRate();
        testRateLimit();
    }

    /** Observes the raw measured rate without any limiting decision. */
    public static void testRate() throws Exception {
        SlidingWindowRateStat stat = new SlidingWindowRateStat();

        System.out.println(new java.sql.Timestamp(System.currentTimeMillis()));

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long diff;
        for (int i = 0; i < times; i++) {
            if (i < 500 * 30) {
                stat.record(2000);
            }
            ts = System.currentTimeMillis();
            if (i % 50 == 0) {
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + stat.getCurrentRate());
            }
            // Pace the loop to one iteration per ~2 ms.
            diff = startTs + i * 2 - ts;
            if (diff > 0) {
                Thread.sleep(diff);
            }
        }

    }

    /**
     * Simulates a simple limiter on top of the stat: records only while the
     * measured rate is below 600000 bytes/s, printing each limit transition.
     */
    public static void testRateLimit() throws Exception {
        SlidingWindowRateStat stat = new SlidingWindowRateStat();

        System.out.println(new java.sql.Timestamp(System.currentTimeMillis()));

        int times = 500 * 60;
        long startTs = System.currentTimeMillis();
        long ts;
        long diff;
        long rate;
        boolean limit = false;
        for (int i = 0; i < times; i++) {
            rate = stat.getCurrentRate();
            if (rate < 600000) {
                if (limit == true) {
                    // Transition: limited -> unlimited.
                    limit = false;
                    System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + "," + rate + ", limit:" + i);
                }
                if (i < 500 * 30) {
                    stat.record(2000);
                }
            } else {
                if (limit == false) {
                    // Transition: unlimited -> limited.
                    limit = true;
                    System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + "," + rate + ", limit:" + i);
                }
            }

            ts = System.currentTimeMillis();
            if (i % 50 == 0) {
                System.out.println(new java.sql.Timestamp(System.currentTimeMillis()) + ":" + stat.getCurrentRate());
            }
            diff = startTs + i * 2 - ts;
            if (diff > 0) {
                Thread.sleep(diff);
            }
        }

    }
}