summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchaoc <[email protected]>2023-10-13 14:38:23 +0800
committerchaoc <[email protected]>2023-10-13 14:38:23 +0800
commit10ce6cfa07eb9749e1d56a7766dab074d547dd26 (patch)
tree4d34c3649b54d8cf4f965dfff0f1b7803318da2d
parent9d5d99974b7cba980871625d0e7935a5d2ae448c (diff)
feat: add config 'DETERMINE_INTRANET_IP_BE_ABNORMAL'
-rw-r--r--README.md3
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java9
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java24
3 files changed, 28 insertions, 8 deletions
diff --git a/README.md b/README.md
index 42df51d..7740b28 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,8 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
-| error.records.output.enable | STRING | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
+| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
+| determine.intranet.ip.be.abnormal | BOOLEAN | N | True | SIP 中协商四元组中存在内网 IP 地址时,是否将其判定为异常数据 |
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
diff --git a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java
index 698d5da..31dac18 100644
--- a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java
+++ b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java
@@ -57,6 +57,15 @@ public class FusionConfigs {
"If set to true, the error records will be sent to the specified Kafka topic.");
/**
+ * Configuration option to determine whether intranet IP addresses should be considered abnormal.
+ */
+ public static final ConfigOption<Boolean> DETERMINE_INTRANET_IP_BE_ABNORMAL =
+ ConfigOptions.key("determine.intranet.ip.be.abnormal")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Specifies whether intranet IP addresses should be treated as abnormal.");
+
+ /**
* Configuration option for specifying the Kafka topic name where the error data will be sent.
* This configuration option is used when the output of error records is enabled.
*/
diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
index 8f79f05..bd8cd9b 100644
--- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
+++ b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
@@ -3,6 +3,7 @@ package com.zdjizhi.flink.voip.error;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
+import com.zdjizhi.flink.voip.functions.FunctionHelper;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
@@ -88,10 +89,19 @@ public class ErrorHandler {
* The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
* with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
*/
-class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> {
+class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper {
private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
+ private transient boolean determineIntranetIpBeAbnormal;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ final Configuration config = getGlobalConfiguration();
+ determineIntranetIpBeAbnormal = config.get(FusionConfigs.DETERMINE_INTRANET_IP_BE_ABNORMAL);
+ }
+
@Override
public void processElement(ObjectNode obj,
ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
@@ -100,20 +110,20 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
// Check for invalid or meaningless addresses and ports
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
StringUtils.isNotBlank(record.getServerIp()) &&
- record.getClientPort() >= 0 &&
- record.getServerPort() >= 0;
+ record.getClientPort() > 0 &&
+ record.getServerPort() > 0;
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
- || isInternalIp(sipRecord.getOriginatorSdpConnectIp());
+ || (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
- || isInternalIp(sipRecord.getResponderSdpConnectIp());
+ || (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp()));
boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
- isInternalIp(sipRecord.getResponderSdpConnectIp()) &&
- isInternalIp(sipRecord.getOriginatorSdpConnectIp());
+ (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
+ (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
// Both client and server addresses in the data are valid.
if (cond1 && (