diff options
| author | qidaijie <[email protected]> | 2020-06-12 19:48:41 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2020-06-12 19:48:41 +0800 |
| commit | 3ad9d0cbd4d3605ba0cff67930c31eb9f5317e49 (patch) | |
| tree | b413dc6c992f3fc447a275752c674b92af99e3bd | |
| parent | f8dfa376c3852ae2b6f997ed2158ef302bfc9571 (diff) | |
增加 用户名写入Hbase程序和上下线日志程序
12 files changed, 1003 insertions, 0 deletions
diff --git a/FlumeRadiusOnOffInterceptor/pom.xml b/FlumeRadiusOnOffInterceptor/pom.xml new file mode 100644 index 0000000..4517ae3 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/pom.xml @@ -0,0 +1,145 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>dynamic_complement</artifactId> + <groupId>com.zdjizhi</groupId> + <version>1.0</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>FlumeRadiusOnOffInterceptor</artifactId> + + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + </repositories> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flume.version>1.9.0</flume.version> + <hbase.version>2.2.1</hbase.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.1</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.zdjizhi.flume.interceptor.FlumeOnOffApp</mainClass> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.handlers</resource> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.schemas</resource> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <executions> + <execution> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>java</executable> + <includeProjectDependencies>true</includeProjectDependencies> + <includePluginDependencies>false</includePluginDependencies> + <classpathScope>compile</classpathScope> + <mainClass>com.zdjizhi.flume.interceptor.FlumeOnOffApp</mainClass> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>${flume.version}</version> + <scope>provided</scope> + </dependency> + + + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.47</version> + </dependency> + + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + <version>3.2.4</version> + </dependency> + + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <version>1.0.2</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + +</project>
\ No newline at end of file diff --git a/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties b/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties new file mode 100644 index 0000000..bf2e470 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties @@ -0,0 +1,16 @@ +#kafka broker下的topic名称 +#kafka.topic=SESSION-TEST-LOG + +#数据中心(UID) +#data.center.id.num=15 + +#zookeeper.servers=192.168.40.207:2181 + +#用于过滤对准用户名 +#check.ip.scope=10,100,192 + +#hbase-zookeeper地址 +hbase.zookeeper.servers=192.168.40.224:2181 + +#hbase表名 +hbase.table.name=subscriber_info
\ No newline at end of file diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java new file mode 100644 index 0000000..3eeafc7 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java @@ -0,0 +1,145 @@ +package com.zdjizhi.flume.interceptor; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Preconditions; +import com.zdjizhi.flume.interceptor.bean.Knowledge; +import com.zdjizhi.flume.interceptor.common.OnOffConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.*; + + +/** + * @author qidaijie + */ +public class FlumeOnOffApp implements Interceptor { + private static Logger logger = Logger.getLogger(FlumeOnOffApp.class); + + @Override + public void initialize() { + + } + + @Override + public Event intercept(Event event) { + String message = null; + try { + message = new String(event.getBody(), "utf-8"); + } catch (UnsupportedEncodingException e) { + message = new String(event.getBody()); + } + try { + message = parsingMessage(message); + if (StringUtils.isNotBlank(message)) { + event.setBody(message.getBytes()); + return event; + } else { + return null; + } + } catch (Exception e) { + logger.error("FlumeOnOffApp intercept(Event event) method is error===>{" + e + "}<==="); + e.printStackTrace(); + } + return null; + } + + @Override + public List<Event> intercept(List<Event> list) { + List resultList = new ArrayList(); + for (Event event : list) { + Event r = intercept(event); + if (r != null) { + resultList.add(r); + } + } + return resultList; + } + + @Override + public void close() { + logger.warn("FlumeOnOffApp is closed."); + } + + /** + * 解析日志,并补全 + * 补domain,补subscriber_id + * + * @param message Security原始日志 + * @return 补全后的日志 + * <p> + */ + private String parsingMessage(String message) { + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + //数据需包含 radius_packet_type and radius_acct_status_type 字段 + if (jsonObject.containsKey(OnOffConfig.RADIUS_PACKET_TYPE) && jsonObject.containsKey(OnOffConfig.RADIUS_ACCT_STATUS_TYPE)) { + int packetType = jsonObject.getInteger(OnOffConfig.RADIUS_PACKET_TYPE); + int statusType = jsonObject.getInteger(OnOffConfig.RADIUS_ACCT_STATUS_TYPE); + //条件radius_packet_type = 4 and radius_acct_status_type = 1 or 2 +// boolean existed = OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType); + if (OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType)) { + + Knowledge knowledge = new Knowledge(); + knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip")); + knowledge.setAccount(jsonObject.getString("radius_account")); + knowledge.setAcct_status_type(statusType); + + /* + *如果存在时间戳则选择此时间戳没有获取当前时间 + */ + if (jsonObject.containsKey(OnOffConfig.RADIUS_EVENT_TIMESTAMP)) { + knowledge.setEvent_timestamp(jsonObject.getInteger("radius_event_timestamp")); + } else { + knowledge.setEvent_timestamp((System.currentTimeMillis() / 1000)); + } + + /* + * 标识同一个连接: + * 1.数据若存在acct_multi_session_id属性,取该属性 + * 2. 不存在取 acct_session_id + */ + if (jsonObject.containsKey(OnOffConfig.RADIUS_MULTI_SESSION_ID)) { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id")); + } else { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id")); + } + + /* + *用户的在线时长,以秒为单位,下线用户无此属性,默认为0 + */ + if (jsonObject.containsKey(OnOffConfig.RADIUS_SESSION_TIME)) { + knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time")); + } else { + knowledge.setAcct_session_time(0); + } + + return JSONObject.toJSONString(knowledge); + } + } + } + return null; + } + + public static class FlumeDynamicAppBuilder implements Interceptor.Builder { + + @Override + public Interceptor build() { + return new FlumeOnOffApp(); + } + + @Override + public void configure(Context context) { + + } + + } + +} + diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java new file mode 100644 index 0000000..9d054ed --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java @@ -0,0 +1,61 @@ +package com.zdjizhi.flume.interceptor.bean; + +/** + * @author qidaijie + */ +public class Knowledge { + private String framed_ip; + private String account; + private String acct_session_id; + private int acct_status_type; + private int acct_session_time; + private long event_timestamp; + + public String getFramed_ip() { + return framed_ip; + } + + public void setFramed_ip(String framed_ip) { + this.framed_ip = framed_ip; + } + + public String getAccount() { + return account; + } + + public void setAccount(String account) { + this.account = account; + } + + public int getAcct_status_type() { + return acct_status_type; + } + + public void setAcct_status_type(int acct_status_type) { + this.acct_status_type = acct_status_type; + } + + public long getEvent_timestamp() { + return event_timestamp; + } + + public void setEvent_timestamp(long event_timestamp) { + this.event_timestamp = event_timestamp; + } + + public String getAcct_session_id() { + return acct_session_id; + } + + public void setAcct_session_id(String acct_session_id) { + this.acct_session_id = acct_session_id; + } + + public int getAcct_session_time() { + return acct_session_time; + } + + public void setAcct_session_time(int acct_session_time) { + this.acct_session_time = acct_session_time; + } +} diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java new file mode 100644 index 0000000..7407290 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java @@ -0,0 +1,50 @@ +package com.zdjizhi.flume.interceptor.common; + + +/** + * @author Administrator + */ +public class OnOffConfig { + /** + * 4- Accounting-Request(账户授权) + */ + public static final int ACCOUNTING_REQUEST = 4; + /** + * 1、开始计费 + */ + public static final int START_BILLING = 1; + /** + * 2、停止计费 + */ + public static final int STOP_BILLING = 2; + + /** + * 计费请求报文类型 + */ + public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type"; + /** + * 报文类型 + */ + public static final String RADIUS_PACKET_TYPE = "radius_packet_type"; + + /** + * 发送计费请求报文时间戳 + */ + public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp"; + + /** + * 一个用户多个计费ID关联属性 + */ + public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id"; + + /** + * 用户的在线时长,以秒为单位 + */ + public static final String RADIUS_SESSION_TIME = "radius_acct_session_time"; + + /** + * flume使用配置 + */ + public static final String HBASE_ZOOKEEPER_SERVERS = OnOffConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = OnOffConfigurations.getStringProperty(0, "hbase.table.name"); +}
\ No newline at end of file diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java new file mode 100644 index 0000000..caf8572 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java @@ -0,0 +1,55 @@ +package com.zdjizhi.flume.interceptor.common; + +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class OnOffConfigurations { + + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return "true".equals(propService.getProperty(key).toLowerCase().trim()); + } else { + return null; + } + } + + static { + try { + propService.load(OnOffConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + } catch (Exception e) { + propService = null; + } + } +} diff --git a/FlumeSubscriberInterceptor/pom.xml b/FlumeSubscriberInterceptor/pom.xml new file mode 100644 index 0000000..a359d60 --- /dev/null +++ b/FlumeSubscriberInterceptor/pom.xml @@ -0,0 +1,163 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>dynamic_complement</artifactId> + <groupId>com.zdjizhi</groupId> + <version>1.0</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>FlumeSubscriberInterceptor</artifactId> + + <repositories> + + <repository> + <id>ebi</id> + <name>www.ebi.ac.uk</name> + <url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url> + </repository> + </repositories> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flume.version>1.9.0</flume.version> + <hbase.version>2.2.1</hbase.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.1</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.zdjizhi.flume.interceptor.FlumeSubscriberInterceptor</mainClass> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.handlers</resource> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.schemas</resource> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <executions> + <execution> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>java</executable> + <includeProjectDependencies>true</includeProjectDependencies> + <includePluginDependencies>false</includePluginDependencies> + <classpathScope>compile</classpathScope> + <mainClass>com.zdjizhi.flume.interceptor.FlumeSubscriberApp</mainClass> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>${flume.version}</version> + <scope>provided</scope> + </dependency> + + + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.47</version> + </dependency> + + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + <version>3.2.4</version> + </dependency> + + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + +</project>
\ No newline at end of file diff --git a/FlumeSubscriberInterceptor/properties/service_flow_config.properties b/FlumeSubscriberInterceptor/properties/service_flow_config.properties new file mode 100644 index 0000000..bf2e470 --- /dev/null +++ b/FlumeSubscriberInterceptor/properties/service_flow_config.properties @@ -0,0 +1,16 @@ +#kafka broker下的topic名称 +#kafka.topic=SESSION-TEST-LOG + +#数据中心(UID) +#data.center.id.num=15 + +#zookeeper.servers=192.168.40.207:2181 + +#用于过滤对准用户名 +#check.ip.scope=10,100,192 + +#hbase-zookeeper地址 +hbase.zookeeper.servers=192.168.40.224:2181 + +#hbase表名 +hbase.table.name=subscriber_info
\ No newline at end of file diff --git a/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java new file mode 100644 index 0000000..9800a86 --- /dev/null +++ b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeSubscriberApp.java @@ -0,0 +1,261 @@ +package com.zdjizhi.flume.interceptor; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Preconditions; +import com.zdjizhi.flume.interceptor.common.SubscriberConfig; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.*; + + +/** + * @author qidaijie + */ +public class FlumeSubscriberApp implements Interceptor { + private static Logger logger = Logger.getLogger(FlumeSubscriberApp.class); + + private static Map<String, String> subIdMap; + private List<Put> putList; + private static Connection connection; + private String hbaseZookeeperIp; + private String hbaseTableName; + private int updateHBaseTime; + + public FlumeSubscriberApp(String hbaseZookeeperIp, String hbaseTableName, int updateHBaseTime) { + this.hbaseZookeeperIp = hbaseZookeeperIp; + this.hbaseTableName = hbaseTableName; + this.updateHBaseTime = updateHBaseTime; + } + + + @Override + public void initialize() { + subIdMap = new HashMap<>(256); + putList = new ArrayList<>(); + + // 管理HBase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", hbaseZookeeperIp); + configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + logger.error("用户名信息写入HBase程序,连接HBase异常"); + e.printStackTrace(); + } + + getAll(hbaseTableName); + + new Timer().schedule(new TimerTask() { + @Override + public void run() { + try { + insertData(putList, hbaseTableName); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 0, updateHBaseTime * 1000); + } + + + @Override + public Event intercept(Event event) { + String message = null; + try { + message = new String(event.getBody(), "utf-8"); + } catch (UnsupportedEncodingException e) { + message = new String(event.getBody()); + } + try { + if (StringUtils.isNotBlank(message)) { + message = dealCommonMessage(message); + event.setBody(message.getBytes()); + return event; + } + } catch (Exception e) { + logger.error("FlumeSubscriberApp intercept(Event event) method is error===>{" + e + "}<==="); + e.printStackTrace(); + } + return event; + } + + @Override + public List<Event> intercept(List<Event> list) { + List resultList = new ArrayList(); + for (Event event : list) { + Event r = intercept(event); + if (r != null) { + resultList.add(r); + } + } + return resultList; + } + + @Override + public void close() { + logger.warn("FlumeSubscriberApp is closed."); + } + + /** + * 解析日志,并补全 + * 补domain,补subscriber_id + * + * @param message Security原始日志 + * @return 补全后的日志 + * <p> + */ + private String dealCommonMessage(String message) { + JSONObject jsonObject = JSONObject.parseObject(message); + if (jsonObject.containsKey(SubscriberConfig.PACKET_TYPE) && jsonObject.containsKey(SubscriberConfig.STATUS_TYPE)) { + if (SubscriberConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(SubscriberConfig.PACKET_TYPE) + && SubscriberConfig.START_BILLING == jsonObject.getInteger(SubscriberConfig.STATUS_TYPE)) { + String framedIp = jsonObject.getString("radius_framed_ip"); + String account = jsonObject.getString("radius_account"); + dataValidation(framedIp, account, putList); + } + if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) { + insertData(putList, hbaseTableName); + } + + } + return message; + } + + + /** + * 获取所有的 key value + */ + private static void getAll(String tableNmae) { + try { + Table table = connection.getTable(TableName.valueOf("sub:" + tableNmae)); + Scan scan = new Scan(); + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + scanner.close(); + } catch (IOException e) { + logger.error("获取HBase所有row key出现异常"); + e.printStackTrace(); + } + } + + /** + * 写入数据到HBase + * + * @param putList puts list + */ + private static void insertData(List<Put> putList, String tableName) { + Table table = null; + try { + table = connection.getTable(TableName.valueOf("sub:" + tableName)); + table.put(putList); + putList.clear(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (table != null) { + table.close(); + } + } catch (IOException e) { + logger.error("更新数据写入HBase失败"); + e.printStackTrace(); + } + } + + } + + /** + * 验证数据并与内存中的对比 + * + * @param ip framed_ip + * @param account account + */ + private static void dataValidation(String ip, String account, List<Put> putList) { + if (subIdMap.containsKey(ip)) { + if (!subIdMap.get(ip).equals(account)) { + Put put = new Put(ip.getBytes()); + put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } + } else { + Put put = new Put(ip.getBytes()); + put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } + } + + + public static class FlumeDynamicAppBuilder implements Interceptor.Builder { + private String hbaseZookeeperIp; + private String hbaseTableName; + private int updateHBaseTime; + + + @Override + public Interceptor build() { + return new FlumeSubscriberApp(hbaseZookeeperIp, hbaseTableName, updateHBaseTime); + } + + @Override + public void configure(Context context) { + try { + this.hbaseZookeeperIp = context.getString("hbaseZookeeperIp", ""); + Preconditions.checkNotNull("".equals(hbaseZookeeperIp), "hbaseZookeeperIp must be set!!"); + logger.info("FlumeSubscriberApp Read hbaseZookeeperIp from configuration : " + hbaseZookeeperIp); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("FlumeSubscriberApp hbaseZookeeperIp invalid", e); + } catch (Exception e) { + logger.error("FlumeSubscriberApp Get hbaseZookeeperIp is error : " + e); + } + + try { + this.hbaseTableName = context.getString("hbaseTableName", ""); + Preconditions.checkNotNull("".equals(hbaseTableName), "hbaseTableName must be set!!"); + logger.info("FlumeSubscriberApp Read hbaseTableName from configuration : " + hbaseTableName); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("FlumeSubscriberApp hbaseTableName invalid", e); + } catch (Exception e) { + logger.error("FlumeSubscriberApp Get hbaseTableName is error : " + e); + } + + try { + this.updateHBaseTime = context.getInteger("updateHBaseTime", 30); + Preconditions.checkNotNull("".equals(updateHBaseTime), "updateHBaseTime must be set!!"); + logger.info("FlumeSubscriberApp Read updateHBaseTime from configuration : " + updateHBaseTime); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("FlumeSubscriberApp updateHBaseTime invalid", e); + } catch (Exception e) { + logger.error("FlumeSubscriberApp Get updateHBaseTime is error : " + e); + } + + } + + } + +} + diff --git a/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java new file mode 100644 index 0000000..7bd14d4 --- /dev/null +++ b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfig.java @@ -0,0 +1,34 @@ +package com.zdjizhi.flume.interceptor.common; + + +/** + * @author Administrator + */ +public class SubscriberConfig { + /** + * 最多存在多少数据即写入hbase + */ + public static final int LIST_SIZE_MAX = 5000; + /** + * 4- Accounting-Request(账户授权) + */ + public static final int ACCOUNTING_REQUEST = 4; + /** + * radius_packet_type + */ + public static final String PACKET_TYPE = "radius_packet_type"; + /** + * 1、开始计费 + */ + public static final int START_BILLING = 1; + /** + * radius_acct_status_type + */ + public static final String STATUS_TYPE = "radius_acct_status_type"; + + /** + * flume使用配置 + */ + public static final String HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = SubscriberConfigurations.getStringProperty(0, "hbase.table.name"); +}
\ No newline at end of file diff --git a/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java new file mode 100644 index 0000000..cbae69d --- /dev/null +++ b/FlumeSubscriberInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/SubscriberConfigurations.java @@ -0,0 +1,55 @@ +package com.zdjizhi.flume.interceptor.common; + +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class SubscriberConfigurations { + + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return "true".equals(propService.getProperty(key).toLowerCase().trim()); + } else { + return null; + } + } + + static { + try { + propService.load(SubscriberConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + } catch (Exception e) { + propService = null; + } + } +} @@ -10,6 +10,8 @@ <version>1.0</version> <modules> <module>FlumeDynamicInterceptor</module> + <module>FlumeSubscriberInterceptor</module> + <module>FlumeRadiusOnOffInterceptor</module> </modules> |
