diff options
| author | qidaijie <[email protected]> | 2021-12-28 10:50:45 +0300 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-12-28 10:50:45 +0300 |
| commit | 4d10705cdb1e34604a91a5b670a74fb17a0fe00e (patch) | |
| tree | 6e2341ca19cf6041042be5943f5504033c16144d | |
| parent | 2a32156c9eb8205e3c7bad02c3bdacdeabdd9000 (diff) | |
提交两层聚合代码
| -rw-r--r-- | properties/service_flow_config.properties | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java (renamed from src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java) | 6 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java | 1 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/general/ParseFunctions.java | 4 |
6 files changed, 9 insertions, 11 deletions
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 2f62434..93384d2 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -43,9 +43,10 @@ source.parallelism=1 #map函数并行度 parse.parallelism=2 -#count 函数并行度 +#first count 函数并行度 first.window.parallelism=2 +#second count 函数并行度 second.window.parallelism=2 #producer 并行度 diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java index b726ca8..b6299af 100644 --- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java @@ -7,7 +7,6 @@ import com.zdjizhi.utils.functions.*; import com.zdjizhi.utils.kafka.Consumer; import com.zdjizhi.utils.kafka.Producer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -39,7 +38,7 @@ public class StreamAggregateTopology { DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()) .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM); - SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new MapParseFunction()) + SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction()) .name("ParseDataMap") .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM); diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java index 1f821c1..be827c1 100644 --- a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java @@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @author qidaijie @@ -28,7 +29,6 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<Strin private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap(); private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap(); private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320); - private static String resultTimeKey = JsonParseUtil.getTimeKey(); @Override @SuppressWarnings("unchecked") @@ -61,7 +61,6 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<Strin for (String countKey : cacheMap.keySet()) { Map<String, Object> resultMap = cacheMap.get(countKey); - JsonParseUtil.setValue(resultMap, resultTimeKey, endTime); output.collect(new Tuple2<>(countKey, JsonMapper.toJsonString(resultMap))); } } diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java index 90a21a1..aa59e6b 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java @@ -25,7 +25,7 @@ import java.util.Map; * @Description: * @date 2021/5/2715:01 */ -public class MapParseFunction implements MapFunction<String,Tuple4<String,String, String, String>> { +public class ParseMapFunction implements MapFunction<String,Tuple4<String,String, String, String>> { private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class); private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList(); @@ -38,7 +38,7 @@ public class MapParseFunction implements MapFunction<String,Tuple4<String,String try { if (StringUtil.isNotBlank(message)) { Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class); -// String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id"); + String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id"); Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object); if (ParseFunctions.filterLogs(object)) { for (String[] strings : jobList) { @@ -76,8 +76,6 @@ public class MapParseFunction implements MapFunction<String,Tuple4<String,String } break; case "hierarchy": - String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id"); -// RandomUtil.randomInt(0, StreamAggregateConfig.COUNT_PARALLELISM) return new Tuple4<>(streamTraceId,JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object)); default: break; diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java index 455414a..794b4ea 100644 --- a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java @@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @author qidaijie diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java index 5ab46e6..6bb3178 100644 --- a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java +++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java @@ -51,8 +51,8 @@ public class ParseFunctions { for (String key : filtersMap.keySet()) { switch (key) { case "notempty": - Object value = JsonParseUtil.getValue(object, filtersMap.get(key)); - if (value != null && StringUtil.isNotBlank(value.toString())) { + String value = JsonParseUtil.getString(object, filtersMap.get(key)); + if (StringUtil.isNotBlank(value)) { available = true; } break; |
