| author | 窦凤虎 <[email protected]> | 2024-05-10 12:51:01 +0000 |
|---|---|---|
| committer | 窦凤虎 <[email protected]> | 2024-05-10 12:51:01 +0000 |
| commit | 515562bdb21b8a63c4b799fa89206e7262aa4f26 (patch) | |
| tree | 8f71892cd0ec92d447961a1d314d8677536b6c19 | |
| parent | 43f6bc60ec5636a8218e07fc82d2461ae2a66f5c (diff) | |
| parent | 4797ffd0910ef96d3c9975639e74b4749489b94b (diff) | |
Merge branch 'feature/e2e-test-kafka' into 'develop'
Feature/e2e test kafka
See merge request galaxy/platform/groot-stream!51
15 files changed, 336 insertions, 40 deletions
diff --git a/config/template/grootstream_job_debug.yaml b/config/template/grootstream_job_debug.yaml
index a1a287d..f33b2b2 100644
--- a/config/template/grootstream_job_debug.yaml
+++ b/config/template/grootstream_job_debug.yaml
@@ -65,9 +65,8 @@ sources:
 filters:
   schema_type_filter:
     type: com.geedgenetworks.core.filter.AviatorFilter
-    output_fields:
     properties:
-      expression: event.decoded_as == 'SSL' || event.decoded_as == 'HTTP' || event.decoded_as == 'BASE'
+      expression: event.ip_protocol == 'tcp'

 preprocessing_pipelines:
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 58d0abc..e07344a 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -15,12 +15,47 @@ sources: # [object] Define connector source
     properties: # [object] Kafka source properties
       topic: SESSION-RECORD # [string] Topic Name, consumer will subscribe this topic.
       kafka.bootstrap.servers: 127.0.0.1:9092 # [string] Kafka Bootstrap Servers, if you have multiple servers, use comma to separate them.
-      kafka.session.timeout.ms: 60000 # [number] Kafka Session Timeout, default is 60000
+
+      # The minimum amount of data the server should return for a fetch request.
+      # If insufficient data is available the request will wait for that much data to accumulate before answering the request.
+      # The default setting of 1 byte means that fetch requests are answered as soon as that many byte(s) of data is available or the fetch request times out waiting for data to arrive.
+      # Setting this to a larger value will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency
+      kafka.fetch.min.bytes: 1
+
+      # The timeout used to detect client failures when using Kafka’s group management facility.
+      # The client sends periodic heartbeats to indicate its liveness to the broker.
+      # If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.
+      # Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.
+      kafka.session.timeout.ms: 60000 # [number] Kafka Session Timeout, default is 45 seconds
+
+      # Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group.
+      # The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value.
+      # It can be adjusted even lower to control the expected time for normal rebalances.
+      kafka.heartbeat.interval.ms: 10000 # [number] The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
+
+      # The maximum number of records returned in a single call to poll().
+      # Note, that max.poll.records does not impact the underlying fetching behavior.
+      # The consumer will cache the records from each fetch request and returns them incrementally from each poll.
       kafka.max.poll.records: 3000
+
+      # The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer.
+      # If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.
+      # The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size.
       kafka.max.partition.fetch.bytes: 31457280
+
+      kafka.security.protocol: SSL
+      kafka.ssl.endpoint.identification.algorithm: ""
+      kafka.ssl.keystore.location: /data/tsg/olap/flink/topology/data/keystore.jks
+      kafka.ssl.keystore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+      kafka.ssl.truststore.location: /data/tsg/olap/flink/topology/data/truststore.jks
+      kafka.ssl.truststore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+      kafka.ssl.key.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+      #kafka.security.protocol: SASL_PLAINTEXT
+      #kafka.sasl.mechanism: PLAIN
+      #kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+
       kafka.group.id: SESSION-RECORD-GROUP-GROOT-STREAM-001 # [string] Kafka Group ID for Consumer
       kafka.auto.offset.reset: latest # [string] Kafka Auto Offset Reset, default is latest
-      kafka.compression.type: snappy # [string] Kafka Compression Type, default is none
       format: json # [string] Data Format for Source. eg. json, protobuf, etc.
       json.ignore.parse.errors: false # [boolean] Flag to ignore parse errors, default will record the parse errors. If set true, it will ignore the parse errors.
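For quick reference, a minimal sketch (not part of this change) of how the consumer options documented above map onto the plain Kafka client API; the property keys drop the `kafka.` prefix used in the groot-stream YAML, and the broker address and byte-array deserializers are assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Sketch only: mirrors the template's consumer tuning with the raw Kafka client.
public class ConsumerPropsSketch {
    public static KafkaConsumer<byte[], byte[]> build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "SESSION-RECORD-GROUP-GROOT-STREAM-001");
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);              // answer fetches as soon as any data exists
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);       // broker-side liveness timeout
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);    // no higher than 1/3 of session.timeout.ms
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3000);          // caps records per poll(), not fetching
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return new KafkaConsumer<>(props);
    }
}
```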
@@ -261,11 +296,22 @@ sinks: # [object] Define connector sink
       topic: SESSION-RECORD-A
       kafka.bootstrap.servers: 127.0.0.1:9092
       kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      # The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
+      # This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
       kafka.batch.size: 262144
+      # The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
+      # If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.
+      # This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering.
+      # Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
       kafka.buffer.memory: 134217728
       kafka.max.request.size: 10485760
+      # The compression type for all data generated by the producer.
+      # The default is none (i.e. no compression). Valid values are none, gzip, snappy, lz4, or zstd.
+      # Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
       kafka.compression.type: snappy
       format: json
       json.ignore.parse.errors: false
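Likewise, a minimal sketch (not part of this change) of the sink-side batching and compression options as raw producer configs; the broker address and serializers are assumptions.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Sketch only: mirrors the template's sink tuning with the raw Kafka producer.
public class ProducerPropsSketch {
    public static KafkaProducer<byte[], byte[]> build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // assumed broker
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);               // wait up to 10 ms to fill a batch
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);          // per-partition batch target, bytes
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728L);   // total buffering memory
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");  // compression works on whole batches
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return new KafkaProducer<>(props);
    }
}
```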
diff --git a/config/udf.plugins b/config/udf.plugins
index 8da4df5..348548e 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -1,9 +1,9 @@
 com.geedgenetworks.core.udf.AsnLookup
 com.geedgenetworks.core.udf.CurrentUnixTimestamp
 com.geedgenetworks.core.udf.DecodeBase64
-com.geedgenetworks.core.udf.EncodeBase64
 com.geedgenetworks.core.udf.Domain
 com.geedgenetworks.core.udf.Drop
+com.geedgenetworks.core.udf.EncodeBase64
 com.geedgenetworks.core.udf.Eval
 com.geedgenetworks.core.udf.FromUnixTimestamp
 com.geedgenetworks.core.udf.GenerateStringArray
diff --git a/docs/connector/sink/kafka.md b/docs/connector/sink/kafka.md
index e4d1a3f..682e163 100644
--- a/docs/connector/sink/kafka.md
+++ b/docs/connector/sink/kafka.md
@@ -1,7 +1,7 @@
 # Kafka
 > Kafka sink connector
 ## Description
-Sink connector for Apache Kafka. Write data to Kafka topic.
+Sink connector for Apache Kafka. Write data to Kafka topic. More details about producer configs can be found [here](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.htmls).
 ## Key Features
diff --git a/docs/connector/source/kafka.md b/docs/connector/source/kafka.md
index 52d541b..49871c4 100644
--- a/docs/connector/source/kafka.md
+++ b/docs/connector/source/kafka.md
@@ -1,7 +1,7 @@
 # Kafka
 > Kafka source connector
 ## Description
-Source connector for Apache Kafka
+Source connector for Apache Kafka. More details about consumer configs can be found [here](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html).
 ## Source Options
 In order to use the Kafka connector, the following dependencies are required. They can be download by Nexus Maven Repository.
diff --git a/docs/filter/aviator.md b/docs/filter/aviator.md
index 08afc69..5fe7ed2 100644
--- a/docs/filter/aviator.md
+++ b/docs/filter/aviator.md
@@ -26,7 +26,7 @@ This example read data from inline source and print to console. It will filter t
   filter_operator: # [object] AviatorFilter operator name
     type: com.geedgenetworks.core.filter.AviatorFilter
     properties:
-      expression: event.server_ip == '8.8.8.8' # [string] Aviator expression, it return true or false.
+      expression: event.server_ip == '8.8.8.8' || event.decoded_as == 'HTTP' # [string] Aviator expression, it return true or false.
 sinks:
   print_sink:
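The aviator.md change extends the example expression with an OR clause. A standalone sketch of how such an expression evaluates against an event map with the Aviator library; binding the event map under the env key `event` is an assumption inferred from the `event.` prefix, not the project's actual filter wiring.

```java
import java.util.HashMap;
import java.util.Map;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;

// Sketch only: evaluate the documented expression against a sample event.
public class AviatorFilterSketch {
    public static void main(String[] args) {
        Expression exp = AviatorEvaluator.compile(
                "event.server_ip == '8.8.8.8' || event.decoded_as == 'HTTP'");

        Map<String, Object> event = new HashMap<>();
        event.put("server_ip", "1.1.1.1");
        event.put("decoded_as", "HTTP");

        Map<String, Object> env = new HashMap<>();
        env.put("event", event); // assumed env layout matching the `event.` prefix

        boolean keep = (Boolean) exp.execute(env); // true: the decoded_as clause matches
        System.out.println(keep);
    }
}
```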
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
index 5c51a3f..89d68e0 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
@@ -38,10 +38,10 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
             map = JSON.parseObject(message);
         } catch (Exception e) {
             if(ignoreParseErrors){
-                LOG.error(String.format("json解析失败for:%s", message), e);
+                LOG.error(String.format("JSON Parse Errors:%s", message), e);
                 return null;
             }else{
-                throw new IOException(String.format("json解析失败for:%s", message), e);
+                throw new IOException(String.format("JSON Parse Errors:%s", message), e);
             }
         }
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
index 96b55d5..f40d2e2 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
@@ -34,10 +34,8 @@ public class JsonToMapDataConverter implements Serializable {
             while (!reader.nextIfMatch('}') ){
                 String fieldName = reader.readFieldName();
                 ValueConverter converter = filedConverters.get(fieldName);
-                //System.out.println(fieldName);
                 if(converter != null){
                     Object rst = converter.convert(reader);
-                    //System.out.println(rst);
                     obj.put(fieldName, rst);
                 }else{
                     reader.skipValue();
@@ -46,12 +44,13 @@ public class JsonToMapDataConverter implements Serializable {
             return obj;
         } catch (Exception e) {
-            LOG.error(String.format("json解析失败for:%s", message), e);
+
+            LOG.error(String.format("JSON Parse Errors:%s", message), e);
             if(ignoreParseErrors){
                 return null;
             }
-            throw new UnsupportedOperationException("json格式不正确:" + message);
+            throw new UnsupportedOperationException("Unsupported type or invalid data format:" + message);
         }
     }
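Both format-json classes keep the same error-handling contract: when `ignoreParseErrors` is set, a bad record is logged and dropped, otherwise an exception is thrown. A minimal standalone sketch of that contract using fastjson2 (not the project classes themselves); the method name is illustrative only.

```java
import java.io.IOException;
import java.util.Map;
import com.alibaba.fastjson2.JSON;

// Sketch only: the ignore-parse-errors contract followed by the format-json classes.
public class ParseErrorHandlingSketch {
    static Map<String, Object> parse(String message, boolean ignoreParseErrors) throws IOException {
        try {
            return JSON.parseObject(message);
        } catch (Exception e) {
            if (ignoreParseErrors) {
                // Skip the record: the caller sees null and drops the event.
                return null;
            }
            // Fail the record: surfaced to the job (and to the e2e tests) as an exception.
            throw new IOException("JSON Parse Errors:" + message, e);
        }
    }
}
```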
diff --git a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
index e3af025..09f8aca 100644
--- a/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
+++ b/groot-tests/test-e2e-kafka/src/test/java/com/geedgenetworks/test/e2e/kafka/KafkaConnectorTest.java
@@ -1,9 +1,5 @@
 package com.geedgenetworks.test.e2e.kafka;

-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.TypeReference;
-import com.alibaba.nacos.client.naming.utils.CollectionUtils;
-import com.geedgenetworks.common.Event;
 import com.geedgenetworks.core.types.StructType;
 import com.geedgenetworks.core.types.Types;
 import com.geedgenetworks.formats.json.JsonSerializer;
@@ -12,6 +8,7 @@ import com.geedgenetworks.test.common.TestSuiteBase;
 import com.geedgenetworks.test.common.container.TestContainer;
 import com.geedgenetworks.test.common.container.TestContainerId;
 import com.geedgenetworks.test.common.junit.DisabledOnContainer;
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.*;
@@ -19,7 +16,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -31,10 +27,7 @@ import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.base.CharMatcher;
-import org.testcontainers.shaded.com.google.common.base.Strings;
-import org.testcontainers.shaded.com.google.common.primitives.Chars;
+import org.testcontainers.lifecycle.Startables;;
 import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.DockerLoggerFactory;
@@ -43,7 +36,6 @@ import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;

 import static org.awaitility.Awaitility.await;
@@ -86,19 +78,19 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {

     @TestTemplate
-    public void testSourceKafkaJsonToConsole(TestContainer container) {
+    public void testKafkaSource(TestContainer container) {
         generateTestData("test_topic_json", 0, 10);
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
-                        return container.executeJob("/kafka_source_to_console.yaml");
+                        return container.executeJob("/kafka_source.yaml");
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
                     }
                 });
-        await().atMost(300000, TimeUnit.MILLISECONDS)
+        await().atMost(60000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () -> {
                             String logs = container.getServerLogs();
@@ -106,6 +98,109 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
                         });
     }

+    @TestTemplate
+    public void testKafkaSourceErrorSchema(TestContainer container) {
+        generateTestData("test_topic_error_json", 0, 10);
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        Container.ExecResult execResult = container.executeJob("/kafka_source_error_schema.yaml");
+                        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+                        return execResult;
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            String logs = container.getServerLogs();
+                            Assertions.assertTrue(StringUtils.contains(logs, "NumberFormatException"));
+                        });
+    }
+
+    @TestTemplate
+    public void testKafkaSink(TestContainer container) throws IOException, InterruptedException {
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        Container.ExecResult execResult = container.executeJob("/kafka_sink.yaml");
+                        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+                        return execResult;
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        List<String> data = Lists.newArrayList();
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            data.addAll(getKafkaConsumerListData("test_sink_topic"));
+                            Assertions.assertEquals(10, data.size()); // Check if all 10 records are consumed
+                        });
+    }
+
+    @TestTemplate
+    public void testKafkaSinkHandleErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        Container.ExecResult execResult = container.executeJob("/kafka_sink_handle_error_json_format.yaml");
+                        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+                        return execResult;
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        List<String> data = Lists.newArrayList();
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            data.addAll(getKafkaConsumerListData("test_handle_error_json_format_topic"));
+                            Assertions.assertTrue(StringUtils.contains(container.getServerLogs(), "UnsupportedOperationException"));
+                            Assertions.assertEquals(0, data.size());
+                        });
+    }
+
+    @TestTemplate
+    public void testKafkaSinkSkipErrorJsonFormat(TestContainer container) throws IOException, InterruptedException {
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        Container.ExecResult execResult = container.executeJob("/kafka_sink_skip_error_json_format.yaml");
+                        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+                        return execResult;
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        List<String> data = Lists.newArrayList();
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            data.addAll(getKafkaConsumerListData("test_skip_error_json_format_topic"));
+                            Assertions.assertTrue(StringUtils.contains(container.getServerLogs(), "NullPointerException"));
+                            Assertions.assertEquals(0, data.size());
+                        });
+    }
+
     private void generateTestData(String topic, int start, int end) {
         StructType dataType = Types.parseStructType("id: int, client_ip: string, server_ip: string, flag: string");
         JsonSerializer serializer = new JsonSerializer(dataType);
@@ -123,14 +218,6 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
     }

-    @TestTemplate
-    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
-        Container.ExecResult execResult = container.executeJob("/kafka_sink_inline_to_kafka.yaml");
-        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
-        List<String> data = getKafkaConsumerListData("test_topic");
-        Assertions.assertEquals(10, data.size()); // Check if all 10 records are consumed
-    }
-
     @AfterAll
     @Override
     public void tearDown() throws Exception {
@@ -149,6 +236,7 @@ public class KafkaConnectorTest extends TestSuiteBase implements TestResource {
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(properties);
     }
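The new sink tests assert on `getKafkaConsumerListData(...)`, whose implementation is outside this diff. A hypothetical sketch of what such a drain-and-collect helper could look like; the group id, poll window, and deserializers are assumptions, not the project's actual code.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical helper (not the project's implementation): drain a topic's
// currently available records into a list of String payloads for assertions.
public class TopicDrainSketch {
    public static List<String> readAll(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "e2e-assert-" + topic); // assumed group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        List<String> values = new ArrayList<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); // assumed window
            for (ConsumerRecord<String, String> record : records) {
                values.add(record.value());
            }
        }
        return values;
    }
}
```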
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
index dbfbc1e..8f27a28 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_inline_to_kafka.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink.yaml
@@ -38,7 +38,7 @@ sinks:
   connector_kafka:
     type: kafka
     properties:
-      topic: test_topic
+      topic: test_sink_topic
       kafka.bootstrap.servers: kafkaCluster:9092
       kafka.retries: 0
       kafka.linger.ms: 10
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
new file mode 100644
index 0000000..0fb0d3f
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_handle_error_json_format.yaml
@@ -0,0 +1,64 @@
+sources: # [object] Define connector source
+  inline_source:
+    type: inline
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: log_id
+          type: bigint
+        - name: recv_time
+          type: bigint
+        - name: server_fqdn
+          type: string
+        - name: server_domain
+          type: string
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+        - name: server_asn
+          type: string
+        - name: decoded_as
+          type: string
+        - name: device_group
+          type: string
+        - name: device_tag
+          type: bigint
+    properties:
+      #
+      # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
+      #
+      data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+      format: json
+      interval.per.row: 1s
+      repeat.count: 10
+      json.ignore.parse.errors: false
+
+
+sinks:
+  connector_kafka:
+    type: kafka
+    properties:
+      topic: test_handle_error_json_format_topic
+      kafka.bootstrap.servers: kafkaCluster:9092
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      format: json
+      log.failures.only: true
+
+application: # [object] Define job configuration
+  env:
+    name: example-inline-to-kafka
+    parallelism: 1
+    shade.identifier: default
+    pipeline:
+      object-reuse: true
+  topology:
+    - name: inline_source
+      downstream: [ connector_kafka ]
+    - name: connector_kafka
+      downstream: []
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
new file mode 100644
index 0000000..fdd7817
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_sink_skip_error_json_format.yaml
@@ -0,0 +1,64 @@
+sources: # [object] Define connector source
+  inline_source:
+    type: inline
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: log_id
+          type: bigint
+        - name: recv_time
+          type: bigint
+        - name: server_fqdn
+          type: string
+        - name: server_domain
+          type: string
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: string
+        - name: server_asn
+          type: string
+        - name: decoded_as
+          type: string
+        - name: device_group
+          type: string
+        - name: device_tag
+          type: bigint
+    properties:
+      #
+      # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
+      #
+      data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+      format: json
+      interval.per.row: 1s
+      repeat.count: 10
+      json.ignore.parse.errors: true
+
+
+sinks:
+  connector_kafka:
+    type: kafka
+    properties:
+      topic: test_skip_error_json_format_topic
+      kafka.bootstrap.servers: kafkaCluster:9092
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      format: json
+      log.failures.only: true
+
+application: # [object] Define job configuration
+  env:
+    name: example-inline-to-kafka
+    parallelism: 1
+    shade.identifier: default
+    pipeline:
+      object-reuse: true
+  topology:
+    - name: inline_source
+      downstream: [ connector_kafka ]
+    - name: connector_kafka
+      downstream: []
\ No newline at end of file
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
index 2afee47..0ae7258 100644
--- a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_to_console.yaml
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source.yaml
@@ -30,9 +30,6 @@ application: # [object] Define job configuration
     parallelism: 1
     pipeline:
       object-reuse: true
-    execution:
-      restart:
-        strategy: no
   topology:
     - name: kafka_source
       downstream: [print_sink]
diff --git a/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
new file mode 100644
index 0000000..503fa1b
--- /dev/null
+++ b/groot-tests/test-e2e-kafka/src/test/resources/kafka_source_error_schema.yaml
@@ -0,0 +1,39 @@
+sources:
+  kafka_source:
+    type : kafka
+    properties: # [object] Kafka source properties
+      topic: test_topic_error_json
+      kafka.bootstrap.servers: kafkaCluster:9092
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.group.id: test_topic_error_json_group
+      kafka.auto.offset.reset: earliest
+      format: json
+
+sinks: # [object] Define connector sink
+  print_sink:
+    type: print
+    schema:
+      fields: # [array of object] Schema field projection, support read data only from specified fields.
+        - name: client_ip
+          type: string
+        - name: server_ip
+          type: bigint
+    properties:
+      mode: log_warn
+      format: json
+      json.ignore.parse.errors: false
+
+
+application: # [object] Define job configuration
+  env:
+    name: example-kafka-to-print
+    parallelism: 1
+    pipeline:
+      object-reuse: true
+  topology:
+    - name: kafka_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
\ No newline at end of file
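In kafka_source_error_schema.yaml the print sink declares `server_ip` as `bigint`, while `generateTestData` emits it with a string schema; that mismatch is why `testKafkaSourceErrorSchema` expects a `NumberFormatException` in the server logs. A minimal illustration of that kind of coercion failure, assuming the generated values are non-numeric strings such as dotted IPs.

```java
// Illustration only (the value is assumed, not taken from the test data generator):
// coercing a dotted-quad string into a bigint field fails with NumberFormatException.
public class SchemaMismatchSketch {
    public static void main(String[] args) {
        long serverIp = Long.parseLong("8.8.8.8"); // throws java.lang.NumberFormatException
        System.out.println(serverIp);
    }
}
```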
diff --git a/pom.xml b/pom.xml
@@ -23,7 +23,7 @@
   </modules>

   <properties>
-    <revision>1.3.1</revision>
+    <revision>1.4.0-SNAPSHOT</revision>
     <java.version>11</java.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven.compiler.source>${java.version}</maven.compiler.source>
