summaryrefslogtreecommitdiff
path: root/src/main/java/cn
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2021-02-01 11:39:37 +0800
committerqidaijie <[email protected]>2021-02-01 11:39:37 +0800
commit896e99e3788fd1dc83c6d96eb645b0b519087b8b (patch)
tree7256fbdb61b0b70a2fb1b3b036dcac48882c1940 /src/main/java/cn
parent084f6731ba501097fe326e351b1d9b2b6afd8df9 (diff)
提交线上使用版本HEADmaster
Diffstat (limited to 'src/main/java/cn')
-rw-r--r--src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java169
-rw-r--r--src/main/java/cn/ac/iie/common/SubscriberConfig.java52
-rw-r--r--src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java10
-rw-r--r--src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java81
-rw-r--r--src/main/java/cn/ac/iie/topology/StormRunner.java3
-rw-r--r--src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java62
6 files changed, 372 insertions, 5 deletions
diff --git a/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java
new file mode 100644
index 0000000..af08f87
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java
@@ -0,0 +1,169 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.utils.TupleUtils;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * @author qidaijie
+ */
+public class SubscriberIdBolt extends BaseBasicBolt {
+ private static Logger logger = Logger.getLogger(SubscriberIdBolt.class);
+ private static Map<String, String> subIdMap;
+ private List<Put> putList;
+ private static Connection connection;
+
+ static {
+ // 管理Hbase的配置信息
+ Configuration configuration = HBaseConfiguration.create();
+ // 设置zookeeper节点
+ configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS);
+ configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ configuration.set("hbase.client.retries.number", "3");
+ configuration.set("hbase.bulkload.retries.number", "3");
+ configuration.set("zookeeper.recovery.retry", "3");
+ try {
+ connection = ConnectionFactory.createConnection(configuration);
+ } catch (IOException e) {
+ logger.error("Subscriber写入HBase程序连接HBase异常");
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ subIdMap = new HashMap<>(3333334);
+ putList = new ArrayList<>();
+ getAll();
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ try {
+ if (TupleUtils.isTick(tuple)) {
+ insertData(putList);
+ } else {
+ String message = tuple.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSONObject.parseObject(message);
+ if (jsonObject.containsKey(SubscriberConfig.PACKET_TYPE) && jsonObject.containsKey(SubscriberConfig.STATUS_TYPE)) {
+ if (SubscriberConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(SubscriberConfig.PACKET_TYPE)
+ && SubscriberConfig.START_BILLING == jsonObject.getInteger(SubscriberConfig.STATUS_TYPE)) {
+ String framedIp = jsonObject.getString("radius_framed_ip");
+ String account = jsonObject.getString("radius_account");
+ dataValidation(framedIp, account, putList);
+ }
+ if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) {
+ insertData(putList);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Radius 日志解析/更新HBase 失败!");
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>(16);
+ conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+ SubscriberConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ return conf;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ /**
+ * 获取所有的 key value
+ */
+ private static void getAll() {
+ try {
+ Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
+ Scan scan = new Scan();
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ Cell[] cells = result.rawCells();
+ for (Cell cell : cells) {
+ subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
+ }
+ }
+ logger.warn("初始化内存数据成功--初始化map大小->(" + subIdMap.size() + ")");
+ scanner.close();
+ } catch (IOException e) {
+ logger.error("初始化内存数据异常");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 写入数据到HBase
+ *
+ * @param putList puts list
+ */
+ private static void insertData(List<Put> putList) {
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
+ table.put(putList);
+ putList.clear();
+ logger.warn("更新HBase成功,更新条数:" + putList.size());
+ } catch (IOException e) {
+ logger.error("更新数据写入HBase失败");
+ e.printStackTrace();
+ } finally {
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } catch (IOException e) {
+ logger.error("HBase表关闭异常");
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ /**
+ * 验证数据并与内存中的对比
+ *
+ * @param ip framed_ip
+ * @param account account
+ */
+ private static void dataValidation(String ip, String account, List<Put> putList) {
+ if (subIdMap.containsKey(ip)) {
+ if (!subIdMap.get(ip).equals(account)) {
+ Put put = new Put(ip.getBytes());
+ put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes());
+ putList.add(put);
+ subIdMap.put(ip, account);
+ }
+ } else {
+ Put put = new Put(ip.getBytes());
+ put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes());
+ putList.add(put);
+ subIdMap.put(ip, account);
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/common/SubscriberConfig.java b/src/main/java/cn/ac/iie/common/SubscriberConfig.java
new file mode 100644
index 0000000..45040ee
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/SubscriberConfig.java
@@ -0,0 +1,52 @@
+package cn.ac.iie.common;
+
+import cn.ac.iie.utils.SubscriberConfigurations;
+
+import java.io.Serializable;
+
+/**
+ * @author qidaijie
+ */
/**
 * Constants and externalised settings for the Radius subscriber topology.
 * Values read through {@link SubscriberConfigurations} are loaded once, at
 * class-initialisation time, from subscriber-config.properties.
 *
 * @author qidaijie
 */
public class SubscriberConfig implements Serializable {
    private static final long serialVersionUID = -8326385159484059324L;

    // Generic value separator; not referenced in this change set —
    // presumably used elsewhere to split comma-separated settings (TODO confirm).
    public static final String SEGMENTATION = ",";
    // Batch size at which SubscriberIdBolt flushes pending puts to HBase.
    public static final int LIST_SIZE_MAX = 5000;
    /**
     * Radius packet type 4 - Accounting-Request (accounting authorisation).
     */
    public static final int ACCOUNTING_REQUEST = 4;
    /**
     * JSON field carrying the Radius packet type.
     */
    public static final String PACKET_TYPE = "radius_packet_type";
    /**
     * Accounting status type 1 - start of billing.
     */
    public static final int START_BILLING = 1;
    /**
     * JSON field carrying the Radius accounting status type.
     */
    public static final String STATUS_TYPE = "radius_acct_status_type";


    /***
     * Kafka, Storm topology and HBase settings, all read from the common
     * properties file (type 0).
     */
    public static final String BOOTSTRAP_SERVERS = SubscriberConfigurations.getStringProperty(0, "bootstrap.servers");
    public static final Integer SPOUT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "spout.parallelism");
    public static final Integer FORMAT_BOLT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "format.bolt.parallelism");
    public static final String GROUP_ID = SubscriberConfigurations.getStringProperty(0, "group.id");
    public static final String KAFKA_TOPIC = SubscriberConfigurations.getStringProperty(0, "kafka.topic");
    public static final String AUTO_OFFSET_RESET = SubscriberConfigurations.getStringProperty(0, "auto.offset.reset");
    public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = SubscriberConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
    public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = SubscriberConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
    public static final Integer TOPOLOGY_NUM_ACKS = SubscriberConfigurations.getIntProperty(0, "topology.num.acks");

    public static final Integer TOPOLOGY_WORKERS = SubscriberConfigurations.getIntProperty(0, "topology.workers");

    // Not referenced in this change set — presumably an IP whitelist/range
    // filter used by another bolt (TODO confirm against callers).
    public static final String CHECK_IP_SCOPE = SubscriberConfigurations.getStringProperty(0, "check.ip.scope");

    public static final String HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
    public static final String HBASE_TABLE_NAME = SubscriberConfigurations.getStringProperty(0, "hbase.table.name");
}
diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
index 96c8330..6c9dadc 100644
--- a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
@@ -1,6 +1,6 @@
package cn.ac.iie.spout;
-import cn.ac.iie.common.AddressConfig;
+import cn.ac.iie.common.SubscriberConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -29,9 +29,9 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
private static Properties createConsumerConfig() {
Properties props = new Properties();
- props.put("bootstrap.servers", AddressConfig.BOOTSTRAP_SERVERS);
- props.put("group.id", AddressConfig.GROUP_ID);
- props.put("auto.offset.reset", AddressConfig.AUTO_OFFSET_RESET);
+ props.put("bootstrap.servers", SubscriberConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", SubscriberConfig.GROUP_ID);
+ props.put("auto.offset.reset", SubscriberConfig.AUTO_OFFSET_RESET);
props.put("session.timeout.ms", "60000");
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
@@ -47,7 +47,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
this.context = context;
Properties prop = createConsumerConfig();
this.consumer = new KafkaConsumer<>(prop);
- this.consumer.subscribe(Arrays.asList(AddressConfig.KAFKA_TOPIC));
+ this.consumer.subscribe(Arrays.asList(SubscriberConfig.KAFKA_TOPIC));
}
@Override
diff --git a/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java
new file mode 100644
index 0000000..5a0c903
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java
@@ -0,0 +1,81 @@
+package cn.ac.iie.topology;
+
+import cn.ac.iie.bolt.SubscriberIdBolt;
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.spout.CustomizedKafkaSpout;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author qidaijie
+ */
+public class LogSubscriberTopology {
+ private static final Logger logger = LoggerFactory.getLogger(LogSubscriberTopology.class);
+ private final String topologyName;
+ private final Config topologyConfig;
+ private TopologyBuilder builder;
+
+ public LogSubscriberTopology() {
+ this(LogSubscriberTopology.class.getSimpleName());
+ }
+
+ public LogSubscriberTopology(String topologyName) {
+ this.topologyName = topologyName;
+ topologyConfig = createTopologConfig();
+ }
+
+ private Config createTopologConfig() {
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(120);
+ conf.setTopologyWorkerMaxHeapSize(500);
+ conf.setMaxSpoutPending(SubscriberConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+ if (SubscriberConfig.TOPOLOGY_NUM_ACKS == 0) {
+ conf.setNumAckers(0);
+ }
+ return conf;
+ }
+
+ public void runLocally() throws InterruptedException {
+ topologyConfig.setMaxTaskParallelism(1);
+ StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+ }
+
+ public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ topologyConfig.setNumWorkers(SubscriberConfig.TOPOLOGY_WORKERS);
+ //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌
+ topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+ StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
+ }
+
+ private void buildTopology() {
+ builder = new TopologyBuilder();
+ builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), SubscriberConfig.SPOUT_PARALLELISM);
+ builder.setBolt("SubscriberIdBolt", new SubscriberIdBolt(), SubscriberConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
+ }
+
+ public static void main(String[] args) throws Exception {
+ LogSubscriberTopology csst = null;
+ boolean runLocally = true;
+ if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) {
+ runLocally = false;
+ csst = new LogSubscriberTopology(args[0]);
+ } else {
+ csst = new LogSubscriberTopology();
+ }
+ csst.buildTopology();
+
+ if (runLocally) {
+ logger.info("执行本地模式...");
+ csst.runLocally();
+ } else {
+ logger.info("执行远程部署模式...");
+ csst.runRemotely();
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java
index d2d4ab9..4d5be53 100644
--- a/src/main/java/cn/ac/iie/topology/StormRunner.java
+++ b/src/main/java/cn/ac/iie/topology/StormRunner.java
@@ -9,6 +9,9 @@ import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
+/**
+ * @author qidaijie
+ */
public final class StormRunner{
private static final int MILLS_IN_SEC = 1000;
diff --git a/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java
new file mode 100644
index 0000000..95dd847
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java
@@ -0,0 +1,62 @@
+package cn.ac.iie.utils;
+
+import java.util.Properties;
+
+
+
/**
 * Loads subscriber-config.properties from the classpath once and exposes
 * typed accessors over it.
 *
 * <p>{@code type} selects the backing property set; only type 0 (the common
 * configuration) is currently supported — any other value yields {@code null}.
 */
public final class SubscriberConfigurations {

    // Stays an EMPTY Properties when loading fails. The original set this to
    // null on failure, which turned every subsequent getter call into a
    // NullPointerException.
    private static final Properties propCommon = new Properties();

    static {
        try {
            propCommon.load(SubscriberConfigurations.class.getClassLoader().getResourceAsStream("subscriber-config.properties"));
        } catch (Exception e) {
            System.err.println("配置加载失败");
        }
    }

    /** Utility class; not instantiable. */
    private SubscriberConfigurations() {
    }

    /**
     * Returns the raw string value of {@code key}, or {@code null} when the
     * key is absent or {@code type} is not 0.
     */
    public static String getStringProperty(Integer type, String key) {
        return type == 0 ? propCommon.getProperty(key) : null;
    }

    /**
     * Returns the value of {@code key} parsed as an int, or {@code null}
     * when {@code type} is not 0.
     *
     * @throws NumberFormatException if the key is absent or not numeric
     */
    public static Integer getIntProperty(Integer type, String key) {
        if (type == 0) {
            return Integer.parseInt(propCommon.getProperty(key));
        }
        return null;
    }

    /**
     * Returns the value of {@code key} parsed as a long, or {@code null}
     * when {@code type} is not 0.
     *
     * @throws NumberFormatException if the key is absent or not numeric
     */
    public static Long getLongProperty(Integer type, String key) {
        if (type == 0) {
            return Long.parseLong(propCommon.getProperty(key));
        }
        return null;
    }

    /**
     * Returns {@code true} iff the value of {@code key} is "true"
     * (case-insensitive, trimmed). An absent key now reads as {@code false}
     * instead of throwing a NullPointerException. {@code null} when
     * {@code type} is not 0.
     */
    public static Boolean getBooleanProperty(Integer type, String key) {
        if (type == 0) {
            String value = propCommon.getProperty(key);
            return value != null && Boolean.parseBoolean(value.trim());
        }
        return null;
    }
}