Diffstat (limited to 'groot-connectors')
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java | 12
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java | 17
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java | 5
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java | 8
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java | 23
-rw-r--r--  groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java | 3
-rw-r--r--  groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java | 16
-rw-r--r--  groot-connectors/connector-starrocks/pom.xml | 23
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java | 85
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java | 318
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java | 28
-rw-r--r--  groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory | 1
-rw-r--r--  groot-connectors/pom.xml | 1
13 files changed, 533 insertions(+), 7 deletions(-)
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
index aae6678..bd95dd6 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
@@ -9,9 +9,15 @@ import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.RuntimeContextAware;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
public class EventKafkaDeserializationSchema implements KafkaDeserializationSchema<Event>, RuntimeContextAware {
private static final Logger LOG = LoggerFactory.getLogger(EventKafkaDeserializationSchema.class);
private final DeserializationSchema<Event> valueDeserialization;
@@ -46,6 +52,12 @@ public class EventKafkaDeserializationSchema implements KafkaDeserializationSche
Event event = valueDeserialization.deserialize(record.value());
if(event != null){
event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, record.timestamp());
+ Headers headers = record.headers();
+ Map<String, String> headersMap = new HashMap<>();
+ for (Header header : headers) {
+ headersMap.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
+ }
+ event.getExtractedFields().put(Event.INTERNAL_HEADERS_KEY, headersMap);
out.collect(event);
internalMetrics.incrementOutEvents();
return;
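Note on the hunk above: Kafka header values are raw bytes, and this change decodes each header as UTF-8 into the event's extracted fields. A minimal standalone sketch of that copy, assuming only kafka-clients on the classpath (the demo class itself is illustrative):

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderCopyDemo {
    public static void main(String[] args) {
        RecordHeaders headers = new RecordHeaders();
        headers.add("source", "groot".getBytes(StandardCharsets.UTF_8));
        headers.add("source", "edge".getBytes(StandardCharsets.UTF_8)); // duplicate key
        Map<String, String> headersMap = new HashMap<>();
        for (Header header : headers) {
            // Same loop as the hunk: bytes decoded as UTF-8; for duplicate
            // header keys the last occurrence wins in the map.
            headersMap.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
        }
        System.out.println(headersMap); // {source=edge}
    }
}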
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java
index 48734ea..7924f8d 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaConnectorOptionsUtil.java
@@ -1,11 +1,13 @@
package com.geedgenetworks.connectors.kafka;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaConnectorOptionsUtil {
public static final String PROPERTIES_PREFIX = "kafka.";
+ public static final String HEADERS_PREFIX = "headers.";
public static Properties getKafkaProperties(Map<String, String> tableOptions) {
final Properties kafkaProperties = new Properties();
@@ -23,6 +25,21 @@ public class KafkaConnectorOptionsUtil {
return kafkaProperties;
}
+ public static Map<String, String> getKafkaHeaders(Map<String, String> tableOptions) {
+ final Map<String, String> headers = new HashMap<>();
+
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(HEADERS_PREFIX))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ final String subKey = key.substring((HEADERS_PREFIX).length());
+ headers.put(subKey, value);
+ });
+
+ return headers;
+ }
+
/**
* Decides if the table options contain Kafka client properties that start with the prefix
* 'properties'.
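A usage sketch for the getKafkaHeaders method added above (option keys here are illustrative): every key under "headers." is stripped of the prefix and collected; all other options are ignored by this method.

Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("headers.source", "groot");
tableOptions.put("headers.env", "prod");
tableOptions.put("kafka.acks", "all"); // not a header; handled by getKafkaProperties
Map<String, String> headers = KafkaConnectorOptionsUtil.getKafkaHeaders(tableOptions);
// headers -> {source=groot, env=prod}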
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index 964aa94..496e6a3 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -10,6 +10,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.GrootFlinkKafkaProducer;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -19,6 +20,7 @@ public class KafkaSinkProvider implements SinkProvider {
private final String topic;
private final Properties properties;
+ private final Map<String, String> headers;
private final boolean logFailuresOnly;
private final RateLimitingStrategy rateLimitingStrategy;
@@ -27,6 +29,7 @@ public class KafkaSinkProvider implements SinkProvider {
EncodingFormat valueEncodingFormat,
String topic,
Properties properties,
+ Map<String, String> headers,
boolean logFailuresOnly,
RateLimitingStrategy rateLimitingStrategy
) {
@@ -34,6 +37,7 @@ public class KafkaSinkProvider implements SinkProvider {
this.valueSerialization = valueEncodingFormat.createRuntimeEncoder(dataType);
this.topic = topic;
this.properties = properties;
+ this.headers = headers;
this.logFailuresOnly = logFailuresOnly;
this.rateLimitingStrategy = rateLimitingStrategy;
}
@@ -48,6 +52,7 @@ public class KafkaSinkProvider implements SinkProvider {
);
kafkaProducer.setLogFailuresOnly(logFailuresOnly);
kafkaProducer.setRateLimitingStrategy(rateLimitingStrategy);
+ kafkaProducer.setHeaders(headers);
return dataStream.addSink(kafkaProducer);
}
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index fbbaed2..394e618 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -18,8 +18,7 @@ import org.apache.flink.util.Preconditions;
import java.util.*;
import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.getKafkaProperties;
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.*;
public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
public static final String IDENTIFIER = "kafka";
@@ -51,7 +50,7 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
// get the valueEncodingFormat
EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
- helper.validateExcept(PROPERTIES_PREFIX); // validate options, excluding the pass-through "kafka."-prefixed keys
+ helper.validateExcept(PROPERTIES_PREFIX, HEADERS_PREFIX); // validate options, excluding the pass-through "kafka."- and "headers."-prefixed keys
StructType dataType = context.getDataType();
ReadableConfig config = context.getConfiguration();
@@ -59,8 +58,9 @@ public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
String topic = config.get(TOPIC).get(0);
boolean logFailuresOnly = config.get(LOG_FAILURES_ONLY);
final Properties properties = getKafkaProperties(context.getOptions());
+ Map<String, String> headers = getKafkaHeaders(context.getOptions());
- return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, logFailuresOnly, getRateLimitingStrategy(config));
+ return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, headers, logFailuresOnly, getRateLimitingStrategy(config));
}
@Override
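With the validateExcept change above, a sink definition can now carry arbitrary "headers." options without failing validation. A hypothetical options map as this factory would receive it (key names beyond the two prefixes are assumptions, not confirmed by this change):

Map<String, String> options = new HashMap<>();
options.put("topic", "events");                        // hypothetical key for the TOPIC option
options.put("kafka.bootstrap.servers", "broker:9092"); // forwarded to the producer
options.put("headers.pipeline", "groot");              // becomes a record header
options.put("headers.env", "prod");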
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
index 09e8190..3b7e0c5 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
@@ -61,12 +61,15 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.BlockingDeque;
@@ -257,6 +260,8 @@ public class GrootFlinkKafkaProducer<IN>
private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
private RateLimitingStrategy rateLimitingStrategy;
private boolean rateLimitingEnable;
+ private Map<String, String> headers;
+ private Header[] recordHeaders;
private transient InternalMetrics internalMetrics;
@@ -769,6 +774,10 @@ public class GrootFlinkKafkaProducer<IN>
this.rateLimitingEnable = rateLimitingStrategy != null && rateLimitingStrategy.rateLimited();
}
+ public void setHeaders(Map<String, String> headers) {
+ this.headers = headers;
+ }
+
/**
* Disables the propagation of exceptions thrown when committing presumably timed out Kafka
* transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
@@ -796,6 +805,17 @@ public class GrootFlinkKafkaProducer<IN>
rateLimitingStrategy = rateLimitingStrategy.withMaxRate(subTaskMaxRate);
LOG.error("rateLimitingStrategy: {}", rateLimitingStrategy);
}
+ if (headers != null && !headers.isEmpty()) {
+ recordHeaders = new Header[headers.size()];
+ int i = 0;
+ for (Map.Entry<String, String> entry : headers.entrySet()) {
+ recordHeaders[i++] = new RecordHeader(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
+ }
+ checkState(i == headers.size());
+ } else {
+ recordHeaders = new Header[0];
+ }
+
internalMetrics = new InternalMetrics(getRuntimeContext());
if (logFailuresOnly) {
callback =
@@ -929,6 +949,9 @@ public class GrootFlinkKafkaProducer<IN>
record = kafkaSchema.serialize(next, context.timestamp());
}
+ for (int i = 0; i < recordHeaders.length; i++) {
+ record.headers().add(recordHeaders[i]);
+ }
if (!rateLimitingEnable) {
pendingRecords.incrementAndGet();
transaction.producer.send(record, callback);
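For reference, a self-contained sketch of the send path above: the header map is materialized once (in open()) as a Header[] and appended to every outgoing ProducerRecord. Topic and payload below are illustrative.

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.charset.StandardCharsets;

public class RecordHeaderDemo {
    public static void main(String[] args) {
        Header[] recordHeaders = {
            new RecordHeader("pipeline", "groot".getBytes(StandardCharsets.UTF_8))
        };
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
            "events", null, "payload".getBytes(StandardCharsets.UTF_8));
        for (Header h : recordHeaders) {
            record.headers().add(h); // headers stay mutable until the record is handed to send()
        }
        System.out.println(record.headers());
    }
}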
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
index 5101fa1..0a36100 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
@@ -165,7 +165,8 @@ public class FakerUtils {
private static Faker<?> parseSequenceFaker(JSONObject obj) {
long start = obj.getLongValue("start", 0L);
long step = obj.getLongValue("step", 1L);
- return new SequenceFaker(start, step);
+ int batch = obj.getIntValue("batch", 1);
+ return new SequenceFaker(start, step, batch);
}
private static Faker<?> parseStringFaker(JSONObject obj) {
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
index 0005234..867f138 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
@@ -3,22 +3,34 @@ package com.geedgenetworks.connectors.mock.faker;
public class SequenceFaker extends Faker<Long> {
private final long start;
private final long step;
+ private final int batch;
private long value;
+ private int cnt;
public SequenceFaker(long start) {
- this(start, 1);
+ this(start, 1, 1);
}
public SequenceFaker(long start, long step) {
+ this(start, step, 1);
+ }
+
+ public SequenceFaker(long start, long step, int batch) {
this.start = start;
this.step = step;
+ this.batch = batch;
this.value = start;
+ this.cnt = 0;
}
@Override
public Long geneValue() throws Exception {
Long rst = value;
- value += step;
+ cnt++;
+ if(cnt == batch){
+ cnt = 0;
+ value += step;
+ }
return rst;
}
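The batch parameter added above makes each sequence value repeat batch times before advancing by step; batch = 1 preserves the old behavior. A quick usage sketch:

public static void main(String[] args) throws Exception {
    SequenceFaker faker = new SequenceFaker(100, 10, 3);
    for (int i = 0; i < 7; i++) {
        System.out.print(faker.geneValue() + " "); // 100 100 100 110 110 110 120
    }
}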
diff --git a/groot-connectors/connector-starrocks/pom.xml b/groot-connectors/connector-starrocks/pom.xml
new file mode 100644
index 0000000..095ee6d
--- /dev/null
+++ b/groot-connectors/connector-starrocks/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-connectors</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-starrocks</artifactId>
+ <name>Groot : Connectors : StarRocks</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.starrocks</groupId>
+ <artifactId>flink-connector-starrocks</artifactId>
+ <version>1.2.9_flink-1.13_2.12</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
new file mode 100644
index 0000000..fc41481
--- /dev/null
+++ b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
@@ -0,0 +1,85 @@
+package com.geedgenetworks.connectors.starrocks;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.sink.SinkProvider;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.SinkTableFactory;
+import com.starrocks.connector.flink.table.sink.EventStarRocksDynamicSinkFunctionV2;
+import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class StarRocksTableFactory implements SinkTableFactory {
+ public static final String IDENTIFIER = "starrocks";
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SinkProvider getSinkProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(CONNECTION_INFO_PREFIX);
+
+ final boolean logFailuresOnly = context.getConfiguration().get(LOG_FAILURES_ONLY);
+ StarRocksSinkOptions.Builder builder = StarRocksSinkOptions.builder();
+ context.getOptions().forEach((key, value) -> {
+ if(key.startsWith(CONNECTION_INFO_PREFIX)){
+ builder.withProperty(key.substring(CONNECTION_INFO_PREFIX.length()), value);
+ }
+ });
+ builder.withProperty("sink.properties.format", "json");
+ final StarRocksSinkOptions options = builder.build();
+ SinkFunctionFactory.detectStarRocksFeature(options);
+ Preconditions.checkArgument(options.isSupportTransactionStreamLoad());
+ final SinkFunction<Event> sinkFunction = new EventStarRocksDynamicSinkFunctionV2(options, logFailuresOnly);
+ return new SinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
+ /*DataStream<String> ds = dataStream.flatMap(new FlatMapFunction<Event, String>() {
+ @Override
+ public void flatMap(Event value, Collector<String> out) throws Exception {
+ try {
+ out.collect(JSON.toJSONString(value.getExtractedFields()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ SinkFunction<String> sink = StarRocksSink.sink(options);
+ return ds.addSink(sink);
+ */
+ return dataStream.addSink(sinkFunction);
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(LOG_FAILURES_ONLY);
+ return options;
+ }
+
+ public static final String CONNECTION_INFO_PREFIX = "connection.";
+
+ public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+ ConfigOptions.key("log.failures.only")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Optional flag controlling whether the sink should fail on errors or only log them;\n"
+ + "if set to true, exceptions are only logged; if set to false, exceptions are eventually rethrown. Defaults to true.");
+}
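A hypothetical configuration for this factory: everything under "connection." is forwarded verbatim to the StarRocksSinkOptions builder, while "sink.properties.format" is forced to json by the factory itself. The property names below are common flink-connector-starrocks options, assumed rather than confirmed by this change:

Map<String, String> options = new HashMap<>();
options.put("connection.jdbc-url", "jdbc:mysql://fe-host:9030");
options.put("connection.load-url", "fe-host:8030");
options.put("connection.database-name", "groot_db");
options.put("connection.table-name", "events");
options.put("connection.username", "user");
options.put("connection.password", "secret");
options.put("log.failures.only", "true"); // the factory's own LOG_FAILURES_ONLY option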
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
new file mode 100644
index 0000000..71a9467
--- /dev/null
+++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
@@ -0,0 +1,318 @@
+package com.starrocks.connector.flink.table.sink;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
+import com.starrocks.connector.flink.tools.EnvUtils;
+import com.starrocks.connector.flink.tools.JsonWrapper;
+import com.starrocks.data.load.stream.LabelGeneratorFactory;
+import com.starrocks.data.load.stream.StreamLoadSnapshot;
+import com.starrocks.data.load.stream.properties.StreamLoadProperties;
+import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class EventStarRocksDynamicSinkFunctionV2 extends StarRocksDynamicSinkFunctionBase<Event> {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger log = LoggerFactory.getLogger(EventStarRocksDynamicSinkFunctionV2.class);
+
+ private static final int NESTED_ROW_DATA_HEADER_SIZE = 256;
+
+ private final StarRocksSinkOptions sinkOptions;
+ private final boolean logFailuresOnly;
+ private final StreamLoadProperties properties;
+ private StreamLoadManagerV2 sinkManager;
+
+ private transient volatile ListState<StarrocksSnapshotState> snapshotStates;
+
+ private transient long restoredCheckpointId;
+
+ private transient List<ExactlyOnceLabelGeneratorSnapshot> restoredGeneratorSnapshots;
+
+ private transient Map<Long, List<StreamLoadSnapshot>> snapshotMap;
+
+ private transient StarRocksStreamLoadListener streamLoadListener;
+
+ // Only valid when using exactly-once and label prefix is set
+ @Nullable
+ private transient ExactlyOnceLabelGeneratorFactory exactlyOnceLabelFactory;
+
+ @Deprecated
+ private transient ListState<Map<String, StarRocksSinkBufferEntity>> legacyState;
+ @Deprecated
+ private transient List<StarRocksSinkBufferEntity> legacyData;
+ private transient JsonWrapper jsonWrapper;
+ private transient InternalMetrics internalMetrics;
+
+ public EventStarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions, boolean logFailuresOnly) {
+ Preconditions.checkArgument(sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+ this.sinkOptions = sinkOptions;
+ this.logFailuresOnly = logFailuresOnly;
+ this.properties = sinkOptions.getProperties(null);
+ this.sinkManager = new StreamLoadManagerV2(sinkOptions.getProperties(null),
+ sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+ }
+
+ @Override
+ public void invoke(Event value, Context context) throws Exception {
+ internalMetrics.incrementInEvents(1);
+ String json;
+ try {
+ json = JSON.toJSONString(value.getExtractedFields());
+ } catch (Exception e) {
+ internalMetrics.incrementErrorEvents(1);
+ log.error("json convert error", e);
+ return;
+ }
+ try {
+ sinkManager.write(null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), json);
+ } catch (Exception e) {
+ internalMetrics.incrementErrorEvents(1);
+ if (logFailuresOnly) {
+ log.error("write error", e);
+ resetSinkManager();
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ private void resetSinkManager(){
+ try {
+ StreamLoadSnapshot snapshot = sinkManager.snapshot();
+ sinkManager.abort(snapshot);
+ } catch (Exception ex) {
+ log.error("write error", ex);
+ }
+ sinkManager.close();
+
+ this.sinkManager = new StreamLoadManagerV2(this.properties,
+ sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+ sinkManager.setStreamLoadListener(streamLoadListener);
+
+ LabelGeneratorFactory labelGeneratorFactory;
+ String labelPrefix = sinkOptions.getLabelPrefix();
+ if (labelPrefix == null ||
+ sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE ||
+ !sinkOptions.isEnableExactlyOnceLabelGen()) {
+ labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(
+ labelPrefix == null ? "flink" : labelPrefix);
+ } else {
+ labelGeneratorFactory = exactlyOnceLabelFactory;
+ }
+ sinkManager.setLabelGeneratorFactory(labelGeneratorFactory);
+
+ sinkManager.init();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ internalMetrics = new InternalMetrics(getRuntimeContext());
+ this.streamLoadListener = new EventStreamLoadListener(getRuntimeContext(), sinkOptions, internalMetrics);
+ sinkManager.setStreamLoadListener(streamLoadListener);
+
+ LabelGeneratorFactory labelGeneratorFactory;
+ String labelPrefix = sinkOptions.getLabelPrefix();
+ if (labelPrefix == null ||
+ sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE ||
+ !sinkOptions.isEnableExactlyOnceLabelGen()) {
+ labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(
+ labelPrefix == null ? "flink" : labelPrefix);
+ } else {
+ this.exactlyOnceLabelFactory = new ExactlyOnceLabelGeneratorFactory(
+ labelPrefix,
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredCheckpointId);
+ exactlyOnceLabelFactory.restore(restoredGeneratorSnapshots);
+ labelGeneratorFactory = exactlyOnceLabelFactory;
+ }
+ sinkManager.setLabelGeneratorFactory(labelGeneratorFactory);
+
+ sinkManager.init();
+
+ if (sinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE) {
+ openForExactlyOnce();
+ }
+
+ log.info("Open sink function v2. {}", EnvUtils.getGitInformation());
+ }
+
+ private void openForExactlyOnce() throws Exception {
+ if (sinkOptions.isAbortLingeringTxns()) {
+ LingeringTransactionAborter aborter = new LingeringTransactionAborter(
+ sinkOptions.getLabelPrefix(),
+ restoredCheckpointId,
+ getRuntimeContext().getIndexOfThisSubtask(),
+ sinkOptions.getAbortCheckNumTxns(),
+ sinkOptions.getDbTables(),
+ restoredGeneratorSnapshots,
+ sinkManager.getStreamLoader());
+ aborter.execute();
+ }
+
+ notifyCheckpointComplete(Long.MAX_VALUE);
+ }
+
+ private JsonWrapper getOrCreateJsonWrapper() {
+ if (jsonWrapper == null) {
+ this.jsonWrapper = new JsonWrapper();
+ }
+
+ return jsonWrapper;
+ }
+
+ public void finish() {
+ sinkManager.flush();
+ }
+
+ @Override
+ public void close() {
+ log.info("Close sink function");
+ try {
+ sinkManager.flush();
+ } catch (Exception e) {
+ log.error("Failed to flush when closing", e);
+ throw e;
+ } finally {
+ StreamLoadSnapshot snapshot = sinkManager.snapshot();
+ sinkManager.abort(snapshot);
+ sinkManager.close();
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ sinkManager.flush();
+ if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+ return;
+ }
+
+ StreamLoadSnapshot snapshot = sinkManager.snapshot();
+
+ if (sinkManager.prepare(snapshot)) {
+ snapshotMap.put(functionSnapshotContext.getCheckpointId(), Collections.singletonList(snapshot));
+
+ snapshotStates.clear();
+ List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots = exactlyOnceLabelFactory == null ? null
+ : exactlyOnceLabelFactory.snapshot(functionSnapshotContext.getCheckpointId());
+ snapshotStates.add(StarrocksSnapshotState.of(snapshotMap, labelSnapshots));
+ } else {
+ sinkManager.abort(snapshot);
+ throw new RuntimeException("Snapshot state failed by prepare");
+ }
+
+ if (legacyState != null) {
+ legacyState.clear();
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+ log.info("Initialize state");
+ if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+ return;
+ }
+
+ ListStateDescriptor<byte[]> descriptor =
+ new ListStateDescriptor<>(
+ "starrocks-sink-transaction",
+ TypeInformation.of(new TypeHint<byte[]>() {})
+ );
+
+ ListState<byte[]> listState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);
+ snapshotStates = new SimpleVersionedListState<>(listState, new StarRocksVersionedSerializer(getOrCreateJsonWrapper()));
+
+ // old version
+ ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> legacyDescriptor =
+ new ListStateDescriptor<>(
+ "buffered-rows",
+ TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>(){})
+ );
+ legacyState = functionInitializationContext.getOperatorStateStore().getListState(legacyDescriptor);
+ this.restoredCheckpointId = 0;
+ this.restoredGeneratorSnapshots = new ArrayList<>();
+ this.snapshotMap = new ConcurrentHashMap<>();
+ if (functionInitializationContext.isRestored()) {
+ for (StarrocksSnapshotState state : snapshotStates.get()) {
+ for (Map.Entry<Long, List<StreamLoadSnapshot>> entry : state.getData().entrySet()) {
+ snapshotMap.compute(entry.getKey(), (k, v) -> {
+ if (v == null) {
+ return new ArrayList<>(entry.getValue());
+ }
+ v.addAll(entry.getValue());
+ return v;
+ });
+ }
+
+ if (state.getLabelSnapshots() != null) {
+ List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots = state.getLabelSnapshots();
+ restoredGeneratorSnapshots.addAll(labelSnapshots);
+ long checkpointId = labelSnapshots.isEmpty() ? -1 : labelSnapshots.get(0).getCheckpointId();
+ restoredCheckpointId = Math.max(restoredCheckpointId, checkpointId);
+ }
+ }
+
+ legacyData = new ArrayList<>();
+ for (Map<String, StarRocksSinkBufferEntity> entry : legacyState.get()) {
+ legacyData.addAll(entry.values());
+ }
+ log.info("There are {} items from legacy state", legacyData.size());
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+ return;
+ }
+
+ boolean succeed = true;
+ List<Long> commitCheckpointIds = snapshotMap.keySet().stream()
+ .filter(cpId -> cpId <= checkpointId)
+ .sorted(Long::compare)
+ .collect(Collectors.toList());
+
+ for (Long cpId : commitCheckpointIds) {
+ try {
+ for (StreamLoadSnapshot snapshot : snapshotMap.get(cpId)) {
+ if (!sinkManager.commit(snapshot)) {
+ succeed = false;
+ break;
+ }
+ }
+
+ if (!succeed) {
+ throw new RuntimeException(String.format("Failed to commit some transactions for snapshot %s, " +
+ "please check taskmanager logs for details", cpId));
+ }
+ } catch (Exception e) {
+ log.error("Failed to notify checkpoint complete, checkpoint id : {}", checkpointId, e);
+ throw new RuntimeException("Failed to notify checkpoint complete for checkpoint id " + checkpointId, e);
+ }
+
+ snapshotMap.remove(cpId);
+ }
+
+ // set legacyState to null to avoid clearing it in a later snapshotState
+ legacyState = null;
+ }
+
+}
\ No newline at end of file
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java
new file mode 100644
index 0000000..337109b
--- /dev/null
+++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java
@@ -0,0 +1,28 @@
+package com.starrocks.connector.flink.table.sink;
+
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
+import com.starrocks.data.load.stream.StreamLoadResponse;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+public class EventStreamLoadListener extends StarRocksStreamLoadListener {
+ private transient InternalMetrics internalMetrics;
+ public EventStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions, InternalMetrics internalMetrics) {
+ super(context, sinkOptions);
+ this.internalMetrics = internalMetrics;
+ }
+
+ @Override
+ public void flushSucceedRecord(StreamLoadResponse response) {
+ super.flushSucceedRecord(response);
+ if (response.getFlushRows() != null) {
+ internalMetrics.incrementOutEvents(response.getFlushRows());
+ }
+ }
+
+ @Override
+ public void flushFailedRecord() {
+ super.flushFailedRecord();
+ internalMetrics.incrementErrorEvents(1);
+ }
+}
diff --git a/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..c04c5dc
--- /dev/null
+++ b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.connectors.starrocks.StarRocksTableFactory
diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml
index 1747fb3..cf5381c 100644
--- a/groot-connectors/pom.xml
+++ b/groot-connectors/pom.xml
@@ -17,6 +17,7 @@
<module>connector-ipfix-collector</module>
<module>connector-file</module>
<module>connector-mock</module>
+ <module>connector-starrocks</module>
</modules>
<dependencies>
<dependency>