diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/sink/HosSink.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/sink/HosSink.java | 116 |
1 files changed, 61 insertions, 55 deletions
diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java index 06a18a3..59a3209 100644 --- a/src/main/java/com/zdjizhi/sink/HosSink.java +++ b/src/main/java/com/zdjizhi/sink/HosSink.java @@ -6,6 +6,7 @@ import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.core.util.*; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.google.common.util.concurrent.RateLimiter; import com.zdjizhi.config.Configs; import com.zdjizhi.pojo.FileChunk; import com.zdjizhi.utils.HttpClientUtil; @@ -29,6 +30,7 @@ import org.apache.http.util.EntityUtils; import java.io.IOException; import java.io.InterruptedIOException; import java.net.SocketException; +import java.net.URI; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.Executors; @@ -71,13 +73,13 @@ public class HosSink extends RichSinkFunction<FileChunk> { public transient Counter pcapngChunksCounter; public transient Counter mediaChunksCounter; private boolean isAsync; - private CloseableHttpClient syncHttpClient; - private CloseableHttpAsyncClient asyncHttpClient; + private transient CloseableHttpClient syncHttpClient; + private transient CloseableHttpAsyncClient asyncHttpClient; private int loadBalanceMode; - private List<String> endpointList; - private volatile String endpoint; + private List<String> healthyEndpoints; + private volatile String healthyEndpoint; private String token; - private volatile String bathPutUrl; + private String bathPutKey; private HashMap<String, String> hosMessage; private String objectsMeta; private String objectsOffset; @@ -85,13 +87,12 @@ public class HosSink extends RichSinkFunction<FileChunk> { private long batchSize; private long batchInterval; private long chunkSize; - private ScheduledExecutorService executorService; + private transient ScheduledExecutorService executorService; private long rateLimitThreshold; private String rateLimitExpression; - private volatile long timestamp; - private long count; - private JexlExpression jexlExpression; - private JexlContext jexlContext; + private transient JexlExpression jexlExpression; + private transient JexlContext jexlContext; + private transient RateLimiter rateLimiter; public HosSink(Configuration configuration) { this.configuration = configuration; @@ -157,13 +158,25 @@ public class HosSink extends RichSinkFunction<FileChunk> { metricGroup.meter("numHtmlChunksOutPerSecond", new MeterView(htmlChunksCounter)); metricGroup.meter("numPcapngChunksOutPerSecond", new MeterView(pcapngChunksCounter)); metricGroup.meter("numMediaChunksOutPerSecond", new MeterView(mediaChunksCounter)); - endpointList = Arrays.asList(configuration.get(Configs.SINK_HOS_ENDPOINT).split(",")); - if (endpointList.size() == 1) { + executorService = Executors.newScheduledThreadPool(2); + String[] endpoints = configuration.get(Configs.SINK_HOS_ENDPOINT).split(","); + healthyEndpoints = new ArrayList<>(); + healthyEndpoints.addAll(Arrays.asList(endpoints)); + if (endpoints.length == 1) { loadBalanceMode = 0; - endpoint = endpointList.get(0); - } else { + healthyEndpoint = healthyEndpoints.get(0); + } else if (endpoints.length > 1) { loadBalanceMode = 1; - endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size())); + healthyEndpoint = RandomUtil.randomEle(healthyEndpoints); + executorService.scheduleWithFixedDelay(() -> { + for (String endpoint : endpoints) { + if (!PublicUtil.checkHealth(endpoint)) { + healthyEndpoints.remove(endpoint); + } else if (!healthyEndpoints.contains(endpoint)) { + healthyEndpoints.add(endpoint); + } + } + }, configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), configuration.get(Configs.SINK_HOS_HEALTH_CHECK_INTERVAL_MS), TimeUnit.MILLISECONDS); } token = configuration.get(Configs.SINK_HOS_TOKEN); isAsync = configuration.getBoolean(Configs.SINK_ASYNC); @@ -173,17 +186,15 @@ public class HosSink extends RichSinkFunction<FileChunk> { } else { syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient(); } - timestamp = System.currentTimeMillis(); batchSize = configuration.getLong(Configs.SINK_HOS_BATCH_SIZE); batchInterval = configuration.getInteger(Configs.SINK_HOS_BATCH_INTERVAL_MS); if (batchSize > 0 && batchInterval > 0) { - bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile"; + bathPutKey = PublicUtil.getUUID(); hosMessage = new HashMap<>(); byteList = new ArrayList<>(); objectsMeta = ""; objectsOffset = ""; chunkSize = 0; - executorService = Executors.newScheduledThreadPool(1); executorService.scheduleWithFixedDelay(() -> { synchronized (this) { if (!byteList.isEmpty()) { @@ -192,40 +203,39 @@ public class HosSink extends RichSinkFunction<FileChunk> { } }, batchInterval, batchInterval, TimeUnit.MILLISECONDS); } + rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD); if (rateLimitThreshold > 0) { - rateLimitThreshold = configuration.getLong(Configs.SINK_RATE_LIMIT_THRESHOLD); rateLimitExpression = configuration.getString(Configs.SINK_RATE_LIMIT_EXCLUSION_EXPRESSION); - count = 0; JexlEngine jexlEngine = new JexlBuilder().create(); jexlExpression = jexlEngine.createExpression(rateLimitExpression); jexlContext = new MapContext(); + rateLimiter = RateLimiter.create(rateLimitThreshold); } } @Override public void invoke(FileChunk fileChunk, Context context) throws RuntimeException { synchronized (this) { - long currentTimeMillis = System.currentTimeMillis(); + if (loadBalanceMode == 1) { + if (healthyEndpoints.isEmpty()) { + throw new RuntimeException("No healthy hos endpoints available"); + } else if (healthyEndpoints.size() == 1) { + healthyEndpoint = healthyEndpoints.get(0); + } else { + healthyEndpoint = RandomUtil.randomEle(healthyEndpoints); + } + } chunksInCounter.inc(); bytesInCounter.inc(fileChunk.getLength()); if (rateLimitThreshold > 0) { - count++; - if (currentTimeMillis - timestamp < 1000 && count > rateLimitThreshold) { - if (checkFileChunk(fileChunk)) { - sendFileChunk(fileChunk); - } else { - rateLimitDropChunksCounter.inc(); - } - } else if (currentTimeMillis - timestamp >= 1000) { + if (rateLimiter.tryAcquire()) { + sendFileChunk(fileChunk); + } else { if (checkFileChunk(fileChunk)) { sendFileChunk(fileChunk); } else { rateLimitDropChunksCounter.inc(); } - timestamp = currentTimeMillis; - count = 0; - } else { - sendFileChunk(fileChunk); } } else { sendFileChunk(fileChunk); @@ -261,9 +271,7 @@ public class HosSink extends RichSinkFunction<FileChunk> { hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); Map<String, Object> metaMap = fileChunk.getMeta(); if (metaMap != null && !metaMap.isEmpty()) { - for (String meta : metaMap.keySet()) { - hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""); - } + metaMap.keySet().forEach(meta -> hosMessage.put(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "")); } objectsMeta += hosMessage.toString() + ";"; hosMessage.clear(); @@ -277,18 +285,18 @@ public class HosSink extends RichSinkFunction<FileChunk> { sendBatchData(); } } else { - String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid()); + String url = URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid()); HttpPut httpPut = new HttpPut(url); httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN)); httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2); httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode()); String filename = fileChunk.getFileName(); - if (StrUtil.isNotEmpty(filename) && filename.contains(".")) { + if (CharSequenceUtil.isNotEmpty(filename) && filename.contains(".")) { httpPut.setHeader(HOS_META_FILENAME, filename); - } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) { + } else if (CharSequenceUtil.isNotEmpty(filename) && !filename.contains(".")) { filename = filename + "." + fileChunk.getFileType(); httpPut.setHeader(HOS_META_FILENAME, filename); - } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) { + } else if (CharSequenceUtil.isEmpty(filename) && CharSequenceUtil.isNotEmpty(fileChunk.getFileType())) { httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType()); } if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) { @@ -301,9 +309,7 @@ public class HosSink extends RichSinkFunction<FileChunk> { httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + ""); Map<String, Object> metaMap = fileChunk.getMeta(); if (metaMap != null && !metaMap.isEmpty()) { - for (String meta : metaMap.keySet()) { - httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + ""); - } + metaMap.keySet().forEach(meta -> httpPut.setHeader(HOS_META_PREFIX + CharSequenceUtil.toSymbolCase(meta, CharPool.DASHED), metaMap.get(meta) + "")); } httpPut.setEntity(new ByteArrayEntity(data)); executeRequest(httpPut); @@ -314,7 +320,7 @@ public class HosSink extends RichSinkFunction<FileChunk> { } private void sendBatchData() { - HttpPut httpPut = new HttpPut(bathPutUrl); + HttpPut httpPut = new HttpPut(URLUtil.normalize(healthyEndpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + bathPutKey) + "?multiFile"); httpPut.setHeader(TOKEN, token); httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2); httpPut.setHeader(HOS_COMBINE_MODE, COMBINE_MODE_SEEK); @@ -333,37 +339,37 @@ public class HosSink extends RichSinkFunction<FileChunk> { private void executeRequest(HttpPut httpPut) throws RuntimeException { if (isAsync) { - asyncHttpClient.execute(httpPut, new FutureCallback<HttpResponse>() { + asyncHttpClient.execute(httpPut, new FutureCallback<>() { @Override public void completed(HttpResponse httpResponse) { try { if (httpResponse.getStatusLine().getStatusCode() != 200) { String responseEntity = EntityUtils.toString(httpResponse.getEntity(), "UTF-8"); - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity); + LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity); errorChunksCounter.inc(); } } catch (IOException e) { - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e); + LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e); errorChunksCounter.inc(); } } @Override public void failed(Exception ex) { - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), ex); + LOG.error("put part to hos error. request failed. url: " + httpPut.getURI().toString(), ex); errorChunksCounter.inc(); if (ex instanceof IllegalStateException || ex instanceof IOReactorException) { throw new RuntimeException(ex); } if (loadBalanceMode == 1 && (ex instanceof SocketException || ex instanceof InterruptedIOException || ex instanceof UnknownHostException)) { - endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size())); - bathPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile"; + URI uri = httpPut.getURI(); + healthyEndpoints.remove(uri.getHost() + ":" + uri.getPort()); } } @Override public void cancelled() { - + LOG.error("put part to hos error. request cancelled. url: " + httpPut.getURI().toString()); } }); } else { @@ -372,14 +378,14 @@ public class HosSink extends RichSinkFunction<FileChunk> { response = syncHttpClient.execute(httpPut); if (response.getStatusLine().getStatusCode() != 200) { String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8"); - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString() +". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity); + LOG.error("put part to hos error. url: " + httpPut.getURI().toString() + ". code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity); errorChunksCounter.inc(); } } catch (IOException e) { - LOG.error("put part to hos error. url: "+ httpPut.getURI().toString(), e); + LOG.error("put part to hos error. url: " + httpPut.getURI().toString(), e); errorChunksCounter.inc(); if (loadBalanceMode == 1 && (e instanceof SocketException || e instanceof InterruptedIOException || e instanceof UnknownHostException)) { - endpoint = endpointList.get(RandomUtil.randomInt(endpointList.size())); + healthyEndpoints.remove(healthyEndpoint); } } finally { IoUtil.close(response); @@ -388,7 +394,7 @@ public class HosSink extends RichSinkFunction<FileChunk> { } private boolean checkFileChunk(FileChunk fileChunk) { - if (StrUtil.isNotEmpty(rateLimitExpression)) { + if (CharSequenceUtil.isNotEmpty(rateLimitExpression)) { jexlContext.set(fileChunk.getClass().getSimpleName(), fileChunk); return Boolean.parseBoolean(jexlExpression.evaluate(jexlContext).toString()); } |
