summaryrefslogtreecommitdiff
path: root/src/main/java/com
diff options
context:
space:
mode:
authorhoujinchuan <[email protected]>2024-07-17 18:10:02 +0800
committerhoujinchuan <[email protected]>2024-07-17 18:10:02 +0800
commitbeb553dddfa278dde1501c5dedc64f886f573db9 (patch)
tree759e380cad476760349c07f1b16ba41e414c8942 /src/main/java/com
parentda7fecc4c10015dff0f4008b8a442302648ed8d8 (diff)
修复hos挂掉一台恢复后,hos sink负载不均衡的问题HEADv1.3.5develop
Diffstat (limited to 'src/main/java/com')
-rw-r--r--src/main/java/com/zdjizhi/FileChunkCombiner.java9
-rw-r--r--src/main/java/com/zdjizhi/config/Configs.java3
-rw-r--r--src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java10
-rw-r--r--src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java8
-rw-r--r--src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java4
-rw-r--r--src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java4
-rw-r--r--src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java8
-rw-r--r--src/main/java/com/zdjizhi/sink/HBaseSink.java65
-rw-r--r--src/main/java/com/zdjizhi/sink/HosSink.java116
-rw-r--r--src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java12
-rw-r--r--src/main/java/com/zdjizhi/utils/PublicUtil.java30
11 files changed, 147 insertions, 122 deletions
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index 6955039..e56c018 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -1,6 +1,6 @@
package com.zdjizhi;
-import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.text.CharSequenceUtil;
import com.zdjizhi.config.Configs;
import com.zdjizhi.function.*;
import com.zdjizhi.function.map.ParseMessagePackMapFunction;
@@ -42,7 +42,7 @@ public class FileChunkCombiner {
List<Trigger<Object, TimeWindow>> triggers = new ArrayList<>();
triggers.add(ProcessingTimeTrigger.create());
- if (configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER)) {
+ if (Boolean.TRUE.equals(configuration.get(Configs.COMBINER_WINDOW_ENABLE_LAST_CHUNK_TRIGGER))) {
triggers.add(LastChunkTrigger.create());
}
Trigger<Object, TimeWindow> trigger = MultipleTrigger.of(triggers);
@@ -58,8 +58,9 @@ public class FileChunkCombiner {
SingleOutputStreamOperator<FileChunk> fileMetaProxySingleOutputStreamOperator;
for (String sinkType : configuration.get(Configs.SINK_TYPE).split(",")) {
switch (sinkType) {
+ default:
case "hos":
- if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
+ if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hos"))
.name("Filter: Hos")
@@ -75,7 +76,7 @@ public class FileChunkCombiner {
}
break;
case "hbase":
- if (StrUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
+ if (CharSequenceUtil.isNotEmpty(configuration.getString(Configs.SINK_FILTER_EXPRESSION))) {
windowStream
.filter(new FileChunkFilterFunction(configuration.getString(Configs.SINK_FILTER_EXPRESSION), "sink_hbase"))
.name("Filter: HBase")
diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java
index 25ba9aa..65f0784 100644
--- a/src/main/java/com/zdjizhi/config/Configs.java
+++ b/src/main/java/com/zdjizhi/config/Configs.java
@@ -91,6 +91,9 @@ public class Configs {
public static final ConfigOption<Integer> SINK_HOS_BATCH_INTERVAL_MS = ConfigOptions.key("sink.hos.batch.interval.ms")
.intType()
.defaultValue(0);
+ public static final ConfigOption<Integer> SINK_HOS_HEALTH_CHECK_INTERVAL_MS = ConfigOptions.key("sink.hos.health.check.interval.ms")
+ .intType()
+ .defaultValue(60000);
public static final ConfigOption<Integer> SINK_HTTP_CLIENT_MAX_TOTAL = ConfigOptions.key("sink.http.client.max.total")
.intType()
diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
index 736758d..929af5c 100644
--- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
+++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
@@ -1,10 +1,10 @@
package com.zdjizhi.function;
-import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.PrimitiveArrayUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.pojo.FileChunk;
-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
@@ -162,7 +162,7 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
timestampAndSizes.append(originalFileChunk.getTimestamp()).append("-").append(chunk.length).append(";");
}
}
- if (waitingToCombineChunkList.size() > 0) {
+ if (!waitingToCombineChunkList.isEmpty()) {
FileChunk fileChunk = combineChunk(waitingToCombineChunkList, originalFileChunkList.get(0).getUuid(), originalFileChunkList.get(0).getFileName(), originalFileChunkList.get(0).getFileType(), 0, "append", 0, originalFileChunkList.get(0).getMeta(), startTimestamp, timestampAndSizes.toString());
if (fileChunk != null) {
combinedFileChunkList.add(fileChunk);
@@ -183,12 +183,12 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
fileChunk.setChunkCount(byteList.size());
byte[][] bytes = new byte[byteList.size()][];
byteList.toArray(bytes);
- byte[] newData = ArrayUtil.addAll(bytes);
+ byte[] newData = PrimitiveArrayUtil.addAll(bytes);
if (COMBINE_MODE_SEEK.equals(combineMode)) {
fileChunk.setOffset(offset);
fileChunk.setLastChunkFlag(lastChunkFlag);
} else {
- if (StringUtil.isNotEmpty(chunkNumbers)) {
+ if (StringUtils.isNotEmpty(chunkNumbers)) {
fileChunk.setChunkNumbers(chunkNumbers);
}
}
diff --git a/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java b/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java
index 41f46f7..4a00e17 100644
--- a/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java
+++ b/src/main/java/com/zdjizhi/function/FileChunkFilterFunction.java
@@ -1,6 +1,6 @@
package com.zdjizhi.function;
-import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.pojo.FileChunk;
@@ -19,8 +19,8 @@ public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
public transient Counter filterChunksCounter;
public transient Counter emlChunksCounter;
public transient Counter txtChunksCounter;
- private JexlExpression jexlExpression;
- private JexlContext jexlContext;
+ private transient JexlExpression jexlExpression;
+ private transient JexlContext jexlContext;
public FileChunkFilterFunction(String filterExpression, String functionName) {
this.filterExpression = filterExpression;
@@ -48,7 +48,7 @@ public class FileChunkFilterFunction extends RichFilterFunction<FileChunk> {
filterChunksCounter.inc();
return false;
}
- if (StrUtil.isNotEmpty(filterExpression)) {
+ if (CharSequenceUtil.isNotEmpty(filterExpression)) {
jexlContext.set("FileChunk", value);
if (!Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString())) {
filterChunksCounter.inc();
diff --git a/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java
index be556a4..66b9516 100644
--- a/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java
+++ b/src/main/java/com/zdjizhi/function/map/ParseMessagePackMapFunction.java
@@ -225,9 +225,7 @@ public class ParseMessagePackMapFunction extends RichMapFunction<byte[], FileChu
case "meta":
String meta = messageUnpacker.unpackString();
JSONObject metaJsonObject = JSONUtil.parseObj(meta);
- for (String key : metaJsonObject.keySet()) {
- metaMap.put(key, metaJsonObject.get(key));
- }
+ metaJsonObject.keySet().forEach(key -> metaMap.put(key, metaJsonObject.get(key)));
fileChunk.setMeta(metaMap);
break;
default:
diff --git a/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java
index 8b7ce56..a3b1f16 100644
--- a/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/function/map/ParseProxyFileMetaFlatMapFunction.java
@@ -45,7 +45,7 @@ public class ParseProxyFileMetaFlatMapFunction extends RichFlatMapFunction<Strin
try {
chunksInCounter.inc();
JSONObject record = JSONObject.parseObject(value);
- if (record.containsKey("proxy_rule_list") && record.getJSONArray("proxy_rule_list").size() > 0) {
+ if (record.containsKey("proxy_rule_list") && !record.getJSONArray("proxy_rule_list").isEmpty()) {
if (record.containsKey("http_request_body")) {
FileChunk fileChunk = new FileChunk();
Map<String, Object> metaMap = new HashMap<>();
@@ -84,7 +84,7 @@ public class ParseProxyFileMetaFlatMapFunction extends RichFlatMapFunction<Strin
}
private void getFileMeta(JSONObject record, Map<String, Object> metaMap) {
- metaMap.put("policyId", record.containsKey("monitor_rule_list") && record.getJSONArray("monitor_rule_list").size() > 0 ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
+ metaMap.put("policyId", record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty() ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
metaMap.put("serverIP", record.getString("server_ip"));
metaMap.put("serverPort", record.getInteger("server_port"));
metaMap.put("clientIP", record.getString("client_ip"));
diff --git a/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java b/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java
index cdbb131..cb68d93 100644
--- a/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/function/map/ParseSessionFileMetaFlatMapFunction.java
@@ -49,10 +49,8 @@ public class ParseSessionFileMetaFlatMapFunction extends RichFlatMapFunction<Str
try {
chunksInCounter.inc();
JSONObject record = JSONObject.parseObject(value);
- if (record.containsKey("security_rule_list")
- && record.getJSONArray("security_rule_list").size() > 0
- || record.containsKey("monitor_rule_list")
- && record.getJSONArray("monitor_rule_list").size() > 0) {
+ if (record.containsKey("security_rule_list") && !record.getJSONArray("security_rule_list").isEmpty()
+ || record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty()) {
if (record.containsKey("http_request_body")) {
FileChunk fileChunk = new FileChunk();
Map<String, Object> metaMap = new HashMap<>();
@@ -103,7 +101,7 @@ public class ParseSessionFileMetaFlatMapFunction extends RichFlatMapFunction<Str
}
private void getFileMeta(JSONObject record, Map<String, Object> metaMap) {
- metaMap.put("policyId", record.containsKey("monitor_rule_list") && record.getJSONArray("monitor_rule_list").size() > 0 ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
+ metaMap.put("policyId", record.containsKey("monitor_rule_list") && !record.getJSONArray("monitor_rule_list").isEmpty() ? record.getJSONArray("monitor_rule_list").getInteger(0) : 0);
metaMap.put("serverIP", record.getString("server_ip"));
metaMap.put("serverPort", record.getInteger("server_port"));
metaMap.put("clientIP", record.getString("client_ip"));
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
index 3ba9d15..321fc00 100644
--- a/src/main/java/com/zdjizhi/sink/HBaseSink.java
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -1,9 +1,10 @@
package com.zdjizhi.sink;
import cn.hutool.core.io.IoUtil;
-import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.google.common.util.concurrent.RateLimiter;
import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.utils.HBaseColumnConstants;
@@ -63,27 +64,26 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
public transient Counter pcapngChunksCounter;
public transient Counter mediaChunksCounter;
private boolean isAsync;
- private Connection syncHBaseConnection;
- private AsyncConnection asyncHBaseConnection;
- private Table table;
- private Table indexTimeTable;
- private Table indexFilenameTable;
- private AsyncTable<AdvancedScanResultConsumer> asyncTable;
- private AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable;
- private AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable;
- private List<Put> dataPutList;
- private List<Put> indexTimePutList;
- private List<Put> indexFilenamePutList;
+ private transient Connection syncHBaseConnection;
+ private transient AsyncConnection asyncHBaseConnection;
+ private transient Table table;
+ private transient Table indexTimeTable;
+ private transient Table indexFilenameTable;
+ private transient AsyncTable<AdvancedScanResultConsumer> asyncTable;
+ private transient AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable;
+ private transient AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable;
+ private transient List<Put> dataPutList;
+ private transient List<Put> indexTimePutList;
+ private transient List<Put> indexFilenamePutList;
private long chunkSize;
private long batchSize;
private long batchInterval;
- private ScheduledExecutorService executorService;
+ private transient ScheduledExecutorService executorService;
private long rateLimitThreshold;
private String rateLimitExpression;
- private volatile long timestamp;
- private long count;
- private JexlExpression jexlExpression;
- private JexlContext jexlContext;
+ private transient JexlExpression jexlExpression;
+ private transient JexlContext jexlContext;
+ private transient RateLimiter rateLimiter;
public HBaseSink(Configuration configuration) {
this.configuration = configuration;
@@ -161,7 +161,6 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
indexTimeTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
}
- timestamp = System.currentTimeMillis();
batchSize = configuration.getLong(Configs.SINK_HBASE_BATCH_SIZE);
batchInterval = configuration.getInteger(Configs.SINK_HBASE_BATCH_INTERVAL_MS);
dataPutList = new ArrayList<>();
@@ -178,40 +177,30 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}
}, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
}
+ rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
if (rateLimitThreshold > 0) {
- rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
- count = 0;
JexlEngine jexlEngine = new JexlBuilder().create();
jexlExpression = jexlEngine.createExpression(rateLimitExpression);
jexlContext = new MapContext();
+ rateLimiter = RateLimiter.create(rateLimitThreshold);
}
}
@Override
public void invoke(FileChunk fileChunk, Context context) {
synchronized (this) {
- long currentTimeMillis = System.currentTimeMillis();
chunksInCounter.inc();
bytesInCounter.inc(fileChunk.getLength());
if (rateLimitThreshold > 0) {
- count++;
- if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) {
- if (checkFileChunk(fileChunk)) {
- sendFileChunk(fileChunk);
- } else {
- rateLimitDropChunksCounter.inc();
- }
- } else if (currentTimeMillis - timestamp >= 1000) {
+ if(rateLimiter.tryAcquire()){
+ sendFileChunk(fileChunk);
+ }else {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
- timestamp = currentTimeMillis;
- count = 0;
- } else {
- sendFileChunk(fileChunk);
}
} else {
sendFileChunk(fileChunk);
@@ -303,21 +292,21 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
return null;
});
dataPutList.clear();
- if (indexTimePutList.size() > 0) {
+ if (!indexTimePutList.isEmpty()) {
asyncIndexTimeTable.batch(indexTimePutList);
indexTimePutList.clear();
}
- if (indexFilenamePutList.size() > 0) {
+ if (!indexFilenamePutList.isEmpty()) {
asyncIndexFilenameTable.batch(indexFilenamePutList);
indexFilenamePutList.clear();
}
} else {
try {
table.batch(dataPutList, null);
- if (indexTimePutList.size() > 0) {
+ if (!indexTimePutList.isEmpty()) {
indexTimeTable.batch(indexTimePutList, null);
}
- if (indexFilenamePutList.size() > 0) {
+ if (!indexFilenamePutList.isEmpty()) {
indexFilenameTable.batch(indexFilenamePutList, null);
}
} catch (IOException | InterruptedException e) {
@@ -334,7 +323,7 @@ public class HBaseSink extends RichSinkFunction<FileChunk> {
}
private boolean checkFileChunk(FileChunk fileChunk) {
- if (StrUtil.isNotEmpty(rateLimitExpression)) {
+ if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) {
jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString());
}
diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java
index 06a18a3..59a3209 100644
--- a/src/main/java/com/zdjizhi/sink/HosSink.java
+++ b/src/main/java/com/zdjizhi/sink/HosSink.java
@@ -6,6 +6,7 @@ import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.*;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.google.common.util.concurrent.RateLimiter;
import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.utils.HttpClientUtil;
@@ -29,6 +30,7 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketException;
+import java.net.URI;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.Executors;
@@ -71,13 +73,13 @@ public class HosSink extends RichSinkFunction<FileChunk> {
public transient Counter pcapngChunksCounter;
public transient Counter mediaChunksCounter;
private boolean isAsync;
- private CloseableHttpClient syncHttpClient;
- private CloseableHttpAsyncClient asyncHttpClient;
+ private transient CloseableHttpClient syncHttpClient;
+ private transient CloseableHttpAsyncClient asyncHttpClient;
private int loadBalanceMode;
- private List<String> endpointList;
- private volatile String endpoint;
+ private List<String> healthyEndpoints;
+ private volatile String healthyEndpoint;
private String token;
- private volatile String bathPutUrl;
+ private String bathPutKey;
private HashMap<String, String> hosMessage;
private String objectsMeta;
private String objectsOffset;
@@ -85,13 +87,12 @@ public class HosSink extends RichSinkFunction<FileChunk> {
private long batchSize;
private long batchInterval;
private long chunkSize;
- private ScheduledExecutorService executorService;
+ private transient ScheduledExecutorService executorService;
private long rateLimitThreshold;
private String rateLimitExpression;
- private volatile long timestamp;
- private long count;
- private JexlExpression jexlExpression;
- private JexlContext jexlContext;
+ private transient JexlExpression jexlExpression;
+ private transient JexlContext jexlContext;
+ private transient RateLimiter rateLimiter;
public HosSink(Configuration configuration) {
this.configuration = configuration;
@@ -157,13 +158,25 @@ public class HosSink extends RichSinkFunction<FileChunk> {
metricGroup.meter("numHtmlChunksOutPerSecond", new MeterView(htmlChunksCounter));
metricGroup.meter("numPcapngChunksOutPerSecond", new MeterView(pcapngChunksCounter));
metricGroup.meter("numMediaChunksOutPerSecond", new MeterView(mediaChunksCounter));
- endpointList = Arrays.asList(configuration.get(Configs.SINK_HOS_ENDPOINT).split(","));
- if (endpointList.size() == 1) {
+ executorService = Executors.newScheduledThreadPool(2);
+ String[] endpoints = configuration.get(Configs.SINK_HOS_ENDPOINT).split(",");
+ healthyEndpoints = new ArrayList<>();
+ healthyEndpoints.addAll(Arrays.asList(endpoints));
+ if (endpoints.length == 1) {
loadBalanceMode = 0;
- endpoint = endpointList.get(0);
- } else {
+ healthyEndpoint = healthyEndpoints.get(0);
+ } else if (endpoints.length > 1) {
loadBalanceMode = 1;
- endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
+ healthyEndpoint = RandomUtil.randomEle(healthyEndpoints);
+ executorService.scheduleWithFixedDelay(() -> {
+ for (String endpoint : endpoints) {
+ if (!PublicUtil.checkHealth(endpoint)) {
+ healthyEndpoints.remove(endpoint);
+ } else if (!healthyEndpoints.contains(endpoint)) {
+ healthyEndpoints.add(endpoint);
+ }
+ }
+ }, configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), TimeUnit.MILLISECONDS);
}
token = configuration.get(Configs.SINK_HOS_TOKEN);
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
@@ -173,17 +186,15 @@ public class HosSink extends RichSinkFunction<FileChunk> {
} else {
syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
}
- timestamp = System.currentTimeMillis();
batchSize = configuration.getLong(Configs.SINK_HOS_BATCH_SIZE);
batchInterval = configuration.getInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS);
if (batchSize > 0 && batchInterval > 0) {
- bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
+ bathPutKey = PublicUtil.getUUID();
hosMessage = new HashMap<>();
byteList = new ArrayList<>();
objectsMeta = "";
objectsOffset = "";
chunkSize = 0;
- executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(() -> {
synchronized (this) {
if (!byteList.isEmpty()) {
@@ -192,40 +203,39 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}
}, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
}
+ rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
if (rateLimitThreshold > 0) {
- rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
- count = 0;
JexlEngine jexlEngine = new JexlBuilder().create();
jexlExpression = jexlEngine.createExpression(rateLimitExpression);
jexlContext = new MapContext();
+ rateLimiter = RateLimiter.create(rateLimitThreshold);
}
}
@Override
public void invoke(FileChunk fileChunk, Context context) throws RuntimeException {
synchronized (this) {
- long currentTimeMillis = System.currentTimeMillis();
+ if (loadBalanceMode == 1) {
+ if (healthyEndpoints.isEmpty()) {
+ throw new RuntimeException("No healthy hos endpoints available");
+ } else if (healthyEndpoints.size() == 1) {
+ healthyEndpoint = healthyEndpoints.get(0);
+ } else {
+ healthyEndpoint = RandomUtil.randomEle(healthyEndpoints);
+ }
+ }
chunksInCounter.inc();
bytesInCounter.inc(fileChunk.getLength());
if (rateLimitThreshold > 0) {
- count++;
- if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) {
- if (checkFileChunk(fileChunk)) {
- sendFileChunk(fileChunk);
- } else {
- rateLimitDropChunksCounter.inc();
- }
- } else if (currentTimeMillis - timestamp >= 1000) {
+ if (rateLimiter.tryAcquire()) {
+ sendFileChunk(fileChunk);
+ } else {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
- timestamp = currentTimeMillis;
- count = 0;
- } else {
- sendFileChunk(fileChunk);
}
} else {
sendFileChunk(fileChunk);
@@ -261,9 +271,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
Map<String, Object> metaMap = fileChunk.getMeta();
if (metaMap != null && !metaMap.isEmpty()) {
- for (String meta : metaMap.keySet()) {
- hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "");
- }
+ metaMap.keySet().forEach(meta -> hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""));
}
objectsMeta += hosMessage.toString() + ";";
hosMessage.clear();
@@ -277,18 +285,18 @@ public class HosSink extends RichSinkFunction<FileChunk> {
sendBatchData();
}
} else {
- String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
+ String url = URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
HttpPut httpPut = new HttpPut(url);
httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN));
httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
String filename = fileChunk.getFileName();
- if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
+ if (CharSequenceUtil.isNotEmpty(filename) && filename.contains(".")) {
httpPut.setHeader(HOS_META_FILENAME, filename);
- } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
+ } else if (CharSequenceUtil.isNotEmpty(filename) && !filename.contains(".")) {
filename = filename + "." + fileChunk.getFileType();
httpPut.setHeader(HOS_META_FILENAME, filename);
- } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
+ } else if (CharSequenceUtil.isEmpty(filename) && CharSequenceUtil.isNotEmpty(fileChunk.getFileType())) {
httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType());
}
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
@@ -301,9 +309,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
Map<String, Object> metaMap = fileChunk.getMeta();
if (metaMap != null && !metaMap.isEmpty()) {
- for (String meta : metaMap.keySet()) {
- httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "");
- }
+ metaMap.keySet().forEach(meta -> httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""));
}
httpPut.setEntity(new ByteArrayEntity(data));
executeRequest(httpPut);
@@ -314,7 +320,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}
private void sendBatchData() {
- HttpPut httpPut = new HttpPut(bathPutUrl);
+ HttpPut httpPut = new HttpPut(URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + bathPutKey) + "?multiFile");
httpPut.setHeader(TOKEN, token);
httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
httpPut.setHeader(HOS_COMBINE_MODE, COMBINE_MODE_SEEK);
@@ -333,37 +339,37 @@ public class HosSink extends RichSinkFunction<FileChunk> {
private void executeRequest(HttpPut httpPut) throws RuntimeException {
if (isAsync) {
- asyncHttpClient.execute(httpPut, new FutureCallback<HttpResponse>() {
+ asyncHttpClient.execute(httpPut, new FutureCallback<>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
if (httpResponse.getStatusLine().getStatusCode() != 200) {
String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8");
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
+ LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
errorChunksCounter.inc();
}
} catch (IOException e) {
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e);
+ LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e);
errorChunksCounter.inc();
}
}
@Override
public void failed(Exception ex) {
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), ex);
+ LOG.error("put part to hos error. request failed. url: " + httpPut.getURI().toString(), ex);
errorChunksCounter.inc();
if (ex instanceof IllegalStateException || ex instanceof IOReactorException) {
throw new RuntimeException(ex);
}
if (loadBalanceMode == 1 && (ex instanceof SocketException || ex instanceof InterruptedIOException || ex instanceof UnknownHostException)) {
- endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
- bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
+ URI uri = httpPut.getURI();
+ healthyEndpoints.remove(uri.getHost() + ":" + uri.getPort());
}
}
@Override
public void cancelled() {
-
+ LOG.error("put part to hos error. request cancelled. url: " + httpPut.getURI().toString());
}
});
} else {
@@ -372,14 +378,14 @@ public class HosSink extends RichSinkFunction<FileChunk> {
response = syncHttpClient.execute(httpPut);
if (response.getStatusLine().getStatusCode() != 200) {
String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
+ LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
errorChunksCounter.inc();
}
} catch (IOException e) {
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e);
+ LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e);
errorChunksCounter.inc();
if (loadBalanceMode == 1 && (e instanceof SocketException || e instanceof InterruptedIOException || e instanceof UnknownHostException)) {
- endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
+ healthyEndpoints.remove(healthyEndpoint);
}
} finally {
IoUtil.close(response);
@@ -388,7 +394,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}
private boolean checkFileChunk(FileChunk fileChunk) {
- if (StrUtil.isNotEmpty(rateLimitExpression)) {
+ if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) {
jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString());
}
diff --git a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
index 8aaecff..542bc26 100644
--- a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
+++ b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
@@ -71,10 +71,10 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
public transient Counter between100KBAnd1MBTxtChunksCounter;
public transient Counter greaterThan10MBTxtChunksCounter;
private boolean isAsync;
- private CloseableHttpClient syncHttpClient;
- private CloseableHttpAsyncClient asyncHttpClient;
+ private transient CloseableHttpClient syncHttpClient;
+ private transient CloseableHttpAsyncClient asyncHttpClient;
private List<String> endpointList;
- private Cache<String, FileChunk> cache;
+ private transient Cache<String, FileChunk> cache;
public OssSinkByCaffeineCache(Configuration configuration) {
this.configuration = configuration;
@@ -242,9 +242,9 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
calculateFileChunkMetrics(fileChunk, fileId);
}
- private void executeRequest(HttpPost httpPost, String url) throws RuntimeException{
+ private void executeRequest(HttpPost httpPost, String url) throws RuntimeException {
if (isAsync) {
- asyncHttpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
+ asyncHttpClient.execute(httpPost, new FutureCallback<>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
@@ -275,7 +275,7 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
@Override
public void cancelled() {
-
+ LOG.error("post file error. request cancelled. url: " + url);
}
});
} else {
diff --git a/src/main/java/com/zdjizhi/utils/PublicUtil.java b/src/main/java/com/zdjizhi/utils/PublicUtil.java
index ef027bc..1ef2e7f 100644
--- a/src/main/java/com/zdjizhi/utils/PublicUtil.java
+++ b/src/main/java/com/zdjizhi/utils/PublicUtil.java
@@ -1,7 +1,13 @@
package com.zdjizhi.utils;
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.URLUtil;
import cn.hutool.crypto.digest.DigestUtil;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.*;
public class PublicUtil {
@@ -19,4 +25,28 @@ public class PublicUtil {
public static String getIndexDataHead(String filename) {
return getRowKey(filename).substring(0, 1);
}
+
+ public static boolean checkHealth(String endpoint) {
+ boolean isHealth = false;
+ InputStream inputStream = null;
+ try {
+ URL url = new URL(URLUtil.normalize(endpoint + "/actuator/health"));
+ HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
+ urlConnection.setRequestMethod("GET");
+ urlConnection.setConnectTimeout(10000);
+ urlConnection.setReadTimeout(10000);
+ int responseCode = urlConnection.getResponseCode();
+ if (responseCode == HttpURLConnection.HTTP_OK) {
+ inputStream = urlConnection.getInputStream();
+ String responseBody = IoUtil.read(inputStream, StandardCharsets.UTF_8);
+ if (responseBody.contains("UP")) {
+ isHealth = true;
+ }
+ }
+ } catch (Exception ignored) {
+ } finally {
+ IoUtil.close(inputStream);
+ }
+ return isHealth;
+ }
}