diff options
| author | chaoc <[email protected]> | 2023-10-13 14:38:23 +0800 |
|---|---|---|
| committer | chaoc <[email protected]> | 2023-10-13 14:38:23 +0800 |
| commit | 10ce6cfa07eb9749e1d56a7766dab074d547dd26 (patch) | |
| tree | 4d34c3649b54d8cf4f965dfff0f1b7803318da2d | |
| parent | 9d5d99974b7cba980871625d0e7935a5d2ae448c (diff) | |
feat: add config 'DETERMINE_INTRANET_IP_BE_ABNORMAL'
| -rw-r--r-- | README.md | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java | 9 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java | 24 |
3 files changed, 28 insertions, 8 deletions
@@ -28,7 +28,8 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve | source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties | | sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 | | sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties | -| error.records.output.enable | STRING | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 | +| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 | +| determine.intranet.ip.be.abnormal | BOOLEAN | N | True | SIP 中协商四元组中存在内网 IP 地址时,是否将其判定为异常数据 | | error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 | | error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties | | sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) | diff --git a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java index 698d5da..31dac18 100644 --- a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java +++ b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java @@ -57,6 +57,15 @@ public class FusionConfigs { "If set to true, the error records will be sent to the specified Kafka topic."); /** + * Configuration option to determine whether intranet IP addresses should be considered abnormal. + */ + public static final ConfigOption<Boolean> DETERMINE_INTRANET_IP_BE_ABNORMAL = + ConfigOptions.key("determine.intranet.ip.be.abnormal") + .booleanType() + .defaultValue(true) + .withDescription("Specifies whether intranet IP addresses should be treated as abnormal."); + + /** * Configuration option for specifying the Kafka topic name where the error data will be sent. * This configuration option is used when the output of error records is enabled. */ diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java index 8f79f05..bd8cd9b 100644 --- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java +++ b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java @@ -3,6 +3,7 @@ package com.zdjizhi.flink.voip.error; import com.zdjizhi.flink.voip.conf.FusionConfigs; import com.zdjizhi.flink.voip.conf.FusionConfiguration; import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema; +import com.zdjizhi.flink.voip.functions.FunctionHelper; import com.zdjizhi.flink.voip.records.Record; import com.zdjizhi.flink.voip.records.SIPRecord; import com.zdjizhi.flink.voip.records.SchemaType; @@ -88,10 +89,19 @@ public class ErrorHandler { * The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records * with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary. */ -class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> { +class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper { private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class); + private transient boolean determineIntranetIpBeAbnormal; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + final Configuration config = getGlobalConfiguration(); + determineIntranetIpBeAbnormal = config.get(FusionConfigs.DETERMINE_INTRANET_IP_BE_ABNORMAL); + } + @Override public void processElement(ObjectNode obj, ProcessFunction<ObjectNode, ObjectNode>.Context ctx, @@ -100,20 +110,20 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje // Check for invalid or meaningless addresses and ports boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) && StringUtils.isNotBlank(record.getServerIp()) && - record.getClientPort() >= 0 && - record.getServerPort() >= 0; + record.getClientPort() > 0 && + record.getServerPort() > 0; final SIPRecord sipRecord = new SIPRecord(obj); boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp()) - || isInternalIp(sipRecord.getOriginatorSdpConnectIp()); + || (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp())); boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp()) - || isInternalIp(sipRecord.getResponderSdpConnectIp()); + || (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp())); boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) || isIPAddress(sipRecord.getResponderSdpConnectIp()); boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType()); boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() && - isInternalIp(sipRecord.getResponderSdpConnectIp()) && - isInternalIp(sipRecord.getOriginatorSdpConnectIp()); + (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp())) && + (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp())); // Both client and server addresses in the data are valid. if (cond1 && ( |
