diff options
| author | qidaijie <[email protected]> | 2019-11-08 11:46:29 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2019-11-08 11:46:29 +0800 |
| commit | 027908d6cbb6a87dd8e94a73df5df2c2ab0405f8 (patch) | |
| tree | aa5eb686a8dbf81458068c50a48882710a35f67c | |
radius 写入hbase初始版本
| -rw-r--r-- | pom.xml | 287 | ||||
| -rw-r--r-- | properties/address_routine.properties | 41 | ||||
| -rw-r--r-- | properties/core-site.xml | 64 | ||||
| -rw-r--r-- | properties/hbase-site.xml | 81 | ||||
| -rw-r--r-- | properties/hdfs-site.xml | 116 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/bolt/SubcribeIdBolt.java | 159 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/bolt/ToHBaseBolt.java | 109 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/common/AddressConfig.java | 38 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java | 78 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/topology/LogAddressRedisTopology.java | 83 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/topology/StormRunner.java | 32 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/utils/AddressConfigurations.java | 62 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/utils/TupleUtils.java | 21 | ||||
| -rw-r--r-- | src/main/java/log4j.properties | 25 | ||||
| -rw-r--r-- | src/test/java/cn/ac/iie/test/SubcribeIdBolt.java | 161 | ||||
| -rw-r--r-- | src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java | 117 | ||||
| -rw-r--r-- | src/test/java/cn/ac/iie/test/subTest.java | 32 |
17 files changed, 1506 insertions, 0 deletions
@@ -0,0 +1,287 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>log-address-hbase</groupId> + <artifactId>log-address-hbase</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>log-address-hbase</name> + <url>http://maven.apache.org</url> + + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.10.125:8099/content/groups/public</url> + </repository> + </repositories> + + <build> + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.2</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>cn.ac.iie.topology.LogAddressRedisTopology</mainClass> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.handlers</resource> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.schemas</resource> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + <filtering>false</filtering> + </resource> + <resource> + <directory>src/main/java</directory> + <includes> + <include>log4j.properties</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <kafka.version>1.0.0</kafka.version> + <storm.version>1.0.2</storm.version> + <hbase.version>1.4.9</hbase.version> + <hadoop.version>2.7.1</hadoop.version> + </properties> + + <dependencies> + <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>1.0.0</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${storm.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-kafka</artifactId> + <version>${storm.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.nis</groupId> + <artifactId>nis-core</artifactId> + <version>1.0</version> + </dependency> + + <dependency> + <groupId>info.monitorenter</groupId> + <artifactId>cpdetector</artifactId> + <version>1.0.7</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>18.0</version> + </dependency> + + <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + <version>2.8.1</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client --> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.47</version> + </dependency> + + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <version>1.0.1</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.9</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server --> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + + </dependencies> +</project> diff --git a/properties/address_routine.properties b/properties/address_routine.properties new file mode 100644 index 0000000..7692862 --- /dev/null +++ b/properties/address_routine.properties @@ -0,0 +1,41 @@ +#管理kafka地址 +bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 +#从kafka哪里开始读:earliest/latest +auto.offset.reset=latest + +#hbase zookeeper地址 +hbase.zookeeper.servers=192.168.40.203:2186 + +#hbase tablename +hbase.table.name=subcriber_info + +#tick时钟频率 +topology.tick.tuple.freq.secs=50 + +topology.config.max.spout.pending=500000 + +topology.num.acks=0 + +#redis过期时间 +expiration.time=604800 + +#用于过滤对准用户名 +check.ip.scope=10,100,192 + +#kafka broker下的topic名称 +kafka.topic=RADIUS-LOG + +#kafka消费group id +group.id=account-to-hbase + +#storm topology workers +topology.workers=1 + +#storm spout parallelism +spout.parallelism=10 + +#storm bolt parallelism +format.bolt.parallelism=10 + +#1=单机逐条写入 2=集群逐条写入 3=集群批量写入 +redis.model=1 diff --git a/properties/core-site.xml b/properties/core-site.xml new file mode 100644 index 0000000..66742f0 --- /dev/null +++ b/properties/core-site.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="UTF-8"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +<!-- Put site-specific property overrides in this file. --> + +<configuration> + <property> + <name>fs.defaultFS</name> + <value>hdfs://ns1</value> + </property> + <property> + <name>hadoop.tmp.dir</name> + <value>file:/opt/hadoop/tmp</value> + </property> + <property> + <name>io.file.buffer.size</name> + <value>131702</value> + </property> + <property> + <name>hadoop.proxyuser.root.hosts</name> + <value>*</value> + </property> + <property> + <name>hadoop.proxyuser.root.groups</name> + <value>*</value> + </property> + <property> + <name>hadoop.logfile.size</name> + <value>10000000</value> + <description>The max size of each log file</description> + </property> + + <property> + <name>hadoop.logfile.count</name> + <value>1</value> + <description>The max number of log files</description> + </property> + <property> + <name>ha.zookeeper.quorum</name> + <value>192.168.40.202:2181,192.168.40.203:2181,192.168.40.206:2181</value> + </property> +<property> +<name>io.compression.codecs</name> +<value>com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value> +</property> +<property> +<name>io.compression.codec.lzo.class</name> +<value>com.hadoop.compression.lzo.LzoCodec</value> +</property> +</configuration> + diff --git a/properties/hbase-site.xml b/properties/hbase-site.xml new file mode 100644 index 0000000..b81715c --- /dev/null +++ b/properties/hbase-site.xml @@ -0,0 +1,81 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--> +<configuration> + <property> + <name>hbase.rootdir</name> + <value>hdfs://ns1/hbase/hbase-1.4.9</value> + </property> + <property> + <name>hbase.cluster.distributed</name> + <value>true</value> + </property> + <property> + <name>hbase.zookeeper.quorum</name> + <value>192.168.40.202,192.168.40.203,192.168.40.206</value> + </property> +<property> +<name>hbase.master.info.port</name> +<value>60010</value> +</property> +<!-- 开启启schema支持 对应hbase的namespace --> +<property> + <name>phoenix.schema.isNamespaceMappingEnabled</name> + <value>true</value> +</property> +<property> + <name>phoenix.schema.mapSystemTablesToNamespace</name> + <value>true</value> +</property> +<property> + <name>hbase.client.keyvalue.maxsize</name> + <value>99428800</value> + </property> + <property> + <name>hbase.server.keyvalue.maxsize</name> + <value>99428800</value> +</property> + <property> + <name>hbase.regionserver.wal.codec</name> + <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value> + </property> + <property> + <name>phoenix.query.timeoutMs</name> + <value>1800000</value> + </property> + <property> + <name>hbase.regionserver.lease.period</name> + <value>1200000</value> + </property> + <property> + <name>hbase.rpc.timeout</name> + <value>1200000</value> + </property> + <property> + <name>hbase.client.scanner.caching</name> + <value>1000</value> + </property> + <property> + <name>hbase.client.scanner.timeout.period</name> + <value>1200000</value> + </property> +</configuration> diff --git a/properties/hdfs-site.xml b/properties/hdfs-site.xml new file mode 100644 index 0000000..5bf742b --- /dev/null +++ b/properties/hdfs-site.xml @@ -0,0 +1,116 @@ +<?xml version="1.0" encoding="UTF-8"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +<!-- Put site-specific property overrides in this file. --> + +<configuration> + <property> + <name>dfs.namenode.name.dir</name> + <value>file:/home/bigdata/hadoop/dfs/name</value> + </property> + <property> + <name>dfs.datanode.data.dir</name> + <value>file:/home/bigdata/hadoop/dfs/data</value> + </property> + <property> + <name>dfs.replication</name> + <value>2</value> + </property> + <property> + <name>dfs.namenode.secondary.http-address</name> + <value>192.168.40.202:9001</value> + </property> + <property> + <name>dfs.webhdfs.enabled</name> + <value>true</value> + </property> + <property> + <name>dfs.permissions</name> + <value>false</value> + </property> + <property> + <name>dfs.permissions.enabled</name> + <value>false</value> + </property> + <property> + <name>dfs.nameservices</name> + <value>ns1</value> + </property> + <property> + <name>dfs.blocksize</name> + <value>134217728</value> + </property> + <property> + <name>dfs.ha.namenodes.ns1</name> + <value>nn1,nn2</value> + </property> + <!-- nn1的RPC通信地址,nn1所在地址 --> + <property> + <name>dfs.namenode.rpc-address.ns1.nn1</name> + <value>192.168.40.202:9000</value> + </property> + <!-- nn1的http通信地址,外部访问地址 --> + <property> + <name>dfs.namenode.http-address.ns1.nn1</name> + <value>192.168.40.202:50070</value> + </property> + <!-- nn2的RPC通信地址,nn2所在地址 --> + <property> + <name>dfs.namenode.rpc-address.ns1.nn2</name> + <value>192.168.40.203:9000</value> + </property> + <!-- nn2的http通信地址,外部访问地址 --> + <property> + <name>dfs.namenode.http-address.ns1.nn2</name> + <value>192.168.40.203:50070</value> + </property> + <!-- 指定NameNode的元数据在JournalNode日志上的存放位置(一般和zookeeper部署在一起) --> + <property> + <name>dfs.namenode.shared.edits.dir</name> + <value>qjournal://192.168.40.203:8485;192.168.40.206:8485;192.168.40.202:8485/ns1</value> + </property> + <!-- 指定JournalNode在本地磁盘存放数据的位置 --> + <property> + <name>dfs.journalnode.edits.dir</name> + <value>/home/bigdata/hadoop/journal</value> + </property> + <!--客户端通过代理访问namenode,访问文件系统,HDFS 客户端与Active 节点通信的Java 类,使用其确定Active 节点是否活跃 --> + <property> + <name>dfs.client.failover.proxy.provider.ns1</name> + <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value> + </property> + <!--这是配置自动切换的方法,有多种使用方法,具体可以看官网,在文末会给地址,这里是远程登录杀死的方法 --> + <property> + <name>dfs.ha.fencing.methods</name> + <value>sshfence</value> + </property> + <!-- 这个是使用sshfence隔离机制时才需要配置ssh免登陆 --> + <property> + <name>dfs.ha.fencing.ssh.private-key-files</name> + <value>/root/.ssh/id_rsa</value> + </property> + <!-- 配置sshfence隔离机制超时时间,这个属性同上,如果你是用脚本的方法切换,这个应该是可以不配置的 --> + <property> + <name>dfs.ha.fencing.ssh.connect-timeout</name> + <value>30000</value> + </property> + <!-- 这个是开启自动故障转移,如果你没有自动故障转移,这个可以先不配 --> + <property> + <name>dfs.ha.automatic-failover.enabled</name> + <value>true</value> + </property> +</configuration> + diff --git a/src/main/java/cn/ac/iie/bolt/SubcribeIdBolt.java b/src/main/java/cn/ac/iie/bolt/SubcribeIdBolt.java new file mode 100644 index 0000000..d71542a --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/SubcribeIdBolt.java @@ -0,0 +1,159 @@ +package cn.ac.iie.bolt; + +import cn.ac.iie.common.AddressConfig; +import cn.ac.iie.utils.TupleUtils; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.io.IOException; +import java.util.*; + +/** + * @author qidaijie + */ +public class SubcribeIdBolt extends BaseBasicBolt { + private static Logger logger = Logger.getLogger(SubcribeIdBolt.class); + private static Map<String, String> subIdMap; + private List<Put> putList; + private static Connection connection; + + static { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void prepare(Map stormConf, TopologyContext context) { + subIdMap = new HashMap<>(16); + putList = new ArrayList<>(); + getAll(); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + try { + if (TupleUtils.isTick(tuple)) { + insertData(putList); + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + String ip = jsonObject.getString("framed_ip"); + String account = jsonObject.getString("account"); + dataValidation(ip, account, putList); + if (putList.size() == AddressConfig.LIST_SIZE_MAX) { + insertData(putList); + } + } + } + } catch (Exception e) { + logger.error("Radius写入Redis出现异常", e); + } + + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + AddressConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + /** + * 获取所有的 key value + */ + private static void getAll() { + try { + Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME)); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + scanner.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 写入数据到hbase + */ + private static void insertData(List<Put> putList) { + Table table = null; + try { + table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME)); + table.put(putList); + putList.clear(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (table != null) { + table.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + } + + /** + * 验证数据并与内存中的对比 + * + * @param ip framed_ip + * @param account account + */ + private static void dataValidation(String ip, String account, List<Put> putList) { + if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) { +// String s = ip.split("\\.")[0]; +// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) { + if (subIdMap.containsKey(ip)) { + if (!subIdMap.get(ip).equals(account)) { + Put put = new Put(ip.getBytes()); + put.addColumn("subscribe_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } + } else { + Put put = new Put(ip.getBytes()); + put.addColumn("subscribe_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } +// } + } + } +} diff --git a/src/main/java/cn/ac/iie/bolt/ToHBaseBolt.java b/src/main/java/cn/ac/iie/bolt/ToHBaseBolt.java new file mode 100644 index 0000000..fa584b0 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/ToHBaseBolt.java @@ -0,0 +1,109 @@ +package cn.ac.iie.bolt; + +import cn.ac.iie.common.AddressConfig; +import cn.ac.iie.utils.TupleUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + */ +public class ToHBaseBolt extends BaseBasicBolt { + private static Logger logger = Logger.getLogger(ToHBaseBolt.class); + private static List<Put> putList; + private static Connection connection; + + static { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void prepare(Map stormConf, TopologyContext context) { + putList = new ArrayList<>(); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + try { + if (TupleUtils.isTick(tuple)) { + insertData(); + putList.clear(); + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + String[] split = message.split("-"); + Put put = new Put(split[0].getBytes()); + put.addColumn("subscribe_id".getBytes(), "account".getBytes(), split[1].getBytes()); + putList.add(put); + if (putList.size() == AddressConfig.LIST_SIZE_MAX) { + insertData(); + putList.clear(); + } + } + } + } catch (Exception e) { + logger.error("Radius写入Redis出现异常", e); + } + + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + AddressConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + + /** + * 写入数据到hbase + */ + private static void insertData() { + Table table = null; + try { + table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME)); + table.put(putList); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (table != null) { + table.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + } + +} diff --git a/src/main/java/cn/ac/iie/common/AddressConfig.java b/src/main/java/cn/ac/iie/common/AddressConfig.java new file mode 100644 index 0000000..2f851c9 --- /dev/null +++ b/src/main/java/cn/ac/iie/common/AddressConfig.java @@ -0,0 +1,38 @@ +package cn.ac.iie.common; + +import cn.ac.iie.utils.AddressConfigurations; + +import java.io.Serializable; + +/** + * @author qidaijie + */ +public class AddressConfig implements Serializable { + private static final long serialVersionUID = -8326385159484059324L; + + public static final String SEGMENTATION = ","; + public static final int LIST_SIZE_MAX = 5000; + + /*** + * kafka and system + */ + public static final String BOOTSTRAP_SERVERS = AddressConfigurations.getStringProperty(0, "bootstrap.servers"); + public static final Integer SPOUT_PARALLELISM = AddressConfigurations.getIntProperty(0, "spout.parallelism"); + public static final Integer FORMAT_BOLT_PARALLELISM = AddressConfigurations.getIntProperty(0, "format.bolt.parallelism"); + public static final String GROUP_ID = AddressConfigurations.getStringProperty(0, "group.id"); + public static final String KAFKA_TOPIC = AddressConfigurations.getStringProperty(0, "kafka.topic"); + public static final String AUTO_OFFSET_RESET = AddressConfigurations.getStringProperty(0, "auto.offset.reset"); + public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = AddressConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs"); + public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = AddressConfigurations.getIntProperty(0, "topology.config.max.spout.pending"); + public static final Integer TOPOLOGY_NUM_ACKS = AddressConfigurations.getIntProperty(0, "topology.num.acks"); + + public static final Integer TOPOLOGY_WORKERS = AddressConfigurations.getIntProperty(0, "topology.workers"); + + public static final Integer EXPIRATION_TIME = AddressConfigurations.getIntProperty(0, "expiration.time"); + public static final Integer REDIS_MODEL = AddressConfigurations.getIntProperty(0, "redis.model"); + + public static final String CHECK_IP_SCOPE = AddressConfigurations.getStringProperty(0, "check.ip.scope"); + + public static final String HBASE_ZOOKEEPER_SERVERS = AddressConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = AddressConfigurations.getStringProperty(0, "hbase.table.name"); +}
\ No newline at end of file diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java new file mode 100644 index 0000000..96c8330 --- /dev/null +++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java @@ -0,0 +1,78 @@ +package cn.ac.iie.spout; + +import cn.ac.iie.common.AddressConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.log4j.Logger; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; + +/** + * @author qidaijie + */ +public class CustomizedKafkaSpout extends BaseRichSpout { + private static final long serialVersionUID = 2934528972182398950L; + private KafkaConsumer<String, String> consumer; + private SpoutOutputCollector collector = null; + private TopologyContext context = null; + private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class); + + + private static Properties createConsumerConfig() { + Properties props = new Properties(); + props.put("bootstrap.servers", AddressConfig.BOOTSTRAP_SERVERS); + props.put("group.id", AddressConfig.GROUP_ID); + props.put("auto.offset.reset", AddressConfig.AUTO_OFFSET_RESET); + props.put("session.timeout.ms", "60000"); + props.put("max.poll.records", 3000); + props.put("max.partition.fetch.bytes", 31457280); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + return props; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + // TODO Auto-generated method stub + this.collector = collector; + this.context = context; + Properties prop = createConsumerConfig(); + this.consumer = new KafkaConsumer<>(prop); + this.consumer.subscribe(Arrays.asList(AddressConfig.KAFKA_TOPIC)); + } + + @Override + public void close() { + consumer.close(); + } + + @Override + public void nextTuple() { + try { + // TODO Auto-generated method stub + ConsumerRecords<String, String> records = consumer.poll(10000L); + Thread.sleep(300); + for (ConsumerRecord<String, String> record : records) { + this.collector.emit(new Values(record.value())); + } + } catch (Exception e) { + logger.error("kfaka-spout 发送数据出现异常" + e); + e.printStackTrace(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // TODO Auto-generated method stub + declarer.declare(new Fields("source")); + } +} diff --git a/src/main/java/cn/ac/iie/topology/LogAddressRedisTopology.java b/src/main/java/cn/ac/iie/topology/LogAddressRedisTopology.java new file mode 100644 index 0000000..491cc97 --- /dev/null +++ b/src/main/java/cn/ac/iie/topology/LogAddressRedisTopology.java @@ -0,0 +1,83 @@ +package cn.ac.iie.topology; + +import cn.ac.iie.bolt.SubcribeIdBolt; +import cn.ac.iie.bolt.ToHBaseBolt; +import cn.ac.iie.common.AddressConfig; +import cn.ac.iie.spout.CustomizedKafkaSpout; +import org.apache.storm.Config; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author qidaijie + */ +public class LogAddressRedisTopology { + private static final Logger logger = LoggerFactory.getLogger(LogAddressRedisTopology.class); + private final String topologyName; + private final Config topologyConfig; + private TopologyBuilder builder; + + public LogAddressRedisTopology() { + this(LogAddressRedisTopology.class.getSimpleName()); + } + + public LogAddressRedisTopology(String topologyName) { + this.topologyName = topologyName; + topologyConfig = createTopologConfig(); + } + + private Config createTopologConfig() { + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(120); + conf.setTopologyWorkerMaxHeapSize(500); + conf.setMaxSpoutPending(AddressConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING); + if (AddressConfig.TOPOLOGY_NUM_ACKS == 0) { + conf.setNumAckers(0); + } + return conf; + } + + public void runLocally() throws InterruptedException { + topologyConfig.setMaxTaskParallelism(1); + StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); + } + + public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + topologyConfig.setNumWorkers(AddressConfig.TOPOLOGY_WORKERS); + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);//设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌 + StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); + } + + private void buildTopology() { + builder = new TopologyBuilder(); + builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), AddressConfig.SPOUT_PARALLELISM); + builder.setBolt("SubcribeIdBolt", new SubcribeIdBolt(), AddressConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout"); +// builder.setBolt("ToHBaseBolt", new ToHBaseBolt(), 1).localOrShuffleGrouping("SubcribeIdBolt"); + + } + + public static void main(String[] args) throws Exception { + LogAddressRedisTopology csst = null; + boolean runLocally = true; + if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) { + runLocally = false; + csst = new LogAddressRedisTopology(args[0]); + } else { + csst = new LogAddressRedisTopology(); + } + csst.buildTopology(); + + if (runLocally) { + logger.info("执行本地模式..."); + csst.runLocally(); + } else { + logger.info("执行远程部署模式..."); + csst.runRemotely(); + } + } +} diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java new file mode 100644 index 0000000..d2d4ab9 --- /dev/null +++ b/src/main/java/cn/ac/iie/topology/StormRunner.java @@ -0,0 +1,32 @@ +package cn.ac.iie.topology; + + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; + +public final class StormRunner{ + private static final int MILLS_IN_SEC = 1000; + + private StormRunner() {} + + public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { + + LocalCluster localCluster = new LocalCluster(); + localCluster.submitTopology(topologyName, conf, builder.createTopology()); + Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC); + localCluster.shutdown(); + + } + + public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); + } + + +} diff --git a/src/main/java/cn/ac/iie/utils/AddressConfigurations.java b/src/main/java/cn/ac/iie/utils/AddressConfigurations.java new file mode 100644 index 0000000..ebdfcc7 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/AddressConfigurations.java @@ -0,0 +1,62 @@ +package cn.ac.iie.utils; + +import java.util.Properties; + + + +public final class AddressConfigurations { + + private static Properties propCommon = new Properties(); +// private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propCommon.getProperty(key); +// } else if (type == 1) { +// return propService.getProperty(key); + } else { + return null; + } + } + + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propCommon.getProperty(key)); +// } else if (type == 1) { +// return Integer.parseInt(propService.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propCommon.getProperty(key)); +// } else if (type == 1) { +// return Long.parseLong(propService.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return "true".equals(propCommon.getProperty(key).toLowerCase().trim()); +// } else if (type == 1) { +// return propService.getProperty(key).toLowerCase().trim().equals("true"); + } else { + return null; + } + } + + static { + try { + propCommon.load(AddressConfigurations.class.getClassLoader().getResourceAsStream("address_routine.properties")); + } catch (Exception e) { + propCommon = null; + System.err.println("配置加载失败"); + } + } +} diff --git a/src/main/java/cn/ac/iie/utils/TupleUtils.java b/src/main/java/cn/ac/iie/utils/TupleUtils.java new file mode 100644 index 0000000..c0dc410 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/TupleUtils.java @@ -0,0 +1,21 @@ +package cn.ac.iie.utils; + +import org.apache.storm.Constants; +import org.apache.storm.tuple.Tuple; + +/** + * @author Administrator + */ +public final class TupleUtils { + /** + * 判断是否系统自动发送的Tuple + * + * @param tuple + * @return + */ + public static boolean isTick(Tuple tuple) { + return tuple != null + && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) + && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); + } +} diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties new file mode 100644 index 0000000..dfd50f1 --- /dev/null +++ b/src/main/java/log4j.properties @@ -0,0 +1,25 @@ +#Log4j +log4j.rootLogger=info,console,file +# 控制台日志设置 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=error +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 文件日志设置 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=error +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#路径请用相对路径,做好相关测试输出到应用目下 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 +log4j.logger.com.nis.web.dao=debug +#bonecp数据源配置 +log4j.category.com.jolbox=debug,console + + diff --git a/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java b/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java new file mode 100644 index 0000000..add969a --- /dev/null +++ b/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java @@ -0,0 +1,161 @@ +package cn.ac.iie.test; + +import cn.ac.iie.common.AddressConfig; +import cn.ac.iie.utils.TupleUtils; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + */ +public class SubcribeIdBolt extends BaseBasicBolt { + private static Logger logger = Logger.getLogger(SubcribeIdBolt.class); + private static Map<String, String> subIdMap; + private List<Put> putList; + private static Connection connection; + + static { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void prepare(Map stormConf, TopologyContext context) { + subIdMap = new HashMap<>(16); + putList = new ArrayList<>(); + getAll(); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + try { + if (TupleUtils.isTick(tuple)) { + insertData(putList); + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + String ip = jsonObject.getString("framed_ip"); + String account = jsonObject.getString("account"); + dataValidation(ip, account, putList); + if (putList.size() == AddressConfig.LIST_SIZE_MAX) { + insertData(putList); + } + } + } + } catch (Exception e) { + logger.error("Radius写入Redis出现异常", e); + } + + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Map<String, Object> conf = new HashMap<String, Object>(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + AddressConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + /** + * 获取所有的 key value + */ + private static void getAll() { + try { + Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME)); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + scanner.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 写入数据到hbase + */ + private static void insertData(List<Put> putList) { + Table table = null; + try { + table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME)); + table.put(putList); + logger.error("写入hbase数目:" + putList.size()); + putList.clear(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (table != null) { + table.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + } + + /** + * 验证数据并与内存中的对比 + * + * @param ip framed_ip + * @param account account + */ + private static void dataValidation(String ip, String account, List<Put> putList) { + if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) { +// String s = ip.split("\\.")[0]; +// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) { + if (subIdMap.containsKey(ip)) { + if (!subIdMap.get(ip).equals(account)) { + Put put = new Put(ip.getBytes()); + put.addColumn("subscribe_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } + } else { + Put put = new Put(ip.getBytes()); + put.addColumn("subscribe_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } +// } + } + } +} diff --git a/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java b/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java new file mode 100644 index 0000000..00b54a8 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java @@ -0,0 +1,117 @@ +package cn.ac.iie.test; + +import cn.ac.iie.common.AddressConfig; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + */ +public class SubcribeIdBoltone extends BaseBasicBolt { + private static Logger logger = Logger.getLogger(SubcribeIdBoltone.class); + private static Map<String, String> subIdMap; + private static Connection connection; + + static { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void prepare(Map stormConf, TopologyContext context) { + subIdMap = new HashMap<>(16); + getAll(); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + try { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + String ip = jsonObject.getString("framed_ip"); + String account = jsonObject.getString("account"); +// dataValidation(ip, account, collector); + collector.emit(new Values(ip + "-" + account)); + } + } catch (Exception e) { + logger.error("Radius写入Redis出现异常", e); + } + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("connLog")); + } + + /** + * 获取所有的 key value + */ + private static void getAll() { + try { + Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME)); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + scanner.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + /** + * 验证数据并与内存中的对比 + * + * @param ip framed_ip + * @param account account + */ + private static void dataValidation(String ip, String account, BasicOutputCollector collector) { + if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) { +// String s = ip.split("\\.")[0]; +// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) { + if (subIdMap.containsKey(ip)) { + if (!subIdMap.get(ip).equals(account)) { + subIdMap.put(ip, account); + collector.emit(new Values(ip + "-" + account)); + } + } else { + subIdMap.put(ip, account); + collector.emit(new Values(ip + "-" + account)); + } +// } + } + } +} diff --git a/src/test/java/cn/ac/iie/test/subTest.java b/src/test/java/cn/ac/iie/test/subTest.java new file mode 100644 index 0000000..468517a --- /dev/null +++ b/src/test/java/cn/ac/iie/test/subTest.java @@ -0,0 +1,32 @@ +package cn.ac.iie.test; + +import cn.ac.iie.common.AddressConfig; +import com.zdjizhi.utils.StringUtil; + +public class subTest { + + + public static void main(String[] args) { + boolean validation = dataValidation("1.168.40.123", "abcd"); + System.out.println(validation); + + if ("10,100,192".contains("1")){ + System.out.println("yes"); + }else{ + System.out.println("no"); + } + + String s = "192.168.40.125-abcd"; + System.out.println(s.split("-")[0]); + } + + + private static boolean dataValidation(String ip, String account) { + if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) { + String s = ip.split("\\.")[0]; + return !AddressConfig.CHECK_IP_SCOPE.contains(s); + } + return false; + } + +} |
