summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/common/TopMetricProcessV2.java')
-rw-r--r--src/main/java/com/zdjizhi/common/TopMetricProcessV2.java65
1 files changed, 0 insertions, 65 deletions
diff --git a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java b/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java
deleted file mode 100644
index 46d308d..0000000
--- a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.zdjizhi.common;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeSet;
-
-public class TopMetricProcessV2 extends ProcessFunction<Map<String,Object>, Collector<Map<String,Object>>> {
-
-
- private ValueState<Long> currentTimer;
- private ListState<Map<String,Object>> itemState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("_timer", Types.LONG));
- ListStateDescriptor<Map<String,Object>> itemViewStateDesc = new ListStateDescriptor("_state", Map.class);
- itemState = getRuntimeContext().getListState(itemViewStateDesc);
- }
-
- @Override
- public void processElement(Map<String,Object> value, Context context, Collector<Collector<Map<String,Object>>> collector) throws Exception {
- //判断定时器是否为空,为空则创建新的定时器
- Long curTimeStamp = currentTimer.value();
- if (curTimeStamp == null || curTimeStamp == 0) {
- long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000;
- context.timerService().registerEventTimeTimer(onTimer);
- currentTimer.update(onTimer);
- }
- itemState.add(value);
- }
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<Collector<Map<String, Object>>> out) throws Exception {
- super.onTimer(timestamp, ctx, out);
-
- Iterator<Map<String,Object>> iterator = itemState.get().iterator();
- if(iterator.hasNext()){
- out.collect((Collector<Map<String, Object>>) iterator.next());
- }
-// if (baseLogs.size() > FlowWriteConfig.SINK_BATCH) {
-// Map last = baseLogs.last();
-// if (Double.compare(map.get(orderBy).doubleValue(), last.get(orderBy).doubleValue()) > 0) {
-// baseLogs.pollLast();
-// baseLogs.add(map);
-// }
-// } else {
-// baseLogs.add(map);
-// }
-// }
- currentTimer.clear();
- itemState.clear();
-
-
- }
-}