diff options
| author | qidaijie <[email protected]> | 2020-12-25 17:25:33 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2020-12-25 17:25:33 +0800 |
| commit | 6c858283e8cbf7d69eb263dae27e8e1daa8dac05 (patch) | |
| tree | 9c987e7fcbf299778ada437f79e9ee944064a440 | |
| parent | 5f094ab7edaf040b7a99211503e29c5e6dfd13e6 (diff) | |
提交线上使用版本代码
16 files changed, 344 insertions, 485 deletions
@@ -4,7 +4,7 @@ <groupId>cn.ac.iie</groupId> <artifactId>storm-olap-aggregation</artifactId> - <version>v3.20.09.23</version> + <version>v3.20.11.17-ratelimit</version> <packaging>jar</packaging> @@ -20,20 +20,6 @@ <url>http://192.168.40.125:8099/content/groups/public</url> </repository> - - <repository> - <id>maven-ali</id> - <url>http://maven.aliyun.com/nexus/content/groups/public/</url> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>true</enabled> - <updatePolicy>always</updatePolicy> - <checksumPolicy>fail</checksumPolicy> - </snapshots> - </repository> - </repositories> @@ -217,18 +203,5 @@ <version>4.4.1</version> </dependency> - <!--<!– https://mvnrepository.com/artifact/org.apache.druid.extensions/druid-datasketches –>--> - <!--<dependency>--> - <!--<groupId>org.apache.druid.extensions</groupId>--> - <!--<artifactId>druid-datasketches</artifactId>--> - <!--<version>0.16.0-incubating</version>--> - <!--</dependency>--> - - <!--<dependency>--> - <!--<groupId>com.jayway.jsonpath</groupId>--> - <!--<artifactId>json-path</artifactId>--> - <!--<version>2.4.0</version>--> - <!--</dependency>--> - </dependencies> </project> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 85ef010..2166e6f 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,26 +1,34 @@ #管理kafka地址 -bootstrap.servers=192.168.40.203:9092 +bootstrap.servers=192.168.40.132:9092 #latest/earliest auto.offset.reset=latest -#压缩模式 none or snappy -kafka.compression.type=none +#接收自kafka的消费者 client-id +consumer.client.id=live-chart-consumer-connection-record +#回写给kafka的生产者 client-id +producer.client.id=live-chart-producer-connection-record + +#生产者压缩模式 none or snappy +producer.kafka.compression.type=snappy #kafka broker下的topic名称 -kafka.topic=CONNECTION-RECORD-COMPLETED-LOG +kafka.topic=test #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=2020-11-11-2 +group.id=test-20200922 #输出topic -results.bootstrap.servers=192.168.40.203:9092 +results.bootstrap.servers=192.168.40.132:9092 #输出topic -results.output.topic=TRAFFIC-PROTOCOL-STAT-LOG +results.output.topic=test-result + +#聚合时间,单位秒 +first.agg.time=5 #聚合时间,单位秒 -agg.time=15 +second.agg.time=15 #更新APP-ID时间 update.app.id.time=60 @@ -31,8 +39,14 @@ topology.workers=1 #spout并行度 建议与kafka分区数相同 spout.parallelism=1 -#处理补全操作的bolt并行度-worker的倍数 -datacenter.bolt.parallelism=1 +#拆分bolt并行度 +parse.bolt.parallelism=1 + +#第一次聚合bolt并行度 +calculate.bolt.parallelism=1 + +#二次聚合bolt并行度 +gathering.bolt.parallelism=1 #写入kafka的并行度10 kafka.bolt.parallelism=1 @@ -41,10 +55,10 @@ kafka.bolt.parallelism=1 batch.insert.num=2000 #网关的schema位置 -schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart +schema.http=http://192.168.40.132:9999/metadata/schema/v1/fields/liveChart #网关的schema位置 -app.id.http=http://192.168.44.67:9999/open-api/appDicList +app.id.http=http://192.168.40.132:9999/open-api/appDicList #数据中心(UID) data.center.id.num=15 diff --git a/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java b/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java new file mode 100644 index 0000000..1b62844 --- /dev/null +++ b/src/main/java/cn/ac/iie/storm/bolt/CalculateBolt.java @@ -0,0 +1,171 @@ +package cn.ac.iie.storm.bolt; + +import cn.ac.iie.storm.utils.combine.AggregateUtils; +import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import cn.ac.iie.storm.utils.http.HttpClientUtil; +import com.alibaba.fastjson.JSONObject; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.TupleUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * @ClassNameAggregateBolt + * @Author [email protected] + * @Date2020/6/24 13:39 + * @Version V1.0 + **/ +public class CalculateBolt extends BaseBasicBolt { + private final static Logger logger = Logger.getLogger(CalculateBolt.class); + private static final long serialVersionUID = -7666031217706448622L; + private HashMap<String, JSONObject> metricsMap; + private HashMap<String, String[]> actionMap; + private HashMap<String, JSONObject> cacheMap; + + /** + * 只会在程序初始化的时候执行一次 + * + * @param stormConf + * @param context + */ + @Override + public void prepare(Map stormConf, TopologyContext context) { + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + cacheMap = new HashMap<>(32); + + // TODO 获取action标签的内容 + actionMap = AggregateUtils.getActionMap(schema); + metricsMap = AggregateUtils.getMetircsMap(schema); + + } + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + try { + if (TupleUtils.isTick(input)) { + for (String key : cacheMap.keySet()) { + collector.emit(new Values(key, cacheMap.get(key).toString())); + } + cacheMap.clear(); + + } else { + String label = input.getStringByField("label"); + //action中某个协议的所有function,如果没有就默认 + String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default")); + + String dimensions = input.getStringByField("dimensions"); + String message = input.getStringByField("message"); + + //一条数据 + JSONObject event = JSONObject.parseObject(message); + //数据中的key值 (protocol,device_id,isp) + //map中对应的数据,可能为空,为空就默认创建一个对象 + JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, new JSONObject()); + //TODO 接下来遍历所有的函数,去metrics的Map中去找对应的方法,并执行 + for (String metric : metrics) { + String name = metricsMap.get(metric).getString("name"); + //可能为空 + String fieldName = metricsMap.get(name).getString("fieldName"); + + //TODO 每次新增函数,需要改动此处代码 + functionSet(name, cacheMessage, cacheMessage.getString(name), event.getString(fieldName)); + } + cacheMap.put(dimensions, cacheMessage); + + } + } catch (Exception e) { + logger.error("计算节点异常,异常信息:" + e); + } + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("dimensions", "message")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(16); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.FIRST_AGG_TIME); + return conf; + } + + /** + * 根据schema描述对应字段进行操作的 函数集合 + * + * @param name 函数名称 + * @param cacheMessage 结果集 + * @param nameValue 当前值 + * @param fieldNameValue 新加值 + */ + private static void functionSet(String name, JSONObject cacheMessage, String nameValue, String fieldNameValue) { + switch (name) { + case "sessions": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "c2s_byte_num": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "s2c_byte_num": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "c2s_pkt_num": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "s2c_pkt_num": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "c2s_ipfrag_num": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "s2c_ipfrag_num": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "s2c_tcp_lostlen": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "c2s_tcp_lostlen": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "c2s_tcp_unorder_num": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + case "s2c_tcp_unorder_num": + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); + break; + + case "unique_sip_num": + //TODO + //cacheMessage.put(name, AggregateUtils.) + break; + case "unique_cip_num": + //TODO + break; + default: + break; + } + } + + +// cacheMessage.put("sessions", AggregateUtils.longSum(cacheMessage.getString("sessions"), event.getString("common_sessions"))); +// cacheMessage.put("c2s_byte_num", AggregateUtils.longSum(cacheMessage.getString("c2s_byte_num"), event.getString("common_c2s_byte_num"))); +// cacheMessage.put("s2c_byte_num", AggregateUtils.longSum(cacheMessage.getString("s2c_byte_num"), event.getString("common_s2c_byte_num"))); +// cacheMessage.put("c2s_pkt_num", AggregateUtils.longSum(cacheMessage.getString("c2s_pkt_num"), event.getString("common_c2s_pkt_num"))); +// cacheMessage.put("s2c_pkt_num", AggregateUtils.longSum(cacheMessage.getString("s2c_pkt_num"), event.getString("common_s2c_pkt_num"))); +// cacheMessage.put("c2s_ipfrag_num", AggregateUtils.longSum(cacheMessage.getString("c2s_ipfrag_num"), event.getString("common_c2s_ipfrag_num"))); +// cacheMessage.put("s2c_ipfrag_num", AggregateUtils.longSum(cacheMessage.getString("s2c_ipfrag_num"), event.getString("common_s2c_ipfrag_num"))); +// cacheMessage.put("c2s_tcp_lostlen", AggregateUtils.longSum(cacheMessage.getString("c2s_tcp_lostlen"), event.getString("common_c2s_tcp_lostlen"))); +// cacheMessage.put("s2c_tcp_lostlen", AggregateUtils.longSum(cacheMessage.getString("s2c_tcp_lostlen"), event.getString("common_s2c_tcp_lostlen"))); +// cacheMessage.put("c2s_tcp_unorder_num", AggregateUtils.longSum(cacheMessage.getString("c2s_tcp_unorder_num"), event.getString("common_c2s_tcp_unorder_num"))); +// cacheMessage.put("s2c_tcp_unorder_num", AggregateUtils.longSum(cacheMessage.getString("s2c_tcp_unorder_num"), event.getString("common_s2c_tcp_unorder_num"))); +} diff --git a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java b/src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java index 6a81fad..a9051db 100644 --- a/src/main/java/cn/ac/iie/storm/bolt/AggregateBolt.java +++ b/src/main/java/cn/ac/iie/storm/bolt/GatheringBolt.java @@ -24,9 +24,9 @@ import java.util.Map; * @Date2020/6/24 13:39 * @Version V1.0 **/ -public class AggregateBolt extends BaseBasicBolt { - private final static Logger logger = Logger.getLogger(AggregateBolt.class); - private static final long serialVersionUID = -7666031217706448622L; +public class GatheringBolt extends BaseBasicBolt { + private final static Logger logger = Logger.getLogger(GatheringBolt.class); + private static final long serialVersionUID = -6166717864837350277L; private HashMap<String, JSONObject> metricsMap; private HashMap<String, String[]> actionMap; private HashMap<String, JSONObject> cacheMap; @@ -40,7 +40,6 @@ public class AggregateBolt extends BaseBasicBolt { */ @Override public void prepare(Map stormConf, TopologyContext context) { -// timestampValue = System.currentTimeMillis() / 1000; String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); timestamp = AggregateUtils.getTimeMetric(schema); cacheMap = new HashMap<>(32); @@ -65,40 +64,27 @@ public class AggregateBolt extends BaseBasicBolt { cacheMap.clear(); } else { - String label = input.getStringByField("label"); - - //action中某个协议的所有function,如果没有就默认 - String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default")); - + String[] metrics = actionMap.get("Default"); String dimensions = input.getStringByField("dimensions"); - String message = input.getStringByField("message"); //一条数据 JSONObject event = JSONObject.parseObject(message); + //数据中的key值 (protocol,device_id,isp) //map中对应的数据,可能为空,为空就默认创建一个对象 JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, new JSONObject()); //TODO 接下来遍历所有的函数,去metrics的Map中去找对应的方法,并执行 for (String metric : metrics) { String name = metricsMap.get(metric).getString("name"); - //可能为空 - String fieldName = metricsMap.get(name).getString("fieldName"); - String nameValue = cacheMessage.getString(name); - //map中的字段值 - nameValue = (nameValue == null) ? "0" : nameValue; - - String fieldNameValue = event.getString(fieldName); - //数据中的字段值 - fieldNameValue = (fieldNameValue == null) ? "0" : fieldNameValue; - //TODO 每次新增函数,需要改动此处代码 - functionSet(name, cacheMessage, nameValue, fieldNameValue); + functionSet(name, cacheMessage, cacheMessage.getString(name), event.getString(name)); } cacheMap.put(dimensions, cacheMessage); } } catch (Exception e) { + e.printStackTrace(); logger.error("计算节点异常,异常信息:" + e); } @@ -112,7 +98,7 @@ public class AggregateBolt extends BaseBasicBolt { @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(16); - conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.AGG_TIME); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.SECOND_AGG_TIME); return conf; } @@ -127,37 +113,37 @@ public class AggregateBolt extends BaseBasicBolt { private static void functionSet(String name, JSONObject cacheMessage, String nameValue, String fieldNameValue) { switch (name) { case "sessions": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "c2s_byte_num": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "s2c_byte_num": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "c2s_pkt_num": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "s2c_pkt_num": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "c2s_ipfrag_num": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "s2c_ipfrag_num": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "s2c_tcp_lostlen": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "c2s_tcp_lostlen": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "c2s_tcp_unorder_num": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "s2c_tcp_unorder_num": - cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue))); + cacheMessage.put(name, AggregateUtils.longSum(nameValue, fieldNameValue)); break; case "unique_sip_num": diff --git a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java index 9c59c99..d7fdad9 100644 --- a/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java +++ b/src/main/java/cn/ac/iie/storm/bolt/ParseKvBolt.java @@ -64,46 +64,59 @@ public class ParseKvBolt extends BaseBasicBolt { if (TupleUtils.isTick(tuple)) { updateAppRelation(appMap); } else { - //TODO 解析tuple的 message - JSONObject message = JSONObject.parseObject(tuple.getStringByField("source")); - - //TODO 新建一个dimensions的Json对象 - JSONObject dimensionsObj = transDimensions(dimensions, message); - - for (Object transform : transforms) { - JSONObject transformObj = JSONObject.parseObject(transform.toString()); - String function = transformObj.getString("function"); - String name = transformObj.getString("name"); - String fieldName = transformObj.getString("fieldName"); - String parameters = transformObj.getString("parameters"); - - switch (function) { - case "alignment": - if (StringUtil.isNotBlank(parameters)) { - if (message.containsKey(fieldName)) { - alignmentUtils(parameters, message, name, fieldName); - } + String source = tuple.getStringByField("source"); + if (StringUtil.isNotBlank(source)) { + //TODO 解析tuple的 message + JSONObject message = JSONObject.parseObject(tuple.getStringByField("source")); + String protocolLabel = message.getString("common_protocol_label"); + + if (StringUtil.isNotBlank(protocolLabel)) { + + //TODO 新建一个dimensions的Json对象 + JSONObject dimensionsObj = transDimensions(dimensions, message); + + for (Object transform : transforms) { + JSONObject transformObj = JSONObject.parseObject(transform.toString()); + String function = transformObj.getString("function"); + String name = transformObj.getString("name"); + String fieldName = transformObj.getString("fieldName"); + String parameters = transformObj.getString("parameters"); + + switch (function) { + case "alignment": + if (StringUtil.isNotBlank(parameters)) { + if (message.containsKey(fieldName)) { + alignmentUtils(parameters, message, name, fieldName); + } + } + break; + case "combination": + if (StringUtil.isNotBlank(parameters)) { + String l7Proto = message.getString("common_l7_protocol"); + if (StringUtil.isNotBlank(l7Proto)) { + String res = l7Proto + "/" + message.getString(fieldName); + message.put(fieldName, res); + dimensionsObj.put(name, res); + } + combinationUtils(parameters, message, name, fieldName, dimensionsObj); + collector.emit(new Values(dimensionsObj.getString(name), dimensionsObj.toString(), message.toString())); + } + break; +// case "hierarchy": +// String hierarchyValue = message.getString(fieldName); +// //TODO 执行拆分代码 +// if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) { +// String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); +// String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]); +// //TODO 递归拼接tuple并发送出去 +// System.out.println(dimensionsObj.get(name) + "---" + dimensionsObj.toString() + "---" + message.toString()); +// AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name); +// } +// break; + default: + break; } - break; - case "combination": - if (StringUtil.isNotBlank(parameters)) { - combinationUtils(parameters, message, name, fieldName, dimensionsObj); - } - break; - case "hierarchy": - String hierarchyValue = message.getString(fieldName); - //TODO 执行拆分代码 - if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) { - String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); - String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]); - //TODO 递归拼接tuple并发送出去 - AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name); - } - break; - default: - //数据原样输出 - collector.emit(new Values(null, null, message.toString())); - break; + } } } } @@ -138,13 +151,13 @@ public class ParseKvBolt extends BaseBasicBolt { private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) { String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); String data = message.getString(fieldName); - + logger.warn("alignmentPars=" + Arrays.toString(alignmentPars) + "data=" + data); int subscript = Integer.parseInt(alignmentPars[0]); String[] fieldSplit = data.split(alignmentPars[1]); - Long appID = Long.valueOf(fieldSplit[subscript]); + Long appId = Long.valueOf(fieldSplit[subscript]); int length = fieldSplit[subscript].length(); StringBuilder sb = new StringBuilder(data); - message.put(name, sb.replace(0, length, appMap.get(appID))); + message.put(name, sb.replace(0, length, appMap.get(appId))); } /** diff --git a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java b/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java index 36e57a6..af53133 100644 --- a/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java +++ b/src/main/java/cn/ac/iie/storm/bolt/ResultSendBolt.java @@ -1,8 +1,12 @@ package cn.ac.iie.storm.bolt; +import cn.ac.iie.storm.utils.combine.AggregateUtils; import cn.ac.iie.storm.utils.common.LogSendKafka; import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import kafka.utils.json.JsonObject; import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; @@ -31,7 +35,23 @@ public class ResultSendBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { try { - logSendKafka.sendMessage(tuple.getStringByField("message")); + String message = tuple.getStringByField("message"); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + StringBuffer stringBuffer = new StringBuffer(); + String[] protocolIds = jsonObject.getString("protocol_id").split("/"); + for (int i = (protocolIds.length - 1); i >= 0; i--) { + if (StringUtil.isBlank(stringBuffer.toString())) { + stringBuffer.append(protocolIds[i]); + jsonObject.put("protocol_id", stringBuffer.toString()); + logSendKafka.sendMessage(jsonObject.toString()); + } else { + stringBuffer.append("/").append(protocolIds[i]); + jsonObject.put("protocol_id", stringBuffer.toString()); + logSendKafka.sendMessage(jsonObject.toString()); + } + } + } } catch (Exception e) { logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + "日志发送Kafka过程出现异常,异常信息:" + e); } @@ -41,4 +61,5 @@ public class ResultSendBolt extends BaseBasicBolt { public void declareOutputFields(OutputFieldsDeclarer declarer) { } + } diff --git a/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java b/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java deleted file mode 100644 index 6093a94..0000000 --- a/src/main/java/cn/ac/iie/storm/bolt/change/FilterBolt.java +++ /dev/null @@ -1,68 +0,0 @@ -package cn.ac.iie.storm.bolt.change; - -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; - -/** - * @ClassNameFilterBolt - * @Author [email protected] - * @Date2020/7/1 12:02 - * @Version V1.0 - **/ -public class FilterBolt extends BaseBasicBolt { - @Override - public void execute(Tuple input, BasicOutputCollector collector) { - JSONObject source = JSONObject.parseObject(input.getStringByField("source")); - String schema = ""; - - String data = JSONObject.parseObject(schema).getString("data"); - - String filters = JSONObject.parseObject(data).getString("filters"); - - boolean flag = true; - String type = JSONObject.parseObject(filters).getString("type"); - JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields")); - if ("and".equals(type)) { - for (int i = 0; i < fieldsArr.size(); i++) { - - JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString()); - String name = field.getString("fieldName"); - String fieldType = field.getString("type"); - Object values = field.get("values"); - - Object nameValue = source.get(name); - - System.out.println("nameValue ========" +nameValue); - - if ("not".equals(fieldType)) { - - if (nameValue == values) { - //满足过滤条件 - flag = false; - } - - } else if ("in".equals(fieldType)) { - if (!values.toString().contains(nameValue.toString())) { - //满足过滤条件 - flag = false; - } - } - }} - - - - collector.emit(new Values(source)); - - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("filter")); - } -} diff --git a/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java b/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java deleted file mode 100644 index e251f21..0000000 --- a/src/main/java/cn/ac/iie/storm/bolt/print/PrintBolt.java +++ /dev/null @@ -1,40 +0,0 @@ -package cn.ac.iie.storm.bolt.print; - -import org.apache.log4j.Logger; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Tuple; - -/** - * @ClassNamePrintBolt - * @Author [email protected] - * @Date2020/6/28 15:34 - * @Version V1.0 - **/ -public class PrintBolt extends BaseBasicBolt { - private final static Logger logger = Logger.getLogger(PrintBolt.class); - private static long a; - private long b; - public static long c; - - @Override - public void execute(Tuple input, BasicOutputCollector collector) { - logger.error("==================================一批数据========================="); - - a= System.currentTimeMillis(); - b= System.currentTimeMillis(); - c= System.currentTimeMillis(); - - - logger.error(Thread.currentThread() + "private static long a======:" + a); - logger.error(Thread.currentThread() + "private long b======:" + b); - logger.error(Thread.currentThread() + "public static long c======:" + c); - - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } -} diff --git a/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java index fd6dfba..feeb86c 100644 --- a/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java +++ b/src/main/java/cn/ac/iie/storm/spout/CustomizedKafkaSpout.java @@ -1,6 +1,7 @@ package cn.ac.iie.storm.spout; import cn.ac.iie.storm.utils.file.StreamAggregateConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -39,6 +40,11 @@ public class CustomizedKafkaSpout extends BaseRichSpout { props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + /** + * 限流配置-20201117 + */ + props.put(ConsumerConfig.CLIENT_ID_CONFIG, StreamAggregateConfig.CONSUMER_CLIENT_ID); return props; } diff --git a/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java b/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java index ed38bce..520b829 100644 --- a/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java +++ b/src/main/java/cn/ac/iie/storm/topology/StreamAggregateTopology.java @@ -1,6 +1,7 @@ package cn.ac.iie.storm.topology; -import cn.ac.iie.storm.bolt.AggregateBolt; +import cn.ac.iie.storm.bolt.CalculateBolt; +import cn.ac.iie.storm.bolt.GatheringBolt; import cn.ac.iie.storm.bolt.ResultSendBolt; import cn.ac.iie.storm.bolt.ParseKvBolt; import cn.ac.iie.storm.spout.CustomizedKafkaSpout; @@ -66,15 +67,17 @@ public class StreamAggregateTopology { builder = new TopologyBuilder(); builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), StreamAggregateConfig.SPOUT_PARALLELISM); - builder.setBolt("ParseKvBolt", new ParseKvBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM) + builder.setBolt("ParseKvBolt", new ParseKvBolt(), StreamAggregateConfig.PARSE_BOLT_PARALLELISM) .localOrShuffleGrouping("CustomizedKafkaSpout"); - builder.setBolt("AggregateBolt", new AggregateBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM) - .fieldsGrouping("ParseKvBolt", new Fields("dimensions")); + builder.setBolt("CalculateBolt", new CalculateBolt(), StreamAggregateConfig.CALCULATE_BOLT_PARALLELISM) + .localOrShuffleGrouping("ParseKvBolt"); + + builder.setBolt("GatheringBolt", new GatheringBolt(), StreamAggregateConfig.GATHERING_BOLT_PARALLELISM) + .fieldsGrouping("CalculateBolt", new Fields("dimensions")); builder.setBolt("ResultSendBolt", new ResultSendBolt(), StreamAggregateConfig.KAFKA_BOLT_PARALLELISM) - .localOrShuffleGrouping("AggregateBolt"); -// builder.setBolt("PrintBolt", new PrintBolt(), 3).localOrShuffleGrouping("LogFlowWriteSpout"); + .localOrShuffleGrouping("GatheringBolt"); } diff --git a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java index c270040..de5938c 100644 --- a/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java +++ b/src/main/java/cn/ac/iie/storm/utils/combine/AggregateUtils.java @@ -27,9 +27,11 @@ public class AggregateUtils { * @param value2 第二个值 * @return value1 + value2 */ - public static Long longSum(Long value1, Long value2) { + public static Long longSum(String value1, String value2) { + value1 = (value1 == null) ? "0" : value1; + value2 = (value2 == null) ? "0" : value2; - return value1 + value2; + return Long.parseLong(value1) + Long.parseLong(value2); } /** @@ -81,23 +83,6 @@ public class AggregateUtils { * @param name */ public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) { - -// //递归拼接字符串 -// if (splitArr.length == headIndex - 1) { -// //什么也不做 -// } else { -// //递归的核心代码 -// if ("".equals(initStr)) { -// initStr = splitArr[splitArr.length - headIndex]; -// } else { -// initStr = initStr + "/" + splitArr[splitArr.length - headIndex]; -// } -// dimesionsObj.put(name, initStr); -// -// collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString())); -// reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name); -// } - //递归拼接字符串 if (splitArr.length != headIndex - 1) { //递归的核心代码 @@ -109,8 +94,6 @@ public class AggregateUtils { dimesionsObj.put(name, initStr); collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString())); - - reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name); } } @@ -156,6 +139,17 @@ public class AggregateUtils { .getString("name"); } + public static void main(String[] args) { + String sc = "{\"status\":200,\"code\":\"200666\",\"queryKey\":null,\"success\":true,\"message\":\"ok\",\"statistics\":null,\"formatType\":null,\"meta\":null,\"data\":{\"type\":\"record\",\"name\":\"liveChart\",\"doc\":{\"timestamp\":{\"name\":\"stat_time\",\"type\":\"Long\"},\"dimensions\":[{\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"type\":\"String\"},{\"name\":\"entrance_id\",\"fieldName\":\"common_entrance_id\",\"type\":\"String\"},{\"name\":\"isp\",\"fieldName\":\"common_isp\",\"type\":\"String\"},{\"name\":\"data_center\",\"fieldName\":\"common_data_center\",\"type\":\"String\"}],\"metrics\":[{\"function\":\"sum\",\"name\":\"sessions\",\"fieldName\":\"common_sessions\"},{\"function\":\"sum\",\"name\":\"c2s_byte_num\",\"fieldName\":\"common_c2s_byte_num\"},{\"function\":\"sum\",\"name\":\"s2c_byte_num\",\"fieldName\":\"common_s2c_byte_num\"},{\"function\":\"sum\",\"name\":\"c2s_pkt_num\",\"fieldName\":\"common_c2s_pkt_num\"},{\"function\":\"sum\",\"name\":\"s2c_pkt_num\",\"fieldName\":\"common_s2c_pkt_num\"},{\"function\":\"sum\",\"name\":\"c2s_ipfrag_num\",\"fieldName\":\"common_c2s_ipfrag_num\"},{\"function\":\"sum\",\"name\":\"s2c_ipfrag_num\",\"fieldName\":\"common_s2c_ipfrag_num\"},{\"function\":\"sum\",\"name\":\"c2s_tcp_lostlen\",\"fieldName\":\"common_c2s_tcp_lostlen\"},{\"function\":\"sum\",\"name\":\"s2c_tcp_lostlen\",\"fieldName\":\"common_s2c_tcp_lostlen\"},{\"function\":\"sum\",\"name\":\"c2s_tcp_unorder_num\",\"fieldName\":\"common_c2s_tcp_unorder_num\"},{\"function\":\"sum\",\"name\":\"s2c_tcp_unorder_num\",\"fieldName\":\"common_s2c_tcp_unorder_num\"},{\"function\":\"disCount\",\"name\":\"unique_sip_num\",\"fieldName\":\"common_server_ip\"},{\"function\":\"disCount\",\"name\":\"unique_cip_num\",\"fieldName\":\"common_client_ip\"}],\"filters\":{\"type\":\"and\",\"fields\":[{\"fieldName\":\"common_protocol_label\",\"type\":\"not\",\"values\":null}]},\"transforms\":[{\"fieldName\":\"common_app_label\",\"function\":\"replaceapp\",\"name\":\"common_app_label\",\"parameters\":\"0,/\"},{\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\":\"common_app_label,/\"},{\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\":\"/\"}],\"action\":[{\"label\":\"Default\",\"metrics\":\"sessions,c2s_byte_num,s2c_byte_num,c2s_pkt_num,s2c_pkt_num,c2s_ipfrag_num,s2c_ipfrag_num,c2s_tcp_lostlen,s2c_tcp_lostlen,c2s_tcp_unorder_num,s2c_tcp_unorder_num\"}],\"granularity\":{\"type\":\"period\",\"period\":\"5M\"}},\"fields\":[],\"task\":\"Protocol-Distribution\",\"in\":\"CONNECTION-RECORD-COMPLETED-LOG\",\"out\":\"TRAFFIC-PROTOCOL-STAT-LOG\"}}"; + JSONObject jsonObject = JSONObject.parseObject(sc); + String data = jsonObject.getString("data"); + + System.out.println(JSONObject.parseObject(JSONObject.parseObject(JSONObject.parseObject(data) + .getString("doc")) + .getString("timestamp")) + .getString("name")); + } + /** * 更新缓存中的对应关系map * @@ -183,6 +177,7 @@ public class AggregateUtils { System.out.println((System.currentTimeMillis() - begin)); logger.warn("更新缓存中的对应的APP关系,拉取接口数据长度:[" + objects.size()); } catch (Exception e) { + e.printStackTrace(); logger.error("更新缓存APP-ID失败,异常:" + e); } } diff --git a/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java b/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java index 378fea6..ee42553 100644 --- a/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java +++ b/src/main/java/cn/ac/iie/storm/utils/common/LogSendKafka.java @@ -69,7 +69,13 @@ public class LogSendKafka { properties.put("request.timeout.ms", 30000); properties.put("batch.size", 262144); properties.put("buffer.memory", 33554432); - properties.put("compression.type", StreamAggregateConfig.KAFKA_COMPRESSION_TYPE); + + /** + * 限流配置-20201117 + */ + properties.put(ProducerConfig.CLIENT_ID_CONFIG, StreamAggregateConfig.PRODUCER_CLIENT_ID); + properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, StreamAggregateConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + kafkaProducer = new KafkaProducer<>(properties); } diff --git a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java index a768ce1..fdeace4 100644 --- a/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java +++ b/src/main/java/cn/ac/iie/storm/utils/file/StreamAggregateConfig.java @@ -11,7 +11,9 @@ public class StreamAggregateConfig { * System */ public static final Integer SPOUT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "spout.parallelism"); - public static final Integer DATACENTER_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "datacenter.bolt.parallelism"); + public static final Integer CALCULATE_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "calculate.bolt.parallelism"); + public static final Integer GATHERING_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "gathering.bolt.parallelism"); + public static final Integer PARSE_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.bolt.parallelism"); public static final Integer TOPOLOGY_WORKERS = StreamAggregateConfigurations.getIntProperty(0, "topology.workers"); public static final Integer KAFKA_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "kafka.bolt.parallelism"); public static final Integer TOPOLOGY_NUM_ACKS = StreamAggregateConfigurations.getIntProperty(0, "topology.num.acks"); @@ -21,7 +23,8 @@ public class StreamAggregateConfig { public static final Integer MAX_FAILURE_NUM = StreamAggregateConfigurations.getIntProperty(0, "max.failure.num"); - public static final Integer AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "agg.time"); + public static final Integer FIRST_AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "first.agg.time"); + public static final Integer SECOND_AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "second.agg.time"); public static final Integer UPDATE_APP_ID_TIME = StreamAggregateConfigurations.getIntProperty(0, "update.app.id.time"); @@ -34,7 +37,13 @@ public class StreamAggregateConfig { public static final String RESULTS_OUTPUT_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "results.output.topic"); public static final String KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "kafka.topic"); public static final String AUTO_OFFSET_RESET = StreamAggregateConfigurations.getStringProperty(0, "auto.offset.reset"); - public static final String KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "kafka.compression.type"); + + /** + * kafka限流配置-20201117 + */ + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + public static final String CONSUMER_CLIENT_ID = StreamAggregateConfigurations.getStringProperty(0, "consumer.client.id"); + public static final String PRODUCER_CLIENT_ID = StreamAggregateConfigurations.getStringProperty(0, "producer.client.id"); /** * http diff --git a/src/test/java/com/wp/AppIdTest.java b/src/test/java/com/wp/AppIdTest.java deleted file mode 100644 index 39c9348..0000000 --- a/src/test/java/com/wp/AppIdTest.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.wp; - -import cn.ac.iie.storm.utils.file.StreamAggregateConfig; -import cn.ac.iie.storm.utils.http.HttpClientUtil; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; - -/** - * @author qidaijie - * @Package com.wp - * @Description: - * @date 2020/9/2215:09 - */ -public class AppIdTest { - - @Test - public void appTest() { - //http://192.168.44.12:9999/open-api/appDicList - String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP); - JSONObject jsonObject = JSONObject.parseObject(schema); - String data = jsonObject.getString("data"); - HashMap<Long, String> map = new HashMap<>(16); - JSONArray objects = JSONArray.parseArray(data); - for (Object object : objects) { - JSONArray jsonArray = JSONArray.parseArray(object.toString()); - map.put(jsonArray.getLong(0), jsonArray.getString(1)); -// System.out.println(object); - } - System.out.println(map.toString()); - - System.out.println(map.size()); - } - - @Test - public void changeApp() { - String a = "QQ"; - String[] alignmentPars = "0,/".split(StreamAggregateConfig.FORMAT_SPLITTER); - String data = "100/HTTP"; - int subscript = Integer.parseInt(alignmentPars[0]); - String[] fieldSplit = data.split(alignmentPars[1]); - Long appID = Long.valueOf(fieldSplit[subscript]); - int length = fieldSplit[subscript].length(); - StringBuilder sb = new StringBuilder(data); - - System.out.println(sb.replace(0, length, a)); - - - } -} diff --git a/src/test/java/com/wp/FilterBolt.java b/src/test/java/com/wp/FilterBolt.java deleted file mode 100644 index a3a9ee7..0000000 --- a/src/test/java/com/wp/FilterBolt.java +++ /dev/null @@ -1,133 +0,0 @@ -package com.wp; - -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; - -/** - * @ClassNameFilterBolt - * @Author [email protected] - * @Date2020/7/1 14:53 - * @Version V1.0 - **/ -public class FilterBolt { - @SuppressWarnings("all") - public static void main(String[] args) { - JSONObject source = new JSONObject(); - - - String schema = "{\n" + - " \"task\": \"Application-Protocol-Distribution\",\n" + - " \"in\": \"CONNECTION-SKETCH-COMPLETED\",\n" + - " \"out\": \"TRAFFIC-PROTOCOL-STAT-LOG\",\n" + - " \"data\": {\n" + - " \"timestamp\": {\n" + - " \"name\": \"stat_time\"\n" + - " },\n" + - " \"dimensions\": [\n" + - " {\n" + - " \"name\": \"protocol_id\",\n" + - " \"fieldName\": \"common_protocol_label\",\n" + - " \"type\": \"String\"\n" + - " },\n" + - " {\n" + - " \"name\": \"device_id\",\n" + - " \"fieldName\": \"common_device_id\",\n" + - " \"type\": \"String\"\n" + - " },\n" + - " {\n" + - " \"name\": \"isp\",\n" + - " \"fieldName\": \"common_isp\",\n" + - " \"type\": \"String\"\n" + - " }\n" + - " ],\n" + - " \"metrics\": [\n" + - " { \"function\" : \"sessions_count\", \"name\" : \"sessions\"},\n" + - " { \"function\" : \"c2s_byte_sum\", \"name\" : \"c2s_byte_len\", \"fieldName\" : \"common_c2s_byte_num\" },\n" + - " { \"function\" : \"s2c_byte_sum\", \"name\" : \"s2c_byte_len\", \"fieldName\" : \"common_s2c_byte_num\" },\n" + - " { \"function\" : \"c2s_pkt_sum\", \"name\" : \"c2s_pkt_num\", \"fieldName\" : \"common_c2s_pkt_num\" },\n" + - " { \"function\" : \"s2c_pkt_sum\", \"name\" : \"s2c_pkt_num\", \"fieldName\" : \"common_s2c_pkt_num\" },\n" + - " { \"function\" : \"sip_disCount\", \"name\" : \"unique_sip_num\", \"fieldName\" : \"common_server_ip\" },\n" + - " { \"function\" : \"cip_disCount\", \"name\" : \"unique_cip_num\", \"fieldName\" : \"common_client_ip\" }\n" + - " ],\n" + - " \"filters\": {\n" + - " \"type\": \"and\",\n" + - " \"fields\": [\n" + - " {\n" + - " \"fieldName\": \"common_device_id\",\n" + - " \"type\": \"not\",\n" + - " \"values\": null\n" + - " },\n" + - " {\n" + - " \"fieldName\": \"common_protocol_label\",\n" + - " \"type\": \"not\",\n" + - " \"values\": null\n" + - " },\n" + - " {\n" + - " \"fieldName\": \"common_isp\",\n" + - " \"type\": \"not\",\n" + - " \"values\": null\n" + - " }\n" + - " ]\n" + - " },\n" + - " \"transforms\":[\n" + - " {\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"common_app_label,/\"},\n" + - " {\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"/\"}\n" + - " ],\n" + - " \"action\":[\n" + - " {\"label\": \"Default\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num\"},\n" + - " {\"label\": \"HTTP\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num,unique_sip_num,unique_cip_num\"}\n" + - " ],\n" + - " \"granularity\":{\n" + - " \"type\": \"period\",\n" + - " \"period\": \"5M\"\n" + - " }\n" + - " }\n" + - "}"; - - - source.put("common_protocol_label", "HTTP"); - source.put("common_isp", "Unicom"); - source.put("common_device_id", "1"); - String data = JSONObject.parseObject(schema).getString("data"); - - String filters = JSONObject.parseObject(data).getString("filters"); - - boolean flag = true; - String type = JSONObject.parseObject(filters).getString("type"); - JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields")); - if ("and".equals(type)) { - for (int i = 0; i < fieldsArr.size(); i++) { - - JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString()); - String name = field.getString("fieldName"); - String fieldType = field.getString("type"); - Object values = field.get("values"); - - Object nameValue = source.get(name); - - - if ("not".equals(fieldType)) { - - if (nameValue == values) { - //满足过滤条件 - flag = false; - } - - } else if ("in".equals(fieldType)) { - if (!values.toString().contains(nameValue.toString())) { - //满足过滤条件 - flag = false; - } - } - } - - if (flag){ - System.out.println("输出到下个Bolt"); - }else { - - System.out.println("此条消息被过滤掉"); - } - - } - } -} diff --git a/src/test/java/com/wp/SchemaTest.java b/src/test/java/com/wp/SchemaTest.java deleted file mode 100644 index f275592..0000000 --- a/src/test/java/com/wp/SchemaTest.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.wp; - -import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.utils.StringUtil; - -/** - * @ClassNameSchemaTest - * @Author [email protected] - * @Date2020/6/28 10:11 - * @Version V1.0 - **/ -public class SchemaTest { - - static String str = ""; - - public static void main(String[] args) { - - - String str1 = null; - String str2 = " "; - - - System.out.println(StringUtil.isNotBlank(str1)); - System.out.println(StringUtil.isNotEmpty(str1)); - - System.out.println(StringUtil.isNotBlank(str2)); - System.out.println(StringUtil.isNotEmpty(str2)); - - } - - public static void reAdd(int m, String[] split, String str) { - - //递归拼接字符串 - if (0 == m) { - } else { - //递归的核心代码 - str = str + split[m - 1] + "/"; - reAdd(m - 1, split, str); - - } - - } -} |
