summaryrefslogtreecommitdiff
path: root/plugin/protocol/http/src
diff options
context:
space:
mode:
authorLu Qiuwen <[email protected]>2018-10-22 21:22:59 +0800
committerLu Qiuwen <[email protected]>2018-10-22 21:22:59 +0800
commitbcfe14055ff4dc31ef79df87b4394e73924f5eed (patch)
treec82b70b5cd9b8668450f047a5075f2bdbab49ae5 /plugin/protocol/http/src
parent0f31b948bad858bda8e5d21e845827c648c512ec (diff)
增加HTTP Session延迟回收机制,完善HTTP Session销毁流程
Diffstat (limited to 'plugin/protocol/http/src')
-rw-r--r--plugin/protocol/http/src/http_entry.cpp142
-rw-r--r--plugin/protocol/http/src/http_half.cpp55
2 files changed, 172 insertions, 25 deletions
diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp
index fa84675..2ec4348 100644
--- a/plugin/protocol/http/src/http_entry.cpp
+++ b/plugin/protocol/http/src/http_entry.cpp
@@ -5,17 +5,90 @@
#include <tfe_stream.h>
#include <http_parser.h>
#include <malloc.h>
+#include <pthread.h>
#include <http_common.h>
#include <http_half.h>
#include <assert.h>
#include <event.h>
+#include <tfe_proxy.h>
struct http_plugin __g_http_plugin;
struct http_plugin * g_http_plugin = &__g_http_plugin;
+struct session_gc_cb_closure
+{
+#ifndef NDEBUG
+ unsigned int __magic__;
+#endif
+ struct http_plugin * plugin;
+ unsigned int thread_id;
+};
+
+static void http_plugin_session_gc_cb(evutil_socket_t fd, short what, void * arg)
+{
+ struct session_gc_cb_closure * closure = (struct session_gc_cb_closure *) arg;
+ assert(closure->__magic__ == 0x8021);
+
+ struct http_plugin * plugin_ctx = closure->plugin;
+ struct hs_private_list * gc_list_hs_private = &plugin_ctx->gc_list_hs_private[closure->thread_id];
+
+ struct http_session_private * hs_private_iter = NULL;
+ struct http_session_private * hs_private_titer = NULL;
+
+ TAILQ_FOREACH_SAFE(hs_private_iter, gc_list_hs_private, next, hs_private_titer)
+ {
+ assert(hs_private_iter->release_lock >= 0);
+ if(hs_private_iter->release_lock > 0) continue;
+
+ TAILQ_REMOVE(gc_list_hs_private, hs_private_iter, next);
+
+ /* Call the http frame to raise END event */
+ if (hs_private_iter->ht_frame)
+ {
+ struct tfe_http_session * hs_public = to_hs_public(hs_private_iter);
+ http_frame_raise_session_end(hs_private_iter->ht_frame, NULL, hs_public, hs_private_iter->thread_id);
+ hs_private_iter->ht_frame = NULL;
+ }
+
+ hs_private_destroy(hs_private_iter);
+ fprintf(stderr, "---- http_plugin_session_gc_cb, close session by GC\n, %p", hs_private_iter);
+ }
+}
+
int http_plugin_init(struct tfe_proxy * proxy)
{
+ unsigned int nr_work_thread = tfe_proxy_get_work_thread_count();
+ struct http_plugin * plugin_ctx = g_http_plugin;
+
+ for (unsigned int thread_id = 0; thread_id < nr_work_thread; thread_id++)
+ {
+#ifndef NDEBUG
+ pthread_mutex_init(&plugin_ctx->lock_list_hs_private[thread_id], NULL);
+#endif
+ struct event_base * ev_base = tfe_proxy_get_work_thread_evbase(thread_id);
+ struct session_gc_cb_closure * closure = ALLOC(struct session_gc_cb_closure, 1);
+
+#ifndef NDEBUG
+ closure->__magic__ = 0x8021;
+#endif
+ closure->plugin = plugin_ctx;
+ closure->thread_id = thread_id;
+
+ /* TODO: Load GC delay from configure files */
+ struct timeval gc_delay = {0, 500 * 1000};
+ struct event * gc_event = event_new(ev_base, -1, EV_PERSIST, http_plugin_session_gc_cb, closure);
+
+ if (unlikely(gc_event == NULL))
+ {
+ /* TODO: write a log */
+ assert(0);
+ }
+
+ evtimer_add(gc_event, &gc_delay);
+ plugin_ctx->gc_event_hs_private[thread_id] = gc_event;
+ }
+
return 0;
}
@@ -28,7 +101,7 @@ int http_connection_entry_open(const struct tfe_stream * stream, unsigned int th
enum tfe_conn_dir dir, void ** pme)
{
struct http_connection_private * ht_conn = ALLOC(
- struct http_connection_private, 1);
+ struct http_connection_private, 1);
TAILQ_INIT(&ht_conn->hs_private_list);
TAILQ_INIT(&ht_conn->hs_private_orphan_list);
ht_conn->stream = stream;
@@ -139,11 +212,11 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_
if (hf_private_req_user->message_status == STATUS_COMPLETE)
{
hf_private_destory(hf_private_req_user);
- hs_private->hf_private_resp_user = NULL;
+ hs_private->hf_private_req_user = NULL;
}
assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA
- || hf_private_req_in->stream_action == ACTION_DROP_DATA);
+ || hf_private_req_in->stream_action == ACTION_DROP_DATA);
hf_private_req_in->stream_action = ACTION_DROP_DATA;
}
@@ -172,7 +245,7 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_
}
assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA
- || hf_private_req_in->stream_action == ACTION_DROP_DATA);
+ || hf_private_req_in->stream_action == ACTION_DROP_DATA);
hf_private_req_in->stream_action = ACTION_DROP_DATA;
}
@@ -201,12 +274,12 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
{
/* HTTP Request and Session */
hf_private_req_in = hf_private_create(TFE_HTTP_REQUEST, 1, 0);
- hs_private = hs_private_create(hc_private, hf_private_req_in, NULL);
+ hs_private = hs_private_create(hc_private, thread_id, hf_private_req_in, NULL);
hf_private_set_session(hf_private_req_in, hs_private);
/* Closure, catch stream, session and thread_id */
struct user_event_dispatch_closure * __closure = ALLOC(
- struct user_event_dispatch_closure, 1);
+ struct user_event_dispatch_closure, 1);
__closure->thread_id = thread_id;
__closure->stream = stream;
__closure->session = to_hs_public(hs_private);
@@ -241,7 +314,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
goto __errout;
}
- if(hf_private_req_in->is_user_stream_action_set)
+ if (hf_private_req_in->is_user_stream_action_set)
{
hf_private_req_in->stream_action = hf_private_req_in->user_stream_action;
}
@@ -249,7 +322,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
{
hf_private_req_in->stream_action = ACTION_FORWARD_DATA;
}
-
+
/* Ignore parse the content which is nullptr. */
goto __boundary;
}
@@ -319,7 +392,9 @@ __out:
{
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
- hs_private_destory(hs_private);
+
+ hs_private->ht_frame = NULL;
+ hs_private_destroy(hs_private);
}
return __action;
@@ -362,7 +437,7 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
/* Closure, catch stream, session and thread_id */
struct user_event_dispatch_closure * __closure = ALLOC(
- struct user_event_dispatch_closure, 1);
+ struct user_event_dispatch_closure, 1);
__closure->thread_id = thread_id;
__closure->stream = stream;
__closure->session = to_hs_public(hs_private);
@@ -439,12 +514,14 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next);
}
- /* Nothing to do, everything is over, destroy the session */
+ /* Nothing to do, everything is over, destroy the session */
else
{
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
- hs_private_destory(hs_private);
+
+ hs_private->ht_frame = NULL;
+ hs_private_destroy(hs_private);
}
}
@@ -516,21 +593,45 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int
enum tfe_stream_close_reason reason, void ** pme)
{
struct http_connection_private * ht_conn = (struct http_connection_private *) (*pme);
- *pme = NULL;
+ struct http_plugin * plugin_ctx = g_http_plugin;
- /* Delete all live sessions */
- while (true)
- {
- struct http_session_private * hs_private_iter = TAILQ_FIRST(&ht_conn->hs_private_list);
- if (hs_private_iter == NULL) break;
+ struct http_session_private * hs_private_iter = NULL;
+ struct http_session_private * hs_private_titer = NULL;
+ TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_list, next, hs_private_titer)
+ {
TAILQ_REMOVE(&ht_conn->hs_private_list, hs_private_iter, next);
- hs_private_destory(hs_private_iter);
+
+ /* Call the http frame to raise END event */
+ if (hs_private_iter->ht_frame)
+ {
+ struct tfe_http_session * hs_public = to_hs_public(hs_private_iter);
+ http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id);
+ hs_private_iter->ht_frame = NULL;
+ }
+
+ hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]);
+ }
+
+ TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_orphan_list, next, hs_private_titer)
+ {
+ TAILQ_REMOVE(&ht_conn->hs_private_orphan_list, hs_private_iter, next);
+
+ if (hs_private_iter->ht_frame)
+ {
+ struct tfe_http_session * hs_public = to_hs_public(hs_private_iter);
+ http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id);
+ hs_private_iter->ht_frame = NULL;
+ }
+
+ hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]);
}
/* Clear session counter, and free ht_conn structure */
ht_conn->session_id_counter = 0;
free(ht_conn);
+
+ *pme = NULL;
}
static struct tfe_plugin __http_plugin_info =
@@ -544,5 +645,4 @@ static struct tfe_plugin __http_plugin_info =
.on_close = http_connection_entry_close
};
-TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info
-)
+TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info)
diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp
index b41b026..a916113 100644
--- a/plugin/protocol/http/src/http_half.cpp
+++ b/plugin/protocol/http/src/http_half.cpp
@@ -588,6 +588,8 @@ int hf_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, in
int hf_ops_body_begin(struct tfe_http_half * half, int by_stream)
{
struct http_half_private * hf_private = to_hf_private(half);
+ struct http_session_private * hs_private = hf_private->session;
+
assert(hf_private->evbuf_body == NULL);
if (by_stream)
@@ -602,6 +604,9 @@ int hf_ops_body_begin(struct tfe_http_half * half, int by_stream)
hf_private->content_encoding = HTTP_ACCEPT_ENCODING_NONE;
hf_private->is_setup_by_stream = true;
+
+ hs_private->stream_write_tag_effective = true;
+ hs_private->release_lock++;
}
hf_private->evbuf_body = evbuffer_new();
@@ -649,6 +654,8 @@ __out:
int hf_ops_body_end(struct tfe_http_half * half)
{
struct http_half_private * hf_private = to_hf_private(half);
+ struct http_session_private * hs_private = hf_private->session;
+
if (hf_private->write_ctx)
{
tfe_stream_write_frag_end(hf_private->write_ctx);
@@ -658,7 +665,13 @@ int hf_ops_body_end(struct tfe_http_half * half)
hf_private->body_status = STATUS_COMPLETE;
hf_private->message_status = STATUS_COMPLETE;
-// printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private);
+ if(hf_private->is_setup_by_stream)
+ {
+ hs_private->stream_write_tag_effective = true;
+ hs_private->release_lock++;
+ }
+
+ printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private);
return 0;
}
@@ -800,6 +813,7 @@ void hs_ops_suspend(struct tfe_http_session * session)
{
struct http_session_private * hs_private = to_hs_private(session);
hs_private->suspend_tag_user = true;
+ hs_private->release_lock++;
}
void hs_ops_resume(struct tfe_http_session * session)
@@ -808,6 +822,7 @@ void hs_ops_resume(struct tfe_http_session * session)
struct http_connection_private * hc_private = hs_private->hc_private;
hs_private->suspend_tag_user = false;
+ hs_private->release_lock--;
tfe_stream_resume(hc_private->stream);
}
@@ -1024,10 +1039,11 @@ void hf_private_construct(struct http_half_private * hf_private)
return;
}
-struct http_session_private * hs_private_create(struct http_connection_private * hc_private,
- struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp)
+struct http_session_private * hs_private_create(struct http_connection_private * hc_private, unsigned int thread_id,
+ struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp)
{
struct http_session_private * __hs_private = ALLOC(struct http_session_private, 1);
+ __hs_private->thread_id = thread_id;
/* HS-PUBLIC */
__hs_private->hs_public.ops = &__http_session_ops;
@@ -1085,12 +1101,43 @@ void __write_access_log(struct http_session_private * hs_private)
free(__access_log);
}
-void hs_private_destory(struct http_session_private * hs_private)
+void hs_private_destroy(struct http_session_private * hs_private)
{
+ struct http_half_private * hf_req = to_hf_request_private(hs_private);
+ struct http_half_private * hf_resp = to_hf_response_private(hs_private);
+
__write_access_log(hs_private);
+
+ if (hf_req != NULL)
+ {
+ hf_private_destory(hf_req);
+ }
+
+ if (hf_resp != NULL)
+ {
+ hf_private_destory(hf_resp);
+ }
+
+ if (hs_private->hf_private_req_user)
+ {
+ hf_private_destory(hs_private->hf_private_req_user);
+ }
+
+ if (hs_private->hf_private_resp_user)
+ {
+ hf_private_destory(hs_private->hf_private_resp_user);
+ }
+
+ assert(hs_private->ht_frame == NULL);
free(hs_private);
}
+void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list)
+{
+ if (hs_private->release_lock > 0) TAILQ_INSERT_TAIL(gc_list, hs_private, next);
+ else return hs_private_destroy(hs_private);
+}
+
void hs_private_hf_private_set(struct http_session_private * hs_private,
struct http_half_private * hf, enum tfe_http_direction direction)
{