diff options
| author | 梁超 <[email protected]> | 2023-10-12 02:44:32 +0000 |
|---|---|---|
| committer | 梁超 <[email protected]> | 2023-10-12 02:44:32 +0000 |
| commit | 970977ba3c62d718fe96c84a6d11c765a3b943ad (patch) | |
| tree | afd9c99c7e9cb28cf9832ada554ca9712d438c7c | |
| parent | 35e2807a91984db6d62bf87d10af9d345bed2f0e (diff) | |
| parent | 732d9f5aa9b88725a24f3dbf379ada419fc93d6f (diff) | |
Merge branch 'hotfix/output-sip' into 'release/1.0'v1.0-rc4
hotfix: add sip record output
See merge request galaxy/tsg_olap/sip-rtp-correlation!11
| -rw-r--r-- | CHANGELOG.md | 5 | ||||
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java | 17 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java | 4 |
4 files changed, 19 insertions, 9 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 22339a4..c098da0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ # Changelog ### Hotfix - - [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
\ No newline at end of file + - [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常 + +### Other + - 输出 SIP Record
\ No newline at end of file @@ -7,7 +7,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>sip-rtp-correlation</artifactId> - <version>1.0-rc3</version> + <version>1.0-rc4</version> <name>Flink : SIP-RTP : Correlation</name> diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java index 58b5e53..5b699f3 100644 --- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java +++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java @@ -4,8 +4,8 @@ import com.zdjizhi.flink.voip.conf.FusionConfiguration; import com.zdjizhi.flink.voip.error.ErrorHandler; import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema; import com.zdjizhi.flink.voip.functions.*; -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SIPRecord; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; @@ -13,12 +13,13 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.json.JsonNodeDeserializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import java.time.Duration; + import static com.zdjizhi.flink.voip.conf.FusionConfigs.*; /** @@ -52,7 +53,13 @@ public class CorrelateApp { fusionConfiguration .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); - final DataStreamSource<ObjectNode> sourceStream = env.addSource(kafkaConsumer); + final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer) + .assignTimestampsAndWatermarks( + WatermarkStrategy + .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5)) + .withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>) + (element, recordTimestamp) -> + element.get("common_start_timestamp_ms").asLong())); final ErrorHandler errorHandler = new ErrorHandler(config); @@ -92,7 +99,7 @@ public class CorrelateApp { new JsonNodeSerializationSchema(), fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); - voIpOperator.addSink(producer); + voIpOperator.union(sipDoubleDirOperator).addSink(producer); env.execute("SIP-RTP-CORRELATION"); } diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java index c055db0..216e283 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java @@ -26,7 +26,7 @@ public interface FunctionHelper extends RichFunction { } default void registerNextFireTimestamp(TimerService timerService, long interval) { - long current = timerService.currentProcessingTime(); - timerService.registerProcessingTimeTimer(current + interval); + long current = timerService.currentWatermark(); + timerService.registerEventTimeTimer(current + interval); } } |
