author     doufenghu <[email protected]>  2024-04-06 01:11:57 +0800
committer  doufenghu <[email protected]>  2024-04-06 01:11:57 +0800
commit     58460028cf73336292bbf9a6b05fc7612e3f891b (patch)
tree       c80956e9a7e4aafed5e93c4ef7dd67a8f9d3cbbc
parent     95bcb7db323b12d7e7f864dff615d73622d7e688 (diff)
[Feature][Tests] Improve Kafka connector tests with Flink kafka-to-console and inline-to-kafka configs.
-rw-r--r--  groot-tests/pom.xml                                                                                2
-rw-r--r--  groot-tests/test-e2e-kafka/pom.xml                                                                 7
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java  74
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml                      64
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml                         40
5 files changed, 162 insertions, 25 deletions
diff --git a/groot-tests/pom.xml b/groot-tests/pom.xml
index 041cd9d..76f533a 100644
--- a/groot-tests/pom.xml
+++ b/groot-tests/pom.xml
@@ -23,6 +23,7 @@
<maven.deploy.skip>true</maven.deploy.skip>
<maven-jar-plugin.version>2.4</maven-jar-plugin.version>
<rest-assured.version>4.3.1</rest-assured.version>
+ <snappy-java.version>1.1.8.3</snappy-java.version>
</properties>
<dependencies>
@@ -73,6 +74,7 @@
<version>${rest-assured.version}</version>
<scope>test</scope>
</dependency>
+
</dependencies>
<build>
diff --git a/groot-tests/test-e2e-kafka/pom.xml b/groot-tests/test-e2e-kafka/pom.xml
index fbbe0e7..ab1ba72 100644
--- a/groot-tests/test-e2e-kafka/pom.xml
+++ b/groot-tests/test-e2e-kafka/pom.xml
@@ -36,6 +36,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy-java.version}</version>
+ <scope>test</scope>
+ </dependency>
+
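Why the new snappy-java test dependency: kafka_sink_inline_to_kafka.yaml below sets kafka.compression.type: snappy, so a plain kafka-clients consumer verifying the sink output needs the snappy codec on its test classpath to decode those batches. A minimal consumer-side sketch, assuming a locally reachable bootstrap address; the decompression itself is transparent to the caller:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    public class SnappyReadSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "snappy-read-check");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("test_topic"));
                // Decompression happens inside the client; without snappy-java on the
                // classpath this poll would typically fail with a missing org.xerial.snappy class.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }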
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
index 9cd09fc..e3af025 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
@@ -1,5 +1,8 @@
package com.geedgenetworks.test.e2e.kafka;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;
@@ -10,11 +13,13 @@ import com.geedgenetworks.test.common.container.TestContainer;
import com.geedgenetworks.test.common.container.TestContainerId;
import com.geedgenetworks.test.common.junit.DisabledOnContainer;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -23,18 +28,26 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.base.CharMatcher;
+import org.testcontainers.shaded.com.google.common.base.Strings;
+import org.testcontainers.shaded.com.google.common.primitives.Chars;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
+import static org.awaitility.Awaitility.await;
+
@Slf4j
@DisabledOnContainer(
value = {TestContainerId.FLINK_1_17},
@@ -47,11 +60,11 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
private static final String KAFKA_HOST = "kafkaCluster";
private KafkaProducer<byte[], byte[]> producer;
-
+ private static final String DEFAULT_TEST_TOPIC_SOURCE = "test_topic_source";
@Override
@BeforeAll
- public void startUp() throws Exception {
+ public void startUp() {
kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
.withNetwork(NETWORK)
.withNetworkAliases(KAFKA_HOST)
@@ -67,46 +80,57 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
.untilAsserted(this::initKafkaProducer);
log.info("Write 100 records to topic test_topic_source");
+        generateTestData(DEFAULT_TEST_TOPIC_SOURCE, 0, 100);
+ }
- StructType dataType = Types.parseStructType("client_ip: string, server_ip: String");
- JsonSerializer serializer = new JsonSerializer(dataType);
- generateTestData(serializer::serialize, 0, 100);
+ @TestTemplate
+ public void testSourceKafkaJsonToConsole(TestContainer container) {
+ generateTestData("test_topic_json", 0, 10);
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob("/kafka_source_to_console.yaml");
+ } catch (Exception e) {
+                    log.error("Commit task exception: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ String logs = container.getServerLogs();
+                    Assertions.assertEquals(10, StringUtils.countMatches(logs, "PrintSinkFunction"));
+ });
}
- private void generateTestData(ProducerRecordConverter converter, int start, int end) {
-
+ private void generateTestData(String topic, int start, int end) {
+ StructType dataType = Types.parseStructType("id: int, client_ip: string, server_ip: string, flag: string");
+ JsonSerializer serializer = new JsonSerializer(dataType);
for (int i = start; i < end; i++) {
Map<String, Object> row = Map
- .of("client_ip", "192.168.40.12", "server_ip", "8.8.8.8");
+ .of("id", i,
+ "client_ip", "192.168.40.12",
+                "server_ip", "8.8.8.8",
+                "flag", Boolean.FALSE.toString()); // schema declares flag as string
ProducerRecord<byte[], byte[]> record =
- new ProducerRecord<>("TEST-TOPIC-SOURCE", converter.convert(row));
+ new ProducerRecord<>(topic, serializer.serialize(row));
producer.send(record);
}
-
-
}
@TestTemplate
- public void testSourceKafkaToConsole(TestContainer container) throws IOException, InterruptedException {
-
- List<String> data = getKafkaConsumerListData("TEST-TOPIC-SOURCE");
-
- for (String record : data) {
- log.info("Record: {}", record);}
-
- Assertions.assertEquals(100, data.size());
+ public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/kafka_sink_inline_to_kafka.yaml");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ List<String> data = getKafkaConsumerListData("test_topic");
+ Assertions.assertEquals(10, data.size()); // Check if all 10 records are consumed
}
- interface ProducerRecordConverter {
- byte[] convert(Map<String, Object> row);
- }
-
-
-
@AfterAll
@Override
public void tearDown() throws Exception {
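Note on testSinkKafka above: getKafkaConsumerListData(String topic) is defined elsewhere in the test class and is untouched by this commit. For orientation, a minimal sketch of what such a helper typically looks like against the Testcontainers broker; the group id scheme and the 30-second poll budget are assumptions, not the project's actual implementation (all types referenced here are already imported by the test file):

    private List<String> getKafkaConsumerListData(String topic) {
        List<String> data = new ArrayList<>();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "e2e-" + UUID.randomUUID()); // assumed: fresh group per call
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            long deadline = System.currentTimeMillis() + 30_000; // assumed poll budget
            while (System.currentTimeMillis() < deadline) {
                // Drain whatever has arrived; the caller asserts on the final count.
                consumer.poll(Duration.ofMillis(500)).forEach(r -> data.add(r.value()));
            }
        }
        return data;
    }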
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
new file mode 100644
index 0000000..dbfbc1e
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
@@ -0,0 +1,64 @@
+sources: # [object] Define connector source
+ inline_source:
+ type: inline
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
+ properties:
+ #
+ # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
+ #
+ data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+ format: json
+ interval.per.row: 1s
+ repeat.count: 10
+ json.ignore.parse.errors: false
+
+
+sinks:
+ connector_kafka:
+ type: kafka
+ properties:
+ topic: test_topic
+ kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ format: json
+ log.failures.only: true
+
+application: # [object] Define job configuration
+ env:
+ name: example-inline-to-kafka
+ parallelism: 1
+ shade.identifier: default
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [ connector_kafka ]
+ - name: connector_kafka
+      downstream: []
\ No newline at end of file
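How this config feeds testSinkKafka: repeat.count: 10 with interval.per.row: 1s makes the inline source emit the sample event ten times over roughly ten seconds, matching the test's assertEquals(10, data.size()). A sketch of a stricter field-level check using fastjson2, which the test already imports; it assumes data comes from getKafkaConsumerListData("test_topic") and that the sink writes the schema-projected fields as JSON:

    // Sketch: field-level check on one record written by connector_kafka.
    Map<String, Object> event = JSON.parseObject(data.get(0), new TypeReference<Map<String, Object>>() {});
    Assertions.assertEquals("192.11.22.22", event.get("client_ip")); // value from the inline sample event
    Assertions.assertEquals("8.8.8.8", event.get("server_ip"));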
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml
new file mode 100644
index 0000000..2afee47
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml
@@ -0,0 +1,40 @@
+sources:
+ kafka_source:
+    type: kafka
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ properties: # [object] Kafka source properties
+ topic: test_topic_json
+ kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.session.timeout.ms: 60000
+ kafka.max.poll.records: 3000
+ kafka.max.partition.fetch.bytes: 31457280
+ kafka.group.id: test_topic_json_group
+ kafka.auto.offset.reset: earliest
+ format: json
+
+sinks: # [object] Define connector sink
+ print_sink:
+ type: print
+ properties:
+ mode: log_warn
+ format: json
+
+application: # [object] Define job configuration
+ env:
+ name: example-kafka-to-print
+ parallelism: 1
+ pipeline:
+ object-reuse: true
+ execution:
+ restart:
+ strategy: no
+ topology:
+ - name: kafka_source
+ downstream: [print_sink]
+ - name: print_sink
+      downstream: []
\ No newline at end of file
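The records this source expects on test_topic_json are seeded by testSourceKafkaJsonToConsole via generateTestData. A standalone sketch of that seeding step, assuming the Testcontainers bootstrap address, the byte-array serializers the test already configures, and a java.nio.charset.StandardCharsets import:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
        for (int i = 0; i < 10; i++) {
            // Only client_ip and server_ip are listed in the source's schema projection;
            // per the config comment, fields outside the projection are not read.
            String json = "{\"id\":" + i + ",\"client_ip\":\"192.168.40.12\",\"server_ip\":\"8.8.8.8\"}";
            producer.send(new ProducerRecord<>("test_topic_json", json.getBytes(StandardCharsets.UTF_8)));
        }
        producer.flush();
    }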