summaryrefslogtreecommitdiff
path: root/groot-common/src/main
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-11-09 20:01:24 +0800
committerdoufenghu <[email protected]>2024-11-09 20:01:24 +0800
commit16769de2e5ba334a5cfaacd8a53db2989264d022 (patch)
tree37dcce46bf5dbefb494498ac895f44b12d04e169 /groot-common/src/main
parentf3f2857a6e7bb9ccbf45c86209d971bafe75b603 (diff)
[Feature][SPI] 增加groot-spi模块,解耦core和common模块之间的复杂依赖关系,移除一些不需要的类库。
Diffstat (limited to 'groot-common/src/main')
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Event.java19
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/Accumulator.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java)4
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java56
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java1
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java107
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java1
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java1
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/Constants.java)2
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/FilterConfigOptions.java15
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java1
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java1
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/KeybyEntity.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java)2
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/ProjectionConfigOptions.java34
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/SinkConfigOptions.java15
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/SourceConfigOptions.java26
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java18
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java34
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java63
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/UDFPluginConfigLocator.java1
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java29
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java17
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java19
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java31
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java20
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java22
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientPoolUtil.java152
26 files changed, 156 insertions, 535 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
deleted file mode 100644
index 20ecca7..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.geedgenetworks.common;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Map;
-
-@Data
-public class Event implements Serializable {
- public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp";
- public static final String INTERNAL_HEADERS_KEY = "__headers";
- public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp";
- public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp";
-
- private Map<String, Object> extractedFields;
- //Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing.
- private boolean isDropped = false;
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/Accumulator.java
index 403cecc..fdadea3 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/Accumulator.java
@@ -1,8 +1,6 @@
-package com.geedgenetworks.common;
+package com.geedgenetworks.common.config;
import lombok.Data;
-import org.apache.flink.metrics.Counter;
-
import java.io.Serializable;
import java.util.Map;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
deleted file mode 100644
index af94abf..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.alibaba.fastjson2.TypeReference;
-import com.geedgenetworks.common.udf.UDFContext;
-
-import java.util.List;
-
-public interface AggregateConfigOptions {
- Option<String> TYPE = Options.key("type")
- .stringType()
- .noDefaultValue()
- .withDescription("The type of processor.");
-
- Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The fields to be outputted.");
-
- Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The fields to be removed.");
-
- Option<List<UDFContext>> FUNCTIONS = Options.key("functions")
- .type(new TypeReference<List<UDFContext>>() {})
- .noDefaultValue()
- .withDescription("The functions to be executed.");
-
- Option<List<String>> GROUP_BY_FIELDS = Options.key("group_by_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The fields to be key by.");
-
- Option<String> WINDOW_TYPE = Options.key("window_type")
- .stringType()
- .noDefaultValue()
- .withDescription("The type of window.");
-
- Option<Integer> WINDOW_SIZE = Options.key("window_size")
- .intType()
- .noDefaultValue()
- .withDescription("The size of window.");
- Option<Boolean> MINI_BATCH = Options.key("mini_batch")
- .booleanType()
- .defaultValue(false)
- .withDescription("The label of pre_aggrergate.");
- Option<Integer> WINDOW_SLIDE = Options.key("window_slide")
- .intType()
- .noDefaultValue()
- .withDescription("The size of sliding window.");
-
- Option<String> WINDOW_TIMESTAMP_FIELD = Options.key("window_timestamp_field")
- .stringType()
- .noDefaultValue()
- .withDescription("which field to be set the start time of window.");
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
index 1d4e819..3084c00 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
@@ -3,7 +3,6 @@ package com.geedgenetworks.common.config;
import com.typesafe.config.Config;
import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
deleted file mode 100644
index f1170be..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.geedgenetworks.common.udf.UDFContext;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public final class CheckUDFContextUtil {
-
- private CheckUDFContextUtil() {}
-
- // Check if all the params are present in the UDFContext
- public static CheckResult checkAllExists(UDFContext context, String... params) {
- List<String> invalidParams = Arrays.stream(params)
- .filter(param -> isInvalidParam(context, param))
- .collect(Collectors.toList());
-
- if (!invalidParams.isEmpty()) {
- String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", invalidParams));
- return CheckResult.error(errorMsg);
- }
- return CheckResult.success();
- }
-
- // Check if at least one of the params is present in the UDFContext
- public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) {
- if (params.length == 0) {
- return CheckResult.success();
- }
-
- List<String> invalidParams = Arrays.stream(params)
- .filter(param -> isInvalidParam(context, param))
- .collect(Collectors.toList());
-
- if (invalidParams.size() == params.length) {
- String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", invalidParams));
- return CheckResult.error(errorMsg);
- }
- return CheckResult.success();
- }
-
-
-
- // Check Array/Map Object has only one item
- public static CheckResult checkCollectionSingleItemExists (UDFContext context, String param) {
- if (context == null) {
- return CheckResult.error("UDFContext is null");
- }
-
- if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
- return context.getLookupFields() != null && context.getLookupFields().size() == 1 ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item");
- } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
- return context.getOutputFields() != null && context.getOutputFields().size() == 1 ? CheckResult.success() : CheckResult.error("Output fields should have only one item");
- } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
- return context.getParameters() != null && context.getParameters().size() == 1 ? CheckResult.success() : CheckResult.error("Parameters should have only one item");
- } else {
- return CheckResult.error("Invalid param");
- }
-
- }
-
- // Check Parameters contains keys
- public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) {
- if (context == null) {
- return CheckResult.error("UDFContext is null");
- }
-
- if (context.getParameters() == null) {
- return CheckResult.error("Parameters is null");
- }
-
- List<String> missingKeys = Arrays.stream(keys)
- .filter(key -> !context.getParameters().containsKey(key))
- .collect(Collectors.toList());
-
- if (!missingKeys.isEmpty()) {
- String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingKeys));
- return CheckResult.error(errorMsg);
- }
- return CheckResult.success();
- }
-
- public static boolean isInvalidParam(UDFContext context, String param) {
- if (context == null) {
- return true;
- }
-
- if (UDFContextConfigOptions.NAME.key().equals(param)) {
- return context.getName() == null;
- } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
- return context.getLookupFields() == null || context.getLookupFields().isEmpty();
- } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
- return context.getOutputFields() == null || context.getOutputFields().isEmpty();
- } else if (UDFContextConfigOptions.FILTER.key().equals(param)) {
- return context.getFilter() == null;
- } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
- return context.getParameters() == null || context.getParameters().isEmpty();
- } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) {
- return context.getFunction() == null;
- } else {
- return true;
- }
-
- }
-
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java
index 5302cc2..65e7437 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java
@@ -1,6 +1,5 @@
package com.geedgenetworks.common.config;
-import com.geedgenetworks.common.Constants;
import com.hazelcast.internal.config.AbstractConfigLocator;
import static com.hazelcast.internal.config.DeclarativeConfigUtil.YAML_ACCEPTED_SUFFIXES;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
index a967ae5..5dfcc1c 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java
@@ -1,6 +1,5 @@
package com.geedgenetworks.common.config;
-import com.geedgenetworks.common.Constants;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.YamlClientConfigBuilder;
import com.hazelcast.client.config.impl.YamlClientConfigLocator;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
index 27ce8fb..ac4d0bf 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.common;
+package com.geedgenetworks.common.config;
public final class Constants {
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/FilterConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/FilterConfigOptions.java
deleted file mode 100644
index a553608..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/FilterConfigOptions.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import java.util.Map;
-
-public interface FilterConfigOptions {
- Option<String> TYPE = Options.key("type")
- .stringType()
- .noDefaultValue()
- .withDescription("The type of filter .");
-
- Option<Map<String, String>> PROPERTIES = Options.key("properties")
- .mapType()
- .noDefaultValue()
- .withDescription("Custom properties for filter.");
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
index 189b05b..4dc6cbe 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java
@@ -1,6 +1,5 @@
package com.geedgenetworks.common.config;
-import com.geedgenetworks.common.Constants;
import com.hazelcast.config.Config;
import lombok.extern.slf4j.Slf4j;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
index 4b5a974..842d1bf 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java
@@ -13,6 +13,7 @@ import org.w3c.dom.Node;
import java.io.InputStream;
import java.util.Properties;
+
import static com.hazelcast.internal.config.yaml.W3cDomUtil.asW3cNode;
public class GrootStreamConfigBuilder extends AbstractYamlConfigBuilder {
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KeybyEntity.java
index f1dc38f..3e6ded8 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KeybyEntity.java
@@ -1,4 +1,4 @@
-package com.geedgenetworks.common;
+package com.geedgenetworks.common.config;
import lombok.Data;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ProjectionConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ProjectionConfigOptions.java
deleted file mode 100644
index 1a813af..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ProjectionConfigOptions.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.alibaba.fastjson2.TypeReference;
-import com.geedgenetworks.common.udf.UDFContext;
-
-import java.util.List;
-
-public interface ProjectionConfigOptions {
- Option<String> TYPE = Options.key("type")
- .stringType()
- .noDefaultValue()
- .withDescription("The type of processor.");
-
- Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The fields to be outputted.");
-
- Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The fields to be removed.");
-
- Option<List<UDFContext>> FUNCTIONS = Options.key("functions")
- .type(new TypeReference<List<UDFContext>>() {})
- .noDefaultValue()
- .withDescription("The functions to be executed.");
-
-
-
-
-
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SinkConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SinkConfigOptions.java
deleted file mode 100644
index 1662bcb..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/SinkConfigOptions.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import java.util.Map;
-
-public interface SinkConfigOptions {
- Option<String> TYPE = Options.key("type")
- .stringType()
- .noDefaultValue()
- .withDescription("The type of sink.");
-
- Option<Map<String, String>> PROPERTIES = Options.key("properties")
- .mapType()
- .noDefaultValue()
- .withDescription("Custom properties for sink.");
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SourceConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SourceConfigOptions.java
deleted file mode 100644
index 4192fe9..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/SourceConfigOptions.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.alibaba.fastjson2.TypeReference;
-import java.util.List;
-import java.util.Map;
-
-public interface SourceConfigOptions {
- Option<String> TYPE = Options.key("type")
- .stringType()
- .noDefaultValue()
- .withDescription("The type of source.");
-
- Option<Map<String, String>> PROPERTIES = Options.key("properties")
- .mapType()
- .noDefaultValue()
- .withDescription("Custom properties for source.");
-
- Option<List<Map<String, String>>> FIELDS = Options.key("fields")
- .type(new TypeReference<List<Map<String, String>>>() {})
- .noDefaultValue()
- .withDescription("When fields is not specified, all fields in source will be outputted as format of Map<String, Object>.");
-
-
-
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
deleted file mode 100644
index a2acb71..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.alibaba.fastjson2.TypeReference;
-import com.geedgenetworks.common.udf.RuleContext;
-import java.util.List;
-
-public interface SplitConfigOptions {
- Option<String> TYPE = Options.key("type")
- .stringType()
- .noDefaultValue()
- .withDescription("The type of route .");
-
- Option<List<RuleContext>> RULES = Options.key("rules")
- .type(new TypeReference<List<RuleContext>>() {})
- .noDefaultValue()
- .withDescription("The rules to be executed.");
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java
deleted file mode 100644
index 480496d..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import com.alibaba.fastjson2.TypeReference;
-import com.geedgenetworks.common.udf.UDFContext;
-
-import java.util.List;
-
-public interface TableConfigOptions {
- Option<String> TYPE = Options.key("type")
- .stringType()
- .noDefaultValue()
- .withDescription("The type of processor.");
-
- Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The fields to be outputted.");
-
- Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The fields to be removed.");
-
- Option<List<UDFContext>> FUNCTIONS = Options.key("functions")
- .type(new TypeReference<List<UDFContext>>() {})
- .noDefaultValue()
- .withDescription("The functions to be executed.");
-
-
-
-
-
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
deleted file mode 100644
index 87bbf36..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import java.util.List;
-import java.util.Map;
-import com.alibaba.fastjson2.TypeReference;
-public interface UDFContextConfigOptions {
- Option<String> NAME = Options.key("name")
- .stringType()
- .noDefaultValue()
- .withDescription("The name of the function.");
-
- Option<List<String>> LOOKUP_FIELDS = Options.key("lookup_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The fields to be looked up.");
-
- Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
- .listType()
- .noDefaultValue()
- .withDescription("The fields to be outputted.");
-
- Option<String> FILTER = Options.key("filter")
- .stringType()
- .noDefaultValue()
- .withDescription("The filter expression to be applied.");
-
- Option<Map<String, Object>> PARAMETERS = Options.key("parameters")
- .type(new TypeReference<Map<String, Object>>() {})
- .noDefaultValue()
- .withDescription("The parameters for the function.");
-
- Option<String> PARAMETERS_KB_NAME = Options.key("kb_name")
- .stringType()
- .noDefaultValue()
- .withDescription("The name of the knowledge base.");
-
- Option<String> PARAMETERS_OPTION = Options.key("option")
- .stringType()
- .noDefaultValue()
- .withDescription("The option for the function.");
-
- Option<Map<String, String>> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping")
- .mapType()
- .noDefaultValue()
- .withDescription("The geolocation field mapping.");
-
- Option<String> PARAMETERS_IDENTIFIER = Options.key("identifier")
- .stringType()
- .noDefaultValue()
- .withDescription("The identifier for the parameters of function.");
-
- Option<String> PARAMETERS_SECRET_KEY = Options.key("secret_key")
- .stringType()
- .noDefaultValue()
- .withDescription("The secret key for the function.");
-
- Option<String> FUNCTION = Options.key("function")
- .stringType()
- .noDefaultValue()
- .withDescription("The function to be executed.");
-
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFPluginConfigLocator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFPluginConfigLocator.java
index 49da576..4be72b6 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFPluginConfigLocator.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFPluginConfigLocator.java
@@ -1,6 +1,5 @@
package com.geedgenetworks.common.config;
-import com.geedgenetworks.common.Constants;
import com.hazelcast.internal.config.AbstractConfigLocator;
import lombok.extern.slf4j.Slf4j;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java b/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java
deleted file mode 100644
index 985f4df..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package com.geedgenetworks.common.crypto;
-
-public interface CryptoShade {
-
- /**
- * The unique identifier of the current interface, used it to select the correct {@link
- * CryptoShade}
- */
- String getIdentifier();
-
- /**
- * Encrypt the content
- *
- * @param content The content to encrypt
- */
- String encrypt(String content);
-
- /**
- * Decrypt the content
- *
- * @param content The content to decrypt
- */
- String decrypt(String content);
-
- /** To expand the options that user want to encrypt */
- default String[] sensitiveOptions() {
- return new String[0];
- }
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
deleted file mode 100644
index 6f6e048..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.geedgenetworks.common.udf;
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-
-import java.io.Serializable;
-
-public interface AggregateFunction extends Serializable {
-
- void open(UDFContext udfContext);
- Accumulator initAccumulator(Accumulator acc);
- Accumulator add(Event val, Accumulator acc);
- String functionName();
- Accumulator getResult(Accumulator acc);
- Accumulator merge(Accumulator a, Accumulator b);
- default void close(){};
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
deleted file mode 100644
index 6aa9e3d..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.geedgenetworks.common.udf;
-
-import com.geedgenetworks.common.Event;
-import com.googlecode.aviator.Expression;
-import lombok.Data;
-import org.apache.flink.util.OutputTag;
-
-import java.io.Serializable;
-
-@Data
-public class RuleContext implements Serializable {
-
- private String tag;
- private String expression;
- private Expression compiledExpression;
- private OutputTag<Event> outputTag ;
-
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
deleted file mode 100644
index 2723652..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.geedgenetworks.common.udf;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.config.CheckUDFContextUtil;
-import com.geedgenetworks.common.config.UDFContextConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import java.io.Serializable;
-
-public interface ScalarFunction extends Serializable {
-
- void open(RuntimeContext runtimeContext, UDFContext udfContext);
-
- Event evaluate(Event event);
-
- String functionName();
-
- void close();
-
- default void checkConfig(UDFContext udfContext) {
- if (udfContext == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null");
- }
-
- if (!CheckUDFContextUtil.checkAtLeastOneExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key(), UDFContextConfigOptions.OUTPUT_FIELDS.key(), UDFContextConfigOptions.FILTER.key()).isSuccess()) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified.");
- }
-
- }
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java
deleted file mode 100644
index e602291..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.geedgenetworks.common.udf;
-
-import com.geedgenetworks.common.Event;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface TableFunction extends Serializable {
-
- void open(RuntimeContext runtimeContext, UDFContext udfContext);
-
- List<Event> evaluate(Event event);
-
- String functionName();
-
- void close();
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
deleted file mode 100644
index ea98226..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.geedgenetworks.common.udf;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-@Data
-public class UDFContext implements Serializable {
-
- private String name;
- @JsonProperty("lookup_fields")
- private List<String> lookupFields;
- @JsonProperty("output_fields")
- private List<String> outputFields;
- private String filter;
- private Map<String, Object> parameters;
- private String function;
-
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientPoolUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientPoolUtil.java
new file mode 100644
index 0000000..cb98cc9
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientPoolUtil.java
@@ -0,0 +1,152 @@
+package com.geedgenetworks.common.utils;
+
+import com.geedgenetworks.shaded.org.apache.http.*;
+import com.geedgenetworks.shaded.org.apache.http.HttpEntity;
+import com.geedgenetworks.shaded.org.apache.http.client.methods.CloseableHttpResponse;
+import com.geedgenetworks.shaded.org.apache.http.client.methods.HttpGet;
+import com.geedgenetworks.shaded.org.apache.http.config.Registry;
+import com.geedgenetworks.shaded.org.apache.http.config.RegistryBuilder;
+import com.geedgenetworks.shaded.org.apache.http.conn.socket.ConnectionSocketFactory;
+import com.geedgenetworks.shaded.org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import com.geedgenetworks.shaded.org.apache.http.conn.ssl.NoopHostnameVerifier;
+import com.geedgenetworks.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import com.geedgenetworks.shaded.org.apache.http.impl.client.CloseableHttpClient;
+import com.geedgenetworks.shaded.org.apache.http.impl.client.HttpClientBuilder;
+import com.geedgenetworks.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import com.geedgenetworks.shaded.org.apache.http.util.EntityUtils;
+import com.geedgenetworks.utils.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.IOException;
+import java.net.URI;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.X509Certificate;
+
+@Slf4j
+public class HttpClientPoolUtil {
+
+ private static HttpClientPoolUtil instance;
+ private final CloseableHttpClient httpClient;
+
+ private static final int DEFAULT_MAX_TOTAL = 400;
+ private static final int DEFAULT_MAX_PER_ROUTE = 10;
+ private static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 10000;
+ private static final int DEFAULT_CONNECT_TIMEOUT = 10000;
+ private static final int DEFAULT_SOCKET_TIMEOUT = 60000;
+ private HttpClientPoolUtil() {
+ // 创建连接池管理器
+ PoolingHttpClientConnectionManager connectionManager = getSslClientManager();
+ // 设置最大连接数
+ connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL);
+ // 设置每个路由的最大连接数
+ connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
+ // 创建 HttpClient 实例
+ httpClient = HttpClientBuilder.create()
+ .setConnectionManager(connectionManager)
+ .build();
+ }
+
+ private PoolingHttpClientConnectionManager getSslClientManager() {
+ try {
+ // 在调用SSL之前需要重写验证方法,取消检测SSL
+ X509TrustManager trustManager =
+ new X509TrustManager() {
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+
+ @Override
+ public void checkClientTrusted(X509Certificate[] xcs, String str) {}
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] xcs, String str) {}
+ };
+ SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
+ ctx.init(null, new TrustManager[] {trustManager}, null);
+ SSLConnectionSocketFactory socketFactory =
+ new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE);
+ Registry<ConnectionSocketFactory> socketFactoryRegistry =
+ RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("http", PlainConnectionSocketFactory.INSTANCE)
+ .register("https", socketFactory)
+ .build();
+ // 创建ConnectionManager,添加Connection配置信息
+ PoolingHttpClientConnectionManager connManager =
+ new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+ // 设置最大连接数
+ connManager.setMaxTotal(DEFAULT_MAX_TOTAL);
+ // 设置每个连接的路由数
+ connManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
+ return connManager;
+ } catch (KeyManagementException | NoSuchAlgorithmException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ public static synchronized HttpClientPoolUtil getInstance() {
+ if (instance == null) {
+ instance = new HttpClientPoolUtil();
+ }
+ return instance;
+ }
+
+ public String httpGet(URI uri, Header... headers) throws IOException {
+ HttpGet httpGet = new HttpGet(uri);
+ String msg ="";
+ if (StringUtil.isNotEmpty(headers)) {
+ for (Header h : headers) {
+ httpGet.addHeader(h);
+ // log.info("request header : {}", h);
+ }
+ }
+ try(CloseableHttpResponse response = httpClient.execute(httpGet)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ // 获取响应实体
+ HttpEntity entity = response.getEntity();
+ // 获取响应信息
+ msg = EntityUtils.toString(entity, "UTF-8");
+
+ if (statusCode != HttpStatus.SC_OK) {
+ log.error("Http get content is :{}", msg);
+ }
+ }
+ return msg;
+ }
+
+ public void close() throws IOException {
+ httpClient.close();
+ }
+
+ public byte[] httpGetByte(String url, Header... headers) {
+ byte[] result = null;
+ // 获取客户端连接对象
+ // 创建GET请求对象
+ HttpGet httpGet = new HttpGet(url);
+ if (StringUtil.isNotEmpty(headers)) {
+ for (Header h : headers) {
+ httpGet.addHeader(h);
+ }
+ }
+
+ try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ // 获取响应实体
+ result = IOUtils.toByteArray(response.getEntity().getContent());
+
+ if (statusCode != HttpStatus.SC_OK) {
+ log.error("httpGetByte error ! " + statusCode);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return result;
+
+ }
+
+}