diff options
| author | Lu Qiuwen <[email protected]> | 2018-10-22 21:22:59 +0800 |
|---|---|---|
| committer | Lu Qiuwen <[email protected]> | 2018-10-22 21:22:59 +0800 |
| commit | bcfe14055ff4dc31ef79df87b4394e73924f5eed (patch) | |
| tree | c82b70b5cd9b8668450f047a5075f2bdbab49ae5 /plugin/protocol/http/src | |
| parent | 0f31b948bad858bda8e5d21e845827c648c512ec (diff) | |
增加HTTP Session延迟回收机制,完善HTTP Session销毁流程
Diffstat (limited to 'plugin/protocol/http/src')
| -rw-r--r-- | plugin/protocol/http/src/http_entry.cpp | 142 | ||||
| -rw-r--r-- | plugin/protocol/http/src/http_half.cpp | 55 |
2 files changed, 172 insertions, 25 deletions
diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index fa84675..2ec4348 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -5,17 +5,90 @@ #include <tfe_stream.h> #include <http_parser.h> #include <malloc.h> +#include <pthread.h> #include <http_common.h> #include <http_half.h> #include <assert.h> #include <event.h> +#include <tfe_proxy.h> struct http_plugin __g_http_plugin; struct http_plugin * g_http_plugin = &__g_http_plugin; +struct session_gc_cb_closure +{ +#ifndef NDEBUG + unsigned int __magic__; +#endif + struct http_plugin * plugin; + unsigned int thread_id; +}; + +static void http_plugin_session_gc_cb(evutil_socket_t fd, short what, void * arg) +{ + struct session_gc_cb_closure * closure = (struct session_gc_cb_closure *) arg; + assert(closure->__magic__ == 0x8021); + + struct http_plugin * plugin_ctx = closure->plugin; + struct hs_private_list * gc_list_hs_private = &plugin_ctx->gc_list_hs_private[closure->thread_id]; + + struct http_session_private * hs_private_iter = NULL; + struct http_session_private * hs_private_titer = NULL; + + TAILQ_FOREACH_SAFE(hs_private_iter, gc_list_hs_private, next, hs_private_titer) + { + assert(hs_private_iter->release_lock >= 0); + if(hs_private_iter->release_lock > 0) continue; + + TAILQ_REMOVE(gc_list_hs_private, hs_private_iter, next); + + /* Call the http frame to raise END event */ + if (hs_private_iter->ht_frame) + { + struct tfe_http_session * hs_public = to_hs_public(hs_private_iter); + http_frame_raise_session_end(hs_private_iter->ht_frame, NULL, hs_public, hs_private_iter->thread_id); + hs_private_iter->ht_frame = NULL; + } + + hs_private_destroy(hs_private_iter); + fprintf(stderr, "---- http_plugin_session_gc_cb, close session by GC\n, %p", hs_private_iter); + } +} + int http_plugin_init(struct tfe_proxy * proxy) { + unsigned int nr_work_thread = tfe_proxy_get_work_thread_count(); + struct http_plugin * plugin_ctx = g_http_plugin; + + for (unsigned int thread_id = 0; thread_id < nr_work_thread; thread_id++) + { +#ifndef NDEBUG + pthread_mutex_init(&plugin_ctx->lock_list_hs_private[thread_id], NULL); +#endif + struct event_base * ev_base = tfe_proxy_get_work_thread_evbase(thread_id); + struct session_gc_cb_closure * closure = ALLOC(struct session_gc_cb_closure, 1); + +#ifndef NDEBUG + closure->__magic__ = 0x8021; +#endif + closure->plugin = plugin_ctx; + closure->thread_id = thread_id; + + /* TODO: Load GC delay from configure files */ + struct timeval gc_delay = {0, 500 * 1000}; + struct event * gc_event = event_new(ev_base, -1, EV_PERSIST, http_plugin_session_gc_cb, closure); + + if (unlikely(gc_event == NULL)) + { + /* TODO: write a log */ + assert(0); + } + + evtimer_add(gc_event, &gc_delay); + plugin_ctx->gc_event_hs_private[thread_id] = gc_event; + } + return 0; } @@ -28,7 +101,7 @@ int http_connection_entry_open(const struct tfe_stream * stream, unsigned int th enum tfe_conn_dir dir, void ** pme) { struct http_connection_private * ht_conn = ALLOC( - struct http_connection_private, 1); + struct http_connection_private, 1); TAILQ_INIT(&ht_conn->hs_private_list); TAILQ_INIT(&ht_conn->hs_private_orphan_list); ht_conn->stream = stream; @@ -139,11 +212,11 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_ if (hf_private_req_user->message_status == STATUS_COMPLETE) { hf_private_destory(hf_private_req_user); - hs_private->hf_private_resp_user = NULL; + hs_private->hf_private_req_user = NULL; } assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA - || hf_private_req_in->stream_action == ACTION_DROP_DATA); + || hf_private_req_in->stream_action == ACTION_DROP_DATA); hf_private_req_in->stream_action = ACTION_DROP_DATA; } @@ -172,7 +245,7 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_ } assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA - || hf_private_req_in->stream_action == ACTION_DROP_DATA); + || hf_private_req_in->stream_action == ACTION_DROP_DATA); hf_private_req_in->stream_action = ACTION_DROP_DATA; } @@ -201,12 +274,12 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea { /* HTTP Request and Session */ hf_private_req_in = hf_private_create(TFE_HTTP_REQUEST, 1, 0); - hs_private = hs_private_create(hc_private, hf_private_req_in, NULL); + hs_private = hs_private_create(hc_private, thread_id, hf_private_req_in, NULL); hf_private_set_session(hf_private_req_in, hs_private); /* Closure, catch stream, session and thread_id */ struct user_event_dispatch_closure * __closure = ALLOC( - struct user_event_dispatch_closure, 1); + struct user_event_dispatch_closure, 1); __closure->thread_id = thread_id; __closure->stream = stream; __closure->session = to_hs_public(hs_private); @@ -241,7 +314,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea goto __errout; } - if(hf_private_req_in->is_user_stream_action_set) + if (hf_private_req_in->is_user_stream_action_set) { hf_private_req_in->stream_action = hf_private_req_in->user_stream_action; } @@ -249,7 +322,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea { hf_private_req_in->stream_action = ACTION_FORWARD_DATA; } - + /* Ignore parse the content which is nullptr. */ goto __boundary; } @@ -319,7 +392,9 @@ __out: { http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); - hs_private_destory(hs_private); + + hs_private->ht_frame = NULL; + hs_private_destroy(hs_private); } return __action; @@ -362,7 +437,7 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre /* Closure, catch stream, session and thread_id */ struct user_event_dispatch_closure * __closure = ALLOC( - struct user_event_dispatch_closure, 1); + struct user_event_dispatch_closure, 1); __closure->thread_id = thread_id; __closure->stream = stream; __closure->session = to_hs_public(hs_private); @@ -439,12 +514,14 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next); } - /* Nothing to do, everything is over, destroy the session */ + /* Nothing to do, everything is over, destroy the session */ else { http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); - hs_private_destory(hs_private); + + hs_private->ht_frame = NULL; + hs_private_destroy(hs_private); } } @@ -516,21 +593,45 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int enum tfe_stream_close_reason reason, void ** pme) { struct http_connection_private * ht_conn = (struct http_connection_private *) (*pme); - *pme = NULL; + struct http_plugin * plugin_ctx = g_http_plugin; - /* Delete all live sessions */ - while (true) - { - struct http_session_private * hs_private_iter = TAILQ_FIRST(&ht_conn->hs_private_list); - if (hs_private_iter == NULL) break; + struct http_session_private * hs_private_iter = NULL; + struct http_session_private * hs_private_titer = NULL; + TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_list, next, hs_private_titer) + { TAILQ_REMOVE(&ht_conn->hs_private_list, hs_private_iter, next); - hs_private_destory(hs_private_iter); + + /* Call the http frame to raise END event */ + if (hs_private_iter->ht_frame) + { + struct tfe_http_session * hs_public = to_hs_public(hs_private_iter); + http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id); + hs_private_iter->ht_frame = NULL; + } + + hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]); + } + + TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_orphan_list, next, hs_private_titer) + { + TAILQ_REMOVE(&ht_conn->hs_private_orphan_list, hs_private_iter, next); + + if (hs_private_iter->ht_frame) + { + struct tfe_http_session * hs_public = to_hs_public(hs_private_iter); + http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id); + hs_private_iter->ht_frame = NULL; + } + + hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]); } /* Clear session counter, and free ht_conn structure */ ht_conn->session_id_counter = 0; free(ht_conn); + + *pme = NULL; } static struct tfe_plugin __http_plugin_info = @@ -544,5 +645,4 @@ static struct tfe_plugin __http_plugin_info = .on_close = http_connection_entry_close }; -TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info -) +TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info) diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index b41b026..a916113 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -588,6 +588,8 @@ int hf_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, in int hf_ops_body_begin(struct tfe_http_half * half, int by_stream) { struct http_half_private * hf_private = to_hf_private(half); + struct http_session_private * hs_private = hf_private->session; + assert(hf_private->evbuf_body == NULL); if (by_stream) @@ -602,6 +604,9 @@ int hf_ops_body_begin(struct tfe_http_half * half, int by_stream) hf_private->content_encoding = HTTP_ACCEPT_ENCODING_NONE; hf_private->is_setup_by_stream = true; + + hs_private->stream_write_tag_effective = true; + hs_private->release_lock++; } hf_private->evbuf_body = evbuffer_new(); @@ -649,6 +654,8 @@ __out: int hf_ops_body_end(struct tfe_http_half * half) { struct http_half_private * hf_private = to_hf_private(half); + struct http_session_private * hs_private = hf_private->session; + if (hf_private->write_ctx) { tfe_stream_write_frag_end(hf_private->write_ctx); @@ -658,7 +665,13 @@ int hf_ops_body_end(struct tfe_http_half * half) hf_private->body_status = STATUS_COMPLETE; hf_private->message_status = STATUS_COMPLETE; -// printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private); + if(hf_private->is_setup_by_stream) + { + hs_private->stream_write_tag_effective = true; + hs_private->release_lock++; + } + + printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private); return 0; } @@ -800,6 +813,7 @@ void hs_ops_suspend(struct tfe_http_session * session) { struct http_session_private * hs_private = to_hs_private(session); hs_private->suspend_tag_user = true; + hs_private->release_lock++; } void hs_ops_resume(struct tfe_http_session * session) @@ -808,6 +822,7 @@ void hs_ops_resume(struct tfe_http_session * session) struct http_connection_private * hc_private = hs_private->hc_private; hs_private->suspend_tag_user = false; + hs_private->release_lock--; tfe_stream_resume(hc_private->stream); } @@ -1024,10 +1039,11 @@ void hf_private_construct(struct http_half_private * hf_private) return; } -struct http_session_private * hs_private_create(struct http_connection_private * hc_private, - struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp) +struct http_session_private * hs_private_create(struct http_connection_private * hc_private, unsigned int thread_id, + struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp) { struct http_session_private * __hs_private = ALLOC(struct http_session_private, 1); + __hs_private->thread_id = thread_id; /* HS-PUBLIC */ __hs_private->hs_public.ops = &__http_session_ops; @@ -1085,12 +1101,43 @@ void __write_access_log(struct http_session_private * hs_private) free(__access_log); } -void hs_private_destory(struct http_session_private * hs_private) +void hs_private_destroy(struct http_session_private * hs_private) { + struct http_half_private * hf_req = to_hf_request_private(hs_private); + struct http_half_private * hf_resp = to_hf_response_private(hs_private); + __write_access_log(hs_private); + + if (hf_req != NULL) + { + hf_private_destory(hf_req); + } + + if (hf_resp != NULL) + { + hf_private_destory(hf_resp); + } + + if (hs_private->hf_private_req_user) + { + hf_private_destory(hs_private->hf_private_req_user); + } + + if (hs_private->hf_private_resp_user) + { + hf_private_destory(hs_private->hf_private_resp_user); + } + + assert(hs_private->ht_frame == NULL); free(hs_private); } +void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list) +{ + if (hs_private->release_lock > 0) TAILQ_INSERT_TAIL(gc_list, hs_private, next); + else return hs_private_destroy(hs_private); +} + void hs_private_hf_private_set(struct http_session_private * hs_private, struct http_half_private * hf, enum tfe_http_direction direction) { |
