3 files changed, 231 insertions, 1 deletions
diff --git a/nz-admin/src/main/java/com/nis/common/job/JobConfig.java b/nz-admin/src/main/java/com/nis/common/job/JobConfig.java
index e22e767a..d96af3b1 100644
--- a/nz-admin/src/main/java/com/nis/common/job/JobConfig.java
+++ b/nz-admin/src/main/java/com/nis/common/job/JobConfig.java
@@ -1,9 +1,11 @@
 package com.nis.common.job;
 
+import cn.hutool.core.util.StrUtil;
 import cn.hutool.log.Log;
 import com.nis.modules.agent.job.AgentHealthCheckJob;
 import com.nis.modules.agent.job.AutoConfigAgentJob;
 import com.nis.modules.agent.job.CortexHistoryDataDelJob;
+import com.nis.modules.agent.job.LokiHistoryDataDelJob;
 import com.nis.modules.alert.backend.AlertMessageExpiredJob;
 import com.nis.modules.alert.backend.AlertRuleManagerJob;
 import com.nis.modules.alert.backend.AlertSilenceStatusJob;
@@ -16,7 +18,6 @@ import com.nis.modules.endpoint.job.EndpointStateJob;
 import com.nis.modules.sys.job.CleanJob;
 import com.nis.modules.sys.job.SessionTimeOutJob;
 import com.nis.modules.sys.job.SysComponentJob;
-
 import org.apache.commons.lang.StringUtils;
 import org.quartz.*;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -79,6 +80,9 @@ public class JobConfig {
     @Value("${nezha.cortexHistoryDataDelJobCron:0 0 0/2 * * ?}")
     private String cortexHistoryDataDelJobCron;
 
+    @Value("${nezha.lokiHistoryDataDelJobCron:0 0 0/2 * * ?}")
+    private String lokiHistoryDataDelJobCron;
+
     @Value("${nezha.alertMessageHistoryPartManJobCron:0 0 0 1/1 * ?}")
     private String alertMessageHistoryPartManJobCron;
 
@@ -104,6 +108,7 @@ public class JobConfig {
     private String cleanJobCronName = "CLEAN_JOB";
     private String sysComponentJobName = "SYSCOMPONENT_JOB";
     private String cortexHistoryDataDelJobName = "CORTEXHISTORYDATADEL_JOB";
+    private String lokiHistoryDataDelJobName = "LOKIHISTORYDATADELJOB_JOB";
     private String alertMessageHistoryPartManJobName = "ALERTMESSAGEHISTORYPARTMAN_JOB";
     private String recordtRuleManagerJobName = "RECORDTRULEMANAGER_JOB";
 
@@ -178,6 +183,11 @@ public class JobConfig {
     }
 
     @Bean
+    public JobDetail lokiHistoryDataDelJobDetail() {
+        return JobBuilder.newJob(LokiHistoryDataDelJob.class).withIdentity(JOB_NAME + lokiHistoryDataDelJobName).storeDurably().build();
+    }
+
+    @Bean
     public JobDetail alertMessageHistoryPartManJobDetail() {
         return JobBuilder.newJob(AlertMessageHistoryPartitionManagementJob.class).withIdentity(JOB_NAME + alertMessageHistoryPartManJobName).storeDurably().build();
     }
@@ -232,6 +242,9 @@ public class JobConfig {
         // cortex metrics historical data deletion
         createCronScheduleJob(cortexHistoryDataDelJobName, cortexHistoryDataDelJobDetail(), StringUtils.isEmpty(cortexHistoryDataDelJobCron) ? "0 0 0/2 * * ?" : cortexHistoryDataDelJobCron);
 
+        // loki logs historical data deletion
+        createCronScheduleJob(lokiHistoryDataDelJobName, lokiHistoryDataDelJobDetail(), StrUtil.emptyToDefault(lokiHistoryDataDelJobCron, "0 0 0/2 * * ?"));
+
         // alert message history partition management
         createCronScheduleJob(alertMessageHistoryPartManJobName, alertMessageHistoryPartManJobDetail(), StringUtils.isEmpty(alertMessageHistoryPartManJobCron) ? "0 0 0 1/1 * ?"
                 : alertMessageHistoryPartManJobCron);
diff --git a/nz-admin/src/main/java/com/nis/common/utils/Constant.java b/nz-admin/src/main/java/com/nis/common/utils/Constant.java
index 9c0fc850..e0a88bf9 100644
--- a/nz-admin/src/main/java/com/nis/common/utils/Constant.java
+++ b/nz-admin/src/main/java/com/nis/common/utils/Constant.java
@@ -836,6 +836,7 @@ public class Constant {
     public static final String SYSCONFIG_KEY_LOGS_STORAGE_S3_ACCESS_KEY = "logs_storage_s3_access_key";
     public static final String SYSCONFIG_KEY_LOGS_STORAGE_S3_SECRET_ACCESS_KEY = "logs_storage_s3_secret_access_key";
     public static final String SYSCONFIG_KEY_LOGS_STORAGE_S3_INSECURE = "logs_storage_s3_insecure";
+    public static final String SYSCONFIG_KEY_LOGS_STORAGE_S3_REGION = "logs_storage_s3_region";
 
     /**
      * timeout for cortex / loki restart on agent config push (a 503 returned within this many seconds is still treated as available)
diff --git a/nz-admin/src/main/java/com/nis/modules/agent/job/LokiHistoryDataDelJob.java b/nz-admin/src/main/java/com/nis/modules/agent/job/LokiHistoryDataDelJob.java
new file mode 100644
index 00000000..bde17c12
--- /dev/null
+++ b/nz-admin/src/main/java/com/nis/modules/agent/job/LokiHistoryDataDelJob.java
@@ -0,0 +1,216 @@
+package com.nis.modules.agent.job;
+
+import cn.hutool.core.date.StopWatch;
+import cn.hutool.core.util.BooleanUtil;
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import com.alibaba.fastjson.JSONPath;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.SDKGlobalConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
+import com.amazonaws.services.s3.model.lifecycle.LifecycleFilter;
+import com.amazonaws.services.s3.model.lifecycle.LifecyclePrefixPredicate;
+import com.nis.common.utils.Constant;
+import com.nis.common.utils.JSONUtil;
+import com.nis.common.utils.Tool;
+import com.nis.modules.sys.entity.SysConfigEntity;
+import com.nis.modules.sys.service.SysConfigService;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@DisallowConcurrentExecution
+public class LokiHistoryDataDelJob extends QuartzJobBean {
+
+    static {
+        System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    }
+
+    private static final Log log = Log.get();
+
+    @Autowired
+    private RedisTemplate redisTemplate;
+
+    @Autowired
+    private SysConfigService sysConfigService;
+
+    @Override
+    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+        Thread.currentThread().setName("LokiHistoryDataDelJob");
+
+        // check haMode: only the instance holding the HA lock runs the job
+        String serverId = (String) redisTemplate.opsForValue().get(Constant.SYS_HA_LOCK);
+        log.info("[LokiHistoryDataDelJob] [Get ha lock from redis] [server id: {}]", serverId);
+        if (!StrUtil.equals(serverId, Constant.SERVER_ID)) {
+            log.warn("[LokiHistoryDataDelJob] [did not get the ha lock, not run job] [redis server id: {}] [system server id: {}]", serverId, Constant.SERVER_ID);
+            return;
+        }
+
+        log.info("[LokiHistoryDataDelJob] [begin]");
+
+        StopWatch sw = StopWatch.create("LokiHistoryDataDelJob");
+        sw.start();
+        try {
+            // Loki Storage Retention
+            this.lokiStorageRetention();
+        } catch (Exception e) {
+            log.error(e, "[LokiHistoryDataDelJob] [error]");
+        } finally {
+            sw.stop();
+        }
+
+        log.info("[LokiHistoryDataDelJob] [finished] [Run Time: {}]", sw.prettyPrint());
+    }
+
+
+    /**
+     * build an AmazonS3 client from sys config
+     *
+     * @param sysConfigMap
+     * @return
+     */
+    private AmazonS3 buildAmazonS3Client(Map<String, String> sysConfigMap) {
+        log.info("[buildAmazonS3Client] [build Amazon S3 Client] [begin]");
+        String connectionScheme = sysConfigMap.get(Constant.LOKI_MONITOR_SCHEME);
+        connectionScheme = StrUtil.isEmpty(connectionScheme) ? "http" : connectionScheme;
+
+        String minioEndpoint = sysConfigMap.get(Constant.SYSCONFIG_KEY_LOGS_STORAGE_S3_ENDPOINT);
+        String accessKey = sysConfigMap.get(Constant.SYSCONFIG_KEY_LOGS_STORAGE_S3_ACCESS_KEY);
+        String secretKey = sysConfigMap.get(Constant.SYSCONFIG_KEY_LOGS_STORAGE_S3_SECRET_ACCESS_KEY);
+        String regionStr = sysConfigMap.get(Constant.SYSCONFIG_KEY_LOGS_STORAGE_S3_REGION);
+
+        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
+        ClientConfiguration clientConfig = new ClientConfiguration();
+        clientConfig.setProtocol(Protocol.valueOf(connectionScheme.toUpperCase()));
+
+        AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
+                .withCredentials(new AWSStaticCredentialsProvider(credentials))
+                .withClientConfiguration(clientConfig)
+                .withEndpointConfiguration(
+                        new AwsClientBuilder.EndpointConfiguration(minioEndpoint, StrUtil.isEmpty(regionStr) ? null : regionStr)
+                )
+                .build();
+        log.info("[buildAmazonS3Client] [S3 Endpoint] [url: {}]", JSONPath.read(JSONUtil.toJsonStr(s3Client), "endpoint"));
+        log.info("[buildAmazonS3Client] [finished]");
+        return s3Client;
+    }
+
+    /**
+     * loki Storage Retention
+     * Cleans up expired data by setting a lifecycle policy on the minio bucket
+     */
+    public void lokiStorageRetention() {
+        List<SysConfigEntity> sysConfigList = sysConfigService.list();
+        Map<String, String> sysConfigMap = sysConfigList.stream().collect(Collectors.toMap(SysConfigEntity::getParamKey, SysConfigEntity::getParamValue));
+
+        // logs storage type: 1 = local file storage, 2 = S3 object storage
+        String logsStorageType = sysConfigMap.get(Constant.SYSCONFIG_KEY_LOGS_STORAGE_TYPE);
+        logsStorageType = StrUtil.emptyToDefault(logsStorageType, "1");
+
+        log.info("[lokiStorageRetention] [begin] [logs_storage_type: {}]", logsStorageType);
+        if (StrUtil.equals("1", logsStorageType)) {
+            log.warn("[lokiStorageRetention] [not run job] [logs_storage_type: {}]", logsStorageType);
+            return;
+        }
+
+        // build Amazon S3 Client
+        AmazonS3 s3Client = this.buildAmazonS3Client(sysConfigMap);
+
+        String bucketName = sysConfigMap.get(Constant.SYSCONFIG_KEY_LOGS_STORAGE_S3_BUCKET);
+        boolean doesBucketExistV2 = s3Client.doesBucketExistV2(bucketName);
+        log.info("[lokiStorageRetention] [bucket: {}] [exist: {}]", bucketName, doesBucketExistV2);
+
+        if (doesBucketExistV2) {
+            // log retention in days, default 7
+            String retentionDayStr = sysConfigMap.get(Constant.SYSCONFIG_KEY_LOGS_STORAGE_RETENTION);
+            Integer retentionDay = Integer.valueOf(StrUtil.emptyToDefault(retentionDayStr, "7"));
+
+            // Retrieve the existing lifecycle configuration and verify that it contains the expiration rule
+            // (ruleId: LogsExpirationRule, expirationInDays: logs_storage_retention).
+            BucketLifecycleConfiguration bucketLifecycleConfiguration = s3Client.getBucketLifecycleConfiguration(bucketName);
+            log.info("[lokiStorageRetention] [current lifecycle] [exist: {}]", ObjectUtil.isNotNull(bucketLifecycleConfiguration));
+
+            String ruleId = "LogsExpirationRule";
+            if (ObjectUtil.isNull(bucketLifecycleConfiguration)) {
+                log.info("[lokiStorageRetention] [add lifecycle configuration]");
+                this.setBucketLifecycleConfiguration(s3Client, bucketName, ruleId, retentionDay);
+            } else {
+                List<BucketLifecycleConfiguration.Rule> bucketLifecycleConfigurationRules = bucketLifecycleConfiguration.getRules();
+                bucketLifecycleConfigurationRules = Tool.CollUtil.defaultIfEmpty(bucketLifecycleConfigurationRules, Tool.ListUtil.empty());
+                BucketLifecycleConfiguration.Rule logsExpirationRule = bucketLifecycleConfigurationRules.stream().filter(rule -> StrUtil.equals(rule.getId(), ruleId)).findFirst().orElse(null);
+                log.info("[lokiStorageRetention] [filter rules by id] [bucket: {}] [ruleId: {}] [exist: {}]", bucketName, ruleId, ObjectUtil.isNotNull(logsExpirationRule));
+
+                if (ObjectUtil.isNull(logsExpirationRule)) {
+                    log.info("[lokiStorageRetention] [add lifecycle configuration]");
+                    this.setBucketLifecycleConfiguration(s3Client, bucketName, ruleId, retentionDay);
+                } else {
+                    // an expiration rule already exists; check whether it has changed
+                    boolean isUpdate = BooleanUtil.or(
+                            ObjectUtil.notEqual(BucketLifecycleConfiguration.ENABLED, logsExpirationRule.getStatus()),
+                            ObjectUtil.notEqual(retentionDay, logsExpirationRule.getExpirationInDays())
+                    );
+
+                    log.info("[lokiStorageRetention] [lifecycle configuration] [isUpdate: {}]", isUpdate);
+                    if (isUpdate) {
+                        log.info("[lokiStorageRetention] [update lifecycle configuration]");
+                        this.setBucketLifecycleConfiguration(s3Client, bucketName, ruleId, retentionDay);
+                    } else {
+                        log.info("[lokiStorageRetention] [lifecycle configuration is not updated] [bucket: {}]", bucketName);
+                    }
+                }
+            }
+
+            // query the existing lifecycle configuration
+            bucketLifecycleConfiguration = s3Client.getBucketLifecycleConfiguration(bucketName);
+            List<BucketLifecycleConfiguration.Rule> rules1 = bucketLifecycleConfiguration.getRules();
+            for (BucketLifecycleConfiguration.Rule l : rules1) {
+                LifecyclePrefixPredicate predicate = (LifecyclePrefixPredicate) l.getFilter().getPredicate();
+                log.info("[lokiStorageRetention] [current lifecycle configuration] [bucket: {}] [status: {}] [prefix: {}] [expiration days: {}]", bucketName, l.getStatus(), predicate != null ? predicate.getPrefix() : "", l.getExpirationInDays());
+            }
+        } else {
+            log.warn("[lokiStorageRetention] [bucket does not exist] [name: {}]", bucketName);
+        }
+
+        log.info("[lokiStorageRetention] [finished]");
+    }
+
+    /**
+     * set Bucket Lifecycle Configuration
+     *
+     * @param s3Client
+     * @param bucketName
+     * @param ruleId
+     * @param retentionDay
+     */
+    private void setBucketLifecycleConfiguration(AmazonS3 s3Client, String bucketName, String ruleId, Integer retentionDay) {
+        log.info("[setBucketLifecycleConfiguration] [bucket: {}] [ruleId: {}] [retentionDay: {}]", bucketName, ruleId, retentionDay);
+        // Create an expiration rule; the empty LifecycleFilter applies it to every object in the bucket.
+        BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule()
+                .withId(ruleId)
+                .withFilter(new LifecycleFilter())
+                .withExpirationInDays(retentionDay)
+                .withStatus(BucketLifecycleConfiguration.ENABLED);
+
+        // Add the rule to a new BucketLifecycleConfiguration.
+        BucketLifecycleConfiguration configuration = new BucketLifecycleConfiguration().withRules(Arrays.asList(rule));
+        // Save the configuration.
+        s3Client.setBucketLifecycleConfiguration(bucketName, configuration);
+    }
+
+}
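Note: JobConfig registers the new job as a durable JobDetail and schedules it through the existing createCronScheduleJob helper, whose body is not part of this diff. The cron expression comes from the nezha.lokiHistoryDataDelJobCron property and falls back to "0 0 0/2 * * ?" (every two hours) when the property is empty, so an operator could override it with something like nezha.lokiHistoryDataDelJobCron=0 0 3 * * ? for a daily 03:00 run (illustrative value only). The sketch below shows roughly what such a helper looks like in plain Quartz; the class and method names are assumptions, not code from this repository.

    import org.quartz.CronScheduleBuilder;
    import org.quartz.JobDetail;
    import org.quartz.Scheduler;
    import org.quartz.SchedulerException;
    import org.quartz.Trigger;
    import org.quartz.TriggerBuilder;

    // Hypothetical sketch: schedules an already-registered durable JobDetail with a cron trigger,
    // roughly the way JobConfig's createCronScheduleJob presumably does.
    public class CronScheduleSketch {

        public static void scheduleCronJob(Scheduler scheduler, String jobName, JobDetail jobDetail, String cron) throws SchedulerException {
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(jobName + "_TRIGGER")                   // trigger name derived from the job name (assumed convention)
                    .forJob(jobDetail)                                    // reference the durable JobDetail stored by the @Bean definition
                    .withSchedule(CronScheduleBuilder.cronSchedule(cron)) // e.g. "0 0 0/2 * * ?" = every two hours
                    .build();
            scheduler.scheduleJob(trigger);
        }
    }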

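The retention itself is delegated to the object store rather than done by deleting objects one by one: the job ensures the bucket carries a lifecycle rule named LogsExpirationRule whose expiration equals the configured retention days, and it only rewrites the rule when it is missing or its status/expiration has drifted, so the two-hourly runs are normally no-ops. Below is a minimal standalone sketch of the same lifecycle pattern against an S3-compatible endpoint such as MinIO, using the AWS SDK for Java v1 calls seen in the diff; the endpoint, credentials, bucket name and retention value are placeholders, not values from this project.

    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.client.builder.AwsClientBuilder;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
    import com.amazonaws.services.s3.model.lifecycle.LifecycleFilter;

    import java.util.Collections;

    public class LifecycleRetentionSketch {

        public static void main(String[] args) {
            // Placeholder endpoint and credentials -- replace with real values.
            // "us-east-1" is used as the signing region; MinIO accepts it by default.
            AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("minioadmin", "minioadmin")))
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:9000", "us-east-1"))
                    .build();

            String bucket = "loki-data";   // placeholder bucket name
            int retentionDays = 7;         // placeholder retention

            // An empty LifecycleFilter matches every object in the bucket, so the rule
            // expires (deletes) all objects older than retentionDays.
            BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule()
                    .withId("LogsExpirationRule")
                    .withFilter(new LifecycleFilter())
                    .withExpirationInDays(retentionDays)
                    .withStatus(BucketLifecycleConfiguration.ENABLED);

            // Overwrites any existing lifecycle configuration on the bucket.
            s3.setBucketLifecycleConfiguration(bucket, new BucketLifecycleConfiguration().withRules(Collections.singletonList(rule)));

            // Read the configuration back to confirm what is now in effect.
            BucketLifecycleConfiguration current = s3.getBucketLifecycleConfiguration(bucket);
            current.getRules().forEach(r ->
                    System.out.println(r.getId() + " -> " + r.getStatus() + ", expire after " + r.getExpirationInDays() + " days"));
        }
    }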