summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordefault <default@DESKTOP-7FEGRP2>2018-11-12 11:00:07 +0800
committerdefault <default@DESKTOP-7FEGRP2>2018-11-12 11:00:07 +0800
commit058a1c090b3190441f9b4822d53df51d804d1516 (patch)
tree79b072a332fb88e42d4118c81fd389a70a4c5bd1
parent8e46841e918cabd2afe6bce114a0444cb8fcd026 (diff)
修改周期任务相关数据缺失问题
-rw-r--r--nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java35
-rw-r--r--nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java104
2 files changed, 82 insertions, 57 deletions
diff --git a/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java b/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java
index a6d74ff..ae6043f 100644
--- a/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java
+++ b/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java
@@ -188,7 +188,27 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{
}
// 修改周期任务的执行状态
- Record result = Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" +
+ Record result = null;
+ // 周期任务时进行统计
+ if(loopmissionStateTableInfo.getInt("mission_state").equals(7)) {
+ result=Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" +
+ "from (select mrt.mission_id, \r\n" +
+ "ifnull(sum(case mrt.result when 7 then 1 else 0 end), 0) ok, \r\n" +
+ "ifnull(sum(case mrt.result when 4 then 1 else 0 end), 0) fail, \r\n" +
+ "count(mrt.seq_id) total \r\n" +
+ "from mission_result_table4 mrt \r\n" +
+ "group by mrt.mission_id) t \r\n" +
+ "left join (select max(lst.cur_mission_id) cur_mission_id, lst.mission_id \r\n" +
+ "from mission_result_table4 mrt4 \r\n" +
+ "left join loopmission_state_table lst on mrt4.mission_id = lst.cur_mission_id \r\n" +
+ "group by lst.mission_id) lmst \r\n" +
+ "on lmst.cur_mission_id = t.mission_id \r\n" +
+ "left join mission_state_table mst \r\n" +
+ "on mst.mission_id = lmst.mission_id \r\n" +
+ "where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" +
+ "and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id"));
+ }else {
+ result=Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" +
"from (select mrt.mission_id, \r\n" +
"ifnull(sum(case mrt.result when 0 then 1 else 0 end), 0) ok, \r\n" +
"ifnull(sum(case mrt.result when 1 then 1 when -1 then 1 else 0 end), 0) fail, \r\n" +
@@ -204,6 +224,7 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{
"on mst.mission_id = lmst.mission_id \r\n" +
"where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" +
"and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id"));
+ }
if(null!=result) {
if(null!=result.get("missionId")&&result.getStr("missionId").length()>0) {
Long ok = result.getLong("ok");
@@ -244,6 +265,18 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{
}
break;
}
+ }else {
+ missionState = loopmissionStateTableInfo.getInt("mission_state");
+ Integer missionTableInfoState = missionTableInfo.getInt("mission_state");
+ switch(missionState) {
+ case(3):
+ Record findFirst = Db.use().findFirst("select COUNT(*) count from loopmission_state_table where mission_id = ? and mission_state=0",loopmissionStateTableInfo.getLong("mission_id"));
+ if(findFirst.getInt("count").equals(0)&&missionTableInfoState!=3) {
+ status=3;
+ autoDesc=format.format(System.currentTimeMillis())+"i18n_server.UpgradeService.sql.revoke_n81i";
+ }
+ break;
+ }
}
Record missionStateTableResult =new Record();
diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
index 93f31d1..b655ccc 100644
--- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
+++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
@@ -132,45 +132,6 @@ public class SyncSlaveToMasterThread implements Runnable{
});
}
});
- }else if(record.getStr("table_name").equals("loopmission_state_table")){
- Db.use(url.toString()).tx(new IAtom() {
- @Override
- public boolean run() throws SQLException {
- // TODO Auto-generated method stub
- return Db.use("masterDataSource").tx(new IAtom() {
- @Override
- public boolean run() throws SQLException {
- Set<Long> set = SyncData.getThreadlocalLoopInsertMissionIds();
- final List<Record> insertDatas=new ArrayList<Record>();
- final List<Record> updateDatas=new ArrayList<Record>();
- for(Record entity:data) {
- // 循环遍历数据 判断当前数据是新增还是修改
- Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name")));
- if(null!=findFirst) {
- updateDatas.add(entity);
- }else {
- set.add(entity.getLong(record.getStr("id_name")));
- insertDatas.add(entity);
- }
- }
- SyncData.setThreadlocalLoopInsertMissionIds(set);
- if(insertDatas.size()>0) {
- Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
- }
- if(updateDatas.size()>0) {
- Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
- updateDatas, record.getInt("batch_size"));
- }
- logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name"));
- logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
- record.set("last_id", lastInsertId);
- record.set("last_date", new Date());
- Db.use(url.toString()).update("table_sync_info", record);
- return true;
- }
- });
- }
- });
}else {
Db.use(url.toString()).tx(new IAtom() {
@Override
@@ -227,25 +188,56 @@ public class SyncSlaveToMasterThread implements Runnable{
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + insertStr + ") ",
insertIds.toArray());
- for(Record insertData:insertDatas){
- Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
- if(record.getStr("table_name").equals("event_record_library")) {
- //设置数据状态为同步数据
- insertData.set("sync_status",1);
- //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库
- insertData.set("old_id",insertData.getLong(record.getStr("id_name")));
- insertData.set("db_id", syncDbInfo.getLong("id"));
+ if(!record.getStr("table_name").equals("loopmission_state_table")) {
+ for(Record insertData:insertDatas){
+ if(record.getStr("table_name").equals("event_record_library")) {
+ Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
+ //设置数据状态为同步数据
+ insertData.set("sync_status",1);
+ //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库
+ insertData.set("old_id",insertData.getLong(record.getStr("id_name")));
+ insertData.set("db_id", syncDbInfo.getLong("id"));
+ insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
+ }
+ }
+ Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
+ // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
+ Object lastInsertId = data.get(data.size() - 1).get("id");
+ logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
+ record.set("last_id", lastInsertId);
+ record.set("last_date", new Date());
+ Db.use(url.toString()).update("table_sync_info", record);
+ return true;
+ }else{
+ Object lastInsertId = data.get(data.size() - 1).get("id");
+ Set<Long> set = SyncData.getThreadlocalLoopInsertMissionIds();
+ final List<Record> insertDatas2=new ArrayList<Record>();
+ final List<Record> updateDatas=new ArrayList<Record>();
+ for(Record entity:insertDatas) {
+ // 循环遍历数据 判断当前数据是新增还是修改
+ Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name")));
+ if(null!=findFirst) {
+ updateDatas.add(entity);
+ }else {
+ set.add(entity.getLong(record.getStr("id_name")));
+ insertDatas2.add(entity);
+ }
}
- insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
+ SyncData.setThreadlocalLoopInsertMissionIds(set);
+ if(insertDatas2.size()>0) {
+ Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
+ }
+ if(updateDatas.size()>0) {
+ Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
+ updateDatas, record.getInt("batch_size"));
+ }
+ logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name"));
+ logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
+ record.set("last_id", lastInsertId);
+ record.set("last_date", new Date());
+ Db.use(url.toString()).update("table_sync_info", record);
+ return true;
}
- Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
- // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
- Object lastInsertId = data.get(data.size() - 1).get("id");
- logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
- record.set("last_id", lastInsertId);
- record.set("last_date", new Date());
- Db.use(url.toString()).update("table_sync_info", record);
- return true;
}
});
}