diff options
| author | chenjinsong <[email protected]> | 2018-09-27 16:11:54 +0800 |
|---|---|---|
| committer | chenjinsong <[email protected]> | 2018-09-27 16:11:54 +0800 |
| commit | 56d71f261a8bd6031e47e2bf80867049a2aa13da (patch) | |
| tree | f09257b2143782a333a9eda3395137837d9bdad1 /src/com/nis/nmsclient/thread/socket | |
initial commit
Diffstat (limited to 'src/com/nis/nmsclient/thread/socket')
| -rw-r--r-- | src/com/nis/nmsclient/thread/socket/CommonSocket.java | 1199 | ||||
| -rw-r--r-- | src/com/nis/nmsclient/thread/socket/SSLCertOper.java | 238 | ||||
| -rw-r--r-- | src/com/nis/nmsclient/thread/socket/SSLClient.java | 288 | ||||
| -rw-r--r-- | src/com/nis/nmsclient/thread/socket/SSLServer.java | 534 | ||||
| -rw-r--r-- | src/com/nis/nmsclient/thread/socket/ServerCollectData.java | 581 |
5 files changed, 2840 insertions, 0 deletions
diff --git a/src/com/nis/nmsclient/thread/socket/CommonSocket.java b/src/com/nis/nmsclient/thread/socket/CommonSocket.java new file mode 100644 index 0000000..8248f32 --- /dev/null +++ b/src/com/nis/nmsclient/thread/socket/CommonSocket.java @@ -0,0 +1,1199 @@ +package com.nis.nmsclient.thread.socket; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; + +import com.nis.nmsclient.common.Contants; +import com.nis.nmsclient.util.FileUtil; +import com.nis.nmsclient.util.MD5Util; +import com.nis.nmsclient.util.Utils; +import com.nis.nmsclient.util.file.BufferedRandomAccessFile; +import com.socket.utils.FileComment; + +public class CommonSocket{ + static Logger logger = Logger.getLogger(CommonSocket.class); + protected static final String TEMP_SUFFIX = ".tp"; + //缓存字节长度 + protected static final int BUFF_SIZE = 1024; + public static final String SUCCESS = "success"; + public static final String FAIL = "fail"; + public static final String END = "end"; + /** + * 与Server握手请求 + */ + public static final String REQ_HAND_SHAKE = "char:handshake"; + /** + * 获取本机标志UUID请求 + */ + public static final String REQ_LOCAL_UUID = "char:uuid"; + /** + * 发送本机变更信息请求 + */ + public static final String REQ_LOCAL_CHANGE = "char:agentChange"; + /** + * 初始化配置请求 + */ + public static final String REQ_INIT_CONFIG = "char:init"; + /** + * 初始化任务请求 + */ + public static final String REQ_INIT_TASK = "char:initTask"; + /** + * 主动告警请求 + */ + public static final String REQ_ALARM = "char:alarm"; + /** + * 获取Server端系统时间请求 + */ + public static final String REQ_SERVER_SYSTEMDATE = "char:systemdate"; + /** + * Server升级请求 + */ + public static final String REQ_SERVER_UPGRADE = "char:upgradeServer"; + /** + * NC向DC发送错误信息 + */ + public static final String REQ_ERROR_INFO = "char:ncErrorInfo"; + // ========== 现由DC主动获取数据,以下命令暂留 + /** + * 批量上传数据文件请求 + */ + public static final String REQ_UPLOAD_DATAS ="byte:datas"; + /** + * 回传文件请求 + */ + public static final String REQ_TASK_RETURNFILE = "byte:taskReturn"; + // ========== 现由DC主动获取数据,以上命令暂留 + /** + * 发送任务结果请求【数据收集方式改为DC主动后,此请求只在NC启动时发送所有任务结果使用】 + */ + public static final String REQ_TASK_RESULT = "char:taskResult"; + /** + * 上传回传文件、任务结果文件、数据文件的打包文件请求【数据收集方式改为DC主动后,此请求只在NC启动时发送所有任务结果使用】 + */ + public static final String REQ_BP_UPLOAD_FIFE = "byte:bpUploadFile"; + //bpUploadFile的子命令类型 + public static final String BP_TYPE_TASK_RESULT = "taskresult"; + public static final String BP_TYPE_TASK_RETURN = "taskreturn"; + public static final String BP_TYPE_DETECT_DATA = "detectdata"; + /** + * 交换证书通信命令 + */ + public static final String REQ_CERT = "byte:cert"; + /** + * 更新监测设置信息通信命令 + */ + public static final String SERVER_UPDATE_CONFIG = "char:updateConfig"; + /** + * 下发第三方监测脚本命令 + */ + public static final String SEND_PLUGIN_SCRIPT_FILE = "char:sendPluginScriptFile"; + /** + * NC端是否报主动告警 变更 + */ + public static final String ACTIVE_ALARM_START_ALERT = "char:isActiveAlarmStart"; + /** + * 文件推送通信命令 + */ + public static final String SERVER_FILE_PUSH = "byte:filePush"; + /** + * 升级通信命令 + */ + public static final String SERVER_UPGRADE = "byte:upgrade"; + /** + * 下发任务通信命令 + */ + public static final String SERVER_TASK = "char:task"; + /** + * 任务撤消命令 + */ + public static final String SERVER_TASK_CANCEL = "char:taskCancel"; + /** + * DC主动向NC再次获取任务结果 + */ + public static final String SERVER_GET_TASKRESULT = "char:collectNonRltTaskResult"; + /** + * DC主动向NC收集监测数据 + */ + public static final String SERVER_COLLECT_DATA = "byte:collectData"; + //收集数据,Agent发送的类型 + public static final String DATA_TYPE_ZIP_DETECT = "zipDetectData";//监测数据zip + public static final String DATA_TYPE_CSV_DETECT = "csvDetectData";//批量上传csv监测数据 + public static final String DATA_TYPE_ZIP_TASKRESULT = "zipTaskResult";//任务结果zip + public static final String DATA_TYPE_OBJ_TASKRESULT = "objTaskResult";//批量上传任务结果obj + public static final String DATA_TYPE_ZIP_TASKRETURN = "zipTaskReturn";//任务回传文件zip + public static final String DATA_TYPE_FILE_TASKETURN = "fileTaskReturn";//单个任务回传文件 + + protected Socket socket = null; + protected OutputStream out = null; + protected InputStream in = null; + + public CommonSocket() { + super(); + } + + public CommonSocket(Socket client) throws Exception { + socket = client; + out = socket.getOutputStream(); + in = socket.getInputStream(); + } + + /** + * 发送消息,以字符形式,发送一行信息 + **/ + public boolean sendMessageByChar(String msg) throws Exception { + logger.debug("sendMessageByChar---" + msg); + PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, + Contants.charset)); + pw.println(msg); + pw.flush(); + + return true; + } + + + /** + * 接收信息,以字符形式,接收一行信息 + * + */ + public String receiveMessageByChar() throws Exception { + BufferedReader br = new BufferedReader(new InputStreamReader(in, + Contants.charset)); + String str = br.readLine(); + logger.debug("receiveMessageByChar---" + str); + return str; + } + + /** + * 发送单个文件 + **/ + public boolean sendFileByByte(File file) throws Exception { + ObjectOutputStream oos = null; + FileInputStream fis = null; + try { + //发送文件大小和文件名 + oos = new ObjectOutputStream(out); + String[] strArr = new String[]{ + file.length() + "", file.getName() + }; + oos.writeObject(strArr); + //发送文件内容 + byte[] buff = new byte[BUFF_SIZE]; + int len = 0; + fis = new FileInputStream(file); + while ((len = fis.read(buff)) != -1) { + //将读取的内容写入文件 + out.write(buff, 0, len); + } + out.flush(); + } catch (Exception e) { + logger.error("Single file sending failure"); + throw e; + } finally{ + if(fis!=null){ + try { + fis.close(); + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + } + } + + return true; + } + + /** + * 接收单个文件 + * + */ + public boolean receiveFileByByte(String filePath) throws Exception { + ObjectInputStream ois = null; + FileOutputStream fos = null; + try { + + ois = new ObjectInputStream(in); + String[] strArr = (String[])ois.readObject(); + //接收文件大小 + long fileSize = Long.parseLong(strArr[0]); + //接收文件名 + String fileName = strArr[1]; + + //接收文件内容 + byte[] buff = new byte[BUFF_SIZE]; + fos = new FileOutputStream(filePath + File.separator + fileName); + int nRead = 0; + + //单个文件循环读取 + while ((nRead = in.read(buff, 0, (int)(BUFF_SIZE<fileSize?BUFF_SIZE:fileSize))) > 0) { + fos.write(buff,0,nRead); + fos.flush(); + fileSize -= nRead; + if(fileSize<=0){ + break; + } + } + fos.close(); + } catch (Exception e) { + logger.error("Single file receiving failure"); + throw e; + } finally { + try { + if (fos != null) { + fos.close(); + } + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + + } + + return true; + + } + + /** + * 批量上传文件 + * @param dir 本地文件集合根目录绝对路径 + * @param fileList 上传的文件列表 + */ + public void sendFileByBath(String dir, List<File> fileList) throws Exception { + ObjectOutputStream oos = null; + FileInputStream fis = null; + + try { + // 第一步发送本地根目录地址(用于地址截取)保证fileList的目录结构完整性 + this.sendMessageByChar(dir); + String result = this.receiveMessageByChar(); + logger.debug("根目录地址发送状态: " + result); + // 第二步 用ObjectOutputStream工具类 发送file对象信息 用于文件名,文件目录,文件大小的获取 + oos = new ObjectOutputStream(out); + List<String[]> fileStrList = new ArrayList<String[]>(); + if(fileList!=null && fileList.size()>0){ + for (File f : fileList) { + if (f.exists()) { + String[] tmpArr = new String[] { f.getAbsolutePath(), + f.length() + "" }; + fileStrList.add(tmpArr); + } else { + logger.warn("File:>" + f.getAbsolutePath() + + " do not exist, can not send"); + } + } + oos.writeObject(fileStrList); + // 第三部,发送文件 + byte[] buff = new byte[BUFF_SIZE]; + int len = 0; + // 循环上传文件 + for (File file : fileList) { + logger.debug("--sendFileByBath---" + file.getName() + "---length=" + file.length()); + fis = new FileInputStream(file); + while ((len = fis.read(buff)) != -1) {// 将读取的内容输出流 + out.write(buff, 0, len); + } + out.flush(); + fis.close(); + fis = null; + } + } + logger.debug("批量发送文件个数:" + (fileList==null ? 0 : fileList.size())); + } catch (Exception e) { + throw e; + } finally { + try { + if (fis != null) { + fis.close(); + fis = null; + } + } catch (IOException e) { + } + } + } + + /** + * 批量接收文件 + * @param newDir + */ + public boolean receiveFileByBath(String newDir) throws Exception { + ObjectInputStream ois = null; + FileOutputStream fos = null; + try { + //获取集合文件路径 + String oldDir = this.receiveMessageByChar(); + //logger.info("旧上传文件集合根目录: " + oldDir); + this.sendMessageByChar("success"); + ois = new ObjectInputStream(in); + List<String[]> fileList = (List<String[]>)ois.readObject(); + if(fileList != null && fileList.size()>0){ + for(String[] arr : fileList){ + String newUrl = arr[0].replace(oldDir, newDir);//新路径 + newUrl = newUrl.replaceAll("\\\\", "/"); + int fileLength = Integer.parseInt(arr[1]); //大小 + File newFile = new File(newUrl); + if(newFile.exists()){ + FileUtil.delDir(newFile); + logger.debug("receiveFileByBath delete file---" + newFile.getAbsolutePath()); + } + if(!newFile.getParentFile().exists()){ + newFile.getParentFile().mkdirs(); + } + fos = new FileOutputStream(newUrl+TEMP_SUFFIX); + + int nRead = 0; + byte[] buff = new byte[BUFF_SIZE]; + //单个文件循环读取 + while ((nRead = in.read(buff, 0, (int)(BUFF_SIZE<fileLength?BUFF_SIZE:fileLength))) > 0) { + fos.write(buff,0,nRead); + fos.flush(); + fileLength -= nRead; + if(fileLength<=0){ + break; + } + } + fos.close(); + fos = null; + File newFile2 = new File(newUrl+TEMP_SUFFIX); + //newFile2.renameTo(newFile); + FileUtils.copyFile(newFile2, newFile);//将临时文件名改为正式文件名,即去掉.tp后缀 + newFile2.delete();// 将临时文件删除 + } + } + logger.debug("批量接收文件个数:" + (fileList==null ? 0 : fileList.size())); + } catch (Exception e) { + throw e; + }finally{ + if(fos!=null){ + try { + fos.close(); + fos = null; + } catch (IOException e) { + } + } + } + + return true; + + } + + /** + * 批量上传文件, 并传入文件的Md5值 + * @param fileList 上传的文件列表 + */ + protected void sendFileWithMd5ByBath(List<String[]> fileCommentsList) throws IOException { + ObjectOutputStream oos = null; + FileInputStream fis = null; + + try { + // 第一步 用ObjectOutputStream工具类 发送file对象信息 用于文件名,文件目录,文件大小的获取 + oos = new ObjectOutputStream(out); + List<File> fileList = new LinkedList<File>(); + List<String[]> fileStrList = new ArrayList<String[]>(); + for(String[] fileComments : fileCommentsList){ + File file = new File(fileComments[0]); + if(file.exists()){ + String[] tmpArr = new String[]{ + file.getName(), file.length() + "",fileComments[1] + }; + fileList.add(file); + fileStrList.add(tmpArr); + }else { + logger.warn("File:>"+file.getAbsolutePath()+" do not exist, can not send"); + } + } + oos.writeObject(fileStrList); + // 第三部,发送文件 + byte[] buff = new byte[BUFF_SIZE]; + int len = 0; + // 循环上传文件 + for (File file : fileList) { + fis = new FileInputStream(file); + while ((len = fis.read(buff)) != -1) {// 将读取的内容输出流 + out.write(buff, 0, len); + } + out.flush(); + fis.close(); + fis = null; + } + logger.debug("批量发送文件结束,共 "+(fileList==null ? 0 : fileList.size())+ "个文件"); + } catch (IOException e) { + logger.error("Batch file failed!"); + throw new IOException(e); + } finally { + try { + if (fis != null) { + fis.close(); + fis = null; + } + } catch (IOException e) { + } + } + } + + /** + * 批量接收文件, 使用Md5校验文件是否完整 + * @param newDir + */ + public boolean receiveFileWithMd5ByBath(String newDir) throws IOException { + boolean flag = true; + ObjectInputStream ois = null; + FileOutputStream fos = null; + try { + ois = new ObjectInputStream(in); + List<String[]> fileList = (List<String[]>)ois.readObject(); + if(fileList != null && fileList.size()>0){ + int sucessCnt = 0; + int failCnt = 0; + for(int i=0; i<fileList.size(); i++){ + String[] arr = fileList.get(i);//arr[0] 存放文件名,arr[1] 存放文件长度,arr[2] 存放Md5值 + String newUrl = newDir + File.separator + arr[0]; + int fileLength = Integer.parseInt(arr[1]);//大小 + String md5Val = arr[2]; + File newFile = new File(newUrl); + if(newFile.exists()){ + FileUtil.delDir(newFile); + logger.debug("receiveFileWithMd5ByBath delete file--" + newFile.getAbsolutePath()); + } + if(!newFile.getParentFile().exists()){ + newFile.getParentFile().mkdirs(); + } + fos = new FileOutputStream(newUrl+TEMP_SUFFIX); + + //接收文件内容 + int nRead = 0; + byte[] buff = new byte[BUFF_SIZE]; + + //单个文件循环读取 + while ((nRead = in.read(buff, 0, (int)(BUFF_SIZE<fileLength?BUFF_SIZE:fileLength))) > 0) { + fos.write(buff,0,nRead); + fos.flush(); + fileLength -= nRead; + if(fileLength<=0){ + break; + } + } + fos.close(); + fos = null; + File newFile2 = new File(newUrl+TEMP_SUFFIX); + if (md5Val != null + && md5Val + .equals(MD5Util.getFileMD5String(newFile2))) { + logger.debug("接收文件" + (i+1) + "“" + newFile.getAbsolutePath() + "”完整"); + //newFile2.renameTo(newFile); + FileUtils.copyFile(newFile2, newFile);//将临时文件名改为正式文件名,即去掉.tp后缀 + newFile2.delete();// 将临时文件删除 + sucessCnt ++ ; + } else { + logger.debug("接收文件" + (i+1) + "“" + newFile.getAbsolutePath() + "”不完整,失败"); + failCnt ++ ; + } + } + logger.info("批理接收文件个数:" + fileList.size() + ", 成功:" + sucessCnt + ", 失败:" + failCnt); + if(failCnt > 0) { + flag = false; + } + }else{ + logger.info("批量接收文件列表为空"); + } + } catch (Exception e) { + logger.error("Batch file failure"); + throw new IOException(e); + }finally{ + if(fos!=null){ + try { + fos.close(); + fos = null; + } catch (IOException e) { + } + } + } + + return flag; + } + + /** + * 断点续传 发送方法 + * @time Mar 2, 2012-2:30:16 PM + * @param filePath + */ + protected boolean bpSendFile (String filePath) throws Exception { + File file = new File(filePath); + + //发送长度 end + this.sendMessageByChar(file.length()+""); + + String msg = this.receiveMessageByChar(); + long start = Long.parseLong(msg); + long end = file.length(); + logger.debug("start "+start); + logger.debug("end "+end); + bpSendFile(filePath, start, end); + + return true; + } + + /** + * 断点续传 接收方法 + * @time Mar 2, 2012-2:30:16 PM + * @param filePath + * @param start + * @param end + */ + protected int bpReceiveFile (String filePath) throws Exception { + File file = new File(filePath); + if(!file.exists()){ + file = new File(filePath+TEMP_SUFFIX); + } + String msg = this.receiveMessageByChar(); + long start = file.length(); + long end = Long.parseLong(msg); + this.sendMessageByChar(start+""); + + logger.debug("start "+start); + logger.debug("end "+end); + bpReceiveFile(file.getAbsolutePath(), start, end); + + //file.renameTo(new File(filePath)); + FileUtils.copyFile(file, new File(filePath));//将临时文件名改为正式文件名,即去掉.tp后缀 + file.delete();// 将临时文件删除 + + logger.debug("bpReceiveFile sucess"); + return 0; + } + + /** + * 断点续传 发送方法 + * @time Mar 2, 2012-2:30:16 PM + * @param filePath + * @param start + * @param end + */ + protected void bpSendFile (String filePath,long start,long end) throws Exception { + if (start == end) { + return; + } + BufferedRandomAccessFile braf = null; + try { + File file = new File(filePath); + + //- 不存在,终止; 存在则继续 + if(!file.exists()){ + this.sendMessageByChar(FAIL); + return ; + }else { + this.sendMessageByChar(SUCCESS); + } + + String msg = this.receiveMessageByChar(); + logger.debug("Recive: " + msg); + + //- BufferedRandomAccessFile 读取指定位置的文件字节数组,写入输出通讯 + byte[] b = new byte[BUFF_SIZE]; + braf = new BufferedRandomAccessFile(file,"r"); + braf.seek(start); + int nRead; + while ((nRead = braf.read(b, 0, BUFF_SIZE)) > 0) { + + out.write(b, 0, nRead); + start += nRead; + + //-- 读取完成 跳出 + if(start==end){break;} + + } + + }catch (Exception e) { + throw e; + }finally{ + try { + //- 关闭 随机访问文件对象(关闭流) + if(braf!= null){braf.close();} + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + + } + } + /** + * 断点续传 接收方法 + * @time Mar 2, 2012-2:30:16 PM + * @param filePath + * @param start + * @param end + */ + protected void bpReceiveFile (String filePath,long start,long end) throws Exception { + if(StringUtils.isEmpty(filePath)){ + return; + } + if (start == end) { + return; + } + BufferedRandomAccessFile raf = null; + try { + File file = new File(filePath); + + //- 文件路径不存在 则创建 + if (!file.getParentFile().exists()) { + file.getParentFile().mkdirs(); + } + + //- 文件不存在 则创建 + if (!file.exists()) { + file.createNewFile(); + } + + //- 接收发送端 发送数据准备 确认信息 + String msg = this.receiveMessageByChar(); + + if (FAIL.equals(msg)) { //结束操作 + return; + } else { + this.sendMessageByChar(SUCCESS); // 通知发送端 接收数据准备完成 确认信息 + } + + // 将通信中读出的数据 写入文件指定位置 + byte[] b = new byte[BUFF_SIZE]; + raf = new BufferedRandomAccessFile(file, "rw"); + raf.seek(start); + int nRead; + + while ((nRead = in.read(b, 0, BUFF_SIZE)) > 0) { + raf.write(b, 0, nRead); + start += nRead; + + if (start == end) { //写完跳出 + break; + } + } + } catch (Exception e) { + throw e; + }finally{ + try { + //- 关闭 随机访问文件对象(关闭流) + if(raf!= null){raf.close();} + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + } + } + + /** + * 断点续传 批量上传文件 + * @param fileList 上传的文件列表 + * @param dir 本地文件集合根目录绝对路径 + */ + protected void bpSendFileByBath(List<File> fileList,String dir) throws Exception { + BufferedRandomAccessFile oReadFile = null; + + try { + // 第一步发送本地根目录地址(用于地址截取)保证fileList的目录结构完整性 + this.sendMessageByChar("abs:"+(dir==null?"":dir)); + String result = this.receiveMessageByChar(); + logger.debug("根目录路径通信状态: " + result); + // 第二步 用ObjectOutputStream工具类 发送file对象信息 用于文件名,文件目录,文件大小的获取 + + //原文件文件名 和 大小(即end长度) + List<String[]> sourceFileList = new ArrayList<String[]>(); + for(File f : fileList){ + String[] tmpArr = new String[]{ + f.getAbsolutePath(),0+"",f.length() + "" + }; + sourceFileList.add(tmpArr); + } + + logger.debug("发送信息: " + Arrays.toString(sourceFileList.toArray())); + this.sendObject(sourceFileList); + + //得到需要下载的文件信息 + List<String[]> sendFileList = (List<String[]>)receiveObject(); + + // 第三部,发送文件 + byte[] buff = new byte[BUFF_SIZE]; + + // 循环上传文件 + for (String[] sendFile: sendFileList) { + long start = Long.parseLong(sendFile[1]); + long end = Long.parseLong(sendFile[2]); + if(start >= end){ + continue; + } + File file = new File(sendFile[0]); + oReadFile = new BufferedRandomAccessFile(file,"r"); + + // 定位文件指针到nPos位置 + oReadFile.seek(start); //从0开始 + int nRead; + + // 从输入流中读入字节流,然后写到文件中 + while ((nRead = oReadFile.read(buff, 0, BUFF_SIZE)) > 0) { + + out.write(buff, 0, nRead); + start += nRead; //调整为从1开始 + if(start >= end){ + break; + } + + } + oReadFile.close(); + oReadFile = null; + } + logger.debug("多文件上传结束,共 "+(fileList==null ? 0 : fileList.size())+ "个文件"); + } catch (Exception e) { + throw e; + } finally { + try { + if (oReadFile != null) { + oReadFile.close(); + oReadFile = null; + } + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + } + } + + /** + * 断点续传 批量接收文件 + * @param newDir + */ + protected void bpReceiveFileByBath(String newDir) throws Exception { + + BufferedRandomAccessFile oSavedFile = null; + try { + //获取集合文件路径 + String oldDir = this.receiveMessageByChar(); + int headLength = "abs:".length(); + oldDir = ((StringUtils.isNotEmpty(oldDir) + && oldDir.length()>=headLength) + ?oldDir.substring(headLength,oldDir.length()) + :oldDir); + if(StringUtils.isEmpty(oldDir)){ + logger.debug("远程 目录根路径为空 接收文件不保留目录格式 统一存放到本地目录:》"+newDir); + }else{ + logger.debug("根目录 记录: " + oldDir+" VS "+newDir); + } + this.sendMessageByChar(SUCCESS); + + List<String[]> remoteFileList = (List<String[]>)receiveObject(); + List<String[]> receiveFileList = new LinkedList<String[]>(); + byte[] buff = new byte[BUFF_SIZE]; + if(remoteFileList != null && remoteFileList.size()>0){ + for(String[] arr : remoteFileList){ + String newUrl = null; + if(StringUtils.isEmpty(oldDir)){ + newUrl = newDir+(new File(arr[0].replaceAll("\\\\", "/")).getName()); + }else{ + newUrl = arr[0].replace(oldDir, newDir);//新路径 + newUrl = newUrl.replaceAll("\\\\", "/"); + } + + File newFile = new File(newUrl); + + //该文件已存在 + if(newFile.exists()){ + continue; + } + + newFile = new File(newUrl+TEMP_SUFFIX); + arr[1] = newFile.length()+""; + receiveFileList.add(arr); + } + } + this.sendObject(receiveFileList); + + if(receiveFileList != null && receiveFileList.size()>0){ + for(String[] arr : receiveFileList){ + String newUrl = null; + if(StringUtils.isEmpty(oldDir)){ + newUrl = newDir+(new File(arr[0].replaceAll("\\\\", "/")).getName()); + }else{ + newUrl = arr[0].replace(oldDir, newDir);//新路径 + newUrl = newUrl.replaceAll("\\\\", "/"); + } + + File newFile = new File(newUrl+TEMP_SUFFIX); + + if(!newFile.getParentFile().exists()){ + newFile.getParentFile().mkdirs(); + } + + if(!newFile.exists()){ + newFile.createNewFile(); + } + + int start = Integer.parseInt(arr[1]); // 起始 + int end = Integer.parseInt(arr[2]); // 结束 + if(start<end){ + oSavedFile = new BufferedRandomAccessFile(newFile,"rw"); + oSavedFile.seek(start); + int nRead; + rfile:while ((nRead = in.read(buff, 0, BUFF_SIZE<end?BUFF_SIZE:end)) > 0) { + oSavedFile.write(buff,0,nRead); + end -= nRead; + if(end<=0){ + break rfile; + } + } + oSavedFile.close(); + oSavedFile = null; + } +// newFile.renameTo(new File(newUrl)); //将临时文件名改为正式文件名,即去掉.tp后缀 + FileUtils.copyFile(newFile, new File(newUrl)); //将临时文件名改为正式文件名,即去掉.tp后缀 + newFile.delete(); + } + } + logger.debug("多文件接收结束,共 "+(remoteFileList==null ? 0 : remoteFileList.size())+ "个文件"); +// } catch (IOException e) { +// logger.error("",e); +// } catch (ClassNotFoundException e) { +// logger.error("",e); + }finally{ + if(oSavedFile!=null){ +// try { + oSavedFile.close(); + oSavedFile = null; +// } catch (IOException e) { +// logger.error("",e); +// } + } + } + } + + /** + * 断点续传 批量上传文件, 并传入文件的Md5值 + * @param fileList 上传的文件列表 + * @param dir 本地文件集合根目录绝对路径 + */ + protected void bpSendFileByBathMD5(List<FileComment> fileCommentsList) throws Exception { + BufferedRandomAccessFile oReadFile = null; + + try { + //原文件文件名 和 大小(即end长度) + List<FileComment> sourceFileList = new ArrayList<FileComment>(); + if(fileCommentsList !=null && fileCommentsList.size()!=0){ + for(FileComment fileComment : fileCommentsList){ + File f = new File(fileComment.getFileName()); + if(!f.exists()){ + sourceFileList.add(new FileComment(f.getAbsolutePath(),0,-1,fileComment.getMd5Val())); + }else { + String md5Val = StringUtils.isEmpty(fileComment.getMd5Val())?MD5Util.getFileMD5String(f):fileComment.getMd5Val(); + sourceFileList.add(new FileComment(f.getAbsolutePath(),0,f.length(),md5Val)); + } + } + } + + logger.debug("发送信息: " + Arrays.toString(sourceFileList.toArray())); + this.sendObject(sourceFileList); + + //得到需要下载的文件信息 + List<FileComment> sendFileList = (List<FileComment>)receiveObject(); + + // 第三部,发送文件 + byte[] buff = new byte[BUFF_SIZE]; + + // 循环上传文件 + for (FileComment sendFile: sendFileList) { + long start = sendFile.getStart(); + long end = sendFile.getEnd(); + if(start >= end){ + continue; + } + File file = new File(sendFile.getFileName()); + oReadFile = new BufferedRandomAccessFile(file,"r"); + + // 定位文件指针到nPos位置 + oReadFile.seek(start); //从0开始 + int nRead; + + // 从输入流中读入字节流,然后写到文件中 + while ((nRead = oReadFile.read(buff, 0, BUFF_SIZE)) > 0) { + + out.write(buff, 0, nRead); + start += nRead; //调整为从1开始 + if(start >= end){ + break; + } + + } + oReadFile.close(); + oReadFile = null; + } + logger.debug("多文件上传结束,共 "+(sendFileList==null ? 0 : sendFileList.size())+ "个文件"); + } catch (Exception e) { + throw e; + } finally { + try { + if (oReadFile != null) { + oReadFile.close(); + oReadFile = null; + } + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + } + } + + + /** + * 断点续传 批量接收文件, 使用Md5校验文件是否完整 + * @param newDir + */ + public int bpReceiveFileByBathMd5(String newDir) throws Exception{ + if(newDir!=null){ + newDir += File.separator; + } + int rFlag = 0; //0 OK -1 MD5 ERROR -2 Function ERROR -3 文件不存在 + BufferedRandomAccessFile oSavedFile = null; //有缓存的 随机文件IO对象 + try { + + List<FileComment> remoteFileList = (List<FileComment>)receiveObject(); //接收可接收的文件信息 string[]{fileName,start,end,MD5} + List<FileComment> receiveFileList = new LinkedList<FileComment>(); //需要续传的文件及其索引信息 string[]{fileName,start,end,MD5} + byte[] buff = new byte[BUFF_SIZE]; //缓存 大小 + + //- 检查实际接收文件大小 + if(remoteFileList != null && remoteFileList.size()>0){ + for(FileComment arr : remoteFileList){ + //String newUrl = newDir+removeTimeTagFileName(new File(arr.getFileName()).getName(),null); + String filePath = arr.getFileName().replaceAll("\\\\", "/"); + String fileName = filePath.substring(filePath.lastIndexOf("/")+1, filePath.length()); + String newUrl = newDir+removeTimeTagFileName(fileName,null); + + File newFile = new File(newUrl); + + //-- 已接收完成 + if(newFile.exists()){ + // continue; + // 2013-1-6 jzz 如果接收完成也比较MD5值,主要是针对再次执行任务,直接拷来的文件 + //-- MD5为空 无需校验 + if(StringUtils.isEmpty(arr.getMd5Val())){ + continue; + } + //-- MD5相等, 接收完成 + if(arr.getMd5Val().equals(MD5Util.getFileMD5String(newFile))){ + logger.debug("1--" + newFile.getAbsolutePath()+" MD5值校验一致"); + continue; + } else {//-- MD5不相等,则删除该文件,下面重新接收 + FileUtil.delDir(newFile); + logger.debug("1--bpReceiveFileByBathMd5 delete file ---" + newFile.getAbsolutePath()); + logger.debug("1--" + newFile.getAbsolutePath()+" MD5值校验不一致"); + } + // 2013-1-6 jzz 修改结束 + } + + //-- 续传文件及起始长度 + newFile = new File(newUrl+TEMP_SUFFIX); + arr.setStart(newFile.length()); + receiveFileList.add(arr); + } + } + this.sendObject(receiveFileList); + + //- 接收文件 + if(receiveFileList != null && receiveFileList.size()>0){ + for(FileComment arr : receiveFileList){ + //String newUrl = newDir+removeTimeTagFileName(new File(arr.getFileName()).getName(),null); + String filePath = arr.getFileName().replaceAll("\\\\", "/"); + String fileName = filePath.substring(filePath.lastIndexOf("/")+1, filePath.length()); + String newUrl = newDir+removeTimeTagFileName(fileName,null); + + File newFile = new File(newUrl+TEMP_SUFFIX); + + if(!newFile.getParentFile().exists()){ + newFile.getParentFile().mkdirs(); + } + + //创建空文件 + if (!newFile.exists()) { + newFile.createNewFile(); + } + + long start = arr.getStart(); // 起始 + long end = arr.getEnd(); // 结束 + if(end == -1){ + return -3; + } + + if(start<end){ + oSavedFile = new BufferedRandomAccessFile(newFile,"rw"); + oSavedFile.seek(start); + int nRead; + rfile:while ((nRead = in.read(buff, 0, (int)(BUFF_SIZE<end?BUFF_SIZE:end))) > 0) { + oSavedFile.write(buff,0,nRead); + end -= nRead; + if(end<=0){ + break rfile; + } + } + oSavedFile.close(); + oSavedFile = null; + } + //newFile.renameTo(new File(newUrl)); + FileUtils.copyFile(newFile, new File(newUrl));//将临时文件名改为正式文件名,即去掉.tp后缀 + newFile.delete();// 将临时文件删除 + logger.debug(newFile.getAbsolutePath()+" 下载完成!"); + + //-- MD5为空 无需校验 + if(StringUtils.isEmpty(arr.getMd5Val())){ + continue; + } + + File newFile2 = new File(newUrl); + //-- MD5不相等,则删除该文件 返回-1 + if(!arr.getMd5Val().equals(MD5Util.getFileMD5String(newFile2))){ + //newFile.delete_bak(); + //使用删除文件公共方法 + FileUtil.delDir(newFile); + logger.debug("bpReceiveFileByBathMd5 delete file ---" + newFile.getAbsolutePath()); + //FileUtil.checkParentDirExist(newFile); + logger.debug(newFile.getAbsolutePath()+" MD5值校验不一致"); + return -1; + } else {//-- MD5相等 + logger.debug(newFile.getAbsolutePath()+" MD5值校验一致"); + } + } + } + logger.debug("多文件接收结束,共 "+(remoteFileList==null ? 0 : remoteFileList.size())+ "个文件"); + return rFlag; + } catch (Exception e) { + //return -2; + throw e; + }finally{ + if(oSavedFile!=null){ + try { + oSavedFile.close(); + oSavedFile = null; + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + } + } + } + + /** + * Object 形式 发送信息 + */ + protected void sendObject(Object object) throws Exception { + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(object); + oos.flush(); + } + + /** + * Object 形式 接收信息 + */ + protected Object receiveObject() throws Exception { + ObjectInputStream ois = new ObjectInputStream(in); + return ois.readObject(); + } + + /** + * 关闭通讯 + */ + public void close() { + try { + if(out!=null){ + out.close(); + out = null; + } + if(in!=null){ + in.close(); + in = null; + } + if(socket!=null && socket.isConnected()){ + socket.close(); + socket = null; + } + } catch (Exception e) { + logger.error(Utils.printExceptionStack(e)); + } + } + + /** + * 删除addTimeTagForFileName()方法 所添加的时间戳 + * @time Mar 12, 2012-3:36:16 PM + * @param fileName + * @return + */ + public static String removeTimeTagFileName(String fileName, String taskId) { + + if (StringUtils.isNotBlank(fileName) && fileName.contains("_")) { + + String timeTag = fileName.substring(fileName.lastIndexOf("_"), + fileName.lastIndexOf(".")==-1?fileName.length():fileName.lastIndexOf(".")); //针对无后缀名文件,时间戳截取校验 + fileName = fileName.replace(timeTag, ""); + + if(taskId!=null){ + fileName = fileName.replace("_" + taskId, ""); + } + + } + + return fileName; + + } + + /** + * 上传文件时,判断该文件是否已存在,如存在,则在后面加入时间戳 + * + * @param fileName + * 单纯的文件名 + * @param taskId 标识ID + * @return + * @throws UnknownHostException + */ + public static String addTimeTagForFileName(String fileName, String taskId, boolean isFile){ + + try + { + Calendar calendar = new GregorianCalendar(); + long timestamp = calendar.getTimeInMillis(); + + // 文件后缀 + String fielType = ""; + + if (isFile) {// 只是文件做名称处理,目录的话不用处理,直接使用原名称 + if (fileName.lastIndexOf(".") != -1) { + fielType = fileName.substring(fileName.lastIndexOf(".")); + fileName = fileName.substring(0, fileName.lastIndexOf(".")); + } + } + + if(taskId!=null){ + fileName += "_" + taskId; + } + fileName += "_" + timestamp+""+((int)(Math.random()*1000)); + + if(StringUtils.isNotBlank(Contants.AGENT_LOCAL_IP)) { + fileName = fileName+"_"+Contants.AGENT_LOCAL_IP; + } + fileName += fielType; + logger.debug("回传文件名称为: "+fileName); + } catch (Exception e) + { + logger.error("Generating the name exception of the return file", e); + } + + return fileName; + } +} diff --git a/src/com/nis/nmsclient/thread/socket/SSLCertOper.java b/src/com/nis/nmsclient/thread/socket/SSLCertOper.java new file mode 100644 index 0000000..32cf8d0 --- /dev/null +++ b/src/com/nis/nmsclient/thread/socket/SSLCertOper.java @@ -0,0 +1,238 @@ +package com.nis.nmsclient.thread.socket; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.security.KeyStore; +import java.security.SecureRandom; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.log4j.Logger; + +import com.nis.nmsclient.common.Contants; +import com.nis.nmsclient.util.DateUtil; +import com.nis.nmsclient.util.Utils; + +public class SSLCertOper { + static Logger logger = Logger.getLogger(SSLCertOper.class); + + public static SSLContext getSSLContext() throws Exception { + // 初始化上下文 + SSLContext ctx = SSLContext.getInstance(Contants.SSL_JSSE_TYPE); + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + KeyStore ks = KeyStore.getInstance(Contants.SSL_KEYSTORE_TYPE); + ks.load(new FileInputStream(Contants.SSL_KEY_STORE), + Contants.SSL_KEY_STORE_PASS.toCharArray());// 载入keystore + kmf.init(ks, Contants.SSL_KEY_PRIVATE_PASS.toCharArray()); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + KeyStore tks = KeyStore.getInstance(Contants.SSL_KEYSTORE_TYPE); + tks.load(new FileInputStream(Contants.SSL_TRUST_KEY_STORE), + Contants.SSL_KEY_STORE_PASS.toCharArray());// 载入keystore + tmf.init(tks); + + ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), + new SecureRandom()); + logger.debug("load keystore success."); + + return ctx; + } + + /** + * 创建 密匙对(私钥和公钥) + * + */ + public static String createKeyAndCert(String aliasName, String storePath, + String localIp, String keyPass, String storePass, String certName) { + BufferedReader bReader = null; + Process process = null; + try { + process = Runtime.getRuntime().exec( + "keytool -genkey -v -alias " + aliasName + + " -keyalg RSA -storetype " + + Contants.SSL_KEYSTORE_TYPE + " -keystore " + + storePath + " -validity 90 -dname \"CN=" + + localIp + + ",OU=cn,O=cn,L=cn,ST=cn,C=cn\" -storepass " + + storePass + " -keypass " + keyPass); + process.getOutputStream().close(); + bReader = new BufferedReader(new InputStreamReader(process + .getInputStream())); + process.getErrorStream().close(); + String line = null; + while ((line = bReader.readLine()) != null) { + System.out.println(line); + } + + process = Runtime.getRuntime().exec( + "keytool -export -alias " + aliasName + " -storetype " + + Contants.SSL_KEYSTORE_TYPE + " -keystore " + + storePath + " -file " + certName + " -storepass " + + storePass + ""); + + bReader = new BufferedReader(new InputStreamReader(process + .getInputStream())); + while ((line = bReader.readLine()) != null) { + System.out.println(line); + } + + return certName; + } catch (IOException e) { + logger.error("Create a key pair error!"); + logger.error(Utils.printExceptionStack(e)); + return null; + } finally { + if (bReader != null) { + try { + bReader.close(); + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + } + } + + } + + /** + * 将公钥引入KeyStore + * + */ + public static boolean importCertToStore(String aliasName, String storePath, + String certName, String storePass) { + BufferedReader bReader = null; + PrintWriter pw = null; + try { + Process process = Runtime.getRuntime().exec( + "keytool -import -v -trustcacerts -alias " + aliasName + + " -keystore " + storePath + " -file " + certName + + " -storetype " + Contants.SSL_KEYSTORE_TYPE + + " -storepass " + storePass + ""); + + bReader = new BufferedReader(new InputStreamReader(process + .getInputStream())); + pw = new PrintWriter(process.getOutputStream()); + + pw.write("y"); + pw.flush(); + pw.close(); + + String line = null; + while ((line = bReader.readLine()) != null) { + System.out.println(line); + } + + return true; + } catch (IOException e) { + logger.error("Error of importing authentication certificate!"); + logger.error(Utils.printExceptionStack(e)); + return false; + } finally { + if (pw != null) { + pw.close(); + } + if (bReader != null) { + try { + bReader.close(); + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + } + } + + } + + /** + * 删除KeyStore库中的密钥 + * + */ + public static boolean deleteKeyOrCertFromStore(String aliasName, + String storePath, String storePass) { + BufferedReader bReader = null; + PrintWriter pw = null; + try { + Process process = Runtime.getRuntime().exec( + "keytool -delete -v -alias " + aliasName + " -keystore " + + storePath + " -storetype " + + Contants.SSL_KEYSTORE_TYPE + " -storepass " + + storePass + ""); + + bReader = new BufferedReader(new InputStreamReader(process + .getInputStream())); + pw = new PrintWriter(process.getOutputStream()); + + // pw.write("y"); + pw.flush(); + pw.close(); + + String line = null; + while ((line = bReader.readLine()) != null) { + System.out.println(line); + } + + return true; + } catch (IOException e) { + logger.error("Delete" + storePath+ "library Key" + aliasName + "make a mistake!"); + logger.error(Utils.printExceptionStack(e)); + return false; + } finally { + if (pw != null) { + pw.close(); + } + if (bReader != null) { + try { + bReader.close(); + } catch (IOException e) { + logger.error(Utils.printExceptionStack(e)); + } + } + } + + } + + /** + * test main + * + * @time Aug 28, 2011-12:17:28 PM + * @param args + */ + public static void main(String args[]) { + String newServerKeyName = "serverks" + + DateUtil.getCurrentDate(DateUtil.YYYYMMDD); + String newServerKeyPsw = "123456"; + String newClientkeyName = "clientks" + + DateUtil.getCurrentDate(DateUtil.YYYYMMDD); + String newClientkeyPsw = "123456"; + String filepath0 = SSLCertOper.createKeyAndCert(newServerKeyName, + "D:\\workspace\\nms_client\\src\\key\\server_ks", "10.0.6.120", + newServerKeyPsw, "server", + "D:\\workspace\\nms_client\\src\\key\\server.cer"); + + SSLCertOper.importCertToStore(newServerKeyName, + "D:\\workspace\\nms_client\\src\\key\\client_ts", + "D:\\workspace\\nms_client\\src\\key\\server.cer", "client"); + + String filepath1 = SSLCertOper.createKeyAndCert(newClientkeyName, + "D:\\workspace\\nms_client\\src\\key\\client_ks", "localhost", + newClientkeyPsw, "client", + "D:\\workspace\\nms_client\\src\\key\\client.cer"); + + SSLCertOper.importCertToStore(newClientkeyName, + "D:\\workspace\\nms_client\\src\\key\\server_ts", + "D:\\workspace\\nms_client\\src\\key\\client.cer", "server"); + System.out.println(filepath0); + System.out.println(filepath1); + // Config.setValueByName("ssl.server.key.old", + // Constants.SSL_SERVER_KEY_NEW); + // Config.setValueByName("ssl.server.key.old.psw", + // Constants.SSL_SERVER_KEY_NEW_PSW); + // Config.setValueByName("ssl.server.key.new",newServerKeyName); + // Config.setValueByName("ssl.server.key.new.psw", newServerKeyPsw); + // Config.setValueByName("ssl.client.key",newClientkeyName); + // Config.setValueByName("ssl.client.key.psw", newClientkeyPsw); + } +} diff --git a/src/com/nis/nmsclient/thread/socket/SSLClient.java b/src/com/nis/nmsclient/thread/socket/SSLClient.java new file mode 100644 index 0000000..db8dd6d --- /dev/null +++ b/src/com/nis/nmsclient/thread/socket/SSLClient.java @@ -0,0 +1,288 @@ +package com.nis.nmsclient.thread.socket; + +import java.io.File; +import java.util.List; +import java.util.concurrent.Callable; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; + +import net.sf.json.JSONArray; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; + +import com.nis.nmsclient.common.Contants; +import com.nis.nmsclient.model.ReturnFilePO; +import com.nis.nmsclient.thread.task.TaskReqHandle; +import com.nis.nmsclient.thread.task.TaskResultOper; +import com.nis.nmsclient.util.FileUtil; +import com.nis.nmsclient.util.Utils; + + +/** + * 安全通讯的客户端 + **/ + +public class SSLClient extends CommonSocket implements Callable<Object>{ + Logger logger = Logger.getLogger(SSLClient.class); + private String name; + private String reqCmd; + private Object obj; + private String serverHost; + + public SSLClient(String name, String reqCmd, Object obj) { + this.name = name; + this.reqCmd = reqCmd; + this.obj = obj; + this.serverHost = Contants.SOCKET_SERVER_HOST; + } + + public SSLClient(String name, String reqCmd, Object obj, String serverHost) { + this.name = name; + this.reqCmd = reqCmd; + this.obj = obj; + this.serverHost = serverHost; + } + + /** + * 初始化客户端Socket + **/ + public void init() throws Exception { + SSLContext ctx = SSLCertOper.getSSLContext(); + SSLSocketFactory ssf = ctx.getSocketFactory(); + + socket = (SSLSocket) ssf.createSocket(serverHost, + Contants.SOCKET_SERVER_PORT); + logger.debug("create socket success."); + + //2014-1-23 hyx 如果建立socket成功,但是startHandshake握手失败,且未设置超时时间时,则会一直阻塞 + socket.setSoTimeout(1000 * 60 * Contants.SOCKET_TIMEOUT_MINUTES); + + ((SSLSocket) socket).startHandshake(); + logger.debug("handshake success."); + } + + @Override + public Object call() throws Exception { +// Thread.currentThread().setName(name + ">>通信 " + serverHost); + Thread.currentThread().setName(name + ">>Communication " + serverHost); + Object object = null; + try { + init(); + if(socket!=null){ + out = socket.getOutputStream(); + in = socket.getInputStream(); + //设置超时时间 + socket.setSoTimeout(1000 * 60 * Contants.SOCKET_TIMEOUT_MINUTES); + object = toDo(); + } + }catch (Exception e) { +// object = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "异常"; + object = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "anomaly"; + logger.error("Communication anomaly:" + Utils.printExceptionStack(e)); + } finally { + logger.debug("关闭通信"); + close(); + } + + return object; + } + + protected Object toDo() throws Exception { + logger.debug("发送通信请求:" + reqCmd); + //-- 无效操作处理 + if(StringUtils.isEmpty(reqCmd)){ + return null; + } + boolean flag = false; + String msg = null; + String result = null; + //-- 命令判断 + // 与Server通信 + if(reqCmd.equals(REQ_HAND_SHAKE)){ + flag = this.sendMessageByChar(reqCmd); + logger.debug("握手状态:" + (result = this.receiveMessageByChar())); + } + // 通知Server准备升级 + if(reqCmd.equals(REQ_SERVER_UPGRADE)){ + flag = this.sendMessageByChar(reqCmd); + result = this.receiveMessageByChar(); + } + // 获取本机标志UUID + if(reqCmd.equals(REQ_LOCAL_UUID)){ + flag = this.sendMessageByChar(reqCmd); + msg = this.receiveMessageByChar(); + flag = this.sendMessageByChar(CommonSocket.SUCCESS); + logger.info("本机标志UUID:" + msg); + } + //注释 by jinsj 2012-0531 修改为DC主动获取NC时间 + // 获取服务器系统时间 + if(reqCmd.equals(REQ_SERVER_SYSTEMDATE)){ + flag = this.sendMessageByChar(reqCmd); + msg = this.receiveMessageByChar(); + logger.debug("服务器系统时间:" + msg); + flag = this.sendMessageByChar(CommonSocket.SUCCESS); + } + // 发送本机的变更信息(操作系统类型和IP) + if(reqCmd.equals(REQ_LOCAL_CHANGE)){ + // 发送请求 + flag = this.sendMessageByChar(reqCmd); + result = this.receiveMessageByChar(); + // 发送信息: UUID$@$操作系统类型$@$LocalIP + flag = this.sendMessageByChar((String)obj); + // 接收变更结果: 0/1 $@$ 信息 + msg = this.receiveMessageByChar(); + flag = this.sendMessageByChar(CommonSocket.SUCCESS); + return msg; + } + // 初始化配置 + if(reqCmd.equals(REQ_INIT_CONFIG)){ + flag = this.sendMessageByChar(reqCmd); + msg = this.receiveMessageByChar();//接收配置信息 + flag = this.sendMessageByChar(CommonSocket.SUCCESS); + } + // 初始化任务 + if(reqCmd.equals(REQ_INIT_TASK)){ + // 发送请求 + flag = this.sendMessageByChar(reqCmd); + result = this.receiveMessageByChar(); + // 发送本机唯一标识 + flag = this.sendMessageByChar(Contants.AGENT_HOST_UUID + ""); + msg = this.receiveMessageByChar(); + flag = this.sendMessageByChar(CommonSocket.SUCCESS); + if (msg != null && !"".equals(msg)) { + JSONArray jsonArr = JSONArray.fromObject(msg); + //这里处理的任务,原则上不应含有”1 文件推送“类型的任务 + for (int i = 0; i < jsonArr.size(); i++) { + new TaskReqHandle().taskHandle(jsonArr.get(i).toString()); + } + } + logger.debug("初始化任务完成--" + msg); + return null; + } + // 发送主动告警信息 + if(reqCmd.equals(REQ_ALARM)){ + if(obj!=null){ + // 主动告警请求 + flag = this.sendMessageByChar(reqCmd); + result = this.receiveMessageByChar(); + // 发送主动告警信息内容 + flag = this.sendMessageByChar((String)obj); + logger.debug("主动告警信息:" + (String)obj); + result = this.receiveMessageByChar(); + }else{ + logger.debug("主动告警信息为空"); + } + } + // 发送任务结果 + if(reqCmd.equals(REQ_TASK_RESULT)){ + if(obj!=null){ + //发送任务结果请求 + flag = this.sendMessageByChar(reqCmd); + result = this.receiveMessageByChar(); + //发送任务结果内容 + flag = this.sendMessageByChar((String)obj); + result = this.receiveMessageByChar(); + }else{ + logger.warn("Task result information is empty"); + } + } + // 批量上传监测数据【数据收集方式改为DC主动后,此通信废弃】 + if(reqCmd.equals(REQ_UPLOAD_DATAS)){ + if(obj!=null && obj instanceof Object[]) { + Object[] objArr = (Object[])obj; + if (objArr != null && objArr.length > 1 && objArr[0] != null + && objArr[1] != null && objArr[1] instanceof List) { + //发送上传数据请求 + flag = this.sendMessageByChar(reqCmd); + result = this.receiveMessageByChar(); + //上传数据 + this.sendFileByBath((String) objArr[0], (List<File>) objArr[1]); + result = this.receiveMessageByChar(); + }else{ + logger.warn("Uploading the contents of the monitored data object is incorrect"); + } + }else{ + logger.warn("Uploading monitoring data objects is empty"); + } + } + // 任务执行的回传文件:单个文件发送,断点续传【数据收集方式改为DC主动后,此类废弃】 + if(reqCmd.equals(REQ_TASK_RETURNFILE)){ + if(obj!=null && obj instanceof ReturnFilePO){ + ReturnFilePO rfPo = (ReturnFilePO)obj; + //发送回传文件请求 + flag = this.sendMessageByChar(reqCmd); + result = this.receiveMessageByChar(); + //发送回传文件任务信息 + this.sendMessageByChar(TaskResultOper.getTaskResultMsg(rfPo + .getTaskId(), rfPo.getTaskType(), null, null, null, rfPo + .getStartTime(), rfPo.getEndTime(), rfPo.getIsLoop())); + result = this.receiveMessageByChar(); + //发送回传文件文件名称 + this.sendMessageByChar(rfPo.getReturnFileName()); + result = this.receiveMessageByChar(); + //发送回传文件 + flag = this.bpSendFile(Contants.localTaskReturnPath + File.separator + rfPo.getReturnFileName()); + result = this.receiveMessageByChar(); + }else{ + logger.warn("The return file object is empty"); + } + } + + // 发送压缩文件,断点续传 + if(reqCmd.equals(REQ_BP_UPLOAD_FIFE)){ + if(obj!=null && obj instanceof String[]) { + String[] strArr = (String[])obj; + if (strArr != null && strArr.length > 1){ + //打包上传文件请求 + flag = this.sendMessageByChar(reqCmd + ":" + strArr[0]); + result = this.receiveMessageByChar(); + //发送打包文件名 + File file = new File(strArr[1]); + flag = this.sendMessageByChar(file.getName()); + result = this.receiveMessageByChar(); + //上传打包文件 + flag = this.bpSendFile(strArr[1]); + result = this.receiveMessageByChar(); + //上传成功后删除或移动文件 + if(flag && SUCCESS.equalsIgnoreCase(result) && file.exists()){ + String dataType = strArr[0]; + if(BP_TYPE_DETECT_DATA.equalsIgnoreCase(dataType)){ + FileUtil.moveFile(file, Contants.localDataDonePath, true); + }else if(BP_TYPE_TASK_RESULT.equalsIgnoreCase(dataType)){ + FileUtil.moveFile(file, Contants.localTaskDonePath, true); + }else if(BP_TYPE_TASK_RETURN.equalsIgnoreCase(dataType)){ + FileUtil.moveFile(file, Contants.localTaskDonePath, true); + } + } + } + } + } + // 向DC发送NC端异常信息 + if(reqCmd.equals(REQ_ERROR_INFO)){ + if(obj!=null){ + flag = this.sendMessageByChar(reqCmd); + result = this.receiveMessageByChar(); + //发送异常内容 + flag = this.sendMessageByChar((String)obj); + result = this.receiveMessageByChar(); + }else{ + logger.warn("Abnormal information is empty"); + } + } + + if (flag && (SUCCESS.equalsIgnoreCase(result) || msg!=null)) { + msg = Contants.COMMON_MSG_SUCCESS + Contants.COMMON_MSG_SEPRATOR + (msg!=null ? msg : "成功"); + } else { +// msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "失败"; + msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "failed"; + } + logger.debug("SSLClient toDo()---" + msg); + + logger.debug("发送通信请求结束:" + reqCmd); + // -- 命令判断 + return msg; + } +}
\ No newline at end of file diff --git a/src/com/nis/nmsclient/thread/socket/SSLServer.java b/src/com/nis/nmsclient/thread/socket/SSLServer.java new file mode 100644 index 0000000..27852f8 --- /dev/null +++ b/src/com/nis/nmsclient/thread/socket/SSLServer.java @@ -0,0 +1,534 @@ +package com.nis.nmsclient.thread.socket; + +import java.io.File; +import java.io.IOException; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import net.sf.json.JSONObject; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.FalseFileFilter; +import org.apache.commons.io.filefilter.PrefixFileFilter; +import org.apache.commons.lang.ArrayUtils; +import org.apache.log4j.Logger; +import com.nis.nmsclient.common.Common; +import com.nis.nmsclient.common.Contants; +import com.nis.nmsclient.common.SysConfig; +import com.nis.nmsclient.config.DetecConfReqHandle; +import com.nis.nmsclient.model.ReturnFilePO; +import com.nis.nmsclient.model.Task1; +import com.nis.nmsclient.model.Task4; +import com.nis.nmsclient.model.Task6; +import com.nis.nmsclient.thread.alarm.AlarmUtil; +import com.nis.nmsclient.thread.alarm.ErrorCode; +import com.nis.nmsclient.thread.task.AgentCommand; +import com.nis.nmsclient.thread.task.TaskReqHandle; +import com.nis.nmsclient.util.DateUtil; +import com.nis.nmsclient.util.FileUtil; +import com.nis.nmsclient.util.FileWrUtil; +import com.nis.nmsclient.util.Utils; + +/** + * 用于安全通讯的服务Socket,采用java中的SSLServerSocket + * 接收服务端发送过来的对该客户端的各配置参数信息,并对相应程序进行设置 + **/ +public class SSLServer implements Runnable { + static Logger logger = Logger.getLogger(SSLServer.class); + SSLServerSocket ss = null; + + private String startTime;// 服务启动时间 + + public SSLServer() throws IOException{ + init(); + } + + /** + * 初始化服务Socket + **/ + public void init() throws IOException { + try { + startTime = System.currentTimeMillis() + ""; + + //初始化上下文 + SSLContext ctx = SSLCertOper.getSSLContext(); + ss = (SSLServerSocket) ctx.getServerSocketFactory() + .createServerSocket(Contants.SOCKET_AGENT_PORT); + ss.setNeedClientAuth(true);// 客户端要认证 + } catch (Exception e) { + logger.error(Utils.printExceptionStack(e)); +// throw new IOException("NmsClient监听端口[" + Contants.SOCKET_AGENT_PORT + "]创建失败"); + throw new IOException("NmsClient monitor port[" + Contants.SOCKET_AGENT_PORT + "]Create failure"); + } + } + + /** + * 重载方法:run 处理客户端的请求 + **/ + public void run() { + logger.info("通讯线程启动 成功"); + Socket socket = null; + while(true){ + try { + socket = ss.accept(); + if(!Common.NC_UPGRADE_FLAG){//当NC_UPGRADE_FLAG为false时,允许建立通讯,否则放弃通讯,用于NC升级功能 + logger.debug("来自:"+socket.getInetAddress().getHostAddress()); + Common.service.submit(new ServerThread(socket)); + }else{ //关闭 放弃的通讯 + logger.info("NC升级 抛弃通讯:"+socket.getInetAddress().getHostAddress()); + socket.close(); + } + } catch (Exception e) { + logger.error("Failure to establish communication " + ss.getInetAddress().getHostAddress() + + ": " + Utils.printExceptionStack(e)); + } + } + + } + + class ServerThread extends CommonSocket implements Runnable { + + public ServerThread(Socket s) throws Exception { + super(s); + } + + public void run(){ + String ip = null; + try { + ip = socket.getInetAddress().getHostAddress(); + + //设置超时时间 + socket.setSoTimeout(1000 * 60 * Contants.SOCKET_TIMEOUT_MINUTES); + +// Thread.currentThread().setName("通讯线程 》" + socket.getInetAddress().getHostAddress()); + Thread.currentThread().setName("Communication Thread 》" + socket.getInetAddress().getHostAddress()); + + String msg = this.receiveMessageByChar(); + logger.info("接收请求 " + msg); + + if(REQ_HAND_SHAKE.equalsIgnoreCase(msg)){// 握手操作 + //返回接收到的信息和NMSClient启动时间 + this.sendMessageByChar(SUCCESS+":"+msg+"|"+startTime); + }else if(SERVER_COLLECT_DATA.equalsIgnoreCase(msg)){// DC主动向NC收集数据 + /** ----当前通信DC_IP与配置DC_IP不同,更新IP---- **/ + if(!Contants.SOCKET_SERVER_HOST.equals(ip)){ + logger.info("变更通信DC_IP: " + Contants.SOCKET_SERVER_HOST + " --> " + ip); + Contants.SOCKET_SERVER_HOST = ip; + /** ----SeqId未取到,更新配置文件---- **/ + if(Contants.AGENT_HOST_UUID == null){ + SysConfig.updateConfigFile(Contants.SOCKET_SERVER_HOST_KEY, Contants.SOCKET_SERVER_HOST); + } + } + new ServerCollectData(this).sendData(); + }else if(SERVER_GET_TASKRESULT.equalsIgnoreCase(msg)){// DC再次向NC获取未入库的任务结果 + this.sendMessageByChar(SUCCESS); + String taskInfo = null; + List<File> fileList = new LinkedList<File>(); + while(!END.equalsIgnoreCase(taskInfo = this.receiveMessageByChar())){ + // taskInfo组织顺序:TaskId、TaskType、isLoop、startTime、endTime + String[] infos = taskInfo.split(Contants.COMMON_MSG_SEPRATOR_SPLIT); + if (infos.length < 4 || (!"0".equals(infos[2]) && infos.length < 5)) {// 参数个数不够4,或者周期任务参数个数不够5,则跳过本次处理 + logger.warn("DC gets the task result again, the task attribute is incomplete, skip this processing."); + this.sendMessageByChar(AgentCommand.RESULT_FAIL +// + Contants.COMMON_MSG_SEPRATOR + "任务参数不全,找不到任务结果"); + + Contants.COMMON_MSG_SEPRATOR + "i18n_client.SSLServer.sendMsg_n81i"); + continue; + } + String eTime = null; + if(infos.length > 4){// 非周期任务,参数个数为4 + eTime = infos[4]; + } + // 检查指定任务的的结果或回传文件是否存在:若存在,移动到incoming目录;若不存在,回复失败 + boolean isExistResult = checkTaskResultExist(infos[0], infos[1], infos[2], infos[3], eTime, fileList); + if(isExistResult){ + this.sendMessageByChar(AgentCommand.RESULT_OK + + Contants.COMMON_MSG_SEPRATOR + ""); + } else { + this.sendMessageByChar(AgentCommand.RESULT_FAIL +// + Contants.COMMON_MSG_SEPRATOR + "任务结果不存在"); + + Contants.COMMON_MSG_SEPRATOR + "i18n_client.SSLServer.noResult_n81i"); + } + } + this.sendMessageByChar(SUCCESS); + // 移动存在的任务结果和回传文件 + moveTaskResultOrReturn(fileList); + }else if(REQ_SERVER_SYSTEMDATE.equalsIgnoreCase(msg)){//add by jinsj 2012-05-31 DC主动获取NC时间 + this.sendMessageByChar(new Date().getTime()+""); + this.receiveMessageByChar(); + }else if(Contants.AGENT_HOST_UUID == null ){ + // 除了以上三个通信,其他通信都得判断SeqID是否获取到,若未取到,则要放弃通讯 + logger.info("NC尚未取到SeqID 抛弃通讯:"+socket.getInetAddress().getHostAddress()); + close(); + return; + } + + if(REQ_CERT.equalsIgnoreCase(msg)){ + this.sendMessageByChar(SUCCESS); + //接收证书 + this.receiveFileByByte(Contants.keyPath + File.separator + "server.cer"); + this.sendMessageByChar(SUCCESS); + //导入认证证书到库 + SSLCertOper.importCertToStore("serverks" + + DateUtil.getCurrentDate(DateUtil.YYYYMMDD), + Contants.SSL_TRUST_KEY_STORE, + Contants.keyPath + File.separator + "server.cer", Contants.SSL_KEY_STORE_PASS); + File file = new File(Contants.keyPath + File.separator + "server.cer"); + if(file.exists()){ + FileUtil.delDir(file); + } + + /*SSLClient sc = new SSLClient(); + SSLCertOper.CreateAndSendCert(sc); + sc.close();*/ + }else if(SERVER_UPDATE_CONFIG.equalsIgnoreCase(msg)){// 更新监测配置 + this.sendMessageByChar(SUCCESS); + String str = this.receiveMessageByChar(); + this.sendMessageByChar(SUCCESS); + logger.debug("updateConfig-->" + str); + new DetecConfReqHandle().handlerConfigByUpdate(str); + } else if(SEND_PLUGIN_SCRIPT_FILE.equalsIgnoreCase(msg)) { // 下发脚本 + File pluginDir = new File(Contants.localPluginScriptPath); + this.sendMessageByChar(SUCCESS); + String fileNames = this.receiveMessageByChar(); + Collection<?> files = FileUtils.listFiles(pluginDir, + new PrefixFileFilter(fileNames.split(",")), FalseFileFilter.FALSE); + for (Object file : files) { + ((File)file).delete(); + } + this.sendMessageByChar(SUCCESS); + this.bpReceiveFileByBath(pluginDir.getCanonicalPath()); + this.sendMessageByChar(SUCCESS); + + } else if(SERVER_FILE_PUSH.equalsIgnoreCase(msg)){//任务操作:文件推送 + this.sendMessageByChar(SUCCESS); + String str = this.receiveMessageByChar(); + logger.debug("task-->" + str); + this.sendMessageByChar(SUCCESS); + JSONObject jsonObj = JSONObject.fromObject(str); + String resultMsg = null; + if(str.contains("taskInfo")){ + JSONObject jsonObj2 = jsonObj.getJSONObject("taskInfo"); + Object obj = JSONObject.toBean(jsonObj2,Task1.class); + Task1 fileInfo = (Task1) obj; + // 接收文件 + resultMsg = new TaskReqHandle().filePush(this, fileInfo.getTaskParam(), fileInfo.getTaskId(), false); + } + if (resultMsg !=null && Contants.isSucessByResult(resultMsg)) { + this.sendMessageByChar(AgentCommand.RESULT_OK + + Contants.COMMON_MSG_SEPRATOR +// + "成功,详细信息如下:" + Contants.getDescByResult(resultMsg)); + + "i18n_client.SSLServer.success_n81i:" + Contants.getDescByResult(resultMsg)); + } else { + this.sendMessageByChar(AgentCommand.RESULT_FAIL + + Contants.COMMON_MSG_SEPRATOR +// + "失败,详细信息如下:" + Contants.getDescByResult(resultMsg)); + + "i18n_client.SSLServer.fail_n81i:" + Contants.getDescByResult(resultMsg)); + } + this.receiveMessageByChar(); + }else if(SERVER_UPGRADE.equalsIgnoreCase(msg)){//任务操作:升级 + this.sendMessageByChar(SUCCESS); + String str = this.receiveMessageByChar(); + logger.debug("task-->" + str); + this.sendMessageByChar(SUCCESS); + JSONObject jsonObj = JSONObject.fromObject(str); + String resultMsg = null; + TaskReqHandle handle = new TaskReqHandle(); + if(str.contains("taskInfo")){ + JSONObject jsonObj2 = jsonObj.getJSONObject("taskInfo"); + Object obj = JSONObject.toBean(jsonObj2,Task6.class); + Task6 task = (Task6) obj; + // 判断是否重新执行任务,并作提前处理 + reExecTask(task.getTaskId(), task.getOldTaskId()); + // 接收升级文件 + resultMsg = handle.filePush(this, task.getCommandParam(), + task.getTaskId(), true); + } + if (resultMsg !=null && Contants.isSucessByResult(resultMsg)) { + this.sendMessageByChar(AgentCommand.RESULT_SEND_OK + + Contants.COMMON_MSG_SEPRATOR +// + "下发成功,详细信息如下:" + Contants.getDescByResult(resultMsg)); + + "i18n_client.SSLServer.lssueSuccess_n81i:" + Contants.getDescByResult(resultMsg)); + } else { + this.sendMessageByChar(AgentCommand.RESULT_FAIL + + Contants.COMMON_MSG_SEPRATOR +// + "失败,详细信息如下: " + Contants.getDescByResult(resultMsg)); + + "i18n_client.SSLServer.fail_n81i: " + Contants.getDescByResult(resultMsg)); + } + String receiveMsg = this.receiveMessageByChar(); + if(resultMsg !=null && Contants.isSucessByResult(resultMsg) &&receiveMsg.equals(SUCCESS)){//处理升级 + handle.taskHandle(str); + } + }else if(SERVER_TASK.equalsIgnoreCase(msg)){//任务操作:命令执行和升级逆向任务 + this.sendMessageByChar(SUCCESS); + String str = this.receiveMessageByChar(); + logger.debug("task-->" + str); + this.sendMessageByChar(AgentCommand.RESULT_SEND_OK +// + Contants.COMMON_MSG_SEPRATOR + "下发成功"); + + Contants.COMMON_MSG_SEPRATOR + "i18n_client.SSLServer.lssueSuccess1_n81i"); + this.receiveMessageByChar(); + + //2015-6-23 针对reboot命令(之前存在会多次重启的问题,现修改为,接收到命令执行任务时,如果该任务的结果已经存在(incoming或者done里有),则不再执行) + try { + int taskType = 0; + JSONObject jsonObj = JSONObject.fromObject(str); + if(str.contains("typeInfo")){ + taskType = jsonObj.getInt("typeInfo"); + } + if(str.contains("taskInfo") && taskType==4){//taskType:命令执行任务(4) + JSONObject jsonObj2 = jsonObj.getJSONObject("taskInfo"); + Task4 task4 = (Task4)JSONObject.toBean(jsonObj2,Task4.class); + String taskId = task4.getTaskId()==null?"0":(task4.getTaskId()+""); + String isLoop = task4.getIsLoop()+""; + String startTime = task4.getStartTime()==null?"":task4.getStartTime()+""; + String endTime = task4.getEndTime()==null?"":task4.getEndTime()+""; + if(task4.getCommandType() == 2){//命令执行(4)->可执行命令(2) + logger.info("可执行命令 taskId:" + task4.getTaskId()); + List<File> fileList = new ArrayList<File>(); + boolean isExist = checkTaskResultExistFromDoneAndIncoming(taskId+"", taskType+"", isLoop, startTime, endTime, fileList);//非周期任务:0 + if(isExist) { + logger.info("任务已执行,不再重复执行:taskId:"+taskId+" taskType:"+taskType); + return; + } + } + } + } catch (Exception e) { + logger.error("For the next task, determine whether there is a result, if the result is no longer performing the exception", e); + } + + new TaskReqHandle().taskHandle(str); + }else if(SERVER_TASK_CANCEL.equalsIgnoreCase(msg)){//任务撤消操作 + this.sendMessageByChar(SUCCESS); + String str = this.receiveMessageByChar(); + logger.debug("taskcancle-->" + str); + if(str!=null && !"".equals(str)){ + Common.cancleTaskFuture(Long.parseLong(str), 0); + } + this.sendMessageByChar(AgentCommand.MISSION_CANCEL_FINISH +// + Contants.COMMON_MSG_SEPRATOR + "任务已撤消完成"); + + Contants.COMMON_MSG_SEPRATOR + "i18n_client.SSLServer.missionRevokeSuccess_n81i"); + this.receiveMessageByChar(); + }else if(ACTIVE_ALARM_START_ALERT.equalsIgnoreCase(msg)){// NC端是否报主动告警 变更 + this.sendMessageByChar(SUCCESS); + String str = this.receiveMessageByChar(); + this.sendMessageByChar(SUCCESS); + logger.debug("isStartActiveAlarm-->" + str); + JSONObject jsonObj = JSONObject.fromObject(str); + Boolean isStartActiveAlarm = (Boolean)jsonObj.get("showAutoAlarm"); + String webHandleTime = (String)jsonObj.get("webHandleTime"); + //更新Contants.ACTIIVE_ALARM_START + if(isStartActiveAlarm!=null) { + + Contants.ACTIIVE_ALARM_START = isStartActiveAlarm; + logger.info("NC是否主动告警:"+Contants.ACTIIVE_ALARM_START+" web端操作时间:"+webHandleTime); + } + } + + logger.debug("接收请求 " + msg + " 完成"); + } catch (Exception e) { + logger.error("Receiving information anomaly:" + Utils.printExceptionStack(e)); + if(ip==null){ + ip = Utils.getLocalIp(); + } +// AlarmUtil.sendNMSErrorMsg(ErrorCode.SocketError, ip , "NC通讯线程异常:" + e.getMessage()); + AlarmUtil.sendNMSErrorMsg(ErrorCode.SocketError, ip , "NC communication thread exception:" + e.getMessage()); + + return; + } finally { + logger.debug("关闭通信"); + close(); + } + } + + /** + * 重新执行任务,针对升级任务的推送文件的提前处理,将原任务的文件拷贝到新任务的临时目录 + * @param taskId + * @param oldTaskId + * @throws Exception + */ + private void reExecTask(Long taskId, Long oldTaskId) throws Exception { + // 如果原任务ID为空,说明不是重新执行任务,不执行任何操作 + if (oldTaskId == null || "".equals(oldTaskId.toString()) + || "0".equals(oldTaskId.toString())) { + return; + } + File tempDir = new File(Contants.localTempDataIncomingPath + File.separator + + "filepush_" + taskId); + if (!tempDir.exists()) { + tempDir.mkdirs(); + } + // 如果是升级任务,推送文件的保存路径getUpgradeTaskPushPath(taskId) + File oldFileDir = new File(TaskReqHandle.getUpgradeTaskPushPath(oldTaskId)); + try { + if(oldFileDir.exists()){ + FileUtils.copyDirectory(oldFileDir, tempDir); + } + } catch (IOException e) { + logger.error(e); + } + } + + /** + * DC再次获取任务结果 -- 检查指定任务的的结果或回传文件是否存在 + * @param isLoop 是否循环任务: 0 非周期, 1 周期 + * @param startTime 若非周期任务,升级时间或创建时间;若周期任务,某一周期的起始时间 + * @param endTime 若非周期任务,为空;若周期任务,某一周期的结束时间 + * @param fileList 用于存在找到的结果文件和回传文件 + * @return + */ + private boolean checkTaskResultExist(String taskId, String taskType, String isLoop, String sTime, String eTime, List<File> fileList) throws Exception{ + long startTime=(sTime==null || "".equals(sTime) || "null".equals(sTime)) ? 0l : Long.parseLong(sTime); + long endTime=(eTime==null || "".equals(eTime) || "null".equals(eTime)) ? 0l : Long.parseLong(eTime); + logger.debug("checkTaskResultExist startTime=" + DateUtil.getStingDate(DateUtil.YYYY_MM_DD_HH24_MM_SS, new Date(startTime))+" -- endTime=" + DateUtil.getStingDate(DateUtil.YYYY_MM_DD_HH24_MM_SS, new Date(endTime))); + + String dateName = DateUtil.getStingDate(DateUtil.YYYYMMDD, new Date(startTime)); + String prefix = "tasktype" + taskType + "_" + taskId; + + boolean isExistResult = false; + // 依次取nc_task/done下的result和return目录 + File[] fileDirs = FileUtil.getDirectoryArray(new File(Contants.localTaskDonePath)); + if(fileDirs==null){ + logger.info("fileDirs为空"); + }else{ + try{ + for(File dir : fileDirs){ + // -- 找到指定的日期目录dateName及之后的日期目录 + File[] dateFiles = FileUtil.sortASCByFileName(FileUtil.getDirsAfterDateName(dir, dateName)); + // -- 在找到的日期目录下检查文件是否存在 + for(File dateFile : dateFiles){ + File[] files = null; + if("0".equals(isLoop)){// 0 非周期 + files = FileUtil.getFilesStartWith(dateFile, prefix); + }else{//--- 周期任务取某一时间段内的结果与回传文件 + files = FileUtil.getFilesStartWithByMillis(dateFile, prefix, startTime, endTime); + } + + if(files.length>0){// 若在任一目录下找到,则不用再找其他日期目录,跳出第二个For循环 + fileList.addAll(Arrays.asList(files)); + isExistResult = true; + break; + } + } + } + }catch(Exception e){ + logger.error(e); + } + } + if(!isExistResult){ + logger.info("再次获取任务结果 > TaskId: " + taskId + ", TaskType: " + + taskType + ", IsLoop: " + isLoop + " > 任务结果不存在"); + } + + return isExistResult; + } + + /** + * + * 检查done和incoming里是否有任务结果信息(如果有,则不进行再次执行,避免重复执行任务,如重复reboot) + * @author dell Jun 23, 2015 + * @version 1.0 + * @param taskId + * @param taskType + * @param isLoop + * @param sTime + * @param eTime + * @param fileList + * @return + * @throws Exception + */ + private boolean checkTaskResultExistFromDoneAndIncoming(String taskId, String taskType, String isLoop, String sTime, String eTime, List<File> fileList) throws Exception{ + long startTime=(sTime==null || "".equals(sTime) || "null".equals(sTime)) ? 0l : Long.parseLong(sTime); + long endTime=(eTime==null || "".equals(eTime) || "null".equals(eTime)) ? 0l : Long.parseLong(eTime); + logger.debug("checkTaskResultExist startTime=" + DateUtil.getStingDate(DateUtil.YYYY_MM_DD_HH24_MM_SS, new Date(startTime))+" -- endTime=" + DateUtil.getStingDate(DateUtil.YYYY_MM_DD_HH24_MM_SS, new Date(endTime))); + + String dateName = DateUtil.getStingDate(DateUtil.YYYYMMDD, new Date(startTime)); + String prefix = "tasktype" + taskType + "_" + taskId; + + boolean isExistResult = false; + // 依次取nc_task/done和incoming下的result和return目录 + File[] fileDoneDirs = FileUtil.getDirectoryArray(new File(Contants.localTaskDonePath)); + File[] fileIncomingDirs = FileUtil.getDirectoryArray(new File(Contants.localTaskResultPath)); + File[] fileDirs = null; + if(fileIncomingDirs!=null && fileDoneDirs!=null) { + fileDirs = (File[])ArrayUtils.addAll(fileDoneDirs, fileIncomingDirs); + } + if(fileDirs==null){ + logger.info("fileDirs为空"); + }else{ + try{ + for(File dir : fileDirs){ + // -- 找到指定的日期目录dateName及之后的日期目录 + File[] dateFiles = FileUtil.sortASCByFileName(FileUtil.getDirsAfterDateName(dir, dateName)); + // -- 在找到的日期目录下检查文件是否存在 + for(File dateFile : dateFiles){ + File[] files = null; + if("0".equals(isLoop)){// 0 非周期 + files = FileUtil.getFilesStartWith(dateFile, prefix); + }else{//--- 周期任务取某一时间段内的结果与回传文件 + files = FileUtil.getFilesStartWithByMillis(dateFile, prefix, startTime, endTime); + } + + if(files.length>0){// 若在任一目录下找到,则不用再找其他日期目录,跳出第二个For循环 + fileList.addAll(Arrays.asList(files)); + isExistResult = true; + break; + } + } + } + }catch(Exception e){ + logger.error(e); + } + } + if(!isExistResult){ + logger.info("判断新下发的任务结果是否已经存在 > TaskId: " + taskId + ", TaskType: " + + taskType + ", IsLoop: " + isLoop + " > 任务结果不存在"); + }else { + logger.info("判断新下发的任务结果是否已经存在 > TaskId: " + taskId + ", TaskType: " + + taskType + ", IsLoop: " + isLoop + " > 任务结果已存在"); + } + + return isExistResult; + } + + /** + * DC再次获取任务结果 -- 移动找到的结果文件和回传文件到incoming目录 + */ + private void moveTaskResultOrReturn(List<File> fileList){ + if(fileList==null || fileList.size()==0){ + return; + } + try { + for(File file : fileList){ + // ---------- 任务回传文件处理 + if(file.getName().endsWith(Contants.TASK_RETURN_FILE_SUFFIX)){ + if(!file.exists() || !file.isFile()){ + continue; + } + // 移动实际回传的文件 + String[] resultArr = FileWrUtil.cfgFileReader(file); + if (resultArr != null && resultArr.length > 0) { + JSONObject jsonObject = JSONObject.fromObject(resultArr[0]); + ReturnFilePO rfPo = (ReturnFilePO) JSONObject.toBean(jsonObject, ReturnFilePO.class); + if(rfPo.getReturnFileName()!=null && !"".equals(rfPo.getReturnFileName())){ + File returnFile = new File(file.getParent() + File.separator + rfPo.getReturnFileName()); + FileUtil.moveFile(returnFile, Contants.localTaskReturnPath, true); + } + } + // 移动记录任务回传的临时文件 + FileUtil.moveFile(file, Contants.localTaskReturnPath, true); + }else { + // ---------- 任务结果处理 + FileUtil.moveFile(file, Contants.localTaskResultPath, true); + } + } + } catch (Exception e) { + logger.error("Get the task result again > mobile file exception again", e); + } + } + } + +}
\ No newline at end of file diff --git a/src/com/nis/nmsclient/thread/socket/ServerCollectData.java b/src/com/nis/nmsclient/thread/socket/ServerCollectData.java new file mode 100644 index 0000000..fa55551 --- /dev/null +++ b/src/com/nis/nmsclient/thread/socket/ServerCollectData.java @@ -0,0 +1,581 @@ +package com.nis.nmsclient.thread.socket; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + +import net.sf.json.JSONObject; + +import org.apache.log4j.Logger; + +import com.nis.nmsclient.common.Contants; +import com.nis.nmsclient.model.ReturnFilePO; +import com.nis.nmsclient.thread.task.TaskResultOper; +import com.nis.nmsclient.util.DateUtil; +import com.nis.nmsclient.util.FileUtil; +import com.nis.nmsclient.util.FileWrUtil; +import com.nis.nmsclient.util.StringUtil; +import com.nis.nmsclient.util.Utils; +import com.nis.nmsclient.util.ZipUtil; + +/** + * 用于定时扫描并上传监测数据文件 + * + **/ +public class ServerCollectData { + Logger logger = Logger.getLogger(ServerCollectData.class); + private CommonSocket socket; + + public ServerCollectData(CommonSocket socket) { + this.socket = socket; + } + + public void sendData() { + logger.debug("传送数据开始 ~~~~~~~"); + try { + // 发送监测数据 + handleDetectData(); + // 发送任务结果 + handleTaskResult(); + // 发送任务回传文件 + handleTaskReturnFile(); + // 发送任务结果--针对有回传文件时写的任务结果 + handleTaskResult(); + // 结束通讯 + socket.sendMessageByChar(CommonSocket.END); + + } catch (Exception e) { + logger.error("Transmits data anomalies:" + Utils.printExceptionStack(e)); + } + logger.debug("传送数据结束 ~~~~~~~"); + } + + private void handleDetectData() throws Exception { + logger.debug("传送监测数据开始 ~~~~~~~"); + long startTime = System.currentTimeMillis(); + File parDir = new File(Contants.localDataCollection); + if(!parDir.exists()){ + return; + } + // == 1、针对数据文件过多时打包上传未完成的文件继续上传 + // ------------取所有未上传完成的Zip文件 + List<File> fileList = new LinkedList<File>(); + File[] fileArr = FileUtil.getFilesEndWith(parDir, ".zip"); + if (fileArr != null && fileArr.length > 0) { + for (File file : fileArr) { + if (!file.getName().startsWith(CommonSocket.BP_TYPE_DETECT_DATA)) { + continue; + } + fileList.add(file); + } + } + // ------------传送Zip文件 + if (fileList.size() > 0) { + sendZipFile(fileList, CommonSocket.DATA_TYPE_ZIP_DETECT); + } + + // == 2、检查当前数据文件数量,批量发送文件或打包上传 + File dataDir = new File(Contants.localDataFilePath); + if (!dataDir.exists()) { + logger.warn("Data directory“" + dataDir.getAbsolutePath() + "”Non-existent!!!"); + } else { + long total = 0; + List<File> allFiles = new ArrayList<File>(); + File[] dataDirs = FileUtil.getDirectoryArray(dataDir); + // ---- 数据处理 + total = handleNullDataFile(allFiles, dataDirs); + logger.info("本次收集监测数据文件总数:" + total + ", 正常数据:" + allFiles.size() + ", 空数据:" + (total - allFiles.size())); + total = allFiles.size();// 正常的要上传的数据个数 + + // --- 将所有数据文件一起打包,发送 + if (total > Contants.COMMON_ZIP_MIN_SIZE) { + long zipCnt = total/Contants.COMMON_ZIP_MAX_SIZE; + if (zipCnt > 0) {//2013-5-6 未上传的数据太多时,将监测数据压缩为多个文件 + for(int i=0; i<total/Contants.COMMON_ZIP_MAX_SIZE; i++){ + // 组织每次压缩的文件数组 + File[] dataFiles = new File[Contants.COMMON_ZIP_MAX_SIZE]; + int start = i * Contants.COMMON_ZIP_MAX_SIZE; + int end = (i + 1) * Contants.COMMON_ZIP_MAX_SIZE; + for (int j = start, k = 0; j < end; j++, k++) { + dataFiles[k] = allFiles.get(j); + } + // 压缩并发送监测数据 + compressAndSendDetecData(dataFiles); + } + }else{ + // 压缩并发送监测数据 + compressAndSendDetecData(dataDirs); + } + logger.info("本次收集将所有监测数据打包传送,监测数据总数:" + total + ",用时:" + + (System.currentTimeMillis() - startTime) + "ms"); + } else if (total > 0) { + // -- 按正常所有监测数据批量上传 + sendCSVData(dataDir, allFiles); + logger.info("本次收集传送监测数据总数:" + total + ",用时:" + + (System.currentTimeMillis() - startTime) + "ms"); + } else { + logger.info("本次收集未传送监测数据"); + } + } + logger.debug("传送监测数据结束 ~~~~~~~"); + } + + private void handleTaskResult() throws Exception { + logger.debug("传送任务结果开始 ~~~~~~~"); + long startTime = System.currentTimeMillis(); + // == 1、针对结果文件过多时打包上传未完成的文件继续上传 + File taskDir = new File(Contants.localTaskPath); + if (!taskDir.exists()) { + return; + } + // ------------取所有未上传完成的Zip文件 + List<File> fileList = new LinkedList<File>(); + File[] zipArr = FileUtil.getFilesEndWith(taskDir, ".zip"); + if (zipArr != null && zipArr.length > 0) { + for (File file : zipArr) { + if (!file.getName().startsWith(CommonSocket.BP_TYPE_TASK_RESULT)) { + continue; + } + fileList.add(file); + } + } + // ------------传送Zip文件 + if(fileList.size()>0){ + sendZipFile(fileList, CommonSocket.DATA_TYPE_ZIP_TASKRESULT); + } + + // == 2、检查当前结果文件数量,批量发送文件或打包上传 + File resultDir = new File(TaskResultOper.getTaskResultPath()); + if(!resultDir.exists()){ + return; + } + File[] fileArr = FileUtil.getFilesEndWith(resultDir, Contants.TASK_RESULT_FILE_SUFFIX); + // -- 将所有任务结果文件一起打包,发送 + if(fileArr.length > Contants.COMMON_ZIP_MIN_SIZE){ + int zipCnt = fileArr.length/Contants.COMMON_ZIP_MAX_SIZE; + if(zipCnt>0){//2013-5-6 未上传的结果文件太多时,将结果文件压缩为多个文件 + for(int i=0; i<fileArr.length/Contants.COMMON_ZIP_MAX_SIZE; i++){ + // 组织每次压缩的文件数组 + File[] resultFiles = new File[Contants.COMMON_ZIP_MAX_SIZE]; + int start = i * Contants.COMMON_ZIP_MAX_SIZE; + int end = (i + 1) * Contants.COMMON_ZIP_MAX_SIZE; + for(int j=start,k=0; j<end; j++,k++){ + resultFiles[k] = fileArr[j]; + } + // 压缩并发送任务结果 + compressAndSendTaskResult(resultFiles); + } + }else{ + // 压缩并发送任务结果 + compressAndSendTaskResult(fileArr); + } + + logger.info("本次收集将所有任务结果打包传送,任务结果总数:" + fileArr.length + ",用时:" + (System.currentTimeMillis() - startTime) + "ms"); + }else if(fileArr.length > 0){ + // -- 按正常的多个结果批量发送 + sendTaskResult(fileArr); + logger.info("本次收集传送任务结果总数:" + fileArr.length + ",用时:" + + (System.currentTimeMillis() - startTime) + "ms"); + } else { + logger.info("本次收集未传送任务结果"); + } + logger.debug("传送任务结果结束 ~~~~~~~"); + } + + private void handleTaskReturnFile() throws Exception { + logger.debug("传送回传文件开始 ~~~~~~~"); + long startTime = System.currentTimeMillis(); + // == 1、针对回传文件过多时打包上传未完成的文件继续上传 + File taskDir = new File(Contants.localTaskPath); + if (!taskDir.exists()) { + return; + } + // ------------取所有未上传完成的Zip文件 + List<File> fileList = new LinkedList<File>(); + File[] zipArr = FileUtil.getFilesEndWith(taskDir, ".zip"); + if (zipArr != null && zipArr.length > 0) { + for (File file : zipArr) { + if (!file.getName().startsWith(CommonSocket.BP_TYPE_TASK_RETURN)) { + continue; + } + fileList.add(file); + } + } + // ------------传送Zip文件 + if(fileList.size()>0){ + sendZipFile(fileList, CommonSocket.DATA_TYPE_ZIP_TASKRETURN); + } + + // == 2、检查当前回传文件数量,单个发送文件或打包上传 + File returnDir = new File(Contants.localTaskReturnPath); + if(!returnDir.exists()){ + return; + } + File[] fileArr = FileUtil.getFilesEndWith(returnDir, Contants.TASK_RETURN_FILE_SUFFIX); + if(fileArr == null || fileArr.length == 0){ + return; + } + //--- 将所有任务的回传文件及回传信息保存文件一起打包,发送 + if(fileArr.length > Contants.COMMON_MAX_RETURN_CNT){ + //压缩并删除原文件 + String compressFileStr = Contants.localTaskPath + + File.separator + + CommonSocket.addTimeTagForFileName(CommonSocket.BP_TYPE_TASK_RETURN, + null, true) + + ".zip"; + // 2013-03-22 由于DC再次获取未保存任务结果这个功能的实现,现修改将任务结果和回传文件压缩时不删除文件,而是将其移动到相应的日期目录 + ZipUtil.zipWithMoveFile(returnDir.listFiles(), compressFileStr, false); + //发送 + sendZipFile(new File(compressFileStr), CommonSocket.DATA_TYPE_ZIP_TASKRETURN); + logger.info("本次收集将所有任务回传文件打包传送,回传文件总数:" + fileArr.length + ",用时:" + (System.currentTimeMillis() - startTime) + "ms"); + }else if(fileArr.length > 0){ + //-- 按正常的一个任务一个任务的回传 + sendTaskReturn(fileArr); + logger.info("本次收集传送任务回传总数:" + fileArr.length + ",用时:" + + (System.currentTimeMillis() - startTime) + "ms"); + } else { + logger.info("本次收集未传送任务回传文件"); + } + logger.debug("传送回传文件结束 ~~~~~~~"); + } + + /** + * 遍历所有准备上传的数据文件,将空数据文件移动到指定目录,并记录所有文件总数 + * @param allFiles 所有非空文件集合 + * @param dataDirs 所有数据目录 + * @return 所有文件个数(包括空文件) + * @throws Exception + */ + private long handleNullDataFile(List<File> allFiles, File[] dataDirs) throws Exception { + long total = 0l; + for(File dir : dataDirs){ + File[] files = FileUtil.getFilesEndWith(dir, ".csv"); + if(files==null || files.length==0){ + continue; + } + files = FileUtil.sortASCByModify(files); // 修改日期升序排序 + total += files.length; + for (File file : files) { + if (file.length() > 0) { + allFiles.add(file); + continue; + } + //--- 处理空文件数据:移动空文件数据到指定日期目录 + String dirTime = DateUtil.getStingDate( + DateUtil.YYYYMMDD, + new Date(file.lastModified())); + String newDir = Contants.localDataErrorPath + + File.separator + + file.getParentFile().getName() + + File.separator + dirTime; + FileUtil.moveFile(file, newDir, true); + } + } + return total; + } + + private void compressAndSendDetecData(File[] dataFiles) throws Exception{ + // 压缩并移动原文件 + String compressFileStr = Contants.localDataCollection + + File.separator + + CommonSocket.addTimeTagForFileName( + CommonSocket.BP_TYPE_DETECT_DATA, null, true) + + ".zip"; + // 2013-3-29 由于压缩上传数据后,主动告警线程部分对数据检查存在问题,现将压缩后删除数据改为移动数据到日期目录 + ZipUtil.zipWithMoveFile(dataFiles, compressFileStr, true); + // 发送 + sendZipFile(new File(compressFileStr), CommonSocket.DATA_TYPE_ZIP_DETECT); + } + + private void compressAndSendTaskResult(File[] resultFiles) throws Exception { + //压缩并删除原文件 + String compressFileStr = Contants.localTaskPath + + File.separator + + CommonSocket.addTimeTagForFileName(CommonSocket.BP_TYPE_TASK_RESULT, null, true) + + ".zip"; + // 2013-03-22 由于DC再次获取未保存任务结果这个功能的实现,现修改将任务结果和回传文件压缩时不删除文件,而是将其移动到相应的日期目录 + ZipUtil.zipWithMoveFile(resultFiles, compressFileStr, true); + //发送 + sendZipFile(new File(compressFileStr), CommonSocket.DATA_TYPE_ZIP_TASKRESULT); + } + + private void compressAndSendTaskReturn(File[] returnFiles) throws Exception { + //压缩并删除原文件 + String compressFileStr = Contants.localTaskPath + + File.separator + + CommonSocket.addTimeTagForFileName(CommonSocket.BP_TYPE_TASK_RETURN, + null, true) + + ".zip"; + // 2013-03-22 由于DC再次获取未保存任务结果这个功能的实现,现修改将任务结果和回传文件压缩时不删除文件,而是将其移动到相应的日期目录 + ZipUtil.zipWithMoveFile(returnFiles, compressFileStr, false); + //发送 + sendZipFile(new File(compressFileStr), CommonSocket.DATA_TYPE_ZIP_TASKRETURN); + } + + /** + * 发送打包文件:整个命令通信包装方法 + * @param file + * @throws Exception + */ + private void sendZipFile(File file, String dataType) throws Exception { + //打包上传文件请求 + socket.sendMessageByChar(dataType); + socket.receiveMessageByChar(); + socket.sendMessageByChar(file.getName());//发送打包文件名 + socket.receiveMessageByChar(); + //上传打包文件 + socket.bpSendFile(file.getAbsolutePath()); + String result = socket.receiveMessageByChar(); + //上传成功后移动文件 + if(CommonSocket.SUCCESS.equalsIgnoreCase(result)){ + if(CommonSocket.DATA_TYPE_ZIP_DETECT.equalsIgnoreCase(dataType)){ + FileUtil.moveFile(file, Contants.localDataDonePath, true); + }else if(CommonSocket.DATA_TYPE_ZIP_TASKRESULT.equalsIgnoreCase(dataType)){ + FileUtil.moveFile(file, Contants.localTaskDonePath, true); + }else if(CommonSocket.DATA_TYPE_ZIP_TASKRETURN.equalsIgnoreCase(dataType)){ + FileUtil.moveFile(file, Contants.localTaskDonePath, true); + } + } + } + + private void sendZipFile(List<File> fileList, String dataType) throws Exception { + for(File file : fileList){ + sendZipFile(file, dataType); + } + } + + // 批量发送ZIP的 + /*private void sendZipFile(List<File> fileList, String parDir) throws Exception { + //打包上传文件请求 + socket.sendMessageByChar(CommonSocket.DETECT_DATA_TYPE_ZIP); + socket.receiveMessageByChar(); + //上传打包文件 + socket.bpSendFileByBath(fileList, parDir); + String result = socket.receiveMessageByChar(); + //上传成功后移动文件 + if(CommonSocket.SUCCESS.equalsIgnoreCase(result)){ + for(File file : fileList){ + FileUtil.moveFile(file, Contants.localDataDonePath, true); + } + } + }*/ + + /** + * 批量发送CSV数据文件 + * @param dataDir + * @param allFiles + * @throws Exception + */ + private void sendCSVData(File dataDir, List<File> allFiles) throws Exception { + //发送上传数据请求 + socket.sendMessageByChar(CommonSocket.DATA_TYPE_CSV_DETECT); + socket.receiveMessageByChar(); + //上传数据 + socket.sendFileByBath(dataDir.getParent(), allFiles); + String result = socket.receiveMessageByChar(); + if (CommonSocket.SUCCESS.equalsIgnoreCase(result)) { + /** + * 移动上传成功的数据文件到指定日期目录 + */ + File[] files = new File[allFiles.size()]; + moveDetecDataToDateDir(allFiles.toArray(files)); + } + } + + /** + * 批量发送任务结果 + * @param fileArr + * @throws Exception + */ + private void sendTaskResult(File[] fileArr) throws Exception { + //2013-4-16 修改升序排列方式:按修改时间 改为 按文件名,任务结果文件名都有时间后缀(ms),文件修改时间只到s取不到ms + fileArr = FileUtil.sortASCByFileName(fileArr); + List<String> results = new LinkedList<String>(); + StringBuffer sb = new StringBuffer(); + for(File file : fileArr){ + sb.delete(0, sb.length()); + if(!file.exists() || !file.isFile()){ + continue; + } + String[] resultArr = FileWrUtil.cfgFileReader(file); + if(resultArr!=null && resultArr.length>0){ + for(String res : resultArr){ + sb.append(res + ";"); + } + sb.deleteCharAt(sb.length()-1); + results.add(sb.toString()); + } + } + logger.debug("sendTaskResult-->" + Arrays.toString(results.toArray())); + //发送任务结果请求 + socket.sendMessageByChar(CommonSocket.DATA_TYPE_OBJ_TASKRESULT); + socket.receiveMessageByChar(); + //发送任务结果内容 + socket.sendObject(results); + String result = socket.receiveMessageByChar(); + if (CommonSocket.SUCCESS.equalsIgnoreCase(result)) { + // 移动上传成功的任务结果到指定日期目录 + moveTaskResultToDateDir(fileArr); + } + } + /** + * 单个发送任务回传文件 + * @param fileArr + * @throws Exception + */ + private void sendTaskReturn(File[] fileArr) throws Exception { + fileArr = FileUtil.sortASCByModify(fileArr); //修改日期升序排序 + for(File file : fileArr){ + if(!file.exists() || !file.isFile()){ + continue; + } + String[] resultArr = FileWrUtil.cfgFileReader(file); + if (resultArr == null || resultArr.length <= 0) { + continue; + } + JSONObject jsonObject = JSONObject.fromObject(resultArr[0]); + ReturnFilePO rfPo = (ReturnFilePO) JSONObject.toBean(jsonObject, ReturnFilePO.class); + + //--回传文件名和回传描述信息均为空时,则无回传文件 + if(StringUtil.isEmpty(rfPo.getReturnFileName()) && StringUtil.isEmpty(rfPo.getResDesc())){ + logger.warn("No return file, no return"); + FileUtil.delDir(file); + continue; + } + //--回传文件名为空,但回传描述信息不为空,则进行步骤2发送任务结果、删除文件 + /** + * 步骤1、回传文件 + */ + StringBuffer sb = new StringBuffer(); + if(rfPo.getResDesc()!=null){//取已有的结果描述信息 + sb.append(rfPo.getResDesc()); + } + //准备回传文件,回传文件名不为空即有回传的文件时,再回传 + boolean success = false; + if(rfPo.getReturnFileName()!=null && rfPo.getReturnFileName().trim().length()>0){ + //发送回传文件请求 + socket.sendMessageByChar(CommonSocket.DATA_TYPE_FILE_TASKETURN); + socket.receiveMessageByChar(); + //发送回传文件任务信息 + socket.sendMessageByChar(TaskResultOper.getTaskResultMsg(rfPo + .getTaskId(), rfPo.getTaskType(), null, null, null, rfPo + .getStartTime(), rfPo.getEndTime(), rfPo.getIsLoop())); + socket.receiveMessageByChar(); + //发送回传文件文件名称 + socket.sendMessageByChar(rfPo.getReturnFileName()); + socket.receiveMessageByChar(); + //发送回传文件 + socket.bpSendFile(Contants.localTaskReturnPath + File.separator + rfPo.getReturnFileName()); + String result = socket.receiveMessageByChar(); + success = true; +// sb.append("回传成功"); + sb.append("i18n_client.ServerCollectData.transFile_n81i"); + }else{ + success = true; + } + + /** + * 步骤2、判断文件是否回传完成 + */ + if(success){ + /** + * 步骤2-1、发送任务结果 + */ + TaskResultOper.sendTaskResult(rfPo.getTaskId(), rfPo.getTaskType(), + rfPo.getState(), sb.toString(), "", rfPo.getStartTime(), + rfPo.getEndTime(), false, rfPo.getIsLoop()); + /** + * 步骤2-2、移动上传成功的 保存回传文件信息的文件 和 回传文件 到指定日期目录 + */ + moveTaskReturnToDateDir(file, rfPo.getReturnFileName()); + } + } + } + + + /** + * 移动上传成功的任务结果到指定日期目录 + * 完整文件到目录:.../done/result/yyyyMMdd + * 0大小文件到目录: .../error/result/yyyyMMdd + */ + public static void moveTaskResultToDateDir(File... fileArr){ + if(fileArr==null || fileArr.length==0){ + return; + } + for (File file : fileArr) { + String dirTime = DateUtil.getStingDate( + DateUtil.YYYYMMDD, + new Date(file.lastModified())); + String newDir = ""; + if (file.length() > 0) { + newDir = Contants.localTaskDonePath + + File.separator + + file.getParentFile().getName() + + File.separator + dirTime; + } else { + newDir = Contants.localTaskErrorPath + + File.separator + + file.getParentFile().getName() + + File.separator + dirTime; + } + // -- 文件移动 + FileUtil.moveFile(file, newDir, true); + } + } + + /** + * 移动上传成功的 保存回传文件信息的文件 和 回传文件 到指定日期目录 + * 完整文件到目录:.../done/return/yyyyMMdd + * 0大小文件到目录: .../error/return/yyyyMMdd + */ + public static void moveTaskReturnToDateDir(File file, String returnFileName){ + String dirTime = DateUtil.getStingDate( + DateUtil.YYYYMMDD, + new Date(file.lastModified())); + String newDir = Contants.localTaskDonePath + + File.separator + + file.getParentFile().getName() + + File.separator + dirTime; + // -- 文件移动 + FileUtil.moveFile(file, newDir, true);// 保留任务信息的临时文件.return + if(returnFileName!=null && !"".equals(returnFileName)){// 实际回传的文件 + File curReturnFile = new File(Contants.localTaskReturnPath + File.separator + returnFileName); + FileUtil.moveFile(curReturnFile, newDir, true); + } + + } + + /** + * 移动上传成功的数据文件到指定日期目录 + * 完整文件到目录:.../done/type_procIden/yyyyMMdd + * 0大小文件到目录: .../error/type_procIden/yyyyMMdd + */ + public static void moveDetecDataToDateDir(File... allFiles){ + if(allFiles==null || allFiles.length==0){ + return; + } + for (File file : allFiles) { + String dirTime = DateUtil.getStingDate( + DateUtil.YYYYMMDD, + new Date(file.lastModified())); + String newDir = ""; + if (file.length() > 0) { + newDir = Contants.localDataDonePath + + File.separator + + file.getParentFile().getName() + + File.separator + dirTime; + } else { + newDir = Contants.localDataErrorPath + + File.separator + + file.getParentFile().getName() + + File.separator + dirTime; + } + // -- 文件移动 + FileUtil.moveFile(file, newDir, true); + } + } + +}
\ No newline at end of file |
