summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordefault <default@DESKTOP-7FEGRP2>2018-10-08 15:44:24 +0800
committerdefault <default@DESKTOP-7FEGRP2>2018-10-08 15:44:24 +0800
commit94f6853323dac772778320aefaf1ae530795f82a (patch)
tree455fb5903e9b963f55736d94eece1da3be0a3a22
parenta2d60895b9c6904cf467cb77efc9655b330ba10c (diff)
update
-rw-r--r--nms_sync/src/com/nms/main/SyncData.java4
-rw-r--r--nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java46
-rw-r--r--nms_sync/src/com/nms/thread/SyncThread.java46
3 files changed, 76 insertions, 20 deletions
diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java
index a2970ad..d78727c 100644
--- a/nms_sync/src/com/nms/main/SyncData.java
+++ b/nms_sync/src/com/nms/main/SyncData.java
@@ -54,10 +54,10 @@ public class SyncData{
// 主库向分库同步数据
SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo));
logger.info("创建主库同步分库线程执行任务");
- //scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
+ scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
// 分库向主库同步数据
logger.info("创建分库数据同步到主库线程执行任务");
- scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
+ //scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
}
}else{
logger.info("获取同步记录信息失败 请检查数据库数据信息");
diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
index f07bba5..603b76f 100644
--- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
+++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
@@ -108,10 +108,14 @@ public class SyncSlaveToMasterThread implements Runnable{
insertIds.toArray());
for(Record insertData:insertDatas){
Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
- insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
if(record.getStr("table_name").equals("event_record_library")) {
+ //设置数据状态为同步数据
insertData.set("sync_status",1);
+ //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库
+ insertData.set("old_id",insertData.getLong(record.getStr("id_name")));
+ insertData.set("db_id", syncDbInfo.getInt("id"));
}
+ insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
}
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
@@ -147,30 +151,54 @@ public class SyncSlaveToMasterThread implements Runnable{
@Override
public boolean run() throws SQLException {
List<Integer> updateIds = new ArrayList<Integer>();
- StringBuffer deleteStr = new StringBuffer();
+ StringBuffer handleStr = new StringBuffer();
for (int i = 0; i < datas.size(); i++) {
updateIds.add(datas.get(i).getInt("target_id"));
if (i == 0) {
- deleteStr.append("?");
+ handleStr.append("?");
} else {
- deleteStr.append(",?");
+ handleStr.append(",?");
}
}
logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds));
if (record.getInt("event") == 2) {
List<Record> updateDatas = Db.use(url.toString())
.find(" select * from " + record.getStr("table_name") + " where "
- + record.getStr("id_name") + " in (" + deleteStr + ") ",
+ + record.getStr("id_name") + " in (" + handleStr + ") ",
updateIds.toArray());
logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
if (updateDatas != null && updateDatas.size() > 0) {
- Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
- updateDatas, record.getInt("batch_size"));
+ if(record.getStr("table_name").equals("event_record_library")) {
+ for(Record updateData:updateDatas) {
+ updateData.set("old_id",updateData.getLong("id"));
+ updateData.set("db_id", syncDbInfo.get("id"));
+ updateData.remove("id");
+ }
+ Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
+ }else {
+ Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
+ updateDatas, record.getInt("batch_size"));
+ }
}
logger.info("分库同步主库修改数据任务完成");
} else if (record.getInt("event") == 3) {
- Db.use("masterDataSource").update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in ("
- + deleteStr + ") ", updateIds.toArray());
+ if(record.getStr("table_name").equals("event_record_library")) {
+ updateIds.clear();
+ handleStr.delete(0,handleStr.length());
+ for (int i = 0; i < datas.size(); i++) {
+ updateIds.add(datas.get(i).getInt("target_id"));
+ updateIds.add(syncDbInfo.getInt("id"));
+ if (i == 0) {
+ handleStr.append("(?,?)");
+ } else {
+ handleStr.append(",(?,?)");
+ }
+ }
+ Db.use("masterDataSource").update("delete from event_record_library where (old_id,db_id) in ("+handleStr+")",updateIds.toArray());
+ }else {
+ Db.use("masterDataSource").update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in ("
+ + handleStr + ") ", updateIds.toArray());
+ }
logger.info("分库同步主库删除数据任务完成");
}
Object lastUpdateId = datas.get(datas.size() - 1).get("id");
diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java
index 5081eb4..0141f0f 100644
--- a/nms_sync/src/com/nms/thread/SyncThread.java
+++ b/nms_sync/src/com/nms/thread/SyncThread.java
@@ -118,10 +118,14 @@ public class SyncThread implements Runnable {
insertIds.toArray());
for(Record insertData:insertDatas){
Record seqData = Db.use(url.toString()).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
- insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
if(record.getStr("table_name").equals("event_record_library")) {
+ //设置数据状态为同步数据
insertData.set("sync_status",1);
+ //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库
+ insertData.set("old_id",insertData.getLong(record.getStr("id_name")));
+ insertData.set("db_id", -1);
}
+ insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
}
Db.use(url.toString()).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
@@ -158,30 +162,54 @@ public class SyncThread implements Runnable {
@Override
public boolean run() throws SQLException {
List<Integer> updateIds = new ArrayList<Integer>();
- StringBuffer deleteStr = new StringBuffer();
+ StringBuffer handleStr = new StringBuffer();
for (int i = 0; i < datas.size(); i++) {
updateIds.add(datas.get(i).getInt("target_id"));
if (i == 0) {
- deleteStr.append("?");
+ handleStr.append("?");
} else {
- deleteStr.append(",?");
+ handleStr.append(",?");
}
}
logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds));
if (record.getInt("event") == 2) {
List<Record> updateDatas = Db.use("masterDataSource")
.find(" select * from " + record.getStr("table_name") + " where "
- + record.getStr("id_name") + " in (" + deleteStr + ") ",
+ + record.getStr("id_name") + " in (" + handleStr + ") ",
updateIds.toArray());
//logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas));
if (updateDatas != null && updateDatas.size() > 0) {
- Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
- updateDatas, record.getInt("batch_size"));
+ if(record.getStr("table_name").equals("event_record_library")) {
+ for(Record updateData:updateDatas) {
+ updateData.set("old_id",updateData.getLong("id"));
+ updateData.set("db_id", -1);
+ updateData.remove("id");
+ }
+ Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
+ }else {
+ Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
+ updateDatas, record.getInt("batch_size"));
+ }
}
logger.info("分库对主库修改操作的数据同步任务完成");
} else if (record.getInt("event") == 3) {
- Db.use(url.toString()).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in ("
- + deleteStr + ") ", updateIds.toArray());
+ if(record.getStr("table_name").equals("event_record_library")) {
+ updateIds.clear();
+ handleStr.delete(0,handleStr.length());
+ for (int i = 0; i < datas.size(); i++) {
+ updateIds.add(datas.get(i).getInt("target_id"));
+ updateIds.add(-1);
+ if (i == 0) {
+ handleStr.append("(?,?)");
+ } else {
+ handleStr.append(",(?,?)");
+ }
+ }
+ Db.use(url.toString()).update("delete from event_record_library where (old_id,db_id) in ("+handleStr+")",updateIds.toArray());
+ }else {
+ Db.use(url.toString()).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in ("
+ + handleStr + ") ", updateIds.toArray());
+ }
logger.info("分库对主库删除操作的数据同步完成");
}
Object lastUpdateId = datas.get(datas.size() - 1).get("id");