summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config/grootstream_job_template.yaml6
-rw-r--r--config/udf.plugins4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java6
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ParameterSplitter.java13
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java2
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java138
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java101
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigLocator.java)4
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java64
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java2
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/ConsulConfig.java24
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java36
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java7
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java28
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigSections.java3
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamDomConfigProcessor.java235
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/GrootYamlParser.java117
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/HdfsConfig.java10
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/HosConfig.java18
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/HttpConPoolConfig.java23
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java34
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeConfig.java15
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/NacosConfig.java34
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java206
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/SnowflakeConfig.java10
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/StorageConfig.java21
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/TypesafeConfigUtils.java2
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/ZookeeperConfig.java15
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientUtils.java31
-rw-r--r--groot-common/src/main/resources/groot.yaml74
-rw-r--r--groot-common/src/main/resources/udf.plugins4
-rw-r--r--groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java16
-rw-r--r--groot-common/src/test/resources/grootstream.yaml77
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java2
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java7
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java6
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/sink/KafkaSink.java66
-rw-r--r--groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java102
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestamp.java)17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java18
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java52
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java102
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java20
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java12
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java139
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java3
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java2
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java5
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java16
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java (renamed from groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java)11
-rw-r--r--groot-example/src/main/resources/examples/inline-to-console-job.yaml11
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java114
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java8
65 files changed, 617 insertions, 1593 deletions
diff --git a/config/grootstream_job_template.yaml b/config/grootstream_job_template.yaml
index 3b798b2..c5bc99b 100644
--- a/config/grootstream_job_template.yaml
+++ b/config/grootstream_job_template.yaml
@@ -29,7 +29,6 @@ sources:
inline_source:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
-
# watermark_timestamp: recv_time
# watermark_timestamp_unit: s
# watermark_lag: 60
@@ -85,6 +84,7 @@ preprocessing_pipelines:
lookup_fields: [ '' ]
output_fields: [ '' ]
filter: event.common_schema_type == 'BASE'
+
processing_pipelines:
session_record_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
@@ -129,7 +129,7 @@ processing_pipelines:
parameters:
param: $.tags[?(@.tag=='device_group')][0].value
- - function: UNIX_TIMESTAMP_FUNCTION
+ - function: CURRENT_UNIX_TIMESTAMP
output_fields: [ processing_time ]
parameters:
precision: seconds
@@ -184,7 +184,6 @@ processing_pipelines:
parameters:
path: [ props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file ]
-
sinks:
kafka_sink_a:
type: kafka
@@ -247,4 +246,3 @@ application: # [object] Application Configuration
- name: kafka_sink_b
parallelism: 1
downstream: []
-
diff --git a/config/udf.plugins b/config/udf.plugins
index d22c057..07867f6 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -3,9 +3,9 @@ com.geedgenetworks.core.udf.Drop
com.geedgenetworks.core.udf.AsnLookup
com.geedgenetworks.core.udf.Eval
com.geedgenetworks.core.udf.JsonExtract
-com.geedgenetworks.core.udf.UnixTimestamp
+com.geedgenetworks.core.udf.CurrentUnixTimestamp
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.DecodeBase64
com.geedgenetworks.core.udf.GeoIpLookup
com.geedgenetworks.core.udf.PathCombine
-com.geedgenetworks.core.udf.UnixTimestampConverter
+com.geedgenetworks.core.udf.UnixTimestampConverter \ No newline at end of file
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
index ae904a7..3fb674d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
@@ -28,12 +28,12 @@ public abstract class CommandArgs {
@Parameter(
names = {"--check"},
- description = "check config")
+ description = "Whether to check the config")
protected boolean checkConfig = false;
-
@Parameter(names = {"-i", "--variable"},
splitter = ParameterSplitter.class,
- description = "user-defined parameters , such as -i data_center=bj")
+ description = "user-defined parameters, such as -i data_center=bj. " +
+ "We use ',' as the separator; inside \"\", ',' is treated as a normal character instead of a delimiter.")
protected List<String> variables = Collections.emptyList();
@Parameter(names = {"-n", "--name"},
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ParameterSplitter.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ParameterSplitter.java
index 55e507b..5c5bb9e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ParameterSplitter.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ParameterSplitter.java
@@ -1,31 +1,32 @@
+
package com.geedgenetworks.bootstrap.command;
import com.beust.jcommander.converters.IParameterSplitter;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
public class ParameterSplitter implements IParameterSplitter {
+
@Override
public List<String> split(String value) {
- if (!value.contains(",")) {
- return Collections.singletonList(value);
- }
-
List<String> result = new ArrayList<>();
StringBuilder currentToken = new StringBuilder();
boolean insideBrackets = false;
+ boolean insideQuotes = false;
for (char c : value.toCharArray()) {
+
if (c == '[') {
insideBrackets = true;
} else if (c == ']') {
insideBrackets = false;
+ } else if (c == '"') {
+ insideQuotes = !insideQuotes;
}
- if (c == ',' && !insideBrackets) {
+ if (c == ',' && !insideQuotes && !insideBrackets) {
result.add(currentToken.toString().trim());
currentToken = new StringBuilder();
} else {
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index e03181b..ba0d1a8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -114,7 +114,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
}
configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig()));
- configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getEngineConfig()));
+ configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig()));
environment.getConfig().enableObjectReuse();
environment.getConfig().setGlobalJobParameters(configuration);
setTimeCharacteristic();
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
index f0edbd2..4fdf0c6 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
@@ -1,141 +1,29 @@
package com.geedgenetworks.common.config;
-import org.yaml.snakeyaml.Yaml;
+import lombok.Data;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
-import static com.geedgenetworks.common.config.GrootYamlParser.getClassReflect;
-import static com.geedgenetworks.common.config.GrootYamlParser.loadYaml;
+import static com.google.common.base.Preconditions.checkNotNull;
-/** Created by wk on 2021/1/6. */
-public class CommonConfig {
- public static Map<String, Object> yamlData;
- public static List<String> udfPluginData = new ArrayList<>();
+/**
+ * Describe the common config of groot stream (grootstream.yaml).
+ */
+@Data
+public class CommonConfig implements Serializable {
- static {
- try {
- Yaml yaml = new Yaml();
- yamlData =
- yaml.load(
- GrootYamlParser.class
- .getClassLoader()
- .getResourceAsStream("groot.yaml"));
+ private List<KnowledgeBaseConfig> knowledgeBaseConfig = CommonConfigOptions.KNOWLEDGE_BASE.defaultValue();
+ private Map<String,String> propertiesConfig = CommonConfigOptions.PROPERTIES.defaultValue();
- InputStream inputStreamUdf = GrootYamlParser.class.getClassLoader().getResourceAsStream("groot-platform-plugin");
- if (inputStreamUdf != null) {
- BufferedReader br = new BufferedReader(new InputStreamReader(inputStreamUdf));
- String line;
- while ((line = br.readLine()) != null) {
- // 解析每行并添加到列表
- udfPluginData.add(line);
- }
- br.close();
- } else {
- // FIXME
- System.out.println("udf-plugin配置文件不存在");
-
- }
- } catch (Exception e) {
- // FIXME
- e.printStackTrace();
- }
+ public void setKnowledgeBaseConfig(List<KnowledgeBaseConfig> knowledgeBaseConfig) {
+ checkNotNull(knowledgeBaseConfig, CommonConfigOptions.KNOWLEDGE_BASE + " knowledgeBaseConfig should not be null");
+ this.knowledgeBaseConfig = knowledgeBaseConfig;
}
- public static final Map<String, String> UDF_CLASS_REFLECT = getClassReflect(udfPluginData);
-
- public static final String NACOS_SERVER_ADDR =
- GrootYamlParser.getStringProperty("nacos.server_addr", yamlData);
- public static final String NACOS_USERNAME =
- GrootYamlParser.getStringProperty("nacos.username", yamlData);
- public static final String NACOS_PASSWORD =
- GrootYamlParser.getStringProperty("nacos.password", yamlData);
- public static final String NACOS_NAMESPACE =
- GrootYamlParser.getStringProperty("nacos.namespace", yamlData);
- public static final String NACOS_DATA_ID =
- GrootYamlParser.getStringProperty("nacos.data_id", yamlData);
- public static final String NACOS_GROUP =
- GrootYamlParser.getStringProperty("nacos.group", yamlData);
- public static final long NACOS_READ_TIMEOUT =
- GrootYamlParser.getIntProperty("nacos.read_timeout", yamlData);
-
- public static final String CONSUL_SERVER_ADDR =
- GrootYamlParser.getStringProperty("consul.server_addr", yamlData);
- public static final int CONSUL_SERVER_PORT =
- GrootYamlParser.getIntProperty("consul.server_port", yamlData);
- public static final String CONSUL_TOKEN =
- GrootYamlParser.getStringProperty("consul.token", yamlData);
-
- public static final String HOS_TOKEN = GrootYamlParser.getStringProperty("hos.token", yamlData);
- public static final int KNOWLEDGE_FILE_CHECK_NUMBER =
- GrootYamlParser.getIntProperty("knowledge.file_check_number", yamlData);
- public static final String ID_ARRAY =
- GrootYamlParser.getStringProperty("knowledge.id_array", yamlData);
- public static final int HTTP_POOL_MAX_CONNECTION =
- GrootYamlParser.getIntProperty("http.pool.max_connection", yamlData);
- public static final int HTTP_POOL_MAX_PER_ROUTE =
- GrootYamlParser.getIntProperty("http.pool.max_per_route", yamlData);
- public static final int HTTP_POOL_REQUEST_TIMEOUT =
- GrootYamlParser.getIntProperty("http.pool.request_timeout", yamlData);
- public static final int HTTP_POOL_CONNECT_TIMEOUT =
- GrootYamlParser.getIntProperty("http.pool.connect_timeout", yamlData);
- public static final int HTTP_POOL_RESPONSE_TIMEOUT =
- GrootYamlParser.getIntProperty("http.pool.response_timeout", yamlData);
- public static final String GTPC_FAMILY_NAME = "gtp";
- public static final String RADIUS_FAMILY_NAME = "radius";
- public static final String COMMON_FAMILY_NAME = "common";
- public static final String DEFAULT_RELATIONSHIP_MODULE = "vsys";
- public static final String ZOOKEEPER_QUORUM =
- GrootYamlParser.getStringProperty("zookeeper.quorum", yamlData);
- public static final long DATA_CENTER_ID_NUM =
- GrootYamlParser.getIntProperty("snowid.data_center_id_num", yamlData);
- public static final String KNOWLEDGEBASE_FILE_STORAGE_PATH =
- GrootYamlParser.getStringProperty("knowledgebase.file_storage_path", yamlData);
- public static final String KNOWLEDGEBASE_FILE_STORAGE_TYPE =
- GrootYamlParser.getStringProperty("knowledgebase.file_storage_type", yamlData);
- public static final String KNOWLEDGEBASE_TYPE_LIST =
- GrootYamlParser.getStringProperty("knowledgebase.type_list", yamlData);
- public static final String KNOWLEDGEBASE_NAME_LIST =
- GrootYamlParser.getStringProperty("knowledgebase.name_list", yamlData);
- public static final String HDFS_SERVERS =
- GrootYamlParser.getStringProperty("hdfs.servers", yamlData);
- /* public static final String IPV4_USER_DEFINED_ID = GrootYamlParser.getStringProperty( "knowledge.ipv4_user_defined_id");
- public static final String IPV6_USER_DEFINED_ID = GrootYamlParser.getStringProperty( "knowledge.ipv6_user_defined_id");
- public static final String IPV4_BUILT_IN_ID = GrootYamlParser.getStringProperty( "knowledge.ipv4_built_in_id");
- public static final String IPV6_BUILT_IN_ID = GrootYamlParser.getStringProperty( "knowledge.ipv6_built_in_id");
- public static final String ASN_V4_ID = GrootYamlParser.getStringProperty( "knowledge.asn_v4_id");
- public static final String ASN_V6_ID = GrootYamlParser.getStringProperty( "knowledge.asn_v6_id");*/
-
- public static final Map<String, Object> KNOWLEDGE_IPLOOKUP_MAP =
- GrootYamlParser.getMap("knowledge.iplookup", yamlData);
- public static final Map<String, Object> KNOWLEDGE_ASNLOOKUP_MAP =
- GrootYamlParser.getMap("knowledge.asnlookup", yamlData);
-
- public static final Integer HBASE_TICK_TUPLE_FREQ_SECS =
- GrootYamlParser.getIntProperty("hbase.tick_tuple_freq_secs", yamlData);
- public static final Integer HBASE_GTPC_SCAN_MAX_ROWS =
- GrootYamlParser.getIntProperty("hbase.gtpc_scan_max_rows", yamlData);
- public static final Integer HBASE_RADIUS_SCAN_MAX_ROWS =
- GrootYamlParser.getIntProperty("hbase.radius_scan_max_rows", yamlData);
- public static final String HBASE_RADIUS_TABLE_NAME =
- GrootYamlParser.getStringProperty("hbase.radius_table_name", yamlData);
- public static final String HBASE_GTPC_TABLE_NAME =
- GrootYamlParser.getStringProperty("hbase.gtpc_table_name", yamlData);
- public static final String HBASE_RPC_TIMEOUT =
- GrootYamlParser.getStringProperty("hbase.rpc_timeout", yamlData);
- public static final String DATA_RELATIONSHIP_MODEL =
- GrootYamlParser.getStringProperty("data.relationship_model", yamlData);
-
- public static void main(String[] args) {
- System.out.println("");
- }
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
new file mode 100644
index 0000000..6dc9469
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
@@ -0,0 +1,101 @@
+package com.geedgenetworks.common.config;
+
+import com.hazelcast.internal.config.AbstractDomConfigProcessor;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.extern.slf4j.Slf4j;
+import org.w3c.dom.Node;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.hazelcast.internal.config.DomConfigHelper.*;
+
+@Slf4j
+public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
+ private final GrootStreamConfig config;
+ CommonConfigDomProcessor(boolean domLevel3, GrootStreamConfig config) {
+ super(domLevel3);
+ this.config = config;
+ }
+
+ @Override
+ public void buildConfig(Node rootNode) {
+ final CommonConfig commonConfig = config.getCommonConfig();
+ for (Node node : childElements(rootNode)) {
+ String name = cleanNodeName(node);
+ if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) {
+ commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node));
+ } else if (CommonConfigOptions.PROPERTIES.key().equals(name)) {
+ commonConfig.setPropertiesConfig(parsePropertiesConfig(node));
+ } else {
+ log.warn("Unrecognized configuration element: " + name);
+ }
+
+ }
+ }
+
+ private Map<String, String> parsePropertiesConfig(Node properties) {
+
+ Map<String, String> propertiesMap = new HashMap<>();
+ for (Node node : childElements(properties)) {
+ String name = cleanNodeName(node);
+ propertiesMap.put(name,getTextContent(node));
+ }
+ return propertiesMap;
+ }
+
+
+ private List<KnowledgeBaseConfig> parseKnowledgeBaseConfig(Node kbRootNode) {
+
+ List<KnowledgeBaseConfig> knowledgeConfigList = new ArrayList<>();
+ for (Node node : childElements(kbRootNode)) {
+ knowledgeConfigList.add(parseKnowledgeBaseConfigAsObject(node));
+ }
+ return knowledgeConfigList;
+ }
+
+ private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) {
+ KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig();
+ for (Node node : childElements(kbNode)) {
+ String name = cleanNodeName(node);
+
+ if (CommonConfigOptions.KNOWLEDGE_BASE_NAME.key().equals(name)) {
+ knowledgeBaseConfig.setName(getTextContent(node));
+ } else if (CommonConfigOptions.KNOWLEDGE_BASE_TYPE.key().equals(name)) {
+ knowledgeBaseConfig.setType(getTextContent(node));
+ } else if (CommonConfigOptions.KNOWLEDGE_BASE_FILES.key().equals(name)) {
+ knowledgeBaseConfig.setFiles(parseKnowledgeBaseFilesConfig(node));
+ } else if (CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.key().equals(name)) {
+ knowledgeBaseConfig.setProperties(parseKnowledgeBasePropertiesConfig(node));
+ }
+ else{
+ log.warn("Unrecognized configuration element: " + name);
+ }
+ }
+ return knowledgeBaseConfig;
+ }
+
+ private Map<String, String> parseKnowledgeBasePropertiesConfig(Node properties) {
+ Map<String, String> propertiesMap = new HashMap<>();
+ for (Node node : childElements(properties)) {
+ String name = cleanNodeName(node);
+ propertiesMap.put(name,getTextContent(node));
+ }
+ return propertiesMap;
+ }
+
+ private List<String> parseKnowledgeBaseFilesConfig(Node files) {
+
+ List<String> asnLookupConfigList = new ArrayList<>();
+ for (Node node : childElements(files)) {
+ String value = node.getNodeValue();
+ asnLookupConfigList.add(value);
+ }
+ return asnLookupConfigList;
+ }
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigLocator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java
index 3962324..5302cc2 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigLocator.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java
@@ -5,9 +5,9 @@ import com.hazelcast.internal.config.AbstractConfigLocator;
import static com.hazelcast.internal.config.DeclarativeConfigUtil.YAML_ACCEPTED_SUFFIXES;
-public final class GrootStreamConfigLocator extends AbstractConfigLocator {
+public final class CommonConfigLocator extends AbstractConfigLocator {
- public GrootStreamConfigLocator() {
+ public CommonConfigLocator() {
}
@Override
public boolean locateFromSystemProperty() {
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
new file mode 100644
index 0000000..9105afe
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
@@ -0,0 +1,64 @@
+package com.geedgenetworks.common.config;
+
+import com.alibaba.fastjson2.TypeReference;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CommonConfigOptions {
+
+ public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES =
+ Options.key("properties")
+ .mapType()
+ .defaultValue(new HashMap<String,String>())
+ .withDescription("The properties of knowledgebase");
+ public static final Option<String> KNOWLEDGE_BASE_NAME =
+ Options.key("name")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The name of knowledgebase.");
+ public static final Option<String> KNOWLEDGE_BASE_TYPE =
+ Options.key("type")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The type of knowledgebase.");
+
+ public static final Option<List<String>> KNOWLEDGE_BASE_FILES =
+ Options.key("files")
+ .listType()
+ .defaultValue(new ArrayList<String>())
+ .withDescription("The files of knowledgebase.");
+
+
+ public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type")
+ .stringType()
+ .defaultValue("localfile")
+ .withDescription("The fs type of knowledge base storage.");
+
+ public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_DEFAULT_PATH = Options.key("fs_default_path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The default path of knowledge base storage.");
+
+ public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE =
+ Options.key("knowledge_base")
+ .type(new TypeReference<List<KnowledgeBaseConfig>>() {})
+ .noDefaultValue()
+ .withDescription("The knowledge base configuration.");
+
+ public static final Option<Map<String, String>> PROPERTIES =
+ Options.key("properties")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("The general properties of grootstream");
+
+ public static final Option<String> ZOOKEEPER_QUORUM =
+ Options.key("quorum")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The quorum of zookeeper.");
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
index d7f7381..a967ae5 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
@@ -26,7 +26,7 @@ public class ConfigProvider {
}
public static GrootStreamConfig locateAndGetGrootStreamConfig(Properties properties) {
- GrootStreamConfigLocator yamlConfigLocator = new GrootStreamConfigLocator();
+ CommonConfigLocator yamlConfigLocator = new CommonConfigLocator();
GrootStreamConfig config;
validateSuffixInSystemProperty(Constants.SYSPROP_GROOTSTREAM_CONFIG);
if (yamlConfigLocator.locateFromSystemProperty()) {
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConsulConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ConsulConfig.java
deleted file mode 100644
index d8d5b7b..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConsulConfig.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-@Data
-public class ConsulConfig implements Serializable {
- public String serverAddr = ServerConfigOptions.CONSUL_SERVER_ADDR.defaultValue();
- public int serverPort = ServerConfigOptions.CONSUL_SERVER_PORT.defaultValue();
- public String token = ServerConfigOptions.CONSUL_TOKEN.defaultValue();
-
- public void setServerAddr(String serverAddr) {
- checkNotNull(serverAddr, ServerConfigOptions.CONSUL_SERVER_ADDR + "serverAddr should not be null");
- this.serverAddr = serverAddr;
- }
-
- public void setToken(String token) {
- checkNotNull(token, ServerConfigOptions.CONSUL_TOKEN + "token should not be null");
- this.token = token;
- }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
deleted file mode 100644
index a286086..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Describe the common config of groot stream (groot.yaml).
- */
-@Data
-public class EngineConfig implements Serializable {
-
-
- private HttpConPoolConfig httpConPoolConfig = ServerConfigOptions.HTTP_CON_POOL.defaultValue();
- private List<KnowledgeConfig> knowledgeBaseConfig = ServerConfigOptions.KNOWLEDGE_BASE.defaultValue();
- private NacosConfig nacosConfig = ServerConfigOptions.NACOS.defaultValue();
- private ConsulConfig consulConfig = ServerConfigOptions.CONSUL.defaultValue();
- private ZookeeperConfig zookeeperConfig = ServerConfigOptions.ZOOKEEPER.defaultValue();
- private HdfsConfig hdfsConfig = ServerConfigOptions.HDFS.defaultValue();
- private Map<String,String> propertiesConfig = ServerConfigOptions.PROPERTIES.defaultValue();
-
-
- public void setKnowledgeBaseConfig(List<KnowledgeConfig> knowledgeBaseConfig) {
- checkNotNull(knowledgeBaseConfig, ServerConfigOptions.KNOWLEDGE_BASE + "knowledgeConfig should not be null");
- this.knowledgeBaseConfig = knowledgeBaseConfig;
- }
-
-
-
-
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
index 00d8319..189b05b 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
@@ -9,7 +9,7 @@ import java.util.List;
@Slf4j
public class GrootStreamConfig {
- private final EngineConfig engineConfig = new EngineConfig();
+ private final CommonConfig commonConfig = new CommonConfig();
private Config hazelcastConfig;
@@ -34,7 +34,6 @@ public class GrootStreamConfig {
}
-
/**
* Returns the absolute path for `groot-stream.home` based from the system property {@link
* GrootStreamProperties#GROOTSTREAM_HOME}
@@ -53,8 +52,8 @@ public class GrootStreamConfig {
public void setHazelcastConfig(Config hazelcastConfig) {
this.hazelcastConfig = hazelcastConfig;
}
- public EngineConfig getEngineConfig() {
- return engineConfig;
+ public CommonConfig getCommonConfig() {
+ return commonConfig;
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
index ee76cb6..e948e8a 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
@@ -19,13 +19,13 @@ public class GrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
private final InputStream in;
public GrootStreamConfigBuilder() {
- this((GrootStreamConfigLocator) null);
+ this((CommonConfigLocator) null);
}
- public GrootStreamConfigBuilder(GrootStreamConfigLocator locator) {
+ public GrootStreamConfigBuilder(CommonConfigLocator locator) {
if (locator == null) {
- locator = new GrootStreamConfigLocator();
+ locator = new CommonConfigLocator();
locator.locateEverywhere();
}
this.in = locator.getIn();
@@ -46,16 +46,16 @@ public class GrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
public GrootStreamConfig build(GrootStreamConfig config) {
try {
- parseAndBuildConfig(config);
+ parseAndBuildCommonConfig(config);
+ parseAndBuildUDFPluginConfig(config);
+ parseAndBuildHazelcastConfig(config);
} catch (Exception e) {
throw ExceptionUtil.rethrow(e);
}
- config.setHazelcastConfig(ConfigProvider.locateAndGetMemberConfig(getProperties()));
- config.setUDFPluginConfig(ConfigProvider.locateAndGetUDFPluginConfig(getProperties()));
return config;
}
- private void parseAndBuildConfig(GrootStreamConfig config) throws Exception {
+ private void parseAndBuildCommonConfig(GrootStreamConfig config) throws Exception {
YamlMapping yamlRootNode;
try {
yamlRootNode = (YamlMapping) YamlLoader.load(in);
@@ -67,6 +67,7 @@ public class GrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
}
YamlNode grootStreamRoot =
yamlRootNode.childAsMapping(GrootStreamConfigSections.GROOTSTREAM.name);
+
if (grootStreamRoot == null) {
grootStreamRoot = yamlRootNode;
}
@@ -77,8 +78,19 @@ public class GrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
replaceVariables(w3cRootNode);
importDocuments(grootStreamRoot);
- new GrootStreamDomConfigProcessor(true, config).buildConfig(w3cRootNode);
+ new CommonConfigDomProcessor(true, config).buildConfig(w3cRootNode);
+ }
+
+ private void parseAndBuildHazelcastConfig(GrootStreamConfig config){
+ config.setHazelcastConfig(ConfigProvider.locateAndGetMemberConfig(getProperties()));
}
+ private void parseAndBuildUDFPluginConfig(GrootStreamConfig config){
+ config.setUDFPluginConfig(ConfigProvider.locateAndGetUDFPluginConfig(getProperties()));
+ }
+
+
+
+
public GrootStreamConfigBuilder setProperties(Properties properties) {
if (properties == null) {
properties = System.getProperties();
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigSections.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigSections.java
index d0a35b8..d67c97e 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigSections.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigSections.java
@@ -2,8 +2,7 @@ package com.geedgenetworks.common.config;
/** Configuration sections for Hazelcast GrootStream shared by YAML based configurations */
enum GrootStreamConfigSections {
- GROOTSTREAM("grootstream", false),
- ENGINE("engine", false);
+ GROOTSTREAM("grootstream", false);
final String name;
final boolean multipleOccurrence;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamDomConfigProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamDomConfigProcessor.java
deleted file mode 100644
index c3f7a20..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamDomConfigProcessor.java
+++ /dev/null
@@ -1,235 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.hazelcast.internal.config.AbstractDomConfigProcessor;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
-import lombok.extern.slf4j.Slf4j;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Node;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static com.hazelcast.internal.config.DomConfigHelper.*;
-
-@Slf4j
-public class GrootStreamDomConfigProcessor extends AbstractDomConfigProcessor {
- // TODO 日志规范:使用日志门面 slf4j 代替
- private static final ILogger LOGGER = Logger.getLogger(GrootStreamDomConfigProcessor.class);
-
- private final GrootStreamConfig config;
-
- GrootStreamDomConfigProcessor(boolean domLevel3, GrootStreamConfig config) {
- super(domLevel3);
- this.config = config;
- }
-
-/* @Override
- public void buildConfig(Node rootNode) {
- for (Node node : childElements(rootNode)) {
- String nodeName = cleanNodeName(node);
- if (occurrenceSet.contains(nodeName)) {
- throw new InvalidConfigurationException(
- "Duplicate '" + nodeName + "' definition found in the configuration.");
- }
- if (handleNode(node, nodeName)) {
- continue;
- }
- if (!GrootStreamConfigSections.canOccurMultipleTimes(nodeName)) {
- occurrenceSet.add(nodeName);
- }
- }
- }*/
-
-/* private boolean handleNode(Node node, String name) {
- if (GrootStreamConfigSections.GROOTSTREAM.isEqual(name)) {
- parseEngineConfig(node, config);
- } else {
- return true;
- }
- parseEngineConfig(node, config);
-
- return false;
- }*/
-
- public void buildConfig(Node engineNode) {
- final EngineConfig engineConfig = config.getEngineConfig();
- for (Node node : childElements(engineNode)) {
- String name = cleanNodeName(node);
- if (ServerConfigOptions.HTTP_CON_POOL.key().equals(name)) {
- engineConfig.setHttpConPoolConfig(parseHttpConPoolConfig(node));
- } else if (ServerConfigOptions. KNOWLEDGE_BASE.key().equals(name)) {
- engineConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node));
- } else if (ServerConfigOptions.NACOS.key().equals(name)) {
- engineConfig.setNacosConfig(parseNacosConfig(node));
- } else if (ServerConfigOptions.CONSUL.key().equals(name)) {
- engineConfig.setConsulConfig(parseConsulConfig(node));
- } else if (ServerConfigOptions.ZOOKEEPER.key().equals(name)) {
- engineConfig.setZookeeperConfig(parseZookeeperConfig(node));
- } else if (ServerConfigOptions.HDFS.key().equals(name)) {
- engineConfig.setHdfsConfig(parseHdfsConfig(node));
- } else if (ServerConfigOptions.PROPERTIES.key().equals(name)) {
- engineConfig.setPropertiesConfig(parsePropertiesConfig(node));
- } else {
- LOGGER.warning("Unrecognized configuration element: " + name);
-
- }
-
- }
- }
-
- private Map<String, String> parsePropertiesConfig(Node properties) {
-
- Map<String, String> propertiesMap = new HashMap<>();
- for (Node node : childElements(properties)) {
- String name = cleanNodeName(node);
- propertiesMap.put(name,getTextContent(node));
- }
- return propertiesMap;
- }
-
-
- private ZookeeperConfig parseZookeeperConfig(Node zookeeperNode) {
- ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
- for (Node node : childElements(zookeeperNode)) {
- String name = cleanNodeName(node);
- if (ServerConfigOptions.ZOOKEEPER_QUORUM.key().equals(name)) {
- zookeeperConfig.setQuorum(getTextContent(node));
- } else {
- LOGGER.warning("Unrecognized configuration element: " + name);
- }
- }
- return zookeeperConfig;
- }
-
- private HdfsConfig parseHdfsConfig(Node hdfsNode) {
- HdfsConfig hdfsConfig = new HdfsConfig();
- for (Node node : childElements(hdfsNode)) {
- String name = cleanNodeName(node);
- if (ServerConfigOptions.HDFS_SERVERS.key().equals(name)) {
- hdfsConfig.setServers(getTextContent(node));
- } else {
- LOGGER.warning("Unrecognized configuration element: " + name);
- }
- }
- return hdfsConfig;
- }
-
- private ConsulConfig parseConsulConfig(Node consulNode) {
- ConsulConfig consulConfig = new ConsulConfig();
- for (Node node : childElements(consulNode)) {
- String name = cleanNodeName(node);
- if (ServerConfigOptions.CONSUL_SERVER_ADDR.key().equals(name)) {
- consulConfig.setServerAddr(getTextContent(node));
- } else if (ServerConfigOptions.CONSUL_SERVER_PORT.key().equals(name)) {
- consulConfig.setServerPort(getIntegerValue(ServerConfigOptions.CONSUL_SERVER_PORT.key(), getTextContent(node)));
- } else if (ServerConfigOptions.CONSUL_TOKEN.key().equals(name)) {
- consulConfig.setToken(getTextContent(node));
- } else {
- LOGGER.warning("Unrecognized configuration element: " + name);
- }
- }
- return consulConfig;
- }
-
- private NacosConfig parseNacosConfig(Node nacosNode) {
- NacosConfig nacosConfig = new NacosConfig();
- for (Node node : childElements(nacosNode)) {
- String name = cleanNodeName(node);
- if (ServerConfigOptions.NACOS_SERVER_ADDR.key().equals(name)) {
- nacosConfig.setServerAddr(getTextContent(node));
- } else if (ServerConfigOptions.NACOS_NAMESPACE.key().equals(name)) {
- nacosConfig.setNamespace(getTextContent(node));
- } else if (ServerConfigOptions.NACOS_GROUP.key().equals(name)) {
- nacosConfig.setGroup(getTextContent(node));
- } else if (ServerConfigOptions.NACOS_DATA_ID.key().equals(name)) {
- nacosConfig.setDataId(getTextContent(node));
- } else if (ServerConfigOptions.NACOS_READ_TIMEOUT.key().equals(name)) {
- nacosConfig.setReadTimeout(getIntegerValue(ServerConfigOptions.NACOS_READ_TIMEOUT.key(), getTextContent(node)));
- } else if (ServerConfigOptions.NACOS_USERNAME.key().equals(name)) {
- nacosConfig.setUserName(getTextContent(node));
- } else if (ServerConfigOptions.NACOS_PASSWORD.key().equals(name)) {
- nacosConfig.setPassword(getTextContent(node));
- } else {
- LOGGER.warning("Unrecognized configuration element: " + name);
- }
- }
- return nacosConfig;
- }
-
-
- private HttpConPoolConfig parseHttpConPoolConfig(Node httpConPoolNode) {
- HttpConPoolConfig httpConPoolConfig = new HttpConPoolConfig();
- for (Node node : childElements(httpConPoolNode)) {
- String name = cleanNodeName(node);
- if (ServerConfigOptions.HTTP_CON_POOL_MAX_TOTAL.key().equals(name)) {
- httpConPoolConfig.setMaxTotal(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_MAX_TOTAL.key(), getTextContent(node)));
- } else if (ServerConfigOptions.HTTP_CON_POOL_MAX_PER_ROUTE.key().equals(name)) {
- httpConPoolConfig.setMaxPerRoute(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_MAX_PER_ROUTE.key(), getTextContent(node)));
- } else if (ServerConfigOptions.HTTP_CON_POOL_CONNECTION_REQUEST_TIMEOUT.key().equals(name)) {
- httpConPoolConfig.setConnectionRequestTimeout(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_CONNECTION_REQUEST_TIMEOUT.key(), getTextContent(node)));
- } else if (ServerConfigOptions.HTTP_CON_POOL_CONNECT_TIMEOUT.key().equals(name)) {
- httpConPoolConfig.setConnectTimeout(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_CONNECT_TIMEOUT.key(), getTextContent(node)));
- } else if (ServerConfigOptions.HTTP_CON_POOL_SOCKET_TIMEOUT.key().equals(name)) {
- httpConPoolConfig.setSocketTimeout(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_SOCKET_TIMEOUT.key(), getTextContent(node)));
- } else {
- LOGGER.warning("Unrecognized configuration element: " + name);
- }
- }
- return httpConPoolConfig;
-
- }
-
- private List<KnowledgeConfig> parseKnowledgeBaseConfig(Node knowledgeBaseNode) {
-
- List<KnowledgeConfig> knowledgeConfigList = new ArrayList<>();
- for (Node node : childElements(knowledgeBaseNode)) {
- knowledgeConfigList.add(parseKnowledgeConfig(node));
- }
- return knowledgeConfigList;
- }
-
- private KnowledgeConfig parseKnowledgeConfig(Node asnLookupNode) {
- KnowledgeConfig knowledgeConfig = new KnowledgeConfig();
- for (Node node : childElements(asnLookupNode)) {
- String name = cleanNodeName(node);
-
- if (ServerConfigOptions.KNOWLEDGE_BASE_NAME.key().equals(name)) {
- knowledgeConfig.setName(getTextContent(node));
- } else if (ServerConfigOptions.KNOWLEDGE_BASE_TYPE.key().equals(name)) {
- knowledgeConfig.setType(getTextContent(node));
- } else if (ServerConfigOptions.KNOWLEDGE_BASE_FILES.key().equals(name)) {
- knowledgeConfig.setFiles(parseFilesConfig(node));
- } else if (ServerConfigOptions.KNOWLEDGE_BASE_PROPERTIES.key().equals(name)) {
- knowledgeConfig.setProperties(parseKnowledgePropertiesConfig(node));
- }
- else{
- LOGGER.warning("Unrecognized configuration element: " + name);
- }
- }
- return knowledgeConfig;
- }
-
- private Map<String, String> parseKnowledgePropertiesConfig(Node properties) {
- Map<String, String> propertiesMap = new HashMap<>();
- for (Node node : childElements(properties)) {
- String name = cleanNodeName(node);
- propertiesMap.put(name,getTextContent(node));
- }
- return propertiesMap;
- }
-
- private List<String> parseFilesConfig(Node files) {
-
- List<String> asnLookupConfigList = new ArrayList<>();
- for (Node node : childElements(files)) {
- String value = node.getNodeValue();
- asnLookupConfigList.add(value);
- }
- return asnLookupConfigList;
- }
-
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootYamlParser.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootYamlParser.java
deleted file mode 100644
index 8aa9915..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootYamlParser.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import cn.hutool.core.io.IoUtil;
-import cn.hutool.setting.yaml.YamlUtil;
-import lombok.NonNull;
-
-import java.io.InputStream;
-import java.lang.reflect.Method;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public final class GrootYamlParser {
-
- public static String getStringProperty(String key, Map<String, Object> yamlData) {
-
- return getYamlValue(yamlData, key, String.class);
- }
-
- public static Integer getIntProperty(String key, Map<String, Object> yamlData) {
-
- return getYamlValue(yamlData, key, Integer.class);
- }
-
- public static Long getLongProperty(String key, Map<String, Object> yamlData) {
-
- return getYamlValue(yamlData, key, Long.class);
- }
-
- public static <T> T getYamlValue(Map<String, Object> data, String key, Class<T> valueType) {
-
- String[] keys = key.split("\\.");
-
- for (int i = 0; i < keys.length - 1; i++) {
- Object value = data.get(keys[i]);
- // TODO 重复的分支
- if (value == null || !(value instanceof Map)) {
- return null;
- }
- data = (Map<String, Object>) value;
- }
-
- return data.containsKey(keys[keys.length - 1])
- ? valueType.cast(data.get(keys[keys.length - 1]))
- : null;
- }
-
- public static Map<String, String> getClassReflect(List<String> plugins) {
-
- Map<String, String> classReflect = new HashMap<>();
-
- for (String classPath : plugins) {
-
- Class cls = null;
- try {
- cls = Class.forName(classPath);
- Method method = cls.getMethod("functionName");
- Object object = cls.newInstance();
- String result = (String) method.invoke(object);
- classReflect.put(result, classPath);
- System.out.println("Returned Value: " + result);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- return classReflect;
- }
-
- public static Map<String, Object> loadYaml(@NonNull String filePath) {
- return loadYaml(Paths.get(filePath));
- }
- public static Map<String, Object> loadYaml(@NonNull Path filePath) {
- return loadYaml(IoUtil.toStream(filePath.toFile()));
- }
- public static Map<String, Object> loadYaml(InputStream input) {
- return YamlUtil.load(input, Map.class);
- }
-
-
- private static Object getValue(String key, Map<String, Object> data) {
- if (data.containsKey(key)) {
- return data.get(key);
- } else {
- for (Object value : data.values()) {
- if (value instanceof Map) {
- Object result = getValue(key, (Map<String, Object>) value);
- if (result != null) {
- return result;
- }
- }
- }
- }
- return null;
- }
-
- public static List<Object> getList(String key, Map<String, Object> yamlData) {
- Object value = getValue(key, yamlData);
- if (value instanceof List) {
- return (List<Object>) value;
- }
- return null;
- }
-
- public static Map<String, Object> getMap(String key, Map<String, Object> yamlData) {
- Object value = getValue(key, yamlData);
- if (value instanceof Map) {
- return (Map<String, Object>) value;
- }
- return null;
- }
-
- public static void main(String[] args) {
- System.out.println();
- }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/HdfsConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/HdfsConfig.java
deleted file mode 100644
index 1323f54..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/HdfsConfig.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class HdfsConfig implements Serializable {
- private String servers = ServerConfigOptions.HDFS_SERVERS.defaultValue();
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/HosConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/HosConfig.java
deleted file mode 100644
index 903d6cd..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/HosConfig.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-@Data
-public class HosConfig implements Serializable {
- public String token = ServerConfigOptions.HOS_TOKEN.defaultValue();
-
- public void setToken(String token) {
- checkNotNull(token, ServerConfigOptions.HOS_TOKEN + "token should not be null");
- this.token = token;
- }
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/HttpConPoolConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/HttpConPoolConfig.java
deleted file mode 100644
index c0a0e74..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/HttpConPoolConfig.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-@Data
-public class HttpConPoolConfig implements Serializable {
-
- public static final long MIN_TIMEOUT_MS = 10;
- public int maxTotal = ServerConfigOptions.HTTP_CON_POOL_MAX_TOTAL.defaultValue();
- public int maxPerRoute = ServerConfigOptions.HTTP_CON_POOL_MAX_PER_ROUTE.defaultValue();
- public int connectionRequestTimeout = ServerConfigOptions.HTTP_CON_POOL_CONNECTION_REQUEST_TIMEOUT.defaultValue();
- public int connectTimeout = ServerConfigOptions.HTTP_CON_POOL_CONNECT_TIMEOUT.defaultValue();
- public int socketTimeout = ServerConfigOptions.HTTP_CON_POOL_SOCKET_TIMEOUT.defaultValue();
-
- public void setSocketTimeout(int socketTimeout) {
- checkArgument(socketTimeout > MIN_TIMEOUT_MS, ServerConfigOptions.HTTP_CON_POOL_SOCKET_TIMEOUT + "socketTimeout should be greater than " + MIN_TIMEOUT_MS);
- this.socketTimeout = socketTimeout;
- }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java
new file mode 100644
index 0000000..c02df61
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java
@@ -0,0 +1,34 @@
+package com.geedgenetworks.common.config;
+
+import com.geedgenetworks.utils.StringUtil;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Data
+public class KnowledgeBaseConfig implements Serializable {
+ private String name = CommonConfigOptions.KNOWLEDGE_BASE_NAME.defaultValue();
+ private String type = CommonConfigOptions.KNOWLEDGE_BASE_TYPE.defaultValue();
+ private Map<String, String> properties = CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.defaultValue();
+ private List<String> files = CommonConfigOptions.KNOWLEDGE_BASE_FILES.defaultValue();
+
+ public void setType(String type) {
+ checkArgument(StringUtil.isNotBlank(type) && Arrays.asList("asnlookup", "geoiplookup").contains(type.toLowerCase())
+ , CommonConfigOptions.KNOWLEDGE_BASE_TYPE + "Type should be asnlookup or geoiplookup");
+ this.type = type;
+ }
+
+ public void setFiles(List<String> files) {
+ checkArgument(files != null && files.size() > 0,
+ CommonConfigOptions.KNOWLEDGE_BASE_FILES + "files should not be null");
+ this.files = files;
+ }
+
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeConfig.java
deleted file mode 100644
index cc407e4..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeConfig.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-@Data
-public class KnowledgeConfig implements Serializable {
- private String name = ServerConfigOptions.KNOWLEDGE_BASE_NAME.defaultValue();
- private String type = ServerConfigOptions.KNOWLEDGE_BASE_TYPE.defaultValue();
- private Map<String, String> properties = ServerConfigOptions.KNOWLEDGE_BASE_PROPERTIES.defaultValue();
- private List<String> files = ServerConfigOptions.KNOWLEDGE_BASE_FILES.defaultValue();
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/NacosConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/NacosConfig.java
deleted file mode 100644
index 2c1f7e7..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/NacosConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-@Data
-public class NacosConfig implements Serializable {
- private String serverAddr = ServerConfigOptions.NACOS_SERVER_ADDR.defaultValue();
- private String userName = ServerConfigOptions.NACOS_USERNAME.defaultValue();
- private String password = ServerConfigOptions.NACOS_PASSWORD.defaultValue();
- private String namespace = ServerConfigOptions.NACOS_NAMESPACE.defaultValue();
- private String dataId = ServerConfigOptions.NACOS_DATA_ID.defaultValue();
- private String group = ServerConfigOptions.NACOS_GROUP.defaultValue();
- private int readTimeout = ServerConfigOptions.NACOS_READ_TIMEOUT.defaultValue();
-
- public void setReadTimeout(int readTimeout) {
- checkArgument(readTimeout > 0, ServerConfigOptions.NACOS_READ_TIMEOUT + "readTimeout should be greater than 0");
- this.readTimeout = readTimeout;
- }
-
- public void setServerAddr(String serverAddr) {
- checkNotNull(serverAddr, ServerConfigOptions.NACOS_SERVER_ADDR + "serverAddr should not be null");
- this.serverAddr = serverAddr;
- }
-
- public void setDataId(String dataId) {
- checkNotNull(dataId, ServerConfigOptions.NACOS_DATA_ID + "dataId should not be null");
- this.dataId = dataId;
- }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
deleted file mode 100644
index 2c03b21..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.alibaba.fastjson2.TypeReference;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ServerConfigOptions {
-
-
- public static final Option<Integer> HTTP_CON_POOL_MAX_TOTAL =
- Options.key("max_total")
- .intType()
- .defaultValue(100)
- .withDescription("The max connections of http con pool.");
-
- public static final Option<Integer> HTTP_CON_POOL_MAX_PER_ROUTE =
- Options.key("max_per_route")
- .intType()
- .defaultValue(10)
- .withDescription("The max per route of http con pool.");
-
- public static final Option<Integer> HTTP_CON_POOL_CONNECTION_REQUEST_TIMEOUT =
- Options.key("connection_request_timeout")
- .intType()
- .defaultValue(10000)
- .withDescription("The connection request timeout of http con pool.");
- public static final Option<Integer> HTTP_CON_POOL_CONNECT_TIMEOUT =
- Options.key("connect_timeout")
- .intType()
- .defaultValue(10000)
- .withDescription("The connect timeout of http con pool.");
-
- public static final Option<Integer> HTTP_CON_POOL_SOCKET_TIMEOUT =
- Options.key("socket_timeout")
- .intType()
- .defaultValue(60000)
- .withDescription("The socket timeout of http con pool.");
-
- public static final Option<HttpConPoolConfig> HTTP_CON_POOL = Options.key("http_con_pool")
- .type(new TypeReference<HttpConPoolConfig>() {})
- .defaultValue(new HttpConPoolConfig())
- .withDescription("The http con pool configuration.");
-
- public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES =
- Options.key("properties")
- .mapType()
- .defaultValue(new HashMap<String,String>())
- .withDescription("The properties of knowledgebase");
- public static final Option<String> KNOWLEDGE_BASE_NAME =
- Options.key("name")
- .stringType()
- .defaultValue("")
- .withDescription("The name of knowledgebase.");
- public static final Option<String> KNOWLEDGE_BASE_TYPE =
- Options.key("type")
- .stringType()
- .defaultValue("")
- .withDescription("The type of knowledgebase.");
-
- public static final Option<List<String>> KNOWLEDGE_BASE_FILES =
- Options.key("files")
- .listType()
- .defaultValue(new ArrayList<>())
- .withDescription("The files of knowledgebase.");
-
-
- public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type")
- .stringType()
- .defaultValue("localfile")
- .withDescription("The fs type of knowledge base storage.");
-
- public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_DEFAULT_PATH = Options.key("fs_default_path")
- .stringType()
- .defaultValue("")
- .withDescription("The default path of knowledge base storage.");
-
-
-
- public static final Option<List<KnowledgeConfig>> KNOWLEDGE_BASE =
- Options.key("knowledge_base")
- .type(new TypeReference<List<KnowledgeConfig>>() {})
- .defaultValue(new ArrayList<>())
- .withDescription("The knowledge base configuration.");
-
- public static final Option<String> SNOWFLAKE_DATA_CENTER_ID =
- Options.key("data_center_id_num")
- .stringType()
- .defaultValue("")
- .withDescription("The data center id of snowflake.");
-
- public static final Option<SnowflakeConfig> SNOWFLAKE =
- Options.key("snowflake")
- .type(new TypeReference<SnowflakeConfig>() {})
- .defaultValue(new SnowflakeConfig())
- .withDescription("The snowflake configuration.");
-
-
- public static final Option<String> NACOS_SERVER_ADDR =
- Options.key("server_addr")
- .stringType()
- .defaultValue("")
- .withDescription("The server address of nacos.");
- public static final Option<String> NACOS_USERNAME =
- Options.key("username")
- .stringType()
- .defaultValue("nacos")
- .withDescription("The username of nacos.");
- public static final Option<String> NACOS_PASSWORD =
- Options.key("password")
- .stringType()
- .defaultValue("nacos")
- .withDescription("The password of nacos.");
- public static final Option<String> NACOS_NAMESPACE =
- Options.key("namespace")
- .stringType()
- .defaultValue("public")
- .withDescription("The namespace of nacos.");
- public static final Option<String> NACOS_DATA_ID =
- Options.key("data_id")
- .stringType()
- .defaultValue("")
- .withDescription("The data id of nacos.");
- public static final Option<String> NACOS_GROUP =
- Options.key("group")
- .stringType()
- .defaultValue("DEFAULT_GROUP")
- .withDescription("The group of nacos.");
- public static final Option<Integer> NACOS_READ_TIMEOUT =
- Options.key("read_timeout")
- .intType()
- .defaultValue(5000)
- .withDescription("The read timeout of nacos.");
-
- public static final Option<NacosConfig> NACOS =
- Options.key("nacos")
- .type(new TypeReference<NacosConfig>() {})
- .defaultValue(new NacosConfig())
- .withDescription("The nacos configuration.");
-
- public static final Option<String> CONSUL_SERVER_ADDR = Options.key("server_addr")
- .stringType()
- .defaultValue("")
- .withDescription("The server address of consul.");
- public static final Option<Integer> CONSUL_SERVER_PORT =
- Options.key("server_port")
- .intType()
- .defaultValue(8500)
- .withDescription("The server port of consul.");
- public static final Option<String> CONSUL_TOKEN =
- Options.key("token")
- .stringType()
- .defaultValue("")
- .withDescription("The token of consul.");
-
- public static final Option<ConsulConfig> CONSUL =
- Options.key("consul")
- .type(new TypeReference<ConsulConfig>() {})
- .defaultValue(new ConsulConfig())
- .withDescription("The consul configuration.");
-
- public static final Option<String> HOS_TOKEN =
- Options.key("token")
- .stringType()
- .defaultValue("")
- .withDescription("The token of hos.");
-
- public static final Option<HosConfig> HOS =
- Options.key("hos")
- .type(new TypeReference<HosConfig>() {})
- .defaultValue(new HosConfig())
- .withDescription("The hos configuration.");
-
-
- public static final Option<String> ZOOKEEPER_QUORUM =
- Options.key("quorum")
- .stringType()
- .defaultValue("")
- .withDescription("The quorum of zookeeper.");
-
- public static final Option<ZookeeperConfig> ZOOKEEPER =
- Options.key("zookeeper")
- .type(new TypeReference<ZookeeperConfig>() {})
- .defaultValue(new ZookeeperConfig())
- .withDescription("The zookeeper configuration.");
-
- public static final Option<String> HDFS_SERVERS =
- Options.key("servers")
- .stringType()
- .defaultValue("")
- .withDescription("The servers of hdfs.");
-
- public static final Option<HdfsConfig> HDFS = Options.key("hdfs")
- .type(new TypeReference<HdfsConfig>() {})
- .defaultValue(new HdfsConfig())
- .withDescription("The hdfs configuration.");
-
- public static final Option<Map<String, String>> PROPERTIES =
- Options.key("properties")
- .mapType()
- .defaultValue(new HashMap<String,String>())
- .withDescription("The properties of grootstream");
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SnowflakeConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SnowflakeConfig.java
deleted file mode 100644
index 59e84b6..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/SnowflakeConfig.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class SnowflakeConfig implements Serializable {
- private String dataCenterId = ServerConfigOptions.SNOWFLAKE_DATA_CENTER_ID.defaultValue();
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/StorageConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/StorageConfig.java
deleted file mode 100644
index 64ef74c..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/StorageConfig.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.geedgenetworks.utils.StringUtil;
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-@Data
-public class StorageConfig implements Serializable {
- private String fsType = ServerConfigOptions.KNOWLEDGE_BASE_STORAGE_FS_TYPE.defaultValue();
- private String fsDefaultPath = ServerConfigOptions.KNOWLEDGE_BASE_STORAGE_FS_DEFAULT_PATH.defaultValue();
-
- public void setFsType(String fsType) {
- checkArgument(StringUtil.isNotBlank(fsType) && Arrays.asList("hdfs", "localfile").contains(fsType.toLowerCase())
- , ServerConfigOptions.KNOWLEDGE_BASE_STORAGE_FS_TYPE + "fsType should be hdfs or localfile");
- this.fsType = fsType;
- }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/TypesafeConfigUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/config/TypesafeConfigUtils.java
index a8a6fa5..59c6088 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/TypesafeConfigUtils.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/TypesafeConfigUtils.java
@@ -20,7 +20,7 @@ public final class TypesafeConfigUtils {
* @param source config source
* @param prefix config prefix
* @param keepPrefix true if keep prefix
- * @deprecated use org.apache.seatunnel.api.configuration.Option interface instead
+ * @deprecated
*/
@Deprecated
public static Config extractSubConfig(Config source, String prefix, boolean keepPrefix) {
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ZookeeperConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ZookeeperConfig.java
deleted file mode 100644
index 77c8be7..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ZookeeperConfig.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class ZookeeperConfig implements Serializable {
- private String quorum = ServerConfigOptions.ZOOKEEPER_QUORUM.defaultValue();
-
- public void setQuorum(String quorum) {
- checkNotNull(quorum, ServerConfigOptions.ZOOKEEPER_QUORUM + "quorum should not be null");
- this.quorum = quorum;
- }
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientUtils.java
index 00c06e4..d8eb070 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientUtils.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientUtils.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.common.utils;
-import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.Option;
+import com.geedgenetworks.common.config.Options;
import com.geedgenetworks.utils.StringUtil;
import org.apache.commons.io.IOUtils;
import org.apache.http.*;
@@ -11,7 +12,6 @@ import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.client.utils.URIBuilder;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectTimeoutException;
@@ -41,7 +41,6 @@ import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
-import java.util.Map;
public class HttpClientUtils {
/** 全局连接池对象 */
@@ -51,7 +50,13 @@ public class HttpClientUtils {
private final Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
- public static final String ERROR_MESSAGE = "-1";
+ private static final String ERROR_MESSAGE = "-1";
+
+ private static final int DEFAULT_MAX_TOTAL = 400;
+ private static final int DEFAULT_MAX_PER_ROUTE = 10;
+ private static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 10000;
+ private static final int DEFAULT_CONNECT_TIMEOUT = 10000;
+ private static final int DEFAULT_SOCKET_TIMEOUT = 60000;
/*
* 静态代码块配置连接池信息
@@ -59,9 +64,9 @@ public class HttpClientUtils {
static {
// 设置最大连接数
- CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
+ CONN_MANAGER.setMaxTotal(DEFAULT_MAX_TOTAL);
// 设置每个连接的路由数
- CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
+ CONN_MANAGER.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
}
/**
@@ -98,9 +103,9 @@ public class HttpClientUtils {
PoolingHttpClientConnectionManager connManager =
new PoolingHttpClientConnectionManager(socketFactoryRegistry);
// 设置最大连接数
- connManager.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
+ connManager.setMaxTotal(DEFAULT_MAX_TOTAL);
// 设置每个连接的路由数
- connManager.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
+ connManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
return connManager;
} catch (KeyManagementException | NoSuchAlgorithmException e) {
throw new RuntimeException(e.getMessage());
@@ -117,11 +122,11 @@ public class HttpClientUtils {
RequestConfig requestConfig =
RequestConfig.custom()
// 获取连接超时时间
- .setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT)
+ .setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT)
// 请求超时时间
- .setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
+ .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT)
// 响应超时时间
- .setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
+ .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT)
.build();
/*
@@ -199,7 +204,7 @@ public class HttpClientUtils {
String msg = ERROR_MESSAGE;
// 获取客户端连接对象
- CloseableHttpClient httpClient = getHttpClient(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT);
+ CloseableHttpClient httpClient = getHttpClient(DEFAULT_SOCKET_TIMEOUT);
CloseableHttpResponse response = null;
try {
@@ -254,7 +259,7 @@ public class HttpClientUtils {
public String httpPost(URI uri, String requestBody, Header... headers) {
String msg = ERROR_MESSAGE;
// 获取客户端连接对象
- CloseableHttpClient httpClient = getHttpClient(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT);
+ CloseableHttpClient httpClient = getHttpClient(DEFAULT_SOCKET_TIMEOUT);
// 创建POST请求对象
CloseableHttpResponse response = null;
diff --git a/groot-common/src/main/resources/groot.yaml b/groot-common/src/main/resources/groot.yaml
deleted file mode 100644
index a5bc3f9..0000000
--- a/groot-common/src/main/resources/groot.yaml
+++ /dev/null
@@ -1,74 +0,0 @@
-
-nacos:
- server_addr: 192.168.44.12:8848
- username: nacos
- password: nacos
- namespace: test
- data_id: knowledge_base.json
- group: DEFAULT_GROUP
- read_timeout: 5000
-
-consul:
- server_addr: 192.168.41.30
- server_port: 8500
- token: c989b503-1d74-f49c-5698-e90fe461e938
-
-hos:
- token: c21f969b5f03d33d43e04f8f136e7682
-
-
-knowledge:
- file_check_number: 3
- # iplookup:
-# TSG:
-# ip_v4_user_defined_id: acf1db8589c5e277-2d8cb5cf98593ed8
- # ip_v6_user_defined_id: acf1db8589c5e277-a442cce2d726eddf
- # ip_v4_built_in_id: acf1db8589c5e277-42014a5735551e3e
- # ip_v6_built_in_id: acf1db8589c5e277-8a9a0362c1320877
-# CN:
- # ip_v4_user_defined_id: acf1db8589c5e277-2d8cb5cf98593ed8
- # ip_v6_user_defined_id: acf1db8589c5e277-a442cce2d726eddf
- # ip_v4_built_in_id: acf1db8589c5e277-10798bc00aed2135
- # ip_v6_built_in_id: acf1db8589c5e277-8a9a0362c1320877
- # asn_v4_id: 7ce2f9890950ba90-fcc25696bf11a8a0
- # asn_v6_id: 7ce2f9890950ba90-71f13b3736863ddb
- asnlookup:
- TSG:
- #asn_v4_id: 7ce2f9890950ba90-fe9081f96d1d8ca8
- #asn_v6_id: 7ce2f9890950ba90-c901e886e2d89a30
- asn_v4_id: 7ce2f9890950ba90-fcc25696bf11a8a0
- asn_v6_id: 7ce2f9890950ba90-71f13b3736863ddb
- # CN:
- # ip_v4_user_defined_id: acf1db8589c5e277-2d8cb5cf98593ed8
- # ip_v6_user_defined_id: acf1db8589c5e277-a442cce2d726eddf
- # ip_v4_built_in_id: acf1db8589c5e277-10798bc00aed2135
-# ip_v6_built_in_id: acf1db8589c5e277-8a9a0362c1320877
-# asn_v4_id: 7ce2f9890950ba90-fcc25696bf11a8a0
-# asn_v6_id: 7ce2f9890950ba90-71f13b3736863ddb
-http:
- pool:
- max_connection: 400
- max_per_route: 60
- request_timeout: 60000
- connect_timeout: 60000
- response_timeout: 60000
-
-snowid:
- data_center_id_num: 1
-zookeeper:
- quorum: 192.168.44.12:2181
-
-knowledgebase:
- file_storage_path: /knowledgebase/ETL-SESSION-RECORD-COMPLETED/
- file_storage_type: hdfs
-hdfs:
- servers: 192.168.44.11:9000,192.168.44.14:9000
-hbase:
- tick_tuple_freq_secs:
- gtpc_scan_max_rows: 10000
- radius_scan_max_rows: 10000
- radius_table_name: RADIUS-TABLE
- gtpc_table_name: GTPC-TABLE
- rpc_timeout:
-data:
- relationship_model: \ No newline at end of file
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index d22c057..07867f6 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -3,9 +3,9 @@ com.geedgenetworks.core.udf.Drop
com.geedgenetworks.core.udf.AsnLookup
com.geedgenetworks.core.udf.Eval
com.geedgenetworks.core.udf.JsonExtract
-com.geedgenetworks.core.udf.UnixTimestamp
+com.geedgenetworks.core.udf.CurrentUnixTimestamp
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.DecodeBase64
com.geedgenetworks.core.udf.GeoIpLookup
com.geedgenetworks.core.udf.PathCombine
-com.geedgenetworks.core.udf.UnixTimestampConverter
+com.geedgenetworks.core.udf.UnixTimestampConverter \ No newline at end of file
diff --git a/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java b/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java
index 6e42ae4..9a122ed 100644
--- a/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java
+++ b/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java
@@ -3,23 +3,25 @@ package com.geedgenetworks.common.config;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+
public class YamlGrootStreamConfigParserTest {
@Test
public void testGrootStreamConfig() {
- GrootStreamConfigLocator yamlConfigLocator = new GrootStreamConfigLocator();
+ CommonConfigLocator commonConfigLocator = new CommonConfigLocator();
GrootStreamConfig config;
- if (yamlConfigLocator.locateInWorkDirOrOnClasspath()) {
+ if (commonConfigLocator.locateInWorkDirOrOnClasspath()) {
// 2. Try loading YAML config from the working directory or from the classpath
- config = new GrootStreamConfigBuilder(yamlConfigLocator).setProperties(null).build();
+ config = new GrootStreamConfigBuilder(commonConfigLocator).build();
} else {
throw new RuntimeException("can't find yaml in resources");
}
Assertions.assertNotNull(config);
- Assertions.assertNotNull(config.getEngineConfig().getNacosConfig().getServerAddr());
- Assertions.assertTrue(config.getEngineConfig().getHttpConPoolConfig().getSocketTimeout() > 0);
- Assertions.assertNotNull(config.getEngineConfig().getKnowledgeBaseConfig());
- // Assertions.assertEquals("hos", config.getEngineConfig().getKnowledgeBaseConfig().get(0).getProperties().get("fs_type"));
+ Assertions.assertNotNull(config.getCommonConfig().getKnowledgeBaseConfig());
+ Assertions.assertTrue(config.getCommonConfig().getKnowledgeBaseConfig().size() > 0);
+ Assertions.assertTrue(Arrays.asList("asnlookup", "geoiplookup").contains(config.getCommonConfig().getKnowledgeBaseConfig().get(0).getType()));
+ Assertions.assertTrue(config.getUDFPluginConfig().size() > 0);
}
diff --git a/groot-common/src/test/resources/grootstream.yaml b/groot-common/src/test/resources/grootstream.yaml
index c233936..4aa7ef9 100644
--- a/groot-common/src/test/resources/grootstream.yaml
+++ b/groot-common/src/test/resources/grootstream.yaml
@@ -1,51 +1,34 @@
grootstream:
- engine:
- tick_tuple_freq_secs: 90
- gtpc_scan_max_rows: 10000
- radius_scan_max_rows: 10000
- radius_table_name: RADIUS-TABLE
- gtpc_table_name: GTPC-TABLE
- hbase_rpc_timeout: 60000
- http_con_pool:
- max_total: 400
- max_per_route: 60
- connection_request_timeout: 60000
- connect_timeout: 60000
- socket_timeout: 60000
- knowledge_base:
- - name: tsg_asnlookup
- type: asnlookup
- properties:
- fs_type: hos
- fs_default_path: http://path
- files:
- default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
- user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
- - name: CN_asnlookup
- type: asnlookup
- files:
- default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
- user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
- snowflake:
- data_center_id_num: 1
- nacos:
- server_addr: 192.168.44.12:8848
- username: nacos
- password: nacos
- namespace: test
- data_id: knowledge_base.json
- group: DEFAULT_GROUP
- read_timeout: 5000
- consul:
- server_addr: 192.168.41.30
- server_port: 8500
- token: c989b503-1d74-f49c-5698-e90fe461e938
- hos:
- token: c21f969b5f03d33d43e04f8f136e7682
- zookeeper:
- quorum: 192.168.44.12:2181
- hdfs:
- servers: 192.168.44.11:9000,192.168.44.14:9000
+ knowledge_base:
+ - name: tsg_asnlookup
+ type: asnlookup
+ properties:
+ fs_type: hos
+ fs_default_path: http://path
+ files:
+ - http://192.168.44.12:9098/hos/knowledge_base_bucket/757732ce-8214-4c34-aea2-aca6c51a7e82-YXNuX2J1aWx0aW4=.mmdb
+
+ - name: tsg_geoiplookup
+ type: geoiplookup
+ files:
+ - 7ce2f9890950ba90-fcc25696bf11a8a0
+ - 7ce2f9890950ba90-71f13b3736863ddb
+ properties:
+ hos.path: http://192.168.44.12:8089
+ hos.bucket.name.traffic_file: traffic_file_bucket
+ hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index e2607a2..07d2f72 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -280,6 +280,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
} catch (SQLException e) {
LOG.error("ClickHouseBatchInsertFail url:" + url, e);
if (retryCount >= 3) {
+ LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
// throw e;
return;
}
@@ -290,6 +291,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
} catch (SQLException e) {
LOG.error("ClickHouseBatchInsertFail url:" + url, e);
if (retryCount >= 3) {
+ LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
//throw e;
if (connection != null) {
connection.close();
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
index 26e9ce4..b4cf0bb 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.connectors.clickhouse.sink;
+import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.core.pojo.Event;
import com.github.housepower.data.Block;
@@ -28,7 +29,11 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
// int columnIdx = batch.paramIdx2ColumnIdx(i);
// batch.setObject(columnIdx, convertToCkDataType(columnTypes[i], value));
// batch.setObject(i, convertToCkDataType(dataType, value));
- batch.setObject(i, columnConverters[i].convert(value));
+ try {
+ batch.setObject(i, columnConverters[i].convert(value));
+ } catch (Exception e) {
+ throw new RuntimeException(columnNames[i] + "列转换值出错:" + value + ", event data:" + JSON.toJSONString(map), e);
+ }
}
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
index 328843b..423a45f 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
@@ -37,8 +37,10 @@ public class EventKafkaDeserializationSchema implements KafkaDeserializationSche
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out) throws Exception {
try {
Event event = valueDeserialization.deserialize(record.value());
- event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, record.timestamp());
- out.collect(event);
+ if(event != null){
+ event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, record.timestamp());
+ out.collect(event);
+ }
}catch (Exception e) {
LOG.error("反序列化失败", e);
}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/sink/KafkaSink.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/sink/KafkaSink.java
deleted file mode 100644
index c07c7a6..0000000
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/sink/KafkaSink.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.geedgenetworks.connectors.kafka.sink;
-
-import com.geedgenetworks.connectors.kafka.util.KafkaUtils;
-import com.geedgenetworks.core.pojo.Event;
-import com.geedgenetworks.core.pojo.SinkConfigOld;
-import com.geedgenetworks.core.sink.Sink;
-import com.geedgenetworks.common.utils.ColumnUtil;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
-import com.alibaba.fastjson2.JSON;
-
-public class KafkaSink implements Sink {
-
- @Override
- public void sink(
- SingleOutputStreamOperator<Event> streamlineEventSingleOutputStreamOperator,
- SinkConfigOld sinkConfig)
- throws Exception {
- if (sinkConfig.getParallelism() != 0) {
- streamlineEventSingleOutputStreamOperator
- .map(
- new MapFunction<Event, String>() {
- @Override
- public String map(Event value) throws Exception {
- if (sinkConfig.getOutput_fields() != null
- && sinkConfig.getOutput_fields().size() > 0) {
- value.setExtractedFields(
- ColumnUtil.columnSelector(
- value.getExtractedFields(),
- sinkConfig.getOutput_fields()));
- }
- String jsonString =
- JSON.toJSONString(value.getExtractedFields());
- return jsonString;
- }
- })
- .setParallelism(sinkConfig.getParallelism())
- .addSink(KafkaUtils.getKafkaSink(sinkConfig.getProperties()))
- .setParallelism(sinkConfig.getParallelism())
- .name(sinkConfig.getName());
- } else {
-
- streamlineEventSingleOutputStreamOperator
- .map(
- new MapFunction<Event, String>() {
- @Override
- public String map(Event value) throws Exception {
- if (sinkConfig.getOutput_fields() != null
- && sinkConfig.getOutput_fields().size() > 0) {
- value.setExtractedFields(
- ColumnUtil.columnSelector(
- value.getExtractedFields(),
- sinkConfig.getOutput_fields()));
- }
- String jsonString =
- JSON.toJSONString(value.getExtractedFields());
- return jsonString;
- }
- })
- .addSink(KafkaUtils.getKafkaSink(sinkConfig.getProperties()))
- .name(sinkConfig.getName());
- }
- }
-}
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java
deleted file mode 100644
index 9cd7bc2..0000000
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package com.geedgenetworks.connectors.kafka.source;
-
-import com.geedgenetworks.connectors.kafka.util.KafkaUtils;
-import com.geedgenetworks.core.pojo.Event;
-import com.geedgenetworks.core.pojo.SourceConfigOld;
-import com.geedgenetworks.core.source.Source;
-import com.geedgenetworks.common.utils.ColumnUtil;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson2.JSON;
-
-import java.time.Duration;
-import java.util.Map;
-
-public class KafkaSource implements Source {
-
- private static final Log logger = LogFactory.get();
-
- @Override
- public SingleOutputStreamOperator<Event> source(
- StreamExecutionEnvironment env, SourceConfigOld sourceConfig) throws Exception {
- DataStreamSource<Tuple2<String, Long>> sourceForSession =
- env.addSource(
- KafkaUtils.timestampDeserializationConsumer(sourceConfig.getProperties()));
- if (sourceConfig.getParallelism() != 0) {
- sourceForSession.setParallelism(sourceConfig.getParallelism());
- }
- SingleOutputStreamOperator<Event> streamEvent;
- streamEvent =
- sourceForSession.map(
- new MapFunction<Tuple2<String, Long>, Event>() {
- @Override
- public Event map(Tuple2<String, Long> message) {
-
- // FIXME TimestampDeserializationSchema 中返回了 Tuple2<>(null, null) 但此处没有判空
- Event event = new Event();
- try {
- // FIXME message_timestamp_field 被当做一个非常特殊的字段统一处理为
- // kafka message timestamp,但如果 json 字段中本来就存在该字段当如何 ?
- Map<String, Object> values = JSON.parseObject(message.f0);
- //event.set__timestamp(message.f1);
- // FIXME 下面的逻辑不具备带来不可预知错误的情况,不应该被包裹在 try 中
- values.put(Event.INTERNAL_TIMESTAMP_KEY, message.f1);
- if (sourceConfig
- .getProperties()
- .containsKey("message_timestamp_field")) {
- values.put(
- sourceConfig
- .getProperties()
- .get("message_timestamp_field")
- .toString(),
- message.f1);
- }
- if (sourceConfig.getOutput_fields() != null
- && sourceConfig.getOutput_fields().size() > 0) {
- values =
- ColumnUtil.columnSelector(
- values, sourceConfig.getOutput_fields());
- }
- event.setExtractedFields(values);
- return event;
- } catch (Exception e) {
- logger.error(e.toString());
- return event;
- // FIXME 当数据序列化发生异常,仍然返回了一个正常的数据!
- // 虽然它大概率是一个空数据,但是否存在极端场景会影响业务?
- // 如:在一个入库操作中插入一行全空数据,而数据库非空约束导致程序异常。
- }
- }
- });
-
- // FIXME MAP 算子不会影响并行度
- if (sourceConfig.getParallelism() != 0) {
- streamEvent.setParallelism(sourceConfig.getParallelism());
- }
- if (sourceConfig.getWatermark_lag() != null && sourceConfig.getWatermark_lag() > 0L) {
- WatermarkStrategy<Event> strategyForSession =
- WatermarkStrategy.<Event>forBoundedOutOfOrderness(
- Duration.ofMillis(sourceConfig.getWatermark_lag()))
- .withTimestampAssigner(
- (StreamlineEvent, timestamp) ->
- Integer.parseInt(
- String.valueOf(
- StreamlineEvent
- .getExtractedFields()
- .get(
- sourceConfig
- .getWatermark_timestamp())))
- * 1000);
- streamEvent.assignTimestampsAndWatermarks(strategyForSession);
- }
- return streamEvent;
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java
index f268689..e62310d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.core.pojo;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.core.utils.KnowlegdeBase.AbstractKnowledgeBase;
import lombok.Data;
@@ -8,7 +8,7 @@ import java.util.List;
@Data
public class KnowledgeBaseEntity {
- private KnowledgeConfig knowledgeConfig;
+ private KnowledgeBaseConfig knowledgeConfig;
private List<KnowLedgeFileEntity> knowLedgeFileEntityList;
private AbstractKnowledgeBase abstractKnowledgeBase;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java b/groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java
new file mode 100644
index 0000000..c19df1d
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java
@@ -0,0 +1,8 @@
+package com.geedgenetworks.core.types;
+
+public class BooleanType extends DataType{
+ @Override
+ public String simpleString() {
+ return "boolean";
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 5f6f1d7..881f356 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -1,38 +1,27 @@
package com.geedgenetworks.core.udf;
-import com.alibaba.fastjson.JSON;
-import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.EngineConfig;
-import com.geedgenetworks.common.config.KnowledgeConfig;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseUpdateJob;
-import com.geedgenetworks.utils.IpLookupV2;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.quartz.SchedulerException;
-
-import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler;
@Slf4j
public class AsnLookup implements UDF {
- private UDFContext udfContext;
private String vender;
private String option;
+ private String lookupFieldName;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
checkUdfContext(udfContext);
- this.udfContext = udfContext;
this.vender = udfContext.getParameters().get("vendor_id").toString();
this.option = udfContext.getParameters().get("option").toString();
KnowledgeBaseUpdateJob.initKnowledgeBase(vender,AsnKnowledgeBase.getInstance(), runtimeContext);
@@ -43,6 +32,8 @@ public class AsnLookup implements UDF {
else {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init AsnKnowledgeBase error ");
}
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -51,15 +42,15 @@ public class AsnLookup implements UDF {
if(AsnKnowledgeBase.getVenderWithAsnLookup()!=null && AsnKnowledgeBase.getVenderWithAsnLookup().containsKey(vender)){
- if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){
+ if(event.getExtractedFields().containsKey(lookupFieldName)){
switch (option) {
case "IP_TO_ASN":
String asn = AsnKnowledgeBase.getVenderWithAsnLookup()
.get(vender)
- .asnLookup(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString());
+ .asnLookup(event.getExtractedFields().get(lookupFieldName).toString());
if(!asn.isEmpty()) {
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), asn);
+ .put(outputFieldName, asn);
}
break;
default:
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
index 670deef..474dd17 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
@@ -8,16 +8,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
-public class UnixTimestamp implements UDF {
-
- private UDFContext udfContext;
+public class CurrentUnixTimestamp implements UDF {
private String precision;
+ private String outputFieldName;
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
-
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
@@ -33,24 +31,25 @@ public class UnixTimestamp implements UDF {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
}
}
-
+ this.precision = udfContext.getParameters().get("precision").toString();
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
long timestamp = System.currentTimeMillis();
- if ("seconds".equals(udfContext.getParameters().get("precision"))) {
+ if ("seconds".equals(precision)) {
timestamp = timestamp / 1000;
}
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ event.getExtractedFields().put(outputFieldName, timestamp);
return event;
}
@Override
public String functionName() {
- return "UNIX_TIMESTAMP_FUNCTION";
+ return "CURRENT_UNIX_TIMESTAMP";
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
index 82cb1a4..7d825c9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
@@ -17,8 +17,9 @@ import java.util.Base64;
@Slf4j
public class DecodeBase64 implements UDF {
- private UDFContext udfContext;
-
+ private String lookupFieldNameFirst;
+ private String lookupFieldNameSecond;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
if(udfContext.getLookup_fields().size() !=2){
@@ -27,21 +28,22 @@ public class DecodeBase64 implements UDF {
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- this.udfContext = udfContext;
-
+ this.lookupFieldNameFirst = udfContext.getLookup_fields().get(0);
+ this.lookupFieldNameSecond = udfContext.getLookup_fields().get(1);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldNameFirst)) {
String decodeResult = "";
String message =
(String)
event.getExtractedFields()
- .get(udfContext.getLookup_fields().get(0));
+ .get(lookupFieldNameFirst);
Object charset =
- event.getExtractedFields().getOrDefault(udfContext.getLookup_fields().get(1),"");
+ event.getExtractedFields().getOrDefault(lookupFieldNameSecond,"");
try {
if (StringUtil.isNotBlank(message)) {
byte[] base64decodedBytes = Base64.getDecoder().decode(message);
@@ -61,7 +63,7 @@ public class DecodeBase64 implements UDF {
+ e.getMessage());
}
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), decodeResult);
+ .put(outputFieldName, decodeResult);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
index e7a5c37..a5d6b91 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
@@ -10,18 +10,18 @@ import com.geedgenetworks.utils.FormatUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
+import java.util.List;
+
@Slf4j
public class Domain implements UDF {
- private UDFContext udfContext;
private String option;
-
-
+ private List<String> lookupFields;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
if(udfContext.getLookup_fields().isEmpty()){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup field is not empty");
}
@@ -42,6 +42,8 @@ public class Domain implements UDF {
}
}
this.option = udfContext.getParameters().get("option").toString();
+ this.lookupFields = udfContext.getLookup_fields();
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -49,7 +51,7 @@ public class Domain implements UDF {
@Override
public Event evaluate(Event event) {
String domain = "";
- for (String lookupField : udfContext.getLookup_fields()){
+ for (String lookupField : lookupFields){
if(event.getExtractedFields().containsKey(lookupField)){
@@ -78,7 +80,7 @@ public class Domain implements UDF {
}
}
}
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), domain);
+ event.getExtractedFields().put(outputFieldName, domain);
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
index 46a8072..dec2ddc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
@@ -11,11 +11,12 @@ import java.text.SimpleDateFormat;
import java.util.TimeZone;
@Slf4j
public class FromUnixTimestamp implements UDF {
- private UDFContext udfContext;
-
+ private String precision;
+ private String outputFieldName;
+ private String lookupFieldName;
+ private SimpleDateFormat sdf;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
@@ -36,32 +37,35 @@ public class FromUnixTimestamp implements UDF {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters precision value is not correct");
}
}
+ this.precision = udfContext.getParameters().get("precision").toString();
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ switch (precision) {
+ case "seconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ break;
+ case "milliseconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
+ break;
+ case "microseconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000");
+ break;
+ case "nanoseconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000");
+ break;
+ default:
+ break;
+ }
+        sdf.setTimeZone(TimeZone.getTimeZone(udfContext.getParameters().get("timezone").toString()));
}
@Override
public Event evaluate(Event event) {
- if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- switch (udfContext.getParameters().get("precision").toString()) {
- case "seconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- break;
- case "milliseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
- break;
- case "microseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000");
- break;
- case "nanoseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000");
- break;
- default:
- break;
- }
- sdf.setTimeZone(TimeZone.getTimeZone(udfContext.getParameters().get("timezone").toString()));
- String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString()));
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ if(event.getExtractedFields().containsKey(lookupFieldName)){
+ String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString()));
+ event.getExtractedFields().put(outputFieldName, timestamp);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index d2e29f6..4f657da 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -20,20 +20,19 @@ import java.util.Map;
@Slf4j
public class GeoIpLookup implements UDF {
- private UDFContext udfContext;
private String vender;
private String option;
-
- private Map<String,String> geolocation_field_mapping;
+ private String lookupFieldName;
+ private String outputFieldName;
+ private Map<String,String> geoLocationFieldMapping;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
checkUdfContext(udfContext);
- this.udfContext = udfContext;
this.vender = udfContext.getParameters().get("vendor_id").toString();
this.option = udfContext.getParameters().get("option").toString();
if(option.equals("IP_TO_OBJECT")){
- this.geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
+ this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
}
KnowledgeBaseUpdateJob.initKnowledgeBase(vender, GeoIpKnowledgeBase.getInstance(),runtimeContext);
if(GeoIpKnowledgeBase.getVenderWithIpLookup()!=null && GeoIpKnowledgeBase.getVenderWithIpLookup().containsKey(vender)){
@@ -42,99 +41,75 @@ public class GeoIpLookup implements UDF {
else {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init GeoIpLookup error ");
}
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldName)) {
switch (option) {
case "IP_TO_COUNTRY":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.countryLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_PROVINCE":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.provinceLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_CITY":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.cityLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_SUBDIVISION_ADDR":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.cityLookupDetail(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_DETAIL":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.locationLookupDetail(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_LATLNG":
- String geo =
- GeoIpKnowledgeBase.getVenderWithIpLookup()
- .get(vender)
- .latLngLookup(
- event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
- .toString());
-
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), geo);
+ .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup()
+ .get(vender)
+ .latLngLookup(
+ event.getExtractedFields()
+ .get(lookupFieldName)
+ .toString()));
break;
case "IP_TO_PROVIDER":
@@ -144,30 +119,22 @@ public class GeoIpLookup implements UDF {
.get(vender)
.ispLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()),
TypeReference.mapType(
HashMap.class, String.class, Object.class));
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
serverIpMap.getOrDefault("isp", StringUtil.EMPTY));
break;
case "IP_TO_JSON ":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.infoLookupToJSONString(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_OBJECT":
@@ -176,13 +143,10 @@ public class GeoIpLookup implements UDF {
.get(vender)
.infoLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString());
- for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) {
+ for (Map.Entry<String, String> entry : geoLocationFieldMapping.entrySet()) {
switch (entry.getKey()) {
case "COUNTRY":
event.getExtractedFields()
@@ -216,16 +180,12 @@ public class GeoIpLookup implements UDF {
break;
}
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.infoLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
index c5433ea..7efc81e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
@@ -10,7 +10,10 @@ import org.apache.flink.api.common.functions.RuntimeContext;
public class JsonExtract implements UDF {
private UDFContext udfContext;
+ private String lookupFieldName;
+ private String outputFieldName;
+ private String param;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
this.udfContext = udfContext;
@@ -26,7 +29,9 @@ public class JsonExtract implements UDF {
if(!udfContext.getParameters().containsKey("param")){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey param");
}
-
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+        this.param = udfContext.getParameters().get("param").toString();
}
@@ -34,16 +39,15 @@ public class JsonExtract implements UDF {
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldName)) {
String result =
(String)
JsonPathUtil.analysis(
event.getExtractedFields()
- .get(udfContext.getLookup_fields().get(0))
- .toString(),
- udfContext.getParameters().get("param").toString());
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), result);
+ .get(lookupFieldName)
+ .toString(),param);
+ event.getExtractedFields().put(outputFieldName, result);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 7b21adf..03be355 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf;
import com.alibaba.fastjson.JSON;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.EngineConfig;
+import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
import lombok.extern.slf4j.Slf4j;
@@ -14,24 +14,15 @@ import java.util.*;
@Slf4j
public class PathCombine implements UDF {
- private UDFContext udfContext;
-
- private StringBuilder stringBuilder;
-
- private Map<String, String> pathParameters = new LinkedHashMap<>();
-
-
-
-
+ private final Map<String, String> pathParameters = new LinkedHashMap<>();
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
-
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
- EngineConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
+ CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig();
if (udfContext.getParameters() != null
&& !udfContext.getParameters().isEmpty()) {
@@ -56,6 +47,7 @@ public class PathCombine implements UDF {
}
}
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -78,7 +70,7 @@ public class PathCombine implements UDF {
}
}
String path = joinUrlParts(pathBuilder);
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), path);
+ event.getExtractedFields().put(outputFieldName, path);
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
index 134ed66..61fa44a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
@@ -5,23 +5,24 @@ import com.geedgenetworks.core.pojo.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
public class Rename implements UDF {
- private UDFContext udfContext;
-
+ private String lookupFieldName;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
+
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldName)) {
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
event.getExtractedFields()
- .get(udfContext.getLookup_fields().get(0)));
- event.getExtractedFields().remove(udfContext.getLookup_fields().get(0));
+ .get(lookupFieldName));
+ event.getExtractedFields().remove(lookupFieldName);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
index da0b71f..070c42b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
@@ -1,9 +1,7 @@
package com.geedgenetworks.core.udf;
-import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
-import com.geedgenetworks.core.udf.UDF;
import com.geedgenetworks.core.utils.SnowflakeIdUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -11,21 +9,20 @@ import java.io.Serializable;
public class SnowflakeId implements Serializable, UDF {
- private UDFContext udfContext;
-
+ private String outputFieldName;
private SnowflakeIdUtils snowflakeIdUtils;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
String data_center_id_num = udfContext.getParameters().getOrDefault("data_center_id_num","0").toString();//转为数字
snowflakeIdUtils = new SnowflakeIdUtils(Integer.parseInt(data_center_id_num),runtimeContext.getIndexOfThisSubtask());
- this.udfContext = udfContext;
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), snowflakeIdUtils.nextId());
+ .put(outputFieldName, snowflakeIdUtils.nextId());
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
index e557677..15228bb 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
@@ -14,8 +14,9 @@ import java.time.Instant;
public class UnixTimestampConverter implements UDF {
private UDFContext udfContext;
-
private String precision;
+ private String lookupFieldName;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
@@ -39,7 +40,8 @@ public class UnixTimestampConverter implements UDF {
this.precision =udfContext.getParameters().get("precision").toString();
}
}
-
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -47,8 +49,8 @@ public class UnixTimestampConverter implements UDF {
@Override
public Event evaluate(Event event) {
- if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
- Long timestamp = Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString());
+ if(event.getExtractedFields().containsKey(lookupFieldName)) {
+ Long timestamp = Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString());
Instant instant = null;
if (String.valueOf(timestamp).length() == 13) {
// 时间戳长度大于10,表示为毫秒级时间戳
@@ -67,7 +69,7 @@ public class UnixTimestampConverter implements UDF {
timestamp = instant.toEpochMilli();
break;
}
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ event.getExtractedFields().put(outputFieldName, timestamp);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java
deleted file mode 100644
index b7d3637..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java
+++ /dev/null
@@ -1,139 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-import com.geedgenetworks.common.config.CommonConfig;
-
-import com.geedgenetworks.utils.StringUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-
-import java.io.IOException;
-
-/**
- * @author qidaijie
- * @version 2022/11/2 17:57
- */
-public class HadoopUtils {
- private static final Log logger = LogFactory.get();
-
- private static HadoopUtils hadoopUtils;
-
- private static FileSystem fileSystem;
-
- private static void getInstance() {
- hadoopUtils = new HadoopUtils();
- }
-
- /** 构造函数 */
- private HadoopUtils() {
- // 获取连接
- getConnection();
- }
-
- private static void getConnection() {
- Configuration configuration = new Configuration();
- try {
- // 指定用户
- System.setProperty("HADOOP_USER_NAME", "etl");
- // 配置hdfs相关信息
- configuration.set("fs.defaultFS", "hdfs://ns1");
- configuration.set("hadoop.proxyuser.root.hosts", "*");
- configuration.set("hadoop.proxyuser.root.groups", "*");
- configuration.set("dfs.nameservices", "ns1");
- configuration.set("dfs.ha.namenodes.ns1", "nn1,nn2");
- String[] servers = StringUtil.split(CommonConfig.HDFS_SERVERS, ",");
- configuration.set("dfs.namenode.rpc-address.ns1.nn1", servers[0]);
- configuration.set("dfs.namenode.rpc-address.ns1.nn2", servers[1]);
- configuration.set(
- "dfs.client.failover.proxy.provider.ns1",
- "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
- // 创建fileSystem,用于连接hdfs
- fileSystem = FileSystem.get(configuration);
- } catch (IOException | RuntimeException e) {
- logger.error("Failed to create HDFS connection. message is: " + e.getMessage());
- e.printStackTrace();
- }
- }
-
- // /**
- // * 创建hdfs连接
- // */
- // static {
- // if
- // (FlowWriteConfig.FILE_SYSTEM_TYPE.equals(FlowWriteConfig.KNOWLEDGEBASE_FILE_STORAGE_TYPE)) {
- // Configuration configuration = new Configuration();
- // try {
- // //指定用户
- // System.setProperty("HADOOP_USER_NAME", "etl");
- // //配置hdfs相关信息
- // configuration.set("fs.defaultFS", "hdfs://ns1");
- // configuration.set("hadoop.proxyuser.root.hosts", "*");
- // configuration.set("hadoop.proxyuser.root.groups", "*");
- // configuration.set("dfs.nameservices", "ns1");
- // configuration.set("dfs.ha.namenodes.ns1", "nn1,nn2");
- // String[] servers = StringUtil.split(FlowWriteConfig.HDFS_SERVERS,
- // FlowWriteConfig.FORMAT_SPLITTER);
- // configuration.set("dfs.namenode.rpc-address.ns1.nn1", servers[0]);
- // configuration.set("dfs.namenode.rpc-address.ns1.nn2", servers[1]);
- // configuration.set("dfs.client.failover.proxy.provider.ns1",
- // "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
- // //创建fileSystem,用于连接hdfs
- // fileSystem = FileSystem.get(configuration);
- // } catch (IOException | RuntimeException e) {
- // logger.error("Failed to create HDFS connection. message is: " +
- // e.getMessage());
- // e.printStackTrace();
- // }
- // }
- // }
-
- /**
- * 下载HDFS文件
- *
- * @param filePath 文件路径
- * @return 文件
- */
- public static byte[] downloadFileByBytes(String filePath) {
- if (hadoopUtils == null) {
- getInstance();
- }
-
- try (FSDataInputStream open = fileSystem.open(new Path(filePath))) {
- byte[] bytes = new byte[open.available()];
- open.read(0, bytes, 0, open.available());
- return bytes;
- } catch (IOException e) {
- logger.error(
- "An I/O exception when files are download from HDFS. Message is :"
- + e.getMessage());
- }
- return null;
- }
-
- /**
- * 更新文件到HDFS
- *
- * @param filePath 文件路径
- * @param bytes 文件
- */
- public static void uploadFileByBytes(String filePath, byte[] bytes) {
- if (hadoopUtils == null) {
- getInstance();
- }
- try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) {
- fsDataOutputStream.write(bytes);
- // fsDataOutputStream.flush();
- } catch (RuntimeException e) {
- logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage());
- } catch (IOException e) {
- logger.error(
- "An I/O exception when files are uploaded to HDFS. Message is :"
- + e.getMessage());
- }
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
index cc312ea..535eaa5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.core.utils.KnowlegdeBase;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.core.pojo.KnowLedgeFileEntity;
import com.geedgenetworks.core.utils.HttpClientPoolUtil;
import org.slf4j.Logger;
@@ -17,7 +17,7 @@ public abstract class AbstractKnowledgeBase {
// 抽象类的构造函数
}
- abstract Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) ;
+ abstract Boolean updateKnowledgeBase(KnowledgeBaseConfig knowledgeConfig) ;
abstract String functionName();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
index 9bf0b9a..9d0816a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.utils.KnowlegdeBase;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.utils.IpLookupV2;
import lombok.Getter;
@@ -31,7 +31,7 @@ public class AsnKnowledgeBase extends AbstractKnowledgeBase {
- public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) {
+ public Boolean updateKnowledgeBase(KnowledgeBaseConfig knowledgeConfig) {
IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java
index b2a1077..2ef6674 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.utils.KnowlegdeBase;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.core.pojo.KnowLedgeFileEntity;
import com.geedgenetworks.utils.IpLookupV2;
import lombok.Getter;
@@ -26,7 +26,7 @@ public class GeoIpKnowledgeBase extends AbstractKnowledgeBase {
return instance;
}
@Override
- public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) {
+ public Boolean updateKnowledgeBase(KnowledgeBaseConfig knowledgeConfig) {
IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false);
for (int i = 0; i < knowledgeConfig.getFiles().size(); i++) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java
index b879e67..0f130f4 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java
@@ -3,8 +3,8 @@ package com.geedgenetworks.core.utils.KnowlegdeBase;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.EngineConfig;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.core.pojo.KnowLedgeFileEntity;
import com.geedgenetworks.core.pojo.KnowledgeBaseEntity;
import com.geedgenetworks.core.utils.HttpClientPoolUtil;
@@ -33,14 +33,14 @@ public class KnowledgeBaseUpdateJob implements Job {
private static KnowledgeBaseUpdateJob instance;
- private static EngineConfig engineConfig;
+ private static CommonConfig engineConfig;
public static synchronized KnowledgeBaseUpdateJob getInstance(RuntimeContext runtimeContext) {
if (instance == null) {
instance = new KnowledgeBaseUpdateJob();
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
- engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
+ engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
startTask();
}
return instance;
@@ -70,7 +70,7 @@ public class KnowledgeBaseUpdateJob implements Job {
knowledgeBaseEntity.setKnowLedgeFileEntityList(knowLedgeFileEntityList);
try {
- for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){
+ for(KnowledgeBaseConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){
if(name.equals(knowledgeConfig.getName())){
knowledgeBaseEntity.setKnowledgeConfig(knowledgeConfig);
break;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java
index 90bba33..b5c44fe 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.core.utils.hbase;
import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.CommonConfigOptions;
import com.github.rholder.retry.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -43,7 +44,7 @@ public class HbaseConnectBuilder {
* @return This HbaseConnectBuilder instance.
*/
public HbaseConnectBuilder loadDefaultConfig() {
- this.config.set("hbase.zookeeper.quorum", CommonConfig.ZOOKEEPER_QUORUM);
+ this.config.set("hbase.zookeeper.quorum", CommonConfigOptions.ZOOKEEPER_QUORUM.defaultValue());
return this;
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
index ef820e4..3d9a34e 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
@@ -1,10 +1,8 @@
package com.geedgenetworks.core.udf.test;
-import com.geedgenetworks.common.config.KnowledgeConfig;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.udf.AsnLookup;
-import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
index d65c431..f5c44e4 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.core.udf.test;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.udf.GeoIpLookup;
@@ -69,7 +69,7 @@ public class GeoIpLookupFunctionTest {
udfContext.getParameters().put("option","IP_TO_ASN");
udfContext.getParameters().put("vendor_id","tsg_asnlookup");*/
- KnowledgeConfig knowledgeConfig =new KnowledgeConfig();
+ KnowledgeBaseConfig knowledgeConfig =new KnowledgeBaseConfig();
knowledgeConfig.setName("tsg_geoiplookup");
knowledgeConfig.setType("geoiplookup");
knowledgeConfig.setFiles(Arrays.asList("acf1db8589c5e277-ead1a65e1c3973dc","acf1db8589c5e277-ead1a65e1c3973dc"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
index 524b199..d91be11 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
@@ -78,13 +78,14 @@ public class DomainFunctionTest {
public void testDomainFunctionFirstSignificantSubdomain() {
parameters.put("option", "FIRST_SIGNIFICANT_SUBDOMAIN");
+ udfContext.setParameters(parameters);
Domain domain = new Domain();
domain.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("domain", "www.baidu.com");
+ extractedFields.put("v1", "www.baidu.com");
event.setExtractedFields(extractedFields);
Event result1 = domain.evaluate(event);
- assertEquals("baidu.com", result1.getExtractedFields().get("domain1"));
+ assertEquals("baidu.com", result1.getExtractedFields().get("v2"));
}
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
index 39c825f..4886547 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
@@ -21,15 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class JsonExtractFunctionTest {
private static UDFContext udfContext;
- private static Map<String, Object> parameters ;
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- parameters = new HashMap<>();
- parameters.put("param","$.tags[?(@.tag=='device_group')][0].value");
- udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Arrays.asList("device_tag"));
- udfContext.setOutput_fields(Arrays.asList("device_group"));
}
@Test
@@ -46,11 +40,6 @@ public class JsonExtractFunctionTest {
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
jsonExtract.open(null, udfContext);
});
-
- udfContext.setLookup_fields(new ArrayList<>());
- udfContext.getLookup_fields().add("v1");
- udfContext.setOutput_fields(new ArrayList<>());
- udfContext.getOutput_fields().add("v2");
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("other","other");
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
@@ -64,6 +53,11 @@ public class JsonExtractFunctionTest {
JsonExtract jsonExtract = new JsonExtract();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("param","$.tags[?(@.tag=='device_group')][0].value");
+ udfContext.setParameters(parameters);
+ udfContext.setLookup_fields(Arrays.asList("device_tag"));
+ udfContext.setOutput_fields(Arrays.asList("device_group"));
jsonExtract.open(null, udfContext);
Event event = new Event();
String jsonString = "{\"device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-tsgx\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgx\\\"}]}\"}";
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
index 4edbba1..93acbdd 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
@@ -2,7 +2,6 @@ package com.geedgenetworks.core.udf.test.simple;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
-import com.geedgenetworks.core.udf.FromUnixTimestamp;
import com.geedgenetworks.core.udf.UnixTimestampConverter;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -13,7 +12,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class FromUnixTimestampConverterTest {
+public class UnixTimestampConverterTest {
private static UDFContext udfContext;
@@ -24,7 +23,7 @@ public class FromUnixTimestampConverterTest {
udfContext.setOutput_fields(Arrays.asList("output"));
}
@Test
- public void testFromUnixTimestampFunctionMstoS() throws Exception {
+ public void testUnixTimestampFunctionMstoS() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
@@ -42,7 +41,7 @@ public class FromUnixTimestampConverterTest {
}
@Test
- public void testFromUnixTimestampFunctionStoMs() throws Exception {
+ public void testUnixTimestampFunctionStoMs() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "milliseconds");
@@ -61,7 +60,7 @@ public class FromUnixTimestampConverterTest {
@Test
- public void testFromUnixTimestampFunctionStoS() throws Exception {
+ public void testUnixTimestampFunctionStoS() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
@@ -80,7 +79,7 @@ public class FromUnixTimestampConverterTest {
@Test
- public void testFromUnixTimestampFunctionMstoMs() throws Exception {
+ public void testUnixTimestampFunctionMstoMs() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "milliseconds");
diff --git a/groot-example/src/main/resources/examples/inline-to-console-job.yaml b/groot-example/src/main/resources/examples/inline-to-console-job.yaml
index 4abac01..8bf6e7b 100644
--- a/groot-example/src/main/resources/examples/inline-to-console-job.yaml
+++ b/groot-example/src/main/resources/examples/inline-to-console-job.yaml
@@ -54,6 +54,17 @@ processing_pipelines:
output_fields: [ log_id ]
parameters:
data_center_id_num: 1
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ option: IP_TO_ASN
+ vendor_id: tsg_asnlookup
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ processing_time ]
+ parameters:
+ precision: milliseconds
+
sinks:
print_sink:
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
index 4401213..6f5c4b4 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
@@ -5,15 +5,20 @@ import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashMap;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
public class JsonEventDeserializationSchema implements DeserializationSchema<Event> {
+ private static final Logger LOG = LoggerFactory.getLogger(JsonEventDeserializationSchema.class);
+ private static final int MAX_CHARS_LENGTH = 1024 * 32;
private final StructType dataType;
private final boolean ignoreParseErrors;
private final JsonToMapDataConverter converter;
+ private final char[] tmpChars = new char[MAX_CHARS_LENGTH];
public JsonEventDeserializationSchema(StructType dataType, boolean ignoreParseErrors) {
this.dataType = dataType;
@@ -22,16 +27,19 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
}
@Override
- public Event deserialize(byte[] message) throws IOException {
+ public Event deserialize(byte[] bytes) throws IOException {
Map<String, Object> map;
+ String message = decodeUTF8(bytes, 0, bytes.length);
+
// 没有配置type, 不进行类型校验和转换
if(dataType == null){
try {
map = JSON.parseObject(message);
} catch (Exception e) {
+ LOG.error(String.format("json解析失败for:%s", message), e);
if(ignoreParseErrors){
- map = new HashMap<>();
+ return null;
}else{
throw new IOException(e);
}
@@ -47,6 +55,106 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
return event;
}
+ private String decodeUTF8(byte[] input, int offset, int byteLen) {
+ char[] chars = MAX_CHARS_LENGTH < byteLen? new char[byteLen]: tmpChars;
+ int len = decodeUTF8Strict(input, offset, byteLen, chars);
+ if (len < 0) {
+ return defaultDecodeUTF8(input, offset, byteLen);
+ }
+ return new String(chars, 0, len);
+ }
+
/**
 * Strict, allocation-free UTF-8 decoder modeled on the JDK's internal UTF_8 decoder.
 * Decodes {@code len} bytes of {@code sa} starting at {@code sp} into {@code da}.
 *
 * @param sa  source bytes
 * @param sp  start offset into {@code sa}
 * @param len number of bytes to decode
 * @param da  destination char buffer; caller guarantees capacity (chars never
 *            exceed the byte count for valid UTF-8)
 * @return number of chars written, or -1 if any malformed, overlong, truncated,
 *         or surrogate-encoding sequence is found (caller then falls back to a
 *         lenient decoder)
 */
private static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) {
    final int sl = sp + len;
    int dp = 0;
    int dlASCII = Math.min(len, da.length);

    // ASCII only optimized loop: bytes >= 0 are single-byte code points,
    // copied with no branching on sequence length.
    while (dp < dlASCII && sa[sp] >= 0) {
        da[dp++] = (char) sa[sp++];
    }

    while (sp < sl) {
        int b1 = sa[sp++];
        if (b1 >= 0) {
            // 1 byte, 7 bits: 0xxxxxxx
            da[dp++] = (char) b1;
        } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) {
            // 2 bytes, 11 bits: 110xxxxx 10xxxxxx
            // (b1 & 0x1e) != 0 rejects overlong encodings C0/C1.
            if (sp < sl) {
                int b2 = sa[sp++];
                if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2)
                    return -1;
                } else {
                    // XOR folds away the 110..... / 10...... marker bits.
                    da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80)));
                }
                continue;
            }
            return -1; // truncated 2-byte sequence
        } else if ((b1 >> 4) == -2) {
            // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
            if (sp + 1 < sl) {
                int b2 = sa[sp++];
                int b3 = sa[sp++];
                // First clause rejects overlong E0 80..9F sequences.
                if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80)
                        || (b2 & 0xc0) != 0x80
                        || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3)
                    return -1;
                } else {
                    char c =
                            (char)
                                    ((b1 << 12)
                                            ^ (b2 << 6)
                                            ^ (b3
                                                    ^ (((byte) 0xE0 << 12)
                                                            ^ ((byte) 0x80 << 6)
                                                            ^ ((byte) 0x80))));
                    if (Character.isSurrogate(c)) {
                        // UTF-8 must not encode surrogate code points directly.
                        return -1;
                    } else {
                        da[dp++] = c;
                    }
                }
                continue;
            }
            return -1; // truncated 3-byte sequence
        } else if ((b1 >> 3) == -2) {
            // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
            if (sp + 2 < sl) {
                int b2 = sa[sp++];
                int b3 = sa[sp++];
                int b4 = sa[sp++];
                int uc =
                        ((b1 << 18)
                                ^ (b2 << 12)
                                ^ (b3 << 6)
                                ^ (b4
                                        ^ (((byte) 0xF0 << 18)
                                                ^ ((byte) 0x80 << 12)
                                                ^ ((byte) 0x80 << 6)
                                                ^ ((byte) 0x80))));
                // isMalformed4 and shortest form check:
                // isSupplementaryCodePoint also rejects overlong encodings
                // (uc < U+10000) and out-of-range values (uc > U+10FFFF).
                if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80)
                        || !Character.isSupplementaryCodePoint(uc)) {
                    return -1;
                } else {
                    // Supplementary code point: emit as a surrogate pair.
                    da[dp++] = Character.highSurrogate(uc);
                    da[dp++] = Character.lowSurrogate(uc);
                }
                continue;
            }
            return -1; // truncated 4-byte sequence
        } else {
            return -1; // stray continuation byte or invalid lead byte (F8..FF)
        }
    }
    return dp;
}
+
// Lenient fallback: the String(byte[],...) constructor replaces malformed
// UTF-8 sequences with U+FFFD instead of failing, matching pre-optimization behavior.
private static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
    return new String(bytes, offset, len, StandardCharsets.UTF_8);
}
+
@Override
public boolean isEndOfStream(Event nextElement) {
    // The JSON stream is unbounded; no element ever terminates it.
    return false;
}
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
index 77fbb8e..5bec4e8 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
@@ -23,7 +23,7 @@ public class JsonToMapDataConverter implements Serializable {
this.filedConverters = Arrays.stream(dataType.fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType)));
}
- public Map<String, Object> convert(byte[] message){
+ public Map<String, Object> convert(String message){
try (JSONReader reader = JSONReader.of(message)) {
if (!reader.nextIfMatch('{')) {
throw new JSONException("object not start with {");
@@ -46,12 +46,12 @@ public class JsonToMapDataConverter implements Serializable {
return obj;
} catch (Exception e) {
- LOG.error("json解析失败", e);
+ LOG.error(String.format("json解析失败for:%s", message), e);
if(ignoreParseErrors){
- return new HashMap<>();
+ return null;
}
- throw new UnsupportedOperationException("json格式不正确:" + new String(message, StandardCharsets.UTF_8));
+ throw new UnsupportedOperationException("json格式不正确:" + message);
}
}