# Introduction
GrootStream is a real-time ETL platform based on the concepts of flow-based programming. It provides a template for quickly building a data flow job from sources, filters, processing pipelines, and sinks.
The config template file is written in `yaml`. For more details on this format, refer to [YAML-GUIDE](https://yaml.org/spec/1.2/spec.html).
# Job Config
## Config file structure
```yaml
sources:
  inline_source:
    type: inline
    schema:
      fields:
        - name: log_id
          type: bigint
        - name: recv_time
          type: bigint
        - name: fqdn_string
          type: string
        - name: client_ip
          type: string
        - name: server_ip
          type: string
        - name: decoded_as
          type: string
    properties:
      data: '{"log_id": 1, "recv_time":"111","fqdn_string":"baidu.com", "client_ip":"192.168.0.1","server_ip":"120.233.20.242","decoded_as":"BASE", "dup_traffic_flag":1}'
      format: json
      json.ignore.parse.errors: false
filters:
  http_filter:
    type: com.geedgenetworks.core.filter.AviatorFilter
    properties:
      expression: event.decoded_as == 'BASE'
preprocessing_pipelines:
  preprocessor:
    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
    functions:
      - function: EVAL
        output_fields: ['additional_field_subdomain']
        parameters:
          value_expression: fqdn_string
processing_pipelines:
  processor:
    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
    remove_fields: [log_id]
    output_fields: []
    functions:
      - function: DROP
        lookup_fields: []
        output_fields: []
        filter: event.client_ip == '192.168.10.100'
      - function: SNOWFLAKE_ID
        lookup_fields: []
        output_fields: [ log_id ]
postprocessing_pipelines:
  postprocessor:
    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
    remove_fields: [dup_traffic_flag]
sinks:
  print_sink:
    type: print
    properties:
      format: json
application:
  env:
    name: inline-to-print-job
    parallelism: 3
    pipeline:
      object-reuse: true
  topology:
    - name: inline_source
      parallelism: 1
      downstream: [http_filter]
    - name: http_filter
      downstream: [preprocessor]
    - name: preprocessor
      downstream: [processor]
    - name: processor
      downstream: [postprocessor]
    - name: postprocessor
      downstream: [ print_sink ]
    - name: print_sink
      parallelism: 1
      downstream: []
```
## Schema Structure
Some sources do not enforce a fixed schema, so you need to use `fields` to define the name and type of each field. Sources such as `Kafka` and `Inline` let you customize the schema in this way.
```yaml
schema:
  fields:
    - name: log_id
      type: bigint
    - name: recv_time
      type: bigint
    - name: fqdn_string
      type: string
    - name: client_ip
      type: string
    - name: server_ip
      type: string
    - name: decoded_as
      type: string
```
`name` is the name of the field and `type` is the data type of the field. The supported data types are:

| Data type | Value type in Java | Description |
|:----------|:--------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| string | `java.lang.String` | string |
| boolean | `java.lang.Boolean` | boolean |
| int | `java.lang.Integer` | All numbers from -2,147,483,648 to 2,147,483,647 are allowed. |
| bigint | `java.lang.Long` | All numbers between -9,223,372,036,854,775,808 and 9,223,372,036,854,775,807 are allowed. |
| float     | `java.lang.Float`               | Single-precision floating point, approximately -3.40E+38 to 3.40E+38.                                                                                                  |
| double    | `java.lang.Double`              | Double-precision floating point, approximately -1.79E+308 to 1.79E+308. Suitable for most decimal values.                                                              |
| binary    | `byte[]`                        | Raw bytes.                                                                                                                                                             |
| struct    | `java.util.Map<String, Object>` | A map of keys to values. Values can be of any type. Example: struct<id:int, client_ip:string, data:struct<id:int, name:string>>.                                      |
| array     | `List<Object>`                  | A collection of elements. Elements can be of any type. Example: array<int>, array<struct<id:int, client_ip:string>>.                                                  |
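
As a minimal sketch of how the composite types in the table might be declared, the schema below mixes scalar fields with `array` and `struct` type strings. The field names `port_list` and `endpoint` are hypothetical, and the type strings simply copy the examples from the table. Quoting them is a YAML precaution, not a documented requirement.

```yaml
schema:
  fields:
    - name: log_id
      type: bigint
    - name: client_ip
      type: string
    # Hypothetical field: a list of integers, using the array<int> form from the table.
    - name: port_list
      type: 'array<int>'
    # Hypothetical field: a nested record, using the struct<...> form from the table.
    - name: endpoint
      type: 'struct<id:int, client_ip:string>'
```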
## Sources
A source defines where GrootStream ingests data from. Multiple sources can be defined in a job. The supported sources are listed in [Source Connectors](connector/source). Each source has its own parameters that define how data is fetched, and GrootStream reads the properties that each source uses, such as the `topic` and `kafka.bootstrap.servers` of the `Kafka` source.
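
The following sketch shows what a `Kafka` source definition could look like, modeled on the `inline` source in the job config above. The source name `kafka_source`, the type identifier `kafka`, the placement of `topic` and `kafka.bootstrap.servers` under `properties`, and all values are assumptions for illustration; check [Source Connectors](connector/source) for the actual parameters.

```yaml
sources:
  kafka_source:                                 # hypothetical source name
    type: kafka                                 # assumed type identifier, by analogy with `inline`
    schema:
      fields:
        - name: log_id
          type: bigint
        - name: client_ip
          type: string
    properties:
      topic: example-input-topic                # placeholder topic name
      kafka.bootstrap.servers: localhost:9092   # placeholder broker address
      format: json
```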
## Filters
A filter defines the conditions for filtering data. Multiple filters can be defined in a job. The supported filters are listed in [Filters](filter). Each filter has its own parameters that define how data is filtered, and GrootStream reads the properties that each filter uses, such as the `expression` of the `Aviator` filter.
An event is passed downstream if the filter expression evaluates to true; otherwise it is dropped.
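
As a hedged illustration of the pass-or-drop behavior, the filter below combines two conditions on fields from the example schema. The filter name `base_traffic_filter` is hypothetical, and the use of `&&` and `!=` assumes that the filter accepts standard Aviator expression operators.

```yaml
filters:
  base_traffic_filter:            # hypothetical filter name
    type: com.geedgenetworks.core.filter.AviatorFilter
    properties:
      # Events for which the expression is true are passed downstream; all others are dropped.
      expression: event.decoded_as == 'BASE' && event.client_ip != '192.168.0.1'
```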
## Processing Pipelines
Processing pipelines define the event processing logic of the job. By functionality, processors are categorized as stateless or stateful. By processing order, pipelines are categorized as pre-processing, processing, and post-processing pipelines. Each processor can assemble `UDFs` (user-defined functions) into a pipeline. Processors are described in detail in [Processor](processor).
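
To make the idea of assembling `UDFs` concrete, here is a small sketch of a single processor that chains two of the functions used in the job config above. The processor name `example_processor` and the output field `fqdn_copy` are hypothetical, and the sketch assumes the functions are applied in the order they are listed.

```yaml
processing_pipelines:
  example_processor:              # hypothetical processor name
    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
    functions:
      # Copy fqdn_string into a new field (same EVAL usage as in the job config above).
      - function: EVAL
        output_fields: [fqdn_copy]          # hypothetical output field
        parameters:
          value_expression: fqdn_string
      # Drop events that come from one specific client IP.
      - function: DROP
        lookup_fields: []
        output_fields: []
        filter: event.client_ip == '192.168.10.100'
```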
## Sinks
A sink defines where GrootStream outputs data to. Multiple sinks can be defined in a job. The supported sinks are listed in [Sink Connectors](connector/sink). Each sink has its own parameters that define how data is written, and GrootStream reads the properties that each sink uses, such as the `topic` and `kafka.bootstrap.servers` of the `Kafka` sink.
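
A `Kafka` sink could be sketched along the same lines as the `print` sink in the job config above. The sink name `kafka_sink`, the type identifier `kafka`, the placement of the two Kafka settings under `properties`, and the values are assumptions; see [Sink Connectors](connector/sink) for the actual parameters.

```yaml
sinks:
  kafka_sink:                                   # hypothetical sink name
    type: kafka                                 # assumed type identifier, by analogy with `print`
    properties:
      topic: example-output-topic               # placeholder topic name
      kafka.bootstrap.servers: localhost:9092   # placeholder broker address
      format: json
```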
## Application
The application section defines common parameters of the job, such as its name and parallelism, and the topology of the job. The following configuration parameters are supported.
### ENV
Defines the job environment configuration. For more details, refer to [JobEnvConfig](./env-config.md).
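
The sketch below puts `env` and `topology` together for a three-node job. The names `my_source`, `my_filter`, and `my_sink` are hypothetical and are assumed to match components defined under `sources`, `filters`, and `sinks` in the same file, in the way the topology in the job config above refers to its components by name.

```yaml
application:
  env:
    name: example-job             # placeholder job name
    parallelism: 2                # default parallelism for the job
  topology:
    # Each entry names a component and lists the components that receive its output.
    - name: my_source
      parallelism: 1              # per-node setting, as used for inline_source in the job config above
      downstream: [my_filter]
    - name: my_filter
      downstream: [my_sink]
    - name: my_sink
      downstream: []              # a sink has no downstream
```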
# Command
## Run a job by CLI
```bash
Usage: start.sh [options]
Options:
    --check                          Whether check config (default: false)
    --encrypt                        Show encrypted config file. If config file is plain text, it will be encrypted and printed to the console.
    --decrypt                        Show decrypted config file. If config file is encrypted, it will be decrypted and printed to the console.
    -c, --config <config file>       Config file path, must be specified
    -e, --deploy-mode <deploy mode>  Deploy mode, only support [run] (default: run)
    --target <target>                Submitted target type, support [local, remote, yarn-session, yarn-per-job]
    -n, --name <name>                Job name (default: groot-stream-job)
    -i, --variable <variable>        User-defined parameters, e.g. -i key=value (default: [])
    -h, --help                       Show help message
    -v, --version                    Show version message
```