summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-04-26 17:19:34 +0800
committerzhanghongqing <[email protected]>2022-04-26 17:19:34 +0800
commite19d2ce575220634b32444da9739d309965ad442 (patch)
tree29ba499291ed55733f1e0badb978a904e36ede07
parent5820b3b2d4293b5a403fecab66654f4c507f8a80 (diff)
websketch 文件导出支持同时输出json,csv文件,数据入库改为手动控制事务
-rw-r--r--galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java31
1 files changed, 16 insertions, 15 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
index 57ffa54..80d7f47 100644
--- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
+++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java
@@ -13,12 +13,10 @@ import com.zdjizhi.utils.DateUtils;
import com.zdjizhi.utils.JsonMapper;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;
-import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@@ -57,7 +55,7 @@ public class DataSinkService {
XxlJobLogger.log("file sink data is empty ");
return;
}
- String fileType = sinkParams.get("fileType");
+ String[] fileTypes = StrUtil.split(sinkParams.get("fileType"), ",");
String path = sinkParams.get("path");
String prefix = sinkParams.get("prefix");
String date = "";
@@ -65,17 +63,20 @@ public class DataSinkService {
date = "_".concat(DateUtils.getCurrentDate("yyyyMMdd"));
}
String filePath = StrUtil.concat(false, path, prefix, date);
- if ("json".equals(fileType)) {
- FileWriter fileWriter = new FileWriter(filePath.concat(".json"));
- File file = fileWriter.write(JSON.toJSONString(transformResult, SerializerFeature.PrettyFormat));
- logger.info("download json data to path {}", file.getAbsolutePath());
- XxlJobLogger.log("download json data to path {}", file.getAbsolutePath());
- } else if ("csv".equals(fileType)) {
- List<Map> transform = (List<Map>) JsonMapper.fromJsonString(JsonMapper.toJsonString(transformResult), List.class);
- File file = JobUtil.createCsvFile(transform, filePath.concat(".csv"));
- logger.info("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath());
- XxlJobLogger.log("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath());
+ for (String fileType : fileTypes) {
+ if ("json".equals(fileType)) {
+ FileWriter fileWriter = new FileWriter(filePath.concat(".json"));
+ File file = fileWriter.write(JSON.toJSONString(transformResult, SerializerFeature.PrettyFormat));
+ logger.info("download json data to path {}", file.getAbsolutePath());
+ XxlJobLogger.log("download json data to path {}", file.getAbsolutePath());
+ } else if ("csv".equals(fileType)) {
+ List<Map> transform = (List<Map>) JsonMapper.fromJsonString(JsonMapper.toJsonString(transformResult), List.class);
+ File file = JobUtil.createCsvFile(transform, filePath.concat(".csv"));
+ logger.info("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath());
+ XxlJobLogger.log("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath());
+ }
}
+
}
/**
@@ -97,7 +98,6 @@ public class DataSinkService {
/**
* @throws Exception
*/
- @Transactional(rollbackFor = Exception.class)
public void exec(List<Map> transformResult, Map<String, String> sinkParams) throws Exception {
Connection conn = null;
DBUtils dbUtils = null;
@@ -145,7 +145,8 @@ public class DataSinkService {
conn.commit();
logger.info("{} table {} , count {} , take {}", sinkParams.get("option"), tableName, count, (System.currentTimeMillis() - start) + "ms");
XxlJobLogger.log("{} table {} , count {} , take {}", sinkParams.get("option"), tableName, count, (System.currentTimeMillis() - start) + "ms");
- } catch (SQLException e) {
+ } catch (Exception e) {
+ conn.rollback();
logger.error(e.getMessage());
throw e;
} finally {