From 0527fc54a42ee17e03ee34afac606ae9f30f8657 Mon Sep 17 00:00:00 2001
From: lifengchao
Date: Fri, 13 Sep 2024 18:24:15 +0800
Subject: [feature][core] TSG-22596 add EXPLODE_APP_AND_PROTOCOL function for app-protocol-stat-traffic-merge application

---
 config/udf.plugins                                 |   3 +-
 groot-common/src/main/resources/udf.plugins        |   3 +-
 .../connectors/mock/faker/FakerUtils.java          |   3 +-
 .../connectors/mock/faker/SequenceFaker.java       |  22 +++-
 .../core/udf/udtf/ExplodeAppAndProtocol.java       | 116 +++++++++++++++++++++
 5 files changed, 142 insertions(+), 5 deletions(-)
 create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java

diff --git a/config/udf.plugins b/config/udf.plugins
index e4f940f..313a419 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -28,4 +28,5 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram
 com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
 com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
 com.geedgenetworks.core.udf.udtf.JsonUnroll
-com.geedgenetworks.core.udf.udtf.Unroll
\ No newline at end of file
+com.geedgenetworks.core.udf.udtf.Unroll
+com.geedgenetworks.core.udf.udtf.ExplodeAppAndProtocol
\ No newline at end of file
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 7544cc7..3b0b0d5 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -27,4 +27,5 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram
 com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
 com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
 com.geedgenetworks.core.udf.udtf.JsonUnroll
-com.geedgenetworks.core.udf.udtf.Unroll
\ No newline at end of file
+com.geedgenetworks.core.udf.udtf.Unroll
+com.geedgenetworks.core.udf.udtf.ExplodeAppAndProtocol
\ No newline at end of file
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
index 5101fa1..0a36100 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
@@ -165,7 +165,8 @@ public class FakerUtils {
     private static Faker parseSequenceFaker(JSONObject obj) {
         long start = obj.getLongValue("start", 0L);
         long step = obj.getLongValue("step", 1L);
-        return new SequenceFaker(start, step);
+        int batch = obj.getIntValue("batch", 1);
+        return new SequenceFaker(start, step, batch);
     }
 
     private static Faker parseStringFaker(JSONObject obj) {
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
index 0005234..867f138 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
@@ -3,22 +3,40 @@ package com.geedgenetworks.connectors.mock.faker;
 public class SequenceFaker extends Faker {
     private final long start;
     private final long step;
+    private final int batch;
     private long value;
+    private int cnt;
 
     public SequenceFaker(long start) {
-        this(start, 1);
+        this(start, 1, 1);
     }
 
     public SequenceFaker(long start, long step) {
+        this(start, step, 1);
+    }
+
+    /**
+     * @param start first value returned
+     * @param step increment applied once a value has been returned {@code batch} times
+     * @param batch number of consecutive calls that return the same value
+     */
+    public SequenceFaker(long start, long step, int batch) {
         this.start = start;
         this.step = step;
+        this.batch = batch;
         this.value = start;
+        this.cnt = 0;
     }
 
     @Override
     public Long geneValue() throws Exception {
         Long rst = value;
-        value += step;
+        cnt++;
+        // ">=" rather than "==": a batch below 1 then advances on every call instead of never.
+        if (cnt >= batch) {
+            cnt = 0;
+            value += step;
+        }
         return rst;
     }
 
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java
new file mode 100644
index 0000000..227099f
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java
@@ -0,0 +1,116 @@
+package com.geedgenetworks.core.udf.udtf;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.TableFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+import java.util.*;
+
+/**
+ * Table function that appends an application path to a decoded protocol path and emits one
+ * event per dot-separated prefix of the result.
+ *
+ * Example: app "http.google" with decoded path "eth.ip.tcp.http" gives "eth.ip.tcp.http.google".
+ * The prefixes "eth", "eth.ip", "eth.ip.tcp" and "eth.ip.tcp.http" are emitted with a null app;
+ * the full path is emitted last with app "google".
+ */
+public class ExplodeAppAndProtocol implements TableFunction {
+    private String appField;
+    private String decodedPathField;
+    private String outputAppField;
+    private String outputProtocolField;
+
+    /**
+     * Reads the input and output field names from the UDF configuration.
+     *
+     * @param runtimeContext Flink runtime context; not used here
+     * @param c configuration holding exactly two lookup fields (app, decoded path) and exactly
+     *     two output fields (app, protocol), in that order
+     * @throws IllegalArgumentException if either list does not hold exactly two names
+     */
+    @Override
+    public void open(RuntimeContext runtimeContext, UDFContext c) {
+        Preconditions.checkArgument(c.getLookup_fields().size() == 2, "input fields requested for app and decodedPath");
+        Preconditions.checkArgument(c.getOutput_fields().size() == 2, "output fields requested for app and protocol");
+        appField = c.getLookup_fields().get(0);
+        decodedPathField = c.getLookup_fields().get(1);
+        outputAppField = c.getOutput_fields().get(0);
+        outputProtocolField = c.getOutput_fields().get(1);
+    }
+
+    /**
+     * Expands one event into one event per prefix of the merged protocol path.
+     *
+     * @param event input event; its extracted fields are copied into every output event
+     * @return an empty list when the decoded path is blank; otherwise the prefix events (app
+     *     set to null) followed by one event holding the full path and the app name
+     */
+    @Override
+    public List<Event> evaluate(Event event) {
+        Map<String, Object> map = event.getExtractedFields();
+        String appFullPath = (String) map.get(appField);
+        String decodedPath = (String) map.get(decodedPathField);
+
+        if (StringUtils.isBlank(decodedPath)) {
+            return Collections.emptyList();
+        }
+
+        // Join decoded_path and app, without repeating the segment they share at the joint.
+        // split() yields an empty array for dot-only input such as "."; treat that as no app.
+        String[] appSplits =
+                StringUtils.isBlank(appFullPath) ? new String[0] : appFullPath.split("\\.");
+        String app = null;
+        if (appSplits.length > 0) {
+            app = appSplits[appSplits.length - 1];
+            String firstAppProtocol = appSplits[0];
+            String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
+            if (endProtocol.equals(firstAppProtocol)) {
+                if (appSplits.length > 1) {
+                    decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
+                }
+            } else {
+                decodedPath = decodedPath + "." + appFullPath;
+            }
+        }
+
+        List<Event> events = new ArrayList<>();
+        Event e;
+        Map<String, Object> fields;
+
+        // Split decodedPath: one event per proper prefix, with no app.
+        int index = decodedPath.indexOf('.');
+        String subDecodedPath;
+        while (index > 0) {
+            subDecodedPath = decodedPath.substring(0, index);
+            e = new Event();
+            fields = new HashMap<>(map);
+            fields.put(outputAppField, null);
+            fields.put(outputProtocolField, subDecodedPath);
+            e.setExtractedFields(fields);
+            events.add(e);
+            index = decodedPath.indexOf('.', index + 1);
+        }
+
+        // Only the full path carries the app name.
+        e = new Event();
+        fields = new HashMap<>(map);
+        fields.put(outputAppField, app);
+        fields.put(outputProtocolField, decodedPath);
+        e.setExtractedFields(fields);
+        events.add(e);
+
+        return events;
+    }
+
+    @Override
+    public void close() {}
+
+    /** Returns the name of this function, {@code EXPLODE_APP_AND_PROTOCOL}. */
+    @Override
+    public String functionName() {
+        return "EXPLODE_APP_AND_PROTOCOL";
+    }
+}
-- 
cgit v1.2.3