| author | lifengchao <[email protected]> | 2024-02-22 13:50:32 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-02-22 13:50:32 +0800 |
| commit | 64da48538f993bc652404c84b56dd2c71bc25ff5 (patch) | |
| tree | a8c1dded67fd33f4f779eaf39ae916580817c9d0 | |
| parent | 68991747b9dd5775ba58439b51331a686c650294 (diff) | |
[feature][connector-kafka][connector-clickhouse] GAL-486 kafka clickhouse source sink emit Metrics
14 files changed, 2421 insertions, 399 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index e3e9e76..ce58cda 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHouseBatchInsertConnec
import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHousePreparedBatchInsertStatement;
import com.geedgenetworks.connectors.clickhouse.jdbc.DataTypeStringV2;
import com.geedgenetworks.connectors.clickhouse.util.ClickHouseUtils;
+import com.geedgenetworks.core.metrics.InternalMetrics;
import com.github.housepower.data.*;
import com.github.housepower.data.type.*;
import com.github.housepower.data.type.complex.*;
@@ -55,10 +56,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
private int batchSize;
private long batchIntervalMs;
private transient volatile boolean closed;
- private transient Counter numRecordsOut;
- private transient Counter numRecordsOutFailed;
- private transient Meter numRecordsOutPerSecond;
- private transient Meter numRecordsOutFailedPerSecond;
+ private transient InternalMetrics internalMetrics;
private transient Thread outThread;
private transient ReentrantLock lock;
private transient Block batch;
@@ -132,10 +130,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
private void initMetric() throws Exception {
- numRecordsOut = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOut");
- numRecordsOutFailed = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOutFailed");
- numRecordsOutPerSecond = getRuntimeContext().getMetricGroup().meter("ckNumRecordsOutPerSecond", new MeterView(numRecordsOut));
- numRecordsOutFailedPerSecond = getRuntimeContext().getMetricGroup().meter("ckNumRecordsOutFailedPerSecond", new MeterView(numRecordsOutFailed));
+ internalMetrics = new InternalMetrics(getRuntimeContext());
}
private void initClickHouseParams() throws Exception {
@@ -207,6 +202,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
@Override
public final void invoke(T value, Context context) throws Exception {
checkFlushException();
+ internalMetrics.incrementInEvents();
+
lock.lock();
try {
if (addBatch(batch, value)) {
@@ -217,7 +214,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
flush();
}
} catch (Exception e) {
- numRecordsOutFailed.inc();
+ internalMetrics.incrementErrorEvents();
LOG.error("转换ck类型异常", e);
} finally{
lock.unlock();
@@ -253,8 +250,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
private void doFlush(Block block) throws Exception {
long start = System.currentTimeMillis();
int rowCnt = block.rowCnt();
- ClickHouseUtils.showBlockColumnsByteSize(block); // for testing only
- LOG.warn("flush " + rowCnt + " start:" + new Timestamp(start) + "," + (start - lastFlushTs));
+ long blockByteSize = ClickHouseUtils.getBlockColumnsByteSize(block);
+ LOG.warn("flush " + rowCnt + ", totalSize:" + (blockByteSize >>> 20) + "M" + ", start:" + new Timestamp(start) + "," + (start - lastFlushTs));
lastFlushTs = System.currentTimeMillis();
int retryCount = 0;
@@ -275,14 +272,15 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock());
stmt.executeBatch();
- numRecordsOut.inc(rowCnt);
+ internalMetrics.incrementOutEvents(rowCnt);
+ internalMetrics.incrementOutBytes(blockByteSize);
LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
return;
} catch (Exception e) {
LOG.error("ClickHouseBatchInsertFail url:" + url, e);
if (retryCount >= 3) {
- numRecordsOutFailed.inc(rowCnt);
+ internalMetrics.incrementErrorEvents(rowCnt);
LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
// throw e;
return;
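Note (editorial): InternalMetrics comes from groot-core and its source is not included in this excerpt. The following is a minimal sketch of what it plausibly looks like, inferred only from the calls made in this commit and assuming it wraps the same Flink Counter/MeterView primitives as the per-sink metrics it replaces; the metric names are hypothetical.

package com.geedgenetworks.core.metrics;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;

// Sketch only: constructor and increment signatures are taken from the call sites in this diff;
// everything else (field layout, metric names, per-second meters) is assumed.
public class InternalMetrics {
    private final Counter inEvents;
    private final Counter outEvents;
    private final Counter errorEvents;
    private final Counter inBytes;
    private final Counter outBytes;

    public InternalMetrics(RuntimeContext runtimeContext) {
        MetricGroup group = runtimeContext.getMetricGroup();
        inEvents = group.counter("numInEvents");        // hypothetical metric names
        outEvents = group.counter("numOutEvents");
        errorEvents = group.counter("numErrorEvents");
        inBytes = group.counter("numInBytes");
        outBytes = group.counter("numOutBytes");
        // Per-second rates derived from the counters, mirroring the MeterView usage removed above.
        group.meter("numOutEventsPerSecond", new MeterView(outEvents));
        group.meter("numErrorEventsPerSecond", new MeterView(errorEvents));
    }

    public void incrementInEvents() { inEvents.inc(); }
    public void incrementOutEvents() { outEvents.inc(); }
    public void incrementOutEvents(long n) { outEvents.inc(n); }
    public void incrementErrorEvents() { errorEvents.inc(); }
    public void incrementErrorEvents(long n) { errorEvents.inc(n); }
    public void incrementInBytes(long n) { inBytes.inc(n); }
    public void incrementOutBytes(long n) { outBytes.inc(n); }
}

Centralizing the counters this way gives the Kafka source, the ClickHouse sink, and the Kafka producer below a uniform in/out/error/bytes vocabulary instead of connector-specific names such as ckNumRecordsOut.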
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
index 0e1a576..4a35878 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
@@ -1,325 +1,325 @@
-package com.geedgenetworks.connectors.clickhouse.util;
-
-import com.github.housepower.buffer.ByteArrayWriter;
-import com.github.housepower.data.*;
-import com.github.housepower.data.type.*;
-import com.github.housepower.data.type.complex.DataTypeArray;
-import com.github.housepower.data.type.complex.DataTypeDecimal;
-import com.github.housepower.data.type.complex.DataTypeFixedString;
-import com.github.housepower.data.type.complex.DataTypeString;
-import com.github.housepower.jdbc.ClickHouseArray;
-import com.github.housepower.jdbc.ClickHouseConnection;
-import com.github.housepower.misc.BytesCharSeq;
-import com.github.housepower.misc.DateTimeUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.sql.*;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-public class ClickHouseUtils {
- static final Logger LOG = LoggerFactory.getLogger(ClickHouseUtils.class);
- static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\(");
- static final byte[] EMPTY_BYTES = new byte[0];
- public static final BytesCharSeq EMPTY_BYTES_CHAR_SEQ = new BytesCharSeq(EMPTY_BYTES);
- private static Field blockColumnsField;
- private static Field blockSettingsField;
- private static Field blockNameAndPositionsField;
- private static Field blockPlaceholderIndexesField;
- private static Field blockRowCntField;
-
- static {
- try {
- blockColumnsField = Block.class.getDeclaredField("columns");
- blockColumnsField.setAccessible(true);
- blockSettingsField = Block.class.getDeclaredField("settings");
- blockSettingsField.setAccessible(true);
- blockNameAndPositionsField = Block.class.getDeclaredField("nameAndPositions");
- blockNameAndPositionsField.setAccessible(true);
- blockPlaceholderIndexesField = Block.class.getDeclaredField("placeholderIndexes");
- blockPlaceholderIndexesField.setAccessible(true);
- blockRowCntField = Block.class.getDeclaredField("rowCnt");
- blockRowCntField.setAccessible(true);
- } catch (NoSuchFieldException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Convert host(s) to JDBC URL format: 192.168.40.222:9001,192.168.40.223:9001 =>
- * jdbc:clickhouse://192.168.40.222:9001,jdbc:clickhouse://192.168.40.223:9001
- */
- public static String[] buildUrlsFromHost(String host) {
- String[] hosts = host.split(",");
- List<String> urls = new ArrayList<>(hosts.length);
- for (int i = 0; i < hosts.length; i++) {
- String[] ipPort = hosts[i].trim().split(":");
- String ip = ipPort[0].trim();
- int port = Integer.parseInt(ipPort[1].trim());
- urls.add("jdbc:clickhouse://" + ip + ":" + port);
- }
- return urls.toArray(new String[urls.size()]);
- }
-
- public static String genePreparedInsertSql(String table, String[] columnNames) {
- StringBuilder sb = new StringBuilder("insert into ");
- sb.append(table).append("(");
- //sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "`" + x +"`").collect(Collectors.toList())));
- sb.append(String.join(",", columnNames));
- sb.append(")");
- sb.append(" values(");
- sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "?").collect(Collectors.toList())));
- sb.append(")");
- return sb.toString();
- }
-
- public static Tuple2<String[], Object[]> getInsertColumnsAndDefaultValuesForTable(
- String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
- Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
-
- int retryCount = 0;
- while (true) {
- retryCount++;
- ClickHouseConnection connection = null;
- Statement stmt = null;
- ResultSet rst = null;
- try {
- connection = (ClickHouseConnection) DriverManager.getConnection(urls[urlIndex], connInfo);
- stmt = connection.createStatement();
- rst = stmt.executeQuery("desc " + table);
-
- List<String> columnNames = new ArrayList<>();
- List<Object> columnDefaultValues = new ArrayList<>();
- while (rst.next()) {
- String name = rst.getString("name");
- String typeStr = rst.getString("type");
- String defaultTypeStr = rst.getString("default_type");
- String defaultExpression = rst.getString("default_expression");
- if ("LowCardinality(String)".equals(typeStr)) {
- typeStr = "String";
- }
- IDataType<?, ?> type = DataTypeFactory.get(typeStr, connection.serverContext());
- if ("MATERIALIZED".equals(defaultTypeStr)) {
- continue;
- }
- Object defaultValue = parseDefaultValue(type, defaultExpression); // only parses numbers and strings
- if (defaultValue == null && !type.nullable()) {
- if (type instanceof DataTypeArray) {
- defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]);
- } else {
- defaultValue = type.defaultValue();
- }
- }
- columnNames.add(name);
- columnDefaultValues.add(defaultValue);
- }
-
- return new Tuple2<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()]));
- } catch (SQLException e) {
- LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
- if (retryCount >= 3) {
- throw e;
- }
- urlIndex++;
- if (urlIndex == urls.length) {
- urlIndex = 0;
- }
- } finally {
- if (rst != null) {
- rst.close();
- }
- if (stmt != null) {
- stmt.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- }
- }
-
- public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo) throws Exception {
- Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
-
- int retryCount = 0;
- while (true) {
- retryCount++;
- Connection connection = null;
- try {
- connection = DriverManager.getConnection(urls[urlIndex], connInfo);
- ZoneId tz = DateTimeUtil.chooseTimeZone(((ClickHouseConnection) connection).serverContext());
- return tz;
- } catch (SQLException e) {
- LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
- if (retryCount >= 3) {
- throw e;
- }
- urlIndex++;
- if (urlIndex == urls.length) {
- urlIndex = 0;
- }
- } finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
- }
-
- public static Block getInsertBlockForTable(String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
- String sql = "insert into " + table + " values";
- return getInsertBlockForSql(urls, urlIndex, connInfo, sql);
- }
-
- public static Block getInsertBlockForSql(String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception {
- Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
-
- String insertQuery;
- if (sql.trim().toLowerCase().endsWith("values")) {
- insertQuery = sql;
- } else {
- Matcher matcher = VALUES_REGEX.matcher(sql);
- Preconditions.checkArgument(matcher.find(), "insert sql syntax error:%s", sql);
- insertQuery = sql.substring(0, matcher.end() - 1);
- }
- LOG.warn("getInsertBlock insertQuery:{}.", insertQuery);
-
- int retryCount = 0;
- while (true) {
- retryCount++;
- Connection connection = null;
- try {
- connection = DriverManager.getConnection(urls[urlIndex], connInfo);
- Block block = ((ClickHouseConnection) connection).getSampleBlock(insertQuery);
- return block;
- } catch (SQLException e) {
- LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e);
- if (retryCount >= 3) {
- throw e;
- }
- urlIndex++;
- if (urlIndex == urls.length) {
- urlIndex = 0;
- }
- } finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
- }
-
- public static Block newInsertBlockFrom(Block block) throws Exception {
- int rowCnt = 0; // (int) blockRowCntField.get(block);
- int columnCnt = block.columnCnt();
- BlockSettings settings = (BlockSettings) blockSettingsField.get(block);
- IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(block);
- IColumn[] columns = new IColumn[columnCnt];
-
- for (int i = 0; i < columnCnt; i++) {
- String name = srcColumns[i].name();
- IDataType<?, ?> dataType = srcColumns[i].type();
- columns[i] = ColumnFactory.createColumn(name, dataType, null); // the values argument is only used for ResultSet reads
- }
-
- Block newBlock = new Block(rowCnt, columns, settings);
- return newBlock;
- }
-
- public static void copyInsertBlockColumns(Block src, Block desc) throws Exception {
- desc.cleanup();
-
- IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(src);
- IColumn[] descColumns = (IColumn[]) blockColumnsField.get(desc);
- for (int i = 0; i < srcColumns.length; i++) {
- descColumns[i] = srcColumns[i];
- }
-
- blockRowCntField.set(desc, blockRowCntField.get(src));
- }
-
- public static void resetInsertBlockColumns(Block block) throws Exception {
- block.cleanup();
- blockRowCntField.set(block, 0);
-
- IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
- for (int i = 0; i < columns.length; i++) {
- String name = columns[i].name();
- IDataType<?, ?> dataType = columns[i].type();
- columns[i] = ColumnFactory.createColumn(name, dataType, null); // the values argument is only used for ResultSet reads
- }
-
- block.initWriteBuffer();
- }
-
- private static Object parseDefaultValue(IDataType<?, ?> type, String defaultExpression) {
- Object defaultValue = null;
- if (!StringUtils.isBlank(defaultExpression)) {
- if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
- defaultValue = defaultExpression;
- } else if (type instanceof DataTypeInt8) {
- defaultValue = (byte) Integer.parseInt(defaultExpression);
- } else if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) {
- defaultValue = (short) Integer.parseInt(defaultExpression);
- } else if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) {
- defaultValue = Integer.parseInt(defaultExpression);
- } else if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) {
- defaultValue = Long.parseLong(defaultExpression);
- } else if (type instanceof DataTypeUInt64) {
- defaultValue = BigInteger.valueOf(Long.parseLong(defaultExpression));
- } else if (type instanceof DataTypeFloat32) {
- defaultValue = Float.parseFloat(defaultExpression);
- } else if (type instanceof DataTypeFloat64) {
- defaultValue = Double.parseDouble(defaultExpression);
- } else if (type instanceof DataTypeDecimal) {
- defaultValue = new BigDecimal(Double.parseDouble(defaultExpression));
- }
- }
-
- return defaultValue;
- }
-
- // for testing only
- public static void showBlockColumnsByteSize(Block block) throws Exception {
- IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
- Field field = AbstractColumn.class.getDeclaredField("buffer");
- field.setAccessible(true);
- Field columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter");
- columnWriterField.setAccessible(true);
- Field byteBufferListField = ByteArrayWriter.class.getDeclaredField("byteBufferList");
- byteBufferListField.setAccessible(true);
- int size = 0;
- int totalSize = 0;
- double unitM = 1 << 20;
- LOG.warn("rowCount:" + block.rowCnt());
- for (int i = 0; i < columns.length; i++) {
- Object columnWriter = columnWriterField.get(field.get(columns[i]));
- List<ByteBuffer> byteBufferList =
- (List<ByteBuffer>) byteBufferListField.get(columnWriter);
- size = 0;
- for (ByteBuffer byteBuffer : byteBufferList) {
- size += byteBuffer.position();
- }
- totalSize += size;
- if (size > unitM) {
- // LOG.warn(columns[i].name() + "buf cnt:" + byteBufferList.size() + ", size:" +
- // size/unitM + "M");
- }
- }
- LOG.warn("totalSize:" + totalSize / unitM + "M");
- }
-}
+package com.geedgenetworks.connectors.clickhouse.util;
+
+import com.github.housepower.buffer.ByteArrayWriter;
+import com.github.housepower.data.*;
+import com.github.housepower.data.type.*;
+import com.github.housepower.data.type.complex.DataTypeArray;
+import com.github.housepower.data.type.complex.DataTypeDecimal;
+import com.github.housepower.data.type.complex.DataTypeFixedString;
+import com.github.housepower.data.type.complex.DataTypeString;
+import com.github.housepower.jdbc.ClickHouseArray;
+import com.github.housepower.jdbc.ClickHouseConnection;
+import com.github.housepower.misc.BytesCharSeq;
+import com.github.housepower.misc.DateTimeUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.*;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class ClickHouseUtils {
+ static final Logger LOG = LoggerFactory.getLogger(ClickHouseUtils.class);
+ static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\(");
+ static final byte[] EMPTY_BYTES = new byte[0];
+ public static final BytesCharSeq EMPTY_BYTES_CHAR_SEQ = new BytesCharSeq(EMPTY_BYTES);
+ private static Field blockColumnsField;
+ private static Field blockSettingsField;
+ private static Field blockNameAndPositionsField;
+ private static Field blockPlaceholderIndexesField;
+ private static Field blockRowCntField;
+
+ static {
+ try {
+ blockColumnsField = Block.class.getDeclaredField("columns");
+ blockColumnsField.setAccessible(true);
+ blockSettingsField = Block.class.getDeclaredField("settings");
+ blockSettingsField.setAccessible(true);
+ blockNameAndPositionsField = Block.class.getDeclaredField("nameAndPositions");
+ blockNameAndPositionsField.setAccessible(true);
+ blockPlaceholderIndexesField = Block.class.getDeclaredField("placeholderIndexes");
+ blockPlaceholderIndexesField.setAccessible(true);
+ blockRowCntField = Block.class.getDeclaredField("rowCnt");
+ blockRowCntField.setAccessible(true);
+ } catch (NoSuchFieldException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Convert host(s) to JDBC URL format: 192.168.40.222:9001,192.168.40.223:9001 =>
+ * jdbc:clickhouse://192.168.40.222:9001,jdbc:clickhouse://192.168.40.223:9001
+ */
+ public static String[] buildUrlsFromHost(String host) {
+ String[] hosts = host.split(",");
+ List<String> urls = new ArrayList<>(hosts.length);
+ for (int i = 0; i < hosts.length; i++) {
+ String[] ipPort = hosts[i].trim().split(":");
+ String ip = ipPort[0].trim();
+ int port = Integer.parseInt(ipPort[1].trim());
+ urls.add("jdbc:clickhouse://" + ip + ":" + port);
+ }
+ return urls.toArray(new String[urls.size()]);
+ }
+
+ public static String genePreparedInsertSql(String table, String[] columnNames) {
+ StringBuilder sb = new StringBuilder("insert into ");
+ sb.append(table).append("(");
+ //sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "`" + x +"`").collect(Collectors.toList())));
+ sb.append(String.join(",", columnNames));
+ sb.append(")");
+ sb.append(" values(");
+ sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "?").collect(Collectors.toList())));
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public static Tuple2<String[], Object[]> getInsertColumnsAndDefaultValuesForTable(
+ String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
+ Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
+
+ int retryCount = 0;
+ while (true) {
+ retryCount++;
+ ClickHouseConnection connection = null;
+ Statement stmt = null;
+ ResultSet rst = null;
+ try {
+ connection = (ClickHouseConnection) DriverManager.getConnection(urls[urlIndex], connInfo);
+ stmt = connection.createStatement();
+ rst = stmt.executeQuery("desc " + table);
+
+ List<String> columnNames = new ArrayList<>();
+ List<Object> columnDefaultValues = new ArrayList<>();
+ while (rst.next()) {
+ String name = rst.getString("name");
+ String typeStr = rst.getString("type");
+ String defaultTypeStr = rst.getString("default_type");
+ String defaultExpression = rst.getString("default_expression");
+ if ("LowCardinality(String)".equals(typeStr)) {
+ typeStr = "String";
+ }
+ IDataType<?, ?> type = DataTypeFactory.get(typeStr, connection.serverContext());
+ if ("MATERIALIZED".equals(defaultTypeStr)) {
+ continue;
+ }
+ Object defaultValue = parseDefaultValue(type, defaultExpression); // only parses numbers and strings
+ if (defaultValue == null && !type.nullable()) {
+ if (type instanceof DataTypeArray) {
+ defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]);
+ } else {
+ defaultValue = type.defaultValue();
+ }
+ }
+ columnNames.add(name);
+ columnDefaultValues.add(defaultValue);
+ }
+
+ return new Tuple2<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()]));
+ } catch (SQLException e) {
+ LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
+ if (retryCount >= 3) {
+ throw e;
+ }
+ urlIndex++;
+ if (urlIndex == urls.length) {
+ urlIndex = 0;
+ }
+ } finally {
+ if (rst != null) {
+ rst.close();
+ }
+ if (stmt != null) {
+ stmt.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+ }
+
+ public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo) throws Exception {
+ Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
+
+ int retryCount = 0;
+ while (true) {
+ retryCount++;
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(urls[urlIndex], connInfo);
+ ZoneId tz = DateTimeUtil.chooseTimeZone(((ClickHouseConnection) connection).serverContext());
+ return tz;
+ } catch (SQLException e) {
+ LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
+ if (retryCount >= 3) {
+ throw e;
+ }
+ urlIndex++;
+ if (urlIndex == urls.length) {
+ urlIndex = 0;
+ }
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+ }
+
+ public static Block getInsertBlockForTable(String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
+ String sql = "insert into " + table + " values";
+ return getInsertBlockForSql(urls, urlIndex, connInfo, sql);
+ }
+
+ public static Block getInsertBlockForSql(String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception {
+ Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
+
+ String insertQuery;
+ if (sql.trim().toLowerCase().endsWith("values")) {
+ insertQuery = sql;
+ } else {
+ Matcher matcher = VALUES_REGEX.matcher(sql);
+ Preconditions.checkArgument(matcher.find(), "insert sql syntax error:%s", sql);
+ insertQuery = sql.substring(0, matcher.end() - 1);
+ }
+ LOG.warn("getInsertBlock insertQuery:{}.", insertQuery);
+
+ int retryCount = 0;
+ while (true) {
+ retryCount++;
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(urls[urlIndex], connInfo);
+ Block block = ((ClickHouseConnection) connection).getSampleBlock(insertQuery);
+ return block;
+ } catch (SQLException e) {
+ LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e);
+ if (retryCount >= 3) {
+ throw e;
+ }
+ urlIndex++;
+ if (urlIndex == urls.length) {
+ urlIndex = 0;
+ }
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+ }
+
+ public static Block newInsertBlockFrom(Block block) throws Exception {
+ int rowCnt = 0; // (int) blockRowCntField.get(block);
+ int columnCnt = block.columnCnt();
+ BlockSettings settings = (BlockSettings) blockSettingsField.get(block);
+ IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(block);
+ IColumn[] columns = new IColumn[columnCnt];
+
+ for (int i = 0; i < columnCnt; i++) {
+ String name = srcColumns[i].name();
+ IDataType<?, ?> dataType = srcColumns[i].type();
+ columns[i] = ColumnFactory.createColumn(name, dataType, null); // the values argument is only used for ResultSet reads
+ }
+
+ Block newBlock = new Block(rowCnt, columns, settings);
+ return newBlock;
+ }
+
+ public static void copyInsertBlockColumns(Block src, Block desc) throws Exception {
+ desc.cleanup();
+
+ IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(src);
+ IColumn[] descColumns = (IColumn[]) blockColumnsField.get(desc);
+ for (int i = 0; i < srcColumns.length; i++) {
+ descColumns[i] = srcColumns[i];
+ }
+
+ blockRowCntField.set(desc, blockRowCntField.get(src));
+ }
+
+ public static void resetInsertBlockColumns(Block block) throws Exception {
+ block.cleanup();
+ blockRowCntField.set(block, 0);
+
+ IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
+ for (int i = 0; i < columns.length; i++) {
+ String name = columns[i].name();
+ IDataType<?, ?> dataType = columns[i].type();
+ columns[i] = ColumnFactory.createColumn(name, dataType, null); // the values argument is only used for ResultSet reads
+ }
+
+ block.initWriteBuffer();
+ }
+
+ private static Object parseDefaultValue(IDataType<?, ?> type, String defaultExpression) {
+ Object defaultValue = null;
+ if (!StringUtils.isBlank(defaultExpression)) {
+ if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
+ defaultValue = defaultExpression;
+ } else if (type instanceof DataTypeInt8) {
+ defaultValue = (byte) Integer.parseInt(defaultExpression);
+ } else if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) {
+ defaultValue = (short) Integer.parseInt(defaultExpression);
+ } else if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) {
+ defaultValue = Integer.parseInt(defaultExpression);
+ } else if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) {
+ defaultValue = Long.parseLong(defaultExpression);
+ } else if (type instanceof DataTypeUInt64) {
+ defaultValue = BigInteger.valueOf(Long.parseLong(defaultExpression));
+ } else if (type instanceof DataTypeFloat32) {
+ defaultValue = Float.parseFloat(defaultExpression);
+ } else if (type instanceof DataTypeFloat64) {
+ defaultValue = Double.parseDouble(defaultExpression);
+ } else if (type instanceof DataTypeDecimal) {
+ defaultValue = new BigDecimal(Double.parseDouble(defaultExpression));
+ }
+ }
+
+ return defaultValue;
+ }
+
+ public static long getBlockColumnsByteSize(Block block) throws Exception {
+ IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
+ Field field = AbstractColumn.class.getDeclaredField("buffer");
+ field.setAccessible(true);
+ Field columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter");
+ columnWriterField.setAccessible(true);
+ Field byteBufferListField = ByteArrayWriter.class.getDeclaredField("byteBufferList");
+ byteBufferListField.setAccessible(true);
+ int size = 0;
+ int totalSize = 0;
+
+ for (int i = 0; i < columns.length; i++) {
+ Object columnWriter = columnWriterField.get(field.get(columns[i]));
+ List<ByteBuffer> byteBufferList =
+ (List<ByteBuffer>) byteBufferListField.get(columnWriter);
+ size = 0;
+ for (ByteBuffer byteBuffer : byteBufferList) {
+ size += byteBuffer.position();
+ }
+ totalSize += size;
+ /*if (size > 1000000) {
+ LOG.warn(columns[i].name() + "buf cnt:" + byteBufferList.size() + ", size:" + size/1000000.0 + "M");
+ }*/
+ }
+
+ return totalSize;
+ }
+
+
+}
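For orientation, a hedged usage sketch of the connection-free helpers above; the table and column names are made up, and the expected values follow directly from the code and the Javadoc rather than from an actual run.

// Hypothetical usage of ClickHouseUtils (table/column names are illustrative only).
String[] urls = ClickHouseUtils.buildUrlsFromHost("192.168.40.222:9001,192.168.40.223:9001");
// urls => ["jdbc:clickhouse://192.168.40.222:9001", "jdbc:clickhouse://192.168.40.223:9001"]

String sql = ClickHouseUtils.genePreparedInsertSql("metrics_table", new String[]{"ts", "name", "value"});
// sql => "insert into metrics_table(ts,name,value) values(?,?,?)"

// getBlockColumnsByteSize(block) sums the positions of every column's buffered ByteBuffers;
// the sink above logs the value shifted by 20 bits (MiB) and feeds the raw byte count to incrementOutBytes().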
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
index ad9b6aa..aae6678 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
@@ -1,17 +1,21 @@
package com.geedgenetworks.connectors.kafka;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class EventKafkaDeserializationSchema implements KafkaDeserializationSchema<Event>{
+public class EventKafkaDeserializationSchema implements KafkaDeserializationSchema<Event>, RuntimeContextAware {
private static final Logger LOG = LoggerFactory.getLogger(EventKafkaDeserializationSchema.class);
private final DeserializationSchema<Event> valueDeserialization;
+ private transient InternalMetrics internalMetrics;
public EventKafkaDeserializationSchema(DeserializationSchema<Event> valueDeserialization) {
this.valueDeserialization = valueDeserialization;
@@ -35,19 +39,30 @@ public class EventKafkaDeserializationSchema implements KafkaDeserializationSche
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out) throws Exception {
+ internalMetrics.incrementInEvents();
+ internalMetrics.incrementInBytes(record.value().length);
+
try {
Event event = valueDeserialization.deserialize(record.value());
if(event != null){
event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, record.timestamp());
out.collect(event);
+ internalMetrics.incrementOutEvents();
+ return;
}
}catch (Exception e) {
LOG.error("Deserialization failed", e);
}
+ internalMetrics.incrementErrorEvents();
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
+
+ @Override
+ public void runtimeContextAware(RuntimeContext runtimeContext) {
+ internalMetrics = new InternalMetrics(runtimeContext);
+ }
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index 2ac3223..d300650 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer2;
import java.util.Optional;
import java.util.Properties;
@@ -33,12 +33,13 @@ public class KafkaSinkProvider implements SinkProvider {
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
- FlinkKafkaProducer<Event> kafkaProducer = new FlinkKafkaProducer<>(
+ FlinkKafkaProducer2<Event> kafkaProducer = new FlinkKafkaProducer2<>(
topic,
valueSerialization,
properties,
Optional.empty()
);
+ kafkaProducer.setLogFailuresOnly(true);
return dataStream.addSink(kafkaProducer);
}
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
index ead1f3b..ce0ddf8 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
@@ -7,7 +7,7 @@ import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer2;
import java.util.List;
import java.util.Properties;
@@ -34,7 +34,7 @@ public class KafkaSourceProvider implements SourceProvider {
@Override
public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
- FlinkKafkaConsumer<Event> kafkaConsumer = new FlinkKafkaConsumer<>(
+ FlinkKafkaConsumer2<Event> kafkaConsumer = new FlinkKafkaConsumer2<>(
topics,
new EventKafkaDeserializationSchema(valueDeserialization),
properties
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java
new file mode 100644
index 0000000..d37121d
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer2.java
@@ -0,0 +1,55 @@
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher2;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@PublicEvolving
+public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> {
+
+ public FlinkKafkaConsumer2(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
+ super(topics, deserializer, props);
+ }
+
+ @Override
+ protected AbstractFetcher<T, ?> createFetcher(
+ SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+ SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+ StreamingRuntimeContext runtimeContext,
+ OffsetCommitMode offsetCommitMode,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics)
+ throws Exception {
+
+ // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
+ // this overwrites whatever setting the user configured in the properties
+ adjustAutoCommitConfig(properties, offsetCommitMode);
+
+ return new KafkaFetcher2<>(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarkStrategy,
+ runtimeContext,
+ runtimeContext.getProcessingTimeService(),
+ runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+ runtimeContext.getUserCodeClassLoader(),
+ runtimeContext.getTaskNameWithSubtasks(),
+ deserializer,
+ properties,
+ pollTimeout,
+ runtimeContext.getMetricGroup(),
+ consumerMetricGroup,
+ useMetrics);
+ }
+}
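FlinkKafkaConsumer2 exists only to substitute KafkaFetcher2 (one of the 14 changed files, not shown in this excerpt) for the stock fetcher, which is presumably what hands the source operator's RuntimeContext to deserializers that implement RuntimeContextAware, such as EventKafkaDeserializationSchema above. The interface itself is not shown either; judging from the single callback invoked in this diff, it is likely no more than the following sketch.

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.functions.RuntimeContext;

// Sketch only: the method name and parameter type are taken from EventKafkaDeserializationSchema
// above; the rest of the interface is assumed.
public interface RuntimeContextAware {

    // Called once with the source operator's RuntimeContext before records are deserialized,
    // so the schema can register metrics (e.g. new InternalMetrics(runtimeContext)).
    void runtimeContextAware(RuntimeContext runtimeContext);
}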
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java new file mode 100644 index 0000000..818df4e --- /dev/null +++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer2.java @@ -0,0 +1,1909 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import com.geedgenetworks.core.metrics.InternalMetrics; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kafka.internals.TransactionalIdsGenerator; +import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import 
org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TemporaryClassLoaderContext; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce data into a Kafka topic. By default producer will use {@link + * FlinkKafkaProducer2.Semantic#AT_LEAST_ONCE} semantic. Before using {@link + * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation. + */ +@PublicEvolving +public class FlinkKafkaProducer2<IN> + extends TwoPhaseCommitSinkFunction< + IN, + FlinkKafkaProducer2.KafkaTransactionState, + FlinkKafkaProducer2.KafkaTransactionContext> { + + /** + * Semantics that can be chosen. + * <li>{@link #EXACTLY_ONCE} + * <li>{@link #AT_LEAST_ONCE} + * <li>{@link #NONE} + */ + public enum Semantic { + + /** + * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction + * that will be committed to Kafka on a checkpoint. + * + * <p>In this mode {@link FlinkKafkaProducer2} sets up a pool of {@link + * FlinkKafkaInternalProducer}. Between each checkpoint a Kafka transaction is created, + * which is committed on {@link FlinkKafkaProducer2#notifyCheckpointComplete(long)}. If + * checkpoint complete notifications are running late, {@link FlinkKafkaProducer2} can run + * out of {@link FlinkKafkaInternalProducer}s in the pool. In that case any subsequent + * {@link FlinkKafkaProducer2#snapshotState(FunctionSnapshotContext)} requests will fail and + * {@link FlinkKafkaProducer2} will keep using the {@link FlinkKafkaInternalProducer} from + * the previous checkpoint. To decrease the chance of failing checkpoints there are four + * options: + * <li>decrease number of max concurrent checkpoints + * <li>make checkpoints more reliable (so that they complete faster) + * <li>increase the delay between checkpoints + * <li>increase the size of {@link FlinkKafkaInternalProducer}s pool + */ + EXACTLY_ONCE, + + /** + * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the + * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. + */ + AT_LEAST_ONCE, + + /** + * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or + * duplicated in case of failure. + */ + NONE + } + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer2.class); + + private static final long serialVersionUID = 1L; + + /** + * Number of characters to truncate the taskName to for the Kafka transactionalId. The maximum + * this can possibly be set to is 32,767 - (length of operatorUniqueId). 
+ */ + private static final short maxTaskNameSize = 1_000; + + /** + * This coefficient determines what is the safe scale down factor. + * + * <p>If the Flink application previously failed before first checkpoint completed or we are + * starting new batch of {@link FlinkKafkaProducer2} from scratch without clean shutdown of the + * previous one, {@link FlinkKafkaProducer2} doesn't know what was the set of previously used + * Kafka's transactionalId's. In that case, it will try to play safe and abort all of the + * possible transactionalIds from the range of: {@code [0, getNumberOfParallelSubtasks() * + * kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR) } + * + * <p>The range of available to use transactional ids is: {@code [0, + * getNumberOfParallelSubtasks() * kafkaProducersPoolSize) } + * + * <p>This means that if we decrease {@code getNumberOfParallelSubtasks()} by a factor larger + * than {@code SAFE_SCALE_DOWN_FACTOR} we can have a left some lingering transaction. + */ + public static final int SAFE_SCALE_DOWN_FACTOR = 5; + + /** + * Default number of KafkaProducers in the pool. See {@link + * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}. + */ + public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; + + /** Default value for kafka transaction timeout. */ + public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1); + + /** Configuration key for disabling the metrics reporting. */ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** + * Descriptor of the transactional IDs list. Note: This state is serialized by Kryo Serializer + * and it has compatibility problem that will be removed later. Please use + * NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2. + */ + @Deprecated + private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint> + NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = + new ListStateDescriptor<>( + "next-transactional-id-hint", + TypeInformation.of(NextTransactionalIdHint.class)); + + private static final ListStateDescriptor<FlinkKafkaProducer2.NextTransactionalIdHint> + NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2 = + new ListStateDescriptor<>( + "next-transactional-id-hint-v2", + new NextTransactionalIdHintSerializer()); + + /** State for nextTransactionalIdHint. */ + private transient ListState<FlinkKafkaProducer2.NextTransactionalIdHint> + nextTransactionalIdHintState; + + /** Generator for Transactional IDs. */ + private transient TransactionalIdsGenerator transactionalIdsGenerator; + + /** Hint for picking next transactional id. */ + private transient FlinkKafkaProducer2.NextTransactionalIdHint nextTransactionalIdHint; + + /** User defined properties for the Producer. */ + protected final Properties producerConfig; + + /** The name of the default topic this producer is writing data to. */ + protected final String defaultTopicId; + + /** + * (Serializable) SerializationSchema for turning objects used with Flink into. byte[] for + * Kafka. + */ + @Nullable private final KeyedSerializationSchema<IN> keyedSchema; + + /** + * (Serializable) serialization schema for serializing records to {@link ProducerRecord + * ProducerRecords}. + */ + @Nullable private final KafkaSerializationSchema<IN> kafkaSchema; + + /** User-provided partitioner for assigning an object to a Kafka partition for each topic. */ + @Nullable private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner; + + /** Partitions of each topic. 
*/ + protected final Map<String, int[]> topicPartitionsMap; + + /** + * Max number of producers in the pool. If all producers are in use, snapshoting state will + * throw an exception. + */ + private final int kafkaProducersPoolSize; + + /** Pool of available transactional ids. */ + private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>(); + + /** Flag controlling whether we are writing the Flink record's timestamp into Kafka. */ + protected boolean writeTimestampToKafka = false; + + /** Flag indicating whether to accept failures (and log them), or to fail on failures. */ + private boolean logFailuresOnly; + + /** Semantic chosen for this instance. */ + protected FlinkKafkaProducer2.Semantic semantic; + + // -------------------------------- Runtime fields ------------------------------------------ + + /** The callback than handles error propagation or logging callbacks. */ + @Nullable protected transient Callback callback; + + /** Errors encountered in the async producer are stored here. */ + @Nullable protected transient volatile Exception asyncException; + + /** Number of unacknowledged records. */ + protected final AtomicLong pendingRecords = new AtomicLong(); + + /** + * Cache of metrics to replace already registered metrics instead of overwriting existing ones. + */ + private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>(); + + private transient InternalMetrics internalMetrics; + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. + * + * @param brokerList Comma separated addresses of the brokers + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined (keyless) serialization schema. + */ + public FlinkKafkaProducer2( + String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList)); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. + * + * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the + * partitioner. This default partitioner maps each sink subtask to a single Kafka partition + * (i.e. all records received by a sink subtask will end up in the same Kafka partition). + * + * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String, + * SerializationSchema, Properties, Optional)} instead. + * + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined key-less serialization schema. + * @param producerConfig Properties with the producer configuration. + */ + public FlinkKafkaProducer2( + String topicId, + SerializationSchema<IN> serializationSchema, + Properties producerConfig) { + this( + topicId, + serializationSchema, + producerConfig, + Optional.of(new FlinkFixedPartitioner<>())); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It + * accepts a key-less {@link SerializationSchema} and possibly a custom {@link + * FlinkKafkaPartitioner}. + * + * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not + * have an attached key. Therefore, if a partitioner is also not provided, records will be + * distributed to Kafka partitions in a round-robin fashion. 
+ * + * @param topicId The topic to write data to + * @param serializationSchema A key-less serializable serialization schema for turning user + * objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is + * the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka + * partitions. If a partitioner is not provided, records will be distributed to Kafka + * partitions in a round-robin fashion. + */ + public FlinkKafkaProducer2( + String topicId, + SerializationSchema<IN> serializationSchema, + Properties producerConfig, + Optional<FlinkKafkaPartitioner<IN>> customPartitioner) { + this( + topicId, + serializationSchema, + producerConfig, + customPartitioner.orElse(null), + Semantic.AT_LEAST_ONCE, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It + * accepts a key-less {@link SerializationSchema} and possibly a custom {@link + * FlinkKafkaPartitioner}. + * + * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not + * have an attached key. Therefore, if a partitioner is also not provided, records will be + * distributed to Kafka partitions in a round-robin fashion. + * + * @param topicId The topic to write data to + * @param serializationSchema A key-less serializable serialization schema for turning user + * objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is + * the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka + * partitions. If a partitioner is not provided, records will be distributed to Kafka + * partitions in a round-robin fashion. + * @param semantic Defines semantic that will be used by this producer (see {@link + * FlinkKafkaProducer2.Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link + * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + */ + public FlinkKafkaProducer2( + String topicId, + SerializationSchema<IN> serializationSchema, + Properties producerConfig, + @Nullable FlinkKafkaPartitioner<IN> customPartitioner, + FlinkKafkaProducer2.Semantic semantic, + int kafkaProducersPoolSize) { + this( + topicId, + null, + null, + new KafkaSerializationSchemaWrapper<>( + topicId, customPartitioner, false, serializationSchema), + producerConfig, + semantic, + kafkaProducersPoolSize); + } + + // ------------------- Key/Value serialization schema constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. + * + * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the + * partitioner. This default partitioner maps each sink subtask to a single Kafka partition + * (i.e. all records received by a sink subtask will end up in the same Kafka partition). + * + * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String, + * KeyedSerializationSchema, Properties, Optional)} instead. + * + * @param brokerList Comma separated addresses of the brokers + * @param topicId ID of the Kafka topic. 
+ * @param serializationSchema User defined serialization schema supporting key/value messages + * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, + * FlinkKafkaProducer2.Semantic)} + */ + @Deprecated + public FlinkKafkaProducer2( + String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { + this( + topicId, + serializationSchema, + getPropertiesFromBrokerList(brokerList), + Optional.of(new FlinkFixedPartitioner<IN>())); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. + * + * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the + * partitioner. This default partitioner maps each sink subtask to a single Kafka partition + * (i.e. all records received by a sink subtask will end up in the same Kafka partition). + * + * <p>To use a custom partitioner, please use {@link #FlinkKafkaProducer2(String, + * KeyedSerializationSchema, Properties, Optional)} instead. + * + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig Properties with the producer configuration. + * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, + * FlinkKafkaProducer2.Semantic)} + */ + @Deprecated + public FlinkKafkaProducer2( + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig) { + this( + topicId, + serializationSchema, + producerConfig, + Optional.of(new FlinkFixedPartitioner<IN>())); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. + * + * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as the + * partitioner. This default partitioner maps each sink subtask to a single Kafka partition + * (i.e. all records received by a sink subtask will end up in the same Kafka partition). + * + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig Properties with the producer configuration. + * @param semantic Defines semantic that will be used by this producer (see {@link + * FlinkKafkaProducer2.Semantic}). + * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, + * FlinkKafkaProducer2.Semantic)} + */ + @Deprecated + public FlinkKafkaProducer2( + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaProducer2.Semantic semantic) { + this( + topicId, + serializationSchema, + producerConfig, + Optional.of(new FlinkFixedPartitioner<IN>()), + semantic, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It + * accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link + * FlinkKafkaPartitioner}. + * + * <p>If a partitioner is not provided, written records will be partitioned by the attached key + * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If + * written records do not have a key (i.e., {@link + * KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they will be + * distributed to Kafka partitions in a round-robin fashion. 
+ * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into + * a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is + * the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka + * partitions. If a partitioner is not provided, records will be partitioned by the key of + * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the + * keys are {@code null}, then records will be distributed to Kafka partitions in a + * round-robin fashion. + * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, + * FlinkKafkaProducer2.Semantic)} + */ + @Deprecated + public FlinkKafkaProducer2( + String defaultTopicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + Optional<FlinkKafkaPartitioner<IN>> customPartitioner) { + this( + defaultTopicId, + serializationSchema, + producerConfig, + customPartitioner, + FlinkKafkaProducer2.Semantic.AT_LEAST_ONCE, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It + * accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link + * FlinkKafkaPartitioner}. + * + * <p>If a partitioner is not provided, written records will be partitioned by the attached key + * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If + * written records do not have a key (i.e., {@link + * KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they will be + * distributed to Kafka partitions in a round-robin fashion. + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into + * a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is + * the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka + * partitions. If a partitioner is not provided, records will be partitioned by the key of + * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the + * keys are {@code null}, then records will be distributed to Kafka partitions in a + * round-robin fashion. + * @param semantic Defines semantic that will be used by this producer (see {@link + * FlinkKafkaProducer2.Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link + * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + * @deprecated use {@link #FlinkKafkaProducer2(String, KafkaSerializationSchema, Properties, + * FlinkKafkaProducer2.Semantic)} + */ + @Deprecated + public FlinkKafkaProducer2( + String defaultTopicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + Optional<FlinkKafkaPartitioner<IN>> customPartitioner, + FlinkKafkaProducer2.Semantic semantic, + int kafkaProducersPoolSize) { + this( + defaultTopicId, + serializationSchema, + customPartitioner.orElse(null), + null, /* kafka serialization schema */ + producerConfig, + semantic, + kafkaProducersPoolSize); + } + + /** + * Creates a {@link FlinkKafkaProducer2} for a given topic. 
The sink produces its input to the + * topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link + * ProducerRecord}, including partitioning information. + * + * @param defaultTopic The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into + * a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is + * the only required argument. + * @param semantic Defines semantic that will be used by this producer (see {@link + * FlinkKafkaProducer2.Semantic}). + */ + public FlinkKafkaProducer2( + String defaultTopic, + KafkaSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaProducer2.Semantic semantic) { + this( + defaultTopic, + serializationSchema, + producerConfig, + semantic, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It + * accepts a {@link KafkaSerializationSchema} and possibly a custom {@link + * FlinkKafkaPartitioner}. + * + * @param defaultTopic The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into + * a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is + * the only required argument. + * @param semantic Defines semantic that will be used by this producer (see {@link + * FlinkKafkaProducer2.Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link + * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + */ + public FlinkKafkaProducer2( + String defaultTopic, + KafkaSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaProducer2.Semantic semantic, + int kafkaProducersPoolSize) { + this( + defaultTopic, + null, + null, /* keyed schema and FlinkKafkaPartitioner */ + serializationSchema, + producerConfig, + semantic, + kafkaProducersPoolSize); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It + * accepts a {@link KafkaSerializationSchema} and possibly a custom {@link + * FlinkKafkaPartitioner}. + * + * <p>If a partitioner is not provided, written records will be partitioned by the attached key + * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If + * written records do not have a key (i.e., {@link + * KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they will be + * distributed to Kafka partitions in a round-robin fashion. + * + * @param defaultTopic The default topic to write data to + * @param keyedSchema A serializable serialization schema for turning user objects into a + * kafka-consumable byte[] supporting key/value messages + * @param customPartitioner A serializable partitioner for assigning messages to Kafka + * partitions. If a partitioner is not provided, records will be partitioned by the key of + * each record (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the + * keys are {@code null}, then records will be distributed to Kafka partitions in a + * round-robin fashion. 
+ * @param kafkaSchema A serializable serialization schema for turning user objects into a + * kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is + * the only required argument. + * @param semantic Defines semantic that will be used by this producer (see {@link + * FlinkKafkaProducer2.Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link + * FlinkKafkaProducer2.Semantic#EXACTLY_ONCE}). + */ + private FlinkKafkaProducer2( + String defaultTopic, + KeyedSerializationSchema<IN> keyedSchema, + FlinkKafkaPartitioner<IN> customPartitioner, + KafkaSerializationSchema<IN> kafkaSchema, + Properties producerConfig, + FlinkKafkaProducer2.Semantic semantic, + int kafkaProducersPoolSize) { + super( + new FlinkKafkaProducer2.TransactionStateSerializer(), + new FlinkKafkaProducer2.ContextStateSerializer()); + + this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null"); + + if (kafkaSchema != null) { + this.keyedSchema = null; + this.kafkaSchema = kafkaSchema; + this.flinkKafkaPartitioner = null; + ClosureCleaner.clean( + this.kafkaSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + + if (customPartitioner != null) { + throw new IllegalArgumentException( + "Customer partitioner can only be used when" + + "using a KeyedSerializationSchema or SerializationSchema."); + } + } else if (keyedSchema != null) { + this.kafkaSchema = null; + this.keyedSchema = keyedSchema; + this.flinkKafkaPartitioner = customPartitioner; + ClosureCleaner.clean( + this.flinkKafkaPartitioner, + ExecutionConfig.ClosureCleanerLevel.RECURSIVE, + true); + ClosureCleaner.clean( + this.keyedSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + } else { + throw new IllegalArgumentException( + "You must provide either a KafkaSerializationSchema or a" + + "KeyedSerializationSchema."); + } + + this.producerConfig = checkNotNull(producerConfig, "producerConfig is null"); + this.semantic = checkNotNull(semantic, "semantic is null"); + this.kafkaProducersPoolSize = kafkaProducersPoolSize; + checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty"); + + // set the producer configuration properties for kafka record key value serializers. + if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + ByteArraySerializer.class.getName()); + } else { + LOG.warn( + "Overwriting the '{}' is not recommended", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + } + + if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + ByteArraySerializer.class.getName()); + } else { + LOG.warn( + "Overwriting the '{}' is not recommended", + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + } + + // eagerly ensure that bootstrap servers are set. 
+ if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + throw new IllegalArgumentException( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + + " must be supplied in the producer config properties."); + } + + if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) { + long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds(); + checkState( + timeout < Integer.MAX_VALUE && timeout > 0, + "timeout does not fit into 32 bit integer"); + this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout); + LOG.warn( + "Property [{}] not specified. Setting it to {}", + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, + DEFAULT_KAFKA_TRANSACTION_TIMEOUT); + } + + // Enable transactionTimeoutWarnings to avoid silent data loss + // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1): + // The KafkaProducer may not throw an exception if the transaction failed to commit + if (semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + final Object object = + this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); + final long transactionTimeout; + if (object instanceof String && StringUtils.isNumeric((String) object)) { + transactionTimeout = Long.parseLong((String) object); + } else if (object instanceof Number) { + transactionTimeout = ((Number) object).longValue(); + } else { + throw new IllegalArgumentException( + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG + + " must be numeric, was " + + object); + } + super.setTransactionTimeout(transactionTimeout); + super.enableTransactionTimeoutWarnings(0.8); + } + + this.topicPartitionsMap = new HashMap<>(); + } + + // ---------------------------------- Properties -------------------------- + + /** + * If set to true, Flink will write the (event time) timestamp attached to each record into + * Kafka. Timestamps must be positive for Kafka to accept them. + * + * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to + * Kafka. + */ + public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { + this.writeTimestampToKafka = writeTimestampToKafka; + if (kafkaSchema instanceof KafkaSerializationSchemaWrapper) { + ((KafkaSerializationSchemaWrapper<IN>) kafkaSchema) + .setWriteTimestamp(writeTimestampToKafka); + } + } + + /** + * Defines whether the producer should fail on errors, or only log them. If this is set to true, + * then exceptions will be only logged, if set to false, exceptions will be eventually thrown + * and cause the streaming program to fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + /** + * Disables the propagation of exceptions thrown when committing presumably timed out Kafka + * transactions during recovery of the job. If a Kafka transaction is timed out, a commit will + * never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions + * will still be logged to inform the user that data loss might have occurred. + * + * <p>Note that we use {@link System#currentTimeMillis()} to track the age of a transaction. + * Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will + * attempt at least one commit of the transaction before giving up. 
+ */ + @Override + public FlinkKafkaProducer2<IN> ignoreFailuresAfterTransactionTimeout() { + super.ignoreFailuresAfterTransactionTimeout(); + return this; + } + + // ----------------------------------- Utilities -------------------------- + + /** Initializes the connection to Kafka. */ + @Override + public void open(Configuration configuration) throws Exception { + internalMetrics = new InternalMetrics(getRuntimeContext()); + if (logFailuresOnly) { + callback = + new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception e) { + if (e != null) { + internalMetrics.incrementErrorEvents(); + LOG.error( + "Error while sending record to Kafka: " + e.getMessage(), + e); + }else{ + internalMetrics.incrementOutEvents(); + internalMetrics.incrementOutBytes(metadata.serializedValueSize()); + } + + acknowledgeMessage(); + } + }; + } else { + callback = + new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + internalMetrics.incrementErrorEvents(); + if(asyncException == null){ + asyncException = exception; + } + }else{ + internalMetrics.incrementOutEvents(); + internalMetrics.incrementOutBytes(metadata.serializedValueSize()); + } + + acknowledgeMessage(); + } + }; + } + + RuntimeContext ctx = getRuntimeContext(); + + if (flinkKafkaPartitioner != null) { + flinkKafkaPartitioner.open( + ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); + } + + if (kafkaSchema instanceof KafkaContextAware) { + KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema; + contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask()); + contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks()); + } + + if (kafkaSchema != null) { + kafkaSchema.open( + RuntimeContextInitializationContextAdapters.serializationAdapter( + getRuntimeContext(), metricGroup -> metricGroup.addGroup("user"))); + } + + if (keyedSchema == null && kafkaSchema == null){ + throw new RuntimeException( + "We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this" + + "is a bug."); + } + + super.open(configuration); + } + + @Override + public void invoke( + FlinkKafkaProducer2.KafkaTransactionState transaction, IN next, Context context) + throws FlinkKafkaException { + checkErroneous(); + + internalMetrics.incrementInEvents(); + try { + ProducerRecord<byte[], byte[]> record; + if (keyedSchema != null) { + byte[] serializedKey = keyedSchema.serializeKey(next); + byte[] serializedValue = keyedSchema.serializeValue(next); + String targetTopic = keyedSchema.getTargetTopic(next); + if (targetTopic == null) { + targetTopic = defaultTopicId; + } + + Long timestamp = null; + if (this.writeTimestampToKafka) { + timestamp = context.timestamp(); + } + + int[] partitions = topicPartitionsMap.get(targetTopic); + if (null == partitions) { + partitions = getPartitionsByTopic(targetTopic, transaction.producer); + topicPartitionsMap.put(targetTopic, partitions); + } + if (flinkKafkaPartitioner != null) { + record = + new ProducerRecord<>( + targetTopic, + flinkKafkaPartitioner.partition( + next, + serializedKey, + serializedValue, + targetTopic, + partitions), + timestamp, + serializedKey, + serializedValue); + } else { + record = + new ProducerRecord<>( + targetTopic, null, timestamp, serializedKey, serializedValue); + } + } else { + if (kafkaSchema instanceof KafkaContextAware) { + @SuppressWarnings("unchecked") + KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) 
kafkaSchema; + + String targetTopic = contextAwareSchema.getTargetTopic(next); + if (targetTopic == null) { + targetTopic = defaultTopicId; + } + int[] partitions = topicPartitionsMap.get(targetTopic); + + if (null == partitions) { + partitions = getPartitionsByTopic(targetTopic, transaction.producer); + topicPartitionsMap.put(targetTopic, partitions); + } + + contextAwareSchema.setPartitions(partitions); + } + record = kafkaSchema.serialize(next, context.timestamp()); + } + + pendingRecords.incrementAndGet(); + transaction.producer.send(record, callback); + } catch (Exception e) { + internalMetrics.incrementErrorEvents(); + LOG.error("serialize error", e); + } + } + + @Override + public void close() throws FlinkKafkaException { + // First close the producer for current transaction. + try { + final KafkaTransactionState currentTransaction = currentTransaction(); + if (currentTransaction != null) { + // to avoid exceptions on aborting transactions with some pending records + flush(currentTransaction); + + // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of + // producer reusing, thus + // we need to close it manually + switch (semantic) { + case EXACTLY_ONCE: + break; + case AT_LEAST_ONCE: + case NONE: + currentTransaction.producer.flush(); + currentTransaction.producer.close(Duration.ofSeconds(0)); + break; + } + } + super.close(); + } catch (Exception e) { + asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException); + } finally { + // We may have to close producer of the current transaction in case some exception was + // thrown before + // the normal close routine finishes. + if (currentTransaction() != null) { + try { + currentTransaction().producer.close(Duration.ofSeconds(0)); + } catch (Throwable t) { + LOG.warn("Error closing producer.", t); + } + } + // Make sure all the producers for pending transactions are closed. 
+ pendingTransactions() + .forEach( + transaction -> { + try { + transaction.getValue().producer.close(Duration.ofSeconds(0)); + } catch (Throwable t) { + LOG.warn("Error closing producer.", t); + } + }); + // make sure we propagate pending errors + checkErroneous(); + } + } + + // ------------------- Logic for handling checkpoint flushing -------------------------- // + + @Override + protected FlinkKafkaProducer2.KafkaTransactionState beginTransaction() + throws FlinkKafkaException { + switch (semantic) { + case EXACTLY_ONCE: + FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer(); + producer.beginTransaction(); + return new FlinkKafkaProducer2.KafkaTransactionState( + producer.getTransactionalId(), producer); + case AT_LEAST_ONCE: + case NONE: + // Do not create new producer on each beginTransaction() if it is not necessary + final FlinkKafkaProducer2.KafkaTransactionState currentTransaction = + currentTransaction(); + if (currentTransaction != null && currentTransaction.producer != null) { + return new FlinkKafkaProducer2.KafkaTransactionState( + currentTransaction.producer); + } + return new FlinkKafkaProducer2.KafkaTransactionState( + initNonTransactionalProducer(true)); + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + } + + @Override + protected void preCommit(FlinkKafkaProducer2.KafkaTransactionState transaction) + throws FlinkKafkaException { + switch (semantic) { + case EXACTLY_ONCE: + case AT_LEAST_ONCE: + flush(transaction); + break; + case NONE: + break; + default: + throw new UnsupportedOperationException("Not implemented semantic"); + } + checkErroneous(); + } + + @Override + protected void commit(FlinkKafkaProducer2.KafkaTransactionState transaction) { + if (transaction.isTransactional()) { + try { + transaction.producer.commitTransaction(); + } finally { + recycleTransactionalProducer(transaction.producer); + } + } + } + + @Override + protected void recoverAndCommit(FlinkKafkaProducer2.KafkaTransactionState transaction) { + if (transaction.isTransactional()) { + FlinkKafkaInternalProducer<byte[], byte[]> producer = null; + try { + producer = initTransactionalProducer(transaction.transactionalId, false); + producer.resumeTransaction(transaction.producerId, transaction.epoch); + producer.commitTransaction(); + } catch (InvalidTxnStateException | ProducerFencedException ex) { + // That means we have committed this transaction before. + LOG.warn( + "Encountered error {} while recovering transaction {}. 
" + + "Presumably this transaction has been already committed before", + ex, + transaction); + } finally { + if (producer != null) { + producer.close(0, TimeUnit.SECONDS); + } + } + } + } + + @Override + protected void abort(FlinkKafkaProducer2.KafkaTransactionState transaction) { + if (transaction.isTransactional()) { + transaction.producer.abortTransaction(); + recycleTransactionalProducer(transaction.producer); + } + } + + @Override + protected void recoverAndAbort(FlinkKafkaProducer2.KafkaTransactionState transaction) { + if (transaction.isTransactional()) { + FlinkKafkaInternalProducer<byte[], byte[]> producer = null; + try { + producer = initTransactionalProducer(transaction.transactionalId, false); + producer.initTransactions(); + } finally { + if (producer != null) { + producer.close(0, TimeUnit.SECONDS); + } + } + } + } + + /** + * <b>ATTENTION to subclass implementors:</b> When overriding this method, please always call + * {@code super.acknowledgeMessage()} to keep the invariants of the internal bookkeeping of the + * producer. If not, be sure to know what you are doing. + */ + protected void acknowledgeMessage() { + pendingRecords.decrementAndGet(); + } + + /** + * Flush pending records. + * + * @param transaction + */ + private void flush(FlinkKafkaProducer2.KafkaTransactionState transaction) + throws FlinkKafkaException { + if (transaction.producer != null) { + transaction.producer.flush(); + } + long pendingRecordsCount = pendingRecords.get(); + if (pendingRecordsCount != 0) { + throw new IllegalStateException( + "Pending record count must be zero at this point: " + pendingRecordsCount); + } + + // if the flushed requests has errors, we should propagate it also and fail the checkpoint + checkErroneous(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + super.snapshotState(context); + + nextTransactionalIdHintState.clear(); + // To avoid duplication only first subtask keeps track of next transactional id hint. + // Otherwise all of the + // subtasks would write exactly same information. + if (getRuntimeContext().getIndexOfThisSubtask() == 0 + && semantic == FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + checkState( + nextTransactionalIdHint != null, + "nextTransactionalIdHint must be set for EXACTLY_ONCE"); + long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId; + + // If we scaled up, some (unknown) subtask must have created new transactional ids from + // scratch. In that + // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be + // used for this + // scaling up. + if (getRuntimeContext().getNumberOfParallelSubtasks() + > nextTransactionalIdHint.lastParallelism) { + nextFreeTransactionalId += + getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize; + } + + nextTransactionalIdHintState.add( + new FlinkKafkaProducer2.NextTransactionalIdHint( + getRuntimeContext().getNumberOfParallelSubtasks(), + nextFreeTransactionalId)); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + if (semantic != FlinkKafkaProducer2.Semantic.NONE + && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn( + "Using {} semantic, but checkpointing is not enabled. 
Switching to {} semantic.", + semantic, + FlinkKafkaProducer2.Semantic.NONE); + semantic = FlinkKafkaProducer2.Semantic.NONE; + } + + nextTransactionalIdHintState = + context.getOperatorStateStore() + .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2); + + if (context.getOperatorStateStore() + .getRegisteredStateNames() + .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) { + migrateNextTransactionalIdHindState(context); + } + + String taskName = getRuntimeContext().getTaskName(); + // Kafka transactional IDs are limited in length to be less than the max value of a short, + // so we truncate here if necessary to a more reasonable length string. + if (taskName.length() > maxTaskNameSize) { + taskName = taskName.substring(0, maxTaskNameSize); + LOG.warn( + "Truncated task name for Kafka TransactionalId from {} to {}.", + getRuntimeContext().getTaskName(), + taskName); + } + transactionalIdsGenerator = + new TransactionalIdsGenerator( + taskName + + "-" + + ((StreamingRuntimeContext) getRuntimeContext()) + .getOperatorUniqueID(), + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks(), + kafkaProducersPoolSize, + SAFE_SCALE_DOWN_FACTOR); + + if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + nextTransactionalIdHint = null; + } else { + ArrayList<FlinkKafkaProducer2.NextTransactionalIdHint> transactionalIdHints = + Lists.newArrayList(nextTransactionalIdHintState.get()); + if (transactionalIdHints.size() > 1) { + throw new IllegalStateException( + "There should be at most one next transactional id hint written by the first subtask"); + } else if (transactionalIdHints.size() == 0) { + nextTransactionalIdHint = new FlinkKafkaProducer2.NextTransactionalIdHint(0, 0); + + // this means that this is either: + // (1) the first execution of this application + // (2) previous execution has failed before first checkpoint completed + // + // in case of (2) we have to abort all previous transactions + abortTransactions(transactionalIdsGenerator.generateIdsToAbort()); + } else { + nextTransactionalIdHint = transactionalIdHints.get(0); + } + } + + super.initializeState(context); + } + + @Override + protected Optional<FlinkKafkaProducer2.KafkaTransactionContext> initializeUserContext() { + if (semantic != FlinkKafkaProducer2.Semantic.EXACTLY_ONCE) { + return Optional.empty(); + } + + Set<String> transactionalIds = generateNewTransactionalIds(); + resetAvailableTransactionalIdsPool(transactionalIds); + return Optional.of(new FlinkKafkaProducer2.KafkaTransactionContext(transactionalIds)); + } + + private Set<String> generateNewTransactionalIds() { + checkState( + nextTransactionalIdHint != null, + "nextTransactionalIdHint must be present for EXACTLY_ONCE"); + + Set<String> transactionalIds = + transactionalIdsGenerator.generateIdsToUse( + nextTransactionalIdHint.nextFreeTransactionalId); + LOG.info("Generated new transactionalIds {}", transactionalIds); + return transactionalIds; + } + + @Override + protected void finishRecoveringContext( + Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) { + cleanUpUserContext(handledTransactions); + resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds); + LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds); + } + + protected FlinkKafkaInternalProducer<byte[], byte[]> createProducer() { + return new FlinkKafkaInternalProducer<>(this.producerConfig); + } + + /** + * After initialization make sure that all previous transactions from the 
current user context + * have been completed. + * + * @param handledTransactions transactions which were already committed or aborted and do not + * need further handling + */ + private void cleanUpUserContext( + Collection<FlinkKafkaProducer2.KafkaTransactionState> handledTransactions) { + if (!getUserContext().isPresent()) { + return; + } + HashSet<String> abortTransactions = new HashSet<>(getUserContext().get().transactionalIds); + handledTransactions.forEach( + kafkaTransactionState -> + abortTransactions.remove(kafkaTransactionState.transactionalId)); + abortTransactions(abortTransactions); + } + + private void resetAvailableTransactionalIdsPool(Collection<String> transactionalIds) { + availableTransactionalIds.clear(); + availableTransactionalIds.addAll(transactionalIds); + } + + // ----------------------------------- Utilities -------------------------- + + private void abortTransactions(final Set<String> transactionalIds) { + final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + transactionalIds + .parallelStream() + .forEach( + transactionalId -> { + // The parallelStream executes the consumer in a separated thread pool. + // Because the consumer(e.g. Kafka) uses the context classloader to + // construct some class + // we should set the correct classloader for it. + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(classLoader)) { + // don't mess with the original configuration or any other + // properties of the + // original object + // -> create an internal kafka producer on our own and do not rely + // on + // initTransactionalProducer(). + final Properties myConfig = new Properties(); + myConfig.putAll(producerConfig); + initTransactionalProducerConfig(myConfig, transactionalId); + FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null; + try { + kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig); + // it suffices to call initTransactions - this will abort any + // lingering transactions + kafkaProducer.initTransactions(); + } finally { + if (kafkaProducer != null) { + kafkaProducer.close(Duration.ofSeconds(0)); + } + } + } + }); + } + + int getTransactionCoordinatorId() { + final FlinkKafkaProducer2.KafkaTransactionState currentTransaction = currentTransaction(); + if (currentTransaction == null || currentTransaction.producer == null) { + throw new IllegalArgumentException(); + } + return currentTransaction.producer.getTransactionCoordinatorId(); + } + + /** + * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new transactions + * will not clash with transactions created during previous checkpoints ({@code + * producer.initTransactions()} assures that we obtain new producerId and epoch counters). + */ + private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() + throws FlinkKafkaException { + String transactionalId = availableTransactionalIds.poll(); + if (transactionalId == null) { + throw new FlinkKafkaException( + FlinkKafkaErrorCode.PRODUCERS_POOL_EMPTY, + "Too many ongoing snapshots. 
Increase kafka producers pool size or decrease number of concurrent checkpoints."); + } + FlinkKafkaInternalProducer<byte[], byte[]> producer = + initTransactionalProducer(transactionalId, true); + producer.initTransactions(); + return producer; + } + + private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) { + availableTransactionalIds.add(producer.getTransactionalId()); + producer.flush(); + producer.close(Duration.ofSeconds(0)); + } + + private FlinkKafkaInternalProducer<byte[], byte[]> initTransactionalProducer( + String transactionalId, boolean registerMetrics) { + initTransactionalProducerConfig(producerConfig, transactionalId); + return initProducer(registerMetrics); + } + + private static void initTransactionalProducerConfig( + Properties producerConfig, String transactionalId) { + producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + } + + private FlinkKafkaInternalProducer<byte[], byte[]> initNonTransactionalProducer( + boolean registerMetrics) { + producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG); + return initProducer(registerMetrics); + } + + private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean registerMetrics) { + FlinkKafkaInternalProducer<byte[], byte[]> producer = createProducer(); + + LOG.info( + "Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}", + getRuntimeContext().getIndexOfThisSubtask() + 1, + getRuntimeContext().getNumberOfParallelSubtasks(), + defaultTopicId); + + // register Kafka metrics to Flink accumulators + if (registerMetrics + && !Boolean.parseBoolean( + producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { + Map<MetricName, ? extends Metric> metrics = producer.metrics(); + + if (metrics == null) { + // MapR's Kafka implementation returns null here. + LOG.info("Producer implementation does not support metrics"); + } else { + final MetricGroup kafkaMetricGroup = + getRuntimeContext().getMetricGroup().addGroup("KafkaProducer"); + for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) { + String name = entry.getKey().name(); + Metric metric = entry.getValue(); + + KafkaMetricMutableWrapper wrapper = previouslyCreatedMetrics.get(name); + if (wrapper != null) { + wrapper.setKafkaMetric(metric); + } else { + // TODO: somehow merge metrics from all active producers? 
+ wrapper = new KafkaMetricMutableWrapper(metric); + previouslyCreatedMetrics.put(name, wrapper); + kafkaMetricGroup.gauge(name, wrapper); + } + } + } + } + return producer; + } + + protected void checkErroneous() throws FlinkKafkaException { + Exception e = asyncException; + if (e != null) { + // prevent double throwing + asyncException = null; + throw new FlinkKafkaException( + FlinkKafkaErrorCode.EXTERNAL_ERROR, + "Failed to send data to Kafka: " + e.getMessage(), + e); + } + } + + private void readObject(java.io.ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + } + + private void migrateNextTransactionalIdHindState(FunctionInitializationContext context) + throws Exception { + ListState<NextTransactionalIdHint> oldNextTransactionalIdHintState = + context.getOperatorStateStore() + .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); + nextTransactionalIdHintState = + context.getOperatorStateStore() + .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2); + + ArrayList<NextTransactionalIdHint> oldTransactionalIdHints = + Lists.newArrayList(oldNextTransactionalIdHintState.get()); + if (!oldTransactionalIdHints.isEmpty()) { + nextTransactionalIdHintState.addAll(oldTransactionalIdHints); + // clear old state + oldNextTransactionalIdHintState.clear(); + } + } + + private static Properties getPropertiesFromBrokerList(String brokerList) { + String[] elements = brokerList.split(","); + + // validate the broker addresses + for (String broker : elements) { + NetUtils.getCorrectHostnamePort(broker); + } + + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + return props; + } + + protected static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) { + // the fetched list is immutable, so we're creating a mutable copy in order to sort it + List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic)); + + // sort the partitions by partition id to make sure the fetched partition list is the same + // across subtasks + Collections.sort( + partitionsList, + new Comparator<PartitionInfo>() { + @Override + public int compare(PartitionInfo o1, PartitionInfo o2) { + return Integer.compare(o1.partition(), o2.partition()); + } + }); + + int[] partitions = new int[partitionsList.size()]; + for (int i = 0; i < partitions.length; i++) { + partitions[i] = partitionsList.get(i).partition(); + } + + return partitions; + } + + /** State for handling transactions. 
*/ + @VisibleForTesting + @Internal + public static class KafkaTransactionState { + + private final transient FlinkKafkaInternalProducer<byte[], byte[]> producer; + + @Nullable final String transactionalId; + + final long producerId; + + final short epoch; + + @VisibleForTesting + public KafkaTransactionState( + String transactionalId, FlinkKafkaInternalProducer<byte[], byte[]> producer) { + this(transactionalId, producer.getProducerId(), producer.getEpoch(), producer); + } + + @VisibleForTesting + public KafkaTransactionState(FlinkKafkaInternalProducer<byte[], byte[]> producer) { + this(null, -1, (short) -1, producer); + } + + @VisibleForTesting + public KafkaTransactionState( + @Nullable String transactionalId, + long producerId, + short epoch, + FlinkKafkaInternalProducer<byte[], byte[]> producer) { + this.transactionalId = transactionalId; + this.producerId = producerId; + this.epoch = epoch; + this.producer = producer; + } + + boolean isTransactional() { + return transactionalId != null; + } + + public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() { + return producer; + } + + @Override + public String toString() { + return String.format( + "%s [transactionalId=%s, producerId=%s, epoch=%s]", + this.getClass().getSimpleName(), transactionalId, producerId, epoch); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlinkKafkaProducer2.KafkaTransactionState that = + (FlinkKafkaProducer2.KafkaTransactionState) o; + + if (producerId != that.producerId) { + return false; + } + if (epoch != that.epoch) { + return false; + } + return transactionalId != null + ? transactionalId.equals(that.transactionalId) + : that.transactionalId == null; + } + + @Override + public int hashCode() { + int result = transactionalId != null ? transactionalId.hashCode() : 0; + result = 31 * result + (int) (producerId ^ (producerId >>> 32)); + result = 31 * result + (int) epoch; + return result; + } + } + + /** + * Context associated to this instance of the {@link FlinkKafkaProducer2}. User for keeping track + * of the transactionalIds. + */ + @VisibleForTesting + @Internal + public static class KafkaTransactionContext { + final Set<String> transactionalIds; + + @VisibleForTesting + public KafkaTransactionContext(Set<String> transactionalIds) { + checkNotNull(transactionalIds); + this.transactionalIds = transactionalIds; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FlinkKafkaProducer2.KafkaTransactionContext that = + (FlinkKafkaProducer2.KafkaTransactionContext) o; + + return transactionalIds.equals(that.transactionalIds); + } + + @Override + public int hashCode() { + return transactionalIds.hashCode(); + } + } + + /** + * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link + * FlinkKafkaProducer2.KafkaTransactionState}. 
+ */ + @VisibleForTesting + @Internal + public static class TransactionStateSerializer + extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionState> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionState createInstance() { + return null; + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionState copy( + FlinkKafkaProducer2.KafkaTransactionState from) { + return from; + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionState copy( + FlinkKafkaProducer2.KafkaTransactionState from, + FlinkKafkaProducer2.KafkaTransactionState reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize( + FlinkKafkaProducer2.KafkaTransactionState record, DataOutputView target) + throws IOException { + if (record.transactionalId == null) { + target.writeBoolean(false); + } else { + target.writeBoolean(true); + target.writeUTF(record.transactionalId); + } + target.writeLong(record.producerId); + target.writeShort(record.epoch); + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionState deserialize(DataInputView source) + throws IOException { + String transactionalId = null; + if (source.readBoolean()) { + transactionalId = source.readUTF(); + } + long producerId = source.readLong(); + short epoch = source.readShort(); + return new FlinkKafkaProducer2.KafkaTransactionState( + transactionalId, producerId, epoch, null); + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionState deserialize( + FlinkKafkaProducer2.KafkaTransactionState reuse, DataInputView source) + throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + boolean hasTransactionalId = source.readBoolean(); + target.writeBoolean(hasTransactionalId); + if (hasTransactionalId) { + target.writeUTF(source.readUTF()); + } + target.writeLong(source.readLong()); + target.writeShort(source.readShort()); + } + + // ----------------------------------------------------------------------------------- + + @Override + public TypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState> + snapshotConfiguration() { + return new TransactionStateSerializerSnapshot(); + } + + /** Serializer configuration snapshot for compatibility and format evolution. */ + @SuppressWarnings("WeakerAccess") + public static final class TransactionStateSerializerSnapshot + extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer2.KafkaTransactionState> { + + public TransactionStateSerializerSnapshot() { + super(TransactionStateSerializer::new); + } + } + } + + /** + * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link + * FlinkKafkaProducer2.KafkaTransactionContext}. 
+ */ + @VisibleForTesting + @Internal + public static class ContextStateSerializer + extends TypeSerializerSingleton<FlinkKafkaProducer2.KafkaTransactionContext> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionContext createInstance() { + return null; + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionContext copy( + FlinkKafkaProducer2.KafkaTransactionContext from) { + return from; + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionContext copy( + FlinkKafkaProducer2.KafkaTransactionContext from, + FlinkKafkaProducer2.KafkaTransactionContext reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize( + FlinkKafkaProducer2.KafkaTransactionContext record, DataOutputView target) + throws IOException { + int numIds = record.transactionalIds.size(); + target.writeInt(numIds); + for (String id : record.transactionalIds) { + target.writeUTF(id); + } + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionContext deserialize(DataInputView source) + throws IOException { + int numIds = source.readInt(); + Set<String> ids = new HashSet<>(numIds); + for (int i = 0; i < numIds; i++) { + ids.add(source.readUTF()); + } + return new FlinkKafkaProducer2.KafkaTransactionContext(ids); + } + + @Override + public FlinkKafkaProducer2.KafkaTransactionContext deserialize( + FlinkKafkaProducer2.KafkaTransactionContext reuse, DataInputView source) + throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int numIds = source.readInt(); + target.writeInt(numIds); + for (int i = 0; i < numIds; i++) { + target.writeUTF(source.readUTF()); + } + } + + // ----------------------------------------------------------------------------------- + + @Override + public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() { + return new ContextStateSerializerSnapshot(); + } + + /** Serializer configuration snapshot for compatibility and format evolution. */ + @SuppressWarnings("WeakerAccess") + public static final class ContextStateSerializerSnapshot + extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> { + + public ContextStateSerializerSnapshot() { + super(ContextStateSerializer::new); + } + } + } + + /** Keep information required to deduce next safe to use transactional id. 
*/ + public static class NextTransactionalIdHint { + public int lastParallelism = 0; + public long nextFreeTransactionalId = 0; + + public NextTransactionalIdHint() { + this(0, 0); + } + + public NextTransactionalIdHint(int parallelism, long nextFreeTransactionalId) { + this.lastParallelism = parallelism; + this.nextFreeTransactionalId = nextFreeTransactionalId; + } + + @Override + public String toString() { + return "NextTransactionalIdHint[" + + "lastParallelism=" + + lastParallelism + + ", nextFreeTransactionalId=" + + nextFreeTransactionalId + + ']'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + NextTransactionalIdHint that = (NextTransactionalIdHint) o; + + if (lastParallelism != that.lastParallelism) { + return false; + } + return nextFreeTransactionalId == that.nextFreeTransactionalId; + } + + @Override + public int hashCode() { + int result = lastParallelism; + result = + 31 * result + + (int) (nextFreeTransactionalId ^ (nextFreeTransactionalId >>> 32)); + return result; + } + } + + /** + * {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link + * FlinkKafkaProducer2.NextTransactionalIdHint}. + */ + @VisibleForTesting + @Internal + public static class NextTransactionalIdHintSerializer + extends TypeSerializerSingleton<NextTransactionalIdHint> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public NextTransactionalIdHint createInstance() { + return new NextTransactionalIdHint(); + } + + @Override + public NextTransactionalIdHint copy(NextTransactionalIdHint from) { + return from; + } + + @Override + public NextTransactionalIdHint copy( + NextTransactionalIdHint from, NextTransactionalIdHint reuse) { + return from; + } + + @Override + public int getLength() { + return Long.BYTES + Integer.BYTES; + } + + @Override + public void serialize(NextTransactionalIdHint record, DataOutputView target) + throws IOException { + target.writeLong(record.nextFreeTransactionalId); + target.writeInt(record.lastParallelism); + } + + @Override + public NextTransactionalIdHint deserialize(DataInputView source) throws IOException { + long nextFreeTransactionalId = source.readLong(); + int lastParallelism = source.readInt(); + return new NextTransactionalIdHint(lastParallelism, nextFreeTransactionalId); + } + + @Override + public NextTransactionalIdHint deserialize( + NextTransactionalIdHint reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + target.writeLong(source.readLong()); + target.writeInt(source.readInt()); + } + + @Override + public TypeSerializerSnapshot<NextTransactionalIdHint> snapshotConfiguration() { + return new NextTransactionalIdHintSerializerSnapshot(); + } + + /** Serializer configuration snapshot for compatibility and format evolution. 
*/ + @SuppressWarnings("WeakerAccess") + public static final class NextTransactionalIdHintSerializerSnapshot + extends SimpleTypeSerializerSnapshot<NextTransactionalIdHint> { + + public NextTransactionalIdHintSerializerSnapshot() { + super(NextTransactionalIdHintSerializer::new); + } + } + } +} diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/RuntimeContextAware.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/RuntimeContextAware.java new file mode 100644 index 0000000..ed35475 --- /dev/null +++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/RuntimeContextAware.java @@ -0,0 +1,7 @@ +package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+public interface RuntimeContextAware {
+ void runtimeContextAware(RuntimeContext runtimeContext);
+}
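
For context, a deserialization schema can implement this interface to receive the Flink RuntimeContext before fetching starts, e.g. to register its own counters next to the connector's InternalMetrics. A minimal sketch under that assumption; the CountingStringDeserializationSchema class and the metric name are illustrative and not part of this commit:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

/** Hypothetical schema that counts records via the RuntimeContext handed over by KafkaFetcher2. */
public class CountingStringDeserializationSchema
        implements KafkaDeserializationSchema<String>, RuntimeContextAware {

    private transient Counter records;

    @Override
    public void runtimeContextAware(RuntimeContext runtimeContext) {
        // Invoked by KafkaFetcher2 before the fetch loop starts.
        records = runtimeContext.getMetricGroup().counter("deserializedRecords");
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        if (records != null) {
            records.inc();
        }
        return record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
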
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java
new file mode 100644
index 0000000..4000079
--- /dev/null
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher2.java
@@ -0,0 +1,38 @@
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaFetcher2<T> extends KafkaFetcher<T> {
+ public KafkaFetcher2(
+ SourceFunction.SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+ SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+ StreamingRuntimeContext runtimeContext,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval,
+ ClassLoader userCodeClassLoader,
+ String taskNameWithSubtasks,
+ KafkaDeserializationSchema<T> deserializer,
+ Properties kafkaProperties,
+ long pollTimeout,
+ MetricGroup subtaskMetricGroup,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics) throws Exception {
+ super(sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, taskNameWithSubtasks, deserializer, kafkaProperties, pollTimeout, subtaskMetricGroup, consumerMetricGroup, useMetrics);
+ if (deserializer instanceof RuntimeContextAware) {
+ ((RuntimeContextAware) deserializer).runtimeContextAware(runtimeContext);
+ }
+ }
+}
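
KafkaFetcher2 is presumably wired in through a custom consumer whose createFetcher(...) returns it instead of the stock KafkaFetcher; that consumer is not shown in this hunk. The sketch below assumes the Flink 1.12-era FlinkKafkaConsumerBase#createFetcher signature; the FlinkKafkaConsumer2 class name, the locally kept fields, and the fixed poll timeout are all illustrative, and the real implementation would also adjust the auto-commit config from offsetCommitMode:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher2;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.util.SerializedValue;

import java.util.Map;
import java.util.Properties;

public class FlinkKafkaConsumer2<T> extends FlinkKafkaConsumer<T> {

    // Kept locally because the superclass fields are not guaranteed to be accessible.
    private final KafkaDeserializationSchema<T> deserializationSchema;
    private final Properties consumerProperties;
    private static final long POLL_TIMEOUT_MS = 100L; // illustrative; the real consumer reads "flink.poll-timeout"

    public FlinkKafkaConsumer2(
            String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
        super(topic, deserializer, props);
        this.deserializationSchema = deserializer;
        this.consumerProperties = props;
    }

    @Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        // Pass the StreamingRuntimeContext through so that RuntimeContextAware
        // deserializers receive it before records are consumed.
        return new KafkaFetcher2<>(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarkStrategy,
                runtimeContext,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                runtimeContext.getTaskNameWithSubtasks(),
                deserializationSchema,
                consumerProperties,
                POLL_TIMEOUT_MS,
                runtimeContext.getMetricGroup(),
                consumerMetricGroup,
                useMetrics);
    }
}
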
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java index f6201b7..5c51a3f 100644 --- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java +++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java @@ -37,11 +37,11 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve try { map = JSON.parseObject(message); } catch (Exception e) { - LOG.error(String.format("json解析失败for:%s", message), e); if(ignoreParseErrors){ + LOG.error(String.format("json解析失败for:%s", message), e); return null; }else{ - throw new IOException(e); + throw new IOException(String.format("json解析失败for:%s", message), e); } } diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java index ef529b0..fa7a486 100644 --- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java +++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatOptions.java @@ -1,15 +1,15 @@ -package com.geedgenetworks.formats.json; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; - -public class JsonFormatOptions { - public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = - ConfigOptions.key("ignore.parse.errors") - .booleanType() - .defaultValue(true) - .withDescription( - "Optional flag to skip fields and rows with parse errors instead of failing;\n" - + "fields are set to null in case of errors, true by default."); - -} +package com.geedgenetworks.formats.json;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class JsonFormatOptions {
+ public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+ ConfigOptions.key("ignore.parse.errors")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to skip fields and rows with parse errors instead of failing;\n"
+ + "fields are set to null in case of errors, true by default.");
+
+}
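
Since the default flips from true to false here, jobs that relied on silently skipping bad JSON now have to opt in explicitly. A minimal sketch of reading and overriding the option through a Flink Configuration; the surrounding class is illustrative only:

import com.geedgenetworks.formats.json.JsonFormatOptions;
import org.apache.flink.configuration.Configuration;

public class JsonFormatOptionsExample {
    public static void main(String[] args) {
        Configuration formatConfig = new Configuration();

        // Nothing set -> falls back to the new default: false, i.e. parse errors now fail the job.
        boolean ignoreParseErrors = formatConfig.get(JsonFormatOptions.IGNORE_PARSE_ERRORS);
        System.out.println("ignore.parse.errors = " + ignoreParseErrors);

        // Opt back into the previous lenient behaviour for a specific job if needed.
        formatConfig.set(JsonFormatOptions.IGNORE_PARSE_ERRORS, true);
    }
}
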
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java index dbd3cbe..c599445 100644 --- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java +++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java @@ -47,11 +47,11 @@ public class ProtobufEventDeserializationSchema implements DeserializationSchema event.setExtractedFields(map); return event; } catch (Exception e) { - LOG.error(String.format("proto解析失败for:%s", Base64.getEncoder().encodeToString(message)), e); if(ignoreParseErrors){ + LOG.error(String.format("proto解析失败for:%s", Base64.getEncoder().encodeToString(message)), e); return null; }else{ - throw new IOException(e); + throw new IOException(String.format("proto解析失败for:%s", Base64.getEncoder().encodeToString(message)), e); } } } diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java index 74c3e21..7f33dad 100644 --- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java +++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java @@ -32,8 +32,7 @@ public class ProtobufEventSerializationSchema implements SerializationSchema<Eve try { return serializer.serialize(map); } catch (Exception e) { - LOG.error(String.format("proto serialize失败for:%s", JSON.toJSONString(map)), e); - return null; + throw new RuntimeException(String.format("proto serialize失败for:%s", JSON.toJSONString(map)),e); } } } diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java index 9fadce3..b29091a 100644 --- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java +++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufOptions.java @@ -1,35 +1,35 @@ -package com.geedgenetworks.formats.protobuf; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; - -public class ProtobufOptions { - public static final ConfigOption<String> MESSAGE_NAME = - ConfigOptions.key("message.name") - .stringType() - .noDefaultValue() - .withDescription( "The protobuf MessageName to look for in descriptor file."); - - public static final ConfigOption<String> DESC_FILE_PATH = - ConfigOptions.key("descriptor.file.path") - .stringType() - .noDefaultValue() - .withDescription( "The Protobuf descriptor file."); - - public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = - ConfigOptions.key("ignore.parse.errors") - .booleanType() - .defaultValue(true) - .withDescription( - "Optional flag to skip fields and rows with parse errors instead of failing;\n" - + "fields are set to null in case of errors, true by default."); - - public static final ConfigOption<Boolean> EMIT_DEFAULT_VALUES = - ConfigOptions.key("emit.default.values") - .booleanType() - .defaultValue(false) - .withDescription( - "Optional 
flag for whether to render fields with zero values when deserializing Protobuf to struct.\n" - + "When a field is empty in the serialized Protobuf, this library will deserialize them as null by default. However, this flag can control whether to render the type-specific zero value.\n" - + "This operates similarly to includingDefaultValues in protobuf-java-util's JsonFormat, or emitDefaults in golang/protobuf's jsonpb."); -} +package com.geedgenetworks.formats.protobuf;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class ProtobufOptions {
+ public static final ConfigOption<String> MESSAGE_NAME =
+ ConfigOptions.key("message.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription( "The protobuf MessageName to look for in descriptor file.");
+
+ public static final ConfigOption<String> DESC_FILE_PATH =
+ ConfigOptions.key("descriptor.file.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription( "The Protobuf descriptor file.");
+
+ public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+ ConfigOptions.key("ignore.parse.errors")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to skip fields and rows with parse errors instead of failing;\n"
+ + "fields are set to null in case of errors, true by default.");
+
+ public static final ConfigOption<Boolean> EMIT_DEFAULT_VALUES =
+ ConfigOptions.key("emit.default.values")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag for whether to render fields with zero values when deserializing Protobuf to struct.\n"
+ + "When a field is empty in the serialized Protobuf, this library will deserialize them as null by default. However, this flag can control whether to render the type-specific zero value.\n"
+ + "This operates similarly to includingDefaultValues in protobuf-java-util's JsonFormat, or emitDefaults in golang/protobuf's jsonpb.");
+}
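
As with the JSON format, ignore.parse.errors now defaults to false for protobuf as well. A small sketch of assembling the protobuf format options in a Configuration; the message name and descriptor path are placeholders:

import com.geedgenetworks.formats.protobuf.ProtobufOptions;
import org.apache.flink.configuration.Configuration;

public class ProtobufOptionsExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(ProtobufOptions.MESSAGE_NAME, "com.example.Session");          // placeholder message name
        conf.set(ProtobufOptions.DESC_FILE_PATH, "/opt/schemas/session.desc");  // placeholder descriptor file
        conf.set(ProtobufOptions.EMIT_DEFAULT_VALUES, true);
        // ignore.parse.errors is left at its new default (false): malformed records raise an IOException.
        System.out.println(conf.get(ProtobufOptions.IGNORE_PARSE_ERRORS));
    }
}
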
