From d82e8100a1ea9297af71d1b2daa02f9b6358cc19 Mon Sep 17 00:00:00 2001 From: doufenghu Date: Tue, 27 Aug 2024 19:55:20 +0800 Subject: add SM4 description (GAL-650) --- docs/connector/config-encryption-decryption.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'docs') diff --git a/docs/connector/config-encryption-decryption.md b/docs/connector/config-encryption-decryption.md index 3146569..c2b05f6 100644 --- a/docs/connector/config-encryption-decryption.md +++ b/docs/connector/config-encryption-decryption.md @@ -6,14 +6,14 @@ In production environments, sensitive configuration items such as passwords are ## How to use -Groot Stream default support base64 and AES encryption and decryption. +Groot Stream support base64, AES and SM4 encryption and decryption. Base64 encryption support encrypt the following parameters: - username - password - auth -AES encryption support encrypt the following parameters: +AES/SM4 encryption support encrypt the following parameters: - username - password - auth -- cgit v1.2.3 From 0d5ce165d30383a4b9b9945b61150d8e8015893d Mon Sep 17 00:00:00 2001 From: doufenghu Date: Mon, 2 Sep 2024 23:36:07 +0800 Subject: [Fix][e2e-common] Support user-defined variables via CLI when submitting a job. --- docs/env-config.md | 23 +++++++++++++++++++--- docs/grootstream-config.md | 5 ++++- docs/user-guide.md | 4 ++-- .../bootstrap/main/GrootStreamRunner.java | 2 +- .../common/container/AbstractTestContainer.java | 22 +++++++++++++++++++-- .../container/AbstractTestFlinkContainer.java | 8 +++++++- .../test/common/container/TestContainer.java | 3 +++ .../test/e2e/base/InlineToPrintIT.java | 11 +++++++---- 8 files changed, 64 insertions(+), 14 deletions(-) (limited to 'docs') diff --git a/docs/env-config.md b/docs/env-config.md index 7a31494..8e22a53 100644 --- a/docs/env-config.md +++ b/docs/env-config.md @@ -57,10 +57,10 @@ Specify a list of classpath URLs via `pipeline.classpaths`, The classpaths are s You can directly use the flink parameter by prefixing `flink.`, such as `flink.execution.buffer-timeout`, `flink.object-reuse`, etc. More details can be found in the official [flink documentation](https://flink.apache.org/). Of course, you can use groot stream parameter, here are some parameter names corresponding to the names in Flink. -| Groot Stream | Flink | +| Groot Stream | Flink | |----------------------------------------|---------------------------------------------------------------| -| execution.buffer-timeout | flink.execution.buffer-timeout | -| pipeline.object-reuse | flink.object-reuse | +| execution.buffer-timeout | flink.execution.buffer-timeout.interval | +| pipeline.object-reuse | flink.pipeline.object-reuse | | pipeline.max-parallelism | flink.pipeline.max-parallelism | | execution.restart.strategy | flink.restart-strategy | | execution.restart.attempts | flink.restart-strategy.fixed-delay.attempts | @@ -70,3 +70,20 @@ Of course, you can use groot stream parameter, here are some parameter names cor | execution.restart.delayInterval | flink.restart-strategy.failure-rate.delay | | ... | ... | +## Properties +Job-level user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting. +The properties can be used in the configuration file by using `props.${property_name}`. It will override the corresponding settings in the `grootstream.yaml` file for the duration of the job. +```yaml +application: + env: + name: example-inline-to-print + parallelism: 3 + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket + hos.bucket.name.http_file: job_level_traffic_http_file_bucket + hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: job_level_traffic_policy_capture_file_bucket +``` + diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md index fb902ae..9dd442f 100644 --- a/docs/grootstream-config.md +++ b/docs/grootstream-config.md @@ -20,7 +20,7 @@ grootstream: ``` -### Knowledge Base +## Knowledge Base The knowledge base is a collection of libraries that can be used in the groot-stream job's UDFs. File system type can be specified `local`, `http` or `hdfs`. If the value is `http`, must be ` QGW Knowledge Base Repository` URL. The library will be dynamically updated according to the `scheduler.knowledge_base.update.interval.minutes` configuration. @@ -77,3 +77,6 @@ grootstream: - asn_builtin.mmdb - asn_user_defined.mmdb ``` +## Properties +Global user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting. +The properties can be used in the configuration file by using `props.${property_name}`. \ No newline at end of file diff --git a/docs/user-guide.md b/docs/user-guide.md index e35616f..d52cfed 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -153,7 +153,7 @@ Used to define job environment configuration information. For more details, you # Command ## Run a job by CLI - +Note: When submitting a job via CLI, you can use `-D` parameter to specify flink configuration. For example, `-Dexecution.buffer-timeout.interval=1000` to set the buffer timeout to 1000ms. More details can be found in the official [flink documentation](https://flink.apache.org/). ```bash Usage: start.sh [options] Options: @@ -164,7 +164,7 @@ Options: -e, --deploy-mode Deploy mode, only support [run] (default: run) --target Submitted target type, support [local, remote, yarn-session, yarn-per-job] -n, --name Job name (default: groot-stream-job) - -i, --variable User-defined parameters, eg. -i key=value (default: []) + -i, --variable User-defined variables, eg. -i key=value (default: []) -h, --help Show help message -v, --version Show version message diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java index 8ab8bdc..6a106d2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/main/GrootStreamRunner.java @@ -76,7 +76,7 @@ public class GrootStreamRunner { bootstrapCommandArgs.getVariables().stream() .filter(Objects::nonNull) .map(String::trim) - .forEach(variable -> command.add("-D" + variable)); + .forEach(variable -> command.add("-i " + variable)); return command; } diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java index 0f1e3f7..14eb5fb 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java @@ -62,8 +62,9 @@ public abstract class AbstractTestContainer implements TestContainer { container, this.startModuleFullPath, GROOTSTREAM_HOME); } - protected Container.ExecResult executeJob(GenericContainer container, String confFile) + protected Container.ExecResult executeJob(GenericContainer container, String confFile, List variables) throws IOException, InterruptedException { + final String confInContainerPath = ContainerUtil.copyConfigFileToContainer(container, confFile); // copy connectors ContainerUtil.copyConnectorJarToContainer( @@ -81,10 +82,27 @@ public abstract class AbstractTestContainer implements TestContainer { command.add(ContainerUtil.adaptPathForWin(confInContainerPath)); command.add("--target"); command.add("remote"); - command.addAll(getExtraStartShellCommands()); + List extraStartShellCommands = new ArrayList<>(getExtraStartShellCommands()); + if (variables != null && !variables.isEmpty()) { + variables.forEach( + v -> { + extraStartShellCommands.add("-i"); + extraStartShellCommands.add(v); + }); + } + command.addAll(extraStartShellCommands); return executeCommand(container, command); } + + + protected Container.ExecResult executeJob(GenericContainer container, String confFile) + throws IOException, InterruptedException { + return executeJob(container, confFile, null); + } + + + protected Container.ExecResult savepointJob(GenericContainer container, String jobId) throws IOException, InterruptedException { final List command = new ArrayList<>(); diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java index 30e6eb3..b833115 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java @@ -127,8 +127,14 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer { @Override public Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException { + return executeJob(confFile, null); + } + + @Override + public Container.ExecResult executeJob(String confFile, List variables) + throws IOException, InterruptedException { log.info("test in container: {}", identifier()); - return executeJob(jobManager, confFile); + return executeJob(jobManager, confFile, variables); } @Override diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java index 6e4cd1f..b3bf77a 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainer.java @@ -5,6 +5,7 @@ import org.testcontainers.containers.Container; import org.testcontainers.containers.Network; import java.io.IOException; +import java.util.List; public interface TestContainer extends TestResource { Network NETWORK = Network.newNetwork(); @@ -15,6 +16,8 @@ public interface TestContainer extends TestResource { Container.ExecResult executeJob(String confFile) throws IOException, InterruptedException; + Container.ExecResult executeJob(String confFile, List variables) + throws IOException, InterruptedException; default Container.ExecResult savepointJob(String jobId) throws IOException, InterruptedException { diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java index dde7b28..1c1e777 100644 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java @@ -24,7 +24,7 @@ import static org.awaitility.Awaitility.await; @DisabledOnContainer( value = {TestContainerId.FLINK_1_17}, type = {}, - disabledReason = "only flink adjusts the parameter configuration rules") + disabledReason = "Only flink adjusts the parameter configuration rules") public class InlineToPrintIT extends TestSuiteBase { @TestTemplate @@ -32,7 +32,10 @@ public class InlineToPrintIT extends TestSuiteBase { CompletableFuture.supplyAsync( () -> { try { - return container.executeJob("/inline_to_print.yaml"); + List variables = List.of( + "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket", + "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket"); + return container.executeJob("/inline_to_print.yaml", variables); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -91,8 +94,8 @@ public class InlineToPrintIT extends TestSuiteBase { .untilAsserted( () -> { String logs = container.getServerLogs(); - Assertions.assertTrue(StringUtils.countMatches(logs, "job_level_traffic_rtp_file_bucket/test_pcap_file") > 10); - Assertions.assertTrue(StringUtils.countMatches(logs, "job_level_traffic_http_file_bucket/test_http_req_file") > 10); + Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_rtp_file_bucket/test_pcap_file") > 10); + Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_http_file_bucket/test_http_req_file") > 10); }); -- cgit v1.2.3