diff options
| author | doufenghu <[email protected]> | 2024-03-16 14:40:22 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-03-16 14:40:22 +0800 |
| commit | cd00fed1c16ae236ddeb18dac45d567736960c3e (patch) | |
| tree | ff301b6af8b1551efbec62c88683df948b8fee0b /groot-tests | |
| parent | 9a7cc00ee0c27ec664b96f151df50ed21f6831e0 (diff) | |
[Improve][Tests] Improve unit test in Flink13Contaner.
Diffstat (limited to 'groot-tests')
| -rw-r--r-- | groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java | 78 | ||||
| -rw-r--r-- | groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java) | 6 | ||||
| -rw-r--r-- | groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java | 4 | ||||
| -rw-r--r-- | groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java | 2 | ||||
| -rw-r--r-- | groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java | 91 | ||||
| -rw-r--r-- | groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml | 2 |
6 files changed, 155 insertions, 28 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java index 39c1e2c..3f8435d 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java @@ -1,32 +1,22 @@ package com.geedgenetworks.test.common.container; import cn.hutool.core.util.XmlUtil; -import cn.hutool.json.XML; import com.geedgenetworks.bootstrap.utils.ConfigBuilder; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.SourceConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.SourceConfig; -import com.geedgenetworks.core.types.StructType; -import com.geedgenetworks.core.types.Types; import com.google.common.collect.Lists; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigResolveOptions; +import groovy.lang.Tuple2; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Assertions; +import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.utility.MountableFile; -import org.w3c.dom.Attr; import org.w3c.dom.Document; import org.w3c.dom.Element; -import org.w3c.dom.NodeList; - -import javax.xml.xpath.XPathConstants; import java.io.File; +import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; @@ -282,6 +272,68 @@ public final class ContainerUtil { ConfigResolveOptions.defaults().setAllowUnresolved(true)); } + public static List<String> getJVMThreadNames(GenericContainer<?> container) + throws IOException, InterruptedException { + return getJVMThreads(container).stream().map(Tuple2::getV1).collect(Collectors.toList()); + } + + public static Map<String, Integer> getJVMLiveObject(GenericContainer<?> container) + throws IOException, InterruptedException { + Container.ExecResult liveObjects = + container.execInContainer("jmap", "-histo:live", getJVMProcessId(container)); + Assertions.assertEquals(0, liveObjects.getExitCode()); + String value = liveObjects.getStdout().trim(); + return Arrays.stream(value.split("\n")) + .skip(2) + .map( + str -> + Arrays.stream(str.split(" ")) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.toList())) + .filter(list -> list.size() == 4) + .collect( + Collectors.toMap( + list -> list.get(3), + list -> Integer.valueOf(list.get(1)), + (a, b) -> a)); + } + + public static List<Tuple2<String, String>> getJVMThreads(GenericContainer<?> container) + throws IOException, InterruptedException { + Container.ExecResult threads = + container.execInContainer("jstack", getJVMProcessId(container)); + Assertions.assertEquals(0, threads.getExitCode()); + // Thread name line example + // "hz.main.MetricsRegistry.thread-2" #232 prio=5 os_prio=0 tid=0x0000ffff3c003000 nid=0x5e + // waiting on condition [0x0000ffff6cf3a000] + return Arrays.stream(threads.getStdout().trim().split("\n\n")) + .filter(s -> s.startsWith("\"")) + .map( + threadStr -> + new Tuple2<>( + Arrays.stream(threadStr.split("\n")) + .filter(s -> s.startsWith("\"")) + .map(s -> s.substring(1, s.lastIndexOf("\""))) + .findFirst() + .get(), + threadStr)) + .collect(Collectors.toList()); + } + + private static String getJVMProcessId(GenericContainer<?> container) + throws IOException, InterruptedException { + Container.ExecResult processes = container.execInContainer("jps"); + Assertions.assertEquals(0, processes.getExitCode()); + Optional<String> server = + Arrays.stream(processes.getStdout().trim().split("\n")) + .filter(s -> s.contains("GrootstreamServer")) + .findFirst(); + Assertions.assertTrue(server.isPresent()); + return server.get().trim().split(" ")[0]; + } + + + diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java index 690a0e1..371b542 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java @@ -5,7 +5,7 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @AutoService(TestContainer.class) -public class Flink14Container extends AbstractTestFlinkContainer { +public class Flink17Container extends AbstractTestFlinkContainer { @Override protected String getStartModuleName() { @@ -34,10 +34,10 @@ public class Flink14Container extends AbstractTestFlinkContainer { @Override public TestContainerId identifier() { - return TestContainerId.FLINK_1_14; + return TestContainerId.FLINK_1_17 ; } @Override protected String getDockerImage() { - return "flink:1.14.6-scala_2.12-java11"; + return "flink:1.17.2-scala_2.12-java11"; } } diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java index 7831b09..e837d25 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java @@ -8,9 +8,7 @@ import static com.geedgenetworks.test.common.container.EngineType.FLINK; @AllArgsConstructor public enum TestContainerId { FLINK_1_13(FLINK, "1.13.1"), - FLINK_1_14(FLINK, "1.14.6"), - FLINK_1_15(FLINK, "1.15.3"), - FLINK_1_16(FLINK, "1.16.0"); + FLINK_1_17(FLINK, "1.17.2"); private final EngineType engineType; private final String version; diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java index eba669a..02ce012 100644 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java @@ -25,7 +25,7 @@ import static org.awaitility.Awaitility.await; @Slf4j @DisabledOnContainer( - value = {TestContainerId.FLINK_1_14}, + value = {TestContainerId.FLINK_1_17}, type = {}, disabledReason = "only flink adjusts the parameter configuration rules") public class EnvParameterTest extends TestSuiteBase { diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java index 2f9a203..b84d169 100644 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java @@ -1,16 +1,91 @@ package com.geedgenetworks.test.e2e.base; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; +import com.alibaba.nacos.client.naming.utils.CollectionUtils; +import com.geedgenetworks.test.common.TestSuiteBase; +import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer; +import com.geedgenetworks.test.common.container.TestContainer; +import com.geedgenetworks.test.common.container.TestContainerId; +import com.geedgenetworks.test.common.junit.DisabledOnContainer; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.Container; - +import org.junit.jupiter.api.TestTemplate; import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.awaitility.Awaitility.await; + +@Slf4j +@DisabledOnContainer( + value = {TestContainerId.FLINK_1_17}, + type = {}, + disabledReason = "only flink adjusts the parameter configuration rules") +public class InlineToPrint extends TestSuiteBase { + + @TestTemplate + public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException { + CompletableFuture.supplyAsync( + () -> { + try { + return container.executeJob("/inline_to_print.yaml"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); -public class InlineToPrint extends Flink13Container{ + AtomicReference<String> taskMangerID = new AtomicReference<>(); + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Map<String, Object> taskMangerInfo = JSON.parseObject(container.executeJobManagerInnerCommand( + "curl http://localhost:8081/taskmanagers"), new TypeReference<Map<String, Object>>() { + }); + List<Map<String, Object>> taskManagers = + (List<Map<String, Object>>) taskMangerInfo.get("taskmanagers"); + if (!CollectionUtils.isEmpty(taskManagers)) { + taskMangerID.set(taskManagers.get(0).get("id").toString()); + } + Assertions.assertNotNull(taskMangerID.get()); + }); - @Test - public void testInlineToPrint() throws IOException, InterruptedException { - Container.ExecResult execResult = executeGrootStreamJob("/inline_to_print.yaml"); - Assertions.assertEquals(0, execResult.getExitCode()); + AtomicReference<String> jobId = new AtomicReference<>(); + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand( + "curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() { + }); + List<Map<String, Object>> jobs = + (List<Map<String, Object>>) jobInfo.get("jobs"); + if (!CollectionUtils.isEmpty(jobs)) { + jobId.set(jobs.get(0).get("jid").toString()); + } + Assertions.assertNotNull(jobId.get()); + }); + //Obtain job metrics + AtomicReference<List<Map<String, Object>>> jobNumRestartsReference = new AtomicReference<>(); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Thread.sleep(5000); + String result = container.executeJobManagerInnerCommand( + String.format( + "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get())); + List<Map<String, Object>> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference<List<Map<String, Object>>>() { + }); + if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) { + jobNumRestartsReference.set(jobNumRestartsInfo); + } + + Assertions.assertNotNull(jobNumRestartsReference.get()); + + }); } + } diff --git a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml index c9ebf10..1d09282 100644 --- a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml +++ b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml @@ -18,6 +18,8 @@ application: name: example-inline-to-print parallelism: 1 execution: + runtime-mode: streaming + buffer-timeout: 10 checkpointing: interval: 10000 mode: exactly_once |
