summary refs log tree commit diff
diff options
context:
space:
mode:
author 李奉超 <[email protected]> 2024-07-12 05:39:21 +0000
committer 李奉超 <[email protected]> 2024-07-12 05:39:21 +0000
commit 321907759e968741690d691f43d1527a2b32fc4b (patch)
tree 153369594e57042125a0ca6187f7629430fa11e3
parent 76548454d29184d9a432bc2621b38a11f1b93999 (diff)
parent 76b5f89c44bded7da01e27ee97065f9bd4c77848 (diff)
Merge branch 'feature/pipeline-type' into 'develop'
Feature/pipeline type
See merge request galaxy/platform/groot-stream!74
-rw-r--r-- config/grootstream.yaml 10
-rw-r--r-- config/grootstream_job_example.yaml 4
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java 23
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java 35
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java 31
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java 30
-rw-r--r-- groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java 28
-rw-r--r-- groot-common/pom.xml 16
-rw-r--r-- groot-common/src/main/java/com/geedgenetworks/common/Constants.java 1
-rw-r--r-- groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java 107
-rw-r--r-- groot-core/pom.xml 13
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java 6
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java 1
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java 1
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java 5
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java 13
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java 1
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java 7
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java 4
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/utils/HdfsUtils.java 43
-rw-r--r-- groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter 1
-rw-r--r-- groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor 1
-rw-r--r-- groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor 1
23 files changed, 180 insertions, 202 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index d8d6306..31c2ae2 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -1,10 +1,10 @@
grootstream:
knowledge_base:
- name: tsg_ip_asn
- fs_type: http
- fs_path: http://192.168.44.12:9999/v1/knowledge_base
+ fs_type: http #hdfs,local
+ fs_path: http://192.168.44.12:9999/v1/knowledge_base #单机模式hdfs://192.168.44.12:9000/kb,集群模式hdfs://ns1/kb
files:
- - f9f6bc91-2142-4673-8249-e097c00fe1ea
+ - f9f6bc91-2142-4673-8249-e097c00fe1ea #文件系统中文件名asn_builtin.mmdb
- name: tsg_ip_location
fs_type: http
fs_path: http://192.168.44.12:9999/v1/knowledge_base
@@ -18,7 +18,3 @@ grootstream:
scheduler.knowledge_base.update.interval.minutes: 5
hadoop.dfs.namenodes: 192.168.44.12
hadoop.dfs.replication: 1
- #hadoop.dfs.port: 9000
- #hadoop.dfs.user: root
- #hadoop.dfs.nameservices: ns1
- #hadoop.dfs.ha.namenodes.ns1: nn1,nn2 \ No newline at end of file
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 52b5001..11ba617 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -8,13 +8,13 @@ sources:
filters:
filter_operator:
- type: com.geedgenetworks.core.filter.AviatorFilter
+ type: aviator
properties:
expression: event.server_ip != '12.12.12.12'
processing_pipelines:
projection_processor:
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: projection
remove_fields: [http_request_line, http_response_line, http_response_content_type]
functions:
- function: DROP
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 077fee3..04a6a94 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -1,26 +1,43 @@
package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.common.utils.ReflectionUtils;
+import com.geedgenetworks.core.filter.Filter;
+import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
+import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.ServiceLoader;
import java.util.function.BiConsumer;
public abstract class AbstractExecutor<K, V>
implements Executor<SingleOutputStreamOperator, JobRuntimeEnvironment> {
protected JobRuntimeEnvironment jobRuntimeEnvironment;
-
protected final Config operatorConfig;
-
protected final Map<K,V> operatorMap;
+ protected final Map<String,Filter> filterMap = new HashMap<>();
+ protected final Map<String,ProjectionProcessor> projectionProcessorMap = new HashMap<>();
+ protected final Map<String, AggregateProcessor> aggregateProcessorMap = new HashMap<>();
protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
this.operatorConfig = operatorConfig;
this.operatorMap = initialize(jarPaths, operatorConfig);
+ ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
+ for (Filter filter : filters) {
+ this.filterMap.put(filter.type(), filter);
+ }
+ ServiceLoader<ProjectionProcessor> projectionProcessors = ServiceLoader.load(ProjectionProcessor.class);
+ for (ProjectionProcessor projectionProcessor : projectionProcessors) {
+ this.projectionProcessorMap.put(projectionProcessor.type(), projectionProcessor);
+ }
+ ServiceLoader<AggregateProcessor> aggregateProcessors = ServiceLoader.load(AggregateProcessor.class);
+ for (AggregateProcessor aggregateProcessor : aggregateProcessors) {
+ this.aggregateProcessorMap.put(aggregateProcessor.type(), aggregateProcessor);
+ }
}
@Override
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 0897186..506aa11 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -9,7 +9,7 @@ import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.FilterConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.filter.AviatorFilter;
+import com.geedgenetworks.core.filter.Filter;
import com.geedgenetworks.core.pojo.FilterConfig;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -21,20 +21,22 @@ import java.util.List;
import java.util.Map;
/**
- * Initialize config and execute filter operator
+ * Initialize config and execute filter operator
*/
@Slf4j
public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.FILTER.getType();
+
public FilterExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
+
@Override
protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, FilterConfig> filterConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.FILTERS)) {
Config filters = operatorConfig.getConfig(Constants.FILTERS);
- filters.root().unwrapped().forEach((key,value) -> {
+ filters.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(filters.getConfig(key),
FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key());
if (!result.isSuccess()) {
@@ -55,22 +57,29 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
FilterConfig filterConfig = operatorMap.get(node.getName());
String className = filterConfig.getType();
- Class cls = null;
- AviatorFilter aviatorFilter = null;
- try {
- cls = Class.forName(className);
- aviatorFilter = (AviatorFilter) cls.newInstance();
- if (node.getParallelism() > 0) {
- filterConfig.setParallelism(node.getParallelism());
- }
+ Filter filter;
+ if (filterMap.containsKey(filterConfig.getType())) {
+ filter = filterMap.get(filterConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ filter = (Filter) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get filter instance failed!", e);
+ }
+ }
+ if (node.getParallelism() > 0) {
+ filterConfig.setParallelism(node.getParallelism());
+ }
+ try {
dataStream =
- aviatorFilter.filterFunction(
+ filter.filterFunction(
dataStream, filterConfig);
} catch (Exception e) {
throw new JobExecuteException("Create filter instance failed!", e);
}
-
return dataStream;
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index 4e45b78..fa5f572 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -2,7 +2,6 @@ package com.geedgenetworks.bootstrap.execution;
import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckConfigUtil;
@@ -12,7 +11,6 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
-import com.geedgenetworks.utils.StringUtil;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -30,12 +28,13 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC
public PostprocessingExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
+
@Override
protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, ProjectionConfig> postprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
- postprocessors.root().unwrapped().forEach((key,value) -> {
+ postprocessors.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(postprocessors.getConfig(key),
ProjectionConfigOptions.TYPE.key());
if (!result.isSuccess()) {
@@ -68,21 +67,29 @@ public class PostprocessingExecutor extends AbstractExecutor<String, ProjectionC
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- Class cls = null;
- ProjectionProcessor projectionProcessor = null;
ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- try {
- cls = Class.forName(projectionConfig.getType());
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
-
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
+ String className = projectionConfig.getType();
+ ProjectionProcessor projectionProcessor;
+ if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ projectionProcessor = (ProjectionProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get postprocessing pipeline instance failed!", e);
}
+ }
+ if (node.getParallelism() > 0) {
+ projectionConfig.setParallelism(node.getParallelism());
+ }
+ try {
dataStream =
projectionProcessor.projectionProcessorFunction(
dataStream, projectionConfig);
} catch (Exception e) {
- throw new JobExecuteException("create postprocessing pipeline operator error", e);
+ throw new JobExecuteException("Create postprocessing pipeline instance failed!", e);
}
return dataStream;
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index 84e8718..31fcce8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -26,15 +26,17 @@ import java.util.Map;
@Slf4j
public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionConfig> {
private static final String PROCESSOR_TYPE = ProcessorType.PREPROCESSING.getType();
+
public PreprocessingExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
+
@Override
protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, ProjectionConfig> preprocessingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
- preprocessors.root().unwrapped().forEach((key,value) -> {
+ preprocessors.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(preprocessors.getConfig(key),
ProjectionConfigOptions.TYPE.key());
if (!result.isSuccess()) {
@@ -67,21 +69,29 @@ public class PreprocessingExecutor extends AbstractExecutor<String, ProjectionCo
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- Class cls = null;
- ProjectionProcessor projectionProcessor = null;
ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- try {
- cls = Class.forName(projectionConfig.getType());
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
-
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
+ String className = projectionConfig.getType();
+ ProjectionProcessor projectionProcessor;
+ if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ projectionProcessor = (ProjectionProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get preprocessing pipeline instance failed!", e);
}
+ }
+ if (node.getParallelism() > 0) {
+ projectionConfig.setParallelism(node.getParallelism());
+ }
+ try {
dataStream =
projectionProcessor.projectionProcessorFunction(
dataStream, projectionConfig);
} catch (Exception e) {
- throw new JobExecuteException("Create preprocessor pipeline instance failed!", e);
+ throw new JobExecuteException("Create preprocessing pipeline instance failed!", e);
}
return dataStream;
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index a8798e8..4a3b204 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -18,6 +18,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
import java.util.List;
import java.util.Map;
+
/**
* Initialize config and execute processor
*/
@@ -27,12 +28,13 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi
public ProcessingExecutor(List<URL> jarPaths, Config config) {
super(jarPaths, config);
}
+
@Override
protected Map<String, ProjectionConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, ProjectionConfig> processingConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
- processors.root().unwrapped().forEach((key,value) -> {
+ processors.root().unwrapped().forEach((key, value) -> {
CheckResult result = CheckConfigUtil.checkAllExists(processors.getConfig(key),
ProjectionConfigOptions.TYPE.key());
if (!result.isSuccess()) {
@@ -66,16 +68,24 @@ public class ProcessingExecutor extends AbstractExecutor<String, ProjectionConfi
@Override
public SingleOutputStreamOperator execute(SingleOutputStreamOperator dataStream, Node node) throws JobExecuteException {
- Class cls = null;
- ProjectionProcessor projectionProcessor = null;
ProjectionConfig projectionConfig = operatorMap.get(node.getName());
- try {
- cls = Class.forName(projectionConfig.getType());
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
-
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
+ String className = projectionConfig.getType();
+ ProjectionProcessor projectionProcessor;
+ if (projectionProcessorMap.containsKey(projectionConfig.getType())) {
+ projectionProcessor = projectionProcessorMap.get(projectionConfig.getType());
+ } else {
+ Class cls;
+ try {
+ cls = Class.forName(className);
+ projectionProcessor = (ProjectionProcessor) cls.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
+ throw new JobExecuteException("get processing pipeline instance failed!", e);
}
+ }
+ if (node.getParallelism() > 0) {
+ projectionConfig.setParallelism(node.getParallelism());
+ }
+ try {
dataStream =
projectionProcessor.projectionProcessorFunction(
dataStream, projectionConfig);
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index f00cfc9..8e1f1c8 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -86,22 +86,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
-
</dependencies>
<build>
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index 3c7d095..231fac5 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -26,7 +26,6 @@ public final class Constants {
public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
- public static final String SYSPROP_GROOTSTREAM_HADOOP_PREFIX = "hadoop.";
public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes";
public static final String HAZELCAST_GROOTSTREAM_CONFIG_FILE_PREFIX = "grootstream";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
deleted file mode 100644
index ccec51b..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package com.geedgenetworks.common.utils;
-
-import com.alibaba.fastjson2.JSONArray;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.shaded.com.google.common.base.Splitter;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.*;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-@Slf4j
-public class HdfsUtils {
-
- private static FileSystem fs;
- private static HdfsUtils hdfsInstance;
-
- private HdfsUtils(Map<String, String> hdfsProp) {
- Configuration config = new Configuration();
- String paths = hdfsProp.getOrDefault("dfs.namenodes","192.168.44.12");
- String port = hdfsProp.getOrDefault("dfs.port","9000");
- String name = hdfsProp.getOrDefault("dfs.user","root");
- List<String> hadoopNameNodes = new ArrayList<>();
- String defaultFS = "";
- if (!paths.isEmpty()) {
- try {
- hadoopNameNodes= Splitter.on(",").trimResults().splitToList(paths);
- } catch (RuntimeException e) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of dfs.namenodes is illegal");
- }
- }
- if (hadoopNameNodes.size() == 1) {
- String replication = hdfsProp.getOrDefault("dfs.replication","1");
- defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
- config.set("fs.defaultFS", defaultFS);
- config.set("dfs.replication", replication);
- } else if (hadoopNameNodes.size() > 1) {
- String replication = hdfsProp.getOrDefault("dfs.replication","2");
- String nameservices = hdfsProp.getOrDefault("dfs.nameservices","ns1");
- String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1","nn1,nn2");
- defaultFS = "hdfs://" + nameservices;
- config.set("fs.defaultFS", defaultFS);
- config.set("dfs.nameservices", nameservices);
- config.set("dfs.ha.namenodes.ns1", ns1);
- config.set("dfs.namenode.rpc-address.ns1.nn1", hadoopNameNodes.get(0) + ":" + port);
- config.set("dfs.namenode.rpc-address.ns1.nn2", hadoopNameNodes.get(1) + ":" + port);
- config.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
- config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
- config.set("dfs.replication", replication);
- }
- try {
- fs = FileSystem.get(new URI(defaultFS), config, name);
- } catch (IOException | URISyntaxException | InterruptedException e) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error",e);
- }
- }
-
- public static HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
- if (hdfsInstance == null) {
- hdfsInstance = new HdfsUtils(hdfsProp);
- }
- return hdfsInstance;
- }
- public static HdfsUtils getHdfsUtilInstance() {
- if (hdfsInstance == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error");
- }
- return hdfsInstance;
- }
- public byte[] readBytes(Path hdfsPath) {
-
- try {
- if (fs.exists(hdfsPath)) {
- try (FSDataInputStream inputStream = fs.open(hdfsPath);
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
- byte[] buffer = new byte[1024];
- int bytesRead;
- while ((bytesRead = inputStream.read(buffer)) != -1) {
- outputStream.write(buffer, 0, bytesRead);
- }
- return outputStream.toByteArray();
- } catch (RuntimeException e) {
- String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath);
- throw new GrootStreamRuntimeException(
- CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e);
- }
- }
- throw new GrootStreamRuntimeException(
- CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] is not exists", hdfsPath));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-} \ No newline at end of file
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index 316bd5d..628670f 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -91,13 +91,12 @@
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<scope>provided</scope>
</dependency>
-<!-- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-hadoop2-compat</artifactId>
- <version>2.4.10</version>
- <scope>compile</scope>
- </dependency>-->
-
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2-uber</artifactId>
+ <version>2.7.5-8.0</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
index dae6f94..668ba6f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/AviatorFilter.java
@@ -22,4 +22,10 @@ public class AviatorFilter implements Filter {
.name(FilterConfig.getName());
}
}
+
+ @Override
+ public String type() {
+ return "aviator";
+ }
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
index b2483e8..a173438 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/filter/Filter.java
@@ -10,4 +10,5 @@ public interface Filter {
SingleOutputStreamOperator<Event> filterFunction(
SingleOutputStreamOperator<Event> singleOutputStreamOperator, FilterConfig FilterConfig)
throws Exception;
+ String type();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
index 70783bd..bbf7d8d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessor.java
@@ -11,4 +11,5 @@ public interface AggregateProcessor {
SingleOutputStreamOperator<Event> singleOutputStreamOperator,
AggregateConfig aggregateConfig)
throws Exception;
+ String type();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
index f7e6dab..f6fa9d5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorImpl.java
@@ -28,4 +28,9 @@ public class AggregateProcessorImpl implements AggregateProcessor {
}*/
return singleOutputStreamOperator;
}
+
+ @Override
+ public String type() {
+ return "aggregate";
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index 8b7e8d1..f492767 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -8,7 +8,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.common.utils.ColumnUtil;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.utils.HdfsUtils;
+import com.geedgenetworks.core.utils.HdfsUtils;
import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.common.udf.UDF;
@@ -21,7 +21,6 @@ import com.googlecode.aviator.Options;
import com.googlecode.aviator.exception.ExpressionRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
@@ -56,16 +55,6 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault(Constants.SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME, "5")));
-
- for (Map.Entry<String, String> entry : commonConfig.getPropertiesConfig().entrySet()) {
- Map<String, String> hdfsProp = new HashMap<>();
- if (entry.getKey().startsWith(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX)) {
- hdfsProp.put(entry.getKey().replace(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX, ""), entry.getValue());
- }
- if(!hdfsProp.isEmpty()){
- HdfsUtils.initHdfsUtilInstance(hdfsProp);
- }
- }
for (UDFContext udfContext : udfContexts) {
Expression filterExpression = null;
UdfEntity udfEntity = new UdfEntity();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
index e97ab98..862ba5e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessor.java
@@ -11,4 +11,5 @@ public interface ProjectionProcessor {
SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator,
ProjectionConfig projectionConfig)
throws Exception;
+ String type();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
index 5f6d0fe..79b0e0d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessorImpl.java
@@ -11,7 +11,7 @@ public class ProjectionProcessorImpl implements ProjectionProcessor {
public SingleOutputStreamOperator<Event> projectionProcessorFunction(
SingleOutputStreamOperator<Event> grootEventSingleOutputStreamOperator,
ProjectionConfig projectionConfig)
- throws Exception {
+ throws Exception{
if (projectionConfig.getParallelism() != 0) {
return grootEventSingleOutputStreamOperator
@@ -24,4 +24,9 @@ public class ProjectionProcessorImpl implements ProjectionProcessor {
.name(projectionConfig.getName());
}
}
+
+ @Override
+ public String type() { // type identifier reported by this ProjectionProcessor implementation
+ return "projection"; // fixed literal; NOTE(review): presumably matched against the configured processor type - confirm against the caller
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
index 9a4509b..113e164 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
@@ -6,7 +6,7 @@ import com.alibaba.fastjson2.JSONArray;
import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.utils.HdfsUtils;
+import com.geedgenetworks.core.utils.HdfsUtils;
import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta;
import com.geedgenetworks.core.utils.HttpClientPoolUtil;
import lombok.extern.slf4j.Slf4j;
@@ -72,7 +72,7 @@ public abstract class AbstractKnowledgeBaseHandler {
switch (fileSystemType) {
case "hdfs":
- return HdfsUtils.getHdfsUtilInstance().readBytes(new org.apache.hadoop.fs.Path(filePath, fileName));
+ return HdfsUtils.readBytes(new org.apache.hadoop.fs.Path(filePath, fileName));
case "local":
return PathUtil.readBytes(Paths.get(filePath, fileName));
default:
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HdfsUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HdfsUtils.java
new file mode 100644
index 0000000..a991ce9
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/HdfsUtils.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.core.utils;
+
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import java.io.*;
+import org.apache.hadoop.fs.FileSystem;
+
+
+
+@Slf4j
+public class HdfsUtils {
+
+ public static byte[] readBytes(Path hdfsPath) {
+
+ try {
+ FileSystem fs = hdfsPath.getFileSystem(new Configuration());
+ if (fs.exists(hdfsPath)) {
+ try (FSDataInputStream inputStream = fs.open(hdfsPath);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ byte[] buffer = new byte[1024];
+ int bytesRead;
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+ return outputStream.toByteArray();
+ } catch (RuntimeException e) {
+ String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath);
+ throw new GrootStreamRuntimeException(
+ CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e);
+ }
+ }
+ throw new GrootStreamRuntimeException(
+ CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] is not exists", hdfsPath));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter
new file mode 100644
index 0000000..2268533
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.filter.Filter
@@ -0,0 +1 @@
+com.geedgenetworks.core.filter.AviatorFilter \ No newline at end of file
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor
new file mode 100644
index 0000000..426a1a9
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.aggregate.AggregateProcessor
@@ -0,0 +1 @@
+com.geedgenetworks.core.processor.aggregate.AggregateProcessorImpl \ No newline at end of file
diff --git a/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor
new file mode 100644
index 0000000..ede2c8c
--- /dev/null
+++ b/groot-core/src/main/resources/META-INF/services/com.geedgenetworks.core.processor.projection.ProjectionProcessor
@@ -0,0 +1 @@
+com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl \ No newline at end of file