Diffstat (limited to 'src')
-rw-r--r--  src/main/java/com/zdjizhi/FileChunkCombiner.java                          |  32
-rw-r--r--  src/main/java/com/zdjizhi/config/Configs.java                             |  57
-rw-r--r--  src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java |   6
-rw-r--r--  src/main/java/com/zdjizhi/function/FileChunkFilter.java                   |  48
-rw-r--r--  src/main/java/com/zdjizhi/sink/HBaseSink.java                             | 210
-rw-r--r--  src/main/java/com/zdjizhi/sink/HosSink.java                               | 213
-rw-r--r--  src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java                 |  50
-rw-r--r--  src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java                  |  59
-rw-r--r--  src/main/java/com/zdjizhi/utils/HttpClientUtil.java                       |  87
-rw-r--r--  src/main/java/com/zdjizhi/utils/HttpHeaderConstants.java                  |  54
-rw-r--r--  src/main/java/com/zdjizhi/utils/PublicConstants.java                      |  33
-rw-r--r--  src/main/java/com/zdjizhi/utils/PublicUtil.java                           |  77
-rw-r--r--  src/main/resources/common.properties                                      |  40
13 files changed, 837 insertions, 129 deletions
diff --git a/src/main/java/com/zdjizhi/FileChunkCombiner.java b/src/main/java/com/zdjizhi/FileChunkCombiner.java
index 06f7402..1c5fc08 100644
--- a/src/main/java/com/zdjizhi/FileChunkCombiner.java
+++ b/src/main/java/com/zdjizhi/FileChunkCombiner.java
@@ -3,12 +3,12 @@ package com.zdjizhi;
import com.zdjizhi.config.Configs;
import com.zdjizhi.function.*;
import com.zdjizhi.pojo.*;
+import com.zdjizhi.sink.HBaseSink;
import com.zdjizhi.sink.HosSink;
import com.zdjizhi.kafka.KafkaConsumer;
import com.zdjizhi.trigger.LastChunkOrNoDataInTimeTrigger;
import com.zdjizhi.trigger.MultipleTrigger;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
@@ -24,9 +24,8 @@ import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
-public class FileChunkCombiner extends KafkaConsumer {
+public class FileChunkCombiner {
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
@@ -43,7 +42,7 @@ public class FileChunkCombiner extends KafkaConsumer {
.name("Kafka Source")
.map(new ParseMessagePackMapFunction())
.name("Map: Parse Message Pack")
- .filter((FilterFunction<FileChunk>) Objects::nonNull)
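+ // FileChunkFilter drops null chunks, chunks whose offset exceeds file.max.size, and chunks rejected by the optional filter.expression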
+ .filter(new FileChunkFilter(configuration.getLong(Configs.FILE_MAX_SIZE), configuration.getString(Configs.FILTER_EXPRESSION)))
.assignTimestampsAndWatermarks(watermarkStrategy);
OutputTag<FileChunk> delayedChunkOutputTag = new OutputTag<FileChunk>("delayed-chunk") {
@@ -63,14 +62,23 @@ public class FileChunkCombiner extends KafkaConsumer {
.setParallelism(configuration.get(Configs.COMBINER_WINDOW_PARALLELISM))
.disableChaining();
- HosSink hosSink = new HosSink(configuration);
- windowStream.addSink(hosSink)
- .name("Hos")
- .setParallelism(configuration.get(Configs.SINK_HOS_PARALLELISM));
- windowStream.getSideOutput(delayedChunkOutputTag)
- .map(new SideOutputMapFunction())
- .addSink(hosSink)
- .name("Hos Delayed Chunk");
+ if ("hos".equals(configuration.get(Configs.SINK_TYPE))) {
+ windowStream.addSink(new HosSink(configuration))
+ .name("Hos")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ windowStream.getSideOutput(delayedChunkOutputTag)
+ .map(new SideOutputMapFunction())
+ .addSink(new HosSink(configuration))
+ .name("Hos Delayed Chunk");
+ } else {
+ windowStream.addSink(new HBaseSink(configuration))
+ .name("HBase")
+ .setParallelism(configuration.get(Configs.SINK_PARALLELISM));
+ windowStream.getSideOutput(delayedChunkOutputTag)
+ .map(new SideOutputMapFunction())
+ .addSink(new HBaseSink(configuration))
+ .name("HBase Delayed Chunk");
+ }
environment.execute(configuration.get(Configs.FLINK_JOB_NAME));
}
diff --git a/src/main/java/com/zdjizhi/config/Configs.java b/src/main/java/com/zdjizhi/config/Configs.java
index 6d79bb8..e28426e 100644
--- a/src/main/java/com/zdjizhi/config/Configs.java
+++ b/src/main/java/com/zdjizhi/config/Configs.java
@@ -9,9 +9,6 @@ public class Configs {
.defaultValue("FILE-CHUNK-COMBINER")
.withDescription("The name of job.");
- public static final ConfigOption<Integer> SOURCE_KAFKA_PARALLELISM = ConfigOptions.key("source.kafka.parallelism")
- .intType()
- .defaultValue(1);
public static final ConfigOption<String> KAFKA_BROKER = ConfigOptions.key("source.kafka.broker")
.stringType()
.noDefaultValue();
@@ -46,9 +43,6 @@ public class Configs {
.stringType()
.noDefaultValue();
- public static final ConfigOption<Integer> PARSE_MESSAGE_PACK_PARALLELISM = ConfigOptions.key("parse.message.pack.parallelism")
- .intType()
- .defaultValue(1);
public static final ConfigOption<Integer> COMBINER_WINDOW_PARALLELISM = ConfigOptions.key("combiner.window.parallelism")
.intType()
.defaultValue(1);
@@ -58,13 +52,28 @@ public class Configs {
public static final ConfigOption<Long> COMBINER_WINDOW_IDLE_TIME = ConfigOptions.key("combiner.window.idle.time")
.longType()
.defaultValue(5L);
- public static final ConfigOption<Long> COMBINER_WINDOW_KEY_MAX_CHUNK = ConfigOptions.key("combiner.window.key.max.chunk")
- .longType()
- .defaultValue(5L);
- public static final ConfigOption<Integer> SINK_HOS_PARALLELISM = ConfigOptions.key("sink.hos.parallelism")
+ public static final ConfigOption<String> SINK_TYPE = ConfigOptions.key("sink.type")
+ .stringType()
+ .defaultValue("hos");
+ public static final ConfigOption<Integer> SINK_PARALLELISM = ConfigOptions.key("sink.parallelism")
.intType()
.defaultValue(1);
+ public static final ConfigOption<Boolean> SINK_ASYNC = ConfigOptions.key("sink.async")
+ .booleanType()
+ .defaultValue(false);
+ public static final ConfigOption<Boolean> SINK_BATCH = ConfigOptions.key("sink.batch")
+ .booleanType()
+ .defaultValue(false);
+ public static final ConfigOption<Integer> SINK_BATCH_COUNT = ConfigOptions.key("sink.batch.count")
+ .intType()
+ .defaultValue(1);
+ public static final ConfigOption<Long> SINK_BATCH_SIZE = ConfigOptions.key("sink.batch.size")
+ .longType()
+ .defaultValue(Long.MAX_VALUE);
+ public static final ConfigOption<Integer> SINK_HOS_LOAD_BALANCE_MODE = ConfigOptions.key("sink.hos.load.balance.mode")
+ .intType()
+ .defaultValue(0);
public static final ConfigOption<String> SINK_HOS_ENDPOINT = ConfigOptions.key("sink.hos.endpoint")
.stringType()
.noDefaultValue();
@@ -73,7 +82,7 @@ public class Configs {
.noDefaultValue();
public static final ConfigOption<String> SINK_HOS_TOKEN = ConfigOptions.key("sink.hos.token")
.stringType()
- .noDefaultValue();
+ .defaultValue("");
public static final ConfigOption<Integer> SINK_HOS_HTTP_MAX_TOTAL = ConfigOptions.key("sink.hos.http.max.total")
.intType()
.defaultValue(2000);
@@ -93,4 +102,30 @@ public class Configs {
.intType()
.defaultValue(60000);
+ public static final ConfigOption<String> SINK_HBASE_ZOOKEEPER = ConfigOptions.key("sink.hbase.zookeeper")
+ .stringType()
+ .defaultValue("");
+ public static final ConfigOption<Integer> SINK_HBASE_RETRIES_NUMBER = ConfigOptions.key("sink.hbase.retries.number")
+ .intType()
+ .defaultValue(10);
+ public static final ConfigOption<Integer> SINK_HBASE_RPC_TIMEOUT = ConfigOptions.key("sink.hbase.rpc.timeout")
+ .intType()
+ .defaultValue(600000);
+ public static final ConfigOption<Integer> SINK_HBASE_CLIENT_WRITE_BUFFER = ConfigOptions.key("sink.hbase.client.write.buffer")
+ .intType()
+ .defaultValue(10485760);
+ public static final ConfigOption<Integer> SINK_HBASE_CLIENT_IPC_POOL_SIZE = ConfigOptions.key("sink.hbase.client.ipc.pool.size")
+ .intType()
+ .defaultValue(1);
+
+ public static final ConfigOption<Integer> FILE_MAX_CHUNK_COUNT = ConfigOptions.key("file.max.chunk.count")
+ .intType()
+ .defaultValue(100000);
+ public static final ConfigOption<Long> FILE_MAX_SIZE = ConfigOptions.key("file.max.size")
+ .longType()
+ .defaultValue(10737418240L);
+ public static final ConfigOption<String> FILTER_EXPRESSION = ConfigOptions.key("filter.expression")
+ .stringType()
+ .defaultValue("");
+
}
diff --git a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
index 8f0f40d..5eb134d 100644
--- a/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
+++ b/src/main/java/com/zdjizhi/function/CombineChunkProcessWindowFunction.java
@@ -10,7 +10,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import java.util.List;
+import java.util.*;
public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<FileChunk, FileChunk, String, TimeWindow> {
@@ -35,8 +35,8 @@ public class CombineChunkProcessWindowFunction extends ProcessWindowFunction<Fil
}
@Override
- public void process(String string, Context context, Iterable<FileChunk> elements, Collector<FileChunk> out) throws Exception {
- List<FileChunk> fileChunks = PublicUtil.combine(elements, configuration.get(Configs.COMBINER_WINDOW_KEY_MAX_CHUNK), duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
+ public void process(String string, Context context, Iterable<FileChunk> elements, Collector<FileChunk> out) {
+ List<FileChunk> fileChunks = PublicUtil.combine(elements, configuration.get(Configs.FILE_MAX_CHUNK_COUNT), duplicateChunkCounter, combineErrorCounter, seekChunkCounter, appendChunkCounter);
for (FileChunk fileChunk : fileChunks) {
out.collect(fileChunk);
}
diff --git a/src/main/java/com/zdjizhi/function/FileChunkFilter.java b/src/main/java/com/zdjizhi/function/FileChunkFilter.java
new file mode 100644
index 0000000..e0af784
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/FileChunkFilter.java
@@ -0,0 +1,48 @@
+package com.zdjizhi.function;
+
+import cn.hutool.core.util.StrUtil;
+import com.zdjizhi.pojo.FileChunk;
+import org.apache.commons.jexl3.*;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+public class FileChunkFilter extends RichFilterFunction<FileChunk> {
+ private final long maxFileSize;
+ private final String filterExpression;
+ private transient Counter filterChunkCounter;
+ private transient JexlExpression jexlExpression;
+ private transient JexlContext jexlContext;
+
+ public FileChunkFilter(long maxFileSize, String filterExpression) {
+ this.maxFileSize = maxFileSize;
+ this.filterExpression = filterExpression;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ filterChunkCounter = metricGroup.counter("filterChunkCount");
+ // JEXL rejects an empty source, so only compile when an expression is configured
+ if (StrUtil.isNotEmpty(filterExpression)) {
+ JexlEngine jexlEngine = new JexlBuilder().create();
+ jexlExpression = jexlEngine.createExpression(filterExpression);
+ jexlContext = new MapContext();
+ }
+ }
+
+ @Override
+ public boolean filter(FileChunk value) {
+ if (value == null || value.getOffset() > maxFileSize) {
+ filterChunkCounter.inc();
+ return false;
+ }
+ if (StrUtil.isNotEmpty(filterExpression)) {
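+ // the chunk is bound under its simple class name ("FileChunk"), matching expressions like FileChunk.fileType == "txt"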
+ jexlContext.set(value.getClass().getSimpleName(), value);
+ Object result = jexlExpression.evaluate(jexlContext);
+ if (result == null || !Boolean.parseBoolean(result.toString())) {
+ filterChunkCounter.inc();
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/sink/HBaseSink.java b/src/main/java/com/zdjizhi/sink/HBaseSink.java
new file mode 100644
index 0000000..15c2c01
--- /dev/null
+++ b/src/main/java/com/zdjizhi/sink/HBaseSink.java
@@ -0,0 +1,210 @@
+package com.zdjizhi.sink;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.config.Configs;
+import com.zdjizhi.pojo.FileChunk;
+import com.zdjizhi.utils.HBaseColumnConstants;
+import com.zdjizhi.utils.HBaseConnectionUtil;
+import com.zdjizhi.utils.PublicConstants;
+import com.zdjizhi.utils.PublicUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+import static com.zdjizhi.utils.PublicConstants.*;
+import static com.zdjizhi.utils.HBaseColumnConstants.*;
+
+public class HBaseSink extends RichSinkFunction<FileChunk> {
+ private static final Log LOG = LogFactory.get();
+
+ private final Configuration configuration;
+ private transient Counter sendHBaseCounter;
+ private transient Counter sendHBaseErrorCounter;
+ private transient Counter sendHBaseFileCounter;
+ private transient Counter sendHBaseChunkCounter;
+ private boolean isAsync;
+ private Connection syncHBaseConnection;
+ private AsyncConnection asyncHBaseConnection;
+ private Table table;
+ private Table indexTimeTable;
+ private Table indexFilenameTable;
+ private AsyncTable<AdvancedScanResultConsumer> asyncTable;
+ private AsyncTable<AdvancedScanResultConsumer> asyncIndexTimeTable;
+ private AsyncTable<AdvancedScanResultConsumer> asyncIndexFilenameTable;
+ private List<Put> dataPutList;
+ private List<Put> indexTimePutList;
+ private List<Put> indexFilenamePutList;
+ private long chunkSize;
+ private int chunkCount;
+ private long maxBatchSize;
+ private long maxBatchCount;
+
+ public HBaseSink(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ sendHBaseCounter = metricGroup.counter("sendHBaseCount");
+ sendHBaseErrorCounter = metricGroup.counter("sendHBaseErrorCount");
+ sendHBaseFileCounter = metricGroup.counter("sendHBaseFileCount");
+ sendHBaseChunkCounter = metricGroup.counter("sendHBaseChunkCount");
+ isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
+ if (isAsync) {
+ asyncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getAsyncHBaseConnection();
+ asyncTable = asyncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ asyncIndexTimeTable = asyncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ asyncIndexFilenameTable = asyncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ } else {
+ syncHBaseConnection = HBaseConnectionUtil.getInstance(configuration).getSyncHBaseConnection();
+ table = syncHBaseConnection.getTable(TableName.valueOf("default:" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ indexTimeTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_time_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ indexFilenameTable = syncHBaseConnection.getTable(TableName.valueOf("default:index_filename_" + configuration.get(Configs.SINK_HOS_BUCKET)));
+ }
+ maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE);
+ maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
+ dataPutList = new ArrayList<>();
+ indexTimePutList = new ArrayList<>();
+ indexFilenamePutList = new ArrayList<>();
+ chunkSize = 0;
+ chunkCount = 0;
+ }
+
+ @Override
+ public void invoke(FileChunk fileChunk, Context context) {
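+ // only seek-mode chunks with sink.batch enabled are written; other chunks are silently dropped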
+ if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode()) && configuration.get(Configs.SINK_BATCH)) {
+ sendHBaseChunkCounter.inc();
+ byte[] data = "".getBytes();
+ if (fileChunk.getChunk() != null) {
+ data = fileChunk.getChunk();
+ }
+ long timestamp = System.currentTimeMillis();
+ Map<String, String> partMessageMap = new HashMap<>();
+ partMessageMap.put(APPEND_FILE_PART_MESSAGE_CHUNK_COUNT, fileChunk.getChunkCount() + "");
+ partMessageMap.put(APPEND_FILE_PART_MESSAGE_LAST_PART_FLAG, fileChunk.getLastChunkFlag() + "");
+ partMessageMap.put(APPEND_FILE_PART_MESSAGE_SIZE, data.length + "");
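+ // data row: chunk bytes under the data family and part bookkeeping under the meta family, both keyed by offset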
+ Put dataPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid()) + PublicConstants.FILE_DATA_ROW_SUFFIX));
+ dataPut.addColumn(BYTE_FAMILY_DATA, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), data);
+ dataPut.addColumn(BYTE_FAMILY_META, Bytes.toBytes(String.valueOf(fileChunk.getOffset())), Bytes.toBytes(partMessageMap.toString()));
+ dataPutList.add(dataPut);
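+ // offset 0 starts a new file: also write the object meta row plus the time and filename index rows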
+ if (fileChunk.getOffset() == 0) {
+ Put metaPut = new Put(PublicUtil.getRowKey(fileChunk.getUuid()).getBytes());
+ String filename = fileChunk.getUuid() + "." + fileChunk.getFileType();
+ String randomIndexHead = PublicUtil.getIndexDataHead(filename);
+ String indexTimeKey = randomIndexHead + "|" + timestamp + "|" + PublicUtil.getRowKey(fileChunk.getUuid());
+ String indexFilenameKey = randomIndexHead + "|" + filename;
+ metaPut.addColumn(BYTE_FAMILY_META, Bytes.toBytes(COLUMN_USER_DEFINED_META_PREFIX + FILE_META_FILE_TYPE), Bytes.toBytes(fileChunk.getFileType()));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_CONTENT_TYPE, Bytes.toBytes("application/octet-stream"));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_FILENAME, Bytes.toBytes(filename));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_UPLOAD_METHOD, Bytes.toBytes(String.valueOf(5)));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_INDEX_FILENAME_KEY, Bytes.toBytes(indexFilenameKey));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_INDEX_TIME_KEY, Bytes.toBytes(indexTimeKey));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_APPEND_MODE, Bytes.toBytes(APPEND_MODE_OFFSET));
+ metaPut.addColumn(BYTE_FAMILY_META, BYTE_COLUMN_COMBINE_MODE, Bytes.toBytes(fileChunk.getCombineMode()));
+ dataPutList.add(metaPut);
+ Put indexTimePut = new Put(Bytes.toBytes(indexTimeKey));
+ indexTimePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
+ indexTimePutList.add(indexTimePut);
+ Put indexFilenamePut = new Put(Bytes.toBytes(indexFilenameKey));
+ indexFilenamePut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_FILENAME, Bytes.toBytes(fileChunk.getUuid()));
+ indexFilenamePutList.add(indexFilenamePut);
+ sendHBaseFileCounter.inc();
+ } else {
+ Put metaPut = new Put(Bytes.toBytes(PublicUtil.getRowKey(fileChunk.getUuid())));
+ metaPut.addColumn(HBaseColumnConstants.BYTE_FAMILY_META, HBaseColumnConstants.BYTE_COLUMN_LAST_MODIFIED, Bytes.toBytes(timestamp));
+ dataPutList.add(metaPut);
+ }
+ chunkCount++;
+ chunkSize += data.length;
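+ // flush once the accumulated bytes or chunk count reaches the configured batch thresholds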
+ if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
+ if (isAsync) {
+ if (dataPutList.size() > 0) {
+ List<CompletableFuture<Object>> futures = asyncTable.batch(dataPutList);
+ sendHBaseCounter.inc();
+ for (CompletableFuture<Object> completableFuture : futures) {
+ completableFuture.whenCompleteAsync((result, error) -> {
+ if (error != null) {
+ LOG.error(error, "put chunk to hbase error.");
+ sendHBaseErrorCounter.inc();
+ }
+ });
+ }
+ dataPutList.clear();
+ }
+ if (indexTimePutList.size() > 0) {
+ asyncIndexTimeTable.batch(indexTimePutList);
+ sendHBaseCounter.inc();
+ indexTimePutList.clear();
+ }
+ if (indexFilenamePutList.size() > 0) {
+ asyncIndexFilenameTable.batch(indexFilenamePutList);
+ sendHBaseCounter.inc();
+ indexFilenamePutList.clear();
+ }
+ } else {
+ if (dataPutList.size() > 0) {
+ try {
+ sendHBaseCounter.inc();
+ table.batch(dataPutList, null);
+ } catch (IOException | InterruptedException e) {
+ LOG.error("put chunk to hbase data table error. ", e.getMessage());
+ sendHBaseErrorCounter.inc();
+ }finally {
+ dataPutList.clear();
+ }
+ }
+ if (indexTimePutList.size() > 0) {
+ try {
+ sendHBaseCounter.inc();
+ indexTimeTable.batch(indexTimePutList, null);
+ } catch (IOException | InterruptedException e) {
+ LOG.error("put chunk to hbase index time table error. ", e.getMessage());
+ sendHBaseErrorCounter.inc();
+ }finally {
+ indexTimePutList.clear();
+ }
+ }
+ if (indexFilenamePutList.size() > 0) {
+ try {
+ sendHBaseCounter.inc();
+ indexFilenameTable.batch(indexFilenamePutList, null);
+ } catch (IOException | InterruptedException e) {
+ LOG.error("put chunk to hbase index filename table error. ", e.getMessage());
+ sendHBaseErrorCounter.inc();
+ }finally {
+ indexFilenamePutList.clear();
+ }
+ }
+ }
+ chunkSize = 0;
+ chunkCount = 0;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
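+ // NOTE: puts still buffered in the batch lists are not flushed here and are lost on shutdown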
+ IoUtil.close(table);
+ IoUtil.close(indexTimeTable);
+ IoUtil.close(indexFilenameTable);
+ IoUtil.close(syncHBaseConnection);
+ IoUtil.close(asyncHBaseConnection);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/sink/HosSink.java b/src/main/java/com/zdjizhi/sink/HosSink.java
index f49a43b..502c678 100644
--- a/src/main/java/com/zdjizhi/sink/HosSink.java
+++ b/src/main/java/com/zdjizhi/sink/HosSink.java
@@ -1,19 +1,61 @@
package com.zdjizhi.sink;
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.util.*;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
import com.zdjizhi.utils.HttpClientUtil;
import com.zdjizhi.utils.PublicUtil;
+import org.apache.commons.lang.CharEncoding;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.util.EntityUtils;
import java.io.IOException;
+import java.net.ConnectException;
+import java.util.*;
+
+import static com.zdjizhi.utils.HttpHeaderConstants.*;
+import static com.zdjizhi.utils.PublicConstants.*;
public class HosSink extends RichSinkFunction<FileChunk> {
+ private static final Log LOG = LogFactory.get();
private final Configuration configuration;
+ private transient Counter sendHosCounter;
private transient Counter sendHosErrorCounter;
+ private transient Counter sendHosFileCounter;
+ private transient Counter sendHosChunkCounter;
+ private boolean isAsync;
+ private CloseableHttpClient syncHttpClient;
+ private CloseableHttpAsyncClient asyncHttpClient;
+ private int loadBalanceMode;
+ private volatile String endpoint;
+ private List<String> ipList;
+ private List<String> portList;
+ private String token;
+ private volatile String batchPutUrl;
+ private HashMap<String, String> hosMessage;
+ private String objectsMeta = "";
+ private String objectsOffset = "";
+ private List<byte[]> byteList;
+ private long maxBatchSize;
+ private long maxBatchCount;
+ private long chunkSize = 0;
+ private int chunkCount = 0;
public HosSink(Configuration configuration) {
this.configuration = configuration;
@@ -23,17 +65,182 @@ public class HosSink extends RichSinkFunction<FileChunk> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
+ sendHosCounter = metricGroup.counter("sendHosCount");
sendHosErrorCounter = metricGroup.counter("sendHosErrorCount");
+ sendHosFileCounter = metricGroup.counter("sendHosFileCount");
+ sendHosChunkCounter = metricGroup.counter("sendHosChunkCount");
+ loadBalanceMode = configuration.getInteger(Configs.SINK_HOS_LOAD_BALANCE_MODE);
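+ // mode 0: fixed endpoint (typically behind nginx); mode 1: pick a random ip:port pair from the comma-separated lists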
+ if (loadBalanceMode == 0) {
+ endpoint = configuration.getString(Configs.SINK_HOS_ENDPOINT);
+ } else if (loadBalanceMode == 1) {
+ String[] ipPortArr = configuration.get(Configs.SINK_HOS_ENDPOINT).split(":");
+ ipList = Arrays.asList(ipPortArr[0].split(","));
+ portList = Arrays.asList(ipPortArr[1].split(","));
+ endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
+ }
+ token = configuration.get(Configs.SINK_HOS_TOKEN);
+ isAsync = configuration.getBoolean(Configs.SINK_ASYNC);
+ if (isAsync) {
+ asyncHttpClient = HttpClientUtil.getInstance(configuration).getAsyncHttpClient();
+ asyncHttpClient.start();
+ } else {
+ syncHttpClient = HttpClientUtil.getInstance(configuration).getSyncHttpClient();
+ }
+ batchPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
+ maxBatchSize = configuration.getLong(Configs.SINK_BATCH_SIZE);
+ maxBatchCount = configuration.getInteger(Configs.SINK_BATCH_COUNT);
+ hosMessage = new HashMap<>();
+ objectsMeta = "";
+ objectsOffset = "";
+ byteList = new ArrayList<>();
}
@Override
public void invoke(FileChunk fileChunk, Context context) {
- PublicUtil.sendToHos(fileChunk, configuration, sendHosErrorCounter);
+ byte[] data = "".getBytes();
+ if (fileChunk.getChunk() != null) {
+ data = fileChunk.getChunk();
+ }
+ sendHosChunkCounter.inc();
+ if (configuration.get(Configs.SINK_BATCH)) {
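+ // batch mode: accumulate per-chunk meta/offset headers and payload bytes, then PUT them as a single multiFile request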
+ hosMessage.put(HOS_META_FILE_TYPE, fileChunk.getFileType());
+ hosMessage.put(HOS_META_FILENAME, fileChunk.getUuid());
+ if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
+ hosMessage.put(HOS_OFFSET, fileChunk.getOffset() + "");
+ hosMessage.put(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
+ if (fileChunk.getOffset() == 0) {
+ sendHosFileCounter.inc();
+ }
+ } else {
+ hosMessage.put(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
+ hosMessage.put(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
+ }
+ hosMessage.put(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
+ Map<String, Object> metaMap = fileChunk.getMeta();
+ if (metaMap != null && metaMap.size() > 0) {
+ for (String meta : metaMap.keySet()) {
+ hosMessage.put(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
+ }
+ }
+ objectsMeta += hosMessage.toString() + ";";
+ hosMessage.clear();
+ objectsOffset += data.length + ";";
+ byteList.add(data);
+ chunkCount++;
+ chunkSize += data.length;
+ if (chunkSize >= maxBatchSize || chunkCount >= maxBatchCount) {
+ HttpPut httpPut = new HttpPut(batchPutUrl);
+ httpPut.setHeader(TOKEN, token);
+ httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
+ httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
+ httpPut.setHeader(HOS_OBJECTS_META, objectsMeta);
+ httpPut.setHeader(HOS_OBJECTS_OFFSET, objectsOffset);
+ byte[][] bytes = new byte[byteList.size()][];
+ byteList.toArray(bytes);
+ byte[] newData = ArrayUtil.addAll(bytes);
+ httpPut.setEntity(new ByteArrayEntity(newData));
+ byteList.clear();
+ executeRequest(httpPut);
+ objectsMeta = "";
+ objectsOffset = "";
+ chunkSize = 0;
+ chunkCount = 0;
+ }
+ } else {
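+ // non-batch mode: one PUT per combined chunk, addressed by the chunk uuid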
+ String url = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid());
+ HttpPut httpPut = new HttpPut(url);
+ httpPut.setHeader(TOKEN, configuration.get(Configs.SINK_HOS_TOKEN));
+ httpPut.setHeader(HOS_UPLOAD_TYPE, UPLOAD_TYPE_APPENDV2);
+ httpPut.setHeader(HOS_COMBINE_MODE, fileChunk.getCombineMode());
+ String filename = fileChunk.getFileName();
+ if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
+ httpPut.setHeader(HOS_META_FILENAME, filename);
+ } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
+ filename = filename + "." + fileChunk.getFileType();
+ httpPut.setHeader(HOS_META_FILENAME, filename);
+ } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
+ httpPut.setHeader(HOS_META_FILE_TYPE, fileChunk.getFileType());
+ }
+ if (COMBINE_MODE_SEEK.equals(fileChunk.getCombineMode())) {
+ httpPut.setHeader(HOS_OFFSET, fileChunk.getOffset() + "");
+ httpPut.setHeader(HOS_PART_LAST_FLAG, fileChunk.getLastChunkFlag() + "");
+ if (fileChunk.getOffset() == 0) {
+ sendHosFileCounter.inc();
+ }
+ } else {
+ httpPut.setHeader(HOS_PART_NUMBER, fileChunk.getTimestamp() + "");
+ httpPut.setHeader(HOS_PART_CHUNK_NUMBERS, fileChunk.getChunkNumbers());
+ }
+ httpPut.setHeader(HOS_PART_CHUNK_COUNT, fileChunk.getChunkCount() + "");
+ Map<String, Object> metaMap = fileChunk.getMeta();
+ if (metaMap != null && metaMap.size() > 0) {
+ for (String meta : metaMap.keySet()) {
+ httpPut.setHeader(HOS_META_PREFIX + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + "");
+ }
+ }
+ httpPut.setEntity(new ByteArrayEntity(data));
+ executeRequest(httpPut);
+ }
}
@Override
- public void close() throws IOException {
- HttpClientUtil.getInstance(null).close();
+ public void close() {
+ IoUtil.close(syncHttpClient);
+ IoUtil.close(asyncHttpClient);
+ }
+
+ private void executeRequest(HttpPut httpPut) {
+ sendHosCounter.inc();
+ if (isAsync) {
+ asyncHttpClient.execute(httpPut, new FutureCallback<HttpResponse>() {
+ @Override
+ public void completed(HttpResponse httpResponse) {
+ try {
+ if (httpResponse.getStatusLine().getStatusCode() != 200) {
+ String responseEntity = EntityUtils.toString(httpResponse.getEntity(), CharEncoding.UTF_8);
+ LOG.error("put part to hos error. code: " + httpResponse.getStatusLine().getStatusCode() + ". message: " + responseEntity);
+ sendHosErrorCounter.inc();
+ }
+ } catch (IOException e) {
+ LOG.error("put part to hos error.", e);
+ sendHosErrorCounter.inc();
+ }
+ }
+
+ @Override
+ public void failed(Exception ex) {
+ LOG.error("put part to hos error.", ex);
+ sendHosErrorCounter.inc();
+ if (loadBalanceMode == 1 && ex instanceof ConnectException) {
+ endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
+ batchPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
+ }
+ }
+
+ @Override
+ public void cancelled() {
+
+ }
+ });
+ } else {
+ CloseableHttpResponse response = null;
+ try {
+ response = syncHttpClient.execute(httpPut);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ String responseEntity = EntityUtils.toString(response.getEntity(), CharEncoding.UTF_8);
+ LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
+ sendHosErrorCounter.inc();
+ }
+ } catch (IOException e) {
+ LOG.error("put part to hos error.", e);
+ sendHosErrorCounter.inc();
+ if (loadBalanceMode == 1 && (e instanceof HttpHostConnectException || e instanceof ConnectTimeoutException)) {
+ endpoint = ipList.get(RandomUtil.randomInt(ipList.size())) + ":" + portList.get(RandomUtil.randomInt(portList.size()));
+ batchPutUrl = URLUtil.normalize(endpoint + "/hos/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + PublicUtil.getUUID()) + "?multiFile";
+ }
+ } finally {
+ IoUtil.close(response);
+ }
+ }
}
}
diff --git a/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java b/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java
new file mode 100644
index 0000000..e4f3f51
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/HBaseColumnConstants.java
@@ -0,0 +1,50 @@
+package com.zdjizhi.utils;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+public interface HBaseColumnConstants {
+
+ String FAMILY_DATA = "data";
+ String FAMILY_META = "meta";
+ String COLUMN_FILENAME = "filename";
+ String COLUMN_CONTENT_TYPE = "content_type";
+ String COLUMN_CONTENT_LENGTH = "content_length";
+ String COLUMN_LAST_MODIFIED = "last_modified";
+ String COLUMN_INDEX_FILENAME_KEY = "index_filename_key";
+ String COLUMN_INDEX_TIME_KEY = "index_time_key";
+ String COLUMN_UPLOAD_METHOD = "upload_method";
+ String COLUMN_STORAGE_BACKEND = "storage_backend";
+ String COLUMN_FILE_PATH = "file_path";
+ String COLUMN_APPEND_MODE = "append_mode";
+ String COLUMN_APPEND_PART_SIZE = "append_part_size";
+ String COLUMN_APPEND_PART_CHUNK_COUNT = "append_part_chunk_count";
+ String COLUMN_COMBINE_MODE = "combine_mode";
+ String COLUMN_USER_DEFINED_META_PREFIX = "user_defined_meta_";
+ String BUCKET_COLUMN_USER = "user";
+ String BUCKET_COLUMN_PRIVILEGE = "privilege";
+ String BUCKET_COLUMN_TTL = "ttl";
+ String BUCKET_COLUMN_WAL = "wal";
+ String BUCKET_COLUMN_LOCATION = "location";
+
+ byte[] BYTE_FAMILY_DATA = Bytes.toBytes(FAMILY_DATA);
+ byte[] BYTE_FAMILY_META = Bytes.toBytes(FAMILY_META);
+ byte[] BYTE_COLUMN_FILENAME = Bytes.toBytes(COLUMN_FILENAME);
+ byte[] BYTE_COLUMN_CONTENT_TYPE = Bytes.toBytes(COLUMN_CONTENT_TYPE);
+ byte[] BYTE_COLUMN_CONTENT_LENGTH = Bytes.toBytes(COLUMN_CONTENT_LENGTH);
+ byte[] BYTE_COLUMN_LAST_MODIFIED = Bytes.toBytes(COLUMN_LAST_MODIFIED);
+ byte[] BYTE_COLUMN_INDEX_FILENAME_KEY = Bytes.toBytes(COLUMN_INDEX_FILENAME_KEY);
+ byte[] BYTE_COLUMN_INDEX_TIME_KEY = Bytes.toBytes(COLUMN_INDEX_TIME_KEY);
+ byte[] BYTE_COLUMN_UPLOAD_METHOD = Bytes.toBytes(COLUMN_UPLOAD_METHOD);
+ byte[] BYTE_COLUMN_STORAGE_BACKEND = Bytes.toBytes(COLUMN_STORAGE_BACKEND);
+ byte[] BYTE_COLUMN_FILE_PATH = Bytes.toBytes(COLUMN_FILE_PATH);
+ byte[] BYTE_COLUMN_APPEND_MODE = Bytes.toBytes(COLUMN_APPEND_MODE);
+ byte[] BYTE_COLUMN_APPEND_PART_SIZE = Bytes.toBytes(COLUMN_APPEND_PART_SIZE);
+ byte[] BYTE_COLUMN_APPEND_PART_CHUNK_COUNT = Bytes.toBytes(COLUMN_APPEND_PART_CHUNK_COUNT);
+ byte[] BYTE_COLUMN_COMBINE_MODE = Bytes.toBytes(COLUMN_COMBINE_MODE);
+ byte[] BYTE_BUCKET_COLUMN_USER = Bytes.toBytes(BUCKET_COLUMN_USER);
+ byte[] BYTE_BUCKET_COLUMN_PRIVILEGE = Bytes.toBytes(BUCKET_COLUMN_PRIVILEGE);
+ byte[] BYTE_BUCKET_COLUMN_TTL = Bytes.toBytes(BUCKET_COLUMN_TTL);
+ byte[] BYTE_BUCKET_COLUMN_WAL = Bytes.toBytes(BUCKET_COLUMN_WAL);
+ byte[] BYTE_BUCKET_COLUMN_LOCATION = Bytes.toBytes(BUCKET_COLUMN_LOCATION);
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java b/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java
new file mode 100644
index 0000000..be82770
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/HBaseConnectionUtil.java
@@ -0,0 +1,59 @@
+package com.zdjizhi.utils;
+
+import com.zdjizhi.config.Configs;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionConfiguration;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+public class HBaseConnectionUtil {
+
+ private static HBaseConnectionUtil hbaseConnectionUtil = null;
+ private final org.apache.hadoop.conf.Configuration hbaseConfiguration;
+
+ private HBaseConnectionUtil(Configuration configuration) {
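+ // shared HBase client settings, built once per JVM via getInstance()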
+ hbaseConfiguration = HBaseConfiguration.create();
+ hbaseConfiguration.set(HConstants.ZOOKEEPER_QUORUM, configuration.getString(Configs.SINK_HBASE_ZOOKEEPER));
+ hbaseConfiguration.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
+ hbaseConfiguration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
+ hbaseConfiguration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, configuration.get(Configs.SINK_HBASE_RETRIES_NUMBER) + "");
+ hbaseConfiguration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, configuration.get(Configs.SINK_HBASE_RPC_TIMEOUT) + "");
+ hbaseConfiguration.set(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, "1073741800");
+ hbaseConfiguration.set(ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY, configuration.get(Configs.SINK_HBASE_CLIENT_WRITE_BUFFER) + "");
+ hbaseConfiguration.set(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, configuration.get(Configs.SINK_HBASE_CLIENT_IPC_POOL_SIZE) + "");
+
+ }
+
+ public static synchronized HBaseConnectionUtil getInstance(Configuration configuration) {
+ if (null == hbaseConnectionUtil) {
+ hbaseConnectionUtil = new HBaseConnectionUtil(configuration);
+ }
+ return hbaseConnectionUtil;
+ }
+
+ public Connection getSyncHBaseConnection() {
+ Connection syncHBaseConnection;
+ try {
+ syncHBaseConnection = ConnectionFactory.createConnection(hbaseConfiguration);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return syncHBaseConnection;
+ }
+
+ public AsyncConnection getAsyncHBaseConnection() {
+ AsyncConnection asyncHBaseConnection;
+ try {
+ asyncHBaseConnection = ConnectionFactory.createAsyncConnection(hbaseConfiguration).get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return asyncHBaseConnection;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
index 37f8975..f2e5d33 100644
--- a/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
+++ b/src/main/java/com/zdjizhi/utils/HttpClientUtil.java
@@ -2,14 +2,11 @@ package com.zdjizhi.utils;
import com.zdjizhi.config.Configs;
import org.apache.flink.configuration.Configuration;
-import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
@@ -17,13 +14,18 @@ import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOReactorException;
import javax.net.ssl.*;
-import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.UnknownHostException;
@@ -34,19 +36,10 @@ import java.security.cert.X509Certificate;
public class HttpClientUtil {
private static HttpClientUtil httpClientUtil;
- private final CloseableHttpClient closeableHttpClient;
private final Configuration configuration;
private HttpClientUtil(Configuration configuration) {
this.configuration = configuration;
- closeableHttpClient = HttpClients.custom()
- // 把请求相关的超时信息设置到连接客户端
- .setDefaultRequestConfig(getRequestConfig())
- // 把请求重试设置到连接客户端
- .setRetryHandler(getRetryHandler())
- // 配置连接池管理对象
- .setConnectionManager(getSslClientManager())
- .build();
}
private RequestConfig getRequestConfig() {
@@ -87,7 +80,7 @@ public class HttpClientUtil {
};
}
- private PoolingHttpClientConnectionManager getSslClientManager() {
+ private PoolingHttpClientConnectionManager getSyncSslClientManager() {
PoolingHttpClientConnectionManager connManager;
try {
X509TrustManager trustManager = new X509TrustManager() {
@@ -122,6 +115,45 @@ public class HttpClientUtil {
return connManager;
}
+ private PoolingNHttpClientConnectionManager getAsyncSslClientManager() {
+ PoolingNHttpClientConnectionManager connManager;
+ try {
+ // TODO: no SSL session strategy is configured for the async client yet; https endpoints
+ // would need an SSLIOSessionStrategy mirroring the trust-all setup in getSyncSslClientManager().
+ IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+ .setIoThreadCount(Runtime.getRuntime().availableProcessors())
+ .setSoKeepAlive(true)
+ .build();
+ ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
+ connManager = new PoolingNHttpClientConnectionManager(ioReactor);
+ connManager.setMaxTotal(configuration.get(Configs.SINK_HOS_HTTP_MAX_TOTAL));
+ connManager.setDefaultMaxPerRoute(configuration.get(Configs.SINK_HOS_HTTP_MAX_PER_ROUTE));
+ } catch (IOReactorException e) {
+ throw new RuntimeException(e);
+ }
+ return connManager;
+ }
+
public static synchronized HttpClientUtil getInstance(Configuration configuration) {
if (null == httpClientUtil) {
httpClientUtil = new HttpClientUtil(configuration);
@@ -129,20 +161,19 @@ public class HttpClientUtil {
return httpClientUtil;
}
- public void close() throws IOException {
- closeableHttpClient.close();
+ public CloseableHttpClient getSyncHttpClient() {
+ return HttpClients.custom()
+ .setDefaultRequestConfig(getRequestConfig())
+ .setRetryHandler(getRetryHandler())
+ .setConnectionManager(getSyncSslClientManager())
+ .build();
}
- public CloseableHttpResponse httpPut(String url, byte[] requestBody, Header... headers) throws IOException {
- HttpPut put = new HttpPut(url);
- if (StringUtil.isNotEmpty(headers)) {
- for (Header header : headers) {
- if (StringUtil.isNotEmpty(header)) {
- put.addHeader(header);
- }
- }
- }
- put.setEntity(new ByteArrayEntity(requestBody));
- return closeableHttpClient.execute(put);
+ public CloseableHttpAsyncClient getAsyncHttpClient() {
+ return HttpAsyncClients.custom()
+ .setDefaultRequestConfig(getRequestConfig())
+ .setConnectionManager(getAsyncSslClientManager())
+ .build();
}
+
}
diff --git a/src/main/java/com/zdjizhi/utils/HttpHeaderConstants.java b/src/main/java/com/zdjizhi/utils/HttpHeaderConstants.java
new file mode 100644
index 0000000..1ef3d93
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/HttpHeaderConstants.java
@@ -0,0 +1,54 @@
+package com.zdjizhi.utils;
+
+public interface HttpHeaderConstants {
+ /*
+ * Standard HTTP Headers
+ */
+ String CACHE_CONTROL = "Cache-Control";
+ String CONTENT_DISPOSITION = "Content-Disposition";
+ String CONTENT_ENCODING = "Content-Encoding";
+ String CONTENT_LENGTH = "Content-Length";
+ String CONTENT_RANGE = "Content-Range";
+ String CONTENT_MD5 = "Content-MD5";
+ String CONTENT_TYPE = "Content-Type";
+ String CONTENT_LANGUAGE = "Content-Language";
+ String DATE = "Date";
+ String LAST_MODIFIED = "Last-Modified";
+ String SERVER = "Server";
+ String CONNECTION = "Connection";
+ String ETAG = "ETag";
+
+ /*
+ * Hos HTTP Headers
+ */
+ String AMZ_ACL = "x-amz-acl";
+ String AMZ_CONTENT_SHA_256 = "x-amz-content-sha256";
+ String AMZ_DATE = "x-amz-date";
+ String HOS_WAL = "x-hos-wal";
+ String HOS_COMPRESSION = "x-hos-compression";
+ String HOS_SPLITS = "x-hos-splits";
+ String HOS_UPLOAD_TYPE = "x-hos-upload-type";
+ String HOS_START_TIME = "x-hos-start-time";
+ String HOS_END_TIME = "x-hos-end-time";
+ String HOS_QUICK = "x-hos-quick";
+ String HOS_OBJECT_INFO = "x-hos-object-info";
+ String TOKEN = "token";
+ String HOS_POSITION = "x-hos-position";
+ String HOS_NEXT_APPEND_POSITION = "x-hos-next-append-position";
+ String HOS_COMBINE_MODE = "x-hos-combine-mode";
+ String HOS_PART_NUMBER = "x-hos-part-number";
+ String HOS_OFFSET = "x-hos-offset";
+ String HOS_PART_LAST_FLAG = "x-hos-part-last-flag";
+ String HOS_PART_CHUNK_COUNT = "x-hos-part-chunk-count";
+ String HOS_PART_CHUNK_NUMBERS = "x-hos-part-chunk-numbers";
+ String HOS_META_FILE_TYPE = "x-hos-meta-file-type";
+ String HOS_META_FILENAME = "x-hos-meta-filename";
+ String FILE_SIZE = "File-Size";
+ String LOCATION = "Location";
+ String HOS_OBJECT_TYPE = "x-hos-object-type";
+ String HOS_APPEND_INFO = "x-hos-append-info";
+ String HOS_META_PREFIX = "x-hos-meta-";
+ String HOS_METADATA_DIRECTIVE = "x-hos-metadata-directive";
+ String HOS_OBJECTS_META = "x-hos-objects-meta";
+ String HOS_OBJECTS_OFFSET = "x-hos-objects-offset";
+}
diff --git a/src/main/java/com/zdjizhi/utils/PublicConstants.java b/src/main/java/com/zdjizhi/utils/PublicConstants.java
new file mode 100644
index 0000000..39586b2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/PublicConstants.java
@@ -0,0 +1,33 @@
+package com.zdjizhi.utils;
+
+public interface PublicConstants {
+ String STORAGE_BACKEND_HBASE = "hbase";
+ String STORAGE_BACKEND_FILESYSTEM = "filesystem";
+ String APPEND_MODE_OFFSET = "offset";
+ String APPEND_MODE_PART_NUMBER = "partNumber";
+ String COMBINE_MODE_APPEND = "append";
+ String COMBINE_MODE_SEEK = "seek";
+ String MULTIPART_FILE_PART_MESSAGE_ETAG = "etag";
+ String MULTIPART_FILE_PART_MESSAGE_SIZE = "partSize";
+ String APPEND_FILE_PART_MESSAGE_SIZE = "partSize";
+ String APPEND_FILE_PART_MESSAGE_CHUNK_COUNT = "chunkCount";
+ String APPEND_FILE_PART_MESSAGE_LAST_PART_FLAG = "lastPartFlag";
+ String APPEND_FILE_PART_MESSAGE_CHUNK_NUMBERS = "chunkNumbers";
+ String BUCKET_META_PRIVILEGE_PRIVATE = "private";
+ String BUCKET_META_PRIVILEGE_PUBLIC = "public-read-write";
+ String DEFAULT_BUCKET_META_WAL = "close";
+ String UPLOAD_TYPE_UPDATE = "put";
+ String UPLOAD_TYPE_APPENDV2 = "appendV2";
+ String UPLOAD_TYPE_APPEND = "append";
+ String DEFAULT_GET_FILE_LIST_QUICK = "false";
+ String FILE_DATA_ROW_SUFFIX = "0";
+ String FILE_DATA_COLUMN = "0";
+ String FILE_TYPE_PACPNG = "pcapng";
+ String FILE_TYPE_PACP = "pcap";
+ String FILE_META_FILE_TYPE = "file_type";
+ String FILE_META_FILENAME = "filename";
+ String OBJECT_TYPE_NORMAL = "Normal";
+ String OBJECT_TYPE_APPENDABLE = "Appendable";
+ String OBJECT_TYPE_MULTIPART = "Multipart";
+ String DEFAULT_METADATA_DIRECTIVE = "REPLACE_NEW";
+}
diff --git a/src/main/java/com/zdjizhi/utils/PublicUtil.java b/src/main/java/com/zdjizhi/utils/PublicUtil.java
index ac11fbb..6074954 100644
--- a/src/main/java/com/zdjizhi/utils/PublicUtil.java
+++ b/src/main/java/com/zdjizhi/utils/PublicUtil.java
@@ -1,25 +1,19 @@
package com.zdjizhi.utils;
-import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.ArrayUtil;
-import cn.hutool.core.util.CharUtil;
import cn.hutool.core.util.StrUtil;
+import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.config.Configs;
import com.zdjizhi.pojo.FileChunk;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
-import org.apache.http.Header;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.util.EntityUtils;
-import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static com.zdjizhi.utils.PublicConstants.COMBINE_MODE_SEEK;
+
public class PublicUtil {
private static final Log LOG = LogFactory.get();
@@ -27,8 +21,9 @@ public class PublicUtil {
List<FileChunk> combinedFileChunkList = new ArrayList<>();
try {
List<FileChunk> originalFileChunkList = StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList());
+ LOG.debug("combining chunks: {}", originalFileChunkList);
List<byte[]> waitingToCombineChunkList = new ArrayList<>();
- if ("seek".equals(originalFileChunkList.get(0).getCombineMode())) {
+ if (COMBINE_MODE_SEEK.equals(originalFileChunkList.get(0).getCombineMode())) {
seekChunkCounter.inc();
// 按照offset排序
originalFileChunkList.sort(Comparator.comparingLong(FileChunk::getOffset));
@@ -118,7 +113,7 @@ public class PublicUtil {
byte[][] bytes = new byte[byteList.size()][];
byteList.toArray(bytes);
byte[] newData = ArrayUtil.addAll(bytes);
- if ("seek".equals(combineMode)) {
+ if (COMBINE_MODE_SEEK.equals(combineMode)) {
fileChunk.setOffset(offset);
fileChunk.setLastChunkFlag(lastChunkFlag);
} else {
@@ -137,55 +132,17 @@ public class PublicUtil {
return fileChunk;
}
- public static void sendToHos(FileChunk fileChunk, Configuration configuration,Counter sendHosErrorCounter){
- CloseableHttpResponse response = null;
- try {
- String url = configuration.get(Configs.SINK_HOS_ENDPOINT) + "/" + configuration.get(Configs.SINK_HOS_BUCKET) + "/" + fileChunk.getUuid();
- byte[] data;
- if (fileChunk.getChunk() != null) {
- data = fileChunk.getChunk();
- } else {
- data = "".getBytes();
- }
- List<Header> headers = new ArrayList<>();
- headers.add(new BasicHeader("token", configuration.get(Configs.SINK_HOS_TOKEN)));
- headers.add(new BasicHeader("x-hos-upload-type", "appendV2"));
- headers.add(new BasicHeader("x-hos-combine-mode", fileChunk.getCombineMode()));
- String filename = fileChunk.getFileName();
- if (StrUtil.isNotEmpty(filename) && filename.contains(".")) {
- headers.add(new BasicHeader("x-hos-meta-filename", filename));
- } else if (StrUtil.isNotEmpty(filename) && !filename.contains(".")) {
- filename = filename + "." + fileChunk.getFileType();
- headers.add(new BasicHeader("x-hos-meta-filename", filename));
- } else if (StrUtil.isEmpty(filename) && StrUtil.isNotEmpty(fileChunk.getFileType())) {
- headers.add(new BasicHeader("x-hos-meta-file-type", fileChunk.getFileType()));
- }
- if ("seek".equals(fileChunk.getCombineMode())) {
- headers.add(new BasicHeader("x-hos-offset", fileChunk.getOffset() + ""));
- headers.add(new BasicHeader("x-hos-part-last-flag", fileChunk.getLastChunkFlag() + ""));
- } else {
- headers.add(new BasicHeader("x-hos-part-number", fileChunk.getTimestamp() + ""));
- headers.add(new BasicHeader("x-hos-part-chunk-numbers", fileChunk.getChunkNumbers()));
- }
- headers.add(new BasicHeader("x-hos-part-chunk-count", fileChunk.getChunkCount() + ""));
- Map<String, Object> metaMap = fileChunk.getMeta();
- if (metaMap != null && metaMap.size() > 0) {
- for (String meta : metaMap.keySet()) {
- headers.add(new BasicHeader("x-hos-meta-" + StrUtil.toSymbolCase(meta, CharUtil.DASHED), metaMap.get(meta) + ""));
- }
- }
- response = HttpClientUtil.getInstance(configuration).httpPut(url, data, headers.toArray(new Header[0]));
- if (response.getStatusLine().getStatusCode() != 200) {
- String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");
- LOG.error("put part to hos error. code: " + response.getStatusLine().getStatusCode() + ". message: " + responseEntity);
- sendHosErrorCounter.inc();
- }
- } catch (IOException e) {
- LOG.error("put part to hos error.", e);
- sendHosErrorCounter.inc();
- } finally {
- IoUtil.close(response);
- }
+ public static String getUUID() {
+ return UUID.randomUUID().toString().replace("-", "").toLowerCase();
}
+ public static String getRowKey(String filename) {
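+ // row key = middle 16 hex chars of md5(filename); its first char also prefixes the index rows to spread load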
+ String md5str = DigestUtil.md5Hex(filename);
+ md5str = md5str.substring(8, 24);
+ return md5str;
+ }
+
+ public static String getIndexDataHead(String filename) {
+ return getRowKey(filename).substring(0, 1);
+ }
}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index e8e2d19..b71bda5 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -1,8 +1,7 @@
flink.job.name=agg_traffic_file_chunk_combine
#source settings
-source.kafka.parallelism=1
#9092: no auth, 9095: ssl, 9094: sasl
-source.kafka.broker=192.168.44.12:9092
+source.kafka.broker=192.168.40.151:9092,192.168.40.152:9092,192.168.40.203:9092
source.kafka.group.id=test1
source.kafka.topic=TRAFFIC-FILE-STREAM-RECORD
#earliest: consume from the beginning, latest: newest offset
@@ -19,21 +18,38 @@ source.kafka.user=admin
source.kafka.pin=galaxy2019
#required for SSL only
source.kafka.tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
-parse.message.pack.parallelism=1
#combiner window settings
-combiner.window.parallelism=3
+combiner.window.parallelism=2
combiner.window.time=10
#seconds without new data before the window fires
combiner.window.idle.time=5
-combiner.window.key.max.chunk=100000
+file.max.chunk.count=100000
+file.max.size=1073741824
+#eval expression for filtering on chunk fields
+#filter.expression=FileChunk.fileType == "txt" || FileChunk.fileType == "eml"
+#sink settings
+sink.parallelism=2
+sink.type=hos
+sink.async=false
+sink.batch=true
+sink.batch.count=100
+sink.batch.size=102400
#hos sink settings
-sink.hos.parallelism=3
-sink.hos.endpoint=http://192.168.44.12:9098/hos
+#0: through nginx, 1: poll the hos nodes directly, default 0
+sink.hos.load.balance.mode=1
+#for nginx or a single hos set ip:port, for multiple hos nodes set ip1,ip2:port1,port2
+sink.hos.endpoint=192.168.40.151,192.168.40.152,192.168.40.203:8186
sink.hos.bucket=traffic_file_bucket
sink.hos.token=c21f969b5f03d33d43e04f8f136e7682
sink.hos.http.error.retry=3
-sink.hos.http.max.total=2000
-sink.hos.http.max.per.route=1000
-sink.hos.http.connect.timeout=10000
-sink.hos.http.request.timeout=10000
-sink.hos.http.socket.timeout=60000
\ No newline at end of file
+sink.hos.http.max.total=10
+sink.hos.http.max.per.route=10
+sink.hos.http.connect.timeout=1000
+sink.hos.http.request.timeout=5000
+sink.hos.http.socket.timeout=60000
+#hbase sink settings
+sink.hbase.zookeeper=192.168.44.12
+sink.hbase.retries.number=10
+sink.hbase.rpc.timeout=600000
+sink.hbase.client.write.buffer=10971520
+sink.hbase.client.ipc.pool.size=3
\ No newline at end of file