diff options
| author | doufenghu <[email protected]> | 2024-06-15 23:50:43 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-06-15 23:50:43 +0800 |
| commit | 691f7172a5ce463ca565b744d6c68f173427a6ca (patch) | |
| tree | 4c585ae27f13c1a6cb82c80d3bd7b2398733bb49 /groot-examples | |
| parent | 80769f631cfdd66ae5b5f1824a00d12fa2e5e43a (diff) | |
[Improve][docs] Add mock source connector documents.
Diffstat (limited to 'groot-examples')
4 files changed, 133 insertions, 4 deletions
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index 1f236d7..aabf037 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -13,7 +13,7 @@ import java.nio.file.Paths; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/kafka_to_print.yaml"; + String configPath = args.length > 0 ? args[0] : "/examples/mock_to_print.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/mock_to_print.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/mock_to_print.yaml new file mode 100644 index 0000000..1c079a7 --- /dev/null +++ b/groot-examples/end-to-end-example/src/main/resources/examples/mock_to_print.yaml @@ -0,0 +1,122 @@ +sources: # [object] Define connector source + mock_source: + type: mock + properties: + mock.desc.file.path: ./config/template/mock_schema/session_record_mock_desc.json + rows.per.second: 10 + +processing_pipelines: + etl_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ session_id ] + parameters: + data_center_id_num: 2 + - function: EVAL + output_fields: [ ingestion_time ] + parameters: + value_expression: recv_time + + - function: DOMAIN + lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ] + output_fields: [ server_domain ] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + + - function: ASN_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_asn ] + parameters: + kb_name: tsg_ip_asn + option: IP_TO_ASN + + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + kb_name: tsg_ip_asn + option: IP_TO_ASN + + - function: GEOIP_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: client_country + PROVINCE: client_super_administrative_area + CITY: client_administrative_area + + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country + PROVINCE: server_super_administrative_area + CITY: server_administrative_area + + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + + +sinks: + print_sink: + type: print + properties: + mode: log_info + format: json + + kafka_sink: + type: kafka + properties: + topic: SESSION-RECORD + kafka.bootstrap.servers: 192.168.44.12:9094 + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + +application: # [object] Define job configuration + env: + name: mock_to_print + parallelism: 3 + shade.identifier: aes + pipeline: + object-reuse: true + topology: + - name: mock_source + downstream: [ etl_processor ] + - name: etl_processor + downstream: [ print_sink ] + - name: print_sink + downstream: []
\ No newline at end of file diff --git a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml index 1ffda9f..67e1dd6 100644 --- a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml @@ -2,12 +2,12 @@ grootstream: knowledge_base: - name: tsg_ip_asn fs_type: local - fs_path: /Users/darnell/IdeaProjects/groot-stream/groot-core/src/test/resources/ + fs_path: ./config/dat/ files: - - asn.mmdb + - asn_builtin.mmdb - name: tsg_ip_location fs_type: local - fs_path: /Users/darnell/IdeaProjects/groot-stream/groot-core/src/test/resources/ + fs_path: ./config/dat files: - ip_builtin.mmdb properties: diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml index 1fb6212..6184bda 100644 --- a/groot-examples/pom.xml +++ b/groot-examples/pom.xml @@ -42,6 +42,13 @@ <dependency> <groupId>com.geedgenetworks</groupId> + <artifactId>connector-mock</artifactId> + <version>${project.version}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> + <groupId>com.geedgenetworks</groupId> <artifactId>connector-clickhouse</artifactId> <version>${project.version}</version> <scope>${scope}</scope> |
