From d2579028fb90bd60ca9e5f9fa36cbde8a6db8872 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Tue, 29 Oct 2024 20:42:50 +0800 Subject: [Improve][core] Add CheckUDFContextUtil for verifying UDF configurations. Rename lookup_fields and output_fields to lookupFields and outputFields. --- docs/develop-guide.md | 4 +- docs/processor/udf.md | 15 +- .../bootstrap/execution/AbstractExecutor.java | 2 +- .../execution/AbstractProcessorExecutor.java | 14 +- .../common/config/CheckConfigUtil.java | 42 ++-- .../geedgenetworks/common/config/CheckResult.java | 70 +----- .../common/config/CheckUDFContextUtil.java | 84 +++++++ .../common/config/UDFContextConfigOptions.java | 36 +++ .../geedgenetworks/common/udf/ScalarFunction.java | 15 ++ .../com/geedgenetworks/common/udf/UDFContext.java | 57 +---- .../com/geedgenetworks/core/udf/AsnLookup.java | 10 +- .../core/udf/CurrentUnixTimestamp.java | 6 +- .../com/geedgenetworks/core/udf/DecodeBase64.java | 6 +- .../java/com/geedgenetworks/core/udf/Domain.java | 151 ++++++------ .../com/geedgenetworks/core/udf/EncodeBase64.java | 10 +- .../java/com/geedgenetworks/core/udf/Encrypt.java | 8 +- .../java/com/geedgenetworks/core/udf/Eval.java | 4 +- .../java/com/geedgenetworks/core/udf/Flatten.java | 2 +- .../geedgenetworks/core/udf/FromUnixTimestamp.java | 10 +- .../core/udf/GenerateStringArray.java | 8 +- .../com/geedgenetworks/core/udf/GeoIpLookup.java | 10 +- .../java/com/geedgenetworks/core/udf/Hmac.java | 10 +- .../com/geedgenetworks/core/udf/JsonExtract.java | 10 +- .../com/geedgenetworks/core/udf/PathCombine.java | 2 +- .../com/geedgenetworks/core/udf/SnowflakeId.java | 6 +- .../com/geedgenetworks/core/udf/StringJoiner.java | 8 +- .../core/udf/UnixTimestampConverter.java | 10 +- .../udf/cn/AbstractKnowledgeScalarFunction.java | 8 +- .../core/udf/cn/ArrayElementsPrepend.java | 4 +- .../core/udf/cn/BaseStationLookup.java | 10 +- .../geedgenetworks/core/udf/cn/FieldsMerge.java | 4 +- .../geedgenetworks/core/udf/cn/H3CellLookup.java | 8 +- .../geedgenetworks/core/udf/udaf/CollectList.java | 8 +- .../geedgenetworks/core/udf/udaf/CollectSet.java | 8 +- .../geedgenetworks/core/udf/udaf/FirstValue.java | 8 +- .../HdrHistogram/HdrHistogramBaseAggregate.java | 242 ++++++++++---------- .../geedgenetworks/core/udf/udaf/LastValue.java | 8 +- .../geedgenetworks/core/udf/udaf/LongCount.java | 4 +- .../java/com/geedgenetworks/core/udf/udaf/Max.java | 8 +- .../com/geedgenetworks/core/udf/udaf/Mean.java | 8 +- .../java/com/geedgenetworks/core/udf/udaf/Min.java | 8 +- .../geedgenetworks/core/udf/udaf/NumberSum.java | 8 +- .../core/udf/udaf/hlld/HlldBaseAggregate.java | 252 ++++++++++----------- .../geedgenetworks/core/udf/udtf/JsonUnroll.java | 8 +- .../geedgenetworks/core/udf/udtf/PathUnroll.java | 236 +++++++++---------- .../com/geedgenetworks/core/udf/udtf/Unroll.java | 8 +- .../com/geedgenetworks/core/udf/uuid/UUID.java | 6 +- .../com/geedgenetworks/core/udf/uuid/UUIDv5.java | 8 +- .../com/geedgenetworks/core/udf/uuid/UUIDv7.java | 6 +- .../core/udf/cn/AnonymityLookupTest.java | 8 +- .../core/udf/cn/AppCategoryLookupTest.java | 2 +- .../core/udf/cn/BaseStationLookupTest.java | 4 +- .../core/udf/cn/DnsServerInfoLookupTest.java | 4 +- .../core/udf/cn/FqdnCategoryLookupTest.java | 2 +- .../core/udf/cn/FqdnWhoisLookupTest.java | 4 +- .../core/udf/cn/H3CellLookupTest.java | 4 +- .../geedgenetworks/core/udf/cn/IcpLookupTest.java | 4 +- .../core/udf/cn/IdcRenterLookupTest.java | 4 +- .../udf/cn/IntelligenceIndicatorLookupTest.java | 24 +- .../geedgenetworks/core/udf/cn/IocLookupTest.java | 8 +- .../core/udf/cn/IpZoneLookupTest.java | 4 +- .../core/udf/cn/LinkDirectionLookupTest.java | 4 +- .../core/udf/cn/UserDefineTagLookupTest.java | 16 +- .../geedgenetworks/core/udf/cn/VpnLookupTest.java | 8 +- .../core/udf/test/AsnLookupFunctionTest.java | 14 +- .../core/udf/test/GeoIpLookupFunctionTest.java | 14 +- .../core/udf/test/aggregate/CollectListTest.java | 8 +- .../core/udf/test/aggregate/CollectSetTest.java | 8 +- .../core/udf/test/aggregate/FirstValueTest.java | 8 +- .../core/udf/test/aggregate/LastValueTest.java | 8 +- .../core/udf/test/aggregate/LongCountTest.java | 6 +- .../core/udf/test/aggregate/MaxTest.java | 4 +- .../core/udf/test/aggregate/MeanTest.java | 20 +- .../core/udf/test/aggregate/MinTest.java | 4 +- .../core/udf/test/aggregate/NumberSumTest.java | 8 +- .../udf/test/simple/DecodeBase64FunctionTest.java | 8 +- .../core/udf/test/simple/DomainFunctionTest.java | 27 +-- .../udf/test/simple/EncodeBase64FunctionTest.java | 8 +- .../core/udf/test/simple/EncryptFunctionTest.java | 4 +- .../core/udf/test/simple/FlattenFunctionTest.java | 2 +- .../udf/test/simple/FromUnixTimestampTest.java | 4 +- .../simple/GenerateStringArrayFunctionTest.java | 4 +- .../core/udf/test/simple/HmacFunctionTest.java | 4 +- .../udf/test/simple/JsonExtractFunctionTest.java | 6 +- .../udf/test/simple/StringJoinerFunctionTest.java | 4 +- .../core/udf/test/simple/UUIDTest.java | 24 +- .../test/simple/UnixTimestampConverterTest.java | 4 +- .../udf/test/table/JsonUnrollFunctionTest.java | 12 +- .../core/udf/test/table/UnrollFunctionTest.java | 12 +- .../HdrHistogram/HdrHistogramQuantileTest.java | 176 +++++++------- .../HdrHistogram/HdrHistogramQuantilesTest.java | 194 ++++++++-------- .../udf/udaf/HdrHistogram/HdrHistogramTest.java | 202 ++++++++--------- .../udf/udaf/hlld/HlldApproxCountDistinctTest.java | 172 +++++++------- .../core/udf/udaf/hlld/HlldTest.java | 172 +++++++------- .../geedgenetworks/core/udf/udtf/UnrollTest.java | 216 +++++++++--------- .../resources/examples/inline_to_print_test.yaml | 10 + .../src/test/resources/inline_to_print.yaml | 4 +- 97 files changed, 1506 insertions(+), 1443 deletions(-) create mode 100644 groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java create mode 100644 groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java diff --git a/docs/develop-guide.md b/docs/develop-guide.md index 75e8803..927d2d3 100644 --- a/docs/develop-guide.md +++ b/docs/develop-guide.md @@ -21,7 +21,7 @@ Groot Stream based all stream processing on data records common known as events. ```json { "__timestamp": "", - "__input_id": "ID/Name of the source that delivered the event", + "__headers": "Map headers of the source that delivered the event", "__window_start_timestamp" : "", "__window_end_timestamp" : "", "key1": "", @@ -35,7 +35,7 @@ Groot Stream add internal fields during pipeline processing. A few notes about i - Treat internal fields as read-only. Modifying them can result in unintended consequences to your data flows. - Internal fields only exist for the duration of the event processing pipeline. They are not documented under sources or sinks. - If you do not configure a timestamp for extraction, the Pipeline process assigns the current time (in UNIX epoch format) to the __timestamp field. -- If you have multiple sources, you can determine which source the event came form by looking at the `__input_id` field. For example, the Kafka source adds the topic name to the `__input_id` field. +- If you have multiple sources, you can determine the origin of the event by examining the `__headers` field. For example, the Kafka source appends the topic name as the `__input_id` key in the `__headers`. ## How to write a high quality Git commit message diff --git a/docs/processor/udf.md b/docs/processor/udf.md index 0475192..e480275 100644 --- a/docs/processor/udf.md +++ b/docs/processor/udf.md @@ -96,18 +96,19 @@ Base64 encode function is commonly used to encode the binary data to base64 stri ```BASE64_ENCODE_TO_STRING(filter, output_fields[, parameters])``` - filter: optional -- lookup_fields: not required +- lookup_fields: required - output_fields: required - parameters: required - - value_field: `` required. + - input_type: `` required. Enum: `string`, `byte_array`. The input type of the value field. Example: ```yaml - function: BASE64_ENCODE_TO_STRING + lookup_fields: [packet] output_fields: [packet] parameters: - value_field: packet + input_type: string ``` ### Current Unix Timestamp @@ -141,7 +142,7 @@ Domain function is used to extract the domain from the url. - parameters: required - option: `` required. Enum: `TOP_LEVEL_DOMAIN`, `FIRST_SIGNIFICANT_SUBDOMAIN`. -#### Option +**Option** - `TOP_LEVEL_DOMAIN` is used to extract the top level domain from the url. For example, `www.abc.com` will be extracted to `com`. - `FIRST_SIGNIFICANT_SUBDOMAIN` is used to extract the first significant subdomain from the url. For example, `www.abc.com` will be extracted to `abc.com`. @@ -283,7 +284,7 @@ From unix timestamp function is used to convert the unix timestamp to date time - parameters: optional - precision: `` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`. -#### Precision +**Precision** - `milliseconds` is used to convert the unix timestamp to milliseconds date time string. For example, `1619712000` will be converted to `2021-04-30 00:00:00.000`. - `seconds` is used to convert the unix timestamp to seconds date time string. For example, `1619712000` will be converted to `2021-04-30 00:00:00`. @@ -336,7 +337,7 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You - ISP: `` optional. - ORGANIZATION: `` optional. -#### Option +**Option** - `IP_TO_COUNTRY` is used to lookup the country or region information by ip address. - `IP_TO_PROVINCE` is used to lookup the province or state information by ip address. @@ -348,7 +349,7 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You - `IP_TO_JSON` is used to lookup the above information by ip address. The result is a json string. - `IP_TO_OBJECT` is used to lookup the above information by ip address. The result is a `LocationResponse` object. -#### GeoLocation Field Mapping +**GeoLocation Field Mapping** - `COUNTRY` is used to map the country information to the event field. - `PROVINCE` is used to map the province information to the event field. diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index b45d643..f5b1a5d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -14,7 +14,7 @@ import java.net.URLClassLoader; import java.util.*; import java.util.function.BiConsumer; -public abstract class AbstractExecutor +public abstract class AbstractExecutor implements Executor, JobRuntimeEnvironment> { protected JobRuntimeEnvironment jobRuntimeEnvironment; protected final Config operatorConfig; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index 1719059..42a3a11 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -1,22 +1,14 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.AggregateConfig; import com.geedgenetworks.core.pojo.ProcessorConfig; -import com.geedgenetworks.core.pojo.ProjectionConfig; -import com.geedgenetworks.core.pojo.TableConfig; import com.geedgenetworks.core.processor.Processor; -import com.geedgenetworks.core.processor.table.TableProcessor; -import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; -import com.geedgenetworks.core.processor.projection.ProjectionProcessor; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; import java.util.List; @@ -59,7 +51,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor value, Config processorsConfig) { - ProcessorConfig projectionConfig = new ProcessorConfig(); + ProcessorConfig ProcessorConfig = new ProcessorConfig(); boolean found = false; // 标志变量 CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key), ProjectionConfigOptions.TYPE.key()); @@ -73,7 +65,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor missingParams = Arrays.stream(params) .filter(param -> !isValidParam(config, param)) @@ -20,11 +21,10 @@ public final class CheckConfigUtil { if (!missingParams.isEmpty()) { String errorMsg = String.format( - "please specify [%s] as non-empty.", String.join(",", missingParams)); + "Please specify [%s] as non-empty.", String.join(",", missingParams)); return CheckResult.error(errorMsg); - } else { - return CheckResult.success(); } + return CheckResult.success(); } /** check config if there was at least one usable */ @@ -33,48 +33,42 @@ public final class CheckConfigUtil { return CheckResult.success(); } - List missingParams = new LinkedList<>(); - for (String param : params) { - if (!isValidParam(config, param)) { - missingParams.add(param); - } - } + List missingParams = Arrays.stream(params) + .filter(param -> !isValidParam(config, param)) + .collect(Collectors.toList()); if (missingParams.size() == params.length) { String errorMsg = String.format( - "please specify at least one config of [%s] as non-empty.", + "Please specify at least one config of [%s] as non-empty.", String.join(",", missingParams)); return CheckResult.error(errorMsg); - } else { - return CheckResult.success(); } + return CheckResult.success(); } - public static boolean isValidParam(Config config, String param) { - boolean isValidParam = true; + public static boolean isValidParam(Config config, String param) { if (!config.hasPath(param)) { - isValidParam = false; - } else if (config.getAnyRef(param) instanceof List) { - isValidParam = !((List) config.getAnyRef(param)).isEmpty(); + return false; } - return isValidParam; + Object value = config.getAnyRef(param); + return !(value instanceof List && ((List) value).isEmpty()); } /** merge all check result */ public static CheckResult mergeCheckResults(CheckResult... checkResults) { + List notPassConfig = Arrays.stream(checkResults) .filter(item -> !item.isSuccess()) .collect(Collectors.toList()); if (notPassConfig.isEmpty()) { return CheckResult.success(); - } else { - String errMessage = - notPassConfig.stream() - .map(CheckResult::getMsg) - .collect(Collectors.joining(",")); - return CheckResult.error(errMessage); } + String errMessage = + notPassConfig.stream() + .map(CheckResult::getMsg) + .collect(Collectors.joining(",")); + return CheckResult.error(errMessage); } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java index 5bf0196..e8e47f3 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java @@ -1,8 +1,13 @@ package com.geedgenetworks.common.config; +import lombok.Data; + +@Data public class CheckResult { private static final CheckResult SUCCESS = new CheckResult(true, ""); + private boolean success; + private String msg; private CheckResult(boolean success, String msg) { @@ -10,71 +15,16 @@ public class CheckResult { this.msg = msg; } + /** @return a successful instance of CheckResult */ public static CheckResult success() { return SUCCESS; } + /** + * @param msg the error message + * @return an error instance of CheckResult + */ public static CheckResult error(String msg) { return new CheckResult(false, msg); } - - public boolean isSuccess() { - return this.success; - } - - public String getMsg() { - return this.msg; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public void setMsg(String msg) { - this.msg = msg; - } - - public boolean equals(Object o) { - if (o == this) { - return true; - } else if (!(o instanceof CheckResult)) { - return false; - } else { - CheckResult other = (CheckResult)o; - if (!other.canEqual(this)) { - return false; - } else if (this.isSuccess() != other.isSuccess()) { - return false; - } else { - Object this$msg = this.getMsg(); - Object other$msg = other.getMsg(); - if (this$msg == null) { - if (other$msg != null) { - return false; - } - } else if (!this$msg.equals(other$msg)) { - return false; - } - - return true; - } - } - } - - protected boolean canEqual(Object other) { - return other instanceof CheckResult; - } - - public int hashCode() { - int PRIME = 59; - int result = 1; - result = result * PRIME + (this.isSuccess() ? 79 : 97); - Object $msg = this.getMsg(); - result = result * PRIME + ($msg == null ? 43 : $msg.hashCode()); - return result; - } - - public String toString() { - return "CheckResult(success=" + this.isSuccess() + ", msg=" + this.getMsg() + ")"; - } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java new file mode 100644 index 0000000..80350f5 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java @@ -0,0 +1,84 @@ +package com.geedgenetworks.common.config; + +import com.geedgenetworks.common.udf.UDFContext; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public final class CheckUDFContextUtil { + + private CheckUDFContextUtil() {} + + // Check if all the params are present in the UDFContext + public static CheckResult checkAllExists(UDFContext context, String... params) { + List missingParams = Arrays.stream(params) + .filter(param -> !isValidParam(context, param)) + .collect(Collectors.toList()); + + if (!missingParams.isEmpty()) { + String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingParams)); + return CheckResult.error(errorMsg); + } + return CheckResult.success(); + } + + // Check if at least one of the params is present in the UDFContext + public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) { + if (params.length == 0) { + return CheckResult.success(); + } + + List missingParams = Arrays.stream(params) + .filter(param -> !isValidParam(context, param)) + .collect(Collectors.toList()); + + if (missingParams.size() == params.length) { + String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", missingParams)); + return CheckResult.error(errorMsg); + } + return CheckResult.success(); + } + + // Check Array/Map Object has only one item + public static CheckResult checkSingleItemExists (UDFContext context, String param) { + if (context == null) { + return CheckResult.error("UDFContext is null"); + } + + if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { + return context.getLookupFields() != null && context.getLookupFields().size() == 1 ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item"); + } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { + return context.getOutputFields() != null && context.getOutputFields().size() == 1 ? CheckResult.success() : CheckResult.error("Output fields should have only one item"); + } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { + return context.getParameters() != null && context.getParameters().size() == 1 ? CheckResult.success() : CheckResult.error("Parameters should have only one item"); + } else { + return CheckResult.error("Invalid param"); + } + + } + + public static boolean isValidParam(UDFContext context, String param) { + if (context == null) { + return false; + } + + if (UDFContextConfigOptions.NAME.key().equals(param)) { + return context.getName() != null; + } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { + return context.getLookupFields() != null && !context.getLookupFields().isEmpty(); + } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { + return context.getOutputFields() != null && !context.getOutputFields().isEmpty(); + } else if (UDFContextConfigOptions.FILTER.key().equals(param)) { + return context.getFilter() != null; + } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { + return context.getParameters() != null && !context.getParameters().isEmpty(); + } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) { + return context.getFunction() != null; + } else { + return false; + } + + } + + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java new file mode 100644 index 0000000..85bab48 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java @@ -0,0 +1,36 @@ +package com.geedgenetworks.common.config; + +import java.util.List; +import java.util.Map; +import com.alibaba.fastjson2.TypeReference; +public interface UDFContextConfigOptions { + Option NAME = Options.key("name") + .stringType() + .noDefaultValue() + .withDescription("The name of the function."); + + Option> LOOKUP_FIELDS = Options.key("lookup_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be looked up."); + + Option> OUTPUT_FIELDS = Options.key("output_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be outputted."); + + Option FILTER = Options.key("filter") + .stringType() + .noDefaultValue() + .withDescription("The filter expression to be applied."); + + Option> PARAMETERS = Options.key("parameters") + .type(new TypeReference>() {}) + .noDefaultValue() + .withDescription("The parameters for the function."); + + Option FUNCTION = Options.key("function") + .stringType() + .noDefaultValue() + .withDescription("The function to be executed."); +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java index 2aab34b..2723652 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java @@ -1,5 +1,9 @@ package com.geedgenetworks.common.udf; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckUDFContextUtil; +import com.geedgenetworks.common.config.UDFContextConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import org.apache.flink.api.common.functions.RuntimeContext; import java.io.Serializable; @@ -13,4 +17,15 @@ public interface ScalarFunction extends Serializable { void close(); + default void checkConfig(UDFContext udfContext) { + if (udfContext == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null"); + } + + if (!CheckUDFContextUtil.checkAtLeastOneExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key(), UDFContextConfigOptions.OUTPUT_FIELDS.key(), UDFContextConfigOptions.FILTER.key()).isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified."); + } + + } + } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java index 4062924..ea98226 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java @@ -1,63 +1,22 @@ package com.geedgenetworks.common.udf; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + import java.io.Serializable; import java.util.List; import java.util.Map; +@Data public class UDFContext implements Serializable { private String name; - private List lookup_fields; - private List output_fields; + @JsonProperty("lookup_fields") + private List lookupFields; + @JsonProperty("output_fields") + private List outputFields; private String filter; private Map parameters; private String function; - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public List getLookup_fields() { - return lookup_fields; - } - - public void setLookup_fields(List lookup_fields) { - this.lookup_fields = lookup_fields; - } - - public List getOutput_fields() { - return output_fields; - } - - public void setOutput_fields(List output_fields) { - this.output_fields = output_fields; - } - - public String getFilter() { - return filter; - } - - public void setFilter(String filter) { - this.filter = filter; - } - - public Map getParameters() { - return parameters; - } - - public void setParameters(Map parameters) { - this.parameters = parameters; - } - - public String getFunction() { - return function; - } - - public void setFunction(String function) { - this.function = function; - } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java index 6d8373e..bd1447a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java @@ -44,8 +44,8 @@ public class AsnLookup implements ScalarFunction { } else { log.error("AsnLookup init KnowledgeBase error "); } - this.lookupFieldName = udfContext.getLookup_fields().get(0); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); AsnKnowledgeBaseHandler.increment(); log.warn("AsnKnowledgeBaseHandlerCount "+AsnKnowledgeBaseHandler.getCount()); } @@ -81,14 +81,14 @@ public class AsnLookup implements ScalarFunction { private void checkUdfContext(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null || udfContext.getOutput_fields() == null || udfContext.getParameters() == null) { + if (udfContext.getLookupFields() == null || udfContext.getOutputFields() == null || udfContext.getParameters() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if (udfContext.getLookup_fields().size() != 1) { + if (udfContext.getLookupFields().size() != 1) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); } - if (udfContext.getOutput_fields().size() != 1) { + if (udfContext.getOutputFields().size() != 1) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java index 5770201..98b2d68 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java @@ -18,10 +18,10 @@ public class CurrentUnixTimestamp implements ScalarFunction { public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if( udfContext.getOutput_fields()==null || udfContext.getParameters() == null){ + if( udfContext.getOutputFields()==null || udfContext.getParameters() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } if(!udfContext.getParameters().containsKey("precision")){ @@ -34,7 +34,7 @@ public class CurrentUnixTimestamp implements ScalarFunction { } } this.precision = udfContext.getParameters().get("precision").toString(); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java index 663e626..bc8563a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java @@ -22,16 +22,16 @@ public class DecodeBase64 implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null){ + if(udfContext.getParameters()==null || udfContext.getOutputFields()==null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } if(!udfContext.getParameters().containsKey("value_field") ||!udfContext.getParameters().containsKey("charset_field") ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field and charset_field"); } - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); this.valueField =udfContext.getParameters().get("value_field").toString(); this.charsetField =udfContext.getParameters().get("charset_field").toString(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java index 7bc293e..a92f211 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java @@ -1,105 +1,124 @@ package com.geedgenetworks.core.udf; - +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.CheckUDFContextUtil; +import com.geedgenetworks.common.config.UDFContextConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.ScalarFunction; -import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.shaded.com.google.common.net.InternetDomainName; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; - import java.util.List; - import static com.geedgenetworks.utils.FormatUtils.getTopPrivateDomain; @Slf4j public class Domain implements ScalarFunction { - - - private String option; + private Option option; private List lookupFields; private String outputFieldName; - @Override - public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){ - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); - } - if(udfContext.getLookup_fields().isEmpty()){ - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup field is not empty"); - } - if(udfContext.getOutput_fields().size() != 1){ - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); - } - if(!udfContext.getParameters().containsKey("option")){ - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option"); - } - else{ - if(!udfContext.getParameters().get("option").toString().equals("TOP_LEVEL_DOMAIN") && - !udfContext.getParameters().get("option").toString().equals("FIRST_SIGNIFICANT_SUBDOMAIN") && - !udfContext.getParameters().get("option").toString().equals("FQDN")){ - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); + enum Option { + TOP_LEVEL_DOMAIN, + FIRST_SIGNIFICANT_SUBDOMAIN, + FQDN; + + public static boolean isValid(String option) { + try { + Option.valueOf(option); + return true; + } catch (IllegalArgumentException e) { + return false; } } - this.option = udfContext.getParameters().get("option").toString(); - this.lookupFields = udfContext.getLookup_fields(); - this.outputFieldName = udfContext.getOutput_fields().get(0); + } + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + checkConfig(udfContext); + this.option = Option.valueOf(udfContext.getParameters().get("option").toString()); + this.lookupFields = udfContext.getLookupFields(); + this.outputFieldName = udfContext.getOutputFields().get(0); } @Override public Event evaluate(Event event) { String domain = ""; - switch (option) { - case "TOP_LEVEL_DOMAIN": - for (String lookupField : lookupFields){ - - Object valueObj = event.getExtractedFields().get(lookupField); - if (valueObj!=null) { - domain = getTopPrivateDomain((String) valueObj); - if (domain.contains(".")) { - domain = domain.substring(domain.indexOf(".") + 1); - } - if(!domain.isEmpty()){ - break; - } - } - } - break; - case "FIRST_SIGNIFICANT_SUBDOMAIN": - for (String lookupField : lookupFields){ - Object valueObj = event.getExtractedFields().get(lookupField); - if (valueObj!=null) { - domain = getTopPrivateDomain((String)valueObj); - } - if(!domain.isEmpty()){ + for (String fieldName : lookupFields) { + Object fieldValue = event.getExtractedFields().get(fieldName); + if (fieldValue == null) { + continue; + } + + String value = fieldValue.toString(); + try { + + String topPrivateDomain = getTopPrivateDomain(value); + + switch (option) { + case TOP_LEVEL_DOMAIN: + domain = InternetDomainName.from(topPrivateDomain).publicSuffix().toString(); break; - } - } - break; - case "FQDN": - for (String lookupField : lookupFields) { - Object valueObj = event.getExtractedFields().get(lookupField); - if (valueObj!=null) { - domain = (String)valueObj; - } - if(!domain.isEmpty()){ + case FIRST_SIGNIFICANT_SUBDOMAIN: + domain = topPrivateDomain; + break; + case FQDN: // Use the original value + domain = value; break; - } + default: + throw new IllegalArgumentException("Unknown option: " + option); + } + + if (!domain.isEmpty()) { // Found a valid domain will break the loop + break; } - break; + + } catch (IllegalArgumentException e) { + log.error("Invalid domain: {}", value); + } } + event.getExtractedFields().put(outputFieldName, domain); return event; } @Override - public String functionName() { - return "DOMAIN"; + public void checkConfig(UDFContext udfContext) { + CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext, + UDFContextConfigOptions.LOOKUP_FIELDS.key(), + UDFContextConfigOptions.OUTPUT_FIELDS.key(), + UDFContextConfigOptions.PARAMETERS.key()); + + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); + } + + result = CheckUDFContextUtil.checkSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); + } + + if(!udfContext.getParameters().containsKey("option")){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( + "UDF: %s, [%s] Option should be specified.", + udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); + } + + String optionValue = udfContext.getParameters().get("option").toString(); + if (!Option.isValid(optionValue)) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.", + udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); + } + } + @Override + public String functionName() { + return "DOMAIN"; + } @Override public void close() { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java index fe45fff..a950252 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java @@ -22,20 +22,20 @@ public class EncodeBase64 implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null || udfContext.getLookup_fields() == null){ + if(udfContext.getParameters()==null || udfContext.getOutputFields()==null || udfContext.getLookupFields() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - if(udfContext.getLookup_fields().size() != 1){ + if(udfContext.getLookupFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); } if(!udfContext.getParameters().containsKey("input_type") ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field "); } - this.outputFieldName = udfContext.getOutput_fields().get(0); - this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); this.input_type =udfContext.getParameters().get("input_type").toString(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java index b20ff18..591c202 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java @@ -48,8 +48,8 @@ public class Encrypt implements ScalarFunction { if (udfContext.getParameters().containsKey("default_val")) { this.defaultVal = udfContext.getParameters().get("default_val").toString(); } - this.lookupFieldName = udfContext.getLookup_fields().get(0); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); this.identifier = udfContext.getParameters().get("identifier").toString(); Configuration configuration = (Configuration) runtimeContext.getExecutionConfig().getGlobalJobParameters(); CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); @@ -136,10 +136,10 @@ public class Encrypt implements ScalarFunction { if (udfContext.getParameters() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if (udfContext.getLookup_fields().size() != 1) { + if (udfContext.getLookupFields().size() != 1) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); } - if (udfContext.getOutput_fields().size() != 1) { + if (udfContext.getOutputFields().size() != 1) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } if (!udfContext.getParameters().containsKey("identifier")) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java index 1b83d94..b04dc97 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java @@ -21,14 +21,14 @@ public class Eval implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getOutput_fields()==null || udfContext.getParameters() == null){ + if(udfContext.getOutputFields()==null || udfContext.getParameters() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } if(!udfContext.getParameters().containsKey("value_expression")){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_expression"); } String expr = (String) udfContext.getParameters().get("value_expression"); - List outputField = udfContext.getOutput_fields(); + List outputField = udfContext.getOutputFields(); output = outputField.get(0); calc = new EvalExecutor(expr); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java index 84c2c2a..d5d5761 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java @@ -30,7 +30,7 @@ public class Flatten implements ScalarFunction { prefix = udfContext.getParameters().getOrDefault("prefix", "").toString(); delimiter = udfContext.getParameters().getOrDefault("delimiter", ".").toString(); flattenKeys = new HashSet<>(); - for (String key : udfContext.getLookup_fields()) { + for (String key : udfContext.getLookupFields()) { this.flattenKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key); } depth = Integer.parseInt(udfContext.getParameters().getOrDefault("depth", "5").toString()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java index d9381cf..d8803c3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java @@ -23,14 +23,14 @@ public class FromUnixTimestamp implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){ + if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null || udfContext.getParameters() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - if(udfContext.getLookup_fields().size() != 1){ + if(udfContext.getLookupFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); } if(!udfContext.getParameters().containsKey("precision")){ @@ -46,8 +46,8 @@ public class FromUnixTimestamp implements ScalarFunction { } this.precision = udfContext.getParameters().get("precision").toString(); - this.outputFieldName = udfContext.getOutput_fields().get(0); - this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); switch (precision) { case "seconds": diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java index ce4fc48..366f204 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java @@ -18,14 +18,14 @@ public class GenerateStringArray implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null ){ + if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - this.lookupFieldNames = udfContext.getLookup_fields(); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldNames = udfContext.getLookupFields(); + this.outputFieldName = udfContext.getOutputFields().get(0); } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java index 559748e..d73e1f2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java @@ -50,9 +50,9 @@ public class GeoIpLookup implements ScalarFunction { } else { log.error("GeoIpLookup init KnowledgeBase error "); } - this.lookupFieldName = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()){ - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + if(udfContext.getOutputFields()!=null && !udfContext.getOutputFields().isEmpty()){ + this.outputFieldName = udfContext.getOutputFields().get(0); } GeoIpKnowledgeBaseHandler.increment(); log.warn("GeoIpKnowledgeBaseHandler "+GeoIpKnowledgeBaseHandler.getCount()); @@ -105,10 +105,10 @@ public class GeoIpLookup implements ScalarFunction { private void checkUdfContext(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null || udfContext.getParameters() == null) { + if (udfContext.getLookupFields() == null || udfContext.getParameters() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if (udfContext.getLookup_fields().size() != 1) { + if (udfContext.getLookupFields().size() != 1) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); } if (!udfContext.getParameters().containsKey("kb_name")) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java index 0d2e1ca..098cdef 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java @@ -28,8 +28,8 @@ public class Hmac implements ScalarFunction { algorithm = udfContext.getParameters().get("algorithm").toString(); } this.hMac = new HMac(getHmacAlgorithm(algorithm), secretKey.getBytes()); - this.lookupFieldName = udfContext.getLookup_fields().get(0); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); this.outputFormat = "base64"; if (udfContext.getParameters().containsKey("output_format")) { this.outputFormat = udfContext.getParameters().get("output_format").toString(); @@ -68,13 +68,13 @@ public class Hmac implements ScalarFunction { } private void checkUdfContext(UDFContext udfContext) { - if (udfContext.getParameters() == null || udfContext.getOutput_fields() == null) { + if (udfContext.getParameters() == null || udfContext.getOutputFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if (udfContext.getLookup_fields().size() != 1) { + if (udfContext.getLookupFields().size() != 1) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); } - if (udfContext.getOutput_fields().size() != 1) { + if (udfContext.getOutputFields().size() != 1) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } if (!udfContext.getParameters().containsKey("secret_key")) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java index 36c6cea..57fe847 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java @@ -17,20 +17,20 @@ public class JsonExtract implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){ + if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null || udfContext.getParameters() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getLookup_fields().size() != 1){ + if(udfContext.getLookupFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } if(!udfContext.getParameters().containsKey("value_expression")){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_expression"); } - this.lookupFieldName = udfContext.getLookup_fields().get(0); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); this.expression =udfContext.getParameters().get("value_expression").toString(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index 65617ba..0141a46 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -55,7 +55,7 @@ public class PathCombine implements ScalarFunction { } } } - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java index 42a19e6..b206f3b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java @@ -17,15 +17,15 @@ public class SnowflakeId implements Serializable, ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getOutput_fields()==null || udfContext.getParameters() == null ){ + if(udfContext.getOutputFields()==null || udfContext.getParameters() == null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } String data_center_id_num = udfContext.getParameters().getOrDefault("data_center_id_num","0").toString();//转为数字 snowflakeIdUtils = new SnowflakeIdUtils(Integer.parseInt(data_center_id_num),runtimeContext.getIndexOfThisSubtask()); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java index df5c5b4..7e4ab68 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java @@ -21,14 +21,14 @@ public class StringJoiner implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){ + if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null || udfContext.getParameters() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - this.lookupFieldNames = udfContext.getLookup_fields(); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldNames = udfContext.getLookupFields(); + this.outputFieldName = udfContext.getOutputFields().get(0); this.separator =udfContext.getParameters().getOrDefault("separator","").toString().trim(); this.prefix =udfContext.getParameters().getOrDefault("prefix","").toString().trim(); this.suffix =udfContext.getParameters().getOrDefault("suffix","").toString().trim(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java index bff8f18..62c4dfa 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java @@ -23,13 +23,13 @@ public class UnixTimestampConverter implements ScalarFunction { public void open(RuntimeContext runtimeContext, UDFContext udfContext) { this.udfContext = udfContext; - if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){ + if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null || udfContext.getParameters() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - if(udfContext.getLookup_fields().size() != 1){ + if(udfContext.getLookupFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } if(!udfContext.getParameters().containsKey("precision")){ @@ -51,8 +51,8 @@ public class UnixTimestampConverter implements ScalarFunction { else{ this.interval = Long.parseLong(udfContext.getParameters().get("interval").toString()); } - this.lookupFieldName = udfContext.getLookup_fields().get(0); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java index 7c4aca2..e54b612 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java @@ -50,14 +50,14 @@ public abstract class AbstractKnowledgeScalarFunction implements ScalarFunction registerKnowledges(); - this.lookupFieldName = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && udfContext.getOutput_fields().size() > 0) { - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + if (udfContext.getOutputFields() != null && udfContext.getOutputFields().size() > 0) { + this.outputFieldName = udfContext.getOutputFields().get(0); } } private String buildMetricPrefix(UDFContext udfContext) { - return functionName().toLowerCase() + "_" + udfContext.getLookup_fields().get(0); + return functionName().toLowerCase() + "_" + udfContext.getLookupFields().get(0); } protected abstract void registerKnowledges(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java index a591884..de64073 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java @@ -24,8 +24,8 @@ public class ArrayElementsPrepend implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { this.prefix = udfContext.getParameters().get("prefix").toString(); - this.lookupFieldName = udfContext.getLookup_fields().get(0); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java index d506461..191edd5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java @@ -29,23 +29,23 @@ public class BaseStationLookup extends AbstractKnowledgeScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { super.open(runtimeContext, udfContext); - if (udfContext.getLookup_fields() == null || udfContext.getOutput_fields() == null || udfContext.getParameters() == null) { + if (udfContext.getLookupFields() == null || udfContext.getOutputFields() == null || udfContext.getParameters() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } int lookupFieldsSize = 1; - if (udfContext.getLookup_fields().size() != lookupFieldsSize) { + if (udfContext.getLookupFields().size() != lookupFieldsSize) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "lookup_fields must contain field: cell_id"); } int outputFieldsSize = 2; - if (udfContext.getOutput_fields().size() != outputFieldsSize) { + if (udfContext.getOutputFields().size() != outputFieldsSize) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "output_fields must contain two fields: longitude and latitude"); } - List lookupFields = udfContext.getLookup_fields(); + List lookupFields = udfContext.getLookupFields(); cellIdFieldName = lookupFields.get(0); - List outputFields = udfContext.getOutput_fields(); + List outputFields = udfContext.getOutputFields(); longitudeFieldName = outputFields.get(0); latitudeFieldName = outputFields.get(1); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java index 6cd0c6b..f4338fc 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java @@ -22,8 +22,8 @@ public class FieldsMerge implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.lookupFieldNames = udfContext.getLookup_fields(); - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldNames = udfContext.getLookupFields(); + this.outputFieldName = udfContext.getOutputFields().get(0); } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java index 2698772..7389f4a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java @@ -32,12 +32,12 @@ public class H3CellLookup implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if (udfContext.getLookup_fields() == null || udfContext.getOutput_fields() == null || udfContext.getParameters() == null) { + if (udfContext.getLookupFields() == null || udfContext.getOutputFields() == null || udfContext.getParameters() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } int lookupFieldsSize = 2; - if (udfContext.getLookup_fields().size() != lookupFieldsSize) { + if (udfContext.getLookupFields().size() != lookupFieldsSize) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "lookup_fields must contain two fields: longitude and latitude"); } @@ -47,10 +47,10 @@ public class H3CellLookup implements ScalarFunction { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must contain key resolution and the value is between 0 and 15"); } - List lookupFields = udfContext.getLookup_fields(); + List lookupFields = udfContext.getLookupFields(); longitudeFieldName = lookupFields.get(0); latitudeFieldName = lookupFields.get(1); - outputFieldName = udfContext.getOutput_fields().get(0); + outputFieldName = udfContext.getOutputFields().get(0); res = (int) udfContext.getParameters().get("resolution"); try { h3 = H3Core.newInstance(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java index 230a3ca..8219fbd 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java @@ -36,12 +36,12 @@ public class CollectList implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { + if (udfContext.getLookupFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - this.outputField = udfContext.getOutput_fields().get(0); + this.lookupField = udfContext.getLookupFields().get(0); + if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) { + this.outputField = udfContext.getOutputFields().get(0); } else { outputField = lookupField; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java index 708a8a6..c23f0ca 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java @@ -22,12 +22,12 @@ public class CollectSet implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { + if (udfContext.getLookupFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - this.outputField = udfContext.getOutput_fields().get(0); + this.lookupField = udfContext.getLookupFields().get(0); + if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) { + this.outputField = udfContext.getOutputFields().get(0); } else { outputField = lookupField; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java index c6ba61b..f68448f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java @@ -34,12 +34,12 @@ public class FirstValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { + if (udfContext.getLookupFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - this.outputField = udfContext.getOutput_fields().get(0); + this.lookupField = udfContext.getLookupFields().get(0); + if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) { + this.outputField = udfContext.getOutputFields().get(0); } else { outputField = lookupField; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java index a099fde..d8656e0 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java @@ -1,121 +1,121 @@ -package com.geedgenetworks.core.udf.udaf.HdrHistogram; - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.AggregateFunction; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.sketch.util.StringUtils; -import org.HdrHistogram.ArrayHistogram; -import org.HdrHistogram.DirectMapHistogram; -import org.HdrHistogram.Histogramer; -import org.apache.commons.collections.CollectionUtils; - -import java.nio.charset.StandardCharsets; -import java.util.Map; - -public abstract class HdrHistogramBaseAggregate implements AggregateFunction { - protected String inputField; - protected String outputField; - protected boolean inputSketch; - protected long lowestDiscernibleValue; - protected long highestTrackableValue; - protected int numberOfSignificantValueDigits; - protected boolean autoResize; - - @Override - public void open(UDFContext c) { - inputField = c.getLookup_fields().get(0); - if (CollectionUtils.isNotEmpty(c.getOutput_fields())) { - outputField = c.getOutput_fields().get(0); - } else { - outputField = inputField; - } - Map params = c.getParameters(); - lowestDiscernibleValue = Long.parseLong(params.getOrDefault("lowestDiscernibleValue", "1").toString()); - highestTrackableValue = Long.parseLong(params.getOrDefault("highestTrackableValue", "2").toString()); - numberOfSignificantValueDigits = Integer.parseInt(params.getOrDefault("numberOfSignificantValueDigits", "1").toString()); - autoResize = Boolean.valueOf(params.getOrDefault("autoResize", "true").toString()); - inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString()); - } - - @Override - public Accumulator initAccumulator(Accumulator acc) { - return acc; - } - - @Override - public Accumulator add(Event event, Accumulator acc) { - Object value = event.getExtractedFields().get(inputField); - if (value == null) { - return acc; - } - - if (inputSketch) { - updateHdrMerge(acc, value); - } else { - updateHdr(acc, value); - } - - return acc; - } - - @Override - public Accumulator merge(Accumulator acc, Accumulator other) { - Object agg = acc.getMetricsFields().get(outputField); - Object aggOther = other.getMetricsFields().get(outputField); - Object rst; - - if(agg == null){ - rst = aggOther; - } else if (aggOther == null) { - rst = agg; - }else{ - rst = ((Histogramer)agg).merge(((Histogramer) aggOther)); - } - - if(rst != null){ - acc.getMetricsFields().put(outputField, rst); - } - return acc; - } - - protected void updateHdr(Accumulator acc, Object value) { - Map aggs = acc.getMetricsFields(); - ArrayHistogram his = (ArrayHistogram) aggs.get(outputField); - if (his == null) { - his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); - his.setAutoResize(autoResize); - aggs.put(outputField, his); - } - - his.recordValue(((Number) value).longValue()); - } - - - protected void updateHdrMerge(Accumulator acc, Object value) { - Map aggs = acc.getMetricsFields(); - ArrayHistogram his = (ArrayHistogram) aggs.get(outputField); - if (his == null) { - his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); - his.setAutoResize(autoResize); - aggs.put(outputField, his); - } - - Histogramer h; - if (value instanceof String) { - byte[] bytes = StringUtils.decodeBase64(((String) value).getBytes(StandardCharsets.UTF_8)); - h = DirectMapHistogram.wrapBytes(bytes); - } else if (value instanceof byte[]) { - h = DirectMapHistogram.wrapBytes((byte[]) value); - } else if (value instanceof Histogramer) { - h = (Histogramer) value; - } else { - throw new IllegalArgumentException("Unsupported type " + value.getClass()); - } - - his.merge(h); - } - - @Override - public void close() {} -} +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.ArrayHistogram; +import org.HdrHistogram.DirectMapHistogram; +import org.HdrHistogram.Histogramer; +import org.apache.commons.collections.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public abstract class HdrHistogramBaseAggregate implements AggregateFunction { + protected String inputField; + protected String outputField; + protected boolean inputSketch; + protected long lowestDiscernibleValue; + protected long highestTrackableValue; + protected int numberOfSignificantValueDigits; + protected boolean autoResize; + + @Override + public void open(UDFContext c) { + inputField = c.getLookupFields().get(0); + if (CollectionUtils.isNotEmpty(c.getOutputFields())) { + outputField = c.getOutputFields().get(0); + } else { + outputField = inputField; + } + Map params = c.getParameters(); + lowestDiscernibleValue = Long.parseLong(params.getOrDefault("lowestDiscernibleValue", "1").toString()); + highestTrackableValue = Long.parseLong(params.getOrDefault("highestTrackableValue", "2").toString()); + numberOfSignificantValueDigits = Integer.parseInt(params.getOrDefault("numberOfSignificantValueDigits", "1").toString()); + autoResize = Boolean.valueOf(params.getOrDefault("autoResize", "true").toString()); + inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString()); + } + + @Override + public Accumulator initAccumulator(Accumulator acc) { + return acc; + } + + @Override + public Accumulator add(Event event, Accumulator acc) { + Object value = event.getExtractedFields().get(inputField); + if (value == null) { + return acc; + } + + if (inputSketch) { + updateHdrMerge(acc, value); + } else { + updateHdr(acc, value); + } + + return acc; + } + + @Override + public Accumulator merge(Accumulator acc, Accumulator other) { + Object agg = acc.getMetricsFields().get(outputField); + Object aggOther = other.getMetricsFields().get(outputField); + Object rst; + + if(agg == null){ + rst = aggOther; + } else if (aggOther == null) { + rst = agg; + }else{ + rst = ((Histogramer)agg).merge(((Histogramer) aggOther)); + } + + if(rst != null){ + acc.getMetricsFields().put(outputField, rst); + } + return acc; + } + + protected void updateHdr(Accumulator acc, Object value) { + Map aggs = acc.getMetricsFields(); + ArrayHistogram his = (ArrayHistogram) aggs.get(outputField); + if (his == null) { + his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); + his.setAutoResize(autoResize); + aggs.put(outputField, his); + } + + his.recordValue(((Number) value).longValue()); + } + + + protected void updateHdrMerge(Accumulator acc, Object value) { + Map aggs = acc.getMetricsFields(); + ArrayHistogram his = (ArrayHistogram) aggs.get(outputField); + if (his == null) { + his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits); + his.setAutoResize(autoResize); + aggs.put(outputField, his); + } + + Histogramer h; + if (value instanceof String) { + byte[] bytes = StringUtils.decodeBase64(((String) value).getBytes(StandardCharsets.UTF_8)); + h = DirectMapHistogram.wrapBytes(bytes); + } else if (value instanceof byte[]) { + h = DirectMapHistogram.wrapBytes((byte[]) value); + } else if (value instanceof Histogramer) { + h = (Histogramer) value; + } else { + throw new IllegalArgumentException("Unsupported type " + value.getClass()); + } + + his.merge(h); + } + + @Override + public void close() {} +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java index e7435eb..f319e8d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java @@ -34,12 +34,12 @@ public class LastValue implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { + if (udfContext.getLookupFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - this.outputField = udfContext.getOutput_fields().get(0); + this.lookupField = udfContext.getLookupFields().get(0); + if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) { + this.outputField = udfContext.getOutputFields().get(0); } else { outputField = lookupField; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java index bf5260e..418eb9c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java @@ -14,10 +14,10 @@ public class LongCount implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if (udfContext.getOutput_fields() == null) { + if (udfContext.getOutputFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - outputField = udfContext.getOutput_fields().get(0); + outputField = udfContext.getOutputFields().get(0); } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java index e4632bb..226df0a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java @@ -20,12 +20,12 @@ public class Max implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { + if (udfContext.getLookupFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - this.outputField = udfContext.getOutput_fields().get(0); + this.lookupField = udfContext.getLookupFields().get(0); + if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) { + this.outputField = udfContext.getOutputFields().get(0); } else { outputField = lookupField; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java index 9c4e070..88f693f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java @@ -20,12 +20,12 @@ public class Mean implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { + if (udfContext.getLookupFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - outputField = udfContext.getOutput_fields().get(0); + lookupField = udfContext.getLookupFields().get(0); + if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) { + outputField = udfContext.getOutputFields().get(0); } else { outputField = lookupField; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java index 70153c9..6fd7046 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java @@ -20,12 +20,12 @@ public class Min implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { + if (udfContext.getLookupFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - this.outputField = udfContext.getOutput_fields().get(0); + this.lookupField = udfContext.getLookupFields().get(0); + if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) { + this.outputField = udfContext.getOutputFields().get(0); } else { outputField = lookupField; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java index 031d161..ab8f744 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java @@ -15,12 +15,12 @@ public class NumberSum implements AggregateFunction { @Override public void open(UDFContext udfContext) { - if (udfContext.getLookup_fields() == null) { + if (udfContext.getLookupFields() == null) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - lookupField = udfContext.getLookup_fields().get(0); - if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) { - outputField = udfContext.getOutput_fields().get(0); + lookupField = udfContext.getLookupFields().get(0); + if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) { + outputField = udfContext.getOutputFields().get(0); } else { outputField = lookupField; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java index 0802c22..c113c4a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java @@ -1,126 +1,126 @@ -package com.geedgenetworks.core.udf.udaf.hlld; - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.AggregateFunction; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.sketch.hlld.Hll; -import com.geedgenetworks.sketch.hlld.HllUnion; -import com.geedgenetworks.sketch.hlld.HllUtils; -import org.apache.commons.collections.CollectionUtils; - -import java.util.Map; - -public abstract class HlldBaseAggregate implements AggregateFunction { - protected String inputField; - protected String outputField; - protected boolean inputSketch; - protected int precision = 12; - - @Override - public void open(UDFContext c) { - inputField = c.getLookup_fields().get(0); - if (CollectionUtils.isNotEmpty(c.getOutput_fields())) { - outputField = c.getOutput_fields().get(0); - } else { - outputField = inputField; - } - Map params = c.getParameters(); - precision = Integer.parseInt(params.getOrDefault("precision", "12").toString()); - inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString()); - } - - @Override - public Accumulator initAccumulator(Accumulator acc) { - return acc; - } - - @Override - public Accumulator add(Event event, Accumulator acc) { - Object value = event.getExtractedFields().get(inputField); - if (value == null) { - return acc; - } - - if (inputSketch) { - updateHllUnion(acc, value); - } else { - updateHll(acc, value); - } - - return acc; - } - - @Override - public Accumulator merge(Accumulator acc, Accumulator other) { - Object agg = acc.getMetricsFields().get(outputField); - Object aggOther = other.getMetricsFields().get(outputField); - Object rst; - - if(agg == null){ - rst = aggOther; - } else if (aggOther == null) { - rst = agg; - }else{ - if(inputSketch){ - ((HllUnion)agg).update(((HllUnion) aggOther).getResult()); - rst = agg; - }else{ - final HllUnion union = new HllUnion(precision); - union.update((Hll) agg); - union.update((Hll) aggOther); - rst = union.getResult(); - } - } - - if(rst != null){ - acc.getMetricsFields().put(outputField, rst); - } - return acc; - } - - protected Hll getResultHll(Accumulator acc){ - Object agg = acc.getMetricsFields().get(outputField); - if (agg == null) { - return null; - } - - return inputSketch ? ((HllUnion) agg).getResult() : (Hll) agg; - } - - protected void updateHll(Accumulator acc, Object value) { - Map aggs = acc.getMetricsFields(); - Hll hll = (Hll) aggs.get(outputField); - if (hll == null) { - hll = new Hll(precision); - aggs.put(outputField, hll); - } - - if (value instanceof Integer || value instanceof Long) { - hll.add(((Number) value).longValue()); - } else if (value instanceof Float || value instanceof Double) { - hll.add(((Number) value).doubleValue()); - } else if (value instanceof String) { - hll.add((String) value); - } else if (value instanceof byte[]) { - hll.add((byte[]) value); - } else { - throw new IllegalArgumentException("Unsupported type " + value.getClass()); - } - } - - protected void updateHllUnion(Accumulator acc, Object value) { - Map aggs = acc.getMetricsFields(); - HllUnion hllUnion = (HllUnion) aggs.get(outputField); - if (hllUnion == null) { - hllUnion = new HllUnion(precision); - aggs.put(outputField, hllUnion); - } - - Hll hll = HllUtils.deserializeHll(value); - hllUnion.update(hll); - } - - @Override - public void close() {} -} +package com.geedgenetworks.core.udf.udaf.hlld; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.hlld.Hll; +import com.geedgenetworks.sketch.hlld.HllUnion; +import com.geedgenetworks.sketch.hlld.HllUtils; +import org.apache.commons.collections.CollectionUtils; + +import java.util.Map; + +public abstract class HlldBaseAggregate implements AggregateFunction { + protected String inputField; + protected String outputField; + protected boolean inputSketch; + protected int precision = 12; + + @Override + public void open(UDFContext c) { + inputField = c.getLookupFields().get(0); + if (CollectionUtils.isNotEmpty(c.getOutputFields())) { + outputField = c.getOutputFields().get(0); + } else { + outputField = inputField; + } + Map params = c.getParameters(); + precision = Integer.parseInt(params.getOrDefault("precision", "12").toString()); + inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString()); + } + + @Override + public Accumulator initAccumulator(Accumulator acc) { + return acc; + } + + @Override + public Accumulator add(Event event, Accumulator acc) { + Object value = event.getExtractedFields().get(inputField); + if (value == null) { + return acc; + } + + if (inputSketch) { + updateHllUnion(acc, value); + } else { + updateHll(acc, value); + } + + return acc; + } + + @Override + public Accumulator merge(Accumulator acc, Accumulator other) { + Object agg = acc.getMetricsFields().get(outputField); + Object aggOther = other.getMetricsFields().get(outputField); + Object rst; + + if(agg == null){ + rst = aggOther; + } else if (aggOther == null) { + rst = agg; + }else{ + if(inputSketch){ + ((HllUnion)agg).update(((HllUnion) aggOther).getResult()); + rst = agg; + }else{ + final HllUnion union = new HllUnion(precision); + union.update((Hll) agg); + union.update((Hll) aggOther); + rst = union.getResult(); + } + } + + if(rst != null){ + acc.getMetricsFields().put(outputField, rst); + } + return acc; + } + + protected Hll getResultHll(Accumulator acc){ + Object agg = acc.getMetricsFields().get(outputField); + if (agg == null) { + return null; + } + + return inputSketch ? ((HllUnion) agg).getResult() : (Hll) agg; + } + + protected void updateHll(Accumulator acc, Object value) { + Map aggs = acc.getMetricsFields(); + Hll hll = (Hll) aggs.get(outputField); + if (hll == null) { + hll = new Hll(precision); + aggs.put(outputField, hll); + } + + if (value instanceof Integer || value instanceof Long) { + hll.add(((Number) value).longValue()); + } else if (value instanceof Float || value instanceof Double) { + hll.add(((Number) value).doubleValue()); + } else if (value instanceof String) { + hll.add((String) value); + } else if (value instanceof byte[]) { + hll.add((byte[]) value); + } else { + throw new IllegalArgumentException("Unsupported type " + value.getClass()); + } + } + + protected void updateHllUnion(Accumulator acc, Object value) { + Map aggs = acc.getMetricsFields(); + HllUnion hllUnion = (HllUnion) aggs.get(outputField); + if (hllUnion == null) { + hllUnion = new HllUnion(precision); + aggs.put(outputField, hllUnion); + } + + Hll hll = HllUtils.deserializeHll(value); + hllUnion.update(hll); + } + + @Override + public void close() {} +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java index 012c2df..de89e2c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java @@ -34,12 +34,12 @@ public class JsonUnroll implements TableFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if(udfContext.getLookupFields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupFieldName = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + if(udfContext.getOutputFields()!=null && !udfContext.getOutputFields().isEmpty()) { + this.outputFieldName = udfContext.getOutputFields().get(0); } else { outputFieldName = lookupFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java index e6514a2..e5732e0 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java @@ -1,118 +1,118 @@ -package com.geedgenetworks.core.udf.udtf; - -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.TableFunction; -import com.geedgenetworks.common.udf.UDFContext; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.util.Preconditions; - -import java.util.*; - -public class PathUnroll implements TableFunction { - private String pathField; - private String fileField; - private char separator; - - private String outputPathField; - private String outputFileField; - private List events; - - @Override - public void open(RuntimeContext runtimeContext, UDFContext c) { - Preconditions.checkArgument(c.getLookup_fields().size() >= 1, "input fields requested one path param at least"); - Preconditions.checkArgument(CollectionUtils.isEmpty(c.getOutput_fields()) || c.getOutput_fields().size() == c.getOutput_fields().size(), "output fields requested same count param with input fields"); - pathField = c.getLookup_fields().get(0); - fileField = c.getLookup_fields().size() == 1? null: c.getLookup_fields().get(1); - - outputPathField = CollectionUtils.isEmpty(c.getOutput_fields())? pathField : c.getOutput_fields().get(0); - outputFileField = CollectionUtils.isEmpty(c.getOutput_fields()) || c.getLookup_fields().size() == 1 ? fileField : c.getOutput_fields().get(1); - Map params = c.getParameters() == null? Collections.EMPTY_MAP:c.getParameters(); - String sep = params.getOrDefault("separator", "/").toString(); - Preconditions.checkArgument(sep.length() == 1, "separator mush has one char"); - separator = sep.charAt(0); - events = new ArrayList<>(); - } - - @Override - public List evaluate(Event event) { - Map map = event.getExtractedFields(); - String p = (String) map.get(pathField); - // 去除path结尾的分隔符 - final String path = StringUtils.isBlank(p)? null: (separator != p.charAt(p.length() - 1) ? p: p.substring(0, p.length() - 1)); - final String fileName = fileField == null? null: (String) map.get(fileField); - - if (StringUtils.isBlank(path)) { - return Collections.emptyList(); - } - - if(events.size() > 100){ - events = new ArrayList<>(); - }else if(events.size() > 0){ - events.clear(); - } - Event e; - Map fields; - - // 拆分path - int index = path.indexOf(separator); - String subPath; - while (index > 0) { - subPath = path.substring(0, index); - e = new Event(); - fields = new HashMap<>(map); - fields.put(outputPathField, subPath); - if(outputFileField != null){ - fields.put(outputFileField, null); - } - e.setExtractedFields(fields); - events.add(e); - index = path.indexOf(separator, index + 1); - } - boolean hasFile = StringUtils.isNotBlank(fileName); - boolean pathContainsFile = hasFile && path.endsWith(fileName); - - if(!hasFile){ - e = new Event(); - fields = new HashMap<>(map); - fields.put(outputPathField, path); - if(outputFileField != null){ - fields.put(outputFileField, null); - } - e.setExtractedFields(fields); - events.add(e); - }else{ - e = new Event(); - fields = new HashMap<>(map); - fields.put(outputPathField, path); - if(outputFileField != null){ - fields.put(outputFileField, pathContainsFile? fileName:null); - } - e.setExtractedFields(fields); - events.add(e); - - // 输出path + file - if(!pathContainsFile){ - e = new Event(); - fields = new HashMap<>(map); - fields.put(outputPathField, path + separator + fileName); - if(outputFileField != null){ - fields.put(outputFileField, fileName); - } - e.setExtractedFields(fields); - events.add(e); - } - } - - return events; - } - - @Override - public void close() {} - - @Override - public String functionName() { - return "PATH_UNROLL"; - } -} +package com.geedgenetworks.core.udf.udtf; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.TableFunction; +import com.geedgenetworks.common.udf.UDFContext; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Preconditions; + +import java.util.*; + +public class PathUnroll implements TableFunction { + private String pathField; + private String fileField; + private char separator; + + private String outputPathField; + private String outputFileField; + private List events; + + @Override + public void open(RuntimeContext runtimeContext, UDFContext c) { + Preconditions.checkArgument(c.getLookupFields().size() >= 1, "input fields requested one path param at least"); + Preconditions.checkArgument(CollectionUtils.isEmpty(c.getOutputFields()) || c.getOutputFields().size() == c.getOutputFields().size(), "output fields requested same count param with input fields"); + pathField = c.getLookupFields().get(0); + fileField = c.getLookupFields().size() == 1? null: c.getLookupFields().get(1); + + outputPathField = CollectionUtils.isEmpty(c.getOutputFields())? pathField : c.getOutputFields().get(0); + outputFileField = CollectionUtils.isEmpty(c.getOutputFields()) || c.getLookupFields().size() == 1 ? fileField : c.getOutputFields().get(1); + Map params = c.getParameters() == null? Collections.EMPTY_MAP:c.getParameters(); + String sep = params.getOrDefault("separator", "/").toString(); + Preconditions.checkArgument(sep.length() == 1, "separator mush has one char"); + separator = sep.charAt(0); + events = new ArrayList<>(); + } + + @Override + public List evaluate(Event event) { + Map map = event.getExtractedFields(); + String p = (String) map.get(pathField); + // 去除path结尾的分隔符 + final String path = StringUtils.isBlank(p)? null: (separator != p.charAt(p.length() - 1) ? p: p.substring(0, p.length() - 1)); + final String fileName = fileField == null? null: (String) map.get(fileField); + + if (StringUtils.isBlank(path)) { + return Collections.emptyList(); + } + + if(events.size() > 100){ + events = new ArrayList<>(); + }else if(events.size() > 0){ + events.clear(); + } + Event e; + Map fields; + + // 拆分path + int index = path.indexOf(separator); + String subPath; + while (index > 0) { + subPath = path.substring(0, index); + e = new Event(); + fields = new HashMap<>(map); + fields.put(outputPathField, subPath); + if(outputFileField != null){ + fields.put(outputFileField, null); + } + e.setExtractedFields(fields); + events.add(e); + index = path.indexOf(separator, index + 1); + } + boolean hasFile = StringUtils.isNotBlank(fileName); + boolean pathContainsFile = hasFile && path.endsWith(fileName); + + if(!hasFile){ + e = new Event(); + fields = new HashMap<>(map); + fields.put(outputPathField, path); + if(outputFileField != null){ + fields.put(outputFileField, null); + } + e.setExtractedFields(fields); + events.add(e); + }else{ + e = new Event(); + fields = new HashMap<>(map); + fields.put(outputPathField, path); + if(outputFileField != null){ + fields.put(outputFileField, pathContainsFile? fileName:null); + } + e.setExtractedFields(fields); + events.add(e); + + // 输出path + file + if(!pathContainsFile){ + e = new Event(); + fields = new HashMap<>(map); + fields.put(outputPathField, path + separator + fileName); + if(outputFileField != null){ + fields.put(outputFileField, fileName); + } + e.setExtractedFields(fields); + events.add(e); + } + } + + return events; + } + + @Override + public void close() {} + + @Override + public String functionName() { + return "PATH_UNROLL"; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java index 9fd5a6e..ff4a9ae 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java @@ -21,12 +21,12 @@ public class Unroll implements TableFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getLookup_fields()==null ){ + if(udfContext.getLookupFields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - this.lookupFieldName = udfContext.getLookup_fields().get(0); - if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookupFields().get(0); + if(udfContext.getOutputFields()!=null && !udfContext.getOutputFields().isEmpty()) { + this.outputFieldName = udfContext.getOutputFields().get(0); } else { outputFieldName = lookupFieldName; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java index 1ce65bc..4e9a031 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java @@ -17,13 +17,13 @@ public class UUID implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getOutput_fields()==null ){ + if(udfContext.getOutputFields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); this.randomBasedGenerator = Generators.randomBasedGenerator(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java index b4ad808..3a433b8 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java @@ -21,18 +21,18 @@ public class UUIDv5 implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getOutput_fields() == null || udfContext.getParameters() == null || udfContext.getLookup_fields() == null){ + if(udfContext.getOutputFields() == null || udfContext.getParameters() == null || udfContext.getLookupFields() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } if(!udfContext.getParameters().containsKey(NAMESPACE_KEY) ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contain key: " + NAMESPACE_KEY); } - this.outputFieldName = udfContext.getOutput_fields().get(0); - this.lookupFieldNames = udfContext.getLookup_fields(); + this.outputFieldName = udfContext.getOutputFields().get(0); + this.lookupFieldNames = udfContext.getLookupFields(); String namespace = udfContext.getParameters().get(NAMESPACE_KEY).toString(); this.nameBasedGenerator = Generators.nameBasedGenerator(UUIDNameSpace.getUUID(namespace)); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java index 49025ef..60c388f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java @@ -18,13 +18,13 @@ public class UUIDv7 implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getOutput_fields()==null ){ + if(udfContext.getOutputFields()==null ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } - if(udfContext.getOutput_fields().size() != 1){ + if(udfContext.getOutputFields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - this.outputFieldName = udfContext.getOutput_fields().get(0); + this.outputFieldName = udfContext.getOutputFields().get(0); this.timeBasedEpochRandomGenerator = Generators.timeBasedEpochGenerator(); } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AnonymityLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AnonymityLookupTest.java index a52deb1..ae74fea 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AnonymityLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AnonymityLookupTest.java @@ -37,8 +37,8 @@ class AnonymityLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "IP_TO_NODE_TYPE"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("server_ip")); - udfContext.setOutput_fields(Collections.singletonList("server_node_type")); + udfContext.setLookupFields(Collections.singletonList("server_ip")); + udfContext.setOutputFields(Collections.singletonList("server_node_type")); anonymityLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -56,8 +56,8 @@ class AnonymityLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "DOMAIN_TO_NODE_TYPE"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_node_type")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_node_type")); anonymityLookup.open(runtimeContext, udfContext); Event event = new Event(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AppCategoryLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AppCategoryLookupTest.java index f74ce39..713be3f 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AppCategoryLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AppCategoryLookupTest.java @@ -30,7 +30,7 @@ class AppCategoryLookupTest { fieldMapping.put("COMPANY_CATEGORY", "app_company_category"); parameters.put("field_mapping", fieldMapping); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("app")); + udfContext.setLookupFields(Collections.singletonList("app")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java index db52b3e..43b0bd5 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java @@ -38,8 +38,8 @@ class BaseStationLookupTest { Map parameters = new HashMap<>(); parameters.put("kb_name", kbName); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Arrays.asList("cell_id")); - udfContext.setOutput_fields(Arrays.asList("subscriber_longitude", "subscriber_latitude")); + udfContext.setLookupFields(Arrays.asList("cell_id")); + udfContext.setOutputFields(Arrays.asList("subscriber_longitude", "subscriber_latitude")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookupTest.java index 7f526d5..d1cca09 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookupTest.java @@ -25,8 +25,8 @@ class DnsServerInfoLookupTest { Map parameters = new HashMap<>(); parameters.put("kb_name", kbName); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("server_ip")); - udfContext.setOutput_fields(Collections.singletonList("server_dns_server")); + udfContext.setLookupFields(Collections.singletonList("server_ip")); + udfContext.setOutputFields(Collections.singletonList("server_dns_server")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookupTest.java index db15642..0e64982 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookupTest.java @@ -29,7 +29,7 @@ class FqdnCategoryLookupTest { fieldMapping.put("REPUTATION_LEVEL", "domain_reputation_level"); parameters.put("field_mapping", fieldMapping); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); + udfContext.setLookupFields(Collections.singletonList("domain")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookupTest.java index 42a98dc..93ee663 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookupTest.java @@ -24,8 +24,8 @@ class FqdnWhoisLookupTest { Map parameters = new HashMap<>(); parameters.put("kb_name", kbName); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_whois_org")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_whois_org")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java index 641f58a..a7b98ab 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java @@ -36,8 +36,8 @@ public class H3CellLookupTest { * resolution: 9 */ UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(Arrays.asList("subscriber_longitude", "subscriber_latitude")); - udfContext.setOutput_fields(Arrays.asList("first_location")); + udfContext.setLookupFields(Arrays.asList("subscriber_longitude", "subscriber_latitude")); + udfContext.setOutputFields(Arrays.asList("first_location")); Map parameters = new HashMap<>(); parameters.put("resolution", 9); udfContext.setParameters(parameters); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IcpLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IcpLookupTest.java index 3158124..c2032e0 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IcpLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IcpLookupTest.java @@ -24,8 +24,8 @@ class IcpLookupTest { Map parameters = new HashMap<>(); parameters.put("kb_name", kbName); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_icp_company_name")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_icp_company_name")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IdcRenterLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IdcRenterLookupTest.java index b15096b..7409a2f 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IdcRenterLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IdcRenterLookupTest.java @@ -24,8 +24,8 @@ class IdcRenterLookupTest { Map parameters = new HashMap<>(); parameters.put("kb_name", kbName); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("server_ip")); - udfContext.setOutput_fields(Collections.singletonList("server_idc_renter")); + udfContext.setLookupFields(Collections.singletonList("server_ip")); + udfContext.setOutputFields(Collections.singletonList("server_idc_renter")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java index 34fef6b..c946d35 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java @@ -40,8 +40,8 @@ public class IntelligenceIndicatorLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "IP_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("server_ip")); - udfContext.setOutput_fields(Collections.singletonList("server_ip_tags")); + udfContext.setLookupFields(Collections.singletonList("server_ip")); + udfContext.setOutputFields(Collections.singletonList("server_ip_tags")); intelligenceIndicatorLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -59,8 +59,8 @@ public class IntelligenceIndicatorLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "IP_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("server_ip")); - udfContext.setOutput_fields(Collections.singletonList("server_ip_tags")); + udfContext.setLookupFields(Collections.singletonList("server_ip")); + udfContext.setOutputFields(Collections.singletonList("server_ip_tags")); intelligenceIndicatorLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -82,8 +82,8 @@ public class IntelligenceIndicatorLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "DOMAIN_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_tags")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_tags")); intelligenceIndicatorLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -101,8 +101,8 @@ public class IntelligenceIndicatorLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "DOMAIN_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_tags")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_tags")); intelligenceIndicatorLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -124,8 +124,8 @@ public class IntelligenceIndicatorLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "DOMAIN_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_tags")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_tags")); intelligenceIndicatorLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -143,8 +143,8 @@ public class IntelligenceIndicatorLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "IP_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("server_ip")); - udfContext.setOutput_fields(Collections.singletonList("server_ip_tags")); + udfContext.setLookupFields(Collections.singletonList("server_ip")); + udfContext.setOutputFields(Collections.singletonList("server_ip_tags")); intelligenceIndicatorLookup.open(runtimeContext, udfContext); Event event = new Event(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IocLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IocLookupTest.java index f9d3b25..8c01bc7 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IocLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IocLookupTest.java @@ -37,8 +37,8 @@ class IocLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "IP_TO_MALWARE"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("server_ip")); - udfContext.setOutput_fields(Collections.singletonList("server_malware")); + udfContext.setLookupFields(Collections.singletonList("server_ip")); + udfContext.setOutputFields(Collections.singletonList("server_malware")); iocLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -56,8 +56,8 @@ class IocLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "DOMAIN_TO_MALWARE"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_malware")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_malware")); iocLookup.open(runtimeContext, udfContext); Event event = new Event(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IpZoneLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IpZoneLookupTest.java index abe5ba0..e3024b5 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IpZoneLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IpZoneLookupTest.java @@ -24,8 +24,8 @@ class IpZoneLookupTest { Map parameters = new HashMap<>(); parameters.put("kb_name", kbName); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("client_ip")); - udfContext.setOutput_fields(Collections.singletonList("client_zone")); + udfContext.setLookupFields(Collections.singletonList("client_ip")); + udfContext.setOutputFields(Collections.singletonList("client_zone")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookupTest.java index c0a06fe..4f2f551 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookupTest.java @@ -24,8 +24,8 @@ class LinkDirectionLookupTest { Map parameters = new HashMap<>(); parameters.put("kb_name", kbName); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("in_link_id")); - udfContext.setOutput_fields(Collections.singletonList("in_link_direction")); + udfContext.setLookupFields(Collections.singletonList("in_link_id")); + udfContext.setOutputFields(Collections.singletonList("in_link_direction")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookupTest.java index 9688199..a586aeb 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookupTest.java @@ -37,8 +37,8 @@ class UserDefineTagLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "IP_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("client_ip")); - udfContext.setOutput_fields(Collections.singletonList("client_ip_tags")); + udfContext.setLookupFields(Collections.singletonList("client_ip")); + udfContext.setOutputFields(Collections.singletonList("client_ip_tags")); userDefineTagLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -59,8 +59,8 @@ class UserDefineTagLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "DOMAIN_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_tags")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_tags")); userDefineTagLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -81,8 +81,8 @@ class UserDefineTagLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "DOMAIN_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_tags")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_tags")); userDefineTagLookup.open(runtimeContext, udfContext); Event event = new Event(); @@ -103,8 +103,8 @@ class UserDefineTagLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "APP_TO_TAG"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("app")); - udfContext.setOutput_fields(Collections.singletonList("app_tags")); + udfContext.setLookupFields(Collections.singletonList("app")); + udfContext.setOutputFields(Collections.singletonList("app_tags")); userDefineTagLookup.open(runtimeContext, udfContext); Event event = new Event(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/VpnLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/VpnLookupTest.java index 3dd0992..50374f7 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/VpnLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/VpnLookupTest.java @@ -30,8 +30,8 @@ class VpnLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "IP_TO_VPN"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("server_ip")); - udfContext.setOutput_fields(Collections.singletonList("server_vpn_service_name")); + udfContext.setLookupFields(Collections.singletonList("server_ip")); + udfContext.setOutputFields(Collections.singletonList("server_vpn_service_name")); RuntimeContext runtimeContext = mockRuntimeContext(); @@ -55,8 +55,8 @@ class VpnLookupTest { parameters.put("kb_name", kbName); parameters.put("option", "DOMAIN_TO_VPN"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain_vpn_service_name")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain_vpn_service_name")); RuntimeContext runtimeContext = mockRuntimeContext(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java index 1e5c0a3..f10fe2b 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java @@ -26,8 +26,8 @@ public class AsnLookupFunctionTest { udfContext = new UDFContext(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("ip")); - udfContext.setOutput_fields(Collections.singletonList("asn")); + udfContext.setLookupFields(Collections.singletonList("ip")); + udfContext.setOutputFields(Collections.singletonList("asn")); } @@ -36,7 +36,7 @@ public class AsnLookupFunctionTest { @Test public void testInit(){ AsnLookup asnLookup = new AsnLookup(); - udfContext.setLookup_fields(new ArrayList<>()); + udfContext.setLookupFields(new ArrayList<>()); udfContext.setParameters(new HashMap<>()); udfContext.setParameters(null); Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { @@ -48,10 +48,10 @@ public class AsnLookupFunctionTest { asnLookup.open(null, udfContext); }); - udfContext.setLookup_fields(new ArrayList<>()); - udfContext.getLookup_fields().add("v1"); - udfContext.setOutput_fields(new ArrayList<>()); - udfContext.getOutput_fields().add("v2"); + udfContext.setLookupFields(new ArrayList<>()); + udfContext.getLookupFields().add("v1"); + udfContext.setOutputFields(new ArrayList<>()); + udfContext.getOutputFields().add("v2"); udfContext.setParameters(new HashMap<>()); udfContext.getParameters().put("option","other"); Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java index bc67f1a..83f8ab4 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java @@ -25,14 +25,14 @@ public class GeoIpLookupFunctionTest { @BeforeAll public static void setUp() { udfContext = new UDFContext(); - udfContext.setLookup_fields(Collections.singletonList("ip")); - udfContext.setOutput_fields(Collections.singletonList("iplocation")); + udfContext.setLookupFields(Collections.singletonList("ip")); + udfContext.setOutputFields(Collections.singletonList("iplocation")); } @Test public void testInit(){ GeoIpLookup geoIpLookup = new GeoIpLookup(); - udfContext.setLookup_fields(new ArrayList<>()); + udfContext.setLookupFields(new ArrayList<>()); udfContext.setParameters(new HashMap<>()); udfContext.setParameters(null); Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { @@ -44,10 +44,10 @@ public class GeoIpLookupFunctionTest { geoIpLookup.open(null, udfContext); }); - udfContext.setLookup_fields(new ArrayList<>()); - udfContext.getLookup_fields().add("v1"); - udfContext.setOutput_fields(new ArrayList<>()); - udfContext.getOutput_fields().add("v2"); + udfContext.setLookupFields(new ArrayList<>()); + udfContext.getLookupFields().add("v1"); + udfContext.setOutputFields(new ArrayList<>()); + udfContext.getOutputFields().add("v2"); udfContext.setParameters(new HashMap<>()); udfContext.getParameters().put("option","other"); Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java index cc96993..16d5cce 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java @@ -30,8 +30,8 @@ public class CollectListTest { private void testMerge(List arr,List arr2) { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_list")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_list")); CollectList collectList = new CollectList(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); @@ -68,8 +68,8 @@ public class CollectListTest { private void testGetResult(List arr) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_list")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_list")); CollectList collectList = new CollectList(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java index cf897f6..8909794 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java @@ -28,8 +28,8 @@ public class CollectSetTest { private void testMerge(List arr,List arr2) { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_list")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_list")); CollectSet collectSet = new CollectSet(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); @@ -65,8 +65,8 @@ public class CollectSetTest { private static void testGetResult(List arr) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_list")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_list")); CollectSet collectSet = new CollectSet(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java index d114a09..0acf1d5 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java @@ -27,8 +27,8 @@ public class FirstValueTest { private void testMerge(List arr,List arr2) { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_first")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_first")); FirstValue firstValue = new FirstValue(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); @@ -63,8 +63,8 @@ public class FirstValueTest { private static void testGetResult(List arr) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_first")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_first")); FirstValue firstValue = new FirstValue(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java index 3f5432f..b2c9ceb 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java @@ -30,8 +30,8 @@ public class LastValueTest { private void testMerge(List arr,List arr2) { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_last")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_last")); LastValue lastValue = new LastValue(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); @@ -66,8 +66,8 @@ public class LastValueTest { private static void testGetResult(List arr) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_last")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_last")); LastValue lastValue = new LastValue(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java index e747c42..54d9dba 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java @@ -31,8 +31,8 @@ public class LongCountTest { private void testMerge(Number[] arr,Number[] arr2) { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("count")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("count")); LongCount longCount = new LongCount(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); @@ -66,7 +66,7 @@ public class LongCountTest { private static void testGetResult(Number[] arr) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setOutput_fields(Collections.singletonList("count")); + udfContext.setOutputFields(Collections.singletonList("count")); LongCount longCount = new LongCount(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java index 67fec93..311d51f 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java @@ -31,8 +31,8 @@ public class MaxTest { // 初始化上下文 UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("value")); - udfContext.setOutput_fields(List.of("maxValue")); + udfContext.setLookupFields(List.of("value")); + udfContext.setOutputFields(List.of("maxValue")); maxFunction.open(udfContext); // 初始化累加器的 metricsFields diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java index 8255cb1..62efc0a 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java @@ -31,8 +31,8 @@ public class MeanTest { } private void testMerge(Number[] arr1,Number[] arr2,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_mean")); udfContext.setParameters(new HashMap<>()); udfContext.getParameters().put("precision", precision); Mean mean = new Mean(); @@ -47,8 +47,8 @@ public class MeanTest { } private Accumulator getMiddleResult(Number[] arr,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_mean")); udfContext.setParameters(new HashMap<>()); udfContext.getParameters().put("precision", precision); Mean mean = new Mean(); @@ -72,8 +72,8 @@ public class MeanTest { private void testInt(Number[] arr,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_mean")); udfContext.setParameters(new HashMap<>()); udfContext.getParameters().put("precision", precision); Mean mean = new Mean(); @@ -98,8 +98,8 @@ public class MeanTest { private void testDouble(Number[] arr,int precision) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_mean")); udfContext.setParameters(new HashMap<>()); udfContext.getParameters().put("precision", precision); Mean mean = new Mean(); @@ -123,8 +123,8 @@ public class MeanTest { private void testNoPrecision(Number[] arr) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_mean")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_mean")); Mean mean = new Mean(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java index ee63137..e5a1615 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java @@ -26,8 +26,8 @@ public class MinTest { void setUp() { minFunction = new Min(); udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("value")); - udfContext.setOutput_fields(List.of("minValue")); + udfContext.setLookupFields(List.of("value")); + udfContext.setOutputFields(List.of("minValue")); minFunction.open(udfContext); acc = new Accumulator(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java index a4072ca..7ccb365 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java @@ -48,8 +48,8 @@ public class NumberSumTest { private void testMerge(Number[] arr,Number[] arr2) { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_sum")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_sum")); NumberSum numberSum = new NumberSum(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); @@ -83,8 +83,8 @@ public class NumberSumTest { private static void excute(Number[] arr, Class clazz) throws ParseException { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("field")); - udfContext.setOutput_fields(Collections.singletonList("field_sum")); + udfContext.setLookupFields(List.of("field")); + udfContext.setOutputFields(Collections.singletonList("field_sum")); NumberSum numberSum = new NumberSum(); Map metricsFields = new HashMap<>(); Accumulator accumulator = new Accumulator(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DecodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DecodeBase64FunctionTest.java index 1d3b863..a5f31f7 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DecodeBase64FunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DecodeBase64FunctionTest.java @@ -22,8 +22,8 @@ public class DecodeBase64FunctionTest { @BeforeAll public static void setUp() { udfContext = new UDFContext(); - udfContext.setLookup_fields(Arrays.asList("message", "charset")); - udfContext.setOutput_fields(Collections.singletonList("decodeResult")); + udfContext.setLookupFields(Arrays.asList("message", "charset")); + udfContext.setOutputFields(Collections.singletonList("decodeResult")); Map map = new HashMap<>(); map.put("value_field","message"); map.put("charset_field","charset"); @@ -51,8 +51,8 @@ public class DecodeBase64FunctionTest { DecodeBase64 decodeBase64 = new DecodeBase64(); - udfContext.setLookup_fields(Collections.singletonList("message")); - udfContext.setOutput_fields(Collections.singletonList("decodeResult")); + udfContext.setLookupFields(Collections.singletonList("message")); + udfContext.setOutputFields(Collections.singletonList("decodeResult")); udfContext.getParameters().remove("value_field"); assertThrows(GrootStreamRuntimeException.class, () -> { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java index 2126117..f8076cc 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java @@ -24,15 +24,15 @@ public class DomainFunctionTest { udfContext = new UDFContext(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain1")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain1")); } @Test public void testInit(){ Domain domain = new Domain(); - udfContext.setLookup_fields(new ArrayList<>()); + udfContext.setLookupFields(new ArrayList<>()); udfContext.setParameters(new HashMap<>()); udfContext.setParameters(null); Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { @@ -44,10 +44,10 @@ public class DomainFunctionTest { domain.open(null, udfContext); }); - udfContext.setLookup_fields(new ArrayList<>()); - udfContext.getLookup_fields().add("v1"); - udfContext.setOutput_fields(new ArrayList<>()); - udfContext.getOutput_fields().add("v2"); + udfContext.setLookupFields(new ArrayList<>()); + udfContext.getLookupFields().add("v1"); + udfContext.setOutputFields(new ArrayList<>()); + udfContext.getOutputFields().add("v2"); udfContext.setParameters(new HashMap<>()); udfContext.getParameters().put("option","other"); Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { @@ -59,16 +59,16 @@ public class DomainFunctionTest { @Test public void testDomainFunctionTopLevelDomain() { parameters.put("option", "TOP_LEVEL_DOMAIN"); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain1")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain1")); Domain domain = new Domain(); domain.open(null, udfContext); Event event = new Event(); Map extractedFields = new HashMap<>(); - extractedFields.put("domain", "www.baidu.com"); + extractedFields.put("domain", "http://www.baidu.com.cn"); event.setExtractedFields(extractedFields); Event result1 = domain.evaluate(event); - assertEquals("com", result1.getExtractedFields().get("domain1")); + assertEquals("com.cn", result1.getExtractedFields().get("domain1")); } @Test @@ -76,8 +76,8 @@ public class DomainFunctionTest { parameters.put("option", "FIRST_SIGNIFICANT_SUBDOMAIN"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("domain")); - udfContext.setOutput_fields(Collections.singletonList("domain1")); + udfContext.setLookupFields(Collections.singletonList("domain")); + udfContext.setOutputFields(Collections.singletonList("domain1")); Domain domain = new Domain(); domain.open(null, udfContext); Event event = new Event(); @@ -87,4 +87,5 @@ public class DomainFunctionTest { Event result1 = domain.evaluate(event); assertEquals("baidu.com", result1.getExtractedFields().get("domain1")); } + } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java index adda7c5..2bd6705 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java @@ -25,8 +25,8 @@ public class EncodeBase64FunctionTest { @Test public void testEncodeBase64FunctionForByte() { udfContext = new UDFContext(); - udfContext.setOutput_fields(Collections.singletonList("encodeResult")); - udfContext.setLookup_fields(Collections.singletonList("name")); + udfContext.setOutputFields(Collections.singletonList("encodeResult")); + udfContext.setLookupFields(Collections.singletonList("name")); Map map = new HashMap<>(); map.put("input_type","byte_array"); udfContext.setParameters(map); @@ -47,8 +47,8 @@ public class EncodeBase64FunctionTest { public void testEncodeBase64FunctionForString() { udfContext = new UDFContext(); - udfContext.setOutput_fields(Collections.singletonList("encodeResult")); - udfContext.setLookup_fields(Collections.singletonList("name")); + udfContext.setOutputFields(Collections.singletonList("encodeResult")); + udfContext.setLookupFields(Collections.singletonList("name")); Map map = new HashMap<>(); map.put("input_type","string"); udfContext.setParameters(map); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java index a83d853..c235494 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java @@ -42,8 +42,8 @@ public class EncryptFunctionTest { public static void setUp() throws IOException { Security.addProvider(new BouncyCastleProvider()); udfContext = new UDFContext(); - udfContext.setLookup_fields(Collections.singletonList("phone_number")); - udfContext.setOutput_fields(Collections.singletonList("phone_number")); + udfContext.setLookupFields(Collections.singletonList("phone_number")); + udfContext.setOutputFields(Collections.singletonList("phone_number")); httpClientPoolUtilMockedStatic = mockSensitiveFields(); } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java index 61c3975..e829b1d 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java @@ -25,7 +25,7 @@ public class FlattenFunctionTest { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("parent1", "parent2", "parent3", "parent4","parent5","parent6","parent7","parent8")); + udfContext.setLookupFields(List.of("parent1", "parent2", "parent3", "parent4","parent5","parent6","parent7","parent8")); Map params = new HashMap<>(); params.put("prefix", "prefix"); params.put("depth", "4"); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java index d9f4538..6cb1bf8 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java @@ -19,8 +19,8 @@ public class FromUnixTimestampTest { @BeforeAll public static void setUp() { udfContext = new UDFContext(); - udfContext.setLookup_fields(Arrays.asList("unixTimestamp")); - udfContext.setOutput_fields(Arrays.asList("timestamp")); + udfContext.setLookupFields(Arrays.asList("unixTimestamp")); + udfContext.setOutputFields(Arrays.asList("timestamp")); } @Test public void testFromUnixTimestampMsFunction() throws Exception { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/GenerateStringArrayFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/GenerateStringArrayFunctionTest.java index 5490299..1fbe06c 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/GenerateStringArrayFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/GenerateStringArrayFunctionTest.java @@ -17,8 +17,8 @@ public class GenerateStringArrayFunctionTest { @BeforeAll public static void setUp() { udfContext = new UDFContext(); - udfContext.setLookup_fields(Arrays.asList("t1", "t2","t3","t4","t5")); - udfContext.setOutput_fields(Collections.singletonList("result_list")); + udfContext.setLookupFields(Arrays.asList("t1", "t2","t3","t4","t5")); + udfContext.setOutputFields(Collections.singletonList("result_list")); } // 测试方法 diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java index d2219d8..5a4d0d3 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java @@ -23,8 +23,8 @@ public class HmacFunctionTest { @BeforeAll public static void setUp() { udfContext = new UDFContext(); - udfContext.setLookup_fields(Collections.singletonList("phone_number")); - udfContext.setOutput_fields(Collections.singletonList("phone_number_mac")); + udfContext.setLookupFields(Collections.singletonList("phone_number")); + udfContext.setOutputFields(Collections.singletonList("phone_number_mac")); } @Test diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java index 99f3f96..dd661de 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java @@ -25,7 +25,7 @@ public class JsonExtractFunctionTest { @Test public void testInit(){ JsonExtract jsonExtract = new JsonExtract(); - udfContext.setLookup_fields(new ArrayList<>()); + udfContext.setLookupFields(new ArrayList<>()); udfContext.setParameters(new HashMap<>()); udfContext.setParameters(null); Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { @@ -52,8 +52,8 @@ public class JsonExtractFunctionTest { Map parameters = new HashMap<>(); parameters.put("value_expression","$.tags[?(@.tag=='device_group')][0].value"); udfContext.setParameters(parameters); - udfContext.setLookup_fields(Collections.singletonList("device_tag")); - udfContext.setOutput_fields(Collections.singletonList("device_group")); + udfContext.setLookupFields(Collections.singletonList("device_tag")); + udfContext.setOutputFields(Collections.singletonList("device_group")); jsonExtract.open(null, udfContext); Event event = new Event(); String jsonString = "{\"device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-tsgx\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgx\\\"}]}\"}"; diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/StringJoinerFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/StringJoinerFunctionTest.java index d80eb97..e9cde8a 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/StringJoinerFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/StringJoinerFunctionTest.java @@ -20,8 +20,8 @@ public class StringJoinerFunctionTest { @BeforeAll public static void setUp() { udfContext = new UDFContext(); - udfContext.setLookup_fields(Arrays.asList("server_ip", "client_ip")); - udfContext.setOutput_fields(Collections.singletonList("ip_string")); + udfContext.setLookupFields(Arrays.asList("server_ip", "client_ip")); + udfContext.setOutputFields(Collections.singletonList("ip_string")); Map params = new HashMap<>(); params.put("separator",","); params.put("prefix","["); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java index ef79d51..534569b 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java @@ -26,8 +26,8 @@ public class UUIDTest { UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setLookup_fields(List.of("client_ip","server_ip")); - udfContext.setOutput_fields(Collections.singletonList("uuid")); + udfContext.setLookupFields(List.of("client_ip","server_ip")); + udfContext.setOutputFields(Collections.singletonList("uuid")); parameters.put("namespace","NAMESPACE_IP_1"); Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { uuidv5.open(null, udfContext); @@ -41,7 +41,7 @@ public class UUIDTest { UUID uuid = new UUID(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setOutput_fields(Collections.singletonList("uuid")); + udfContext.setOutputFields(Collections.singletonList("uuid")); uuid.open(null, udfContext); Event event = new Event(); Map extractedFields = new HashMap<>(); @@ -55,7 +55,7 @@ public class UUIDTest { UUIDv7 uuid = new UUIDv7(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setOutput_fields(Collections.singletonList("uuid")); + udfContext.setOutputFields(Collections.singletonList("uuid")); uuid.open(null, udfContext); Event event = new Event(); Map extractedFields = new HashMap<>(); @@ -69,8 +69,8 @@ public class UUIDTest { UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setLookup_fields(List.of("client_ip", "server_ip")); - udfContext.setOutput_fields(Collections.singletonList("uuid")); + udfContext.setLookupFields(List.of("client_ip", "server_ip")); + udfContext.setOutputFields(Collections.singletonList("uuid")); parameters.put("namespace","NAMESPACE_IP"); uuidv5.open(null, udfContext); Event event = new Event(); @@ -90,8 +90,8 @@ public class UUIDTest { UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setLookup_fields(List.of("domain")); - udfContext.setOutput_fields(Collections.singletonList("uuid")); + udfContext.setLookupFields(List.of("domain")); + udfContext.setOutputFields(Collections.singletonList("uuid")); parameters.put("namespace","NAMESPACE_DOMAIN"); uuidv5.open(null, udfContext); Event event = new Event(); @@ -107,8 +107,8 @@ public class UUIDTest { UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setLookup_fields(List.of("app")); - udfContext.setOutput_fields(Collections.singletonList("uuid")); + udfContext.setLookupFields(List.of("app")); + udfContext.setOutputFields(Collections.singletonList("uuid")); parameters.put("namespace","NAMESPACE_APP"); uuidv5.open(null, udfContext); Event event = new Event(); @@ -125,8 +125,8 @@ public class UUIDTest { UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setLookup_fields(List.of("subscriber_id")); - udfContext.setOutput_fields(Collections.singletonList("uuid")); + udfContext.setLookupFields(List.of("subscriber_id")); + udfContext.setOutputFields(Collections.singletonList("uuid")); parameters.put("namespace","NAMESPACE_SUBSCRIBER"); uuidv5.open(null, udfContext); Event event = new Event(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java index 12a2093..a0d70d7 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java @@ -19,8 +19,8 @@ public class UnixTimestampConverterTest { @BeforeAll public static void setUp() { udfContext = new UDFContext(); - udfContext.setLookup_fields(Arrays.asList("input")); - udfContext.setOutput_fields(Arrays.asList("output")); + udfContext.setLookupFields(Arrays.asList("input")); + udfContext.setOutputFields(Arrays.asList("output")); } @Test diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java index 02f0b66..3749eb1 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java @@ -52,8 +52,8 @@ public class JsonUnrollFunctionTest { JsonUnroll unroll = new JsonUnroll(); Event event = new Event(); event.setExtractedFields(nestedMap); - udfContext.setLookup_fields(List.of("k1")); - udfContext.setOutput_fields(List.of("newk1")); + udfContext.setLookupFields(List.of("k1")); + udfContext.setOutputFields(List.of("newk1")); unroll.open(null, udfContext); List result3 = unroll.evaluate(event); assertEquals(2, result3.size()); @@ -75,8 +75,8 @@ public class JsonUnrollFunctionTest { udfContext.setParameters(params); params.put("path", "$.k3_1.k3_1_1"); event.setExtractedFields(nestedMap); - udfContext.setLookup_fields(List.of("k3")); - udfContext.setOutput_fields(List.of("newk3")); + udfContext.setLookupFields(List.of("k3")); + udfContext.setOutputFields(List.of("newk3")); unroll.open(null, udfContext); List result2 = unroll.evaluate(event); assertEquals(2, result2.size()); @@ -95,8 +95,8 @@ public class JsonUnrollFunctionTest { udfContext.setParameters(params); params.put("path", "$.k4_1.k4_1_1"); event.setExtractedFields(nestedMap); - udfContext.setLookup_fields(List.of("k4")); - udfContext.setOutput_fields(List.of("newk4")); + udfContext.setLookupFields(List.of("k4")); + udfContext.setOutputFields(List.of("newk4")); unroll.open(null, udfContext); List result2 = unroll.evaluate(event); assertEquals(1, result2.size()); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java index 2f4da76..db66e55 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java @@ -34,8 +34,8 @@ public class UnrollFunctionTest { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("k1")); - udfContext.setOutput_fields(List.of("newk1")); + udfContext.setLookupFields(List.of("k1")); + udfContext.setOutputFields(List.of("newk1")); Unroll unroll = new Unroll(); unroll.open(null, udfContext); Event event = new Event(); @@ -60,8 +60,8 @@ public class UnrollFunctionTest { udfContext.setParameters(params); udfContext.setParameters(params); event.setExtractedFields(nestedMap); - udfContext.setLookup_fields(List.of("k2")); - udfContext.setOutput_fields(List.of("k2")); + udfContext.setLookupFields(List.of("k2")); + udfContext.setOutputFields(List.of("k2")); unroll.open(null, udfContext); List result2 = unroll.evaluate(event); assertEquals(3, result2.size()); @@ -76,8 +76,8 @@ public class UnrollFunctionTest { Unroll unroll = new Unroll(); Event event = new Event(); event.setExtractedFields(nestedMap); - udfContext.setLookup_fields(List.of("k3")); - udfContext.setOutput_fields(List.of("newk3")); + udfContext.setLookupFields(List.of("k3")); + udfContext.setOutputFields(List.of("newk3")); unroll.open(null, udfContext); List result2 = unroll.evaluate(event); assertEquals(1, result2.size()); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java index 33f7bad..990186d 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java @@ -1,89 +1,89 @@ -package com.geedgenetworks.core.udf.udaf.HdrHistogram; - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.AggregateFunction; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.sketch.util.StringUtils; -import org.HdrHistogram.ArrayHistogram; -import org.junit.jupiter.api.Test; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.*; - -public class HdrHistogramQuantileTest { - AggregateFunction agg; - Accumulator acc; - Event event; - - @Test - public void inputRegular() { - double probability = 0.5; - initData( "regular", 2, probability); - long count = 100000; - Map fields = event.getExtractedFields(); - for (int i = 1; i <= count; i++) { - fields.put("ms", i); - agg.add(event, acc); - } - - long expect = (long) (count * probability); - long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his"); - double error = Math.abs(rst - expect) / (double) expect; - System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); - assertTrue(error <= 0.05); - } - - @Test - public void inputSketch() { - double probability = 0.5; - initData( "sketch", 2, probability); - long count = 100000; - Map fields = event.getExtractedFields(); - - ArrayHistogram his = new ArrayHistogram(2); - for (int i = 1; i <= count; i++) { - his.recordValue(i); - } - fields.put("ms", StringUtils.encodeBase64String(his.toBytes())); - agg.add(event, acc); - - his = new ArrayHistogram(2); - for (int i = 1; i <= count; i++) { - his.recordValue(i); - } - fields.put("ms", his.toBytes()); - agg.add(event, acc); - - long expect = (long) (count * probability); - long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his"); - double error = Math.abs(rst - expect) / (double) expect; - System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); - assertTrue(error <= 0.05); - } - - private void initData(String input_type, int numberOfSignificantValueDigits, double probability){ - agg = new HdrHistogramQuantile(); - UDFContext c = new UDFContext(); - Map parameters = new HashMap<>(); - parameters.put("input_type", input_type); - parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits); - parameters.put("probability", probability); - c.setParameters(parameters); - c.setLookup_fields(Collections.singletonList("ms")); - c.setOutput_fields(Collections.singletonList("ms_his")); - - agg.open(c); - Map map = new HashMap<>(); - acc = new Accumulator(); - acc.setMetricsFields(map); - agg.initAccumulator(acc); - - event = new Event(); - Map fields = new HashMap<>(); - event.setExtractedFields(fields); - } +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.ArrayHistogram; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +public class HdrHistogramQuantileTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + @Test + public void inputRegular() { + double probability = 0.5; + initData( "regular", 2, probability); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 1; i <= count; i++) { + fields.put("ms", i); + agg.add(event, acc); + } + + long expect = (long) (count * probability); + long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his"); + double error = Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + + @Test + public void inputSketch() { + double probability = 0.5; + initData( "sketch", 2, probability); + long count = 100000; + Map fields = event.getExtractedFields(); + + ArrayHistogram his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", StringUtils.encodeBase64String(his.toBytes())); + agg.add(event, acc); + + his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", his.toBytes()); + agg.add(event, acc); + + long expect = (long) (count * probability); + long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his"); + double error = Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + + private void initData(String input_type, int numberOfSignificantValueDigits, double probability){ + agg = new HdrHistogramQuantile(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("input_type", input_type); + parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits); + parameters.put("probability", probability); + c.setParameters(parameters); + c.setLookupFields(Collections.singletonList("ms")); + c.setOutputFields(Collections.singletonList("ms_his")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } } \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java index 4eefd9a..a57645d 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java @@ -1,98 +1,98 @@ -package com.geedgenetworks.core.udf.udaf.HdrHistogram; - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.AggregateFunction; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.sketch.util.StringUtils; -import org.HdrHistogram.ArrayHistogram; -import org.junit.jupiter.api.Test; - -import java.util.*; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class HdrHistogramQuantilesTest { - AggregateFunction agg; - Accumulator acc; - Event event; - - @Test - public void inputRegular() { - double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; - initData( "regular", 2, probabilities); - long count = 100000; - Map fields = event.getExtractedFields(); - for (int i = 1; i <= count; i++) { - fields.put("ms", i); - agg.add(event, acc); - } - - long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); - - List rsts = (List)agg.getResult(acc).getMetricsFields().get("ms_his"); - for (int i = 0; i < expects.length; i++) { - long rst = rsts.get(i); - long expect = expects[i]; - double probability = probabilities[i]; - double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; - System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); - assertTrue(error <= 0.05); - } - } - - @Test - public void inputSketch() { - double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; - initData( "sketch", 2, probabilities); - long count = 100000; - Map fields = event.getExtractedFields(); - long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); - - ArrayHistogram his = new ArrayHistogram(2); - for (int i = 1; i <= count; i++) { - his.recordValue(i); - } - fields.put("ms", StringUtils.encodeBase64String(his.toBytes())); - agg.add(event, acc); - - his = new ArrayHistogram(2); - for (int i = 1; i <= count; i++) { - his.recordValue(i); - } - fields.put("ms", his.toBytes()); - agg.add(event, acc); - - List rsts = (List)agg.getResult(acc).getMetricsFields().get("ms_his"); - for (int i = 0; i < expects.length; i++) { - long rst = rsts.get(i); - long expect = expects[i]; - double probability = probabilities[i]; - double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; - System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); - assertTrue(error <= 0.05); - } - } - - private void initData(String input_type, int numberOfSignificantValueDigits, double[] probabilities){ - agg = new HdrHistogramQuantiles(); - UDFContext c = new UDFContext(); - Map parameters = new HashMap<>(); - parameters.put("input_type", input_type); - parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits); - parameters.put("probabilities", probabilities); - c.setParameters(parameters); - c.setLookup_fields(Collections.singletonList("ms")); - c.setOutput_fields(Collections.singletonList("ms_his")); - - agg.open(c); - Map map = new HashMap<>(); - acc = new Accumulator(); - acc.setMetricsFields(map); - agg.initAccumulator(acc); - - event = new Event(); - Map fields = new HashMap<>(); - event.setExtractedFields(fields); - } +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.ArrayHistogram; +import org.junit.jupiter.api.Test; + +import java.util.*; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HdrHistogramQuantilesTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + @Test + public void inputRegular() { + double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; + initData( "regular", 2, probabilities); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 1; i <= count; i++) { + fields.put("ms", i); + agg.add(event, acc); + } + + long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); + + List rsts = (List)agg.getResult(acc).getMetricsFields().get("ms_his"); + for (int i = 0; i < expects.length; i++) { + long rst = rsts.get(i); + long expect = expects[i]; + double probability = probabilities[i]; + double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + } + + @Test + public void inputSketch() { + double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; + initData( "sketch", 2, probabilities); + long count = 100000; + Map fields = event.getExtractedFields(); + long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); + + ArrayHistogram his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", StringUtils.encodeBase64String(his.toBytes())); + agg.add(event, acc); + + his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", his.toBytes()); + agg.add(event, acc); + + List rsts = (List)agg.getResult(acc).getMetricsFields().get("ms_his"); + for (int i = 0; i < expects.length; i++) { + long rst = rsts.get(i); + long expect = expects[i]; + double probability = probabilities[i]; + double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + } + + private void initData(String input_type, int numberOfSignificantValueDigits, double[] probabilities){ + agg = new HdrHistogramQuantiles(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("input_type", input_type); + parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits); + parameters.put("probabilities", probabilities); + c.setParameters(parameters); + c.setLookupFields(Collections.singletonList("ms")); + c.setOutputFields(Collections.singletonList("ms_his")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } } \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java index f177ca5..5905138 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java @@ -1,102 +1,102 @@ -package com.geedgenetworks.core.udf.udaf.HdrHistogram; - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.AggregateFunction; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.sketch.util.StringUtils; -import org.HdrHistogram.ArrayHistogram; -import org.junit.jupiter.api.Test; - -import java.nio.charset.StandardCharsets; -import java.util.*; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class HdrHistogramTest { - AggregateFunction agg; - Accumulator acc; - Event event; - - @Test - public void inputRegular() { - double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; - initData( "regular", 2, "base64"); - long count = 100000; - Map fields = event.getExtractedFields(); - for (int i = 1; i <= count; i++) { - fields.put("ms", i); - agg.add(event, acc); - } - - long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); - String str = (String) agg.getResult(acc).getMetricsFields().get("ms_his"); - ArrayHistogram his = ArrayHistogram.fromBytes(StringUtils.decodeBase64(str.getBytes(StandardCharsets.UTF_8))); - - for (int i = 0; i < expects.length; i++) { - long rst = his.getValueAtPercentile(probabilities[i] * 100); - long expect = expects[i]; - double probability = probabilities[i]; - double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; - System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); - assertTrue(error <= 0.05); - } - } - - @Test - public void inputSketch() { - double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; - initData( "sketch", 2, "binary"); - long count = 100000; - Map fields = event.getExtractedFields(); - long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); - - ArrayHistogram his = new ArrayHistogram(2); - for (int i = 1; i <= count; i++) { - his.recordValue(i); - } - fields.put("ms", StringUtils.encodeBase64String(his.toBytes())); - agg.add(event, acc); - - his = new ArrayHistogram(2); - for (int i = 1; i <= count; i++) { - his.recordValue(i); - } - fields.put("ms", his.toBytes()); - agg.add(event, acc); - - byte[] bytes = (byte[]) agg.getResult(acc).getMetricsFields().get("ms_his"); - ArrayHistogram h = ArrayHistogram.fromBytes(bytes); - - for (int i = 0; i < expects.length; i++) { - long rst = h.getValueAtPercentile(probabilities[i] * 100); - long expect = expects[i]; - double probability = probabilities[i]; - double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; - System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); - assertTrue(error <= 0.05); - } - } - - private void initData(String input_type, int numberOfSignificantValueDigits, String output_format){ - agg = new HdrHistogram(); - UDFContext c = new UDFContext(); - Map parameters = new HashMap<>(); - parameters.put("input_type", input_type); - parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits); - parameters.put("output_format", output_format); - c.setParameters(parameters); - c.setLookup_fields(Collections.singletonList("ms")); - c.setOutput_fields(Collections.singletonList("ms_his")); - - agg.open(c); - Map map = new HashMap<>(); - acc = new Accumulator(); - acc.setMetricsFields(map); - agg.initAccumulator(acc); - - event = new Event(); - Map fields = new HashMap<>(); - event.setExtractedFields(fields); - } +package com.geedgenetworks.core.udf.udaf.HdrHistogram; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.util.StringUtils; +import org.HdrHistogram.ArrayHistogram; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HdrHistogramTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + @Test + public void inputRegular() { + double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; + initData( "regular", 2, "base64"); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 1; i <= count; i++) { + fields.put("ms", i); + agg.add(event, acc); + } + + long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); + String str = (String) agg.getResult(acc).getMetricsFields().get("ms_his"); + ArrayHistogram his = ArrayHistogram.fromBytes(StringUtils.decodeBase64(str.getBytes(StandardCharsets.UTF_8))); + + for (int i = 0; i < expects.length; i++) { + long rst = his.getValueAtPercentile(probabilities[i] * 100); + long expect = expects[i]; + double probability = probabilities[i]; + double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + } + + @Test + public void inputSketch() { + double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1}; + initData( "sketch", 2, "binary"); + long count = 100000; + Map fields = event.getExtractedFields(); + long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray(); + + ArrayHistogram his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", StringUtils.encodeBase64String(his.toBytes())); + agg.add(event, acc); + + his = new ArrayHistogram(2); + for (int i = 1; i <= count; i++) { + his.recordValue(i); + } + fields.put("ms", his.toBytes()); + agg.add(event, acc); + + byte[] bytes = (byte[]) agg.getResult(acc).getMetricsFields().get("ms_his"); + ArrayHistogram h = ArrayHistogram.fromBytes(bytes); + + for (int i = 0; i < expects.length; i++) { + long rst = h.getValueAtPercentile(probabilities[i] * 100); + long expect = expects[i]; + double probability = probabilities[i]; + double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect; + System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error)); + assertTrue(error <= 0.05); + } + } + + private void initData(String input_type, int numberOfSignificantValueDigits, String output_format){ + agg = new HdrHistogram(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("input_type", input_type); + parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits); + parameters.put("output_format", output_format); + c.setParameters(parameters); + c.setLookupFields(Collections.singletonList("ms")); + c.setOutputFields(Collections.singletonList("ms_his")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } } \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java index eae356d..b80d782 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java @@ -1,87 +1,87 @@ -package com.geedgenetworks.core.udf.udaf.hlld; - - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.AggregateFunction; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.sketch.hlld.Hll; -import com.geedgenetworks.sketch.util.StringUtils; -import org.junit.jupiter.api.Test; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertTrue; - - -public class HlldApproxCountDistinctTest { - AggregateFunction agg; - Accumulator acc; - Event event; - - - @Test - public void inputRegular() { - initData(14, "regular"); - long count = 100000; - Map fields = event.getExtractedFields(); - for (int i = 0; i < count; i++) { - fields.put("ip", i); - agg.add(event, acc); - } - - long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt"); - double error = Math.abs(rst - count) / (double) count; - System.out.println(String.format("%d,%d,%.4f", count , rst , error)); - assertTrue(error <= 0.05); - } - - @Test - public void inputSketch() { - initData(14, "sketch"); - long count = 150000; - Map fields = event.getExtractedFields(); - - Hll hll = new Hll(12); - for (int i = 0; i < 100000; i++) { - hll.add(i); - } - fields.put("ip", StringUtils.encodeBase64String(hll.toBytes())); - agg.add(event, acc); - - hll = new Hll(13); - for (int i = 50000; i < 150000; i++) { - hll.add(i); - } - fields.put("ip", hll.toBytes()); - agg.add(event, acc); - - long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt"); - double error = Math.abs(rst - count) / (double) count; - System.out.println(String.format("%d,%d,%.4f", count , rst , error)); - assertTrue(error <= 0.05); - } - - private void initData(int precision, String input_type){ - agg = new HlldApproxCountDistinct(); - UDFContext c = new UDFContext(); - Map parameters = new HashMap<>(); - parameters.put("precision", precision); - parameters.put("input_type", input_type); - c.setParameters(parameters); - c.setLookup_fields(Collections.singletonList("ip")); - c.setOutput_fields(Collections.singletonList("ip_cnt")); - - agg.open(c); - Map map = new HashMap<>(); - acc = new Accumulator(); - acc.setMetricsFields(map); - agg.initAccumulator(acc); - - event = new Event(); - Map fields = new HashMap<>(); - event.setExtractedFields(fields); - } +package com.geedgenetworks.core.udf.udaf.hlld; + + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.hlld.Hll; +import com.geedgenetworks.sketch.util.StringUtils; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class HlldApproxCountDistinctTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + + @Test + public void inputRegular() { + initData(14, "regular"); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 0; i < count; i++) { + fields.put("ip", i); + agg.add(event, acc); + } + + long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt"); + double error = Math.abs(rst - count) / (double) count; + System.out.println(String.format("%d,%d,%.4f", count , rst , error)); + assertTrue(error <= 0.05); + } + + @Test + public void inputSketch() { + initData(14, "sketch"); + long count = 150000; + Map fields = event.getExtractedFields(); + + Hll hll = new Hll(12); + for (int i = 0; i < 100000; i++) { + hll.add(i); + } + fields.put("ip", StringUtils.encodeBase64String(hll.toBytes())); + agg.add(event, acc); + + hll = new Hll(13); + for (int i = 50000; i < 150000; i++) { + hll.add(i); + } + fields.put("ip", hll.toBytes()); + agg.add(event, acc); + + long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt"); + double error = Math.abs(rst - count) / (double) count; + System.out.println(String.format("%d,%d,%.4f", count , rst , error)); + assertTrue(error <= 0.05); + } + + private void initData(int precision, String input_type){ + agg = new HlldApproxCountDistinct(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("precision", precision); + parameters.put("input_type", input_type); + c.setParameters(parameters); + c.setLookupFields(Collections.singletonList("ip")); + c.setOutputFields(Collections.singletonList("ip_cnt")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } } \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java index f489ee4..d6ed4c1 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java @@ -1,86 +1,86 @@ -package com.geedgenetworks.core.udf.udaf.hlld; - -import com.geedgenetworks.common.Accumulator; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.AggregateFunction; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.sketch.hlld.Hll; -import com.geedgenetworks.sketch.util.StringUtils; -import org.junit.jupiter.api.Test; - -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class HlldTest { - AggregateFunction agg; - Accumulator acc; - Event event; - - @Test - public void inputRegular() { - initData(14, "regular", "base64"); - long count = 100000; - Map fields = event.getExtractedFields(); - for (int i = 0; i < count; i++) { - fields.put("ip", i); - agg.add(event, acc); - } - - String hllStr = (String)agg.getResult(acc).getMetricsFields().get("ip_cnt"); - long rst = (long) Hll.fromBytes(StringUtils.decodeBase64(hllStr.getBytes(StandardCharsets.UTF_8))).size(); - double error = Math.abs(rst - count) / (double) count; - System.out.println(String.format("%d,%d,%.4f", count , rst , error)); - assertTrue(error <= 0.05); - } - - @Test - public void inputSketch() { - initData(14, "sketch", "binary"); - long count = 150000; - Map fields = event.getExtractedFields(); - for (int i = 0; i < 100000; i++) { - Hll hll = new Hll(12); - hll.add(i); - fields.put("ip", StringUtils.encodeBase64String(hll.toBytes())); - agg.add(event, acc); - } - for (int i = 50000; i < 150000; i++) { - Hll hll = new Hll(13); - hll.add(i); - fields.put("ip", hll.toBytes()); - agg.add(event, acc); - } - - byte[] hllBytes = (byte[])agg.getResult(acc).getMetricsFields().get("ip_cnt"); - long rst = (long) Hll.fromBytes(hllBytes).size(); - double error = Math.abs(rst - count) / (double) count; - System.out.println(String.format("%d,%d,%.4f", count , rst , error)); - assertTrue(error <= 0.05); - } - - private void initData(int precision, String input_type, String output_format){ - agg = new Hlld(); - UDFContext c = new UDFContext(); - Map parameters = new HashMap<>(); - parameters.put("precision", precision); - parameters.put("input_type", input_type); - parameters.put("output_format", output_format); - c.setParameters(parameters); - c.setLookup_fields(Collections.singletonList("ip")); - c.setOutput_fields(Collections.singletonList("ip_cnt")); - - agg.open(c); - Map map = new HashMap<>(); - acc = new Accumulator(); - acc.setMetricsFields(map); - agg.initAccumulator(acc); - - event = new Event(); - Map fields = new HashMap<>(); - event.setExtractedFields(fields); - } -} +package com.geedgenetworks.core.udf.udaf.hlld; + +import com.geedgenetworks.common.Accumulator; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.AggregateFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.sketch.hlld.Hll; +import com.geedgenetworks.sketch.util.StringUtils; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HlldTest { + AggregateFunction agg; + Accumulator acc; + Event event; + + @Test + public void inputRegular() { + initData(14, "regular", "base64"); + long count = 100000; + Map fields = event.getExtractedFields(); + for (int i = 0; i < count; i++) { + fields.put("ip", i); + agg.add(event, acc); + } + + String hllStr = (String)agg.getResult(acc).getMetricsFields().get("ip_cnt"); + long rst = (long) Hll.fromBytes(StringUtils.decodeBase64(hllStr.getBytes(StandardCharsets.UTF_8))).size(); + double error = Math.abs(rst - count) / (double) count; + System.out.println(String.format("%d,%d,%.4f", count , rst , error)); + assertTrue(error <= 0.05); + } + + @Test + public void inputSketch() { + initData(14, "sketch", "binary"); + long count = 150000; + Map fields = event.getExtractedFields(); + for (int i = 0; i < 100000; i++) { + Hll hll = new Hll(12); + hll.add(i); + fields.put("ip", StringUtils.encodeBase64String(hll.toBytes())); + agg.add(event, acc); + } + for (int i = 50000; i < 150000; i++) { + Hll hll = new Hll(13); + hll.add(i); + fields.put("ip", hll.toBytes()); + agg.add(event, acc); + } + + byte[] hllBytes = (byte[])agg.getResult(acc).getMetricsFields().get("ip_cnt"); + long rst = (long) Hll.fromBytes(hllBytes).size(); + double error = Math.abs(rst - count) / (double) count; + System.out.println(String.format("%d,%d,%.4f", count , rst , error)); + assertTrue(error <= 0.05); + } + + private void initData(int precision, String input_type, String output_format){ + agg = new Hlld(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("precision", precision); + parameters.put("input_type", input_type); + parameters.put("output_format", output_format); + c.setParameters(parameters); + c.setLookupFields(Collections.singletonList("ip")); + c.setOutputFields(Collections.singletonList("ip_cnt")); + + agg.open(c); + Map map = new HashMap<>(); + acc = new Accumulator(); + acc.setMetricsFields(map); + agg.initAccumulator(acc); + + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java index 15f3c10..3320f38 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java @@ -1,109 +1,109 @@ -package com.geedgenetworks.core.udf.udtf; - -import com.alibaba.fastjson2.JSON; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.UDFContext; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static org.junit.jupiter.api.Assertions.*; - - -public class UnrollTest { - PathUnroll pathUnroll; - Event event; - - @Test - public void explodePathWithNoFileField() { - init("path", "out_path", "."); - Map fields = event.getExtractedFields(); - fields.put("path", "ETHERNET.IPv4.TCP.ssl"); - String[] excepted = new String[]{"ETHERNET","ETHERNET.IPv4","ETHERNET.IPv4.TCP","ETHERNET.IPv4.TCP.ssl"}; - String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); - System.out.println(JSON.toJSONString(outPaths)); - assertArrayEquals(outPaths, excepted); - // 忽略结尾的分隔符 - fields.put("path", "ETHERNET.IPv4.TCP.ssl."); - outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); - System.out.println(JSON.toJSONString(outPaths)); - assertArrayEquals(outPaths, excepted); - // 空路径不输出 - fields.put("path", ""); - outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); - System.out.println(JSON.toJSONString(outPaths)); - assertTrue(outPaths.length == 0); - - init("path", "out_path", "/"); - fields = event.getExtractedFields(); - fields.put("path", "ETHERNET/IPv4/TCP/ssl"); - excepted = new String[]{"ETHERNET","ETHERNET/IPv4","ETHERNET/IPv4/TCP","ETHERNET/IPv4/TCP/ssl"}; - outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); - System.out.println(JSON.toJSONString(outPaths)); - assertArrayEquals(outPaths, excepted); - // 忽略结尾的分隔符 - fields.put("path", "ETHERNET/IPv4/TCP/ssl/"); - outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); - System.out.println(JSON.toJSONString(outPaths)); - assertArrayEquals(outPaths, excepted); - } - - @Test - public void explodePathWithFileField() { - init("path", "file", "out_path", "out_file", "."); - Map fields = event.getExtractedFields(); - fields.put("path", "ETHERNET.IPv4.TCP.ssl"); - fields.put("file", "ssl"); - String[] excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl"}; - String[] exceptedFile = new String[]{null, null, null, "ssl"}; - String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new); - String[] outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new); - System.out.println(JSON.toJSONString(outPaths)); - System.out.println(JSON.toJSONString(outFiles)); - assertArrayEquals(outPaths, excepted); - assertArrayEquals(outFiles, exceptedFile); - // 忽略结尾的分隔符 - fields.put("path", "ETHERNET.IPv4.TCP.ssl."); - fields.put("file", "ssl"); - outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new); - outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new); - System.out.println(JSON.toJSONString(outPaths)); - System.out.println(JSON.toJSONString(outFiles)); - assertArrayEquals(outPaths, excepted); - assertArrayEquals(outFiles, exceptedFile); - - fields.put("path", "ETHERNET.IPv4.TCP.ssl"); - fields.put("file", "ssl.aa"); - excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl", "ETHERNET.IPv4.TCP.ssl.ssl.aa"}; - exceptedFile = new String[]{null, null, null, null,"ssl.aa"}; - outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new); - outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new); - System.out.println(JSON.toJSONString(outPaths)); - System.out.println(JSON.toJSONString(outFiles)); - assertArrayEquals(outPaths, excepted); - assertArrayEquals(outFiles, exceptedFile); - } - - private void init(String pathField, String outputPathField, String separator){ - init(pathField, null, outputPathField, null, separator); - } - - private void init(String pathField, String fileField, String outputPathField, String outputFileField, String separator){ - pathUnroll = new PathUnroll(); - UDFContext c = new UDFContext(); - Map parameters = new HashMap<>(); - parameters.put("separator", separator); - c.setParameters(parameters); - c.setLookup_fields(Arrays.asList(pathField, fileField).stream().filter(x -> x != null).collect(Collectors.toList())); - c.setOutput_fields(Arrays.asList(outputPathField, outputFileField).stream().filter(x -> x != null).collect(Collectors.toList())); - - pathUnroll.open(null, c); - event = new Event(); - Map fields = new HashMap<>(); - event.setExtractedFields(fields); - } +package com.geedgenetworks.core.udf.udtf; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.*; + + +public class UnrollTest { + PathUnroll pathUnroll; + Event event; + + @Test + public void explodePathWithNoFileField() { + init("path", "out_path", "."); + Map fields = event.getExtractedFields(); + fields.put("path", "ETHERNET.IPv4.TCP.ssl"); + String[] excepted = new String[]{"ETHERNET","ETHERNET.IPv4","ETHERNET.IPv4.TCP","ETHERNET.IPv4.TCP.ssl"}; + String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertArrayEquals(outPaths, excepted); + // 忽略结尾的分隔符 + fields.put("path", "ETHERNET.IPv4.TCP.ssl."); + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertArrayEquals(outPaths, excepted); + // 空路径不输出 + fields.put("path", ""); + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertTrue(outPaths.length == 0); + + init("path", "out_path", "/"); + fields = event.getExtractedFields(); + fields.put("path", "ETHERNET/IPv4/TCP/ssl"); + excepted = new String[]{"ETHERNET","ETHERNET/IPv4","ETHERNET/IPv4/TCP","ETHERNET/IPv4/TCP/ssl"}; + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertArrayEquals(outPaths, excepted); + // 忽略结尾的分隔符 + fields.put("path", "ETHERNET/IPv4/TCP/ssl/"); + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertArrayEquals(outPaths, excepted); + } + + @Test + public void explodePathWithFileField() { + init("path", "file", "out_path", "out_file", "."); + Map fields = event.getExtractedFields(); + fields.put("path", "ETHERNET.IPv4.TCP.ssl"); + fields.put("file", "ssl"); + String[] excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl"}; + String[] exceptedFile = new String[]{null, null, null, "ssl"}; + String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new); + String[] outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + System.out.println(JSON.toJSONString(outFiles)); + assertArrayEquals(outPaths, excepted); + assertArrayEquals(outFiles, exceptedFile); + // 忽略结尾的分隔符 + fields.put("path", "ETHERNET.IPv4.TCP.ssl."); + fields.put("file", "ssl"); + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new); + outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + System.out.println(JSON.toJSONString(outFiles)); + assertArrayEquals(outPaths, excepted); + assertArrayEquals(outFiles, exceptedFile); + + fields.put("path", "ETHERNET.IPv4.TCP.ssl"); + fields.put("file", "ssl.aa"); + excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl", "ETHERNET.IPv4.TCP.ssl.ssl.aa"}; + exceptedFile = new String[]{null, null, null, null,"ssl.aa"}; + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new); + outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + System.out.println(JSON.toJSONString(outFiles)); + assertArrayEquals(outPaths, excepted); + assertArrayEquals(outFiles, exceptedFile); + } + + private void init(String pathField, String outputPathField, String separator){ + init(pathField, null, outputPathField, null, separator); + } + + private void init(String pathField, String fileField, String outputPathField, String outputFileField, String separator){ + pathUnroll = new PathUnroll(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("separator", separator); + c.setParameters(parameters); + c.setLookupFields(Arrays.asList(pathField, fileField).stream().filter(x -> x != null).collect(Collectors.toList())); + c.setOutputFields(Arrays.asList(outputPathField, outputFileField).stream().filter(x -> x != null).collect(Collectors.toList())); + + pathUnroll.open(null, c); + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } } \ No newline at end of file diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml index fd5c035..77afab8 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml @@ -23,6 +23,8 @@ sources: type: string - name: device_tag type: string + - name: http_host + type: string properties: data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' format: json @@ -49,6 +51,13 @@ processing_pipelines: lookup_fields: [] output_fields: [] filter: event.client_ip == '192.168.10.100' + + - function: DOMAIN + lookup_fields: [ http_host, ssl_sni, quic_sni ] + output_fields: [ server_domain ] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + - function: SNOWFLAKE_ID lookup_fields: [] output_fields: [log_id] @@ -124,6 +133,7 @@ application: env: name: example-inline-to-print parallelism: 3 + kms.type: local pipeline: object-reuse: true topology: diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml index f724a36..2908ffb 100644 --- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml +++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml @@ -70,9 +70,10 @@ processing_pipelines: ORGANIZATION: server_organization - function : BASE64_ENCODE_TO_STRING + lookup_fields: [ mail_subject ] output_fields: [ mail_subject_base64 ] parameters: - value_field: mail_subject + input_type: string - function: BASE64_DECODE_TO_STRING output_fields: [ mail_attachment_name ] @@ -196,6 +197,7 @@ application: parallelism: 1 pipeline: object-reuse: true + properties: hos.path: http://192.168.44.12:9098/hos hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket -- cgit v1.2.3 From ac04f1d8735fb500c11aa87239c9c8c23e5af41a Mon Sep 17 00:00:00 2001 From: doufenghu Date: Wed, 30 Oct 2024 20:15:51 +0800 Subject: [Improve][core] Enhance the drop, AsnLookup, and GeoIPLookup UDF context configuration checks by using a common validation utility. --- .../common/config/CheckUDFContextUtil.java | 59 +++++++--- .../common/config/UDFContextConfigOptions.java | 18 +++ .../common/exception/CommonErrorCode.java | 13 ++- .../com/geedgenetworks/core/udf/AsnLookup.java | 59 ++++++---- .../java/com/geedgenetworks/core/udf/Domain.java | 7 +- .../java/com/geedgenetworks/core/udf/Drop.java | 17 +++ .../com/geedgenetworks/core/udf/GeoIpLookup.java | 125 +++++++++++++-------- .../resources/examples/inline_to_print_test.yaml | 15 +++ 8 files changed, 220 insertions(+), 93 deletions(-) diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java index 80350f5..f1170be 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java @@ -11,12 +11,12 @@ public final class CheckUDFContextUtil { // Check if all the params are present in the UDFContext public static CheckResult checkAllExists(UDFContext context, String... params) { - List missingParams = Arrays.stream(params) - .filter(param -> !isValidParam(context, param)) + List invalidParams = Arrays.stream(params) + .filter(param -> isInvalidParam(context, param)) .collect(Collectors.toList()); - if (!missingParams.isEmpty()) { - String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingParams)); + if (!invalidParams.isEmpty()) { + String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", invalidParams)); return CheckResult.error(errorMsg); } return CheckResult.success(); @@ -28,19 +28,21 @@ public final class CheckUDFContextUtil { return CheckResult.success(); } - List missingParams = Arrays.stream(params) - .filter(param -> !isValidParam(context, param)) + List invalidParams = Arrays.stream(params) + .filter(param -> isInvalidParam(context, param)) .collect(Collectors.toList()); - if (missingParams.size() == params.length) { - String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", missingParams)); + if (invalidParams.size() == params.length) { + String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", invalidParams)); return CheckResult.error(errorMsg); } return CheckResult.success(); } + + // Check Array/Map Object has only one item - public static CheckResult checkSingleItemExists (UDFContext context, String param) { + public static CheckResult checkCollectionSingleItemExists (UDFContext context, String param) { if (context == null) { return CheckResult.error("UDFContext is null"); } @@ -57,25 +59,46 @@ public final class CheckUDFContextUtil { } - public static boolean isValidParam(UDFContext context, String param) { + // Check Parameters contains keys + public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) { + if (context == null) { + return CheckResult.error("UDFContext is null"); + } + + if (context.getParameters() == null) { + return CheckResult.error("Parameters is null"); + } + + List missingKeys = Arrays.stream(keys) + .filter(key -> !context.getParameters().containsKey(key)) + .collect(Collectors.toList()); + + if (!missingKeys.isEmpty()) { + String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingKeys)); + return CheckResult.error(errorMsg); + } + return CheckResult.success(); + } + + public static boolean isInvalidParam(UDFContext context, String param) { if (context == null) { - return false; + return true; } if (UDFContextConfigOptions.NAME.key().equals(param)) { - return context.getName() != null; + return context.getName() == null; } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { - return context.getLookupFields() != null && !context.getLookupFields().isEmpty(); + return context.getLookupFields() == null || context.getLookupFields().isEmpty(); } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { - return context.getOutputFields() != null && !context.getOutputFields().isEmpty(); + return context.getOutputFields() == null || context.getOutputFields().isEmpty(); } else if (UDFContextConfigOptions.FILTER.key().equals(param)) { - return context.getFilter() != null; + return context.getFilter() == null; } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { - return context.getParameters() != null && !context.getParameters().isEmpty(); + return context.getParameters() == null || context.getParameters().isEmpty(); } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) { - return context.getFunction() != null; + return context.getFunction() == null; } else { - return false; + return true; } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java index 85bab48..ac36b02 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java @@ -29,8 +29,26 @@ public interface UDFContextConfigOptions { .noDefaultValue() .withDescription("The parameters for the function."); + Option PARAMETERS_KB_NAME = Options.key("kb_name") + .stringType() + .noDefaultValue() + .withDescription("The name of the knowledge base."); + + Option PARAMETERS_OPTION = Options.key("option") + .stringType() + .noDefaultValue() + .withDescription("The option for the function."); + + Option> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping") + .mapType() + .noDefaultValue() + .withDescription("The geolocation field mapping."); + + Option FUNCTION = Options.key("function") .stringType() .noDefaultValue() .withDescription("The function to be executed."); + + } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java index e4d9f59..5298810 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java @@ -2,12 +2,13 @@ package com.geedgenetworks.common.exception; public enum CommonErrorCode implements GrootStreamErrorCodeSupplier { - UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation exception"), - ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument exception"), - SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax Error"), - FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed, such as (read,list,write,move,copy,sync) etc..."), - - CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validate failed"), + UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation."), + ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument."), + SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax error."), + FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed (e.g., read, list, write, move, copy, sync)."), + CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validation failed."), + JSON_OPERATION_FAILED( + "GROOT-STREAM-COMMON-0006", "JSON convert/parse operation failed."), ; private final String code; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java index bd1447a..ac282b3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java @@ -2,8 +2,7 @@ package com.geedgenetworks.core.udf; import com.alibaba.fastjson2.JSON; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CommonConfig; -import com.geedgenetworks.common.config.KnowledgeBaseConfig; +import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.ScalarFunction; @@ -24,10 +23,23 @@ public class AsnLookup implements ScalarFunction { private String lookupFieldName; private String outputFieldName; + + enum Option { + IP_TO_ASN; + + public static boolean isValid(String option) { + try { + Option.valueOf(option); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + } @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - checkUdfContext(udfContext); + checkConfig(udfContext); this.kbName = udfContext.getParameters().get("kb_name").toString(); this.option = udfContext.getParameters().get("option").toString(); Configuration configuration = (Configuration) runtimeContext @@ -79,29 +91,38 @@ public class AsnLookup implements ScalarFunction { } } - private void checkUdfContext(UDFContext udfContext) { - - if (udfContext.getLookupFields() == null || udfContext.getOutputFields() == null || udfContext.getParameters() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + @Override + public void checkConfig(UDFContext udfContext) { + CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext, + UDFContextConfigOptions.LOOKUP_FIELDS.key(), + UDFContextConfigOptions.OUTPUT_FIELDS.key(), + UDFContextConfigOptions.PARAMETERS.key()); + + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); } - if (udfContext.getLookupFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if (udfContext.getOutputFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if (!udfContext.getParameters().containsKey("kb_name")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey kb_name"); + result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_KB_NAME.key(), UDFContextConfigOptions.PARAMETERS_OPTION.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if (!udfContext.getParameters().containsKey("option")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option"); - } else { - if (!udfContext.getParameters().get("option").toString().equals("IP_TO_ASN")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); - } + + String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString(); + if (!Option.isValid(optionValue)) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.", + udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); } + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java index a92f211..74816f5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java @@ -95,18 +95,18 @@ public class Domain implements ScalarFunction { throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); } - result = CheckUDFContextUtil.checkSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key()); + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key()); if (!result.isSuccess()) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if(!udfContext.getParameters().containsKey("option")){ + if(!udfContext.getParameters().containsKey(UDFContextConfigOptions.PARAMETERS_OPTION.key())){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option should be specified.", udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); } - String optionValue = udfContext.getParameters().get("option").toString(); + String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString(); if (!Option.isValid(optionValue)) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.", udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); @@ -114,7 +114,6 @@ public class Domain implements ScalarFunction { } - @Override public String functionName() { return "DOMAIN"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java index 93cd0db..c7f13c2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java @@ -1,5 +1,10 @@ package com.geedgenetworks.core.udf; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.CheckUDFContextUtil; +import com.geedgenetworks.common.config.UDFContextConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; @@ -10,6 +15,7 @@ public class Drop implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + checkConfig(udfContext); } @Override @@ -23,6 +29,17 @@ public class Drop implements ScalarFunction { return "DROP"; } + @Override + public void checkConfig(UDFContext udfContext) { + CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext, + UDFContextConfigOptions.FILTER.key()); + + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); + } + + } + @Override public void close() { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java index d73e1f2..e800e5d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java @@ -1,8 +1,7 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CommonConfig; -import com.geedgenetworks.common.config.KnowledgeBaseConfig; +import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.ScalarFunction; @@ -28,13 +27,55 @@ public class GeoIpLookup implements ScalarFunction { private String outputFieldName; private Map geoLocationFieldMapping; + enum Option { + IP_TO_COUNTRY, + IP_TO_PROVINCE, + IP_TO_CITY, + IP_TO_SUBDIVISION_ADDR, + IP_TO_DETAIL, + IP_TO_LATLNG, + IP_TO_PROVIDER, + IP_TO_JSON, + IP_TO_OBJECT + ; + + public static boolean isValid(String option) { + try { + Option.valueOf(option); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + } + + enum GeolocationFieldMapping { + COUNTRY, + PROVINCE, + CITY, + LONGITUDE, + LATITUDE, + ISP, + ORGANIZATION + ; + + public static boolean isValid(String option) { + try { + GeolocationFieldMapping.valueOf(option); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + } + @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - checkUdfContext(udfContext); - this.kbName = udfContext.getParameters().get("kb_name").toString(); - this.option = udfContext.getParameters().get("option").toString(); - if (option.equals("IP_TO_OBJECT")) { - this.geoLocationFieldMapping = (Map) udfContext.getParameters().get("geolocation_field_mapping"); + checkConfig(udfContext); + this.kbName = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_KB_NAME.key()).toString(); + this.option = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString(); + if (option.equals(Option.IP_TO_OBJECT.name())) { + this.geoLocationFieldMapping = (Map) udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key()); } Configuration configuration = (Configuration) runtimeContext .getExecutionConfig().getGlobalJobParameters(); @@ -63,7 +104,7 @@ public class GeoIpLookup implements ScalarFunction { public Event evaluate(Event event) { Object valueObj = event.getExtractedFields().get(lookupFieldName); if (valueObj!=null) { - if ("IP_TO_OBJECT".equals(option)) { + if (Option.IP_TO_OBJECT.name().equals(option)) { LocationResponse response = GeoIpKnowledgeBaseHandler.lookUpObject(kbName,valueObj.toString()); for (Map.Entry entry : geoLocationFieldMapping.entrySet()) { String result = ""; @@ -103,58 +144,50 @@ public class GeoIpLookup implements ScalarFunction { return event; } - private void checkUdfContext(UDFContext udfContext) { + @Override + public void checkConfig(UDFContext udfContext) { + CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext, + UDFContextConfigOptions.LOOKUP_FIELDS.key(), + UDFContextConfigOptions.PARAMETERS.key()); - if (udfContext.getLookupFields() == null || udfContext.getParameters() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); - } - if (udfContext.getLookupFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); - } - if (!udfContext.getParameters().containsKey("kb_name")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey kb_name"); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); } - if (!udfContext.getParameters().containsKey("option")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option"); - } else { - if (!udfContext.getParameters().get("option").toString().equals("IP_TO_COUNTRY") && //IP_TO_COUNTRY - !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVINCE") && //IP_TO_PROVINCE - !udfContext.getParameters().get("option").toString().equals("IP_TO_CITY") && //IP_TO_CITY - !udfContext.getParameters().get("option").toString().equals("IP_TO_SUBDIVISION_ADDR") && //IP_TO_SUBDIVISION_ADDR - !udfContext.getParameters().get("option").toString().equals("IP_TO_DETAIL") && //IP_TO_DETAIL - !udfContext.getParameters().get("option").toString().equals("IP_TO_LATLNG") && //IP_TO_LATLNG - !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVIDER") && //IP_TO_PROVIDER - !udfContext.getParameters().get("option").toString().equals("IP_TO_JSON") && //IP_TO_JSON - !udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); - } - if (udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")) { - if (!udfContext.getParameters().containsKey("geolocation_field_mapping")) { + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); + } - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey geolocation_field_mapping"); + result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_KB_NAME.key(), UDFContextConfigOptions.PARAMETERS_OPTION.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); + } - } else { - Map geolocation_field_mapping = (Map) udfContext.getParameters().get("geolocation_field_mapping"); + String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString(); + if (!Option.isValid(optionValue)) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.", + udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); + } - if (!geolocation_field_mapping.isEmpty()) { + if (optionValue.equals(Option.IP_TO_OBJECT.name())) { + result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); + } - for (Map.Entry entry : geolocation_field_mapping.entrySet()) { + Map fieldMap = (Map) udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key()); - if (!entry.getKey().equals("COUNTRY") && !entry.getKey().equals("PROVINCE") && !entry.getKey().equals("CITY") && !entry.getKey().equals("LONGITUDE") && !entry.getKey().equals("LATITUDE") && !entry.getKey().equals("ISP") && !entry.getKey().equals("ORGANIZATION")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct"); - } - } - } else { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct"); - } + for (Map.Entry entry : fieldMap.entrySet()) { + if (!GeolocationFieldMapping.isValid(entry.getKey())) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct"); } } + } } - @Override public String functionName() { return "GEOIP_LOOKUP"; diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml index 77afab8..047f1ba 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml @@ -92,6 +92,21 @@ processing_pipelines: kb_name: tsg_ip_location option: IP_TO_DETAIL + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ ] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country + PROVINCE: server_super_administrative_area + CITY: server_administrative_area + LONGITUDE: server_longitude + LATITUDE: server_latitude + ISP: server_isp + ORGANIZATION: server_organization + - function: JSON_EXTRACT lookup_fields: [ device_tag ] output_fields: [ device_group ] -- cgit v1.2.3 From 7c44ae14a4a67ffeb53dc68d86399ad16158eec4 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Wed, 30 Oct 2024 20:26:32 +0800 Subject: [Fix][test] add required filter expression --- .../java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java index 294a492..027533e 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java @@ -16,6 +16,7 @@ public class DropFunctionTest { @BeforeAll public static void setUp() { udfContext = new UDFContext(); + udfContext.setFilter("true"); udfContext.setParameters(new HashMap<>()); } -- cgit v1.2.3