summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author梁超 <[email protected]>2023-12-06 10:17:02 +0000
committer梁超 <[email protected]>2023-12-06 10:17:02 +0000
commit4179a0a8874d75454defafb559e65532e5b7b75d (patch)
tree1f20f73cf865c22d588f0ebd565ccf9d92540999
parente277117c6da9882308ee4238cb570e045852ee6b (diff)
parent6ebefc9026c06a4a5f65958e5c570bfefb621dd1 (diff)
Merge branch 'bugfix/some-err' into 'main'v1.2-rc2
[GAL-444] fix: output unmatched sip record See merge request galaxy/tsg_olap/sip-rtp-correlation!19
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java4
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java11
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/records/Record.java3
4 files changed, 16 insertions, 4 deletions
diff --git a/pom.xml b/pom.xml
index fc50cd4..ae4a45f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
- <version>1.2-rc1</version>
+ <version>1.2-rc2</version>
<name>Flink : SIP-RTP : Correlation</name>
diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
index 8701891..efb53b6 100644
--- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
+++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
@@ -99,7 +99,9 @@ public class CorrelateApp {
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
- voIpOperator.union(sipDoubleDirOperator).addSink(producer);
+ voIpOperator
+ .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
+ .addSink(producer);
env.execute("SIP-RTP-CORRELATION");
}
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java
index 6b9f8d2..274da5d 100644
--- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java
+++ b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java
@@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
/**
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
@@ -23,6 +25,9 @@ import org.apache.flink.util.Collector;
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
implements FunctionHelper {
+ public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
+ new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
+
private transient Time fireInterval;
private transient ValueState<ObjectNode> valueState;
@@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
out.collect(value);
valueState.clear();
} else {
- // If the address is not yet in the mapState.
+ // If the address is not yet in the valueState.
valueState.update(value);
}
} else {
@@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
public void onTimer(long timestamp,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
+ final ObjectNode value = valueState.value();
+ if (value != null) {
+ ctx.output(SIP_OUTPUT_TAG, value);
+ }
valueState.clear();
}
}
diff --git a/src/main/java/com/zdjizhi/flink/voip/records/Record.java b/src/main/java/com/zdjizhi/flink/voip/records/Record.java
index 41560e4..46052f3 100644
--- a/src/main/java/com/zdjizhi/flink/voip/records/Record.java
+++ b/src/main/java/com/zdjizhi/flink/voip/records/Record.java
@@ -61,7 +61,8 @@ public class Record {
* @return The VSys ID as an integer.
*/
public int getVSysID() {
- return Record.getInt(obj, F_COMMON_VSYS_ID);
+ int v = Record.getInt(obj, F_COMMON_VSYS_ID);
+ return v == 0 ? 1 : v;
}
/**