diff options
| author | doufenghu <[email protected]> | 2024-03-09 17:27:45 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-03-09 17:27:45 +0800 |
| commit | 9a7cc00ee0c27ec664b96f151df50ed21f6831e0 (patch) | |
| tree | ef863cadcbfde213d6d810d9a678aedf31c5975b | |
| parent | d54368632b07de402335514ac64e0abd5d49c7af (diff) | |
[Feature][Tests] Support Env Parameter test in Flink13Container.
6 files changed, 312 insertions, 2 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java index 7c41167..01d2133 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java @@ -5,7 +5,7 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @AutoService(TestContainer.class) -public class Flink13Container extends AbstractTestFlinkContainer{ +public class Flink13Container extends AbstractTestFlinkContainer { @Override protected String getStartModuleName() { diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java new file mode 100644 index 0000000..690a0e1 --- /dev/null +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java @@ -0,0 +1,43 @@ +package com.geedgenetworks.test.common.container; + +import com.google.auto.service.AutoService; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@AutoService(TestContainer.class) +public class Flink14Container extends AbstractTestFlinkContainer { + + @Override + protected String getStartModuleName() { + return "groot-bootstrap"; + } + + @Override + protected String getStartShellName() { + return "start.sh"; + } + + @Override + protected String getConnectorModulePath() { + return "groot-connectors"; + } + + @Override + protected String getConnectorType() { + return "grootstream"; + } + + @Override + protected String getConnectorNamePrefix() { + return "connector-"; + } + + @Override + public TestContainerId identifier() { + return TestContainerId.FLINK_1_14; + } + @Override + protected String getDockerImage() { + return "flink:1.14.6-scala_2.12-java11"; + } +} diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java index ddd2012..7831b09 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java @@ -7,7 +7,10 @@ import static com.geedgenetworks.test.common.container.EngineType.FLINK; @Getter @AllArgsConstructor public enum TestContainerId { - FLINK_1_13(FLINK, "1.13.6"); + FLINK_1_13(FLINK, "1.13.1"), + FLINK_1_14(FLINK, "1.14.6"), + FLINK_1_15(FLINK, "1.15.3"), + FLINK_1_16(FLINK, "1.16.0"); private final EngineType engineType; private final String version; diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java new file mode 100644 index 0000000..eba669a --- /dev/null +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java @@ -0,0 +1,198 @@ +package com.geedgenetworks.test.e2e.base; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; +import com.alibaba.nacos.client.naming.utils.CollectionUtils; +import com.geedgenetworks.test.common.TestSuiteBase; +import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer; +import com.geedgenetworks.test.common.container.ContainerExtendedFactory; +import com.geedgenetworks.test.common.container.TestContainerId; +import com.geedgenetworks.test.common.junit.DisabledOnContainer; +import com.geedgenetworks.test.common.junit.TestContainerExtension; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.awaitility.Awaitility.await; + +@Slf4j +@DisabledOnContainer( + value = {TestContainerId.FLINK_1_14}, + type = {}, + disabledReason = "only flink adjusts the parameter configuration rules") +public class EnvParameterTest extends TestSuiteBase { + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/grootstream && chown -R flink /tmp/grootstream"); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + @TestTemplate + public void testGeneralEnvParameter(AbstractTestFlinkContainer container) + throws IOException, InterruptedException { + genericTest( + "/test_env_parameter_inline_to_print.yaml", container); + } + + + public void genericTest(String configPath, AbstractTestFlinkContainer container) + throws IOException, InterruptedException { + CompletableFuture.supplyAsync( + () -> { + try { + return container.executeJob(configPath); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + // wait obtain job id + AtomicReference<String> jobId = new AtomicReference<>(); + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand( + "curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() { + }); + List<Map<String, Object>> jobs = + (List<Map<String, Object>>) jobInfo.get("jobs"); + if (!CollectionUtils.isEmpty(jobs)) { + jobId.set(jobs.get(0).get("jid").toString()); + } + Assertions.assertNotNull(jobId.get()); + }); + + // obtain job info + AtomicReference<Map<String, Object>> jobInfoReference = new AtomicReference<>(); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Map<String, Object> jobInfo = JSON.parseObject( container.executeJobManagerInnerCommand( + String.format( + "curl http://localhost:8081/jobs/%s", + jobId.get())), new TypeReference<Map<String, Object>>() { + }); + + // wait the job initialization is complete and enters the Running state + if (null != jobInfo && "RUNNING".equals(jobInfo.get("state"))) { + jobInfoReference.set(jobInfo); + } + Assertions.assertNotNull(jobInfoReference.get()); + }); + Map<String, Object> jobInfo = jobInfoReference.get(); + + // obtain execution configuration + Map<String, Object> jobConfig = JSON.parseObject(container.executeJobManagerInnerCommand( + String.format( + "curl http://localhost:8081/jobs/%s/config", jobId.get())), new TypeReference<Map<String, Object>>() { + }); + + Map<String, Object> executionConfig = + (Map<String, Object>) jobConfig.get("execution-config"); + + // obtain checkpoint configuration + Map<String, Object> checkpointConfig = + JSON.parseObject(container.executeJobManagerInnerCommand( + String.format( + "curl http://localhost:8081/jobs/%s/checkpoints/config", jobId.get())), new TypeReference<Map<String, Object>>() { + }); + + // obtain checkpoint storage + AtomicReference<Map<String, Object>> completedCheckpointReference = new AtomicReference<>(); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Map<String, Object> checkpointsInfo = + JSON.parseObject(container.executeJobManagerInnerCommand( + String.format( + "curl http://localhost:8081/jobs/%s/checkpoints", jobId.get())), new TypeReference<Map<String, Object>>() { + }); + Map<String, Object> latestCheckpoint = + (Map<String, Object>) checkpointsInfo.get("latest"); + // waiting for at least one checkpoint trigger + if (null != latestCheckpoint) { + completedCheckpointReference.set( + (Map<String, Object>) latestCheckpoint.get("completed")); + Assertions.assertNotNull(completedCheckpointReference.get()); + } + }); + /** + * adjust the configuration of this {@link + * com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName} to use the 'flink.' and the + * flink parameter name, and check whether the configuration takes effect + */ + // PARALLELISM + int parallelism = (int) executionConfig.get("job-parallelism"); + Assertions.assertEquals(1, parallelism); + + // MAX_PARALLELISM + int maxParallelism = (int) jobInfo.get("maxParallelism"); + Assertions.assertEquals(5, maxParallelism); + + // CHECKPOINT_INTERVAL + int interval = (int) checkpointConfig.get("interval"); + Assertions.assertEquals(10000, interval); + + // CHECKPOINT_MODE + String mode = checkpointConfig.get("mode").toString(); + Assertions.assertEquals("exactly_once", mode); + + // CHECKPOINT_TIMEOUT + int checkpointTimeout = (int) checkpointConfig.get("timeout"); + Assertions.assertEquals(1200000, checkpointTimeout); + + // CHECKPOINT_DATA_URI + String externalPath = completedCheckpointReference.get().get("external_path").toString(); + Assertions.assertTrue(externalPath.startsWith("file:/tmp/grootstream/checkpoints")); + + // MAX_CONCURRENT_CHECKPOINTS + int maxConcurrent = (int) checkpointConfig.get("max_concurrent"); + Assertions.assertEquals(2, maxConcurrent); + + // CHECKPOINT_CLEANUP_MODE + Map<String, Object> externalizationMap = + (Map<String, Object>) checkpointConfig.get("externalization"); + boolean externalization = (boolean) externalizationMap.get("delete_on_cancellation"); + Assertions.assertTrue(externalization); + + // MIN_PAUSE_BETWEEN_CHECKPOINTS + int minPause = (int) checkpointConfig.get("min_pause"); + Assertions.assertEquals(100, minPause); + + // FAIL_ON_CHECKPOINTING_ERRORS + int tolerableFailedCheckpoints = (int) checkpointConfig.get("tolerable_failed_checkpoints"); + Assertions.assertEquals(5, tolerableFailedCheckpoints); + + // RESTART_STRATEGY / because the restart strategy is fixed-delay in config file, so don't + // check failure-rate + String restartStrategy = executionConfig.get("restart-strategy").toString(); + Assertions.assertTrue(restartStrategy.contains("fixed delay")); + + // RESTART_ATTEMPTS + Assertions.assertTrue(restartStrategy.contains("2 restart attempts")); + + // RESTART_DELAY_BETWEEN_ATTEMPTS + Assertions.assertTrue(restartStrategy.contains("fixed delay (1000 ms)")); + + // STATE_BACKEND + String stateBackend = checkpointConfig.get("state_backend").toString(); + Assertions.assertTrue(stateBackend.contains("RocksDBStateBackend")); + } + + + +} + diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java new file mode 100644 index 0000000..4aa2dc6 --- /dev/null +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/TestUtils.java @@ -0,0 +1,21 @@ +package com.geedgenetworks.test.e2e.base; + +import lombok.extern.slf4j.Slf4j; + +import java.io.File; + +@Slf4j +public class TestUtils { + public static String getResource(String confFile) { + return System.getProperty("user.dir") + + File.separator + + "src" + + File.separator + + "test" + + File.separator + + "resources" + + File.separator + + confFile; + } + +} diff --git a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml new file mode 100644 index 0000000..c9ebf10 --- /dev/null +++ b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml @@ -0,0 +1,45 @@ +sources: + inline_source: + type: inline + properties: + data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]' + format: json + json.ignore.parse.errors: false + +sinks: + print_sink: + type: print + properties: + format: json + mode: log_warn + +application: + env: + name: example-inline-to-print + parallelism: 1 + execution: + checkpointing: + interval: 10000 + mode: exactly_once + timeout: 1200000 + data-uri: file:///tmp/grootstream/checkpoints + max-concurrent-checkpoints: 2 + cleanup: true + min-pause: 100 + tolerable-failed-checkpoints: 5 + restart: + strategy: fixed-delay + attempts: 2 + delayBetweenAttempts: 1000 + state: + backend: rocksdb + flink: + pipeline: + max-parallelism: 5 + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [print_sink] + - name: print_sink + downstream: []
\ No newline at end of file |
