| author | doufenghu <[email protected]> | 2024-11-10 22:43:16 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-10 22:43:16 +0800 |
| commit | 73a5f46181af3c9e596e8b08dc27f63339b04c53 (patch) | |
| tree | 93bb7a830deb742211ec7cb8d8416002b4a5e54e | |
| parent | 16769de2e5ba334a5cfaacd8a53db2989264d022 (diff) | |
[Feature][SPI] Sort out the dependency libraries of the SPI/Common modules; remove unnecessary parameter passing from the xxExecutor classes.
84 files changed, 1246 insertions, 1096 deletions
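
The headline API change in miniature: executors no longer take the plugin jar list, since plugin registration happens once in `JobExecution`. A minimal sketch of the slimmed-down base class, condensed from the `AbstractExecutor` hunk below (`Config` is the typesafe-config type the project already imports; generics and the classloader helper are omitted):

```java
import com.typesafe.config.Config;
import java.util.Map;

// Before: protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig)
// After:  the jar list is gone; initialize() needs only the job Config.
public abstract class AbstractExecutor<K, V> {
    protected final Config operatorConfig;
    protected final Map<K, V> operatorMap; // operator name -> parsed operator config

    protected AbstractExecutor(Config operatorConfig) {
        this.operatorConfig = operatorConfig;
        this.operatorMap = initialize(operatorConfig);
    }

    // Each subclass parses its own config section (sources, sinks, filters, ...).
    protected abstract Map<K, V> initialize(Config operatorConfig);
}
```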
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 93d64b0..ff495d4 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -70,7 +70,7 @@ schema:
 
 To retrieve the schema from a local file using its absolute path.
 
-> Ensures that the file path is accessible to all nodes in your Flink cluster.
+> Ensures that the file path is accessible to all operatorNodes in your Flink cluster.
 
 ```yaml
 schema:
diff --git a/docs/connector/formats/protobuf.md b/docs/connector/formats/protobuf.md
index 2dfb65e..e991fca 100644
--- a/docs/connector/formats/protobuf.md
+++ b/docs/connector/formats/protobuf.md
@@ -13,7 +13,7 @@
 
 ## Format Options
 
-> Ensures that the file path is accessible to all nodes in your Flink cluster.
+> Ensures that the file path is accessible to all operatorNodes in your Flink cluster.
 
 | Name | Type | Required | Default | Description |
 |-------------------------------|---------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
diff --git a/docs/connector/source/file.md b/docs/connector/source/file.md
index bdbf74e..75de66b 100644
--- a/docs/connector/source/file.md
+++ b/docs/connector/source/file.md
@@ -24,7 +24,7 @@ File source custom properties.
 
 This example read data of file test source and print to console.
 
-> Ensures that the file path is accessible to all nodes in your Flink cluster.
+> Ensures that the file path is accessible to all operatorNodes in your Flink cluster.
 
 ```yaml
 sources:
diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md
index b7fd037..c5e3046 100644
--- a/docs/grootstream-config.md
+++ b/docs/grootstream-config.md
@@ -24,7 +24,7 @@ grootstream:
 The knowledge base is a collection of libraries that can be used in the groot-stream job's UDFs. File system type can be specified `local`, `http` or `hdfs`. If the value is `http`, must be ` QGW Knowledge Base Repository` URL. The library will be dynamically updated according to the `scheduler.knowledge_base.update.interval.minutes` configuration.
-If the value is `local`, the library will be loaded from the local file system. Need to manually upgrade all nodes in the Flink cluster when the library is updated.
+If the value is `local`, the library will be loaded from the local file system. Need to manually upgrade all operatorNodes in the Flink cluster when the library is updated.
 If the value is `hdfs`, the library will be loaded from the HDFS file system. More details about hdfs operation can be found in the [HDFS](./faq.md#hadoop-hdfs-commands-for-beginners).
 
 | Name | Type | Required | Default | Description |
@@ -36,7 +36,7 @@ If the value is `hdfs`, the library will be loaded from the HDFS file system. Mo
 
 ### Define the knowledge base file from a local file
 
-> Ensures that the file path is accessible to all nodes in your Flink cluster.
+> Ensures that the file path is accessible to all operatorNodes in your Flink cluster.
 
 ```yaml
 grootstream:
@@ -65,7 +65,7 @@
 
 ### Define the knowledge base file from a HDFS file system
 
-> Ensure that the HDFS file system is accessible to all nodes in your Flink cluster.
+> Ensure that the HDFS file system is accessible to all operatorNodes in your Flink cluster.
 
 ```yaml
 grootstream:
diff --git a/docs/processor/projection-processor.md b/docs/processor/projection-processor.md
index 4319f36..d11fcc9 100644
--- a/docs/processor/projection-processor.md
+++ b/docs/processor/projection-processor.md
@@ -38,7 +38,7 @@ filters:
 processing_pipelines: # [object] Define Processors
   projection_processor: # [object] Define projection processor name
-    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessor
     remove_fields: [http_request_line, http_response_line, http_response_content_type]
     functions: # [array of object] Define UDFs
      - function: DROP # [string] Define DROP function for filter event
diff --git a/docs/user-guide.md b/docs/user-guide.md
index d52cfed..1db9f91 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -38,7 +38,7 @@ filters:
 preprocessing_pipelines:
   preprocessor:
-    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessor
     functions:
      - function: EVAL
        output_fields: [additional_field_subdomain]
@@ -47,7 +47,7 @@ preprocessing_pipelines:
 processing_pipelines:
   processor:
-    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessor
     remove_fields: [log_id]
     output_fields: []
     functions:
@@ -58,7 +58,7 @@ processing_pipelines:
 postprocessing_pipelines:
   postprocessor:
-    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessor
     remove_fields: [dup_traffic_flag]
 
 sinks:
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index a6c72e6..150c941 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -30,6 +30,18 @@
 
         <dependency>
             <groupId>com.geedgenetworks</groupId>
+            <artifactId>groot-spi</artifactId>
+            <version>${revision}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>groot-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
             <artifactId>connector-kafka</artifactId>
             <version>${revision}</version>
             <scope>${scope}</scope>
@@ -127,8 +139,6 @@
             <scope>${scope}</scope>
         </dependency>
 
-
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime-web_${scala.version}</artifactId>
@@ -165,9 +175,6 @@
             <scope>${scope}</scope>
         </dependency>
 
-
-
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${scala.version}</artifactId>
@@ -175,9 +182,6 @@
            <scope>test</scope>
         </dependency>
 
-
-
-
     </dependencies>
 
     <build>
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
deleted file mode 100644
index 6f33cae..0000000
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/ProcessorType.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.geedgenetworks.bootstrap.enums;
-
-public enum ProcessorType {
-    SOURCE("source"),
-    FILTER("filter"),
-    SPLIT("split"),
-    PREPROCESSING("preprocessing"),
-    PROCESSING("processing"),
-    POSTPROCESSING("postprocessing"),
-    SINK("sink");
-
-    private final String type;
-
-    ProcessorType(String type) {this.type = type;}
-
-    public String getType() {
-        return type;
-    }
-}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java
new file mode 100644
index 0000000..8b4e154
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/StageType.java
@@ -0,0 +1,32 @@
+package com.geedgenetworks.bootstrap.enums;
+
+public enum StageType {
+    SOURCE("source"),
+    FILTER("filter"),
+    SPLIT("split"),
+    PREPROCESSING("preprocessing"),
+    PROCESSING("processing"),
+    POSTPROCESSING("postprocessing"),
+    SINK("sink");
+
+    private final String type;
+    public String getType() {
+        return type;
+    }
+    StageType(String type) {this.type = type;}
+
+    public static StageType fromType(String type) {
+        for (StageType stage : values()) {
+            if (stage.type.equalsIgnoreCase(type)) {
+                return stage;
+            }
+        }
+        throw new IllegalArgumentException("Unknown type: " + type);
+    }
+
+    @Override
+    public String toString() {
+        return type;
+    }
+
+}
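
The new `StageType` replaces the deleted `ProcessorType` one-for-one and adds a case-insensitive `fromType` lookup plus a `toString()` that returns the wire name. A short usage sketch (hypothetical call site, using only the methods defined in the file above):

```java
// Resolve a stage from a config string; matching is case-insensitive.
StageType stage = StageType.fromType("Processing");
assert stage == StageType.PROCESSING;
assert "processing".equals(stage.getType());
assert "processing".equals(stage.toString());

// Unknown names fail fast:
// StageType.fromType("orderby") -> IllegalArgumentException: Unknown type: orderby
```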
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index e0828a0..fe440f7 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -16,9 +16,9 @@ public abstract class AbstractExecutor<K, V>
     protected final Config operatorConfig;
     protected final Map<K,V> operatorMap;
 
-    protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
+    protected AbstractExecutor(Config operatorConfig) {
         this.operatorConfig = operatorConfig;
-        this.operatorMap = initialize(jarPaths, operatorConfig);
+        this.operatorMap = initialize(operatorConfig);
     }
 
     @Override
@@ -27,7 +27,7 @@ public abstract class AbstractExecutor<K, V>
     }
 
-    protected abstract Map<K, V> initialize(List<URL> jarPaths, Config operatorConfig);
+    protected abstract Map<K, V> initialize(Config operatorConfig);
 
     protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
             (classLoader, url) -> {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index ec748cc..a45380e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -19,26 +19,24 @@ import java.util.ServiceLoader;
 
 public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> {
 
-    protected AbstractProcessorExecutor(List<URL> jarPaths, Config operatorConfig) {
-        super(jarPaths, operatorConfig);
+    protected AbstractProcessorExecutor(Config operatorConfig) {
+        super(operatorConfig);
     }
 
     @Override
-    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
 
-        ProcessorConfig processorConfig = operatorMap.get(node.getName());
+        ProcessorConfig processorConfig = operatorMap.get(operatorNode.getName());
         boolean found = false; // flag variable
         ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
         for (Processor processor : processors) {
             if(processor.type().equals(processorConfig.getType())){
                 found = true;
-                if (node.getParallelism() > 0) {
-                    processorConfig.setParallelism(node.getParallelism());
+                if (operatorNode.getParallelism() > 0) {
+                    processorConfig.setParallelism(operatorNode.getParallelism());
                 }
                 try {
-
-                    dataStream = processor.processorFunction(
-                            dataStream, processorConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+                    input = processor.process(jobRuntimeEnvironment.getStreamExecutionEnvironment(), input, processorConfig);
                 } catch (Exception e) {
                     throw new JobExecuteException("Create orderby pipeline instance failed!", e);
                 }
@@ -48,7 +46,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
         if (!found) {
             throw new JobExecuteException("No matching processor found for type: " + processorConfig.getType());
         }
-        return dataStream;
+        return input;
     }
 
     protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
index d57d6bf..e43c949 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Executor.java
@@ -4,7 +4,7 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 
 public interface Executor<T, ENV extends RuntimeEnvironment> {
 
-    T execute(T dataStream, Node edge) throws JobExecuteException;
+    T execute(T dataStream, OperatorNode edge) throws JobExecuteException;
 
     void setRuntimeEnvironment(ENV runtimeEnvironment);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 1ea19f8..d70420e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -1,6 +1,6 @@
 package com.geedgenetworks.bootstrap.execution;
 
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.config.Constants;
 import com.geedgenetworks.common.config.CheckConfigUtil;
@@ -27,14 +27,14 @@
  */
 @Slf4j
 public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
-    private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType();
+    private static final String PROCESSOR_TYPE = StageType.FILTER.getType();
 
-    public FilterExecutor(List<URL> jarPaths, Config config) {
-        super(jarPaths, config);
+    public FilterExecutor(Config config) {
+        super(config);
     }
 
     @Override
-    protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+    protected Map<String, FilterConfig> initialize(Config operatorConfig) {
         Map<String, FilterConfig> filterConfigMap = Maps.newHashMap();
         if (operatorConfig.hasPath(Constants.FILTERS)) {
             Config filterConfig = operatorConfig.getConfig(Constants.FILTERS);
@@ -54,20 +54,20 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
     }
 
     @Override
-    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
-        FilterConfig filterConfig = operatorMap.get(node.getName());
+    public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+        FilterConfig filterConfig = operatorMap.get(operatorNode.getName());
         boolean found = false; // flag variable
         ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
         for (Filter filter : filters) {
             if(filter.type().equals(filterConfig.getType())){
                 found = true;
-                if (node.getParallelism() > 0) {
-                    filterConfig.setParallelism(node.getParallelism());
+                if (operatorNode.getParallelism() > 0) {
+                    filterConfig.setParallelism(operatorNode.getParallelism());
                 }
                 try {
-                    dataStream =
+                    input =
                             filter.filterFunction(
-                                    dataStream, filterConfig);
+                                    input, filterConfig);
                 } catch (Exception e) {
                     throw new JobExecuteException("Create filter instance failed!", e);
                 }
@@ -77,7 +77,7 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
         if (!found) {
            throw new JobExecuteException("No matching filter found for type: " + filterConfig.getType());
         }
-        return dataStream;
+        return input;
     }
 
     protected FilterConfig checkConfig(String key, Map<String, Object> value, Config config) {
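
`FilterExecutor` above and `AbstractProcessorExecutor` share one discovery idiom: scan `java.util.ServiceLoader`, match on `type()`, and fail if nothing matched. Distilled into a single method as a sketch (`Filter`, `FilterConfig`, `Event`, and `JobExecuteException` are the project types from the diff, not redefined here):

```java
import java.util.ServiceLoader;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Event> applyFilter(DataStream<Event> input, FilterConfig config) {
    boolean found = false;
    // Implementations are discovered via META-INF/services entries for Filter.
    for (Filter filter : ServiceLoader.load(Filter.class)) {
        if (filter.type().equals(config.getType())) {
            found = true;
            input = filter.filterFunction(input, config);
        }
    }
    if (!found) {
        throw new JobExecuteException("No matching filter found for type: " + config.getType());
    }
    return input;
}
```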
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index 325f8a4..cd70f44 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -1,7 +1,7 @@
 package com.geedgenetworks.bootstrap.execution;
 
 import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
 import com.geedgenetworks.common.config.Constants;
@@ -36,7 +36,7 @@ public class JobExecution {
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
     private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
-    private final List<Node> nodes;
+    private final List<OperatorNode> operatorNodes;
     private final List<URL> jarPaths;
     private final Map<String,String> nodeNameWithSplitTags = new HashMap<>();
 
@@ -51,13 +51,13 @@ public class JobExecution {
         registerPlugin(jobConfig.getConfig(Constants.APPLICATION));
 
-        this.sourceExecutor = new SourceExecutor(jarPaths, jobConfig);
-        this.sinkExecutor = new SinkExecutor(jarPaths, jobConfig);
-        this.filterExecutor = new FilterExecutor(jarPaths, jobConfig);
-        this.splitExecutor = new SplitExecutor(jarPaths, jobConfig);
-        this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, jobConfig);
-        this.processingExecutor = new ProcessingExecutor(jarPaths, jobConfig);
-        this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, jobConfig);
+        this.sourceExecutor = new SourceExecutor(jobConfig);
+        this.sinkExecutor = new SinkExecutor(jobConfig);
+        this.filterExecutor = new FilterExecutor(jobConfig);
+        this.splitExecutor = new SplitExecutor(jobConfig);
+        this.preprocessingExecutor = new PreprocessingExecutor(jobConfig);
+        this.processingExecutor = new ProcessingExecutor(jobConfig);
+        this.postprocessingExecutor = new PostprocessingExecutor(jobConfig);
 
         this.jobRuntimeEnvironment = JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig);
 
@@ -68,7 +68,7 @@ public class JobExecution {
         this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
         this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
-        this.nodes = buildJobNode(jobConfig);
+        this.operatorNodes = buildJobNode(jobConfig);
     }
 
@@ -153,7 +153,7 @@ public class JobExecution {
         return config;
     }
 
-    private List<Node> buildJobNode(Config config) {
+    private List<OperatorNode> buildJobNode(Config config) {
 
         Map<String, Object> sources = Maps.newHashMap();
         Map<String, Object> sinks = Maps.newHashMap();
@@ -187,34 +187,34 @@
 
         List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY);
-        List<Node> nodes = Lists.newArrayList();
+        List<OperatorNode> operatorNodes = Lists.newArrayList();
         topology.forEach(item -> {
-            Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class);
-            nodes.add(node);
+            OperatorNode operatorNode = JSONObject.from(item.root().unwrapped()).toJavaObject(OperatorNode.class);
+            operatorNodes.add(operatorNode);
         });
 
-        for (Node node : nodes) {
-            if (sources.containsKey(node.getName())) {
-                node.setType(ProcessorType.SOURCE);
-            } else if (sinks.containsKey(node.getName())) {
-                node.setType(ProcessorType.SINK);
-            } else if (filters.containsKey(node.getName())) {
-                node.setType(ProcessorType.FILTER);
-            } else if (splits.containsKey(node.getName())) {
-                node.setType(ProcessorType.SPLIT);
-            } else if (preprocessingPipelines.containsKey(node.getName())) {
-                node.setType(ProcessorType.PREPROCESSING);
-            } else if (processingPipelines.containsKey(node.getName())) {
-                node.setType(ProcessorType.PROCESSING);
-            } else if (postprocessingPipelines.containsKey(node.getName())) {
-                node.setType(ProcessorType.POSTPROCESSING);
+        for (OperatorNode operatorNode : operatorNodes) {
+            if (sources.containsKey(operatorNode.getName())) {
+                operatorNode.setType(StageType.SOURCE);
+            } else if (sinks.containsKey(operatorNode.getName())) {
+                operatorNode.setType(StageType.SINK);
+            } else if (filters.containsKey(operatorNode.getName())) {
+                operatorNode.setType(StageType.FILTER);
+            } else if (splits.containsKey(operatorNode.getName())) {
+                operatorNode.setType(StageType.SPLIT);
+            } else if (preprocessingPipelines.containsKey(operatorNode.getName())) {
+                operatorNode.setType(StageType.PREPROCESSING);
+            } else if (processingPipelines.containsKey(operatorNode.getName())) {
+                operatorNode.setType(StageType.PROCESSING);
+            } else if (postprocessingPipelines.containsKey(operatorNode.getName())) {
+                operatorNode.setType(StageType.POSTPROCESSING);
             } else {
-                throw new JobExecuteException("unsupported process type " + node.getName());
+                throw new JobExecuteException("unsupported process type " + operatorNode.getName());
             }
         }
-        return nodes;
+        return operatorNodes;
     }
 
@@ -223,14 +223,14 @@ public class JobExecution {
         if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) {
             jobRuntimeEnvironment.registerPlugin(jarPaths);
         }
-        List<Node> sourceNodes = nodes
-                .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
+        List<OperatorNode> sourceOperatorNodes = operatorNodes
+                .stream().filter(v -> v.getType().name().equals(StageType.SOURCE.name())).collect(Collectors.toList());
 
         DataStream<Event> dataStream = null;
-        for (Node sourceNode : sourceNodes) {
-            dataStream = sourceExecutor.execute(dataStream, sourceNode);
-            for (String nodeName : sourceNode.getDownstream()) {
+        for (OperatorNode sourceOperatorNode : sourceOperatorNodes) {
+            dataStream = sourceExecutor.execute(dataStream, sourceOperatorNode);
+            for (String nodeName : sourceOperatorNode.getDownstream()) {
                 buildJobGraph(dataStream, nodeName);
             }
         }
@@ -251,68 +251,68 @@ public class JobExecution {
     }
 
     private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
-        Node node = getNode(downstreamNodeName).orElseGet(() -> {
+        OperatorNode operatorNode = getNode(downstreamNodeName).orElseGet(() -> {
             throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
         });
-        if (node.getType().name().equals(ProcessorType.FILTER.name())) {
-            if (nodeNameWithSplitTags.containsKey(node.getName())) {
-                dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
-                }), node);
+        if (operatorNode.getType().name().equals(StageType.FILTER.name())) {
+            if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+                dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream)
+                        .getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {}), operatorNode);
             } else {
-                dataStream = filterExecutor.execute(dataStream, node);
+                dataStream = filterExecutor.execute(dataStream, operatorNode);
             }
-        } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
-            if (node.getTags().size() == node.getDownstream().size()) {
-                for (int i = 0; i < node.getDownstream().size();i++) {
-                    nodeNameWithSplitTags.put(node.getDownstream().get(i),node.getTags().get(i));
+        } else if (operatorNode.getType().name().equals(StageType.SPLIT.name())) {
+            if (operatorNode.getTags().size() == operatorNode.getDownstream().size()) {
+                for (int i = 0; i < operatorNode.getDownstream().size(); i++) {
+                    nodeNameWithSplitTags.put(operatorNode.getDownstream().get(i), operatorNode.getTags().get(i));
                 }
             } else {
                 throw new JobExecuteException("split node downstream size not equal tags size");
             }
-            dataStream = splitExecutor.execute(dataStream, node);
-        } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
-            if (nodeNameWithSplitTags.containsKey(node.getName())) {
-                dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())){
-                }), node);
+            dataStream = splitExecutor.execute(dataStream, operatorNode);
+        } else if (operatorNode.getType().name().equals(StageType.PREPROCESSING.name())) {
+            if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+                dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())){
+                }), operatorNode);
             } else {
-                dataStream = preprocessingExecutor.execute(dataStream, node);
+                dataStream = preprocessingExecutor.execute(dataStream, operatorNode);
             }
-        } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
-            if (nodeNameWithSplitTags.containsKey(node.getName())) {
-                dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
-                }), node);
+        } else if (operatorNode.getType().name().equals(StageType.PROCESSING.name())) {
+            if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+                dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
+                }), operatorNode);
             } else {
-                dataStream = processingExecutor.execute(dataStream, node);
+                dataStream = processingExecutor.execute(dataStream, operatorNode);
             }
-        } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
-            if (nodeNameWithSplitTags.containsKey(node.getName())) {
-                dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
-                }), node);
+        } else if (operatorNode.getType().name().equals(StageType.POSTPROCESSING.name())) {
+            if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+                dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
+                }), operatorNode);
             } else {
-                dataStream = postprocessingExecutor.execute(dataStream, node);
+                dataStream = postprocessingExecutor.execute(dataStream, operatorNode);
             }
-        } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
-            if (nodeNameWithSplitTags.containsKey(node.getName())) {
-                dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
-                }), node);
+        } else if (operatorNode.getType().name().equals(StageType.SINK.name())) {
+            if (nodeNameWithSplitTags.containsKey(operatorNode.getName())) {
+                dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(operatorNode.getName())) {
+                }), operatorNode);
             } else {
-                dataStream = sinkExecutor.execute(dataStream, node);
+                dataStream = sinkExecutor.execute(dataStream, operatorNode);
             }
         } else {
-            throw new JobExecuteException("unsupported process type " + node.getType().name());
+            throw new JobExecuteException("unsupported process type " + operatorNode.getType().name());
         }
-        for (String nodeName : node.getDownstream()) {
+        for (String nodeName : operatorNode.getDownstream()) {
             buildJobGraph(dataStream, nodeName);
         }
     }
 
-    private Optional<Node> getNode(String name) {
-        return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
+    private Optional<OperatorNode> getNode(String name) {
+        return operatorNodes.stream().filter(v -> v.getName().equals(name)).findFirst();
     }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java
index 66303c2..8c4b392 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/OperatorNode.java
@@ -1,6 +1,6 @@
 package com.geedgenetworks.bootstrap.execution;
 
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -14,11 +14,11 @@ import java.util.List;
 @NoArgsConstructor
 @AllArgsConstructor
 @EqualsAndHashCode
-public class Node implements Serializable {
+public class OperatorNode implements Serializable {
 
     private String name;
-    private ProcessorType type;
-    private List<String> downstream = Collections.emptyList();
+    private StageType type;
     private int parallelism;
+    private List<String> downstream = Collections.emptyList();
     private List<String> tags = Collections.emptyList();
 }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index e73b7dd..10d9188 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -1,6 +1,6 @@
 package com.geedgenetworks.bootstrap.execution;
 
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.config.Constants;
 import com.geedgenetworks.spi.processor.ProcessorConfig;
@@ -17,14 +17,14 @@
  * Initialize config and execute postprocessor
 */
 public class PostprocessingExecutor extends AbstractProcessorExecutor {
-    private static final String PROCESSOR_TYPE = ProcessorType.POSTPROCESSING.getType();
+    private static final String PROCESSOR_TYPE = StageType.POSTPROCESSING.getType();
 
-    public PostprocessingExecutor(List<URL> jarPaths, Config config) {
-        super(jarPaths, config);
+    public PostprocessingExecutor(Config config) {
+        super(config);
     }
 
     @Override
-    protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+    protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
         Map<String, ProcessorConfig> postprocessingConfigMap = Maps.newHashMap();
         if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
             Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
@@ -37,7 +37,7 @@
     }
 
     @Override
-    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
-        return super.execute(dataStream, node);
+    public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+        return super.execute(input, operatorNode);
     }
 }
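
The long if/else chain in `JobExecution.buildJobGraph` above repeats one pattern that the rename makes easier to see: when an `OperatorNode` sits downstream of a split, its input is the split's named side output instead of the main stream. Factored out as a sketch (Flink's `OutputTag` and `SingleOutputStreamOperator`; `nodeNameWithSplitTags` maps node name to split tag as in the diff):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

DataStream<Event> resolveInput(DataStream<Event> dataStream, OperatorNode operatorNode) {
    String tag = nodeNameWithSplitTags.get(operatorNode.getName());
    if (tag == null) {
        return dataStream; // not downstream of a split: use the main stream
    }
    // Anonymous subclass so Flink can reify the Event type argument.
    return ((SingleOutputStreamOperator<Event>) dataStream)
            .getSideOutput(new OutputTag<Event>(tag) {});
}
```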
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index 6179265..9acda99 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -1,6 +1,6 @@
 package com.geedgenetworks.bootstrap.execution;
 
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.config.Constants;
 import com.geedgenetworks.spi.processor.ProcessorConfig;
@@ -19,14 +19,14 @@
 */
 @Slf4j
 public class PreprocessingExecutor extends AbstractProcessorExecutor {
-    private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType();
+    private static final String PROCESSOR_TYPE = StageType.PREPROCESSING.getType();
 
-    public PreprocessingExecutor(List<URL> jarPaths, Config config) {
-        super(jarPaths, config);
+    public PreprocessingExecutor(Config config) {
+        super(config);
     }
 
     @Override
-    protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+    protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
         Map<String, ProcessorConfig> preprocessingConfigMap = Maps.newHashMap();
         if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
             Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
@@ -38,9 +38,9 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor {
     }
 
     @Override
-    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
+    public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
 
-        return super.execute(dataStream, node);
+        return super.execute(input, operatorNode);
     }
 }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index bc6a09e..c49df88 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -1,15 +1,18 @@
 package com.geedgenetworks.bootstrap.execution;
 
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.common.config.Constants;
+import com.geedgenetworks.spi.processor.Processor;
 import com.geedgenetworks.spi.processor.ProcessorConfig;
+import com.geedgenetworks.spi.processor.ProcessorProvider;
 import com.geedgenetworks.spi.table.event.Event;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
 import java.net.URL;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -17,27 +20,30 @@
 * Initialize config and execute processor
 */
 public class ProcessingExecutor extends AbstractProcessorExecutor {
-    private static final String PROCESSOR_TYPE = ProcessorType.PROCESSING.getType();
+    private static final String PROCESSOR_TYPE = StageType.PROCESSING.getType();
+    //private Map<String, Processor<?>> processors;
 
-    public ProcessingExecutor(List<URL> jarPaths, Config config) {
-        super(jarPaths, config);
+    public ProcessingExecutor(Config config) {
+        super(config);
     }
 
     @Override
-    protected Map<String, ProcessorConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+    protected Map<String, ProcessorConfig> initialize(Config operatorConfig) {
         Map<String, ProcessorConfig> processingConfigMap = Maps.newHashMap();
+        //processors = new HashMap<>();
         if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
-            Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
-            processors.root().unwrapped().forEach((key, value) -> {
-                processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processors));
+            Config processingConfig = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
+            processingConfig.root().unwrapped().forEach((key, value) -> {
+                processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processingConfig));
+                //processors.put(key, ProcessorProvider.load(((Map<?, ?>) value).get("type").toString()));
+            });
         }
         return processingConfigMap;
     }
 
     @Override
-    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
-
-        return super.execute(dataStream, node);
+    public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+        return super.execute(input, operatorNode);
     }
 }
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
index b61b6f9..130705a 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SinkExecutor.java
@@ -1,7 +1,7 @@
 package com.geedgenetworks.bootstrap.execution;
 
 import com.alibaba.fastjson.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
 import com.geedgenetworks.common.config.Constants;
@@ -11,8 +11,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.ConfigValidationException;
 import com.geedgenetworks.spi.sink.SinkConfig;
 import com.geedgenetworks.spi.sink.SinkConfigOptions;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -24,8 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 
-import java.net.URL;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,13 +31,13 @@ import java.util.Map;
 */
 @Slf4j
 public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
-    private static final String PROCESSOR_TYPE = ProcessorType.SINK.getType();
+    private static final String PROCESSOR_TYPE = StageType.SINK.getType();
 
-    public SinkExecutor(List<URL> jarPaths, Config config) {
-        super(jarPaths, config);
+    public SinkExecutor(Config config) {
+        super(config);
     }
 
     @Override
-    protected Map<String, SinkConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+    protected Map<String, SinkConfig> initialize(Config operatorConfig) {
         Map<String, SinkConfig> sinkConfigMap = Maps.newHashMap();
 
         if (operatorConfig.hasPath(Constants.SINKS)) {
@@ -64,8 +62,8 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
     }
 
     @Override
-    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
-        SinkConfig sinkConfig = operatorMap.get(node.getName());
+    public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+        SinkConfig sinkConfig = operatorMap.get(operatorNode.getName());
         try {
             SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, sinkConfig.getType());
             Map<String, String> options = sinkConfig.getProperties();
@@ -84,9 +82,9 @@ public class SinkExecutor extends AbstractExecutor<String, SinkConfig> {
                 System.out.println(String.format("sink(%s) schema:\n%s", sinkConfig.getName(), schema.getDataType().treeString()));
             }
 
-            DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
-            if (node.getParallelism() > 0) {
-                dataStreamSink.setParallelism(node.getParallelism());
+            DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(input);
+            if (operatorNode.getParallelism() > 0) {
+                dataStreamSink.setParallelism(operatorNode.getParallelism());
             }
             dataStreamSink.name(sinkConfig.getName());
         } catch (Exception e) {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
index 3eeaad6..5109540 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SourceExecutor.java
@@ -1,7 +1,7 @@
 package com.geedgenetworks.bootstrap.execution;
 
 import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.enums.StageType;
 import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
 import com.geedgenetworks.bootstrap.exception.JobExecuteException;
 import com.geedgenetworks.bootstrap.utils.SchemaConfigParse;
@@ -12,8 +12,8 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.ConfigValidationException;
 import com.geedgenetworks.spi.configuration.SourceConfigOptions;
 import com.geedgenetworks.spi.source.SourceConfig;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -28,9 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
-import java.net.URL;
 import java.time.Duration;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -38,13 +36,13 @@ import java.util.Map;
 */
 @Slf4j
 public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
-    private static final String PROCESSOR_TYPE = ProcessorType.SOURCE.getType();
+    private static final String PROCESSOR_TYPE = StageType.SOURCE.getType();
 
-    public SourceExecutor(List<URL> jarPaths, Config config) {
-        super(jarPaths, config);
+    public SourceExecutor(Config config) {
+        super(config);
     }
 
     @Override
-    protected Map<String, SourceConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+    protected Map<String, SourceConfig> initialize(Config operatorConfig) {
         Map<String, SourceConfig> sourceConfigMap = Maps.newHashMap();
         if (operatorConfig.hasPath(Constants.SOURCES)) {
             Config sources = operatorConfig.getConfig(Constants.SOURCES);
@@ -68,8 +66,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
     }
 
     @Override
-    public DataStream<Event> execute(DataStream<Event> outputStreamOperator, Node node) throws JobExecuteException {
-        SourceConfig sourceConfig = operatorMap.get(node.getName());
+    public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+        SourceConfig sourceConfig = operatorMap.get(operatorNode.getName());
         SingleOutputStreamOperator sourceSingleOutputStreamOperator;
         try {
             SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, sourceConfig.getType());
@@ -90,17 +88,17 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
             }
             sourceSingleOutputStreamOperator = sourceProvider.produceDataStream(jobRuntimeEnvironment.getStreamExecutionEnvironment()).name(sourceConfig.getName());
 
-            if (node.getParallelism() > 0) {
-                sourceSingleOutputStreamOperator.setParallelism(node.getParallelism());
+            if (operatorNode.getParallelism() > 0) {
+                sourceSingleOutputStreamOperator.setParallelism(operatorNode.getParallelism());
             }
-            sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, node);
+            sourceSingleOutputStreamOperator = setWatermarkIfNecessary(sourceSingleOutputStreamOperator, sourceConfig, operatorNode);
             return sourceSingleOutputStreamOperator;
         } catch (Exception e) {
             throw new JobExecuteException("Create source instance failed!", e);
         }
     }
 
-    private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, Node node){
+    private SingleOutputStreamOperator<Event> setWatermarkIfNecessary(SingleOutputStreamOperator<Event> dataStream, SourceConfig sourceConfig, OperatorNode operatorNode){
         final String watermarkTimestamp = sourceConfig.getWatermark_timestamp();
         if(StringUtils.isNotBlank(watermarkTimestamp)){
             String timestampUnit = sourceConfig.getWatermark_timestamp_unit();
@@ -139,8 +137,8 @@ public class SourceExecutor extends AbstractExecutor<String, SourceConfig> {
                     WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMillis(watermarkLag))
                             .withTimestampAssigner(timestampAssigner)
             );
-            if (node.getParallelism() > 0) {
-                dataStream.setParallelism(node.getParallelism());
+            if (operatorNode.getParallelism() > 0) {
+                dataStream.setParallelism(operatorNode.getParallelism());
             }
         }
         return dataStream;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
index 3d6f264..c142614 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -29,12 +29,12 @@
 public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
 
-    public SplitExecutor(List<URL> jarPaths, Config config) {
-        super(jarPaths, config);
+    public SplitExecutor(Config config) {
+        super(config);
     }
 
     @Override
-    protected Map<String, SplitConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
+    protected Map<String, SplitConfig> initialize(Config operatorConfig) {
         Map<String, SplitConfig> splitConfigMap = Maps.newHashMap();
         if (operatorConfig.hasPath(Constants.SPLITS)) {
             Config splitsConfig = operatorConfig.getConfig(Constants.SPLITS);
@@ -56,20 +56,20 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
     }
 
     @Override
-    public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
-        SplitConfig splitConfig = operatorMap.get(node.getName());
+    public DataStream<Event> execute(DataStream<Event> input, OperatorNode operatorNode) throws JobExecuteException {
+        SplitConfig splitConfig = operatorMap.get(operatorNode.getName());
         boolean found = false; // flag variable
         ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
         for (Split split : splits) {
             found = true; // flag variable
             if(split.type().equals(splitConfig.getType())){
-                if (node.getParallelism() > 0) {
-                    splitConfig.setParallelism(node.getParallelism());
+                if (operatorNode.getParallelism() > 0) {
+                    splitConfig.setParallelism(operatorNode.getParallelism());
                 }
                 try {
-                    dataStream =
+                    input =
                             split.splitFunction(
-                                    dataStream, splitConfig);
+                                    input, splitConfig);
                 } catch (Exception e) {
                     throw new JobExecuteException("Create split instance failed!", e);
                 }
@@ -79,7 +79,7 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
         if (!found) {
             throw new JobExecuteException("No matching split found for type: " + splitConfig.getType());
         }
-        return dataStream;
+        return input;
     }
 
     protected SplitConfig checkConfig(String key, Map<String, Object> value, Config config) {
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
index e52fd3b..130478e 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/collect/CollectTableFactory.java
@@ -1,7 +1,7 @@
 package com.geedgenetworks.bootstrap.main.simple.collect;
 
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
 import com.geedgenetworks.spi.table.event.Event;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index 74960b6..66096ae 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -34,6 +34,23 @@
         </dependency>
 
         <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>sketches</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.nacos</groupId>
+            <artifactId>nacos-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+
+        <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
         </dependency>
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
index afb9906..53cf99a 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
@@ -1,8 +1,8 @@
 package com.geedgenetworks.connectors.clickhouse;
 
 import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.geedgenetworks.spi.table.factory.FactoryUtil.TableFactoryHelper;
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
index 1726fdd..8a52fb9 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.connectors.clickhouse.sink;
 
 import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.schema.Schema;
 import com.geedgenetworks.spi.table.schema.SchemaChangeAware;
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
index 5596049..a946f84 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
@@ -1,6 +1,6 @@
 package com.geedgenetworks.connectors.file;
 
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.source.SourceProvider;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.type.StructType;
 import org.apache.commons.io.IOUtils;
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
index 8add991..36a7610 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
@@ -1,8 +1,8 @@
 package com.geedgenetworks.connectors.file;
 
 import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
 import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
index d075307..7e86a2c 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
@@ -3,10 +3,9 @@ package com.geedgenetworks.connectors.ipfix.collector;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.geedgenetworks.connectors.ipfix.collector.utils.IPFixUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
+import com.geedgenetworks.spi.source.SourceProvider;
 import com.geedgenetworks.spi.table.type.*;
-
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.type.DataType;
 import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
index 2853f1c..2865019 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
@@ -1,7 +1,7 @@
 package com.geedgenetworks.connectors.ipfix.collector;
 
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.geedgenetworks.spi.table.type.StructType;
 import org.apache.flink.configuration.ConfigOption;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
index 19856e9..65b01d4 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
@@ -1,6 +1,6 @@
 package com.geedgenetworks.connectors.kafka;
 
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import com.geedgenetworks.spi.table.event.Event;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index 8c78669..b61b376 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.connectors.kafka;
 
 import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
 import com.geedgenetworks.spi.table.connector.EncodingFormat;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkProvider;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.type.StructType;
 import org.apache.flink.api.common.serialization.SerializationSchema;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
index 8ce7f19..81d766e 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
@@ -1,7 +1,7 @@
 package com.geedgenetworks.connectors.kafka;
 
 import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.source.SourceProvider;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.type.StructType;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index 9a20ef5..0478e00 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -4,6 +4,10 @@ import com.geedgenetworks.connectors.kafka.rate.BlockDropRateLimitingStrategy;
 import com.geedgenetworks.connectors.kafka.rate.NoRateLimitingStrategy;
 import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
 import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
 import com.geedgenetworks.spi.table.connector.*;
 import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
 import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
diff --git a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
index 3b7e0c5..239d125 100644
--- a/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
+++ b/groot-connectors/connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/GrootFlinkKafkaProducer.java
@@ -1,25 +1,8 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.flink.streaming.connectors.kafka;
 
 import com.geedgenetworks.connectors.kafka.rate.RateLimitingStatus;
 import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
diff --git a/groot-connectors/connector-mock/pom.xml b/groot-connectors/connector-mock/pom.xml
index 4932eec..b13f7a5 100644
--- a/groot-connectors/connector-mock/pom.xml
+++ b/groot-connectors/connector-mock/pom.xml
@@ -18,6 +18,7 @@
             <artifactId>datafaker</artifactId>
             <version>1.9.0</version>
         </dependency>
+
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
index 57432cd..a978938 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
@@ -2,8 +2,8 @@ package com.geedgenetworks.connectors.mock;
 
 import com.geedgenetworks.connectors.mock.faker.FakerUtils;
 import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
index 09446fd..c7ada13 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
@@ -1,8 +1,8 @@
 package com.geedgenetworks.connectors.starrocks;
 
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.starrocks.connector.flink.table.sink.EventStarRocksDynamicSinkFunctionV2;
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
index 94aa194..63920ab 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
@@ -1,7 +1,7 @@
 package com.starrocks.connector.flink.table.sink;
 
 import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import com.geedgenetworks.spi.table.event.Event;
 import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
 import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java
index 337109b..d1aed43 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java
+++ 
b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java @@ -1,28 +1,28 @@ -package com.starrocks.connector.flink.table.sink;
-
-import com.geedgenetworks.core.metrics.InternalMetrics;
-import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
-import com.starrocks.data.load.stream.StreamLoadResponse;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-public class EventStreamLoadListener extends StarRocksStreamLoadListener {
- private transient InternalMetrics internalMetrics;
- public EventStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions, InternalMetrics internalMetrics) {
- super(context, sinkOptions);
- this.internalMetrics = internalMetrics;
- }
-
- @Override
- public void flushSucceedRecord(StreamLoadResponse response) {
- super.flushSucceedRecord(response);
- if (response.getFlushRows() != null) {
- internalMetrics.incrementOutEvents(response.getFlushRows());
- }
- }
-
- @Override
- public void flushFailedRecord() {
- super.flushFailedRecord();
- internalMetrics.incrementErrorEvents(1);
- }
-}
+package com.starrocks.connector.flink.table.sink;
+
+import com.geedgenetworks.spi.metrics.InternalMetrics;
+import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
+import com.starrocks.data.load.stream.StreamLoadResponse;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+public class EventStreamLoadListener extends StarRocksStreamLoadListener {
+ private transient InternalMetrics internalMetrics;
+ public EventStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions, InternalMetrics internalMetrics) {
+ super(context, sinkOptions);
+ this.internalMetrics = internalMetrics;
+ }
+
+ @Override
+ public void flushSucceedRecord(StreamLoadResponse response) {
+ super.flushSucceedRecord(response);
+ if (response.getFlushRows() != null) {
+ internalMetrics.incrementOutEvents(response.getFlushRows());
+ }
+ }
+
+ @Override
+ public void flushFailedRecord() {
+ super.flushFailedRecord();
+ internalMetrics.incrementErrorEvents(1);
+ }
+}
diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml
index 302e7e4..939f3bb 100644
--- a/groot-connectors/pom.xml
+++ b/groot-connectors/pom.xml
@@ -20,6 +20,7 @@
 <module>connector-starrocks</module>
 </modules>
 <dependencies>
+
 <dependency>
 <groupId>com.geedgenetworks</groupId>
 <artifactId>groot-spi</artifactId>
@@ -29,14 +30,16 @@
 <dependency>
 <groupId>com.geedgenetworks</groupId>
- <artifactId>groot-core</artifactId>
+ <artifactId>groot-common</artifactId>
 <version>${revision}</version>
 <scope>provided</scope>
 </dependency>
+
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>provided</scope>
 </dependency>
 </dependencies>
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index a5471ef..fb92c8d 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -17,6 +17,14 @@
 <groupId>com.geedgenetworks</groupId>
 <artifactId>groot-spi</artifactId>
 <version>${revision}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-common</artifactId>
+ <version>${revision}</version>
+ <scope>provided</scope>
 </dependency>
 
 <dependency>
@@ -40,22 +48,6 @@
 </dependency>
 
 <dependency>
- <groupId>com.geedgenetworks</groupId>
- <artifactId>sketches</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.nacos</groupId>
- <artifactId>nacos-client</artifactId>
- <exclusions>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
 <groupId>org.quartz-scheduler</groupId>
 <artifactId>quartz</artifactId>
 </dependency>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineSourceProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineSourceProvider.java
index 279b8c1..d6e2ba9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineSourceProvider.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineSourceProvider.java
@@ -1,6 +1,6 @@
 package com.geedgenetworks.core.connector.inline;
 
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.source.SourceProvider;
 import com.geedgenetworks.spi.table.connector.DecodingFormat;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineTableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineTableFactory.java
index 67b02b9..3117ef9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineTableFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/inline/InlineTableFactory.java
@@ -1,9 +1,9 @@
 package com.geedgenetworks.core.connector.inline;
 
 import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.source.SourceProvider;
 import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.source.SourceTableFactory;
 import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
index f3dfe37..cd8a4f6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/print/PrintTableFactory.java
@@ -1,7 +1,7 @@
 package com.geedgenetworks.core.connector.print;
 
 import com.geedgenetworks.spi.table.connector.EncodingFormat;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.type.StructType;
 import org.apache.flink.configuration.ConfigOption;
@@ -9,7 +9,7 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkProvider;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
 import java.util.HashSet;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java
index 0549105..ec6a13c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/FilterFunction.java
@@ -1,7 +1,7 @@
 package com.geedgenetworks.core.filter;
 
 import com.geedgenetworks.common.utils.ColumnUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.filter.FilterConfig;
 import com.googlecode.aviator.AviatorEvaluator;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
index 268d50f..125ee7e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java
@@ -7,10 +7,10 @@ import com.geedgenetworks.common.config.Constants;
 import com.geedgenetworks.common.config.KeybyEntity;
 import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.core.metrics.InternalMetrics;
 import com.geedgenetworks.spi.common.udf.AggregateFunction;
 import com.geedgenetworks.spi.common.udf.UDFContext;
 import com.geedgenetworks.spi.common.udf.UdfEntity;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import com.geedgenetworks.spi.processor.AggregateConfig;
 import com.geedgenetworks.spi.table.event.Event;
 import com.google.common.collect.Lists;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
index 2fdcc3d..cbc80a2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
@@ -11,9 +11,9 @@ import com.geedgenetworks.spi.processor.AggregateConfig;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.processor.Processor;
 import com.typesafe.config.Config;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -24,41 +24,41 @@ import java.util.Map;
 
 import static com.geedgenetworks.common.config.Constants.*;
 
-public class AggregateProcessorImpl implements Processor<AggregateConfig> {
+public class AggregateProcessor implements Processor<AggregateConfig> {
 
 @Override
- public DataStream<Event> processorFunction(DataStream<Event> grootEventSingleOutputStreamOperator, AggregateConfig aggregateConfig, ExecutionConfig config) {
+ public DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, AggregateConfig aggregateConfig) {
 SingleOutputStreamOperator<Event> singleOutputStreamOperator;
 if (aggregateConfig.getMini_batch()) {
 switch (aggregateConfig.getWindow_type()) {
 case TUMBLING_PROCESSING_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
 .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_size()))
 .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
 .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
- .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new SecondAggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
 break;
 case TUMBLING_EVENT_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
 .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_size()))
 .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
 .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
- .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new SecondAggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
 break;
 case SLIDING_PROCESSING_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
 .process(new FirstAggregationProcessingTime(aggregateConfig, aggregateConfig.getWindow_slide()))
 .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
 .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
- .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new SecondAggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
 break;
 case SLIDING_EVENT_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
 .process(new FirstAggregationEventTime(aggregateConfig, aggregateConfig.getWindow_slide()))
 .keyBy(new PreKeySelector(aggregateConfig.getGroup_by_fields()))
 .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
- .aggregate(new SecondAggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new SecondAggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
 break;
 default:
 throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
@@ -67,28 +67,28 @@ public class AggregateProcessorImpl implements Processor<AggregateConfig> {
 } else {
 switch (aggregateConfig.getWindow_type()) {
 case TUMBLING_PROCESSING_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
 .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
 .window(TumblingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
- .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new AggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
 break;
 case TUMBLING_EVENT_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
 .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
 .window(TumblingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size())))
- .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new AggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
 break;
 case SLIDING_PROCESSING_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
 .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
 .window(SlidingProcessingTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
- .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new AggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
 break;
 case SLIDING_EVENT_TIME:
- singleOutputStreamOperator = grootEventSingleOutputStreamOperator
+ singleOutputStreamOperator = input
 .keyBy(new KeySelector(aggregateConfig.getGroup_by_fields()))
 .window(SlidingEventTimeWindows.of(Time.seconds(aggregateConfig.getWindow_size()), Time.seconds(aggregateConfig.getWindow_slide())))
- .aggregate(new AggregateProcessorFunction(aggregateConfig, config), new ProcessWindowFunctionImpl(aggregateConfig));
+ .aggregate(new AggregateProcessorFunction(env, aggregateConfig), new ProcessWindowFunction(aggregateConfig));
 break;
 default:
 throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Invalid window type");
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 8e902b6..0b22faa 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -18,6 +18,7 @@ import com.googlecode.aviator.Options;
 import com.googlecode.aviator.exception.ExpressionRuntimeException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -33,8 +34,8 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
 private final List<String> udfClassNameLists;
 private final LinkedList<UdfEntity> functions;
 
- public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
- udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+ public AggregateProcessorFunction(StreamExecutionEnvironment env, AggregateConfig aggregateConfig) {
+ udfClassNameLists = JSON.parseObject(env.getConfig().getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
 udfContexts = aggregateConfig.getFunctions();
 if (udfContexts == null || udfContexts.isEmpty()) {
 throw new RuntimeException();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunction.java
index 6e78606..2aab0e6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunctionImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/ProcessWindowFunction.java
@@ -3,18 +3,17 @@ package com.geedgenetworks.core.processor.aggregate;
 import com.geedgenetworks.common.config.Accumulator;
 import com.geedgenetworks.common.config.KeybyEntity;
 import com.geedgenetworks.common.utils.ColumnUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import com.geedgenetworks.spi.processor.AggregateConfig;
 import com.geedgenetworks.spi.table.event.Event;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
 import static com.geedgenetworks.spi.table.event.Event.WINDOW_END_TIMESTAMP;
 import static com.geedgenetworks.spi.table.event.Event.WINDOW_START_TIMESTAMP;
 
-public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<
+public class ProcessWindowFunction extends org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<
 Accumulator, // input type
 Event, // output type
 KeybyEntity, // key type
@@ -22,7 +21,7 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu
 private final AggregateConfig aggregateConfig;
 private transient InternalMetrics internalMetrics;
 
- public ProcessWindowFunctionImpl(AggregateConfig aggregateConfig) {
+ public ProcessWindowFunction(AggregateConfig aggregateConfig) {
 this.aggregateConfig = aggregateConfig;
 }
 
@@ -34,7 +33,7 @@ public class ProcessWindowFunctionImpl extends org.apache.flink.streaming.api.fu
 }
 
 @Override
- public void process(KeybyEntity keybyEntity, ProcessWindowFunction<Accumulator, Event, KeybyEntity, TimeWindow>.Context context, Iterable<Accumulator> elements, Collector<Event> out) throws Exception {
+ public void process(KeybyEntity keybyEntity, org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<Accumulator, Event, KeybyEntity, TimeWindow>.Context context, Iterable<Accumulator> elements, Collector<Event> out) throws Exception {
 Accumulator accumulator = elements.iterator().next();
 Event event = new Event();
 event.setExtractedFields(accumulator.getMetricsFields());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
index 9087631..b9b53b7 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java
@@ -17,6 +17,7 @@ import com.googlecode.aviator.Options;
 import com.googlecode.aviator.exception.ExpressionRuntimeException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -31,8 +32,8 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co
 private final List<String> udfClassNameLists;
 private final LinkedList<UdfEntity> functions;
 
- public SecondAggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
- udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
+ public SecondAggregateProcessorFunction(StreamExecutionEnvironment env, AggregateConfig aggregateConfig) {
+ udfClassNameLists = JSON.parseObject(env.getConfig().getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
 udfContexts = aggregateConfig.getFunctions();
 if (udfContexts == null || udfContexts.isEmpty()) {
 throw new RuntimeException();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFactory.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFactory.java
new file mode 100644
index 0000000..b8b4201
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFactory.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.core.processor.projection;
+
+import com.geedgenetworks.spi.processor.Processor;
+import com.geedgenetworks.spi.processor.ProcessorFactory;
+
+public class ProjectionProcessFactory implements ProcessorFactory {
+
+ @Override
+ public String type() {
+ return "projection";
+ }
+
+ @Override
+ public Processor<?> createProcessor() {
+ return new ProjectionProcessor();
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index ad8c6ca..004bd78 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -7,11 +7,11 @@ import com.geedgenetworks.common.config.*;
 import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.common.utils.ColumnUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
 import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseScheduler;
 import com.geedgenetworks.spi.common.udf.ScalarFunction;
 import com.geedgenetworks.spi.common.udf.UDFContext;
 import com.geedgenetworks.spi.common.udf.UdfEntity;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import com.geedgenetworks.spi.processor.ProjectionConfig;
 import com.geedgenetworks.spi.table.event.Event;
 import com.google.common.collect.Lists;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
index 21ec8f4..d182bd8 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
@@ -11,23 +11,23 @@ import com.geedgenetworks.spi.processor.Processor;
 import com.geedgenetworks.spi.processor.ProjectionConfig;
 import com.geedgenetworks.spi.table.event.Event;
 import com.typesafe.config.Config;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import java.util.Map;
 
-public class ProjectionProcessorImpl implements Processor<ProjectionConfig> {
+public class ProjectionProcessor implements Processor<ProjectionConfig> {
 
 @Override
- public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, ProjectionConfig projectionConfig, ExecutionConfig config) {
+ public DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, ProjectionConfig projectionConfig) {
 if (projectionConfig.getParallelism() != 0) {
- return grootEventDataStream
+ return input
 .process(new ProjectionProcessFunction(projectionConfig))
 .setParallelism(projectionConfig.getParallelism())
 .name(projectionConfig.getName());
 } else {
- return grootEventDataStream
+ return input
 .process(new ProjectionProcessFunction(projectionConfig))
 .name(projectionConfig.getName());
 }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java
index 8771cca..1db565b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessor.java
@@ -10,23 +10,23 @@ import com.geedgenetworks.spi.processor.Processor;
 import com.geedgenetworks.spi.processor.TableConfig;
 import com.geedgenetworks.spi.table.event.Event;
 import com.typesafe.config.Config;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import java.util.Map;
 
-public class TableProcessorImpl implements Processor<TableConfig> {
+public class TableProcessor implements Processor<TableConfig> {
 
 @Override
- public DataStream<Event> processorFunction(DataStream<Event> grootEventDataStream, TableConfig tableConfig, ExecutionConfig config) {
+ public DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, TableConfig tableConfig) {
 if (tableConfig.getParallelism() != 0) {
- return grootEventDataStream
+ return input
 .flatMap(new TableProcessorFunction(tableConfig))
 .setParallelism(tableConfig.getParallelism())
 .name(tableConfig.getName());
 } else {
- return grootEventDataStream
+ return input
 .flatMap(new TableProcessorFunction(tableConfig))
 .name(tableConfig.getName());
 }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
index 5200d41..ecb76f4 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
@@ -5,10 +5,10 @@ import com.geedgenetworks.common.config.Constants;
 import com.geedgenetworks.common.exception.CommonErrorCode;
 import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.common.utils.ColumnUtil;
-import com.geedgenetworks.core.metrics.InternalMetrics;
 import com.geedgenetworks.spi.common.udf.TableFunction;
 import com.geedgenetworks.spi.common.udf.UDFContext;
 import com.geedgenetworks.spi.common.udf.UdfEntity;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import com.geedgenetworks.spi.processor.TableConfig;
 import com.geedgenetworks.spi.table.event.Event;
 import com.google.common.collect.Lists;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
index f5bd652..64465a2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/split/SplitFunction.java
@@ -1,7 +1,7 @@
 package com.geedgenetworks.core.split;
 
-import com.geedgenetworks.core.metrics.InternalMetrics;
 import com.geedgenetworks.spi.common.udf.RuleContext;
+import com.geedgenetworks.spi.metrics.InternalMetrics;
 import com.geedgenetworks.spi.split.SplitConfig;
 import com.geedgenetworks.spi.table.event.Event;
 import com.googlecode.aviator.AviatorEvaluator;
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.Processor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.Processor
index 1f32ffa..a7eef58 100644
--- a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.Processor
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.Processor
@@ -1,3 +1,3 @@
-com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl
-com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
-com.geedgenetworks.core.processor.table.TableProcessorImpl
\ No newline at end of file
+com.geedgenetworks.core.processor.aggregate.AggregateProcessor
+com.geedgenetworks.core.processor.projection.ProjectionProcessor
+com.geedgenetworks.core.processor.table.TableProcessor
\ No newline at end of file
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.ProcessorFactory b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.ProcessorFactory
new file mode 100644
index 0000000..fa21381
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.spi.processor.ProcessorFactory
@@ -0,0 +1 @@
+com.geedgenetworks.core.processor.projection.ProjectionProcessFactory
\ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 5e64962..9b58289 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -14,7 +14,7 @@ import java.util.List;
 public class GrootStreamExample {
 
 public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
 String configFile = getTestConfigFile(configPath);
 ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
 executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
index d1dc891..bbc9c65 100644
--- a/groot-examples/pom.xml
+++ b/groot-examples/pom.xml
@@ -18,10 +18,11 @@
 </modules>
 
 <properties>
- <scope>compile</scope>
+ <scope>provided</scope>
 </properties>
 
 <dependencies>
+
 <dependency>
 <groupId>com.typesafe</groupId>
 <artifactId>config</artifactId>
@@ -30,34 +31,35 @@
 <dependency>
 <groupId>com.geedgenetworks</groupId>
 <artifactId>groot-bootstrap</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
 </dependency>
 
 <dependency>
 <groupId>com.geedgenetworks</groupId>
 <artifactId>connector-kafka</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
 <scope>${scope}</scope>
 </dependency>
 
 <dependency>
 <groupId>com.geedgenetworks</groupId>
 <artifactId>connector-mock</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
 <scope>${scope}</scope>
 </dependency>
 
 <dependency>
 <groupId>com.geedgenetworks</groupId>
 <artifactId>connector-clickhouse</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
 <scope>${scope}</scope>
 </dependency>
 
 <dependency>
 <groupId>com.geedgenetworks</groupId>
 <artifactId>connector-ipfix-collector</artifactId>
- <version>${project.version}</version>
+ <version>${revision}</version>
 <scope>${scope}</scope>
 </dependency>
@@ -146,7 +148,6 @@
 </dependency>
 
-
 </dependencies>
diff --git a/groot-formats/format-csv/pom.xml b/groot-formats/format-csv/pom.xml
index 4940bcf..509a9c1 100644
--- a/groot-formats/format-csv/pom.xml
+++ b/groot-formats/format-csv/pom.xml
@@ -19,5 +19,24 @@
 <version>${flink.version}</version>
 <scope>${flink.scope}</scope>
 </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>${flink.scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
 </dependencies>
</project>
\ No newline at end of file
diff --git a/groot-formats/format-json/pom.xml b/groot-formats/format-json/pom.xml
index 36fef72..1036832 100644
--- a/groot-formats/format-json/pom.xml
+++ b/groot-formats/format-json/pom.xml
@@ -12,6 +12,25 @@
 <artifactId>format-json</artifactId>
 <name>Groot : Formats : Format-Json </name>
 <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
 </dependencies>
</project>
\ No newline at end of file
diff --git a/groot-formats/format-msgpack/pom.xml b/groot-formats/format-msgpack/pom.xml
index a58e919..7d70875 100644
--- a/groot-formats/format-msgpack/pom.xml
+++ b/groot-formats/format-msgpack/pom.xml
@@ -19,15 +19,30 @@
 <version>0.9.8</version>
 </dependency>
 
-<!--<dependency>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-core</artifactId>
+ <version>${revision}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
 </dependency>
 
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
-</dependency>-->
+ <scope>test</scope>
+ </dependency>
 </dependencies>
</project>
\ No newline at end of file
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
index b66c5b7..fced05e 100644
--- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
@@ -1,9 +1,9 @@
 package com.geedgenetworks.formats.msgpack;
 
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
 import com.geedgenetworks.spi.table.event.Event;
 import com.geedgenetworks.spi.table.factory.FactoryUtil;
 import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -69,6 +69,7 @@ public class MessagePackFormatFactoryTest {
 SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
 Map<String, String> options = new HashMap<>();
 options.put("data", Base64.getEncoder().encodeToString(bytes));
+ options.put("repeat.count", "3");
 options.put("type", "base64");
 options.put("format", "msgpack");
diff --git a/groot-formats/format-protobuf/pom.xml b/groot-formats/format-protobuf/pom.xml
index f14e1d1..9902ada 100644
--- a/groot-formats/format-protobuf/pom.xml
+++ b/groot-formats/format-protobuf/pom.xml
@@ -13,17 +13,11 @@
 <name>Groot : Formats : Format-Protobuf </name>
 
 <properties>
- <protobuf.version>3.23.4</protobuf.version>
+
 </properties>
 
 <dependencies>
- <!--<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>-->
- <!--
- -->
+
 <dependency>
 <groupId>com.geedgenetworks</groupId>
 <artifactId>protobuf-shaded</artifactId>
@@ -37,10 +31,45 @@
 </exclusions>
 </dependency>
 <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-core</artifactId>
+ <version>${revision}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-json</artifactId>
+ <version>${revision}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_${scala.version}</artifactId>
 <version>${flink.version}</version>
 <scope>test</scope>
 </dependency>
+
+
+
 </dependencies>
</project>
\ No newline at end of file
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufEventSchemaTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufEventSchemaTest.java
index 9638bd6..df3c30a 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufEventSchemaTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufEventSchemaTest.java
@@ -1,700 +1,700 @@
-package com.geedgenetworks.formats.protobuf;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.shaded.com.google.protobuf.ByteString;
-import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.flink.util.Preconditions;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.junit.jupiter.api.Test;
-import com.geedgenetworks.formats.protobuf.SchemaConverters.MessageConverter;
-
-import java.io.FileInputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-/**
- * protoc --descriptor_set_out=proto3_types.desc --java_out=./ proto3_types.proto
- * protoc --descriptor_set_out=session_record_test.desc session_record_test.proto
- *
- */
-public class ProtobufEventSchemaTest {
-
- public static class InputDatas{
- Proto3TypesProtos.Proto3Types msg;
- Proto3TypesProtos.StructMessage subMsg1;
- Proto3TypesProtos.StructMessage subMsg2;
- Map<String, Object> map;
- Map<String, Object> subMap1;
- Map<String, Object> subMap2;
- }
-
- public static InputDatas geneInputDatas(){
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
- Map<String, Object> map = new HashMap<>();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
- Map<String, Object> subMap1 = new HashMap<>();
- Map<String, Object> subMap2 = new HashMap<>();
-
- long int64 = random.nextLong(1, Long.MAX_VALUE);
- msgBuilder.setInt64(int64);
- map.put("int64", int64);
-
- int int32 = random.nextInt(1, Integer.MAX_VALUE);
- msgBuilder.setInt32(int32);
- map.put("int32", int32);
-
- String text = "ut8字符串";
- msgBuilder.setText(text);
- map.put("text", text);
-
- byte[] bytes = new byte[]{1, 2, 3, 4, 5};
- msgBuilder.setBytes(ByteString.copyFrom(bytes));
- map.put("bytes", bytes);
-
- int enum_val = 1;
- msgBuilder.setEnumValValue(enum_val);
- map.put("enum_val", enum_val);
-
- // subMsg start
- long id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder1.setId(id);
- subMap1.put("id", id);
-
- String name = "ut8字符串1";
- subMsgBuilder1.setName(name);
- subMap1.put("name", name);
-
- int age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder1.setAge(age);
- subMap1.put("age", age);
-
- double score = random.nextDouble(1, Integer.MAX_VALUE);
- subMsgBuilder1.setScore(score);
- subMap1.put("score", score);
-
- long optional_id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder1.setOptionalId(optional_id);
- subMap1.put("optional_id", optional_id);
-
- int optional_age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder1.setOptionalAge(optional_age);
- subMap1.put("optional_age", optional_age);
-
- id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder2.setId(id);
- subMap2.put("id", id);
-
- name = "ut8字符串1";
- subMsgBuilder2.setName(name);
- subMap2.put("name", name);
-
- age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder2.setAge(age);
- subMap2.put("age", age);
-
- score = random.nextDouble(1, Integer.MAX_VALUE);
- subMsgBuilder2.setScore(score);
- subMap2.put("score", score);
-
- optional_id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder2.setOptionalId(optional_id);
- subMap2.put("optional_id", optional_id);
-
- optional_age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder2.setOptionalAge(optional_age);
- subMap2.put("optional_age", optional_age);
-
- Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
- Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
- // subMsg end
-
- msgBuilder.setMessage(subMsg1);
- map.put("message", subMap1);
-
- long optional_int64 = random.nextLong(1, Long.MAX_VALUE);
- msgBuilder.setOptionalInt64(optional_int64);
- map.put("optional_int64", optional_int64);
-
- int optional_int32 = random.nextInt(1, Integer.MAX_VALUE);
- msgBuilder.setOptionalInt32(optional_int32);
- map.put("optional_int32", optional_int32);
-
- String optional_text = "ut8字符串";
- msgBuilder.setOptionalText(optional_text);
- map.put("optional_text", optional_text);
-
- byte[] optional_bytes = new byte[]{1, 2, 3, 4, 5};
- msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
- map.put("optional_bytes", optional_bytes);
-
- int optional_enum_val = 1;
- msgBuilder.setOptionalEnumValValue(optional_enum_val);
- map.put("optional_enum_val", optional_enum_val);
-
- msgBuilder.setOptionalMessage(subMsg2);
- map.put("optional_message", subMap2);
-
- List<Long> repeated_int64 = Arrays.asList(1L, 3L, 5L);
- msgBuilder.addAllRepeatedInt64(repeated_int64);
- map.put("repeated_int64", repeated_int64);
-
- List<Integer> repeated_int32 = Arrays.asList(1, 3, 5);
- msgBuilder.addAllRepeatedInt32(repeated_int32);
- map.put("repeated_int32", repeated_int32);
-
- msgBuilder.addAllRepeatedMessage(Arrays.asList(subMsg1, subMsg2));
- map.put("repeated_message", Arrays.asList(subMap1, subMap2));
-
- InputDatas datas = new InputDatas();
- datas.msg = msgBuilder.build();
- datas.subMsg1 = subMsg1;
- datas.subMsg2 = subMsg2;
- datas.map = map;
- datas.subMap1 = subMap1;
- datas.subMap2 = subMap2;
- return datas;
- }
-
- public static InputDatas geneInputDatasDefaultValue(){
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
- Map<String, Object> map = new HashMap<>();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
- Map<String, Object> subMap1 = new HashMap<>();
- Map<String, Object> subMap2 = new HashMap<>();
-
- long int64 = 0;
- msgBuilder.setInt64(int64);
- map.put("int64", int64);
-
- int int32 = 0;
- msgBuilder.setInt32(int32);
- map.put("int32", int32);
-
- String text = "";
- msgBuilder.setText(text);
- map.put("text", text);
-
- byte[] bytes = new byte[]{};
- msgBuilder.setBytes(ByteString.copyFrom(bytes));
- map.put("bytes", bytes);
-
- int enum_val = 0;
- msgBuilder.setEnumValValue(enum_val);
- map.put("enum_val", enum_val);
-
- // subMsg start
- long id = 0;
- subMsgBuilder1.setId(id);
- subMap1.put("id", id);
-
- String name = "";
- subMsgBuilder1.setName(name);
- subMap1.put("name", name);
-
- int age = 0;
- subMsgBuilder1.setAge(age);
- subMap1.put("age", age);
-
- double score = 0;
- subMsgBuilder1.setScore(score);
- subMap1.put("score", score);
-
- long optional_id = 0;
- subMsgBuilder1.setOptionalId(optional_id);
- subMap1.put("optional_id", optional_id);
-
- int optional_age = 0;
- /*subMsgBuilder1.setOptionalAge(optional_age);
- subMap1.put("optional_age", optional_age);*/
-
- id = 0;
- subMsgBuilder2.setId(id);
- subMap2.put("id", id);
-
- name = "";
- subMsgBuilder2.setName(name);
- subMap2.put("name", name);
-
- age = 0;
- subMsgBuilder2.setAge(age);
- subMap2.put("age", age);
-
- score = 0;
- subMsgBuilder2.setScore(score);
- subMap2.put("score", score);
-
- optional_id = 0;
- subMsgBuilder2.setOptionalId(optional_id);
- subMap2.put("optional_id", optional_id);
-
- optional_age = 0;
- subMsgBuilder2.setOptionalAge(optional_age);
- subMap2.put("optional_age", optional_age);
-
- Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
- Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
- // subMsg end
-
- msgBuilder.setMessage(subMsg1);
- map.put("message", subMap1);
-
- long optional_int64 = 0;
- msgBuilder.setOptionalInt64(optional_int64);
- map.put("optional_int64", optional_int64);
-
- int optional_int32 = 0;
- msgBuilder.setOptionalInt32(optional_int32);
- map.put("optional_int32", optional_int32);
-
- String optional_text = "";
- msgBuilder.setOptionalText(optional_text);
- map.put("optional_text", optional_text);
-
- byte[] optional_bytes = new byte[]{};
- msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
- map.put("optional_bytes", optional_bytes);
-
- int optional_enum_val = 0;
- msgBuilder.setOptionalEnumValValue(optional_enum_val);
- map.put("optional_enum_val", optional_enum_val);
-
- msgBuilder.setOptionalMessage(subMsg2);
- map.put("optional_message", subMap2);
-
- List<Long> repeated_int64 = Arrays.asList();
- msgBuilder.addAllRepeatedInt64(repeated_int64);
- map.put("repeated_int64", repeated_int64);
-
- List<Integer> repeated_int32 = Arrays.asList();
- msgBuilder.addAllRepeatedInt32(repeated_int32);
- map.put("repeated_int32", repeated_int32);
-
- msgBuilder.addAllRepeatedMessage(Arrays.asList());
- map.put("repeated_message", Arrays.asList());
-
- InputDatas datas = new InputDatas();
- datas.msg = msgBuilder.build();
- datas.subMsg1 = subMsg1;
- datas.subMsg2 = subMsg2;
- datas.map = map;
- datas.subMap1 = subMap1;
- datas.subMap2 = subMap2;
- return datas;
- }
-
- public static InputDatas geneInputDatasUsePartialField(){
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
- Map<String, Object> map = new HashMap<>();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
- Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
- Map<String, Object> subMap1 = new HashMap<>();
- Map<String, Object> subMap2 = new HashMap<>();
-
- /*long int64 = random.nextLong(1, Long.MAX_VALUE);
- msgBuilder.setInt64(int64);
- map.put("int64", int64);*/
-
- int int32 = random.nextInt(1, Integer.MAX_VALUE);
- msgBuilder.setInt32(int32);
- map.put("int32", int32);
-
- String text = "ut8字符串";
- msgBuilder.setText(text);
- map.put("text", text);
-
- /*byte[] bytes = new byte[]{1, 2, 3, 4, 5};
- msgBuilder.setBytes(ByteString.copyFrom(bytes));
- map.put("bytes", bytes);*/
-
- /*int enum_val = 1;
- msgBuilder.setEnumValValue(enum_val);
- map.put("enum_val", enum_val);*/
-
- // subMsg start
- long id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder1.setId(id);
- subMap1.put("id", id);
-
- String name = "ut8字符串1";
- /*subMsgBuilder1.setName(name);
- subMap1.put("name", name);*/
-
- int age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder1.setAge(age);
- subMap1.put("age", age);
-
- double score = random.nextDouble(1, Integer.MAX_VALUE);
- /*subMsgBuilder1.setScore(score);
- subMap1.put("score", score);*/
-
- long optional_id = random.nextLong(1, Long.MAX_VALUE);
- subMsgBuilder1.setOptionalId(optional_id);
- subMap1.put("optional_id", optional_id);
-
- int optional_age = random.nextInt(1, Integer.MAX_VALUE);
- /*subMsgBuilder1.setOptionalAge(optional_age);
- subMap1.put("optional_age", optional_age);*/
-
- id = random.nextLong(1, Long.MAX_VALUE);
- /*subMsgBuilder2.setId(id);
- subMap2.put("id", id);*/
-
- name = "ut8字符串1";
- subMsgBuilder2.setName(name);
- subMap2.put("name", name);
-
- age = random.nextInt(1, Integer.MAX_VALUE);
- /*subMsgBuilder2.setAge(age);
- subMap2.put("age", age);*/
-
- score = random.nextDouble(1, Integer.MAX_VALUE);
- subMsgBuilder2.setScore(score);
- subMap2.put("score", score);
-
- optional_id = random.nextLong(1, Long.MAX_VALUE);
- /*subMsgBuilder2.setOptionalId(optional_id);
- subMap2.put("optional_id", optional_id);*/
-
- optional_age = random.nextInt(1, Integer.MAX_VALUE);
- subMsgBuilder2.setOptionalAge(optional_age);
- subMap2.put("optional_age", optional_age);
-
- Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
- Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
- // subMsg end
-
- /*msgBuilder.setMessage(subMsg1);
- map.put("message", subMap1);*/
-
- long optional_int64 = random.nextLong(1, Long.MAX_VALUE);
- msgBuilder.setOptionalInt64(optional_int64);
- map.put("optional_int64", optional_int64);
-
- /*int optional_int32 = random.nextInt(1, Integer.MAX_VALUE);
- msgBuilder.setOptionalInt32(optional_int32);
- map.put("optional_int32", optional_int32);*/
-
- String optional_text = "ut8字符串";
- msgBuilder.setOptionalText(optional_text);
- map.put("optional_text", optional_text);
-
- /*byte[] optional_bytes = new byte[]{1, 2, 3, 4, 5};
- msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
- map.put("optional_bytes", optional_bytes);*/
-
- int optional_enum_val = 1;
- msgBuilder.setOptionalEnumValValue(optional_enum_val);
- map.put("optional_enum_val", optional_enum_val);
-
- msgBuilder.setOptionalMessage(subMsg2);
- map.put("optional_message", subMap2);
-
- /*List<Long> repeated_int64 = Arrays.asList(1L, 3L, 5L);
- msgBuilder.addAllRepeatedInt64(repeated_int64);
- map.put("repeated_int64", repeated_int64);*/
-
- List<Integer> repeated_int32 = Arrays.asList(1, 3, 5);
- msgBuilder.addAllRepeatedInt32(repeated_int32);
- map.put("repeated_int32", repeated_int32);
-
- msgBuilder.addAllRepeatedMessage(Arrays.asList(subMsg1, subMsg2));
- map.put("repeated_message", Arrays.asList(subMap1, subMap2));
-
- InputDatas datas = new InputDatas();
- datas.msg = msgBuilder.build();
- datas.subMsg1 = subMsg1;
- datas.subMsg2 = subMsg2;
- datas.map = map;
- datas.subMap1 = subMap1;
- datas.subMap2 = subMap2;
- return datas;
- }
-
- @Test
- public void testSerializeAndDeserialize() throws Exception{
- String path = getClass().getResource("/proto3_types.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
- InputDatas inputDatas = geneInputDatas();
-
- byte[] bytesSerByApi = inputDatas.msg.toByteArray();
-
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
- byte[] bytesSer = serializer.serialize(inputDatas.map);
-
- System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
- assertArrayEquals(bytesSerByApi, bytesSer);
-
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- Map<String, Object> rstMap = messageConverter.converter(bytesSer);
-
- assertTrue(objEquals(inputDatas.map, rstMap, false), () -> "\n" + inputDatas.map.toString() + "\n" + rstMap.toString());
- System.out.println(inputDatas.map.toString());
- System.out.println(rstMap.toString());
- System.out.println(JSON.toJSONString(inputDatas.map));
- System.out.println(JSON.toJSONString(rstMap));
-
- System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
- }
-
- @Test
- public void testSerializeAndDeserializeDefaultValue() throws Exception{
- String path = getClass().getResource("/proto3_types.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
- InputDatas inputDatas = geneInputDatasDefaultValue();
-
- byte[] bytesSerByApi = inputDatas.msg.toByteArray();
-
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
- byte[] bytesSer = serializer.serialize(inputDatas.map);
-
- System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
- assertArrayEquals(bytesSerByApi, bytesSer);
-
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- Map<String, Object> rstMap = messageConverter.converter(bytesSer);
- messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), true);
- Map<String, Object> rstMapEmitDefaultValue = messageConverter.converter(bytesSer);
-
- // A message field is emitted whenever it is not null; arrays only when length > 0; optional fields once a value is set; optional bytes even when length is 0
- System.out.println(inputDatas.map.toString());
- System.out.println(rstMap.toString());
- System.out.println(rstMapEmitDefaultValue.toString());
- System.out.println(JSON.toJSONString(inputDatas.map));
- System.out.println(JSON.toJSONString(rstMap));
- System.out.println(JSON.toJSONString(rstMapEmitDefaultValue));
-
- System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
- }
-
- @Test
- public void testSerializeAndDeserializeUsePartialField() throws Exception{
- String path = getClass().getResource("/proto3_types.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
- InputDatas inputDatas = geneInputDatasUsePartialField();
-
- byte[] bytesSerByApi = inputDatas.msg.toByteArray();
-
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
- byte[] bytesSer = serializer.serialize(inputDatas.map);
- System.out.println(Base64.getEncoder().encodeToString(bytesSer));
-
- System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
- assertArrayEquals(bytesSerByApi, bytesSer);
-
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- Map<String, Object> rstMap = messageConverter.converter(bytesSer);
-
- assertTrue(objEquals(inputDatas.map, rstMap, false), () -> "\n" + inputDatas.map.toString() + "\n" + rstMap.toString());
- System.out.println(inputDatas.map.toString());
- System.out.println(rstMap.toString());
- System.out.println(JSON.toJSONString(inputDatas.map));
- System.out.println(JSON.toJSONString(rstMap));
-
- System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
- }
-
- @Test
- public void testSerializeAndDeserializeSessionRecord() throws Exception{
- String path = getClass().getResource("/session_record_test.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"SessionRecord");
- String json = "{\"recv_time\": 1704350600, \"log_id\": 185826449998479360, \"decoded_as\": \"BASE\", \"session_id\": 290502878495441820, \"start_timestamp_ms\": 1704350566378, \"end_timestamp_ms\": 1704350570816, \"duration_ms\": 4438, \"tcp_handshake_latency_ms\": 1105, \"ingestion_time\": 1704350600, \"processing_time\": 1704350600, \"device_id\": \"21426003\", \"out_link_id\": 65535, \"in_link_id\": 65535, \"device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-9140\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-9140\\\"}]}\", \"data_center\": \"center-xxg-9140\", \"device_group\": \"group-xxg-9140\", \"sled_ip\": \"192.168.40.81\", \"address_type\": 4, \"vsys_id\": 1, \"t_vsys_id\": 1, \"flags\": 24592, \"flags_identify_info\": \"[1,1,2]\", \"statistics_rule_list\": [406583], \"client_ip\": \"192.56.151.80\", \"client_port\": 62241, \"client_os_desc\": \"Windows\", \"client_geolocation\": \"\\u7f8e\\u56fd.Unknown.Unknown..\", \"server_ip\": \"192.56.222.93\", \"server_port\": 14454, \"server_os_desc\": \"Linux\", \"server_geolocation\": \"\\u7f8e\\u56fd.Unknown.Unknown..\", \"ip_protocol\": \"tcp\", \"decoded_path\": \"ETHERNET.IPv4.TCP\", \"sent_pkts\": 4, \"received_pkts\": 5, \"sent_bytes\": 246, \"received_bytes\": 1809, \"tcp_rtt_ms\": 128, \"tcp_client_isn\": 568305009, \"tcp_server_isn\": 4027331180, \"in_src_mac\": \"a2:fa:dc:56:c7:b3\", \"out_src_mac\": \"48:73:97:96:38:20\", \"in_dest_mac\": \"48:73:97:96:38:20\", \"out_dest_mac\": \"a2:fa:dc:56:c7:b3\"}";
- Map<String, Object> map = JSON.parseObject(json);
-
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
- byte[] bytesSer = serializer.serialize(map);
- System.out.println(Base64.getEncoder().encodeToString(bytesSer));
-
- System.out.println(String.format("my ser bytes size:%d", bytesSer.length));
-
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- Map<String, Object> rstMap = messageConverter.converter(bytesSer);
-
- assertTrue(objEquals(map, rstMap, true), () -> "\n" + JSON.toJSONString(map) + "\n" + JSON.toJSONString(rstMap));
- System.out.println(map.toString());
- System.out.println(rstMap.toString());
- System.out.println(JSON.toJSONString(new TreeMap<>(map)));
- System.out.println(JSON.toJSONString(new TreeMap<>(rstMap)));
-
- System.out.println(JSON.toJSONString(new TreeMap<>(map)).equals(JSON.toJSONString(new TreeMap<>(rstMap))));
- }
-
-
- public static void main(String[] args) throws Exception{
- ProtobufEventSchemaTest test = new ProtobufEventSchemaTest();
- String path = test.getClass().getResource("/session_record_test.desc").getPath();
- Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"SessionRecord");
- MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
- ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
-
- FileInputStream inputStream = new FileInputStream("D:\\doc\\groot\\SESSION-RECORD-24-0104.json");
- LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
- int count = 0;
- long jsonBytesTotalSize = 0;
- long protoBytesTotalSize = 0;
- long jsonBytesMinSize = Long.MAX_VALUE;
- long protoBytesMinSize = Long.MAX_VALUE;
- long jsonBytesMaxSize = 0;
- long protoBytesMaxSize = 0;
- long totalFieldCount = 0;
- long minFieldCount = Long.MAX_VALUE;
- long maxFieldCount = 0;
-
- CompressionType[] compressionTypes = new CompressionType[]{
- CompressionType.NONE, CompressionType.SNAPPY, CompressionType.LZ4, CompressionType.GZIP, CompressionType.ZSTD
- };
- long[][] compressionBytesSize = new long[compressionTypes.length][6];
- for (int i = 0; i < compressionBytesSize.length; i++) {
- compressionBytesSize[i][0] = 0;
- compressionBytesSize[i][1] = 0;
- compressionBytesSize[i][2] = Long.MAX_VALUE;
- compressionBytesSize[i][3] = Long.MAX_VALUE;
- compressionBytesSize[i][4] = 0;
- compressionBytesSize[i][5] = 0;
- }
-
- while (lines.hasNext()){
- String line = lines.next().trim();
- if(line.isEmpty()){
- continue;
- }
-
- Map<String, Object> map = JSON.parseObject(line);
- int fieldCount = map.size();
- byte[] bytesProto = serializer.serialize(map);
- byte[] bytesJson = JSON.toJSONString(map).getBytes(StandardCharsets.UTF_8);
- jsonBytesTotalSize += bytesJson.length;
- protoBytesTotalSize += bytesProto.length;
- jsonBytesMinSize = Math.min(jsonBytesMinSize, bytesJson.length);
- protoBytesMinSize = Math.min(protoBytesMinSize, bytesProto.length);
- jsonBytesMaxSize = Math.max(jsonBytesMaxSize, bytesJson.length);
- protoBytesMaxSize = Math.max(protoBytesMaxSize, bytesProto.length);
- totalFieldCount += fieldCount;
- minFieldCount = Math.min(minFieldCount, fieldCount);
- maxFieldCount = Math.max(maxFieldCount, fieldCount);
-
- Map<String, Object> rstMap = messageConverter.converter(bytesProto);
- Preconditions.checkArgument(test.objEquals(map, rstMap, true), "\n" + JSON.toJSONString(new TreeMap<>(map)) + "\n" + JSON.toJSONString(new TreeMap<>(rstMap)));
- Preconditions.checkArgument(JSON.toJSONString(new TreeMap<>(map)).equals(JSON.toJSONString(new TreeMap<>(rstMap))), "\n" + JSON.toJSONString(new TreeMap<>(map)) + "\n" + JSON.toJSONString(new TreeMap<>(rstMap)));
- count++;
-
- for (int i = 0; i < compressionTypes.length; i++) {
- CompressionType compressionType = compressionTypes[i];
- ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(1024 * 16);
- OutputStream outputStream = compressionType.wrapForOutput(bufferStream, (byte) 2);
- outputStream.write(bytesJson);
- outputStream.close();
- int jsonCompressSize = bufferStream.position();
-
- bufferStream = new ByteBufferOutputStream(1024 * 16);
- outputStream = compressionType.wrapForOutput(bufferStream, (byte) 2);
- outputStream.write(bytesProto);
- outputStream.close();
- int protoCompressSize = bufferStream.position();
-
- compressionBytesSize[i][0] += jsonCompressSize;
- compressionBytesSize[i][1] += protoCompressSize;
- compressionBytesSize[i][2] = Math.min(compressionBytesSize[i][2], jsonCompressSize);
- compressionBytesSize[i][3] = Math.min(compressionBytesSize[i][3], protoCompressSize);
- compressionBytesSize[i][4] = Math.max(compressionBytesSize[i][4], jsonCompressSize);
- compressionBytesSize[i][5] = Math.max(compressionBytesSize[i][5], protoCompressSize);
- }
-
- }
- System.out.println(String.format("count:%d, avgFieldCount:%d, minFieldCount:%d, maxFieldCount:%d, jsonBytesAvgSize:%d, protoBytesAvgSize:%d, jsonBytesMinSize:%d, protoBytesMinSize:%d, jsonBytesMaxSize:%d, protoBytesMaxSize:%d",
- count, totalFieldCount/count, minFieldCount, maxFieldCount, jsonBytesTotalSize/count, protoBytesTotalSize/count,
- jsonBytesMinSize, protoBytesMinSize, jsonBytesMaxSize, protoBytesMaxSize));
- for (int i = 0; i < compressionTypes.length; i++) {
- CompressionType compressionType = compressionTypes[i];
- System.out.println(String.format("compression(%s): count:%d, jsonBytesAvgSize:%d, protoBytesAvgSize:%d, avgRatio:%.2f, jsonBytesMinSize:%d, protoBytesMinSize:%d, minRatio:%.2f, jsonBytesMaxSize:%d, protoBytesMaxSize:%d, maxRatio:%.2f",
- compressionType, count, compressionBytesSize[i][0]/count, compressionBytesSize[i][1]/count, (((double)compressionBytesSize[i][1])/count)/(compressionBytesSize[i][0]/count),
- compressionBytesSize[i][2], compressionBytesSize[i][3], ((double)compressionBytesSize[i][3])/(compressionBytesSize[i][2]),
- compressionBytesSize[i][4], compressionBytesSize[i][5], ((double)compressionBytesSize[i][5])/(compressionBytesSize[i][4])));
- }
- }
-
- @Test
- public void testArrayInstance() throws Exception{
- Object bytes = new byte[]{1, 2, 3, 4, 5};
- Object ints = new int[]{1, 2, 3, 4, 5};
-
- System.out.println(bytes.getClass().isArray());
- System.out.println(bytes instanceof byte[]);
- System.out.println(bytes instanceof int[]);
- System.out.println(ints.getClass().isArray());
- System.out.println(ints instanceof byte[]);
- System.out.println(ints instanceof int[]);
- }
-
- private boolean objEquals(Object value1, Object value2, boolean numConvert){
- if(value1 == null){
- if(value1 != value2){
- return false;
- }
- }else if(value2 == null){
- return false;
- }else if(value1 instanceof Map){
- if(!mapEquals((Map<String, Object>) value1, (Map<String, Object>) value2, numConvert)){
- return false;
- }
- }else if(value1 instanceof List){
- if(!listEquals((List< Object>) value1, (List< Object>) value2, numConvert)){
- return false;
- }
- }else if(value1 instanceof byte[]){
- if(!Arrays.equals((byte[]) value1, (byte[]) value2)){
- return false;
- }
- }
- else{
- if(value1.getClass() != value2.getClass() || !value1.equals(value2)){
- if(numConvert && value1 instanceof Number && value2 instanceof Number && ((Number) value1).longValue() == ((Number) value2).longValue()){
-
- }else{
- return false;
- }
- }
- }
- return true;
- }
- private boolean mapEquals(Map<String, Object> map1, Map<String, Object> map2, boolean numConvert){
- if(map1.size() != map2.size()){
- return false;
- }
-
- for (Map.Entry<String, Object> entry : map1.entrySet()) {
- Object value1 = entry.getValue();
- Object value2 = map2.get(entry.getKey());
- if(!objEquals(value1, value2, numConvert)){
- return false;
- }
- }
-
- return true;
- }
-
- private boolean listEquals(List< Object> list1, List< Object> list2, boolean numConvert){
- if(list1.size() != list2.size()){
- return false;
- }
-
- for (int i = 0; i < list1.size(); i++) {
- if(!objEquals(list1.get(i), list2.get(i), numConvert)){
- return false;
- }
- }
-
- return true;
- }
-}
+package com.geedgenetworks.formats.protobuf;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.shaded.com.google.protobuf.ByteString;
+import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+import com.geedgenetworks.formats.protobuf.SchemaConverters.MessageConverter;
+
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * protoc --descriptor_set_out=proto3_types.desc --java_out=./ proto3_types.proto
+ * protoc --descriptor_set_out=session_record_test.desc session_record_test.proto
+ *
+ */
+public class ProtobufEventSchemaTest {
+
+ public static class InputDatas{
+ Proto3TypesProtos.Proto3Types msg;
+ Proto3TypesProtos.StructMessage subMsg1;
+ Proto3TypesProtos.StructMessage subMsg2;
+ Map<String, Object> map;
+ Map<String, Object> subMap1;
+ Map<String, Object> subMap2;
+ }
+
+ public static InputDatas geneInputDatas(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
+ Map<String, Object> map = new HashMap<>();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
+ Map<String, Object> subMap1 = new HashMap<>();
+ Map<String, Object> subMap2 = new HashMap<>();
+
+ long int64 = random.nextLong(1, Long.MAX_VALUE);
+ msgBuilder.setInt64(int64);
+ map.put("int64", int64);
+
+ int int32 = random.nextInt(1, Integer.MAX_VALUE);
+ msgBuilder.setInt32(int32);
+ map.put("int32", int32);
+
+ String text = "ut8字符串";
+ msgBuilder.setText(text);
+ map.put("text", text);
+
+ byte[] bytes = new byte[]{1, 2, 3, 4, 5};
+ msgBuilder.setBytes(ByteString.copyFrom(bytes));
+ map.put("bytes", bytes);
+
+ int enum_val = 1;
+ msgBuilder.setEnumValValue(enum_val);
+ map.put("enum_val", enum_val);
+
+ // subMsg start
+ long id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder1.setId(id);
+ subMap1.put("id", id);
+
+ String name = "ut8字符串1";
+ subMsgBuilder1.setName(name);
+ subMap1.put("name", name);
+
+ int age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder1.setAge(age);
+ subMap1.put("age", age);
+
+ double score = random.nextDouble(1, Integer.MAX_VALUE);
+ subMsgBuilder1.setScore(score);
+ subMap1.put("score", score);
+
+ long optional_id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder1.setOptionalId(optional_id);
+ subMap1.put("optional_id", optional_id);
+
+ int optional_age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder1.setOptionalAge(optional_age);
+ subMap1.put("optional_age", optional_age);
+
+ id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder2.setId(id);
+ subMap2.put("id", id);
+
+ name = "ut8字符串1";
+ subMsgBuilder2.setName(name);
+ subMap2.put("name", name);
+
+ age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setAge(age);
+ subMap2.put("age", age);
+
+ score = random.nextDouble(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setScore(score);
+ subMap2.put("score", score);
+
+ optional_id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder2.setOptionalId(optional_id);
+ subMap2.put("optional_id", optional_id);
+
+ optional_age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setOptionalAge(optional_age);
+ subMap2.put("optional_age", optional_age);
+
+ Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
+ Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
+ // subMsg end
+
+ msgBuilder.setMessage(subMsg1);
+ map.put("message", subMap1);
+
+ long optional_int64 = random.nextLong(1, Long.MAX_VALUE);
+ msgBuilder.setOptionalInt64(optional_int64);
+ map.put("optional_int64", optional_int64);
+
+ int optional_int32 = random.nextInt(1, Integer.MAX_VALUE);
+ msgBuilder.setOptionalInt32(optional_int32);
+ map.put("optional_int32", optional_int32);
+
+ String optional_text = "ut8字符串";
+ msgBuilder.setOptionalText(optional_text);
+ map.put("optional_text", optional_text);
+
+ byte[] optional_bytes = new byte[]{1, 2, 3, 4, 5};
+ msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
+ map.put("optional_bytes", optional_bytes);
+
+ int optional_enum_val = 1;
+ msgBuilder.setOptionalEnumValValue(optional_enum_val);
+ map.put("optional_enum_val", optional_enum_val);
+
+ msgBuilder.setOptionalMessage(subMsg2);
+ map.put("optional_message", subMap2);
+
+ List<Long> repeated_int64 = Arrays.asList(1L, 3L, 5L);
+ msgBuilder.addAllRepeatedInt64(repeated_int64);
+ map.put("repeated_int64", repeated_int64);
+
+ List<Integer> repeated_int32 = Arrays.asList(1, 3, 5);
+ msgBuilder.addAllRepeatedInt32(repeated_int32);
+ map.put("repeated_int32", repeated_int32);
+
+ msgBuilder.addAllRepeatedMessage(Arrays.asList(subMsg1, subMsg2));
+ map.put("repeated_message", Arrays.asList(subMap1, subMap2));
+
+ InputDatas datas = new InputDatas();
+ datas.msg = msgBuilder.build();
+ datas.subMsg1 = subMsg1;
+ datas.subMsg2 = subMsg2;
+ datas.map = map;
+ datas.subMap1 = subMap1;
+ datas.subMap2 = subMap2;
+ return datas;
+ }
+
+ public static InputDatas geneInputDatasDefaultValue(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
+ Map<String, Object> map = new HashMap<>();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
+ Map<String, Object> subMap1 = new HashMap<>();
+ Map<String, Object> subMap2 = new HashMap<>();
+
+ long int64 = 0;
+ msgBuilder.setInt64(int64);
+ map.put("int64", int64);
+
+ int int32 = 0;
+ msgBuilder.setInt32(int32);
+ map.put("int32", int32);
+
+ String text = "";
+ msgBuilder.setText(text);
+ map.put("text", text);
+
+ byte[] bytes = new byte[]{};
+ msgBuilder.setBytes(ByteString.copyFrom(bytes));
+ map.put("bytes", bytes);
+
+ int enum_val = 0;
+ msgBuilder.setEnumValValue(enum_val);
+ map.put("enum_val", enum_val);
+
+ // subMsg start
+ long id = 0;
+ subMsgBuilder1.setId(id);
+ subMap1.put("id", id);
+
+ String name = "";
+ subMsgBuilder1.setName(name);
+ subMap1.put("name", name);
+
+ int age = 0;
+ subMsgBuilder1.setAge(age);
+ subMap1.put("age", age);
+
+ double score = 0;
+ subMsgBuilder1.setScore(score);
+ subMap1.put("score", score);
+
+ long optional_id = 0;
+ subMsgBuilder1.setOptionalId(optional_id);
+ subMap1.put("optional_id", optional_id);
+
+ int optional_age = 0;
+ /*subMsgBuilder1.setOptionalAge(optional_age);
+ subMap1.put("optional_age", optional_age);*/
+
+ id = 0;
+ subMsgBuilder2.setId(id);
+ subMap2.put("id", id);
+
+ name = "";
+ subMsgBuilder2.setName(name);
+ subMap2.put("name", name);
+
+ age = 0;
+ subMsgBuilder2.setAge(age);
+ subMap2.put("age", age);
+
+ score = 0;
+ subMsgBuilder2.setScore(score);
+ subMap2.put("score", score);
+
+ optional_id = 0;
+ subMsgBuilder2.setOptionalId(optional_id);
+ subMap2.put("optional_id", optional_id);
+
+ optional_age = 0;
+ subMsgBuilder2.setOptionalAge(optional_age);
+ subMap2.put("optional_age", optional_age);
+
+ Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
+ Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
+ // subMsg end
+
+ msgBuilder.setMessage(subMsg1);
+ map.put("message", subMap1);
+
+ long optional_int64 = 0;
+ msgBuilder.setOptionalInt64(optional_int64);
+ map.put("optional_int64", optional_int64);
+
+ int optional_int32 = 0;
+ msgBuilder.setOptionalInt32(optional_int32);
+ map.put("optional_int32", optional_int32);
+
+ String optional_text = "";
+ msgBuilder.setOptionalText(optional_text);
+ map.put("optional_text", optional_text);
+
+ byte[] optional_bytes = new byte[]{};
+ msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
+ map.put("optional_bytes", optional_bytes);
+
+ int optional_enum_val = 0;
+ msgBuilder.setOptionalEnumValValue(optional_enum_val);
+ map.put("optional_enum_val", optional_enum_val);
+
+ msgBuilder.setOptionalMessage(subMsg2);
+ map.put("optional_message", subMap2);
+
+ List<Long> repeated_int64 = Arrays.asList();
+ msgBuilder.addAllRepeatedInt64(repeated_int64);
+ map.put("repeated_int64", repeated_int64);
+
+ List<Integer> repeated_int32 = Arrays.asList();
+ msgBuilder.addAllRepeatedInt32(repeated_int32);
+ map.put("repeated_int32", repeated_int32);
+
+ msgBuilder.addAllRepeatedMessage(Arrays.asList());
+ map.put("repeated_message", Arrays.asList());
+
+ InputDatas datas = new InputDatas();
+ datas.msg = msgBuilder.build();
+ datas.subMsg1 = subMsg1;
+ datas.subMsg2 = subMsg2;
+ datas.map = map;
+ datas.subMap1 = subMap1;
+ datas.subMap2 = subMap2;
+ return datas;
+ }
+
+ public static InputDatas geneInputDatasUsePartialField(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Proto3TypesProtos.Proto3Types.Builder msgBuilder = Proto3TypesProtos.Proto3Types.newBuilder();
+ Map<String, Object> map = new HashMap<>();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder1 = Proto3TypesProtos.StructMessage.newBuilder();
+ Proto3TypesProtos.StructMessage.Builder subMsgBuilder2 = Proto3TypesProtos.StructMessage.newBuilder();
+ Map<String, Object> subMap1 = new HashMap<>();
+ Map<String, Object> subMap2 = new HashMap<>();
+
+ /*long int64 = random.nextLong(1, Long.MAX_VALUE);
+ msgBuilder.setInt64(int64);
+ map.put("int64", int64);*/
+
+ int int32 = random.nextInt(1, Integer.MAX_VALUE);
+ msgBuilder.setInt32(int32);
+ map.put("int32", int32);
+
+ String text = "ut8字符串";
+ msgBuilder.setText(text);
+ map.put("text", text);
+
+ /*byte[] bytes = new byte[]{1, 2, 3, 4, 5};
+ msgBuilder.setBytes(ByteString.copyFrom(bytes));
+ map.put("bytes", bytes);*/
+
+ /*int enum_val = 1;
+ msgBuilder.setEnumValValue(enum_val);
+ map.put("enum_val", enum_val);*/
+
+ // subMsg start
+ long id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder1.setId(id);
+ subMap1.put("id", id);
+
+ String name = "ut8字符串1";
+ /*subMsgBuilder1.setName(name);
+ subMap1.put("name", name);*/
+
+ int age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder1.setAge(age);
+ subMap1.put("age", age);
+
+ double score = random.nextDouble(1, Integer.MAX_VALUE);
+ /*subMsgBuilder1.setScore(score);
+ subMap1.put("score", score);*/
+
+ long optional_id = random.nextLong(1, Long.MAX_VALUE);
+ subMsgBuilder1.setOptionalId(optional_id);
+ subMap1.put("optional_id", optional_id);
+
+ int optional_age = random.nextInt(1, Integer.MAX_VALUE);
+ /*subMsgBuilder1.setOptionalAge(optional_age);
+ subMap1.put("optional_age", optional_age);*/
+
+ id = random.nextLong(1, Long.MAX_VALUE);
+ /*subMsgBuilder2.setId(id);
+ subMap2.put("id", id);*/
+
+ name = "ut8字符串1";
+ subMsgBuilder2.setName(name);
+ subMap2.put("name", name);
+
+ age = random.nextInt(1, Integer.MAX_VALUE);
+ /*subMsgBuilder2.setAge(age);
+ subMap2.put("age", age);*/
+
+ score = random.nextDouble(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setScore(score);
+ subMap2.put("score", score);
+
+ optional_id = random.nextLong(1, Long.MAX_VALUE);
+ /*subMsgBuilder2.setOptionalId(optional_id);
+ subMap2.put("optional_id", optional_id);*/
+
+ optional_age = random.nextInt(1, Integer.MAX_VALUE);
+ subMsgBuilder2.setOptionalAge(optional_age);
+ subMap2.put("optional_age", optional_age);
+
+ Proto3TypesProtos.StructMessage subMsg1 = subMsgBuilder1.build();
+ Proto3TypesProtos.StructMessage subMsg2 = subMsgBuilder2.build();
+ // subMsg end
+
+ /*msgBuilder.setMessage(subMsg1);
+ map.put("message", subMap1);*/
+
+ long optional_int64 = random.nextLong(1, Long.MAX_VALUE);
+ msgBuilder.setOptionalInt64(optional_int64);
+ map.put("optional_int64", optional_int64);
+
+ /*int optional_int32 = random.nextInt(1, Integer.MAX_VALUE);
+ msgBuilder.setOptionalInt32(optional_int32);
+ map.put("optional_int32", optional_int32);*/
+
+ String optional_text = "ut8字符串";
+ msgBuilder.setOptionalText(optional_text);
+ map.put("optional_text", optional_text);
+
+ /*byte[] optional_bytes = new byte[]{1, 2, 3, 4, 5};
+ msgBuilder.setOptionalBytes(ByteString.copyFrom(optional_bytes));
+ map.put("optional_bytes", optional_bytes);*/
+
+ int optional_enum_val = 1;
+ msgBuilder.setOptionalEnumValValue(optional_enum_val);
+ map.put("optional_enum_val", optional_enum_val);
+
+ msgBuilder.setOptionalMessage(subMsg2);
+ map.put("optional_message", subMap2);
+
+ /*List<Long> repeated_int64 = Arrays.asList(1L, 3L, 5L);
+ msgBuilder.addAllRepeatedInt64(repeated_int64);
+ map.put("repeated_int64", repeated_int64);*/
+
+ List<Integer> repeated_int32 = Arrays.asList(1, 3, 5);
+ msgBuilder.addAllRepeatedInt32(repeated_int32);
+ map.put("repeated_int32", repeated_int32);
+
+ msgBuilder.addAllRepeatedMessage(Arrays.asList(subMsg1, subMsg2));
+ map.put("repeated_message", Arrays.asList(subMap1, subMap2));
+
+ InputDatas datas = new InputDatas();
+ datas.msg = msgBuilder.build();
+ datas.subMsg1 = subMsg1;
+ datas.subMsg2 = subMsg2;
+ datas.map = map;
+ datas.subMap1 = subMap1;
+ datas.subMap2 = subMap2;
+ return datas;
+ }
+
+ @Test
+ public void testSerializeAndDeserialize() throws Exception{
+ String path = getClass().getResource("/proto3_types.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
+ InputDatas inputDatas = geneInputDatas();
+
+ byte[] bytesSerByApi = inputDatas.msg.toByteArray();
+
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+ byte[] bytesSer = serializer.serialize(inputDatas.map);
+
+ System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
+ assertArrayEquals(bytesSerByApi, bytesSer);
+
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ Map<String, Object> rstMap = messageConverter.converter(bytesSer);
+
+ assertTrue(objEquals(inputDatas.map, rstMap, false), () -> "\n" + inputDatas.map.toString() + "\n" + rstMap.toString());
+ System.out.println(inputDatas.map.toString());
+ System.out.println(rstMap.toString());
+ System.out.println(JSON.toJSONString(inputDatas.map));
+ System.out.println(JSON.toJSONString(rstMap));
+
+ System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
+ }
+
+ @Test
+ public void testSerializeAndDeserializeDefaultValue() throws Exception{
+ String path = getClass().getResource("/proto3_types.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
+ InputDatas inputDatas = geneInputDatasDefaultValue();
+
+ byte[] bytesSerByApi = inputDatas.msg.toByteArray();
+
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+ byte[] bytesSer = serializer.serialize(inputDatas.map);
+
+ System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
+ assertArrayEquals(bytesSerByApi, bytesSer);
+
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ Map<String, Object> rstMap = messageConverter.converter(bytesSer);
+ messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), true);
+ Map<String, Object> rstMapEmitDefaultValue = messageConverter.converter(bytesSer);
+
+ // A message field is emitted whenever it is not null; arrays only when length > 0; optional fields once a value is set; optional bytes even when length is 0
+ System.out.println(inputDatas.map.toString());
+ System.out.println(rstMap.toString());
+ System.out.println(rstMapEmitDefaultValue.toString());
+ System.out.println(JSON.toJSONString(inputDatas.map));
+ System.out.println(JSON.toJSONString(rstMap));
+ System.out.println(JSON.toJSONString(rstMapEmitDefaultValue));
+
+ System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
+ }
+
+ @Test
+ public void testSerializeAndDeserializeUsePartialField() throws Exception{
+ String path = getClass().getResource("/proto3_types.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"Proto3Types");
+ InputDatas inputDatas = geneInputDatasUsePartialField();
+
+ byte[] bytesSerByApi = inputDatas.msg.toByteArray();
+
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+ byte[] bytesSer = serializer.serialize(inputDatas.map);
+ System.out.println(Base64.getEncoder().encodeToString(bytesSer));
+
+ System.out.println(String.format("built-in ser bytes size:%d\nmy ser bytes size:%d", bytesSerByApi.length, bytesSer.length));
+ assertArrayEquals(bytesSerByApi, bytesSer);
+
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ Map<String, Object> rstMap = messageConverter.converter(bytesSer);
+
+ assertTrue(objEquals(inputDatas.map, rstMap, false), () -> "\n" + inputDatas.map.toString() + "\n" + rstMap.toString());
+ System.out.println(inputDatas.map.toString());
+ System.out.println(rstMap.toString());
+ System.out.println(JSON.toJSONString(inputDatas.map));
+ System.out.println(JSON.toJSONString(rstMap));
+
+ System.out.println(JSON.toJSONString(inputDatas.map).equals(JSON.toJSONString(rstMap)));
+ }
+
+ @Test
+ public void testSerializeAndDeserializeSessionRecord() throws Exception{
+ String path = getClass().getResource("/session_record_test.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"SessionRecord");
+ String json = "{\"recv_time\": 1704350600, \"log_id\": 185826449998479360, \"decoded_as\": \"BASE\", \"session_id\": 290502878495441820, \"start_timestamp_ms\": 1704350566378, \"end_timestamp_ms\": 1704350570816, \"duration_ms\": 4438, \"tcp_handshake_latency_ms\": 1105, \"ingestion_time\": 1704350600, \"processing_time\": 1704350600, \"device_id\": \"21426003\", \"out_link_id\": 65535, \"in_link_id\": 65535, \"device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-9140\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-9140\\\"}]}\", \"data_center\": \"center-xxg-9140\", \"device_group\": \"group-xxg-9140\", \"sled_ip\": \"192.168.40.81\", \"address_type\": 4, \"vsys_id\": 1, \"t_vsys_id\": 1, \"flags\": 24592, \"flags_identify_info\": \"[1,1,2]\", \"statistics_rule_list\": [406583], \"client_ip\": \"192.56.151.80\", \"client_port\": 62241, \"client_os_desc\": \"Windows\", \"client_geolocation\": \"\\u7f8e\\u56fd.Unknown.Unknown..\", \"server_ip\": \"192.56.222.93\", \"server_port\": 14454, \"server_os_desc\": \"Linux\", \"server_geolocation\": \"\\u7f8e\\u56fd.Unknown.Unknown..\", \"ip_protocol\": \"tcp\", \"decoded_path\": \"ETHERNET.IPv4.TCP\", \"sent_pkts\": 4, \"received_pkts\": 5, \"sent_bytes\": 246, \"received_bytes\": 1809, \"tcp_rtt_ms\": 128, \"tcp_client_isn\": 568305009, \"tcp_server_isn\": 4027331180, \"in_src_mac\": \"a2:fa:dc:56:c7:b3\", \"out_src_mac\": \"48:73:97:96:38:20\", \"in_dest_mac\": \"48:73:97:96:38:20\", \"out_dest_mac\": \"a2:fa:dc:56:c7:b3\"}";
+ Map<String, Object> map = JSON.parseObject(json);
+
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+ byte[] bytesSer = serializer.serialize(map);
+ System.out.println(Base64.getEncoder().encodeToString(bytesSer));
+
+ System.out.println(String.format("my ser bytes size:%d", bytesSer.length));
+
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ Map<String, Object> rstMap = messageConverter.converter(bytesSer);
+
+ assertTrue(objEquals(map, rstMap, true), () -> "\n" + JSON.toJSONString(map) + "\n" + JSON.toJSONString(rstMap));
+ System.out.println(map.toString());
+ System.out.println(rstMap.toString());
+ System.out.println(JSON.toJSONString(new TreeMap<>(map)));
+ System.out.println(JSON.toJSONString(new TreeMap<>(rstMap)));
+
+ System.out.println(JSON.toJSONString(new TreeMap<>(map)).equals(JSON.toJSONString(new TreeMap<>(rstMap))));
+ }
+
+
+ public static void main(String[] args) throws Exception{
+ ProtobufEventSchemaTest test = new ProtobufEventSchemaTest();
+ String path = test.getClass().getResource("/session_record_test.desc").getPath();
+ Descriptors.Descriptor descriptor = ProtobufUtils.buildDescriptor(ProtobufUtils.readDescriptorFileContent(path),"SessionRecord");
+ MessageConverter messageConverter = new MessageConverter(descriptor, SchemaConverters.toStructType(descriptor), false);
+ ProtobufSerializer serializer = new ProtobufSerializer(descriptor);
+
+ FileInputStream inputStream = new FileInputStream( test.getClass().getResource("/format_protobuf_test_data.json").getPath());
+ LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+ int count = 0;
+ long jsonBytesTotalSize = 0;
+ long protoBytesTotalSize = 0;
+ long jsonBytesMinSize = Long.MAX_VALUE;
+ long protoBytesMinSize = Long.MAX_VALUE;
+ long jsonBytesMaxSize = 0;
+ long protoBytesMaxSize = 0;
+ long totalFieldCount = 0;
+ long minFieldCount = Long.MAX_VALUE;
+ long maxFieldCount = 0;
+
+ CompressionType[] compressionTypes = new CompressionType[]{
+ CompressionType.NONE, CompressionType.SNAPPY, CompressionType.LZ4, CompressionType.GZIP, CompressionType.ZSTD
+ };
+ long[][] compressionBytesSize = new long[compressionTypes.length][6];
+ for (int i = 0; i < compressionBytesSize.length; i++) {
+ compressionBytesSize[i][0] = 0;
+ compressionBytesSize[i][1] = 0;
+ compressionBytesSize[i][2] = Long.MAX_VALUE;
+ compressionBytesSize[i][3] = Long.MAX_VALUE;
+ compressionBytesSize[i][4] = 0;
+ compressionBytesSize[i][5] = 0;
+ }
+
+ while (lines.hasNext()){
+ String line = lines.next().trim();
+ if(line.isEmpty()){
+ continue;
+ }
+
+ Map<String, Object> map = JSON.parseObject(line);
+ int fieldCount = map.size();
+ byte[] bytesProto = serializer.serialize(map);
+ byte[] bytesJson = JSON.toJSONString(map).getBytes(StandardCharsets.UTF_8);
+ jsonBytesTotalSize += bytesJson.length;
+ protoBytesTotalSize += bytesProto.length;
+ jsonBytesMinSize = Math.min(jsonBytesMinSize, bytesJson.length);
+ protoBytesMinSize = Math.min(protoBytesMinSize, bytesProto.length);
+ jsonBytesMaxSize = Math.max(jsonBytesMaxSize, bytesJson.length);
+ protoBytesMaxSize = Math.max(protoBytesMaxSize, bytesProto.length);
+ totalFieldCount += fieldCount;
+ minFieldCount = Math.min(minFieldCount, fieldCount);
+ maxFieldCount = Math.max(maxFieldCount, fieldCount);
+
+ Map<String, Object> rstMap = messageConverter.converter(bytesProto);
+ Preconditions.checkArgument(test.objEquals(map, rstMap, true), "\n" + JSON.toJSONString(new TreeMap<>(map)) + "\n" + JSON.toJSONString(new TreeMap<>(rstMap)));
+ Preconditions.checkArgument(JSON.toJSONString(new TreeMap<>(map)).equals(JSON.toJSONString(new TreeMap<>(rstMap))), "\n" + JSON.toJSONString(new TreeMap<>(map)) + "\n" + JSON.toJSONString(new TreeMap<>(rstMap)));
+ count++;
+
+ for (int i = 0; i < compressionTypes.length; i++) {
+ CompressionType compressionType = compressionTypes[i];
+ ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(1024 * 16);
+ OutputStream outputStream = compressionType.wrapForOutput(bufferStream, (byte) 2);
+ outputStream.write(bytesJson);
+ outputStream.close();
+ int jsonCompressSize = bufferStream.position();
+
+ bufferStream = new ByteBufferOutputStream(1024 * 16);
+ outputStream = compressionType.wrapForOutput(bufferStream, (byte) 2);
+ outputStream.write(bytesProto);
+ outputStream.close();
+ int protoCompressSize = bufferStream.position();
+
+ compressionBytesSize[i][0] += jsonCompressSize;
+ compressionBytesSize[i][1] += protoCompressSize;
+ compressionBytesSize[i][2] = Math.min(compressionBytesSize[i][2], jsonCompressSize);
+ compressionBytesSize[i][3] = Math.min(compressionBytesSize[i][3], protoCompressSize);
+ compressionBytesSize[i][4] = Math.max(compressionBytesSize[i][4], jsonCompressSize);
+ compressionBytesSize[i][5] = Math.max(compressionBytesSize[i][5], protoCompressSize);
+ }
+
+ }
+ System.out.println(String.format("count:%d, avgFieldCount:%d, minFieldCount:%d, maxFieldCount:%d, jsonBytesAvgSize:%d, protoBytesAvgSize:%d, jsonBytesMinSize:%d, protoBytesMinSize:%d, jsonBytesMaxSize:%d, protoBytesMaxSize:%d",
+ count, totalFieldCount/count, minFieldCount, maxFieldCount, jsonBytesTotalSize/count, protoBytesTotalSize/count,
+ jsonBytesMinSize, protoBytesMinSize, jsonBytesMaxSize, protoBytesMaxSize));
+ for (int i = 0; i < compressionTypes.length; i++) {
+ CompressionType compressionType = compressionTypes[i];
+ System.out.println(String.format("compression(%s): count:%d, jsonBytesAvgSize:%d, protoBytesAvgSize:%d, avgRatio:%.2f, jsonBytesMinSize:%d, protoBytesMinSize:%d, minRatio:%.2f, jsonBytesMaxSize:%d, protoBytesMaxSize:%d, maxRatio:%.2f",
+ compressionType, count, compressionBytesSize[i][0]/count, compressionBytesSize[i][1]/count, (((double)compressionBytesSize[i][1])/count)/(compressionBytesSize[i][0]/count),
+ compressionBytesSize[i][2], compressionBytesSize[i][3], ((double)compressionBytesSize[i][3])/(compressionBytesSize[i][2]),
+ compressionBytesSize[i][4], compressionBytesSize[i][5], ((double)compressionBytesSize[i][5])/(compressionBytesSize[i][4])));
+ }
+ }
+
+ @Test
+ public void testArrayInstance() throws Exception{
+ Object bytes = new byte[]{1, 2, 3, 4, 5};
+ Object ints = new int[]{1, 2, 3, 4, 5};
+
+ System.out.println(bytes.getClass().isArray());
+ System.out.println(bytes instanceof byte[]);
+ System.out.println(bytes instanceof int[]);
+ System.out.println(ints.getClass().isArray());
+ System.out.println(ints instanceof byte[]);
+ System.out.println(ints instanceof int[]);
+ }
+
+ private boolean objEquals(Object value1, Object value2, boolean numConvert){
+ if(value1 == null){
+ if(value1 != value2){
+ return false;
+ }
+ }else if(value2 == null){
+ return false;
+ }else if(value1 instanceof Map){
+ if(!mapEquals((Map<String, Object>) value1, (Map<String, Object>) value2, numConvert)){
+ return false;
+ }
+ }else if(value1 instanceof List){
+ if(!listEquals((List< Object>) value1, (List< Object>) value2, numConvert)){
+ return false;
+ }
+ }else if(value1 instanceof byte[]){
+ if(!Arrays.equals((byte[]) value1, (byte[]) value2)){
+ return false;
+ }
+ }
+ else{
+ if(value1.getClass() != value2.getClass() || !value1.equals(value2)){
+ if(numConvert && value1 instanceof Number && value2 instanceof Number && ((Number) value1).longValue() == ((Number) value2).longValue()){
+
+ }else{
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ private boolean mapEquals(Map<String, Object> map1, Map<String, Object> map2, boolean numConvert){
+ if(map1.size() != map2.size()){
+ return false;
+ }
+
+ for (Map.Entry<String, Object> entry : map1.entrySet()) {
+ Object value1 = entry.getValue();
+ Object value2 = map2.get(entry.getKey());
+ if(!objEquals(value1, value2, numConvert)){
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private boolean listEquals(List< Object> list1, List< Object> list2, boolean numConvert){
+ if(list1.size() != list2.size()){
+ return false;
+ }
+
+ for (int i = 0; i < list1.size(); i++) {
+ if(!objEquals(list1.get(i), list2.get(i), numConvert)){
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
index c5d6320..95941e4 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
@@ -1,10 +1,10 @@
package com.geedgenetworks.formats.protobuf;

import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.table.connector.SinkProvider;
-import com.geedgenetworks.spi.table.connector.SinkTableFactory;
-import com.geedgenetworks.spi.table.connector.SourceProvider;
-import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.sink.SinkProvider;
+import com.geedgenetworks.spi.sink.SinkTableFactory;
+import com.geedgenetworks.spi.source.SourceProvider;
+import com.geedgenetworks.spi.source.SourceTableFactory;
import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.factory.FactoryUtil;
import com.geedgenetworks.spi.table.factory.TableFactory;
@@ -27,6 +27,7 @@ class ProtobufFormatFactoryTest {
SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");

Map<String, String> options = new HashMap<>();
+ options.put("repeat.count", "3");
options.put("data", Base64.getEncoder().encodeToString(inputDatas.msg.toByteArray()));
options.put("type", "base64");
options.put("format", "protobuf");
diff --git a/groot-formats/format-protobuf/src/test/resources/format_protobuf_test_data.json b/groot-formats/format-protobuf/src/test/resources/format_protobuf_test_data.json
new file mode 100644
index 0000000..51dac53
--- /dev/null
+++ b/groot-formats/format-protobuf/src/test/resources/format_protobuf_test_data.json
@@ -0,0 +1 @@
+{"in_src_mac":"58:b3:8f:fa:3b:11","in_dest_mac":"48:73:97:96:38:27","out_src_mac":"48:73:97:96:38:27","out_dest_mac":"58:b3:8f:fa:3b:11","ip_protocol":"tcp","address_type":4,"client_ip":"192.168.32.110","server_ip":"180.163.210.217","client_port":54570,"server_port":8081,"tcp_client_isn":3530397760,"tcp_server_isn":1741812485,"tcp_rtt_ms":28,"tcp_handshake_latency_ms":28,"direction":"Outbound","in_link_id":29,"out_link_id":29,"start_timestamp_ms":1731167469371,"end_timestamp_ms":1731167474466,"duration_ms":5095,"sent_pkts":6,"sent_bytes":572,"tcp_c2s_ip_fragments":0,"received_pkts":4,"tcp_s2c_ip_fragments":0,"received_bytes":266,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_s2c_o3_pkts":0,"tcp_s2c_lost_bytes":0,"flags":28680,"flags_identify_info":[1,4,1,2],"app_transition":"unknown","decoded_as":"BASE","app_content":"unknown","app":"unknown","decoded_path":"ETHERNET.IPv4.TCP","client_country":"Private Network","server_country":"CN","app_category":"networking","server_asn":4812,"c2s_ttl":127,"s2c_ttl":47,"t_vsys_id":1,"vsys_id":1,"statistics_rule_list":[7731,7689,7532,7531,7372],"session_id":290530145510806375,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"XXG-TSG-BJ","device_group":"XXG-TSG-BJ","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"XXG-TSG-BJ\"},{\"tag\":\"device_group\",\"value\":\"XXG-TSG-BJ\"}]}","device_id":"9800165603191146","sled_ip":"192.168.40.62","dup_traffic_flag":0,"sc_rule_list":[4303],"sc_rsp_raw":[2002],"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"48:73:97:96:38:27\",\"c2s_destination_mac\":\"58:b3:8f:fa:3b:11\",\"s2c_source_mac\":\"58:b3:8f:fa:3b:11\",\"s2c_destination_mac\":\"48:73:97:96:38:27\"}]","client_ip_tags":["Country Code:Private Network"],"server_ip_tags":["Country:China","ASN:4812","Country Code:CN"]}
\ No newline at end of file
diff --git a/groot-formats/format-raw/pom.xml b/groot-formats/format-raw/pom.xml
index 3433e64..11aa4d1 100644
--- a/groot-formats/format-raw/pom.xml
+++ b/groot-formats/format-raw/pom.xml
@@ -13,6 +13,30 @@
<name>Groot : Formats : Format-Raw </name>

<dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>

</project>
\ No newline at end of file
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml
index 78e8e35..1d42523 100644
--- a/groot-formats/pom.xml
+++ b/groot-formats/pom.xml
@@ -21,6 +21,7 @@
</modules>

<dependencies>
+
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>groot-spi</artifactId>
@@ -30,15 +31,11 @@

<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>groot-core</artifactId>
+ <artifactId>groot-common</artifactId>
<version>${revision}</version>
<scope>provided</scope>
</dependency>

- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
- </dependency>

</dependencies>
diff --git a/groot-spi/pom.xml b/groot-spi/pom.xml
index da69f1b..e27a9db 100644
--- a/groot-spi/pom.xml
+++ b/groot-spi/pom.xml
@@ -17,6 +17,7 @@
<groupId>com.geedgenetworks</groupId>
<artifactId>groot-common</artifactId>
<version>${revision}</version>
+ <scope>provided</scope>
</dependency>

diff --git a/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java b/groot-spi/src/main/java/com/geedgenetworks/spi/metrics/InternalMetrics.java
index 0bd3cc2..172072a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/metrics/InternalMetrics.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/metrics/InternalMetrics.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.core.metrics;
+package com.geedgenetworks.spi.metrics;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateConfig.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateConfig.java
index 3c06c8f..1d64d0d 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateConfig.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateConfig.java
@@ -11,7 +11,6 @@
import java.util.List;

@Data
public class AggregateConfig extends ProcessorConfig {
- private List<String> group_by_fields;
private String window_timestamp_field;
private String window_type;
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateProcessor.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateProcessor.java
deleted file mode 100644
index 4281c30..0000000
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/AggregateProcessor.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.geedgenetworks.spi.processor;
-
-public interface AggregateProcessor extends Processor<AggregateConfig> {
-
-}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/Processor.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/Processor.java
index 5d994bb..ad42566 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/Processor.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/Processor.java
@@ -2,18 +2,17 @@ package com.geedgenetworks.spi.processor;

import com.geedgenetworks.spi.table.event.Event;
import com.typesafe.config.Config;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.Serializable;
import java.util.Map;

public interface Processor<T extends ProcessorConfig> extends Serializable {

- DataStream<Event> processorFunction(
- DataStream<Event> singleOutputStreamOperator,
- T processorConfig, ExecutionConfig config) ;
+ DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, T processorConfig) ;

String type();

+ T checkConfig(String name, Map<String, Object> configProperties, Config typeSafeConfig);
}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorConfig.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorConfig.java
index 80f7ca2..e0fb40c 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorConfig.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorConfig.java
@@ -10,8 +10,8 @@ import java.util.Map;
public class ProcessorConfig implements Serializable {
private String type;
private int parallelism;
- private Map<String, Object> properties;
private String name;
private List<String> output_fields;
private List<String> remove_fields;
+ private Map<String, Object> properties;
}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorFactory.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorFactory.java
new file mode 100644
index 0000000..f7f0076
--- /dev/null
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorFactory.java
@@ -0,0 +1,7 @@
+package com.geedgenetworks.spi.processor;
+
+public interface ProcessorFactory {
+ String type();
+ Processor<?> createProcessor();
+
+}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorProvider.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorProvider.java
new file mode 100644
index 0000000..f71a560
--- /dev/null
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProcessorProvider.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.spi.processor;
+
+import java.util.ServiceLoader;
+
+public class ProcessorProvider {
+
+ public static Processor<?> load(String type) {
+ ServiceLoader<ProcessorFactory> loader = ServiceLoader.load(ProcessorFactory.class);
+ for (ProcessorFactory factory : loader) {
+ if (factory.type().equals(type)) {
+ return factory.createProcessor();
+ }
+ }
+ throw new IllegalArgumentException("Processor type not found: " + type);
+ }
+
+}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProjectionProcessor.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProjectionProcessor.java
deleted file mode 100644
index 27643a6..0000000
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/ProjectionProcessor.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.geedgenetworks.spi.processor;
-
-public interface ProjectionProcessor extends Processor<ProjectionConfig> {
-
-}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/TableProcessor.java b/groot-spi/src/main/java/com/geedgenetworks/spi/processor/TableProcessor.java
deleted file mode 100644
index f6f904c..0000000
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/processor/TableProcessor.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.geedgenetworks.spi.processor;
-
-public interface TableProcessor extends Processor<TableConfig> {
-
-}
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkProvider.java b/groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkProvider.java
index 79e4747..0c2f425 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkProvider.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkProvider.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.spi.table.connector;
+package com.geedgenetworks.spi.sink;

import com.geedgenetworks.spi.table.event.Event;
import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkTableFactory.java b/groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkTableFactory.java
index 6f6e440..ecbb72c 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SinkTableFactory.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/sink/SinkTableFactory.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.spi.table.connector;
+package com.geedgenetworks.spi.sink;

import com.geedgenetworks.spi.table.factory.TableFactory;
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceProvider.java b/groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceProvider.java
index c792fbe..f7cf68f 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceProvider.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceProvider.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.spi.table.connector;
+package com.geedgenetworks.spi.source;

import com.geedgenetworks.spi.table.event.Event;
import com.geedgenetworks.spi.table.type.StructType;
diff --git a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceTableFactory.java b/groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceTableFactory.java
index 14ca025..8e56e41 100644
--- a/groot-spi/src/main/java/com/geedgenetworks/spi/table/connector/SourceTableFactory.java
+++ b/groot-spi/src/main/java/com/geedgenetworks/spi/source/SourceTableFactory.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.spi.table.connector;
+package com.geedgenetworks.spi.source;

import com.geedgenetworks.spi.table.factory.TableFactory;
