summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-06-15 23:50:43 +0800
committerdoufenghu <[email protected]>2024-06-15 23:50:43 +0800
commit691f7172a5ce463ca565b744d6c68f173427a6ca (patch)
tree4c585ae27f13c1a6cb82c80d3bd7b2398733bb49
parent80769f631cfdd66ae5b5f1824a00d12fa2e5e43a (diff)
[Improve][docs] Add mock source connector documents.
-rw-r--r--config/dat/asn_builtin.mmdbbin0 -> 8447276 bytes
-rw-r--r--config/dat/ip_builtin.mmdbbin0 -> 30780802 bytes
-rw-r--r--config/template/grootstream_job_debug.yaml5
-rw-r--r--config/template/mock_schema/session_record_mock_desc.json519
-rw-r--r--docs/connector/connector.md192
-rw-r--r--docs/connector/formats/protobuf.md1
-rw-r--r--docs/connector/source/file.md1
-rw-r--r--docs/connector/source/mock.md18
-rw-r--r--docs/grootstream-config.md3
-rw-r--r--docs/processor/udf.md12
-rw-r--r--groot-core/src/test/resources/asn.mmdbbin2316 -> 0 bytes
-rw-r--r--groot-core/src/test/resources/asn_builtin.mmdbbin0 -> 8447276 bytes
-rw-r--r--groot-core/src/test/resources/ip_builtin.mmdbbin4738 -> 30780802 bytes
-rw-r--r--groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java2
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/mock_to_print.yaml122
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/grootstream.yaml6
-rw-r--r--groot-examples/pom.xml7
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java2
18 files changed, 872 insertions, 18 deletions
diff --git a/config/dat/asn_builtin.mmdb b/config/dat/asn_builtin.mmdb
new file mode 100644
index 0000000..050ed8f
--- /dev/null
+++ b/config/dat/asn_builtin.mmdb
Binary files differ
diff --git a/config/dat/ip_builtin.mmdb b/config/dat/ip_builtin.mmdb
new file mode 100644
index 0000000..d870783
--- /dev/null
+++ b/config/dat/ip_builtin.mmdb
Binary files differ
diff --git a/config/template/grootstream_job_debug.yaml b/config/template/grootstream_job_debug.yaml
index f33b2b2..0781a38 100644
--- a/config/template/grootstream_job_debug.yaml
+++ b/config/template/grootstream_job_debug.yaml
@@ -93,13 +93,12 @@ processing_pipelines:
functions: # [array of object] Function List
- function: GEOIP_LOOKUP
lookup_fields: [ server_ip ]
- output_fields: [ server_asn ]
parameters:
kb_name: tsg_ip_location
option: IP_TO_OBJECT
geolocation_field_mapping:
- COUNTRY: client_country_region
- PROVINCE: client_super_admin_area
+ COUNTRY: client_country_region
+ PROVINCE: client_super_admin_area
- function: ASN_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_asn ]
diff --git a/config/template/mock_schema/session_record_mock_desc.json b/config/template/mock_schema/session_record_mock_desc.json
new file mode 100644
index 0000000..a8a5dfa
--- /dev/null
+++ b/config/template/mock_schema/session_record_mock_desc.json
@@ -0,0 +1,519 @@
+[
+ {
+ "name": "device_id",
+ "type": "String",
+ "options": [
+ "9800165603191151",
+ "9800165603247024",
+ "9800165802621377",
+ "9800165603191148"
+ ]
+ },
+ {
+ "name": "sled_ip",
+ "type": "IpV4",
+ "start": "92.168.40.2",
+ "end": "92.168.40.100"
+ },
+ {
+ "name": "tcp_handshake_latency_ms",
+ "type": "Number",
+ "start": 1,
+ "end": 10000
+ },
+ {
+ "name": "out_link_id",
+ "type": "Number",
+ "start": 1,
+ "end": 65535
+ },
+ {
+ "name": "in_link_id",
+ "type": "Number",
+ "start": 1,
+ "end": 65535
+ },
+ {
+ "name": "address_type",
+ "type": "Number",
+ "options": [
+ 4
+ ]
+ },
+ {
+ "name": "vsys_id",
+ "type": "Number",
+ "options": [
+ 1
+ ]
+ },
+ {
+ "name": "flags",
+ "type": "Number",
+ "options": [
+ 1,
+ 8,
+ 24,
+ 16,
+ 72,
+ 4120,
+ 24882,
+ 26254,
+ 26314,
+ 26378,
+ 26444,
+ 27912,
+ 28681,
+ 29010,
+ 32560,
+ 58090,
+ 61064,
+ 91784,
+ 94344,
+ 124104,
+ 124650
+ ]
+ },
+ {
+ "name": "device_group",
+ "type": "String",
+ "options": [
+ "OLAP-MOCK-DG-1",
+ "OLAP-MOCK-DG-2",
+ "OLAP-MOCK-DG-3",
+ "OLAP-MOCK-DG-4"
+ ]
+ },
+ {
+ "name": "in_out_fields",
+ "type": "Union",
+ "unionFields": [
+ {
+ "weight": 10,
+ "fields": [
+ {
+ "name": "direction",
+ "type": "String",
+ "options": [
+ "Outbound"
+ ]
+ },
+ {
+ "name": "client_ip",
+ "type": "IpV4",
+ "start": "103.144.108.1",
+ "end": "103.144.108.255"
+ },
+ {
+ "name": "server_ip",
+ "type": "IpV4",
+ "start": "1.0.0.0",
+ "end": "162.105.10.255"
+ },
+ {
+ "name": "c2s_ttl",
+ "type": "Number",
+ "options": [
+ 63,
+ 127,
+ 128
+ ]
+ },
+ {
+ "name": "s2c_ttl",
+ "type": "Number",
+ "options": [
+ 41,
+ 42,
+ 43,
+ 44,
+ 45,
+ 46,
+ 53,
+ 58,
+ 59,
+ 60,
+ 109,
+ 110,
+ 111
+ ]
+ }
+ ]
+ },
+ {
+ "weight": 5,
+ "fields": [
+ {
+ "name": "direction",
+ "type": "String",
+ "options": [
+ "Inbound"
+ ]
+ },
+ {
+ "name": "client_ip",
+ "type": "IpV4",
+ "start": "1.0.0.0",
+ "end": "162.105.10.255"
+ },
+ {
+ "name": "server_ip",
+ "type": "IpV4",
+ "start": "103.144.108.1",
+ "end": "103.144.108.255"
+ },
+ {
+ "name": "c2s_ttl",
+ "type": "Number",
+ "options": [
+ 41,
+ 42,
+ 43,
+ 44,
+ 45,
+ 46,
+ 53,
+ 58,
+ 59,
+ 60,
+ 109,
+ 110,
+ 111
+ ]
+ },
+ {
+ "name": "s2c_ttl",
+ "type": "Number",
+ "options": [
+ 63,
+ 127,
+ 128
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "client_port",
+ "type": "Number",
+ "start": 50000,
+ "end": 60000
+ },
+ {
+ "name": "server_port",
+ "type": "Number",
+ "start": 22,
+ "end": 30000
+ },
+ {
+ "name": "subscriber_id",
+ "type": "Expression",
+ "expression": "#{Name.name}",
+ "nullRate": 0.5
+ },
+ {
+ "name": "phone_number",
+ "type": "Expression",
+ "expression": "#{PhoneNumber.phoneNumberInternational}",
+ "nullRate": 0.5
+ },
+ {
+ "name": "treatment_as_fields",
+ "type": "Union",
+ "unionFields": [
+ {
+ "weight": 10,
+ "fields": [
+ {
+ "name": "security_action",
+ "type": "String",
+ "options": ["Deny", "Allow"]
+ }, {
+ "name": "security_rule_list",
+ "type": "Number",
+ "array": true,
+ "start": 1,
+ "end": 100,
+ "arrayLenMin":1,
+ "arrayLenMax":3
+ }
+ ]
+ }, {
+ "weight": 20,
+ "fields": [
+ {
+ "name": "monitor_rule_list",
+ "type": "Number",
+ "array": true,
+ "start": 101,
+ "end": 200,
+ "arrayLenMin":1,
+ "arrayLenMax":3
+ }
+ ]
+ },
+ {
+ "weight": 1,
+ "fields": [
+ {
+ "name": "proxy_action",
+ "type": "String",
+ "options": ["Intercept", "No Intercept"]
+ }, {
+ "name": "proxy_rule_list",
+ "type": "Number",
+ "array": true,
+ "start": 201,
+ "end": 300,
+ "arrayLenMin":1,
+ "arrayLenMax":3
+ }
+ ]
+ },
+ {
+ "weight": 70,
+ "fields": [
+ ]
+ }
+ ]
+ },
+ {
+ "name": "decoded_as_fields",
+ "type": "Union",
+ "unionFields": [
+ {
+ "weight": 4,
+ "fields": [
+ {
+ "name": "decoded_as",
+ "type": "String",
+ "options": ["BASE"]
+ },
+ {
+ "name": "app",
+ "type": "String",
+ "options": [
+ "ntp",
+ "stun",
+ "unknown",
+ "teredo",
+ "qq_web_qq",
+ "kugou",
+ "quic"
+ ]
+ },
+ {
+ "name": "ip_protocol",
+ "type": "String",
+ "options": [
+ "tcp",
+ "udp"
+ ]
+ }
+ ]
+ },
+ {
+ "weight": 3,
+ "fields": [
+ {
+ "name": "decoded_as",
+ "type": "String",
+ "options": [
+ "SSL"
+ ]
+ },
+ {
+ "name": "ssl_sni",
+ "type": "Expression",
+ "expression": "#{internet.domainName}",
+ "nullRate": 0.1
+ },
+ {
+ "name": "server_port",
+ "type": "Number",
+ "options": [
+ 443
+ ]
+ },
+ {
+ "name": "app",
+ "type": "String",
+ "options": [
+ "https",
+ "ssl",
+ "wps_office",
+ "microsoft",
+ "http2",
+ "anydesk",
+ "meituan",
+ "google",
+ "huawei",
+ "taobao"
+ ]
+ },
+ {
+ "name": "ip_protocol",
+ "type": "String",
+ "options": [
+ "tcp",
+ "udp"
+ ]
+ }
+ ]
+ },
+ {
+ "weight": 1,
+ "fields": [
+ {
+ "name": "decoded_as",
+ "type": "String",
+ "options": [
+ "HTTP"
+ ]
+ },
+ {
+ "name": "http_host",
+ "type": "Expression",
+ "expression": "#{internet.url}",
+ "nullRate": 0.1
+ },
+ {
+ "name": "http_status_code",
+ "type": "Number",
+ "options": [
+ 200,
+ 201,
+ 202
+ ]
+ },
+ {
+ "name": "http_user_agent",
+ "type": "Expression",
+ "expression": "#{internet.botUserAgentAny}"
+ },
+ {
+ "name": "server_port",
+ "type": "Number",
+ "options": [
+ 80,
+ 8009,
+ 8888,
+ 18080
+ ]
+ },
+ {
+ "name": "app",
+ "type": "String",
+ "options": [
+ "http",
+ "qq_web",
+ "wps_office",
+ "zhihu",
+ "bilibili",
+ "google",
+ "wecom"
+ ]
+ },
+ {
+ "name": "ip_protocol",
+ "type": "String",
+ "options": [
+ "tcp"
+ ]
+ }
+ ]
+ },
+ {
+ "weight": 1,
+ "fields": [
+ {
+ "name": "decoded_as",
+ "type": "String",
+ "options": [
+ "MAIL"
+ ]
+ },
+ {
+ "name": "mail_account",
+ "type": "Expression",
+ "expression": "#{internet.emailAddress}",
+ "nullRate": 0.7
+ },
+ {
+ "name": "server_port",
+ "type": "Number",
+ "options": [
+ 25,
+ 143,
+ 110
+ ]
+ },
+ {
+ "name": "app",
+ "type": "String",
+ "options": [
+ "imap",
+ "pop3",
+ "smtp"
+ ]
+ },
+ {
+ "name": "ip_protocol",
+ "type": "String",
+ "options": [
+ "tcp"
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "sent_pkts",
+ "type": "Number",
+ "start": 1,
+ "end": 150
+ },
+ {
+ "name": "received_pkts",
+ "type": "Number",
+ "start": 1,
+ "end": 100
+ },
+ {
+ "name": "sent_bytes",
+ "type": "Number",
+ "start": 1,
+ "end": 15000
+ },
+ {
+ "name": "received_bytes",
+ "type": "Number",
+ "start": 1,
+ "end": 60000
+ },
+ {
+ "name": "start_timestamp_ms",
+ "type": "Timestamp",
+ "unit":"millis"
+ },
+ {
+ "name": "end_timestamp_ms",
+ "type": "Timestamp",
+ "unit":"millis"
+ },
+ {
+ "name": "duration_ms",
+ "type": "Number",
+ "start": 1,
+ "end": 10000
+ },
+ {
+ "name": "tcp_rtt_ms",
+ "type": "Number",
+ "start": 1,
+ "end": 20
+}
+
+] \ No newline at end of file
diff --git a/docs/connector/connector.md b/docs/connector/connector.md
index 7031bed..71e416c 100644
--- a/docs/connector/connector.md
+++ b/docs/connector/connector.md
@@ -51,6 +51,7 @@ schema:
### Local File
To retrieve the schema from a local file using its absolute path.
+> Ensure that the file path is accessible to all nodes in your Flink cluster.
```yaml
schema:
# by array
@@ -66,9 +67,198 @@ schema:
fields:
url: "https://localhost:8080/schema.json"
```
+## Mock Data Type
+The mock data type is used to define the template of the mock data.
+
+| Mock Type | Parameter | Result Type | Default | Description |
+|-----------------------------------------|-------------|-----------------------|---------------------|-------------------------------------------------------------------------------------------------------------------------|
+| **[Number](#Number)** | - | **int/bigint/double** | - | **Randomly generate a number.** |
+| - | start | number | 0 | The minimum value (inclusive). |
+| - | end | number | int32.max | The maximum value (exclusive). |
+| - | options | array of number | (none) | The optional values. If set, the random value will be selected from the options and `start` and `end` will be ignored. |
+| - | random | boolean | true | Default is random mode. If set to false, the value will be generated in order. |
+| **[Sequence](#Sequence)** | - | **bigint** | - | **Generate a sequence number based on a specific step value.** |
+| - | start | bigint | 0 | The first number in the sequence (inclusive). |
+| - | step | bigint | 1 | The number to add to each subsequent value. |
+| **[UniqueSequence](#UniqueSequence)** | - | **bigint** | - | **Generate a global unique sequence number.** |
+| - | start | bigint | 0 | The first number in the sequence (inclusive). |
+| **[String](#String)** | - | string | - | **Randomly generate a string.** |
+| - | regex | string | [a-zA-Z]{0,5} | The regular expression. |
+| - | options | array of string | (none) | The optional values. If set, the random value will be selected from the options and `regex` will be ignored. |
+| - | random | boolean | true | Default is random mode. If set to false, the options value will be generated in order. |
+| **[Timestamp](#Timestamp)** | - | **bigint** | - | **Generate a unix timestamp in milliseconds or seconds.** |
+| - | unit | string | second | The unit of the timestamp. The optional values are `second`, `millis`. |
+| **[FormatTimestamp](#FormatTimestamp)** | - | **string** | - | **Generate a formatted timestamp.** |
+| - | format | string | yyyy-MM-dd HH:mm:ss | The format to output. |
+| - | utc | boolean | false | Default is local time. If set to true, the time will be converted to UTC time. |
+| **[IPv4](#IPv4)** | - | **string** | - | **Randomly generate an IPv4 address.** |
+| - | start | string | 0.0.0.0 | The minimum value of the IPv4 address (inclusive). |
+| - | end | string | 255.255.255.255 | The maximum value of the IPv4 address (inclusive). |
+| **[Expression](#Expression)** | - | string | - | **Use library [Datafaker](https://www.datafaker.net/documentation/expressions/) expressions to generate fake data.** |
+| - | expression | string | (none) | The datafaker expression used #{expression}. |
+| **[Object](#Object)** | - | **struct/object** | - | **Generate an object data structure. It is used to define the nested structure of the mock data.** |
+| - | fields | array of object | (none) | The fields of the object. |
+| **[Union](#Union)** | - | - | - | **Generate a union data structure with multiple mock data type fields.** |
+| - | unionFields | array of object | (none) | The fields of the object. |
+| - | - fields | - array of object | (none) | |
+| - | - weight | - int | 0 | The weight of the generated object. |
+| | random | boolean | true | Default is random mode. If set to false, the options value will be generated in order. |
+
+### Common Parameters
+
+Mock data type supports some common parameters.
+
+| Parameter | Type | Default | Description |
+|---------------------|---------|---------|----------------------------------------------------------------------------------------|
+| [nullRate](#String) | double | 0 | Null value rate. The value range is [0, 1]. If set to 0.1, the null value rate is 10%. |
+| [array](#String) | boolean | false | Array flag. If set to true, the value will be generated as an array. |
+| arrayLenMin | int | 0 | The minimum length of the array (inclusive). `array` flag must be set to true. |
+| arrayLenMax | int | 5 | The maximum length of the array (inclusive). `array` flag must be set to true. |
+
+
+### Number
+- Randomly generate an integer number between 0 and 10000.
+```json
+{"name":"int_random","type":"Number","start":0,"end":10000}
+```
+- Generate an integer number between 0 and 10000, and the value will be generated in order.
+```json
+{"name":"int_inc","type":"Number","start":0,"end":10000,"random":false}
+```
+- Randomly generate an integer number from 20, 22, 25, 30.
+```json
+{"name":"int_options","type":"Number","options":[20,22,25,30]}
+```
+- Randomly generate a double number between 0 and 10000.
+```json
+{"name":"double_random","type":"Number","start":0.0,"end":10000.0}
+```
+### Sequence
+
+- Generate a sequence number starting from 0 and incrementing by 2.
+```json
+{"name":"bigint_sequence","type":"Sequence","start":0,"step":2}
+```
+### UniqueSequence
+
+- Generate a global unique sequence number starting from 0.
+```json
+{"name":"id","type":"UniqueSequence","start":0}
+```
+
+### String
+
+- Randomly generate a string with a length between 5 and 10, with a null value rate of 10%.
+```json
+{"name":"str_regex","type":"String","regex":"[a-z]{5,10}","nullRate":0.1}
+```
+- Randomly generate a string from "a", "b", "c", "d".
+```json
+{"name":"str_options","type":"String","options":["a","b","c","d"]}
+```
+- Randomly generate an array of strings. The length of the array is between 1 and 3.
+```json
+{"name":"array_str","type":"String","regex":"[a-z]{5,10}","array":true,"arrayLenMin":1,"arrayLenMax":3}
+```
+
+### Timestamp
+
+- Generate a current Unix timestamp in milliseconds.
+```json
+{"name":"timestamp_ms","type":"Timestamp","unit":"millis"}
+```
+### FormatTimestamp
+
+- Generate a formatted timestamp string using format `yyyy-MM-dd HH:mm:ss`.
+```json
+{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss"}
+```
+- Generate a formatted timestamp string using format `yyyy-MM-dd HH:mm:ss.SSS`.
+```json
+{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss.SSS"}
+```
+
+### IPv4
+- Generate an IPv4 address between 192.168.20.1 and 192.168.20.255.
+```json
+{"name":"ip","type":"IpV4","start":"192.168.20.1","end":"192.168.20.255"}
+```
+
+### Expression
+
+- Generate a fake email address.
+```json
+{"name":"emailAddress","type":"Expression","expression":"#{internet.emailAddress}"}
+```
+- Generate a fake domain name.
+```json
+{"name":"domain","type":"Expression","expression":"#{internet.domainName}"}
+```
+- Generate a fake IPv6 address.
+```json
+{"name":"ipv6","type":"Expression","expression":"#{internet.ipV6Address}"}
+```
+- Generate a fake phone number.
+```json
+{"name":"phoneNumber","type":"Expression","expression":"#{phoneNumber.phoneNumber}"}
+```
+### Object
+
+- Generate an object data structure.
+```json
+{"name":"object","type":"Object","fields":[{"name":"str","type":"String","regex":"[a-z]{5,10}","nullRate":0.1},{"name":"cate","type":"String","options":["a","b","c"]}]}
+```
+output:
+```json
+{"object": {"str":"abcde","cate":"a"}}
+```
+
+### Union
+- Generate a union of mock data type fields, here `object_id` and `item_id`. When object_id is 10, item_id is taken in order from 1, 2, 3, 4, 5. When object_id is 20, item_id is taken in order from 6, 7. The first object accounts for 5/7 of the generated rows, and the second object accounts for 2/7.
+```json
+{
+ "name": "unionFields",
+ "type": "Union",
+ "random": false,
+ "unionFields": [
+ {
+ "weight": 5,
+ "fields": [
+ {
+ "name": "object_id",
+ "type": "Number",
+ "options": [10]
+ },
+ {
+ "name": "item_id",
+ "type": "Number",
+ "options": [1, 2, 3, 4, 5],
+ "random": false
+ }
+ ]
+ },
+ {
+ "weight": 2,
+ "fields": [
+ {
+ "name": "object_id",
+ "type": "Number",
+ "options": [20]
+ },
+ {
+ "name": "item_id",
+ "type": "Number",
+ "options": [6, 7],
+ "random": false
+ }
+ ]
+ }
+ ]
+}
+```
# Sink Connector
-Sink Connector contains some common core features, and each sink connector supports them to varying degrees.
+The Sink Connector contains some common core features, and each sink connector supports these features to varying degrees.
## Common Sink Options
diff --git a/docs/connector/formats/protobuf.md b/docs/connector/formats/protobuf.md
index e55f6f1..18f86c8 100644
--- a/docs/connector/formats/protobuf.md
+++ b/docs/connector/formats/protobuf.md
@@ -9,6 +9,7 @@ It is very popular in Streaming Data Pipeline. Now support protobuf format in so
| Format Protobuf | Universal | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-protobuf/) |
## Format Options
+> Ensure that the file path is accessible to all nodes in your Flink cluster.
| Name | Type | Required | Default | Description |
|-------------------------------|----------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
diff --git a/docs/connector/source/file.md b/docs/connector/source/file.md
index f92ab84..edb4aab 100644
--- a/docs/connector/source/file.md
+++ b/docs/connector/source/file.md
@@ -23,6 +23,7 @@ File source custom properties.
## Example
This example read data of file test source and print to console.
+> Ensure that the file path is accessible to all nodes in your Flink cluster.
```yaml
sources:
diff --git a/docs/connector/source/mock.md b/docs/connector/source/mock.md
index 42894c5..dfd10d9 100644
--- a/docs/connector/source/mock.md
+++ b/docs/connector/source/mock.md
@@ -4,22 +4,22 @@
## Description
-Mock source connector is used to generate data. It is useful for testing.
+The Mock source connector is used to randomly generate rows of data according to a user-defined schema. This connector helps you test the functionality of your system without relying on real data.
## Source Options
-File source custom properties.
+Mock source custom properties.
-| Name | Type | Required | Default | Description |
-|---------------------|---------|----------|---------|------------------------------------------------------------------------------------------------|
-| mock.desc.file.path | String | Yes | (none) | mock schema file path. |
-| rows.per.second | Integer | No | 1000 | Rows per second to control the emit rate. |
-| number.of.rows | Long | No | -1 | Total number of rows to emit. By default, the source is unbounded. |
-| millis.per.row | Long | No | 0 | Millis per row to control the emit rate. If greater than 0, rows.per.second is not effective. |
+| Name | Type | Required | Default | Description |
+|---------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------|
+| mock.desc.file.path | String | Yes | (none) | The path of the mock data structure file. |
+| rows.per.second | Integer | No | 1000 | The number of rows generated per second. |
+| number.of.rows | Long | No | -1 | The total number of rows to generate. By default, the source is unbounded. |
+| millis.per.row | Long | No | 0 | The interval (in milliseconds) between rows. If greater than 0, `rows.per.second` will be ignored. |
## Example
-This example mock source and print to console.
+This example randomly generates data according to the schema defined in `mock_example.json` and outputs it to the console. For more details on how to declare mock data types, click [here](../connector.md#mock-data-type).
```yaml
sources:
diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md
index c278658..8ccb055 100644
--- a/docs/grootstream-config.md
+++ b/docs/grootstream-config.md
@@ -29,6 +29,9 @@ The knowledge base is a collection of libraries that can be used in the groot-st
| files | Array | No | (none) | The file list of the knowledge base object. |
### Define the knowledge base file from a local file
+
+> Ensure that the file path is accessible to all nodes in your Flink cluster.
+
```yaml
grootstream:
knowledge_base:
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index 74fa2d0..c81b705 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -310,6 +310,18 @@ Example:
kb_name: tsg_ip_location
option: IP_TO_DETAIL
```
+```yaml
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+```
### JSON Extract
JSON extract function is used to extract the value from json string.
diff --git a/groot-core/src/test/resources/asn.mmdb b/groot-core/src/test/resources/asn.mmdb
deleted file mode 100644
index a20e238..0000000
--- a/groot-core/src/test/resources/asn.mmdb
+++ /dev/null
Binary files differ
diff --git a/groot-core/src/test/resources/asn_builtin.mmdb b/groot-core/src/test/resources/asn_builtin.mmdb
new file mode 100644
index 0000000..050ed8f
--- /dev/null
+++ b/groot-core/src/test/resources/asn_builtin.mmdb
Binary files differ
diff --git a/groot-core/src/test/resources/ip_builtin.mmdb b/groot-core/src/test/resources/ip_builtin.mmdb
index b333c64..d870783 100644
--- a/groot-core/src/test/resources/ip_builtin.mmdb
+++ b/groot-core/src/test/resources/ip_builtin.mmdb
Binary files differ
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 1f236d7..aabf037 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -13,7 +13,7 @@ import java.nio.file.Paths;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/kafka_to_print.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/mock_to_print.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/mock_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/mock_to_print.yaml
new file mode 100644
index 0000000..1c079a7
--- /dev/null
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/mock_to_print.yaml
@@ -0,0 +1,122 @@
+sources: # [object] Define connector source
+ mock_source:
+ type: mock
+ properties:
+ mock.desc.file.path: ./config/template/mock_schema/session_record_mock_desc.json
+ rows.per.second: 10
+
+processing_pipelines:
+ etl_processor:
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ functions:
+ - function: SNOWFLAKE_ID
+ lookup_fields: ['']
+ output_fields: [log_id]
+ parameters:
+ data_center_id_num: 1
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ '' ]
+ output_fields: [ session_id ]
+ parameters:
+ data_center_id_num: 2
+ - function: EVAL
+ output_fields: [ ingestion_time ]
+ parameters:
+ value_expression: recv_time
+
+ - function: DOMAIN
+ lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: client_country
+ PROVINCE: client_super_administrative_area
+ CITY: client_administrative_area
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+
+
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ processing_time ]
+ parameters:
+ precision: seconds
+
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ mode: log_info
+ format: json
+
+ kafka_sink:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD
+ kafka.bootstrap.servers: 192.168.44.12:9094
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+ format: json
+ json.ignore.parse.errors: false
+ log.failures.only: true
+
+
+application: # [object] Define job configuration
+ env:
+ name: mock_to_print
+ parallelism: 3
+ shade.identifier: aes
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: mock_source
+ downstream: [ etl_processor ]
+ - name: etl_processor
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: [] \ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml
index 1ffda9f..67e1dd6 100644
--- a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml
@@ -2,12 +2,12 @@ grootstream:
knowledge_base:
- name: tsg_ip_asn
fs_type: local
- fs_path: /Users/darnell/IdeaProjects/groot-stream/groot-core/src/test/resources/
+ fs_path: ./config/dat/
files:
- - asn.mmdb
+ - asn_builtin.mmdb
- name: tsg_ip_location
fs_type: local
- fs_path: /Users/darnell/IdeaProjects/groot-stream/groot-core/src/test/resources/
+ fs_path: ./config/dat
files:
- ip_builtin.mmdb
properties:
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
index 1fb6212..6184bda 100644
--- a/groot-examples/pom.xml
+++ b/groot-examples/pom.xml
@@ -42,6 +42,13 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-mock</artifactId>
+ <version>${project.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
<artifactId>connector-clickhouse</artifactId>
<version>${project.version}</version>
<scope>${scope}</scope>
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index 29154e5..a9a730e 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -60,7 +60,7 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
copyGrootStreamStarterToContainer(jobManager);
copyGrootStreamStarterLoggingToContainer(jobManager);
- jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8081, 8084)));
+ jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8084, 8081)));
taskManager =
new GenericContainer<>(dockerImage)