# log-stream-completion-schema 基于Nacos的动态日志预处理程序,接收原始日志根据对应Schema定义进行数据清洗,并将结果回写Kafka。 当Nacos上Schame变更后可动态获取到最新版本的信息,无需重启任务。 ## 函数功能列表 * current_timestamp > 获取当前时间戳,若追加字段已有时间戳,不予以覆盖。 * from_unix_timestamp_ms > 获取字段值(毫秒级时间戳)将其转换为字符串日志格式(yyyy-MM-dd HH:mm:ss:SSS),例如:"2019-09-16 19:20:12.345" * snowflake_id > 雪花ID函数,返回一个一定条件内不重复的 long 类型数值。 > https://git.mesalab.cn/bigdata/algorithm/snowflake * geo_ip_detail > IP定位库,获取对应IP的详细地理位置信息,城市,州/省,国家 * geo_asn > ASN定位库,获取对应IP的ASN信息 * geo_ip_country > IP定位库,获取对应IP的地理位置信息,仅包含 国家 * set_value > 给予字段固定值。 * get_value > 获取字段值并追加到新的字段。 * if > IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 > 若参数携带 $. 标识,则使用的是数据内的字段值;若不携带则为参数指定的值。 * sub_domain > 获取顶级域名 * decode_of_base64 > 根据日志提供的编码,解析base64,若编码为空时 ETL不做转换,保留原始数据信息。 * flattenSpec > 根据表达式解析json,使用FastJson2 jsonPath工具类 > https://alibaba.github.io/fastjson2/jsonpath_cn * combine > 路径合并: HOS统一访问路径(HOS URI) + 桶名 + 原始字段值 (文件唯一标识uuid) > 桶名根据参数进行指定 ## 弃用函数 ### TSG 22.02版本 * app_match > 根据APP_ID获取对应的APP名称 ### TSG 23.12版本 * radius_match > 根据IP获取对应的Raidus用户信息。 > 实际数据存储在HBase tsg_galaxy:relation_framedip_account表中,依赖RELATIONSHIP-RADIUS-ACCOUNT程序;使用时加载到内存中加速查询。 * gtpc_match > 根据日志common_tunnels内的信息,获取GTPC TEID对应的用户信息(phonenumber、imsi、imei),样例数据: ``` {"common_tunnels":"[{\"tunnels_schema_type\":\"GTP\",\"gtp_endpoint_a2b_teid\":247749709,\"gtp_endpoint_b2a_teid\":665547833,\"gtp_sgw_ip\":\"192.56.5.2\",\"gtp_pgw_ip\":\"192.56.10.20\",\"gtp_sgw_port\":2152,\"gtp_pgw_port\":2152}]"} ``` > 实际数据存储在HBase tsg_galaxy:relation_user_teid中,依赖RELATIONSHIP-RADIUS-USER程序;使用时加载到内存中加速查询。 ## 动态知识库逻辑 [流程图](https://git.mesalab.cn/bigdata/tsg/flink/log-completion-schema/-/blob/22.11-KNOWLEDGEBASE/images/%E7%9F%A5%E8%AF%86%E5%BA%93%E6%B5%81%E7%A8%8B%E5%9B%BE.png)