summaryrefslogtreecommitdiff
path: root/src/com/nis/nmsclient/thread/upload/DataSendThread.java
diff options
context:
space:
mode:
authorchenjinsong <[email protected]>2018-09-27 16:11:54 +0800
committerchenjinsong <[email protected]>2018-09-27 16:11:54 +0800
commit56d71f261a8bd6031e47e2bf80867049a2aa13da (patch)
treef09257b2143782a333a9eda3395137837d9bdad1 /src/com/nis/nmsclient/thread/upload/DataSendThread.java
initial commit
Diffstat (limited to 'src/com/nis/nmsclient/thread/upload/DataSendThread.java')
-rw-r--r--src/com/nis/nmsclient/thread/upload/DataSendThread.java589
1 files changed, 589 insertions, 0 deletions
diff --git a/src/com/nis/nmsclient/thread/upload/DataSendThread.java b/src/com/nis/nmsclient/thread/upload/DataSendThread.java
new file mode 100644
index 0000000..bb51d43
--- /dev/null
+++ b/src/com/nis/nmsclient/thread/upload/DataSendThread.java
@@ -0,0 +1,589 @@
+package com.nis.nmsclient.thread.upload;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import com.nis.nmsclient.common.Common;
+import com.nis.nmsclient.common.Contants;
+import com.nis.nmsclient.common.StopWatch;
+import com.nis.nmsclient.model.ReturnFilePO;
+import com.nis.nmsclient.thread.socket.CommonSocket;
+import com.nis.nmsclient.thread.socket.SSLCertOper;
+import com.nis.nmsclient.thread.task.TaskResultOper;
+import com.nis.nmsclient.util.DateUtil;
+import com.nis.nmsclient.util.FileUtil;
+import com.nis.nmsclient.util.FileWrUtil;
+import com.nis.nmsclient.util.StringUtil;
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream;
+
+import net.sf.json.JSONObject;
+
+/**
+ * 数据发送线程
+ * 1、暂时只发送监测数据,csv或zip
+ * 2、间隔 10 秒启动一次,同时具备心跳的功能
+ * 3、实现断线重连
+ * @author dell
+ *
+ */
+public class DataSendThread implements Runnable{
+ public static final Logger logger = Logger.getLogger(DataSendThread.class);
+ private Socket socket;
+ private OutputStream os;
+ private InputStream is;
+ private String host;
+ private int port;
+
+ public DataSendThread(String host,int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+
+ public void init() throws Exception {
+ SSLContext ctx = SSLCertOper.getSSLContext();
+ SSLSocketFactory ssf = ctx.getSocketFactory();
+ socket = (SSLSocket) ssf.createSocket(host,port);
+ logger.debug("create socket success.");
+
+ //2014-1-23 hyx 如果建立socket成功,但是startHandshake握手失败,且未设置超时时间时,则会一直阻塞
+ socket.setSoTimeout(1000 * 60 * Contants.SOCKET_TIMEOUT_MINUTES);
+ ((SSLSocket) socket).startHandshake();
+ logger.debug("handshake success.");
+ os = socket.getOutputStream();
+ is = socket.getInputStream();
+ }
+
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("DataSendThread");
+ logger.debug("开始");
+ StopWatch sw = new StopWatch();
+ sw.start();
+ try {
+ //重连
+ if(socket == null || socket.isClosed()|| !socket.isConnected()|| os == null || is ==null ){
+ init();
+ }
+ //发送心跳
+ sendHeartBeat();
+ //发送监测数据
+ sendDetectData();
+ //发送任务回传文件
+ sendTaskReturnFile();
+ //发送任务结果
+ sendTaskResult();
+
+ } catch (SSLException | SocketException e){
+ logger.error("SocketException|SSLException",e);
+ try {
+ init();
+ } catch (Exception e1) { }
+ } catch (Exception e) {
+ logger.error("",e);
+ }
+ sw.end();
+ logger.debug("耗时:"+sw.toString(sw.total()));
+ logger.debug("结束");
+ }
+
+
+
+ /**
+ * 发送监测数据
+ */
+ private void sendDetectData(){
+ StopWatch sw = new StopWatch();
+ try {
+ File dataDir = new File(Contants.localDataFilePath);
+ if (!dataDir.exists()) {
+ logger.warn("数据目录 : " + dataDir.getAbsolutePath() + " 不存在!!!");
+ } else{
+ long total = 0;
+ List<File> allFiles = new ArrayList<File>();
+ File[] dataDirs = FileUtil.getDirectoryArray(dataDir);
+ // ---- 数据处理
+ total = handleNullDataFile(allFiles, dataDirs);
+ logger.info("本次收集监测数据文件总数:" + total + ", 正常数据:" + allFiles.size() + ", 空数据:" + (total - allFiles.size()));
+ total = allFiles.size();// 正常的要上传的数据个数
+
+ if(total > 0){
+ for (int i = 0; i < total; i++) {
+ File tem = allFiles.get(i);
+ this.sendData(tem, BusinessType.DetectData);
+ String res = this.readLine();
+ if(CommonSocket.SUCCESS.equalsIgnoreCase(res)){//发送成功
+ this.moveDetecDataToDateDir(tem);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("",e);
+ }
+ sw.end();
+ logger.debug("耗时:"+sw.toString(sw.total()));
+ logger.debug("结束");
+ }
+
+
+ /**
+ * 发送任务结果
+ */
+ private void sendTaskResult() {
+ StopWatch sw = new StopWatch();
+ try {
+ logger.debug("传送任务结果开始 ~~~~~~~");
+ long startTime = System.currentTimeMillis();
+ // == 1、针对结果文件过多时打包上传未完成的文件继续上传
+ File taskDir = new File(Contants.localTaskPath);
+ if (!taskDir.exists()) {
+ return;
+ }
+
+ // == 2、检查当前结果文件数量,批量发送文件或打包上传
+ File resultDir = new File(TaskResultOper.getTaskResultPath());
+ if(!resultDir.exists()){
+ return;
+ }
+ // == 3
+ File[] fileArr = FileUtil.getFilesEndWith(resultDir, Contants.TASK_RESULT_FILE_SUFFIX);
+ fileArr = FileUtil.sortASCByFileName(fileArr);
+ List<String> results = new LinkedList<String>();
+ StringBuffer sb = new StringBuffer();
+ for(File file : fileArr){
+ sb.delete(0, sb.length());
+ if(!file.exists() || !file.isFile()){
+ continue;
+ }
+ String[] resultArr = FileWrUtil.cfgFileReader(file);
+ if(resultArr!=null && resultArr.length>0){
+ for(String res : resultArr){
+ sb.append(res + ";");
+ }
+ sb.deleteCharAt(sb.length()-1);
+ results.add(sb.toString());
+ }
+ }
+ if(results.size() >0 ){
+ logger.debug("sendTaskResult-->" + Arrays.toString(results.toArray()));
+ //发送任务结果请求
+ this.sendObject(results, BusinessType.ObjTaskResult);
+ String res = this.readLine();
+ if (CommonSocket.SUCCESS.equalsIgnoreCase(res)) {
+ // 移动上传成功的任务结果到指定日期目录
+ moveTaskResultToDateDir(fileArr);
+ }
+ }
+ sw.end();
+ logger.info("本次收集传送任务结果总数:" + fileArr.length + ",用时:" + sw.toString(sw.total()));
+ } catch (Exception e) {
+ logger.error("",e);
+ }
+ logger.debug("结束");
+ }
+
+
+ private void sendTaskReturnFile(){
+ StopWatch sw = new StopWatch();
+ logger.debug("传送回传文件开始 ~~~~~~~");
+ try {
+ // == 1、针对回传文件过多时打包上传未完成的文件继续上传
+ File taskDir = new File(Contants.localTaskPath);
+ if (!taskDir.exists()) {
+ return;
+ }
+
+ // == 2、检查当前回传文件数量,单个发送文件或打包上传
+ File returnDir = new File(Contants.localTaskReturnPath);
+ if(!returnDir.exists()){
+ return;
+ }
+ File[] fileArr = FileUtil.getFilesEndWith(returnDir, Contants.TASK_RETURN_FILE_SUFFIX);
+ if(fileArr == null || fileArr.length == 0){
+ return;
+ }
+
+ // == 3、发送
+ fileArr = FileUtil.sortASCByModify(fileArr); //修改日期升序排序
+ for(File file : fileArr){
+ if(!file.exists() || !file.isFile()){
+ continue;
+ }
+ String[] resultArr = FileWrUtil.cfgFileReader(file);
+ if (resultArr == null || resultArr.length <= 0) {
+ continue;
+ }
+ JSONObject jsonObject = JSONObject.fromObject(resultArr[0]);
+ ReturnFilePO rfPo = (ReturnFilePO) JSONObject.toBean(jsonObject, ReturnFilePO.class);
+
+ //--回传文件名和回传描述信息均为空时,则无回传文件
+ if(StringUtil.isEmpty(rfPo.getReturnFileName()) && StringUtil.isEmpty(rfPo.getResDesc())){
+ logger.warn("无回传文件, 不用回传");
+ FileUtil.delDir(file);
+ continue;
+ }
+ //--回传文件名为空,但回传描述信息不为空,则进行步骤2发送任务结果、删除文件
+ /**
+ * 步骤1、回传文件
+ */
+ StringBuffer sb = new StringBuffer();
+ if(rfPo.getResDesc()!=null){//取已有的结果描述信息
+ sb.append(rfPo.getResDesc());
+ }
+ //准备回传文件,回传文件名不为空即有回传的文件时,再回传
+ if(rfPo.getReturnFileName()!=null && rfPo.getReturnFileName().trim().length()>0){
+ //发送回传文件请求
+ String msg = TaskResultOper.getTaskResultMsg(rfPo
+ .getTaskId(), rfPo.getTaskType(), null, null, null, rfPo
+ .getStartTime(), rfPo.getEndTime(), rfPo.getIsLoop());
+ File returnFile = new File(Contants.localTaskReturnPath + File.separator + rfPo.getReturnFileName());
+ String res = this.sendReturnFile(msg, returnFile, BusinessType.FileTaskReturn);
+ if(CommonSocket.SUCCESS.equals(res)){
+ /**
+ * 步骤2-1、发送任务结果
+ */
+ TaskResultOper.sendTaskResult(rfPo.getTaskId(), rfPo.getTaskType(),
+ rfPo.getState(), sb.toString(), "", rfPo.getStartTime(),
+ rfPo.getEndTime(), false, rfPo.getIsLoop());
+ /**
+ * 步骤2-2、移动上传成功的 保存回传文件信息的文件 和 回传文件 到指定日期目录
+ */
+ moveTaskReturnToDateDir(file, rfPo.getReturnFileName());
+ }
+ }
+ }
+ sw.end();
+ logger.info("本次收集传送任务回传总数:" + fileArr.length + ",用时:" + sw.toString(sw.total()));
+ } catch (Exception e) {
+ logger.error("send Task Return File error",e);
+ }
+ logger.debug("结束");
+ }
+
+
+
+ /**
+ * 发送心跳
+ * @throws IOException
+ */
+ private void sendHeartBeat() throws IOException{
+ byte[] msg = (System.currentTimeMillis()+"").getBytes();
+ int len = msg.length;
+ int t = BusinessType.HeartBeat.getType();
+ os.write(Common.intToByteArray(len));
+ os.write(0);//消息类型
+ os.write(t);//业务类型
+ os.write(msg);
+ os.flush();
+ String res = readLine();
+ logger.debug("HeartBeat :" + res);
+ }
+
+ /***
+ * 发送任务回传文件
+ * @param msg
+ * @param file
+ * @param type
+ * @throws IOException
+ */
+ private String sendReturnFile(String msg,File file,BusinessType type) throws IOException{
+ String name = file.getName();
+ byte[] nameByte = name.getBytes();
+ if(nameByte.length > Byte.MAX_VALUE){
+ throw new IllegalArgumentException("file name too long,max length is 127");
+ }
+ byte[] msgBytes = msg.getBytes();
+ int msgLen = msgBytes.length;
+ int totalLen = (int)file.length() + msgBytes.length +4;
+ int t = type.getType();
+ os.write(Common.intToByteArray(totalLen));
+ os.write(nameByte.length);//文件名的长度
+ os.write(t);//业务类型
+ os.write(nameByte);//文件名
+ os.write(Common.intToByteArray(msgLen));//消息头长度
+ os.write(msgBytes);//消息头
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream(file);
+ IOUtils.copy(fis, os);//文件内容
+ os.flush();
+ } finally {
+ if(fis != null){
+ fis.close();
+ }
+ }
+ String res = readLine();
+ logger.debug("sendReturnFile :" + res);
+ return res;
+ }
+
+ private void sendObject(Object data,BusinessType type) throws IOException{
+ ByteOutputStream bos = null;
+ ObjectOutputStream oos = null;
+ try {
+ bos = new ByteOutputStream();
+ oos = new ObjectOutputStream(bos);
+ oos.writeObject(data);
+ byte[] bytes = bos.getBytes();
+ int len = bytes.length;
+ int t = type.getType();
+ os.write(Common.intToByteArray(len));
+ os.write(0);
+ os.write(t);
+ os.write(bytes);
+ } finally {
+ if(bos != null){
+ bos.close();
+ }
+ if(oos != null){
+ oos.close();
+ }
+ }
+
+ }
+
+ /**
+ * 发送数据
+ * @param data
+ * @param type
+ * @throws IOException
+ */
+ private void sendData(byte[] data,BusinessType type) throws IOException{
+ int len = data.length;
+ int t = type.getType();
+ os.write(Common.intToByteArray(len));
+ os.write(0);
+ os.write(t);
+ os.write(data);
+ }
+
+ /**
+ * 发送数据
+ * @param file
+ * @param bt
+ * @throws IOException
+ */
+ private void sendData(File file,BusinessType bt) throws IOException{
+ int len = (int)file.length();
+ int t = bt.getType();
+ os.write(Common.intToByteArray(len));
+ os.write(0);
+ os.write(t);
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream(file);
+ IOUtils.copy(fis, os);
+ os.flush();
+ } finally {
+ if(fis != null){
+ fis.close();
+ }
+ }
+ }
+
+
+ /**
+ * 发送文件 包括 文件名
+ * @param file
+ * @param type
+ * @throws IOException
+ */
+ private void sendFile(File file ,BusinessType type) throws IOException{
+ String name = file.getName();
+ byte[] nameByte = name.getBytes();
+ if(nameByte.length > Byte.MAX_VALUE){
+ throw new IllegalArgumentException("file name too long,max length is 127");
+ }
+ int len = (int)file.length();
+ os.write(Common.intToByteArray(len));//文件长度
+ os.write(nameByte.length);//文件名长度
+ os.write((byte)type.getType());//业务类型
+ os.write(name.getBytes());//文件名
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream(file);
+ IOUtils.copy(fis, os);
+ os.flush();
+ } finally {
+ if(fis != null){
+ fis.close();
+ }
+ }
+
+ String res = readLine();
+ logger.debug("HeartBeat :" + res);
+ }
+
+ private String readLine() throws IOException{
+ BufferedReader br = new BufferedReader(new InputStreamReader(is, Contants.charset));
+ String str = br.readLine();
+ logger.debug("recieve : " + str);
+ return str;
+ }
+
+ /**
+ * 移动上传成功的数据文件到指定日期目录
+ * 完整文件到目录:.../done/type_procIden/yyyyMMdd
+ * 0大小文件到目录: .../error/type_procIden/yyyyMMdd
+ */
+ public static void moveDetecDataToDateDir(File... allFiles){
+ if(allFiles==null || allFiles.length==0){
+ return;
+ }
+ for (File file : allFiles) {
+ String dirTime = DateUtil.getStingDate(
+ DateUtil.YYYYMMDD,
+ new Date(file.lastModified()));
+ String newDir = "";
+ if (file.length() > 0) {
+ newDir = Contants.localDataDonePath
+ + File.separator
+ + file.getParentFile().getName()
+ + File.separator + dirTime;
+ } else {
+ newDir = Contants.localDataErrorPath
+ + File.separator
+ + file.getParentFile().getName()
+ + File.separator + dirTime;
+ }
+ // -- 文件移动
+ FileUtil.moveFile(file, newDir, true);
+ }
+ }
+
+ /**
+ * 遍历所有准备上传的数据文件,将空数据文件移动到指定目录,并记录所有文件总数
+ * @param allFiles 所有非空文件集合
+ * @param dataDirs 所有数据目录
+ * @return 所有文件个数(包括空文件)
+ * @throws Exception
+ */
+ private long handleNullDataFile(List<File> allFiles, File[] dataDirs) throws Exception {
+ long total = 0l;
+ for(File dir : dataDirs){
+ File[] files = FileUtil.getFilesEndWith(dir, ".csv");
+ if(files==null || files.length==0){
+ continue;
+ }
+ files = FileUtil.sortASCByModify(files); // 修改日期升序排序
+ total += files.length;
+ for (File file : files) {
+ if (file.length() > 0) {
+ allFiles.add(file);
+ continue;
+ }
+ //--- 处理空文件数据:移动空文件数据到指定日期目录
+ String dirTime = DateUtil.getStingDate(
+ DateUtil.YYYYMMDD,
+ new Date(file.lastModified()));
+ String newDir = Contants.localDataErrorPath
+ + File.separator
+ + file.getParentFile().getName()
+ + File.separator + dirTime;
+ FileUtil.moveFile(file, newDir, true);
+ }
+ }
+ return total;
+ }
+
+
+ /**
+ * 移动上传成功的任务结果到指定日期目录
+ * 完整文件到目录:.../done/result/yyyyMMdd
+ * 0大小文件到目录: .../error/result/yyyyMMdd
+ */
+ public static void moveTaskResultToDateDir(File... fileArr){
+ if(fileArr==null || fileArr.length==0){
+ return;
+ }
+ for (File file : fileArr) {
+ String dirTime = DateUtil.getStingDate(
+ DateUtil.YYYYMMDD,
+ new Date(file.lastModified()));
+ String newDir = "";
+ if (file.length() > 0) {
+ newDir = Contants.localTaskDonePath
+ + File.separator
+ + file.getParentFile().getName()
+ + File.separator + dirTime;
+ } else {
+ newDir = Contants.localTaskErrorPath
+ + File.separator
+ + file.getParentFile().getName()
+ + File.separator + dirTime;
+ }
+ // -- 文件移动
+ FileUtil.moveFile(file, newDir, true);
+ }
+ }
+
+ /**
+ * 移动上传成功的 保存回传文件信息的文件 和 回传文件 到指定日期目录
+ * 完整文件到目录:.../done/return/yyyyMMdd
+ * 0大小文件到目录: .../error/return/yyyyMMdd
+ */
+ public static void moveTaskReturnToDateDir(File file, String returnFileName){
+ String dirTime = DateUtil.getStingDate(
+ DateUtil.YYYYMMDD,
+ new Date(file.lastModified()));
+ String newDir = Contants.localTaskDonePath
+ + File.separator
+ + file.getParentFile().getName()
+ + File.separator + dirTime;
+ // -- 文件移动
+ FileUtil.moveFile(file, newDir, true);// 保留任务信息的临时文件.return
+ if(returnFileName!=null && !"".equals(returnFileName)){// 实际回传的文件
+ File curReturnFile = new File(Contants.localTaskReturnPath + File.separator + returnFileName);
+ FileUtil.moveFile(curReturnFile, newDir, true);
+ }
+
+ }
+
+
+ public static void main(String[] args) throws Exception {
+
+ DataSendThread.logger.debug("--------------开始-----------------");
+
+ int threadSize = 1;//默认开10个线程
+ int interval = 10;//间隔 60 s
+ if(args != null&&args.length >0){
+ threadSize = Integer.valueOf(args[0]);
+ }
+ if(args != null&&args.length >1){
+ interval = Integer.valueOf(args[1]);
+ }
+
+ ScheduledExecutorService stp = Executors.newScheduledThreadPool(100);
+ for(int i = 0;i< threadSize;i++){
+ DataSendThread dst = new DataSendThread("10.0.6.108", 9527);
+ stp.scheduleWithFixedDelay(dst, 0, interval, TimeUnit.SECONDS);
+ }
+ }
+
+}