diff options
| author | zhanghongqing <[email protected]> | 2022-04-26 17:19:34 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-04-26 17:19:34 +0800 |
| commit | e19d2ce575220634b32444da9739d309965ad442 (patch) | |
| tree | 29ba499291ed55733f1e0badb978a904e36ede07 | |
| parent | 5820b3b2d4293b5a403fecab66654f4c507f8a80 (diff) | |
websketch 文件导出支持同时输出json,csv文件,数据入库改为手动控制事务
| -rw-r--r-- | galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java index 57ffa54..80d7f47 100644 --- a/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java +++ b/galaxy-job-executor/src/main/java/com/mesalab/executor/service/DataSinkService.java @@ -13,12 +13,10 @@ import com.zdjizhi.utils.DateUtils; import com.zdjizhi.utils.JsonMapper; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import java.io.File; import java.sql.Connection; import java.sql.PreparedStatement; -import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -57,7 +55,7 @@ public class DataSinkService { XxlJobLogger.log("file sink data is empty "); return; } - String fileType = sinkParams.get("fileType"); + String[] fileTypes = StrUtil.split(sinkParams.get("fileType"), ","); String path = sinkParams.get("path"); String prefix = sinkParams.get("prefix"); String date = ""; @@ -65,17 +63,20 @@ public class DataSinkService { date = "_".concat(DateUtils.getCurrentDate("yyyyMMdd")); } String filePath = StrUtil.concat(false, path, prefix, date); - if ("json".equals(fileType)) { - FileWriter fileWriter = new FileWriter(filePath.concat(".json")); - File file = fileWriter.write(JSON.toJSONString(transformResult, SerializerFeature.PrettyFormat)); - logger.info("download json data to path {}", file.getAbsolutePath()); - XxlJobLogger.log("download json data to path {}", file.getAbsolutePath()); - } else if ("csv".equals(fileType)) { - List<Map> transform = (List<Map>) JsonMapper.fromJsonString(JsonMapper.toJsonString(transformResult), List.class); - File file = JobUtil.createCsvFile(transform, filePath.concat(".csv")); - logger.info("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath()); - XxlJobLogger.log("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath()); + for (String fileType : fileTypes) { + if ("json".equals(fileType)) { + FileWriter fileWriter = new FileWriter(filePath.concat(".json")); + File file = fileWriter.write(JSON.toJSONString(transformResult, SerializerFeature.PrettyFormat)); + logger.info("download json data to path {}", file.getAbsolutePath()); + XxlJobLogger.log("download json data to path {}", file.getAbsolutePath()); + } else if ("csv".equals(fileType)) { + List<Map> transform = (List<Map>) JsonMapper.fromJsonString(JsonMapper.toJsonString(transformResult), List.class); + File file = JobUtil.createCsvFile(transform, filePath.concat(".csv")); + logger.info("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath()); + XxlJobLogger.log("download {} pieces of csv data to path {}", transform.size(), file.getAbsolutePath()); + } } + } /** @@ -97,7 +98,6 @@ public class DataSinkService { /** * @throws Exception */ - @Transactional(rollbackFor = Exception.class) public void exec(List<Map> transformResult, Map<String, String> sinkParams) throws Exception { Connection conn = null; DBUtils dbUtils = null; @@ -145,7 +145,8 @@ public class DataSinkService { conn.commit(); logger.info("{} table {} , count {} , take {}", sinkParams.get("option"), tableName, count, (System.currentTimeMillis() - start) + "ms"); XxlJobLogger.log("{} table {} , count {} , take {}", sinkParams.get("option"), tableName, count, (System.currentTimeMillis() - start) + "ms"); - } catch (SQLException e) { + } catch (Exception e) { + conn.rollback(); logger.error(e.getMessage()); throw e; } finally { |
