summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2021-02-01 11:38:18 +0800
committerqidaijie <[email protected]>2021-02-01 11:38:18 +0800
commite6e2e41333033fed4147d4fc8b3513d3dc9e1bfe (patch)
tree07fb529d2dfddb8ff8654a27b373a182c40aaed1
parent265611e571d5c2df36ecfc358e6caacf49e1cc4e (diff)
提交线上使用版本HEADmaster
-rw-r--r--pom.xml174
-rw-r--r--properties/knowledge_config.properties36
-rw-r--r--src/main/java/cn/ac/iie/bean/Knowledge.java61
-rw-r--r--src/main/java/cn/ac/iie/bolt/RadiusCleanBolt.java108
-rw-r--r--src/main/java/cn/ac/iie/common/KnowledgeConfig.java65
-rw-r--r--src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java78
-rw-r--r--src/main/java/cn/ac/iie/topology/RadiusLogClearTopology.java81
-rw-r--r--src/main/java/cn/ac/iie/topology/StormRunner.java32
-rw-r--r--src/main/java/cn/ac/iie/utils/KnowledgeConfigurations.java53
-rw-r--r--src/main/java/cn/ac/iie/utils/LogToKafka.java77
-rw-r--r--src/main/java/cn/ac/iie/utils/TupleUtils.java21
-rw-r--r--src/main/java/log4j.properties25
-rw-r--r--src/test/java/cn/ac/iie/test/JsonTest.java11
13 files changed, 822 insertions, 0 deletions
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c716e84
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,174 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>radius-account-knowledge</groupId>
+ <artifactId>radius-account-knowledge</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>radius-account-knowledge</name>
+ <url>http://maven.apache.org</url>
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+
+
+ <repository>
+ <id>maven-ali</id>
+ <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ </repository>
+
+ </repositories>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>cn.ac.iie.topology.RadiusLogClearTopology</mainClass>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.handlers</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.schemas</resource>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ <resource>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <kafka.version>1.0.0</kafka.version>
+ <storm.version>1.0.2</storm.version>
+ <zdjizhi.version>1.0.3</zdjizhi.version>
+ </properties>
+
+ <dependencies>
+ <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>1.0.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${storm.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client -->
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.60</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <version>${zdjizhi.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/properties/knowledge_config.properties b/properties/knowledge_config.properties
new file mode 100644
index 0000000..764c53a
--- /dev/null
+++ b/properties/knowledge_config.properties
@@ -0,0 +1,36 @@
+#管理kafka地址
+bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
+
+#kafka broker下的topic名称
+kafka.topic=RADIUS-RECORD-LOG
+
+#kafka消费group id
+group.id=RADIUS-AAA
+
+#输出kafka server
+results.output.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
+
+#输出topic
+results.output.topic=RADIUS-ONFF-LOG
+
+#从kafka哪里开始读:earliest/latest
+auto.offset.reset=latest
+
+#storm topology workers
+topology.workers=1
+
+#storm spout parallelism
+spout.parallelism=3
+
+#storm bolt parallelism
+format.bolt.parallelism=3
+
+#tick时钟频率
+topology.tick.tuple.freq.secs=5
+
+topology.config.max.spout.pending=500000
+
+topology.num.acks=0
+
+#kafka批量条数
+batch.insert.num=2000 \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/bean/Knowledge.java b/src/main/java/cn/ac/iie/bean/Knowledge.java
new file mode 100644
index 0000000..2bf06b9
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bean/Knowledge.java
@@ -0,0 +1,61 @@
+package cn.ac.iie.bean;
+
+/**
+ * @author qidaijie
+ */
+public class Knowledge {
+ private String framed_ip;
+ private String account;
+ private String acct_session_id;
+ private int acct_status_type;
+ private int acct_session_time;
+ private long event_timestamp;
+
+ public String getFramed_ip() {
+ return framed_ip;
+ }
+
+ public void setFramed_ip(String framed_ip) {
+ this.framed_ip = framed_ip;
+ }
+
+ public String getAccount() {
+ return account;
+ }
+
+ public void setAccount(String account) {
+ this.account = account;
+ }
+
+ public int getAcct_status_type() {
+ return acct_status_type;
+ }
+
+ public void setAcct_status_type(int acct_status_type) {
+ this.acct_status_type = acct_status_type;
+ }
+
+ public long getEvent_timestamp() {
+ return event_timestamp;
+ }
+
+ public void setEvent_timestamp(long event_timestamp) {
+ this.event_timestamp = event_timestamp;
+ }
+
+ public String getAcct_session_id() {
+ return acct_session_id;
+ }
+
+ public void setAcct_session_id(String acct_session_id) {
+ this.acct_session_id = acct_session_id;
+ }
+
+ public int getAcct_session_time() {
+ return acct_session_time;
+ }
+
+ public void setAcct_session_time(int acct_session_time) {
+ this.acct_session_time = acct_session_time;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/bolt/RadiusCleanBolt.java b/src/main/java/cn/ac/iie/bolt/RadiusCleanBolt.java
new file mode 100644
index 0000000..d356dc1
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/RadiusCleanBolt.java
@@ -0,0 +1,108 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.bean.Knowledge;
+import cn.ac.iie.common.KnowledgeConfig;
+import cn.ac.iie.utils.LogToKafka;
+import cn.ac.iie.utils.TupleUtils;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author ZDJZ
+ */
+public class RadiusCleanBolt extends BaseBasicBolt {
+ private static final long serialVersionUID = -7099293750085572832L;
+ private List<String> list;
+ private LogToKafka logToKafka;
+ private static Logger logger = Logger.getLogger(RadiusCleanBolt.class);
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ list = new LinkedList<>();
+ logToKafka = LogToKafka.getInstance();
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ try {
+ if (TupleUtils.isTick(tuple)) {
+ logToKafka.sendMessage(list);
+ list.clear();
+ } else {
+ String message = tuple.getString(0);
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSONObject.parseObject(message);
+ if (jsonObject.containsKey(KnowledgeConfig.RADIUS_PACKET_TYPE)) {
+ int packetType = jsonObject.getInteger(KnowledgeConfig.RADIUS_PACKET_TYPE);
+ if (KnowledgeConfig.ACCOUNTING_REQUEST == packetType) {
+ int statusType = jsonObject.getInteger("radius_acct_status_type");
+ if (KnowledgeConfig.START_BILLING == statusType || KnowledgeConfig.STOP_BILLING == statusType) {
+ Knowledge knowledge = new Knowledge();
+ knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip"));
+ knowledge.setAccount(jsonObject.getString("radius_account"));
+ knowledge.setAcct_status_type(statusType);
+ /*
+ 如果存在时间戳则选择此时间戳没有获取当前时间
+ */
+ if (jsonObject.containsKey(KnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) {
+ knowledge.setEvent_timestamp(jsonObject.getInteger("radius_event_timestamp"));
+ } else {
+ knowledge.setEvent_timestamp((System.currentTimeMillis() / 1000));
+ }
+ /*
+ * 标识同一个连接:
+ * 1.数据若存在acct_multi_session_id属性,取该属性
+ * 2. 不存在取 acct_session_id
+ */
+ if (jsonObject.containsKey(KnowledgeConfig.RADIUS_MULTI_SESSION_ID)) {
+ knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id"));
+ } else {
+ knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id"));
+ }
+ /*
+ 用户的在线时长,以秒为单位,下线用户无此属性,默认为0
+ */
+ if (jsonObject.containsKey(KnowledgeConfig.RADIUS_SESSION_TIME)) {
+ knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time"));
+ } else {
+ knowledge.setAcct_session_time(0);
+ }
+ list.add(JSONObject.toJSONString(knowledge));
+ }
+ }
+ }
+ if (list.size() == KnowledgeConfig.BATCH_INSERT_NUM) {
+ logToKafka.sendMessage(list);
+ list.clear();
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Radius 上下线日志解析异常", e);
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = new HashMap<>(16);
+ conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, KnowledgeConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ return conf;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/common/KnowledgeConfig.java b/src/main/java/cn/ac/iie/common/KnowledgeConfig.java
new file mode 100644
index 0000000..4322f2c
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/KnowledgeConfig.java
@@ -0,0 +1,65 @@
+package cn.ac.iie.common;
+
+import cn.ac.iie.utils.KnowledgeConfigurations;
+
+import java.io.Serializable;
+
+/**
+ * @author qidaijie
+ */
+public class KnowledgeConfig implements Serializable {
+ private static final long serialVersionUID = -8326385159484059324L;
+
+ public static final String SEGMENTATION = ",";
+ /**
+ * 4- Accounting-Request(账户授权)
+ */
+ public static final int ACCOUNTING_REQUEST = 4;
+ /**
+ * 1、开始计费
+ */
+ public static final int START_BILLING = 1;
+ /**
+ * 2、停止计费
+ */
+ public static final int STOP_BILLING = 2;
+
+ /**
+ * 报文类型
+ */
+ public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
+
+ /**
+ * 发送计费请求报文时间戳
+ */
+ public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp";
+
+ /**
+ * 一个用户多个计费ID关联属性
+ */
+ public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id";
+
+ /**
+ * 用户的在线时长,以秒为单位
+ */
+ public static final String RADIUS_SESSION_TIME = "radius_acct_session_time";
+
+ /***
+ * kafka and system
+ */
+ public static final String BOOTSTRAP_SERVERS = KnowledgeConfigurations.getStringProperty(0, "bootstrap.servers");
+ public static final Integer SPOUT_PARALLELISM = KnowledgeConfigurations.getIntProperty(0, "spout.parallelism");
+ public static final String GROUP_ID = KnowledgeConfigurations.getStringProperty(0, "group.id");
+ public static final String KAFKA_TOPIC = KnowledgeConfigurations.getStringProperty(0, "kafka.topic");
+ public static final String AUTO_OFFSET_RESET = KnowledgeConfigurations.getStringProperty(0, "auto.offset.reset");
+ public static final String RESULTS_OUTPUT_SERVERS = KnowledgeConfigurations.getStringProperty(0, "results.output.servers");
+ public static final String RESULTS_OUTPUT_TOPIC = KnowledgeConfigurations.getStringProperty(0, "results.output.topic");
+ public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = KnowledgeConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
+ public static final Integer TOPOLOGY_NUM_ACKS = KnowledgeConfigurations.getIntProperty(0, "topology.num.acks");
+ public static final Integer TOPOLOGY_WORKERS = KnowledgeConfigurations.getIntProperty(0, "topology.workers");
+ public static final Integer FORMAT_BOLT_PARALLELISM = KnowledgeConfigurations.getIntProperty(0, "format.bolt.parallelism");
+ public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = KnowledgeConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
+ public static final Integer BATCH_INSERT_NUM = KnowledgeConfigurations.getIntProperty(0, "batch.insert.num");
+
+
+} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
new file mode 100644
index 0000000..93c88a4
--- /dev/null
+++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
@@ -0,0 +1,78 @@
+package cn.ac.iie.spout;
+
+import cn.ac.iie.common.KnowledgeConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.log4j.Logger;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ */
+public class CustomizedKafkaSpout extends BaseRichSpout {
+ private static final long serialVersionUID = 2934528972182398950L;
+ private KafkaConsumer<String, String> consumer;
+ private SpoutOutputCollector collector = null;
+ private TopologyContext context = null;
+ private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
+
+
+ private static Properties createConsumerConfig() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", KnowledgeConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", KnowledgeConfig.GROUP_ID);
+ props.put("auto.offset.reset", KnowledgeConfig.AUTO_OFFSET_RESET);
+ props.put("session.timeout.ms", "60000");
+ props.put("max.poll.records", 3000);
+ props.put("max.partition.fetch.bytes", 31457280);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ return props;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ // TODO Auto-generated method stub
+ this.collector = collector;
+ this.context = context;
+ Properties prop = createConsumerConfig();
+ this.consumer = new KafkaConsumer<>(prop);
+ this.consumer.subscribe(Arrays.asList(KnowledgeConfig.KAFKA_TOPIC));
+ }
+
+ @Override
+ public void close() {
+ consumer.close();
+ }
+
+ @Override
+ public void nextTuple() {
+ try {
+ // TODO Auto-generated method stub
+ ConsumerRecords<String, String> records = consumer.poll(10000L);
+ Thread.sleep(300);
+ for (ConsumerRecord<String, String> record : records) {
+ this.collector.emit(new Values(record.value()));
+ }
+ } catch (Exception e) {
+ logger.error("kfaka-spout 发送数据出现异常" + e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // TODO Auto-generated method stub
+ declarer.declare(new Fields("source"));
+ }
+}
diff --git a/src/main/java/cn/ac/iie/topology/RadiusLogClearTopology.java b/src/main/java/cn/ac/iie/topology/RadiusLogClearTopology.java
new file mode 100644
index 0000000..07a4218
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/RadiusLogClearTopology.java
@@ -0,0 +1,81 @@
+package cn.ac.iie.topology;
+
+import cn.ac.iie.bolt.RadiusCleanBolt;
+import cn.ac.iie.common.KnowledgeConfig;
+import cn.ac.iie.spout.CustomizedKafkaSpout;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author qidaijie
+ */
+public class RadiusLogClearTopology {
+ private static final Logger logger = LoggerFactory.getLogger(RadiusLogClearTopology.class);
+ private final String topologyName;
+ private final Config topologyConfig;
+ private TopologyBuilder builder;
+
+ public RadiusLogClearTopology() {
+ this(RadiusLogClearTopology.class.getSimpleName());
+ }
+
+ public RadiusLogClearTopology(String topologyName) {
+ this.topologyName = topologyName;
+ topologyConfig = createTopologConfig();
+ }
+
+ private Config createTopologConfig() {
+ Config conf = new Config();
+ conf.setDebug(false);
+ conf.setMessageTimeoutSecs(120);
+ conf.setTopologyWorkerMaxHeapSize(500);
+ conf.setMaxSpoutPending(KnowledgeConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+ if (KnowledgeConfig.TOPOLOGY_NUM_ACKS == 0) {
+ conf.setNumAckers(0);
+ }
+ return conf;
+ }
+
+ public void runLocally() throws InterruptedException {
+ topologyConfig.setMaxTaskParallelism(1);
+ StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+ }
+
+ public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ topologyConfig.setNumWorkers(KnowledgeConfig.TOPOLOGY_WORKERS);
+ //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌
+ topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+ StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
+ }
+
+ private void buildTopology() {
+ builder = new TopologyBuilder();
+ builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), KnowledgeConfig.SPOUT_PARALLELISM);
+ builder.setBolt("RadiusCleanBolt", new RadiusCleanBolt(), KnowledgeConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
+ }
+
+ public static void main(String[] args) throws Exception {
+ RadiusLogClearTopology csst = null;
+ boolean runLocally = true;
+ if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) {
+ runLocally = false;
+ csst = new RadiusLogClearTopology(args[0]);
+ } else {
+ csst = new RadiusLogClearTopology();
+ }
+ csst.buildTopology();
+
+ if (runLocally) {
+ logger.info("执行本地模式...");
+ csst.runLocally();
+ } else {
+ logger.info("执行远程部署模式...");
+ csst.runRemotely();
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java
new file mode 100644
index 0000000..d2d4ab9
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/StormRunner.java
@@ -0,0 +1,32 @@
+package cn.ac.iie.topology;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+
+public final class StormRunner{
+ private static final int MILLS_IN_SEC = 1000;
+
+ private StormRunner() {}
+
+ public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
+
+ LocalCluster localCluster = new LocalCluster();
+ localCluster.submitTopology(topologyName, conf, builder.createTopology());
+ Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
+ localCluster.shutdown();
+
+ }
+
+ public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+
+ StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/KnowledgeConfigurations.java b/src/main/java/cn/ac/iie/utils/KnowledgeConfigurations.java
new file mode 100644
index 0000000..dbaf58d
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/KnowledgeConfigurations.java
@@ -0,0 +1,53 @@
+package cn.ac.iie.utils;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ */
+public final class KnowledgeConfigurations {
+
+ private static Properties propCommon = new Properties();
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propCommon.getProperty(key);
+ } else {
+ return null;
+ }
+ }
+
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propCommon.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propCommon.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propCommon.load(KnowledgeConfigurations.class.getClassLoader().getResourceAsStream("knowledge_config.properties"));
+ } catch (Exception e) {
+ propCommon = null;
+ System.err.println("配置加载失败");
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/LogToKafka.java b/src/main/java/cn/ac/iie/utils/LogToKafka.java
new file mode 100644
index 0000000..6f08e60
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/LogToKafka.java
@@ -0,0 +1,77 @@
+package cn.ac.iie.utils;
+
+import cn.ac.iie.common.KnowledgeConfig;
+import org.apache.kafka.clients.producer.*;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * 日志写入数据中心类
+ *
+ * @author Administrator
+ * @create 2018-08-13 15:11
+ */
+
+public class LogToKafka {
+ private static Logger logger = Logger.getLogger(LogToKafka.class);
+
+ /**
+ * kafka生产者,用于向kafka中发送消息
+ */
+ private static Producer<String, String> kafkaProducer;
+
+ /**
+ * kafka生产者适配器(单例),用来代理kafka生产者发送消息
+ */
+ private static LogToKafka logToKafka;
+
+ private LogToKafka() {
+ initKafkaProducer();
+ }
+
+ public static LogToKafka getInstance() {
+ if (logToKafka == null) {
+ logToKafka = new LogToKafka();
+ }
+ return logToKafka;
+ }
+
+
+ public void sendMessage(List<String> list) {
+ final int[] errorSum = {0};
+ for (String value : list) {
+ kafkaProducer.send(new ProducerRecord<>(KnowledgeConfig.RESULTS_OUTPUT_TOPIC, value), new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ logger.error("写入" + KnowledgeConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception);
+ errorSum[0]++;
+ }
+ }
+ });
+ }
+ kafkaProducer.flush();
+ logger.debug("Log sent to National Center successfully!!!!!");
+ }
+
+ /**
+ * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次
+ */
+ private void initKafkaProducer() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", KnowledgeConfig.RESULTS_OUTPUT_SERVERS);
+ properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("acks", "1");
+ properties.put("linger.ms", "2");
+ properties.put("request.timeout.ms", 30000);
+ properties.put("batch.size", 262144);
+ properties.put("buffer.memory", 33554432);
+// properties.put("compression.type", "snappy");
+ kafkaProducer = new KafkaProducer<>(properties);
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/TupleUtils.java b/src/main/java/cn/ac/iie/utils/TupleUtils.java
new file mode 100644
index 0000000..c0dc410
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/TupleUtils.java
@@ -0,0 +1,21 @@
+package cn.ac.iie.utils;
+
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * @author Administrator
+ */
+public final class TupleUtils {
+ /**
+ * 判断是否系统自动发送的Tuple
+ *
+ * @param tuple
+ * @return
+ */
+ public static boolean isTick(Tuple tuple) {
+ return tuple != null
+ && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
+ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ }
+}
diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties
new file mode 100644
index 0000000..dfd50f1
--- /dev/null
+++ b/src/main/java/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# 控制台日志设置
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=error
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# 文件日志设置
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=error
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+#路径请用相对路径,做好相关测试输出到应用目下
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+#MyBatis 配置,com.nis.web.dao是mybatis接口所在包
+log4j.logger.com.nis.web.dao=debug
+#bonecp数据源配置
+log4j.category.com.jolbox=debug,console
+
+
diff --git a/src/test/java/cn/ac/iie/test/JsonTest.java b/src/test/java/cn/ac/iie/test/JsonTest.java
new file mode 100644
index 0000000..35224c1
--- /dev/null
+++ b/src/test/java/cn/ac/iie/test/JsonTest.java
@@ -0,0 +1,11 @@
+package cn.ac.iie.test;
+
+import org.junit.Test;
+
+public class JsonTest {
+
+ @Test
+ public void huToolTest() {
+
+ }
+}