author    lifengchao <[email protected]>  2024-02-22 13:50:32 +0800
committer lifengchao <[email protected]>  2024-02-22 13:50:32 +0800
commit    64da48538f993bc652404c84b56dd2c71bc25ff5 (patch)
tree      a8c1dded67fd33f4f779eaf39ae916580817c9d0
parent    68991747b9dd5775ba58439b51331a686c650294 (diff)
[feature][connector-kafka][connector-clickhouse] GAL-486 kafka/clickhouse source and sink output Metrics
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java  24
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java  650
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java  17
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java  5
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java  4
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java  55
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java  1909
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/RuntimeContextAware.java  7
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java  38
-rw-r--r--  groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java  4
-rw-r--r--  groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java  30
-rw-r--r--  groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java  4
-rw-r--r--  groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java  3
-rw-r--r--  groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java  70
14 files changed, 2421 insertions, 399 deletions
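The InternalMetrics class referenced throughout this commit lives outside the patch, so its implementation is not shown here. Below is a minimal sketch of what such a wrapper could look like, assuming it builds on the same Flink Counter/MeterView primitives the removed per-sink metrics used; only the constructor and the increment methods called in the hunks below are grounded in the diff, while the metric names and per-second meters are assumptions.

package com.geedgenetworks.core.metrics;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical sketch only; the real InternalMetrics may differ.
public class InternalMetrics {

    private final Counter inEvents;
    private final Counter inBytes;
    private final Counter outEvents;
    private final Counter outBytes;
    private final Counter errorEvents;

    public InternalMetrics(RuntimeContext context) {
        MetricGroup group = context.getMetricGroup();
        // Metric names are assumed; only the method names below appear in the diff.
        inEvents = group.counter("inEvents");
        inBytes = group.counter("inBytes");
        outEvents = group.counter("outEvents");
        outBytes = group.counter("outBytes");
        errorEvents = group.counter("errorEvents");
        // Derive per-second rates from the counters, as the removed sink-local code did.
        group.meter("outEventsPerSecond", new MeterView(outEvents));
        group.meter("errorEventsPerSecond", new MeterView(errorEvents));
    }

    public void incrementInEvents() { inEvents.inc(); }
    public void incrementInBytes(long bytes) { inBytes.inc(bytes); }
    public void incrementOutEvents() { outEvents.inc(); }
    public void incrementOutEvents(long count) { outEvents.inc(count); }
    public void incrementOutBytes(long bytes) { outBytes.inc(bytes); }
    public void incrementErrorEvents() { errorEvents.inc(); }
    public void incrementErrorEvents(long count) { errorEvents.inc(count); }
}

With a wrapper like this, every connector reports in/out/error counts through one object instead of registering its own counters, which is the pattern the hunks below follow.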
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index e3e9e76..ce58cda 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHouseBatchInsertConnec
import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHousePreparedBatchInsertStatement;
import com.geedgenetworks.connectors.clickhouse.jdbc.DataTypeStringV2;
import com.geedgenetworks.connectors.clickhouse.util.ClickHouseUtils;
+import com.geedgenetworks.core.metrics.InternalMetrics;
import com.github.housepower.data.*;
import com.github.housepower.data.type.*;
import com.github.housepower.data.type.complex.*;
@@ -55,10 +56,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
private int batchSize;
private long batchIntervalMs;
private transient volatile boolean closed;
- private transient Counter numRecordsOut;
- private transient Counter numRecordsOutFailed;
- private transient Meter numRecordsOutPerSecond;
- private transient Meter numRecordsOutFailedPerSecond;
+ private transient InternalMetrics internalMetrics;
private transient Thread outThread;
private transient ReentrantLock lock;
private transient Block batch;
@@ -132,10 +130,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
private void initMetric() throws Exception {
- numRecordsOut = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOut");
- numRecordsOutFailed = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOutFailed");
- numRecordsOutPerSecond = getRuntimeContext().getMetricGroup().meter("ckNumRecordsOutPerSecond", new MeterView(numRecordsOut));
- numRecordsOutFailedPerSecond = getRuntimeContext().getMetricGroup().meter("ckNumRecordsOutFailedPerSecond", new MeterView(numRecordsOutFailed));
+ internalMetrics = new InternalMetrics(getRuntimeContext());
}
private void initClickHouseParams() throws Exception {
@@ -207,6 +202,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
@Override
public final void invoke(T value, Context context) throws Exception {
checkFlushException();
+ internalMetrics.incrementInEvents();
+
lock.lock();
try {
if (addBatch(batch, value)) {
@@ -217,7 +214,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
flush();
}
} catch (Exception e) {
- numRecordsOutFailed.inc();
+ internalMetrics.incrementErrorEvents();
LOG.error("转换ck类型异常", e);
} finally{
lock.unlock();
@@ -253,8 +250,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
private void doFlush(Block block) throws Exception {
long start = System.currentTimeMillis();
int rowCnt = block.rowCnt();
- ClickHouseUtils.showBlockColumnsByteSize(block); // for testing only
- LOG.warn("flush " + rowCnt + " start:" + new Timestamp(start) + "," + (start - lastFlushTs));
+ long blockByteSize = ClickHouseUtils.getBlockColumnsByteSize(block);
+ LOG.warn("flush " + rowCnt + ", totalSize:" + (blockByteSize >>> 20) + "M" + ", start:" + new Timestamp(start) + "," + (start - lastFlushTs));
lastFlushTs = System.currentTimeMillis();
int retryCount = 0;
@@ -275,14 +272,15 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock());
stmt.executeBatch();
- numRecordsOut.inc(rowCnt);
+ internalMetrics.incrementOutEvents(rowCnt);
+ internalMetrics.incrementOutBytes(blockByteSize);
LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
return;
} catch (Exception e) {
LOG.error("ClickHouseBatchInsertFail url:" + url, e);
if (retryCount >= 3) {
- numRecordsOutFailed.inc(rowCnt);
+ internalMetrics.incrementErrorEvents(rowCnt);
LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
// throw e;
return;
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
index 0e1a576..4a35878 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
@@ -1,325 +1,325 @@
-package com.geedgenetworks.connectors.clickhouse.util;
-
-import com.github.housepower.buffer.ByteArrayWriter;
-import com.github.housepower.data.*;
-import com.github.housepower.data.type.*;
-import com.github.housepower.data.type.complex.DataTypeArray;
-import com.github.housepower.data.type.complex.DataTypeDecimal;
-import com.github.housepower.data.type.complex.DataTypeFixedString;
-import com.github.housepower.data.type.complex.DataTypeString;
-import com.github.housepower.jdbc.ClickHouseArray;
-import com.github.housepower.jdbc.ClickHouseConnection;
-import com.github.housepower.misc.BytesCharSeq;
-import com.github.housepower.misc.DateTimeUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.sql.*;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-public class ClickHouseUtils {
- static final Logger LOG = LoggerFactory.getLogger(ClickHouseUtils.class);
- static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\(");
- static final byte[] EMPTY_BYTES = new byte[0];
- public static final BytesCharSeq EMPTY_BYTES_CHAR_SEQ = new BytesCharSeq(EMPTY_BYTES);
- private static Field blockColumnsField;
- private static Field blockSettingsField;
- private static Field blockNameAndPositionsField;
- private static Field blockPlaceholderIndexesField;
- private static Field blockRowCntField;
-
- static {
- try {
- blockColumnsField = Block.class.getDeclaredField("columns");
- blockColumnsField.setAccessible(true);
- blockSettingsField = Block.class.getDeclaredField("settings");
- blockSettingsField.setAccessible(true);
- blockNameAndPositionsField = Block.class.getDeclaredField("nameAndPositions");
- blockNameAndPositionsField.setAccessible(true);
- blockPlaceholderIndexesField = Block.class.getDeclaredField("placeholderIndexes");
- blockPlaceholderIndexesField.setAccessible(true);
- blockRowCntField = Block.class.getDeclaredField("rowCnt");
- blockRowCntField.setAccessible(true);
- } catch (NoSuchFieldException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Convert host list to jdbc url format: 192.168.40.222:9001,192.168.40.223:9001 =>
- * jdbc:clickhouse://192.168.40.222:9001,jdbc:clickhouse://192.168.40.223:9001
- */
- public static String[] buildUrlsFromHost(String host) {
- String[] hosts = host.split(",");
- List<String> urls = new ArrayList<>(hosts.length);
- for (int i = 0; i < hosts.length; i++) {
- String[] ipPort = hosts[i].trim().split(":");
- String ip = ipPort[0].trim();
- int port = Integer.parseInt(ipPort[1].trim());
- urls.add("jdbc:clickhouse://" + ip + ":" + port);
- }
- return urls.toArray(new String[urls.size()]);
- }
-
- public static String genePreparedInsertSql(String table, String[] columnNames) {
- StringBuilder sb = new StringBuilder("insert into ");
- sb.append(table).append("(");
- //sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "`" + x +"`").collect(Collectors.toList())));
- sb.append(String.join(",", columnNames));
- sb.append(")");
- sb.append(" values(");
- sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "?").collect(Collectors.toList())));
- sb.append(")");
- return sb.toString();
- }
-
- public static Tuple2<String[], Object[]> getInsertColumnsAndDefaultValuesForTable(
- String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
- Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
-
- int retryCount = 0;
- while (true) {
- retryCount++;
- ClickHouseConnection connection = null;
- Statement stmt = null;
- ResultSet rst = null;
- try {
- connection = (ClickHouseConnection) DriverManager.getConnection(urls[urlIndex], connInfo);
- stmt = connection.createStatement();
- rst = stmt.executeQuery("desc " + table);
-
- List<String> columnNames = new ArrayList<>();
- List<Object> columnDefaultValues = new ArrayList<>();
- while (rst.next()) {
- String name = rst.getString("name");
- String typeStr = rst.getString("type");
- String defaultTypeStr = rst.getString("default_type");
- String defaultExpression = rst.getString("default_expression");
- if ("LowCardinality(String)".equals(typeStr)) {
- typeStr = "String";
- }
- IDataType<?, ?> type = DataTypeFactory.get(typeStr, connection.serverContext());
- if ("MATERIALIZED".equals(defaultTypeStr)) {
- continue;
- }
- Object defaultValue = parseDefaultValue(type, defaultExpression); // only parses numeric and string defaults
- if (defaultValue == null && !type.nullable()) {
- if (type instanceof DataTypeArray) {
- defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]);
- } else {
- defaultValue = type.defaultValue();
- }
- }
- columnNames.add(name);
- columnDefaultValues.add(defaultValue);
- }
-
- return new Tuple2<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()]));
- } catch (SQLException e) {
- LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
- if (retryCount >= 3) {
- throw e;
- }
- urlIndex++;
- if (urlIndex == urls.length) {
- urlIndex = 0;
- }
- } finally {
- if (rst != null) {
- rst.close();
- }
- if (stmt != null) {
- stmt.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- }
- }
-
- public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo) throws Exception {
- Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
-
- int retryCount = 0;
- while (true) {
- retryCount++;
- Connection connection = null;
- try {
- connection = DriverManager.getConnection(urls[urlIndex], connInfo);
- ZoneId tz = DateTimeUtil.chooseTimeZone(((ClickHouseConnection) connection).serverContext());
- return tz;
- } catch (SQLException e) {
- LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
- if (retryCount >= 3) {
- throw e;
- }
- urlIndex++;
- if (urlIndex == urls.length) {
- urlIndex = 0;
- }
- } finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
- }
-
- public static Block getInsertBlockForTable(String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
- String sql = "insert into " + table + " values";
- return getInsertBlockForSql(urls, urlIndex, connInfo, sql);
- }
-
- public static Block getInsertBlockForSql(String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception {
- Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
-
- String insertQuery;
- if (sql.trim().toLowerCase().endsWith("values")) {
- insertQuery = sql;
- } else {
- Matcher matcher = VALUES_REGEX.matcher(sql);
- Preconditions.checkArgument(matcher.find(), "insert sql syntax error:%s", sql);
- insertQuery = sql.substring(0, matcher.end() - 1);
- }
- LOG.warn("getInsertBlock insertQuery:{}.", insertQuery);
-
- int retryCount = 0;
- while (true) {
- retryCount++;
- Connection connection = null;
- try {
- connection = DriverManager.getConnection(urls[urlIndex], connInfo);
- Block block = ((ClickHouseConnection) connection).getSampleBlock(insertQuery);
- return block;
- } catch (SQLException e) {
- LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e);
- if (retryCount >= 3) {
- throw e;
- }
- urlIndex++;
- if (urlIndex == urls.length) {
- urlIndex = 0;
- }
- } finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
- }
-
- public static Block newInsertBlockFrom(Block block) throws Exception {
- int rowCnt = 0; // (int) blockRowCntField.get(block);
- int columnCnt = block.columnCnt();
- BlockSettings settings = (BlockSettings) blockSettingsField.get(block);
- IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(block);
- IColumn[] columns = new IColumn[columnCnt];
-
- for (int i = 0; i < columnCnt; i++) {
- String name = srcColumns[i].name();
- IDataType<?, ?> dataType = srcColumns[i].type();
- columns[i] = ColumnFactory.createColumn(name, dataType, null); // values is only used for ResultSet reads
- }
-
- Block newBlock = new Block(rowCnt, columns, settings);
- return newBlock;
- }
-
- public static void copyInsertBlockColumns(Block src, Block desc) throws Exception {
- desc.cleanup();
-
- IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(src);
- IColumn[] descColumns = (IColumn[]) blockColumnsField.get(desc);
- for (int i = 0; i < srcColumns.length; i++) {
- descColumns[i] = srcColumns[i];
- }
-
- blockRowCntField.set(desc, blockRowCntField.get(src));
- }
-
- public static void resetInsertBlockColumns(Block block) throws Exception {
- block.cleanup();
- blockRowCntField.set(block, 0);
-
- IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
- for (int i = 0; i < columns.length; i++) {
- String name = columns[i].name();
- IDataType<?, ?> dataType = columns[i].type();
- columns[i] = ColumnFactory.createColumn(name, dataType, null); // values is only used for ResultSet reads
- }
-
- block.initWriteBuffer();
- }
-
- private static Object parseDefaultValue(IDataType<?, ?> type, String defaultExpression) {
- Object defaultValue = null;
- if (!StringUtils.isBlank(defaultExpression)) {
- if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
- defaultValue = defaultExpression;
- } else if (type instanceof DataTypeInt8) {
- defaultValue = (byte) Integer.parseInt(defaultExpression);
- } else if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) {
- defaultValue = (short) Integer.parseInt(defaultExpression);
- } else if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) {
- defaultValue = Integer.parseInt(defaultExpression);
- } else if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) {
- defaultValue = Long.parseLong(defaultExpression);
- } else if (type instanceof DataTypeUInt64) {
- defaultValue = BigInteger.valueOf(Long.parseLong(defaultExpression));
- } else if (type instanceof DataTypeFloat32) {
- defaultValue = Float.parseFloat(defaultExpression);
- } else if (type instanceof DataTypeFloat64) {
- defaultValue = Double.parseDouble(defaultExpression);
- } else if (type instanceof DataTypeDecimal) {
- defaultValue = new BigDecimal(Double.parseDouble(defaultExpression));
- }
- }
-
- return defaultValue;
- }
-
- // for testing only
- public static void showBlockColumnsByteSize(Block block) throws Exception {
- IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
- Field field = AbstractColumn.class.getDeclaredField("buffer");
- field.setAccessible(true);
- Field columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter");
- columnWriterField.setAccessible(true);
- Field byteBufferListField = ByteArrayWriter.class.getDeclaredField("byteBufferList");
- byteBufferListField.setAccessible(true);
- int size = 0;
- int totalSize = 0;
- double unitM = 1 << 20;
- LOG.warn("rowCount:" + block.rowCnt());
- for (int i = 0; i < columns.length; i++) {
- Object columnWriter = columnWriterField.get(field.get(columns[i]));
- List<ByteBuffer> byteBufferList =
- (List<ByteBuffer>) byteBufferListField.get(columnWriter);
- size = 0;
- for (ByteBuffer byteBuffer : byteBufferList) {
- size += byteBuffer.position();
- }
- totalSize += size;
- if (size > unitM) {
- // LOG.warn(columns[i].name() + "buf cnt:" + byteBufferList.size() + ", size:" +
- // size/unitM + "M");
- }
- }
- LOG.warn("totalSize:" + totalSize / unitM + "M");
- }
-}
+package com.geedgenetworks.connectors.clickhouse.util;
+
+import com.github.housepower.buffer.ByteArrayWriter;
+import com.github.housepower.data.*;
+import com.github.housepower.data.type.*;
+import com.github.housepower.data.type.complex.DataTypeArray;
+import com.github.housepower.data.type.complex.DataTypeDecimal;
+import com.github.housepower.data.type.complex.DataTypeFixedString;
+import com.github.housepower.data.type.complex.DataTypeString;
+import com.github.housepower.jdbc.ClickHouseArray;
+import com.github.housepower.jdbc.ClickHouseConnection;
+import com.github.housepower.misc.BytesCharSeq;
+import com.github.housepower.misc.DateTimeUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.*;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class ClickHouseUtils {
+ static final Logger LOG = LoggerFactory.getLogger(ClickHouseUtils.class);
+ static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\(");
+ static final byte[] EMPTY_BYTES = new byte[0];
+ public static final BytesCharSeq EMPTY_BYTES_CHAR_SEQ = new BytesCharSeq(EMPTY_BYTES);
+ private static Field blockColumnsField;
+ private static Field blockSettingsField;
+ private static Field blockNameAndPositionsField;
+ private static Field blockPlaceholderIndexesField;
+ private static Field blockRowCntField;
+
+ static {
+ try {
+ blockColumnsField = Block.class.getDeclaredField("columns");
+ blockColumnsField.setAccessible(true);
+ blockSettingsField = Block.class.getDeclaredField("settings");
+ blockSettingsField.setAccessible(true);
+ blockNameAndPositionsField = Block.class.getDeclaredField("nameAndPositions");
+ blockNameAndPositionsField.setAccessible(true);
+ blockPlaceholderIndexesField = Block.class.getDeclaredField("placeholderIndexes");
+ blockPlaceholderIndexesField.setAccessible(true);
+ blockRowCntField = Block.class.getDeclaredField("rowCnt");
+ blockRowCntField.setAccessible(true);
+ } catch (NoSuchFieldException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Convert host list to jdbc url format: 192.168.40.222:9001,192.168.40.223:9001 =>
+ * jdbc:clickhouse://192.168.40.222:9001,jdbc:clickhouse://192.168.40.223:9001
+ */
+ public static String[] buildUrlsFromHost(String host) {
+ String[] hosts = host.split(",");
+ List<String> urls = new ArrayList<>(hosts.length);
+ for (int i = 0; i < hosts.length; i++) {
+ String[] ipPort = hosts[i].trim().split(":");
+ String ip = ipPort[0].trim();
+ int port = Integer.parseInt(ipPort[1].trim());
+ urls.add("jdbc:clickhouse://" + ip + ":" + port);
+ }
+ return urls.toArray(new String[urls.size()]);
+ }
+
+ public static String genePreparedInsertSql(String table, String[] columnNames) {
+ StringBuilder sb = new StringBuilder("insert into ");
+ sb.append(table).append("(");
+ //sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "`" + x +"`").collect(Collectors.toList())));
+ sb.append(String.join(",", columnNames));
+ sb.append(")");
+ sb.append(" values(");
+ sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "?").collect(Collectors.toList())));
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public static Tuple2<String[], Object[]> getInsertColumnsAndDefaultValuesForTable(
+ String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
+ Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
+
+ int retryCount = 0;
+ while (true) {
+ retryCount++;
+ ClickHouseConnection connection = null;
+ Statement stmt = null;
+ ResultSet rst = null;
+ try {
+ connection = (ClickHouseConnection) DriverManager.getConnection(urls[urlIndex], connInfo);
+ stmt = connection.createStatement();
+ rst = stmt.executeQuery("desc " + table);
+
+ List<String> columnNames = new ArrayList<>();
+ List<Object> columnDefaultValues = new ArrayList<>();
+ while (rst.next()) {
+ String name = rst.getString("name");
+ String typeStr = rst.getString("type");
+ String defaultTypeStr = rst.getString("default_type");
+ String defaultExpression = rst.getString("default_expression");
+ if ("LowCardinality(String)".equals(typeStr)) {
+ typeStr = "String";
+ }
+ IDataType<?, ?> type = DataTypeFactory.get(typeStr, connection.serverContext());
+ if ("MATERIALIZED".equals(defaultTypeStr)) {
+ continue;
+ }
+ Object defaultValue = parseDefaultValue(type, defaultExpression); // only parses numeric and string defaults
+ if (defaultValue == null && !type.nullable()) {
+ if (type instanceof DataTypeArray) {
+ defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]);
+ } else {
+ defaultValue = type.defaultValue();
+ }
+ }
+ columnNames.add(name);
+ columnDefaultValues.add(defaultValue);
+ }
+
+ return new Tuple2<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()]));
+ } catch (SQLException e) {
+ LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
+ if (retryCount >= 3) {
+ throw e;
+ }
+ urlIndex++;
+ if (urlIndex == urls.length) {
+ urlIndex = 0;
+ }
+ } finally {
+ if (rst != null) {
+ rst.close();
+ }
+ if (stmt != null) {
+ stmt.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+ }
+
+ public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo) throws Exception {
+ Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
+
+ int retryCount = 0;
+ while (true) {
+ retryCount++;
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(urls[urlIndex], connInfo);
+ ZoneId tz = DateTimeUtil.chooseTimeZone(((ClickHouseConnection) connection).serverContext());
+ return tz;
+ } catch (SQLException e) {
+ LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
+ if (retryCount >= 3) {
+ throw e;
+ }
+ urlIndex++;
+ if (urlIndex == urls.length) {
+ urlIndex = 0;
+ }
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+ }
+
+ public static Block getInsertBlockForTable(String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
+ String sql = "insert into " + table + " values";
+ return getInsertBlockForSql(urls, urlIndex, connInfo, sql);
+ }
+
+ public static Block getInsertBlockForSql(String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception {
+ Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
+
+ String insertQuery;
+ if (sql.trim().toLowerCase().endsWith("values")) {
+ insertQuery = sql;
+ } else {
+ Matcher matcher = VALUES_REGEX.matcher(sql);
+ Preconditions.checkArgument(matcher.find(), "insert sql syntax error:%s", sql);
+ insertQuery = sql.substring(0, matcher.end() - 1);
+ }
+ LOG.warn("getInsertBlock insertQuery:{}.", insertQuery);
+
+ int retryCount = 0;
+ while (true) {
+ retryCount++;
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(urls[urlIndex], connInfo);
+ Block block = ((ClickHouseConnection) connection).getSampleBlock(insertQuery);
+ return block;
+ } catch (SQLException e) {
+ LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e);
+ if (retryCount >= 3) {
+ throw e;
+ }
+ urlIndex++;
+ if (urlIndex == urls.length) {
+ urlIndex = 0;
+ }
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+ }
+
+ public static Block newInsertBlockFrom(Block block) throws Exception {
+ int rowCnt = 0; // (int) blockRowCntField.get(block);
+ int columnCnt = block.columnCnt();
+ BlockSettings settings = (BlockSettings) blockSettingsField.get(block);
+ IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(block);
+ IColumn[] columns = new IColumn[columnCnt];
+
+ for (int i = 0; i < columnCnt; i++) {
+ String name = srcColumns[i].name();
+ IDataType<?, ?> dataType = srcColumns[i].type();
+ columns[i] = ColumnFactory.createColumn(name, dataType, null); // values is only used for ResultSet reads
+ }
+
+ Block newBlock = new Block(rowCnt, columns, settings);
+ return newBlock;
+ }
+
+ public static void copyInsertBlockColumns(Block src, Block desc) throws Exception {
+ desc.cleanup();
+
+ IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(src);
+ IColumn[] descColumns = (IColumn[]) blockColumnsField.get(desc);
+ for (int i = 0; i < srcColumns.length; i++) {
+ descColumns[i] = srcColumns[i];
+ }
+
+ blockRowCntField.set(desc, blockRowCntField.get(src));
+ }
+
+ public static void resetInsertBlockColumns(Block block) throws Exception {
+ block.cleanup();
+ blockRowCntField.set(block, 0);
+
+ IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
+ for (int i = 0; i < columns.length; i++) {
+ String name = columns[i].name();
+ IDataType<?, ?> dataType = columns[i].type();
+ columns[i] = ColumnFactory.createColumn(name, dataType, null); // values is only used for ResultSet reads
+ }
+
+ block.initWriteBuffer();
+ }
+
+ private static Object parseDefaultValue(IDataType<?, ?> type, String defaultExpression) {
+ Object defaultValue = null;
+ if (!StringUtils.isBlank(defaultExpression)) {
+ if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
+ defaultValue = defaultExpression;
+ } else if (type instanceof DataTypeInt8) {
+ defaultValue = (byte) Integer.parseInt(defaultExpression);
+ } else if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) {
+ defaultValue = (short) Integer.parseInt(defaultExpression);
+ } else if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) {
+ defaultValue = Integer.parseInt(defaultExpression);
+ } else if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) {
+ defaultValue = Long.parseLong(defaultExpression);
+ } else if (type instanceof DataTypeUInt64) {
+ defaultValue = BigInteger.valueOf(Long.parseLong(defaultExpression));
+ } else if (type instanceof DataTypeFloat32) {
+ defaultValue = Float.parseFloat(defaultExpression);
+ } else if (type instanceof DataTypeFloat64) {
+ defaultValue = Double.parseDouble(defaultExpression);
+ } else if (type instanceof DataTypeDecimal) {
+ defaultValue = new BigDecimal(Double.parseDouble(defaultExpression));
+ }
+ }
+
+ return defaultValue;
+ }
+
+ public static long getBlockColumnsByteSize(Block block) throws Exception {
+ IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
+ Field field = AbstractColumn.class.getDeclaredField("buffer");
+ field.setAccessible(true);
+ Field columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter");
+ columnWriterField.setAccessible(true);
+ Field byteBufferListField = ByteArrayWriter.class.getDeclaredField("byteBufferList");
+ byteBufferListField.setAccessible(true);
+ int size = 0;
+ int totalSize = 0;
+
+ for (int i = 0; i < columns.length; i++) {
+ Object columnWriter = columnWriterField.get(field.get(columns[i]));
+ List<ByteBuffer> byteBufferList =
+ (List<ByteBuffer>) byteBufferListField.get(columnWriter);
+ size = 0;
+ for (ByteBuffer byteBuffer : byteBufferList) {
+ size += byteBuffer.position();
+ }
+ totalSize += size;
+ /*if (size > 1000000) {
+ LOG.warn(columns[i].name() + "buf cnt:" + byteBufferList.size() + ", size:" + size/1000000.0 + "M");
+ }*/
+ }
+
+ return totalSize;
+ }
+
+
+}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
index ad9b6aa..aae6678 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
@@ -1,17 +1,21 @@
package com.geedgenetworks.connectors.kafka;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class EventKafkaDeserializationSchema implements KafkaDeserializationSchema<Event>{
+public class EventKafkaDeserializationSchema implements KafkaDeserializationSchema<Event>, RuntimeContextAware {
private static final Logger LOG = LoggerFactory.getLogger(EventKafkaDeserializationSchema.class);
private final DeserializationSchema<Event> valueDeserialization;
+ private transient InternalMetrics internalMetrics;
public EventKafkaDeserializationSchema(DeserializationSchema<Event> valueDeserialization) {
this.valueDeserialization = valueDeserialization;
@@ -35,19 +39,30 @@ public class EventKafkaDeserializationSchema implements KafkaDeserializationSche
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out) throws Exception {
+ internalMetrics.incrementInEvents();
+ internalMetrics.incrementInBytes(record.value().length);
+
try {
Event event = valueDeserialization.deserialize(record.value());
if(event != null){
event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, record.timestamp());
out.collect(event);
+ internalMetrics.incrementOutEvents();
+ return;
}
}catch (Exception e) {
LOG.error("反序列化失败", e);
}
+ internalMetrics.incrementErrorEvents();
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
+
+ @Override
+ public void runtimeContextAware(RuntimeContext runtimeContext) {
+ internalMetrics = new InternalMetrics(runtimeContext);
+ }
}
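For reference, the new RuntimeContextAware hook is the 7-line file listed in the diffstat but not shown in this excerpt. The method name and parameter type below are taken from the implementation above; the rest is an assumed sketch.

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.functions.RuntimeContext;

// Sketch of the assumed interface; the actual file is not shown in this excerpt.
public interface RuntimeContextAware {
    void runtimeContextAware(RuntimeContext runtimeContext);
}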
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index 2ac3223..d300650 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer2;
import java.util.Optional;
import java.util.Properties;
@@ -33,12 +33,13 @@ public class KafkaSinkProvider implements SinkProvider {
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
- FlinkKafkaProducer<Event> kafkaProducer = new FlinkKafkaProducer<>(
+ FlinkKafkaProducer2<Event> kafkaProducer = new FlinkKafkaProducer2<>(
topic,
valueSerialization,
properties,
Optional.empty()
);
+ kafkaProducer.setLogFailuresOnly(true);
return dataStream.addSink(kafkaProducer);
}
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
index ead1f3b..ce0ddf8 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer2;
import java.util.List;
import java.util.Properties;
@@ -34,7 +34,7 @@ public class KafkaSourceProvider implements SourceProvider {
@Override
public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
- FlinkKafkaConsumer<Event> kafkaConsumer = new FlinkKafkaConsumer<>(
+ FlinkKafkaConsumer2<Event> kafkaConsumer = new FlinkKafkaConsumer2<>(
topics,
new EventKafkaDeserializationSchema(valueDeserialization),
properties
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java
new file mode 100644
index 0000000..d37121d
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java
@@ -0,0 +1,55 @@
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher2;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@PublicEvolving
+public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> {
+
+ public FlinkKafkaConsumer2(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
+ super(topics, deserializer, props);
+ }
+
+ @Override
+ protected AbstractFetcher<T, ?> createFetcher(
+ SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+ SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics)
+ throws Exception {
+
+ // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
+ // this overwrites whatever setting the user configured in the properties
+ adjustAutoCommitConfig(properties, offsetCommitMode);
+
+ return new KafkaFetcher2<>(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarkStrategy,
+ runtimeContext,
+ runtimeContext.getProcessingTimeService(),
+ runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+ runtimeContext.getUserCodeClassLoader(),
+ runtimeContext.getTaskNameWithSubtasks(),
+ deserializer,
+ properties,
+ pollTimeout,
+ runtimeContext.getMetricGroup(),
+ consumerMetricGroup,
+ useMetrics);
+ }
+}
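The matching KafkaFetcher2 change (38 lines, listed in the diffstat but not included in this excerpt) is presumably what forwards the extra runtimeContext argument above to a RuntimeContextAware deserialization schema. A hedged sketch of that wiring, written as a standalone helper rather than the real fetcher code:

package org.apache.flink.streaming.connectors.kafka.internals;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;

// Hypothetical helper illustrating the assumed behaviour of KafkaFetcher2:
// hand the task's RuntimeContext to deserializers that want to register metrics.
final class RuntimeContextWiring {

    private RuntimeContextWiring() {}

    static <T> void wire(KafkaDeserializationSchema<T> deserializer, RuntimeContext runtimeContext) {
        if (deserializer instanceof RuntimeContextAware) {
            ((RuntimeContextAware) deserializer).runtimeContextAware(runtimeContext);
        }
    }
}

EventKafkaDeserializationSchema then builds its InternalMetrics inside runtimeContextAware(...), as shown in its diff above.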
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java
new file mode 100644
index 0000000..818df4e
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java
@@ -0,0 +1,1909 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kafka.internals.TransactionalIdsGenerator;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. By default producer will use {@link
+ * FlinkKafkaProducer2.Semantic#AT_LEAST_ONCE} semantic. Before using {@link
+ * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation.
+ */
+@PublicEvolving
+public class FlinkKafkaProducer2<IN>
+ extends TwoPhaseCommitSinkFunction<
+ IN,
+ FlinkKafkaProducer2.KafkaTransactionState,
+ FlinkKafkaProducer2.KafkaTransactionContext> {
+
+ /**
+ * Semantics that can be chosen.
+ * <li>{@link #EXACTLY_ONCE}
+ * <li>{@link #AT_LEAST_ONCE}
+ * <li>{@link #NONE}
+ */
+ public enum Semantic {
+
+ /**
+ * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction
+ * that will be committed to Kafka on a checkpoint.
+ *
+ * <p>In this mode {@link FlinkKafkaProducer2} sets up a pool of {@link
+ * FlinkKafkaInternalProducer}. Between each checkpoint a Kafka transaction is created,
+ * which is committed on {@link FlinkKafkaProducer2#notifyCheckpointComplete(long)}. If
+ * checkpoint complete notifications are running late, {@link FlinkKafkaProducer2} can run
+ * out of {@link FlinkKafkaInternalProducer}s in the pool. In that case any subsequent
+ * {@link FlinkKafkaProducer2#snapshotState(FunctionSnapshotContext)} requests will fail and
+ * {@link FlinkKafkaProducer2} will keep using the {@link FlinkKafkaInternalProducer} from
+ * the previous checkpoint. To decrease the chance of failing checkpoints there are four
+ * options:
+ * <li>decrease number of max concurrent checkpoints
+ * <li>make checkpoints more reliable (so that they complete faster)
+ * <li>increase the delay between checkpoints
+ * <li>increase the size of {@link FlinkKafkaInternalProducer}s pool
+ */
+ EXACTLY_ONCE,
+
+ /**
+ * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the
+ * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.
+ */
+ AT_LEAST_ONCE,
+
+ /**
+ * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or
+ * duplicated in case of failure.
+ */
+ NONE
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer2.class);
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Number of characters to truncate the taskName to for the Kafka transactionalId. The maximum
+ * this can possibly be set to is 32,767 - (length of operatorUniqueId).
+ */
+ private static final short maxTaskNameSize = 1_000;
+
+ /**
+ * This coefficient determines what is the safe scale down factor.
+ *
+ * <p>If the Flink application previously failed before first checkpoint completed or we are
+ * starting new batch of {@link FlinkKafkaProducer2} from scratch without clean shutdown of the
+ * previous one, {@link FlinkKafkaProducer2} doesn't know what was the set of previously used
+ * Kafka's transactionalId's. In that case, it will try to play safe and abort all of the
+ * possible transactionalIds from the range of: {@code [0, getNumberOfParallelSubtasks() *
+ * kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR) }
+ *
+ * <p>The range of available to use transactional ids is: {@code [0,
+ * getNumberOfParallelSubtasks() * kafkaProducersPoolSize) }
+ *
+ * <p>This means that if we decrease {@code getNumberOfParallelSubtasks()} by a factor larger
+ * than {@code SAFE_SCALE_DOWN_FACTOR} we can be left with some lingering transactions.
+ */
+ public static final int SAFE_SCALE_DOWN_FACTOR = 5;
+
+ /**
+ * Default number of KafkaProducers in the pool. See {@link
+ * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}.
+ */
+ public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
+
+ /** Default value for kafka transaction timeout. */
+ public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1);
+
+ /** Configuration key for disabling the metrics reporting. */
+ public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+ /**
+ * Descriptor of the transactional IDs list. Note: This state is serialized by Kryo Serializer
+ * and it has a compatibility problem; it will be removed later. Please use
+ * NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2.
+ */
+ @Deprecated
+ private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint>
+ NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
+ new ListStateDescriptor<>(
+ "next-transactional-id-hint",
+ TypeInformation.of(NextTransactionalIdHint.class));
+
+ private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint>
+ NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2 =
+ new ListStateDescriptor<>(
+ "next-transactional-id-hint-v2",
+ new NextTransactionalIdHintSerializer());
+
+ /** State for nextTransactionalIdHint. */
+ private transient ListState<FlinkKafkaProducer2.NextTransactionalIdHint>
+ nextTransactionalIdHintState;
+
+ /** Generator for Transactional IDs. */
+ private transient TransactionalIdsGenerator transactionalIdsGenerator;
+
+ /** Hint for picking next transactional id. */
+ private transient FlinkKafkaProducer2.NextTransactionalIdHint nextTransactionalIdHint;
+
+ /** User defined properties for the Producer. */
+ protected final Properties producerConfig;
+
+ /** The name of the default topic this producer is writing data to. */
+ protected final String defaultTopicId;
+
+ /**
+ * (Serializable) SerializationSchema for turning objects used with Flink into byte[] for
+ * Kafka.
+ */
+ @Nullable private final KeyedSerializationSchema<IN> keyedSchema;
+
+ /**
+ * (Serializable) serialization schema for serializing records to {@link ProducerRecord
+ * ProducerRecords}.
+ */
+ @Nullable private final KafkaSerializationSchema<IN> kafkaSchema;
+
+ /** User-provided partitioner for assigning an object to a Kafka partition for each topic. */
+ @Nullable private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
+
+ /** Partitions of each topic. */
+ protected final Map<String, int[]> topicPartitionsMap;
+
+ /**
+ * Max number of producers in the pool. If all producers are in use, snapshotting state will
+ * throw an exception.
+ */
+ private final int kafkaProducersPoolSize;
+
+ /** Pool of available transactional ids. */
+ private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
+
+ /** Flag controlling whether we are writing the Flink record's timestamp into Kafka. */
+ protected boolean writeTimestampToKafka = false;
+
+ /** Flag indicating whether to accept failures (and log them), or to fail on failures. */
+ private boolean logFailuresOnly;
+
+ /** Semantic chosen for this instance. */
+ protected FlinkKafkaProducer2.Semantic semantic;
+
+ // -------------------------------- Runtime fields ------------------------------------------
+
+ /** The callback than handles error propagation or logging callbacks. */
+ @Nullable protected transient Callback callback;
+
+ /** Errors encountered in the async producer are stored here. */
+ @Nullable protected transient volatile Exception asyncException;
+
+ /** Number of unacknowledged records. */
+ protected final AtomicLong pendingRecords = new AtomicLong();
+
+ /**
+ * Cache of metrics to replace already registered metrics instead of overwriting existing ones.
+ */
+ private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
+
+ private transient InternalMetrics internalMetrics;
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic.
+ *
+ * @param brokerList Comma separated addresses of the brokers
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined (keyless) serialization schema.
+ */
+ public FlinkKafkaProducer2(
+ String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList));
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic.
+ *
+ * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the
+ * partitioner. This default partitioner maps each sink subtask to a single Kafka partition
+ * (i.e. all records received by a sink subtask will end up in the same Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String,
+ * SerializationSchema, Properties, Optional)} instead.
+ *
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined key-less serialization schema.
+ * @param producerConfig Properties with the producer configuration.
+ */
+ public FlinkKafkaProducer2(
+ String topicId,
+ SerializationSchema<IN> serializationSchema,
+ Properties producerConfig) {
+ this(
+ topicId,
+ serializationSchema,
+ producerConfig,
+ Optional.of(new FlinkFixedPartitioner<>()));
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a key-less {@link SerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not
+ * have an attached key. Therefore, if a partitioner is also not provided, records will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A key-less serializable serialization schema for turning user
+ * objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be distributed to Kafka
+ * partitions in a round-robin fashion.
+ */
+ public FlinkKafkaProducer2(
+ String topicId,
+ SerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+ this(
+ topicId,
+ serializationSchema,
+ producerConfig,
+ customPartitioner.orElse(null),
+ Semantic.AT_LEAST_ONCE,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a key-less {@link SerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not
+ * have an attached key. Therefore, if a partitioner is also not provided, records will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A key-less serializable serialization schema for turning user
+ * objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
+ * the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be distributed to Kafka
+ * partitions in a round-robin fashion.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer2.Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
+ * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ */
+ public FlinkKafkaProducer2(
+ String topicId,
+ SerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ @Nullable FlinkKafkaPartitioner<IN> customPartitioner,
+ FlinkKafkaProducer2.Semantic semantic,
+ int kafkaProducersPoolSize) {
+ this(
+ topicId,
+ null,
+ null,
+ new KafkaSerializationSchemaWrapper<>(
+ topicId, customPartitioner, false, serializationSchema),
+ producerConfig,
+ semantic,
+ kafkaProducersPoolSize);
+ }
+
+ // ------------------- Key/Value serialization schema constructors ----------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic.
+ *
+ * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the
+ * partitioner. This default partitioner maps each sink subtask to a single Kafka partition
+ * (i.e. all records received by a sink subtask will end up in the same Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String,
+ * KeyedSerializationSchema, Properties, Optional)} instead.
+ *
+ * @param brokerList Comma separated addresses of the brokers
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer2.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer2(
+ String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+ this(
+ topicId,
+ serializationSchema,
+ getPropertiesFromBrokerList(brokerList),
+ Optional.of(new FlinkFixedPartitioner<IN>()));
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic.
+ *
+ * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the
+ * partitioner. This default partitioner maps each sink subtask to a single Kafka partition
+ * (i.e. all records received by a sink subtask will end up in the same Kafka partition).
+ *
+ * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String,
+ * KeyedSerializationSchema, Properties, Optional)} instead.
+ *
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @param producerConfig Properties with the producer configuration.
+ * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer2.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer2(
+ String topicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig) {
+ this(
+ topicId,
+ serializationSchema,
+ producerConfig,
+ Optional.of(new FlinkFixedPartitioner<IN>()));
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic.
+ *
+ * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the
+ * partitioner. This default partitioner maps each sink subtask to a single Kafka partition
+ * (i.e. all records received by a sink subtask will end up in the same Kafka partition).
+ *
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @param producerConfig Properties with the producer configuration.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer2.Semantic}).
+ * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer2.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer2(
+ String topicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaProducer2.Semantic semantic) {
+ this(
+ topicId,
+ serializationSchema,
+ producerConfig,
+ Optional.of(new FlinkFixedPartitioner<IN>()),
+ semantic,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * <p>If a partitioner is not provided, written records will be partitioned by the attached key
+ * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If
+ * written records do not have a key (i.e., {@link
+ * KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into
+ * a kafka-consumable byte[] supporting key/value messages
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is
+ * the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be partitioned by the key of
+ * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the
+ * keys are {@code null}, then records will be distributed to Kafka partitions in a
+ * round-robin fashion.
+ * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer2.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer2(
+ String defaultTopicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+ this(
+ defaultTopicId,
+ serializationSchema,
+ producerConfig,
+ customPartitioner,
+ FlinkKafkaProducer2.Semantic.AT_LEAST_ONCE,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
+ * accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link
+ * FlinkKafkaPartitioner}.
+ *
+ * <p>If a partitioner is not provided, written records will be partitioned by the attached key
+ * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If
+ * written records do not have a key (i.e., {@link
+ * KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into
+ * a kafka-consumable byte[] supporting key/value messages
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is
+ * the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be partitioned by the key of
+ * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the
+ * keys are {@code null}, then records will be distributed to Kafka partitions in a
+ * round-robin fashion.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer2.Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
+ * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties,
+ * FlinkKafkaProducer2.Semantic)}
+ */
+ @Deprecated
+ public FlinkKafkaProducer2(
+ String defaultTopicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
+ FlinkKafkaProducer2.Semantic semantic,
+ int kafkaProducersPoolSize) {
+ this(
+ defaultTopicId,
+ serializationSchema,
+ customPartitioner.orElse(null),
+ null, /* kafka serialization schema */
+ producerConfig,
+ semantic,
+ kafkaProducersPoolSize);
+ }
+
+ /**
+ * Creates a {@link FlinkKafkaProducer2} for a given topic. The sink produces its input to the
+ * topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link
+ * ProducerRecord}, including partitioning information.
+ *
+ * @param defaultTopic The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into
+ * a kafka-consumable byte[] supporting key/value messages
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is
+ * the only required argument.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer2.Semantic}).
+ */
+ public FlinkKafkaProducer2(
+ String defaultTopic,
+ KafkaSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaProducer2.Semantic semantic) {
+ this(
+ defaultTopic,
+ serializationSchema,
+ producerConfig,
+ semantic,
+ DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
+
+ /**
+     * Creates a FlinkKafkaProducer2 for a given topic. The sink produces its input to the topic. It
+     * accepts a {@link KafkaSerializationSchema} for serializing records to a {@link
+     * ProducerRecord}, including partitioning information.
+ *
+ * @param defaultTopic The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into
+ * a kafka-consumable byte[] supporting key/value messages
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is
+ * the only required argument.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer2.Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
+ * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ */
+ public FlinkKafkaProducer2(
+ String defaultTopic,
+ KafkaSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaProducer2.Semantic semantic,
+ int kafkaProducersPoolSize) {
+ this(
+ defaultTopic,
+ null,
+ null, /* keyed schema and FlinkKafkaPartitioner */
+ serializationSchema,
+ producerConfig,
+ semantic,
+ kafkaProducersPoolSize);
+ }
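+
+    // Illustrative usage sketch for the KafkaSerializationSchema-based constructors; the topic
+    // name, bootstrap address and record contents are placeholders:
+    //
+    //   Properties props = new Properties();
+    //   props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    //   KafkaSerializationSchema<String> schema =
+    //           (element, timestamp) ->
+    //                   new ProducerRecord<>(
+    //                           "example-topic",
+    //                           element.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    //   FlinkKafkaProducer2<String> producer =
+    //           new FlinkKafkaProducer2<>(
+    //                   "example-topic", schema, props, FlinkKafkaProducer2.Semantic.EXACTLY_ONCE);
+    //
+    //   Note: with EXACTLY_ONCE the job must have checkpointing enabled; otherwise
+    //   initializeState() below downgrades the semantic to NONE and logs a warning.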
+
+ /**
+     * Creates a FlinkKafkaProducer2 for a given topic. The sink produces its input to the topic. It
+     * accepts either a keyed {@link KeyedSerializationSchema} together with an optional custom
+     * {@link FlinkKafkaPartitioner}, or a {@link KafkaSerializationSchema}.
+ *
+ * <p>If a partitioner is not provided, written records will be partitioned by the attached key
+ * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If
+ * written records do not have a key (i.e., {@link
+ * KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they will be
+ * distributed to Kafka partitions in a round-robin fashion.
+ *
+ * @param defaultTopic The default topic to write data to
+ * @param keyedSchema A serializable serialization schema for turning user objects into a
+ * kafka-consumable byte[] supporting key/value messages
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka
+ * partitions. If a partitioner is not provided, records will be partitioned by the key of
+ * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the
+ * keys are {@code null}, then records will be distributed to Kafka partitions in a
+ * round-robin fashion.
+     * @param kafkaSchema A serializable serialization schema for turning user objects into a
+     *     {@link ProducerRecord}, including partitioning information
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is
+ * the only required argument.
+ * @param semantic Defines semantic that will be used by this producer (see {@link
+ * FlinkKafkaProducer2.Semantic}).
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link
+ * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}).
+ */
+ private FlinkKafkaProducer2(
+ String defaultTopic,
+ KeyedSerializationSchema<IN> keyedSchema,
+ FlinkKafkaPartitioner<IN> customPartitioner,
+ KafkaSerializationSchema<IN> kafkaSchema,
+ Properties producerConfig,
+ FlinkKafkaProducer2.Semantic semantic,
+ int kafkaProducersPoolSize) {
+ super(
+ new FlinkKafkaProducer2.TransactionStateSerializer(),
+ new FlinkKafkaProducer2.ContextStateSerializer());
+
+ this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null");
+
+ if (kafkaSchema != null) {
+ this.keyedSchema = null;
+ this.kafkaSchema = kafkaSchema;
+ this.flinkKafkaPartitioner = null;
+ ClosureCleaner.clean(
+ this.kafkaSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+
+ if (customPartitioner != null) {
+ throw new IllegalArgumentException(
+                        "Custom partitioner can only be used when "
+                                + "using a KeyedSerializationSchema or SerializationSchema.");
+ }
+ } else if (keyedSchema != null) {
+ this.kafkaSchema = null;
+ this.keyedSchema = keyedSchema;
+ this.flinkKafkaPartitioner = customPartitioner;
+ ClosureCleaner.clean(
+ this.flinkKafkaPartitioner,
+ ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
+ true);
+ ClosureCleaner.clean(
+ this.keyedSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ } else {
+ throw new IllegalArgumentException(
+                    "You must provide either a KafkaSerializationSchema or a "
+ + "KeyedSerializationSchema.");
+ }
+
+ this.producerConfig = checkNotNull(producerConfig, "producerConfig is null");
+ this.semantic = checkNotNull(semantic, "semantic is null");
+ this.kafkaProducersPoolSize = kafkaProducersPoolSize;
+        checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be greater than 0");
+
+        // set the producer configuration properties for the Kafka record key/value serializers.
+ if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ ByteArraySerializer.class.getName());
+ } else {
+ LOG.warn(
+ "Overwriting the '{}' is not recommended",
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ }
+
+ if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ ByteArraySerializer.class.getName());
+ } else {
+ LOG.warn(
+ "Overwriting the '{}' is not recommended",
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ }
+
+ // eagerly ensure that bootstrap servers are set.
+ if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ throw new IllegalArgumentException(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+ + " must be supplied in the producer config properties.");
+ }
+
+ if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
+ long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
+ checkState(
+ timeout < Integer.MAX_VALUE && timeout > 0,
+ "timeout does not fit into 32 bit integer");
+ this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
+ LOG.warn(
+ "Property [{}] not specified. Setting it to {}",
+ ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+ DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
+ }
+
+ // Enable transactionTimeoutWarnings to avoid silent data loss
+ // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
+ // The KafkaProducer may not throw an exception if the transaction failed to commit
+ if (semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ final Object object =
+ this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
+ final long transactionTimeout;
+ if (object instanceof String && StringUtils.isNumeric((String) object)) {
+ transactionTimeout = Long.parseLong((String) object);
+ } else if (object instanceof Number) {
+ transactionTimeout = ((Number) object).longValue();
+ } else {
+ throw new IllegalArgumentException(
+ ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
+ + " must be numeric, was "
+ + object);
+ }
+ super.setTransactionTimeout(transactionTimeout);
+ super.enableTransactionTimeoutWarnings(0.8);
+ }
+
+ this.topicPartitionsMap = new HashMap<>();
+ }
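+
+    // Resulting producer configuration (sketch; the broker addresses are placeholders): only
+    // 'bootstrap.servers' has to be supplied by the caller, e.g.
+    //
+    //   Properties props = new Properties();
+    //   props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092");
+    //
+    // Missing key/value serializers are defaulted to ByteArraySerializer, and a missing
+    // 'transaction.timeout.ms' is defaulted to DEFAULT_KAFKA_TRANSACTION_TIMEOUT; for
+    // EXACTLY_ONCE that timeout is also used to arm the transaction timeout warnings above.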
+
+ // ---------------------------------- Properties --------------------------
+
+ /**
+ * If set to true, Flink will write the (event time) timestamp attached to each record into
+ * Kafka. Timestamps must be positive for Kafka to accept them.
+ *
+ * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to
+ * Kafka.
+ */
+ public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+ this.writeTimestampToKafka = writeTimestampToKafka;
+ if (kafkaSchema instanceof KafkaSerializationSchemaWrapper) {
+ ((KafkaSerializationSchemaWrapper<IN>) kafkaSchema)
+ .setWriteTimestamp(writeTimestampToKafka);
+ }
+ }
+
+ /**
+     * Defines whether the producer should fail on errors, or only log them. If this is set to true,
+     * exceptions will only be logged; if set to false, exceptions will eventually be thrown and
+     * cause the streaming program to fail (and enter recovery).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+ /**
+ * Disables the propagation of exceptions thrown when committing presumably timed out Kafka
+ * transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
+ * never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions
+ * will still be logged to inform the user that data loss might have occurred.
+ *
+ * <p>Note that we use {@link System#currentTimeMillis()} to track the age of a transaction.
+ * Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will
+ * attempt at least one commit of the transaction before giving up.
+ */
+ @Override
+ public FlinkKafkaProducer2<IN> ignoreFailuresAfterTransactionTimeout() {
+ super.ignoreFailuresAfterTransactionTimeout();
+ return this;
+ }
+
+ // ----------------------------------- Utilities --------------------------
+
+ /** Initializes the connection to Kafka. */
+ @Override
+ public void open(Configuration configuration) throws Exception {
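+        // Per-subtask metrics: invoke() counts incoming events, while the send callbacks below
+        // count successfully delivered events/bytes and errors.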
+ internalMetrics = new InternalMetrics(getRuntimeContext());
+ if (logFailuresOnly) {
+ callback =
+ new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception e) {
+ if (e != null) {
+ internalMetrics.incrementErrorEvents();
+ LOG.error(
+ "Error while sending record to Kafka: " + e.getMessage(),
+ e);
+                                } else {
+ internalMetrics.incrementOutEvents();
+ internalMetrics.incrementOutBytes(metadata.serializedValueSize());
+ }
+
+ acknowledgeMessage();
+ }
+ };
+ } else {
+ callback =
+ new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ internalMetrics.incrementErrorEvents();
+                                    if (asyncException == null) {
+ asyncException = exception;
+ }
+                                } else {
+ internalMetrics.incrementOutEvents();
+ internalMetrics.incrementOutBytes(metadata.serializedValueSize());
+ }
+
+ acknowledgeMessage();
+ }
+ };
+ }
+
+ RuntimeContext ctx = getRuntimeContext();
+
+ if (flinkKafkaPartitioner != null) {
+ flinkKafkaPartitioner.open(
+ ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+ }
+
+ if (kafkaSchema instanceof KafkaContextAware) {
+ KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema;
+ contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
+ contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
+ }
+
+ if (kafkaSchema != null) {
+ kafkaSchema.open(
+ RuntimeContextInitializationContextAdapters.serializationAdapter(
+ getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
+ }
+
+        if (keyedSchema == null && kafkaSchema == null) {
+            throw new RuntimeException(
+                    "We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this"
+                            + " is a bug.");
+ }
+
+ super.open(configuration);
+ }
+
+ @Override
+ public void invoke(
+ FlinkKafkaProducer2.KafkaTransactionState transaction, IN next, Context context)
+ throws FlinkKafkaException {
+ checkErroneous();
+
+ internalMetrics.incrementInEvents();
+ try {
+ ProducerRecord<byte[], byte[]> record;
+ if (keyedSchema != null) {
+ byte[] serializedKey = keyedSchema.serializeKey(next);
+ byte[] serializedValue = keyedSchema.serializeValue(next);
+ String targetTopic = keyedSchema.getTargetTopic(next);
+ if (targetTopic == null) {
+ targetTopic = defaultTopicId;
+ }
+
+ Long timestamp = null;
+ if (this.writeTimestampToKafka) {
+ timestamp = context.timestamp();
+ }
+
+ int[] partitions = topicPartitionsMap.get(targetTopic);
+ if (null == partitions) {
+ partitions = getPartitionsByTopic(targetTopic, transaction.producer);
+ topicPartitionsMap.put(targetTopic, partitions);
+ }
+ if (flinkKafkaPartitioner != null) {
+ record =
+ new ProducerRecord<>(
+ targetTopic,
+ flinkKafkaPartitioner.partition(
+ next,
+ serializedKey,
+ serializedValue,
+ targetTopic,
+ partitions),
+ timestamp,
+ serializedKey,
+ serializedValue);
+ } else {
+ record =
+ new ProducerRecord<>(
+ targetTopic, null, timestamp, serializedKey, serializedValue);
+ }
+ } else {
+ if (kafkaSchema instanceof KafkaContextAware) {
+ @SuppressWarnings("unchecked")
+ KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema;
+
+ String targetTopic = contextAwareSchema.getTargetTopic(next);
+ if (targetTopic == null) {
+ targetTopic = defaultTopicId;
+ }
+ int[] partitions = topicPartitionsMap.get(targetTopic);
+
+ if (null == partitions) {
+ partitions = getPartitionsByTopic(targetTopic, transaction.producer);
+ topicPartitionsMap.put(targetTopic, partitions);
+ }
+
+ contextAwareSchema.setPartitions(partitions);
+ }
+ record = kafkaSchema.serialize(next, context.timestamp());
+ }
+
+ pendingRecords.incrementAndGet();
+ transaction.producer.send(record, callback);
+ } catch (Exception e) {
+ internalMetrics.incrementErrorEvents();
+            LOG.error("Failed to serialize or send record to Kafka", e);
+ }
+ }
+
+ @Override
+ public void close() throws FlinkKafkaException {
+ // First close the producer for current transaction.
+ try {
+ final KafkaTransactionState currentTransaction = currentTransaction();
+ if (currentTransaction != null) {
+ // to avoid exceptions on aborting transactions with some pending records
+ flush(currentTransaction);
+
+                // a normal abort for AT_LEAST_ONCE and NONE does not clean up resources because
+                // of producer reuse, thus we need to close the producer manually
+ switch (semantic) {
+ case EXACTLY_ONCE:
+ break;
+ case AT_LEAST_ONCE:
+ case NONE:
+ currentTransaction.producer.flush();
+ currentTransaction.producer.close(Duration.ofSeconds(0));
+ break;
+ }
+ }
+ super.close();
+ } catch (Exception e) {
+ asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+ } finally {
+            // We may have to close the producer of the current transaction in case some
+            // exception was thrown before the normal close routine finished.
+ if (currentTransaction() != null) {
+ try {
+ currentTransaction().producer.close(Duration.ofSeconds(0));
+ } catch (Throwable t) {
+ LOG.warn("Error closing producer.", t);
+ }
+ }
+ // Make sure all the producers for pending transactions are closed.
+ pendingTransactions()
+ .forEach(
+ transaction -> {
+ try {
+ transaction.getValue().producer.close(Duration.ofSeconds(0));
+ } catch (Throwable t) {
+ LOG.warn("Error closing producer.", t);
+ }
+ });
+ // make sure we propagate pending errors
+ checkErroneous();
+ }
+ }
+
+ // ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+ @Override
+ protected FlinkKafkaProducer2.KafkaTransactionState beginTransaction()
+ throws FlinkKafkaException {
+ switch (semantic) {
+ case EXACTLY_ONCE:
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
+ producer.beginTransaction();
+ return new FlinkKafkaProducer2.KafkaTransactionState(
+ producer.getTransactionalId(), producer);
+ case AT_LEAST_ONCE:
+ case NONE:
+                // Do not create a new producer on each beginTransaction() if it is not necessary
+ final FlinkKafkaProducer2.KafkaTransactionState currentTransaction =
+ currentTransaction();
+ if (currentTransaction != null && currentTransaction.producer != null) {
+ return new FlinkKafkaProducer2.KafkaTransactionState(
+ currentTransaction.producer);
+ }
+ return new FlinkKafkaProducer2.KafkaTransactionState(
+ initNonTransactionalProducer(true));
+ default:
+ throw new UnsupportedOperationException("Not implemented semantic");
+ }
+ }
+
+ @Override
+ protected void preCommit(FlinkKafkaProducer2.KafkaTransactionState transaction)
+ throws FlinkKafkaException {
+ switch (semantic) {
+ case EXACTLY_ONCE:
+ case AT_LEAST_ONCE:
+ flush(transaction);
+ break;
+ case NONE:
+ break;
+ default:
+ throw new UnsupportedOperationException("Not implemented semantic");
+ }
+ checkErroneous();
+ }
+
+ @Override
+ protected void commit(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ if (transaction.isTransactional()) {
+ try {
+ transaction.producer.commitTransaction();
+ } finally {
+ recycleTransactionalProducer(transaction.producer);
+ }
+ }
+ }
+
+ @Override
+ protected void recoverAndCommit(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ if (transaction.isTransactional()) {
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+ try {
+ producer = initTransactionalProducer(transaction.transactionalId, false);
+ producer.resumeTransaction(transaction.producerId, transaction.epoch);
+ producer.commitTransaction();
+ } catch (InvalidTxnStateException | ProducerFencedException ex) {
+ // That means we have committed this transaction before.
+ LOG.warn(
+ "Encountered error {} while recovering transaction {}. "
+                                + "Presumably this transaction has already been committed before",
+ ex,
+ transaction);
+ } finally {
+ if (producer != null) {
+ producer.close(0, TimeUnit.SECONDS);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void abort(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ if (transaction.isTransactional()) {
+ transaction.producer.abortTransaction();
+ recycleTransactionalProducer(transaction.producer);
+ }
+ }
+
+ @Override
+ protected void recoverAndAbort(FlinkKafkaProducer2.KafkaTransactionState transaction) {
+ if (transaction.isTransactional()) {
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+ try {
+ producer = initTransactionalProducer(transaction.transactionalId, false);
+ producer.initTransactions();
+ } finally {
+ if (producer != null) {
+ producer.close(0, TimeUnit.SECONDS);
+ }
+ }
+ }
+ }
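+
+    // Summary of how the TwoPhaseCommitSinkFunction hooks above map onto Kafka transactions for
+    // EXACTLY_ONCE:
+    //   beginTransaction()                     -> take a transactional id from the pool and beginTransaction()
+    //   preCommit()                            -> flush() so the checkpoint only completes once all records are acknowledged
+    //   commit()                               -> commitTransaction() and recycle the producer / transactional id
+    //   abort()                                -> abortTransaction() and recycle the producer
+    //   recoverAndCommit() / recoverAndAbort() -> resume the transaction (or fence it via initTransactions()) after a restart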
+
+ /**
+ * <b>ATTENTION to subclass implementors:</b> When overriding this method, please always call
+ * {@code super.acknowledgeMessage()} to keep the invariants of the internal bookkeeping of the
+ * producer. If not, be sure to know what you are doing.
+ */
+ protected void acknowledgeMessage() {
+ pendingRecords.decrementAndGet();
+ }
+
+ /**
+ * Flush pending records.
+ *
+     * @param transaction the transaction whose producer should be flushed
+ */
+ private void flush(FlinkKafkaProducer2.KafkaTransactionState transaction)
+ throws FlinkKafkaException {
+ if (transaction.producer != null) {
+ transaction.producer.flush();
+ }
+ long pendingRecordsCount = pendingRecords.get();
+ if (pendingRecordsCount != 0) {
+ throw new IllegalStateException(
+ "Pending record count must be zero at this point: " + pendingRecordsCount);
+ }
+
+        // if the flushed requests have errors, we should also propagate them and fail the checkpoint
+ checkErroneous();
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+
+ nextTransactionalIdHintState.clear();
+        // To avoid duplication, only the first subtask keeps track of the next transactional id
+        // hint. Otherwise all of the subtasks would write exactly the same information.
+ if (getRuntimeContext().getIndexOfThisSubtask() == 0
+ && semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ checkState(
+ nextTransactionalIdHint != null,
+ "nextTransactionalIdHint must be set for EXACTLY_ONCE");
+ long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
+
+            // If we scaled up, some (unknown) subtask must have created new transactional ids
+            // from scratch. In that case we adjust nextFreeTransactionalId by the range of
+            // transactionalIds that could be used for this scaling up.
+ if (getRuntimeContext().getNumberOfParallelSubtasks()
+ > nextTransactionalIdHint.lastParallelism) {
+ nextFreeTransactionalId +=
+ getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
+ }
+
+ nextTransactionalIdHintState.add(
+ new FlinkKafkaProducer2.NextTransactionalIdHint(
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ nextFreeTransactionalId));
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ if (semantic != FlinkKafkaProducer2.Semantic.NONE
+ && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
+ LOG.warn(
+ "Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.",
+ semantic,
+ FlinkKafkaProducer2.Semantic.NONE);
+ semantic = FlinkKafkaProducer2.Semantic.NONE;
+ }
+
+ nextTransactionalIdHintState =
+ context.getOperatorStateStore()
+ .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
+
+ if (context.getOperatorStateStore()
+ .getRegisteredStateNames()
+ .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
+            migrateNextTransactionalIdHintState(context);
+ }
+
+ String taskName = getRuntimeContext().getTaskName();
+        // Kafka transactional IDs are limited in length to be less than the max value of a short,
+        // so we truncate the task name here if necessary to a more reasonable length.
+ if (taskName.length() > maxTaskNameSize) {
+ taskName = taskName.substring(0, maxTaskNameSize);
+ LOG.warn(
+ "Truncated task name for Kafka TransactionalId from {} to {}.",
+ getRuntimeContext().getTaskName(),
+ taskName);
+ }
+ transactionalIdsGenerator =
+ new TransactionalIdsGenerator(
+ taskName
+ + "-"
+ + ((StreamingRuntimeContext) getRuntimeContext())
+ .getOperatorUniqueID(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ kafkaProducersPoolSize,
+ SAFE_SCALE_DOWN_FACTOR);
+
+ if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ nextTransactionalIdHint = null;
+ } else {
+ ArrayList<FlinkKafkaProducer2.NextTransactionalIdHint> transactionalIdHints =
+ Lists.newArrayList(nextTransactionalIdHintState.get());
+ if (transactionalIdHints.size() > 1) {
+ throw new IllegalStateException(
+ "There should be at most one next transactional id hint written by the first subtask");
+ } else if (transactionalIdHints.size() == 0) {
+ nextTransactionalIdHint = new FlinkKafkaProducer2.NextTransactionalIdHint(0, 0);
+
+ // this means that this is either:
+ // (1) the first execution of this application
+ // (2) previous execution has failed before first checkpoint completed
+ //
+ // in case of (2) we have to abort all previous transactions
+ abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
+ } else {
+ nextTransactionalIdHint = transactionalIdHints.get(0);
+ }
+ }
+
+ super.initializeState(context);
+ }
+
+ @Override
+ protected Optional<FlinkKafkaProducer2.KafkaTransactionContext> initializeUserContext() {
+ if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) {
+ return Optional.empty();
+ }
+
+ Set<String> transactionalIds = generateNewTransactionalIds();
+ resetAvailableTransactionalIdsPool(transactionalIds);
+ return Optional.of(new FlinkKafkaProducer2.KafkaTransactionContext(transactionalIds));
+ }
+
+ private Set<String> generateNewTransactionalIds() {
+ checkState(
+ nextTransactionalIdHint != null,
+ "nextTransactionalIdHint must be present for EXACTLY_ONCE");
+
+ Set<String> transactionalIds =
+ transactionalIdsGenerator.generateIdsToUse(
+ nextTransactionalIdHint.nextFreeTransactionalId);
+ LOG.info("Generated new transactionalIds {}", transactionalIds);
+ return transactionalIds;
+ }
+
+ @Override
+ protected void finishRecoveringContext(
+ Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) {
+ cleanUpUserContext(handledTransactions);
+ resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
+ LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds);
+ }
+
+ protected FlinkKafkaInternalProducer<byte[], byte[]> createProducer() {
+ return new FlinkKafkaInternalProducer<>(this.producerConfig);
+ }
+
+ /**
+ * After initialization make sure that all previous transactions from the current user context
+ * have been completed.
+ *
+ * @param handledTransactions transactions which were already committed or aborted and do not
+ * need further handling
+ */
+ private void cleanUpUserContext(
+ Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) {
+ if (!getUserContext().isPresent()) {
+ return;
+ }
+ HashSet<String> abortTransactions = new HashSet<>(getUserContext().get().transactionalIds);
+ handledTransactions.forEach(
+ kafkaTransactionState ->
+ abortTransactions.remove(kafkaTransactionState.transactionalId));
+ abortTransactions(abortTransactions);
+ }
+
+ private void resetAvailableTransactionalIdsPool(Collection<String> transactionalIds) {
+ availableTransactionalIds.clear();
+ availableTransactionalIds.addAll(transactionalIds);
+ }
+
+ // ----------------------------------- Utilities --------------------------
+
+ private void abortTransactions(final Set<String> transactionalIds) {
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ transactionalIds
+ .parallelStream()
+ .forEach(
+ transactionalId -> {
+                            // The parallelStream executes the consumer in a separate thread pool.
+                            // Because the Kafka client uses the context classloader to construct
+                            // some classes, we should set the correct classloader for it.
+ try (TemporaryClassLoaderContext ignored =
+ TemporaryClassLoaderContext.of(classLoader)) {
+                                // don't mess with the original configuration or any other
+                                // properties of the original object -> create an internal kafka
+                                // producer on our own and do not rely on initTransactionalProducer().
+ final Properties myConfig = new Properties();
+ myConfig.putAll(producerConfig);
+ initTransactionalProducerConfig(myConfig, transactionalId);
+ FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
+ try {
+ kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
+ // it suffices to call initTransactions - this will abort any
+ // lingering transactions
+ kafkaProducer.initTransactions();
+ } finally {
+ if (kafkaProducer != null) {
+ kafkaProducer.close(Duration.ofSeconds(0));
+ }
+ }
+ }
+ });
+ }
+
+ int getTransactionCoordinatorId() {
+ final FlinkKafkaProducer2.KafkaTransactionState currentTransaction = currentTransaction();
+ if (currentTransaction == null || currentTransaction.producer == null) {
+ throw new IllegalArgumentException();
+ }
+ return currentTransaction.producer.getTransactionCoordinatorId();
+ }
+
+ /**
+     * For each checkpoint we create a new {@link FlinkKafkaInternalProducer} so that new transactions
+ * will not clash with transactions created during previous checkpoints ({@code
+ * producer.initTransactions()} assures that we obtain new producerId and epoch counters).
+ */
+ private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer()
+ throws FlinkKafkaException {
+ String transactionalId = availableTransactionalIds.poll();
+ if (transactionalId == null) {
+ throw new FlinkKafkaException(
+ FlinkKafkaErrorCode.PRODUCERS_POOL_EMPTY,
+ "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
+ }
+ FlinkKafkaInternalProducer<byte[], byte[]> producer =
+ initTransactionalProducer(transactionalId, true);
+ producer.initTransactions();
+ return producer;
+ }
+
+ private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+ availableTransactionalIds.add(producer.getTransactionalId());
+ producer.flush();
+ producer.close(Duration.ofSeconds(0));
+ }
+
+ private FlinkKafkaInternalProducer<byte[], byte[]> initTransactionalProducer(
+ String transactionalId, boolean registerMetrics) {
+ initTransactionalProducerConfig(producerConfig, transactionalId);
+ return initProducer(registerMetrics);
+ }
+
+ private static void initTransactionalProducerConfig(
+ Properties producerConfig, String transactionalId) {
+ producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+ }
+
+ private FlinkKafkaInternalProducer<byte[], byte[]> initNonTransactionalProducer(
+ boolean registerMetrics) {
+ producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ return initProducer(registerMetrics);
+ }
+
+ private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
+ FlinkKafkaInternalProducer<byte[], byte[]> producer = createProducer();
+
+ LOG.info(
+ "Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}",
+ getRuntimeContext().getIndexOfThisSubtask() + 1,
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ defaultTopicId);
+
+ // register Kafka metrics to Flink accumulators
+ if (registerMetrics
+ && !Boolean.parseBoolean(
+ producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+ Map<MetricName, ? extends Metric> metrics = producer.metrics();
+
+ if (metrics == null) {
+ // MapR's Kafka implementation returns null here.
+ LOG.info("Producer implementation does not support metrics");
+ } else {
+ final MetricGroup kafkaMetricGroup =
+ getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
+ for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
+ String name = entry.getKey().name();
+ Metric metric = entry.getValue();
+
+ KafkaMetricMutableWrapper wrapper = previouslyCreatedMetrics.get(name);
+ if (wrapper != null) {
+ wrapper.setKafkaMetric(metric);
+ } else {
+ // TODO: somehow merge metrics from all active producers?
+ wrapper = new KafkaMetricMutableWrapper(metric);
+ previouslyCreatedMetrics.put(name, wrapper);
+ kafkaMetricGroup.gauge(name, wrapper);
+ }
+ }
+ }
+ }
+ return producer;
+ }
+
+ protected void checkErroneous() throws FlinkKafkaException {
+ Exception e = asyncException;
+ if (e != null) {
+ // prevent double throwing
+ asyncException = null;
+ throw new FlinkKafkaException(
+ FlinkKafkaErrorCode.EXTERNAL_ERROR,
+ "Failed to send data to Kafka: " + e.getMessage(),
+ e);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ }
+
+    private void migrateNextTransactionalIdHintState(FunctionInitializationContext context)
+ throws Exception {
+ ListState<NextTransactionalIdHint> oldNextTransactionalIdHintState =
+ context.getOperatorStateStore()
+ .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
+ nextTransactionalIdHintState =
+ context.getOperatorStateStore()
+ .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
+
+ ArrayList<NextTransactionalIdHint> oldTransactionalIdHints =
+ Lists.newArrayList(oldNextTransactionalIdHintState.get());
+ if (!oldTransactionalIdHints.isEmpty()) {
+ nextTransactionalIdHintState.addAll(oldTransactionalIdHints);
+ // clear old state
+ oldNextTransactionalIdHintState.clear();
+ }
+ }
+
+ private static Properties getPropertiesFromBrokerList(String brokerList) {
+ String[] elements = brokerList.split(",");
+
+ // validate the broker addresses
+ for (String broker : elements) {
+ NetUtils.getCorrectHostnamePort(broker);
+ }
+
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ return props;
+ }
+
+ protected static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
+ // the fetched list is immutable, so we're creating a mutable copy in order to sort it
+ List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
+
+ // sort the partitions by partition id to make sure the fetched partition list is the same
+ // across subtasks
+ Collections.sort(
+ partitionsList,
+ new Comparator<PartitionInfo>() {
+ @Override
+ public int compare(PartitionInfo o1, PartitionInfo o2) {
+ return Integer.compare(o1.partition(), o2.partition());
+ }
+ });
+
+ int[] partitions = new int[partitionsList.size()];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = partitionsList.get(i).partition();
+ }
+
+ return partitions;
+ }
+
+ /** State for handling transactions. */
+ @VisibleForTesting
+ @Internal
+ public static class KafkaTransactionState {
+
+ private final transient FlinkKafkaInternalProducer<byte[], byte[]> producer;
+
+ @Nullable final String transactionalId;
+
+ final long producerId;
+
+ final short epoch;
+
+ @VisibleForTesting
+ public KafkaTransactionState(
+ String transactionalId, FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+ this(transactionalId, producer.getProducerId(), producer.getEpoch(), producer);
+ }
+
+ @VisibleForTesting
+ public KafkaTransactionState(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+ this(null, -1, (short) -1, producer);
+ }
+
+ @VisibleForTesting
+ public KafkaTransactionState(
+ @Nullable String transactionalId,
+ long producerId,
+ short epoch,
+ FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+ this.transactionalId = transactionalId;
+ this.producerId = producerId;
+ this.epoch = epoch;
+ this.producer = producer;
+ }
+
+ boolean isTransactional() {
+ return transactionalId != null;
+ }
+
+ public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() {
+ return producer;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "%s [transactionalId=%s, producerId=%s, epoch=%s]",
+ this.getClass().getSimpleName(), transactionalId, producerId, epoch);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FlinkKafkaProducer2.KafkaTransactionState that =
+ (FlinkKafkaProducer2.KafkaTransactionState) o;
+
+ if (producerId != that.producerId) {
+ return false;
+ }
+ if (epoch != that.epoch) {
+ return false;
+ }
+ return transactionalId != null
+ ? transactionalId.equals(that.transactionalId)
+ : that.transactionalId == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = transactionalId != null ? transactionalId.hashCode() : 0;
+ result = 31 * result + (int) (producerId ^ (producerId >>> 32));
+ result = 31 * result + (int) epoch;
+ return result;
+ }
+ }
+
+ /**
+     * Context associated with this instance of the {@link FlinkKafkaProducer2}. Used for keeping track
+ * of the transactionalIds.
+ */
+ @VisibleForTesting
+ @Internal
+ public static class KafkaTransactionContext {
+ final Set<String> transactionalIds;
+
+ @VisibleForTesting
+ public KafkaTransactionContext(Set<String> transactionalIds) {
+ checkNotNull(transactionalIds);
+ this.transactionalIds = transactionalIds;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FlinkKafkaProducer2.KafkaTransactionContext that =
+ (FlinkKafkaProducer2.KafkaTransactionContext) o;
+
+ return transactionalIds.equals(that.transactionalIds);
+ }
+
+ @Override
+ public int hashCode() {
+ return transactionalIds.hashCode();
+ }
+ }
+
+ /**
+ * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
+ * FlinkKafkaProducer2.KafkaTransactionState}.
+ */
+ @VisibleForTesting
+ @Internal
+ public static class TransactionStateSerializer
+ extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionState> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionState createInstance() {
+ return null;
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionState copy(
+ FlinkKafkaProducer2.KafkaTransactionState from) {
+ return from;
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionState copy(
+ FlinkKafkaProducer2.KafkaTransactionState from,
+ FlinkKafkaProducer2.KafkaTransactionState reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(
+ FlinkKafkaProducer2.KafkaTransactionState record, DataOutputView target)
+ throws IOException {
+ if (record.transactionalId == null) {
+ target.writeBoolean(false);
+ } else {
+ target.writeBoolean(true);
+ target.writeUTF(record.transactionalId);
+ }
+ target.writeLong(record.producerId);
+ target.writeShort(record.epoch);
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionState deserialize(DataInputView source)
+ throws IOException {
+ String transactionalId = null;
+ if (source.readBoolean()) {
+ transactionalId = source.readUTF();
+ }
+ long producerId = source.readLong();
+ short epoch = source.readShort();
+ return new FlinkKafkaProducer2.KafkaTransactionState(
+ transactionalId, producerId, epoch, null);
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionState deserialize(
+ FlinkKafkaProducer2.KafkaTransactionState reuse, DataInputView source)
+ throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ boolean hasTransactionalId = source.readBoolean();
+ target.writeBoolean(hasTransactionalId);
+ if (hasTransactionalId) {
+ target.writeUTF(source.readUTF());
+ }
+ target.writeLong(source.readLong());
+ target.writeShort(source.readShort());
+ }
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState>
+ snapshotConfiguration() {
+ return new TransactionStateSerializerSnapshot();
+ }
+
+ /** Serializer configuration snapshot for compatibility and format evolution. */
+ @SuppressWarnings("WeakerAccess")
+ public static final class TransactionStateSerializerSnapshot
+ extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState> {
+
+ public TransactionStateSerializerSnapshot() {
+ super(TransactionStateSerializer::new);
+ }
+ }
+ }
+
+ /**
+ * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
+ * FlinkKafkaProducer2.KafkaTransactionContext}.
+ */
+ @VisibleForTesting
+ @Internal
+ public static class ContextStateSerializer
+ extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionContext> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionContext createInstance() {
+ return null;
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionContext copy(
+ FlinkKafkaProducer2.KafkaTransactionContext from) {
+ return from;
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionContext copy(
+ FlinkKafkaProducer2.KafkaTransactionContext from,
+ FlinkKafkaProducer2.KafkaTransactionContext reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(
+ FlinkKafkaProducer2.KafkaTransactionContext record, DataOutputView target)
+ throws IOException {
+ int numIds = record.transactionalIds.size();
+ target.writeInt(numIds);
+ for (String id : record.transactionalIds) {
+ target.writeUTF(id);
+ }
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionContext deserialize(DataInputView source)
+ throws IOException {
+ int numIds = source.readInt();
+ Set<String> ids = new HashSet<>(numIds);
+ for (int i = 0; i < numIds; i++) {
+ ids.add(source.readUTF());
+ }
+ return new FlinkKafkaProducer2.KafkaTransactionContext(ids);
+ }
+
+ @Override
+ public FlinkKafkaProducer2.KafkaTransactionContext deserialize(
+ FlinkKafkaProducer2.KafkaTransactionContext reuse, DataInputView source)
+ throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ int numIds = source.readInt();
+ target.writeInt(numIds);
+ for (int i = 0; i < numIds; i++) {
+ target.writeUTF(source.readUTF());
+ }
+ }
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
+ return new ContextStateSerializerSnapshot();
+ }
+
+ /** Serializer configuration snapshot for compatibility and format evolution. */
+ @SuppressWarnings("WeakerAccess")
+ public static final class ContextStateSerializerSnapshot
+ extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
+
+ public ContextStateSerializerSnapshot() {
+ super(ContextStateSerializer::new);
+ }
+ }
+ }
+
+    /** Keeps the information required to deduce the next safe-to-use transactional id. */
+ public static class NextTransactionalIdHint {
+ public int lastParallelism = 0;
+ public long nextFreeTransactionalId = 0;
+
+ public NextTransactionalIdHint() {
+ this(0, 0);
+ }
+
+ public NextTransactionalIdHint(int parallelism, long nextFreeTransactionalId) {
+ this.lastParallelism = parallelism;
+ this.nextFreeTransactionalId = nextFreeTransactionalId;
+ }
+
+ @Override
+ public String toString() {
+ return "NextTransactionalIdHint["
+ + "lastParallelism="
+ + lastParallelism
+ + ", nextFreeTransactionalId="
+ + nextFreeTransactionalId
+ + ']';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ NextTransactionalIdHint that = (NextTransactionalIdHint) o;
+
+ if (lastParallelism != that.lastParallelism) {
+ return false;
+ }
+ return nextFreeTransactionalId == that.nextFreeTransactionalId;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = lastParallelism;
+ result =
+ 31 * result
+ + (int) (nextFreeTransactionalId ^ (nextFreeTransactionalId >>> 32));
+ return result;
+ }
+ }
+
+ /**
+ * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link
+ * FlinkKafkaProducer2.NextTransactionalIdHint}.
+ */
+ @VisibleForTesting
+ @Internal
+ public static class NextTransactionalIdHintSerializer
+ extends TypeSerializerSingleton<NextTransactionalIdHint> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public NextTransactionalIdHint createInstance() {
+ return new NextTransactionalIdHint();
+ }
+
+ @Override
+ public NextTransactionalIdHint copy(NextTransactionalIdHint from) {
+ return from;
+ }
+
+ @Override
+ public NextTransactionalIdHint copy(
+ NextTransactionalIdHint from, NextTransactionalIdHint reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return Long.BYTES + Integer.BYTES;
+ }
+
+ @Override
+ public void serialize(NextTransactionalIdHint record, DataOutputView target)
+ throws IOException {
+ target.writeLong(record.nextFreeTransactionalId);
+ target.writeInt(record.lastParallelism);
+ }
+
+ @Override
+ public NextTransactionalIdHint deserialize(DataInputView source) throws IOException {
+ long nextFreeTransactionalId = source.readLong();
+ int lastParallelism = source.readInt();
+ return new NextTransactionalIdHint(lastParallelism, nextFreeTransactionalId);
+ }
+
+ @Override
+ public NextTransactionalIdHint deserialize(
+ NextTransactionalIdHint reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.writeLong(source.readLong());
+ target.writeInt(source.readInt());
+ }
+
+ @Override
+ public TypeSerializerSnapshot<NextTransactionalIdHint> snapshotConfiguration() {
+ return new NextTransactionalIdHintSerializerSnapshot();
+ }
+
+ /** Serializer configuration snapshot for compatibility and format evolution. */
+ @SuppressWarnings("WeakerAccess")
+ public static final class NextTransactionalIdHintSerializerSnapshot
+ extends SimpleTypeSerializerSnapshot<NextTransactionalIdHint> {
+
+ public NextTransactionalIdHintSerializerSnapshot() {
+ super(NextTransactionalIdHintSerializer::new);
+ }
+ }
+ }
+}
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/RuntimeContextAware.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/RuntimeContextAware.java
new file mode 100644
index 0000000..ed35475
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/RuntimeContextAware.java
@@ -0,0 +1,7 @@
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
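+/**
+ * Callback interface for deserialization schemas that need access to the Flink {@link
+ * RuntimeContext}, for example to register custom metrics. {@code KafkaFetcher2} invokes {@link
+ * #runtimeContextAware(RuntimeContext)} on the configured deserializer if it implements this
+ * interface.
+ *
+ * <p>Sketch of an implementing class (the class and event type names are placeholders):
+ *
+ * <pre>{@code
+ * public class MetricsAwareDeserializationSchema
+ *         implements KafkaDeserializationSchema<MyEvent>, RuntimeContextAware {
+ *
+ *     private transient RuntimeContext runtimeContext;
+ *
+ *     public void runtimeContextAware(RuntimeContext runtimeContext) {
+ *         this.runtimeContext = runtimeContext;
+ *     }
+ *
+ *     // deserialize() / isEndOfStream() / getProducedType() as usual ...
+ * }
+ * }</pre>
+ */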
+public interface RuntimeContextAware {
+ void runtimeContextAware(RuntimeContext runtimeContext);
+}
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java
new file mode 100644
index 0000000..4000079
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java
@@ -0,0 +1,38 @@
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
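+/**
+ * Thin extension of {@link KafkaFetcher} whose only addition is handing the {@link
+ * StreamingRuntimeContext} to deserializers that implement {@link RuntimeContextAware}; all
+ * fetching behaviour is otherwise inherited unchanged.
+ */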
+public class KafkaFetcher2<T> extends KafkaFetcher<T> {
+ public KafkaFetcher2(
+ SourceFunction.SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+ SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+ StreamingRuntimeContext runtimeContext,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval,
+ ClassLoader userCodeClassLoader,
+ String taskNameWithSubtasks,
+ KafkaDeserializationSchema<T> deserializer,
+ Properties kafkaProperties,
+ long pollTimeout,
+ MetricGroup subtaskMetricGroup,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics) throws Exception {
+ super(sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, taskNameWithSubtasks, deserializer, kafkaProperties, pollTimeout, subtaskMetricGroup, consumerMetricGroup, useMetrics);
+        if (deserializer instanceof RuntimeContextAware) {
+ ((RuntimeContextAware) deserializer).runtimeContextAware(runtimeContext);
+ }
+ }
+}
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
index f6201b7..5c51a3f 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
@@ -37,11 +37,11 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
try {
map = JSON.parseObject(message);
} catch (Exception e) {
- LOG.error(String.format("json解析失败for:%s", message), e);
if(ignoreParseErrors){
+                LOG.error(String.format("Failed to parse JSON message: %s", message), e);
return null;
}else{
- throw new IOException(e);
+                throw new IOException(String.format("Failed to parse JSON message: %s", message), e);
}
}
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java
index ef529b0..fa7a486 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java
@@ -1,15 +1,15 @@
-package com.geedgenetworks.formats.json;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-public class JsonFormatOptions {
- public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
- ConfigOptions.key("ignore.parse.errors")
- .booleanType()
- .defaultValue(true)
- .withDescription(
- "Optional flag to skip fields and rows with parse errors instead of failing;\n"
- + "fields are set to null in case of errors, true by default.");
-
-}
+package com.geedgenetworks.formats.json;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class JsonFormatOptions {
+ public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+ ConfigOptions.key("ignore.parse.errors")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                                        + "fields are set to null in case of errors, false by default.");
+
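+    // Lookup sketch ('formatConfig' is a placeholder for the format's ReadableConfig):
+    //
+    //   boolean ignoreParseErrors = formatConfig.get(JsonFormatOptions.IGNORE_PARSE_ERRORS);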
+}
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
index dbd3cbe..c599445 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
@@ -47,11 +47,11 @@ public class ProtobufEventDeserializationSchema implements DeserializationSchema
event.setExtractedFields(map);
return event;
} catch (Exception e) {
- LOG.error(String.format("proto解析失败for:%s", Base64.getEncoder().encodeToString(message)), e);
if(ignoreParseErrors){
+                LOG.error(String.format("Failed to parse protobuf message: %s", Base64.getEncoder().encodeToString(message)), e);
return null;
}else{
- throw new IOException(e);
+                throw new IOException(String.format("Failed to parse protobuf message: %s", Base64.getEncoder().encodeToString(message)), e);
}
}
}
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java
index 74c3e21..7f33dad 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java
@@ -32,8 +32,7 @@ public class ProtobufEventSerializationSchema implements SerializationSchema<Eve
try {
return serializer.serialize(map);
} catch (Exception e) {
- LOG.error(String.format("proto serialize failed for: %s", JSON.toJSONString(map)), e);
- return null;
+ throw new RuntimeException(String.format("proto serialize failed for: %s", JSON.toJSONString(map)), e);
}
}
}
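
The serializer no longer swallows failures by returning null (which the Kafka sink would otherwise receive as a null record value); it now rethrows with the offending record rendered as JSON. A generic sketch of that fail-fast contract, written as a wrapper for illustration only and not the project's actual class:

    import org.apache.flink.api.common.serialization.SerializationSchema;

    // Illustrative wrapper: any serialization error is rethrown with the offending
    // element attached instead of being logged and replaced by null.
    public class FailFastSerializationSchema<T> implements SerializationSchema<T> {
        private final SerializationSchema<T> delegate;

        public FailFastSerializationSchema(SerializationSchema<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public byte[] serialize(T element) {
            try {
                return delegate.serialize(element);
            } catch (Exception e) {
                throw new RuntimeException("serialize failed for: " + element, e);
            }
        }
    }
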
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java
index 9fadce3..b29091a 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java
@@ -1,35 +1,35 @@
-package com.geedgenetworks.formats.protobuf;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-public class ProtobufOptions {
- public static final ConfigOption<String> MESSAGE_NAME =
- ConfigOptions.key("message.name")
- .stringType()
- .noDefaultValue()
- .withDescription( "The protobuf MessageName to look for in descriptor file.");
-
- public static final ConfigOption<String> DESC_FILE_PATH =
- ConfigOptions.key("descriptor.file.path")
- .stringType()
- .noDefaultValue()
- .withDescription( "The Protobuf descriptor file.");
-
- public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
- ConfigOptions.key("ignore.parse.errors")
- .booleanType()
- .defaultValue(true)
- .withDescription(
- "Optional flag to skip fields and rows with parse errors instead of failing;\n"
- + "fields are set to null in case of errors, true by default.");
-
- public static final ConfigOption<Boolean> EMIT_DEFAULT_VALUES =
- ConfigOptions.key("emit.default.values")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Optional flag for whether to render fields with zero values when deserializing Protobuf to struct.\n"
- + "When a field is empty in the serialized Protobuf, this library will deserialize them as null by default. However, this flag can control whether to render the type-specific zero value.\n"
- + "This operates similarly to includingDefaultValues in protobuf-java-util's JsonFormat, or emitDefaults in golang/protobuf's jsonpb.");
-}
+package com.geedgenetworks.formats.protobuf;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class ProtobufOptions {
+ public static final ConfigOption<String> MESSAGE_NAME =
+ ConfigOptions.key("message.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription( "The protobuf MessageName to look for in descriptor file.");
+
+ public static final ConfigOption<String> DESC_FILE_PATH =
+ ConfigOptions.key("descriptor.file.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription( "The Protobuf descriptor file.");
+
+ public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+ ConfigOptions.key("ignore.parse.errors")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to skip fields and rows with parse errors instead of failing;\n"
+ + "fields are set to null in case of errors, true by default.");
+
+ public static final ConfigOption<Boolean> EMIT_DEFAULT_VALUES =
+ ConfigOptions.key("emit.default.values")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag for whether to render fields with zero values when deserializing Protobuf to struct.\n"
+ + "When a field is empty in the serialized Protobuf, this library will deserialize them as null by default. However, this flag can control whether to render the type-specific zero value.\n"
+ + "This operates similarly to includingDefaultValues in protobuf-java-util's JsonFormat, or emitDefaults in golang/protobuf's jsonpb.");
+}
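
For reference, a hypothetical wiring of the protobuf format options after this change; the message name and descriptor path below are placeholders, not taken from the patch, and ignore.parse.errors now defaults to false here as well.

    import com.geedgenetworks.formats.protobuf.ProtobufOptions;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical usage sketch; values are placeholders.
    public class ProtobufOptionsUsage {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(ProtobufOptions.MESSAGE_NAME, "com.example.MyMessage");
            conf.set(ProtobufOptions.DESC_FILE_PATH, "/path/to/schema.desc");
            conf.set(ProtobufOptions.IGNORE_PARSE_ERRORS, true);   // default is now false (strict)
            conf.set(ProtobufOptions.EMIT_DEFAULT_VALUES, false);  // unchanged default
            System.out.println(conf);
        }
    }
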