summaryrefslogtreecommitdiff
path: root/shaping/src/shaper_maat.cpp
blob: d1f207d7674eba8242a7c3f3fcb3193be6420705 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
#include <MESA_prof_load.h>
#include <cstring>
#include <stdio.h>
#include <time.h>

#include <MESA/cJSON.h>
#include <MESA/maat.h>
#include <MESA/MESA_handle_logger.h>

#include "log.h"
#include "shaper_maat.h"
#include "utils.h"

#define SHAPING_STREAM_TIMEOUT 3600

enum input_mode
{
    MAAT_INPUT_JSON = 0,
    MAAT_INPUT_REDIS = 1,
    MAAT_INPUT_FILE = 2,
};

struct shaper_maat_config {
    enum input_mode input_mode;
    char table_info[2048];
    char accept_tags[2048];
    char json_cfg_file[2048];
    int redis_db_idx;
    char redis_ip[2048];
    char redis_port_range[2048];
};

struct maat *g_maat_instance = NULL;

void shaper_rule_ex_new(int table_id, const char *key, const char *table_line, void **ad, long argl, void *argp)
{
    struct shaping_rule *s_rule;
    cJSON *json=NULL;
    cJSON *tmp_obj = NULL;
    cJSON *user_region = NULL;
    cJSON *tmp_array_obj = NULL;
    int array_size;
    int i;

    s_rule = (struct shaping_rule*)calloc(1, sizeof(struct shaping_rule));
    s_rule->ref_cnt = 1;

    json = cJSON_Parse(table_line);
    if (!json) {//required
        LOG_ERROR("%s: json parse rule failed for table line %s", LOG_TAG_MAAT, table_line);
        goto END;
    }

    tmp_obj = cJSON_GetObjectItem(json, "compile_id");
    if (!tmp_obj) {
        LOG_ERROR("%s: json parse compile_id failed for table line %s", LOG_TAG_MAAT, table_line);
        goto END;
    }
    s_rule->id = tmp_obj->valueint;

    user_region = cJSON_GetObjectItem(json, "user_region");
    if (!user_region) {
        LOG_ERROR("%s: json parse user_region failed for table line %s", LOG_TAG_MAAT, table_line);
        goto END;
    }

    tmp_obj = cJSON_GetObjectItem(user_region, "priority");
    if (!tmp_obj) {
        LOG_ERROR("%s: json parse priority failed for table line %s", LOG_TAG_MAAT, table_line);
        goto END;
    }
    s_rule->priority = tmp_obj->valueint;

    tmp_obj = cJSON_GetObjectItem(user_region, "fair_factor");
    if (!tmp_obj) {
        LOG_ERROR("%s: json parse fair-factor failed for table line %s", LOG_TAG_MAAT, table_line);
        goto END;
    }
    s_rule->fair_factor = tmp_obj->valueint;

    tmp_obj = cJSON_GetObjectItem(user_region, "profile_chain");
    if (!tmp_obj) {//required
        LOG_ERROR("%s: json parse profile_chain failed for table line %s", LOG_TAG_MAAT, table_line);
        goto END;
    }
        
    array_size = cJSON_GetArraySize(tmp_obj);
    if (array_size < 1) {
        LOG_ERROR("%s: json parse profile_chain empty for table line %s", LOG_TAG_MAAT, table_line);
        goto END;
    }

    tmp_array_obj = cJSON_GetArrayItem(tmp_obj, 0);
    s_rule->primary_pf_id = tmp_array_obj->valueint;

    s_rule->borrow_pf_num = array_size - 1;
    for (i = 1; i < array_size; i++) {
        tmp_array_obj = cJSON_GetArrayItem(tmp_obj, i);
        s_rule->borrow_pf_id_array[i - 1] = tmp_array_obj->valueint;
    }

END:
    *ad = s_rule;
    if (json) {
        cJSON_Delete(json);
    }
    return;
}

void shaper_rule_ex_dup(int table_id, void **to, void **from, long argl, void *argp)
{
    struct shaping_rule *s_rule;

    if (*from == NULL) {
        return;
    }

    s_rule = (struct shaping_rule*)*from;
    __atomic_add_fetch(&s_rule->ref_cnt, 1, __ATOMIC_SEQ_CST);
    *to = s_rule;

    return;
}

void shaper_rule_ex_free(int table_id, void **ad, long argl, void *argp)
{
    struct shaping_rule *s_rule;

    if (*ad == NULL) {
        return;
    }

    s_rule = (struct shaping_rule*)*ad;
    if (__atomic_sub_fetch(&s_rule->ref_cnt, 1, __ATOMIC_SEQ_CST) > 0) {
        return;
    }

    free(s_rule);
    *ad = NULL;

    return;
}

void shaper_profile_ex_new(int table_id, const char *key, const char *table_line, void **ad, long argl, void *argp)
{
    struct shaping_profile *s_pf;
    cJSON *json=NULL;
    cJSON *tmp_array_obj = NULL;
    cJSON *tmp_obj = NULL;
    char split_by[64] = {0};
    char limits[128] = {0};
    char aqm_options[64] = {0};
    char dscp_marking[64] = {0};
    char volume_based_shaping[64] = {0};
    int limit_bandwidth;
    int array_size, i;
    int ret;

    s_pf = (struct shaping_profile*)calloc(1, sizeof(struct shaping_profile));
    s_pf->ref_cnt = 1;

    ret = sscanf(table_line, "%d\t%63s\t%127s\t%63s\t%63s\t%63s\t%d",
            &s_pf->id, split_by, limits, aqm_options, dscp_marking, volume_based_shaping, &s_pf->valid);
    if (ret != 7) {
        LOG_ERROR("%s: sscanf parse failed for profile line %s", LOG_TAG_MAAT, table_line);
        goto END;
    }
    
    //parse limits of profile
    json = cJSON_Parse(limits);
    if (!json) {
        LOG_ERROR("%s: json parse profile failed for profile id %d, line %s", LOG_TAG_MAAT, s_pf->id, table_line);
        goto END;
    }

    array_size = cJSON_GetArraySize(json);
    for (i = 0; i < array_size; i++) {
        tmp_array_obj = cJSON_GetArrayItem(json, i);
        tmp_obj = cJSON_GetObjectItem(tmp_array_obj, "bandwidth");
        if (!tmp_obj) {
            LOG_ERROR("%s: json parse limit bandwidth failed for profile id %d, line %s", LOG_TAG_MAAT, s_pf->id, table_line);
            goto END;
        }
        limit_bandwidth = tmp_obj->valueint;

        tmp_obj = cJSON_GetObjectItem(tmp_array_obj, "direction");
        if (!tmp_obj) {
            LOG_ERROR("%s: json parse limit direction failed for profile id %d, line %s", LOG_TAG_MAAT, s_pf->id, table_line);
            goto END;
        }
        if (strncmp(tmp_obj->valuestring, "incoming", strlen(tmp_obj->valuestring)) == 0) {
            s_pf->in_limit_bandwidth = limit_bandwidth;
        } else {
            s_pf->out_limit_bandwidth = limit_bandwidth;
        }
    }

    cJSON_Delete(json);
    json = NULL;


    //TODO:parse other argumens of profile

END:
    *ad = s_pf;
    if (json) {
        cJSON_Delete(json);
    }

    return;
}

void shaper_profile_ex_dup(int table_id, void **to, void **from, long argl, void *argp)
{
    struct shaping_profile *s_pf;

    if (*from == NULL) {
        return;
    }

    s_pf = (struct shaping_profile*)*from;
    __atomic_add_fetch(&s_pf->ref_cnt, 1, __ATOMIC_SEQ_CST);
    *to = s_pf;

    return;
}

void shaper_profile_ex_free(int table_id, void **ad, long argl, void *argp)
{
    struct shaping_profile *s_pf;
    
    s_pf = (struct shaping_profile*)*ad;
    if (__atomic_sub_fetch(&s_pf->ref_cnt, 1, __ATOMIC_SEQ_CST) > 0) {
        return;
    }

    free(s_pf);
    *ad = NULL;

    return;
}

void shaper_profile_update(struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex, int priority)
{
    s_pf_info->id = s_pf_ex->id;
    s_pf_info->priority = priority;

    shaper_profile_ex_free(0, (void **)&s_pf_ex, 0, NULL);

    return;
}

static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_rule_info *s_rule_info, long long rule_compile_id)
{
    struct shaping_rule *s_rule = NULL;
    struct shaping_profile *s_pf = NULL;
    char pf_id_key[8] = {0};

    s_rule = (struct shaping_rule*)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->rule_table_id, (char *)&rule_compile_id);
    if (!s_rule) {
        LOG_ERROR("%s maat_plugin_table_get_ex_data get rule failed for compile id %lld", LOG_TAG_MAAT, rule_compile_id);
        goto END;
    }
    s_rule_info->id = s_rule->id;

    snprintf(pf_id_key, sizeof(pf_id_key), "%d", s_rule->primary_pf_id);
    s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key);
    if (!s_pf) {
        LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key);
        goto END;
    }
    shaper_profile_update(&s_rule_info->primary, s_pf, s_rule->priority);

    if (s_rule->borrow_pf_num == 0) {
        goto END;
    }

    for (int i = 0; i < s_rule->borrow_pf_num; i++) {
        snprintf(pf_id_key, sizeof(pf_id_key), "%d", s_rule->borrow_pf_id_array[i]);
        s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key);
        if (!s_pf) {
            LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key);
            goto END;
        }

        if (s_rule->priority + i + 1 < SHAPING_PRIORITY_NUM_MAX) {//TODO: 优先级大于9的都按9处理
            shaper_profile_update(&s_rule_info->borrowing[i], s_pf, s_rule->priority + i + 1);
            s_rule_info->borrowing_num++;
        } else {
            goto END;
        }
    }

END:
    if (s_rule) {
        shaper_rule_ex_free(ctx->maat_info->rule_table_id, (void **)&s_rule, 0, NULL);
    }
    return;
}

void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, long long *rule_compile_ids, int rule_num)
{
    int i, j;

    if (sf->rule_num + rule_num > SHAPING_RULE_NUM_MAX) {
        LOG_ERROR("%s: shaping exceed maat rule num limit %d for flow: %s", LOG_TAG_MAAT, SHAPING_RULE_NUM_MAX, addr_tuple4_to_str(&sf->tuple4));
        return;
    }

    for (i = sf->rule_num, j = 0; i < sf->rule_num + rule_num; i++, j++) {
        shaper_rule_update(ctx, &sf->matched_rule_infos[i], rule_compile_ids[j]);
    }
    sf->rule_num += rule_num;

    return;
}

static int shaper_maat_config_load(struct shaper_maat_config *conf)
{
    MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "MAAT", "INPUT_MODE", (int *)&conf->input_mode, MAAT_INPUT_REDIS);
    MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "MAAT", "TABLE_INFO", conf->table_info, sizeof(conf->table_info), "conf/table_info.conf");
    MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "MAAT", "JSON_FILE", conf->json_cfg_file, sizeof(conf->json_cfg_file), "conf/shaping_maat.json");
    MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "MAAT", "REDIS_DB_IDX", &conf->redis_db_idx, 0);
    MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "MAAT", "REDIS_IP", conf->redis_ip, sizeof(conf->redis_ip), "127.0.0.1");
    MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "MAAT", "REDIS_PORT", conf->redis_port_range, sizeof(conf->redis_port_range), "6379");

    return 0;
}

struct shaping_maat_info* shaper_maat_init(const char *instance_name)
{    
    struct maat_options *opts;
    struct shaping_maat_info *maat_info;
    struct shaper_maat_config conf;
    int ret;

    if (shaper_maat_config_load(&conf) < 0) {
        return NULL;
    }
    
    maat_info = (struct shaping_maat_info *)calloc(1, sizeof(struct shaping_maat_info));

    opts = maat_options_new();
    maat_options_set_instance_name(opts, instance_name);
    switch (conf.input_mode) {
        case MAAT_INPUT_JSON:
            maat_options_set_json_file(opts, conf.json_cfg_file);
            break;
        case MAAT_INPUT_REDIS:
            int port_begin, port_end;
            int port;
            ret = sscanf(conf.redis_port_range, "%d-%d", &port_begin, &port_end);
            switch (ret) {
                case 1:
                    port = port_begin;
                    break;
                case 2:
                    srand(time(NULL));
                    port = port_begin + rand() % (port_end - port_begin);
                    break;
                default:
                    LOG_ERROR("%s: shaping maat invalid redis port range %s", LOG_TAG_MAAT, conf.redis_port_range);
                    goto ERROR;
            }
            maat_options_set_redis(opts, conf.redis_ip, port, conf.redis_db_idx);
            break;
        default:
            LOG_ERROR("%s: shaping maat invalid input_mode %d", LOG_TAG_MAAT, conf.input_mode);
            goto ERROR;
    }

    g_maat_instance = maat_new(opts, conf.table_info);
    maat_options_free(opts);
    if (!g_maat_instance) {
        LOG_ERROR("%s: shaping maat init by maat_new failed", LOG_TAG_MAAT);
        goto ERROR;
    }

    maat_info->rule_table_id = maat_get_table_id(g_maat_instance, "TRAFFIC_SHAPING_COMPILE");
    if (maat_info->rule_table_id < 0) {
        LOG_ERROR("%s: shaping maat register table TRAFFIC_SHAPING_COMPILE failed", LOG_TAG_MAAT);
        goto ERROR;
    }
    maat_info->profile_table_id = maat_get_table_id(g_maat_instance, "TRAFFIC_SHAPING_PROFILE");
    if (maat_info->profile_table_id < 0) {
        LOG_ERROR("%s: shaping maat register table TRAFFIC_SHAPING_PROFILE failed", LOG_TAG_MAAT);
        goto ERROR;
    }

    ret = maat_plugin_table_ex_schema_register(g_maat_instance, maat_info->rule_table_id, shaper_rule_ex_new, shaper_rule_ex_free, shaper_rule_ex_dup, 0, NULL);
    if (ret < 0) {
        LOG_ERROR("%s: shaping maat register callback funcs for table TRAFFIC_SHAPING_COMPILE failed", LOG_TAG_MAAT);
        goto ERROR;
    }

    ret = maat_plugin_table_ex_schema_register(g_maat_instance, maat_info->profile_table_id, shaper_profile_ex_new, shaper_profile_ex_free, shaper_profile_ex_dup, 0, NULL);
    if (ret < 0) {
        LOG_ERROR("%s: shaping maat register callback funcs for table TRAFFIC_SHAPING_PROFILE failed", LOG_TAG_MAAT);
        goto ERROR;
    }

    LOG_DEBUG("%s: shaping maat init complete", LOG_TAG_MAAT);

    return maat_info;
ERROR:
    shaper_maat_destroy(maat_info);
    return NULL;
}

void shaper_maat_destroy(struct shaping_maat_info *maat_info)
{
    if (maat_info) {
        free(maat_info);
    }
    if (g_maat_instance) {
        maat_free(g_maat_instance);
    }

    return;
}