summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author梁超 <[email protected]>2024-10-21 06:43:13 +0000
committer梁超 <[email protected]>2024-10-21 06:43:13 +0000
commit2954275dd3cf3be19e5faebebfdfbf00779bcb42 (patch)
treeaca1383d3d1fe8e2676c1c88501e5cdffc7cb7b0
parent4ef6c25e6937a8d6e917c95bce3ea6213937552e (diff)
parent728e3407e8125250450e9fa97bc786e47e2d3ec1 (diff)
Merge branch 'bugfix/eval-error' into 'master'
[TSG-22767] fix: fix function exception See merge request galaxy/tsg_olap/sip-rtp-correlation!34
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java1
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java4
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java5
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java22
-rw-r--r--src/main/resources/jobs/job.yml8
6 files changed, 33 insertions, 9 deletions
diff --git a/pom.xml b/pom.xml
index 1ac6d36..75c3b5b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
<groupId>com.geedgenetworks.application</groupId>
<artifactId>sip-rtp-correlation</artifactId>
- <version>2.0-rc7</version>
+ <version>2.0-rc8</version>
<name>Flink : SIP-RTP : Correlation</name>
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java
index 20d0746..d718a14 100644
--- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java
@@ -20,6 +20,7 @@ public class VoipUDFFactory implements UDFFactory {
put("HAS_EXTERNAL_IP_ADDRESS", new HasExternalIpAddress());
put("STREAM_DIR", new StreamDir());
+ put("STREAM_DIR_SET", new StreamDirSet());
put("FIND_NOT_BLANK", new FindNotBlank());
put("SORT_ADDRESS", new SortAddress());
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java
index ab422b7..13ab4a6 100644
--- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.flink.easy.application.voip.udf;
import com.google.common.collect.Lists;
import com.zdjizhi.utils.IPUtil;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
@@ -17,6 +18,9 @@ public class SortAddress extends ScalarFunction {
public static String of(
Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
var list = Lists.newArrayList(a1, a2);
+ if (a1.f1 == null || a2.f1 == null || StringUtils.isAnyEmpty(a1.f0, a2.f0)) {
+ return "";
+ }
list.sort((a, b) -> {
if (a.f1.equals(b.f1)) {
return Long.compare(
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java
index 3f5166f..e571cf4 100644
--- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java
@@ -18,9 +18,4 @@ public class StreamDir extends ScalarFunction {
}
return v;
}
-
- public static void main(String[] args) {
- System.out.println(8192L + 16384L);
- System.out.println(new StreamDir().eval(8192L + 16384L));
- }
} \ No newline at end of file
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java
new file mode 100644
index 0000000..bd44ae4
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java
@@ -0,0 +1,22 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class StreamDirSet extends ScalarFunction {
+
+ public @DataTypeHint("BIGINT") Long eval(Long flags) {
+ if (flags == null) {
+ return 8192 + 16384L;
+ }
+ long r = 0;
+ if ((flags & 8192) == 0) {
+ r += 8192;
+ }
+ if ((flags & 16384) == 0) {
+ r += 16384;
+ }
+ return r;
+ }
+
+}
diff --git a/src/main/resources/jobs/job.yml b/src/main/resources/jobs/job.yml
index 2789c11..cd4bbdf 100644
--- a/src/main/resources/jobs/job.yml
+++ b/src/main/resources/jobs/job.yml
@@ -335,13 +335,13 @@ pipeline:
splits:
# Invalid ip or port
- name: error1-records
- where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0
+ where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port.isNull || client_port <= 0 || server_port.isNull || server_port <= 0
# Invalid stream dir
- name: error2-records
where: decoded_as == 'SIP' &&STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3
# Invalid: SIP one-way stream and has invalid network address
- name: error3-records
- where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port <= 0 )
+ where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port.isNull || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port.isNull && sip_responder_sdp_media_port <= 0 )
- name: error4-records
where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) )
@@ -643,7 +643,9 @@ pipeline:
- if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags)
then:
- |-
- OUTPUT ok FROM withColumns(recv_time to sip_call_id),
+ OUTPUT ok FROM withColumns(recv_time to t_vsys_id),
+ STREAM_DIR_SET(flags) AS flags,
+ withColumns(flags_identify_info to sip_call_id),
FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description,
FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description,
FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent,