summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ci/travis.sh1
-rw-r--r--cmake/Package.cmake3
-rw-r--r--common/include/utils.h1
-rw-r--r--common/libavl/libavl.c23
-rw-r--r--common/src/log.cpp12
-rw-r--r--conf/CMakeLists.txt2
-rw-r--r--conf/shaping.conf (renamed from conf/main.conf)7
-rw-r--r--conf/zlog.conf2
-rw-r--r--shaping/CMakeLists.txt1
-rw-r--r--shaping/include/shaper.h26
-rw-r--r--shaping/include/shaper_marsio.h5
-rw-r--r--shaping/include/shaper_stat.h58
-rw-r--r--shaping/src/main.cpp19
-rw-r--r--shaping/src/shaper.cpp297
-rw-r--r--shaping/src/shaper_maat.cpp6
-rw-r--r--shaping/src/shaper_marsio.cpp21
-rw-r--r--shaping/src/shaper_session.cpp2
-rw-r--r--shaping/src/shaper_stat.cpp314
-rw-r--r--shaping/src/shaper_swarmkv.cpp1
-rw-r--r--shaping/test/gtest_shaper.cpp358
-rw-r--r--shaping/test/gtest_shaper_bak.cpp234
-rw-r--r--shaping/test/stub.cpp9
-rw-r--r--shaping/test/stub.h5
-rw-r--r--shaping/test/test_conf/shaping.conf (renamed from shaping/test/test_conf/main.conf)8
-rw-r--r--shaping/test/test_conf/zlog.conf2
25 files changed, 702 insertions, 715 deletions
diff --git a/ci/travis.sh b/ci/travis.sh
index 9e62eec..cd5333b 100644
--- a/ci/travis.sh
+++ b/ci/travis.sh
@@ -38,6 +38,7 @@ yum install -y systemd-devel
yum install -y tsg_master-devel
yum install -y framework_env
yum install -y mrzcpd
+yum install -y libfieldstat3-devel
yum install -y libmaat4-devel
yum install -y libswarmkv-devel
yum install -y libMESA_handle_logger-devel
diff --git a/cmake/Package.cmake b/cmake/Package.cmake
index 3eb5d64..32411a5 100644
--- a/cmake/Package.cmake
+++ b/cmake/Package.cmake
@@ -31,7 +31,8 @@ else()
endif()
# setup %config(noreplace)
-set(CPACK_RPM_USER_FILELIST "%config(noreplace) ${CMAKE_INSTALL_PREFIX}/conf/main.conf"
+set(CPACK_RPM_USER_FILELIST "%config(noreplace) ${CMAKE_INSTALL_PREFIX}/conf/shaping.conf"
+ "%config(noreplace) ${CMAKE_INSTALL_PREFIX}/conf/shaping_maat.json"
"%config(noreplace) ${CMAKE_INSTALL_PREFIX}/conf/zlog.conf"
"%config(noreplace) ${CMAKE_INSTALL_PREFIX}/conf/table_info.json")
diff --git a/common/include/utils.h b/common/include/utils.h
index a94dde3..a42cf02 100644
--- a/common/include/utils.h
+++ b/common/include/utils.h
@@ -10,6 +10,7 @@ extern "C"
#define LOG_TAG_SHAPING "SHAPING"
#define LOG_TAG_SWARMKV "SWARMKV"
+#define LOG_TAG_STAT "STAT"
#define LOG_TAG_MAAT "MAAT"
#define LOG_TAG_MARSIO "MARSIO"
#define LOG_TAG_UTILS "UTILS"
diff --git a/common/libavl/libavl.c b/common/libavl/libavl.c
index 8d439a1..7bfeee2 100644
--- a/common/libavl/libavl.c
+++ b/common/libavl/libavl.c
@@ -1,14 +1,14 @@
+#include <assert.h>
#include <stdlib.h>
#include "libavl.h"
#include "avl_tree.h"
-#define AVL_NODE_IN_TREE 0x01
struct avl_node {
unsigned long long unique_key;
void *data;
void(*free_cb) (void *);
- unsigned int flag;
+ unsigned int ref_count;
struct avl_tree_node node;
};
@@ -131,7 +131,7 @@ int avl_node_in_tree(struct avl_node* pnode)
return 0;
}
- if (pnode->flag & AVL_NODE_IN_TREE) {
+ if (pnode->ref_count > 0) {
return 1;
}
@@ -162,6 +162,11 @@ int avl_tree_node_insert(struct avl_tree *tree, struct avl_node* pnode)
return -1;
}
+ if (avl_node_in_tree(pnode)) {
+ pnode->ref_count++;
+ return 0;
+ }
+
if (tree->curr_node_num == tree->max_node_num) {
return -1;
}
@@ -171,7 +176,7 @@ int avl_tree_node_insert(struct avl_tree *tree, struct avl_node* pnode)
return -1;
}
- pnode->flag |= AVL_NODE_IN_TREE;
+ pnode->ref_count = 1;
tree->curr_node_num++;
return 0;
@@ -179,12 +184,18 @@ int avl_tree_node_insert(struct avl_tree *tree, struct avl_node* pnode)
void avl_tree_node_remove(struct avl_tree *tree, struct avl_node* pnode)
{
- if (!pnode || !(pnode->flag & AVL_NODE_IN_TREE)) {
+ if (!pnode || pnode->ref_count == 0) {
+ return;
+ }
+
+ pnode->ref_count--;
+ assert(pnode->ref_count >= 0);
+
+ if (pnode->ref_count > 0) {
return;
}
avl_tree_remove(&tree->root, &pnode->node);
- pnode->flag &= ~AVL_NODE_IN_TREE;
tree->curr_node_num--;
return;
}
diff --git a/common/src/log.cpp b/common/src/log.cpp
index 290fbd9..af4794e 100644
--- a/common/src/log.cpp
+++ b/common/src/log.cpp
@@ -6,13 +6,13 @@ void *g_default_logger = NULL;
// return -1 : error
int LOG_INIT(const char *profile)
{
- // if (0 != MESA_handle_runtime_log_creation(profile))
- // {
- // fprintf(stderr, "FATAL: unable to create runtime logger\n");
- // return -1;
- // }
+ if (0 != MESA_handle_runtime_log_creation(profile))
+ {
+ fprintf(stderr, "FATAL: unable to create runtime logger\n");
+ return -1;
+ }
- g_default_logger = MESA_create_runtime_log_handle("./log/shaping", RLOG_LV_DEBUG);
+ g_default_logger = MESA_create_runtime_log_handle("log/shaping", RLOG_LV_DEBUG);
if (g_default_logger == NULL)
{
fprintf(stderr, "FATAL: unable to create log handle\n");
diff --git a/conf/CMakeLists.txt b/conf/CMakeLists.txt
index b278878..ddc2429 100644
--- a/conf/CMakeLists.txt
+++ b/conf/CMakeLists.txt
@@ -1,4 +1,4 @@
-install(FILES main.conf DESTINATION conf COMPONENT Profile)
+install(FILES shaping.conf DESTINATION conf COMPONENT Profile)
install(FILES shaping_maat.json DESTINATION conf COMPONENT Profile)
install(FILES table_info.json DESTINATION conf COMPONENT Profile)
install(FILES zlog.conf DESTINATION conf COMPONENT Profile) \ No newline at end of file
diff --git a/conf/main.conf b/conf/shaping.conf
index e134a70..5a8adec 100644
--- a/conf/main.conf
+++ b/conf/shaping.conf
@@ -28,9 +28,10 @@ SWARMKV_CLUSTER_ANNOUNCE_PORT=8501
SWARMKV_HEALTH_CHECK_PORT=0
SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111
-#[METRIC]
-#TELEGRAF_IP="127.0.0.1"
-#TELEGRAF_PORT=6667
+[METRIC]
+FIELDSTAT_OUTPUT_INTERVAL_MS=500
+LINE_PROTOCOL_SERVER_IP="127.0.0.1"
+LINE_PROTOCOL_SERVER_PORT=6667
[CONFIG]
#PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128
diff --git a/conf/zlog.conf b/conf/zlog.conf
index a7a988d..6ce69f4 100644
--- a/conf/zlog.conf
+++ b/conf/zlog.conf
@@ -7,4 +7,4 @@ INFO=20
FATAL=30
[rules]
-shaping.fatal "./log/shaping.log.%d(%F)";
+log_shaping.fatal "./log/shaping.log.%d(%F)";
diff --git a/shaping/CMakeLists.txt b/shaping/CMakeLists.txt
index e27398f..0ecbc19 100644
--- a/shaping/CMakeLists.txt
+++ b/shaping/CMakeLists.txt
@@ -4,6 +4,7 @@ target_link_libraries(shaper PUBLIC avl_tree)
target_link_libraries(shaper PUBLIC cjson)
target_link_libraries(shaper PUBLIC MESA_handle_logger)
target_link_libraries(shaper PUBLIC MESA_prof_load)
+target_link_libraries(shaper PUBLIC fieldstat3)
target_link_libraries(shaper PUBLIC pthread)
target_include_directories(shaper PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
add_executable(shaping_engine src/main.cpp)
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 6140dad..83c1343 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -17,17 +17,15 @@
#define SHAPING_WROK_THREAD_NUM_MAX 128
-#define SHAPING_GLOBAL_CONF_FILE "./conf/main.conf"
+#define SHAPING_GLOBAL_CONF_FILE "./conf/shaping.conf"
-struct shaping_global_conf {
- char telegraf_ip[16];
- short telegraf_port;
+struct shaping_system_conf {
unsigned int session_queue_len_max;
unsigned int priority_queue_len_max;
int polling_node_num_max[SHAPING_PRIORITY_NUM_MAX];
int work_thread_num;
int cpu_affinity_enable;
- int cpu_affinity_mask[SHAPING_WROK_THREAD_NUM_MAX];
+ unsigned long long cpu_affinity_mask;
};
struct shaping_thread_ctx {
@@ -41,7 +39,6 @@ struct shaping_thread_ctx {
struct shaping_maat_info *maat_info;
struct session_table *session_table;
int session_need_reset;
- int cpu_mask;
unsigned int session_queue_len_max;
int polling_node_num_max[SHAPING_PRIORITY_NUM_MAX];
};
@@ -51,6 +48,7 @@ struct shaping_ctx {
struct swarmkv *swarmkv_db;//handle of swarmkv
struct shaping_maat_info *maat_info;
struct shaping_marsio_info *marsio_info;
+ struct shaping_stat *stat;
struct shaping_thread_ctx *thread_ctx;
};
@@ -111,9 +109,6 @@ struct shaper_flow_instance {
struct shaper;//instance of shaping, thread unsafe
-//extern struct shaping_global_runtime_para g_rt_para;
-//extern struct shaping_global_conf g_sp_conf;
-
struct shaping_flow* shaping_flow_new();
void shaping_flow_free(struct shaping_flow *sf);
struct shaper* shaper_new(unsigned int priority_queue_len_max);
@@ -122,20 +117,19 @@ void shaper_free(struct shaper *sp);
bool shaper_queue_empty(struct shaping_flow *sf);
void shaper_packet_dequeue(struct shaping_flow *sf);
struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf);
-void shaper_queue_clear(struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl, struct shaping_thread_ctx *ctx);
+void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx);
/*return value: 0 for success, -1 for failed*/
-int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp, struct shaping_stat_data **stat_hashtbl, unsigned long long enqueue_time);
+int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time);
/*return num of sf_ins*/
-void shaper_flow_pop(struct shaper *sp, struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl);
+void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf);
int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num);
-enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct shaping_flow *sf, struct shaper *sp,
- int priority, struct shaping_stat_data **stat_hashtbl, int sf_in_queue);
+//enum shaping_packet_action shaper_pkt_action_decide(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority, int sf_in_queue);
-int shaper_global_conf_init(struct shaping_global_conf *conf);
+int shaper_global_conf_init(struct shaping_system_conf *conf);
void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx);
-void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf);
+void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf);
struct shaping_ctx *shaping_engine_init();
void shaping_engine_destroy(struct shaping_ctx *ctx);
diff --git a/shaping/include/shaper_marsio.h b/shaping/include/shaper_marsio.h
index f06f306..d0de401 100644
--- a/shaping/include/shaper_marsio.h
+++ b/shaping/include/shaper_marsio.h
@@ -1,11 +1,14 @@
#include <marsio.h>
#include "shaper.h"
+#define SHAPER_MARSIO_RX_BRUST_MAX 128
+
struct shaping_marsio_info
{
struct mr_instance *instance;
struct mr_vdev *mr_dev;
struct mr_sendpath *mr_path;
+ int rx_brust_max;
};
struct metadata
@@ -35,7 +38,7 @@ struct ctrl_pkt_data
int shaping_rule_num;
};
-struct shaping_marsio_info* shaper_marsio_init(int thread_num);
+struct shaping_marsio_info* shaper_marsio_init(struct shaping_system_conf *sys_conf);
void shaper_marsio_destroy(struct shaping_marsio_info *marsio_info);
int shaper_marsio_pkt_metadata_get(marsio_buff_t *rx_buff, struct metadata *meta, int is_ctrl_buff, struct raw_pkt_parser *raw_parser);
int shaper_marsio_ctrl_pkt_data_parse(struct ctrl_pkt_data *ctrl_data, const char *data, size_t length); \ No newline at end of file
diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h
index 93183f8..30d869e 100644
--- a/shaping/include/shaper_stat.h
+++ b/shaping/include/shaper_stat.h
@@ -1,9 +1,9 @@
#include <netinet/in.h>
#include "uthash.h"
+#include <MESA/fieldstat.h>
+#define SHAPER_STAT_COLUMN_NUM_MAX 20
struct shaping_stat_data_dir {
- unsigned long long rx_pkts;
- unsigned long long rx_bytes;
unsigned long long tx_pkts;
unsigned long long tx_bytes;
unsigned long long drop_pkts;
@@ -27,22 +27,52 @@ struct shaping_stat_data {
UT_hash_handle hh;
};
+#if 0
struct shaping_stat {
int sock_fd;
struct sockaddr_in sock_addr;
struct timespec update_time;
struct shaping_stat_data *stat_hashtbl;
};
+#endif
-void shaper_stat_send_free(struct shaping_stat *stat);
-void shaper_stat_send(struct shaping_stat *stat, struct shaping_stat_data **stat_hashtbl);
-struct shaping_stat* shaper_stat_new(char *telegraf_ip, short telegraf_port);
-
-void shaper_stat_queueing_pkt_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len, int profile_type);
-void shaper_stat_queueing_pkt_dec(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len, int profile_type);
-void shaper_stat_forward_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len, int profile_type);
-void shaper_stat_forward_all_rule_inc(struct shaping_stat_data **stat_hashtbl, struct shaping_flow *sf, unsigned char direction, int pkt_len);
-void shaper_stat_drop_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len);
-void shaper_stat_queueing_session_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type);
-void shaper_stat_queueing_session_dec(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type);
-void shaper_stat_max_latency_update(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, unsigned char direction, unsigned long long latency, int profile_type); \ No newline at end of file
+enum shaping_stat_tags_index {
+ TAG_RULE_ID_IDX = 0,
+ TAG_PROFILE_ID_IDX,
+ TAG_PRIORITY_IDX,
+ TAG_PROFILE_TYPE_IDX,
+ TAG_IDX_MAX
+};
+
+enum shaping_stat_column_index {
+ QUEUEING_SESSIONS_IDX = 0,
+ IN_MAX_LATENCY_IDX,
+ IN_QUEUE_LEN_IDX,
+ OUT_MAX_LATENCY_IDX,
+ OUT_QUEUE_LEN_IDX,
+ IN_PKTS_IDX,
+ IN_BYTES_IDX,
+ IN_DROP_PKTS_IDX,
+ OUT_PKTS_IDX,
+ OUT_BYTES_IDX,
+ OUT_DROP_PKTS_IDX,
+ STAT_COLUNM_IDX_MAX
+};
+
+struct shaping_stat {
+ struct fieldstat_dynamic_instance *instance;
+ int table_id;
+ unsigned int column_ids[STAT_COLUNM_IDX_MAX];
+};
+
+void shaper_stat_destroy(struct shaping_stat *stat);
+struct shaping_stat* shaper_stat_init(int thread_num);
+
+void shaper_stat_queueing_pkt_inc(struct shaping_stat *stat, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id);
+void shaper_stat_queueing_pkt_dec(struct shaping_stat *stat, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id);
+void shaper_stat_forward_inc(struct shaping_stat *stat, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id);
+void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_flow *sf, unsigned char direction, int pkt_len, int thread_id);
+void shaper_stat_drop_inc(struct shaping_stat *stat, int rule_id, int profile_id, int priority, unsigned char direction, int pkt_len, int thread_id);
+void shaper_stat_queueing_session_inc(struct shaping_stat *stat, int rule_id, int profile_id, int priority, int profile_type, int thread_id);
+void shaper_stat_queueing_session_dec(struct shaping_stat *stat, int rule_id, int profile_id, int priority, int profile_type, int thread_id);
+void shaper_stat_max_latency_update(struct shaping_stat *stat, int rule_id, int profile_id, int priority, unsigned char direction, unsigned long long latency, int profile_type, int thread_id); \ No newline at end of file
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp
index 16f0aa8..476816f 100644
--- a/shaping/src/main.cpp
+++ b/shaping/src/main.cpp
@@ -8,29 +8,10 @@
#include "shaper_marsio.h"
#include "shaper_session.h"
-static int thread_set_affinity(int core_id)
-{
- int num_cores = sysconf(_SC_NPROCESSORS_ONLN);
- if (core_id < 0 || core_id >= num_cores)
- {
- return EINVAL;
- }
-
- cpu_set_t cpuset;
- CPU_ZERO(&cpuset);
- CPU_SET(core_id, &cpuset);
-
- return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
-}
-
static void *shaper_thread_loop(void *data)
{
struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data;
- if (ctx->cpu_mask >= 0)
- {
- thread_set_affinity(ctx->cpu_mask);
- }
marsio_thread_init(ctx->marsio_info->instance);
//loop to process pkts
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index cdc3657..6090a15 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -1,7 +1,4 @@
-#include "log.h"
-#include "session_table.h"
#include <MESA/swarmkv.h>
-#include <cstring>
#include <marsio.h>
#include <cjson/cJSON.h>
#include <MESA/MESA_prof_load.h>
@@ -13,6 +10,8 @@
extern "C" {
#include "libavl.h"
}
+#include "log.h"
+#include "session_table.h"
#include "addr_tuple4.h"
#include "raw_packet.h"
#include "shaper.h"
@@ -42,6 +41,11 @@ struct shaping_async_cb_arg {
unsigned char direction;
};
+struct shaping_profile_container {
+ struct shaping_profile_info *pf_info;
+ int pf_type;
+};
+
struct shaper* shaper_new(unsigned int priority_queue_len_max)
{
struct shaper *sp = NULL;
@@ -186,18 +190,19 @@ void shaper_packet_dequeue(struct shaping_flow *sf)
return;
}
-void shaper_queue_clear(struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl, struct shaping_thread_ctx *ctx)
+void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
{
struct shaping_packet_wrapper *pkt_wrapper;
+ struct shaping_stat *stat = ctx->stat;
struct shaping_rule_info *rule = &sf->matched_rule_infos[0];
while (!shaper_queue_empty(sf)) {
pkt_wrapper = shaper_first_pkt_get(sf);
- shaper_stat_queueing_pkt_dec(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority,
- pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY);
- shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority,
- pkt_wrapper->direction, pkt_wrapper->length);
+ shaper_stat_queueing_pkt_dec(stat, rule->id, rule->primary.id, rule->primary.priority,
+ pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
+ shaper_stat_drop_inc(stat, rule->id, rule->primary.id, rule->primary.priority,
+ pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
@@ -207,11 +212,11 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_stat_data **stat
}
//return success(0) while any avl tree insert success
-int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp,
- struct shaping_stat_data **stat_hashtbl, unsigned long long enqueue_time)
+int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+ struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
int priority;
int ret = -1;
@@ -224,9 +229,9 @@ int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp,
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- shaper_stat_queueing_pkt_inc(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id,
- priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY);
- shaper_stat_queueing_session_inc(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
+ priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
+ shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
s_rule_info->primary.enqueue_time_us = enqueue_time;
}
@@ -238,9 +243,9 @@ int shaper_flow_push(struct shaping_flow *sf, struct shaper *sp,
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- shaper_stat_queueing_pkt_inc(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id,
- priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW);
- shaper_stat_queueing_session_inc(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW);
+ shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
+ priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
+ shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
s_rule_info->borrowing[i].enqueue_time_us = enqueue_time;
}
}
@@ -258,10 +263,11 @@ static unsigned long long shaper_pkt_latency_calculate(struct shaping_profile_in
return (curr_time - enqueue_time);
}
-static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper *sp, struct shaping_stat_data **stat_hashtbl)
+void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
{
struct shaping_node *s_node = (struct shaping_node*)sf;
struct shaping_rule_info *s_rule_info = &sf->matched_rule_infos[sf->anchor];
+ struct shaper *sp = ctx->sp;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
struct timespec curr_time;
unsigned long long latency;
@@ -276,13 +282,13 @@ static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper
priority = s_rule_info->primary.priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- shaper_stat_queueing_pkt_dec(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id,
- priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY);
- shaper_stat_queueing_session_dec(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
+ priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
+ shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time);
- shaper_stat_max_latency_update(stat_hashtbl, s_rule_info->id, s_rule_info->primary.id,
- priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_max_latency_update(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
+ priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
}
if (s_rule_info->borrowing_num == 0) {
@@ -293,13 +299,13 @@ static void shaping_flow_remove_from_pool(struct shaping_flow *sf, struct shaper
priority = s_rule_info->borrowing[i].priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- shaper_stat_queueing_pkt_dec(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id,
- priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW);
- shaper_stat_queueing_session_dec(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW);
+ shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
+ priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
+ shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
- latency = shaper_pkt_latency_calculate(&s_rule_info->primary, &curr_time);
- shaper_stat_max_latency_update(stat_hashtbl, s_rule_info->id, s_rule_info->borrowing[i].id,
- priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_BORROW);
+ latency = shaper_pkt_latency_calculate(&s_rule_info->borrowing[i], &curr_time);
+ shaper_stat_max_latency_update(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
+ priority, pkt_wrapper->direction, latency, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
}
}
@@ -331,13 +337,6 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
return count;
}
-void shaper_flow_pop(struct shaper *sp, struct shaping_flow *sf, struct shaping_stat_data **stat_hashtbl)
-{
- shaping_flow_remove_from_pool(sf, sp, stat_hashtbl);
-
- return;
-}
-
static void shaper_deposit_token_add(struct shaping_profile_info *pf_info, int req_token, unsigned char direction)
{
if (direction == SHAPING_DIR_IN) {
@@ -486,23 +485,43 @@ enum shaping_packet_action shaper_pkt_action_decide(struct shaping_flow *sf, str
}
#endif
-static struct shaping_profile_info * shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, int *profile_type)
+int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, struct shaping_profile_container pf_container[])
{
- int i;
+ int num = 0;
- if (s_rule_info->primary.priority == priority) {
- *profile_type = SHAPING_PROFILE_TYPE_PRIMARY;
- return &s_rule_info->primary;
- }
+ if (priority == SHAPING_PRIORITY_NUM_MAX - 1) {//priority 9 allow multi profiles for one priority
+ if (s_rule_info->primary.priority == priority) {
+ pf_container[num].pf_type = SHAPING_PROFILE_TYPE_PRIMARY;
+ pf_container[num].pf_info = &s_rule_info->primary;
+ num++;
+ }
- for (i = 0; i < s_rule_info->borrowing_num; i++) {
- if (s_rule_info->borrowing[i].priority == priority) {
- *profile_type = SHAPING_PROFILE_TYPE_BORROW;
- return &s_rule_info->borrowing[i];
+ for (int i = 0; i < s_rule_info->borrowing_num; i++) {
+ if (s_rule_info->borrowing[i].priority == priority) {
+ pf_container[num].pf_type = SHAPING_PROFILE_TYPE_BORROW;
+ pf_container[num].pf_info = &s_rule_info->borrowing[i];
+ num++;
+ }
+ }
+
+ return num;
+ } else {
+ if (s_rule_info->primary.priority == priority) {
+ pf_container[0].pf_type = SHAPING_PROFILE_TYPE_PRIMARY;
+ pf_container[0].pf_info = &s_rule_info->primary;
+ return 1;
+ }
+
+ for (int i = 0; i < s_rule_info->borrowing_num; i++) {
+ if (s_rule_info->borrowing[i].priority == priority) {
+ pf_container[0].pf_type = SHAPING_PROFILE_TYPE_BORROW;
+ pf_container[0].pf_info = &s_rule_info->borrowing[i];
+ return 1;
+ }
}
}
- return NULL;
+ return num;
}
static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char direction)
@@ -516,37 +535,85 @@ static int shaper_next_anchor_get(struct shaping_flow *sf, unsigned char directi
return anchor;
}
-enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct shaping_flow *sf, struct shaper *sp,
- int priority, struct shaping_stat_data **stat_hashtbl, int sf_in_queue)
+static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int priority)
{
- int profile_type = 0;
- struct shaping_profile_info *profile = NULL;
struct shaping_rule_info *rule = NULL;
+ struct shaping_profile_info *profile = NULL;
+ int profile_type;
struct shaping_packet_wrapper *pkt_wrapper = NULL;
+ struct shaping_profile_container pf_container[SHAPING_PRIORITY_NUM_MAX];
struct timespec curr_time;
unsigned long long enqueue_time;
+ int get_token_success = 0;
+ int profile_num;
rule = &sf->matched_rule_infos[sf->anchor];
- profile = shaper_profile_get(rule, priority, &profile_type);
- assert(profile != NULL);
+ profile_num = shaper_profile_get(rule, priority, pf_container);
+ assert(profile_num > 0);
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
if (pkt_wrapper->tcp_pure_contorl) {
- if (sf_in_queue) {
- shaper_flow_pop(sp, sf, stat_hashtbl);
- }
- shaper_stat_forward_all_rule_inc(stat_hashtbl, sf, pkt_wrapper->direction, pkt_wrapper->length);
+ shaper_flow_pop(ctx, sf);
+ shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
return SHAPING_FORWARD;
}
- if (0 == shaper_token_consume(db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) {
- shaper_stat_forward_inc(stat_hashtbl, rule->id, profile->id, profile->priority,
- pkt_wrapper->direction, pkt_wrapper->length, profile_type);
-
- if (sf_in_queue) {
- shaper_flow_pop(sp, sf, stat_hashtbl);
+ for (int i = 0; i < profile_num; i++) {
+ profile = pf_container[i].pf_info;
+ profile_type = pf_container[i].pf_type;
+ if (0 == shaper_token_consume(ctx->swarmkv_db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) {
+ shaper_stat_forward_inc(ctx->stat, rule->id, profile->id, profile->priority,
+ pkt_wrapper->direction, pkt_wrapper->length, profile_type, ctx->thread_index);
+ get_token_success = 1;
+ break;
}
+ }
+
+ if (!get_token_success) {
+ return SHAPING_QUEUED;
+ }
+
+ shaper_flow_pop(ctx, sf);
+ sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction);
+ if (sf->anchor == 0) {//no next rule
+ return SHAPING_FORWARD;
+ }
+
+ //push sf for next rule
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ enqueue_time = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+ if (0 == shaper_flow_push(ctx, sf, enqueue_time)) {
+ return SHAPING_QUEUED;
+ } else {
+ rule = &sf->matched_rule_infos[sf->anchor];
+ shaper_stat_drop_inc(ctx->stat, rule->id, rule->primary.id,
+ rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
+ sf->anchor = 0;
+ return SHAPING_DROP;
+ }
+}
+
+static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile)
+{
+ int profile_type = SHAPING_PROFILE_TYPE_PRIMARY;
+ struct shaping_rule_info *rule = NULL;
+ struct shaping_packet_wrapper *pkt_wrapper = NULL;
+ struct timespec curr_time;
+ unsigned long long enqueue_time;
+
+ rule = &sf->matched_rule_infos[sf->anchor];
+ pkt_wrapper = shaper_first_pkt_get(sf);
+ assert(pkt_wrapper != NULL);
+
+ if (pkt_wrapper->tcp_pure_contorl) {
+ shaper_stat_forward_all_rule_inc(ctx->stat, sf, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
+ return SHAPING_FORWARD;
+ }
+
+ if (0 == shaper_token_consume(ctx->swarmkv_db, sf, pkt_wrapper->length, profile, profile_type, pkt_wrapper->direction)) {
+ shaper_stat_forward_inc(ctx->stat, rule->id, profile->id, profile->priority,
+ pkt_wrapper->direction, pkt_wrapper->length, profile_type, ctx->thread_index);
sf->anchor = shaper_next_anchor_get(sf, pkt_wrapper->direction);
if (sf->anchor == 0) {//no next rule
@@ -557,29 +624,24 @@ enum shaping_packet_action shaper_pkt_action_decide(struct swarmkv *db, struct s
goto FLOW_PUSH;
}
} else {
- if (sf_in_queue) {
- return SHAPING_QUEUED;
- } else {
- enqueue_time = pkt_wrapper->enqueue_time_us;
- goto FLOW_PUSH;
- }
+ enqueue_time = pkt_wrapper->enqueue_time_us;
+ goto FLOW_PUSH;
}
FLOW_PUSH:
- if (0 == shaper_flow_push(sf, sp, stat_hashtbl, enqueue_time)) {
+ if (0 == shaper_flow_push(ctx, sf, enqueue_time)) {
return SHAPING_QUEUED;
} else {
rule = &sf->matched_rule_infos[sf->anchor];
- shaper_stat_drop_inc(stat_hashtbl, rule->id, rule->primary.id,
- rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length);
-
+ shaper_stat_drop_inc(ctx->stat, rule->id, rule->primary.id,
+ rule->primary.priority, pkt_wrapper->direction, pkt_wrapper->length, ctx->thread_index);
sf->anchor = 0;
return SHAPING_DROP;
}
}
static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_flow *sf, int priority,
- struct shaping_stat_data **stat_hashtbl, struct shaping_thread_ctx *ctx)
+ struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
{
struct shaping_packet_wrapper *pkt_wrapper;
struct shaping_rule_info *rule = NULL;
@@ -600,7 +662,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
shaping_ret = shaper_pkt_action_decide(g_swarmkv_db, sf, sp, priority, stat_hashtbl, 1);
}
#endif
- shaping_ret = shaper_pkt_action_decide(ctx->swarmkv_db, sf, sp, priority, stat_hashtbl, 1);
+ shaping_ret = shaper_pkt_action_decide_queueing(ctx, sf, priority);
switch (shaping_ret) {
case SHAPING_QUEUED:
@@ -632,16 +694,16 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
pkt_wrapper = shaper_first_pkt_get(sf);
sf->anchor = 0;
- if (0 == shaper_flow_push(sf, sp, stat_hashtbl, pkt_wrapper->enqueue_time_us)) {
+ if (0 == shaper_flow_push(ctx, sf, pkt_wrapper->enqueue_time_us)) {
/*in pkt process, when queue not empty,
new pkt's queueing stat was added to primary profile of first rule.
while shaper_flow_push() here will add queueing stat to every profile of first rule,
so need adjust queueing stat here*/
rule = &sf->matched_rule_infos[sf->anchor];
- shaper_stat_queueing_pkt_dec(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority,
- pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_queueing_pkt_dec(stat, rule->id, rule->primary.id, rule->primary.priority,
+ pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
} else {
- shaper_queue_clear(sf, stat_hashtbl, ctx);//first packet fail, then every packet will fail
+ shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail
if (sf->flag & STREAM_CLOSE) {
shaping_flow_free(sf);
}
@@ -650,12 +712,10 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
}
}
-void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf)
+void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct shaping_flow *sf)
{
- int priority;
int shaping_ret;
struct shaping_rule_info *s_rule;
- struct shaper *sp = ctx->sp;
struct shaping_stat *stat = ctx->stat;
struct shaping_marsio_info *marsio_info = ctx->marsio_info;
struct timespec curr_time;
@@ -664,19 +724,18 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
if (!shaper_queue_empty(sf)) {//already have queueing pkt, enqueue directly
s_rule = &sf->matched_rule_infos[0];
if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, &curr_time, meta)) {
- shaper_stat_queueing_pkt_inc(&stat->stat_hashtbl, s_rule->id,
+ shaper_stat_queueing_pkt_inc(stat, s_rule->id,
s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len,
- SHAPING_PROFILE_TYPE_PRIMARY);
+ SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
} else {
- shaper_stat_drop_inc(&stat->stat_hashtbl, s_rule->id,
- s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len);
+ shaper_stat_drop_inc(stat, s_rule->id, s_rule->primary.id, s_rule->primary.priority, meta->dir, meta->raw_len, ctx->thread_index);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
}
} else {
if (meta->is_tcp_pure_ctrl) {
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
- shaper_stat_forward_all_rule_inc(&stat->stat_hashtbl, sf, meta->dir, meta->raw_len);
+ shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
goto JUDGE_CLOSE;//for tcp pure control pkt, transmit it directly
}
@@ -688,9 +747,7 @@ void shaping_stream_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
sf->anchor = 0;
- priority = sf->matched_rule_infos[sf->anchor].primary.priority;
- shaping_ret = shaper_pkt_action_decide(ctx->swarmkv_db, sf, sp, priority,
- &stat->stat_hashtbl, 0);
+ shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, &sf->matched_rule_infos[sf->anchor].primary);
switch (shaping_ret) {
case SHAPING_QUEUED:
break;
@@ -716,8 +773,6 @@ JUDGE_CLOSE:
}
}
- shaper_stat_send(stat, &stat->stat_hashtbl);
-
return;
}
@@ -734,16 +789,13 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
}
for (int j = 0; j < sf_num; j++) {
- ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, &stat->stat_hashtbl, ctx);
+ ret = shaper_polling_first_pkt_token_get(sp, sf_ins[j].sf, sf_ins[j].priority, stat, ctx);
if (ret == 0) {
- goto STAT_DATA_SEND;
+ return;
}
}
}
-STAT_DATA_SEND:
- shaper_stat_send(stat, &stat->stat_hashtbl);
-
return;
}
@@ -811,40 +863,44 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_
void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
{
- marsio_buff_t *rx_buff;
+ marsio_buff_t *rx_buff[SHAPER_MARSIO_RX_BRUST_MAX];
struct shaping_flow *sf = NULL;
struct metadata meta;
int rx_num;
+ int i;
- rx_num = marsio_recv_burst(ctx->marsio_info->mr_dev, ctx->thread_index, &rx_buff, 1);
+ rx_num = marsio_recv_burst(ctx->marsio_info->mr_dev, ctx->thread_index, rx_buff, ctx->marsio_info->rx_brust_max);
if (rx_num <= 0) {
polling_entry(ctx->sp, ctx->stat, ctx);
return;
}
- if (marsio_buff_is_ctrlbuf(rx_buff)) {
- sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff, &meta);
- } else {
- sf = shaper_raw_pkt_session_handle(ctx, rx_buff, &meta);
- }
+ for (i = 0; i < rx_num; i++) {
+ if (marsio_buff_is_ctrlbuf(rx_buff[i])) {
+ sf = shaper_ctrl_pkt_session_handle(ctx, rx_buff[i], &meta);
+ } else {
+ sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta);
+ }
- if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly
- marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
- } else {
- shaping_stream_process(ctx, rx_buff, &meta, sf);
+ if (meta.is_ctrl_pkt || !sf) {//ctrl pkt need send directly
+ marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1);
+ } else {
+ shaping_packet_process(ctx, rx_buff[i], &meta, sf);
+ }
+ polling_entry(ctx->sp, ctx->stat, ctx);
}
- polling_entry(ctx->sp, ctx->stat, ctx);
return;
}
-int shaper_global_conf_init(struct shaping_global_conf *conf)
+int shaper_global_conf_init(struct shaping_system_conf *conf)
{
int ret;
int array_num;
cJSON *json = NULL;
cJSON *tmp_obj = NULL, *tmp_array_obj = NULL;
char polling_node_num_max[128] = {0};
+ unsigned int cpu_mask[SHAPING_WROK_THREAD_NUM_MAX] = {0};
ret = MESA_load_profile_int_nodef(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "WORK_THREAD_NUM", &conf->work_thread_num);
if (ret < 0) {
@@ -859,11 +915,14 @@ int shaper_global_conf_init(struct shaping_global_conf *conf)
return ret;
}
- ret = MESA_load_profile_uint_range(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "CPU_AFFINITY_MASK", SHAPING_WROK_THREAD_NUM_MAX, (unsigned int *)conf->cpu_affinity_mask);
+ ret = MESA_load_profile_uint_range(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "CPU_AFFINITY_MASK", SHAPING_WROK_THREAD_NUM_MAX, cpu_mask);
if (ret < 0 || ret != conf->work_thread_num) {
LOG_ERROR("%s: shaping init global conf get CPU_AFFINITY_MASK failed or incomplete config", LOG_TAG_SHAPING);
return -1;
}
+ for (int i = 0; i < conf->work_thread_num; i++) {
+ conf->cpu_affinity_mask |= 1 << cpu_mask[i];
+ }
#if 0 //temporarily not support array config
array_num = SHAPING_PRIORITY_NUM_MAX;
@@ -921,10 +980,6 @@ int shaper_global_conf_init(struct shaping_global_conf *conf)
}
/*************************************************************************/
-
- MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1");
- MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_PORT", &conf->telegraf_port, 6379);
-
MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "SESSION_QUEUE_LEN_MAX", &conf->session_queue_len_max, 128);
MESA_load_profile_uint_def(SHAPING_GLOBAL_CONF_FILE, "CONFIG", "PRIORITY_QUEUE_LEN_MAX", &conf->priority_queue_len_max, 1024);
@@ -945,11 +1000,11 @@ void shaping_engine_destroy(struct shaping_ctx *ctx)
shaper_swarmkv_destroy(ctx->swarmkv_db);
shaper_maat_destroy(ctx->maat_info);
shaper_marsio_destroy(ctx->marsio_info);
+ shaper_stat_destroy(ctx->stat);
if (ctx->thread_ctx) {
for (int i = 0; i < ctx->thread_num; i++) {
shaper_free(ctx->thread_ctx[i].sp);
- shaper_stat_send_free(ctx->thread_ctx[i].stat);
session_table_destory(ctx->thread_ctx[i].session_table);
}
free(ctx->thread_ctx);
@@ -963,7 +1018,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx)
struct shaping_ctx *shaping_engine_init()
{
- struct shaping_global_conf conf;
+ struct shaping_system_conf conf;
struct shaping_ctx *ctx = NULL;
int ret;
@@ -994,22 +1049,26 @@ struct shaping_ctx *shaping_engine_init()
}
/*init marsio*/
- ctx->marsio_info = shaper_marsio_init(conf.work_thread_num);
+ ctx->marsio_info = shaper_marsio_init(&conf);
if (ctx->marsio_info == NULL) {
goto ERROR;
}
+
+ ctx->stat = shaper_stat_init(conf.work_thread_num);
+ if (ctx->stat == NULL) {
+ goto ERROR;
+ }
ctx->thread_ctx = (struct shaping_thread_ctx *)calloc(conf.work_thread_num, sizeof(struct shaping_thread_ctx));
ctx->thread_num = conf.work_thread_num;
for (int i = 0; i < conf.work_thread_num; i++) {
ctx->thread_ctx[i].thread_index = i;
ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max);
- ctx->thread_ctx[i].stat = shaper_stat_new(conf.telegraf_ip, conf.telegraf_port);
+ ctx->thread_ctx[i].stat = ctx->stat;
ctx->thread_ctx[i].session_table = session_table_create();
ctx->thread_ctx[i].maat_info = ctx->maat_info;
ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db;
- ctx->thread_ctx[i].cpu_mask = conf.cpu_affinity_enable ? conf.cpu_affinity_mask[i] : -1;
ctx->thread_ctx[i].ref_ctx = ctx;
ctx->thread_ctx[i].session_queue_len_max = conf.session_queue_len_max;
memcpy(ctx->thread_ctx[i].polling_node_num_max, conf.polling_node_num_max, sizeof(conf.polling_node_num_max));
diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp
index 7d62314..1849987 100644
--- a/shaping/src/shaper_maat.cpp
+++ b/shaping/src/shaper_maat.cpp
@@ -283,12 +283,12 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_ru
goto END;
}
- if (s_rule->priority + i + 1 < SHAPING_PRIORITY_NUM_MAX) {//TODO: 优先级大于9的都按9处理
+ if (s_rule->priority + i + 1 < SHAPING_PRIORITY_NUM_MAX) {
shaper_profile_update(&s_rule_info->borrowing[i], s_pf, s_rule->priority + i + 1);
- s_rule_info->borrowing_num++;
} else {
- goto END;
+ shaper_profile_update(&s_rule_info->borrowing[i], s_pf, SHAPING_PRIORITY_NUM_MAX - 1);
}
+ s_rule_info->borrowing_num++;
}
END:
diff --git a/shaping/src/shaper_marsio.cpp b/shaping/src/shaper_marsio.cpp
index 3ea774d..cbc5d09 100644
--- a/shaping/src/shaper_marsio.cpp
+++ b/shaping/src/shaper_marsio.cpp
@@ -1,6 +1,7 @@
#include <MESA_prof_load.h>
#include <MESA/MESA_handle_logger.h>
#include <cjson/cJSON.h>
+#include <marsio.h>
#include "log.h"
#include "raw_packet.h"
@@ -10,6 +11,7 @@
struct shaper_marsio_config
{
+ int rx_brust_max;
char app_symbol[256];
char dev_interface[256];
};
@@ -29,6 +31,9 @@ static int shaper_marsio_config_load(struct shaper_marsio_config *conf)
LOG_ERROR("%s: shaping load MARSIO conf APP_SYMBOL failed", LOG_TAG_MARSIO);
return ret;
}
+
+ ret = MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "SYSTEM", "RX_BRUST_MAX", &conf->rx_brust_max, 1);
+ conf->rx_brust_max = conf->rx_brust_max <= SHAPER_MARSIO_RX_BRUST_MAX ? conf->rx_brust_max : SHAPER_MARSIO_RX_BRUST_MAX;
return 0;
}
@@ -56,7 +61,7 @@ void shaper_marsio_destroy(struct shaping_marsio_info *marsio_info)
return;
}
-struct shaping_marsio_info* shaper_marsio_init(int thread_num)
+struct shaping_marsio_info* shaper_marsio_init(struct shaping_system_conf *sys_conf)
{
struct shaper_marsio_config conf;
struct shaping_marsio_info *marsio_info;
@@ -75,18 +80,24 @@ struct shaping_marsio_info* shaper_marsio_init(int thread_num)
goto ERROR;
}
- if (marsio_option_set(marsio_info->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0)
- {
+ if (marsio_option_set(marsio_info->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0) {
LOG_ERROR("%s: shaping marsio set MARSIO_OPT_EXIT_WHEN_ERR failed", LOG_TAG_MARSIO);
goto ERROR;
}
+ if (sys_conf->cpu_affinity_enable) {
+ if (marsio_option_set(marsio_info->instance, MARSIO_OPT_THREAD_MASK, &sys_conf->cpu_affinity_mask, sizeof(sys_conf->cpu_affinity_mask)) != 0) {
+ LOG_ERROR("%s: shaping marsio set MARSIO_OPT_THREAD_MASK failed", LOG_TAG_MARSIO);
+ goto ERROR;
+ }
+ }
+
if (marsio_init(marsio_info->instance, conf.app_symbol) != 0) {
LOG_ERROR("%s: shaping marsio init failed", LOG_TAG_MARSIO);
goto ERROR;
}
- marsio_info->mr_dev = marsio_open_device(marsio_info->instance, conf.dev_interface, thread_num, thread_num);
+ marsio_info->mr_dev = marsio_open_device(marsio_info->instance, conf.dev_interface, sys_conf->work_thread_num, sys_conf->work_thread_num);
if (!marsio_info->mr_dev) {
LOG_ERROR("%s: shaping marsio open device %s failed", LOG_TAG_MARSIO, conf.dev_interface);
goto ERROR;
@@ -98,6 +109,8 @@ struct shaping_marsio_info* shaper_marsio_init(int thread_num)
goto ERROR;
}
+ marsio_info->rx_brust_max = conf.rx_brust_max;
+
return marsio_info;
ERROR:
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index badcea9..6194a90 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -80,7 +80,7 @@ void shaper_session_data_free_cb(void *session_data, void *data)
struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data;
if (sf) {
- shaper_queue_clear(sf, &ctx->stat->stat_hashtbl, ctx);
+ shaper_queue_clear(sf, ctx);
shaping_flow_free(sf);
}
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index 9ed1669..6c9644b 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -2,297 +2,231 @@
#include <time.h>
#include <sys/socket.h>
#include <arpa/inet.h>
+#include <MESA/MESA_prof_load.h>
+#include <fieldstat.h>
-#include <MESA/stream.h>
-
+#include "log.h"
+#include "utils.h"
#include "shaper.h"
#include "shaper_stat.h"
+struct shaper_stat_conf {
+ int enable_backgroud_thread;
+ int output_interval_ms;
+ char telegraf_ip[16];
+ short telegraf_port;
+};
-#define SHAPING_STAT_SEND_INTERVAL_SEC 1 //unit: second
-#define SHAPING_STAT_SEND_INTERVAL_NS 500000000 //unit: nano second
-
-#define SHAPING_STAT_FORMAT "SHAPING-STAT,rule_id=%d,profile_id=%d,priority=%d,profile_type=%s "\
- "queueing_sessions=%d,in_rx_pkts=%llu,in_rx_bytes=%llu,"\
- "in_tx_pkts=%llu,in_tx_bytes=%llu,in_drop_pkts=%llu,in_max_latency_us=%llu,in_queue_len=%lld,"\
- "out_rx_pkts=%llu,out_rx_bytes=%llu,out_tx_pkts=%llu,out_tx_bytes=%llu,"\
- "out_drop_pkts=%llu,out_max_latency_us=%llu,out_queue_len=%lld"
-
-static void shaper_stat_counter_clear(struct shaping_stat_data *s)
-{
- long long in_queue_len, out_queue_len;
- struct shaping_stat_data_dir *in = &s->incoming;
- struct shaping_stat_data_dir *out = &s->outgoing;
-
- in_queue_len = in->queue_len;//queue_len is gauge metric, do not clear
- out_queue_len = out->queue_len;
-
- memset(in, 0, sizeof(struct shaping_stat_data_dir));
- memset(out, 0, sizeof(struct shaping_stat_data_dir));
-
- in->queue_len = in_queue_len;
- out->queue_len = out_queue_len;
-
- return;
-}
-
-static void shaper_stat_data_send(struct shaping_stat *stat, struct shaping_stat_data *s)
-{
- char buf[1024];
- struct shaping_stat_data_dir *in = &s->incoming;
- struct shaping_stat_data_dir *out = &s->outgoing;
-
- snprintf(buf, sizeof(buf), SHAPING_STAT_FORMAT, s->key.rule_id, s->key.profile_id, s->key.priority,
- s->key.profile_type == SHAPING_PROFILE_TYPE_PRIMARY ? "primary" : "borrow",
- s->queueing_session_num, in->rx_pkts, in->rx_bytes, in->tx_pkts, in->tx_bytes,
- in->drop_pkts, in->max_latency, in->queue_len, out->rx_pkts, out->rx_bytes, out->tx_pkts, out->tx_bytes,
- out->drop_pkts, out->max_latency, out->queue_len);
-
- sendto(stat->sock_fd, buf, strlen(buf), 0, (struct sockaddr*)&stat->sock_addr, sizeof(stat->sock_addr));
-
- shaper_stat_counter_clear(s);
- return;
-}
+thread_local struct fieldstat_tag tags[TAG_IDX_MAX];
-static void shaper_stat_data_send_free(struct shaping_stat *stat, struct shaping_stat_data **stat_hashtbl)
+void shaper_stat_destroy(struct shaping_stat *stat)
{
- struct shaping_stat_data *s, *tmp = NULL;
-
- if (!stat || !*stat_hashtbl) {
+ if (!stat) {
return;
}
- HASH_ITER(hh, *stat_hashtbl, s, tmp) {
- shaper_stat_data_send(stat, s);
- HASH_DEL(*stat_hashtbl, s);
- free(s);
+ if (stat->instance) {
+ fieldstat_dynamic_instance_free(stat->instance);
}
+ free(stat);
return;
}
-void shaper_stat_send_free(struct shaping_stat *stat)
+static int shaper_stat_conf_load(struct shaper_stat_conf *conf)
{
- if (!stat) {
- return;
- }
+ memset(conf, 0, sizeof(struct shaper_stat_conf));
- if (stat->stat_hashtbl) {
- shaper_stat_data_send_free(stat, &stat->stat_hashtbl);
- }
- free(stat);
+ MESA_load_profile_string_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_IP", conf->telegraf_ip, sizeof(conf->telegraf_ip), "127.0.0.1");
+ MESA_load_profile_short_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "TELEGRAF_PORT", &conf->telegraf_port, 6379);
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_OUTPUT_INTERVAL_MS", &conf->output_interval_ms, 500);
+ MESA_load_profile_int_def(SHAPING_GLOBAL_CONF_FILE, "METRIC", "FIELDSTAT_ENABLE_BACKGRUND_THREAD", &conf->enable_backgroud_thread, 1);
- return;
+ return 0;
}
-void shaper_stat_send(struct shaping_stat *stat, struct shaping_stat_data **stat_hashtbl)
+struct shaping_stat* shaper_stat_init(int thread_num)
{
- struct shaping_stat_data *s, *tmp = NULL;
- struct timespec curr_time;
-
- if (!stat || !*stat_hashtbl) {
- return;
+ struct shaping_stat *stat = NULL;
+ int column_num;
+ struct shaper_stat_conf conf;
+ const char *column_name[] = {"queueing_sessions", "in_max_latency_us", "in_queue_len", "out_max_latency_us", "out_queue_len", //first line is gauge, second line is counter
+ "in_pkts", "in_bytes", "in_drop_pkts", "out_pkts", "out_bytes", "out_drop_pkts"};
+ enum field_type column_type[] = {FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE, FIELD_TYPE_GAUGE,
+ FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER};
+
+ column_num = sizeof(column_name)/sizeof(column_name[0]);
+ if (column_num != STAT_COLUNM_IDX_MAX) {
+ LOG_ERROR("%s: shaping init fieldstat failed, column_num %d != index num %d", LOG_TAG_STAT, column_num, STAT_COLUNM_IDX_MAX);
+ goto ERROR;
}
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
- if (curr_time.tv_sec - stat->update_time.tv_sec >= SHAPING_STAT_SEND_INTERVAL_SEC||
- curr_time.tv_nsec - stat->update_time.tv_nsec >= SHAPING_STAT_SEND_INTERVAL_NS) {
- stat->update_time = curr_time;
- HASH_ITER(hh, *stat_hashtbl, s, tmp) {
- shaper_stat_data_send(stat, s);
- }
+ if (shaper_stat_conf_load(&conf) != 0) {
+ LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT);
+ goto ERROR;
}
- return;
-}
+ stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
-struct shaping_stat* shaper_stat_new(char *telegraf_ip, short telegraf_port)
-{
- struct shaping_stat *stat = NULL;
- struct timespec curr_time;
+ stat->instance = fieldstat_dynamic_instance_new("shaping_engine", thread_num);
+ if (stat->instance == NULL) {
+ LOG_ERROR("%s: shaping init fieldstat instance failed", LOG_TAG_STAT);
+ goto ERROR;
+ }
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ fieldstat_dynamic_set_output_interval(stat->instance, conf.output_interval_ms);
+ fieldstat_dynamic_set_line_protocol_server(stat->instance, conf.telegraf_ip, conf.telegraf_port);
+ if (conf.enable_backgroud_thread == 0) {
+ fieldstat_dynamic_disable_background_thread(stat->instance);
+ }
- stat = (struct shaping_stat *)calloc(1, sizeof(struct shaping_stat));
+ stat->table_id = fieldstat_register_dynamic_table(stat->instance, "shaping_metric", column_name, column_type, column_num, stat->column_ids);
+ if (stat->table_id < 0) {
+ LOG_ERROR("%s: shaping fieldstat register table failed", LOG_TAG_STAT);
+ goto ERROR;
+ }
+
+ tags[TAG_RULE_ID_IDX].key = "rule_id";
+ tags[TAG_RULE_ID_IDX].value_type = 0;
+ tags[TAG_PROFILE_ID_IDX].key = "profile_id";
+ tags[TAG_PROFILE_ID_IDX].value_type = 0;
+ tags[TAG_PRIORITY_IDX].key = "priority";
+ tags[TAG_PRIORITY_IDX].value_type = 0;
+ tags[TAG_PROFILE_TYPE_IDX].key = "profile_type";
+ tags[TAG_PROFILE_TYPE_IDX].value_type = 2;
- stat->sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- stat->sock_addr.sin_family = AF_INET;
- stat->sock_addr.sin_port = htons(telegraf_port);
- stat->sock_addr.sin_addr.s_addr = inet_addr(telegraf_ip);
- stat->update_time = curr_time;
+ fieldstat_dynamic_instance_start(stat->instance);
return stat;
+
+ERROR:
+ if (stat) {
+ if (stat->instance) {
+ fieldstat_dynamic_instance_free(stat->instance);
+ }
+ free(stat);
+ }
+ return NULL;
}
-static struct shaping_stat_data *shaper_stat_ins_get(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type)
+static void shaper_stat_tags_build(int rule_id, int profile_id, int priority, int profile_type)
{
- struct shaping_stat_data *s_stat_data = NULL;
- struct shaping_stat_data_key key;
- memset(&key, 0, sizeof(key));//important for uthash opration
- key.rule_id = rule_id;
- key.profile_id = profile_id;
- key.priority = priority;
- key.profile_type = profile_type;
+ tags[TAG_RULE_ID_IDX].value_int = rule_id;
- HASH_FIND(hh, *stat_hashtbl, &key, sizeof(struct shaping_stat_data_key), s_stat_data);
- if (!s_stat_data) {
- s_stat_data = (struct shaping_stat_data *)calloc(1, sizeof(struct shaping_stat_data));
+ tags[TAG_PROFILE_ID_IDX].value_int = profile_id;
- memcpy(&s_stat_data->key, &key, sizeof(key));
+ tags[TAG_PRIORITY_IDX].value_int = priority;
- HASH_ADD(hh, *stat_hashtbl, key, sizeof(struct shaping_stat_data_key), s_stat_data);
+ if (profile_type == SHAPING_PROFILE_TYPE_PRIMARY) {
+ tags[TAG_PROFILE_TYPE_IDX].value_str = "primary";
+ } else {
+ tags[TAG_PROFILE_TYPE_IDX].value_str = "borrow";
}
- return s_stat_data;
+ return;
}
-void shaper_stat_drop_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, int pkt_len)
+void shaper_stat_drop_inc(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, int pkt_len, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_tags_build(rule_id, profile_id, priority, SHAPING_PROFILE_TYPE_PRIMARY);
if (direction == SHAPING_DIR_IN) {
- s_stat_data->incoming.drop_pkts++;
- s_stat_data->incoming.rx_pkts++;
- s_stat_data->incoming.rx_bytes += pkt_len;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_DROP_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
} else {
- s_stat_data->outgoing.drop_pkts++;
- s_stat_data->outgoing.rx_pkts++;
- s_stat_data->outgoing.rx_bytes += pkt_len;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_DROP_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
}
return;
}
-void shaper_stat_forward_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, int pkt_len, int profile_type)
+void shaper_stat_forward_inc(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
if (direction == SHAPING_DIR_IN) {
- s_stat_data->incoming.tx_pkts++;
- s_stat_data->incoming.tx_bytes += pkt_len;
- s_stat_data->incoming.rx_pkts++;
- s_stat_data->incoming.rx_bytes += pkt_len;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_BYTES_IDX], "shaping_metric_row", pkt_len, tags, TAG_IDX_MAX, thread_id);
} else {
- s_stat_data->outgoing.tx_pkts++;
- s_stat_data->outgoing.tx_bytes += pkt_len;
- s_stat_data->outgoing.rx_pkts++;
- s_stat_data->outgoing.rx_bytes += pkt_len;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_PKTS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_BYTES_IDX], "shaping_metric_row", pkt_len, tags, TAG_IDX_MAX, thread_id);
}
return;
}
-void shaper_stat_forward_all_rule_inc(struct shaping_stat_data **stat_hashtbl, struct shaping_flow *sf, unsigned char direction, int pkt_len)
+void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_flow *sf, unsigned char direction, int pkt_len, int thread_id)
{
struct shaping_rule_info *rule;
int i;
for (i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
- shaper_stat_forward_inc(stat_hashtbl, rule->id, rule->primary.id, rule->primary.priority, direction, pkt_len, SHAPING_PROFILE_TYPE_PRIMARY);
+ shaper_stat_forward_inc(stat, rule->id, rule->primary.id, rule->primary.priority, direction, pkt_len, SHAPING_PROFILE_TYPE_PRIMARY, thread_id);
}
return;
}
-void shaper_stat_queueing_session_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type)
+void shaper_stat_queueing_session_inc(struct shaping_stat *stat, int rule_id, int profile_id, int priority, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
-
- s_stat_data->queueing_session_num++;
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[QUEUEING_SESSIONS_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
return;
}
-void shaper_stat_queueing_session_dec(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id, int priority, int profile_type)
+void shaper_stat_queueing_session_dec(struct shaping_stat *stat, int rule_id, int profile_id, int priority, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
-
- s_stat_data->queueing_session_num--;
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[QUEUEING_SESSIONS_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id);
return;
}
-void shaper_stat_queueing_pkt_inc(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, int pkt_len, int profile_type)
+void shaper_stat_queueing_pkt_inc(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
-
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
if (direction == SHAPING_DIR_IN) {
- s_stat_data->incoming.rx_pkts++;
- s_stat_data->incoming.rx_bytes += pkt_len;
- s_stat_data->incoming.queue_len++;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
} else {
- s_stat_data->outgoing.rx_pkts++;
- s_stat_data->outgoing.rx_bytes += pkt_len;
- s_stat_data->outgoing.queue_len++;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], "shaping_metric_row", 1, tags, TAG_IDX_MAX, thread_id);
}
return;
}
-void shaper_stat_queueing_pkt_dec(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, int pkt_len, int profile_type)
+void shaper_stat_queueing_pkt_dec(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, int pkt_len, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
-
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
if (direction == SHAPING_DIR_IN) {
- s_stat_data->incoming.rx_pkts--;
- s_stat_data->incoming.rx_bytes -= pkt_len;
- s_stat_data->incoming.queue_len--;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id);
} else {
- s_stat_data->outgoing.rx_pkts--;
- s_stat_data->outgoing.rx_bytes -= pkt_len;
- s_stat_data->outgoing.queue_len--;
+ fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], "shaping_metric_row", -1, tags, TAG_IDX_MAX, thread_id);
}
return;
}
-void shaper_stat_max_latency_update(struct shaping_stat_data **stat_hashtbl, int rule_id, int profile_id,
- int priority, unsigned char direction, unsigned long long latency, int profile_type)
+void shaper_stat_max_latency_update(struct shaping_stat *stat, int rule_id, int profile_id,
+ int priority, unsigned char direction, unsigned long long latency, int profile_type, int thread_id)
{
- struct shaping_stat_data *s_stat_data = NULL;
-
- s_stat_data = shaper_stat_ins_get(stat_hashtbl, rule_id, profile_id, priority, profile_type);
+ unsigned long long old_latency;
+ shaper_stat_tags_build(rule_id, profile_id, priority, profile_type);
if (direction == SHAPING_DIR_IN) {
- if (latency > s_stat_data->incoming.max_latency) {
- s_stat_data->incoming.max_latency = latency;
+ old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], "shaping_metric_row", tags, TAG_IDX_MAX, thread_id);
+ if (latency > old_latency) {
+ fieldstat_dynamic_table_metric_value_set(stat->instance, stat->table_id, stat->column_ids[IN_MAX_LATENCY_IDX], "shaping_metric_row", latency, tags, TAG_IDX_MAX, thread_id);
}
} else {
- if (latency > s_stat_data->outgoing.max_latency) {
- s_stat_data->outgoing.max_latency = latency;
+ old_latency = fieldstat_dynamic_table_metric_value_get(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], "shaping_metric_row", tags, TAG_IDX_MAX, thread_id);
+ if (latency > old_latency) {
+ fieldstat_dynamic_table_metric_value_set(stat->instance, stat->table_id, stat->column_ids[OUT_MAX_LATENCY_IDX], "shaping_metric_row", latency, tags, TAG_IDX_MAX, thread_id);
}
}
return;
-}
-
-#if 0
-/*********just for self test stub****************/
-void stub_shaper_stat_send(int thread_seq)
-{
- struct shaping_stat_data *s, *tmp;
-
- HASH_ITER(hh, g_rt_para.stat[thread_seq]->stat_hashtbl, s, tmp) {
- shaper_stat_data_send(g_rt_para.stat[thread_seq], s);
- }
-
- return;
-}
-/************************************************/
-#endif \ No newline at end of file
+} \ No newline at end of file
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 2c871a1..c0cfb6a 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -106,6 +106,7 @@ struct swarmkv* shaper_swarmkv_init()
swarmkv_options_set_health_check_port(swarmkv_opts, conf.swarmkv_health_check_port);
swarmkv_options_set_health_check_announce_port(swarmkv_opts, conf.swarmkv_health_check_announce_port);
swarmkv_options_set_log_path(swarmkv_opts, "log");
+ swarmkv_options_set_log_level(swarmkv_opts, 4);
swarmkv_db = swarmkv_open(swarmkv_opts, conf.swarmkv_cluster_name, &err);
if (err) {
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index 50aef6f..23d91a9 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -1,12 +1,21 @@
#include <cstring>
+#include <ctime>
+#include <fieldstat.h>
#include <gtest/gtest.h>
+#include <cjson/cJSON.h>
#include "shaper.h"
#include "shaper_maat.h"
+#include "shaper_stat.h"
#include "shaper_marsio.h"
#include "stub.h"
#define SHAPING_SESSION_QUEUE_LEN 128
+#define SHAPING_STAT_FILE_NAME "/tmp/shaping_metrics.json"
+#define FIELDSTAT_AUTO_TIME_MAX 999999000
+
+char profile_type_primary[] = "primary";
+char profile_type_borrow[] = "borrow";
static struct stub_packet* packet_new(unsigned long long income_time, unsigned int length, unsigned char dir)
{
@@ -54,13 +63,13 @@ static void send_packets(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
meta.is_tcp_pure_ctrl = 1;
}
- shaping_stream_process(ctx, packet, &meta, sf);
+ shaping_packet_process(ctx, packet, &meta, sf);
for (int j = 0; j < polling_times; j++) {
polling_entry(ctx->sp, ctx->stat, ctx);
}
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
return;
@@ -91,6 +100,70 @@ static int judge_packet_eq(struct stub_pkt_queue *expec_queue, struct stub_pkt_q
return 0;
}
+static void shaping_stat_judge(char *file_line, int rule_id, int profile_id, int priority,
+ unsigned long long tx_pkts, unsigned long long tx_bytes,
+ unsigned long long drop_pkts, long long queue_len, long long max_latency,
+ int queueing_sessions, unsigned char direction, char profile_type[])
+{
+ cJSON *json = NULL;
+ cJSON *tmp_obj = NULL;
+ char attr_name[32] = {0};
+
+ json = cJSON_Parse(file_line);
+ ASSERT_TRUE(json != NULL);
+
+ tmp_obj = cJSON_GetObjectItem(json, "rule_id");
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_EQ(rule_id, atoi(tmp_obj->valuestring));
+
+ tmp_obj = cJSON_GetObjectItem(json, "profile_id");
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_EQ(profile_id, atoi(tmp_obj->valuestring));
+
+ tmp_obj = cJSON_GetObjectItem(json, "priority");
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_EQ(priority, atoi(tmp_obj->valuestring));
+
+ tmp_obj = cJSON_GetObjectItem(json, "profile_type");
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_STREQ(tmp_obj->valuestring, profile_type);
+
+ tmp_obj = cJSON_GetObjectItem(json, "queueing_sessions");
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_EQ(queueing_sessions, tmp_obj->valueint);
+
+ snprintf(attr_name, sizeof(attr_name), "%s_pkts", direction == SHAPING_DIR_OUT ? "out" : "in");
+ tmp_obj = cJSON_GetObjectItem(json, attr_name);
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_EQ(tx_pkts, tmp_obj->valueint);
+
+ snprintf(attr_name, sizeof(attr_name), "%s_bytes", direction == SHAPING_DIR_OUT ? "out" : "in");
+ tmp_obj = cJSON_GetObjectItem(json, attr_name);
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_EQ(tx_bytes, tmp_obj->valueint);
+
+ snprintf(attr_name, sizeof(attr_name), "%s_drop_pkts", direction == SHAPING_DIR_OUT ? "out" : "in");
+ tmp_obj = cJSON_GetObjectItem(json, attr_name);
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_EQ(drop_pkts, tmp_obj->valueint);
+
+ if (max_latency != -1) {
+ snprintf(attr_name, sizeof(attr_name), "%s_max_latency_us", direction == SHAPING_DIR_OUT ? "out" : "in");
+ tmp_obj = cJSON_GetObjectItem(json, attr_name);
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_EQ(max_latency, tmp_obj->valueint);
+ }
+
+ snprintf(attr_name, sizeof(attr_name), "%s_queue_len", direction == SHAPING_DIR_OUT ? "out" : "in");
+ tmp_obj = cJSON_GetObjectItem(json, attr_name);
+ ASSERT_TRUE(tmp_obj != NULL);
+ EXPECT_EQ(queue_len, tmp_obj->valueint);
+
+ cJSON_Delete(json);
+
+ return;
+}
+
/*session1 match rule1
rule1:
profile: limit 1000*/
@@ -117,6 +190,7 @@ TEST(single_session, udp_tx_in_order)
actual_tx_queue = stub_get_tx_queue();
shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
+
/**********send packets*********************/
send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
/*******************************************/
@@ -129,18 +203,21 @@ TEST(single_session, udp_tx_in_order)
stub_refresh_token_bucket(0);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -149,11 +226,10 @@ TEST(single_session, udp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 100, 10000, 0, 0, 170, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is last 10 pkts
+ shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1
@@ -190,30 +266,35 @@ TEST(single_session, tcp_tx_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
-#if 0
+
/***********send stat data here********************/
- stub_shaper_stat_send(0);
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
sleep(2);//wait telegraf generate metric
-#endif
+
stub_refresh_token_bucket(0);
for (int i = 0; i < 10; i++) {//10 pkts which is not pure control
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
while (!TAILQ_EMPTY(&expec_tx_queue)) {//20 pure contorl pkts
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
+
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -222,15 +303,14 @@ TEST(single_session, tcp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 40, 4000, 10, 1000, 0, 30, 0, 1, DIR_ROUTE_UP, profile_type_primary);//max latency is first queueing pkts
+ shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 30, 0, 1, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 0, 0, 30, 3000, 0, 0, 30, 0, DIR_ROUTE_UP, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 30, 3000, 0, 0, 30 + (STUB_TIME_INC_FOR_METRIC_SEND / 1000), 0, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1
@@ -275,7 +355,7 @@ TEST(single_session, udp_diff_direction)
stub_refresh_token_bucket(0);
for (int i = 0; i < 20; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
//10 out packets
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
@@ -283,11 +363,15 @@ TEST(single_session, udp_diff_direction)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
+
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -296,12 +380,12 @@ TEST(single_session, udp_diff_direction)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 20, 2000, 20, 2000, 0, 0, 20, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is last 10 pkts
- shaping_stat_judge(line, 0, 0, 1, 20, 2000, 20, 2000, 0, 0, 20, 0, DIR_ROUTE_DOWN, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20, 0, SHAPING_DIR_OUT, profile_type_primary);
+
+ shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20, 0, SHAPING_DIR_IN, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1, rule2, rule3
@@ -351,17 +435,20 @@ TEST(single_session, udp_multi_rules)
//there are 3 rules, send one packet need 3 polling process, so 10 packets need 30 polling
//even though invoke polling more than 30 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -370,18 +457,17 @@ TEST(single_session, udp_multi_rules)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 100, 10000, 0, 0, 506, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is last pkt
+ shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 506, 0, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1
- shaping_stat_judge(line, 1, 1, 2, 100, 10000, 100, 10000, 0, 0, 1, 0, DIR_ROUTE_UP, profile_type_primary);//latency of every queued pkt is 1
+ shaping_stat_judge(line, 1, 1, 2, 100, 10000, 0, 0, 1, 0, SHAPING_DIR_OUT, profile_type_primary);//latency of every queued pkt is 1
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2
- shaping_stat_judge(line, 2, 2, 3, 100, 10000, 100, 10000, 0, 0, 90, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is first queued pkt
+ shaping_stat_judge(line, 2, 2, 3, 100, 10000, 0, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1
@@ -423,17 +509,94 @@ TEST(single_session, udp_borrow)
stub_refresh_token_bucket(2);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+ }
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
+ ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
+ }
+
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
+ shaping_flow_free(sf);
+ shaping_engine_destroy(ctx);
+ stub_clear_matched_shaping_rules();
+
+ /*******test statistics***********/
+ sleep(2);//wait telegraf to output
+ char line[1024];
+ FILE *stat_file;
+
+ stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
+ memset(line, 0, sizeof(line));
+ ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
+ shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_primary);
+
+ ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
+ shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_borrow);
+
+ fclose(stat_file);
+ stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
+ fclose(stat_file);
+}
+
+/*session1 match rule1
+ rule1:
+ priority: 9
+ profile1: limit 0
+ profile2: limit 0
+ profile3: limit 1000*/
+TEST(single_session, udp_borrow_same_priority_9)
+{
+ struct stub_pkt_queue expec_tx_queue;
+ struct stub_pkt_queue *actual_tx_queue;
+ struct shaping_ctx *ctx = NULL;
+ struct shaping_flow *sf = NULL;
+ long long rule_id[] = {1};
+ int priority[] = {9};
+ int profile_num[] = {3};
+ int profile_id[][MAX_REF_PROFILE] = {{1, 2, 3}};
+
+ TAILQ_INIT(&expec_tx_queue);
+ stub_init();
+ ctx = shaping_engine_init();
+ ASSERT_TRUE(ctx != NULL);
+ sf = shaping_flow_new();
+ ASSERT_TRUE(sf != NULL);
+
+ stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
+ stub_set_token_bucket_avl_per_sec(1, 0, SHAPING_DIR_OUT);
+ stub_set_token_bucket_avl_per_sec(2, 0, SHAPING_DIR_OUT);
+ stub_set_token_bucket_avl_per_sec(3, 1000, SHAPING_DIR_OUT);
+ actual_tx_queue = stub_get_tx_queue();
+ shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
+
+ /*******send packets***********/
+ send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
+
+
+ //first 10 packets
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
+
+ while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
+ stub_refresh_token_bucket(3);
+ for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -442,15 +605,17 @@ TEST(single_session, udp_borrow)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 0, 0, 170, 0, DIR_ROUTE_UP, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
- shaping_stat_judge(line, 1, 2, 2, 100, 10000, 100, 10000, 0, 0, 170, 0, DIR_ROUTE_UP, profile_type_borrow);
+ shaping_stat_judge(line, 1, 2, 9, 0, 0, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_borrow);
+
+ ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 3, borrow
+ shaping_stat_judge(line, 1, 3, 9, 100, 10000, 0, 0, 170, 0, SHAPING_DIR_OUT, profile_type_borrow);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1, session2 match rule2
@@ -512,7 +677,7 @@ TEST(two_session_diff_priority, udp_in_order)
stub_refresh_token_bucket(1);
for (int i = 0; i < 10; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 1, first
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -523,18 +688,21 @@ TEST(two_session_diff_priority, udp_in_order)
stub_refresh_token_bucket(1);
for (int i = 0; i < 10; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));//stream1 priority 2
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf1);
shaping_flow_free(sf2);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -543,15 +711,14 @@ TEST(two_session_diff_priority, udp_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0
- shaping_stat_judge(line, 0, 0, 2, 100, 10000, 100, 10000, 0, 0, 280, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is every queued pkts
+ shaping_stat_judge(line, 0, 0, 2, 100, 10000, 0, 0, 280, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1
- shaping_stat_judge(line, 1, 1, 1, 100, 10000, 100, 10000, 0, 0, 90, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is every queued pkts
+ shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1,rule2,rule4; session2 match rule3
@@ -626,7 +793,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
stub_refresh_token_bucket(4);
for (int j = 0; j < 2; j++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -639,7 +806,7 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
stub_refresh_token_bucket(4);
for (int i = 0; i < 10; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));//stream2 priority 3, first
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -652,18 +819,21 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
stub_refresh_token_bucket(4);
for (int i = 0; i < 30; i++) {
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf1);
shaping_flow_free(sf2);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -673,21 +843,20 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 1, 1, 1, 20, 2000, 20, 2000, 0, 0, 58, 0, DIR_ROUTE_UP, profile_type_primary);//profile_id 1, max latency is last pkt
+ shaping_stat_judge(line, 1, 1, 1, 20, 2000, 0, 0, 58, 0, SHAPING_DIR_OUT, profile_type_primary);//profile_id 1, max latency is last pkt
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 2, 2, 2, 20, 2000, 20, 2000, 0, 0, 1, 0, DIR_ROUTE_UP, profile_type_primary);//profile_id 2, evevy queued pkt's latency is 1
+ shaping_stat_judge(line, 2, 2, 2, 20, 2000, 0, 0, 1, 0, SHAPING_DIR_OUT, profile_type_primary);//profile_id 2, evevy queued pkt's latency is 1
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 4, 4, 4, 20, 2000, 20, 2000, 0, 0, 11, 0, DIR_ROUTE_UP, profile_type_primary);//profile_id 4, max latency is first queued pkt
+ shaping_stat_judge(line, 4, 4, 4, 20, 2000, 0, 0, 11, 0, SHAPING_DIR_OUT, profile_type_primary);//profile_id 4, max latency is first queued pkt
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 3, 3, 3, 20, 2000, 20, 2000, 0, 0, 12, 0, DIR_ROUTE_UP, profile_type_primary);//profile_id 3, every queued pkt's latency is 12
+ shaping_stat_judge(line, 3, 3, 3, 20, 2000, 0, 0, 12, 0, SHAPING_DIR_OUT, profile_type_primary);//profile_id 3, every queued pkt's latency is 12
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1
@@ -736,7 +905,7 @@ TEST(single_session_async, udp_tx_in_order)
stub_refresh_token_bucket(0);
for (int i = 0; i < 10; i++) {//异步获取token多发送了10个报文,补回token,不应发送报文
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -744,17 +913,20 @@ TEST(single_session_async, udp_tx_in_order)
stub_refresh_token_bucket(0);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -763,11 +935,10 @@ TEST(single_session_async, udp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 100, 10000, 0, 0, 160, 0, DIR_ROUTE_UP, profile_type_primary);//max latency is last 10 pkts
+ shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 160, 0, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1
@@ -809,11 +980,14 @@ TEST(single_session_async, udp_close_before_async_exec)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -822,11 +996,10 @@ TEST(single_session_async, udp_close_before_async_exec)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 10, 1000, 10, 1000, 0, 0, 0, 0, DIR_ROUTE_UP, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 0, 0, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1; session2 match rule2
@@ -890,7 +1063,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
stub_refresh_token_bucket(2);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -900,18 +1073,21 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
stub_refresh_token_bucket(2);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf1);
shaping_flow_free(sf2);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -920,18 +1096,17 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 0, 0, 470, 0, DIR_ROUTE_UP, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 470, 0, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
- shaping_stat_judge(line, 1, 2, 2, 100, 10000, 100, 10000, 0, 0, 470, 0, DIR_ROUTE_UP, profile_type_borrow);
+ shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 470, 0, SHAPING_DIR_OUT, profile_type_borrow);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, primary
- shaping_stat_judge(line, 2, 2, 1, 100, 10000, 100, 10000, 0, 0, 190, 0, DIR_ROUTE_UP, profile_type_primary);
+ shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 190, 0, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1; session2 match rule1
@@ -984,18 +1159,21 @@ TEST(two_session_same_rule, udp_tx_in_order)
stub_refresh_token_bucket(1);
for (int i = 0; i < 20; i++) {//even though invoke polling more than 10 times, there should be only 10 pkts be sent
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
}
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf1);
shaping_flow_free(sf2);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -1004,11 +1182,10 @@ TEST(two_session_same_rule, udp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 1, 1, 1, 200, 20000, 200, 20000, 0, 0, 370, 0, DIR_ROUTE_UP, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 1, 200, 20000, 0, 0, 370, 0, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1; session2 match rule2
@@ -1037,7 +1214,7 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
int profile_id[][MAX_REF_PROFILE] = {{0}, {0}};
int stream1_pkt_num = 0;
int stream2_pkt_num = 0;
- struct timespec curr_time;
+ time_t curr_time;
TAILQ_INIT(&expec_tx_queue1);
@@ -1065,8 +1242,8 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));
}
- clock_gettime(CLOCK_MONOTONIC, &curr_time);
- srand(curr_time.tv_sec);
+ time(&curr_time);
+ srand(curr_time);
for (int i = 0; i < 99; i++) {
if (rand() % 2 == 0) {
send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
@@ -1101,12 +1278,15 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf1);
shaping_flow_free(sf2);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -1115,17 +1295,14 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 1, 0, 1, stream1_pkt_num, stream1_pkt_num*100,
- stream1_pkt_num, stream1_pkt_num*100, 0, 0, -1, 0, DIR_ROUTE_UP, profile_type_primary);//can't predict a certain latency cause of random
+ shaping_stat_judge(line, 1, 0, 1, stream1_pkt_num, stream1_pkt_num*100, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//can't predict a certain latency cause of random
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 2, 0, 2, stream2_pkt_num, stream2_pkt_num*100,
- stream2_pkt_num, stream2_pkt_num*100, 0, 0, -1, 0, DIR_ROUTE_UP, profile_type_primary);//can't predict a certain latency cause of random
+ shaping_stat_judge(line, 2, 0, 2, stream2_pkt_num, stream2_pkt_num*100, 0, 0, -1, 0, SHAPING_DIR_OUT, profile_type_primary);//can't predict a certain latency cause of random
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1
@@ -1165,15 +1342,18 @@ TEST(statistics, udp_drop_pkt)
while (!TAILQ_EMPTY(&expec_tx_queue)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -1182,11 +1362,10 @@ TEST(statistics, udp_drop_pkt)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 210, 21000, 110, 11000, 100, 0, 200, 0, DIR_ROUTE_UP, profile_type_primary);//every queued pkt's latency is 200
+ shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228, 0, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
/*session1 match rule1
@@ -1215,16 +1394,15 @@ TEST(statistics, udp_queueing_pkt)
stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT);
actual_tx_queue = stub_get_tx_queue();
shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 1);
-
- /*******packets, OP_STATE_DATA***********/
+ /*******send packets***********/
send_packets(&ctx->thread_ctx[0], sf, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
-#if 0
+
/***********send stat data here********************/
- stub_shaper_stat_send(0);
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
sleep(2);//wait telegraf generate metric
-#endif
//first 10 packets
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
@@ -1233,15 +1411,18 @@ TEST(statistics, udp_queueing_pkt)
while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc();
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 1));
}
+ /***********send stat data here********************/
+ stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
+ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+
shaping_flow_free(sf);
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-#if 0
/*******test statistics***********/
sleep(2);//wait telegraf to output
char line[1024];
@@ -1251,20 +1432,19 @@ TEST(statistics, udp_queueing_pkt)
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data first sent
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 10, 1000, 0, 90, 0, 1, DIR_ROUTE_UP, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 90, 0, 1, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data last sent
- shaping_stat_judge(line, 0, 0, 1, 0, 0, 90, 9000, 0, 0, 90, 0, DIR_ROUTE_UP, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, 90 + (STUB_TIME_INC_FOR_METRIC_SEND / 1000), 0, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
-#endif
}
int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
- //testing::GTEST_FLAG(filter) = "two_session_diff_priority.udp_in_order";
+ //testing::GTEST_FLAG(filter) = "single_session.udp_borrow_same_priority_9";
return RUN_ALL_TESTS();
} \ No newline at end of file
diff --git a/shaping/test/gtest_shaper_bak.cpp b/shaping/test/gtest_shaper_bak.cpp
deleted file mode 100644
index a947a8b..0000000
--- a/shaping/test/gtest_shaper_bak.cpp
+++ /dev/null
@@ -1,234 +0,0 @@
-#include <MESA/stream.h>
-#include <cjson/cJSON.h>
-#include <gtest/gtest.h>
-#include <vector>
-#include <stdlib.h>
-#include <time.h>
-
-#include "stub.h"
-
-#define SHAPING_STAT_FILE_NAME "/tmp/shaping_metrics.json"
-#define SHAPING_SESSION_QUEUE_LEN 100
-#define SHAPING_SESSIONS_LIMIT_PER_AVL 1000
-
-char profile_type_primary[] = "primary";
-char profile_type_borrow[] = "borrow";
-
-
-using namespace std;
-
-static int judge_packet_eq(struct stub_pkt_queue *expec_queue, struct stub_pkt_queue *actual_queue, int num)
-{
- struct stub_packet_node *expec_pkt_node;
- struct stub_packet_node *actual_pkt_node;
-
- for (int i = 0; i < num; i++) {
- if(TAILQ_EMPTY(actual_queue)) {
- return -1;
- }
- expec_pkt_node = TAILQ_FIRST(expec_queue);
- actual_pkt_node = TAILQ_FIRST(actual_queue);
- if (expec_pkt_node->raw_packet != actual_pkt_node->raw_packet) {
- return -2;
- }
-
- TAILQ_REMOVE(expec_queue, expec_pkt_node, node);
- TAILQ_REMOVE(actual_queue, actual_pkt_node, node);
- free(expec_pkt_node->raw_packet);
- free(expec_pkt_node);
- free(actual_pkt_node);
- }
-
- return 0;
-}
-
-static struct stub_packet* packet_new(unsigned long long income_time, unsigned int length, unsigned char dir)
-{
- struct stub_packet *packet;
-
- packet = (struct stub_packet*)calloc(1, sizeof(struct stub_packet));
- packet->income_time = income_time;
- packet->length = length;
- packet->direction = dir;
-
- return packet;
-}
-
-static struct stub_packet_node* packet_node_new(stub_packet *packet)
-{
- struct stub_packet_node *pkt_node;
-
- pkt_node = (struct stub_packet_node*)calloc(1, sizeof(struct stub_packet_node));
- pkt_node->raw_packet = packet;
-
- return pkt_node;
-}
-
-static void send_packets(struct streaminfo *stream, void **pme, int pkt_num, int pkt_len,
- unsigned char dir, struct stub_pkt_queue *expec_tx_queue, int polling_times,
- int is_tcp, int is_pure_control)
-{
- struct stub_packet_node *pkt_node;
- struct stub_packet *packet;
- unsigned long long time;
- char ret;
-
- stream->threadnum = 0;//just 1 thread for test!!!
-
- for (int i = 0; i < pkt_num; i++) {
- time = stub_curr_time_get();
- packet = packet_new(time, pkt_len, dir);
- if (expec_tx_queue) {
- pkt_node = packet_node_new(packet);
- TAILQ_INSERT_TAIL(expec_tx_queue, pkt_node, node);
- }
-
- time++;
-
- stream->routedir = dir;
- stream->hash_index = pkt_len;//just for stub test!!!!! use hash_index to store pkt_len
- if (is_tcp) {
- stream->ptcpdetail->clientpktnum++;
- if (is_pure_control) {
- stream->ptcpdetail->pdata = NULL;
- } else {
- stream->ptcpdetail->pdata = (void*)0x1234;//just for stub test, will not access this pointer
- }
- } else {
- stream->pudpdetail->clientpktnum++;
- }
-
- if (is_tcp) {
- ret = tcp_allpkt_raw_process(stream, NULL, packet, pme);
- } else {
- ret = udp_raw_process(stream, NULL, packet, pme);
- }
-
- if (ret == APP_STATE_GIVEME) {
- if (!packet->detained_flag) {
- stub_send_packet(packet);
- }
- }
-
- for (int j = 0; j < polling_times; j++) {
- polling_entry(NULL, NULL, 0, NULL);
- }
-
- stub_curr_time_inc();
- }
-}
-
-static void stream_state_change_udp(struct streaminfo *stream, void **pme, unsigned char state)
-{
- stream->opstate = state;
- //udp_process(stream, pme, 0, NULL);//just change state, no packet
- if (state == OP_STATE_PENDING) {
- stub_stream_bridge_sync_cb_invoke(stream);
- } else if(state == OP_STATE_CLOSE) {
- stub_stream_bridge_free_cb_invoke(stream);
- }
-
- udp_raw_process(stream, NULL, NULL, pme);//just change state, no packet
-
- return;
-}
-
-static void stream_state_change_tcp(struct streaminfo *stream, void **pme, unsigned char state)
-{
- stream->pktstate = state;
- //tcp_allpkt_process(stream, pme, 0, NULL);//just change state, no packet
- if (state == OP_STATE_PENDING) {
- stub_stream_bridge_sync_cb_invoke(stream);
- } else if(state == OP_STATE_CLOSE) {
- stub_stream_bridge_free_cb_invoke(stream);
- }
-
- tcp_allpkt_raw_process(stream, NULL, NULL, pme);//just change state, no packet
-
- return;
-}
-
-static void shaping_stat_judge(char *file_line, int rule_id, int profile_id, int priority,
- unsigned long long rx_pkts, unsigned long long rx_bytes,
- unsigned long long tx_pkts, unsigned long long tx_bytes,
- unsigned long long drop_pkts, long long queue_len, long long max_latency,
- int queueing_sessions, unsigned char direction, char profile_type[])
-{
- cJSON *json = NULL;
- cJSON *tmp_obj = NULL;
- char attr_name[32] = {0};
-
- json = cJSON_Parse(file_line);
- ASSERT_TRUE(json != NULL);
-
- tmp_obj = cJSON_GetObjectItem(json, "rule_id");
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(rule_id, atoi(tmp_obj->valuestring));
-
- tmp_obj = cJSON_GetObjectItem(json, "profile_id");
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(profile_id, atoi(tmp_obj->valuestring));
-
- tmp_obj = cJSON_GetObjectItem(json, "priority");
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(priority, atoi(tmp_obj->valuestring));
-
- tmp_obj = cJSON_GetObjectItem(json, "profile_type");
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_STREQ(tmp_obj->valuestring, profile_type);
-
- tmp_obj = cJSON_GetObjectItem(json, "queueing_sessions");
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(queueing_sessions, tmp_obj->valueint);
-
- snprintf(attr_name, sizeof(attr_name), "%s_rx_pkts", direction == DIR_ROUTE_UP ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(json, attr_name);
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(rx_pkts, tmp_obj->valueint);
-
- snprintf(attr_name, sizeof(attr_name), "%s_rx_bytes", direction == DIR_ROUTE_UP ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(json, attr_name);
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(rx_bytes, tmp_obj->valueint);
-
- snprintf(attr_name, sizeof(attr_name), "%s_tx_pkts", direction == DIR_ROUTE_UP ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(json, attr_name);
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(tx_pkts, tmp_obj->valueint);
-
- snprintf(attr_name, sizeof(attr_name), "%s_tx_bytes", direction == DIR_ROUTE_UP ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(json, attr_name);
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(tx_bytes, tmp_obj->valueint);
-
- snprintf(attr_name, sizeof(attr_name), "%s_drop_pkts", direction == DIR_ROUTE_UP ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(json, attr_name);
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(drop_pkts, tmp_obj->valueint);
-
- if (max_latency != -1) {
- snprintf(attr_name, sizeof(attr_name), "%s_max_latency_us", direction == DIR_ROUTE_UP ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(json, attr_name);
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(max_latency, tmp_obj->valueint);
- }
-
- snprintf(attr_name, sizeof(attr_name), "%s_queue_len", direction == DIR_ROUTE_UP ? "out" : "in");
- tmp_obj = cJSON_GetObjectItem(json, attr_name);
- ASSERT_TRUE(tmp_obj != NULL);
- EXPECT_EQ(queue_len, tmp_obj->valueint);
-
- cJSON_Delete(json);
-
- return;
-}
-
-
-
-
-int main(int argc, char **argv)
-{
- testing::InitGoogleTest(&argc, argv);
- //testing::GTEST_FLAG(filter) = "single_session.udp_diff_direction";
- return RUN_ALL_TESTS();
-} \ No newline at end of file
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index 2bf4a8f..b49bafd 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -158,9 +158,9 @@ struct stub_pkt_queue* stub_get_tx_queue()
return &tx_queue;
}
-void stub_curr_time_inc()
+void stub_curr_time_inc(unsigned long long time_ns)
{
- curr_time += 1000;
+ curr_time += time_ns;
return;
}
@@ -272,6 +272,11 @@ int swarmkv_options_set_log_path(struct swarmkv_options *opts, const char *logpa
return 0;
}
+int swarmkv_options_set_log_level(struct swarmkv_options *opts, int loglevel)
+{
+ return 0;
+}
+
void swarmkv_tconsume(struct swarmkv * db, const char * key, size_t keylen, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
int actual_tokens;
diff --git a/shaping/test/stub.h b/shaping/test/stub.h
index e394da3..e0702a2 100644
--- a/shaping/test/stub.h
+++ b/shaping/test/stub.h
@@ -9,6 +9,9 @@
#define STUB_MAAT_SHAPING_RULE_TABLE_ID 0
#define STUB_MAAT_SHAPING_PROFILE_TABLE_ID 1
+#define STUB_TIME_INC_FOR_PACKET 1000
+#define STUB_TIME_INC_FOR_METRIC_SEND 1000000
+
struct stub_packet {
unsigned char direction;
unsigned char pure_control;
@@ -37,7 +40,7 @@ struct stub_pkt_queue* stub_get_tx_queue();
int stub_AQM_drop_packet(int queue_len, unsigned long long income_time);
-void stub_curr_time_inc();
+void stub_curr_time_inc(unsigned long long time_ns);
unsigned long long stub_curr_time_get();
void stub_init();
diff --git a/shaping/test/test_conf/main.conf b/shaping/test/test_conf/shaping.conf
index e134a70..78d458f 100644
--- a/shaping/test/test_conf/main.conf
+++ b/shaping/test/test_conf/shaping.conf
@@ -28,9 +28,11 @@ SWARMKV_CLUSTER_ANNOUNCE_PORT=8501
SWARMKV_HEALTH_CHECK_PORT=0
SWARMKV_HEALTH_CHECK_ANNOUNCE_PORT=1111
-#[METRIC]
-#TELEGRAF_IP="127.0.0.1"
-#TELEGRAF_PORT=6667
+[METRIC]
+FIELDSTAT_OUTPUT_INTERVAL_MS=999999000
+FIELDSTAT_ENABLE_BACKGRUND_THREAD=0
+TELEGRAF_IP="127.0.0.1"
+TELEGRAF_PORT=6667
[CONFIG]
#PROFILE_QUEUE_LEN_PER_PRIORITY_MAX=128
diff --git a/shaping/test/test_conf/zlog.conf b/shaping/test/test_conf/zlog.conf
index a7a988d..accff95 100644
--- a/shaping/test/test_conf/zlog.conf
+++ b/shaping/test/test_conf/zlog.conf
@@ -7,4 +7,4 @@ INFO=20
FATAL=30
[rules]
-shaping.fatal "./log/shaping.log.%d(%F)";
+log_shaping.DEBUG "./log/shaping.log.%d(%F)";