summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author梁超 <[email protected]>2023-10-12 02:44:32 +0000
committer梁超 <[email protected]>2023-10-12 02:44:32 +0000
commit970977ba3c62d718fe96c84a6d11c765a3b943ad (patch)
treeafd9c99c7e9cb28cf9832ada554ca9712d438c7c
parent35e2807a91984db6d62bf87d10af9d345bed2f0e (diff)
parent732d9f5aa9b88725a24f3dbf379ada419fc93d6f (diff)
Merge branch 'hotfix/output-sip' into 'release/1.0'v1.0-rc4
hotfix: add sip record output See merge request galaxy/tsg_olap/sip-rtp-correlation!11
-rw-r--r--CHANGELOG.md5
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java17
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java4
4 files changed, 19 insertions, 9 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 22339a4..c098da0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
# Changelog
### Hotfix
- - [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常 \ No newline at end of file
+ - [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
+
+### Other
+ - 输出 SIP Record \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 9ad26c3..eceeb27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
- <version>1.0-rc3</version>
+ <version>1.0-rc4</version>
<name>Flink : SIP-RTP : Correlation</name>
diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
index 58b5e53..5b699f3 100644
--- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
+++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
@@ -4,8 +4,8 @@ import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.error.ErrorHandler;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.*;
-import com.zdjizhi.flink.voip.records.Record;
-import com.zdjizhi.flink.voip.records.SIPRecord;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -13,12 +13,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import java.time.Duration;
+
import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
/**
@@ -52,7 +53,13 @@ public class CorrelateApp {
fusionConfiguration
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
- final DataStreamSource<ObjectNode> sourceStream = env.addSource(kafkaConsumer);
+ final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
+ .assignTimestampsAndWatermarks(
+ WatermarkStrategy
+ .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
+ .withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
+ (element, recordTimestamp) ->
+ element.get("common_start_timestamp_ms").asLong()));
final ErrorHandler errorHandler = new ErrorHandler(config);
@@ -92,7 +99,7 @@ public class CorrelateApp {
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
- voIpOperator.addSink(producer);
+ voIpOperator.union(sipDoubleDirOperator).addSink(producer);
env.execute("SIP-RTP-CORRELATION");
}
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java
index c055db0..216e283 100644
--- a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java
+++ b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java
@@ -26,7 +26,7 @@ public interface FunctionHelper extends RichFunction {
}
default void registerNextFireTimestamp(TimerService timerService, long interval) {
- long current = timerService.currentProcessingTime();
- timerService.registerProcessingTimeTimer(current + interval);
+ long current = timerService.currentWatermark();
+ timerService.registerEventTimeTimer(current + interval);
}
}