summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/env-config.md10
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandException.java10
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandExecuteException.java3
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/JobExecuteException.java1
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java9
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java108
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java6
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/utils/FactoryUtil.java6
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java4
10 files changed, 64 insertions, 97 deletions
diff --git a/docs/env-config.md b/docs/env-config.md
index 58f7e71..36d1ef9 100644
--- a/docs/env-config.md
+++ b/docs/env-config.md
@@ -45,15 +45,7 @@ This parameter is used to enable/disable object reuse for the execution of the j
### jars
-Third-party jars can be loaded via `jars`, by using `jars="file://local/jar1.jar;file://local/jar2.jar"`.
-
-### pipeline.jars
-
-Specify a list of jar URLs via `pipeline.jars`, The jars are separated by `;` and will be uploaded to the flink cluster.
-
-### pipeline.classpaths
-
-Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are separated by `;` and will be added to the classpath of the flink cluster.
+Third-party jars can be loaded via `jars`, by using `jars="file:///local/jar1.jar;file:///local/jar2.jar"`.
## Engine Parameter
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandException.java
new file mode 100644
index 0000000..b6e590f
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandException.java
@@ -0,0 +1,10 @@
+package com.geedgenetworks.bootstrap.exception;
+
+public class CommandException extends RuntimeException {
+ public CommandException(String message) {
+ super(message);
+ }
+ public CommandException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandExecuteException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandExecuteException.java
index 3722286..fe7fea2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandExecuteException.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/CommandExecuteException.java
@@ -1,10 +1,9 @@
package com.geedgenetworks.bootstrap.exception;
-public class CommandExecuteException extends RuntimeException {
+public class CommandExecuteException extends CommandException {
public CommandExecuteException(String message) {
super(message);
}
-
public CommandExecuteException(String message, Throwable cause) {
super(message, cause);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java
index 9d948a3..5c77e6b 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/ConfigCheckException.java
@@ -1,11 +1,9 @@
package com.geedgenetworks.bootstrap.exception;
-@Deprecated
-public class ConfigCheckException extends RuntimeException {
+public class ConfigCheckException extends CommandException {
public ConfigCheckException(String message) {
super(message);
}
-
public ConfigCheckException(String message, Throwable cause) {
super(message, cause);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/JobExecuteException.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/JobExecuteException.java
index 596bd47..dd4e91c 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/JobExecuteException.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/exception/JobExecuteException.java
@@ -4,7 +4,6 @@ public class JobExecuteException extends RuntimeException {
public JobExecuteException(String message) {
super(message);
}
-
public JobExecuteException(String message, Throwable cause) {
super(message, cause);
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index c8945e8..fa56483 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -40,7 +40,7 @@ public class JobExecution {
public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
try {
- jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
+ jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.bootstrapDir()
.resolve(GrootStreamRunner.APP_JAR_NAME).toString())
.toURI().toURL()));
} catch (MalformedURLException e) {
@@ -48,8 +48,9 @@ public class JobExecution {
}
registerPlugin(jobConfig.getConfig(Constants.APPLICATION));
+
this.jobRuntimeEnvironment =
- JobRuntimeEnvironment.getInstance(this.registerPlugin(jobConfig, jarPaths), grootStreamConfig);
+ JobRuntimeEnvironment.getInstance(this.configureExecutionJars(jobConfig, jarPaths), grootStreamConfig);
this.sourceExecutor = new SourceExecutor(jobRuntimeEnvironment, jobConfig);
this.sinkExecutor = new SinkExecutor(jobRuntimeEnvironment, jobConfig);
@@ -85,7 +86,7 @@ public class JobExecution {
}
- private Config registerPlugin(Config config, List<URL> jars) {
+ private Config configureExecutionJars(Config config, List<URL> jars) {
config = this.injectJarsToConfig(
config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
return this.injectJarsToConfig(
@@ -235,7 +236,7 @@ public class JobExecution {
log.info("Job finished, execution duration: {} ms", jobEndTime - jobStartTime);
} catch (Exception e) {
- throw new JobExecuteException("Execute job error", e);
+ throw new JobExecuteException("Job execution failed.", e);
}
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java
index 620fa96..9f4835f 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/StartBuilder.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.bootstrap.execution;
import com.geedgenetworks.bootstrap.enums.DeployMode;
+import com.geedgenetworks.bootstrap.exception.CommandExecuteException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
@@ -84,11 +85,11 @@ public class StartBuilder {
} else if (DeployMode.RUN_APPLICATION == MODE) {
return Paths.get("");
} else {
- throw new IllegalStateException("deploy mode not support : " + MODE);
+ throw new CommandExecuteException("Unsupported deploy mode: " + MODE);
}
}
- public static Path appBootstrapDir() {
+ public static Path bootstrapDir() {
return appRootDir().resolve("bootstrap");
}
@@ -98,6 +99,7 @@ public class StartBuilder {
public static Path pluginRootDir() {
return Paths.get(getGrootStreamHome(), "plugins");
}
+
/** Plugin Connector Dir */
public static Path connectorDir() {
return Paths.get(getGrootStreamHome(), "connectors");
@@ -110,71 +112,58 @@ public class StartBuilder {
}
/**
- * return lib jars, which located in 'lib/*' or 'lib/{dir}/*'.
- * @return
+ * Return lib jars, which located in 'lib/*' or 'lib/{dir}/*'.
+ * @return lib jars
*/
public static List<Path> getLibJars() {
- Path libRootDir = libDir();
- if (Files.exists(libRootDir, new LinkOption[0]) && Files.isDirectory(libRootDir, new LinkOption[0])) {
- try {
- Stream<Path> stream = Files.walk(libRootDir, APP_LIB_DIR_DEPTH, new FileVisitOption[]{FileVisitOption.FOLLOW_LINKS});
- Throwable var2 = null;
-
- List var3;
- try {
- var3 = (List)stream.filter((it) -> {
- return !it.toFile().isDirectory();
- }).filter((it) -> {
- return it.getFileName().toString().endsWith(".jar");
- }).collect(Collectors.toList());
- } catch (Throwable var13) {
- var2 = var13;
- throw var13;
- } finally {
- if (stream != null) {
- if (var2 != null) {
- try {
- stream.close();
- } catch (Throwable var12) {
- var2.addSuppressed(var12);
- }
- } else {
- stream.close();
- }
- }
-
- }
-
- return var3;
- } catch (IOException var15) {
- throw new RuntimeException(var15);
- }
- } else {
+ Path libRootDir = libDir();
+ if (!Files.exists(libRootDir) || !Files.isDirectory(libRootDir)) {
return Collections.emptyList();
}
+ try (Stream<Path> stream = Files.walk(libRootDir, APP_LIB_DIR_DEPTH, FOLLOW_LINKS)) {
+ return stream.filter(it -> !it.toFile().isDirectory())
+ .filter(it -> it.getFileName().toString().endsWith(".jar"))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/**
* return the jar package configured in env jars
- * @param paths
- * @return
+ * @param paths jar paths
+ * @return third party jars
*/
public static Set<Path> getThirdPartyJars(String paths) {
- return (Set) Arrays.stream(paths.split(";")).filter((s) -> {
- return !"".equals(s);
- }).filter((it) -> {
- return it.endsWith(".jar");
- }).map((path) -> {
- return Paths.get(URI.create(path));
- }).collect(Collectors.toSet());
- }
+ return Arrays.stream(paths.split(";"))
+ .filter(s -> !"".equals(s))
+ .filter(it -> it.endsWith(".jar"))
+ .map(path -> Paths.get(URI.create(path)))
+ .collect(Collectors.toSet());
+ }
- public static Path pluginTarball() {
- return appRootDir().resolve("plugins.tar.gz");
+ /**
+ * return connector jars, which located in 'connectors/*'.
+ * @return connector jars
+ */
+ public static List<Path> getConnectorJars() {
+ Path connectorRootDir = StartBuilder.connectorDir();
+ if (!Files.exists(connectorRootDir) || !Files.isDirectory(connectorRootDir)) {
+ return Collections.emptyList();
+ }
+ try (Stream<Path> stream = Files.walk(connectorRootDir, 2, FOLLOW_LINKS)) {
+ return stream.filter(
+ it -> !it.toFile().isDirectory())
+ .filter(it -> it.getFileName().toString().endsWith(".jar"))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
+
/** return plugin's dependent jars, which located in 'plugins/${pluginName}/lib/*'. */
public static List<Path> getPluginsJarDependencies() {
Path pluginRootDir = StartBuilder.pluginRootDir();
@@ -194,19 +183,6 @@ public class StartBuilder {
}
}
- public static List<Path> getConnectorJars() {
- Path connectorRootDir = StartBuilder.connectorDir();
- if (!Files.exists(connectorRootDir) || !Files.isDirectory(connectorRootDir)) {
- return Collections.emptyList();
- }
- try (Stream<Path> stream = Files.walk(connectorRootDir, 2, FOLLOW_LINKS)) {
- return stream.filter(
- it -> !it.toFile().isDirectory())
- .filter(it -> it.getFileName().toString().endsWith(".jar"))
- .collect(Collectors.toList());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
+
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java
index 6a106d2..26969f6 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java
@@ -15,14 +15,14 @@ public class GrootStreamRunner {
public static final String APP_JAR_NAME = EngineType.FLINK13.getJarName();
public static final String SHELL_NAME = EngineType.FLINK13.getShellName();
private final ExecuteCommandArgs bootstrapCommandArgs;
- private final String appJar;
+ private final String bootstrapJar;
GrootStreamRunner(String[] args) {
this.bootstrapCommandArgs =
CommandLineUtils.parse(args, new ExecuteCommandArgs(), SHELL_NAME, true);
StartBuilder.setDeployMode(bootstrapCommandArgs.getDeployMode());
StartBuilder.setStarter(true);
- this.appJar = StartBuilder.appBootstrapDir().resolve(APP_JAR_NAME).toString();
+ this.bootstrapJar = StartBuilder.bootstrapDir().resolve(APP_JAR_NAME).toString();
}
public static void main(String[] args) {
@@ -47,7 +47,7 @@ public class GrootStreamRunner {
command.add("-c");
command.add(APP_NAME);
//set main jar name
- command.add(appJar);
+ command.add(bootstrapJar);
//set config file path
command.add("--config");
command.add(bootstrapCommandArgs.getConfigFile());
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/FactoryUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/FactoryUtil.java
index 276e28c..93197c6 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/FactoryUtil.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/FactoryUtil.java
@@ -4,10 +4,7 @@ import com.geedgenetworks.common.exception.IMapStorageException;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationTargetException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ServiceConfigurationError;
-import java.util.ServiceLoader;
+import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@@ -17,7 +14,6 @@ public class FactoryUtil {
try {
final List<T> result = new LinkedList<>();
ServiceLoader.load(factoryClass, classLoader).iterator().forEachRemaining(result::add);
-
List<T> foundFactories =
result.stream()
.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java
index 0823ddc..b9cfc72 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java
@@ -10,10 +10,6 @@ import com.alibaba.fastjson2.JSONReader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-/**
- * @author qidaijie @Package com.zdjizhi.tools.json @Description:
- * @date 2023/5/1917:51
- */
public class JsonPathUtil {
private static final Log logger = LogFactory.get();