diff options
10 files changed, 64 insertions, 97 deletions
diff --git a/docs/env-config.md b/docs/env-config.md index 58f7e71..36d1ef9 100644 --- a/docs/env-config.md +++ b/docs/env-config.md @@ -45,15 +45,7 @@ This parameter is used to enable/disable object reuse for the execution of the j ### jars -Third-party jars can be loaded via `jars`, by using `jars="file://local/jar1.jar;file://local/jar2.jar"`. - -### pipeline.jars - -Specify a list of jar URLs via `pipeline.jars`, The jars are separated by `;` and will be uploaded to the flink cluster. - -### pipeline.classpaths - -Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are separated by `;` and will be added to the classpath of the flink cluster. +Third-party jars can be loaded via `jars`, by using `jars="file:///local/jar1.jar;file:///local/jar2.jar"`. ## Engine Parameter diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandException.java new file mode 100644 index 0000000..b6e590f --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandException.java @@ -0,0 +1,10 @@ +package com.geedgenetworks.bootstrap.exception; + +public class CommandException extends RuntimeException { + public CommandException(String message) { + super(message); + } + public CommandException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandExecuteException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandExecuteException.java index 3722286..fe7fea2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandExecuteException.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandExecuteException.java @@ -1,10 +1,9 @@ package com.geedgenetworks.bootstrap.exception; -public class CommandExecuteException extends RuntimeException { +public class CommandExecuteException extends CommandException { public CommandExecuteException(String message) { super(message); } - public CommandExecuteException(String message, Throwable cause) { super(message, cause); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java index 9d948a3..5c77e6b 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java @@ -1,11 +1,9 @@ package com.geedgenetworks.bootstrap.exception; -@Deprecated -public class ConfigCheckException extends RuntimeException { +public class ConfigCheckException extends CommandException { public ConfigCheckException(String message) { super(message); } - public ConfigCheckException(String message, Throwable cause) { super(message, cause); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/JobExecuteException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/JobExecuteException.java index 596bd47..dd4e91c 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/JobExecuteException.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/JobExecuteException.java @@ -4,7 +4,6 @@ public class JobExecuteException extends RuntimeException { public JobExecuteException(String message) { super(message); } - public JobExecuteException(String message, Throwable cause) { super(message, cause); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index c8945e8..fa56483 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -40,7 +40,7 @@ public class JobExecution { public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) { try { - jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir() + jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.bootstrapDir() .resolve(GrootStreamRunner.APP_JAR_NAME).toString()) .toURI().toURL())); } catch (MalformedURLException e) { @@ -48,8 +48,9 @@ public class JobExecution { } registerPlugin(jobConfig.getConfig(Constants.APPLICATION)); + this.jobRuntimeEnvironment = - JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig); + JobRuntimeEnvironment.getInstance(this.configureExecutionJars(jobConfig, jarPaths), grootStreamConfig); this.sourceExecutor = new SourceExecutor(jobRuntimeEnvironment, jobConfig); this.sinkExecutor = new SinkExecutor(jobRuntimeEnvironment, jobConfig); @@ -85,7 +86,7 @@ public class JobExecution { } - private Config registerPlugin(Config config, List<URL> jars) { + private Config configureExecutionJars(Config config, List<URL> jars) { config = this.injectJarsToConfig( config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars); return this.injectJarsToConfig( @@ -235,7 +236,7 @@ public class JobExecution { log.info("Job finished, execution duration: {} ms", jobEndTime - jobStartTime); } catch (Exception e) { - throw new JobExecuteException("Execute job error", e); + throw new JobExecuteException("Job execution failed.", e); } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java index 620fa96..9f4835f 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java @@ -1,6 +1,7 @@ package com.geedgenetworks.bootstrap.execution; import com.geedgenetworks.bootstrap.enums.DeployMode; +import com.geedgenetworks.bootstrap.exception.CommandExecuteException; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; @@ -84,11 +85,11 @@ public class StartBuilder { } else if (DeployMode.RUN_APPLICATION == MODE) { return Paths.get(""); } else { - throw new IllegalStateException("deploy mode not support : " + MODE); + throw new CommandExecuteException("Unsupported deploy mode: " + MODE); } } - public static Path appBootstrapDir() { + public static Path bootstrapDir() { return appRootDir().resolve("bootstrap"); } @@ -98,6 +99,7 @@ public class StartBuilder { public static Path pluginRootDir() { return Paths.get(getGrootStreamHome(), "plugins"); } + /** Plugin Connector Dir */ public static Path connectorDir() { return Paths.get(getGrootStreamHome(), "connectors"); @@ -110,71 +112,58 @@ public class StartBuilder { } /** - * return lib jars, which located in 'lib/*' or 'lib/{dir}/*'. - * @return + * Return lib jars, which located in 'lib/*' or 'lib/{dir}/*'. + * @return lib jars */ public static List<Path> getLibJars() { - Path libRootDir = libDir(); - if (Files.exists(libRootDir, new LinkOption[0]) && Files.isDirectory(libRootDir, new LinkOption[0])) { - try { - Stream<Path> stream = Files.walk(libRootDir, APP_LIB_DIR_DEPTH, new FileVisitOption[]{FileVisitOption.FOLLOW_LINKS}); - Throwable var2 = null; - - List var3; - try { - var3 = (List)stream.filter((it) -> { - return !it.toFile().isDirectory(); - }).filter((it) -> { - return it.getFileName().toString().endsWith(".jar"); - }).collect(Collectors.toList()); - } catch (Throwable var13) { - var2 = var13; - throw var13; - } finally { - if (stream != null) { - if (var2 != null) { - try { - stream.close(); - } catch (Throwable var12) { - var2.addSuppressed(var12); - } - } else { - stream.close(); - } - } - - } - - return var3; - } catch (IOException var15) { - throw new RuntimeException(var15); - } - } else { + Path libRootDir = libDir(); + if (!Files.exists(libRootDir) || !Files.isDirectory(libRootDir)) { return Collections.emptyList(); } + try (Stream<Path> stream = Files.walk(libRootDir, APP_LIB_DIR_DEPTH, FOLLOW_LINKS)) { + return stream.filter(it -> !it.toFile().isDirectory()) + .filter(it -> it.getFileName().toString().endsWith(".jar")) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException(e); + } } /** * return the jar package configured in env jars - * @param paths - * @return + * @param paths jar paths + * @return third party jars */ public static Set<Path> getThirdPartyJars(String paths) { - return (Set) Arrays.stream(paths.split(";")).filter((s) -> { - return !"".equals(s); - }).filter((it) -> { - return it.endsWith(".jar"); - }).map((path) -> { - return Paths.get(URI.create(path)); - }).collect(Collectors.toSet()); - } + return Arrays.stream(paths.split(";")) + .filter(s -> !"".equals(s)) + .filter(it -> it.endsWith(".jar")) + .map(path -> Paths.get(URI.create(path))) + .collect(Collectors.toSet()); + } - public static Path pluginTarball() { - return appRootDir().resolve("plugins.tar.gz"); + /** + * return connector jars, which located in 'connectors/*'. + * @return connector jars + */ + public static List<Path> getConnectorJars() { + Path connectorRootDir = StartBuilder.connectorDir(); + if (!Files.exists(connectorRootDir) || !Files.isDirectory(connectorRootDir)) { + return Collections.emptyList(); + } + try (Stream<Path> stream = Files.walk(connectorRootDir, 2, FOLLOW_LINKS)) { + return stream.filter( + it -> !it.toFile().isDirectory()) + .filter(it -> it.getFileName().toString().endsWith(".jar")) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException(e); + } } + /** return plugin's dependent jars, which located in 'plugins/${pluginName}/lib/*'. */ public static List<Path> getPluginsJarDependencies() { Path pluginRootDir = StartBuilder.pluginRootDir(); @@ -194,19 +183,6 @@ public class StartBuilder { } } - public static List<Path> getConnectorJars() { - Path connectorRootDir = StartBuilder.connectorDir(); - if (!Files.exists(connectorRootDir) || !Files.isDirectory(connectorRootDir)) { - return Collections.emptyList(); - } - try (Stream<Path> stream = Files.walk(connectorRootDir, 2, FOLLOW_LINKS)) { - return stream.filter( - it -> !it.toFile().isDirectory()) - .filter(it -> it.getFileName().toString().endsWith(".jar")) - .collect(Collectors.toList()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java index 6a106d2..26969f6 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java @@ -15,14 +15,14 @@ public class GrootStreamRunner { public static final String APP_JAR_NAME = EngineType.FLINK13.getJarName(); public static final String SHELL_NAME = EngineType.FLINK13.getShellName(); private final ExecuteCommandArgs bootstrapCommandArgs; - private final String appJar; + private final String bootstrapJar; GrootStreamRunner(String[] args) { this.bootstrapCommandArgs = CommandLineUtils.parse(args, new ExecuteCommandArgs(), SHELL_NAME, true); StartBuilder.setDeployMode(bootstrapCommandArgs.getDeployMode()); StartBuilder.setStarter(true); - this.appJar = StartBuilder.appBootstrapDir().resolve(APP_JAR_NAME).toString(); + this.bootstrapJar = StartBuilder.bootstrapDir().resolve(APP_JAR_NAME).toString(); } public static void main(String[] args) { @@ -47,7 +47,7 @@ public class GrootStreamRunner { command.add("-c"); command.add(APP_NAME); //set main jar name - command.add(appJar); + command.add(bootstrapJar); //set config file path command.add("--config"); command.add(bootstrapCommandArgs.getConfigFile()); diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/FactoryUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/FactoryUtil.java index 276e28c..93197c6 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/utils/FactoryUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/FactoryUtil.java @@ -4,10 +4,7 @@ import com.geedgenetworks.common.exception.IMapStorageException; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationTargetException; -import java.util.LinkedList; -import java.util.List; -import java.util.ServiceConfigurationError; -import java.util.ServiceLoader; +import java.util.*; import java.util.stream.Collectors; @Slf4j @@ -17,7 +14,6 @@ public class FactoryUtil { try { final List<T> result = new LinkedList<>(); ServiceLoader.load(factoryClass, classLoader).iterator().forEachRemaining(result::add); - List<T> foundFactories = result.stream() .filter(f -> factoryClass.isAssignableFrom(f.getClass())) diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java index 0823ddc..b9cfc72 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java @@ -10,10 +10,6 @@ import com.alibaba.fastjson2.JSONReader; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -/** - * @author qidaijie @Package com.zdjizhi.tools.json @Description: - * @date 2023/5/1917:51 - */ public class JsonPathUtil { private static final Log logger = LogFactory.get(); |
