diff options
| author | zhanghongqing <[email protected]> | 2022-04-11 18:42:17 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-04-11 18:42:17 +0800 |
| commit | 200983d95fd38c5bb31dd14a736d4d21552ae265 (patch) | |
| tree | 27db354d2edf1c25e26f741c5cb9202d9f577281 | |
| parent | 87952f4eba56fe82320e7a80a8e2e81d4c295aba (diff) | |
websketch 导入导出文件功能代码优化
10 files changed, 337 insertions, 137 deletions
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml index bd40cb5..661a5b7 100644 --- a/galaxy-job-executor/pom.xml +++ b/galaxy-job-executor/pom.xml @@ -169,7 +169,7 @@ <JAR_FILE>${project.build.finalName}.xjar</JAR_FILE> </buildArgs> <imageTags> - <imageTag>v1.3.220303</imageTag> + <imageTag>v1.3.2204011-dev</imageTag> </imageTags> <resources> <resource> diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/JobUtil.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/JobUtil.java index f264898..d9d97fe 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/JobUtil.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/JobUtil.java @@ -6,15 +6,14 @@ import cn.hutool.core.text.csv.CsvUtil; import cn.hutool.core.text.csv.CsvWriter; import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONNull; import cn.hutool.log.Log; import com.mesalab.executor.exception.BusinessException; import com.xxl.job.core.log.XxlJobLogger; import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; public class JobUtil { private static Log logger = Log.get(); @@ -61,6 +60,7 @@ public class JobUtil { return null; } } + /** * 获取map中第一个非空数据值 * @@ -79,4 +79,34 @@ public class JobUtil { } return obj; } + + public static Object nullToDef(Map datum, String key, String def) { + if (datum.get(key) instanceof JSONNull) { + return def; + } else if (datum.get(key) instanceof String) { + return datum.get(key) == null || "null".equals(datum.get(key)) ? def : StrUtil.toString(datum.get(key)); + } + return datum.get(key); + } + + public static Object nullToDef(Object dataum, String def) { + if (dataum instanceof JSONNull) { + return def; + } else if (dataum instanceof String) { + return dataum == null || "null".equals(dataum) ? def : StrUtil.toString(dataum); + } + return dataum; + } + /** + * 获取map中的指定字段 + * 1.将null转为"" + */ + public static Map extractMap(Map source, String keys) { + String[] keysArr = StrUtil.split(keys, ","); + Map m = new HashMap(); + for (String key : keysArr) { + m.put(key, JobUtil.nullToDef(source, key, "")); + } + return m; + } } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/enums/DataTypeEnum.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/enums/DataTypeEnum.java new file mode 100644 index 0000000..1a4427a --- /dev/null +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/enums/DataTypeEnum.java @@ -0,0 +1,32 @@ +package com.mesalab.executor.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * @description: + * @author: zhq + * @create: 2022-04-08 + **/ +@NoArgsConstructor +@AllArgsConstructor +@Getter +public enum DataTypeEnum { + + FILE("file"), + MYSQL("mysql"); + + private String name; + + public static DataTypeEnum match(String name) { + if (name != null) { + for (DataTypeEnum item : DataTypeEnum.values()) { + if (item.name().equals(name)) { + return item; + } + } + } + return null; + } +} diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java index f240b60..c877209 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java @@ -14,7 +14,6 @@ import com.zdjizhi.utils.StringUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.List; import java.util.Map; /** @@ -64,20 +63,18 @@ public class DataExtractJob { logger.error("params parser error , params is {}", params); return IJobHandler.FAIL; } - logger.info(params); - Map<String, String> sourceParams = paramsMap.get("source"); - Map<String, String> transformParams = paramsMap.get("transform"); - Map<String, String> sinkParams = paramsMap.get("sink"); + logger.info("query params is {}", params); - Map<String, List> sourceResult = dataSourceService.adapt(sourceParams); - if (ObjectUtil.isNotEmpty(transformParams)) { - Map<String, List> transformResult = dataTransformService.adapt(transformParams, sourceResult); - dataSinkService.adapt(transformResult, sinkParams); + Map<String,Object> sourceResult = dataSourceService.adapt(paramsMap.get("source")); + if (ObjectUtil.isNotEmpty(paramsMap.get("transform"))) { + Map<String, Object> transformResult = dataTransformService.adapt(paramsMap.get("transform"), sourceResult); + dataSinkService.adapt(transformResult, paramsMap.get("sink")); } else { - dataSinkService.adapt(sourceResult, sinkParams); + dataSinkService.adapt(sourceResult, paramsMap.get("sink")); } return ReturnT.SUCCESS; } catch (Exception e) { + e.printStackTrace(); XxlJobLogger.log(e.getMessage()); logger.error(e.getMessage()); return ReturnT.FAIL; @@ -85,7 +82,6 @@ public class DataExtractJob { } - private Map<String, Map<String, String>> parserParams(String params) { if (StringUtil.isBlank(params)) { XxlJobLogger.log("params is empty!"); @@ -96,16 +92,16 @@ public class DataExtractJob { XxlJobLogger.log("params is not format json ! "); return null; } - if (ObjectUtil.isEmpty(param.get("sink"))) { - XxlJobLogger.log("sink is empty ! "); + if (ObjectUtil.isEmpty(param.get("source"))) { + XxlJobLogger.log("source is empty ! "); return null; } if (ObjectUtil.isNull(param.get("transform"))) { - XxlJobLogger.log("transform is empty ! "); + XxlJobLogger.log("transform is null ! "); return null; } - if (ObjectUtil.isEmpty(param.get("source"))) { - XxlJobLogger.log("source is empty ! "); + if (ObjectUtil.isNull(param.get("sink"))) { + XxlJobLogger.log("sink is null ! "); return null; } return param; diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataflowJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataflowJob.java index 1f1d2a7..55234ef 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataflowJob.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataflowJob.java @@ -6,7 +6,6 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.RandomUtil; import cn.hutool.core.util.ZipUtil; import cn.hutool.crypto.digest.DigestUtil; -import cn.hutool.json.JSONUtil; import cn.hutool.log.Log; import com.google.common.base.Splitter; import com.mesalab.executor.core.config.StorgeConfig; @@ -141,7 +140,7 @@ public class DataflowJob { @XxlJob("httpToStoreJobHandler") public ReturnT<String> httpToStoreJobHandler(String params) { - List<HttpParam> httpParams = parserHttpParams(params); + List<HttpParam> httpParams = HttpParam.parserHttpParams(params); if (ObjectUtil.isEmpty(httpParams)) { logger.error("params parser error , params is {}", params); return IJobHandler.FAIL; @@ -238,31 +237,6 @@ public class DataflowJob { return results; } - private List<HttpParam> parserHttpParams(String params) { - if (StringUtil.isBlank(params)) { - XxlJobLogger.log("params is empty!"); - return null; - } - List<HttpParam> httpParams = JSONUtil.toList(params, HttpParam.class); - for (HttpParam httpParam : httpParams) { - if (ObjectUtil.isNull(httpParam)) { - XxlJobLogger.log("params is not format json ! "); - return null; - } - if (StringUtil.isBlank(httpParam.getUrl())) { - XxlJobLogger.log("url is empty ! "); - return null; - } - if (StringUtil.isEmpty(httpParam.getStore())) { - XxlJobLogger.log("store is empty ! "); - return null; - } - if (StringUtil.isBlank(httpParam.getMethod())) { - XxlJobLogger.log("this request method is default get"); - httpParam.setMethod(HttpMethod.GET.name()); - } - } - return httpParams; - } + } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/HttpParam.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/HttpParam.java index 208e097..88a0e84 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/HttpParam.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/HttpParam.java @@ -1,8 +1,17 @@ package com.mesalab.executor.pojo; -import lombok.*; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.json.JSONUtil; +import com.xxl.job.core.log.XxlJobLogger; +import com.zdjizhi.utils.StringUtil; +import io.netty.handler.codec.http.HttpMethod; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; +import java.util.List; import java.util.Map; @Data @@ -15,4 +24,56 @@ public class HttpParam implements Serializable { private String method; private String resultKey;//指定结果集返回值 private Map<String, Object> requestBody;// http post请求参数 + + public static List<HttpParam> parserHttpParams(String params) { + if (StringUtil.isBlank(params)) { + XxlJobLogger.log("params is empty!"); + return null; + } + List<HttpParam> httpParams = JSONUtil.toList(params, HttpParam.class); + for (HttpParam httpParam : httpParams) { + if (ObjectUtil.isNull(httpParam)) { + XxlJobLogger.log("params is not format json ! "); + return null; + } + if (StringUtil.isBlank(httpParam.getUrl())) { + XxlJobLogger.log("url is empty ! "); + return null; + } + if (StringUtil.isEmpty(httpParam.getStore())) { + XxlJobLogger.log("store is empty ! "); + return null; + } + if (StringUtil.isBlank(httpParam.getMethod())) { + XxlJobLogger.log("this request method is default get"); + httpParam.setMethod(HttpMethod.GET.name()); + } + } + return httpParams; + } + + public static HttpParam parserHttpParam(String params) { + if (StringUtil.isBlank(params)) { + XxlJobLogger.log("params is empty!"); + return null; + } + HttpParam httpParam = JSONUtil.toBean(params, HttpParam.class,true); + if (ObjectUtil.isNull(httpParam)) { + XxlJobLogger.log("params is not format json ! "); + return null; + } + if (StringUtil.isBlank(httpParam.getUrl())) { + XxlJobLogger.log("url is empty ! "); + return null; + } + if (StringUtil.isEmpty(httpParam.getStore())) { + XxlJobLogger.log("store is empty ! "); + return null; + } + if (StringUtil.isBlank(httpParam.getMethod())) { + XxlJobLogger.log("this request method is default get"); + httpParam.setMethod(HttpMethod.GET.name()); + } + return httpParam; + } } diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java index a00178c..5435b89 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java @@ -1,7 +1,7 @@ package com.mesalab.executor.service; import cn.hutool.core.io.file.FileWriter; -import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import com.alibaba.fastjson.JSON; @@ -9,6 +9,8 @@ import com.alibaba.fastjson.serializer.SerializerFeature; import com.mesalab.executor.core.utils.DBUtils; import com.mesalab.executor.core.utils.JobUtil; import com.zdjizhi.utils.DateUtils; +import com.zdjizhi.utils.JsonMapper; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -16,8 +18,6 @@ import java.io.File; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -30,55 +30,53 @@ import java.util.Map; public class DataSinkService { private Log logger = Log.get(); - public void adapt(Map<String, List> transformResult, Map<String, String> sinkParams) throws Exception { - String type = sinkParams.get("type"); + public void adapt(Map<String, Object> transformResult, Map<String, String> sinkParams) throws Exception { + if (ObjectUtil.isEmpty(sinkParams.get("type"))) { + return; + } //1. 文件类型 2. 选择处理工具 3. 转为格式数据 - if ("mariadb".equals(type) || "mysql".equals(type)) { - mysqlSink(JobUtil.getFirstNotNull(transformResult), sinkParams); - } else if ("file".equals(type)) { - fileSink(JobUtil.getFirstNotNull(transformResult), sinkParams); + switch (sinkParams.get("type")) { + case "mariadb": + mysqlSink(transformResult, sinkParams); + break; + case "mysql": + mysqlSink(transformResult, sinkParams); + break; + case "file": + fileSink(JobUtil.getFirstNotNull(transformResult), sinkParams); + break; + default: + break; } } - public void fileSink(List<Map> transformResult, Map<String, String> sinkParams) throws Exception { + public void fileSink(Object transformResult, Map<String, String> sinkParams) throws Exception { String fileType = sinkParams.get("fileType"); String path = sinkParams.get("path"); String prefix = sinkParams.get("prefix"); - String date = DateUtils.getCurrentDate("yyyyMMdd"); - String filePath = StrUtil.concat(false, path, prefix, "_", date); + String date = ""; + if (StringUtils.isNotBlank(sinkParams.get("datePattern"))) { + date = "_".concat(DateUtils.getCurrentDate("yyyyMMdd")); + } + String filePath = StrUtil.concat(false, path, prefix, date); if ("json".equals(fileType)) { FileWriter fileWriter = new FileWriter(filePath.concat(".json")); File file = fileWriter.write(JSON.toJSONString(transformResult, SerializerFeature.PrettyFormat)); - logger.info("download {} pieces of json data to path {}", transformResult.size(), file.getAbsolutePath()); + logger.info("download json data to path {}", file.getAbsolutePath()); } else if ("csv".equals(fileType)) { - File file = JobUtil.createCsvFile(transformResult, filePath.concat(".csv")); - logger.info("download {} pieces of csv data to path {}", transformResult.size(), file.getAbsolutePath()); + List<Map> transform = (List<Map>) JsonMapper.fromJsonString(JsonMapper.toJsonString(transformResult), List.class); + File file = JobUtil.createCsvFile(transform, filePath.concat(".csv")); + logger.info("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath()); } } /** * @return */ - public void mysqlSink(List<Map> transformResult, Map<String, String> sinkParams) throws Exception { - exec(transformResult, sinkParams); - } - - public static LinkedHashMap getMapValueForLinkedHashMap(Map dataMap) { - LinkedHashMap returnMap = new LinkedHashMap(); - if (MapUtil.isEmpty(dataMap)) { - return returnMap; - } - Iterator iterator = dataMap.keySet().iterator(); - while (iterator.hasNext()) { - Object objKey = iterator.next(); - Object objValue = dataMap.get(objKey); - if (objValue instanceof Map) { - returnMap.put(objKey, getMapValueForLinkedHashMap((Map) objValue)); - } else { - returnMap.put(objKey, objValue); - } - } - return returnMap; + public void mysqlSink(Object transformResult, Map<String, String> sinkParams) throws Exception { + Map<String, List> transform = (Map<String, List>) JsonMapper.fromJsonString(JsonMapper.toJsonString(transformResult), Map.class); + List list = JobUtil.getFirstNotNull(transform); + exec(list, sinkParams); } /** @@ -121,7 +119,7 @@ public class DataSinkService { } pst.executeBatch(); conn.commit(); - logger.info("花费的时间:" + (System.currentTimeMillis() - start) + "ms"); + logger.info("花费的时间: {}" + (System.currentTimeMillis() - start) + "ms"); } catch (SQLException e) { logger.error(e.getMessage()); throw e; diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java index 7ddf60c..8909fee 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java @@ -3,19 +3,27 @@ package com.mesalab.executor.service; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.io.StreamProgress; import cn.hutool.core.io.file.FileReader; -import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.db.DbUtil; import cn.hutool.db.Entity; import cn.hutool.db.handler.EntityListHandler; import cn.hutool.db.sql.SqlExecutor; +import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONUtil; import cn.hutool.log.Log; import cn.hutool.poi.excel.ExcelReader; import cn.hutool.poi.excel.ExcelUtil; import com.mesalab.executor.core.utils.DBUtils; +import com.mesalab.executor.core.utils.HttpClientUtils; +import com.mesalab.executor.exception.BusinessException; import com.mesalab.executor.pojo.DBParam; +import com.mesalab.executor.pojo.HttpParam; +import com.xxl.job.core.handler.IJobHandler; +import com.zdjizhi.utils.JsonMapper; +import io.netty.handler.codec.http.HttpMethod; import org.apache.commons.io.filefilter.FileFilterUtils; import org.springframework.stereotype.Service; @@ -37,26 +45,107 @@ public class DataSourceService { private Log logger = Log.get(); - public Map<String, List> adapt(Map<String, String> sourceParams) throws Exception { + public Map<String,Object> adapt(Map<String, String> sourceParams) throws Exception { String type = sourceParams.get("type"); //1. 文件类型 2. 选择处理工具 3. 转为格式数据 - if ("file".equals(type)) { - return fileSource(sourceParams); - } else if ("mysql".equals(type) || "mariadb".equals(type)) { - return MapUtil.of("mysql", mysqlSource(sourceParams)); + Map<String, Object> sourceResult = new HashMap<>(); + switch (type) { + case "file": + sourceResult = fileSource(sourceParams); + break; + case "mysql": + sourceResult.put("mysql", mysqlSource(sourceParams)); + break; + case "mariadb": + sourceResult.put("mysql", mysqlSource(sourceParams)); + break; + case "spider": + sourceResult.put("spider", spider(sourceParams)); + break; + case "http": + sourceResult.put("http", http(sourceParams)); + break; + default: + break; + } + return sourceResult; + } + + /** + * 网址数据源 + * + * @param sourceParams url,method,requestBody,resultKey,tableName + * @return + */ + private Map http(Map<String,String> sourceParams) throws Exception { + + String url = String.valueOf(sourceParams.get("url")); + String method = String.valueOf(sourceParams.get("method")); + Map requestBody = JSONUtil.toBean(JsonMapper.toJsonString(sourceParams.get("requestBody")), Map.class); + + String resultMessage = ""; + if (HttpMethod.POST.name().equalsIgnoreCase(method)) { + resultMessage = HttpClientUtils.httpPost(url, JsonMapper.toJsonString(requestBody)); + } else if (HttpMethod.GET.name().equalsIgnoreCase(method)) { + resultMessage = HttpClientUtils.httpGet(url); + } else { + throw new BusinessException(IJobHandler.FAIL.getCode(), "Not support Method:" + method); + } + if (resultMessage.equals("-1")) { + throw new BusinessException(IJobHandler.FAIL.getCode(), "Http Client connection Error!"); + } + Map map = (Map) JsonMapper.fromJsonString(resultMessage, Map.class); + + return map; + } + + private Map spider(Map<String, String> sourceParams) throws Exception { + + HttpParam httpParam = HttpParam.parserHttpParam(JSONUtil.toJsonStr(sourceParams)); + + if (ObjectUtil.isEmpty(httpParam)) { + throw new BusinessException("params parser error"); + } + if (HttpMethod.GET.name().equalsIgnoreCase(httpParam.getMethod())) { + //带进度显示的文件下载 + HttpUtil.downloadFile(httpParam.getUrl(), FileUtil.file(httpParam.getStore()), new StreamProgress() { + @Override + public void start() { + logger.info("start download . . . "); + } + + @Override + public void progress(long progressSize) { + logger.info("loading {}", FileUtil.readableFileSize(progressSize)); + } + + @Override + public void finish() { + logger.info("download over !"); + } + }); + } else { + throw new BusinessException(IJobHandler.FAIL.getCode(), "Not support Method:" + httpParam.getMethod()); } return null; } private List<Map> mysqlSource(Map<String, String> sourceParams) throws Exception { DBParam dbParam = BeanUtil.fillBeanWithMap(sourceParams, new DBParam(), false); - DBUtils dbUtils = new DBUtils(); - Connection conn = dbUtils.getDBConn(sourceParams); - String sql = dbParam.getSql(); - List<Entity> entityList = SqlExecutor.query(conn, sql, new EntityListHandler()); - logger.info("query result size {}", entityList.size()); - List<Map> collect = entityList.stream().map(x -> BeanUtil.beanToMap(x)).collect(Collectors.toList()); - DbUtil.close(conn); + List<Map> collect; + Connection conn = null; + DBUtils dbUtils = null; + try { + dbUtils = new DBUtils(); + conn = dbUtils.getDBConn(sourceParams); + String sql = dbParam.getSql(); + List<Entity> entityList = SqlExecutor.query(conn, sql, new EntityListHandler()); + logger.info("query result size {}", entityList.size()); + collect = entityList.stream().map(x -> BeanUtil.beanToMap(x)).collect(Collectors.toList()); + } finally { + DbUtil.close(conn); + dbUtils.close(); + } return collect; } @@ -69,13 +158,13 @@ public class DataSourceService { * * @return */ - public Map<String, List> fileSource(Map<String, String> params) throws Exception { + public Map<String, Object> fileSource(Map<String, String> params) throws Exception { //解析参数 String path = params.get("path"); String fileTypeParams = params.get("fileType"); String[] fileTypes = StrUtil.split(fileTypeParams, ","); //处理数据 - Map<String, List> typeResult = new HashMap(); + Map<String, Object> typeResult = new HashMap(); for (String fileType : fileTypes) { if ("json".equals(fileType)) { List<File> files = FileUtil.loopFiles(Paths.get(path), 1, FileFilterUtils.suffixFileFilter(fileType)); diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java index 5d06da1..61598cd 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java @@ -1,9 +1,9 @@ package com.mesalab.executor.service; -import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONNull; import cn.hutool.json.JSONUtil; +import com.mesalab.executor.core.utils.JobUtil; import org.apache.commons.collections4.ListUtils; import org.springframework.stereotype.Service; @@ -29,13 +29,21 @@ public class DataTransformService { * 业务转换 * * @param params - * @return + * @return Map<String, List> */ - public Map<String, List> adapt(Map<String, String> params, Map<String, List> sourceResult) throws Exception { - if ("websketch".equals(params.get("type"))) { - return MapUtil.of("mysql", webSketch(sourceResult)); + public Map<String, Object> adapt(Map<String, String> params, Object sourceResult) throws Exception { + Map<String, Object> transformResult = new HashMap<>(); + if (ObjectUtil.isEmpty(params.get("type"))) { + return transformResult; + } + switch (params.get("type")) { + case "websketch": + transformResult.put("websketch", webSketch(sourceResult)); + break; + default: + break; } - return null; + return transformResult; } /** @@ -43,9 +51,10 @@ public class DataTransformService { * threat_type 字段相同则不入库 * @return */ - public List<Map> webSketch(Map<String, List> sourceResult) throws Exception { - List<Map> json = sourceResult.get("json"); - List<Map> malwareData = sourceResult.get("xlsx"); + public List<Map> webSketch(Object sourceResult) throws Exception { + Map<String, List> source = JSONUtil.toBean(JSONUtil.toJsonStr(sourceResult), Map.class); + List<Map> json = source.get("json"); + List<Map> malwareData = source.get("xlsx"); Map<String, Map<String, List<String>>> malware = getMalware(malwareData); Map<String, List<String>> soft = malware.get(SOFTWARE); Map<String, List<String>> associatedGroups = malware.get(ASSOCIATED_GROUPS); @@ -58,27 +67,33 @@ public class DataTransformService { final String IP_PORT = "ip:port";// ioc_type改为port final String CAC = "command and control";// ioc_type改为port final String PE = "payload delivery";// ioc_type改为port - final String FIELDS = "ioc_value,ioc_type,threat_type,malware_alias,confidence_level,reference"; + final String FIELDS = "ioc_type,threat_type,malware_alias,confidence_level,reference"; long now = System.currentTimeMillis() / 1000; - //处理 + //处理 ioc=iov_value List<Map> transformResult = new ArrayList<>(); - Map<Object, Object> objectObjectHashMap = new HashMap<>(); for (Map map : json) { List<Map> data = (List<Map>) map.get("data"); for (Map datum : data) { Object ioc_type = datum.get("ioc_type"); if (iocTypeFilter.contains(ioc_type)) { - Map extMap = extractMap(datum, FIELDS); + Map extMap = JobUtil.extractMap(datum, FIELDS); + String iocValue = String.valueOf(Optional.ofNullable(datum.get("ioc_value")).orElse(datum.get("ioc"))); + extMap.put("ioc_value", iocValue); + + Object first = Optional.ofNullable(datum.get("first_seen_utc")).orElse(datum.get("first_seen")); + Object last = Optional.ofNullable(datum.get("last_seen_utc")).orElse(datum.get("last_seen")); + extMap.put("first_report_time", JobUtil.nullToDef(first, "")); + extMap.put("last_seen_time", JobUtil.nullToDef(last, "")); + if (IP_PORT.equals(ioc_type)) { extMap.put("ioc_type", IP); - String iocValue = String.valueOf(extMap.get("ioc_value")); extMap.put("ioc_value", iocValue.contains(":") ? iocValue.substring(0, iocValue.lastIndexOf(":")) : iocValue); } String threat_type = String.valueOf(datum.get("threat_type")); if ("botnet_cc".equals(threat_type)) { extMap.put("threat_type", CAC); - } else if ("payload".equals(threat_type)) { + } else if ("payload".equals(threat_type) || "payload_delivery".equals(threat_type)) { extMap.put("threat_type", PE); } @@ -104,8 +119,7 @@ public class DataTransformService { extMap.put("source", SOURCE); extMap.put("malware_name", malwareName); - extMap.put("first_report_time", nullToDef(datum, "first_seen_utc", "")); - extMap.put("last_seen_time", nullToDef(datum, "last_seen_utc", "")); + extMap.put("create_time", now); extMap.put("update_time", now); @@ -116,15 +130,6 @@ public class DataTransformService { return transformResult; } - public Object nullToDef(Map datum, String key, String def) { - if (datum.get(key) instanceof JSONNull) { - return def; - } else if (datum.get(key) instanceof String) { - return datum.get(key) == null || "null".equals(datum.get(key)) ? def : StrUtil.toString(datum.get(key)); - } - return datum.get(key); - } - /** * {key, value} * @@ -170,14 +175,5 @@ public class DataTransformService { return malwareMap; } - //1.将null转为"" - public Map extractMap(Map source, String keys) { - String[] keysArr = StrUtil.split(keys, ","); - Map m = new HashMap(); - for (String key : keysArr) { - m.put(key, nullToDef(source, key, "")); - } - return m; - } } diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java index 01f7827..675ba15 100644 --- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java +++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java @@ -7,6 +7,7 @@ import cn.hutool.core.io.file.FileWriter; import cn.hutool.core.map.CaseInsensitiveMap; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONUtil; import cn.hutool.log.Log; import cn.hutool.poi.excel.ExcelReader; @@ -247,5 +248,28 @@ public class DataExtractTest { logger.info("download file to path {}", file.getAbsolutePath()); } } + @Test + public void spider(){ + HashMap<String, Object> map = new HashMap<>(); + HttpUtil.downloadFile("https://threatfox-api.abuse.ch/api/v1?query=get_iocs&days=7","D:\\test\\test\\2\\"); + } + @Test + public void http(){ + String type = null; + switch(type){ + case "mariadb": + System.err.println(1); + break; + case "mysql": + System.err.println(1); + break; + case "file": + System.err.println(1); + break; + default: + break; + } + } + }
\ No newline at end of file |
