summaryrefslogtreecommitdiff
path: root/src/mrl_redis.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/mrl_redis.c')
-rw-r--r--src/mrl_redis.c74
1 files changed, 54 insertions, 20 deletions
diff --git a/src/mrl_redis.c b/src/mrl_redis.c
index 113ba68..966c94a 100644
--- a/src/mrl_redis.c
+++ b/src/mrl_redis.c
@@ -24,10 +24,12 @@ void ht_nominee_free_cb(void * data)
struct mrl_nominee_item *nominee_item = (struct mrl_nominee_item *)data;
if(nominee_item != NULL)
{
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"ht_nominee_free_cb","the nominee item %s is free.",nominee_item->ip_addr);
free(nominee_item);
nominee_item = NULL;
global_stat.free_memory += sizeof(struct mrl_nominee_item);
}
+
}
MESA_htable_handle mrl_htable_init(void * fn_data_free_cb)
@@ -112,12 +114,28 @@ int Maat_plugin_table(Maat_feather_t feather,const char* table_name, Maat_start_
}
-Maat_feather_t mrl_Maat_feather_init(const char * instance_name,const char *table_name, Maat_start_callback_t *start,Maat_update_callback_t *update,Maat_finish_callback_t *finish,void *u_para)
+Maat_feather_t mrl_Maat_feather_init()
{
- printf("mrl maat feather init\n");
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"mrl_Maat_feather_init","start init Maat feather!");
Maat_feather_t feather;
- feather = Maat_init(instance_name,mrl_instance.mrl_cfg.Maat_redis_ip,mrl_instance.mrl_cfg.Maat_redis_port,mrl_instance.mrl_cfg.Maat_stat_path);
- Maat_plugin_table(feather,table_name,start,update,finish,u_para);
+ const char *nominee_instance = "mrl_nominee";
+ const char *candidate_instance = "mrl_candidate";
+ // init Maat
+ feather = Maat_feather(mrl_instance.mrl_cfg.Maat_max_threads, mrl_instance.mrl_cfg.Maat_table_path, mrl_instance.mrl_log_handle);
+ wrapped_Maat_set_feather_opt(feather, MAAT_OPT_INSTANCE_NAME, nominee_instance, strlen(nominee_instance)+1);
+ wrapped_Maat_set_feather_opt(feather, MAAT_OPT_INSTANCE_NAME, candidate_instance, strlen(candidate_instance)+1);
+ wrapped_Maat_set_feather_opt(feather, MAAT_OPT_REDIS_IP, mrl_instance.mrl_cfg.Maat_redis_ip, strlen(mrl_instance.mrl_cfg.Maat_redis_ip)+1);
+ wrapped_Maat_set_feather_opt(feather, MAAT_OPT_REDIS_PORT, &(mrl_instance.mrl_cfg.Maat_redis_port), sizeof(mrl_instance.mrl_cfg.Maat_redis_port));
+ wrapped_Maat_set_feather_opt(feather, MAAT_OPT_STAT_FILE_PATH, mrl_instance.mrl_cfg.Maat_stat_path, strlen(mrl_instance.mrl_cfg.Maat_stat_path)+1);
+ wrapped_Maat_set_feather_opt(feather, MAAT_OPT_STAT_ON, NULL, 0);
+ wrapped_Maat_set_feather_opt(feather, MAAT_OPT_PERF_ON, NULL, 0);
+ int ret = Maat_initiate_feather(feather);
+ if(ret< 0)
+ {
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_FATAL,"Maat_init","Maat_initiate_feather func error!");
+ assert(0);
+ }
+ Maat_plugin_table(feather,IR_NOMINEE_IP_TABLE_NAME,NULL,nominee_update_cb,NULL,NULL);
return feather;
}
@@ -149,7 +167,6 @@ void mrl_create_candidate_item(struct mrl_candidate_item *candidate_item, struct
void nominee_update_cb(int table_id,const char* table_line,void* u_para)
{
int ret = 0;
- uint32_t client_ip = 0;
struct mrl_nominee_item *nominee_item = (struct mrl_nominee_item *)calloc(1, sizeof(struct mrl_nominee_item));
global_stat.malloc_memory += sizeof(struct mrl_nominee_item);
sscanf(table_line,"%d\t%d\t%d\t%s\t%d\t%s",
@@ -159,35 +176,42 @@ void nominee_update_cb(int table_id,const char* table_line,void* u_para)
"config_id:%d,group_id:%d,addr_type:%d,ip_addr:%s,is_vaild:%d,op_time:%s",
nominee_item->config_id, nominee_item->group_id,nominee_item->addr_type,
nominee_item->ip_addr, nominee_item->is_vaild,nominee_item->op_time);
+ struct mrl_ht_nominee_key nominee_key;
+ memset(&nominee_key,0,sizeof(struct mrl_ht_nominee_key));
+ inet_pton(AF_INET,nominee_item->ip_addr,&(nominee_key.sip));
+ inet_pton(AF_INET,mrl_instance.mrl_cfg.dest_ip,&(nominee_key.dip));
+ nominee_key.sport=htons(mrl_instance.mrl_cfg.local_port);
+ nominee_key.dport=htons(mrl_instance.mrl_cfg.dest_port);
switch(nominee_item->is_vaild)
{
case 0:
- if(MESA_htable_search_cb(mrl_instance.ht_nominee, (const unsigned char *)nominee_item->ip_addr, MRL_STR_IP_LEN,NULL,NULL,NULL) != NULL)
+ if(MESA_htable_search_cb(mrl_instance.ht_nominee, (const unsigned char *)&nominee_key, sizeof(nominee_key),NULL,NULL,NULL) != NULL)
{
- ret = MESA_htable_del(mrl_instance.ht_nominee, (const unsigned char *)nominee_item->ip_addr, MRL_STR_IP_LEN, NULL);
+ ret = MESA_htable_del(mrl_instance.ht_nominee, (const unsigned char *)&nominee_key, sizeof(nominee_key), NULL);
if(ret < 0)
{
MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_FATAL,"nominee_update_cb","MESA_htable_del func error! ret is %d",ret);
assert(0);
}
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"nominee_update_cb","the nominee key[sip:%u, dip:%u, sport:%hu, dport:%hu] is deleted.",nominee_key.sip,nominee_key.dip,nominee_key.sport,nominee_key.dport);
}
else
{
- MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"nominee_update_cb","the nominee item %s is not exsit in nominee htable.",nominee_item->ip_addr);
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"nominee_update_cb","the nominee key[sip:%u, dip:%u, sport:%hu, dport:%hu] is not exsit in nominee htable.",nominee_key.sip,nominee_key.dip,nominee_key.sport,nominee_key.dport);
}
break;
case 1:
- if(MESA_htable_search_cb(mrl_instance.ht_nominee, (const unsigned char *)nominee_item->ip_addr, MRL_STR_IP_LEN,NULL,NULL,NULL) == NULL)
+ if(MESA_htable_search_cb(mrl_instance.ht_nominee, (const unsigned char *)&nominee_key, sizeof(nominee_key),NULL,NULL,NULL) == NULL)
{
- ret = MESA_htable_add(mrl_instance.ht_nominee, (const unsigned char *)nominee_item->ip_addr, MRL_STR_IP_LEN, nominee_item);
+ ret = MESA_htable_add(mrl_instance.ht_nominee, (const unsigned char *)&nominee_key, sizeof(nominee_key), nominee_item);
if(ret < 0)
{
MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_FATAL,"nominee_update_cb","MESA_htable_add func error! ret is %d",ret);
assert(0);
}
- inet_pton(AF_INET,nominee_item->ip_addr,&client_ip);
- ret = MESA_lqueue_join_tail(mrl_instance.mrl_queue,&client_ip,sizeof(client_ip));
- MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_FATAL,"nominee_update_cb","insert queue ip is %u",client_ip);
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"nominee_update_cb","insert the nominee key[sip:%u, dip:%u, sport:%hu, dport:%hu] into nominee htable.",nominee_key.sip,nominee_key.dip,nominee_key.sport,nominee_key.dport);
+ ret = MESA_lqueue_join_tail(mrl_instance.mrl_queue,&(nominee_key.sip),sizeof(nominee_key.sip));
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"nominee_update_cb","insert queue ip is %u",nominee_key.sip);
if(ret != 0)
{
MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_FATAL,"nominee_update_cb","MESA_lqueue_join_tail func error! ret is %d",ret);
@@ -196,7 +220,7 @@ void nominee_update_cb(int table_id,const char* table_line,void* u_para)
}
else
{
- MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"nominee_update_cb","the nominee item %s is duplicated in nominee htable.",nominee_item->ip_addr);
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"nominee_update_cb","the nominee key[sip:%u, dip:%u, sport:%hu, dport:%hu] is duplicated in nominee htable.",nominee_key.sip,nominee_key.dip,nominee_key.sport,nominee_key.dport);
}
break;
default:
@@ -298,7 +322,7 @@ unsigned int get_gdev_ip_index(UINT32 gdev_ip)
return index;
}
-void mrl_identify_nominee(struct streaminfo *mystream)
+bool mrl_identify_nominee(struct streaminfo *mystream)
{
char ip_addr[MRL_STR_IP_LEN];
memset(ip_addr,0,MRL_STR_IP_LEN);
@@ -308,17 +332,27 @@ void mrl_identify_nominee(struct streaminfo *mystream)
struct mrl_candidate_item candidate_item;
memset(&candidate_item ,0,sizeof(struct mrl_candidate_item));
inet_ntop(AF_INET, &(mystream->addr.tuple4_v4->saddr), ip_addr, MRL_STR_IP_LEN);
- MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"mrl_identify_nominee","cur identify nominee stream ip is %s\n",ip_addr);
- if(MESA_htable_search_cb(mrl_instance.ht_nominee, (const unsigned char *)ip_addr, MRL_STR_IP_LEN,ht_search_cb,(void *)&candidate_item,&ret) != NULL)
+ struct mrl_ht_nominee_key nominee_key;
+ memset(&nominee_key,0,sizeof(nominee_key));
+ nominee_key.sip = mystream->addr.tuple4_v4->saddr;
+ nominee_key.dip= mystream->addr.tuple4_v4->daddr;
+ nominee_key.sport= mystream->addr.tuple4_v4->source;
+ nominee_key.dport= mystream->addr.tuple4_v4->dest;
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"mrl_identify_nominee","cur stream nominee key is [sip:%u, dip:%u, sport:%hu, dport:%hu].",nominee_key.sip, nominee_key.dip, nominee_key.sport, nominee_key.dport);
+ if(MESA_htable_search_cb(mrl_instance.ht_nominee, (const unsigned char *)&nominee_key, sizeof(nominee_key),ht_search_cb,(void *)&candidate_item,&ret) != NULL)
{
global_stat.recv_detect_pkts ++;
+ MESA_handle_runtime_log(mrl_instance.mrl_log_handle, RLOG_LV_DEBUG,"mrl_identify_nominee","cur detected packet key is [sip:%u, dip:%u, sport:%hu, dport:%hu].",nominee_key.sip, nominee_key.dip, nominee_key.sport, nominee_key.dport);
get_rawpkt_opt_from_streaminfo(mystream, RAW_PKT_GET_GDEV_IP, &(gdev_ip));
- //printf("cur gdev ip is %d\n",gdev_ip);
index = get_gdev_ip_index(gdev_ip);
- //printf("cur gdev info index is %d\n",index);
assert(index < mrl_instance.mrl_cfg.vxlan_gdev_num);//������ھ�˵�������ݰ���GDEV��������̽���GDEV�У�����
mrl_get_vxlan_info(&(candidate_item.vxlan_info),index);
- Maat_set_cmd_line(mrl_instance.candidate_feather,&candidate_item);
+ Maat_set_cmd_line(mrl_instance.mrl_feather,&candidate_item);
+ return true;
+ }
+ else
+ {
+ return false;
}
}