1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
|
# Groot Stream 设计方案
# 目录
- [概述](#概述)
- [系统架构](#系统架构)
- [全局配置 grootstream.yaml](#全局配置-grootstreamyaml)
- [任务配置](#任务配置)
- [接入数据源(Sources)](#接入数据源sources)
- [Source 公共配置](#source-公共配置)
- [Schema配置](#schema配置)
- [Fields](#fields)
- [Local File](#local-file)
- [URL](#url)
- [Kafka Source](#kafka-source)
- [IPFIX Collector(UDP)](#ipfix-collectorudp)
- [File Source](#file-source)
- [Mock Source](#mock-source)
- [Inline Source](#inline-source)
- [过滤器(Filters)](#过滤器filters)
- [分流器(Splits)](#分流器splits)
- [任务处理器 (Processors)](#任务处理器-processors)
- [Projection Processor](#projection-processor)
- [Aggregate Processor](#aggregate-processor)
- [Table Processor](#table-processor)
- [输出Sinks](#输出sinks)
- [Kafka Sink](#kafka-sink)
- [ClickHouse Sink](#clickhouse-sink)
- [Print Sink](#print-sink)
- [Formats](#formats)
- [JSON](#json)
- [MessagePack](#messagepack)
- [Protobuf](#protobuf)
- [Raw](#raw)
- [任务编排](#任务编排)
- [函数定义](#函数定义)
- [内置UDF](#内置udf)
- [标量函数](#标量函数)
- [聚合函数](#聚合函数)
- [表格函数](#表格函数)
- [CN扩展UDF](#cn扩展udf)
- [实现原则](#实现原则)
- [相关问题](#相关问题)
# 概述
Groot Stream 是一个实时数据流处理平台,提供了灵活的数据定制管道,能够高效的从多种数据源收集数据,并对其进行加工和转换。具体包括过滤、解析、重组和数据聚合,以便更好的处理和管理数据。
主要优势:
- 实时数据处理:利用Flink作为底层引擎,可以针对大规模实时数据流提供高吞吐、低延迟的实时处理能力。
- 插件化管理:可自定义Functions, Packs, Sources 和Sinks,用于满足不同应用场景下的数据流定制需求。
- 降低开发成本:通过YML模版定制数据处理拓扑,无需编写代码快速实现ETL需求。替代现有Real-time Log Streaming ,Data Transporter ETL 和Gohangout数据加载模块。
应用场景:
- 数据汇聚场景
- 构建QuickConnect拓扑,各个分中心数据被集中汇聚到国家中心。
- 数据流定制
- 会话日志经过预处理后发给不同的系统或第三方厂商。
- 定义Filter 匹配符合条件的日志,然后预处理Pipeline对日志进行反序列化,增加处理时间,抽取域名等操作。
- Router A 经过 TSG Projection处理器,执行ID-Mapping映射Subscriber ID,发送到TSG系统中。
- Router B 经过CN Projection处理器,增加IoC标签库映射字段,删除不需要的字段,发送到CN系统中。
- Router C 经过第三方厂商 Projection处理器,过滤SSL、HTTP 日志,抽取部分字段发送到第三方厂商中。
- 将会话日志按应用层协议分流,分发到不同Topic中。
- 过滤匹配SSL日志,分发到SSL Topic。
- 过滤匹配邮件日志,分发到Email Topic。
- 数据聚合
# 系统架构

- **Sources**
- 接收多种数据源或收集器的连续数据输入, 包括Kafka、IPFIX Collector 或UDP 等。
- 配置参数包括基础配置和Source配置。例如Type 为Kafka,则需要增加Source参数kafka.bootstrap.servers, topics和kafka.consumer.group.id 等。
- **Filters**
- 对数据源中的日志进行筛选和过滤,缩小处理日志的范围。
- 通过定义过滤表达式,指定数据中某些属性、条件或规则,基于该表达式匹配符合条件的数据。例如:common_c2s_bytes_num <= 2147483647 && common_s2c_bytes_num<= 2147483647 ,过滤掉不符合Integer取值范围的数据。
- **QuickConnect**
- 基于最小化配置,快速构建Sources和Sinks之间的数据管道,可用于原型、测试或跨域数据汇聚。
- 通过在管道中插入Processors 或Pack。
- **Pipelines**
- 在数据流的不同处理阶段可以引用不同类型的Pipelines,所有Pipelines(一系列Functions组成)架构和内部结构一致,只分为Projection和Aggregate两种类型。按Pipeline所在数据流的位置可分为:
- **Pre-processing Pipelines :可选,**前处理数据管道对输入日志进行格式化或执行一系列全局处理函数(例如:从原始日志中提取感兴趣的字段)。
- **Processing Pipelines:**业务处理管道
- **Post-processing Pipelines ,可选,**后处理数据管道,发送到目的地之前对日志进行格式化或执行一系列全局处理函数(例如:对输出的日志进行格式验证、类型转换)
- 数据流处理基本单元为处理器,按功能分为无状态和有状态处理器。每个处理器可以连接多个函数,组成一个Pipeline。
- 投影处理器(Projection Processor):针对每条日志选择所需的列或属性。它属于无状态处理器,期间会严格按照处理器定义的函数(UDFs)顺序执行。例如:获取顶级域名,字符串转换、类型转换或反序列化等运算符函数组成一个Pipeline。
- 聚合处理器(Aggregate Processor):多条日志进行分组聚合统计。它属于有状态处理器,期间可经过一系列自定义聚合函数(UDAFs)。例如:计算不同IP的总带宽,不同域名总会话数等聚合函数组成一个Pipeline。
- 表格处理器(Table Processor):一条日志展开为多条输出。它属于无状态处理器,期间可经过一系列自定义聚合函数(UDTFs)。例如:将某个JSON格式的属性展开为多条,其他属性复制,将多条日志输出。
- **Sinks**
- 发送数据到多个目的地, 具体包括Kafka、HBase 或 Mysql 等。
- 每种Sink包括基础配置和Sink配置。例如Type 为Kafka,则需要Sink参数Kafka.bootstrap.servers, kafka.topic和kafka.producer.ack 等。
- **Packs**
- 复杂业务逻辑处理器,一般应用于无法通过函数实现的场景。例如:动态知识库加载及动态schema的数据序列化。
# 全局配置 grootstream.yaml
```yaml
grootstream:
# 知识库配置
knowledge_base:
- name: tsg_ip_asn # 知识库名称
fs_type: http # 文件系统类型(http,local,hdfs..)
fs_path: http://127.0.0.1:9999/v1/knowledge_base # 文件路径(单机模式hdfs://{ip}:{port}/{path},集群模式hdfs://{nameservice}/{path})
files:
- f9f6bc91-2142-4673-8249-e097c00fe1ea # 知识库文件名
# ....
- name: tsg_ip_location
# ....
kms:
local:
type: local
vault:
type: vault
url: <vault-url>
username: <vault-username>
password: <vault-password>
default_key_path: <default-vault-key-path>
plugin_key_path: <plugin-vault-key-path>
ssl: ## SSL/TLS 客户端链接配置
skip_verification: true # 忽略SSL证书校验
private_key_path: /path/to/certs/worker.key # 客户端私钥文件路径
certificate_path: /path/to/certs/worker.pem # 客户端证书文件路径
ca_certificate_path: /path/to/certs/root.pem # CA 根证书路径
properties: # 用户自定义属性的支持从函数中获取,使用方式见函数定义
hos.path: http://127.0.0.1:9093
hos.bucket.name.traffic_file: traffic_file_bucket
hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 1 #知识库文件定时更新时间
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
| -------------- | ---- | ------ | ------------------- | ---------------------------------------------- |
| knowledge_base | Y | - | Object | 知识库配置 |
| kms | N | - | Object | kms (key management system, 密钥管理系统) 配置 |
| ssl | N | - | Object | ssl配置 |
| properties | N | - | Map(String, Object) | 自定义属性配置:key-value 格式 |
# 任务配置
## 接入数据源(Sources)
### **Source 公共配置**
```yaml
sources:
kafka_source:
type : kafka # source connector 类型
# source表schema, 通过fields/local_file/url三种方式配置: 配置则转换校验, 只输出配置的列;没配置输出全部列, 不进行类型转换和校验
schema:
fields:
- name: common_recv_time
type: bigint
- name: common_log_id
type: bigint
# local_file: "schema/test_schema.json"
# url: "http://127.0.0.1/schema.json"
# watermark_timestamp: recv_time
# watermark_timestamp_unit: ms
# watermark_lag: 60
properties: # source connector 配置
prop_key1: prop_value1
prop_key2: prop_value2
#...
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|--------------------------|-------|-----------|-----------|------------------------------------------------------------------------------------------|
| **type** | Y | - | String | source唯一标识 |
| schema | N | - | Map | source表schema,配置则只输出配置的列,同时会进行类型转换和校验。 |
| watermark_timestamp | N | - | String | watermark timestamp字段名称。 |
| watermark_timestamp_unit | N | ms | String | watermark timestamp字段单位,可选值:ms(milliseconds),s(seconds)。如果配置watermark_timestamp,此字段是必须的。 |
| watermark_lag | N | - | Long | watermark out-of-order milliseconds。如果配置watermark_timestamp,此字段是必须的。 |
| properties | Y | - | Object | source属性配置 |
### schema配置
支持通过fields/local_file/url三种方式配置,只能同时配置一种方式。
#### Fields
支持配置属性列表和sql风格字符串(hive sql)
example:
```yaml
schema:
fields:
- name: common_recv_time
type: bigint
- name: common_log_id
type: bigint
```
支持的数据类型:
| 类型 | 对应java类型 | 描述 |
|---------|-----------------------|----------------------------------------------------------------------------|
| string | String | 字符串 |
| int | Integer | int |
| bigint | Long | bigint |
| float | Float | float |
| double | Double | double |
| boolean | Boolean | boolean |
| binary | byte[] | 字节数组 |
| struct | Map<String, Object> | 结构体。例如:struct<id:int, client_ip:string, data:struct<id:int, name:string>>。 |
| array | List<Object> | 数组。例如:array<int>, array<struct<id:int, client_ip:string>>。 |
#### Local File
读取本地文件中的schema定义,只支持tsg avro schema格式
- example
```yaml
schema:
local_file: "schema/test_schema.json"
```
- test_schema.json
```yaml
{
"type": "record",
"name": "test",
"fields" : [
{"name": "log_id", "type": "long"},
{"name": "recv_time", "type": "long"},
{"name": "client_ip", "type": "string","doc": {"visibility": "enabled"}}
]
}
```
#### URL
读取http url返回的schema定义,只支持tsg avro schema格式,支持动态更新schema,支持动态schema的connector有:clickhouse sink.
example:
```yaml
schema:
url: "http://127.0.0.1/schema.json"
```
### Kafka Source
```yaml
sources: # [object]
kafka_source: # [object] Source Name
# source标识
type : kafka
# 数据schema: 配置则转换校验, 只输出配置的列;没配置输出全部列, 不进行类型转换和校验
fields:
- name: common_recv_time
type: bigint
- name: common_log_id
type: bigint
# source属性配置
properties:
topic: SESSION-RECORD-COMPLETED
kafka.bootstrap.servers: 192.168.44.11:9092
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.group.id: SESSION-RECORD-COMPLETED-GROUP-GROOT-STREAM-20231021
kafka.auto.offset.reset: latest
format: json
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|-----------------------------|----|------|--------|---------------------------------------------------|
| **topic** | Y | - | String | Kafka Topic名称。支持 Topic列表,用分号分隔,如'topic-1;topic-2' |
| **kafka.bootstrap.servers** | Y | - | String | Kafka Broker 地址 |
| **format** | Y | JSON | String | format,用来反序列化消息JSONProtobufCSV... |
| Kafka Properties | N | - | | kafka Consumer Properties,以"kafka."作为前缀 |
| Format properties | N | - | | format properties,以Format类型作为前缀。例如: “protobuf.” |
### IPFIX Collector(UDP)
```yaml
sources: # [object]
ipfix_source: # [object] Source Name
# source标识
type : ipfix
# 数据schema: 配置则转换校验, 只输出配置的列;没配置输出全部列, 不进行类型转换和校验
fields:
- name: recv_time
type: bigint
- name: log_id
type: bigint
# source属性配置
properties:
port.range: 12345-12347
max.packet.size: 65535
max.receive.buffer.size: 104857600
service.discovery.registry.mode: 0
service.discovery.service.name: udp_ipfix
service.discovery.health.check.interval: 5
service.discovery.nacos.server.addr: 192.168.44.12:8848
service.discovery.nacos.username: nacos
service.discovery.nacos.password: nacos
service.discovery.nacos.namespace: test
service.discovery.nacos.group: IPFIX
service.discovery.consul.server.addr: 192.168.41.30
service.discovery.consul.server.port: 8500
service.discovery.consul.token:
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|--------------------------------------------|------------|-------------|----------|-----------------------------------------------------------------------------|
| port.range | Y | - | String | IPFIX Collector的UDP端口,指定单个端口或端口范围。例如指定单个端口为4739,指定端口范围为12345-12347。 |
| max.packet.size | N | 65535 | Integer | 单条UDP数据包的最大大小,最大值为65535(Bytes)。 |
| max.receive.buffer.size | N | 104857600 | Integer | UDP接收缓存区大小(Bytes)。 |
| service.discovery.registry.mode | N | - | Integer | 服务发现的注册模式,0为nacos,1为consul,其他为不使用服务发现。 |
| service.discovery.service.name | N | - | String | 服务发现中的serviceName。 |
| service.discovery.health.check.interval | N | - | Integer | 服务发现健康检查的时间间隔,单位秒。 |
| service.discovery.nacos.server.addr | N | - | String | nacos服务的地址,格式为ip:port, service.discovery.registry.mode为0时必须指定。 |
| service.discovery.nacos.username | N | - | String | nacos的用户名,service.discovery.registry.mode为0时必须指定。 |
| service.discovery.nacos.password | N | - | String | nacos的密码,service.discovery.registry.mode为0时必须指定。 |
| service.discovery.nacos.namespace | N | - | String | nacos中的命名空间,service.discovery.registry.mode为0时可设置,不设置为public。 |
| service.discovery.nacos.group | N | - | String | nacos中的所属组,service.discovery.registry.mode为0时可设置,不设置为DEFAULT。 |
| service.discovery.consul.server.ip | N | - | String | consul服务的ip,service.discovery.registry.mode为1时必须指定。 |
| service.discovery.consul.server.port | N | - | Integer | consul服务的端口,service.discovery.registry.mode为1时必须指定。 |
| service.discovery.consul.token | N | - | String | consul的token,service.discovery.registry.mode为1且consul开启验证时必须指定。 |
### File Source
从text file读取数据,支持本地文件和hdfs文件,用于测试以及从文件回放数据,这个source每个1s发送2条数据
```yaml
sources:
file_source:
type: file
properties:
# path: 'hdfs://ns1/test/logs.json'
path: './logs.json'
rows.per.second: 2
format: json
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|---------------------------|-------|------|---------|---------------------------------------------------------------------------------------------------------------------------------------------|
| **path** | Y | - | String | 文件路径,以[hdfs://](hdfs://ns1/test/logs.json)开头为hdfs文件,其它为本地文件系统文件。例如:./logs/logs.json, [hdfs://ns1/test/logs.json](hdfs://ns1/test/logs.json) |
| **format** | Y | - | String | 使用的format |
| rows.per.second | N | 1000 | Integer | 每秒生成行数 |
| number.of.rows | N | -1 | Long | 总生成行数,默认此source是无界流(会循环从文件生成数据),当配置大于0时此source为有界流 |
| millis.per.row | N | 0 | Long | 每行生成花费毫秒数,当大于0时,rows.per.second配置不生效 |
| read.local.file.in.client | N | true | Boolean | 是否在客户端读取本地文件,客户端读取限制文件大小最大为128MB。当为false时,在taskmanager端读取文件,必须在每个taskmanager的path存放文件 |
put file to hdfs:
```shell
# maka dir
hadoop fs -mkdir hdfs://ns1/test
# put local file to hdfs
hadoop fs -put logs.json hdfs://ns1/test
# list hdfs dir
hadoop fs -ls logs.json hdfs://ns1/test
```
### **Mock Source**
mock数据源,用于生成测试数据
```yaml
sources:
mock_source:
type : mock
properties:
mock.desc.file.path: './mock_example.json'
rows.per.second: 1
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|---------------------------|-----|-------|---------|----------------------------------------------------|
| **mock.desc.file.path** | Y | - | String | mock schema文件路径 |
| rows.per.second | N | 1000 | Integer | 每秒生成行数 |
| number.of.rows | N | -1 | Long | 总生成行数,默认此source是无界流(会循环从文件生成数据),当配置大于0时此source为有界流 |
| millis.per.row | N | 0 | Long | 每行生成花费毫秒数,当大于0时,rows.per.second配置不生效 |
#### mock desc 文件配置
mock desc为json配置,配置每个字段的mock规则,格式:
```json
[
{
"name": "field_name1",
"type": "type1",
"arg": "arg"
},
{
"name": "field_name2",
"type": "type2",
"arg": "arg"
}
]
```
#### mock type
| type | 参数 | 说明 | 返回数据类型 | 例子 |
|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------------------------------------:|-------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Number | min(number):range起始值(包含),默认:0。max(number):range结束值(不包含),默认:int32.max。options(array<number>):number列表,配置options则[start, end)失效,默认:null。random(boolean):随机模式,默认:true。 | 用于生成number | 根据start、end、options的值,推测返回类型为:int或bigint或double | 随机生成[0, 10000)范围的int数据:{"name":"int_random","type":"Number","min":0,"max":10000}递增方式生成[0, 10000)范围的int数据:{"name":"int_inc","type":"Number","min":0,"max":10000,"random":false}从int列表生成int数据:{"name":"int_options","type":"Number","options":[20,22,25,30]}随机生成[0, 10000)范围的double数据:{"name":"double_random","type":"Number","min":0.0,"max":10000.0} |
| Sequence | start(bigint):range起始值(包含),默认:0。step(bigint):步长,默认:1。 | 用于生成bigint序列, 类似等差数列 | bigint | 生成0,1,2...序列:{"name":"sub_id","type":"Sequence","start":0}生成0,2,4...序列:{"name":"sub_id","type":"Sequence","start":0,"step":2} |
| UniqueSequence | start(bigint):range起始值(包含),默认:0。 | 用于生成唯一bigint序列,0,1,2...和Sequence的区别: Sequence每个线程单独生成序列 UniqueSequence生成数字整个程序保证唯一 | bigint | 生成0,1,2...序列:{"name":"id","type":"UniqueSequence","start":0} |
| String | regex(string):根据正则随机生成符合正则的字符串,默认:[a-zA-Z]{0,5}。options(array`<string>`):string列表,配置options则regex失效,默认:null。random(boolean):随机模式,默认:true。 | 用于生成string | string | 随机生成长度我5-10的小写英文字符串:{"name":"str_regex","type":"String","regex":"[a-z]{5,10}"}从string列表生成string数据:{"name":"str_options","type":"String","options":["a","b","c"]} |
| Timestamp | unit(string):second或millis,生成秒或者毫秒时间戳,默认:second。 | 用于生成时间戳(当前时间) | bigint | 生成unix时间戳:{"name":"timestamp","type":"Timestamp"}生成毫秒时间戳:{"name":"timestamp_ms","type":"Timestamp","unit":"millis"} |
| FormatTimestamp | format(string):format,默认:yyyy-MM-dd HH:mm:ss。utc(boolean):使用utc时区,默认:false,当地时区。 | 用于生成时间字符串(当前时间) | string | 生成时间字符串:{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss"}生成时间字符串:{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss.SSS"} |
| IPv4 | start(string):起始值ip(包含),默认:0.0.0.0。end(string):结束ip(包含),默认:255.255.255.255。 | 用于生成start-end范围的ip地址 | string | 随机生成192.168.20.1-192.168.20.255范围的ip地址:{"name":"ip","type":"IPv4","start":"192.168.20.1","end":"192.168.20.255"} |
| Expression | expression(string):Datafaker expression,默认:null。 | 用于使用datafaker库的expression生成随机字符串文档:https://www.datafaker.net/documentation/expressions | string | 生成人名:{"name":"name","type":"Expression","expression":"#{[Name.name](http://Name.name)}"}生成邮箱地址:{"name":"emailAddress","type":"Expression","expression":"#{internet.emailAddress}"} |
| Hlld | itemCount(bigint):总基数(总唯一元素数量),默认:1000000。batchCount(int):每次生成的hll随机添加的元素数量,默认:10000。precision(int):hll的精度,范围[4, 18],默认:12。 | 用于生成Hlld Sketch,hll算法的一种实现 | string(字节数组的base64) | 生成ip hll 每次大约包含1000个ip:{ "name": "ip_cnt", "type": "Hlld", "itemCount": 100000, "batchCount": 1000 } |
| HdrHistogram | max(bigint):histogram最大值,默认:100000。batchCount(int):每次生成的histogram随机添加的元素数量,默认:1000。numberOfSignificantValueDigits(int):histogram的精度,范围[1, 5],默认:1。 | 用于生成HdrHistogram Sketch,一种分位数Histogram Sketch | string(字节数组的base64) | 生成延时的Histogram,每次包含1000个ms延时: { "name": "ms_his", "type": "HdrHistogram", "max": 100000, "batchCount": 1000} |
| Eval | expression(string):AviatorScript expression,默认:null。 | 计算列,通过其它列计算值AviatorScript文档:https://www.yuque.com/boyan-avfmj/aviatorscript | 返回类型依赖expression,可能为任何类型 | 根据已有的in_bytes(bigint), out_bytes(bigint)列计算bytes(bigint)列其值为其它两个的和:{"name": "bytes", "type": "Eval", "expression": "in_bytes + out_bytes"} |
| Object | fields(array):每个字段的生成规则,可以使用所有type,默认:null。 | 用于生成struct/object类型fields内容和mock desc文件根配置一样,描述每个字段的生成规则 | struct/object | 生成object:{"name":"object","type":"Object","fields":[{"name":"str","type":"String","regex":"[a-z]{5,10}","nullRate":0.1},{"name":"cate","type":"String","options":["a","b","c"]}]} |
| Union | unionFields(array):每组字段生成规则,默认:null。每个元素的字段:fields(array):和Object配置一样weight(int):此组字段权重,根据权重按照比例生成数据random(boolean):随机模式,默认:true。 | 用于生成有关联的字段 | 各个字段配置类型 | 生成object_id、item_id字段,当object_id = 10时,item_id从[1, 2, 3, 4, 5]生成数据,当object_id = 20时,item_id从[6, 7]生成数据,第一种数据占比5/7,第二种数据占比2/7 |
- Union 举例
```json
{
"name": "unionFields",
"type": "Union",
"random": false,
"unionFields": [
{
"weight": 5,
"fields": [
{
"name": "object_id",
"type": "Number",
"options": [10]
},
{
"name": "item_id",
"type": "Number",
"options": [1, 2, 3, 4, 5],
"random": false
}
]
},
{
"weight": 2,
"fields": [
{
"name": "object_id",
"type": "Number",
"options": [20]
},
{
"name": "item_id",
"type": "Number",
"options": [6, 7],
"random": false
}
]
}
]
}
```
type通用参数:
| 参数 | 说明 | 例子 |
|-------------------------|-----------------------------------------|-----------------------------------------------------------------------------------------------------------------|
| nullRate(double) | 生成数据null值比率,默认是1,没有null值。 | 随机生成字符串,null值占10%:{"name":"str_regex","type":"String","regex":"[a-z]{5,10}","nullRate":0.1} |
| array(double) | 是否是数组类型,默认false。 | 生成数组字符串:{"name":"array_str","type":"String","regex":"[a-z]{5,10}","array":true,"arrayLenMin":1,"arrayLenMax":3} |
| arrayLenMin(int) | 数组最小长度(包含),默认0。array属性为true时才生效。 | |
| arrayLenMax(int) | 数组最大长度(包含),默认5。array属性为true时才生效。 | |
#### mock 示例
**各个类型生成查看**
配置:
```json
[
{
"name": "id",
"type": "UniqueSequence",
"start": 0
},
{
"name": "sub_id",
"type": "Sequence",
"start": 0
},
{
"name": "int_random",
"type": "Number",
"min": 0,
"max": 10000
},
{
"name": "int_inc",
"type": "Number",
"min": 0,
"max": 10000,
"random": false
},
{
"name": "int_options",
"type": "Number",
"options": [20, 22, 25, 30],
"random": true
},
{
"name": "int_options_round_robin",
"type": "Number",
"options": [20, 22, 25, 30],
"random": false
},
{
"name": "double_random",
"type": "Number",
"min": 0.0,
"max": 10000.0
},
{
"name": "str_regex",
"type": "String",
"regex": "[a-z]{5,10}",
"nullRate": 0.1
},
{
"name": "str_options",
"type": "String",
"options": ["a", "b", "c"]
},
{
"name": "str_options_round_robin",
"type": "String",
"options": ["a", "b", "c"],
"random": false
},
{
"name": "timestamp",
"type": "Timestamp"
},
{
"name": "timestamp_ms",
"type": "Timestamp",
"unit": "millis"
},
{
"name": "timestamp_str",
"type": "FormatTimestamp",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"name": "ip",
"type": "IpV4",
"start": "192.168.20.1",
"end": "192.168.20.255"
},
{
"name": "array_str",
"type": "String",
"options": ["a", "b", "c"],
"array": true,
"arrayLenMin": 1,
"arrayLenMax": 3
},
{
"name": "array_object",
"type": "Object",
"fields": [
{
"name": "str",
"type": "String",
"regex": "[a-z]{5,10}",
"nullRate": 0.1
},
{
"name": "name",
"type": "Expression",
"expression": "#{Name.name}"
},
{
"name": "emailAddress",
"type": "Expression",
"expression": "#{internet.emailAddress}"
}
]
}
]
```
生成数据:
```
{"id":0,"sub_id":0,"int_random":7604,"int_inc":0,"int_options":30,"int_options_round_robin":20,"double_random":2329.3205359759163,"str_regex":"wxzrpn","str_options":"b","str_options_round_robin":"a","timestamp":1717493414,"timestamp_ms":1717493414603,"timestamp_str":"2024-06-04 17:30:14","ip":"192.168.20.24","array_str":["b"],"array_object":{"str":"wvrzqde","name":"Berry Gorczany","emailAddress":"[email protected]"}}
{"id":1,"sub_id":1,"int_random":5760,"int_inc":1,"int_options":30,"int_options_round_robin":22,"double_random":9644.141255418077,"str_regex":"oadbz","str_options":"a","str_options_round_robin":"b","timestamp":1717493415,"timestamp_ms":1717493415603,"timestamp_str":"2024-06-04 17:30:15","ip":"192.168.20.127","array_str":["c"],"array_object":{"str":"bkcwtpl","name":"Alba Gottlieb","emailAddress":"[email protected]"}}
{"id":2,"sub_id":2,"int_random":3775,"int_inc":2,"int_options":20,"int_options_round_robin":25,"double_random":9573.948656302768,"str_regex":"rlhtrk","str_options":"b","str_options_round_robin":"c","timestamp":1717493416,"timestamp_ms":1717493416603,"timestamp_str":"2024-06-04 17:30:16","ip":"192.168.20.20","array_str":["b"],"array_object":{"name":"Celestina O'Reilly","emailAddress":"[email protected]"}}
{"id":3,"sub_id":3,"int_random":7877,"int_inc":3,"int_options":22,"int_options_round_robin":30,"double_random":8921.757584727951,"str_regex":"spydx","str_options":"c","str_options_round_robin":"a","timestamp":1717493417,"timestamp_ms":1717493417603,"timestamp_str":"2024-06-04 17:30:17","ip":"192.168.20.218","array_str":["a","a"],"array_object":{"name":"Dr. Nichole McGlynn","emailAddress":"[email protected]"}}
{"id":4,"sub_id":4,"int_random":8248,"int_inc":4,"int_options":30,"int_options_round_robin":20,"double_random":4105.3600047674545,"str_regex":"rbjelg","str_options":"b","str_options_round_robin":"b","timestamp":1717493418,"timestamp_ms":1717493418602,"timestamp_str":"2024-06-04 17:30:18","ip":"192.168.20.146","array_str":["b"],"array_object":{"str":"ekbyer","name":"Raul Leannon","emailAddress":"[email protected]"}}
{"id":5,"sub_id":5,"int_random":3663,"int_inc":5,"int_options":22,"int_options_round_robin":22,"double_random":7486.737315942628,"str_regex":"qyqqiyj","str_options":"c","str_options_round_robin":"c","timestamp":1717493419,"timestamp_ms":1717493419610,"timestamp_str":"2024-06-04 17:30:19","ip":"192.168.20.90","array_str":["c","b"],"array_object":{"str":"dbepb","name":"Moshe Powlowski","emailAddress":"[email protected]"}}
{"id":6,"sub_id":6,"int_random":6967,"int_inc":6,"int_options":22,"int_options_round_robin":25,"double_random":6742.751027323034,"str_regex":"slfghf","str_options":"a","str_options_round_robin":"a","timestamp":1717493420,"timestamp_ms":1717493420602,"timestamp_str":"2024-06-04 17:30:20","ip":"192.168.20.72","array_str":["b","b"],"array_object":{"name":"Alvera Graham","emailAddress":"[email protected]"}}
{"id":7,"sub_id":7,"int_random":5340,"int_inc":7,"int_options":25,"int_options_round_robin":30,"double_random":7259.505902869291,"str_regex":"yarcof","str_options":"c","str_options_round_robin":"b","timestamp":1717493421,"timestamp_ms":1717493421614,"timestamp_str":"2024-06-04 17:30:21","ip":"192.168.20.44","array_str":["a"],"array_object":{"str":"dxianwxv","name":"Pedro Kerluke","emailAddress":"[email protected]"}}
{"id":8,"sub_id":8,"int_random":8365,"int_inc":8,"int_options":25,"int_options_round_robin":20,"double_random":7142.049302311821,"str_options":"c","str_options_round_robin":"c","timestamp":1717493422,"timestamp_ms":1717493422603,"timestamp_str":"2024-06-04 17:30:22","ip":"192.168.20.197","array_str":["b"],"array_object":{"str":"mximiyd","name":"Herman Runte","emailAddress":"[email protected]"}}
{"id":9,"sub_id":9,"int_random":5944,"int_inc":9,"int_options":30,"int_options_round_robin":22,"double_random":1420.8479774375382,"str_regex":"eahpq","str_options":"b","str_options_round_robin":"a","timestamp":1717493423,"timestamp_ms":1717493423602,"timestamp_str":"2024-06-04 17:30:23","ip":"192.168.20.44","array_str":["a","a","b"],"array_object":{"str":"kseeqicxuh","name":"Kaitlyn Douglas","emailAddress":"[email protected]"}}
{"id":10,"sub_id":10,"int_random":9357,"int_inc":10,"int_options":30,"int_options_round_robin":25,"double_random":2451.2488213660886,"str_regex":"agwxbf","str_options":"b","str_options_round_robin":"b","timestamp":1717493424,"timestamp_ms":1717493424607,"timestamp_str":"2024-06-04 17:30:24","ip":"192.168.20.19","array_str":["b","c"],"array_object":{"str":"iidogsi","name":"Luigi McClure PhD","emailAddress":"[email protected]"}}
```
**object类型以及Union类型生成**
配置:
```json
[
{ "name": "name", "type": "String", "options": ["object_statistics"] },
{ "name": "timestamp_ms", "type": "Timestamp", "unit": "millis" },
{ "name": "tags", "type": "Object", "fields": [
{ "name": "vsys_id", "type": "Number", "options": [1] },
{ "name": "template_id", "type": "Number", "options": [1] },
{ "name": "chart_id", "type": "Number", "options": [1] },
{ "name": "version", "type": "Number", "options": [1] },
{ "name": "unionFields", "type": "Union", "unionFields": [
{ "weight": 2, "fields": [
{ "name": "object_type", "type": "String", "options": ["ip"] },
{ "name": "object_id", "type": "Number", "options": [7562] },
{ "name": "item_id", "type": "Number", "options": [7835, 7819] }
]
},
{ "weight": 2, "fields": [
{ "name": "object_type", "type": "String", "options": ["fqdn"] },
{ "name": "object_id", "type": "Number", "options": [13087] },
{ "name": "item_id", "type": "Number", "options": [229604,229603] }
]
}
]
}
]
},
{ "name": "fields", "type": "Object", "fields": [
{ "name": "in_bytes", "type": "Number", "min": 10000, "max": 200000},
{ "name": "out_bytes", "type": "Number", "min": 10000, "max": 200000},
{ "name": "new_in_sessions", "type": "Number", "min": 10, "max": 200},
{ "name": "new_out_sessions", "type": "Number", "min": 10, "max": 200}
]
}
]
```
生成数据:
```
{"name":"object_statistics","timestamp_ms":1717573879804,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"ip","object_id":7562,"item_id":7819},"fields":{"in_bytes":47083,"out_bytes":68389,"new_in_sessions":142,"new_out_sessions":92}}
{"name":"object_statistics","timestamp_ms":1717573879807,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"fqdn","object_id":13087,"item_id":229603},"fields":{"in_bytes":81118,"out_bytes":107287,"new_in_sessions":98,"new_out_sessions":86}}
{"name":"object_statistics","timestamp_ms":1717573879808,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"ip","object_id":7562,"item_id":7835},"fields":{"in_bytes":61395,"out_bytes":111095,"new_in_sessions":87,"new_out_sessions":149}}
{"name":"object_statistics","timestamp_ms":1717573879808,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"fqdn","object_id":13087,"item_id":229603},"fields":{"in_bytes":145986,"out_bytes":12166,"new_in_sessions":169,"new_out_sessions":127}}
{"name":"object_statistics","timestamp_ms":1717573880806,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"fqdn","object_id":13087,"item_id":229604},"fields":{"in_bytes":112797,"out_bytes":120310,"new_in_sessions":12,"new_out_sessions":177}}
{"name":"object_statistics","timestamp_ms":1717573880806,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"fqdn","object_id":13087,"item_id":229604},"fields":{"in_bytes":180960,"out_bytes":118214,"new_in_sessions":106,"new_out_sessions":73}}
{"name":"object_statistics","timestamp_ms":1717573880806,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"ip","object_id":7562,"item_id":7819},"fields":{"in_bytes":91394,"out_bytes":105840,"new_in_sessions":74,"new_out_sessions":177}}
{"name":"object_statistics","timestamp_ms":1717573880806,"tags":{"vsys_id":1,"template_id":1,"chart_id":1,"version":1,"object_type":"ip","object_id":7562,"item_id":7835},"fields":{"in_bytes":79266,"out_bytes":95721,"new_in_sessions":50,"new_out_sessions":88}}```
```
### Inline Source
用于简单测试format,function,sink等,这个source每个1s发送一条配置的data数据
```yaml
sources:
inline_source:
type : inline
fields:
- name: log_id
type: bigint
- name: recv_time
type: bigint
- name: client_ip
type: string
properties:
# 单条数据
data: '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}'
# 多条数据
# data: '[{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}, {"log_id": 2, "recv_time":"222", "client_ip":"192.168.0.2"}]'
# data: '["1,111,192.168.0.1", "2,222,192.168.0.2"]'
format: json
json.ignore.parse.errors: false
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|-------------------|----|--------|----------|---------------------------------------------------|
| **data** | Y | - | String | source发送的数据,如果是json array形式则当做单独解析发送array每个元素 |
| **format** | Y | - | String | 使用的format |
| **type** | N | string | String | 数据类型:string(UTF8字符串),hex(十六进制编码),base64(base64编码) |
| interval.per.row | N | 1s | Duration | 发送每行数据间隔时间 |
| repeat.count | N | -1 | Integer | 重复发送data测试,负数则一直循环重复发送 |
| format properties | N | - | String | format properties配置,key为format值.+原始key |
## 过滤器(Filters)
```yaml
filters:
http_filter:
type: aviator
properties:
expression: event.decoded_as == 'HTTP' && event.server_port = 80
```
| 属性名 | 默认值 | 类型 | 必填 | 描述 |
|----------------|-----|--------|----|------------------------------------|
| **name** | - | String | Y | 过滤器名称,唯一标识,用于任务编排。例如:“http_filter“ |
| **type** | - | String | Y | 数据源类型。例如:aviator |
| **properties** | | | | |
| expression | - | String | N | 基于AviatorScript语法,过滤符合条件的事件; |
## 分流器(Splits)
```yaml
splits:
decoded_as_split:
type: split
rules:
- tag: http_tag
expression: event.decoded_as == 'HTTP'
- tag: dns_tag
expression: event.decoded_as == 'DNS'
```
| 属性名 | 默认值 | 类型 | 必填 | 描述 |
|------------|-----|--------|----|-----------------------------------------|
| **name** | - | String | Y | 过滤器名称,唯一标识,用于任务编排。例如:“decode_as_filter“ |
| **type** | - | String | Y | 数据源类型。例如:split |
| **rules** | | | | |
| tag | - | String | Y | 分流标记,同时需要在topology中配置,具体参见任务编排 |
| expression | - | String | Y | 基于AviatorScript语法,将符合条件的数据分流至下游算子; |
## 任务处理器 (Processors)
### Pre-processing Pipeline
```yaml
pre_processing_pipelines:
common_pre_processor:
type: projection
output_fields: []
functions:
- function: CURRENT_UNIX_TIMESTAMP
lookup_fields: []
output_fields: [processing_time]
parameters:
precision: milliseconds
```
### Processing Pipeline
```yaml
processing_pipelines:
session_record_processor:
type: projection
output_fields: []
functions:
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, quic_sni]
output_fields: [server_domain]
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: ASN_LOOKUP
lookup_fields: [server_ip]
output_fields: [server_asn]
parameters:
option: IP_TO_ASN
vendor_id: tsg_asnlookup
- name: BASE64_DECODE_TO_STRING
lookup_fields: [mail_subject,mail_subject_charset]
output_fields: [mail_subject]
aggregate_processor:
type: aggregate
group_by_fields: [server_ip,server_port,client_ip,client_port]
window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
window_size: 60
window_slide: 10 #滑动窗口步长
mini_batch: true #是否开启预聚合优化
functions:
- function: NUMBER_SUM
lookup_fields: [ sent_pkts ]
output_fields: [ sent_pkts_sum ]
table_processor:
type: table
functions:
- function: JSON_UNROLL
lookup_fields: [ device_tag ]
output_fields: [ new_name2 ]
parameters:
path: tags
new_path: newtags
```
#### Projection Processor
| 属性名 | 默认值 | 类型 | 必填 | 描述 |
|---------------|-----|---------------|----|-------------------------|
| name | - | String | Y | Processor名称,唯一标识,用于任务编排 |
| type | - | String | Y | 数据源类型:projection |
| output_fields | - | Array(String) | N | 输出指定字段,默认发送全部字段。 |
| remove_fields | - | Array(String) | N | 删除指定字段,默认为空。 |
| functions | - | List(UDF) | Y | 自定义函数列表 |
| | | | | |
#### Aggregate Processor
| 属性名 | 默认值 | 类型 | 必填 | 描述 |
|------------------------|-----|------------|----|------------------------------------------------------------------------------------------------|
| name | - | String | Y | Processor名称,唯一标识,用于任务编排 |
| type | - | String | Y | 数据源类型:aggregate |
| group_by_fields | - | Integer | Y | 聚合的维度列 |
| window_type | - | Enum | Y | 时间窗口类型:tumbling_processing_time,tumbling_event_time,sliding_processing_time,sliding_event_time |
| window_size | - | Integer | Y | 窗口的大小,单位秒 |
| window_slide | - | Integer | N | 滑动窗口需要指定滑动步长,单位秒 |
| window_timestamp_field | - | String | N | 窗口开始的时间戳(ms)做为value输出的字段名 |
| mini_batch | - | Boolean | N | 默认为false,是否开启预聚合优化,在按照key进行聚合之前,先在本地进行汇聚,进而降低网络传输数据量 |
| functions | - | List(UDAF) | Y | 自定义函数列表 |
#### Table Processor
| 属性名 | 默认值 | 类型 | 必填 | 描述 |
|-----------|-----|------------|----|-------------------------|
| name | - | String | Y | Processor名称,唯一标识,用于任务编排 |
| type | - | String | Y | 数据源类型:table |
| functions | - | List(UDTF) | Y | 自定义函数列表 |
## 输出(Sinks)
### Sink通用配置
```yaml
sinks:
kafka_sink:
# sink标识
type: kafka
# sink schema
# schema:
# sink属性配置
properties:
prop_key1: prop_value1
prop_key2: prop_value2
#...
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|----------------|-------|---------|---------|------------------|
| **type** | Y | - | String | sink唯一标识 |
| `schema` | N | - | Map | 同source schema |
| properties | Y | - | Object | sink属性配置 |
### Kafka Sink
```yaml
sinks:
kafka_sink:
type: kafka
properties:
topic: SESSION-RECORD-JSON
kafka.bootstrap.servers: 192.168.44.12:9092
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
format: json
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|------------------------------------|----|--------|----------|--------------------------------------------------------|
| **topic** | Y | - | String | Kafka Topic名称。 |
| **kafka.bootstrap.servers** | Y | - | String | Kafka Broker 地址 |
| **format** | Y | - | String | format,用来序列化消息JSONProtobufCSV... |
| log.failures.only | N | true | Boolean | producer发生error时只打印日志, 否则抛出异常程序停止(重试) |
| rate.limiting.strategy | N | none | String | 限速策略:none:不限速(默认)sliding_window:限速,使用滑动窗口计算速率 |
| rate.limiting.limit.rate | N | 10Mbps | String | 限制的最大速率:单位必须是Mbps、Kbps、bps,例如:10Mbps, 10Kbps, 10240bps |
| rate.limiting.window.size | N | 5 | Integer | 窗口大小,单位秒 |
| rate.limiting.block.duration | N | 5min | Duration | 对首次超出限流数据阻塞,最长阻塞多长时间后超出限流数据全部丢弃 |
| rate.limiting.block.reset.duration | N | 30s | Duration | 超速阻塞后速率恢复正常多长时间后重置超速阻塞状态 |
| Kafka properties | N | - | String | kafka consumer/producer properties配置,key为kafka.+原始key |
| format properties | N | - | String | format properties配置,key为format值.+原始key |
### ClickHouse Sink
```yaml
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: 192.168.40.222:9001,192.168.40.223:9001
table: tsg_galaxy_v3.session_record_local_old
batch.size: 100000
batch.interval: 30s
connection.user: default
connection.password: galaxy2019
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|----------------------------|----|---------|------------|---------------------------------------------------------------|
| **host** | Y | - | String | clickhouse host和tcp port信息。格式:host1:port,host2:port ...。 |
| **table** | Y | - | String | clickhouse table name. |
| **batch.size** | N | 100000 | Integer | 最大flush size,超过size会立刻flush。 |
| **batch.byte.size** | N | 200mb | MemorySize | 最大flush buffer字节大小,超过会立刻flush。 |
| **batch.interval** | N | 30s | Duration | 最大flush间隔,超过会立刻flush。 |
| connection.user | Y | - | String | clickhouse 连接 用户名 |
| connection.password | Y | - | String | clickhouse 连接 密码 |
| connection.database | N | default | String | clickhouse 连接 默认数据库 |
| connection.connect_timeout | N | 30 | Integer | 连接超时(单位秒) |
| connection.query_timeout | N | 300 | Integer | 查询超时(单位秒) |
| connection properties | N | - | String | clickhouse jdbc connection properties配置,key为connection.+原始key |
### Print Sink
用来测试的sink,把元素输出到标准输出或输出日志。
```yaml
sinks:
print_sink:
type: print
properties:
format: json
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|-------------------|----|--------|--------|----------------------------------------|
| **format** | Y | - | String | format,用来序列化消息 |
| **mode** | N | stdout | Enum | 输出模式,可选值:stdout,log_info,log_warn,null |
| format properties | N | - | String | format properties配置,key为format值.+原始key |
## Formats
### JSON
```yaml
sources:
kafka_source:
type : kafka
properties:
topic: SESSION-RECORD-COMPLETED
kafka.bootstrap.servers: 192.168.44.11:9092
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.group.id: SESSION-RECORD-COMPLETED-GROUP-GROOT-STREAM-20231021
kafka.auto.offset.reset: latest
format: json
json.ignore.parse.errors: true
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|---------------------|----|-------|---------|------------------------|
| ignore.parse.errors | N | false | Boolean | json解析时发生错误时忽略,否则抛出异常。 |
### MessagePack
```yaml
kafka_source_msgpack:
type : kafka
properties:
topic: msgpack-test
format: msgpack
kafka.bootstrap.servers: 192.168.44.12:9092
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.group.id: msgpack-test
kafka.auto.offset.reset: latest
inline_source_msgpack:
type : inline
properties:
data: g6Zsb2dfaWQBqXJlY3ZfdGltZc5mF3f5qWNsaWVudF9pcKsxOTIuMTY4LjAuMQ==
type: base64
format: msgpack
```
- 只需要指定format为msgpack,没有其它的参数。
- 支持所有数据类型的解析,包括复杂数据类型struct,array,以及binary。
### Protobuf
```yaml
sources:
inline_source_protobuf:
type : inline
properties:
data: CIin2awGEICAoLC/hYzKAhoEQkFTRSCch8z3wtqEhAQo6o/Xmc0xMMCy15nNMTjWIkDRCEiIp9msBlCIp9msBloIMjE0MjYwMDNg//8DaP//A3JqeyJ0YWdzIjpbeyJ0YWciOiJkYXRhX2NlbnRlciIsInZhbHVlIjoiY2VudGVyLXh4Zy05MTQwIn0seyJ0YWciOiJkZXZpY2VfZ3JvdXAiLCJ2YWx1ZSI6Imdyb3VwLXh4Zy05MTQwIn1dfXoPY2VudGVyLXh4Zy05MTQwggEOZ3JvdXAteHhnLTkxNDCKAQ0xOTIuMTY4LjQwLjgxkAEEmAEBoAEBqAGQwAGyAQdbMSwxLDJd4gEDt+gY4gINMTkyLjU2LjE1MS44MOgCoeYD8gIHV2luZG93c/oCGOe+juWbvS5Vbmtub3duLlVua25vd24uLrIDDTE5Mi41Ni4yMjIuOTO4A/ZwwgMFTGludXjKAxjnvo7lm70uVW5rbm93bi5Vbmtub3duLi6SBAN0Y3CaBBFFVEhFUk5FVC5JUHY0LlRDULAMBLgMBcAM9gHIDJEOoA2AAagN8cr+jgKwDezksIAPwg0RYTI6ZmE6ZGM6NTY6Yzc6YjPKDRE0ODo3Mzo5Nzo5NjozODoyMNINETQ4OjczOjk3Ojk2OjM4OjIw2g0RYTI6ZmE6ZGM6NTY6Yzc6YjM=
type: base64
format: protobuf
protobuf.descriptor.file.path: ./config/session_record_test.desc
protobuf.message.name: SessionRecord
```
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
|----------------------|----|-------|---------|-----------------------------------------------------------------|
| descriptor.file.path | Y | - | String | The Protobuf descriptor file path. |
| message.name | Y | - | String | The protobuf MessageName to look for in the descriptor file. |
| ignore.parse.errors | N | false | Boolean | protobuf解析时发生错误时忽略,否则抛出异常。 |
| emit.default.values | N | false | Boolean | protobuf解析时是否设置默认值。不建议配置,严重影响性能。基本数据类型建议使用optional配置来显式处理null值。 |
protobuf 类型与内置类型对应表:
| protobuf类型 | 类型(原始对应类型) | 可以转换的类型 | 描述 |
|--------------------------------------|------------|----------------------------------------------|---------------------------------------------------------------------|
| int3,uint32,sint32,fixed32,sfixed32 | int | int, bigint, float, double(序列化时支持string类型转换) | 建议使用int32 其次使用sint32,不建议使用uint32(java读取出来是int类型 第一位代表符号位,可能读取出来是负数) |
| int64,uint64,sint64,fixed64,sfixed64 | bigint | int, bigint, float, double(序列化时支持string类型转换) | 建议使用int64,其次使用sint64 |
| float | float | int, bigint, float, double(序列化时支持string类型转换) | 建议使用double |
| double | double | int, bigint, float, double(序列化时支持string类型转换) | 建议使用double |
| bool | boolean | boolean, int(0为false, 非0为true) | 不建议使用bool,使用int32代替 |
| enum | int | int | 不建议使用enum,使用int32代替 |
| string | string | string(序列化时支持所有类型,调用toString方法) | |
| bytes | binary | binary | |
| message (结构体类型) | struct | struct | |
| repeated type (数组类型) | array | array | |
protobuf format使用步骤:
1. 定义proto文件(只支持proto3语法),int double等数值类型有null值时添加optional,建议int double总是添加optional选项。
2. 生成desc二进制文件(使用23.4版本)
示例:定义proto文件
```
syntax = "proto3";
// [START java_declaration]
// option java_multiple_files = true;
option java_package = "com.geedgenetworks.proto";
option java_outer_classname = "SessionRecordProtos";
// [END java_declaration]
message SessionRecord {
optional int64 recv_time = 1;
optional int64 log_id = 2;
string decoded_as = 3;
optional int64 session_id = 4;
optional int64 start_timestamp_ms = 5;
optional int64 end_timestamp_ms = 6;
optional int32 duration_ms = 7;
optional int32 tcp_handshake_latency_ms = 8;
optional int64 ingestion_time = 9;
optional int64 processing_time = 10;
string device_id = 11;
optional int32 out_link_id = 12;
optional int32 in_link_id = 13;
string device_tag = 14;
string data_center = 15;
string device_group = 16;
string sled_ip = 17;
optional int32 address_type = 18;
optional int32 vsys_id = 19;
optional int32 t_vsys_id = 20;
optional int64 flags = 21;
string flags_identify_info = 22;
repeated int64 security_rule_list = 23;
string security_action = 24;
repeated int64 monitor_rule_list = 25;
repeated int64 shaping_rule_list = 26;
repeated int64 proxy_rule_list = 27;
repeated int64 statistics_rule_list = 28;
repeated int64 sc_rule_list = 29;
repeated int64 sc_rsp_raw = 30;
repeated int64 sc_rsp_decrypted = 31;
string proxy_action = 32;
optional int32 proxy_pinning_status = 33;
optional int32 proxy_intercept_status = 34;
string proxy_passthrough_reason = 35;
optional int32 proxy_client_side_latency_ms = 36;
optional int32 proxy_server_side_latency_ms = 37;
string proxy_client_side_version = 38;
string proxy_server_side_version = 39;
optional int32 proxy_cert_verify = 40;
string proxy_intercept_error = 41;
optional int32 monitor_mirrored_pkts = 42;
optional int32 monitor_mirrored_bytes = 43;
string client_ip = 44;
optional int32 client_port = 45;
string client_os_desc = 46;
string client_geolocation = 47;
optional int64 client_asn = 48;
string subscriber_id = 49;
string imei = 50;
string imsi = 51;
string phone_number = 52;
string apn = 53;
string server_ip = 54;
optional int32 server_port = 55;
string server_os_desc = 56;
string server_geolocation = 57;
optional int64 server_asn = 58;
string server_fqdn = 59;
string server_domain = 60;
string app_transition = 61;
string app = 62;
string app_debug_info = 63;
string app_content = 64;
repeated int64 fqdn_category_list = 65;
string ip_protocol = 66;
string decoded_path = 67;
optional int32 dns_message_id = 68;
optional int32 dns_qr = 69;
optional int32 dns_opcode = 70;
optional int32 dns_aa = 71;
optional int32 dns_tc = 72;
optional int32 dns_rd = 73;
optional int32 dns_ra = 74;
optional int32 dns_rcode = 75;
optional int32 dns_qdcount = 76;
optional int32 dns_ancount = 77;
optional int32 dns_nscount = 78;
optional int32 dns_arcount = 79;
string dns_qname = 80;
optional int32 dns_qtype = 81;
optional int32 dns_qclass = 82;
string dns_cname = 83;
optional int32 dns_sub = 84;
string dns_rr = 85;
optional int32 dns_response_latency_ms = 86;
string http_url = 87;
string http_host = 88;
string http_request_line = 89;
string http_response_line = 90;
string http_request_body = 91;
string http_response_body = 92;
optional int32 http_proxy_flag = 93;
optional int32 http_sequence = 94;
string http_cookie = 95;
string http_referer = 96;
string http_user_agent = 97;
optional int64 http_request_content_length = 98;
string http_request_content_type = 99;
optional int64 http_response_content_length = 100;
string http_response_content_type = 101;
string http_set_cookie = 102;
string http_version = 103;
optional int32 http_status_code = 104;
optional int32 http_response_latency_ms = 105;
optional int32 http_session_duration_ms = 106;
optional int64 http_action_file_size = 107;
string ssl_version = 108;
string ssl_sni = 109;
string ssl_san = 110;
string ssl_cn = 111;
optional int32 ssl_handshake_latency_ms = 112;
string ssl_ja3_hash = 113;
string ssl_ja3s_hash = 114;
string ssl_cert_issuer = 115;
string ssl_cert_subject = 116;
optional int32 ssl_esni_flag = 117;
optional int32 ssl_ech_flag = 118;
string dtls_cookie = 119;
string dtls_version = 120;
string dtls_sni = 121;
string dtls_san = 122;
string dtls_cn = 123;
optional int32 dtls_handshake_latency_ms = 124;
string dtls_ja3_fingerprint = 125;
string dtls_ja3_hash = 126;
string dtls_cert_issuer = 127;
string dtls_cert_subject = 128;
string mail_protocol_type = 129;
string mail_account = 130;
string mail_from_cmd = 131;
string mail_to_cmd = 132;
string mail_from = 133;
string mail_password = 134;
string mail_to = 135;
string mail_cc = 136;
string mail_bcc = 137;
string mail_subject = 138;
string mail_subject_charset = 139;
string mail_attachment_name = 140;
string mail_attachment_name_charset = 141;
string mail_eml_file = 142;
string ftp_account = 143;
string ftp_url = 144;
string ftp_link_type = 145;
string quic_version = 146;
string quic_sni = 147;
string quic_user_agent = 148;
string rdp_cookie = 149;
string rdp_security_protocol = 150;
string rdp_client_channels = 151;
string rdp_keyboard_layout = 152;
string rdp_client_version = 153;
string rdp_client_name = 154;
string rdp_client_product_id = 155;
string rdp_desktop_width = 156;
string rdp_desktop_height = 157;
string rdp_requested_color_depth = 158;
string rdp_certificate_type = 159;
optional int32 rdp_certificate_count = 160;
optional int32 rdp_certificate_permanent = 161;
string rdp_encryption_level = 162;
string rdp_encryption_method = 163;
string ssh_version = 164;
string ssh_auth_success = 165;
string ssh_client_version = 166;
string ssh_server_version = 167;
string ssh_cipher_alg = 168;
string ssh_mac_alg = 169;
string ssh_compression_alg = 170;
string ssh_kex_alg = 171;
string ssh_host_key_alg = 172;
string ssh_host_key = 173;
string ssh_hassh = 174;
string sip_call_id = 175;
string sip_originator_description = 176;
string sip_responder_description = 177;
string sip_user_agent = 178;
string sip_server = 179;
string sip_originator_sdp_connect_ip = 180;
optional int32 sip_originator_sdp_media_port = 181;
string sip_originator_sdp_media_type = 182;
string sip_originator_sdp_content = 183;
string sip_responder_sdp_connect_ip = 184;
optional int32 sip_responder_sdp_media_port = 185;
string sip_responder_sdp_media_type = 186;
string sip_responder_sdp_content = 187;
optional int32 sip_duration_s = 188;
string sip_bye = 189;
optional int32 rtp_payload_type_c2s = 190;
optional int32 rtp_payload_type_s2c = 191;
string rtp_pcap_path = 192;
optional int32 rtp_originator_dir = 193;
string stratum_cryptocurrency = 194;
string stratum_mining_pools = 195;
string stratum_mining_program = 196;
string stratum_mining_subscribe = 197;
optional int64 sent_pkts = 198;
optional int64 received_pkts = 199;
optional int64 sent_bytes = 200;
optional int64 received_bytes = 201;
optional int64 tcp_c2s_ip_fragments = 202;
optional int64 tcp_s2c_ip_fragments = 203;
optional int64 tcp_c2s_lost_bytes = 204;
optional int64 tcp_s2c_lost_bytes = 205;
optional int64 tcp_c2s_o3_pkts = 206;
optional int64 tcp_s2c_o3_pkts = 207;
optional int64 tcp_c2s_rtx_pkts = 208;
optional int64 tcp_s2c_rtx_pkts = 209;
optional int64 tcp_c2s_rtx_bytes = 210;
optional int64 tcp_s2c_rtx_bytes = 211;
optional int32 tcp_rtt_ms = 212;
optional int64 tcp_client_isn = 213;
optional int64 tcp_server_isn = 214;
string packet_capture_file = 215;
string in_src_mac = 216;
string out_src_mac = 217;
string in_dest_mac = 218;
string out_dest_mac = 219;
string tunnels = 220;
optional int32 dup_traffic_flag = 221;
string tunnel_endpoint_a_desc = 222;
string tunnel_endpoint_b_desc = 223;
}
```
生成desc二进制文件
```
protoc --descriptor_set_out=session_record_test.desc session_record_test.proto
```
### Raw
Raw format允许读写原始(字节数组)值作为单个列。主要用于不涉及修改message从kakfa到kakfa同步topic场景。只需要指定format为raw,没有其它的参数。
```yaml
sources:
inline_source:
type: inline
properties:
data: 123abc
format: raw
sinks:
print_sink:
type: print
properties:
format: raw
```
### CSV
按照既定的Schema读取/写入csv格式数据。
| 属性名 | 必填 | 默认值 | 类型 | 描述 |
| --------------------------- | ---- | ------ | ------- | ------------------------------------------------------------ |
| csv.field.delimiter | Y | , | String | 指定字段值之间的分隔符,默认为逗号 |
| csv.quote.character | N | " | String | 指定用于包围字段值的引号字符,默认为双引号"。如果csv.disable.quote.character为true,无法使用该选项。 |
| csv.disable.quote.character | N | false | Boolean | 是否禁用包围字段值的引号字符。默认为false |
| csv.allow.comments | N | false | Boolean | 忽略以 `#` 开头的注释行(默认情况下禁用)。如果启用此选项,确保同时忽略解析错误,以允许存在空行。这意味着在处理 CSV 文件时,任何以 `#` 开头的行都将被视为注释,不会被解析或读取。 |
| csv.ignore.parse.errors | N | false | Boolean | 忽略解析错误,默认为false。遇到格式错误输出异常日志。 |
| csv.array.element.delimiter | N | ; | String | 数组中元素的分隔符 |
| csv.escape.character | N | | String | 转义特殊字符的字符。例如:分隔符、引号或换行符。 |
| csv.null.literal | N | | String | 指定NULL值的字符串 |
# 任务编排
```yaml
application:
env:
name: example-inline-to-print
parallelism: 3
shade.identifier: aes
kms.type: local
pipeline:
object-reuse: true
execution:
restart:
strategy: none
properties: # job级别变量,同名情况下会覆盖全局变量
hos.bucket.name.rtp_file: traffic_rtp_file_bucket
hos.bucket.name.http_file: traffic_http_file_bucket
hos.bucket.name.eml_file: traffic_eml_file_bucket
hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
# RestfulAPI 取需要加密的字段,返回数据类型为Array
projection.encrypt.schema.registry.uri: 127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields
topology:
- name: inline_source
downstream: [decoded_as_split]
- name: decoded_as_split
tags: [http_tag, dns_tag] #需要在分流处理器的rules中进行定义,分流规则按照数组中的顺序对应downstream中的处理器,支持Pipelines,Sinks,Filters
downstream: [ projection_processor, aggregate_processor]
- name: projection_processor
downstream: [ print_sink ]
- name: aggregate_processor
downstream: [ print_sink ]
- name: print_sink
downstream: []
```
# 函数定义
## 内置UDF
函数可读取job配置文件(grootstream_job.yaml),每个函数在处理器管道中(Processor Pipeline )独立运行,互不影响。一个函数包括名称、传递数据(Event)、函数上下文信息(UDF Context) 及执行方法 evaluate(Event)。
- Function Name :函数名称,命名全大写单词之间用下划线分割,用于函数注册。
- Event:处理的事件,数据组织Map<field_name, field_value> event结构。
- UDF Context 函数执行环境上下文,包括输入数据,配置信息及其它状态信息。
- filter :过滤表达式;String类型,默认为空,它用于筛选需要经过函数处理的事件,具体过滤方式参考AviatorScript语法。
- lookup_fields:查找的字段;Array[String]类型,允许指定多个字段,在事件中查找字段名对应的值。
- output_fields:输出的字段;Array[String]类型,允许指定多个字段,用于将函数执行的结果附加到事件中。如果输出字段与查找字段相匹配,它们将覆盖原有字段的值;如果不匹配,将会在日志中添加一个新字段。
- parameters:扩展参数;选填,Map<String, Object>
> 函数表达式:FUNCTION_NAME(filter, lookup_fields, output_fields[, parameters])
### 标量函数
#### ASN Lookup
查找IP所属AS号。
- Parameters
- kb_name=`<string>` // 使用的知识库的名称 ,需要预先在全局配置中进行注册。
- option = `<string>`
- IP_TO_ASN
```yaml
- function: ASN_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_asn ]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
```
#### Base64 Decode
将 Base64 编码二进制数据解码转换为字符串。
Parameters:
- value_field =<String>
- charset_field=<String> 可选,默认为UTF-8
```yaml
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
```
#### Base64 Encode
将 Base64 二进制数据编码转换为字符串。
Parameters:
- value_field =<String>
```yaml
- function: BASE64_ENCODE_TO_STRING
output_fields: [packet]
parameters:
value_field: packet
```
#### Current Unix Timestamp
获取系统当前时间戳。
- Parameters
- precision=seconds | milliseconds
```yaml
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [ processing_time ]
parameters:
precision: milliseconds
```
#### Domain
域名处理函数。
Parameters:
- option = `<string>`
- TOP_LEVEL_DOMAIN 顶级域名
- FIRST_SIGNIFICANT_SUBDOMAIN 获取二级有效域名
- FQDN 获取FQDN
```yaml
- function: DOMAIN
lookup_fields: [ http_host,ssl_sni,dtls_sni,quic_sni ]
output_fields: [ server_domain ]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
```
#### Drop
满足Filter表达式的日志增加删除标记,下游函数将不再执行,当前Projection Function 不再发送事件到下游。设置Event isDropped标记为true。
- 日志格式数据(无嵌套),丢弃符合过滤条件的数据
```shell
- function: DROP
filter: event.c2s_byte_num <10
```
- 丢弃object_id为13167 数据
```shell
- function: DROP
filter: event.object_id == 13167
# Input: {"object_id":13176,"item_id":83989295}
```
- metrics格式数据(多级嵌套),丢弃object_id为102且item_id大于等于2的数据,或object_id等于13176且item_id大于83989294的数据
```shell
- function: DROP
filter: (event.tags.object_id == 102 && event.tags.item_id >= 2) || (event.tags.object_id ==13176 && event.tags.item_id >= 83989294)
# Input: {"tags":{"object_id":13176,"item_id":83989295},"fields":{"in_bytes":1765830,"out_bytes":27446,"bytes":1793276},"timestamp_ms":1714443502000}
```
#### Encrypt
对敏感信息进行加密。支持引用动态规则,获取需要加密的字段,选择是否对当前字段进行加密 。
- 加密基于 Vault KMS,密钥支持动态更新;如果从 Vault 加载失败,系统将使用最近一次有效的密钥来加密数据。
- 读取任务变量 `projection.encrypt.schema.registry.uri`,返回敏感字段(类型为 Array),可以据此判断当前字段是否需要加密。如果访问 schema 失败,将使用最近一次的有效字段。
Parameters:
- identifier = `<string>` 加密算法唯一标识。支持:aes-128-gcm96, aes-256-gcm96, sm4-gcm96
- default_val= `<string>` 加密失败输出该值,默认将输出原值
```
- function: ENCRYPT
lookup_fields: [ phone_number ]
output_fields: [ phone_number ]
parameters:
identifier: aes-128-gcm96
```
#### Eval
通过值表达式,获取符合条件的值,添加到字段中。同时可以选择保留或删除指定的字段。
Parameters:
- value_expression=`<string>` 基于表达式设置字段的值,可以是一个常量
Example 1: 创建一个字段ingestion_time, 取自 recv_time值
```
- function: EVAL
output_fields: [ ingestion_time ]
parameters:
value_expression: 'recv_time'
```
Example 2: 创建一个字段internal_ip, 如果flags&8=8?client_ip : server_ip
```
- function: EVAL
output_fields: [ internal_ip ]
parameters:
value_expression: 'flags&8=8? client_ip : server_ip'
```
#### Flatten
扁平化嵌套结构使其成为顶级字段。新字段命名使用每层结构名称作为前缀,中间默认用句点“.”分隔。
- Parameters
- prefix= `<string>` //为扁平化的字段名称指定前缀。默认为空。
- depth=<int> // 扁平化的嵌套级别的最大值. 设置为1,仅扁平化顶级结构。默认设置为5
- delimiter=<String> 组合父级与子级名称的分隔符。默认为"."。
- json_string_keys=Array[string] 标识哪些JsonString格式的数据需要扁平化。默认为空。
Example 1: 对Metrics的fields,tags 嵌套结构进行扁平化,如果lookup_fields为空则对所有嵌套结构进行扁平化。
```
- function: FLATTEN
lookup_fields: [ tags, fields ]
```
Example 2: 会话日志字段encapsulation(JsonString格式)嵌套结构进行扁平化,并增加前缀tunnels,嵌套深度指定3,中间用下划线“."分隔
```yaml
- function: FLATTEN
lookup_fields: [ encapsulation ]
parameters:
prefix: tunnels
depth: 3
delimiter: .
json_string_keys: [ encapsulation]
# Output: tunnels.encapsulation.ipv4.client_ip: 192.168.4.1
```
#### From Unix Timestamp
将时间戳转换为日期类型,返回UTC日期时间格式字符串,输入支持10位和13位时间戳。
- Parameters
- precision=seconds // yyyy-MM-dd HH:mm:ss
- precision=milliseconds // yyyy-MM-dd HH:mm:ss:SSS
```yaml
- function: FROM_UNIX_TIMESTAMP
lookup_fields: [recv_time]
output_fields: [recv_time_string]
parameters:
precision: seconds
```
#### Generate String Array
创建字符串数组
```yaml
- function: GENERATE_STRING_ARRAY
lookup_fields: [ client_asn,server_asn ]
output_fields: [ asn_list ]
```
#### GeoIP Lookup
查找IP地理位置信息。
- Parameters
- kb_name=`<string>` // 使用的知识库的名称 ,需要预先在全局配置中进行注册。
- option = `<string>`
- IP_TO_COUNTRY 所属国家或地区
- IP_TO_PROVINCE 所属省/州
- IP_TO_CITY 所属城市
- IP_TO_SUBDIVISION_ADDR 如上三级以下信息,包括区、街道等。
- IP_TO_DETAIL 所属详情,包括如上四级,中间用英文句点分隔
- IP_TO_LATLNG 所属经纬度,中间用英文逗号分隔
- IP_TO_PROVIDER 所属服务提供商(ISP)
- IP_TO_JSON 返回所属位置详情,格式为JSON
- IP_TO_OBJECT 返回所属位置详情,格式为Response Object
- geolocation_field_mappingobject_key : field_name
```yaml
- function: GEOIP_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_geolocation ]
parameters:
kb_name: tsg_ip_location
option: IP_TO_DETAIL
```
```yaml
- function: GEOIP_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_geolocation ]
parameters:
kb_name: tsg_ip_location
option: IP_TO_OBJECT
geolocation_field_mapping:
COUNTRY: client_country_region
PROVINCE: client_super_admin_area
# 当option为“IP_TO_OBJECT” 时,支持字段映射(geolocation_field_mapping):
# - COUNTRY - 国家或地区
# - PROVINCE - 省/州
# - CITY - 城市
# - LONGITUDE - 精度
# - LATITUDE - 纬度
# - ISP - 运营商
# - ORGANIZATION - 组织
```
#### HMAC
使用密钥和消息使用哈希算法生成一个固定长度的消息摘要。HMAC(Hash-based Message Authentication Code)是一种基于哈希函数的消息认证码,用于验证数据的完整性和真实性。
Parameters:
- secret_key = `<string>` 用于生成MAC的密钥。
- algorithm= `<string>` 用于生成MAC的HASH算法。默认是`sha256`
- output_format = `<string>` 输出MAC的格式。默认为`'base64'` 。支持:`base64` | `hex `。
```
- function: HMAC
lookup_fields: [ phone_number ]
output_fields: [ phone_number_hmac ]
parameters:
secret_key: ******
output_format: base64
```
#### JSON Extract
解析JSON字段,通过表达式抽取json部分内容。
- Parameters
- value_expression=`<string>` //基于JsonPath表达式设置字段的值
```
JSON_EXTRACT(null, 'device_tag', 'data_center', parameters)
- parameters:
- value_expression = $.tags[?(@.tag=='data_center')][0].value
```
#### Path Combine
路径合并。
- Parameters
- path = Array[string]
```yaml
- function: PATH_COMBINE
lookup_fields: [ packet_capture_file ]
output_fields: [ packet_capture_file ]
parameters:
# 获取grootstream.yaml中properties配置的对应属性hos.path的值
path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
# Output: hos_path + bucket_name + packet_capture_file
```
#### Rename
重命名字段。
- Parameters
- parent_fields: Array[string] 指定哪些字段的子字段将进行重命名。如果为空,则仅会对顶级字段进行重命名,不支持对数组结构中的key进行重命名。
- rename_fields: 指定的字段进行重命名
- current_field_name : new_field_name
- rename_expression=`<string>` 对字段执行AviatorScript表达式,返回值作为重命名后的字段名,优先级低于rename_fields。
Example 1: 移除字段名"tags_"前缀 , 重命名字段timestamp_ms为recv_time_ms
```yaml
- function: RENAME
parameters:
rename_fields:
timestamp_ms: recv_time_ms
rename_expression: key=string.replace_all(key,'tags_',''); return key;
```
Example 2: client_ip 重命名为source_ip, 包括隧道encapsulation.ipv4下的字段
```yaml
- function: RENAME
parameters:
parent_fields: [encapsulation.ipv4]
rename_fields:
client_ip: source_ip
# Output: source_ip:192.168.4.1, encapsulation.ipv4.source_ip:192.168.12.12
```
#### Snowflake ID
基于雪花算法生成唯一ID。
Parameters:
- data_center_id_num = <int> 数据中心id,用与保证生成雪花id的唯一性。
````shell
- function: SNOWFLAKE_ID
output_fields: [ log_id ]
````
#### String Joiner
字符串拼接,可以指定分隔符,前缀与后缀。
```yaml
- function: STRING_JOINER
lookup_fields: [client_ip, server_ip]
output_fields: [ip_string]
parameters:
delimiter: ','
prefix: '['
suffix: ']'
# Output:ip_string='[client_ip, server_ip]'
```
#### Unix Timestamp Converter
转换时间戳精度,返回其他精度时间戳
- Parameters
- precision=seconds // 获取Unix时间戳并将其精确到秒级
- precision=milliseconds // 获取Unix时间戳并将其精确到毫秒级
- precision=minutes // 获取Unix时间戳将其精确到分钟级别,并以秒级格式输出
- interval = <int>//时长精度,单位取决于precision
```yaml
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ __timestamp ]
output_fields: [ recv_time ]
parameters:
precision: seconds
interval: 300
# __timestamp:内置参数,从数据source的摄入时间,以300秒为精度返回时间戳,若precision = minutes,则为以300分钟为精度输出。
```
#### UUID
使用UUIDv4标准,生成128位随机UUID。实现方式参考:https://github.com/cowtowncoder/java-uuid-generator
```yaml
- function: UUID
output_fields: [log_uuid]
# 3f0f8d7e-d89e-4b0a-9f2e-2eab5c99d062
```
#### UUIDv5
是一种基于 **命名空间和名称** 生成的 UUID。与 `UUIDv4` 主要依赖随机数不同,`UUIDv5` 使用 SHA-1 哈希算法将命名空间和名称组合后生成一个确定性的 UUID。这意味着对同一命名空间和相同名称的输入,`UUIDv5` 总是会生成相同的 UUID。
- Parameters
- namespace = <Enum> 枚举值,命名空间是一个 UUID,它定义了名称所属的上下文。可指定如下命名空间:
- NAMESPACE_IP: 6ba7b890-9dad-11d1-80b4-00c04fd430c8
- NAMESPACE_DOMAIN: 6ba7b891-9dad-11d1-80b4-00c04fd430c8
- NAMESPACE_APP: 6ba7b892-9dad-11d1-80b4-00c04fd430c8
- NAMESPACE_SUBSCRIBER: 6ba7b893-9dad-11d1-80b4-00c04fd430c8
```yaml
- function: UUIDv5
lookup_fields: [ client_ip, server_ip ] # 基于 client_ip, server_ip的值组成UUIDv5 name 参数值与命名空间结合后,通过哈希生成唯一的 UUID。
output_fields: [ip_uuid]
parameters:
namespace: NAMESPACE_IP
# 2ed6657d-e927-568b-95e1-2665a8aea6a2
```
#### UUIDv7
通过时间戳和随机数生成唯一UUID,适合需要时间排序的场景,比如数据库索引和日志记录。
```yaml
- function: UUIDv7
output_fields: [log_uuid] # 生成基于时间戳和随机数的 UUID
# 2ed6657d-e927-568b-95e1-2665a8aea6a2
```
### 聚合函数
#### ARRAY_CONCAT_AGG
将多个数组中的各个元素连接成一个单一数组。输入类型必须为数组,字段值为NULL 将被忽略。
Parameters:
- mode(string): 聚合模式;可选值:`all`- 不去重(即包括重复元素);`distinct`:去重(即只保留唯一元素)。默认值:all
- max_size(int) : 限制返回数组的最大大小。当数组长度超过该值时,裁剪掉多余的元素. 默认值:100,000
```yaml
- function: ARRAY_CONCAT_AGG
lookup_fields: [ tags ]
output_fields: [ tags ]
parameters:
mode: distinct
max_size: 1000
```
####
#### Collect List
在时间窗口内将指定对象合并为List,不进行去重
```yaml
- function: COLLECT_LIST
lookup_fields: [client_ip]
output_fields: [client_ip_list]
# Output:client_ip_list= ['192.168.4.1','192.168.4.1','192.168.4.2']
```
#### Collect Set
在时间窗口内将指定对象合并为Set,对结果进行去重。
```yaml
- function: COLLECT_SET
lookup_fields: [client_ip]
output_fields: [client_ip_set]
# Output:client_ip_set= ['192.168.4.1','192.168.4.2']
```
#### First Value
返回时间窗口内第一个出现的不为空的value。
```yaml
- function: FIRST_VALUE
lookup_fields: [ received_bytes ]
output_fields: [ received_bytes_first ]
```
#### Last Value
返回时间窗口内最后一个出现的不为空的value。
```yaml
- function: LAST_VALUE
lookup_fields: [ received_bytes ]
output_fields: [ received_bytes_last ]
```
#### Long Count
在时间窗口内统计Event条数。
```yaml
- function: LONG_COUNT
lookup_fields: [ log_id ]
output_fields: [ sessions ]
```
#### Max
在时间窗口内获取最大值
```yaml
- function: MAX
lookup_fields: [ received_time ]
output_fields: [ received_time ]
```
#### Min
在时间窗口内获取最小值
```yaml
- function: MIN
lookup_fields: [ received_time ]
output_fields: [ received_time ]
```
#### Mean
在时间窗口内对指定的数值对象求平均值。
Parameters
- precision=<int> 返回的double类型结果精度,不配置则返回实际计算结果
```yaml
- function: MEAN
lookup_fields: [ received_bytes ]
output_fields: [ received_bytes_mean ]
parameters:
precision: 2
```
#### Number Sum
在时间窗口内对指定数字类型字段进行求和:支持 int,long,double,float类型。
```yaml
- function: NUMBER_SUM
lookup_fields: [received_bytes, sent_bytes]
output_fields: [received_bytes_sum]
```
```yaml
- function: NUMBER_SUM
lookup_fields: [sent_bytes]
```
#### HLLD
构建HLLD Sketch,输入列可以为常规类型列或HLLD Sketch列。
Parameters:
- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。
- precision(int):HLL精度。默认值:12。
- output_format(string):输出类型格式。可选值:base64(base64字符串), binary(byte[])。默认值:base64。
```yaml
- function: HLLD
lookup_fields: [ ip_hlld ]
output_fields: [ ip_hlld ]
parameters:
input_type: sketch
- function: HLLD
lookup_fields: [ ip ]
output_fields: [ ip_hlld ]
parameters:
input_type: regular
```
#### APPROX_COUNT_DISTINCT_HLLD
计算近似distinct count,输入列可以为常规类型列或HLLD Sketch列。
Parameters:
- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。
- precision(int):HLL精度。默认值:12。
```yaml
- function: APPROX_COUNT_DISTINCT_HLLD
lookup_fields: [ ip_hlld ]
output_fields: [ ip_count ]
parameters:
input_type: sketch
- function: APPROX_COUNT_DISTINCT_HLLD
lookup_fields: [ ip ]
output_fields: [ ip_count ]
parameters:
input_type: regular
```
#### HDR_HISTOGRAM
构建HdrHistogram Sketch,输入列可以为常规类型列或HdrHistogram Sketch列。
Parameters:
Parameters:
- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。
- lowestDiscernibleValue(int):除0外最小值,默认1
- highestTrackableValue(int):直方图可以记录的最大值,默认2
- numberOfSignificantValueDigits(int):指定数据值的精度,默认1;[1-5] 较大的值更精确,但会需要更多内存。
- autoResize(boolean):自动调整highestTrackableValue,默认true
- output_format(string):输出类型格式。可选值:base64(base64字符串), binary(byte[])。默认值:base64。
```yaml
- function: HDR_HISTOGRAM
lookup_fields: [latency_ms_histogram]
output_fields: [latency_ms_histogram]
parameters:
input_type: sketch
- function: HDR_HISTOGRAM
lookup_fields: [latency_ms]
output_fields: [latency_ms_histogram]
parameters:
input_type: regular
```
#### APPROX_QUANTILE_HDR
计算近似分位数,输入列可以为常规类型列或HdrHistogram Sketch列。
Parameters:
- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。
- lowestDiscernibleValue(int):除0外最小值,默认1
- highestTrackableValue(int):直方图可以记录的最大值,默认2
- numberOfSignificantValueDigits(int):指定数据值的精度,默认1;[1-5] 较大的值更精确,但会需要更多内存。
- autoResize(boolean):自动调整highestTrackableValue,默认true
- probability(double):分位数百分比,范围0-1,默认0.5
```yaml
- function: APPROX_QUANTILE_HDR
lookup_fields: [latency_ms]
output_fields: [latency_ms_p95]
parameters:
input_type: regular
probability: 0.95
- function: APPROX_QUANTILE_HDR
lookup_fields: [latency_ms_histogram]
output_fields: [latency_ms_p95]
parameters:
input_type: sketch
probability: 0.95
```
#### APPROX_QUANTILES_HDR
计算近似分位数,输入列可以为常规类型列或HdrHistogram Sketch列。
Parameters:
- input_type(string):输入列类型。可选值:regular(输入列为常规类型列,代表单个元素), sketch(输入列为sketch类型列,sketch类型)。默认值:sketch。
- lowestDiscernibleValue(int):除0外最小值,默认1
- highestTrackableValue(int):直方图可以记录的最大值,默认2
- numberOfSignificantValueDigits(int):指定数据值的精度,默认1;[1-5] 较大的值更精确,但会需要更多内存。
- autoResize(boolean):自动调整highestTrackableValue,默认true
- probabilities(double[]):分位数百分比数组,范围0-1,默认null,必须的属性。
```yaml
- function: APPROX_QUANTILES_HDR
lookup_fields: [latency_ms_HDR]
output_fields: [latency_ms_quantiles]
parameters:
input_type: sketch
probabilities: [0.5, 0.95, 0.99]
- function: APPROX_QUANTILES_HDR
lookup_fields: [latency_ms]
output_fields: [latency_ms_quantiles]
parameters:
input_type: regular
probabilities: [0.5, 0.95, 0.99]
```
### 表格函数
#### Unroll
展开函数用于处理一个数组类型字段 ,或配置一个用于分割字符串类型字段的表达式 , 并将该字段展开为单独的事件。支持处理 array或string类型字段。
Parameters:
- regex= string//用于将字符串分割为数组的正则表达式,如“,”按照逗号分割字符串,如果字段为数组类型则无需配置
```yaml
functions:
- function: UNROLL
lookup_fields: [ monitor_rule_list ]
output_fields: [ monitor_rule ]
# Input: Event { client_ip=‘192.168.1.1’,monitor_rule_list=[954779,930791]}
# Output:
#Event1: {client_ip=‘192.168.1.1’,monitor_rule=954779}
#Event2: {client_ip=‘192.168.1.1’,monitor_rule=930791}
```
#### Json Unroll
JSON 展开函数接收 JSON 对象字符串字段,将其中的对象数组展开为字符串类型单独事件,同时继承顶级字段。
Parameters:
- path= string//要展开的数组的路径,基于JsonPath表达式,不配置默认展开顶层数组
- new_path= string//新元素的路径,基于JsonPath表达式,不配置默认覆盖原path
```yaml
- function: JSON_UNROLL
lookup_fields: [ encapsulation]
output_fields: [ encapsulation ]
parameters:
path: tags
new_path: new_tag
# Input: Event { client_ip=‘192.168.1.1’,device_tag=‘{"tags":[{"tag":"data_center","value":"center-xxg-tsgx-1"}, {"tag":"device_group","value":"group-xxg-tsgx-2"}]}’}
# Output:
#Event1:{client_ip=‘192.168.1.1’,device_tag='{"new_tag":{"tag":"data_center","value":"center-xxg-tsgx-1"}’}'
#Event2:{client_ip=‘192.168.1.1’,device_tag='{"new_tag":{"tag":"data_center","value":"center-xxg-tsgx-2"}’}'
```
```yaml
- function: JSON_UNROLL
lookup_fields: [ encapsulation]
output_fields: [ encapsulation ]
#Input: Event { client_ip=‘192.168.1.1’,encapsulation=‘[{"tunnels_schema_type":"GRE"},{"tunnels_schema_type":"IPv4","client_ip":"12.1.1.1","server_ip":"14.1.1.1"}]’}
#Output:
#Event1:{client_ip=‘192.168.1.1’,encapsulation='{"tunnels_schema_type":"GRE"}'}
#Event2:{client_ip=‘192.168.1.1’,encapsulation='{"tunnels_schema_type":"IPv4","client_ip":"12.1.1.1","server_ip":"14.1.1.1"}'}
```
#### Path Unroll
将文件路径逐层展开,逐层输出路径和文件(可选)。
Parameters:
- separator= 路径分隔符(只能是单个字符),默认'/'。
```yaml
# 将一个应用层协议按层级进行拆分,应用层协议由协议解析路径和应用组成。
- function: PATH_UNROLL
lookup_fields: [ decoded_path, app]
output_fields: [ protocol_stack_id, app_name ]
parameters:
separator: "."
# Input: {"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"port_443"}
# Output:
#Event1: {"protocol_stack_id":"ETHERNET"}
#Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
#Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
#Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"}
#Event5: {"app_name":"port_443","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.port_443"}
# Input: {"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"ssl"}
# Output:
#Event1: {"protocol_stack_id":"ETHERNET"}
#Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
#Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
#Event4: {"app_name":"ssl","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"}
# Input: {"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"ssl.port_444"}
# Output:
#Event1: {"protocol_stack_id":"ETHERNET"}
#Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
#Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
#Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"}
#Event5: {"app_name":"ssl.port_444","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.ssl.port_444"}
#只有路径参数的场景(或者上例中文件字段值为null).
- function: PATH_UNROLL
lookup_fields: [ decoded_path]
output_fields: [ protocol_stack_id]
parameters:
separator: "."
# Input: {"decoded_path":"ETHERNET.IPv4.TCP.ssl"}
# Output:
#Event1: {"protocol_stack_id":"ETHERNET"}
#Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
#Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
#Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"}
```
## CN扩展UDF
[CN函数库](https://docs.geedge.net/pages/viewpage.action?pageId=129087866)
用户自定义插件(IN Progress)
| 名称 | 描述 | 类型 | 必填 | 约束 |
|----------------------|---------|---------------|----|---------|
| user_define_process | | | | |
| function_name | 调用的方法名称 | String | Y | |
| class_name | 主类名 | String | Y | |
| jar_name | jar包名 | String | Y | |
| config_field | 私有属性配置 | JSON[Array] | N | 可自定义多属性 |
| input_schema | 输入字段 | json_String | Y | |
| output_schema | 输出字段 | json_String | Y | |
| | | | | |
| user_define_function | | | | |
| name | 名称 | String | Y | |
| description | 描述 | String | N | |
| type | 类型 | String | Y | udfudaf |
| class_name | 主类名 | String | Y | |
| jar_name | jar包名 | String | Y | |
# 设计原则
## 模块之间依赖关系

## 动态加载工厂模式

- 包命名: com.geedgenetworks. [模块名].XXX
- 统一依赖管理:第三方类库的依赖声明在项目的顶层 POM 文件(也称为 Project POM)中,各个子模块继承这些依赖,确保整个项目共享相同的依赖。
- 模块之间依赖:在每个模块的 POM 文件中定义依赖其他模块的关系。
- 每个模块按其职责命名 groot-[功能名称],例如:
- groot-common 公共模块,包含可复用功能、工具类或库,供其它模块引用。
- groot-core 核心模块,包含与业务逻辑紧密相关的核心功能、类、接口或服务。
- groot-bootstrap 启动模块,包含一些必要的初始化代码、配置解析或资源加载等,它属于应用程序起点,负责将一个流处理任务各个部分组装起来,使其正确运行。
- groot-connectors 连接器模块
- connecor-kafka 子模块,包含Source和Sink 功能
- connector-ipfix-collector 子模块,Source 功能
- connecotr-clickhouse 子模块,Sink 功能
- MockDataConnector(Source) 用于产生样例数据,用于测试、开发或演示目的的场景
- groot-formats format模块
- format-json 子模块,提供json format
- groot-tests 测试模块,包含多个模块,用于任务的集成和功能测试 (非单元测试)
- 对于不受检查异常(RuntimeException)在groot-common模块定义全局的异常处理类GrootRuntimeException,基于该自定义的异常抛出并附带更清晰的错误信息,用于限定问题的范围。其他各个模块按需实现Exception用于增加更多上下文异常提示信息。
- 自定义插件管理:Connectors(Source 和 Destination) 和 UDF 函数;
- UDF(用户自定义函数)—— 用于数据清洗、处理和格式转换。按实现方式可分为内置UDFs和用户扩展UDFs。
- UDF接口包括Function Name、传递数据(Event)、配置参数(context) 及执行方法 Evaluate(Event)
- 通过配置文件(udfs)管理平台已注册的函数列表
- 任务启动时包含两个步骤:验证所引用的函数是否在注册列表中;按照引用的顺序对函数进行实例化。
- 与通用工具类的关系:UDF 调用通用工具类的方法,以实现Evaluate的功能。
- 提供open 和 close 方法,用以对象初始化,处理连接器(如数据库连接、文件句柄等)相关的资源的打开和关闭。而open方法一次性初始化的方法,在 UDF 对象创建时执行,用于初始化对象级别的资源和状态。
- Event 内置字段(Internal Fields) 命名以双下划线开头,仅用于数据流处理,不发送到SINK 模块。
- __timestamp : 数据摄入时间(Ingestion Time)。当Source无法抽取时,使用当前时间(Unix epoch格式),一般用于标识“数据的摄入时间”。例如 Kafka Source 抽取头部_time属性。
- __inputId: 数据来源,事件的产生源头或来源的标识符或名称。用于事件追踪和管理,以确定事件是由哪个系统、应用程序、设备或实体产生的。例如Kafka Source 记录topic 名称。
# 相关问题
- 知识库更新为什么不基于Flink 广播流?
- 广播流适用于将配置或规则低吞吐事件流广播到下游所有Task中,不适用广播知识库大文件配置(GB级别)。
- 采用广播流动态广播知识库元数据方式,若更新知识库,当基于每个Task(线程)分别存储,占用内存较大;如果基于进程级(静态方法/变量)共享,可能会发生线程阻塞或死锁问题。
- 自定义函数如何提交到平台?
- Pack 在平台里定位是什么? 如何扩展?
- 数据分流方案?
- 使用Flink 侧输出流(side_output),对事件标记tag实现。
- Aggregate Processor 函数如何定义?怎么指定dimension、Metrics ?
- 支持基础滑动,滚动窗口聚合计算。Dimension 基于group_by_fields 指定,Metrics 通过自定义UDAF实现。
|