summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordefault <default@DESKTOP-7FEGRP2>2018-10-24 18:17:04 +0800
committerdefault <default@DESKTOP-7FEGRP2>2018-10-24 18:17:04 +0800
commit0efdb80e5c6107dc3f7996e8ae49a5a1b205b1a3 (patch)
tree5cf3acfd1a1d45bddb34946e2106b5c7fd182660
parentea3230250e25d6e0916621920ef8f2a4a11a3782 (diff)
update
-rw-r--r--nms_sync/bin/db.properties2
-rw-r--r--nms_sync/conf/db.properties2
-rw-r--r--nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java92
3 files changed, 68 insertions, 28 deletions
diff --git a/nms_sync/bin/db.properties b/nms_sync/bin/db.properties
index 7b577b9..69071e9 100644
--- a/nms_sync/bin/db.properties
+++ b/nms_sync/bin/db.properties
@@ -1,6 +1,6 @@
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740
-dburl=jdbc:mysql://192.168.11.153:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
+dburl=jdbc:mysql://192.168.10.182:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
#\u6570\u636e\u5e93\u8d26\u6237\u540d
dbusername=nms
#\u6570\u636e\u5e93\u5bc6\u7801
diff --git a/nms_sync/conf/db.properties b/nms_sync/conf/db.properties
index 7b577b9..69071e9 100644
--- a/nms_sync/conf/db.properties
+++ b/nms_sync/conf/db.properties
@@ -1,6 +1,6 @@
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740
-dburl=jdbc:mysql://192.168.11.153:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
+dburl=jdbc:mysql://192.168.10.182:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
#\u6570\u636e\u5e93\u8d26\u6237\u540d
dbusername=nms
#\u6570\u636e\u5e93\u5bc6\u7801
diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
index e1664cc..91949c7 100644
--- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
+++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
@@ -43,6 +43,7 @@ public class SyncSlaveToMasterThread implements Runnable{
//logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
if (find != null && find.size() > 0) {
for (final Record record : find) {
+ logger.info("分库数据同步到主库 正在操作的表名为:"+ record.getStr("table_name"));
//如果设定指定字段 则只操作指定字段数据 无则操作全部
final StringBuffer columns=new StringBuffer();
columns.append("*");
@@ -71,32 +72,65 @@ public class SyncSlaveToMasterThread implements Runnable{
}
// 针对监测结果表的id值 自动生成处理
if("detection_info_new".equals(record.getStr("table_name"))) {
- for(Record entity:data) {
- Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
- entity.set("old_id",entity.getLong(record.getStr("id_name")));
- entity.set("db_id", syncDbInfo.getInt("id"));
- entity.set(record.getStr("id_name"), seqData.getLong("seqId"));
- }
+ Db.use(url.toString()).tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ // TODO Auto-generated method stub
+ return Db.use("masterDataSource").tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ final List<Record> insertDatas=new ArrayList<Record>();
+ final List<Record> updateDatas=new ArrayList<Record>();
+ for(Record entity:data) {
+ // 循环遍历数据 判断当前数据是新增还是修改
+ Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",entity.get("DETECTION_SET_INFO_ID"),entity.get("SEQ_ID"));
+ if(null!=findFirst) {
+ entity.set(record.getStr("id_name"),findFirst.getStr("id_name"));
+ updateDatas.add(entity);
+ }else {
+ Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
+ entity.set(record.getStr("id_name"), seqData.getLong("seqId"));
+ insertDatas.add(entity);
+ }
+ }
+ if(insertDatas.size()>0) {
+ Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
+ }
+ if(updateDatas.size()>0) {
+ Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
+ updateDatas, record.getInt("batch_size"));
+ }
+ logger.info("分库同步 detection_info_new 增量更新数据完成 表名为"+record.getStr("table_name"));
+ logger.info("分库同步 detection_info_new 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
+ record.set("last_id", lastInsertId);
+ record.set("last_date", new Date());
+ Db.use(url.toString()).update("table_sync_info", record);
+ return true;
+ }
+ });
+ }
+ });
+ }else {
+ Db.use(url.toString()).tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ return Db.use("masterDataSource").tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
+ // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
+ logger.info("分库同步增量更新数据完成 表名为"+record.getStr("table_name"));
+ logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
+ record.set("last_id", lastInsertId);
+ record.set("last_date", new Date());
+ Db.use(url.toString()).update("table_sync_info", record);
+ return true;
+ }
+ });
+ }
+ });
+ logger.info("分库同步增量更新数据完成 修改最后同步ID");
}
- Db.use(url.toString()).tx(new IAtom() {
- @Override
- public boolean run() throws SQLException {
- return Db.use("masterDataSource").tx(new IAtom() {
- @Override
- public boolean run() throws SQLException {
- Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
- // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
- logger.info("分库同步增量更新数据完成 表名为"+record.getStr("table_name"));
- logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
- record.set("last_id", lastInsertId);
- record.set("last_date", new Date());
- Db.use(url.toString()).update("table_sync_info", record);
- return true;
- }
- });
- }
- });
- logger.info("分库同步增量更新数据完成 修改最后同步ID");
} else {
flag = false;
}
@@ -195,7 +229,7 @@ public class SyncSlaveToMasterThread implements Runnable{
updateIds.toArray());
//logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
if (updateDatas != null && updateDatas.size() > 0) {
- if(record.getStr("table_name").equals("event_record_library")||record.getStr("table_name").equals("detection_info_new")) {
+ if(record.getStr("table_name").equals("event_record_library")) {
for(Record updateData:updateDatas) {
updateData.set("old_id",updateData.getLong(record.getStr("id_name")));
updateData.set("db_id", syncDbInfo.get("id"));
@@ -204,6 +238,12 @@ public class SyncSlaveToMasterThread implements Runnable{
}
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
}else {
+ if(record.getStr("table_name").equals("detection_info_new")) {
+ for(Record updateData:updateDatas) {
+ Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",updateData.get("DETECTION_SET_INFO_ID"),updateData.get("SEQ_ID"));
+ updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name")));
+ }
+ }
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}