diff options
| author | Lu Qiuwen <[email protected]> | 2018-10-26 20:30:06 +0800 |
|---|---|---|
| committer | Lu Qiuwen <[email protected]> | 2018-10-26 20:30:06 +0800 |
| commit | cf64f01f7f4d4e7dd3f308f8a67e338d57953303 (patch) | |
| tree | c7558f895ab489d1516afa0f7b831df5040c92cc /plugin | |
| parent | d3d34355ef9f261f91c017bc8af24c7611272003 (diff) | |
修正suspend/resume语义实现的若干问题,增加自行构建request/response的header标志
Diffstat (limited to 'plugin')
| -rw-r--r-- | plugin/business/pangu-http/src/pangu_http.cpp | 6 | ||||
| -rw-r--r-- | plugin/business/pangu-http/src/pangu_web_cache.cpp | 9 | ||||
| -rw-r--r-- | plugin/protocol/http/include/internal/http_common.h | 10 | ||||
| -rw-r--r-- | plugin/protocol/http/src/http_entry.cpp | 20 | ||||
| -rw-r--r-- | plugin/protocol/http/src/http_half.cpp | 55 | ||||
| -rw-r--r-- | plugin/protocol/http/test/test_http_half.cpp | 12 |
6 files changed, 81 insertions, 31 deletions
diff --git a/plugin/business/pangu-http/src/pangu_http.cpp b/plugin/business/pangu-http/src/pangu_http.cpp index cc3771b..9499cd5 100644 --- a/plugin/business/pangu-http/src/pangu_http.cpp +++ b/plugin/business/pangu-http/src/pangu_http.cpp @@ -907,6 +907,7 @@ int make_revalidate_request(const struct tfe_stream * stream, const struct tfe_h if(events & EV_HTTP_REQ_END && ctx->cache_revalidate_req) { ctx->cache_revalidate_req=NULL; + return RESUMED_CB_NO_MORE_CALLS; } return RESUMED_CB_MORE_CALLS; } @@ -1001,12 +1002,12 @@ static void cache_pending_on_succ(future_result_t * result, void * user) ctx->f_cache_pending=NULL; if(meta==NULL) { - ctx->pending_result==PENDING_RESULT_MISS; + ctx->pending_result = PENDING_RESULT_MISS; return; } if(!(meta->etag && meta->last_modified)) { - ctx->pending_result==PENDING_RESULT_FOBIDDEN; + ctx->pending_result = PENDING_RESULT_FOBIDDEN; return; } ctx->pending_result=PENDING_RESULT_REVALIDATE; @@ -1058,6 +1059,7 @@ void cache_pending(const struct tfe_http_session * session, unsigned int thread_ { case PENDING_RESULT_REVALIDATE: ctx->ref_session=tfe_http_session_allow_write(session); + assert(ctx->ref_session != NULL); tfe_http_session_suspend(ctx->ref_session); break; case PENDING_RESULT_ALLOWED: diff --git a/plugin/business/pangu-http/src/pangu_web_cache.cpp b/plugin/business/pangu-http/src/pangu_web_cache.cpp index a20b2d7..7354145 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/src/pangu_web_cache.cpp @@ -432,7 +432,7 @@ static void cache_query_meta_on_succ(future_result_t * result, void * user) struct cache_pending_context* ctx=(struct cache_pending_context*)promise_get_ctx(p); struct tango_cache_result* _result=tango_cache_read_result(result); ctx->ref_tango_cache_result=_result; - time_t cache_last_modified_time=0, request_last_modified_time=0; + switch(_result->type) { case RESULT_TYPE_HEADER: @@ -690,9 +690,14 @@ void web_cache_update(struct cache_update_context* ctx, const unsigned char * bo } void web_cache_update_end(struct cache_update_context* ctx) { + fprintf(stderr, "------- web_cache_update_end , %p\n", ctx); + tango_cache_update_end(ctx->write_ctx); + ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING])); + + ctx->write_ctx = NULL; + ctx->ref_cache_handle = NULL; free(ctx); - ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING])); return; } diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index 56fa65f..e220839 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -41,18 +41,20 @@ struct http_session_private struct http_half_private * hf_private_req_user; /* USER SETUP RESPONSE HALF */ struct http_half_private * hf_private_resp_user; - /* SUSPEND TAG */ - bool suspend_tag_user; /* SUSPEND EVENT */ tfe_http_event suspend_event; /* SUSPEND TAG EFFECTIVE */ bool suspend_tag_effective; - /* STREAM WRITE EFFECTIVE */ - bool stream_write_tag_effective; /* RELEASE LOCK, when the tag is zero, the session can be destroyed */ int release_lock; /* thread id */ unsigned int thread_id; + + /* SUSPEND COUNTER, ONLY FOR DEBUG AND LOG */ +#ifndef NDEBUG + int suspend_counter; +#endif + bool in_gc_queue; }; struct http_connection_private diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index 8f42893..e496607 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -309,13 +309,19 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea /* The session is suspended, and to resume */ if (hs_private->suspend_tag_effective) { - hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0, - hf_private_req_in->event_cb_user); + enum tfe_http_event __backup_event = hs_private->suspend_event; + /* Clean up suspend tag, we can support user's call suspend in this callback */ hs_private->suspend_event = (enum tfe_http_event) 0; hs_private->suspend_tag_effective = false; - hs_private->suspend_tag_user = false; + hs_private->suspend_counter++; + + /* Call user callback, tell user we resume from suspend */ + hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0, + hf_private_req_in->event_cb_user); + +#if 0 if (__on_request_handle_user_req_or_resp(stream, hs_private, hf_private_req_in, __need_to_close_the_session) < 0) { @@ -333,6 +339,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea /* Ignore parse the content which is nullptr. */ goto __boundary; +#endif } /* Parse the content, the data which in defered state has been ignored. */ @@ -351,13 +358,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea } /* Suspend */ - if (hs_private->suspend_tag_user) + if (hs_private->suspend_tag_effective) { - assert(!hs_private->suspend_tag_effective); - hs_private->suspend_tag_effective = true; - - /* Suspend TCP stream */ - tfe_stream_suspend(stream, CONN_DIR_DOWNSTREAM); return ACTION_DEFER_DATA; } diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index a85374b..7f04128 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -348,7 +348,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) /* user's suspend tag is set, which indicate that the way to handle request/response * cannot be determinate at now, need to defer */ - else if (hs_private && hs_private->suspend_tag_user) + else if (hs_private && hs_private->suspend_tag_effective) { /* Pause parser, prevent to parse request/response body, * The body should be parsed after resume() */ @@ -362,7 +362,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) assert(hf_private->stream_action == ACTION_DEFER_DATA); } - /* Otherwise, forward the request/response */ + /* Otherwise, forward the request/response */ else { hf_private->stream_action = ACTION_FORWARD_DATA; @@ -615,9 +615,8 @@ int hf_ops_body_begin(struct tfe_http_half * half, int by_stream) } hf_private->content_encoding = HTTP_ACCEPT_ENCODING_NONE; + hf_private->message_status = STATUS_READING; hf_private->is_setup_by_stream = true; - - hs_private->stream_write_tag_effective = true; hs_private->release_lock++; } @@ -679,8 +678,7 @@ int hf_ops_body_end(struct tfe_http_half * half) if(hf_private->is_setup_by_stream) { - hs_private->stream_write_tag_effective = true; - hs_private->release_lock++; + hs_private->release_lock--; } printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private); @@ -802,7 +800,7 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char * resume it to normal status */ if (__is_paused) { - hf_private->parse_cursor += sz_parsed + 1; + hf_private->parse_cursor += sz_parsed; return 1; } @@ -829,8 +827,15 @@ void hs_ops_drop(struct tfe_http_session * session) void hs_ops_suspend(struct tfe_http_session * session) { struct http_session_private * hs_private = to_hs_private(session); - hs_private->suspend_tag_user = true; + struct http_connection_private * hc_private = hs_private->hc_private; + hs_private->release_lock++; + hs_private->suspend_counter++; + hs_private->suspend_tag_effective = true; + + tfe_stream_suspend(hc_private->stream, CONN_DIR_DOWNSTREAM); + fprintf(stderr, "---- suspend ----, url = %s, counter = %d\n", + hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter); } void hs_ops_resume(struct tfe_http_session * session) @@ -838,9 +843,16 @@ void hs_ops_resume(struct tfe_http_session * session) struct http_session_private * hs_private = to_hs_private(session); struct http_connection_private * hc_private = hs_private->hc_private; - hs_private->suspend_tag_user = false; + assert(!hs_private->in_gc_queue); + hs_private->release_lock--; - tfe_stream_resume(hc_private->stream); + if (hs_private->suspend_tag_effective) + { + tfe_stream_resume(hc_private->stream); + } + + fprintf(stderr, "---- resume ----, url = %s, counter = %d\n", + hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter); } // TODO: change the return type to int, there is something happend where -1 returned. @@ -865,6 +877,8 @@ void hs_ops_request_set(struct tfe_http_session * session, struct tfe_http_half assert(hs_private->hf_private_req_user == NULL); hs_private->hf_private_req_user = hf_user_private; + hs_private->hf_private_req_user->message_status = STATUS_COMPLETE; + hs_private->hf_private_req_user->session = hs_private; } void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp) @@ -897,7 +911,8 @@ void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half assert(hs_private->hf_private_resp_user == NULL); hs_private->hf_private_resp_user = hf_private; - hf_private->session = hs_private; + hs_private->hf_private_resp_user->message_status = STATUS_COMPLETE; + hs_private->hf_private_resp_user->session = hs_private; } struct tfe_http_half * hs_ops_request_create(struct tfe_http_session * session, @@ -1042,6 +1057,8 @@ void hf_private_construct(struct http_half_private * hf_private) evbuffer_add_printf(hf_private->evbuf_raw, "%s: %s\r\n", str_field, str_value); } + /* Trace Tags */ + evbuffer_add_printf(hf_private->evbuf_raw, "%s: tfe/%s\r\n", "X-TG-Construct-By", tfe_version()); /* delimitor between header and body */ evbuffer_add_printf(hf_private->evbuf_raw, "\r\n"); @@ -1154,8 +1171,20 @@ void hs_private_destroy(struct http_session_private * hs_private) void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list) { - if (hs_private->release_lock > 0) TAILQ_INSERT_TAIL(gc_list, hs_private, next); - else return hs_private_destroy(hs_private); + if (hs_private->release_lock > 0) + { + TAILQ_INSERT_TAIL(gc_list, hs_private, next); + hs_private->in_gc_queue = true; + } + else + { + return hs_private_destroy(hs_private); + } +} + +bool hs_private_can_destroy(struct http_session_private * hs_private) +{ + return hs_private->release_lock <= 0; } void hs_private_hf_private_set(struct http_session_private * hs_private, diff --git a/plugin/protocol/http/test/test_http_half.cpp b/plugin/protocol/http/test/test_http_half.cpp index f453343..5d43c09 100644 --- a/plugin/protocol/http/test/test_http_half.cpp +++ b/plugin/protocol/http/test/test_http_half.cpp @@ -348,7 +348,7 @@ void __http_post_header_verify_helper(struct http_half_private * hf_private) } struct __post_http_req_ctx -{ + { unsigned int calling_seq{0}; size_t total_length{0}; @@ -1488,6 +1488,11 @@ void tfe_stream_resume(const struct tfe_stream * stream) return; } +void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by) +{ + return; +} + int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) { return 0; @@ -1498,6 +1503,11 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) return; } +const char * tfe_version() +{ + return NULL; +} + int main(int argc, char ** argv) { ::testing::InitGoogleTest(&argc, argv); |
