author    窦凤虎 <[email protected]>    2024-07-25 10:25:00 +0000
committer 窦凤虎 <[email protected]>    2024-07-25 10:25:00 +0000
commit    708137a41c1806b4bc6925fc71b0a2892862d9ea (patch)
tree      bccdcd67aabdd3f670f48309fd0de3e0ff20c897
parent    8eaf2c973b21277cb613d5bd5418ef510cb72448 (diff)
parent    b7dd745af52b972f86816a71d9b363296d484a3a (diff)
Merge branch 'feature/e2e-kafka-quota' into 'develop'
[Feature][e2e-kafka] Add flink-shaded-hadoop-2-uber-2.7.5-8.0.jar to the Flink image.... See merge request galaxy/platform/groot-stream!82
-rw-r--r--  config/template/mock_schema/object_statistics_mock_desc.json                                                            |   2
-rw-r--r--  groot-core/pom.xml                                                                                                       |   2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java     |  13
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java       |   8
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java                 |  14
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java                                           |   4
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print.yaml                          |   2
-rw-r--r--  groot-shaded/http-client-shaded/pom.xml                                                                                  |   6
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java                |   4
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java           |   4
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java                        |  26
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java                     |   2
-rw-r--r--  groot-tests/test-common/src/test/resources/grootstream.yaml                                                              |  17
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java                          |  95
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties                                           |   3
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml                                                  | 120
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf                                                     |   8
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml                                                            |   4
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml                                   |   3
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml                                     |   3
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml                                                          |   4
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml                                             |   3
-rw-r--r--  plugin-mapping.properties                                                                                                |   4
-rw-r--r--  pom.xml                                                                                                                  |   3
24 files changed, 296 insertions, 58 deletions
diff --git a/config/template/mock_schema/object_statistics_mock_desc.json b/config/template/mock_schema/object_statistics_mock_desc.json
index fb8590e..8542767 100644
--- a/config/template/mock_schema/object_statistics_mock_desc.json
+++ b/config/template/mock_schema/object_statistics_mock_desc.json
@@ -56,7 +56,7 @@
},{
"name": "bytes",
"type": "Eval",
- "expression": "in_bytes+ out_bytes"
+ "expression": "in_bytes+out_bytes"
},{
"name": "new_in_sessions",
"type": "Number",
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index 3243075..18ae33b 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -56,6 +56,8 @@
</exclusions>
<classifier>optional</classifier>
</dependency>
+
+
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>groot-common</artifactId>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
index be64fd9..2e4adcb 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
@@ -6,19 +6,18 @@ import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta;
+import com.geedgenetworks.shaded.org.apache.http.HttpEntity;
+import com.geedgenetworks.shaded.org.apache.http.client.methods.CloseableHttpResponse;
+import com.geedgenetworks.shaded.org.apache.http.client.methods.HttpGet;
+import com.geedgenetworks.shaded.org.apache.http.impl.client.CloseableHttpClient;
+import com.geedgenetworks.shaded.org.apache.http.impl.client.HttpClients;
+import com.geedgenetworks.shaded.org.apache.http.util.EntityUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
index d38097a..6ac292c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
@@ -1,18 +1,14 @@
package com.geedgenetworks.core.udf.knowlegdebase.handler;
import com.geedgenetworks.common.config.KnowledgeBaseConfig;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta;
import com.geedgenetworks.crypt.AESUtil;
+import com.geedgenetworks.shaded.org.apache.http.impl.client.CloseableHttpClient;
+import com.geedgenetworks.shaded.org.apache.http.impl.client.HttpClients;
import lombok.Data;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.file.Paths;
-
/**
* @author gujinkai
* @version 1.0
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
index 20defe3..cecdf98 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
@@ -2,15 +2,13 @@ package com.geedgenetworks.core.udf.knowlegdebase.handler;
import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.config.KnowledgeBaseConfig;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.shaded.org.apache.http.HttpEntity;
+import com.geedgenetworks.shaded.org.apache.http.client.methods.CloseableHttpResponse;
+import com.geedgenetworks.shaded.org.apache.http.client.methods.HttpGet;
+import com.geedgenetworks.shaded.org.apache.http.impl.client.CloseableHttpClient;
+import com.geedgenetworks.shaded.org.apache.http.impl.client.HttpClients;
+import com.geedgenetworks.shaded.org.apache.http.util.EntityUtils;
import lombok.Data;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java
index dd2c710..56e4540 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/HttpClientPoolUtil.java
@@ -1,6 +1,8 @@
package com.geedgenetworks.core.utils;
-import com.geedgenetworks.shaded.org.apache.http.*;
+import com.geedgenetworks.shaded.org.apache.http.Header;
+import com.geedgenetworks.shaded.org.apache.http.HttpEntity;
+import com.geedgenetworks.shaded.org.apache.http.HttpStatus;
import com.geedgenetworks.shaded.org.apache.http.client.methods.CloseableHttpResponse;
import com.geedgenetworks.shaded.org.apache.http.client.methods.HttpGet;
import com.geedgenetworks.shaded.org.apache.http.config.Registry;
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print.yaml
index f115643..e59360d 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/session_record_mock_to_print.yaml
@@ -115,7 +115,7 @@ application: # [object] Define job configuration
object-reuse: true
topology:
- name: mock_source
- downstream: [ print_sink ]
+ downstream: [ etl_processor ]
- name: etl_processor
downstream: [ print_sink ]
- name: print_sink
diff --git a/groot-shaded/http-client-shaded/pom.xml b/groot-shaded/http-client-shaded/pom.xml
index 088509b..ced08a3 100644
--- a/groot-shaded/http-client-shaded/pom.xml
+++ b/groot-shaded/http-client-shaded/pom.xml
@@ -10,7 +10,7 @@
</parent>
<artifactId>http-client-shaded</artifactId>
- <name>Groot : Shaded : httpclient-shaded</name>
+ <name>Groot : Shaded : Http-client </name>
<properties>
@@ -42,7 +42,7 @@
<goal>shade</goal>
</goals>
<configuration>
- <finalName>http-client-shaded</finalName>
+ <finalName>http-client-shaded-${httpclient.version}</finalName>
<createSourcesJar>true</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<shadedArtifactAttached>false</shadedArtifactAttached>
@@ -85,7 +85,7 @@
<configuration>
<artifacts>
<artifact>
- <file>${basedir}/target/http-client-shaded.jar</file>
+ <file>${basedir}/target/http-client-shaded-${httpclient.version}.jar</file>
<type>jar</type>
<classifier>optional</classifier>
</artifact>
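
The Java files earlier in this diff switch their imports from org.apache.http.* to com.geedgenetworks.shaded.org.apache.http.*, which is exactly what this shaded artifact provides. A minimal sketch of the maven-shade-plugin relocation stanza such a package rename relies on (the actual <relocations> block is outside the hunks shown here, so treat this as an assumption):

    <relocation>
        <!-- rewrite httpclient packages into the shaded namespace used by the imports above -->
        <pattern>org.apache.http</pattern>
        <shadedPattern>com.geedgenetworks.shaded.org.apache.http</shadedPattern>
    </relocation>

With a relocation like this in place, consumers depend only on the shaded jar and the original httpclient packages never leak onto the application classpath.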
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
index 54719bb..0f1e3f7 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestContainer.java
@@ -79,8 +79,8 @@ public abstract class AbstractTestContainer implements TestContainer {
command.add(ContainerUtil.adaptPathForWin(binPath));
command.add("--config");
command.add(ContainerUtil.adaptPathForWin(confInContainerPath));
- /* command.add("--target");
- command.add("remote");*/
+ command.add("--target");
+ command.add("remote");
command.addAll(getExtraStartShellCommands());
return executeCommand(container, command);
}
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index a9a730e..30e6eb3 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -7,6 +7,7 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.images.PullPolicy;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;
@@ -54,7 +55,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
.waitingFor(
new LogMessageWaitStrategy()
.withRegEx(".*Starting the resource manager.*")
- .withStartupTimeout(Duration.ofMinutes(2)));
+ .withStartupTimeout(Duration.ofMinutes(2)))
+ ;
// Copy groot-stream bootstrap and some other files to the container
copyGrootStreamStarterToContainer(jobManager);
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
index 58eee29..811bdf6 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/ContainerUtil.java
@@ -72,13 +72,6 @@ public final class ContainerUtil {
// copy libs
- String hbaseClientJar = "hbase-client-shaded-" + getProjectVersion() + ".jar";
- Path hbaseClientJarPath =
- Paths.get(PROJECT_ROOT_PATH, "groot-shaded/hbase-client-shaded", "target", hbaseClientJar);
- container.withCopyFileToContainer(
- MountableFile.forHostPath(hbaseClientJarPath),
- Paths.get(GrootStreamHomeInContainer, "lib", hbaseClientJar).toString());
-
String formatJsonJar = "format-json-" + getProjectVersion() + ".jar";
Path formatJsonJarPath =
@@ -94,6 +87,18 @@ public final class ContainerUtil {
MountableFile.forHostPath(formatProtobufJarPath),
Paths.get(GrootStreamHomeInContainer, "lib", formatProtobufJar).toString());
+ String formatMsgpackJar = "format-msgpack-" + getProjectVersion() + ".jar";
+ Path formatMsgpackJarPath =
+ Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-msgpack", "target", formatMsgpackJar);
+ container.withCopyFileToContainer( MountableFile.forHostPath(formatMsgpackJarPath),
+ Paths.get(GrootStreamHomeInContainer, "lib", formatMsgpackJar).toString());
+
+ String formatRawJar = "format-raw-" + getProjectVersion() + ".jar";
+ Path formatRawJarPath =
+ Paths.get(PROJECT_ROOT_PATH, "groot-formats/format-raw", "target", formatRawJar);
+ container.withCopyFileToContainer( MountableFile.forHostPath(formatRawJarPath),
+ Paths.get(GrootStreamHomeInContainer, "lib", formatRawJar).toString());
+
//copy system config
final String configPath = PROJECT_ROOT_PATH + "/config";
@@ -101,6 +106,13 @@ public final class ContainerUtil {
container.withCopyFileToContainer(MountableFile.forHostPath(configPath),
Paths.get(GrootStreamHomeInContainer, "config").toString());
+ // copy grootstream.yaml
+ final String grootTestsCommonPath = PROJECT_ROOT_PATH + "/groot-tests/test-common/src/test/resources";
+ checkPathExist(grootTestsCommonPath);
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(grootTestsCommonPath + "/grootstream.yaml"),
+ Paths.get(GrootStreamHomeInContainer, "config", "grootstream.yaml").toString());
+
// copy bin
final String startBinPath = startModulePath + File.separator + "src/main/bin/";
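
The jar-copy additions above repeat the same four-line pattern per module. A hedged refactoring sketch reusing the constants already visible in ContainerUtil (the helper copyLibJar itself is hypothetical, not part of this change):

    private static void copyLibJar(GenericContainer<?> container, String modulePath, String artifactId) {
        // e.g. modulePath = "groot-formats/format-msgpack", artifactId = "format-msgpack"
        String jar = artifactId + "-" + getProjectVersion() + ".jar";
        Path hostJar = Paths.get(PROJECT_ROOT_PATH, modulePath, "target", jar);
        container.withCopyFileToContainer(
                MountableFile.forHostPath(hostJar),
                Paths.get(GrootStreamHomeInContainer, "lib", jar).toString());
    }

    // usage, mirroring the two blocks added in this hunk:
    copyLibJar(container, "groot-formats/format-msgpack", "format-msgpack");
    copyLibJar(container, "groot-formats/format-raw", "format-raw");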
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
index 01d2133..338c696 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/Flink13Container.java
@@ -40,7 +40,7 @@ public class Flink13Container extends AbstractTestFlinkContainer {
@Override
protected String getDockerImage() {
- return "flink:1.13.1-scala_2.11-java11";
+ return "192.168.40.153:8082/common/flink:1.13.1-scala_2.11-java11";
}
diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml
new file mode 100644
index 0000000..5520945
--- /dev/null
+++ b/groot-tests/test-common/src/test/resources/grootstream.yaml
@@ -0,0 +1,17 @@
+grootstream:
+ knowledge_base:
+ - name: tsg_ip_asn
+ fs_type: local
+ fs_path: /tmp/grootstream/config/dat/
+ files:
+ - asn_builtin.mmdb
+ - name: tsg_ip_location
+ fs_type: local
+ fs_path: /tmp/grootstream/config/dat/
+ files:
+ - ip_builtin.mmdb
+ properties:
+ hos.path: http://192.168.44.12:9098/hos
+ hos.bucket.name.traffic_file: traffic_file_bucket
+ hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
+ scheduler.knowledge_base.update.interval.minutes: 5
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
index d01a514..56108c5 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorIT.java
@@ -11,11 +11,14 @@ import com.geedgenetworks.test.common.junit.DisabledOnContainer;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -30,6 +33,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
import java.io.IOException;
import java.time.Duration;
@@ -51,8 +55,8 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
private static final String KAFKA_HOST = "kafkaCluster";
private KafkaProducer<byte[], byte[]> producer;
-
private static final String DEFAULT_TEST_TOPIC_SOURCE = "test_topic_source";
+ private static final String DEFAULT_TEST_TOPIC_CONSUME_GROUP = "test-consume-group";
@Override
@BeforeAll
@@ -60,10 +64,20 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
.withNetwork(NETWORK)
.withNetworkAliases(KAFKA_HOST)
+ .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
+ .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+ .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT")
+ .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
+ .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN")
+ .withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN")
+ // .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer")
+ .withEnv("KAFKA_SUPER_USERS", "User:admin")
+ .withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf")
+ .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_server_jaas.conf"), "/etc/kafka/kafka_server_jaas.conf")
+ .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_client_jass_cli.properties"), "/etc/kafka/kafka_client_jass_cli.properties")
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
Startables.deepStart(Stream.of(kafkaContainer)).join();
log.info("Kafka container started successfully");
-
Awaitility.given()
.ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
@@ -73,12 +87,12 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
log.info("Write 100 records to topic test_topic_source");
generateTestData(DEFAULT_TEST_TOPIC_SOURCE,0, 100);
- }
+ }
@TestTemplate
- public void testKafkaSource(TestContainer container) {
+ public void testKafkaAsSourceConsume(TestContainer container) {
generateTestData("test_topic_json", 0, 10);
CompletableFuture.supplyAsync(
() -> {
@@ -99,7 +113,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
}
@TestTemplate
- public void testKafkaSourceErrorSchema(TestContainer container) {
+ public void testKafkaAsSourceConsumeErrorSchema(TestContainer container) {
generateTestData("test_topic_error_json", 0, 10);
CompletableFuture.supplyAsync(
() -> {
@@ -122,7 +136,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
}
@TestTemplate
- public void testKafkaSink(TestContainer container) throws IOException, InterruptedException {
+ public void testKafkaAsSink(TestContainer container) throws IOException, InterruptedException {
CompletableFuture.supplyAsync(
() -> {
try {
@@ -147,10 +161,41 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
}
@TestTemplate
- public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+ public void testKafkaAsSinkProducerQuota(TestContainer container) throws IOException, InterruptedException {
+ //Create topic with 3 partitions
+ executeShell("kafka-topics --create --topic SESSION-RECORD-QUOTA-TEST --bootstrap-server kafkaCluster:9092 --partitions 3 --replication-factor 1 --command-config /etc/kafka/kafka_client_jass_cli.properties");
+ //Set producer quota to 5KB/s
+ executeShell("kafka-configs --bootstrap-server kafkaCluster:9092 --alter --add-config 'producer_byte_rate=5120' --entity-type users --entity-name admin --entity-type clients --entity-name SESSION-RECORD-QUOTA-TEST --command-config /etc/kafka/kafka_client_jass_cli.properties ");
+
CompletableFuture.supplyAsync(
() -> {
try {
+ Container.ExecResult execResult = container.executeJob("/kafka_producer_quota.yaml");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ return execResult;
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ List<String> data = Lists.newArrayList();
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ data.addAll(getKafkaConsumerListData("SESSION-RECORD-QUOTA-TEST"));
+ Assertions.assertTrue(StringUtils.contains(container.getServerLogs(), "TimeoutException") && data.size()>1000);
+ });
+
+ }
+
+
+
+ @TestTemplate
+ public void testKafkaAsSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+ CompletableFuture. supplyAsync(
+ () -> {
+ try {
Container.ExecResult execResult = container.executeJob("/kafka_sink_handle_error_json_format.yaml");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
return execResult;
@@ -236,20 +281,25 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-
+ properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
producer = new KafkaProducer<>(properties);
}
- private Properties kafkaConsumerConfig() {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consume-group");
- props.put(
+ private Properties kafkaConsumerConfig(String consumeGroup) {
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumeGroup);
+ properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
+ properties.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.toString().toLowerCase());
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return props;
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return properties;
}
private Properties kafkaByteConsumerConfig() {
@@ -270,7 +320,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
private Map<String, String> getKafkaConsumerData(String topicName) {
Map<String, String> data = new HashMap<>();
- try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig(DEFAULT_TEST_TOPIC_CONSUME_GROUP))) {
consumer.subscribe(Arrays.asList(topicName));
Map<TopicPartition, Long> offsets =
consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
@@ -292,7 +342,7 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
private List<String> getKafkaConsumerListData(String topicName) {
List<String> data = new ArrayList<>();
- try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig(DEFAULT_TEST_TOPIC_CONSUME_GROUP))) {
consumer.subscribe(Arrays.asList(topicName));
Map<TopicPartition, Long> offsets =
consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
@@ -312,5 +362,14 @@ public class KafkaConnectorIT extends TestSuiteBase implements TestResource {
return data;
}
+ private void executeShell(String command) {
+ try {
+ Container.ExecResult result = kafkaContainer.execInContainer("/bin/sh", "-c", command);
+ log.info("Execute shell command result: {},{}", result.getStdout(), result.getStderr());
+
+ } catch (Exception e) {
+ log.error("Execute shell command error: {}", e.getMessage());
+ }
+ }
}
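
A note on the quota assertion in testKafkaAsSinkProducerQuota: grepping server logs for TimeoutException is indirect, since the producer itself reports how long the broker throttled it. A hedged sketch against the producer field above (produce-throttle-time-avg is a standard Kafka producer metric name, but verify it against the client version in use):

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    private void logProducerThrottle() {
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            if ("produce-throttle-time-avg".equals(entry.getKey().name())) {
                // a value > 0 means the broker delayed produce responses to enforce producer_byte_rate
                log.info("{} = {}", entry.getKey().name(), entry.getValue().metricValue());
            }
        }
    }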
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties b/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties
new file mode 100644
index 0000000..986cdb9
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_client_jass_cli.properties
@@ -0,0 +1,3 @@
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=PLAIN
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
\ No newline at end of file
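
Since KafkaConnectorIT mounts this file into the broker at /etc/kafka/kafka_client_jass_cli.properties, SASL authentication can be smoke-tested with the stock console tools through the executeShell helper shown above; a hedged usage sketch, not part of this change:

    // consume one record as the "admin" SASL user to prove the JAAS client config works
    executeShell("kafka-console-consumer --bootstrap-server kafkaCluster:9092"
            + " --topic test_topic_source --from-beginning --max-messages 1 --timeout-ms 15000"
            + " --consumer.config /etc/kafka/kafka_client_jass_cli.properties");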
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
new file mode 100644
index 0000000..16d34a5
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_producer_quota.yaml
@@ -0,0 +1,120 @@
+sources: # [object] Define connector source
+ mock_source:
+ type: mock
+ properties:
+ mock.desc.file.path: /tmp/grootstream/config/template/mock_schema/session_record_mock_desc.json
+ rows.per.second: 100000
+
+processing_pipelines:
+ etl_processor:
+ type: projection
+ functions:
+ - function: SNOWFLAKE_ID
+ lookup_fields: ['']
+ output_fields: [log_id]
+ parameters:
+ data_center_id_num: 1
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ '' ]
+ output_fields: [ session_id ]
+ parameters:
+ data_center_id_num: 2
+ - function: EVAL
+ output_fields: [ ingestion_time ]
+ parameters:
+ value_expression: recv_time
+
+ - function: DOMAIN
+ lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: client_country
+ PROVINCE: client_super_administrative_area
+ CITY: client_administrative_area
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+
+
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ processing_time ]
+ parameters:
+ precision: seconds
+
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ mode: log_info
+ format: json
+
+ kafka_sink:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-QUOTA-TEST
+ kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.client.id: SESSION-RECORD-QUOTA-TEST
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
+ kafka.compression.type: snappy
+ format: json
+ json.ignore.parse.errors: false
+ log.failures.only: true
+
+application: # [object] Define job configuration
+ env:
+ name: kafka_producer_quota
+ parallelism: 1
+ shade.identifier: default
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: mock_source
+ downstream: [ etl_processor ]
+ - name: etl_processor
+ downstream: [ kafka_sink ]
+ - name: kafka_sink
+      downstream: []
\ No newline at end of file
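
Once this job is running, the 5 KB/s limit set in testKafkaAsSinkProducerQuota can be confirmed from inside the broker; a hedged sketch using the same executeShell helper (the flags follow the standard kafka-configs CLI):

    executeShell("kafka-configs --bootstrap-server kafkaCluster:9092 --describe"
            + " --entity-type users --entity-name admin"
            + " --entity-type clients --entity-name SESSION-RECORD-QUOTA-TEST"
            + " --command-config /etc/kafka/kafka_client_jass_cli.properties");
    // the describe output should report the quota applied above, e.g. producer_byte_rate=5120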
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf b/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..cb4553f
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_server_jaas.conf
@@ -0,0 +1,8 @@
+KafkaServer {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin"
+ user_admin="admin"
+ user_firewall="admin"
+ user_olap="admin";
+};
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
index 8f27a28..e12e76b 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
@@ -40,6 +40,10 @@ sinks:
properties:
topic: test_sink_topic
kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.client.id: test_sink_topic
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
index 0fb0d3f..d65157a 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
@@ -40,6 +40,9 @@ sinks:
properties:
topic: test_handle_error_json_format_topic
kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
index fdd7817..d9cb80f 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
@@ -40,6 +40,9 @@ sinks:
properties:
topic: test_skip_error_json_format_topic
kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
index 0ae7258..3403ab9 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
@@ -13,6 +13,9 @@ sources:
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.group.id: test_topic_json_group
kafka.auto.offset.reset: earliest
format: json
@@ -28,6 +31,7 @@ application: # [object] Define job configuration
env:
name: example-kafka-to-print
parallelism: 1
+ shade.identifier: default
pipeline:
object-reuse: true
topology:
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
index 503fa1b..1016560 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
@@ -4,6 +4,9 @@ sources:
properties: # [object] Kafka source properties
topic: test_topic_error_json
kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c835941..4e2eb81 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -1,5 +1,7 @@
#Connectors
-grootstream.source.Kafka = connector-kafka
+grootstream.source.kafka = connector-kafka
grootstream.sink.kafka = connector-kafka
grootstream.source.clickhouse= connector-clickhouse
grootstream.source.ipfix = connector-ipfix-collector
+grootstream.source.mock = connector-mock
+grootstream.source.file = connector-file
\ No newline at end of file
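
The Kafka -> kafka key fix above matters because java.util.Properties lookups are exact and case-sensitive; a hypothetical sketch of how such a mapping is typically resolved (the real plugin loader is not part of this diff):

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Properties;

    Properties mapping = new Properties();
    try (InputStream in = Files.newInputStream(Paths.get("plugin-mapping.properties"))) {
        mapping.load(in);
    }
    // resolves to "connector-kafka"; the old key "grootstream.source.Kafka"
    // would have returned null for a lowercase source type
    String connector = mapping.getProperty("grootstream.source.kafka");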
diff --git a/pom.xml b/pom.xml
index 24a994a..5e3e128 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,7 +36,7 @@
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version>
<maven-git-commit-id-plugin.version>4.0.4</maven-git-commit-id-plugin.version>
- <testcontainer.version>1.19.6</testcontainer.version>
+ <testcontainer.version>1.20.0</testcontainer.version>
<awaitility.version>4.2.0</awaitility.version>
<spotless.version>2.40.0</spotless.version>
<slf4j.version>1.7.25</slf4j.version>
@@ -748,6 +748,7 @@
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
+ <exclude>module-info.class</exclude>
</excludes>
</filter>
</filters>