summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/nis/restful/RestBusinessCode.java10
-rw-r--r--src/main/java/com/nis/util/Constants.java8
-rw-r--r--src/main/java/com/nis/util/JedisClusterUtils.java361
-rw-r--r--src/main/java/com/nis/util/ServiceAndRDBIndexReal.java2
-rw-r--r--src/main/java/com/nis/web/controller/BaseRestController.java6
-rw-r--r--src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java170
-rw-r--r--src/main/java/com/nis/web/service/restful/ConfigSourcesService.java400
-rw-r--r--src/main/java/com/nis/web/task/SyncAllConfigTask.java286
-rw-r--r--src/main/resources/nis.properties8
9 files changed, 1223 insertions, 28 deletions
diff --git a/src/main/java/com/nis/restful/RestBusinessCode.java b/src/main/java/com/nis/restful/RestBusinessCode.java
index 31b33b2..6b8bdf5 100644
--- a/src/main/java/com/nis/restful/RestBusinessCode.java
+++ b/src/main/java/com/nis/restful/RestBusinessCode.java
@@ -30,6 +30,16 @@ public enum RestBusinessCode {
missing_args (1002,"缺少必要的参数信息"),
/**
+ * 已接收,响应成功
+ */
+ syncStatusSuccessed(2021000,"已接收,响应成功"),
+
+ /**
+ * 已接收,响应失败(原因可能正在忙,指令不规范等)
+ */
+ syncStatusFailed(2021010,"已接收,响应失败(原因可能正在忙,指令不规范等)"),
+
+ /**
* 操作行为错误,1-插入2-更新 3-删除4-查询 ,插入时选择了删除这种错误返回该异常代码
*/
op_action_error (4002000,"不正确的操作行为"),
diff --git a/src/main/java/com/nis/util/Constants.java b/src/main/java/com/nis/util/Constants.java
index 193ac00..a9c343d 100644
--- a/src/main/java/com/nis/util/Constants.java
+++ b/src/main/java/com/nis/util/Constants.java
@@ -223,9 +223,13 @@ public final class Constants {
/**
- * redis分布式锁超时时间,默认五分钟,3000秒
+ * redis分布式锁超时时间,默认五分钟,300秒
*/
- public static final Long REDISLOCKTIME=Configurations.getLongProperty("redisLockTime", 3000);
+ public static final Long REDISLOCKTIME=Configurations.getLongProperty("redisLockTime", 300);
+ /**
+ * 全量数据同步分布式锁时间,默认两个小时
+ */
+ public static final Long CONFIGSYNCLOCKTIME=Configurations.getLongProperty("configSyncLockTime", 7200);
/**
* 获取redis分布式锁失败后的尝试获取锁的次数,每次失败暂停一秒钟后再次尝试
*/
diff --git a/src/main/java/com/nis/util/JedisClusterUtils.java b/src/main/java/com/nis/util/JedisClusterUtils.java
new file mode 100644
index 0000000..e5d1044
--- /dev/null
+++ b/src/main/java/com/nis/util/JedisClusterUtils.java
@@ -0,0 +1,361 @@
+package com.nis.util;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.nis.restful.RestBusinessCode;
+import com.nis.restful.ServiceRuntimeException;
+import com.nis.web.service.SpringContextHolder;
+
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisException;
+
+/**
+ * @ClassName: JedisClusterUtils
+ * @Description: redis-cluster库工具类
+ * @author (rkg)
+ * @date 2018年12月01日 上午10:31:00
+ * @version V1.0
+ */
+public class JedisClusterUtils {
+ private static Logger logger = LoggerFactory.getLogger(JedisClusterUtils.class);
+
+ /**
+ * 获取缓存
+ *
+ * @param key 键
+ * @return 值
+ */
+ public static String get(String key) {
+ String value = null;
+ JedisCluster jedis = null;
+ try {
+ jedis = getResource();
+ if (jedis.exists(key)) {
+ value = jedis.get(key);
+ value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null;
+ logger.debug("get {} = {}", key, value);
+ }
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("从redis-cluster中获取" + key + "对应的值失败",
+ RestBusinessCode.KeyNotExistsInRedis.getValue());
+ } finally {
+ returnResource(jedis);
+ }
+ return value;
+ }
+
+ /**
+ * 获取缓存
+ *
+ * @param key 键
+ * @return 值
+ */
+ public static Object getObject(String key) {
+ Object value = null;
+ JedisCluster jedis = null;
+ try {
+ jedis = getResource();
+ if (jedis.exists(getBytesKey(key))) {
+ value = toObject(jedis.get(getBytesKey(key)));
+ logger.debug("getObject {} = {}", key, value);
+ }
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("从redis-cluster中获取" + key + "对应的值失败",
+ RestBusinessCode.KeyNotExistsInRedis.getValue());
+ } finally {
+ returnResource(jedis);
+ }
+ return value;
+ }
+
+ /**
+ * <b>可以作为获取唯一id的方法</b><br/>
+ * 将key对应的value加上指定的值,只有value可以转为数字时该方法才可用
+ *
+ * @param String key
+ * @param long number 要减去的值
+ * @return long 相加后的值
+ */
+ public static long incrBy(String key, long number) {
+ JedisCluster jedis = getResource();
+ long len = jedis.incrBy(key, number);
+ returnResource(jedis);
+ return len;
+ }
+
+ /**
+ * 设置缓存
+ *
+ * @param key 键
+ * @param value 值
+ * @param cacheSeconds 超时时间,0为不超时
+ * @return
+ */
+ public static String set(String key, String value, int cacheSeconds) {
+ String result = null;
+ JedisCluster jedis = null;
+ try {
+ jedis = getResource();
+ result = jedis.set(key, value);
+ if (cacheSeconds != 0) {
+ jedis.expire(key, cacheSeconds);
+ }
+ logger.debug("set {} = {}", key, value);
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("向redis-cluster中设置zset失败,key=" + key + ",value=" + value,
+ RestBusinessCode.ZsetFailed.getValue());
+ } finally {
+ returnResource(jedis);
+ }
+ return result;
+ }
+
+ /**
+ * 获取List缓存
+ *
+ * @param key 键
+ * @return 值
+ */
+ public static List<String> getList(String key) {
+ List<String> value = null;
+ JedisCluster jedis = null;
+ try {
+ jedis = getResource();
+ if (jedis.exists(key)) {
+ value = jedis.lrange(key, 0, -1);
+ logger.debug("getList {} = {}", key, value);
+ }
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("从redis-cluster中获取" + key + "对应的值失败",
+ RestBusinessCode.KeyNotExistsInRedis.getValue());
+ } finally {
+ returnResource(jedis);
+ }
+ return value;
+ }
+
+ /**
+ * 获取List缓存
+ *
+ * @param key 键
+ * @return 值
+ */
+ public static List<Object> getObjectList(String key) {
+ List<Object> value = null;
+ JedisCluster jedis = null;
+ try {
+ jedis = getResource();
+ if (jedis.exists(getBytesKey(key))) {
+ List<byte[]> list = jedis.lrange(getBytesKey(key), 0, -1);
+ value = Lists.newArrayList();
+ for (byte[] bs : list) {
+ value.add(toObject(bs));
+ }
+ logger.debug("getObjectList {} = {}", key, value);
+ }
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("从redis-cluster中获取" + key + "对应的值失败",
+ RestBusinessCode.KeyNotExistsInRedis.getValue());
+ } finally {
+ returnResource(jedis);
+ }
+ return value;
+ }
+
+ /**
+ * 缓存是否存在
+ *
+ * @param key 键
+ * @return
+ */
+ public static boolean exists(String key) {
+ boolean result = false;
+ JedisCluster jedis = null;
+ try {
+ jedis = getResource();
+ result = jedis.exists(key);
+ logger.debug("exists {}", key);
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("从redis-cluster中判断" + key + "是否存在失败",
+ RestBusinessCode.ExistsKeyFailed.getValue());
+ } finally {
+ returnResource(jedis);
+ }
+ return result;
+ }
+
+ /**
+ * 缓存是否存在
+ *
+ * @param key 键
+ * @return
+ */
+ public static boolean existsObject(String key) {
+ boolean result = false;
+ JedisCluster jedis = null;
+ try {
+ jedis = getResource();
+ result = jedis.exists(getBytesKey(key));
+ logger.debug("existsObject {}", key);
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("从redis-cluster中判断" + key + "是否存在失败",
+ RestBusinessCode.ExistsKeyFailed.getValue());
+ } finally {
+ returnResource(jedis);
+ }
+ return result;
+ }
+
+ /**
+ * 获取资源
+ *
+ * @return
+ * @throws JedisException
+ */
+ public static JedisCluster getResource() throws JedisException {
+ JedisCluster jedisCluster = SpringContextHolder.getBean(JedisCluster.class);
+ if (jedisCluster == null) {
+ throw new ServiceRuntimeException("无法获取redis-cluster连接,请联系管理员检查程序",
+ RestBusinessCode.CannotConnectionRedis.getValue());
+ }
+ return jedisCluster;
+
+ }
+
+ /**
+ * 释放资源
+ *
+ * @param jedis
+ * @param isBroken
+ */
+ public static void returnBrokenResource(JedisCluster jedis) {
+ if (jedis != null) {
+ try {
+ jedis.close();
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("释放redis-cluster连接失败", RestBusinessCode.CannotConnectionRedis.getValue());
+ }
+ }
+ }
+
+ /**
+ * 释放资源
+ *
+ * @param jedis
+ * @param isBroken
+ */
+ public static void returnResource(JedisCluster jedis) {
+ if (jedis != null) {
+ try {
+ jedis.close();
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("释放redis-cluster连接失败", RestBusinessCode.CannotConnectionRedis.getValue());
+ }
+ }
+ }
+
+ /**
+ * 获取byte[]类型Key
+ *
+ * @param key
+ * @return
+ */
+ public static byte[] getBytesKey(Object object) {
+ if (object instanceof String) {
+ return StringUtils.getBytes((String) object);
+ } else {
+ return ObjectUtils.serialize(object);
+ }
+ }
+
+ /**
+ * Object转换byte[]类型
+ *
+ * @param key
+ * @return
+ */
+ public static byte[] toBytes(Object object) {
+ return ObjectUtils.serialize(object);
+ }
+
+ /**
+ * byte[]型转换Object
+ *
+ * @param key
+ * @return
+ */
+ public static Object toObject(byte[] bytes) {
+ return ObjectUtils.unserialize(bytes);
+ }
+
+ // 设置成功返回的结果OK
+ private static final String LOCK_SUCCESS = "OK";
+ // NX -- Only set the key if it does not already exist. XX -- Only set the key
+ // if it already exist
+ private static final String SET_IF_NOT_EXIST = "NX";
+ // 失效单位秒(EX)还是毫秒(PX)
+ private static final String SET_WITH_EXPIRE_TIME = "EX";
+ private static final Long UNLOCK_SUCCESS = 1L;
+
+ /**
+ * 尝试获取分布式锁,如果没有key就set,有key就不操作
+ *
+ * @param requestId 请求标识(UUID.randomUUID().toString()),正常情况下是谁加的锁,谁去解锁不能a加的锁,b去解锁
+ * @return 是否获取成功
+ */
+ public static Boolean lock(String requestId) {
+ String key = "uiAndServiceConfigSyncLock";
+ String var1 = getResource().set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME,
+ Constants.REDISLOCKTIME);
+ if (LOCK_SUCCESS.equals(var1)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * 解锁操作
+ *
+ * @param value 客户端标识(requestId)
+ * @return
+ */
+ public static Boolean unLock(String value) {
+ String key = "uiAndServiceConfigSyncLock";
+ // 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】
+ String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end";
+ // 这个命令就是去执行lua脚本,KEYS的集合就是第二个参数,ARGV的集合就是第三参数【保证解锁的原子操作】
+ Object var2 = getResource().eval(luaScript, Collections.singletonList(key), Collections.singletonList(value));
+ if (UNLOCK_SUCCESS == var2) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * 重试机制
+ *
+ * @param value 客户端标识
+ * @return
+ */
+ public static Boolean lockRetry(String value) {
+ Boolean flag = false;
+ try {
+ for (int i = 0; i < Constants.REDISRETRYNUM; i++) {
+ flag = lock(value);
+ if (flag) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ logger.error("尝试获取redis分布式锁失败,失败原因:{}", ExceptionUtil.getExceptionMsg(e));
+ }
+ return flag;
+ }
+
+}
diff --git a/src/main/java/com/nis/util/ServiceAndRDBIndexReal.java b/src/main/java/com/nis/util/ServiceAndRDBIndexReal.java
index 64104ce..97b8269 100644
--- a/src/main/java/com/nis/util/ServiceAndRDBIndexReal.java
+++ b/src/main/java/com/nis/util/ServiceAndRDBIndexReal.java
@@ -337,7 +337,7 @@ public class ServiceAndRDBIndexReal {
return tableList.get(index);
} else {
logger.error("未从业务类型和表对应关系中,找到业务类型:{},配置类型:{}表名:{}对应的真实表名", service, type, tableName);
- throw new ServiceRuntimeException("无法从applicationConfig-rule.properties配置文件中,找到回调类配置service为"
+ throw new ServiceRuntimeException("无法从applicationConfig-rule.properties配置文件中,找到service为"
+ service + ",配置类型:" + type + "对应的真实表名", RestBusinessCode.NotFoundTableName.getValue());
}
diff --git a/src/main/java/com/nis/web/controller/BaseRestController.java b/src/main/java/com/nis/web/controller/BaseRestController.java
index 850857c..da284dc 100644
--- a/src/main/java/com/nis/web/controller/BaseRestController.java
+++ b/src/main/java/com/nis/web/controller/BaseRestController.java
@@ -211,7 +211,11 @@ public class BaseRestController {
restResult.setBusinessCode(RestBusinessCode.update_success);
} else if (requestMethod.equals(RequestMethod.POST.name())) {
restResult.setStatus(HttpStatus.CREATED);
- restResult.setBusinessCode(RestBusinessCode.add_success);
+ if (thread.getBusinessCode()!=0) {
+ restResult.setBusinessCode(RestBusinessCode.valueOf(thread.getBusinessCode()));
+ }else{
+ restResult.setBusinessCode(RestBusinessCode.add_success);
+ }
} else if (requestMethod.equals(RequestMethod.PATCH.name())) {
restResult.setStatus(HttpStatus.CREATED);
restResult.setBusinessCode(RestBusinessCode.update_success);
diff --git a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java
index 83ae455..a29f65b 100644
--- a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java
+++ b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java
@@ -10,19 +10,19 @@ import java.util.UUID;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import com.zdjizhi.utils.StringUtil;
+import net.sf.json.JSONObject;
+
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import com.nis.domain.restful.ConfigCompile;
-import com.nis.domain.restful.ConfigCompileStartStop;
import com.nis.domain.restful.ConfigSource;
-import com.nis.domain.restful.ConfigSourceStartStop;
import com.nis.domain.restful.FileDesc;
import com.nis.domain.restful.GroupReuse;
import com.nis.domain.restful.GroupReuseSource;
@@ -30,7 +30,6 @@ import com.nis.restful.RestBusinessCode;
import com.nis.restful.RestServiceException;
import com.nis.restful.ServiceRuntimeException;
import com.nis.util.Constants;
-import com.nis.util.ExceptionUtil;
import com.nis.util.FileUtils;
import com.nis.util.MinioUtil;
import com.nis.util.StringUtils;
@@ -44,9 +43,6 @@ import com.nis.web.service.restful.ConfigSourcesService;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
-import com.zdjizhi.utils.StringUtil;
-
-import net.sf.json.JSONObject;
/**
* @ClassName: ConfigSourcesController
@@ -141,8 +137,8 @@ public class ConfigSourcesController extends BaseRestController {
configSourcesService.updateConfigSources(thread, start, configSourceStartStop.getConfigCompileStartStopList(),
configSourceStartStop.getOpTime(), sb,true);
-
-
+
+
}
} catch (Exception e) {
thread.setExceptionInfo("Maat 规则停启用异常:" + ExceptionUtil.getExceptionMsg(e));
@@ -621,6 +617,7 @@ public class ConfigSourcesController extends BaseRestController {
"MAAT规则分组复用域配置删除成功" + sb.toString(), Constants.IS_DEBUG ? groupReuseSource : null);
}
+
@RequestMapping(value = "/cfg/v1/getAllKVByCompileId", method = RequestMethod.GET)
@ApiOperation(value = "根据配置id获取对应的编译,组,域等信息", httpMethod = "GET", response = Map.class, notes = "根据配置id获取对应的编译,组,域等信息")
@ApiParam(value = "配置id", name = "getAllKVByCompileId", required = true)
@@ -642,6 +639,149 @@ public class ConfigSourcesController extends BaseRestController {
"根据配置id获取对应的编译,组,域等信息成功", map);
}
+ @RequestMapping(value = "/cfg_batch/v1/configSources", method = RequestMethod.POST, produces = org.springframework.http.MediaType.APPLICATION_JSON_VALUE)
+ @ApiOperation(value = "全量同步配置接收接口", httpMethod = "POST", response = Map.class, notes = "接收全量同步配置")
+ public Map receiveConfigSources(@RequestBody String jsonString, HttpServletRequest request,
+ HttpServletResponse response) {
+ long start = System.currentTimeMillis();
+ AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null);
+ thread.setContent("全量同步不记录请求内容");
+ String allConfigSyncStatus = configSourcesService.getAllConfigSyncStatus();
+ if (allConfigSyncStatus != null) {
+ if (!allConfigSyncStatus.equals("0")) {
+ throw new RestServiceException("后台同步指令为" + allConfigSyncStatus + ",请先下发配置同步指令,或等待后台数据同步进程完成后再次下发配置同步指令",
+ RestBusinessCode.config_integrity_error.getValue());
+ }
+ } else {
+ throw new RestServiceException("没有获取到同步请求标志,请先下发配置同步指令",
+ RestBusinessCode.config_integrity_error.getValue());
+ }
+ try {
+ String configType = request.getHeader("Config-Type");
+ String serviceId = request.getHeader("Service-Id");
+ String configTable = request.getHeader("Config-Table");
+ String lastCompletedTag = request.getHeader("Last-Completed-Tag");
+// logger.info(new Date() + "-----------接收到json格式数据:" + jsonString + "-------:");
+ if (StringUtil.isEmpty(serviceId)) {
+ logger.error("未在请求头中获取到serviceId");
+ throw new RestServiceException(
+ RestBusinessCode.config_integrity_error.getErrorReason() + "," + "未在请求头中获取到serviceId",
+ RestBusinessCode.config_integrity_error.getValue());
+ }
+ if (StringUtil.isEmpty(configType)) {
+ logger.error("未在请求头中获取到Config-Type");
+ throw new RestServiceException(
+ RestBusinessCode.config_integrity_error.getErrorReason() + "," + "未在请求头中获取到Config-Type",
+ RestBusinessCode.config_integrity_error.getValue());
+ } else {
+ if (!("1".equals(configType) || "2".equals(configType))) {
+ logger.error("Config-Type的值只能是1(maat)和2(回调)");
+ throw new RestServiceException(
+ RestBusinessCode.config_integrity_error.getErrorReason() + ","
+ + "Config-Type的值只能是1(maat)和2(回调)",
+ RestBusinessCode.config_integrity_error.getValue());
+
+ }
+ }
+ logger.info("-----------开始存储到json格式数据------->>configType:" + configType + ",serviceId:" + serviceId
+ + ",configTable:" + configTable + ",lastCompletedTag:" + lastCompletedTag);
+
+ if (jsonString != null && !jsonString.trim().equals("{}")) {// 如果最后的service没有配置,不论是maat类还是回调类配置,都会传{}+lastCompletedTag(finished)来结束数据传输
+ String key = null;
+ if ("1".equals(configType)) {
+ key = "MAAT";
+ } else {
+ key = "UNMAAT";
+ }
+ key = key + "-" + serviceId + "-" + UUID.randomUUID();
+ configSourcesService.setRedisClusterKey(key, jsonString);
+ configSourcesService.setAllServiceKey(key);
+ }
+ if (!StringUtil.isEmpty(lastCompletedTag) && lastCompletedTag.trim().toLowerCase().equals("finished")) {
+ // 设置配置同步状态为接收配置完成
+ configSourcesService.setAllConfigSyncStatus("1");
+ logger.info("接收全量同步配置:FINISHED");
+ }
+ } catch (Exception e) {
+ // TODO: handle exception
+ logger.error(e.getMessage());
+ throw new RestServiceException(
+ RestBusinessCode.config_integrity_error.getErrorReason() + "," + e.getMessage(),
+ RestBusinessCode.config_integrity_error.getValue());
+ }
+
+ return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "已接收全量同步配置信息",
+ Constants.IS_DEBUG ? jsonString : null);
+ }
+
+ @RequestMapping(value = "/cfg_batch/v1/status", method = RequestMethod.POST, produces = org.springframework.http.MediaType.APPLICATION_JSON_VALUE)
+ @ApiOperation(value = "配置全量同步指令下发接口", httpMethod = "POST", response = Map.class, notes = "下发配置同步指令")
+ public Map acceptStatus(@RequestBody String jsonString, HttpServletRequest request, HttpServletResponse response) {
+ long start = System.currentTimeMillis();
+ AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null);
+
+ JSONObject obj = JSONObject.fromObject(jsonString);
+ if (!StringUtil.isEmpty(obj.get("syncStatus"))) {
+ String allConfigSyncStatus = configSourcesService.getAllConfigSyncStatus();
+ if ("1".equals(obj.get("syncStatus").toString())) {
+ if (allConfigSyncStatus != null) {
+ if (allConfigSyncStatus.equals("0") || allConfigSyncStatus.equals("1")
+ || allConfigSyncStatus.equals("2")) {
+ throw new RestServiceException("已经下发过配置同步指令,请等待后台数据同步进程完成后再次下发配置同步指令",
+ RestBusinessCode.config_integrity_error.getValue());
+ }
+ }
+ logger.info("-----------配置同步指令下发:" + new Date());
+ // 设置配置同步状态为开始
+ //在下次开始同步之前把上次记录的key删除
+ if (JedisClusterUtils.exists("allConfigSyncKey")) {
+ JedisClusterUtils.getResource().del("allConfigSyncKey");
+ }
+ configSourcesService.setAllConfigSyncStatus("0");
+ } else if ("0".equals(obj.get("syncStatus").toString())) {// 取消同步指令
+ if (allConfigSyncStatus != null
+ && (allConfigSyncStatus.equals("0") || allConfigSyncStatus.equals("1"))) {// 只有在没有完全接收配置之前可以取消,否则不允许取消,因为接收完配置之后会把redis清空,所以这个时候不允许取消了
+ // 设置配置同步状态为完成,并把之前记录的key删除
+ configSourcesService.setAllConfigSyncStatus("3");
+ if (JedisClusterUtils.exists("allConfigSyncKey")) {
+ JedisClusterUtils.getResource().del("allConfigSyncKey");
+ }
+ } else {
+ throw new RestServiceException(
+ "配置同步指令状态为" + allConfigSyncStatus + ",不可执行取消操作了请等待后台数据同步进程完成后再次下发配置同步指令",
+ RestBusinessCode.config_integrity_error.getValue());
+ }
+ }
+ } else {
+ logger.error("未获取到同步状态码");
+ thread.setBusinessCode(RestBusinessCode.syncStatusFailed.getValue());
+ throw new RestServiceException("未获取到同步状态码", RestBusinessCode.syncStatusFailed.getValue());
+ }
+ thread.setBusinessCode(RestBusinessCode.syncStatusSuccessed.getValue());
+ return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "配置同步指令下发成功",
+ Constants.IS_DEBUG ? jsonString : null);
+ }
+
+ @RequestMapping(value = "/cfg_batch/v1/status", method = RequestMethod.GET)
+ @ApiOperation(value = "获取全量同步状态服务接口", httpMethod = "GET", response = Map.class, notes = "获取全量同步状态服务")
+ public Map getSyncStatus(HttpServletRequest request, HttpServletResponse response) {
+ long start = System.currentTimeMillis();
+ AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_GET, request, null);
+ List<JSONObject> list = new ArrayList<JSONObject>();
+ JSONObject obj = new JSONObject();
+ obj.put("service", "ntc");
+ String allConfigSyncStatus = configSourcesService.getAllConfigSyncStatus();
+ if (allConfigSyncStatus == null || allConfigSyncStatus.trim().equals("")) {
+ obj.put("status", "-1");
+ } else {
+ obj.put("status", configSourcesService.getAllConfigSyncStatus());
+ }
+ obj.put("opTime", "2018-11-23 08:31:27");
+ list.add(obj);
+ return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取全量同步状态成功",
+ list);
+ }
+
private void validateGroupReuseSource(AuditLogThread thread, long start, GroupReuseSource groupReuseSource) {
String errorInfo = "";
@@ -676,4 +816,16 @@ public class ConfigSourcesController extends BaseRestController {
return false;
}
+ @RequestMapping(value = "/cfg/v1/getConfigCount", method = RequestMethod.GET)
+ @ApiOperation(value = "获取有效无效的配置个数", httpMethod = "GET", response = Map.class, notes = "获取有效无效的配置个数")
+ public Map getConfigCount(HttpServletRequest request, HttpServletResponse response) {
+ long start = System.currentTimeMillis();
+ AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_GET, request, null);
+ Map<String, Integer> allConfigByScan = configSourcesService.getAllConfigByScan();
+ allConfigByScan.putAll(configSourcesService.getAllConfig());
+ return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取有效无效的配置个数成功",
+// configSourcesService.getAllConfig());
+ allConfigByScan);
+ }
+
}
diff --git a/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java b/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java
index 1abecf9..d4a3045 100644
--- a/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java
+++ b/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java
@@ -11,9 +11,14 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import net.sf.json.JSONObject;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -22,7 +27,6 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.nis.domain.restful.CommonSourceFieldCfg;
import com.nis.domain.restful.ConfigCompile;
-import com.nis.domain.restful.ConfigCompileStartStop;
import com.nis.domain.restful.ConfigGroupRelation;
import com.nis.domain.restful.DigestRegion;
import com.nis.domain.restful.GroupReuse;
@@ -41,11 +45,9 @@ import com.nis.util.GroupReuseVal;
import com.nis.util.JsonMapper;
import com.nis.util.ReadCommSourceXmlUtil;
import com.nis.util.ServiceAndRDBIndexReal;
+import com.zdjizhi.utils.StringUtil;
import com.nis.web.service.AuditLogThread;
import com.nis.web.service.BaseService;
-import com.zdjizhi.utils.StringUtil;
-
-import net.sf.json.JSONObject;
/**
* @ClassName: ControlService
@@ -122,11 +124,14 @@ public class ConfigSourcesService extends BaseService {
*/
public void saveMaatConfig(AuditLogThread thread, long start, List<ConfigCompile> configCompileList,
StringBuffer sb) throws Exception {
-
+
long currentTimeMillis = System.currentTimeMillis();
Map<Integer, List<MaatConfig>> maatMap = new HashMap<Integer, List<MaatConfig>>();
Map<Integer, List<MaatConfig>> configMap = new HashMap<Integer, List<MaatConfig>>();
+ if (configCompileList != null && configCompileList.size() > Constants.MAX_LIST_SIZE) {
+ thread.setSaveContentFlag(false);
+ }
for (ConfigCompile configCompile : configCompileList) {
Integer service = Integer.valueOf(configCompile.getService().toString());
MaatConfig maatConfig = new MaatConfig();
@@ -209,6 +214,7 @@ public class ConfigSourcesService extends BaseService {
}
}
maatConfig.setIpClientRangeMapList(dstMaplList);
+
if (maatMap.containsKey(service)) {
maatMap.get(service).add(maatConfig);
} else {
@@ -227,7 +233,6 @@ public class ConfigSourcesService extends BaseService {
List<Integer> dbIndexList = ServiceAndRDBIndexReal.getRedisDBByService(service);
if (!StringUtil.isEmpty(dbIndexList) && dbIndexList.size() > 0) {
for (Integer dbIndex : dbIndexList) {
-
// 分发到阀门有些业务需要添加编译属性到域配置
List<MaatConfig> newMaatConfigList = new ArrayList<MaatConfig>();
newMaatConfigList.addAll(maatMap.get(service));
@@ -255,7 +260,7 @@ public class ConfigSourcesService extends BaseService {
}
}
logger.info("---------------调用Redis maat配置新增接口---------------------");
-
+
long end = System.currentTimeMillis();
logger.warn("执行ConfigSourcesService.saveMaatConfig用时{}毫秒",end-currentTimeMillis);
configRedisService.saveMaatConfig(configMap);
@@ -263,7 +268,7 @@ public class ConfigSourcesService extends BaseService {
/**
* 添加额外属性到阀门或者webfocus
- *
+ *
* @param maatToValueMap
*/
private void addFieldToValveOrWebFocus(Integer service, List<MaatConfig> newMaatConfigList,
@@ -315,6 +320,184 @@ public class ConfigSourcesService extends BaseService {
}
}
}
+ /**
+ * @Description:
+ * @author(zdx) @date 2018年12月3日 下午6:48:32
+ * @param configCompileList
+ * @throws Exception
+ */
+
+ public void saveMaatConfig(List<ConfigCompile> configCompileList) throws Exception {
+ Map<Integer, List<MaatConfig>> maatMap = new HashMap<Integer, List<MaatConfig>>();
+ Map<Integer, List<MaatConfig>> configMap = new HashMap<Integer, List<MaatConfig>>();
+
+ for (ConfigCompile configCompile : configCompileList) {
+ Integer service = Integer.valueOf(configCompile.getService().toString());
+ MaatConfig maatConfig = new MaatConfig();
+
+ maatConfig.setService(service);
+ // 编译
+ maatConfig.setCompileMap(convertObjectToMap(configCompile, ConfigCompile.class));
+ // 分组
+ List<Map<String, String>> dstMaplList = null;
+ if (!StringUtil.isEmpty(configCompile.getGroupRelationList())) {
+ dstMaplList = new ArrayList<Map<String, String>>();
+ for (ConfigGroupRelation group : configCompile.getGroupRelationList()) {
+ dstMaplList.add(convertObjectToMap(group, ConfigGroupRelation.class));
+ }
+ }
+ maatConfig.setGroupMapList(dstMaplList);
+ // 字符串域
+ dstMaplList = null;
+ List<Map<String, String>> strongMapList = null;
+ if (!StringUtil.isEmpty(configCompile.getStrRegionList())) {
+ dstMaplList = new ArrayList<Map<String, String>>();
+ for (StrRegion region : configCompile.getStrRegionList()) {
+ if (StringUtil.isEmpty(region.getDistrict())) {
+ dstMaplList.add(convertObjectToMap(region, StrRegion.class));
+ } else {
+ if (StringUtil.isEmpty(strongMapList)) {
+ strongMapList = new ArrayList<Map<String, String>>();
+ }
+ strongMapList.add(convertObjectToMap(region, StrRegion.class));
+ }
+ }
+ }
+ maatConfig.setStrRegionMapList(dstMaplList);
+ // 增强字符串域
+ if (!StringUtil.isEmpty(strongMapList) && strongMapList.size() > 0) {
+ maatConfig.setStrStrRegionMapList((strongMapList));
+ }
+ // 数值域
+ dstMaplList = null;
+ if (!StringUtil.isEmpty(configCompile.getNumRegionList())) {
+ dstMaplList = new ArrayList<Map<String, String>>();
+ for (NumRegion region : configCompile.getNumRegionList()) {
+ dstMaplList.add(convertObjectToMap(region, NumRegion.class));
+ }
+ }
+ maatConfig.setNumRegionMapList(dstMaplList);
+
+ // Ip域
+ dstMaplList = null;
+ if (!StringUtil.isEmpty(configCompile.getIpRegionList())) {
+ dstMaplList = new ArrayList<Map<String, String>>();
+ for (IpRegion region : configCompile.getIpRegionList()) {
+ dstMaplList.add(convertObjectToMap(region, IpRegion.class));
+ }
+ }
+ maatConfig.setIpRegionMapList(dstMaplList);
+
+ // 摘要类域
+ dstMaplList = null;
+ if (!StringUtil.isEmpty(configCompile.getDigestRegionList())) {
+ dstMaplList = new ArrayList<Map<String, String>>();
+ for (DigestRegion region : configCompile.getDigestRegionList()) {
+ dstMaplList.add(convertObjectToMap(region, DigestRegion.class));
+ }
+ }
+
+ maatConfig.setFileDigestRegionMapList(dstMaplList);
+
+ // 文本相似性域
+ // dstMaplList = null;
+ // maatConfig.setFileLikeRegionMapList(dstMaplList);
+
+ // 生效范围IP域
+ dstMaplList = null;
+ if (!StringUtil.isEmpty(configCompile.getIpClientRangeList())) {
+ dstMaplList = new ArrayList<Map<String, String>>();
+ for (IpRegion region : configCompile.getIpClientRangeList()) {
+ dstMaplList.add(convertObjectToMap(region, IpRegion.class));
+ }
+ }
+ maatConfig.setIpClientRangeMapList(dstMaplList);
+
+ if (maatMap.containsKey(service)) {
+ maatMap.get(service).add(maatConfig);
+ } else {
+ List<MaatConfig> maatCfgList = new ArrayList<MaatConfig>();
+ maatCfgList.add(maatConfig);
+ maatMap.put(service, maatCfgList);
+
+ }
+ }
+
+ // 调用接口入redis
+
+ Iterator serviceIterator = maatMap.keySet().iterator();
+ while (serviceIterator.hasNext()) {
+ Integer service = Integer.valueOf(serviceIterator.next().toString());
+ List<Integer> dbIndexList = ServiceAndRDBIndexReal.getRedisDBByService(service);
+ if (!StringUtil.isEmpty(dbIndexList) && dbIndexList.size() > 0) {
+ for (Integer dbIndex : dbIndexList) {
+ // 分发到阀门有些业务需要添加编译属性到域配置
+ List<MaatConfig> newMaatConfigList = new ArrayList<MaatConfig>();
+ newMaatConfigList.addAll(maatMap.get(service));
+ if (dbIndex.intValue() == ServiceAndRDBIndexReal.getValveDBIndex().intValue()) {
+ Map<Integer, Map<String, String[]>> maatToValueMap = ServiceAndRDBIndexReal.getMaatToValveMap();
+ if (maatToValueMap.containsKey(service)) {
+
+ Map<String, String[]> regionAndFiledMap = maatToValueMap.get(service);
+ for (int i = 0; i < newMaatConfigList.size(); i++) {
+ MaatConfig maatConfig = newMaatConfigList.get(i);
+ MaatConfig newMaatConfig = (MaatConfig) JsonMapper
+ .fromJsonString(JsonMapper.toJsonString(maatConfig), MaatConfig.class);
+ Iterator iterator = regionAndFiledMap.keySet().iterator();
+ while (iterator.hasNext()) {
+ String regionName = iterator.next().toString();
+ PropertyDescriptor pd;
+ try {
+ pd = new PropertyDescriptor(regionName + "MapList", MaatConfig.class);
+ Method method = pd.getReadMethod();
+ Object object = method.invoke(newMaatConfig);
+
+ if (object != null) {
+
+ List<Map<String, String>> listMaps = new ArrayList<Map<String, String>>();
+ listMaps.addAll((List<Map<String, String>>) object);
+ String[] fields = regionAndFiledMap.get(regionName);
+ for (String fieldName : fields) {
+ String value = newMaatConfig.getCompileMap()
+ .get(fieldName.toLowerCase());
+ if (!StringUtil.isEmpty(value)) {
+ for (Map<String, String> map : listMaps) {
+ map.put(fieldName.toLowerCase(), value);
+ }
+ }
+ }
+ method = pd.getWriteMethod();
+ method.invoke(newMaatConfig, listMaps);
+ }
+ newMaatConfigList.set(i, newMaatConfig);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ throw new RestServiceException("未找到域列表,请检查配置文件中域类型是否正确!:" + e.getMessage(),
+ RestBusinessCode.service_runtime_error.getValue());
+ }
+
+ }
+ }
+ }
+ }
+
+ if (configMap.containsKey(dbIndex)) {
+ configMap.get(dbIndex).addAll(newMaatConfigList);
+ } else {
+ List<MaatConfig> list = new ArrayList<MaatConfig>();
+ list.addAll(newMaatConfigList);
+ configMap.put(dbIndex, list);
+ }
+
+ }
+ } else {
+ throw new ServiceRuntimeException("service为"+service+"的业务写入数据库序号映射关系不存在",
+ RestBusinessCode.ServiceNoFoundDBIndex.getValue());
+ }
+ }
+ logger.info("---------------调用Redis maat配置新增接口---------------------");
+ configRedisService.saveMaatConfig(configMap);
+ }
private Map<String, String> convertObjectToMap(Object obj, Class clazz) throws Exception {
Map<String, String> dstMap = new HashMap<String, String>();
@@ -346,6 +529,9 @@ public class ConfigSourcesService extends BaseService {
Map<String, String> validIdMap = new HashMap<String, String>();
Map<Integer, List<Long>> compileMap = new HashMap<Integer, List<Long>>();
+ if (compileList != null && compileList.size() > Constants.MAX_LIST_SIZE) {
+ thread.setSaveContentFlag(false);
+ }
if (null != compileList && compileList.size() > 0) {
for (ConfigCompile config : compileList) {
checkCompileOptForUpdate(config, isConfigStartStop);
@@ -353,6 +539,7 @@ public class ConfigSourcesService extends BaseService {
if (config.getOpTime() == null) {
config.setOpTime(opTime);
}
+
// compileAllList.add(config);
if (compileMap.containsKey(config.getService())) {
compileMap.get(config.getService()).add(config.getCompileId());
@@ -510,7 +697,7 @@ public class ConfigSourcesService extends BaseService {
maatTableName.substring(maatTableName.lastIndexOf("_") + 1));
dstStr = dstPath.replace("{fileName}", dstStr.substring(dstStr.lastIndexOf("/") + 1));
} else if ("file_id".equals(commonSourceFieldCfg.getDstName())) {
- // dstStr = dstStr.substring(dstStr.indexOf("group"));
+ //dstStr = dstStr.substring(dstStr.indexOf("group"));
}
}
switch (commonSourceFieldCfg.getFieldType()) {
@@ -637,6 +824,98 @@ public class ConfigSourcesService extends BaseService {
configRedisService.saveUnMaatConfig(configMap);
}
+
+ public void saveCommonSources(String jsonString) throws Exception {
+ JsonArray jsonObjectList = null;
+ try {
+ jsonObjectList = new JsonParser().parse(jsonString).getAsJsonArray();
+ } catch (Exception e) {
+ // TODO: handle exception
+ throw new RestServiceException(RestBusinessCode.CBParamFormateError.getErrorReason() + "," + e.getMessage(),
+ RestBusinessCode.CBParamFormateError.getValue());
+ }
+ Map<Integer, List<Map<String, String>>> dstMaps = new HashMap<Integer, List<Map<String, String>>>();
+ for (int i = 0; i < jsonObjectList.size(); i++) {
+ JsonObject jsonObj = (JsonObject) jsonObjectList.get(i);
+ Map<String, Object> srcMap = JSONObject.fromObject(JSONObject.fromObject((jsonObj.toString())));
+ if (srcMap.containsKey("service")) {
+ Map<String, String> dstMap = new HashMap<String, String>();
+ List<CommonSourceFieldCfg> commonSourceFieldCfgList = ReadCommSourceXmlUtil
+ .getCommonSourceCfgByService(srcMap.get("service").toString().trim());
+
+ // 获取IP类型
+ Integer ipType = null;
+ String ipTypeName = "";
+
+ for (CommonSourceFieldCfg commonSourceFieldCfg : commonSourceFieldCfgList) {
+ if (commonSourceFieldCfg.getDstName().equals("addr_type")) {
+ String dstVal = srcMap.get(commonSourceFieldCfg.getSrcName()).toString();
+ ipType = Integer.parseInt(dstVal);
+ }
+ }
+ if (ipType == null) {
+ ipType = 4;
+ }
+ for (CommonSourceFieldCfg commonSourceFieldCfg : commonSourceFieldCfgList) {
+ // 字段类型 String Number Date Ip Port
+ String dstStr = StringUtil.isEmpty(srcMap.get(commonSourceFieldCfg.getSrcName()))
+ ? commonSourceFieldCfg.getDefaultVal()
+ : srcMap.get(commonSourceFieldCfg.getSrcName()).toString();
+ if (!StringUtil.isEmpty(dstStr) && dstStr.startsWith("[") && dstStr.endsWith("]")) {
+ dstStr = srcMap.get(dstStr.substring(1, dstStr.length() - 1)).toString();
+ }
+
+ if ("dstFile".equals(commonSourceFieldCfg.getSrcName())) {
+ if ("dst_file".equals(commonSourceFieldCfg.getDstName())) {
+ String maatTableName = ServiceAndRDBIndexReal
+ .getUnMaatTableName(Integer.valueOf(srcMap.get("service").toString().trim()));
+ String dstPath = Constants.MM_SAMPLE_DST_PATH.replace("{tableType}",
+ maatTableName.substring(maatTableName.lastIndexOf("_") + 1));
+ dstStr = dstPath.replace("{fileName}", dstStr.substring(dstStr.lastIndexOf("/") + 1));
+ } else if ("file_id".equals(commonSourceFieldCfg.getDstName())) {
+ // dstStr = dstStr.substring(dstStr.indexOf("group"));
+ }
+ }
+ dstMap.put(commonSourceFieldCfg.getDstName(), dstStr);
+ }
+ if (StringUtil.isEmpty(dstMaps.get(Integer.valueOf(srcMap.get("service").toString())))) {
+ List<Map<String, String>> list = new ArrayList<Map<String, String>>();
+ list.add(dstMap);
+ dstMaps.put(Integer.valueOf(srcMap.get("service").toString()), list);
+ } else {
+ List<Map<String, String>> list = dstMaps.get(Integer.valueOf(srcMap.get("service").toString()));
+ list.add(dstMap);
+
+ }
+ }
+ }
+ logger.info("------------------调用非maat配置新增接口-------------------");
+ // 按service分库
+ Map<Integer, List<Map<String, String>>> configMap = new HashMap<Integer, List<Map<String, String>>>();
+ Iterator serviceIterator = dstMaps.keySet().iterator();
+ while (serviceIterator.hasNext()) {
+ Integer service = Integer.valueOf(serviceIterator.next().toString());
+ List<Integer> dbIndexList = ServiceAndRDBIndexReal.getRedisDBByService(service);
+ if (!StringUtil.isEmpty(dbIndexList) && dbIndexList.size() > 0) {
+ for (Integer dbIndex : dbIndexList) {
+ if (configMap.containsKey(dbIndex)) {
+ configMap.get(dbIndex).addAll(dstMaps.get(service));
+ } else {
+ List<Map<String, String>> list = new ArrayList<Map<String, String>>();
+ list.addAll(dstMaps.get(service));
+ configMap.put(dbIndex, list);
+ }
+ }
+ } else {
+ throw new ServiceRuntimeException("service为"+service+"的业务写入数据库序号映射关系不存在",
+ RestBusinessCode.ServiceNoFoundDBIndex.getValue());
+ }
+
+ }
+ configRedisService.saveUnMaatConfig(configMap);
+ }
+
+
/**
*
* @Description:回调类配置状态更新(停/启用)
@@ -659,8 +938,8 @@ public class ConfigSourcesService extends BaseService {
throw new RestServiceException(RestBusinessCode.CBParamFormateError.getErrorReason() + "," + e.getMessage(),
RestBusinessCode.CBParamFormateError.getValue());
}
- if (jsonObjectList != null && jsonObjectList.size() > Constants.MAX_LIST_SIZE) {
- thread.setContent("批量提交数量超过" + Constants.MAX_LIST_SIZE + "条,不记录请求内容");
+ if (jsonObjectList!=null&&jsonObjectList.size()>Constants.MAX_LIST_SIZE) {
+ thread.setSaveContentFlag(false);
}
// <service,cfgIdList>
Map<Integer, List<Long>> cfgMap = new HashMap<Integer, List<Long>>();
@@ -893,11 +1172,18 @@ public class ConfigSourcesService extends BaseService {
} else {
maatConfig.getIpRegionMapList().addAll(dstMapList);
}
-
- // maatConfig.setService(groupReuse.getService());
+
+ if ((maatConfig.getStrRegionMapList()!=null&&maatConfig.getStrRegionMapList().size()>Constants.MAX_LIST_SIZE)
+ ||(maatConfig.getStrStrRegionMapList()!=null&&maatConfig.getStrStrRegionMapList().size()>Constants.MAX_LIST_SIZE)
+ ||(maatConfig.getIpRegionMapList()!=null&&maatConfig.getIpRegionMapList().size()>Constants.MAX_LIST_SIZE)
+ ||(maatConfig.getNumRegionMapList()!=null&&maatConfig.getNumRegionMapList().size()>Constants.MAX_LIST_SIZE)) {
+ thread.setSaveContentFlag(false);
+ }
+ //maatConfig.setService(groupReuse.getService());
list.add(maatConfig);
}
-
+
+
// 调用接口入redis
logger.info("---------------调用Redis 分组复用配置新增接口---------------------");
configRedisService.saveGroupReuseConfig(list);
@@ -1048,4 +1334,90 @@ public class ConfigSourcesService extends BaseService {
RestBusinessCode.ReuseRegionIsNull.getValue());
}
}
+
+ /**
+ * 设置配置全量同步状态
+ *
+ * @param value
+ */
+ public void setAllConfigSyncStatus(String value) {
+ JedisClusterUtils.set("allConfigSyncStatus", value, Constants.CONFIGSYNCLOCKTIME.intValue());
+ }
+
+ /**
+ * 获取配置全量同步状态
+ *
+ * @return
+ */
+ public String getAllConfigSyncStatus() {
+ return JedisClusterUtils.get("allConfigSyncStatus");
+ }
+
+ /**
+ * 将界面发过来的数据存储到rediscluster中
+ *
+ * @param key
+ * @param value
+ */
+ public void setRedisClusterKey(String key, String value) {
+ JedisClusterUtils.set(key, value, 86400);// 24小时超时
+ }
+
+ /**
+ * 将所有业务的配置key记录下来,方便读取
+ *
+ * @param value
+ */
+ public void setAllServiceKey(String value) {
+ JedisCluster resource = JedisClusterUtils.getResource();
+ resource.append("allConfigSyncKey", value + ";");
+ }
+
+ public Map<String, Integer> getAllConfig() {
+ Jedis resource = JedisUtils.getResource(0);
+ Set<String> effectiveSet = new HashSet<>();
+ Set<String> obsoleteSet = new HashSet<>();
+ for (int i = 2; i < 6; i++) {
+ resource.select(i);
+ effectiveSet.addAll(resource.keys("EFFECTIVE_RULE:*_COMPILE*"));
+ obsoleteSet.addAll(resource.keys("OBSOLETE_RULE:*_COMPILE*"));
+ }
+ Map<String, Integer> map = new HashMap<>();
+ map.put("effectiveMaatKeys", effectiveSet.size());
+ map.put("obsoleteMaatKeys", obsoleteSet.size());
+ JedisUtils.returnBrokenResource(resource);
+ return map;
+ }
+
+ public Map<String, Integer> getAllConfigByScan() {
+ Jedis resource = JedisUtils.getResource(0);
+ Set<String> effectiveSet = new HashSet<>();
+ Set<String> obsoleteSet = new HashSet<>();
+ for (int i = 2; i < 6; i++) {
+ resource.select(i);
+ effectiveSet.addAll(getKeyByScan("EFFECTIVE_RULE:*_COMPILE*", resource));
+ obsoleteSet.addAll(getKeyByScan("OBSOLETE_RULE:*_COMPILE*", resource));
+ }
+ Map<String, Integer> map = new HashMap<>();
+ map.put("effectiveMaat", effectiveSet.size());
+ map.put("obsoleteMaat", obsoleteSet.size());
+ JedisUtils.returnBrokenResource(resource);
+ return map;
+ }
+
+ public List<String> getKeyByScan(String pattern, Jedis resource) {
+ List<String> list = new ArrayList<>();
+ int count = 1000;
+ String cursor = ScanParams.SCAN_POINTER_START;
+ ScanParams scanParams = new ScanParams();
+ scanParams.count(count);
+ scanParams.match(pattern);
+
+ do {
+ ScanResult<String> scanResult = resource.scan(cursor, scanParams);
+ list.addAll(scanResult.getResult());
+ cursor = scanResult.getStringCursor();
+ } while (!"0".equals(cursor));
+ return list;
+ }
}
diff --git a/src/main/java/com/nis/web/task/SyncAllConfigTask.java b/src/main/java/com/nis/web/task/SyncAllConfigTask.java
new file mode 100644
index 0000000..ec9b288
--- /dev/null
+++ b/src/main/java/com/nis/web/task/SyncAllConfigTask.java
@@ -0,0 +1,286 @@
+package com.nis.web.task;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import com.nis.domain.restful.ConfigSource;
+import com.nis.restful.RestBusinessCode;
+import com.nis.restful.ServiceRuntimeException;
+import com.nis.util.Constants;
+import com.nis.util.ExceptionUtil;
+import com.nis.util.JedisUtils;
+import com.nis.web.service.AuditLogThread;
+import com.nis.web.service.SpringContextHolder;
+import com.nis.web.service.restful.ConfigSourcesService;
+import com.zdjizhi.utils.JsonMapper;
+
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisException;
+
+@Component
+@PropertySource(value = { "classpath:nis.properties" })
+public class SyncAllConfigTask {
+ private static Logger logger = LoggerFactory.getLogger(SyncAllConfigTask.class);
+// @Autowired
+// private JedisCluster jedisCluster;
+ @Autowired
+ protected ConfigSourcesService configSourcesService;
+
+ /**
+ * 每次使用获取新连接,避免集群某些机器宕机后影响连接使用,注意需要在applicationContext-redis.xml中修改jedisCluster的scope为 prototype
+ *
+ * @return
+ * @throws JedisException
+ */
+ public static JedisCluster getResource() throws JedisException {
+ JedisCluster jedisCluster = SpringContextHolder.getBean(JedisCluster.class);
+ if (jedisCluster == null) {
+ throw new ServiceRuntimeException("无法获取redis-cluster连接,请联系管理员检查程序",
+ RestBusinessCode.CannotConnectionRedis.getValue());
+ }
+ return jedisCluster;
+
+ }
+
+ @Scheduled(cron = "${syncUiAndServiceConfigCron}")
+ public void syncRedisToCluster() {
+ String requestId = UUID.randomUUID().toString();
+ JedisCluster jedisCluster = getResource();
+ Map<Integer, Map<String, String>> map = null;
+ try {
+ if (lock(jedisCluster, requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务
+// if (true) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务
+ String allConfigSyncStatus = jedisCluster.get("allConfigSyncStatus");
+ if (allConfigSyncStatus != null) {// 配置初始化完成
+ if (allConfigSyncStatus.trim().equals("1")) {
+ map = getAllSeqAndVersion();
+ // 设置配置同步状态为正在进行
+ configSourcesService.setAllConfigSyncStatus("2");
+ logger.warn("开始执行配置全量导入操作,将allConfigSyncStatus值设置为2正在进行导入操作");
+ Map<String, String> maatMap = new HashMap<>();
+ Map<String, String> unMaatMap = new HashMap<>();
+ String allConfigSyncKey = jedisCluster.get("allConfigSyncKey");
+ if (allConfigSyncKey != null && !allConfigSyncKey.trim().equals("")) {
+ String[] split = org.apache.commons.lang.StringUtils.split(allConfigSyncKey, ";");
+ for (String key : split) {
+ String val = jedisCluster.get(key);
+ String md5 = DigestUtils.md5Hex(val);
+ if (key.startsWith("UNMAAT")) {
+ unMaatMap.put(md5, val);
+ } else if (key.startsWith("MAAT")) {
+ maatMap.put(md5, val);
+ }
+ }
+ flushRedisDb();
+ addConfigToRedis(maatMap, true);
+ addConfigToRedis(unMaatMap, false);
+ logger.warn("执行配置全量导入成功,将allConfigSyncStatus值设置为3,导入成功");
+ // 设置配置同步状态为写redis成功
+ configSourcesService.setAllConfigSyncStatus("3");
+
+ // 删除存储全量配置key的关系key
+ jedisCluster.del("allConfigSyncKey");
+ for (String key : split) {
+ jedisCluster.del(key);
+ }
+ logger.warn("删除allConfigSyncKey,及其中的内容成功");
+ }
+ } else {
+ logger.info(
+ "集群中allConfigSyncStatus的值是{}[开始:0(界面下发同步状态),初始化:1(配置接收完成状态),进行中:2(服务写redis),已完成:3(服务写redis完毕),失败:-1(服务写redis失败)],暂不执行全量配置同步操作",
+ allConfigSyncStatus);
+ }
+ } else {
+ logger.info("未从集群中获取到allConfigSyncStatus的值,暂不执行全量配置同步操作");
+ }
+
+ } else {
+ logger.info("没有从rediscluster中获取到allConfigSyncDistributedLock分布式锁,暂时不执行数据同步!");
+ }
+ } catch (Exception e) {
+ logger.error("同步界面配置到redis中失败,失败原因:{}", ExceptionUtil.getExceptionMsg(e));
+ // 设置配置同步状态为写redis失败
+ configSourcesService.setAllConfigSyncStatus("-1");
+ logger.error("执行配置全量导入失败,将allConfigSyncStatus值设置为-1,导入失败");
+ } finally {
+ unlock(jedisCluster, requestId);
+ closeConn(jedisCluster);
+ if (map != null && map.size() > 0) {
+ recoverRedisData(map);
+ }
+ }
+ }
+
+ /**
+ * 保存数据入库,有验证逻辑
+ *
+ * @param map
+ * @param isMaat
+ * @throws Exception
+ */
+ public void addConfigToRedisYZ(Map<String, String> map, boolean isMaat) throws Exception {
+ long time = System.currentTimeMillis();
+ StringBuffer sb = new StringBuffer();
+ if (isMaat) {
+ for (Entry<String, String> entry : map.entrySet()) {
+ ConfigSource configSource = new JsonMapper().fromJson(entry.getValue(), ConfigSource.class);
+ configSourcesService.saveMaatConfig(new AuditLogThread(), time, configSource.getConfigCompileList(),
+ sb);
+ }
+ } else {
+ for (Entry<String, String> entry : map.entrySet()) {
+ String value = entry.getValue();
+ configSourcesService.saveCommonSources(new AuditLogThread(), time, value);
+ }
+
+ }
+
+ }
+
+ /**
+ * 保存数据入库,无验证逻辑
+ *
+ * @param map
+ * @param isMaat
+ * @throws Exception
+ */
+ private void addConfigToRedis(Map<String, String> map, boolean isMaat) throws Exception {
+ if (isMaat) {
+ for (Entry<String, String> entry : map.entrySet()) {
+ ConfigSource configSource = new JsonMapper().fromJson(entry.getValue(), ConfigSource.class);
+ configSourcesService.saveMaatConfig(configSource.getConfigCompileList());
+ }
+ } else {
+ for (Entry<String, String> entry : map.entrySet()) {
+ String value = entry.getValue();
+ configSourcesService.saveCommonSources(value);
+ }
+
+ }
+
+ }
+
+ /**
+ * 关闭集群连接
+ *
+ * @param jedisCluster
+ */
+ private void closeConn(JedisCluster jedisCluster) {
+ if (jedisCluster != null) {
+ try {
+ jedisCluster.close();
+ } catch (Exception e) {
+ throw new ServiceRuntimeException("释放redis-cluster连接失败",
+ RestBusinessCode.CannotConnectionRedis.getValue());
+ }
+ }
+ }
+
+// 设置成功返回的结果OK
+ private static final String LOCK_SUCCESS = "OK";
+// NX -- Only set the key if it does not already exist. XX -- Only set the key
+// if it already exist
+ private static final String SET_IF_NOT_EXIST = "NX";
+// 失效单位秒(EX)还是毫秒(PX)
+ private static final String SET_WITH_EXPIRE_TIME = "EX";
+ private static final Long UNLOCK_SUCCESS = 1L;
+
+ /**
+ * 尝试获取分布式锁,如果没有key就set,有key就不操作
+ *
+ * @param requestId 请求标识(UUID.randomUUID().toString()),正常情况下是谁加的锁,谁去解锁不能a加的锁,b去解锁
+ * @return 是否获取成功
+ */
+ private Boolean lock(JedisCluster jedisCluster, String requestId) {
+ String key = "allConfigSyncDistributedLock";
+ String var1 = jedisCluster.set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME,
+ Constants.CONFIGSYNCLOCKTIME);
+ if (LOCK_SUCCESS.equals(var1)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * 解除redis分布式锁
+ *
+ * @param requestId
+ */
+ private boolean unlock(JedisCluster jedisCluster, String requestId) {
+ String key = "allConfigSyncDistributedLock";
+ // 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】
+ String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end";
+ // 这个命令就是去执行lua脚本,KEYS的集合就是第二个参数,ARGV的集合就是第三参数【保证解锁的原子操作】
+ Object var2 = jedisCluster.eval(luaScript, Collections.singletonList(key),
+ Collections.singletonList(requestId));
+ if (UNLOCK_SUCCESS == var2) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * 从配置redis库中获取没个redisdb的maat_version,0,14,15号库不会有maat_version所以就不获取了
+ *
+ * @return
+ */
+ private Map<Integer, Map<String, String>> getAllSeqAndVersion() {
+ // 第一个key是redisdb,第二个key是redis的
+ Map<Integer, Map<String, String>> map = new HashMap<>();
+ for (int i = 1; i < 14; i++) {
+ String maatVersionStr = JedisUtils.get("MAAT_VERSION", i);
+ if (!map.containsKey(i)) {
+ Map<String, String> keyValMap = new HashMap<>();
+ if (maatVersionStr != null) {
+ keyValMap.put("MAAT_VERSION", maatVersionStr);
+ map.put(i, keyValMap);
+ }
+
+ }
+ }
+// String seqCompileid = JedisUtils.get("SEQ_COMPILEID", 0);
+// String seqGroupid = JedisUtils.get("SEQ_GROUPID", 0);
+// String seqRegionid = JedisUtils.get("SEQ_REGIONID", 0);
+// Map<String, String> keyValMap = new HashMap<>();
+// keyValMap.put("SEQ_COMPILEID", seqCompileid);
+// keyValMap.put("SEQ_GROUPID", seqGroupid);
+// keyValMap.put("SEQ_REGIONID", seqRegionid);
+// map.put(0, keyValMap);
+ return map;
+ }
+
+ /**
+ * 清空配置redis库,不清空0号库
+ */
+ private void flushRedisDb() {// 不清空0号库
+ for (int i = 1; i < 16; i++) {
+ JedisUtils.getResource(i).flushDB();
+ }
+ }
+
+ /**
+ * 恢复配置redis库中各个索引的maat_version
+ *
+ * @param map
+ */
+ private void recoverRedisData(Map<Integer, Map<String, String>> map) {
+ for (Integer redisDB : map.keySet()) {
+ Map<String, String> keyValMap = map.get(redisDB);
+ for (String redisKey : keyValMap.keySet()) {
+ JedisUtils.set(redisKey, String.valueOf(Long.parseLong(keyValMap.get(redisKey)) + 2l), 0, redisDB);
+ }
+ }
+ }
+
+}
diff --git a/src/main/resources/nis.properties b/src/main/resources/nis.properties
index 8706107..328d4ad 100644
--- a/src/main/resources/nis.properties
+++ b/src/main/resources/nis.properties
@@ -219,7 +219,13 @@ isOpenLogCountAndLast=true
#redis分布式锁超时时间,默认五分钟,3000秒
redisLockTime=3000
#获取redis分布式锁失败后的尝试获取锁的次数,每次失败暂停一秒钟后再次尝试
-redisRetryNum=500
+redisRetryNum=5
+
+#定时检测界面和服务的配置是否需要同步的定时任务间隔
+syncUiAndServiceConfigCron=0/10 * * * * ?
+#全量数据同步分布式锁时间,默认两个小时
+configSyncLockTime=7200
+
#ip定位库的地址
#ipLocationLibraryPath=/usr/local/ipLocalLibraay/Kazakhstan.mmdb
ipLocationLibraryPath=C:\\ipLocation\\Kazakhstan.mmdb