author     doufenghu <[email protected]>  2024-07-25 18:18:34 +0800
committer  doufenghu <[email protected]>  2024-07-25 18:18:34 +0800
commit     b7dd745af52b972f86816a71d9b363296d484a3a (patch)
tree       f6677e8d0678813026c6e2aa08215b450d93de5f
parent     56e08d0fb05df64e7745cd52282593a28dce67c6 (diff)
[Improve][e2e-kafka] Support SASL_PLAINTEXT in the Kafka e2e tests and add a producer quota test keyed on the topic name.
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java  7
-rw-r--r--  groot-tests/test-common/src/test/resources/grootstream.yaml  17
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java  60
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties  3
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml  9
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf  8
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml  4
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml  3
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml  3
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml  4
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml  3
-rw-r--r--  pom.xml  2
12 files changed, 101 insertions, 22 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index c22dfe4..811bdf6 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -106,6 +106,13 @@ public final class ContainerUtil {
container.withCopyFileToContainer(MountableFile.forHostPath(configPath),
Paths.get(GrootStreamHomeInContainer, "config").toString());
+ // copy grootstream.yaml
+ final String grootTestsCommonPath = PROJECT_ROOT_PATH + "/groot-tests/test-common/src/test/resources";
+ checkPathExist(grootTestsCommonPath);
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(grootTestsCommonPath + "/grootstream.yaml"),
+ Paths.get(GrootStreamHomeInContainer, "config", "grootstream.yaml").toString());
+
// copy bin
final String startBinPath = startModulePath + File.separator + "src/main/bin/";
diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml
new file mode 100644
index 0000000..5520945
--- /dev/null
+++ b/groot-tests/test-common/src/test/resources/grootstream.yaml
@@ -0,0 +1,17 @@
+grootstream:
+ knowledge_base:
+ - name: tsg_ip_asn
+ fs_type: local
+ fs_path: /tmp/grootstream/config/dat/
+ files:
+ - asn_builtin.mmdb
+ - name: tsg_ip_location
+ fs_type: local
+ fs_path: /tmp/grootstream/config/dat/
+ files:
+ - ip_builtin.mmdb
+ properties:
+ hos.path: http://192.168.44.12:9098/hos
+ hos.bucket.name.traffic_file: traffic_file_bucket
+ hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
+ scheduler.knowledge_base.update.interval.minutes: 5
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
index fbf31f7..56108c5 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
@@ -11,11 +11,14 @@ import com.geedgenetworks.test.common.junit.DisabledOnContainer;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -30,6 +33,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
import java.io.IOException;
import java.time.Duration;
@@ -60,6 +64,17 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
.withNetwork(NETWORK)
.withNetworkAliases(KAFKA_HOST)
+ .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
+ .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+ .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT")
+ .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
+ .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN")
+ .withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN")
+ // .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer")
+ .withEnv("KAFKA_SUPER_USERS", "User:admin")
+ .withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf")
+ .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_server_jaas.conf"), "/etc/kafka/kafka_server_jaas.conf")
+ .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_client_jass_cli.properties"), "/etc/kafka/kafka_client_jass_cli.properties")
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
Startables.deepStart(Stream.of(kafkaContainer)).join();
log.info("Kafka container started successfully");
@@ -72,12 +87,12 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
log.info("Write 100 records to topic test_topic_source");
generateTestData(DEFAULT_TEST_TOPIC_SOURCE,0, 100);
- executeShell("kafka-topics --create --topic SESSION-RECORD-QUOTA-TEST --bootstrap-server kafkaCluster:9092 --partitions 3 --replication-factor 1");
+
}
@TestTemplate
- public void testKafkaSource(TestContainer container) {
+ public void testKafkaAsSourceConsume(TestContainer container) {
generateTestData("test_topic_json", 0, 10);
CompletableFuture.supplyAsync(
() -> {
@@ -98,7 +113,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
}
@TestTemplate
- public void testKafkaSourceErrorSchema(TestContainer container) {
+ public void testKafkaAsSourceConsumeErrorSchema(TestContainer container) {
generateTestData("test_topic_error_json", 0, 10);
CompletableFuture.supplyAsync(
() -> {
@@ -121,7 +136,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
}
@TestTemplate
- public void testKafkaSink(TestContainer container) throws IOException, InterruptedException {
+ public void testKafkaAsSink(TestContainer container) throws IOException, InterruptedException {
CompletableFuture.supplyAsync(
() -> {
try {
@@ -146,7 +161,11 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
}
@TestTemplate
- public void testKafkaProducerQuota(TestContainer container) throws IOException, InterruptedException {
+ public void testKafkaAsSinkProducerQuota(TestContainer container) throws IOException, InterruptedException {
+ //Create topic with 3 partitions
+ executeShell("kafka-topics --create --topic SESSION-RECORD-QUOTA-TEST --bootstrap-server kafkaCluster:9092 --partitions 3 --replication-factor 1 --command-config /etc/kafka/kafka_client_jass_cli.properties");
+ //Set producer quota to 5KB/s
+ executeShell("kafka-configs --bootstrap-server kafkaCluster:9092 --alter --add-config 'producer_byte_rate=5120' --entity-type users --entity-name admin --entity-type clients --entity-name SESSION-RECORD-QUOTA-TEST --command-config /etc/kafka/kafka_client_jass_cli.properties ");
CompletableFuture.supplyAsync(
() -> {
@@ -164,8 +183,8 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
await().atMost(300000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
- data.addAll(getKafkaConsumerListData("SESSION-RECORD-QUOTA-TEST"));
- Assertions.assertTrue(data.size()>10); // Check if all records are consumed
+ data.addAll(getKafkaConsumerListData("SESSION-RECORD-QUOTA-TEST"));
+ Assertions.assertTrue(StringUtils.contains(container.getServerLogs(), "TimeoutException") && data.size()>1000);
});
}
@@ -173,7 +192,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
@TestTemplate
- public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+ public void testKafkaAsSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
CompletableFuture. supplyAsync(
() -> {
try {
@@ -262,19 +281,25 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
producer = new KafkaProducer<>(properties);
}
private Properties kafkaConsumerConfig(String consumeGroup) {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup);
- props.put(
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup);
+ properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
+ properties.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.toString().toLowerCase());
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return props;
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return properties;
}
private Properties kafkaByteConsumerConfig() {
@@ -340,12 +365,11 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
private void executeShell(String command) {
try {
Container.ExecResult result = kafkaContainer.execInContainer("/bin/sh", "-c", command);
- log.info("Execute shell command result: {}", result.getStdout());
- log.info("Execute shell command error: {}", result.getStderr());
+ log.info("Execute shell command result: {},{}", result.getStdout(), result.getStderr());
+
} catch (Exception e) {
log.error("Execute shell command error: {}", e.getMessage());
}
}
-
}
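Note on the client changes above: the producer and consumer Properties now repeat the same three SASL settings, and AdminClientConfig.SECURITY_PROTOCOL_CONFIG (used in the diff) resolves to the same "security.protocol" key as CommonClientConfigs.SECURITY_PROTOCOL_CONFIG. Below is a minimal sketch of a shared helper that could fold those settings into one place; the class and method names are hypothetical and not part of this commit.

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.common.config.SaslConfigs;

    // Hypothetical helper (not part of this commit): centralizes the SASL_PLAINTEXT
    // settings that the producer and consumer configurations above declare inline.
    final class SaslPlainTestConfig {
        private SaslPlainTestConfig() {}

        static Properties withSaslPlain(Properties properties, String user, String password) {
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            properties.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule required"
                            + " username=\"" + user + "\" password=\"" + password + "\";");
            return properties;
        }
    }

Callers would then wrap their existing Properties, e.g. new KafkaProducer<>(SaslPlainTestConfig.withSaslPlain(properties, "admin", "admin")).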
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties b/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties
new file mode 100644
index 0000000..986cdb9
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties
@@ -0,0 +1,3 @@
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=PLAIN
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
index a1d6d57..16d34a5 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
@@ -3,7 +3,7 @@ sources: # [object] Define connector source
type: mock
properties:
mock.desc.file.path: /tmp/grootstream/config/template/mock_schema/session_record_mock_desc.json
- rows.per.second: 100
+ rows.per.second: 100000
processing_pipelines:
etl_processor:
@@ -90,12 +90,15 @@ sinks:
properties:
topic: SESSION-RECORD-QUOTA-TEST
kafka.bootstrap.servers: kafkaCluster:9092
- kafka.retries: 0
+ kafka.client.id: SESSION-RECORD-QUOTA-TEST
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.compression.type: snappy
format: json
json.ignore.parse.errors: false
@@ -104,7 +107,7 @@ sinks:
application: # [object] Define job configuration
env:
name: kafka_producer_quota
- parallelism: 3
+ parallelism: 1
shade.identifier: default
pipeline:
object-reuse: true
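Note on the quota setup: the test applies producer_byte_rate=5120 by shelling kafka-configs inside the container for user "admin" and client id "SESSION-RECORD-QUOTA-TEST", which matches the kafka.client.id set in this YAML. For reference only, the sketch below shows the same 5 KB/s quota applied through the Kafka admin API; it assumes kafka-clients 2.6+ (which provides alterClientQuotas) and is not what the commit does.

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.quota.ClientQuotaAlteration;
    import org.apache.kafka.common.quota.ClientQuotaEntity;

    final class ProducerQuotaExample {
        // Illustrative sketch: set a 5 KB/s producer quota for user "admin" and client id
        // "SESSION-RECORD-QUOTA-TEST", mirroring the kafka-configs call used by the test.
        // Pass whichever bootstrap address is reachable from the caller
        // (kafkaCluster:9092 inside the test network).
        static void setProducerQuota(String bootstrapServers) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule required"
                            + " username=\"admin\" password=\"admin\";");
            try (Admin admin = Admin.create(props)) {
                ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(
                        ClientQuotaEntity.USER, "admin",
                        ClientQuotaEntity.CLIENT_ID, "SESSION-RECORD-QUOTA-TEST"));
                ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity,
                        List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 5120.0)));
                admin.alterClientQuotas(List.of(alteration)).all().get();
            }
        }
    }

The shell-based approach keeps everything inside the container and reuses the same --command-config as the topic creation; the admin-API form above is just the programmatic equivalent.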
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf b/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..cb4553f
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf
@@ -0,0 +1,8 @@
+KafkaServer {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin"
+ user_admin="admin"
+ user_firewall="admin"
+ user_olap="admin";
+};
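Note on the JAAS entry above: username/password are the broker's own credentials for inter-broker authentication, while each user_<name>="<password>" line registers a client principal, so "firewall" and "olap" can authenticate in addition to "admin" (all with password "admin" here). A minimal, hypothetical client sketch for the "olap" principal follows; the tests themselves always connect as "admin".

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringDeserializer;

    final class OlapConsumerExample {
        // Hypothetical consumer authenticating as the "olap" principal defined in the
        // JAAS file above; everything except the login string matches the admin clients.
        static KafkaConsumer<String, String> create(String bootstrapServers) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "olap_probe_group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule required"
                            + " username=\"olap\" password=\"admin\";");
            return new KafkaConsumer<>(props);
        }
    }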
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
index 8f27a28..e12e76b 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
@@ -40,6 +40,10 @@ sinks:
properties:
topic: test_sink_topic
kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.client.id: test_sink_topic
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
index 0fb0d3f..d65157a 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
@@ -40,6 +40,9 @@ sinks:
properties:
topic: test_handle_error_json_format_topic
kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
index fdd7817..d9cb80f 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
@@ -40,6 +40,9 @@ sinks:
properties:
topic: test_skip_error_json_format_topic
kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
index 0ae7258..3403ab9 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
@@ -13,6 +13,9 @@ sources:
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.group.id: test_topic_json_group
kafka.auto.offset.reset: earliest
format: json
@@ -28,6 +31,7 @@ application: # [object] Define job configuration
env:
name: example-kafka-to-print
parallelism: 1
+ shade.identifier: default
pipeline:
object-reuse: true
topology:
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
index 503fa1b..1016560 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
@@ -4,6 +4,9 @@ sources:
properties: # [object] Kafka source properties
topic: test_topic_error_json
kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
diff --git a/pom.xml b/pom.xml
index 6981738..5e3e128 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,7 @@
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version>
<maven-git-commit-id-plugin.version>4.0.4</maven-git-commit-id-plugin.version>
- <testcontainer.version>1.19.6</testcontainer.version>
+ <testcontainer.version>1.20.0</testcontainer.version>
<awaitility.version>4.2.0</awaitility.version>
<spotless.version>2.40.0</spotless.version>
<slf4j.version>1.7.25</slf4j.version>