summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorwangchengcheng <[email protected]>2022-08-11 14:29:07 +0800
committerwangchengcheng <[email protected]>2022-08-11 14:29:07 +0800
commitf64d73fc4ceb95a231348630d02670e4a10377db (patch)
treefcae686a8c2e73c675abda036a5c979b3ed8829d /src
parent37975911b7a358a4510abdcf995082a9bce15aa9 (diff)
1.添加yys、ydy、mddy等监控指标。HEADmaster
2.修改getDy()函数业务逻辑。
Diffstat (limited to 'src')
-rw-r--r--src/main/java/cn/ac/iie/storm/spout/ConnectionKafkaSpout.java74
-rw-r--r--src/main/java/cn/ac/iie/storm/spout/SessionKafkaSpout.java71
-rw-r--r--src/main/java/cn/ac/iie/storm/spout/TransactionKafkaSpout.java71
-rw-r--r--src/main/java/cn/ac/iie/storm/utils/general/GetMap.java195
-rw-r--r--src/test/java/TestUFT.java10
-rw-r--r--src/test/java/TestUnicode.java15
6 files changed, 436 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/storm/spout/ConnectionKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/ConnectionKafkaSpout.java
new file mode 100644
index 0000000..e774bd1
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/spout/ConnectionKafkaSpout.java
@@ -0,0 +1,74 @@
+package cn.ac.iie.storm.spout;
+
+import cn.ac.iie.storm.utils.config.StreamAggregateConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * kafkaSpout
+ */
+public class ConnectionKafkaSpout extends BaseRichSpout {
+ private static final long serialVersionUID = -3363788553406229592L;
+ private KafkaConsumer<String, String> consumer;
+ private SpoutOutputCollector collector = null;
+ private TopologyContext context = null;
+ private final static Logger logger = Logger.getLogger(ConnectionKafkaSpout.class);
+
+ private static Properties createConsumerConfig() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", StreamAggregateConfig.GROUP_ID);
+ props.put("session.timeout.ms", "60000");
+ props.put("max.poll.records", 3000);
+ props.put("max.partition.fetch.bytes", 31457280);
+ props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ logger.info("链接kafka"+StreamAggregateConfig.BOOTSTRAP_SERVERS+"成功");
+ return props;
+ }
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ this.collector=spoutOutputCollector;
+ this.context=topologyContext;
+ Properties prop = createConsumerConfig();
+ this.consumer = new KafkaConsumer<>(prop);
+ this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC));
+ }
+ @Override
+ public void close() {
+ consumer.close();
+ }
+ @Override
+ public void nextTuple() {
+ try {
+ ConsumerRecords<String, String> records = consumer.poll(StreamAggregateConfig.KAFKA_COMSUMER_POLL);
+ Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
+ for (ConsumerRecord<String, String> record : records) {
+ this.collector.emit(new Values(record.value()));
+ }
+ } catch (Exception e) {
+ logger.error("KafkaSpout发送消息出现异常!", e);
+ e.printStackTrace();
+ }
+ }
+
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("flooding-source"));
+ }
+}
diff --git a/src/main/java/cn/ac/iie/storm/spout/SessionKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/SessionKafkaSpout.java
new file mode 100644
index 0000000..081c3a0
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/spout/SessionKafkaSpout.java
@@ -0,0 +1,71 @@
+package cn.ac.iie.storm.spout;
+
+import cn.ac.iie.storm.utils.config.StreamAggregateConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+public class SessionKafkaSpout extends BaseRichSpout {
+ private static final long serialVersionUID = -3363788553406229592L;
+ private KafkaConsumer<String, String> consumer;
+ private SpoutOutputCollector collector = null;
+ private TopologyContext context = null;
+ private final static Logger logger = Logger.getLogger(ConnectionKafkaSpout.class);
+
+ private static Properties createConsumerConfig() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", StreamAggregateConfig.GROUP_ID);
+ props.put("session.timeout.ms", "60000");
+ props.put("max.poll.records", 3000);
+ props.put("max.partition.fetch.bytes", 31457280);
+ props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ logger.info("链接kafka"+StreamAggregateConfig.BOOTSTRAP_SERVERS+"成功");
+ return props;
+ }
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ this.collector=spoutOutputCollector;
+ this.context=topologyContext;
+ Properties prop = createConsumerConfig();
+ this.consumer = new KafkaConsumer<>(prop);
+ this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC_SESSION));
+ }
+ @Override
+ public void close() {
+ consumer.close();
+ }
+ @Override
+ public void nextTuple() {
+ try {
+ ConsumerRecords<String, String> records = consumer.poll(StreamAggregateConfig.KAFKA_COMSUMER_POLL);
+ Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
+ for (ConsumerRecord<String, String> record : records) {
+ this.collector.emit(new Values(record.value()));
+ }
+ } catch (Exception e) {
+ logger.error("KafkaSpout发送消息出现异常!", e);
+ e.printStackTrace();
+ }
+ }
+
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("session-source"));
+ }
+}
diff --git a/src/main/java/cn/ac/iie/storm/spout/TransactionKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/TransactionKafkaSpout.java
new file mode 100644
index 0000000..6fbdde0
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/spout/TransactionKafkaSpout.java
@@ -0,0 +1,71 @@
+package cn.ac.iie.storm.spout;
+
+import cn.ac.iie.storm.utils.config.StreamAggregateConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+public class TransactionKafkaSpout extends BaseRichSpout {
+ private static final long serialVersionUID = -3363788553406229592L;
+ private KafkaConsumer<String, String> consumer;
+ private SpoutOutputCollector collector = null;
+ private TopologyContext context = null;
+ private final static Logger logger = Logger.getLogger(ConnectionKafkaSpout.class);
+
+ private static Properties createConsumerConfig() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", StreamAggregateConfig.GROUP_ID);
+ props.put("session.timeout.ms", "60000");
+ props.put("max.poll.records", 3000);
+ props.put("max.partition.fetch.bytes", 31457280);
+ props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ logger.info("链接kafka"+StreamAggregateConfig.BOOTSTRAP_SERVERS+"成功");
+ return props;
+ }
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ this.collector=spoutOutputCollector;
+ this.context=topologyContext;
+ Properties prop = createConsumerConfig();
+ this.consumer = new KafkaConsumer<>(prop);
+ this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC_TRANSACTION));
+ }
+ @Override
+ public void close() {
+ consumer.close();
+ }
+ @Override
+ public void nextTuple() {
+ try {
+ ConsumerRecords<String, String> records = consumer.poll(StreamAggregateConfig.KAFKA_COMSUMER_POLL);
+ Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
+ for (ConsumerRecord<String, String> record : records) {
+ this.collector.emit(new Values(record.value()));
+ }
+ } catch (Exception e) {
+ logger.error("KafkaSpout发送消息出现异常!", e);
+ e.printStackTrace();
+ }
+ }
+
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("transaction-source"));
+ }
+}
diff --git a/src/main/java/cn/ac/iie/storm/utils/general/GetMap.java b/src/main/java/cn/ac/iie/storm/utils/general/GetMap.java
new file mode 100644
index 0000000..e4c2494
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/utils/general/GetMap.java
@@ -0,0 +1,195 @@
+package cn.ac.iie.storm.utils.general;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.logging.log4j.util.PropertiesUtil;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class GetMap {
+ private static Properties propService = new Properties();
+ private static Properties propService1 = new Properties();
+
+
+ private static Properties propService_city = new Properties();
+ private static Properties propService_province = new Properties();
+ private static Properties propService_country = new Properties();
+ private static Properties propService_jstx = new Properties();
+
+
+ private static Map<String, String> labelMap;
+ private static Map<String, String> appMap;
+ private static Map<String, String> cityMap;
+ private static Map<String, String> provinceMap;
+ private static Map<String, String> countryMap;
+ private static Map<String, String> jstxIdMap;
+
+
+
+ public static String getJstxDetail_id(int common_app_id){
+ String appId = String.valueOf(common_app_id);
+ if (appMap.get(appId) != null) {
+ return appMap.get(appId);
+ } else {
+ return appId;
+ }
+ }
+
+ public static String getProto_id(String common_schema_type){
+ return labelMap.get(common_schema_type);
+ }
+
+ public static String getDy(String location){
+ String[] split;
+ if(location==null){
+ return "0";
+ } else if (location.contains("..")){
+
+ //处理新疆省内location逻辑。
+ split = location.split("\\.\\.");
+ if (split.length == 2) {
+ String xjCity = split[0];
+ if (xjCity.length()<1) {
+ String xjCounty = split[1];
+ if (xjCounty == null) {
+ xjCounty = "0";
+ }
+ return xjCounty;
+ }
+ return xjCity;
+ }else {
+ return "0";
+ }
+ } else {
+ //处理新疆省外location逻辑
+ //消除location中的空格
+ String address = location.replace(" ","");
+ String thirdLevelAdd = cityMap.get(address);
+ if (StringUtil.isNotBlank(thirdLevelAdd)){
+ System.out.println("执行的这里");
+ return thirdLevelAdd;
+ }else {
+ String[] add = address.split("\\.");
+ if (StringUtil.isNotBlank( provinceMap.get(add[1]))){
+ return provinceMap.get(add[1]);
+ }else if (StringUtil.isNotBlank(countryMap.get(add[0]))){
+ return countryMap.get(add[0]);
+ }else {
+ return "0";
+ }
+ }
+ }
+ }
+
+
+ public static String getFjSzd(String location,String mail_from,String mail_protocol_type) {
+ if (mail_from != null&& mail_protocol_type.equals("SMTP")) {
+ String[] split;
+ if (location == null) {
+ return String.valueOf(3);
+ } else if (location.contains(",,")) {
+ return String.valueOf(1);
+ } else {
+ split = location.split(",");
+ if (split.length == 3) {
+ String country = split[2].replace(" ", "");
+ String s = countryMap.get(country);
+ if (country.equals("China")) {
+ return String.valueOf(1);
+ } else if (s != null) {
+ return String.valueOf(2);
+ } else {
+ return String.valueOf(3);
+ }
+ } else {
+ return String.valueOf(3);
+ }
+ }
+ }else {
+ return String.valueOf(3);
+ }
+ }
+
+
+ public static String getSjSzd(String location,String mail_to,String mail_protocol_type) {
+ if (mail_to != null&&(mail_protocol_type.equals("POP3")||mail_protocol_type.equals("IMAP"))) {
+ String[] split;
+ if (location == null) {
+ return String.valueOf(3);
+ } else if (location.contains(",,")) {
+ return String.valueOf(1);
+ } else {
+ split = location.split(",");
+ if (split.length == 3) {
+ String country = split[2].replace(" ", "");
+ String s = countryMap.get(country);
+ if (country.equals("China")) {
+ return String.valueOf(1);
+ } else if (s != null) {
+ return String.valueOf(2);
+ } else {
+ return String.valueOf(3);
+ }
+ } else {
+ return String.valueOf(3);
+ }
+ }
+ }else {
+ return String.valueOf(3);
+ }
+ }
+
+
+
+ public static String getDns_name(String dns_qname){
+ if (dns_qname==null){
+ return "";
+ }else if (dns_qname.contains(" ")){
+ return "";
+ }else {
+ return dns_qname;
+ }
+ }
+
+
+ static {
+ try {
+ InputStream applabel = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_app_label_coding.properties");
+ InputStream app = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_app_id_coding.properties");
+ InputStream jstxid = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_jstx_id_coding.properties");
+
+ InputStream cityNumbering = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_city_coding.properties");
+ InputStream provinceNumbering = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_province_coding.properties");
+ InputStream countryNumbering = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_country_coding.properties");
+
+
+ propService1.load(app);
+ propService.load(applabel);
+ propService_jstx.load(jstxid);
+
+ propService_city.load(cityNumbering);
+ propService_province.load(provinceNumbering);
+ propService_country.load(countryNumbering);
+
+
+ appMap = new HashMap<String, String>((Map)propService1);
+ labelMap = new HashMap<String, String>((Map) propService);
+ jstxIdMap = new HashMap<String, String>((Map) propService_jstx);
+ cityMap = new HashMap<String, String>((Map) propService_city);
+ provinceMap = new HashMap<String, String>((Map) propService_province);
+ countryMap = new HashMap<String, String>((Map) propService_country);
+ } catch (Exception e) {
+// propCommon = null;
+ propService = null;
+ propService1 = null;
+ propService_jstx = null;
+ propService_city = null;
+ propService_province = null;
+ propService_country = null;
+ }
+ }
+
+
+}
diff --git a/src/test/java/TestUFT.java b/src/test/java/TestUFT.java
new file mode 100644
index 0000000..b0cba09
--- /dev/null
+++ b/src/test/java/TestUFT.java
@@ -0,0 +1,10 @@
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+
+public class TestUFT {
+ public static void main(String[] args) throws UnsupportedEncodingException {
+ String str = "�й�";
+ System.out.println(new String( str.getBytes("GBK") , StandardCharsets.UTF_8));
+ }
+
+}
diff --git a/src/test/java/TestUnicode.java b/src/test/java/TestUnicode.java
new file mode 100644
index 0000000..a623898
--- /dev/null
+++ b/src/test/java/TestUnicode.java
@@ -0,0 +1,15 @@
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+
+public class TestUnicode {
+ public static void main(String[] args) throws UnsupportedEncodingException {
+ String str ="中国";
+ String utf8 = new String(str.getBytes(StandardCharsets.UTF_8));
+ System.out.println(utf8);
+
+ String unicode = new String (str.getBytes(), StandardCharsets.UTF_8);
+ System.out.println(unicode);
+ String gbk = new String(unicode.getBytes("GBK"));
+ System.out.println(gbk);
+ }
+}