summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfangshunjian <[email protected]>2018-12-21 21:49:35 +0600
committerfangshunjian <[email protected]>2018-12-21 21:50:29 +0600
commitaf4f0910ba531cc9791ff54cb86cea1c64caba8e (patch)
tree94559f2794e50ed0a1b22e7c95a2cdea9cf088bb
parente61cf4796d18a12514eda7e4dd7422e182735ab0 (diff)
1、主库向从库同步和从库向主库同步分别一个线程池master
2、程序启东时,new表全量更新 3、修改log日志信息
-rw-r--r--nms_sync/conf/config.properties6
-rw-r--r--nms_sync/conf/druid.properties30
-rw-r--r--nms_sync/src/com/nms/main/Conn.java2
-rw-r--r--nms_sync/src/com/nms/main/SyncData.java31
-rw-r--r--nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java174
-rw-r--r--nms_sync/src/com/nms/thread/SyncThread.java16
-rw-r--r--nms_sync/src/com/nms/util/StopWatch.java164
7 files changed, 353 insertions, 70 deletions
diff --git a/nms_sync/conf/config.properties b/nms_sync/conf/config.properties
index c32ace2..33a36dd 100644
--- a/nms_sync/conf/config.properties
+++ b/nms_sync/conf/config.properties
@@ -1,4 +1,8 @@
#\u4e3b\u5e93\u540c\u6b65\u5206\u5e93\u6570\u636e\u7ebf\u7a0b\u65f6\u95f4\u5dee
syncMaterToSlaveTime=30000
#\u4e3b\u5e93\u540c\u6b65\u5206\u5e93\u6570\u636e\u7ebf\u7a0b\u65f6\u95f4\u5dee
-syncSlaveToMaterTime=60000 \ No newline at end of file
+syncSlaveToMaterTime=60000
+#从库向主库同步的线程池数量
+slave.to.master.pool.num=3
+#主库向从库同步的线程池数量
+master.to.slave.pool.num=2
diff --git a/nms_sync/conf/druid.properties b/nms_sync/conf/druid.properties
new file mode 100644
index 0000000..0073e99
--- /dev/null
+++ b/nms_sync/conf/druid.properties
@@ -0,0 +1,30 @@
+#检测数据库链接是否有效,必须配置
+druid.validationQuery=SELECT 1 from dual
+#初始连接数
+druid.initialSize=1
+#最大连接池数量
+druid.maxActive=3
+#去掉,配置文件对应去掉
+#druid.maxIdle=20
+#配置0,当线程池数量不足,自动补充。
+druid.minIdle=1
+#获取链接超时时间为1分钟,单位为毫秒。
+druid.maxWait=60000
+#获取链接的时候,不校验是否可用,开启会有损性能。
+druid.testOnBorrow=false
+#归还链接到连接池的时候校验链接是否可用。
+druid.testOnReturn=false
+#此项配置为true即可,不影响性能,并且保证安全性。意义为:申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
+druid.testWhileIdle=true
+#1.Destroy线程会检测连接的间隔时间
+#2.testWhileIdle的判断依据
+druid.timeBetweenEvictionRunsMillis=600000
+#一个链接生存的时间
+druid.minEvictableIdleTimeMillis=600000
+#链接使用超过时间限制是否回收
+druid.removeAbandoned=false
+#超过时间限制时间(单位秒),目前为5分钟,如果有业务处理时间超过5分钟,可以适当调整。
+druid.removeAbandonedTimeout=300
+#链接回收的时候控制台打印信息,测试环境可以加上true,线上环境false。会影响性能。
+druid.logAbandoned=false
+druid.filters= \ No newline at end of file
diff --git a/nms_sync/src/com/nms/main/Conn.java b/nms_sync/src/com/nms/main/Conn.java
index 80793c9..54b2ddd 100644
--- a/nms_sync/src/com/nms/main/Conn.java
+++ b/nms_sync/src/com/nms/main/Conn.java
@@ -32,10 +32,12 @@ public class Conn {
druid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait")));
druid.setRemoveAbandoned(Boolean.valueOf(PropKit.get("dbRemoveAbandoned")));
druid.setRemoveAbandonedTimeoutMillis(Integer.valueOf(PropKit.get("dbRemoveAbandonedTimeout")));
+ druid.setConnectionProperties(SyncData.DRUID_CONFIG_FILE_PATH);//druid 配置文件路径
ActiveRecordPlugin arp=new ActiveRecordPlugin(url,druid);
arp.setShowSql(Boolean.valueOf(PropKit.get("dbShowSql")));
druid.start();
arp.start();
+ logger.debug(String.format("分库数据库连接池创建成功,ip: %s", syncDbInfo.get("ip")));
}
logger.info("创建各分库数据库的连接池完成");
}
diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java
index a722153..d2c559d 100644
--- a/nms_sync/src/com/nms/main/SyncData.java
+++ b/nms_sync/src/com/nms/main/SyncData.java
@@ -1,21 +1,19 @@
package com.nms.main;
+import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
+
import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
import com.jfinal.aop.Duang;
import com.jfinal.kit.PropKit;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
-import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.druid.DruidPlugin;
import com.nms.model.SyncDbInfo;
import com.nms.thread.SyncSlaveToMasterThread;
@@ -35,6 +33,19 @@ public class SyncData{
private static final ThreadLocal<Set<Long>> threadLocalLoopUpdateMissionIds = new ThreadLocal<Set<Long>>();
// 线程变量记录周期任务结果新增的任务id
private static final ThreadLocal<Set<Long>> threadLocalLoopInsertMissionIds = new ThreadLocal<Set<Long>>();
+ //druid 配置文件路径
+ public static final String DRUID_CONFIG_FILE_PATH;
+
+ static {
+ URL urlObj = SyncData.class.getClassLoader().getResource("druid.properties");
+ if(urlObj==null){
+ System.err.println("找不到配置文件:druid.properties");
+ logger.error("No configuration file can be found: druid.properties");
+ System.exit(1);
+ }
+ DRUID_CONFIG_FILE_PATH = urlObj.getPath().replaceAll("%20", " ");
+ logger.debug(String.format("druid配置文件路径:", DRUID_CONFIG_FILE_PATH));
+ }
public static void main(String[] args) {
@@ -51,6 +62,7 @@ public class SyncData{
masterDruid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait")));
masterDruid.setRemoveAbandoned(Boolean.valueOf(PropKit.get("dbRemoveAbandoned")));
masterDruid.setRemoveAbandonedTimeoutMillis(Integer.valueOf(PropKit.get("dbRemoveAbandonedTimeout")));
+ masterDruid.setConnectionProperties(DRUID_CONFIG_FILE_PATH);//druid 配置文件路径
ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid);
masterArp.setShowSql(Boolean.valueOf(PropKit.get("dbShowSql")));
masterDruid.start();
@@ -63,26 +75,25 @@ public class SyncData{
Conn.createConn(syncDbInfos);
logger.info("分库数据库连接池创建完成");
// 定时周期执行线程池 用于周期执行线程的运行
- ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(syncDbInfos.size());
+ ScheduledExecutorService masterToSlavePool = Executors.newScheduledThreadPool(PropKit.use("config.properties").getInt("master.to.slave.pool.num", 2));
+ ScheduledExecutorService slaveToMasterPool = Executors.newScheduledThreadPool(PropKit.use("config.properties").getInt("slave.to.master.pool.num", 3));
logger.info("创建线程池完毕 数量大小为"+syncDbInfos.size());
// 使用scheduleWithFixedDleay在上一个线程任务执行完成后 5分钟执行下一次任务
for(SyncDbInfo syncDbInfo : syncDbInfos){
// 主库向分库同步数据
SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo));
logger.info("创建主库同步分库线程执行任务");
- scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
+ masterToSlavePool.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
// 分库向主库同步数据
logger.info("创建分库数据同步到主库线程执行任务");
SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo));
- scheduleService.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
+ slaveToMasterPool.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
}
}else{
- logger.info("获取同步记录信息失败 请检查数据库数据信息");
logger.error("获取同步记录信息失败 请检查数据库数据信息");
}
} catch(Exception e) {
- e.printStackTrace();
- logger.error("数据同步启动发生异常 信息为:"+e.getMessage());
+ logger.error("数据同步启动发生异常",e);
}
}
diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
index d3b0656..7955c92 100644
--- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
+++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
@@ -7,22 +7,27 @@ import java.util.List;
import java.util.Set;
import org.apache.log4j.Logger;
-import com.nms.interceptor.SyncMissionResultStatisticalInterceptor;
-import com.nms.main.SyncData;
+
+import com.alibaba.druid.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.jfinal.aop.Before;
+import com.jfinal.json.Json;
+import com.jfinal.kit.Prop;
+import com.jfinal.kit.PropKit;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.Record;
-import com.jfinal.plugin.activerecord.tx.Tx;
+import com.nms.interceptor.SyncMissionResultStatisticalInterceptor;
+import com.nms.main.SyncData;
import com.nms.model.SyncDbInfo;
+import com.nms.util.StopWatch;
@Before({SyncMissionResultStatisticalInterceptor.class})
public class SyncSlaveToMasterThread implements Runnable{
private Logger logger = Logger.getLogger(this.getClass());
private SyncDbInfo syncDbInfo;
-
+ private boolean firstRun= true;
public SyncSlaveToMasterThread() {
super();
}
@@ -34,8 +39,11 @@ public class SyncSlaveToMasterThread implements Runnable{
@Override
public void run() {
+ Thread.currentThread().setName(syncDbInfo.getIp()+"->同步主库");
String errorTableName=null;
String errorUrl=null;
+ StopWatch sw = new StopWatch();
+ sw.start();
try {
// 主库向分库同步数据
logger.info("开始分库数据同步主库");
@@ -53,6 +61,10 @@ public class SyncSlaveToMasterThread implements Runnable{
//logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
if (find != null && find.size() > 0) {
for (final Record record : find) {
+ long total = 0;
+ //同步 表 開始
+ sw.tag("s_" +record.getInt("event")+ record.getStr("table_name"));
+
//logger.info("分库数据同步到主库 正在操作的表名为:"+ record.getStr("table_name"));
errorTableName=record.getStr("table_name");
//针对个别特殊表动态条件
@@ -67,10 +79,29 @@ public class SyncSlaveToMasterThread implements Runnable{
columns.setLength(0);
columns.append(record.getStr("columns"));
}
+
+
+ final StringBuilder colRelation = new StringBuilder();
+ //整理关联查询的 column
+ if("*".equalsIgnoreCase(columns.toString())) {
+ colRelation.append("tt.*");
+ }else {
+ String[] cols = columns.toString().split(",");
+ colRelation.setLength(0);
+ for(String s : cols) {
+ if(!StringUtils.isEmpty(s)) {
+ colRelation.append(",tt.");
+ colRelation.append(s.trim());
+ }
+ }
+ colRelation.deleteCharAt(0);
+ }
+
// 循环同步数据标识
boolean flag = true;
// 判断表中的event事件 1代表insert 2代表update 3代表delete
if (record.getInt("event") == 1) {
+ sw.tag("s_insert"+ record.getStr("table_name"));
if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){
while (flag) {
// 新增操作 取出最后更新id信息 查询增量数据
@@ -80,6 +111,7 @@ public class SyncSlaveToMasterThread implements Runnable{
record.getLong("last_id"));
//logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
if (data != null && data.size() > 0) {
+ total += data.size();
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
if(record.getInt("mode").equals(2)&&record.getStr("table_name").contains("mission_result_table")) {
Set<Long> set = SyncData.getThreadlocalInsertMissionIds();
@@ -128,11 +160,10 @@ public class SyncSlaveToMasterThread implements Runnable{
masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
- logger.info("分库同步 detection_info_new 增量更新数据完成 表名为"+record.getStr("table_name"));
- logger.info("分库同步 detection_info_new 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
+ logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + data.size());
return true;
}
});
@@ -147,11 +178,10 @@ public class SyncSlaveToMasterThread implements Runnable{
public boolean run() throws SQLException {
masterDb.batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
- logger.info("分库同步增量更新数据完成 表名为"+record.getStr("table_name"));
- logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
+ logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + data.size());
return true;
}
});
@@ -209,10 +239,10 @@ public class SyncSlaveToMasterThread implements Runnable{
masterDb.batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
Object lastInsertId = data.get(data.size() - 1).get("id");
- logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
+ logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + insertDatas.size());
return true;
}else{
Object lastInsertId = data.get(data.size() - 1).get("id");
@@ -240,11 +270,10 @@ public class SyncSlaveToMasterThread implements Runnable{
masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
- logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name"));
- logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
+ logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + insertDatas.size());
return true;
}
}
@@ -258,37 +287,67 @@ public class SyncSlaveToMasterThread implements Runnable{
}
}
} else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
+ Boolean allUpdate = PropKit.use("").getBoolean("detection_all_update", false);//是否启用全量同步
+ //监测信息最新表同步 全量更新
+ if((firstRun || allUpdate) && "detection_info_new".equalsIgnoreCase(record.getStr("table_name"))) {
+ long fromId = 0L;//开始id
+ String sql = "select max(id) last_id from table_event_log where table_name='detection_info_new' and event = 2 and id > "+ record.getLong("last_id");
+ Record maxIdRec = slaveDb.findFirst(sql);
+ if(maxIdRec != null) {
+ while(true) {
+ String dinSql = "select ID, `DETECTION_SET_INFO_ID`, `CHECK_WAY`, `DETECTION_STATE_INFO`, `PERFORMACE_DATA`, `CURRENT_TIMES`, `START_TIME`, `WAIT_TIME`, `DELAY_TIME`, `NEXT_CHECK_TIME`, `OFF_LINE`, `POLICE_LEVEL`, `DATA_CHECK_TIME`, `DATA_ARRIVE_TIME`, `DETECTIONED_STATE`, `NODE_IP`, `STATUS_CHANGE_TIME`, `DATA_CHECK_TIME_DIGITAL`, `DATA_ARRIVE_TIME_DIGITAL`, `SEQ_ID`, `DETECTION_INFO_ID`, `VALID`, `POLICE_EMERGENT` from detection_info_new din where id > ? order by id limit "+ record.getInt("batch_size") ;
+ List<Record> dinList = slaveDb.find(dinSql, fromId);
+ if(dinList!= null && dinList.size()>0) {
+ fromId = dinList.get(dinList.size()-1).getLong("ID");
+ for(Record r : dinList) {
+ r.remove("ID");
+ }
+ masterDb.batchUpdate("detection_info_new", "DETECTION_SET_INFO_ID,SEQ_ID", dinList, record.getInt("batch_size"));
+ logger.debug("分库同步表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,fromId : " + fromId +",num:" + dinList.size());
+ }else {
+ Long newLastId = maxIdRec.getLong("last_id");
+ record.set("last_id", Long.valueOf(newLastId));
+ record.set("last_date", new Date());
+ slaveDb.update("table_sync_info", record);
+ logger.debug(String.format("new表全量更新完成,最后update id:%s ", newLastId) );
+ break;
+ }
+ }
+ }
+ continue;
+ }
// table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改
while (flag) {
- final List<Record> datas = slaveDb.find(
- " select * from table_event_log where table_name = '" + record.getStr("table_name")
- + "' and id > " + record.getLong("last_id") + " and event = "
- + record.getInt("event") + " order by id asc limit " + record.getInt("batch_size"));
+ String sql = "select max(id) lastId,count(1) total from ( select id from table_event_log l "
+ + " where table_name = '"+record.getStr("table_name")+"' "
+ + " and event = "+record.getInt("event")
+ + " and id > "+record.getLong("last_id")
+ +" order by id asc limit " + record.getInt("batch_size") +" ) llll";
+
+ Record statRec = slaveDb.findFirst(sql);
+ final Long num = statRec.getLong("total");//将要同步的数据量
+ final String lastId = statRec.getStr("lastId");//将要同步的最大id
//logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas));
- if (datas != null && datas.size() > 0) {
+ if (num > 0) {
+ total += num;
slaveDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return masterDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
- List<Long> updateIds = new ArrayList<Long>();
List<Record> deleteRecords=new ArrayList<Record>();
- StringBuilder handleStr=new StringBuilder();
- for (int i = 0; i < datas.size(); i++) {
- updateIds.add(datas.get(i).getLong("target_id"));
- if(i==0) {
- handleStr.append("?");
- }else {
- handleStr.append(",?");
- }
- }
- logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds));
+
if (record.getInt("event") == 2) {
- List<Record> updateDatas = slaveDb
- .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
- + record.getStr("id_name") + " in (" + handleStr + ") ",
- updateIds.toArray());
+ String sql = "select "+colRelation.toString()+" from " + record.getStr("table_name") +" tt "
+ + " left join table_event_log log on tt."+record.getStr("id_name")+"=log.target_id "
+ + " where log.table_name = '"+record.getStr("table_name")+"'"
+ + " and log.event = " +record.getInt("event")
+ + " and log.id < " + (Long.valueOf(lastId)+1)
+ + " and log.id > "+record.getLong("last_id")
+ + " order by log.id asc limit " + record.getInt("batch_size");
+ logger.debug(String.format("查询sql:%s", sql));
+ List<Record> updateDatas = slaveDb.find(sql);
//logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
if (updateDatas != null && updateDatas.size() > 0) {
if(record.getStr("table_name").equals("event_record_library")) {
@@ -312,31 +371,34 @@ public class SyncSlaveToMasterThread implements Runnable{
}
SyncData.setThreadlocalUpdateMissionIds(missionIds);
masterDb.batchUpdate(record.getStr("table_name"), "mission_id,seq_id",updateDatas, record.getInt("batch_size"));
- }else {
- if(record.getStr("table_name").equals("detection_info_new")) {
- for(Record updateData:updateDatas) {
- Record findFirst = masterDb.findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",updateData.get("DETECTION_SET_INFO_ID"),updateData.get("SEQ_ID"));
- updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name")));
- }
+ }else if(record.getStr("table_name").equals("detection_info_new")){
+ for(Record updateData:updateDatas) {
+ updateData.remove(record.getStr("id_name"));
}
- if(record.getStr("table_name").equals("loopmission_state_table")) {
- Set<Long> missionIds = SyncData.getThreadlocalLoopUpdateMissionIds();
- for(Record updateData:updateDatas) {
- missionIds.add(updateData.getLong(record.getStr("id_name")));
- }
- SyncData.setThreadlocalLoopUpdateMissionIds(missionIds);
+ masterDb.batchUpdate(record.getStr("table_name"), "DETECTION_SET_INFO_ID,SEQ_ID",updateDatas, record.getInt("batch_size"));
+
+ }else if(record.getStr("table_name").equals("loopmission_state_table")) {
+ Set<Long> missionIds = SyncData.getThreadlocalLoopUpdateMissionIds();
+ for(Record updateData:updateDatas) {
+ missionIds.add(updateData.getLong(record.getStr("id_name")));
}
+ SyncData.setThreadlocalLoopUpdateMissionIds(missionIds);
masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
}
- logger.info("分库同步主库修改数据任务完成");
} else if (record.getInt("event") == 3) {
- for (int i = 0; i < datas.size(); i++) {
+
+ String sql = "select target_id from table_event_log l "
+ + " where table_name = '"+record.getStr("table_name")+"' "
+ + " and event = "+record.getInt("event")
+ + " and id > "+record.getLong("last_id")+" limit " + record.getInt("batch_size");
+ List<Record> delList = slaveDb.find(sql);
+ for (int i = 0; i < delList.size(); i++) {
Record deleteRecord=new Record();
- deleteRecord.set(record.getStr("id_name"), datas.get(i).getLong("target_id"));
+ deleteRecord.set(record.getStr("id_name"), delList.get(i).getLong("target_id"));
//如果是针对 event_record_library 下两行数据使用 不是则仅仅赋值 无意义
- deleteRecord.set("old_id", datas.get(i).getLong("target_id"));
+ deleteRecord.set("old_id", delList.get(i).getLong("target_id"));
deleteRecord.set("db_id", syncDbInfo.get("id"));
deleteRecords.add(deleteRecord);
}
@@ -345,11 +407,9 @@ public class SyncSlaveToMasterThread implements Runnable{
}else {
masterDb.batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size"));
}
- logger.info("分库同步主库删除数据任务完成");
}
- Object lastUpdateId = datas.get(datas.size() - 1).get("id");
- logger.info("分库同步获取最后一次操作数据的数据ID信息为"+lastUpdateId);
- record.set("last_id", lastUpdateId);
+ logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastId +",num:" + num);
+ record.set("last_id", Long.valueOf(lastId));
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
return true;
@@ -357,20 +417,21 @@ public class SyncSlaveToMasterThread implements Runnable{
});
}
});
- logger.info("修改分库table_sync_info最后操作数据信息 用于下次同步操作完成");
} else {
flag = false;
}
}
+
}
+ sw.tag("e_" +record.getInt("event")+ record.getStr("table_name"));
+ logger.debug("分库同步表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" , 耗时 : " + sw.toString(sw.between("e_" +record.getInt("event")+ record.getStr("table_name"), "s_" +record.getInt("event")+ record.getStr("table_name")))+" ,totalnum : " + total);
}
}
- logger.info("分库数据同步主库结束");
- logger.info("##################################################");
+ sw.end();
+ logger.info(errorUrl + " 分库数据同步主库结束,耗时:"+sw.toString(sw.total()));
+ firstRun = false;//修改首次启动标志位
} catch (Exception e) {
- logger.error("分库同步主库数据连接为:"+errorUrl);
logger.error("分库同步主库数据当前操作的异常表名为:"+errorTableName);
- logger.error("分库数据同步主库发生错误 异常信息为:"+e.getMessage());
logger.error("分库数据同步主库发生错误 异常信息",e);
// 如果出现异常信息 清楚线程变量 不用进行统计
SyncData.removeThreadlocalUpdateMissionIds();
@@ -379,6 +440,7 @@ public class SyncSlaveToMasterThread implements Runnable{
SyncData.removeThreadlocalLoopInsertMissionIds();
e.printStackTrace();
}
+
}
public SyncDbInfo getSyncDbInfo() {
diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java
index 49647fc..4b64e6c 100644
--- a/nms_sync/src/com/nms/thread/SyncThread.java
+++ b/nms_sync/src/com/nms/thread/SyncThread.java
@@ -14,6 +14,7 @@ import com.jfinal.plugin.activerecord.Record;
import com.nms.interceptor.SyncDataInterceptor;
import com.nms.interceptor.SyncSocketInterceptor;
import com.nms.model.SyncDbInfo;
+import com.nms.util.StopWatch;
import com.jfinal.plugin.activerecord.tx.Tx;
/**
* 数据同步功能线程
@@ -37,7 +38,10 @@ public class SyncThread implements Runnable {
@Override
public void run() {
+ Thread.currentThread().setName("主库同步->" + syncDbInfo.getIp());
String errorTableName=null;
+ StopWatch sw = new StopWatch();
+ sw.start();
try {
logger.info("开始主库数据同步分库任务");
// 获取url路径
@@ -48,12 +52,16 @@ public class SyncThread implements Runnable {
final DbPro masterDb = Db.use("masterDataSource");
final DbPro slaveDb = Db.use(url.toString());
-
+ logger.debug("數據源獲取成功");
List<Record> find = masterDb.find("select * from table_sync_info where db_id=? ",
syncDbInfo.get("id"));
//logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find));
if (find != null && find.size() > 0) {
for (final Record record : find) {
+ logger.debug("主库同步表开始:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +",lastId:" + record.getStr("last_id"));
+ long total = 0;
+ //同步 表 開始
+ sw.tag("s_" +record.getInt("event")+ record.getStr("table_name"));
//logger.info("主库数据同步到分库 正在操作的表名为:"+ record.getStr("table_name"));
errorTableName=record.getStr("table_name");
//针对个别特殊表动态条件
@@ -270,13 +278,15 @@ public class SyncThread implements Runnable {
}
}
}
+ sw.tag("e_" +record.getInt("event")+ record.getStr("table_name"));
+ logger.debug("主库同步表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" , 耗时 : " + sw.toString(sw.between("e_" +record.getInt("event")+ record.getStr("table_name"), "s_" +record.getInt("event")+ record.getStr("table_name")))+" ,totalnum : " + total);
}
}
- logger.info("主库数据同步分库结束");
+ sw.end();
+ logger.info("主库数据同步分库结束,耗时:"+sw.toString(sw.total()));
logger.info("*****************************************************");
} catch (Exception e) {
logger.error("主库同步分库数据当前操作的异常表名为:"+errorTableName);
- logger.error("主库数据同步分库发生错误 异常信息为:"+e.getMessage());
logger.error("主库数据同步分库发生错误 异常信息",e);
e.printStackTrace();
}
diff --git a/nms_sync/src/com/nms/util/StopWatch.java b/nms_sync/src/com/nms/util/StopWatch.java
new file mode 100644
index 0000000..895f9ac
--- /dev/null
+++ b/nms_sync/src/com/nms/util/StopWatch.java
@@ -0,0 +1,164 @@
+package com.nms.util;
+
+import java.util.LinkedHashMap;
+
+/**
+ * 秒表计时器
+ * @author fang
+ *
+ */
+public class StopWatch {
+ private static final long SEC_MILL = 1000;
+ private static final long MIN_MILL = 60 * SEC_MILL;
+ private static final long HOUR_MILL = 60 * MIN_MILL;
+ private static final long DAY_MILL = 24 * HOUR_MILL;
+ private long start;
+ private long end;
+ private LinkedHashMap<String,Long> tagMap = new LinkedHashMap<String,Long>();
+
+ public StopWatch(){
+ start();
+ }
+
+ public static StopWatch newStopWacth(){
+ return new StopWatch();
+ }
+
+ /**
+ * 计时器开始
+ * @return
+ */
+ public long start(){
+ this.start = System.currentTimeMillis();
+ return start;
+ }
+
+ /**
+ * 计时器结束
+ * @return
+ */
+ public long end(){
+ this.end = System.currentTimeMillis();
+ return end;
+ }
+
+
+ public long tag(String tag){
+ long l = System.currentTimeMillis();
+ this.tagMap.put(tag, l);
+ return l;
+ }
+
+ /**
+ * 计算两个 tag 之间的时间差
+ * @param b
+ * @param a
+ * @return
+ */
+ public long between(String b,String a){
+ Long l1 = this.tagMap.get(b);
+ Long l2 = this.tagMap.get(a);
+ if(l1 != null && l2 != null){
+ return l1-l2;
+ }
+ return -1;
+ }
+
+ /**
+ * 兩個 tag之間的時間差
+ * @param b
+ * @param a
+ * @return
+ */
+ public String timeBetween(String b,String a) {
+ long between = between(b, a);
+ return toString(between);
+ }
+
+
+ public static String toString(long l){
+ StringBuilder sb = new StringBuilder();
+ if(l >= DAY_MILL){
+ sb.append((l/DAY_MILL));
+ sb.append( "天");
+ l = l % DAY_MILL;
+ }
+ if(l >= HOUR_MILL){
+ sb.append((l/HOUR_MILL));
+ sb.append( "小时");
+ l = l % HOUR_MILL;
+ }
+ if(l >= MIN_MILL){
+ sb.append((l/MIN_MILL));
+ sb.append( "分");
+ l = l % MIN_MILL;
+ }
+ if(l >= SEC_MILL){
+ sb.append((l/SEC_MILL));
+ sb.append( "秒");
+ l = l % SEC_MILL;
+ }
+
+ sb.append((l));
+ sb.append( "毫秒");
+
+ return sb.toString();
+ }
+
+ public String toString(){
+
+ return "";
+ }
+
+ /**
+ * 从开始到结束总耗时
+ * @return
+ */
+ public long total(){
+ long temp = System.currentTimeMillis();
+ if(this.end < this.start){
+ this.end = temp;
+ }
+ return end - start;
+ }
+
+
+ public void reset(){
+ this.tagMap.clear();
+ this.start();
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public void setStart(long start) {
+ this.start = start;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+
+ public LinkedHashMap<String, Long> getTag() {
+ return tagMap;
+ }
+
+ public void LinkedHashMap(LinkedHashMap<String, Long> tag) {
+ this.tagMap = tag;
+ }
+
+ public static void main(String[] args) {
+ long s = System.currentTimeMillis();
+ long end = s +2*DAY_MILL+ 12 * MIN_MILL + 30*SEC_MILL + 388;
+
+ String string = StopWatch.toString(end -s);
+ System.out.println(string);
+
+ }
+
+}