summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author梁超 <[email protected]>2023-12-15 02:27:43 +0000
committer梁超 <[email protected]>2023-12-15 02:27:43 +0000
commit1f6ef08a3013dede4cc3e19d07477e1e2c3f15f9 (patch)
treec27dc51022b14bdde4174bafdaea9095ec628120
parent2542a8bfd23958eee0da0585304c5f8318471d66 (diff)
parent87abd1e2ca306ac9adae783add0381829d478a4f (diff)
Merge branch 'hotfix/illegal-flags' into 'release/1.2'
[GAL-444] fix: fix error caused by invalid flags See merge request galaxy/tsg_olap/sip-rtp-correlation!23
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java16
1 files changed, 14 insertions, 2 deletions
diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
index 98bbbe0..be41377 100644
--- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
+++ b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
@@ -21,6 +21,8 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.function.Function;
+
/**
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
@@ -112,6 +114,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
record.getClientPort() > 0 &&
record.getServerPort() > 0;
+ boolean cond8 = null != executeSafely(Record::getStreamDir, record);
+
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
@@ -120,7 +124,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
- boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
+ boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
@@ -129,7 +133,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
- if (cond1 && (!cond5 || cond7) && (
+ if (cond1 && cond8 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid
@@ -149,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
// ======================================================================================
// ----------------------------------- private helper -----------------------------------
+ public static <T, R> R executeSafely(Function<T, R> function, T v) {
+ try {
+ return function.apply(v);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) {
return false;