summaryrefslogtreecommitdiff
path: root/plugin
diff options
context:
space:
mode:
authorLu Qiuwen <[email protected]>2018-10-26 20:30:06 +0800
committerLu Qiuwen <[email protected]>2018-10-26 20:30:06 +0800
commitcf64f01f7f4d4e7dd3f308f8a67e338d57953303 (patch)
treec7558f895ab489d1516afa0f7b831df5040c92cc /plugin
parentd3d34355ef9f261f91c017bc8af24c7611272003 (diff)
修正suspend/resume语义实现的若干问题,增加自行构建request/response的header标志
Diffstat (limited to 'plugin')
-rw-r--r--plugin/business/pangu-http/src/pangu_http.cpp6
-rw-r--r--plugin/business/pangu-http/src/pangu_web_cache.cpp9
-rw-r--r--plugin/protocol/http/include/internal/http_common.h10
-rw-r--r--plugin/protocol/http/src/http_entry.cpp20
-rw-r--r--plugin/protocol/http/src/http_half.cpp55
-rw-r--r--plugin/protocol/http/test/test_http_half.cpp12
6 files changed, 81 insertions, 31 deletions
diff --git a/plugin/business/pangu-http/src/pangu_http.cpp b/plugin/business/pangu-http/src/pangu_http.cpp
index cc3771b..9499cd5 100644
--- a/plugin/business/pangu-http/src/pangu_http.cpp
+++ b/plugin/business/pangu-http/src/pangu_http.cpp
@@ -907,6 +907,7 @@ int make_revalidate_request(const struct tfe_stream * stream, const struct tfe_h
if(events & EV_HTTP_REQ_END && ctx->cache_revalidate_req)
{
ctx->cache_revalidate_req=NULL;
+ return RESUMED_CB_NO_MORE_CALLS;
}
return RESUMED_CB_MORE_CALLS;
}
@@ -1001,12 +1002,12 @@ static void cache_pending_on_succ(future_result_t * result, void * user)
ctx->f_cache_pending=NULL;
if(meta==NULL)
{
- ctx->pending_result==PENDING_RESULT_MISS;
+ ctx->pending_result = PENDING_RESULT_MISS;
return;
}
if(!(meta->etag && meta->last_modified))
{
- ctx->pending_result==PENDING_RESULT_FOBIDDEN;
+ ctx->pending_result = PENDING_RESULT_FOBIDDEN;
return;
}
ctx->pending_result=PENDING_RESULT_REVALIDATE;
@@ -1058,6 +1059,7 @@ void cache_pending(const struct tfe_http_session * session, unsigned int thread_
{
case PENDING_RESULT_REVALIDATE:
ctx->ref_session=tfe_http_session_allow_write(session);
+ assert(ctx->ref_session != NULL);
tfe_http_session_suspend(ctx->ref_session);
break;
case PENDING_RESULT_ALLOWED:
diff --git a/plugin/business/pangu-http/src/pangu_web_cache.cpp b/plugin/business/pangu-http/src/pangu_web_cache.cpp
index a20b2d7..7354145 100644
--- a/plugin/business/pangu-http/src/pangu_web_cache.cpp
+++ b/plugin/business/pangu-http/src/pangu_web_cache.cpp
@@ -432,7 +432,7 @@ static void cache_query_meta_on_succ(future_result_t * result, void * user)
struct cache_pending_context* ctx=(struct cache_pending_context*)promise_get_ctx(p);
struct tango_cache_result* _result=tango_cache_read_result(result);
ctx->ref_tango_cache_result=_result;
- time_t cache_last_modified_time=0, request_last_modified_time=0;
+
switch(_result->type)
{
case RESULT_TYPE_HEADER:
@@ -690,9 +690,14 @@ void web_cache_update(struct cache_update_context* ctx, const unsigned char * bo
}
void web_cache_update_end(struct cache_update_context* ctx)
{
+ fprintf(stderr, "------- web_cache_update_end , %p\n", ctx);
+
tango_cache_update_end(ctx->write_ctx);
+ ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING]));
+
+ ctx->write_ctx = NULL;
+ ctx->ref_cache_handle = NULL;
free(ctx);
- ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING]));
return;
}
diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h
index 56fa65f..e220839 100644
--- a/plugin/protocol/http/include/internal/http_common.h
+++ b/plugin/protocol/http/include/internal/http_common.h
@@ -41,18 +41,20 @@ struct http_session_private
struct http_half_private * hf_private_req_user;
/* USER SETUP RESPONSE HALF */
struct http_half_private * hf_private_resp_user;
- /* SUSPEND TAG */
- bool suspend_tag_user;
/* SUSPEND EVENT */
tfe_http_event suspend_event;
/* SUSPEND TAG EFFECTIVE */
bool suspend_tag_effective;
- /* STREAM WRITE EFFECTIVE */
- bool stream_write_tag_effective;
/* RELEASE LOCK, when the tag is zero, the session can be destroyed */
int release_lock;
/* thread id */
unsigned int thread_id;
+
+ /* SUSPEND COUNTER, ONLY FOR DEBUG AND LOG */
+#ifndef NDEBUG
+ int suspend_counter;
+#endif
+ bool in_gc_queue;
};
struct http_connection_private
diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp
index 8f42893..e496607 100644
--- a/plugin/protocol/http/src/http_entry.cpp
+++ b/plugin/protocol/http/src/http_entry.cpp
@@ -309,13 +309,19 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
/* The session is suspended, and to resume */
if (hs_private->suspend_tag_effective)
{
- hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0,
- hf_private_req_in->event_cb_user);
+ enum tfe_http_event __backup_event = hs_private->suspend_event;
+ /* Clean up suspend tag, we can support user's call suspend in this callback */
hs_private->suspend_event = (enum tfe_http_event) 0;
hs_private->suspend_tag_effective = false;
- hs_private->suspend_tag_user = false;
+ hs_private->suspend_counter++;
+
+ /* Call user callback, tell user we resume from suspend */
+ hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0,
+ hf_private_req_in->event_cb_user);
+
+#if 0
if (__on_request_handle_user_req_or_resp(stream, hs_private,
hf_private_req_in, __need_to_close_the_session) < 0)
{
@@ -333,6 +339,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
/* Ignore parse the content which is nullptr. */
goto __boundary;
+#endif
}
/* Parse the content, the data which in defered state has been ignored. */
@@ -351,13 +358,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
}
/* Suspend */
- if (hs_private->suspend_tag_user)
+ if (hs_private->suspend_tag_effective)
{
- assert(!hs_private->suspend_tag_effective);
- hs_private->suspend_tag_effective = true;
-
- /* Suspend TCP stream */
- tfe_stream_suspend(stream, CONN_DIR_DOWNSTREAM);
return ACTION_DEFER_DATA;
}
diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp
index a85374b..7f04128 100644
--- a/plugin/protocol/http/src/http_half.cpp
+++ b/plugin/protocol/http/src/http_half.cpp
@@ -348,7 +348,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
/* user's suspend tag is set, which indicate that the way to handle request/response
* cannot be determinate at now, need to defer */
- else if (hs_private && hs_private->suspend_tag_user)
+ else if (hs_private && hs_private->suspend_tag_effective)
{
/* Pause parser, prevent to parse request/response body,
* The body should be parsed after resume() */
@@ -362,7 +362,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
assert(hf_private->stream_action == ACTION_DEFER_DATA);
}
- /* Otherwise, forward the request/response */
+ /* Otherwise, forward the request/response */
else
{
hf_private->stream_action = ACTION_FORWARD_DATA;
@@ -615,9 +615,8 @@ int hf_ops_body_begin(struct tfe_http_half * half, int by_stream)
}
hf_private->content_encoding = HTTP_ACCEPT_ENCODING_NONE;
+ hf_private->message_status = STATUS_READING;
hf_private->is_setup_by_stream = true;
-
- hs_private->stream_write_tag_effective = true;
hs_private->release_lock++;
}
@@ -679,8 +678,7 @@ int hf_ops_body_end(struct tfe_http_half * half)
if(hf_private->is_setup_by_stream)
{
- hs_private->stream_write_tag_effective = true;
- hs_private->release_lock++;
+ hs_private->release_lock--;
}
printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private);
@@ -802,7 +800,7 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char
* resume it to normal status */
if (__is_paused)
{
- hf_private->parse_cursor += sz_parsed + 1;
+ hf_private->parse_cursor += sz_parsed;
return 1;
}
@@ -829,8 +827,15 @@ void hs_ops_drop(struct tfe_http_session * session)
void hs_ops_suspend(struct tfe_http_session * session)
{
struct http_session_private * hs_private = to_hs_private(session);
- hs_private->suspend_tag_user = true;
+ struct http_connection_private * hc_private = hs_private->hc_private;
+
hs_private->release_lock++;
+ hs_private->suspend_counter++;
+ hs_private->suspend_tag_effective = true;
+
+ tfe_stream_suspend(hc_private->stream, CONN_DIR_DOWNSTREAM);
+ fprintf(stderr, "---- suspend ----, url = %s, counter = %d\n",
+ hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter);
}
void hs_ops_resume(struct tfe_http_session * session)
@@ -838,9 +843,16 @@ void hs_ops_resume(struct tfe_http_session * session)
struct http_session_private * hs_private = to_hs_private(session);
struct http_connection_private * hc_private = hs_private->hc_private;
- hs_private->suspend_tag_user = false;
+ assert(!hs_private->in_gc_queue);
+
hs_private->release_lock--;
- tfe_stream_resume(hc_private->stream);
+ if (hs_private->suspend_tag_effective)
+ {
+ tfe_stream_resume(hc_private->stream);
+ }
+
+ fprintf(stderr, "---- resume ----, url = %s, counter = %d\n",
+ hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter);
}
// TODO: change the return type to int, there is something happend where -1 returned.
@@ -865,6 +877,8 @@ void hs_ops_request_set(struct tfe_http_session * session, struct tfe_http_half
assert(hs_private->hf_private_req_user == NULL);
hs_private->hf_private_req_user = hf_user_private;
+ hs_private->hf_private_req_user->message_status = STATUS_COMPLETE;
+ hs_private->hf_private_req_user->session = hs_private;
}
void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp)
@@ -897,7 +911,8 @@ void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half
assert(hs_private->hf_private_resp_user == NULL);
hs_private->hf_private_resp_user = hf_private;
- hf_private->session = hs_private;
+ hs_private->hf_private_resp_user->message_status = STATUS_COMPLETE;
+ hs_private->hf_private_resp_user->session = hs_private;
}
struct tfe_http_half * hs_ops_request_create(struct tfe_http_session * session,
@@ -1042,6 +1057,8 @@ void hf_private_construct(struct http_half_private * hf_private)
evbuffer_add_printf(hf_private->evbuf_raw, "%s: %s\r\n", str_field, str_value);
}
+ /* Trace Tags */
+ evbuffer_add_printf(hf_private->evbuf_raw, "%s: tfe/%s\r\n", "X-TG-Construct-By", tfe_version());
/* delimitor between header and body */
evbuffer_add_printf(hf_private->evbuf_raw, "\r\n");
@@ -1154,8 +1171,20 @@ void hs_private_destroy(struct http_session_private * hs_private)
void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list)
{
- if (hs_private->release_lock > 0) TAILQ_INSERT_TAIL(gc_list, hs_private, next);
- else return hs_private_destroy(hs_private);
+ if (hs_private->release_lock > 0)
+ {
+ TAILQ_INSERT_TAIL(gc_list, hs_private, next);
+ hs_private->in_gc_queue = true;
+ }
+ else
+ {
+ return hs_private_destroy(hs_private);
+ }
+}
+
+bool hs_private_can_destroy(struct http_session_private * hs_private)
+{
+ return hs_private->release_lock <= 0;
}
void hs_private_hf_private_set(struct http_session_private * hs_private,
diff --git a/plugin/protocol/http/test/test_http_half.cpp b/plugin/protocol/http/test/test_http_half.cpp
index f453343..5d43c09 100644
--- a/plugin/protocol/http/test/test_http_half.cpp
+++ b/plugin/protocol/http/test/test_http_half.cpp
@@ -348,7 +348,7 @@ void __http_post_header_verify_helper(struct http_half_private * hf_private)
}
struct __post_http_req_ctx
-{
+ {
unsigned int calling_seq{0};
size_t total_length{0};
@@ -1488,6 +1488,11 @@ void tfe_stream_resume(const struct tfe_stream * stream)
return;
}
+void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by)
+{
+ return;
+}
+
int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size)
{
return 0;
@@ -1498,6 +1503,11 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
return;
}
+const char * tfe_version()
+{
+ return NULL;
+}
+
int main(int argc, char ** argv)
{
::testing::InitGoogleTest(&argc, argv);