summary | refs | log | tree | commit | diff
path: root/src/main/java/com/zdjizhi/sink/HBaseSink.java
diff options
context:
space:
mode:
author: houjinchuan <[email protected]> 2024-07-17 18:10:02 +0800
committer: houjinchuan <[email protected]> 2024-07-17 18:10:02 +0800
commit: beb553dddfa278dde1501c5dedc64f886f573db9 (patch)
tree: 759e380cad476760349c07f1b16ba41e414c8942 /src/main/java/com/zdjizhi/sink/HBaseSink.java
parent: da7fecc4c10015dff0f4008b8a442302648ed8d8 (diff)
修复hos挂掉一台恢复后,hos sink负载不均衡的问题 (HEAD, v1.3.5, develop)
Diffstat (limited to 'src/main/java/com/zdjizhi/sink/HBaseSink.java')
-rw-r--r-- src/main/java/com/zdjizhi/sink/HBaseSink.java 65
1 files changed, 27 insertions, 38 deletions
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
index 3ba9d15..321fc00 100644
--- a/src/main/java/com/zdjizhi/sink/HBaseSink.java
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -1,9 +1,10 @@
package com.zdjizhi.sink;
import cn.hutool.core.io.IoUtil;
-import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.google.common.util.concurrent.RateLimiter;
import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.utils.HBaseColumnConstants;
@@ -63,27 +64,26 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
public transient Counter pcapngChunksCounter;
public transient Counter mediaChunksCounter;
private boolean isAsync;
- private Connection syncHBaseConnection;
- private AsyncConnection asyncHBaseConnection;
- private Table table;
- private Table indexTimeTable;
- private Table indexFilenameTable;
- private AsyncTable<AdvancedScanResultConsumer> asyncTable;
- private AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable;
- private AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable;
- private List<Put> dataPutList;
- private List<Put> indexTimePutList;
- private List<Put> indexFilenamePutList;
+ private transient Connection syncHBaseConnection;
+ private transient AsyncConnection asyncHBaseConnection;
+ private transient Table table;
+ private transient Table indexTimeTable;
+ private transient Table indexFilenameTable;
+ private transient AsyncTable<AdvancedScanResultConsumer> asyncTable;
+ private transient AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable;
+ private transient AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable;
+ private transient List<Put> dataPutList;
+ private transient List<Put> indexTimePutList;
+ private transient List<Put> indexFilenamePutList;
private long chunkSize;
private long batchSize;
private long batchInterval;
- private ScheduledExecutorService executorService;
+ private transient ScheduledExecutorService executorService;
private long rateLimitThreshold;
private String rateLimitExpression;
- private volatile long timestamp;
- private long count;
- private JexlExpression jexlExpression;
- private JexlContext jexlContext;
+ private transient JexlExpression jexlExpression;
+ private transient JexlContext jexlContext;
+ private transient RateLimiter rateLimiter;
public HBaseSink(Configuration configuration) {
this.configuration = configuration;
@@ -161,7 +161,6 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
indexTimeTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
}
- timestamp = System.currentTimeMillis();
batchSize = configuration.getLong(Configs.SINK_HBASE_BATCH_SIZE);
batchInterval = configuration.getInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS);
dataPutList = new ArrayList<>();
@@ -178,40 +177,30 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}
}, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
}
+ rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
if (rateLimitThreshold > 0) {
- rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
- count = 0;
JexlEngine jexlEngine = new JexlBuilder().create();
jexlExpression = jexlEngine.createExpression(rateLimitExpression);
jexlContext = new MapContext();
+ rateLimiter = RateLimiter.create(rateLimitThreshold);
}
}
@Override
public void invoke(FileChunk fileChunk, Context context) {
synchronized (this) {
- long currentTimeMillis = System.currentTimeMillis();
chunksInCounter.inc();
bytesInCounter.inc(fileChunk.getLength());
if (rateLimitThreshold > 0) {
- count++;
- if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) {
- if (checkFileChunk(fileChunk)) {
- sendFileChunk(fileChunk);
- } else {
- rateLimitDropChunksCounter.inc();
- }
- } else if (currentTimeMillis - timestamp >= 1000) {
+ if(rateLimiter.tryAcquire()){
+ sendFileChunk(fileChunk);
+ }else {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
- timestamp = currentTimeMillis;
- count = 0;
- } else {
- sendFileChunk(fileChunk);
}
} else {
sendFileChunk(fileChunk);
@@ -303,21 +292,21 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
return null;
});
dataPutList.clear();
- if (indexTimePutList.size() > 0) {
+ if (!indexTimePutList.isEmpty()) {
asyncIndexTimeTable.batch(indexTimePutList);
indexTimePutList.clear();
}
- if (indexFilenamePutList.size() > 0) {
+ if (!indexFilenamePutList.isEmpty()) {
asyncIndexFilenameTable.batch(indexFilenamePutList);
indexFilenamePutList.clear();
}
} else {
try {
table.batch(dataPutList, null);
- if (indexTimePutList.size() > 0) {
+ if (!indexTimePutList.isEmpty()) {
indexTimeTable.batch(indexTimePutList, null);
}
- if (indexFilenamePutList.size() > 0) {
+ if (!indexFilenamePutList.isEmpty()) {
indexFilenameTable.batch(indexFilenamePutList, null);
}
} catch (IOException | InterruptedException e) {
@@ -334,7 +323,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}
private boolean checkFileChunk(FileChunk fileChunk) {
- if (StrUtil.isNotEmpty(rateLimitExpression)) {
+ if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) {
jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString());
}