diff options
| author | zhanghongqing <[email protected]> | 2022-05-07 14:16:47 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-05-07 14:16:47 +0800 |
| commit | 21fe8dcf18c2200ecd885e44d4249d800feab220 (patch) | |
| tree | ca4fe6c5848a6538485959625ef685e77ac188cf | |
| parent | e19d2ce575220634b32444da9739d309965ad442 (diff) | |
websketch域名探测增加分片任务模式,可支持大数据量服务器平行扩展。 (websketch domain probing adds a sharded-task mode, enabling horizontal scale-out across servers for large data volumes.)
6 files changed, 297 insertions, 21 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java index f45e7c3..23b224b 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java @@ -1,6 +1,7 @@ package com.mesalab.executor.jobhandler; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import cn.hutool.log.Log; import com.mesalab.executor.service.DataSinkService; @@ -10,10 +11,12 @@ import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; +import com.xxl.job.core.util.ShardingUtil; import com.zdjizhi.utils.StringUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,6 +87,80 @@ public class DataExtractJob { } + @XxlJob("dataShardingExtractJobHandler") + public ReturnT<String> dataShardingExtract(String params) { + + try { + ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo(); + int shardingTotal = shardingVo.getTotal();//执行器总数 + int shardingIndex = shardingVo.getIndex();//当前执行器编号 + XxlJobLogger.log("sharding params, sharding number = {}, total sharding count = {}", shardingVo.getIndex(), shardingVo.getTotal()); + logger.info("sharding params, sharding number = {}, total sharding count = {}", shardingVo.getIndex(), shardingVo.getTotal()); + + logger.info("query params is {}", params); + List<Map> paramsMaps = parserParams(params); + if (ObjectUtil.isEmpty(paramsMaps)) { + logger.error("params parser error , params is {}", params); + return IJobHandler.FAIL; + } + for (Map<String, Map<String, String>> paramsMap : 
paramsMaps) { + long start = System.currentTimeMillis(); + Map<String, String> soureParams = paramsMap.get("source"); + Map<String, String> transformParams = paramsMap.get("transform"); + Map<String, String> sinkParams = paramsMap.get("sink"); + + Map<String, String> countParams = new HashMap<>(); + countParams.putAll(soureParams); + countParams.put("sql", "select count(*) AS count from " + countParams.get("table")); + List<Map> countMap = dataSourceService.mysqlSource(countParams); + String parentSql = soureParams.get("sql"); + int totalCount = Integer.valueOf(StrUtil.toString(countMap.get(0).get("count"))); + //分片任务平均分配,每个执行器处理数据总量 [index*avgCount,(index+1)*avgCount] + int avgCount = totalCount % shardingTotal == 0 ? totalCount / shardingTotal : totalCount / shardingTotal + 1; + int shardingStart = shardingIndex * avgCount; + int shardingEnd = (shardingIndex + 1) * avgCount; + + int pageCount = Integer.valueOf(soureParams.get("sharding")); + int pageSize = avgCount % pageCount == 0 ? avgCount / pageCount : avgCount / pageCount + 1; + logger.info("total data {},current sharding {},process data size {} and data rang [{},{}]. paging count {} ,paging size {}.", totalCount, shardingIndex, avgCount, shardingStart, shardingEnd, pageCount, pageSize); + XxlJobLogger.log("total data {},current sharding {},process data size {} and data rang [{},{}]. paging count {} ,paging size {}.", totalCount, shardingIndex, avgCount, shardingStart, shardingEnd, pageCount, pageSize); + + for (int i = 0; i < pageCount; i++) { + int pageNum = i + 1; + //查询数据 + logger.info("sharding {} , paging {} source execute start", shardingIndex, pageNum); + XxlJobLogger.log("sharding {} ,paging {} source execute start", shardingIndex, pageNum); + String pageSql = parentSql + " order by id limit " + (shardingStart + i * pageSize) + "," + (pageSize > shardingEnd ? 
shardingEnd : pageSize); + soureParams.put("sql", pageSql); + List<Map> sourceResult = dataSourceService.mysqlSource(soureParams); + logger.info("sharding {} ,paging {} source execute end", shardingIndex, pageNum); + XxlJobLogger.log("sharding {} ,paging {} source execute end", shardingIndex, pageNum); + //处理数据 + logger.info("sharding {} ,paging {} transform execute start", shardingIndex, pageNum); + XxlJobLogger.log("sharding {} ,paging {} transform execute start", shardingIndex, pageNum); + List<Map> transformResult = dataTransformService.isFqdnAccessible(sourceResult, transformParams); + logger.info("sharding {} ,paging {} transform execute end", shardingIndex, pageNum); + XxlJobLogger.log("sharding {} ,paging {} transform execute end", shardingIndex, pageNum); + //数据入库 + logger.info("sharding {} ,paging {} sink execute start", shardingIndex, pageNum); + XxlJobLogger.log("sharding {} ,paging {} sink execute start", shardingIndex, pageNum); + dataSinkService.exec(transformResult, sinkParams); + logger.info("sharding {} ,paging {} sink execute end", shardingIndex, pageNum); + XxlJobLogger.log("sharding {} ,paging {} sink execute end", shardingIndex, pageNum); + } + logger.info("sharding {} task end, take time {}ms", shardingIndex, System.currentTimeMillis() - start); + XxlJobLogger.log("sharding {} task end, take time {}ms", shardingIndex, System.currentTimeMillis() - start); + } + + return ReturnT.SUCCESS; + } catch (Exception e) { + XxlJobLogger.log(e.getMessage()); + logger.error(e); + return ReturnT.FAIL; + } + + } + private List<Map> parserParams(String params) { if (StringUtil.isBlank(params)) { XxlJobLogger.log("params is empty!"); diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/DataExtractParam.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/DataExtractParam.java new file mode 100644 index 0000000..5f3010c --- /dev/null +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/DataExtractParam.java @@ 
package com.mesalab.executor.pojo;

import lombok.Data;

import java.util.List;

/**
 * POJO describing the parsed parameters of a data-extract job:
 * an ordered list of stage entries (source / transform / sink).
 *
 * @param <T> type of a single stage entry (class is now properly generic;
 *            raw usage remains source-compatible)
 * @author zhq
 * @since 2022-04-29
 */
@Data
public class DataExtractParam<T> {

    // Stage entries in execution order: source -> transform -> sink.
    private List<T> dataExtractParam;

    // NOTE(review): the nested holders keep their original lower-case names for
    // compatibility; UpperCamelCase would be the Java convention.

    /** Parameters of the "source" stage (e.g. connection info and SQL). */
    @Data
    public static class source {

        private String type;
        // BUGFIX: the original declared "private T t" where T resolved to
        // org.apache.poi.ss.formula.functions.T (a POI formula class) misused
        // as a pseudo-generic; presumably an arbitrary payload was intended.
        private Object t;

    }

    /** Parameters of the "transform" stage. */
    @Data
    public static class transform {

        private String type;

    }

    /** Parameters of the "sink" stage. */
    @Data
    public static class sink {

        private String type;

    }
}
(int i = 1; i <= transformResult.size(); i++) { + for (Object val : transformResult.get(i - 1).values()) { if (val instanceof Long) { pst.setLong((index++), Long.valueOf(StrUtil.toString(val))); } else if (val instanceof Integer) { @@ -146,15 +153,20 @@ public class DataSinkService { logger.info("{} table {} , count {} , take {}", sinkParams.get("option"), tableName, count, (System.currentTimeMillis() - start) + "ms"); XxlJobLogger.log("{} table {} , count {} , take {}", sinkParams.get("option"), tableName, count, (System.currentTimeMillis() - start) + "ms"); } catch (Exception e) { - conn.rollback(); - logger.error(e.getMessage()); + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e1) { + logger.error(e1); + } + } + logger.error(e); throw e; } finally { - pst.close(); - conn.close(); + IoUtil.close(pst); + IoUtil.close(conn); dbUtils.close(); } } - } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java index 27b1ef7..de5914a 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java @@ -132,7 +132,7 @@ public class DataSourceService { return null; } - private List<Map> mysqlSource(Map<String, String> sourceParams) throws Exception { + public List<Map> mysqlSource(Map<String, String> sourceParams) throws Exception { logger.info("mysql data source starting, param is {}", sourceParams); DBParam dbParam = BeanUtil.fillBeanWithMap(sourceParams, new DBParam(), false); List<Map> collect; diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java index 329973b..f03ff0f 100644 --- 
a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java @@ -86,6 +86,9 @@ public class DataTransformService { CountDownLatch countDownLatch = new CountDownLatch(t); //1.已经探测过不用重复探测 2.探测结果与数据库相同不用更新 Map uniqueFqdns = new ConcurrentHashMap(); + final String STATUS_CODE = "status_code"; + XxlJobLogger.log("fqdn accessible task starting "); + logger.info("fqdn accessible task starting "); for (List<Map> mapList : split) { fixedThreadPool.execute(new Runnable() { @Override @@ -98,9 +101,9 @@ public class DataTransformService { code = HttpKit.getStatus(UrlUtil.getUrl(map.get("fqdn"))); } uniqueFqdns.put(map.get("fqdn"), code); - if (Integer.valueOf(String.valueOf(map.get("status_code"))) != code) { + if (ObjectUtil.isEmpty(map.get(STATUS_CODE))||Integer.valueOf(String.valueOf(map.get(STATUS_CODE))) != code) { Map m = new ConcurrentHashMap(); - m.put("status_code", code); + m.put(STATUS_CODE, code); m.put("id", map.get("id")); transformResult.add(m); } @@ -110,10 +113,12 @@ public class DataTransformService { }); } try { + logger.info("websketch-fqdn-accessible transform running..."); + XxlJobLogger.log("websketch-fqdn-accessibe transform running"); countDownLatch.await(); fixedThreadPool.shutdown(); } catch (InterruptedException e) { - logger.error(e.getMessage()); + logger.error(e); } //去掉重复无效 logger.info("websketch-fqdn-accessible transform data {}, spend time {} ms", transformResult.size(), System.currentTimeMillis() - start); @@ -121,7 +126,61 @@ public class DataTransformService { return transformResult; } + public List<Map> isFqdnAccessible(List<Map> mariadbResult , Map<String, String> params) throws Exception { + Vector<Map> transformResult = new Vector<>(); + //判断是否可达 + long start = System.currentTimeMillis(); + int threadNum = Integer.valueOf(StrUtil.toString(Optional.ofNullable(params.get("parallelism")).orElse("5")));//线程数 + int 
partitionSize = (mariadbResult.size() / threadNum) > 0 ? mariadbResult.size() / threadNum : 1; + List<List<Map>> split = ListUtil.split(mariadbResult, partitionSize); + int t = split.size(); + ExecutorService fixedThreadPool = Executors.newFixedThreadPool(t); + logger.info("threads total count is :" + t); + // 用于计数线程是否执行完成 + CountDownLatch countDownLatch = new CountDownLatch(t); + //1.已经探测过不用重复探测 2.探测结果与数据库相同不用更新 + Map uniqueFqdns = new ConcurrentHashMap(); + final String STATUS_CODE = "status_code"; + XxlJobLogger.log("fqdn accessible task starting "); + logger.info("fqdn accessible task starting "); + for (List<Map> mapList : split) { + fixedThreadPool.execute(new Runnable() { + @Override + public void run() { + for (Map map : mapList) { + int code = 0; + if (uniqueFqdns.containsKey(map.get("fqdn"))) { + code = Integer.valueOf(String.valueOf(uniqueFqdns.get("fqdn"))); + } else { + code = HttpKit.getStatus(UrlUtil.getUrl(map.get("fqdn"))); + } + uniqueFqdns.put(map.get("fqdn"), code); + if (ObjectUtil.isEmpty(map.get(STATUS_CODE))||Integer.valueOf(String.valueOf(map.get(STATUS_CODE))) != code) { + Map m = new ConcurrentHashMap(); + m.put(STATUS_CODE, code); + m.put("id", map.get("id")); + transformResult.add(m); + } + } + countDownLatch.countDown(); + } + }); + } + try { + logger.info("websketch-fqdn-accessible transform running..."); + XxlJobLogger.log("websketch-fqdn-accessibe transform running"); + countDownLatch.await(); + fixedThreadPool.shutdown(); + } catch (InterruptedException e) { + logger.error(e); + fixedThreadPool.shutdownNow(); + } + //去掉重复无效 + logger.info("websketch-fqdn-accessible transform data {}, spend time {} ms", transformResult.size(), System.currentTimeMillis() - start); + XxlJobLogger.log("websketch-fqdn-accessible transform data {}, spend time {} ms", transformResult.size(), System.currentTimeMillis() - start); + return transformResult; + } /** * @param sourceResult ioc_value * threat_type 字段相同则不入库 diff --git 
a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java index 68af2a7..005244e 100644 --- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java +++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java @@ -21,7 +21,6 @@ import cn.hutool.poi.excel.ExcelReader; import cn.hutool.poi.excel.ExcelUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; -import com.google.common.collect.Lists; import com.mesalab.executor.core.utils.DBUtils; import com.mesalab.executor.core.utils.HttpClientUtils; import com.mesalab.executor.core.utils.JobUtil; @@ -31,6 +30,8 @@ import com.mesalab.executor.enums.DataTypeEnum; import com.mesalab.executor.exception.BusinessException; import com.mesalab.executor.service.DataSinkService; import com.mesalab.executor.service.DataSourceService; +import com.mesalab.executor.service.DataTransformService; +import com.xxl.job.core.log.XxlJobLogger; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import com.zdjizhi.utils.DateUtils; @@ -413,14 +414,17 @@ public class DataExtractTest { @Test public void test12() { - ArrayList<Object> objects = new ArrayList<>(); - objects.add(1); - objects.add(2); - objects.add(3); - objects.add(4); - - System.err.println(ListUtil.split(objects, 3)); - System.err.println(Lists.partition(objects, 3)); +// ArrayList<Object> objects = new ArrayList<>(); +// objects.add(1); +// objects.add(2); +// objects.add(3); +// objects.add(4); +// +//// System.err.println(ListUtil.split(objects, 3)); +//// System.err.println(Lists.partition(objects, 3)); + String fileTypeParams = "json"; + String[] fileTypes = StrUtil.split(fileTypeParams, ","); + System.err.println(Arrays.toString(fileTypes)); } @Test @@ -444,11 +448,25 @@ public class DataExtractTest { // Map<String, Object> map = new 
LinkedHashMap<>(); // map.put("fqdn", s); // result.add(map); - fileWriter.write("update domain_category_reputation set submit_user='tsg' where fqdn='"+s+"';\n",true); + fileWriter.write("update domain_category_reputation set submit_user='tsg' where fqdn='" + s + "';\n", true); } // dbUpdate(result); } + @Test + public void txtUpdate() { + + String operator = "FWDNS"; + FileReader fr = new FileReader("D:\\test\\test\\ip\\" + operator + ".txt"); + List<String> strings = fr.readLines(); + FileWriter fw = new FileWriter("D:\\test\\test\\ip\\" + operator + ".sql"); + for (String s : strings) { + fw.write("INSERT IGNORE INTO dns_server_info " + + "(ip_addr, dns_server_role, create_time, update_time) " + + "VALUES('" + StrUtil.trim(s) + "', '" + operator + "', current_timestamp(), current_timestamp());\n", true); + } + } + public void dbUpdate(List<Map> transformResult) { Map<String, String> sinkParams = new HashMap<>(); sinkParams.put("ip", "api.geedge.net"); @@ -507,4 +525,75 @@ public class DataExtractTest { dbUtils.close(); } } + + static Map<String, Map> paramsMap = new HashMap<>(); + + static { + Map<String, String> sourceParams = new HashMap<>(); + sourceParams.put("ip", "192.168.44.12"); + sourceParams.put("type", "mariadb"); + sourceParams.put("database", "web_sketch"); + sourceParams.put("table", "domain_category_reputation"); + sourceParams.put("username", "root"); + sourceParams.put("pin", "galaxy2019"); + sourceParams.put("sql", "SELECT id,fqdn,status_code FROM domain_category_reputation"); + sourceParams.put("sharding", "5"); + paramsMap.put("source", sourceParams); + } + + @Autowired + DataTransformService dataTransformService; + + @Test + public void shardingTest() { + Map<String, String> params = new HashMap<>(); + params.put("parallelism", "5"); + Map<String, String> sinkParams = new HashMap<>(); + sinkParams.put("type", "mariadb"); + sinkParams.put("ip", "192.168.44.12"); + sinkParams.put("database", "web_sketch"); + sinkParams.put("table", 
"domain_category_reputation"); + sinkParams.put("username", "root"); + sinkParams.put("pin", "galaxy2019"); + sinkParams.put("option", "update"); + try { + long start = System.currentTimeMillis(); + Map<String, String> map = paramsMap.get("source"); + Map<String, String> countParams = new HashMap<>(); + countParams.putAll(map); + countParams.put("sql", "select count(*) AS count from " + countParams.get("table")); + List<Map> countMap = dataSourceService.mysqlSource(countParams); + String parentSql = map.get("sql"); + int sharding = Integer.valueOf(map.get("sharding")); + int count = Integer.valueOf(StrUtil.toString(countMap.get(0).get("count"))); + logger.info("query result data count {} , sharding {}", count, sharding); + int shardingSize = count % sharding == 0 ? count / sharding : count / sharding + 1; + for (int i = 0; i < sharding; i++) { + logger.info("sharding {} source execute start", i + 1); + XxlJobLogger.log("sharding {} source execute start", i + 1); + String shardingSql = parentSql + " order by id desc limit " + i * shardingSize + "," + ((i + 1) * shardingSize > count ? 
count : (i + 1) * shardingSize); + map.put("sql", shardingSql); + List<Map> shardingResult = dataSourceService.mysqlSource(map); + logger.info("sharding {} source execute end", i + 1); + XxlJobLogger.log("sharding {} source execute end", i + 1); + + logger.info("sharding {} transform execute start", i + 1); + XxlJobLogger.log("sharding {} transform execute start", i + 1); + List<Map> transformResult = dataTransformService.isFqdnAccessible(shardingResult, params); + logger.info("sharding {} transform execute end", i + 1); + XxlJobLogger.log("sharding {} transform execute end", i + 1); + + logger.info("sharding {} sink execute start", i + 1); + XxlJobLogger.log("sharding {} sink execute start", i + 1); + dataSinkService.exec(transformResult, sinkParams); + logger.info("sharding {} sink execute end", i + 1); + XxlJobLogger.log("sharding {} sink execute end", i + 1); + } + logger.info("sharding task end, take time {}ms", System.currentTimeMillis() - start); + XxlJobLogger.log("sharding task end, take time {}ms", System.currentTimeMillis() - start); + } catch (Exception e) { + e.printStackTrace(); + } + + } }
\ No newline at end of file |
