summaryrefslogtreecommitdiff
path: root/src/maat.c
blob: e0b4ea140957f6c0a813ca8d0af2dd68114e4844 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include "maat.h"
#include "config.h"
#include "job_ctx.h"

#include <MESA/maat.h>

static struct maat * dp_trace_maat_instance_create()
{
    int ret = 0;
    int redis_port_begin = 0;
    int redis_port_end = 0;
    int redis_port_select = 0;
    struct maat * target = NULL;

    const struct config * conf = global_config_get();

    struct maat_options * opts = maat_options_new();
    maat_options_set_logger(opts, "log/maat.log", (enum log_level)conf->maat_log_level);
    maat_options_set_instance_name(opts, "dp_trace_telemetry");

    dzlog_info("maat defere load:%u", conf->deferred_load_on);
    if (conf->deferred_load_on)
    {
        ret = maat_options_set_deferred_load_on(opts);
        if (ret != 0)
        {
            dzlog_warn("maat_options_set_deferred_load_on function execution failed.");
        }
    }

    maat_options_set_caller_thread_number(opts, 0);
    switch (conf->maat_input_mode)
    {
    case MAAT_INPUT_JSON:
        ret = maat_options_set_json_file(opts, conf->json_cfg_file);
        if (ret != 0)
        {
            dzlog_error("maat_options_set_json_file function execution failed.");
            goto end;
        }
        break;
    case MAAT_INPUT_REDIS:
        ret = sscanf(conf->redis_port_range, "%d-%d", &redis_port_begin, &redis_port_end);
        if (ret == 1)
        {
            redis_port_select = redis_port_begin;
        }
        else if (ret == 2)
        {
            srand(time(NULL));
            redis_port_select = redis_port_begin + rand() % (redis_port_end - redis_port_begin);
        }
        else
        {
            dzlog_error("Invalid redis port range %s, MAAT init failed.", conf->redis_port_range);
            goto end;
        }
        maat_options_set_redis(opts, conf->redis_server, redis_port_select, conf->redis_db_idx);
        break;
    default:
        dzlog_error("Invalid MAAT Input Mode: %d.", conf->maat_input_mode);
        goto end;
        break;
    }

    target = maat_new(opts, conf->table_schema);
    if (!target)
    {
        dzlog_error("maat_new function execution failed.");
        goto end;
    }

end:
    if (opts != NULL)
    {
        maat_options_free(opts);
    }
    return target;
}

void dp_trace_maat_init()
{
    dzlog_info("data path trace maat init start...");
    struct maat * target = dp_trace_maat_instance_create();
    DP_TRACE_VERIFY(target, "create maat instance failed.");

    int ret = maat_plugin_table_ex_schema_register(target, "DATAPATH_TELEMETRY_JOB", telemetry_job_add_cb,
                                                   telemetry_job_del_cb, NULL, 0, NULL);
    DP_TRACE_VERIFY(ret == 0, "failed at register callback of DATAPATH_TELEMETRY_JOB.");
    dzlog_info("data path trace maat init end");
}