diff options
| author | shizhendong <[email protected]> | 2022-12-27 14:15:43 +0800 |
|---|---|---|
| committer | shizhendong <[email protected]> | 2022-12-27 14:15:43 +0800 |
| commit | ca050fbf30981c0bb0a1b0a9e52d4d93caa5fdda (patch) | |
| tree | dcc69faad332d949624b292e7ecea75713e77438 | |
| parent | a014a8c8a484884fa47b92f99b5e0ee7bc0a373c (diff) | |
feat: NEZ-2399 terminal record 支持 数据库存储 和 s3存储两种模式
1. terminal record 支持 数据库存储 和 s3存储两种模式
2. 新增 terminal record 过期数据清理JOB
3. 补充 terminal record 过期数据对 sftp log 数据清理
12 files changed, 495 insertions, 76 deletions
diff --git a/nz-admin/src/main/java/com/nis/common/job/JobConfig.java b/nz-admin/src/main/java/com/nis/common/job/JobConfig.java index 40f87728..4e77477e 100644 --- a/nz-admin/src/main/java/com/nis/common/job/JobConfig.java +++ b/nz-admin/src/main/java/com/nis/common/job/JobConfig.java @@ -1,6 +1,7 @@ package com.nis.common.job; import cn.hutool.log.Log; +import com.nis.common.utils.Tool; import com.nis.modules.agent.job.AgentHealthCheckJob; import com.nis.modules.agent.job.AutoConfigAgentJob; import com.nis.modules.agent.job.CortexHistoryDataDelJob; @@ -16,7 +17,7 @@ import com.nis.modules.endpoint.job.EndpointStateJob; import com.nis.modules.sys.job.CleanJob; import com.nis.modules.sys.job.SessionTimeOutJob; import com.nis.modules.sys.job.SysComponentJob; - +import com.nis.modules.terminal.job.CleanExpiredTerminalRecordDataJob; import com.nis.modules.terminal.job.CleanNoRealUseTerminalConnJob; import org.apache.commons.lang.StringUtils; import org.quartz.*; @@ -46,13 +47,13 @@ public class JobConfig { @Value("${nezha.alertSilenceJobCron:0/10 * * * * ?}") private String alertSilenceJobCron; - + @Value("${nezha.alertMessageExpiredJobCron:0/10 * * * * ?}") private String alertMessageExpiredJobCron; - + @Value("${nezha.alertRuleManagerJobCron:0/10 * * * * ?}") private String alertRuleManagerJobCron; - + @Value("${nezha.assetPingJobCron:0 0/1 * * * ? *}") private String assetPingJobCron; @@ -88,8 +89,11 @@ public class JobConfig { @Value("${nezha.cleanNoRealUseTerminalConnJobCron:0 0/5 * * * ? *}") private String cleanNoRealUseTerminalConnJobCron; - - @Autowired + + @Value("${nezha.cleanExpiredTerminalRecordDataJobCron:0 0 1 * * ?}") + private String cleanExpiredTerminalRecordDataJobCron; + + @Autowired private Scheduler scheduler; // @Qualifier("costomSchedulerFactoryBean") //private SchedulerFactoryBean schedulerFactoryBean; @@ -111,7 +115,8 @@ public class JobConfig { private String alertMessageHistoryPartManJobName = "ALERTMESSAGEHISTORYPARTMAN_JOB"; private String recordtRuleManagerJobName = "RECORDTRULEMANAGER_JOB"; private String cleanNoRealUseTerminalConnJobName = "CLEANNOREALUSETERMINALCONN_JOB"; - + private String cleanExpiredTerminalRecordDataJob = "CLEAN_EXPIRED_TERMINAL_RECORD_DATA_JOB"; + @Bean public JobDetail agentCronJobDetail() { return JobBuilder.newJob(AutoConfigAgentJob.class).withIdentity(JOB_NAME + agentJobName).storeDurably().build(); @@ -196,7 +201,12 @@ public class JobConfig { public JobDetail cleanNoRealUseTerminalConnJobDetail() { return JobBuilder.newJob(CleanNoRealUseTerminalConnJob.class).withIdentity(JOB_NAME + cleanNoRealUseTerminalConnJobName).storeDurably().build(); } - + + @Bean + public JobDetail cleanExpiredTerminalRecordDataJobDetail() { + return JobBuilder.newJob(CleanExpiredTerminalRecordDataJob.class).withIdentity(JOB_NAME + cleanExpiredTerminalRecordDataJob).storeDurably().build(); + } + @PostConstruct public void init() throws SchedulerException { // agent 健康检查定时任务 @@ -235,7 +245,7 @@ public class JobConfig { // terminal clean job 如果配置文件没有配置定时周期 每天凌晨1点执行一次 createCronScheduleJob(cleanJobCronName, cleanJobDetail(), StringUtils.isEmpty(terminalCleanJobCron) ? "0 0 1 * * ?" : terminalCleanJobCron); - + //sys component 状态更新线程 createCronScheduleJob(sysComponentJobName, sysComponentJobDetail(), StringUtils.isEmpty(sysComponentJobCron) ? "0/30 * * * * ?" : sysComponentJobCron); @@ -244,14 +254,17 @@ public class JobConfig { // alert message History Partition Management 分区管理 createCronScheduleJob(alertMessageHistoryPartManJobName, alertMessageHistoryPartManJobDetail(), StringUtils.isEmpty(alertMessageHistoryPartManJobCron) ? "0 0 0 1/1 * ?" : alertMessageHistoryPartManJobCron); - + // record rule 配置管理任务 createCronScheduleJob(recordtRuleManagerJobName, recordtRuleManagerJobDetail(), StringUtils.isEmpty(recordtRuleManagerJobCron) ? "0/10 * * * * ?" : recordtRuleManagerJobCron); // 清理没有真实使用的 terminal ssh/telnet 连接,以超过最后数据交互时间 30 分钟为准 (sys_config.session_timeout) createCronScheduleJob(cleanNoRealUseTerminalConnJobName, cleanNoRealUseTerminalConnJobDetail(), StringUtils.isEmpty(cleanNoRealUseTerminalConnJobCron) ? "0 0/5 * * * ? *" : cleanNoRealUseTerminalConnJobCron); + + // terminal record expired data clean job + createCronScheduleJob(cleanExpiredTerminalRecordDataJob, cleanExpiredTerminalRecordDataJobDetail(), Tool.StrUtil.emptyToDefault(cleanExpiredTerminalRecordDataJobCron, "0 0 1 * * ?")); } - + /** * 根据 jobName jobDetail jobCronExpression 创建任务,如果存在则删除再创建 * @param jobName diff --git a/nz-admin/src/main/java/com/nis/modules/sys/job/CleanJob.java b/nz-admin/src/main/java/com/nis/modules/sys/job/CleanJob.java index c9a55cf8..bf3d88bb 100644 --- a/nz-admin/src/main/java/com/nis/modules/sys/job/CleanJob.java +++ b/nz-admin/src/main/java/com/nis/modules/sys/job/CleanJob.java @@ -12,11 +12,9 @@ import com.nis.modules.alert.dao.AlertMessageHistoryPartitionManagementDao; import com.nis.modules.alert.entity.AlertRuleEvalLog; import com.nis.modules.alert.entity.AlertRuleSchedLog; import com.nis.modules.alert.job.AlertMessageHistoryPartitionManagementJob; -import com.nis.modules.alert.service.AlertMessageService; import com.nis.modules.alert.service.AlertRuleEvalLogService; import com.nis.modules.alert.service.AlertRuleSchedLogService; import com.nis.modules.sys.service.SysConfigService; -import com.nis.modules.terminal.service.TerminalSessionService; import org.apache.commons.lang3.time.StopWatch; import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; @@ -29,8 +27,10 @@ import java.util.List; import java.util.stream.Collectors; /** - * 系统清理任务 1. terminal log,默认保留 365天. 2. alert_rule_eval_log,默认保留 30天 3. - * alert_rule_sched_log,默认保留 30天 4. alert_message_history ,默认不清理 + * 系统清理任务 + * 1. alert_rule_eval_log,默认保留 30天 + * 2. alert_rule_sched_log,默认保留 30天 + * 3. alert_message_history ,默认不清理 */ @DisallowConcurrentExecution public class CleanJob extends QuartzJobBean { @@ -38,18 +38,12 @@ public class CleanJob extends QuartzJobBean { private Log log = Log.get(); @Autowired - private TerminalSessionService terminalSessionService; - - @Autowired private AlertRuleEvalLogService alertRuleEvalLogService; @Autowired private AlertRuleSchedLogService alertRuleSchedLogService; @Autowired - private AlertMessageService alertMessageService; - - @Autowired private SysConfigService sysConfigService; @Autowired @@ -71,19 +65,19 @@ public class CleanJob extends QuartzJobBean { StopWatch sw = new StopWatch(); // terminal_* - String terminalRecordLocalRetention = sysConfigService.getValue("terminal_record_local_retention"); - terminalRecordLocalRetention = Tool.StrUtil.emptyToDefault(terminalRecordLocalRetention, "365"); - DateTime startCleanTerminalTime = Tool.DateUtil.offsetDay(now, -Integer.valueOf(terminalRecordLocalRetention)); - sw.start(); - try { - log.info("start clean terminal data"); - terminalSessionService.cleanExpiredDataByTime(startCleanTerminalTime); - } catch (Exception e) { - log.error("clean terminal data error", e); - } finally { - sw.stop(); - } - log.info("clean terminal data Run Time: {}", sw.toString()); +// String terminalRecordLocalRetention = sysConfigService.getValue("terminal_record_local_retention"); +// terminalRecordLocalRetention = Tool.StrUtil.emptyToDefault(terminalRecordLocalRetention, "365"); +// DateTime startCleanTerminalTime = Tool.DateUtil.offsetDay(now, -Integer.valueOf(terminalRecordLocalRetention)); +// sw.start(); +// try { +// log.info("start clean terminal data"); +// terminalSessionService.cleanExpiredDataByTime(startCleanTerminalTime); +// } catch (Exception e) { +// log.error("clean terminal data error", e); +// } finally { +// sw.stop(); +// } +// log.info("clean terminal data Run Time: {}", sw.toString()); // alert_rule_eval_log String ruleEvalLogLocalRetention = sysConfigService.getValue("alert_rule_eval_log_local_retention"); diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalClient.java b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalClient.java index 34cc7f11..b293c46d 100644 --- a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalClient.java +++ b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalClient.java @@ -201,7 +201,7 @@ public class TerminalClient { long terminalSessionStartTime = (long) TerminalSession.getTerminalAttributeInfo(clientId).get("terminalSessionStartTime"); terminalRecord.setEndTime((int) (System.currentTimeMillis() - terminalSessionStartTime)); terminalRecord.setContent(Files.readAllBytes(tempFile.toPath())); - terminalRecordService.save(terminalRecord); + terminalRecordService.saveTerminalRecordContent(terminalRecord); } catch (Exception e) { log.error(e, "Save terminal record content error. clientid: {}, file length: {}, File info: {}", clientId, tempFile.length(), tempFile.toString()); } finally { diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalHandler.java b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalHandler.java index 585efacf..1d2af4e2 100644 --- a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalHandler.java +++ b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalHandler.java @@ -363,7 +363,7 @@ public class TerminalHandler extends TextWebSocketHandler { long terminalSessionStartTime = (long) TerminalSession.getTerminalAttributeInfo(uuid).get("terminalSessionStartTime"); terminalRecord.setEndTime((int) (System.currentTimeMillis() - terminalSessionStartTime)); terminalRecord.setContent(Files.readAllBytes(tempFile.toPath())); - terminalRecordService.save(terminalRecord); + terminalRecordService.saveTerminalRecordContent(terminalRecord); } catch (Exception e) { log.error(e, "Save terminal record content error. clientid: {}, file length: {}, File info: {}", uuid, tempFile.length(), tempFile.toString()); } finally { diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalInteractionRunnable.java b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalInteractionRunnable.java index 5cb81c13..75bbeb7d 100644 --- a/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalInteractionRunnable.java +++ b/nz-admin/src/main/java/com/nis/modules/terminal/backend/TerminalInteractionRunnable.java @@ -123,7 +123,7 @@ public class TerminalInteractionRunnable extends Thread { } long terminalSessionStartTime = (long) TerminalSession.getTerminalAttributeInfo(clientId).get("terminalSessionStartTime"); terminalRecord.setEndTime((int) (System.currentTimeMillis() - terminalSessionStartTime)); - terminalRecordService.save(terminalRecord); + terminalRecordService.saveTerminalRecordContent(terminalRecord); } catch (Exception e) { log.error(e, "Terminal connection has been closed, Terminal Record file content record failed. clientid: {}, File info: {}", clientId, tempFile.toString()); } finally { diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/entity/TerminalRecord.java b/nz-admin/src/main/java/com/nis/modules/terminal/entity/TerminalRecord.java index 8af8dc88..d8728e2e 100644 --- a/nz-admin/src/main/java/com/nis/modules/terminal/entity/TerminalRecord.java +++ b/nz-admin/src/main/java/com/nis/modules/terminal/entity/TerminalRecord.java @@ -35,9 +35,19 @@ public class TerminalRecord { private Integer endTime; /** + * 存储模式,1: 本地存储,2:S3对象存储 + */ + private Integer storageType; + + /** + * S3 path + */ + private String path; + + /** * 文件内容 * 记录的是 session开始后的时间偏移量 * 超过定义临时文件大小后,再新增一条记录 */ private byte[] content; -}
\ No newline at end of file +} diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/job/CleanExpiredTerminalRecordDataJob.java b/nz-admin/src/main/java/com/nis/modules/terminal/job/CleanExpiredTerminalRecordDataJob.java new file mode 100644 index 00000000..8da2113e --- /dev/null +++ b/nz-admin/src/main/java/com/nis/modules/terminal/job/CleanExpiredTerminalRecordDataJob.java @@ -0,0 +1,180 @@ +package com.nis.modules.terminal.job; + +import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.StopWatch; +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.log.Log; +import com.amazonaws.SDKGlobalConfiguration; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.BucketLifecycleConfiguration; +import com.nis.common.utils.Constant; +import com.nis.common.utils.Tool; +import com.nis.modules.sys.service.SysConfService; +import com.nis.modules.terminal.service.TerminalRecordService; +import com.nis.modules.terminal.service.TerminalSessionService; +import com.nis.modules.terminal.utils.TerminalConstant; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.quartz.QuartzJobBean; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Arrays; +import java.util.List; + +@DisallowConcurrentExecution +public class CleanExpiredTerminalRecordDataJob extends QuartzJobBean { + + private static final Log log = Log.get(); + + static { + System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true"); + } + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private SysConfService sysConfService; + + @Autowired + private TerminalSessionService terminalSessionService; + + @Autowired + private TerminalRecordService terminalRecordService; + + @Override + protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { + Thread.currentThread().setName("CleanExpiredTerminalRecordDataJob"); + + // 检查 haMode + String serverId = (String) redisTemplate.opsForValue().get(Constant.SYS_HA_LOCK); + log.info("[CleanExpiredTerminalRecordDataJob] [Get ha lock from redis] [server id: {}]", serverId); + if (!StrUtil.equals(serverId, Constant.SERVER_ID)) { + log.warn("[CleanExpiredTerminalRecordDataJob] [did not get the ha lock, not run job] [redis server id: {}] [system server id: {}]", serverId, Constant.SERVER_ID); + return; + } + + log.info("[CleanExpiredTerminalRecordDataJob] [start cleaning expired data]"); + + StopWatch sw = StopWatch.create("CleanExpiredTerminalRecordDataJob"); + sw.start(); + + try { + // clean Expired Terminal Record Data + this.cleanExpiredTerminalRecordData(); + } catch (Exception e) { + log.error(e, "[CleanExpiredTerminalRecordDataJob] [failed to clean data]"); + } finally { + sw.stop(); + } + + log.info("[CleanExpiredTerminalRecordDataJob] [clean data complete] [Run Time: {}]", sw.prettyPrint()); + } + + @Transactional(rollbackFor = Exception.class) + public void cleanExpiredTerminalRecordData() { + // 存储模式,1: 本地存储,2:S3对象存储 + String terminalStorageType = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_TYPE); + terminalStorageType = StrUtil.emptyToDefault(terminalStorageType, "1"); + log.info("[cleanExpiredTerminalRecordData] [record storage type] [value: {}]", terminalStorageType); + + DateTime now = Tool.DateUtil.date(); + + // 数据存储天数,默认 365 天 + String terminalRecordLocalRetention = sysConfService.getValue(TerminalConstant.TERMINAL_RECORD_LOCAL_RETENTION); + terminalRecordLocalRetention = Tool.StrUtil.emptyToDefault(terminalRecordLocalRetention, "365"); + + // 过期时间 + DateTime expireDate = Tool.DateUtil.offsetDay(now, -Integer.valueOf(terminalRecordLocalRetention)); + log.info("[cleanExpiredTerminalRecordData] [clear expired record date in db] [now: {}] [retention days: {}] [expire date: {}]", now, terminalRecordLocalRetention, expireDate); + terminalSessionService.cleanExpiredDataByTime(expireDate); + log.info("[cleanExpiredTerminalRecordData] [clear expired record date in db finshed]"); + + // 如果是 S3 存储内容信息,则配置存储桶过期策略 + if (StrUtil.equals("2", terminalStorageType)) { + AmazonS3 s3Client = terminalRecordService.buildTerminalStorageS3Client(); + try { + String bucketName = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_BUCKET); + log.info("[cleanExpiredTerminalRecordData] [config terminal record bucket lifecycle] [bucketName: {}] [retention days: {}]", bucketName, terminalRecordLocalRetention); + + if (s3Client.doesBucketExistV2(bucketName)) { + log.info("[cleanExpiredTerminalRecordData] [bucket exist] [bucketName: {}]", bucketName); + + Integer retentionDay = Integer.valueOf(terminalRecordLocalRetention); + String ruleId = "terminalRecordExpirationRule"; + + // Retrieve the configuration. + // Verify that the configuration now has one rules. prefix: fake/ expirationInDays: sysconfig.metricsStorageRetentionDay + BucketLifecycleConfiguration bucketLifecycleConfiguration = s3Client.getBucketLifecycleConfiguration(bucketName); + if (ObjectUtil.isNull(bucketLifecycleConfiguration)) { + log.info("[cleanExpiredTerminalRecordData] [bucket has empty lifecycle configuration] [bucketName: {}]", bucketName); + this.setBucketLifecycleConfiguration(s3Client, bucketName, ruleId, retentionDay); + } else { + List<BucketLifecycleConfiguration.Rule> bucketLifecycleConfigurationRules = bucketLifecycleConfiguration.getRules(); + bucketLifecycleConfigurationRules = Tool.CollUtil.defaultIfEmpty(bucketLifecycleConfigurationRules, Tool.ListUtil.empty()); + + BucketLifecycleConfiguration.Rule cortexExpirationRule = bucketLifecycleConfigurationRules.stream() + .filter(rule -> StrUtil.equals(rule.getId(), ruleId)) + .findFirst() + .orElse(null); + + if (ObjectUtil.isNull(cortexExpirationRule)) { + log.info("[cleanExpiredTerminalRecordData] [bucket has empty lifecycle configuration] [bucketName: {}]", bucketName); + this.setBucketLifecycleConfiguration(s3Client, bucketName, ruleId, retentionDay); + } else { + // 当前存在过期策略,判断规则是否发生变化 + boolean isUpdate = BooleanUtil.or( + ObjectUtil.notEqual(BucketLifecycleConfiguration.ENABLED, cortexExpirationRule.getStatus()), + ObjectUtil.notEqual(retentionDay, cortexExpirationRule.getExpirationInDays()) + ); + log.info("[cleanExpiredTerminalRecordData] [bucket lifecycle configuration] [bucketName: {}] [updated: {}]", bucketName, isUpdate); + if (isUpdate) { + this.setBucketLifecycleConfiguration(s3Client, bucketName, ruleId, retentionDay); + } + } + } + + // query the existing lifecycle configuration + bucketLifecycleConfiguration = s3Client.getBucketLifecycleConfiguration(bucketName); + List<BucketLifecycleConfiguration.Rule> rules1 = bucketLifecycleConfiguration.getRules(); + for (BucketLifecycleConfiguration.Rule l : rules1) { + log.info("[cleanExpiredTerminalRecordData] [current lifecycle config info] [bucketName: {}] [status: {}] [expiration days: {}]", bucketName, l.getStatus(), l.getExpirationInDays()); + } + } else { + log.warn("[cleanExpiredTerminalRecordData] [bucket does not exist] [bucketName: {}]", bucketName); + } + } catch (Exception e) { + log.error(e, "[cleanExpiredTerminalRecordData] [error]"); + } finally { + s3Client.shutdown(); + } + } + } + + /** + * set Bucket Lifecycle Configuration + * + * @param s3Client + * @param bucketName + * @param ruleId + * @param retentionDay + */ + private void setBucketLifecycleConfiguration(AmazonS3 s3Client, String bucketName, String ruleId, Integer retentionDay) { + log.info("[setBucketLifecycleConfiguration] [bucketName: {}] [ruleId: {}] [retentionDay: {}]", bucketName, ruleId, retentionDay); + + BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule() + .withId(ruleId) + .withExpirationInDays(retentionDay) + .withStatus(BucketLifecycleConfiguration.ENABLED); + + BucketLifecycleConfiguration configuration = new BucketLifecycleConfiguration().withRules(Arrays.asList(rule)); + // Save the configuration. + s3Client.setBucketLifecycleConfiguration(bucketName, configuration); + } + +} diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/service/TerminalRecordService.java b/nz-admin/src/main/java/com/nis/modules/terminal/service/TerminalRecordService.java index 676e0b6e..7c5088bf 100644 --- a/nz-admin/src/main/java/com/nis/modules/terminal/service/TerminalRecordService.java +++ b/nz-admin/src/main/java/com/nis/modules/terminal/service/TerminalRecordService.java @@ -1,5 +1,6 @@ package com.nis.modules.terminal.service; +import com.amazonaws.services.s3.AmazonS3; import com.baomidou.mybatisplus.extension.service.IService; import com.nis.modules.terminal.entity.TerminalRecord; @@ -8,4 +9,18 @@ import java.util.Map; public interface TerminalRecordService extends IService<TerminalRecord> { Map queryTerminalRecordInfos(Map<String, Object> params); -}
\ No newline at end of file + + /** + * save Terminal Record Content + * + * @param record + */ + void saveTerminalRecordContent(TerminalRecord record); + + /** + * build Terminal Storage S3 Client + * + * @return + */ + AmazonS3 buildTerminalStorageS3Client(); +} diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/service/impl/TerminalRecordServiceImpl.java b/nz-admin/src/main/java/com/nis/modules/terminal/service/impl/TerminalRecordServiceImpl.java index 492b82f9..a8062af8 100644 --- a/nz-admin/src/main/java/com/nis/modules/terminal/service/impl/TerminalRecordServiceImpl.java +++ b/nz-admin/src/main/java/com/nis/modules/terminal/service/impl/TerminalRecordServiceImpl.java @@ -1,30 +1,53 @@ package com.nis.modules.terminal.service.impl; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.CharsetUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.log.Log; import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONPath; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.ObjectMetadata; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.nis.common.exception.NZException; import com.nis.common.smartvalidate.ValidateUtils; +import com.nis.common.utils.JSONUtil; import com.nis.common.utils.RCode; import com.nis.common.utils.StringUtils; +import com.nis.common.utils.Tool; +import com.nis.modules.sys.service.SysConfService; import com.nis.modules.terminal.dao.TerminalRecordDao; import com.nis.modules.terminal.entity.TerminalRecord; import com.nis.modules.terminal.service.TerminalRecordService; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; +import com.nis.modules.terminal.utils.TerminalConstant; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.io.ByteArrayInputStream; import java.util.Collections; -import java.util.HashMap; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -@Slf4j @Service("terminalRecordService") public class TerminalRecordServiceImpl extends ServiceImpl<TerminalRecordDao, TerminalRecord> implements TerminalRecordService { + private static final Log log = Log.get(); + + @Autowired + private SysConfService sysConfService; + @Override public Map queryTerminalRecordInfos(Map<String, Object> params) { // uuid 不能为空 @@ -37,49 +60,209 @@ public class TerminalRecordServiceImpl extends ServiceImpl<TerminalRecordDao, Te throw new NZException(RCode.TERMINAL_QUERYSIZE_INVALIDED); } - List<TerminalRecord> list = this.list(new LambdaQueryWrapper<TerminalRecord>() + Map<Object, Object> resultMap = Tool.MapUtil.builder().map(); + resultMap.put("uuid", params.get("uuid")); + resultMap.put("startTime", time); + resultMap.put("endTime", null); + resultMap.put("hasNext", 0); + resultMap.put("list", Collections.emptyList()); + + // content Str List of uuid + List<String> contentStrListOfUuid = Tool.ListUtil.list(true); + + // record List In Db + List<TerminalRecord> recordListInDb = this.list(new LambdaQueryWrapper<TerminalRecord>() .eq(TerminalRecord::getUuid, params.get("uuid")) .orderByAsc(TerminalRecord::getStartTime)); - Map map = new HashMap(8); - map.put("uuid", params.get("uuid")); + log.info("[queryTerminalRecordInfos] [record data in db] [uuid: {}] [size: {}]", params.get("uuid"), recordListInDb.size()); - if (CollectionUtils.isNotEmpty(list)) { - int hashNext = 0; + if (Tool.CollUtil.isNotEmpty(recordListInDb)) { + // storage type + Integer recordStorageType = recordListInDb.stream().findFirst().get().getStorageType(); + if (ObjectUtil.equals(1, recordStorageType)) { + log.info("[queryTerminalRecordInfos] [record data source is db] [uuid: {}] [size: {}]", params.get("uuid"), recordListInDb.size()); + // db 本地存储 + contentStrListOfUuid = recordListInDb.stream().map(record -> StrUtil.str(record.getContent(), CharsetUtil.charset("UTF-8"))).collect(Collectors.toList()); + } else if (ObjectUtil.equals(2, recordStorageType)) { + log.info("[queryTerminalRecordInfos] [record data source is s3] [uuid: {}] [size in db: {}]", params.get("uuid"), recordListInDb.size()); + // S3 对象存储 + AmazonS3 s3Client = this.buildTerminalStorageS3Client(); + try { + String bucketName = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_BUCKET); - StringBuilder builder = new StringBuilder(); - builder.append("["); - for (TerminalRecord record : list) { - builder.append(new String(record.getContent(), CharsetUtil.charset("UTF-8"))); - } - String str = builder.substring(0, builder.length() - 1); - List<Map> maps = JSONArray.parseArray(str += "]", Map.class); - maps = maps.stream().filter(tempEntity -> { - Integer t = (Integer) tempEntity.get("t"); - if (t > time) { - return true; + /* + // query objects in s3 + ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(); + listObjectsV2Request + .withBucketName(bucketName) + .withMaxKeys(1000) + .withPrefix(StrUtil.toString(params.get("uuid"))); + + ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(listObjectsV2Request); + List<S3ObjectSummary> S3ObjectSummarylist = listObjectsV2Result.getObjectSummaries(); + // 再次排序 + List<String> fileNameList = S3ObjectSummarylist.stream().map(S3ObjectSummary::getKey) + .sorted(Comparator.comparing(str -> Integer.valueOf(Tool.StrUtil.subAfter(str, "/", true)))).collect(Collectors.toList());*/ + + List<String> fileNameList = recordListInDb.stream().map(TerminalRecord::getPath) + .sorted(Comparator.comparing(str -> Integer.valueOf(Tool.StrUtil.subAfter(str, "/", true)))).collect(Collectors.toList()); + for (String fileName : fileNameList) { + String objectAsString = s3Client.getObjectAsString(bucketName, fileName); + contentStrListOfUuid.add(objectAsString); + } + log.info("[queryTerminalRecordInfos] [record data source is s3] [uuid: {}] [size in db: {}] [size in s3: {}]", params.get("uuid"), fileNameList.size(), contentStrListOfUuid.size()); + } finally { + s3Client.shutdown(); } - return false; - }).collect(Collectors.toList()); + } + } + if (Tool.CollUtil.isNotEmpty(contentStrListOfUuid)) { + // get List Of Content Obj Older Than Time Param + List<Map> contentObjList = this.getListOfContentObjOlderThanTimeParam(contentStrListOfUuid, time); // size 为 1 表示不限制条数 - if (!size.equals(1) && maps.size() > size) { - maps = maps.subList(0, size); - hashNext = 1; + if (BooleanUtil.and(ObjectUtil.notEqual(size, 1), + contentObjList.size() > size)) { + contentObjList = contentObjList.subList(0, size); + resultMap.put("hasNext", 1); + } + + Map endEntity = contentObjList.get(contentObjList.size() - 1); + resultMap.put("endTime", endEntity.get("t")); + resultMap.put("list", contentObjList); + } + return resultMap; + } + + /** + * get List Of Content Obj Older Than Time Param + * + * @param contentList + * @param time + * @return + */ + private List<Map> getListOfContentObjOlderThanTimeParam(List<String> contentList, Integer time) { + StringBuilder builder = new StringBuilder(); + builder.append("["); + + for (String content : contentList) { + builder.append(content); + } + + String str = builder.substring(0, builder.length() - 1); + + List<Map> maps = JSONArray.parseArray(str += "]", Map.class); + maps = maps.stream().filter(tempEntity -> { + Integer t = (Integer) tempEntity.get("t"); + if (t > time) { + return true; + } + return false; + }).collect(Collectors.toList()); + + return maps; + } + + @Override + public void saveTerminalRecordContent(TerminalRecord record) { + // 存储模式,1: 本地存储,2:S3对象存储 + String terminalStorageType = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_TYPE); + terminalStorageType = StrUtil.emptyToDefault(terminalStorageType, "1"); + log.info("[saveTerminalRecordContent] [storage type] [value: {}] [uuid: {}]", terminalStorageType, record.getUuid()); + + // storageType + record.setStorageType(Integer.valueOf(terminalStorageType)); + + switch (terminalStorageType) { + case "1": { + // 本地存储 + this.save(record); + log.info("[saveTerminalRecordContent] [local storage record succeeded] [uuid: {}]", record.getUuid()); + break; } + case "2": { + // 保存记录至 db,不包含 content 内容 + TerminalRecord saveRecordEntity = new TerminalRecord(); + BeanUtil.copyProperties(record, saveRecordEntity, "content"); + + + String uuid = record.getUuid(); + Integer startTime = record.getStartTime(); + + // 以 uuid 为文件路径,startTime 为文件名称 + String key = StrUtil.concat(true, uuid, "/", startTime.toString()); + + saveRecordEntity.setPath(key); + this.save(saveRecordEntity); + + // S3 对象存储 + AmazonS3 s3Client = this.buildTerminalStorageS3Client(); + + String bucketName = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_BUCKET); + boolean doesBucketExist = s3Client.doesBucketExistV2(bucketName); + log.info("[saveTerminalRecordContent] [bucketName: {}] [doesBucketExist: {}]", bucketName, doesBucketExist); + + if (!doesBucketExist) { + log.info("[saveTerminalRecordContent] [create bucket] [name: {}]", bucketName); + s3Client.createBucket(bucketName); + } - Map endEntity = maps.get(maps.size() - 1); - map.put("endTime", endEntity.get("t")); - map.put("hasNext", hashNext); + byte[] content = record.getContent(); + ByteArrayInputStream input = null; + try { + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentType("application/json"); + metadata.setContentLength(content.length); + metadata.setHeader("x-hos-meta-message", "message"); - map.put("list", maps); - } else { - map.put("endTime", null); - map.put("hasNext", 0); - map.put("list", Collections.emptyList()); + input = new ByteArrayInputStream(content); + + // 以流方式上传对象,指定桶名、对象名、输入流、ObjectMetadata + s3Client.putObject(bucketName, key, input, metadata); + + log.info("[saveTerminalRecordContent] [remote storage record succeeded] [key: {}]", key); + } catch (Exception e) { + log.error(e, "[saveTerminalRecordContent] [upload record content to s3 error] [uuid: {}] [start: {}] [size: {}]", uuid, startTime, content.length); + } finally { + Tool.IoUtil.close(input); + // 关闭 AmazonS3Client + s3Client.shutdown(); + } + break; + } + default: + break; } - map.put("startTime", time); - return map; } -}
\ No newline at end of file + + /** + * build Terminal Storage S3 Client + * + * @return + */ + @Override + public AmazonS3 buildTerminalStorageS3Client() { + log.info("[buildTerminalStorageS3Client] [build Amazon S3 Client begin...]"); + + String s3Endpoint = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_ENDPOINT); + String accessKey = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_ACCESS_KEY); + String secretKey = sysConfService.getValue(TerminalConstant.TERMINAL_STORAGE_S3_SECRET_ACCESS_KEY); + + AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.HTTP); + + AmazonS3 s3Client = AmazonS3ClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withClientConfiguration(clientConfig) + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(s3Endpoint, null) + ) + .build(); + + log.info("[buildTerminalStorageS3Client] [build Amazon S3 Client finshed] [ClientInfo: {}]", JSONPath.read(JSONUtil.toJsonStr(s3Client), "endpoint")); + return s3Client; + } +} diff --git a/nz-admin/src/main/java/com/nis/modules/terminal/utils/TerminalConstant.java b/nz-admin/src/main/java/com/nis/modules/terminal/utils/TerminalConstant.java index d29b9b36..2937ed18 100644 --- a/nz-admin/src/main/java/com/nis/modules/terminal/utils/TerminalConstant.java +++ b/nz-admin/src/main/java/com/nis/modules/terminal/utils/TerminalConstant.java @@ -15,6 +15,18 @@ public class TerminalConstant { */ public static final Integer RECORD_TEMPFILE_MAX_LENGTH = 10485760; + + // 存储模式,1: 本地存储,2:S3对象存储 + public static final String TERMINAL_STORAGE_TYPE = "terminal_storage_type"; + + public static final String TERMINAL_STORAGE_S3_ENDPOINT = "terminal_storage_s3_endpoint"; + public static final String TERMINAL_STORAGE_S3_BUCKET = "terminal_storage_s3_bucket"; + public static final String TERMINAL_STORAGE_S3_ACCESS_KEY = "terminal_storage_s3_access_key"; + public static final String TERMINAL_STORAGE_S3_SECRET_ACCESS_KEY = "terminal_storage_s3_secret_access_key"; + + // 存储天数 + public static final String TERMINAL_RECORD_LOCAL_RETENTION = "terminal_record_local_retention"; + /** * 不可见字符 */ diff --git a/nz-admin/src/main/resources/db/V2022.12.26__modify_terminal_record_structure.sql b/nz-admin/src/main/resources/db/V2022.12.26__modify_terminal_record_structure.sql new file mode 100644 index 00000000..4bef91a4 --- /dev/null +++ b/nz-admin/src/main/resources/db/V2022.12.26__modify_terminal_record_structure.sql @@ -0,0 +1,11 @@ + +/** + 1. terminal_record add column storage_type & path + 2. terminal_record modify column content + */ + +ALTER TABLE `terminal_record` +ADD COLUMN IF NOT EXISTS `storage_type` int(1) NOT NULL DEFAULT 1 COMMENT '存储模式,1: 本地存储,2:S3对象存储', +ADD COLUMN IF NOT EXISTS `path` varchar(4096) DEFAULT NULL COMMENT 'S3对象存储时有效', +MODIFY COLUMN `content` MEDIUMBLOB NULL COMMENT '记录的是 session开始后的时间偏移量\r\n\r\n最大储存 65K,超过部分再新增一条记录'; + diff --git a/nz-admin/src/main/resources/mapper/terminal/TerminalSessionDao.xml b/nz-admin/src/main/resources/mapper/terminal/TerminalSessionDao.xml index 77983210..b371151c 100644 --- a/nz-admin/src/main/resources/mapper/terminal/TerminalSessionDao.xml +++ b/nz-admin/src/main/resources/mapper/terminal/TerminalSessionDao.xml @@ -72,10 +72,11 @@ WHERE ts.start_time <= #{startCleanTerminalTime}; - DELETE tr.*,ts.* + DELETE tr.*,ts.*,tsl.* FROM terminal_session ts LEFT JOIN terminal_record tr ON ts.uuid = tr.uuid + LEFT JOIN terminal_sftp_log tsl on ts.uuid = tsl.uuid WHERE ts.start_time <= #{startCleanTerminalTime}; </delete> |
