| -rw-r--r-- | pom.xml                                                      | 290 |
| -rw-r--r-- | properties/subscriber-config.properties                      |  36 |
| -rw-r--r-- | src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java           |  65 |
| -rw-r--r-- | src/main/java/cn/ac/iie/common/SubscriberConfig.java         |  36 |
| -rw-r--r-- | src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java      |  80 |
| -rw-r--r-- | src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java  |  81 |
| -rw-r--r-- | src/main/java/cn/ac/iie/topology/StormRunner.java            |  35 |
| -rw-r--r-- | src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java  |  62 |
| -rw-r--r-- | src/main/java/cn/ac/iie/utils/TupleUtils.java                |  21 |
| -rw-r--r-- | src/main/java/cn/ac/iie/utils/hbase/PullHBaseUtils.java      | 138 |
| -rw-r--r-- | src/main/java/cn/ac/iie/utils/hbase/PushHBaseUtils.java      |  93 |
| -rw-r--r-- | src/main/java/log4j.properties                               |  25 |
12 files changed, 962 insertions(+), 0 deletions(-)
@@ -0,0 +1,290 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>log-subscriber-hbase-datacenter</groupId>
+    <artifactId>log-subscriber-hbase-datacenter</artifactId>
+    <version>v3.20.08.18</version>
+    <packaging>jar</packaging>
+
+    <name>log-subscriber-hbase</name>
+    <url>http://maven.apache.org</url>
+
+    <repositories>
+
+        <!--<repository>-->
+        <!--<id>nexus</id>-->
+        <!--<name>Team Nexus Repository</name>-->
+        <!--<url>http://192.168.40.125:8099/content/groups/public</url>-->
+        <!--</repository>-->
+
+        <!--<repository>-->
+        <!--<id>maven-ali</id>-->
+        <!--<url>http://maven.aliyun.com/nexus/content/groups/public/</url>-->
+        <!--<releases>-->
+        <!--<enabled>true</enabled>-->
+        <!--</releases>-->
+        <!--<snapshots>-->
+        <!--<enabled>true</enabled>-->
+        <!--<updatePolicy>always</updatePolicy>-->
+        <!--<checksumPolicy>fail</checksumPolicy>-->
+        <!--</snapshots>-->
+        <!--</repository>-->
+
+    </repositories>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.4.2</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>cn.ac.iie.topology.LogSubscriberTopology</mainClass>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>META-INF/spring.handlers</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>META-INF/spring.schemas</resource>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.3.2</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>properties</directory>
+                <includes>
+                    <include>**/*.properties</include>
+                    <include>**/*.xml</include>
+                </includes>
+                <filtering>false</filtering>
+            </resource>
+            <resource>
+                <directory>src/main/java</directory>
+                <includes>
+                    <include>log4j.properties</include>
+                </includes>
+                <filtering>false</filtering>
+            </resource>
+        </resources>
+    </build>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <kafka.version>1.0.0</kafka.version>
+        <storm.version>1.0.2</storm.version>
+        <hbase.version>2.2.3</hbase.version>
+        <hadoop.version>2.7.1</hadoop.version>
+    </properties>
+
+    <dependencies>
+        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>1.0.0</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${storm.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${storm.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.70</version>
+        </dependency>
+
+        <!--<dependency>-->
+        <!--<groupId>com.zdjizhi</groupId>-->
+        <!--<artifactId>galaxy</artifactId>-->
+        <!--<version>1.0.3</version>-->
+        <!--<exclusions>-->
+        <!--<exclusion>-->
+        <!--<artifactId>slf4j-log4j12</artifactId>-->
+        <!--<groupId>org.slf4j</groupId>-->
+        <!--</exclusion>-->
+        <!--<exclusion>-->
+        <!--<artifactId>log4j-over-slf4j</artifactId>-->
+        <!--<groupId>org.slf4j</groupId>-->
+        <!--</exclusion>-->
+        <!--</exclusions>-->
+        <!--</dependency>-->
+
+        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>3.4.9</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/properties/subscriber-config.properties b/properties/subscriber-config.properties
new file mode 100644
index 0000000..50c0415
--- /dev/null
+++ b/properties/subscriber-config.properties
@@ -0,0 +1,36 @@
+# Kafka bootstrap servers
+bootstrap.servers=192.168.40.132:9092
+
+# Where to start reading from Kafka: earliest/latest
+auto.offset.reset=latest
+
+# Branch-center HBase ZooKeeper address
+center.hbase.zookeeper.servers=192.168.40.131:2181
+
+# National-center HBase ZooKeeper address
+national.hbase.zookeeper.servers=192.168.40.132:2181
+
+# HBase table name
+hbase.table.name=subscriber_info
+
+# Tick tuple frequency (seconds)
+topology.tick.tuple.freq.secs=300
+
+topology.config.max.spout.pending=500000
+
+topology.num.acks=0
+
+# Kafka topic name
+kafka.topic=RADIUS-RECORD-LOG
+
+# Kafka consumer group id
+group.id=account-to-hbase-a
+
+# Storm topology workers
+topology.workers=1
+
+# Storm spout parallelism
+spout.parallelism=1
+
+# Storm bolt parallelism
+format.bolt.parallelism=1
diff --git a/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java
new file mode 100644
index 0000000..9f7598c
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java
@@ -0,0 +1,65 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.utils.TupleUtils;
+import cn.ac.iie.utils.hbase.PullHBaseUtils;
+import cn.ac.iie.utils.hbase.PushHBaseUtils;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.*;
+
+/**
+ * @author qidaijie
+ */
+public class SubscriberIdBolt extends BaseBasicBolt {
+    private static Logger logger = Logger.getLogger(SubscriberIdBolt.class);
+    private static Map<String, String> subIdMap;
+    private List<Put> putList;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+        subIdMap = new HashMap<>(83334);
+        putList = new ArrayList<>();
+    }
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        try {
+            if (TupleUtils.isTick(tuple)) {
+                // On every tick: pull the rows changed in the national-center HBase,
+                // convert them to Puts and write them to the branch-center HBase.
+                PullHBaseUtils.change(subIdMap);
+                for (String key : subIdMap.keySet()) {
+                    Put put = new Put(key.getBytes());
+                    put.addColumn("subscriber_id".getBytes(), "account".getBytes(), subIdMap.get(key).getBytes());
+                    putList.add(put);
+                }
+                PushHBaseUtils.insertData(putList);
+                putList.clear();
+                subIdMap.clear();
+            } else {
+                logger.warn(tuple.getString(0));
+            }
+        } catch (Exception e) {
+            logger.error("Error while reading Radius data from the national-center HBase and writing it to the branch-center HBase");
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Map<String, Object> conf = new HashMap<String, Object>(16);
+        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+                SubscriberConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+        return conf;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+}
diff --git a/src/main/java/cn/ac/iie/common/SubscriberConfig.java b/src/main/java/cn/ac/iie/common/SubscriberConfig.java
new file mode 100644
index 0000000..5157e01
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/SubscriberConfig.java
@@ -0,0 +1,36 @@
+package cn.ac.iie.common;
+
+import cn.ac.iie.utils.SubscriberConfigurations;
+
+import java.io.Serializable;
+
+/**
+ * @author qidaijie
+ */
+public class SubscriberConfig implements Serializable {
+    private static final long serialVersionUID = -8326385159484059324L;
+
+    public static final int LIST_SIZE_MAX = 5000;
+
+    /***
+     * kafka and system
+     */
+    public static final String BOOTSTRAP_SERVERS = SubscriberConfigurations.getStringProperty(0, "bootstrap.servers");
+    public static final Integer SPOUT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "spout.parallelism");
+    public static final Integer FORMAT_BOLT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "format.bolt.parallelism");
+    public static final String GROUP_ID = SubscriberConfigurations.getStringProperty(0, "group.id");
+    public static final String KAFKA_TOPIC = SubscriberConfigurations.getStringProperty(0, "kafka.topic");
+    public static final String AUTO_OFFSET_RESET = SubscriberConfigurations.getStringProperty(0, "auto.offset.reset");
+    public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = SubscriberConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
+    public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = SubscriberConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
+    public static final Integer TOPOLOGY_NUM_ACKS = SubscriberConfigurations.getIntProperty(0, "topology.num.acks");
+
+    public static final Integer TOPOLOGY_WORKERS = SubscriberConfigurations.getIntProperty(0, "topology.workers");
+
+    public static final String CHECK_IP_SCOPE = SubscriberConfigurations.getStringProperty(0, "check.ip.scope");
+
+    public static final String CENTER_HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "center.hbase.zookeeper.servers");
+    public static final String HBASE_TABLE_NAME = SubscriberConfigurations.getStringProperty(0, "hbase.table.name");
+
+    public static final String NATIONAL_HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "national.hbase.zookeeper.servers");
+}
\ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
new file mode 100644
index 0000000..5d4619a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
@@ -0,0 +1,80 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.SubscriberConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ */
+public class CustomizedKafkaSpout extends BaseRichSpout {
+    private static final long serialVersionUID = 2934528972182398950L;
+    private KafkaConsumer<String, String> consumer;
+    private SpoutOutputCollector collector = null;
+    private TopologyContext context = null;
+    private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
+
+    private static Properties createConsumerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", SubscriberConfig.BOOTSTRAP_SERVERS);
+        props.put("group.id", SubscriberConfig.GROUP_ID);
+        props.put("auto.offset.reset", SubscriberConfig.AUTO_OFFSET_RESET);
+        props.put("session.timeout.ms", "60000");
+        props.put("max.poll.records", 3000);
+        props.put("max.partition.fetch.bytes", 31457280);
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        return props;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.context = context;
+        Properties prop = createConsumerConfig();
+        this.consumer = new KafkaConsumer<>(prop);
+        this.consumer.subscribe(Arrays.asList(SubscriberConfig.KAFKA_TOPIC));
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            ConsumerRecords<String, String> records = consumer.poll(10000L);
+            Thread.sleep(300);
+            for (ConsumerRecord<String, String> record : records) {
+                this.collector.emit(new Values(record.value()));
+            }
+        } catch (Exception e) {
+            logger.error("kafka-spout failed while emitting data: " + e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("source"));
+    }
+
+}
diff --git a/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java
new file mode 100644
index 0000000..5a0c903
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java
@@ -0,0 +1,81 @@
+package cn.ac.iie.topology;
+
+import cn.ac.iie.bolt.SubscriberIdBolt;
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.spout.CustomizedKafkaSpout;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author qidaijie
+ */
+public class LogSubscriberTopology {
+    private static final Logger logger = LoggerFactory.getLogger(LogSubscriberTopology.class);
+    private final String topologyName;
+    private final Config topologyConfig;
+    private TopologyBuilder builder;
+
+    public LogSubscriberTopology() {
+        this(LogSubscriberTopology.class.getSimpleName());
+    }
+
+    public LogSubscriberTopology(String topologyName) {
+        this.topologyName = topologyName;
+        topologyConfig = createTopologConfig();
+    }
+
+    private Config createTopologConfig() {
+        Config conf = new Config();
+        conf.setDebug(false);
+        conf.setMessageTimeoutSecs(120);
+        conf.setTopologyWorkerMaxHeapSize(500);
+        conf.setMaxSpoutPending(SubscriberConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+        if (SubscriberConfig.TOPOLOGY_NUM_ACKS == 0) {
+            conf.setNumAckers(0);
+        }
+        return conf;
+    }
+
+    public void runLocally() throws InterruptedException {
+        topologyConfig.setMaxTaskParallelism(1);
+        StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+    }
+
+    public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        topologyConfig.setNumWorkers(SubscriberConfig.TOPOLOGY_WORKERS);
+        // Setting this too high causes problems such as starved heartbeat threads and a sharp drop in throughput
+        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+        StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
+    }
+
+    private void buildTopology() {
+        builder = new TopologyBuilder();
+        builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), SubscriberConfig.SPOUT_PARALLELISM);
+        builder.setBolt("SubscriberIdBolt", new SubscriberIdBolt(), SubscriberConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
+    }
+
+    public static void main(String[] args) throws Exception {
+        LogSubscriberTopology csst = null;
+        boolean runLocally = true;
+        if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) {
+            runLocally = false;
+            csst = new LogSubscriberTopology(args[0]);
+        } else {
+            csst = new LogSubscriberTopology();
+        }
+        csst.buildTopology();
+
+        if (runLocally) {
+            logger.info("Running in local mode...");
+            csst.runLocally();
+        } else {
+            logger.info("Running in remote deployment mode...");
+            csst.runRemotely();
+        }
+    }
+}
diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java
new file mode 100644
index 0000000..4d5be53
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/StormRunner.java
@@ -0,0 +1,35 @@
+package cn.ac.iie.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * @author qidaijie
+ */
+public final class StormRunner {
+    private static final int MILLS_IN_SEC = 1000;
+
+    private StormRunner() {
+    }
+
+    public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
+        LocalCluster localCluster = new LocalCluster();
+        localCluster.submitTopology(topologyName, conf, builder.createTopology());
+        Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
+        localCluster.shutdown();
+    }
+
+    public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
+    }
+
+}
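
A minimal build-and-submit sketch for the two entry paths in main() above. The jar path assumes the default maven-shade output name for this artifactId/version, and the topology name "log-subscriber-hbase" is only an example; adjust both to your environment:

    # build the shaded jar; the shade plugin sets the main class to cn.ac.iie.topology.LogSubscriberTopology
    mvn clean package

    # local test run (no arguments): LocalCluster, shuts down after 600 seconds
    storm jar target/log-subscriber-hbase-datacenter-v3.20.08.18.jar cn.ac.iie.topology.LogSubscriberTopology

    # remote submission: args[0] = topology name, args[1] = "remote"
    storm jar target/log-subscriber-hbase-datacenter-v3.20.08.18.jar cn.ac.iie.topology.LogSubscriberTopology log-subscriber-hbase remote
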
diff --git a/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java
new file mode 100644
index 0000000..95dd847
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java
@@ -0,0 +1,62 @@
+package cn.ac.iie.utils;
+
+import java.util.Properties;
+
+public final class SubscriberConfigurations {
+
+    private static Properties propCommon = new Properties();
+//    private static Properties propService = new Properties();
+
+    public static String getStringProperty(Integer type, String key) {
+        if (type == 0) {
+            return propCommon.getProperty(key);
+//        } else if (type == 1) {
+//            return propService.getProperty(key);
+        } else {
+            return null;
+        }
+    }
+
+    public static Integer getIntProperty(Integer type, String key) {
+        if (type == 0) {
+            return Integer.parseInt(propCommon.getProperty(key));
+//        } else if (type == 1) {
+//            return Integer.parseInt(propService.getProperty(key));
+        } else {
+            return null;
+        }
+    }
+
+    public static Long getLongProperty(Integer type, String key) {
+        if (type == 0) {
+            return Long.parseLong(propCommon.getProperty(key));
+//        } else if (type == 1) {
+//            return Long.parseLong(propService.getProperty(key));
+        } else {
+            return null;
+        }
+    }
+
+    public static Boolean getBooleanProperty(Integer type, String key) {
+        if (type == 0) {
+            return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
+//        } else if (type == 1) {
+//            return propService.getProperty(key).toLowerCase().trim().equals("true");
+        } else {
+            return null;
+        }
+    }
+
+    static {
+        try {
+            propCommon.load(SubscriberConfigurations.class.getClassLoader().getResourceAsStream("subscriber-config.properties"));
+        } catch (Exception e) {
+            propCommon = null;
+            System.err.println("Failed to load subscriber-config.properties");
+        }
+    }
+}
diff --git a/src/main/java/cn/ac/iie/utils/TupleUtils.java b/src/main/java/cn/ac/iie/utils/TupleUtils.java
new file mode 100644
index 0000000..c0dc410
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/TupleUtils.java
@@ -0,0 +1,21 @@
+package cn.ac.iie.utils;
+
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * @author Administrator
+ */
+public final class TupleUtils {
+    /**
+     * Checks whether the tuple is a system-emitted tick tuple
+     *
+     * @param tuple the tuple to inspect
+     * @return true if it is a tick tuple
+     */
+    public static boolean isTick(Tuple tuple) {
+        return tuple != null
+                && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
+                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+    }
+}
diff --git a/src/main/java/cn/ac/iie/utils/hbase/PullHBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/PullHBaseUtils.java
new file mode 100644
index 0000000..f464d95
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/hbase/PullHBaseUtils.java
@@ -0,0 +1,138 @@
+package cn.ac.iie.utils.hbase;
+
+import cn.ac.iie.common.SubscriberConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * HBase utility class: pulls incremental changes from the national-center HBase
+ *
+ * @author qidaijie
+ */
+public class PullHBaseUtils {
+    private final static Logger logger = Logger.getLogger(PullHBaseUtils.class);
+    private static Map<String, String> subIdMap = new HashMap<>(83334);
+    private static Connection connection;
+    private static Long time;
+
+    private static String zookeeperIp;
+    private static String hBaseTable;
+
+    private static PullHBaseUtils pullHBaseUtils;
+
+    private static void getHbaseInstance() {
+        pullHBaseUtils = new PullHBaseUtils();
+    }
+
+    /**
+     * Constructor
+     */
+    private PullHBaseUtils() {
+        zookeeperIp = SubscriberConfig.NATIONAL_HBASE_ZOOKEEPER_SERVERS;
+        hBaseTable = SubscriberConfig.HBASE_TABLE_NAME;
+        // obtain the connection
+        getHbaseConn();
+    }
+
+    private static void getHbaseConn() {
+        try {
+            // HBase configuration
+            Configuration configuration = HBaseConfiguration.create();
+            // set the ZooKeeper quorum
+            configuration.set("hbase.zookeeper.quorum", zookeeperIp);
+            configuration.set("hbase.client.retries.number", "3");
+            configuration.set("hbase.bulkload.retries.number", "3");
+            configuration.set("zookeeper.recovery.retry", "3");
+            connection = ConnectionFactory.createConnection(configuration);
+            time = System.currentTimeMillis();
+            logger.warn("PullHBaseUtils got HBase connection, now to getAll().");
+        } catch (IOException ioe) {
+            logger.error("PullHBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
+            ioe.printStackTrace();
+        } catch (Exception e) {
+            logger.error("PullHBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Refreshes the given map with the rows changed since the last pull
+     */
+    public static void change(Map<String, String> hashMap) {
+        if (pullHBaseUtils == null) {
+            getHbaseInstance();
+        }
+        long nowTime = System.currentTimeMillis();
+        timestampsFilter(time - 1000, nowTime + 500);
+        hashMap.putAll(subIdMap);
+        subIdMap.clear();
+    }
+
+    /**
+     * Fetches the rows changed within the given time range
+     *
+     * @param startTime start time
+     * @param endTime   end time
+     */
+    private static void timestampsFilter(Long startTime, Long endTime) {
+        Long begin = System.currentTimeMillis();
+        Table table = null;
+        ResultScanner scanner = null;
+        Scan scan2 = new Scan();
+        try {
+            table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
+            scan2.setTimeRange(startTime, endTime);
+            scanner = table.getScanner(scan2);
+            for (Result result : scanner) {
+                Cell[] cells = result.rawCells();
+                for (Cell cell : cells) {
+                    String key = Bytes.toString(CellUtil.cloneRow(cell));
+                    String value = Bytes.toString(CellUtil.cloneValue(cell));
+                    if (subIdMap.containsKey(key)) {
+                        if (!value.equals(subIdMap.get(key))) {
+                            subIdMap.put(key, value);
+                        }
+                    } else {
+                        subIdMap.put(key, value);
+                    }
+                }
+            }
+            Long end = System.currentTimeMillis();
+            logger.warn("PullHBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
+            logger.warn("PullHBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + begin + ",EndTime: " + end);
+            time = endTime;
+        } catch (IOException ioe) {
+            logger.error("PullHBaseUtils timestampsFilter IOException===>{" + ioe + "}<===");
+            ioe.printStackTrace();
+        } catch (Exception e) {
+            logger.error("PullHBaseUtils timestampsFilter Exception===>{" + e + "}<===");
+            e.printStackTrace();
+        } finally {
+            if (scanner != null) {
+                scanner.close();
+            }
+            if (table != null) {
+                try {
+                    table.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/hbase/PushHBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/PushHBaseUtils.java
new file mode 100644
index 0000000..1e98181
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/hbase/PushHBaseUtils.java
@@ -0,0 +1,93 @@
+package cn.ac.iie.utils.hbase;
+
+import cn.ac.iie.common.SubscriberConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HBase utility class: writes incremental changes to the branch-center HBase
+ *
+ * @author qidaijie
+ */
+public class PushHBaseUtils {
+    private final static Logger logger = Logger.getLogger(PushHBaseUtils.class);
+    private static Connection connection;
+    private static String zookeeperIp;
+    private static PushHBaseUtils pushHBaseUtils;
+
+    private static void getHbaseInstance() {
+        pushHBaseUtils = new PushHBaseUtils();
+    }
+
+    /**
+     * Constructor
+     */
+    private PushHBaseUtils() {
+        zookeeperIp = SubscriberConfig.CENTER_HBASE_ZOOKEEPER_SERVERS;
+        // obtain the connection
+        getHbaseConn();
+    }
+
+    private static void getHbaseConn() {
+        try {
+            // HBase configuration
+            Configuration configuration = HBaseConfiguration.create();
+            // set the ZooKeeper quorum
+            configuration.set("hbase.zookeeper.quorum", zookeeperIp);
+            configuration.set("hbase.client.retries.number", "3");
+            configuration.set("hbase.bulkload.retries.number", "3");
+            configuration.set("zookeeper.recovery.retry", "3");
+            connection = ConnectionFactory.createConnection(configuration);
+            logger.warn("PushHBaseUtils got HBase connection.");
+        } catch (IOException ioe) {
+            logger.error("PushHBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
+            ioe.printStackTrace();
+        } catch (Exception e) {
+            logger.error("PushHBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Writes data to HBase
+     *
+     * @param putList puts list
+     */
+    public static void insertData(List<Put> putList) {
+        if (pushHBaseUtils == null) {
+            getHbaseInstance();
+        }
+        Table table = null;
+        try {
+            table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
+            table.put(putList);
+            logger.warn("Synced national-center HBase increment to branch-center HBase, rows written: " + putList.size());
+        } catch (IOException e) {
+            logger.error("Failed to sync national-center HBase increment to branch-center HBase");
+            e.printStackTrace();
+        } finally {
+            try {
+                if (table != null) {
+                    table.close();
+                }
+            } catch (IOException e) {
+                logger.error("Error closing HBase table");
+                e.printStackTrace();
+            }
+        }
+    }
+
+}
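
The two utility classes above address the table "sub:" + hbase.table.name with column family subscriber_id and qualifier account, so the namespace and table must already exist on both the national-center and branch-center clusters. A minimal hbase shell sketch of that assumed schema (extra table attributes such as versions or TTL are left to the operator):

    create_namespace 'sub'
    create 'sub:subscriber_info', 'subscriber_id'
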
diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties
new file mode 100644
index 0000000..9d91936
--- /dev/null
+++ b/src/main/java/log4j.properties
@@ -0,0 +1,25 @@
+# Log4j
+log4j.rootLogger=info,console,file
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# Use a relative path; after testing, logs go under the application directory
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+# MyBatis configuration; com.nis.web.dao is the package containing the MyBatis mapper interfaces
+log4j.logger.com.nis.web.dao=debug
+# BoneCP data source configuration
+log4j.category.com.jolbox=debug,console
