summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestamp.java)17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java18
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java52
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java102
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java20
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java12
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java139
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java3
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java2
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java5
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java16
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java (renamed from groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java)11
24 files changed, 165 insertions, 349 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java
index f268689..e62310d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.core.pojo;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.core.utils.KnowlegdeBase.AbstractKnowledgeBase;
import lombok.Data;
@@ -8,7 +8,7 @@ import java.util.List;
@Data
public class KnowledgeBaseEntity {
- private KnowledgeConfig knowledgeConfig;
+ private KnowledgeBaseConfig knowledgeConfig;
private List<KnowLedgeFileEntity> knowLedgeFileEntityList;
private AbstractKnowledgeBase abstractKnowledgeBase;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java b/groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java
new file mode 100644
index 0000000..c19df1d
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/types/BooleanType.java
@@ -0,0 +1,8 @@
+package com.geedgenetworks.core.types;
+
+public class BooleanType extends DataType{
+ @Override
+ public String simpleString() {
+ return "boolean";
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 5f6f1d7..881f356 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -1,38 +1,27 @@
package com.geedgenetworks.core.udf;
-import com.alibaba.fastjson.JSON;
-import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.EngineConfig;
-import com.geedgenetworks.common.config.KnowledgeConfig;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseUpdateJob;
-import com.geedgenetworks.utils.IpLookupV2;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.quartz.SchedulerException;
-
-import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler;
@Slf4j
public class AsnLookup implements UDF {
- private UDFContext udfContext;
private String vender;
private String option;
+ private String lookupFieldName;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
checkUdfContext(udfContext);
- this.udfContext = udfContext;
this.vender = udfContext.getParameters().get("vendor_id").toString();
this.option = udfContext.getParameters().get("option").toString();
KnowledgeBaseUpdateJob.initKnowledgeBase(vender,AsnKnowledgeBase.getInstance(), runtimeContext);
@@ -43,6 +32,8 @@ public class AsnLookup implements UDF {
else {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init AsnKnowledgeBase error ");
}
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -51,15 +42,15 @@ public class AsnLookup implements UDF {
if(AsnKnowledgeBase.getVenderWithAsnLookup()!=null && AsnKnowledgeBase.getVenderWithAsnLookup().containsKey(vender)){
- if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){
+ if(event.getExtractedFields().containsKey(lookupFieldName)){
switch (option) {
case "IP_TO_ASN":
String asn = AsnKnowledgeBase.getVenderWithAsnLookup()
.get(vender)
- .asnLookup(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString());
+ .asnLookup(event.getExtractedFields().get(lookupFieldName).toString());
if(!asn.isEmpty()) {
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), asn);
+ .put(outputFieldName, asn);
}
break;
default:
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
index 670deef..474dd17 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
@@ -8,16 +8,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
-public class UnixTimestamp implements UDF {
-
- private UDFContext udfContext;
+public class CurrentUnixTimestamp implements UDF {
private String precision;
+ private String outputFieldName;
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
-
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
@@ -33,24 +31,25 @@ public class UnixTimestamp implements UDF {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
}
}
-
+ this.precision = udfContext.getParameters().get("precision").toString();
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
long timestamp = System.currentTimeMillis();
- if ("seconds".equals(udfContext.getParameters().get("precision"))) {
+ if ("seconds".equals(precision)) {
timestamp = timestamp / 1000;
}
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ event.getExtractedFields().put(outputFieldName, timestamp);
return event;
}
@Override
public String functionName() {
- return "UNIX_TIMESTAMP_FUNCTION";
+ return "CURRENT_UNIX_TIMESTAMP";
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
index 82cb1a4..7d825c9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
@@ -17,8 +17,9 @@ import java.util.Base64;
@Slf4j
public class DecodeBase64 implements UDF {
- private UDFContext udfContext;
-
+ private String lookupFieldNameFirst;
+ private String lookupFieldNameSecond;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
if(udfContext.getLookup_fields().size() !=2){
@@ -27,21 +28,22 @@ public class DecodeBase64 implements UDF {
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- this.udfContext = udfContext;
-
+ this.lookupFieldNameFirst = udfContext.getLookup_fields().get(0);
+ this.lookupFieldNameSecond = udfContext.getLookup_fields().get(1);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldNameFirst)) {
String decodeResult = "";
String message =
(String)
event.getExtractedFields()
- .get(udfContext.getLookup_fields().get(0));
+ .get(lookupFieldNameFirst);
Object charset =
- event.getExtractedFields().getOrDefault(udfContext.getLookup_fields().get(1),"");
+ event.getExtractedFields().getOrDefault(lookupFieldNameSecond,"");
try {
if (StringUtil.isNotBlank(message)) {
byte[] base64decodedBytes = Base64.getDecoder().decode(message);
@@ -61,7 +63,7 @@ public class DecodeBase64 implements UDF {
+ e.getMessage());
}
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), decodeResult);
+ .put(outputFieldName, decodeResult);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
index e7a5c37..a5d6b91 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
@@ -10,18 +10,18 @@ import com.geedgenetworks.utils.FormatUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
+import java.util.List;
+
@Slf4j
public class Domain implements UDF {
- private UDFContext udfContext;
private String option;
-
-
+ private List<String> lookupFields;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
if(udfContext.getLookup_fields().isEmpty()){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup field is not empty");
}
@@ -42,6 +42,8 @@ public class Domain implements UDF {
}
}
this.option = udfContext.getParameters().get("option").toString();
+ this.lookupFields = udfContext.getLookup_fields();
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -49,7 +51,7 @@ public class Domain implements UDF {
@Override
public Event evaluate(Event event) {
String domain = "";
- for (String lookupField : udfContext.getLookup_fields()){
+ for (String lookupField : lookupFields){
if(event.getExtractedFields().containsKey(lookupField)){
@@ -78,7 +80,7 @@ public class Domain implements UDF {
}
}
}
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), domain);
+ event.getExtractedFields().put(outputFieldName, domain);
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
index 46a8072..dec2ddc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
@@ -11,11 +11,12 @@ import java.text.SimpleDateFormat;
import java.util.TimeZone;
@Slf4j
public class FromUnixTimestamp implements UDF {
- private UDFContext udfContext;
-
+ private String precision;
+ private String outputFieldName;
+ private String lookupFieldName;
+ private SimpleDateFormat sdf;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
@@ -36,32 +37,35 @@ public class FromUnixTimestamp implements UDF {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters precision value is not correct");
}
}
+ this.precision = udfContext.getParameters().get("precision").toString();
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ switch (precision) {
+ case "seconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ break;
+ case "milliseconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
+ break;
+ case "microseconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000");
+ break;
+ case "nanoseconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000");
+ break;
+ default:
+ break;
+ }
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
}
@Override
public Event evaluate(Event event) {
- if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- switch (udfContext.getParameters().get("precision").toString()) {
- case "seconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- break;
- case "milliseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
- break;
- case "microseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000");
- break;
- case "nanoseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000");
- break;
- default:
- break;
- }
- sdf.setTimeZone(TimeZone.getTimeZone(udfContext.getParameters().get("timezone").toString()));
- String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString()));
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ if(event.getExtractedFields().containsKey(lookupFieldName)){
+ String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString()));
+ event.getExtractedFields().put(outputFieldName, timestamp);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index d2e29f6..4f657da 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -20,20 +20,19 @@ import java.util.Map;
@Slf4j
public class GeoIpLookup implements UDF {
- private UDFContext udfContext;
private String vender;
private String option;
-
- private Map<String,String> geolocation_field_mapping;
+ private String lookupFieldName;
+ private String outputFieldName;
+ private Map<String,String> geoLocationFieldMapping;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
checkUdfContext(udfContext);
- this.udfContext = udfContext;
this.vender = udfContext.getParameters().get("vendor_id").toString();
this.option = udfContext.getParameters().get("option").toString();
if(option.equals("IP_TO_OBJECT")){
- this.geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
+ this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
}
KnowledgeBaseUpdateJob.initKnowledgeBase(vender, GeoIpKnowledgeBase.getInstance(),runtimeContext);
if(GeoIpKnowledgeBase.getVenderWithIpLookup()!=null && GeoIpKnowledgeBase.getVenderWithIpLookup().containsKey(vender)){
@@ -42,99 +41,75 @@ public class GeoIpLookup implements UDF {
else {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init GeoIpLookup error ");
}
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldName)) {
switch (option) {
case "IP_TO_COUNTRY":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.countryLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_PROVINCE":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.provinceLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_CITY":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.cityLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_SUBDIVISION_ADDR":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.cityLookupDetail(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_DETAIL":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.locationLookupDetail(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_LATLNG":
- String geo =
- GeoIpKnowledgeBase.getVenderWithIpLookup()
- .get(vender)
- .latLngLookup(
- event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
- .toString());
-
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), geo);
+ .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup()
+ .get(vender)
+ .latLngLookup(
+ event.getExtractedFields()
+ .get(lookupFieldName)
+ .toString()));
break;
case "IP_TO_PROVIDER":
@@ -144,30 +119,22 @@ public class GeoIpLookup implements UDF {
.get(vender)
.ispLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()),
TypeReference.mapType(
HashMap.class, String.class, Object.class));
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
serverIpMap.getOrDefault("isp", StringUtil.EMPTY));
break;
case "IP_TO_JSON ":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.infoLookupToJSONString(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_OBJECT":
@@ -176,13 +143,10 @@ public class GeoIpLookup implements UDF {
.get(vender)
.infoLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString());
- for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) {
+ for (Map.Entry<String, String> entry : geoLocationFieldMapping.entrySet()) {
switch (entry.getKey()) {
case "COUNTRY":
event.getExtractedFields()
@@ -216,16 +180,12 @@ public class GeoIpLookup implements UDF {
break;
}
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.infoLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
index c5433ea..7efc81e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
@@ -10,7 +10,10 @@ import org.apache.flink.api.common.functions.RuntimeContext;
public class JsonExtract implements UDF {
private UDFContext udfContext;
+ private String lookupFieldName;
+ private String outputFieldName;
+ private String param;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
this.udfContext = udfContext;
@@ -26,7 +29,9 @@ public class JsonExtract implements UDF {
if(!udfContext.getParameters().containsKey("param")){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey param");
}
-
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.param =udfContext.getParameters().get("param").toString();
}
@@ -34,16 +39,15 @@ public class JsonExtract implements UDF {
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldName)) {
String result =
(String)
JsonPathUtil.analysis(
event.getExtractedFields()
- .get(udfContext.getLookup_fields().get(0))
- .toString(),
- udfContext.getParameters().get("param").toString());
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), result);
+ .get(lookupFieldName)
+ .toString(),param);
+ event.getExtractedFields().put(outputFieldName, result);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 7b21adf..03be355 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.core.udf;
import com.alibaba.fastjson.JSON;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.EngineConfig;
+import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
import lombok.extern.slf4j.Slf4j;
@@ -14,24 +14,15 @@ import java.util.*;
@Slf4j
public class PathCombine implements UDF {
- private UDFContext udfContext;
-
- private StringBuilder stringBuilder;
-
- private Map<String, String> pathParameters = new LinkedHashMap<>();
-
-
-
-
+ private final Map<String, String> pathParameters = new LinkedHashMap<>();
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
-
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
- EngineConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
+ CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig();
if (udfContext.getParameters() != null
&& !udfContext.getParameters().isEmpty()) {
@@ -56,6 +47,7 @@ public class PathCombine implements UDF {
}
}
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -78,7 +70,7 @@ public class PathCombine implements UDF {
}
}
String path = joinUrlParts(pathBuilder);
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), path);
+ event.getExtractedFields().put(outputFieldName, path);
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
index 134ed66..61fa44a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
@@ -5,23 +5,24 @@ import com.geedgenetworks.core.pojo.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
public class Rename implements UDF {
- private UDFContext udfContext;
-
+ private String lookupFieldName;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
+
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldName)) {
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
event.getExtractedFields()
- .get(udfContext.getLookup_fields().get(0)));
- event.getExtractedFields().remove(udfContext.getLookup_fields().get(0));
+ .get(lookupFieldName));
+ event.getExtractedFields().remove(lookupFieldName);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
index da0b71f..070c42b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
@@ -1,9 +1,7 @@
package com.geedgenetworks.core.udf;
-import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
-import com.geedgenetworks.core.udf.UDF;
import com.geedgenetworks.core.utils.SnowflakeIdUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -11,21 +9,20 @@ import java.io.Serializable;
public class SnowflakeId implements Serializable, UDF {
- private UDFContext udfContext;
-
+ private String outputFieldName;
private SnowflakeIdUtils snowflakeIdUtils;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
String data_center_id_num = udfContext.getParameters().getOrDefault("data_center_id_num","0").toString();//转为数字
snowflakeIdUtils = new SnowflakeIdUtils(Integer.parseInt(data_center_id_num),runtimeContext.getIndexOfThisSubtask());
- this.udfContext = udfContext;
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), snowflakeIdUtils.nextId());
+ .put(outputFieldName, snowflakeIdUtils.nextId());
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
index e557677..15228bb 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
@@ -14,8 +14,9 @@ import java.time.Instant;
public class UnixTimestampConverter implements UDF {
private UDFContext udfContext;
-
private String precision;
+ private String lookupFieldName;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
@@ -39,7 +40,8 @@ public class UnixTimestampConverter implements UDF {
this.precision =udfContext.getParameters().get("precision").toString();
}
}
-
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -47,8 +49,8 @@ public class UnixTimestampConverter implements UDF {
@Override
public Event evaluate(Event event) {
- if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
- Long timestamp = Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString());
+ if(event.getExtractedFields().containsKey(lookupFieldName)) {
+ Long timestamp = Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString());
Instant instant = null;
if (String.valueOf(timestamp).length() == 13) {
// 时间戳长度大于10,表示为毫秒级时间戳
@@ -67,7 +69,7 @@ public class UnixTimestampConverter implements UDF {
timestamp = instant.toEpochMilli();
break;
}
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ event.getExtractedFields().put(outputFieldName, timestamp);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java
deleted file mode 100644
index b7d3637..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/HadoopUtils.java
+++ /dev/null
@@ -1,139 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-import com.geedgenetworks.common.config.CommonConfig;
-
-import com.geedgenetworks.utils.StringUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-
-import java.io.IOException;
-
-/**
- * @author qidaijie
- * @version 2022/11/2 17:57
- */
-public class HadoopUtils {
- private static final Log logger = LogFactory.get();
-
- private static HadoopUtils hadoopUtils;
-
- private static FileSystem fileSystem;
-
- private static void getInstance() {
- hadoopUtils = new HadoopUtils();
- }
-
- /** 构造函数 */
- private HadoopUtils() {
- // 获取连接
- getConnection();
- }
-
- private static void getConnection() {
- Configuration configuration = new Configuration();
- try {
- // 指定用户
- System.setProperty("HADOOP_USER_NAME", "etl");
- // 配置hdfs相关信息
- configuration.set("fs.defaultFS", "hdfs://ns1");
- configuration.set("hadoop.proxyuser.root.hosts", "*");
- configuration.set("hadoop.proxyuser.root.groups", "*");
- configuration.set("dfs.nameservices", "ns1");
- configuration.set("dfs.ha.namenodes.ns1", "nn1,nn2");
- String[] servers = StringUtil.split(CommonConfig.HDFS_SERVERS, ",");
- configuration.set("dfs.namenode.rpc-address.ns1.nn1", servers[0]);
- configuration.set("dfs.namenode.rpc-address.ns1.nn2", servers[1]);
- configuration.set(
- "dfs.client.failover.proxy.provider.ns1",
- "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
- // 创建fileSystem,用于连接hdfs
- fileSystem = FileSystem.get(configuration);
- } catch (IOException | RuntimeException e) {
- logger.error("Failed to create HDFS connection. message is: " + e.getMessage());
- e.printStackTrace();
- }
- }
-
- // /**
- // * 创建hdfs连接
- // */
- // static {
- // if
- // (FlowWriteConfig.FILE_SYSTEM_TYPE.equals(FlowWriteConfig.KNOWLEDGEBASE_FILE_STORAGE_TYPE)) {
- // Configuration configuration = new Configuration();
- // try {
- // //指定用户
- // System.setProperty("HADOOP_USER_NAME", "etl");
- // //配置hdfs相关信息
- // configuration.set("fs.defaultFS", "hdfs://ns1");
- // configuration.set("hadoop.proxyuser.root.hosts", "*");
- // configuration.set("hadoop.proxyuser.root.groups", "*");
- // configuration.set("dfs.nameservices", "ns1");
- // configuration.set("dfs.ha.namenodes.ns1", "nn1,nn2");
- // String[] servers = StringUtil.split(FlowWriteConfig.HDFS_SERVERS,
- // FlowWriteConfig.FORMAT_SPLITTER);
- // configuration.set("dfs.namenode.rpc-address.ns1.nn1", servers[0]);
- // configuration.set("dfs.namenode.rpc-address.ns1.nn2", servers[1]);
- // configuration.set("dfs.client.failover.proxy.provider.ns1",
- // "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
- // //创建fileSystem,用于连接hdfs
- // fileSystem = FileSystem.get(configuration);
- // } catch (IOException | RuntimeException e) {
- // logger.error("Failed to create HDFS connection. message is: " +
- // e.getMessage());
- // e.printStackTrace();
- // }
- // }
- // }
-
- /**
- * 下载HDFS文件
- *
- * @param filePath 文件路径
- * @return 文件
- */
- public static byte[] downloadFileByBytes(String filePath) {
- if (hadoopUtils == null) {
- getInstance();
- }
-
- try (FSDataInputStream open = fileSystem.open(new Path(filePath))) {
- byte[] bytes = new byte[open.available()];
- open.read(0, bytes, 0, open.available());
- return bytes;
- } catch (IOException e) {
- logger.error(
- "An I/O exception when files are download from HDFS. Message is :"
- + e.getMessage());
- }
- return null;
- }
-
- /**
- * 更新文件到HDFS
- *
- * @param filePath 文件路径
- * @param bytes 文件
- */
- public static void uploadFileByBytes(String filePath, byte[] bytes) {
- if (hadoopUtils == null) {
- getInstance();
- }
- try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) {
- fsDataOutputStream.write(bytes);
- // fsDataOutputStream.flush();
- } catch (RuntimeException e) {
- logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage());
- } catch (IOException e) {
- logger.error(
- "An I/O exception when files are uploaded to HDFS. Message is :"
- + e.getMessage());
- }
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
index cc312ea..535eaa5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.core.utils.KnowlegdeBase;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.core.pojo.KnowLedgeFileEntity;
import com.geedgenetworks.core.utils.HttpClientPoolUtil;
import org.slf4j.Logger;
@@ -17,7 +17,7 @@ public abstract class AbstractKnowledgeBase {
// 抽象类的构造函数
}
- abstract Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) ;
+ abstract Boolean updateKnowledgeBase(KnowledgeBaseConfig knowledgeConfig) ;
abstract String functionName();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
index 9bf0b9a..9d0816a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.utils.KnowlegdeBase;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.utils.IpLookupV2;
import lombok.Getter;
@@ -31,7 +31,7 @@ public class AsnKnowledgeBase extends AbstractKnowledgeBase {
- public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) {
+ public Boolean updateKnowledgeBase(KnowledgeBaseConfig knowledgeConfig) {
IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java
index b2a1077..2ef6674 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/GeoIpKnowledgeBase.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.core.utils.KnowlegdeBase;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.core.pojo.KnowLedgeFileEntity;
import com.geedgenetworks.utils.IpLookupV2;
import lombok.Getter;
@@ -26,7 +26,7 @@ public class GeoIpKnowledgeBase extends AbstractKnowledgeBase {
return instance;
}
@Override
- public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) {
+ public Boolean updateKnowledgeBase(KnowledgeBaseConfig knowledgeConfig) {
IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false);
for (int i = 0; i < knowledgeConfig.getFiles().size(); i++) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java
index b879e67..0f130f4 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java
@@ -3,8 +3,8 @@ package com.geedgenetworks.core.utils.KnowlegdeBase;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.EngineConfig;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.core.pojo.KnowLedgeFileEntity;
import com.geedgenetworks.core.pojo.KnowledgeBaseEntity;
import com.geedgenetworks.core.utils.HttpClientPoolUtil;
@@ -33,14 +33,14 @@ public class KnowledgeBaseUpdateJob implements Job {
private static KnowledgeBaseUpdateJob instance;
- private static EngineConfig engineConfig;
+ private static CommonConfig engineConfig;
public static synchronized KnowledgeBaseUpdateJob getInstance(RuntimeContext runtimeContext) {
if (instance == null) {
instance = new KnowledgeBaseUpdateJob();
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
- engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
+ engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
startTask();
}
return instance;
@@ -70,7 +70,7 @@ public class KnowledgeBaseUpdateJob implements Job {
knowledgeBaseEntity.setKnowLedgeFileEntityList(knowLedgeFileEntityList);
try {
- for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){
+ for(KnowledgeBaseConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){
if(name.equals(knowledgeConfig.getName())){
knowledgeBaseEntity.setKnowledgeConfig(knowledgeConfig);
break;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java
index 90bba33..b5c44fe 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/hbase/HbaseConnectBuilder.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.core.utils.hbase;
import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.CommonConfigOptions;
import com.github.rholder.retry.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -43,7 +44,7 @@ public class HbaseConnectBuilder {
* @return This HbaseConnectBuilder instance.
*/
public HbaseConnectBuilder loadDefaultConfig() {
- this.config.set("hbase.zookeeper.quorum", CommonConfig.ZOOKEEPER_QUORUM);
+ this.config.set("hbase.zookeeper.quorum", CommonConfigOptions.ZOOKEEPER_QUORUM.defaultValue());
return this;
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
index ef820e4..3d9a34e 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
@@ -1,10 +1,8 @@
package com.geedgenetworks.core.udf.test;
-import com.geedgenetworks.common.config.KnowledgeConfig;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.udf.AsnLookup;
-import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
index d65c431..f5c44e4 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.core.udf.test;
-import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.udf.GeoIpLookup;
@@ -69,7 +69,7 @@ public class GeoIpLookupFunctionTest {
udfContext.getParameters().put("option","IP_TO_ASN");
udfContext.getParameters().put("vendor_id","tsg_asnlookup");*/
- KnowledgeConfig knowledgeConfig =new KnowledgeConfig();
+ KnowledgeBaseConfig knowledgeConfig =new KnowledgeBaseConfig();
knowledgeConfig.setName("tsg_geoiplookup");
knowledgeConfig.setType("geoiplookup");
knowledgeConfig.setFiles(Arrays.asList("acf1db8589c5e277-ead1a65e1c3973dc","acf1db8589c5e277-ead1a65e1c3973dc"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
index 524b199..d91be11 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
@@ -78,13 +78,14 @@ public class DomainFunctionTest {
public void testDomainFunctionFirstSignificantSubdomain() {
parameters.put("option", "FIRST_SIGNIFICANT_SUBDOMAIN");
+ udfContext.setParameters(parameters);
Domain domain = new Domain();
domain.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("domain", "www.baidu.com");
+ extractedFields.put("v1", "www.baidu.com");
event.setExtractedFields(extractedFields);
Event result1 = domain.evaluate(event);
- assertEquals("baidu.com", result1.getExtractedFields().get("domain1"));
+ assertEquals("baidu.com", result1.getExtractedFields().get("v2"));
}
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
index 39c825f..4886547 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
@@ -21,15 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class JsonExtractFunctionTest {
private static UDFContext udfContext;
- private static Map<String, Object> parameters ;
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- parameters = new HashMap<>();
- parameters.put("param","$.tags[?(@.tag=='device_group')][0].value");
- udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Arrays.asList("device_tag"));
- udfContext.setOutput_fields(Arrays.asList("device_group"));
}
@Test
@@ -46,11 +40,6 @@ public class JsonExtractFunctionTest {
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
jsonExtract.open(null, udfContext);
});
-
- udfContext.setLookup_fields(new ArrayList<>());
- udfContext.getLookup_fields().add("v1");
- udfContext.setOutput_fields(new ArrayList<>());
- udfContext.getOutput_fields().add("v2");
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("other","other");
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
@@ -64,6 +53,11 @@ public class JsonExtractFunctionTest {
JsonExtract jsonExtract = new JsonExtract();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("param","$.tags[?(@.tag=='device_group')][0].value");
+ udfContext.setParameters(parameters);
+ udfContext.setLookup_fields(Arrays.asList("device_tag"));
+ udfContext.setOutput_fields(Arrays.asList("device_group"));
jsonExtract.open(null, udfContext);
Event event = new Event();
String jsonString = "{\"device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-tsgx\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgx\\\"}]}\"}";
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
index 4edbba1..93acbdd 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
@@ -2,7 +2,6 @@ package com.geedgenetworks.core.udf.test.simple;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
-import com.geedgenetworks.core.udf.FromUnixTimestamp;
import com.geedgenetworks.core.udf.UnixTimestampConverter;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -13,7 +12,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class FromUnixTimestampConverterTest {
+public class UnixTimestampConverterTest {
private static UDFContext udfContext;
@@ -24,7 +23,7 @@ public class FromUnixTimestampConverterTest {
udfContext.setOutput_fields(Arrays.asList("output"));
}
@Test
- public void testFromUnixTimestampFunctionMstoS() throws Exception {
+ public void testUnixTimestampFunctionMstoS() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
@@ -42,7 +41,7 @@ public class FromUnixTimestampConverterTest {
}
@Test
- public void testFromUnixTimestampFunctionStoMs() throws Exception {
+ public void testUnixTimestampFunctionStoMs() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "milliseconds");
@@ -61,7 +60,7 @@ public class FromUnixTimestampConverterTest {
@Test
- public void testFromUnixTimestampFunctionStoS() throws Exception {
+ public void testUnixTimestampFunctionStoS() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
@@ -80,7 +79,7 @@ public class FromUnixTimestampConverterTest {
@Test
- public void testFromUnixTimestampFunctionMstoMs() throws Exception {
+ public void testUnixTimestampFunctionMstoMs() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "milliseconds");