diff options
| author | doufenghu <[email protected]> | 2024-07-25 18:18:34 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-07-25 18:18:34 +0800 |
| commit | b7dd745af52b972f86816a71d9b363296d484a3a (patch) | |
| tree | f6677e8d0678813026c6e2aa08215b450d93de5f | |
| parent | 56e08d0fb05df64e7745cd52282593a28dce67c6 (diff) | |
[Improve][e2e-kafka] Kafka supports SASL_PLAINTEXT and includes a producer quota test based on the topic.
12 files changed, 101 insertions, 22 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java index c22dfe4..811bdf6 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java @@ -106,6 +106,13 @@ public final class ContainerUtil { container.withCopyFileToContainer(MountableFile.forHostPath(configPath), Paths.get(GrootStreamHomeInContainer, "config").toString()); + // copy grootstream.yaml + final String grootTestsCommonPath = PROJECT_ROOT_PATH + "/groot-tests/test-common/src/test/resources"; + checkPathExist(grootTestsCommonPath); + container.withCopyFileToContainer( + MountableFile.forHostPath(grootTestsCommonPath + "/grootstream.yaml"), + Paths.get(GrootStreamHomeInContainer, "config", "grootstream.yaml").toString()); + // copy bin final String startBinPath = startModulePath + File.separator + "src/main/bin/"; diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml new file mode 100644 index 0000000..5520945 --- /dev/null +++ b/groot-tests/test-common/src/test/resources/grootstream.yaml @@ -0,0 +1,17 @@ +grootstream: + knowledge_base: + - name: tsg_ip_asn + fs_type: local + fs_path: /tmp/grootstream/config/dat/ + files: + - asn_builtin.mmdb + - name: tsg_ip_location + fs_type: local + fs_path: /tmp/grootstream/config/dat/ + files: + - ip_builtin.mmdb + properties: + hos.path: http://192.168.44.12:9098/hos + hos.bucket.name.traffic_file: traffic_file_bucket + hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket + scheduler.knowledge_base.update.interval.minutes: 5 diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java index fbf31f7..56108c5 100644 --- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java +++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java @@ -11,11 +11,14 @@ import com.geedgenetworks.test.common.junit.DisabledOnContainer; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -30,6 +33,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables;; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; import java.io.IOException; import java.time.Duration; @@ -60,6 +64,17 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) .withNetwork(NETWORK) .withNetworkAliases(KAFKA_HOST) + .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true") + .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") + .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT") + .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN") + .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN") + .withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN") + // .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer") + .withEnv("KAFKA_SUPER_USERS", "User:admin") + .withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf") + .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_server_jaas.conf"), "/etc/kafka/kafka_server_jaas.conf") + .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_client_jass_cli.properties"), "/etc/kafka/kafka_client_jass_cli.properties") .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); Startables.deepStart(Stream.of(kafkaContainer)).join(); log.info("Kafka container started successfully"); @@ -72,12 +87,12 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { log.info("Write 100 records to topic test_topic_source"); generateTestData(DEFAULT_TEST_TOPIC_SOURCE,0, 100); - executeShell("kafka-topics --create --topic SESSION-RECORD-QUOTA-TEST --bootstrap-server kafkaCluster:9092 --partitions 3 --replication-factor 1"); + } @TestTemplate - public void testKafkaSource(TestContainer container) { + public void testKafkaAsSourceConsume(TestContainer container) { generateTestData("test_topic_json", 0, 10); CompletableFuture.supplyAsync( () -> { @@ -98,7 +113,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { } @TestTemplate - public void testKafkaSourceErrorSchema(TestContainer container) { + public void testKafkaAsSourceConsumeErrorSchema(TestContainer container) { generateTestData("test_topic_error_json", 0, 10); CompletableFuture.supplyAsync( () -> { @@ -121,7 +136,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { } @TestTemplate - public void testKafkaSink(TestContainer container) throws IOException, InterruptedException { + public void testKafkaAsSink(TestContainer container) throws IOException, InterruptedException { CompletableFuture.supplyAsync( () -> { try { @@ -146,7 +161,11 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { } @TestTemplate - public void testKafkaProducerQuota(TestContainer container) throws IOException, InterruptedException { + public void testKafkaAsSinkProducerQuota(TestContainer container) throws IOException, InterruptedException { + //Create topic with 3 partitions + executeShell("kafka-topics --create --topic SESSION-RECORD-QUOTA-TEST --bootstrap-server kafkaCluster:9092 --partitions 3 --replication-factor 1 --command-config /etc/kafka/kafka_client_jass_cli.properties"); + //Set producer quota to 5KB/s + executeShell("kafka-configs --bootstrap-server kafkaCluster:9092 --alter --add-config 'producer_byte_rate=5120' --entity-type users --entity-name admin --entity-type clients --entity-name SESSION-RECORD-QUOTA-TEST --command-config /etc/kafka/kafka_client_jass_cli.properties "); CompletableFuture.supplyAsync( () -> { @@ -164,8 +183,8 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { await().atMost(300000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { - data.addAll(getKafkaConsumerListData("SESSION-RECORD-QUOTA-TEST")); - Assertions.assertTrue(data.size()>10); // Check if all records are consumed + data.addAll(getKafkaConsumerListData("SESSION-RECORD-QUOTA-TEST")); + Assertions.assertTrue(StringUtils.contains(container.getServerLogs(), "TimeoutException") && data.size()>1000); }); } @@ -173,7 +192,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { @TestTemplate - public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException { + public void testKafkaAsSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException { CompletableFuture. supplyAsync( () -> { try { @@ -262,19 +281,25 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"); producer = new KafkaProducer<>(properties); } private Properties kafkaConsumerConfig(String consumeGroup) { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup); - props.put( + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup); + properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"); + properties.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase()); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - return props; + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return properties; } private Properties kafkaByteConsumerConfig() { @@ -340,12 +365,11 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { private void executeShell(String command) { try { Container.ExecResult result = kafkaContainer.execInContainer("/bin/sh", "-c", command); - log.info("Execute shell command result: {}", result.getStdout()); - log.info("Execute shell command error: {}", result.getStderr()); + log.info("Execute shell command result: {},{}", result.getStdout(), result.getStderr()); + } catch (Exception e) { log.error("Execute shell command error: {}", e.getMessage()); } } - } diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties b/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties new file mode 100644 index 0000000..986cdb9 --- /dev/null +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties @@ -0,0 +1,3 @@ +security.protocol=SASL_PLAINTEXT +sasl.mechanism=PLAIN +sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
\ No newline at end of file diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml index a1d6d57..16d34a5 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml @@ -3,7 +3,7 @@ sources: # [object] Define connector source type: mock properties: mock.desc.file.path: /tmp/grootstream/config/template/mock_schema/session_record_mock_desc.json - rows.per.second: 100 + rows.per.second: 100000 processing_pipelines: etl_processor: @@ -90,12 +90,15 @@ sinks: properties: topic: SESSION-RECORD-QUOTA-TEST kafka.bootstrap.servers: kafkaCluster:9092 - kafka.retries: 0 + kafka.client.id: SESSION-RECORD-QUOTA-TEST kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 kafka.batch.size: 262144 kafka.buffer.memory: 134217728 kafka.max.request.size: 10485760 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.compression.type: snappy format: json json.ignore.parse.errors: false @@ -104,7 +107,7 @@ sinks: application: # [object] Define job configuration env: name: kafka_producer_quota - parallelism: 3 + parallelism: 1 shade.identifier: default pipeline: object-reuse: true diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf b/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..cb4553f --- /dev/null +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf @@ -0,0 +1,8 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin" + user_admin="admin" + user_firewall="admin" + user_olap="admin"; +}; diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml index 8f27a28..e12e76b 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml @@ -40,6 +40,10 @@ sinks: properties: topic: test_sink_topic kafka.bootstrap.servers: kafkaCluster:9092 + kafka.client.id: test_sink_topic + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml index 0fb0d3f..d65157a 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml @@ -40,6 +40,9 @@ sinks: properties: topic: test_handle_error_json_format_topic kafka.bootstrap.servers: kafkaCluster:9092 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml index fdd7817..d9cb80f 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml @@ -40,6 +40,9 @@ sinks: properties: topic: test_skip_error_json_format_topic kafka.bootstrap.servers: kafkaCluster:9092 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml index 0ae7258..3403ab9 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml @@ -13,6 +13,9 @@ sources: kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.group.id: test_topic_json_group kafka.auto.offset.reset: earliest format: json @@ -28,6 +31,7 @@ application: # [object] Define job configuration env: name: example-kafka-to-print parallelism: 1 + shade.identifier: default pipeline: object-reuse: true topology: diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml index 503fa1b..1016560 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml @@ -4,6 +4,9 @@ sources: properties: # [object] Kafka source properties topic: test_topic_error_json kafka.bootstrap.servers: kafkaCluster:9092 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 @@ -36,7 +36,7 @@ <maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version> <flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version> <maven-git-commit-id-plugin.version>4.0.4</maven-git-commit-id-plugin.version> - <testcontainer.version>1.19.6</testcontainer.version> + <testcontainer.version>1.20.0</testcontainer.version> <awaitility.version>4.2.0</awaitility.version> <spotless.version>2.40.0</spotless.version> <slf4j.version>1.7.25</slf4j.version> |
