diff options
| author | chaochaoc <[email protected]> | 2024-05-10 11:58:35 +0800 |
|---|---|---|
| committer | chaochaoc <[email protected]> | 2024-05-10 11:58:35 +0800 |
| commit | d01235e092820c8c962935d4bab87ee4e74bb1bd (patch) | |
| tree | 93676fa2c79c620be13aa936b9c39ac897400264 /src | |
| parent | f21e814763292bd8341ae59fe14fa245e84f40ad (diff) | |
[GAL-568] fix: add field 'rtp_originator_dir'
Diffstat (limited to 'src')
3 files changed, 54 insertions, 5 deletions
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java index a08f86b..c8e32b7 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java @@ -1,9 +1,8 @@ package com.zdjizhi.flink.voip.functions; import com.zdjizhi.flink.voip.conf.FusionConfigs; -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SchemaType; -import com.zdjizhi.flink.voip.records.StreamDir; +import com.zdjizhi.flink.voip.records.*; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.time.Time; @@ -80,6 +79,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A final ObjectNode rtpObj = entry.getValue(); final Record rtpRecord = new Record(rtpObj); + completeOriginatorField(rtpRecord, new SIPRecord(sipObj)); + rtpRecord.merge(sipObj) .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); out.collect(rtpObj); @@ -121,6 +122,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A final StreamDir streamDir = rtpRecord.getStreamDir(); if (null != info) { + completeOriginatorField(rtpRecord, new SIPRecord(info.getObj())); + rtpRecord.merge(info.getObj()) .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); out.collect(rtpObj); @@ -150,9 +153,28 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx, Collector<ObjectNode> out) throws Exception { for (ObjectNode obj : rtpState.values()) { + final Record rtpRecord = new Record(obj); + rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode()); out.collect(obj); } rtpState.clear(); sipState.clear(); } + + // ====================================================================== + // PRIVATE HELPER + // ====================================================================== + + private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) { + if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) { + if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) { + rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode()); + return; + } else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) { + rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode()); + return; + } + } + rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode()); + } }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java b/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java new file mode 100644 index 0000000..c8df7db --- /dev/null +++ b/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java @@ -0,0 +1,27 @@ +package com.zdjizhi.flink.voip.records; + +import lombok.Getter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +public class RTPRecord extends Record { + + public static final String F_ORIGINATOR_DIR = "rtp_originator_dir"; + + public RTPRecord(ObjectNode obj) { + super(obj); + } + + @Getter + public enum OriginatorDir { + + UNKNOWN(0), + C2S(1), + S2C(2); + + private final int code; + + OriginatorDir(int code) { + this.code = code; + } + } +} diff --git a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java b/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java index 829f5f5..0f22986 100644 --- a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java +++ b/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java @@ -34,10 +34,10 @@ public class RecordTest { final ObjectNode obj = mapper.createObjectNode(); final Record record = new Record(obj); record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue()); - assertEquals(SchemaType.RTP.getValue(), record.getSchemaType()); + assertEquals(SchemaType.RTP, record.getSchemaType()); obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue())); - assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType()); + assertEquals(SchemaType.VOIP, record.getSchemaType()); } @Test |
