diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/common/TopMetricProcessV2.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/common/TopMetricProcessV2.java | 65 |
1 files changed, 0 insertions, 65 deletions
diff --git a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java b/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java deleted file mode 100644 index 46d308d..0000000 --- a/src/main/java/com/zdjizhi/common/TopMetricProcessV2.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.zdjizhi.common; - -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.util.Collector; - -import java.util.Comparator; -import java.util.Iterator; -import java.util.Map; -import java.util.TreeSet; - -public class TopMetricProcessV2 extends ProcessFunction<Map<String,Object>, Collector<Map<String,Object>>> { - - - private ValueState<Long> currentTimer; - private ListState<Map<String,Object>> itemState; - - @Override - public void open(Configuration parameters) throws Exception { - currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("_timer", Types.LONG)); - ListStateDescriptor<Map<String,Object>> itemViewStateDesc = new ListStateDescriptor("_state", Map.class); - itemState = getRuntimeContext().getListState(itemViewStateDesc); - } - - @Override - public void processElement(Map<String,Object> value, Context context, Collector<Collector<Map<String,Object>>> collector) throws Exception { - //判断定时器是否为空,为空则创建新的定时器 - Long curTimeStamp = currentTimer.value(); - if (curTimeStamp == null || curTimeStamp == 0) { - long onTimer = context.timestamp() + FlowWriteConfig.SINK_BATCH_TIME_OUT * 1000; - context.timerService().registerEventTimeTimer(onTimer); - currentTimer.update(onTimer); - } - itemState.add(value); - } - - @Override - public void onTimer(long timestamp, OnTimerContext ctx, Collector<Collector<Map<String, Object>>> out) throws Exception { - super.onTimer(timestamp, ctx, out); - - Iterator<Map<String,Object>> iterator = itemState.get().iterator(); - if(iterator.hasNext()){ - out.collect((Collector<Map<String, Object>>) iterator.next()); - } -// if (baseLogs.size() > FlowWriteConfig.SINK_BATCH) { -// Map last = baseLogs.last(); -// if (Double.compare(map.get(orderBy).doubleValue(), last.get(orderBy).doubleValue()) > 0) { -// baseLogs.pollLast(); -// baseLogs.add(map); -// } -// } else { -// baseLogs.add(map); -// } -// } - currentTimer.clear(); - itemState.clear(); - - - } -} |
