summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-10-20 11:46:32 +0800
committerdoufenghu <[email protected]>2024-10-20 11:46:32 +0800
commit031224fe43961cd1df2c7b0239c6f813e765c105 (patch)
tree3f8736907c1e98d0475ca5bdfcd7a33a21b1df20
parent9f51ce8d96879aa5c383ac34bac543ad6fe3ed44 (diff)
[Improve][e2e-base-test] Integrate multiple types of processors into the test topology.
-rw-r--r--docs/grootstream-design-cn.md5
-rw-r--r--docs/processor/udaf.md33
-rw-r--r--docs/processor/udf.md14
-rw-r--r--docs/processor/udtf.md56
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java1
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml4
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java4
-rw-r--r--groot-tests/test-common/src/test/resources/grootstream.yaml7
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java64
-rw-r--r--groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml192
10 files changed, 324 insertions, 56 deletions
diff --git a/docs/grootstream-design-cn.md b/docs/grootstream-design-cn.md
index 28121a3..41fcd0d 100644
--- a/docs/grootstream-design-cn.md
+++ b/docs/grootstream-design-cn.md
@@ -117,6 +117,7 @@ grootstream:
token: <vault-token>
default_key_path: <default-vault-key-path>
plugin_key_path: <plugin-vault-key-path>
+
ssl: ## SSL/TLS 客户端链接配置
skip_verification: true # 忽略SSL证书校验
private_key_path: /path/to/certs/worker.key # 客户端私钥文件路径
@@ -2084,7 +2085,7 @@ Parameters:
```yaml
# 将一个应用层协议按层级进行拆分,应用层协议由协议解析路径和应用组成。
-- function: JSON_UNROLL
+- function: PATH_UNROLL
lookup_fields: [ decoded_path, app]
output_fields: [ protocol_stack_id, app_name ]
parameters:
@@ -2114,7 +2115,7 @@ Parameters:
#Event5: {"app_name":"ssl.port_444","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.ssl.port_444"}
#只有路径参数的场景(或者上例中文件字段值为null).
-- function: JSON_UNROLL
+- function: PATH_UNROLL
lookup_fields: [ decoded_path]
output_fields: [ protocol_stack_id]
parameters:
diff --git a/docs/processor/udaf.md b/docs/processor/udaf.md
index dd1dd70..66d6ad5 100644
--- a/docs/processor/udaf.md
+++ b/docs/processor/udaf.md
@@ -41,7 +41,7 @@ COLLECT_LIST is used to collect the value of the field in the group of events.
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: COLLECT_LIST
@@ -59,7 +59,7 @@ COLLECT_SET is used to collect the unique value of the field in the group of eve
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example
```yaml
- function: COLLECT_SET
@@ -76,7 +76,7 @@ FIRST_VALUE is used to get the first value of the field in the group of events.
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example
```yaml
- function: FIRST_VALUE
@@ -92,7 +92,7 @@ LAST_VALUE is used to get the last value of the field in the group of events.
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example
```yaml
- function: LAST_VALUE
@@ -109,7 +109,7 @@ LONG_COUNT is used to count the number of events in the group of events.
- lookup_fields: optional.
- output_fields: required.
-### Example
+Example
```yaml
- function: LONG_COUNT
@@ -127,7 +127,7 @@ MEAN is used to calculate the mean value of the field in the group of events. Th
- parameters: optional.
- precision: `<Integer>` required. The precision of the mean value. Default is 2.
-### Example
+Example
```yaml
- function: MEAN
@@ -144,7 +144,7 @@ NUMBER_SUM is used to sum the value of the field in the group of events. The loo
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example
```yaml
- function: NUMBER_SUM
@@ -164,7 +164,8 @@ hlld is a high-performance C server which is used to expose HyperLogLog sets and
- precision: `<Integer>` optional. The precision of the hlld value. Default is 12.
- output_format: `<String>` optional. The output format can be either `base64(encoded string)` or `binary(byte[])`. The default is `base64`.
-### Example
+Example
+
Merge multiple string field into a HyperLogLog data structure.
```yaml
- function: HLLD
@@ -194,8 +195,8 @@ Approx Count Distinct HLLD is used to count the approximate number of distinct v
- input_type: `<String>` optional. Refer to `HLLD` function.
- precision: `<Integer>` optional. Refer to `HLLD` function.
-### Example
-
+Example
+
```yaml
- function: APPROX_COUNT_DISTINCT_HLLD
lookup_fields: [client_ip]
@@ -228,8 +229,8 @@ A High Dynamic Range (HDR) Histogram. More details can be found in [HDR Histogra
- autoResize: `<Boolean>` optional. If true, the highestTrackableValue will auto-resize. Default is true.
- output_format: `<String>` optional. The output format can be either `base64(encoded string)` or `binary(byte[])`. The default is `base64`.
-### Example
-
+Example
+
```yaml
- function: HDR_HISTOGRAM
lookup_fields: [latency_ms]
@@ -264,8 +265,8 @@ Approx Quantile HDR is used to calculate the approximate quantile value of the f
- autoResize: `<Boolean>` optional. Refer to `HDR_HISTOGRAM` function.
- probability: `<Double>` optional. The probability of the quantile. Default is 0.5.
-### Example
-
+Example
+
```yaml
- function: APPROX_QUANTILE_HDR
lookup_fields: [latency_ms]
@@ -301,8 +302,8 @@ Approx Quantiles HDR is used to calculate the approximate quantile values of the
- autoResize: `<Boolean>` optional. Refer to `HDR_HISTOGRAM` function.
- probabilities: `<Array<Double>>` required. The list of probabilities of the quantiles. Range is 0 to 1.
-### Example
-
+Example
+
```yaml
- function: APPROX_QUANTILES_HDR
lookup_fields: [latency_ms]
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index 3298374..9ba93e9 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -201,17 +201,18 @@ If the value of `direction` is `69`, the value of `internal_ip` will be `client_
- function: EVAL
output_fields: [internal_ip]
parameters:
- value_expression: 'direction=69 ? client_ip : server_ip'
+ value_expression: "direction=69 ? client_ip : server_ip"
```
### Flatten
-Flatten the fields of nested structure to the top level. The new fields name are named using the field name prefixed with the names of the struct fields to reach it, separated by dots as default.
+Flatten the fields of nested structure to the top level. The new fields name are named using the field name prefixed with the names of the struct fields to reach it, separated by dots as default. The original fields will be removed.
```FLATTEN(filter, lookup_fields, output_fields[, parameters])```
+
- filter: optional
- lookup_fields: optional
-- output_fields: not required
+- output_fields: not required.
- parameters: optional
- prefix: `<String>` optional. Prefix string for flattened field names. Default is empty.
- depth: `<Integer>` optional. Number representing the nested levels to consider for flattening. Minimum 1. Default is `5`.
@@ -255,6 +256,7 @@ Output:
From unix timestamp function is used to convert the unix timestamp to date time string. The default time zone is UTC+0.
```FROM_UNIX_TIMESTAMP(filter, lookup_fields, output_fields[, parameters])```
+
- filter: optional
- lookup_fields: required
- output_fields: required
@@ -427,7 +429,7 @@ Remove the prefix "tags_" from the field names and rename the field "timestamp_m
```yaml
- function: RENAME
-- parameters:
+ parameters:
rename_fields:
timestamp_ms: recv_time_ms
rename_expression: key=string.replace_all(key,'tags_',''); return key;
@@ -440,7 +442,7 @@ Rename the field `client_ip` to `source_ip`, including the fields under the `enc
```yaml
- function: RENAME
-- parameters:
+ parameters:
parent_fields: [encapsulation.ipv4]
rename_fields:
client_ip: source_ip
@@ -509,7 +511,7 @@ Unix timestamp converter function is used to convert the unix timestamp precisio
- parameters: required
- precision: `<String>` required. Enum: `milliseconds`, `seconds`, `minutes`. The minutes precision is used to generate Unix timestamp, round it to the minute level, and output it in seconds format.
- Example:
-_`__timestamp` Internal field, from source ingestion time or current unix timestamp.
+ `__timestamp` Internal field, from source ingestion time or current unix timestamp.
```yaml
- function: UNIX_TIMESTAMP_CONVERTER
diff --git a/docs/processor/udtf.md b/docs/processor/udtf.md
index a6e8444..65a7840 100644
--- a/docs/processor/udtf.md
+++ b/docs/processor/udtf.md
@@ -29,8 +29,8 @@ The Unroll Function handles an array field—or an expression evaluating to an a
- parameters: optional
- regex: `<String>` optional. If lookup_fields is a string, the regex parameter is used to split the string into an array. The default value is a comma.
-#### Example
-
+Example
+
```yaml
functions:
- function: UNROLL
@@ -50,8 +50,8 @@ The JSON Unroll Function handles a JSON object, unrolls/explodes an array of obj
- path: `<String>` optional. Path to array to unroll, default is the root of the JSON object.
- new_path: `<String>` optional. Rename path to new_path, default is the same as path.
-#### Example
-
+Example
+
```yaml
functions:
- function: JSON_UNROLL
@@ -62,5 +62,53 @@ functions:
- new_path: tag
```
+### Path Unroll
+
+The PATH_UNROLL function processes a given file path, breaking it down into individual steps and transforming each step into a separate event while retaining top-level fields. At the final level, it outputs both the full file path and the file name.
+
+```PATH_UNROLL(filter, lookup_fields, output_fields[, parameters])```
+
+- filter: optional
+- lookup_fields: required
+- output_fields: required
+- parameters: optional
+ - separator: <String> optional. The delimiter used to split the path. Default is `/`.
+
+Example Usage:
+
+```yaml
+- function: PATH_UNROLL
+ lookup_fields: [ decoded_path, app]
+ output_fields: [ protocol_stack_id, app_name ]
+ parameters:
+ separator: "."
+```
+Input:
+
+```json
+{"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"wechat"}
+```
+When the input is processed, the following events are generated:
+```
+ #Event1: {"protocol_stack_id":"ETHERNET"}
+ #Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
+ #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
+ #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"}
+ #Event5: {"app_name":"wechat","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.wechat"}
+```
+
+If decoded_path contains app value of `ETHERNET.IPv4.TCP.ssl`, the output will be as follows:
+```json
+{"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"ssl"}
+```
+In this case, the output will be:
+```
+ #Event1: {"protocol_stack_id":"ETHERNET"}
+ #Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
+ #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
+ #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl", "app_name":"ssl"}
+```
+
+
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
index a1927db..a0b9ce5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
@@ -117,6 +117,7 @@ public class AsnKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
}
} catch (Exception e) {
+ log.error("Current class path {}", this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
log.error("File {} operation failed. {} ", knowledgeBaseConfig.getFiles().get(i), e.getMessage());
return false;
}
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index 8736aee..fd5c035 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -104,12 +104,12 @@ processing_pipelines:
device_tag: renamed_device_tag
- function: UUIDv5
- lookup_fields: [ client_ip, server_ip ] # 基于 client_ip, server_ip的值组成UUIDv5 name 参数值与命名空间结合后,通过哈希生成唯一的 UUID。
+ lookup_fields: [ client_ip, server_ip ]
output_fields: [ ip_uuid ]
parameters:
namespace: NAMESPACE_IP
- function: UUIDv7
- output_fields: [ log_uuid_v7 ] # 生成基于时间戳和随机数的 UUID
+ output_fields: [ log_uuid_v7 ]
- function: UUID
output_fields: [ log_uuid ]
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index b833115..4ac3d03 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -81,6 +81,10 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
".*Successful registration at resource manager.*")
.withStartupTimeout(Duration.ofMinutes(2)));
+ // Copy groot-stream bootstrap and some other files to the container
+ copyGrootStreamStarterToContainer(taskManager);
+ copyGrootStreamStarterLoggingToContainer(taskManager);
+
Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
// execute extra commands
diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml
index 2eb105b..0def444 100644
--- a/groot-tests/test-common/src/test/resources/grootstream.yaml
+++ b/groot-tests/test-common/src/test/resources/grootstream.yaml
@@ -11,11 +11,4 @@ grootstream:
files:
- ip_builtin.mmdb
properties:
- hos.path: http://192.168.44.12:9098/hos
- hos.bucket.name.traffic_file: traffic_file_bucket
- hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 5
- hos.bucket.name.rtp_file: traffic_rtp_file_bucket
- hos.bucket.name.http_file: traffic_http_file_bucket
- hos.bucket.name.eml_file: traffic_eml_file_bucket
- hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
index 1c1e777..fdba36f 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
@@ -27,30 +27,31 @@ import static org.awaitility.Awaitility.await;
disabledReason = "Only flink adjusts the parameter configuration rules")
public class InlineToPrintIT extends TestSuiteBase {
+
@TestTemplate
- public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+ public void testJobExecution(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
CompletableFuture.supplyAsync(
() -> {
try {
- List<String> variables = List.of(
- "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket",
- "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket");
- return container.executeJob("/inline_to_print.yaml", variables);
+ return container.executeJob("/inline_to_print.yaml");
} catch (Exception e) {
- log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
AtomicReference<String> taskMangerID = new AtomicReference<>();
+
await().atMost(300000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Map<String, Object> taskMangerInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
"curl http://localhost:8081/taskmanagers"), new TypeReference<Map<String, Object>>() {
});
+
+ @SuppressWarnings("unchecked")
List<Map<String, Object>> taskManagers =
(List<Map<String, Object>>) taskMangerInfo.get("taskmanagers");
+
if (!CollectionUtils.isEmpty(taskManagers)) {
taskMangerID.set(taskManagers.get(0).get("id").toString());
}
@@ -64,6 +65,7 @@ public class InlineToPrintIT extends TestSuiteBase {
Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
"curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() {
});
+ @SuppressWarnings("unchecked")
List<Map<String, Object>> jobs =
(List<Map<String, Object>>) jobInfo.get("jobs");
if (!CollectionUtils.isEmpty(jobs)) {
@@ -71,6 +73,7 @@ public class InlineToPrintIT extends TestSuiteBase {
}
Assertions.assertNotNull(jobId.get());
});
+
//Obtain job metrics
AtomicReference<List<Map<String, Object>>> jobNumRestartsReference = new AtomicReference<>();
await().atMost(60000, TimeUnit.MILLISECONDS)
@@ -78,8 +81,8 @@ public class InlineToPrintIT extends TestSuiteBase {
() -> {
Thread.sleep(5000);
String result = container.executeJobManagerInnerCommand(
- String.format(
- "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get()));
+ String.format(
+ "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get()));
List<Map<String, Object>> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference<List<Map<String, Object>>>() {
});
if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) {
@@ -90,12 +93,57 @@ public class InlineToPrintIT extends TestSuiteBase {
});
+
+ }
+
+ @TestTemplate
+ public void testUserDefinedJobVariables(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ List<String> variables = List.of(
+ "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket",
+ "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket");
+ return container.executeJob("/inline_to_print.yaml", variables);
+ } catch (Exception e) {
+ log.error("Commit task exception : {} ", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+
await().atMost(300000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
String logs = container.getServerLogs();
Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_rtp_file_bucket/test_pcap_file") > 10);
Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_http_file_bucket/test_http_req_file") > 10);
+ // Test server_ip filter -> output logs not contains 4.4.4.4 of server_ip
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && !StringUtils.contains(logs, "\"server_ip\":\"4.4.4.4\""));
+ // Test Drop function -> output logs not contains 5.5.5.5 of server_ip
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && !StringUtils.contains(logs, "\"server_ip\":\"5.5.5.5\""));
+
+ // Output logs contains server_asn
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_asn\""));
+ // Output logs contains server_domain
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_domain\""));
+
+ // Output logs contains server_country
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_country\""));
+ // Output logs contains mail_attachment_name equals 中文测试
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"mail_attachment_name\":\"中文测试\""));
+ // Test EVAL function -> output logs contains direction equals c2s
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"direction\":\"c2s\""));
+ // Test JSON Extract function -> output logs contains device_group equals XXG-TSG-BJ
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"device_group\":\"XXG-TSG-BJ\""));
+
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "client_ip_list"));
+
+
+
+
+
});
diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
index b4773a1..f724a36 100644
--- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
+++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
@@ -2,48 +2,203 @@ sources:
inline_source:
type: inline
properties:
- data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","rtp_pcap_path":"test_pcap_file","http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ data: [{"tcp_rtt_ms":128,"decoded_as":"DNS","rtp_pcap_path":"test_pcap_file", "security_rule_id_list": [1,10,100,300], "http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","ssl_sni":"www.ct.cn", "http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"flags":8192, "address_type":4,"mail_subject":"中文标题测试","mail_attachment_name":"5Lit5paH5rWL6K+V","mail_attachment_name_charset": "utf8","device_tag": "{\"tags\":[{\"tag\":\"data_center\",\"value\":\"XXG-TSG-BJ\"},{\"tag\":\"device_group\",\"value\":\"XXG-TSG-BJ\"}]}", "client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","mail_subject":"中文标题测试","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","mail_subject":"english subject test","http_request_line":"GET / HTTP/1.1","http_host":"www.5555.com","http_url":"www.5555.com/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.1","server_ip":"5.5.5.5","client_port":42751,"server_port":53,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.6666.cn","http_url":"www.6666.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","mail_subject":"中文标题测试","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.100.1","server_ip":"6.6.6.6","client_port":42751,"server_port":53,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]
format: json
json.ignore.parse.errors: false
filters:
- filter_operator:
- type: com.geedgenetworks.core.filter.AviatorFilter
+ server_ip_filter:
+ type: aviator
properties:
- expression: event.server_ip != '12.12.12.12'
+ expression: event.server_ip != '4.4.4.4'
+
+splits:
+ decoded_as_split:
+ type: split
+ rules:
+ - tag: http_tag
+ expression: event.decoded_as == 'HTTP'
+ - tag: dns_tag
+ expression: event.decoded_as == 'DNS'
+
processing_pipelines:
projection_processor:
type: projection
remove_fields: [http_request_line, http_response_line, http_response_content_type]
functions:
+
- function: DROP
- filter: event.server_ip == '4.4.4.4'
+ filter: event.server_ip == '5.5.5.5'
+
+ - function: SNOWFLAKE_ID
+ output_fields: [ log_id ]
+ parameters:
+ data_center_id_num: 1
+
+ - function: UUID
+ output_fields: [ log_uuid ]
+
+ - function: UUIDv5
+ lookup_fields: [ client_ip, server_ip ]
+ output_fields: [ ip_uuid ]
+ parameters:
+ namespace: NAMESPACE_IP
+ - function: UUIDv7
+ output_fields: [ log_uuid_v7 ]
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+ LONGITUDE: server_longitude
+ LATITUDE: server_latitude
+ ISP: server_isp
+ ORGANIZATION: server_organization
+
+ - function : BASE64_ENCODE_TO_STRING
+ output_fields: [ mail_subject_base64 ]
+ parameters:
+ value_field: mail_subject
+
+ - function: BASE64_DECODE_TO_STRING
+ output_fields: [ mail_attachment_name ]
+ parameters:
+ value_field: mail_attachment_name
+ charset_field: mail_attachment_name_charset
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ current_unix_timestamp_ms ]
+ parameters:
+ precision: milliseconds
+
+ - function: DOMAIN
+ lookup_fields: [ http_host, ssl_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+ - function: EVAL
+ output_fields: [ recv_time ]
+ parameters:
+ value_expression: current_unix_timestamp_ms
+
+ - function: EVAL
+ output_fields: [ direction ]
+ parameters:
+ value_expression: "(flags & 24576) == 24576 ? 'double' : ((flags & 8192) == 8192 ? 'c2s' : ((flags & 16384) == 16384 ? 's2c' : 'unknown'))"
+
+ - function: EVAL
+ output_fields: [ constant_value ]
+ parameters:
+ value_expression: "'abc'"
+
+ - function: JSON_EXTRACT
+ lookup_fields: [ device_tag ]
+ output_fields: [ device_group ]
+ parameters:
+ value_expression: $.tags[?(@.tag=='device_group')][0].value
+
+ - function: FLATTEN
+ lookup_fields: [ device_tag ]
+ parameters:
+ prefix: olap
+ json_string_keys: [device_tag]
+
+ - function: FROM_UNIX_TIMESTAMP
+ lookup_fields: [ current_unix_timestamp_ms ]
+ output_fields: [ current_time_str ]
+ parameters:
+ precision: milliseconds
+
+ - function: GENERATE_STRING_ARRAY
+ lookup_fields: [server_ip, server_port]
+ output_fields: [server_ip_port]
+
- function: PATH_COMBINE
lookup_fields: [ rtp_pcap_path ]
output_fields: [ rtp_pcap_path ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path ]
+
- function: PATH_COMBINE
lookup_fields: [ http_request_body ]
output_fields: [ http_request_body ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.http_file, http_request_body ]
+ - function: RENAME
+ parameters:
+ rename_fields:
+ current_unix_timestamp_ms: processing_time_ms
+ rename_expression: key = string.replace_all(key,'olap.device_tag.tags','device_tags'); return key;
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [stat_time_minute]
+ parameters:
+ precision: minutes
+
+ dns_table_processor:
+ type: table
+ functions:
+ - function: UNROLL
+ lookup_fields: [ security_rule_id_list ]
+ output_fields: [ security_rule_id ]
+
+ dns_aggregate_processor:
+ type: aggregate
+ group_by_fields: [ decoded_as ]
+ window_type: tumbling_processing_time
+ window_size: 5
+ functions:
+ - function: LONG_COUNT
+ output_fields: [ count ]
+ - function: COLLECT_LIST
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_ip_list ]
+
+
+
sinks:
- print_sink:
+ global_print_sink:
type: print
properties:
format: json
mode: log_warn
+ dns_print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+ http_print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
application:
env:
name: example-inline-to-print
- parallelism: 3
+ parallelism: 1
pipeline:
object-reuse: true
properties:
+ hos.path: http://192.168.44.12:9098/hos
+ hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket
hos.bucket.name.http_file: job_level_traffic_http_file_bucket
hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket
@@ -51,10 +206,25 @@ application:
topology:
- name: inline_source
- downstream: [filter_operator]
- - name: filter_operator
+ downstream: [server_ip_filter]
+ - name: server_ip_filter
downstream: [ projection_processor ]
- name: projection_processor
- downstream: [ print_sink ]
- - name: print_sink
+ downstream: [ global_print_sink, decoded_as_split ]
+ parallelism: 2
+ - name: decoded_as_split
+ tags: [http_tag, dns_tag]
+ downstream: [ http_print_sink, dns_table_processor ]
+ parallelism: 2
+ - name: dns_table_processor
+ downstream: [ dns_aggregate_processor ]
+ parallelism: 2
+ - name: dns_aggregate_processor
+ downstream: [ dns_print_sink ]
+ parallelism: 2
+ - name: global_print_sink
+ downstream: []
+ - name: http_print_sink
+ downstream: []
+ - name: dns_print_sink
downstream: [] \ No newline at end of file