diff options
| author | 窦凤虎 <[email protected]> | 2024-07-25 10:25:00 +0000 |
|---|---|---|
| committer | 窦凤虎 <[email protected]> | 2024-07-25 10:25:00 +0000 |
| commit | 708137a41c1806b4bc6925fc71b0a2892862d9ea (patch) | |
| tree | bccdcd67aabdd3f670f48309fd0de3e0ff20c897 | |
| parent | 8eaf2c973b21277cb613d5bd5418ef510cb72448 (diff) | |
| parent | b7dd745af52b972f86816a71d9b363296d484a3a (diff) | |
Merge branch 'feature/e2e-kafka-quota' into 'develop'
[Feature][e2e-kafka] Flink image add flink-shaded-hadoop-2-uber-2.7.5-8.0.jar....
See merge request galaxy/platform/groot-stream!82
24 files changed, 296 insertions, 58 deletions
diff --git a/config/template/mock_schema/object_statistics_mock_desc.json b/config/template/mock_schema/object_statistics_mock_desc.json index fb8590e..8542767 100644 --- a/config/template/mock_schema/object_statistics_mock_desc.json +++ b/config/template/mock_schema/object_statistics_mock_desc.json @@ -56,7 +56,7 @@ },{ "name": "bytes", "type": "Eval", - "expression": "in_bytes+ out_bytes" + "expression": "in_bytes+out_bytes" },{ "name": "new_in_sessions", "type": "Number", diff --git a/groot-core/pom.xml b/groot-core/pom.xml index 3243075..18ae33b 100644 --- a/groot-core/pom.xml +++ b/groot-core/pom.xml @@ -56,6 +56,8 @@ </exclusions> <classifier>optional</classifier> </dependency> + + <dependency> <groupId>com.geedgenetworks</groupId> <artifactId>groot-common</artifactId> diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java index be64fd9..2e4adcb 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java @@ -6,19 +6,18 @@ import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta; +import com.geedgenetworks.shaded.org.apache.http.HttpEntity; +import com.geedgenetworks.shaded.org.apache.http.client.methods.CloseableHttpResponse; +import com.geedgenetworks.shaded.org.apache.http.client.methods.HttpGet; +import com.geedgenetworks.shaded.org.apache.http.impl.client.CloseableHttpClient; +import com.geedgenetworks.shaded.org.apache.http.impl.client.HttpClients; +import com.geedgenetworks.shaded.org.apache.http.util.EntityUtils; import lombok.AllArgsConstructor; import lombok.Data; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java index d38097a..6ac292c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java @@ -1,18 +1,14 @@ package com.geedgenetworks.core.udf.knowlegdebase.handler; import com.geedgenetworks.common.config.KnowledgeBaseConfig; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta; import com.geedgenetworks.crypt.AESUtil; +import com.geedgenetworks.shaded.org.apache.http.impl.client.CloseableHttpClient; +import com.geedgenetworks.shaded.org.apache.http.impl.client.HttpClients; import lombok.Data; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.file.Paths; - /** * @author gujinkai * @version 1.0 diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java index 20defe3..cecdf98 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java @@ -2,15 +2,13 @@ package com.geedgenetworks.core.udf.knowlegdebase.handler; import com.alibaba.fastjson2.JSON; import com.geedgenetworks.common.config.KnowledgeBaseConfig; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.shaded.org.apache.http.HttpEntity; +import com.geedgenetworks.shaded.org.apache.http.client.methods.CloseableHttpResponse; +import com.geedgenetworks.shaded.org.apache.http.client.methods.HttpGet; +import com.geedgenetworks.shaded.org.apache.http.impl.client.CloseableHttpClient; +import com.geedgenetworks.shaded.org.apache.http.impl.client.HttpClients; +import com.geedgenetworks.shaded.org.apache.http.util.EntityUtils; import lombok.Data; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java index dd2c710..56e4540 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java @@ -1,6 +1,8 @@ package com.geedgenetworks.core.utils; -import com.geedgenetworks.shaded.org.apache.http.*; +import com.geedgenetworks.shaded.org.apache.http.Header; +import com.geedgenetworks.shaded.org.apache.http.HttpEntity; +import com.geedgenetworks.shaded.org.apache.http.HttpStatus; import com.geedgenetworks.shaded.org.apache.http.client.methods.CloseableHttpResponse; import com.geedgenetworks.shaded.org.apache.http.client.methods.HttpGet; import com.geedgenetworks.shaded.org.apache.http.config.Registry; diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print.yaml index f115643..e59360d 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print.yaml @@ -115,7 +115,7 @@ application: # [object] Define job configuration object-reuse: true topology: - name: mock_source - downstream: [ print_sink ] + downstream: [ etl_processor ] - name: etl_processor downstream: [ print_sink ] - name: print_sink diff --git a/groot-shaded/http-client-shaded/pom.xml b/groot-shaded/http-client-shaded/pom.xml index 088509b..ced08a3 100644 --- a/groot-shaded/http-client-shaded/pom.xml +++ b/groot-shaded/http-client-shaded/pom.xml @@ -10,7 +10,7 @@ </parent> <artifactId>http-client-shaded</artifactId> - <name>Groot : Shaded : httpclient-shaded</name> + <name>Groot : Shaded : Http-client </name> <properties> @@ -42,7 +42,7 @@ <goal>shade</goal> </goals> <configuration> - <finalName>http-client-shaded</finalName> + <finalName>http-client-shaded-${httpclient.version}</finalName> <createSourcesJar>true</createSourcesJar> <shadeSourcesContent>true</shadeSourcesContent> <shadedArtifactAttached>false</shadedArtifactAttached> @@ -85,7 +85,7 @@ <configuration> <artifacts> <artifact> - <file>${basedir}/target/http-client-shaded.jar</file> + <file>${basedir}/target/http-client-shaded-${httpclient.version}.jar</file> <type>jar</type> <classifier>optional</classifier> </artifact> diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java index 54719bb..0f1e3f7 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java @@ -79,8 +79,8 @@ public abstract class AbstractTestContainer implements TestContainer { command.add(ContainerUtil.adaptPathForWin(binPath)); command.add("--config"); command.add(ContainerUtil.adaptPathForWin(confInContainerPath)); - /* command.add("--target"); - command.add("remote");*/ + command.add("--target"); + command.add("remote"); command.addAll(getExtraStartShellCommands()); return executeCommand(container, command); } diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java index a9a730e..30e6eb3 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java @@ -7,6 +7,7 @@ import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.images.PullPolicy; import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerLoggerFactory; @@ -54,7 +55,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer { .waitingFor( new LogMessageWaitStrategy() .withRegEx(".*Starting the resource manager.*") - .withStartupTimeout(Duration.ofMinutes(2))); + .withStartupTimeout(Duration.ofMinutes(2))) + ; // Copy groot-stream bootstrap and some other files to the container copyGrootStreamStarterToContainer(jobManager); diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java index 58eee29..811bdf6 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java @@ -72,13 +72,6 @@ public final class ContainerUtil { // copy libs - String hbaseClientJar = "hbase-client-shaded-" + getProjectVersion() + ".jar"; - Path hbaseClientJarPath = - Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar); - container.withCopyFileToContainer( - MountableFile.forHostPath(hbaseClientJarPath), - Paths.get(GrootStreamHomeInContainer, "lib", hbaseClientJar).toString()); - String formatJsonJar = "format-json-" + getProjectVersion() + ".jar"; Path formatJsonJarPath = @@ -94,6 +87,18 @@ public final class ContainerUtil { MountableFile.forHostPath(formatProtobufJarPath), Paths.get(GrootStreamHomeInContainer, "lib", formatProtobufJar).toString()); + String formatMsgpackJar = "format-msgpack-" + getProjectVersion() + ".jar"; + Path formatMsgpackJarPath = + Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-msgpack", "target", formatMsgpackJar); + container.withCopyFileToContainer( MountableFile.forHostPath(formatMsgpackJarPath), + Paths.get(GrootStreamHomeInContainer, "lib", formatMsgpackJar).toString()); + + String formatRawJar = "format-raw-" + getProjectVersion() + ".jar"; + Path formatRawJarPath = + Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-raw", "target", formatRawJar); + container.withCopyFileToContainer( MountableFile.forHostPath(formatRawJarPath), + Paths.get(GrootStreamHomeInContainer, "lib", formatRawJar).toString()); + //copy system config final String configPath = PROJECT_ROOT_PATH + "/config"; @@ -101,6 +106,13 @@ public final class ContainerUtil { container.withCopyFileToContainer(MountableFile.forHostPath(configPath), Paths.get(GrootStreamHomeInContainer, "config").toString()); + // copy grootstream.yaml + final String grootTestsCommonPath = PROJECT_ROOT_PATH + "/groot-tests/test-common/src/test/resources"; + checkPathExist(grootTestsCommonPath); + container.withCopyFileToContainer( + MountableFile.forHostPath(grootTestsCommonPath + "/grootstream.yaml"), + Paths.get(GrootStreamHomeInContainer, "config", "grootstream.yaml").toString()); + // copy bin final String startBinPath = startModulePath + File.separator + "src/main/bin/"; diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java index 01d2133..338c696 100644 --- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java +++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java @@ -40,7 +40,7 @@ public class Flink13Container extends AbstractTestFlinkContainer { @Override protected String getDockerImage() { - return "flink:1.13.1-scala_2.11-java11"; + return "192.168.40.153:8082/common/flink:1.13.1-scala_2.11-java11"; } diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml new file mode 100644 index 0000000..5520945 --- /dev/null +++ b/groot-tests/test-common/src/test/resources/grootstream.yaml @@ -0,0 +1,17 @@ +grootstream: + knowledge_base: + - name: tsg_ip_asn + fs_type: local + fs_path: /tmp/grootstream/config/dat/ + files: + - asn_builtin.mmdb + - name: tsg_ip_location + fs_type: local + fs_path: /tmp/grootstream/config/dat/ + files: + - ip_builtin.mmdb + properties: + hos.path: http://192.168.44.12:9098/hos + hos.bucket.name.traffic_file: traffic_file_bucket + hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket + scheduler.knowledge_base.update.interval.minutes: 5 diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java index d01a514..56108c5 100644 --- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java +++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java @@ -11,11 +11,14 @@ import com.geedgenetworks.test.common.junit.DisabledOnContainer; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -30,6 +33,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables;; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; import java.io.IOException; import java.time.Duration; @@ -51,8 +55,8 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0"; private static final String KAFKA_HOST = "kafkaCluster"; private KafkaProducer<byte[], byte[]> producer; - private static final String DEFAULT_TEST_TOPIC_SOURCE = "test_topic_source"; + private static final String DEFAULT_TEST_TOPIC_CONSUME_GROUP = "test-consume-group"; @Override @BeforeAll @@ -60,10 +64,20 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) .withNetwork(NETWORK) .withNetworkAliases(KAFKA_HOST) + .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true") + .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") + .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT") + .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN") + .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN") + .withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN") + // .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer") + .withEnv("KAFKA_SUPER_USERS", "User:admin") + .withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf") + .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_server_jaas.conf"), "/etc/kafka/kafka_server_jaas.conf") + .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_client_jass_cli.properties"), "/etc/kafka/kafka_client_jass_cli.properties") .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); Startables.deepStart(Stream.of(kafkaContainer)).join(); log.info("Kafka container started successfully"); - Awaitility.given() .ignoreExceptions() .atLeast(100, TimeUnit.MILLISECONDS) @@ -73,12 +87,12 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { log.info("Write 100 records to topic test_topic_source"); generateTestData(DEFAULT_TEST_TOPIC_SOURCE,0, 100); - } + } @TestTemplate - public void testKafkaSource(TestContainer container) { + public void testKafkaAsSourceConsume(TestContainer container) { generateTestData("test_topic_json", 0, 10); CompletableFuture.supplyAsync( () -> { @@ -99,7 +113,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { } @TestTemplate - public void testKafkaSourceErrorSchema(TestContainer container) { + public void testKafkaAsSourceConsumeErrorSchema(TestContainer container) { generateTestData("test_topic_error_json", 0, 10); CompletableFuture.supplyAsync( () -> { @@ -122,7 +136,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { } @TestTemplate - public void testKafkaSink(TestContainer container) throws IOException, InterruptedException { + public void testKafkaAsSink(TestContainer container) throws IOException, InterruptedException { CompletableFuture.supplyAsync( () -> { try { @@ -147,10 +161,41 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { } @TestTemplate - public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException { + public void testKafkaAsSinkProducerQuota(TestContainer container) throws IOException, InterruptedException { + //Create topic with 3 partitions + executeShell("kafka-topics --create --topic SESSION-RECORD-QUOTA-TEST --bootstrap-server kafkaCluster:9092 --partitions 3 --replication-factor 1 --command-config /etc/kafka/kafka_client_jass_cli.properties"); + //Set producer quota to 5KB/s + executeShell("kafka-configs --bootstrap-server kafkaCluster:9092 --alter --add-config 'producer_byte_rate=5120' --entity-type users --entity-name admin --entity-type clients --entity-name SESSION-RECORD-QUOTA-TEST --command-config /etc/kafka/kafka_client_jass_cli.properties "); + CompletableFuture.supplyAsync( () -> { try { + Container.ExecResult execResult = container.executeJob("/kafka_producer_quota.yaml"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + return execResult; + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + + List<String> data = Lists.newArrayList(); + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + data.addAll(getKafkaConsumerListData("SESSION-RECORD-QUOTA-TEST")); + Assertions.assertTrue(StringUtils.contains(container.getServerLogs(), "TimeoutException") && data.size()>1000); + }); + + } + + + + @TestTemplate + public void testKafkaAsSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException { + CompletableFuture. supplyAsync( + () -> { + try { Container.ExecResult execResult = container.executeJob("/kafka_sink_handle_error_json_format.yaml"); Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); return execResult; @@ -236,20 +281,25 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - + properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"); producer = new KafkaProducer<>(properties); } - private Properties kafkaConsumerConfig() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group"); - props.put( + private Properties kafkaConsumerConfig(String consumeGroup) { + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup); + properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"); + properties.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase()); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - return props; + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return properties; } private Properties kafkaByteConsumerConfig() { @@ -270,7 +320,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { private Map<String, String> getKafkaConsumerData(String topicName) { Map<String, String> data = new HashMap<>(); - try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { + try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig(DEFAULT_TEST_TOPIC_CONSUME_GROUP))) { consumer.subscribe(Arrays.asList(topicName)); Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); @@ -292,7 +342,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { private List<String> getKafkaConsumerListData(String topicName) { List<String> data = new ArrayList<>(); - try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { + try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig(DEFAULT_TEST_TOPIC_CONSUME_GROUP))) { consumer.subscribe(Arrays.asList(topicName)); Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); @@ -312,5 +362,14 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource { return data; } + private void executeShell(String command) { + try { + Container.ExecResult result = kafkaContainer.execInContainer("/bin/sh", "-c", command); + log.info("Execute shell command result: {},{}", result.getStdout(), result.getStderr()); + + } catch (Exception e) { + log.error("Execute shell command error: {}", e.getMessage()); + } + } } diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties b/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties new file mode 100644 index 0000000..986cdb9 --- /dev/null +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties @@ -0,0 +1,3 @@ +security.protocol=SASL_PLAINTEXT +sasl.mechanism=PLAIN +sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
\ No newline at end of file diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml new file mode 100644 index 0000000..16d34a5 --- /dev/null +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml @@ -0,0 +1,120 @@ +sources: # [object] Define connector source + mock_source: + type: mock + properties: + mock.desc.file.path: /tmp/grootstream/config/template/mock_schema/session_record_mock_desc.json + rows.per.second: 100000 + +processing_pipelines: + etl_processor: + type: projection + functions: + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ session_id ] + parameters: + data_center_id_num: 2 + - function: EVAL + output_fields: [ ingestion_time ] + parameters: + value_expression: recv_time + + - function: DOMAIN + lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ] + output_fields: [ server_domain ] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: ASN_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_asn ] + parameters: + kb_name: tsg_ip_asn + option: IP_TO_ASN + + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + kb_name: tsg_ip_asn + option: IP_TO_ASN + + - function: GEOIP_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: client_country + PROVINCE: client_super_administrative_area + CITY: client_administrative_area + + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country + PROVINCE: server_super_administrative_area + CITY: server_administrative_area + + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + + +sinks: + print_sink: + type: print + properties: + mode: log_info + format: json + + kafka_sink: + type: kafka + properties: + topic: SESSION-RECORD-QUOTA-TEST + kafka.bootstrap.servers: kafkaCluster:9092 + kafka.client.id: SESSION-RECORD-QUOTA-TEST + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; + kafka.compression.type: snappy + format: json + json.ignore.parse.errors: false + log.failures.only: true + +application: # [object] Define job configuration + env: + name: kafka_producer_quota + parallelism: 1 + shade.identifier: default + pipeline: + object-reuse: true + topology: + - name: mock_source + downstream: [ etl_processor ] + - name: etl_processor + downstream: [ kafka_sink ] + - name: kafka_sink + downstream: []
\ No newline at end of file diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf b/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..cb4553f --- /dev/null +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf @@ -0,0 +1,8 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin" + user_admin="admin" + user_firewall="admin" + user_olap="admin"; +}; diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml index 8f27a28..e12e76b 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml @@ -40,6 +40,10 @@ sinks: properties: topic: test_sink_topic kafka.bootstrap.servers: kafkaCluster:9092 + kafka.client.id: test_sink_topic + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml index 0fb0d3f..d65157a 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml @@ -40,6 +40,9 @@ sinks: properties: topic: test_handle_error_json_format_topic kafka.bootstrap.servers: kafkaCluster:9092 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml index fdd7817..d9cb80f 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml @@ -40,6 +40,9 @@ sinks: properties: topic: test_skip_error_json_format_topic kafka.bootstrap.servers: kafkaCluster:9092 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml index 0ae7258..3403ab9 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml @@ -13,6 +13,9 @@ sources: kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.group.id: test_topic_json_group kafka.auto.offset.reset: earliest format: json @@ -28,6 +31,7 @@ application: # [object] Define job configuration env: name: example-kafka-to-print parallelism: 1 + shade.identifier: default pipeline: object-reuse: true topology: diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml index 503fa1b..1016560 100644 --- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml +++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml @@ -4,6 +4,9 @@ sources: properties: # [object] Kafka source properties topic: test_topic_error_json kafka.bootstrap.servers: kafkaCluster:9092 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 diff --git a/plugin-mapping.properties b/plugin-mapping.properties index c835941..4e2eb81 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -1,5 +1,7 @@ #Connectors -grootstream.source.Kafka = connector-kafka +grootstream.source.kafka = connector-kafka grootstream.sink.kafka = connector-kafka grootstream.source.clickhouse= connector-clickhouse grootstream.source.ipfix = connector-ipfix-collector +grootstream.source.mock = connector-mock +grootstream.source.file = connector-file
\ No newline at end of file @@ -36,7 +36,7 @@ <maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version> <flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version> <maven-git-commit-id-plugin.version>4.0.4</maven-git-commit-id-plugin.version> - <testcontainer.version>1.19.6</testcontainer.version> + <testcontainer.version>1.20.0</testcontainer.version> <awaitility.version>4.2.0</awaitility.version> <spotless.version>2.40.0</spotless.version> <slf4j.version>1.7.25</slf4j.version> @@ -748,6 +748,7 @@ <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> + <exclude>module-info.class</exclude> </excludes> </filter> </filters> |
