diff options
| author | 梁超 <[email protected]> | 2023-12-15 02:27:43 +0000 |
|---|---|---|
| committer | 梁超 <[email protected]> | 2023-12-15 02:27:43 +0000 |
| commit | 1f6ef08a3013dede4cc3e19d07477e1e2c3f15f9 (patch) | |
| tree | c27dc51022b14bdde4174bafdaea9095ec628120 | |
| parent | 2542a8bfd23958eee0da0585304c5f8318471d66 (diff) | |
| parent | 87abd1e2ca306ac9adae783add0381829d478a4f (diff) | |
Merge branch 'hotfix/illegal-flags' into 'release/1.2'
[GAL-444] fix: fix error caused by invalid flags
See merge request galaxy/tsg_olap/sip-rtp-correlation!23
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java | 16 |
1 files changed, 14 insertions, 2 deletions
diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java index 98bbbe0..be41377 100644 --- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java +++ b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java @@ -21,6 +21,8 @@ import org.apache.flink.util.OutputTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.Function; + /** * The ErrorHandler class is responsible for handling and filtering error records from the input data stream. * It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled. @@ -112,6 +114,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje record.getClientPort() > 0 && record.getServerPort() > 0; + boolean cond8 = null != executeSafely(Record::getStreamDir, record); + final SIPRecord sipRecord = new SIPRecord(obj); boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) || isIPAddress(sipRecord.getResponderSdpConnectIp()); @@ -120,7 +124,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp()) || (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp())); boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType()); - boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() && + boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) && (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) && (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp())); @@ -129,7 +133,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0; // Both client and server addresses in the data are valid. - if (cond1 && (!cond5 || cond7) && ( + if (cond1 && cond8 && (!cond5 || cond7) && ( // The address in the SIP one-way stream is valid and not an internal network address. cond2 && cond3 && cond4 && cond5 // The coordinating addresses in the SIP double directional stream are valid @@ -149,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje // ====================================================================================== // ----------------------------------- private helper ----------------------------------- + public static <T, R> R executeSafely(Function<T, R> function, T v) { + try { + return function.apply(v); + } catch (Exception e) { + return null; + } + } + private static boolean isIPAddress(final String ipaddr) { if (null == ipaddr) { return false; |
