diff options
| author | 李奉超 <[email protected]> | 2024-07-12 05:39:21 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-07-12 05:39:21 +0000 |
| commit | 321907759e968741690d691f43d1527a2b32fc4b (patch) | |
| tree | 153369594e57042125a0ca6187f7629430fa11e3 | |
| parent | 76548454d29184d9a432bc2621b38a11f1b93999 (diff) | |
| parent | 76b5f89c44bded7da01e27ee97065f9bd4c77848 (diff) | |
Merge branch 'feature/pipeline-type' into 'develop'
Feature/pipeline type
See merge request galaxy/platform/groot-stream!74
23 files changed, 180 insertions, 202 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml index d8d6306..31c2ae2 100644 --- a/config/grootstream.yaml +++ b/config/grootstream.yaml @@ -1,10 +1,10 @@ grootstream: knowledge_base: - name: tsg_ip_asn - fs_type: http - fs_path: http://192.168.44.12:9999/v1/knowledge_base + fs_type: http #hdfs,local + fs_path: http://192.168.44.12:9999/v1/knowledge_base #单机模式hdfs://192.168.44.12:9000/kb,集群模式hdfs://ns1/kb files: - - f9f6bc91-2142-4673-8249-e097c00fe1ea + - f9f6bc91-2142-4673-8249-e097c00fe1ea #文件系统中文件名asn_builtin.mmdb - name: tsg_ip_location fs_type: http fs_path: http://192.168.44.12:9999/v1/knowledge_base @@ -18,7 +18,3 @@ grootstream: scheduler.knowledge_base.update.interval.minutes: 5 hadoop.dfs.namenodes: 192.168.44.12 hadoop.dfs.replication: 1 - #hadoop.dfs.port: 9000 - #hadoop.dfs.user: root - #hadoop.dfs.nameservices: ns1 - #hadoop.dfs.ha.namenodes.ns1: nn1,nn2
\ No newline at end of file diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 52b5001..11ba617 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -8,13 +8,13 @@ sources: filters: filter_operator: - type: com.geedgenetworks.core.filter.AviatorFilter + type: aviator properties: expression: event.server_ip != '12.12.12.12' processing_pipelines: projection_processor: - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + type: projection remove_fields: [http_request_line, http_response_line, http_response_content_type] functions: - function: DROP diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 077fee3..04a6a94 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -1,26 +1,43 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.common.utils.ReflectionUtils; +import com.geedgenetworks.core.filter.Filter; +import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; +import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; - import java.net.URL; import java.net.URLClassLoader; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import java.util.function.BiConsumer; public abstract class AbstractExecutor<K, V> implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> { protected JobRuntimeEnvironment jobRuntimeEnvironment; - protected final Config operatorConfig; - protected final Map<K,V> operatorMap; + protected final Map<String,Filter> filterMap = new HashMap<>(); + protected final Map<String,ProjectionProcessor> projectionProcessorMap = new HashMap<>(); + protected final Map<String, AggregateProcessor> aggregateProcessorMap = new HashMap<>(); protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) { this.operatorConfig = operatorConfig; this.operatorMap = initialize(jarPaths, operatorConfig); + ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class); + for (Filter filter : filters) { + this.filterMap.put(filter.type(), filter); + } + ServiceLoader<ProjectionProcessor> projectionProcessors = ServiceLoader.load(ProjectionProcessor.class); + for (ProjectionProcessor projectionProcessor : projectionProcessors) { + this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor); + } + ServiceLoader<AggregateProcessor> aggregateProcessors = ServiceLoader.load(AggregateProcessor.class); + for (AggregateProcessor aggregateProcessor : aggregateProcessors) { + this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor); + } } @Override diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java index 0897186..506aa11 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java @@ -9,7 +9,7 @@ import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.FilterConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.filter.AviatorFilter; +import com.geedgenetworks.core.filter.Filter; import com.geedgenetworks.core.pojo.FilterConfig; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -21,20 +21,22 @@ import java.util.List; import java.util.Map; /** - * Initialize config and execute filter operator + * Initialize config and execute filter operator */ @Slf4j public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType(); + public FilterExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, FilterConfig> filterConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.FILTERS)) { Config filters = operatorConfig.getConfig(Constants.FILTERS); - filters.root().unwrapped().forEach((key,value) -> { + filters.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(filters.getConfig(key), FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key()); if (!result.isSuccess()) { @@ -55,22 +57,29 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { FilterConfig filterConfig = operatorMap.get(node.getName()); String className = filterConfig.getType(); - Class cls = null; - AviatorFilter aviatorFilter = null; - try { - cls = Class.forName(className); - aviatorFilter = (AviatorFilter) cls.newInstance(); - if (node.getParallelism() > 0) { - filterConfig.setParallelism(node.getParallelism()); - } + Filter filter; + if (filterMap.containsKey(filterConfig.getType())) { + filter = filterMap.get(filterConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + filter = (Filter) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get filter instance failed!", e); + } + } + if (node.getParallelism() > 0) { + filterConfig.setParallelism(node.getParallelism()); + } + try { dataStream = - aviatorFilter.filterFunction( + filter.filterFunction( dataStream, filterConfig); } catch (Exception e) { throw new JobExecuteException("Create filter instance failed!", e); } - return dataStream; } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index 4e45b78..fa5f572 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -2,7 +2,6 @@ package com.geedgenetworks.bootstrap.execution; import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckConfigUtil; @@ -12,7 +11,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.core.processor.projection.ProjectionProcessor; -import com.geedgenetworks.utils.StringUtil; import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -30,12 +28,13 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC public PostprocessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES); - postprocessors.root().unwrapped().forEach((key,value) -> { + postprocessors.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -68,21 +67,29 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - Class cls = null; - ProjectionProcessor projectionProcessor = null; ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); + String className = projectionConfig.getType(); + ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get postprocessing pipeline instance failed!", e); } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { dataStream = projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); } catch (Exception e) { - throw new JobExecuteException("create postprocessing pipeline operator error", e); + throw new JobExecuteException("Create postprocessing pipeline instance failed!", e); } return dataStream; } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index 84e8718..31fcce8 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -26,15 +26,17 @@ import java.util.Map; @Slf4j public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> { private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType(); + public PreprocessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES); - preprocessors.root().unwrapped().forEach((key,value) -> { + preprocessors.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -67,21 +69,29 @@ public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionCo @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - Class cls = null; - ProjectionProcessor projectionProcessor = null; ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); + String className = projectionConfig.getType(); + ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get preprocessing pipeline instance failed!", e); } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { dataStream = projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); } catch (Exception e) { - throw new JobExecuteException("Create preprocessor pipeline instance failed!", e); + throw new JobExecuteException("Create preprocessing pipeline instance failed!", e); } return dataStream; } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index a8798e8..4a3b204 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -18,6 +18,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; import java.util.List; import java.util.Map; + /** * Initialize config and execute processor */ @@ -27,12 +28,13 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi public ProcessingExecutor(List<URL> jarPaths, Config config) { super(jarPaths, config); } + @Override protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) { Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); - processors.root().unwrapped().forEach((key,value) -> { + processors.root().unwrapped().forEach((key, value) -> { CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -66,16 +68,24 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi @Override public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException { - Class cls = null; - ProjectionProcessor projectionProcessor = null; ProjectionConfig projectionConfig = operatorMap.get(node.getName()); - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); + String className = projectionConfig.getType(); + ProjectionProcessor projectionProcessor; + if (projectionProcessorMap.containsKey(projectionConfig.getType())) { + projectionProcessor = projectionProcessorMap.get(projectionConfig.getType()); + } else { + Class cls; + try { + cls = Class.forName(className); + projectionProcessor = (ProjectionProcessor) cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { + throw new JobExecuteException("get processing pipeline instance failed!", e); } + } + if (node.getParallelism() > 0) { + projectionConfig.setParallelism(node.getParallelism()); + } + try { dataStream = projectionProcessor.projectionProcessorFunction( dataStream, projectionConfig); diff --git a/groot-common/pom.xml b/groot-common/pom.xml index f00cfc9..8e1f1c8 100644 --- a/groot-common/pom.xml +++ b/groot-common/pom.xml @@ -86,22 +86,6 @@ <scope>provided</scope> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - </dependency> - </dependencies> <build> diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index 3c7d095..231fac5 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -26,7 +26,6 @@ public final class Constants { public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config"; public static final String SYSPROP_GROOTSTREAM_PREFIX = "props."; - public static final String SYSPROP_GROOTSTREAM_HADOOP_PREFIX = "hadoop."; public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes"; public static final String HAZELCAST_GROOTSTREAM_CONFIG_FILE_PREFIX = "grootstream"; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java deleted file mode 100644 index ccec51b..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java +++ /dev/null @@ -1,107 +0,0 @@ -package com.geedgenetworks.common.utils; - -import com.alibaba.fastjson2.JSONArray; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.shaded.com.google.common.base.Splitter; -import lombok.extern.slf4j.Slf4j; - -import java.io.*; - -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -@Slf4j -public class HdfsUtils { - - private static FileSystem fs; - private static HdfsUtils hdfsInstance; - - private HdfsUtils(Map<String, String> hdfsProp) { - Configuration config = new Configuration(); - String paths = hdfsProp.getOrDefault("dfs.namenodes","192.168.44.12"); - String port = hdfsProp.getOrDefault("dfs.port","9000"); - String name = hdfsProp.getOrDefault("dfs.user","root"); - List<String> hadoopNameNodes = new ArrayList<>(); - String defaultFS = ""; - if (!paths.isEmpty()) { - try { - hadoopNameNodes= Splitter.on(",").trimResults().splitToList(paths); - } catch (RuntimeException e) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of dfs.namenodes is illegal"); - } - } - if (hadoopNameNodes.size() == 1) { - String replication = hdfsProp.getOrDefault("dfs.replication","1"); - defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port; - config.set("fs.defaultFS", defaultFS); - config.set("dfs.replication", replication); - } else if (hadoopNameNodes.size() > 1) { - String replication = hdfsProp.getOrDefault("dfs.replication","2"); - String nameservices = hdfsProp.getOrDefault("dfs.nameservices","ns1"); - String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1","nn1,nn2"); - defaultFS = "hdfs://" + nameservices; - config.set("fs.defaultFS", defaultFS); - config.set("dfs.nameservices", nameservices); - config.set("dfs.ha.namenodes.ns1", ns1); - config.set("dfs.namenode.rpc-address.ns1.nn1", hadoopNameNodes.get(0) + ":" + port); - config.set("dfs.namenode.rpc-address.ns1.nn2", hadoopNameNodes.get(1) + ":" + port); - config.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); - config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs"); - config.set("dfs.replication", replication); - } - try { - fs = FileSystem.get(new URI(defaultFS), config, name); - } catch (IOException | URISyntaxException | InterruptedException e) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error",e); - } - } - - public static HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) { - if (hdfsInstance == null) { - hdfsInstance = new HdfsUtils(hdfsProp); - } - return hdfsInstance; - } - public static HdfsUtils getHdfsUtilInstance() { - if (hdfsInstance == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error"); - } - return hdfsInstance; - } - public byte[] readBytes(Path hdfsPath) { - - try { - if (fs.exists(hdfsPath)) { - try (FSDataInputStream inputStream = fs.open(hdfsPath); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - byte[] buffer = new byte[1024]; - int bytesRead; - while ((bytesRead = inputStream.read(buffer)) != -1) { - outputStream.write(buffer, 0, bytesRead); - } - return outputStream.toByteArray(); - } catch (RuntimeException e) { - String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath); - throw new GrootStreamRuntimeException( - CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e); - } - } - throw new GrootStreamRuntimeException( - CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] is not exists", hdfsPath)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } -}
\ No newline at end of file diff --git a/groot-core/pom.xml b/groot-core/pom.xml index 316bd5d..628670f 100644 --- a/groot-core/pom.xml +++ b/groot-core/pom.xml @@ -91,13 +91,12 @@ <artifactId>flink-table-planner-blink_${scala.version}</artifactId> <scope>provided</scope> </dependency> -<!-- <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-hadoop2-compat</artifactId> - <version>2.4.10</version> - <scope>compile</scope> - </dependency>--> - + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop-2-uber</artifactId> + <version>2.7.5-8.0</version> + <scope>provided</scope> + </dependency> </dependencies> diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java index dae6f94..668ba6f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java @@ -22,4 +22,10 @@ public class AviatorFilter implements Filter { .name(FilterConfig.getName()); } } + + @Override + public String type() { + return "aviator"; + } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java index b2483e8..a173438 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java @@ -10,4 +10,5 @@ public interface Filter { SingleOutputStreamOperator<Event> filterFunction( SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig) throws Exception; + String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java index 70783bd..bbf7d8d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java @@ -11,4 +11,5 @@ public interface AggregateProcessor { SingleOutputStreamOperator<Event> singleOutputStreamOperator, AggregateConfig aggregateConfig) throws Exception; + String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java index f7e6dab..f6fa9d5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java @@ -28,4 +28,9 @@ public class AggregateProcessorImpl implements AggregateProcessor { }*/ return singleOutputStreamOperator; } + + @Override + public String type() { + return "aggregate"; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 8b7e8d1..f492767 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -8,7 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.UDFContext; import com.geedgenetworks.common.utils.ColumnUtil; import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.utils.HdfsUtils; +import com.geedgenetworks.core.utils.HdfsUtils; import com.geedgenetworks.core.metrics.InternalMetrics; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.common.udf.UDF; @@ -21,7 +21,6 @@ import com.googlecode.aviator.Options; import com.googlecode.aviator.exception.ExpressionRuntimeException; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; @@ -56,16 +55,6 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault(Constants.SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME, "5"))); - - for (Map.Entry<String, String> entry : commonConfig.getPropertiesConfig().entrySet()) { - Map<String, String> hdfsProp = new HashMap<>(); - if (entry.getKey().startsWith(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX)) { - hdfsProp.put(entry.getKey().replace(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX, ""), entry.getValue()); - } - if(!hdfsProp.isEmpty()){ - HdfsUtils.initHdfsUtilInstance(hdfsProp); - } - } for (UDFContext udfContext : udfContexts) { Expression filterExpression = null; UdfEntity udfEntity = new UdfEntity(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java index e97ab98..862ba5e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java @@ -11,4 +11,5 @@ public interface ProjectionProcessor { SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig) throws Exception; + String type(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java index 5f6d0fe..79b0e0d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java @@ -11,7 +11,7 @@ public class ProjectionProcessorImpl implements ProjectionProcessor { public SingleOutputStreamOperator<Event> projectionProcessorFunction( SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator, ProjectionConfig projectionConfig) - throws Exception { + throws Exception{ if (projectionConfig.getParallelism() != 0) { return grootEventSingleOutputStreamOperator @@ -24,4 +24,9 @@ public class ProjectionProcessorImpl implements ProjectionProcessor { .name(projectionConfig.getName()); } } + + @Override + public String type() { + return "projection"; + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java index 9a4509b..113e164 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java @@ -6,7 +6,7 @@ import com.alibaba.fastjson2.JSONArray; import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.utils.HdfsUtils; +import com.geedgenetworks.core.utils.HdfsUtils; import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta; import com.geedgenetworks.core.utils.HttpClientPoolUtil; import lombok.extern.slf4j.Slf4j; @@ -72,7 +72,7 @@ public abstract class AbstractKnowledgeBaseHandler { switch (fileSystemType) { case "hdfs": - return HdfsUtils.getHdfsUtilInstance().readBytes(new org.apache.hadoop.fs.Path(filePath, fileName)); + return HdfsUtils.readBytes(new org.apache.hadoop.fs.Path(filePath, fileName)); case "local": return PathUtil.readBytes(Paths.get(filePath, fileName)); default: diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HdfsUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HdfsUtils.java new file mode 100644 index 0000000..a991ce9 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/HdfsUtils.java @@ -0,0 +1,43 @@ +package com.geedgenetworks.core.utils; + +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import java.io.*; +import org.apache.hadoop.fs.FileSystem; + + + +@Slf4j +public class HdfsUtils { + + public static byte[] readBytes(Path hdfsPath) { + + try { + FileSystem fs = hdfsPath.getFileSystem(new Configuration()); + if (fs.exists(hdfsPath)) { + try (FSDataInputStream inputStream = fs.open(hdfsPath); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + byte[] buffer = new byte[1024]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + return outputStream.toByteArray(); + } catch (RuntimeException e) { + String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath); + throw new GrootStreamRuntimeException( + CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e); + } + } + throw new GrootStreamRuntimeException( + CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] is not exists", hdfsPath)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +}
\ No newline at end of file diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter new file mode 100644 index 0000000..2268533 --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter @@ -0,0 +1 @@ +com.geedgenetworks.core.filter.AviatorFilter
\ No newline at end of file diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor new file mode 100644 index 0000000..426a1a9 --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor @@ -0,0 +1 @@ +com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl
\ No newline at end of file diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor new file mode 100644 index 0000000..ede2c8c --- /dev/null +++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor @@ -0,0 +1 @@ +com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
\ No newline at end of file |
