summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2023-12-07 14:01:21 +0800
committeryangwei <[email protected]>2023-12-18 09:48:25 +0800
commit8a4f10e8c4082b290c630e378f4b000c8feb7405 (patch)
tree734fbba1ffb2bfb68fdaa5eb08c0ab77b3dc588b
parent558488fbb2ed47c3a50f1a4895dea7616cbfbc5e (diff)
✨ feat(max_timeouts_per_msec): 支持设置每个线程主动超时流的速率
-rw-r--r--CMakeLists.txt2
-rw-r--r--bin/etc/sapp.toml7
-rw-r--r--include/private/sapp_global_val.h6
-rw-r--r--include/private/stream_manage.h6
-rw-r--r--include/support/token_bucket.h6
-rw-r--r--src/config/config_parse.cpp7
-rw-r--r--src/dealpkt/deal_tcp.c10
-rw-r--r--src/dealpkt/stream_manage.c195
-rw-r--r--src/entry/CMakeLists.txt2
-rw-r--r--src/support/CMakeLists.txt1
-rw-r--r--src/support/token_bucket/CMakeLists.txt7
-rw-r--r--src/support/token_bucket/token_bucket.c56
12 files changed, 236 insertions, 69 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0b0d307..e4a4f53 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -201,7 +201,7 @@ set(SAPP_DEPEND_DYN_LIB MESA_handle_logger MESA_prof_load MESA_htable MESA_fiel
set(SAPP_INNER_STATIC_LIB sapp_dev dealpkt packet_io packet_io_pcap packet_io_marsio common
config inline_keepalive gdev_assistant sapp_assistant sapp_metrics plugctrl project timer
md5 symbol_check tomlc99_wrap MESA_socket_wrap timestamp_record
- MESA_sleep dpdk_ip_hash cpu_limit timeout libdabloom)
+ MESA_sleep dpdk_ip_hash cpu_limit timeout libdabloom token_bucket)
if(LIBEVENT_ENABLED)
set(SAPP_INNER_STATIC_LIB ${SAPP_INNER_STATIC_LIB} libevent-static)
diff --git a/bin/etc/sapp.toml b/bin/etc/sapp.toml
index 4b6c8dd..ed3a8fe 100644
--- a/bin/etc/sapp.toml
+++ b/bin/etc/sapp.toml
@@ -143,6 +143,9 @@
analyse_option_enabled=1
tuple4_reuse_time_interval=30
+ max_opening_per_sec=1
+ max_timeouts_per_sec=1
+
meaningful_statistics_minimum_pkt=3
meaningful_statistics_minimum_byte=5
@@ -163,6 +166,10 @@
meaningful_statistics_minimum_pkt=3
meaningful_statistics_minimum_byte=5
+ max_opening_per_sec=5
+ max_timeouts_per_sec=5
+
+
[PROFILING]
[profiling.log]
sapp_log_category="sapp_log"
diff --git a/include/private/sapp_global_val.h b/include/private/sapp_global_val.h
index 3aa1787..911330e 100644
--- a/include/private/sapp_global_val.h
+++ b/include/private/sapp_global_val.h
@@ -101,10 +101,12 @@ typedef struct{
int meaningful_statistics_minimum_pkt;
int meaningful_statistics_minimum_byte;
sapp_config_stream_tcp_inject_t inject;
- unsigned short *well_known_ports_array; /* ���ڰ�data������ȷ��server�� */
int well_known_ports_array_num;
+ unsigned short *well_known_ports_array; /* ���ڰ�data������ȷ��server�� */
int closing_timeout;
int opening_timeout;
+ int max_timeouts_per_sec;
+ int max_opening_per_sec;
}sapp_config_stream_tcp_t;
@@ -115,6 +117,8 @@ typedef struct{
int meaningful_statistics_minimum_byte;
unsigned short *well_known_ports_array; /* �����½�����ȷ��server��, ����4001->8000, ʵ������OICQ��C2S�������ݰ�,֮ǰ�İ汾�ᱻ��ʶ��ΪS2C����� */
int well_known_ports_array_num;
+ int max_timeouts_per_sec;
+ int max_opening_per_sec;
}sapp_config_stream_udp_t;
diff --git a/include/private/stream_manage.h b/include/private/stream_manage.h
index 1c4a030..cdd91a6 100644
--- a/include/private/stream_manage.h
+++ b/include/private/stream_manage.h
@@ -4,6 +4,7 @@
#include "stream_internal.h"
#include "stream_register.h"
#include "timeout.h"
+#include "token_bucket.h"
#ifdef __cplusplus
extern "C" {
@@ -91,6 +92,7 @@ struct stream_list
long last_update_timer_s;
long interval_to_next_timeout_s;
timeout_error_t timer_error;
+ struct token_bucket *timeout_ratelimiter;
};
struct stream_index_list_item
@@ -118,6 +120,10 @@ struct global_stream
long user_define_timer_cnt;
long interval_to_next_timeout_s;
timeout_error_t timer_error;
+ struct token_bucket *tcp_timeout_ratelimiter;
+ struct token_bucket *udp_timeout_ratelimiter;
+ struct token_bucket *udp_opening_ratelimiter;
+ struct token_bucket *tcp_opening_ratelimiter;
};
typedef struct _stStreamFunInfo
diff --git a/include/support/token_bucket.h b/include/support/token_bucket.h
new file mode 100644
index 0000000..44e50ee
--- /dev/null
+++ b/include/support/token_bucket.h
@@ -0,0 +1,6 @@
+#pragma once
+
+struct token_bucket;
+struct token_bucket *token_bucket_new(long long now_ms, long long CBS, long long CIR);
+void token_bucket_free(struct token_bucket *t);
+int token_bucket_consume(struct token_bucket *t, long long tokens, long long now_ms); \ No newline at end of file
diff --git a/src/config/config_parse.cpp b/src/config/config_parse.cpp
index 54015bb..26d4e0d 100644
--- a/src/config/config_parse.cpp
+++ b/src/config/config_parse.cpp
@@ -1771,6 +1771,9 @@ int sapp_parse_config(void)
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.tcp", (char *)"closing_timeout", (int *)&pconfig->stream.tcp.closing_timeout, 30);
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.tcp", (char *)"opening_timeout", (int *)&pconfig->stream.tcp.opening_timeout, 10);
+
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.tcp", (char *)"max_timeouts_per_sec", (int *)&pconfig->stream.tcp.max_timeouts_per_sec, 0);
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.tcp", (char *)"max_opening_per_sec", (int *)&pconfig->stream.tcp.max_opening_per_sec, 0);
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.tcp.inject", (char *)"link_mss", (int *)&pconfig->stream.tcp.inject.link_mss, 1460);
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.tcp.inject.rst", (char *)"auto_remedy", (int *)&pconfig->stream.tcp.inject.auto_remedy, 0);
@@ -1786,7 +1789,9 @@ int sapp_parse_config(void)
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.udp", (char *)"timeout", (int *)&pconfig->stream.udp.timeout, 60);
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.udp", (char *)"meaningful_statistics_minimum_pkt", (int *)&pconfig->stream.udp.meaningful_statistics_minimum_pkt, 3);
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.udp", (char *)"meaningful_statistics_minimum_byte", (int *)&pconfig->stream.udp.meaningful_statistics_minimum_byte, 5);
-
+
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.udp", (char *)"max_timeouts_per_sec", (int *)&pconfig->stream.udp.max_timeouts_per_sec, 0);
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"stream.udp", (char *)"max_opening_per_sec", (int *)&pconfig->stream.udp.max_opening_per_sec, 0);
/******************************* PROFILING.metric.fs2 **********************/
diff --git a/src/dealpkt/deal_tcp.c b/src/dealpkt/deal_tcp.c
index 3fa1660..f7b13df 100644
--- a/src/dealpkt/deal_tcp.c
+++ b/src/dealpkt/deal_tcp.c
@@ -751,8 +751,14 @@ static struct streamindex *tcp_add_new_stream_bysyn(struct streamindex *pindex,
if(pstream_pr->create_dir_by_well_known_port != 1)
{
pstream_pr->under_ddos_bypass = packet_io_under_ddos_should_bypass(threadnum);
- }
- }
+ if (pstream_pr->under_ddos_bypass == 0 && G_MESA_GLOBAL_STREAM[threadnum]->tcp_opening_ratelimiter != NULL)
+ {
+ if(token_bucket_consume(G_MESA_GLOBAL_STREAM[threadnum]->tcp_opening_ratelimiter, 1,
+ g_current_time_ms) == 0)
+ pstream_pr->under_ddos_bypass = 1;
+ }
+ }
+ }
/* 2014-07-31 LiJia add, for set one stream unorder number */
if(pdetail_pr->pclient){
diff --git a/src/dealpkt/stream_manage.c b/src/dealpkt/stream_manage.c
index 50a58f7..f7636c7 100644
--- a/src/dealpkt/stream_manage.c
+++ b/src/dealpkt/stream_manage.c
@@ -29,6 +29,7 @@
#include "stream_inc/stream_base.h"
#include "stream_manage.h"
#include "timeout.h"
+#include <sys/types.h>
#ifdef __cplusplus
extern "C" {
@@ -85,20 +86,47 @@ static int init_stream_detail(struct global_stream *g_stream)
{
int i = 0;
memset(g_stream, 0, sizeof(struct global_stream));
+
+ if (sapp_global_val->config.stream.udp.max_opening_per_sec > 0)
+ {
+ g_stream->udp_opening_ratelimiter =
+ token_bucket_new(g_CurrentTime_ms, sapp_global_val->config.stream.udp.max_opening_per_sec,
+ sapp_global_val->config.stream.udp.max_opening_per_sec);
+ }
+ if (sapp_global_val->config.stream.tcp.max_opening_per_sec > 0)
+ {
+ g_stream->tcp_opening_ratelimiter =
+ token_bucket_new(g_CurrentTime_ms, sapp_global_val->config.stream.tcp.max_opening_per_sec,
+ sapp_global_val->config.stream.tcp.max_opening_per_sec);
+ }
+ if (sapp_global_val->config.stream.udp.max_timeouts_per_sec > 0)
+ {
+ g_stream->udp_timeout_ratelimiter =
+ token_bucket_new(g_CurrentTime_ms, sapp_global_val->config.stream.udp.max_timeouts_per_sec,
+ sapp_global_val->config.stream.udp.max_timeouts_per_sec);
+ }
+ if (sapp_global_val->config.stream.tcp.max_timeouts_per_sec > 0)
+ {
+ g_stream->tcp_timeout_ratelimiter =
+ token_bucket_new(g_CurrentTime_ms, sapp_global_val->config.stream.tcp.max_timeouts_per_sec,
+ sapp_global_val->config.stream.tcp.max_timeouts_per_sec);
+ }
+
g_stream->user_define_timer = timeouts_open(0, &g_stream->timer_error);
for (i = 0; i < MAX_TCP_STATE; i++)
{
g_stream->tcpList[i].max_cnt = tcpstate_num[i];
g_stream->tcpList[i].cnt = 0;
g_stream->tcpList[i].streamindex_timer = timeouts_open(0, &g_stream->tcpList[i].timer_error);
+ g_stream->tcpList[i].timeout_ratelimiter = g_stream->tcp_timeout_ratelimiter;
}
for (i = 0; i < MAX_UDP_STATE; i++)
{
g_stream->udpList[i].max_cnt = udpstate_num[i];
g_stream->udpList[i].cnt = 0;
g_stream->udpList[i].streamindex_timer = timeouts_open(0, &g_stream->udpList[i].timer_error);
- }
-
+ g_stream->udpList[i].timeout_ratelimiter = g_stream->udp_timeout_ratelimiter;
+ }
//Ԥ�ȷ���hashָ��
g_stream->tcp_stream_table = (struct streamindex **)sapp_mem_malloc(SAPP_MEM_FIX_GLOBAL_STREAM, -1, tcp_stream_table_size * sizeof(char *));
@@ -126,7 +154,7 @@ static int init_stream_detail(struct global_stream *g_stream)
g_stream->freeList_max_cnt = max_cnt;
g_stream->freeList_cnt = max_cnt;
- return 0;
+ return 0;
}
int tcp_set_flood_detect_model(int model)
@@ -435,9 +463,18 @@ extern int call_streamentry(struct streaminfo *a_stream, const void *this_iphdr,
modify by lqy 20150612
��ʱɾ��������ͷ����ʼ��һ��ɾ��
*/
+
+enum del_stream_by_time_returen_code
+{
+ DEL_STREAM_BY_TIME_RET_NORMAL = 0,
+ DEL_STREAM_BY_TIME_RET_DEL_SELF = 1,
+ DEL_STREAM_BY_TIME_RET_DEL_OTHER = 2,
+ DEL_STREAM_BY_TIME_RET_NO_TOKEN = 3,
+};
+
int del_stream_by_time(struct stream_list *plist, const struct streamindex *current_drive_index, int thread_id)
{
- int ret = 0;
+ int ret = DEL_STREAM_BY_TIME_RET_NORMAL;
struct timeout *t=NULL;
struct global_stream *g_stream = G_MESA_GLOBAL_STREAM[thread_id];
if(unlikely(plist->last_update_timer_s < g_CurrentTime))
@@ -451,42 +488,46 @@ int del_stream_by_time(struct stream_list *plist, const struct streamindex *curr
plist->last_update_timer_s = g_CurrentTime;
plist->interval_to_next_timeout_s=timeouts_timeout(plist->streamindex_timer);
}
- if (plist->interval_to_next_timeout_s == 0)
- {
- t = timeouts_get(plist->streamindex_timer);
- if (t != NULL)
- {
- struct streamindex *pindex = (struct streamindex *)sapp_get_struct_header(t, struct streamindex, timeout);
- struct streaminfo_private *pstream_pr = &(pindex->stream);
- struct streaminfo *pstream = &(pstream_pr->stream_public);
-
- if (pindex == current_drive_index)
- { /* ��ǰ�����ڵ�����ʱ, ˵��֮ǰ�ܳ�ʱ��û�а�����, ��ʱ����free, ���Ƿ���1, �ٵ���reset */
- if (STREAM_TYPE_TCP == pstream->type)
- {
- /* 2016-12-15 lijia add, ��ʱreset֮ǰ, ��Ҫ����link_state */
- ((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT;
- }
- ret = 1;
- }
- else
- {
- if (STREAM_TYPE_TCP == pstream->type)
- {
- ((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT;
- tcp_free_stream(pindex, NULL, NULL, NULL);
- ret = 2;
- }
- else
- {
- pstream_pr->stream_close_reason = STREAM_CLOSE_REASON_TIMEOUT;
- udp_free_stream(pindex);
- ret = 2;
- }
- }
- }
- }
- if(g_stream->user_define_timer_cnt > 0 && g_stream->interval_to_next_timeout_s == 0)
+ if (plist->interval_to_next_timeout_s == 0)
+ {
+ if ( plist->timeout_ratelimiter == NULL || token_bucket_consume(plist->timeout_ratelimiter, 1, g_current_time_ms) > 0)
+ {
+ t = timeouts_get(plist->streamindex_timer);
+ if (t != NULL)
+ {
+ struct streamindex *pindex =
+ (struct streamindex *)sapp_get_struct_header(t, struct streamindex, timeout);
+ struct streaminfo_private *pstream_pr = &(pindex->stream);
+ struct streaminfo *pstream = &(pstream_pr->stream_public);
+
+ if (pindex == current_drive_index)
+ { /* ��ǰ�����ڵ�����ʱ, ˵��֮ǰ�ܳ�ʱ��û�а�����, ��ʱ����free, ���Ƿ���1, �ٵ���reset */
+ if (STREAM_TYPE_TCP == pstream->type)
+ {
+ /* 2016-12-15 lijia add, ��ʱreset֮ǰ, ��Ҫ����link_state */
+ ((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT;
+ }
+ ret = DEL_STREAM_BY_TIME_RET_DEL_SELF;
+ }
+ else
+ {
+ if (STREAM_TYPE_TCP == pstream->type)
+ {
+ ((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT;
+ tcp_free_stream(pindex, NULL, NULL, NULL);
+ ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER;
+ }
+ else
+ {
+ pstream_pr->stream_close_reason = STREAM_CLOSE_REASON_TIMEOUT;
+ udp_free_stream(pindex);
+ ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER;
+ }
+ }
+ }
+ }
+ }
+ if(g_stream->user_define_timer_cnt > 0 && g_stream->interval_to_next_timeout_s == 0)
{
t = timeouts_get(g_stream->user_define_timer);
if (t != NULL)
@@ -1521,7 +1562,26 @@ void hash_add_stream(struct streamindex *pindex)
if(pindex->stream.create_dir_by_well_known_port != 1)
{
pindex->stream.under_ddos_bypass = packet_io_under_ddos_should_bypass(threadnum);
- }
+ if (pindex->stream.under_ddos_bypass == 0)
+ {
+ if (STREAM_TYPE_UDP == ptmp->type && G_MESA_GLOBAL_STREAM[threadnum]->udp_opening_ratelimiter != NULL)
+ {
+ if (token_bucket_consume(G_MESA_GLOBAL_STREAM[threadnum]->udp_opening_ratelimiter, 1,
+ g_current_time_ms) == 0)
+ {
+ pindex->stream.under_ddos_bypass = 1;
+ }
+ }
+ if (STREAM_TYPE_TCP == ptmp->type && G_MESA_GLOBAL_STREAM[threadnum]->tcp_opening_ratelimiter != NULL)
+ {
+ if (token_bucket_consume(G_MESA_GLOBAL_STREAM[threadnum]->tcp_opening_ratelimiter, 1,
+ g_current_time_ms) == 0)
+ {
+ pindex->stream.under_ddos_bypass = 1;
+ }
+ }
+ }
+ }
pindex->stream.global_stream_id = get_global_stream_id(threadnum);
pindex->stream.stream_trace_id=0;
@@ -2027,7 +2087,10 @@ void free_thread_stream(int thread_seq)
struct timeout *t = NULL;
struct timeouts_it it_all;
- g_stream=G_MESA_GLOBAL_STREAM[thread_seq];
+ g_stream=G_MESA_GLOBAL_STREAM[thread_seq];
+ token_bucket_free(g_stream->tcp_timeout_ratelimiter);
+ token_bucket_free(g_stream->udp_timeout_ratelimiter);
+ token_bucket_free(g_stream->udp_opening_ratelimiter);
timeouts_close(g_stream->user_define_timer);
for(j=0;j<MAX_TCP_STATE;j++)
{
@@ -2649,29 +2712,35 @@ int polling_stream_timeout(int tid)
struct stream_list *plist=NULL;
int i, ret, has_work = 0;
- for(i = UDP_ONE_STATE; i < MAX_UDP_STATE; i++){
- plist=&(G_MESA_GLOBAL_STREAM[tid]->udpList[i]);
- if( plist->cnt > 0){
- ret = del_stream_by_time(plist, NULL, tid);
- if(ret > 0){
- has_work = POLLING_STATE_WORK;
- break;
- }
- }
- }
+ for (i = UDP_ONE_STATE; i < MAX_UDP_STATE; i++)
+ {
+ plist = &(G_MESA_GLOBAL_STREAM[tid]->udpList[i]);
+ if (plist->cnt > 0)
+ {
+ ret = del_stream_by_time(plist, NULL, tid);
+ if (ret > 0)
+ {
+ has_work = POLLING_STATE_WORK;
+ break;
+ }
+ }
+ }
- for(i = TCP_SYN_STATE; i < MAX_TCP_STATE; i++){
- plist=&(G_MESA_GLOBAL_STREAM[tid]->tcpList[i]);
- if(plist->cnt > 0){ /* polling�׶ο��ܻ�û����, ����û�д�״̬����, plist����ΪNULL */
- ret = del_stream_by_time(plist, NULL, tid);
- if(ret > 0){
- has_work = POLLING_STATE_WORK;
- break;
- }
- }
- }
+ for (i = TCP_SYN_STATE; i < MAX_TCP_STATE; i++)
+ {
+ plist = &(G_MESA_GLOBAL_STREAM[tid]->tcpList[i]);
+ if (plist->cnt > 0)
+ { /* polling�׶ο��ܻ�û����, ����û�д�״̬����, plist����ΪNULL */
+ ret = del_stream_by_time(plist, NULL, tid);
+ if (ret > 0)
+ {
+ has_work = POLLING_STATE_WORK;
+ break;
+ }
+ }
+ }
- return has_work;
+ return has_work;
}
diff --git a/src/entry/CMakeLists.txt b/src/entry/CMakeLists.txt
index b5f9b3c..3cfef40 100644
--- a/src/entry/CMakeLists.txt
+++ b/src/entry/CMakeLists.txt
@@ -40,7 +40,7 @@ set(SAPP_MODULES timestamp_record md5
config timer tomlc99_wrap dpdk_ip_hash
gdev_assistant inline_keepalive libdabloom
sapp_dev sapp_assistant sapp_metrics
- platform_support cpu_limit timeout)
+ platform_support cpu_limit timeout token_bucket)
if(LIBEVENT_ENABLED STREQUAL "ON")
set(SAPP_MODULES ${SAPP_MODULES} libevent-static)
diff --git a/src/support/CMakeLists.txt b/src/support/CMakeLists.txt
index 5fef889..291c15f 100644
--- a/src/support/CMakeLists.txt
+++ b/src/support/CMakeLists.txt
@@ -10,3 +10,4 @@ add_subdirectory(dpdk_rte_hash)
add_subdirectory(cpu_limit)
add_subdirectory(dablooms)
add_subdirectory(timeout)
+add_subdirectory(token_bucket)
diff --git a/src/support/token_bucket/CMakeLists.txt b/src/support/token_bucket/CMakeLists.txt
new file mode 100644
index 0000000..4be1b99
--- /dev/null
+++ b/src/support/token_bucket/CMakeLists.txt
@@ -0,0 +1,7 @@
+cmake_minimum_required(VERSION 2.8...3.10)
+
+include_directories(${CMAKE_SOURCE_DIR}/include/support)
+
+add_definitions(-fPIC)
+
+add_library(token_bucket STATIC token_bucket.c)
diff --git a/src/support/token_bucket/token_bucket.c b/src/support/token_bucket/token_bucket.c
new file mode 100644
index 0000000..eeedba4
--- /dev/null
+++ b/src/support/token_bucket/token_bucket.c
@@ -0,0 +1,56 @@
+#include "token_bucket.h"
+
+#include <stdlib.h>
+
+#ifndef MIN
+#define MIN(a, b) (((a) > (b)) ? (b) : (a))
+#endif
+
+#define DEFALUT_REFILL_INTERVAL_MS 10
+
+struct token_bucket {
+ long long CBS; //
+ long long tokens; // available tokens
+ long long CIR; // fill tokens per time interval
+ long long last_refill_timestamp_ms; //
+};
+
+struct token_bucket *token_bucket_new(long long now_ms, long long CBS, long long CIR)
+{
+ struct token_bucket *tb = (struct token_bucket *)calloc(sizeof(struct token_bucket),1);
+ tb->CBS = CBS;
+ tb->tokens = CBS; // initially full
+ tb->CIR = CIR;
+ tb->last_refill_timestamp_ms = now_ms;
+ return tb;
+}
+
+void token_bucket_free(struct token_bucket *t)
+{
+ if(t)free(t);
+}
+
+int token_bucket_consume(struct token_bucket *t, long long tokens, long long now_ms)
+{
+ if (t == NULL || tokens <= 0 || now_ms <= 0)return 0;
+ if (tokens > t->CBS)return 0;
+ if (now_ms < t->last_refill_timestamp_ms) return 0;
+
+ //refill
+ long long delta_ms = now_ms - t->last_refill_timestamp_ms;
+ long long refill = (delta_ms * t->CIR)/1000;
+ if(delta_ms > DEFALUT_REFILL_INTERVAL_MS && refill > 0)
+ {
+
+ t->tokens += MIN(refill, t->CBS);
+ t->last_refill_timestamp_ms = now_ms;
+ }
+
+ //consume
+ if (t->tokens >= tokens)
+ {
+ t->tokens -= tokens;
+ return 1; // success
+ }
+ return 0; // failure
+} \ No newline at end of file