summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchaoc <[email protected]>2023-12-15 10:27:02 +0800
committerchaoc <[email protected]>2023-12-15 10:27:02 +0800
commit07beccc732466f2d160946f0c2bd751be5260005 (patch)
treeca717b712e975db3e322485f331df01c0e0a841d
parent93ed6bcddc9d61cee591f6769955cd1bb76559b1 (diff)
[GAL-444] fix: fix error caused by invalid flags
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java16
1 files changed, 14 insertions, 2 deletions
diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
index 98bbbe0..be41377 100644
--- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
+++ b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
@@ -21,6 +21,8 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.function.Function;
+
/**
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
@@ -112,6 +114,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
record.getClientPort() > 0 &&
record.getServerPort() > 0;
+ boolean cond8 = null != executeSafely(Record::getStreamDir, record);
+
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
@@ -120,7 +124,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
- boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
+ boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
@@ -129,7 +133,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
- if (cond1 && (!cond5 || cond7) && (
+ if (cond1 && cond8 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid
@@ -149,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
// ======================================================================================
// ----------------------------------- private helper -----------------------------------
+ public static <T, R> R executeSafely(Function<T, R> function, T v) {
+ try {
+ return function.apply(v);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) {
return false;