diff options
| author | qidaijie <[email protected]> | 2020-06-12 19:48:41 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2020-06-12 19:48:41 +0800 |
| commit | 3ad9d0cbd4d3605ba0cff67930c31eb9f5317e49 (patch) | |
| tree | b413dc6c992f3fc447a275752c674b92af99e3bd /FlumeRadiusOnOffInterceptor | |
| parent | f8dfa376c3852ae2b6f997ed2158ef302bfc9571 (diff) | |
增加 用户名写入Hbase程序和上下线日志程序
Diffstat (limited to 'FlumeRadiusOnOffInterceptor')
6 files changed, 472 insertions, 0 deletions
diff --git a/FlumeRadiusOnOffInterceptor/pom.xml b/FlumeRadiusOnOffInterceptor/pom.xml new file mode 100644 index 0000000..4517ae3 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/pom.xml @@ -0,0 +1,145 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>dynamic_complement</artifactId> + <groupId>com.zdjizhi</groupId> + <version>1.0</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>FlumeRadiusOnOffInterceptor</artifactId> + + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + </repositories> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flume.version>1.9.0</flume.version> + <hbase.version>2.2.1</hbase.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.1</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.zdjizhi.flume.interceptor.FlumeOnOffApp</mainClass> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.handlers</resource> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>META-INF/spring.schemas</resource> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <executions> + <execution> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>java</executable> + <includeProjectDependencies>true</includeProjectDependencies> + <includePluginDependencies>false</includePluginDependencies> + <classpathScope>compile</classpathScope> + <mainClass>com.zdjizhi.flume.interceptor.FlumeOnOffApp</mainClass> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>${flume.version}</version> + <scope>provided</scope> + </dependency> + + + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.47</version> + </dependency> + + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + <version>3.2.4</version> + </dependency> + + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <version>1.0.2</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + +</project>
\ No newline at end of file diff --git a/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties b/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties new file mode 100644 index 0000000..bf2e470 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/properties/service_flow_config.properties @@ -0,0 +1,16 @@ +#kafka broker下的topic名称 +#kafka.topic=SESSION-TEST-LOG + +#数据中心(UID) +#data.center.id.num=15 + +#zookeeper.servers=192.168.40.207:2181 + +#用于过滤对准用户名 +#check.ip.scope=10,100,192 + +#hbase-zookeeper地址 +hbase.zookeeper.servers=192.168.40.224:2181 + +#hbase表名 +hbase.table.name=subscriber_info
\ No newline at end of file diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java new file mode 100644 index 0000000..3eeafc7 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java @@ -0,0 +1,145 @@ +package com.zdjizhi.flume.interceptor; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Preconditions; +import com.zdjizhi.flume.interceptor.bean.Knowledge; +import com.zdjizhi.flume.interceptor.common.OnOffConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.*; + + +/** + * @author qidaijie + */ +public class FlumeOnOffApp implements Interceptor { + private static Logger logger = Logger.getLogger(FlumeOnOffApp.class); + + @Override + public void initialize() { + + } + + @Override + public Event intercept(Event event) { + String message = null; + try { + message = new String(event.getBody(), "utf-8"); + } catch (UnsupportedEncodingException e) { + message = new String(event.getBody()); + } + try { + message = parsingMessage(message); + if (StringUtils.isNotBlank(message)) { + event.setBody(message.getBytes()); + return event; + } else { + return null; + } + } catch (Exception e) { + logger.error("FlumeOnOffApp intercept(Event event) method is error===>{" + e + "}<==="); + e.printStackTrace(); + } + return null; + } + + @Override + public List<Event> intercept(List<Event> list) { + List resultList = new ArrayList(); + for (Event event : list) { + Event r = intercept(event); + if (r != null) { + resultList.add(r); + } + } + return resultList; + } + + @Override + public void close() { + logger.warn("FlumeOnOffApp is closed."); + } + + /** + * 解析日志,并补全 + * 补domain,补subscriber_id + * + * @param message Security原始日志 + * @return 补全后的日志 + * <p> + */ + private String parsingMessage(String message) { + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + //数据需包含 radius_packet_type and radius_acct_status_type 字段 + if (jsonObject.containsKey(OnOffConfig.RADIUS_PACKET_TYPE) && jsonObject.containsKey(OnOffConfig.RADIUS_ACCT_STATUS_TYPE)) { + int packetType = jsonObject.getInteger(OnOffConfig.RADIUS_PACKET_TYPE); + int statusType = jsonObject.getInteger(OnOffConfig.RADIUS_ACCT_STATUS_TYPE); + //条件radius_packet_type = 4 and radius_acct_status_type = 1 or 2 +// boolean existed = OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType); + if (OnOffConfig.ACCOUNTING_REQUEST == packetType && (OnOffConfig.START_BILLING == statusType || OnOffConfig.STOP_BILLING == statusType)) { + + Knowledge knowledge = new Knowledge(); + knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip")); + knowledge.setAccount(jsonObject.getString("radius_account")); + knowledge.setAcct_status_type(statusType); + + /* + *如果存在时间戳则选择此时间戳没有获取当前时间 + */ + if (jsonObject.containsKey(OnOffConfig.RADIUS_EVENT_TIMESTAMP)) { + knowledge.setEvent_timestamp(jsonObject.getInteger("radius_event_timestamp")); + } else { + knowledge.setEvent_timestamp((System.currentTimeMillis() / 1000)); + } + + /* + * 标识同一个连接: + * 1.数据若存在acct_multi_session_id属性,取该属性 + * 2. 不存在取 acct_session_id + */ + if (jsonObject.containsKey(OnOffConfig.RADIUS_MULTI_SESSION_ID)) { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id")); + } else { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id")); + } + + /* + *用户的在线时长,以秒为单位,下线用户无此属性,默认为0 + */ + if (jsonObject.containsKey(OnOffConfig.RADIUS_SESSION_TIME)) { + knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time")); + } else { + knowledge.setAcct_session_time(0); + } + + return JSONObject.toJSONString(knowledge); + } + } + } + return null; + } + + public static class FlumeDynamicAppBuilder implements Interceptor.Builder { + + @Override + public Interceptor build() { + return new FlumeOnOffApp(); + } + + @Override + public void configure(Context context) { + + } + + } + +} + diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java new file mode 100644 index 0000000..9d054ed --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/bean/Knowledge.java @@ -0,0 +1,61 @@ +package com.zdjizhi.flume.interceptor.bean; + +/** + * @author qidaijie + */ +public class Knowledge { + private String framed_ip; + private String account; + private String acct_session_id; + private int acct_status_type; + private int acct_session_time; + private long event_timestamp; + + public String getFramed_ip() { + return framed_ip; + } + + public void setFramed_ip(String framed_ip) { + this.framed_ip = framed_ip; + } + + public String getAccount() { + return account; + } + + public void setAccount(String account) { + this.account = account; + } + + public int getAcct_status_type() { + return acct_status_type; + } + + public void setAcct_status_type(int acct_status_type) { + this.acct_status_type = acct_status_type; + } + + public long getEvent_timestamp() { + return event_timestamp; + } + + public void setEvent_timestamp(long event_timestamp) { + this.event_timestamp = event_timestamp; + } + + public String getAcct_session_id() { + return acct_session_id; + } + + public void setAcct_session_id(String acct_session_id) { + this.acct_session_id = acct_session_id; + } + + public int getAcct_session_time() { + return acct_session_time; + } + + public void setAcct_session_time(int acct_session_time) { + this.acct_session_time = acct_session_time; + } +} diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java new file mode 100644 index 0000000..7407290 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java @@ -0,0 +1,50 @@ +package com.zdjizhi.flume.interceptor.common; + + +/** + * @author Administrator + */ +public class OnOffConfig { + /** + * 4- Accounting-Request(账户授权) + */ + public static final int ACCOUNTING_REQUEST = 4; + /** + * 1、开始计费 + */ + public static final int START_BILLING = 1; + /** + * 2、停止计费 + */ + public static final int STOP_BILLING = 2; + + /** + * 计费请求报文类型 + */ + public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type"; + /** + * 报文类型 + */ + public static final String RADIUS_PACKET_TYPE = "radius_packet_type"; + + /** + * 发送计费请求报文时间戳 + */ + public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp"; + + /** + * 一个用户多个计费ID关联属性 + */ + public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id"; + + /** + * 用户的在线时长,以秒为单位 + */ + public static final String RADIUS_SESSION_TIME = "radius_acct_session_time"; + + /** + * flume使用配置 + */ + public static final String HBASE_ZOOKEEPER_SERVERS = OnOffConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = OnOffConfigurations.getStringProperty(0, "hbase.table.name"); +}
\ No newline at end of file diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java new file mode 100644 index 0000000..caf8572 --- /dev/null +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfigurations.java @@ -0,0 +1,55 @@ +package com.zdjizhi.flume.interceptor.common; + +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class OnOffConfigurations { + + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return "true".equals(propService.getProperty(key).toLowerCase().trim()); + } else { + return null; + } + } + + static { + try { + propService.load(OnOffConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + } catch (Exception e) { + propService = null; + } + } +} |
