summaryrefslogtreecommitdiff
path: root/groot-tests
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-03-16 14:40:22 +0800
committerdoufenghu <[email protected]>2024-03-16 14:40:22 +0800
commitcd00fed1c16ae236ddeb18dac45d567736960c3e (patch)
treeff301b6af8b1551efbec62c88683df948b8fee0b /groot-tests
parent9a7cc00ee0c27ec664b96f151df50ed21f6831e0 (diff)
[Improve][Tests] Improve unit test in Flink13Contaner.
Diffstat (limited to 'groot-tests')
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java78
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java (renamed from groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java)6
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java4
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java2
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java91
-rw-r--r--groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml2
6 files changed, 155 insertions, 28 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index 39c1e2c..3f8435d 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -1,32 +1,22 @@
package com.geedgenetworks.test.common.container;
import cn.hutool.core.util.XmlUtil;
-import cn.hutool.json.XML;
import com.geedgenetworks.bootstrap.utils.ConfigBuilder;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
-import com.geedgenetworks.common.config.SourceConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.SourceConfig;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigResolveOptions;
+import groovy.lang.Tuple2;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.MountableFile;
-import org.w3c.dom.Attr;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-import javax.xml.xpath.XPathConstants;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
@@ -282,6 +272,68 @@ public final class ContainerUtil {
ConfigResolveOptions.defaults().setAllowUnresolved(true));
}
+ public static List<String> getJVMThreadNames(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ return getJVMThreads(container).stream().map(Tuple2::getV1).collect(Collectors.toList());
+ }
+
+ public static Map<String, Integer> getJVMLiveObject(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ Container.ExecResult liveObjects =
+ container.execInContainer("jmap", "-histo:live", getJVMProcessId(container));
+ Assertions.assertEquals(0, liveObjects.getExitCode());
+ String value = liveObjects.getStdout().trim();
+ return Arrays.stream(value.split("\n"))
+ .skip(2)
+ .map(
+ str ->
+ Arrays.stream(str.split(" "))
+ .filter(StringUtils::isNotEmpty)
+ .collect(Collectors.toList()))
+ .filter(list -> list.size() == 4)
+ .collect(
+ Collectors.toMap(
+ list -> list.get(3),
+ list -> Integer.valueOf(list.get(1)),
+ (a, b) -> a));
+ }
+
+ public static List<Tuple2<String, String>> getJVMThreads(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ Container.ExecResult threads =
+ container.execInContainer("jstack", getJVMProcessId(container));
+ Assertions.assertEquals(0, threads.getExitCode());
+ // Thread name line example
+ // "hz.main.MetricsRegistry.thread-2" #232 prio=5 os_prio=0 tid=0x0000ffff3c003000 nid=0x5e
+ // waiting on condition [0x0000ffff6cf3a000]
+ return Arrays.stream(threads.getStdout().trim().split("\n\n"))
+ .filter(s -> s.startsWith("\""))
+ .map(
+ threadStr ->
+ new Tuple2<>(
+ Arrays.stream(threadStr.split("\n"))
+ .filter(s -> s.startsWith("\""))
+ .map(s -> s.substring(1, s.lastIndexOf("\"")))
+ .findFirst()
+ .get(),
+ threadStr))
+ .collect(Collectors.toList());
+ }
+
+ private static String getJVMProcessId(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ Container.ExecResult processes = container.execInContainer("jps");
+ Assertions.assertEquals(0, processes.getExitCode());
+ Optional<String> server =
+ Arrays.stream(processes.getStdout().trim().split("\n"))
+ .filter(s -> s.contains("GrootstreamServer"))
+ .findFirst();
+ Assertions.assertTrue(server.isPresent());
+ return server.get().trim().split(" ")[0];
+ }
+
+
+
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java
index 690a0e1..371b542 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java
@@ -5,7 +5,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AutoService(TestContainer.class)
-public class Flink14Container extends AbstractTestFlinkContainer {
+public class Flink17Container extends AbstractTestFlinkContainer {
@Override
protected String getStartModuleName() {
@@ -34,10 +34,10 @@ public class Flink14Container extends AbstractTestFlinkContainer {
@Override
public TestContainerId identifier() {
- return TestContainerId.FLINK_1_14;
+ return TestContainerId.FLINK_1_17 ;
}
@Override
protected String getDockerImage() {
- return "flink:1.14.6-scala_2.12-java11";
+ return "flink:1.17.2-scala_2.12-java11";
}
}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
index 7831b09..e837d25 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java
@@ -8,9 +8,7 @@ import static com.geedgenetworks.test.common.container.EngineType.FLINK;
@AllArgsConstructor
public enum TestContainerId {
FLINK_1_13(FLINK, "1.13.1"),
- FLINK_1_14(FLINK, "1.14.6"),
- FLINK_1_15(FLINK, "1.15.3"),
- FLINK_1_16(FLINK, "1.16.0");
+ FLINK_1_17(FLINK, "1.17.2");
private final EngineType engineType;
private final String version;
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java
index eba669a..02ce012 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java
@@ -25,7 +25,7 @@ import static org.awaitility.Awaitility.await;
@Slf4j
@DisabledOnContainer(
- value = {TestContainerId.FLINK_1_14},
+ value = {TestContainerId.FLINK_1_17},
type = {},
disabledReason = "only flink adjusts the parameter configuration rules")
public class EnvParameterTest extends TestSuiteBase {
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
index 2f9a203..b84d169 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java
@@ -1,16 +1,91 @@
package com.geedgenetworks.test.e2e.base;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.alibaba.nacos.client.naming.utils.CollectionUtils;
+import com.geedgenetworks.test.common.TestSuiteBase;
+import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer;
+import com.geedgenetworks.test.common.container.TestContainer;
+import com.geedgenetworks.test.common.container.TestContainerId;
+import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
+import org.junit.jupiter.api.TestTemplate;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {TestContainerId.FLINK_1_17},
+ type = {},
+ disabledReason = "only flink adjusts the parameter configuration rules")
+public class InlineToPrint extends TestSuiteBase {
+
+ @TestTemplate
+ public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob("/inline_to_print.yaml");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
-public class InlineToPrint extends Flink13Container{
+ AtomicReference<String> taskMangerID = new AtomicReference<>();
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> taskMangerInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
+ "curl http://localhost:8081/taskmanagers"), new TypeReference<Map<String, Object>>() {
+ });
+ List<Map<String, Object>> taskManagers =
+ (List<Map<String, Object>>) taskMangerInfo.get("taskmanagers");
+ if (!CollectionUtils.isEmpty(taskManagers)) {
+ taskMangerID.set(taskManagers.get(0).get("id").toString());
+ }
+ Assertions.assertNotNull(taskMangerID.get());
+ });
- @Test
- public void testInlineToPrint() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeGrootStreamJob("/inline_to_print.yaml");
- Assertions.assertEquals(0, execResult.getExitCode());
+ AtomicReference<String> jobId = new AtomicReference<>();
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
+ "curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() {
+ });
+ List<Map<String, Object>> jobs =
+ (List<Map<String, Object>>) jobInfo.get("jobs");
+ if (!CollectionUtils.isEmpty(jobs)) {
+ jobId.set(jobs.get(0).get("jid").toString());
+ }
+ Assertions.assertNotNull(jobId.get());
+ });
+ //Obtain job metrics
+ AtomicReference<List<Map<String, Object>>> jobNumRestartsReference = new AtomicReference<>();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Thread.sleep(5000);
+ String result = container.executeJobManagerInnerCommand(
+ String.format(
+ "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get()));
+ List<Map<String, Object>> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference<List<Map<String, Object>>>() {
+ });
+ if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) {
+ jobNumRestartsReference.set(jobNumRestartsInfo);
+ }
+
+ Assertions.assertNotNull(jobNumRestartsReference.get());
+
+ });
}
+
}
diff --git a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml
index c9ebf10..1d09282 100644
--- a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml
+++ b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml
@@ -18,6 +18,8 @@ application:
name: example-inline-to-print
parallelism: 1
execution:
+ runtime-mode: streaming
+ buffer-timeout: 10
checkpointing:
interval: 10000
mode: exactly_once