summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLAPTOP-CUUVN8AS\wk <[email protected]>2021-10-29 11:14:33 +0800
committerLAPTOP-CUUVN8AS\wk <[email protected]>2021-10-29 11:14:33 +0800
commitf1a67e36db7d4fe05eadbf001f1054fdf104cf3c (patch)
tree88b6152dec370b442b60f30736d92ff6d843715a
parentc42621bf0dff54445ae00cbfe4f3373ed1ec7bac (diff)
增加accout-framedip关系
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ParseFunction.java34
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java75
2 files changed, 109 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
new file mode 100644
index 0000000..78d1a1c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
@@ -0,0 +1,34 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/2715:01
+ */
+
+public class ParseFunction implements MapFunction<String, Tuple2<String, String>> {
+ private static final Log logger = LogFactory.get();
+
+
+ @Override
+ public Tuple2<String, String> map(String logs) {
+ try {
+ JSONObject jsonObject = JSONObject.parseObject(logs);
+ String framedIp = jsonObject.getString("radius_framed_ip");
+ String account = jsonObject.getString("radius_account");
+
+ return Tuple2.of(framedIp, account);
+
+ } catch (RuntimeException e) {
+ logger.error("解析Radius数据获取用户信息异常,异常信息:" + e);
+ }
+ return Tuple2.of("", "");
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java b/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java
new file mode 100644
index 0000000..f355648
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java
@@ -0,0 +1,75 @@
+package com.zdjizhi.utils.functions;
+
+
+import com.zdjizhi.common.RadiusRelationshipConfig;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.zdjizhi.utils.hbase.HBaseUtils.insertData;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/6/2316:59
+ */
+public class TimerFunctionAccountWithFramedIp extends KeyedProcessFunction<String, Tuple2<String, String>, Object> {
+ private static final Logger logger = LoggerFactory.getLogger(TimerFunctionAccountWithFramedIp.class);
+ private static Map<String,String> map = new HashMap<String,String>();
+ private static List<Put> putList = new ArrayList<>();
+ private static boolean first = true;
+
+
+ @Override
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
+
+ for(Map.Entry<String,String> entry: map.entrySet() ){
+
+ Put put = new Put(entry.getKey().getBytes());
+ put.addColumn("radius".getBytes(), "framed_ip".getBytes(), entry.getValue().getBytes());
+ put.addColumn("radius".getBytes(), "last_found_time".getBytes(), Bytes.toBytes(timestamp/1000));
+
+
+ if(putList.size()<100000){
+ putList.add(put);
+ }
+ else{
+ insertData(putList,RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME);
+ putList.clear();
+ }
+ }
+ if(putList.size()>0) {
+ insertData(putList, RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME);
+ putList.clear();
+ }
+ map.clear();
+ ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
+ }
+
+ @Override
+ public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) throws Exception {
+ //仅在该算子接收到第一个数据时,注册一个定时器
+ if (first) {
+ first = false;
+ Long time = System.currentTimeMillis();
+ ctx.timerService().registerProcessingTimeTimer(time + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
+ }
+ String account = value.f1;
+
+ String framedIp = value.f0;
+ if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) {
+ map.put(account,framedIp);
+ }
+ }
+}