summaryrefslogtreecommitdiff
path: root/src/com/nis/nmsclient/thread/task/TaskReqHandle.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/nis/nmsclient/thread/task/TaskReqHandle.java')
-rw-r--r--src/com/nis/nmsclient/thread/task/TaskReqHandle.java397
1 files changed, 397 insertions, 0 deletions
diff --git a/src/com/nis/nmsclient/thread/task/TaskReqHandle.java b/src/com/nis/nmsclient/thread/task/TaskReqHandle.java
new file mode 100644
index 0000000..3381742
--- /dev/null
+++ b/src/com/nis/nmsclient/thread/task/TaskReqHandle.java
@@ -0,0 +1,397 @@
+package com.nis.nmsclient.thread.task;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import net.sf.json.JSONObject;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+import com.nis.nmsclient.common.Common;
+import com.nis.nmsclient.common.Contants;
+import com.nis.nmsclient.model.CommandPO;
+import com.nis.nmsclient.model.ParamFilePush;
+import com.nis.nmsclient.model.ParamUpgrade;
+import com.nis.nmsclient.model.Task4;
+import com.nis.nmsclient.model.Task6;
+import com.nis.nmsclient.thread.socket.CommonSocket;
+import com.nis.nmsclient.util.FileUtil;
+import com.nis.nmsclient.util.ProcessUtil;
+import com.nis.nmsclient.util.Utils;
+
+public class TaskReqHandle {
+ Logger logger = Logger.getLogger(TaskReqHandle.class);
+ private Thread singleThread;
+
+ /**
+ * 任务请求处理步骤1:分析任务请求参数,分不同任务类型处理
+ */
+ public void taskHandle(String str){
+ Date execTime = new Date();
+ boolean flag = true;
+ int taskType = 0;
+ long taskId = 0;
+ String threadName = null;
+ CommandPO command = null;
+ try {
+ JSONObject jsonObj = JSONObject.fromObject(str);
+ if(str.contains("typeInfo")){
+ taskType = jsonObj.getInt("typeInfo");
+ }
+ if(str.contains("taskInfo")){
+ JSONObject jsonObj2 = jsonObj.getJSONObject("taskInfo");
+ Object obj = null;
+ /**
+ * 任务类型:2 非流文本数据获取,3 流文本数据获取,4 命令执行,5 shell注册, 6升级
+ */
+ switch (taskType) {
+ case 4:
+ obj = JSONObject.toBean(jsonObj2,Task4.class);
+ Task4 task4 = (Task4) obj;
+ taskId = task4.getTaskId();
+ /**
+ * 命令类型:1 Agent原生支持命令,2可执行命令(2 脚本,3 shell命令)
+ */
+ if(task4.getCommandType() == 1){
+// threadName = "原生命令 id:" + task4.getTaskId() + ">>" + task4.getCommandName();
+ threadName = "Native command ID:" + task4.getTaskId() + ">>" + task4.getCommandName();
+ }else if(task4.getCommandType() == 2){
+// threadName = "可执行命令 id:" + task4.getTaskId();
+ threadName = "Executable command ID:" + task4.getTaskId();
+ }
+ command = new CommandPO();
+ command.setExecId(task4.getTaskId());
+ command.setExecType(task4.getTaskType());
+ command.setCommandName(task4.getCommandName());
+ command.setCommandParam(task4.getCommandParam());
+ command.setExecState(task4.getState());
+ command.setExecVersion(null);
+ command.setIsLoop(task4.getIsLoop());
+
+ handleTaskThread(task4.getTaskId(), task4.getStartTime(), task4.getEndTime(), task4
+ .getIsLoop(), task4.getLoopDelay(), command, threadName, task4.getMissionState());
+
+ break;
+ case 6:
+ obj = JSONObject.toBean(jsonObj2,Task6.class);
+ Task6 task6 = (Task6) obj;
+ taskId = task6.getTaskId();
+// threadName = "升级 id:" + task6.getTaskId() + ">>" + task6.getCommandName();
+ threadName = "Upgrade ID:" + task6.getTaskId() + ">>" + task6.getCommandName();
+ command = new CommandPO();
+ command.setExecId(task6.getTaskId());
+ command.setExecType(task6.getTaskType());
+ command.setCommandName(task6.getCommandName());
+ command.setCommandParam(task6.getCommandParam());
+ command.setExecState(task6.getState());
+ command.setExecVersion(task6.getVersion());
+ command.setSrcPath(getUpgradeTaskPushPath(task6.getTaskId()));
+
+ handleTaskThread(task6.getTaskId(), task6.getUpgradeTime(), null, 0, 0, command, threadName, 0);
+ break;
+ default:
+ flag = false;
+ break;
+ }
+ }else{
+ flag = false;
+ }
+ } catch (Exception e) {
+ logger.error(Utils.printExceptionStack(e));
+ flag = false;
+ }
+
+ if(!flag){
+ TaskResultOper.sendTaskResult(taskId, taskType,
+// AgentCommand.RESULT_FAIL, "发送内容格式不正确", "", execTime, new Date(), false, -1l);
+// AgentCommand.RESULT_FAIL, "Incorrect content format", "", execTime, new Date(), false, -1l);
+ AgentCommand.RESULT_FAIL, "i18n_client.TaskReqHandle.sendInfoFormatErr_n81i", "", execTime, new Date(), false, -1l);
+ }
+ }
+
+ /**
+ * 文件推送处理
+ */
+ public String filePush(CommonSocket socket, String taskParam, long taskId, boolean isUpgrade){
+ String msg = null;
+ StringBuffer sb = new StringBuffer();
+ File tempDir = null;
+ try {
+ tempDir = new File(Contants.localTempPath + File.separator
+ + "filepush_" + taskId);
+ if (!tempDir.exists()) {
+ tempDir.mkdirs();
+ }
+ // ------步骤1:接收Md5校验的推送文件到临时目录
+ int flag = socket.bpReceiveFileByBathMd5(tempDir.getAbsolutePath());
+
+ if (flag == 0){// ------步骤2:接收成功,与参数比对
+ if(taskParam==null || "".equals(taskParam)){
+ msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "i18n_client.TaskReqHandle.pushFileParamIsNull_n81i";
+// logger.info(msg);//i18nlog
+ return msg;
+ }
+ // ------步骤2-1:解析参数
+ String[] params = taskParam.trim().split(AgentCommand.PARAM_SEPRATOR);
+ if (params != null && params.length >= 1) {
+ for (int i = 0; i < params.length; i++) {
+ //2012-4-28 任务参数中对路径的格式化将在界面上进行,原因此处会对转义字符的\也转换为/,故replace("\\", "/")去掉
+ params[i] = params[i].trim().replaceAll("[\n\t\r]","");//.replace("\\", "/");//[\\s*\n\t\r]
+ logger.debug("filePush-->param: " + params[i]);
+ ParamFilePush fParam = null;
+ if(isUpgrade){
+ ParamUpgrade cfu = (ParamUpgrade) JSONObject.toBean(JSONObject
+ .fromObject(params[i]), ParamUpgrade.class);
+ fParam = new ParamFilePush();
+ fParam.setFileName(cfu.getFileName());
+ fParam.setUsername(cfu.getUsername());
+ fParam.setGroupName(cfu.getGroupName());
+ fParam.setParam1(cfu.getParam1());
+ }else{
+ fParam = (ParamFilePush) JSONObject.toBean(
+ JSONObject.fromObject(params[i]),
+ ParamFilePush.class);
+ }
+ if(fParam.getDestPath()==null || fParam.getDestPath().trim().length()<=0){
+ fParam.setDestPath(getUpgradeTaskPushPath(taskId));//设置默认推送目的地
+ logger.debug("filePush-->destPath: " + fParam.getDestPath());
+ }
+ if(fParam.getFileName()==null || fParam.getFileName().trim().length()<=0){
+// msg = "推送文件名参数为空";
+// msg = "File push parameters are empty";
+ msg = "i18n_client.TaskReqHandle.pushFileNameParamIsNull_n81i";
+// logger.debug(msg);//i18nlog
+ break;
+ }
+ /*if(!ProcessUtil.checkUserPass(fParam.getUsername(), fParam.getParam1())){
+ msg = "[" + fParam.getFileName() + "]推送文件的用户名或密码不正确;";
+ logger.debug(msg);
+ break;
+ }*/
+ // 判断用户名是否正确
+ if(!ProcessUtil.checkUserOrGroupExist(fParam.getUsername(), fParam.getGroupName())){
+// msg = "[" + fParam.getFileName() + "]推送文件的属主或属群不正确;";
+// msg = "[" + fParam.getFileName() + "]The owner or group of the push file is incorrect;";
+ msg = "[" + fParam.getFileName() + "]i18n_client.TaskReqHandle.userGroupErr_n81i;";
+// logger.debug(msg);//i18nlog
+ break;
+ }
+ // ------步骤2-2:文件存在并与Md5值比较文件是否完整
+ File pushFile = new File(tempDir.getAbsolutePath()
+ + File.separator + fParam.getFileName());
+ if (!pushFile.exists()){
+// msg = "推送临时文件不存在,请检查推送文件名称与参数文件名("
+// + fParam.getFileName() + ")是否一致";
+// msg = "The push temporary file does not exist. Please check whether the push file name is consistent with the parameter file name("
+// + fParam.getFileName() + ")";
+ msg = "i18n_client.TaskReqHandle.pushFileNotExists_n81i("
+ + fParam.getFileName() + ")";
+// logger.warn(msg + "--" + pushFile.getAbsolutePath());//i18nlog
+ break;
+ }
+ // ------步骤2-3:判断推送目录是否存在,不存在创建
+ File destFile = new File(fParam.getDestPath()
+ + File.separator + fParam.getFileName());
+ if(!destFile.getParentFile().exists()){
+ destFile.getParentFile().mkdirs();
+ }
+ // ------步骤2-4:判断是否直接覆盖
+ if (fParam.getIsCover() != null
+ && "Y".equalsIgnoreCase(fParam.getIsCover())) {// 覆盖,则直接Copy并赋权限与所有者
+ if (destFile.exists()) {
+ //destFile.delete_bak();
+ //使用删除文件公共方法
+ FileUtil.delDir(destFile);
+ logger.debug("filePush delete file--" + destFile.getAbsolutePath());
+ //FileUtil.checkParentDirExist(destFile);
+ }
+ msg = copyAndSetPermission(pushFile
+ .getCanonicalPath(), destFile
+ .getCanonicalPath(), fParam.getUsername(),
+ fParam.getGroupName(), fParam
+ .getPermisson());
+ } else if (!destFile.exists()) {// 不覆盖,则判断文件不存在的话,再Copy并赋权限与所有者
+ msg = copyAndSetPermission(pushFile
+ .getCanonicalPath(), destFile
+ .getCanonicalPath(), fParam.getUsername(),
+ fParam.getGroupName(), fParam
+ .getPermisson());
+ }
+ if(msg==null || msg.length()<=0){
+// sb.append("[" + fParam.getFileName() + "]成功推送到[" + fParam.getDestPath() + "];");
+// sb.append("[" + fParam.getFileName() + "]successfully pushed to[" + fParam.getDestPath() + "];");
+ sb.append("[" + fParam.getFileName() + "]i18n_client.TaskReqHandle.successPush_n81i[" + fParam.getDestPath() + "];");
+ logger.debug("推送文件" + (i+1) + "成功---" + pushFile.getCanonicalPath());
+ }else{
+// msg = msg+"[" + fParam.getFileName() + "]推送失败;";//文件推送失败的具体原因
+ msg = msg+"[" + fParam.getFileName() + "]i18n_client.TaskReqHandle.pushFail_n81i;";//文件推送失败的具体原因
+ logger.debug("推送文件" + (i+1) + "失败---" + pushFile.getCanonicalPath());
+ break;
+ }
+ }//for end
+ } else {
+// msg = "文件推送参数不正确";
+// msg = "File push parameter is incorrect";
+ msg = "i18n_client.TaskReqHandle.pushParamErr_n81i";
+// logger.warn(msg + "<" + taskParam + ">");//i18nlog
+ }
+ //所有文件推送成功,删除临时接收文件目录
+ if(msg==null || msg.length()<=0){
+ if(tempDir!=null && tempDir.exists()){
+ try {
+ logger.debug("删除临时目录--" + tempDir.getAbsolutePath());
+ FileUtils.deleteDirectory(tempDir);
+ FileUtil.checkParentDirExist(tempDir);
+ } catch (IOException e) {
+ }
+ }
+ }
+ }else {
+ socket.close();
+ }
+
+ if(msg == null){
+ msg = Contants.COMMON_MSG_SUCCESS + Contants.COMMON_MSG_SEPRATOR + sb.toString();
+ }else {
+ sb.append(msg);
+ msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + sb.toString();
+ }
+
+ } catch (Exception e) {
+ logger.error("Receive push file exception:" + Utils.printExceptionStack(e));
+// msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "接收推送文件异常," + e.getMessage();
+// msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "Received push file exception," + e.getMessage();
+ msg = Contants.COMMON_MSG_FAIL + Contants.COMMON_MSG_SEPRATOR + "i18n_client.TaskReqHandle.reciveFileErr_n81i," + e.getMessage();
+ return msg;
+ }finally{
+ if(tempDir!=null && tempDir.exists() && tempDir.listFiles().length==0){
+ try {
+ FileUtils.deleteDirectory(tempDir);
+ logger.debug("finally删除临时目录--" + tempDir.getAbsolutePath());
+ FileUtil.checkParentDirExist(tempDir);
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ return msg;
+ }
+
+ /**
+ * 文件推送部分的拷备工作,由临时文件目录拷备到推送的最终目的地,并赋相应的权限组
+ */
+ private String copyAndSetPermission(String source, String destFile, String user, String group, String permission) throws Exception{
+ String result = null;
+ if (source != null && destFile != null) {
+ // 根据操作系统确定获取进程ID的方式
+ String os = System.getProperty("os.name");
+ if (os.startsWith("Windows")) {
+ FileUtils.copyFile(new File(source), new File(
+ destFile));//目标路径不存在自动创建
+ } else if (os.startsWith("Linux")) {
+ StringBuffer sb = new StringBuffer();
+ //source destFile都不能含有空格
+ source = source.replace(" ", "\\ ");
+ destFile = destFile.replace(" ", "\\ ");
+ sb.append("\\cp -f " + source + " " + destFile + ";");//2015-11-6 hyx: cp - f修改成\\cp -f (有时候如果不加\\会提示是否,就会有问题)
+ if (permission != null && !"".equals(permission.trim())) {
+ sb.append("chmod " + permission + " "
+ + destFile + ";");
+ }
+ if (user != null && !"".equals(user.trim())) {
+ sb.append("chown " + user + " " + destFile + ";");
+ }
+ if (group != null && !"".equals(group.trim())) {
+ sb.append("chgrp " + group + " " + destFile);
+ }
+ result = ProcessUtil.execLinuxCmd(sb.toString());
+ } else {
+ throw new IOException("unknown operating system: " + os);
+ }
+ }else{
+// result = "源文件或目标文件为空";
+// result = "The source file or target file is empty";
+ result = "i18n_client.TaskReqHandle.sourceOrTargetIsNull_n81i";
+ }
+
+ return result;
+ }
+
+
+ /**
+ * 任务请求处理步骤2:将分析包装好的任务,统一判断处理并添加到线程中执行
+ */
+ public void handleTaskThread(Long taskId, Long startTime, Long endTime,
+ long isLoop, long loopDelay, final CommandPO command,
+ final String threadName, long missionState) {
+ if(missionState == AgentCommand.MISSION_CANCEL_START){//如果任务状态为,撤消任务
+ logger.warn("The task is in the revocation, and the ID is not processed:" + taskId);
+ return;
+ }
+ if(Common.getTaskFuture(taskId)!=null){//当前任务已存在执行,则不执行该当前任务
+ logger.warn("The task already exists to execute the ID:" + taskId);
+ return;
+ }
+ // 设置任务结束时间,且当前时间已超过任务结束时间
+ if (endTime != null && endTime.longValue() > 0
+ && endTime.longValue() <= System.currentTimeMillis()) {
+ logger.warn("The task has expired ID:" + taskId);
+ return;
+ }
+ long delay = 0;
+ if (startTime != null) {
+ delay = startTime - System.currentTimeMillis();
+ }
+ ScheduledFuture<?> taskFuture = null;
+ LoopTaskThread loopTaskThread = null;
+ if (isLoop == 0) {// 非周期任务
+ taskFuture = Common.scheduled.schedule(new Runnable() {
+ public void run() {
+ Thread.currentThread().setName(threadName);
+ new AgentCommand(command).exec();
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+ } else {
+ Future<?> singleFuture = null;
+ if(delay <= 0){//开始时间之后接到任务,先执行一次,第二次按周期点执行
+ long now = System.currentTimeMillis();
+ long cnt = (now - startTime)/(loopDelay * 60 * 1000);
+ if((now - startTime)%(loopDelay * 60 * 1000)!=0){
+ delay = startTime + loopDelay *60 *1000 * (cnt + 1) - System.currentTimeMillis();
+ ///仅执行一次的,并在周期执行代码中第一次执行判断单次执行完成与否,未完成结束掉
+ singleFuture = Common.scheduled.schedule(new Runnable() {
+ public void run() {
+ singleThread = Thread.currentThread();
+// Thread.currentThread().setName(threadName + " 周期单次");
+ Thread.currentThread().setName(threadName + " Periodic Single Time");
+ new AgentCommand(command).exec();
+ }
+ }, 0, TimeUnit.MILLISECONDS);
+ }
+ }
+ loopTaskThread = new LoopTaskThread(threadName, command, loopDelay, singleFuture, singleThread);
+ taskFuture = Common.scheduled.scheduleAtFixedRate(loopTaskThread, delay, loopDelay * 60 * 1000, TimeUnit.MILLISECONDS);
+ // 周期任务,若设置结束时间,则添加取消线程
+ if (endTime != null && endTime.longValue() > 0) {
+ long endDelay = endTime.longValue() - System.currentTimeMillis();
+ if (endDelay > 0) {
+ Common.cancleTaskFuture(taskId, endDelay);
+ }
+ }// 取消线程结束
+
+ }
+ // 将正在执行的任务添加到全局变量,目的是避免重复执行任务
+ Common.putTaskFuture(taskId, taskFuture, loopTaskThread);
+ }
+
+ public static String getUpgradeTaskPushPath(long taskId){
+ return Contants.localUploadsPath + File.separator + taskId;
+ }
+}
+ \ No newline at end of file