diff options
| author | doufenghu <[email protected]> | 2024-11-09 20:01:24 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-09 20:01:24 +0800 |
| commit | 16769de2e5ba334a5cfaacd8a53db2989264d022 (patch) | |
| tree | 37dcce46bf5dbefb494498ac895f44b12d04e169 /groot-common/src/main | |
| parent | f3f2857a6e7bb9ccbf45c86209d971bafe75b603 (diff) | |
[Feature][SPI] 增加groot-spi模块,解耦core和common模块之间的复杂依赖关系,移除一些不需要的类库。
Diffstat (limited to 'groot-common/src/main')
26 files changed, 156 insertions, 535 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java deleted file mode 100644 index 20ecca7..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.geedgenetworks.common; - -import lombok.Data; - -import java.io.Serializable; -import java.util.Map; - -@Data -public class Event implements Serializable { - public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp"; - public static final String INTERNAL_HEADERS_KEY = "__headers"; - public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp"; - public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp"; - - private Map<String, Object> extractedFields; - //Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing. - private boolean isDropped = false; - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/Accumulator.java index 403cecc..fdadea3 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Accumulator.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/Accumulator.java @@ -1,8 +1,6 @@ -package com.geedgenetworks.common; +package com.geedgenetworks.common.config; import lombok.Data; -import org.apache.flink.metrics.Counter; - import java.io.Serializable; import java.util.Map; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java deleted file mode 100644 index af94abf..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/AggregateConfigOptions.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.geedgenetworks.common.config; - -import com.alibaba.fastjson2.TypeReference; -import com.geedgenetworks.common.udf.UDFContext; - -import java.util.List; - -public interface AggregateConfigOptions { - Option<String> TYPE = Options.key("type") - .stringType() - .noDefaultValue() - .withDescription("The type of processor."); - - Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields") - .listType() - .noDefaultValue() - .withDescription("The fields to be outputted."); - - Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields") - .listType() - .noDefaultValue() - .withDescription("The fields to be removed."); - - Option<List<UDFContext>> FUNCTIONS = Options.key("functions") - .type(new TypeReference<List<UDFContext>>() {}) - .noDefaultValue() - .withDescription("The functions to be executed."); - - Option<List<String>> GROUP_BY_FIELDS = Options.key("group_by_fields") - .listType() - .noDefaultValue() - .withDescription("The fields to be key by."); - - Option<String> WINDOW_TYPE = Options.key("window_type") - .stringType() - .noDefaultValue() - .withDescription("The type of window."); - - Option<Integer> WINDOW_SIZE = Options.key("window_size") - .intType() - .noDefaultValue() - .withDescription("The size of window."); - Option<Boolean> MINI_BATCH = Options.key("mini_batch") - .booleanType() - .defaultValue(false) - .withDescription("The label of pre_aggrergate."); - Option<Integer> WINDOW_SLIDE = Options.key("window_slide") - .intType() - .noDefaultValue() - .withDescription("The size of sliding window."); - - Option<String> WINDOW_TIMESTAMP_FIELD = Options.key("window_timestamp_field") - .stringType() - .noDefaultValue() - .withDescription("which field to be set the start time of window."); -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java index 1d4e819..3084c00 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java @@ -3,7 +3,6 @@ package com.geedgenetworks.common.config; import com.typesafe.config.Config; import java.util.Arrays; -import java.util.LinkedList; import java.util.List; import java.util.stream.Collectors; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java deleted file mode 100644 index f1170be..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java +++ /dev/null @@ -1,107 +0,0 @@ -package com.geedgenetworks.common.config; - -import com.geedgenetworks.common.udf.UDFContext; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -public final class CheckUDFContextUtil { - - private CheckUDFContextUtil() {} - - // Check if all the params are present in the UDFContext - public static CheckResult checkAllExists(UDFContext context, String... params) { - List<String> invalidParams = Arrays.stream(params) - .filter(param -> isInvalidParam(context, param)) - .collect(Collectors.toList()); - - if (!invalidParams.isEmpty()) { - String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", invalidParams)); - return CheckResult.error(errorMsg); - } - return CheckResult.success(); - } - - // Check if at least one of the params is present in the UDFContext - public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) { - if (params.length == 0) { - return CheckResult.success(); - } - - List<String> invalidParams = Arrays.stream(params) - .filter(param -> isInvalidParam(context, param)) - .collect(Collectors.toList()); - - if (invalidParams.size() == params.length) { - String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", invalidParams)); - return CheckResult.error(errorMsg); - } - return CheckResult.success(); - } - - - - // Check Array/Map Object has only one item - public static CheckResult checkCollectionSingleItemExists (UDFContext context, String param) { - if (context == null) { - return CheckResult.error("UDFContext is null"); - } - - if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { - return context.getLookupFields() != null && context.getLookupFields().size() == 1 ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item"); - } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { - return context.getOutputFields() != null && context.getOutputFields().size() == 1 ? CheckResult.success() : CheckResult.error("Output fields should have only one item"); - } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { - return context.getParameters() != null && context.getParameters().size() == 1 ? CheckResult.success() : CheckResult.error("Parameters should have only one item"); - } else { - return CheckResult.error("Invalid param"); - } - - } - - // Check Parameters contains keys - public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) { - if (context == null) { - return CheckResult.error("UDFContext is null"); - } - - if (context.getParameters() == null) { - return CheckResult.error("Parameters is null"); - } - - List<String> missingKeys = Arrays.stream(keys) - .filter(key -> !context.getParameters().containsKey(key)) - .collect(Collectors.toList()); - - if (!missingKeys.isEmpty()) { - String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingKeys)); - return CheckResult.error(errorMsg); - } - return CheckResult.success(); - } - - public static boolean isInvalidParam(UDFContext context, String param) { - if (context == null) { - return true; - } - - if (UDFContextConfigOptions.NAME.key().equals(param)) { - return context.getName() == null; - } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { - return context.getLookupFields() == null || context.getLookupFields().isEmpty(); - } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { - return context.getOutputFields() == null || context.getOutputFields().isEmpty(); - } else if (UDFContextConfigOptions.FILTER.key().equals(param)) { - return context.getFilter() == null; - } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { - return context.getParameters() == null || context.getParameters().isEmpty(); - } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) { - return context.getFunction() == null; - } else { - return true; - } - - } - - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java index 5302cc2..65e7437 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigLocator.java @@ -1,6 +1,5 @@ package com.geedgenetworks.common.config; -import com.geedgenetworks.common.Constants; import com.hazelcast.internal.config.AbstractConfigLocator; import static com.hazelcast.internal.config.DeclarativeConfigUtil.YAML_ACCEPTED_SUFFIXES; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java index a967ae5..5dfcc1c 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigProvider.java @@ -1,6 +1,5 @@ package com.geedgenetworks.common.config; -import com.geedgenetworks.common.Constants; import com.hazelcast.client.config.ClientConfig; import com.hazelcast.client.config.YamlClientConfigBuilder; import com.hazelcast.client.config.impl.YamlClientConfigLocator; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java index 27ce8fb..ac4d0bf 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/Constants.java @@ -1,4 +1,4 @@ -package com.geedgenetworks.common; +package com.geedgenetworks.common.config; public final class Constants { diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/FilterConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/FilterConfigOptions.java deleted file mode 100644 index a553608..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/FilterConfigOptions.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.geedgenetworks.common.config; - -import java.util.Map; - -public interface FilterConfigOptions { - Option<String> TYPE = Options.key("type") - .stringType() - .noDefaultValue() - .withDescription("The type of filter ."); - - Option<Map<String, String>> PROPERTIES = Options.key("properties") - .mapType() - .noDefaultValue() - .withDescription("Custom properties for filter."); -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java index 189b05b..4dc6cbe 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfig.java @@ -1,6 +1,5 @@ package com.geedgenetworks.common.config; -import com.geedgenetworks.common.Constants; import com.hazelcast.config.Config; import lombok.extern.slf4j.Slf4j; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java index 4b5a974..842d1bf 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/GrootStreamConfigBuilder.java @@ -13,6 +13,7 @@ import org.w3c.dom.Node; import java.io.InputStream; import java.util.Properties; + import static com.hazelcast.internal.config.yaml.W3cDomUtil.asW3cNode; public class GrootStreamConfigBuilder extends AbstractYamlConfigBuilder { diff --git a/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KeybyEntity.java index f1dc38f..3e6ded8 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/KeybyEntity.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KeybyEntity.java @@ -1,4 +1,4 @@ -package com.geedgenetworks.common; +package com.geedgenetworks.common.config; import lombok.Data; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ProjectionConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ProjectionConfigOptions.java deleted file mode 100644 index 1a813af..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/ProjectionConfigOptions.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.geedgenetworks.common.config; - -import com.alibaba.fastjson2.TypeReference; -import com.geedgenetworks.common.udf.UDFContext; - -import java.util.List; - -public interface ProjectionConfigOptions { - Option<String> TYPE = Options.key("type") - .stringType() - .noDefaultValue() - .withDescription("The type of processor."); - - Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields") - .listType() - .noDefaultValue() - .withDescription("The fields to be outputted."); - - Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields") - .listType() - .noDefaultValue() - .withDescription("The fields to be removed."); - - Option<List<UDFContext>> FUNCTIONS = Options.key("functions") - .type(new TypeReference<List<UDFContext>>() {}) - .noDefaultValue() - .withDescription("The functions to be executed."); - - - - - - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SinkConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SinkConfigOptions.java deleted file mode 100644 index 1662bcb..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/SinkConfigOptions.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.geedgenetworks.common.config; - -import java.util.Map; - -public interface SinkConfigOptions { - Option<String> TYPE = Options.key("type") - .stringType() - .noDefaultValue() - .withDescription("The type of sink."); - - Option<Map<String, String>> PROPERTIES = Options.key("properties") - .mapType() - .noDefaultValue() - .withDescription("Custom properties for sink."); -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SourceConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SourceConfigOptions.java deleted file mode 100644 index 4192fe9..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/SourceConfigOptions.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.geedgenetworks.common.config; - -import com.alibaba.fastjson2.TypeReference; -import java.util.List; -import java.util.Map; - -public interface SourceConfigOptions { - Option<String> TYPE = Options.key("type") - .stringType() - .noDefaultValue() - .withDescription("The type of source."); - - Option<Map<String, String>> PROPERTIES = Options.key("properties") - .mapType() - .noDefaultValue() - .withDescription("Custom properties for source."); - - Option<List<Map<String, String>>> FIELDS = Options.key("fields") - .type(new TypeReference<List<Map<String, String>>>() {}) - .noDefaultValue() - .withDescription("When fields is not specified, all fields in source will be outputted as format of Map<String, Object>."); - - - - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java deleted file mode 100644 index a2acb71..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/SplitConfigOptions.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.geedgenetworks.common.config; - -import com.alibaba.fastjson2.TypeReference; -import com.geedgenetworks.common.udf.RuleContext; -import java.util.List; - -public interface SplitConfigOptions { - Option<String> TYPE = Options.key("type") - .stringType() - .noDefaultValue() - .withDescription("The type of route ."); - - Option<List<RuleContext>> RULES = Options.key("rules") - .type(new TypeReference<List<RuleContext>>() {}) - .noDefaultValue() - .withDescription("The rules to be executed."); - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java deleted file mode 100644 index 480496d..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/TableConfigOptions.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.geedgenetworks.common.config; - -import com.alibaba.fastjson2.TypeReference; -import com.geedgenetworks.common.udf.UDFContext; - -import java.util.List; - -public interface TableConfigOptions { - Option<String> TYPE = Options.key("type") - .stringType() - .noDefaultValue() - .withDescription("The type of processor."); - - Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields") - .listType() - .noDefaultValue() - .withDescription("The fields to be outputted."); - - Option<List<String>> REMOVE_FIELDS = Options.key("remove_fields") - .listType() - .noDefaultValue() - .withDescription("The fields to be removed."); - - Option<List<UDFContext>> FUNCTIONS = Options.key("functions") - .type(new TypeReference<List<UDFContext>>() {}) - .noDefaultValue() - .withDescription("The functions to be executed."); - - - - - - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java deleted file mode 100644 index 87bbf36..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java +++ /dev/null @@ -1,63 +0,0 @@ -package com.geedgenetworks.common.config; - -import java.util.List; -import java.util.Map; -import com.alibaba.fastjson2.TypeReference; -public interface UDFContextConfigOptions { - Option<String> NAME = Options.key("name") - .stringType() - .noDefaultValue() - .withDescription("The name of the function."); - - Option<List<String>> LOOKUP_FIELDS = Options.key("lookup_fields") - .listType() - .noDefaultValue() - .withDescription("The fields to be looked up."); - - Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields") - .listType() - .noDefaultValue() - .withDescription("The fields to be outputted."); - - Option<String> FILTER = Options.key("filter") - .stringType() - .noDefaultValue() - .withDescription("The filter expression to be applied."); - - Option<Map<String, Object>> PARAMETERS = Options.key("parameters") - .type(new TypeReference<Map<String, Object>>() {}) - .noDefaultValue() - .withDescription("The parameters for the function."); - - Option<String> PARAMETERS_KB_NAME = Options.key("kb_name") - .stringType() - .noDefaultValue() - .withDescription("The name of the knowledge base."); - - Option<String> PARAMETERS_OPTION = Options.key("option") - .stringType() - .noDefaultValue() - .withDescription("The option for the function."); - - Option<Map<String, String>> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping") - .mapType() - .noDefaultValue() - .withDescription("The geolocation field mapping."); - - Option<String> PARAMETERS_IDENTIFIER = Options.key("identifier") - .stringType() - .noDefaultValue() - .withDescription("The identifier for the parameters of function."); - - Option<String> PARAMETERS_SECRET_KEY = Options.key("secret_key") - .stringType() - .noDefaultValue() - .withDescription("The secret key for the function."); - - Option<String> FUNCTION = Options.key("function") - .stringType() - .noDefaultValue() - .withDescription("The function to be executed."); - - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFPluginConfigLocator.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFPluginConfigLocator.java index 49da576..4be72b6 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFPluginConfigLocator.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFPluginConfigLocator.java @@ -1,6 +1,5 @@ package com.geedgenetworks.common.config; -import com.geedgenetworks.common.Constants; import com.hazelcast.internal.config.AbstractConfigLocator; import lombok.extern.slf4j.Slf4j; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java b/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java deleted file mode 100644 index 985f4df..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.geedgenetworks.common.crypto; - -public interface CryptoShade { - - /** - * The unique identifier of the current interface, used it to select the correct {@link - * CryptoShade} - */ - String getIdentifier(); - - /** - * Encrypt the content - * - * @param content The content to encrypt - */ - String encrypt(String content); - - /** - * Decrypt the content - * - * @param content The content to decrypt - */ - String decrypt(String content); - - /** To expand the options that user want to encrypt */ - default String[] sensitiveOptions() { - return new String[0]; - } -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java deleted file mode 100644 index 6f6e048..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/AggregateFunction.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.geedgenetworks.common.udf; - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; - -import java.io.Serializable; - -public interface AggregateFunction extends Serializable { - - void open(UDFContext udfContext); - Accumulator initAccumulator(Accumulator acc); - Accumulator add(Event val, Accumulator acc); - String functionName(); - Accumulator getResult(Accumulator acc); - Accumulator merge(Accumulator a, Accumulator b); - default void close(){}; -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java deleted file mode 100644 index 6aa9e3d..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.geedgenetworks.common.udf; - -import com.geedgenetworks.common.Event; -import com.googlecode.aviator.Expression; -import lombok.Data; -import org.apache.flink.util.OutputTag; - -import java.io.Serializable; - -@Data -public class RuleContext implements Serializable { - - private String tag; - private String expression; - private Expression compiledExpression; - private OutputTag<Event> outputTag ; - - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java deleted file mode 100644 index 2723652..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.geedgenetworks.common.udf; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.config.CheckUDFContextUtil; -import com.geedgenetworks.common.config.UDFContextConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import org.apache.flink.api.common.functions.RuntimeContext; -import java.io.Serializable; - -public interface ScalarFunction extends Serializable { - - void open(RuntimeContext runtimeContext, UDFContext udfContext); - - Event evaluate(Event event); - - String functionName(); - - void close(); - - default void checkConfig(UDFContext udfContext) { - if (udfContext == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null"); - } - - if (!CheckUDFContextUtil.checkAtLeastOneExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key(), UDFContextConfigOptions.OUTPUT_FIELDS.key(), UDFContextConfigOptions.FILTER.key()).isSuccess()) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified."); - } - - } - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java deleted file mode 100644 index e602291..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/TableFunction.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.geedgenetworks.common.udf; - -import com.geedgenetworks.common.Event; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.util.Collector; - -import java.io.Serializable; -import java.util.List; - -public interface TableFunction extends Serializable { - - void open(RuntimeContext runtimeContext, UDFContext udfContext); - - List<Event> evaluate(Event event); - - String functionName(); - - void close(); - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java deleted file mode 100644 index ea98226..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.geedgenetworks.common.udf; - -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.Data; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -@Data -public class UDFContext implements Serializable { - - private String name; - @JsonProperty("lookup_fields") - private List<String> lookupFields; - @JsonProperty("output_fields") - private List<String> outputFields; - private String filter; - private Map<String, Object> parameters; - private String function; - -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientPoolUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientPoolUtil.java new file mode 100644 index 0000000..cb98cc9 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HttpClientPoolUtil.java @@ -0,0 +1,152 @@ +package com.geedgenetworks.common.utils; + +import com.geedgenetworks.shaded.org.apache.http.*; +import com.geedgenetworks.shaded.org.apache.http.HttpEntity; +import com.geedgenetworks.shaded.org.apache.http.client.methods.CloseableHttpResponse; +import com.geedgenetworks.shaded.org.apache.http.client.methods.HttpGet; +import com.geedgenetworks.shaded.org.apache.http.config.Registry; +import com.geedgenetworks.shaded.org.apache.http.config.RegistryBuilder; +import com.geedgenetworks.shaded.org.apache.http.conn.socket.ConnectionSocketFactory; +import com.geedgenetworks.shaded.org.apache.http.conn.socket.PlainConnectionSocketFactory; +import com.geedgenetworks.shaded.org.apache.http.conn.ssl.NoopHostnameVerifier; +import com.geedgenetworks.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import com.geedgenetworks.shaded.org.apache.http.impl.client.CloseableHttpClient; +import com.geedgenetworks.shaded.org.apache.http.impl.client.HttpClientBuilder; +import com.geedgenetworks.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import com.geedgenetworks.shaded.org.apache.http.util.EntityUtils; +import com.geedgenetworks.utils.StringUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import java.io.IOException; +import java.net.URI; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.X509Certificate; + +@Slf4j +public class HttpClientPoolUtil { + + private static HttpClientPoolUtil instance; + private final CloseableHttpClient httpClient; + + private static final int DEFAULT_MAX_TOTAL = 400; + private static final int DEFAULT_MAX_PER_ROUTE = 10; + private static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 10000; + private static final int DEFAULT_CONNECT_TIMEOUT = 10000; + private static final int DEFAULT_SOCKET_TIMEOUT = 60000; + private HttpClientPoolUtil() { + // 创建连接池管理器 + PoolingHttpClientConnectionManager connectionManager = getSslClientManager(); + // 设置最大连接数 + connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL); + // 设置每个路由的最大连接数 + connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE); + // 创建 HttpClient 实例 + httpClient = HttpClientBuilder.create() + .setConnectionManager(connectionManager) + .build(); + } + + private PoolingHttpClientConnectionManager getSslClientManager() { + try { + // 在调用SSL之前需要重写验证方法,取消检测SSL + X509TrustManager trustManager = + new X509TrustManager() { + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + + @Override + public void checkClientTrusted(X509Certificate[] xcs, String str) {} + + @Override + public void checkServerTrusted(X509Certificate[] xcs, String str) {} + }; + SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS); + ctx.init(null, new TrustManager[] {trustManager}, null); + SSLConnectionSocketFactory socketFactory = + new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE); + Registry<ConnectionSocketFactory> socketFactoryRegistry = + RegistryBuilder.<ConnectionSocketFactory>create() + .register("http", PlainConnectionSocketFactory.INSTANCE) + .register("https", socketFactory) + .build(); + // 创建ConnectionManager,添加Connection配置信息 + PoolingHttpClientConnectionManager connManager = + new PoolingHttpClientConnectionManager(socketFactoryRegistry); + // 设置最大连接数 + connManager.setMaxTotal(DEFAULT_MAX_TOTAL); + // 设置每个连接的路由数 + connManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE); + return connManager; + } catch (KeyManagementException | NoSuchAlgorithmException e) { + throw new RuntimeException(e.getMessage()); + } + } + + public static synchronized HttpClientPoolUtil getInstance() { + if (instance == null) { + instance = new HttpClientPoolUtil(); + } + return instance; + } + + public String httpGet(URI uri, Header... headers) throws IOException { + HttpGet httpGet = new HttpGet(uri); + String msg =""; + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpGet.addHeader(h); + // log.info("request header : {}", h); + } + } + try(CloseableHttpResponse response = httpClient.execute(httpGet)) { + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + + if (statusCode != HttpStatus.SC_OK) { + log.error("Http get content is :{}", msg); + } + } + return msg; + } + + public void close() throws IOException { + httpClient.close(); + } + + public byte[] httpGetByte(String url, Header... headers) { + byte[] result = null; + // 获取客户端连接对象 + // 创建GET请求对象 + HttpGet httpGet = new HttpGet(url); + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpGet.addHeader(h); + } + } + + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + result = IOUtils.toByteArray(response.getEntity().getContent()); + + if (statusCode != HttpStatus.SC_OK) { + log.error("httpGetByte error ! " + statusCode); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return result; + + } + +} |
