author     doufenghu <[email protected]>  2024-11-10 22:43:16 +0800
committer  doufenghu <[email protected]>  2024-11-10 22:43:16 +0800
commit     73a5f46181af3c9e596e8b08dc27f63339b04c53 (patch)
tree       93bb7a830deb742211ec7cb8d8416002b4a5e54e
parent     16769de2e5ba334a5cfaacd8a53db2989264d022 (diff)
[Feature][SPI] Tidy up the SPI/Common module library dependencies; remove unnecessary parameter passing from the xxExecutor classes.
-rw-r--r--  docs/connector/connector.md | 2
-rw-r--r--  docs/connector/formats/protobuf.md | 2
-rw-r--r--  docs/connector/source/file.md | 2
-rw-r--r--  docs/grootstream-config.md | 6
-rw-r--r--  docs/processor/projection-processor.md | 2
-rw-r--r--  docs/user-guide.md | 6
-rw-r--r--  groot-bootstrap/pom.xml | 20
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java | 19
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java | 32
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java | 6
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java | 18
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java | 2
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java | 24
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java | 142
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java) | 8
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java | 14
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java | 14
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java | 28
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java | 26
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java | 32
-rw-r--r--  groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java | 20
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java | 4
-rw-r--r--  groot-common/pom.xml | 17
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java | 4
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java | 2
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java | 2
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java | 4
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java | 5
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java | 4
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java | 2
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java | 2
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java | 2
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java | 4
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java | 19
-rw-r--r--  groot-connectors/connector-mock/pom.xml | 1
-rw-r--r--  groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java | 4
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java | 4
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java | 2
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java | 56
-rw-r--r--  groot-connectors/pom.xml | 5
-rw-r--r--  groot-core/pom.xml | 24
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineSourceProvider.java | 2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineTableFactory.java | 4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java | 4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java | 2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java | 2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java) | 38
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java | 5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunction.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java) | 9
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java | 5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFactory.java | 17
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java | 2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java) | 10
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java) | 10
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java | 2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java | 2
-rw-r--r--  groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.Processor | 6
-rw-r--r--  groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.ProcessorFactory | 1
-rw-r--r--  groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java | 2
-rw-r--r--  groot-examples/pom.xml | 15
-rw-r--r--  groot-formats/format-csv/pom.xml | 19
-rw-r--r--  groot-formats/format-json/pom.xml | 19
-rw-r--r--  groot-formats/format-msgpack/pom.xml | 19
-rw-r--r--  groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java | 9
-rw-r--r--  groot-formats/format-protobuf/pom.xml | 45
-rw-r--r--  groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufEventSchemaTest.java | 1400
-rw-r--r--  groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java | 9
-rw-r--r--  groot-formats/format-protobuf/src/test/resources/format_protobuf_test_data.json | 1
-rw-r--r--  groot-formats/format-raw/pom.xml | 24
-rw-r--r--  groot-formats/pom.xml | 7
-rw-r--r--  groot-spi/pom.xml | 1
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/metrics/InternalMetrics.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java) | 2
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateConfig.java | 1
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateProcessor.java | 5
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/processor/Processor.java | 7
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorConfig.java | 2
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorFactory.java | 7
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorProvider.java | 17
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProjectionProcessor.java | 5
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/processor/TableProcessor.java | 5
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkProvider.java (renamed from groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkProvider.java) | 2
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkTableFactory.java (renamed from groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkTableFactory.java) | 2
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceProvider.java (renamed from groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceProvider.java) | 2
-rw-r--r--  groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceTableFactory.java (renamed from groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceTableFactory.java) | 2
84 files changed, 1246 insertions(+), 1096 deletions(-)
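The headline refactor below removes the `List<URL> jarPaths` parameter that was threaded through every executor constructor; plugin jars are now registered once through the runtime environment. A condensed, hypothetical sketch of the resulting wiring (names taken from the JobExecution diff below; surrounding setup omitted):

```java
// After this commit: executors are built from the job Config alone.
// Previously: new SourceExecutor(jarPaths, jobConfig);
Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor = new SourceExecutor(jobConfig);
sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); // jar registration happens once, here
DataStream<Event> stream = sourceExecutor.execute(null, sourceOperatorNode);
```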
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 93d64b0..ff495d4 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -70,7 +70,7 @@ schema:
To retrieve the schema from a local file using its absolute path.
-> Ensures that the file path is accessible to all nodes in your Flink cluster.
+> Ensures that the file path is accessible to all operatorNodes in your Flink cluster.
```yaml
schema:
diff --git a/docs/connector/formats/protobuf.md b/docs/connector/formats/protobuf.md
index 2dfb65e..e991fca 100644
--- a/docs/connector/formats/protobuf.md
+++ b/docs/connector/formats/protobuf.md
@@ -13,7 +13,7 @@
## Format Options
-> Ensures that the file path is accessible to all nodes in your Flink cluster.
+> Ensures that the file path is accessible to all operatorNodes in your Flink cluster.
| Name | Type | Required | Default | Description |
|-------------------------------|---------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
diff --git a/docs/connector/source/file.md b/docs/connector/source/file.md
index bdbf74e..75de66b 100644
--- a/docs/connector/source/file.md
+++ b/docs/connector/source/file.md
@@ -24,7 +24,7 @@ File source custom properties.
This example read data of file test source and print to console.
-> Ensures that the file path is accessible to all nodes in your Flink cluster.
+> Ensures that the file path is accessible to all operatorNodes in your Flink cluster.
```yaml
sources:
diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md
index b7fd037..c5e3046 100644
--- a/docs/grootstream-config.md
+++ b/docs/grootstream-config.md
@@ -24,7 +24,7 @@ grootstream:
The knowledge base is a collection of libraries that can be used in the groot-stream job's UDFs. File system type can be specified `local`, `http` or `hdfs`.
If the value is `http`, must be ` QGW Knowledge Base Repository` URL. The library will be dynamically updated according to the `scheduler.knowledge_base.update.interval.minutes` configuration.
-If the value is `local`, the library will be loaded from the local file system. Need to manually upgrade all nodes in the Flink cluster when the library is updated.
+If the value is `local`, the library will be loaded from the local file system. Need to manually upgrade all operatorNodes in the Flink cluster when the library is updated.
If the value is `hdfs`, the library will be loaded from the HDFS file system. More details about hdfs operation can be found in the [HDFS](./faq.md#hadoop-hdfs-commands-for-beginners).
| Name | Type | Required | Default | Description |
@@ -36,7 +36,7 @@ If the value is `hdfs`, the library will be loaded from the HDFS file system. Mo
### Define the knowledge base file from a local file
-> Ensures that the file path is accessible to all nodes in your Flink cluster.
+> Ensures that the file path is accessible to all operatorNodes in your Flink cluster.
```yaml
grootstream:
@@ -65,7 +65,7 @@ grootstream:
### Define the knowledge base file from a HDFS file system
-> Ensure that the HDFS file system is accessible to all nodes in your Flink cluster.
+> Ensure that the HDFS file system is accessible to all operatorNodes in your Flink cluster.
```yaml
grootstream:
diff --git a/docs/processor/projection-processor.md b/docs/processor/projection-processor.md
index 4319f36..d11fcc9 100644
--- a/docs/processor/projection-processor.md
+++ b/docs/processor/projection-processor.md
@@ -38,7 +38,7 @@ filters:
processing_pipelines: # [object] Define Processors
projection_processor: # [object] Define projection processor name
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessor
remove_fields: [http_request_line, http_response_line, http_response_content_type]
functions: # [array of object] Define UDFs
- function: DROP # [string] Define DROP function for filter event
diff --git a/docs/user-guide.md b/docs/user-guide.md
index d52cfed..1db9f91 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -38,7 +38,7 @@ filters:
preprocessing_pipelines:
preprocessor:
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessor
functions:
- function: EVAL
output_fields: [additional_field_subdomain]
@@ -47,7 +47,7 @@ preprocessing_pipelines:
processing_pipelines:
processor:
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessor
remove_fields: [log_id]
output_fields: []
functions:
@@ -58,7 +58,7 @@ processing_pipelines:
postprocessing_pipelines:
postprocessor:
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessor
remove_fields: [dup_traffic_flag]
sinks:
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index a6c72e6..150c941 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -30,6 +30,18 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-spi</artifactId>
+ <version>${revision}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-common</artifactId>
+ <version>${revision}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
<artifactId>connector-kafka</artifactId>
<version>${revision}</version>
<scope>${scope}</scope>
@@ -127,8 +139,6 @@
<scope>${scope}</scope>
</dependency>
-
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
@@ -165,9 +175,6 @@
<scope>${scope}</scope>
</dependency>
-
-
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.version}</artifactId>
@@ -175,9 +182,6 @@
<scope>test</scope>
</dependency>
-
-
-
</dependencies>
<build>
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
deleted file mode 100644
index 6f33cae..0000000
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.geedgenetworks.bootstrap.enums;
-
-public enum ProcessorType {
- SOURCE("source"),
- FILTER("filter"),
- SPLIT("split"),
- PREPROCESSING("preprocessing"),
- PROCESSING("processing"),
- POSTPROCESSING("postprocessing"),
- SINK("sink");
-
- private final String type;
-
- ProcessorType(String type) {this.type = type;}
-
- public String getType() {
- return type;
- }
-}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java
new file mode 100644
index 0000000..8b4e154
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java
@@ -0,0 +1,32 @@
+package com.geedgenetworks.bootstrap.enums;
+
+public enum StageType {
+ SOURCE("source"),
+ FILTER("filter"),
+ SPLIT("split"),
+ PREPROCESSING("preprocessing"),
+ PROCESSING("processing"),
+ POSTPROCESSING("postprocessing"),
+ SINK("sink");
+
+ private final String type;
+ public String getType() {
+ return type;
+ }
+ StageType(String type) {this.type = type;}
+
+ public static StageType fromType(String type) {
+ for (StageType stage : values()) {
+ if (stage.type.equalsIgnoreCase(type)) {
+ return stage;
+ }
+ }
+ throw new IllegalArgumentException("Unknown type: " + type);
+ }
+
+ @Override
+ public String toString() {
+ return type;
+ }
+
+}
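A quick usage sketch of the enum added above (hypothetical call site; behavior follows the code as written):

```java
StageType stage = StageType.fromType("Processing"); // lookup is case-insensitive
assert stage == StageType.PROCESSING;
System.out.println(stage);      // toString() yields "processing"
StageType.fromType("orderby");  // throws IllegalArgumentException("Unknown type: orderby")
```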
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index e0828a0..fe440f7 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -16,9 +16,9 @@ public abstract class AbstractExecutor<K, V>
protected final Config operatorConfig;
protected final Map<K,V> operatorMap;
- protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
+ protected AbstractExecutor(Config operatorConfig) {
this.operatorConfig = operatorConfig;
- this.operatorMap = initialize(jarPaths, operatorConfig);
+ this.operatorMap = initialize(operatorConfig);
}
@Override
@@ -27,7 +27,7 @@ public abstract class AbstractExecutor<K, V>
}
- protected abstract Map<K, V> initialize(List<URL> jarPaths, Config operatorConfig);
+ protected abstract Map<K, V> initialize(Config operatorConfig);
protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
(classLoader, url) -> {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index ec748cc..a45380e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -19,26 +19,24 @@ import java.util.ServiceLoader;
public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> {
- protected AbstractProcessorExecutor(List<URL> jarPaths, Config operatorConfig) {
- super(jarPaths, operatorConfig);
+ protected AbstractProcessorExecutor(Config operatorConfig) {
+ super(operatorConfig);
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- ProcessorConfig processorConfig = operatorMap.get(node.getName());
+ ProcessorConfig processorConfig = operatorMap.get(operatorNode.getName());
boolean found = false; // flag variable
ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
for (Processor processor : processors) {
if(processor.type().equals(processorConfig.getType())){
found = true;
- if (node.getParallelism() > 0) {
- processorConfig.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ processorConfig.setParallelism(operatorNode.getParallelism());
}
try {
-
- dataStream = processor.processorFunction(
- dataStream, processorConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+ input = processor.process(jobRuntimeEnvironment.getStreamExecutionEnvironment(), input, processorConfig);
} catch (Exception e) {
throw new JobExecuteException("Create orderby pipeline instance failed!", e);
}
@@ -48,7 +46,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
if (!found) {
throw new JobExecuteException("No matching processor found for type: " + processorConfig.getType());
}
- return dataStream;
+ return input;
}
protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) {
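The call above replaces `processorFunction(dataStream, config, executionConfig)` with a single `process(env, input, config)` entry point. A hypothetical implementation against that surface (assumption: `Processor` declares exactly the `type()` and `process(...)` methods invoked here):

```java
// Hypothetical no-op processor, discovered via ServiceLoader like the loop above.
public class PassThroughProcessor implements Processor {
    @Override
    public String type() {
        return "com.example.PassThroughProcessor"; // matched against ProcessorConfig.getType()
    }

    @Override
    public DataStream<Event> process(StreamExecutionEnvironment env,
                                     DataStream<Event> input,
                                     ProcessorConfig config) {
        return input; // a real processor would map/window/aggregate here
    }
}
```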
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
index d57d6bf..e43c949 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
@@ -4,7 +4,7 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException;
public interface Executor<T, ENV extends RuntimeEnvironment> {
- T execute(T dataStream, Node edge) throws JobExecuteException;
+ T execute(T dataStream, OperatorNode edge) throws JobExecuteException;
void setRuntimeEnvironment(ENV runtimeEnvironment);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 1ea19f8..d70420e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
@@ -27,14 +27,14 @@ import java.util.ServiceLoader;
*/
@Slf4j
public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
- private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType();
+ private static final String PROCESSOR_TYPE = StageType.FILTER.getType();
- public FilterExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public FilterExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, FilterConfig> initialize(Config operatorConfig) {
Map<String, FilterConfig> filterConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.FILTERS)) {
Config filterConfig = operatorConfig.getConfig(Constants.FILTERS);
@@ -54,20 +54,20 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
- FilterConfig filterConfig = operatorMap.get(node.getName());
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ FilterConfig filterConfig = operatorMap.get(operatorNode.getName());
boolean found = false; // flag variable
ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
for (Filter filter : filters) {
if(filter.type().equals(filterConfig.getType())){
found = true;
- if (node.getParallelism() > 0) {
- filterConfig.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ filterConfig.setParallelism(operatorNode.getParallelism());
}
try {
- dataStream =
+ input =
filter.filterFunction(
- dataStream, filterConfig);
+ input, filterConfig);
} catch (Exception e) {
throw new JobExecuteException("Create filter instance failed!", e);
}
@@ -77,7 +77,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
if (!found) {
throw new JobExecuteException("No matching filter found for type: " + filterConfig.getType());
}
- return dataStream;
+ return input;
}
protected FilterConfig checkConfig(String key, Map<String, Object> value, Config config) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 325f8a4..cd70f44 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.config.Constants;
@@ -36,7 +36,7 @@ public class JobExecution {
private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
- private final List<Node> nodes;
+ private final List<OperatorNode> operatorNodes;
private final List<URL> jarPaths;
private final Map<String,String> nodeNameWithSplitTags = new HashMap<>();
@@ -51,13 +51,13 @@ public class JobExecution {
registerPlugin(jobConfig.getConfig(Constants.APPLICATION));
- this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig);
- this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig);
- this.filterExecutor = new FilterExecutor(jarPaths, jobConfig);
- this.splitExecutor = new SplitExecutor(jarPaths, jobConfig);
- this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig);
- this.processingExecutor = new ProcessingExecutor(jarPaths, jobConfig);
- this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig);
+ this.sourceExecutor = new SourceExecutor(jobConfig);
+ this.sinkExecutor = new SinkExecutor(jobConfig);
+ this.filterExecutor = new FilterExecutor(jobConfig);
+ this.splitExecutor = new SplitExecutor(jobConfig);
+ this.preprocessingExecutor = new PreprocessingExecutor(jobConfig);
+ this.processingExecutor = new ProcessingExecutor(jobConfig);
+ this.postprocessingExecutor = new PostprocessingExecutor(jobConfig);
this.jobRuntimeEnvironment =
JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig);
@@ -68,7 +68,7 @@ public class JobExecution {
this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.nodes = buildJobNode(jobConfig);
+ this.operatorNodes = buildJobNode(jobConfig);
}
@@ -153,7 +153,7 @@ public class JobExecution {
return config;
}
- private List<Node> buildJobNode(Config config) {
+ private List<OperatorNode> buildJobNode(Config config) {
Map<String, Object> sources = Maps.newHashMap();
Map<String, Object> sinks = Maps.newHashMap();
@@ -187,34 +187,34 @@ public class JobExecution {
List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY);
- List<Node> nodes = Lists.newArrayList();
+ List<OperatorNode> operatorNodes = Lists.newArrayList();
topology.forEach(item -> {
- Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class);
- nodes.add(node);
+ OperatorNode operatorNode = JSONObject.from(item.root().unwrapped()).toJavaObject(OperatorNode.class);
+ operatorNodes.add(operatorNode);
});
- for (Node node : nodes) {
- if (sources.containsKey(node.getName())) {
- node.setType(ProcessorType.SOURCE);
- } else if (sinks.containsKey(node.getName())) {
- node.setType(ProcessorType.SINK);
- } else if (filters.containsKey(node.getName())) {
- node.setType(ProcessorType.FILTER);
- } else if (splits.containsKey(node.getName())) {
- node.setType(ProcessorType.SPLIT);
- } else if (preprocessingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.PREPROCESSING);
- } else if (processingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.PROCESSING);
- } else if (postprocessingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.POSTPROCESSING);
+ for (OperatorNode operatorNode : operatorNodes) {
+ if (sources.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.SOURCE);
+ } else if (sinks.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.SINK);
+ } else if (filters.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.FILTER);
+ } else if (splits.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.SPLIT);
+ } else if (preprocessingPipelines.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.PREPROCESSING);
+ } else if (processingPipelines.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.PROCESSING);
+ } else if (postprocessingPipelines.containsKey(operatorNode.getName())) {
+ operatorNode.setType(StageType.POSTPROCESSING);
} else {
- throw new JobExecuteException("unsupported process type " + node.getName());
+ throw new JobExecuteException("unsupported process type " + operatorNode.getName());
}
}
- return nodes;
+ return operatorNodes;
}
@@ -223,14 +223,14 @@ public class JobExecution {
if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) {
jobRuntimeEnvironment.registerPlugin(jarPaths);
}
- List<Node> sourceNodes = nodes
- .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
+ List<OperatorNode> sourceOperatorNodes = operatorNodes
+ .stream().filter(v -> v.getType().name().equals(StageType.SOURCE.name())).collect(Collectors.toList());
DataStream<Event> dataStream = null;
- for (Node sourceNode : sourceNodes) {
- dataStream = sourceExecutor.execute(dataStream, sourceNode);
- for (String nodeName : sourceNode.getDownstream()) {
+ for (OperatorNode sourceOperatorNode : sourceOperatorNodes) {
+ dataStream = sourceExecutor.execute(dataStream, sourceOperatorNode);
+ for (String nodeName : sourceOperatorNode.getDownstream()) {
buildJobGraph(dataStream, nodeName);
}
}
@@ -251,68 +251,68 @@ public class JobExecution {
}
private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
- Node node = getNode(downstreamNodeName).orElseGet(() -> {
+ OperatorNode operatorNode = getNode(downstreamNodeName).orElseGet(() -> {
throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
- if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
- }), node);
+ if (operatorNode.getType().name().equals(StageType.FILTER.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream)
+ .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {}), operatorNode);
} else {
- dataStream = filterExecutor.execute(dataStream, node);
+ dataStream = filterExecutor.execute(dataStream, operatorNode);
}
- } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
- if (node.getTags().size() == node.getDownstream().size()) {
- for (int i = 0; i < node.getDownstream().size();i++) {
- nodeNameWithSplitTags.put(node.getDownstream().get(i),node.getTags().get(i));
+ } else if (operatorNode.getType().name().equals(StageType.SPLIT.name())) {
+ if (operatorNode.getTags().size() == operatorNode.getDownstream().size()) {
+ for (int i = 0; i < operatorNode.getDownstream().size(); i++) {
+ nodeNameWithSplitTags.put(operatorNode.getDownstream().get(i), operatorNode.getTags().get(i));
}
}
else {
throw new JobExecuteException("split node downstream size not equal tags size");
}
- dataStream = splitExecutor.execute(dataStream, node);
- } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())){
- }), node);
+ dataStream = splitExecutor.execute(dataStream, operatorNode);
+ } else if (operatorNode.getType().name().equals(StageType.PREPROCESSING.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())){
+ }), operatorNode);
} else {
- dataStream = preprocessingExecutor.execute(dataStream, node);
+ dataStream = preprocessingExecutor.execute(dataStream, operatorNode);
}
- } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
- }), node);
+ } else if (operatorNode.getType().name().equals(StageType.PROCESSING.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
+ }), operatorNode);
} else {
- dataStream = processingExecutor.execute(dataStream, node);
+ dataStream = processingExecutor.execute(dataStream, operatorNode);
}
- } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
- }), node);
+ } else if (operatorNode.getType().name().equals(StageType.POSTPROCESSING.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
+ }), operatorNode);
} else {
- dataStream = postprocessingExecutor.execute(dataStream, node);
+ dataStream = postprocessingExecutor.execute(dataStream, operatorNode);
}
- } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- if (nodeNameWithSplitTags.containsKey(node.getName())) {
- dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
- }), node);
+ } else if (operatorNode.getType().name().equals(StageType.SINK.name())) {
+ if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
+ }), operatorNode);
} else {
- dataStream = sinkExecutor.execute(dataStream, node);
+ dataStream = sinkExecutor.execute(dataStream, operatorNode);
}
} else {
- throw new JobExecuteException("unsupported process type " + node.getType().name());
+ throw new JobExecuteException("unsupported process type " + operatorNode.getType().name());
}
- for (String nodeName : node.getDownstream()) {
+ for (String nodeName : operatorNode.getDownstream()) {
buildJobGraph(dataStream, nodeName);
}
}
- private Optional<Node> getNode(String name) {
- return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
+ private Optional<OperatorNode> getNode(String name) {
+ return operatorNodes.stream().filter(v -> v.getName().equals(name)).findFirst();
}
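The recursive wiring above routes split branches through Flink side outputs. A minimal sketch of that pattern, using the variables as named in buildJobGraph:

```java
// Downstream nodes of a split consume a tagged side output, not the main stream.
OutputTag<Event> tag = new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {};
DataStream<Event> branch = ((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(tag);
dataStream = filterExecutor.execute(branch, operatorNode);
```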
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java
index 66303c2..8c4b392 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -14,11 +14,11 @@ import java.util.List;
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
-public class Node implements Serializable {
+public class OperatorNode implements Serializable {
private String name;
- private ProcessorType type;
- private List<String> downstream = Collections.emptyList();
+ private StageType type;
private int parallelism;
+ private List<String> downstream = Collections.emptyList();
private List<String> tags = Collections.emptyList();
}
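As buildJobNode in JobExecution shows, each topology entry is bound onto this class with fastjson2 before the stage type is assigned. A hypothetical illustration:

```java
// Field names bind onto the Lombok @Data class; type stays null until JobExecution
// assigns a StageType based on which config section declares the name.
OperatorNode node = com.alibaba.fastjson2.JSON.parseObject(
        "{\"name\":\"projection_processor\",\"parallelism\":2,\"downstream\":[\"print_sink\"]}",
        OperatorNode.class);
```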
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index e73b7dd..10d9188 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.spi.processor.ProcessorConfig;
@@ -17,14 +17,14 @@ import java.util.Map;
* Initialize config and execute postprocessor
*/
public class PostprocessingExecutor extends AbstractProcessorExecutor {
- private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType();
+ private static final String PROCESSOR_TYPE = StageType.POSTPROCESSING.getType();
- public PostprocessingExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public PostprocessingExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
@@ -37,7 +37,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor {
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
- return super.execute(dataStream, node);
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ return super.execute(input, operatorNode);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index 6179265..9acda99 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.spi.processor.ProcessorConfig;
@@ -19,14 +19,14 @@ import java.util.Map;
*/
@Slf4j
public class PreprocessingExecutor extends AbstractProcessorExecutor {
- private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType();
+ private static final String PROCESSOR_TYPE = StageType.PREPROCESSING.getType();
- public PreprocessingExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public PreprocessingExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
@@ -38,9 +38,9 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor {
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
- return super.execute(dataStream, node);
+ return super.execute(input, operatorNode);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index bc6a09e..c49df88 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -1,15 +1,18 @@
package com.geedgenetworks.bootstrap.execution;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.config.Constants;
+import com.geedgenetworks.spi.processor.Processor;
import com.geedgenetworks.spi.processor.ProcessorConfig;
+import com.geedgenetworks.spi.processor.ProcessorProvider;
import com.geedgenetworks.spi.table.event.Event;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.net.URL;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -17,27 +20,30 @@ import java.util.Map;
* Initialize config and execute processor
*/
public class ProcessingExecutor extends AbstractProcessorExecutor {
- private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType();
+ private static final String PROCESSOR_TYPE = StageType.PROCESSING.getType();
+ //private Map<String, Processor<?>> processors;
- public ProcessingExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public ProcessingExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap();
+ //processors = new HashMap<>();
if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
- Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
- processors.root().unwrapped().forEach((key, value) -> {
- processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processors));
+ Config processingConfig = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
+ processingConfig.root().unwrapped().forEach((key, value) -> {
+ processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processingConfig));
+ //processors.put(key, ProcessorProvider.load(((Map<?, ?>) value).get("type").toString()));
+
});
}
return processingConfigMap;
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
-
- return super.execute(dataStream, node);
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ return super.execute(input, operatorNode);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index b61b6f9..130705a 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
import com.geedgenetworks.common.config.Constants;
@@ -11,8 +11,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.spi.sink.SinkConfig;
import com.geedgenetworks.spi.sink.SinkConfigOptions;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -24,8 +24,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import java.net.URL;
-import java.util.List;
import java.util.Map;
/**
@@ -33,13 +31,13 @@ import java.util.Map;
*/
@Slf4j
public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
- private static final String PROCESSOR_TYPE = ProcessorType.SINK.getType();
+ private static final String PROCESSOR_TYPE = StageType.SINK.getType();
- public SinkExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public SinkExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, SinkConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, SinkConfig> initialize(Config operatorConfig) {
Map<String, SinkConfig> sinkConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SINKS)) {
@@ -64,8 +62,8 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
- SinkConfig sinkConfig = operatorMap.get(node.getName());
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ SinkConfig sinkConfig = operatorMap.get(operatorNode.getName());
try {
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
Map<String, String> options = sinkConfig.getProperties();
@@ -84,9 +82,9 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
System.out.println(String.format("sink(%s) schema:\n%s", sinkConfig.getName(), schema.getDataType().treeString()));
}
- DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
- if (node.getParallelism() > 0) {
- dataStreamSink.setParallelism(node.getParallelism());
+ DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(input);
+ if (operatorNode.getParallelism() > 0) {
+ dataStreamSink.setParallelism(operatorNode.getParallelism());
}
dataStreamSink.name(sinkConfig.getName());
} catch (Exception e) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index 3eeaad6..5109540 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
@@ -12,8 +12,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.spi.configuration.SourceConfigOptions;
import com.geedgenetworks.spi.source.SourceConfig;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -28,9 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import java.net.URL;
import java.time.Duration;
-import java.util.List;
import java.util.Map;
/**
@@ -38,13 +36,13 @@ import java.util.Map;
*/
@Slf4j
public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
- private static final String PROCESSOR_TYPE = ProcessorType.SOURCE.getType();
+ private static final String PROCESSOR_TYPE = StageType.SOURCE.getType();
- public SourceExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public SourceExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, SourceConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, SourceConfig> initialize(Config operatorConfig) {
Map<String, SourceConfig> sourceConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SOURCES)) {
Config sources = operatorConfig.getConfig(Constants.SOURCES);
@@ -68,8 +66,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
}
@Override
- public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException {
- SourceConfig sourceConfig = operatorMap.get(node.getName());
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ SourceConfig sourceConfig = operatorMap.get(operatorNode.getName());
SingleOutputStreamOperator sourceSingleOutputStreamOperator;
try {
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType());
@@ -90,17 +88,17 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
}
sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName());
- if (node.getParallelism() > 0) {
- sourceSingleOutputStreamOperator.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ sourceSingleOutputStreamOperator.setParallelism(operatorNode.getParallelism());
}
- sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, node);
+ sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, operatorNode);
return sourceSingleOutputStreamOperator;
} catch (Exception e) {
throw new JobExecuteException("Create source instance failed!", e);
}
}
- private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, Node node){
+ private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, OperatorNode operatorNode){
final String watermarkTimestamp = sourceConfig.getWatermark_timestamp();
if(StringUtils.isNotBlank(watermarkTimestamp)){
String timestampUnit = sourceConfig.getWatermark_timestamp_unit();
@@ -139,8 +137,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag))
.withTimestampAssigner(timestampAssigner)
);
- if (node.getParallelism() > 0) {
- dataStream.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ dataStream.setParallelism(operatorNode.getParallelism());
}
}
return dataStream;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
index 3d6f264..c142614 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -29,12 +29,12 @@ import java.util.ServiceLoader;
public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
- public SplitExecutor(List<URL> jarPaths, Config config) {
- super(jarPaths, config);
+ public SplitExecutor(Config config) {
+ super(config);
}
@Override
- protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+ protected Map<String, SplitConfig> initialize(Config operatorConfig) {
Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.SPLITS)) {
Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS);
@@ -56,20 +56,20 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
}
@Override
- public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
- SplitConfig splitConfig = operatorMap.get(node.getName());
+ public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+ SplitConfig splitConfig = operatorMap.get(operatorNode.getName());
boolean found = false; // flag variable
ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
for (Split split : splits) {
found = true; // flag variable
if(split.type().equals(splitConfig.getType())){
- if (node.getParallelism() > 0) {
- splitConfig.setParallelism(node.getParallelism());
+ if (operatorNode.getParallelism() > 0) {
+ splitConfig.setParallelism(operatorNode.getParallelism());
}
try {
- dataStream =
+ input =
split.splitFunction(
- dataStream, splitConfig);
+ input, splitConfig);
} catch (Exception e) {
throw new JobExecuteException("Create split instance failed!", e);
}
@@ -79,7 +79,7 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
if (!found) {
throw new JobExecuteException("No matching split found for type: " + splitConfig.getType());
}
- return dataStream;
+ return input;
}
protected SplitConfig checkConfig(String key, Map<String, Object> value, Config config) {
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
index e52fd3b..130478e 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.bootstrap.main.simple.collect;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index 74960b6..66096ae 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -34,6 +34,23 @@
</dependency>
<dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>sketches</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba.nacos</groupId>
+ <artifactId>nacos-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+ <dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
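The sketches and nacos-client dependencies move here from groot-core (they are removed from groot-core/pom.xml later in this diff). The commons-codec exclusion presumably avoids a version clash on the shared classpath; whether anything else still pulls commons-codec in can be checked with the standard maven-dependency-plugin invocation `mvn -pl groot-common dependency:tree -Dincludes=commons-codec`.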
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
index afb9906..53cf99a 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.connectors.clickhouse;
import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.factory.FactoryUtil.TableFactoryHelper;
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
index 1726fdd..8a52fb9 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.connectors.clickhouse.sink;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.schema.Schema;
import com.geedgenetworks.spi.table.schema.SchemaChangeAware;
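InternalMetrics moves from groot-core into the SPI, which lets connectors drop their groot-core dependency (see groot-connectors/pom.xml below). A sketch of the counter-style usage, with method names taken from EventStreamLoadListener later in this commit:

    import com.geedgenetworks.spi.metrics.InternalMetrics;

    // Hypothetical sink skeleton; only the metric calls are taken from this commit.
    public class MetricsAwareSink {
        private transient InternalMetrics internalMetrics; // initialized in open(), not shown

        void onFlushSucceeded(long rows) {
            internalMetrics.incrementOutEvents(rows);   // successfully written events
        }

        void onFlushFailed() {
            internalMetrics.incrementErrorEvents(1);    // failed events
        }
    }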
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
index 5596049..a946f84 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.connectors.file;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.source.SourceProvider;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.type.StructType;
import org.apache.commons.io.IOUtils;
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
index 8add991..36a7610 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.connectors.file;
import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
index d075307..7e86a2c 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
@@ -3,10 +3,9 @@ package com.geedgenetworks.connectors.ipfix.collector;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.geedgenetworks.connectors.ipfix.collector.utils.IPFixUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
+import com.geedgenetworks.spi.source.SourceProvider;
import com.geedgenetworks.spi.table.type.*;
-
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.type.DataType;
import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
index 2853f1c..2865019 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.connectors.ipfix.collector;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.configuration.ConfigOption;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
index 19856e9..65b01d4 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.connectors.kafka;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import com.geedgenetworks.spi.table.event.Event;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index 8c78669..b61b376 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.connectors.kafka;
import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
import com.geedgenetworks.spi.table.connector.EncodingFormat;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkProvider;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
index 8ce7f19..81d766e 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.connectors.kafka;
import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.source.SourceProvider;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index 9a20ef5..0478e00 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -4,6 +4,10 @@ import com.geedgenetworks.connectors.kafka.rate.BlockDropRateLimitingStrategy;
import com.geedgenetworks.connectors.kafka.rate.NoRateLimitingStrategy;
import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.connector.*;
import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
index 3b7e0c5..239d125 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
@@ -1,25 +1,8 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.flink.streaming.connectors.kafka;
import com.geedgenetworks.connectors.kafka.rate.RateLimitingStatus;
import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
diff --git a/groot-connectors/connector-mock/pom.xml b/groot-connectors/connector-mock/pom.xml
index 4932eec..b13f7a5 100644
--- a/groot-connectors/connector-mock/pom.xml
+++ b/groot-connectors/connector-mock/pom.xml
@@ -18,6 +18,7 @@
<artifactId>datafaker</artifactId>
<version>1.9.0</version>
</dependency>
+
</dependencies>
</project> \ No newline at end of file
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
index 57432cd..a978938 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
@@ -2,8 +2,8 @@ package com.geedgenetworks.connectors.mock;
import com.geedgenetworks.connectors.mock.faker.FakerUtils;
import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
index 09446fd..c7ada13 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.connectors.starrocks;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.starrocks.connector.flink.table.sink.EventStarRocksDynamicSinkFunctionV2;
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
index 94aa194..63920ab 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
@@ -1,7 +1,7 @@
package com.starrocks.connector.flink.table.sink;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import com.geedgenetworks.spi.table.event.Event;
import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java
index 337109b..d1aed43 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java
@@ -1,28 +1,28 @@
-package com.starrocks.connector.flink.table.sink;
-
-import com.geedgenetworks.core.metrics.InternalMetrics;
-import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
-import com.starrocks.data.load.stream.StreamLoadResponse;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-public class EventStreamLoadListener extends StarRocksStreamLoadListener {
- private transient InternalMetrics internalMetrics;
- public EventStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions, InternalMetrics internalMetrics) {
- super(context, sinkOptions);
- this.internalMetrics = internalMetrics;
- }
-
- @Override
- public void flushSucceedRecord(StreamLoadResponse response) {
- super.flushSucceedRecord(response);
- if (response.getFlushRows() != null) {
- internalMetrics.incrementOutEvents(response.getFlushRows());
- }
- }
-
- @Override
- public void flushFailedRecord() {
- super.flushFailedRecord();
- internalMetrics.incrementErrorEvents(1);
- }
-}
+package com.starrocks.connector.flink.table.sink;
+
+import com.geedgenetworks.spi.metrics.InternalMetrics;
+import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
+import com.starrocks.data.load.stream.StreamLoadResponse;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+public class EventStreamLoadListener extends StarRocksStreamLoadListener {
+ private transient InternalMetrics internalMetrics;
+ public EventStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions, InternalMetrics internalMetrics) {
+ super(context, sinkOptions);
+ this.internalMetrics = internalMetrics;
+ }
+
+ @Override
+ public void flushSucceedRecord(StreamLoadResponse response) {
+ super.flushSucceedRecord(response);
+ if (response.getFlushRows() != null) {
+ internalMetrics.incrementOutEvents(response.getFlushRows());
+ }
+ }
+
+ @Override
+ public void flushFailedRecord() {
+ super.flushFailedRecord();
+ internalMetrics.incrementErrorEvents(1);
+ }
+}
diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml
index 302e7e4..939f3bb 100644
--- a/groot-connectors/pom.xml
+++ b/groot-connectors/pom.xml
@@ -20,6 +20,7 @@
<module>connector-starrocks</module>
</modules>
<dependencies>
+
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>groot-spi</artifactId>
@@ -29,14 +30,16 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>groot-core</artifactId>
+ <artifactId>groot-common</artifactId>
<version>${revision}</version>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>provided</scope>
</dependency>
</dependencies>
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index a5471ef..fb92c8d 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -17,6 +17,14 @@
<groupId>com.geedgenetworks</groupId>
<artifactId>groot-spi</artifactId>
<version>${revision}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-common</artifactId>
+ <version>${revision}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -40,22 +48,6 @@
</dependency>
<dependency>
- <groupId>com.geedgenetworks</groupId>
- <artifactId>sketches</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.nacos</groupId>
- <artifactId>nacos-client</artifactId>
- <exclusions>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineSourceProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineSourceProvider.java
index 279b8c1..d6e2ba9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineSourceProvider.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineSourceProvider.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.core.connector.inline;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.source.SourceProvider;
import com.geedgenetworks.spi.table.connector.DecodingFormat;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineTableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineTableFactory.java
index 67b02b9..3117ef9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineTableFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineTableFactory.java
@@ -1,9 +1,9 @@
package com.geedgenetworks.core.connector.inline;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.source.SourceProvider;
import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
index f3dfe37..cd8a4f6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.connector.print;
import com.geedgenetworks.spi.table.connector.EncodingFormat;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.configuration.ConfigOption;
@@ -9,7 +9,7 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkProvider;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
import java.util.HashSet;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java
index 0549105..ec6a13c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.filter;
import com.geedgenetworks.common.utils.ColumnUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.filter.FilterConfig;
import com.googlecode.aviator.AviatorEvaluator;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
index 268d50f..125ee7e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -7,10 +7,10 @@ import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.config.KeybyEntity;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.spi.common.udf.AggregateFunction;
import com.geedgenetworks.spi.common.udf.UDFContext;
import com.geedgenetworks.spi.common.udf.UdfEntity;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import com.geedgenetworks.spi.processor.AggregateConfig;
import com.geedgenetworks.spi.table.event.Event;
import com.google.common.collect.Lists;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
index 2fdcc3d..cbc80a2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
@@ -11,9 +11,9 @@ import com.geedgenetworks.spi.processor.AggregateConfig;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.processor.Processor;
import com.typesafe.config.Config;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -24,41 +24,41 @@ import java.util.Map;
import static com.geedgenetworks.common.config.Constants.*;
-public class AggregateProcessorImpl implements Processor<AggregateConfig> {
+public class AggregateProcessor implements Processor<AggregateConfig> {
@Override
- public DataStream<Event> processorFunction(DataStream<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) {
+ public DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, AggregateConfig aggregateConfig) {
SingleOutputStreamOperator<Event> singleOutputStreamOperator;
if (aggregateConfig.getMini_batch()) {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
.process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_size()))
.keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
.window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
- .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new SecondAggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
break;
case TUMBLING_EVENT_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
.process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_size()))
.keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
.window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
- .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new SecondAggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
break;
case SLIDING_PROCESSING_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
.process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_slide()))
.keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
.window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
- .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new SecondAggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
break;
case SLIDING_EVENT_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
.process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_slide()))
.keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
.window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
- .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new SecondAggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
break;
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
@@ -67,28 +67,28 @@ public class AggregateProcessorImpl implements Processor<AggregateConfig> {
} else {
switch (aggregateConfig.getWindow_type()) {
case TUMBLING_PROCESSING_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
.window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
- .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new AggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
break;
case TUMBLING_EVENT_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
.window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
- .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new AggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
break;
case SLIDING_PROCESSING_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
.window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
- .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new AggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
break;
case SLIDING_EVENT_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
.keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
.window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
- .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new AggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
break;
default:
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
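The Processor contract changes from processorFunction(stream, config, ExecutionConfig) to process(env, input, config). The interface itself is outside this diff; inferred from the three renamed implementations, it presumably reads:

    // Inferred shape of com.geedgenetworks.spi.processor.Processor (interface not shown in this commit):
    public interface Processor<T> {
        DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, T config);
    }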
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 8e902b6..0b22faa 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -18,6 +18,7 @@ import com.googlecode.aviator.Options;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.LinkedList;
@@ -33,8 +34,8 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
private final List<String> udfClassNameLists;
private final LinkedList<UdfEntity> functions;
- public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
- udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+ public AggregateProcessorFunction(StreamExecutionEnvironment env, AggregateConfig aggregateConfig) {
+ udfClassNameLists = JSON.parseObject(env.getConfig().getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
udfContexts = aggregateConfig.getFunctions();
if (udfContexts == null || udfContexts.isEmpty()) {
throw new RuntimeException();
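Reading the UDF plugin list through env.getConfig() implies the job driver registers it as a global job parameter before the graph is built. A hedged sketch of that producer side (ParameterTool is one standard GlobalJobParameters carrier; the actual bootstrap wiring is not in this hunk):

    Map<String, String> params = new HashMap<>();
    params.put(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(udfClassNames)); // JSON-encoded List<String>
    env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(params));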
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunction.java
index 6e78606..2aab0e6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunction.java
@@ -3,18 +3,17 @@ package com.geedgenetworks.core.processor.aggregate;
import com.geedgenetworks.common.config.Accumulator;
import com.geedgenetworks.common.config.KeybyEntity;
import com.geedgenetworks.common.utils.ColumnUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import com.geedgenetworks.spi.processor.AggregateConfig;
import com.geedgenetworks.spi.table.event.Event;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import static com.geedgenetworks.spi.table.event.Event.WINDOW_END_TIMESTAMP;
import static com.geedgenetworks.spi.table.event.Event.WINDOW_START_TIMESTAMP;
-public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<
+public class ProcessWindowFunction extends org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<
Accumulator, // input type
Event, // output type
KeybyEntity, // key type
@@ -22,7 +21,7 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu
private final AggregateConfig aggregateConfig;
private transient InternalMetrics internalMetrics;
- public ProcessWindowFunctionImpl(AggregateConfig aggregateConfig) {
+ public ProcessWindowFunction(AggregateConfig aggregateConfig) {
this.aggregateConfig = aggregateConfig;
}
@@ -34,7 +33,7 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu
}
@Override
- public void process(KeybyEntity keybyEntity, ProcessWindowFunction<Accumulator, Event, KeybyEntity, TimeWindow>.Context context, Iterable<Accumulator> elements, Collector<Event> out) throws Exception {
+ public void process(KeybyEntity keybyEntity, org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<Accumulator, Event, KeybyEntity, TimeWindow>.Context context, Iterable<Accumulator> elements, Collector<Event> out) throws Exception {
Accumulator accumulator = elements.iterator().next();
Event event = new Event();
event.setExtractedFields(accumulator.getMetricsFields());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
index 9087631..b9b53b7 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -17,6 +17,7 @@ import com.googlecode.aviator.Options;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.LinkedList;
@@ -31,8 +32,8 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
private final List<String> udfClassNameLists;
private final LinkedList<UdfEntity> functions;
- public SecondAggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
- udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+ public SecondAggregateProcessorFunction(StreamExecutionEnvironment env, AggregateConfig aggregateConfig) {
+ udfClassNameLists = JSON.parseObject(env.getConfig().getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
udfContexts = aggregateConfig.getFunctions();
if (udfContexts == null || udfContexts.isEmpty()) {
throw new RuntimeException();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFactory.java
new file mode 100644
index 0000000..b8b4201
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFactory.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.core.processor.projection;
+
+import com.geedgenetworks.spi.processor.Processor;
+import com.geedgenetworks.spi.processor.ProcessorFactory;
+
+public class ProjectionProcessFactory implements ProcessorFactory {
+
+ @Override
+ public String type() {
+ return "projection";
+ }
+
+ @Override
+ public Processor<?> createProcessor() {
+ return new ProjectionProcessor();
+ }
+}
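The factory is registered for Java SPI via the new META-INF/services/com.geedgenetworks.spi.processor.ProcessorFactory file below, so lookup presumably mirrors the ServiceLoader pattern SplitExecutor already uses:

    ServiceLoader<ProcessorFactory> factories = ServiceLoader.load(ProcessorFactory.class);
    for (ProcessorFactory factory : factories) {
        if ("projection".equals(factory.type())) {
            Processor<?> processor = factory.createProcessor(); // yields a ProjectionProcessor
            // ... hand off to the executor
        }
    }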
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index ad8c6ca..004bd78 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -7,11 +7,11 @@ import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.utils.ColumnUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseScheduler;
import com.geedgenetworks.spi.common.udf.ScalarFunction;
import com.geedgenetworks.spi.common.udf.UDFContext;
import com.geedgenetworks.spi.common.udf.UdfEntity;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import com.geedgenetworks.spi.processor.ProjectionConfig;
import com.geedgenetworks.spi.table.event.Event;
import com.google.common.collect.Lists;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
index 21ec8f4..d182bd8 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
@@ -11,23 +11,23 @@ import com.geedgenetworks.spi.processor.Processor;
import com.geedgenetworks.spi.processor.ProjectionConfig;
import com.geedgenetworks.spi.table.event.Event;
import com.typesafe.config.Config;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Map;
-public class ProjectionProcessorImpl implements Processor<ProjectionConfig> {
+public class ProjectionProcessor implements Processor<ProjectionConfig> {
@Override
- public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, ProjectionConfig projectionConfig, ExecutionConfig config) {
+ public DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, ProjectionConfig projectionConfig) {
if (projectionConfig.getParallelism() != 0) {
- return grootEventDataStream
+ return input
.process(new ProjectionProcessFunction(projectionConfig))
.setParallelism(projectionConfig.getParallelism())
.name(projectionConfig.getName());
} else {
- return grootEventDataStream
+ return input
.process(new ProjectionProcessFunction(projectionConfig))
.name(projectionConfig.getName());
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java
index 8771cca..1db565b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java
@@ -10,23 +10,23 @@ import com.geedgenetworks.spi.processor.Processor;
import com.geedgenetworks.spi.processor.TableConfig;
import com.geedgenetworks.spi.table.event.Event;
import com.typesafe.config.Config;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Map;
-public class TableProcessorImpl implements Processor<TableConfig> {
+public class TableProcessor implements Processor<TableConfig> {
@Override
- public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, TableConfig tableConfig, ExecutionConfig config) {
+ public DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, TableConfig tableConfig) {
if (tableConfig.getParallelism() != 0) {
- return grootEventDataStream
+ return input
.flatMap(new TableProcessorFunction(tableConfig))
.setParallelism(tableConfig.getParallelism())
.name(tableConfig.getName());
} else {
- return grootEventDataStream
+ return input
.flatMap(new TableProcessorFunction(tableConfig))
.name(tableConfig.getName());
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
index 5200d41..ecb76f4 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
@@ -5,10 +5,10 @@ import com.geedgenetworks.common.config.Constants;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.utils.ColumnUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.spi.common.udf.TableFunction;
import com.geedgenetworks.spi.common.udf.UDFContext;
import com.geedgenetworks.spi.common.udf.UdfEntity;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import com.geedgenetworks.spi.processor.TableConfig;
import com.geedgenetworks.spi.table.event.Event;
import com.google.common.collect.Lists;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
index f5bd652..64465a2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.split;
-import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.spi.common.udf.RuleContext;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
import com.geedgenetworks.spi.split.SplitConfig;
import com.geedgenetworks.spi.table.event.Event;
import com.googlecode.aviator.AviatorEvaluator;
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.Processor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.Processor
index 1f32ffa..a7eef58 100644
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.Processor
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.Processor
@@ -1,3 +1,3 @@
-com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl
-com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
-com.geedgenetworks.core.processor.table.TableProcessorImpl \ No newline at end of file
+com.geedgenetworks.core.processor.aggregate.AggregateProcessor
+com.geedgenetworks.core.processor.projection.ProjectionProcessor
+com.geedgenetworks.core.processor.table.TableProcessor \ No newline at end of file
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.ProcessorFactory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.ProcessorFactory
new file mode 100644
index 0000000..fa21381
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.ProcessorFactory
@@ -0,0 +1 @@
+com.geedgenetworks.core.processor.projection.ProjectionProcessFactory \ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 5e64962..9b58289 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -14,7 +14,7 @@ import java.util.List;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
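The first program argument still overrides the default, e.g.:

    // java -cp <examples-jar> com.geedgenetworks.example.GrootStreamExample /examples/inline_to_print.yaml
    // (<examples-jar> is a placeholder; any YAML under /examples resolvable by getTestConfigFile works)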
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
index d1dc891..bbc9c65 100644
--- a/groot-examples/pom.xml
+++ b/groot-examples/pom.xml
@@ -18,10 +18,11 @@
</modules>
<properties>
- <scope>compile</scope>
+ <scope>provided</scope>
</properties>
<dependencies>
+
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
@@ -30,34 +31,35 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>groot-bootstrap</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
</dependency>
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>connector-kafka</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>connector-mock</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>connector-clickhouse</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>connector-ipfix-collector</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
<scope>${scope}</scope>
</dependency>
@@ -146,7 +148,6 @@
</dependency>
-
</dependencies>
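Switching the module versions from ${project.version} to ${revision} follows Maven's CI-friendly versions scheme; it assumes the root pom defines a <revision> property (the same placeholder is already used by the groot-core test dependencies added elsewhere in this diff).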
diff --git a/groot-formats/format-csv/pom.xml b/groot-formats/format-csv/pom.xml
index 4940bcf..509a9c1 100644
--- a/groot-formats/format-csv/pom.xml
+++ b/groot-formats/format-csv/pom.xml
@@ -19,5 +19,24 @@
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>${flink.scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project> \ No newline at end of file
diff --git a/groot-formats/format-json/pom.xml b/groot-formats/format-json/pom.xml
index 36fef72..1036832 100644
--- a/groot-formats/format-json/pom.xml
+++ b/groot-formats/format-json/pom.xml
@@ -12,6 +12,25 @@
<artifactId>format-json</artifactId>
<name>Groot : Formats : Format-Json </name>
<dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project> \ No newline at end of file
diff --git a/groot-formats/format-msgpack/pom.xml b/groot-formats/format-msgpack/pom.xml
index a58e919..7d70875 100644
--- a/groot-formats/format-msgpack/pom.xml
+++ b/groot-formats/format-msgpack/pom.xml
@@ -19,15 +19,30 @@
<version>0.9.8</version>
</dependency>
- <!--<dependency>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-core</artifactId>
+ <version>${revision}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
- </dependency>-->
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
index b66c5b7..fced05e 100644
--- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
@@ -1,9 +1,9 @@
package com.geedgenetworks.formats.msgpack;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -69,6 +69,7 @@ public class MessagePackFormatFactoryTest {
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
Map<String, String> options = new HashMap<>();
options.put("data", Base64.getEncoder().encodeToString(bytes));
+ options.put("repeat.count", "3");
options.put("type", "base64");
options.put("format", "msgpack");
diff --git a/groot-formats/format-protobuf/pom.xml b/groot-formats/format-protobuf/pom.xml
index f14e1d1..9902ada 100644
--- a/groot-formats/format-protobuf/pom.xml
+++ b/groot-formats/format-protobuf/pom.xml
@@ -13,17 +13,11 @@
<name>Groot : Formats : Format-Protobuf </name>
<properties>
- <protobuf.version>3.23.4</protobuf.version>
+
</properties>
<dependencies>
- <!--<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>-->
- <!--
- -->
+
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>protobuf-shaded</artifactId>
@@ -37,10 +31,45 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-core</artifactId>
+ <version>${revision}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-json</artifactId>
+ <version>${revision}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+
+
+
</dependencies>
</project> \ No newline at end of file
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufEventSchemaTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufEventSchemaTest.java
index 9638bd6..df3c30a 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufEventSchemaTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufEventSchemaTest.java
@@ -1,700 +1,700 @@
-package com.geedgenetworks.formats.protobuf;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.shaded.com.google.protobuf.ByteString;
-import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.flink.util.Preconditions;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.junit.jupiter.api.Test;
-import com.geedgenetworks.formats.protobuf.SchemaConverters.MessageConverter;
-
-import java.io.FileInputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-/**
- * protoc --descriptor_set_out=proto3_types.desc --java_out=./ proto3_types.proto
- * protoc --descriptor_set_out=session_record_test.desc session_record_test.proto
- *
- */
-public class ProtobufEventSchemaTest {
-
- public static class InputDatas{
- Proto3TypesProtos.Proto3Types msg;
- Proto3TypesProtos.StructMessage subMsg1;
- Proto3TypesProtos.StructMessage subMsg2;
- Map<String, Object> map;
- Map<String, Object> subMap1;
- Map<String, Object> subMap2;
- }
-
- public static InputDatas geneInputDatas(){
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
- Map<String, Object> map = new HashMap<>();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
- Map<String, Object> subMap1 = new HashMap<>();
- Map<String, Object> subMap2 = new HashMap<>();
-
- long int64 = random.nextLong(1, Long.MAX_VALUE);
- msgBuilder.setInt64(int64);
- map.put("int64", int64);
-
- int int32 = random.nextInt(1, Integer.MAX_VALUE);
- msgBuilder.setInt32(int32);
- map.put("int32", int32);
-
- String text = "ut8字符串";
- msgBuilder.setText(text);
- map.put("text", text);
-
- byte[] bytes = new byte[]{1, 2, 3, 4, 5};
- msgBuilder.setBytes(ByteString.copyFrom(bytes));
- map.put("bytes", bytes);
-
- int enum_val = 1;
- msgBuilder.setEnumValValue(enum_val);
- map.put("enum_val", enum_val);
-
- // subMsg start
- long id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder1.setId(id);
- subMap1.put("id", id);
-
- String name = "ut8字符串1";
- subMsgBuilder1.setName(name);
- subMap1.put("name", name);
-
- int age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder1.setAge(age);
- subMap1.put("age", age);
-
- double score = random.nextDouble(1, Integer.MAX_VALUE);
- subMsgBuilder1.setScore(score);
- subMap1.put("score", score);
-
- long optional_id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder1.setOptionalId(optional_id);
- subMap1.put("optional_id", optional_id);
-
- int optional_age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder1.setOptionalAge(optional_age);
- subMap1.put("optional_age", optional_age);
-
- id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder2.setId(id);
- subMap2.put("id", id);
-
- name = "ut8字符串1";
- subMsgBuilder2.setName(name);
- subMap2.put("name", name);
-
- age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder2.setAge(age);
- subMap2.put("age", age);
-
- score = random.nextDouble(1, Integer.MAX_VALUE);
- subMsgBuilder2.setScore(score);
- subMap2.put("score", score);
-
- optional_id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder2.setOptionalId(optional_id);
- subMap2.put("optional_id", optional_id);
-
- optional_age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder2.setOptionalAge(optional_age);
- subMap2.put("optional_age", optional_age);
-
- Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
- Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
- // subMsg end
-
- msgBuilder.setMessage(subMsg1);
- map.put("message", subMap1);
-
- long optional_int64 = random.nextLong(1, Long.MAX_VALUE);
- msgBuilder.setOptionalInt64(optional_int64);
- map.put("optional_int64", optional_int64);
-
- int optional_int32 = random.nextInt(1, Integer.MAX_VALUE);
- msgBuilder.setOptionalInt32(optional_int32);
- map.put("optional_int32", optional_int32);
-
- String optional_text = "ut8字符串";
- msgBuilder.setOptionalText(optional_text);
- map.put("optional_text", optional_text);
-
- byte[] optional_bytes = new byte[]{1, 2, 3, 4, 5};
- msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
- map.put("optional_bytes", optional_bytes);
-
- int optional_enum_val = 1;
- msgBuilder.setOptionalEnumValValue(optional_enum_val);
- map.put("optional_enum_val", optional_enum_val);
-
- msgBuilder.setOptionalMessage(subMsg2);
- map.put("optional_message", subMap2);
-
- List<Long> repeated_int64 = Arrays.asList(1L, 3L, 5L);
- msgBuilder.addAllRepeatedInt64(repeated_int64);
- map.put("repeated_int64", repeated_int64);
-
- List<Integer> repeated_int32 = Arrays.asList(1, 3, 5);
- msgBuilder.addAllRepeatedInt32(repeated_int32);
- map.put("repeated_int32", repeated_int32);
-
- msgBuilder.addAllRepeatedMessage(Arrays.asList(subMsg1, subMsg2));
- map.put("repeated_message", Arrays.asList(subMap1, subMap2));
-
- InputDatas datas = new InputDatas();
- datas.msg = msgBuilder.build();
- datas.subMsg1 = subMsg1;
- datas.subMsg2 = subMsg2;
- datas.map = map;
- datas.subMap1 = subMap1;
- datas.subMap2 = subMap2;
- return datas;
- }
-
- public static InputDatas geneInputDatasDefaultValue(){
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
- Map<String, Object> map = new HashMap<>();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
- Map<String, Object> subMap1 = new HashMap<>();
- Map<String, Object> subMap2 = new HashMap<>();
-
- long int64 = 0;
- msgBuilder.setInt64(int64);
- map.put("int64", int64);
-
- int int32 = 0;
- msgBuilder.setInt32(int32);
- map.put("int32", int32);
-
- String text = "";
- msgBuilder.setText(text);
- map.put("text", text);
-
- byte[] bytes = new byte[]{};
- msgBuilder.setBytes(ByteString.copyFrom(bytes));
- map.put("bytes", bytes);
-
- int enum_val = 0;
- msgBuilder.setEnumValValue(enum_val);
- map.put("enum_val", enum_val);
-
- // subMsg start
- long id = 0;
- subMsgBuilder1.setId(id);
- subMap1.put("id", id);
-
- String name = "";
- subMsgBuilder1.setName(name);
- subMap1.put("name", name);
-
- int age = 0;
- subMsgBuilder1.setAge(age);
- subMap1.put("age", age);
-
- double score = 0;
- subMsgBuilder1.setScore(score);
- subMap1.put("score", score);
-
- long optional_id = 0;
- subMsgBuilder1.setOptionalId(optional_id);
- subMap1.put("optional_id", optional_id);
-
- int optional_age = 0;
- /*subMsgBuilder1.setOptionalAge(optional_age);
- subMap1.put("optional_age", optional_age);*/
-
- id = 0;
- subMsgBuilder2.setId(id);
- subMap2.put("id", id);
-
- name = "";
- subMsgBuilder2.setName(name);
- subMap2.put("name", name);
-
- age = 0;
- subMsgBuilder2.setAge(age);
- subMap2.put("age", age);
-
- score = 0;
- subMsgBuilder2.setScore(score);
- subMap2.put("score", score);
-
- optional_id = 0;
- subMsgBuilder2.setOptionalId(optional_id);
- subMap2.put("optional_id", optional_id);
-
- optional_age = 0;
- subMsgBuilder2.setOptionalAge(optional_age);
- subMap2.put("optional_age", optional_age);
-
- Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
- Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
- // subMsg end
-
- msgBuilder.setMessage(subMsg1);
- map.put("message", subMap1);
-
- long optional_int64 = 0;
- msgBuilder.setOptionalInt64(optional_int64);
- map.put("optional_int64", optional_int64);
-
- int optional_int32 = 0;
- msgBuilder.setOptionalInt32(optional_int32);
- map.put("optional_int32", optional_int32);
-
- String optional_text = "";
- msgBuilder.setOptionalText(optional_text);
- map.put("optional_text", optional_text);
-
- byte[] optional_bytes = new byte[]{};
- msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
- map.put("optional_bytes", optional_bytes);
-
- int optional_enum_val = 0;
- msgBuilder.setOptionalEnumValValue(optional_enum_val);
- map.put("optional_enum_val", optional_enum_val);
-
- msgBuilder.setOptionalMessage(subMsg2);
- map.put("optional_message", subMap2);
-
- List<Long> repeated_int64 = Arrays.asList();
- msgBuilder.addAllRepeatedInt64(repeated_int64);
- map.put("repeated_int64", repeated_int64);
-
- List<Integer> repeated_int32 = Arrays.asList();
- msgBuilder.addAllRepeatedInt32(repeated_int32);
- map.put("repeated_int32", repeated_int32);
-
- msgBuilder.addAllRepeatedMessage(Arrays.asList());
- map.put("repeated_message", Arrays.asList());
-
- InputDatas datas = new InputDatas();
- datas.msg = msgBuilder.build();
- datas.subMsg1 = subMsg1;
- datas.subMsg2 = subMsg2;
- datas.map = map;
- datas.subMap1 = subMap1;
- datas.subMap2 = subMap2;
- return datas;
- }
-
- public static InputDatas geneInputDatasUsePartialField(){
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
- Map<String, Object> map = new HashMap<>();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
- Map<String, Object> subMap1 = new HashMap<>();
- Map<String, Object> subMap2 = new HashMap<>();
-
- /*long int64 = random.nextLong(1, Long.MAX_VALUE);
- msgBuilder.setInt64(int64);
- map.put("int64", int64);*/
-
- int int32 = random.nextInt(1, Integer.MAX_VALUE);
- msgBuilder.setInt32(int32);
- map.put("int32", int32);
-
- String text = "ut8字符串";
- msgBuilder.setText(text);
- map.put("text", text);
-
- /*byte[] bytes = new byte[]{1, 2, 3, 4, 5};
- msgBuilder.setBytes(ByteString.copyFrom(bytes));
- map.put("bytes", bytes);*/
-
- /*int enum_val = 1;
- msgBuilder.setEnumValValue(enum_val);
- map.put("enum_val", enum_val);*/
-
- // subMsg start
- long id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder1.setId(id);
- subMap1.put("id", id);
-
- String name = "ut8字符串1";
- /*subMsgBuilder1.setName(name);
- subMap1.put("name", name);*/
-
- int age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder1.setAge(age);
- subMap1.put("age", age);
-
- double score = random.nextDouble(1, Integer.MAX_VALUE);
- /*subMsgBuilder1.setScore(score);
- subMap1.put("score", score);*/
-
- long optional_id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder1.setOptionalId(optional_id);
- subMap1.put("optional_id", optional_id);
-
- int optional_age = random.nextInt(1, Integer.MAX_VALUE);
- /*subMsgBuilder1.setOptionalAge(optional_age);
- subMap1.put("optional_age", optional_age);*/
-
- id = random.nextLong(1, Long.MAX_VALUE);
- /*subMsgBuilder2.setId(id);
- subMap2.put("id", id);*/
-
- name = "ut8字符串1";
- subMsgBuilder2.setName(name);
- subMap2.put("name", name);
-
- age = random.nextInt(1, Integer.MAX_VALUE);
- /*subMsgBuilder2.setAge(age);
- subMap2.put("age", age);*/
-
- score = random.nextDouble(1, Integer.MAX_VALUE);
- subMsgBuilder2.setScore(score);
- subMap2.put("score", score);
-
- optional_id = random.nextLong(1, Long.MAX_VALUE);
- /*subMsgBuilder2.setOptionalId(optional_id);
- subMap2.put("optional_id", optional_id);*/
-
- optional_age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder2.setOptionalAge(optional_age);
- subMap2.put("optional_age", optional_age);
-
- Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
- Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
- // subMsg end
-
- /*msgBuilder.setMessage(subMsg1);
- map.put("message", subMap1);*/
-
- long optional_int64 = random.nextLong(1, Long.MAX_VALUE);
- msgBuilder.setOptionalInt64(optional_int64);
- map.put("optional_int64", optional_int64);
-
- /*int optional_int32 = random.nextInt(1, Integer.MAX_VALUE);
- msgBuilder.setOptionalInt32(optional_int32);
- map.put("optional_int32", optional_int32);*/
-
- String optional_text = "ut8字符串";
- msgBuilder.setOptionalText(optional_text);
- map.put("optional_text", optional_text);
-
- /*byte[] optional_bytes = new byte[]{1, 2, 3, 4, 5};
- msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
- map.put("optional_bytes", optional_bytes);*/
-
- int optional_enum_val = 1;
- msgBuilder.setOptionalEnumValValue(optional_enum_val);
- map.put("optional_enum_val", optional_enum_val);
-
- msgBuilder.setOptionalMessage(subMsg2);
- map.put("optional_message", subMap2);
-
- /*List<Long> repeated_int64 = Arrays.asList(1L, 3L, 5L);
- msgBuilder.addAllRepeatedInt64(repeated_int64);
- map.put("repeated_int64", repeated_int64);*/
-
- List<Integer> repeated_int32 = Arrays.asList(1, 3, 5);
- msgBuilder.addAllRepeatedInt32(repeated_int32);
- map.put("repeated_int32", repeated_int32);
-
- msgBuilder.addAllRepeatedMessage(Arrays.asList(subMsg1, subMsg2));
- map.put("repeated_message", Arrays.asList(subMap1, subMap2));
-
- InputDatas datas = new InputDatas();
- datas.msg = msgBuilder.build();
- datas.subMsg1 = subMsg1;
- datas.subMsg2 = subMsg2;
- datas.map = map;
- datas.subMap1 = subMap1;
- datas.subMap2 = subMap2;
- return datas;
- }
-
- @Test
- public void testSerializeAndDeserialize() throws Exception{
- String path = getClass().getResource("/proto3_types.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
- InputDatas inputDatas = geneInputDatas();
-
- byte[] bytesSerByApi = inputDatas.msg.toByteArray();
-
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
- byte[] bytesSer = serializer.serialize(inputDatas.map);
-
- System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
- assertArrayEquals(bytesSerByApi, bytesSer);
-
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- Map<String, Object> rstMap = messageConverter.converter(bytesSer);
-
- assertTrue(objEquals(inputDatas.map, rstMap, false), () -> "\n" + inputDatas.map.toString() + "\n" + rstMap.toString());
- System.out.println(inputDatas.map.toString());
- System.out.println(rstMap.toString());
- System.out.println(JSON.toJSONString(inputDatas.map));
- System.out.println(JSON.toJSONString(rstMap));
-
- System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
- }
-
- @Test
- public void testSerializeAndDeserializeDefaultValue() throws Exception{
- String path = getClass().getResource("/proto3_types.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
- InputDatas inputDatas = geneInputDatasDefaultValue();
-
- byte[] bytesSerByApi = inputDatas.msg.toByteArray();
-
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
- byte[] bytesSer = serializer.serialize(inputDatas.map);
-
- System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
- assertArrayEquals(bytesSerByApi, bytesSer);
-
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- Map<String, Object> rstMap = messageConverter.converter(bytesSer);
- messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), true);
- Map<String, Object> rstMapEmitDefaultValue = messageConverter.converter(bytesSer);
-
-        // a message is output whenever it is non-null; arrays only when length > 0; optional fields once set; optional bytes even when length is 0
- System.out.println(inputDatas.map.toString());
- System.out.println(rstMap.toString());
- System.out.println(rstMapEmitDefaultValue.toString());
- System.out.println(JSON.toJSONString(inputDatas.map));
- System.out.println(JSON.toJSONString(rstMap));
- System.out.println(JSON.toJSONString(rstMapEmitDefaultValue));
-
- System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
- }
-
- @Test
- public void testSerializeAndDeserializeUsePartialField() throws Exception{
- String path = getClass().getResource("/proto3_types.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
- InputDatas inputDatas = geneInputDatasUsePartialField();
-
- byte[] bytesSerByApi = inputDatas.msg.toByteArray();
-
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
- byte[] bytesSer = serializer.serialize(inputDatas.map);
- System.out.println(Base64.getEncoder().encodeToString(bytesSer));
-
- System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
- assertArrayEquals(bytesSerByApi, bytesSer);
-
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- Map<String, Object> rstMap = messageConverter.converter(bytesSer);
-
- assertTrue(objEquals(inputDatas.map, rstMap, false), () -> "\n" + inputDatas.map.toString() + "\n" + rstMap.toString());
- System.out.println(inputDatas.map.toString());
- System.out.println(rstMap.toString());
- System.out.println(JSON.toJSONString(inputDatas.map));
- System.out.println(JSON.toJSONString(rstMap));
-
- System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
- }
-
- @Test
- public void testSerializeAndDeserializeSessionRecord() throws Exception{
- String path = getClass().getResource("/session_record_test.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"SessionRecord");
- String json = "{\"recv_time\": 1704350600, \"log_id\": 185826449998479360, \"decoded_as\": \"BASE\", \"session_id\": 290502878495441820, \"start_timestamp_ms\": 1704350566378, \"end_timestamp_ms\": 1704350570816, \"duration_ms\": 4438, \"tcp_handshake_latency_ms\": 1105, \"ingestion_time\": 1704350600, \"processing_time\": 1704350600, \"device_id\": \"21426003\", \"out_link_id\": 65535, \"in_link_id\": 65535, \"device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-9140\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-9140\\\"}]}\", \"data_center\": \"center-xxg-9140\", \"device_group\": \"group-xxg-9140\", \"sled_ip\": \"192.168.40.81\", \"address_type\": 4, \"vsys_id\": 1, \"t_vsys_id\": 1, \"flags\": 24592, \"flags_identify_info\": \"[1,1,2]\", \"statistics_rule_list\": [406583], \"client_ip\": \"192.56.151.80\", \"client_port\": 62241, \"client_os_desc\": \"Windows\", \"client_geolocation\": \"\\u7f8e\\u56fd.Unknown.Unknown..\", \"server_ip\": \"192.56.222.93\", \"server_port\": 14454, \"server_os_desc\": \"Linux\", \"server_geolocation\": \"\\u7f8e\\u56fd.Unknown.Unknown..\", \"ip_protocol\": \"tcp\", \"decoded_path\": \"ETHERNET.IPv4.TCP\", \"sent_pkts\": 4, \"received_pkts\": 5, \"sent_bytes\": 246, \"received_bytes\": 1809, \"tcp_rtt_ms\": 128, \"tcp_client_isn\": 568305009, \"tcp_server_isn\": 4027331180, \"in_src_mac\": \"a2:fa:dc:56:c7:b3\", \"out_src_mac\": \"48:73:97:96:38:20\", \"in_dest_mac\": \"48:73:97:96:38:20\", \"out_dest_mac\": \"a2:fa:dc:56:c7:b3\"}";
- Map<String, Object> map = JSON.parseObject(json);
-
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
- byte[] bytesSer = serializer.serialize(map);
- System.out.println(Base64.getEncoder().encodeToString(bytesSer));
-
- System.out.println(String.format("my ser bytes size:%d", bytesSer.length));
-
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- Map<String, Object> rstMap = messageConverter.converter(bytesSer);
-
- assertTrue(objEquals(map, rstMap, true), () -> "\n" + JSON.toJSONString(map) + "\n" + JSON.toJSONString(rstMap));
- System.out.println(map.toString());
- System.out.println(rstMap.toString());
- System.out.println(JSON.toJSONString(new TreeMap<>(map)));
- System.out.println(JSON.toJSONString(new TreeMap<>(rstMap)));
-
- System.out.println(JSON.toJSONString(new TreeMap<>(map)).equals(JSON.toJSONString(new TreeMap<>(rstMap))));
- }
-
-
- public static void main(String[] args) throws Exception{
- ProtobufEventSchemaTest test = new ProtobufEventSchemaTest();
- String path = test.getClass().getResource("/session_record_test.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"SessionRecord");
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
-
- FileInputStream inputStream = new FileInputStream("D:\\doc\\groot\\SESSION-RECORD-24-0104.json");
- LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
- int count = 0;
- long jsonBytesTotalSize = 0;
- long protoBytesTotalSize = 0;
- long jsonBytesMinSize = Long.MAX_VALUE;
- long protoBytesMinSize = Long.MAX_VALUE;
- long jsonBytesMaxSize = 0;
- long protoBytesMaxSize = 0;
- long totalFieldCount = 0;
- long minFieldCount = Long.MAX_VALUE;
- long maxFieldCount = 0;
-
- CompressionType[] compressionTypes = new CompressionType[]{
- CompressionType.NONE, CompressionType.SNAPPY, CompressionType.LZ4, CompressionType.GZIP, CompressionType.ZSTD
- };
- long[][] compressionBytesSize = new long[compressionTypes.length][6];
- for (int i = 0; i < compressionBytesSize.length; i++) {
- compressionBytesSize[i][0] = 0;
- compressionBytesSize[i][1] = 0;
- compressionBytesSize[i][2] = Long.MAX_VALUE;
- compressionBytesSize[i][3] = Long.MAX_VALUE;
- compressionBytesSize[i][4] = 0;
- compressionBytesSize[i][5] = 0;
- }
-
- while (lines.hasNext()){
- String line = lines.next().trim();
- if(line.isEmpty()){
- continue;
- }
-
- Map<String, Object> map = JSON.parseObject(line);
- int fieldCount = map.size();
- byte[] bytesProto = serializer.serialize(map);
- byte[] bytesJson = JSON.toJSONString(map).getBytes(StandardCharsets.UTF_8);
- jsonBytesTotalSize += bytesJson.length;
- protoBytesTotalSize += bytesProto.length;
- jsonBytesMinSize = Math.min(jsonBytesMinSize, bytesJson.length);
- protoBytesMinSize = Math.min(protoBytesMinSize, bytesProto.length);
- jsonBytesMaxSize = Math.max(jsonBytesMaxSize, bytesJson.length);
- protoBytesMaxSize = Math.max(protoBytesMaxSize, bytesProto.length);
- totalFieldCount += fieldCount;
- minFieldCount = Math.min(minFieldCount, fieldCount);
- maxFieldCount = Math.max(maxFieldCount, fieldCount);
-
- Map<String, Object> rstMap = messageConverter.converter(bytesProto);
- Preconditions.checkArgument(test.objEquals(map, rstMap, true), "\n" + JSON.toJSONString(new TreeMap<>(map)) + "\n" + JSON.toJSONString(new TreeMap<>(rstMap)));
- Preconditions.checkArgument(JSON.toJSONString(new TreeMap<>(map)).equals(JSON.toJSONString(new TreeMap<>(rstMap))), "\n" + JSON.toJSONString(new TreeMap<>(map)) + "\n" + JSON.toJSONString(new TreeMap<>(rstMap)));
- count++;
-
- for (int i = 0; i < compressionTypes.length; i++) {
- CompressionType compressionType = compressionTypes[i];
- ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(1024 * 16);
- OutputStream outputStream = compressionType.wrapForOutput(bufferStream, (byte) 2);
- outputStream.write(bytesJson);
- outputStream.close();
- int jsonCompressSize = bufferStream.position();
-
- bufferStream = new ByteBufferOutputStream(1024 * 16);
- outputStream = compressionType.wrapForOutput(bufferStream, (byte) 2);
- outputStream.write(bytesProto);
- outputStream.close();
- int protoCompressSize = bufferStream.position();
-
- compressionBytesSize[i][0] += jsonCompressSize;
- compressionBytesSize[i][1] += protoCompressSize;
- compressionBytesSize[i][2] = Math.min(compressionBytesSize[i][2], jsonCompressSize);
- compressionBytesSize[i][3] = Math.min(compressionBytesSize[i][3], protoCompressSize);
- compressionBytesSize[i][4] = Math.max(compressionBytesSize[i][4], jsonCompressSize);
- compressionBytesSize[i][5] = Math.max(compressionBytesSize[i][5], protoCompressSize);
- }
-
- }
- System.out.println(String.format("count:%d, avgFieldCount:%d, minFieldCount:%d, maxFieldCount:%d, jsonBytesAvgSize:%d, protoBytesAvgSize:%d, jsonBytesMinSize:%d, protoBytesMinSize:%d, jsonBytesMaxSize:%d, protoBytesMaxSize:%d",
- count, totalFieldCount/count, minFieldCount, maxFieldCount, jsonBytesTotalSize/count, protoBytesTotalSize/count,
- jsonBytesMinSize, protoBytesMinSize, jsonBytesMaxSize, protoBytesMaxSize));
- for (int i = 0; i < compressionTypes.length; i++) {
- CompressionType compressionType = compressionTypes[i];
- System.out.println(String.format("compression(%s): count:%d, jsonBytesAvgSize:%d, protoBytesAvgSize:%d, avgRatio:%.2f, jsonBytesMinSize:%d, protoBytesMinSize:%d, minRatio:%.2f, jsonBytesMaxSize:%d, protoBytesMaxSize:%d, maxRatio:%.2f",
- compressionType, count, compressionBytesSize[i][0]/count, compressionBytesSize[i][1]/count, (((double)compressionBytesSize[i][1])/count)/(compressionBytesSize[i][0]/count),
- compressionBytesSize[i][2], compressionBytesSize[i][3], ((double)compressionBytesSize[i][3])/(compressionBytesSize[i][2]),
- compressionBytesSize[i][4], compressionBytesSize[i][5], ((double)compressionBytesSize[i][5])/(compressionBytesSize[i][4])));
- }
- }
-
- @Test
- public void testArrayInstance() throws Exception{
- Object bytes = new byte[]{1, 2, 3, 4, 5};
- Object ints = new int[]{1, 2, 3, 4, 5};
-
- System.out.println(bytes.getClass().isArray());
- System.out.println(bytes instanceof byte[]);
- System.out.println(bytes instanceof int[]);
- System.out.println(ints.getClass().isArray());
- System.out.println(ints instanceof byte[]);
- System.out.println(ints instanceof int[]);
- }
-
- private boolean objEquals(Object value1, Object value2, boolean numConvert){
- if(value1 == null){
- if(value1 != value2){
- return false;
- }
- }else if(value2 == null){
- return false;
- }else if(value1 instanceof Map){
- if(!mapEquals((Map<String, Object>) value1, (Map<String, Object>) value2, numConvert)){
- return false;
- }
- }else if(value1 instanceof List){
- if(!listEquals((List< Object>) value1, (List< Object>) value2, numConvert)){
- return false;
- }
- }else if(value1 instanceof byte[]){
- if(!Arrays.equals((byte[]) value1, (byte[]) value2)){
- return false;
- }
- }
- else{
- if(value1.getClass() != value2.getClass() || !value1.equals(value2)){
- if(numConvert && value1 instanceof Number && value2 instanceof Number && ((Number) value1).longValue() == ((Number) value2).longValue()){
-
- }else{
- return false;
- }
- }
- }
- return true;
- }
- private boolean mapEquals(Map<String, Object> map1, Map<String, Object> map2, boolean numConvert){
- if(map1.size() != map2.size()){
- return false;
- }
-
- for (Map.Entry<String, Object> entry : map1.entrySet()) {
- Object value1 = entry.getValue();
- Object value2 = map2.get(entry.getKey());
- if(!objEquals(value1, value2, numConvert)){
- return false;
- }
- }
-
- return true;
- }
-
- private boolean listEquals(List< Object> list1, List< Object> list2, boolean numConvert){
- if(list1.size() != list2.size()){
- return false;
- }
-
- for (int i = 0; i < list1.size(); i++) {
- if(!objEquals(list1.get(i), list2.get(i), numConvert)){
- return false;
- }
- }
-
- return true;
- }
-}
+package com.geedgenetworks.formats.protobuf;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.shaded.com.google.protobuf.ByteString;
+import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+import com.geedgenetworks.formats.protobuf.SchemaConverters.MessageConverter;
+
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * protoc --descriptor_set_out=proto3_types.desc --java_out=./ proto3_types.proto
+ * protoc --descriptor_set_out=session_record_test.desc session_record_test.proto
+ *
+ */
+public class ProtobufEventSchemaTest {
+
+ public static class InputDatas{
+ Proto3TypesProtos.Proto3Types msg;
+ Proto3TypesProtos.StructMessage subMsg1;
+ Proto3TypesProtos.StructMessage subMsg2;
+ Map<String, Object> map;
+ Map<String, Object> subMap1;
+ Map<String, Object> subMap2;
+ }
+
+ public static InputDatas geneInputDatas(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
+ Map<String, Object> map = new HashMap<>();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
+ Map<String, Object> subMap1 = new HashMap<>();
+ Map<String, Object> subMap2 = new HashMap<>();
+
+ long int64 = random.nextLong(1, Long.MAX_VALUE);
+ msgBuilder.setInt64(int64);
+ map.put("int64", int64);
+
+ int int32 = random.nextInt(1, Integer.MAX_VALUE);
+ msgBuilder.setInt32(int32);
+ map.put("int32", int32);
+
+ String text = "ut8字符串";
+ msgBuilder.setText(text);
+ map.put("text", text);
+
+ byte[] bytes = new byte[]{1, 2, 3, 4, 5};
+ msgBuilder.setBytes(ByteString.copyFrom(bytes));
+ map.put("bytes", bytes);
+
+ int enum_val = 1;
+ msgBuilder.setEnumValValue(enum_val);
+ map.put("enum_val", enum_val);
+
+ // subMsg start
+ long id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder1.setId(id);
+ subMap1.put("id", id);
+
+ String name = "ut8字符串1";
+ subMsgBuilder1.setName(name);
+ subMap1.put("name", name);
+
+ int age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder1.setAge(age);
+ subMap1.put("age", age);
+
+ double score = random.nextDouble(1, Integer.MAX_VALUE);
+ subMsgBuilder1.setScore(score);
+ subMap1.put("score", score);
+
+ long optional_id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder1.setOptionalId(optional_id);
+ subMap1.put("optional_id", optional_id);
+
+ int optional_age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder1.setOptionalAge(optional_age);
+ subMap1.put("optional_age", optional_age);
+
+ id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder2.setId(id);
+ subMap2.put("id", id);
+
+ name = "ut8字符串1";
+ subMsgBuilder2.setName(name);
+ subMap2.put("name", name);
+
+ age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setAge(age);
+ subMap2.put("age", age);
+
+ score = random.nextDouble(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setScore(score);
+ subMap2.put("score", score);
+
+ optional_id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder2.setOptionalId(optional_id);
+ subMap2.put("optional_id", optional_id);
+
+ optional_age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setOptionalAge(optional_age);
+ subMap2.put("optional_age", optional_age);
+
+ Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
+ Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
+ // subMsg end
+
+ msgBuilder.setMessage(subMsg1);
+ map.put("message", subMap1);
+
+ long optional_int64 = random.nextLong(1, Long.MAX_VALUE);
+ msgBuilder.setOptionalInt64(optional_int64);
+ map.put("optional_int64", optional_int64);
+
+ int optional_int32 = random.nextInt(1, Integer.MAX_VALUE);
+ msgBuilder.setOptionalInt32(optional_int32);
+ map.put("optional_int32", optional_int32);
+
+ String optional_text = "ut8字符串";
+ msgBuilder.setOptionalText(optional_text);
+ map.put("optional_text", optional_text);
+
+ byte[] optional_bytes = new byte[]{1, 2, 3, 4, 5};
+ msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
+ map.put("optional_bytes", optional_bytes);
+
+ int optional_enum_val = 1;
+ msgBuilder.setOptionalEnumValValue(optional_enum_val);
+ map.put("optional_enum_val", optional_enum_val);
+
+ msgBuilder.setOptionalMessage(subMsg2);
+ map.put("optional_message", subMap2);
+
+ List<Long> repeated_int64 = Arrays.asList(1L, 3L, 5L);
+ msgBuilder.addAllRepeatedInt64(repeated_int64);
+ map.put("repeated_int64", repeated_int64);
+
+ List<Integer> repeated_int32 = Arrays.asList(1, 3, 5);
+ msgBuilder.addAllRepeatedInt32(repeated_int32);
+ map.put("repeated_int32", repeated_int32);
+
+ msgBuilder.addAllRepeatedMessage(Arrays.asList(subMsg1, subMsg2));
+ map.put("repeated_message", Arrays.asList(subMap1, subMap2));
+
+ InputDatas datas = new InputDatas();
+ datas.msg = msgBuilder.build();
+ datas.subMsg1 = subMsg1;
+ datas.subMsg2 = subMsg2;
+ datas.map = map;
+ datas.subMap1 = subMap1;
+ datas.subMap2 = subMap2;
+ return datas;
+ }
+
+ public static InputDatas geneInputDatasDefaultValue(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
+ Map<String, Object> map = new HashMap<>();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
+ Map<String, Object> subMap1 = new HashMap<>();
+ Map<String, Object> subMap2 = new HashMap<>();
+
+ long int64 = 0;
+ msgBuilder.setInt64(int64);
+ map.put("int64", int64);
+
+ int int32 = 0;
+ msgBuilder.setInt32(int32);
+ map.put("int32", int32);
+
+ String text = "";
+ msgBuilder.setText(text);
+ map.put("text", text);
+
+ byte[] bytes = new byte[]{};
+ msgBuilder.setBytes(ByteString.copyFrom(bytes));
+ map.put("bytes", bytes);
+
+ int enum_val = 0;
+ msgBuilder.setEnumValValue(enum_val);
+ map.put("enum_val", enum_val);
+
+ // subMsg start
+ long id = 0;
+ subMsgBuilder1.setId(id);
+ subMap1.put("id", id);
+
+ String name = "";
+ subMsgBuilder1.setName(name);
+ subMap1.put("name", name);
+
+ int age = 0;
+ subMsgBuilder1.setAge(age);
+ subMap1.put("age", age);
+
+ double score = 0;
+ subMsgBuilder1.setScore(score);
+ subMap1.put("score", score);
+
+ long optional_id = 0;
+ subMsgBuilder1.setOptionalId(optional_id);
+ subMap1.put("optional_id", optional_id);
+
+ int optional_age = 0;
+ /*subMsgBuilder1.setOptionalAge(optional_age);
+ subMap1.put("optional_age", optional_age);*/
+
+ id = 0;
+ subMsgBuilder2.setId(id);
+ subMap2.put("id", id);
+
+ name = "";
+ subMsgBuilder2.setName(name);
+ subMap2.put("name", name);
+
+ age = 0;
+ subMsgBuilder2.setAge(age);
+ subMap2.put("age", age);
+
+ score = 0;
+ subMsgBuilder2.setScore(score);
+ subMap2.put("score", score);
+
+ optional_id = 0;
+ subMsgBuilder2.setOptionalId(optional_id);
+ subMap2.put("optional_id", optional_id);
+
+ optional_age = 0;
+ subMsgBuilder2.setOptionalAge(optional_age);
+ subMap2.put("optional_age", optional_age);
+
+ Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
+ Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
+ // subMsg end
+
+ msgBuilder.setMessage(subMsg1);
+ map.put("message", subMap1);
+
+ long optional_int64 = 0;
+ msgBuilder.setOptionalInt64(optional_int64);
+ map.put("optional_int64", optional_int64);
+
+ int optional_int32 = 0;
+ msgBuilder.setOptionalInt32(optional_int32);
+ map.put("optional_int32", optional_int32);
+
+ String optional_text = "";
+ msgBuilder.setOptionalText(optional_text);
+ map.put("optional_text", optional_text);
+
+ byte[] optional_bytes = new byte[]{};
+ msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
+ map.put("optional_bytes", optional_bytes);
+
+ int optional_enum_val = 0;
+ msgBuilder.setOptionalEnumValValue(optional_enum_val);
+ map.put("optional_enum_val", optional_enum_val);
+
+ msgBuilder.setOptionalMessage(subMsg2);
+ map.put("optional_message", subMap2);
+
+ List<Long> repeated_int64 = Arrays.asList();
+ msgBuilder.addAllRepeatedInt64(repeated_int64);
+ map.put("repeated_int64", repeated_int64);
+
+ List<Integer> repeated_int32 = Arrays.asList();
+ msgBuilder.addAllRepeatedInt32(repeated_int32);
+ map.put("repeated_int32", repeated_int32);
+
+ msgBuilder.addAllRepeatedMessage(Arrays.asList());
+ map.put("repeated_message", Arrays.asList());
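+        // note (explanatory comment): proto3 does not encode empty repeated fields, so these add no bytes on the wire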
+
+ InputDatas datas = new InputDatas();
+ datas.msg = msgBuilder.build();
+ datas.subMsg1 = subMsg1;
+ datas.subMsg2 = subMsg2;
+ datas.map = map;
+ datas.subMap1 = subMap1;
+ datas.subMap2 = subMap2;
+ return datas;
+ }
+
+ public static InputDatas geneInputDatasUsePartialField(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
+ Map<String, Object> map = new HashMap<>();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
+ Map<String, Object> subMap1 = new HashMap<>();
+ Map<String, Object> subMap2 = new HashMap<>();
+
+ /*long int64 = random.nextLong(1, Long.MAX_VALUE);
+ msgBuilder.setInt64(int64);
+ map.put("int64", int64);*/
+
+ int int32 = random.nextInt(1, Integer.MAX_VALUE);
+ msgBuilder.setInt32(int32);
+ map.put("int32", int32);
+
+ String text = "ut8字符串";
+ msgBuilder.setText(text);
+ map.put("text", text);
+
+ /*byte[] bytes = new byte[]{1, 2, 3, 4, 5};
+ msgBuilder.setBytes(ByteString.copyFrom(bytes));
+ map.put("bytes", bytes);*/
+
+ /*int enum_val = 1;
+ msgBuilder.setEnumValValue(enum_val);
+ map.put("enum_val", enum_val);*/
+
+ // subMsg start
+ long id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder1.setId(id);
+ subMap1.put("id", id);
+
+ String name = "ut8字符串1";
+ /*subMsgBuilder1.setName(name);
+ subMap1.put("name", name);*/
+
+ int age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder1.setAge(age);
+ subMap1.put("age", age);
+
+ double score = random.nextDouble(1, Integer.MAX_VALUE);
+ /*subMsgBuilder1.setScore(score);
+ subMap1.put("score", score);*/
+
+ long optional_id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder1.setOptionalId(optional_id);
+ subMap1.put("optional_id", optional_id);
+
+ int optional_age = random.nextInt(1, Integer.MAX_VALUE);
+ /*subMsgBuilder1.setOptionalAge(optional_age);
+ subMap1.put("optional_age", optional_age);*/
+
+ id = random.nextLong(1, Long.MAX_VALUE);
+ /*subMsgBuilder2.setId(id);
+ subMap2.put("id", id);*/
+
+ name = "ut8字符串1";
+ subMsgBuilder2.setName(name);
+ subMap2.put("name", name);
+
+ age = random.nextInt(1, Integer.MAX_VALUE);
+ /*subMsgBuilder2.setAge(age);
+ subMap2.put("age", age);*/
+
+ score = random.nextDouble(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setScore(score);
+ subMap2.put("score", score);
+
+ optional_id = random.nextLong(1, Long.MAX_VALUE);
+ /*subMsgBuilder2.setOptionalId(optional_id);
+ subMap2.put("optional_id", optional_id);*/
+
+ optional_age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setOptionalAge(optional_age);
+ subMap2.put("optional_age", optional_age);
+
+ Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
+ Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
+ // subMsg end
+
+ /*msgBuilder.setMessage(subMsg1);
+ map.put("message", subMap1);*/
+
+ long optional_int64 = random.nextLong(1, Long.MAX_VALUE);
+ msgBuilder.setOptionalInt64(optional_int64);
+ map.put("optional_int64", optional_int64);
+
+ /*int optional_int32 = random.nextInt(1, Integer.MAX_VALUE);
+ msgBuilder.setOptionalInt32(optional_int32);
+ map.put("optional_int32", optional_int32);*/
+
+ String optional_text = "ut8字符串";
+ msgBuilder.setOptionalText(optional_text);
+ map.put("optional_text", optional_text);
+
+ /*byte[] optional_bytes = new byte[]{1, 2, 3, 4, 5};
+ msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
+ map.put("optional_bytes", optional_bytes);*/
+
+ int optional_enum_val = 1;
+ msgBuilder.setOptionalEnumValValue(optional_enum_val);
+ map.put("optional_enum_val", optional_enum_val);
+
+ msgBuilder.setOptionalMessage(subMsg2);
+ map.put("optional_message", subMap2);
+
+ /*List<Long> repeated_int64 = Arrays.asList(1L, 3L, 5L);
+ msgBuilder.addAllRepeatedInt64(repeated_int64);
+ map.put("repeated_int64", repeated_int64);*/
+
+ List<Integer> repeated_int32 = Arrays.asList(1, 3, 5);
+ msgBuilder.addAllRepeatedInt32(repeated_int32);
+ map.put("repeated_int32", repeated_int32);
+
+ msgBuilder.addAllRepeatedMessage(Arrays.asList(subMsg1, subMsg2));
+ map.put("repeated_message", Arrays.asList(subMap1, subMap2));
+
+ InputDatas datas = new InputDatas();
+ datas.msg = msgBuilder.build();
+ datas.subMsg1 = subMsg1;
+ datas.subMsg2 = subMsg2;
+ datas.map = map;
+ datas.subMap1 = subMap1;
+ datas.subMap2 = subMap2;
+ return datas;
+ }
+
+ @Test
+ public void testSerializeAndDeserialize() throws Exception{
+ String path = getClass().getResource("/proto3_types.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
+ InputDatas inputDatas = geneInputDatas();
+
+ byte[] bytesSerByApi = inputDatas.msg.toByteArray();
+
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+ byte[] bytesSer = serializer.serialize(inputDatas.map);
+
+ System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
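+        // byte-for-byte equality with the generated API's own output verifies wire compatibility of ProtobufSerializer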
+ assertArrayEquals(bytesSerByApi, bytesSer);
+
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ Map<String, Object> rstMap = messageConverter.converter(bytesSer);
+
+ assertTrue(objEquals(inputDatas.map, rstMap, false), () -> "\n" + inputDatas.map.toString() + "\n" + rstMap.toString());
+ System.out.println(inputDatas.map.toString());
+ System.out.println(rstMap.toString());
+ System.out.println(JSON.toJSONString(inputDatas.map));
+ System.out.println(JSON.toJSONString(rstMap));
+
+ System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
+ }
+
+ @Test
+ public void testSerializeAndDeserializeDefaultValue() throws Exception{
+ String path = getClass().getResource("/proto3_types.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
+ InputDatas inputDatas = geneInputDatasDefaultValue();
+
+ byte[] bytesSerByApi = inputDatas.msg.toByteArray();
+
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+ byte[] bytesSer = serializer.serialize(inputDatas.map);
+
+ System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
+ assertArrayEquals(bytesSerByApi, bytesSer);
+
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ Map<String, Object> rstMap = messageConverter.converter(bytesSer);
+ messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), true);
+ Map<String, Object> rstMapEmitDefaultValue = messageConverter.converter(bytesSer);
+
+        // a message is output whenever it is non-null; arrays only when length > 0; optional fields once set; optional bytes even when length is 0
+ System.out.println(inputDatas.map.toString());
+ System.out.println(rstMap.toString());
+ System.out.println(rstMapEmitDefaultValue.toString());
+ System.out.println(JSON.toJSONString(inputDatas.map));
+ System.out.println(JSON.toJSONString(rstMap));
+ System.out.println(JSON.toJSONString(rstMapEmitDefaultValue));
+
+ System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
+ }
+
+ @Test
+ public void testSerializeAndDeserializeUsePartialField() throws Exception{
+ String path = getClass().getResource("/proto3_types.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
+ InputDatas inputDatas = geneInputDatasUsePartialField();
+
+ byte[] bytesSerByApi = inputDatas.msg.toByteArray();
+
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+ byte[] bytesSer = serializer.serialize(inputDatas.map);
+ System.out.println(Base64.getEncoder().encodeToString(bytesSer));
+
+ System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
+ assertArrayEquals(bytesSerByApi, bytesSer);
+
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ Map<String, Object> rstMap = messageConverter.converter(bytesSer);
+
+ assertTrue(objEquals(inputDatas.map, rstMap, false), () -> "\n" + inputDatas.map.toString() + "\n" + rstMap.toString());
+ System.out.println(inputDatas.map.toString());
+ System.out.println(rstMap.toString());
+ System.out.println(JSON.toJSONString(inputDatas.map));
+ System.out.println(JSON.toJSONString(rstMap));
+
+ System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
+ }
+
+ @Test
+ public void testSerializeAndDeserializeSessionRecord() throws Exception{
+ String path = getClass().getResource("/session_record_test.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"SessionRecord");
+ String json = "{\"recv_time\": 1704350600, \"log_id\": 185826449998479360, \"decoded_as\": \"BASE\", \"session_id\": 290502878495441820, \"start_timestamp_ms\": 1704350566378, \"end_timestamp_ms\": 1704350570816, \"duration_ms\": 4438, \"tcp_handshake_latency_ms\": 1105, \"ingestion_time\": 1704350600, \"processing_time\": 1704350600, \"device_id\": \"21426003\", \"out_link_id\": 65535, \"in_link_id\": 65535, \"device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-9140\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-9140\\\"}]}\", \"data_center\": \"center-xxg-9140\", \"device_group\": \"group-xxg-9140\", \"sled_ip\": \"192.168.40.81\", \"address_type\": 4, \"vsys_id\": 1, \"t_vsys_id\": 1, \"flags\": 24592, \"flags_identify_info\": \"[1,1,2]\", \"statistics_rule_list\": [406583], \"client_ip\": \"192.56.151.80\", \"client_port\": 62241, \"client_os_desc\": \"Windows\", \"client_geolocation\": \"\\u7f8e\\u56fd.Unknown.Unknown..\", \"server_ip\": \"192.56.222.93\", \"server_port\": 14454, \"server_os_desc\": \"Linux\", \"server_geolocation\": \"\\u7f8e\\u56fd.Unknown.Unknown..\", \"ip_protocol\": \"tcp\", \"decoded_path\": \"ETHERNET.IPv4.TCP\", \"sent_pkts\": 4, \"received_pkts\": 5, \"sent_bytes\": 246, \"received_bytes\": 1809, \"tcp_rtt_ms\": 128, \"tcp_client_isn\": 568305009, \"tcp_server_isn\": 4027331180, \"in_src_mac\": \"a2:fa:dc:56:c7:b3\", \"out_src_mac\": \"48:73:97:96:38:20\", \"in_dest_mac\": \"48:73:97:96:38:20\", \"out_dest_mac\": \"a2:fa:dc:56:c7:b3\"}";
+ Map<String, Object> map = JSON.parseObject(json);
+
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+ byte[] bytesSer = serializer.serialize(map);
+ System.out.println(Base64.getEncoder().encodeToString(bytesSer));
+
+ System.out.println(String.format("my ser bytes size:%d", bytesSer.length));
+
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ Map<String, Object> rstMap = messageConverter.converter(bytesSer);
+
+ assertTrue(objEquals(map, rstMap, true), () -> "\n" + JSON.toJSONString(map) + "\n" + JSON.toJSONString(rstMap));
+ System.out.println(map.toString());
+ System.out.println(rstMap.toString());
+ System.out.println(JSON.toJSONString(new TreeMap<>(map)));
+ System.out.println(JSON.toJSONString(new TreeMap<>(rstMap)));
+
+ System.out.println(JSON.toJSONString(new TreeMap<>(map)).equals(JSON.toJSONString(new TreeMap<>(rstMap))));
+ }
+
+
+ public static void main(String[] args) throws Exception{
+ ProtobufEventSchemaTest test = new ProtobufEventSchemaTest();
+ String path = test.getClass().getResource("/session_record_test.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"SessionRecord");
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+
+        FileInputStream inputStream = new FileInputStream(test.getClass().getResource("/format_protobuf_test_data.json").getPath());
+ LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+ int count = 0;
+ long jsonBytesTotalSize = 0;
+ long protoBytesTotalSize = 0;
+ long jsonBytesMinSize = Long.MAX_VALUE;
+ long protoBytesMinSize = Long.MAX_VALUE;
+ long jsonBytesMaxSize = 0;
+ long protoBytesMaxSize = 0;
+ long totalFieldCount = 0;
+ long minFieldCount = Long.MAX_VALUE;
+ long maxFieldCount = 0;
+
+ CompressionType[] compressionTypes = new CompressionType[]{
+ CompressionType.NONE, CompressionType.SNAPPY, CompressionType.LZ4, CompressionType.GZIP, CompressionType.ZSTD
+ };
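+        // stats columns per codec (explanatory comment):
+        // [0]=json total, [1]=proto total, [2]=json min, [3]=proto min, [4]=json max, [5]=proto max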
+ long[][] compressionBytesSize = new long[compressionTypes.length][6];
+ for (int i = 0; i < compressionBytesSize.length; i++) {
+ compressionBytesSize[i][0] = 0;
+ compressionBytesSize[i][1] = 0;
+ compressionBytesSize[i][2] = Long.MAX_VALUE;
+ compressionBytesSize[i][3] = Long.MAX_VALUE;
+ compressionBytesSize[i][4] = 0;
+ compressionBytesSize[i][5] = 0;
+ }
+
+ while (lines.hasNext()){
+ String line = lines.next().trim();
+ if(line.isEmpty()){
+ continue;
+ }
+
+ Map<String, Object> map = JSON.parseObject(line);
+ int fieldCount = map.size();
+ byte[] bytesProto = serializer.serialize(map);
+ byte[] bytesJson = JSON.toJSONString(map).getBytes(StandardCharsets.UTF_8);
+ jsonBytesTotalSize += bytesJson.length;
+ protoBytesTotalSize += bytesProto.length;
+ jsonBytesMinSize = Math.min(jsonBytesMinSize, bytesJson.length);
+ protoBytesMinSize = Math.min(protoBytesMinSize, bytesProto.length);
+ jsonBytesMaxSize = Math.max(jsonBytesMaxSize, bytesJson.length);
+ protoBytesMaxSize = Math.max(protoBytesMaxSize, bytesProto.length);
+ totalFieldCount += fieldCount;
+ minFieldCount = Math.min(minFieldCount, fieldCount);
+ maxFieldCount = Math.max(maxFieldCount, fieldCount);
+
+ Map<String, Object> rstMap = messageConverter.converter(bytesProto);
+ Preconditions.checkArgument(test.objEquals(map, rstMap, true), "\n" + JSON.toJSONString(new TreeMap<>(map)) + "\n" + JSON.toJSONString(new TreeMap<>(rstMap)));
+ Preconditions.checkArgument(JSON.toJSONString(new TreeMap<>(map)).equals(JSON.toJSONString(new TreeMap<>(rstMap))), "\n" + JSON.toJSONString(new TreeMap<>(map)) + "\n" + JSON.toJSONString(new TreeMap<>(rstMap)));
+ count++;
+
+ for (int i = 0; i < compressionTypes.length; i++) {
+ CompressionType compressionType = compressionTypes[i];
+ ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(1024 * 16);
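+                    // magic (byte) 2 selects Kafka's record-batch format v2 for the codec's output framing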
+ OutputStream outputStream = compressionType.wrapForOutput(bufferStream, (byte) 2);
+ outputStream.write(bytesJson);
+ outputStream.close();
+ int jsonCompressSize = bufferStream.position();
+
+ bufferStream = new ByteBufferOutputStream(1024 * 16);
+ outputStream = compressionType.wrapForOutput(bufferStream, (byte) 2);
+ outputStream.write(bytesProto);
+ outputStream.close();
+ int protoCompressSize = bufferStream.position();
+
+ compressionBytesSize[i][0] += jsonCompressSize;
+ compressionBytesSize[i][1] += protoCompressSize;
+ compressionBytesSize[i][2] = Math.min(compressionBytesSize[i][2], jsonCompressSize);
+ compressionBytesSize[i][3] = Math.min(compressionBytesSize[i][3], protoCompressSize);
+ compressionBytesSize[i][4] = Math.max(compressionBytesSize[i][4], jsonCompressSize);
+ compressionBytesSize[i][5] = Math.max(compressionBytesSize[i][5], protoCompressSize);
+ }
+
+ }
+ System.out.println(String.format("count:%d, avgFieldCount:%d, minFieldCount:%d, maxFieldCount:%d, jsonBytesAvgSize:%d, protoBytesAvgSize:%d, jsonBytesMinSize:%d, protoBytesMinSize:%d, jsonBytesMaxSize:%d, protoBytesMaxSize:%d",
+ count, totalFieldCount/count, minFieldCount, maxFieldCount, jsonBytesTotalSize/count, protoBytesTotalSize/count,
+ jsonBytesMinSize, protoBytesMinSize, jsonBytesMaxSize, protoBytesMaxSize));
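+        // each ratio below is protobuf size divided by JSON size for that codec (lower = protobuf smaller)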
+ for (int i = 0; i < compressionTypes.length; i++) {
+ CompressionType compressionType = compressionTypes[i];
+ System.out.println(String.format("compression(%s): count:%d, jsonBytesAvgSize:%d, protoBytesAvgSize:%d, avgRatio:%.2f, jsonBytesMinSize:%d, protoBytesMinSize:%d, minRatio:%.2f, jsonBytesMaxSize:%d, protoBytesMaxSize:%d, maxRatio:%.2f",
+ compressionType, count, compressionBytesSize[i][0]/count, compressionBytesSize[i][1]/count, (((double)compressionBytesSize[i][1])/count)/(compressionBytesSize[i][0]/count),
+ compressionBytesSize[i][2], compressionBytesSize[i][3], ((double)compressionBytesSize[i][3])/(compressionBytesSize[i][2]),
+ compressionBytesSize[i][4], compressionBytesSize[i][5], ((double)compressionBytesSize[i][5])/(compressionBytesSize[i][4])));
+ }
+ }
+
+ @Test
+ public void testArrayInstance() throws Exception{
+ Object bytes = new byte[]{1, 2, 3, 4, 5};
+ Object ints = new int[]{1, 2, 3, 4, 5};
+
+ System.out.println(bytes.getClass().isArray());
+ System.out.println(bytes instanceof byte[]);
+ System.out.println(bytes instanceof int[]);
+ System.out.println(ints.getClass().isArray());
+ System.out.println(ints instanceof byte[]);
+ System.out.println(ints instanceof int[]);
+ }
+
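+    // Deep structural equality over Map/List/byte[] trees; with numConvert, numbers that differ
+    // only in boxed type (e.g. Integer vs Long after JSON parsing) compare equal by long value.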
+ private boolean objEquals(Object value1, Object value2, boolean numConvert){
+ if(value1 == null){
+ if(value1 != value2){
+ return false;
+ }
+ }else if(value2 == null){
+ return false;
+ }else if(value1 instanceof Map){
+ if(!mapEquals((Map<String, Object>) value1, (Map<String, Object>) value2, numConvert)){
+ return false;
+ }
+ }else if(value1 instanceof List){
+            if(!listEquals((List<Object>) value1, (List<Object>) value2, numConvert)){
+ return false;
+ }
+ }else if(value1 instanceof byte[]){
+ if(!Arrays.equals((byte[]) value1, (byte[]) value2)){
+ return false;
+ }
+        } else {
+            if(value1.getClass() != value2.getClass() || !value1.equals(value2)){
+                // tolerate boxed-type mismatches (e.g. Integer vs Long after JSON parsing) when numConvert is set
+                boolean numericallyEqual = numConvert && value1 instanceof Number && value2 instanceof Number
+                        && ((Number) value1).longValue() == ((Number) value2).longValue();
+                if(!numericallyEqual){
+                    return false;
+                }
+            }
+        }
+ return true;
+ }
+ private boolean mapEquals(Map<String, Object> map1, Map<String, Object> map2, boolean numConvert){
+ if(map1.size() != map2.size()){
+ return false;
+ }
+
+ for (Map.Entry<String, Object> entry : map1.entrySet()) {
+ Object value1 = entry.getValue();
+ Object value2 = map2.get(entry.getKey());
+ if(!objEquals(value1, value2, numConvert)){
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+    private boolean listEquals(List<Object> list1, List<Object> list2, boolean numConvert){
+ if(list1.size() != list2.size()){
+ return false;
+ }
+
+ for (int i = 0; i < list1.size(); i++) {
+ if(!objEquals(list1.get(i), list2.get(i), numConvert)){
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
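For reference, the round trip exercised by the tests above reduces to the following minimal sketch, using only calls that appear in the test itself (the descriptor path and message name are placeholders):

    Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(
            ProtobufUtils.readDescriptorFileContent("/path/to/proto3_types.desc"), "Proto3Types");
    // Map<String, Object> -> protobuf wire bytes
    ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
    byte[] bytes = serializer.serialize(inputMap);
    // wire bytes -> Map<String, Object>; the last flag toggles emitting proto3 default values
    SchemaConverters.MessageConverter converter = new SchemaConverters.MessageConverter(
            descriptor, SchemaConverters.toStructType(descriptor), false);
    Map<String, Object> result = converter.converter(bytes);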
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
index c5d6320..95941e4 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
@@ -1,10 +1,10 @@
package com.geedgenetworks.formats.protobuf;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -27,6 +27,7 @@ class ProtobufFormatFactoryTest {
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
Map<String, String> options = new HashMap<>();
+ options.put("repeat.count", "3");
options.put("data", Base64.getEncoder().encodeToString(inputDatas.msg.toByteArray()));
options.put("type", "base64");
options.put("format", "protobuf");
diff --git a/groot-formats/format-protobuf/src/test/resources/format_protobuf_test_data.json b/groot-formats/format-protobuf/src/test/resources/format_protobuf_test_data.json
new file mode 100644
index 0000000..51dac53
--- /dev/null
+++ b/groot-formats/format-protobuf/src/test/resources/format_protobuf_test_data.json
@@ -0,0 +1 @@
+{"in_src_mac":"58:b3:8f:fa:3b:11","in_dest_mac":"48:73:97:96:38:27","out_src_mac":"48:73:97:96:38:27","out_dest_mac":"58:b3:8f:fa:3b:11","ip_protocol":"tcp","address_type":4,"client_ip":"192.168.32.110","server_ip":"180.163.210.217","client_port":54570,"server_port":8081,"tcp_client_isn":3530397760,"tcp_server_isn":1741812485,"tcp_rtt_ms":28,"tcp_handshake_latency_ms":28,"direction":"Outbound","in_link_id":29,"out_link_id":29,"start_timestamp_ms":1731167469371,"end_timestamp_ms":1731167474466,"duration_ms":5095,"sent_pkts":6,"sent_bytes":572,"tcp_c2s_ip_fragments":0,"received_pkts":4,"tcp_s2c_ip_fragments":0,"received_bytes":266,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_s2c_o3_pkts":0,"tcp_s2c_lost_bytes":0,"flags":28680,"flags_identify_info":[1,4,1,2],"app_transition":"unknown","decoded_as":"BASE","app_content":"unknown","app":"unknown","decoded_path":"ETHERNET.IPv4.TCP","client_country":"Private Network","server_country":"CN","app_category":"networking","server_asn":4812,"c2s_ttl":127,"s2c_ttl":47,"t_vsys_id":1,"vsys_id":1,"statistics_rule_list":[7731,7689,7532,7531,7372],"session_id":290530145510806375,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"XXG-TSG-BJ","device_group":"XXG-TSG-BJ","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"XXG-TSG-BJ\"},{\"tag\":\"device_group\",\"value\":\"XXG-TSG-BJ\"}]}","device_id":"9800165603191146","sled_ip":"192.168.40.62","dup_traffic_flag":0,"sc_rule_list":[4303],"sc_rsp_raw":[2002],"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"48:73:97:96:38:27\",\"c2s_destination_mac\":\"58:b3:8f:fa:3b:11\",\"s2c_source_mac\":\"58:b3:8f:fa:3b:11\",\"s2c_destination_mac\":\"48:73:97:96:38:27\"}]","client_ip_tags":["Country Code:Private Network"],"server_ip_tags":["Country:China","ASN:4812","Country Code:CN"]} \ No newline at end of file
diff --git a/groot-formats/format-raw/pom.xml b/groot-formats/format-raw/pom.xml
index 3433e64..11aa4d1 100644
--- a/groot-formats/format-raw/pom.xml
+++ b/groot-formats/format-raw/pom.xml
@@ -13,6 +13,30 @@
<name>Groot : Formats : Format-Raw </name>
<dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml
index 78e8e35..1d42523 100644
--- a/groot-formats/pom.xml
+++ b/groot-formats/pom.xml
@@ -21,6 +21,7 @@
</modules>
<dependencies>
+
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>groot-spi</artifactId>
@@ -30,15 +31,11 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>groot-core</artifactId>
+ <artifactId>groot-common</artifactId>
<version>${revision}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
- </dependency>
</dependencies>
diff --git a/groot-spi/pom.xml b/groot-spi/pom.xml
index da69f1b..e27a9db 100644
--- a/groot-spi/pom.xml
+++ b/groot-spi/pom.xml
@@ -17,6 +17,7 @@
<groupId>com.geedgenetworks</groupId>
<artifactId>groot-common</artifactId>
<version>${revision}</version>
+ <scope>provided</scope>
</dependency>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java b/groot-spi/src/main/java/com/geedgenetworks/spi/metrics/InternalMetrics.java
index 0bd3cc2..172072a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/metrics/InternalMetrics.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.core.metrics;
+package com.geedgenetworks.spi.metrics;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
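Note: only the package declaration of InternalMetrics changes in this hunk; the class body is untouched. Downstream users of the metrics helper therefore need just a one-line import rewrite:

    // before this commit
    import com.geedgenetworks.core.metrics.InternalMetrics;
    // after this commit
    import com.geedgenetworks.spi.metrics.InternalMetrics;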
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateConfig.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateConfig.java
index 3c06c8f..1d64d0d 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateConfig.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateConfig.java
@@ -11,7 +11,6 @@ import java.util.List;
@Data
public class AggregateConfig extends ProcessorConfig {
-
private List<String> group_by_fields;
private String window_timestamp_field;
private String window_type;
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateProcessor.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateProcessor.java
deleted file mode 100644
index 4281c30..0000000
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateProcessor.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.geedgenetworks.spi.processor;
-
-public interface AggregateProcessor extends Processor<AggregateConfig> {
-
-}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/Processor.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/Processor.java
index 5d994bb..ad42566 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/Processor.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/Processor.java
@@ -2,18 +2,17 @@ package com.geedgenetworks.spi.processor;
import com.geedgenetworks.spi.table.event.Event;
import com.typesafe.config.Config;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.Serializable;
import java.util.Map;
public interface Processor<T extends ProcessorConfig> extends Serializable {
- DataStream<Event> processorFunction(
- DataStream<Event> singleOutputStreamOperator,
- T processorConfig, ExecutionConfig config) ;
+ DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, T processorConfig);
String type();
+
T checkConfig(String name, Map<String, Object> configProperties, Config typeSafeConfig);
}
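Note: the new signature passes the StreamExecutionEnvironment instead of an ExecutionConfig, and shortens the method name to process. A minimal sketch of an implementation against the new interface follows; the class name, the "noop" type id, and the pass-through logic are illustrative only, not part of this commit:

    package com.geedgenetworks.spi.processor;

    import com.geedgenetworks.spi.table.event.Event;
    import com.typesafe.config.Config;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.Map;

    public class NoOpProcessor implements Processor<ProcessorConfig> {

        @Override
        public DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input,
                                         ProcessorConfig processorConfig) {
            // A real processor transforms the stream here; env replaces the old
            // ExecutionConfig parameter and exposes the whole environment.
            return input;
        }

        @Override
        public String type() {
            return "noop"; // hypothetical type id
        }

        @Override
        public ProcessorConfig checkConfig(String name, Map<String, Object> configProperties, Config typeSafeConfig) {
            // A real implementation would validate configProperties/typeSafeConfig
            // and populate the returned config before handing it back.
            return new ProcessorConfig();
        }
    }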
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorConfig.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorConfig.java
index 80f7ca2..e0fb40c 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorConfig.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorConfig.java
@@ -10,8 +10,8 @@ import java.util.Map;
public class ProcessorConfig implements Serializable {
private String type;
private int parallelism;
- private Map<String, Object> properties;
private String name;
private List<String> output_fields;
private List<String> remove_fields;
+ private Map<String, Object> properties;
}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorFactory.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorFactory.java
new file mode 100644
index 0000000..f7f0076
--- /dev/null
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorFactory.java
@@ -0,0 +1,7 @@
+package com.geedgenetworks.spi.processor;
+
+public interface ProcessorFactory {
+ String type();
+ Processor<?> createProcessor();
+
+}
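Note: ProcessorProvider (next file) discovers these factories through java.util.ServiceLoader, so each implementation also ships a provider-configuration file. A sketch pairing with the hypothetical NoOpProcessor above:

    public class NoOpProcessorFactory implements ProcessorFactory {

        @Override
        public String type() {
            return "noop";
        }

        @Override
        public Processor<?> createProcessor() {
            return new NoOpProcessor();
        }
    }

registered by listing the class name in
META-INF/services/com.geedgenetworks.spi.processor.ProcessorFactory.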
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorProvider.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorProvider.java
new file mode 100644
index 0000000..f71a560
--- /dev/null
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorProvider.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.spi.processor;
+
+import java.util.ServiceLoader;
+
+public class ProcessorProvider {
+
+ public static Processor<?> load(String type) {
+ ServiceLoader<ProcessorFactory> loader = ServiceLoader.load(ProcessorFactory.class);
+ for (ProcessorFactory factory : loader) {
+ if (factory.type().equals(type)) {
+ return factory.createProcessor();
+ }
+ }
+ throw new IllegalArgumentException("Processor type not found: " + type);
+ }
+
+}
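Note: lookup is a single call, and an unknown type fails fast:

    Processor<?> processor = ProcessorProvider.load("noop");
    // throws IllegalArgumentException("Processor type not found: ...") if no
    // registered factory reports a matching type()

Since the loop returns on the first factory whose type() matches, registration order decides ties between duplicate type ids.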
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProjectionProcessor.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProjectionProcessor.java
deleted file mode 100644
index 27643a6..0000000
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProjectionProcessor.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.geedgenetworks.spi.processor;
-
-public interface ProjectionProcessor extends Processor<ProjectionConfig> {
-
-}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/TableProcessor.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/TableProcessor.java
deleted file mode 100644
index f6f904c..0000000
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/TableProcessor.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.geedgenetworks.spi.processor;
-
-public interface TableProcessor extends Processor<TableConfig> {
-
-}
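Note: the three marker interfaces deleted in this commit (AggregateProcessor, ProjectionProcessor, TableProcessor) carried no members of their own. With the factory SPI above, implementations presumably parameterize the generic interface directly, e.g. implements Processor<AggregateConfig>, and are discovered through ProcessorFactory rather than through marker types.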
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkProvider.java b/groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkProvider.java
index 79e4747..0c2f425 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkProvider.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkProvider.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.spi.table.connector;
+package com.geedgenetworks.spi.sink;
import com.geedgenetworks.spi.table.event.Event;
import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkTableFactory.java b/groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkTableFactory.java
index 6f6e440..ecbb72c 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkTableFactory.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkTableFactory.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.spi.table.connector;
+package com.geedgenetworks.spi.sink;
import com.geedgenetworks.spi.table.factory.TableFactory;
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceProvider.java b/groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceProvider.java
index c792fbe..f7cf68f 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceProvider.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceProvider.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.spi.table.connector;
+package com.geedgenetworks.spi.source;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceTableFactory.java b/groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceTableFactory.java
index 14ca025..8e56e41 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceTableFactory.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceTableFactory.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.spi.table.connector;
+package com.geedgenetworks.spi.source;
import com.geedgenetworks.spi.table.factory.TableFactory;
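Note: taken together, the connector-facing SPI types now live in role-specific packages, so any code that compiled against com.geedgenetworks.spi.table.connector needs the same mechanical import rewrite applied to ProtobufFormatFactoryTest above:

    // before this commit
    import com.geedgenetworks.spi.table.connector.SinkProvider;
    import com.geedgenetworks.spi.table.connector.SinkTableFactory;
    import com.geedgenetworks.spi.table.connector.SourceProvider;
    import com.geedgenetworks.spi.table.connector.SourceTableFactory;

    // after this commit
    import com.geedgenetworks.spi.sink.SinkProvider;
    import com.geedgenetworks.spi.sink.SinkTableFactory;
    import com.geedgenetworks.spi.source.SourceProvider;
    import com.geedgenetworks.spi.source.SourceTableFactory;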