summaryrefslogtreecommitdiff
path: root/groot-tests
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-03-09 17:27:45 +0800
committerdoufenghu <[email protected]>2024-03-09 17:27:45 +0800
commit9a7cc00ee0c27ec664b96f151df50ed21f6831e0 (patch)
treeef863cadcbfde213d6d810d9a678aedf31c5975b /groot-tests
parentd54368632b07de402335514ac64e0abd5d49c7af (diff)
[Feature][Tests] Support Env Parameter test in Flink13Container.
Diffstat (limited to 'groot-tests')
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java2
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java43
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java5
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java198
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java21
-rw-r--r--groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml45
6 files changed, 312 insertions, 2 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
index 7c41167..01d2133 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
@@ -5,7 +5,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AutoService(TestContainer.class)
-public class Flink13Container extends AbstractTestFlinkContainer{
+public class Flink13Container extends AbstractTestFlinkContainer {
@Override
protected String getStartModuleName() {
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java
new file mode 100644
index 0000000..690a0e1
--- /dev/null
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.test.common.container;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+@AutoService(TestContainer.class)
+public class Flink14Container extends AbstractTestFlinkContainer {
+
+ @Override
+ protected String getStartModuleName() {
+ return "groot-bootstrap";
+ }
+
+ @Override
+ protected String getStartShellName() {
+ return "start.sh";
+ }
+
+ @Override
+ protected String getConnectorModulePath() {
+ return "groot-connectors";
+ }
+
+ @Override
+ protected String getConnectorType() {
+ return "grootstream";
+ }
+
+ @Override
+ protected String getConnectorNamePrefix() {
+ return "connector-";
+ }
+
+ @Override
+ public TestContainerId identifier() {
+ return TestContainerId.FLINK_1_14;
+ }
+ @Override
+ protected String getDockerImage() {
+ return "flink:1.14.6-scala_2.12-java11";
+ }
+}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
index ddd2012..7831b09 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
@@ -7,7 +7,10 @@ import static com.geedgenetworks.test.common.container.EngineType.FLINK;
@Getter
@AllArgsConstructor
public enum TestContainerId {
- FLINK_1_13(FLINK, "1.13.6");
+ FLINK_1_13(FLINK, "1.13.1"),
+ FLINK_1_14(FLINK, "1.14.6"),
+ FLINK_1_15(FLINK, "1.15.3"),
+ FLINK_1_16(FLINK, "1.16.0");
private final EngineType engineType;
private final String version;
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java
new file mode 100644
index 0000000..eba669a
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java
@@ -0,0 +1,198 @@
+package com.geedgenetworks.test.e2e.base;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.alibaba.nacos.client.naming.utils.CollectionUtils;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
+import com.geedgenetworks.test.common.container.ContainerExtendedFactory;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import com.geedgenetworks.test.common.junit.TestContainerExtension;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {TestContainerId.FLINK_1_14},
+ type = {},
+ disabledReason = "only flink adjusts the parameter configuration rules")
+public class EnvParameterTest extends TestSuiteBase {
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/grootstream && chown -R flink /tmp/grootstream");
+ Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+ };
+
+ @TestTemplate
+ public void testGeneralEnvParameter(AbstractTestFlinkContainer container)
+ throws IOException, InterruptedException {
+ genericTest(
+ "/test_env_parameter_inline_to_print.yaml", container);
+ }
+
+
+ public void genericTest(String configPath, AbstractTestFlinkContainer container)
+ throws IOException, InterruptedException {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob(configPath);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ // wait obtain job id
+ AtomicReference<String> jobId = new AtomicReference<>();
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
+ "curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() {
+ });
+ List<Map<String, Object>> jobs =
+ (List<Map<String, Object>>) jobInfo.get("jobs");
+ if (!CollectionUtils.isEmpty(jobs)) {
+ jobId.set(jobs.get(0).get("jid").toString());
+ }
+ Assertions.assertNotNull(jobId.get());
+ });
+
+ // obtain job info
+ AtomicReference<Map<String, Object>> jobInfoReference = new AtomicReference<>();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> jobInfo = JSON.parseObject( container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s",
+ jobId.get())), new TypeReference<Map<String, Object>>() {
+ });
+
+ // wait the job initialization is complete and enters the Running state
+ if (null != jobInfo && "RUNNING".equals(jobInfo.get("state"))) {
+ jobInfoReference.set(jobInfo);
+ }
+ Assertions.assertNotNull(jobInfoReference.get());
+ });
+ Map<String, Object> jobInfo = jobInfoReference.get();
+
+ // obtain execution configuration
+ Map<String, Object> jobConfig = JSON.parseObject(container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s/config", jobId.get())), new TypeReference<Map<String, Object>>() {
+ });
+
+ Map<String, Object> executionConfig =
+ (Map<String, Object>) jobConfig.get("execution-config");
+
+ // obtain checkpoint configuration
+ Map<String, Object> checkpointConfig =
+ JSON.parseObject(container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s/checkpoints/config", jobId.get())), new TypeReference<Map<String, Object>>() {
+ });
+
+ // obtain checkpoint storage
+ AtomicReference<Map<String, Object>> completedCheckpointReference = new AtomicReference<>();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> checkpointsInfo =
+ JSON.parseObject(container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s/checkpoints", jobId.get())), new TypeReference<Map<String, Object>>() {
+ });
+ Map<String, Object> latestCheckpoint =
+ (Map<String, Object>) checkpointsInfo.get("latest");
+ // waiting for at least one checkpoint trigger
+ if (null != latestCheckpoint) {
+ completedCheckpointReference.set(
+ (Map<String, Object>) latestCheckpoint.get("completed"));
+ Assertions.assertNotNull(completedCheckpointReference.get());
+ }
+ });
+ /**
+ * adjust the configuration of this {@link
+ * com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName} to use the 'flink.' and the
+ * flink parameter name, and check whether the configuration takes effect
+ */
+ // PARALLELISM
+ int parallelism = (int) executionConfig.get("job-parallelism");
+ Assertions.assertEquals(1, parallelism);
+
+ // MAX_PARALLELISM
+ int maxParallelism = (int) jobInfo.get("maxParallelism");
+ Assertions.assertEquals(5, maxParallelism);
+
+ // CHECKPOINT_INTERVAL
+ int interval = (int) checkpointConfig.get("interval");
+ Assertions.assertEquals(10000, interval);
+
+ // CHECKPOINT_MODE
+ String mode = checkpointConfig.get("mode").toString();
+ Assertions.assertEquals("exactly_once", mode);
+
+ // CHECKPOINT_TIMEOUT
+ int checkpointTimeout = (int) checkpointConfig.get("timeout");
+ Assertions.assertEquals(1200000, checkpointTimeout);
+
+ // CHECKPOINT_DATA_URI
+ String externalPath = completedCheckpointReference.get().get("external_path").toString();
+ Assertions.assertTrue(externalPath.startsWith("file:/tmp/grootstream/checkpoints"));
+
+ // MAX_CONCURRENT_CHECKPOINTS
+ int maxConcurrent = (int) checkpointConfig.get("max_concurrent");
+ Assertions.assertEquals(2, maxConcurrent);
+
+ // CHECKPOINT_CLEANUP_MODE
+ Map<String, Object> externalizationMap =
+ (Map<String, Object>) checkpointConfig.get("externalization");
+ boolean externalization = (boolean) externalizationMap.get("delete_on_cancellation");
+ Assertions.assertTrue(externalization);
+
+ // MIN_PAUSE_BETWEEN_CHECKPOINTS
+ int minPause = (int) checkpointConfig.get("min_pause");
+ Assertions.assertEquals(100, minPause);
+
+ // FAIL_ON_CHECKPOINTING_ERRORS
+ int tolerableFailedCheckpoints = (int) checkpointConfig.get("tolerable_failed_checkpoints");
+ Assertions.assertEquals(5, tolerableFailedCheckpoints);
+
+ // RESTART_STRATEGY / because the restart strategy is fixed-delay in config file, so don't
+ // check failure-rate
+ String restartStrategy = executionConfig.get("restart-strategy").toString();
+ Assertions.assertTrue(restartStrategy.contains("fixed delay"));
+
+ // RESTART_ATTEMPTS
+ Assertions.assertTrue(restartStrategy.contains("2 restart attempts"));
+
+ // RESTART_DELAY_BETWEEN_ATTEMPTS
+ Assertions.assertTrue(restartStrategy.contains("fixed delay (1000 ms)"));
+
+ // STATE_BACKEND
+ String stateBackend = checkpointConfig.get("state_backend").toString();
+ Assertions.assertTrue(stateBackend.contains("RocksDBStateBackend"));
+ }
+
+
+
+}
+
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java
new file mode 100644
index 0000000..4aa2dc6
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java
@@ -0,0 +1,21 @@
+package com.geedgenetworks.test.e2e.base;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+
+@Slf4j
+public class TestUtils {
+ public static String getResource(String confFile) {
+ return System.getProperty("user.dir")
+ + File.separator
+ + "src"
+ + File.separator
+ + "test"
+ + File.separator
+ + "resources"
+ + File.separator
+ + confFile;
+ }
+
+}
diff --git a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml
new file mode 100644
index 0000000..c9ebf10
--- /dev/null
+++ b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml
@@ -0,0 +1,45 @@
+sources:
+ inline_source:
+ type: inline
+ properties:
+ data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ format: json
+ json.ignore.parse.errors: false
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
+application:
+ env:
+ name: example-inline-to-print
+ parallelism: 1
+ execution:
+ checkpointing:
+ interval: 10000
+ mode: exactly_once
+ timeout: 1200000
+ data-uri: file:///tmp/grootstream/checkpoints
+ max-concurrent-checkpoints: 2
+ cleanup: true
+ min-pause: 100
+ tolerable-failed-checkpoints: 5
+ restart:
+ strategy: fixed-delay
+ attempts: 2
+ delayBetweenAttempts: 1000
+ state:
+ backend: rocksdb
+ flink:
+ pipeline:
+ max-parallelism: 5
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [print_sink]
+ - name: print_sink
+ downstream: [] \ No newline at end of file