author     窦凤虎 <[email protected]>  2024-05-10 12:51:01 +0000
committer  窦凤虎 <[email protected]>  2024-05-10 12:51:01 +0000
commit     515562bdb21b8a63c4b799fa89206e7262aa4f26 (patch)
tree       8f71892cd0ec92d447961a1d314d8677536b6c19
parent     43f6bc60ec5636a8218e07fc82d2461ae2a66f5c (diff)
parent     4797ffd0910ef96d3c9975639e74b4749489b94b (diff)
Merge branch 'feature/e2e-test-kafka' into 'develop'
Feature/e2e test kafka

See merge request galaxy/platform/groot-stream!51
-rw-r--r--  config/template/grootstream_job_debug.yaml                                                        3
-rw-r--r--  config/template/grootstream_job_template.yaml                                                    50
-rw-r--r--  config/udf.plugins                                                                                2
-rw-r--r--  docs/connector/sink/kafka.md                                                                      2
-rw-r--r--  docs/connector/source/kafka.md                                                                    2
-rw-r--r--  docs/filter/aviator.md                                                                            2
-rw-r--r--  groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java      4
-rw-r--r--  groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java              7
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java             130
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml)    2
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml           64
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml             64
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml (renamed from groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml)     3
-rw-r--r--  groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml                     39
-rw-r--r--  pom.xml                                                                                            2
15 files changed, 336 insertions, 40 deletions
diff --git a/config/template/grootstream_job_debug.yaml b/config/template/grootstream_job_debug.yaml
index a1a287d..f33b2b2 100644
--- a/config/template/grootstream_job_debug.yaml
+++ b/config/template/grootstream_job_debug.yaml
@@ -65,9 +65,8 @@ sources:
filters:
schema_type_filter:
type: com.geedgenetworks.core.filter.AviatorFilter
- output_fields:
properties:
- expression: event.decoded_as == 'SSL' || event.decoded_as == 'HTTP' || event.decoded_as == 'BASE'
+ expression: event.ip_protocol == 'tcp'
preprocessing_pipelines:
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 58d0abc..e07344a 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -15,12 +15,47 @@ sources: # [object] Define connector source
properties: # [object] Kafka source properties
topic: SESSION-RECORD # [string] Topic Name, consumer will subscribe this topic.
kafka.bootstrap.servers: 127.0.0.1:9092 # [string] Kafka Bootstrap Servers, if you have multiple servers, use comma to separate them.
- kafka.session.timeout.ms: 60000 # [number] Kafka Session Timeout, default is 60000
+
+ # The minimum amount of data the server should return for a fetch request.
+ # If insufficient data is available the request will wait for that much data to accumulate before answering the request.
+ # The default setting of 1 byte means that fetch requests are answered as soon as that many byte(s) of data is available or the fetch request times out waiting for data to arrive.
+ # Setting this to a larger value will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency
+ kafka.fetch.min.bytes: 1
+
+ # The timeout used to detect client failures when using Kafka’s group management facility.
+ # The client sends periodic heartbeats to indicate its liveness to the broker.
+ # If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.
+ # Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.
+ kafka.session.timeout.ms: 60000 # [number] Kafka Session Timeout, default is 45 seconds
+
+ # Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group.
+ # The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value.
+ # It can be adjusted even lower to control the expected time for normal rebalances.
+ kafka.heartbeat.interval.ms: 10000 # [number] The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
+
+ # The maximum number of records returned in a single call to poll().
+ # Note, that max.poll.records does not impact the underlying fetching behavior.
+ # The consumer will cache the records from each fetch request and returns them incrementally from each poll.
kafka.max.poll.records: 3000
+
+ # The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer.
+ # If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.
+ # The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size.
kafka.max.partition.fetch.bytes: 31457280
+
+ kafka.security.protocol: SSL
+ kafka.ssl.endpoint.identification.algorithm: ""
+ kafka.ssl.keystore.location: /data/tsg/olap/flink/topology/data/keystore.jks
+ kafka.ssl.keystore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+ kafka.ssl.truststore.location: /data/tsg/olap/flink/topology/data/truststore.jks
+ kafka.ssl.truststore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+ kafka.ssl.key.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+ #kafka.security.protocol: SASL_PLAINTEXT
+ #kafka.sasl.mechanism: PLAIN
+ #kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+
kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer
kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest
- kafka.compression.type: snappy # [string] Kafka Compression Type, default is none
format: json # [string] Data Format for Source. eg. json, protobuf, etc.
json.ignore.parse.errors: false # [boolean] Flag to ignore parse errors, default will record the parse errors. If set true, it will ignore the parse errors.
@@ -261,11 +296,22 @@ sinks: # [object] Define connector sink
topic: SESSION-RECORD-A
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.retries: 0
+
kafka.linger.ms: 10
+
kafka.request.timeout.ms: 30000
+ # The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
+ # This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
kafka.batch.size: 262144
+ # The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
+ # If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.
+ # This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering.
+ # Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
+ # The compression type for all data generated by the producer.
+ # The default is none (i.e. no compression). Valid values are none, gzip, snappy, lz4, or zstd.
+ # Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
kafka.compression.type: snappy
format: json
json.ignore.parse.errors: false
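
Note on the template above: every `kafka.`-prefixed key is pass-through configuration for the underlying Kafka client, on both the consumer and the producer side. A minimal sketch of that pattern, assuming the connector simply strips the `kafka.` prefix and forwards everything else unchanged (the helper below is illustrative, not the connector's actual code):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Sketch only: the prefix-stripping helper is an assumption for illustration.
    public class KafkaPropertiesSketch {

        // Strip the "kafka." prefix and hand the remainder to the client untouched.
        static Properties toClientProperties(Map<String, String> connectorProps) {
            Properties props = new Properties();
            for (Map.Entry<String, String> e : connectorProps.entrySet()) {
                if (e.getKey().startsWith("kafka.")) {
                    props.put(e.getKey().substring("kafka.".length()), e.getValue());
                }
            }
            return props;
        }

        public static void main(String[] args) {
            Map<String, String> yaml = new HashMap<>();
            yaml.put("kafka.bootstrap.servers", "127.0.0.1:9092");
            yaml.put("kafka.group.id", "SESSION-RECORD-GROUP-GROOT-STREAM-001");
            yaml.put("kafka.session.timeout.ms", "60000");
            yaml.put("kafka.heartbeat.interval.ms", "10000");
            yaml.put("kafka.max.poll.records", "3000");

            Properties props = toClientProperties(yaml);
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("SESSION-RECORD"));
            }
        }
    }

Under that assumption, any consumer or producer setting can be surfaced in the YAML without the connector having to enumerate it explicitly.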
diff --git a/config/udf.plugins b/config/udf.plugins
index 8da4df5..348548e 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -1,9 +1,9 @@
com.geedgenetworks.core.udf.AsnLookup
com.geedgenetworks.core.udf.CurrentUnixTimestamp
com.geedgenetworks.core.udf.DecodeBase64
-com.geedgenetworks.core.udf.EncodeBase64
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.Drop
+com.geedgenetworks.core.udf.EncodeBase64
com.geedgenetworks.core.udf.Eval
com.geedgenetworks.core.udf.FromUnixTimestamp
com.geedgenetworks.core.udf.GenerateStringArray
diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md
index e4d1a3f..682e163 100644
--- a/docs/connector/sink/kafka.md
+++ b/docs/connector/sink/kafka.md
@@ -1,7 +1,7 @@
# Kafka
> Kafka sink connector
## Description
-Sink connector for Apache Kafka. Write data to Kafka topic.
+Sink connector for Apache Kafka. Write data to Kafka topic. More details about producer configs can be found [here](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html).
## Key Features
diff --git a/docs/connector/source/kafka.md b/docs/connector/source/kafka.md
index 52d541b..49871c4 100644
--- a/docs/connector/source/kafka.md
+++ b/docs/connector/source/kafka.md
@@ -1,7 +1,7 @@
# Kafka
> Kafka source connector
## Description
-Source connector for Apache Kafka
+Source connector for Apache Kafka. More details about consumer configs can be found [here](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html).
## Source Options
In order to use the Kafka connector, the following dependencies are required. They can be download by Nexus Maven Repository.
diff --git a/docs/filter/aviator.md b/docs/filter/aviator.md
index 08afc69..5fe7ed2 100644
--- a/docs/filter/aviator.md
+++ b/docs/filter/aviator.md
@@ -26,7 +26,7 @@ This example read data from inline source and print to console. It will filter t
filter_operator: # [object] AviatorFilter operator name
type: com.geedgenetworks.core.filter.AviatorFilter
properties:
- expression: event.server_ip == '8.8.8.8' # [string] Aviator expression, it return true or false.
+ expression: event.server_ip == '8.8.8.8' || event.decoded_as == 'HTTP' # [string] Aviator expression, it return true or false.
sinks:
print_sink:
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
index 5c51a3f..89d68e0 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
@@ -38,10 +38,10 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
map = JSON.parseObject(message);
} catch (Exception e) {
if(ignoreParseErrors){
- LOG.error(String.format("json解析失败for:%s", message), e);
+ LOG.error(String.format("JSON Parse Errors:%s", message), e);
return null;
}else{
- throw new IOException(String.format("json解析失败for:%s", message), e);
+ throw new IOException(String.format("JSON Parse Errors:%s", message), e);
}
}
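
For readers skimming the diff: the behavior being reworded here is the `json.ignore.parse.errors` contract. A self-contained sketch of that contract as shown by the surrounding code (class and variable names are illustrative):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import com.alibaba.fastjson2.JSON;

    // Sketch of the error-handling contract: when ignoreParseErrors is true the bad
    // record is dropped (the real code also logs it), otherwise the job fails.
    public class ParseErrorContractSketch {

        static Map<String, Object> parse(byte[] message, boolean ignoreParseErrors) throws IOException {
            try {
                return JSON.parseObject(message);
            } catch (Exception e) {
                if (ignoreParseErrors) {
                    // drop the record; downstream operators never see it
                    return null;
                }
                // surface the failure so it appears in the server logs
                throw new IOException(
                        String.format("JSON Parse Errors:%s", new String(message, StandardCharsets.UTF_8)), e);
            }
        }
    }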
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
index 96b55d5..f40d2e2 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
@@ -34,10 +34,8 @@ public class JsonToMapDataConverter implements Serializable {
while (!reader.nextIfMatch('}') ){
String fieldName = reader.readFieldName();
ValueConverter converter = filedConverters.get(fieldName);
- //System.out.println(fieldName);
if(converter != null){
Object rst = converter.convert(reader);
- //System.out.println(rst);
obj.put(fieldName, rst);
}else{
reader.skipValue();
@@ -46,12 +44,13 @@ public class JsonToMapDataConverter implements Serializable {
return obj;
} catch (Exception e) {
- LOG.error(String.format("json解析失败for:%s", message), e);
+
+ LOG.error(String.format("JSON Parse Errors:%s", message), e);
if(ignoreParseErrors){
return null;
}
- throw new UnsupportedOperationException("json格式不正确:" + message);
+ throw new UnsupportedOperationException("Unsupported type or invalid data format:" + message);
}
}
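
The converter above streams over the input with fastjson2's JSONReader, materializing only fields that have a registered converter and skipping everything else. A condensed sketch of that projection pattern; the ValueConverter interface here is a stand-in for the project's own type, not its actual signature:

    import java.util.HashMap;
    import java.util.Map;

    import com.alibaba.fastjson2.JSONReader;

    public class JsonProjectionSketch {

        // Stand-in for the project's ValueConverter abstraction.
        interface ValueConverter {
            Object convert(JSONReader reader);
        }

        static Map<String, Object> project(String message, Map<String, ValueConverter> fieldConverters) {
            Map<String, Object> obj = new HashMap<>();
            JSONReader reader = JSONReader.of(message);
            reader.nextIfMatch('{');
            while (!reader.nextIfMatch('}')) {
                String fieldName = reader.readFieldName();
                ValueConverter converter = fieldConverters.get(fieldName);
                if (converter != null) {
                    obj.put(fieldName, converter.convert(reader));   // projected field
                } else {
                    reader.skipValue();                              // unknown field: consume and drop
                }
            }
            reader.close();
            return obj;
        }

        public static void main(String[] args) {
            Map<String, ValueConverter> converters = new HashMap<>();
            converters.put("client_ip", JSONReader::readString);
            converters.put("log_id", JSONReader::readInt64Value);
            System.out.println(project("{\"log_id\":1,\"client_ip\":\"1.2.3.4\",\"extra\":true}", converters));
        }
    }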
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
index e3af025..09f8aca 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
@@ -1,9 +1,5 @@
package com.geedgenetworks.test.e2e.kafka;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.TypeReference;
-import com.alibaba.nacos.client.naming.utils.CollectionUtils;
-import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;
import com.geedgenetworks.formats.json.JsonSerializer;
@@ -12,6 +8,7 @@ import com.geedgenetworks.test.common.TestSuiteBase;
import com.geedgenetworks.test.common.container.TestContainer;
import com.geedgenetworks.test.common.container.TestContainerId;
import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.*;
@@ -19,7 +16,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -31,10 +27,7 @@ import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.base.CharMatcher;
-import org.testcontainers.shaded.com.google.common.base.Strings;
-import org.testcontainers.shaded.com.google.common.primitives.Chars;
+import org.testcontainers.lifecycle.Startables;;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
@@ -43,7 +36,6 @@ import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
@@ -86,19 +78,19 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
@TestTemplate
- public void testSourceKafkaJsonToConsole(TestContainer container) {
+ public void testKafkaSource(TestContainer container) {
generateTestData("test_topic_json", 0, 10);
CompletableFuture.supplyAsync(
() -> {
try {
- return container.executeJob("/kafka_source_to_console.yaml");
+ return container.executeJob("/kafka_source.yaml");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
- await().atMost(300000, TimeUnit.MILLISECONDS)
+ await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
String logs = container.getServerLogs();
@@ -106,6 +98,109 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
});
}
+ @TestTemplate
+ public void testKafkaSourceErrorSchema(TestContainer container) {
+ generateTestData("test_topic_error_json", 0, 10);
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ Container.ExecResult execResult = container.executeJob("/kafka_source_error_schema.yaml");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ return execResult;
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ String logs = container.getServerLogs();
+ Assertions.assertTrue(StringUtils.contains(logs, "NumberFormatException"));
+ });
+ }
+
+ @TestTemplate
+ public void testKafkaSink(TestContainer container) throws IOException, InterruptedException {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ Container.ExecResult execResult = container.executeJob("/kafka_sink.yaml");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ return execResult;
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+
+ List<String> data = Lists.newArrayList();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ data.addAll(getKafkaConsumerListData("test_sink_topic"));
+ Assertions.assertEquals(10, data.size()); // Check if all 10 records are consumed
+ });
+
+ }
+
+ @TestTemplate
+ public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ Container.ExecResult execResult = container.executeJob("/kafka_sink_handle_error_json_format.yaml");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ return execResult;
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+
+ List<String> data = Lists.newArrayList();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ data.addAll(getKafkaConsumerListData("test_handle_error_json_format_topic"));
+ Assertions.assertTrue(StringUtils.contains(container.getServerLogs(), "UnsupportedOperationException"));
+ Assertions.assertEquals(0, data.size());
+ });
+
+
+
+ }
+
+ @TestTemplate
+ public void testKafkaSinkSkipErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ Container.ExecResult execResult = container.executeJob("/kafka_sink_skip_error_json_format.yaml");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ return execResult;
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+
+ List<String> data = Lists.newArrayList();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ data.addAll(getKafkaConsumerListData("test_skip_error_json_format_topic"));
+ Assertions.assertTrue(StringUtils.contains(container.getServerLogs(), "NullPointerException"));
+ Assertions.assertEquals(0, data.size());
+ });
+ }
+
+
+
private void generateTestData(String topic, int start, int end) {
StructType dataType = Types.parseStructType("id: int, client_ip: string, server_ip: string, flag: string");
JsonSerializer serializer = new JsonSerializer(dataType);
@@ -123,14 +218,6 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
}
- @TestTemplate
- public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
- Container.ExecResult execResult = container.executeJob("/kafka_sink_inline_to_kafka.yaml");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- List<String> data = getKafkaConsumerListData("test_topic");
- Assertions.assertEquals(10, data.size()); // Check if all 10 records are consumed
- }
-
@AfterAll
@Override
public void tearDown() throws Exception {
@@ -149,6 +236,7 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
producer = new KafkaProducer<>(properties);
}
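
All of the new tests share one shape: submit the job asynchronously, then poll with Awaitility until either the sink topic or the server logs show the expected outcome. The body of getKafkaConsumerListData is not part of this diff; a plausible plain-Kafka version of such a helper might look like this (bootstrap address and group id are illustrative):

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SinkTopicReaderSketch {

        // Read whatever is currently available on the sink topic and return the values.
        static List<String> readAll(String bootstrapServers, String topic) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "e2e-verify-" + topic);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            List<String> values = new ArrayList<>();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(topic));
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                    values.add(record.value());
                }
            }
            return values;
        }
    }

Wrapping such a call in await().atMost(...).untilAsserted(...) keeps re-reading until the expected record count (or log message) appears or the timeout expires.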
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
index dbfbc1e..8f27a28 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
@@ -38,7 +38,7 @@ sinks:
connector_kafka:
type: kafka
properties:
- topic: test_topic
+ topic: test_sink_topic
kafka.bootstrap.servers: kafkaCluster:9092
kafka.retries: 0
kafka.linger.ms: 10
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
new file mode 100644
index 0000000..0fb0d3f
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
@@ -0,0 +1,64 @@
+sources: # [object] Define connector source
+ inline_source:
+ type: inline
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: bigint
+ properties:
+ #
+ # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
+ #
+ data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+ format: json
+ interval.per.row: 1s
+ repeat.count: 10
+ json.ignore.parse.errors: false
+
+
+sinks:
+ connector_kafka:
+ type: kafka
+ properties:
+ topic: test_handle_error_json_format_topic
+ kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ format: json
+ log.failures.only: true
+
+application: # [object] Define job configuration
+ env:
+ name: example-inline-to-kafka
+ parallelism: 1
+ shade.identifier: default
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [ connector_kafka ]
+ - name: connector_kafka
downstream: []
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
new file mode 100644
index 0000000..fdd7817
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
@@ -0,0 +1,64 @@
+sources: # [object] Define connector source
+ inline_source:
+ type: inline
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: bigint
+ properties:
+ #
+ # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
+ #
+ data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+ format: json
+ interval.per.row: 1s
+ repeat.count: 10
+ json.ignore.parse.errors: true
+
+
+sinks:
+ connector_kafka:
+ type: kafka
+ properties:
+ topic: test_skip_error_json_format_topic
+ kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ format: json
+ log.failures.only: true
+
+application: # [object] Define job configuration
+ env:
+ name: example-inline-to-kafka
+ parallelism: 1
+ shade.identifier: default
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [ connector_kafka ]
+ - name: connector_kafka
downstream: []
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
index 2afee47..0ae7258 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
@@ -30,9 +30,6 @@ application: # [object] Define job configuration
parallelism: 1
pipeline:
object-reuse: true
- execution:
- restart:
- strategy: no
topology:
- name: kafka_source
downstream: [print_sink]
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
new file mode 100644
index 0000000..503fa1b
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
@@ -0,0 +1,39 @@
+sources:
+ kafka_source:
+ type : kafka
+ properties: # [object] Kafka source properties
+ topic: test_topic_error_json
+ kafka.bootstrap.servers: kafkaCluster:9092
+ kafka.session.timeout.ms: 60000
+ kafka.max.poll.records: 3000
+ kafka.max.partition.fetch.bytes: 31457280
+ kafka.group.id: test_topic_error_json_group
+ kafka.auto.offset.reset: earliest
+ format: json
+
+sinks: # [object] Define connector sink
+ print_sink:
+ type: print
+ schema:
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: bigint
+ properties:
+ mode: log_warn
+ format: json
+ json.ignore.parse.errors: false
+
+
+application: # [object] Define job configuration
+ env:
+ name: example-kafka-to-print
+ parallelism: 1
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: kafka_source
+ downstream: [print_sink]
+ - name: print_sink
downstream: []
\ No newline at end of file
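
This job deliberately types server_ip as bigint in the print sink schema while the test data produced by generateTestData carries dotted-quad strings, so the numeric conversion presumably fails, which is the NumberFormatException that testKafkaSourceErrorSchema waits for in the server logs. A trivial illustration of that kind of failure (the value is an example, not taken from the test data):

    public class ErrorSchemaSketch {
        public static void main(String[] args) {
            // Coercing a dotted-quad IP string to a 64-bit integer fails at parse time.
            Long.parseLong("8.8.8.8"); // throws java.lang.NumberFormatException
        }
    }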
diff --git a/pom.xml b/pom.xml
index ac97694..de8a18c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
</modules>
<properties>
- <revision>1.3.1</revision>
+ <revision>1.4.0-SNAPSHOT</revision>
<java.version>11</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>