diff options
| author | 刘畅 <[email protected]> | 2023-10-11 01:43:14 +0000 |
|---|---|---|
| committer | 刘畅 <[email protected]> | 2023-10-11 01:43:14 +0000 |
| commit | 329a44aef63ed3247812481a112b8fac0facbde0 (patch) | |
| tree | 4cd6b3eb3e2caead4309df025d0d968be24431d4 | |
| parent | cd9e72e6e614691903e9befd0b1b7d9d4c3d8845 (diff) | |
| parent | ba9ed468a08a21e92f2e1cf5d61db493f9e5b188 (diff) | |
Merge branch 'adapt_new_arc_swarmkv' into 'rel'v1.3.0
optimize performance for swarmkv
See merge request tango/shaping-engine!42
| -rw-r--r-- | .gitlab-ci.yml | 204 | ||||
| -rw-r--r-- | common/include/utils.h | 6 | ||||
| -rw-r--r-- | deps/timeout/Makefile | 68 | ||||
| -rw-r--r-- | deps/timeout/test-timeout.c | 530 | ||||
| -rw-r--r-- | deps/timeout/timeout-bitops.c | 249 | ||||
| -rw-r--r-- | deps/timeout/timeout-debug.h | 77 | ||||
| -rw-r--r-- | deps/timeout/timeout.c | 749 | ||||
| -rw-r--r-- | deps/timeout/timeout.h | 256 | ||||
| -rw-r--r-- | shaping/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | shaping/include/shaper.h | 15 | ||||
| -rw-r--r-- | shaping/include/shaper_stat.h | 2 | ||||
| -rw-r--r-- | shaping/include/shaper_swarmkv.h | 3 | ||||
| -rw-r--r-- | shaping/src/main.cpp | 2 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 140 | ||||
| -rw-r--r-- | shaping/src/shaper_maat.cpp | 2 | ||||
| -rw-r--r-- | shaping/src/shaper_session.cpp | 2 | ||||
| -rw-r--r-- | shaping/src/shaper_stat.cpp | 27 | ||||
| -rw-r--r-- | shaping/src/shaper_swarmkv.cpp | 6 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 253 | ||||
| -rw-r--r-- | shaping/test/stub.cpp | 18 | ||||
| -rw-r--r-- | shaping/test/stub.h | 3 | ||||
| -rw-r--r-- | shaping/test/test_conf/zlog.conf | 2 |
22 files changed, 2275 insertions, 342 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index c0cb8a2..aa12c63 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -13,108 +13,108 @@ stages: # compile use image: build-env:master ############################################################################### -.build_by_travis_for_centos7: - image: $BUILD_IMAGE_CENTOS7 - before_script: - - mkdir -p $BUILD_PADDING_PREFIX/$CI_PROJECT_NAMESPACE/ - - ln -s $CI_PROJECT_DIR $BUILD_PADDING_PREFIX/$CI_PROJECT_PATH - - cd $BUILD_PADDING_PREFIX/$CI_PROJECT_PATH - - chmod +x ./ci/travis.sh - script: - - yum makecache - - ./ci/travis.sh - tags: - - share - -branch_build_debug_for_centos7: - stage: build - extends: .build_by_travis_for_centos7 - variables: - BUILD_TYPE: Debug - except: - - /^develop-.*$/i - - /^release-.*$/i - - tags - -branch_build_release_for_centos7: - stage: build - variables: - BUILD_TYPE: RelWithDebInfo - extends: .build_by_travis_for_centos7 - except: - - /^develop-.*$/i - - /^release-.*$/i - - tags - -develop_build_debug_for_centos7: - stage: build - extends: .build_by_travis_for_centos7 - variables: - TESTING_VERSION_BUILD: 1 - UPLOAD_SYMBOL_FILES: 1 - BUILD_TYPE: Debug -# ASAN_OPTION: ADDRESS - PACKAGE: 1 - PULP3_REPO_NAME: tsg-testing-x86_64.el7 - PULP3_DIST_NAME: tsg-testing-x86_64.el7 - artifacts: - name: "shaping_engine-develop-$CI_COMMIT_REF_NAME-debug" - paths: - - build/*.rpm - only: - - /^develop-.*$/i - - /^release-.*$/i - -develop_build_release_for_centos7: - stage: build - extends: .build_by_travis_for_centos7 - variables: - TESTING_VERSION_BUILD: 1 - UPLOAD_SYMBOL_FILES: 1 -# ASAN_OPTION: ADDRESS - BUILD_TYPE: RelWithDebInfo - PACKAGE: 1 - PULP3_REPO_NAME: tsg-testing-x86_64.el7 - PULP3_DIST_NAME: tsg-testing-x86_64.el7 - artifacts: - name: "shaping_engine-develop-$CI_COMMIT_REF_NAME-release" - paths: - - build/*.rpm - only: - - /^develop-.*$/i - - /^release-.*$/i - -release_build_debug_for_centos7: - stage: build - variables: - UPLOAD_SYMBOL_FILES: 1 - BUILD_TYPE: Debug - PACKAGE: 1 - PULP3_REPO_NAME: tsg-stable-x86_64.el7 - PULP3_DIST_NAME: tsg-stable-x86_64.el7 - extends: .build_by_travis_for_centos7 - artifacts: - name: "shaping_engine-install-$CI_COMMIT_REF_NAME-debug" - paths: - - build/*.rpm - only: - - tags - -release_build_release_for_centos7: - stage: build - variables: - BUILD_TYPE: RelWithDebInfo - UPLOAD_SYMBOL_FILES: 1 - PACKAGE: 1 - PULP3_REPO_NAME: tsg-stable-x86_64.el7 - PULP3_DIST_NAME: tsg-stable-x86_64.el7 - extends: .build_by_travis_for_centos7 - artifacts: - name: "shaping_engine-install-$CI_COMMIT_REF_NAME-release" - paths: - - build/*.rpm - only: - - tags +# .build_by_travis_for_centos7: +# image: $BUILD_IMAGE_CENTOS7 +# before_script: +# - mkdir -p $BUILD_PADDING_PREFIX/$CI_PROJECT_NAMESPACE/ +# - ln -s $CI_PROJECT_DIR $BUILD_PADDING_PREFIX/$CI_PROJECT_PATH +# - cd $BUILD_PADDING_PREFIX/$CI_PROJECT_PATH +# - chmod +x ./ci/travis.sh +# script: +# - yum makecache +# - ./ci/travis.sh +# tags: +# - share + +# branch_build_debug_for_centos7: +# stage: build +# extends: .build_by_travis_for_centos7 +# variables: +# BUILD_TYPE: Debug +# except: +# - /^develop-.*$/i +# - /^release-.*$/i +# - tags + +# branch_build_release_for_centos7: +# stage: build +# variables: +# BUILD_TYPE: RelWithDebInfo +# extends: .build_by_travis_for_centos7 +# except: +# - /^develop-.*$/i +# - /^release-.*$/i +# - tags + +# develop_build_debug_for_centos7: +# stage: build +# extends: .build_by_travis_for_centos7 +# variables: +# TESTING_VERSION_BUILD: 1 +# UPLOAD_SYMBOL_FILES: 1 +# BUILD_TYPE: Debug +# # ASAN_OPTION: ADDRESS +# PACKAGE: 1 +# PULP3_REPO_NAME: tsg-testing-x86_64.el7 +# PULP3_DIST_NAME: tsg-testing-x86_64.el7 +# artifacts: +# name: "shaping_engine-develop-$CI_COMMIT_REF_NAME-debug" +# paths: +# - build/*.rpm +# only: +# - /^develop-.*$/i +# - /^release-.*$/i + +# develop_build_release_for_centos7: +# stage: build +# extends: .build_by_travis_for_centos7 +# variables: +# TESTING_VERSION_BUILD: 1 +# UPLOAD_SYMBOL_FILES: 1 +# # ASAN_OPTION: ADDRESS +# BUILD_TYPE: RelWithDebInfo +# PACKAGE: 1 +# PULP3_REPO_NAME: tsg-testing-x86_64.el7 +# PULP3_DIST_NAME: tsg-testing-x86_64.el7 +# artifacts: +# name: "shaping_engine-develop-$CI_COMMIT_REF_NAME-release" +# paths: +# - build/*.rpm +# only: +# - /^develop-.*$/i +# - /^release-.*$/i + +# release_build_debug_for_centos7: +# stage: build +# variables: +# UPLOAD_SYMBOL_FILES: 1 +# BUILD_TYPE: Debug +# PACKAGE: 1 +# PULP3_REPO_NAME: tsg-stable-x86_64.el7 +# PULP3_DIST_NAME: tsg-stable-x86_64.el7 +# extends: .build_by_travis_for_centos7 +# artifacts: +# name: "shaping_engine-install-$CI_COMMIT_REF_NAME-debug" +# paths: +# - build/*.rpm +# only: +# - tags + +# release_build_release_for_centos7: +# stage: build +# variables: +# BUILD_TYPE: RelWithDebInfo +# UPLOAD_SYMBOL_FILES: 1 +# PACKAGE: 1 +# PULP3_REPO_NAME: tsg-stable-x86_64.el7 +# PULP3_DIST_NAME: tsg-stable-x86_64.el7 +# extends: .build_by_travis_for_centos7 +# artifacts: +# name: "shaping_engine-install-$CI_COMMIT_REF_NAME-release" +# paths: +# - build/*.rpm +# only: +# - tags ############################################################################### # compile use image: build-env:rockylinux diff --git a/common/include/utils.h b/common/include/utils.h index 0f257a5..c03b291 100644 --- a/common/include/utils.h +++ b/common/include/utils.h @@ -8,6 +8,12 @@ extern "C" #define MIN(a, b) ((a) > (b) ? (b) : (a)) +#ifndef container_of +#define container_of(ptr, type, member) ({ \ + const typeof( ((type *)0)->member ) *__mptr = (ptr); \ + (type *)( (char *)__mptr - offsetof(type,member) );}) +#endif + #define LOG_TAG_SHAPING "SHAPING" #define LOG_TAG_SWARMKV "SWARMKV" #define LOG_TAG_STAT "STAT" diff --git a/deps/timeout/Makefile b/deps/timeout/Makefile new file mode 100644 index 0000000..554ebb9 --- /dev/null +++ b/deps/timeout/Makefile @@ -0,0 +1,68 @@ +# NOTE: GNU Make 3.81 won't export MAKEFLAGS if .POSIX is specified, but +# Solaris make won't export MAKEFLAGS unless .POSIX is specified. +$(firstword ignore).POSIX: + +.DEFAULT_GOAL = all + +.SUFFIXES: + +all: + +# +# USER-MODIFIABLE MACROS +# +top_srcdir = . +top_builddir = . + +CFLAGS = -O2 -march=native -g -Wall -Wextra -Wno-unused-parameter -Wno-unused-function +SOFLAGS = $$(auto_soflags) +LIBS = $$(auto_libs) + +ALL_CPPFLAGS = -I$(top_srcdir) -DWHEEL_BIT=$(WHEEL_BIT) -DWHEEL_NUM=$(WHEEL_NUM) $(CPPFLAGS) +ALL_CFLAGS = $(CFLAGS) +ALL_SOFLAGS = $(SOFLAGS) +ALL_LDFLAGS = $(LDFLAGS) +ALL_LIBS = $(LIBS) + +LUA_API = 5.3 +LUA = lua +LUA51_CPPFLAGS = $(LUA_CPPFLAGS) +LUA52_CPPFLAGS = $(LUA_CPPFLAGS) +LUA53_CPPFLAGS = $(LUA_CPPFLAGS) + +WHEEL_BIT = 6 +WHEEL_NUM = 4 + +RM = rm -f + +# END MACROS + +SHRC = \ + top_srcdir="$(top_srcdir)"; \ + top_builddir="$(top_builddir)"; \ + . "$${top_srcdir}/Rules.shrc" + +LUA_APIS = 5.1 5.2 5.3 + +include $(top_srcdir)/lua/Rules.mk +include $(top_srcdir)/bench/Rules.mk + +all: test-timeout + +timeout.o: $(top_srcdir)/timeout.c +test-timeout.o: $(top_srcdir)/test-timeout.c + +timeout.o test-timeout.o: + @$(SHRC); echo_cmd $(CC) $(ALL_CFLAGS) -c -o $@ $${top_srcdir}/$(@F:%.o=%.c) $(ALL_CPPFLAGS) + +test-timeout: timeout.o test-timeout.o + @$(SHRC); echo_cmd $(CC) $(ALL_CPPFLAGS) $(ALL_CFLAGS) -o $@ timeout.o test-timeout.o + +.PHONY: clean clean~ + +clean: + $(RM) $(top_builddir)/test-timeout $(top_builddir)/*.o + $(RM) -r $(top_builddir)/*.dSYM + +clean~: + find $(top_builddir) $(top_srcdir) -name "*~" -exec $(RM) -- {} "+" diff --git a/deps/timeout/test-timeout.c b/deps/timeout/test-timeout.c new file mode 100644 index 0000000..ab9f72d --- /dev/null +++ b/deps/timeout/test-timeout.c @@ -0,0 +1,530 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <limits.h> + +#include "timeout.h" + +#define THE_END_OF_TIME ((timeout_t)-1) + +static int check_misc(void) { + if (TIMEOUT_VERSION != timeout_version()) + return 1; + if (TIMEOUT_V_REL != timeout_v_rel()) + return 1; + if (TIMEOUT_V_API != timeout_v_api()) + return 1; + if (TIMEOUT_V_ABI != timeout_v_abi()) + return 1; + if (strcmp(timeout_vendor(), TIMEOUT_VENDOR)) + return 1; + return 0; +} + +static int check_open_close(timeout_t hz_set, timeout_t hz_expect) { + int err=0; + struct timeouts *tos = timeouts_open(hz_set, &err); + if (!tos) + return 1; + if (err) + return 1; + if (hz_expect != timeouts_hz(tos)) + return 1; + timeouts_close(tos); + return 0; +} + +/* Not very random */ +static timeout_t random_to(timeout_t min, timeout_t max) +{ + if (max <= min) + return min; + /* Not actually all that random, but should exercise the code. */ + timeout_t rand64 = random() * (timeout_t)INT_MAX + random(); + return min + (rand64 % (max-min)); +} + +/* configuration for check_randomized */ +struct rand_cfg { + /* When creating timeouts, smallest possible delay */ + timeout_t min_timeout; + /* When creating timeouts, largest possible delay */ + timeout_t max_timeout; + /* First time to start the clock at. */ + timeout_t start_at; + /* Do not advance the clock past this time. */ + timeout_t end_at; + /* Number of timeouts to create and monitor. */ + int n_timeouts; + /* Advance the clock by no more than this each step. */ + timeout_t max_step; + /* Use relative timers and stepping */ + int relative; + /* Every time the clock ticks, try removing this many timeouts at + * random. */ + int try_removing; + /* When we're done, advance the clock to the end of time. */ + int finalize; +}; + +static int check_randomized(const struct rand_cfg *cfg) +{ +#define FAIL() do { \ + printf("Failure on line %d\n", __LINE__); \ + goto done; \ + } while (0) + + int i, err; + int rv = 1; + struct timeout *t = calloc(cfg->n_timeouts, sizeof(struct timeout)); + timeout_t *timeouts = calloc(cfg->n_timeouts, sizeof(timeout_t)); + uint8_t *fired = calloc(cfg->n_timeouts, sizeof(uint8_t)); + uint8_t *found = calloc(cfg->n_timeouts, sizeof(uint8_t)); + uint8_t *deleted = calloc(cfg->n_timeouts, sizeof(uint8_t)); + struct timeouts *tos = timeouts_open(0, &err); + timeout_t now = cfg->start_at; + int n_added_pending = 0, cnt_added_pending = 0; + int n_added_expired = 0, cnt_added_expired = 0; + struct timeouts_it it_p, it_e, it_all; + int p_done = 0, e_done = 0, all_done = 0; + struct timeout *to = NULL; + const int rel = cfg->relative; + + if (!t || !timeouts || !tos || !fired || !found || !deleted) + FAIL(); + timeouts_update(tos, cfg->start_at); + + for (i = 0; i < cfg->n_timeouts; ++i) { + if (&t[i] != timeout_init(&t[i], rel ? 0 : TIMEOUT_ABS)) + FAIL(); + if (timeout_pending(&t[i])) + FAIL(); + if (timeout_expired(&t[i])) + FAIL(); + + timeouts[i] = random_to(cfg->min_timeout, cfg->max_timeout); + + timeouts_add(tos, &t[i], timeouts[i] - (rel ? now : 0)); + if (timeouts[i] <= cfg->start_at) { + if (timeout_pending(&t[i])) + FAIL(); + if (! timeout_expired(&t[i])) + FAIL(); + ++n_added_expired; + } else { + if (! timeout_pending(&t[i])) + FAIL(); + if (timeout_expired(&t[i])) + FAIL(); + ++n_added_pending; + } + } + + if (!!n_added_pending != timeouts_pending(tos)) + FAIL(); + if (!!n_added_expired != timeouts_expired(tos)) + FAIL(); + + /* Test foreach, interleaving a few iterators. */ + TIMEOUTS_IT_INIT(&it_p, TIMEOUTS_PENDING); + TIMEOUTS_IT_INIT(&it_e, TIMEOUTS_EXPIRED); + TIMEOUTS_IT_INIT(&it_all, TIMEOUTS_ALL); + while (! (p_done && e_done && all_done)) { + if (!p_done) { + to = timeouts_next(tos, &it_p); + if (to) { + i = to - &t[0]; + ++found[i]; + ++cnt_added_pending; + } else { + p_done = 1; + } + } + if (!e_done) { + to = timeouts_next(tos, &it_e); + if (to) { + i = to - &t[0]; + ++found[i]; + ++cnt_added_expired; + } else { + e_done = 1; + } + } + if (!all_done) { + to = timeouts_next(tos, &it_all); + if (to) { + i = to - &t[0]; + ++found[i]; + } else { + all_done = 1; + } + } + } + + for (i = 0; i < cfg->n_timeouts; ++i) { + if (found[i] != 2) + FAIL(); + } + if (cnt_added_expired != n_added_expired) + FAIL(); + if (cnt_added_pending != n_added_pending) + FAIL(); + + while (NULL != (to = timeouts_get(tos))) { + i = to - &t[0]; + assert(&t[i] == to); + if (timeouts[i] > cfg->start_at) + FAIL(); /* shouldn't have happened yet */ + + --n_added_expired; /* drop expired timeouts. */ + ++fired[i]; + } + + if (n_added_expired != 0) + FAIL(); + + while (now < cfg->end_at) { + int n_fired_this_time = 0; + timeout_t first_at = timeouts_timeout(tos) + now; + + timeout_t oldtime = now; + timeout_t step = random_to(1, cfg->max_step); + int another; + now += step; + if (rel) + timeouts_step(tos, step); + else + timeouts_update(tos, now); + + for (i = 0; i < cfg->try_removing; ++i) { + int idx = random() % cfg->n_timeouts; + if (! fired[idx]) { + timeout_del(&t[idx]); + ++deleted[idx]; + } + } + + another = (timeouts_timeout(tos) == 0); + + while (NULL != (to = timeouts_get(tos))) { + if (! another) + FAIL(); /* Thought we saw the last one! */ + i = to - &t[0]; + assert(&t[i] == to); + if (timeouts[i] > now) + FAIL(); /* shouldn't have happened yet */ + if (timeouts[i] <= oldtime) + FAIL(); /* should have happened already */ + if (timeouts[i] < first_at) + FAIL(); /* first_at should've been earlier */ + fired[i]++; + n_fired_this_time++; + another = (timeouts_timeout(tos) == 0); + } + if (n_fired_this_time && first_at > now) + FAIL(); /* first_at should've been earlier */ + if (another) + FAIL(); /* Huh? We think there are more? */ + if (!timeouts_check(tos, stderr)) + FAIL(); + } + + for (i = 0; i < cfg->n_timeouts; ++i) { + if (fired[i] > 1) + FAIL(); /* Nothing fired twice. */ + if (timeouts[i] <= now) { + if (!(fired[i] || deleted[i])) + FAIL(); + } else { + if (fired[i]) + FAIL(); + } + if (fired[i] && deleted[i]) + FAIL(); + if (cfg->finalize > 1) { + if (!fired[i]) + timeout_del(&t[i]); + } + } + + /* Now nothing more should fire between now and the end of time. */ + if (cfg->finalize) { + timeouts_update(tos, THE_END_OF_TIME); + if (cfg->finalize > 1) { + if (timeouts_get(tos)) + FAIL(); + TIMEOUTS_FOREACH(to, tos, TIMEOUTS_ALL) + FAIL(); + } + } + rv = 0; + + done: + if (tos) timeouts_close(tos); + if (t) free(t); + if (timeouts) free(timeouts); + if (fired) free(fired); + if (found) free(found); + if (deleted) free(deleted); + return rv; +} + +struct intervals_cfg { + const timeout_t *timeouts; + int n_timeouts; + timeout_t start_at; + timeout_t end_at; + timeout_t skip; +}; + +int +check_intervals(struct intervals_cfg *cfg) +{ + int i, err; + int rv = 1; + struct timeout *to; + struct timeout *t = calloc(cfg->n_timeouts, sizeof(struct timeout)); + unsigned *fired = calloc(cfg->n_timeouts, sizeof(unsigned)); + struct timeouts *tos = timeouts_open(0, &err); + + timeout_t now = cfg->start_at; + if (!t || !tos || !fired) + FAIL(); + + timeouts_update(tos, now); + + for (i = 0; i < cfg->n_timeouts; ++i) { + if (&t[i] != timeout_init(&t[i], TIMEOUT_INT)) + FAIL(); + if (timeout_pending(&t[i])) + FAIL(); + if (timeout_expired(&t[i])) + FAIL(); + + timeouts_add(tos, &t[i], cfg->timeouts[i]); + if (! timeout_pending(&t[i])) + FAIL(); + if (timeout_expired(&t[i])) + FAIL(); + } + + while (now < cfg->end_at) { + timeout_t delay = timeouts_timeout(tos); + if (cfg->skip && delay < cfg->skip) + delay = cfg->skip; + timeouts_step(tos, delay); + now += delay; + + while (NULL != (to = timeouts_get(tos))) { + i = to - &t[0]; + assert(&t[i] == to); + fired[i]++; + if (0 != (to->expires - cfg->start_at) % cfg->timeouts[i]) + FAIL(); + if (to->expires <= now) + FAIL(); + if (to->expires > now + cfg->timeouts[i]) + FAIL(); + } + if (!timeouts_check(tos, stderr)) + FAIL(); + } + + timeout_t duration = now - cfg->start_at; + for (i = 0; i < cfg->n_timeouts; ++i) { + if (cfg->skip) { + if (fired[i] > duration / cfg->timeouts[i]) + FAIL(); + } else { + if (fired[i] != duration / cfg->timeouts[i]) + FAIL(); + } + if (!timeout_pending(&t[i])) + FAIL(); + } + + rv = 0; + done: + if (t) free(t); + if (fired) free(fired); + if (tos) free(tos); + return rv; +} + +int +main(int argc, char **argv) +{ + int j; + int n_failed = 0; +#define DO(fn) do { \ + printf("."); fflush(stdout); \ + if (fn) { \ + ++n_failed; \ + printf("%s failed\n", #fn); \ + } \ + } while (0) + +#define DO_N(n, fn) do { \ + for (j = 0; j < (n); ++j) { \ + DO(fn); \ + } \ + } while (0) + + DO(check_misc()); + DO(check_open_close(1000, 1000)); + DO(check_open_close(0, TIMEOUT_mHZ)); + + struct rand_cfg cfg1 = { + .min_timeout = 1, + .max_timeout = 100, + .start_at = 5, + .end_at = 1000, + .n_timeouts = 1000, + .max_step = 10, + .relative = 0, + .try_removing = 0, + .finalize = 2, + }; + DO_N(300,check_randomized(&cfg1)); + + struct rand_cfg cfg2 = { + .min_timeout = 20, + .max_timeout = 1000, + .start_at = 10, + .end_at = 100, + .n_timeouts = 1000, + .max_step = 5, + .relative = 1, + .try_removing = 0, + .finalize = 2, + }; + DO_N(300,check_randomized(&cfg2)); + + struct rand_cfg cfg2b = { + .min_timeout = 20, + .max_timeout = 1000, + .start_at = 10, + .end_at = 100, + .n_timeouts = 1000, + .max_step = 5, + .relative = 1, + .try_removing = 0, + .finalize = 1, + }; + DO_N(300,check_randomized(&cfg2b)); + + struct rand_cfg cfg2c = { + .min_timeout = 20, + .max_timeout = 1000, + .start_at = 10, + .end_at = 100, + .n_timeouts = 1000, + .max_step = 5, + .relative = 1, + .try_removing = 0, + .finalize = 0, + }; + DO_N(300,check_randomized(&cfg2c)); + + struct rand_cfg cfg3 = { + .min_timeout = 2000, + .max_timeout = ((uint64_t)1) << 50, + .start_at = 100, + .end_at = ((uint64_t)1) << 49, + .n_timeouts = 1000, + .max_step = ((uint64_t)1) << 31, + .relative = 0, + .try_removing = 0, + .finalize = 2, + }; + DO_N(10,check_randomized(&cfg3)); + + struct rand_cfg cfg3b = { + .min_timeout = ((uint64_t)1) << 50, + .max_timeout = ((uint64_t)1) << 52, + .start_at = 100, + .end_at = ((uint64_t)1) << 53, + .n_timeouts = 1000, + .max_step = ((uint64_t)1)<<48, + .relative = 0, + .try_removing = 0, + .finalize = 2, + }; + DO_N(10,check_randomized(&cfg3b)); + + struct rand_cfg cfg4 = { + .min_timeout = 2000, + .max_timeout = ((uint64_t)1) << 30, + .start_at = 100, + .end_at = ((uint64_t)1) << 26, + .n_timeouts = 10000, + .max_step = 1<<16, + .relative = 0, + .try_removing = 3, + .finalize = 2, + }; + DO_N(10,check_randomized(&cfg4)); + + const timeout_t primes[] = { + 2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53, + 59,61,67,71,73,79,83,89,97 + }; + const timeout_t factors_of_1337[] = { + 1, 7, 191, 1337 + }; + const timeout_t multiples_of_five[] = { + 5, 10, 15, 20, 25, 30, 35, 40, 45, 50 + }; + + struct intervals_cfg icfg1 = { + .timeouts = primes, + .n_timeouts = sizeof(primes)/sizeof(timeout_t), + .start_at = 50, + .end_at = 5322, + .skip = 0, + }; + DO(check_intervals(&icfg1)); + + struct intervals_cfg icfg2 = { + .timeouts = factors_of_1337, + .n_timeouts = sizeof(factors_of_1337)/sizeof(timeout_t), + .start_at = 50, + .end_at = 50000, + .skip = 0, + }; + DO(check_intervals(&icfg2)); + + struct intervals_cfg icfg3 = { + .timeouts = multiples_of_five, + .n_timeouts = sizeof(multiples_of_five)/sizeof(timeout_t), + .start_at = 49, + .end_at = 5333, + .skip = 0, + }; + DO(check_intervals(&icfg3)); + + struct intervals_cfg icfg4 = { + .timeouts = primes, + .n_timeouts = sizeof(primes)/sizeof(timeout_t), + .start_at = 50, + .end_at = 5322, + .skip = 16, + }; + DO(check_intervals(&icfg4)); + + if (n_failed) { + puts("\nFAIL"); + } else { + puts("\nOK"); + } + return !!n_failed; +} + +/* TODO: + + * Solve PR#3. + + * Investigate whether any untaken branches are possible. + + */ diff --git a/deps/timeout/timeout-bitops.c b/deps/timeout/timeout-bitops.c new file mode 100644 index 0000000..d8325db --- /dev/null +++ b/deps/timeout/timeout-bitops.c @@ -0,0 +1,249 @@ +#include <stdint.h> +#ifdef _MSC_VER +#include <intrin.h> /* _BitScanForward, _BitScanReverse */ +#endif + +/* First define ctz and clz functions; these are compiler-dependent if + * you want them to be fast. */ +#if defined(__GNUC__) && !defined(TIMEOUT_DISABLE_GNUC_BITOPS) + +/* On GCC and clang and some others, we can use __builtin functions. They + * are not defined for n==0, but timeout.s never calls them with n==0. */ + +#define ctz64(n) __builtin_ctzll(n) +#define clz64(n) __builtin_clzll(n) +#if LONG_BITS == 32 +#define ctz32(n) __builtin_ctzl(n) +#define clz32(n) __builtin_clzl(n) +#else +#define ctz32(n) __builtin_ctz(n) +#define clz32(n) __builtin_clz(n) +#endif + +#elif defined(_MSC_VER) && !defined(TIMEOUT_DISABLE_MSVC_BITOPS) + +/* On MSVC, we have these handy functions. We can ignore their return + * values, since we will never supply val == 0. */ + +static __inline int ctz32(unsigned long val) +{ + DWORD zeros = 0; + _BitScanForward(&zeros, val); + return zeros; +} +static __inline int clz32(unsigned long val) +{ + DWORD zeros = 0; + _BitScanReverse(&zeros, val); + return zeros; +} +#ifdef _WIN64 +/* According to the documentation, these only exist on Win64. */ +static __inline int ctz64(uint64_t val) +{ + DWORD zeros = 0; + _BitScanForward64(&zeros, val); + return zeros; +} +static __inline int clz64(uint64_t val) +{ + DWORD zeros = 0; + _BitScanReverse64(&zeros, val); + return zeros; +} +#else +static __inline int ctz64(uint64_t val) +{ + uint32_t lo = (uint32_t) val; + uint32_t hi = (uint32_t) (val >> 32); + return lo ? ctz32(lo) : 32 + ctz32(hi); +} +static __inline int clz64(uint64_t val) +{ + uint32_t lo = (uint32_t) val; + uint32_t hi = (uint32_t) (val >> 32); + return hi ? clz32(hi) : 32 + clz32(lo); +} +#endif + +/* End of MSVC case. */ + +#else + +/* TODO: There are more clever ways to do this in the generic case. */ + + +#define process_(one, cz_bits, bits) \ + if (x < ( one << (cz_bits - bits))) { rv += bits; x <<= bits; } + +#define process64(bits) process_((UINT64_C(1)), 64, (bits)) +static inline int clz64(uint64_t x) +{ + int rv = 0; + + process64(32); + process64(16); + process64(8); + process64(4); + process64(2); + process64(1); + return rv; +} +#define process32(bits) process_((UINT32_C(1)), 32, (bits)) +static inline int clz32(uint32_t x) +{ + int rv = 0; + + process32(16); + process32(8); + process32(4); + process32(2); + process32(1); + return rv; +} + +#undef process_ +#undef process32 +#undef process64 +#define process_(one, bits) \ + if ((x & ((one << (bits))-1)) == 0) { rv += bits; x >>= bits; } + +#define process64(bits) process_((UINT64_C(1)), bits) +static inline int ctz64(uint64_t x) +{ + int rv = 0; + + process64(32); + process64(16); + process64(8); + process64(4); + process64(2); + process64(1); + return rv; +} + +#define process32(bits) process_((UINT32_C(1)), bits) +static inline int ctz32(uint32_t x) +{ + int rv = 0; + + process32(16); + process32(8); + process32(4); + process32(2); + process32(1); + return rv; +} + +#undef process32 +#undef process64 +#undef process_ + +/* End of generic case */ + +#endif /* End of defining ctz */ + +#ifdef TEST_BITOPS +#include <stdio.h> +#include <stdlib.h> + +static uint64_t testcases[] = { + 13371337 * 10, + 100, + 385789752, + 82574, + (((uint64_t)1)<<63) + (((uint64_t)1)<<31) + 10101 +}; + +static int +naive_clz(int bits, uint64_t v) +{ + int r = 0; + uint64_t bit = ((uint64_t)1) << (bits-1); + while (bit && 0 == (v & bit)) { + r++; + bit >>= 1; + } + /* printf("clz(%d,%lx) -> %d\n", bits, v, r); */ + return r; +} + +static int +naive_ctz(int bits, uint64_t v) +{ + int r = 0; + uint64_t bit = 1; + while (bit && 0 == (v & bit)) { + r++; + bit <<= 1; + if (r == bits) + break; + } + /* printf("ctz(%d,%lx) -> %d\n", bits, v, r); */ + return r; +} + +static int +check(uint64_t vv) +{ + uint32_t v32 = (uint32_t) vv; + + if (vv == 0) + return 1; /* c[tl]z64(0) is undefined. */ + + if (ctz64(vv) != naive_ctz(64, vv)) { + printf("mismatch with ctz64: %d\n", ctz64(vv)); + exit(1); + return 0; + } + if (clz64(vv) != naive_clz(64, vv)) { + printf("mismatch with clz64: %d\n", clz64(vv)); + exit(1); + return 0; + } + + if (v32 == 0) + return 1; /* c[lt]z(0) is undefined. */ + + if (ctz32(v32) != naive_ctz(32, v32)) { + printf("mismatch with ctz32: %d\n", ctz32(v32)); + exit(1); + return 0; + } + if (clz32(v32) != naive_clz(32, v32)) { + printf("mismatch with clz32: %d\n", clz32(v32)); + exit(1); + return 0; + } + return 1; +} + +int +main(int c, char **v) +{ + unsigned int i; + const unsigned int n = sizeof(testcases)/sizeof(testcases[0]); + int result = 0; + + for (i = 0; i <= 63; ++i) { + uint64_t x = 1 << i; + if (!check(x)) + result = 1; + --x; + if (!check(x)) + result = 1; + } + + for (i = 0; i < n; ++i) { + if (! check(testcases[i])) + result = 1; + } + if (result) { + puts("FAIL"); + } else { + puts("OK"); + } + return result; +} +#endif + diff --git a/deps/timeout/timeout-debug.h b/deps/timeout/timeout-debug.h new file mode 100644 index 0000000..fc727a6 --- /dev/null +++ b/deps/timeout/timeout-debug.h @@ -0,0 +1,77 @@ +/* + * D E B U G R O U T I N E S + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#if TIMEOUT_DEBUG - 0 +#include <stdlib.h> +#include <stdio.h> + +#undef TIMEOUT_DEBUG +#define TIMEOUT_DEBUG 1 +#define DEBUG_LEVEL timeout_debug + +static int timeout_debug; + +#define SAYit_(lvl, fmt, ...) do { \ + if (DEBUG_LEVEL >= (lvl)) \ + fprintf(stderr, fmt "%s", __FILE__, __LINE__, __func__, __VA_ARGS__); \ +} while (0) + +#define SAYit(lvl, ...) SAYit_((lvl), "%s:%d:%s: " __VA_ARGS__, "\n") + +#define PANIC(...) do { \ + SAYit(0, __VA_ARGS__); \ + _Exit(EXIT_FAILURE); \ +} while (0) +#else +#undef TIMEOUT_DEBUG +#define TIMEOUT_DEBUG 0 +#define DEBUG_LEVEL 0 + +#define SAYit(...) (void)0 +#endif + +#define SAY(...) SAYit(1, __VA_ARGS__) +#define HAI SAY("HAI") + + +static inline char *fmt_(char *buf, uint64_t ts, int wheel_bit, int wheel_num) { + char *p = buf; + int wheel, n, i; + + for (wheel = wheel_num - 2; wheel >= 0; wheel--) { + n = ((1 << wheel_bit) - 1) & (ts >> (wheel * WHEEL_BIT)); + + for (i = wheel_bit - 1; i >= 0; i--) { + *p++ = '0' + !!(n & (1 << i)); + } + + if (wheel != 0) + *p++ = ':'; + } + + *p = 0; + + return buf; +} /* fmt_() */ + +#define fmt(ts) fmt_(((char[((1 << WHEEL_BIT) * WHEEL_NUM) + WHEEL_NUM + 1]){ 0 }), (ts), WHEEL_BIT, WHEEL_NUM) + + +static inline char *bin64_(char *buf, uint64_t n, int wheel_bit) { + char *p = buf; + int i; + + for (i = 0; i < (1 << wheel_bit); i++) { + *p++ = "01"[0x1 & (n >> (((1 << wheel_bit) - 1) - i))]; + } + + *p = 0; + + return buf; +} /* bin64_() */ + +#define bin64(ts) bin64_(((char[((1 << WHEEL_BIT) * WHEEL_NUM) + 1]){ 0 }), (ts), WHEEL_BIT) + + diff --git a/deps/timeout/timeout.c b/deps/timeout/timeout.c new file mode 100644 index 0000000..aac21a0 --- /dev/null +++ b/deps/timeout/timeout.c @@ -0,0 +1,749 @@ +/* ========================================================================== + * timeout.c - Tickless hierarchical timing wheel. + * -------------------------------------------------------------------------- + * Copyright (c) 2013, 2014 William Ahern + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the + * following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS + * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE + * USE OR OTHER DEALINGS IN THE SOFTWARE. + * ========================================================================== + */ +#include <limits.h> /* CHAR_BIT */ + +#include <stddef.h> /* NULL */ +#include <stdlib.h> /* malloc(3) free(3) */ +#include <stdio.h> /* FILE fprintf(3) */ + +#include <inttypes.h> /* UINT64_C uint64_t */ + +#include <string.h> /* memset(3) */ + +#include <errno.h> /* errno */ + +#include <sys/queue.h> /* TAILQ(3) */ + +#include "timeout.h" + +#if TIMEOUT_DEBUG - 0 +#include "timeout-debug.h" +#endif + +#ifdef TIMEOUT_DISABLE_RELATIVE_ACCESS +#define TO_SET_TIMEOUTS(to, T) ((void)0) +#else +#define TO_SET_TIMEOUTS(to, T) ((to)->timeouts = (T)) +#endif + +/* + * A N C I L L A R Y R O U T I N E S + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#define abstime_t timeout_t /* for documentation purposes */ +#define reltime_t timeout_t /* "" */ + +#if !defined countof +#define countof(a) (sizeof (a) / sizeof *(a)) +#endif + +#if !defined endof +#define endof(a) (&(a)[countof(a)]) +#endif + +#if !defined MIN +#define MIN(a, b) (((a) < (b))? (a) : (b)) +#endif + +#if !defined MAX +#define MAX(a, b) (((a) > (b))? (a) : (b)) +#endif + +#if !defined TAILQ_CONCAT +#define TAILQ_CONCAT(head1, head2, field) do { \ + if (!TAILQ_EMPTY(head2)) { \ + *(head1)->tqh_last = (head2)->tqh_first; \ + (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \ + (head1)->tqh_last = (head2)->tqh_last; \ + TAILQ_INIT((head2)); \ + } \ +} while (0) +#endif + +#if !defined TAILQ_FOREACH_SAFE +#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = TAILQ_FIRST(head); \ + (var) && ((tvar) = TAILQ_NEXT(var, field), 1); \ + (var) = (tvar)) +#endif + + +/* + * B I T M A N I P U L A T I O N R O U T I N E S + * + * The macros and routines below implement wheel parameterization. The + * inputs are: + * + * WHEEL_BIT - The number of value bits mapped in each wheel. The + * lowest-order WHEEL_BIT bits index the lowest-order (highest + * resolution) wheel, the next group of WHEEL_BIT bits the + * higher wheel, etc. + * + * WHEEL_NUM - The number of wheels. WHEEL_BIT * WHEEL_NUM = the number of + * value bits used by all the wheels. For the default of 6 and + * 4, only the low 24 bits are processed. Any timeout value + * larger than this will cycle through again. + * + * The implementation uses bit fields to remember which slot in each wheel + * is populated, and to generate masks of expiring slots according to the + * current update interval (i.e. the "tickless" aspect). The slots to + * process in a wheel are (populated-set & interval-mask). + * + * WHEEL_BIT cannot be larger than 6 bits because 2^6 -> 64 is the largest + * number of slots which can be tracked in a uint64_t integer bit field. + * WHEEL_BIT cannot be smaller than 3 bits because of our rotr and rotl + * routines, which only operate on all the value bits in an integer, and + * there's no integer smaller than uint8_t. + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#if !defined WHEEL_BIT +#define WHEEL_BIT 6 +#endif + +#if !defined WHEEL_NUM +#define WHEEL_NUM 4 +#endif + +#define WHEEL_LEN (1U << WHEEL_BIT) +#define WHEEL_MAX (WHEEL_LEN - 1) +#define WHEEL_MASK (WHEEL_LEN - 1) +#define TIMEOUT_MAX ((TIMEOUT_C(1) << (WHEEL_BIT * WHEEL_NUM)) - 1) + +#include "timeout-bitops.c" + +#if WHEEL_BIT == 6 +#define ctz(n) ctz64(n) +#define clz(n) clz64(n) +#define fls(n) ((int)(64 - clz64(n))) +#else +#define ctz(n) ctz32(n) +#define clz(n) clz32(n) +#define fls(n) ((int)(32 - clz32(n))) +#endif + +#if WHEEL_BIT == 6 +#define WHEEL_C(n) UINT64_C(n) +#define WHEEL_PRIu PRIu64 +#define WHEEL_PRIx PRIx64 + +typedef uint64_t wheel_t; + +#elif WHEEL_BIT == 5 + +#define WHEEL_C(n) UINT32_C(n) +#define WHEEL_PRIu PRIu32 +#define WHEEL_PRIx PRIx32 + +typedef uint32_t wheel_t; + +#elif WHEEL_BIT == 4 + +#define WHEEL_C(n) UINT16_C(n) +#define WHEEL_PRIu PRIu16 +#define WHEEL_PRIx PRIx16 + +typedef uint16_t wheel_t; + +#elif WHEEL_BIT == 3 + +#define WHEEL_C(n) UINT8_C(n) +#define WHEEL_PRIu PRIu8 +#define WHEEL_PRIx PRIx8 + +typedef uint8_t wheel_t; + +#else +#error invalid WHEEL_BIT value +#endif + + +static inline wheel_t rotl(const wheel_t v, int c) { + if (!(c &= (sizeof v * CHAR_BIT - 1))) + return v; + + return (v << c) | (v >> (sizeof v * CHAR_BIT - c)); +} /* rotl() */ + + +static inline wheel_t rotr(const wheel_t v, int c) { + if (!(c &= (sizeof v * CHAR_BIT - 1))) + return v; + + return (v >> c) | (v << (sizeof v * CHAR_BIT - c)); +} /* rotr() */ + + +/* + * T I M E R R O U T I N E S + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +TAILQ_HEAD(timeout_list, timeout); + +struct timeouts { + struct timeout_list wheel[WHEEL_NUM][WHEEL_LEN], expired; + + wheel_t pending[WHEEL_NUM]; + + timeout_t curtime; + timeout_t hertz; + long long count; +}; /* struct timeouts */ + + +static struct timeouts *timeouts_init(struct timeouts *T, timeout_t hz) { + unsigned i, j; + + for (i = 0; i < countof(T->wheel); i++) { + for (j = 0; j < countof(T->wheel[i]); j++) { + TAILQ_INIT(&T->wheel[i][j]); + } + } + + TAILQ_INIT(&T->expired); + + for (i = 0; i < countof(T->pending); i++) { + T->pending[i] = 0; + } + + T->curtime = 0; + T->hertz = (hz)? hz : TIMEOUT_mHZ; + T->count =0; + return T; +} /* timeouts_init() */ +long long timeouts_count(struct timeouts *T) +{ + return T->count; +} +TIMEOUT_PUBLIC struct timeouts *timeouts_open(timeout_t hz, int *error) { + struct timeouts *T; + + if ((T = malloc(sizeof *T))) + return timeouts_init(T, hz); + + *error = errno; + + return NULL; +} /* timeouts_open() */ + + +static void timeouts_reset(struct timeouts *T) { + struct timeout_list reset; + struct timeout *to; + unsigned i, j; + + TAILQ_INIT(&reset); + + for (i = 0; i < countof(T->wheel); i++) { + for (j = 0; j < countof(T->wheel[i]); j++) { + TAILQ_CONCAT(&reset, &T->wheel[i][j], tqe); + } + } + + TAILQ_CONCAT(&reset, &T->expired, tqe); + + TAILQ_FOREACH(to, &reset, tqe) { + to->pending = NULL; + TO_SET_TIMEOUTS(to, NULL); + } + T->count=0; +} /* timeouts_reset() */ + + +TIMEOUT_PUBLIC void timeouts_close(struct timeouts *T) { + /* + * NOTE: Delete installed timeouts so timeout_pending() and + * timeout_expired() worked as expected. + */ + timeouts_reset(T); + + free(T); +} /* timeouts_close() */ + + +TIMEOUT_PUBLIC timeout_t timeouts_hz(struct timeouts *T) { + return T->hertz; +} /* timeouts_hz() */ + + +TIMEOUT_PUBLIC void timeouts_del(struct timeouts *T, struct timeout *to) { + if (to->pending) { + TAILQ_REMOVE(to->pending, to, tqe); + + if (to->pending != &T->expired && TAILQ_EMPTY(to->pending)) { + ptrdiff_t index = to->pending - &T->wheel[0][0]; + int wheel = index / WHEEL_LEN; + int slot = index % WHEEL_LEN; + + T->pending[wheel] &= ~(WHEEL_C(1) << slot); + } + + to->pending = NULL; + TO_SET_TIMEOUTS(to, NULL); + T->count--; + } +} /* timeouts_del() */ + + +static inline reltime_t timeout_rem(struct timeouts *T, struct timeout *to) { + return to->expires - T->curtime; +} /* timeout_rem() */ + + +static inline int timeout_wheel(timeout_t timeout) { + /* must be called with timeout != 0, so fls input is nonzero */ + return (fls(MIN(timeout, TIMEOUT_MAX)) - 1) / WHEEL_BIT; +} /* timeout_wheel() */ + + +static inline int timeout_slot(int wheel, timeout_t expires) { + return WHEEL_MASK & ((expires >> (wheel * WHEEL_BIT)) - !!wheel); +} /* timeout_slot() */ + + +static void timeouts_sched(struct timeouts *T, struct timeout *to, timeout_t expires) { + timeout_t rem; + int wheel, slot; + + timeouts_del(T, to); + + to->expires = expires; + + TO_SET_TIMEOUTS(to, T); + + if (expires > T->curtime) { + rem = timeout_rem(T, to); + + /* rem is nonzero since: + * rem == timeout_rem(T,to), + * == to->expires - T->curtime + * and above we have expires > T->curtime. + */ + wheel = timeout_wheel(rem); + slot = timeout_slot(wheel, to->expires); + + to->pending = &T->wheel[wheel][slot]; + TAILQ_INSERT_TAIL(to->pending, to, tqe); + + T->pending[wheel] |= WHEEL_C(1) << slot; + } else { + to->pending = &T->expired; + TAILQ_INSERT_TAIL(to->pending, to, tqe); + } +} /* timeouts_sched() */ + + +#ifndef TIMEOUT_DISABLE_INTERVALS +static void timeouts_readd(struct timeouts *T, struct timeout *to) { + to->expires += to->interval; + + if (to->expires <= T->curtime) { + /* If we've missed the next firing of this timeout, reschedule + * it to occur at the next multiple of its interval after + * the last time that it fired. + */ + timeout_t n = T->curtime - to->expires; + timeout_t r = n % to->interval; + to->expires = T->curtime + (to->interval - r); + } + + timeouts_sched(T, to, to->expires); +} /* timeouts_readd() */ +#endif + + +TIMEOUT_PUBLIC void timeouts_add(struct timeouts *T, struct timeout *to, timeout_t timeout) { +#ifndef TIMEOUT_DISABLE_INTERVALS + if (to->flags & TIMEOUT_INT) + to->interval = MAX(1, timeout); +#endif + T->count++; + if (to->flags & TIMEOUT_ABS) + timeouts_sched(T, to, timeout); + else + timeouts_sched(T, to, T->curtime + timeout); +} /* timeouts_add() */ + + +TIMEOUT_PUBLIC void timeouts_update(struct timeouts *T, abstime_t curtime) { + timeout_t elapsed = curtime - T->curtime; + struct timeout_list todo; + int wheel; + + TAILQ_INIT(&todo); + + /* + * There's no avoiding looping over every wheel. It's best to keep + * WHEEL_NUM smallish. + */ + for (wheel = 0; wheel < WHEEL_NUM; wheel++) { + wheel_t pending; + + /* + * Calculate the slots expiring in this wheel + * + * If the elapsed time is greater than the maximum period of + * the wheel, mark every position as expiring. + * + * Otherwise, to determine the expired slots fill in all the + * bits between the last slot processed and the current + * slot, inclusive of the last slot. We'll bitwise-AND this + * with our pending set below. + * + * If a wheel rolls over, force a tick of the next higher + * wheel. + */ + if ((elapsed >> (wheel * WHEEL_BIT)) > WHEEL_MAX) { + pending = (wheel_t)~WHEEL_C(0); + } else { + wheel_t _elapsed = WHEEL_MASK & (elapsed >> (wheel * WHEEL_BIT)); + int oslot, nslot; + + /* + * TODO: It's likely that at least one of the + * following three bit fill operations is redundant + * or can be replaced with a simpler operation. + */ + oslot = WHEEL_MASK & (T->curtime >> (wheel * WHEEL_BIT)); + pending = rotl(((UINT64_C(1) << _elapsed) - 1), oslot); + + nslot = WHEEL_MASK & (curtime >> (wheel * WHEEL_BIT)); + pending |= rotr(rotl(((WHEEL_C(1) << _elapsed) - 1), nslot), _elapsed); + pending |= WHEEL_C(1) << nslot; + } + + while (pending & T->pending[wheel]) { + /* ctz input cannot be zero: loop condition. */ + int slot = ctz(pending & T->pending[wheel]); + TAILQ_CONCAT(&todo, &T->wheel[wheel][slot], tqe); + T->pending[wheel] &= ~(UINT64_C(1) << slot); + } + + if (!(0x1 & pending)) + break; /* break if we didn't wrap around end of wheel */ + + /* if we're continuing, the next wheel must tick at least once */ + elapsed = MAX(elapsed, (WHEEL_LEN << (wheel * WHEEL_BIT))); + } + + T->curtime = curtime; + + while (!TAILQ_EMPTY(&todo)) { + struct timeout *to = TAILQ_FIRST(&todo); + + TAILQ_REMOVE(&todo, to, tqe); + to->pending = NULL; + + timeouts_sched(T, to, to->expires); + } + + return; +} /* timeouts_update() */ + + +TIMEOUT_PUBLIC void timeouts_step(struct timeouts *T, reltime_t elapsed) { + timeouts_update(T, T->curtime + elapsed); +} /* timeouts_step() */ + + +TIMEOUT_PUBLIC bool timeouts_pending(struct timeouts *T) { + wheel_t pending = 0; + int wheel; + + for (wheel = 0; wheel < WHEEL_NUM; wheel++) { + pending |= T->pending[wheel]; + } + + return !!pending; +} /* timeouts_pending() */ + + +TIMEOUT_PUBLIC bool timeouts_expired(struct timeouts *T) { + return !TAILQ_EMPTY(&T->expired); +} /* timeouts_expired() */ + + +/* + * Calculate the interval before needing to process any timeouts pending on + * any wheel. + * + * (This is separated from the public API routine so we can evaluate our + * wheel invariant assertions irrespective of the expired queue.) + * + * This might return a timeout value sooner than any installed timeout if + * only higher-order wheels have timeouts pending. We can only know when to + * process a wheel, not precisely when a timeout is scheduled. Our timeout + * accuracy could be off by 2^(N*M)-1 units where N is the wheel number and + * M is WHEEL_BIT. Only timeouts which have fallen through to wheel 0 can be + * known exactly. + * + * We should never return a timeout larger than the lowest actual timeout. + */ +static timeout_t timeouts_int(struct timeouts *T) { + timeout_t timeout = ~TIMEOUT_C(0), _timeout; + timeout_t relmask; + int wheel, slot; + + relmask = 0; + + for (wheel = 0; wheel < WHEEL_NUM; wheel++) { + if (T->pending[wheel]) { + slot = WHEEL_MASK & (T->curtime >> (wheel * WHEEL_BIT)); + + /* ctz input cannot be zero: T->pending[wheel] is + * nonzero, so rotr() is nonzero. */ + _timeout = (ctz(rotr(T->pending[wheel], slot)) + !!wheel) << (wheel * WHEEL_BIT); + /* +1 to higher order wheels as those timeouts are one rotation in the future (otherwise they'd be on a lower wheel or expired) */ + + _timeout -= relmask & T->curtime; + /* reduce by how much lower wheels have progressed */ + + timeout = MIN(_timeout, timeout); + } + + relmask <<= WHEEL_BIT; + relmask |= WHEEL_MASK; + } + + return timeout; +} /* timeouts_int() */ + + +/* + * Calculate the interval our caller can wait before needing to process + * events. + */ +TIMEOUT_PUBLIC timeout_t timeouts_timeout(struct timeouts *T) { + if (!TAILQ_EMPTY(&T->expired)) + return 0; + + return timeouts_int(T); +} /* timeouts_timeout() */ + + +TIMEOUT_PUBLIC struct timeout *timeouts_get(struct timeouts *T) { + if (!TAILQ_EMPTY(&T->expired)) { + struct timeout *to = TAILQ_FIRST(&T->expired); + + TAILQ_REMOVE(&T->expired, to, tqe); + to->pending = NULL; + TO_SET_TIMEOUTS(to, NULL); + +#ifndef TIMEOUT_DISABLE_INTERVALS + if ((to->flags & TIMEOUT_INT) && to->interval > 0) + timeouts_readd(T, to); +#endif + T->count--; + return to; + } else { + return 0; + } +} /* timeouts_get() */ + + +/* + * Use dumb looping to locate the earliest timeout pending on the wheel so + * our invariant assertions can check the result of our optimized code. + */ +static struct timeout *timeouts_min(struct timeouts *T) { + struct timeout *to, *min = NULL; + unsigned i, j; + + for (i = 0; i < countof(T->wheel); i++) { + for (j = 0; j < countof(T->wheel[i]); j++) { + TAILQ_FOREACH(to, &T->wheel[i][j], tqe) { + if (!min || to->expires < min->expires) + min = to; + } + } + } + + return min; +} /* timeouts_min() */ + + +/* + * Check some basic algorithm invariants. If these invariants fail then + * something is definitely broken. + */ +#define report(...) do { \ + if ((fp)) \ + fprintf(fp, __VA_ARGS__); \ +} while (0) + +#define check(expr, ...) do { \ + if (!(expr)) { \ + report(__VA_ARGS__); \ + return 0; \ + } \ +} while (0) + +TIMEOUT_PUBLIC bool timeouts_check(struct timeouts *T, FILE *fp) { + timeout_t timeout; + struct timeout *to; + + if ((to = timeouts_min(T))) { + check(to->expires > T->curtime, "missed timeout (expires:%" TIMEOUT_PRIu " <= curtime:%" TIMEOUT_PRIu ")\n", to->expires, T->curtime); + + timeout = timeouts_int(T); + check(timeout <= to->expires - T->curtime, "wrong soft timeout (soft:%" TIMEOUT_PRIu " > hard:%" TIMEOUT_PRIu ") (expires:%" TIMEOUT_PRIu "; curtime:%" TIMEOUT_PRIu ")\n", timeout, (to->expires - T->curtime), to->expires, T->curtime); + + timeout = timeouts_timeout(T); + check(timeout <= to->expires - T->curtime, "wrong soft timeout (soft:%" TIMEOUT_PRIu " > hard:%" TIMEOUT_PRIu ") (expires:%" TIMEOUT_PRIu "; curtime:%" TIMEOUT_PRIu ")\n", timeout, (to->expires - T->curtime), to->expires, T->curtime); + } else { + timeout = timeouts_timeout(T); + + if (!TAILQ_EMPTY(&T->expired)) + check(timeout == 0, "wrong soft timeout (soft:%" TIMEOUT_PRIu " != hard:%" TIMEOUT_PRIu ")\n", timeout, TIMEOUT_C(0)); + else + check(timeout == ~TIMEOUT_C(0), "wrong soft timeout (soft:%" TIMEOUT_PRIu " != hard:%" TIMEOUT_PRIu ")\n", timeout, ~TIMEOUT_C(0)); + } + + return 1; +} /* timeouts_check() */ + + +#define ENTER \ + do { \ + static const int pc0 = __LINE__; \ + switch (pc0 + it->pc) { \ + case __LINE__: (void)0 + +#define SAVE_AND_DO(do_statement) \ + do { \ + it->pc = __LINE__ - pc0; \ + do_statement; \ + case __LINE__: (void)0; \ + } while (0) + +#define YIELD(rv) \ + SAVE_AND_DO(return (rv)) + +#define LEAVE \ + SAVE_AND_DO(break); \ + } \ + } while (0) + +TIMEOUT_PUBLIC struct timeout *timeouts_next(struct timeouts *T, struct timeouts_it *it) { + struct timeout *to; + + ENTER; + + if (it->flags & TIMEOUTS_EXPIRED) { + if (it->flags & TIMEOUTS_CLEAR) { + while ((to = timeouts_get(T))) { + YIELD(to); + } + } else { + TAILQ_FOREACH_SAFE(to, &T->expired, tqe, it->to) { + YIELD(to); + } + } + } + + if (it->flags & TIMEOUTS_PENDING) { + for (it->i = 0; it->i < countof(T->wheel); it->i++) { + for (it->j = 0; it->j < countof(T->wheel[it->i]); it->j++) { + TAILQ_FOREACH_SAFE(to, &T->wheel[it->i][it->j], tqe, it->to) { + YIELD(to); + } + } + } + } + + LEAVE; + + return NULL; +} /* timeouts_next */ + +#undef LEAVE +#undef YIELD +#undef SAVE_AND_DO +#undef ENTER + + +/* + * T I M E O U T R O U T I N E S + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +TIMEOUT_PUBLIC struct timeout *timeout_init(struct timeout *to, int flags) { + memset(to, 0, sizeof *to); + + to->flags = flags; + + return to; +} /* timeout_init() */ + + +#ifndef TIMEOUT_DISABLE_RELATIVE_ACCESS +TIMEOUT_PUBLIC bool timeout_pending(struct timeout *to) { + return to->pending && to->pending != &to->timeouts->expired; +} /* timeout_pending() */ + + +TIMEOUT_PUBLIC bool timeout_expired(struct timeout *to) { + return to->pending && to->pending == &to->timeouts->expired; +} /* timeout_expired() */ + + +TIMEOUT_PUBLIC void timeout_del(struct timeout *to) { + timeouts_del(to->timeouts, to); +} /* timeout_del() */ +#endif + + +/* + * V E R S I O N I N T E R F A C E S + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +TIMEOUT_PUBLIC int timeout_version(void) { + return TIMEOUT_VERSION; +} /* timeout_version() */ + + +TIMEOUT_PUBLIC const char *timeout_vendor(void) { + return TIMEOUT_VENDOR; +} /* timeout_version() */ + + +TIMEOUT_PUBLIC int timeout_v_rel(void) { + return TIMEOUT_V_REL; +} /* timeout_version() */ + + +TIMEOUT_PUBLIC int timeout_v_abi(void) { + return TIMEOUT_V_ABI; +} /* timeout_version() */ + + +TIMEOUT_PUBLIC int timeout_v_api(void) { + return TIMEOUT_V_API; +} /* timeout_version() */ + diff --git a/deps/timeout/timeout.h b/deps/timeout/timeout.h new file mode 100644 index 0000000..b597ab5 --- /dev/null +++ b/deps/timeout/timeout.h @@ -0,0 +1,256 @@ +/* ========================================================================== + * timeout.h - Tickless hierarchical timing wheel. + * -------------------------------------------------------------------------- + * Copyright (c) 2013, 2014 William Ahern + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the + * following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS + * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE + * USE OR OTHER DEALINGS IN THE SOFTWARE. + * ========================================================================== + */ +#ifndef TIMEOUT_H +#define TIMEOUT_H + +#include <stdbool.h> /* bool */ +#include <stdio.h> /* FILE */ + +#include <inttypes.h> /* PRIu64 PRIx64 PRIX64 uint64_t */ + +#include <sys/queue.h> /* TAILQ(3) */ + + +/* + * V E R S I O N I N T E R F A C E S + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#if !defined TIMEOUT_PUBLIC +#define TIMEOUT_PUBLIC +#endif + +#define TIMEOUT_VERSION TIMEOUT_V_REL +#define TIMEOUT_VENDOR "[email protected]" + +#define TIMEOUT_V_REL 0x20160226 +#define TIMEOUT_V_ABI 0x20160224 +#define TIMEOUT_V_API 0x20160226 + +TIMEOUT_PUBLIC int timeout_version(void); + +TIMEOUT_PUBLIC const char *timeout_vendor(void); + +TIMEOUT_PUBLIC int timeout_v_rel(void); + +TIMEOUT_PUBLIC int timeout_v_abi(void); + +TIMEOUT_PUBLIC int timeout_v_api(void); + + +/* + * I N T E G E R T Y P E I N T E R F A C E S + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#define TIMEOUT_C(n) UINT64_C(n) +#define TIMEOUT_PRIu PRIu64 +#define TIMEOUT_PRIx PRIx64 +#define TIMEOUT_PRIX PRIX64 + +#define TIMEOUT_mHZ TIMEOUT_C(1000) +#define TIMEOUT_uHZ TIMEOUT_C(1000000) +#define TIMEOUT_nHZ TIMEOUT_C(1000000000) + +typedef uint64_t timeout_t; + +#define timeout_error_t int /* for documentation purposes */ + + +/* + * C A L L B A C K I N T E R F A C E + * + * Callback function parameters unspecified to make embedding into existing + * applications easier. + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#ifndef TIMEOUT_CB_OVERRIDE +struct timeout_cb { + void (*fn)(); + void *arg; +}; /* struct timeout_cb */ +#endif + +/* + * T I M E O U T I N T E R F A C E S + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#ifndef TIMEOUT_DISABLE_INTERVALS +#define TIMEOUT_INT 0x01 /* interval (repeating) timeout */ +#endif +#define TIMEOUT_ABS 0x02 /* treat timeout values as absolute */ + +#define TIMEOUT_INITIALIZER(flags) { (flags) } + +#define timeout_setcb(to, fn, arg) do { \ + (to)->callback.fn = (fn); \ + (to)->callback.arg = (arg); \ +} while (0) + +struct timeout { + int flags; + + timeout_t expires; + /* absolute expiration time */ + + struct timeout_list *pending; + /* timeout list if pending on wheel or expiry queue */ + + TAILQ_ENTRY(timeout) tqe; + /* entry member for struct timeout_list lists */ + +#ifndef TIMEOUT_DISABLE_CALLBACKS + struct timeout_cb callback; + /* optional callback information */ +#endif + +#ifndef TIMEOUT_DISABLE_INTERVALS + timeout_t interval; + /* timeout interval if periodic */ +#endif + +#ifndef TIMEOUT_DISABLE_RELATIVE_ACCESS + struct timeouts *timeouts; + /* timeouts collection if member of */ +#endif +}; /* struct timeout */ + + +TIMEOUT_PUBLIC struct timeout *timeout_init(struct timeout *, int); +/* initialize timeout structure (same as TIMEOUT_INITIALIZER) */ + +#ifndef TIMEOUT_DISABLE_RELATIVE_ACCESS +TIMEOUT_PUBLIC bool timeout_pending(struct timeout *); +/* true if on timing wheel, false otherwise */ + +TIMEOUT_PUBLIC bool timeout_expired(struct timeout *); +/* true if on expired queue, false otherwise */ + +TIMEOUT_PUBLIC void timeout_del(struct timeout *); +/* remove timeout from any timing wheel (okay if not member of any) */ +#endif + +/* + * T I M I N G W H E E L I N T E R F A C E S + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +struct timeouts; + +TIMEOUT_PUBLIC struct timeouts *timeouts_open(timeout_t, timeout_error_t *); +/* open a new timing wheel, setting optional HZ (for float conversions) */ + +TIMEOUT_PUBLIC void timeouts_close(struct timeouts *); +/* destroy timing wheel */ + +TIMEOUT_PUBLIC long long timeouts_count(struct timeouts *); +/* count timeout of timeouts*/ + +TIMEOUT_PUBLIC timeout_t timeouts_hz(struct timeouts *); +/* return HZ setting (for float conversions) */ + +TIMEOUT_PUBLIC void timeouts_update(struct timeouts *, timeout_t); +/* update timing wheel with current absolute time */ + +TIMEOUT_PUBLIC void timeouts_step(struct timeouts *, timeout_t); +/* step timing wheel by relative time */ + +TIMEOUT_PUBLIC timeout_t timeouts_timeout(struct timeouts *); +/* return interval to next required update */ + +TIMEOUT_PUBLIC void timeouts_add(struct timeouts *, struct timeout *, timeout_t); +/* add timeout to timing wheel */ + +TIMEOUT_PUBLIC void timeouts_del(struct timeouts *, struct timeout *); +/* remove timeout from any timing wheel or expired queue (okay if on neither) */ + +TIMEOUT_PUBLIC struct timeout *timeouts_get(struct timeouts *); +/* return any expired timeout (caller should loop until NULL-return) */ + +TIMEOUT_PUBLIC bool timeouts_pending(struct timeouts *); +/* return true if any timeouts pending on timing wheel */ + +TIMEOUT_PUBLIC bool timeouts_expired(struct timeouts *); +/* return true if any timeouts on expired queue */ + +TIMEOUT_PUBLIC bool timeouts_check(struct timeouts *, FILE *); +/* return true if invariants hold. describes failures to optional file handle. */ + +#define TIMEOUTS_PENDING 0x10 +#define TIMEOUTS_EXPIRED 0x20 +#define TIMEOUTS_ALL (TIMEOUTS_PENDING|TIMEOUTS_EXPIRED) +#define TIMEOUTS_CLEAR 0x40 + +#define TIMEOUTS_IT_INITIALIZER(flags) { (flags), 0, 0, 0, 0 } + +#define TIMEOUTS_IT_INIT(cur, _flags) do { \ + (cur)->flags = (_flags); \ + (cur)->pc = 0; \ +} while (0) + +struct timeouts_it { + int flags; + unsigned pc, i, j; + struct timeout *to; +}; /* struct timeouts_it */ + +TIMEOUT_PUBLIC struct timeout *timeouts_next(struct timeouts *, struct timeouts_it *); +/* return next timeout in pending wheel or expired queue. caller can delete + * the returned timeout, but should not otherwise manipulate the timing + * wheel. in particular, caller SHOULD NOT delete any other timeout as that + * could invalidate cursor state and trigger a use-after-free. + */ + +#define TIMEOUTS_FOREACH(var, T, flags) \ + struct timeouts_it _it = TIMEOUTS_IT_INITIALIZER((flags)); \ + while (((var) = timeouts_next((T), &_it))) + + +/* + * B O N U S W H E E L I N T E R F A C E S + * + * I usually use floating point timeouts in all my code, but it's cleaner to + * separate it to keep the core algorithmic code simple. + * + * Using macros instead of static inline routines where <math.h> routines + * might be used to keep -lm linking optional. + * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#include <math.h> /* ceil(3) */ + +#define timeouts_f2i(T, f) \ + ((timeout_t)ceil((f) * timeouts_hz((T)))) /* prefer late expiration over early */ + +#define timeouts_i2f(T, i) \ + ((double)(i) / timeouts_hz((T))) + +#define timeouts_addf(T, to, timeout) \ + timeouts_add((T), (to), timeouts_f2i((T), (timeout))) + +#endif /* TIMEOUT_H */ diff --git a/shaping/CMakeLists.txt b/shaping/CMakeLists.txt index 846a1bb..b96c476 100644 --- a/shaping/CMakeLists.txt +++ b/shaping/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp) +add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp ${PROJECT_SOURCE_DIR}/deps/timeout/timeout.c) target_link_libraries(shaper PUBLIC common) target_link_libraries(shaper PUBLIC avl_tree) target_link_libraries(shaper PUBLIC cjson) @@ -7,6 +7,7 @@ target_link_libraries(shaper PUBLIC MESA_prof_load) target_link_libraries(shaper PUBLIC fieldstat3) target_link_libraries(shaper PUBLIC pthread) target_include_directories(shaper PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) +target_include_directories(shaper PUBLIC ${PROJECT_SOURCE_DIR}/deps/timeout) add_executable(shaping_engine src/main.cpp) target_link_libraries(shaping_engine PUBLIC shaper) target_link_libraries(shaping_engine PUBLIC maat4) diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 7821459..a218884 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -6,6 +6,9 @@ #include "session_table.h" #include "utils.h" #include "shaper_stat.h" +extern "C" { +#include "timeout.h" +} #define SHAPING_DIR_IN 0x1 #define SHAPING_DIR_OUT 0x2 @@ -22,6 +25,9 @@ #define SHAPING_WROK_THREAD_NUM_MAX 128 +#define SHAPING_STAT_REFRESH_INTERVAL_SEC 2 +#define SHAPING_STAT_REFRESH_MAX_PER_POLLING 5 + #define SHAPING_GLOBAL_CONF_FILE "./conf/shaping.conf" struct shaping_system_conf { @@ -46,6 +52,8 @@ struct shaping_thread_ctx { struct swarmkv *swarmkv_db;//handle of swarmkv struct shaping_maat_info *maat_info; struct session_table *session_table; + struct timeouts *expires; + time_t last_update_timeout_sec; int session_need_reset; struct shaping_system_conf conf; }; @@ -84,9 +92,8 @@ struct shaping_profile_info { int priority; int in_deposit_token; int out_deposit_token; - int async_token_ref_count; - int async_queue_len_ref_count; unsigned long long enqueue_time_us;//to calculate max latency + unsigned long long last_failed_get_token_ms; unsigned char is_priority_blocked; unsigned char is_invalid; struct shaping_stat_for_profile stat; @@ -142,6 +149,8 @@ struct shaping_flow { unsigned long long processed_pkts; struct timespec stat_update_time; time_t check_rule_time; + struct timeout timeout_handle; + time_t last_update_timeout_sec; }; struct shaper_flow_instance { @@ -151,7 +160,7 @@ struct shaper_flow_instance { struct shaper;//instance of shaping, thread unsafe -struct shaping_flow* shaping_flow_new(); +struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx); void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf); struct shaper* shaper_new(unsigned int priority_queue_len_max); void shaper_free(struct shaper *sp); diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h index 20da941..a2527cd 100644 --- a/shaping/include/shaper_stat.h +++ b/shaping/include/shaper_stat.h @@ -55,4 +55,4 @@ void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_ void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigned char direction, int thread_id); void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id); -void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force);
\ No newline at end of file +void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force);
\ No newline at end of file diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h index 8ccfe19..e533802 100644 --- a/shaping/include/shaper_swarmkv.h +++ b/shaping/include/shaper_swarmkv.h @@ -1,4 +1,5 @@ +#include <MESA/swarmkv.h> -struct swarmkv* shaper_swarmkv_init(); +struct swarmkv* shaper_swarmkv_init(int caller_thread_num); void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db); void swarmkv_reload_log_level();
\ No newline at end of file diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp index 91e1202..f9a38b2 100644 --- a/shaping/src/main.cpp +++ b/shaping/src/main.cpp @@ -25,6 +25,8 @@ static void *shaper_thread_loop(void *data) return NULL; } + swarmkv_register_thread(ctx->swarmkv_db); + //loop to process pkts while(!quit) { shaper_packet_recv_and_process(ctx); diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 09a0c24..02d7f33 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -27,8 +27,14 @@ extern "C" { #define MICRO_SECONDS_PER_SEC 1000000 #define NANO_SECONDS_PER_SEC 1000000000 +#define NANO_SECONDS_PER_MILLI_SEC 1000000 +#define MILLI_SECONDS_PER_SEC 1000 + #define SHAPING_LATENCY_THRESHOLD 2000000 //2s +#define TOKEN_ENLARGE_TIMES 10 +#define TOKEN_GET_FAILED_INTERVAL_MS 1 + #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0" #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1" #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2" @@ -137,7 +143,7 @@ static void shaping_node_free(struct shaping_node *s_node) return; } -struct shaping_flow* shaping_flow_new() +struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx) { struct shaping_node *s_node = NULL; int i; @@ -157,6 +163,8 @@ struct shaping_flow* shaping_flow_new() TAILQ_INIT(&s_node->shaping_flow.packet_queue); s_node->shaping_flow.ref_count = 1; s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1; + timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS); + timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); return &s_node->shaping_flow; @@ -170,7 +178,8 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) struct shaping_node *s_node = (struct shaping_node*)sf; if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) { - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1); + timeouts_del(ctx->expires, &sf->timeout_handle); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); shaping_node_free(s_node); } @@ -250,19 +259,6 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) return; } -static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg) -{ - struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg; - - shaper_global_stat_async_callback_inc(global_stat); - - if (reply->type != SWARMKV_REPLY_INTEGER) { - shaper_global_stat_async_hincrby_failed_inc(global_stat); - } - - return; -} - //return success(0) while any avl tree insert success int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) { @@ -277,20 +273,10 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un pkt_wrapper = shaper_first_pkt_get(sf); assert(pkt_wrapper != NULL); - if ((sf->flag & SESSION_UPDATE_PF_PRIO_LEN) == 0) { - if (sf->processed_pkts > CONFIRM_PRIORITY_PKTS) { - sf->flag |= SESSION_UPDATE_PF_PRIO_LEN; - } - } - priority = s_rule_info->primary.priority; avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority); - } } if (s_rule_info->borrowing_num == 0) {// no borrow profile @@ -302,10 +288,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority); - } + //TODO: calculate queue_len for borrow profile and add judge when refresh stat???? + //shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index); } } @@ -345,10 +329,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->primary.priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority); - } } if (s_rule_info->borrowing_num == 0) { @@ -359,10 +339,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->borrowing[i].priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); - if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) { - shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority); - } + //TODO: calculate queue_len for borrow profile and add judge when refresh stat???? } } @@ -416,6 +393,8 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg shaper_global_stat_async_callback_inc(arg->ctx->global_stat); + LOG_INFO("Swarmkv reply type =%d, integer =%llu",reply->type, reply->integer); + if (reply->type != SWARMKV_REPLY_INTEGER) { shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat); goto END; @@ -428,10 +407,16 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg s_pf_info->is_invalid = 0; } + if (reply->integer == 0) {//no token + struct timespec curr_time; + clock_gettime(CLOCK_MONOTONIC, &curr_time); + s_pf_info->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + goto END; + } + shaper_deposit_token_add(s_pf_info, reply->integer, arg->direction);//deposit tokens to profile END: - __atomic_sub_fetch(&s_pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST); shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free free(cb_arg); cb_arg = NULL; @@ -470,7 +455,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct struct shaping_async_cb_arg *arg = NULL; char key[32] = {0}; - __atomic_add_fetch(&pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST); __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming"); @@ -483,14 +467,14 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaper_global_stat_async_invoke_inc(ctx->global_stat); switch (pf_info->type) { case PROFILE_TYPE_GENERIC: - swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg); + swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); break; case PROFILE_TYPE_HOST_FARINESS: case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: - swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, shaper_token_get_cb, arg); + swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); break; case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: - swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits, shaper_token_get_cb, arg); + swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); break; default: if (arg) { @@ -499,11 +483,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct break; } - if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed - shaper_deposit_token_sub(pf_info, req_token_bits, direction); - return SHAPER_TOKEN_GET_SUCCESS; - } - if (pf_info->is_invalid) { if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token return SHAPER_TOKEN_GET_SUCCESS; @@ -551,7 +530,6 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb } END: - __atomic_sub_fetch(&s_pf_info->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST); shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free free(cb_arg); cb_arg = NULL; @@ -572,20 +550,15 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st arg->sf = sf; arg->priority = priority; - __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST); __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); shaper_global_stat_async_invoke_inc(ctx->global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); - if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) { - return 0; + if (profile->is_priority_blocked) { + return 1; } else { - if (profile->is_priority_blocked) { - return 1; - } else { - return 0; - } + return 0; } } @@ -609,6 +582,18 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet } + if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction)) { + shaper_deposit_token_sub(profile, req_token_bytes * 8, direction); + return SHAPER_TOKEN_GET_SUCCESS; + } + + struct timespec curr_timespec; + clock_gettime(CLOCK_MONOTONIC, &curr_timespec); + unsigned long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC; + if (curr_time_ms - profile->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, return failed; for swarmkv can't reproduce token in 1ms + return SHAPER_TOKEN_GET_FAILED; + } + if (shaper_profile_is_priority_blocked(ctx, sf, profile)) { return SHAPER_TOKEN_GET_FAILED; } else { @@ -831,12 +816,12 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ break; } - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); if (shaper_queue_empty(sf)) { if (sf->flag & SESSION_CLOSE) { - shaping_flow_free(ctx, sf); sf->flag &= (~SESSION_CLOSE); + shaping_flow_free(ctx, sf); } return 0; } else { @@ -849,8 +834,8 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ } else { shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail if (sf->flag & SESSION_CLOSE) { - shaping_flow_free(ctx, sf); sf->flag &= (~SESSION_CLOSE); + shaping_flow_free(ctx, sf); } } return 0; @@ -911,14 +896,20 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu } END: - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); + time_t curr_time = time(NULL); + if (curr_time > sf->last_update_timeout_sec) { + timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC); + sf->last_update_timeout_sec = curr_time; + } + if(sf->flag & SESSION_CLOSE) { if (shaper_queue_empty(sf)) { char *addr_str = addr_tuple4_to_str(&sf->tuple4); LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_str); - shaping_flow_free(ctx, sf); sf->flag &= (~SESSION_CLOSE); + shaping_flow_free(ctx, sf); if (addr_str) { free(addr_str); @@ -931,6 +922,27 @@ END: void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx) { + swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL); + + struct timeout *t = NULL; + struct shaping_flow *sf = NULL; + time_t curr_time = time(NULL); + int cnt = 0; + + if (curr_time > ctx->last_update_timeout_sec) { + timeouts_update(ctx->expires, curr_time); + ctx->last_update_timeout_sec = curr_time; + } + + t = timeouts_get(ctx->expires); + while (t && cnt < SHAPING_STAT_REFRESH_MAX_PER_POLLING) { + sf = container_of(t, struct shaping_flow, timeout_handle); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); + timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); + t = timeouts_get(ctx->expires); + cnt++; + } + if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) { return; } @@ -1182,6 +1194,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx) for (int i = 0; i < ctx->thread_num; i++) { shaper_free(ctx->thread_ctx[i].sp); session_table_destory(ctx->thread_ctx[i].session_table); + timeouts_close(ctx->thread_ctx[i].expires); } free(ctx->thread_ctx); } @@ -1196,7 +1209,7 @@ struct shaping_ctx *shaping_engine_init() { struct shaping_system_conf conf; struct shaping_ctx *ctx = NULL; - int ret; + int ret, error; memset(&conf, 0, sizeof(conf)); ctx = (struct shaping_ctx *)calloc(1, sizeof(struct shaping_ctx)); @@ -1208,7 +1221,7 @@ struct shaping_ctx *shaping_engine_init() } /*init swarmkv*/ - ctx->swarmkv_db = shaper_swarmkv_init(); + ctx->swarmkv_db = shaper_swarmkv_init(conf.work_thread_num); if (ctx->swarmkv_db == NULL) { goto ERROR; } @@ -1246,6 +1259,7 @@ struct shaping_ctx *shaping_engine_init() ctx->thread_ctx[i].maat_info = ctx->maat_info; ctx->thread_ctx[i].marsio_info = ctx->marsio_info; ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db; + ctx->thread_ctx[i].expires = timeouts_open(0, &error); ctx->thread_ctx[i].ref_ctx = ctx; memcpy(&ctx->thread_ctx[i].conf, &conf, sizeof(conf)); } diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp index 4b4f21f..64db3e6 100644 --- a/shaping/src/shaper_maat.cpp +++ b/shaping/src/shaper_maat.cpp @@ -381,7 +381,7 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf } if (sf->rule_num > 0 && priority_changed) { - shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); } sf->rule_num += rule_num; diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp index f43f76f..af4a7ee 100644 --- a/shaping/src/shaper_session.cpp +++ b/shaping/src/shaper_session.cpp @@ -30,7 +30,7 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru return NULL; } - sf = shaping_flow_new(); + sf = shaping_flow_new(ctx); raw_packet_parser_get_most_inner_tuple4(raw_parser, &sf->tuple4); sf->src_ip_str = addr_src_ip_to_str(&sf->tuple4); sf->src_ip_str_len = strlen(sf->src_ip_str); diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index 6bd2cfb..6ede233 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -4,12 +4,14 @@ #include <sys/socket.h> #include <arpa/inet.h> #include <MESA/MESA_prof_load.h> +#include <MESA/swarmkv.h> #include <fieldstat.h> #include "log.h" #include "utils.h" #include "shaper.h" #include "shaper_stat.h" +#include "shaper_global_stat.h" #define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits" @@ -131,9 +133,23 @@ static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int return; } -static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage) +static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, void * cb_arg) +{ + struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg; + + shaper_global_stat_async_callback_inc(global_stat); + + if (reply->type != SWARMKV_REPLY_INTEGER) { + shaper_global_stat_async_hincrby_failed_inc(global_stat); + } + + return; +} + +static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage) { struct shaping_stat_for_profile *profile_stat = &profile->stat; + struct shaping_stat *stat = ctx->stat; unsigned long long old_latency; shaper_stat_tags_build(vsys_id, rule_id, profile->id, profile->priority, profile_type); @@ -158,6 +174,9 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs if (need_update_guage) { fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id); fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id); + + shaper_global_stat_async_invoke_inc(ctx->global_stat); + swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len); memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile)); } else { profile_stat->in.pkts = 0; @@ -174,7 +193,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs return; } -void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force) +void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force) { struct shaping_rule_info *rule; struct timespec curr_time; @@ -199,10 +218,10 @@ void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int for (int i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; - shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage); + shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage); for (int j = 0; j < rule->borrowing_num; j++) { - shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage); + shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage); } } diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp index 6d2e32f..05ccecc 100644 --- a/shaping/src/shaper_swarmkv.cpp +++ b/shaping/src/shaper_swarmkv.cpp @@ -1,10 +1,10 @@ -#include <MESA/swarmkv.h> #include <MESA/MESA_handle_logger.h> #include <MESA/MESA_prof_load.h> #include "log.h" #include "shaper.h" #include "utils.h" +#include "shaper_swarmkv.h" struct shaper_swarmkv_conf { @@ -97,7 +97,7 @@ void swarmkv_reload_log_level() return; } -struct swarmkv* shaper_swarmkv_init() +struct swarmkv* shaper_swarmkv_init(int caller_thread_num) { struct swarmkv_options *swarmkv_opts = NULL; struct swarmkv *swarmkv_db = NULL; @@ -120,6 +120,8 @@ struct swarmkv* shaper_swarmkv_init() swarmkv_options_set_health_check_announce_port(swarmkv_opts, conf.swarmkv_health_check_announce_port); swarmkv_options_set_log_path(swarmkv_opts, "log"); swarmkv_options_set_log_level(swarmkv_opts, conf.swarmkv_log_level); + swarmkv_options_set_caller_thread_number(swarmkv_opts, caller_thread_num); + swarmkv_options_set_worker_thread_number(swarmkv_opts, 1); swarmkv_db = swarmkv_open(swarmkv_opts, conf.swarmkv_cluster_name, &err); if (err) { diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index a1a9b77..252d5f5 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -5,6 +5,7 @@ #include <cjson/cJSON.h> #include <sys/queue.h> +#include "log.h" #include "shaper.h" #include "shaper_maat.h" #include "shaper_stat.h" @@ -239,7 +240,7 @@ TEST(single_session, udp_tx_in_order) stub_init(); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -270,7 +271,6 @@ TEST(single_session, udp_tx_in_order) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx->global_stat); @@ -287,7 +287,7 @@ TEST(single_session, udp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -320,7 +320,7 @@ TEST(single_session, tcp_tx_in_order) stub_init(); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -340,8 +340,7 @@ TEST(single_session, tcp_tx_in_order) /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric - shaper_stat_refresh(ctx->thread_ctx[0].stat, sf, ctx->thread_ctx[0].thread_index, 1); + shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1); fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy sleep(2);//wait telegraf generate metric @@ -357,7 +356,6 @@ TEST(single_session, tcp_tx_in_order) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy @@ -375,7 +373,7 @@ TEST(single_session, tcp_tx_in_order) shaping_stat_judge(line, 0, 0, 1, 30, 3000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 30 + (STUB_TIME_INC_FOR_METRIC_SEND / 1000), SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 30000, SHAPING_DIR_OUT, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file @@ -401,7 +399,7 @@ TEST(single_session, udp_diff_direction) stub_init(); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -436,7 +434,6 @@ TEST(single_session, udp_diff_direction) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy @@ -451,9 +448,9 @@ TEST(single_session, udp_diff_direction) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_OUT, profile_type_primary); - shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20, SHAPING_DIR_IN, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_IN, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -481,12 +478,12 @@ TEST(single_session, udp_multi_rules) stub_init(); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(3, rule_id, priority, profile_num, profile_id); - stub_set_token_bucket_avl_per_sec(0, 1200, SHAPING_DIR_OUT); - stub_set_token_bucket_avl_per_sec(1, 1100, SHAPING_DIR_OUT); + stub_set_token_bucket_avl_per_sec(0, 3000, SHAPING_DIR_OUT); + stub_set_token_bucket_avl_per_sec(1, 2000, SHAPING_DIR_OUT); stub_set_token_bucket_avl_per_sec(2, 1000, SHAPING_DIR_OUT); actual_tx_queue = stub_get_tx_queue(); shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 3); @@ -516,7 +513,6 @@ TEST(single_session, udp_multi_rules) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaping_engine_destroy(ctx); @@ -529,13 +525,13 @@ TEST(single_session, udp_multi_rules) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0 - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 506, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 506000, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1 - shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1, SHAPING_DIR_OUT, profile_type_primary);//latency of every queued pkt is 1 + shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//latency of every queued pkt is 1 ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2 - shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 90, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt + shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file @@ -561,7 +557,7 @@ TEST(single_session, udp_borrow) stub_init(); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -591,7 +587,6 @@ TEST(single_session, udp_borrow) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaping_engine_destroy(ctx); @@ -604,7 +599,7 @@ TEST(single_session, udp_borrow) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 170, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); @@ -635,7 +630,7 @@ TEST(single_session, udp_borrow_same_priority_9) stub_init(); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -666,7 +661,6 @@ TEST(single_session, udp_borrow_same_priority_9) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaping_engine_destroy(ctx); @@ -679,7 +673,7 @@ TEST(single_session, udp_borrow_same_priority_9) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 170, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary); #if 0 //fieldstat don't output a row when all values is zero ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow @@ -721,9 +715,9 @@ TEST(two_session_diff_priority, udp_in_order) TAILQ_INIT(&expec_tx_queue2); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf1 = shaping_flow_new(); + sf1 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf1 != NULL); - sf2 = shaping_flow_new(); + sf2 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf2 != NULL); stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_ids); @@ -775,7 +769,6 @@ TEST(two_session_diff_priority, udp_in_order) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy @@ -789,10 +782,10 @@ TEST(two_session_diff_priority, udp_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0 - shaping_stat_judge(line, 0, 0, 2, 100, 10000, 0, 0, 280, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts + shaping_stat_judge(line, 0, 0, 2, 100, 10000, 0, 0, 280000, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1 - shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 90, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts + shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file @@ -834,9 +827,9 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf1 = shaping_flow_new(); + sf1 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf1 != NULL); - sf2 = shaping_flow_new(); + sf2 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf2 != NULL); stub_set_matched_shaping_rules(4, rule_ids, prioritys, profile_nums, profile_ids); @@ -895,7 +888,6 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy @@ -910,16 +902,16 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule) memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 1, 1, 1, 20, 2000, 0, 0, 48, SHAPING_DIR_OUT, profile_type_primary);//profile_id 1, max latency is last pkt + shaping_stat_judge(line, 1, 1, 1, 20, 2000, 0, 0, 48000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 1, max latency is last pkt ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 2, 2, 1, 20, 2000, 0, 0, 1, SHAPING_DIR_OUT, profile_type_primary);//profile_id 2, evevy queued pkt's latency is 1 + shaping_stat_judge(line, 2, 2, 1, 20, 2000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 2, evevy queued pkt's latency is 1 ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 4, 4, 1, 20, 2000, 0, 0, 1, SHAPING_DIR_OUT, profile_type_primary);//profile_id 4, max latency is first queued pkt + shaping_stat_judge(line, 4, 4, 1, 20, 2000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 4, max latency is first queued pkt ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 3, 3, 3, 20, 2000, 0, 0, 40, SHAPING_DIR_OUT, profile_type_primary);//profile_id 3, every queued pkt's latency is 40 + shaping_stat_judge(line, 3, 3, 3, 20, 2000, 0, 0, 40000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 3, every queued pkt's latency is 40 fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file @@ -947,7 +939,7 @@ TEST(single_session_async, udp_tx_in_order) stub_init(); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -965,15 +957,8 @@ TEST(single_session_async, udp_tx_in_order) send_packets(&ctx->thread_ctx[0], sf, 80, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); - //first 20 packets,async get token - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 20)); - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - - stub_refresh_token_bucket(0); - for (int i = 0; i < 10; i++) {//异步获取token多发送了10个报文,补回token,不应发送报文 - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); - } + //first 10 packets, got token + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets @@ -990,7 +975,6 @@ TEST(single_session_async, udp_tx_in_order) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaping_engine_destroy(ctx); @@ -1003,7 +987,7 @@ TEST(single_session_async, udp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 160, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts + shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -1029,7 +1013,7 @@ TEST(single_session_async, udp_close_before_async_exec) stub_init(); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -1041,35 +1025,21 @@ TEST(single_session_async, udp_close_before_async_exec) /*******send packets***********/ send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0); - + ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));//async callback haven't been called, no token, no packet be sent + sf->flag |= SESSION_CLOSE;// receive close ctrlbuf stub_set_async_token_get_times(0, 0);//refresh async count, async thread will be executed sleep(1);//ensure async thread exec complete - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); - - shaping_flow_free(&ctx->thread_ctx[0], sf); - fieldstat_global_disable_prometheus_endpoint(); - - /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy + for (int i = 0; i < 10; i++) { + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + } + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10)); + fieldstat_global_disable_prometheus_endpoint(); shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); - - /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_primary); - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); } /*session1 match rule1; session2 match rule2 @@ -1105,9 +1075,9 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf1 = shaping_flow_new(); + sf1 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf1 != NULL); - sf2 = shaping_flow_new(); + sf2 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf2 != NULL); @@ -1138,6 +1108,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } + shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 0, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct while (!TAILQ_EMPTY(&expec_tx_queue1)) {//last 90 delay packets stub_refresh_token_bucket(2); @@ -1154,7 +1125,6 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaping_engine_destroy(ctx); @@ -1167,13 +1137,13 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary - shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 470, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 470000, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, primary - shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 190, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 190000, SHAPING_DIR_OUT, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file @@ -1205,9 +1175,9 @@ TEST(two_session_same_rule, udp_tx_in_order) ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf1 = shaping_flow_new(); + sf1 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf1 != NULL); - sf2 = shaping_flow_new(); + sf2 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf2 != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -1241,7 +1211,6 @@ TEST(two_session_same_rule, udp_tx_in_order) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaping_engine_destroy(ctx); @@ -1254,7 +1223,7 @@ TEST(two_session_same_rule, udp_tx_in_order) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 1, 1, 1, 200, 20000, 0, 0, 370, SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 1, 1, 1, 200, 20000, 0, 0, 370000, SHAPING_DIR_OUT, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -1292,9 +1261,9 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf1 = shaping_flow_new(); + sf1 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf1 != NULL); - sf2 = shaping_flow_new(); + sf2 = shaping_flow_new(&ctx->thread_ctx[1]); ASSERT_TRUE(sf2 != NULL); stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_id); @@ -1305,21 +1274,12 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1); /*******send packets***********/ - for (int i = 0; i < 100; i++) { - send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0); - send_packets(&ctx->thread_ctx[1], sf2, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0); - - if (i < 5) { - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1)); - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1)); - } - } + send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0); + send_packets(&ctx->thread_ctx[1], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0); + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - stub_refresh_token_bucket(0); - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//优先级高的session发出一个报文,将优先级信息更新到swarmkv中 - + shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中 for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断 stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); @@ -1333,6 +1293,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1 } + shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中 while (!TAILQ_EMPTY(&expec_tx_queue2)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); @@ -1349,9 +1310,6 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) stub_clear_matched_shaping_rules(); } - - - /*session1 match rule1; session2 match rule2 rule1: priority:1 @@ -1362,7 +1320,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) profile_a: limit 1000 */ -TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) +TEST(two_session_diff_priority_same_profile, session_timer_test) { struct stub_pkt_queue expec_tx_queue1; struct stub_pkt_queue expec_tx_queue2; @@ -1376,9 +1334,6 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) int profile_nums[] = {1, 1}; int prioritys[] = {1, 2}; int profile_id[][MAX_REF_PROFILE] = {{0}, {0}}; - int stream1_pkt_num = 0; - int stream2_pkt_num = 0; - time_t curr_time; TAILQ_INIT(&expec_tx_queue1); @@ -1387,9 +1342,9 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf1 = shaping_flow_new(); + sf1 = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf1 != NULL); - sf2 = shaping_flow_new(); + sf2 = shaping_flow_new(&ctx->thread_ctx[1]); ASSERT_TRUE(sf2 != NULL); stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_id); @@ -1397,45 +1352,48 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT); actual_tx_queue = stub_get_tx_queue(); shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1); - shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1); + shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1); /*******send packets***********/ - {//为方便判断统计时的顺序,先给rule1发送一个报文,保证统计发送时rule1顺序在前 - send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0); - stream1_pkt_num++; - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1)); - } + send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0); + send_packets(&ctx->thread_ctx[1], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0); + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); + ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - time(&curr_time); - srand(curr_time); - for (int i = 0; i < 99; i++) { - if (rand() % 2 == 0) { - send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0); - stream1_pkt_num++; - if (i < 9) { - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1)); - } - } else { - send_packets(&ctx->thread_ctx[0], sf2, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0); - stream2_pkt_num++; - if (i < 9) { - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1)); - } - } + sleep(3);//wait session timer to expire, to refresh priority queue_len to swarmkv + for (int i = 0; i < 200; i++) { + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer } + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//timer triggered in polling + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); - ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); + for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断 + stub_refresh_token_bucket(0); + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); + + ASSERT_EQ(-1, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//优先级低,不能发出报文 + } while (!TAILQ_EMPTY(&expec_tx_queue1)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1 } + sleep(3);//wait session timer to expire, to refresh priority queue_len to swarmkv + for (int i = 0; i < 200; i++) { + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer + } + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//timer triggered in polling + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + while (!TAILQ_EMPTY(&expec_tx_queue2)) { stub_refresh_token_bucket(0); - polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET); ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2 } @@ -1443,32 +1401,10 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); shaping_flow_free(&ctx->thread_ctx[0], sf1); - shaping_flow_free(&ctx->thread_ctx[0], sf2); + shaping_flow_free(&ctx->thread_ctx[1], sf2); fieldstat_global_disable_prometheus_endpoint(); - - /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric - fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - - shaping_engine_destroy(ctx); stub_clear_matched_shaping_rules(); - - /*******test statistics***********/ - sleep(2);//wait telegraf to output - FILE *stat_file; - - stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); - memset(line, 0, sizeof(line)); - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 1, 0, 1, stream1_pkt_num, stream1_pkt_num*100, 0, 0, -1, SHAPING_DIR_OUT, profile_type_primary);//can't predict a certain latency cause of random - - ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 2, 0, 2, stream2_pkt_num, stream2_pkt_num*100, 0, 0, -1, SHAPING_DIR_OUT, profile_type_primary);//can't predict a certain latency cause of random - - fclose(stat_file); - stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file - fclose(stat_file); } /*session1 match rule1 @@ -1489,7 +1425,7 @@ TEST(statistics, udp_drop_pkt) stub_init(); ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -1516,7 +1452,6 @@ TEST(statistics, udp_drop_pkt) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx->global_stat); @@ -1531,7 +1466,7 @@ TEST(statistics, udp_drop_pkt) stat_file = fopen(SHAPING_STAT_FILE_NAME, "r"); memset(line, 0, sizeof(line)); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file)); - shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max + shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228000, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file fclose(stat_file); @@ -1564,7 +1499,7 @@ TEST(statistics, udp_queueing_pkt) ctx = shaping_engine_init(); ASSERT_TRUE(ctx != NULL); - sf = shaping_flow_new(); + sf = shaping_flow_new(&ctx->thread_ctx[0]); ASSERT_TRUE(sf != NULL); stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id); @@ -1577,8 +1512,7 @@ TEST(statistics, udp_queueing_pkt) /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric - shaper_stat_refresh(ctx->thread_ctx[0].stat, sf, ctx->thread_ctx[0].thread_index, 1); + shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1); fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx->global_stat); sleep(2);//wait telegraf generate metric @@ -1605,7 +1539,6 @@ TEST(statistics, udp_queueing_pkt) fieldstat_global_disable_prometheus_endpoint(); /***********send stat data here********************/ - stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx->global_stat); @@ -1623,7 +1556,7 @@ TEST(statistics, udp_queueing_pkt) shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary); ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data last sent - shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, 90 + (STUB_TIME_INC_FOR_METRIC_SEND / 1000), SHAPING_DIR_OUT, profile_type_primary); + shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary); fclose(stat_file); stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file @@ -1640,6 +1573,6 @@ TEST(statistics, udp_queueing_pkt) int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); - //testing::GTEST_FLAG(filter) = "single_session.udp_borrow_same_priority_9"; + //testing::GTEST_FLAG(filter) = "single_session_async.udp_close_before_async_exec"; return RUN_ALL_TESTS(); }
\ No newline at end of file diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp index 9406752..25d4550 100644 --- a/shaping/test/stub.cpp +++ b/shaping/test/stub.cpp @@ -15,6 +15,7 @@ #include <pthread.h> #include "stub.h" #include "shaper_maat.h" +#include "log.h" using namespace std; @@ -166,6 +167,8 @@ void stub_init() { int i; + LOG_INIT("./conf/zlog.conf"); + TAILQ_INIT(&tx_queue); memset(&matched_rules, 0, sizeof(struct stub_matched_rules)); memset(&pf_array, 0, MAX_STUB_PROFILE_NUM * sizeof(struct shaping_profile)); @@ -418,6 +421,21 @@ void swarmkv_close(struct swarmkv * db) } return; } + +void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv) +{ + return; +} + +int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads) +{ + return 0; +} + +int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, int nr_worker_threads) +{ + return 0; +} /**********************************************/ /*************stub of maat*********************/ diff --git a/shaping/test/stub.h b/shaping/test/stub.h index 7f09e02..80e099b 100644 --- a/shaping/test/stub.h +++ b/shaping/test/stub.h @@ -9,8 +9,7 @@ #define STUB_MAAT_SHAPING_RULE_TABLE_ID 0 #define STUB_MAAT_SHAPING_PROFILE_TABLE_ID 1 -#define STUB_TIME_INC_FOR_PACKET 1000 -#define STUB_TIME_INC_FOR_METRIC_SEND 1000000 +#define STUB_TIME_INC_FOR_PACKET 1000000 #define STUB_TEST_VSYS_ID 2333 diff --git a/shaping/test/test_conf/zlog.conf b/shaping/test/test_conf/zlog.conf index ad3644d..4d6c9d5 100644 --- a/shaping/test/test_conf/zlog.conf +++ b/shaping/test/test_conf/zlog.conf @@ -8,4 +8,4 @@ FATAL=30 [rules] log_shaping.DEBUG "./log/shaping.log.%d(%F)"; -log_shaping.DEBUG >stdout;
\ No newline at end of file +#log_shaping.DEBUG >stdout;
\ No newline at end of file |
