author    qidaijie <[email protected]>    2020-12-25 17:25:33 +0800
committer qidaijie <[email protected]>    2020-12-25 17:25:33 +0800
commit    6c858283e8cbf7d69eb263dae27e8e1daa8dac05 (patch)
tree      9c987e7fcbf299778ada437f79e9ee944064a440
parent    5f094ab7edaf040b7a99211503e29c5e6dfd13e6 (diff)
Commit the code version in production use
-rw-r--r--  pom.xml  29
-rw-r--r--  properties/service_flow_config.properties  38
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java  171
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java (renamed from src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java)  52
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java  97
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java  23
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java  68
-rw-r--r--  src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java  40
-rw-r--r--  src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java  6
-rw-r--r--  src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java  15
-rw-r--r--  src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java  37
-rw-r--r--  src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java  8
-rw-r--r--  src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java  15
-rw-r--r--  src/test/java/com/wp/AppIdTest.java  54
-rw-r--r--  src/test/java/com/wp/FilterBolt.java  133
-rw-r--r--  src/test/java/com/wp/SchemaTest.java  43
16 files changed, 344 insertions, 485 deletions
diff --git a/pom.xml b/pom.xml
index dbf8f94..64cdbe0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<groupId>cn.ac.iie</groupId>
<artifactId>storm-olap-aggregation</artifactId>
- <version>v3.20.09.23</version>
+ <version>v3.20.11.17-ratelimit</version>
<packaging>jar</packaging>
@@ -20,20 +20,6 @@
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
-
- <repository>
- <id>maven-ali</id>
- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>true</enabled>
- <updatePolicy>always</updatePolicy>
- <checksumPolicy>fail</checksumPolicy>
- </snapshots>
- </repository>
-
</repositories>
@@ -217,18 +203,5 @@
<version>4.4.1</version>
</dependency>
- <!--&lt;!&ndash; https://mvnrepository.com/artifact/org.apache.druid.extensions/druid-datasketches &ndash;&gt;-->
- <!--<dependency>-->
- <!--<groupId>org.apache.druid.extensions</groupId>-->
- <!--<artifactId>druid-datasketches</artifactId>-->
- <!--<version>0.16.0-incubating</version>-->
- <!--</dependency>-->
-
- <!--<dependency>-->
- <!--<groupId>com.jayway.jsonpath</groupId>-->
- <!--<artifactId>json-path</artifactId>-->
- <!--<version>2.4.0</version>-->
- <!--</dependency>-->
-
</dependencies>
</project>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 85ef010..2166e6f 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,26 +1,34 @@
#Kafka bootstrap address
-bootstrap.servers=192.168.40.203:9092
+bootstrap.servers=192.168.40.132:9092
#latest/earliest
auto.offset.reset=latest
-#Compression mode: none or snappy
-kafka.compression.type=none
+#client-id of the consumer reading from Kafka
+consumer.client.id=live-chart-consumer-connection-record
+#client-id of the producer writing results back to Kafka
+producer.client.id=live-chart-producer-connection-record
+
+#Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
#Topic name on the Kafka broker
-kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
+kafka.topic=test
#Consumer group for reading the topic; stores this spout's offsets (name it after the topology) so the next read resumes without duplicates
-group.id=2020-11-11-2
+group.id=test-20200922
#Output Kafka address
-results.bootstrap.servers=192.168.40.203:9092
+results.bootstrap.servers=192.168.40.132:9092
#Output topic
-results.output.topic=TRAFFIC-PROTOCOL-STAT-LOG
+results.output.topic=test-result
+
+#First-stage aggregation window, in seconds
+first.agg.time=5
#Second-stage aggregation window, in seconds
-agg.time=15
+second.agg.time=15
#APP-ID refresh interval
update.app.id.time=60
@@ -31,8 +39,14 @@ topology.workers=1
#Spout parallelism; recommended to match the Kafka partition count
spout.parallelism=1
-#Parallelism of the enrichment bolt - a multiple of the worker count
-datacenter.bolt.parallelism=1
+#Parse bolt parallelism
+parse.bolt.parallelism=1
+
+#First-stage aggregation bolt parallelism
+calculate.bolt.parallelism=1
+
+#Second-stage aggregation bolt parallelism
+gathering.bolt.parallelism=1
#Kafka writer bolt parallelism
kafka.bolt.parallelism=1
@@ -41,10 +55,10 @@ kafka.bolt.parallelism=1
batch.insert.num=2000
#Gateway schema endpoint
-schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart
+schema.http=http://192.168.40.132:9999/metadata/schema/v1/fields/liveChart
#Gateway APP-ID dictionary endpoint
-app.id.http=http://192.168.44.67:9999/open-api/appDicList
+app.id.http=http://192.168.40.132:9999/open-api/appDicList
#Data center (UID)
data.center.id.num=15
diff --git a/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java b/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java
new file mode 100644
index 0000000..1b62844
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java
@@ -0,0 +1,171 @@
+package cn.ac.iie.storm.bolt;
+
+import cn.ac.iie.storm.utils.combine.AggregateUtils;
+import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import cn.ac.iie.storm.utils.http.HttpClientUtil;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.log4j.Logger;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @ClassName CalculateBolt
+ * @Author [email protected]
+ * @Date 2020/6/24 13:39
+ * @Version V1.0
+ **/
+public class CalculateBolt extends BaseBasicBolt {
+ private final static Logger logger = Logger.getLogger(CalculateBolt.class);
+ private static final long serialVersionUID = -7666031217706448622L;
+ private HashMap<String, JSONObject> metricsMap;
+ private HashMap<String, String[]> actionMap;
+ private HashMap<String, JSONObject> cacheMap;
+
+ /**
+ * Runs only once, when the bolt is initialized
+ *
+ * @param stormConf
+ * @param context
+ */
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ cacheMap = new HashMap<>(32);
+
+ // TODO fetch the action label definitions
+ actionMap = AggregateUtils.getActionMap(schema);
+ metricsMap = AggregateUtils.getMetircsMap(schema);
+
+ }
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ try {
+ if (TupleUtils.isTick(input)) {
+ for (String key : cacheMap.keySet()) {
+ collector.emit(new Values(key, cacheMap.get(key).toString()));
+ }
+ cacheMap.clear();
+
+ } else {
+ String label = input.getStringByField("label");
+ //All functions for this protocol's label in "action"; fall back to Default when absent
+ String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default"));
+
+ String dimensions = input.getStringByField("dimensions");
+ String message = input.getStringByField("message");
+
+ //One record
+ JSONObject event = JSONObject.parseObject(message);
+ //Key of the record (protocol, device_id, isp)
+ //Cached entry for this key; may be absent, in which case create a fresh object
+ JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, new JSONObject());
+ //TODO iterate over all functions, look up each one in the metrics map, and apply it
+ for (String metric : metrics) {
+ String name = metricsMap.get(metric).getString("name");
+ //May be null
+ String fieldName = metricsMap.get(name).getString("fieldName");
+
+ //TODO this code must be updated whenever a new function is added
+ functionSet(name, cacheMessage, cacheMessage.getString(name), event.getString(fieldName));
+ }
+ cacheMap.put(dimensions, cacheMessage);
+
+ }
+ } catch (Exception e) {
+ logger.error("Calculation node error: " + e);
+ }
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("dimensions", "message"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>(16);
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.FIRST_AGG_TIME);
+ return conf;
+ }
+
+ /**
+ * Function set that operates on the schema-described fields
+ *
+ * @param name function name
+ * @param cacheMessage accumulated result
+ * @param nameValue current value
+ * @param fieldNameValue incoming value
+ */
+ private static void functionSet(String name, JSONObject cacheMessage, String nameValue, String fieldNameValue) {
+ switch (name) {
+ case "sessions":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "c2s_byte_num":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "s2c_byte_num":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "c2s_pkt_num":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "s2c_pkt_num":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "c2s_ipfrag_num":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "s2c_ipfrag_num":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "s2c_tcp_lostlen":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "c2s_tcp_lostlen":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "c2s_tcp_unorder_num":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+ case "s2c_tcp_unorder_num":
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
+ break;
+
+ case "unique_sip_num":
+ //TODO
+ //cacheMessage.put(name, AggregateUtils.)
+ break;
+ case "unique_cip_num":
+ //TODO
+ break;
+ default:
+ break;
+ }
+ }
+
+
+// cacheMessage.put("sessions", AggregateUtils.longSum(cacheMessage.getString("sessions"), event.getString("common_sessions")));
+// cacheMessage.put("c2s_byte_num", AggregateUtils.longSum(cacheMessage.getString("c2s_byte_num"), event.getString("common_c2s_byte_num")));
+// cacheMessage.put("s2c_byte_num", AggregateUtils.longSum(cacheMessage.getString("s2c_byte_num"), event.getString("common_s2c_byte_num")));
+// cacheMessage.put("c2s_pkt_num", AggregateUtils.longSum(cacheMessage.getString("c2s_pkt_num"), event.getString("common_c2s_pkt_num")));
+// cacheMessage.put("s2c_pkt_num", AggregateUtils.longSum(cacheMessage.getString("s2c_pkt_num"), event.getString("common_s2c_pkt_num")));
+// cacheMessage.put("c2s_ipfrag_num", AggregateUtils.longSum(cacheMessage.getString("c2s_ipfrag_num"), event.getString("common_c2s_ipfrag_num")));
+// cacheMessage.put("s2c_ipfrag_num", AggregateUtils.longSum(cacheMessage.getString("s2c_ipfrag_num"), event.getString("common_s2c_ipfrag_num")));
+// cacheMessage.put("c2s_tcp_lostlen", AggregateUtils.longSum(cacheMessage.getString("c2s_tcp_lostlen"), event.getString("common_c2s_tcp_lostlen")));
+// cacheMessage.put("s2c_tcp_lostlen", AggregateUtils.longSum(cacheMessage.getString("s2c_tcp_lostlen"), event.getString("common_s2c_tcp_lostlen")));
+// cacheMessage.put("c2s_tcp_unorder_num", AggregateUtils.longSum(cacheMessage.getString("c2s_tcp_unorder_num"), event.getString("common_c2s_tcp_unorder_num")));
+// cacheMessage.put("s2c_tcp_unorder_num", AggregateUtils.longSum(cacheMessage.getString("s2c_tcp_unorder_num"), event.getString("common_s2c_tcp_unorder_num")));
+}
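Note: all eleven populated cases in functionSet above route to the same AggregateUtils.longSum call. A set-based dispatch would be a compact equivalent — a sketch only, assuming the same metric names (needs java.util.Set, HashSet, and Arrays imports), not code from this commit:

    private static final Set<String> LONG_SUM_METRICS = new HashSet<>(Arrays.asList(
            "sessions", "c2s_byte_num", "s2c_byte_num", "c2s_pkt_num", "s2c_pkt_num",
            "c2s_ipfrag_num", "s2c_ipfrag_num", "c2s_tcp_lostlen", "s2c_tcp_lostlen",
            "c2s_tcp_unorder_num", "s2c_tcp_unorder_num"));

    private static void functionSet(String name, JSONObject cacheMessage,
                                    String nameValue, String fieldNameValue) {
        // All sum-type metrics share one code path; new summed metrics only extend the set.
        if (LONG_SUM_METRICS.contains(name)) {
            cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
        }
        // unique_sip_num / unique_cip_num (distinct counts) remain TODO, as in the commit
    }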
diff --git a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java
index 6a81fad..a9051db 100644
--- a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java
+++ b/src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java
@@ -24,9 +24,9 @@ import java.util.Map;
* @Date 2020/6/24 13:39
* @Version V1.0
**/
-public class AggregateBolt extends BaseBasicBolt {
- private final static Logger logger = Logger.getLogger(AggregateBolt.class);
- private static final long serialVersionUID = -7666031217706448622L;
+public class GatheringBolt extends BaseBasicBolt {
+ private final static Logger logger = Logger.getLogger(GatheringBolt.class);
+ private static final long serialVersionUID = -6166717864837350277L;
private HashMap<String, JSONObject> metricsMap;
private HashMap<String, String[]> actionMap;
private HashMap<String, JSONObject> cacheMap;
@@ -40,7 +40,6 @@ public class AggregateBolt extends BaseBasicBolt {
*/
@Override
public void prepare(Map stormConf, TopologyContext context) {
-// timestampValue = System.currentTimeMillis() / 1000;
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
timestamp = AggregateUtils.getTimeMetric(schema);
cacheMap = new HashMap<>(32);
@@ -65,40 +64,27 @@ public class AggregateBolt extends BaseBasicBolt {
cacheMap.clear();
} else {
- String label = input.getStringByField("label");
-
- //All functions for this protocol's label in "action"; fall back to Default when absent
- String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default"));
-
+ String[] metrics = actionMap.get("Default");
String dimensions = input.getStringByField("dimensions");
-
String message = input.getStringByField("message");
//One record
JSONObject event = JSONObject.parseObject(message);
+
//Key of the record (protocol, device_id, isp)
//Cached entry for this key; may be absent, in which case create a fresh object
JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, new JSONObject());
//TODO iterate over all functions, look up each one in the metrics map, and apply it
for (String metric : metrics) {
String name = metricsMap.get(metric).getString("name");
- //May be null
- String fieldName = metricsMap.get(name).getString("fieldName");
- String nameValue = cacheMessage.getString(name);
- //Value currently in the map
- nameValue = (nameValue == null) ? "0" : nameValue;
-
- String fieldNameValue = event.getString(fieldName);
- //Value from the record
- fieldNameValue = (fieldNameValue == null) ? "0" : fieldNameValue;
-
- //TODO this code must be updated whenever a new function is added
- functionSet(name, cacheMessage, nameValue, fieldNameValue);
+ functionSet(name, cacheMessage, cacheMessage.getString(name), event.getString(name));
}
cacheMap.put(dimensions, cacheMessage);
}
} catch (Exception e) {
+ e.printStackTrace();
logger.error("Calculation node error: " + e);
}
@@ -112,7 +98,7 @@ public class AggregateBolt extends BaseBasicBolt {
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.AGG_TIME);
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.SECOND_AGG_TIME);
return conf;
}
@@ -127,37 +113,37 @@ public class AggregateBolt extends BaseBasicBolt {
private static void functionSet(String name, JSONObject cacheMessage, String nameValue, String fieldNameValue) {
switch (name) {
case "sessions":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "c2s_byte_num":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "s2c_byte_num":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "c2s_pkt_num":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "s2c_pkt_num":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "c2s_ipfrag_num":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "s2c_ipfrag_num":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "s2c_tcp_lostlen":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "c2s_tcp_lostlen":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "c2s_tcp_unorder_num":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "s2c_tcp_unorder_num":
- cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
+ cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue));
break;
case "unique_sip_num":
diff --git a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java
index 9c59c99..d7fdad9 100644
--- a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java
+++ b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java
@@ -64,46 +64,59 @@ public class ParseKvBolt extends BaseBasicBolt {
if (TupleUtils.isTick(tuple)) {
updateAppRelation(appMap);
} else {
- //TODO parse the tuple's message
- JSONObject message = JSONObject.parseObject(tuple.getStringByField("source"));
-
- //TODO build a new JSON object for dimensions
- JSONObject dimensionsObj = transDimensions(dimensions, message);
-
- for (Object transform : transforms) {
- JSONObject transformObj = JSONObject.parseObject(transform.toString());
- String function = transformObj.getString("function");
- String name = transformObj.getString("name");
- String fieldName = transformObj.getString("fieldName");
- String parameters = transformObj.getString("parameters");
-
- switch (function) {
- case "alignment":
- if (StringUtil.isNotBlank(parameters)) {
- if (message.containsKey(fieldName)) {
- alignmentUtils(parameters, message, name, fieldName);
- }
+ String source = tuple.getStringByField("source");
+ if (StringUtil.isNotBlank(source)) {
+ //TODO parse the tuple's message
+ JSONObject message = JSONObject.parseObject(tuple.getStringByField("source"));
+ String protocolLabel = message.getString("common_protocol_label");
+
+ if (StringUtil.isNotBlank(protocolLabel)) {
+
+ //TODO build a new JSON object for dimensions
+ JSONObject dimensionsObj = transDimensions(dimensions, message);
+
+ for (Object transform : transforms) {
+ JSONObject transformObj = JSONObject.parseObject(transform.toString());
+ String function = transformObj.getString("function");
+ String name = transformObj.getString("name");
+ String fieldName = transformObj.getString("fieldName");
+ String parameters = transformObj.getString("parameters");
+
+ switch (function) {
+ case "alignment":
+ if (StringUtil.isNotBlank(parameters)) {
+ if (message.containsKey(fieldName)) {
+ alignmentUtils(parameters, message, name, fieldName);
+ }
+ }
+ break;
+ case "combination":
+ if (StringUtil.isNotBlank(parameters)) {
+ String l7Proto = message.getString("common_l7_protocol");
+ if (StringUtil.isNotBlank(l7Proto)) {
+ String res = l7Proto + "/" + message.getString(fieldName);
+ message.put(fieldName, res);
+ dimensionsObj.put(name, res);
+ }
+ combinationUtils(parameters, message, name, fieldName, dimensionsObj);
+ collector.emit(new Values(dimensionsObj.getString(name), dimensionsObj.toString(), message.toString()));
+ }
+ break;
+// case "hierarchy":
+// String hierarchyValue = message.getString(fieldName);
+// //TODO perform the split
+// if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) {
+// String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+// String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]);
+// //TODO recursively assemble tuples and emit them
+// System.out.println(dimensionsObj.get(name) + "---" + dimensionsObj.toString() + "---" + message.toString());
+// AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name);
+// }
+// break;
+ default:
+ break;
}
- break;
- case "combination":
- if (StringUtil.isNotBlank(parameters)) {
- combinationUtils(parameters, message, name, fieldName, dimensionsObj);
- }
- break;
- case "hierarchy":
- String hierarchyValue = message.getString(fieldName);
- //TODO perform the split
- if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) {
- String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
- String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]);
- //TODO recursively assemble tuples and emit them
- AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name);
- }
- break;
- default:
- //Emit the record unchanged
- collector.emit(new Values(null, null, message.toString()));
- break;
+ }
}
}
}
@@ -138,13 +151,13 @@ public class ParseKvBolt extends BaseBasicBolt {
private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) {
String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String data = message.getString(fieldName);
-
+ logger.warn("alignmentPars=" + Arrays.toString(alignmentPars) + ", data=" + data);
int subscript = Integer.parseInt(alignmentPars[0]);
String[] fieldSplit = data.split(alignmentPars[1]);
- Long appID = Long.valueOf(fieldSplit[subscript]);
+ Long appId = Long.valueOf(fieldSplit[subscript]);
int length = fieldSplit[subscript].length();
StringBuilder sb = new StringBuilder(data);
- message.put(name, sb.replace(0, length, appMap.get(appID)));
+ message.put(name, sb.replace(0, length, appMap.get(appId)));
}
/**
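A worked example of alignmentUtils, mirroring the deleted AppIdTest.changeApp test (and assuming FORMAT_SPLITTER is a comma, as that test implies): with parameters "0,/", a field value of "100/HTTP", and appMap mapping 100 -> "QQ", the numeric app id prefix is replaced by its name:

    String[] alignmentPars = "0,/".split(",");          // subscript 0, split on "/"
    String data = "100/HTTP";
    int subscript = Integer.parseInt(alignmentPars[0]);
    String[] fieldSplit = data.split(alignmentPars[1]); // ["100", "HTTP"]
    Long appId = Long.valueOf(fieldSplit[subscript]);   // 100
    int length = fieldSplit[subscript].length();        // 3
    StringBuilder sb = new StringBuilder(data);
    System.out.println(sb.replace(0, length, "QQ"));    // prints QQ/HTTP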
diff --git a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java
index 36e57a6..af53133 100644
--- a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java
+++ b/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java
@@ -1,8 +1,12 @@
package cn.ac.iie.storm.bolt;
+import cn.ac.iie.storm.utils.combine.AggregateUtils;
import cn.ac.iie.storm.utils.common.LogSendKafka;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import kafka.utils.json.JsonObject;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
@@ -31,7 +35,23 @@ public class ResultSendBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
- logSendKafka.sendMessage(tuple.getStringByField("message"));
+ String message = tuple.getStringByField("message");
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSONObject.parseObject(message);
+ StringBuffer stringBuffer = new StringBuffer();
+ String[] protocolIds = jsonObject.getString("protocol_id").split("/");
+ for (int i = (protocolIds.length - 1); i >= 0; i--) {
+ if (StringUtil.isBlank(stringBuffer.toString())) {
+ stringBuffer.append(protocolIds[i]);
+ jsonObject.put("protocol_id", stringBuffer.toString());
+ logSendKafka.sendMessage(jsonObject.toString());
+ } else {
+ stringBuffer.append("/").append(protocolIds[i]);
+ jsonObject.put("protocol_id", stringBuffer.toString());
+ logSendKafka.sendMessage(jsonObject.toString());
+ }
+ }
+ }
} catch (Exception e) {
logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + " log delivery to Kafka failed: " + e);
}
@@ -41,4 +61,5 @@ public class ResultSendBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
+
}
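The loop added to execute above fans one aggregated record out into one record per protocol-hierarchy level, rebuilding the path from the deepest label outward. A minimal trace with a hypothetical protocol_id of "ETH/IPv4/TCP":

    // protocolIds = ["ETH", "IPv4", "TCP"]
    // i = 2: buffer = "TCP"           -> send {..., "protocol_id": "TCP"}
    // i = 1: buffer = "TCP/IPv4"      -> send {..., "protocol_id": "TCP/IPv4"}
    // i = 0: buffer = "TCP/IPv4/ETH"  -> send {..., "protocol_id": "TCP/IPv4/ETH"}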
diff --git a/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java b/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java
deleted file mode 100644
index 6093a94..0000000
--- a/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package cn.ac.iie.storm.bolt.change;
-
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-/**
- * @ClassNameFilterBolt
- * @Author [email protected]
- * @Date2020/7/1 12:02
- * @Version V1.0
- **/
-public class FilterBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple input, BasicOutputCollector collector) {
- JSONObject source = JSONObject.parseObject(input.getStringByField("source"));
- String schema = "";
-
- String data = JSONObject.parseObject(schema).getString("data");
-
- String filters = JSONObject.parseObject(data).getString("filters");
-
- boolean flag = true;
- String type = JSONObject.parseObject(filters).getString("type");
- JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields"));
- if ("and".equals(type)) {
- for (int i = 0; i < fieldsArr.size(); i++) {
-
- JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString());
- String name = field.getString("fieldName");
- String fieldType = field.getString("type");
- Object values = field.get("values");
-
- Object nameValue = source.get(name);
-
- System.out.println("nameValue ========" +nameValue);
-
- if ("not".equals(fieldType)) {
-
- if (nameValue == values) {
- //Filter condition met
- flag = false;
- }
-
- } else if ("in".equals(fieldType)) {
- if (!values.toString().contains(nameValue.toString())) {
- //Filter condition met
- flag = false;
- }
- }
- }}
-
-
-
- collector.emit(new Values(source));
-
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("filter"));
- }
-}
diff --git a/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java b/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java
deleted file mode 100644
index e251f21..0000000
--- a/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package cn.ac.iie.storm.bolt.print;
-
-import org.apache.log4j.Logger;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * @ClassNamePrintBolt
- * @Author [email protected]
- * @Date2020/6/28 15:34
- * @Version V1.0
- **/
-public class PrintBolt extends BaseBasicBolt {
- private final static Logger logger = Logger.getLogger(PrintBolt.class);
- private static long a;
- private long b;
- public static long c;
-
- @Override
- public void execute(Tuple input, BasicOutputCollector collector) {
- logger.error("==================================一批数据=========================");
-
- a= System.currentTimeMillis();
- b= System.currentTimeMillis();
- c= System.currentTimeMillis();
-
-
- logger.error(Thread.currentThread() + "private static long a======:" + a);
- logger.error(Thread.currentThread() + "private long b======:" + b);
- logger.error(Thread.currentThread() + "public static long c======:" + c);
-
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-}
diff --git a/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java
index fd6dfba..feeb86c 100644
--- a/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java
@@ -1,6 +1,7 @@
package cn.ac.iie.storm.spout;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -39,6 +40,11 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+ /**
+ * Rate-limiting configuration - 20201117
+ */
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, StreamAggregateConfig.CONSUMER_CLIENT_ID);
return props;
}
diff --git a/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java b/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java
index ed38bce..520b829 100644
--- a/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java
+++ b/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java
@@ -1,6 +1,7 @@
package cn.ac.iie.storm.topology;
-import cn.ac.iie.storm.bolt.AggregateBolt;
+import cn.ac.iie.storm.bolt.CalculateBolt;
+import cn.ac.iie.storm.bolt.GatheringBolt;
import cn.ac.iie.storm.bolt.ResultSendBolt;
import cn.ac.iie.storm.bolt.ParseKvBolt;
import cn.ac.iie.storm.spout.CustomizedKafkaSpout;
@@ -66,15 +67,17 @@ public class StreamAggregateTopology {
builder = new TopologyBuilder();
builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), StreamAggregateConfig.SPOUT_PARALLELISM);
- builder.setBolt("ParseKvBolt", new ParseKvBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM)
+ builder.setBolt("ParseKvBolt", new ParseKvBolt(), StreamAggregateConfig.PARSE_BOLT_PARALLELISM)
.localOrShuffleGrouping("CustomizedKafkaSpout");
- builder.setBolt("AggregateBolt", new AggregateBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM)
- .fieldsGrouping("ParseKvBolt", new Fields("dimensions"));
+ builder.setBolt("CalculateBolt", new CalculateBolt(), StreamAggregateConfig.CALCULATE_BOLT_PARALLELISM)
+ .localOrShuffleGrouping("ParseKvBolt");
+
+ builder.setBolt("GatheringBolt", new GatheringBolt(), StreamAggregateConfig.GATHERING_BOLT_PARALLELISM)
+ .fieldsGrouping("CalculateBolt", new Fields("dimensions"));
builder.setBolt("ResultSendBolt", new ResultSendBolt(), StreamAggregateConfig.KAFKA_BOLT_PARALLELISM)
- .localOrShuffleGrouping("AggregateBolt");
-// builder.setBolt("PrintBolt", new PrintBolt(), 3).localOrShuffleGrouping("LogFlowWriteSpout");
+ .localOrShuffleGrouping("GatheringBolt");
}
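The rewired pipeline now aggregates in two stages: CalculateBolt keeps per-task partial sums and flushes them every FIRST_AGG_TIME seconds, and fieldsGrouping on "dimensions" then routes all partial sums for one key to the same GatheringBolt task, which merges them every SECOND_AGG_TIME seconds. Sketched as a flow:

    // spout --localOrShuffle--> ParseKvBolt --localOrShuffle--> CalculateBolt   (partial sums, tick every FIRST_AGG_TIME)
    //       --fieldsGrouping("dimensions")--> GatheringBolt                     (merged sums,  tick every SECOND_AGG_TIME)
    //       --localOrShuffle--> ResultSendBolt --> Kafka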
diff --git a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java
index c270040..de5938c 100644
--- a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java
+++ b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java
@@ -27,9 +27,11 @@ public class AggregateUtils {
* @param value2 the second value
* @return value1 + value2
*/
- public static Long longSum(Long value1, Long value2) {
+ public static Long longSum(String value1, String value2) {
+ value1 = (value1 == null) ? "0" : value1;
+ value2 = (value2 == null) ? "0" : value2;
- return value1 + value2;
+ return Long.parseLong(value1) + Long.parseLong(value2);
}
/**
@@ -81,23 +83,6 @@ public class AggregateUtils {
* @param name
*/
public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) {
-
-// //Recursively build the string
-// if (splitArr.length == headIndex - 1) {
-// //Do nothing
-// } else {
-// //Core of the recursion
-// if ("".equals(initStr)) {
-// initStr = splitArr[splitArr.length - headIndex];
-// } else {
-// initStr = initStr + "/" + splitArr[splitArr.length - headIndex];
-// }
-// dimesionsObj.put(name, initStr);
-//
-// collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
-// reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);
-// }
-
//Recursively build the string
if (splitArr.length != headIndex - 1) {
//Core of the recursion
@@ -109,8 +94,6 @@ public class AggregateUtils {
dimesionsObj.put(name, initStr);
collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
-
-
reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);
}
}
@@ -156,6 +139,17 @@ public class AggregateUtils {
.getString("name");
}
+ public static void main(String[] args) {
+ String sc = "{\"status\":200,\"code\":\"200666\",\"queryKey\":null,\"success\":true,\"message\":\"ok\",\"statistics\":null,\"formatType\":null,\"meta\":null,\"data\":{\"type\":\"record\",\"name\":\"liveChart\",\"doc\":{\"timestamp\":{\"name\":\"stat_time\",\"type\":\"Long\"},\"dimensions\":[{\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"type\":\"String\"},{\"name\":\"entrance_id\",\"fieldName\":\"common_entrance_id\",\"type\":\"String\"},{\"name\":\"isp\",\"fieldName\":\"common_isp\",\"type\":\"String\"},{\"name\":\"data_center\",\"fieldName\":\"common_data_center\",\"type\":\"String\"}],\"metrics\":[{\"function\":\"sum\",\"name\":\"sessions\",\"fieldName\":\"common_sessions\"},{\"function\":\"sum\",\"name\":\"c2s_byte_num\",\"fieldName\":\"common_c2s_byte_num\"},{\"function\":\"sum\",\"name\":\"s2c_byte_num\",\"fieldName\":\"common_s2c_byte_num\"},{\"function\":\"sum\",\"name\":\"c2s_pkt_num\",\"fieldName\":\"common_c2s_pkt_num\"},{\"function\":\"sum\",\"name\":\"s2c_pkt_num\",\"fieldName\":\"common_s2c_pkt_num\"},{\"function\":\"sum\",\"name\":\"c2s_ipfrag_num\",\"fieldName\":\"common_c2s_ipfrag_num\"},{\"function\":\"sum\",\"name\":\"s2c_ipfrag_num\",\"fieldName\":\"common_s2c_ipfrag_num\"},{\"function\":\"sum\",\"name\":\"c2s_tcp_lostlen\",\"fieldName\":\"common_c2s_tcp_lostlen\"},{\"function\":\"sum\",\"name\":\"s2c_tcp_lostlen\",\"fieldName\":\"common_s2c_tcp_lostlen\"},{\"function\":\"sum\",\"name\":\"c2s_tcp_unorder_num\",\"fieldName\":\"common_c2s_tcp_unorder_num\"},{\"function\":\"sum\",\"name\":\"s2c_tcp_unorder_num\",\"fieldName\":\"common_s2c_tcp_unorder_num\"},{\"function\":\"disCount\",\"name\":\"unique_sip_num\",\"fieldName\":\"common_server_ip\"},{\"function\":\"disCount\",\"name\":\"unique_cip_num\",\"fieldName\":\"common_client_ip\"}],\"filters\":{\"type\":\"and\",\"fields\":[{\"fieldName\":\"common_protocol_label\",\"type\":\"not\",\"values\":null}]},\"transforms\":[{\"fieldName\":\"common_app_label\",\"function\":\"replaceapp\",\"name\":\"common_app_label\",\"parameters\":\"0,/\"},{\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\":\"common_app_label,/\"},{\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\":\"/\"}],\"action\":[{\"label\":\"Default\",\"metrics\":\"sessions,c2s_byte_num,s2c_byte_num,c2s_pkt_num,s2c_pkt_num,c2s_ipfrag_num,s2c_ipfrag_num,c2s_tcp_lostlen,s2c_tcp_lostlen,c2s_tcp_unorder_num,s2c_tcp_unorder_num\"}],\"granularity\":{\"type\":\"period\",\"period\":\"5M\"}},\"fields\":[],\"task\":\"Protocol-Distribution\",\"in\":\"CONNECTION-RECORD-COMPLETED-LOG\",\"out\":\"TRAFFIC-PROTOCOL-STAT-LOG\"}}";
+ JSONObject jsonObject = JSONObject.parseObject(sc);
+ String data = jsonObject.getString("data");
+
+ System.out.println(JSONObject.parseObject(JSONObject.parseObject(JSONObject.parseObject(data)
+ .getString("doc"))
+ .getString("timestamp"))
+ .getString("name"));
+ }
+
/**
* Refresh the cached relation map
*
@@ -183,6 +177,7 @@ public class AggregateUtils {
System.out.println((System.currentTimeMillis() - begin));
logger.warn("Refreshed the cached APP relations; entries fetched from the API: [" + objects.size());
} catch (Exception e) {
+ e.printStackTrace();
logger.error("Failed to refresh cached APP-IDs: " + e);
}
}
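The String-based longSum now treats null as "0", which is why the explicit null checks could be dropped from the bolts. A quick usage sketch (non-numeric input still throws NumberFormatException, which the bolts' try/catch absorbs):

    AggregateUtils.longSum(null, "5"); // 5L  (null counts as "0")
    AggregateUtils.longSum("7", null); // 7L
    AggregateUtils.longSum("7", "5");  // 12L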
diff --git a/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java b/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java
index 378fea6..ee42553 100644
--- a/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java
+++ b/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java
@@ -69,7 +69,13 @@ public class LogSendKafka {
properties.put("request.timeout.ms", 30000);
properties.put("batch.size", 262144);
properties.put("buffer.memory", 33554432);
- properties.put("compression.type", StreamAggregateConfig.KAFKA_COMPRESSION_TYPE);
+
+ /**
+ * Rate-limiting configuration - 20201117
+ */
+ properties.put(ProducerConfig.CLIENT_ID_CONFIG, StreamAggregateConfig.PRODUCER_CLIENT_ID);
+ properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, StreamAggregateConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
kafkaProducer = new KafkaProducer<>(properties);
}
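Setting client.id by itself does not throttle anything; it only gives the broker a stable identity to attach quotas to. Assuming broker-side client quotas are the intent of the rate-limiting change (the broker setup is not part of this commit), they could be defined with the stock kafka-configs tool, e.g.:

    kafka-configs.sh --zookeeper <zk> --alter \
        --add-config 'producer_byte_rate=1048576' \
        --entity-type clients --entity-name live-chart-producer-connection-record

The consumer side works the same way with consumer_byte_rate against the consumer client-id.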
diff --git a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java
index a768ce1..fdeace4 100644
--- a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java
+++ b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java
@@ -11,7 +11,9 @@ public class StreamAggregateConfig {
* System
*/
public static final Integer SPOUT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "spout.parallelism");
- public static final Integer DATACENTER_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "datacenter.bolt.parallelism");
+ public static final Integer CALCULATE_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "calculate.bolt.parallelism");
+ public static final Integer GATHERING_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "gathering.bolt.parallelism");
+ public static final Integer PARSE_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.bolt.parallelism");
public static final Integer TOPOLOGY_WORKERS = StreamAggregateConfigurations.getIntProperty(0, "topology.workers");
public static final Integer KAFKA_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
public static final Integer TOPOLOGY_NUM_ACKS = StreamAggregateConfigurations.getIntProperty(0, "topology.num.acks");
@@ -21,7 +23,8 @@ public class StreamAggregateConfig {
public static final Integer MAX_FAILURE_NUM = StreamAggregateConfigurations.getIntProperty(0, "max.failure.num");
- public static final Integer AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "agg.time");
+ public static final Integer FIRST_AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "first.agg.time");
+ public static final Integer SECOND_AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "second.agg.time");
public static final Integer UPDATE_APP_ID_TIME = StreamAggregateConfigurations.getIntProperty(0, "update.app.id.time");
@@ -34,7 +37,13 @@ public class StreamAggregateConfig {
public static final String RESULTS_OUTPUT_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "results.output.topic");
public static final String KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "kafka.topic");
public static final String AUTO_OFFSET_RESET = StreamAggregateConfigurations.getStringProperty(0, "auto.offset.reset");
- public static final String KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "kafka.compression.type");
+
+ /**
+ * Kafka rate-limiting configuration - 20201117
+ */
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String CONSUMER_CLIENT_ID = StreamAggregateConfigurations.getStringProperty(0, "consumer.client.id");
+ public static final String PRODUCER_CLIENT_ID = StreamAggregateConfigurations.getStringProperty(0, "producer.client.id");
/**
* http
diff --git a/src/test/java/com/wp/AppIdTest.java b/src/test/java/com/wp/AppIdTest.java
deleted file mode 100644
index 39c9348..0000000
--- a/src/test/java/com/wp/AppIdTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.wp;
-
-import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
-import cn.ac.iie.storm.utils.http.HttpClientUtil;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-
-/**
- * @author qidaijie
- * @Package com.wp
- * @Description:
- * @date 2020/9/22 15:09
- */
-public class AppIdTest {
-
- @Test
- public void appTest() {
- //http://192.168.44.12:9999/open-api/appDicList
- String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
- JSONObject jsonObject = JSONObject.parseObject(schema);
- String data = jsonObject.getString("data");
- HashMap<Long, String> map = new HashMap<>(16);
- JSONArray objects = JSONArray.parseArray(data);
- for (Object object : objects) {
- JSONArray jsonArray = JSONArray.parseArray(object.toString());
- map.put(jsonArray.getLong(0), jsonArray.getString(1));
-// System.out.println(object);
- }
- System.out.println(map.toString());
-
- System.out.println(map.size());
- }
-
- @Test
- public void changeApp() {
- String a = "QQ";
- String[] alignmentPars = "0,/".split(StreamAggregateConfig.FORMAT_SPLITTER);
- String data = "100/HTTP";
- int subscript = Integer.parseInt(alignmentPars[0]);
- String[] fieldSplit = data.split(alignmentPars[1]);
- Long appID = Long.valueOf(fieldSplit[subscript]);
- int length = fieldSplit[subscript].length();
- StringBuilder sb = new StringBuilder(data);
-
- System.out.println(sb.replace(0, length, a));
-
-
- }
-}
diff --git a/src/test/java/com/wp/FilterBolt.java b/src/test/java/com/wp/FilterBolt.java
deleted file mode 100644
index a3a9ee7..0000000
--- a/src/test/java/com/wp/FilterBolt.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package com.wp;
-
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-
-/**
- * @ClassNameFilterBolt
- * @Author [email protected]
- * @Date2020/7/1 14:53
- * @Version V1.0
- **/
-public class FilterBolt {
- @SuppressWarnings("all")
- public static void main(String[] args) {
- JSONObject source = new JSONObject();
-
-
- String schema = "{\n" +
- " \"task\": \"Application-Protocol-Distribution\",\n" +
- " \"in\": \"CONNECTION-SKETCH-COMPLETED\",\n" +
- " \"out\": \"TRAFFIC-PROTOCOL-STAT-LOG\",\n" +
- " \"data\": {\n" +
- " \"timestamp\": {\n" +
- " \"name\": \"stat_time\"\n" +
- " },\n" +
- " \"dimensions\": [\n" +
- " {\n" +
- " \"name\": \"protocol_id\",\n" +
- " \"fieldName\": \"common_protocol_label\",\n" +
- " \"type\": \"String\"\n" +
- " },\n" +
- " {\n" +
- " \"name\": \"device_id\",\n" +
- " \"fieldName\": \"common_device_id\",\n" +
- " \"type\": \"String\"\n" +
- " },\n" +
- " {\n" +
- " \"name\": \"isp\",\n" +
- " \"fieldName\": \"common_isp\",\n" +
- " \"type\": \"String\"\n" +
- " }\n" +
- " ],\n" +
- " \"metrics\": [\n" +
- " { \"function\" : \"sessions_count\", \"name\" : \"sessions\"},\n" +
- " { \"function\" : \"c2s_byte_sum\", \"name\" : \"c2s_byte_len\", \"fieldName\" : \"common_c2s_byte_num\" },\n" +
- " { \"function\" : \"s2c_byte_sum\", \"name\" : \"s2c_byte_len\", \"fieldName\" : \"common_s2c_byte_num\" },\n" +
- " { \"function\" : \"c2s_pkt_sum\", \"name\" : \"c2s_pkt_num\", \"fieldName\" : \"common_c2s_pkt_num\" },\n" +
- " { \"function\" : \"s2c_pkt_sum\", \"name\" : \"s2c_pkt_num\", \"fieldName\" : \"common_s2c_pkt_num\" },\n" +
- " { \"function\" : \"sip_disCount\", \"name\" : \"unique_sip_num\", \"fieldName\" : \"common_server_ip\" },\n" +
- " { \"function\" : \"cip_disCount\", \"name\" : \"unique_cip_num\", \"fieldName\" : \"common_client_ip\" }\n" +
- " ],\n" +
- " \"filters\": {\n" +
- " \"type\": \"and\",\n" +
- " \"fields\": [\n" +
- " {\n" +
- " \"fieldName\": \"common_device_id\",\n" +
- " \"type\": \"not\",\n" +
- " \"values\": null\n" +
- " },\n" +
- " {\n" +
- " \"fieldName\": \"common_protocol_label\",\n" +
- " \"type\": \"not\",\n" +
- " \"values\": null\n" +
- " },\n" +
- " {\n" +
- " \"fieldName\": \"common_isp\",\n" +
- " \"type\": \"not\",\n" +
- " \"values\": null\n" +
- " }\n" +
- " ]\n" +
- " },\n" +
- " \"transforms\":[\n" +
- " {\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"common_app_label,/\"},\n" +
- " {\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"/\"}\n" +
- " ],\n" +
- " \"action\":[\n" +
- " {\"label\": \"Default\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num\"},\n" +
- " {\"label\": \"HTTP\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num,unique_sip_num,unique_cip_num\"}\n" +
- " ],\n" +
- " \"granularity\":{\n" +
- " \"type\": \"period\",\n" +
- " \"period\": \"5M\"\n" +
- " }\n" +
- " }\n" +
- "}";
-
-
- source.put("common_protocol_label", "HTTP");
- source.put("common_isp", "Unicom");
- source.put("common_device_id", "1");
- String data = JSONObject.parseObject(schema).getString("data");
-
- String filters = JSONObject.parseObject(data).getString("filters");
-
- boolean flag = true;
- String type = JSONObject.parseObject(filters).getString("type");
- JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields"));
- if ("and".equals(type)) {
- for (int i = 0; i < fieldsArr.size(); i++) {
-
- JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString());
- String name = field.getString("fieldName");
- String fieldType = field.getString("type");
- Object values = field.get("values");
-
- Object nameValue = source.get(name);
-
-
- if ("not".equals(fieldType)) {
-
- if (nameValue == values) {
- //满足过滤条件
- flag = false;
- }
-
- } else if ("in".equals(fieldType)) {
- if (!values.toString().contains(nameValue.toString())) {
- //满足过滤条件
- flag = false;
- }
- }
- }
-
- if (flag){
- System.out.println("Emit to the next bolt");
- }else {
-
- System.out.println("This message was filtered out");
- }
-
- }
- }
-}
diff --git a/src/test/java/com/wp/SchemaTest.java b/src/test/java/com/wp/SchemaTest.java
deleted file mode 100644
index f275592..0000000
--- a/src/test/java/com/wp/SchemaTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.wp;
-
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.utils.StringUtil;
-
-/**
- * @ClassNameSchemaTest
- * @Author [email protected]
- * @Date2020/6/28 10:11
- * @Version V1.0
- **/
-public class SchemaTest {
-
- static String str = "";
-
- public static void main(String[] args) {
-
-
- String str1 = null;
- String str2 = " ";
-
-
- System.out.println(StringUtil.isNotBlank(str1));
- System.out.println(StringUtil.isNotEmpty(str1));
-
- System.out.println(StringUtil.isNotBlank(str2));
- System.out.println(StringUtil.isNotEmpty(str2));
-
- }
-
- public static void reAdd(int m, String[] split, String str) {
-
- //Recursively build the string
- if (0 == m) {
- } else {
- //Core of the recursion
- str = str + split[m - 1] + "/";
- reAdd(m - 1, split, str);
-
- }
-
- }
-}