summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-04-11 18:42:17 +0800
committerzhanghongqing <[email protected]>2022-04-11 18:42:17 +0800
commit200983d95fd38c5bb31dd14a736d4d21552ae265 (patch)
tree27db354d2edf1c25e26f741c5cb9202d9f577281
parent87952f4eba56fe82320e7a80a8e2e81d4c295aba (diff)
websketch 导入导出文件功能代码优化
-rw-r--r--galaxy-job-executor/pom.xml2
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/JobUtil.java38
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/enums/DataTypeEnum.java32
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java28
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataflowJob.java30
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/HttpParam.java63
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java70
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java119
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java68
-rw-r--r--galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java24
10 files changed, 337 insertions, 137 deletions
diff --git a/galaxy-job-executor/pom.xml b/galaxy-job-executor/pom.xml
index bd40cb5..661a5b7 100644
--- a/galaxy-job-executor/pom.xml
+++ b/galaxy-job-executor/pom.xml
@@ -169,7 +169,7 @@
<JAR_FILE>${project.build.finalName}.xjar</JAR_FILE>
</buildArgs>
<imageTags>
- <imageTag>v1.3.220303</imageTag>
+ <imageTag>v1.3.2204011-dev</imageTag>
</imageTags>
<resources>
<resource>
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/JobUtil.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/JobUtil.java
index f264898..d9d97fe 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/JobUtil.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/core/utils/JobUtil.java
@@ -6,15 +6,14 @@ import cn.hutool.core.text.csv.CsvUtil;
import cn.hutool.core.text.csv.CsvWriter;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONNull;
import cn.hutool.log.Log;
import com.mesalab.executor.exception.BusinessException;
import com.xxl.job.core.log.XxlJobLogger;
import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class JobUtil {
private static Log logger = Log.get();
@@ -61,6 +60,7 @@ public class JobUtil {
return null;
}
}
+
/**
* 获取map中第一个非空数据值
*
@@ -79,4 +79,34 @@ public class JobUtil {
}
return obj;
}
+
+ public static Object nullToDef(Map datum, String key, String def) {
+ if (datum.get(key) instanceof JSONNull) {
+ return def;
+ } else if (datum.get(key) instanceof String) {
+ return datum.get(key) == null || "null".equals(datum.get(key)) ? def : StrUtil.toString(datum.get(key));
+ }
+ return datum.get(key);
+ }
+
+ public static Object nullToDef(Object dataum, String def) {
+ if (dataum instanceof JSONNull) {
+ return def;
+ } else if (dataum instanceof String) {
+ return dataum == null || "null".equals(dataum) ? def : StrUtil.toString(dataum);
+ }
+ return dataum;
+ }
+ /**
+ * 获取map中的指定字段
+ * 1.将null转为""
+ */
+ public static Map extractMap(Map source, String keys) {
+ String[] keysArr = StrUtil.split(keys, ",");
+ Map m = new HashMap();
+ for (String key : keysArr) {
+ m.put(key, JobUtil.nullToDef(source, key, ""));
+ }
+ return m;
+ }
}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/enums/DataTypeEnum.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/enums/DataTypeEnum.java
new file mode 100644
index 0000000..1a4427a
--- /dev/null
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/enums/DataTypeEnum.java
@@ -0,0 +1,32 @@
+package com.mesalab.executor.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-04-08
+ **/
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+public enum DataTypeEnum {
+
+ FILE("file"),
+ MYSQL("mysql");
+
+ private String name;
+
+ public static DataTypeEnum match(String name) {
+ if (name != null) {
+ for (DataTypeEnum item : DataTypeEnum.values()) {
+ if (item.name().equals(name)) {
+ return item;
+ }
+ }
+ }
+ return null;
+ }
+}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java
index f240b60..c877209 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataExtractJob.java
@@ -14,7 +14,6 @@ import com.zdjizhi.utils.StringUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.List;
import java.util.Map;
/**
@@ -64,20 +63,18 @@ public class DataExtractJob {
logger.error("params parser error , params is {}", params);
return IJobHandler.FAIL;
}
- logger.info(params);
- Map<String, String> sourceParams = paramsMap.get("source");
- Map<String, String> transformParams = paramsMap.get("transform");
- Map<String, String> sinkParams = paramsMap.get("sink");
+ logger.info("query params is {}", params);
- Map<String, List> sourceResult = dataSourceService.adapt(sourceParams);
- if (ObjectUtil.isNotEmpty(transformParams)) {
- Map<String, List> transformResult = dataTransformService.adapt(transformParams, sourceResult);
- dataSinkService.adapt(transformResult, sinkParams);
+ Map<String,Object> sourceResult = dataSourceService.adapt(paramsMap.get("source"));
+ if (ObjectUtil.isNotEmpty(paramsMap.get("transform"))) {
+ Map<String, Object> transformResult = dataTransformService.adapt(paramsMap.get("transform"), sourceResult);
+ dataSinkService.adapt(transformResult, paramsMap.get("sink"));
} else {
- dataSinkService.adapt(sourceResult, sinkParams);
+ dataSinkService.adapt(sourceResult, paramsMap.get("sink"));
}
return ReturnT.SUCCESS;
} catch (Exception e) {
+ e.printStackTrace();
XxlJobLogger.log(e.getMessage());
logger.error(e.getMessage());
return ReturnT.FAIL;
@@ -85,7 +82,6 @@ public class DataExtractJob {
}
-
private Map<String, Map<String, String>> parserParams(String params) {
if (StringUtil.isBlank(params)) {
XxlJobLogger.log("params is empty!");
@@ -96,16 +92,16 @@ public class DataExtractJob {
XxlJobLogger.log("params is not format json ! ");
return null;
}
- if (ObjectUtil.isEmpty(param.get("sink"))) {
- XxlJobLogger.log("sink is empty ! ");
+ if (ObjectUtil.isEmpty(param.get("source"))) {
+ XxlJobLogger.log("source is empty ! ");
return null;
}
if (ObjectUtil.isNull(param.get("transform"))) {
- XxlJobLogger.log("transform is empty ! ");
+ XxlJobLogger.log("transform is null ! ");
return null;
}
- if (ObjectUtil.isEmpty(param.get("source"))) {
- XxlJobLogger.log("source is empty ! ");
+ if (ObjectUtil.isNull(param.get("sink"))) {
+ XxlJobLogger.log("sink is null ! ");
return null;
}
return param;
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataflowJob.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataflowJob.java
index 1f1d2a7..55234ef 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataflowJob.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/jobhandler/DataflowJob.java
@@ -6,7 +6,6 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.ZipUtil;
import cn.hutool.crypto.digest.DigestUtil;
-import cn.hutool.json.JSONUtil;
import cn.hutool.log.Log;
import com.google.common.base.Splitter;
import com.mesalab.executor.core.config.StorgeConfig;
@@ -141,7 +140,7 @@ public class DataflowJob {
@XxlJob("httpToStoreJobHandler")
public ReturnT<String> httpToStoreJobHandler(String params) {
- List<HttpParam> httpParams = parserHttpParams(params);
+ List<HttpParam> httpParams = HttpParam.parserHttpParams(params);
if (ObjectUtil.isEmpty(httpParams)) {
logger.error("params parser error , params is {}", params);
return IJobHandler.FAIL;
@@ -238,31 +237,6 @@ public class DataflowJob {
return results;
}
- private List<HttpParam> parserHttpParams(String params) {
- if (StringUtil.isBlank(params)) {
- XxlJobLogger.log("params is empty!");
- return null;
- }
- List<HttpParam> httpParams = JSONUtil.toList(params, HttpParam.class);
- for (HttpParam httpParam : httpParams) {
- if (ObjectUtil.isNull(httpParam)) {
- XxlJobLogger.log("params is not format json ! ");
- return null;
- }
- if (StringUtil.isBlank(httpParam.getUrl())) {
- XxlJobLogger.log("url is empty ! ");
- return null;
- }
- if (StringUtil.isEmpty(httpParam.getStore())) {
- XxlJobLogger.log("store is empty ! ");
- return null;
- }
- if (StringUtil.isBlank(httpParam.getMethod())) {
- XxlJobLogger.log("this request method is default get");
- httpParam.setMethod(HttpMethod.GET.name());
- }
- }
- return httpParams;
- }
+
}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/HttpParam.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/HttpParam.java
index 208e097..88a0e84 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/HttpParam.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/pojo/HttpParam.java
@@ -1,8 +1,17 @@
package com.mesalab.executor.pojo;
-import lombok.*;
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.json.JSONUtil;
+import com.xxl.job.core.log.XxlJobLogger;
+import com.zdjizhi.utils.StringUtil;
+import io.netty.handler.codec.http.HttpMethod;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
import java.io.Serializable;
+import java.util.List;
import java.util.Map;
@Data
@@ -15,4 +24,56 @@ public class HttpParam implements Serializable {
private String method;
private String resultKey;//指定结果集返回值
private Map<String, Object> requestBody;// http post请求参数
+
+ public static List<HttpParam> parserHttpParams(String params) {
+ if (StringUtil.isBlank(params)) {
+ XxlJobLogger.log("params is empty!");
+ return null;
+ }
+ List<HttpParam> httpParams = JSONUtil.toList(params, HttpParam.class);
+ for (HttpParam httpParam : httpParams) {
+ if (ObjectUtil.isNull(httpParam)) {
+ XxlJobLogger.log("params is not format json ! ");
+ return null;
+ }
+ if (StringUtil.isBlank(httpParam.getUrl())) {
+ XxlJobLogger.log("url is empty ! ");
+ return null;
+ }
+ if (StringUtil.isEmpty(httpParam.getStore())) {
+ XxlJobLogger.log("store is empty ! ");
+ return null;
+ }
+ if (StringUtil.isBlank(httpParam.getMethod())) {
+ XxlJobLogger.log("this request method is default get");
+ httpParam.setMethod(HttpMethod.GET.name());
+ }
+ }
+ return httpParams;
+ }
+
+ public static HttpParam parserHttpParam(String params) {
+ if (StringUtil.isBlank(params)) {
+ XxlJobLogger.log("params is empty!");
+ return null;
+ }
+ HttpParam httpParam = JSONUtil.toBean(params, HttpParam.class,true);
+ if (ObjectUtil.isNull(httpParam)) {
+ XxlJobLogger.log("params is not format json ! ");
+ return null;
+ }
+ if (StringUtil.isBlank(httpParam.getUrl())) {
+ XxlJobLogger.log("url is empty ! ");
+ return null;
+ }
+ if (StringUtil.isEmpty(httpParam.getStore())) {
+ XxlJobLogger.log("store is empty ! ");
+ return null;
+ }
+ if (StringUtil.isBlank(httpParam.getMethod())) {
+ XxlJobLogger.log("this request method is default get");
+ httpParam.setMethod(HttpMethod.GET.name());
+ }
+ return httpParam;
+ }
}
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
index a00178c..5435b89 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
@@ -1,7 +1,7 @@
package com.mesalab.executor.service;
import cn.hutool.core.io.file.FileWriter;
-import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import com.alibaba.fastjson.JSON;
@@ -9,6 +9,8 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
import com.mesalab.executor.core.utils.DBUtils;
import com.mesalab.executor.core.utils.JobUtil;
import com.zdjizhi.utils.DateUtils;
+import com.zdjizhi.utils.JsonMapper;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -16,8 +18,6 @@ import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -30,55 +30,53 @@ import java.util.Map;
public class DataSinkService {
private Log logger = Log.get();
- public void adapt(Map<String, List> transformResult, Map<String, String> sinkParams) throws Exception {
- String type = sinkParams.get("type");
+ public void adapt(Map<String, Object> transformResult, Map<String, String> sinkParams) throws Exception {
+ if (ObjectUtil.isEmpty(sinkParams.get("type"))) {
+ return;
+ }
//1. 文件类型 2. 选择处理工具 3. 转为格式数据
- if ("mariadb".equals(type) || "mysql".equals(type)) {
- mysqlSink(JobUtil.getFirstNotNull(transformResult), sinkParams);
- } else if ("file".equals(type)) {
- fileSink(JobUtil.getFirstNotNull(transformResult), sinkParams);
+ switch (sinkParams.get("type")) {
+ case "mariadb":
+ mysqlSink(transformResult, sinkParams);
+ break;
+ case "mysql":
+ mysqlSink(transformResult, sinkParams);
+ break;
+ case "file":
+ fileSink(JobUtil.getFirstNotNull(transformResult), sinkParams);
+ break;
+ default:
+ break;
}
}
- public void fileSink(List<Map> transformResult, Map<String, String> sinkParams) throws Exception {
+ public void fileSink(Object transformResult, Map<String, String> sinkParams) throws Exception {
String fileType = sinkParams.get("fileType");
String path = sinkParams.get("path");
String prefix = sinkParams.get("prefix");
- String date = DateUtils.getCurrentDate("yyyyMMdd");
- String filePath = StrUtil.concat(false, path, prefix, "_", date);
+ String date = "";
+ if (StringUtils.isNotBlank(sinkParams.get("datePattern"))) {
+ date = "_".concat(DateUtils.getCurrentDate("yyyyMMdd"));
+ }
+ String filePath = StrUtil.concat(false, path, prefix, date);
if ("json".equals(fileType)) {
FileWriter fileWriter = new FileWriter(filePath.concat(".json"));
File file = fileWriter.write(JSON.toJSONString(transformResult, SerializerFeature.PrettyFormat));
- logger.info("download {} pieces of json data to path {}", transformResult.size(), file.getAbsolutePath());
+ logger.info("download json data to path {}", file.getAbsolutePath());
} else if ("csv".equals(fileType)) {
- File file = JobUtil.createCsvFile(transformResult, filePath.concat(".csv"));
- logger.info("download {} pieces of csv data to path {}", transformResult.size(), file.getAbsolutePath());
+ List<Map> transform = (List<Map>) JsonMapper.fromJsonString(JsonMapper.toJsonString(transformResult), List.class);
+ File file = JobUtil.createCsvFile(transform, filePath.concat(".csv"));
+ logger.info("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath());
}
}
/**
* @return
*/
- public void mysqlSink(List<Map> transformResult, Map<String, String> sinkParams) throws Exception {
- exec(transformResult, sinkParams);
- }
-
- public static LinkedHashMap getMapValueForLinkedHashMap(Map dataMap) {
- LinkedHashMap returnMap = new LinkedHashMap();
- if (MapUtil.isEmpty(dataMap)) {
- return returnMap;
- }
- Iterator iterator = dataMap.keySet().iterator();
- while (iterator.hasNext()) {
- Object objKey = iterator.next();
- Object objValue = dataMap.get(objKey);
- if (objValue instanceof Map) {
- returnMap.put(objKey, getMapValueForLinkedHashMap((Map) objValue));
- } else {
- returnMap.put(objKey, objValue);
- }
- }
- return returnMap;
+ public void mysqlSink(Object transformResult, Map<String, String> sinkParams) throws Exception {
+ Map<String, List> transform = (Map<String, List>) JsonMapper.fromJsonString(JsonMapper.toJsonString(transformResult), Map.class);
+ List list = JobUtil.getFirstNotNull(transform);
+ exec(list, sinkParams);
}
/**
@@ -121,7 +119,7 @@ public class DataSinkService {
}
pst.executeBatch();
conn.commit();
- logger.info("花费的时间:" + (System.currentTimeMillis() - start) + "ms");
+ logger.info("花费的时间: {}" + (System.currentTimeMillis() - start) + "ms");
} catch (SQLException e) {
logger.error(e.getMessage());
throw e;
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java
index 7ddf60c..8909fee 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSourceService.java
@@ -3,19 +3,27 @@ package com.mesalab.executor.service;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.io.StreamProgress;
import cn.hutool.core.io.file.FileReader;
-import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.DbUtil;
import cn.hutool.db.Entity;
import cn.hutool.db.handler.EntityListHandler;
import cn.hutool.db.sql.SqlExecutor;
+import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.Log;
import cn.hutool.poi.excel.ExcelReader;
import cn.hutool.poi.excel.ExcelUtil;
import com.mesalab.executor.core.utils.DBUtils;
+import com.mesalab.executor.core.utils.HttpClientUtils;
+import com.mesalab.executor.exception.BusinessException;
import com.mesalab.executor.pojo.DBParam;
+import com.mesalab.executor.pojo.HttpParam;
+import com.xxl.job.core.handler.IJobHandler;
+import com.zdjizhi.utils.JsonMapper;
+import io.netty.handler.codec.http.HttpMethod;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.springframework.stereotype.Service;
@@ -37,26 +45,107 @@ public class DataSourceService {
private Log logger = Log.get();
- public Map<String, List> adapt(Map<String, String> sourceParams) throws Exception {
+ public Map<String,Object> adapt(Map<String, String> sourceParams) throws Exception {
String type = sourceParams.get("type");
//1. 文件类型 2. 选择处理工具 3. 转为格式数据
- if ("file".equals(type)) {
- return fileSource(sourceParams);
- } else if ("mysql".equals(type) || "mariadb".equals(type)) {
- return MapUtil.of("mysql", mysqlSource(sourceParams));
+ Map<String, Object> sourceResult = new HashMap<>();
+ switch (type) {
+ case "file":
+ sourceResult = fileSource(sourceParams);
+ break;
+ case "mysql":
+ sourceResult.put("mysql", mysqlSource(sourceParams));
+ break;
+ case "mariadb":
+ sourceResult.put("mysql", mysqlSource(sourceParams));
+ break;
+ case "spider":
+ sourceResult.put("spider", spider(sourceParams));
+ break;
+ case "http":
+ sourceResult.put("http", http(sourceParams));
+ break;
+ default:
+ break;
+ }
+ return sourceResult;
+ }
+
+ /**
+ * 网址数据源
+ *
+ * @param sourceParams url,method,requestBody,resultKey,tableName
+ * @return
+ */
+ private Map http(Map<String,String> sourceParams) throws Exception {
+
+ String url = String.valueOf(sourceParams.get("url"));
+ String method = String.valueOf(sourceParams.get("method"));
+ Map requestBody = JSONUtil.toBean(JsonMapper.toJsonString(sourceParams.get("requestBody")), Map.class);
+
+ String resultMessage = "";
+ if (HttpMethod.POST.name().equalsIgnoreCase(method)) {
+ resultMessage = HttpClientUtils.httpPost(url, JsonMapper.toJsonString(requestBody));
+ } else if (HttpMethod.GET.name().equalsIgnoreCase(method)) {
+ resultMessage = HttpClientUtils.httpGet(url);
+ } else {
+ throw new BusinessException(IJobHandler.FAIL.getCode(), "Not support Method:" + method);
+ }
+ if (resultMessage.equals("-1")) {
+ throw new BusinessException(IJobHandler.FAIL.getCode(), "Http Client connection Error!");
+ }
+ Map map = (Map) JsonMapper.fromJsonString(resultMessage, Map.class);
+
+ return map;
+ }
+
+ private Map spider(Map<String, String> sourceParams) throws Exception {
+
+ HttpParam httpParam = HttpParam.parserHttpParam(JSONUtil.toJsonStr(sourceParams));
+
+ if (ObjectUtil.isEmpty(httpParam)) {
+ throw new BusinessException("params parser error");
+ }
+ if (HttpMethod.GET.name().equalsIgnoreCase(httpParam.getMethod())) {
+ //带进度显示的文件下载
+ HttpUtil.downloadFile(httpParam.getUrl(), FileUtil.file(httpParam.getStore()), new StreamProgress() {
+ @Override
+ public void start() {
+ logger.info("start download . . . ");
+ }
+
+ @Override
+ public void progress(long progressSize) {
+ logger.info("loading {}", FileUtil.readableFileSize(progressSize));
+ }
+
+ @Override
+ public void finish() {
+ logger.info("download over !");
+ }
+ });
+ } else {
+ throw new BusinessException(IJobHandler.FAIL.getCode(), "Not support Method:" + httpParam.getMethod());
}
return null;
}
private List<Map> mysqlSource(Map<String, String> sourceParams) throws Exception {
DBParam dbParam = BeanUtil.fillBeanWithMap(sourceParams, new DBParam(), false);
- DBUtils dbUtils = new DBUtils();
- Connection conn = dbUtils.getDBConn(sourceParams);
- String sql = dbParam.getSql();
- List<Entity> entityList = SqlExecutor.query(conn, sql, new EntityListHandler());
- logger.info("query result size {}", entityList.size());
- List<Map> collect = entityList.stream().map(x -> BeanUtil.beanToMap(x)).collect(Collectors.toList());
- DbUtil.close(conn);
+ List<Map> collect;
+ Connection conn = null;
+ DBUtils dbUtils = null;
+ try {
+ dbUtils = new DBUtils();
+ conn = dbUtils.getDBConn(sourceParams);
+ String sql = dbParam.getSql();
+ List<Entity> entityList = SqlExecutor.query(conn, sql, new EntityListHandler());
+ logger.info("query result size {}", entityList.size());
+ collect = entityList.stream().map(x -> BeanUtil.beanToMap(x)).collect(Collectors.toList());
+ } finally {
+ DbUtil.close(conn);
+ dbUtils.close();
+ }
return collect;
}
@@ -69,13 +158,13 @@ public class DataSourceService {
*
* @return
*/
- public Map<String, List> fileSource(Map<String, String> params) throws Exception {
+ public Map<String, Object> fileSource(Map<String, String> params) throws Exception {
//解析参数
String path = params.get("path");
String fileTypeParams = params.get("fileType");
String[] fileTypes = StrUtil.split(fileTypeParams, ",");
//处理数据
- Map<String, List> typeResult = new HashMap();
+ Map<String, Object> typeResult = new HashMap();
for (String fileType : fileTypes) {
if ("json".equals(fileType)) {
List<File> files = FileUtil.loopFiles(Paths.get(path), 1, FileFilterUtils.suffixFileFilter(fileType));
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java
index 5d06da1..61598cd 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataTransformService.java
@@ -1,9 +1,9 @@
package com.mesalab.executor.service;
-import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
-import cn.hutool.json.JSONNull;
import cn.hutool.json.JSONUtil;
+import com.mesalab.executor.core.utils.JobUtil;
import org.apache.commons.collections4.ListUtils;
import org.springframework.stereotype.Service;
@@ -29,13 +29,21 @@ public class DataTransformService {
* 业务转换
*
* @param params
- * @return
+ * @return Map<String, List>
*/
- public Map<String, List> adapt(Map<String, String> params, Map<String, List> sourceResult) throws Exception {
- if ("websketch".equals(params.get("type"))) {
- return MapUtil.of("mysql", webSketch(sourceResult));
+ public Map<String, Object> adapt(Map<String, String> params, Object sourceResult) throws Exception {
+ Map<String, Object> transformResult = new HashMap<>();
+ if (ObjectUtil.isEmpty(params.get("type"))) {
+ return transformResult;
+ }
+ switch (params.get("type")) {
+ case "websketch":
+ transformResult.put("websketch", webSketch(sourceResult));
+ break;
+ default:
+ break;
}
- return null;
+ return transformResult;
}
/**
@@ -43,9 +51,10 @@ public class DataTransformService {
* threat_type 字段相同则不入库
* @return
*/
- public List<Map> webSketch(Map<String, List> sourceResult) throws Exception {
- List<Map> json = sourceResult.get("json");
- List<Map> malwareData = sourceResult.get("xlsx");
+ public List<Map> webSketch(Object sourceResult) throws Exception {
+ Map<String, List> source = JSONUtil.toBean(JSONUtil.toJsonStr(sourceResult), Map.class);
+ List<Map> json = source.get("json");
+ List<Map> malwareData = source.get("xlsx");
Map<String, Map<String, List<String>>> malware = getMalware(malwareData);
Map<String, List<String>> soft = malware.get(SOFTWARE);
Map<String, List<String>> associatedGroups = malware.get(ASSOCIATED_GROUPS);
@@ -58,27 +67,33 @@ public class DataTransformService {
final String IP_PORT = "ip:port";// ioc_type改为port
final String CAC = "command and control";// ioc_type改为port
final String PE = "payload delivery";// ioc_type改为port
- final String FIELDS = "ioc_value,ioc_type,threat_type,malware_alias,confidence_level,reference";
+ final String FIELDS = "ioc_type,threat_type,malware_alias,confidence_level,reference";
long now = System.currentTimeMillis() / 1000;
- //处理
+ //处理 ioc=iov_value
List<Map> transformResult = new ArrayList<>();
- Map<Object, Object> objectObjectHashMap = new HashMap<>();
for (Map map : json) {
List<Map> data = (List<Map>) map.get("data");
for (Map datum : data) {
Object ioc_type = datum.get("ioc_type");
if (iocTypeFilter.contains(ioc_type)) {
- Map extMap = extractMap(datum, FIELDS);
+ Map extMap = JobUtil.extractMap(datum, FIELDS);
+ String iocValue = String.valueOf(Optional.ofNullable(datum.get("ioc_value")).orElse(datum.get("ioc")));
+ extMap.put("ioc_value", iocValue);
+
+ Object first = Optional.ofNullable(datum.get("first_seen_utc")).orElse(datum.get("first_seen"));
+ Object last = Optional.ofNullable(datum.get("last_seen_utc")).orElse(datum.get("last_seen"));
+ extMap.put("first_report_time", JobUtil.nullToDef(first, ""));
+ extMap.put("last_seen_time", JobUtil.nullToDef(last, ""));
+
if (IP_PORT.equals(ioc_type)) {
extMap.put("ioc_type", IP);
- String iocValue = String.valueOf(extMap.get("ioc_value"));
extMap.put("ioc_value", iocValue.contains(":") ? iocValue.substring(0, iocValue.lastIndexOf(":")) : iocValue);
}
String threat_type = String.valueOf(datum.get("threat_type"));
if ("botnet_cc".equals(threat_type)) {
extMap.put("threat_type", CAC);
- } else if ("payload".equals(threat_type)) {
+ } else if ("payload".equals(threat_type) || "payload_delivery".equals(threat_type)) {
extMap.put("threat_type", PE);
}
@@ -104,8 +119,7 @@ public class DataTransformService {
extMap.put("source", SOURCE);
extMap.put("malware_name", malwareName);
- extMap.put("first_report_time", nullToDef(datum, "first_seen_utc", ""));
- extMap.put("last_seen_time", nullToDef(datum, "last_seen_utc", ""));
+
extMap.put("create_time", now);
extMap.put("update_time", now);
@@ -116,15 +130,6 @@ public class DataTransformService {
return transformResult;
}
- public Object nullToDef(Map datum, String key, String def) {
- if (datum.get(key) instanceof JSONNull) {
- return def;
- } else if (datum.get(key) instanceof String) {
- return datum.get(key) == null || "null".equals(datum.get(key)) ? def : StrUtil.toString(datum.get(key));
- }
- return datum.get(key);
- }
-
/**
* {key, value}
*
@@ -170,14 +175,5 @@ public class DataTransformService {
return malwareMap;
}
- //1.将null转为""
- public Map extractMap(Map source, String keys) {
- String[] keysArr = StrUtil.split(keys, ",");
- Map m = new HashMap();
- for (String key : keysArr) {
- m.put(key, nullToDef(source, key, ""));
- }
- return m;
- }
}
diff --git a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java
index 01f7827..675ba15 100644
--- a/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java
+++ b/galaxy-job-executor/src/test/java/com/mesalab/executor/test/DataExtractTest.java
@@ -7,6 +7,7 @@ import cn.hutool.core.io.file.FileWriter;
import cn.hutool.core.map.CaseInsensitiveMap;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
+import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONUtil;
import cn.hutool.log.Log;
import cn.hutool.poi.excel.ExcelReader;
@@ -247,5 +248,28 @@ public class DataExtractTest {
logger.info("download file to path {}", file.getAbsolutePath());
}
}
+ @Test
+ public void spider(){
+ HashMap<String, Object> map = new HashMap<>();
+ HttpUtil.downloadFile("https://threatfox-api.abuse.ch/api/v1?query=get_iocs&days=7","D:\\test\\test\\2\\");
+ }
+ @Test
+ public void http(){
+ String type = null;
+ switch(type){
+ case "mariadb":
+ System.err.println(1);
+ break;
+ case "mysql":
+ System.err.println(1);
+ break;
+ case "file":
+ System.err.println(1);
+ break;
+ default:
+ break;
+ }
+ }
+
} \ No newline at end of file