diff options
| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2021-10-29 11:14:33 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2021-10-29 11:14:33 +0800 |
| commit | f1a67e36db7d4fe05eadbf001f1054fdf104cf3c (patch) | |
| tree | 88b6152dec370b442b60f30736d92ff6d843715a | |
| parent | c42621bf0dff54445ae00cbfe4f3373ed1ec7bac (diff) | |
增加accout-framedip关系
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/ParseFunction.java | 34 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java | 75 |
2 files changed, 109 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java new file mode 100644 index 0000000..78d1a1c --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java @@ -0,0 +1,34 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ + +public class ParseFunction implements MapFunction<String, Tuple2<String, String>> { + private static final Log logger = LogFactory.get(); + + + @Override + public Tuple2<String, String> map(String logs) { + try { + JSONObject jsonObject = JSONObject.parseObject(logs); + String framedIp = jsonObject.getString("radius_framed_ip"); + String account = jsonObject.getString("radius_account"); + + return Tuple2.of(framedIp, account); + + } catch (RuntimeException e) { + logger.error("解析Radius数据获取用户信息异常,异常信息:" + e); + } + return Tuple2.of("", ""); + } +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java b/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java new file mode 100644 index 0000000..f355648 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java @@ -0,0 +1,75 @@ +package com.zdjizhi.utils.functions; + + +import com.zdjizhi.common.RadiusRelationshipConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.zdjizhi.utils.hbase.HBaseUtils.insertData; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/6/2316:59 + */ +public class TimerFunctionAccountWithFramedIp extends KeyedProcessFunction<String, Tuple2<String, String>, Object> { + private static final Logger logger = LoggerFactory.getLogger(TimerFunctionAccountWithFramedIp.class); + private static Map<String,String> map = new HashMap<String,String>(); + private static List<Put> putList = new ArrayList<>(); + private static boolean first = true; + + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception { + + for(Map.Entry<String,String> entry: map.entrySet() ){ + + Put put = new Put(entry.getKey().getBytes()); + put.addColumn("radius".getBytes(), "framed_ip".getBytes(), entry.getValue().getBytes()); + put.addColumn("radius".getBytes(), "last_found_time".getBytes(), Bytes.toBytes(timestamp/1000)); + + + if(putList.size()<100000){ + putList.add(put); + } + else{ + insertData(putList,RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME); + putList.clear(); + } + } + if(putList.size()>0) { + insertData(putList, RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME); + putList.clear(); + } + map.clear(); + ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000)); + } + + @Override + public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) throws Exception { + //仅在该算子接收到第一个数据时,注册一个定时器 + if (first) { + first = false; + Long time = System.currentTimeMillis(); + ctx.timerService().registerProcessingTimeTimer(time + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000)); + } + String account = value.f1; + + String framedIp = value.f0; + if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) { + map.put(account,framedIp); + } + } +} |
