diff options
| author | qidaijie <[email protected]> | 2021-04-23 17:25:47 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-04-23 17:25:47 +0800 |
| commit | c8f8d3c9cf5ac2f9836307ead3da3b8309c1e57c (patch) | |
| tree | a8dbb4eb4b2cd9a5938b8b8f57aafe845abdbb1d | |
| parent | 24ff96be4cab715379f457034999a879e6eb9e50 (diff) | |
修改版本到最21.04.23,增加app_match函数
12 files changed, 514 insertions, 244 deletions
diff --git a/FlumeDynamicInterceptor/dependency-reduced-pom.xml b/FlumeDynamicInterceptor/dependency-reduced-pom.xml index ff88a66..1410d46 100644 --- a/FlumeDynamicInterceptor/dependency-reduced-pom.xml +++ b/FlumeDynamicInterceptor/dependency-reduced-pom.xml @@ -3,7 +3,7 @@ <parent> <artifactId>dynamic_complement</artifactId> <groupId>com.zdjizhi</groupId> - <version>1.0</version> + <version>v3.21.01.18</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>FlumeDynamicInterceptor</artifactId> @@ -87,6 +87,15 @@ <name>www.ebi.ac.uk</name> <url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url> </repository> + <repository> + <releases /> + <snapshots> + <updatePolicy>always</updatePolicy> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + <id>maven-ali</id> + <url>http://maven.aliyun.com/nexus/content/groups/public/</url> + </repository> </repositories> <dependencies> <dependency> @@ -125,6 +134,18 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>hamcrest-core</artifactId> + <groupId>org.hamcrest</groupId> + </exclusion> + </exclusions> + </dependency> </dependencies> <properties> <hbase.version>2.2.1</hbase.version> diff --git a/FlumeDynamicInterceptor/pom.xml b/FlumeDynamicInterceptor/pom.xml index 5d6bae9..3183510 100644 --- a/FlumeDynamicInterceptor/pom.xml +++ b/FlumeDynamicInterceptor/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>dynamic_complement</artifactId> <groupId>com.zdjizhi</groupId> - <version>1.0</version> + <version>v3.21.04.23</version> </parent> <modelVersion>4.0.0</modelVersion> @@ -18,20 +18,12 @@ <url>http://192.168.40.125:8099/content/groups/public</url> </repository> - <repository> - <id>ebi</id> - <name>www.ebi.ac.uk</name> - <url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url> - </repository> </repositories> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flume.version>1.9.0</flume.version> - <hbase.version>2.2.1</hbase.version> - <!--<hadoop.version>2.8.5</hadoop.version>--> - <!--<hbase.version>1.4.9</hbase.version>--> - <!--<hadoop.version>2.7.1</hadoop.version>--> + <hbase.version>2.2.3</hbase.version> </properties> <build> @@ -41,11 +33,11 @@ <artifactId>maven-shade-plugin</artifactId> <version>2.4.1</version> <configuration> - <createDependencyReducedPom>true</createDependencyReducedPom> + <createDependencyReducedPom>false</createDependencyReducedPom> </configuration> <executions> <execution> - <phase>package</phase> + <phase>install</phase> <goals> <goal>shade</goal> </goals> @@ -70,6 +62,21 @@ </execution> </executions> </plugin> + + <plugin> + <groupId>io.github.zlika</groupId> + <artifactId>reproducible-build-maven-plugin</artifactId> + <version>0.2</version> + <executions> + <execution> + <goals> + <goal>strip-jar</goal> + </goals> + <phase>install</phase> + </execution> + </executions> + </plugin> + <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> @@ -108,13 +115,6 @@ </includes> <filtering>false</filtering> </resource> - <!--<resource>--> - <!--<directory>src/main/java</directory>--> - <!--<includes>--> - <!--<include>log4j.properties</include>--> - <!--</includes>--> - <!--<filtering>false</filtering>--> - <!--</resource>--> </resources> </build> @@ -129,7 +129,6 @@ <dependency> <groupId>com.zdjizhi</groupId> <artifactId>galaxy</artifactId> -<!-- <version>1.0.2</version>--> <version>1.0.3</version> <exclusions> <exclusion> @@ -150,30 +149,16 @@ <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> - <version>1.2.47</version> + <version>1.2.70</version> </dependency> - <!--<dependency>--> - <!--<groupId>redis.clients</groupId>--> - <!--<artifactId>jedis</artifactId>--> - <!--<version>2.8.1</version>--> - <!--</dependency>--> - <!-- <dependency>--> - <!-- <groupId>org.apache.zookeeper</groupId>--> - <!-- <artifactId>zookeeper</artifactId>--> - <!-- <version>3.4.9</version>--> - <!-- <exclusions>--> - <!-- <exclusion>--> - <!-- <artifactId>slf4j-log4j12</artifactId>--> - <!-- <groupId>org.slf4j</groupId>--> - <!-- </exclusion>--> - <!-- <exclusion>--> - <!-- <artifactId>log4j-over-slf4j</artifactId>--> - <!-- <groupId>org.slf4j</groupId>--> - <!-- </exclusion>--> - <!-- </exclusions>--> - <!-- </dependency>--> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> <dependency> <groupId>cglib</groupId> @@ -199,94 +184,34 @@ </dependency> <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.2</version> </dependency> - <!--<dependency>--> - <!--<groupId>org.apache.hadoop</groupId>--> - <!--<artifactId>hadoop-common</artifactId>--> - <!--<version>${hadoop.version}</version>--> - <!--<exclusions>--> - <!--<exclusion>--> - <!--<artifactId>slf4j-log4j12</artifactId>--> - <!--<groupId>org.slf4j</groupId>--> - <!--</exclusion>--> - <!--<exclusion>--> - <!--<artifactId>log4j-over-slf4j</artifactId>--> - <!--<groupId>org.slf4j</groupId>--> - <!--</exclusion>--> - <!--</exclusions>--> - <!--</dependency>--> - - <!--<dependency>--> - <!--<groupId>org.apache.hadoop</groupId>--> - <!--<artifactId>hadoop-client</artifactId>--> - <!--<version>${hadoop.version}</version>--> - <!--<exclusions>--> - <!--<exclusion>--> - <!--<artifactId>slf4j-log4j12</artifactId>--> - <!--<groupId>org.slf4j</groupId>--> - <!--</exclusion>--> - <!--<exclusion>--> - <!--<artifactId>log4j-over-slf4j</artifactId>--> - <!--<groupId>org.slf4j</groupId>--> - <!--</exclusion>--> - <!--</exclusions>--> - <!--</dependency>--> - - <!--<dependency>--> - <!--<groupId>org.apache.hadoop</groupId>--> - <!--<artifactId>hadoop-hdfs</artifactId>--> - <!--<version>${hadoop.version}</version>--> - <!--<exclusions>--> - <!--<exclusion>--> - <!--<artifactId>slf4j-log4j12</artifactId>--> - <!--<groupId>org.slf4j</groupId>--> - <!--</exclusion>--> - <!--<exclusion>--> - <!--<artifactId>log4j-over-slf4j</artifactId>--> - <!--<groupId>org.slf4j</groupId>--> - <!--</exclusion>--> - <!--</exclusions>--> - <!--</dependency>--> - -<!-- <dependency>--> -<!-- <groupId>org.junit.jupiter</groupId>--> -<!-- <artifactId>junit-jupiter-api</artifactId>--> -<!-- <version>5.3.2</version>--> -<!-- <scope>compile</scope>--> -<!-- </dependency>--> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>4.4.1</version> + </dependency> - <!--<dependency>--> - <!--<groupId>com.google.guava</groupId>--> - <!--<artifactId>guava</artifactId>--> - <!--<!–<version>18.0</version>–>--> - <!--<version>11.0.2</version>--> - <!--</dependency>--> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + <version>2.4.0</version> + </dependency> - <!--<dependency>--> - <!--<groupId>org.apache.httpcomponents</groupId>--> - <!--<artifactId>httpclient</artifactId>--> - <!--<version>4.5.2</version>--> - <!--</dependency>--> + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient_pushgateway</artifactId> + <version>0.9.0</version> + </dependency> - <!--<dependency>--> - <!--<groupId>org.apache.httpcomponents</groupId>--> - <!--<artifactId>httpcore</artifactId>--> - <!--<version>4.4.1</version>--> - <!--</dependency>--> + <dependency> + <groupId>cn.hutool</groupId> + <artifactId>hutool-all</artifactId> + <version>5.5.2</version> + </dependency> </dependencies> diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java index 55423ec..e379612 100644 --- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java +++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeDynamicApp.java @@ -1,8 +1,13 @@ package com.zdjizhi.flume.interceptor; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Preconditions; +import com.jayway.jsonpath.InvalidPathException; +import com.jayway.jsonpath.JsonPath; import com.zdjizhi.flume.interceptor.common.FlowWriteConfig; +import com.zdjizhi.flume.interceptor.utils.app.AppUtils; import com.zdjizhi.flume.interceptor.utils.hbase.HBaseUtils; import com.zdjizhi.flume.interceptor.utils.json.JsonParseUtil; import com.zdjizhi.utils.Encodes; @@ -20,17 +25,21 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * @author qidaijie */ public class FlumeDynamicApp implements Interceptor { - private static Logger logger = Logger.getLogger(FlumeDynamicApp.class); + private static final Log logger = LogFactory.get(); + private static final Pattern PATTERN = Pattern.compile("[0-9]*"); private static IpLookup ipLookup; private static FormatUtils formatUtils; private String schemaHttpUrl; + private String appIdHttpUrl; private String uidZookeeperIp; private long dataCenterIdNum; private String ipDatPath; @@ -50,17 +59,6 @@ public class FlumeDynamicApp implements Interceptor { formatUtils = new FormatUtils.Builder(false).build(); //载入定位库 ipLookup = new IpLookup.Builder(false) - /** - * v1.0.2-com.zdjizhi.galaxy - */ -// .loadDataFileV4(ipDatPath + "Kazakhstan.mmdb") -// .loadDataFileV6(ipDatPath + "Kazakhstan.mmdb") -// .loadAsnDataFileV4(ipDatPath + "asn_v4.mmdb") -// .loadAsnDataFileV6(ipDatPath + "asn_v6.mmdb") - - /** - * v1.0.3-com.zdjizhi.galaxy - */ .loadDataFileV4(ipDatPath + "ip_v4.mmdb") .loadDataFileV6(ipDatPath + "ip_v6.mmdb") .loadDataFilePrivateV4(ipDatPath + "ip_private_v4.mmdb") @@ -70,8 +68,9 @@ public class FlumeDynamicApp implements Interceptor { .build(); } - public FlumeDynamicApp(String schemaHttpUrl, String zookeeperIp, long dataCenterIdNum, String ipDatPath, String hbaseZookeeperIp, String hbaseTableName) { + public FlumeDynamicApp(String schemaHttpUrl, String appIdHttpUrl, String zookeeperIp, long dataCenterIdNum, String ipDatPath, String hbaseZookeeperIp, String hbaseTableName) { this.schemaHttpUrl = schemaHttpUrl; + this.appIdHttpUrl = appIdHttpUrl; this.uidZookeeperIp = zookeeperIp; this.dataCenterIdNum = dataCenterIdNum; this.ipDatPath = ipDatPath; @@ -133,18 +132,29 @@ public class FlumeDynamicApp implements Interceptor { Object name = JsonParseUtil.getValue(object, strings[0]); String appendToKeyName = strings[1]; String functionName = strings[2]; - Object param = null; - if (strings[3] != null) { - param = JsonParseUtil.getValue(object, strings[3]); - } + Object appendTo = JsonParseUtil.getValue(object, appendToKeyName); + + String param = strings[3]; switch (functionName) { case "current_timestamp": - JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime()); + if ((long) appendTo == 0L) { + JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime()); + } break; case "snowflake_id": JsonParseUtil.setValue(object, appendToKeyName, formatUtils.getSnowflakeId(uidZookeeperIp, dataCenterIdNum)); break; + case "set_value": + if (name != null && param != null) { + JsonParseUtil.setValue(object, appendToKeyName, setValue(param)); + } + break; + case "flattenSpec": + if (name != null && StringUtil.isNotBlank(param)) { + JsonParseUtil.setValue(object, appendToKeyName, flattenSpec(name.toString(), isJsonValue(object, param))); + } + break; case "geo_ip_detail": if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) { JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(name.toString())); @@ -155,9 +165,14 @@ public class FlumeDynamicApp implements Interceptor { JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(name.toString())); } break; + case "if": + if (param != null) { + JsonParseUtil.setValue(object, appendToKeyName, condition(object, param)); + } + break; case "get_value": if (name != null) { - JsonParseUtil.setValue(object, appendToKeyName, name.toString()); + JsonParseUtil.setValue(object, appendToKeyName, name); } break; case "radius_match": @@ -165,6 +180,11 @@ public class FlumeDynamicApp implements Interceptor { JsonParseUtil.setValue(object, appendToKeyName, radiusMatch(name.toString(), hbaseZookeeperIp, hbaseTableName)); } break; + case "app_match": + if ((int) name != 0 && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, appMatch(appIdHttpUrl, Integer.parseInt(name.toString()))); + } + break; case "geo_ip_country": if (name != null && JsonParseUtil.getValue(object, appendToKeyName) == null) { JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(name.toString())); @@ -172,19 +192,12 @@ public class FlumeDynamicApp implements Interceptor { break; case "decode_of_base64": if (name != null) { - if (param != null) { - JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), param.toString())); - } else { - JsonParseUtil.setValue(object, appendToKeyName, Encodes.decodeBase64String(name.toString(), FlowWriteConfig.MAIL_DEFAULT_CHARSET)); - } + JsonParseUtil.setValue(object, appendToKeyName, decodeBase64(name.toString(), isJsonValue(object, param))); } break; case "sub_domain": - if (name != null) { - Object appendTo = JsonParseUtil.getValue(object, appendToKeyName); - if (appendTo == null || StringUtil.isBlank(appendTo.toString())) { - JsonParseUtil.setValue(object, appendToKeyName, replaceGetTopDomain(FormatUtils.getTopPrivateDomain(name.toString()))); - } + if (appendTo == null && name != null) { + JsonParseUtil.setValue(object, appendToKeyName, replaceGetTopDomain(name.toString())); } break; default: @@ -194,10 +207,9 @@ public class FlumeDynamicApp implements Interceptor { return JSONObject.toJSONString(object); } catch (Exception e) { - logger.error("FlumeDynamicApp dealCommonMessage is error===>{" + e + "}<==="); - e.printStackTrace(); -// return ""; - return message;//返回原数据 + logger.error("FlumeDynamicApp dealCommonMessage is error===>", e); + //返回原数据 + return message; } } @@ -208,15 +220,20 @@ public class FlumeDynamicApp implements Interceptor { * @param url * @return 顶级域名 */ - private String replaceGetTopDomain(String url) { - return url.replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", ""); + private static String replaceGetTopDomain(String url) { + try { + return FormatUtils.getTopPrivateDomain(url).replaceAll("InternetDomainName\\{name=", "").replaceAll("\\}", ""); + } catch (StringIndexOutOfBoundsException outException) { + logger.error("解析顶级域名异常,异常域名:" + url, outException); + return ""; + } } /** * 生成当前时间戳的操作 */ - private int getCurrentTime() { - return (int) (System.currentTimeMillis() / 1000); + private static long getCurrentTime() { + return (System.currentTimeMillis() / 1000); } /** @@ -225,7 +242,7 @@ public class FlumeDynamicApp implements Interceptor { * @param ip * @return */ - private String getGeoIpDetail(String ip) { + private static String getGeoIpDetail(String ip) { return ipLookup.cityLookupDetail(ip); } @@ -235,7 +252,7 @@ public class FlumeDynamicApp implements Interceptor { * @param ip * @return */ - private String getGeoAsn(String ip) { + private static String getGeoAsn(String ip) { // return ipLookup.asnLookup(ip, true);//v1.0.2-com.zdjizhi.galaxy return ipLookup.asnLookup(ip);//v1.0.3-com.zdjizhi.galaxy } @@ -246,7 +263,7 @@ public class FlumeDynamicApp implements Interceptor { * @param ip * @return */ - private String getGeoIpCountry(String ip) { + private static String getGeoIpCountry(String ip) { return ipLookup.countryLookup(ip); } @@ -260,8 +277,137 @@ public class FlumeDynamicApp implements Interceptor { return HBaseUtils.getAccount(ip, hbaseZookeeper, hbaseTable); } + /** + * appId与缓存中对应关系补全appName + * + * @param appId id + * @return appName + */ + private static String appMatch(String appIdHttpUrl, int appId) { + String appName = AppUtils.getAppName(appIdHttpUrl, appId); + if (StringUtil.isBlank(appName)) { + logger.warn("AppMap get appName is null, ID is :{}", appId); + } + return appName; + } + + /** + * 根据编码解码base64 + * + * @param message base64 + * @param charset 编码 + * @return 解码字符串 + */ + private static String decodeBase64(String message, Object charset) { + String result = ""; + try { + if (StringUtil.isNotBlank(message)) { + if (charset != null) { + result = Encodes.decodeBase64String(message, charset.toString()); + } else { + result = Encodes.decodeBase64String(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET); + } + } + } catch (Exception e) { + logger.error("解析 Base64 异常,异常信息:" + e); + } + return result; + } + + /** + * 根据表达式解析json + * + * @param message json + * @param expr 解析表达式 + * @return 解析结果 + */ + private static String flattenSpec(String message, String expr) { + String flattenResult = ""; + try { + ArrayList<String> read = JsonPath.parse(message).read(expr); + flattenResult = read.get(0); + } catch (ClassCastException | InvalidPathException e) { + logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误", e); + } + return flattenResult; + } + + /** + * 判断是否为日志字段,是则返回对应value,否则返回原始字符串 + * + * @param object 内存实体类 + * @param param 字段名/普通字符串 + * @return JSON.Value or String + */ + private static String isJsonValue(Object object, String param) { + if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) { + Object value = JsonParseUtil.getValue(object, param.substring(2)); + if (value != null) { + return value.toString(); + } else { + return ""; + } + } else { + return param; + } + } + + /** + * IF函数实现,解析日志构建三目运算 + * + * @param object 内存实体类 + * @param ifParam 字段名/普通字符串 + * @return resultA or resultB or "" + */ + private static Object condition(Object object, String ifParam) { + try { + String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); + String direction = isJsonValue(object, norms[0]); + if (StringUtil.isNotBlank(direction)) { + if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { + String resultA = isJsonValue(object, split[1]); + String resultB = isJsonValue(object, split[2]); + String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB; + Matcher isNum = PATTERN.matcher(result); + if (isNum.matches()) { + return Long.parseLong(result); + } else { + return result; + } + } + } + } catch (Exception e) { + logger.error("IF 函数执行异常,异常信息:" + e); + e.printStackTrace(); + } + return null; + } + + /** + * 设置固定值函数 若为数字则转为long返回 + * + * @param param 默认值 + * @return 返回数字或字符串 + */ + static Object setValue(String param) { + try { + Matcher isNum = PATTERN.matcher(param); + if (isNum.matches()) { + return Long.parseLong(param); + } else { + return param; + } + } catch (Exception e) { + logger.error("SetValue 函数异常,异常信息:" + e); + e.printStackTrace(); + } + return null; + } + public static class FlumeDynamicAppBuilder implements Interceptor.Builder { private String schemaHttpUrl; + private String appIdHttpUrl; private String uidZookeeperIp; private long dataCenterIdNum; private String ipDatPath; @@ -272,6 +418,7 @@ public class FlumeDynamicApp implements Interceptor { @Override public Interceptor build() { return new FlumeDynamicApp(this.schemaHttpUrl, + this.appIdHttpUrl, this.uidZookeeperIp, this.dataCenterIdNum, this.ipDatPath, this.hbaseZookeeperIp, this.hbaseTableName); @@ -290,6 +437,16 @@ public class FlumeDynamicApp implements Interceptor { } try { + this.appIdHttpUrl = context.getString("appIdHttpUrl", ""); + Preconditions.checkNotNull("".equals(appIdHttpUrl), "appIdHttpUrl must be set!!"); + logger.info("FlumeDynamicApp Read appIdHttpUrl from configuration : " + appIdHttpUrl); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("FlumeDynamicApp appIdHttpUrl invalid", e); + } catch (Exception e) { + logger.error("FlumeDynamicApp Get appIdHttpUrl is error : " + e); + } + + try { this.uidZookeeperIp = context.getString("uidZookeeperIp", ""); Preconditions.checkNotNull("".equals(uidZookeeperIp), "uidZookeeperIp must be set!!"); logger.info("FlumeDynamicApp Read uidZookeeperIp from configuration : " + uidZookeeperIp); diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java index 958197a..c31ea2c 100644 --- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java +++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/FlowWriteConfig.java @@ -7,6 +7,11 @@ import com.zdjizhi.flume.interceptor.utils.system.FlowWriteConfigurations; * @author Administrator */ public class FlowWriteConfig { + public static final String VISIBILITY = "disabled"; + public static final int IF_PARAM_LENGTH = 3; + public static final String FORMAT_SPLITTER = ","; + public static final String IS_JSON_KEY_TAG = "$."; + public static final String IF_CONDITION_SPLITTER = "="; // public static final String SEGMENTATION = ","; diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/app/AppUtils.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/app/AppUtils.java new file mode 100644 index 0000000..9035cf7 --- /dev/null +++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/app/AppUtils.java @@ -0,0 +1,116 @@ +package com.zdjizhi.flume.interceptor.utils.app; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.flume.interceptor.utils.http.HttpClientUtil; +import com.zdjizhi.utils.StringUtil; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * AppId 工具类 + * + * @author qidaijie + */ + +public class AppUtils { + private static final Log logger = LogFactory.get(); + private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128); + private static AppUtils appUtils; + private static String appIdHttpUrl; + + private static void getAppInstance(String url) { + appUtils = new AppUtils(url); + } + + + /** + * 构造函数-新 + */ + private AppUtils(String url) { + appIdHttpUrl = url; + timestampsFilter(); + //定时更新 + updateAppIdCache(); + } + + /** + * 更新变量 + */ + private static void change() { + timestampsFilter(); + } + + + /** + * 获取变更内容 + */ + private static void timestampsFilter() { + try { + Long begin = System.currentTimeMillis(); + String schema = HttpClientUtil.requestByGetMethod(appIdHttpUrl); + if (StringUtil.isNotBlank(schema)) { + String data = JSONObject.parseObject(schema).getString("data"); + JSONArray objects = JSONArray.parseArray(data); + for (Object object : objects) { + JSONArray jsonArray = JSONArray.parseArray(object.toString()); + int key = jsonArray.getInteger(0); + String value = jsonArray.getString(1); + if (appIdMap.containsKey(key)) { + if (!value.equals(appIdMap.get(key))) { + appIdMap.put(key, value); + } + } else { + appIdMap.put(key, value); + } + } + logger.warn("Updating the correspondence takes time:" + (System.currentTimeMillis() - begin)); + logger.warn("Pull the length of the interface data:[" + objects.size() + "]"); + } + } catch (RuntimeException e) { + logger.error("Update cache app-id failed, exception:" + e); + } + } + + + /** + * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie + */ + private void updateAppIdCache() { + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + timestampsFilter(); + } catch (RuntimeException e) { + logger.error("AppUtils update AppCache is error===>{" + e + "}<==="); + } + } + }, 1, 300, TimeUnit.SECONDS); + } + + + /** + * 获取 appName + * + * @param appId app_id + * @return account + */ + public static String getAppName(String url, int appId) { + + if (appUtils == null) { + getAppInstance(url); + } + + return appIdMap.get(appId); + + } + +} diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/http/HttpClientUtil.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/http/HttpClientUtil.java index d8d6fd9..f4d0d44 100644 --- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/http/HttpClientUtil.java +++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/http/HttpClientUtil.java @@ -1,6 +1,7 @@ package com.zdjizhi.flume.interceptor.utils.http; -import com.alibaba.fastjson.JSON; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -13,50 +14,62 @@ import java.io.InputStreamReader; /** * 获取网关schema工具类 + * @author qidaijie */ public class HttpClientUtil { - public static String requestByGetMethod(String s) { + private static final Log logger = LogFactory.get(); + + /** + * 请求网关获取schema + * + * @param http 网关url + * @return schema + */ + public static String requestByGetMethod(String http) { CloseableHttpClient httpClient = HttpClients.createDefault(); - StringBuilder entityStringBuilder = null; + StringBuilder entityStringBuilder; + + HttpGet get = new HttpGet(http); + BufferedReader bufferedReader = null; + CloseableHttpResponse httpResponse = null; try { - HttpGet get = new HttpGet(s); - CloseableHttpResponse httpResponse = null; httpResponse = httpClient.execute(get); - try { - HttpEntity entity = httpResponse.getEntity(); - entityStringBuilder = new StringBuilder(); - if (null != entity) { - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); - String line = null; - while ((line = bufferedReader.readLine()) != null) { - entityStringBuilder.append(line); + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + int intC; + while ((intC = bufferedReader.read()) != -1) { + char c = (char) intC; + if (c == '\n') { + break; } + entityStringBuilder.append(c); } - } finally { - httpResponse.close(); + + return entityStringBuilder.toString(); } - } catch (Exception e) { - e.printStackTrace(); + } catch (IOException e) { + logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); } finally { - try { - if (httpClient != null) { + if (httpClient != null) { + try { httpClient.close(); + } catch (IOException e) { + logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); + } + } + if (httpResponse != null) { + try { + httpResponse.close(); + } catch (IOException e) { + logger.error("Close httpResponse ERROR! Exception messgae is:" + e); } - } catch (IOException e) { - e.printStackTrace(); + } + if (bufferedReader != null) { + org.apache.commons.io.IOUtils.closeQuietly(bufferedReader); } } - return entityStringBuilder.toString(); + return ""; } - - -// public static void main(String[] args) { -//// String s = HttpClientUtil.requestByGetMethod("http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log"); -//// System.out.println(s); -//// String schemaHttpRes = HttpClientUtil.requestByGetMethod("http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log"); -//// String schemaHttpRes = HttpClientUtil.requestByGetMethod("http://192.168.40.151:9999/metadata/schema/v1/fields/security_event_log"); -// String schemaHttpRes = HttpClientUtil.requestByGetMethod("http://192.168.40.224:9999/metadata/schema/v1/fields/connection_record_log"); -// String data = JSON.parseObject(schemaHttpRes).get("data").toString(); -// System.out.println(data); -// } } diff --git a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java index b7e100b..3ee9f3b 100644 --- a/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java +++ b/FlumeDynamicInterceptor/src/main/java/com/zdjizhi/flume/interceptor/utils/json/JsonParseUtil.java @@ -3,6 +3,8 @@ package com.zdjizhi.flume.interceptor.utils.json; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.flume.interceptor.common.FlowWriteConfig; import com.zdjizhi.flume.interceptor.utils.http.HttpClientUtil; import net.sf.cglib.beans.BeanGenerator; import net.sf.cglib.beans.BeanMap; @@ -118,18 +120,40 @@ public class JsonParseUtil { JSONArray fields = (JSONArray) schemaJson.get("fields"); for (Object field : fields) { - String name = JSON.parseObject(field.toString()).get("name").toString(); - String type = JSON.parseObject(field.toString()).get("type").toString(); - map.put(name, getClassName(type)); - + String filedStr = field.toString(); + if (checkKeepField(filedStr)) { + String name = JsonPath.read(filedStr, "$.name").toString(); + String type = JsonPath.read(filedStr, "$.type").toString(); + //组合用来生成实体类的map + map.put(name, getClassName(type)); + } } - - return map; } /** + * 判断字段是否需要保留 + * + * @param message 单个field-json + * @return true or false + */ + private static boolean checkKeepField(String message) { + boolean isKeepField = true; + boolean isHiveDoc = JSON.parseObject(message).containsKey("doc"); + if (isHiveDoc) { + boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility"); + if (isHiveVi) { + String visibility = JsonPath.read(message, "$.doc.visibility").toString(); + if (FlowWriteConfig.VISIBILITY.equals(visibility)) { + isKeepField = false; + } + } + } + return isKeepField; + } + + /** * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList) * * @param http diff --git a/FlumeRadiusOnOffInterceptor/pom.xml b/FlumeRadiusOnOffInterceptor/pom.xml index 4517ae3..63a761f 100644 --- a/FlumeRadiusOnOffInterceptor/pom.xml +++ b/FlumeRadiusOnOffInterceptor/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>dynamic_complement</artifactId> <groupId>com.zdjizhi</groupId> - <version>1.0</version> + <version>v3.21.04.23</version> </parent> <modelVersion>4.0.0</modelVersion> @@ -17,12 +17,13 @@ <name>Team Nexus Repository</name> <url>http://192.168.40.125:8099/content/groups/public</url> </repository> + </repositories> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flume.version>1.9.0</flume.version> - <hbase.version>2.2.1</hbase.version> + <hbase.version>2.2.3</hbase.version> </properties> <build> @@ -32,11 +33,11 @@ <artifactId>maven-shade-plugin</artifactId> <version>2.4.1</version> <configuration> - <createDependencyReducedPom>true</createDependencyReducedPom> + <createDependencyReducedPom>false</createDependencyReducedPom> </configuration> <executions> <execution> - <phase>package</phase> + <phase>install</phase> <goals> <goal>shade</goal> </goals> @@ -61,6 +62,21 @@ </execution> </executions> </plugin> + + <plugin> + <groupId>io.github.zlika</groupId> + <artifactId>reproducible-build-maven-plugin</artifactId> + <version>0.2</version> + <executions> + <execution> + <goals> + <goal>strip-jar</goal> + </goals> + <phase>install</phase> + </execution> + </executions> + </plugin> + <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> @@ -114,7 +130,7 @@ <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> - <version>1.2.47</version> + <version>1.2.70</version> </dependency> <dependency> @@ -126,7 +142,7 @@ <dependency> <groupId>com.zdjizhi</groupId> <artifactId>galaxy</artifactId> - <version>1.0.2</version> + <version>1.0.3</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java index 3eeafc7..13bc36d 100644 --- a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/FlumeOnOffApp.java @@ -25,6 +25,7 @@ public class FlumeOnOffApp implements Interceptor { @Override public void initialize() { + } @Override diff --git a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java index 7407290..d2ce4a4 100644 --- a/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java +++ b/FlumeRadiusOnOffInterceptor/src/main/java/com/zdjizhi/flume/interceptor/common/OnOffConfig.java @@ -19,15 +19,16 @@ public class OnOffConfig { public static final int STOP_BILLING = 2; /** - * 计费请求报文类型 - */ - public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type"; - /** * 报文类型 */ public static final String RADIUS_PACKET_TYPE = "radius_packet_type"; /** + * 计费请求报文类型 + */ + public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type"; + + /** * 发送计费请求报文时间戳 */ public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp"; diff --git a/FlumeSubscriberInterceptor/pom.xml b/FlumeSubscriberInterceptor/pom.xml index a359d60..49d353a 100644 --- a/FlumeSubscriberInterceptor/pom.xml +++ b/FlumeSubscriberInterceptor/pom.xml @@ -5,25 +5,16 @@ <parent> <artifactId>dynamic_complement</artifactId> <groupId>com.zdjizhi</groupId> - <version>1.0</version> + <version>v3.21.04.23</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>FlumeSubscriberInterceptor</artifactId> - <repositories> - - <repository> - <id>ebi</id> - <name>www.ebi.ac.uk</name> - <url>http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/</url> - </repository> - </repositories> - <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flume.version>1.9.0</flume.version> - <hbase.version>2.2.1</hbase.version> + <hbase.version>2.2.3</hbase.version> </properties> <build> @@ -33,11 +24,11 @@ <artifactId>maven-shade-plugin</artifactId> <version>2.4.1</version> <configuration> - <createDependencyReducedPom>true</createDependencyReducedPom> + <createDependencyReducedPom>false</createDependencyReducedPom> </configuration> <executions> <execution> - <phase>package</phase> + <phase>install</phase> <goals> <goal>shade</goal> </goals> @@ -62,6 +53,21 @@ </execution> </executions> </plugin> + + <plugin> + <groupId>io.github.zlika</groupId> + <artifactId>reproducible-build-maven-plugin</artifactId> + <version>0.2</version> + <executions> + <execution> + <goals> + <goal>strip-jar</goal> + </goals> + <phase>install</phase> + </execution> + </executions> + </plugin> + <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> @@ -115,7 +121,7 @@ <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> - <version>1.2.47</version> + <version>1.2.70</version> </dependency> <dependency> @@ -141,21 +147,6 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> </dependencies> @@ -7,7 +7,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>dynamic_complement</artifactId> <packaging>pom</packaging> - <version>1.0</version> + <version>v3.21.04.23</version> <modules> <module>FlumeDynamicInterceptor</module> <module>FlumeSubscriberInterceptor</module> |
