diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/common/CKDelayProcess.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/common/CKDelayProcess.java | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/common/CKDelayProcess.java b/src/main/java/com/zdjizhi/common/CKDelayProcess.java new file mode 100644 index 0000000..35ec90e --- /dev/null +++ b/src/main/java/com/zdjizhi/common/CKDelayProcess.java @@ -0,0 +1,65 @@ +package com.zdjizhi.common; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.List; +import java.util.Map; +import java.util.Spliterator; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public class CKDelayProcess extends ProcessFunction<Map<String, Object>, List<Map<String, Object>>> { + + + private ValueState<Long> currentTimer; + private ListState<Map<String, Object>> itemState; + private String stateName; + + @Override + public void open(Configuration parameters) throws Exception { + currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG)); + ListStateDescriptor<Map<String, Object>> itemViewStateDesc = new ListStateDescriptor(getStateName() + "_state", Map.class); + itemState = getRuntimeContext().getListState(itemViewStateDesc); + } + + @Override + public void processElement(Map<String, Object> value, Context context, Collector<List<Map<String, Object>>> collector) throws Exception { + //判断定时器是否为空,为空则创建新的定时器 + Long curTimeStamp = currentTimer.value(); + if (curTimeStamp == null || curTimeStamp == 0) { + long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000; + context.timerService().registerEventTimeTimer(onTimer); + currentTimer.update(onTimer); + } + itemState.add(value); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Map<String, Object>>> out) throws Exception { + Spliterator<Map<String, Object>> spliterator = itemState.get().spliterator(); + List<Map<String, Object>> collect = StreamSupport.stream(spliterator, false) + .collect(Collectors.toList()); + out.collect(collect); + currentTimer.clear(); + itemState.clear(); + } + + public CKDelayProcess(String stateName) { + this.stateName = stateName; + } + + public String getStateName() { + return stateName; + } + + public void setStateName(String stateName) { + this.stateName = stateName; + } +} |
