summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/common/CKDelayProcess.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/common/CKDelayProcess.java')
-rw-r--r--src/main/java/com/zdjizhi/common/CKDelayProcess.java65
1 files changed, 65 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/common/CKDelayProcess.java b/src/main/java/com/zdjizhi/common/CKDelayProcess.java
new file mode 100644
index 0000000..35ec90e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/CKDelayProcess.java
@@ -0,0 +1,65 @@
+package com.zdjizhi.common;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class CKDelayProcess extends ProcessFunction<Map<String, Object>, List<Map<String, Object>>> {
+
+
+ private ValueState<Long> currentTimer;
+ private ListState<Map<String, Object>> itemState;
+ private String stateName;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(getStateName() + "_timer", Types.LONG));
+ ListStateDescriptor<Map<String, Object>> itemViewStateDesc = new ListStateDescriptor(getStateName() + "_state", Map.class);
+ itemState = getRuntimeContext().getListState(itemViewStateDesc);
+ }
+
+ @Override
+ public void processElement(Map<String, Object> value, Context context, Collector<List<Map<String, Object>>> collector) throws Exception {
+ //判断定时器是否为空,为空则创建新的定时器
+ Long curTimeStamp = currentTimer.value();
+ if (curTimeStamp == null || curTimeStamp == 0) {
+ long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
+ context.timerService().registerEventTimeTimer(onTimer);
+ currentTimer.update(onTimer);
+ }
+ itemState.add(value);
+ }
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Map<String, Object>>> out) throws Exception {
+ Spliterator<Map<String, Object>> spliterator = itemState.get().spliterator();
+ List<Map<String, Object>> collect = StreamSupport.stream(spliterator, false)
+ .collect(Collectors.toList());
+ out.collect(collect);
+ currentTimer.clear();
+ itemState.clear();
+ }
+
+ public CKDelayProcess(String stateName) {
+ this.stateName = stateName;
+ }
+
+ public String getStateName() {
+ return stateName;
+ }
+
+ public void setStateName(String stateName) {
+ this.stateName = stateName;
+ }
+}