summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-09-13 18:24:15 +0800
committerlifengchao <[email protected]>2024-09-13 18:24:15 +0800
commit0527fc54a42ee17e03ee34afac606ae9f30f8657 (patch)
tree2917564e156ef10756765022ee315cbd7bc57f5c
parent2ed4587200e28ceca9895e897212ecd28312485f (diff)
[feature][core] TSG-22596 add EXPLODE_APP_AND_PROTOCOL function for app-protocol-stat-traffic-merge application
-rw-r--r--config/udf.plugins3
-rw-r--r--groot-common/src/main/resources/udf.plugins3
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java3
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java91
5 files changed, 111 insertions, 5 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index e4f940f..313a419 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -28,4 +28,5 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
-com.geedgenetworks.core.udf.udtf.Unroll \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.Unroll
+com.geedgenetworks.core.udf.udtf.ExplodeAppAndProtocol \ No newline at end of file
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 7544cc7..3b0b0d5 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -27,4 +27,5 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
-com.geedgenetworks.core.udf.udtf.Unroll \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.Unroll
+com.geedgenetworks.core.udf.udtf.ExplodeAppAndProtocol \ No newline at end of file
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
index 5101fa1..0a36100 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
@@ -165,7 +165,8 @@ public class FakerUtils {
private static Faker<?> parseSequenceFaker(JSONObject obj) {
long start = obj.getLongValue("start", 0L);
long step = obj.getLongValue("step", 1L);
- return new SequenceFaker(start, step);
+ int batch = obj.getIntValue("batch", 1);
+ return new SequenceFaker(start, step, batch);
}
private static Faker<?> parseStringFaker(JSONObject obj) {
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
index 0005234..867f138 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
@@ -3,22 +3,34 @@ package com.geedgenetworks.connectors.mock.faker;
public class SequenceFaker extends Faker<Long> {
private final long start;
private final long step;
+ private final int batch;
private long value;
+ private int cnt;
public SequenceFaker(long start) {
- this(start, 1);
+ this(start, 1, 1);
}
public SequenceFaker(long start, long step) {
+ this(start, step, 1);
+ }
+
+ public SequenceFaker(long start, long step, int batch) {
this.start = start;
this.step = step;
+ this.batch = batch;
this.value = start;
+ this.cnt = 0;
}
@Override
public Long geneValue() throws Exception {
Long rst = value;
- value += step;
+ cnt++;
+ if(cnt == batch){
+ cnt = 0;
+ value += step;
+ }
return rst;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java
new file mode 100644
index 0000000..227099f
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java
@@ -0,0 +1,91 @@
+package com.geedgenetworks.core.udf.udtf;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.TableFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+import java.util.*;
+
+public class ExplodeAppAndProtocol implements TableFunction {
+ private String appField;
+ private String decodedPathField;
+ private String outputAppField;
+ private String outputProtocolField;
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext c) {
+ Preconditions.checkArgument(c.getLookup_fields().size() == 2, "input fields requested for app and decodedPath");
+ Preconditions.checkArgument(c.getOutput_fields().size() == 2, "output fields requested for app and decodedPath");
+ appField = c.getLookup_fields().get(0);
+ decodedPathField = c.getLookup_fields().get(1);
+ outputAppField = c.getOutput_fields().get(0);
+ outputProtocolField = c.getOutput_fields().get(1);
+ }
+
+ @Override
+ public List<Event> evaluate(Event event) {
+ Map<String, Object> map = event.getExtractedFields();
+ String appFullPath = (String) map.get(appField);
+ String decodedPath = (String) map.get(decodedPathField);
+
+ if (StringUtils.isBlank(decodedPath)) {
+ return Collections.emptyList();
+ }
+
+ // decoded_path和app进行拼接,格式化
+ String app;
+ if (StringUtils.isBlank(appFullPath)) {
+ app = null;
+ } else {
+ String[] appSplits = appFullPath.split("\\.");
+ app = appSplits[appSplits.length - 1];
+ String firstAppProtocol = appSplits[0];
+ String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
+ if (endProtocol.equals(firstAppProtocol)) {
+ if (appSplits.length > 1) {
+ decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
+ }
+ } else {
+ decodedPath = decodedPath + "." + appFullPath;
+ }
+ }
+
+ List<Event> events = new ArrayList<>();
+ Event e;
+ Map<String, Object> fields;
+
+ // 拆分decodedPath
+ int index = decodedPath.indexOf('.');
+ String subDecodedPath;
+ while (index > 0) {
+ subDecodedPath = decodedPath.substring(0, index);
+ e = new Event();
+ fields = new HashMap<>(map);
+ fields.put(outputAppField, null);
+ fields.put(outputProtocolField, subDecodedPath);
+ e.setExtractedFields(fields);
+ events.add(e);
+ index = decodedPath.indexOf('.', index + 1);
+ }
+
+ e = new Event();
+ fields = new HashMap<>(map);
+ fields.put(outputAppField, app);
+ fields.put(outputProtocolField, decodedPath);
+ e.setExtractedFields(fields);
+ events.add(e);
+
+ return events;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public String functionName() {
+ return "EXPLODE_APP_AND_PROTOCOL";
+ }
+}