65 files changed, 617 insertions, 1593 deletions
diff --git a/config/grootstream_job_template.yaml b/config/grootstream_job_template.yaml
index 3b798b2..c5bc99b 100644
--- a/config/grootstream_job_template.yaml
+++ b/config/grootstream_job_template.yaml
@@ -29,7 +29,6 @@ sources:
   inline_source:
     type : inline
     fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
-    # watermark_timestamp: recv_time
     # watermark_timestamp_unit: s
     # watermark_lag: 60
@@ -85,6 +84,7 @@ preprocessing_pipelines:
       lookup_fields: [ '' ]
       output_fields: [ '' ]
       filter: event.common_schema_type == 'BASE'
+
 processing_pipelines:
   session_record_processor: # [object] Processing Pipeline
     type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
@@ -129,7 +129,7 @@ processing_pipelines:
         parameters:
          param: $.tags[?(@.tag=='device_group')][0].value
-      - function: UNIX_TIMESTAMP_FUNCTION
+      - function: CURRENT_UNIX_TIMESTAMP
        output_fields: [ processing_time ]
        parameters:
          precision: seconds
@@ -184,7 +184,6 @@ processing_pipelines:
        parameters:
          path: [ props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file ]
-
 sinks:
   kafka_sink_a:
     type: kafka
@@ -247,4 +246,3 @@ application: # [object] Application Configuration
     - name: kafka_sink_b
       parallelism: 1
       downstream: []
-
diff --git a/config/udf.plugins b/config/udf.plugins
index d22c057..07867f6 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -3,9 +3,9 @@ com.geedgenetworks.core.udf.Drop
 com.geedgenetworks.core.udf.AsnLookup
 com.geedgenetworks.core.udf.Eval
 com.geedgenetworks.core.udf.JsonExtract
-com.geedgenetworks.core.udf.UnixTimestamp
+com.geedgenetworks.core.udf.CurrentUnixTimestamp
 com.geedgenetworks.core.udf.Domain
 com.geedgenetworks.core.udf.DecodeBase64
 com.geedgenetworks.core.udf.GeoIpLookup
 com.geedgenetworks.core.udf.PathCombine
-com.geedgenetworks.core.udf.UnixTimestampConverter
+com.geedgenetworks.core.udf.UnixTimestampConverter
\ No newline at end of file
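The renamed plugin above is loaded purely by reflection: getClassReflect() (in GrootYamlParser, deleted later in this diff) instantiates each listed class via its no-arg constructor and calls functionName() to register it. A minimal sketch of what a conforming CurrentUnixTimestamp could look like — the real UDF base class is not shown in this diff, so everything beyond the reflective contract is assumed:

package com.geedgenetworks.core.udf;

// Hypothetical sketch: only the public no-arg constructor and functionName()
// are implied by the reflective loading in getClassReflect(); evaluate() is assumed.
public class CurrentUnixTimestamp {

    // The name referenced from job YAML, e.g. `- function: CURRENT_UNIX_TIMESTAMP`.
    public String functionName() {
        return "CURRENT_UNIX_TIMESTAMP";
    }

    // Illustrative behavior: current epoch time in seconds
    // (the template above sets `precision: seconds`).
    public long evaluate() {
        return System.currentTimeMillis() / 1000L;
    }
}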
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
index ae904a7..3fb674d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/CommandArgs.java
@@ -28,12 +28,12 @@ public abstract class CommandArgs {

     @Parameter(
             names = {"--check"},
-            description = "check config")
+            description = "Whether to check the config")
     protected boolean checkConfig = false;

-
     @Parameter(names = {"-i", "--variable"}, splitter = ParameterSplitter.class,
-            description = "user-defined parameters , such as -i data_center=bj")
+            description = "user-defined parameters, such as -i data_center=bj. " +
+                    "',' is used as the separator; inside \"\", ',' is treated as a normal character instead of a delimiter.")
     protected List<String> variables = Collections.emptyList();

     @Parameter(names = {"-n", "--name"},
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ParameterSplitter.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ParameterSplitter.java
index 55e507b..5c5bb9e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ParameterSplitter.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ParameterSplitter.java
@@ -1,31 +1,32 @@
+
 package com.geedgenetworks.bootstrap.command;

 import com.beust.jcommander.converters.IParameterSplitter;

 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;

 public class ParameterSplitter implements IParameterSplitter {
+
     @Override
     public List<String> split(String value) {
-        if (!value.contains(",")) {
-            return Collections.singletonList(value);
-        }
-
         List<String> result = new ArrayList<>();
         StringBuilder currentToken = new StringBuilder();
         boolean insideBrackets = false;
+        boolean insideQuotes = false;

         for (char c : value.toCharArray()) {
+
             if (c == '[') {
                 insideBrackets = true;
             } else if (c == ']') {
                 insideBrackets = false;
+            } else if (c == '"') {
+                insideQuotes = !insideQuotes;
             }

-            if (c == ',' && !insideBrackets) {
+            if (c == ',' && !insideQuotes && !insideBrackets) {
                 result.add(currentToken.toString().trim());
                 currentToken = new StringBuilder();
             } else {
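A usage sketch of the new quote-aware splitting, assuming the tail of split() cut off by the hunk above appends non-delimiter characters and flushes the final token:

package com.geedgenetworks.bootstrap.command;

import java.util.List;

public class ParameterSplitterDemo {
    public static void main(String[] args) {
        ParameterSplitter splitter = new ParameterSplitter();

        // Plain commas still delimit values:
        List<String> a = splitter.split("data_center=bj,idc=01");
        System.out.println(a); // [data_center=bj, idc=01]

        // Commas inside double quotes are now kept as literal characters:
        List<String> b = splitter.split("hosts=\"10.0.0.1,10.0.0.2\",port=9092");
        System.out.println(b); // [hosts="10.0.0.1,10.0.0.2", port=9092]

        // Bracketed lists were already protected before this change:
        List<String> c = splitter.split("ids=[1,2,3],name=x");
        System.out.println(c); // [ids=[1,2,3], name=x]
    }
}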
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index e03181b..ba0d1a8 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -114,7 +114,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
             environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         }
         configuration.setString(Constants.SYSPROP_UDF_PLUGIN_CONFIG, JSON.toJSONString(grootStreamConfig.getUDFPluginConfig()));
-        configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getEngineConfig()));
+        configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(grootStreamConfig.getCommonConfig()));
         environment.getConfig().enableObjectReuse();
         environment.getConfig().setGlobalJobParameters(configuration);
         setTimeCharacteristic();
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
index f0edbd2..4fdf0c6 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
@@ -1,141 +1,29 @@
 package com.geedgenetworks.common.config;

-import org.yaml.snakeyaml.Yaml;
+import lombok.Data;

-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;

-import static com.geedgenetworks.common.config.GrootYamlParser.getClassReflect;
-import static com.geedgenetworks.common.config.GrootYamlParser.loadYaml;
+import static com.google.common.base.Preconditions.checkNotNull;

-/** Created by wk on 2021/1/6. */
-public class CommonConfig {
-    public static Map<String, Object> yamlData;
-    public static List<String> udfPluginData = new ArrayList<>();
+/**
+ * Describe the common config of groot stream (grootstream.yaml).
+ */
+@Data
+public class CommonConfig implements Serializable {

-    static {
-        try {
-            Yaml yaml = new Yaml();
-            yamlData =
-                    yaml.load(
-                            GrootYamlParser.class
-                                    .getClassLoader()
-                                    .getResourceAsStream("groot.yaml"));
+    private List<KnowledgeBaseConfig> knowledgeBaseConfig = CommonConfigOptions.KNOWLEDGE_BASE.defaultValue();
+    private Map<String,String> propertiesConfig = CommonConfigOptions.PROPERTIES.defaultValue();

-            InputStream inputStreamUdf = GrootYamlParser.class.getClassLoader().getResourceAsStream("groot-platform-plugin");
-            if (inputStreamUdf != null) {
-                BufferedReader br = new BufferedReader(new InputStreamReader(inputStreamUdf));
-                String line;
-                while ((line = br.readLine()) != null) {
-                    // Parse each line and add it to the list
-                    udfPluginData.add(line);
-                }
-                br.close();
-            } else {
-                // FIXME
-                System.out.println("udf-plugin config file does not exist");
-
-            }
-        } catch (Exception e) {
-            // FIXME
-            e.printStackTrace();
-        }
+    public void setKnowledgeBaseConfig(List<KnowledgeBaseConfig> knowledgeBaseConfig) {
+        checkNotNull(knowledgeBaseConfig, CommonConfigOptions.KNOWLEDGE_BASE + "knowledgeConfig should not be null");
+        this.knowledgeBaseConfig = knowledgeBaseConfig;
     }

-    public static final Map<String, String> UDF_CLASS_REFLECT = getClassReflect(udfPluginData);
-
-    public static final String NACOS_SERVER_ADDR =
-            GrootYamlParser.getStringProperty("nacos.server_addr", yamlData);
-    public static final String NACOS_USERNAME =
-            GrootYamlParser.getStringProperty("nacos.username", yamlData);
-    public static final String NACOS_PASSWORD =
-            GrootYamlParser.getStringProperty("nacos.password", yamlData);
-    public static final String NACOS_NAMESPACE =
-            GrootYamlParser.getStringProperty("nacos.namespace", yamlData);
-    public static final String NACOS_DATA_ID =
-            GrootYamlParser.getStringProperty("nacos.data_id", yamlData);
-    public static final String NACOS_GROUP =
-            GrootYamlParser.getStringProperty("nacos.group", yamlData);
-    public static final long NACOS_READ_TIMEOUT =
-            GrootYamlParser.getIntProperty("nacos.read_timeout", yamlData);
-
-    public static final String CONSUL_SERVER_ADDR =
-            GrootYamlParser.getStringProperty("consul.server_addr", yamlData);
-    public static final int CONSUL_SERVER_PORT =
-            GrootYamlParser.getIntProperty("consul.server_port", yamlData);
-    public static final String CONSUL_TOKEN =
-            GrootYamlParser.getStringProperty("consul.token", yamlData);
-
-    public static final String HOS_TOKEN = GrootYamlParser.getStringProperty("hos.token", yamlData);
-    public static final int KNOWLEDGE_FILE_CHECK_NUMBER =
-            GrootYamlParser.getIntProperty("knowledge.file_check_number", yamlData);
-    public static final String ID_ARRAY =
-            GrootYamlParser.getStringProperty("knowledge.id_array", yamlData);
-    public static final int HTTP_POOL_MAX_CONNECTION =
-            GrootYamlParser.getIntProperty("http.pool.max_connection", yamlData);
-    public static final int HTTP_POOL_MAX_PER_ROUTE =
-            GrootYamlParser.getIntProperty("http.pool.max_per_route", yamlData);
-    public static final int HTTP_POOL_REQUEST_TIMEOUT =
-            GrootYamlParser.getIntProperty("http.pool.request_timeout", yamlData);
-    public static final int HTTP_POOL_CONNECT_TIMEOUT =
-            GrootYamlParser.getIntProperty("http.pool.connect_timeout", yamlData);
-    public static final int HTTP_POOL_RESPONSE_TIMEOUT =
-            GrootYamlParser.getIntProperty("http.pool.response_timeout", yamlData);
-    public static final String GTPC_FAMILY_NAME = "gtp";
-    public static final String RADIUS_FAMILY_NAME = "radius";
-    public static final String COMMON_FAMILY_NAME = "common";
-    public static final String DEFAULT_RELATIONSHIP_MODULE = "vsys";
-    public static final String ZOOKEEPER_QUORUM =
-            GrootYamlParser.getStringProperty("zookeeper.quorum", yamlData);
-    public static final long DATA_CENTER_ID_NUM =
-            GrootYamlParser.getIntProperty("snowid.data_center_id_num", yamlData);
-    public static final String KNOWLEDGEBASE_FILE_STORAGE_PATH =
-            GrootYamlParser.getStringProperty("knowledgebase.file_storage_path", yamlData);
-    public static final String KNOWLEDGEBASE_FILE_STORAGE_TYPE =
-            GrootYamlParser.getStringProperty("knowledgebase.file_storage_type", yamlData);
-    public static final String KNOWLEDGEBASE_TYPE_LIST =
-            GrootYamlParser.getStringProperty("knowledgebase.type_list", yamlData);
-    public static final String KNOWLEDGEBASE_NAME_LIST =
-            GrootYamlParser.getStringProperty("knowledgebase.name_list", yamlData);
-    public static final String HDFS_SERVERS =
-            GrootYamlParser.getStringProperty("hdfs.servers", yamlData);
-    /*  public static final String IPV4_USER_DEFINED_ID = GrootYamlParser.getStringProperty( "knowledge.ipv4_user_defined_id");
-    public static final String IPV6_USER_DEFINED_ID = GrootYamlParser.getStringProperty( "knowledge.ipv6_user_defined_id");
-    public static final String IPV4_BUILT_IN_ID = GrootYamlParser.getStringProperty( "knowledge.ipv4_built_in_id");
-    public static final String IPV6_BUILT_IN_ID = GrootYamlParser.getStringProperty( "knowledge.ipv6_built_in_id");
-    public static final String ASN_V4_ID = GrootYamlParser.getStringProperty( "knowledge.asn_v4_id");
-    public static final String ASN_V6_ID = GrootYamlParser.getStringProperty( "knowledge.asn_v6_id");*/
-
-    public static final Map<String, Object> KNOWLEDGE_IPLOOKUP_MAP =
-            GrootYamlParser.getMap("knowledge.iplookup", yamlData);
-    public static final Map<String, Object> KNOWLEDGE_ASNLOOKUP_MAP =
-            GrootYamlParser.getMap("knowledge.asnlookup", yamlData);
-
-    public static final Integer HBASE_TICK_TUPLE_FREQ_SECS =
-            GrootYamlParser.getIntProperty("hbase.tick_tuple_freq_secs", yamlData);
-    public static final Integer HBASE_GTPC_SCAN_MAX_ROWS =
-            GrootYamlParser.getIntProperty("hbase.gtpc_scan_max_rows", yamlData);
-    public static final Integer HBASE_RADIUS_SCAN_MAX_ROWS =
-            GrootYamlParser.getIntProperty("hbase.radius_scan_max_rows", yamlData);
-    public static final String HBASE_RADIUS_TABLE_NAME =
-            GrootYamlParser.getStringProperty("hbase.radius_table_name", yamlData);
-    public static final String HBASE_GTPC_TABLE_NAME =
-            GrootYamlParser.getStringProperty("hbase.gtpc_table_name", yamlData);
-    public static final String HBASE_RPC_TIMEOUT =
-            GrootYamlParser.getStringProperty("hbase.rpc_timeout", yamlData);
-    public static final String DATA_RELATIONSHIP_MODEL =
-            GrootYamlParser.getStringProperty("data.relationship_model", yamlData);
-
-    public static void main(String[] args) {
-        System.out.println("");
-    }
 }
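The net effect of this rewrite: CommonConfig is no longer a static registry of groot.yaml constants but a plain serializable bean, which is why JobRuntimeEnvironment above can ship it to tasks as JSON. A minimal sketch (the property key is illustrative):

CommonConfig config = new CommonConfig();
config.setPropertiesConfig(Map.of("data_center", "bj"));   // arbitrary engine-wide properties
String json = JSON.toJSONString(config);                   // shippable with the job, as in JobRuntimeEnvironment
// config.setKnowledgeBaseConfig(null);                    // would fail fast via checkNotNull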
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
new file mode 100644
index 0000000..6dc9469
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
@@ -0,0 +1,101 @@
+package com.geedgenetworks.common.config;
+
+import com.hazelcast.internal.config.AbstractDomConfigProcessor;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import lombok.extern.slf4j.Slf4j;
+import org.w3c.dom.Node;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.hazelcast.internal.config.DomConfigHelper.*;
+
+@Slf4j
+public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
+
+    private final GrootStreamConfig config;
+
+    CommonConfigDomProcessor(boolean domLevel3, GrootStreamConfig config) {
+        super(domLevel3);
+        this.config = config;
+    }
+
+    @Override
+    public void buildConfig(Node rootNode) {
+        final CommonConfig commonConfig = config.getCommonConfig();
+        for (Node node : childElements(rootNode)) {
+            String name = cleanNodeName(node);
+            if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) {
+                commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node));
+            } else if (CommonConfigOptions.PROPERTIES.key().equals(name)) {
+                commonConfig.setPropertiesConfig(parsePropertiesConfig(node));
+            } else {
+                log.warn("Unrecognized configuration element: " + name);
+            }
+        }
+    }
+
+    private Map<String, String> parsePropertiesConfig(Node properties) {
+        Map<String, String> propertiesMap = new HashMap<>();
+        for (Node node : childElements(properties)) {
+            String name = cleanNodeName(node);
+            propertiesMap.put(name, getTextContent(node));
+        }
+        return propertiesMap;
+    }
+
+    private List<KnowledgeBaseConfig> parseKnowledgeBaseConfig(Node kbRootNode) {
+        List<KnowledgeBaseConfig> knowledgeConfigList = new ArrayList<>();
+        for (Node node : childElements(kbRootNode)) {
+            knowledgeConfigList.add(parseKnowledgeBaseConfigAsObject(node));
+        }
+        return knowledgeConfigList;
+    }
+
+    private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) {
+        KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig();
+        for (Node node : childElements(kbNode)) {
+            String name = cleanNodeName(node);
+            if (CommonConfigOptions.KNOWLEDGE_BASE_NAME.key().equals(name)) {
+                knowledgeBaseConfig.setName(getTextContent(node));
+            } else if (CommonConfigOptions.KNOWLEDGE_BASE_TYPE.key().equals(name)) {
+                knowledgeBaseConfig.setType(getTextContent(node));
+            } else if (CommonConfigOptions.KNOWLEDGE_BASE_FILES.key().equals(name)) {
+                knowledgeBaseConfig.setFiles(parseKnowledgeBaseFilesConfig(node));
+            } else if (CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.key().equals(name)) {
+                knowledgeBaseConfig.setProperties(parseKnowledgeBasePropertiesConfig(node));
+            } else {
+                log.warn("Unrecognized configuration element: " + name);
+            }
+        }
+        return knowledgeBaseConfig;
+    }
+
+    private Map<String, String> parseKnowledgeBasePropertiesConfig(Node properties) {
+        Map<String, String> propertiesMap = new HashMap<>();
+        for (Node node : childElements(properties)) {
+            String name = cleanNodeName(node);
+            propertiesMap.put(name, getTextContent(node));
+        }
+        return propertiesMap;
+    }
+
+    private List<String> parseKnowledgeBaseFilesConfig(Node files) {
+        List<String> asnLookupConfigList = new ArrayList<>();
+        for (Node node : childElements(files)) {
+            String value = node.getNodeValue();
+            asnLookupConfigList.add(value);
+        }
+        return asnLookupConfigList;
+    }
+}
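What the processor extracts from a properties section, demonstrated on a plain W3C DOM (Hazelcast converts the YAML to a DOM before this processor runs; the element names below are illustrative):

import org.w3c.dom.Node;

import javax.xml.parsers.DocumentBuilderFactory;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;

public class PropertiesNodeDemo {
    public static void main(String[] args) throws Exception {
        String xml = "<properties><data_center>bj</data_center><job_mode>stream</job_mode></properties>";
        Node root = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes())).getDocumentElement();

        // Same loop shape as parsePropertiesConfig: child element name -> text content.
        Map<String, String> props = new HashMap<>();
        for (Node n = root.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n.getNodeType() == Node.ELEMENT_NODE) {
                props.put(n.getNodeName(), n.getTextContent());
            }
        }
        System.out.println(props); // e.g. {data_center=bj, job_mode=stream}
    }
}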
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigLocator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java
index 3962324..5302cc2 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigLocator.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java
@@ -5,9 +5,9 @@ import com.hazelcast.internal.config.AbstractConfigLocator;

 import static com.hazelcast.internal.config.DeclarativeConfigUtil.YAML_ACCEPTED_SUFFIXES;

-public final class GrootStreamConfigLocator extends AbstractConfigLocator {
+public final class CommonConfigLocator extends AbstractConfigLocator {

-    public GrootStreamConfigLocator() {
+    public CommonConfigLocator() {
     }

     @Override
     public boolean locateFromSystemProperty() {
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
new file mode 100644
index 0000000..9105afe
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
@@ -0,0 +1,64 @@
+package com.geedgenetworks.common.config;
+
+import com.alibaba.fastjson2.TypeReference;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CommonConfigOptions {
+
+    public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES =
+            Options.key("properties")
+                    .mapType()
+                    .defaultValue(new HashMap<String, String>())
+                    .withDescription("The properties of knowledgebase");
+
+    public static final Option<String> KNOWLEDGE_BASE_NAME =
+            Options.key("name")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("The name of knowledgebase.");
+
+    public static final Option<String> KNOWLEDGE_BASE_TYPE =
+            Options.key("type")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("The type of knowledgebase.");
+
+    public static final Option<List<String>> KNOWLEDGE_BASE_FILES =
+            Options.key("files")
+                    .listType()
+                    .defaultValue(new ArrayList<String>())
+                    .withDescription("The files of knowledgebase.");
+
+    public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE =
+            Options.key("fs_type")
+                    .stringType()
+                    .defaultValue("localfile")
+                    .withDescription("The fs type of knowledge base storage.");
+
+    public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_DEFAULT_PATH =
+            Options.key("fs_default_path")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("The default path of knowledge base storage.");
+
+    public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE =
+            Options.key("knowledge_base")
+                    .type(new TypeReference<List<KnowledgeBaseConfig>>() {})
+                    .noDefaultValue()
+                    .withDescription("The knowledge base configuration.");
+
+    public static final Option<Map<String, String>> PROPERTIES =
+            Options.key("properties")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription("The general properties of grootstream");
+
+    public static final Option<String> ZOOKEEPER_QUORUM =
+            Options.key("quorum")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("The quorum of zookeeper.");
+}
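For reference, adding a new option follows the same fluent pattern already used above; the option below is hypothetical and only illustrates the Option/Options API as it appears in this file:

// Hypothetical option — not part of this commit.
public static final Option<Integer> KNOWLEDGE_BASE_RELOAD_INTERVAL =
        Options.key("reload_interval")
                .intType()
                .defaultValue(300)
                .withDescription("Hypothetical: how often (seconds) knowledge base files are reloaded.");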
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
index d7f7381..a967ae5 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
@@ -26,7 +26,7 @@ public class ConfigProvider {
     }

     public static GrootStreamConfig locateAndGetGrootStreamConfig(Properties properties) {
-        GrootStreamConfigLocator yamlConfigLocator = new GrootStreamConfigLocator();
+        CommonConfigLocator yamlConfigLocator = new CommonConfigLocator();
         GrootStreamConfig config;
         validateSuffixInSystemProperty(Constants.SYSPROP_GROOTSTREAM_CONFIG);
         if (yamlConfigLocator.locateFromSystemProperty()) {
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConsulConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ConsulConfig.java
deleted file mode 100644
index d8d5b7b..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConsulConfig.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-@Data
-public class ConsulConfig implements Serializable {
-    public String serverAddr = ServerConfigOptions.CONSUL_SERVER_ADDR.defaultValue();
-    public int serverPort = ServerConfigOptions.CONSUL_SERVER_PORT.defaultValue();
-    public String token = ServerConfigOptions.CONSUL_TOKEN.defaultValue();
-
-    public void setServerAddr(String serverAddr) {
-        checkNotNull(serverAddr, ServerConfigOptions.CONSUL_SERVER_ADDR + "serverAddr should not be null");
-        this.serverAddr = serverAddr;
-    }
-
-    public void setToken(String token) {
-        checkNotNull(token, ServerConfigOptions.CONSUL_TOKEN + "token should not be null");
-        this.token = token;
-    }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
deleted file mode 100644
index a286086..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Describe the common config of groot stream (groot.yaml).
- */
-@Data
-public class EngineConfig implements Serializable {
-
-    private HttpConPoolConfig httpConPoolConfig = ServerConfigOptions.HTTP_CON_POOL.defaultValue();
-    private List<KnowledgeConfig> knowledgeBaseConfig = ServerConfigOptions.KNOWLEDGE_BASE.defaultValue();
-    private NacosConfig nacosConfig = ServerConfigOptions.NACOS.defaultValue();
-    private ConsulConfig consulConfig = ServerConfigOptions.CONSUL.defaultValue();
-    private ZookeeperConfig zookeeperConfig = ServerConfigOptions.ZOOKEEPER.defaultValue();
-    private HdfsConfig hdfsConfig = ServerConfigOptions.HDFS.defaultValue();
-    private Map<String,String> propertiesConfig = ServerConfigOptions.PROPERTIES.defaultValue();
-
-    public void setKnowledgeBaseConfig(List<KnowledgeConfig> knowledgeBaseConfig) {
-        checkNotNull(knowledgeBaseConfig, ServerConfigOptions.KNOWLEDGE_BASE + "knowledgeConfig should not be null");
-        this.knowledgeBaseConfig = knowledgeBaseConfig;
-    }
-}
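Despite the locator rename and the config-class deletions above, the public entry point in ConfigProvider is unchanged for callers:

Properties properties = System.getProperties();
GrootStreamConfig config = ConfigProvider.locateAndGetGrootStreamConfig(properties);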
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
index 00d8319..189b05b 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
@@ -9,7 +9,7 @@ import java.util.List;

 @Slf4j
 public class GrootStreamConfig {
-    private final EngineConfig engineConfig = new EngineConfig();
+    private final CommonConfig commonConfig = new CommonConfig();

     private Config hazelcastConfig;

@@ -34,7 +34,6 @@
     }

-
     /**
      * Returns the absolute path for `groot-stream.home` based from the system property {@link
      * GrootStreamProperties#GROOTSTREAM_HOME}
@@ -53,8 +52,8 @@
     public void setHazelcastConfig(Config hazelcastConfig) {
         this.hazelcastConfig = hazelcastConfig;
     }
-    public EngineConfig getEngineConfig() {
-        return engineConfig;
+    public CommonConfig getCommonConfig() {
+        return commonConfig;
     }
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
index ee76cb6..e948e8a 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
@@ -19,13 +19,13 @@
     private final InputStream in;

     public GrootStreamConfigBuilder() {
-        this((GrootStreamConfigLocator) null);
+        this((CommonConfigLocator) null);
     }

-    public GrootStreamConfigBuilder(GrootStreamConfigLocator locator) {
+    public GrootStreamConfigBuilder(CommonConfigLocator locator) {
         if (locator == null) {
-            locator = new GrootStreamConfigLocator();
+            locator = new CommonConfigLocator();
             locator.locateEverywhere();
         }
         this.in = locator.getIn();
@@ -46,16 +46,16 @@

     public GrootStreamConfig build(GrootStreamConfig config) {
         try {
-            parseAndBuildConfig(config);
+            parseAndBuildCommonConfig(config);
+            parseAndBuildUDFPluginConfig(config);
+            parseAndBuildHazelcastConfig(config);
         } catch (Exception e) {
             throw ExceptionUtil.rethrow(e);
         }
-        config.setHazelcastConfig(ConfigProvider.locateAndGetMemberConfig(getProperties()));
-        config.setUDFPluginConfig(ConfigProvider.locateAndGetUDFPluginConfig(getProperties()));
         return config;
     }

-    private void parseAndBuildConfig(GrootStreamConfig config) throws Exception {
+    private void parseAndBuildCommonConfig(GrootStreamConfig config) throws Exception {
         YamlMapping yamlRootNode;
         try {
             yamlRootNode = (YamlMapping) YamlLoader.load(in);
@@ -67,6 +67,7 @@
         }

         YamlNode grootStreamRoot = yamlRootNode.childAsMapping(GrootStreamConfigSections.GROOTSTREAM.name);
+
         if (grootStreamRoot == null) {
             grootStreamRoot = yamlRootNode;
         }
@@ -77,8 +78,19 @@
         replaceVariables(w3cRootNode);
         importDocuments(grootStreamRoot);

-        new GrootStreamDomConfigProcessor(true, config).buildConfig(w3cRootNode);
+        new CommonConfigDomProcessor(true, config).buildConfig(w3cRootNode);
+    }
+
+    private void parseAndBuildHazelcastConfig(GrootStreamConfig config){
+        config.setHazelcastConfig(ConfigProvider.locateAndGetMemberConfig(getProperties()));
     }

+    private void parseAndBuildUDFPluginConfig(GrootStreamConfig config){
+        config.setUDFPluginConfig(ConfigProvider.locateAndGetUDFPluginConfig(getProperties()));
+    }
+
+
+
+
     public GrootStreamConfigBuilder setProperties(Properties properties) {
         if (properties == null) {
             properties = System.getProperties();
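The build sequence is now explicit and ordered. A usage sketch (assuming GrootStreamConfig has an accessible constructor, which this diff does not show):

GrootStreamConfig config = new GrootStreamConfigBuilder()
        .setProperties(System.getProperties())
        .build(new GrootStreamConfig());
// build(...) now runs, in order:
//   parseAndBuildCommonConfig(config);    // grootstream.yaml -> CommonConfig
//   parseAndBuildUDFPluginConfig(config); // udf.plugins -> UDF plugin config
//   parseAndBuildHazelcastConfig(config); // hazelcast member config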
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigSections.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigSections.java
index d0a35b8..d67c97e 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigSections.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigSections.java
@@ -2,8 +2,7 @@ package com.geedgenetworks.common.config;

 /** Configuration sections for Hazelcast GrootStream shared by YAML based configurations */
 enum GrootStreamConfigSections {
-    GROOTSTREAM("grootstream", false),
-    ENGINE("engine", false);
+    GROOTSTREAM("grootstream", false);

     final String name;
     final boolean multipleOccurrence;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamDomConfigProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamDomConfigProcessor.java
deleted file mode 100644
index c3f7a20..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamDomConfigProcessor.java
+++ /dev/null
@@ -1,235 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.hazelcast.internal.config.AbstractDomConfigProcessor;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
-import lombok.extern.slf4j.Slf4j;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Node;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static com.hazelcast.internal.config.DomConfigHelper.*;
-
-@Slf4j
-public class GrootStreamDomConfigProcessor extends AbstractDomConfigProcessor {
-    // TODO Logging convention: use the slf4j facade instead
-    private static final ILogger LOGGER = Logger.getLogger(GrootStreamDomConfigProcessor.class);
-
-    private final GrootStreamConfig config;
-
-    GrootStreamDomConfigProcessor(boolean domLevel3, GrootStreamConfig config) {
-        super(domLevel3);
-        this.config = config;
-    }
-
-/*    @Override
-    public void buildConfig(Node rootNode) {
-        for (Node node : childElements(rootNode)) {
-            String nodeName = cleanNodeName(node);
-            if (occurrenceSet.contains(nodeName)) {
-                throw new InvalidConfigurationException(
-                        "Duplicate '" + nodeName + "' definition found in the configuration.");
-            }
-            if (handleNode(node, nodeName)) {
-                continue;
-            }
-            if (!GrootStreamConfigSections.canOccurMultipleTimes(nodeName)) {
-                occurrenceSet.add(nodeName);
-            }
-        }
-    }*/
-
-/*    private boolean handleNode(Node node, String name) {
-        if (GrootStreamConfigSections.GROOTSTREAM.isEqual(name)) {
-            parseEngineConfig(node, config);
-        } else {
-            return true;
-        }
-        parseEngineConfig(node, config);
-
-        return false;
-    }*/
-
-    public void buildConfig(Node engineNode) {
-        final EngineConfig engineConfig = config.getEngineConfig();
-        for (Node node : childElements(engineNode)) {
-            String name = cleanNodeName(node);
-            if (ServerConfigOptions.HTTP_CON_POOL.key().equals(name)) {
-                engineConfig.setHttpConPoolConfig(parseHttpConPoolConfig(node));
-            } else if (ServerConfigOptions.KNOWLEDGE_BASE.key().equals(name)) {
-                engineConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node));
-            } else if (ServerConfigOptions.NACOS.key().equals(name)) {
-                engineConfig.setNacosConfig(parseNacosConfig(node));
-            } else if (ServerConfigOptions.CONSUL.key().equals(name)) {
-                engineConfig.setConsulConfig(parseConsulConfig(node));
-            } else if (ServerConfigOptions.ZOOKEEPER.key().equals(name)) {
-                engineConfig.setZookeeperConfig(parseZookeeperConfig(node));
-            } else if (ServerConfigOptions.HDFS.key().equals(name)) {
-                engineConfig.setHdfsConfig(parseHdfsConfig(node));
-            } else if (ServerConfigOptions.PROPERTIES.key().equals(name)) {
-                engineConfig.setPropertiesConfig(parsePropertiesConfig(node));
-            } else {
-                LOGGER.warning("Unrecognized configuration element: " + name);
-
-            }
-
-        }
-    }
-
-    private Map<String, String> parsePropertiesConfig(Node properties) {
-
-        Map<String, String> propertiesMap = new HashMap<>();
-        for (Node node : childElements(properties)) {
-            String name = cleanNodeName(node);
-            propertiesMap.put(name,getTextContent(node));
-        }
-        return propertiesMap;
-    }
-
-
-    private ZookeeperConfig parseZookeeperConfig(Node zookeeperNode) {
-        ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
-        for (Node node : childElements(zookeeperNode)) {
-            String name = cleanNodeName(node);
-            if (ServerConfigOptions.ZOOKEEPER_QUORUM.key().equals(name)) {
-                zookeeperConfig.setQuorum(getTextContent(node));
-            } else {
-                LOGGER.warning("Unrecognized configuration element: " + name);
-            }
-        }
-        return zookeeperConfig;
-    }
-
-    private HdfsConfig parseHdfsConfig(Node hdfsNode) {
-        HdfsConfig hdfsConfig = new HdfsConfig();
-        for (Node node : childElements(hdfsNode)) {
-            String name = cleanNodeName(node);
-            if (ServerConfigOptions.HDFS_SERVERS.key().equals(name)) {
-                hdfsConfig.setServers(getTextContent(node));
-            } else {
-                LOGGER.warning("Unrecognized configuration element: " + name);
-            }
-        }
-        return hdfsConfig;
-    }
-
-    private ConsulConfig parseConsulConfig(Node consulNode) {
-        ConsulConfig consulConfig = new ConsulConfig();
-        for (Node node : childElements(consulNode)) {
-            String name = cleanNodeName(node);
-            if (ServerConfigOptions.CONSUL_SERVER_ADDR.key().equals(name)) {
-                consulConfig.setServerAddr(getTextContent(node));
-            } else if (ServerConfigOptions.CONSUL_SERVER_PORT.key().equals(name)) {
-                consulConfig.setServerPort(getIntegerValue(ServerConfigOptions.CONSUL_SERVER_PORT.key(), getTextContent(node)));
-            } else if (ServerConfigOptions.CONSUL_TOKEN.key().equals(name)) {
-                consulConfig.setToken(getTextContent(node));
-            } else {
-                LOGGER.warning("Unrecognized configuration element: " + name);
-            }
-        }
-        return consulConfig;
-    }
-
-    private NacosConfig parseNacosConfig(Node nacosNode) {
-        NacosConfig nacosConfig = new NacosConfig();
-        for (Node node : childElements(nacosNode)) {
-            String name = cleanNodeName(node);
-            if (ServerConfigOptions.NACOS_SERVER_ADDR.key().equals(name)) {
-                nacosConfig.setServerAddr(getTextContent(node));
-            } else if (ServerConfigOptions.NACOS_NAMESPACE.key().equals(name)) {
-                nacosConfig.setNamespace(getTextContent(node));
-            } else if (ServerConfigOptions.NACOS_GROUP.key().equals(name)) {
-                nacosConfig.setGroup(getTextContent(node));
-            } else if (ServerConfigOptions.NACOS_DATA_ID.key().equals(name)) {
-                nacosConfig.setDataId(getTextContent(node));
-            } else if (ServerConfigOptions.NACOS_READ_TIMEOUT.key().equals(name)) {
-                nacosConfig.setReadTimeout(getIntegerValue(ServerConfigOptions.NACOS_READ_TIMEOUT.key(), getTextContent(node)));
-            } else if (ServerConfigOptions.NACOS_USERNAME.key().equals(name)) {
-                nacosConfig.setUserName(getTextContent(node));
-            } else if (ServerConfigOptions.NACOS_PASSWORD.key().equals(name)) {
-                nacosConfig.setPassword(getTextContent(node));
-            } else {
-                LOGGER.warning("Unrecognized configuration element: " + name);
-            }
-        }
-        return nacosConfig;
-    }
-
-
-    private HttpConPoolConfig parseHttpConPoolConfig(Node httpConPoolNode) {
-        HttpConPoolConfig httpConPoolConfig = new HttpConPoolConfig();
-        for (Node node : childElements(httpConPoolNode)) {
-            String name = cleanNodeName(node);
-            if (ServerConfigOptions.HTTP_CON_POOL_MAX_TOTAL.key().equals(name)) {
-                httpConPoolConfig.setMaxTotal(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_MAX_TOTAL.key(), getTextContent(node)));
-            } else if (ServerConfigOptions.HTTP_CON_POOL_MAX_PER_ROUTE.key().equals(name)) {
-                httpConPoolConfig.setMaxPerRoute(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_MAX_PER_ROUTE.key(), getTextContent(node)));
-            } else if (ServerConfigOptions.HTTP_CON_POOL_CONNECTION_REQUEST_TIMEOUT.key().equals(name)) {
-                httpConPoolConfig.setConnectionRequestTimeout(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_CONNECTION_REQUEST_TIMEOUT.key(), getTextContent(node)));
-            } else if (ServerConfigOptions.HTTP_CON_POOL_CONNECT_TIMEOUT.key().equals(name)) {
-                httpConPoolConfig.setConnectTimeout(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_CONNECT_TIMEOUT.key(), getTextContent(node)));
-            } else if (ServerConfigOptions.HTTP_CON_POOL_SOCKET_TIMEOUT.key().equals(name)) {
-                httpConPoolConfig.setSocketTimeout(getIntegerValue(ServerConfigOptions.HTTP_CON_POOL_SOCKET_TIMEOUT.key(), getTextContent(node)));
-            } else {
-                LOGGER.warning("Unrecognized configuration element: " + name);
-            }
-        }
-        return httpConPoolConfig;
-
-    }
-
-    private List<KnowledgeConfig> parseKnowledgeBaseConfig(Node knowledgeBaseNode) {
-
-        List<KnowledgeConfig> knowledgeConfigList = new ArrayList<>();
-        for (Node node : childElements(knowledgeBaseNode)) {
-            knowledgeConfigList.add(parseKnowledgeConfig(node));
-        }
-        return knowledgeConfigList;
-    }
-
-    private KnowledgeConfig parseKnowledgeConfig(Node asnLookupNode) {
-        KnowledgeConfig knowledgeConfig = new KnowledgeConfig();
-        for (Node node : childElements(asnLookupNode)) {
-            String name = cleanNodeName(node);
-
-            if (ServerConfigOptions.KNOWLEDGE_BASE_NAME.key().equals(name)) {
-                knowledgeConfig.setName(getTextContent(node));
-            } else if (ServerConfigOptions.KNOWLEDGE_BASE_TYPE.key().equals(name)) {
-                knowledgeConfig.setType(getTextContent(node));
-            } else if (ServerConfigOptions.KNOWLEDGE_BASE_FILES.key().equals(name)) {
-                knowledgeConfig.setFiles(parseFilesConfig(node));
-            } else if (ServerConfigOptions.KNOWLEDGE_BASE_PROPERTIES.key().equals(name)) {
-                knowledgeConfig.setProperties(parseKnowledgePropertiesConfig(node));
-            }
-            else{
-                LOGGER.warning("Unrecognized configuration element: " + name);
-            }
-        }
-        return knowledgeConfig;
-    }
-
-    private Map<String, String> parseKnowledgePropertiesConfig(Node properties) {
-        Map<String, String> propertiesMap = new HashMap<>();
-        for (Node node : childElements(properties)) {
-            String name = cleanNodeName(node);
-            propertiesMap.put(name,getTextContent(node));
-        }
-        return propertiesMap;
-    }
-
-    private List<String> parseFilesConfig(Node files) {
-
-        List<String> asnLookupConfigList = new ArrayList<>();
-        for (Node node : childElements(files)) {
-            String value = node.getNodeValue();
-            asnLookupConfigList.add(value);
-        }
-        return asnLookupConfigList;
-    }
-
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootYamlParser.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootYamlParser.java
deleted file mode 100644
index 8aa9915..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootYamlParser.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import cn.hutool.core.io.IoUtil;
-import cn.hutool.setting.yaml.YamlUtil;
-import lombok.NonNull;
-
-import java.io.InputStream;
-import java.lang.reflect.Method;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public final class GrootYamlParser {
-
-    public static String getStringProperty(String key, Map<String, Object> yamlData) {
-        return getYamlValue(yamlData, key, String.class);
-    }
-
-    public static Integer getIntProperty(String key, Map<String, Object> yamlData) {
-        return getYamlValue(yamlData, key, Integer.class);
-    }
-
-    public static Long getLongProperty(String key, Map<String, Object> yamlData) {
-        return getYamlValue(yamlData, key, Long.class);
-    }
-
-    public static <T> T getYamlValue(Map<String, Object> data, String key, Class<T> valueType) {
-
-        String[] keys = key.split("\\.");
-
-        for (int i = 0; i < keys.length - 1; i++) {
-            Object value = data.get(keys[i]);
-            // TODO duplicated branch
-            if (value == null || !(value instanceof Map)) {
-                return null;
-            }
-            data = (Map<String, Object>) value;
-        }
-
-        return data.containsKey(keys[keys.length - 1])
-                ? valueType.cast(data.get(keys[keys.length - 1]))
-                : null;
-    }
-
-    public static Map<String, String> getClassReflect(List<String> plugins) {
-
-        Map<String, String> classReflect = new HashMap<>();
-
-        for (String classPath : plugins) {
-
-            Class cls = null;
-            try {
-                cls = Class.forName(classPath);
-                Method method = cls.getMethod("functionName");
-                Object object = cls.newInstance();
-                String result = (String) method.invoke(object);
-                classReflect.put(result, classPath);
-                System.out.println("Returned Value: " + result);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-        return classReflect;
-    }
-
-    public static Map<String, Object> loadYaml(@NonNull String filePath) {
-        return loadYaml(Paths.get(filePath));
-    }
-    public static Map<String, Object> loadYaml(@NonNull Path filePath) {
-        return loadYaml(IoUtil.toStream(filePath.toFile()));
-    }
-    public static Map<String, Object> loadYaml(InputStream input) {
-        return YamlUtil.load(input, Map.class);
-    }
-
-
-    private static Object getValue(String key, Map<String, Object> data) {
-        if (data.containsKey(key)) {
-            return data.get(key);
-        } else {
-            for (Object value : data.values()) {
-                if (value instanceof Map) {
-                    Object result = getValue(key, (Map<String, Object>) value);
-                    if (result != null) {
-                        return result;
-                    }
-                }
-            }
-        }
-        return null;
-    }
-
-    public static List<Object> getList(String key, Map<String, Object> yamlData) {
-        Object value = getValue(key, yamlData);
-        if (value instanceof List) {
-            return (List<Object>) value;
-        }
-        return null;
-    }
-
-    public static Map<String, Object> getMap(String key, Map<String, Object> yamlData) {
-        Object value = getValue(key, yamlData);
-        if (value instanceof Map) {
-            return (Map<String, Object>) value;
-        }
-        return null;
-    }
-
-    public static void main(String[] args) {
-        System.out.println();
-    }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/HdfsConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/HdfsConfig.java
deleted file mode 100644
index 1323f54..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/HdfsConfig.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class HdfsConfig implements Serializable {
-    private String servers = ServerConfigOptions.HDFS_SERVERS.defaultValue();
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/HosConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/HosConfig.java
deleted file mode 100644
index 903d6cd..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/HosConfig.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-@Data
-public class HosConfig implements Serializable {
-    public String token = ServerConfigOptions.HOS_TOKEN.defaultValue();
-
-    public void setToken(String token) {
-        checkNotNull(token, ServerConfigOptions.HOS_TOKEN + "token should not be null");
-        this.token = token;
-    }
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/HttpConPoolConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/HttpConPoolConfig.java
deleted file mode 100644
index c0a0e74..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/HttpConPoolConfig.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-@Data
-public class HttpConPoolConfig implements Serializable {
-
-    public static final long MIN_TIMEOUT_MS = 10;
-    public int maxTotal = ServerConfigOptions.HTTP_CON_POOL_MAX_TOTAL.defaultValue();
-    public int maxPerRoute = ServerConfigOptions.HTTP_CON_POOL_MAX_PER_ROUTE.defaultValue();
-    public int connectionRequestTimeout = ServerConfigOptions.HTTP_CON_POOL_CONNECTION_REQUEST_TIMEOUT.defaultValue();
-    public int connectTimeout = ServerConfigOptions.HTTP_CON_POOL_CONNECT_TIMEOUT.defaultValue();
-    public int socketTimeout = ServerConfigOptions.HTTP_CON_POOL_SOCKET_TIMEOUT.defaultValue();
-
-    public void setSocketTimeout(int socketTimeout) {
-        checkArgument(socketTimeout > MIN_TIMEOUT_MS, ServerConfigOptions.HTTP_CON_POOL_SOCKET_TIMEOUT + "socketTimeout should be greater than " + MIN_TIMEOUT_MS);
-        this.socketTimeout = socketTimeout;
-    }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java
new file mode 100644
index 0000000..c02df61
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java
@@ -0,0 +1,34 @@
+package com.geedgenetworks.common.config;
+
+import com.geedgenetworks.utils.StringUtil;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Data
+public class KnowledgeBaseConfig implements Serializable {
+    private String name = CommonConfigOptions.KNOWLEDGE_BASE_NAME.defaultValue();
+    private String type = CommonConfigOptions.KNOWLEDGE_BASE_TYPE.defaultValue();
+    private Map<String, String> properties = CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.defaultValue();
+    private List<String> files = CommonConfigOptions.KNOWLEDGE_BASE_FILES.defaultValue();
+
+    public void setType(String type) {
+        checkArgument(StringUtil.isNotBlank(type) && Arrays.asList("asnlookup", "geoiplookup").contains(type.toLowerCase())
+                , CommonConfigOptions.KNOWLEDGE_BASE_TYPE + "Type should be asnlookup or geoiplookup");
+        this.type = type;
+    }
+
+    public void setFiles(List<String> files) {
+        checkArgument(files != null && files.size() > 0,
+                CommonConfigOptions.KNOWLEDGE_BASE_FILES + "files should not be null");
+        this.files = files;
+    }
+
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeConfig.java
deleted file mode 100644
index cc407e4..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeConfig.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-@Data
-public class KnowledgeConfig implements Serializable {
-    private String name = ServerConfigOptions.KNOWLEDGE_BASE_NAME.defaultValue();
-    private String type = ServerConfigOptions.KNOWLEDGE_BASE_TYPE.defaultValue();
-    private Map<String, String> properties = ServerConfigOptions.KNOWLEDGE_BASE_PROPERTIES.defaultValue();
-    private List<String> files = ServerConfigOptions.KNOWLEDGE_BASE_FILES.defaultValue();
-}
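Unlike the old KnowledgeConfig, the new KnowledgeBaseConfig validates its inputs in the setters. A quick sketch (the name and file values are made up):

KnowledgeBaseConfig kb = new KnowledgeBaseConfig();
kb.setName("geoip_city");                                   // free-form
kb.setType("geoiplookup");                                  // only "asnlookup"/"geoiplookup" pass
kb.setFiles(java.util.Arrays.asList("GeoLite2-City.mmdb")); // must be non-empty
// kb.setType("iplookup");                                  // would throw IllegalArgumentException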
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/NacosConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/NacosConfig.java
deleted file mode 100644
index 2c1f7e7..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/NacosConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-@Data
-public class NacosConfig implements Serializable {
-    private String serverAddr = ServerConfigOptions.NACOS_SERVER_ADDR.defaultValue();
-    private String userName = ServerConfigOptions.NACOS_USERNAME.defaultValue();
-    private String password = ServerConfigOptions.NACOS_PASSWORD.defaultValue();
-    private String namespace = ServerConfigOptions.NACOS_NAMESPACE.defaultValue();
-    private String dataId = ServerConfigOptions.NACOS_DATA_ID.defaultValue();
-    private String group = ServerConfigOptions.NACOS_GROUP.defaultValue();
-    private int readTimeout = ServerConfigOptions.NACOS_READ_TIMEOUT.defaultValue();
-
-    public void setReadTimeout(int readTimeout) {
-        checkArgument(readTimeout > 0, ServerConfigOptions.NACOS_READ_TIMEOUT + "readTimeout should be greater than 0");
-        this.readTimeout = readTimeout;
-    }
-
-    public void setServerAddr(String serverAddr) {
-        checkNotNull(serverAddr, ServerConfigOptions.NACOS_SERVER_ADDR + "serverAddr should not be null");
-        this.serverAddr = serverAddr;
-    }
-
-    public void setDataId(String dataId) {
-        checkNotNull(dataId, ServerConfigOptions.NACOS_DATA_ID + "dataId should not be null");
-        this.dataId = dataId;
-    }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
deleted file mode 100644
index 2c03b21..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.alibaba.fastjson2.TypeReference;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ServerConfigOptions {
-
-
-    public static final Option<Integer> HTTP_CON_POOL_MAX_TOTAL =
-            Options.key("max_total")
-                    .intType()
-                    .defaultValue(100)
-                    .withDescription("The max connections of http con pool.");
-
-    public static final Option<Integer> HTTP_CON_POOL_MAX_PER_ROUTE =
-            Options.key("max_per_route")
-                    .intType()
-                    .defaultValue(10)
-                    .withDescription("The max per route of http con pool.");
-
-    public static final Option<Integer> HTTP_CON_POOL_CONNECTION_REQUEST_TIMEOUT =
-            Options.key("connection_request_timeout")
-                    .intType()
-                    .defaultValue(10000)
-                    .withDescription("The connection request timeout of http con pool.");
-    public static final Option<Integer> HTTP_CON_POOL_CONNECT_TIMEOUT =
-            Options.key("connect_timeout")
-                    .intType()
-                    .defaultValue(10000)
-                    .withDescription("The connect timeout of http con pool.");
-
-    public static final Option<Integer> HTTP_CON_POOL_SOCKET_TIMEOUT =
-            Options.key("socket_timeout")
-                    .intType()
-                    .defaultValue(60000)
-                    .withDescription("The socket timeout of http con pool.");
-
-    public static final Option<HttpConPoolConfig> HTTP_CON_POOL = Options.key("http_con_pool")
-            .type(new TypeReference<HttpConPoolConfig>() {})
-            .defaultValue(new HttpConPoolConfig())
-            .withDescription("The http con pool configuration.");
-
-    public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES =
-            Options.key("properties")
-                    .mapType()
-                    .defaultValue(new HashMap<String,String>())
-                    .withDescription("The properties of knowledgebase");
-    public static final Option<String> KNOWLEDGE_BASE_NAME =
-            Options.key("name")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("The name of knowledgebase.");
-    public static final Option<String> KNOWLEDGE_BASE_TYPE =
-            Options.key("type")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("The type of knowledgebase.");
-
-    public static final Option<List<String>> KNOWLEDGE_BASE_FILES =
-            Options.key("files")
-                    .listType()
-                    .defaultValue(new ArrayList<>())
-                    .withDescription("The files of knowledgebase.");
-
-
-    public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type")
-            .stringType()
-            .defaultValue("localfile")
-            .withDescription("The fs type of knowledge base storage.");
-
-    public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_DEFAULT_PATH = Options.key("fs_default_path")
-            .stringType()
-            .defaultValue("")
-            .withDescription("The default path of knowledge base storage.");
-
-
-
-    public static final Option<List<KnowledgeConfig>> KNOWLEDGE_BASE =
-            Options.key("knowledge_base")
-                    .type(new TypeReference<List<KnowledgeConfig>>() {})
-                    .defaultValue(new ArrayList<>())
-                    .withDescription("The knowledge base configuration.");
-
-    public static final Option<String> SNOWFLAKE_DATA_CENTER_ID =
-            Options.key("data_center_id_num")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("The data center id of snowflake.");
-
-    public static final Option<SnowflakeConfig> SNOWFLAKE =
-            Options.key("snowflake")
-                    .type(new TypeReference<SnowflakeConfig>() {})
-                    .defaultValue(new SnowflakeConfig())
-                    .withDescription("The snowflake configuration.");
-
-
-    public static final Option<String> NACOS_SERVER_ADDR =
-            Options.key("server_addr")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("The server address of nacos.");
-    public static final Option<String> NACOS_USERNAME =
-            Options.key("username")
-                    .stringType()
-                    .defaultValue("nacos")
-                    .withDescription("The username of nacos.");
-    public static final Option<String> NACOS_PASSWORD =
-            Options.key("password")
-                    .stringType()
-                    .defaultValue("nacos")
-                    .withDescription("The password of nacos.");
-    public static final Option<String> NACOS_NAMESPACE =
-            Options.key("namespace")
-                    .stringType()
-                    .defaultValue("public")
-                    .withDescription("The namespace of nacos.");
-    public static final Option<String> NACOS_DATA_ID =
-            Options.key("data_id")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("The data id of nacos.");
-    public static final Option<String> NACOS_GROUP =
-            Options.key("group")
-                    .stringType()
-                    .defaultValue("DEFAULT_GROUP")
-                    .withDescription("The group of nacos.");
-    public static final Option<Integer> NACOS_READ_TIMEOUT =
-            Options.key("read_timeout")
-                    .intType()
-                    .defaultValue(5000)
-                    .withDescription("The read timeout of nacos.");
-
-    public static final Option<NacosConfig> NACOS =
-            Options.key("nacos")
-                    .type(new TypeReference<NacosConfig>() {})
-                    .defaultValue(new NacosConfig())
-                    .withDescription("The nacos configuration.");
-
-    public static final Option<String> CONSUL_SERVER_ADDR = Options.key("server_addr")
-            .stringType()
-            .defaultValue("")
-            .withDescription("The server address of consul.");
-    public static final Option<Integer> CONSUL_SERVER_PORT =
-            Options.key("server_port")
-                    .intType()
-                    .defaultValue(8500)
-                    .withDescription("The server port of consul.");
-    public static final Option<String> CONSUL_TOKEN =
-            Options.key("token")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("The token of consul.");
-
-    public static final Option<ConsulConfig> CONSUL =
-            Options.key("consul")
-                    .type(new TypeReference<ConsulConfig>() {})
-                    .defaultValue(new ConsulConfig())
-                    .withDescription("The consul configuration.");
-
-    public static final Option<String> HOS_TOKEN =
-            Options.key("token")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("The token of hos.");
-
-    public static final Option<HosConfig> HOS =
-            Options.key("hos")
-                    .type(new TypeReference<HosConfig>() {})
-                    .defaultValue(new HosConfig())
-                    .withDescription("The hos configuration.");
-
-
-    public static final Option<String> ZOOKEEPER_QUORUM =
-            Options.key("quorum")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("The quorum of zookeeper.");
-
-    public static final Option<ZookeeperConfig> ZOOKEEPER =
-            Options.key("zookeeper")
-                    .type(new TypeReference<ZookeeperConfig>() {})
-                    .defaultValue(new ZookeeperConfig())
-                    .withDescription("The zookeeper configuration.");
-
-    public static final Option<String> HDFS_SERVERS =
-            Options.key("servers")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("The servers of hdfs.");
-
-    public static final Option<HdfsConfig> HDFS = Options.key("hdfs")
-            .type(new TypeReference<HdfsConfig>() {})
-            .defaultValue(new HdfsConfig())
-            .withDescription("The hdfs configuration.");
-
-    public static final Option<Map<String, String>> PROPERTIES =
-            Options.key("properties")
-                    .mapType()
-                    .defaultValue(new HashMap<String,String>())
-                    .withDescription("The properties of grootstream");
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SnowflakeConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SnowflakeConfig.java
deleted file mode 100644
index 59e84b6..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/SnowflakeConfig.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class SnowflakeConfig implements Serializable {
-    private String dataCenterId = ServerConfigOptions.SNOWFLAKE_DATA_CENTER_ID.defaultValue();
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/StorageConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/StorageConfig.java
deleted file mode 100644
index 64ef74c..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/StorageConfig.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.geedgenetworks.utils.StringUtil;
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-@Data
-public class StorageConfig implements Serializable {
-    private String fsType = ServerConfigOptions.KNOWLEDGE_BASE_STORAGE_FS_TYPE.defaultValue();
-    private String fsDefaultPath = ServerConfigOptions.KNOWLEDGE_BASE_STORAGE_FS_DEFAULT_PATH.defaultValue();
-
-    public void setFsType(String fsType) {
-        checkArgument(StringUtil.isNotBlank(fsType) && Arrays.asList("hdfs", "localfile").contains(fsType.toLowerCase())
-                , ServerConfigOptions.KNOWLEDGE_BASE_STORAGE_FS_TYPE + "fsType should be hdfs or localfile");
-        this.fsType = fsType;
-    }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/TypesafeConfigUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/config/TypesafeConfigUtils.java
index a8a6fa5..59c6088 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/TypesafeConfigUtils.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/TypesafeConfigUtils.java
@@ -20,7 +20,7 @@ public final class TypesafeConfigUtils {
     * @param source config source
     * @param prefix config prefix
     * @param keepPrefix true if keep prefix
-     * @deprecated use org.apache.seatunnel.api.configuration.Option interface instead
+     * @deprecated
     */
    @Deprecated
    public static Config extractSubConfig(Config source, String prefix, boolean keepPrefix) {
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ZookeeperConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ZookeeperConfig.java
deleted file mode 100644
index 77c8be7..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ZookeeperConfig.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class ZookeeperConfig implements Serializable {
-    private String quorum = ServerConfigOptions.ZOOKEEPER_QUORUM.defaultValue();
-
-    public void setQuorum(String quorum) {
-        checkNotNull(quorum, ServerConfigOptions.ZOOKEEPER_QUORUM + "quorum should not be null");
-        this.quorum = quorum;
-    }
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientUtils.java
index 00c06e4..d8eb070 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientUtils.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientUtils.java
@@ -1,6 +1,7 @@
 package com.geedgenetworks.common.utils;

-import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.Option;
+import com.geedgenetworks.common.config.Options;
 import com.geedgenetworks.utils.StringUtil;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.*;
@@ -11,7 +12,6 @@ import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
 import org.apache.http.conn.ConnectTimeoutException;
@@ -41,7 +41,6 @@ import java.nio.charset.StandardCharsets;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.X509Certificate;
-import java.util.Map;

 public class HttpClientUtils {
     /** Global connection pool object */
@@ -51,7 +50,13 @@ public class HttpClientUtils {

     private final Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);

-    public static final String ERROR_MESSAGE = "-1";
+    private static final String ERROR_MESSAGE = "-1";
+
+    private static final int DEFAULT_MAX_TOTAL = 400;
+    private static final int DEFAULT_MAX_PER_ROUTE = 10;
+    private static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 10000;
+    private static final int DEFAULT_CONNECT_TIMEOUT = 10000;
+    private static final int DEFAULT_SOCKET_TIMEOUT = 60000;

     /*
      * Configure the connection pool in a static block
@@ -59,9 +64,9 @@ public class HttpClientUtils {
     static {
         // Set the maximum number of connections
-        CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
+        CONN_MANAGER.setMaxTotal(DEFAULT_MAX_TOTAL);
         // Set the maximum connections per route
-        CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
+        CONN_MANAGER.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
     }

     /**
@@ -98,9 +103,9 @@ public class HttpClientUtils {
             PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
             // Set the maximum number of connections
-            connManager.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
+            connManager.setMaxTotal(DEFAULT_MAX_TOTAL);
             // Set the maximum connections per route
-            connManager.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
+            connManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
             return connManager;
         } catch (KeyManagementException | NoSuchAlgorithmException e) {
             throw new RuntimeException(e.getMessage());
@@ -117,11 +122,11 @@ public class HttpClientUtils {
         RequestConfig requestConfig =
                 RequestConfig.custom()
                         // Connection-request timeout
-                        .setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT)
+                        .setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT)
                         // Connect timeout
-                        .setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
+                        .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT)
                         // Socket (response) timeout
-                        .setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
+                        .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT)
                         .build();

         /*
@@ -199,7 +204,7 @@ public class HttpClientUtils {
         String msg = ERROR_MESSAGE;
         // Obtain the client connection object
-        CloseableHttpClient httpClient = getHttpClient(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT);
+        CloseableHttpClient httpClient = getHttpClient(DEFAULT_SOCKET_TIMEOUT);
         CloseableHttpResponse response = null;

         try {
@@ -254,7 +259,7 @@ public class HttpClientUtils {
     public String httpPost(URI uri, String requestBody, Header... headers) {
         String msg = ERROR_MESSAGE;
         // Obtain the client connection object
-        CloseableHttpClient httpClient = getHttpClient(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT);
+        CloseableHttpClient httpClient = getHttpClient(DEFAULT_SOCKET_TIMEOUT);

         // Create the POST request object
         CloseableHttpResponse response = null;
7ce2f9890950ba90-fcc25696bf11a8a0 -# asn_v6_id: 7ce2f9890950ba90-71f13b3736863ddb -http: - pool: - max_connection: 400 - max_per_route: 60 - request_timeout: 60000 - connect_timeout: 60000 - response_timeout: 60000 - -snowid: - data_center_id_num: 1 -zookeeper: - quorum: 192.168.44.12:2181 - -knowledgebase: - file_storage_path: /knowledgebase/ETL-SESSION-RECORD-COMPLETED/ - file_storage_type: hdfs -hdfs: - servers: 192.168.44.11:9000,192.168.44.14:9000 -hbase: - tick_tuple_freq_secs: - gtpc_scan_max_rows: 10000 - radius_scan_max_rows: 10000 - radius_table_name: RADIUS-TABLE - gtpc_table_name: GTPC-TABLE - rpc_timeout: -data: - relationship_model:
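For reference, the pool and timeout values that HttpClientUtils now hardcodes (and that the deleted groot.yaml http.pool section used to carry) map onto Apache HttpClient 4.x as follows. A minimal sketch assuming HttpClient 4.x on the classpath; the numbers are the DEFAULT_* constants from the diff above.

```java
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public final class PooledClientSketch {
    public static CloseableHttpClient build() {
        PoolingHttpClientConnectionManager pool = new PoolingHttpClientConnectionManager();
        pool.setMaxTotal(400);          // DEFAULT_MAX_TOTAL: cap across all routes
        pool.setDefaultMaxPerRoute(10); // DEFAULT_MAX_PER_ROUTE: cap per host:port

        RequestConfig timeouts = RequestConfig.custom()
                .setConnectionRequestTimeout(10_000) // wait for a pooled connection
                .setConnectTimeout(10_000)           // TCP connect
                .setSocketTimeout(60_000)            // per-read inactivity
                .build();

        return HttpClients.custom()
                .setConnectionManager(pool)
                .setDefaultRequestConfig(timeouts)
                .build();
    }
}
```

Hardcoding these as constants trades runtime configurability for fewer moving parts; the old CommonConfig values came from the now-deleted YAML sections.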
\ No newline at end of file diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index d22c057..07867f6 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -3,9 +3,9 @@ com.geedgenetworks.core.udf.Drop com.geedgenetworks.core.udf.AsnLookup com.geedgenetworks.core.udf.Eval com.geedgenetworks.core.udf.JsonExtract -com.geedgenetworks.core.udf.UnixTimestamp +com.geedgenetworks.core.udf.CurrentUnixTimestamp com.geedgenetworks.core.udf.Domain com.geedgenetworks.core.udf.DecodeBase64 com.geedgenetworks.core.udf.GeoIpLookup com.geedgenetworks.core.udf.PathCombine -com.geedgenetworks.core.udf.UnixTimestampConverter +com.geedgenetworks.core.udf.UnixTimestampConverter
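A udf.plugins file like the one above is just one fully-qualified class name per line. The project's actual loader is not part of this diff, so the class below is a hedged, illustrative sketch of how such a list is typically consumed via reflection.

```java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public final class UdfPluginLoaderSketch {
    public static List<Object> load(String resource) throws Exception {
        InputStream in = UdfPluginLoaderSketch.class.getClassLoader().getResourceAsStream(resource);
        if (in == null) {
            throw new IllegalArgumentException("resource not found: " + resource);
        }
        List<Object> udfs = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (line.isEmpty() || line.startsWith("#")) {
                    continue; // skip blank lines and comments
                }
                // e.g. com.geedgenetworks.core.udf.CurrentUnixTimestamp
                udfs.add(Class.forName(line).getDeclaredConstructor().newInstance());
            }
        }
        return udfs;
    }
}
```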
\ No newline at end of file diff --git a/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java b/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java index 6e42ae4..9a122ed 100644 --- a/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java +++ b/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java @@ -3,23 +3,25 @@ package com.geedgenetworks.common.config; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Arrays; + public class YamlGrootStreamConfigParserTest { @Test public void testGrootStreamConfig() { - GrootStreamConfigLocator yamlConfigLocator = new GrootStreamConfigLocator(); + CommonConfigLocator commonConfigLocator = new CommonConfigLocator(); GrootStreamConfig config; - if (yamlConfigLocator.locateInWorkDirOrOnClasspath()) { + if (commonConfigLocator.locateInWorkDirOrOnClasspath()) { // 2. Try loading YAML config from the working directory or from the classpath - config = new GrootStreamConfigBuilder(yamlConfigLocator).setProperties(null).build(); + config = new GrootStreamConfigBuilder(commonConfigLocator).build(); } else { throw new RuntimeException("can't find yaml in resources"); } Assertions.assertNotNull(config); - Assertions.assertNotNull(config.getEngineConfig().getNacosConfig().getServerAddr()); - Assertions.assertTrue(config.getEngineConfig().getHttpConPoolConfig().getSocketTimeout() > 0); - Assertions.assertNotNull(config.getEngineConfig().getKnowledgeBaseConfig()); - // Assertions.assertEquals("hos", config.getEngineConfig().getKnowledgeBaseConfig().get(0).getProperties().get("fs_type")); + Assertions.assertNotNull(config.getCommonConfig().getKnowledgeBaseConfig()); + Assertions.assertTrue(config.getCommonConfig().getKnowledgeBaseConfig().size() > 0); + Assertions.assertTrue(Arrays.asList("asnlookup", "geoiplookup").contains(config.getCommonConfig().getKnowledgeBaseConfig().get(0).getType())); + Assertions.assertTrue(config.getUDFPluginConfig().size() > 0); } diff --git a/groot-common/src/test/resources/grootstream.yaml b/groot-common/src/test/resources/grootstream.yaml index c233936..4aa7ef9 100644 --- a/groot-common/src/test/resources/grootstream.yaml +++ b/groot-common/src/test/resources/grootstream.yaml @@ -1,51 +1,34 @@ grootstream: - engine: - tick_tuple_freq_secs: 90 - gtpc_scan_max_rows: 10000 - radius_scan_max_rows: 10000 - radius_table_name: RADIUS-TABLE - gtpc_table_name: GTPC-TABLE - hbase_rpc_timeout: 60000 - http_con_pool: - max_total: 400 - max_per_route: 60 - connection_request_timeout: 60000 - connect_timeout: 60000 - socket_timeout: 60000 - knowledge_base: - - name: tsg_asnlookup - type: asnlookup - properties: - fs_type: hos - fs_default_path: http://path - files: - default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 - user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb - - name: CN_asnlookup - type: asnlookup - files: - default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 - user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb - snowflake: - data_center_id_num: 1 - nacos: - server_addr: 192.168.44.12:8848 - username: nacos - password: nacos - namespace: test - data_id: knowledge_base.json - group: DEFAULT_GROUP - read_timeout: 5000 - consul: - server_addr: 192.168.41.30 - server_port: 8500 - token: c989b503-1d74-f49c-5698-e90fe461e938 - hos: - token: c21f969b5f03d33d43e04f8f136e7682 - zookeeper: - quorum: 192.168.44.12:2181 
- hdfs: - servers: 192.168.44.11:9000,192.168.44.14:9000 + knowledge_base: + - name: tsg_asnlookup + type: asnlookup + properties: + fs_type: hos + fs_default_path: http://path + files: + - http://192.168.44.12:9098/hos/knowledge_base_bucket/757732ce-8214-4c34-aea2-aca6c51a7e82-YXNuX2J1aWx0aW4=.mmdb + + - name: tsg_geoiplookup + type: geoiplookup + files: + - 7ce2f9890950ba90-fcc25696bf11a8a0 + - 7ce2f9890950ba90-71f13b3736863ddb + properties: + hos.path: http://192.168.44.12:8089 + hos.bucket.name.traffic_file: traffic_file_bucket + hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket + + + + + + + + + + + +
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index e2607a2..07d2f72 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -280,6 +280,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun } catch (SQLException e) { LOG.error("ClickHouseBatchInsertFail url:" + url, e); if (retryCount >= 3) { + LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt); // throw e; return; } @@ -290,6 +291,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun } catch (SQLException e) { LOG.error("ClickHouseBatchInsertFail url:" + url, e); if (retryCount >= 3) { + LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt); //throw e; if (connection != null) { connection.close();
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java index 26e9ce4..b4cf0bb 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java @@ -1,5 +1,6 @@ package com.geedgenetworks.connectors.clickhouse.sink; +import com.alibaba.fastjson2.JSON; import com.geedgenetworks.core.pojo.Event; import com.github.housepower.data.Block; @@ -28,7 +29,11 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick // int columnIdx = batch.paramIdx2ColumnIdx(i); // batch.setObject(columnIdx, convertToCkDataType(columnTypes[i], value)); // batch.setObject(i, convertToCkDataType(dataType, value)); - batch.setObject(i, columnConverters[i].convert(value)); + try { + batch.setObject(i, columnConverters[i].convert(value)); + } catch (Exception e) { + throw new RuntimeException(columnNames[i] + " column value conversion failed: " + value + ", event data:" + JSON.toJSONString(map), e); + } } }
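The EventBatchIntervalClickHouseSink change above wraps each per-column conversion so a failure reports the column name, the offending value, and the whole event. A simplified, self-contained sketch of the same pattern; the converter and event types below are stand-ins, not the project's actual classes.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public final class ColumnConversionSketch {
    /** Convert one event into a row, reporting exactly which column failed. */
    public static Object[] toRow(Map<String, Object> event,
                                 List<String> columnNames,
                                 List<Function<Object, Object>> converters) {
        Object[] row = new Object[columnNames.size()];
        for (int i = 0; i < columnNames.size(); i++) {
            Object value = event.get(columnNames.get(i));
            try {
                row[i] = converters.get(i).apply(value);
            } catch (Exception e) {
                // Same idea as the patch: keep column name, offending value and
                // the whole event in the exception so bad records are traceable.
                throw new RuntimeException("column " + columnNames.get(i)
                        + " value conversion failed: " + value
                        + ", event data: " + event, e);
            }
        }
        return row;
    }
}
```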
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java index 328843b..423a45f 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java @@ -37,8 +37,10 @@ public class EventKafkaDeserializationSchema implements KafkaDeserializationSche public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out) throws Exception { try { Event event = valueDeserialization.deserialize(record.value()); - event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, record.timestamp()); - out.collect(event); + if(event != null){ + event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, record.timestamp()); + out.collect(event); + } }catch (Exception e) { LOG.error("Deserialization failed", e); }
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/sink/KafkaSink.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/sink/KafkaSink.java deleted file mode 100644 index c07c7a6..0000000 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/sink/KafkaSink.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.geedgenetworks.connectors.kafka.sink; - -import com.geedgenetworks.connectors.kafka.util.KafkaUtils; -import com.geedgenetworks.core.pojo.Event; -import com.geedgenetworks.core.pojo.SinkConfigOld; -import com.geedgenetworks.core.sink.Sink; -import com.geedgenetworks.common.utils.ColumnUtil; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; - -import com.alibaba.fastjson2.JSON; - -public class KafkaSink implements Sink { - - @Override - public void sink( - SingleOutputStreamOperator<Event> streamlineEventSingleOutputStreamOperator, - SinkConfigOld sinkConfig) - throws Exception { - if (sinkConfig.getParallelism() != 0) { - streamlineEventSingleOutputStreamOperator - .map( - new MapFunction<Event, String>() { - @Override - public String map(Event value) throws Exception { - if (sinkConfig.getOutput_fields() != null - && sinkConfig.getOutput_fields().size() > 0) { - value.setExtractedFields( - ColumnUtil.columnSelector( - value.getExtractedFields(), - sinkConfig.getOutput_fields())); - } - String jsonString = - JSON.toJSONString(value.getExtractedFields()); - return jsonString; - } - }) - .setParallelism(sinkConfig.getParallelism()) - .addSink(KafkaUtils.getKafkaSink(sinkConfig.getProperties())) - .setParallelism(sinkConfig.getParallelism()) - .name(sinkConfig.getName()); - } else { - - streamlineEventSingleOutputStreamOperator - .map( - new MapFunction<Event, String>() { - @Override - public String map(Event value) throws Exception { - if (sinkConfig.getOutput_fields() != null - && sinkConfig.getOutput_fields().size() > 0) { - value.setExtractedFields( - ColumnUtil.columnSelector( - value.getExtractedFields(), - sinkConfig.getOutput_fields())); - } - String jsonString = - JSON.toJSONString(value.getExtractedFields()); - return jsonString; - } - }) - .addSink(KafkaUtils.getKafkaSink(sinkConfig.getProperties())) - .name(sinkConfig.getName()); - } - } -}
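The EventKafkaDeserializationSchema change above only emits an event when the value deserializer returns non-null, instead of NPE-ing on unparsable payloads. A simplified sketch of that guard outside the Flink API; the real code attaches the record timestamp under Event.INTERNAL_TIMESTAMP_KEY, and the "__timestamp" key below is an illustrative stand-in.

```java
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public final class NullSafeDeserializeSketch {
    /**
     * Deserialize one Kafka record value and emit it only when the value
     * deserializer produced an event; null results are dropped rather than
     * propagated downstream.
     */
    public static void deserialize(byte[] value,
                                   long recordTimestamp,
                                   Function<byte[], Map<String, Object>> valueDeserialization,
                                   Consumer<Map<String, Object>> out) {
        Map<String, Object> event = valueDeserialization.apply(value);
        if (event != null) {
            event.put("__timestamp", recordTimestamp); // stand-in for Event.INTERNAL_TIMESTAMP_KEY
            out.accept(event);
        }
    }
}
```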
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java deleted file mode 100644 index 9cd7bc2..0000000 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/source/KafkaSource.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.geedgenetworks.connectors.kafka.source; - -import com.geedgenetworks.connectors.kafka.util.KafkaUtils; -import com.geedgenetworks.core.pojo.Event; -import com.geedgenetworks.core.pojo.SourceConfigOld; -import com.geedgenetworks.core.source.Source; -import com.geedgenetworks.common.utils.ColumnUtil; - -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson2.JSON; - -import java.time.Duration; -import java.util.Map; - -public class KafkaSource implements Source { - - private static final Log logger = LogFactory.get(); - - @Override - public SingleOutputStreamOperator<Event> source( - StreamExecutionEnvironment env, SourceConfigOld sourceConfig) throws Exception { - DataStreamSource<Tuple2<String, Long>> sourceForSession = - env.addSource( - KafkaUtils.timestampDeserializationConsumer(sourceConfig.getProperties())); - if (sourceConfig.getParallelism() != 0) { - sourceForSession.setParallelism(sourceConfig.getParallelism()); - } - SingleOutputStreamOperator<Event> streamEvent; - streamEvent = - sourceForSession.map( - new MapFunction<Tuple2<String, Long>, Event>() { - @Override - public Event map(Tuple2<String, Long> message) { - - // FIXME TimestampDeserializationSchema returns Tuple2<>(null, null) but there is no null check here - Event event = new Event(); - try { - // FIXME message_timestamp_field is treated as a very special field and uniformly set to the - // kafka message timestamp, but what if the json already contains that field? - Map<String, Object> values = JSON.parseObject(message.f0); - //event.set__timestamp(message.f1); - // FIXME the logic below cannot cause unpredictable errors and should not be wrapped in the try block - values.put(Event.INTERNAL_TIMESTAMP_KEY, message.f1); - if (sourceConfig - .getProperties() - .containsKey("message_timestamp_field")) { - values.put( - sourceConfig - .getProperties() - .get("message_timestamp_field") - .toString(), - message.f1); - } - if (sourceConfig.getOutput_fields() != null - && sourceConfig.getOutput_fields().size() > 0) { - values = - ColumnUtil.columnSelector( - values, sourceConfig.getOutput_fields()); - } - event.setExtractedFields(values); - return event; - } catch (Exception e) { - logger.error(e.toString()); - return event; - // FIXME when deserialization throws, a "normal" event is still returned! - // It is most likely an empty event, but could extreme cases affect the business? - // e.g. inserting an all-null row during a database write, where a NOT NULL constraint aborts the program. - } - } - }); - - // FIXME the MAP operator will not change the parallelism - if (sourceConfig.getParallelism() != 0) { - streamEvent.setParallelism(sourceConfig.getParallelism()); - } - if (sourceConfig.getWatermark_lag() != null && sourceConfig.getWatermark_lag() > 0L) { - WatermarkStrategy<Event> strategyForSession = - WatermarkStrategy.<Event>forBoundedOutOfOrderness( - Duration.ofMillis(sourceConfig.getWatermark_lag())) - .withTimestampAssigner( - (StreamlineEvent, timestamp) -> - Integer.parseInt( - String.valueOf( - StreamlineEvent - .getExtractedFields() - .get( - sourceConfig - .getWatermark_timestamp()))) - * 1000); - streamEvent.assignTimestampsAndWatermarks(strategyForSession); - } - return streamEvent; - } -}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java index f268689..e62310d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java @@ -1,6 +1,6 @@ package com.geedgenetworks.core.pojo; -import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.core.utils.KnowlegdeBase.AbstractKnowledgeBase; import lombok.Data; @@ -8,7 +8,7 @@ import java.util.List; @Data public class KnowledgeBaseEntity { - private KnowledgeConfig knowledgeConfig; + private KnowledgeBaseConfig knowledgeConfig; private List<KnowLedgeFileEntity> knowLedgeFileEntityList; private AbstractKnowledgeBase abstractKnowledgeBase;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java b/groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java new file mode 100644 index 0000000..c19df1d --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java @@ -0,0 +1,8 @@ +package com.geedgenetworks.core.types; + +public class BooleanType extends DataType{ + @Override + public String simpleString() { + return "boolean"; + } +}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java index 5f6f1d7..881f356 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java @@ -1,38 +1,27 @@ package com.geedgenetworks.core.udf; -import com.alibaba.fastjson.JSON; -import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.EngineConfig; -import com.geedgenetworks.common.config.KnowledgeConfig; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseUpdateJob; -import com.geedgenetworks.utils.IpLookupV2; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; -import org.quartz.SchedulerException; - -import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler; @Slf4j public class AsnLookup implements UDF { - private UDFContext udfContext; private String vender; 
private String option; + private String lookupFieldName; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { checkUdfContext(udfContext); - this.udfContext = udfContext; this.vender = udfContext.getParameters().get("vendor_id").toString(); this.option = udfContext.getParameters().get("option").toString(); KnowledgeBaseUpdateJob.initKnowledgeBase(vender,AsnKnowledgeBase.getInstance(), runtimeContext); @@ -43,6 +32,8 @@ public class AsnLookup implements UDF { else { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init AsnKnowledgeBase error "); } + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @@ -51,15 +42,15 @@ public class AsnLookup implements UDF { if(AsnKnowledgeBase.getVenderWithAsnLookup()!=null && AsnKnowledgeBase.getVenderWithAsnLookup().containsKey(vender)){ - if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){ + if(event.getExtractedFields().containsKey(lookupFieldName)){ switch (option) { case "IP_TO_ASN": String asn = AsnKnowledgeBase.getVenderWithAsnLookup() .get(vender) - .asnLookup(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString()); + .asnLookup(event.getExtractedFields().get(lookupFieldName).toString()); if(!asn.isEmpty()) { event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), asn); + .put(outputFieldName, asn); } break; default: diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java index 670deef..474dd17 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java @@ -8,16 +8,14 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; @Slf4j -public class UnixTimestamp implements UDF { - - private UDFContext udfContext; +public class CurrentUnixTimestamp implements UDF { private String precision; + private String outputFieldName; + @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; - if(udfContext.getOutput_fields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } @@ -33,24 +31,25 @@ public class UnixTimestamp implements UDF { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); } } - + this.precision = udfContext.getParameters().get("precision").toString(); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { long timestamp = System.currentTimeMillis(); - if ("seconds".equals(udfContext.getParameters().get("precision"))) { + if ("seconds".equals(precision)) { timestamp = timestamp / 1000; } - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp); + event.getExtractedFields().put(outputFieldName, timestamp); return event; } @Override public String functionName() { - return "UNIX_TIMESTAMP_FUNCTION"; + return "CURRENT_UNIX_TIMESTAMP"; } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java index 82cb1a4..7d825c9 100644 --- 
a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java @@ -17,8 +17,9 @@ import java.util.Base64; @Slf4j public class DecodeBase64 implements UDF { - private UDFContext udfContext; - + private String lookupFieldNameFirst; + private String lookupFieldNameSecond; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { if(udfContext.getLookup_fields().size() !=2){ @@ -27,21 +28,22 @@ public class DecodeBase64 implements UDF { if(udfContext.getOutput_fields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - this.udfContext = udfContext; - + this.lookupFieldNameFirst = udfContext.getLookup_fields().get(0); + this.lookupFieldNameSecond = udfContext.getLookup_fields().get(1); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { - if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { + if (event.getExtractedFields().containsKey(lookupFieldNameFirst)) { String decodeResult = ""; String message = (String) event.getExtractedFields() - .get(udfContext.getLookup_fields().get(0)); + .get(lookupFieldNameFirst); Object charset = - event.getExtractedFields().getOrDefault(udfContext.getLookup_fields().get(1),""); + event.getExtractedFields().getOrDefault(lookupFieldNameSecond,""); try { if (StringUtil.isNotBlank(message)) { byte[] base64decodedBytes = Base64.getDecoder().decode(message); @@ -61,7 +63,7 @@ public class DecodeBase64 implements UDF { + e.getMessage()); } event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), decodeResult); + .put(outputFieldName, decodeResult); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java index e7a5c37..a5d6b91 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java @@ -10,18 +10,18 @@ import com.geedgenetworks.utils.FormatUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; +import java.util.List; + @Slf4j public class Domain implements UDF { - private UDFContext udfContext; private String option; - - + private List<String> lookupFields; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; if(udfContext.getLookup_fields().isEmpty()){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup field is not empty"); } @@ -42,6 +42,8 @@ public class Domain implements UDF { } } this.option = udfContext.getParameters().get("option").toString(); + this.lookupFields = udfContext.getLookup_fields(); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @@ -49,7 +51,7 @@ public class Domain implements UDF { @Override public Event evaluate(Event event) { String domain = ""; - for (String lookupField : udfContext.getLookup_fields()){ + for (String lookupField : lookupFields){ if(event.getExtractedFields().containsKey(lookupField)){ @@ -78,7 +80,7 @@ public class Domain implements UDF { } } } - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), domain); + event.getExtractedFields().put(outputFieldName, domain); return event; } 
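The recurring refactor in these UDF diffs moves every UDFContext lookup into open(), so evaluate() touches only plain fields. A combined sketch of that idea, using simplified stand-in types, which also shows the precision-to-pattern mapping that FromUnixTimestamp (next diff) now builds once instead of per event:

```java
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

public final class CachedFieldsUdfSketch {
    private String lookupFieldName;
    private String outputFieldName;
    private SimpleDateFormat sdf;

    /** Called once per task, like UDF#open(RuntimeContext, UDFContext). */
    public void open(List<String> lookupFields, List<String> outputFields, String precision) {
        this.lookupFieldName = lookupFields.get(0);
        this.outputFieldName = outputFields.get(0);
        // Same precision -> pattern mapping FromUnixTimestamp builds in open():
        String pattern;
        switch (precision) {
            case "milliseconds": pattern = "yyyy-MM-dd HH:mm:ss:SSS"; break;
            case "microseconds": pattern = "yyyy-MM-dd HH:mm:ss:SSS:000"; break;
            case "nanoseconds":  pattern = "yyyy-MM-dd HH:mm:ss:SSS:000:000"; break;
            default:             pattern = "yyyy-MM-dd HH:mm:ss"; break;
        }
        this.sdf = new SimpleDateFormat(pattern);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC")); // the diff pins UTC here
    }

    /** Called per event; no UDFContext lookups remain on this hot path. */
    public Map<String, Object> evaluate(Map<String, Object> extractedFields) {
        Object raw = extractedFields.get(lookupFieldName);
        if (raw != null) {
            extractedFields.put(outputFieldName, sdf.format(Long.parseLong(raw.toString())));
        }
        return extractedFields;
    }
}
```

Caching the SimpleDateFormat is safe here because each UDF instance is confined to one task thread; note, though, that the patch also changes behavior by fixing the timezone to UTC where the old code read a "timezone" parameter.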
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java index 46a8072..dec2ddc 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java @@ -11,11 +11,12 @@ import java.text.SimpleDateFormat; import java.util.TimeZone; @Slf4j public class FromUnixTimestamp implements UDF { - private UDFContext udfContext; - + private String precision; + private String outputFieldName; + private String lookupFieldName; + private SimpleDateFormat sdf; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; if(udfContext.getOutput_fields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } @@ -36,32 +37,35 @@ public class FromUnixTimestamp implements UDF { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters precision value is not correct"); } } + this.precision = udfContext.getParameters().get("precision").toString(); + this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + switch (precision) { + case "seconds": + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + break; + case "milliseconds": + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); + break; + case "microseconds": + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000"); + break; + case "nanoseconds": + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000"); + break; + default: + break; + } + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); } @Override public Event evaluate(Event event) { - if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){ - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - switch (udfContext.getParameters().get("precision").toString()) { - case "seconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - break; - case "milliseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); - break; - case "microseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000"); - break; - case "nanoseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000"); - break; - default: - break; - } - sdf.setTimeZone(TimeZone.getTimeZone(udfContext.getParameters().get("timezone").toString())); - String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString())); - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp); + if(event.getExtractedFields().containsKey(lookupFieldName)){ + String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString())); + event.getExtractedFields().put(outputFieldName, timestamp); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java index d2e29f6..4f657da 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java @@ -20,20 +20,19 @@ import java.util.Map; @Slf4j public class GeoIpLookup implements UDF { - private UDFContext udfContext; private String vender; 
private String option; - - private Map<String,String> geolocation_field_mapping; + private String lookupFieldName; + private String outputFieldName; + private Map<String,String> geoLocationFieldMapping; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { checkUdfContext(udfContext); - this.udfContext = udfContext; this.vender = udfContext.getParameters().get("vendor_id").toString(); this.option = udfContext.getParameters().get("option").toString(); if(option.equals("IP_TO_OBJECT")){ - this.geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping"); + this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping"); } KnowledgeBaseUpdateJob.initKnowledgeBase(vender, GeoIpKnowledgeBase.getInstance(),runtimeContext); if(GeoIpKnowledgeBase.getVenderWithIpLookup()!=null && GeoIpKnowledgeBase.getVenderWithIpLookup().containsKey(vender)){ @@ -42,99 +41,75 @@ public class GeoIpLookup implements UDF { else { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init GeoIpLookup error "); } + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { - if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { + if (event.getExtractedFields().containsKey(lookupFieldName)) { switch (option) { case "IP_TO_COUNTRY": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .countryLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_PROVINCE": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .provinceLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_CITY": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .cityLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_SUBDIVISION_ADDR": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .cityLookupDetail( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_DETAIL": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .locationLookupDetail( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_LATLNG": - String geo = - GeoIpKnowledgeBase.getVenderWithIpLookup() - .get(vender) - .latLngLookup( - event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) - .toString()); - event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), geo); + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() + .get(vender) + .latLngLookup( + event.getExtractedFields() + .get(lookupFieldName) + 
.toString())); break; case "IP_TO_PROVIDER": @@ -144,30 +119,22 @@ public class GeoIpLookup implements UDF { .get(vender) .ispLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString()), TypeReference.mapType( HashMap.class, String.class, Object.class)); event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, serverIpMap.getOrDefault("isp", StringUtil.EMPTY)); break; case "IP_TO_JSON ": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .infoLookupToJSONString( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_OBJECT": @@ -176,13 +143,10 @@ public class GeoIpLookup implements UDF { .get(vender) .infoLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString()); - for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) { + for (Map.Entry<String, String> entry : geoLocationFieldMapping.entrySet()) { switch (entry.getKey()) { case "COUNTRY": event.getExtractedFields() @@ -216,16 +180,12 @@ public class GeoIpLookup implements UDF { break; } event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .infoLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java index c5433ea..7efc81e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java @@ -10,7 +10,10 @@ import org.apache.flink.api.common.functions.RuntimeContext; public class JsonExtract implements UDF { private UDFContext udfContext; + private String lookupFieldName; + private String outputFieldName; + private String param; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { this.udfContext = udfContext; @@ -26,7 +29,9 @@ public class JsonExtract implements UDF { if(!udfContext.getParameters().containsKey("param")){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey param"); } - + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); + this.param =udfContext.getParameters().get("param").toString(); } @@ -34,16 +39,15 @@ public class JsonExtract implements UDF { @Override public Event evaluate(Event event) { - if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { + if (event.getExtractedFields().containsKey(lookupFieldName)) { String result = (String) JsonPathUtil.analysis( event.getExtractedFields() - .get(udfContext.getLookup_fields().get(0)) - .toString(), - udfContext.getParameters().get("param").toString()); - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), result); + .get(lookupFieldName) + .toString(),param); + event.getExtractedFields().put(outputFieldName, result); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java 
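JsonExtract above resolves its "param" JSONPath expression once in open() and applies it per event. The project's JsonPathUtil is not part of this diff, so the sketch below uses the widely available Jayway JsonPath library to illustrate the same extraction idea:

```java
import com.jayway.jsonpath.JsonPath;

public final class JsonExtractSketch {
    public static void main(String[] args) {
        String json = "{\"tags\":[{\"tag\":\"device_group\",\"value\":\"dg-1\"}]}";
        // A filter expression like the ones passed through the UDF's "param"
        // parameter; filter paths return a list of matches.
        Object value = JsonPath.read(json, "$.tags[?(@.tag=='device_group')].value");
        System.out.println(value); // ["dg-1"]
    }
}
```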
b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index 7b21adf..03be355 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf; import com.alibaba.fastjson.JSON; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.EngineConfig; +import com.geedgenetworks.common.config.CommonConfig; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; import lombok.extern.slf4j.Slf4j; @@ -14,24 +14,15 @@ import java.util.*; @Slf4j public class PathCombine implements UDF { - private UDFContext udfContext; - - private StringBuilder stringBuilder; - - private Map<String, String> pathParameters = new LinkedHashMap<>(); - - - - + private final Map<String, String> pathParameters = new LinkedHashMap<>(); + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; - Configuration configuration = (Configuration) runtimeContext .getExecutionConfig().getGlobalJobParameters(); - EngineConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class); + CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig(); if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { @@ -56,6 +47,7 @@ public class PathCombine implements UDF { } } + this.outputFieldName = udfContext.getOutput_fields().get(0); } @@ -78,7 +70,7 @@ public class PathCombine implements UDF { } } String path = joinUrlParts(pathBuilder); - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), path); + event.getExtractedFields().put(outputFieldName, path); return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java index 134ed66..61fa44a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java @@ -5,23 +5,24 @@ import com.geedgenetworks.core.pojo.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; public class Rename implements UDF { - private UDFContext udfContext; - + private String lookupFieldName; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; + + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { - if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { + if (event.getExtractedFields().containsKey(lookupFieldName)) { event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, event.getExtractedFields() - .get(udfContext.getLookup_fields().get(0))); - event.getExtractedFields().remove(udfContext.getLookup_fields().get(0)); + .get(lookupFieldName)); + event.getExtractedFields().remove(lookupFieldName); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java index 
da0b71f..070c42b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java @@ -1,9 +1,7 @@ package com.geedgenetworks.core.udf; -import com.geedgenetworks.common.config.CommonConfig; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; -import com.geedgenetworks.core.udf.UDF; import com.geedgenetworks.core.utils.SnowflakeIdUtils; import org.apache.flink.api.common.functions.RuntimeContext; @@ -11,21 +9,20 @@ import java.io.Serializable; public class SnowflakeId implements Serializable, UDF { - private UDFContext udfContext; - + private String outputFieldName; private SnowflakeIdUtils snowflakeIdUtils; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { String data_center_id_num = udfContext.getParameters().getOrDefault("data_center_id_num","0").toString();// parsed to a number snowflakeIdUtils = new SnowflakeIdUtils(Integer.parseInt(data_center_id_num),runtimeContext.getIndexOfThisSubtask()); - this.udfContext = udfContext; + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), snowflakeIdUtils.nextId()); + .put(outputFieldName, snowflakeIdUtils.nextId()); return event; }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java index e557677..15228bb 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java @@ -14,8 +14,9 @@ import java.time.Instant; public class UnixTimestampConverter implements UDF { private UDFContext udfContext; - private String precision; + private String lookupFieldName; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { @@ -39,7 +40,8 @@ this.precision =udfContext.getParameters().get("precision").toString(); } } - + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @@ -47,8 +49,8 @@ @Override public Event evaluate(Event event) { - if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { - Long timestamp = Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString()); + if(event.getExtractedFields().containsKey(lookupFieldName)) { + Long timestamp = Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString()); Instant instant = null; if (String.valueOf(timestamp).length() == 13) { // a timestamp longer than 10 digits is a millisecond-precision timestamp @@ -67,7 +69,7 @@ timestamp = instant.toEpochMilli(); break; } - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp); + event.getExtractedFields().put(outputFieldName, timestamp); } return event; }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java deleted file mode 100644 index b7d3637..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java +++ /dev/null @@ -1,139 +0,0 @@ -package com.geedgenetworks.core.utils; - -import com.geedgenetworks.common.config.CommonConfig; - -import com.geedgenetworks.utils.StringUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; - -import java.io.IOException; - -/** - * @author qidaijie - * @version 2022/11/2 17:57 - */ -public class HadoopUtils { - private static final Log logger = LogFactory.get(); - - private static HadoopUtils hadoopUtils; - - private static FileSystem fileSystem; - - private static void getInstance() { - hadoopUtils = new HadoopUtils(); - } - - /** Constructor */ - private HadoopUtils() { - // Open the connection - getConnection(); - } - - private static void getConnection() { - Configuration configuration = new Configuration(); - try { - // Specify the user - System.setProperty("HADOOP_USER_NAME", "etl"); - // Configure HDFS-related settings - configuration.set("fs.defaultFS", "hdfs://ns1"); - configuration.set("hadoop.proxyuser.root.hosts", "*"); - configuration.set("hadoop.proxyuser.root.groups", "*"); - configuration.set("dfs.nameservices", "ns1"); - configuration.set("dfs.ha.namenodes.ns1", "nn1,nn2"); - String[] servers = StringUtil.split(CommonConfig.HDFS_SERVERS, ","); - configuration.set("dfs.namenode.rpc-address.ns1.nn1", servers[0]); - configuration.set("dfs.namenode.rpc-address.ns1.nn2", servers[1]); - configuration.set( - "dfs.client.failover.proxy.provider.ns1", - "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); - // Create the fileSystem used to connect to HDFS - fileSystem = FileSystem.get(configuration); - } catch (IOException | RuntimeException e) { - logger.error("Failed to create HDFS connection. message is: " + e.getMessage()); - e.printStackTrace(); - } - } - - // /** - // * Create the HDFS connection - // */ - // static { - // if - // (FlowWriteConfig.FILE_SYSTEM_TYPE.equals(FlowWriteConfig.KNOWLEDGEBASE_FILE_STORAGE_TYPE)) { - // Configuration configuration = new Configuration(); - // try { - // // Specify the user - // System.setProperty("HADOOP_USER_NAME", "etl"); - // // Configure HDFS-related settings - // configuration.set("fs.defaultFS", "hdfs://ns1"); - // configuration.set("hadoop.proxyuser.root.hosts", "*"); - // configuration.set("hadoop.proxyuser.root.groups", "*"); - // configuration.set("dfs.nameservices", "ns1"); - // configuration.set("dfs.ha.namenodes.ns1", "nn1,nn2"); - // String[] servers = StringUtil.split(FlowWriteConfig.HDFS_SERVERS, - // FlowWriteConfig.FORMAT_SPLITTER); - // configuration.set("dfs.namenode.rpc-address.ns1.nn1", servers[0]); - // configuration.set("dfs.namenode.rpc-address.ns1.nn2", servers[1]); - // configuration.set("dfs.client.failover.proxy.provider.ns1", - // "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); - // // Create the fileSystem used to connect to HDFS - // fileSystem = FileSystem.get(configuration); - // } catch (IOException | RuntimeException e) { - // logger.error("Failed to create HDFS connection. message is: " + - // e.getMessage()); - // e.printStackTrace(); - // } - // } - // } - - /** - * Download a file from HDFS - * - * @param filePath file path - * @return file bytes - */ - public static byte[] downloadFileByBytes(String filePath) { - if (hadoopUtils == null) { - getInstance(); - } - - try (FSDataInputStream open = fileSystem.open(new Path(filePath))) { - byte[] bytes = new byte[open.available()]; - open.read(0, bytes, 0, open.available()); - return bytes; - } catch (IOException e) { - logger.error( - "An I/O exception when files are downloaded from HDFS. Message is :" - + e.getMessage()); - } - return null; - } - - /** - * Upload a file to HDFS - * - * @param filePath file path - * @param bytes file bytes - */ - public static void uploadFileByBytes(String filePath, byte[] bytes) { - if (hadoopUtils == null) { - getInstance(); - } - try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) { - fsDataOutputStream.write(bytes); - // fsDataOutputStream.flush(); - } catch (RuntimeException e) { - logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage()); - } catch (IOException e) { - logger.error( - "An I/O exception when files are uploaded to HDFS. Message is :" - + e.getMessage()); - } - } -}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java index cc312ea..535eaa5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java @@ -1,6 +1,6 @@ package com.geedgenetworks.core.utils.KnowlegdeBase; -import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.core.pojo.KnowLedgeFileEntity; import com.geedgenetworks.core.utils.HttpClientPoolUtil; import org.slf4j.Logger; @@ -17,7 +17,7 @@ public abstract class AbstractKnowledgeBase { // Constructor of the abstract class } - abstract Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) ; + abstract Boolean updateKnowledgeBase(KnowledgeBaseConfig knowledgeConfig) ; abstract String functionName();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java index 9bf0b9a..9d0816a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java @@ -1,7 +1,7 @@ package com.geedgenetworks.core.utils.KnowlegdeBase; -import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.utils.IpLookupV2; import lombok.Getter; @@ -31,7 +31,7 @@ public class AsnKnowledgeBase extends AbstractKnowledgeBase { - public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) { + public Boolean updateKnowledgeBase(KnowledgeBaseConfig knowledgeConfig) { IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java index b2a1077..2ef6674 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java @@ -1,7 +1,7 @@ package com.geedgenetworks.core.utils.KnowlegdeBase; -import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.core.pojo.KnowLedgeFileEntity; import com.geedgenetworks.utils.IpLookupV2; import lombok.Getter; @@ -26,7 +26,7 @@ public class GeoIpKnowledgeBase extends AbstractKnowledgeBase { return instance; } @Override - public 
Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) { + public Boolean updateKnowledgeBase(KnowledgeBaseConfig knowledgeConfig) { IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false); for (int i = 0; i < knowledgeConfig.getFiles().size(); i++) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java index b879e67..0f130f4 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java @@ -3,8 +3,8 @@ package com.geedgenetworks.core.utils.KnowlegdeBase; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.TypeReference; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.EngineConfig; -import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.common.config.CommonConfig; +import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.core.pojo.KnowLedgeFileEntity; import com.geedgenetworks.core.pojo.KnowledgeBaseEntity; import com.geedgenetworks.core.utils.HttpClientPoolUtil; @@ -33,14 +33,14 @@ public class KnowledgeBaseUpdateJob implements Job { private static KnowledgeBaseUpdateJob instance; - private static EngineConfig engineConfig; + private static CommonConfig engineConfig; public static synchronized KnowledgeBaseUpdateJob getInstance(RuntimeContext runtimeContext) { if (instance == null) { instance = new KnowledgeBaseUpdateJob(); Configuration configuration = (Configuration) runtimeContext .getExecutionConfig().getGlobalJobParameters(); - engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class); + engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); startTask(); } return instance; @@ -70,7 +70,7 @@ public class KnowledgeBaseUpdateJob implements Job { knowledgeBaseEntity.setKnowLedgeFileEntityList(knowLedgeFileEntityList); try { - for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){ + for(KnowledgeBaseConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){ if(name.equals(knowledgeConfig.getName())){ knowledgeBaseEntity.setKnowledgeConfig(knowledgeConfig); break; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java index 90bba33..b5c44fe 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java @@ -1,6 +1,7 @@ package com.geedgenetworks.core.utils.hbase; import com.geedgenetworks.common.config.CommonConfig; +import com.geedgenetworks.common.config.CommonConfigOptions; import com.github.rholder.retry.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -43,7 +44,7 @@ public class HbaseConnectBuilder { * @return This HbaseConnectBuilder instance. 
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java
index 90bba33..b5c44fe 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java
@@ -1,6 +1,7 @@
 package com.geedgenetworks.core.utils.hbase;
 
 import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.CommonConfigOptions;
 import com.github.rholder.retry.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -43,7 +44,7 @@ public class HbaseConnectBuilder {
      * @return This HbaseConnectBuilder instance.
      */
    public HbaseConnectBuilder loadDefaultConfig() {
-        this.config.set("hbase.zookeeper.quorum", CommonConfig.ZOOKEEPER_QUORUM);
+        this.config.set("hbase.zookeeper.quorum", CommonConfigOptions.ZOOKEEPER_QUORUM.defaultValue());
        return this;
    }
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
index ef820e4..3d9a34e 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
@@ -1,10 +1,8 @@
 package com.geedgenetworks.core.udf.test;
 
-import com.geedgenetworks.common.config.KnowledgeConfig;
 import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.core.pojo.UDFContext;
 import com.geedgenetworks.core.udf.AsnLookup;
-import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
index d65c431..f5c44e4 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
@@ -1,6 +1,6 @@
 package com.geedgenetworks.core.udf.test;
 
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
 import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
 import com.geedgenetworks.core.pojo.UDFContext;
 import com.geedgenetworks.core.udf.GeoIpLookup;
@@ -69,7 +69,7 @@ public class GeoIpLookupFunctionTest {
         udfContext.getParameters().put("option","IP_TO_ASN");
         udfContext.getParameters().put("vendor_id","tsg_asnlookup");*/
 
-        KnowledgeConfig knowledgeConfig =new KnowledgeConfig();
+        KnowledgeBaseConfig knowledgeConfig =new KnowledgeBaseConfig();
         knowledgeConfig.setName("tsg_geoiplookup");
         knowledgeConfig.setType("geoiplookup");
         knowledgeConfig.setFiles(Arrays.asList("acf1db8589c5e277-ead1a65e1c3973dc","acf1db8589c5e277-ead1a65e1c3973dc"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
index 524b199..d91be11 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
@@ -78,13 +78,14 @@ public class DomainFunctionTest {
     public void testDomainFunctionFirstSignificantSubdomain() {
 
         parameters.put("option", "FIRST_SIGNIFICANT_SUBDOMAIN");
+        udfContext.setParameters(parameters);
         Domain domain = new Domain();
         domain.open(null, udfContext);
         Event event = new Event();
         Map<String, Object> extractedFields = new HashMap<>();
-        extractedFields.put("domain", "www.baidu.com");
+        extractedFields.put("v1", "www.baidu.com");
         event.setExtractedFields(extractedFields);
         Event result1 = domain.evaluate(event);
-        assertEquals("baidu.com", result1.getExtractedFields().get("domain1"));
+        assertEquals("baidu.com", result1.getExtractedFields().get("v2"));
     }
 }
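
The HbaseConnectBuilder change replaces a bare string constant with a typed option that carries its own default, which is what CommonConfigOptions.ZOOKEEPER_QUORUM.defaultValue() suggests. A sketch of that option-with-default pattern, under the assumption that CommonConfigOptions follows the usual ConfigOption shape; every name here except ZOOKEEPER_QUORUM and "hbase.zookeeper.quorum" is hypothetical, and the default address is a placeholder:

    // Sketch of a typed config option that carries its key and default value together.
    public final class ConfigOptionSketch<T> {
        private final String key;
        private final T defaultValue;

        private ConfigOptionSketch(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        public static <T> ConfigOptionSketch<T> of(String key, T defaultValue) {
            return new ConfigOptionSketch<>(key, defaultValue);
        }

        public String key() { return key; }
        public T defaultValue() { return defaultValue; }

        // Hypothetical stand-in for CommonConfigOptions.ZOOKEEPER_QUORUM.
        public static final ConfigOptionSketch<String> ZOOKEEPER_QUORUM =
                ConfigOptionSketch.of("hbase.zookeeper.quorum", "localhost:2181");

        public static void main(String[] args) {
            // Callers read the default through the option instead of a raw constant.
            System.out.println(ZOOKEEPER_QUORUM.key() + " = " + ZOOKEEPER_QUORUM.defaultValue());
        }
    }

The gain over a plain constant is that the key and its fallback travel together, so call sites like loadDefaultConfig() cannot pair the wrong default with a key.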
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
index 39c825f..4886547 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
@@ -21,15 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class JsonExtractFunctionTest {
 
     private static UDFContext udfContext;
-    private static Map<String, Object> parameters ;
 
     @BeforeAll
     public static void setUp() {
         udfContext = new UDFContext();
-        parameters = new HashMap<>();
-        parameters.put("param","$.tags[?(@.tag=='device_group')][0].value");
-        udfContext.setParameters(parameters);
-        udfContext.setLookup_fields(Arrays.asList("device_tag"));
-        udfContext.setOutput_fields(Arrays.asList("device_group"));
     }
 
     @Test
@@ -46,11 +40,6 @@ public class JsonExtractFunctionTest {
         Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
             jsonExtract.open(null, udfContext);
         });
-
-        udfContext.setLookup_fields(new ArrayList<>());
-        udfContext.getLookup_fields().add("v1");
-        udfContext.setOutput_fields(new ArrayList<>());
-        udfContext.getOutput_fields().add("v2");
         udfContext.setParameters(new HashMap<>());
         udfContext.getParameters().put("other","other");
         Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
@@ -64,6 +53,11 @@ public class JsonExtractFunctionTest {
 
         JsonExtract jsonExtract = new JsonExtract();
 
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("param","$.tags[?(@.tag=='device_group')][0].value");
+        udfContext.setParameters(parameters);
+        udfContext.setLookup_fields(Arrays.asList("device_tag"));
+        udfContext.setOutput_fields(Arrays.asList("device_group"));
         jsonExtract.open(null, udfContext);
         Event event = new Event();
         String jsonString = "{\"device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-tsgx\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgx\\\"}]}\"}";
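
For context, the JsonPath parameter in this test, $.tags[?(@.tag=='device_group')][0].value, selects the value of the tag entry named device_group from the serialized JSON held in the device_tag field. A self-contained illustration using the Jayway json-path library, which is an assumption, since the diff does not show which JSONPath engine JsonExtract delegates to; the fixture below is the inner value of device_tag with the escaping removed for readability:

    import com.jayway.jsonpath.JsonPath;

    import java.util.List;

    public class JsonExtractSketch {
        public static void main(String[] args) {
            String json = "{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},"
                    + "{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}";

            // Filter expressions return every match as a list; take the first element,
            // which is what the [0] in the test's path expresses.
            List<String> values = JsonPath.read(json, "$.tags[?(@.tag=='device_group')].value");
            System.out.println(values.get(0)); // group-xxg-tsgx
        }
    }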
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
index 4edbba1..93acbdd 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
@@ -2,7 +2,6 @@ package com.geedgenetworks.core.udf.test.simple;
 
 import com.geedgenetworks.core.pojo.Event;
 import com.geedgenetworks.core.pojo.UDFContext;
-import com.geedgenetworks.core.udf.FromUnixTimestamp;
 import com.geedgenetworks.core.udf.UnixTimestampConverter;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -13,7 +12,7 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class FromUnixTimestampConverterTest {
+public class UnixTimestampConverterTest {
 
     private static UDFContext udfContext;
 
@@ -24,7 +23,7 @@ public class FromUnixTimestampConverterTest {
         udfContext.setOutput_fields(Arrays.asList("output"));
     }
     @Test
-    public void testFromUnixTimestampFunctionMstoS() throws Exception {
+    public void testUnixTimestampFunctionMstoS() throws Exception {
 
         Map<String, Object> parameters = new HashMap<>();
         parameters.put("precision", "seconds");
@@ -42,7 +41,7 @@
     }
 
     @Test
-    public void testFromUnixTimestampFunctionStoMs() throws Exception {
+    public void testUnixTimestampFunctionStoMs() throws Exception {
 
         Map<String, Object> parameters = new HashMap<>();
         parameters.put("precision", "milliseconds");
@@ -61,7 +60,7 @@
 
     @Test
-    public void testFromUnixTimestampFunctionStoS() throws Exception {
+    public void testUnixTimestampFunctionStoS() throws Exception {
 
         Map<String, Object> parameters = new HashMap<>();
         parameters.put("precision", "seconds");
@@ -80,7 +79,7 @@
 
     @Test
-    public void testFromUnixTimestampFunctionMstoMs() throws Exception {
+    public void testUnixTimestampFunctionMstoMs() throws Exception {
 
         Map<String, Object> parameters = new HashMap<>();
         parameters.put("precision", "milliseconds");
diff --git a/groot-example/src/main/resources/examples/inline-to-console-job.yaml b/groot-example/src/main/resources/examples/inline-to-console-job.yaml
index 4abac01..8bf6e7b 100644
--- a/groot-example/src/main/resources/examples/inline-to-console-job.yaml
+++ b/groot-example/src/main/resources/examples/inline-to-console-job.yaml
@@ -54,6 +54,17 @@ processing_pipelines:
         output_fields: [ log_id ]
         parameters:
           data_center_id_num: 1
+      - function: ASN_LOOKUP
+        lookup_fields: [ server_ip ]
+        output_fields: [ server_asn ]
+        parameters:
+          option: IP_TO_ASN
+          vendor_id: tsg_asnlookup
+      - function: CURRENT_UNIX_TIMESTAMP
+        output_fields: [ processing_time ]
+        parameters:
+          precision: milliseconds
+
 sinks:
   print_sink:
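
The renamed UnixTimestampConverter tests cover all four precision combinations (ms to s, s to ms, s to s, ms to ms) while only ever passing a single "precision" parameter, so the UDF has to infer the input unit from the value itself. A sketch of one plausible magnitude-based heuristic; the cutoff constant and both method names are illustrative, not taken from the UDF, whose actual inference rule is not shown in this diff:

    public class UnixTimestampPrecisionSketch {
        // Heuristic: epoch-second values are ~10 digits, epoch-millisecond values ~13 digits,
        // so any value at or above this illustrative threshold is treated as milliseconds.
        private static final long MS_CUTOFF = 100_000_000_000L;

        static long toSeconds(long ts) {
            return ts >= MS_CUTOFF ? ts / 1000 : ts;
        }

        static long toMilliseconds(long ts) {
            return ts >= MS_CUTOFF ? ts : ts * 1000;
        }

        public static void main(String[] args) {
            System.out.println(toSeconds(1700000000000L));      // 1700000000      (ms -> s)
            System.out.println(toMilliseconds(1700000000L));    // 1700000000000   (s -> ms)
            System.out.println(toSeconds(1700000000L));         // unchanged       (s -> s)
            System.out.println(toMilliseconds(1700000000000L)); // unchanged       (ms -> ms)
        }
    }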
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
index 4401213..6f5c4b4 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
@@ -5,15 +5,20 @@ import com.geedgenetworks.core.pojo.Event;
 import com.geedgenetworks.core.types.StructType;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 public class JsonEventDeserializationSchema implements DeserializationSchema<Event> {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonEventDeserializationSchema.class);
+    private static final int MAX_CHARS_LENGTH = 1024 * 32;
     private final StructType dataType;
     private final boolean ignoreParseErrors;
     private final JsonToMapDataConverter converter;
+    private final char[] tmpChars = new char[MAX_CHARS_LENGTH];
 
     public JsonEventDeserializationSchema(StructType dataType, boolean ignoreParseErrors) {
         this.dataType = dataType;
@@ -22,16 +27,19 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
     }
 
     @Override
-    public Event deserialize(byte[] message) throws IOException {
+    public Event deserialize(byte[] bytes) throws IOException {
         Map<String, Object> map;
+        String message = decodeUTF8(bytes, 0, bytes.length);
+
+        // If no type is configured, skip type validation and conversion
         if(dataType == null){
             try {
                 map = JSON.parseObject(message);
             } catch (Exception e) {
+                LOG.error(String.format("JSON parsing failed for: %s", message), e);
                 if(ignoreParseErrors){
-                    map = new HashMap<>();
+                    return null;
                 }else{
                     throw new IOException(e);
                 }
@@ -47,6 +55,106 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
         return event;
     }
 
+    private String decodeUTF8(byte[] input, int offset, int byteLen) {
+        char[] chars = MAX_CHARS_LENGTH < byteLen ? new char[byteLen] : tmpChars;
+        int len = decodeUTF8Strict(input, offset, byteLen, chars);
+        if (len < 0) {
+            return defaultDecodeUTF8(input, offset, byteLen);
+        }
+        return new String(chars, 0, len);
+    }
+
+    private static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) {
+        final int sl = sp + len;
+        int dp = 0;
+        int dlASCII = Math.min(len, da.length);
+
+        // ASCII only optimized loop
+        while (dp < dlASCII && sa[sp] >= 0) {
+            da[dp++] = (char) sa[sp++];
+        }
+
+        while (sp < sl) {
+            int b1 = sa[sp++];
+            if (b1 >= 0) {
+                // 1 byte, 7 bits: 0xxxxxxx
+                da[dp++] = (char) b1;
+            } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) {
+                // 2 bytes, 11 bits: 110xxxxx 10xxxxxx
+                if (sp < sl) {
+                    int b2 = sa[sp++];
+                    if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2)
+                        return -1;
+                    } else {
+                        da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80)));
+                    }
+                    continue;
+                }
+                return -1;
+            } else if ((b1 >> 4) == -2) {
+                // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
+                if (sp + 1 < sl) {
+                    int b2 = sa[sp++];
+                    int b3 = sa[sp++];
+                    if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80)
+                            || (b2 & 0xc0) != 0x80
+                            || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3)
+                        return -1;
+                    } else {
+                        char c = (char) ((b1 << 12)
+                                ^ (b2 << 6)
+                                ^ (b3 ^ (((byte) 0xE0 << 12) ^ ((byte) 0x80 << 6) ^ ((byte) 0x80))));
+                        if (Character.isSurrogate(c)) {
+                            return -1;
+                        } else {
+                            da[dp++] = c;
+                        }
+                    }
+                    continue;
+                }
+                return -1;
+            } else if ((b1 >> 3) == -2) {
+                // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
+                if (sp + 2 < sl) {
+                    int b2 = sa[sp++];
+                    int b3 = sa[sp++];
+                    int b4 = sa[sp++];
+                    int uc = ((b1 << 18)
+                            ^ (b2 << 12)
+                            ^ (b3 << 6)
+                            ^ (b4 ^ (((byte) 0xF0 << 18) ^ ((byte) 0x80 << 12) ^ ((byte) 0x80 << 6) ^ ((byte) 0x80))));
+                    // isMalformed4 and shortest form check
+                    if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80)
+                            || !Character.isSupplementaryCodePoint(uc)) {
+                        return -1;
+                    } else {
+                        da[dp++] = Character.highSurrogate(uc);
+                        da[dp++] = Character.lowSurrogate(uc);
+                    }
+                    continue;
+                }
+                return -1;
+            } else {
+                return -1;
+            }
+        }
+        return dp;
+    }
+
+    private static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
+        return new String(bytes, offset, len, StandardCharsets.UTF_8);
+    }
+
     @Override
     public boolean isEndOfStream(Event nextElement) {
         return false;
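
The new decode path keeps a reusable 32 KiB char buffer and runs a strict hand-rolled UTF-8 decoder first, falling back to new String(..., StandardCharsets.UTF_8), which substitutes U+FFFD for bad sequences, only when the strict pass reports malformed input. A small standalone sketch of that two-tier contract; decodeStrict below is a deliberately tiny ASCII-only stand-in for the private decodeUTF8Strict above, just to show the return-length-or-minus-one protocol and the fallback:

    import java.nio.charset.StandardCharsets;

    public class Utf8FallbackSketch {
        // Stand-in for decodeUTF8Strict: returns the number of chars written, or -1
        // when it cannot decode the input itself (here: any non-ASCII byte).
        static int decodeStrict(byte[] in, char[] out) {
            int dp = 0;
            for (byte b : in) {
                if (b < 0) {
                    return -1; // defer to the full decoder
                }
                out[dp++] = (char) b;
            }
            return dp;
        }

        static String decode(byte[] in) {
            char[] buf = new char[in.length];
            int len = decodeStrict(in, buf);
            // The fallback never throws; it replaces malformed sequences with U+FFFD.
            return len < 0 ? new String(in, StandardCharsets.UTF_8) : new String(buf, 0, len);
        }

        public static void main(String[] args) {
            System.out.println(decode("plain ascii".getBytes(StandardCharsets.US_ASCII)));
            System.out.println(decode(new byte[] {(byte) 0xC3, (byte) 0x28})); // malformed UTF-8
        }
    }

The design point is that the common case (well-formed, mostly-ASCII JSON) decodes into a preallocated buffer with no CharsetDecoder allocation, while correctness for arbitrary bytes is preserved by the JDK fallback.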
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
index 77fbb8e..5bec4e8 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
@@ -23,7 +23,7 @@ public class JsonToMapDataConverter implements Serializable {
         this.filedConverters = Arrays.stream(dataType.fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType)));
     }
 
-    public Map<String, Object> convert(byte[] message){
+    public Map<String, Object> convert(String message){
         try (JSONReader reader = JSONReader.of(message)) {
             if (!reader.nextIfMatch('{')) {
                 throw new JSONException("object not start with {");
             }
@@ -46,12 +46,12 @@ public class JsonToMapDataConverter implements Serializable {
             return obj;
         } catch (Exception e) {
-            LOG.error("JSON parsing failed", e);
+            LOG.error(String.format("JSON parsing failed for: %s", message), e);
             if(ignoreParseErrors){
-                return new HashMap<>();
+                return null;
            }
-            throw new UnsupportedOperationException("Malformed JSON: " + new String(message, StandardCharsets.UTF_8));
+            throw new UnsupportedOperationException("Malformed JSON: " + message);
         }
     }
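
With this change, both parse paths return null instead of an empty map when ignoreParseErrors is set, so unparseable records are dropped rather than emitted as empty events (a null from DeserializationSchema#deserialize is skipped by Flink sources rather than forwarded). A hedged sketch of the contract a caller now has to honor; the lambda stands in for JsonToMapDataConverter, and the null check is the point:

    import java.util.Map;
    import java.util.function.Function;

    public class IgnoreParseErrorsSketch {
        public static void main(String[] args) {
            // Stand-in converter: null signals "unparseable and ignored", mirroring the new contract.
            Function<String, Map<String, Object>> converter =
                    msg -> msg.startsWith("{") ? Map.of("raw", msg) : null;

            for (String msg : new String[] {"{\"a\":1}", "not json"}) {
                Map<String, Object> map = converter.apply(msg);
                if (map == null) {
                    continue; // drop the record instead of emitting an empty event
                }
                System.out.println(map);
            }
        }
    }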
