summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author窦凤虎 <[email protected]>2024-10-30 12:32:27 +0000
committer窦凤虎 <[email protected]>2024-10-30 12:32:27 +0000
commit7e9a0593fea1f42a0f1c955abbc4d72ea96d34a8 (patch)
tree95629e0b37cefdcd69eb97f15b5ac1935225f37c
parentc938ead4c0c6c51cbb3403ef6e39b0634a78abba (diff)
parent7c44ae14a4a67ffeb53dc68d86399ad16158eec4 (diff)
Merge branch 'improve/udf-config-check' into 'develop'
[Improve][core] Add CheckUDFContextUtil for verifying UDF configurations.... See merge request galaxy/platform/groot-stream!126
-rw-r--r--docs/develop-guide.md4
-rw-r--r--docs/processor/udf.md15
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java14
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java42
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java70
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java107
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java54
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java13
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java15
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java57
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java63
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java6
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java6
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java150
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java131
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java6
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java242
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java252
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java236
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java6
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java6
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AnonymityLookupTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AppCategoryLookupTest.java2
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookupTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookupTest.java2
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookupTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IcpLookupTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IdcRenterLookupTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java24
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IocLookupTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IpZoneLookupTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookupTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookupTest.java16
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/VpnLookupTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java14
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java14
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java6
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java20
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DecodeBase64FunctionTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java27
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java1
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java8
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java2
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/GenerateStringArrayFunctionTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java6
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/StringJoinerFunctionTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java24
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java12
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java12
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java176
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java194
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java202
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java172
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java172
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java216
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml25
-rw-r--r--groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml4
100 files changed, 1700 insertions, 1509 deletions
diff --git a/docs/develop-guide.md b/docs/develop-guide.md
index 75e8803..927d2d3 100644
--- a/docs/develop-guide.md
+++ b/docs/develop-guide.md
@@ -21,7 +21,7 @@ Groot Stream based all stream processing on data records common known as events.
```json
{
"__timestamp": "<Timestamp in UNIX epoch format (milliseconds)>",
- "__input_id": "ID/Name of the source that delivered the event",
+ "__headers": "Map<String, String> headers of the source that delivered the event",
"__window_start_timestamp" : "<Timestamp in UNIX epoch format (milliseconds)>",
"__window_end_timestamp" : "<Timestamp in UNIX epoch format (milliseconds)>",
"key1": "<value1>",
@@ -35,7 +35,7 @@ Groot Stream add internal fields during pipeline processing. A few notes about i
- Treat internal fields as read-only. Modifying them can result in unintended consequences to your data flows.
- Internal fields only exist for the duration of the event processing pipeline. They are not documented under sources or sinks.
- If you do not configure a timestamp for extraction, the Pipeline process assigns the current time (in UNIX epoch format) to the __timestamp field.
-- If you have multiple sources, you can determine which source the event came form by looking at the `__input_id` field. For example, the Kafka source adds the topic name to the `__input_id` field.
+- If you have multiple sources, you can determine the origin of the event by examining the `__headers` field. For example, the Kafka source appends the topic name as the `__input_id` key in the `__headers`.
## How to write a high quality Git commit message
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index 0475192..e480275 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -96,18 +96,19 @@ Base64 encode function is commonly used to encode the binary data to base64 stri
```BASE64_ENCODE_TO_STRING(filter, output_fields[, parameters])```
- filter: optional
-- lookup_fields: not required
+- lookup_fields: required
- output_fields: required
- parameters: required
- - value_field: `<String>` required.
+ - input_type: `<String>` required. Enum: `string`, `byte_array`. The input type of the value field.
Example:
```yaml
- function: BASE64_ENCODE_TO_STRING
+ lookup_fields: [packet]
output_fields: [packet]
parameters:
- value_field: packet
+ input_type: string
```
### Current Unix Timestamp
@@ -141,7 +142,7 @@ Domain function is used to extract the domain from the url.
- parameters: required
- option: `<String>` required. Enum: `TOP_LEVEL_DOMAIN`, `FIRST_SIGNIFICANT_SUBDOMAIN`.
-#### Option
+**Option**
- `TOP_LEVEL_DOMAIN` is used to extract the top level domain from the url. For example, `www.abc.com` will be extracted to `com`.
- `FIRST_SIGNIFICANT_SUBDOMAIN` is used to extract the first significant subdomain from the url. For example, `www.abc.com` will be extracted to `abc.com`.
@@ -283,7 +284,7 @@ From unix timestamp function is used to convert the unix timestamp to date time
- parameters: optional
- precision: `<String>` optional. Default is `seconds`. Enum: `milliseconds`, `seconds`.
-#### Precision
+**Precision**
- `milliseconds` is used to convert the unix timestamp to milliseconds date time string. For example, `1619712000` will be converted to `2021-04-30 00:00:00.000`.
- `seconds` is used to convert the unix timestamp to seconds date time string. For example, `1619712000` will be converted to `2021-04-30 00:00:00`.
@@ -336,7 +337,7 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You
- ISP: `<String>` optional.
- ORGANIZATION: `<String>` optional.
-#### Option
+**Option**
- `IP_TO_COUNTRY` is used to lookup the country or region information by ip address.
- `IP_TO_PROVINCE` is used to lookup the province or state information by ip address.
@@ -348,7 +349,7 @@ GeoIP lookup function is used to lookup the geoip information by ip address. You
- `IP_TO_JSON` is used to lookup the above information by ip address. The result is a json string.
- `IP_TO_OBJECT` is used to lookup the above information by ip address. The result is a `LocationResponse` object.
-#### GeoLocation Field Mapping
+**GeoLocation Field Mapping**
- `COUNTRY` is used to map the country information to the event field.
- `PROVINCE` is used to map the province information to the event field.
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index b45d643..f5b1a5d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -14,7 +14,7 @@ import java.net.URLClassLoader;
import java.util.*;
import java.util.function.BiConsumer;
-public abstract class AbstractExecutor<K, V>
+public abstract class AbstractExecutor<K, V>
implements Executor<DataStream<Event>, JobRuntimeEnvironment> {
protected JobRuntimeEnvironment jobRuntimeEnvironment;
protected final Config operatorConfig;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index 1719059..42a3a11 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -1,22 +1,14 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.AggregateConfig;
import com.geedgenetworks.core.pojo.ProcessorConfig;
-import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.core.pojo.TableConfig;
import com.geedgenetworks.core.processor.Processor;
-import com.geedgenetworks.core.processor.table.TableProcessor;
-import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
import java.util.List;
@@ -59,7 +51,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
}
protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) {
- ProcessorConfig projectionConfig = new ProcessorConfig();
+ ProcessorConfig ProcessorConfig = new ProcessorConfig();
boolean found = false; // 标志变量
CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key),
ProjectionConfigOptions.TYPE.key());
@@ -73,7 +65,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
if(processor.type().equals(value.getOrDefault("type", "").toString())){
found = true;
try {
- projectionConfig = processor.checkConfig(key, value, processorsConfig);
+ ProcessorConfig = processor.checkConfig(key, value, processorsConfig);
} catch (Exception e) {
throw new JobExecuteException("Create orderby pipeline instance failed!", e);
@@ -84,7 +76,7 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
if (!found) {
throw new JobExecuteException("No matching processor found for type: " + value.getOrDefault("type", "").toString());
}
- return projectionConfig;
+ return ProcessorConfig;
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
index 96df69c..1d4e819 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
@@ -12,6 +12,7 @@ public final class CheckConfigUtil {
private CheckConfigUtil() {}
public static CheckResult checkAllExists(Config config, String... params) {
+
List<String> missingParams =
Arrays.stream(params)
.filter(param -> !isValidParam(config, param))
@@ -20,11 +21,10 @@ public final class CheckConfigUtil {
if (!missingParams.isEmpty()) {
String errorMsg =
String.format(
- "please specify [%s] as non-empty.", String.join(",", missingParams));
+ "Please specify [%s] as non-empty.", String.join(",", missingParams));
return CheckResult.error(errorMsg);
- } else {
- return CheckResult.success();
}
+ return CheckResult.success();
}
/** check config if there was at least one usable */
@@ -33,48 +33,42 @@ public final class CheckConfigUtil {
return CheckResult.success();
}
- List<String> missingParams = new LinkedList<>();
- for (String param : params) {
- if (!isValidParam(config, param)) {
- missingParams.add(param);
- }
- }
+ List<String> missingParams = Arrays.stream(params)
+ .filter(param -> !isValidParam(config, param))
+ .collect(Collectors.toList());
if (missingParams.size() == params.length) {
String errorMsg =
String.format(
- "please specify at least one config of [%s] as non-empty.",
+ "Please specify at least one config of [%s] as non-empty.",
String.join(",", missingParams));
return CheckResult.error(errorMsg);
- } else {
- return CheckResult.success();
}
+ return CheckResult.success();
}
- public static boolean isValidParam(Config config, String param) {
- boolean isValidParam = true;
+ public static boolean isValidParam(Config config, String param) {
if (!config.hasPath(param)) {
- isValidParam = false;
- } else if (config.getAnyRef(param) instanceof List) {
- isValidParam = !((List<?>) config.getAnyRef(param)).isEmpty();
+ return false;
}
- return isValidParam;
+ Object value = config.getAnyRef(param);
+ return !(value instanceof List && ((List<?>) value).isEmpty());
}
/** merge all check result */
public static CheckResult mergeCheckResults(CheckResult... checkResults) {
+
List<CheckResult> notPassConfig =
Arrays.stream(checkResults)
.filter(item -> !item.isSuccess())
.collect(Collectors.toList());
if (notPassConfig.isEmpty()) {
return CheckResult.success();
- } else {
- String errMessage =
- notPassConfig.stream()
- .map(CheckResult::getMsg)
- .collect(Collectors.joining(","));
- return CheckResult.error(errMessage);
}
+ String errMessage =
+ notPassConfig.stream()
+ .map(CheckResult::getMsg)
+ .collect(Collectors.joining(","));
+ return CheckResult.error(errMessage);
}
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java
index 5bf0196..e8e47f3 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java
@@ -1,8 +1,13 @@
package com.geedgenetworks.common.config;
+import lombok.Data;
+
+@Data
public class CheckResult {
private static final CheckResult SUCCESS = new CheckResult(true, "");
+
private boolean success;
+
private String msg;
private CheckResult(boolean success, String msg) {
@@ -10,71 +15,16 @@ public class CheckResult {
this.msg = msg;
}
+ /** @return a successful instance of CheckResult */
public static CheckResult success() {
return SUCCESS;
}
+ /**
+ * @param msg the error message
+ * @return an error instance of CheckResult
+ */
public static CheckResult error(String msg) {
return new CheckResult(false, msg);
}
-
- public boolean isSuccess() {
- return this.success;
- }
-
- public String getMsg() {
- return this.msg;
- }
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
- public void setMsg(String msg) {
- this.msg = msg;
- }
-
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- } else if (!(o instanceof CheckResult)) {
- return false;
- } else {
- CheckResult other = (CheckResult)o;
- if (!other.canEqual(this)) {
- return false;
- } else if (this.isSuccess() != other.isSuccess()) {
- return false;
- } else {
- Object this$msg = this.getMsg();
- Object other$msg = other.getMsg();
- if (this$msg == null) {
- if (other$msg != null) {
- return false;
- }
- } else if (!this$msg.equals(other$msg)) {
- return false;
- }
-
- return true;
- }
- }
- }
-
- protected boolean canEqual(Object other) {
- return other instanceof CheckResult;
- }
-
- public int hashCode() {
- int PRIME = 59;
- int result = 1;
- result = result * PRIME + (this.isSuccess() ? 79 : 97);
- Object $msg = this.getMsg();
- result = result * PRIME + ($msg == null ? 43 : $msg.hashCode());
- return result;
- }
-
- public String toString() {
- return "CheckResult(success=" + this.isSuccess() + ", msg=" + this.getMsg() + ")";
- }
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
new file mode 100644
index 0000000..f1170be
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
@@ -0,0 +1,107 @@
+package com.geedgenetworks.common.config;
+
+import com.geedgenetworks.common.udf.UDFContext;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public final class CheckUDFContextUtil {
+
+ private CheckUDFContextUtil() {}
+
+ // Check if all the params are present in the UDFContext
+ public static CheckResult checkAllExists(UDFContext context, String... params) {
+ List<String> invalidParams = Arrays.stream(params)
+ .filter(param -> isInvalidParam(context, param))
+ .collect(Collectors.toList());
+
+ if (!invalidParams.isEmpty()) {
+ String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", invalidParams));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+ // Check if at least one of the params is present in the UDFContext
+ public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) {
+ if (params.length == 0) {
+ return CheckResult.success();
+ }
+
+ List<String> invalidParams = Arrays.stream(params)
+ .filter(param -> isInvalidParam(context, param))
+ .collect(Collectors.toList());
+
+ if (invalidParams.size() == params.length) {
+ String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", invalidParams));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+
+
+ // Check Array/Map Object has only one item
+ public static CheckResult checkCollectionSingleItemExists (UDFContext context, String param) {
+ if (context == null) {
+ return CheckResult.error("UDFContext is null");
+ }
+
+ if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
+ return context.getLookupFields() != null && context.getLookupFields().size() == 1 ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item");
+ } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
+ return context.getOutputFields() != null && context.getOutputFields().size() == 1 ? CheckResult.success() : CheckResult.error("Output fields should have only one item");
+ } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
+ return context.getParameters() != null && context.getParameters().size() == 1 ? CheckResult.success() : CheckResult.error("Parameters should have only one item");
+ } else {
+ return CheckResult.error("Invalid param");
+ }
+
+ }
+
+ // Check Parameters contains keys
+ public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) {
+ if (context == null) {
+ return CheckResult.error("UDFContext is null");
+ }
+
+ if (context.getParameters() == null) {
+ return CheckResult.error("Parameters is null");
+ }
+
+ List<String> missingKeys = Arrays.stream(keys)
+ .filter(key -> !context.getParameters().containsKey(key))
+ .collect(Collectors.toList());
+
+ if (!missingKeys.isEmpty()) {
+ String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingKeys));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+ public static boolean isInvalidParam(UDFContext context, String param) {
+ if (context == null) {
+ return true;
+ }
+
+ if (UDFContextConfigOptions.NAME.key().equals(param)) {
+ return context.getName() == null;
+ } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
+ return context.getLookupFields() == null || context.getLookupFields().isEmpty();
+ } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
+ return context.getOutputFields() == null || context.getOutputFields().isEmpty();
+ } else if (UDFContextConfigOptions.FILTER.key().equals(param)) {
+ return context.getFilter() == null;
+ } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
+ return context.getParameters() == null || context.getParameters().isEmpty();
+ } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) {
+ return context.getFunction() == null;
+ } else {
+ return true;
+ }
+
+ }
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
new file mode 100644
index 0000000..ac36b02
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
@@ -0,0 +1,54 @@
+package com.geedgenetworks.common.config;
+
+import java.util.List;
+import java.util.Map;
+import com.alibaba.fastjson2.TypeReference;
+public interface UDFContextConfigOptions {
+ Option<String> NAME = Options.key("name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of the function.");
+
+ Option<List<String>> LOOKUP_FIELDS = Options.key("lookup_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be looked up.");
+
+ Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be outputted.");
+
+ Option<String> FILTER = Options.key("filter")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The filter expression to be applied.");
+
+ Option<Map<String, Object>> PARAMETERS = Options.key("parameters")
+ .type(new TypeReference<Map<String, Object>>() {})
+ .noDefaultValue()
+ .withDescription("The parameters for the function.");
+
+ Option<String> PARAMETERS_KB_NAME = Options.key("kb_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of the knowledge base.");
+
+ Option<String> PARAMETERS_OPTION = Options.key("option")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The option for the function.");
+
+ Option<Map<String, String>> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("The geolocation field mapping.");
+
+
+ Option<String> FUNCTION = Options.key("function")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The function to be executed.");
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java
index e4d9f59..5298810 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java
@@ -2,12 +2,13 @@ package com.geedgenetworks.common.exception;
public enum CommonErrorCode implements GrootStreamErrorCodeSupplier {
- UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation exception"),
- ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument exception"),
- SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax Error"),
- FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed, such as (read,list,write,move,copy,sync) etc..."),
-
- CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validate failed"),
+ UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation."),
+ ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument."),
+ SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax error."),
+ FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed (e.g., read, list, write, move, copy, sync)."),
+ CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validation failed."),
+ JSON_OPERATION_FAILED(
+ "GROOT-STREAM-COMMON-0006", "JSON convert/parse operation failed."),
;
private final String code;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
index 2aab34b..2723652 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
@@ -1,5 +1,9 @@
package com.geedgenetworks.common.udf;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckUDFContextUtil;
+import com.geedgenetworks.common.config.UDFContextConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import org.apache.flink.api.common.functions.RuntimeContext;
import java.io.Serializable;
@@ -13,4 +17,15 @@ public interface ScalarFunction extends Serializable {
void close();
+ default void checkConfig(UDFContext udfContext) {
+ if (udfContext == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null");
+ }
+
+ if (!CheckUDFContextUtil.checkAtLeastOneExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key(), UDFContextConfigOptions.OUTPUT_FIELDS.key(), UDFContextConfigOptions.FILTER.key()).isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified.");
+ }
+
+ }
+
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
index 4062924..ea98226 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
@@ -1,63 +1,22 @@
package com.geedgenetworks.common.udf;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+@Data
public class UDFContext implements Serializable {
private String name;
- private List<String> lookup_fields;
- private List<String> output_fields;
+ @JsonProperty("lookup_fields")
+ private List<String> lookupFields;
+ @JsonProperty("output_fields")
+ private List<String> outputFields;
private String filter;
private Map<String, Object> parameters;
private String function;
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public List<String> getLookup_fields() {
- return lookup_fields;
- }
-
- public void setLookup_fields(List<String> lookup_fields) {
- this.lookup_fields = lookup_fields;
- }
-
- public List<String> getOutput_fields() {
- return output_fields;
- }
-
- public void setOutput_fields(List<String> output_fields) {
- this.output_fields = output_fields;
- }
-
- public String getFilter() {
- return filter;
- }
-
- public void setFilter(String filter) {
- this.filter = filter;
- }
-
- public Map<String, Object> getParameters() {
- return parameters;
- }
-
- public void setParameters(Map<String, Object> parameters) {
- this.parameters = parameters;
- }
-
- public String getFunction() {
- return function;
- }
-
- public void setFunction(String function) {
- this.function = function;
- }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 6d8373e..ac282b3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -2,8 +2,7 @@ package com.geedgenetworks.core.udf;
import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CommonConfig;
-import com.geedgenetworks.common.config.KnowledgeBaseConfig;
+import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.ScalarFunction;
@@ -24,10 +23,23 @@ public class AsnLookup implements ScalarFunction {
private String lookupFieldName;
private String outputFieldName;
+
+ enum Option {
+ IP_TO_ASN;
+
+ public static boolean isValid(String option) {
+ try {
+ Option.valueOf(option);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+ }
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- checkUdfContext(udfContext);
+ checkConfig(udfContext);
this.kbName = udfContext.getParameters().get("kb_name").toString();
this.option = udfContext.getParameters().get("option").toString();
Configuration configuration = (Configuration) runtimeContext
@@ -44,8 +56,8 @@ public class AsnLookup implements ScalarFunction {
} else {
log.error("AsnLookup init KnowledgeBase error ");
}
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
AsnKnowledgeBaseHandler.increment();
log.warn("AsnKnowledgeBaseHandlerCount "+AsnKnowledgeBaseHandler.getCount());
}
@@ -79,29 +91,38 @@ public class AsnLookup implements ScalarFunction {
}
}
- private void checkUdfContext(UDFContext udfContext) {
-
- if (udfContext.getLookup_fields() == null || udfContext.getOutput_fields() == null || udfContext.getParameters() == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ @Override
+ public void checkConfig(UDFContext udfContext) {
+ CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext,
+ UDFContextConfigOptions.LOOKUP_FIELDS.key(),
+ UDFContextConfigOptions.OUTPUT_FIELDS.key(),
+ UDFContextConfigOptions.PARAMETERS.key());
+
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
}
- if (udfContext.getLookup_fields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if (udfContext.getOutput_fields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
+
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if (!udfContext.getParameters().containsKey("kb_name")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey kb_name");
+ result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_KB_NAME.key(), UDFContextConfigOptions.PARAMETERS_OPTION.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if (!udfContext.getParameters().containsKey("option")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option");
- } else {
- if (!udfContext.getParameters().get("option").toString().equals("IP_TO_ASN")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
- }
+
+ String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString();
+ if (!Option.isValid(optionValue)) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.",
+ udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key()));
}
+
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
index 5770201..98b2d68 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
@@ -18,10 +18,10 @@ public class CurrentUnixTimestamp implements ScalarFunction {
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if( udfContext.getOutput_fields()==null || udfContext.getParameters() == null){
+ if( udfContext.getOutputFields()==null || udfContext.getParameters() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
if(!udfContext.getParameters().containsKey("precision")){
@@ -34,7 +34,7 @@ public class CurrentUnixTimestamp implements ScalarFunction {
}
}
this.precision = udfContext.getParameters().get("precision").toString();
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
index 663e626..bc8563a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
@@ -22,16 +22,16 @@ public class DecodeBase64 implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null){
+ if(udfContext.getParameters()==null || udfContext.getOutputFields()==null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
if(!udfContext.getParameters().containsKey("value_field") ||!udfContext.getParameters().containsKey("charset_field") ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field and charset_field");
}
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
this.valueField =udfContext.getParameters().get("value_field").toString();
this.charsetField =udfContext.getParameters().get("charset_field").toString();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
index 7bc293e..74816f5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
@@ -1,106 +1,124 @@
package com.geedgenetworks.core.udf;
-
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.CheckUDFContextUtil;
+import com.geedgenetworks.common.config.UDFContextConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.ScalarFunction;
-import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.shaded.com.google.common.net.InternetDomainName;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
-
import java.util.List;
-
import static com.geedgenetworks.utils.FormatUtils.getTopPrivateDomain;
@Slf4j
public class Domain implements ScalarFunction {
-
-
- private String option;
+ private Option option;
private List<String> lookupFields;
private String outputFieldName;
- @Override
- public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
- }
- if(udfContext.getLookup_fields().isEmpty()){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup field is not empty");
- }
- if(udfContext.getOutput_fields().size() != 1){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
- }
- if(!udfContext.getParameters().containsKey("option")){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option");
- }
- else{
- if(!udfContext.getParameters().get("option").toString().equals("TOP_LEVEL_DOMAIN") &&
- !udfContext.getParameters().get("option").toString().equals("FIRST_SIGNIFICANT_SUBDOMAIN") &&
- !udfContext.getParameters().get("option").toString().equals("FQDN")){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
+ enum Option {
+ TOP_LEVEL_DOMAIN,
+ FIRST_SIGNIFICANT_SUBDOMAIN,
+ FQDN;
+
+ public static boolean isValid(String option) {
+ try {
+ Option.valueOf(option);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
}
}
- this.option = udfContext.getParameters().get("option").toString();
- this.lookupFields = udfContext.getLookup_fields();
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ }
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ checkConfig(udfContext);
+ this.option = Option.valueOf(udfContext.getParameters().get("option").toString());
+ this.lookupFields = udfContext.getLookupFields();
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
@Override
public Event evaluate(Event event) {
String domain = "";
- switch (option) {
- case "TOP_LEVEL_DOMAIN":
- for (String lookupField : lookupFields){
-
- Object valueObj = event.getExtractedFields().get(lookupField);
- if (valueObj!=null) {
- domain = getTopPrivateDomain((String) valueObj);
- if (domain.contains(".")) {
- domain = domain.substring(domain.indexOf(".") + 1);
- }
- if(!domain.isEmpty()){
- break;
- }
- }
- }
- break;
- case "FIRST_SIGNIFICANT_SUBDOMAIN":
- for (String lookupField : lookupFields){
- Object valueObj = event.getExtractedFields().get(lookupField);
- if (valueObj!=null) {
- domain = getTopPrivateDomain((String)valueObj);
- }
- if(!domain.isEmpty()){
+ for (String fieldName : lookupFields) {
+ Object fieldValue = event.getExtractedFields().get(fieldName);
+ if (fieldValue == null) {
+ continue;
+ }
+
+ String value = fieldValue.toString();
+ try {
+
+ String topPrivateDomain = getTopPrivateDomain(value);
+
+ switch (option) {
+ case TOP_LEVEL_DOMAIN:
+ domain = InternetDomainName.from(topPrivateDomain).publicSuffix().toString();
break;
- }
- }
- break;
- case "FQDN":
- for (String lookupField : lookupFields) {
- Object valueObj = event.getExtractedFields().get(lookupField);
- if (valueObj!=null) {
- domain = (String)valueObj;
- }
- if(!domain.isEmpty()){
+ case FIRST_SIGNIFICANT_SUBDOMAIN:
+ domain = topPrivateDomain;
+ break;
+ case FQDN: // Use the original value
+ domain = value;
break;
- }
+ default:
+ throw new IllegalArgumentException("Unknown option: " + option);
}
- break;
+
+ if (!domain.isEmpty()) { // Found a valid domain will break the loop
+ break;
+ }
+
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid domain: {}", value);
+ }
}
+
event.getExtractedFields().put(outputFieldName, domain);
return event;
}
@Override
+ public void checkConfig(UDFContext udfContext) {
+ CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext,
+ UDFContextConfigOptions.LOOKUP_FIELDS.key(),
+ UDFContextConfigOptions.OUTPUT_FIELDS.key(),
+ UDFContextConfigOptions.PARAMETERS.key());
+
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
+ }
+
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
+ }
+
+ if(!udfContext.getParameters().containsKey(UDFContextConfigOptions.PARAMETERS_OPTION.key())){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format(
+ "UDF: %s, [%s] Option should be specified.",
+ udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key()));
+ }
+
+ String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString();
+ if (!Option.isValid(optionValue)) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.",
+ udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key()));
+ }
+
+ }
+
+ @Override
public String functionName() {
return "DOMAIN";
}
-
-
@Override
public void close() {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
index 93cd0db..c7f13c2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
@@ -1,5 +1,10 @@
package com.geedgenetworks.core.udf;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.CheckUDFContextUtil;
+import com.geedgenetworks.common.config.UDFContextConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
@@ -10,6 +15,7 @@ public class Drop implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ checkConfig(udfContext);
}
@Override
@@ -24,6 +30,17 @@ public class Drop implements ScalarFunction {
}
@Override
+ public void checkConfig(UDFContext udfContext) {
+ CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext,
+ UDFContextConfigOptions.FILTER.key());
+
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
+ }
+
+ }
+
+ @Override
public void close() {
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
index fe45fff..a950252 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
@@ -22,20 +22,20 @@ public class EncodeBase64 implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null || udfContext.getLookup_fields() == null){
+ if(udfContext.getParameters()==null || udfContext.getOutputFields()==null || udfContext.getLookupFields() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- if(udfContext.getLookup_fields().size() != 1){
+ if(udfContext.getLookupFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
}
if(!udfContext.getParameters().containsKey("input_type") ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field ");
}
- this.outputFieldName = udfContext.getOutput_fields().get(0);
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
this.input_type =udfContext.getParameters().get("input_type").toString();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
index 7fdbfa0..c8e21b2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
@@ -51,8 +51,8 @@ public class Encrypt implements ScalarFunction {
if (udfContext.getParameters().containsKey("default_val")) {
this.defaultVal = udfContext.getParameters().get("default_val").toString();
}
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
this.identifier = udfContext.getParameters().get("identifier").toString();
Configuration configuration = (Configuration) runtimeContext.getExecutionConfig().getGlobalJobParameters();
CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
@@ -139,10 +139,10 @@ public class Encrypt implements ScalarFunction {
if (udfContext.getParameters() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if (udfContext.getLookup_fields().size() != 1) {
+ if (udfContext.getLookupFields().size() != 1) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
}
- if (udfContext.getOutput_fields().size() != 1) {
+ if (udfContext.getOutputFields().size() != 1) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
if (!udfContext.getParameters().containsKey("identifier")) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java
index 1b83d94..b04dc97 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Eval.java
@@ -21,14 +21,14 @@ public class Eval implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getOutput_fields()==null || udfContext.getParameters() == null){
+ if(udfContext.getOutputFields()==null || udfContext.getParameters() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
if(!udfContext.getParameters().containsKey("value_expression")){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_expression");
}
String expr = (String) udfContext.getParameters().get("value_expression");
- List<String> outputField = udfContext.getOutput_fields();
+ List<String> outputField = udfContext.getOutputFields();
output = outputField.get(0);
calc = new EvalExecutor(expr);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
index 84c2c2a..d5d5761 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
@@ -30,7 +30,7 @@ public class Flatten implements ScalarFunction {
prefix = udfContext.getParameters().getOrDefault("prefix", "").toString();
delimiter = udfContext.getParameters().getOrDefault("delimiter", ".").toString();
flattenKeys = new HashSet<>();
- for (String key : udfContext.getLookup_fields()) {
+ for (String key : udfContext.getLookupFields()) {
this.flattenKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key);
}
depth = Integer.parseInt(udfContext.getParameters().getOrDefault("depth", "5").toString());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
index d9381cf..d8803c3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
@@ -23,14 +23,14 @@ public class FromUnixTimestamp implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){
+ if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null || udfContext.getParameters() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- if(udfContext.getLookup_fields().size() != 1){
+ if(udfContext.getLookupFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
}
if(!udfContext.getParameters().containsKey("precision")){
@@ -46,8 +46,8 @@ public class FromUnixTimestamp implements ScalarFunction {
}
this.precision = udfContext.getParameters().get("precision").toString();
- this.outputFieldName = udfContext.getOutput_fields().get(0);
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
switch (precision) {
case "seconds":
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java
index ce4fc48..366f204 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GenerateStringArray.java
@@ -18,14 +18,14 @@ public class GenerateStringArray implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null ){
+ if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- this.lookupFieldNames = udfContext.getLookup_fields();
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldNames = udfContext.getLookupFields();
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index 559748e..e800e5d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -1,8 +1,7 @@
package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CommonConfig;
-import com.geedgenetworks.common.config.KnowledgeBaseConfig;
+import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.ScalarFunction;
@@ -28,13 +27,55 @@ public class GeoIpLookup implements ScalarFunction {
private String outputFieldName;
private Map<String, String> geoLocationFieldMapping;
+ enum Option {
+ IP_TO_COUNTRY,
+ IP_TO_PROVINCE,
+ IP_TO_CITY,
+ IP_TO_SUBDIVISION_ADDR,
+ IP_TO_DETAIL,
+ IP_TO_LATLNG,
+ IP_TO_PROVIDER,
+ IP_TO_JSON,
+ IP_TO_OBJECT
+ ;
+
+ public static boolean isValid(String option) {
+ try {
+ Option.valueOf(option);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+ }
+
+ enum GeolocationFieldMapping {
+ COUNTRY,
+ PROVINCE,
+ CITY,
+ LONGITUDE,
+ LATITUDE,
+ ISP,
+ ORGANIZATION
+ ;
+
+ public static boolean isValid(String option) {
+ try {
+ GeolocationFieldMapping.valueOf(option);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+ }
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- checkUdfContext(udfContext);
- this.kbName = udfContext.getParameters().get("kb_name").toString();
- this.option = udfContext.getParameters().get("option").toString();
- if (option.equals("IP_TO_OBJECT")) {
- this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
+ checkConfig(udfContext);
+ this.kbName = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_KB_NAME.key()).toString();
+ this.option = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString();
+ if (option.equals(Option.IP_TO_OBJECT.name())) {
+ this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key());
}
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
@@ -50,9 +91,9 @@ public class GeoIpLookup implements ScalarFunction {
} else {
log.error("GeoIpLookup init KnowledgeBase error ");
}
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()){
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ if(udfContext.getOutputFields()!=null && !udfContext.getOutputFields().isEmpty()){
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
GeoIpKnowledgeBaseHandler.increment();
log.warn("GeoIpKnowledgeBaseHandler "+GeoIpKnowledgeBaseHandler.getCount());
@@ -63,7 +104,7 @@ public class GeoIpLookup implements ScalarFunction {
public Event evaluate(Event event) {
Object valueObj = event.getExtractedFields().get(lookupFieldName);
if (valueObj!=null) {
- if ("IP_TO_OBJECT".equals(option)) {
+ if (Option.IP_TO_OBJECT.name().equals(option)) {
LocationResponse response = GeoIpKnowledgeBaseHandler.lookUpObject(kbName,valueObj.toString());
for (Map.Entry<String, String> entry : geoLocationFieldMapping.entrySet()) {
String result = "";
@@ -103,58 +144,50 @@ public class GeoIpLookup implements ScalarFunction {
return event;
}
- private void checkUdfContext(UDFContext udfContext) {
+ @Override
+ public void checkConfig(UDFContext udfContext) {
+ CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext,
+ UDFContextConfigOptions.LOOKUP_FIELDS.key(),
+ UDFContextConfigOptions.PARAMETERS.key());
- if (udfContext.getLookup_fields() == null || udfContext.getParameters() == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
- }
- if (udfContext.getLookup_fields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
- }
- if (!udfContext.getParameters().containsKey("kb_name")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey kb_name");
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
}
- if (!udfContext.getParameters().containsKey("option")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option");
- } else {
- if (!udfContext.getParameters().get("option").toString().equals("IP_TO_COUNTRY") && //IP_TO_COUNTRY
- !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVINCE") && //IP_TO_PROVINCE
- !udfContext.getParameters().get("option").toString().equals("IP_TO_CITY") && //IP_TO_CITY
- !udfContext.getParameters().get("option").toString().equals("IP_TO_SUBDIVISION_ADDR") && //IP_TO_SUBDIVISION_ADDR
- !udfContext.getParameters().get("option").toString().equals("IP_TO_DETAIL") && //IP_TO_DETAIL
- !udfContext.getParameters().get("option").toString().equals("IP_TO_LATLNG") && //IP_TO_LATLNG
- !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVIDER") && //IP_TO_PROVIDER
- !udfContext.getParameters().get("option").toString().equals("IP_TO_JSON") && //IP_TO_JSON
- !udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
- }
- if (udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")) {
- if (!udfContext.getParameters().containsKey("geolocation_field_mapping")) {
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
+ }
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey geolocation_field_mapping");
+ result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_KB_NAME.key(), UDFContextConfigOptions.PARAMETERS_OPTION.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
+ }
- } else {
- Map<String, String> geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
+ String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString();
+ if (!Option.isValid(optionValue)) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.",
+ udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key()));
+ }
- if (!geolocation_field_mapping.isEmpty()) {
+ if (optionValue.equals(Option.IP_TO_OBJECT.name())) {
+ result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
+ }
- for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) {
+ Map<String, String> fieldMap = (Map<String, String>) udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key());
- if (!entry.getKey().equals("COUNTRY") && !entry.getKey().equals("PROVINCE") && !entry.getKey().equals("CITY") && !entry.getKey().equals("LONGITUDE") && !entry.getKey().equals("LATITUDE") && !entry.getKey().equals("ISP") && !entry.getKey().equals("ORGANIZATION")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct");
- }
- }
- } else {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct");
- }
+ for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
+ if (!GeolocationFieldMapping.isValid(entry.getKey())) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct");
}
}
+
}
}
-
@Override
public String functionName() {
return "GEOIP_LOOKUP";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java
index 0d2e1ca..098cdef 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java
@@ -28,8 +28,8 @@ public class Hmac implements ScalarFunction {
algorithm = udfContext.getParameters().get("algorithm").toString();
}
this.hMac = new HMac(getHmacAlgorithm(algorithm), secretKey.getBytes());
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
this.outputFormat = "base64";
if (udfContext.getParameters().containsKey("output_format")) {
this.outputFormat = udfContext.getParameters().get("output_format").toString();
@@ -68,13 +68,13 @@ public class Hmac implements ScalarFunction {
}
private void checkUdfContext(UDFContext udfContext) {
- if (udfContext.getParameters() == null || udfContext.getOutput_fields() == null) {
+ if (udfContext.getParameters() == null || udfContext.getOutputFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if (udfContext.getLookup_fields().size() != 1) {
+ if (udfContext.getLookupFields().size() != 1) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
}
- if (udfContext.getOutput_fields().size() != 1) {
+ if (udfContext.getOutputFields().size() != 1) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
if (!udfContext.getParameters().containsKey("secret_key")) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
index 36c6cea..57fe847 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
@@ -17,20 +17,20 @@ public class JsonExtract implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){
+ if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null || udfContext.getParameters() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getLookup_fields().size() != 1){
+ if(udfContext.getLookupFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
if(!udfContext.getParameters().containsKey("value_expression")){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_expression");
}
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
this.expression =udfContext.getParameters().get("value_expression").toString();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 65617ba..0141a46 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -55,7 +55,7 @@ public class PathCombine implements ScalarFunction {
}
}
}
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
index 42a19e6..b206f3b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
@@ -17,15 +17,15 @@ public class SnowflakeId implements Serializable, ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getOutput_fields()==null || udfContext.getParameters() == null ){
+ if(udfContext.getOutputFields()==null || udfContext.getParameters() == null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
String data_center_id_num = udfContext.getParameters().getOrDefault("data_center_id_num","0").toString();//转为数字
snowflakeIdUtils = new SnowflakeIdUtils(Integer.parseInt(data_center_id_num),runtimeContext.getIndexOfThisSubtask());
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java
index df5c5b4..7e4ab68 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/StringJoiner.java
@@ -21,14 +21,14 @@ public class StringJoiner implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){
+ if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null || udfContext.getParameters() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- this.lookupFieldNames = udfContext.getLookup_fields();
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldNames = udfContext.getLookupFields();
+ this.outputFieldName = udfContext.getOutputFields().get(0);
this.separator =udfContext.getParameters().getOrDefault("separator","").toString().trim();
this.prefix =udfContext.getParameters().getOrDefault("prefix","").toString().trim();
this.suffix =udfContext.getParameters().getOrDefault("suffix","").toString().trim();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
index bff8f18..62c4dfa 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
@@ -23,13 +23,13 @@ public class UnixTimestampConverter implements ScalarFunction {
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
this.udfContext = udfContext;
- if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null || udfContext.getParameters() == null){
+ if(udfContext.getLookupFields()==null || udfContext.getOutputFields()==null || udfContext.getParameters() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- if(udfContext.getLookup_fields().size() != 1){
+ if(udfContext.getLookupFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
if(!udfContext.getParameters().containsKey("precision")){
@@ -51,8 +51,8 @@ public class UnixTimestampConverter implements ScalarFunction {
else{
this.interval = Long.parseLong(udfContext.getParameters().get("interval").toString());
}
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java
index 7c4aca2..e54b612 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeScalarFunction.java
@@ -50,14 +50,14 @@ public abstract class AbstractKnowledgeScalarFunction implements ScalarFunction
registerKnowledges();
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && udfContext.getOutput_fields().size() > 0) {
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ if (udfContext.getOutputFields() != null && udfContext.getOutputFields().size() > 0) {
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
}
private String buildMetricPrefix(UDFContext udfContext) {
- return functionName().toLowerCase() + "_" + udfContext.getLookup_fields().get(0);
+ return functionName().toLowerCase() + "_" + udfContext.getLookupFields().get(0);
}
protected abstract void registerKnowledges();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java
index a591884..de64073 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/ArrayElementsPrepend.java
@@ -24,8 +24,8 @@ public class ArrayElementsPrepend implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
this.prefix = udfContext.getParameters().get("prefix").toString();
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java
index d506461..191edd5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/BaseStationLookup.java
@@ -29,23 +29,23 @@ public class BaseStationLookup extends AbstractKnowledgeScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
super.open(runtimeContext, udfContext);
- if (udfContext.getLookup_fields() == null || udfContext.getOutput_fields() == null || udfContext.getParameters() == null) {
+ if (udfContext.getLookupFields() == null || udfContext.getOutputFields() == null || udfContext.getParameters() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
int lookupFieldsSize = 1;
- if (udfContext.getLookup_fields().size() != lookupFieldsSize) {
+ if (udfContext.getLookupFields().size() != lookupFieldsSize) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "lookup_fields must contain field: cell_id");
}
int outputFieldsSize = 2;
- if (udfContext.getOutput_fields().size() != outputFieldsSize) {
+ if (udfContext.getOutputFields().size() != outputFieldsSize) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "output_fields must contain two fields: longitude and latitude");
}
- List<String> lookupFields = udfContext.getLookup_fields();
+ List<String> lookupFields = udfContext.getLookupFields();
cellIdFieldName = lookupFields.get(0);
- List<String> outputFields = udfContext.getOutput_fields();
+ List<String> outputFields = udfContext.getOutputFields();
longitudeFieldName = outputFields.get(0);
latitudeFieldName = outputFields.get(1);
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java
index 6cd0c6b..f4338fc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FieldsMerge.java
@@ -22,8 +22,8 @@ public class FieldsMerge implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.lookupFieldNames = udfContext.getLookup_fields();
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldNames = udfContext.getLookupFields();
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java
index 2698772..7389f4a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/H3CellLookup.java
@@ -32,12 +32,12 @@ public class H3CellLookup implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null || udfContext.getOutput_fields() == null || udfContext.getParameters() == null) {
+ if (udfContext.getLookupFields() == null || udfContext.getOutputFields() == null || udfContext.getParameters() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
int lookupFieldsSize = 2;
- if (udfContext.getLookup_fields().size() != lookupFieldsSize) {
+ if (udfContext.getLookupFields().size() != lookupFieldsSize) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "lookup_fields must contain two fields: longitude and latitude");
}
@@ -47,10 +47,10 @@ public class H3CellLookup implements ScalarFunction {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must contain key resolution and the value is between 0 and 15");
}
- List<String> lookupFields = udfContext.getLookup_fields();
+ List<String> lookupFields = udfContext.getLookupFields();
longitudeFieldName = lookupFields.get(0);
latitudeFieldName = lookupFields.get(1);
- outputFieldName = udfContext.getOutput_fields().get(0);
+ outputFieldName = udfContext.getOutputFields().get(0);
res = (int) udfContext.getParameters().get("resolution");
try {
h3 = H3Core.newInstance();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
index 230a3ca..8219fbd 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectList.java
@@ -36,12 +36,12 @@ public class CollectList implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
+ if (udfContext.getLookupFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputField = udfContext.getOutput_fields().get(0);
+ this.lookupField = udfContext.getLookupFields().get(0);
+ if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) {
+ this.outputField = udfContext.getOutputFields().get(0);
} else {
outputField = lookupField;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
index 708a8a6..c23f0ca 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/CollectSet.java
@@ -22,12 +22,12 @@ public class CollectSet implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
+ if (udfContext.getLookupFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputField = udfContext.getOutput_fields().get(0);
+ this.lookupField = udfContext.getLookupFields().get(0);
+ if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) {
+ this.outputField = udfContext.getOutputFields().get(0);
} else {
outputField = lookupField;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
index c6ba61b..f68448f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/FirstValue.java
@@ -34,12 +34,12 @@ public class FirstValue implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
+ if (udfContext.getLookupFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputField = udfContext.getOutput_fields().get(0);
+ this.lookupField = udfContext.getLookupFields().get(0);
+ if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) {
+ this.outputField = udfContext.getOutputFields().get(0);
} else {
outputField = lookupField;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
index a099fde..d8656e0 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java
@@ -1,121 +1,121 @@
-package com.geedgenetworks.core.udf.udaf.HdrHistogram;
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.AggregateFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.sketch.util.StringUtils;
-import org.HdrHistogram.ArrayHistogram;
-import org.HdrHistogram.DirectMapHistogram;
-import org.HdrHistogram.Histogramer;
-import org.apache.commons.collections.CollectionUtils;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
-public abstract class HdrHistogramBaseAggregate implements AggregateFunction {
- protected String inputField;
- protected String outputField;
- protected boolean inputSketch;
- protected long lowestDiscernibleValue;
- protected long highestTrackableValue;
- protected int numberOfSignificantValueDigits;
- protected boolean autoResize;
-
- @Override
- public void open(UDFContext c) {
- inputField = c.getLookup_fields().get(0);
- if (CollectionUtils.isNotEmpty(c.getOutput_fields())) {
- outputField = c.getOutput_fields().get(0);
- } else {
- outputField = inputField;
- }
- Map<String, Object> params = c.getParameters();
- lowestDiscernibleValue = Long.parseLong(params.getOrDefault("lowestDiscernibleValue", "1").toString());
- highestTrackableValue = Long.parseLong(params.getOrDefault("highestTrackableValue", "2").toString());
- numberOfSignificantValueDigits = Integer.parseInt(params.getOrDefault("numberOfSignificantValueDigits", "1").toString());
- autoResize = Boolean.valueOf(params.getOrDefault("autoResize", "true").toString());
- inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString());
- }
-
- @Override
- public Accumulator initAccumulator(Accumulator acc) {
- return acc;
- }
-
- @Override
- public Accumulator add(Event event, Accumulator acc) {
- Object value = event.getExtractedFields().get(inputField);
- if (value == null) {
- return acc;
- }
-
- if (inputSketch) {
- updateHdrMerge(acc, value);
- } else {
- updateHdr(acc, value);
- }
-
- return acc;
- }
-
- @Override
- public Accumulator merge(Accumulator acc, Accumulator other) {
- Object agg = acc.getMetricsFields().get(outputField);
- Object aggOther = other.getMetricsFields().get(outputField);
- Object rst;
-
- if(agg == null){
- rst = aggOther;
- } else if (aggOther == null) {
- rst = agg;
- }else{
- rst = ((Histogramer)agg).merge(((Histogramer) aggOther));
- }
-
- if(rst != null){
- acc.getMetricsFields().put(outputField, rst);
- }
- return acc;
- }
-
- protected void updateHdr(Accumulator acc, Object value) {
- Map<String, Object> aggs = acc.getMetricsFields();
- ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
- if (his == null) {
- his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
- his.setAutoResize(autoResize);
- aggs.put(outputField, his);
- }
-
- his.recordValue(((Number) value).longValue());
- }
-
-
- protected void updateHdrMerge(Accumulator acc, Object value) {
- Map<String, Object> aggs = acc.getMetricsFields();
- ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
- if (his == null) {
- his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
- his.setAutoResize(autoResize);
- aggs.put(outputField, his);
- }
-
- Histogramer h;
- if (value instanceof String) {
- byte[] bytes = StringUtils.decodeBase64(((String) value).getBytes(StandardCharsets.UTF_8));
- h = DirectMapHistogram.wrapBytes(bytes);
- } else if (value instanceof byte[]) {
- h = DirectMapHistogram.wrapBytes((byte[]) value);
- } else if (value instanceof Histogramer) {
- h = (Histogramer) value;
- } else {
- throw new IllegalArgumentException("Unsupported type " + value.getClass());
- }
-
- his.merge(h);
- }
-
- @Override
- public void close() {}
-}
+package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.ArrayHistogram;
+import org.HdrHistogram.DirectMapHistogram;
+import org.HdrHistogram.Histogramer;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public abstract class HdrHistogramBaseAggregate implements AggregateFunction {
+ protected String inputField;
+ protected String outputField;
+ protected boolean inputSketch;
+ protected long lowestDiscernibleValue;
+ protected long highestTrackableValue;
+ protected int numberOfSignificantValueDigits;
+ protected boolean autoResize;
+
+ @Override
+ public void open(UDFContext c) {
+ inputField = c.getLookupFields().get(0);
+ if (CollectionUtils.isNotEmpty(c.getOutputFields())) {
+ outputField = c.getOutputFields().get(0);
+ } else {
+ outputField = inputField;
+ }
+ Map<String, Object> params = c.getParameters();
+ lowestDiscernibleValue = Long.parseLong(params.getOrDefault("lowestDiscernibleValue", "1").toString());
+ highestTrackableValue = Long.parseLong(params.getOrDefault("highestTrackableValue", "2").toString());
+ numberOfSignificantValueDigits = Integer.parseInt(params.getOrDefault("numberOfSignificantValueDigits", "1").toString());
+ autoResize = Boolean.valueOf(params.getOrDefault("autoResize", "true").toString());
+ inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString());
+ }
+
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ Object value = event.getExtractedFields().get(inputField);
+ if (value == null) {
+ return acc;
+ }
+
+ if (inputSketch) {
+ updateHdrMerge(acc, value);
+ } else {
+ updateHdr(acc, value);
+ }
+
+ return acc;
+ }
+
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if(agg == null){
+ rst = aggOther;
+ } else if (aggOther == null) {
+ rst = agg;
+ }else{
+ rst = ((Histogramer)agg).merge(((Histogramer) aggOther));
+ }
+
+ if(rst != null){
+ acc.getMetricsFields().put(outputField, rst);
+ }
+ return acc;
+ }
+
+ protected void updateHdr(Accumulator acc, Object value) {
+ Map<String, Object> aggs = acc.getMetricsFields();
+ ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
+ if (his == null) {
+ his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ his.setAutoResize(autoResize);
+ aggs.put(outputField, his);
+ }
+
+ his.recordValue(((Number) value).longValue());
+ }
+
+
+ protected void updateHdrMerge(Accumulator acc, Object value) {
+ Map<String, Object> aggs = acc.getMetricsFields();
+ ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
+ if (his == null) {
+ his = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ his.setAutoResize(autoResize);
+ aggs.put(outputField, his);
+ }
+
+ Histogramer h;
+ if (value instanceof String) {
+ byte[] bytes = StringUtils.decodeBase64(((String) value).getBytes(StandardCharsets.UTF_8));
+ h = DirectMapHistogram.wrapBytes(bytes);
+ } else if (value instanceof byte[]) {
+ h = DirectMapHistogram.wrapBytes((byte[]) value);
+ } else if (value instanceof Histogramer) {
+ h = (Histogramer) value;
+ } else {
+ throw new IllegalArgumentException("Unsupported type " + value.getClass());
+ }
+
+ his.merge(h);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
index e7435eb..f319e8d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LastValue.java
@@ -34,12 +34,12 @@ public class LastValue implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
+ if (udfContext.getLookupFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputField = udfContext.getOutput_fields().get(0);
+ this.lookupField = udfContext.getLookupFields().get(0);
+ if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) {
+ this.outputField = udfContext.getOutputFields().get(0);
} else {
outputField = lookupField;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
index bf5260e..418eb9c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/LongCount.java
@@ -14,10 +14,10 @@ public class LongCount implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if (udfContext.getOutput_fields() == null) {
+ if (udfContext.getOutputFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- outputField = udfContext.getOutput_fields().get(0);
+ outputField = udfContext.getOutputFields().get(0);
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java
index e4632bb..226df0a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Max.java
@@ -20,12 +20,12 @@ public class Max implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
+ if (udfContext.getLookupFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputField = udfContext.getOutput_fields().get(0);
+ this.lookupField = udfContext.getLookupFields().get(0);
+ if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) {
+ this.outputField = udfContext.getOutputFields().get(0);
} else {
outputField = lookupField;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
index 9c4e070..88f693f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Mean.java
@@ -20,12 +20,12 @@ public class Mean implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
+ if (udfContext.getLookupFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- outputField = udfContext.getOutput_fields().get(0);
+ lookupField = udfContext.getLookupFields().get(0);
+ if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) {
+ outputField = udfContext.getOutputFields().get(0);
} else {
outputField = lookupField;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java
index 70153c9..6fd7046 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/Min.java
@@ -20,12 +20,12 @@ public class Min implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
+ if (udfContext.getLookupFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputField = udfContext.getOutput_fields().get(0);
+ this.lookupField = udfContext.getLookupFields().get(0);
+ if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) {
+ this.outputField = udfContext.getOutputFields().get(0);
} else {
outputField = lookupField;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
index 031d161..ab8f744 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/NumberSum.java
@@ -15,12 +15,12 @@ public class NumberSum implements AggregateFunction {
@Override
public void open(UDFContext udfContext) {
- if (udfContext.getLookup_fields() == null) {
+ if (udfContext.getLookupFields() == null) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- lookupField = udfContext.getLookup_fields().get(0);
- if (udfContext.getOutput_fields() != null && !udfContext.getOutput_fields().isEmpty()) {
- outputField = udfContext.getOutput_fields().get(0);
+ lookupField = udfContext.getLookupFields().get(0);
+ if (udfContext.getOutputFields() != null && !udfContext.getOutputFields().isEmpty()) {
+ outputField = udfContext.getOutputFields().get(0);
} else {
outputField = lookupField;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
index 0802c22..c113c4a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java
@@ -1,126 +1,126 @@
-package com.geedgenetworks.core.udf.udaf.hlld;
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.AggregateFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.sketch.hlld.Hll;
-import com.geedgenetworks.sketch.hlld.HllUnion;
-import com.geedgenetworks.sketch.hlld.HllUtils;
-import org.apache.commons.collections.CollectionUtils;
-
-import java.util.Map;
-
-public abstract class HlldBaseAggregate implements AggregateFunction {
- protected String inputField;
- protected String outputField;
- protected boolean inputSketch;
- protected int precision = 12;
-
- @Override
- public void open(UDFContext c) {
- inputField = c.getLookup_fields().get(0);
- if (CollectionUtils.isNotEmpty(c.getOutput_fields())) {
- outputField = c.getOutput_fields().get(0);
- } else {
- outputField = inputField;
- }
- Map<String, Object> params = c.getParameters();
- precision = Integer.parseInt(params.getOrDefault("precision", "12").toString());
- inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString());
- }
-
- @Override
- public Accumulator initAccumulator(Accumulator acc) {
- return acc;
- }
-
- @Override
- public Accumulator add(Event event, Accumulator acc) {
- Object value = event.getExtractedFields().get(inputField);
- if (value == null) {
- return acc;
- }
-
- if (inputSketch) {
- updateHllUnion(acc, value);
- } else {
- updateHll(acc, value);
- }
-
- return acc;
- }
-
- @Override
- public Accumulator merge(Accumulator acc, Accumulator other) {
- Object agg = acc.getMetricsFields().get(outputField);
- Object aggOther = other.getMetricsFields().get(outputField);
- Object rst;
-
- if(agg == null){
- rst = aggOther;
- } else if (aggOther == null) {
- rst = agg;
- }else{
- if(inputSketch){
- ((HllUnion)agg).update(((HllUnion) aggOther).getResult());
- rst = agg;
- }else{
- final HllUnion union = new HllUnion(precision);
- union.update((Hll) agg);
- union.update((Hll) aggOther);
- rst = union.getResult();
- }
- }
-
- if(rst != null){
- acc.getMetricsFields().put(outputField, rst);
- }
- return acc;
- }
-
- protected Hll getResultHll(Accumulator acc){
- Object agg = acc.getMetricsFields().get(outputField);
- if (agg == null) {
- return null;
- }
-
- return inputSketch ? ((HllUnion) agg).getResult() : (Hll) agg;
- }
-
- protected void updateHll(Accumulator acc, Object value) {
- Map<String, Object> aggs = acc.getMetricsFields();
- Hll hll = (Hll) aggs.get(outputField);
- if (hll == null) {
- hll = new Hll(precision);
- aggs.put(outputField, hll);
- }
-
- if (value instanceof Integer || value instanceof Long) {
- hll.add(((Number) value).longValue());
- } else if (value instanceof Float || value instanceof Double) {
- hll.add(((Number) value).doubleValue());
- } else if (value instanceof String) {
- hll.add((String) value);
- } else if (value instanceof byte[]) {
- hll.add((byte[]) value);
- } else {
- throw new IllegalArgumentException("Unsupported type " + value.getClass());
- }
- }
-
- protected void updateHllUnion(Accumulator acc, Object value) {
- Map<String, Object> aggs = acc.getMetricsFields();
- HllUnion hllUnion = (HllUnion) aggs.get(outputField);
- if (hllUnion == null) {
- hllUnion = new HllUnion(precision);
- aggs.put(outputField, hllUnion);
- }
-
- Hll hll = HllUtils.deserializeHll(value);
- hllUnion.update(hll);
- }
-
- @Override
- public void close() {}
-}
+package com.geedgenetworks.core.udf.udaf.hlld;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.hlld.Hll;
+import com.geedgenetworks.sketch.hlld.HllUnion;
+import com.geedgenetworks.sketch.hlld.HllUtils;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.Map;
+
+public abstract class HlldBaseAggregate implements AggregateFunction {
+ protected String inputField;
+ protected String outputField;
+ protected boolean inputSketch;
+ protected int precision = 12;
+
+ @Override
+ public void open(UDFContext c) {
+ inputField = c.getLookupFields().get(0);
+ if (CollectionUtils.isNotEmpty(c.getOutputFields())) {
+ outputField = c.getOutputFields().get(0);
+ } else {
+ outputField = inputField;
+ }
+ Map<String, Object> params = c.getParameters();
+ precision = Integer.parseInt(params.getOrDefault("precision", "12").toString());
+ inputSketch = "sketch".equalsIgnoreCase(params.getOrDefault("input_type", "sketch").toString());
+ }
+
+ @Override
+ public Accumulator initAccumulator(Accumulator acc) {
+ return acc;
+ }
+
+ @Override
+ public Accumulator add(Event event, Accumulator acc) {
+ Object value = event.getExtractedFields().get(inputField);
+ if (value == null) {
+ return acc;
+ }
+
+ if (inputSketch) {
+ updateHllUnion(acc, value);
+ } else {
+ updateHll(acc, value);
+ }
+
+ return acc;
+ }
+
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if(agg == null){
+ rst = aggOther;
+ } else if (aggOther == null) {
+ rst = agg;
+ }else{
+ if(inputSketch){
+ ((HllUnion)agg).update(((HllUnion) aggOther).getResult());
+ rst = agg;
+ }else{
+ final HllUnion union = new HllUnion(precision);
+ union.update((Hll) agg);
+ union.update((Hll) aggOther);
+ rst = union.getResult();
+ }
+ }
+
+ if(rst != null){
+ acc.getMetricsFields().put(outputField, rst);
+ }
+ return acc;
+ }
+
+ protected Hll getResultHll(Accumulator acc){
+ Object agg = acc.getMetricsFields().get(outputField);
+ if (agg == null) {
+ return null;
+ }
+
+ return inputSketch ? ((HllUnion) agg).getResult() : (Hll) agg;
+ }
+
+ protected void updateHll(Accumulator acc, Object value) {
+ Map<String, Object> aggs = acc.getMetricsFields();
+ Hll hll = (Hll) aggs.get(outputField);
+ if (hll == null) {
+ hll = new Hll(precision);
+ aggs.put(outputField, hll);
+ }
+
+ if (value instanceof Integer || value instanceof Long) {
+ hll.add(((Number) value).longValue());
+ } else if (value instanceof Float || value instanceof Double) {
+ hll.add(((Number) value).doubleValue());
+ } else if (value instanceof String) {
+ hll.add((String) value);
+ } else if (value instanceof byte[]) {
+ hll.add((byte[]) value);
+ } else {
+ throw new IllegalArgumentException("Unsupported type " + value.getClass());
+ }
+ }
+
+ protected void updateHllUnion(Accumulator acc, Object value) {
+ Map<String, Object> aggs = acc.getMetricsFields();
+ HllUnion hllUnion = (HllUnion) aggs.get(outputField);
+ if (hllUnion == null) {
+ hllUnion = new HllUnion(precision);
+ aggs.put(outputField, hllUnion);
+ }
+
+ Hll hll = HllUtils.deserializeHll(value);
+ hllUnion.update(hll);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java
index 012c2df..de89e2c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java
@@ -34,12 +34,12 @@ public class JsonUnroll implements TableFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if(udfContext.getLookupFields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ if(udfContext.getOutputFields()!=null && !udfContext.getOutputFields().isEmpty()) {
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
else {
outputFieldName = lookupFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java
index e6514a2..e5732e0 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java
@@ -1,118 +1,118 @@
-package com.geedgenetworks.core.udf.udtf;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.TableFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Preconditions;
-
-import java.util.*;
-
-public class PathUnroll implements TableFunction {
- private String pathField;
- private String fileField;
- private char separator;
-
- private String outputPathField;
- private String outputFileField;
- private List<Event> events;
-
- @Override
- public void open(RuntimeContext runtimeContext, UDFContext c) {
- Preconditions.checkArgument(c.getLookup_fields().size() >= 1, "input fields requested one path param at least");
- Preconditions.checkArgument(CollectionUtils.isEmpty(c.getOutput_fields()) || c.getOutput_fields().size() == c.getOutput_fields().size(), "output fields requested same count param with input fields");
- pathField = c.getLookup_fields().get(0);
- fileField = c.getLookup_fields().size() == 1? null: c.getLookup_fields().get(1);
-
- outputPathField = CollectionUtils.isEmpty(c.getOutput_fields())? pathField : c.getOutput_fields().get(0);
- outputFileField = CollectionUtils.isEmpty(c.getOutput_fields()) || c.getLookup_fields().size() == 1 ? fileField : c.getOutput_fields().get(1);
- Map<String, Object> params = c.getParameters() == null? Collections.EMPTY_MAP:c.getParameters();
- String sep = params.getOrDefault("separator", "/").toString();
- Preconditions.checkArgument(sep.length() == 1, "separator mush has one char");
- separator = sep.charAt(0);
- events = new ArrayList<>();
- }
-
- @Override
- public List<Event> evaluate(Event event) {
- Map<String, Object> map = event.getExtractedFields();
- String p = (String) map.get(pathField);
- // 去除path结尾的分隔符
- final String path = StringUtils.isBlank(p)? null: (separator != p.charAt(p.length() - 1) ? p: p.substring(0, p.length() - 1));
- final String fileName = fileField == null? null: (String) map.get(fileField);
-
- if (StringUtils.isBlank(path)) {
- return Collections.emptyList();
- }
-
- if(events.size() > 100){
- events = new ArrayList<>();
- }else if(events.size() > 0){
- events.clear();
- }
- Event e;
- Map<String, Object> fields;
-
- // 拆分path
- int index = path.indexOf(separator);
- String subPath;
- while (index > 0) {
- subPath = path.substring(0, index);
- e = new Event();
- fields = new HashMap<>(map);
- fields.put(outputPathField, subPath);
- if(outputFileField != null){
- fields.put(outputFileField, null);
- }
- e.setExtractedFields(fields);
- events.add(e);
- index = path.indexOf(separator, index + 1);
- }
- boolean hasFile = StringUtils.isNotBlank(fileName);
- boolean pathContainsFile = hasFile && path.endsWith(fileName);
-
- if(!hasFile){
- e = new Event();
- fields = new HashMap<>(map);
- fields.put(outputPathField, path);
- if(outputFileField != null){
- fields.put(outputFileField, null);
- }
- e.setExtractedFields(fields);
- events.add(e);
- }else{
- e = new Event();
- fields = new HashMap<>(map);
- fields.put(outputPathField, path);
- if(outputFileField != null){
- fields.put(outputFileField, pathContainsFile? fileName:null);
- }
- e.setExtractedFields(fields);
- events.add(e);
-
- // 输出path + file
- if(!pathContainsFile){
- e = new Event();
- fields = new HashMap<>(map);
- fields.put(outputPathField, path + separator + fileName);
- if(outputFileField != null){
- fields.put(outputFileField, fileName);
- }
- e.setExtractedFields(fields);
- events.add(e);
- }
- }
-
- return events;
- }
-
- @Override
- public void close() {}
-
- @Override
- public String functionName() {
- return "PATH_UNROLL";
- }
-}
+package com.geedgenetworks.core.udf.udtf;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.TableFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+import java.util.*;
+
+public class PathUnroll implements TableFunction {
+ private String pathField;
+ private String fileField;
+ private char separator;
+
+ private String outputPathField;
+ private String outputFileField;
+ private List<Event> events;
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext c) {
+ Preconditions.checkArgument(c.getLookupFields().size() >= 1, "input fields requested one path param at least");
+ Preconditions.checkArgument(CollectionUtils.isEmpty(c.getOutputFields()) || c.getOutputFields().size() == c.getOutputFields().size(), "output fields requested same count param with input fields");
+ pathField = c.getLookupFields().get(0);
+ fileField = c.getLookupFields().size() == 1? null: c.getLookupFields().get(1);
+
+ outputPathField = CollectionUtils.isEmpty(c.getOutputFields())? pathField : c.getOutputFields().get(0);
+ outputFileField = CollectionUtils.isEmpty(c.getOutputFields()) || c.getLookupFields().size() == 1 ? fileField : c.getOutputFields().get(1);
+ Map<String, Object> params = c.getParameters() == null? Collections.EMPTY_MAP:c.getParameters();
+ String sep = params.getOrDefault("separator", "/").toString();
+ Preconditions.checkArgument(sep.length() == 1, "separator mush has one char");
+ separator = sep.charAt(0);
+ events = new ArrayList<>();
+ }
+
+ @Override
+ public List<Event> evaluate(Event event) {
+ Map<String, Object> map = event.getExtractedFields();
+ String p = (String) map.get(pathField);
+ // 去除path结尾的分隔符
+ final String path = StringUtils.isBlank(p)? null: (separator != p.charAt(p.length() - 1) ? p: p.substring(0, p.length() - 1));
+ final String fileName = fileField == null? null: (String) map.get(fileField);
+
+ if (StringUtils.isBlank(path)) {
+ return Collections.emptyList();
+ }
+
+ if(events.size() > 100){
+ events = new ArrayList<>();
+ }else if(events.size() > 0){
+ events.clear();
+ }
+ Event e;
+ Map<String, Object> fields;
+
+ // 拆分path
+ int index = path.indexOf(separator);
+ String subPath;
+ while (index > 0) {
+ subPath = path.substring(0, index);
+ e = new Event();
+ fields = new HashMap<>(map);
+ fields.put(outputPathField, subPath);
+ if(outputFileField != null){
+ fields.put(outputFileField, null);
+ }
+ e.setExtractedFields(fields);
+ events.add(e);
+ index = path.indexOf(separator, index + 1);
+ }
+ boolean hasFile = StringUtils.isNotBlank(fileName);
+ boolean pathContainsFile = hasFile && path.endsWith(fileName);
+
+ if(!hasFile){
+ e = new Event();
+ fields = new HashMap<>(map);
+ fields.put(outputPathField, path);
+ if(outputFileField != null){
+ fields.put(outputFileField, null);
+ }
+ e.setExtractedFields(fields);
+ events.add(e);
+ }else{
+ e = new Event();
+ fields = new HashMap<>(map);
+ fields.put(outputPathField, path);
+ if(outputFileField != null){
+ fields.put(outputFileField, pathContainsFile? fileName:null);
+ }
+ e.setExtractedFields(fields);
+ events.add(e);
+
+ // 输出path + file
+ if(!pathContainsFile){
+ e = new Event();
+ fields = new HashMap<>(map);
+ fields.put(outputPathField, path + separator + fileName);
+ if(outputFileField != null){
+ fields.put(outputFileField, fileName);
+ }
+ e.setExtractedFields(fields);
+ events.add(e);
+ }
+ }
+
+ return events;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public String functionName() {
+ return "PATH_UNROLL";
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java
index 9fd5a6e..ff4a9ae 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java
@@ -21,12 +21,12 @@ public class Unroll implements TableFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null ){
+ if(udfContext.getLookupFields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookupFields().get(0);
+ if(udfContext.getOutputFields()!=null && !udfContext.getOutputFields().isEmpty()) {
+ this.outputFieldName = udfContext.getOutputFields().get(0);
}
else {
outputFieldName = lookupFieldName;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java
index 1ce65bc..4e9a031 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java
@@ -17,13 +17,13 @@ public class UUID implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getOutput_fields()==null ){
+ if(udfContext.getOutputFields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
this.randomBasedGenerator = Generators.randomBasedGenerator();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java
index b4ad808..3a433b8 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java
@@ -21,18 +21,18 @@ public class UUIDv5 implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getOutput_fields() == null || udfContext.getParameters() == null || udfContext.getLookup_fields() == null){
+ if(udfContext.getOutputFields() == null || udfContext.getParameters() == null || udfContext.getLookupFields() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
if(!udfContext.getParameters().containsKey(NAMESPACE_KEY) ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contain key: " + NAMESPACE_KEY);
}
- this.outputFieldName = udfContext.getOutput_fields().get(0);
- this.lookupFieldNames = udfContext.getLookup_fields();
+ this.outputFieldName = udfContext.getOutputFields().get(0);
+ this.lookupFieldNames = udfContext.getLookupFields();
String namespace = udfContext.getParameters().get(NAMESPACE_KEY).toString();
this.nameBasedGenerator = Generators.nameBasedGenerator(UUIDNameSpace.getUUID(namespace));
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java
index 49025ef..60c388f 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java
@@ -18,13 +18,13 @@ public class UUIDv7 implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getOutput_fields()==null ){
+ if(udfContext.getOutputFields()==null ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
- if(udfContext.getOutput_fields().size() != 1){
+ if(udfContext.getOutputFields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.outputFieldName = udfContext.getOutputFields().get(0);
this.timeBasedEpochRandomGenerator = Generators.timeBasedEpochGenerator();
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AnonymityLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AnonymityLookupTest.java
index a52deb1..ae74fea 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AnonymityLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AnonymityLookupTest.java
@@ -37,8 +37,8 @@ class AnonymityLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "IP_TO_NODE_TYPE");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("server_node_type"));
+ udfContext.setLookupFields(Collections.singletonList("server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("server_node_type"));
anonymityLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -56,8 +56,8 @@ class AnonymityLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "DOMAIN_TO_NODE_TYPE");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_node_type"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_node_type"));
anonymityLookup.open(runtimeContext, udfContext);
Event event = new Event();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AppCategoryLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AppCategoryLookupTest.java
index f74ce39..713be3f 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AppCategoryLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AppCategoryLookupTest.java
@@ -30,7 +30,7 @@ class AppCategoryLookupTest {
fieldMapping.put("COMPANY_CATEGORY", "app_company_category");
parameters.put("field_mapping", fieldMapping);
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("app"));
+ udfContext.setLookupFields(Collections.singletonList("app"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java
index db52b3e..43b0bd5 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/BaseStationLookupTest.java
@@ -38,8 +38,8 @@ class BaseStationLookupTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("kb_name", kbName);
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Arrays.asList("cell_id"));
- udfContext.setOutput_fields(Arrays.asList("subscriber_longitude", "subscriber_latitude"));
+ udfContext.setLookupFields(Arrays.asList("cell_id"));
+ udfContext.setOutputFields(Arrays.asList("subscriber_longitude", "subscriber_latitude"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookupTest.java
index 7f526d5..d1cca09 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookupTest.java
@@ -25,8 +25,8 @@ class DnsServerInfoLookupTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("kb_name", kbName);
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("server_dns_server"));
+ udfContext.setLookupFields(Collections.singletonList("server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("server_dns_server"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookupTest.java
index db15642..0e64982 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookupTest.java
@@ -29,7 +29,7 @@ class FqdnCategoryLookupTest {
fieldMapping.put("REPUTATION_LEVEL", "domain_reputation_level");
parameters.put("field_mapping", fieldMapping);
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookupTest.java
index 42a98dc..93ee663 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookupTest.java
@@ -24,8 +24,8 @@ class FqdnWhoisLookupTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("kb_name", kbName);
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_whois_org"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_whois_org"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java
index 641f58a..a7b98ab 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/H3CellLookupTest.java
@@ -36,8 +36,8 @@ public class H3CellLookupTest {
* resolution: 9
*/
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(Arrays.asList("subscriber_longitude", "subscriber_latitude"));
- udfContext.setOutput_fields(Arrays.asList("first_location"));
+ udfContext.setLookupFields(Arrays.asList("subscriber_longitude", "subscriber_latitude"));
+ udfContext.setOutputFields(Arrays.asList("first_location"));
Map<String, Object> parameters = new HashMap<>();
parameters.put("resolution", 9);
udfContext.setParameters(parameters);
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IcpLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IcpLookupTest.java
index 3158124..c2032e0 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IcpLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IcpLookupTest.java
@@ -24,8 +24,8 @@ class IcpLookupTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("kb_name", kbName);
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_icp_company_name"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_icp_company_name"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IdcRenterLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IdcRenterLookupTest.java
index b15096b..7409a2f 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IdcRenterLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IdcRenterLookupTest.java
@@ -24,8 +24,8 @@ class IdcRenterLookupTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("kb_name", kbName);
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("server_idc_renter"));
+ udfContext.setLookupFields(Collections.singletonList("server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("server_idc_renter"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
index 9275812..316e4d1 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
@@ -40,8 +40,8 @@ public class IntelligenceIndicatorLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "IP_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("server_ip_tags"));
+ udfContext.setLookupFields(Collections.singletonList("server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("server_ip_tags"));
intelligenceIndicatorLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -59,8 +59,8 @@ public class IntelligenceIndicatorLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "IP_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("server_ip_tags"));
+ udfContext.setLookupFields(Collections.singletonList("server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("server_ip_tags"));
intelligenceIndicatorLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -82,8 +82,8 @@ public class IntelligenceIndicatorLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "DOMAIN_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_tags"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_tags"));
intelligenceIndicatorLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -101,8 +101,8 @@ public class IntelligenceIndicatorLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "DOMAIN_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_tags"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_tags"));
intelligenceIndicatorLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -124,8 +124,8 @@ public class IntelligenceIndicatorLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "DOMAIN_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_tags"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_tags"));
intelligenceIndicatorLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -143,8 +143,8 @@ public class IntelligenceIndicatorLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "IP_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("server_ip_tags"));
+ udfContext.setLookupFields(Collections.singletonList("server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("server_ip_tags"));
intelligenceIndicatorLookup.open(runtimeContext, udfContext);
Event event = new Event();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IocLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IocLookupTest.java
index f9d3b25..8c01bc7 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IocLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IocLookupTest.java
@@ -37,8 +37,8 @@ class IocLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "IP_TO_MALWARE");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("server_malware"));
+ udfContext.setLookupFields(Collections.singletonList("server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("server_malware"));
iocLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -56,8 +56,8 @@ class IocLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "DOMAIN_TO_MALWARE");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_malware"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_malware"));
iocLookup.open(runtimeContext, udfContext);
Event event = new Event();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IpZoneLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IpZoneLookupTest.java
index abe5ba0..e3024b5 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IpZoneLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IpZoneLookupTest.java
@@ -24,8 +24,8 @@ class IpZoneLookupTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("kb_name", kbName);
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("client_ip"));
- udfContext.setOutput_fields(Collections.singletonList("client_zone"));
+ udfContext.setLookupFields(Collections.singletonList("client_ip"));
+ udfContext.setOutputFields(Collections.singletonList("client_zone"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookupTest.java
index c0a06fe..4f2f551 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookupTest.java
@@ -24,8 +24,8 @@ class LinkDirectionLookupTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("kb_name", kbName);
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("in_link_id"));
- udfContext.setOutput_fields(Collections.singletonList("in_link_direction"));
+ udfContext.setLookupFields(Collections.singletonList("in_link_id"));
+ udfContext.setOutputFields(Collections.singletonList("in_link_direction"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookupTest.java
index 9688199..a586aeb 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookupTest.java
@@ -37,8 +37,8 @@ class UserDefineTagLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "IP_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("client_ip"));
- udfContext.setOutput_fields(Collections.singletonList("client_ip_tags"));
+ udfContext.setLookupFields(Collections.singletonList("client_ip"));
+ udfContext.setOutputFields(Collections.singletonList("client_ip_tags"));
userDefineTagLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -59,8 +59,8 @@ class UserDefineTagLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "DOMAIN_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_tags"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_tags"));
userDefineTagLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -81,8 +81,8 @@ class UserDefineTagLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "DOMAIN_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_tags"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_tags"));
userDefineTagLookup.open(runtimeContext, udfContext);
Event event = new Event();
@@ -103,8 +103,8 @@ class UserDefineTagLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "APP_TO_TAG");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("app"));
- udfContext.setOutput_fields(Collections.singletonList("app_tags"));
+ udfContext.setLookupFields(Collections.singletonList("app"));
+ udfContext.setOutputFields(Collections.singletonList("app_tags"));
userDefineTagLookup.open(runtimeContext, udfContext);
Event event = new Event();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/VpnLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/VpnLookupTest.java
index 3dd0992..50374f7 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/VpnLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/VpnLookupTest.java
@@ -30,8 +30,8 @@ class VpnLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "IP_TO_VPN");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("server_vpn_service_name"));
+ udfContext.setLookupFields(Collections.singletonList("server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("server_vpn_service_name"));
RuntimeContext runtimeContext = mockRuntimeContext();
@@ -55,8 +55,8 @@ class VpnLookupTest {
parameters.put("kb_name", kbName);
parameters.put("option", "DOMAIN_TO_VPN");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain_vpn_service_name"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain_vpn_service_name"));
RuntimeContext runtimeContext = mockRuntimeContext();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
index 1e5c0a3..f10fe2b 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
@@ -26,8 +26,8 @@ public class AsnLookupFunctionTest {
udfContext = new UDFContext();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("ip"));
- udfContext.setOutput_fields(Collections.singletonList("asn"));
+ udfContext.setLookupFields(Collections.singletonList("ip"));
+ udfContext.setOutputFields(Collections.singletonList("asn"));
}
@@ -36,7 +36,7 @@ public class AsnLookupFunctionTest {
@Test
public void testInit(){
AsnLookup asnLookup = new AsnLookup();
- udfContext.setLookup_fields(new ArrayList<>());
+ udfContext.setLookupFields(new ArrayList<>());
udfContext.setParameters(new HashMap<>());
udfContext.setParameters(null);
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
@@ -48,10 +48,10 @@ public class AsnLookupFunctionTest {
asnLookup.open(null, udfContext);
});
- udfContext.setLookup_fields(new ArrayList<>());
- udfContext.getLookup_fields().add("v1");
- udfContext.setOutput_fields(new ArrayList<>());
- udfContext.getOutput_fields().add("v2");
+ udfContext.setLookupFields(new ArrayList<>());
+ udfContext.getLookupFields().add("v1");
+ udfContext.setOutputFields(new ArrayList<>());
+ udfContext.getOutputFields().add("v2");
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("option","other");
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
index bc67f1a..83f8ab4 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
@@ -25,14 +25,14 @@ public class GeoIpLookupFunctionTest {
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- udfContext.setLookup_fields(Collections.singletonList("ip"));
- udfContext.setOutput_fields(Collections.singletonList("iplocation"));
+ udfContext.setLookupFields(Collections.singletonList("ip"));
+ udfContext.setOutputFields(Collections.singletonList("iplocation"));
}
@Test
public void testInit(){
GeoIpLookup geoIpLookup = new GeoIpLookup();
- udfContext.setLookup_fields(new ArrayList<>());
+ udfContext.setLookupFields(new ArrayList<>());
udfContext.setParameters(new HashMap<>());
udfContext.setParameters(null);
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
@@ -44,10 +44,10 @@ public class GeoIpLookupFunctionTest {
geoIpLookup.open(null, udfContext);
});
- udfContext.setLookup_fields(new ArrayList<>());
- udfContext.getLookup_fields().add("v1");
- udfContext.setOutput_fields(new ArrayList<>());
- udfContext.getOutput_fields().add("v2");
+ udfContext.setLookupFields(new ArrayList<>());
+ udfContext.getLookupFields().add("v1");
+ udfContext.setOutputFields(new ArrayList<>());
+ udfContext.getOutputFields().add("v2");
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("option","other");
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
index cc96993..16d5cce 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectListTest.java
@@ -30,8 +30,8 @@ public class CollectListTest {
private void testMerge(List<String> arr,List<String> arr2) {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_list"));
CollectList collectList = new CollectList();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
@@ -68,8 +68,8 @@ public class CollectListTest {
private void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_list"));
CollectList collectList = new CollectList();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
index cf897f6..8909794 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/CollectSetTest.java
@@ -28,8 +28,8 @@ public class CollectSetTest {
private void testMerge(List<String> arr,List<String> arr2) {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_list"));
CollectSet collectSet = new CollectSet();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
@@ -65,8 +65,8 @@ public class CollectSetTest {
private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_list"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_list"));
CollectSet collectSet = new CollectSet();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
index d114a09..0acf1d5 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/FirstValueTest.java
@@ -27,8 +27,8 @@ public class FirstValueTest {
private void testMerge(List<String> arr,List<String> arr2) {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_first"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_first"));
FirstValue firstValue = new FirstValue();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
@@ -63,8 +63,8 @@ public class FirstValueTest {
private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_first"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_first"));
FirstValue firstValue = new FirstValue();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
index 3f5432f..b2c9ceb 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LastValueTest.java
@@ -30,8 +30,8 @@ public class LastValueTest {
private void testMerge(List<String> arr,List<String> arr2) {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_last"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_last"));
LastValue lastValue = new LastValue();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
@@ -66,8 +66,8 @@ public class LastValueTest {
private static void testGetResult(List<String> arr) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_last"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_last"));
LastValue lastValue = new LastValue();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
index e747c42..54d9dba 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/LongCountTest.java
@@ -31,8 +31,8 @@ public class LongCountTest {
private void testMerge(Number[] arr,Number[] arr2) {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("count"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("count"));
LongCount longCount = new LongCount();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
@@ -66,7 +66,7 @@ public class LongCountTest {
private static void testGetResult(Number[] arr) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setOutput_fields(Collections.singletonList("count"));
+ udfContext.setOutputFields(Collections.singletonList("count"));
LongCount longCount = new LongCount();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java
index 67fec93..311d51f 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MaxTest.java
@@ -31,8 +31,8 @@ public class MaxTest {
// 初始化上下文
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("value"));
- udfContext.setOutput_fields(List.of("maxValue"));
+ udfContext.setLookupFields(List.of("value"));
+ udfContext.setOutputFields(List.of("maxValue"));
maxFunction.open(udfContext);
// 初始化累加器的 metricsFields
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
index 8255cb1..62efc0a 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MeanTest.java
@@ -31,8 +31,8 @@ public class MeanTest {
}
private void testMerge(Number[] arr1,Number[] arr2,int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_mean"));
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("precision", precision);
Mean mean = new Mean();
@@ -47,8 +47,8 @@ public class MeanTest {
}
private Accumulator getMiddleResult(Number[] arr,int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_mean"));
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("precision", precision);
Mean mean = new Mean();
@@ -72,8 +72,8 @@ public class MeanTest {
private void testInt(Number[] arr,int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_mean"));
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("precision", precision);
Mean mean = new Mean();
@@ -98,8 +98,8 @@ public class MeanTest {
private void testDouble(Number[] arr,int precision) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_mean"));
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("precision", precision);
Mean mean = new Mean();
@@ -123,8 +123,8 @@ public class MeanTest {
private void testNoPrecision(Number[] arr) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_mean"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_mean"));
Mean mean = new Mean();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java
index ee63137..e5a1615 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/MinTest.java
@@ -26,8 +26,8 @@ public class MinTest {
void setUp() {
minFunction = new Min();
udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("value"));
- udfContext.setOutput_fields(List.of("minValue"));
+ udfContext.setLookupFields(List.of("value"));
+ udfContext.setOutputFields(List.of("minValue"));
minFunction.open(udfContext);
acc = new Accumulator();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
index a4072ca..7ccb365 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/aggregate/NumberSumTest.java
@@ -48,8 +48,8 @@ public class NumberSumTest {
private void testMerge(Number[] arr,Number[] arr2) {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_sum"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_sum"));
NumberSum numberSum = new NumberSum();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
@@ -83,8 +83,8 @@ public class NumberSumTest {
private static void excute(Number[] arr, Class<? extends Number> clazz) throws ParseException {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("field"));
- udfContext.setOutput_fields(Collections.singletonList("field_sum"));
+ udfContext.setLookupFields(List.of("field"));
+ udfContext.setOutputFields(Collections.singletonList("field_sum"));
NumberSum numberSum = new NumberSum();
Map<String, Object> metricsFields = new HashMap<>();
Accumulator accumulator = new Accumulator();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DecodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DecodeBase64FunctionTest.java
index 1d3b863..a5f31f7 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DecodeBase64FunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DecodeBase64FunctionTest.java
@@ -22,8 +22,8 @@ public class DecodeBase64FunctionTest {
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- udfContext.setLookup_fields(Arrays.asList("message", "charset"));
- udfContext.setOutput_fields(Collections.singletonList("decodeResult"));
+ udfContext.setLookupFields(Arrays.asList("message", "charset"));
+ udfContext.setOutputFields(Collections.singletonList("decodeResult"));
Map<String,Object> map = new HashMap<>();
map.put("value_field","message");
map.put("charset_field","charset");
@@ -51,8 +51,8 @@ public class DecodeBase64FunctionTest {
DecodeBase64 decodeBase64 = new DecodeBase64();
- udfContext.setLookup_fields(Collections.singletonList("message"));
- udfContext.setOutput_fields(Collections.singletonList("decodeResult"));
+ udfContext.setLookupFields(Collections.singletonList("message"));
+ udfContext.setOutputFields(Collections.singletonList("decodeResult"));
udfContext.getParameters().remove("value_field");
assertThrows(GrootStreamRuntimeException.class,
() -> {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
index 2126117..f8076cc 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
@@ -24,15 +24,15 @@ public class DomainFunctionTest {
udfContext = new UDFContext();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain1"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain1"));
}
@Test
public void testInit(){
Domain domain = new Domain();
- udfContext.setLookup_fields(new ArrayList<>());
+ udfContext.setLookupFields(new ArrayList<>());
udfContext.setParameters(new HashMap<>());
udfContext.setParameters(null);
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
@@ -44,10 +44,10 @@ public class DomainFunctionTest {
domain.open(null, udfContext);
});
- udfContext.setLookup_fields(new ArrayList<>());
- udfContext.getLookup_fields().add("v1");
- udfContext.setOutput_fields(new ArrayList<>());
- udfContext.getOutput_fields().add("v2");
+ udfContext.setLookupFields(new ArrayList<>());
+ udfContext.getLookupFields().add("v1");
+ udfContext.setOutputFields(new ArrayList<>());
+ udfContext.getOutputFields().add("v2");
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("option","other");
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
@@ -59,16 +59,16 @@ public class DomainFunctionTest {
@Test
public void testDomainFunctionTopLevelDomain() {
parameters.put("option", "TOP_LEVEL_DOMAIN");
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain1"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain1"));
Domain domain = new Domain();
domain.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("domain", "www.baidu.com");
+ extractedFields.put("domain", "http://www.baidu.com.cn");
event.setExtractedFields(extractedFields);
Event result1 = domain.evaluate(event);
- assertEquals("com", result1.getExtractedFields().get("domain1"));
+ assertEquals("com.cn", result1.getExtractedFields().get("domain1"));
}
@Test
@@ -76,8 +76,8 @@ public class DomainFunctionTest {
parameters.put("option", "FIRST_SIGNIFICANT_SUBDOMAIN");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("domain"));
- udfContext.setOutput_fields(Collections.singletonList("domain1"));
+ udfContext.setLookupFields(Collections.singletonList("domain"));
+ udfContext.setOutputFields(Collections.singletonList("domain1"));
Domain domain = new Domain();
domain.open(null, udfContext);
Event event = new Event();
@@ -87,4 +87,5 @@ public class DomainFunctionTest {
Event result1 = domain.evaluate(event);
assertEquals("baidu.com", result1.getExtractedFields().get("domain1"));
}
+
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java
index 294a492..027533e 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DropFunctionTest.java
@@ -16,6 +16,7 @@ public class DropFunctionTest {
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
+ udfContext.setFilter("true");
udfContext.setParameters(new HashMap<>());
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java
index adda7c5..2bd6705 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java
@@ -25,8 +25,8 @@ public class EncodeBase64FunctionTest {
@Test
public void testEncodeBase64FunctionForByte() {
udfContext = new UDFContext();
- udfContext.setOutput_fields(Collections.singletonList("encodeResult"));
- udfContext.setLookup_fields(Collections.singletonList("name"));
+ udfContext.setOutputFields(Collections.singletonList("encodeResult"));
+ udfContext.setLookupFields(Collections.singletonList("name"));
Map<String,Object> map = new HashMap<>();
map.put("input_type","byte_array");
udfContext.setParameters(map);
@@ -47,8 +47,8 @@ public class EncodeBase64FunctionTest {
public void testEncodeBase64FunctionForString() {
udfContext = new UDFContext();
- udfContext.setOutput_fields(Collections.singletonList("encodeResult"));
- udfContext.setLookup_fields(Collections.singletonList("name"));
+ udfContext.setOutputFields(Collections.singletonList("encodeResult"));
+ udfContext.setLookupFields(Collections.singletonList("name"));
Map<String,Object> map = new HashMap<>();
map.put("input_type","string");
udfContext.setParameters(map);
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
index d2091f7..e9f1698 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
@@ -42,8 +42,8 @@ public class EncryptFunctionTest {
public static void setUp() throws IOException {
Security.addProvider(new BouncyCastleProvider());
udfContext = new UDFContext();
- udfContext.setLookup_fields(Collections.singletonList("phone_number"));
- udfContext.setOutput_fields(Collections.singletonList("phone_number"));
+ udfContext.setLookupFields(Collections.singletonList("phone_number"));
+ udfContext.setOutputFields(Collections.singletonList("phone_number"));
httpClientPoolUtilMockedStatic = mockSensitiveFields();
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
index 61c3975..e829b1d 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
@@ -25,7 +25,7 @@ public class FlattenFunctionTest {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("parent1", "parent2", "parent3", "parent4","parent5","parent6","parent7","parent8"));
+ udfContext.setLookupFields(List.of("parent1", "parent2", "parent3", "parent4","parent5","parent6","parent7","parent8"));
Map<String, Object> params = new HashMap<>();
params.put("prefix", "prefix");
params.put("depth", "4");
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java
index d9f4538..6cb1bf8 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java
@@ -19,8 +19,8 @@ public class FromUnixTimestampTest {
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- udfContext.setLookup_fields(Arrays.asList("unixTimestamp"));
- udfContext.setOutput_fields(Arrays.asList("timestamp"));
+ udfContext.setLookupFields(Arrays.asList("unixTimestamp"));
+ udfContext.setOutputFields(Arrays.asList("timestamp"));
}
@Test
public void testFromUnixTimestampMsFunction() throws Exception {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/GenerateStringArrayFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/GenerateStringArrayFunctionTest.java
index 5490299..1fbe06c 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/GenerateStringArrayFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/GenerateStringArrayFunctionTest.java
@@ -17,8 +17,8 @@ public class GenerateStringArrayFunctionTest {
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- udfContext.setLookup_fields(Arrays.asList("t1", "t2","t3","t4","t5"));
- udfContext.setOutput_fields(Collections.singletonList("result_list"));
+ udfContext.setLookupFields(Arrays.asList("t1", "t2","t3","t4","t5"));
+ udfContext.setOutputFields(Collections.singletonList("result_list"));
}
// 测试方法
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java
index d2219d8..5a4d0d3 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java
@@ -23,8 +23,8 @@ public class HmacFunctionTest {
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- udfContext.setLookup_fields(Collections.singletonList("phone_number"));
- udfContext.setOutput_fields(Collections.singletonList("phone_number_mac"));
+ udfContext.setLookupFields(Collections.singletonList("phone_number"));
+ udfContext.setOutputFields(Collections.singletonList("phone_number_mac"));
}
@Test
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
index 99f3f96..dd661de 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
@@ -25,7 +25,7 @@ public class JsonExtractFunctionTest {
@Test
public void testInit(){
JsonExtract jsonExtract = new JsonExtract();
- udfContext.setLookup_fields(new ArrayList<>());
+ udfContext.setLookupFields(new ArrayList<>());
udfContext.setParameters(new HashMap<>());
udfContext.setParameters(null);
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
@@ -52,8 +52,8 @@ public class JsonExtractFunctionTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("value_expression","$.tags[?(@.tag=='device_group')][0].value");
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Collections.singletonList("device_tag"));
- udfContext.setOutput_fields(Collections.singletonList("device_group"));
+ udfContext.setLookupFields(Collections.singletonList("device_tag"));
+ udfContext.setOutputFields(Collections.singletonList("device_group"));
jsonExtract.open(null, udfContext);
Event event = new Event();
String jsonString = "{\"device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-tsgx\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgx\\\"}]}\"}";
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/StringJoinerFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/StringJoinerFunctionTest.java
index d80eb97..e9cde8a 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/StringJoinerFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/StringJoinerFunctionTest.java
@@ -20,8 +20,8 @@ public class StringJoinerFunctionTest {
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- udfContext.setLookup_fields(Arrays.asList("server_ip", "client_ip"));
- udfContext.setOutput_fields(Collections.singletonList("ip_string"));
+ udfContext.setLookupFields(Arrays.asList("server_ip", "client_ip"));
+ udfContext.setOutputFields(Collections.singletonList("ip_string"));
Map<String, Object> params = new HashMap<>();
params.put("separator",",");
params.put("prefix","[");
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
index ef79d51..534569b 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
@@ -26,8 +26,8 @@ public class UUIDTest {
UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("client_ip","server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("uuid"));
+ udfContext.setLookupFields(List.of("client_ip","server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_IP_1");
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
uuidv5.open(null, udfContext);
@@ -41,7 +41,7 @@ public class UUIDTest {
UUID uuid = new UUID();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setOutput_fields(Collections.singletonList("uuid"));
+ udfContext.setOutputFields(Collections.singletonList("uuid"));
uuid.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
@@ -55,7 +55,7 @@ public class UUIDTest {
UUIDv7 uuid = new UUIDv7();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setOutput_fields(Collections.singletonList("uuid"));
+ udfContext.setOutputFields(Collections.singletonList("uuid"));
uuid.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
@@ -69,8 +69,8 @@ public class UUIDTest {
UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("client_ip", "server_ip"));
- udfContext.setOutput_fields(Collections.singletonList("uuid"));
+ udfContext.setLookupFields(List.of("client_ip", "server_ip"));
+ udfContext.setOutputFields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_IP");
uuidv5.open(null, udfContext);
Event event = new Event();
@@ -90,8 +90,8 @@ public class UUIDTest {
UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("domain"));
- udfContext.setOutput_fields(Collections.singletonList("uuid"));
+ udfContext.setLookupFields(List.of("domain"));
+ udfContext.setOutputFields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_DOMAIN");
uuidv5.open(null, udfContext);
Event event = new Event();
@@ -107,8 +107,8 @@ public class UUIDTest {
UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("app"));
- udfContext.setOutput_fields(Collections.singletonList("uuid"));
+ udfContext.setLookupFields(List.of("app"));
+ udfContext.setOutputFields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_APP");
uuidv5.open(null, udfContext);
Event event = new Event();
@@ -125,8 +125,8 @@ public class UUIDTest {
UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("subscriber_id"));
- udfContext.setOutput_fields(Collections.singletonList("uuid"));
+ udfContext.setLookupFields(List.of("subscriber_id"));
+ udfContext.setOutputFields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_SUBSCRIBER");
uuidv5.open(null, udfContext);
Event event = new Event();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
index 12a2093..a0d70d7 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
@@ -19,8 +19,8 @@ public class UnixTimestampConverterTest {
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- udfContext.setLookup_fields(Arrays.asList("input"));
- udfContext.setOutput_fields(Arrays.asList("output"));
+ udfContext.setLookupFields(Arrays.asList("input"));
+ udfContext.setOutputFields(Arrays.asList("output"));
}
@Test
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java
index 02f0b66..3749eb1 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java
@@ -52,8 +52,8 @@ public class JsonUnrollFunctionTest {
JsonUnroll unroll = new JsonUnroll();
Event event = new Event();
event.setExtractedFields(nestedMap);
- udfContext.setLookup_fields(List.of("k1"));
- udfContext.setOutput_fields(List.of("newk1"));
+ udfContext.setLookupFields(List.of("k1"));
+ udfContext.setOutputFields(List.of("newk1"));
unroll.open(null, udfContext);
List<Event> result3 = unroll.evaluate(event);
assertEquals(2, result3.size());
@@ -75,8 +75,8 @@ public class JsonUnrollFunctionTest {
udfContext.setParameters(params);
params.put("path", "$.k3_1.k3_1_1");
event.setExtractedFields(nestedMap);
- udfContext.setLookup_fields(List.of("k3"));
- udfContext.setOutput_fields(List.of("newk3"));
+ udfContext.setLookupFields(List.of("k3"));
+ udfContext.setOutputFields(List.of("newk3"));
unroll.open(null, udfContext);
List<Event> result2 = unroll.evaluate(event);
assertEquals(2, result2.size());
@@ -95,8 +95,8 @@ public class JsonUnrollFunctionTest {
udfContext.setParameters(params);
params.put("path", "$.k4_1.k4_1_1");
event.setExtractedFields(nestedMap);
- udfContext.setLookup_fields(List.of("k4"));
- udfContext.setOutput_fields(List.of("newk4"));
+ udfContext.setLookupFields(List.of("k4"));
+ udfContext.setOutputFields(List.of("newk4"));
unroll.open(null, udfContext);
List<Event> result2 = unroll.evaluate(event);
assertEquals(1, result2.size());
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java
index 2f4da76..db66e55 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java
@@ -34,8 +34,8 @@ public class UnrollFunctionTest {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("k1"));
- udfContext.setOutput_fields(List.of("newk1"));
+ udfContext.setLookupFields(List.of("k1"));
+ udfContext.setOutputFields(List.of("newk1"));
Unroll unroll = new Unroll();
unroll.open(null, udfContext);
Event event = new Event();
@@ -60,8 +60,8 @@ public class UnrollFunctionTest {
udfContext.setParameters(params);
udfContext.setParameters(params);
event.setExtractedFields(nestedMap);
- udfContext.setLookup_fields(List.of("k2"));
- udfContext.setOutput_fields(List.of("k2"));
+ udfContext.setLookupFields(List.of("k2"));
+ udfContext.setOutputFields(List.of("k2"));
unroll.open(null, udfContext);
List<Event> result2 = unroll.evaluate(event);
assertEquals(3, result2.size());
@@ -76,8 +76,8 @@ public class UnrollFunctionTest {
Unroll unroll = new Unroll();
Event event = new Event();
event.setExtractedFields(nestedMap);
- udfContext.setLookup_fields(List.of("k3"));
- udfContext.setOutput_fields(List.of("newk3"));
+ udfContext.setLookupFields(List.of("k3"));
+ udfContext.setOutputFields(List.of("newk3"));
unroll.open(null, udfContext);
List<Event> result2 = unroll.evaluate(event);
assertEquals(1, result2.size());
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java
index 33f7bad..990186d 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantileTest.java
@@ -1,89 +1,89 @@
-package com.geedgenetworks.core.udf.udaf.HdrHistogram;
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.AggregateFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.sketch.util.StringUtils;
-import org.HdrHistogram.ArrayHistogram;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-public class HdrHistogramQuantileTest {
- AggregateFunction agg;
- Accumulator acc;
- Event event;
-
- @Test
- public void inputRegular() {
- double probability = 0.5;
- initData( "regular", 2, probability);
- long count = 100000;
- Map<String, Object> fields = event.getExtractedFields();
- for (int i = 1; i <= count; i++) {
- fields.put("ms", i);
- agg.add(event, acc);
- }
-
- long expect = (long) (count * probability);
- long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his");
- double error = Math.abs(rst - expect) / (double) expect;
- System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
- assertTrue(error <= 0.05);
- }
-
- @Test
- public void inputSketch() {
- double probability = 0.5;
- initData( "sketch", 2, probability);
- long count = 100000;
- Map<String, Object> fields = event.getExtractedFields();
-
- ArrayHistogram his = new ArrayHistogram(2);
- for (int i = 1; i <= count; i++) {
- his.recordValue(i);
- }
- fields.put("ms", StringUtils.encodeBase64String(his.toBytes()));
- agg.add(event, acc);
-
- his = new ArrayHistogram(2);
- for (int i = 1; i <= count; i++) {
- his.recordValue(i);
- }
- fields.put("ms", his.toBytes());
- agg.add(event, acc);
-
- long expect = (long) (count * probability);
- long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his");
- double error = Math.abs(rst - expect) / (double) expect;
- System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
- assertTrue(error <= 0.05);
- }
-
- private void initData(String input_type, int numberOfSignificantValueDigits, double probability){
- agg = new HdrHistogramQuantile();
- UDFContext c = new UDFContext();
- Map<String, Object> parameters = new HashMap<>();
- parameters.put("input_type", input_type);
- parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits);
- parameters.put("probability", probability);
- c.setParameters(parameters);
- c.setLookup_fields(Collections.singletonList("ms"));
- c.setOutput_fields(Collections.singletonList("ms_his"));
-
- agg.open(c);
- Map<String, Object> map = new HashMap<>();
- acc = new Accumulator();
- acc.setMetricsFields(map);
- agg.initAccumulator(acc);
-
- event = new Event();
- Map<String, Object> fields = new HashMap<>();
- event.setExtractedFields(fields);
- }
+package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.ArrayHistogram;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class HdrHistogramQuantileTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+ @Test
+ public void inputRegular() {
+ double probability = 0.5;
+ initData( "regular", 2, probability);
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 1; i <= count; i++) {
+ fields.put("ms", i);
+ agg.add(event, acc);
+ }
+
+ long expect = (long) (count * probability);
+ long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his");
+ double error = Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ @Test
+ public void inputSketch() {
+ double probability = 0.5;
+ initData( "sketch", 2, probability);
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+
+ ArrayHistogram his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", StringUtils.encodeBase64String(his.toBytes()));
+ agg.add(event, acc);
+
+ his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", his.toBytes());
+ agg.add(event, acc);
+
+ long expect = (long) (count * probability);
+ long rst = (long)agg.getResult(acc).getMetricsFields().get("ms_his");
+ double error = Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ private void initData(String input_type, int numberOfSignificantValueDigits, double probability){
+ agg = new HdrHistogramQuantile();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("input_type", input_type);
+ parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits);
+ parameters.put("probability", probability);
+ c.setParameters(parameters);
+ c.setLookupFields(Collections.singletonList("ms"));
+ c.setOutputFields(Collections.singletonList("ms_his"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java
index 4eefd9a..a57645d 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramQuantilesTest.java
@@ -1,98 +1,98 @@
-package com.geedgenetworks.core.udf.udaf.HdrHistogram;
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.AggregateFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.sketch.util.StringUtils;
-import org.HdrHistogram.ArrayHistogram;
-import org.junit.jupiter.api.Test;
-
-import java.util.*;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class HdrHistogramQuantilesTest {
- AggregateFunction agg;
- Accumulator acc;
- Event event;
-
- @Test
- public void inputRegular() {
- double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
- initData( "regular", 2, probabilities);
- long count = 100000;
- Map<String, Object> fields = event.getExtractedFields();
- for (int i = 1; i <= count; i++) {
- fields.put("ms", i);
- agg.add(event, acc);
- }
-
- long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
-
- List<Long> rsts = (List<Long>)agg.getResult(acc).getMetricsFields().get("ms_his");
- for (int i = 0; i < expects.length; i++) {
- long rst = rsts.get(i);
- long expect = expects[i];
- double probability = probabilities[i];
- double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
- System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
- assertTrue(error <= 0.05);
- }
- }
-
- @Test
- public void inputSketch() {
- double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
- initData( "sketch", 2, probabilities);
- long count = 100000;
- Map<String, Object> fields = event.getExtractedFields();
- long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
-
- ArrayHistogram his = new ArrayHistogram(2);
- for (int i = 1; i <= count; i++) {
- his.recordValue(i);
- }
- fields.put("ms", StringUtils.encodeBase64String(his.toBytes()));
- agg.add(event, acc);
-
- his = new ArrayHistogram(2);
- for (int i = 1; i <= count; i++) {
- his.recordValue(i);
- }
- fields.put("ms", his.toBytes());
- agg.add(event, acc);
-
- List<Long> rsts = (List<Long>)agg.getResult(acc).getMetricsFields().get("ms_his");
- for (int i = 0; i < expects.length; i++) {
- long rst = rsts.get(i);
- long expect = expects[i];
- double probability = probabilities[i];
- double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
- System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
- assertTrue(error <= 0.05);
- }
- }
-
- private void initData(String input_type, int numberOfSignificantValueDigits, double[] probabilities){
- agg = new HdrHistogramQuantiles();
- UDFContext c = new UDFContext();
- Map<String, Object> parameters = new HashMap<>();
- parameters.put("input_type", input_type);
- parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits);
- parameters.put("probabilities", probabilities);
- c.setParameters(parameters);
- c.setLookup_fields(Collections.singletonList("ms"));
- c.setOutput_fields(Collections.singletonList("ms_his"));
-
- agg.open(c);
- Map<String, Object> map = new HashMap<>();
- acc = new Accumulator();
- acc.setMetricsFields(map);
- agg.initAccumulator(acc);
-
- event = new Event();
- Map<String, Object> fields = new HashMap<>();
- event.setExtractedFields(fields);
- }
+package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.ArrayHistogram;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HdrHistogramQuantilesTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+ @Test
+ public void inputRegular() {
+ double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
+ initData( "regular", 2, probabilities);
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 1; i <= count; i++) {
+ fields.put("ms", i);
+ agg.add(event, acc);
+ }
+
+ long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
+
+ List<Long> rsts = (List<Long>)agg.getResult(acc).getMetricsFields().get("ms_his");
+ for (int i = 0; i < expects.length; i++) {
+ long rst = rsts.get(i);
+ long expect = expects[i];
+ double probability = probabilities[i];
+ double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+ }
+
+ @Test
+ public void inputSketch() {
+ double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
+ initData( "sketch", 2, probabilities);
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
+
+ ArrayHistogram his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", StringUtils.encodeBase64String(his.toBytes()));
+ agg.add(event, acc);
+
+ his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", his.toBytes());
+ agg.add(event, acc);
+
+ List<Long> rsts = (List<Long>)agg.getResult(acc).getMetricsFields().get("ms_his");
+ for (int i = 0; i < expects.length; i++) {
+ long rst = rsts.get(i);
+ long expect = expects[i];
+ double probability = probabilities[i];
+ double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+ }
+
+ private void initData(String input_type, int numberOfSignificantValueDigits, double[] probabilities){
+ agg = new HdrHistogramQuantiles();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("input_type", input_type);
+ parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits);
+ parameters.put("probabilities", probabilities);
+ c.setParameters(parameters);
+ c.setLookupFields(Collections.singletonList("ms"));
+ c.setOutputFields(Collections.singletonList("ms_his"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java
index f177ca5..5905138 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramTest.java
@@ -1,102 +1,102 @@
-package com.geedgenetworks.core.udf.udaf.HdrHistogram;
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.AggregateFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.sketch.util.StringUtils;
-import org.HdrHistogram.ArrayHistogram;
-import org.junit.jupiter.api.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class HdrHistogramTest {
- AggregateFunction agg;
- Accumulator acc;
- Event event;
-
- @Test
- public void inputRegular() {
- double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
- initData( "regular", 2, "base64");
- long count = 100000;
- Map<String, Object> fields = event.getExtractedFields();
- for (int i = 1; i <= count; i++) {
- fields.put("ms", i);
- agg.add(event, acc);
- }
-
- long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
- String str = (String) agg.getResult(acc).getMetricsFields().get("ms_his");
- ArrayHistogram his = ArrayHistogram.fromBytes(StringUtils.decodeBase64(str.getBytes(StandardCharsets.UTF_8)));
-
- for (int i = 0; i < expects.length; i++) {
- long rst = his.getValueAtPercentile(probabilities[i] * 100);
- long expect = expects[i];
- double probability = probabilities[i];
- double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
- System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
- assertTrue(error <= 0.05);
- }
- }
-
- @Test
- public void inputSketch() {
- double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
- initData( "sketch", 2, "binary");
- long count = 100000;
- Map<String, Object> fields = event.getExtractedFields();
- long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
-
- ArrayHistogram his = new ArrayHistogram(2);
- for (int i = 1; i <= count; i++) {
- his.recordValue(i);
- }
- fields.put("ms", StringUtils.encodeBase64String(his.toBytes()));
- agg.add(event, acc);
-
- his = new ArrayHistogram(2);
- for (int i = 1; i <= count; i++) {
- his.recordValue(i);
- }
- fields.put("ms", his.toBytes());
- agg.add(event, acc);
-
- byte[] bytes = (byte[]) agg.getResult(acc).getMetricsFields().get("ms_his");
- ArrayHistogram h = ArrayHistogram.fromBytes(bytes);
-
- for (int i = 0; i < expects.length; i++) {
- long rst = h.getValueAtPercentile(probabilities[i] * 100);
- long expect = expects[i];
- double probability = probabilities[i];
- double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
- System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
- assertTrue(error <= 0.05);
- }
- }
-
- private void initData(String input_type, int numberOfSignificantValueDigits, String output_format){
- agg = new HdrHistogram();
- UDFContext c = new UDFContext();
- Map<String, Object> parameters = new HashMap<>();
- parameters.put("input_type", input_type);
- parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits);
- parameters.put("output_format", output_format);
- c.setParameters(parameters);
- c.setLookup_fields(Collections.singletonList("ms"));
- c.setOutput_fields(Collections.singletonList("ms_his"));
-
- agg.open(c);
- Map<String, Object> map = new HashMap<>();
- acc = new Accumulator();
- acc.setMetricsFields(map);
- agg.initAccumulator(acc);
-
- event = new Event();
- Map<String, Object> fields = new HashMap<>();
- event.setExtractedFields(fields);
- }
+package com.geedgenetworks.core.udf.udaf.HdrHistogram;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.HdrHistogram.ArrayHistogram;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HdrHistogramTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+ @Test
+ public void inputRegular() {
+ double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
+ initData( "regular", 2, "base64");
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 1; i <= count; i++) {
+ fields.put("ms", i);
+ agg.add(event, acc);
+ }
+
+ long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
+ String str = (String) agg.getResult(acc).getMetricsFields().get("ms_his");
+ ArrayHistogram his = ArrayHistogram.fromBytes(StringUtils.decodeBase64(str.getBytes(StandardCharsets.UTF_8)));
+
+ for (int i = 0; i < expects.length; i++) {
+ long rst = his.getValueAtPercentile(probabilities[i] * 100);
+ long expect = expects[i];
+ double probability = probabilities[i];
+ double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+ }
+
+ @Test
+ public void inputSketch() {
+ double[] probabilities = new double[]{0, 0.1, 0.25, 0.5, 0.75, 1};
+ initData( "sketch", 2, "binary");
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ long[] expects = Arrays.stream(probabilities).mapToLong(x -> (long) (count * x)).toArray();
+
+ ArrayHistogram his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", StringUtils.encodeBase64String(his.toBytes()));
+ agg.add(event, acc);
+
+ his = new ArrayHistogram(2);
+ for (int i = 1; i <= count; i++) {
+ his.recordValue(i);
+ }
+ fields.put("ms", his.toBytes());
+ agg.add(event, acc);
+
+ byte[] bytes = (byte[]) agg.getResult(acc).getMetricsFields().get("ms_his");
+ ArrayHistogram h = ArrayHistogram.fromBytes(bytes);
+
+ for (int i = 0; i < expects.length; i++) {
+ long rst = h.getValueAtPercentile(probabilities[i] * 100);
+ long expect = expects[i];
+ double probability = probabilities[i];
+ double error = probability <= 0.01? 0 : Math.abs(rst - expect) / (double) expect;
+ System.out.println(String.format("%s:%d,%d,%.4f", probability, expect , rst , error));
+ assertTrue(error <= 0.05);
+ }
+ }
+
+ private void initData(String input_type, int numberOfSignificantValueDigits, String output_format){
+ agg = new HdrHistogram();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("input_type", input_type);
+ parameters.put("numberOfSignificantValueDigits", numberOfSignificantValueDigits);
+ parameters.put("output_format", output_format);
+ c.setParameters(parameters);
+ c.setLookupFields(Collections.singletonList("ms"));
+ c.setOutputFields(Collections.singletonList("ms_his"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java
index eae356d..b80d782 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinctTest.java
@@ -1,87 +1,87 @@
-package com.geedgenetworks.core.udf.udaf.hlld;
-
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.AggregateFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.sketch.hlld.Hll;
-import com.geedgenetworks.sketch.util.StringUtils;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-
-public class HlldApproxCountDistinctTest {
- AggregateFunction agg;
- Accumulator acc;
- Event event;
-
-
- @Test
- public void inputRegular() {
- initData(14, "regular");
- long count = 100000;
- Map<String, Object> fields = event.getExtractedFields();
- for (int i = 0; i < count; i++) {
- fields.put("ip", i);
- agg.add(event, acc);
- }
-
- long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt");
- double error = Math.abs(rst - count) / (double) count;
- System.out.println(String.format("%d,%d,%.4f", count , rst , error));
- assertTrue(error <= 0.05);
- }
-
- @Test
- public void inputSketch() {
- initData(14, "sketch");
- long count = 150000;
- Map<String, Object> fields = event.getExtractedFields();
-
- Hll hll = new Hll(12);
- for (int i = 0; i < 100000; i++) {
- hll.add(i);
- }
- fields.put("ip", StringUtils.encodeBase64String(hll.toBytes()));
- agg.add(event, acc);
-
- hll = new Hll(13);
- for (int i = 50000; i < 150000; i++) {
- hll.add(i);
- }
- fields.put("ip", hll.toBytes());
- agg.add(event, acc);
-
- long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt");
- double error = Math.abs(rst - count) / (double) count;
- System.out.println(String.format("%d,%d,%.4f", count , rst , error));
- assertTrue(error <= 0.05);
- }
-
- private void initData(int precision, String input_type){
- agg = new HlldApproxCountDistinct();
- UDFContext c = new UDFContext();
- Map<String, Object> parameters = new HashMap<>();
- parameters.put("precision", precision);
- parameters.put("input_type", input_type);
- c.setParameters(parameters);
- c.setLookup_fields(Collections.singletonList("ip"));
- c.setOutput_fields(Collections.singletonList("ip_cnt"));
-
- agg.open(c);
- Map<String, Object> map = new HashMap<>();
- acc = new Accumulator();
- acc.setMetricsFields(map);
- agg.initAccumulator(acc);
-
- event = new Event();
- Map<String, Object> fields = new HashMap<>();
- event.setExtractedFields(fields);
- }
+package com.geedgenetworks.core.udf.udaf.hlld;
+
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.hlld.Hll;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class HlldApproxCountDistinctTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+
+ @Test
+ public void inputRegular() {
+ initData(14, "regular");
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 0; i < count; i++) {
+ fields.put("ip", i);
+ agg.add(event, acc);
+ }
+
+ long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt");
+ double error = Math.abs(rst - count) / (double) count;
+ System.out.println(String.format("%d,%d,%.4f", count , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ @Test
+ public void inputSketch() {
+ initData(14, "sketch");
+ long count = 150000;
+ Map<String, Object> fields = event.getExtractedFields();
+
+ Hll hll = new Hll(12);
+ for (int i = 0; i < 100000; i++) {
+ hll.add(i);
+ }
+ fields.put("ip", StringUtils.encodeBase64String(hll.toBytes()));
+ agg.add(event, acc);
+
+ hll = new Hll(13);
+ for (int i = 50000; i < 150000; i++) {
+ hll.add(i);
+ }
+ fields.put("ip", hll.toBytes());
+ agg.add(event, acc);
+
+ long rst = (long)agg.getResult(acc).getMetricsFields().get("ip_cnt");
+ double error = Math.abs(rst - count) / (double) count;
+ System.out.println(String.format("%d,%d,%.4f", count , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ private void initData(int precision, String input_type){
+ agg = new HlldApproxCountDistinct();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("precision", precision);
+ parameters.put("input_type", input_type);
+ c.setParameters(parameters);
+ c.setLookupFields(Collections.singletonList("ip"));
+ c.setOutputFields(Collections.singletonList("ip_cnt"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java
index f489ee4..d6ed4c1 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udaf/hlld/HlldTest.java
@@ -1,86 +1,86 @@
-package com.geedgenetworks.core.udf.udaf.hlld;
-
-import com.geedgenetworks.common.Accumulator;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.AggregateFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.sketch.hlld.Hll;
-import com.geedgenetworks.sketch.util.StringUtils;
-import org.junit.jupiter.api.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class HlldTest {
- AggregateFunction agg;
- Accumulator acc;
- Event event;
-
- @Test
- public void inputRegular() {
- initData(14, "regular", "base64");
- long count = 100000;
- Map<String, Object> fields = event.getExtractedFields();
- for (int i = 0; i < count; i++) {
- fields.put("ip", i);
- agg.add(event, acc);
- }
-
- String hllStr = (String)agg.getResult(acc).getMetricsFields().get("ip_cnt");
- long rst = (long) Hll.fromBytes(StringUtils.decodeBase64(hllStr.getBytes(StandardCharsets.UTF_8))).size();
- double error = Math.abs(rst - count) / (double) count;
- System.out.println(String.format("%d,%d,%.4f", count , rst , error));
- assertTrue(error <= 0.05);
- }
-
- @Test
- public void inputSketch() {
- initData(14, "sketch", "binary");
- long count = 150000;
- Map<String, Object> fields = event.getExtractedFields();
- for (int i = 0; i < 100000; i++) {
- Hll hll = new Hll(12);
- hll.add(i);
- fields.put("ip", StringUtils.encodeBase64String(hll.toBytes()));
- agg.add(event, acc);
- }
- for (int i = 50000; i < 150000; i++) {
- Hll hll = new Hll(13);
- hll.add(i);
- fields.put("ip", hll.toBytes());
- agg.add(event, acc);
- }
-
- byte[] hllBytes = (byte[])agg.getResult(acc).getMetricsFields().get("ip_cnt");
- long rst = (long) Hll.fromBytes(hllBytes).size();
- double error = Math.abs(rst - count) / (double) count;
- System.out.println(String.format("%d,%d,%.4f", count , rst , error));
- assertTrue(error <= 0.05);
- }
-
- private void initData(int precision, String input_type, String output_format){
- agg = new Hlld();
- UDFContext c = new UDFContext();
- Map<String, Object> parameters = new HashMap<>();
- parameters.put("precision", precision);
- parameters.put("input_type", input_type);
- parameters.put("output_format", output_format);
- c.setParameters(parameters);
- c.setLookup_fields(Collections.singletonList("ip"));
- c.setOutput_fields(Collections.singletonList("ip_cnt"));
-
- agg.open(c);
- Map<String, Object> map = new HashMap<>();
- acc = new Accumulator();
- acc.setMetricsFields(map);
- agg.initAccumulator(acc);
-
- event = new Event();
- Map<String, Object> fields = new HashMap<>();
- event.setExtractedFields(fields);
- }
-}
+package com.geedgenetworks.core.udf.udaf.hlld;
+
+import com.geedgenetworks.common.Accumulator;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.AggregateFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.sketch.hlld.Hll;
+import com.geedgenetworks.sketch.util.StringUtils;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HlldTest {
+ AggregateFunction agg;
+ Accumulator acc;
+ Event event;
+
+ @Test
+ public void inputRegular() {
+ initData(14, "regular", "base64");
+ long count = 100000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 0; i < count; i++) {
+ fields.put("ip", i);
+ agg.add(event, acc);
+ }
+
+ String hllStr = (String)agg.getResult(acc).getMetricsFields().get("ip_cnt");
+ long rst = (long) Hll.fromBytes(StringUtils.decodeBase64(hllStr.getBytes(StandardCharsets.UTF_8))).size();
+ double error = Math.abs(rst - count) / (double) count;
+ System.out.println(String.format("%d,%d,%.4f", count , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ @Test
+ public void inputSketch() {
+ initData(14, "sketch", "binary");
+ long count = 150000;
+ Map<String, Object> fields = event.getExtractedFields();
+ for (int i = 0; i < 100000; i++) {
+ Hll hll = new Hll(12);
+ hll.add(i);
+ fields.put("ip", StringUtils.encodeBase64String(hll.toBytes()));
+ agg.add(event, acc);
+ }
+ for (int i = 50000; i < 150000; i++) {
+ Hll hll = new Hll(13);
+ hll.add(i);
+ fields.put("ip", hll.toBytes());
+ agg.add(event, acc);
+ }
+
+ byte[] hllBytes = (byte[])agg.getResult(acc).getMetricsFields().get("ip_cnt");
+ long rst = (long) Hll.fromBytes(hllBytes).size();
+ double error = Math.abs(rst - count) / (double) count;
+ System.out.println(String.format("%d,%d,%.4f", count , rst , error));
+ assertTrue(error <= 0.05);
+ }
+
+ private void initData(int precision, String input_type, String output_format){
+ agg = new Hlld();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("precision", precision);
+ parameters.put("input_type", input_type);
+ parameters.put("output_format", output_format);
+ c.setParameters(parameters);
+ c.setLookupFields(Collections.singletonList("ip"));
+ c.setOutputFields(Collections.singletonList("ip_cnt"));
+
+ agg.open(c);
+ Map<String, Object> map = new HashMap<>();
+ acc = new Accumulator();
+ acc.setMetricsFields(map);
+ agg.initAccumulator(acc);
+
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java
index 15f3c10..3320f38 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java
@@ -1,109 +1,109 @@
-package com.geedgenetworks.core.udf.udtf;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.UDFContext;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-
-public class UnrollTest {
- PathUnroll pathUnroll;
- Event event;
-
- @Test
- public void explodePathWithNoFileField() {
- init("path", "out_path", ".");
- Map<String, Object> fields = event.getExtractedFields();
- fields.put("path", "ETHERNET.IPv4.TCP.ssl");
- String[] excepted = new String[]{"ETHERNET","ETHERNET.IPv4","ETHERNET.IPv4.TCP","ETHERNET.IPv4.TCP.ssl"};
- String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
- System.out.println(JSON.toJSONString(outPaths));
- assertArrayEquals(outPaths, excepted);
- // 忽略结尾的分隔符
- fields.put("path", "ETHERNET.IPv4.TCP.ssl.");
- outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
- System.out.println(JSON.toJSONString(outPaths));
- assertArrayEquals(outPaths, excepted);
- // 空路径不输出
- fields.put("path", "");
- outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
- System.out.println(JSON.toJSONString(outPaths));
- assertTrue(outPaths.length == 0);
-
- init("path", "out_path", "/");
- fields = event.getExtractedFields();
- fields.put("path", "ETHERNET/IPv4/TCP/ssl");
- excepted = new String[]{"ETHERNET","ETHERNET/IPv4","ETHERNET/IPv4/TCP","ETHERNET/IPv4/TCP/ssl"};
- outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
- System.out.println(JSON.toJSONString(outPaths));
- assertArrayEquals(outPaths, excepted);
- // 忽略结尾的分隔符
- fields.put("path", "ETHERNET/IPv4/TCP/ssl/");
- outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
- System.out.println(JSON.toJSONString(outPaths));
- assertArrayEquals(outPaths, excepted);
- }
-
- @Test
- public void explodePathWithFileField() {
- init("path", "file", "out_path", "out_file", ".");
- Map<String, Object> fields = event.getExtractedFields();
- fields.put("path", "ETHERNET.IPv4.TCP.ssl");
- fields.put("file", "ssl");
- String[] excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl"};
- String[] exceptedFile = new String[]{null, null, null, "ssl"};
- String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new);
- String[] outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new);
- System.out.println(JSON.toJSONString(outPaths));
- System.out.println(JSON.toJSONString(outFiles));
- assertArrayEquals(outPaths, excepted);
- assertArrayEquals(outFiles, exceptedFile);
- // 忽略结尾的分隔符
- fields.put("path", "ETHERNET.IPv4.TCP.ssl.");
- fields.put("file", "ssl");
- outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new);
- outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new);
- System.out.println(JSON.toJSONString(outPaths));
- System.out.println(JSON.toJSONString(outFiles));
- assertArrayEquals(outPaths, excepted);
- assertArrayEquals(outFiles, exceptedFile);
-
- fields.put("path", "ETHERNET.IPv4.TCP.ssl");
- fields.put("file", "ssl.aa");
- excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl", "ETHERNET.IPv4.TCP.ssl.ssl.aa"};
- exceptedFile = new String[]{null, null, null, null,"ssl.aa"};
- outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new);
- outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new);
- System.out.println(JSON.toJSONString(outPaths));
- System.out.println(JSON.toJSONString(outFiles));
- assertArrayEquals(outPaths, excepted);
- assertArrayEquals(outFiles, exceptedFile);
- }
-
- private void init(String pathField, String outputPathField, String separator){
- init(pathField, null, outputPathField, null, separator);
- }
-
- private void init(String pathField, String fileField, String outputPathField, String outputFileField, String separator){
- pathUnroll = new PathUnroll();
- UDFContext c = new UDFContext();
- Map<String, Object> parameters = new HashMap<>();
- parameters.put("separator", separator);
- c.setParameters(parameters);
- c.setLookup_fields(Arrays.asList(pathField, fileField).stream().filter(x -> x != null).collect(Collectors.toList()));
- c.setOutput_fields(Arrays.asList(outputPathField, outputFileField).stream().filter(x -> x != null).collect(Collectors.toList()));
-
- pathUnroll.open(null, c);
- event = new Event();
- Map<String, Object> fields = new HashMap<>();
- event.setExtractedFields(fields);
- }
+package com.geedgenetworks.core.udf.udtf;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+
+public class UnrollTest {
+ PathUnroll pathUnroll;
+ Event event;
+
+ @Test
+ public void explodePathWithNoFileField() {
+ init("path", "out_path", ".");
+ Map<String, Object> fields = event.getExtractedFields();
+ fields.put("path", "ETHERNET.IPv4.TCP.ssl");
+ String[] excepted = new String[]{"ETHERNET","ETHERNET.IPv4","ETHERNET.IPv4.TCP","ETHERNET.IPv4.TCP.ssl"};
+ String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
+ System.out.println(JSON.toJSONString(outPaths));
+ assertArrayEquals(outPaths, excepted);
+ // 忽略结尾的分隔符
+ fields.put("path", "ETHERNET.IPv4.TCP.ssl.");
+ outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
+ System.out.println(JSON.toJSONString(outPaths));
+ assertArrayEquals(outPaths, excepted);
+ // 空路径不输出
+ fields.put("path", "");
+ outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
+ System.out.println(JSON.toJSONString(outPaths));
+ assertTrue(outPaths.length == 0);
+
+ init("path", "out_path", "/");
+ fields = event.getExtractedFields();
+ fields.put("path", "ETHERNET/IPv4/TCP/ssl");
+ excepted = new String[]{"ETHERNET","ETHERNET/IPv4","ETHERNET/IPv4/TCP","ETHERNET/IPv4/TCP/ssl"};
+ outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
+ System.out.println(JSON.toJSONString(outPaths));
+ assertArrayEquals(outPaths, excepted);
+ // 忽略结尾的分隔符
+ fields.put("path", "ETHERNET/IPv4/TCP/ssl/");
+ outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new);
+ System.out.println(JSON.toJSONString(outPaths));
+ assertArrayEquals(outPaths, excepted);
+ }
+
+ @Test
+ public void explodePathWithFileField() {
+ init("path", "file", "out_path", "out_file", ".");
+ Map<String, Object> fields = event.getExtractedFields();
+ fields.put("path", "ETHERNET.IPv4.TCP.ssl");
+ fields.put("file", "ssl");
+ String[] excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl"};
+ String[] exceptedFile = new String[]{null, null, null, "ssl"};
+ String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new);
+ String[] outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new);
+ System.out.println(JSON.toJSONString(outPaths));
+ System.out.println(JSON.toJSONString(outFiles));
+ assertArrayEquals(outPaths, excepted);
+ assertArrayEquals(outFiles, exceptedFile);
+ // 忽略结尾的分隔符
+ fields.put("path", "ETHERNET.IPv4.TCP.ssl.");
+ fields.put("file", "ssl");
+ outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new);
+ outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new);
+ System.out.println(JSON.toJSONString(outPaths));
+ System.out.println(JSON.toJSONString(outFiles));
+ assertArrayEquals(outPaths, excepted);
+ assertArrayEquals(outFiles, exceptedFile);
+
+ fields.put("path", "ETHERNET.IPv4.TCP.ssl");
+ fields.put("file", "ssl.aa");
+ excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl", "ETHERNET.IPv4.TCP.ssl.ssl.aa"};
+ exceptedFile = new String[]{null, null, null, null,"ssl.aa"};
+ outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new);
+ outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new);
+ System.out.println(JSON.toJSONString(outPaths));
+ System.out.println(JSON.toJSONString(outFiles));
+ assertArrayEquals(outPaths, excepted);
+ assertArrayEquals(outFiles, exceptedFile);
+ }
+
+ private void init(String pathField, String outputPathField, String separator){
+ init(pathField, null, outputPathField, null, separator);
+ }
+
+ private void init(String pathField, String fileField, String outputPathField, String outputFileField, String separator){
+ pathUnroll = new PathUnroll();
+ UDFContext c = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("separator", separator);
+ c.setParameters(parameters);
+ c.setLookupFields(Arrays.asList(pathField, fileField).stream().filter(x -> x != null).collect(Collectors.toList()));
+ c.setOutputFields(Arrays.asList(outputPathField, outputFileField).stream().filter(x -> x != null).collect(Collectors.toList()));
+
+ pathUnroll.open(null, c);
+ event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ event.setExtractedFields(fields);
+ }
} \ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index fd5c035..047f1ba 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -23,6 +23,8 @@ sources:
type: string
- name: device_tag
type: string
+ - name: http_host
+ type: string
properties:
data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
format: json
@@ -49,6 +51,13 @@ processing_pipelines:
lookup_fields: []
output_fields: []
filter: event.client_ip == '192.168.10.100'
+
+ - function: DOMAIN
+ lookup_fields: [ http_host, ssl_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
- function: SNOWFLAKE_ID
lookup_fields: []
output_fields: [log_id]
@@ -83,6 +92,21 @@ processing_pipelines:
kb_name: tsg_ip_location
option: IP_TO_DETAIL
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+ LONGITUDE: server_longitude
+ LATITUDE: server_latitude
+ ISP: server_isp
+ ORGANIZATION: server_organization
+
- function: JSON_EXTRACT
lookup_fields: [ device_tag ]
output_fields: [ device_group ]
@@ -124,6 +148,7 @@ application:
env:
name: example-inline-to-print
parallelism: 3
+ kms.type: local
pipeline:
object-reuse: true
topology:
diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
index f724a36..2908ffb 100644
--- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
+++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
@@ -70,9 +70,10 @@ processing_pipelines:
ORGANIZATION: server_organization
- function : BASE64_ENCODE_TO_STRING
+ lookup_fields: [ mail_subject ]
output_fields: [ mail_subject_base64 ]
parameters:
- value_field: mail_subject
+ input_type: string
- function: BASE64_DECODE_TO_STRING
output_fields: [ mail_attachment_name ]
@@ -196,6 +197,7 @@ application:
parallelism: 1
pipeline:
object-reuse: true
+
properties:
hos.path: http://192.168.44.12:9098/hos
hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket