author     doufenghu <[email protected]>    2024-07-24 11:50:48 +0800
committer  doufenghu <[email protected]>    2024-07-24 11:50:48 +0800
commit     bdf64cb08bce82523f6c61d96c8bed36d9d90385 (patch)
tree       85a3556e8168c7c8fa9bfc1790b5148bf44e84f5 /groot-tests
parent     3cff9a87fa0beab38caff2b34d7344b4186e24e1 (diff)
[Feature][e2e-kafka] Add flink-shaded-hadoop-2-uber-2.7.5-8.0.jar to the Flink image; add mock data to the Kafka integration test.
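
The mock data is produced by the test's generateTestData helper before the jobs run. As a minimal sketch of what such a helper can look like, assuming the test's KafkaProducer<byte[], byte[]> field (configured below with ByteArraySerializer); the helper name and JSON payload are illustrative, not this patch's exact code:

    // Illustrative sketch only; needs org.apache.kafka.clients.producer.ProducerRecord
    // and java.nio.charset.StandardCharsets. Produces `count` byte[] records to `topic`
    // via the test's KafkaProducer<byte[], byte[]> field.
    private void produceMockRecords(String topic, int start, int count) {
        for (int i = start; i < start + count; i++) {
            byte[] key = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
            byte[] value = ("{\"id\":" + i + "}").getBytes(StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>(topic, key, value));
        }
        producer.flush(); // make sure all records reach the broker before consumers read
    }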
Diffstat (limited to 'groot-tests')
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java    4
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java    4
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java    19
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java    2
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java    55
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml    117
6 files changed, 180 insertions, 21 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
index 54719bb..0f1e3f7 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
@@ -79,8 +79,8 @@ public abstract class AbstractTestContainer implements TestContainer {
command.add(ContainerUtil.adaptPathForWin(binPath));
command.add("--config");
command.add(ContainerUtil.adaptPathForWin(confInContainerPath));
- /* command.add("--target");
- command.add("remote");*/
+ command.add("--target");
+ command.add("remote");
command.addAll(getExtraStartShellCommands());
return executeCommand(container, command);
}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index a9a730e..30e6eb3 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -7,6 +7,7 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.images.PullPolicy;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;
@@ -54,7 +55,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
.waitingFor(
new LogMessageWaitStrategy()
.withRegEx(".*Starting the resource manager.*")
- .withStartupTimeout(Duration.ofMinutes(2)));
+ .withStartupTimeout(Duration.ofMinutes(2)));
// Copy groot-stream bootstrap and some other files to the container
copyGrootStreamStarterToContainer(jobManager);
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index 58eee29..c22dfe4 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -72,13 +72,6 @@ public final class ContainerUtil {
// copy libs
- String hbaseClientJar = "hbase-client-shaded-" + getProjectVersion() + ".jar";
- Path hbaseClientJarPath =
- Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar);
- container.withCopyFileToContainer(
- MountableFile.forHostPath(hbaseClientJarPath),
- Paths.get(GrootStreamHomeInContainer, "lib", hbaseClientJar).toString());
-
String formatJsonJar = "format-json-" + getProjectVersion() + ".jar";
Path formatJsonJarPath =
@@ -94,6 +87,18 @@ public final class ContainerUtil {
MountableFile.forHostPath(formatProtobufJarPath),
Paths.get(GrootStreamHomeInContainer, "lib", formatProtobufJar).toString());
+ String formatMsgpackJar = "format-msgpack-" + getProjectVersion() + ".jar";
+ Path formatMsgpackJarPath =
+ Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-msgpack", "target", formatMsgpackJar);
+ container.withCopyFileToContainer(MountableFile.forHostPath(formatMsgpackJarPath),
+ Paths.get(GrootStreamHomeInContainer, "lib", formatMsgpackJar).toString());
+
+ String formatRawJar = "format-raw-" + getProjectVersion() + ".jar";
+ Path formatRawJarPath =
+ Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-raw", "target", formatRawJar);
+ container.withCopyFileToContainer(MountableFile.forHostPath(formatRawJarPath),
+ Paths.get(GrootStreamHomeInContainer, "lib", formatRawJar).toString());
+
//copy system config
final String configPath = PROJECT_ROOT_PATH + "/config";
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
index 01d2133..338c696 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
@@ -40,7 +40,7 @@ public class Flink13Container extends AbstractTestFlinkContainer {
@Override
protected String getDockerImage() {
- return "flink:1.13.1-scala_2.11-java11";
+ return "192.168.40.153:8082/common/flink:1.13.1-scala_2.11-java11";
}
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
index d01a514..fbf31f7 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
@@ -51,8 +51,8 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
private static final String KAFKA_HOST = "kafkaCluster";
private KafkaProducer<byte[], byte[]> producer;
-
private static final String DEFAULT_TEST_TOPIC_SOURCE = "test_topic_source";
+ private static final String DEFAULT_TEST_TOPIC_CONSUME_GROUP = "test-consume-group";
@Override
@BeforeAll
@@ -63,7 +63,6 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
Startables.deepStart(Stream.of(kafkaContainer)).join();
log.info("Kafka container started successfully");
-
Awaitility.given()
.ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
@@ -73,9 +72,9 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
log.info("Write 100 records to topic test_topic_source");
generateTestData(DEFAULT_TEST_TOPIC_SOURCE,0, 100);
- }
-
+ executeShell("kafka-topics --create --topic SESSION-RECORD-QUOTA-TEST --bootstrap-server kafkaCluster:9092 --partitions 3 --replication-factor 1");
+ }
@TestTemplate
public void testKafkaSource(TestContainer container) {
@@ -147,10 +146,37 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
}
@TestTemplate
- public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+ public void testKafkaProducerQuota(TestContainer container) throws IOException, InterruptedException {
+
CompletableFuture.supplyAsync(
() -> {
try {
+ Container.ExecResult execResult = container.executeJob("/kafka_producer_quota.yaml");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ return execResult;
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ List<String> data = Lists.newArrayList();
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ data.addAll(getKafkaConsumerListData("SESSION-RECORD-QUOTA-TEST"));
+ Assertions.assertTrue(data.size() > 10); // Expect records from the quota job to reach the topic
+ });
+ }
+
+ @TestTemplate
+ public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
Container.ExecResult execResult = container.executeJob("/kafka_sink_handle_error_json_format.yaml");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
return execResult;
@@ -236,14 +262,13 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-
producer = new KafkaProducer<>(properties);
}
- private Properties kafkaConsumerConfig() {
+ private Properties kafkaConsumerConfig(String consumeGroup) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup);
props.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.toString().toLowerCase());
@@ -270,7 +295,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
private Map<String, String> getKafkaConsumerData(String topicName) {
Map<String, String> data = new HashMap<>();
- try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig(DEFAULT_TEST_TOPIC_CONSUME_GROUP))) {
consumer.subscribe(Arrays.asList(topicName));
Map<TopicPartition, Long> offsets =
consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
@@ -292,7 +317,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
private List<String> getKafkaConsumerListData(String topicName) {
List<String> data = new ArrayList<>();
- try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig(DEFAULT_TEST_TOPIC_CONSUME_GROUP))) {
consumer.subscribe(Arrays.asList(topicName));
Map<TopicPartition, Long> offsets =
consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
@@ -312,5 +337,15 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
return data;
}
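+ // Runs a shell command inside the Kafka container via /bin/sh -c; stdout and
+ // stderr are logged, and any failure is swallowed (non-fatal for test setup).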
+ private void executeShell(String command) {
+ try {
+ Container.ExecResult result = kafkaContainer.execInContainer("/bin/sh", "-c", command);
+ log.info("Execute shell command result: {}", result.getStdout());
+ log.info("Execute shell command error: {}", result.getStderr());
+ } catch (Exception e) {
+ log.error("Execute shell command error: {}", e.getMessage());
+ }
+ }
+
}
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
new file mode 100644
index 0000000..a1d6d57
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
@@ -0,0 +1,117 @@
+sources: # [object] Define connector source
+ mock_source:
+ type: mock
+ properties:
+ mock.desc.file.path: /tmp/grootstream/config/template/mock_schema/session_record_mock_desc.json
+ rows.per.second: 100
+
+processing_pipelines:
+ etl_processor:
+ type: projection
+ functions:
+ - function: SNOWFLAKE_ID
+ lookup_fields: ['']
+ output_fields: [log_id]
+ parameters:
+ data_center_id_num: 1
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ '' ]
+ output_fields: [ session_id ]
+ parameters:
+ data_center_id_num: 2
+ - function: EVAL
+ output_fields: [ ingestion_time ]
+ parameters:
+ value_expression: recv_time
+
+ - function: DOMAIN
+ lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: client_country
+ PROVINCE: client_super_administrative_area
+ CITY: client_administrative_area
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ processing_time ]
+ parameters:
+ precision: seconds
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ mode: log_info
+ format: json
+
+ kafka_sink:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-QUOTA-TEST
+ kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ format: json
+ json.ignore.parse.errors: false
+ log.failures.only: true
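+ # Note (assumed intent): retries=0 together with the large batch.size /
+ # buffer.memory and snappy compression drives high producer throughput,
+ # which is what this quota scenario exercises.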
+
+application: # [object] Define job configuration
+ env:
+ name: kafka_producer_quota
+ parallelism: 3
+ shade.identifier: default
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: mock_source
+ downstream: [ etl_processor ]
+ - name: etl_processor
+ downstream: [ kafka_sink ]
+ - name: kafka_sink
+ downstream: []
\ No newline at end of file
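
For a manual check outside the test (illustrative, not part of this patch), the records landing on the quota topic can be inspected with the console consumer shipped in the confluentinc/cp-kafka image:

    kafka-console-consumer --topic SESSION-RECORD-QUOTA-TEST \
        --bootstrap-server kafkaCluster:9092 --from-beginning --max-messages 10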