Diffstat (limited to 'src/main/java/cn/ac')
-rw-r--r--  src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java             65
-rw-r--r--  src/main/java/cn/ac/iie/common/SubscriberConfig.java           36
-rw-r--r--  src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java        80
-rw-r--r--  src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java    81
-rw-r--r--  src/main/java/cn/ac/iie/topology/StormRunner.java              35
-rw-r--r--  src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java    62
-rw-r--r--  src/main/java/cn/ac/iie/utils/TupleUtils.java                  21
-rw-r--r--  src/main/java/cn/ac/iie/utils/hbase/PullHBaseUtils.java       138
-rw-r--r--  src/main/java/cn/ac/iie/utils/hbase/PushHBaseUtils.java        93
9 files changed, 611 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java
new file mode 100644
index 0000000..9f7598c
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java
@@ -0,0 +1,65 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.utils.TupleUtils;
+import cn.ac.iie.utils.hbase.PullHBaseUtils;
+import cn.ac.iie.utils.hbase.PushHBaseUtils;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.*;
+
+/**
+ * @author qidaijie
+ */
+public class SubscriberIdBolt extends BaseBasicBolt {
+ private static final Logger logger = Logger.getLogger(SubscriberIdBolt.class);
+ // Per-executor state, created in prepare(); keeping it non-static avoids sharing across executors in one worker.
+ private Map<String, String> subIdMap;
+ private List<Put> putList;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ subIdMap = new HashMap<>(83334);
+ putList = new ArrayList<>();
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ try {
+ if (TupleUtils.isTick(tuple)) {
+ PullHBaseUtils.change(subIdMap);
+ for (Map.Entry<String, String> entry : subIdMap.entrySet()) {
+ Put put = new Put(entry.getKey().getBytes());
+ put.addColumn("subscriber_id".getBytes(), "account".getBytes(), entry.getValue().getBytes());
+ putList.add(put);
+ }
+ PushHBaseUtils.insertData(putList);
+ putList.clear();
+ subIdMap.clear();
+ } else {
+ logger.warn(tuple.getString(0));
+ }
+ } catch (Exception e) {
+ logger.error("获取国家中心HBase Radius 写入分中心 HBase 异常");
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<String, Object>(16);
+ conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+ SubscriberConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ return conf;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // This bolt is a sink; it declares no output fields.
+ }
+}
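
[Sketch, not part of the commit] The bolt above turns each cached subscriber entry into one HBase cell: row key = subscriber key, column family "subscriber_id", qualifier "account", value = the account. A minimal, self-contained illustration; "13800000000" and "user@example.com" are made-up values:

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutLayoutSketch {
    public static void main(String[] args) {
        // Hypothetical subscriber key and account value.
        Put put = new Put(Bytes.toBytes("13800000000"));   // row key = subscriber key
        put.addColumn(Bytes.toBytes("subscriber_id"),      // column family
                Bytes.toBytes("account"),                  // qualifier
                Bytes.toBytes("user@example.com"));        // cell value
        System.out.println(put);                           // prints the row/column layout
    }
}
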
diff --git a/src/main/java/cn/ac/iie/common/SubscriberConfig.java b/src/main/java/cn/ac/iie/common/SubscriberConfig.java
new file mode 100644
index 0000000..5157e01
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/SubscriberConfig.java
@@ -0,0 +1,36 @@
+package cn.ac.iie.common;
+
+import cn.ac.iie.utils.SubscriberConfigurations;
+
+import java.io.Serializable;
+
+/**
+ * @author qidaijie
+ */
+public class SubscriberConfig implements Serializable {
+ private static final long serialVersionUID = -8326385159484059324L;
+
+ public static final int LIST_SIZE_MAX = 5000;
+
+ /**
+ * Kafka and topology settings
+ */
+ public static final String BOOTSTRAP_SERVERS = SubscriberConfigurations.getStringProperty(0, "bootstrap.servers");
+ public static final Integer SPOUT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "spout.parallelism");
+ public static final Integer FORMAT_BOLT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "format.bolt.parallelism");
+ public static final String GROUP_ID = SubscriberConfigurations.getStringProperty(0, "group.id");
+ public static final String KAFKA_TOPIC = SubscriberConfigurations.getStringProperty(0, "kafka.topic");
+ public static final String AUTO_OFFSET_RESET = SubscriberConfigurations.getStringProperty(0, "auto.offset.reset");
+ public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = SubscriberConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
+ public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = SubscriberConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
+ public static final Integer TOPOLOGY_NUM_ACKS = SubscriberConfigurations.getIntProperty(0, "topology.num.acks");
+
+ public static final Integer TOPOLOGY_WORKERS = SubscriberConfigurations.getIntProperty(0, "topology.workers");
+
+ public static final String CHECK_IP_SCOPE = SubscriberConfigurations.getStringProperty(0, "check.ip.scope");
+
+ public static final String CENTER_HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "center.hbase.zookeeper.servers");
+ public static final String HBASE_TABLE_NAME = SubscriberConfigurations.getStringProperty(0, "hbase.table.name");
+
+ public static final String NATIONAL_HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "national.hbase.zookeeper.servers");
+}
\ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
new file mode 100644
index 0000000..5d4619a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
@@ -0,0 +1,80 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.SubscriberConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ */
+public class CustomizedKafkaSpout extends BaseRichSpout {
+ private static final long serialVersionUID = 2934528972182398950L;
+ private KafkaConsumer<String, String> consumer;
+ private SpoutOutputCollector collector = null;
+ private TopologyContext context = null;
+ private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
+
+
+ private static Properties createConsumerConfig() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", SubscriberConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", SubscriberConfig.GROUP_ID);
+ props.put("auto.offset.reset", SubscriberConfig.AUTO_OFFSET_RESET);
+ props.put("session.timeout.ms", "60000");
+ props.put("max.poll.records", 3000);
+ props.put("max.partition.fetch.bytes", 31457280);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ return props;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ this.context = context;
+ Properties prop = createConsumerConfig();
+ this.consumer = new KafkaConsumer<>(prop);
+ this.consumer.subscribe(Arrays.asList(SubscriberConfig.KAFKA_TOPIC));
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+
+ @Override
+ public void nextTuple() {
+ try {
+ // Poll Kafka (blocking up to 10 s), then pace the emit loop briefly.
+ ConsumerRecords<String, String> records = consumer.poll(10000L);
+ Thread.sleep(300);
+ for (ConsumerRecord<String, String> record : records) {
+ this.collector.emit(new Values(record.value()));
+ }
+ } catch (Exception e) {
+ logger.error("kfaka-spout 发送数据出现异常" + e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("source"));
+ }
+
+}
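
[Sketch, not part of the commit] nextTuple() above emits without a message ID, so Storm cannot ack, fail, or replay these tuples; delivery is at most once, which matches running with topology.num.acks=0. If at-least-once delivery were wanted, the emit loop could anchor each tuple with a unique ID, sketched here under that assumption:

// Anchor each emit so ack()/fail() callbacks fire and failed tuples can be replayed.
for (ConsumerRecord<String, String> record : records) {
    String msgId = record.topic() + "-" + record.partition() + "-" + record.offset();
    this.collector.emit(new Values(record.value()), msgId);
}
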
diff --git a/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java
new file mode 100644
index 0000000..5a0c903
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java
@@ -0,0 +1,81 @@
+package cn.ac.iie.topology;
+
+import cn.ac.iie.bolt.SubscriberIdBolt;
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.spout.CustomizedKafkaSpout;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author qidaijie
+ */
+public class LogSubscriberTopology {
+ private static final Logger logger = LoggerFactory.getLogger(LogSubscriberTopology.class);
+ private final String topologyName;
+ private final Config topologyConfig;
+ private TopologyBuilder builder;
+
+ public LogSubscriberTopology() {
+ this(LogSubscriberTopology.class.getSimpleName());
+ }
+
+ public LogSubscriberTopology(String topologyName) {
+ this.topologyName = topologyName;
+ topologyConfig = createTopologyConfig();
+ }
+
+ private Config createTopologyConfig() {
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(120);
+ conf.setTopologyWorkerMaxHeapSize(500);
+ conf.setMaxSpoutPending(SubscriberConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+ if (SubscriberConfig.TOPOLOGY_NUM_ACKS == 0) {
+ conf.setNumAckers(0);
+ }
+ return conf;
+ }
+
+ public void runLocally() throws InterruptedException {
+ topologyConfig.setMaxTaskParallelism(1);
+ StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+ }
+
+ public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ topologyConfig.setNumWorkers(SubscriberConfig.TOPOLOGY_WORKERS);
+ // Setting this too high causes problems such as heartbeat-thread starvation and a sharp drop in throughput.
+ topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+ StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
+ }
+
+ private void buildTopology() {
+ builder = new TopologyBuilder();
+ builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), SubscriberConfig.SPOUT_PARALLELISM);
+ builder.setBolt("SubscriberIdBolt", new SubscriberIdBolt(), SubscriberConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
+ }
+
+ public static void main(String[] args) throws Exception {
+ LogSubscriberTopology csst = null;
+ boolean runLocally = true;
+ if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) {
+ runLocally = false;
+ csst = new LogSubscriberTopology(args[0]);
+ } else {
+ csst = new LogSubscriberTopology();
+ }
+ csst.buildTopology();
+
+ if (runLocally) {
+ logger.info("执行本地模式...");
+ csst.runLocally();
+ } else {
+ logger.info("执行远程部署模式...");
+ csst.runRemotely();
+ }
+ }
+}
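
[Note, not part of the commit] Given the argument handling in main(), remote submission passes the topology name as the first argument and the literal "remote" as the second; any other invocation falls through to the 600-second local run. Assuming a hypothetical shaded-jar name, submission would look like:

storm jar log-subscriber.jar cn.ac.iie.topology.LogSubscriberTopology SubscriberTopology remote
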
diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java
new file mode 100644
index 0000000..4d5be53
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/StormRunner.java
@@ -0,0 +1,35 @@
+package cn.ac.iie.topology;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * @author qidaijie
+ */
+public final class StormRunner {
+ private static final int MILLIS_IN_SEC = 1000;
+
+ private StormRunner() {}
+
+ public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
+ LocalCluster localCluster = new LocalCluster();
+ localCluster.submitTopology(topologyName, conf, builder.createTopology());
+ Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
+ localCluster.shutdown();
+ }
+
+ public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java
new file mode 100644
index 0000000..95dd847
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java
@@ -0,0 +1,62 @@
+package cn.ac.iie.utils;
+
+import java.util.Properties;
+
+
+/**
+ * Loads subscriber-config.properties from the classpath and exposes typed getters.
+ */
+public final class SubscriberConfigurations {
+
+ private static Properties propCommon = new Properties();
+// private static Properties propService = new Properties();
+
+ private SubscriberConfigurations() {}
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propCommon.getProperty(key);
+// } else if (type == 1) {
+// return propService.getProperty(key);
+ } else {
+ return null;
+ }
+ }
+
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propCommon.getProperty(key));
+// } else if (type == 1) {
+// return Integer.parseInt(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propCommon.getProperty(key));
+// } else if (type == 1) {
+// return Long.parseLong(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ // getProperty may return null for a missing key; guard before trimming.
+ String value = propCommon.getProperty(key);
+ return value != null && "true".equals(value.trim().toLowerCase());
+// } else if (type == 1) {
+// return propService.getProperty(key).toLowerCase().trim().equals("true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propCommon.load(SubscriberConfigurations.class.getClassLoader().getResourceAsStream("subscriber-config.properties"));
+ } catch (Exception e) {
+ // Leave propCommon empty rather than null so later lookups do not NPE.
+ System.err.println("Failed to load subscriber-config.properties: " + e);
+ }
+ }
+}
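
[Sketch, not part of the commit] The static block above loads subscriber-config.properties from the classpath. A sketch of that file: the key names are the ones SubscriberConfig reads; every value is a placeholder:

bootstrap.servers=kafka1:9092,kafka2:9092
spout.parallelism=1
format.bolt.parallelism=1
group.id=log-subscriber
kafka.topic=radius-log
auto.offset.reset=latest
topology.tick.tuple.freq.secs=60
topology.config.max.spout.pending=1000
topology.num.acks=0
topology.workers=1
check.ip.scope=0.0.0.0/0
center.hbase.zookeeper.servers=zk1,zk2,zk3
hbase.table.name=subscriber
national.hbase.zookeeper.servers=zk4,zk5,zk6
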
diff --git a/src/main/java/cn/ac/iie/utils/TupleUtils.java b/src/main/java/cn/ac/iie/utils/TupleUtils.java
new file mode 100644
index 0000000..c0dc410
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/TupleUtils.java
@@ -0,0 +1,21 @@
+package cn.ac.iie.utils;
+
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * @author Administrator
+ */
+public final class TupleUtils {
+ /**
+ * Checks whether the tuple is a system-emitted tick tuple.
+ *
+ * @param tuple the tuple to inspect
+ * @return true if the tuple comes from the system tick stream
+ */
+ public static boolean isTick(Tuple tuple) {
+ return tuple != null
+ && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
+ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/hbase/PullHBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/PullHBaseUtils.java
new file mode 100644
index 0000000..f464d95
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/hbase/PullHBaseUtils.java
@@ -0,0 +1,138 @@
+package cn.ac.iie.utils.hbase;
+
+import cn.ac.iie.common.SubscriberConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * HBase utility that pulls incremental subscriber changes from the national-center cluster.
+ *
+ * @author qidaijie
+ */
+public class PullHBaseUtils {
+ private final static Logger logger = Logger.getLogger(PullHBaseUtils.class);
+ private static Map<String, String> subIdMap = new HashMap<>(83334);
+ private static Connection connection;
+ private static Long time;
+
+ private static String zookeeperIp;
+ private static String hBaseTable;
+
+ private static PullHBaseUtils pullHBaseUtils;
+
+ private static void getHbaseInstance() {
+ pullHBaseUtils = new PullHBaseUtils();
+ }
+
+
+ /**
+ * Private constructor: reads the ZooKeeper quorum and table name, then opens the connection.
+ */
+ private PullHBaseUtils() {
+ zookeeperIp = SubscriberConfig.NATIONAL_HBASE_ZOOKEEPER_SERVERS;
+ hBaseTable = SubscriberConfig.HBASE_TABLE_NAME;
+ // Open the HBase connection
+ getHbaseConn();
+ }
+
+ private static void getHbaseConn() {
+ try {
+ // Build the HBase client configuration
+ Configuration configuration = HBaseConfiguration.create();
+ // ZooKeeper quorum of the national-center cluster
+ configuration.set("hbase.zookeeper.quorum", zookeeperIp);
+ configuration.set("hbase.client.retries.number", "3");
+ configuration.set("hbase.bulkload.retries.number", "3");
+ configuration.set("zookeeper.recovery.retry", "3");
+ connection = ConnectionFactory.createConnection(configuration);
+ time = System.currentTimeMillis();
+ logger.warn("PullHBaseUtils get HBase connection,now to getAll().");
+ } catch (IOException ioe) {
+ logger.error("PullHBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
+ ioe.printStackTrace();
+ } catch (Exception e) {
+ logger.error("PullHBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Pulls rows changed since the last tick and merges them into the caller's map.
+ * The scan window is widened (1 s back, 0.5 s ahead) so edge writes are not missed.
+ */
+ public static void change(Map<String, String> hashMap) {
+ if (pullHBaseUtils == null) {
+ getHbaseInstance();
+ }
+ long nowTime = System.currentTimeMillis();
+ timestampsFilter(time - 1000, nowTime + 500);
+ hashMap.putAll(subIdMap);
+ subIdMap.clear();
+ }
+
+
+ /**
+ * Scans for cells whose timestamps fall within the given range and caches the changes.
+ *
+ * @param startTime range start in ms (inclusive)
+ * @param endTime range end in ms (exclusive)
+ */
+ private static void timestampsFilter(Long startTime, Long endTime) {
+ Long begin = System.currentTimeMillis();
+ Table table = null;
+ ResultScanner scanner = null;
+ Scan scan = new Scan();
+ try {
+ table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
+ scan.setTimeRange(startTime, endTime);
+ scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ Cell[] cells = result.rawCells();
+ for (Cell cell : cells) {
+ String key = Bytes.toString(CellUtil.cloneRow(cell));
+ String value = Bytes.toString(CellUtil.cloneValue(cell));
+ // Last write wins; a plain put covers both new keys and changed values.
+ subIdMap.put(key, value);
+ }
+ }
+ Long end = System.currentTimeMillis();
+ logger.warn("PullHBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
+ logger.warn("PullHBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + begin + ",EndTime: " + end);
+ time = endTime;
+ } catch (IOException ioe) {
+ logger.error("PullHBaseUtils timestampsFilter IOException", ioe);
+ } catch (Exception e) {
+ logger.error("PullHBaseUtils timestampsFilter Exception", e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException e) {
+ logger.error("Failed to close the HBase table", e);
+ }
+ }
+ }
+ }
+
+
+}
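
[Sketch, not part of the commit] change() widens each scan window (one second back from the last tick, half a second past now), so consecutive windows overlap; re-reading a row is harmless because the cache simply overwrites it. A self-contained sketch of the same time-range scan, assuming an already-open Connection, "subscriber" as a placeholder table name, and the imports of PullHBaseUtils above:

static Map<String, String> scanChanges(Connection connection, long startMillis, long endMillis) throws IOException {
    Map<String, String> changes = new HashMap<>();
    Scan scan = new Scan();
    // Cell-timestamp range: min is inclusive, max is exclusive.
    scan.setTimeRange(startMillis, endMillis);
    try (Table table = connection.getTable(TableName.valueOf("sub:subscriber"));
         ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                changes.put(Bytes.toString(CellUtil.cloneRow(cell)),
                        Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
    return changes;
}
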
diff --git a/src/main/java/cn/ac/iie/utils/hbase/PushHBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/PushHBaseUtils.java
new file mode 100644
index 0000000..1e98181
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/hbase/PushHBaseUtils.java
@@ -0,0 +1,93 @@
+package cn.ac.iie.utils.hbase;
+
+import cn.ac.iie.common.SubscriberConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HBase utility that pushes subscriber updates to the sub-center cluster.
+ *
+ * @author qidaijie
+ */
+public class PushHBaseUtils {
+ private final static Logger logger = Logger.getLogger(PushHBaseUtils.class);
+ private static Connection connection;
+ private static String zookeeperIp;
+ private static PushHBaseUtils pushHBaseUtils;
+
+ private static void getHbaseInstance() {
+ pushHBaseUtils = new PushHBaseUtils();
+ }
+
+ /**
+ * Private constructor: reads the ZooKeeper quorum, then opens the connection.
+ */
+ private PushHBaseUtils() {
+ zookeeperIp = SubscriberConfig.CENTER_HBASE_ZOOKEEPER_SERVERS;
+ // Open the HBase connection
+ getHbaseConn();
+ }
+
+ private static void getHbaseConn() {
+ try {
+ // Build the HBase client configuration
+ Configuration configuration = HBaseConfiguration.create();
+ // ZooKeeper quorum of the sub-center cluster
+ configuration.set("hbase.zookeeper.quorum", zookeeperIp);
+ configuration.set("hbase.client.retries.number", "3");
+ configuration.set("hbase.bulkload.retries.number", "3");
+ configuration.set("zookeeper.recovery.retry", "3");
+ connection = ConnectionFactory.createConnection(configuration);
+ logger.warn("PullHBaseUtils get HBase connection,now to getAll().");
+ } catch (IOException ioe) {
+ logger.error("PullHBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
+ ioe.printStackTrace();
+ } catch (Exception e) {
+ logger.error("PullHBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Writes the given puts to the sub-center HBase table.
+ *
+ * @param putList the puts to write
+ */
+ public static void insertData(List<Put> putList) {
+ if (pushHBaseUtils == null) {
+ getHbaseInstance();
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
+ table.put(putList);
+ logger.warn("同步国家中心HBase增量写入分中心 HBase 成功,更新条数:" + putList.size());
+ } catch (IOException e) {
+ logger.error("Failed to write national-center HBase increments to the sub-center HBase", e);
+ } finally {
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } catch (IOException e) {
+ logger.error("HBase表关闭异常");
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+}
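
[Sketch, not part of the commit] SubscriberConfig declares LIST_SIZE_MAX = 5000 but insertData() never uses it; the whole list goes to HBase in a single put() call. If bounding the size of each RPC were wanted, the call could be chunked with that constant, sketched here as a drop-in replacement for the single table.put(putList):

// Bound each put() call to LIST_SIZE_MAX rows.
for (int i = 0; i < putList.size(); i += SubscriberConfig.LIST_SIZE_MAX) {
    int end = Math.min(i + SubscriberConfig.LIST_SIZE_MAX, putList.size());
    table.put(putList.subList(i, end));   // Table.put(List<Put>) sends one batch
}
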