summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorshizhendong <[email protected]>2022-12-27 14:15:43 +0800
committershizhendong <[email protected]>2022-12-27 14:15:43 +0800
commitca050fbf30981c0bb0a1b0a9e52d4d93caa5fdda (patch)
treedcc69faad332d949624b292e7ecea75713e77438
parenta014a8c8a484884fa47b92f99b5e0ee7bc0a373c (diff)
feat: NEZ-2399 terminal record 支持 数据库存储 和 s3存储两种模式
1. terminal record 支持 数据库存储 和 s3存储两种模式 2. 新增 terminal record 过期数据清理JOB 3. 补充 terminal record 过期数据对 sftp log 数据清理
-rw-r--r--nz-admin/src/main/java/com/nis/common/job/JobConfig.java35
-rw-r--r--nz-admin/src/main/java/com/nis/modules/sys/job/CleanJob.java40
-rw-r--r--nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalClient.java2
-rw-r--r--nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalHandler.java2
-rw-r--r--nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalInteractionRunnable.java2
-rw-r--r--nz-admin/src/main/java/com/nis/modules/terminal/entity/TerminalRecord.java12
-rw-r--r--nz-admin/src/main/java/com/nis/modules/terminal/job/CleanExpiredTerminalRecordDataJob.java180
-rw-r--r--nz-admin/src/main/java/com/nis/modules/terminal/service/TerminalRecordService.java17
-rw-r--r--nz-admin/src/main/java/com/nis/modules/terminal/service/impl/TerminalRecordServiceImpl.java255
-rw-r--r--nz-admin/src/main/java/com/nis/modules/terminal/utils/TerminalConstant.java12
-rw-r--r--nz-admin/src/main/resources/db/V2022.12.26__modify_terminal_record_structure.sql11
-rw-r--r--nz-admin/src/main/resources/mapper/terminal/TerminalSessionDao.xml3
12 files changed, 495 insertions, 76 deletions
diff --git a/nz-admin/src/main/java/com/nis/common/job/JobConfig.java b/nz-admin/src/main/java/com/nis/common/job/JobConfig.java
index 40f87728..4e77477e 100644
--- a/nz-admin/src/main/java/com/nis/common/job/JobConfig.java
+++ b/nz-admin/src/main/java/com/nis/common/job/JobConfig.java
@@ -1,6 +1,7 @@
package com.nis.common.job;
import cn.hutool.log.Log;
+import com.nis.common.utils.Tool;
import com.nis.modules.agent.job.AgentHealthCheckJob;
import com.nis.modules.agent.job.AutoConfigAgentJob;
import com.nis.modules.agent.job.CortexHistoryDataDelJob;
@@ -16,7 +17,7 @@ import com.nis.modules.endpoint.job.EndpointStateJob;
import com.nis.modules.sys.job.CleanJob;
import com.nis.modules.sys.job.SessionTimeOutJob;
import com.nis.modules.sys.job.SysComponentJob;
-
+import com.nis.modules.terminal.job.CleanExpiredTerminalRecordDataJob;
import com.nis.modules.terminal.job.CleanNoRealUseTerminalConnJob;
import org.apache.commons.lang.StringUtils;
import org.quartz.*;
@@ -46,13 +47,13 @@ public class JobConfig {
@Value("${nezha.alertSilenceJobCron:0/10 * * * * ?}")
private String alertSilenceJobCron;
-
+
@Value("${nezha.alertMessageExpiredJobCron:0/10 * * * * ?}")
private String alertMessageExpiredJobCron;
-
+
@Value("${nezha.alertRuleManagerJobCron:0/10 * * * * ?}")
private String alertRuleManagerJobCron;
-
+
@Value("${nezha.assetPingJobCron:0 0/1 * * * ? *}")
private String assetPingJobCron;
@@ -88,8 +89,11 @@ public class JobConfig {
@Value("${nezha.cleanNoRealUseTerminalConnJobCron:0 0/5 * * * ? *}")
private String cleanNoRealUseTerminalConnJobCron;
-
- @Autowired
+
+ @Value("${nezha.cleanExpiredTerminalRecordDataJobCron:0 0 1 * * ?}")
+ private String cleanExpiredTerminalRecordDataJobCron;
+
+ @Autowired
private Scheduler scheduler;
// @Qualifier("costomSchedulerFactoryBean")
//private SchedulerFactoryBean schedulerFactoryBean;
@@ -111,7 +115,8 @@ public class JobConfig {
private String alertMessageHistoryPartManJobName = "ALERTMESSAGEHISTORYPARTMAN_JOB";
private String recordtRuleManagerJobName = "RECORDTRULEMANAGER_JOB";
private String cleanNoRealUseTerminalConnJobName = "CLEANNOREALUSETERMINALCONN_JOB";
-
+ private String cleanExpiredTerminalRecordDataJob = "CLEAN_EXPIRED_TERMINAL_RECORD_DATA_JOB";
+
@Bean
public JobDetail agentCronJobDetail() {
return JobBuilder.newJob(AutoConfigAgentJob.class).withIdentity(JOB_NAME + agentJobName).storeDurably().build();
@@ -196,7 +201,12 @@ public class JobConfig {
public JobDetail cleanNoRealUseTerminalConnJobDetail() {
return JobBuilder.newJob(CleanNoRealUseTerminalConnJob.class).withIdentity(JOB_NAME + cleanNoRealUseTerminalConnJobName).storeDurably().build();
}
-
+
+ @Bean
+ public JobDetail cleanExpiredTerminalRecordDataJobDetail() {
+ return JobBuilder.newJob(CleanExpiredTerminalRecordDataJob.class).withIdentity(JOB_NAME + cleanExpiredTerminalRecordDataJob).storeDurably().build();
+ }
+
@PostConstruct
public void init() throws SchedulerException {
// agent 健康检查定时任务
@@ -235,7 +245,7 @@ public class JobConfig {
// terminal clean job 如果配置文件没有配置定时周期 每天凌晨1点执行一次
createCronScheduleJob(cleanJobCronName, cleanJobDetail(), StringUtils.isEmpty(terminalCleanJobCron) ? "0 0 1 * * ?" : terminalCleanJobCron);
-
+
//sys component 状态更新线程
createCronScheduleJob(sysComponentJobName, sysComponentJobDetail(), StringUtils.isEmpty(sysComponentJobCron) ? "0/30 * * * * ?" : sysComponentJobCron);
@@ -244,14 +254,17 @@ public class JobConfig {
// alert message History Partition Management 分区管理
createCronScheduleJob(alertMessageHistoryPartManJobName, alertMessageHistoryPartManJobDetail(), StringUtils.isEmpty(alertMessageHistoryPartManJobCron) ? "0 0 0 1/1 * ?" : alertMessageHistoryPartManJobCron);
-
+
// record rule 配置管理任务
createCronScheduleJob(recordtRuleManagerJobName, recordtRuleManagerJobDetail(), StringUtils.isEmpty(recordtRuleManagerJobCron) ? "0/10 * * * * ?" : recordtRuleManagerJobCron);
// 清理没有真实使用的 terminal ssh/telnet 连接,以超过最后数据交互时间 30 分钟为准 (sys_config.session_timeout)
createCronScheduleJob(cleanNoRealUseTerminalConnJobName, cleanNoRealUseTerminalConnJobDetail(), StringUtils.isEmpty(cleanNoRealUseTerminalConnJobCron) ? "0 0/5 * * * ? *" : cleanNoRealUseTerminalConnJobCron);
+
+ // terminal record expired data clean job
+ createCronScheduleJob(cleanExpiredTerminalRecordDataJob, cleanExpiredTerminalRecordDataJobDetail(), Tool.StrUtil.emptyToDefault(cleanExpiredTerminalRecordDataJobCron, "0 0 1 * * ?"));
}
-
+
/**
* 根据 jobName jobDetail jobCronExpression 创建任务,如果存在则删除再创建
* @param jobName
diff --git a/nz-admin/src/main/java/com/nis/modules/sys/job/CleanJob.java b/nz-admin/src/main/java/com/nis/modules/sys/job/CleanJob.java
index c9a55cf8..bf3d88bb 100644
--- a/nz-admin/src/main/java/com/nis/modules/sys/job/CleanJob.java
+++ b/nz-admin/src/main/java/com/nis/modules/sys/job/CleanJob.java
@@ -12,11 +12,9 @@ import com.nis.modules.alert.dao.AlertMessageHistoryPartitionManagementDao;
import com.nis.modules.alert.entity.AlertRuleEvalLog;
import com.nis.modules.alert.entity.AlertRuleSchedLog;
import com.nis.modules.alert.job.AlertMessageHistoryPartitionManagementJob;
-import com.nis.modules.alert.service.AlertMessageService;
import com.nis.modules.alert.service.AlertRuleEvalLogService;
import com.nis.modules.alert.service.AlertRuleSchedLogService;
import com.nis.modules.sys.service.SysConfigService;
-import com.nis.modules.terminal.service.TerminalSessionService;
import org.apache.commons.lang3.time.StopWatch;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
@@ -29,8 +27,10 @@ import java.util.List;
import java.util.stream.Collectors;
/**
- * 系统清理任务 1. terminal log,默认保留 365天. 2. alert_rule_eval_log,默认保留 30天 3.
- * alert_rule_sched_log,默认保留 30天 4. alert_message_history ,默认不清理
+ * 系统清理任务
+ * 1. alert_rule_eval_log,默认保留 30天
+ * 2. alert_rule_sched_log,默认保留 30天
+ * 3. alert_message_history ,默认不清理
*/
@DisallowConcurrentExecution
public class CleanJob extends QuartzJobBean {
@@ -38,18 +38,12 @@ public class CleanJob extends QuartzJobBean {
private Log log = Log.get();
@Autowired
- private TerminalSessionService terminalSessionService;
-
- @Autowired
private AlertRuleEvalLogService alertRuleEvalLogService;
@Autowired
private AlertRuleSchedLogService alertRuleSchedLogService;
@Autowired
- private AlertMessageService alertMessageService;
-
- @Autowired
private SysConfigService sysConfigService;
@Autowired
@@ -71,19 +65,19 @@ public class CleanJob extends QuartzJobBean {
StopWatch sw = new StopWatch();
// terminal_*
- String terminalRecordLocalRetention = sysConfigService.getValue("terminal_record_local_retention");
- terminalRecordLocalRetention = Tool.StrUtil.emptyToDefault(terminalRecordLocalRetention, "365");
- DateTime startCleanTerminalTime = Tool.DateUtil.offsetDay(now, -Integer.valueOf(terminalRecordLocalRetention));
- sw.start();
- try {
- log.info("start clean terminal data");
- terminalSessionService.cleanExpiredDataByTime(startCleanTerminalTime);
- } catch (Exception e) {
- log.error("clean terminal data error", e);
- } finally {
- sw.stop();
- }
- log.info("clean terminal data Run Time: {}", sw.toString());
+// String terminalRecordLocalRetention = sysConfigService.getValue("terminal_record_local_retention");
+// terminalRecordLocalRetention = Tool.StrUtil.emptyToDefault(terminalRecordLocalRetention, "365");
+// DateTime startCleanTerminalTime = Tool.DateUtil.offsetDay(now, -Integer.valueOf(terminalRecordLocalRetention));
+// sw.start();
+// try {
+// log.info("start clean terminal data");
+// terminalSessionService.cleanExpiredDataByTime(startCleanTerminalTime);
+// } catch (Exception e) {
+// log.error("clean terminal data error", e);
+// } finally {
+// sw.stop();
+// }
+// log.info("clean terminal data Run Time: {}", sw.toString());
// alert_rule_eval_log
String ruleEvalLogLocalRetention = sysConfigService.getValue("alert_rule_eval_log_local_retention");
diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalClient.java b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalClient.java
index 34cc7f11..b293c46d 100644
--- a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalClient.java
+++ b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalClient.java
@@ -201,7 +201,7 @@ public class TerminalClient {
long terminalSessionStartTime = (long) TerminalSession.getTerminalAttributeInfo(clientId).get("terminalSessionStartTime");
terminalRecord.setEndTime((int) (System.currentTimeMillis() - terminalSessionStartTime));
terminalRecord.setContent(Files.readAllBytes(tempFile.toPath()));
- terminalRecordService.save(terminalRecord);
+ terminalRecordService.saveTerminalRecordContent(terminalRecord);
} catch (Exception e) {
log.error(e, "Save terminal record content error. clientid: {}, file length: {}, File info: {}", clientId, tempFile.length(), tempFile.toString());
} finally {
diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalHandler.java b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalHandler.java
index 585efacf..1d2af4e2 100644
--- a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalHandler.java
+++ b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalHandler.java
@@ -363,7 +363,7 @@ public class TerminalHandler extends TextWebSocketHandler {
long terminalSessionStartTime = (long) TerminalSession.getTerminalAttributeInfo(uuid).get("terminalSessionStartTime");
terminalRecord.setEndTime((int) (System.currentTimeMillis() - terminalSessionStartTime));
terminalRecord.setContent(Files.readAllBytes(tempFile.toPath()));
- terminalRecordService.save(terminalRecord);
+ terminalRecordService.saveTerminalRecordContent(terminalRecord);
} catch (Exception e) {
log.error(e, "Save terminal record content error. clientid: {}, file length: {}, File info: {}", uuid, tempFile.length(), tempFile.toString());
} finally {
diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalInteractionRunnable.java b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalInteractionRunnable.java
index 5cb81c13..75bbeb7d 100644
--- a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalInteractionRunnable.java
+++ b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalInteractionRunnable.java
@@ -123,7 +123,7 @@ public class TerminalInteractionRunnable extends Thread {
}
long terminalSessionStartTime = (long) TerminalSession.getTerminalAttributeInfo(clientId).get("terminalSessionStartTime");
terminalRecord.setEndTime((int) (System.currentTimeMillis() - terminalSessionStartTime));
- terminalRecordService.save(terminalRecord);
+ terminalRecordService.saveTerminalRecordContent(terminalRecord);
} catch (Exception e) {
log.error(e, "Terminal connection has been closed, Terminal Record file content record failed. clientid: {}, File info: {}", clientId, tempFile.toString());
} finally {
diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/entity/TerminalRecord.java b/nz-admin/src/main/java/com/nis/modules/terminal/entity/TerminalRecord.java
index 8af8dc88..d8728e2e 100644
--- a/nz-admin/src/main/java/com/nis/modules/terminal/entity/TerminalRecord.java
+++ b/nz-admin/src/main/java/com/nis/modules/terminal/entity/TerminalRecord.java
@@ -35,9 +35,19 @@ public class TerminalRecord {
private Integer endTime;
/**
+ * 存储模式,1: 本地存储,2:S3对象存储
+ */
+ private Integer storageType;
+
+ /**
+ * S3 path
+ */
+ private String path;
+
+ /**
* 文件内容
* 记录的是 session开始后的时间偏移量
* 超过定义临时文件大小后,再新增一条记录
*/
private byte[] content;
-} \ No newline at end of file
+}
diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/job/CleanExpiredTerminalRecordDataJob.java b/nz-admin/src/main/java/com/nis/modules/terminal/job/CleanExpiredTerminalRecordDataJob.java
new file mode 100644
index 00000000..8da2113e
--- /dev/null
+++ b/nz-admin/src/main/java/com/nis/modules/terminal/job/CleanExpiredTerminalRecordDataJob.java
@@ -0,0 +1,180 @@
+package com.nis.modules.terminal.job;
+
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.date.StopWatch;
+import cn.hutool.core.util.BooleanUtil;
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import com.amazonaws.SDKGlobalConfiguration;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
+import com.nis.common.utils.Constant;
+import com.nis.common.utils.Tool;
+import com.nis.modules.sys.service.SysConfService;
+import com.nis.modules.terminal.service.TerminalRecordService;
+import com.nis.modules.terminal.service.TerminalSessionService;
+import com.nis.modules.terminal.utils.TerminalConstant;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Arrays;
+import java.util.List;
+
+@DisallowConcurrentExecution
+public class CleanExpiredTerminalRecordDataJob extends QuartzJobBean {
+
+ private static final Log log = Log.get();
+
+ static {
+ System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+ }
+
+ @Autowired
+ private RedisTemplate redisTemplate;
+
+ @Autowired
+ private SysConfService sysConfService;
+
+ @Autowired
+ private TerminalSessionService terminalSessionService;
+
+ @Autowired
+ private TerminalRecordService terminalRecordService;
+
+ @Override
+ protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+ Thread.currentThread().setName("CleanExpiredTerminalRecordDataJob");
+
+ // 检查 haMode
+ String serverId = (String) redisTemplate.opsForValue().get(Constant.SYS_HA_LOCK);
+ log.info("[CleanExpiredTerminalRecordDataJob] [Get ha lock from redis] [server id: {}]", serverId);
+ if (!StrUtil.equals(serverId, Constant.SERVER_ID)) {
+ log.warn("[CleanExpiredTerminalRecordDataJob] [did not get the ha lock, not run job] [redis server id: {}] [system server id: {}]", serverId, Constant.SERVER_ID);
+ return;
+ }
+
+ log.info("[CleanExpiredTerminalRecordDataJob] [start cleaning expired data]");
+
+ StopWatch sw = StopWatch.create("CleanExpiredTerminalRecordDataJob");
+ sw.start();
+
+ try {
+ // clean Expired Terminal Record Data
+ this.cleanExpiredTerminalRecordData();
+ } catch (Exception e) {
+ log.error(e, "[CleanExpiredTerminalRecordDataJob] [failed to clean data]");
+ } finally {
+ sw.stop();
+ }
+
+ log.info("[CleanExpiredTerminalRecordDataJob] [clean data complete] [Run Time: {}]", sw.prettyPrint());
+ }
+
+ @Transactional(rollbackFor = Exception.class)
+ public void cleanExpiredTerminalRecordData() {
+ // 存储模式,1: 本地存储,2:S3对象存储
+ String terminalStorageType = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_TYPE);
+ terminalStorageType = StrUtil.emptyToDefault(terminalStorageType, "1");
+ log.info("[cleanExpiredTerminalRecordData] [record storage type] [value: {}]", terminalStorageType);
+
+ DateTime now = Tool.DateUtil.date();
+
+ // 数据存储天数,默认 365 天
+ String terminalRecordLocalRetention = sysConfService.getValue(TerminalConstant.TERMINAL_RECORD_LOCAL_RETENTION);
+ terminalRecordLocalRetention = Tool.StrUtil.emptyToDefault(terminalRecordLocalRetention, "365");
+
+ // 过期时间
+ DateTime expireDate = Tool.DateUtil.offsetDay(now, -Integer.valueOf(terminalRecordLocalRetention));
+ log.info("[cleanExpiredTerminalRecordData] [clear expired record date in db] [now: {}] [retention days: {}] [expire date: {}]", now, terminalRecordLocalRetention, expireDate);
+ terminalSessionService.cleanExpiredDataByTime(expireDate);
+ log.info("[cleanExpiredTerminalRecordData] [clear expired record date in db finshed]");
+
+ // 如果是 S3 存储内容信息,则配置存储桶过期策略
+ if (StrUtil.equals("2", terminalStorageType)) {
+ AmazonS3 s3Client = terminalRecordService.buildTerminalStorageS3Client();
+ try {
+ String bucketName = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_BUCKET);
+ log.info("[cleanExpiredTerminalRecordData] [config terminal record bucket lifecycle] [bucketName: {}] [retention days: {}]", bucketName, terminalRecordLocalRetention);
+
+ if (s3Client.doesBucketExistV2(bucketName)) {
+ log.info("[cleanExpiredTerminalRecordData] [bucket exist] [bucketName: {}]", bucketName);
+
+ Integer retentionDay = Integer.valueOf(terminalRecordLocalRetention);
+ String ruleId = "terminalRecordExpirationRule";
+
+ // Retrieve the configuration.
+ // Verify that the configuration now has one rules. prefix: fake/ expirationInDays: sysconfig.metricsStorageRetentionDay
+ BucketLifecycleConfiguration bucketLifecycleConfiguration = s3Client.getBucketLifecycleConfiguration(bucketName);
+ if (ObjectUtil.isNull(bucketLifecycleConfiguration)) {
+ log.info("[cleanExpiredTerminalRecordData] [bucket has empty lifecycle configuration] [bucketName: {}]", bucketName);
+ this.setBucketLifecycleConfiguration(s3Client, bucketName, ruleId, retentionDay);
+ } else {
+ List<BucketLifecycleConfiguration.Rule> bucketLifecycleConfigurationRules = bucketLifecycleConfiguration.getRules();
+ bucketLifecycleConfigurationRules = Tool.CollUtil.defaultIfEmpty(bucketLifecycleConfigurationRules, Tool.ListUtil.empty());
+
+ BucketLifecycleConfiguration.Rule cortexExpirationRule = bucketLifecycleConfigurationRules.stream()
+ .filter(rule -> StrUtil.equals(rule.getId(), ruleId))
+ .findFirst()
+ .orElse(null);
+
+ if (ObjectUtil.isNull(cortexExpirationRule)) {
+ log.info("[cleanExpiredTerminalRecordData] [bucket has empty lifecycle configuration] [bucketName: {}]", bucketName);
+ this.setBucketLifecycleConfiguration(s3Client, bucketName, ruleId, retentionDay);
+ } else {
+ // 当前存在过期策略,判断规则是否发生变化
+ boolean isUpdate = BooleanUtil.or(
+ ObjectUtil.notEqual(BucketLifecycleConfiguration.ENABLED, cortexExpirationRule.getStatus()),
+ ObjectUtil.notEqual(retentionDay, cortexExpirationRule.getExpirationInDays())
+ );
+ log.info("[cleanExpiredTerminalRecordData] [bucket lifecycle configuration] [bucketName: {}] [updated: {}]", bucketName, isUpdate);
+ if (isUpdate) {
+ this.setBucketLifecycleConfiguration(s3Client, bucketName, ruleId, retentionDay);
+ }
+ }
+ }
+
+ // query the existing lifecycle configuration
+ bucketLifecycleConfiguration = s3Client.getBucketLifecycleConfiguration(bucketName);
+ List<BucketLifecycleConfiguration.Rule> rules1 = bucketLifecycleConfiguration.getRules();
+ for (BucketLifecycleConfiguration.Rule l : rules1) {
+ log.info("[cleanExpiredTerminalRecordData] [current lifecycle config info] [bucketName: {}] [status: {}] [expiration days: {}]", bucketName, l.getStatus(), l.getExpirationInDays());
+ }
+ } else {
+ log.warn("[cleanExpiredTerminalRecordData] [bucket does not exist] [bucketName: {}]", bucketName);
+ }
+ } catch (Exception e) {
+ log.error(e, "[cleanExpiredTerminalRecordData] [error]");
+ } finally {
+ s3Client.shutdown();
+ }
+ }
+ }
+
+ /**
+ * set Bucket Lifecycle Configuration
+ *
+ * @param s3Client
+ * @param bucketName
+ * @param ruleId
+ * @param retentionDay
+ */
+ private void setBucketLifecycleConfiguration(AmazonS3 s3Client, String bucketName, String ruleId, Integer retentionDay) {
+ log.info("[setBucketLifecycleConfiguration] [bucketName: {}] [ruleId: {}] [retentionDay: {}]", bucketName, ruleId, retentionDay);
+
+ BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule()
+ .withId(ruleId)
+ .withExpirationInDays(retentionDay)
+ .withStatus(BucketLifecycleConfiguration.ENABLED);
+
+ BucketLifecycleConfiguration configuration = new BucketLifecycleConfiguration().withRules(Arrays.asList(rule));
+ // Save the configuration.
+ s3Client.setBucketLifecycleConfiguration(bucketName, configuration);
+ }
+
+}
diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/service/TerminalRecordService.java b/nz-admin/src/main/java/com/nis/modules/terminal/service/TerminalRecordService.java
index 676e0b6e..7c5088bf 100644
--- a/nz-admin/src/main/java/com/nis/modules/terminal/service/TerminalRecordService.java
+++ b/nz-admin/src/main/java/com/nis/modules/terminal/service/TerminalRecordService.java
@@ -1,5 +1,6 @@
package com.nis.modules.terminal.service;
+import com.amazonaws.services.s3.AmazonS3;
import com.baomidou.mybatisplus.extension.service.IService;
import com.nis.modules.terminal.entity.TerminalRecord;
@@ -8,4 +9,18 @@ import java.util.Map;
public interface TerminalRecordService extends IService<TerminalRecord> {
Map queryTerminalRecordInfos(Map<String, Object> params);
-} \ No newline at end of file
+
+ /**
+ * save Terminal Record Content
+ *
+ * @param record
+ */
+ void saveTerminalRecordContent(TerminalRecord record);
+
+ /**
+ * build Terminal Storage S3 Client
+ *
+ * @return
+ */
+ AmazonS3 buildTerminalStorageS3Client();
+}
diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/service/impl/TerminalRecordServiceImpl.java b/nz-admin/src/main/java/com/nis/modules/terminal/service/impl/TerminalRecordServiceImpl.java
index 492b82f9..a8062af8 100644
--- a/nz-admin/src/main/java/com/nis/modules/terminal/service/impl/TerminalRecordServiceImpl.java
+++ b/nz-admin/src/main/java/com/nis/modules/terminal/service/impl/TerminalRecordServiceImpl.java
@@ -1,30 +1,53 @@
package com.nis.modules.terminal.service.impl;
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.CharsetUtil;
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONPath;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ObjectMetadata;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.nis.common.exception.NZException;
import com.nis.common.smartvalidate.ValidateUtils;
+import com.nis.common.utils.JSONUtil;
import com.nis.common.utils.RCode;
import com.nis.common.utils.StringUtils;
+import com.nis.common.utils.Tool;
+import com.nis.modules.sys.service.SysConfService;
import com.nis.modules.terminal.dao.TerminalRecordDao;
import com.nis.modules.terminal.entity.TerminalRecord;
import com.nis.modules.terminal.service.TerminalRecordService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
+import com.nis.modules.terminal.utils.TerminalConstant;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.io.ByteArrayInputStream;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-@Slf4j
@Service("terminalRecordService")
public class TerminalRecordServiceImpl extends ServiceImpl<TerminalRecordDao, TerminalRecord> implements TerminalRecordService {
+ private static final Log log = Log.get();
+
+ @Autowired
+ private SysConfService sysConfService;
+
@Override
public Map queryTerminalRecordInfos(Map<String, Object> params) {
// uuid 不能为空
@@ -37,49 +60,209 @@ public class TerminalRecordServiceImpl extends ServiceImpl<TerminalRecordDao, Te
throw new NZException(RCode.TERMINAL_QUERYSIZE_INVALIDED);
}
- List<TerminalRecord> list = this.list(new LambdaQueryWrapper<TerminalRecord>()
+ Map<Object, Object> resultMap = Tool.MapUtil.builder().map();
+ resultMap.put("uuid", params.get("uuid"));
+ resultMap.put("startTime", time);
+ resultMap.put("endTime", null);
+ resultMap.put("hasNext", 0);
+ resultMap.put("list", Collections.emptyList());
+
+ // content Str List of uuid
+ List<String> contentStrListOfUuid = Tool.ListUtil.list(true);
+
+ // record List In Db
+ List<TerminalRecord> recordListInDb = this.list(new LambdaQueryWrapper<TerminalRecord>()
.eq(TerminalRecord::getUuid, params.get("uuid"))
.orderByAsc(TerminalRecord::getStartTime));
- Map map = new HashMap(8);
- map.put("uuid", params.get("uuid"));
+ log.info("[queryTerminalRecordInfos] [record data in db] [uuid: {}] [size: {}]", params.get("uuid"), recordListInDb.size());
- if (CollectionUtils.isNotEmpty(list)) {
- int hashNext = 0;
+ if (Tool.CollUtil.isNotEmpty(recordListInDb)) {
+ // storage type
+ Integer recordStorageType = recordListInDb.stream().findFirst().get().getStorageType();
+ if (ObjectUtil.equals(1, recordStorageType)) {
+ log.info("[queryTerminalRecordInfos] [record data source is db] [uuid: {}] [size: {}]", params.get("uuid"), recordListInDb.size());
+ // db 本地存储
+ contentStrListOfUuid = recordListInDb.stream().map(record -> StrUtil.str(record.getContent(), CharsetUtil.charset("UTF-8"))).collect(Collectors.toList());
+ } else if (ObjectUtil.equals(2, recordStorageType)) {
+ log.info("[queryTerminalRecordInfos] [record data source is s3] [uuid: {}] [size in db: {}]", params.get("uuid"), recordListInDb.size());
+ // S3 对象存储
+ AmazonS3 s3Client = this.buildTerminalStorageS3Client();
+ try {
+ String bucketName = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_BUCKET);
- StringBuilder builder = new StringBuilder();
- builder.append("[");
- for (TerminalRecord record : list) {
- builder.append(new String(record.getContent(), CharsetUtil.charset("UTF-8")));
- }
- String str = builder.substring(0, builder.length() - 1);
- List<Map> maps = JSONArray.parseArray(str += "]", Map.class);
- maps = maps.stream().filter(tempEntity -> {
- Integer t = (Integer) tempEntity.get("t");
- if (t > time) {
- return true;
+ /*
+ // query objects in s3
+ ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+ listObjectsV2Request
+ .withBucketName(bucketName)
+ .withMaxKeys(1000)
+ .withPrefix(StrUtil.toString(params.get("uuid")));
+
+ ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(listObjectsV2Request);
+ List<S3ObjectSummary> S3ObjectSummarylist = listObjectsV2Result.getObjectSummaries();
+ // 再次排序
+ List<String> fileNameList = S3ObjectSummarylist.stream().map(S3ObjectSummary::getKey)
+ .sorted(Comparator.comparing(str -> Integer.valueOf(Tool.StrUtil.subAfter(str, "/", true)))).collect(Collectors.toList());*/
+
+ List<String> fileNameList = recordListInDb.stream().map(TerminalRecord::getPath)
+ .sorted(Comparator.comparing(str -> Integer.valueOf(Tool.StrUtil.subAfter(str, "/", true)))).collect(Collectors.toList());
+ for (String fileName : fileNameList) {
+ String objectAsString = s3Client.getObjectAsString(bucketName, fileName);
+ contentStrListOfUuid.add(objectAsString);
+ }
+ log.info("[queryTerminalRecordInfos] [record data source is s3] [uuid: {}] [size in db: {}] [size in s3: {}]", params.get("uuid"), fileNameList.size(), contentStrListOfUuid.size());
+ } finally {
+ s3Client.shutdown();
}
- return false;
- }).collect(Collectors.toList());
+ }
+ }
+ if (Tool.CollUtil.isNotEmpty(contentStrListOfUuid)) {
+ // get List Of Content Obj Older Than Time Param
+ List<Map> contentObjList = this.getListOfContentObjOlderThanTimeParam(contentStrListOfUuid, time);
// size 为 1 表示不限制条数
- if (!size.equals(1) && maps.size() > size) {
- maps = maps.subList(0, size);
- hashNext = 1;
+ if (BooleanUtil.and(ObjectUtil.notEqual(size, 1),
+ contentObjList.size() > size)) {
+ contentObjList = contentObjList.subList(0, size);
+ resultMap.put("hasNext", 1);
+ }
+
+ Map endEntity = contentObjList.get(contentObjList.size() - 1);
+ resultMap.put("endTime", endEntity.get("t"));
+ resultMap.put("list", contentObjList);
+ }
+ return resultMap;
+ }
+
+ /**
+ * get List Of Content Obj Older Than Time Param
+ *
+ * @param contentList
+ * @param time
+ * @return
+ */
+ private List<Map> getListOfContentObjOlderThanTimeParam(List<String> contentList, Integer time) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+
+ for (String content : contentList) {
+ builder.append(content);
+ }
+
+ String str = builder.substring(0, builder.length() - 1);
+
+ List<Map> maps = JSONArray.parseArray(str += "]", Map.class);
+ maps = maps.stream().filter(tempEntity -> {
+ Integer t = (Integer) tempEntity.get("t");
+ if (t > time) {
+ return true;
+ }
+ return false;
+ }).collect(Collectors.toList());
+
+ return maps;
+ }
+
+ @Override
+ public void saveTerminalRecordContent(TerminalRecord record) {
+ // 存储模式,1: 本地存储,2:S3对象存储
+ String terminalStorageType = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_TYPE);
+ terminalStorageType = StrUtil.emptyToDefault(terminalStorageType, "1");
+ log.info("[saveTerminalRecordContent] [storage type] [value: {}] [uuid: {}]", terminalStorageType, record.getUuid());
+
+ // storageType
+ record.setStorageType(Integer.valueOf(terminalStorageType));
+
+ switch (terminalStorageType) {
+ case "1": {
+ // 本地存储
+ this.save(record);
+ log.info("[saveTerminalRecordContent] [local storage record succeeded] [uuid: {}]", record.getUuid());
+ break;
}
+ case "2": {
+ // 保存记录至 db,不包含 content 内容
+ TerminalRecord saveRecordEntity = new TerminalRecord();
+ BeanUtil.copyProperties(record, saveRecordEntity, "content");
+
+
+ String uuid = record.getUuid();
+ Integer startTime = record.getStartTime();
+
+ // 以 uuid 为文件路径,startTime 为文件名称
+ String key = StrUtil.concat(true, uuid, "/", startTime.toString());
+
+ saveRecordEntity.setPath(key);
+ this.save(saveRecordEntity);
+
+ // S3 对象存储
+ AmazonS3 s3Client = this.buildTerminalStorageS3Client();
+
+ String bucketName = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_BUCKET);
+ boolean doesBucketExist = s3Client.doesBucketExistV2(bucketName);
+ log.info("[saveTerminalRecordContent] [bucketName: {}] [doesBucketExist: {}]", bucketName, doesBucketExist);
+
+ if (!doesBucketExist) {
+ log.info("[saveTerminalRecordContent] [create bucket] [name: {}]", bucketName);
+ s3Client.createBucket(bucketName);
+ }
- Map endEntity = maps.get(maps.size() - 1);
- map.put("endTime", endEntity.get("t"));
- map.put("hasNext", hashNext);
+ byte[] content = record.getContent();
+ ByteArrayInputStream input = null;
+ try {
+ ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setContentType("application/json");
+ metadata.setContentLength(content.length);
+ metadata.setHeader("x-hos-meta-message", "message");
- map.put("list", maps);
- } else {
- map.put("endTime", null);
- map.put("hasNext", 0);
- map.put("list", Collections.emptyList());
+ input = new ByteArrayInputStream(content);
+
+ // 以流方式上传对象,指定桶名、对象名、输入流、ObjectMetadata
+ s3Client.putObject(bucketName, key, input, metadata);
+
+ log.info("[saveTerminalRecordContent] [remote storage record succeeded] [key: {}]", key);
+ } catch (Exception e) {
+ log.error(e, "[saveTerminalRecordContent] [upload record content to s3 error] [uuid: {}] [start: {}] [size: {}]", uuid, startTime, content.length);
+ } finally {
+ Tool.IoUtil.close(input);
+ // 关闭 AmazonS3Client
+ s3Client.shutdown();
+ }
+ break;
+ }
+ default:
+ break;
}
- map.put("startTime", time);
- return map;
}
-} \ No newline at end of file
+
+ /**
+ * build Terminal Storage S3 Client
+ *
+ * @return
+ */
+ @Override
+ public AmazonS3 buildTerminalStorageS3Client() {
+ log.info("[buildTerminalStorageS3Client] [build Amazon S3 Client begin...]");
+
+ String s3Endpoint = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_ENDPOINT);
+ String accessKey = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_ACCESS_KEY);
+ String secretKey = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_SECRET_ACCESS_KEY);
+
+ AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
+ ClientConfiguration clientConfig = new ClientConfiguration();
+ clientConfig.setProtocol(Protocol.HTTP);
+
+ AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
+ .withCredentials(new AWSStaticCredentialsProvider(credentials))
+ .withClientConfiguration(clientConfig)
+ .withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(s3Endpoint, null)
+ )
+ .build();
+
+ log.info("[buildTerminalStorageS3Client] [build Amazon S3 Client finshed] [ClientInfo: {}]", JSONPath.read(JSONUtil.toJsonStr(s3Client), "endpoint"));
+ return s3Client;
+ }
+}
diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/utils/TerminalConstant.java b/nz-admin/src/main/java/com/nis/modules/terminal/utils/TerminalConstant.java
index d29b9b36..2937ed18 100644
--- a/nz-admin/src/main/java/com/nis/modules/terminal/utils/TerminalConstant.java
+++ b/nz-admin/src/main/java/com/nis/modules/terminal/utils/TerminalConstant.java
@@ -15,6 +15,18 @@ public class TerminalConstant {
*/
public static final Integer RECORD_TEMPFILE_MAX_LENGTH = 10485760;
+
+ // 存储模式,1: 本地存储,2:S3对象存储
+ public static final String TERMINAL_STORAGE_TYPE = "terminal_storage_type";
+
+ public static final String TERMINAL_STORAGE_S3_ENDPOINT = "terminal_storage_s3_endpoint";
+ public static final String TERMINAL_STORAGE_S3_BUCKET = "terminal_storage_s3_bucket";
+ public static final String TERMINAL_STORAGE_S3_ACCESS_KEY = "terminal_storage_s3_access_key";
+ public static final String TERMINAL_STORAGE_S3_SECRET_ACCESS_KEY = "terminal_storage_s3_secret_access_key";
+
+ // 存储天数
+ public static final String TERMINAL_RECORD_LOCAL_RETENTION = "terminal_record_local_retention";
+
/**
* 不可见字符
*/
diff --git a/nz-admin/src/main/resources/db/V2022.12.26__modify_terminal_record_structure.sql b/nz-admin/src/main/resources/db/V2022.12.26__modify_terminal_record_structure.sql
new file mode 100644
index 00000000..4bef91a4
--- /dev/null
+++ b/nz-admin/src/main/resources/db/V2022.12.26__modify_terminal_record_structure.sql
@@ -0,0 +1,11 @@
+
+/**
+ 1. terminal_record add column storage_type & path
+ 2. terminal_record modify column content
+ */
+
+ALTER TABLE `terminal_record`
+ADD COLUMN IF NOT EXISTS `storage_type` int(1) NOT NULL DEFAULT 1 COMMENT '存储模式,1: 本地存储,2:S3对象存储',
+ADD COLUMN IF NOT EXISTS `path` varchar(4096) DEFAULT NULL COMMENT 'S3对象存储时有效',
+MODIFY COLUMN `content` MEDIUMBLOB NULL COMMENT '记录的是 session开始后的时间偏移量\r\n\r\n最大储存 65K,超过部分再新增一条记录';
+
diff --git a/nz-admin/src/main/resources/mapper/terminal/TerminalSessionDao.xml b/nz-admin/src/main/resources/mapper/terminal/TerminalSessionDao.xml
index 77983210..b371151c 100644
--- a/nz-admin/src/main/resources/mapper/terminal/TerminalSessionDao.xml
+++ b/nz-admin/src/main/resources/mapper/terminal/TerminalSessionDao.xml
@@ -72,10 +72,11 @@
WHERE
ts.start_time &lt;= #{startCleanTerminalTime};
- DELETE tr.*,ts.*
+ DELETE tr.*,ts.*,tsl.*
FROM
terminal_session ts
LEFT JOIN terminal_record tr ON ts.uuid = tr.uuid
+ LEFT JOIN terminal_sftp_log tsl on ts.uuid = tsl.uuid
WHERE
ts.start_time &lt;= #{startCleanTerminalTime};
</delete>