| author | wangchengcheng <[email protected]> | 2022-08-11 14:29:07 +0800 |
|---|---|---|
| committer | wangchengcheng <[email protected]> | 2022-08-11 14:29:07 +0800 |
| commit | f64d73fc4ceb95a231348630d02670e4a10377db (patch) | |
| tree | fcae686a8c2e73c675abda036a5c979b3ed8829d /src | |
| parent | 37975911b7a358a4510abdcf995082a9bce15aa9 (diff) | |
2. Modify the getDy() business logic.
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/cn/ac/iie/storm/spout/ConnectionKafkaSpout.java | 74 |
| -rw-r--r-- | src/main/java/cn/ac/iie/storm/spout/SessionKafkaSpout.java | 71 |
| -rw-r--r-- | src/main/java/cn/ac/iie/storm/spout/TransactionKafkaSpout.java | 71 |
| -rw-r--r-- | src/main/java/cn/ac/iie/storm/utils/general/GetMap.java | 195 |
| -rw-r--r-- | src/test/java/TestUFT.java | 10 |
| -rw-r--r-- | src/test/java/TestUnicode.java | 15 |
6 files changed, 436 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/storm/spout/ConnectionKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/ConnectionKafkaSpout.java
new file mode 100644
index 0000000..e774bd1
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/spout/ConnectionKafkaSpout.java
@@ -0,0 +1,74 @@
+package cn.ac.iie.storm.spout;
+
+import cn.ac.iie.storm.utils.config.StreamAggregateConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Kafka spout for connection records.
+ */
+public class ConnectionKafkaSpout extends BaseRichSpout {
+    private static final long serialVersionUID = -3363788553406229592L;
+    private KafkaConsumer<String, String> consumer;
+    private SpoutOutputCollector collector = null;
+    private TopologyContext context = null;
+    private final static Logger logger = Logger.getLogger(ConnectionKafkaSpout.class);
+
+    private static Properties createConsumerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
+        props.put("group.id", StreamAggregateConfig.GROUP_ID);
+        props.put("session.timeout.ms", "60000");
+        props.put("max.poll.records", 3000);
+        props.put("max.partition.fetch.bytes", 31457280);
+        props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        logger.info("Connected to Kafka " + StreamAggregateConfig.BOOTSTRAP_SERVERS + " successfully");
+        return props;
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        this.collector = spoutOutputCollector;
+        this.context = topologyContext;
+        Properties prop = createConsumerConfig();
+        this.consumer = new KafkaConsumer<>(prop);
+        this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC));
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            ConsumerRecords<String, String> records = consumer.poll(StreamAggregateConfig.KAFKA_COMSUMER_POLL);
+            Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
+            for (ConsumerRecord<String, String> record : records) {
+                this.collector.emit(new Values(record.value()));
+            }
+        } catch (Exception e) {
+            logger.error("KafkaSpout failed to emit message!", e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("flooding-source"));
+    }
+}
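For context, a minimal sketch of how a spout like this could be wired into a Storm topology. This wiring is not part of the commit: the topology name, parallelism, worker count, and the commented-out bolt are illustrative assumptions; only ConnectionKafkaSpout and its declared "flooding-source" field come from the diff above.

```java
// Hypothetical wiring sketch -- not part of this commit.
import cn.ac.iie.storm.spout.ConnectionKafkaSpout;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class ConnectionTopologySketch {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // The spout emits one tuple per Kafka record on the "flooding-source" field.
        builder.setSpout("connection-spout", new ConnectionKafkaSpout(), 1);
        // A downstream bolt (hypothetical, AggregateBolt is not in this commit)
        // would consume that stream:
        // builder.setBolt("aggregate-bolt", new AggregateBolt(), 4)
        //        .shuffleGrouping("connection-spout");
        Config conf = new Config();
        conf.setNumWorkers(2);
        StormSubmitter.submitTopology("connection-topology", conf, builder.createTopology());
    }
}
```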
diff --git a/src/main/java/cn/ac/iie/storm/spout/SessionKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/SessionKafkaSpout.java
new file mode 100644
index 0000000..081c3a0
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/spout/SessionKafkaSpout.java
@@ -0,0 +1,71 @@
+package cn.ac.iie.storm.spout;
+
+import cn.ac.iie.storm.utils.config.StreamAggregateConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+public class SessionKafkaSpout extends BaseRichSpout {
+    private static final long serialVersionUID = -3363788553406229592L;
+    private KafkaConsumer<String, String> consumer;
+    private SpoutOutputCollector collector = null;
+    private TopologyContext context = null;
+    private final static Logger logger = Logger.getLogger(SessionKafkaSpout.class);
+
+    private static Properties createConsumerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
+        props.put("group.id", StreamAggregateConfig.GROUP_ID);
+        props.put("session.timeout.ms", "60000");
+        props.put("max.poll.records", 3000);
+        props.put("max.partition.fetch.bytes", 31457280);
+        props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        logger.info("Connected to Kafka " + StreamAggregateConfig.BOOTSTRAP_SERVERS + " successfully");
+        return props;
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        this.collector = spoutOutputCollector;
+        this.context = topologyContext;
+        Properties prop = createConsumerConfig();
+        this.consumer = new KafkaConsumer<>(prop);
+        this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC_SESSION));
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            ConsumerRecords<String, String> records = consumer.poll(StreamAggregateConfig.KAFKA_COMSUMER_POLL);
+            Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
+            for (ConsumerRecord<String, String> record : records) {
+                this.collector.emit(new Values(record.value()));
+            }
+        } catch (Exception e) {
+            logger.error("KafkaSpout failed to emit message!", e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("session-source"));
+    }
+}
diff --git a/src/main/java/cn/ac/iie/storm/spout/TransactionKafkaSpout.java b/src/main/java/cn/ac/iie/storm/spout/TransactionKafkaSpout.java
new file mode 100644
index 0000000..6fbdde0
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/spout/TransactionKafkaSpout.java
@@ -0,0 +1,71 @@
+package cn.ac.iie.storm.spout;
+
+import cn.ac.iie.storm.utils.config.StreamAggregateConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+public class TransactionKafkaSpout extends BaseRichSpout {
+    private static final long serialVersionUID = -3363788553406229592L;
+    private KafkaConsumer<String, String> consumer;
+    private SpoutOutputCollector collector = null;
+    private TopologyContext context = null;
+    private final static Logger logger = Logger.getLogger(TransactionKafkaSpout.class);
+
+    private static Properties createConsumerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
+        props.put("group.id", StreamAggregateConfig.GROUP_ID);
+        props.put("session.timeout.ms", "60000");
+        props.put("max.poll.records", 3000);
+        props.put("max.partition.fetch.bytes", 31457280);
+        props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        logger.info("Connected to Kafka " + StreamAggregateConfig.BOOTSTRAP_SERVERS + " successfully");
+        return props;
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        this.collector = spoutOutputCollector;
+        this.context = topologyContext;
+        Properties prop = createConsumerConfig();
+        this.consumer = new KafkaConsumer<>(prop);
+        this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC_TRANSACTION));
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            ConsumerRecords<String, String> records = consumer.poll(StreamAggregateConfig.KAFKA_COMSUMER_POLL);
+            Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
+            for (ConsumerRecord<String, String> record : records) {
+                this.collector.emit(new Values(record.value()));
+            }
+        } catch (Exception e) {
+            logger.error("KafkaSpout failed to emit message!", e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("transaction-source"));
+    }
+}
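The three spouts above are identical except for the subscribed topic and the declared output field. As a design note, a hedged sketch of how they could share one abstract base class follows; AbstractKafkaSpout and its method names are hypothetical and not part of this commit, and the consumer config is trimmed to essentials.

```java
// Hypothetical refactoring sketch -- not part of this commit.
package cn.ac.iie.storm.spout;

import cn.ac.iie.storm.utils.config.StreamAggregateConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public abstract class AbstractKafkaSpout extends BaseRichSpout {
    private transient KafkaConsumer<String, String> consumer;
    private SpoutOutputCollector collector;

    /** Topic to subscribe to, e.g. StreamAggregateConfig.KAFKA_TOPIC_SESSION. */
    protected abstract String topic();

    /** Declared output field, e.g. "session-source". */
    protected abstract String outputField();

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Properties props = new Properties();
        props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
        props.put("group.id", StreamAggregateConfig.GROUP_ID);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Collections.singletonList(topic()));
    }

    @Override
    public void nextTuple() {
        // Same poll-and-emit loop the three concrete spouts duplicate today.
        ConsumerRecords<String, String> records = consumer.poll(StreamAggregateConfig.KAFKA_COMSUMER_POLL);
        for (ConsumerRecord<String, String> record : records) {
            collector.emit(new Values(record.value()));
        }
    }

    @Override
    public void close() {
        consumer.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(outputField()));
    }
}
```

Each concrete spout would then shrink to two one-line overrides returning its topic and field name.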
diff --git a/src/main/java/cn/ac/iie/storm/utils/general/GetMap.java b/src/main/java/cn/ac/iie/storm/utils/general/GetMap.java
new file mode 100644
index 0000000..e4c2494
--- /dev/null
+++ b/src/main/java/cn/ac/iie/storm/utils/general/GetMap.java
@@ -0,0 +1,195 @@
+package cn.ac.iie.storm.utils.general;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.logging.log4j.util.PropertiesUtil;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class GetMap {
+    private static Properties propService = new Properties();
+    private static Properties propService1 = new Properties();
+
+    private static Properties propService_city = new Properties();
+    private static Properties propService_province = new Properties();
+    private static Properties propService_country = new Properties();
+    private static Properties propService_jstx = new Properties();
+
+    private static Map<String, String> labelMap;
+    private static Map<String, String> appMap;
+    private static Map<String, String> cityMap;
+    private static Map<String, String> provinceMap;
+    private static Map<String, String> countryMap;
+    private static Map<String, String> jstxIdMap;
+
+    public static String getJstxDetail_id(int common_app_id) {
+        String appId = String.valueOf(common_app_id);
+        if (appMap.get(appId) != null) {
+            return appMap.get(appId);
+        } else {
+            return appId;
+        }
+    }
+
+    public static String getProto_id(String common_schema_type) {
+        return labelMap.get(common_schema_type);
+    }
+
+    public static String getDy(String location) {
+        String[] split;
+        if (location == null) {
+            return "0";
+        } else if (location.contains("..")) {
+            // Handle locations inside Xinjiang province.
+            split = location.split("\\.\\.");
+            if (split.length == 2) {
+                String xjCity = split[0];
+                if (xjCity.length() < 1) {
+                    String xjCounty = split[1];
+                    if (xjCounty == null) {
+                        xjCounty = "0";
+                    }
+                    return xjCounty;
+                }
+                return xjCity;
+            } else {
+                return "0";
+            }
+        } else {
+            // Handle locations outside Xinjiang province.
+            // Strip spaces from the location first.
+            String address = location.replace(" ", "");
+            String thirdLevelAdd = cityMap.get(address);
+            if (StringUtil.isNotBlank(thirdLevelAdd)) {
+                return thirdLevelAdd;
+            } else {
+                String[] add = address.split("\\.");
+                if (StringUtil.isNotBlank(provinceMap.get(add[1]))) {
+                    return provinceMap.get(add[1]);
+                } else if (StringUtil.isNotBlank(countryMap.get(add[0]))) {
+                    return countryMap.get(add[0]);
+                } else {
+                    return "0";
+                }
+            }
+        }
+    }
+
+    public static String getFjSzd(String location, String mail_from, String mail_protocol_type) {
+        if (mail_from != null && mail_protocol_type.equals("SMTP")) {
+            String[] split;
+            if (location == null) {
+                return String.valueOf(3);
+            } else if (location.contains(",,")) {
+                return String.valueOf(1);
+            } else {
+                split = location.split(",");
+                if (split.length == 3) {
+                    String country = split[2].replace(" ", "");
+                    String s = countryMap.get(country);
+                    if (country.equals("China")) {
+                        return String.valueOf(1);
+                    } else if (s != null) {
+                        return String.valueOf(2);
+                    } else {
+                        return String.valueOf(3);
+                    }
+                } else {
+                    return String.valueOf(3);
+                }
+            }
+        } else {
+            return String.valueOf(3);
+        }
+    }
+
+    public static String getSjSzd(String location, String mail_to, String mail_protocol_type) {
+        if (mail_to != null && (mail_protocol_type.equals("POP3") || mail_protocol_type.equals("IMAP"))) {
+            String[] split;
+            if (location == null) {
+                return String.valueOf(3);
+            } else if (location.contains(",,")) {
+                return String.valueOf(1);
+            } else {
+                split = location.split(",");
+                if (split.length == 3) {
+                    String country = split[2].replace(" ", "");
+                    String s = countryMap.get(country);
+                    if (country.equals("China")) {
+                        return String.valueOf(1);
+                    } else if (s != null) {
+                        return String.valueOf(2);
+                    } else {
+                        return String.valueOf(3);
+                    }
+                } else {
+                    return String.valueOf(3);
+                }
+            }
+        } else {
+            return String.valueOf(3);
+        }
+    }
+
+    public static String getDns_name(String dns_qname) {
+        if (dns_qname == null) {
+            return "";
+        } else if (dns_qname.contains(" ")) {
+            return "";
+        } else {
+            return dns_qname;
+        }
+    }
+
+    static {
+        try {
+            InputStream applabel = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_app_label_coding.properties");
+            InputStream app = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_app_id_coding.properties");
+            InputStream jstxid = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_jstx_id_coding.properties");
+
+            InputStream cityNumbering = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_city_coding.properties");
+            InputStream provinceNumbering = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_province_coding.properties");
+            InputStream countryNumbering = PropertiesUtil.class.getClassLoader().getResourceAsStream("common_country_coding.properties");
+
+            propService1.load(app);
+            propService.load(applabel);
+            propService_jstx.load(jstxid);
+
+            propService_city.load(cityNumbering);
+            propService_province.load(provinceNumbering);
+            propService_country.load(countryNumbering);
+
+            appMap = new HashMap<String, String>((Map) propService1);
+            labelMap = new HashMap<String, String>((Map) propService);
+            jstxIdMap = new HashMap<String, String>((Map) propService_jstx);
+            cityMap = new HashMap<String, String>((Map) propService_city);
+            provinceMap = new HashMap<String, String>((Map) propService_province);
+            countryMap = new HashMap<String, String>((Map) propService_country);
+        } catch (Exception e) {
+            propService = null;
+            propService1 = null;
+            propService_jstx = null;
+            propService_city = null;
+            propService_province = null;
+            propService_country = null;
+        }
+    }
+}
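GetMap.getDy() distinguishes two location formats: an in-Xinjiang form with ".." between city and county, and an out-of-province "Country.Province.City" form resolved against the loaded coding maps. A small usage sketch, assuming the *_coding.properties files are on the classpath; the sample location strings are invented for illustration.

```java
// Illustrative calls to GetMap.getDy(); the sample strings are made up, and the
// out-of-province results depend on the coding properties loaded at class init.
import cn.ac.iie.storm.utils.general.GetMap;

public class GetDyExample {
    public static void main(String[] args) {
        // In-Xinjiang format "city..county": the city segment wins when present.
        System.out.println(GetMap.getDy("Urumqi..Tianshan")); // "Urumqi"
        System.out.println(GetMap.getDy("..Tianshan"));       // "Tianshan"

        // Out-of-province format "Country.Province.City": looked up in cityMap,
        // then provinceMap, then countryMap; unknown values fall back to "0".
        System.out.println(GetMap.getDy("China.Beijing.Beijing"));

        System.out.println(GetMap.getDy(null)); // "0"
    }
}
```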
diff --git a/src/test/java/TestUFT.java b/src/test/java/TestUFT.java
new file mode 100644
index 0000000..b0cba09
--- /dev/null
+++ b/src/test/java/TestUFT.java
@@ -0,0 +1,10 @@
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+
+public class TestUFT {
+    public static void main(String[] args) throws UnsupportedEncodingException {
+        String str = "中国";
+        System.out.println(new String(str.getBytes("GBK"), StandardCharsets.UTF_8));
+    }
+}
diff --git a/src/test/java/TestUnicode.java b/src/test/java/TestUnicode.java
new file mode 100644
index 0000000..a623898
--- /dev/null
+++ b/src/test/java/TestUnicode.java
@@ -0,0 +1,15 @@
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+
+public class TestUnicode {
+    public static void main(String[] args) throws UnsupportedEncodingException {
+        String str = "中国";
+        String utf8 = new String(str.getBytes(StandardCharsets.UTF_8));
+        System.out.println(utf8);
+
+        String unicode = new String(str.getBytes(), StandardCharsets.UTF_8);
+        System.out.println(unicode);
+        String gbk = new String(unicode.getBytes("GBK"));
+        System.out.println(gbk);
+    }
+}
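Both test classes probe the same pitfall: bytes encoded with one charset cannot be decoded with another. A minimal sketch of the failure and the correct round trip, using only standard-library calls:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class EncodingRoundTrip {
    public static void main(String[] args) {
        String str = "中国";
        Charset gbk = Charset.forName("GBK");
        byte[] gbkBytes = str.getBytes(gbk);

        // Decoding GBK bytes as UTF-8 is lossy: the byte sequence is invalid
        // UTF-8, so the decoder substitutes U+FFFD replacement characters.
        System.out.println(new String(gbkBytes, StandardCharsets.UTF_8)); // garbled

        // A correct round trip decodes with the same charset that encoded.
        System.out.println(new String(gbkBytes, gbk)); // 中国
    }
}
```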
