summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2020-06-12 19:48:41 +0800
committerqidaijie <[email protected]>2020-06-12 19:48:41 +0800
commit3ad9d0cbd4d3605ba0cff67930c31eb9f5317e49 (patch)
treeb413dc6c992f3fc447a275752c674b92af99e3bd
parentf8dfa376c3852ae2b6f997ed2158ef302bfc9571 (diff)
增加 用户名写入Hbase程序和上下线日志程序
-rw-r--r--FlumeRadiusOnOffInterceptor/pom.xml145
-rw-r--r--FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties16
-rw-r--r--FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java145
-rw-r--r--FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java61
-rw-r--r--FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java50
-rw-r--r--FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java55
-rw-r--r--FlumeSubscriberInterceptor/pom.xml163
-rw-r--r--FlumeSubscriberInterceptor/properties/service_flow_config.properties16
-rw-r--r--FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java261
-rw-r--r--FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java34
-rw-r--r--FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java55
-rw-r--r--pom.xml2
12 files changed, 1003 insertions, 0 deletions
diff --git a/FlumeRadiusOnOffInterceptor/pom.xml b/FlumeRadiusOnOffInterceptor/pom.xml
new file mode 100644
index 0000000..4517ae3
--- /dev/null
+++ b/FlumeRadiusOnOffInterceptor/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dynamic_complement</artifactId>
+ <groupId>com.zdjizhi</groupId>
+ <version>1.0</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>FlumeRadiusOnOffInterceptor</artifactId>
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flume.version>1.9.0</flume.version>
+ <hbase.version>2.2.1</hbase.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.zdjizhi.flume.interceptor.FlumeOnOffApp</mainClass>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.handlers</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.schemas</resource>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>java</executable>
+ <includeProjectDependencies>true</includeProjectDependencies>
+ <includePluginDependencies>false</includePluginDependencies>
+ <classpathScope>compile</classpathScope>
+ <mainClass>com.zdjizhi.flume.interceptor.FlumeOnOffApp</mainClass>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ <include>**/*.xml</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>${flume.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.47</version>
+ </dependency>
+
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib-nodep</artifactId>
+ <version>3.2.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <version>1.0.2</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+
+</project> \ No newline at end of file
diff --git a/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties b/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties
new file mode 100644
index 0000000..bf2e470
--- /dev/null
+++ b/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties
@@ -0,0 +1,16 @@
+#kafka broker下的topic名称
+#kafka.topic=SESSION-TEST-LOG
+
+#数据中心(UID)
+#data.center.id.num=15
+
+#zookeeper.servers=192.168.40.207:2181
+
+#用于过滤对准用户名
+#check.ip.scope=10,100,192
+
+#hbase-zookeeper地址
+hbase.zookeeper.servers=192.168.40.224:2181
+
+#hbase表名
+hbase.table.name=subscriber_info \ No newline at end of file
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java
new file mode 100644
index 0000000..3eeafc7
--- /dev/null
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java
@@ -0,0 +1,145 @@
+package com.zdjizhi.flume.interceptor;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.base.Preconditions;
+import com.zdjizhi.flume.interceptor.bean.Knowledge;
+import com.zdjizhi.flume.interceptor.common.OnOffConfig;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+
+
+/**
+ * @author qidaijie
+ */
+public class FlumeOnOffApp implements Interceptor {
+ private static Logger logger = Logger.getLogger(FlumeOnOffApp.class);
+
+ @Override
+ public void initialize() {
+
+ }
+
+ @Override
+ public Event intercept(Event event) {
+ String message = null;
+ try {
+ message = new String(event.getBody(), "utf-8");
+ } catch (UnsupportedEncodingException e) {
+ message = new String(event.getBody());
+ }
+ try {
+ message = parsingMessage(message);
+ if (StringUtils.isNotBlank(message)) {
+ event.setBody(message.getBytes());
+ return event;
+ } else {
+ return null;
+ }
+ } catch (Exception e) {
+ logger.error("FlumeOnOffApp intercept(Event event) method is error===>{" + e + "}<===");
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public List<Event> intercept(List<Event> list) {
+ List resultList = new ArrayList();
+ for (Event event : list) {
+ Event r = intercept(event);
+ if (r != null) {
+ resultList.add(r);
+ }
+ }
+ return resultList;
+ }
+
+ @Override
+ public void close() {
+ logger.warn("FlumeOnOffApp is closed.");
+ }
+
+ /**
+ * 解析日志,并补全
+ * 补domain,补subscriber_id
+ *
+ * @param message Security原始日志
+ * @return 补全后的日志
+ * <p>
+ */
+ private String parsingMessage(String message) {
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonObject = JSONObject.parseObject(message);
+ //数据需包含 radius_packet_type and radius_acct_status_type 字段
+ if (jsonObject.containsKey(OnOffConfig.RADIUS_PACKET_TYPE) && jsonObject.containsKey(OnOffConfig.RADIUS_ACCT_STATUS_TYPE)) {
+ int packetType = jsonObject.getInteger(OnOffConfig.RADIUS_PACKET_TYPE);
+ int statusType = jsonObject.getInteger(OnOffConfig.RADIUS_ACCT_STATUS_TYPE);
+ //条件radius_packet_type = 4 and radius_acct_status_type = 1 or 2
+// boolean existed = OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType);
+ if (OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType)) {
+
+ Knowledge knowledge = new Knowledge();
+ knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip"));
+ knowledge.setAccount(jsonObject.getString("radius_account"));
+ knowledge.setAcct_status_type(statusType);
+
+ /*
+ *如果存在时间戳则选择此时间戳没有获取当前时间
+ */
+ if (jsonObject.containsKey(OnOffConfig.RADIUS_EVENT_TIMESTAMP)) {
+ knowledge.setEvent_timestamp(jsonObject.getInteger("radius_event_timestamp"));
+ } else {
+ knowledge.setEvent_timestamp((System.currentTimeMillis() / 1000));
+ }
+
+ /*
+ * 标识同一个连接:
+ * 1.数据若存在acct_multi_session_id属性,取该属性
+ * 2. 不存在取 acct_session_id
+ */
+ if (jsonObject.containsKey(OnOffConfig.RADIUS_MULTI_SESSION_ID)) {
+ knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id"));
+ } else {
+ knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id"));
+ }
+
+ /*
+ *用户的在线时长,以秒为单位,下线用户无此属性,默认为0
+ */
+ if (jsonObject.containsKey(OnOffConfig.RADIUS_SESSION_TIME)) {
+ knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time"));
+ } else {
+ knowledge.setAcct_session_time(0);
+ }
+
+ return JSONObject.toJSONString(knowledge);
+ }
+ }
+ }
+ return null;
+ }
+
+ public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
+
+ @Override
+ public Interceptor build() {
+ return new FlumeOnOffApp();
+ }
+
+ @Override
+ public void configure(Context context) {
+
+ }
+
+ }
+
+}
+
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java
new file mode 100644
index 0000000..9d054ed
--- /dev/null
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java
@@ -0,0 +1,61 @@
+package com.zdjizhi.flume.interceptor.bean;
+
+/**
+ * @author qidaijie
+ */
+public class Knowledge {
+ private String framed_ip;
+ private String account;
+ private String acct_session_id;
+ private int acct_status_type;
+ private int acct_session_time;
+ private long event_timestamp;
+
+ public String getFramed_ip() {
+ return framed_ip;
+ }
+
+ public void setFramed_ip(String framed_ip) {
+ this.framed_ip = framed_ip;
+ }
+
+ public String getAccount() {
+ return account;
+ }
+
+ public void setAccount(String account) {
+ this.account = account;
+ }
+
+ public int getAcct_status_type() {
+ return acct_status_type;
+ }
+
+ public void setAcct_status_type(int acct_status_type) {
+ this.acct_status_type = acct_status_type;
+ }
+
+ public long getEvent_timestamp() {
+ return event_timestamp;
+ }
+
+ public void setEvent_timestamp(long event_timestamp) {
+ this.event_timestamp = event_timestamp;
+ }
+
+ public String getAcct_session_id() {
+ return acct_session_id;
+ }
+
+ public void setAcct_session_id(String acct_session_id) {
+ this.acct_session_id = acct_session_id;
+ }
+
+ public int getAcct_session_time() {
+ return acct_session_time;
+ }
+
+ public void setAcct_session_time(int acct_session_time) {
+ this.acct_session_time = acct_session_time;
+ }
+}
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
new file mode 100644
index 0000000..7407290
--- /dev/null
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java
@@ -0,0 +1,50 @@
+package com.zdjizhi.flume.interceptor.common;
+
+
+/**
+ * @author Administrator
+ */
+public class OnOffConfig {
+ /**
+ * 4- Accounting-Request(账户授权)
+ */
+ public static final int ACCOUNTING_REQUEST = 4;
+ /**
+ * 1、开始计费
+ */
+ public static final int START_BILLING = 1;
+ /**
+ * 2、停止计费
+ */
+ public static final int STOP_BILLING = 2;
+
+ /**
+ * 计费请求报文类型
+ */
+ public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
+ /**
+ * 报文类型
+ */
+ public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
+
+ /**
+ * 发送计费请求报文时间戳
+ */
+ public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp";
+
+ /**
+ * 一个用户多个计费ID关联属性
+ */
+ public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id";
+
+ /**
+ * 用户的在线时长,以秒为单位
+ */
+ public static final String RADIUS_SESSION_TIME = "radius_acct_session_time";
+
+ /**
+ * flume使用配置
+ */
+ public static final String HBASE_ZOOKEEPER_SERVERS = OnOffConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+ public static final String HBASE_TABLE_NAME = OnOffConfigurations.getStringProperty(0, "hbase.table.name");
+} \ No newline at end of file
diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java
new file mode 100644
index 0000000..caf8572
--- /dev/null
+++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java
@@ -0,0 +1,55 @@
+package com.zdjizhi.flume.interceptor.common;
+
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class OnOffConfigurations {
+
+ private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else {
+ return null;
+ }
+
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return "true".equals(propService.getProperty(key).toLowerCase().trim());
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(OnOffConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ } catch (Exception e) {
+ propService = null;
+ }
+ }
+}
diff --git a/FlumeSubscriberInterceptor/pom.xml b/FlumeSubscriberInterceptor/pom.xml
new file mode 100644
index 0000000..a359d60
--- /dev/null
+++ b/FlumeSubscriberInterceptor/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dynamic_complement</artifactId>
+ <groupId>com.zdjizhi</groupId>
+ <version>1.0</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>FlumeSubscriberInterceptor</artifactId>
+
+ <repositories>
+
+ <repository>
+ <id>ebi</id>
+ <name>www.ebi.ac.uk</name>
+ <url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flume.version>1.9.0</flume.version>
+ <hbase.version>2.2.1</hbase.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.zdjizhi.flume.interceptor.FlumeSubscriberInterceptor</mainClass>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.handlers</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/spring.schemas</resource>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>java</executable>
+ <includeProjectDependencies>true</includeProjectDependencies>
+ <includePluginDependencies>false</includePluginDependencies>
+ <classpathScope>compile</classpathScope>
+ <mainClass>com.zdjizhi.flume.interceptor.FlumeSubscriberApp</mainClass>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ <include>**/*.xml</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>${flume.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.47</version>
+ </dependency>
+
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib-nodep</artifactId>
+ <version>3.2.4</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+
+</project> \ No newline at end of file
diff --git a/FlumeSubscriberInterceptor/properties/service_flow_config.properties b/FlumeSubscriberInterceptor/properties/service_flow_config.properties
new file mode 100644
index 0000000..bf2e470
--- /dev/null
+++ b/FlumeSubscriberInterceptor/properties/service_flow_config.properties
@@ -0,0 +1,16 @@
+#kafka broker下的topic名称
+#kafka.topic=SESSION-TEST-LOG
+
+#数据中心(UID)
+#data.center.id.num=15
+
+#zookeeper.servers=192.168.40.207:2181
+
+#用于过滤对准用户名
+#check.ip.scope=10,100,192
+
+#hbase-zookeeper地址
+hbase.zookeeper.servers=192.168.40.224:2181
+
+#hbase表名
+hbase.table.name=subscriber_info \ No newline at end of file
diff --git a/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java
new file mode 100644
index 0000000..9800a86
--- /dev/null
+++ b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java
@@ -0,0 +1,261 @@
+package com.zdjizhi.flume.interceptor;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.base.Preconditions;
+import com.zdjizhi.flume.interceptor.common.SubscriberConfig;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+
+
+/**
+ * @author qidaijie
+ */
+public class FlumeSubscriberApp implements Interceptor {
+ private static Logger logger = Logger.getLogger(FlumeSubscriberApp.class);
+
+ private static Map<String, String> subIdMap;
+ private List<Put> putList;
+ private static Connection connection;
+ private String hbaseZookeeperIp;
+ private String hbaseTableName;
+ private int updateHBaseTime;
+
+ public FlumeSubscriberApp(String hbaseZookeeperIp, String hbaseTableName, int updateHBaseTime) {
+ this.hbaseZookeeperIp = hbaseZookeeperIp;
+ this.hbaseTableName = hbaseTableName;
+ this.updateHBaseTime = updateHBaseTime;
+ }
+
+
+ @Override
+ public void initialize() {
+ subIdMap = new HashMap<>(256);
+ putList = new ArrayList<>();
+
+ // 管理HBase的配置信息
+ Configuration configuration = HBaseConfiguration.create();
+ // 设置zookeeper节点
+ configuration.set("hbase.zookeeper.quorum", hbaseZookeeperIp);
+ configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ configuration.set("hbase.client.retries.number", "3");
+ configuration.set("hbase.bulkload.retries.number", "3");
+ configuration.set("zookeeper.recovery.retry", "3");
+ try {
+ connection = ConnectionFactory.createConnection(configuration);
+ } catch (IOException e) {
+ logger.error("用户名信息写入HBase程序,连接HBase异常");
+ e.printStackTrace();
+ }
+
+ getAll(hbaseTableName);
+
+ new Timer().schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ insertData(putList, hbaseTableName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, 0, updateHBaseTime * 1000);
+ }
+
+
+ @Override
+ public Event intercept(Event event) {
+ String message = null;
+ try {
+ message = new String(event.getBody(), "utf-8");
+ } catch (UnsupportedEncodingException e) {
+ message = new String(event.getBody());
+ }
+ try {
+ if (StringUtils.isNotBlank(message)) {
+ message = dealCommonMessage(message);
+ event.setBody(message.getBytes());
+ return event;
+ }
+ } catch (Exception e) {
+ logger.error("FlumeSubscriberApp intercept(Event event) method is error===>{" + e + "}<===");
+ e.printStackTrace();
+ }
+ return event;
+ }
+
+ @Override
+ public List<Event> intercept(List<Event> list) {
+ List resultList = new ArrayList();
+ for (Event event : list) {
+ Event r = intercept(event);
+ if (r != null) {
+ resultList.add(r);
+ }
+ }
+ return resultList;
+ }
+
+ @Override
+ public void close() {
+ logger.warn("FlumeSubscriberApp is closed.");
+ }
+
+ /**
+ * 解析日志,并补全
+ * 补domain,补subscriber_id
+ *
+ * @param message Security原始日志
+ * @return 补全后的日志
+ * <p>
+ */
+ private String dealCommonMessage(String message) {
+ JSONObject jsonObject = JSONObject.parseObject(message);
+ if (jsonObject.containsKey(SubscriberConfig.PACKET_TYPE) && jsonObject.containsKey(SubscriberConfig.STATUS_TYPE)) {
+ if (SubscriberConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(SubscriberConfig.PACKET_TYPE)
+ && SubscriberConfig.START_BILLING == jsonObject.getInteger(SubscriberConfig.STATUS_TYPE)) {
+ String framedIp = jsonObject.getString("radius_framed_ip");
+ String account = jsonObject.getString("radius_account");
+ dataValidation(framedIp, account, putList);
+ }
+ if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) {
+ insertData(putList, hbaseTableName);
+ }
+
+ }
+ return message;
+ }
+
+
+ /**
+ * 获取所有的 key value
+ */
+ private static void getAll(String tableNmae) {
+ try {
+ Table table = connection.getTable(TableName.valueOf("sub:" + tableNmae));
+ Scan scan = new Scan();
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ Cell[] cells = result.rawCells();
+ for (Cell cell : cells) {
+ subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
+ }
+ }
+ scanner.close();
+ } catch (IOException e) {
+ logger.error("获取HBase所有row key出现异常");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 写入数据到HBase
+ *
+ * @param putList puts list
+ */
+ private static void insertData(List<Put> putList, String tableName) {
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf("sub:" + tableName));
+ table.put(putList);
+ putList.clear();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } catch (IOException e) {
+ logger.error("更新数据写入HBase失败");
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ /**
+ * 验证数据并与内存中的对比
+ *
+ * @param ip framed_ip
+ * @param account account
+ */
+ private static void dataValidation(String ip, String account, List<Put> putList) {
+ if (subIdMap.containsKey(ip)) {
+ if (!subIdMap.get(ip).equals(account)) {
+ Put put = new Put(ip.getBytes());
+ put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes());
+ putList.add(put);
+ subIdMap.put(ip, account);
+ }
+ } else {
+ Put put = new Put(ip.getBytes());
+ put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes());
+ putList.add(put);
+ subIdMap.put(ip, account);
+ }
+ }
+
+
+ public static class FlumeDynamicAppBuilder implements Interceptor.Builder {
+ private String hbaseZookeeperIp;
+ private String hbaseTableName;
+ private int updateHBaseTime;
+
+
+ @Override
+ public Interceptor build() {
+ return new FlumeSubscriberApp(hbaseZookeeperIp, hbaseTableName, updateHBaseTime);
+ }
+
+ @Override
+ public void configure(Context context) {
+ try {
+ this.hbaseZookeeperIp = context.getString("hbaseZookeeperIp", "");
+ Preconditions.checkNotNull("".equals(hbaseZookeeperIp), "hbaseZookeeperIp must be set!!");
+ logger.info("FlumeSubscriberApp Read hbaseZookeeperIp from configuration : " + hbaseZookeeperIp);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("FlumeSubscriberApp hbaseZookeeperIp invalid", e);
+ } catch (Exception e) {
+ logger.error("FlumeSubscriberApp Get hbaseZookeeperIp is error : " + e);
+ }
+
+ try {
+ this.hbaseTableName = context.getString("hbaseTableName", "");
+ Preconditions.checkNotNull("".equals(hbaseTableName), "hbaseTableName must be set!!");
+ logger.info("FlumeSubscriberApp Read hbaseTableName from configuration : " + hbaseTableName);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("FlumeSubscriberApp hbaseTableName invalid", e);
+ } catch (Exception e) {
+ logger.error("FlumeSubscriberApp Get hbaseTableName is error : " + e);
+ }
+
+ try {
+ this.updateHBaseTime = context.getInteger("updateHBaseTime", 30);
+ Preconditions.checkNotNull("".equals(updateHBaseTime), "updateHBaseTime must be set!!");
+ logger.info("FlumeSubscriberApp Read updateHBaseTime from configuration : " + updateHBaseTime);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("FlumeSubscriberApp updateHBaseTime invalid", e);
+ } catch (Exception e) {
+ logger.error("FlumeSubscriberApp Get updateHBaseTime is error : " + e);
+ }
+
+ }
+
+ }
+
+}
+
diff --git a/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java
new file mode 100644
index 0000000..7bd14d4
--- /dev/null
+++ b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java
@@ -0,0 +1,34 @@
+package com.zdjizhi.flume.interceptor.common;
+
+
+/**
+ * @author Administrator
+ */
+public class SubscriberConfig {
+ /**
+ * 最多存在多少数据即写入hbase
+ */
+ public static final int LIST_SIZE_MAX = 5000;
+ /**
+ * 4- Accounting-Request(账户授权)
+ */
+ public static final int ACCOUNTING_REQUEST = 4;
+ /**
+ * radius_packet_type
+ */
+ public static final String PACKET_TYPE = "radius_packet_type";
+ /**
+ * 1、开始计费
+ */
+ public static final int START_BILLING = 1;
+ /**
+ * radius_acct_status_type
+ */
+ public static final String STATUS_TYPE = "radius_acct_status_type";
+
+ /**
+ * flume使用配置
+ */
+ public static final String HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+ public static final String HBASE_TABLE_NAME = SubscriberConfigurations.getStringProperty(0, "hbase.table.name");
+} \ No newline at end of file
diff --git a/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java
new file mode 100644
index 0000000..cbae69d
--- /dev/null
+++ b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java
@@ -0,0 +1,55 @@
+package com.zdjizhi.flume.interceptor.common;
+
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class SubscriberConfigurations {
+
+ private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else {
+ return null;
+ }
+
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return "true".equals(propService.getProperty(key).toLowerCase().trim());
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(SubscriberConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ } catch (Exception e) {
+ propService = null;
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 1224ebf..023c16b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,6 +10,8 @@
<version>1.0</version>
<modules>
<module>FlumeDynamicInterceptor</module>
+ <module>FlumeSubscriberInterceptor</module>
+ <module>FlumeRadiusOnOffInterceptor</module>
</modules>