diff options
| author | 梁超 <[email protected]> | 2023-12-06 10:17:02 +0000 |
|---|---|---|
| committer | 梁超 <[email protected]> | 2023-12-06 10:17:02 +0000 |
| commit | 4179a0a8874d75454defafb559e65532e5b7b75d (patch) | |
| tree | 1f20f73cf865c22d588f0ebd565ccf9d92540999 | |
| parent | e277117c6da9882308ee4238cb570e045852ee6b (diff) | |
| parent | 6ebefc9026c06a4a5f65958e5c570bfefb621dd1 (diff) | |
Merge branch 'bugfix/some-err' into 'main'v1.2-rc2
[GAL-444] fix: output unmatched sip record
See merge request galaxy/tsg_olap/sip-rtp-correlation!19
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java | 4 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java | 11 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/records/Record.java | 3 |
4 files changed, 16 insertions, 4 deletions
@@ -7,7 +7,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>sip-rtp-correlation</artifactId> - <version>1.2-rc1</version> + <version>1.2-rc2</version> <name>Flink : SIP-RTP : Correlation</name> diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java index 8701891..efb53b6 100644 --- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java +++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java @@ -99,7 +99,9 @@ public class CorrelateApp { new JsonNodeSerializationSchema(), fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); - voIpOperator.union(sipDoubleDirOperator).addSink(producer); + voIpOperator + .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG)) + .addSink(producer); env.execute("SIP-RTP-CORRELATION"); } diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java index 6b9f8d2..274da5d 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java @@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; /** * A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction. @@ -23,6 +25,9 @@ import org.apache.flink.util.Collector; public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> implements FunctionHelper { + public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG = + new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class)); + private transient Time fireInterval; private transient ValueState<ObjectNode> valueState; @@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str out.collect(value); valueState.clear(); } else { - // If the address is not yet in the mapState. + // If the address is not yet in the valueState. valueState.update(value); } } else { @@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str public void onTimer(long timestamp, KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx, Collector<ObjectNode> out) throws Exception { + final ObjectNode value = valueState.value(); + if (value != null) { + ctx.output(SIP_OUTPUT_TAG, value); + } valueState.clear(); } } diff --git a/src/main/java/com/zdjizhi/flink/voip/records/Record.java b/src/main/java/com/zdjizhi/flink/voip/records/Record.java index 41560e4..46052f3 100644 --- a/src/main/java/com/zdjizhi/flink/voip/records/Record.java +++ b/src/main/java/com/zdjizhi/flink/voip/records/Record.java @@ -61,7 +61,8 @@ public class Record { * @return The VSys ID as an integer. */ public int getVSysID() { - return Record.getInt(obj, F_COMMON_VSYS_ID); + int v = Record.getInt(obj, F_COMMON_VSYS_ID); + return v == 0 ? 1 : v; } /** |
