summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhengchao <[email protected]>2022-04-26 16:34:54 +0800
committerzhengchao <[email protected]>2022-04-26 16:34:54 +0800
commitfd88b6a266ec38d8d5663eeb3429a36970cea099 (patch)
tree8be6d79899a6d44e687f3a827471383941fe1371
parent742512e10f57bf2df171af77ce61170702eaa489 (diff)
EX_data_rt_row2EX_data返回-1时,ip_rule和fqdn_rule内存未释放,导致内存泄漏 TSG-10475。v3.6.5
-rw-r--r--src/entry/Maat_command.cpp4959
-rw-r--r--src/entry/Maat_rule.cpp2
-rw-r--r--src/entry/Maat_table_runtime.cpp24
3 files changed, 2501 insertions, 2484 deletions
diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp
index 16c777d..4ca6baa 100644
--- a/src/entry/Maat_command.cpp
+++ b/src/entry/Maat_command.cpp
@@ -1,2479 +1,2480 @@
-#include "Maat_command.h"
-#include "Maat_rule.h"
-#include "Maat_rule_internal.h"
-#include "Maat_utils.h"
-#include "config_monitor.h"
-#include "map_str2int.h"
-#include "hiredis.h"
-#include <MESA/MESA_handle_logger.h>
-#include <errno.h>
-#include <pthread.h>
-#include <assert.h>
-#include <unistd.h>
-
-#define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR"))
-#define maat_command (module_name_str("MAAT_COMMAND"))
-const time_t MAAT_REDIS_RECONNECT_INTERVAL=5;
-const char* mr_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
-const char* mr_status_sset="MAAT_UPDATE_STATUS";
-const char* mr_expire_sset="MAAT_EXPIRE_TIMER";
-const char* mr_label_sset="MAAT_LABEL_INDEX";
-const char* mr_version_sset="MAAT_VERSION_TIMER";
-const char* mr_expire_lock="EXPIRE_OP_LOCK";
-const long mr_expire_lock_timeout=300*1000;
-const static int MAAT_REDIS_SYNC_TIME=30*60;
-const char* mr_op_str[]={"DEL","ADD","RENEW_TIMEOUT"};
-const char* foreign_source_prefix="redis://";
-const char* foreign_key_prefix="__FILE_";
-
-
-
-int _wrap_redisGetReply(redisContext *c, redisReply **reply)
-{
- return redisGetReply(c, (void **)reply);
-}
-redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
-{
- va_list ap;
- void *reply = NULL;
- int ret=REDIS_ERR, retry=0;
- while(reply==NULL&&retry<2&&ret!=REDIS_OK)
- {
- va_start(ap,format);
- reply = redisvCommand(c,format,ap);
- va_end(ap);
- if(reply==NULL)
- {
- ret=redisReconnect(c);
- retry++;
- }
- }
- return (redisReply *)reply;
-}
-redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger)
-{
- struct timeval connect_timeout;
- connect_timeout.tv_sec=0;
- connect_timeout.tv_usec=100*1000; // 100 ms
- redisReply* reply=NULL;
-
- redisContext * ctx;
- ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout);
- if(ctx==NULL||ctx->err)
- {
- if(logger==NULL)
- {
- printf("Unable to connect redis server %s:%d db%d, error: %s\n",
- redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr);
-
- }
- else
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Unable to connect redis server %s:%d db%d, error: %s",
- redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr);
- }
- if(ctx!=NULL) redisFree(ctx);
- return NULL;
- }
- redisEnableKeepAlive(ctx);
- reply=_wrap_redisCommand(ctx, "select %d",redis_db);
- freeReplyObject(reply);
- reply=NULL;
-
- return ctx;
-
-}
-
-int connect_redis_for_write(struct source_redis_ctx* mr_ctx, void* logger)
-{
- assert(mr_ctx->write_ctx==NULL);
- mr_ctx->write_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, logger);
- if(mr_ctx->write_ctx==NULL)
- {
- return -1;
- }
- else
- {
- return 0;
- }
-}
-redisContext* get_redis_ctx_for_write(struct _Maat_feather_t * feather)
-{
- int ret=0;
- if(feather->mr_ctx.write_ctx==NULL)
- {
- ret=connect_redis_for_write(&(feather->mr_ctx), feather->logger);
- if(ret!=0)
- {
- return NULL;
- }
- }
- return feather->mr_ctx.write_ctx;
-}
-long long read_redis_integer(const redisReply* reply)
-{
- switch(reply->type)
- {
- case REDIS_REPLY_INTEGER:
- return reply->integer;
- break;
- case REDIS_REPLY_ARRAY:
- assert(reply->element[0]->type==REDIS_REPLY_INTEGER);
- return reply->element[0]->integer;
- break;
- case REDIS_REPLY_STRING:
- return atoll(reply->str);
- break;
- default:
- return -1;
- break;
- }
- return 0;
-}
-long long redis_server_time(redisContext* ctx)
-{
- long long server_time=0;
- redisReply* data_reply=NULL;
- data_reply=_wrap_redisCommand(ctx,"TIME");
- if(data_reply->type==REDIS_REPLY_ARRAY)
- {
- server_time=atoll(data_reply->element[0]->str);
- freeReplyObject(data_reply);
- data_reply=NULL;
- }
- return server_time;
-}
-enum MAAT_TABLE_TYPE type_region2table(const struct Maat_region_t* p)
-{
- enum MAAT_TABLE_TYPE ret=TABLE_TYPE_IP;
- switch(p->region_type)
- {
- case REGION_IP:
- ret=TABLE_TYPE_IP;
- break;
- case REGION_EXPR:
- if(p->expr_rule.district==NULL)
- {
- ret=TABLE_TYPE_EXPR;
- }
- else
- {
- ret=TABLE_TYPE_EXPR_PLUS;
- }
- break;
- case REGION_INTERVAL:
- if(p->interval_rule.district==NULL)
- {
- ret=TABLE_TYPE_INTERVAL;
- }
- else
- {
- ret=TABLE_TYPE_INTERVAL_PLUS;
- }
- break;
- case REGION_DIGEST:
- ret=TABLE_TYPE_DIGEST;
- break;
- case REGION_SIMILARITY:
- ret=TABLE_TYPE_SIMILARITY;
- break;
- default:
- assert(0);
- }
- return ret;
-}
-int get_valid_flag_offset(const char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq)
-{
- size_t offset=0, len=0;
- unsigned int column_seq=0, ret=0;
- switch(type)
- {
- case TABLE_TYPE_EXPR:
- column_seq=7;
- break;
- case TABLE_TYPE_IP:
- column_seq=14;
- break;
- case TABLE_TYPE_IP_PLUS:
- column_seq=18;
- break;
- case TABLE_TYPE_COMPILE:
- column_seq=8;
- break;
- case TABLE_TYPE_PLUGIN:
- case TABLE_TYPE_IP_PLUGIN:
- case TABLE_TYPE_FQDN_PLUGIN:
- if(valid_column_seq<0)
- {
- return -1;
- }
- column_seq=(unsigned int)valid_column_seq;
- break;
- case TABLE_TYPE_INTERVAL:
- column_seq=5;
- break;
- case TABLE_TYPE_INTERVAL_PLUS:
- column_seq=6;
- break;
- case TABLE_TYPE_DIGEST:
- column_seq=6;
- break;
- case TABLE_TYPE_SIMILARITY:
- column_seq=5;
- break;
- case TABLE_TYPE_EXPR_PLUS:
- column_seq=8;
- break;
- case TABLE_TYPE_GROUP2COMPILE:
- case TABLE_TYPE_GROUP2GROUP:
- column_seq=3;
- break;
- default:
- assert(0);
- }
-
- ret=get_column_pos(line, column_seq, &offset, &len);
- if(ret<0||offset>=strlen(line)||(line[offset]!='1'&&line[offset]!='0'))// 0 is also a valid value for some non-MAAT producer.
- {
- return -1;
- }
- return offset;
-}
-int invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq)
-{
- int i=0;
- i=get_valid_flag_offset(line, type,valid_column_seq);
- if(i<0)
- {
- return -1;
- }
- line[i]='0';
- return 0;
-}
-
-void serialize_group2group(enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g, char* buff, size_t sz)
-{
- snprintf(buff, sz, "%d\t%d\t%d", g2g->group_id,
- g2g->superior_group_id,
- op);
- return;
-}
-void serialize_group2compile(enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c, char* buff, size_t sz)
-{
- snprintf(buff, sz, "%d\t%d\t%d\t%d\t%s\t%d", g2c->group_id,
- g2c->compile_id,
- op,
- g2c->not_flag,
- g2c->virtual_table_name?g2c->virtual_table_name:"null",
- g2c->clause_index);
- return;
-}
-void serialize_compile(const struct Maat_rule_t* p_m_rule, const char* huge_service_defined, int clause_num, enum MAAT_OPERATION op, char* buff, size_t sz)
-{
- if(op==MAAT_OP_RENEW_TIMEOUT) op=MAAT_OP_ADD;
- const char* service_define=huge_service_defined?huge_service_defined:(strlen(p_m_rule->service_defined)?p_m_rule->service_defined:"null");
-
- snprintf(buff, sz, "%d\t%d\t%hhu\t%hhu\t%hhu\t0\t%s\t%d\t%d",
- p_m_rule->config_id,
- p_m_rule->service_id,
- p_m_rule->action,
- p_m_rule->do_blacklist,
- p_m_rule->do_log,
- service_define,
- op,
- clause_num);
- return;
-}
-void serialize_region(const struct Maat_cmd_region* p, int group_id, char* buff, size_t sz)
-{
- UNUSED size_t ret=0;
- switch(p->region_type)
- {
- case REGION_IP:
- ret=snprintf(buff, sz, "%d\t%d\t%d\t%s\t%s\t%hu\t%hu\t%s\t%s\t%hu\t%hu\t%d\t%d\t1",
- p->region_id,
- group_id,
- p->ip_rule.addr_type,
- p->ip_rule.src_ip,
- p->ip_rule.mask_src_ip,
- p->ip_rule.src_port,
- p->ip_rule.mask_src_port,
- p->ip_rule.dst_ip,
- p->ip_rule.mask_dst_ip,
- p->ip_rule.dst_port,
- p->ip_rule.mask_dst_port,
- p->ip_rule.protocol,
- p->ip_rule.direction);
- break;
- case REGION_IP_PLUS:
- ret=snprintf(buff, sz, "%d\t%d\t%d\t%s\t%s\t%s\t%s\t%hu\t%hu\t%s\t%s\t%s\t%s\t%hu\t%hu\t%d\t%d\t1",
- p->region_id,
- group_id,
- p->ip_plus_rule.addr_type,
- p->ip_plus_rule.saddr_format,
- p->ip_plus_rule.src_ip1,
- p->ip_plus_rule.src_ip2,
- p->ip_plus_rule.sport_format,
- p->ip_plus_rule.src_port1,
- p->ip_plus_rule.src_port2,
- p->ip_plus_rule.daddr_format,
- p->ip_plus_rule.dst_ip1,
- p->ip_plus_rule.dst_ip2,
- p->ip_plus_rule.dport_format,
- p->ip_plus_rule.dst_port1,
- p->ip_plus_rule.dst_port2,
- p->ip_plus_rule.protocol,
- p->ip_plus_rule.direction);
- break;
- case REGION_EXPR:
- if(p->expr_rule.district==NULL)
- {
- ret=snprintf(buff,sz,"%d\t%d\t%s\t%d\t%d\t%d\t1",
- p->region_id,
- group_id,
- p->expr_rule.keywords,
- p->expr_rule.expr_type,
- p->expr_rule.match_method,
- p->expr_rule.hex_bin);
- }
- else //expr_plus
- {
- ret=snprintf(buff,sz,"%d\t%d\t%s\t%s\t%d\t%d\t%d\t1",
- p->region_id,
- group_id,
- p->expr_rule.district,
- p->expr_rule.keywords,
- p->expr_rule.expr_type,
- p->expr_rule.match_method,
- p->expr_rule.hex_bin);
- }
- break;
- case REGION_INTERVAL:
- ret=snprintf(buff,sz,"%d\t%d\t%u\t%u\t1",
- p->region_id,
- group_id,
- p->interval_rule.low_boundary,
- p->interval_rule.up_boundary);
- break;
- case REGION_DIGEST:
- ret=snprintf(buff,sz,"%d\t%d\t%llu\t%s\t%hd\t1",
- p->region_id,
- group_id,
- p->digest_rule.orgin_len,
- p->digest_rule.digest_string,
- p->digest_rule.confidence_degree);
- break;
- case REGION_SIMILARITY:
- ret=snprintf(buff,sz,"%d\t%d\t%s\t%hd\t1",
- p->region_id,
- group_id,
- p->similarity_rule.target,
- p->similarity_rule.threshold);
- break;
- default:
- assert(0);
- }
- assert(ret<sz);
- return;
-}
-void empty_serial_rules(struct serial_rule_t* rule)
-{
- if(rule->table_line!=NULL)
- {
- free(rule->table_line);
- }
- if(rule->n_foreign>0)
- {
- for(int i=0; i<rule->n_foreign; i++)
- {
- free(rule->f_keys[i].filename);
- free(rule->f_keys[i].key);
- }
- free(rule->f_keys);
- }
- memset(rule,0,sizeof(struct serial_rule_t));
- return;
-}
-void set_serial_rule(struct serial_rule_t* rule, enum MAAT_OPERATION op, unsigned long rule_id,int label_id,const char* table_name,const char* line, long long timeout)
-{
- memset(rule, 0, sizeof(struct serial_rule_t));
- rule->op=op;
- rule->rule_id=rule_id;
- rule->label_id=label_id;
- rule->timeout=timeout;
- assert(strlen(table_name)<sizeof(rule->table_name));
- strncpy(rule->table_name, table_name, sizeof(rule->table_name));
- if(line!=NULL)
- {
- rule->table_line=_maat_strdup(line);
- }
- return;
-}
-int get_inc_key_list(long long instance_version, long long target_version, redisContext *c, struct serial_rule_t** list,void* logger)
-{
- redisReply* reply=NULL,*tmp_reply=NULL;
- char err_buff[256], op_str[4];
- int rule_num=0;
- UNUSED int ret=0;
- unsigned int i=0, j=0;
- long long nearest_rule_version;
- struct serial_rule_t *s_rule=NULL;
-
- //Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version.
- //The elements are considered to be ordered from low to high scores(instance_version).
- reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",mr_status_sset,instance_version,target_version);
-
- if(reply==NULL)
- {
- __redis_strerror_r(errno,err_buff,sizeof(err_buff)-1);
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "GET %s failed %s.",mr_status_sset,err_buff);
- return -1;
- }
- assert(reply->type==REDIS_REPLY_ARRAY);
- rule_num=reply->elements;
- if(reply->elements==0)
- {
- freeReplyObject(reply);
- reply=NULL;
- return 0;
- }
-
- tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",mr_status_sset,reply->element[0]->str);
- if(tmp_reply->type!=REDIS_REPLY_STRING)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
- "ZSCORE %s %s failed Version: %lld->%lld",mr_status_sset,reply->element[0]->str,instance_version, target_version);
- freeReplyObject(tmp_reply);
- tmp_reply=NULL;
- freeReplyObject(reply);
- reply=NULL;
- return -1;
- }
- nearest_rule_version=read_redis_integer(tmp_reply);
- freeReplyObject(tmp_reply);
- tmp_reply=NULL;
- if(nearest_rule_version<0)
- {
- return -1;
- }
- if(nearest_rule_version!=instance_version+1)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
- "Noncontinuous VERSION Redis: %lld MAAT: %lld.",nearest_rule_version,instance_version);
- }
- s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t));
- for(i=0, j=0;i<reply->elements;i++)
- {
- assert(reply->element[i]->type==REDIS_REPLY_STRING);
- ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%lu",op_str,s_rule[j].table_name,&(s_rule[j].rule_id));
- if(ret!=3||s_rule[i].rule_id<0)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Invalid Redis Key: %s",reply->element[i]->str);
- continue;
- }
- if(strncmp(op_str,"ADD",strlen("ADD"))==0)
- {
- s_rule[j].op=MAAT_OP_ADD;
- }
- else if(strncmp(op_str,"DEL",strlen("DEL"))==0)
- {
- s_rule[j].op=MAAT_OP_DEL;
- }
- else
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Invalid Redis Key: %s",reply->element[i]->str);
- continue;
- }
- j++;
- }
- rule_num=j;
- *list=s_rule;
- freeReplyObject(reply);
- reply=NULL;
-
- return rule_num;
-}
-struct s_rule_array_t
-{
- int cnt;
- int size;
- struct serial_rule_t* array;
-};
-void save_serial_rule_cb(const uchar * key, uint size, void * data, void * user)
-{
- struct s_rule_array_t* array=(struct s_rule_array_t*)user;
- int i=array->cnt;
- memcpy(&(array->array[i]),data,sizeof(struct serial_rule_t));
- array->array[i].op=MAAT_OP_ADD;
- return;
-}
-int recovery_history_version(const struct serial_rule_t* current, int current_num, const struct serial_rule_t* changed, int changed_num, struct serial_rule_t** history_result)
-{
- int i=0,ret=0;
- unsigned int history_num=0;
- int hash_slot_size=1;
- MESA_htable_handle htable=NULL;
- MESA_htable_create_args_t hargs;
- struct s_rule_array_t tmp_array;
- char hkey[256+20];
- int tmp=current_num+changed_num;
- for(;tmp>0;tmp=tmp/2)
- {
- hash_slot_size*=2;
- }
-
- memset(&hargs,0,sizeof(hargs));
- hargs.thread_safe=0;
- hargs.hash_slot_size = hash_slot_size;
- hargs.max_elem_num = 0;
- hargs.eliminate_type = HASH_ELIMINATE_ALGO_FIFO;
- hargs.expire_time = 0;
- hargs.key_comp = NULL;
- hargs.key2index = NULL;
- hargs.recursive = 1;
- hargs.data_free = NULL;//data is an reference, no need to free.
- hargs.data_expire_with_condition = NULL;
- htable=MESA_htable_create(&hargs, sizeof(hargs));
- MESA_htable_print_crtl(htable, 0);
-
- for(i=0;i<current_num;i++)
- {
- snprintf(hkey,sizeof(hkey),"%ld,%s",current[i].rule_id,current[i].table_name);
- ret=MESA_htable_add(htable, (uchar*)hkey, strlen(hkey), current+i);
- assert(ret>0);
- }
-
- for(i=changed_num-1;i>=0;i--)
- {
- snprintf(hkey,sizeof(hkey),"%ld,%s",changed[i].rule_id,changed[i].table_name);
- if(changed[i].op==MAAT_OP_ADD)//newly added rule is need to delete from current, so that history version can be recovered.
- {
- ret=MESA_htable_del(htable, (uchar*)hkey, strlen(hkey), NULL);
- }
- else
- {
- ret=MESA_htable_add(htable, (uchar*)hkey, strlen(hkey),changed+i);
- }
- if(ret<0)//failed
- {
- goto error_out;
- }
- }
- history_num=MESA_htable_get_elem_num(htable);
- tmp_array.cnt=0;
- tmp_array.size=history_num;
- tmp_array.array=(struct serial_rule_t*)calloc(history_num,sizeof(struct serial_rule_t));
- MESA_htable_iterate(htable, save_serial_rule_cb, &tmp_array);
- *history_result=tmp_array.array;
- ret=history_num;
-error_out:
- MESA_htable_destroy(htable, NULL);
- return ret;
-}
-
-int get_rm_key_list(redisContext *c, long long instance_version, long long desired_version, long long* new_version, struct Maat_table_manager* table_mgr, struct serial_rule_t** list,int *update_type, void* logger, int cumulative_off)
-{
- redisReply* reply=NULL,*sub_reply=NULL;
- char err_buff[256];
- long long redis_version=0,target_version=0;
- int rule_num=0, changed_rule_num=0, table_id=0;
- int ret=0;
- unsigned int i=0,full_idx =0,append_cmd_cnt=0;
- struct serial_rule_t *s_rule_array=NULL, *changed_rule_array=NULL, *history_rule_array=NULL;
-
- reply=(redisReply*)redisCommand(c, "GET MAAT_VERSION");
- if(reply!=NULL)
- {
-
- if(reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"GET MAAT_VERSION failed, maybe Redis is busy.");
- freeReplyObject(reply);
- reply=NULL;
- return -1;
- }
- }
- else
- {
- memset(err_buff, 0, sizeof(err_buff));
- __redis_strerror_r(errno, err_buff, sizeof(err_buff)-1);
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "GET MAAT_VERSION failed %s.",err_buff);
- return -1;
- }
- redis_version=read_redis_integer(reply);
- if(redis_version<0)
- {
- if(reply->type==REDIS_REPLY_ERROR)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Redis Communication error: %s.",reply->str);
- }
- return -1;
- }
- freeReplyObject(reply);
- reply=NULL;
- if(redis_version==instance_version)
- {
- return 0;
- }
-
- if(instance_version==0||desired_version!=0)
- {
- goto FULL_UPDATE;
- }
- if(redis_version<instance_version)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
- "VERSION roll back MAAT: %lld -> Redis: %lld.",instance_version,redis_version);
- goto FULL_UPDATE;
- }
- if(redis_version>instance_version&&cumulative_off==1)
- {
- target_version=instance_version;
- }
- else
- {
- target_version=redis_version-1;
- }
- do{
- target_version++;
- rule_num=get_inc_key_list(instance_version, target_version, c, &s_rule_array,logger);
- if(rule_num>0)
- {
- break;
- }
- else if(rule_num<0)
- {
- goto FULL_UPDATE;
- }
- else
- {
- //ret=0, nothing to do.
- }
-
- }while(rule_num==0&&target_version<=redis_version&&cumulative_off==1);
- if(rule_num==0)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s"
- ,mr_status_sset,instance_version,target_version-1,cumulative_off==1?"OFF":"ON");
- return 0;
- }
- MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
- "Inc Update from instance_version %lld to %lld (%d entries).",instance_version,target_version,rule_num);
- *list=s_rule_array;
- *update_type=CM_UPDATE_TYPE_INC;
- *new_version=target_version;
- return rule_num;
-
-FULL_UPDATE:
- MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
- "Initiate full udpate from instance_version %d to %lld.",instance_version,desired_version==0?redis_version:desired_version);
- append_cmd_cnt=0;
- ret=redisAppendCommand(c, "MULTI");
- append_cmd_cnt++;
- ret=redisAppendCommand(c, "GET MAAT_VERSION");
- append_cmd_cnt++;
- ret=redisAppendCommand(c, "KEYS EFFECTIVE_RULE:*");
- append_cmd_cnt++;
- //consume reply "OK" and "QUEUED".
- for(i=0;i<append_cmd_cnt;i++)
- {
- _wrap_redisGetReply(c, &reply);
- freeReplyObject(reply);
- reply=NULL;
- }
- reply=_wrap_redisCommand(c,"EXEC");
- if(reply==NULL)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Redis Communication error: %s.",c->errstr);
- return -1;
- }
- if(reply->type!=REDIS_REPLY_ARRAY)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Invalid Redis Key List type %d", reply->type);
- freeReplyObject(reply);
- reply=NULL;
- return -1;
- }
- *new_version=read_redis_integer(reply->element[0]);
- sub_reply=reply->element[1];
- if(sub_reply->type!=REDIS_REPLY_ARRAY)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Invalid Redis Key List type %d", sub_reply->type);
- freeReplyObject(reply);
- reply=NULL;
- return -1;
- }
-
- s_rule_array=(struct serial_rule_t*)calloc(sub_reply->elements,sizeof(struct serial_rule_t));
- for(i=0, full_idx=0; i<sub_reply->elements; i++)
- {
- if(sub_reply->element[i]->type!=REDIS_REPLY_STRING)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Invalid Redis Key Type: %d", sub_reply->element[i]->type);
- continue;
- }
- ret=sscanf(sub_reply->element[i]->str,"%*[^:]:%[^,],%ld",s_rule_array[full_idx].table_name,&(s_rule_array[full_idx].rule_id));
- s_rule_array[full_idx].op=MAAT_OP_ADD;
- if(ret!=2||s_rule_array[full_idx].rule_id<0||strlen(s_rule_array[full_idx].table_name)==0)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Invalid Redis Key Format: %s", sub_reply->element[i]->str);
- continue;
- }
- if(table_mgr)
- {
- table_id=Maat_table_get_id_by_name(table_mgr, s_rule_array[full_idx].table_name);
- if(table_id<0)//Unrecognized table.
- {
- continue;
- }
- }
- full_idx++;
- }
- rule_num=full_idx;
- freeReplyObject(reply);
- reply=NULL;
- if(desired_version!=0)
- {
- changed_rule_num=get_inc_key_list(desired_version, redis_version, c, &changed_rule_array, logger);
- if(changed_rule_num<0)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Recover history version %lld faild where as redis version is %lld.", desired_version, redis_version);
- }
- else if(changed_rule_num==0)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Nothing to recover from history version %lld to redis version is %lld.", desired_version, redis_version);
- }
- else
- {
- ret=recovery_history_version(s_rule_array, full_idx, changed_rule_array, changed_rule_num, &history_rule_array);
- if(ret>0)
- {
- free(s_rule_array);
- s_rule_array=history_rule_array;
- rule_num=ret;
- *new_version=desired_version;
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Successfully recovered from history version %lld to redis version is %lld.", desired_version, redis_version);
- }
- }
- free(changed_rule_array);
- }
- *list=s_rule_array;
- *update_type=CM_UPDATE_TYPE_FULL;
- MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
- "Full update %d keys of version %lld.", rule_num, *new_version);
-
- return rule_num ;
-}
-
-int _get_maat_redis_value(redisContext *c, struct serial_rule_t* rule_list, int rule_num, void* logger)
-{
- int i=0,failed_cnt=0,idx=0;
- UNUSED int ret=0;
- int error_happened=0;
- int *retry_ids=(int*)malloc(sizeof(int)*rule_num);
- char redis_cmd[256];
- redisReply* reply=NULL;
- for(i=0;i<rule_num;i++)
- {
- snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%ld",mr_key_prefix[rule_list[i].op]
- ,rule_list[i].table_name
- ,rule_list[i].rule_id);
- ret=redisAppendCommand(c, redis_cmd);
- assert(ret==REDIS_OK);
- }
- for(i=0;i<rule_num;i++)
- {
- ret=_wrap_redisGetReply(c,&reply);
- if(ret==REDIS_ERR)
- {
- MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,
- "Redis GET %s:%s,%d failed, redis server error.", mr_key_prefix[rule_list[i].op],
- rule_list[i].table_name,
- rule_list[i].rule_id);
- error_happened=1;
- break;
- }
- if(reply->type==REDIS_REPLY_STRING)
- {
- rule_list[i].table_line=_maat_strdup(reply->str);
- }
- else
- {
- if(reply->type==REDIS_REPLY_NIL)
- {
- retry_ids[failed_cnt]=i;
- failed_cnt++;
- }
- else
- {
- MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
- ,"Redis GET %s:%s,%d failed",mr_key_prefix[rule_list[i].op]
- ,rule_list[i].table_name
- ,rule_list[i].rule_id);
- error_happened=1;
- }
- }
- freeReplyObject(reply);
- reply=NULL;
- }
- if(error_happened==1)
- {
- free(retry_ids);
- return -1;
- }
-
- for(i=0;i<failed_cnt;i++)
- {
- idx=retry_ids[i];
- snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%ld",mr_key_prefix[MAAT_OP_DEL]
- ,rule_list[idx].table_name
- ,rule_list[idx].rule_id);
- ret=redisAppendCommand(c, redis_cmd);
- }
- for(i=0;i<failed_cnt;i++)
- {
- idx=retry_ids[i];
- ret=_wrap_redisGetReply(c,&reply);
- if(ret==REDIS_ERR)
- {
- MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor
- ,"redis command %s failed, redis server error.", redis_cmd);
- free(retry_ids);
- return -1;
- }
- if(reply->type==REDIS_REPLY_STRING)
- {
- rule_list[idx].table_line=_maat_strdup(reply->str);
- }
- else if(reply->type==REDIS_REPLY_ERROR)//Deal with Redis response: "Loading Redis is loading the database in memory"
- {
- MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,
- "redis command %s error, reply type=%d, error str=%s", redis_cmd, reply->type, reply->str);
- }
- else //Handle type "nil"
- {
- MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,
- "redis command %s failed, reply type=%d", redis_cmd, reply->type);
- }
-
- freeReplyObject(reply);
- reply=NULL;
-
- }
- free(retry_ids);
- return 0;
-}
-int get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger,int print_process)
-{
- int max_redis_batch=4*1024,batch_cnt=0;
- int success_cnt=0,ret=0;
- int next_print=10;
- while(success_cnt<rule_num)
- {
- batch_cnt=MIN(rule_num-success_cnt,max_redis_batch);
- ret=_get_maat_redis_value(c,rule_list+success_cnt,batch_cnt,logger);
- if(ret<0)
- {
- return -1;
- }
- else
- {
- success_cnt+=batch_cnt;
- }
- if(print_process==1)
- {
- if((success_cnt*100)/rule_num>next_print)
- {
- printf(" >%d%%",next_print);
- next_print+=10;
- }
- }
- }
- if(print_process==1)
- {
- printf(" >100%%\n");
- }
- return 0;
-}
-
-int mr_transaction_success(redisReply* data_reply)
-{
- if(data_reply->type==REDIS_REPLY_NIL)
- {
- return 0;
- }
- else
- {
- return 1;
- }
-}
-
-int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire)
-{
- redisReply* reply=NULL;
- int ret=0;
- reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire);
- if(reply->type==REDIS_REPLY_NIL)
- {
- ret=0;
- }
- else
- {
- ret=1;
- }
- freeReplyObject(reply);
- reply=NULL;
-
- return ret;
-}
-void redlock_unlock(redisContext * ctx, const char * lock_name)
-{
- redisReply* reply=NULL;
- reply=_wrap_redisCommand(ctx,"DEL %s", lock_name);
- freeReplyObject(reply);
- reply=NULL;
-
-}
-#define POSSIBLE_REDIS_REPLY_SIZE 2
-struct expected_reply
-{
- int srule_seq;
- int possible_reply_num;
- redisReply possible_replies[POSSIBLE_REDIS_REPLY_SIZE];
-};
-void expected_reply_add(struct expected_reply* expected, int srule_seq, int type, long long integer)
-{
- int i=expected->possible_reply_num;
- assert(i<POSSIBLE_REDIS_REPLY_SIZE);
- expected->srule_seq=srule_seq;
- expected->possible_replies[i].type=type;
- expected->possible_replies[i].integer=integer;
- expected->possible_reply_num++;
-}
-int mr_operation_success(redisReply* actual_reply, struct expected_reply* expected)
-{
- int i=0;
- if(expected->possible_replies[0].type!=actual_reply->type)
- {
- return 0;
- }
- for(i=0; i< expected->possible_reply_num; i++)
- {
- if(expected->possible_replies[i].type==REDIS_REPLY_INTEGER &&
- expected->possible_replies[i].type==actual_reply->type &&
- expected->possible_replies[i].integer==actual_reply->integer)
- {
- return 1;
- }
- if(expected->possible_replies[i].type==REDIS_REPLY_STATUS &&
- expected->possible_replies[i].type==actual_reply->type &&
- 0==strcasecmp(actual_reply->str, "OK"))
- {
- return 1;
- }
- }
- return 0;
-
-}
-
-long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *transaction_version)
-{
- int ret=-1;
- redisReply* data_reply=NULL;
- if(renew_rule_num>0)
- {
- while(0==redlock_try_lock(ctx, mr_expire_lock, mr_expire_lock_timeout))
- {
- usleep(1000);
- }
- *renew_allowed=1;
- }
- if(rule_num>renew_rule_num)
- {
- data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1");
- *transaction_version=read_redis_integer(data_reply);
- freeReplyObject(data_reply);
- data_reply=NULL;
- if(*transaction_version<0)
- {
- return -1;
- }
- }
- if(*renew_allowed==1||rule_num>renew_rule_num)
- {
- data_reply=_wrap_redisCommand(ctx,"MULTI");
- freeReplyObject(data_reply);
- data_reply=NULL;
- ret=0;
- }
- return ret;
-}
-//parameters: 4 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER MAAT_TRANSACTION_xx, 1 args: SERVER_TIME
-const char* lua_exec_done=
-"local maat_version=redis.call(\'incrby\', KEYS[1], 1);"
-"local transaction=redis.call(\'lrange\', KEYS[4], 0, -1);"
-"for k,v in pairs(transaction) do"
-" redis.call(\'zadd\', KEYS[2], maat_version, v);"
-"end;"
-"redis.call(\'del\', KEYS[4]);"
-"redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);"
-"return maat_version;";
-redisReply* _exec_serial_rule_end(redisContext* ctx, const char* transaction_list, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt)
-{
- redisReply* data_reply=NULL;
- if(renew_allowed==1)
- {
- redlock_unlock(ctx, mr_expire_lock);
- expect_reply[*cnt].srule_seq=-1;
- (*cnt)++;
- }
- if(strlen(transaction_list)>0)
- {
- data_reply=_wrap_redisCommand(ctx, "eval %s 4 MAAT_VERSION %s %s %s %lld",
- lua_exec_done,
- mr_status_sset,
- mr_version_sset,
- transaction_list,
- server_time);
- freeReplyObject(data_reply);
- data_reply=NULL;
- expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
- (*cnt)++;
- }
- data_reply=_wrap_redisCommand(ctx,"EXEC");
- return data_reply;
-}
-
-void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed)
-{
- redisReply* data_reply=NULL;
- unsigned int append_cmd_cnt=0, i=0;
- for(i=0;i<rule_num;i++)
- {
- switch(s_rule[i].op)
- {
- case MAAT_OP_ADD:
- redisAppendCommand(ctx,"SET %s:%s,%lu %s",
- mr_key_prefix[MAAT_OP_ADD],
- s_rule[i].table_name,
- s_rule[i].rule_id,
- s_rule[i].table_line);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0);
- (*cnt)++;
- append_cmd_cnt++;
- //Allowing add duplicated members for rule id recycling.
- redisAppendCommand(ctx,"RPUSH %s ADD,%s,%lu",
- transaction_list,
- s_rule[i].table_name,
- s_rule[i].rule_id);
- expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
- (*cnt)++;
- append_cmd_cnt++;
- if(s_rule[i].timeout>0)
- {
- redisAppendCommand(ctx,"ZADD %s %lld %s,%lu",
- mr_expire_sset,
- s_rule[i].timeout,
- s_rule[i].table_name,
- s_rule[i].rule_id);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
- (*cnt)++;
- append_cmd_cnt++;
- }
- if(s_rule[i].label_id>0)
- {
- redisAppendCommand(ctx,"ZADD %s %d %s,%lu",
- mr_label_sset,
- s_rule[i].label_id,
- s_rule[i].table_name,
- s_rule[i].rule_id);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
-
- (*cnt)++;
-
- append_cmd_cnt++;
- }
- break;
- case MAAT_OP_DEL:
- redisAppendCommand(ctx,"RENAME %s:%s,%lu %s:%s,%lu",
- mr_key_prefix[MAAT_OP_ADD],
- s_rule[i].table_name,
- s_rule[i].rule_id,
- mr_key_prefix[MAAT_OP_DEL],
- s_rule[i].table_name,
- s_rule[i].rule_id
- );
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0);
- (*cnt)++;
- append_cmd_cnt++;
-
- redisAppendCommand(ctx,"EXPIRE %s:%s,%lu %d",
- mr_key_prefix[MAAT_OP_DEL],
- s_rule[i].table_name,
- s_rule[i].rule_id,
- MAAT_REDIS_SYNC_TIME);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
- (*cnt)++;
- append_cmd_cnt++;
-
- //NX: Don't update already exisiting elements. Always add new elements.
- redisAppendCommand(ctx,"RPUSH %s DEL,%s,%lu",
- transaction_list,
- s_rule[i].table_name,
- s_rule[i].rule_id);
- expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
- (*cnt)++;
- append_cmd_cnt++;
-
- // Try to remove from expiration sorted set, no matter wheather it exists or not.
- redisAppendCommand(ctx,"ZREM %s %s,%lu",
- mr_expire_sset,
- s_rule[i].table_name,
- s_rule[i].rule_id);
- expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
- (*cnt)++;
- append_cmd_cnt++;
-
- // Try to remove from label sorted set, no matter wheather it exists or not.
- redisAppendCommand(ctx,"ZREM %s %s,%lu",
- mr_label_sset,
- s_rule[i].table_name,
- s_rule[i].rule_id);
- expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
- (*cnt)++;
- append_cmd_cnt++;
- break;
- case MAAT_OP_RENEW_TIMEOUT:
- if(renew_allowed!=1)
- {
- continue;
- }
- //s_rule[i].timeout>0 was checked by caller.
- redisAppendCommand(ctx,"ZADD %s %lld %s,%lu",
- mr_expire_sset,
- s_rule[i].timeout,
- s_rule[i].table_name,
- s_rule[i].rule_id);
- expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
- (*cnt)++;
- append_cmd_cnt++;
-
- break;
- default:
- assert(0);
- break;
- }
- }
- for(i=0;i<append_cmd_cnt;i++)
- {
- _wrap_redisGetReply(ctx, &data_reply);
- freeReplyObject(data_reply);
- data_reply=NULL;
- }
- return;
-}
-
-int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned int serial_rule_num, long long server_time, void* logger)
-{
- unsigned int max_redis_batch=1*1024, batch_cnt=0;
- int renew_allowed=0,last_failed=-1;
- redisReply*transaction_reply=NULL,*p=NULL;
- unsigned int i=0, rule_seq=0;
-
- unsigned int multi_cmd_cnt=0, success_cnt=0;
- const int MAX_REDIS_OP_PER_SRULE=8;
- unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
- char transaction_list[MAX_TABLE_NAME_LEN*2]={0};
- struct expected_reply *expected_reply=(struct expected_reply*)calloc(sizeof(struct expected_reply), max_multi_cmd_num);
- long long transaction_version=0, transaction_finished_version=0;
- int renew_num=0,ret=0;
- for(i=0;i<serial_rule_num;i++)
- {
- if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT)
- {
- renew_num++;
- }
- }
-
- ret=_exec_serial_rule_begin(ctx, serial_rule_num, renew_num, &renew_allowed, &transaction_version);
- if(ret!=0)//Preconditions for transaction are not satisfied.
- {
- success_cnt=-1;
- goto error_out;
- }
- if(transaction_version>0)
- {
- snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", transaction_version);
- }
- while(success_cnt<serial_rule_num)
- {
- batch_cnt=MIN(serial_rule_num-success_cnt, max_redis_batch);
- _exec_serial_rule(ctx, transaction_list, s_rule+success_cnt, batch_cnt, expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed);
- assert(multi_cmd_cnt<max_multi_cmd_num);
- success_cnt+=batch_cnt;
- }
- transaction_reply=_exec_serial_rule_end(ctx, transaction_list, server_time, renew_allowed, expected_reply, &multi_cmd_cnt);
- if(1==mr_transaction_success(transaction_reply))
- {
- assert(transaction_reply->elements==multi_cmd_cnt);
- for(i=0;i<multi_cmd_cnt;i++)
- {
- p=transaction_reply->element[i];
- //failed is acceptable
- //or transaciton is success
- //or continuation of last failed
- if(expected_reply[i].srule_seq==-1||1==mr_operation_success(p, expected_reply+i)||last_failed==expected_reply[i].srule_seq)
- {
- continue;
- }
- rule_seq=expected_reply[i].srule_seq;
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_command,
- "%s %s %d failed, rule id maybe conflict or not exist.",
- mr_op_str[s_rule[rule_seq].op],
- s_rule[rule_seq].table_name,s_rule[rule_seq].rule_id);
- success_cnt--;
- last_failed=rule_seq;
- }
- }
- else
- {
- success_cnt=-1;
- }
- if(transaction_version>0)
- {
- transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]);
- MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_command,
- "Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld ",
- transaction_version,
- transaction_finished_version);
- }
-
- freeReplyObject(transaction_reply);
- transaction_reply=NULL;
-
-error_out:
- if(renew_num>0&&renew_allowed!=1)
- {
- for(i=0;i<(unsigned int)serial_rule_num;i++)
- {
- if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT)
- {
- MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
- ,"%s %s %d is not allowed due to lock contention.",mr_op_str[MAAT_OP_RENEW_TIMEOUT]
- , s_rule[i].table_name,s_rule[i].rule_id);
- }
- }
- if(success_cnt>0)
- {
- success_cnt-=renew_num;
- }
- }
- free(expected_reply);
- return success_cnt;
-}
-
-
-void check_maat_expiration(redisContext *ctx, void *logger)
-{
- unsigned int i=0,s_rule_num=0;
- UNUSED int ret=0;
- int success_cnt=0;
- redisReply* data_reply=NULL;
- struct serial_rule_t* s_rule=NULL;
- long long server_time=0;
-
- server_time=redis_server_time(ctx);
- if(!server_time)
- {
- return;
- }
- data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_expire_sset,server_time);
- if(data_reply->type!=REDIS_REPLY_ARRAY||data_reply->elements==0)
- {
- freeReplyObject(data_reply);
- data_reply=NULL;
- return;
- }
- s_rule_num=data_reply->elements;
- s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t),s_rule_num);
- for(i=0;i<s_rule_num;i++)
- {
- s_rule[i].op=MAAT_OP_DEL;
- ret=sscanf(data_reply->element[i]->str,"%[^,],%ld",s_rule[i].table_name,&(s_rule[i].rule_id));
- assert(ret==2);
- }
- freeReplyObject(data_reply);
- data_reply=NULL;
- success_cnt=exec_serial_rule(ctx,s_rule, s_rule_num,server_time, logger);
-
- if(success_cnt==(int)s_rule_num)
- {
- MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
- ,"Succesfully expired %d rules in Redis.", s_rule_num);
- }
- else
- {
- MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
- ,"Failed to expired %d of %d rules in Redis, try later.", s_rule_num-success_cnt,s_rule_num);
- }
-
- free(s_rule);
- return;
-}
-void cleanup_update_status(redisContext *ctx, void *logger)
-{
- redisReply* reply=NULL,*sub_reply=NULL;
- int append_cmd_cnt=0,i=0;
- long long server_time=0, version_upper_bound=0,version_lower_bound=0,version_num=0,entry_num=0;
-
- server_time=redis_server_time(ctx);
- if(!server_time)
- {
- return;
- }
- reply=_wrap_redisCommand(ctx,"MULTI");
- freeReplyObject(reply);
- reply=NULL;
- redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
- append_cmd_cnt++;
- redisAppendCommand(ctx, "ZREMRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
- append_cmd_cnt++;
- //consume reply "OK" and "QUEUED".
- for(i=0;i<append_cmd_cnt;i++)
- {
- _wrap_redisGetReply(ctx, &reply);
- freeReplyObject(reply);
- reply=NULL;
- }
- reply=_wrap_redisCommand(ctx,"EXEC");
- if(reply->type!=REDIS_REPLY_ARRAY)
- {
- goto error_out;
- }
- sub_reply=reply->element[0];
- if(sub_reply->type!=REDIS_REPLY_ARRAY)
- {
- goto error_out;
- }
- version_num=sub_reply->elements;
- if(version_num==0)
- {
- goto error_out;
- }
- version_lower_bound=read_redis_integer(sub_reply->element[0]);
- version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]);
- freeReplyObject(reply);
- reply=NULL;
-
- //To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally.
- reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",mr_status_sset,version_lower_bound,version_upper_bound);
- entry_num=read_redis_integer(reply);
- freeReplyObject(reply);
- reply=NULL;
-
- MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
- ,"Clean up update status from version %lld to %lld (%lld versions, %lld entries)."
- ,version_lower_bound
- ,version_upper_bound
- ,version_num
- ,entry_num);
- return;
-
-error_out:
- freeReplyObject(reply);
- reply=NULL;
- return;
-}
-const char* find_Nth_column(const char* line, int Nth, int* column_len)
-{
- size_t i=0;
- int j=0;
- int start=0, end=0;
- size_t line_len= strlen(line);
- for(i=0;i<line_len;i++)
- {
- if(line[i]!=' '&&line[i]!='\t')
- {
- continue;
- }
- j++;
- if(j==Nth-1)
- {
- start=i+1;
- }
- if(j==Nth)
- {
- end=i;
- break;
- }
- }
- if(start==end)
- {
- return NULL;
- }
- if(end==0)
- {
- end=i;
- }
- *column_len=end-start;
- return line+start;
-}
-char* get_foreign_cont_filename(const char* table_name, int rule_id, const char* foreign_key, const char* dir)
-{
- char* filename=NULL;
- char buffer[512];
- snprintf(buffer, sizeof(buffer),"%s/%s-%d-%s",dir, table_name, rule_id, foreign_key);
- filename=(char*)calloc(sizeof(char), strlen(buffer)+1);
- memcpy(filename, buffer, strlen(buffer));
- return filename;
-}
-void rewrite_table_line_with_foreign(struct serial_rule_t*p)
-{
- int origin_column_size=0;
- const char* origin_column=NULL, *pos_origin_line=NULL;
- char* pos_rewrite_line=NULL;
- char* rewrite_line=NULL;
- size_t fn_size=0;
- int i=0;
- for(i=0; i<p->n_foreign; i++)
- {
- fn_size+=strlen(p->f_keys[i].filename);
- }
-
- rewrite_line=(char*)calloc(sizeof(char), strlen(p->table_line)+fn_size);
- pos_origin_line=p->table_line;
- pos_rewrite_line=rewrite_line;
-
- for(i=0; i<p->n_foreign; i++)
- {
- origin_column=find_Nth_column(p->table_line, p->f_keys[i].column, &origin_column_size);
- strncat(pos_rewrite_line, pos_origin_line, origin_column-pos_origin_line);
- pos_rewrite_line+=origin_column-pos_origin_line;
- pos_origin_line=origin_column+origin_column_size;
-
- strncat(pos_rewrite_line, p->f_keys[i].filename, strlen(p->f_keys[i].filename));
- pos_rewrite_line+=strlen(p->f_keys[i].filename);
- }
- strncat(pos_rewrite_line, pos_origin_line, strlen(p->table_line)-(pos_origin_line-p->table_line));
-
- free(p->table_line);
- p->table_line=rewrite_line;
- return;
-}
-void _get_foregin_keys(struct serial_rule_t* p_rule, int* foreign_columns, int n_foreign, const char* dir, void* logger)
-{
- int i=0;
- const char* p_foreign=NULL;
- int foreign_key_size=0;
- p_rule->f_keys=ALLOC(struct foreign_key, n_foreign);
- for(i=0; i<n_foreign; i++)
- {
- p_foreign=find_Nth_column(p_rule->table_line, foreign_columns[i], &foreign_key_size);
- if(p_foreign==NULL)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Get %s,%d foreign keys failed: No %dth column.",
- p_rule->table_name, p_rule->rule_id, foreign_columns[i]);
- continue;
- }
- if(0==strncasecmp(p_foreign, "null", strlen("null")))
- {//emtpy file
- continue;
- }
- if(0!=strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix)))
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
- "Get %s,%d foreign key failed: Invalid source prefix %s.",
- p_rule->table_name, p_rule->rule_id, p_foreign);
- continue;
- }
- p_rule->f_keys[p_rule->n_foreign].column=foreign_columns[i];
- foreign_key_size=foreign_key_size-strlen(foreign_source_prefix);
- p_foreign+=strlen(foreign_source_prefix);
- if(0!=strncmp(p_foreign, foreign_key_prefix, strlen(foreign_key_prefix)))
- {
- MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
- "%s, %d foreign key prefix %s is not recommended.",
- p_rule->table_name, p_rule->rule_id, p_foreign);
- }
- p_rule->f_keys[p_rule->n_foreign].key=ALLOC(char, foreign_key_size+1);
- memcpy(p_rule->f_keys[p_rule->n_foreign].key, p_foreign, foreign_key_size);
- p_rule->f_keys[p_rule->n_foreign].filename=get_foreign_cont_filename(p_rule->table_name, p_rule->rule_id, p_rule->f_keys[p_rule->n_foreign].key, dir);
- p_rule->n_foreign++;
- }
- if(p_rule->n_foreign==0)
- {
- free(p_rule->f_keys);
- p_rule->f_keys=NULL;
- }
- return;
-}
-int get_foreign_keys_define(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, _Maat_feather_t* feather, const char* dir,void *logger)
-{
- int i=0;
- int rule_with_foreign_key=0;
- struct Maat_table_schema* p_table=NULL;
- struct plugin_table_schema* plugin_desc=NULL;
- for(i=0; i<rule_num; i++)
- {
- if(rule_list[i].table_line==NULL)
- {
- continue;
- }
- p_table=Maat_table_get_desc_by_name(feather->table_mgr, rule_list[i].table_name);
- if(!p_table||p_table->table_type!=TABLE_TYPE_PLUGIN)
- {
- continue;
- }
- plugin_desc= &(p_table->plugin);
- if(plugin_desc->n_foreign==0)
- {
- continue;
- }
- _get_foregin_keys(rule_list+i, plugin_desc->foreign_columns, plugin_desc->n_foreign, dir, logger);
- rule_with_foreign_key++;
- }
- return rule_with_foreign_key;
-}
-int get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, const char* dir,void *logger)
-{
- int i=0, j=0, foreign_key_size=0;
- int rule_with_foreign_key=0;
- const char* p_foreign=NULL;
-
- int n_foreign=0;
- int foreign_columns[MAX_FOREIGN_CLMN_NUM];
- for(i=0; i<rule_num; i++)
- {
- j=1;
- n_foreign=0;
- do{
- p_foreign=find_Nth_column(rule_list[i].table_line, j, &foreign_key_size);
- if(p_foreign!=NULL&&foreign_key_size>(int)strlen(foreign_source_prefix)&&0==strncmp(p_foreign,foreign_source_prefix, strlen(foreign_source_prefix)))
- {
- foreign_columns[n_foreign]=j;
- n_foreign++;
- }
- j++;
- }while(p_foreign!=NULL&&n_foreign<MAX_FOREIGN_CLMN_NUM);
- if(n_foreign>0)
- {
- _get_foregin_keys(rule_list+i, foreign_columns, n_foreign,dir,logger);
- rule_with_foreign_key++;
- }
- }
- return rule_with_foreign_key;
-}
-
-struct foreign_conts_track
-{
- int rule_idx;
- int foreign_idx;
-};
-void _get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger)
-{
- int i=0, j=0;
- UNUSED int ret=0;
- int key_num=0;
- struct foreign_conts_track* track=ALLOC(struct foreign_conts_track, rule_num*MAX_FOREIGN_CLMN_NUM);
- char redis_cmd[256];
- redisReply* reply=NULL;
- struct serial_rule_t*p=NULL;
- FILE* fp=NULL;
- struct stat file_info;
-
- for(i=0;i<rule_num;i++)
- {
- p=rule_list+i;
- if(p->n_foreign==0)
- {
- continue;
- }
- if(p->op==MAAT_OP_DEL)
- {
- for(j=0; j<rule_list[i].n_foreign; j++)
- {
- if(rule_list[i].f_keys[j].filename==NULL)
- {
- continue;
- }
- //ret=system_cmd_rm(rule_list[i].f_keys[j].filename);
- ret=remove(rule_list[i].f_keys[j].filename);
- if(ret==-1)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_module,
- "Foreign content file %s remove failed.",
- rule_list[i].f_keys[j].filename);
- }
- }
- }
- else
- {
- for(j=0; j<p->n_foreign; j++)
- {
- if(rule_list[i].f_keys[j].filename==NULL)
- {
- continue;
- }
- ret=stat(p->f_keys[j].filename, &file_info);
- if(ret==0)
- {
- continue;
- }
- snprintf(redis_cmd,sizeof(redis_cmd),"GET %s", p->f_keys[j].key);
- ret=redisAppendCommand(ctx, redis_cmd);
- track[key_num].rule_idx=i;
- track[key_num].foreign_idx=j;
- key_num++;
- assert(ret==REDIS_OK);
- }
- }
- }
- for(i=0;i<key_num;i++)
- {
- ret=_wrap_redisGetReply(ctx,&reply);
- if(ret==REDIS_ERR)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor
- ,"Get %s,%d foreign key %s content failed, redis server error."
- ,rule_list[track[i].rule_idx].table_name
- ,rule_list[track[i].rule_idx].rule_id
- ,rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
- break;
- }
-
- if(reply->type!=REDIS_REPLY_STRING)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor
- ,"Get %s,%d foreign key %s content failed."
- ,rule_list[track[i].rule_idx].table_name
- ,rule_list[track[i].rule_idx].rule_id
- ,rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
- continue;
- }
- else
- {
- p=rule_list+track[i].rule_idx;
- fp=fopen(p->f_keys[track[i].foreign_idx].filename, "w");
- if(fp==NULL)
- {
- MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor
- , "Write foreign content failed: fopen %s error."
- , p->f_keys[track[i].foreign_idx]);
- }
- else
- {
- fwrite(reply->str, 1, reply->len, fp);
- fclose(fp);
- fp=NULL;
- if(print_fn==1)
- {
- printf("Written foreign content %s\n",p->f_keys[track[i].foreign_idx].filename);
- }
- }
- }
- freeReplyObject(reply);
- reply=NULL;
- }
-
- free(track);
- return;
-}
-void get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger)
-{
- int max_redis_batch=4*1024,batch_cnt=0;
- int success_cnt=0;
- while(success_cnt<rule_num)
- {
- batch_cnt=MIN(rule_num-success_cnt,max_redis_batch);
- _get_foreign_conts(ctx,rule_list+success_cnt,batch_cnt,print_fn,logger);
- success_cnt+=batch_cnt;
- }
- return;
-}
-
-void redis_monitor_traverse(long long version, struct source_redis_ctx* mr_ctx,
- void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
- int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
- void (*finish)(void*),//u_para
- void* u_para,
- const char* dec_key,
- _Maat_feather_t* feather)
-{
- int table_id=0, i=0, rule_num=0, empty_value_num=0, valid_column=-1;
- int ret=0;
- struct serial_rule_t* rule_list=NULL;
- int update_type=CM_UPDATE_TYPE_INC;
- long long new_version=0;
- enum MAAT_TABLE_TYPE table_type;
- enum MAAT_SCAN_TYPE scan_type;
- struct Maat_table_schema* table_schema=NULL;
- void* logger=feather->logger;
-
- if(mr_ctx->write_ctx!=NULL&&mr_ctx->write_ctx->err==0)//authorized to write
- {
- //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
- if(1==redlock_try_lock(mr_ctx->read_ctx, mr_expire_lock, mr_expire_lock_timeout))
- {
- check_maat_expiration(mr_ctx->read_ctx, logger);
- cleanup_update_status(mr_ctx->read_ctx, logger);
- redlock_unlock(mr_ctx->read_ctx, mr_expire_lock);
- }
- }
- if(mr_ctx->read_ctx==NULL||mr_ctx->read_ctx->err)
- {
- if(time(NULL)-mr_ctx->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL)
- {
- return;
- }
- mr_ctx->last_reconnect_time=time(NULL);
- if(mr_ctx->read_ctx!=NULL)
- {
- redisFree(mr_ctx->read_ctx);
- }
- MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, "Reconnecting...");
- mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, feather->logger);
- if(mr_ctx->read_ctx==NULL)
- {
- return;
- }
- else
- {
- version=0;//Trigger full update when reconnect to redis.
- }
- }
-
- rule_num=get_rm_key_list(mr_ctx->read_ctx, version, feather->load_version_from, &new_version, feather->table_mgr, &rule_list, &update_type, logger, feather->cumulative_update_off);
- if(rule_num<0)//redis communication error
- {
- return;
- }
- feather->load_version_from=0;//only valid for one time.
- if(rule_num==0&&update_type==CM_UPDATE_TYPE_INC)//error or nothing changed
- {
- return;
- }
- if(rule_num>0)
- {
- ret=get_maat_redis_value(mr_ctx->read_ctx, rule_list, rule_num, logger, 0);
- if(ret<0)
- {
- MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update.");
- goto clean_up;
- }
- for(i=0;i<rule_num;i++)
- {
- if(rule_list[i].table_line==NULL)
- {
- empty_value_num++;
- }
- }
- if(empty_value_num==rule_num)
- {
- MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"All %d rules are empty, abandon update.",empty_value_num);
- goto clean_up;
- }
- if(empty_value_num>0)
- {
- MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num);
- }
- ret=get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num, feather, feather->foreign_cont_dir, logger);
- if(ret>0)
- {
- get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, logger);
- }
- }
- start(new_version,update_type,u_para);
- MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld -> %lld (%d entries).",
- update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version,rule_num);
- for(i=0;i<rule_num;i++)
- {
- if(rule_list[i].table_line==NULL||rule_list[i].with_error==1)
- {
- continue;
- }
- table_id=Maat_table_get_id_by_name(feather->table_mgr, rule_list[i].table_name);
- if(table_id<0)//Unrecognized table.
- {
- continue;
- }
- table_type=Maat_table_get_type_by_id(feather->table_mgr, table_id);
- if(rule_list[i].op==MAAT_OP_DEL)
- {
-
- scan_type=Maat_table_get_scan_type(table_type);
- table_schema=Maat_table_get_scan_by_id(feather->table_mgr, table_id, scan_type, NULL);
- valid_column=Maat_table_xx_plugin_table_get_valid_flag_column(table_schema);
- ret=invalidate_line(rule_list[i].table_line, table_type, valid_column);
- if(ret<0)
- {
- MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Invalidate line failed, invaid format %s ."
- ,rule_list[i].table_line);
- continue;
- }
- }
- if(rule_list[i].n_foreign>0)
- {
- rewrite_table_line_with_foreign(rule_list+i);
- }
- update(rule_list[i].table_name,rule_list[i].table_line,u_para);
- }
- finish(u_para);
-
-clean_up:
- for(i=0;i<rule_num;i++)
- {
- empty_serial_rules(rule_list+i);
- }
- free(rule_list);
- rule_list=NULL;
- return;
-}
-
-void _maat_copy_region(struct Maat_region_t* dst,const struct Maat_region_t* src)
-{
- memcpy(dst,src,sizeof(struct Maat_region_t));
- if(src->table_name!=NULL)
- {
- dst->table_name=_maat_strdup(src->table_name);
- }
- switch(dst->region_type)
- {
- case REGION_IP:
- dst->ip_rule.src_ip=_maat_strdup(src->ip_rule.src_ip);
- dst->ip_rule.mask_src_ip=_maat_strdup(src->ip_rule.mask_src_ip);
- dst->ip_rule.dst_ip=_maat_strdup(src->ip_rule.dst_ip);
- dst->ip_rule.mask_dst_ip=_maat_strdup(src->ip_rule.mask_dst_ip);
- break;
- case REGION_EXPR:
- dst->expr_rule.keywords=_maat_strdup(src->expr_rule.keywords);
- dst->expr_rule.district=_maat_strdup(src->expr_rule.district);
- break;
- case REGION_INTERVAL:
- break;
- case REGION_DIGEST:
- dst->digest_rule.digest_string=_maat_strdup(src->digest_rule.digest_string);
- break;
- case REGION_SIMILARITY:
- dst->similarity_rule.target=_maat_strdup(src->similarity_rule.target);
- break;
- default:
- assert(0);
- }
- return;
-}
-void _maat_empty_region(struct Maat_region_t* p)
-{
- free((char*)p->table_name);
- p->table_name=NULL;
- switch(p->region_type)
- {
- case REGION_IP:
- free((char*)p->ip_rule.src_ip);
- free((char*)p->ip_rule.mask_src_ip);
- free((char*)p->ip_rule.dst_ip);
- free((char*)p->ip_rule.mask_dst_ip);
- break;
- case REGION_EXPR:
- free((char*)p->expr_rule.keywords);
- free((char*)p->expr_rule.district);
- break;
- case REGION_INTERVAL:
- break;
- case REGION_DIGEST:
- free((char*)p->digest_rule.digest_string);
- break;
- case REGION_SIMILARITY:
- free((char*)p->similarity_rule.target);
- break;
- default:
- assert(0);
- }
- memset(p,0,sizeof(struct Maat_region_t));
- return;
-
-}
-int Maat_command_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, size_t n_line ,enum MAAT_OPERATION op)
-{
- size_t i=0;
- _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
- int ret=0, table_id=0,success_cnt=0;
- struct serial_rule_t *s_rule=NULL;
- long long server_time=0,absolute_expire_time=0;
- redisContext* write_ctx=get_redis_ctx_for_write(_feather);
- if(write_ctx==NULL)
- {
- return -1;
- }
- server_time=redis_server_time(write_ctx);
- if(!server_time)
- {
- return -1;
- }
- s_rule=ALLOC(struct serial_rule_t, n_line);
- for(i=0;i<n_line;i++)
- {
- table_id=Maat_table_get_id_by_name(_feather->table_mgr, line_rule[i]->table_name);
- if(table_id<0)
- {
- MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command,
- "Command raw set line id %d failed: unknown table %s.",
- line_rule[i]->rule_id,
- line_rule[i]->table_name);
- ret=-1;
- goto error_out;
- }
- if(op==MAAT_OP_RENEW_TIMEOUT)
- {
- assert(line_rule[i]->expire_after>0);
- }
- if(line_rule[i]->expire_after>0)
- {
- absolute_expire_time=server_time+line_rule[i]->expire_after;
- }
- set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name,
- line_rule[i]->table_line, absolute_expire_time);
- }
- success_cnt=exec_serial_rule(write_ctx,s_rule, n_line,server_time,_feather->logger);
- if(success_cnt<0||(size_t)success_cnt!=n_line)//error
- {
- ret=-1;
- goto error_out;
- }
- ret=success_cnt;
- _feather->line_cmd_acc_num+=success_cnt;
-
-error_out:
- for(i=0;i<n_line;i++)
- {
- empty_serial_rules(s_rule+i);
- }
- free(s_rule);
- return ret;
-
-}
-int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, int line_num ,enum MAAT_OPERATION op)
-{
- int i=0, j=0;
- _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
- int ret=0, table_id=0,success_cnt=0;
- struct serial_rule_t *s_rule=NULL;
- struct Maat_table_schema* p_table=NULL;
- struct plugin_table_schema* plugin_desc=NULL;
- long long server_time=0,absolute_expire_time=0;
- const char* p_foreign=NULL;
- int foreign_key_size=0;
- redisContext* write_ctx=get_redis_ctx_for_write(_feather);
- if(write_ctx==NULL)
- {
- return -1;
- }
- server_time=redis_server_time(write_ctx);
- if(!server_time)
- {
- return -1;
- }
- s_rule=ALLOC(struct serial_rule_t, line_num);
- for(i=0;i<line_num;i++)
- {
- table_id=Maat_table_get_id_by_name(_feather->table_mgr, line_rule[i]->table_name);
- if(table_id<0)
- {
- MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
- ,"Command set line id %d failed: unknown table %s."
- , line_rule[i]->rule_id
- , line_rule[i]->table_name);
- ret=-1;
- goto error_out;
- }
- p_table=Maat_table_get_by_id_raw(_feather->table_mgr, table_id);
- if(!p_table)
- {
- ret=-1;
- goto error_out;
- }
- int valid_flag_column=0;
-
- valid_flag_column=Maat_table_xx_plugin_table_get_valid_flag_column(p_table);
- if(valid_flag_column<0)
- {
- MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
- ,"Command set line id %d failed: table %s is not a plugin or ip_plugin table."
- , line_rule[i]->rule_id
- , line_rule[i]->table_name);
- ret=-1;
- goto error_out;
-
- }
-
- if(op==MAAT_OP_ADD)
- {
- ret=get_valid_flag_offset(line_rule[i]->table_line
- , p_table->table_type
- , valid_flag_column);
- if(ret<0||
- (op==MAAT_OP_ADD&&line_rule[i]->table_line[ret]!='1'))
- {
- MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
- ,"Command set line %s %d failed: illegal valid flag."
- , line_rule[i]->table_name, line_rule[i]->rule_id);
- ret=-1;
- goto error_out;
- }
- }
- if(op==MAAT_OP_RENEW_TIMEOUT)
- {
- assert(line_rule[i]->expire_after>0);
- }
- if(line_rule[i]->expire_after>0)
- {
- absolute_expire_time=server_time+line_rule[i]->expire_after;
- }
- if(plugin_desc && plugin_desc->n_foreign>0)
- {
- for(j=0;j<plugin_desc->n_foreign;j++)
- {
- p_foreign=find_Nth_column(line_rule[i]->table_line, plugin_desc->foreign_columns[j], &foreign_key_size);
- if(p_foreign==NULL)
- {
- MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL, maat_command
- , "Command set line %s %d failed: No %dth column."
- , line_rule[i]->table_name, line_rule[i]->rule_id
- , plugin_desc->foreign_columns[j]);
- ret=-1;
- goto error_out;
- }
- if(0!=strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix)))
- {
- MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_redis_monitor
- ,"Command set line %s %d failed: Source prefix %s is mandatory."
- , line_rule[i]->table_name, line_rule[i]->rule_id, foreign_source_prefix);
- ret=-1;
- goto error_out;
- }
- }
-
- }
- set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name,
- line_rule[i]->table_line, absolute_expire_time);
- }
- success_cnt=exec_serial_rule(write_ctx,s_rule, line_num,server_time,_feather->logger);
- if(success_cnt<0||success_cnt!=line_num)//error
- {
- ret=-1;
- goto error_out;
- }
- ret=success_cnt;
- _feather->line_cmd_acc_num+=success_cnt;
-
-error_out:
- for(i=0;i<line_num;i++)
- {
- empty_serial_rules(s_rule+i);
- }
- free(s_rule);
- return ret;
-}
-
-int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_cmd_line* line_rule, enum MAAT_OPERATION op)
-{
- int ret=0;
- ret=Maat_cmd_set_lines(feather,&line_rule, 1, op);
- return ret;
-}
-int Maat_cmd_set_file(Maat_feather_t feather,const char* key, const char* value, size_t size, enum MAAT_OPERATION op)
-{
- struct _Maat_feather_t* _feather=(struct _Maat_feather_t*)feather;
- redisContext* ctx=_feather->mr_ctx.write_ctx;
- if(ctx==NULL)
- {
- MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, "%s failed: Redis is not connected.", __FUNCTION__);
- return -1;
- }
- const char *arg_vec[3];
- size_t len_vec[3];
- arg_vec[0] = "SET";
- len_vec[0] = strlen("SET");
-
- arg_vec[1] = key;
- len_vec[1] = strlen(key);
-
- arg_vec[2] = value;
- len_vec[2] = size;
-
- redisReply *reply=NULL;
- if(0!=strncmp(key, foreign_key_prefix, strlen(foreign_key_prefix)))
- {
- MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, "Invalid File key, prefix %s is mandatory.", foreign_key_prefix);
- return -1;
- }
- switch(op)
- {
- case MAAT_OP_ADD:
- reply= (redisReply *)redisCommandArgv(ctx, sizeof(arg_vec) / sizeof(arg_vec[0]), arg_vec, len_vec);
- break;
- case MAAT_OP_DEL:
- reply=_wrap_redisCommand(ctx,"EXPIRE %s %d", key, MAAT_REDIS_SYNC_TIME);
- break;
- default:
- return -1;
- break;
- }
- if(reply==NULL||reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR)
- {
- MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command,"Set file failed, maybe Redis is busy.");
- freeReplyObject(reply);
- reply=NULL;
- return -1;
- }
- freeReplyObject(reply);
- reply=NULL;
- return 1;
-}
-
-long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment)
-{
- _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
- redisReply* data_reply=NULL;
- long long result=0;
- redisContext* write_ctx=get_redis_ctx_for_write(_feather);
-
- if(write_ctx==NULL)
- {
- return -1;
- }
- data_reply=_wrap_redisCommand(write_ctx,"INCRBY %s %d", key, increment);
- if(data_reply->type==REDIS_REPLY_INTEGER)
- {
- result=data_reply->integer;
- }
- else
- {
- result=-1;
- }
- freeReplyObject(data_reply);
- data_reply=NULL;
- return result;
-}
-int Maat_command_get_new_group_id(Maat_feather_t feather)
-{
- int group_id=0;
- group_id=(int) Maat_cmd_incrby(feather, mr_group_id_var, 1);
- return group_id;
-}
-int Maat_command_get_new_region_id(Maat_feather_t feather)
-{
- int region_id=0;
- region_id=(int) Maat_cmd_incrby(feather, mr_region_id_var, 1);
- return region_id;
-}
-
-void Maat_cmd_key_free(struct Maat_cmd_key**keys, int size)
-{
- int i=0;
- struct Maat_cmd_key* p=*keys;
- for(i=0; i<size; i++, p++)
- {
- free(p->table_name);
- p->table_name=NULL;
- p->rule_id=0;
- }
- free(*keys);
- *keys=NULL;
- return;
-}
-
-int Maat_cmd_key_select(Maat_feather_t feather, int label_id, struct Maat_cmd_key** keys)
-{
- _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
- redisReply* data_reply=NULL;
- char* tmp=NULL;
- unsigned int i=0;
- struct Maat_cmd_key* result=NULL;
- int result_cnt=0;
- redisContext* write_ctx=get_redis_ctx_for_write(_feather);
- if(write_ctx==NULL)
- {
- return -1;
- }
-
- data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d",
- mr_label_sset,
- label_id,
- label_id);
- result_cnt=data_reply->elements;
- result=ALLOC(struct Maat_cmd_key, data_reply->elements);
- for(i=0;i<data_reply->elements;i++)
- {
- result[i].table_name=_maat_strdup(data_reply->element[i]->str);
- tmp=strchr(result[i].table_name, ',');
- if(tmp!=NULL)
- {
- *tmp='\0';
- tmp++;
- result[i].rule_id=atoi(tmp);
- }
- else// old version compatible
- {
- result[i].rule_id=atoi(result[i].table_name);
- free(result[i].table_name);
- result[i].table_name=NULL;
- }
- }
- freeReplyObject(data_reply);
- data_reply=NULL;
-
- *keys=result;
- return result_cnt;
-}
-
-int redis_flush_DB(redisContext* ctx, int db_index, void* logger)
-{
- redisReply* data_reply=NULL;
- long long maat_redis_version=0, dbsize=0;
- int append_cmd_cnt=0, i=0,ret=0;
- int redis_transaction_success=1;
-
- data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION");
- freeReplyObject(data_reply);
- data_reply=NULL;
- data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION");
- if(data_reply->type==REDIS_REPLY_NIL)
- {
- maat_redis_version=0;
- }
- else
- {
- maat_redis_version=read_redis_integer(data_reply);
- maat_redis_version++;
- freeReplyObject(data_reply);
- data_reply=NULL;
- }
- data_reply=_wrap_redisCommand(ctx, "DBSIZE");
- dbsize=read_redis_integer(data_reply);
- freeReplyObject(data_reply);
- data_reply=NULL;
-
- data_reply=_wrap_redisCommand(ctx,"MULTI");
- freeReplyObject(data_reply);
- data_reply=NULL;
-
- redisAppendCommand(ctx,"FLUSHDB");
- append_cmd_cnt++;
- redisAppendCommand(ctx,"SET MAAT_VERSION %lld",maat_redis_version);
- append_cmd_cnt++;
- redisAppendCommand(ctx,"SET MAAT_PRE_VER %lld",maat_redis_version);
- append_cmd_cnt++;
- redisAppendCommand(ctx,"SET %s 1", mr_region_id_var);
- append_cmd_cnt++;
- redisAppendCommand(ctx,"SET %s 1", mr_group_id_var);
- append_cmd_cnt++;
- redisAppendCommand(ctx,"EXEC");
- append_cmd_cnt++;
- for(i=0;i<append_cmd_cnt;i++)
- {
- ret=_wrap_redisGetReply(ctx, &data_reply);
- if(ret==REDIS_OK)
- {
- if(0==mr_transaction_success(data_reply))
- {
- redis_transaction_success=0;
- }
- freeReplyObject(data_reply);
- data_reply=NULL;
- }
- }
- if(redis_transaction_success==1)
- {
- MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_command
- ,"FlushDB %d, MAAT_VERSION=%llu, DBSize=%llu."
- ,db_index, (maat_redis_version==0)?0:(maat_redis_version-1),dbsize
- );
- }
- return redis_transaction_success;
-}
-TAILQ_HEAD(serial_rule_q, serial_rule_t);
-
-struct Maat_command_batch
-{
- int batch_size;
- serial_rule_q queue;
- struct _Maat_feather_t * feather;
- long long server_time;
-};
-struct Maat_command_batch* Maat_command_batch_new(Maat_feather_t feather)
-{
- struct Maat_command_batch* batch=ALLOC(struct Maat_command_batch, 1);
- TAILQ_INIT(&batch->queue);
- batch->feather=(struct _Maat_feather_t *)feather;
- redisContext* write_ctx=get_redis_ctx_for_write(batch->feather);
- if(write_ctx==NULL)
- {
- free(batch);
- return NULL;
- }
- batch->server_time=redis_server_time(write_ctx);
- if(!batch->server_time)
- {
- free(batch);
- return NULL;
- }
- return batch;
-}
-
-int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id)
-{
- struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
- long long absolute_expire_time=0;
- char line[MAX_TABLE_LINE_SIZE];
-
- serialize_region(region, group_id, line, sizeof(line));
-
- set_serial_rule(s_rule, op, region->region_id, 0, region->table_name,
- line, absolute_expire_time);
- TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
- batch->batch_size++;
- return 0;
-
-}
-#define TO_GROUP2X_KEY(group_id, parent_id) ((unsigned long)group_id<<32|parent_id)
-
-int Maat_command_batch_set_group2group(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g)
-{
- struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
- long long absolute_expire_time=0;
- char line[MAX_TABLE_LINE_SIZE];
-
- serialize_group2group(op, g2g, line, sizeof(line));
-
- set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2g->group_id, g2g->superior_group_id), 0, g2g->table_name,
- line, absolute_expire_time);
-
- TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
- batch->batch_size++;
- return 0;
-}
-int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c)
-{
- struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
- long long absolute_expire_time=0;
- char line[MAX_TABLE_LINE_SIZE];
-
- serialize_group2compile(op, g2c, line, sizeof(line));
- set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2c->group_id, g2c->compile_id), 0, g2c->table_name,
- line, absolute_expire_time);
- TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
- batch->batch_size++;
- return 0;
-}
-int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after)
-{
-
- struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
- long long absolute_expire_time=0;
- char line[MAX_TABLE_LINE_SIZE];
- serialize_compile(compile, huge_service_defined, clause_num, op, line, sizeof(line));
-
- if(expire_after>0)
- {
- absolute_expire_time=batch->server_time+expire_after;
- }
- set_serial_rule(s_rule, op, compile->config_id, label_id, table_name,
- line, absolute_expire_time);
-
- TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
- batch->batch_size++;
- return 0;
-}
-int Maat_command_batch_commit(struct Maat_command_batch* batch)
-{
- struct serial_rule_t* s_rule_array=ALLOC(struct serial_rule_t, batch->batch_size);
- int i=0;
- redisContext* write_ctx=get_redis_ctx_for_write(batch->feather);
- struct serial_rule_t * tmp = TAILQ_FIRST(&batch->queue);
-
- while(tmp != NULL)
- {
- TAILQ_REMOVE(&batch->queue, tmp, entries);
- memcpy(s_rule_array+i, tmp, sizeof(*tmp));
- free(tmp);
- tmp = TAILQ_FIRST(&batch->queue);
- i++;
- }
- assert(i==batch->batch_size);
- exec_serial_rule(write_ctx, s_rule_array, batch->batch_size, batch->server_time, batch->feather->logger);
- for(i=0; i<batch->batch_size; i++)
- {
- empty_serial_rules(s_rule_array+i);
- }
- free(s_rule_array);
- free(batch);
- return i;
-}
-
-int Maat_command_raw_set_compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after)
-{
- struct Maat_command_batch* batch=NULL;
- batch=Maat_command_batch_new(feather);
- Maat_command_batch_set_compile(batch, op, compile, table_name, huge_service_defined, clause_num, label_id, expire_after);
- Maat_command_batch_commit(batch);
- return 0;
-}
-int Maat_command_raw_set_region(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id)
-{
- struct Maat_command_batch* batch=NULL;
- batch=Maat_command_batch_new(feather);
- Maat_command_batch_set_region(batch, op, region, group_id);
- Maat_command_batch_commit(batch);
- return 0;
-}
-int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c)
-{
- struct Maat_command_batch* batch=NULL;
- batch=Maat_command_batch_new(feather);
- Maat_command_batch_set_group2compile(batch, op, g2c);
- Maat_command_batch_commit(batch);
- return 0;
-}
-int Maat_command_raw_set_group2group(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g)
-{
-
- struct Maat_command_batch* batch=NULL;
- batch=Maat_command_batch_new(feather);
- Maat_command_batch_set_group2group(batch, op, g2g);
- Maat_command_batch_commit(batch);
- return 0;
-}
-
-int Maat_cmd_flushDB(Maat_feather_t feather)
-{
- _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
- int ret=0;
- redisContext* write_ctx=get_redis_ctx_for_write(_feather);
- if(write_ctx==NULL)
- {
- return -1;
- }
- do
- {
- ret=redis_flush_DB(_feather->mr_ctx.write_ctx, _feather->mr_ctx.redis_db, _feather->logger);
- }while(ret==0);
- return 0;
-}
-
+#include "Maat_command.h"
+#include "Maat_rule.h"
+#include "Maat_rule_internal.h"
+#include "Maat_utils.h"
+#include "config_monitor.h"
+#include "map_str2int.h"
+#include "hiredis.h"
+#include <MESA/MESA_handle_logger.h>
+#include <errno.h>
+#include <pthread.h>
+#include <assert.h>
+#include <unistd.h>
+
+#define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR"))
+#define maat_command (module_name_str("MAAT_COMMAND"))
+const time_t MAAT_REDIS_RECONNECT_INTERVAL=5;
+const char* mr_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
+const char* mr_status_sset="MAAT_UPDATE_STATUS";
+const char* mr_expire_sset="MAAT_EXPIRE_TIMER";
+const char* mr_label_sset="MAAT_LABEL_INDEX";
+const char* mr_version_sset="MAAT_VERSION_TIMER";
+const char* mr_expire_lock="EXPIRE_OP_LOCK";
+const long mr_expire_lock_timeout=300*1000;
+const static int MAAT_REDIS_SYNC_TIME=30*60;
+const char* mr_op_str[]={"DEL","ADD","RENEW_TIMEOUT"};
+const char* foreign_source_prefix="redis://";
+const char* foreign_key_prefix="__FILE_";
+
+
+
+int _wrap_redisGetReply(redisContext *c, redisReply **reply)
+{
+ return redisGetReply(c, (void **)reply);
+}
+redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
+{
+ va_list ap;
+ void *reply = NULL;
+ int ret=REDIS_ERR, retry=0;
+ while(reply==NULL&&retry<2&&ret!=REDIS_OK)
+ {
+ va_start(ap,format);
+ reply = redisvCommand(c,format,ap);
+ va_end(ap);
+ if(reply==NULL)
+ {
+ ret=redisReconnect(c);
+ retry++;
+ }
+ }
+ return (redisReply *)reply;
+}
+redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger)
+{
+ struct timeval connect_timeout;
+ connect_timeout.tv_sec=0;
+ connect_timeout.tv_usec=100*1000; // 100 ms
+ redisReply* reply=NULL;
+
+ redisContext * ctx;
+ ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout);
+ if(ctx==NULL||ctx->err)
+ {
+ if(logger==NULL)
+ {
+ printf("Unable to connect redis server %s:%d db%d, error: %s\n",
+ redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr);
+
+ }
+ else
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Unable to connect redis server %s:%d db%d, error: %s",
+ redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr);
+ }
+ if(ctx!=NULL) redisFree(ctx);
+ return NULL;
+ }
+ redisEnableKeepAlive(ctx);
+ reply=_wrap_redisCommand(ctx, "select %d",redis_db);
+ freeReplyObject(reply);
+ reply=NULL;
+
+ return ctx;
+
+}
+
+int connect_redis_for_write(struct source_redis_ctx* mr_ctx, void* logger)
+{
+ assert(mr_ctx->write_ctx==NULL);
+ mr_ctx->write_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, logger);
+ if(mr_ctx->write_ctx==NULL)
+ {
+ return -1;
+ }
+ else
+ {
+ return 0;
+ }
+}
+redisContext* get_redis_ctx_for_write(struct _Maat_feather_t * feather)
+{
+ int ret=0;
+ if(feather->mr_ctx.write_ctx==NULL)
+ {
+ ret=connect_redis_for_write(&(feather->mr_ctx), feather->logger);
+ if(ret!=0)
+ {
+ return NULL;
+ }
+ }
+ return feather->mr_ctx.write_ctx;
+}
+long long read_redis_integer(const redisReply* reply)
+{
+ switch(reply->type)
+ {
+ case REDIS_REPLY_INTEGER:
+ return reply->integer;
+ break;
+ case REDIS_REPLY_ARRAY:
+ assert(reply->element[0]->type==REDIS_REPLY_INTEGER);
+ return reply->element[0]->integer;
+ break;
+ case REDIS_REPLY_STRING:
+ return atoll(reply->str);
+ break;
+ default:
+ return -1;
+ break;
+ }
+ return 0;
+}
+long long redis_server_time(redisContext* ctx)
+{
+ long long server_time=0;
+ redisReply* data_reply=NULL;
+ data_reply=_wrap_redisCommand(ctx,"TIME");
+ if(data_reply->type==REDIS_REPLY_ARRAY)
+ {
+ server_time=atoll(data_reply->element[0]->str);
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ }
+ return server_time;
+}
+enum MAAT_TABLE_TYPE type_region2table(const struct Maat_region_t* p)
+{
+ enum MAAT_TABLE_TYPE ret=TABLE_TYPE_IP;
+ switch(p->region_type)
+ {
+ case REGION_IP:
+ ret=TABLE_TYPE_IP;
+ break;
+ case REGION_EXPR:
+ if(p->expr_rule.district==NULL)
+ {
+ ret=TABLE_TYPE_EXPR;
+ }
+ else
+ {
+ ret=TABLE_TYPE_EXPR_PLUS;
+ }
+ break;
+ case REGION_INTERVAL:
+ if(p->interval_rule.district==NULL)
+ {
+ ret=TABLE_TYPE_INTERVAL;
+ }
+ else
+ {
+ ret=TABLE_TYPE_INTERVAL_PLUS;
+ }
+ break;
+ case REGION_DIGEST:
+ ret=TABLE_TYPE_DIGEST;
+ break;
+ case REGION_SIMILARITY:
+ ret=TABLE_TYPE_SIMILARITY;
+ break;
+ default:
+ assert(0);
+ }
+ return ret;
+}
+int get_valid_flag_offset(const char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq)
+{
+ size_t offset=0, len=0;
+ unsigned int column_seq=0, ret=0;
+ switch(type)
+ {
+ case TABLE_TYPE_EXPR:
+ column_seq=7;
+ break;
+ case TABLE_TYPE_IP:
+ column_seq=14;
+ break;
+ case TABLE_TYPE_IP_PLUS:
+ column_seq=18;
+ break;
+ case TABLE_TYPE_COMPILE:
+ column_seq=8;
+ break;
+ case TABLE_TYPE_PLUGIN:
+ case TABLE_TYPE_IP_PLUGIN:
+ case TABLE_TYPE_FQDN_PLUGIN:
+ if(valid_column_seq<0)
+ {
+ return -1;
+ }
+ column_seq=(unsigned int)valid_column_seq;
+ break;
+ case TABLE_TYPE_INTERVAL:
+ column_seq=5;
+ break;
+ case TABLE_TYPE_INTERVAL_PLUS:
+ column_seq=6;
+ break;
+ case TABLE_TYPE_DIGEST:
+ column_seq=6;
+ break;
+ case TABLE_TYPE_SIMILARITY:
+ column_seq=5;
+ break;
+ case TABLE_TYPE_EXPR_PLUS:
+ column_seq=8;
+ break;
+ case TABLE_TYPE_GROUP2COMPILE:
+ case TABLE_TYPE_GROUP2GROUP:
+ column_seq=3;
+ break;
+ default:
+ assert(0);
+ }
+
+ ret=get_column_pos(line, column_seq, &offset, &len);
+ if(ret<0||offset>=strlen(line)||(line[offset]!='1'&&line[offset]!='0'))// 0 is also a valid value for some non-MAAT producer.
+ {
+ return -1;
+ }
+ return offset;
+}
+int invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq)
+{
+ int i=0;
+ i=get_valid_flag_offset(line, type,valid_column_seq);
+ if(i<0)
+ {
+ return -1;
+ }
+ line[i]='0';
+ return 0;
+}
+
+void serialize_group2group(enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g, char* buff, size_t sz)
+{
+ snprintf(buff, sz, "%d\t%d\t%d", g2g->group_id,
+ g2g->superior_group_id,
+ op);
+ return;
+}
+void serialize_group2compile(enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c, char* buff, size_t sz)
+{
+ snprintf(buff, sz, "%d\t%d\t%d\t%d\t%s\t%d", g2c->group_id,
+ g2c->compile_id,
+ op,
+ g2c->not_flag,
+ g2c->virtual_table_name?g2c->virtual_table_name:"null",
+ g2c->clause_index);
+ return;
+}
+void serialize_compile(const struct Maat_rule_t* p_m_rule, const char* huge_service_defined, int clause_num, enum MAAT_OPERATION op, char* buff, size_t sz)
+{
+ if(op==MAAT_OP_RENEW_TIMEOUT) op=MAAT_OP_ADD;
+ const char* service_define=huge_service_defined?huge_service_defined:(strlen(p_m_rule->service_defined)?p_m_rule->service_defined:"null");
+
+ snprintf(buff, sz, "%d\t%d\t%hhu\t%hhu\t%hhu\t0\t%s\t%d\t%d",
+ p_m_rule->config_id,
+ p_m_rule->service_id,
+ p_m_rule->action,
+ p_m_rule->do_blacklist,
+ p_m_rule->do_log,
+ service_define,
+ op,
+ clause_num);
+ return;
+}
+void serialize_region(const struct Maat_cmd_region* p, int group_id, char* buff, size_t sz)
+{
+ UNUSED size_t ret=0;
+ switch(p->region_type)
+ {
+ case REGION_IP:
+ ret=snprintf(buff, sz, "%d\t%d\t%d\t%s\t%s\t%hu\t%hu\t%s\t%s\t%hu\t%hu\t%d\t%d\t1",
+ p->region_id,
+ group_id,
+ p->ip_rule.addr_type,
+ p->ip_rule.src_ip,
+ p->ip_rule.mask_src_ip,
+ p->ip_rule.src_port,
+ p->ip_rule.mask_src_port,
+ p->ip_rule.dst_ip,
+ p->ip_rule.mask_dst_ip,
+ p->ip_rule.dst_port,
+ p->ip_rule.mask_dst_port,
+ p->ip_rule.protocol,
+ p->ip_rule.direction);
+ break;
+ case REGION_IP_PLUS:
+ ret=snprintf(buff, sz, "%d\t%d\t%d\t%s\t%s\t%s\t%s\t%hu\t%hu\t%s\t%s\t%s\t%s\t%hu\t%hu\t%d\t%d\t1",
+ p->region_id,
+ group_id,
+ p->ip_plus_rule.addr_type,
+ p->ip_plus_rule.saddr_format,
+ p->ip_plus_rule.src_ip1,
+ p->ip_plus_rule.src_ip2,
+ p->ip_plus_rule.sport_format,
+ p->ip_plus_rule.src_port1,
+ p->ip_plus_rule.src_port2,
+ p->ip_plus_rule.daddr_format,
+ p->ip_plus_rule.dst_ip1,
+ p->ip_plus_rule.dst_ip2,
+ p->ip_plus_rule.dport_format,
+ p->ip_plus_rule.dst_port1,
+ p->ip_plus_rule.dst_port2,
+ p->ip_plus_rule.protocol,
+ p->ip_plus_rule.direction);
+ break;
+ case REGION_EXPR:
+ if(p->expr_rule.district==NULL)
+ {
+ ret=snprintf(buff,sz,"%d\t%d\t%s\t%d\t%d\t%d\t1",
+ p->region_id,
+ group_id,
+ p->expr_rule.keywords,
+ p->expr_rule.expr_type,
+ p->expr_rule.match_method,
+ p->expr_rule.hex_bin);
+ }
+ else //expr_plus
+ {
+ ret=snprintf(buff,sz,"%d\t%d\t%s\t%s\t%d\t%d\t%d\t1",
+ p->region_id,
+ group_id,
+ p->expr_rule.district,
+ p->expr_rule.keywords,
+ p->expr_rule.expr_type,
+ p->expr_rule.match_method,
+ p->expr_rule.hex_bin);
+ }
+ break;
+ case REGION_INTERVAL:
+ ret=snprintf(buff,sz,"%d\t%d\t%u\t%u\t1",
+ p->region_id,
+ group_id,
+ p->interval_rule.low_boundary,
+ p->interval_rule.up_boundary);
+ break;
+ case REGION_DIGEST:
+ ret=snprintf(buff,sz,"%d\t%d\t%llu\t%s\t%hd\t1",
+ p->region_id,
+ group_id,
+ p->digest_rule.orgin_len,
+ p->digest_rule.digest_string,
+ p->digest_rule.confidence_degree);
+ break;
+ case REGION_SIMILARITY:
+ ret=snprintf(buff,sz,"%d\t%d\t%s\t%hd\t1",
+ p->region_id,
+ group_id,
+ p->similarity_rule.target,
+ p->similarity_rule.threshold);
+ break;
+ default:
+ assert(0);
+ }
+ assert(ret<sz);
+ return;
+}
+void empty_serial_rules(struct serial_rule_t* rule)
+{
+ if(rule->table_line!=NULL)
+ {
+ free(rule->table_line);
+ }
+ if(rule->n_foreign>0)
+ {
+ for(int i=0; i<rule->n_foreign; i++)
+ {
+ free(rule->f_keys[i].filename);
+ free(rule->f_keys[i].key);
+ }
+ free(rule->f_keys);
+ }
+ memset(rule,0,sizeof(struct serial_rule_t));
+ return;
+}
+void set_serial_rule(struct serial_rule_t* rule, enum MAAT_OPERATION op, unsigned long rule_id,int label_id,const char* table_name,const char* line, long long timeout)
+{
+ memset(rule, 0, sizeof(struct serial_rule_t));
+ rule->op=op;
+ rule->rule_id=rule_id;
+ rule->label_id=label_id;
+ rule->timeout=timeout;
+ assert(strlen(table_name)<sizeof(rule->table_name));
+ strncpy(rule->table_name, table_name, sizeof(rule->table_name));
+ if(line!=NULL)
+ {
+ rule->table_line=_maat_strdup(line);
+ }
+ return;
+}
+int get_inc_key_list(long long instance_version, long long target_version, redisContext *c, struct serial_rule_t** list,void* logger)
+{
+ redisReply* reply=NULL,*tmp_reply=NULL;
+ char err_buff[256], op_str[4];
+ int rule_num=0;
+ UNUSED int ret=0;
+ unsigned int i=0, j=0;
+ long long nearest_rule_version;
+ struct serial_rule_t *s_rule=NULL;
+
+ //Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version.
+ //The elements are considered to be ordered from low to high scores(instance_version).
+ reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",mr_status_sset,instance_version,target_version);
+
+ if(reply==NULL)
+ {
+ __redis_strerror_r(errno,err_buff,sizeof(err_buff)-1);
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "GET %s failed %s.",mr_status_sset,err_buff);
+ return -1;
+ }
+ assert(reply->type==REDIS_REPLY_ARRAY);
+ rule_num=reply->elements;
+ if(reply->elements==0)
+ {
+ freeReplyObject(reply);
+ reply=NULL;
+ return 0;
+ }
+
+ tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",mr_status_sset,reply->element[0]->str);
+ if(tmp_reply->type!=REDIS_REPLY_STRING)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
+ "ZSCORE %s %s failed Version: %lld->%lld",mr_status_sset,reply->element[0]->str,instance_version, target_version);
+ freeReplyObject(tmp_reply);
+ tmp_reply=NULL;
+ freeReplyObject(reply);
+ reply=NULL;
+ return -1;
+ }
+ nearest_rule_version=read_redis_integer(tmp_reply);
+ freeReplyObject(tmp_reply);
+ tmp_reply=NULL;
+ if(nearest_rule_version<0)
+ {
+ return -1;
+ }
+ if(nearest_rule_version!=instance_version+1)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
+ "Noncontinuous VERSION Redis: %lld MAAT: %lld.",nearest_rule_version,instance_version);
+ }
+ s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t));
+ for(i=0, j=0;i<reply->elements;i++)
+ {
+ assert(reply->element[i]->type==REDIS_REPLY_STRING);
+ ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%lu",op_str,s_rule[j].table_name,&(s_rule[j].rule_id));
+ if(ret!=3||s_rule[i].rule_id<0)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Invalid Redis Key: %s",reply->element[i]->str);
+ continue;
+ }
+ if(strncmp(op_str,"ADD",strlen("ADD"))==0)
+ {
+ s_rule[j].op=MAAT_OP_ADD;
+ }
+ else if(strncmp(op_str,"DEL",strlen("DEL"))==0)
+ {
+ s_rule[j].op=MAAT_OP_DEL;
+ }
+ else
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Invalid Redis Key: %s",reply->element[i]->str);
+ continue;
+ }
+ j++;
+ }
+ rule_num=j;
+ *list=s_rule;
+ freeReplyObject(reply);
+ reply=NULL;
+
+ return rule_num;
+}
+struct s_rule_array_t
+{
+ int cnt;
+ int size;
+ struct serial_rule_t* array;
+};
+void save_serial_rule_cb(const uchar * key, uint size, void * data, void * user)
+{
+ struct s_rule_array_t* array=(struct s_rule_array_t*)user;
+ int i=array->cnt;
+ memcpy(&(array->array[i]),data,sizeof(struct serial_rule_t));
+ array->array[i].op=MAAT_OP_ADD;
+ return;
+}
+int recovery_history_version(const struct serial_rule_t* current, int current_num, const struct serial_rule_t* changed, int changed_num, struct serial_rule_t** history_result)
+{
+ int i=0,ret=0;
+ unsigned int history_num=0;
+ int hash_slot_size=1;
+ MESA_htable_handle htable=NULL;
+ MESA_htable_create_args_t hargs;
+ struct s_rule_array_t tmp_array;
+ char hkey[256+20];
+ int tmp=current_num+changed_num;
+ for(;tmp>0;tmp=tmp/2)
+ {
+ hash_slot_size*=2;
+ }
+
+ memset(&hargs,0,sizeof(hargs));
+ hargs.thread_safe=0;
+ hargs.hash_slot_size = hash_slot_size;
+ hargs.max_elem_num = 0;
+ hargs.eliminate_type = HASH_ELIMINATE_ALGO_FIFO;
+ hargs.expire_time = 0;
+ hargs.key_comp = NULL;
+ hargs.key2index = NULL;
+ hargs.recursive = 1;
+ hargs.data_free = NULL;//data is an reference, no need to free.
+ hargs.data_expire_with_condition = NULL;
+ htable=MESA_htable_create(&hargs, sizeof(hargs));
+ MESA_htable_print_crtl(htable, 0);
+
+ for(i=0;i<current_num;i++)
+ {
+ snprintf(hkey,sizeof(hkey),"%ld,%s",current[i].rule_id,current[i].table_name);
+ ret=MESA_htable_add(htable, (uchar*)hkey, strlen(hkey), current+i);
+ assert(ret>0);
+ }
+
+ for(i=changed_num-1;i>=0;i--)
+ {
+ snprintf(hkey,sizeof(hkey),"%ld,%s",changed[i].rule_id,changed[i].table_name);
+ if(changed[i].op==MAAT_OP_ADD)//newly added rule is need to delete from current, so that history version can be recovered.
+ {
+ ret=MESA_htable_del(htable, (uchar*)hkey, strlen(hkey), NULL);
+ }
+ else
+ {
+ ret=MESA_htable_add(htable, (uchar*)hkey, strlen(hkey),changed+i);
+ }
+ if(ret<0)//failed
+ {
+ goto error_out;
+ }
+ }
+ history_num=MESA_htable_get_elem_num(htable);
+ tmp_array.cnt=0;
+ tmp_array.size=history_num;
+ tmp_array.array=(struct serial_rule_t*)calloc(history_num,sizeof(struct serial_rule_t));
+ MESA_htable_iterate(htable, save_serial_rule_cb, &tmp_array);
+ *history_result=tmp_array.array;
+ ret=history_num;
+error_out:
+ MESA_htable_destroy(htable, NULL);
+ return ret;
+}
+
+int get_rm_key_list(redisContext *c, long long instance_version, long long desired_version, long long* new_version, struct Maat_table_manager* table_mgr, struct serial_rule_t** list,int *update_type, void* logger, int cumulative_off)
+{
+ redisReply* reply=NULL,*sub_reply=NULL;
+ char err_buff[256];
+ long long redis_version=0,target_version=0;
+ int rule_num=0, changed_rule_num=0, table_id=0;
+ int ret=0;
+ unsigned int i=0,full_idx =0,append_cmd_cnt=0;
+ struct serial_rule_t *s_rule_array=NULL, *changed_rule_array=NULL, *history_rule_array=NULL;
+
+ reply=(redisReply*)redisCommand(c, "GET MAAT_VERSION");
+ if(reply!=NULL)
+ {
+
+ if(reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"GET MAAT_VERSION failed, maybe Redis is busy.");
+ freeReplyObject(reply);
+ reply=NULL;
+ return -1;
+ }
+ }
+ else
+ {
+ memset(err_buff, 0, sizeof(err_buff));
+ __redis_strerror_r(errno, err_buff, sizeof(err_buff)-1);
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "GET MAAT_VERSION failed %s.",err_buff);
+ return -1;
+ }
+ redis_version=read_redis_integer(reply);
+ if(redis_version<0)
+ {
+ if(reply->type==REDIS_REPLY_ERROR)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Redis Communication error: %s.",reply->str);
+ }
+ return -1;
+ }
+ freeReplyObject(reply);
+ reply=NULL;
+ if(redis_version==instance_version)
+ {
+ return 0;
+ }
+
+ if(instance_version==0||desired_version!=0)
+ {
+ goto FULL_UPDATE;
+ }
+ if(redis_version<instance_version)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
+ "VERSION roll back MAAT: %lld -> Redis: %lld.",instance_version,redis_version);
+ goto FULL_UPDATE;
+ }
+ if(redis_version>instance_version&&cumulative_off==1)
+ {
+ target_version=instance_version;
+ }
+ else
+ {
+ target_version=redis_version-1;
+ }
+ do{
+ target_version++;
+ rule_num=get_inc_key_list(instance_version, target_version, c, &s_rule_array,logger);
+ if(rule_num>0)
+ {
+ break;
+ }
+ else if(rule_num<0)
+ {
+ goto FULL_UPDATE;
+ }
+ else
+ {
+ //ret=0, nothing to do.
+ }
+
+ }while(rule_num==0&&target_version<=redis_version&&cumulative_off==1);
+ if(rule_num==0)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s"
+ ,mr_status_sset,instance_version,target_version-1,cumulative_off==1?"OFF":"ON");
+ return 0;
+ }
+ MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
+ "Inc Update from instance_version %lld to %lld (%d entries).",instance_version,target_version,rule_num);
+ *list=s_rule_array;
+ *update_type=CM_UPDATE_TYPE_INC;
+ *new_version=target_version;
+ return rule_num;
+
+FULL_UPDATE:
+ MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
+ "Initiate full udpate from instance_version %d to %lld.",instance_version,desired_version==0?redis_version:desired_version);
+ append_cmd_cnt=0;
+ ret=redisAppendCommand(c, "MULTI");
+ append_cmd_cnt++;
+ ret=redisAppendCommand(c, "GET MAAT_VERSION");
+ append_cmd_cnt++;
+ ret=redisAppendCommand(c, "KEYS EFFECTIVE_RULE:*");
+ append_cmd_cnt++;
+ //consume reply "OK" and "QUEUED".
+ for(i=0;i<append_cmd_cnt;i++)
+ {
+ _wrap_redisGetReply(c, &reply);
+ freeReplyObject(reply);
+ reply=NULL;
+ }
+ reply=_wrap_redisCommand(c,"EXEC");
+ if(reply==NULL)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Redis Communication error: %s.",c->errstr);
+ return -1;
+ }
+ if(reply->type!=REDIS_REPLY_ARRAY)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Invalid Redis Key List type %d", reply->type);
+ freeReplyObject(reply);
+ reply=NULL;
+ return -1;
+ }
+ *new_version=read_redis_integer(reply->element[0]);
+ sub_reply=reply->element[1];
+ if(sub_reply->type!=REDIS_REPLY_ARRAY)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Invalid Redis Key List type %d", sub_reply->type);
+ freeReplyObject(reply);
+ reply=NULL;
+ return -1;
+ }
+
+ s_rule_array=(struct serial_rule_t*)calloc(sub_reply->elements,sizeof(struct serial_rule_t));
+ for(i=0, full_idx=0; i<sub_reply->elements; i++)
+ {
+ if(sub_reply->element[i]->type!=REDIS_REPLY_STRING)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Invalid Redis Key Type: %d", sub_reply->element[i]->type);
+ continue;
+ }
+ ret=sscanf(sub_reply->element[i]->str,"%*[^:]:%[^,],%ld",s_rule_array[full_idx].table_name,&(s_rule_array[full_idx].rule_id));
+ s_rule_array[full_idx].op=MAAT_OP_ADD;
+ if(ret!=2||s_rule_array[full_idx].rule_id<0||strlen(s_rule_array[full_idx].table_name)==0)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Invalid Redis Key Format: %s", sub_reply->element[i]->str);
+ continue;
+ }
+ if(table_mgr)
+ {
+ table_id=Maat_table_get_id_by_name(table_mgr, s_rule_array[full_idx].table_name);
+ if(table_id<0)//Unrecognized table.
+ {
+ continue;
+ }
+ }
+ full_idx++;
+ }
+ rule_num=full_idx;
+ freeReplyObject(reply);
+ reply=NULL;
+ if(desired_version!=0)
+ {
+ changed_rule_num=get_inc_key_list(desired_version, redis_version, c, &changed_rule_array, logger);
+ if(changed_rule_num<0)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Recover history version %lld faild where as redis version is %lld.", desired_version, redis_version);
+ }
+ else if(changed_rule_num==0)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Nothing to recover from history version %lld to redis version is %lld.", desired_version, redis_version);
+ }
+ else
+ {
+ ret=recovery_history_version(s_rule_array, full_idx, changed_rule_array, changed_rule_num, &history_rule_array);
+ if(ret>0)
+ {
+ free(s_rule_array);
+ s_rule_array=history_rule_array;
+ rule_num=ret;
+ *new_version=desired_version;
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Successfully recovered from history version %lld to redis version is %lld.", desired_version, redis_version);
+ }
+ }
+ free(changed_rule_array);
+ }
+ *list=s_rule_array;
+ *update_type=CM_UPDATE_TYPE_FULL;
+ MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
+ "Full update %d keys of version %lld.", rule_num, *new_version);
+
+ return rule_num ;
+}
+
+int _get_maat_redis_value(redisContext *c, struct serial_rule_t* rule_list, int rule_num, void* logger)
+{
+ int i=0,failed_cnt=0,idx=0;
+ UNUSED int ret=0;
+ int error_happened=0;
+ int *retry_ids=(int*)malloc(sizeof(int)*rule_num);
+ char redis_cmd[256];
+ redisReply* reply=NULL;
+ for(i=0;i<rule_num;i++)
+ {
+ snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%ld",mr_key_prefix[rule_list[i].op]
+ ,rule_list[i].table_name
+ ,rule_list[i].rule_id);
+ ret=redisAppendCommand(c, redis_cmd);
+ assert(ret==REDIS_OK);
+ }
+ for(i=0;i<rule_num;i++)
+ {
+ ret=_wrap_redisGetReply(c,&reply);
+ if(ret==REDIS_ERR)
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,
+ "Redis GET %s:%s,%d failed, redis server error.", mr_key_prefix[rule_list[i].op],
+ rule_list[i].table_name,
+ rule_list[i].rule_id);
+ error_happened=1;
+ break;
+ }
+ if(reply->type==REDIS_REPLY_STRING)
+ {
+ rule_list[i].table_line=_maat_strdup(reply->str);
+ }
+ else
+ {
+ if(reply->type==REDIS_REPLY_NIL)
+ {
+ retry_ids[failed_cnt]=i;
+ failed_cnt++;
+ }
+ else
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
+ ,"Redis GET %s:%s,%d failed",mr_key_prefix[rule_list[i].op]
+ ,rule_list[i].table_name
+ ,rule_list[i].rule_id);
+ error_happened=1;
+ }
+ }
+ freeReplyObject(reply);
+ reply=NULL;
+ }
+ if(error_happened==1)
+ {
+ free(retry_ids);
+ return -1;
+ }
+
+ for(i=0;i<failed_cnt;i++)
+ {
+ idx=retry_ids[i];
+ snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%ld",mr_key_prefix[MAAT_OP_DEL]
+ ,rule_list[idx].table_name
+ ,rule_list[idx].rule_id);
+ ret=redisAppendCommand(c, redis_cmd);
+ }
+ for(i=0;i<failed_cnt;i++)
+ {
+ idx=retry_ids[i];
+ ret=_wrap_redisGetReply(c,&reply);
+ if(ret==REDIS_ERR)
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor
+ ,"redis command %s failed, redis server error.", redis_cmd);
+ free(retry_ids);
+ return -1;
+ }
+ if(reply->type==REDIS_REPLY_STRING)
+ {
+ rule_list[idx].table_line=_maat_strdup(reply->str);
+ }
+ else if(reply->type==REDIS_REPLY_ERROR)//Deal with Redis response: "Loading Redis is loading the database in memory"
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,
+ "redis command %s error, reply type=%d, error str=%s", redis_cmd, reply->type, reply->str);
+ }
+ else //Handle type "nil"
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,
+ "redis command %s failed, reply type=%d", redis_cmd, reply->type);
+ }
+
+ freeReplyObject(reply);
+ reply=NULL;
+
+ }
+ free(retry_ids);
+ return 0;
+}
+int get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger,int print_process)
+{
+ int max_redis_batch=4*1024,batch_cnt=0;
+ int success_cnt=0,ret=0;
+ int next_print=10;
+ while(success_cnt<rule_num)
+ {
+ batch_cnt=MIN(rule_num-success_cnt,max_redis_batch);
+ ret=_get_maat_redis_value(c,rule_list+success_cnt,batch_cnt,logger);
+ if(ret<0)
+ {
+ return -1;
+ }
+ else
+ {
+ success_cnt+=batch_cnt;
+ }
+ if(print_process==1)
+ {
+ if((success_cnt*100)/rule_num>next_print)
+ {
+ printf(" >%d%%",next_print);
+ next_print+=10;
+ }
+ }
+ }
+ if(print_process==1)
+ {
+ printf(" >100%%\n");
+ }
+ return 0;
+}
+
+int mr_transaction_success(redisReply* data_reply)
+{
+ if(data_reply->type==REDIS_REPLY_NIL)
+ {
+ return 0;
+ }
+ else
+ {
+ return 1;
+ }
+}
+
+int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire)
+{
+ redisReply* reply=NULL;
+ int ret=0;
+ reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire);
+ if(reply->type==REDIS_REPLY_NIL)
+ {
+ ret=0;
+ }
+ else
+ {
+ ret=1;
+ }
+ freeReplyObject(reply);
+ reply=NULL;
+
+ return ret;
+}
+void redlock_unlock(redisContext * ctx, const char * lock_name)
+{
+ redisReply* reply=NULL;
+ reply=_wrap_redisCommand(ctx,"DEL %s", lock_name);
+ freeReplyObject(reply);
+ reply=NULL;
+
+}
+#define POSSIBLE_REDIS_REPLY_SIZE 2
+struct expected_reply
+{
+ int srule_seq;
+ int possible_reply_num;
+ redisReply possible_replies[POSSIBLE_REDIS_REPLY_SIZE];
+};
+void expected_reply_add(struct expected_reply* expected, int srule_seq, int type, long long integer)
+{
+ int i=expected->possible_reply_num;
+ assert(i<POSSIBLE_REDIS_REPLY_SIZE);
+ expected->srule_seq=srule_seq;
+ expected->possible_replies[i].type=type;
+ expected->possible_replies[i].integer=integer;
+ expected->possible_reply_num++;
+}
+int mr_operation_success(redisReply* actual_reply, struct expected_reply* expected)
+{
+ int i=0;
+ if(expected->possible_replies[0].type!=actual_reply->type)
+ {
+ return 0;
+ }
+ for(i=0; i< expected->possible_reply_num; i++)
+ {
+ if(expected->possible_replies[i].type==REDIS_REPLY_INTEGER &&
+ expected->possible_replies[i].type==actual_reply->type &&
+ expected->possible_replies[i].integer==actual_reply->integer)
+ {
+ return 1;
+ }
+ if(expected->possible_replies[i].type==REDIS_REPLY_STATUS &&
+ expected->possible_replies[i].type==actual_reply->type &&
+ 0==strcasecmp(actual_reply->str, "OK"))
+ {
+ return 1;
+ }
+ }
+ return 0;
+
+}
+
+long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *transaction_version)
+{
+ int ret=-1;
+ redisReply* data_reply=NULL;
+ if(renew_rule_num>0)
+ {
+ while(0==redlock_try_lock(ctx, mr_expire_lock, mr_expire_lock_timeout))
+ {
+ usleep(1000);
+ }
+ *renew_allowed=1;
+ }
+ if(rule_num>renew_rule_num)
+ {
+ data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1");
+ *transaction_version=read_redis_integer(data_reply);
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ if(*transaction_version<0)
+ {
+ return -1;
+ }
+ }
+ if(*renew_allowed==1||rule_num>renew_rule_num)
+ {
+ data_reply=_wrap_redisCommand(ctx,"MULTI");
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ ret=0;
+ }
+ return ret;
+}
+//parameters: 4 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER MAAT_TRANSACTION_xx, 1 args: SERVER_TIME
+const char* lua_exec_done=
+"local maat_version=redis.call(\'incrby\', KEYS[1], 1);"
+"local transaction=redis.call(\'lrange\', KEYS[4], 0, -1);"
+"for k,v in pairs(transaction) do"
+" redis.call(\'zadd\', KEYS[2], maat_version, v);"
+"end;"
+"redis.call(\'del\', KEYS[4]);"
+"redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);"
+"return maat_version;";
+redisReply* _exec_serial_rule_end(redisContext* ctx, const char* transaction_list, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt)
+{
+ redisReply* data_reply=NULL;
+ if(renew_allowed==1)
+ {
+ redlock_unlock(ctx, mr_expire_lock);
+ expect_reply[*cnt].srule_seq=-1;
+ (*cnt)++;
+ }
+ if(strlen(transaction_list)>0)
+ {
+ data_reply=_wrap_redisCommand(ctx, "eval %s 4 MAAT_VERSION %s %s %s %lld",
+ lua_exec_done,
+ mr_status_sset,
+ mr_version_sset,
+ transaction_list,
+ server_time);
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
+ (*cnt)++;
+ }
+ data_reply=_wrap_redisCommand(ctx,"EXEC");
+ return data_reply;
+}
+
+void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed)
+{
+ redisReply* data_reply=NULL;
+ unsigned int append_cmd_cnt=0, i=0;
+ for(i=0;i<rule_num;i++)
+ {
+ switch(s_rule[i].op)
+ {
+ case MAAT_OP_ADD:
+ redisAppendCommand(ctx,"SET %s:%s,%lu %s",
+ mr_key_prefix[MAAT_OP_ADD],
+ s_rule[i].table_name,
+ s_rule[i].rule_id,
+ s_rule[i].table_line);
+ expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0);
+ (*cnt)++;
+ append_cmd_cnt++;
+ //Allowing add duplicated members for rule id recycling.
+ redisAppendCommand(ctx,"RPUSH %s ADD,%s,%lu",
+ transaction_list,
+ s_rule[i].table_name,
+ s_rule[i].rule_id);
+ expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
+ (*cnt)++;
+ append_cmd_cnt++;
+ if(s_rule[i].timeout>0)
+ {
+ redisAppendCommand(ctx,"ZADD %s %lld %s,%lu",
+ mr_expire_sset,
+ s_rule[i].timeout,
+ s_rule[i].table_name,
+ s_rule[i].rule_id);
+ expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
+ expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
+ (*cnt)++;
+ append_cmd_cnt++;
+ }
+ if(s_rule[i].label_id>0)
+ {
+ redisAppendCommand(ctx,"ZADD %s %d %s,%lu",
+ mr_label_sset,
+ s_rule[i].label_id,
+ s_rule[i].table_name,
+ s_rule[i].rule_id);
+ expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
+ expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
+
+ (*cnt)++;
+
+ append_cmd_cnt++;
+ }
+ break;
+ case MAAT_OP_DEL:
+ redisAppendCommand(ctx,"RENAME %s:%s,%lu %s:%s,%lu",
+ mr_key_prefix[MAAT_OP_ADD],
+ s_rule[i].table_name,
+ s_rule[i].rule_id,
+ mr_key_prefix[MAAT_OP_DEL],
+ s_rule[i].table_name,
+ s_rule[i].rule_id
+ );
+ expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0);
+ (*cnt)++;
+ append_cmd_cnt++;
+
+ redisAppendCommand(ctx,"EXPIRE %s:%s,%lu %d",
+ mr_key_prefix[MAAT_OP_DEL],
+ s_rule[i].table_name,
+ s_rule[i].rule_id,
+ MAAT_REDIS_SYNC_TIME);
+ expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
+ (*cnt)++;
+ append_cmd_cnt++;
+
+ //NX: Don't update already exisiting elements. Always add new elements.
+ redisAppendCommand(ctx,"RPUSH %s DEL,%s,%lu",
+ transaction_list,
+ s_rule[i].table_name,
+ s_rule[i].rule_id);
+ expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
+ (*cnt)++;
+ append_cmd_cnt++;
+
+ // Try to remove from expiration sorted set, no matter wheather it exists or not.
+ redisAppendCommand(ctx,"ZREM %s %s,%lu",
+ mr_expire_sset,
+ s_rule[i].table_name,
+ s_rule[i].rule_id);
+ expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
+ (*cnt)++;
+ append_cmd_cnt++;
+
+ // Try to remove from label sorted set, no matter wheather it exists or not.
+ redisAppendCommand(ctx,"ZREM %s %s,%lu",
+ mr_label_sset,
+ s_rule[i].table_name,
+ s_rule[i].rule_id);
+ expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
+ (*cnt)++;
+ append_cmd_cnt++;
+ break;
+ case MAAT_OP_RENEW_TIMEOUT:
+ if(renew_allowed!=1)
+ {
+ continue;
+ }
+ //s_rule[i].timeout>0 was checked by caller.
+ redisAppendCommand(ctx,"ZADD %s %lld %s,%lu",
+ mr_expire_sset,
+ s_rule[i].timeout,
+ s_rule[i].table_name,
+ s_rule[i].rule_id);
+ expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
+ (*cnt)++;
+ append_cmd_cnt++;
+
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ }
+ for(i=0;i<append_cmd_cnt;i++)
+ {
+ _wrap_redisGetReply(ctx, &data_reply);
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ }
+ return;
+}
+
+int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned int serial_rule_num, long long server_time, void* logger)
+{
+ unsigned int max_redis_batch=1*1024, batch_cnt=0;
+ int renew_allowed=0,last_failed=-1;
+ redisReply*transaction_reply=NULL,*p=NULL;
+ unsigned int i=0, rule_seq=0;
+
+ unsigned int multi_cmd_cnt=0, success_cnt=0;
+ const int MAX_REDIS_OP_PER_SRULE=8;
+ unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
+ char transaction_list[MAX_TABLE_NAME_LEN*2]={0};
+ struct expected_reply *expected_reply=(struct expected_reply*)calloc(sizeof(struct expected_reply), max_multi_cmd_num);
+ long long transaction_version=0, transaction_finished_version=0;
+ int renew_num=0,ret=0;
+ for(i=0;i<serial_rule_num;i++)
+ {
+ if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT)
+ {
+ renew_num++;
+ }
+ }
+
+ ret=_exec_serial_rule_begin(ctx, serial_rule_num, renew_num, &renew_allowed, &transaction_version);
+ if(ret!=0)//Preconditions for transaction are not satisfied.
+ {
+ success_cnt=-1;
+ goto error_out;
+ }
+ if(transaction_version>0)
+ {
+ snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", transaction_version);
+ }
+ while(success_cnt<serial_rule_num)
+ {
+ batch_cnt=MIN(serial_rule_num-success_cnt, max_redis_batch);
+ _exec_serial_rule(ctx, transaction_list, s_rule+success_cnt, batch_cnt, expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed);
+ assert(multi_cmd_cnt<max_multi_cmd_num);
+ success_cnt+=batch_cnt;
+ }
+ transaction_reply=_exec_serial_rule_end(ctx, transaction_list, server_time, renew_allowed, expected_reply, &multi_cmd_cnt);
+ if(1==mr_transaction_success(transaction_reply))
+ {
+ assert(transaction_reply->elements==multi_cmd_cnt);
+ for(i=0;i<multi_cmd_cnt;i++)
+ {
+ p=transaction_reply->element[i];
+ //failed is acceptable
+ //or transaciton is success
+ //or continuation of last failed
+ if(expected_reply[i].srule_seq==-1||1==mr_operation_success(p, expected_reply+i)||last_failed==expected_reply[i].srule_seq)
+ {
+ continue;
+ }
+ rule_seq=expected_reply[i].srule_seq;
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_command,
+ "%s %s %d failed, rule id maybe conflict or not exist.",
+ mr_op_str[s_rule[rule_seq].op],
+ s_rule[rule_seq].table_name,s_rule[rule_seq].rule_id);
+ success_cnt--;
+ last_failed=rule_seq;
+ }
+ }
+ else
+ {
+ success_cnt=-1;
+ }
+ if(transaction_version>0)
+ {
+ transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]);
+ MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_command,
+ "Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld ",
+ transaction_version,
+ transaction_finished_version);
+ }
+
+ freeReplyObject(transaction_reply);
+ transaction_reply=NULL;
+
+error_out:
+ if(renew_num>0&&renew_allowed!=1)
+ {
+ for(i=0;i<(unsigned int)serial_rule_num;i++)
+ {
+ if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT)
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
+ ,"%s %s %d is not allowed due to lock contention.",mr_op_str[MAAT_OP_RENEW_TIMEOUT]
+ , s_rule[i].table_name,s_rule[i].rule_id);
+ }
+ }
+ if(success_cnt>0)
+ {
+ success_cnt-=renew_num;
+ }
+ }
+ free(expected_reply);
+ return success_cnt;
+}
+
+
+void check_maat_expiration(redisContext *ctx, void *logger)
+{
+ unsigned int i=0,s_rule_num=0;
+ UNUSED int ret=0;
+ int success_cnt=0;
+ redisReply* data_reply=NULL;
+ struct serial_rule_t* s_rule=NULL;
+ long long server_time=0;
+
+ server_time=redis_server_time(ctx);
+ if(!server_time)
+ {
+ return;
+ }
+ data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_expire_sset,server_time);
+ if(data_reply->type!=REDIS_REPLY_ARRAY||data_reply->elements==0)
+ {
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ return;
+ }
+ s_rule_num=data_reply->elements;
+ s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t),s_rule_num);
+ for(i=0;i<s_rule_num;i++)
+ {
+ s_rule[i].op=MAAT_OP_DEL;
+ ret=sscanf(data_reply->element[i]->str,"%[^,],%ld",s_rule[i].table_name,&(s_rule[i].rule_id));
+ assert(ret==2);
+ }
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ success_cnt=exec_serial_rule(ctx,s_rule, s_rule_num,server_time, logger);
+
+ if(success_cnt==(int)s_rule_num)
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
+ ,"Succesfully expired %d rules in Redis.", s_rule_num);
+ }
+ else
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
+ ,"Failed to expired %d of %d rules in Redis, try later.", s_rule_num-success_cnt,s_rule_num);
+ }
+
+ free(s_rule);
+ return;
+}
+void cleanup_update_status(redisContext *ctx, void *logger)
+{
+ redisReply* reply=NULL,*sub_reply=NULL;
+ int append_cmd_cnt=0,i=0;
+ long long server_time=0, version_upper_bound=0,version_lower_bound=0,version_num=0,entry_num=0;
+
+ server_time=redis_server_time(ctx);
+ if(!server_time)
+ {
+ return;
+ }
+ reply=_wrap_redisCommand(ctx,"MULTI");
+ freeReplyObject(reply);
+ reply=NULL;
+ redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
+ append_cmd_cnt++;
+ redisAppendCommand(ctx, "ZREMRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
+ append_cmd_cnt++;
+ //consume reply "OK" and "QUEUED".
+ for(i=0;i<append_cmd_cnt;i++)
+ {
+ _wrap_redisGetReply(ctx, &reply);
+ freeReplyObject(reply);
+ reply=NULL;
+ }
+ reply=_wrap_redisCommand(ctx,"EXEC");
+ if(reply->type!=REDIS_REPLY_ARRAY)
+ {
+ goto error_out;
+ }
+ sub_reply=reply->element[0];
+ if(sub_reply->type!=REDIS_REPLY_ARRAY)
+ {
+ goto error_out;
+ }
+ version_num=sub_reply->elements;
+ if(version_num==0)
+ {
+ goto error_out;
+ }
+ version_lower_bound=read_redis_integer(sub_reply->element[0]);
+ version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]);
+ freeReplyObject(reply);
+ reply=NULL;
+
+ //To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally.
+ reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",mr_status_sset,version_lower_bound,version_upper_bound);
+ entry_num=read_redis_integer(reply);
+ freeReplyObject(reply);
+ reply=NULL;
+
+ MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
+ ,"Clean up update status from version %lld to %lld (%lld versions, %lld entries)."
+ ,version_lower_bound
+ ,version_upper_bound
+ ,version_num
+ ,entry_num);
+ return;
+
+error_out:
+ freeReplyObject(reply);
+ reply=NULL;
+ return;
+}
+const char* find_Nth_column(const char* line, int Nth, int* column_len)
+{
+ size_t i=0;
+ int j=0;
+ int start=0, end=0;
+ size_t line_len= strlen(line);
+ for(i=0;i<line_len;i++)
+ {
+ if(line[i]!=' '&&line[i]!='\t')
+ {
+ continue;
+ }
+ j++;
+ if(j==Nth-1)
+ {
+ start=i+1;
+ }
+ if(j==Nth)
+ {
+ end=i;
+ break;
+ }
+ }
+ if(start==end)
+ {
+ return NULL;
+ }
+ if(end==0)
+ {
+ end=i;
+ }
+ *column_len=end-start;
+ return line+start;
+}
+char* get_foreign_cont_filename(const char* table_name, int rule_id, const char* foreign_key, const char* dir)
+{
+ char* filename=NULL;
+ char buffer[512];
+ snprintf(buffer, sizeof(buffer),"%s/%s-%d-%s",dir, table_name, rule_id, foreign_key);
+ filename=(char*)calloc(sizeof(char), strlen(buffer)+1);
+ memcpy(filename, buffer, strlen(buffer));
+ return filename;
+}
+void rewrite_table_line_with_foreign(struct serial_rule_t*p)
+{
+ int origin_column_size=0;
+ const char* origin_column=NULL, *pos_origin_line=NULL;
+ char* pos_rewrite_line=NULL;
+ char* rewrite_line=NULL;
+ size_t fn_size=0;
+ int i=0;
+ for(i=0; i<p->n_foreign; i++)
+ {
+ fn_size+=strlen(p->f_keys[i].filename);
+ }
+
+ rewrite_line=(char*)calloc(sizeof(char), strlen(p->table_line)+fn_size);
+ pos_origin_line=p->table_line;
+ pos_rewrite_line=rewrite_line;
+
+ for(i=0; i<p->n_foreign; i++)
+ {
+ origin_column=find_Nth_column(p->table_line, p->f_keys[i].column, &origin_column_size);
+ strncat(pos_rewrite_line, pos_origin_line, origin_column-pos_origin_line);
+ pos_rewrite_line+=origin_column-pos_origin_line;
+ pos_origin_line=origin_column+origin_column_size;
+
+ strncat(pos_rewrite_line, p->f_keys[i].filename, strlen(p->f_keys[i].filename));
+ pos_rewrite_line+=strlen(p->f_keys[i].filename);
+ }
+ strncat(pos_rewrite_line, pos_origin_line, strlen(p->table_line)-(pos_origin_line-p->table_line));
+
+ free(p->table_line);
+ p->table_line=rewrite_line;
+ return;
+}
+void _get_foregin_keys(struct serial_rule_t* p_rule, int* foreign_columns, int n_foreign, const char* dir, void* logger)
+{
+ int i=0;
+ const char* p_foreign=NULL;
+ int foreign_key_size=0;
+ p_rule->f_keys=ALLOC(struct foreign_key, n_foreign);
+ for(i=0; i<n_foreign; i++)
+ {
+ p_foreign=find_Nth_column(p_rule->table_line, foreign_columns[i], &foreign_key_size);
+ if(p_foreign==NULL)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Get %s,%d foreign keys failed: No %dth column.",
+ p_rule->table_name, p_rule->rule_id, foreign_columns[i]);
+ continue;
+ }
+ if(0==strncasecmp(p_foreign, "null", strlen("null")))
+ {//emtpy file
+ continue;
+ }
+ if(0!=strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix)))
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
+ "Get %s,%d foreign key failed: Invalid source prefix %s.",
+ p_rule->table_name, p_rule->rule_id, p_foreign);
+ continue;
+ }
+ p_rule->f_keys[p_rule->n_foreign].column=foreign_columns[i];
+ foreign_key_size=foreign_key_size-strlen(foreign_source_prefix);
+ p_foreign+=strlen(foreign_source_prefix);
+ if(0!=strncmp(p_foreign, foreign_key_prefix, strlen(foreign_key_prefix)))
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
+ "%s, %d foreign key prefix %s is not recommended.",
+ p_rule->table_name, p_rule->rule_id, p_foreign);
+ }
+ p_rule->f_keys[p_rule->n_foreign].key=ALLOC(char, foreign_key_size+1);
+ memcpy(p_rule->f_keys[p_rule->n_foreign].key, p_foreign, foreign_key_size);
+ p_rule->f_keys[p_rule->n_foreign].filename=get_foreign_cont_filename(p_rule->table_name, p_rule->rule_id, p_rule->f_keys[p_rule->n_foreign].key, dir);
+ p_rule->n_foreign++;
+ }
+ if(p_rule->n_foreign==0)
+ {
+ free(p_rule->f_keys);
+ p_rule->f_keys=NULL;
+ }
+ return;
+}
+int get_foreign_keys_define(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, _Maat_feather_t* feather, const char* dir,void *logger)
+{
+ int i=0;
+ int rule_with_foreign_key=0;
+ struct Maat_table_schema* p_table=NULL;
+ struct plugin_table_schema* plugin_desc=NULL;
+ for(i=0; i<rule_num; i++)
+ {
+ if(rule_list[i].table_line==NULL)
+ {
+ continue;
+ }
+ p_table=Maat_table_get_desc_by_name(feather->table_mgr, rule_list[i].table_name);
+ if(!p_table||p_table->table_type!=TABLE_TYPE_PLUGIN)
+ {
+ continue;
+ }
+ plugin_desc= &(p_table->plugin);
+ if(plugin_desc->n_foreign==0)
+ {
+ continue;
+ }
+ _get_foregin_keys(rule_list+i, plugin_desc->foreign_columns, plugin_desc->n_foreign, dir, logger);
+ rule_with_foreign_key++;
+ }
+ return rule_with_foreign_key;
+}
+int get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, const char* dir,void *logger)
+{
+ int i=0, j=0, foreign_key_size=0;
+ int rule_with_foreign_key=0;
+ const char* p_foreign=NULL;
+
+ int n_foreign=0;
+ int foreign_columns[MAX_FOREIGN_CLMN_NUM];
+ for(i=0; i<rule_num; i++)
+ {
+ j=1;
+ n_foreign=0;
+ do{
+ p_foreign=find_Nth_column(rule_list[i].table_line, j, &foreign_key_size);
+ if(p_foreign!=NULL&&foreign_key_size>(int)strlen(foreign_source_prefix)&&0==strncmp(p_foreign,foreign_source_prefix, strlen(foreign_source_prefix)))
+ {
+ foreign_columns[n_foreign]=j;
+ n_foreign++;
+ }
+ j++;
+ }while(p_foreign!=NULL&&n_foreign<MAX_FOREIGN_CLMN_NUM);
+ if(n_foreign>0)
+ {
+ _get_foregin_keys(rule_list+i, foreign_columns, n_foreign,dir,logger);
+ rule_with_foreign_key++;
+ }
+ }
+ return rule_with_foreign_key;
+}
+
+struct foreign_conts_track
+{
+ int rule_idx;
+ int foreign_idx;
+};
+void _get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger)
+{
+ int i=0, j=0;
+ UNUSED int ret=0;
+ int key_num=0;
+ struct foreign_conts_track* track=ALLOC(struct foreign_conts_track, rule_num*MAX_FOREIGN_CLMN_NUM);
+ char redis_cmd[256];
+ redisReply* reply=NULL;
+ struct serial_rule_t*p=NULL;
+ FILE* fp=NULL;
+ struct stat file_info;
+
+ for(i=0;i<rule_num;i++)
+ {
+ p=rule_list+i;
+ if(p->n_foreign==0)
+ {
+ continue;
+ }
+ if(p->op==MAAT_OP_DEL)
+ {
+ for(j=0; j<rule_list[i].n_foreign; j++)
+ {
+ if(rule_list[i].f_keys[j].filename==NULL)
+ {
+ continue;
+ }
+ //ret=system_cmd_rm(rule_list[i].f_keys[j].filename);
+ ret=remove(rule_list[i].f_keys[j].filename);
+ if(ret==-1)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_module,
+ "Foreign content file %s remove failed.",
+ rule_list[i].f_keys[j].filename);
+ }
+ }
+ }
+ else
+ {
+ for(j=0; j<p->n_foreign; j++)
+ {
+ if(rule_list[i].f_keys[j].filename==NULL)
+ {
+ continue;
+ }
+ ret=stat(p->f_keys[j].filename, &file_info);
+ if(ret==0)
+ {
+ continue;
+ }
+ snprintf(redis_cmd,sizeof(redis_cmd),"GET %s", p->f_keys[j].key);
+ ret=redisAppendCommand(ctx, redis_cmd);
+ track[key_num].rule_idx=i;
+ track[key_num].foreign_idx=j;
+ key_num++;
+ assert(ret==REDIS_OK);
+ }
+ }
+ }
+ for(i=0;i<key_num;i++)
+ {
+ ret=_wrap_redisGetReply(ctx,&reply);
+ if(ret==REDIS_ERR)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor
+ ,"Get %s,%d foreign key %s content failed, redis server error."
+ ,rule_list[track[i].rule_idx].table_name
+ ,rule_list[track[i].rule_idx].rule_id
+ ,rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
+ break;
+ }
+
+ if(reply->type!=REDIS_REPLY_STRING)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor
+ ,"Get %s,%d foreign key %s content failed."
+ ,rule_list[track[i].rule_idx].table_name
+ ,rule_list[track[i].rule_idx].rule_id
+ ,rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
+ continue;
+ }
+ else
+ {
+ p=rule_list+track[i].rule_idx;
+ fp=fopen(p->f_keys[track[i].foreign_idx].filename, "w");
+ if(fp==NULL)
+ {
+ MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor
+ , "Write foreign content failed: fopen %s error."
+ , p->f_keys[track[i].foreign_idx]);
+ }
+ else
+ {
+ fwrite(reply->str, 1, reply->len, fp);
+ fclose(fp);
+ fp=NULL;
+ if(print_fn==1)
+ {
+ printf("Written foreign content %s\n",p->f_keys[track[i].foreign_idx].filename);
+ }
+ }
+ }
+ freeReplyObject(reply);
+ reply=NULL;
+ }
+
+ free(track);
+ return;
+}
+void get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger)
+{
+ int max_redis_batch=4*1024,batch_cnt=0;
+ int success_cnt=0;
+ while(success_cnt<rule_num)
+ {
+ batch_cnt=MIN(rule_num-success_cnt,max_redis_batch);
+ _get_foreign_conts(ctx,rule_list+success_cnt,batch_cnt,print_fn,logger);
+ success_cnt+=batch_cnt;
+ }
+ return;
+}
+
+void redis_monitor_traverse(long long version, struct source_redis_ctx* mr_ctx,
+ void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
+ int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
+ void (*finish)(void*),//u_para
+ void* u_para,
+ const char* dec_key,
+ _Maat_feather_t* feather)
+{
+ int table_id=0, i=0, rule_num=0, empty_value_num=0, valid_column=-1;
+ int ret=0;
+ struct serial_rule_t* rule_list=NULL;
+ int update_type=CM_UPDATE_TYPE_INC;
+ long long new_version=0;
+ enum MAAT_TABLE_TYPE table_type;
+ enum MAAT_SCAN_TYPE scan_type;
+ struct Maat_table_schema* table_schema=NULL;
+ void* logger=feather->logger;
+
+ if(mr_ctx->write_ctx!=NULL&&mr_ctx->write_ctx->err==0)//authorized to write
+ {
+ //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
+ if(1==redlock_try_lock(mr_ctx->read_ctx, mr_expire_lock, mr_expire_lock_timeout))
+ {
+ check_maat_expiration(mr_ctx->read_ctx, logger);
+ cleanup_update_status(mr_ctx->read_ctx, logger);
+ redlock_unlock(mr_ctx->read_ctx, mr_expire_lock);
+ }
+ }
+ if(mr_ctx->read_ctx==NULL||mr_ctx->read_ctx->err)
+ {
+ if(time(NULL)-mr_ctx->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL)
+ {
+ return;
+ }
+ mr_ctx->last_reconnect_time=time(NULL);
+ if(mr_ctx->read_ctx!=NULL)
+ {
+ redisFree(mr_ctx->read_ctx);
+ }
+ MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, "Reconnecting...");
+ mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, feather->logger);
+ if(mr_ctx->read_ctx==NULL)
+ {
+ return;
+ }
+ else
+ {
+ version=0;//Trigger full update when reconnect to redis.
+ }
+ }
+
+ rule_num=get_rm_key_list(mr_ctx->read_ctx, version, feather->load_version_from, &new_version, feather->table_mgr, &rule_list, &update_type, logger, feather->cumulative_update_off);
+ if(rule_num<0)//redis communication error
+ {
+ return;
+ }
+ feather->load_version_from=0;//only valid for one time.
+ if(rule_num==0&&update_type==CM_UPDATE_TYPE_INC)//error or nothing changed
+ {
+ return;
+ }
+ if(rule_num>0)
+ {
+ ret=get_maat_redis_value(mr_ctx->read_ctx, rule_list, rule_num, logger, 0);
+ if(ret<0)
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update.");
+ goto clean_up;
+ }
+ for(i=0;i<rule_num;i++)
+ {
+ if(rule_list[i].table_line==NULL)
+ {
+ empty_value_num++;
+ }
+ }
+ if(empty_value_num==rule_num)
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"All %d rules are empty, abandon update.",empty_value_num);
+ goto clean_up;
+ }
+ if(empty_value_num>0)
+ {
+
+ MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num);
+ }
+ ret=get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num, feather, feather->foreign_cont_dir, logger);
+ if(ret>0)
+ {
+ get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, logger);
+ }
+ }
+ start(new_version,update_type,u_para);
+ MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld -> %lld (%d entries).",
+ update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version,rule_num);
+ for(i=0;i<rule_num;i++)
+ {
+ if(rule_list[i].table_line==NULL||rule_list[i].with_error==1)
+ {
+ continue;
+ }
+ table_id=Maat_table_get_id_by_name(feather->table_mgr, rule_list[i].table_name);
+ if(table_id<0)//Unrecognized table.
+ {
+ continue;
+ }
+ table_type=Maat_table_get_type_by_id(feather->table_mgr, table_id);
+ if(rule_list[i].op==MAAT_OP_DEL)
+ {
+
+ scan_type=Maat_table_get_scan_type(table_type);
+ table_schema=Maat_table_get_scan_by_id(feather->table_mgr, table_id, scan_type, NULL);
+ valid_column=Maat_table_xx_plugin_table_get_valid_flag_column(table_schema);
+ ret=invalidate_line(rule_list[i].table_line, table_type, valid_column);
+ if(ret<0)
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Invalidate line failed, invaid format %s ."
+ ,rule_list[i].table_line);
+ continue;
+ }
+ }
+ if(rule_list[i].n_foreign>0)
+ {
+ rewrite_table_line_with_foreign(rule_list+i);
+ }
+ update(rule_list[i].table_name,rule_list[i].table_line,u_para);
+ }
+ finish(u_para);
+
+clean_up:
+ for(i=0;i<rule_num;i++)
+ {
+ empty_serial_rules(rule_list+i);
+ }
+ free(rule_list);
+ rule_list=NULL;
+ return;
+}
+
+void _maat_copy_region(struct Maat_region_t* dst,const struct Maat_region_t* src)
+{
+ memcpy(dst,src,sizeof(struct Maat_region_t));
+ if(src->table_name!=NULL)
+ {
+ dst->table_name=_maat_strdup(src->table_name);
+ }
+ switch(dst->region_type)
+ {
+ case REGION_IP:
+ dst->ip_rule.src_ip=_maat_strdup(src->ip_rule.src_ip);
+ dst->ip_rule.mask_src_ip=_maat_strdup(src->ip_rule.mask_src_ip);
+ dst->ip_rule.dst_ip=_maat_strdup(src->ip_rule.dst_ip);
+ dst->ip_rule.mask_dst_ip=_maat_strdup(src->ip_rule.mask_dst_ip);
+ break;
+ case REGION_EXPR:
+ dst->expr_rule.keywords=_maat_strdup(src->expr_rule.keywords);
+ dst->expr_rule.district=_maat_strdup(src->expr_rule.district);
+ break;
+ case REGION_INTERVAL:
+ break;
+ case REGION_DIGEST:
+ dst->digest_rule.digest_string=_maat_strdup(src->digest_rule.digest_string);
+ break;
+ case REGION_SIMILARITY:
+ dst->similarity_rule.target=_maat_strdup(src->similarity_rule.target);
+ break;
+ default:
+ assert(0);
+ }
+ return;
+}
+void _maat_empty_region(struct Maat_region_t* p)
+{
+ free((char*)p->table_name);
+ p->table_name=NULL;
+ switch(p->region_type)
+ {
+ case REGION_IP:
+ free((char*)p->ip_rule.src_ip);
+ free((char*)p->ip_rule.mask_src_ip);
+ free((char*)p->ip_rule.dst_ip);
+ free((char*)p->ip_rule.mask_dst_ip);
+ break;
+ case REGION_EXPR:
+ free((char*)p->expr_rule.keywords);
+ free((char*)p->expr_rule.district);
+ break;
+ case REGION_INTERVAL:
+ break;
+ case REGION_DIGEST:
+ free((char*)p->digest_rule.digest_string);
+ break;
+ case REGION_SIMILARITY:
+ free((char*)p->similarity_rule.target);
+ break;
+ default:
+ assert(0);
+ }
+ memset(p,0,sizeof(struct Maat_region_t));
+ return;
+
+}
+int Maat_command_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, size_t n_line ,enum MAAT_OPERATION op)
+{
+ size_t i=0;
+ _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
+ int ret=0, table_id=0,success_cnt=0;
+ struct serial_rule_t *s_rule=NULL;
+ long long server_time=0,absolute_expire_time=0;
+ redisContext* write_ctx=get_redis_ctx_for_write(_feather);
+ if(write_ctx==NULL)
+ {
+ return -1;
+ }
+ server_time=redis_server_time(write_ctx);
+ if(!server_time)
+ {
+ return -1;
+ }
+ s_rule=ALLOC(struct serial_rule_t, n_line);
+ for(i=0;i<n_line;i++)
+ {
+ table_id=Maat_table_get_id_by_name(_feather->table_mgr, line_rule[i]->table_name);
+ if(table_id<0)
+ {
+ MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command,
+ "Command raw set line id %d failed: unknown table %s.",
+ line_rule[i]->rule_id,
+ line_rule[i]->table_name);
+ ret=-1;
+ goto error_out;
+ }
+ if(op==MAAT_OP_RENEW_TIMEOUT)
+ {
+ assert(line_rule[i]->expire_after>0);
+ }
+ if(line_rule[i]->expire_after>0)
+ {
+ absolute_expire_time=server_time+line_rule[i]->expire_after;
+ }
+ set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name,
+ line_rule[i]->table_line, absolute_expire_time);
+ }
+ success_cnt=exec_serial_rule(write_ctx,s_rule, n_line,server_time,_feather->logger);
+ if(success_cnt<0||(size_t)success_cnt!=n_line)//error
+ {
+ ret=-1;
+ goto error_out;
+ }
+ ret=success_cnt;
+ _feather->line_cmd_acc_num+=success_cnt;
+
+error_out:
+ for(i=0;i<n_line;i++)
+ {
+ empty_serial_rules(s_rule+i);
+ }
+ free(s_rule);
+ return ret;
+
+}
+int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, int line_num ,enum MAAT_OPERATION op)
+{
+ int i=0, j=0;
+ _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
+ int ret=0, table_id=0,success_cnt=0;
+ struct serial_rule_t *s_rule=NULL;
+ struct Maat_table_schema* p_table=NULL;
+ struct plugin_table_schema* plugin_desc=NULL;
+ long long server_time=0,absolute_expire_time=0;
+ const char* p_foreign=NULL;
+ int foreign_key_size=0;
+ redisContext* write_ctx=get_redis_ctx_for_write(_feather);
+ if(write_ctx==NULL)
+ {
+ return -1;
+ }
+ server_time=redis_server_time(write_ctx);
+ if(!server_time)
+ {
+ return -1;
+ }
+ s_rule=ALLOC(struct serial_rule_t, line_num);
+ for(i=0;i<line_num;i++)
+ {
+ table_id=Maat_table_get_id_by_name(_feather->table_mgr, line_rule[i]->table_name);
+ if(table_id<0)
+ {
+ MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
+ ,"Command set line id %d failed: unknown table %s."
+ , line_rule[i]->rule_id
+ , line_rule[i]->table_name);
+ ret=-1;
+ goto error_out;
+ }
+ p_table=Maat_table_get_by_id_raw(_feather->table_mgr, table_id);
+ if(!p_table)
+ {
+ ret=-1;
+ goto error_out;
+ }
+ int valid_flag_column=0;
+
+ valid_flag_column=Maat_table_xx_plugin_table_get_valid_flag_column(p_table);
+ if(valid_flag_column<0)
+ {
+ MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
+ ,"Command set line id %d failed: table %s is not a plugin or ip_plugin table."
+ , line_rule[i]->rule_id
+ , line_rule[i]->table_name);
+ ret=-1;
+ goto error_out;
+
+ }
+
+ if(op==MAAT_OP_ADD)
+ {
+ ret=get_valid_flag_offset(line_rule[i]->table_line
+ , p_table->table_type
+ , valid_flag_column);
+ if(ret<0||
+ (op==MAAT_OP_ADD&&line_rule[i]->table_line[ret]!='1'))
+ {
+ MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
+ ,"Command set line %s %d failed: illegal valid flag."
+ , line_rule[i]->table_name, line_rule[i]->rule_id);
+ ret=-1;
+ goto error_out;
+ }
+ }
+ if(op==MAAT_OP_RENEW_TIMEOUT)
+ {
+ assert(line_rule[i]->expire_after>0);
+ }
+ if(line_rule[i]->expire_after>0)
+ {
+ absolute_expire_time=server_time+line_rule[i]->expire_after;
+ }
+ if(plugin_desc && plugin_desc->n_foreign>0)
+ {
+ for(j=0;j<plugin_desc->n_foreign;j++)
+ {
+ p_foreign=find_Nth_column(line_rule[i]->table_line, plugin_desc->foreign_columns[j], &foreign_key_size);
+ if(p_foreign==NULL)
+ {
+ MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL, maat_command
+ , "Command set line %s %d failed: No %dth column."
+ , line_rule[i]->table_name, line_rule[i]->rule_id
+ , plugin_desc->foreign_columns[j]);
+ ret=-1;
+ goto error_out;
+ }
+ if(0!=strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix)))
+ {
+ MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_redis_monitor
+ ,"Command set line %s %d failed: Source prefix %s is mandatory."
+ , line_rule[i]->table_name, line_rule[i]->rule_id, foreign_source_prefix);
+ ret=-1;
+ goto error_out;
+ }
+ }
+
+ }
+ set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name,
+ line_rule[i]->table_line, absolute_expire_time);
+ }
+ success_cnt=exec_serial_rule(write_ctx,s_rule, line_num,server_time,_feather->logger);
+ if(success_cnt<0||success_cnt!=line_num)//error
+ {
+ ret=-1;
+ goto error_out;
+ }
+ ret=success_cnt;
+ _feather->line_cmd_acc_num+=success_cnt;
+
+error_out:
+ for(i=0;i<line_num;i++)
+ {
+ empty_serial_rules(s_rule+i);
+ }
+ free(s_rule);
+ return ret;
+}
+
+int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_cmd_line* line_rule, enum MAAT_OPERATION op)
+{
+ int ret=0;
+ ret=Maat_cmd_set_lines(feather,&line_rule, 1, op);
+ return ret;
+}
+int Maat_cmd_set_file(Maat_feather_t feather,const char* key, const char* value, size_t size, enum MAAT_OPERATION op)
+{
+ struct _Maat_feather_t* _feather=(struct _Maat_feather_t*)feather;
+ redisContext* ctx=_feather->mr_ctx.write_ctx;
+ if(ctx==NULL)
+ {
+ MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, "%s failed: Redis is not connected.", __FUNCTION__);
+ return -1;
+ }
+ const char *arg_vec[3];
+ size_t len_vec[3];
+ arg_vec[0] = "SET";
+ len_vec[0] = strlen("SET");
+
+ arg_vec[1] = key;
+ len_vec[1] = strlen(key);
+
+ arg_vec[2] = value;
+ len_vec[2] = size;
+
+ redisReply *reply=NULL;
+ if(0!=strncmp(key, foreign_key_prefix, strlen(foreign_key_prefix)))
+ {
+ MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, "Invalid File key, prefix %s is mandatory.", foreign_key_prefix);
+ return -1;
+ }
+ switch(op)
+ {
+ case MAAT_OP_ADD:
+ reply= (redisReply *)redisCommandArgv(ctx, sizeof(arg_vec) / sizeof(arg_vec[0]), arg_vec, len_vec);
+ break;
+ case MAAT_OP_DEL:
+ reply=_wrap_redisCommand(ctx,"EXPIRE %s %d", key, MAAT_REDIS_SYNC_TIME);
+ break;
+ default:
+ return -1;
+ break;
+ }
+ if(reply==NULL||reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR)
+ {
+ MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command,"Set file failed, maybe Redis is busy.");
+ freeReplyObject(reply);
+ reply=NULL;
+ return -1;
+ }
+ freeReplyObject(reply);
+ reply=NULL;
+ return 1;
+}
+
+long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment)
+{
+ _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
+ redisReply* data_reply=NULL;
+ long long result=0;
+ redisContext* write_ctx=get_redis_ctx_for_write(_feather);
+
+ if(write_ctx==NULL)
+ {
+ return -1;
+ }
+ data_reply=_wrap_redisCommand(write_ctx,"INCRBY %s %d", key, increment);
+ if(data_reply->type==REDIS_REPLY_INTEGER)
+ {
+ result=data_reply->integer;
+ }
+ else
+ {
+ result=-1;
+ }
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ return result;
+}
+int Maat_command_get_new_group_id(Maat_feather_t feather)
+{
+ int group_id=0;
+ group_id=(int) Maat_cmd_incrby(feather, mr_group_id_var, 1);
+ return group_id;
+}
+int Maat_command_get_new_region_id(Maat_feather_t feather)
+{
+ int region_id=0;
+ region_id=(int) Maat_cmd_incrby(feather, mr_region_id_var, 1);
+ return region_id;
+}
+
+void Maat_cmd_key_free(struct Maat_cmd_key**keys, int size)
+{
+ int i=0;
+ struct Maat_cmd_key* p=*keys;
+ for(i=0; i<size; i++, p++)
+ {
+ free(p->table_name);
+ p->table_name=NULL;
+ p->rule_id=0;
+ }
+ free(*keys);
+ *keys=NULL;
+ return;
+}
+
+int Maat_cmd_key_select(Maat_feather_t feather, int label_id, struct Maat_cmd_key** keys)
+{
+ _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
+ redisReply* data_reply=NULL;
+ char* tmp=NULL;
+ unsigned int i=0;
+ struct Maat_cmd_key* result=NULL;
+ int result_cnt=0;
+ redisContext* write_ctx=get_redis_ctx_for_write(_feather);
+ if(write_ctx==NULL)
+ {
+ return -1;
+ }
+
+ data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d",
+ mr_label_sset,
+ label_id,
+ label_id);
+ result_cnt=data_reply->elements;
+ result=ALLOC(struct Maat_cmd_key, data_reply->elements);
+ for(i=0;i<data_reply->elements;i++)
+ {
+ result[i].table_name=_maat_strdup(data_reply->element[i]->str);
+ tmp=strchr(result[i].table_name, ',');
+ if(tmp!=NULL)
+ {
+ *tmp='\0';
+ tmp++;
+ result[i].rule_id=atoi(tmp);
+ }
+ else// old version compatible
+ {
+ result[i].rule_id=atoi(result[i].table_name);
+ free(result[i].table_name);
+ result[i].table_name=NULL;
+ }
+ }
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+
+ *keys=result;
+ return result_cnt;
+}
+
+int redis_flush_DB(redisContext* ctx, int db_index, void* logger)
+{
+ redisReply* data_reply=NULL;
+ long long maat_redis_version=0, dbsize=0;
+ int append_cmd_cnt=0, i=0,ret=0;
+ int redis_transaction_success=1;
+
+ data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION");
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION");
+ if(data_reply->type==REDIS_REPLY_NIL)
+ {
+ maat_redis_version=0;
+ }
+ else
+ {
+ maat_redis_version=read_redis_integer(data_reply);
+ maat_redis_version++;
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ }
+ data_reply=_wrap_redisCommand(ctx, "DBSIZE");
+ dbsize=read_redis_integer(data_reply);
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+
+ data_reply=_wrap_redisCommand(ctx,"MULTI");
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+
+ redisAppendCommand(ctx,"FLUSHDB");
+ append_cmd_cnt++;
+ redisAppendCommand(ctx,"SET MAAT_VERSION %lld",maat_redis_version);
+ append_cmd_cnt++;
+ redisAppendCommand(ctx,"SET MAAT_PRE_VER %lld",maat_redis_version);
+ append_cmd_cnt++;
+ redisAppendCommand(ctx,"SET %s 1", mr_region_id_var);
+ append_cmd_cnt++;
+ redisAppendCommand(ctx,"SET %s 1", mr_group_id_var);
+ append_cmd_cnt++;
+ redisAppendCommand(ctx,"EXEC");
+ append_cmd_cnt++;
+ for(i=0;i<append_cmd_cnt;i++)
+ {
+ ret=_wrap_redisGetReply(ctx, &data_reply);
+ if(ret==REDIS_OK)
+ {
+ if(0==mr_transaction_success(data_reply))
+ {
+ redis_transaction_success=0;
+ }
+ freeReplyObject(data_reply);
+ data_reply=NULL;
+ }
+ }
+ if(redis_transaction_success==1)
+ {
+ MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_command
+ ,"FlushDB %d, MAAT_VERSION=%llu, DBSize=%llu."
+ ,db_index, (maat_redis_version==0)?0:(maat_redis_version-1),dbsize
+ );
+ }
+ return redis_transaction_success;
+}
+TAILQ_HEAD(serial_rule_q, serial_rule_t);
+
+struct Maat_command_batch
+{
+ int batch_size;
+ serial_rule_q queue;
+ struct _Maat_feather_t * feather;
+ long long server_time;
+};
+struct Maat_command_batch* Maat_command_batch_new(Maat_feather_t feather)
+{
+ struct Maat_command_batch* batch=ALLOC(struct Maat_command_batch, 1);
+ TAILQ_INIT(&batch->queue);
+ batch->feather=(struct _Maat_feather_t *)feather;
+ redisContext* write_ctx=get_redis_ctx_for_write(batch->feather);
+ if(write_ctx==NULL)
+ {
+ free(batch);
+ return NULL;
+ }
+ batch->server_time=redis_server_time(write_ctx);
+ if(!batch->server_time)
+ {
+ free(batch);
+ return NULL;
+ }
+ return batch;
+}
+
+int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id)
+{
+ struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
+ long long absolute_expire_time=0;
+ char line[MAX_TABLE_LINE_SIZE];
+
+ serialize_region(region, group_id, line, sizeof(line));
+
+ set_serial_rule(s_rule, op, region->region_id, 0, region->table_name,
+ line, absolute_expire_time);
+ TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
+ batch->batch_size++;
+ return 0;
+
+}
+#define TO_GROUP2X_KEY(group_id, parent_id) ((unsigned long)group_id<<32|parent_id)
+
+int Maat_command_batch_set_group2group(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g)
+{
+ struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
+ long long absolute_expire_time=0;
+ char line[MAX_TABLE_LINE_SIZE];
+
+ serialize_group2group(op, g2g, line, sizeof(line));
+
+ set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2g->group_id, g2g->superior_group_id), 0, g2g->table_name,
+ line, absolute_expire_time);
+
+ TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
+ batch->batch_size++;
+ return 0;
+}
+int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c)
+{
+ struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
+ long long absolute_expire_time=0;
+ char line[MAX_TABLE_LINE_SIZE];
+
+ serialize_group2compile(op, g2c, line, sizeof(line));
+ set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2c->group_id, g2c->compile_id), 0, g2c->table_name,
+ line, absolute_expire_time);
+ TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
+ batch->batch_size++;
+ return 0;
+}
+int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after)
+{
+
+ struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
+ long long absolute_expire_time=0;
+ char line[MAX_TABLE_LINE_SIZE];
+ serialize_compile(compile, huge_service_defined, clause_num, op, line, sizeof(line));
+
+ if(expire_after>0)
+ {
+ absolute_expire_time=batch->server_time+expire_after;
+ }
+ set_serial_rule(s_rule, op, compile->config_id, label_id, table_name,
+ line, absolute_expire_time);
+
+ TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
+ batch->batch_size++;
+ return 0;
+}
+int Maat_command_batch_commit(struct Maat_command_batch* batch)
+{
+ struct serial_rule_t* s_rule_array=ALLOC(struct serial_rule_t, batch->batch_size);
+ int i=0;
+ redisContext* write_ctx=get_redis_ctx_for_write(batch->feather);
+ struct serial_rule_t * tmp = TAILQ_FIRST(&batch->queue);
+
+ while(tmp != NULL)
+ {
+ TAILQ_REMOVE(&batch->queue, tmp, entries);
+ memcpy(s_rule_array+i, tmp, sizeof(*tmp));
+ free(tmp);
+ tmp = TAILQ_FIRST(&batch->queue);
+ i++;
+ }
+ assert(i==batch->batch_size);
+ exec_serial_rule(write_ctx, s_rule_array, batch->batch_size, batch->server_time, batch->feather->logger);
+ for(i=0; i<batch->batch_size; i++)
+ {
+ empty_serial_rules(s_rule_array+i);
+ }
+ free(s_rule_array);
+ free(batch);
+ return i;
+}
+
+int Maat_command_raw_set_compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after)
+{
+ struct Maat_command_batch* batch=NULL;
+ batch=Maat_command_batch_new(feather);
+ Maat_command_batch_set_compile(batch, op, compile, table_name, huge_service_defined, clause_num, label_id, expire_after);
+ Maat_command_batch_commit(batch);
+ return 0;
+}
+int Maat_command_raw_set_region(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id)
+{
+ struct Maat_command_batch* batch=NULL;
+ batch=Maat_command_batch_new(feather);
+ Maat_command_batch_set_region(batch, op, region, group_id);
+ Maat_command_batch_commit(batch);
+ return 0;
+}
+int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c)
+{
+ struct Maat_command_batch* batch=NULL;
+ batch=Maat_command_batch_new(feather);
+ Maat_command_batch_set_group2compile(batch, op, g2c);
+ Maat_command_batch_commit(batch);
+ return 0;
+}
+int Maat_command_raw_set_group2group(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g)
+{
+
+ struct Maat_command_batch* batch=NULL;
+ batch=Maat_command_batch_new(feather);
+ Maat_command_batch_set_group2group(batch, op, g2g);
+ Maat_command_batch_commit(batch);
+ return 0;
+}
+
+int Maat_cmd_flushDB(Maat_feather_t feather)
+{
+ _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
+ int ret=0;
+ redisContext* write_ctx=get_redis_ctx_for_write(_feather);
+ if(write_ctx==NULL)
+ {
+ return -1;
+ }
+ do
+ {
+ ret=redis_flush_DB(_feather->mr_ctx.write_ctx, _feather->mr_ctx.redis_db, _feather->logger);
+ }while(ret==0);
+ return 0;
+}
+
diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp
index 29573c6..05e67d9 100644
--- a/src/entry/Maat_rule.cpp
+++ b/src/entry/Maat_rule.cpp
@@ -57,7 +57,7 @@ extern "C"
}
#endif
-int MAAT_FRAME_VERSION_3_6_4_20220423=1;
+int MAAT_FRAME_VERSION_3_6_5_20220426=1;
int is_valid_table_name(const char* str)
{
diff --git a/src/entry/Maat_table_runtime.cpp b/src/entry/Maat_table_runtime.cpp
index a87148b..657a886 100644
--- a/src/entry/Maat_table_runtime.cpp
+++ b/src/entry/Maat_table_runtime.cpp
@@ -58,6 +58,11 @@ struct ip_rule* ip_plugin_row2ip_rule(const struct ip_plugin_table_schema* sche
range_rule->user_tag=NULL;
return range_rule;
}
+void ip_rule_free(struct ip_rule* p)
+{
+ free(p);
+ return;
+}
struct Maat_table_runtime_manager
{
struct Maat_table_runtime** table_rt;
@@ -452,7 +457,12 @@ void Maat_table_runtime_fqdn_plugin_new_row(struct Maat_table_runtime* table_rt,
if(atoi(row+is_valid_offset)==1)//add
{
fqdn_rule=fqdn_rule_new((unsigned int)atoi(row+row_id_offset), row+fqdn_offset, fqdn_len, atoi(row+is_suffix_flag_offset));
- EX_data_rt_row2EX_data(fqdn_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, fqdn_rule, logger);
+ ret=EX_data_rt_row2EX_data(fqdn_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, fqdn_rule, logger);
+ if(ret<0)
+ {
+ fqdn_rule_free(fqdn_rule);
+ fqdn_rule=NULL;
+ }
}
else
{
@@ -690,13 +700,19 @@ void Maat_table_runtime_ip_plugin_new_row(struct Maat_table_runtime* table_rt, s
}
if(atoi(row+is_valid_offset)==1)//add
{
- EX_data_rt_row2EX_data(ip_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, ip_rule, logger);
+ ret=EX_data_rt_row2EX_data(ip_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, ip_rule, logger);
+ if(ret<0)
+ {
+ ip_rule_free(ip_rule);
+ ip_rule=NULL;
+ }
}
else
{
- EX_data_rt_delete_by_row(ip_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, logger);
- free(ip_rule);
+ ret=EX_data_rt_delete_by_row(ip_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, logger);
+ ip_rule_free(ip_rule);
+ ip_rule=NULL;
}
table_rt->origin_rule_num=EX_data_rt_get_ex_container_count(ip_plugin_rt->ex_data_rt);
}