summaryrefslogtreecommitdiff
path: root/common/src/tfe_future.cpp
diff options
context:
space:
mode:
authorzhengchao <[email protected]>2018-11-23 20:15:01 +0800
committerzhengchao <[email protected]>2018-11-23 21:17:55 +0800
commit5d20a525523fdc4491cf22dcbfeff74a89ae50c3 (patch)
tree2dc10deba6ea3f878309597de97f151b9dcc5b12 /common/src/tfe_future.cpp
parent6cd2b8186b8f71b0386db662926727b514228547 (diff)
通过增加promise_finish函数,实现future的cancel。
Diffstat (limited to 'common/src/tfe_future.cpp')
-rw-r--r--common/src/tfe_future.cpp57
1 files changed, 49 insertions, 8 deletions
diff --git a/common/src/tfe_future.cpp b/common/src/tfe_future.cpp
index d0c51f3..19f20f7 100644
--- a/common/src/tfe_future.cpp
+++ b/common/src/tfe_future.cpp
@@ -45,7 +45,9 @@ struct promise
{
struct future f;
void * ctx;
- int has_timeout;
+ char has_timeout;
+ char ref_cnt;
+ char may_success_many_times;
promise_ctx_destroy_cb * cb_ctx_destroy;
struct _future_promise_debug debug;
};
@@ -117,11 +119,34 @@ void future_promise_library_init(const char* profile)
g_is_FP_init=1;
return;
}
+static struct promise * __future_to_promise(struct future * f)
+{
+ return (struct promise *) f;
+}
+static void __promise_destroy(struct promise *p)
+{
+ if (p->cb_ctx_destroy != NULL)
+ {
+ p->cb_ctx_destroy(p->ctx);
+ }
+ if(!g_FP_instance.no_stats) FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_SUB, 1);
+ memset(p, 0, sizeof(struct promise));
+ free(p);
+ return;
+}
struct promise * future_to_promise(struct future * f)
{
+ struct promise *p=__future_to_promise(f);
+ p->ref_cnt++;
+ assert(p->ref_cnt==2);
return (struct promise *) f;
}
+void promise_allow_many_successes(struct promise *p)
+{
+ p->may_success_many_times=1;
+ return;
+}
struct field_get_set_args
{
MESA_htable_handle htable;
@@ -165,6 +190,7 @@ struct future * future_create(const char* symbol, future_success_cb * cb_success
p->f.user = user;
p->f.cb_success = cb_success;
p->f.cb_failed = cb_failed;
+ p->ref_cnt=1;
strncpy(p->f.symbol,symbol,sizeof(p->f.symbol));
if(!g_FP_instance.no_stats)
{
@@ -185,16 +211,23 @@ void future_set_timeout(struct future * f, struct timeval timeout)
p->has_timeout=1;
return;
}
+
void future_destroy(struct future * f)
{
- struct promise * p = future_to_promise(f);
- if (p->cb_ctx_destroy != NULL)
+ struct promise * p = __future_to_promise(f);
+ p->ref_cnt--;
+ if(p->ref_cnt==0)
{
- p->cb_ctx_destroy(p->ctx);
- }
- if(!g_FP_instance.no_stats) FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_SUB, 1);
- memset(p, 0, sizeof(struct promise));
- free(p);
+ __promise_destroy(p);
+ }
+}
+void promise_finish(struct promise * p)
+{
+ p->ref_cnt--;
+ if(p->ref_cnt==0)
+ {
+ __promise_destroy(p);
+ }
}
static void fp_stat_latency(struct _future_promise_debug* debug, int is_success)
{
@@ -220,6 +253,10 @@ void promise_failed(struct promise * p, enum e_future_error error, const char *
{
if(!g_FP_instance.no_stats) fp_stat_latency(&p->debug, 0);
p->f.cb_failed(error, what, p->f.user);
+ if(!p->may_success_many_times)
+ {
+ promise_finish(p);
+ }
return;
}
@@ -227,6 +264,10 @@ void promise_success(struct promise * p, void * result)
{
if(!g_FP_instance.no_stats) fp_stat_latency(&p->debug, 1);
p->f.cb_success(result, p->f.user);
+ if(!p->may_success_many_times)
+ {
+ promise_finish(p);
+ }
return;
}