diff options
| author | doufenghu <[email protected]> | 2024-03-16 14:40:22 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-03-16 14:40:22 +0800 |
| commit | cd00fed1c16ae236ddeb18dac45d567736960c3e (patch) | |
| tree | ff301b6af8b1551efbec62c88683df948b8fee0b | |
| parent | 9a7cc00ee0c27ec664b96f151df50ed21f6831e0 (diff) | |
[Improve][Tests] Improve unit test in Flink13Container.
10 files changed, 180 insertions, 30 deletions
diff --git a/docs/env-config.md b/docs/env-config.md index ceb1f9e..3e91f3e 100644 --- a/docs/env-config.md +++ b/docs/env-config.md @@ -16,11 +16,14 @@ Note: The parallelism of a job can be overridden by explicitly configuring the p - System Level: The parallelism of a job can be specified by using the `flink-conf.yaml` file. ### execution.buffer-timeout -The maximum time frequency (milliseconds) for the flushing of the output buffers. If is not specified, the default value is `1000`. +The maximum time frequency (milliseconds) for the flushing of the output buffers. If is not specified, the default value is `100`. You can set directly in Flink's parameter `fink.execution.buffer-timeout` to override the value in the configuration file. - A positive value triggers flushing periodically by that interval - 0 triggers flushing after every record thus minimizing latency - -1 ms triggers flushing only when the output buffer is full thus maximizing throughput +### execution.runtime-mode +This parameter is used to define the runtime mode of the job, the default value is `STREAMING`. If you want to run the job in batch mode, you can set `execution.runtime-mode = "BATCH"`. + ### shade.identifier Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting sensitive information in the configuration file, this option can be ignored. 
For more details, you can refer to the documentation [config-encryption-decryption](connector/config-encryption-decryption.md) diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index f39ebb4..8e42947 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -126,7 +126,6 @@ <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> - <scope>${scope}</scope> </dependency> <dependency> diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java index 7309710..e487e2d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ExecutionConfigKeyName.java @@ -6,6 +6,8 @@ public class ExecutionConfigKeyName { } public static final String PARALLELISM = "parallelism"; public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer-timeout"; + + public static final String RUNTIME_MODE = "execution.runtime-mode"; public static final String MAX_PARALLELISM = "pipeline.max-parallelism"; public static final String TIME_CHARACTERISTIC = "pipeline.time-characteristic"; public static final String CHECKPOINTING_INTERVAL = "execution.checkpointing.interval"; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index 2e29ca9..7141f5e 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -9,6 +9,7 @@ import com.geedgenetworks.common.config.GrootStreamConfig; import com.geedgenetworks.common.utils.ReflectionUtils; import com.typesafe.config.Config; import 
lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.RestOptions; @@ -21,6 +22,8 @@ import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.TernaryBoolean; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -137,6 +140,7 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ environment.getConfig().setGlobalJobParameters(configuration); setTimeCharacteristic(); setCheckpoint(); + setRuntimeMode(); EnvironmentUtil.setRestartStrategy(envConfig, environment.getConfig()); if (envConfig.hasPath(ExecutionConfigKeyName.BUFFER_TIMEOUT_MILLIS)) { @@ -156,6 +160,21 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ } + private void setRuntimeMode() { + if (envConfig.hasPath(ExecutionConfigKeyName.RUNTIME_MODE)) { + String runtimeMode = envConfig.getString(ExecutionConfigKeyName.RUNTIME_MODE); + if (runtimeMode.equalsIgnoreCase("batch")) { + environment.setRuntimeMode(RuntimeExecutionMode.BATCH); + } else if (runtimeMode.equalsIgnoreCase("streaming")) { + environment.setRuntimeMode(RuntimeExecutionMode.STREAMING); + } else { + log.warn( + "set runtime-mode failed, unknown runtime-mode [{}],only support batch,streaming", + runtimeMode); + } + } + } + private void setTimeCharacteristic() { if (envConfig.hasPath(ExecutionConfigKeyName.TIME_CHARACTERISTIC)) { String timeType = envConfig.getString(ExecutionConfigKeyName.TIME_CHARACTERISTIC); diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java 
index 39c1e2c..3f8435d 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java @@ -1,32 +1,22 @@ package com.geedgenetworks.test.common.container; import cn.hutool.core.util.XmlUtil; -import cn.hutool.json.XML; import com.geedgenetworks.bootstrap.utils.ConfigBuilder; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; -import com.geedgenetworks.common.config.SourceConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.SourceConfig; -import com.geedgenetworks.core.types.StructType; -import com.geedgenetworks.core.types.Types; import com.google.common.collect.Lists; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigResolveOptions; +import groovy.lang.Tuple2; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Assertions; +import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.utility.MountableFile; -import org.w3c.dom.Attr; import org.w3c.dom.Document; import org.w3c.dom.Element; -import org.w3c.dom.NodeList; - -import javax.xml.xpath.XPathConstants; import java.io.File; +import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; @@ -282,6 +272,68 @@ public final class ContainerUtil { ConfigResolveOptions.defaults().setAllowUnresolved(true)); } + public static List<String> getJVMThreadNames(GenericContainer<?> container) + throws IOException, InterruptedException { + return getJVMThreads(container).stream().map(Tuple2::getV1).collect(Collectors.toList()); + } + + public static Map<String, 
Integer> getJVMLiveObject(GenericContainer<?> container) + throws IOException, InterruptedException { + Container.ExecResult liveObjects = + container.execInContainer("jmap", "-histo:live", getJVMProcessId(container)); + Assertions.assertEquals(0, liveObjects.getExitCode()); + String value = liveObjects.getStdout().trim(); + return Arrays.stream(value.split("\n")) + .skip(2) + .map( + str -> + Arrays.stream(str.split(" ")) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.toList())) + .filter(list -> list.size() == 4) + .collect( + Collectors.toMap( + list -> list.get(3), + list -> Integer.valueOf(list.get(1)), + (a, b) -> a)); + } + + public static List<Tuple2<String, String>> getJVMThreads(GenericContainer<?> container) + throws IOException, InterruptedException { + Container.ExecResult threads = + container.execInContainer("jstack", getJVMProcessId(container)); + Assertions.assertEquals(0, threads.getExitCode()); + // Thread name line example + // "hz.main.MetricsRegistry.thread-2" #232 prio=5 os_prio=0 tid=0x0000ffff3c003000 nid=0x5e + // waiting on condition [0x0000ffff6cf3a000] + return Arrays.stream(threads.getStdout().trim().split("\n\n")) + .filter(s -> s.startsWith("\"")) + .map( + threadStr -> + new Tuple2<>( + Arrays.stream(threadStr.split("\n")) + .filter(s -> s.startsWith("\"")) + .map(s -> s.substring(1, s.lastIndexOf("\""))) + .findFirst() + .get(), + threadStr)) + .collect(Collectors.toList()); + } + + private static String getJVMProcessId(GenericContainer<?> container) + throws IOException, InterruptedException { + Container.ExecResult processes = container.execInContainer("jps"); + Assertions.assertEquals(0, processes.getExitCode()); + Optional<String> server = + Arrays.stream(processes.getStdout().trim().split("\n")) + .filter(s -> s.contains("GrootstreamServer")) + .findFirst(); + Assertions.assertTrue(server.isPresent()); + return server.get().trim().split(" ")[0]; + } + + + diff --git 
a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java index 690a0e1..371b542 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink14Container.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink17Container.java @@ -5,7 +5,7 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @AutoService(TestContainer.class) -public class Flink14Container extends AbstractTestFlinkContainer { +public class Flink17Container extends AbstractTestFlinkContainer { @Override protected String getStartModuleName() { @@ -34,10 +34,10 @@ public class Flink14Container extends AbstractTestFlinkContainer { @Override public TestContainerId identifier() { - return TestContainerId.FLINK_1_14; + return TestContainerId.FLINK_1_17 ; } @Override protected String getDockerImage() { - return "flink:1.14.6-scala_2.12-java11"; + return "flink:1.17.2-scala_2.12-java11"; } } diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java index 7831b09..e837d25 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/TestContainerId.java @@ -8,9 +8,7 @@ import static com.geedgenetworks.test.common.container.EngineType.FLINK; @AllArgsConstructor public enum TestContainerId { FLINK_1_13(FLINK, "1.13.1"), - FLINK_1_14(FLINK, "1.14.6"), - FLINK_1_15(FLINK, "1.15.3"), - FLINK_1_16(FLINK, "1.16.0"); + FLINK_1_17(FLINK, "1.17.2"); private final EngineType engineType; private final String version; diff --git 
a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java index eba669a..02ce012 100644 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/EnvParameterTest.java @@ -25,7 +25,7 @@ import static org.awaitility.Awaitility.await; @Slf4j @DisabledOnContainer( - value = {TestContainerId.FLINK_1_14}, + value = {TestContainerId.FLINK_1_17}, type = {}, disabledReason = "only flink adjusts the parameter configuration rules") public class EnvParameterTest extends TestSuiteBase { diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java index 2f9a203..b84d169 100644 --- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java +++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrint.java @@ -1,16 +1,91 @@ package com.geedgenetworks.test.e2e.base; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; +import com.alibaba.nacos.client.naming.utils.CollectionUtils; +import com.geedgenetworks.test.common.TestSuiteBase; +import com.geedgenetworks.test.common.container.AbstractTestFlinkContainer; +import com.geedgenetworks.test.common.container.TestContainer; +import com.geedgenetworks.test.common.container.TestContainerId; +import com.geedgenetworks.test.common.junit.DisabledOnContainer; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.Container; - +import org.junit.jupiter.api.TestTemplate; import java.io.IOException; +import java.util.List; +import java.util.Map; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.awaitility.Awaitility.await; + +@Slf4j +@DisabledOnContainer( + value = {TestContainerId.FLINK_1_17}, + type = {}, + disabledReason = "only flink adjusts the parameter configuration rules") +public class InlineToPrint extends TestSuiteBase { + + @TestTemplate + public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException { + CompletableFuture.supplyAsync( + () -> { + try { + return container.executeJob("/inline_to_print.yaml"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); -public class InlineToPrint extends Flink13Container{ + AtomicReference<String> taskMangerID = new AtomicReference<>(); + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Map<String, Object> taskMangerInfo = JSON.parseObject(container.executeJobManagerInnerCommand( + "curl http://localhost:8081/taskmanagers"), new TypeReference<Map<String, Object>>() { + }); + List<Map<String, Object>> taskManagers = + (List<Map<String, Object>>) taskMangerInfo.get("taskmanagers"); + if (!CollectionUtils.isEmpty(taskManagers)) { + taskMangerID.set(taskManagers.get(0).get("id").toString()); + } + Assertions.assertNotNull(taskMangerID.get()); + }); - @Test - public void testInlineToPrint() throws IOException, InterruptedException { - Container.ExecResult execResult = executeGrootStreamJob("/inline_to_print.yaml"); - Assertions.assertEquals(0, execResult.getExitCode()); + AtomicReference<String> jobId = new AtomicReference<>(); + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand( + "curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() { + }); + List<Map<String, Object>> 
jobs = + (List<Map<String, Object>>) jobInfo.get("jobs"); + if (!CollectionUtils.isEmpty(jobs)) { + jobId.set(jobs.get(0).get("jid").toString()); + } + Assertions.assertNotNull(jobId.get()); + }); + //Obtain job metrics + AtomicReference<List<Map<String, Object>>> jobNumRestartsReference = new AtomicReference<>(); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Thread.sleep(5000); + String result = container.executeJobManagerInnerCommand( + String.format( + "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get())); + List<Map<String, Object>> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference<List<Map<String, Object>>>() { + }); + if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) { + jobNumRestartsReference.set(jobNumRestartsInfo); + } + + Assertions.assertNotNull(jobNumRestartsReference.get()); + + }); } + } diff --git a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml index c9ebf10..1d09282 100644 --- a/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml +++ b/groot-tests/test-e2e-base/src/test/resources/test_env_parameter_inline_to_print.yaml @@ -18,6 +18,8 @@ application: name: example-inline-to-print parallelism: 1 execution: + runtime-mode: streaming + buffer-timeout: 10 checkpointing: interval: 10000 mode: exactly_once |
