summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorliuwentan <[email protected]>2024-02-21 19:02:13 +0800
committerliuwentan <[email protected]>2024-02-21 19:02:13 +0800
commit26d642bdcf32ed3bcbc25bf75a4a7e0b822df887 (patch)
treed4dc82ae8b303b0cc11298af7689eb6d81135d99 /src
parent7e159477ac6dd91a5fdbbc405d14b7157ae62d66 (diff)
[PATCH]delete useless cumulative logic
Diffstat (limited to 'src')
-rw-r--r--src/inc_internal/maat_redis_monitor.h2
-rw-r--r--src/inc_internal/maat_rule.h1
-rw-r--r--src/maat_redis_monitor.c326
3 files changed, 188 insertions, 141 deletions
diff --git a/src/inc_internal/maat_redis_monitor.h b/src/inc_internal/maat_redis_monitor.h
index 144024d..d1d0b99 100644
--- a/src/inc_internal/maat_redis_monitor.h
+++ b/src/inc_internal/maat_redis_monitor.h
@@ -81,7 +81,7 @@ void maat_rewrite_table_line_with_foreign(struct serial_rule *s_rule);
int maat_get_rm_key_list(redisContext *c, long long instance_version,
long long *new_version, struct table_manager *tbl_mgr,
struct serial_rule **list, int *update_type,
- int cumulative_off, struct log_handle *logger);
+ struct log_handle *logger);
void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
void (*start_fn)(long long, int, void *),
diff --git a/src/inc_internal/maat_rule.h b/src/inc_internal/maat_rule.h
index 71ce688..6f6e751 100644
--- a/src/inc_internal/maat_rule.h
+++ b/src/inc_internal/maat_rule.h
@@ -124,7 +124,6 @@ struct maat_options {
int deferred_load_on;
int maat_json_is_gzipped;
- int cumulative_update_off; //Default: cumulative update on
int gc_timeout_ms;
int rule_effect_interval_ms;
diff --git a/src/maat_redis_monitor.c b/src/maat_redis_monitor.c
index 590e096..77cb4d3 100644
--- a/src/maat_redis_monitor.c
+++ b/src/maat_redis_monitor.c
@@ -50,8 +50,9 @@ struct expected_reply {
redisReply possible_replies[REDIS_REPLY_SIZE];
};
-static char *get_foreign_cont_filename(const char *table_name, long long rule_id,
- const char *foreign_key, const char *dir)
+static char *
+get_foreign_cont_filename(const char *table_name, long long rule_id,
+ const char *foreign_key, const char *dir)
{
char buffer[512] = {0};
@@ -63,7 +64,8 @@ static char *get_foreign_cont_filename(const char *table_name, long long rule_id
return filename;
}
-static const char *maat_cmd_find_Nth_column(const char *line, int Nth, int *column_len)
+static const char *
+maat_cmd_find_Nth_column(const char *line, int Nth, int *column_len)
{
size_t i = 0;
int j = 0;
@@ -99,16 +101,17 @@ static const char *maat_cmd_find_Nth_column(const char *line, int Nth, int *colu
return line + start;
}
-static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
- int n_foreign, const char *dir, struct log_handle *logger)
+static void
+get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
+ int n_foreign, const char *dir, struct log_handle *logger)
{
int foreign_key_size = 0;
p_rule->f_keys = ALLOC(struct foreign_key, n_foreign);
for (int i = 0; i < n_foreign; i++) {
- const char *p_foreign = maat_cmd_find_Nth_column(p_rule->table_line,
- foreign_columns[i],
- &foreign_key_size);
+ const char *p_foreign =
+ maat_cmd_find_Nth_column(p_rule->table_line, foreign_columns[i],
+ &foreign_key_size);
if (NULL == p_foreign) {
log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Get %s,%lld foreign keys failed: No %dth column",
@@ -122,10 +125,12 @@ static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
continue;
}
- if (0 != strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) {
+ if (0 != strncmp(p_foreign, foreign_source_prefix,
+ strlen(foreign_source_prefix))) {
log_fatal(logger, MODULE_REDIS_MONITOR,
- "[%s:%d] Get %s,%lld foreign key failed: Invalid source prefix %s",
- __FUNCTION__, __LINE__, p_rule->table_name, p_rule->rule_id, p_foreign);
+ "[%s:%d] Get %s,%lld foreign key failed: "
+ "Invalid source prefix %s", __FUNCTION__, __LINE__,
+ p_rule->table_name, p_rule->rule_id, p_foreign);
continue;
}
@@ -133,18 +138,19 @@ static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
foreign_key_size = foreign_key_size - strlen(foreign_source_prefix);
p_foreign += strlen(foreign_source_prefix);
- if (0 != strncmp(p_foreign, foreign_key_prefix, strlen(foreign_key_prefix))) {
+ if (0 != strncmp(p_foreign, foreign_key_prefix,
+ strlen(foreign_key_prefix))) {
log_info(logger, MODULE_REDIS_MONITOR,
"[%s:%d] %s, %lld foreign key prefix %s is not recommended",
- __FUNCTION__, __LINE__, p_rule->table_name, p_rule->rule_id, p_foreign);
+ __FUNCTION__, __LINE__, p_rule->table_name, p_rule->rule_id,
+ p_foreign);
}
p_rule->f_keys[p_rule->n_foreign].key = ALLOC(char, foreign_key_size + 1);
memcpy(p_rule->f_keys[p_rule->n_foreign].key, p_foreign, foreign_key_size);
- p_rule->f_keys[p_rule->n_foreign].filename = get_foreign_cont_filename(p_rule->table_name,
- p_rule->rule_id,
- p_rule->f_keys[p_rule->n_foreign].key,
- dir);
+ p_rule->f_keys[p_rule->n_foreign].filename =
+ get_foreign_cont_filename(p_rule->table_name, p_rule->rule_id,
+ p_rule->f_keys[p_rule->n_foreign].key, dir);
p_rule->n_foreign++;
}
@@ -153,8 +159,9 @@ static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
}
}
-static int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list,
- int rule_num, struct maat *maat_inst, const char *dir)
+static int
+get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list,
+ int rule_num, struct maat *maat_inst, const char *dir)
{
int rule_with_foreign_key = 0;
@@ -163,29 +170,37 @@ static int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_l
continue;
}
- int table_id = table_manager_get_table_id(maat_inst->tbl_mgr, rule_list[i].table_name);
+ int table_id = table_manager_get_table_id(maat_inst->tbl_mgr,
+ rule_list[i].table_name);
void *schema = table_manager_get_schema(maat_inst->tbl_mgr, table_id);
- enum table_type table_type = table_manager_get_table_type(maat_inst->tbl_mgr, table_id);
+
+ enum table_type table_type =
+ table_manager_get_table_type(maat_inst->tbl_mgr, table_id);
+
if (!schema || table_type != TABLE_TYPE_PLUGIN) {
continue;
}
int foreign_columns[8];
- int n_foreign_column = plugin_table_get_foreign_column((struct plugin_schema *)schema,
- foreign_columns);
+ int n_foreign_column =
+ plugin_table_get_foreign_column((struct plugin_schema *)schema,
+ foreign_columns);
if (0 == n_foreign_column) {
continue;
}
- _get_foregin_keys(rule_list+i, foreign_columns, n_foreign_column, dir, maat_inst->logger);
+ get_foregin_keys(rule_list+i, foreign_columns, n_foreign_column,
+ dir, maat_inst->logger);
rule_with_foreign_key++;
}
return rule_with_foreign_key;
}
-int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_list,
- int rule_num, const char* dir, struct log_handle *logger)
+int maat_get_foreign_keys_by_prefix(redisContext *ctx,
+ struct serial_rule *rule_list,
+ int rule_num, const char* dir,
+ struct log_handle *logger)
{
int j = 0;
int foreign_key_size = 0;
@@ -199,9 +214,13 @@ int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_
j = 1;
n_foreign = 0;
do {
- p_foreign = maat_cmd_find_Nth_column(rule_list[i].table_line, j, &foreign_key_size);
- if (p_foreign != NULL && foreign_key_size > (int)strlen(foreign_source_prefix) &&
- 0 == strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) {
+ p_foreign = maat_cmd_find_Nth_column(rule_list[i].table_line, j,
+ &foreign_key_size);
+
+ if (p_foreign != NULL &&
+ foreign_key_size > (int)strlen(foreign_source_prefix) &&
+ 0 == strncmp(p_foreign, foreign_source_prefix,
+ strlen(foreign_source_prefix))) {
foreign_columns[n_foreign] = j;
n_foreign++;
}
@@ -209,7 +228,8 @@ int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_
} while (p_foreign != NULL && n_foreign < MAX_FOREIGN_CLMN_NUM);
if (n_foreign > 0) {
- _get_foregin_keys(rule_list+i, foreign_columns, n_foreign, dir, logger);
+ get_foregin_keys(rule_list+i, foreign_columns, n_foreign,
+ dir, logger);
rule_with_foreign_key++;
}
}
@@ -222,8 +242,9 @@ struct foreign_conts_track {
int foreign_idx;
};
-static int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list,
- int rule_num, struct log_handle *logger)
+static int
+get_maat_redis_value(redisContext *c, struct serial_rule *rule_list,
+ int rule_num, struct log_handle *logger)
{
int i = 0;
int failed_cnt = 0;
@@ -321,8 +342,10 @@ static int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list,
return 0;
}
-int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list,
- int rule_num, int print_process, struct log_handle *logger)
+int maat_get_redis_value(redisContext *c,
+ struct serial_rule *rule_list,
+ int rule_num, int print_process,
+ struct log_handle *logger)
{
int max_redis_batch = 4096;
int success_cnt = 0;
@@ -330,7 +353,7 @@ int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list,
while (success_cnt < rule_num) {
int batch_cnt = MIN(rule_num-success_cnt, max_redis_batch);
- int ret = _get_maat_redis_value(c, rule_list+success_cnt, batch_cnt, logger);
+ int ret = get_maat_redis_value(c, rule_list+success_cnt, batch_cnt, logger);
if (ret < 0) {
return -1;
} else {
@@ -352,15 +375,17 @@ int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list,
return 0;
}
-static int get_inc_key_list(long long instance_version, long long target_version,
- redisContext *c, struct serial_rule **list,
- struct log_handle *logger)
+static int
+get_inc_key_list(long long instance_version, long long target_version,
+ redisContext *c, struct serial_rule **list,
+ struct log_handle *logger)
{
//Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version.
//The elements are considered to be ordered from low to high scores(instance_version).
- redisReply *reply = (redisReply *)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",
- mr_status_sset, instance_version,
- target_version);
+ redisReply *reply =
+ (redisReply *)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",
+ mr_status_sset, instance_version,
+ target_version);
if (NULL == reply) {
log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] GET %s failed with a NULL reply, error: %s",
@@ -480,15 +505,15 @@ void maat_set_serial_rule(struct serial_rule *rule, enum maat_operation op,
int maat_get_rm_key_list(redisContext *c, long long instance_version,
long long *new_version, struct table_manager *tbl_mgr,
struct serial_rule **list, int *update_type,
- int cumulative_off, struct log_handle *logger)
+ struct log_handle *logger)
{
int rule_num = 0;
- long long target_version = 0;
struct serial_rule *s_rule_array = NULL;
redisReply *reply = (redisReply *)redisCommand(c, "GET MAAT_VERSION");
if (reply != NULL) {
- if (reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ERROR) {
+ if (reply->type == REDIS_REPLY_NIL ||
+ reply->type == REDIS_REPLY_ERROR) {
log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] GET MAAT_VERSION failed, maybe Redis is busy",
__FUNCTION__, __LINE__);
@@ -531,46 +556,33 @@ int maat_get_rm_key_list(redisContext *c, long long instance_version,
goto FULL_UPDATE;
}
- if (redis_version > instance_version && 1 == cumulative_off) {
- target_version = instance_version;
- } else {
- target_version = redis_version - 1;
+ /* redis_version > instance_version */
+ rule_num = get_inc_key_list(instance_version, redis_version, c,
+ &s_rule_array, logger);
+ if (rule_num < 0) {
+ goto FULL_UPDATE;
}
- do {
- target_version++;
- rule_num = get_inc_key_list(instance_version, target_version,
- c, &s_rule_array, logger);
- if (rule_num > 0) {
- break;
- } else if (rule_num < 0) {
- goto FULL_UPDATE;
- } else {
- //ret=0, nothing to do.
- }
-
- } while (0 == rule_num && target_version <= redis_version && 1 == cumulative_off);
-
if (0 == rule_num) {
log_info(logger, MODULE_REDIS_MONITOR,
"Got nothing after ZRANGEBYSCORE %s (%lld %lld",
- mr_status_sset, instance_version, target_version - 1);
+ mr_status_sset, instance_version, redis_version);
return 0;
}
log_info(logger, MODULE_REDIS_MONITOR,
"Inc Update from instance_version %lld to %lld (%d entries)",
- instance_version, target_version, rule_num);
+ instance_version, redis_version, rule_num);
*list = s_rule_array;
*update_type = MAAT_UPDATE_TYPE_INC;
- *new_version = target_version;
+ *new_version = redis_version;
return rule_num;
FULL_UPDATE:
log_fatal(logger, MODULE_REDIS_MONITOR,
- "Initiate full update from instance_version %lld to %lld",
- instance_version, redis_version);
+ "Initiate full update from instance_version %lld to %lld",
+ instance_version, redis_version);
size_t append_cmd_cnt = 0;
int ret = redisAppendCommand(c, "MULTI");
append_cmd_cnt++;
@@ -641,7 +653,8 @@ FULL_UPDATE:
}
if (tbl_mgr) {
- int table_id = table_manager_get_table_id(tbl_mgr, s_rule_array[full_idx].table_name);
+ int table_id =
+ table_manager_get_table_id(tbl_mgr, s_rule_array[full_idx].table_name);
//Unrecognized table.
if (table_id < 0) {
continue;
@@ -662,16 +675,17 @@ FULL_UPDATE:
return rule_num ;
}
-static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
- int rule_num, int print_fn, struct log_handle *logger)
+static void
+get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
+ int rule_num, int print_fn, struct log_handle *logger)
{
int i = 0;
int j = 0;
UNUSED int ret = 0;
int key_num = 0;
struct serial_rule *s_rule = NULL;
- struct foreign_conts_track *track = ALLOC(struct foreign_conts_track,
- rule_num * MAX_FOREIGN_CLMN_NUM);
+ struct foreign_conts_track *track =
+ ALLOC(struct foreign_conts_track, rule_num * MAX_FOREIGN_CLMN_NUM);
for (i = 0; i < rule_num; i++) {
s_rule = rule_list + i;
@@ -689,7 +703,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
if (ret == -1) {
log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Foreign content file %s remove failed",
- __FUNCTION__, __LINE__, rule_list[i].f_keys[j].filename);
+ __FUNCTION__, __LINE__,
+ rule_list[i].f_keys[j].filename);
}
}
} else {
@@ -705,7 +720,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
}
char redis_cmd[256] = {0};
- snprintf(redis_cmd, sizeof(redis_cmd), "GET %s", s_rule->f_keys[j].key);
+ snprintf(redis_cmd, sizeof(redis_cmd), "GET %s",
+ s_rule->f_keys[j].key);
ret = redisAppendCommand(c, redis_cmd);
track[key_num].rule_idx = i;
track[key_num].foreign_idx = j;
@@ -720,8 +736,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
ret = maat_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) {
log_fatal(logger, MODULE_REDIS_MONITOR,
- "[%s:%d] Get %s,%lld foreign key %s content failed, redis server error",
- __FUNCTION__, __LINE__,
+ "[%s:%d] Get %s,%lld foreign key %s content failed,"
+ " redis server error", __FUNCTION__, __LINE__,
rule_list[track[i].rule_idx].table_name,
rule_list[track[i].rule_idx].rule_id,
rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
@@ -741,8 +757,9 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
FILE *fp = fopen(s_rule->f_keys[track[i].foreign_idx].filename, "w");
if (NULL == fp) {
log_fatal(logger, MODULE_REDIS_MONITOR,
- "[%s:%d] Write foreign content failed: fopen %s error", __FUNCTION__,
- __LINE__, s_rule->f_keys[track[i].foreign_idx].filename);
+ "[%s:%d] Write foreign content failed: fopen %s error",
+ __FUNCTION__, __LINE__,
+ s_rule->f_keys[track[i].foreign_idx].filename);
} else {
fwrite(reply->str, 1, reply->len, fp);
fclose(fp);
@@ -762,15 +779,18 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
return;
}
-void maat_get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
- int rule_num, int print_fn, struct log_handle *logger)
+void maat_get_foreign_conts(redisContext *c,
+ struct serial_rule *rule_list,
+ int rule_num, int print_fn,
+ struct log_handle *logger)
{
int max_redis_batch = 4096;
int success_cnt = 0;
while (success_cnt < rule_num) {
int batch_cnt = MIN(rule_num - success_cnt, max_redis_batch);
- _get_foreign_conts(c, rule_list + success_cnt, batch_cnt, print_fn, logger);
+ get_foreign_conts(c, rule_list + success_cnt, batch_cnt,
+ print_fn, logger);
success_cnt += batch_cnt;
}
}
@@ -806,9 +826,10 @@ void maat_rewrite_table_line_with_foreign(struct serial_rule *s_rule)
for (i = 0; i < s_rule->n_foreign; i++) {
int origin_column_size = 0;
- const char *origin_column = maat_cmd_find_Nth_column(s_rule->table_line,
- s_rule->f_keys[i].column,
- &origin_column_size);
+ const char *origin_column =
+ maat_cmd_find_Nth_column(s_rule->table_line, s_rule->f_keys[i].column,
+ &origin_column_size);
+
strncat(pos_rewrite_line, pos_origin_line, origin_column - pos_origin_line);
pos_rewrite_line += origin_column - pos_origin_line;
pos_origin_line = origin_column+origin_column_size;
@@ -839,8 +860,8 @@ static int redlock_try_lock(redisContext *c, const char *lock_name,
{
int ret = 0;
- redisReply *reply = maat_wrap_redis_command(c, "SET %s locked NX PX %lld",
- lock_name, expire);
+ redisReply *reply =
+ maat_wrap_redis_command(c, "SET %s locked NX PX %lld", lock_name, expire);
if (reply->type == REDIS_REPLY_NIL) {
ret = 0;
} else {
@@ -853,15 +874,17 @@ static int redlock_try_lock(redisContext *c, const char *lock_name,
return ret;
}
-static long long exec_serial_rule_begin(redisContext* c, size_t rule_num,
- size_t renew_rule_num, int *renew_allowed,
- long long *transaction_version)
+static long long
+exec_serial_rule_begin(redisContext* c, size_t rule_num,
+ size_t renew_rule_num, int *renew_allowed,
+ long long *transaction_version)
{
int ret = -1;
redisReply *data_reply = NULL;
if (renew_rule_num > 0) {
- while (0 == redlock_try_lock(c, mr_expire_lock, mr_expire_lock_timeout_ms)) {
+ while (0 == redlock_try_lock(c, mr_expire_lock,
+ mr_expire_lock_timeout_ms)) {
usleep(1000);
}
*renew_allowed = 1;
@@ -903,9 +926,10 @@ const char* lua_exec_done=
"redis.call(\'del\', KEYS[4]);"
"redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);"
"return maat_version;";
-static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction_list,
- long long server_time, int renew_allowed,
- struct expected_reply *expect_reply, size_t *cnt)
+static redisReply *
+exec_serial_rule_end(redisContext *c, const char *transaction_list,
+ long long server_time, int renew_allowed,
+ struct expected_reply *expect_reply, size_t *cnt)
{
redisReply *data_reply = NULL;
@@ -916,9 +940,10 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction
}
if (strlen(transaction_list) > 0) {
- data_reply = maat_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld",
- lua_exec_done, mr_status_sset, mr_version_sset,
- transaction_list, server_time);
+ data_reply =
+ maat_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld",
+ lua_exec_done, mr_status_sset, mr_version_sset,
+ transaction_list, server_time);
freeReplyObject(data_reply);
data_reply = NULL;
expected_reply_add(expect_reply + *cnt, -1, REDIS_REPLY_INTEGER, 0);
@@ -930,10 +955,11 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction
return data_reply;
}
-static void exec_serial_rule(redisContext *c, const char *transaction_list,
- struct serial_rule *s_rule, size_t rule_num,
- struct expected_reply *expect_reply, size_t *cnt,
- size_t offset, int renew_allowed)
+static void
+exec_serial_rule(redisContext *c, const char *transaction_list,
+ struct serial_rule *s_rule, size_t rule_num,
+ struct expected_reply *expect_reply, size_t *cnt,
+ size_t offset, int renew_allowed)
{
size_t i = 0;
size_t append_cmd_cnt = 0;
@@ -947,7 +973,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
s_rule[i].table_name,
s_rule[i].rule_id,
s_rule[i].table_line);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0);
+ expected_reply_add(expect_reply+*cnt, i+offset,
+ REDIS_REPLY_STATUS, 0);
(*cnt)++;
append_cmd_cnt++;
//Allowing add duplicated members for rule id recycling.
@@ -964,8 +991,10 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
s_rule[i].timeout,
s_rule[i].table_name,
s_rule[i].rule_id);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
+ expected_reply_add(expect_reply+*cnt, i+offset,
+ REDIS_REPLY_INTEGER, 1);
+ expected_reply_add(expect_reply+*cnt, i+offset,
+ REDIS_REPLY_INTEGER, 0);
(*cnt)++;
append_cmd_cnt++;
}
@@ -978,7 +1007,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
mr_key_prefix[MAAT_OP_DEL],
s_rule[i].table_name,
s_rule[i].rule_id);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0);
+ expected_reply_add(expect_reply+*cnt, i+offset,
+ REDIS_REPLY_STATUS, 0);
(*cnt)++;
append_cmd_cnt++;
@@ -987,7 +1017,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
s_rule[i].table_name,
s_rule[i].rule_id,
MAAT_REDIS_SYNC_TIME);
- expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
+ expected_reply_add(expect_reply+*cnt, i+offset,
+ REDIS_REPLY_INTEGER, 1);
(*cnt)++;
append_cmd_cnt++;
@@ -1046,8 +1077,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
}
}
-static int mr_operation_success(redisReply *actual_reply,
- struct expected_reply *expected)
+static int
+mr_operation_success(redisReply *actual_reply, struct expected_reply *expected)
{
if (expected->possible_replies[0].type != actual_reply->type) {
return 0;
@@ -1089,7 +1120,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
long long transaction_version = 0;
long long transaction_finished_version = 0;
size_t max_multi_cmd_num = MAX_REDIS_OP_PER_SRULE * serial_rule_num + 2; // 2 for operation in exec_serial_rule_end()
- struct expected_reply *expected_reply = ALLOC(struct expected_reply, max_multi_cmd_num);
+ struct expected_reply *expected_reply =
+ ALLOC(struct expected_reply, max_multi_cmd_num);
for (i = 0; i < serial_rule_num; i++) {
if (s_rule[i].op == MAAT_OP_RENEW_TIMEOUT) {
@@ -1097,8 +1129,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
}
}
- int ret = exec_serial_rule_begin(c, serial_rule_num, renew_num, &renew_allowed,
- &transaction_version);
+ int ret = exec_serial_rule_begin(c, serial_rule_num, renew_num,
+ &renew_allowed, &transaction_version);
//Preconditions for transaction are not satisfied.
if (ret != 0) {
success_cnt = -1;
@@ -1106,8 +1138,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
}
if (transaction_version > 0) {
- snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld",
- transaction_version);
+ snprintf(transaction_list, sizeof(transaction_list),
+ "MAAT_TRANSACTION_%lld", transaction_version);
}
while (success_cnt < serial_rule_num) {
@@ -1118,8 +1150,9 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
success_cnt+=batch_cnt;
}
- transaction_reply = exec_serial_rule_end(c, transaction_list, server_time, renew_allowed,
- expected_reply, &multi_cmd_cnt);
+ transaction_reply = exec_serial_rule_end(c, transaction_list, server_time,
+ renew_allowed, expected_reply,
+ &multi_cmd_cnt);
if (transaction_reply->type != REDIS_REPLY_NIL) {
assert(transaction_reply->elements == multi_cmd_cnt);
for (i = 0; i < multi_cmd_cnt; i++) {
@@ -1135,9 +1168,11 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
rule_seq = expected_reply[i].s_rule_seq;
log_fatal(logger, MODULE_REDIS_MONITOR,
- "[%s:%d] %s %s %lld failed, rule id maybe conflict or not exist",
- __FUNCTION__, __LINE__, mr_op_str[s_rule[rule_seq].op],
- s_rule[rule_seq].table_name, s_rule[rule_seq].rule_id);
+ "[%s:%d] %s %s %lld failed, rule id maybe conflict"
+ " or not exist", __FUNCTION__, __LINE__,
+ mr_op_str[s_rule[rule_seq].op],
+ s_rule[rule_seq].table_name,
+ s_rule[rule_seq].rule_id);
success_cnt--;
last_failed = rule_seq;
}
@@ -1146,7 +1181,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
}
if (transaction_version > 0) {
- transaction_finished_version = maat_read_redis_integer(transaction_reply->element[multi_cmd_cnt - 1]);
+ transaction_finished_version =
+ maat_read_redis_integer(transaction_reply->element[multi_cmd_cnt - 1]);
log_info(logger, MODULE_REDIS_MONITOR,
"Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld",
transaction_version, transaction_finished_version);
@@ -1176,7 +1212,8 @@ error_out:
return success_cnt;
}
-static void cleanup_update_status(redisContext *c, struct log_handle *logger)
+static void
+cleanup_update_status(redisContext *c, struct log_handle *logger)
{
long long version_upper_bound = 0;
long long version_lower_bound = 0;
@@ -1225,21 +1262,24 @@ static void cleanup_update_status(redisContext *c, struct log_handle *logger)
}
version_lower_bound = maat_read_redis_integer(sub_reply->element[0]);
- version_upper_bound = maat_read_redis_integer(sub_reply->element[sub_reply->elements-1]);
+ version_upper_bound =
+ maat_read_redis_integer(sub_reply->element[sub_reply->elements-1]);
+
freeReplyObject(reply);
reply = NULL;
//To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally.
reply = maat_wrap_redis_command(c, "ZREMRANGEBYSCORE %s %lld %lld",
- mr_status_sset, version_lower_bound,
- version_upper_bound);
+ mr_status_sset, version_lower_bound,
+ version_upper_bound);
entry_num = maat_read_redis_integer(reply);
freeReplyObject(reply);
reply = NULL;
log_info(logger, MODULE_REDIS_MONITOR,
- "Clean up update status from version %lld to %lld (%lld versions, %lld entries)",
- version_lower_bound, version_upper_bound, version_num, entry_num);
+ "Clean up update status from version %lld to %lld "
+ "(%lld versions, %lld entries)", version_lower_bound,
+ version_upper_bound, version_num, entry_num);
return;
error_out:
@@ -1247,7 +1287,8 @@ error_out:
reply = NULL;
}
-static void check_maat_expiration(redisContext *c, struct log_handle *logger)
+static void
+check_maat_expiration(redisContext *c, struct log_handle *logger)
{
UNUSED int ret = 0;
@@ -1256,9 +1297,11 @@ static void check_maat_expiration(redisContext *c, struct log_handle *logger)
return;
}
- redisReply *data_reply= maat_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld",
- mr_expire_sset, server_time);
- if (data_reply->type != REDIS_REPLY_ARRAY || 0 == data_reply->elements) {
+ redisReply *data_reply =
+ maat_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld",
+ mr_expire_sset, server_time);
+ if (data_reply->type != REDIS_REPLY_ARRAY ||
+ 0 == data_reply->elements) {
freeReplyObject(data_reply);
data_reply = NULL;
return;
@@ -1276,10 +1319,12 @@ static void check_maat_expiration(redisContext *c, struct log_handle *logger)
freeReplyObject(data_reply);
data_reply = NULL;
- int success_cnt = maat_cmd_write_rule(c, s_rule, s_rule_num, server_time, logger);
+ int success_cnt = maat_cmd_write_rule(c, s_rule, s_rule_num,
+ server_time, logger);
if (success_cnt < 0) {
- log_fatal(logger, MODULE_REDIS_MONITOR, "[%s:%d] maat_cmd_write_rule failed.",
- __FUNCTION__, __LINE__);
+ log_fatal(logger, MODULE_REDIS_MONITOR,
+ "[%s:%d] maat_cmd_write_rule failed.",
+ __FUNCTION__, __LINE__);
} else if (success_cnt == (int)s_rule_num) {
log_info(logger, MODULE_REDIS_MONITOR,
"Succesfully expired %zu rules in Redis", s_rule_num);
@@ -1348,7 +1393,6 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
int rule_num = maat_get_rm_key_list(mr_ctx->read_ctx, version,
&new_version, maat_inst->tbl_mgr,
&rule_list, &update_type,
- maat_inst->opts.cumulative_update_off,
maat_inst->logger);
//redis communication error
if (rule_num < 0) {
@@ -1370,8 +1414,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
redisFree(mr_ctx->read_ctx);
mr_ctx->read_ctx = NULL;
log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR,
- "[%s:%d] Get Redis value failed, abandon update and close connection",
- __FUNCTION__, __LINE__);
+ "[%s:%d] Get Redis value failed, abandon update"
+ " and close connection", __FUNCTION__, __LINE__);
goto clean_up;
}
@@ -1383,7 +1427,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
if (empty_value_num == rule_num) {
log_info(maat_inst->logger, MODULE_REDIS_MONITOR,
- "All %d rules are empty, abandon update", empty_value_num);
+ "All %d rules are empty, abandon update",
+ empty_value_num);
goto clean_up;
}
@@ -1406,7 +1451,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
continue;
}
- table_id = table_manager_get_table_id(maat_inst->tbl_mgr, rule_list[i].table_name);
+ table_id = table_manager_get_table_id(maat_inst->tbl_mgr,
+ rule_list[i].table_name);
//Unrecognized table.
if (table_id < 0) {
no_table_num++;
@@ -1414,7 +1460,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
}
if (rule_list[i].op == MAAT_OP_DEL) {
- valid_column = table_manager_get_valid_column(maat_inst->tbl_mgr, table_id);
+ valid_column = table_manager_get_valid_column(maat_inst->tbl_mgr,
+ table_id);
ret = validate_line(rule_list[i].table_line, valid_column);
if (ret < 0) {
log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR,
@@ -1436,8 +1483,9 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
if (call_update_num < rule_num) {
log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR,
- "[%s:%d] Load %d entries to match engine, no table: %d, empty value: %d",
- __FUNCTION__, __LINE__, call_update_num, no_table_num, empty_value_num);
+ "[%s:%d] Load %d entries to match engine, "
+ "no table: %d, empty value: %d", __FUNCTION__, __LINE__,
+ call_update_num, no_table_num, empty_value_num);
}
clean_up: