summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-05-07 14:16:47 +0800
committerzhanghongqing <[email protected]>2022-05-07 14:16:47 +0800
commit21fe8dcf18c2200ecd885e44d4249d800feab220 (patch)
treeca4fe6c5848a6538485959625ef685e77ac188cf
parente19d2ce575220634b32444da9739d309965ad442 (diff)
websketch域名探测增加分片任务模式,可支持大数据量服务器平行扩展。
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java77
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/DataExtractParam.java39
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java26
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java2
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java65
-rw-r--r--galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java109
6 files changed, 297 insertions, 21 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java
index f45e7c3..23b224b 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java
@@ -1,6 +1,7 @@
package com.mesalab.executor.jobhandler;
import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.Log;
import com.mesalab.executor.service.DataSinkService;
@@ -10,10 +11,12 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
+import com.xxl.job.core.util.ShardingUtil;
import com.zdjizhi.utils.StringUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -84,6 +87,80 @@ public class DataExtractJob {
}
+ @XxlJob("dataShardingExtractJobHandler")
+ public ReturnT<String> dataShardingExtract(String params) {
+
+ try {
+ ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo();
+ int shardingTotal = shardingVo.getTotal();//执行器总数
+ int shardingIndex = shardingVo.getIndex();//当前执行器编号
+ XxlJobLogger.log("sharding params, sharding number = {}, total sharding count = {}", shardingVo.getIndex(), shardingVo.getTotal());
+ logger.info("sharding params, sharding number = {}, total sharding count = {}", shardingVo.getIndex(), shardingVo.getTotal());
+
+ logger.info("query params is {}", params);
+ List<Map> paramsMaps = parserParams(params);
+ if (ObjectUtil.isEmpty(paramsMaps)) {
+ logger.error("params parser error , params is {}", params);
+ return IJobHandler.FAIL;
+ }
+ for (Map<String, Map<String, String>> paramsMap : paramsMaps) {
+ long start = System.currentTimeMillis();
+ Map<String, String> soureParams = paramsMap.get("source");
+ Map<String, String> transformParams = paramsMap.get("transform");
+ Map<String, String> sinkParams = paramsMap.get("sink");
+
+ Map<String, String> countParams = new HashMap<>();
+ countParams.putAll(soureParams);
+ countParams.put("sql", "select count(*) AS count from " + countParams.get("table"));
+ List<Map> countMap = dataSourceService.mysqlSource(countParams);
+ String parentSql = soureParams.get("sql");
+ int totalCount = Integer.valueOf(StrUtil.toString(countMap.get(0).get("count")));
+ //分片任务平均分配,每个执行器处理数据总量 [index*avgCount,(index+1)*avgCount]
+ int avgCount = totalCount % shardingTotal == 0 ? totalCount / shardingTotal : totalCount / shardingTotal + 1;
+ int shardingStart = shardingIndex * avgCount;
+ int shardingEnd = (shardingIndex + 1) * avgCount;
+
+ int pageCount = Integer.valueOf(soureParams.get("sharding"));
+ int pageSize = avgCount % pageCount == 0 ? avgCount / pageCount : avgCount / pageCount + 1;
+ logger.info("total data {},current sharding {},process data size {} and data rang [{},{}]. paging count {} ,paging size {}.", totalCount, shardingIndex, avgCount, shardingStart, shardingEnd, pageCount, pageSize);
+ XxlJobLogger.log("total data {},current sharding {},process data size {} and data rang [{},{}]. paging count {} ,paging size {}.", totalCount, shardingIndex, avgCount, shardingStart, shardingEnd, pageCount, pageSize);
+
+ for (int i = 0; i < pageCount; i++) {
+ int pageNum = i + 1;
+ //查询数据
+ logger.info("sharding {} , paging {} source execute start", shardingIndex, pageNum);
+ XxlJobLogger.log("sharding {} ,paging {} source execute start", shardingIndex, pageNum);
+ String pageSql = parentSql + " order by id limit " + (shardingStart + i * pageSize) + "," + (pageSize > shardingEnd ? shardingEnd : pageSize);
+ soureParams.put("sql", pageSql);
+ List<Map> sourceResult = dataSourceService.mysqlSource(soureParams);
+ logger.info("sharding {} ,paging {} source execute end", shardingIndex, pageNum);
+ XxlJobLogger.log("sharding {} ,paging {} source execute end", shardingIndex, pageNum);
+ //处理数据
+ logger.info("sharding {} ,paging {} transform execute start", shardingIndex, pageNum);
+ XxlJobLogger.log("sharding {} ,paging {} transform execute start", shardingIndex, pageNum);
+ List<Map> transformResult = dataTransformService.isFqdnAccessible(sourceResult, transformParams);
+ logger.info("sharding {} ,paging {} transform execute end", shardingIndex, pageNum);
+ XxlJobLogger.log("sharding {} ,paging {} transform execute end", shardingIndex, pageNum);
+ //数据入库
+ logger.info("sharding {} ,paging {} sink execute start", shardingIndex, pageNum);
+ XxlJobLogger.log("sharding {} ,paging {} sink execute start", shardingIndex, pageNum);
+ dataSinkService.exec(transformResult, sinkParams);
+ logger.info("sharding {} ,paging {} sink execute end", shardingIndex, pageNum);
+ XxlJobLogger.log("sharding {} ,paging {} sink execute end", shardingIndex, pageNum);
+ }
+ logger.info("sharding {} task end, take time {}ms", shardingIndex, System.currentTimeMillis() - start);
+ XxlJobLogger.log("sharding {} task end, take time {}ms", shardingIndex, System.currentTimeMillis() - start);
+ }
+
+ return ReturnT.SUCCESS;
+ } catch (Exception e) {
+ XxlJobLogger.log(e.getMessage());
+ logger.error(e);
+ return ReturnT.FAIL;
+ }
+
+ }
+
private List<Map> parserParams(String params) {
if (StringUtil.isBlank(params)) {
XxlJobLogger.log("params is empty!");
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/DataExtractParam.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/DataExtractParam.java
new file mode 100644
index 0000000..5f3010c
--- /dev/null
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/DataExtractParam.java
@@ -0,0 +1,39 @@
+package com.mesalab.executor.pojo;
+
+import lombok.Data;
+import org.apache.poi.ss.formula.functions.T;
+
+import java.util.List;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-04-29
+ **/
+
+@Data
+public class DataExtractParam {
+
+
+ private List<T> dataExtractParam;
+
+
+ public static class source {
+
+ private String type;
+ private T t;
+
+ }
+
+ public static class transform {
+
+ private String type;
+
+ }
+
+ public static class sink {
+
+ private String type;
+
+ }
+}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
index 80d7f47..c95dfb4 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
@@ -1,5 +1,6 @@
package com.mesalab.executor.service;
+import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.file.FileWriter;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
@@ -17,6 +18,7 @@ import org.springframework.stereotype.Service;
import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@@ -99,10 +101,15 @@ public class DataSinkService {
* @throws Exception
*/
public void exec(List<Map> transformResult, Map<String, String> sinkParams) throws Exception {
+ if (ObjectUtil.isEmpty(transformResult)) {
+ logger.info("sink data is empty ");
+ return;
+ }
Connection conn = null;
DBUtils dbUtils = null;
PreparedStatement pst = null;
try {
+
String tableName = sinkParams.get("table");
dbUtils = new DBUtils();
//创建数据库连接库对象
@@ -118,8 +125,8 @@ public class DataSinkService {
pst = conn.prepareStatement(sql);
int count = 0;
int index = 1;
- for (int i = 0; i < transformResult.size(); i++) {
- for (Object val : transformResult.get(i).values()) {
+ for (int i = 1; i <= transformResult.size(); i++) {
+ for (Object val : transformResult.get(i - 1).values()) {
if (val instanceof Long) {
pst.setLong((index++), Long.valueOf(StrUtil.toString(val)));
} else if (val instanceof Integer) {
@@ -146,15 +153,20 @@ public class DataSinkService {
logger.info("{} table {} , count {} , take {}", sinkParams.get("option"), tableName, count, (System.currentTimeMillis() - start) + "ms");
XxlJobLogger.log("{} table {} , count {} , take {}", sinkParams.get("option"), tableName, count, (System.currentTimeMillis() - start) + "ms");
} catch (Exception e) {
- conn.rollback();
- logger.error(e.getMessage());
+ if (conn != null) {
+ try {
+ conn.rollback();
+ } catch (SQLException e1) {
+ logger.error(e1);
+ }
+ }
+ logger.error(e);
throw e;
} finally {
- pst.close();
- conn.close();
+ IoUtil.close(pst);
+ IoUtil.close(conn);
dbUtils.close();
}
}
-
}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java
index 27b1ef7..de5914a 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java
@@ -132,7 +132,7 @@ public class DataSourceService {
return null;
}
- private List<Map> mysqlSource(Map<String, String> sourceParams) throws Exception {
+ public List<Map> mysqlSource(Map<String, String> sourceParams) throws Exception {
logger.info("mysql data source starting, param is {}", sourceParams);
DBParam dbParam = BeanUtil.fillBeanWithMap(sourceParams, new DBParam(), false);
List<Map> collect;
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java
index 329973b..f03ff0f 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java
@@ -86,6 +86,9 @@ public class DataTransformService {
CountDownLatch countDownLatch = new CountDownLatch(t);
//1.已经探测过不用重复探测 2.探测结果与数据库相同不用更新
Map uniqueFqdns = new ConcurrentHashMap();
+ final String STATUS_CODE = "status_code";
+ XxlJobLogger.log("fqdn accessible task starting ");
+ logger.info("fqdn accessible task starting ");
for (List<Map> mapList : split) {
fixedThreadPool.execute(new Runnable() {
@Override
@@ -98,9 +101,9 @@ public class DataTransformService {
code = HttpKit.getStatus(UrlUtil.getUrl(map.get("fqdn")));
}
uniqueFqdns.put(map.get("fqdn"), code);
- if (Integer.valueOf(String.valueOf(map.get("status_code"))) != code) {
+ if (ObjectUtil.isEmpty(map.get(STATUS_CODE))||Integer.valueOf(String.valueOf(map.get(STATUS_CODE))) != code) {
Map m = new ConcurrentHashMap();
- m.put("status_code", code);
+ m.put(STATUS_CODE, code);
m.put("id", map.get("id"));
transformResult.add(m);
}
@@ -110,10 +113,12 @@ public class DataTransformService {
});
}
try {
+ logger.info("websketch-fqdn-accessible transform running...");
+ XxlJobLogger.log("websketch-fqdn-accessibe transform running");
countDownLatch.await();
fixedThreadPool.shutdown();
} catch (InterruptedException e) {
- logger.error(e.getMessage());
+ logger.error(e);
}
//去掉重复无效
logger.info("websketch-fqdn-accessible transform data {}, spend time {} ms", transformResult.size(), System.currentTimeMillis() - start);
@@ -121,7 +126,61 @@ public class DataTransformService {
return transformResult;
}
+ public List<Map> isFqdnAccessible(List<Map> mariadbResult , Map<String, String> params) throws Exception {
+ Vector<Map> transformResult = new Vector<>();
+ //判断是否可达
+ long start = System.currentTimeMillis();
+ int threadNum = Integer.valueOf(StrUtil.toString(Optional.ofNullable(params.get("parallelism")).orElse("5")));//线程数
+ int partitionSize = (mariadbResult.size() / threadNum) > 0 ? mariadbResult.size() / threadNum : 1;
+ List<List<Map>> split = ListUtil.split(mariadbResult, partitionSize);
+ int t = split.size();
+ ExecutorService fixedThreadPool = Executors.newFixedThreadPool(t);
+ logger.info("threads total count is :" + t);
+ // 用于计数线程是否执行完成
+ CountDownLatch countDownLatch = new CountDownLatch(t);
+ //1.已经探测过不用重复探测 2.探测结果与数据库相同不用更新
+ Map uniqueFqdns = new ConcurrentHashMap();
+ final String STATUS_CODE = "status_code";
+ XxlJobLogger.log("fqdn accessible task starting ");
+ logger.info("fqdn accessible task starting ");
+ for (List<Map> mapList : split) {
+ fixedThreadPool.execute(new Runnable() {
+ @Override
+ public void run() {
+ for (Map map : mapList) {
+ int code = 0;
+ if (uniqueFqdns.containsKey(map.get("fqdn"))) {
+ code = Integer.valueOf(String.valueOf(uniqueFqdns.get("fqdn")));
+ } else {
+ code = HttpKit.getStatus(UrlUtil.getUrl(map.get("fqdn")));
+ }
+ uniqueFqdns.put(map.get("fqdn"), code);
+ if (ObjectUtil.isEmpty(map.get(STATUS_CODE))||Integer.valueOf(String.valueOf(map.get(STATUS_CODE))) != code) {
+ Map m = new ConcurrentHashMap();
+ m.put(STATUS_CODE, code);
+ m.put("id", map.get("id"));
+ transformResult.add(m);
+ }
+ }
+ countDownLatch.countDown();
+ }
+ });
+ }
+ try {
+ logger.info("websketch-fqdn-accessible transform running...");
+ XxlJobLogger.log("websketch-fqdn-accessibe transform running");
+ countDownLatch.await();
+ fixedThreadPool.shutdown();
+ } catch (InterruptedException e) {
+ logger.error(e);
+ fixedThreadPool.shutdownNow();
+ }
+ //去掉重复无效
+ logger.info("websketch-fqdn-accessible transform data {}, spend time {} ms", transformResult.size(), System.currentTimeMillis() - start);
+ XxlJobLogger.log("websketch-fqdn-accessible transform data {}, spend time {} ms", transformResult.size(), System.currentTimeMillis() - start);
+ return transformResult;
+ }
/**
* @param sourceResult ioc_value
* threat_type 字段相同则不入库
diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java
index 68af2a7..005244e 100644
--- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java
+++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java
@@ -21,7 +21,6 @@ import cn.hutool.poi.excel.ExcelReader;
import cn.hutool.poi.excel.ExcelUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.google.common.collect.Lists;
import com.mesalab.executor.core.utils.DBUtils;
import com.mesalab.executor.core.utils.HttpClientUtils;
import com.mesalab.executor.core.utils.JobUtil;
@@ -31,6 +30,8 @@ import com.mesalab.executor.enums.DataTypeEnum;
import com.mesalab.executor.exception.BusinessException;
import com.mesalab.executor.service.DataSinkService;
import com.mesalab.executor.service.DataSourceService;
+import com.mesalab.executor.service.DataTransformService;
+import com.xxl.job.core.log.XxlJobLogger;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import com.zdjizhi.utils.DateUtils;
@@ -413,14 +414,17 @@ public class DataExtractTest {
@Test
public void test12() {
- ArrayList<Object> objects = new ArrayList<>();
- objects.add(1);
- objects.add(2);
- objects.add(3);
- objects.add(4);
-
- System.err.println(ListUtil.split(objects, 3));
- System.err.println(Lists.partition(objects, 3));
+// ArrayList<Object> objects = new ArrayList<>();
+// objects.add(1);
+// objects.add(2);
+// objects.add(3);
+// objects.add(4);
+//
+//// System.err.println(ListUtil.split(objects, 3));
+//// System.err.println(Lists.partition(objects, 3));
+ String fileTypeParams = "json";
+ String[] fileTypes = StrUtil.split(fileTypeParams, ",");
+ System.err.println(Arrays.toString(fileTypes));
}
@Test
@@ -444,11 +448,25 @@ public class DataExtractTest {
// Map<String, Object> map = new LinkedHashMap<>();
// map.put("fqdn", s);
// result.add(map);
- fileWriter.write("update domain_category_reputation set submit_user='tsg' where fqdn='"+s+"';\n",true);
+ fileWriter.write("update domain_category_reputation set submit_user='tsg' where fqdn='" + s + "';\n", true);
}
// dbUpdate(result);
}
+ @Test
+ public void txtUpdate() {
+
+ String operator = "FWDNS";
+ FileReader fr = new FileReader("D:\\test\\test\\ip\\" + operator + ".txt");
+ List<String> strings = fr.readLines();
+ FileWriter fw = new FileWriter("D:\\test\\test\\ip\\" + operator + ".sql");
+ for (String s : strings) {
+ fw.write("INSERT IGNORE INTO dns_server_info " +
+ "(ip_addr, dns_server_role, create_time, update_time) " +
+ "VALUES('" + StrUtil.trim(s) + "', '" + operator + "', current_timestamp(), current_timestamp());\n", true);
+ }
+ }
+
public void dbUpdate(List<Map> transformResult) {
Map<String, String> sinkParams = new HashMap<>();
sinkParams.put("ip", "api.geedge.net");
@@ -507,4 +525,75 @@ public class DataExtractTest {
dbUtils.close();
}
}
+
+ static Map<String, Map> paramsMap = new HashMap<>();
+
+ static {
+ Map<String, String> sourceParams = new HashMap<>();
+ sourceParams.put("ip", "192.168.44.12");
+ sourceParams.put("type", "mariadb");
+ sourceParams.put("database", "web_sketch");
+ sourceParams.put("table", "domain_category_reputation");
+ sourceParams.put("username", "root");
+ sourceParams.put("pin", "galaxy2019");
+ sourceParams.put("sql", "SELECT id,fqdn,status_code FROM domain_category_reputation");
+ sourceParams.put("sharding", "5");
+ paramsMap.put("source", sourceParams);
+ }
+
+ @Autowired
+ DataTransformService dataTransformService;
+
+ @Test
+ public void shardingTest() {
+ Map<String, String> params = new HashMap<>();
+ params.put("parallelism", "5");
+ Map<String, String> sinkParams = new HashMap<>();
+ sinkParams.put("type", "mariadb");
+ sinkParams.put("ip", "192.168.44.12");
+ sinkParams.put("database", "web_sketch");
+ sinkParams.put("table", "domain_category_reputation");
+ sinkParams.put("username", "root");
+ sinkParams.put("pin", "galaxy2019");
+ sinkParams.put("option", "update");
+ try {
+ long start = System.currentTimeMillis();
+ Map<String, String> map = paramsMap.get("source");
+ Map<String, String> countParams = new HashMap<>();
+ countParams.putAll(map);
+ countParams.put("sql", "select count(*) AS count from " + countParams.get("table"));
+ List<Map> countMap = dataSourceService.mysqlSource(countParams);
+ String parentSql = map.get("sql");
+ int sharding = Integer.valueOf(map.get("sharding"));
+ int count = Integer.valueOf(StrUtil.toString(countMap.get(0).get("count")));
+ logger.info("query result data count {} , sharding {}", count, sharding);
+ int shardingSize = count % sharding == 0 ? count / sharding : count / sharding + 1;
+ for (int i = 0; i < sharding; i++) {
+ logger.info("sharding {} source execute start", i + 1);
+ XxlJobLogger.log("sharding {} source execute start", i + 1);
+ String shardingSql = parentSql + " order by id desc limit " + i * shardingSize + "," + ((i + 1) * shardingSize > count ? count : (i + 1) * shardingSize);
+ map.put("sql", shardingSql);
+ List<Map> shardingResult = dataSourceService.mysqlSource(map);
+ logger.info("sharding {} source execute end", i + 1);
+ XxlJobLogger.log("sharding {} source execute end", i + 1);
+
+ logger.info("sharding {} transform execute start", i + 1);
+ XxlJobLogger.log("sharding {} transform execute start", i + 1);
+ List<Map> transformResult = dataTransformService.isFqdnAccessible(shardingResult, params);
+ logger.info("sharding {} transform execute end", i + 1);
+ XxlJobLogger.log("sharding {} transform execute end", i + 1);
+
+ logger.info("sharding {} sink execute start", i + 1);
+ XxlJobLogger.log("sharding {} sink execute start", i + 1);
+ dataSinkService.exec(transformResult, sinkParams);
+ logger.info("sharding {} sink execute end", i + 1);
+ XxlJobLogger.log("sharding {} sink execute end", i + 1);
+ }
+ logger.info("sharding task end, take time {}ms", System.currentTimeMillis() - start);
+ XxlJobLogger.log("sharding task end, take time {}ms", System.currentTimeMillis() - start);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
} \ No newline at end of file