summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/sink/HosSink.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/sink/HosSink.java')
-rw-r--r--src/main/java/com/zdjizhi/sink/HosSink.java116
1 files changed, 61 insertions, 55 deletions
diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java
index 06a18a3..59a3209 100644
--- a/src/main/java/com/zdjizhi/sink/HosSink.java
+++ b/src/main/java/com/zdjizhi/sink/HosSink.java
@@ -6,6 +6,7 @@ import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.*;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.google.common.util.concurrent.RateLimiter;
import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.utils.HttpClientUtil;
@@ -29,6 +30,7 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketException;
+import java.net.URI;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.Executors;
@@ -71,13 +73,13 @@ public class HosSink extends RichSinkFunction<FileChunk> {
public transient Counter pcapngChunksCounter;
public transient Counter mediaChunksCounter;
private boolean isAsync;
- private CloseableHttpClient syncHttpClient;
- private CloseableHttpAsyncClient asyncHttpClient;
+ private transient CloseableHttpClient syncHttpClient;
+ private transient CloseableHttpAsyncClient asyncHttpClient;
private int loadBalanceMode;
- private List<String> endpointList;
- private volatile String endpoint;
+ private List<String> healthyEndpoints;
+ private volatile String healthyEndpoint;
private String token;
- private volatile String bathPutUrl;
+ private String bathPutKey;
private HashMap<String, String> hosMessage;
private String objectsMeta;
private String objectsOffset;
@@ -85,13 +87,12 @@ public class HosSink extends RichSinkFunction<FileChunk> {
private long batchSize;
private long batchInterval;
private long chunkSize;
- private ScheduledExecutorService executorService;
+ private transient ScheduledExecutorService executorService;
private long rateLimitThreshold;
private String rateLimitExpression;
- private volatile long timestamp;
- private long count;
- private JexlExpression jexlExpression;
- private JexlContext jexlContext;
+ private transient JexlExpression jexlExpression;
+ private transient JexlContext jexlContext;
+ private transient RateLimiter rateLimiter;
public HosSink(Configuration configuration) {
this.configuration = configuration;
@@ -157,13 +158,25 @@ public class HosSink extends RichSinkFunction<FileChunk> {
metricGroup.meter("numHtmlChunksOutPerSecond", new MeterView(htmlChunksCounter));
metricGroup.meter("numPcapngChunksOutPerSecond", new MeterView(pcapngChunksCounter));
metricGroup.meter("numMediaChunksOutPerSecond", new MeterView(mediaChunksCounter));
- endpointList = Arrays.asList(configuration.get(Configs.SINK_HOS_ENDPOINT).split(","));
- if (endpointList.size() == 1) {
+ executorService = Executors.newScheduledThreadPool(2);
+ String[] endpoints = configuration.get(Configs.SINK_HOS_ENDPOINT).split(",");
+ healthyEndpoints = new ArrayList<>();
+ healthyEndpoints.addAll(Arrays.asList(endpoints));
+ if (endpoints.length == 1) {
loadBalanceMode = 0;
- endpoint = endpointList.get(0);
- } else {
+ healthyEndpoint = healthyEndpoints.get(0);
+ } else if (endpoints.length > 1) {
loadBalanceMode = 1;
- endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
+ healthyEndpoint = RandomUtil.randomEle(healthyEndpoints);
+ executorService.scheduleWithFixedDelay(() -> {
+ for (String endpoint : endpoints) {
+ if (!PublicUtil.checkHealth(endpoint)) {
+ healthyEndpoints.remove(endpoint);
+ } else if (!healthyEndpoints.contains(endpoint)) {
+ healthyEndpoints.add(endpoint);
+ }
+ }
+ }, configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), TimeUnit.MILLISECONDS);
}
token = configuration.get(Configs.SINK_HOS_TOKEN);
isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
@@ -173,17 +186,15 @@ public class HosSink extends RichSinkFunction<FileChunk> {
} else {
syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
}
- timestamp = System.currentTimeMillis();
batchSize = configuration.getLong(Configs.SINK_HOS_BATCH_SIZE);
batchInterval = configuration.getInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS);
if (batchSize > 0 && batchInterval > 0) {
- bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
+ bathPutKey = PublicUtil.getUUID();
hosMessage = new HashMap<>();
byteList = new ArrayList<>();
objectsMeta = "";
objectsOffset = "";
chunkSize = 0;
- executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(() -> {
synchronized (this) {
if (!byteList.isEmpty()) {
@@ -192,40 +203,39 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}
}, batchInterval, batchInterval, TimeUnit.MILLISECONDS);
}
+ rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
if (rateLimitThreshold > 0) {
- rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD);
rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION);
- count = 0;
JexlEngine jexlEngine = new JexlBuilder().create();
jexlExpression = jexlEngine.createExpression(rateLimitExpression);
jexlContext = new MapContext();
+ rateLimiter = RateLimiter.create(rateLimitThreshold);
}
}
@Override
public void invoke(FileChunk fileChunk, Context context) throws RuntimeException {
synchronized (this) {
- long currentTimeMillis = System.currentTimeMillis();
+ if (loadBalanceMode == 1) {
+ if (healthyEndpoints.isEmpty()) {
+ throw new RuntimeException("No healthy hos endpoints available");
+ } else if (healthyEndpoints.size() == 1) {
+ healthyEndpoint = healthyEndpoints.get(0);
+ } else {
+ healthyEndpoint = RandomUtil.randomEle(healthyEndpoints);
+ }
+ }
chunksInCounter.inc();
bytesInCounter.inc(fileChunk.getLength());
if (rateLimitThreshold > 0) {
- count++;
- if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) {
- if (checkFileChunk(fileChunk)) {
- sendFileChunk(fileChunk);
- } else {
- rateLimitDropChunksCounter.inc();
- }
- } else if (currentTimeMillis - timestamp >= 1000) {
+ if (rateLimiter.tryAcquire()) {
+ sendFileChunk(fileChunk);
+ } else {
if (checkFileChunk(fileChunk)) {
sendFileChunk(fileChunk);
} else {
rateLimitDropChunksCounter.inc();
}
- timestamp = currentTimeMillis;
- count = 0;
- } else {
- sendFileChunk(fileChunk);
}
} else {
sendFileChunk(fileChunk);
@@ -261,9 +271,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
Map<String, Object> metaMap = fileChunk.getMeta();
if (metaMap != null && !metaMap.isEmpty()) {
- for (String meta : metaMap.keySet()) {
- hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "");
- }
+ metaMap.keySet().forEach(meta -> hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""));
}
objectsMeta += hosMessage.toString() + ";";
hosMessage.clear();
@@ -277,18 +285,18 @@ public class HosSink extends RichSinkFunction<FileChunk> {
sendBatchData();
}
} else {
- String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
+ String url = URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
HttpPut httpPut = new HttpPut(url);
httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN));
httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
String filename = fileChunk.getFileName();
- if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
+ if (CharSequenceUtil.isNotEmpty(filename) && filename.contains(".")) {
httpPut.setHeader(HOS_META_FILENAME, filename);
- } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
+ } else if (CharSequenceUtil.isNotEmpty(filename) && !filename.contains(".")) {
filename = filename + "." + fileChunk.getFileType();
httpPut.setHeader(HOS_META_FILENAME, filename);
- } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
+ } else if (CharSequenceUtil.isEmpty(filename) && CharSequenceUtil.isNotEmpty(fileChunk.getFileType())) {
httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType());
}
if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
@@ -301,9 +309,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
Map<String, Object> metaMap = fileChunk.getMeta();
if (metaMap != null && !metaMap.isEmpty()) {
- for (String meta : metaMap.keySet()) {
- httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "");
- }
+ metaMap.keySet().forEach(meta -> httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""));
}
httpPut.setEntity(new ByteArrayEntity(data));
executeRequest(httpPut);
@@ -314,7 +320,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}
private void sendBatchData() {
- HttpPut httpPut = new HttpPut(bathPutUrl);
+ HttpPut httpPut = new HttpPut(URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + bathPutKey) + "?multiFile");
httpPut.setHeader(TOKEN, token);
httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
httpPut.setHeader(HOS_COMBINE_MODE, COMBINE_MODE_SEEK);
@@ -333,37 +339,37 @@ public class HosSink extends RichSinkFunction<FileChunk> {
private void executeRequest(HttpPut httpPut) throws RuntimeException {
if (isAsync) {
- asyncHttpClient.execute(httpPut, new FutureCallback<HttpResponse>() {
+ asyncHttpClient.execute(httpPut, new FutureCallback<>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
if (httpResponse.getStatusLine().getStatusCode() != 200) {
String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8");
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
+ LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
errorChunksCounter.inc();
}
} catch (IOException e) {
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e);
+ LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e);
errorChunksCounter.inc();
}
}
@Override
public void failed(Exception ex) {
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), ex);
+ LOG.error("put part to hos error. request failed. url: " + httpPut.getURI().toString(), ex);
errorChunksCounter.inc();
if (ex instanceof IllegalStateException || ex instanceof IOReactorException) {
throw new RuntimeException(ex);
}
if (loadBalanceMode == 1 && (ex instanceof SocketException || ex instanceof InterruptedIOException || ex instanceof UnknownHostException)) {
- endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
- bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
+ URI uri = httpPut.getURI();
+ healthyEndpoints.remove(uri.getHost() + ":" + uri.getPort());
}
}
@Override
public void cancelled() {
-
+ LOG.error("put part to hos error. request cancelled. url: " + httpPut.getURI().toString());
}
});
} else {
@@ -372,14 +378,14 @@ public class HosSink extends RichSinkFunction<FileChunk> {
response = syncHttpClient.execute(httpPut);
if (response.getStatusLine().getStatusCode() != 200) {
String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
+ LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
errorChunksCounter.inc();
}
} catch (IOException e) {
- LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e);
+ LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e);
errorChunksCounter.inc();
if (loadBalanceMode == 1 && (e instanceof SocketException || e instanceof InterruptedIOException || e instanceof UnknownHostException)) {
- endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size()));
+ healthyEndpoints.remove(healthyEndpoint);
}
} finally {
IoUtil.close(response);
@@ -388,7 +394,7 @@ public class HosSink extends RichSinkFunction<FileChunk> {
}
private boolean checkFileChunk(FileChunk fileChunk) {
- if (StrUtil.isNotEmpty(rateLimitExpression)) {
+ if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) {
jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk);
return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString());
}