diff options
| author | chenjinsong <[email protected]> | 2018-09-27 16:11:54 +0800 |
|---|---|---|
| committer | chenjinsong <[email protected]> | 2018-09-27 16:11:54 +0800 |
| commit | 56d71f261a8bd6031e47e2bf80867049a2aa13da (patch) | |
| tree | f09257b2143782a333a9eda3395137837d9bdad1 /src/com/nis/nmsclient/thread/task/TaskReqHandle.java | |
initial commit
Diffstat (limited to 'src/com/nis/nmsclient/thread/task/TaskReqHandle.java')
| -rw-r--r-- | src/com/nis/nmsclient/thread/task/TaskReqHandle.java | 397 |
1 files changed, 397 insertions, 0 deletions
diff --git a/src/com/nis/nmsclient/thread/task/TaskReqHandle.java b/src/com/nis/nmsclient/thread/task/TaskReqHandle.java new file mode 100644 index 0000000..3381742 --- /dev/null +++ b/src/com/nis/nmsclient/thread/task/TaskReqHandle.java @@ -0,0 +1,397 @@ +package com.nis.nmsclient.thread.task; + +import java.io.File; +import java.io.IOException; +import java.util.Date; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import net.sf.json.JSONObject; + +import org.apache.commons.io.FileUtils; +import org.apache.log4j.Logger; + +import com.nis.nmsclient.common.Common; +import com.nis.nmsclient.common.Contants; +import com.nis.nmsclient.model.CommandPO; +import com.nis.nmsclient.model.ParamFilePush; +import com.nis.nmsclient.model.ParamUpgrade; +import com.nis.nmsclient.model.Task4; +import com.nis.nmsclient.model.Task6; +import com.nis.nmsclient.thread.socket.CommonSocket; +import com.nis.nmsclient.util.FileUtil; +import com.nis.nmsclient.util.ProcessUtil; +import com.nis.nmsclient.util.Utils; + +public class TaskReqHandle { + Logger logger = Logger.getLogger(TaskReqHandle.class); + private Thread singleThread; + + /** + * 任务请求处理步骤1:分析任务请求参数,分不同任务类型处理 + */ + public void taskHandle(String str){ + Date execTime = new Date(); + boolean flag = true; + int taskType = 0; + long taskId = 0; + String threadName = null; + CommandPO command = null; + try { + JSONObject jsonObj = JSONObject.fromObject(str); + if(str.contains("typeInfo")){ + taskType = jsonObj.getInt("typeInfo"); + } + if(str.contains("taskInfo")){ + JSONObject jsonObj2 = jsonObj.getJSONObject("taskInfo"); + Object obj = null; + /** + * 任务类型:2 非流文本数据获取,3 流文本数据获取,4 命令执行,5 shell注册, 6升级 + */ + switch (taskType) { + case 4: + obj = JSONObject.toBean(jsonObj2,Task4.class); + Task4 task4 = (Task4) obj; + taskId = task4.getTaskId(); + /** + * 命令类型:1 Agent原生支持命令,2可执行命令(2 脚本,3 shell命令) + */ + if(task4.getCommandType() == 1){ +// threadName = "原生命令 id:" + task4.getTaskId() + ">>" + task4.getCommandName(); + threadName = "Native command ID:" + task4.getTaskId() + ">>" + task4.getCommandName(); + }else if(task4.getCommandType() == 2){ +// threadName = "可执行命令 id:" + task4.getTaskId(); + threadName = "Executable command ID:" + task4.getTaskId(); + } + command = new CommandPO(); + command.setExecId(task4.getTaskId()); + command.setExecType(task4.getTaskType()); + command.setCommandName(task4.getCommandName()); + command.setCommandParam(task4.getCommandParam()); + command.setExecState(task4.getState()); + command.setExecVersion(null); + command.setIsLoop(task4.getIsLoop()); + + handleTaskThread(task4.getTaskId(), task4.getStartTime(), task4.getEndTime(), task4 + .getIsLoop(), task4.getLoopDelay(), command, threadName, task4.getMissionState()); + + break; + case 6: + obj = JSONObject.toBean(jsonObj2,Task6.class); + Task6 task6 = (Task6) obj; + taskId = task6.getTaskId(); +// threadName = "升级 id:" + task6.getTaskId() + ">>" + task6.getCommandName(); + threadName = "Upgrade ID:" + task6.getTaskId() + ">>" + task6.getCommandName(); + command = new CommandPO(); + command.setExecId(task6.getTaskId()); + command.setExecType(task6.getTaskType()); + command.setCommandName(task6.getCommandName()); + command.setCommandParam(task6.getCommandParam()); + command.setExecState(task6.getState()); + command.setExecVersion(task6.getVersion()); + command.setSrcPath(getUpgradeTaskPushPath(task6.getTaskId())); + + handleTaskThread(task6.getTaskId(), task6.getUpgradeTime(), null, 0, 0, command, threadName, 0); + break; + default: + flag = false; + break; + } + }else{ + flag = false; + } + } catch (Exception e) { + logger.error(Utils.printExceptionStack(e)); + flag = false; + } + + if(!flag){ + TaskResultOper.sendTaskResult(taskId, taskType, +// AgentCommand.RESULT_FAIL, "发送内容格式不正确", "", execTime, new Date(), false, -1l); +// AgentCommand.RESULT_FAIL, "Incorrect content format", "", execTime, new Date(), false, -1l); + AgentCommand.RESULT_FAIL, "i18n_client.TaskReqHandle.sendInfoFormatErr_n81i", "", execTime, new Date(), false, -1l); + } + } + + /** + * 文件推送处理 + */ + public String filePush(CommonSocket socket, String taskParam, long taskId, boolean isUpgrade){ + String msg = null; + StringBuffer sb = new StringBuffer(); + File tempDir = null; + try { + tempDir = new File(Contants.localTempPath + File.separator + + "filepush_" + taskId); + if (!tempDir.exists()) { + tempDir.mkdirs(); + } + // ------步骤1:接收Md5校验的推送文件到临时目录 + int flag = socket.bpReceiveFileByBathMd5(tempDir.getAbsolutePath()); + + if (flag == 0){// ------步骤2:接收成功,与参数比对 + if(taskParam==null || "".equals(taskParam)){ + msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "i18n_client.TaskReqHandle.pushFileParamIsNull_n81i"; +// logger.info(msg);//i18nlog + return msg; + } + // ------步骤2-1:解析参数 + String[] params = taskParam.trim().split(AgentCommand.PARAM_SEPRATOR); + if (params != null && params.length >= 1) { + for (int i = 0; i < params.length; i++) { + //2012-4-28 任务参数中对路径的格式化将在界面上进行,原因此处会对转义字符的\也转换为/,故replace("\\", "/")去掉 + params[i] = params[i].trim().replaceAll("[\n\t\r]","");//.replace("\\", "/");//[\\s*\n\t\r] + logger.debug("filePush-->param: " + params[i]); + ParamFilePush fParam = null; + if(isUpgrade){ + ParamUpgrade cfu = (ParamUpgrade) JSONObject.toBean(JSONObject + .fromObject(params[i]), ParamUpgrade.class); + fParam = new ParamFilePush(); + fParam.setFileName(cfu.getFileName()); + fParam.setUsername(cfu.getUsername()); + fParam.setGroupName(cfu.getGroupName()); + fParam.setParam1(cfu.getParam1()); + }else{ + fParam = (ParamFilePush) JSONObject.toBean( + JSONObject.fromObject(params[i]), + ParamFilePush.class); + } + if(fParam.getDestPath()==null || fParam.getDestPath().trim().length()<=0){ + fParam.setDestPath(getUpgradeTaskPushPath(taskId));//设置默认推送目的地 + logger.debug("filePush-->destPath: " + fParam.getDestPath()); + } + if(fParam.getFileName()==null || fParam.getFileName().trim().length()<=0){ +// msg = "推送文件名参数为空"; +// msg = "File push parameters are empty"; + msg = "i18n_client.TaskReqHandle.pushFileNameParamIsNull_n81i"; +// logger.debug(msg);//i18nlog + break; + } + /*if(!ProcessUtil.checkUserPass(fParam.getUsername(), fParam.getParam1())){ + msg = "[" + fParam.getFileName() + "]推送文件的用户名或密码不正确;"; + logger.debug(msg); + break; + }*/ + // 判断用户名是否正确 + if(!ProcessUtil.checkUserOrGroupExist(fParam.getUsername(), fParam.getGroupName())){ +// msg = "[" + fParam.getFileName() + "]推送文件的属主或属群不正确;"; +// msg = "[" + fParam.getFileName() + "]The owner or group of the push file is incorrect;"; + msg = "[" + fParam.getFileName() + "]i18n_client.TaskReqHandle.userGroupErr_n81i;"; +// logger.debug(msg);//i18nlog + break; + } + // ------步骤2-2:文件存在并与Md5值比较文件是否完整 + File pushFile = new File(tempDir.getAbsolutePath() + + File.separator + fParam.getFileName()); + if (!pushFile.exists()){ +// msg = "推送临时文件不存在,请检查推送文件名称与参数文件名(" +// + fParam.getFileName() + ")是否一致"; +// msg = "The push temporary file does not exist. Please check whether the push file name is consistent with the parameter file name(" +// + fParam.getFileName() + ")"; + msg = "i18n_client.TaskReqHandle.pushFileNotExists_n81i(" + + fParam.getFileName() + ")"; +// logger.warn(msg + "--" + pushFile.getAbsolutePath());//i18nlog + break; + } + // ------步骤2-3:判断推送目录是否存在,不存在创建 + File destFile = new File(fParam.getDestPath() + + File.separator + fParam.getFileName()); + if(!destFile.getParentFile().exists()){ + destFile.getParentFile().mkdirs(); + } + // ------步骤2-4:判断是否直接覆盖 + if (fParam.getIsCover() != null + && "Y".equalsIgnoreCase(fParam.getIsCover())) {// 覆盖,则直接Copy并赋权限与所有者 + if (destFile.exists()) { + //destFile.delete_bak(); + //使用删除文件公共方法 + FileUtil.delDir(destFile); + logger.debug("filePush delete file--" + destFile.getAbsolutePath()); + //FileUtil.checkParentDirExist(destFile); + } + msg = copyAndSetPermission(pushFile + .getCanonicalPath(), destFile + .getCanonicalPath(), fParam.getUsername(), + fParam.getGroupName(), fParam + .getPermisson()); + } else if (!destFile.exists()) {// 不覆盖,则判断文件不存在的话,再Copy并赋权限与所有者 + msg = copyAndSetPermission(pushFile + .getCanonicalPath(), destFile + .getCanonicalPath(), fParam.getUsername(), + fParam.getGroupName(), fParam + .getPermisson()); + } + if(msg==null || msg.length()<=0){ +// sb.append("[" + fParam.getFileName() + "]成功推送到[" + fParam.getDestPath() + "];"); +// sb.append("[" + fParam.getFileName() + "]successfully pushed to[" + fParam.getDestPath() + "];"); + sb.append("[" + fParam.getFileName() + "]i18n_client.TaskReqHandle.successPush_n81i[" + fParam.getDestPath() + "];"); + logger.debug("推送文件" + (i+1) + "成功---" + pushFile.getCanonicalPath()); + }else{ +// msg = msg+"[" + fParam.getFileName() + "]推送失败;";//文件推送失败的具体原因 + msg = msg+"[" + fParam.getFileName() + "]i18n_client.TaskReqHandle.pushFail_n81i;";//文件推送失败的具体原因 + logger.debug("推送文件" + (i+1) + "失败---" + pushFile.getCanonicalPath()); + break; + } + }//for end + } else { +// msg = "文件推送参数不正确"; +// msg = "File push parameter is incorrect"; + msg = "i18n_client.TaskReqHandle.pushParamErr_n81i"; +// logger.warn(msg + "<" + taskParam + ">");//i18nlog + } + //所有文件推送成功,删除临时接收文件目录 + if(msg==null || msg.length()<=0){ + if(tempDir!=null && tempDir.exists()){ + try { + logger.debug("删除临时目录--" + tempDir.getAbsolutePath()); + FileUtils.deleteDirectory(tempDir); + FileUtil.checkParentDirExist(tempDir); + } catch (IOException e) { + } + } + } + }else { + socket.close(); + } + + if(msg == null){ + msg = Contants.COMMON_MSG_SUCCESS + Contants.COMMON_MSG_SEPRATOR + sb.toString(); + }else { + sb.append(msg); + msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + sb.toString(); + } + + } catch (Exception e) { + logger.error("Receive push file exception:" + Utils.printExceptionStack(e)); +// msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "接收推送文件异常," + e.getMessage(); +// msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "Received push file exception," + e.getMessage(); + msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "i18n_client.TaskReqHandle.reciveFileErr_n81i," + e.getMessage(); + return msg; + }finally{ + if(tempDir!=null && tempDir.exists() && tempDir.listFiles().length==0){ + try { + FileUtils.deleteDirectory(tempDir); + logger.debug("finally删除临时目录--" + tempDir.getAbsolutePath()); + FileUtil.checkParentDirExist(tempDir); + } catch (IOException e) { + } + } + } + + return msg; + } + + /** + * 文件推送部分的拷备工作,由临时文件目录拷备到推送的最终目的地,并赋相应的权限组 + */ + private String copyAndSetPermission(String source, String destFile, String user, String group, String permission) throws Exception{ + String result = null; + if (source != null && destFile != null) { + // 根据操作系统确定获取进程ID的方式 + String os = System.getProperty("os.name"); + if (os.startsWith("Windows")) { + FileUtils.copyFile(new File(source), new File( + destFile));//目标路径不存在自动创建 + } else if (os.startsWith("Linux")) { + StringBuffer sb = new StringBuffer(); + //source destFile都不能含有空格 + source = source.replace(" ", "\\ "); + destFile = destFile.replace(" ", "\\ "); + sb.append("\\cp -f " + source + " " + destFile + ";");//2015-11-6 hyx: cp - f修改成\\cp -f (有时候如果不加\\会提示是否,就会有问题) + if (permission != null && !"".equals(permission.trim())) { + sb.append("chmod " + permission + " " + + destFile + ";"); + } + if (user != null && !"".equals(user.trim())) { + sb.append("chown " + user + " " + destFile + ";"); + } + if (group != null && !"".equals(group.trim())) { + sb.append("chgrp " + group + " " + destFile); + } + result = ProcessUtil.execLinuxCmd(sb.toString()); + } else { + throw new IOException("unknown operating system: " + os); + } + }else{ +// result = "源文件或目标文件为空"; +// result = "The source file or target file is empty"; + result = "i18n_client.TaskReqHandle.sourceOrTargetIsNull_n81i"; + } + + return result; + } + + + /** + * 任务请求处理步骤2:将分析包装好的任务,统一判断处理并添加到线程中执行 + */ + public void handleTaskThread(Long taskId, Long startTime, Long endTime, + long isLoop, long loopDelay, final CommandPO command, + final String threadName, long missionState) { + if(missionState == AgentCommand.MISSION_CANCEL_START){//如果任务状态为,撤消任务 + logger.warn("The task is in the revocation, and the ID is not processed:" + taskId); + return; + } + if(Common.getTaskFuture(taskId)!=null){//当前任务已存在执行,则不执行该当前任务 + logger.warn("The task already exists to execute the ID:" + taskId); + return; + } + // 设置任务结束时间,且当前时间已超过任务结束时间 + if (endTime != null && endTime.longValue() > 0 + && endTime.longValue() <= System.currentTimeMillis()) { + logger.warn("The task has expired ID:" + taskId); + return; + } + long delay = 0; + if (startTime != null) { + delay = startTime - System.currentTimeMillis(); + } + ScheduledFuture<?> taskFuture = null; + LoopTaskThread loopTaskThread = null; + if (isLoop == 0) {// 非周期任务 + taskFuture = Common.scheduled.schedule(new Runnable() { + public void run() { + Thread.currentThread().setName(threadName); + new AgentCommand(command).exec(); + } + }, delay, TimeUnit.MILLISECONDS); + } else { + Future<?> singleFuture = null; + if(delay <= 0){//开始时间之后接到任务,先执行一次,第二次按周期点执行 + long now = System.currentTimeMillis(); + long cnt = (now - startTime)/(loopDelay * 60 * 1000); + if((now - startTime)%(loopDelay * 60 * 1000)!=0){ + delay = startTime + loopDelay *60 *1000 * (cnt + 1) - System.currentTimeMillis(); + ///仅执行一次的,并在周期执行代码中第一次执行判断单次执行完成与否,未完成结束掉 + singleFuture = Common.scheduled.schedule(new Runnable() { + public void run() { + singleThread = Thread.currentThread(); +// Thread.currentThread().setName(threadName + " 周期单次"); + Thread.currentThread().setName(threadName + " Periodic Single Time"); + new AgentCommand(command).exec(); + } + }, 0, TimeUnit.MILLISECONDS); + } + } + loopTaskThread = new LoopTaskThread(threadName, command, loopDelay, singleFuture, singleThread); + taskFuture = Common.scheduled.scheduleAtFixedRate(loopTaskThread, delay, loopDelay * 60 * 1000, TimeUnit.MILLISECONDS); + // 周期任务,若设置结束时间,则添加取消线程 + if (endTime != null && endTime.longValue() > 0) { + long endDelay = endTime.longValue() - System.currentTimeMillis(); + if (endDelay > 0) { + Common.cancleTaskFuture(taskId, endDelay); + } + }// 取消线程结束 + + } + // 将正在执行的任务添加到全局变量,目的是避免重复执行任务 + Common.putTaskFuture(taskId, taskFuture, loopTaskThread); + } + + public static String getUpgradeTaskPushPath(long taskId){ + return Contants.localUploadsPath + File.separator + taskId; + } +} +
\ No newline at end of file |
