| author | doufenghu <[email protected]> | 2024-07-24 11:50:48 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-07-24 11:50:48 +0800 |
| commit | bdf64cb08bce82523f6c61d96c8bed36d9d90385 (patch) | |
| tree | 85a3556e8168c7c8fa9bfc1790b5148bf44e84f5 /groot-tests | |
| parent | 3cff9a87fa0beab38caff2b34d7344b4186e24e1 (diff) | |
[Feature][e2e-kafka] Add flink-shaded-hadoop-2-uber-2.7.5-8.0.jar to the Flink image; add mock data to the Kafka integration test.
Diffstat (limited to 'groot-tests')
6 files changed, 180 insertions, 21 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
index 54719bb..0f1e3f7 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
@@ -79,8 +79,8 @@ public abstract class AbstractTestContainer implements TestContainer {
         command.add(ContainerUtil.adaptPathForWin(binPath));
         command.add("--config");
         command.add(ContainerUtil.adaptPathForWin(confInContainerPath));
-        /* command.add("--target");
-        command.add("remote");*/
+        command.add("--target");
+        command.add("remote");
         command.addAll(getExtraStartShellCommands());
         return executeCommand(container, command);
     }
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index a9a730e..30e6eb3 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -7,6 +7,7 @@
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.images.PullPolicy;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerLoggerFactory;
@@ -54,7 +55,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
                         .waitingFor(
                                 new LogMessageWaitStrategy()
                                         .withRegEx(".*Starting the resource manager.*")
-                                        .withStartupTimeout(Duration.ofMinutes(2)));
+                                        .withStartupTimeout(Duration.ofMinutes(2)))
+                        ;
 
         // Copy groot-stream bootstrap and some other files to the container
         copyGrootStreamStarterToContainer(jobManager);
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index 58eee29..c22dfe4 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -72,13 +72,6 @@ public final class ContainerUtil {
 
         // copy libs
-        String hbaseClientJar = "hbase-client-shaded-" + getProjectVersion() + ".jar";
-        Path hbaseClientJarPath =
-                Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar);
-        container.withCopyFileToContainer(
-                MountableFile.forHostPath(hbaseClientJarPath),
-                Paths.get(GrootStreamHomeInContainer, "lib", hbaseClientJar).toString());
-
         String formatJsonJar = "format-json-" + getProjectVersion() + ".jar";
         Path formatJsonJarPath =
@@ -94,6 +87,18 @@
                 MountableFile.forHostPath(formatProtobufJarPath),
                 Paths.get(GrootStreamHomeInContainer, "lib", formatProtobufJar).toString());
 
+        String formatMsgpackJar = "format-msgpack-" + getProjectVersion() + ".jar";
+        Path formatMsgpackJarPath =
+                Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-msgpack", "target", formatMsgpackJar);
+        container.withCopyFileToContainer( MountableFile.forHostPath(formatMsgpackJarPath),
+                Paths.get(GrootStreamHomeInContainer, "lib", formatMsgpackJar).toString());
+
+        String formatRawJar = "format-raw-" + getProjectVersion() + ".jar";
+        Path formatRawJarPath =
+                Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-raw", "target", formatRawJar);
+        container.withCopyFileToContainer( MountableFile.forHostPath(formatRawJarPath),
+                Paths.get(GrootStreamHomeInContainer, "lib", formatRawJar).toString());
+
         //copy system config
         final String configPath = PROJECT_ROOT_PATH + "/config";
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
index 01d2133..338c696 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
@@ -40,7 +40,7 @@ public class Flink13Container extends AbstractTestFlinkContainer {
 
     @Override
     protected String getDockerImage() {
-        return "flink:1.13.1-scala_2.11-java11";
+        return "192.168.40.153:8082/common/flink:1.13.1-scala_2.11-java11";
     }
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
index d01a514..fbf31f7 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
@@ -51,8 +51,8 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
     private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
     private static final String KAFKA_HOST = "kafkaCluster";
     private KafkaProducer<byte[], byte[]> producer;
-
     private static final String DEFAULT_TEST_TOPIC_SOURCE = "test_topic_source";
+    private static final String DEFAULT_TEST_TOPIC_CONSUME_GROUP = "test-consume-group";
 
     @Override
     @BeforeAll
@@ -63,7 +63,6 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
                 .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
         Startables.deepStart(Stream.of(kafkaContainer)).join();
         log.info("Kafka container started successfully");
-
         Awaitility.given()
                 .ignoreExceptions()
                 .atLeast(100, TimeUnit.MILLISECONDS)
@@ -73,9 +72,9 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
                             log.info("Write 100 records to topic test_topic_source");
                             generateTestData(DEFAULT_TEST_TOPIC_SOURCE,0, 100);
-    }
-
+        executeShell("kafka-topics --create --topic SESSION-RECORD-QUOTA-TEST --bootstrap-server kafkaCluster:9092 --partitions 3 --replication-factor 1");
+    }
 
     @TestTemplate
     public void testKafkaSource(TestContainer container) {
@@ -147,10 +146,37 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
     }
 
     @TestTemplate
-    public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+    public void testKafkaProducerQuota(TestContainer container) throws IOException, InterruptedException {
+        CompletableFuture.supplyAsync( () -> { try {
+            Container.ExecResult execResult = container.executeJob("/kafka_producer_quota.yaml");
+            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+            return execResult;
+        } catch (Exception e) {
+            log.error("Commit task exception :" + e.getMessage());
+            throw new RuntimeException(e);
+        }
+        });
+
+        List<String> data = Lists.newArrayList();
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            data.addAll(getKafkaConsumerListData("SESSION-RECORD-QUOTA-TEST"));
+                            Assertions.assertTrue(data.size()>10); // Check if all records are consumed
+                        });
+
+    }
+
+
+
+    @TestTemplate
+    public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+        CompletableFuture. supplyAsync(
+                () -> {
+                    try {
                         Container.ExecResult execResult = container.executeJob("/kafka_sink_handle_error_json_format.yaml");
                         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
                         return execResult;
@@ -236,14 +262,13 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-
         producer = new KafkaProducer<>(properties);
     }
 
-    private Properties kafkaConsumerConfig() {
+    private Properties kafkaConsumerConfig(String consumeGroup) {
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup);
         props.put(
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                 OffsetResetStrategy.EARLIEST.toString().toLowerCase());
@@ -270,7 +295,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
     private Map<String, String> getKafkaConsumerData(String topicName) {
         Map<String, String> data = new HashMap<>();
-        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig(DEFAULT_TEST_TOPIC_CONSUME_GROUP))) {
             consumer.subscribe(Arrays.asList(topicName));
             Map<TopicPartition, Long> offsets =
                     consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
@@ -292,7 +317,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
     private List<String> getKafkaConsumerListData(String topicName) {
         List<String> data = new ArrayList<>();
-        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig(DEFAULT_TEST_TOPIC_CONSUME_GROUP))) {
             consumer.subscribe(Arrays.asList(topicName));
             Map<TopicPartition, Long> offsets =
                     consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
@@ -312,5 +337,15 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
         return data;
     }
 
+    private void executeShell(String command) {
+        try {
+            Container.ExecResult result = kafkaContainer.execInContainer("/bin/sh", "-c", command);
+            log.info("Execute shell command result: {}", result.getStdout());
+            log.info("Execute shell command error: {}", result.getStderr());
+        } catch (Exception e) {
+            log.error("Execute shell command error: {}", e.getMessage());
+        }
+    }
+
 }
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
new file mode 100644
index 0000000..a1d6d57
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
@@ -0,0 +1,117 @@
+sources: # [object] Define connector source
+  mock_source:
+    type: mock
+    properties:
+      mock.desc.file.path: /tmp/grootstream/config/template/mock_schema/session_record_mock_desc.json
+      rows.per.second: 100
+
+processing_pipelines:
+  etl_processor:
+    type: projection
+    functions:
+      - function: SNOWFLAKE_ID
+        lookup_fields: ['']
+        output_fields: [log_id]
+        parameters:
+          data_center_id_num: 1
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ __timestamp ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+      - function: SNOWFLAKE_ID
+        lookup_fields: [ '' ]
+        output_fields: [ session_id ]
+        parameters:
+          data_center_id_num: 2
+      - function: EVAL
+        output_fields: [ ingestion_time ]
+        parameters:
+          value_expression: recv_time
+
+      - function: DOMAIN
+        lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+        output_fields: [ server_domain ]
+        parameters:
+          option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+      - function: ASN_LOOKUP
+        lookup_fields: [ client_ip ]
+        output_fields: [ client_asn ]
+        parameters:
+          kb_name: tsg_ip_asn
+          option: IP_TO_ASN
+
+      - function: ASN_LOOKUP
+        lookup_fields: [ server_ip ]
+        output_fields: [ server_asn ]
+        parameters:
+          kb_name: tsg_ip_asn
+          option: IP_TO_ASN
+
+      - function: GEOIP_LOOKUP
+        lookup_fields: [ client_ip ]
+        output_fields: []
+        parameters:
+          kb_name: tsg_ip_location
+          option: IP_TO_OBJECT
+          geolocation_field_mapping:
+            COUNTRY: client_country
+            PROVINCE: client_super_administrative_area
+            CITY: client_administrative_area
+
+      - function: GEOIP_LOOKUP
+        lookup_fields: [ server_ip ]
+        output_fields: []
+        parameters:
+          kb_name: tsg_ip_location
+          option: IP_TO_OBJECT
+          geolocation_field_mapping:
+            COUNTRY: server_country
+            PROVINCE: server_super_administrative_area
+            CITY: server_administrative_area
+
+
+      - function: CURRENT_UNIX_TIMESTAMP
+        output_fields: [ processing_time ]
+        parameters:
+          precision: seconds
+
+
+sinks:
+  print_sink:
+    type: print
+    properties:
+      mode: log_info
+      format: json
+
+  kafka_sink:
+    type: kafka
+    properties:
+      topic: SESSION-RECORD-QUOTA-TEST
+      kafka.bootstrap.servers: kafkaCluster:9092
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      format: json
+      json.ignore.parse.errors: false
+      log.failures.only: true
+
+application: # [object] Define job configuration
+  env:
+    name: kafka_producer_quota
+    parallelism: 3
+    shade.identifier: default
+    pipeline:
+      object-reuse: true
+  topology:
+    - name: mock_source
+      downstream: [ etl_processor ]
+    - name: etl_processor
+      downstream: [ kafka_sink ]
+    - name: kafka_sink
+      downstream: []
\ No newline at end of file
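
The test setup above calls `initKafkaProducer()` and `generateTestData(DEFAULT_TEST_TOPIC_SOURCE, 0, 100)`, neither of which is shown in this diff. The following is only a minimal, self-contained sketch of what such a mock-data helper could look like, assuming the byte[]/byte[] producer configuration used in `KafkaConnectorIT` and a trivial JSON payload; the real record layout comes from `session_record_mock_desc.json` and is not reproduced here, and the class name, broker address, and payload shape are illustrative assumptions only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Hypothetical sketch of a generateTestData(...)-style helper; not the repository's actual implementation.
public class MockDataSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumption: a locally reachable broker; the IT uses kafkaContainer.getBootstrapServers() instead.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Mirrors the "Write 100 records to topic test_topic_source" step in the test setup.
            generateTestData(producer, "test_topic_source", 0, 100);
        }
    }

    // Writes records [start, end) with a dummy JSON body to the given topic.
    static void generateTestData(KafkaProducer<byte[], byte[]> producer, String topic, int start, int end) {
        for (int i = start; i < end; i++) {
            String value = "{\"id\":" + i + ",\"message\":\"mock-" + i + "\"}";
            producer.send(new ProducerRecord<>(
                    topic,
                    String.valueOf(i).getBytes(StandardCharsets.UTF_8),
                    value.getBytes(StandardCharsets.UTF_8)));
        }
        producer.flush();
    }
}
```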
