| author | 李奉超 <[email protected]> | 2024-10-11 05:57:55 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-10-11 05:57:55 +0000 |
| commit | 2e724cd350ae6d3009e49f81eba71f74c6aec810 (patch) | |
| tree | d179091f898cdda427dbb145e119dc9a6c4ebe1f | |
| parent | 13c046f56014ac7496c1d6ec07029756bb3cc287 (diff) | |
| parent | 52e849290b05b81a7ede56bb1e388e0ec2bf9d17 (diff) | |
Merge branch 'connector-starrocks' into 'develop'
GAL-660 Support StarRocks Sink
See merge request galaxy/platform/groot-stream!113
10 files changed, 592 insertions, 0 deletions
diff --git a/docs/connector/sink/starrocks.md b/docs/connector/sink/starrocks.md new file mode 100644 index 0000000..f07e432 --- /dev/null +++ b/docs/connector/sink/starrocks.md @@ -0,0 +1,83 @@
# StarRocks

> StarRocks sink connector
>
> ## Description
>
> Sink connector for StarRocks; learn more at https://docs.starrocks.io/zh/docs/loading/Flink-connector-starrocks/.

## Sink Options

Custom properties of the StarRocks sink. Properties that belong to the StarRocks Flink connector configuration can be set with the `connection.` prefix.

| Name                | Type    | Required | Default | Description |
|---------------------|---------|----------|---------|-------------|
| log.failures.only   | Boolean | No       | true    | Whether the sink should only log errors instead of failing on them. If set to true, exceptions are only logged; if set to false, exceptions are eventually thrown. Defaults to true. |
| connection.jdbc-url | String  | Yes      | (none)  | The address used to connect to the MySQL server of the FE. You can specify multiple addresses, separated by commas (,). Format: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>. |
| connection.load-url | String  | Yes      | (none)  | The address used to connect to the HTTP server of the FE. You can specify multiple addresses, separated by semicolons (;). Format: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>. |
| connection.config   | Map     | No       | (none)  | StarRocks Flink connector options; learn more at https://docs.starrocks.io/docs/loading/Flink-connector-starrocks/#options. |

## Example

This example reads data from an inline test source and writes it to the StarRocks table `test`.

```yaml
sources: # [object] Define connector sources
  inline_source:
    type: inline
    schema:
      fields: # [array of object] Schema field projection; supports reading data only from the specified fields.
        - name: log_id
          type: bigint
        - name: recv_time
          type: bigint
        - name: server_fqdn
          type: string
        - name: server_domain
          type: string
        - name: client_ip
          type: string
        - name: server_ip
          type: string
        - name: server_asn
          type: string
        - name: decoded_as
          type: string
        - name: device_group
          type: string
        - name: device_tag
          type: string
    properties:
      #
      # [string] Event data; it will be parsed to Map<String, Object> by the specified format.
      #
      data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
      format: json
      json.ignore.parse.errors: false

sinks:
  starrocks_sink:
    type: starrocks
    properties:
      "log.failures.only": false
      "connection.jdbc-url": "jdbc:mysql://192.168.40.222:9030"
      "connection.load-url": "192.168.40.222:8030"
      "connection.database-name": "test"
      "connection.table-name": "test"
      "connection.username": "root"
      "connection.password": ""
      "connection.sink.buffer-flush.interval-ms": "30000"

application: # [object] Define job configuration
  env:
    name: groot-stream-job-inline-to-starrocks
    parallelism: 3
    pipeline:
      object-reuse: true
  topology:
    - name: inline_source
      downstream: [ starrocks_sink ]
    - name: starrocks_sink
      downstream: [ ]
```
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index ab68e08..2da2b11 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -66,6 +66,13 @@
         <dependency>
             <groupId>com.geedgenetworks</groupId>
+            <artifactId>connector-starrocks</artifactId>
+            <version>${revision}</version>
+            <scope>${scope}</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
             <artifactId>format-json</artifactId>
             <version>${revision}</version>
             <scope>${scope}</scope>
diff --git a/groot-connectors/connector-starrocks/pom.xml b/groot-connectors/connector-starrocks/pom.xml new file mode 100644 index 0000000..095ee6d --- /dev/null +++ b/groot-connectors/connector-starrocks/pom.xml @@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.geedgenetworks</groupId>
+        <artifactId>groot-connectors</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-starrocks</artifactId>
+    <name>Groot : Connectors : StarRocks</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.starrocks</groupId>
+            <artifactId>flink-connector-starrocks</artifactId>
+            <version>1.2.9_flink-1.13_2.12</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java new file mode 100644 index 0000000..fc41481 --- /dev/null +++ b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java @@ -0,0 +1,85 @@
+package com.geedgenetworks.connectors.starrocks;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.sink.SinkProvider;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.SinkTableFactory;
+import com.starrocks.connector.flink.table.sink.EventStarRocksDynamicSinkFunctionV2;
+import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class StarRocksTableFactory implements SinkTableFactory {
+ public static final String IDENTIFIER = "starrocks";
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SinkProvider getSinkProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(CONNECTION_INFO_PREFIX);
+
+ final boolean logFailuresOnly = context.getConfiguration().get(LOG_FAILURES_ONLY);
+ StarRocksSinkOptions.Builder builder = StarRocksSinkOptions.builder();
+ context.getOptions().forEach((key, value) -> {
+ if(key.startsWith(CONNECTION_INFO_PREFIX)){
+ builder.withProperty(key.substring(CONNECTION_INFO_PREFIX.length()), value);
+ }
+ });
+ builder.withProperty("sink.properties.format", "json");
+ final StarRocksSinkOptions options = builder.build();
+ SinkFunctionFactory.detectStarRocksFeature(options);
+ Preconditions.checkArgument(options.isSupportTransactionStreamLoad());
+ final SinkFunction<Event> sinkFunction = new EventStarRocksDynamicSinkFunctionV2(options, logFailuresOnly);
+ return new SinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
+ /*DataStream<String> ds = dataStream.flatMap(new FlatMapFunction<Event, String>() {
+ @Override
+ public void flatMap(Event value, Collector<String> out) throws Exception {
+ try {
+ out.collect(JSON.toJSONString(value.getExtractedFields()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ SinkFunction<String> sink = StarRocksSink.sink(options);
+ return ds.addSink(sink);
+ */
+ return dataStream.addSink(sinkFunction);
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(LOG_FAILURES_ONLY);
+ return options;
+ }
+
+ public static final String CONNECTION_INFO_PREFIX = "connection.";
+
+ public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+ ConfigOptions.key("log.failures.only")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Optional flag controlling whether the sink should fail on errors or only log them;\n"
+ + "if set to true, exceptions are only logged; if set to false, exceptions are eventually thrown; true by default.");
+}
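A quick note on the option handling in `getSinkProvider` above: every user property carrying the `connection.` prefix is forwarded to the StarRocks connector with the prefix stripped, and the stream-load payload format is always forced to JSON. The stand-alone sketch below illustrates just that key rewriting; plain maps stand in for `StarRocksSinkOptions.Builder`, and the class name and sample addresses are hypothetical.

```java
// Minimal sketch of the "connection." prefix handling used by StarRocksTableFactory.
// Plain maps stand in for StarRocksSinkOptions.Builder; only the key rewriting is shown.
import java.util.LinkedHashMap;
import java.util.Map;

public class ConnectionPrefixSketch {
    static final String CONNECTION_INFO_PREFIX = "connection.";

    static Map<String, String> toConnectorProperties(Map<String, String> userOptions) {
        Map<String, String> connectorProps = new LinkedHashMap<>();
        userOptions.forEach((key, value) -> {
            if (key.startsWith(CONNECTION_INFO_PREFIX)) {
                // "connection.jdbc-url" becomes "jdbc-url", and so on.
                connectorProps.put(key.substring(CONNECTION_INFO_PREFIX.length()), value);
            }
        });
        // The factory always forces JSON payloads for stream load.
        connectorProps.put("sink.properties.format", "json");
        return connectorProps;
    }

    public static void main(String[] args) {
        Map<String, String> userOptions = new LinkedHashMap<>();
        userOptions.put("log.failures.only", "false");                       // consumed by the factory itself, not forwarded
        userOptions.put("connection.jdbc-url", "jdbc:mysql://fe-host:9030"); // hypothetical FE address
        userOptions.put("connection.load-url", "fe-host:8030");              // hypothetical FE address
        System.out.println(toConnectorProperties(userOptions));
        // {jdbc-url=jdbc:mysql://fe-host:9030, load-url=fe-host:8030, sink.properties.format=json}
    }
}
```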
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java new file mode 100644 index 0000000..71a9467 --- /dev/null +++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java @@ -0,0 +1,318 @@ +package com.starrocks.connector.flink.table.sink; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.metrics.InternalMetrics; +import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity; +import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener; +import com.starrocks.connector.flink.tools.EnvUtils; +import com.starrocks.connector.flink.tools.JsonWrapper; +import com.starrocks.data.load.stream.LabelGeneratorFactory; +import com.starrocks.data.load.stream.StreamLoadSnapshot; +import com.starrocks.data.load.stream.properties.StreamLoadProperties; +import com.starrocks.data.load.stream.v2.StreamLoadManagerV2; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class EventStarRocksDynamicSinkFunctionV2 extends StarRocksDynamicSinkFunctionBase<Event> { + + private static final long serialVersionUID = 1L; + private static final Logger log = LoggerFactory.getLogger(StarRocksDynamicSinkFunctionV2.class); + + private static final int NESTED_ROW_DATA_HEADER_SIZE = 256; + + private final StarRocksSinkOptions sinkOptions; + private final boolean logFailuresOnly; + private final StreamLoadProperties properties; + private StreamLoadManagerV2 sinkManager; + + private transient volatile ListState<StarrocksSnapshotState> snapshotStates; + + private transient long restoredCheckpointId; + + private transient List<ExactlyOnceLabelGeneratorSnapshot> restoredGeneratorSnapshots; + + private transient Map<Long, List<StreamLoadSnapshot>> snapshotMap; + + private transient StarRocksStreamLoadListener streamLoadListener; + + // Only valid when using exactly-once and label prefix is set + @Nullable + private transient ExactlyOnceLabelGeneratorFactory exactlyOnceLabelFactory; + + @Deprecated + private transient ListState<Map<String, StarRocksSinkBufferEntity>> legacyState; + @Deprecated + private transient List<StarRocksSinkBufferEntity> legacyData; + private transient JsonWrapper jsonWrapper; + private transient InternalMetrics internalMetrics; + + public EventStarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions, boolean logFailuresOnly) { + Preconditions.checkArgument(sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE); + this.sinkOptions = sinkOptions; + this.logFailuresOnly = logFailuresOnly; + this.properties = 
sinkOptions.getProperties(null); + this.sinkManager = new StreamLoadManagerV2(sinkOptions.getProperties(null), + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE); + } + + @Override + public void invoke(Event value, Context context) throws Exception { + internalMetrics.incrementInEvents(1); + String json; + try { + json = JSON.toJSONString(value.getExtractedFields()); + } catch (Exception e) { + internalMetrics.incrementErrorEvents(1); + log.error("json convert error", e); + return; + } + try { + sinkManager.write(null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), json); + } catch (Exception e) { + internalMetrics.incrementErrorEvents(1); + if (logFailuresOnly) { + log.error("write error", e); + resetSinkManager(); + } else { + throw e; + } + } + } + + private void resetSinkManager(){ + try { + StreamLoadSnapshot snapshot = sinkManager.snapshot(); + sinkManager.abort(snapshot); + } catch (Exception ex) { + log.error("write error", ex); + } + sinkManager.close(); + + this.sinkManager = new StreamLoadManagerV2(this.properties, + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE); + sinkManager.setStreamLoadListener(streamLoadListener); + + LabelGeneratorFactory labelGeneratorFactory; + String labelPrefix = sinkOptions.getLabelPrefix(); + if (labelPrefix == null || + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE || + !sinkOptions.isEnableExactlyOnceLabelGen()) { + labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory( + labelPrefix == null ? "flink" : labelPrefix); + } else { + labelGeneratorFactory = exactlyOnceLabelFactory; + } + sinkManager.setLabelGeneratorFactory(labelGeneratorFactory); + + sinkManager.init(); + } + + @Override + public void open(Configuration parameters) throws Exception { + internalMetrics = new InternalMetrics(getRuntimeContext()); + this.streamLoadListener = new EventStreamLoadListener(getRuntimeContext(), sinkOptions, internalMetrics); + sinkManager.setStreamLoadListener(streamLoadListener); + + LabelGeneratorFactory labelGeneratorFactory; + String labelPrefix = sinkOptions.getLabelPrefix(); + if (labelPrefix == null || + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE || + !sinkOptions.isEnableExactlyOnceLabelGen()) { + labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory( + labelPrefix == null ? "flink" : labelPrefix); + } else { + this.exactlyOnceLabelFactory = new ExactlyOnceLabelGeneratorFactory( + labelPrefix, + getRuntimeContext().getNumberOfParallelSubtasks(), + getRuntimeContext().getIndexOfThisSubtask(), + restoredCheckpointId); + exactlyOnceLabelFactory.restore(restoredGeneratorSnapshots); + labelGeneratorFactory = exactlyOnceLabelFactory; + } + sinkManager.setLabelGeneratorFactory(labelGeneratorFactory); + + sinkManager.init(); + + if (sinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE) { + openForExactlyOnce(); + } + + log.info("Open sink function v2. 
{}", EnvUtils.getGitInformation()); + } + + private void openForExactlyOnce() throws Exception { + if (sinkOptions.isAbortLingeringTxns()) { + LingeringTransactionAborter aborter = new LingeringTransactionAborter( + sinkOptions.getLabelPrefix(), + restoredCheckpointId, + getRuntimeContext().getIndexOfThisSubtask(), + sinkOptions.getAbortCheckNumTxns(), + sinkOptions.getDbTables(), + restoredGeneratorSnapshots, + sinkManager.getStreamLoader()); + aborter.execute(); + } + + notifyCheckpointComplete(Long.MAX_VALUE); + } + + private JsonWrapper getOrCreateJsonWrapper() { + if (jsonWrapper == null) { + this.jsonWrapper = new JsonWrapper(); + } + + return jsonWrapper; + } + + public void finish() { + sinkManager.flush(); + } + + @Override + public void close() { + log.info("Close sink function"); + try { + sinkManager.flush(); + } catch (Exception e) { + log.error("Failed to flush when closing", e); + throw e; + } finally { + StreamLoadSnapshot snapshot = sinkManager.snapshot(); + sinkManager.abort(snapshot); + sinkManager.close(); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + sinkManager.flush(); + if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) { + return; + } + + StreamLoadSnapshot snapshot = sinkManager.snapshot(); + + if (sinkManager.prepare(snapshot)) { + snapshotMap.put(functionSnapshotContext.getCheckpointId(), Collections.singletonList(snapshot)); + + snapshotStates.clear(); + List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots = exactlyOnceLabelFactory == null ? null + : exactlyOnceLabelFactory.snapshot(functionSnapshotContext.getCheckpointId()); + snapshotStates.add(StarrocksSnapshotState.of(snapshotMap, labelSnapshots)); + } else { + sinkManager.abort(snapshot); + throw new RuntimeException("Snapshot state failed by prepare"); + } + + if (legacyState != null) { + legacyState.clear(); + } + } + + @Override + public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { + log.info("Initialize state"); + if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) { + return; + } + + ListStateDescriptor<byte[]> descriptor = + new ListStateDescriptor<>( + "starrocks-sink-transaction", + TypeInformation.of(new TypeHint<byte[]>() {}) + ); + + ListState<byte[]> listState = functionInitializationContext.getOperatorStateStore().getListState(descriptor); + snapshotStates = new SimpleVersionedListState<>(listState, new StarRocksVersionedSerializer(getOrCreateJsonWrapper())); + + // old version + ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> legacyDescriptor = + new ListStateDescriptor<>( + "buffered-rows", + TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>(){}) + ); + legacyState = functionInitializationContext.getOperatorStateStore().getListState(legacyDescriptor); + this.restoredCheckpointId = 0; + this.restoredGeneratorSnapshots = new ArrayList<>(); + this.snapshotMap = new ConcurrentHashMap<>(); + if (functionInitializationContext.isRestored()) { + for (StarrocksSnapshotState state : snapshotStates.get()) { + for (Map.Entry<Long, List<StreamLoadSnapshot>> entry : state.getData().entrySet()) { + snapshotMap.compute(entry.getKey(), (k, v) -> { + if (v == null) { + return new ArrayList<>(entry.getValue()); + } + v.addAll(entry.getValue()); + return v; + }); + } + + if (state.getLabelSnapshots() != null) { + List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots = state.getLabelSnapshots(); + 
restoredGeneratorSnapshots.addAll(labelSnapshots); + long checkpointId = labelSnapshots.isEmpty() ? -1 : labelSnapshots.get(0).getCheckpointId(); + restoredCheckpointId = Math.max(restoredCheckpointId, checkpointId); + } + } + + legacyData = new ArrayList<>(); + for (Map<String, StarRocksSinkBufferEntity> entry : legacyState.get()) { + legacyData.addAll(entry.values()); + } + log.info("There are {} items from legacy state", legacyData.size()); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) { + return; + } + + boolean succeed = true; + List<Long> commitCheckpointIds = snapshotMap.keySet().stream() + .filter(cpId -> cpId <= checkpointId) + .sorted(Long::compare) + .collect(Collectors.toList()); + + for (Long cpId : commitCheckpointIds) { + try { + for (StreamLoadSnapshot snapshot : snapshotMap.get(cpId)) { + if (!sinkManager.commit(snapshot)) { + succeed = false; + break; + } + } + + if (!succeed) { + throw new RuntimeException(String.format("Failed to commit some transactions for snapshot %s, " + + "please check taskmanager logs for details", cpId)); + } + } catch (Exception e) { + log.error("Failed to notify checkpoint complete, checkpoint id : {}", checkpointId, e); + throw new RuntimeException("Failed to notify checkpoint complete for checkpoint id " + checkpointId, e); + } + + snapshotMap.remove(cpId); + } + + // set legacyState to null to avoid clear it in latter snapshotState + legacyState = null; + } + +}
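The heart of `invoke()` in the sink function above is a single conversion: the event's extracted fields are serialized with fastjson2 and the resulting JSON row is handed to the stream-load manager. A minimal stand-alone sketch of that step, with a plain map standing in for `Event.getExtractedFields()` (the field values are borrowed from the inline-source example in the docs and are purely illustrative):

```java
// Sketch of the Event-to-JSON step performed in invoke(): the extracted-fields map
// is serialized with fastjson2, and the resulting row is what gets stream-loaded
// into StarRocks. The map below stands in for Event.getExtractedFields().
import com.alibaba.fastjson2.JSON;

import java.util.LinkedHashMap;
import java.util.Map;

public class EventToJsonSketch {
    public static void main(String[] args) {
        Map<String, Object> extractedFields = new LinkedHashMap<>();
        extractedFields.put("log_id", 206211012872372224L);
        extractedFields.put("recv_time", 1705565615L);
        extractedFields.put("server_fqdn", "www.ct.cn");

        String row = JSON.toJSONString(extractedFields);
        System.out.println(row);
        // e.g. {"log_id":206211012872372224,"recv_time":1705565615,"server_fqdn":"www.ct.cn"}
    }
}
```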
\ No newline at end of file
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java new file mode 100644 index 0000000..337109b --- /dev/null +++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java @@ -0,0 +1,28 @@
+package com.starrocks.connector.flink.table.sink;
+
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
+import com.starrocks.data.load.stream.StreamLoadResponse;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+public class EventStreamLoadListener extends StarRocksStreamLoadListener {
+ private transient InternalMetrics internalMetrics;
+ public EventStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions, InternalMetrics internalMetrics) {
+ super(context, sinkOptions);
+ this.internalMetrics = internalMetrics;
+ }
+
+ @Override
+ public void flushSucceedRecord(StreamLoadResponse response) {
+ super.flushSucceedRecord(response);
+ if (response.getFlushRows() != null) {
+ internalMetrics.incrementOutEvents(response.getFlushRows());
+ }
+ }
+
+ @Override
+ public void flushFailedRecord() {
+ super.flushFailedRecord();
+ internalMetrics.incrementErrorEvents(1);
+ }
+}
diff --git a/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory new file mode 100644 index 0000000..c04c5dc --- /dev/null +++ b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -0,0 +1 @@
+com.geedgenetworks.connectors.starrocks.StarRocksTableFactory
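The single-line service file above registers the new factory under the `com.geedgenetworks.core.factories.Factory` SPI. Assuming the core discovers factories through the standard `java.util.ServiceLoader` mechanism (an assumption; the real resolution lives in `FactoryUtil`, which is not part of this diff), a lookup by the `starrocks` identifier would look roughly like the hypothetical helper below.

```java
// Hypothetical sketch of SPI-based factory discovery; the real lookup is done by
// com.geedgenetworks.core.factories.FactoryUtil, which this diff does not show.
import com.geedgenetworks.core.factories.Factory;
import com.geedgenetworks.core.factories.SinkTableFactory;

import java.util.ServiceLoader;

public class FactoryLookupSketch {

    // Scans all Factory implementations registered under META-INF/services and
    // returns the sink factory whose identifier matches, e.g. "starrocks".
    public static SinkTableFactory findSinkFactory(String identifier) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if (factory instanceof SinkTableFactory) {
                SinkTableFactory sinkFactory = (SinkTableFactory) factory;
                if (identifier.equals(sinkFactory.factoryIdentifier())) {
                    return sinkFactory;
                }
            }
        }
        throw new IllegalArgumentException("No sink factory registered for identifier: " + identifier);
    }

    public static void main(String[] args) {
        // Expected to print com.geedgenetworks.connectors.starrocks.StarRocksTableFactory
        // when the connector jar is on the classpath.
        System.out.println(findSinkFactory("starrocks").getClass().getName());
    }
}
```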
diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml index 1747fb3..cf5381c 100644 --- a/groot-connectors/pom.xml +++ b/groot-connectors/pom.xml @@ -17,6 +17,7 @@
         <module>connector-ipfix-collector</module>
         <module>connector-file</module>
         <module>connector-mock</module>
+        <module>connector-starrocks</module>
     </modules>
     <dependencies>
         <dependency>
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java index ca8d4e5..518a3f4 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java @@ -1,8 +1,11 @@
 package com.geedgenetworks.core.types;
+import com.alibaba.fastjson2.JSON;
 import org.junit.jupiter.api.Test;
 import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -29,6 +32,42 @@
     }
 
     @Test
+    void test() {
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("a", 1);
+        map.put("b", "aa");
+        map.put("c", List.of(1, 2, 3));
+        map.put("int_array", new int[]{1, 2, 3});
+        map.put("str_array", new String[]{"1", "2", "3"});
+        map.put("obj_array", new Object[]{"1", "2", "3"});
+        String jsonString = JSON.toJSONString(map);
+        System.out.println(jsonString);
+    }
+
+    @Test
+    void test2() {
+        Object obj = new int[]{1, 2, 3};
+        System.out.println(obj instanceof byte[]);
+        System.out.println(obj instanceof int[]);
+        System.out.println(obj instanceof String[]);
+        System.out.println(obj instanceof Object[]);
+        System.out.println();
+
+        obj = new String[]{"1", "2", "3"};
+        System.out.println(obj instanceof byte[]);
+        System.out.println(obj instanceof int[]);
+        System.out.println(obj instanceof String[]);
+        System.out.println(obj instanceof Object[]);
+        System.out.println();
+
+        obj = new Object[]{"1", "2", "3"};
+        System.out.println(obj instanceof byte[]);
+        System.out.println(obj instanceof int[]);
+        System.out.println(obj instanceof String[]);
+        System.out.println(obj instanceof Object[]);
+    }
+
+    @Test
     void testParserBaseType() {
         assertEquals(new IntegerType(), Types.parseDataType("INT"));
         assertEquals(new LongType(), Types.parseDataType("biGint"));
diff --git a/groot-release/pom.xml b/groot-release/pom.xml index 229b23f..30803ec 100644 --- a/groot-release/pom.xml +++ b/groot-release/pom.xml @@ -121,6 +121,13 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>connector-starrocks</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!--Format Json -->
         <dependency>
             <groupId>com.geedgenetworks</groupId>