diff options
| author | houjinchuan <[email protected]> | 2024-07-17 18:10:02 +0800 |
|---|---|---|
| committer | houjinchuan <[email protected]> | 2024-07-17 18:10:02 +0800 |
| commit | beb553dddfa278dde1501c5dedc64f886f573db9 (patch) | |
| tree | 759e380cad476760349c07f1b16ba41e414c8942 /src/main/java/com/zdjizhi/sink/HBaseSink.java | |
| parent | da7fecc4c10015dff0f4008b8a442302648ed8d8 (diff) | |
Diffstat (limited to 'src/main/java/com/zdjizhi/sink/HBaseSink.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/sink/HBaseSink.java | 65 |
1 files changed, 27 insertions, 38 deletions
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java index 3ba9d15..321fc00 100644 --- a/src/main/java/com/zdjizhi/sink/HBaseSink.java +++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java @@ -1,9 +1,10 @@ package com.zdjizhi.sink; import cn.hutool.core.io.IoUtil; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.google.common.util.concurrent.RateLimiter; import com.zdjizhi.config.Configs; import com.zdjizhi.pojo.FileChunk; import com.zdjizhi.utils.HBaseColumnConstants; @@ -63,27 +64,26 @@ public class HBaseSink extends RichSinkFunction<FileChunk> { public transient Counter pcapngChunksCounter; public transient Counter mediaChunksCounter; private boolean isAsync; - private Connection syncHBaseConnection; - private AsyncConnection asyncHBaseConnection; - private Table table; - private Table indexTimeTable; - private Table indexFilenameTable; - private AsyncTable<AdvancedScanResultConsumer> asyncTable; - private AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable; - private AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable; - private List<Put> dataPutList; - private List<Put> indexTimePutList; - private List<Put> indexFilenamePutList; + private transient Connection syncHBaseConnection; + private transient AsyncConnection asyncHBaseConnection; + private transient Table table; + private transient Table indexTimeTable; + private transient Table indexFilenameTable; + private transient AsyncTable<AdvancedScanResultConsumer> asyncTable; + private transient AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable; + private transient AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable; + private transient List<Put> dataPutList; + private transient List<Put> indexTimePutList; + private transient List<Put> indexFilenamePutList; private long chunkSize; private long batchSize; private long batchInterval; - private ScheduledExecutorService executorService; + private transient ScheduledExecutorService executorService; private long rateLimitThreshold; private String rateLimitExpression; - private volatile long timestamp; - private long count; - private JexlExpression jexlExpression; - private JexlContext jexlContext; + private transient JexlExpression jexlExpression; + private transient JexlContext jexlContext; + private transient RateLimiter rateLimiter; public HBaseSink(Configuration configuration) { this.configuration = configuration; @@ -161,7 +161,6 @@ public class HBaseSink extends RichSinkFunction<FileChunk> { indexTimeTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET))); indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET))); } - timestamp = System.currentTimeMillis(); batchSize = configuration.getLong(Configs.SINK_HBASE_BATCH_SIZE); batchInterval = configuration.getInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS); dataPutList = new ArrayList<>(); @@ -178,40 +177,30 @@ public class HBaseSink extends RichSinkFunction<FileChunk> { } }, batchInterval, batchInterval, TimeUnit.MILLISECONDS); } + rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD); if (rateLimitThreshold > 0) { - rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD); rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION); - count = 0; JexlEngine jexlEngine = new JexlBuilder().create(); jexlExpression = jexlEngine.createExpression(rateLimitExpression); jexlContext = new MapContext(); + rateLimiter = RateLimiter.create(rateLimitThreshold); } } @Override public void invoke(FileChunk fileChunk, Context context) { synchronized (this) { - long currentTimeMillis = System.currentTimeMillis(); chunksInCounter.inc(); bytesInCounter.inc(fileChunk.getLength()); if (rateLimitThreshold > 0) { - count++; - if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) { - if (checkFileChunk(fileChunk)) { - sendFileChunk(fileChunk); - } else { - rateLimitDropChunksCounter.inc(); - } - } else if (currentTimeMillis - timestamp >= 1000) { + if(rateLimiter.tryAcquire()){ + sendFileChunk(fileChunk); + }else { if (checkFileChunk(fileChunk)) { sendFileChunk(fileChunk); } else { rateLimitDropChunksCounter.inc(); } - timestamp = currentTimeMillis; - count = 0; - } else { - sendFileChunk(fileChunk); } } else { sendFileChunk(fileChunk); @@ -303,21 +292,21 @@ public class HBaseSink extends RichSinkFunction<FileChunk> { return null; }); dataPutList.clear(); - if (indexTimePutList.size() > 0) { + if (!indexTimePutList.isEmpty()) { asyncIndexTimeTable.batch(indexTimePutList); indexTimePutList.clear(); } - if (indexFilenamePutList.size() > 0) { + if (!indexFilenamePutList.isEmpty()) { asyncIndexFilenameTable.batch(indexFilenamePutList); indexFilenamePutList.clear(); } } else { try { table.batch(dataPutList, null); - if (indexTimePutList.size() > 0) { + if (!indexTimePutList.isEmpty()) { indexTimeTable.batch(indexTimePutList, null); } - if (indexFilenamePutList.size() > 0) { + if (!indexFilenamePutList.isEmpty()) { indexFilenameTable.batch(indexFilenamePutList, null); } } catch (IOException | InterruptedException e) { @@ -334,7 +323,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> { } private boolean checkFileChunk(FileChunk fileChunk) { - if (StrUtil.isNotEmpty(rateLimitExpression)) { + if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) { jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk); return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString()); } |
