diff options
| author | 梁超 <[email protected]> | 2023-12-08 06:36:00 +0000 |
|---|---|---|
| committer | 梁超 <[email protected]> | 2023-12-08 06:36:00 +0000 |
| commit | 93ed6bcddc9d61cee591f6769955cd1bb76559b1 (patch) | |
| tree | e10a5e32d250ebee4e45929a2aa33b73ee1b2041 | |
| parent | e51d693fa051f1d4a3f905365991cc3686f03cbc (diff) | |
| parent | 4ab96737a162a8a0cb726f1abc48ad60743ebbae (diff) | |
Merge branch 'hotfix/duplicate-voip' into 'release/1.1'v1.1-rc3
merge: merge 1.2
See merge request galaxy/tsg_olap/sip-rtp-correlation!22
6 files changed, 36 insertions, 8 deletions
@@ -7,7 +7,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>sip-rtp-correlation</artifactId> - <version>1.1-rc2</version> + <version>1.1-rc3</version> <name>Flink : SIP-RTP : Correlation</name> @@ -24,6 +24,18 @@ <jackson.version>2.13.2.20220328</jackson.version> </properties> + <distributionManagement> + <repository> + <id>platform-releases</id> + <url>http://192.168.40.153:8099/content/repositories/platform-release</url> + <uniqueVersion>true</uniqueVersion> + </repository> + <snapshotRepository> + <id>platform-snapshots</id> + <url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url> + </snapshotRepository> + </distributionManagement> + <dependencies> <dependency> <groupId>org.apache.flink</groupId> diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java index 5b699f3..ecad56f 100644 --- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java +++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java @@ -99,7 +99,9 @@ public class CorrelateApp { new JsonNodeSerializationSchema(), fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); - voIpOperator.union(sipDoubleDirOperator).addSink(producer); + voIpOperator + .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG)) + .addSink(producer); env.execute("SIP-RTP-CORRELATION"); } diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java index 2f1b767..98bbbe0 100644 --- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java +++ b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java @@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord; import com.zdjizhi.flink.voip.records.SchemaType; import com.zdjizhi.flink.voip.records.StreamDir; import com.zdjizhi.utils.IPUtil; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -108,8 +107,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje Collector<ObjectNode> out) throws Exception { final Record record = new Record(obj); // Check for invalid or meaningless addresses and ports - boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) && - StringUtils.isNotBlank(record.getServerIp()) && + boolean cond1 = isIPAddress(record.getClientIp()) && + isIPAddress(record.getServerIp()) && record.getClientPort() > 0 && record.getServerPort() > 0; @@ -125,8 +124,12 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) && (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp())); + boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) && + isIPAddress(sipRecord.getResponderSdpConnectIp()) && + sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0; + // Both client and server addresses in the data are valid. - if (cond1 && ( + if (cond1 && (!cond5 || cond7) && ( // The address in the SIP one-way stream is valid and not an internal network address. cond2 && cond3 && cond4 && cond5 // The coordinating addresses in the SIP double directional stream are valid diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java index 6b9f8d2..274da5d 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java @@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; /** * A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction. @@ -23,6 +25,9 @@ import org.apache.flink.util.Collector; public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> implements FunctionHelper { + public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG = + new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class)); + private transient Time fireInterval; private transient ValueState<ObjectNode> valueState; @@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str out.collect(value); valueState.clear(); } else { - // If the address is not yet in the mapState. + // If the address is not yet in the valueState. valueState.update(value); } } else { @@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str public void onTimer(long timestamp, KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx, Collector<ObjectNode> out) throws Exception { + final ObjectNode value = valueState.value(); + if (value != null) { + ctx.output(SIP_OUTPUT_TAG, value); + } valueState.clear(); } } diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java index 19dc44e..a08f86b 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java @@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A rtpRecord.merge(sipObj) .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); out.collect(rtpObj); + iterator.remove(); switch (entry.getKey()) { case S2C: diff --git a/src/main/java/com/zdjizhi/flink/voip/records/Record.java b/src/main/java/com/zdjizhi/flink/voip/records/Record.java index 8e57e17..2d1cdc6 100644 --- a/src/main/java/com/zdjizhi/flink/voip/records/Record.java +++ b/src/main/java/com/zdjizhi/flink/voip/records/Record.java @@ -57,7 +57,8 @@ public class Record { * @return The VSys ID as an integer. */ public int getVSysID() { - return Record.getInt(obj, F_COMMON_VSYS_ID); + int v = Record.getInt(obj, F_COMMON_VSYS_ID); + return v == 0 ? 1 : v; } /** |
