summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2021-12-28 10:50:45 +0300
committerqidaijie <[email protected]>2021-12-28 10:50:45 +0300
commit4d10705cdb1e34604a91a5b670a74fb17a0fe00e (patch)
tree6e2341ca19cf6041042be5943f5504033c16144d
parent2a32156c9eb8205e3c7bad02c3bdacdeabdd9000 (diff)
提交两层聚合代码
-rw-r--r--properties/service_flow_config.properties3
-rw-r--r--src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java3
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java3
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java (renamed from src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java)6
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java1
-rw-r--r--src/main/java/com/zdjizhi/utils/general/ParseFunctions.java4
6 files changed, 9 insertions, 11 deletions
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 2f62434..93384d2 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -43,9 +43,10 @@ source.parallelism=1
#map函数并行度
parse.parallelism=2
-#count 函数并行度
+#first count 函数并行度
first.window.parallelism=2
+#second count 函数并行度
second.window.parallelism=2
#producer 并行度
diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
index b726ca8..b6299af 100644
--- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
+++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
@@ -7,7 +7,6 @@ import com.zdjizhi.utils.functions.*;
import com.zdjizhi.utils.kafka.Consumer;
import com.zdjizhi.utils.kafka.Producer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -39,7 +38,7 @@ public class StreamAggregateTopology {
DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
.setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM);
- SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new MapParseFunction())
+ SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction())
.name("ParseDataMap")
.setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
index 1f821c1..be827c1 100644
--- a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
@@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* @author qidaijie
@@ -28,7 +29,6 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<Strin
private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
- private static String resultTimeKey = JsonParseUtil.getTimeKey();
@Override
@SuppressWarnings("unchecked")
@@ -61,7 +61,6 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<Strin
for (String countKey : cacheMap.keySet()) {
Map<String, Object> resultMap = cacheMap.get(countKey);
- JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
output.collect(new Tuple2<>(countKey, JsonMapper.toJsonString(resultMap)));
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java
index 90a21a1..aa59e6b 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java
@@ -25,7 +25,7 @@ import java.util.Map;
* @Description:
* @date 2021/5/2715:01
*/
-public class MapParseFunction implements MapFunction<String,Tuple4<String,String, String, String>> {
+public class ParseMapFunction implements MapFunction<String,Tuple4<String,String, String, String>> {
private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
@@ -38,7 +38,7 @@ public class MapParseFunction implements MapFunction<String,Tuple4<String,String
try {
if (StringUtil.isNotBlank(message)) {
Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
-// String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
+ String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
if (ParseFunctions.filterLogs(object)) {
for (String[] strings : jobList) {
@@ -76,8 +76,6 @@ public class MapParseFunction implements MapFunction<String,Tuple4<String,String
}
break;
case "hierarchy":
- String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
-// RandomUtil.randomInt(0, StreamAggregateConfig.COUNT_PARALLELISM)
return new Tuple4<>(streamTraceId,JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
default:
break;
diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
index 455414a..794b4ea 100644
--- a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
@@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* @author qidaijie
diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
index 5ab46e6..6bb3178 100644
--- a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
+++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
@@ -51,8 +51,8 @@ public class ParseFunctions {
for (String key : filtersMap.keySet()) {
switch (key) {
case "notempty":
- Object value = JsonParseUtil.getValue(object, filtersMap.get(key));
- if (value != null && StringUtil.isNotBlank(value.toString())) {
+ String value = JsonParseUtil.getString(object, filtersMap.get(key));
+ if (StringUtil.isNotBlank(value)) {
available = true;
}
break;