diff options
| author | 梁超 <[email protected]> | 2024-10-21 06:43:13 +0000 |
|---|---|---|
| committer | 梁超 <[email protected]> | 2024-10-21 06:43:13 +0000 |
| commit | 2954275dd3cf3be19e5faebebfdfbf00779bcb42 (patch) | |
| tree | aca1383d3d1fe8e2676c1c88501e5cdffc7cb7b0 | |
| parent | 4ef6c25e6937a8d6e917c95bce3ea6213937552e (diff) | |
| parent | 728e3407e8125250450e9fa97bc786e47e2d3ec1 (diff) | |
Merge branch 'bugfix/eval-error' into 'master'
[TSG-22767] fix: fix function exception
See merge request galaxy/tsg_olap/sip-rtp-correlation!34
6 files changed, 33 insertions, 9 deletions
@@ -7,7 +7,7 @@ <groupId>com.geedgenetworks.application</groupId> <artifactId>sip-rtp-correlation</artifactId> - <version>2.0-rc7</version> + <version>2.0-rc8</version> <name>Flink : SIP-RTP : Correlation</name> diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java index 20d0746..d718a14 100644 --- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java @@ -20,6 +20,7 @@ public class VoipUDFFactory implements UDFFactory { put("HAS_EXTERNAL_IP_ADDRESS", new HasExternalIpAddress()); put("STREAM_DIR", new StreamDir()); + put("STREAM_DIR_SET", new StreamDirSet()); put("FIND_NOT_BLANK", new FindNotBlank()); put("SORT_ADDRESS", new SortAddress()); diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java index ab422b7..13ab4a6 100644 --- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java @@ -2,6 +2,7 @@ package com.geedgenetworks.flink.easy.application.voip.udf; import com.google.common.collect.Lists; import com.zdjizhi.utils.IPUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.functions.ScalarFunction; @@ -17,6 +18,9 @@ public class SortAddress extends ScalarFunction { public static String of( Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) { var list = Lists.newArrayList(a1, a2); + if (a1.f1 == null || a2.f1 == null || StringUtils.isAnyEmpty(a1.f0, a2.f0)) { + return ""; + } list.sort((a, b) -> { if (a.f1.equals(b.f1)) { return Long.compare( diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java index 3f5166f..e571cf4 100644 --- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java @@ -18,9 +18,4 @@ public class StreamDir extends ScalarFunction { } return v; } - - public static void main(String[] args) { - System.out.println(8192L + 16384L); - System.out.println(new StreamDir().eval(8192L + 16384L)); - } }
\ No newline at end of file diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java new file mode 100644 index 0000000..bd44ae4 --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java @@ -0,0 +1,22 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +public class StreamDirSet extends ScalarFunction { + + public @DataTypeHint("BIGINT") Long eval(Long flags) { + if (flags == null) { + return 8192 + 16384L; + } + long r = 0; + if ((flags & 8192) == 0) { + r += 8192; + } + if ((flags & 16384) == 0) { + r += 16384; + } + return r; + } + +} diff --git a/src/main/resources/jobs/job.yml b/src/main/resources/jobs/job.yml index 2789c11..cd4bbdf 100644 --- a/src/main/resources/jobs/job.yml +++ b/src/main/resources/jobs/job.yml @@ -335,13 +335,13 @@ pipeline: splits: # Invalid ip or port - name: error1-records - where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0 + where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port.isNull || client_port <= 0 || server_port.isNull || server_port <= 0 # Invalid stream dir - name: error2-records where: decoded_as == 'SIP' &&STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3 # Invalid: SIP one-way stream and has invalid network address - name: error3-records - where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port <= 0 ) + where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port.isNull || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port.isNull && sip_responder_sdp_media_port <= 0 ) - name: error4-records where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) ) @@ -643,7 +643,9 @@ pipeline: - if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags) then: - |- - OUTPUT ok FROM withColumns(recv_time to sip_call_id), + OUTPUT ok FROM withColumns(recv_time to t_vsys_id), + STREAM_DIR_SET(flags) AS flags, + withColumns(flags_identify_info to sip_call_id), FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description, FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description, FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent, |
