author    刘畅 <[email protected]>  2023-10-11 01:43:14 +0000
committer 刘畅 <[email protected]>  2023-10-11 01:43:14 +0000
commit    329a44aef63ed3247812481a112b8fac0facbde0 (patch)
tree      4cd6b3eb3e2caead4309df025d0d968be24431d4
parent    cd9e72e6e614691903e9befd0b1b7d9d4c3d8845 (diff)
parent    ba9ed468a08a21e92f2e1cf5d61db493f9e5b188 (diff)

Merge branch 'adapt_new_arc_swarmkv' into 'rel' (v1.3.0)

Optimize performance for swarmkv.

See merge request tango/shaping-engine!42
-rw-r--r--  .gitlab-ci.yml                      204
-rw-r--r--  common/include/utils.h                6
-rw-r--r--  deps/timeout/Makefile                68
-rw-r--r--  deps/timeout/test-timeout.c         530
-rw-r--r--  deps/timeout/timeout-bitops.c       249
-rw-r--r--  deps/timeout/timeout-debug.h         77
-rw-r--r--  deps/timeout/timeout.c              749
-rw-r--r--  deps/timeout/timeout.h              256
-rw-r--r--  shaping/CMakeLists.txt                3
-rw-r--r--  shaping/include/shaper.h             15
-rw-r--r--  shaping/include/shaper_stat.h         2
-rw-r--r--  shaping/include/shaper_swarmkv.h      3
-rw-r--r--  shaping/src/main.cpp                  2
-rw-r--r--  shaping/src/shaper.cpp              140
-rw-r--r--  shaping/src/shaper_maat.cpp           2
-rw-r--r--  shaping/src/shaper_session.cpp        2
-rw-r--r--  shaping/src/shaper_stat.cpp          27
-rw-r--r--  shaping/src/shaper_swarmkv.cpp        6
-rw-r--r--  shaping/test/gtest_shaper.cpp       253
-rw-r--r--  shaping/test/stub.cpp                18
-rw-r--r--  shaping/test/stub.h                   3
-rw-r--r--  shaping/test/test_conf/zlog.conf      2
22 files changed, 2275 insertions(+), 342 deletions(-)
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index c0cb8a2..aa12c63 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -13,108 +13,108 @@ stages:
# compile use image: build-env:master
###############################################################################
-.build_by_travis_for_centos7:
- image: $BUILD_IMAGE_CENTOS7
- before_script:
- - mkdir -p $BUILD_PADDING_PREFIX/$CI_PROJECT_NAMESPACE/
- - ln -s $CI_PROJECT_DIR $BUILD_PADDING_PREFIX/$CI_PROJECT_PATH
- - cd $BUILD_PADDING_PREFIX/$CI_PROJECT_PATH
- - chmod +x ./ci/travis.sh
- script:
- - yum makecache
- - ./ci/travis.sh
- tags:
- - share
-
-branch_build_debug_for_centos7:
- stage: build
- extends: .build_by_travis_for_centos7
- variables:
- BUILD_TYPE: Debug
- except:
- - /^develop-.*$/i
- - /^release-.*$/i
- - tags
-
-branch_build_release_for_centos7:
- stage: build
- variables:
- BUILD_TYPE: RelWithDebInfo
- extends: .build_by_travis_for_centos7
- except:
- - /^develop-.*$/i
- - /^release-.*$/i
- - tags
-
-develop_build_debug_for_centos7:
- stage: build
- extends: .build_by_travis_for_centos7
- variables:
- TESTING_VERSION_BUILD: 1
- UPLOAD_SYMBOL_FILES: 1
- BUILD_TYPE: Debug
-# ASAN_OPTION: ADDRESS
- PACKAGE: 1
- PULP3_REPO_NAME: tsg-testing-x86_64.el7
- PULP3_DIST_NAME: tsg-testing-x86_64.el7
- artifacts:
- name: "shaping_engine-develop-$CI_COMMIT_REF_NAME-debug"
- paths:
- - build/*.rpm
- only:
- - /^develop-.*$/i
- - /^release-.*$/i
-
-develop_build_release_for_centos7:
- stage: build
- extends: .build_by_travis_for_centos7
- variables:
- TESTING_VERSION_BUILD: 1
- UPLOAD_SYMBOL_FILES: 1
-# ASAN_OPTION: ADDRESS
- BUILD_TYPE: RelWithDebInfo
- PACKAGE: 1
- PULP3_REPO_NAME: tsg-testing-x86_64.el7
- PULP3_DIST_NAME: tsg-testing-x86_64.el7
- artifacts:
- name: "shaping_engine-develop-$CI_COMMIT_REF_NAME-release"
- paths:
- - build/*.rpm
- only:
- - /^develop-.*$/i
- - /^release-.*$/i
-
-release_build_debug_for_centos7:
- stage: build
- variables:
- UPLOAD_SYMBOL_FILES: 1
- BUILD_TYPE: Debug
- PACKAGE: 1
- PULP3_REPO_NAME: tsg-stable-x86_64.el7
- PULP3_DIST_NAME: tsg-stable-x86_64.el7
- extends: .build_by_travis_for_centos7
- artifacts:
- name: "shaping_engine-install-$CI_COMMIT_REF_NAME-debug"
- paths:
- - build/*.rpm
- only:
- - tags
-
-release_build_release_for_centos7:
- stage: build
- variables:
- BUILD_TYPE: RelWithDebInfo
- UPLOAD_SYMBOL_FILES: 1
- PACKAGE: 1
- PULP3_REPO_NAME: tsg-stable-x86_64.el7
- PULP3_DIST_NAME: tsg-stable-x86_64.el7
- extends: .build_by_travis_for_centos7
- artifacts:
- name: "shaping_engine-install-$CI_COMMIT_REF_NAME-release"
- paths:
- - build/*.rpm
- only:
- - tags
+# .build_by_travis_for_centos7:
+# image: $BUILD_IMAGE_CENTOS7
+# before_script:
+# - mkdir -p $BUILD_PADDING_PREFIX/$CI_PROJECT_NAMESPACE/
+# - ln -s $CI_PROJECT_DIR $BUILD_PADDING_PREFIX/$CI_PROJECT_PATH
+# - cd $BUILD_PADDING_PREFIX/$CI_PROJECT_PATH
+# - chmod +x ./ci/travis.sh
+# script:
+# - yum makecache
+# - ./ci/travis.sh
+# tags:
+# - share
+
+# branch_build_debug_for_centos7:
+# stage: build
+# extends: .build_by_travis_for_centos7
+# variables:
+# BUILD_TYPE: Debug
+# except:
+# - /^develop-.*$/i
+# - /^release-.*$/i
+# - tags
+
+# branch_build_release_for_centos7:
+# stage: build
+# variables:
+# BUILD_TYPE: RelWithDebInfo
+# extends: .build_by_travis_for_centos7
+# except:
+# - /^develop-.*$/i
+# - /^release-.*$/i
+# - tags
+
+# develop_build_debug_for_centos7:
+# stage: build
+# extends: .build_by_travis_for_centos7
+# variables:
+# TESTING_VERSION_BUILD: 1
+# UPLOAD_SYMBOL_FILES: 1
+# BUILD_TYPE: Debug
+# # ASAN_OPTION: ADDRESS
+# PACKAGE: 1
+# PULP3_REPO_NAME: tsg-testing-x86_64.el7
+# PULP3_DIST_NAME: tsg-testing-x86_64.el7
+# artifacts:
+# name: "shaping_engine-develop-$CI_COMMIT_REF_NAME-debug"
+# paths:
+# - build/*.rpm
+# only:
+# - /^develop-.*$/i
+# - /^release-.*$/i
+
+# develop_build_release_for_centos7:
+# stage: build
+# extends: .build_by_travis_for_centos7
+# variables:
+# TESTING_VERSION_BUILD: 1
+# UPLOAD_SYMBOL_FILES: 1
+# # ASAN_OPTION: ADDRESS
+# BUILD_TYPE: RelWithDebInfo
+# PACKAGE: 1
+# PULP3_REPO_NAME: tsg-testing-x86_64.el7
+# PULP3_DIST_NAME: tsg-testing-x86_64.el7
+# artifacts:
+# name: "shaping_engine-develop-$CI_COMMIT_REF_NAME-release"
+# paths:
+# - build/*.rpm
+# only:
+# - /^develop-.*$/i
+# - /^release-.*$/i
+
+# release_build_debug_for_centos7:
+# stage: build
+# variables:
+# UPLOAD_SYMBOL_FILES: 1
+# BUILD_TYPE: Debug
+# PACKAGE: 1
+# PULP3_REPO_NAME: tsg-stable-x86_64.el7
+# PULP3_DIST_NAME: tsg-stable-x86_64.el7
+# extends: .build_by_travis_for_centos7
+# artifacts:
+# name: "shaping_engine-install-$CI_COMMIT_REF_NAME-debug"
+# paths:
+# - build/*.rpm
+# only:
+# - tags
+
+# release_build_release_for_centos7:
+# stage: build
+# variables:
+# BUILD_TYPE: RelWithDebInfo
+# UPLOAD_SYMBOL_FILES: 1
+# PACKAGE: 1
+# PULP3_REPO_NAME: tsg-stable-x86_64.el7
+# PULP3_DIST_NAME: tsg-stable-x86_64.el7
+# extends: .build_by_travis_for_centos7
+# artifacts:
+# name: "shaping_engine-install-$CI_COMMIT_REF_NAME-release"
+# paths:
+# - build/*.rpm
+# only:
+# - tags
###############################################################################
# compile use image: build-env:rockylinux
diff --git a/common/include/utils.h b/common/include/utils.h
index 0f257a5..c03b291 100644
--- a/common/include/utils.h
+++ b/common/include/utils.h
@@ -8,6 +8,12 @@ extern "C"
#define MIN(a, b) ((a) > (b) ? (b) : (a))
+#ifndef container_of
+#define container_of(ptr, type, member) ({ \
+ const typeof( ((type *)0)->member ) *__mptr = (ptr); \
+ (type *)( (char *)__mptr - offsetof(type,member) );})
+#endif
+
#define LOG_TAG_SHAPING "SHAPING"
#define LOG_TAG_SWARMKV "SWARMKV"
#define LOG_TAG_STAT "STAT"
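
The hunk above adds a container_of() macro to common/include/utils.h. As a
minimal illustration of how such a macro is used -- recovering a pointer to an
enclosing structure from a pointer to one of its embedded members -- consider
the sketch below. The structure and function names (struct flow_node,
node_from_entry) are hypothetical, not taken from this repository.

    #include <stddef.h>   /* offsetof */
    #include "utils.h"    /* container_of, added by this change */

    struct list_entry { struct list_entry *next; };

    struct flow_node {
        int id;
        struct list_entry entry;   /* link embedded inside the larger object */
    };

    /* Given a pointer to the embedded entry, recover the enclosing flow_node. */
    static struct flow_node *node_from_entry(struct list_entry *e)
    {
        return container_of(e, struct flow_node, entry);
    }
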
diff --git a/deps/timeout/Makefile b/deps/timeout/Makefile
new file mode 100644
index 0000000..554ebb9
--- /dev/null
+++ b/deps/timeout/Makefile
@@ -0,0 +1,68 @@
+# NOTE: GNU Make 3.81 won't export MAKEFLAGS if .POSIX is specified, but
+# Solaris make won't export MAKEFLAGS unless .POSIX is specified.
+$(firstword ignore).POSIX:
+
+.DEFAULT_GOAL = all
+
+.SUFFIXES:
+
+all:
+
+#
+# USER-MODIFIABLE MACROS
+#
+top_srcdir = .
+top_builddir = .
+
+CFLAGS = -O2 -march=native -g -Wall -Wextra -Wno-unused-parameter -Wno-unused-function
+SOFLAGS = $$(auto_soflags)
+LIBS = $$(auto_libs)
+
+ALL_CPPFLAGS = -I$(top_srcdir) -DWHEEL_BIT=$(WHEEL_BIT) -DWHEEL_NUM=$(WHEEL_NUM) $(CPPFLAGS)
+ALL_CFLAGS = $(CFLAGS)
+ALL_SOFLAGS = $(SOFLAGS)
+ALL_LDFLAGS = $(LDFLAGS)
+ALL_LIBS = $(LIBS)
+
+LUA_API = 5.3
+LUA = lua
+LUA51_CPPFLAGS = $(LUA_CPPFLAGS)
+LUA52_CPPFLAGS = $(LUA_CPPFLAGS)
+LUA53_CPPFLAGS = $(LUA_CPPFLAGS)
+
+WHEEL_BIT = 6
+WHEEL_NUM = 4
+
+RM = rm -f
+
+# END MACROS
+
+SHRC = \
+ top_srcdir="$(top_srcdir)"; \
+ top_builddir="$(top_builddir)"; \
+ . "$${top_srcdir}/Rules.shrc"
+
+LUA_APIS = 5.1 5.2 5.3
+
+include $(top_srcdir)/lua/Rules.mk
+include $(top_srcdir)/bench/Rules.mk
+
+all: test-timeout
+
+timeout.o: $(top_srcdir)/timeout.c
+test-timeout.o: $(top_srcdir)/test-timeout.c
+
+timeout.o test-timeout.o:
+ @$(SHRC); echo_cmd $(CC) $(ALL_CFLAGS) -c -o $@ $${top_srcdir}/$(@F:%.o=%.c) $(ALL_CPPFLAGS)
+
+test-timeout: timeout.o test-timeout.o
+ @$(SHRC); echo_cmd $(CC) $(ALL_CPPFLAGS) $(ALL_CFLAGS) -o $@ timeout.o test-timeout.o
+
+.PHONY: clean clean~
+
+clean:
+ $(RM) $(top_builddir)/test-timeout $(top_builddir)/*.o
+ $(RM) -r $(top_builddir)/*.dSYM
+
+clean~:
+ find $(top_builddir) $(top_srcdir) -name "*~" -exec $(RM) -- {} "+"
diff --git a/deps/timeout/test-timeout.c b/deps/timeout/test-timeout.c
new file mode 100644
index 0000000..ab9f72d
--- /dev/null
+++ b/deps/timeout/test-timeout.c
@@ -0,0 +1,530 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <limits.h>
+
+#include "timeout.h"
+
+#define THE_END_OF_TIME ((timeout_t)-1)
+
+static int check_misc(void) {
+ if (TIMEOUT_VERSION != timeout_version())
+ return 1;
+ if (TIMEOUT_V_REL != timeout_v_rel())
+ return 1;
+ if (TIMEOUT_V_API != timeout_v_api())
+ return 1;
+ if (TIMEOUT_V_ABI != timeout_v_abi())
+ return 1;
+ if (strcmp(timeout_vendor(), TIMEOUT_VENDOR))
+ return 1;
+ return 0;
+}
+
+static int check_open_close(timeout_t hz_set, timeout_t hz_expect) {
+ int err=0;
+ struct timeouts *tos = timeouts_open(hz_set, &err);
+ if (!tos)
+ return 1;
+ if (err)
+ return 1;
+ if (hz_expect != timeouts_hz(tos))
+ return 1;
+ timeouts_close(tos);
+ return 0;
+}
+
+/* Not very random */
+static timeout_t random_to(timeout_t min, timeout_t max)
+{
+ if (max <= min)
+ return min;
+ /* Not actually all that random, but should exercise the code. */
+ timeout_t rand64 = random() * (timeout_t)INT_MAX + random();
+ return min + (rand64 % (max-min));
+}
+
+/* configuration for check_randomized */
+struct rand_cfg {
+ /* When creating timeouts, smallest possible delay */
+ timeout_t min_timeout;
+ /* When creating timeouts, largest possible delay */
+ timeout_t max_timeout;
+ /* First time to start the clock at. */
+ timeout_t start_at;
+ /* Do not advance the clock past this time. */
+ timeout_t end_at;
+ /* Number of timeouts to create and monitor. */
+ int n_timeouts;
+ /* Advance the clock by no more than this each step. */
+ timeout_t max_step;
+ /* Use relative timers and stepping */
+ int relative;
+ /* Every time the clock ticks, try removing this many timeouts at
+ * random. */
+ int try_removing;
+ /* When we're done, advance the clock to the end of time. */
+ int finalize;
+};
+
+static int check_randomized(const struct rand_cfg *cfg)
+{
+#define FAIL() do { \
+ printf("Failure on line %d\n", __LINE__); \
+ goto done; \
+ } while (0)
+
+ int i, err;
+ int rv = 1;
+ struct timeout *t = calloc(cfg->n_timeouts, sizeof(struct timeout));
+ timeout_t *timeouts = calloc(cfg->n_timeouts, sizeof(timeout_t));
+ uint8_t *fired = calloc(cfg->n_timeouts, sizeof(uint8_t));
+ uint8_t *found = calloc(cfg->n_timeouts, sizeof(uint8_t));
+ uint8_t *deleted = calloc(cfg->n_timeouts, sizeof(uint8_t));
+ struct timeouts *tos = timeouts_open(0, &err);
+ timeout_t now = cfg->start_at;
+ int n_added_pending = 0, cnt_added_pending = 0;
+ int n_added_expired = 0, cnt_added_expired = 0;
+ struct timeouts_it it_p, it_e, it_all;
+ int p_done = 0, e_done = 0, all_done = 0;
+ struct timeout *to = NULL;
+ const int rel = cfg->relative;
+
+ if (!t || !timeouts || !tos || !fired || !found || !deleted)
+ FAIL();
+ timeouts_update(tos, cfg->start_at);
+
+ for (i = 0; i < cfg->n_timeouts; ++i) {
+ if (&t[i] != timeout_init(&t[i], rel ? 0 : TIMEOUT_ABS))
+ FAIL();
+ if (timeout_pending(&t[i]))
+ FAIL();
+ if (timeout_expired(&t[i]))
+ FAIL();
+
+ timeouts[i] = random_to(cfg->min_timeout, cfg->max_timeout);
+
+ timeouts_add(tos, &t[i], timeouts[i] - (rel ? now : 0));
+ if (timeouts[i] <= cfg->start_at) {
+ if (timeout_pending(&t[i]))
+ FAIL();
+ if (! timeout_expired(&t[i]))
+ FAIL();
+ ++n_added_expired;
+ } else {
+ if (! timeout_pending(&t[i]))
+ FAIL();
+ if (timeout_expired(&t[i]))
+ FAIL();
+ ++n_added_pending;
+ }
+ }
+
+ if (!!n_added_pending != timeouts_pending(tos))
+ FAIL();
+ if (!!n_added_expired != timeouts_expired(tos))
+ FAIL();
+
+ /* Test foreach, interleaving a few iterators. */
+ TIMEOUTS_IT_INIT(&it_p, TIMEOUTS_PENDING);
+ TIMEOUTS_IT_INIT(&it_e, TIMEOUTS_EXPIRED);
+ TIMEOUTS_IT_INIT(&it_all, TIMEOUTS_ALL);
+ while (! (p_done && e_done && all_done)) {
+ if (!p_done) {
+ to = timeouts_next(tos, &it_p);
+ if (to) {
+ i = to - &t[0];
+ ++found[i];
+ ++cnt_added_pending;
+ } else {
+ p_done = 1;
+ }
+ }
+ if (!e_done) {
+ to = timeouts_next(tos, &it_e);
+ if (to) {
+ i = to - &t[0];
+ ++found[i];
+ ++cnt_added_expired;
+ } else {
+ e_done = 1;
+ }
+ }
+ if (!all_done) {
+ to = timeouts_next(tos, &it_all);
+ if (to) {
+ i = to - &t[0];
+ ++found[i];
+ } else {
+ all_done = 1;
+ }
+ }
+ }
+
+ for (i = 0; i < cfg->n_timeouts; ++i) {
+ if (found[i] != 2)
+ FAIL();
+ }
+ if (cnt_added_expired != n_added_expired)
+ FAIL();
+ if (cnt_added_pending != n_added_pending)
+ FAIL();
+
+ while (NULL != (to = timeouts_get(tos))) {
+ i = to - &t[0];
+ assert(&t[i] == to);
+ if (timeouts[i] > cfg->start_at)
+ FAIL(); /* shouldn't have happened yet */
+
+ --n_added_expired; /* drop expired timeouts. */
+ ++fired[i];
+ }
+
+ if (n_added_expired != 0)
+ FAIL();
+
+ while (now < cfg->end_at) {
+ int n_fired_this_time = 0;
+ timeout_t first_at = timeouts_timeout(tos) + now;
+
+ timeout_t oldtime = now;
+ timeout_t step = random_to(1, cfg->max_step);
+ int another;
+ now += step;
+ if (rel)
+ timeouts_step(tos, step);
+ else
+ timeouts_update(tos, now);
+
+ for (i = 0; i < cfg->try_removing; ++i) {
+ int idx = random() % cfg->n_timeouts;
+ if (! fired[idx]) {
+ timeout_del(&t[idx]);
+ ++deleted[idx];
+ }
+ }
+
+ another = (timeouts_timeout(tos) == 0);
+
+ while (NULL != (to = timeouts_get(tos))) {
+ if (! another)
+ FAIL(); /* Thought we saw the last one! */
+ i = to - &t[0];
+ assert(&t[i] == to);
+ if (timeouts[i] > now)
+ FAIL(); /* shouldn't have happened yet */
+ if (timeouts[i] <= oldtime)
+ FAIL(); /* should have happened already */
+ if (timeouts[i] < first_at)
+ FAIL(); /* first_at should've been earlier */
+ fired[i]++;
+ n_fired_this_time++;
+ another = (timeouts_timeout(tos) == 0);
+ }
+ if (n_fired_this_time && first_at > now)
+ FAIL(); /* first_at should've been earlier */
+ if (another)
+ FAIL(); /* Huh? We think there are more? */
+ if (!timeouts_check(tos, stderr))
+ FAIL();
+ }
+
+ for (i = 0; i < cfg->n_timeouts; ++i) {
+ if (fired[i] > 1)
+ FAIL(); /* Nothing fired twice. */
+ if (timeouts[i] <= now) {
+ if (!(fired[i] || deleted[i]))
+ FAIL();
+ } else {
+ if (fired[i])
+ FAIL();
+ }
+ if (fired[i] && deleted[i])
+ FAIL();
+ if (cfg->finalize > 1) {
+ if (!fired[i])
+ timeout_del(&t[i]);
+ }
+ }
+
+ /* Now nothing more should fire between now and the end of time. */
+ if (cfg->finalize) {
+ timeouts_update(tos, THE_END_OF_TIME);
+ if (cfg->finalize > 1) {
+ if (timeouts_get(tos))
+ FAIL();
+ TIMEOUTS_FOREACH(to, tos, TIMEOUTS_ALL)
+ FAIL();
+ }
+ }
+ rv = 0;
+
+ done:
+ if (tos) timeouts_close(tos);
+ if (t) free(t);
+ if (timeouts) free(timeouts);
+ if (fired) free(fired);
+ if (found) free(found);
+ if (deleted) free(deleted);
+ return rv;
+}
+
+struct intervals_cfg {
+ const timeout_t *timeouts;
+ int n_timeouts;
+ timeout_t start_at;
+ timeout_t end_at;
+ timeout_t skip;
+};
+
+int
+check_intervals(struct intervals_cfg *cfg)
+{
+ int i, err;
+ int rv = 1;
+ struct timeout *to;
+ struct timeout *t = calloc(cfg->n_timeouts, sizeof(struct timeout));
+ unsigned *fired = calloc(cfg->n_timeouts, sizeof(unsigned));
+ struct timeouts *tos = timeouts_open(0, &err);
+
+ timeout_t now = cfg->start_at;
+ if (!t || !tos || !fired)
+ FAIL();
+
+ timeouts_update(tos, now);
+
+ for (i = 0; i < cfg->n_timeouts; ++i) {
+ if (&t[i] != timeout_init(&t[i], TIMEOUT_INT))
+ FAIL();
+ if (timeout_pending(&t[i]))
+ FAIL();
+ if (timeout_expired(&t[i]))
+ FAIL();
+
+ timeouts_add(tos, &t[i], cfg->timeouts[i]);
+ if (! timeout_pending(&t[i]))
+ FAIL();
+ if (timeout_expired(&t[i]))
+ FAIL();
+ }
+
+ while (now < cfg->end_at) {
+ timeout_t delay = timeouts_timeout(tos);
+ if (cfg->skip && delay < cfg->skip)
+ delay = cfg->skip;
+ timeouts_step(tos, delay);
+ now += delay;
+
+ while (NULL != (to = timeouts_get(tos))) {
+ i = to - &t[0];
+ assert(&t[i] == to);
+ fired[i]++;
+ if (0 != (to->expires - cfg->start_at) % cfg->timeouts[i])
+ FAIL();
+ if (to->expires <= now)
+ FAIL();
+ if (to->expires > now + cfg->timeouts[i])
+ FAIL();
+ }
+ if (!timeouts_check(tos, stderr))
+ FAIL();
+ }
+
+ timeout_t duration = now - cfg->start_at;
+ for (i = 0; i < cfg->n_timeouts; ++i) {
+ if (cfg->skip) {
+ if (fired[i] > duration / cfg->timeouts[i])
+ FAIL();
+ } else {
+ if (fired[i] != duration / cfg->timeouts[i])
+ FAIL();
+ }
+ if (!timeout_pending(&t[i]))
+ FAIL();
+ }
+
+ rv = 0;
+ done:
+ if (t) free(t);
+ if (fired) free(fired);
+ if (tos) free(tos);
+ return rv;
+}
+
+int
+main(int argc, char **argv)
+{
+ int j;
+ int n_failed = 0;
+#define DO(fn) do { \
+ printf("."); fflush(stdout); \
+ if (fn) { \
+ ++n_failed; \
+ printf("%s failed\n", #fn); \
+ } \
+ } while (0)
+
+#define DO_N(n, fn) do { \
+ for (j = 0; j < (n); ++j) { \
+ DO(fn); \
+ } \
+ } while (0)
+
+ DO(check_misc());
+ DO(check_open_close(1000, 1000));
+ DO(check_open_close(0, TIMEOUT_mHZ));
+
+ struct rand_cfg cfg1 = {
+ .min_timeout = 1,
+ .max_timeout = 100,
+ .start_at = 5,
+ .end_at = 1000,
+ .n_timeouts = 1000,
+ .max_step = 10,
+ .relative = 0,
+ .try_removing = 0,
+ .finalize = 2,
+ };
+ DO_N(300,check_randomized(&cfg1));
+
+ struct rand_cfg cfg2 = {
+ .min_timeout = 20,
+ .max_timeout = 1000,
+ .start_at = 10,
+ .end_at = 100,
+ .n_timeouts = 1000,
+ .max_step = 5,
+ .relative = 1,
+ .try_removing = 0,
+ .finalize = 2,
+ };
+ DO_N(300,check_randomized(&cfg2));
+
+ struct rand_cfg cfg2b = {
+ .min_timeout = 20,
+ .max_timeout = 1000,
+ .start_at = 10,
+ .end_at = 100,
+ .n_timeouts = 1000,
+ .max_step = 5,
+ .relative = 1,
+ .try_removing = 0,
+ .finalize = 1,
+ };
+ DO_N(300,check_randomized(&cfg2b));
+
+ struct rand_cfg cfg2c = {
+ .min_timeout = 20,
+ .max_timeout = 1000,
+ .start_at = 10,
+ .end_at = 100,
+ .n_timeouts = 1000,
+ .max_step = 5,
+ .relative = 1,
+ .try_removing = 0,
+ .finalize = 0,
+ };
+ DO_N(300,check_randomized(&cfg2c));
+
+ struct rand_cfg cfg3 = {
+ .min_timeout = 2000,
+ .max_timeout = ((uint64_t)1) << 50,
+ .start_at = 100,
+ .end_at = ((uint64_t)1) << 49,
+ .n_timeouts = 1000,
+ .max_step = ((uint64_t)1) << 31,
+ .relative = 0,
+ .try_removing = 0,
+ .finalize = 2,
+ };
+ DO_N(10,check_randomized(&cfg3));
+
+ struct rand_cfg cfg3b = {
+ .min_timeout = ((uint64_t)1) << 50,
+ .max_timeout = ((uint64_t)1) << 52,
+ .start_at = 100,
+ .end_at = ((uint64_t)1) << 53,
+ .n_timeouts = 1000,
+ .max_step = ((uint64_t)1)<<48,
+ .relative = 0,
+ .try_removing = 0,
+ .finalize = 2,
+ };
+ DO_N(10,check_randomized(&cfg3b));
+
+ struct rand_cfg cfg4 = {
+ .min_timeout = 2000,
+ .max_timeout = ((uint64_t)1) << 30,
+ .start_at = 100,
+ .end_at = ((uint64_t)1) << 26,
+ .n_timeouts = 10000,
+ .max_step = 1<<16,
+ .relative = 0,
+ .try_removing = 3,
+ .finalize = 2,
+ };
+ DO_N(10,check_randomized(&cfg4));
+
+ const timeout_t primes[] = {
+ 2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,
+ 59,61,67,71,73,79,83,89,97
+ };
+ const timeout_t factors_of_1337[] = {
+ 1, 7, 191, 1337
+ };
+ const timeout_t multiples_of_five[] = {
+ 5, 10, 15, 20, 25, 30, 35, 40, 45, 50
+ };
+
+ struct intervals_cfg icfg1 = {
+ .timeouts = primes,
+ .n_timeouts = sizeof(primes)/sizeof(timeout_t),
+ .start_at = 50,
+ .end_at = 5322,
+ .skip = 0,
+ };
+ DO(check_intervals(&icfg1));
+
+ struct intervals_cfg icfg2 = {
+ .timeouts = factors_of_1337,
+ .n_timeouts = sizeof(factors_of_1337)/sizeof(timeout_t),
+ .start_at = 50,
+ .end_at = 50000,
+ .skip = 0,
+ };
+ DO(check_intervals(&icfg2));
+
+ struct intervals_cfg icfg3 = {
+ .timeouts = multiples_of_five,
+ .n_timeouts = sizeof(multiples_of_five)/sizeof(timeout_t),
+ .start_at = 49,
+ .end_at = 5333,
+ .skip = 0,
+ };
+ DO(check_intervals(&icfg3));
+
+ struct intervals_cfg icfg4 = {
+ .timeouts = primes,
+ .n_timeouts = sizeof(primes)/sizeof(timeout_t),
+ .start_at = 50,
+ .end_at = 5322,
+ .skip = 16,
+ };
+ DO(check_intervals(&icfg4));
+
+ if (n_failed) {
+ puts("\nFAIL");
+ } else {
+ puts("\nOK");
+ }
+ return !!n_failed;
+}
+
+/* TODO:
+
+ * Solve PR#3.
+
+ * Investigate whether any untaken branches are possible.
+
+ */
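
The test above exercises the wheel exhaustively with randomized configurations.
For orientation, here is a much smaller, deterministic sketch of the same API
(timeouts_open, timeout_init, timeouts_add, timeouts_update, timeouts_get);
the tick values are arbitrary and error handling is reduced to asserts.

    #include <assert.h>
    #include <stddef.h>
    #include "timeout.h"

    int main(void)
    {
        int err = 0;
        struct timeouts *T = timeouts_open(TIMEOUT_mHZ, &err);  /* 1 tick = 1 ms */
        assert(T != NULL);

        struct timeout to;
        timeout_init(&to, 0);              /* relative, one-shot */

        timeouts_update(T, 100);           /* clock now at tick 100 */
        timeouts_add(T, &to, 50);          /* expires at tick 150 */
        assert(timeout_pending(&to));

        timeouts_update(T, 149);
        assert(timeouts_get(T) == NULL);   /* not expired yet */

        timeouts_update(T, 150);
        assert(timeouts_get(T) == &to);    /* fires exactly once */
        assert(timeouts_get(T) == NULL);

        timeouts_close(T);
        return 0;
    }
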
diff --git a/deps/timeout/timeout-bitops.c b/deps/timeout/timeout-bitops.c
new file mode 100644
index 0000000..d8325db
--- /dev/null
+++ b/deps/timeout/timeout-bitops.c
@@ -0,0 +1,249 @@
+#include <stdint.h>
+#ifdef _MSC_VER
+#include <intrin.h> /* _BitScanForward, _BitScanReverse */
+#endif
+
+/* First define ctz and clz functions; these are compiler-dependent if
+ * you want them to be fast. */
+#if defined(__GNUC__) && !defined(TIMEOUT_DISABLE_GNUC_BITOPS)
+
+/* On GCC and clang and some others, we can use __builtin functions. They
+ * are not defined for n==0, but timeout.s never calls them with n==0. */
+
+#define ctz64(n) __builtin_ctzll(n)
+#define clz64(n) __builtin_clzll(n)
+#if LONG_BITS == 32
+#define ctz32(n) __builtin_ctzl(n)
+#define clz32(n) __builtin_clzl(n)
+#else
+#define ctz32(n) __builtin_ctz(n)
+#define clz32(n) __builtin_clz(n)
+#endif
+
+#elif defined(_MSC_VER) && !defined(TIMEOUT_DISABLE_MSVC_BITOPS)
+
+/* On MSVC, we have these handy functions. We can ignore their return
+ * values, since we will never supply val == 0. */
+
+static __inline int ctz32(unsigned long val)
+{
+ DWORD zeros = 0;
+ _BitScanForward(&zeros, val);
+ return zeros;
+}
+static __inline int clz32(unsigned long val)
+{
+ DWORD zeros = 0;
+ _BitScanReverse(&zeros, val);
+ return zeros;
+}
+#ifdef _WIN64
+/* According to the documentation, these only exist on Win64. */
+static __inline int ctz64(uint64_t val)
+{
+ DWORD zeros = 0;
+ _BitScanForward64(&zeros, val);
+ return zeros;
+}
+static __inline int clz64(uint64_t val)
+{
+ DWORD zeros = 0;
+ _BitScanReverse64(&zeros, val);
+ return zeros;
+}
+#else
+static __inline int ctz64(uint64_t val)
+{
+ uint32_t lo = (uint32_t) val;
+ uint32_t hi = (uint32_t) (val >> 32);
+ return lo ? ctz32(lo) : 32 + ctz32(hi);
+}
+static __inline int clz64(uint64_t val)
+{
+ uint32_t lo = (uint32_t) val;
+ uint32_t hi = (uint32_t) (val >> 32);
+ return hi ? clz32(hi) : 32 + clz32(lo);
+}
+#endif
+
+/* End of MSVC case. */
+
+#else
+
+/* TODO: There are more clever ways to do this in the generic case. */
+
+
+#define process_(one, cz_bits, bits) \
+ if (x < ( one << (cz_bits - bits))) { rv += bits; x <<= bits; }
+
+#define process64(bits) process_((UINT64_C(1)), 64, (bits))
+static inline int clz64(uint64_t x)
+{
+ int rv = 0;
+
+ process64(32);
+ process64(16);
+ process64(8);
+ process64(4);
+ process64(2);
+ process64(1);
+ return rv;
+}
+#define process32(bits) process_((UINT32_C(1)), 32, (bits))
+static inline int clz32(uint32_t x)
+{
+ int rv = 0;
+
+ process32(16);
+ process32(8);
+ process32(4);
+ process32(2);
+ process32(1);
+ return rv;
+}
+
+#undef process_
+#undef process32
+#undef process64
+#define process_(one, bits) \
+ if ((x & ((one << (bits))-1)) == 0) { rv += bits; x >>= bits; }
+
+#define process64(bits) process_((UINT64_C(1)), bits)
+static inline int ctz64(uint64_t x)
+{
+ int rv = 0;
+
+ process64(32);
+ process64(16);
+ process64(8);
+ process64(4);
+ process64(2);
+ process64(1);
+ return rv;
+}
+
+#define process32(bits) process_((UINT32_C(1)), bits)
+static inline int ctz32(uint32_t x)
+{
+ int rv = 0;
+
+ process32(16);
+ process32(8);
+ process32(4);
+ process32(2);
+ process32(1);
+ return rv;
+}
+
+#undef process32
+#undef process64
+#undef process_
+
+/* End of generic case */
+
+#endif /* End of defining ctz */
+
+#ifdef TEST_BITOPS
+#include <stdio.h>
+#include <stdlib.h>
+
+static uint64_t testcases[] = {
+ 13371337 * 10,
+ 100,
+ 385789752,
+ 82574,
+ (((uint64_t)1)<<63) + (((uint64_t)1)<<31) + 10101
+};
+
+static int
+naive_clz(int bits, uint64_t v)
+{
+ int r = 0;
+ uint64_t bit = ((uint64_t)1) << (bits-1);
+ while (bit && 0 == (v & bit)) {
+ r++;
+ bit >>= 1;
+ }
+ /* printf("clz(%d,%lx) -> %d\n", bits, v, r); */
+ return r;
+}
+
+static int
+naive_ctz(int bits, uint64_t v)
+{
+ int r = 0;
+ uint64_t bit = 1;
+ while (bit && 0 == (v & bit)) {
+ r++;
+ bit <<= 1;
+ if (r == bits)
+ break;
+ }
+ /* printf("ctz(%d,%lx) -> %d\n", bits, v, r); */
+ return r;
+}
+
+static int
+check(uint64_t vv)
+{
+ uint32_t v32 = (uint32_t) vv;
+
+ if (vv == 0)
+ return 1; /* c[tl]z64(0) is undefined. */
+
+ if (ctz64(vv) != naive_ctz(64, vv)) {
+ printf("mismatch with ctz64: %d\n", ctz64(vv));
+ exit(1);
+ return 0;
+ }
+ if (clz64(vv) != naive_clz(64, vv)) {
+ printf("mismatch with clz64: %d\n", clz64(vv));
+ exit(1);
+ return 0;
+ }
+
+ if (v32 == 0)
+ return 1; /* c[lt]z(0) is undefined. */
+
+ if (ctz32(v32) != naive_ctz(32, v32)) {
+ printf("mismatch with ctz32: %d\n", ctz32(v32));
+ exit(1);
+ return 0;
+ }
+ if (clz32(v32) != naive_clz(32, v32)) {
+ printf("mismatch with clz32: %d\n", clz32(v32));
+ exit(1);
+ return 0;
+ }
+ return 1;
+}
+
+int
+main(int c, char **v)
+{
+ unsigned int i;
+ const unsigned int n = sizeof(testcases)/sizeof(testcases[0]);
+ int result = 0;
+
+ for (i = 0; i <= 63; ++i) {
+ uint64_t x = 1 << i;
+ if (!check(x))
+ result = 1;
+ --x;
+ if (!check(x))
+ result = 1;
+ }
+
+ for (i = 0; i < n; ++i) {
+ if (! check(testcases[i]))
+ result = 1;
+ }
+ if (result) {
+ puts("FAIL");
+ } else {
+ puts("OK");
+ }
+ return result;
+}
+#endif
+
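
The helpers above provide count-trailing-zeros (ctz) and count-leading-zeros
(clz) for 32- and 64-bit values; timeout.c never calls them with a zero input.
A quick sketch of the contract, expressed with the GCC/Clang builtins the fast
path wraps (values chosen for illustration):

    #include <assert.h>
    #include <stdint.h>

    int main(void)
    {
        uint64_t v = UINT64_C(0xF0);        /* bits 4..7 set */
        assert(__builtin_ctzll(v) == 4);    /* lowest set bit is bit 4 */
        assert(__builtin_clzll(v) == 56);   /* 56 zero bits above bit 7 */
        return 0;
    }
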
diff --git a/deps/timeout/timeout-debug.h b/deps/timeout/timeout-debug.h
new file mode 100644
index 0000000..fc727a6
--- /dev/null
+++ b/deps/timeout/timeout-debug.h
@@ -0,0 +1,77 @@
+/*
+ * D E B U G R O U T I N E S
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#if TIMEOUT_DEBUG - 0
+#include <stdlib.h>
+#include <stdio.h>
+
+#undef TIMEOUT_DEBUG
+#define TIMEOUT_DEBUG 1
+#define DEBUG_LEVEL timeout_debug
+
+static int timeout_debug;
+
+#define SAYit_(lvl, fmt, ...) do { \
+ if (DEBUG_LEVEL >= (lvl)) \
+ fprintf(stderr, fmt "%s", __FILE__, __LINE__, __func__, __VA_ARGS__); \
+} while (0)
+
+#define SAYit(lvl, ...) SAYit_((lvl), "%s:%d:%s: " __VA_ARGS__, "\n")
+
+#define PANIC(...) do { \
+ SAYit(0, __VA_ARGS__); \
+ _Exit(EXIT_FAILURE); \
+} while (0)
+#else
+#undef TIMEOUT_DEBUG
+#define TIMEOUT_DEBUG 0
+#define DEBUG_LEVEL 0
+
+#define SAYit(...) (void)0
+#endif
+
+#define SAY(...) SAYit(1, __VA_ARGS__)
+#define HAI SAY("HAI")
+
+
+static inline char *fmt_(char *buf, uint64_t ts, int wheel_bit, int wheel_num) {
+ char *p = buf;
+ int wheel, n, i;
+
+ for (wheel = wheel_num - 2; wheel >= 0; wheel--) {
+ n = ((1 << wheel_bit) - 1) & (ts >> (wheel * WHEEL_BIT));
+
+ for (i = wheel_bit - 1; i >= 0; i--) {
+ *p++ = '0' + !!(n & (1 << i));
+ }
+
+ if (wheel != 0)
+ *p++ = ':';
+ }
+
+ *p = 0;
+
+ return buf;
+} /* fmt_() */
+
+#define fmt(ts) fmt_(((char[((1 << WHEEL_BIT) * WHEEL_NUM) + WHEEL_NUM + 1]){ 0 }), (ts), WHEEL_BIT, WHEEL_NUM)
+
+
+static inline char *bin64_(char *buf, uint64_t n, int wheel_bit) {
+ char *p = buf;
+ int i;
+
+ for (i = 0; i < (1 << wheel_bit); i++) {
+ *p++ = "01"[0x1 & (n >> (((1 << wheel_bit) - 1) - i))];
+ }
+
+ *p = 0;
+
+ return buf;
+} /* bin64_() */
+
+#define bin64(ts) bin64_(((char[((1 << WHEEL_BIT) * WHEEL_NUM) + 1]){ 0 }), (ts), WHEEL_BIT)
+
+
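
The formatters above render a timestamp wheel by wheel, WHEEL_BIT bits per
group. A hypothetical stand-alone sketch of that decomposition (it prints the
per-wheel slot values in decimal rather than binary, and is not part of the
library), using the default WHEEL_BIT=6, WHEEL_NUM=4:

    #include <stdint.h>
    #include <stdio.h>

    #define WHEEL_BIT 6
    #define WHEEL_NUM 4

    int main(void)
    {
        uint64_t ts = 150;   /* arbitrary tick value */
        for (int wheel = WHEEL_NUM - 1; wheel >= 0; wheel--) {
            unsigned slot = ((1u << WHEEL_BIT) - 1) & (unsigned)(ts >> (wheel * WHEEL_BIT));
            printf("wheel %d -> slot %u\n", wheel, slot);   /* 0, 0, 2, 22 */
        }
        return 0;
    }
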
diff --git a/deps/timeout/timeout.c b/deps/timeout/timeout.c
new file mode 100644
index 0000000..aac21a0
--- /dev/null
+++ b/deps/timeout/timeout.c
@@ -0,0 +1,749 @@
+/* ==========================================================================
+ * timeout.c - Tickless hierarchical timing wheel.
+ * --------------------------------------------------------------------------
+ * Copyright (c) 2013, 2014 William Ahern
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to permit
+ * persons to whom the Software is furnished to do so, subject to the
+ * following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included
+ * in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+ * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+ * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+ * USE OR OTHER DEALINGS IN THE SOFTWARE.
+ * ==========================================================================
+ */
+#include <limits.h> /* CHAR_BIT */
+
+#include <stddef.h> /* NULL */
+#include <stdlib.h> /* malloc(3) free(3) */
+#include <stdio.h> /* FILE fprintf(3) */
+
+#include <inttypes.h> /* UINT64_C uint64_t */
+
+#include <string.h> /* memset(3) */
+
+#include <errno.h> /* errno */
+
+#include <sys/queue.h> /* TAILQ(3) */
+
+#include "timeout.h"
+
+#if TIMEOUT_DEBUG - 0
+#include "timeout-debug.h"
+#endif
+
+#ifdef TIMEOUT_DISABLE_RELATIVE_ACCESS
+#define TO_SET_TIMEOUTS(to, T) ((void)0)
+#else
+#define TO_SET_TIMEOUTS(to, T) ((to)->timeouts = (T))
+#endif
+
+/*
+ * A N C I L L A R Y R O U T I N E S
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#define abstime_t timeout_t /* for documentation purposes */
+#define reltime_t timeout_t /* "" */
+
+#if !defined countof
+#define countof(a) (sizeof (a) / sizeof *(a))
+#endif
+
+#if !defined endof
+#define endof(a) (&(a)[countof(a)])
+#endif
+
+#if !defined MIN
+#define MIN(a, b) (((a) < (b))? (a) : (b))
+#endif
+
+#if !defined MAX
+#define MAX(a, b) (((a) > (b))? (a) : (b))
+#endif
+
+#if !defined TAILQ_CONCAT
+#define TAILQ_CONCAT(head1, head2, field) do { \
+ if (!TAILQ_EMPTY(head2)) { \
+ *(head1)->tqh_last = (head2)->tqh_first; \
+ (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \
+ (head1)->tqh_last = (head2)->tqh_last; \
+ TAILQ_INIT((head2)); \
+ } \
+} while (0)
+#endif
+
+#if !defined TAILQ_FOREACH_SAFE
+#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \
+ for ((var) = TAILQ_FIRST(head); \
+ (var) && ((tvar) = TAILQ_NEXT(var, field), 1); \
+ (var) = (tvar))
+#endif
+
+
+/*
+ * B I T M A N I P U L A T I O N R O U T I N E S
+ *
+ * The macros and routines below implement wheel parameterization. The
+ * inputs are:
+ *
+ * WHEEL_BIT - The number of value bits mapped in each wheel. The
+ * lowest-order WHEEL_BIT bits index the lowest-order (highest
+ * resolution) wheel, the next group of WHEEL_BIT bits the
+ * higher wheel, etc.
+ *
+ * WHEEL_NUM - The number of wheels. WHEEL_BIT * WHEEL_NUM = the number of
+ * value bits used by all the wheels. For the default of 6 and
+ * 4, only the low 24 bits are processed. Any timeout value
+ * larger than this will cycle through again.
+ *
+ * The implementation uses bit fields to remember which slot in each wheel
+ * is populated, and to generate masks of expiring slots according to the
+ * current update interval (i.e. the "tickless" aspect). The slots to
+ * process in a wheel are (populated-set & interval-mask).
+ *
+ * WHEEL_BIT cannot be larger than 6 bits because 2^6 -> 64 is the largest
+ * number of slots which can be tracked in a uint64_t integer bit field.
+ * WHEEL_BIT cannot be smaller than 3 bits because of our rotr and rotl
+ * routines, which only operate on all the value bits in an integer, and
+ * there's no integer smaller than uint8_t.
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#if !defined WHEEL_BIT
+#define WHEEL_BIT 6
+#endif
+
+#if !defined WHEEL_NUM
+#define WHEEL_NUM 4
+#endif
+
+#define WHEEL_LEN (1U << WHEEL_BIT)
+#define WHEEL_MAX (WHEEL_LEN - 1)
+#define WHEEL_MASK (WHEEL_LEN - 1)
+#define TIMEOUT_MAX ((TIMEOUT_C(1) << (WHEEL_BIT * WHEEL_NUM)) - 1)
+
+#include "timeout-bitops.c"
+
+#if WHEEL_BIT == 6
+#define ctz(n) ctz64(n)
+#define clz(n) clz64(n)
+#define fls(n) ((int)(64 - clz64(n)))
+#else
+#define ctz(n) ctz32(n)
+#define clz(n) clz32(n)
+#define fls(n) ((int)(32 - clz32(n)))
+#endif
+
+#if WHEEL_BIT == 6
+#define WHEEL_C(n) UINT64_C(n)
+#define WHEEL_PRIu PRIu64
+#define WHEEL_PRIx PRIx64
+
+typedef uint64_t wheel_t;
+
+#elif WHEEL_BIT == 5
+
+#define WHEEL_C(n) UINT32_C(n)
+#define WHEEL_PRIu PRIu32
+#define WHEEL_PRIx PRIx32
+
+typedef uint32_t wheel_t;
+
+#elif WHEEL_BIT == 4
+
+#define WHEEL_C(n) UINT16_C(n)
+#define WHEEL_PRIu PRIu16
+#define WHEEL_PRIx PRIx16
+
+typedef uint16_t wheel_t;
+
+#elif WHEEL_BIT == 3
+
+#define WHEEL_C(n) UINT8_C(n)
+#define WHEEL_PRIu PRIu8
+#define WHEEL_PRIx PRIx8
+
+typedef uint8_t wheel_t;
+
+#else
+#error invalid WHEEL_BIT value
+#endif
+
+
+static inline wheel_t rotl(const wheel_t v, int c) {
+ if (!(c &= (sizeof v * CHAR_BIT - 1)))
+ return v;
+
+ return (v << c) | (v >> (sizeof v * CHAR_BIT - c));
+} /* rotl() */
+
+
+static inline wheel_t rotr(const wheel_t v, int c) {
+ if (!(c &= (sizeof v * CHAR_BIT - 1)))
+ return v;
+
+ return (v >> c) | (v << (sizeof v * CHAR_BIT - c));
+} /* rotr() */
+
+
+/*
+ * T I M E R R O U T I N E S
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+TAILQ_HEAD(timeout_list, timeout);
+
+struct timeouts {
+ struct timeout_list wheel[WHEEL_NUM][WHEEL_LEN], expired;
+
+ wheel_t pending[WHEEL_NUM];
+
+ timeout_t curtime;
+ timeout_t hertz;
+ long long count;
+}; /* struct timeouts */
+
+
+static struct timeouts *timeouts_init(struct timeouts *T, timeout_t hz) {
+ unsigned i, j;
+
+ for (i = 0; i < countof(T->wheel); i++) {
+ for (j = 0; j < countof(T->wheel[i]); j++) {
+ TAILQ_INIT(&T->wheel[i][j]);
+ }
+ }
+
+ TAILQ_INIT(&T->expired);
+
+ for (i = 0; i < countof(T->pending); i++) {
+ T->pending[i] = 0;
+ }
+
+ T->curtime = 0;
+ T->hertz = (hz)? hz : TIMEOUT_mHZ;
+ T->count =0;
+ return T;
+} /* timeouts_init() */
+long long timeouts_count(struct timeouts *T)
+{
+ return T->count;
+}
+TIMEOUT_PUBLIC struct timeouts *timeouts_open(timeout_t hz, int *error) {
+ struct timeouts *T;
+
+ if ((T = malloc(sizeof *T)))
+ return timeouts_init(T, hz);
+
+ *error = errno;
+
+ return NULL;
+} /* timeouts_open() */
+
+
+static void timeouts_reset(struct timeouts *T) {
+ struct timeout_list reset;
+ struct timeout *to;
+ unsigned i, j;
+
+ TAILQ_INIT(&reset);
+
+ for (i = 0; i < countof(T->wheel); i++) {
+ for (j = 0; j < countof(T->wheel[i]); j++) {
+ TAILQ_CONCAT(&reset, &T->wheel[i][j], tqe);
+ }
+ }
+
+ TAILQ_CONCAT(&reset, &T->expired, tqe);
+
+ TAILQ_FOREACH(to, &reset, tqe) {
+ to->pending = NULL;
+ TO_SET_TIMEOUTS(to, NULL);
+ }
+ T->count=0;
+} /* timeouts_reset() */
+
+
+TIMEOUT_PUBLIC void timeouts_close(struct timeouts *T) {
+ /*
+ * NOTE: Delete installed timeouts so timeout_pending() and
+ * timeout_expired() worked as expected.
+ */
+ timeouts_reset(T);
+
+ free(T);
+} /* timeouts_close() */
+
+
+TIMEOUT_PUBLIC timeout_t timeouts_hz(struct timeouts *T) {
+ return T->hertz;
+} /* timeouts_hz() */
+
+
+TIMEOUT_PUBLIC void timeouts_del(struct timeouts *T, struct timeout *to) {
+ if (to->pending) {
+ TAILQ_REMOVE(to->pending, to, tqe);
+
+ if (to->pending != &T->expired && TAILQ_EMPTY(to->pending)) {
+ ptrdiff_t index = to->pending - &T->wheel[0][0];
+ int wheel = index / WHEEL_LEN;
+ int slot = index % WHEEL_LEN;
+
+ T->pending[wheel] &= ~(WHEEL_C(1) << slot);
+ }
+
+ to->pending = NULL;
+ TO_SET_TIMEOUTS(to, NULL);
+ T->count--;
+ }
+} /* timeouts_del() */
+
+
+static inline reltime_t timeout_rem(struct timeouts *T, struct timeout *to) {
+ return to->expires - T->curtime;
+} /* timeout_rem() */
+
+
+static inline int timeout_wheel(timeout_t timeout) {
+ /* must be called with timeout != 0, so fls input is nonzero */
+ return (fls(MIN(timeout, TIMEOUT_MAX)) - 1) / WHEEL_BIT;
+} /* timeout_wheel() */
+
+
+static inline int timeout_slot(int wheel, timeout_t expires) {
+ return WHEEL_MASK & ((expires >> (wheel * WHEEL_BIT)) - !!wheel);
+} /* timeout_slot() */
+
+
+static void timeouts_sched(struct timeouts *T, struct timeout *to, timeout_t expires) {
+ timeout_t rem;
+ int wheel, slot;
+
+ timeouts_del(T, to);
+
+ to->expires = expires;
+
+ TO_SET_TIMEOUTS(to, T);
+
+ if (expires > T->curtime) {
+ rem = timeout_rem(T, to);
+
+ /* rem is nonzero since:
+ * rem == timeout_rem(T,to),
+ * == to->expires - T->curtime
+ * and above we have expires > T->curtime.
+ */
+ wheel = timeout_wheel(rem);
+ slot = timeout_slot(wheel, to->expires);
+
+ to->pending = &T->wheel[wheel][slot];
+ TAILQ_INSERT_TAIL(to->pending, to, tqe);
+
+ T->pending[wheel] |= WHEEL_C(1) << slot;
+ } else {
+ to->pending = &T->expired;
+ TAILQ_INSERT_TAIL(to->pending, to, tqe);
+ }
+} /* timeouts_sched() */
+
+
+#ifndef TIMEOUT_DISABLE_INTERVALS
+static void timeouts_readd(struct timeouts *T, struct timeout *to) {
+ to->expires += to->interval;
+
+ if (to->expires <= T->curtime) {
+ /* If we've missed the next firing of this timeout, reschedule
+ * it to occur at the next multiple of its interval after
+ * the last time that it fired.
+ */
+ timeout_t n = T->curtime - to->expires;
+ timeout_t r = n % to->interval;
+ to->expires = T->curtime + (to->interval - r);
+ }
+
+ timeouts_sched(T, to, to->expires);
+} /* timeouts_readd() */
+#endif
+
+
+TIMEOUT_PUBLIC void timeouts_add(struct timeouts *T, struct timeout *to, timeout_t timeout) {
+#ifndef TIMEOUT_DISABLE_INTERVALS
+ if (to->flags & TIMEOUT_INT)
+ to->interval = MAX(1, timeout);
+#endif
+ T->count++;
+ if (to->flags & TIMEOUT_ABS)
+ timeouts_sched(T, to, timeout);
+ else
+ timeouts_sched(T, to, T->curtime + timeout);
+} /* timeouts_add() */
+
+
+TIMEOUT_PUBLIC void timeouts_update(struct timeouts *T, abstime_t curtime) {
+ timeout_t elapsed = curtime - T->curtime;
+ struct timeout_list todo;
+ int wheel;
+
+ TAILQ_INIT(&todo);
+
+ /*
+ * There's no avoiding looping over every wheel. It's best to keep
+ * WHEEL_NUM smallish.
+ */
+ for (wheel = 0; wheel < WHEEL_NUM; wheel++) {
+ wheel_t pending;
+
+ /*
+ * Calculate the slots expiring in this wheel
+ *
+ * If the elapsed time is greater than the maximum period of
+ * the wheel, mark every position as expiring.
+ *
+ * Otherwise, to determine the expired slots fill in all the
+ * bits between the last slot processed and the current
+ * slot, inclusive of the last slot. We'll bitwise-AND this
+ * with our pending set below.
+ *
+ * If a wheel rolls over, force a tick of the next higher
+ * wheel.
+ */
+ if ((elapsed >> (wheel * WHEEL_BIT)) > WHEEL_MAX) {
+ pending = (wheel_t)~WHEEL_C(0);
+ } else {
+ wheel_t _elapsed = WHEEL_MASK & (elapsed >> (wheel * WHEEL_BIT));
+ int oslot, nslot;
+
+ /*
+ * TODO: It's likely that at least one of the
+ * following three bit fill operations is redundant
+ * or can be replaced with a simpler operation.
+ */
+ oslot = WHEEL_MASK & (T->curtime >> (wheel * WHEEL_BIT));
+ pending = rotl(((UINT64_C(1) << _elapsed) - 1), oslot);
+
+ nslot = WHEEL_MASK & (curtime >> (wheel * WHEEL_BIT));
+ pending |= rotr(rotl(((WHEEL_C(1) << _elapsed) - 1), nslot), _elapsed);
+ pending |= WHEEL_C(1) << nslot;
+ }
+
+ while (pending & T->pending[wheel]) {
+ /* ctz input cannot be zero: loop condition. */
+ int slot = ctz(pending & T->pending[wheel]);
+ TAILQ_CONCAT(&todo, &T->wheel[wheel][slot], tqe);
+ T->pending[wheel] &= ~(UINT64_C(1) << slot);
+ }
+
+ if (!(0x1 & pending))
+ break; /* break if we didn't wrap around end of wheel */
+
+ /* if we're continuing, the next wheel must tick at least once */
+ elapsed = MAX(elapsed, (WHEEL_LEN << (wheel * WHEEL_BIT)));
+ }
+
+ T->curtime = curtime;
+
+ while (!TAILQ_EMPTY(&todo)) {
+ struct timeout *to = TAILQ_FIRST(&todo);
+
+ TAILQ_REMOVE(&todo, to, tqe);
+ to->pending = NULL;
+
+ timeouts_sched(T, to, to->expires);
+ }
+
+ return;
+} /* timeouts_update() */
+
+
+TIMEOUT_PUBLIC void timeouts_step(struct timeouts *T, reltime_t elapsed) {
+ timeouts_update(T, T->curtime + elapsed);
+} /* timeouts_step() */
+
+
+TIMEOUT_PUBLIC bool timeouts_pending(struct timeouts *T) {
+ wheel_t pending = 0;
+ int wheel;
+
+ for (wheel = 0; wheel < WHEEL_NUM; wheel++) {
+ pending |= T->pending[wheel];
+ }
+
+ return !!pending;
+} /* timeouts_pending() */
+
+
+TIMEOUT_PUBLIC bool timeouts_expired(struct timeouts *T) {
+ return !TAILQ_EMPTY(&T->expired);
+} /* timeouts_expired() */
+
+
+/*
+ * Calculate the interval before needing to process any timeouts pending on
+ * any wheel.
+ *
+ * (This is separated from the public API routine so we can evaluate our
+ * wheel invariant assertions irrespective of the expired queue.)
+ *
+ * This might return a timeout value sooner than any installed timeout if
+ * only higher-order wheels have timeouts pending. We can only know when to
+ * process a wheel, not precisely when a timeout is scheduled. Our timeout
+ * accuracy could be off by 2^(N*M)-1 units where N is the wheel number and
+ * M is WHEEL_BIT. Only timeouts which have fallen through to wheel 0 can be
+ * known exactly.
+ *
+ * We should never return a timeout larger than the lowest actual timeout.
+ */
+static timeout_t timeouts_int(struct timeouts *T) {
+ timeout_t timeout = ~TIMEOUT_C(0), _timeout;
+ timeout_t relmask;
+ int wheel, slot;
+
+ relmask = 0;
+
+ for (wheel = 0; wheel < WHEEL_NUM; wheel++) {
+ if (T->pending[wheel]) {
+ slot = WHEEL_MASK & (T->curtime >> (wheel * WHEEL_BIT));
+
+ /* ctz input cannot be zero: T->pending[wheel] is
+ * nonzero, so rotr() is nonzero. */
+ _timeout = (ctz(rotr(T->pending[wheel], slot)) + !!wheel) << (wheel * WHEEL_BIT);
+ /* +1 to higher order wheels as those timeouts are one rotation in the future (otherwise they'd be on a lower wheel or expired) */
+
+ _timeout -= relmask & T->curtime;
+ /* reduce by how much lower wheels have progressed */
+
+ timeout = MIN(_timeout, timeout);
+ }
+
+ relmask <<= WHEEL_BIT;
+ relmask |= WHEEL_MASK;
+ }
+
+ return timeout;
+} /* timeouts_int() */
+
+
+/*
+ * Calculate the interval our caller can wait before needing to process
+ * events.
+ */
+TIMEOUT_PUBLIC timeout_t timeouts_timeout(struct timeouts *T) {
+ if (!TAILQ_EMPTY(&T->expired))
+ return 0;
+
+ return timeouts_int(T);
+} /* timeouts_timeout() */
+
+
+TIMEOUT_PUBLIC struct timeout *timeouts_get(struct timeouts *T) {
+ if (!TAILQ_EMPTY(&T->expired)) {
+ struct timeout *to = TAILQ_FIRST(&T->expired);
+
+ TAILQ_REMOVE(&T->expired, to, tqe);
+ to->pending = NULL;
+ TO_SET_TIMEOUTS(to, NULL);
+
+#ifndef TIMEOUT_DISABLE_INTERVALS
+ if ((to->flags & TIMEOUT_INT) && to->interval > 0)
+ timeouts_readd(T, to);
+#endif
+ T->count--;
+ return to;
+ } else {
+ return 0;
+ }
+} /* timeouts_get() */
+
+
+/*
+ * Use dumb looping to locate the earliest timeout pending on the wheel so
+ * our invariant assertions can check the result of our optimized code.
+ */
+static struct timeout *timeouts_min(struct timeouts *T) {
+ struct timeout *to, *min = NULL;
+ unsigned i, j;
+
+ for (i = 0; i < countof(T->wheel); i++) {
+ for (j = 0; j < countof(T->wheel[i]); j++) {
+ TAILQ_FOREACH(to, &T->wheel[i][j], tqe) {
+ if (!min || to->expires < min->expires)
+ min = to;
+ }
+ }
+ }
+
+ return min;
+} /* timeouts_min() */
+
+
+/*
+ * Check some basic algorithm invariants. If these invariants fail then
+ * something is definitely broken.
+ */
+#define report(...) do { \
+ if ((fp)) \
+ fprintf(fp, __VA_ARGS__); \
+} while (0)
+
+#define check(expr, ...) do { \
+ if (!(expr)) { \
+ report(__VA_ARGS__); \
+ return 0; \
+ } \
+} while (0)
+
+TIMEOUT_PUBLIC bool timeouts_check(struct timeouts *T, FILE *fp) {
+ timeout_t timeout;
+ struct timeout *to;
+
+ if ((to = timeouts_min(T))) {
+ check(to->expires > T->curtime, "missed timeout (expires:%" TIMEOUT_PRIu " <= curtime:%" TIMEOUT_PRIu ")\n", to->expires, T->curtime);
+
+ timeout = timeouts_int(T);
+ check(timeout <= to->expires - T->curtime, "wrong soft timeout (soft:%" TIMEOUT_PRIu " > hard:%" TIMEOUT_PRIu ") (expires:%" TIMEOUT_PRIu "; curtime:%" TIMEOUT_PRIu ")\n", timeout, (to->expires - T->curtime), to->expires, T->curtime);
+
+ timeout = timeouts_timeout(T);
+ check(timeout <= to->expires - T->curtime, "wrong soft timeout (soft:%" TIMEOUT_PRIu " > hard:%" TIMEOUT_PRIu ") (expires:%" TIMEOUT_PRIu "; curtime:%" TIMEOUT_PRIu ")\n", timeout, (to->expires - T->curtime), to->expires, T->curtime);
+ } else {
+ timeout = timeouts_timeout(T);
+
+ if (!TAILQ_EMPTY(&T->expired))
+ check(timeout == 0, "wrong soft timeout (soft:%" TIMEOUT_PRIu " != hard:%" TIMEOUT_PRIu ")\n", timeout, TIMEOUT_C(0));
+ else
+ check(timeout == ~TIMEOUT_C(0), "wrong soft timeout (soft:%" TIMEOUT_PRIu " != hard:%" TIMEOUT_PRIu ")\n", timeout, ~TIMEOUT_C(0));
+ }
+
+ return 1;
+} /* timeouts_check() */
+
+
+#define ENTER \
+ do { \
+ static const int pc0 = __LINE__; \
+ switch (pc0 + it->pc) { \
+ case __LINE__: (void)0
+
+#define SAVE_AND_DO(do_statement) \
+ do { \
+ it->pc = __LINE__ - pc0; \
+ do_statement; \
+ case __LINE__: (void)0; \
+ } while (0)
+
+#define YIELD(rv) \
+ SAVE_AND_DO(return (rv))
+
+#define LEAVE \
+ SAVE_AND_DO(break); \
+ } \
+ } while (0)
+
+TIMEOUT_PUBLIC struct timeout *timeouts_next(struct timeouts *T, struct timeouts_it *it) {
+ struct timeout *to;
+
+ ENTER;
+
+ if (it->flags & TIMEOUTS_EXPIRED) {
+ if (it->flags & TIMEOUTS_CLEAR) {
+ while ((to = timeouts_get(T))) {
+ YIELD(to);
+ }
+ } else {
+ TAILQ_FOREACH_SAFE(to, &T->expired, tqe, it->to) {
+ YIELD(to);
+ }
+ }
+ }
+
+ if (it->flags & TIMEOUTS_PENDING) {
+ for (it->i = 0; it->i < countof(T->wheel); it->i++) {
+ for (it->j = 0; it->j < countof(T->wheel[it->i]); it->j++) {
+ TAILQ_FOREACH_SAFE(to, &T->wheel[it->i][it->j], tqe, it->to) {
+ YIELD(to);
+ }
+ }
+ }
+ }
+
+ LEAVE;
+
+ return NULL;
+} /* timeouts_next */
+
+#undef LEAVE
+#undef YIELD
+#undef SAVE_AND_DO
+#undef ENTER
+
+
+/*
+ * T I M E O U T R O U T I N E S
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+TIMEOUT_PUBLIC struct timeout *timeout_init(struct timeout *to, int flags) {
+ memset(to, 0, sizeof *to);
+
+ to->flags = flags;
+
+ return to;
+} /* timeout_init() */
+
+
+#ifndef TIMEOUT_DISABLE_RELATIVE_ACCESS
+TIMEOUT_PUBLIC bool timeout_pending(struct timeout *to) {
+ return to->pending && to->pending != &to->timeouts->expired;
+} /* timeout_pending() */
+
+
+TIMEOUT_PUBLIC bool timeout_expired(struct timeout *to) {
+ return to->pending && to->pending == &to->timeouts->expired;
+} /* timeout_expired() */
+
+
+TIMEOUT_PUBLIC void timeout_del(struct timeout *to) {
+ timeouts_del(to->timeouts, to);
+} /* timeout_del() */
+#endif
+
+
+/*
+ * V E R S I O N I N T E R F A C E S
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+TIMEOUT_PUBLIC int timeout_version(void) {
+ return TIMEOUT_VERSION;
+} /* timeout_version() */
+
+
+TIMEOUT_PUBLIC const char *timeout_vendor(void) {
+ return TIMEOUT_VENDOR;
+} /* timeout_version() */
+
+
+TIMEOUT_PUBLIC int timeout_v_rel(void) {
+ return TIMEOUT_V_REL;
+} /* timeout_version() */
+
+
+TIMEOUT_PUBLIC int timeout_v_abi(void) {
+ return TIMEOUT_V_ABI;
+} /* timeout_version() */
+
+
+TIMEOUT_PUBLIC int timeout_v_api(void) {
+ return TIMEOUT_V_API;
+} /* timeout_version() */
+
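
The library stores a callback per timeout (timeout_setcb) but never invokes it;
the embedding application drives the wheel and dispatches expirations itself.
A sketch of that typical polling pattern -- now_ticks() and the callback
signature are placeholder assumptions, not part of this repository:

    #include <stddef.h>
    #include "timeout.h"

    extern timeout_t now_ticks(void);   /* hypothetical monotonic clock source */

    static void poll_wheel(struct timeouts *T)
    {
        timeout_t wait = timeouts_timeout(T);   /* 0 if something already expired */
        (void)wait;                             /* e.g. bound an epoll/poll sleep */

        timeouts_update(T, now_ticks());        /* advance to current absolute time */

        struct timeout *to;
        while ((to = timeouts_get(T)) != NULL) {
            if (to->callback.fn)                /* set earlier via timeout_setcb() */
                ((void (*)(void *))to->callback.fn)(to->callback.arg);
        }
    }
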
diff --git a/deps/timeout/timeout.h b/deps/timeout/timeout.h
new file mode 100644
index 0000000..b597ab5
--- /dev/null
+++ b/deps/timeout/timeout.h
@@ -0,0 +1,256 @@
+/* ==========================================================================
+ * timeout.h - Tickless hierarchical timing wheel.
+ * --------------------------------------------------------------------------
+ * Copyright (c) 2013, 2014 William Ahern
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to permit
+ * persons to whom the Software is furnished to do so, subject to the
+ * following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included
+ * in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+ * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+ * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+ * USE OR OTHER DEALINGS IN THE SOFTWARE.
+ * ==========================================================================
+ */
+#ifndef TIMEOUT_H
+#define TIMEOUT_H
+
+#include <stdbool.h> /* bool */
+#include <stdio.h> /* FILE */
+
+#include <inttypes.h> /* PRIu64 PRIx64 PRIX64 uint64_t */
+
+#include <sys/queue.h> /* TAILQ(3) */
+
+
+/*
+ * V E R S I O N I N T E R F A C E S
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#if !defined TIMEOUT_PUBLIC
+#define TIMEOUT_PUBLIC
+#endif
+
+#define TIMEOUT_VERSION TIMEOUT_V_REL
+#define TIMEOUT_VENDOR "[email protected]"
+
+#define TIMEOUT_V_REL 0x20160226
+#define TIMEOUT_V_ABI 0x20160224
+#define TIMEOUT_V_API 0x20160226
+
+TIMEOUT_PUBLIC int timeout_version(void);
+
+TIMEOUT_PUBLIC const char *timeout_vendor(void);
+
+TIMEOUT_PUBLIC int timeout_v_rel(void);
+
+TIMEOUT_PUBLIC int timeout_v_abi(void);
+
+TIMEOUT_PUBLIC int timeout_v_api(void);
+
+
+/*
+ * I N T E G E R T Y P E I N T E R F A C E S
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#define TIMEOUT_C(n) UINT64_C(n)
+#define TIMEOUT_PRIu PRIu64
+#define TIMEOUT_PRIx PRIx64
+#define TIMEOUT_PRIX PRIX64
+
+#define TIMEOUT_mHZ TIMEOUT_C(1000)
+#define TIMEOUT_uHZ TIMEOUT_C(1000000)
+#define TIMEOUT_nHZ TIMEOUT_C(1000000000)
+
+typedef uint64_t timeout_t;
+
+#define timeout_error_t int /* for documentation purposes */
+
+
+/*
+ * C A L L B A C K I N T E R F A C E
+ *
+ * Callback function parameters unspecified to make embedding into existing
+ * applications easier.
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#ifndef TIMEOUT_CB_OVERRIDE
+struct timeout_cb {
+ void (*fn)();
+ void *arg;
+}; /* struct timeout_cb */
+#endif
+
+/*
+ * T I M E O U T I N T E R F A C E S
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#ifndef TIMEOUT_DISABLE_INTERVALS
+#define TIMEOUT_INT 0x01 /* interval (repeating) timeout */
+#endif
+#define TIMEOUT_ABS 0x02 /* treat timeout values as absolute */
+
+#define TIMEOUT_INITIALIZER(flags) { (flags) }
+
+#define timeout_setcb(to, fn, arg) do { \
+ (to)->callback.fn = (fn); \
+ (to)->callback.arg = (arg); \
+} while (0)
+
+struct timeout {
+ int flags;
+
+ timeout_t expires;
+ /* absolute expiration time */
+
+ struct timeout_list *pending;
+ /* timeout list if pending on wheel or expiry queue */
+
+ TAILQ_ENTRY(timeout) tqe;
+ /* entry member for struct timeout_list lists */
+
+#ifndef TIMEOUT_DISABLE_CALLBACKS
+ struct timeout_cb callback;
+ /* optional callback information */
+#endif
+
+#ifndef TIMEOUT_DISABLE_INTERVALS
+ timeout_t interval;
+ /* timeout interval if periodic */
+#endif
+
+#ifndef TIMEOUT_DISABLE_RELATIVE_ACCESS
+ struct timeouts *timeouts;
+ /* timeouts collection if member of */
+#endif
+}; /* struct timeout */
+
+
+TIMEOUT_PUBLIC struct timeout *timeout_init(struct timeout *, int);
+/* initialize timeout structure (same as TIMEOUT_INITIALIZER) */
+
+#ifndef TIMEOUT_DISABLE_RELATIVE_ACCESS
+TIMEOUT_PUBLIC bool timeout_pending(struct timeout *);
+/* true if on timing wheel, false otherwise */
+
+TIMEOUT_PUBLIC bool timeout_expired(struct timeout *);
+/* true if on expired queue, false otherwise */
+
+TIMEOUT_PUBLIC void timeout_del(struct timeout *);
+/* remove timeout from any timing wheel (okay if not member of any) */
+#endif
+
+/*
+ * T I M I N G W H E E L I N T E R F A C E S
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+struct timeouts;
+
+TIMEOUT_PUBLIC struct timeouts *timeouts_open(timeout_t, timeout_error_t *);
+/* open a new timing wheel, setting optional HZ (for float conversions) */
+
+TIMEOUT_PUBLIC void timeouts_close(struct timeouts *);
+/* destroy timing wheel */
+
+TIMEOUT_PUBLIC long long timeouts_count(struct timeouts *);
+/* count timeout of timeouts*/
+
+TIMEOUT_PUBLIC timeout_t timeouts_hz(struct timeouts *);
+/* return HZ setting (for float conversions) */
+
+TIMEOUT_PUBLIC void timeouts_update(struct timeouts *, timeout_t);
+/* update timing wheel with current absolute time */
+
+TIMEOUT_PUBLIC void timeouts_step(struct timeouts *, timeout_t);
+/* step timing wheel by relative time */
+
+TIMEOUT_PUBLIC timeout_t timeouts_timeout(struct timeouts *);
+/* return interval to next required update */
+
+TIMEOUT_PUBLIC void timeouts_add(struct timeouts *, struct timeout *, timeout_t);
+/* add timeout to timing wheel */
+
+TIMEOUT_PUBLIC void timeouts_del(struct timeouts *, struct timeout *);
+/* remove timeout from any timing wheel or expired queue (okay if on neither) */
+
+TIMEOUT_PUBLIC struct timeout *timeouts_get(struct timeouts *);
+/* return any expired timeout (caller should loop until NULL-return) */
+
+TIMEOUT_PUBLIC bool timeouts_pending(struct timeouts *);
+/* return true if any timeouts pending on timing wheel */
+
+TIMEOUT_PUBLIC bool timeouts_expired(struct timeouts *);
+/* return true if any timeouts on expired queue */
+
+TIMEOUT_PUBLIC bool timeouts_check(struct timeouts *, FILE *);
+/* return true if invariants hold. describes failures to optional file handle. */
+
+#define TIMEOUTS_PENDING 0x10
+#define TIMEOUTS_EXPIRED 0x20
+#define TIMEOUTS_ALL (TIMEOUTS_PENDING|TIMEOUTS_EXPIRED)
+#define TIMEOUTS_CLEAR 0x40
+
+#define TIMEOUTS_IT_INITIALIZER(flags) { (flags), 0, 0, 0, 0 }
+
+#define TIMEOUTS_IT_INIT(cur, _flags) do { \
+ (cur)->flags = (_flags); \
+ (cur)->pc = 0; \
+} while (0)
+
+struct timeouts_it {
+ int flags;
+ unsigned pc, i, j;
+ struct timeout *to;
+}; /* struct timeouts_it */
+
+TIMEOUT_PUBLIC struct timeout *timeouts_next(struct timeouts *, struct timeouts_it *);
+/* return next timeout in pending wheel or expired queue. caller can delete
+ * the returned timeout, but should not otherwise manipulate the timing
+ * wheel. in particular, caller SHOULD NOT delete any other timeout as that
+ * could invalidate cursor state and trigger a use-after-free.
+ */
+
+#define TIMEOUTS_FOREACH(var, T, flags) \
+ struct timeouts_it _it = TIMEOUTS_IT_INITIALIZER((flags)); \
+ while (((var) = timeouts_next((T), &_it)))
+
+
+/*
+ * B O N U S W H E E L I N T E R F A C E S
+ *
+ * I usually use floating point timeouts in all my code, but it's cleaner to
+ * separate it to keep the core algorithmic code simple.
+ *
+ * Using macros instead of static inline routines where <math.h> routines
+ * might be used to keep -lm linking optional.
+ *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#include <math.h> /* ceil(3) */
+
+#define timeouts_f2i(T, f) \
+ ((timeout_t)ceil((f) * timeouts_hz((T)))) /* prefer late expiration over early */
+
+#define timeouts_i2f(T, i) \
+ ((double)(i) / timeouts_hz((T)))
+
+#define timeouts_addf(T, to, timeout) \
+ timeouts_add((T), (to), timeouts_f2i((T), (timeout)))
+
+#endif /* TIMEOUT_H */
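
As a sketch of the interval flag declared above: a TIMEOUT_INT timeout re-arms
itself each time it is collected with timeouts_get(), so advancing the wheel by
one period per step yields it once per step. Tick values are arbitrary.

    #include <assert.h>
    #include <stddef.h>
    #include "timeout.h"

    int main(void)
    {
        int err = 0;
        struct timeouts *T = timeouts_open(0, &err);   /* 0 selects TIMEOUT_mHZ */
        assert(T != NULL);

        struct timeout tick;
        timeout_init(&tick, TIMEOUT_INT);   /* periodic timeout */
        timeouts_add(T, &tick, 10);         /* period of 10 ticks */

        int fired = 0;
        for (int i = 0; i < 5; i++) {
            timeouts_step(T, 10);           /* advance by one period */
            while (timeouts_get(T) != NULL)
                fired++;                    /* interval timeout was re-armed */
        }
        assert(fired == 5);

        timeouts_close(T);
        return 0;
    }
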
diff --git a/shaping/CMakeLists.txt b/shaping/CMakeLists.txt
index 846a1bb..b96c476 100644
--- a/shaping/CMakeLists.txt
+++ b/shaping/CMakeLists.txt
@@ -1,4 +1,4 @@
-add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp)
+add_library(shaper src/shaper_maat.cpp src/shaper_marsio.cpp src/shaper_session.cpp src/shaper_stat.cpp src/shaper_swarmkv.cpp src/shaper.cpp src/shaper_global_stat.cpp ${PROJECT_SOURCE_DIR}/deps/timeout/timeout.c)
target_link_libraries(shaper PUBLIC common)
target_link_libraries(shaper PUBLIC avl_tree)
target_link_libraries(shaper PUBLIC cjson)
@@ -7,6 +7,7 @@ target_link_libraries(shaper PUBLIC MESA_prof_load)
target_link_libraries(shaper PUBLIC fieldstat3)
target_link_libraries(shaper PUBLIC pthread)
target_include_directories(shaper PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
+target_include_directories(shaper PUBLIC ${PROJECT_SOURCE_DIR}/deps/timeout)
add_executable(shaping_engine src/main.cpp)
target_link_libraries(shaping_engine PUBLIC shaper)
target_link_libraries(shaping_engine PUBLIC maat4)
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 7821459..a218884 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -6,6 +6,9 @@
#include "session_table.h"
#include "utils.h"
#include "shaper_stat.h"
+extern "C" {
+#include "timeout.h"
+}
#define SHAPING_DIR_IN 0x1
#define SHAPING_DIR_OUT 0x2
@@ -22,6 +25,9 @@
#define SHAPING_WROK_THREAD_NUM_MAX 128
+#define SHAPING_STAT_REFRESH_INTERVAL_SEC 2
+#define SHAPING_STAT_REFRESH_MAX_PER_POLLING 5
+
#define SHAPING_GLOBAL_CONF_FILE "./conf/shaping.conf"
struct shaping_system_conf {
@@ -46,6 +52,8 @@ struct shaping_thread_ctx {
struct swarmkv *swarmkv_db;//handle of swarmkv
struct shaping_maat_info *maat_info;
struct session_table *session_table;
+ struct timeouts *expires;
+ time_t last_update_timeout_sec;
int session_need_reset;
struct shaping_system_conf conf;
};
@@ -84,9 +92,8 @@ struct shaping_profile_info {
int priority;
int in_deposit_token;
int out_deposit_token;
- int async_token_ref_count;
- int async_queue_len_ref_count;
unsigned long long enqueue_time_us;//to calculate max latency
+ unsigned long long last_failed_get_token_ms;
unsigned char is_priority_blocked;
unsigned char is_invalid;
struct shaping_stat_for_profile stat;
@@ -142,6 +149,8 @@ struct shaping_flow {
unsigned long long processed_pkts;
struct timespec stat_update_time;
time_t check_rule_time;
+ struct timeout timeout_handle;
+ time_t last_update_timeout_sec;
};
struct shaper_flow_instance {
@@ -151,7 +160,7 @@ struct shaper_flow_instance {
struct shaper;//instance of shaping, thread unsafe
-struct shaping_flow* shaping_flow_new();
+struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx);
void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf);
struct shaper* shaper_new(unsigned int priority_queue_len_max);
void shaper_free(struct shaper *sp);
diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h
index 20da941..a2527cd 100644
--- a/shaping/include/shaper_stat.h
+++ b/shaping/include/shaper_stat.h
@@ -55,4 +55,4 @@ void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_
void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigned char direction, int thread_id);
void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id);
-void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force);
\ No newline at end of file
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force);
\ No newline at end of file
diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h
index 8ccfe19..e533802 100644
--- a/shaping/include/shaper_swarmkv.h
+++ b/shaping/include/shaper_swarmkv.h
@@ -1,4 +1,5 @@
+#include <MESA/swarmkv.h>
-struct swarmkv* shaper_swarmkv_init();
+struct swarmkv* shaper_swarmkv_init(int caller_thread_num);
void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db);
void swarmkv_reload_log_level();
\ No newline at end of file
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp
index 91e1202..f9a38b2 100644
--- a/shaping/src/main.cpp
+++ b/shaping/src/main.cpp
@@ -25,6 +25,8 @@ static void *shaper_thread_loop(void *data)
return NULL;
}
+ swarmkv_register_thread(ctx->swarmkv_db);
+
//loop to process pkts
while(!quit) {
shaper_packet_recv_and_process(ctx);
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 09a0c24..02d7f33 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -27,8 +27,14 @@ extern "C" {
#define MICRO_SECONDS_PER_SEC 1000000
#define NANO_SECONDS_PER_SEC 1000000000
+#define NANO_SECONDS_PER_MILLI_SEC 1000000
+#define MILLI_SECONDS_PER_SEC 1000
+
#define SHAPING_LATENCY_THRESHOLD 2000000 //2s
+#define TOKEN_ENLARGE_TIMES 10
+#define TOKEN_GET_FAILED_INTERVAL_MS 1
+
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2"
@@ -137,7 +143,7 @@ static void shaping_node_free(struct shaping_node *s_node)
return;
}
-struct shaping_flow* shaping_flow_new()
+struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
{
struct shaping_node *s_node = NULL;
int i;
@@ -157,6 +163,8 @@ struct shaping_flow* shaping_flow_new()
TAILQ_INIT(&s_node->shaping_flow.packet_queue);
s_node->shaping_flow.ref_count = 1;
s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
+ timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS);
+ timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
return &s_node->shaping_flow;
@@ -170,7 +178,8 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
struct shaping_node *s_node = (struct shaping_node*)sf;
if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) {
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1);
+ timeouts_del(ctx->expires, &sf->timeout_handle);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
shaping_node_free(s_node);
}
@@ -250,19 +259,6 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
return;
}
-static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg)
-{
- struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
-
- shaper_global_stat_async_callback_inc(global_stat);
-
- if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_hincrby_failed_inc(global_stat);
- }
-
- return;
-}
-
//return success(0) while any avl tree insert success
int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
@@ -277,20 +273,10 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
- if ((sf->flag & SESSION_UPDATE_PF_PRIO_LEN) == 0) {
- if (sf->processed_pkts > CONFIRM_PRIORITY_PKTS) {
- sf->flag |= SESSION_UPDATE_PF_PRIO_LEN;
- }
- }
-
priority = s_rule_info->primary.priority;
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
- }
}
if (s_rule_info->borrowing_num == 0) {// no borrow profile
@@ -302,10 +288,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
- }
+            //TODO: calculate queue_len for borrow profiles and decide how to account for it when refreshing stats
+ //shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
}
}
@@ -345,10 +329,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->primary.priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
- }
}
if (s_rule_info->borrowing_num == 0) {
@@ -359,10 +339,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->borrowing[i].priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
- }
+            //TODO: calculate queue_len for borrow profiles and decide how to account for it when refreshing stats
}
}
@@ -416,6 +393,8 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
+    LOG_INFO("Swarmkv reply type=%d, integer=%llu", reply->type, reply->integer);
+
if (reply->type != SWARMKV_REPLY_INTEGER) {
shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat);
goto END;
@@ -428,10 +407,16 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
s_pf_info->is_invalid = 0;
}
+ if (reply->integer == 0) {//no token
+ struct timespec curr_time;
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ s_pf_info->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+ goto END;
+ }
+
shaper_deposit_token_add(s_pf_info, reply->integer, arg->direction);//deposit tokens to profile
END:
- __atomic_sub_fetch(&s_pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free
free(cb_arg);
cb_arg = NULL;
@@ -470,7 +455,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
struct shaping_async_cb_arg *arg = NULL;
char key[32] = {0};
- __atomic_add_fetch(&pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
__atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
@@ -483,14 +467,14 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
shaper_global_stat_async_invoke_inc(ctx->global_stat);
switch (pf_info->type) {
case PROFILE_TYPE_GENERIC:
- swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg);
+ swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_HOST_FARINESS:
case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
- swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, shaper_token_get_cb, arg);
+ swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
- swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits, shaper_token_get_cb, arg);
+ swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
break;
default:
if (arg) {
@@ -499,11 +483,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
break;
}
- if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed
- shaper_deposit_token_sub(pf_info, req_token_bits, direction);
- return SHAPER_TOKEN_GET_SUCCESS;
- }
-
if (pf_info->is_invalid) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
return SHAPER_TOKEN_GET_SUCCESS;
@@ -551,7 +530,6 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
}
END:
- __atomic_sub_fetch(&s_pf_info->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free
free(cb_arg);
cb_arg = NULL;
@@ -572,20 +550,15 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
arg->sf = sf;
arg->priority = priority;
- __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
__atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
shaper_global_stat_async_invoke_inc(ctx->global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
- if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) {
- return 0;
+ if (profile->is_priority_blocked) {
+ return 1;
} else {
- if (profile->is_priority_blocked) {
- return 1;
- } else {
- return 0;
- }
+ return 0;
}
}
@@ -609,6 +582,18 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet
}
+ if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction)) {
+ shaper_deposit_token_sub(profile, req_token_bytes * 8, direction);
+ return SHAPER_TOKEN_GET_SUCCESS;
+ }
+
+ struct timespec curr_timespec;
+ clock_gettime(CLOCK_MONOTONIC, &curr_timespec);
+ unsigned long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+    if (curr_time_ms - profile->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if the last token fetch failed within the past 1ms, fail fast: swarmkv cannot replenish tokens within 1ms
+ return SHAPER_TOKEN_GET_FAILED;
+ }
+
if (shaper_profile_is_priority_blocked(ctx, sf, profile)) {
return SHAPER_TOKEN_GET_FAILED;
} else {
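
Taken together with the callback changes above, the new token path works in three stages: serve packets from a locally deposited surplus, fail fast for 1ms after an empty reply, and otherwise refill asynchronously with a request inflated by TOKEN_ENLARGE_TIMES so later packets hit the fast path. A condensed sketch of that decision follows, using only names that appear in this diff; it elides the priority-block check and the async call that the real function continues with, and the function name itself is illustrative.

/* Condensed sketch of the reworked token fast path in shaper_token_consume(). */
static int token_fast_path(struct shaping_profile_info *profile,
                           unsigned long long req_token_bits, unsigned char direction)
{
    if (shaper_deposit_token_is_enough(profile, req_token_bits, direction)) {
        shaper_deposit_token_sub(profile, req_token_bits, direction);
        return SHAPER_TOKEN_GET_SUCCESS;                 /* served from the local deposit */
    }

    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    unsigned long long now_ms = now.tv_sec * MILLI_SECONDS_PER_SEC
                              + now.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
    if (now_ms - profile->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS)
        return SHAPER_TOKEN_GET_FAILED;                  /* bucket was empty <1ms ago */

    /* the real code then checks shaper_profile_is_priority_blocked() and issues
     * swarmkv_tconsume()/ftconsume()/btconsume() for req_token_bits * TOKEN_ENLARGE_TIMES;
     * the surplus is banked by shaper_token_get_cb() for subsequent packets */
    return SHAPER_TOKEN_GET_FAILED;
}
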
@@ -831,12 +816,12 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
break;
}
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
if (shaper_queue_empty(sf)) {
if (sf->flag & SESSION_CLOSE) {
- shaping_flow_free(ctx, sf);
sf->flag &= (~SESSION_CLOSE);
+ shaping_flow_free(ctx, sf);
}
return 0;
} else {
@@ -849,8 +834,8 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
} else {
shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail
if (sf->flag & SESSION_CLOSE) {
- shaping_flow_free(ctx, sf);
sf->flag &= (~SESSION_CLOSE);
+ shaping_flow_free(ctx, sf);
}
}
return 0;
@@ -911,14 +896,20 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
}
END:
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+ time_t curr_time = time(NULL);
+ if (curr_time > sf->last_update_timeout_sec) {
+ timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC);
+ sf->last_update_timeout_sec = curr_time;
+ }
+
if(sf->flag & SESSION_CLOSE) {
if (shaper_queue_empty(sf)) {
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_str);
- shaping_flow_free(ctx, sf);
sf->flag &= (~SESSION_CLOSE);
+ shaping_flow_free(ctx, sf);
if (addr_str) {
free(addr_str);
@@ -931,6 +922,27 @@ END:
void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
{
+ swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
+
+ struct timeout *t = NULL;
+ struct shaping_flow *sf = NULL;
+ time_t curr_time = time(NULL);
+ int cnt = 0;
+
+ if (curr_time > ctx->last_update_timeout_sec) {
+ timeouts_update(ctx->expires, curr_time);
+ ctx->last_update_timeout_sec = curr_time;
+ }
+
+ t = timeouts_get(ctx->expires);
+ while (t && cnt < SHAPING_STAT_REFRESH_MAX_PER_POLLING) {
+ sf = container_of(t, struct shaping_flow, timeout_handle);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+ timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
+ t = timeouts_get(ctx->expires);
+ cnt++;
+ }
+
if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) {
return;
}
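
The drain loop above recovers the owning flow from each expired timer with container_of() and caps the work at SHAPING_STAT_REFRESH_MAX_PER_POLLING flows per poll, re-arming each one SHAPING_STAT_REFRESH_INTERVAL_SEC ahead. container_of() is not part of timeout.h; assuming the project's common utils provide the usual definition, it amounts to:

#include <stddef.h>

/* Typical definition: given a pointer to a member, step back to the enclosing struct. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* With `struct timeout timeout_handle` embedded in struct shaping_flow, an expired
 * timer pointer maps straight back to its flow, no lookup table needed:
 *
 *   struct shaping_flow *sf = container_of(t, struct shaping_flow, timeout_handle);
 */
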
@@ -1182,6 +1194,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx)
for (int i = 0; i < ctx->thread_num; i++) {
shaper_free(ctx->thread_ctx[i].sp);
session_table_destory(ctx->thread_ctx[i].session_table);
+ timeouts_close(ctx->thread_ctx[i].expires);
}
free(ctx->thread_ctx);
}
@@ -1196,7 +1209,7 @@ struct shaping_ctx *shaping_engine_init()
{
struct shaping_system_conf conf;
struct shaping_ctx *ctx = NULL;
- int ret;
+ int ret, error;
memset(&conf, 0, sizeof(conf));
ctx = (struct shaping_ctx *)calloc(1, sizeof(struct shaping_ctx));
@@ -1208,7 +1221,7 @@ struct shaping_ctx *shaping_engine_init()
}
/*init swarmkv*/
- ctx->swarmkv_db = shaper_swarmkv_init();
+ ctx->swarmkv_db = shaper_swarmkv_init(conf.work_thread_num);
if (ctx->swarmkv_db == NULL) {
goto ERROR;
}
@@ -1246,6 +1259,7 @@ struct shaping_ctx *shaping_engine_init()
ctx->thread_ctx[i].maat_info = ctx->maat_info;
ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db;
+ ctx->thread_ctx[i].expires = timeouts_open(0, &error);
ctx->thread_ctx[i].ref_ctx = ctx;
memcpy(&ctx->thread_ctx[i].conf, &conf, sizeof(conf));
}
diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp
index 4b4f21f..64db3e6 100644
--- a/shaping/src/shaper_maat.cpp
+++ b/shaping/src/shaper_maat.cpp
@@ -381,7 +381,7 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
}
if (sf->rule_num > 0 && priority_changed) {
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
}
sf->rule_num += rule_num;
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index f43f76f..af4a7ee 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -30,7 +30,7 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru
return NULL;
}
- sf = shaping_flow_new();
+ sf = shaping_flow_new(ctx);
raw_packet_parser_get_most_inner_tuple4(raw_parser, &sf->tuple4);
sf->src_ip_str = addr_src_ip_to_str(&sf->tuple4);
sf->src_ip_str_len = strlen(sf->src_ip_str);
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index 6bd2cfb..6ede233 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -4,12 +4,14 @@
#include <sys/socket.h>
#include <arpa/inet.h>
#include <MESA/MESA_prof_load.h>
+#include <MESA/swarmkv.h>
#include <fieldstat.h>
#include "log.h"
#include "utils.h"
#include "shaper.h"
#include "shaper_stat.h"
+#include "shaper_global_stat.h"
#define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits"
@@ -131,9 +133,23 @@ static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int
return;
}
-static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage)
+static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, void * cb_arg)
+{
+ struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
+
+ shaper_global_stat_async_callback_inc(global_stat);
+
+ if (reply->type != SWARMKV_REPLY_INTEGER) {
+ shaper_global_stat_async_hincrby_failed_inc(global_stat);
+ }
+
+ return;
+}
+
+static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage)
{
struct shaping_stat_for_profile *profile_stat = &profile->stat;
+ struct shaping_stat *stat = ctx->stat;
unsigned long long old_latency;
shaper_stat_tags_build(vsys_id, rule_id, profile->id, profile->priority, profile_type);
@@ -158,6 +174,9 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs
if (need_update_guage) {
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id);
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
+
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len);
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
} else {
profile_stat->in.pkts = 0;
@@ -174,7 +193,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs
return;
}
-void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force)
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force)
{
struct shaping_rule_info *rule;
struct timespec curr_time;
@@ -199,10 +218,10 @@ void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int
for (int i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
- shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage);
+ shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage);
for (int j = 0; j < rule->borrowing_num; j++) {
- shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage);
+ shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage);
}
}
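
With the per-packet HINCRBY calls removed from shaper_flow_push()/pop() above, the priority queue length now reaches swarmkv once per stat refresh: when the gauge is flushed, the combined in/out queue_len is pushed as a single async HINCRBY on the tsg-shaping-<profile_id> hash. A sketch of that pattern as used here; the command string and reply handling mirror this diff, while the wrapper and callback names are illustrative.

/* Batched queue-length sync at stat-refresh time (sketch). */
static void hincrby_cb(const struct swarmkv_reply *reply, void *cb_arg)
{
    struct shaping_global_stat *gs = (struct shaping_global_stat *)cb_arg;

    shaper_global_stat_async_callback_inc(gs);
    if (reply->type != SWARMKV_REPLY_INTEGER)            /* HINCRBY replies with an integer */
        shaper_global_stat_async_hincrby_failed_inc(gs);
}

static void push_queue_len(struct shaping_thread_ctx *ctx,
                           struct shaping_profile_info *profile, long long queue_len)
{
    shaper_global_stat_async_invoke_inc(ctx->global_stat);
    swarmkv_async_command(ctx->swarmkv_db, hincrby_cb, ctx->global_stat,
                          "HINCRBY tsg-shaping-%d priority-%d %lld",
                          profile->id, profile->priority, queue_len);
}
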
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 6d2e32f..05ccecc 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -1,10 +1,10 @@
-#include <MESA/swarmkv.h>
#include <MESA/MESA_handle_logger.h>
#include <MESA/MESA_prof_load.h>
#include "log.h"
#include "shaper.h"
#include "utils.h"
+#include "shaper_swarmkv.h"
struct shaper_swarmkv_conf
{
@@ -97,7 +97,7 @@ void swarmkv_reload_log_level()
return;
}
-struct swarmkv* shaper_swarmkv_init()
+struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
{
struct swarmkv_options *swarmkv_opts = NULL;
struct swarmkv *swarmkv_db = NULL;
@@ -120,6 +120,8 @@ struct swarmkv* shaper_swarmkv_init()
swarmkv_options_set_health_check_announce_port(swarmkv_opts, conf.swarmkv_health_check_announce_port);
swarmkv_options_set_log_path(swarmkv_opts, "log");
swarmkv_options_set_log_level(swarmkv_opts, conf.swarmkv_log_level);
+ swarmkv_options_set_caller_thread_number(swarmkv_opts, caller_thread_num);
+ swarmkv_options_set_worker_thread_number(swarmkv_opts, 1);
swarmkv_db = swarmkv_open(swarmkv_opts, conf.swarmkv_cluster_name, &err);
if (err) {
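
The options above declare one swarmkv caller thread per shaping worker and a single internal worker thread. The other two pieces of that contract appear earlier in this merge: each worker registers itself in shaper_thread_loop() (main.cpp) and polling_entry() pumps the non-blocking caller loop (shaper.cpp). The sketch below shows how the three calls fit together on one worker thread; the loop scaffolding is illustrative, and the assumption that async reply callbacks are delivered by swarmkv_caller_loop() on the registering thread is inferred from this diff rather than stated by it.

/* Per-worker swarmkv wiring introduced by this merge (sketch). */
static volatile int quit;

static void run_one_worker(struct shaping_thread_ctx *ctx)
{
    /* the db was opened with caller_thread_number == number of shaping workers
     * and worker_thread_number == 1 (see shaper_swarmkv_init above) */
    swarmkv_register_thread(ctx->swarmkv_db);            /* once, before any command */

    while (!quit) {
        /* drain completed async replies; presumably their callbacks
         * (shaper_token_get_cb, HINCRBY callbacks, ...) run right here,
         * on the same thread that issued the commands */
        swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);

        /* ...then receive/process packets and issue new async commands... */
    }
}
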
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index a1a9b77..252d5f5 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -5,6 +5,7 @@
#include <cjson/cJSON.h>
#include <sys/queue.h>
+#include "log.h"
#include "shaper.h"
#include "shaper_maat.h"
#include "shaper_stat.h"
@@ -239,7 +240,7 @@ TEST(single_session, udp_tx_in_order)
stub_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -270,7 +271,6 @@ TEST(single_session, udp_tx_in_order)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaper_global_stat_refresh(ctx->global_stat);
@@ -287,7 +287,7 @@ TEST(single_session, udp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
+ shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
@@ -320,7 +320,7 @@ TEST(single_session, tcp_tx_in_order)
stub_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -340,8 +340,7 @@ TEST(single_session, tcp_tx_in_order)
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
- shaper_stat_refresh(ctx->thread_ctx[0].stat, sf, ctx->thread_ctx[0].thread_index, 1);
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1);
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
sleep(2);//wait telegraf generate metric
@@ -357,7 +356,6 @@ TEST(single_session, tcp_tx_in_order)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
@@ -375,7 +373,7 @@ TEST(single_session, tcp_tx_in_order)
shaping_stat_judge(line, 0, 0, 1, 30, 3000, 0, 10, 0, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 30 + (STUB_TIME_INC_FOR_METRIC_SEND / 1000), SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 30000, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -401,7 +399,7 @@ TEST(single_session, udp_diff_direction)
stub_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -436,7 +434,6 @@ TEST(single_session, udp_diff_direction)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
@@ -451,9 +448,9 @@ TEST(single_session, udp_diff_direction)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_OUT, profile_type_primary);
- shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20, SHAPING_DIR_IN, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 20, 2000, 0, 0, 20000, SHAPING_DIR_IN, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
@@ -481,12 +478,12 @@ TEST(single_session, udp_multi_rules)
stub_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(3, rule_id, priority, profile_num, profile_id);
- stub_set_token_bucket_avl_per_sec(0, 1200, SHAPING_DIR_OUT);
- stub_set_token_bucket_avl_per_sec(1, 1100, SHAPING_DIR_OUT);
+ stub_set_token_bucket_avl_per_sec(0, 3000, SHAPING_DIR_OUT);
+ stub_set_token_bucket_avl_per_sec(1, 2000, SHAPING_DIR_OUT);
stub_set_token_bucket_avl_per_sec(2, 1000, SHAPING_DIR_OUT);
actual_tx_queue = stub_get_tx_queue();
shaper_rules_update(&ctx->thread_ctx[0], sf, rule_id, 3);
@@ -516,7 +513,6 @@ TEST(single_session, udp_multi_rules)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaping_engine_destroy(ctx);
@@ -529,13 +525,13 @@ TEST(single_session, udp_multi_rules)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 506, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 506000, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1
- shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1, SHAPING_DIR_OUT, profile_type_primary);//latency of every queued pkt is 1
+ shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//latency of every queued pkt is 1
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2
- shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 90, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt
+ shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);//max latency is first queued pkt
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -561,7 +557,7 @@ TEST(single_session, udp_borrow)
stub_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -591,7 +587,6 @@ TEST(single_session, udp_borrow)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaping_engine_destroy(ctx);
@@ -604,7 +599,7 @@ TEST(single_session, udp_borrow)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 170, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
@@ -635,7 +630,7 @@ TEST(single_session, udp_borrow_same_priority_9)
stub_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -666,7 +661,6 @@ TEST(single_session, udp_borrow_same_priority_9)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaping_engine_destroy(ctx);
@@ -679,7 +673,7 @@ TEST(single_session, udp_borrow_same_priority_9)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 170, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 9, 0, 0, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);
#if 0 //fieldstat don't output a row when all values is zero
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
@@ -721,9 +715,9 @@ TEST(two_session_diff_priority, udp_in_order)
TAILQ_INIT(&expec_tx_queue2);
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf1 = shaping_flow_new();
+ sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf1 != NULL);
- sf2 = shaping_flow_new();
+ sf2 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf2 != NULL);
stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_ids);
@@ -775,7 +769,6 @@ TEST(two_session_diff_priority, udp_in_order)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
@@ -789,10 +782,10 @@ TEST(two_session_diff_priority, udp_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 0
- shaping_stat_judge(line, 0, 0, 2, 100, 10000, 0, 0, 280, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts
+ shaping_stat_judge(line, 0, 0, 2, 100, 10000, 0, 0, 280000, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1
- shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 90, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts
+ shaping_stat_judge(line, 1, 1, 1, 100, 10000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);//max latency is every queued pkts
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -834,9 +827,9 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf1 = shaping_flow_new();
+ sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf1 != NULL);
- sf2 = shaping_flow_new();
+ sf2 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf2 != NULL);
stub_set_matched_shaping_rules(4, rule_ids, prioritys, profile_nums, profile_ids);
@@ -895,7 +888,6 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
@@ -910,16 +902,16 @@ TEST(two_session_diff_priority, udp_in_order_multi_rule)
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 1, 1, 1, 20, 2000, 0, 0, 48, SHAPING_DIR_OUT, profile_type_primary);//profile_id 1, max latency is last pkt
+ shaping_stat_judge(line, 1, 1, 1, 20, 2000, 0, 0, 48000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 1, max latency is last pkt
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
-    shaping_stat_judge(line, 2, 2, 1, 20, 2000, 0, 0, 1, SHAPING_DIR_OUT, profile_type_primary);//profile_id 2, every queued pkt's latency is 1
+    shaping_stat_judge(line, 2, 2, 1, 20, 2000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 2, every queued pkt's latency is 1
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 4, 4, 1, 20, 2000, 0, 0, 1, SHAPING_DIR_OUT, profile_type_primary);//profile_id 4, max latency is first queued pkt
+ shaping_stat_judge(line, 4, 4, 1, 20, 2000, 0, 0, 1000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 4, max latency is first queued pkt
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 3, 3, 3, 20, 2000, 0, 0, 40, SHAPING_DIR_OUT, profile_type_primary);//profile_id 3, every queued pkt's latency is 40
+ shaping_stat_judge(line, 3, 3, 3, 20, 2000, 0, 0, 40000, SHAPING_DIR_OUT, profile_type_primary);//profile_id 3, every queued pkt's latency is 40
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -947,7 +939,7 @@ TEST(single_session_async, udp_tx_in_order)
stub_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -965,15 +957,8 @@ TEST(single_session_async, udp_tx_in_order)
send_packets(&ctx->thread_ctx[0], sf, 80, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
- //first 20 packets,async get token
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 20));
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
-
- stub_refresh_token_bucket(0);
-    for (int i = 0; i < 10; i++) {//the async token fetch let 10 extra packets through; after the tokens are paid back, no packet should be sent
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
- }
+    //the first 10 packets get tokens and are sent
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
while (!TAILQ_EMPTY(&expec_tx_queue)) {//last 90 delay packets
@@ -990,7 +975,6 @@ TEST(single_session_async, udp_tx_in_order)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaping_engine_destroy(ctx);
@@ -1003,7 +987,7 @@ TEST(single_session_async, udp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 160, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
+ shaping_stat_judge(line, 0, 0, 1, 100, 10000, 0, 0, 170000, SHAPING_DIR_OUT, profile_type_primary);//max latency is last 10 pkts
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
@@ -1029,7 +1013,7 @@ TEST(single_session_async, udp_close_before_async_exec)
stub_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -1041,35 +1025,21 @@ TEST(single_session_async, udp_close_before_async_exec)
/*******send packets***********/
send_packets(&ctx->thread_ctx[0], sf, 10, 100, SHAPING_DIR_OUT, &expec_tx_queue, 1, 0);
-
+    ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));//async callback hasn't been called yet, so there are no tokens and no packet is sent
+    sf->flag |= SESSION_CLOSE;//simulate receiving a close ctrlbuf
stub_set_async_token_get_times(0, 0);//refresh async count, async thread will be executed
sleep(1);//ensure async thread exec complete
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
-
- shaping_flow_free(&ctx->thread_ctx[0], sf);
- fieldstat_global_disable_prometheus_endpoint();
-
- /***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
+ for (int i = 0; i < 10; i++) {
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ }
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue, actual_tx_queue, 10));
+ fieldstat_global_disable_prometheus_endpoint();
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-
- /*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_primary);
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
}
/*session1 match rule1; session2 match rule2
@@ -1105,9 +1075,9 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf1 = shaping_flow_new();
+ sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf1 != NULL);
- sf2 = shaping_flow_new();
+ sf2 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf2 != NULL);
@@ -1138,6 +1108,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 0, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct
while (!TAILQ_EMPTY(&expec_tx_queue1)) {//last 90 delay packets
stub_refresh_token_bucket(2);
@@ -1154,7 +1125,6 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaping_engine_destroy(ctx);
@@ -1167,13 +1137,13 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 1, primary
- shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 470, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 1, 0, 0, 0, 0, 470000, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, borrow
shaping_stat_judge(line, 1, 2, 2, 100, 10000, 0, 0, 0, SHAPING_DIR_OUT, profile_type_borrow);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//profile_id 2, primary
- shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 190, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 2, 2, 1, 100, 10000, 0, 0, 190000, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -1205,9 +1175,9 @@ TEST(two_session_same_rule, udp_tx_in_order)
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf1 = shaping_flow_new();
+ sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf1 != NULL);
- sf2 = shaping_flow_new();
+ sf2 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf2 != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -1241,7 +1211,6 @@ TEST(two_session_same_rule, udp_tx_in_order)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaping_engine_destroy(ctx);
@@ -1254,7 +1223,7 @@ TEST(two_session_same_rule, udp_tx_in_order)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 1, 1, 1, 200, 20000, 0, 0, 370, SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 1, 1, 1, 200, 20000, 0, 0, 370000, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
@@ -1292,9 +1261,9 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf1 = shaping_flow_new();
+ sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf1 != NULL);
- sf2 = shaping_flow_new();
+ sf2 = shaping_flow_new(&ctx->thread_ctx[1]);
ASSERT_TRUE(sf2 != NULL);
stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_id);
@@ -1305,21 +1274,12 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1);
/*******send packets***********/
- for (int i = 0; i < 100; i++) {
- send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
- send_packets(&ctx->thread_ctx[1], sf2, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
-
- if (i < 5) {
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));
- }
- }
+ send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
+ send_packets(&ctx->thread_ctx[1], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- stub_refresh_token_bucket(0);
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
-    ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//the higher-priority session sends one packet, updating its priority info into swarmkv
-
+    shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//refresh thread 0's priority queue length into swarmkv
    for (int i = 0; i < 10; i++) {//the session on thread 1 has priority 2 and is blocked by the priority-1 session on thread 0
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
@@ -1333,6 +1293,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1
}
+    shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//refresh thread 0's priority queue length into swarmkv
while (!TAILQ_EMPTY(&expec_tx_queue2)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
@@ -1349,9 +1310,6 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
stub_clear_matched_shaping_rules();
}
-
-
-
/*session1 match rule1; session2 match rule2
rule1:
priority:1
@@ -1362,7 +1320,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
profile_a: limit 1000
*/
-TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
+TEST(two_session_diff_priority_same_profile, session_timer_test)
{
struct stub_pkt_queue expec_tx_queue1;
struct stub_pkt_queue expec_tx_queue2;
@@ -1376,9 +1334,6 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
int profile_nums[] = {1, 1};
int prioritys[] = {1, 2};
int profile_id[][MAX_REF_PROFILE] = {{0}, {0}};
- int stream1_pkt_num = 0;
- int stream2_pkt_num = 0;
- time_t curr_time;
TAILQ_INIT(&expec_tx_queue1);
@@ -1387,9 +1342,9 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf1 = shaping_flow_new();
+ sf1 = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf1 != NULL);
- sf2 = shaping_flow_new();
+ sf2 = shaping_flow_new(&ctx->thread_ctx[1]);
ASSERT_TRUE(sf2 != NULL);
stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_id);
@@ -1397,45 +1352,48 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT);
actual_tx_queue = stub_get_tx_queue();
shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1);
- shaper_rules_update(&ctx->thread_ctx[0], sf2, rule_id2, 1);
+ shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1);
/*******send packets***********/
-    {//to make the stat output order predictable, send one packet for rule1 first so rule1 is emitted first in the stats
- send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
- stream1_pkt_num++;
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));
- }
+ send_packets(&ctx->thread_ctx[0], sf1, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
+ send_packets(&ctx->thread_ctx[1], sf2, 100, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
+ ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- time(&curr_time);
- srand(curr_time);
- for (int i = 0; i < 99; i++) {
- if (rand() % 2 == 0) {
- send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
- stream1_pkt_num++;
- if (i < 9) {
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));
- }
- } else {
- send_packets(&ctx->thread_ctx[0], sf2, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);
- stream2_pkt_num++;
- if (i < 9) {
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));
- }
- }
+    sleep(3);//wait for the session timer to expire so the priority queue_len is refreshed into swarmkv
+ for (int i = 0; i < 200; i++) {
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer
}
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//timer triggered in polling
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
- ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
+    for (int i = 0; i < 10; i++) {//the session on thread 1 has priority 2 and is blocked by the priority-1 session on thread 0
+ stub_refresh_token_bucket(0);
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
+
+        ASSERT_EQ(-1, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//lower priority, so no packet may be sent
+ }
while (!TAILQ_EMPTY(&expec_tx_queue1)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1
}
+    sleep(3);//wait for the session timer to expire so the priority queue_len is refreshed into swarmkv
+ for (int i = 0; i < 200; i++) {
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);//inc time to refresh stat in timer
+ }
+ polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);//timer triggered in polling
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+
while (!TAILQ_EMPTY(&expec_tx_queue2)) {
stub_refresh_token_bucket(0);
- polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
+ polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
+ stub_curr_time_inc(STUB_TIME_INC_FOR_PACKET);
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2
}
@@ -1443,32 +1401,10 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
shaping_flow_free(&ctx->thread_ctx[0], sf1);
- shaping_flow_free(&ctx->thread_ctx[0], sf2);
+ shaping_flow_free(&ctx->thread_ctx[1], sf2);
fieldstat_global_disable_prometheus_endpoint();
-
- /***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
- fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
-
-
shaping_engine_destroy(ctx);
stub_clear_matched_shaping_rules();
-
- /*******test statistics***********/
- sleep(2);//wait telegraf to output
- FILE *stat_file;
-
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
- memset(line, 0, sizeof(line));
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 1, 0, 1, stream1_pkt_num, stream1_pkt_num*100, 0, 0, -1, SHAPING_DIR_OUT, profile_type_primary);//can't predict a certain latency cause of random
-
- ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 2, 0, 2, stream2_pkt_num, stream2_pkt_num*100, 0, 0, -1, SHAPING_DIR_OUT, profile_type_primary);//can't predict a certain latency cause of random
-
- fclose(stat_file);
- stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
- fclose(stat_file);
}
/*session1 match rule1
@@ -1489,7 +1425,7 @@ TEST(statistics, udp_drop_pkt)
stub_init();
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -1516,7 +1452,6 @@ TEST(statistics, udp_drop_pkt)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaper_global_stat_refresh(ctx->global_stat);
@@ -1531,7 +1466,7 @@ TEST(statistics, udp_drop_pkt)
stat_file = fopen(SHAPING_STAT_FILE_NAME, "r");
memset(line, 0, sizeof(line));
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));
- shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max
+ shaping_stat_judge(line, 0, 0, 1, SHAPING_SESSION_QUEUE_LEN+10, (SHAPING_SESSION_QUEUE_LEN+10)*100, 100, 0, 228000, SHAPING_DIR_OUT, profile_type_primary);//every queued pkt's latency is max
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
fclose(stat_file);
@@ -1564,7 +1499,7 @@ TEST(statistics, udp_queueing_pkt)
ctx = shaping_engine_init();
ASSERT_TRUE(ctx != NULL);
- sf = shaping_flow_new();
+ sf = shaping_flow_new(&ctx->thread_ctx[0]);
ASSERT_TRUE(sf != NULL);
stub_set_matched_shaping_rules(1, rule_id, priority, profile_num, profile_id);
@@ -1577,8 +1512,7 @@ TEST(statistics, udp_queueing_pkt)
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
- shaper_stat_refresh(ctx->thread_ctx[0].stat, sf, ctx->thread_ctx[0].thread_index, 1);
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1);
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaper_global_stat_refresh(ctx->global_stat);
sleep(2);//wait telegraf generate metric
@@ -1605,7 +1539,6 @@ TEST(statistics, udp_queueing_pkt)
fieldstat_global_disable_prometheus_endpoint();
/***********send stat data here********************/
- stub_curr_time_inc(STUB_TIME_INC_FOR_METRIC_SEND);//inc time to send metric
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaper_global_stat_refresh(ctx->global_stat);
@@ -1623,7 +1556,7 @@ TEST(statistics, udp_queueing_pkt)
shaping_stat_judge(line, 0, 0, 1, 10, 1000, 0, 90, 0, SHAPING_DIR_OUT, profile_type_primary);
ASSERT_TRUE(NULL != fgets(line, sizeof(line), stat_file));//stat data last sent
- shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, 90 + (STUB_TIME_INC_FOR_METRIC_SEND / 1000), SHAPING_DIR_OUT, profile_type_primary);
+ shaping_stat_judge(line, 0, 0, 1, 90, 9000, 0, 0, 90000, SHAPING_DIR_OUT, profile_type_primary);
fclose(stat_file);
stat_file = fopen(SHAPING_STAT_FILE_NAME, "w");//clear stat file
@@ -1640,6 +1573,6 @@ TEST(statistics, udp_queueing_pkt)
int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
- //testing::GTEST_FLAG(filter) = "single_session.udp_borrow_same_priority_9";
+ //testing::GTEST_FLAG(filter) = "single_session_async.udp_close_before_async_exec";
return RUN_ALL_TESTS();
}
\ No newline at end of file
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index 9406752..25d4550 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -15,6 +15,7 @@
#include <pthread.h>
#include "stub.h"
#include "shaper_maat.h"
+#include "log.h"
using namespace std;
@@ -166,6 +167,8 @@ void stub_init()
{
int i;
+ LOG_INIT("./conf/zlog.conf");
+
TAILQ_INIT(&tx_queue);
memset(&matched_rules, 0, sizeof(struct stub_matched_rules));
memset(&pf_array, 0, MAX_STUB_PROFILE_NUM * sizeof(struct shaping_profile));
@@ -418,6 +421,21 @@ void swarmkv_close(struct swarmkv * db)
}
return;
}
+
+void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv)
+{
+ return;
+}
+
+int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads)
+{
+ return 0;
+}
+
+int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, int nr_worker_threads)
+{
+ return 0;
+}
/**********************************************/
/*************stub of maat*********************/
diff --git a/shaping/test/stub.h b/shaping/test/stub.h
index 7f09e02..80e099b 100644
--- a/shaping/test/stub.h
+++ b/shaping/test/stub.h
@@ -9,8 +9,7 @@
#define STUB_MAAT_SHAPING_RULE_TABLE_ID 0
#define STUB_MAAT_SHAPING_PROFILE_TABLE_ID 1
-#define STUB_TIME_INC_FOR_PACKET 1000
-#define STUB_TIME_INC_FOR_METRIC_SEND 1000000
+#define STUB_TIME_INC_FOR_PACKET 1000000
#define STUB_TEST_VSYS_ID 2333
diff --git a/shaping/test/test_conf/zlog.conf b/shaping/test/test_conf/zlog.conf
index ad3644d..4d6c9d5 100644
--- a/shaping/test/test_conf/zlog.conf
+++ b/shaping/test/test_conf/zlog.conf
@@ -8,4 +8,4 @@ FATAL=30
[rules]
log_shaping.DEBUG "./log/shaping.log.%d(%F)";
-log_shaping.DEBUG >stdout;
\ No newline at end of file
+#log_shaping.DEBUG >stdout;
\ No newline at end of file