summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchenjinsong <[email protected]>2018-09-27 16:25:51 +0800
committerchenjinsong <[email protected]>2018-09-27 16:25:51 +0800
commit4929220ca98b643abaace509e80a4b73e5508a87 (patch)
treec70cc228a5e064f62031f9276dd5de946994373c
initial commit
-rw-r--r--nms_sync/.classpath14
-rw-r--r--nms_sync/.project17
-rw-r--r--nms_sync/.settings/org.eclipse.core.resources.prefs2
-rw-r--r--nms_sync/.settings/org.eclipse.jdt.core.prefs12
-rw-r--r--nms_sync/conf/config.properties4
-rw-r--r--nms_sync/conf/db.properties17
-rw-r--r--nms_sync/conf/log4j.properties13
-rw-r--r--nms_sync/lib/c3p0-0.9.1.2.jarbin0 -> 610790 bytes
-rw-r--r--nms_sync/lib/druid-1.0.29.jarbin0 -> 2148117 bytes
-rw-r--r--nms_sync/lib/fastjson-1.2.47.jarbin0 -> 546572 bytes
-rw-r--r--nms_sync/lib/jfinal-3.4-bin-with-src.jarbin0 -> 1525392 bytes
-rw-r--r--nms_sync/lib/log4j-1.2.15.jarbin0 -> 391834 bytes
-rw-r--r--nms_sync/lib/mysql-connector-java-5.1.0-bin.jarbin0 -> 566623 bytes
-rw-r--r--nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java117
-rw-r--r--nms_sync/src/com/nms/interceptor/SyncStoredProcedure.java51
-rw-r--r--nms_sync/src/com/nms/main/Conn.java40
-rw-r--r--nms_sync/src/com/nms/main/SyncData.java66
-rw-r--r--nms_sync/src/com/nms/model/SyncDbInfo.java11
-rw-r--r--nms_sync/src/com/nms/model/TableEventLog.java11
-rw-r--r--nms_sync/src/com/nms/model/TableSyncInfo.java11
-rw-r--r--nms_sync/src/com/nms/model/basemodel/BaseSyncDbInfo.java60
-rw-r--r--nms_sync/src/com/nms/model/basemodel/BaseTableEventLog.java52
-rw-r--r--nms_sync/src/com/nms/model/basemodel/BaseTableSyncInfo.java116
-rw-r--r--nms_sync/src/com/nms/test/TestClass.java304
-rw-r--r--nms_sync/src/com/nms/test/TestExecutors.java32
-rw-r--r--nms_sync/src/com/nms/test/TestThread.java37
-rw-r--r--nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java196
-rw-r--r--nms_sync/src/com/nms/thread/SyncThread.java216
-rw-r--r--nms_sync/src/com/nms/util/GeneratorUtil.java62
29 files changed, 1461 insertions, 0 deletions
diff --git a/nms_sync/.classpath b/nms_sync/.classpath
new file mode 100644
index 0000000..a9c6d9f
--- /dev/null
+++ b/nms_sync/.classpath
@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src"/>
+ <classpathentry kind="src" path="conf"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
+ <classpathentry kind="lib" path="lib/log4j-1.2.15.jar"/>
+ <classpathentry kind="lib" path="lib/mysql-connector-java-5.1.0-bin.jar"/>
+ <classpathentry kind="lib" path="lib/fastjson-1.2.47.jar"/>
+ <classpathentry kind="lib" path="lib/druid-1.0.29.jar"/>
+ <classpathentry kind="lib" path="lib/jfinal-3.4-bin-with-src.jar"/>
+ <classpathentry kind="lib" path="lib/c3p0-0.9.1.2.jar"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
diff --git a/nms_sync/.project b/nms_sync/.project
new file mode 100644
index 0000000..9f8cd42
--- /dev/null
+++ b/nms_sync/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>nms_sync</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
diff --git a/nms_sync/.settings/org.eclipse.core.resources.prefs b/nms_sync/.settings/org.eclipse.core.resources.prefs
new file mode 100644
index 0000000..99f26c0
--- /dev/null
+++ b/nms_sync/.settings/org.eclipse.core.resources.prefs
@@ -0,0 +1,2 @@
+eclipse.preferences.version=1
+encoding/<project>=UTF-8
diff --git a/nms_sync/.settings/org.eclipse.jdt.core.prefs b/nms_sync/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..d17b672
--- /dev/null
+++ b/nms_sync/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,12 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
+org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
+org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
+org.eclipse.jdt.core.compiler.compliance=1.7
+org.eclipse.jdt.core.compiler.debug.lineNumber=generate
+org.eclipse.jdt.core.compiler.debug.localVariable=generate
+org.eclipse.jdt.core.compiler.debug.sourceFile=generate
+org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
+org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
+org.eclipse.jdt.core.compiler.source=1.7
diff --git a/nms_sync/conf/config.properties b/nms_sync/conf/config.properties
new file mode 100644
index 0000000..c32ace2
--- /dev/null
+++ b/nms_sync/conf/config.properties
@@ -0,0 +1,4 @@
+#\u4e3b\u5e93\u540c\u6b65\u5206\u5e93\u6570\u636e\u7ebf\u7a0b\u65f6\u95f4\u5dee
+syncMaterToSlaveTime=30000
+#\u4e3b\u5e93\u540c\u6b65\u5206\u5e93\u6570\u636e\u7ebf\u7a0b\u65f6\u95f4\u5dee
+syncSlaveToMaterTime=60000 \ No newline at end of file
diff --git a/nms_sync/conf/db.properties b/nms_sync/conf/db.properties
new file mode 100644
index 0000000..f9730b4
--- /dev/null
+++ b/nms_sync/conf/db.properties
@@ -0,0 +1,17 @@
+#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
+#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740
+dburl=jdbc:mysql://10.0.6.247:3306/nms-dev
+#\u6570\u636e\u5e93\u8d26\u6237\u540d
+dbusername=root
+#\u6570\u636e\u5e93\u5bc6\u7801
+dbpassword=111111
+#\u6570\u636e\u5e93\u540d\u79f0
+dbname=nms-dev
+#\u8fde\u63a5\u6c60\u521d\u59cb\u5316\u5927\u5c0f
+dbInitialSize=1
+#\u6700\u5927\u8fde\u63a5\u6570
+dbMaxActive=2
+#\u6700\u5c0f\u8fde\u63a5\u6570
+dbMinIdle=1
+#\u6700\u5927\u7b49\u5f85\u8fde\u63a5\u65f6\u95f4
+dbMaxWait=60000 \ No newline at end of file
diff --git a/nms_sync/conf/log4j.properties b/nms_sync/conf/log4j.properties
new file mode 100644
index 0000000..10f0a91
--- /dev/null
+++ b/nms_sync/conf/log4j.properties
@@ -0,0 +1,13 @@
+log4j.rootLogger=DEBUG, stdout, file
+#log4j.rootLogger=ERROR, stdout, file
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%n%-d{yyyy-MM-dd HH:mm:ss}%n[%p]-[Thread: %t]-[%C.%M()]: %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d %p (%F:%L) [%t] - <%m>%n
+
+# Output to the File
+#log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+#log4j.appender.file.DatePattern='_'yyyy-MM-dd'.log'
+#log4j.appender.file.File=./sync.log
+#log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%n%-d{yyyy-MM-dd HH:mm:ss}%n[%p]-[Thread: %t]-[%C.%M()]: %m%n \ No newline at end of file
diff --git a/nms_sync/lib/c3p0-0.9.1.2.jar b/nms_sync/lib/c3p0-0.9.1.2.jar
new file mode 100644
index 0000000..0f42d60
--- /dev/null
+++ b/nms_sync/lib/c3p0-0.9.1.2.jar
Binary files differ
diff --git a/nms_sync/lib/druid-1.0.29.jar b/nms_sync/lib/druid-1.0.29.jar
new file mode 100644
index 0000000..9278cc3
--- /dev/null
+++ b/nms_sync/lib/druid-1.0.29.jar
Binary files differ
diff --git a/nms_sync/lib/fastjson-1.2.47.jar b/nms_sync/lib/fastjson-1.2.47.jar
new file mode 100644
index 0000000..f342bca
--- /dev/null
+++ b/nms_sync/lib/fastjson-1.2.47.jar
Binary files differ
diff --git a/nms_sync/lib/jfinal-3.4-bin-with-src.jar b/nms_sync/lib/jfinal-3.4-bin-with-src.jar
new file mode 100644
index 0000000..e0f28c5
--- /dev/null
+++ b/nms_sync/lib/jfinal-3.4-bin-with-src.jar
Binary files differ
diff --git a/nms_sync/lib/log4j-1.2.15.jar b/nms_sync/lib/log4j-1.2.15.jar
new file mode 100644
index 0000000..c930a6a
--- /dev/null
+++ b/nms_sync/lib/log4j-1.2.15.jar
Binary files differ
diff --git a/nms_sync/lib/mysql-connector-java-5.1.0-bin.jar b/nms_sync/lib/mysql-connector-java-5.1.0-bin.jar
new file mode 100644
index 0000000..d848c13
--- /dev/null
+++ b/nms_sync/lib/mysql-connector-java-5.1.0-bin.jar
Binary files differ
diff --git a/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java b/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java
new file mode 100644
index 0000000..d1c4943
--- /dev/null
+++ b/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java
@@ -0,0 +1,117 @@
+package com.nms.interceptor;
+
+
+import java.util.List;
+import org.apache.log4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+import com.jfinal.aop.Interceptor;
+import com.jfinal.aop.Invocation;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.activerecord.Record;
+import com.nms.model.SyncDbInfo;
+import com.nms.thread.SyncThread;
+
+public class SyncDataInterceptor implements Interceptor{
+ private Logger logger =Logger.getLogger(this.getClass());
+
+ @Override
+ public void intercept(Invocation inv) {
+ try{
+ logger.info("--------数据同步前 syncDataInterceptor拦截器拦截------------");
+ SyncThread target = inv.getTarget();
+ SyncDbInfo syncDbInfo = target.getSyncDbInfo();
+ String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ + syncDbInfo.get("database_name");
+ logger.info("当前数据库连接为 "+url);
+ //同步数据前 取出metadata表中最后一次同步的id信息 获取新增的数据信息 方便接下来修改表结构
+ Record metadataTableSyncInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId());
+ logger.info("获取metadata表中最后一次同步id的数据信息为 "+JSON.toJSONString(metadataTableSyncInfo));
+ //开始执行同步过程
+ inv.invoke();
+ //处理同步数据结束
+
+ //判断metadata表 是否有新增数据 如果有执行sql 修改表结构
+ if(metadataTableSyncInfo!=null){
+ List<Record> metadatas = Db.use(url).find("select m.*,cti.table_name from metadata m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",metadataTableSyncInfo.getLong("last_id"));
+ logger.info("metadata表中新增数据信息查询结果为 "+JSON.toJSONString(metadatas));
+ if(metadatas!=null && metadatas.size()>0){
+ for(Record metadata:metadatas){
+ Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",metadata.getStr("table_name"));
+ logger.info("判断metadata表新增数据是修改还是新增表操作结果为"+JSON.toJSONString(isExist));
+ //向数据库中添加新的字段
+ if(isExist.getInt("count")>0){
+ StringBuffer sqlString = new StringBuffer("alter table ");
+ sqlString.append(metadata.getStr("table_name").toUpperCase());
+ sqlString.append(" add(");
+ sqlString.append(metadata.getStr("filed_name")+" "+ toMysqlType(metadata.getStr("filed_type"))+")");
+ logger.info("修改metadata表结构 sql语句为"+sqlString.toString());
+ //执行添加字段
+ int resu =Db.use(url).update(sqlString.toString());
+ logger.info("修改表结构结果为 "+resu);
+ }
+ }
+ }
+ }
+ //判断check_type_info表 是否有新增数据 如果有执行存储过程 创建新表
+ List<Record> checkTypeInfos = Db.use(url).find(" select * from check_type_info where crete_state=0 ");
+ for(Record checkTypeInfo : checkTypeInfos){
+ //判断表是否存在
+ Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",checkTypeInfo.getStr("TABLE_NAME"));
+ logger.info("check_type_info表中有新增数据 判断表是否创建"+JSON.toJSONString(isExist));
+ if(isExist.getInt("count")>0){
+ continue;
+ }else{
+ //创建表数据
+ String filedName ="";
+ String filedType ="";
+ StringBuffer sql= new StringBuffer();
+ StringBuffer cIndexFileds = new StringBuffer("data_check_time:seq_id:detection_set_info_id:");
+ List<Record> metadatas2 = Db.use(url).find("select * from metadata where 1=1 and check_type_id=? and state = '0' order by show_num asc",checkTypeInfo.getLong("ID"));
+ if(metadatas2!=null && metadatas2.size()>0) {
+ for(int i=0;i<metadatas2.size();i++){
+ filedName = metadatas2.get(i).getStr("filed_name");
+ sql.append(filedName+" ");
+ filedType = metadatas2.get(i).getStr("filed_type");
+ if(i != metadatas2.size()-1){
+ sql.append(toMysqlType(filedType)+",");
+ }else{
+ sql.append(toMysqlType(filedType));
+ }
+ //判断是否为统计
+ if(metadatas2.get(i).getStr("chart_state").equals("0")){
+ cIndexFileds.append(metadatas2.get(i).getStr("chart_state")+":");
+ }
+ }
+ }
+ logger.info("check_type_info新增数据创建表结构 参数信息 调用存储过程名称 pro_createTable");
+ logger.info("check_type_info新增数据创建表结构 参数信息 表名"+checkTypeInfo.getStr("TABLE_NAME"));
+ logger.info("check_type_info新增数据创建表结构 参数信息 sql"+sql.toString());
+ logger.info("check_type_info新增数据创建表结构 参数信息 字段名称"+cIndexFileds.toString());
+ SyncStoredProcedure syncStoreProcedure=new SyncStoredProcedure("pro_createTable",checkTypeInfo.getStr("TABLE_NAME"), sql.toString(),cIndexFileds.toString());
+ Db.use(url).execute(syncStoreProcedure);
+ logger.info("创建新表操作完成");
+ }
+ }
+
+ logger.info("--------数据同步前 syncDataInterceptor拦截器拦截结束------------");
+ }catch(Exception e){
+ e.printStackTrace();
+ logger.info("syncDataInterceptor拦截器内部程序出现异常信息"+e.getMessage());
+ }
+ }
+
+
+ private static String toMysqlType(String type){
+ type = type.trim().toLowerCase();
+ if(type.startsWith("date")){
+ type = type.replaceAll("date", "datetime");
+ }else if(type.startsWith("number")){
+ type = type.replaceAll("number", "bigint");
+ }else if(type.startsWith("varchar")){
+ type = type.replaceAll("varchar2", "varchar");
+ }
+ return type;
+ }
+
+}
diff --git a/nms_sync/src/com/nms/interceptor/SyncStoredProcedure.java b/nms_sync/src/com/nms/interceptor/SyncStoredProcedure.java
new file mode 100644
index 0000000..2389a57
--- /dev/null
+++ b/nms_sync/src/com/nms/interceptor/SyncStoredProcedure.java
@@ -0,0 +1,51 @@
+package com.nms.interceptor;
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.log4j.Logger;
+
+import com.jfinal.plugin.activerecord.ICallback;
+
+public class SyncStoredProcedure implements ICallback{
+ private Logger logger=Logger.getLogger(this.getClass());
+ private String proName;
+ private String tableName;
+ private String filedAndType;
+ private String fileds;
+
+ public SyncStoredProcedure(String proName, String tableName, String filedAndType, String fileds) {
+ super();
+ this.proName = proName;
+ this.tableName = tableName;
+ this.filedAndType = filedAndType;
+ this.fileds = fileds;
+ }
+
+ @Override
+ public Object call(Connection conn) throws SQLException {
+ logger.info("开始调用存储过程任务");
+ CallableStatement proc=null;
+ try{
+ proc=conn.prepareCall("{call "+proName+"(?,?,?)}");
+ proc.setString(1,tableName);
+ proc.setString(2,filedAndType);
+ proc.setString(3, fileds);
+ proc.execute();
+ logger.info("调用存储过程任务结束");
+ } catch (Exception e){
+ logger.info("调用存储过程任务出现错误 存储过程名称"+proName+" 表名"+tableName+"参数 "+filedAndType+"------"+fileds);
+ e.printStackTrace();
+ } finally{
+ if(conn!=null){
+ conn.close();
+ }
+ if(proc!=null){
+ proc.close();
+ }
+ }
+ return null;
+ }
+}
+
diff --git a/nms_sync/src/com/nms/main/Conn.java b/nms_sync/src/com/nms/main/Conn.java
new file mode 100644
index 0000000..f9c0151
--- /dev/null
+++ b/nms_sync/src/com/nms/main/Conn.java
@@ -0,0 +1,40 @@
+package com.nms.main;
+
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import com.jfinal.kit.PropKit;
+import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
+import com.jfinal.plugin.druid.DruidPlugin;
+import com.nms.model.SyncDbInfo;
+
+/**
+ * 数据库连接相关
+ * @author Administrator
+ *
+ */
+public class Conn {
+ private Conn(){}
+
+ //初始化多数据源数据库连接
+ public static void createConn(List<SyncDbInfo> syncDbInfos){
+ Logger logger = Logger.getLogger(Conn.class);
+ logger.info("开始创建各分库数据库的连接池");
+ for (SyncDbInfo syncDbInfo : syncDbInfos) {
+ String url="jdbc:mysql://"+syncDbInfo.get("ip")+":"+syncDbInfo.get("port")+"/"+syncDbInfo.get("database_name");
+ logger.info("当前创建数据库连接信息为"+url);
+ //初始化各数据源插件
+ DruidPlugin druid=new DruidPlugin(url,(String)syncDbInfo.get("user"),(String)syncDbInfo.get("password"));
+ druid.setInitialSize(Integer.valueOf(PropKit.get("dbInitialSize")));
+ druid.setMaxActive(Integer.valueOf(PropKit.get("dbMaxActive")));
+ druid.setMinIdle(Integer.valueOf(PropKit.get("dbMinIdle")));
+ druid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait")));
+ ActiveRecordPlugin arp=new ActiveRecordPlugin(url,druid);
+ arp.setShowSql(true);
+ druid.start();
+ arp.start();
+ }
+ logger.info("创建各分库数据库的连接池完成");
+ }
+}
diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java
new file mode 100644
index 0000000..d78727c
--- /dev/null
+++ b/nms_sync/src/com/nms/main/SyncData.java
@@ -0,0 +1,66 @@
+package com.nms.main;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.jfinal.aop.Duang;
+import com.jfinal.kit.PropKit;
+import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.druid.DruidPlugin;
+import com.nms.model.SyncDbInfo;
+import com.nms.thread.SyncSlaveToMasterThread;
+import com.nms.thread.SyncThread;
+/**
+ * 数据同步主功能 相当于主动推送操作
+ * @author Administrator
+ *
+ */
+public class SyncData{
+ public static void main(String[] args) {
+ Logger logger = Logger.getLogger(SyncData.class);
+ logger.info("同步程序开始启动");
+ //从配置文件获取数据库连接信息
+ PropKit.use("db.properties");
+ //创建主数据库数据源
+ DruidPlugin masterDruid=new DruidPlugin(PropKit.get("dburl"),PropKit.get("dbusername"),PropKit.get("dbpassword"));
+ masterDruid.setInitialSize(Integer.valueOf(PropKit.get("dbInitialSize")));
+ masterDruid.setMaxActive(Integer.valueOf(PropKit.get("dbMaxActive")));
+ masterDruid.setMinIdle(Integer.valueOf(PropKit.get("dbMinIdle")));
+ masterDruid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait")));
+ ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid);
+ masterArp.setShowSql(true);
+ masterDruid.start();
+ masterArp.start();
+ logger.info("加载配置文件 设置当前同步 masterDataSource 完成");
+ List<SyncDbInfo> syncDbInfos = SyncDbInfo.dao.use("masterDataSource").find("select * from sync_db_info");
+ logger.info("数据库获取其它分库 数据库连接信息"+JSON.toJSONString(syncDbInfos));
+ if(syncDbInfos!=null&&syncDbInfos.size()>0){
+ //创建其它数据源的连接
+ Conn.createConn(syncDbInfos);
+ logger.info("分库数据库连接池创建完成");
+ // 定时周期执行线程池 用于周期执行线程的运行
+ ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(syncDbInfos.size());
+ logger.info("创建线程池完毕 数量大小为"+syncDbInfos.size());
+ // 使用scheduleWithFixedDleay在上一个线程任务执行完成后 5分钟执行下一次任务
+ for(SyncDbInfo syncDbInfo : syncDbInfos){
+ // 主库向分库同步数据
+ SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo));
+ logger.info("创建主库同步分库线程执行任务");
+ scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
+ // 分库向主库同步数据
+ logger.info("创建分库数据同步到主库线程执行任务");
+ //scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
+ }
+ }else{
+ logger.info("获取同步记录信息失败 请检查数据库数据信息");
+ }
+ }
+}
diff --git a/nms_sync/src/com/nms/model/SyncDbInfo.java b/nms_sync/src/com/nms/model/SyncDbInfo.java
new file mode 100644
index 0000000..dbd2ed0
--- /dev/null
+++ b/nms_sync/src/com/nms/model/SyncDbInfo.java
@@ -0,0 +1,11 @@
+package com.nms.model;
+
+import com.nms.model.basemodel.BaseSyncDbInfo;
+
+/**
+ * Generated by JFinal.
+ */
+@SuppressWarnings("serial")
+public class SyncDbInfo extends BaseSyncDbInfo<SyncDbInfo> {
+ public static final SyncDbInfo dao = new SyncDbInfo().dao();
+}
diff --git a/nms_sync/src/com/nms/model/TableEventLog.java b/nms_sync/src/com/nms/model/TableEventLog.java
new file mode 100644
index 0000000..d7ee628
--- /dev/null
+++ b/nms_sync/src/com/nms/model/TableEventLog.java
@@ -0,0 +1,11 @@
+package com.nms.model;
+
+import com.nms.model.basemodel.BaseTableEventLog;
+
+/**
+ * Generated by JFinal.
+ */
+@SuppressWarnings("serial")
+public class TableEventLog extends BaseTableEventLog<TableEventLog> {
+ public static final TableEventLog dao = new TableEventLog().dao();
+}
diff --git a/nms_sync/src/com/nms/model/TableSyncInfo.java b/nms_sync/src/com/nms/model/TableSyncInfo.java
new file mode 100644
index 0000000..0c7f9e2
--- /dev/null
+++ b/nms_sync/src/com/nms/model/TableSyncInfo.java
@@ -0,0 +1,11 @@
+package com.nms.model;
+
+import com.nms.model.basemodel.BaseTableSyncInfo;
+
+/**
+ * Generated by JFinal.
+ */
+@SuppressWarnings("serial")
+public class TableSyncInfo extends BaseTableSyncInfo<TableSyncInfo> {
+ public static final TableSyncInfo dao = new TableSyncInfo().dao();
+}
diff --git a/nms_sync/src/com/nms/model/basemodel/BaseSyncDbInfo.java b/nms_sync/src/com/nms/model/basemodel/BaseSyncDbInfo.java
new file mode 100644
index 0000000..538137b
--- /dev/null
+++ b/nms_sync/src/com/nms/model/basemodel/BaseSyncDbInfo.java
@@ -0,0 +1,60 @@
+package com.nms.model.basemodel;
+
+import com.jfinal.plugin.activerecord.Model;
+import com.jfinal.plugin.activerecord.IBean;
+
+/**
+ * Generated by JFinal, do not modify this file.
+ */
+@SuppressWarnings("serial")
+public abstract class BaseSyncDbInfo<M extends BaseSyncDbInfo<M>> extends Model<M> implements IBean {
+
+ public void setId(java.lang.Long id) {
+ set("id", id);
+ }
+
+ public java.lang.Long getId() {
+ return getLong("id");
+ }
+
+ public void setIp(java.lang.String ip) {
+ set("ip", ip);
+ }
+
+ public java.lang.String getIp() {
+ return getStr("ip");
+ }
+
+ public void setPort(java.lang.Integer port) {
+ set("port", port);
+ }
+
+ public java.lang.Integer getPort() {
+ return getInt("port");
+ }
+
+ public void setDatabaseName(java.lang.String databaseName) {
+ set("database_name", databaseName);
+ }
+
+ public java.lang.String getDatabaseName() {
+ return getStr("database_name");
+ }
+
+ public void setUser(java.lang.String user) {
+ set("user", user);
+ }
+
+ public java.lang.String getUser() {
+ return getStr("user");
+ }
+
+ public void setPassword(java.lang.String password) {
+ set("password", password);
+ }
+
+ public java.lang.String getPassword() {
+ return getStr("password");
+ }
+
+}
diff --git a/nms_sync/src/com/nms/model/basemodel/BaseTableEventLog.java b/nms_sync/src/com/nms/model/basemodel/BaseTableEventLog.java
new file mode 100644
index 0000000..61f1f9d
--- /dev/null
+++ b/nms_sync/src/com/nms/model/basemodel/BaseTableEventLog.java
@@ -0,0 +1,52 @@
+package com.nms.model.basemodel;
+
+import com.jfinal.plugin.activerecord.Model;
+import com.jfinal.plugin.activerecord.IBean;
+
+/**
+ * Generated by JFinal, do not modify this file.
+ */
+@SuppressWarnings("serial")
+public abstract class BaseTableEventLog<M extends BaseTableEventLog<M>> extends Model<M> implements IBean {
+
+ public void setId(java.lang.Long id) {
+ set("id", id);
+ }
+
+ public java.lang.Long getId() {
+ return getLong("id");
+ }
+
+ public void setTable(java.lang.String table) {
+ set("table", table);
+ }
+
+ public java.lang.String getTable() {
+ return getStr("table");
+ }
+
+ public void setEvent(java.lang.Integer event) {
+ set("event", event);
+ }
+
+ public java.lang.Integer getEvent() {
+ return getInt("event");
+ }
+
+ public void setTargetId(java.lang.Long targetId) {
+ set("target_id", targetId);
+ }
+
+ public java.lang.Long getTargetId() {
+ return getLong("target_id");
+ }
+
+ public void setIds(java.lang.String ids) {
+ set("ids", ids);
+ }
+
+ public java.lang.String getIds() {
+ return getStr("ids");
+ }
+
+}
diff --git a/nms_sync/src/com/nms/model/basemodel/BaseTableSyncInfo.java b/nms_sync/src/com/nms/model/basemodel/BaseTableSyncInfo.java
new file mode 100644
index 0000000..4557803
--- /dev/null
+++ b/nms_sync/src/com/nms/model/basemodel/BaseTableSyncInfo.java
@@ -0,0 +1,116 @@
+package com.nms.model.basemodel;
+
+import com.jfinal.plugin.activerecord.Model;
+import com.jfinal.plugin.activerecord.IBean;
+
+/**
+ * Generated by JFinal, do not modify this file.
+ */
+@SuppressWarnings("serial")
+public abstract class BaseTableSyncInfo<M extends BaseTableSyncInfo<M>> extends Model<M> implements IBean {
+
+ public void setId(java.lang.Long id) {
+ set("id", id);
+ }
+
+ public java.lang.Long getId() {
+ return getLong("id");
+ }
+
+ public void setTableName(java.lang.String tableName) {
+ set("table_name", tableName);
+ }
+
+ public java.lang.String getTableName() {
+ return getStr("table_name");
+ }
+
+ public void setEvent(java.lang.Integer event) {
+ set("event", event);
+ }
+
+ public java.lang.Integer getEvent() {
+ return getInt("event");
+ }
+
+ public void setIdName(java.lang.String idName) {
+ set("id_name", idName);
+ }
+
+ public java.lang.String getIdName() {
+ return getStr("id_name");
+ }
+
+ public void setIdNames(java.lang.String idNames) {
+ set("id_names", idNames);
+ }
+
+ public java.lang.String getIdNames() {
+ return getStr("id_names");
+ }
+
+ public void setColumns(java.lang.String columns) {
+ set("columns", columns);
+ }
+
+ public java.lang.String getColumns() {
+ return getStr("columns");
+ }
+
+ public void setWhere(java.lang.String where) {
+ set("where", where);
+ }
+
+ public java.lang.String getWhere() {
+ return getStr("where");
+ }
+
+ public void setLastId(java.lang.Long lastId) {
+ set("last_Id", lastId);
+ }
+
+ public java.lang.Long getLastId() {
+ return getLong("last_Id");
+ }
+
+ public void setLastDate(java.util.Date lastDate) {
+ set("last_date", lastDate);
+ }
+
+ public java.util.Date getLastDate() {
+ return get("last_date");
+ }
+
+ public void setDbId(java.lang.Long dbId) {
+ set("db_id", dbId);
+ }
+
+ public java.lang.Long getDbId() {
+ return getLong("db_id");
+ }
+
+ public void setMode(java.lang.Integer mode) {
+ set("mode", mode);
+ }
+
+ public java.lang.Integer getMode() {
+ return getInt("mode");
+ }
+
+ public void setBatchSize(java.lang.Integer batchSize) {
+ set("batch_size", batchSize);
+ }
+
+ public java.lang.Integer getBatchSize() {
+ return getInt("batch_size");
+ }
+
+ public void setInterceptor(java.lang.String interceptor) {
+ set("interceptor", interceptor);
+ }
+
+ public java.lang.String getInterceptor() {
+ return getStr("interceptor");
+ }
+
+}
diff --git a/nms_sync/src/com/nms/test/TestClass.java b/nms_sync/src/com/nms/test/TestClass.java
new file mode 100644
index 0000000..c9190ed
--- /dev/null
+++ b/nms_sync/src/com/nms/test/TestClass.java
@@ -0,0 +1,304 @@
+package com.nms.test;
+
+import java.sql.CallableStatement;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import com.alibaba.fastjson.JSON;
+import com.jfinal.kit.PropKit;
+import com.jfinal.kit.StrKit;
+import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.activerecord.ICallback;
+import com.jfinal.plugin.activerecord.Record;
+import com.jfinal.plugin.c3p0.C3p0Plugin;
+import com.jfinal.plugin.druid.DruidPlugin;
+import com.mysql.jdbc.Connection;
+import com.mysql.jdbc.PreparedStatement;
+import com.mysql.jdbc.Statement;
+import com.nms.main.Conn;
+
+public class TestClass {
+ private static Connection getConnection(){
+ String driver ="com.mysql.jdbc.Driver";
+ String url="jdbc:mysql://localhost:3306/nms_sync";
+ String username="root";
+ String password="root";
+ Connection conn=null;
+ try{
+ Class.forName(driver);
+ conn = (Connection) DriverManager.getConnection(url, username, password);
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+ return conn;
+ }
+
+ private static Connection getNmsConnection(){
+ String driver ="com.mysql.jdbc.Driver";
+ String url="jdbc:mysql://10.0.6.247:3306/nms";
+ String username="nms";
+ String password="nms";
+ Connection conn=null;
+ try{
+ Class.forName(driver);
+ conn = (Connection) DriverManager.getConnection(url, username, password);
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+ return conn;
+ }
+
+ @Test
+ public void testA(){
+ Connection conn = getConnection();
+ System.out.println(conn);
+ PreparedStatement pstmt=null;
+ try {
+ pstmt=(PreparedStatement) conn.prepareStatement("insert into sync_db_info (ip,port,database_name) values (?,?,?)");
+ pstmt.setString(1, "10.0.6.247");
+ pstmt.setInt(2, 8080);
+ pstmt.setString(3,"nms");
+ int id = pstmt.executeUpdate();
+ System.out.println(id);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }finally{
+ try {
+ if(pstmt!=null){
+ pstmt.close();
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Test
+ public void importData(){
+ Connection nmsConn = getNmsConnection();
+ Connection conn = getConnection();
+ PreparedStatement pstmt=null;
+ PreparedStatement pstmt2=null;
+ try {
+ pstmt=(PreparedStatement)nmsConn.prepareStatement("select table_name as tn from information_schema.tables where TABLE_SCHEMA='nms'");
+ ResultSet resultSet = pstmt.executeQuery();
+ pstmt2=(PreparedStatement)conn.prepareStatement("insert into table_sync_info (table_name,event,last_id,last_date,db_id,mode) values (?,?,?,?,?,?)");
+ while(resultSet.next()){
+ String tableName = resultSet.getString("tn");
+
+ pstmt2.setString(1, tableName);
+ pstmt2.setInt(2, 1);
+ pstmt2.setInt(3, 1);
+ pstmt2.setDate(4,new Date(System.currentTimeMillis()));
+ pstmt2.setInt(5, 1);
+ pstmt2.setInt(6,1);
+ pstmt2.addBatch();
+
+ pstmt2.setString(1, tableName);
+ pstmt2.setInt(2, 2);
+ pstmt2.setInt(3, 1);
+ pstmt2.setDate(4,new Date(System.currentTimeMillis()));
+ pstmt2.setInt(5, 1);
+ pstmt2.setInt(6,1);
+ pstmt2.addBatch();
+
+ pstmt2.setString(1, tableName);
+ pstmt2.setInt(2, 3);
+ pstmt2.setInt(3, 1);
+ pstmt2.setDate(4,new Date(System.currentTimeMillis()));
+ pstmt2.setInt(5, 1);
+ pstmt2.setInt(6,1);
+ pstmt2.addBatch();
+ }
+ pstmt2.executeBatch();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }finally{
+ try {
+ if(pstmt!=null){
+ pstmt.close();
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ try {
+ if(pstmt2!=null){
+ pstmt2.close();
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+ @Test
+ public void testDruid(){
+ C3p0Plugin c3p0=new C3p0Plugin("jdbc:mysql://localhost:3306/nms_sync","root","root");
+ c3p0.setInitialPoolSize(1);
+ c3p0.setMaxIdleTime(30);
+ c3p0.setMaxPoolSize(2);
+ c3p0.setMinPoolSize(1);
+ ActiveRecordPlugin arp=new ActiveRecordPlugin("c3p0",c3p0);
+
+ c3p0.start();
+ arp.start();
+
+ C3p0Plugin c3p02=new C3p0Plugin("jdbc:mysql://10.0.6.247:3306/nms","nms","nms");
+ c3p02.setInitialPoolSize(1);
+ c3p02.setMaxIdleTime(30);
+ c3p02.setMaxPoolSize(2);
+ c3p02.setMinPoolSize(1);
+ ActiveRecordPlugin arp2=new ActiveRecordPlugin("c3p02",c3p02);
+
+ c3p02.start();
+ arp2.start();
+
+ DruidPlugin druid=new DruidPlugin("jdbc:mysql://10.0.6.247:3306/nms-dev","nms","nms");
+ druid.setInitialSize(1);
+ druid.setMaxActive(2);
+ druid.setMinIdle(1);
+ druid.setMaxWait(60000);
+ ActiveRecordPlugin arp3=new ActiveRecordPlugin("druid",druid);
+
+ druid.start();
+ arp3.start();
+
+ List<Record> find = Db.find("select * from table_sync_info");
+ for (Record record : find) {
+ System.out.println(record);
+ }
+
+ System.out.println("----------------------------");
+
+ List<Record> find2 = Db.use("c3p02").find("select * from node_table");
+ for (Record record : find2) {
+ System.out.println(record);
+ }
+
+
+ System.out.println("----------------------------");
+ List<Record> find3 = Db.use("druid").find("select * from node_table");
+ for (Record record : find3) {
+ System.out.println(record);
+ }
+
+ }
+
+ @Test
+ public void testGetDataSize(){
+ DruidPlugin druid=new DruidPlugin("jdbc:mysql://10.0.6.247:3306/nms","nms","nms");
+ druid.setInitialSize(1);
+ druid.setMaxActive(2);
+ druid.setMinIdle(1);
+ druid.setMaxWait(60000);
+ ActiveRecordPlugin arp=new ActiveRecordPlugin(druid);
+ druid.start();
+ arp.start();
+ List<Record> find = Db.find("select count(*) size from detect_info_cpu");
+ System.out.println(find.get(0).getInt("size"));
+ }
+
+ @Test
+ public void testBatchDelete(){
+ PropKit.use("db.properties");
+ DruidPlugin masterDruid=new DruidPlugin(PropKit.get("dburl"),PropKit.get("dbusername"),PropKit.get("dbpassword"));
+ masterDruid.setInitialSize(1);
+ masterDruid.setMaxActive(2);
+ masterDruid.setMinIdle(1);
+ masterDruid.setMaxWait(600000);
+ ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid);
+ masterArp.setShowSql(true);
+ masterDruid.start();
+ masterArp.start();
+ List<Integer> ids=new ArrayList<Integer>();
+ ids.add(100026);
+ ids.add(100027);
+ List<Record> find = Db.find("select * from xt_yh_js_index where id in (?,?)",ids.toArray());
+ System.out.println(JSON.toJSON(find));
+ Object[] array = ids.toArray();
+ System.out.println(array);
+ }
+
+ @Test
+ public void testICallBack(){
+ DruidPlugin druid=new DruidPlugin("jdbc:mysql://10.0.6.247:3306/nms-synctest2","nms","nms");
+ druid.setInitialSize(1);
+ druid.setMaxActive(2);
+ druid.setMinIdle(1);
+ druid.setMaxWait(60000);
+ ActiveRecordPlugin arp3=new ActiveRecordPlugin("druid",druid);
+
+ druid.start();
+ arp3.start();
+ Db.execute(new ICallback(){
+ @Override
+ public Object call(java.sql.Connection conn) throws SQLException {
+ CallableStatement proc=null;
+ try{
+ proc=conn.prepareCall("{call pro_createTable(?,?,?)}");
+ proc.setString(1,"di_thtest");
+ proc.setString(2,"age bigint,name varchar(11)");
+ proc.setString(3, "data_check_time:seq_id:detection_set_info_id:");
+ proc.execute();
+ } catch (Exception e){
+ e.printStackTrace();
+ } finally{
+ if(conn!=null){
+ conn.close();
+ }
+ if(proc!=null){
+ proc.close();
+ }
+ }
+ return null;
+ }
+ });
+ }
+
+
+ @Test
+ public void addEventRecordLibraryData(){
+ PropKit.use("db.properties");
+ DruidPlugin masterDruid=new DruidPlugin(PropKit.get("dburl"),PropKit.get("dbusername"),PropKit.get("dbpassword"));
+ masterDruid.setInitialSize(1);
+ masterDruid.setMaxActive(2);
+ masterDruid.setMinIdle(1);
+ masterDruid.setMaxWait(600000);
+ ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid);
+ masterArp.setShowSql(true);
+ masterDruid.start();
+ masterArp.start();
+
+ List<Record> find = Db.find("select * from event_record_library");
+ List<Record> datas=new ArrayList<Record>();
+ for(Record data:find){
+ Record record=new Record();
+ record.set("table_name", "event_record_library");
+ record.set("event", 1);
+ record.set("target_id", data.getLong("id"));
+ datas.add(record);
+ }
+ Db.batchSave("table_event_log", datas, 500);
+ }
+}
diff --git a/nms_sync/src/com/nms/test/TestExecutors.java b/nms_sync/src/com/nms/test/TestExecutors.java
new file mode 100644
index 0000000..30a7c0d
--- /dev/null
+++ b/nms_sync/src/com/nms/test/TestExecutors.java
@@ -0,0 +1,32 @@
+package com.nms.test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.junit.Test;
+
+public class TestExecutors {
+ @Test
+ public void test1(){
+ ExecutorService service = Executors.newFixedThreadPool(5);
+ for(int i=0;i<6;i++){
+ service.execute(new TestThread(i));
+ }
+ }
+
+
+ class TestThread implements Runnable{
+ private int index;
+ public TestThread(int index){
+ this.index=index;
+ }
+
+ @Override
+ public void run(){
+ if(index==3){
+ throw new RuntimeException("error");
+ }
+ System.out.println("test"+index);
+ }
+ }
+}
diff --git a/nms_sync/src/com/nms/test/TestThread.java b/nms_sync/src/com/nms/test/TestThread.java
new file mode 100644
index 0000000..061bc72
--- /dev/null
+++ b/nms_sync/src/com/nms/test/TestThread.java
@@ -0,0 +1,37 @@
+package com.nms.test;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.alibaba.fastjson.JSON;
+import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.activerecord.Record;
+import com.jfinal.plugin.druid.DruidPlugin;
+
+public class TestThread implements Runnable{
+ static{
+ DruidPlugin druid=new DruidPlugin("jdbc:mysql://localhost:3306/nms_sync","nms","nms");
+ druid.setInitialSize(1);
+ druid.setMaxActive(2);
+ druid.setMinIdle(1);
+ druid.setMaxWait(60000);
+ ActiveRecordPlugin arp=new ActiveRecordPlugin(druid);
+ druid.start();
+ arp.start();
+ }
+ @Override
+ public void run() {
+ System.out.println("进入线程任务");
+ Record find = Db.findFirst(" select * from table_sync_info where id =1 ");
+ System.out.println(JSON.toJSON(find));
+ System.out.println("线程任务结束");
+ }
+
+ public static void main(String[] args) {
+ ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
+ service.scheduleWithFixedDelay(new TestThread(), 0, 5000, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
new file mode 100644
index 0000000..18f1b31
--- /dev/null
+++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java
@@ -0,0 +1,196 @@
+package com.nms.thread;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import com.alibaba.fastjson.JSON;
+import com.jfinal.aop.Before;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.activerecord.IAtom;
+import com.jfinal.plugin.activerecord.Record;
+import com.jfinal.plugin.activerecord.tx.Tx;
+import com.nms.model.SyncDbInfo;
+
+@Before({Tx.class})
+public class SyncSlaveToMasterThread implements Runnable{
+ private Logger logger = Logger.getLogger(this.getClass());
+ private SyncDbInfo syncDbInfo;
+
+ public SyncSlaveToMasterThread(SyncDbInfo syncDbInfo) {
+ this.syncDbInfo = syncDbInfo;
+ }
+
+ @Override
+ public void run() {
+ try {
+ // 主库向分库同步数据
+ logger.info("开始分库数据同步主库");
+ // 获取url路径
+ final String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ + syncDbInfo.get("database_name");
+ logger.info("当前分库数据库连接为"+url);
+ List<Record> find = Db.use(url).find("select * from table_sync_info");
+ logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
+ if (find != null && find.size() > 0) {
+ for (final Record record : find) {
+ // 循环同步数据标识
+ boolean flag = true;
+ // 判断表中的event事件 1代表insert 2代表update 3代表delete
+ if (record.getInt("event") == 1) {
+ if(record.getInt("mode").equals(1)){
+ while (flag) {
+ // 新增操作 取出最后更新id信息 查询增量数据
+ final List<Record> data = Db.use(url)
+ .find("select * from " + record.getStr("table_name") + " where "
+ + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
+ record.getInt("last_id"));
+ logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
+ if (data != null && data.size() > 0) {
+ Db.use(url).tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ return Db.use().tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
+ // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
+ logger.info("主库同步增量更新数据完成");
+ Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
+ logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
+ record.set("last_id", lastInsertId);
+ record.set("last_date", new Date());
+ Db.use(url).update("table_sync_info", record);
+ return true;
+ }
+ });
+ }
+ });
+ logger.info("主库同步增量更新数据完成 修改最后同步ID");
+ } else {
+ flag = false;
+ }
+ }
+ }else if(record.getInt("mode").equals(0)){
+ //当数据库表结构主键不是自增时 增量更新的操作步骤
+ while (flag) {
+ // 新增操作 取出最后更新id信息 查询增量数据
+ final List<Record> data =Db.use(url)
+ .find("select * from table_event_log where table_name = '" + record.getStr("table_name")
+ + "' and id > " + record.getLong("last_id") + " and event = "
+ + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size"));
+ //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
+ if (data != null && data.size() > 0) {
+ //多数据源事务 主数据源嵌套子数据源
+ Db.use(url).tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ return Db.use().tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ List<Integer> insertIds = new ArrayList<Integer>();
+ StringBuffer insertStr = new StringBuffer();
+ for (int i = 0; i < data.size(); i++) {
+ insertIds.add(data.get(i).getInt("target_id"));
+ if (i == 0) {
+ insertStr.append("?");
+ } else {
+ insertStr.append(",?");
+ }
+ }
+ List<Record> insertDatas = Db.use(url)
+ .find(" select * from " + record.getStr("table_name") + " where "
+ + record.getStr("id_name") + " in (" + insertStr + ") ",
+ insertIds.toArray());
+ for(Record insertData:insertDatas){
+ Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
+ insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
+ }
+ Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
+ // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
+ Object lastInsertId = data.get(data.size() - 1).get("id");
+ logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
+ record.set("last_id", lastInsertId);
+ record.set("last_date", new Date());
+ Db.use(url).update("table_sync_info", record);
+ return true;
+ }
+ });
+ }
+ });
+ logger.info("增量更新数据任务结束");
+ } else {
+ flag = false;
+ }
+ }
+ }
+ } else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
+ // table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改
+ while (flag) {
+ final List<Record> datas = Db.find(
+ " select * from table_event_log where table_name = '" + record.getStr("table_name")
+ + "' and id > " + record.getInt("last_id") + " and event = "
+ + record.getInt("event") + " order by id asc limit " + record.getInt("batch_size"));
+ logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas));
+ if (datas != null && datas.size() > 0) {
+ Db.use(url).tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ return Db.use().tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ List<Integer> updateIds = new ArrayList<Integer>();
+ StringBuffer deleteStr = new StringBuffer();
+ for (int i = 0; i < datas.size(); i++) {
+ updateIds.add(datas.get(i).getInt("target_id"));
+ if (i == 0) {
+ deleteStr.append("?");
+ } else {
+ deleteStr.append(",?");
+ }
+ }
+ logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds));
+ if (record.getInt("event") == 2) {
+ List<Record> updateDatas = Db.use(url)
+ .find(" select * from " + record.getStr("table_name") + " where "
+ + record.getStr("id_name") + " in (" + deleteStr + ") ",
+ updateIds.toArray());
+ logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
+ if (updateDatas != null && updateDatas.size() > 0) {
+ Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
+ updateDatas, record.getInt("batch_size"));
+ }
+ logger.info("分库同步主库修改数据任务完成");
+ } else if (record.getInt("event") == 3) {
+ Db.use("masterDataSource").update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in ("
+ + deleteStr + ") ", updateIds.toArray());
+ logger.info("分库同步主库删除数据任务完成");
+ }
+ Object lastUpdateId = datas.get(datas.size() - 1).get("id");
+ logger.info("获取最后一次操作数据的数据ID信息为"+lastUpdateId);
+ record.set("last_id", lastUpdateId);
+ record.set("last_date", new Date());
+ Db.use(url).update("table_sync_info", record);
+ return true;
+ }
+ });
+ }
+ });
+ logger.info("修改分库table_sync_info最后操作数据信息 用于下次同步操作完成");
+ } else {
+ flag = false;
+ }
+ }
+ }
+ }
+ }
+ logger.info("分库数据同步主库结束");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java
new file mode 100644
index 0000000..db99c15
--- /dev/null
+++ b/nms_sync/src/com/nms/thread/SyncThread.java
@@ -0,0 +1,216 @@
+package com.nms.thread;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.log4j.Logger;
+import com.alibaba.fastjson.JSON;
+import com.jfinal.aop.Before;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.activerecord.IAtom;
+import com.jfinal.plugin.activerecord.Record;
+import com.nms.interceptor.SyncDataInterceptor;
+import com.nms.model.SyncDbInfo;
+import com.jfinal.plugin.activerecord.tx.Tx;
+/**
+ * 数据同步功能线程
+ *
+ * @author Administrator
+ *
+ */
+@Before({/*SyncDataInterceptor.class,*/Tx.class})
+public class SyncThread implements Runnable {
+ private Logger logger = Logger.getLogger(this.getClass());
+ private SyncDbInfo syncDbInfo;
+
+ public SyncThread() {
+ super();
+ }
+
+ public SyncThread(SyncDbInfo syncDbInfo) {
+ super();
+ this.syncDbInfo = syncDbInfo;
+ }
+
+ @Override
+ public void run() {
+ try {
+ logger.info("开始主库数据同步分库任务");
+ // 获取url路径
+ final String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ + syncDbInfo.get("database_name");
+ logger.info("获取分库数据库连接信息"+url);
+ List<Record> find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ",
+ syncDbInfo.get("id"));
+ logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find));
+ if (find != null && find.size() > 0) {
+ for (final Record record : find) {
+ // 循环同步数据标识
+ boolean flag = true;
+ // 判断表中的event事件 1代表insert 2代表update 3代表delete
+ if (record.getInt("event") == 1) {
+ //根据mode判断主键产生方式
+ if(record.getInt("mode").equals(1)){
+ while (flag) {
+ // 查询增量数据
+ final List<Record> data =Db.use("masterDataSource")
+ .find("select * from " + record.getStr("table_name") + " where "
+ + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"),
+ record.getLong("last_id"));
+ //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
+ if (data != null && data.size() > 0) {
+ //多数据源事务 主数据源嵌套子数据源
+ Db.use().tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ return Db.use(url).tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ Db.use(url).batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
+ // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
+ Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
+ logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
+ record.set("last_id", lastInsertId);
+ record.set("last_date", new Date());
+ Db.use("masterDataSource").update("table_sync_info", record);
+ return true;
+ }
+ });
+ }
+ });
+ logger.info("增量更新数据任务结束");
+ } else {
+ flag = false;
+ }
+ }
+ }else if(record.getInt("mode").equals(0)){
+ //当数据库表结构主键不是自增时 增量更新的操作步骤
+ while (flag) {
+ // 新增操作 取出最后更新id信息 查询增量数据
+ final List<Record> data =Db.use("masterDataSource")
+ .find("select * from table_event_log where table_name = '" + record.getStr("table_name")
+ + "' and id > " + record.getLong("last_id") + " and event = "
+ + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size"));
+ //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
+ if (data != null && data.size() > 0) {
+ //多数据源事务 主数据源嵌套子数据源
+ Db.use().tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ return Db.use(url).tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ List<Integer> insertIds = new ArrayList<Integer>();
+ StringBuffer insertStr = new StringBuffer();
+ for (int i = 0; i < data.size(); i++) {
+ insertIds.add(data.get(i).getInt("target_id"));
+ if (i == 0) {
+ insertStr.append("?");
+ } else {
+ insertStr.append(",?");
+ }
+ }
+ List<Record> insertDatas = Db.use("masterDataSource")
+ .find(" select * from " + record.getStr("table_name") + " where "
+ + record.getStr("id_name") + " in (" + insertStr + ") ",
+ insertIds.toArray());
+ for(Record insertData:insertDatas){
+ Record seqData = Db.use(url).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
+ insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
+ }
+ Db.use(url).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
+ // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
+ Object lastInsertId = data.get(data.size() - 1).get("id");
+ logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
+ record.set("last_id", lastInsertId);
+ record.set("last_date", new Date());
+ Db.use("masterDataSource").update("table_sync_info", record);
+ return true;
+ }
+ });
+ }
+ });
+ logger.info("增量更新数据任务结束");
+ } else {
+ flag = false;
+ }
+ }
+ }
+ } else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
+ // table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改
+ while (flag) {
+ final List<Record> datas = Db.find(
+ " select * from table_event_log where table_name = '" + record.getStr("table_name")
+ + "' and id > " + record.getLong("last_id") + " and event = "
+ + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size"));
+ //logger.info("获取主库删除或者修改数据的数据信息"+JSON.toJSONString(datas));
+ if (datas != null && datas.size() > 0) {
+ //多数据源事务 主数据源嵌套子数据源
+ Db.use().tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ return Db.use(url).tx(new IAtom() {
+ @Override
+ public boolean run() throws SQLException {
+ List<Integer> updateIds = new ArrayList<Integer>();
+ StringBuffer deleteStr = new StringBuffer();
+ for (int i = 0; i < datas.size(); i++) {
+ updateIds.add(datas.get(i).getInt("target_id"));
+ if (i == 0) {
+ deleteStr.append("?");
+ } else {
+ deleteStr.append(",?");
+ }
+ }
+ logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds));
+ if (record.getInt("event") == 2) {
+ List<Record> updateDatas = Db.use("masterDataSource")
+ .find(" select * from " + record.getStr("table_name") + " where "
+ + record.getStr("id_name") + " in (" + deleteStr + ") ",
+ updateIds.toArray());
+ //logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas));
+ if (updateDatas != null && updateDatas.size() > 0) {
+ Db.use(url).batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
+ updateDatas, record.getInt("batch_size"));
+ }
+ logger.info("分库对主库修改操作的数据同步任务完成");
+ } else if (record.getInt("event") == 3) {
+ Db.use(url).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in ("
+ + deleteStr + ") ", updateIds.toArray());
+ logger.info("分库对主库删除操作的数据同步完成");
+ }
+ Object lastUpdateId = datas.get(datas.size() - 1).get("id");
+ logger.info("获取最后一次修改或者删除操作的数据ID信息"+JSON.toJSONString(lastUpdateId));
+ record.set("last_id", lastUpdateId);
+ record.set("last_date", new Date());
+ Db.use("masterDataSource").update("table_sync_info", record);
+ logger.info("修改table_sync_info记录结果 用于下次同步完成");
+ return true;
+ }
+ });
+ }
+ });
+ } else {
+ flag = false;
+ }
+ }
+ }
+ }
+ }
+ logger.info("主库数据同步分库结束");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public SyncDbInfo getSyncDbInfo() {
+ return syncDbInfo;
+ }
+
+ public void setSyncDbInfo(SyncDbInfo syncDbInfo) {
+ this.syncDbInfo = syncDbInfo;
+ }
+
+
+} \ No newline at end of file
diff --git a/nms_sync/src/com/nms/util/GeneratorUtil.java b/nms_sync/src/com/nms/util/GeneratorUtil.java
new file mode 100644
index 0000000..311f213
--- /dev/null
+++ b/nms_sync/src/com/nms/util/GeneratorUtil.java
@@ -0,0 +1,62 @@
+package com.nms.util;
+
+import javax.sql.DataSource;
+
+import com.jfinal.kit.PathKit;
+import com.jfinal.plugin.activerecord.generator.Generator;
+import com.jfinal.plugin.druid.DruidPlugin;
+
+/**
+ * 用于动态生成model以及basemodel生成器
+ * 可以在数据库有修改时 同时更新model信息
+ * @author Administrator
+ *
+ */
+public class GeneratorUtil {
+ public static void main(String args[]) {
+ // base model 所使用报名
+ String baseModelPackageName="com.nms.model.basemodel";
+ // base model 文件保存路径
+ String baseModelOutputDir=PathKit.getRootClassPath() + "/../src/com/nms/model/basemodel";
+ // model 所使用的报名
+ String modelPackageName = "com.nms.model";
+ // model 文件保存路径
+ String modelOutputDir = baseModelOutputDir+"/..";
+ // 创建生成器
+ //DruidPlugin druid = new DruidPlugin("jdbc:mysql://localhost:3306/nms_sync","root","root");
+ DruidPlugin druid = new DruidPlugin("jdbc:mysql://10.0.6.247:3306/nms","nms","nms");
+ druid.start();
+ Generator generator = new Generator(druid.getDataSource(),baseModelPackageName,baseModelOutputDir,modelPackageName,modelOutputDir);
+
+ // 设置是否生成链式setter方法
+ generator.setGenerateChainSetter(false);
+
+ // 设置不需要生成的表名
+ generator.addExcludedTable("di_systeminfo_disk");
+ generator.addExcludedTable("di_systeminfo_net");
+ generator.addExcludedTable("node_ip_table");
+ generator.addExcludedTable("node_lattice_record");
+ generator.addExcludedTable("nodegroup_arrow_position");
+ generator.addExcludedTable("pro_deltabspace_temp");
+ generator.addExcludedTable("set_det_data_con");
+ generator.addExcludedTable("sys_data_dictionary_item");
+ generator.addExcludedTable("sys_data_dictionary_name");
+ /* generator.addExcludedTable("v_detection_set_info");
+ generator.addExcludedTable("v_mission_node_group_1");
+ generator.addExcludedTable("v_mission_node_group_4");
+ generator.addExcludedTable("v_mission_node_group_6");
+ generator.addExcludedTable("v_node_table");
+*/
+ // 设置是否在Model中生成dao对象
+ generator.setGenerateDaoInModel(true);
+
+ // 设置是否生成字典文件
+ generator.setGenerateDataDictionary(false);
+
+ // 设置需要被移除的表明前缀用于生成modelName
+ generator.setRemovedTableNamePrefixes("");
+
+ // 开始生成
+ generator.generate();
+ }
+}