1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
#include "maat.h"
#include "config.h"
#include "job_ctx.h"
#include <MESA/maat.h>
static struct maat * dp_trace_maat_instance_create()
{
int ret = 0;
int redis_port_begin = 0;
int redis_port_end = 0;
int redis_port_select = 0;
struct maat * target = NULL;
const struct config * conf = global_config_get();
struct maat_options * opts = maat_options_new();
maat_options_set_logger(opts, "log/maat.log", (enum log_level)conf->maat_log_level);
maat_options_set_instance_name(opts, "dp_trace_telemetry");
dzlog_info("maat defere load:%u", conf->deferred_load_on);
if (conf->deferred_load_on)
{
ret = maat_options_set_deferred_load_on(opts);
if (ret != 0)
{
dzlog_warn("maat_options_set_deferred_load_on function execution failed.");
}
}
maat_options_set_caller_thread_number(opts, 0);
switch (conf->maat_input_mode)
{
case MAAT_INPUT_JSON:
ret = maat_options_set_json_file(opts, conf->json_cfg_file);
if (ret != 0)
{
dzlog_error("maat_options_set_json_file function execution failed.");
goto end;
}
break;
case MAAT_INPUT_REDIS:
ret = sscanf(conf->redis_port_range, "%d-%d", &redis_port_begin, &redis_port_end);
if (ret == 1)
{
redis_port_select = redis_port_begin;
}
else if (ret == 2)
{
srand(time(NULL));
redis_port_select = redis_port_begin + rand() % (redis_port_end - redis_port_begin);
}
else
{
dzlog_error("Invalid redis port range %s, MAAT init failed.", conf->redis_port_range);
goto end;
}
maat_options_set_redis(opts, conf->redis_server, redis_port_select, conf->redis_db_idx);
break;
default:
dzlog_error("Invalid MAAT Input Mode: %d.", conf->maat_input_mode);
goto end;
break;
}
target = maat_new(opts, conf->table_schema);
if (!target)
{
dzlog_error("maat_new function execution failed.");
goto end;
}
end:
if (opts != NULL)
{
maat_options_free(opts);
}
return target;
}
void dp_trace_maat_init()
{
dzlog_info("data path trace maat init start...");
struct maat * target = dp_trace_maat_instance_create();
DP_TRACE_VERIFY(target, "create maat instance failed.");
int ret = maat_plugin_table_ex_schema_register(target, "DATAPATH_TELEMETRY_JOB", telemetry_job_add_cb,
telemetry_job_del_cb, NULL, 0, NULL);
DP_TRACE_VERIFY(ret == 0, "failed at register callback of DATAPATH_TELEMETRY_JOB.");
dzlog_info("data path trace maat init end");
}
|