From f439b8a9ab926d87f3a3ae03d5061d3962a35ca5 Mon Sep 17 00:00:00 2001 From: pengxuanzheng Date: Wed, 23 Sep 2020 19:06:09 +0800 Subject: 线程池支持 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/hos_hash.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'src/hos_hash.cpp') diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp index 5bbc7b5b..592f041b 100644 --- a/src/hos_hash.cpp +++ b/src/hos_hash.cpp @@ -17,6 +17,8 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input) } else { + value->mode = input->mode; + value->handle = input->handle; value->bucket = input->bucket; value->object = input->object; value->callback = input->callback; @@ -31,22 +33,22 @@ hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd) return value; } -void delete_info_by_fd(hos_info_t *handle, size_t fd) +void delete_info_by_fd(hos_info_t **handle, size_t fd) { hos_info_t *value = NULL; - HASH_FIND_INT(handle, &fd, value); + HASH_FIND_INT(*handle, &fd, value); if (value) { - HASH_DEL(handle, value); + HASH_DEL(*handle, value); free(value); } } -void delete_all(hos_info_t *handle) +void delete_all(hos_info_t **handle) { hos_info_t *current, *tmp; - HASH_ITER(hh, handle, current, tmp) + HASH_ITER(hh, *handle, current, tmp) { - HASH_DEL(handle, current); + HASH_DEL(*handle, current); } } -- cgit v1.2.3 From 44261c1bb8aacaf657eec2cbc3e3039be1c5cc73 Mon Sep 17 00:00:00 2001 From: pengxuanzheng Date: Mon, 19 Oct 2020 15:35:16 +0800 Subject: 修改API,提供set options的方式设置参数 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/hos_client.cpp | 140 +++++++++++++++++++++++++++++++++++++---------------- src/hos_client.h | 22 ++++++++- src/hos_hash.cpp | 4 ++ src/hos_hash.h | 9 +++- 4 files changed, 131 insertions(+), 44 deletions(-) (limited to 'src/hos_hash.cpp') diff --git a/src/hos_client.cpp b/src/hos_client.cpp index b4e91867..805450b3 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -20,24 +20,27 @@ extern "C" #include "hos_client.h" #include "hos_hash.h" +#define MAX_HOS_CLIENT_THREAD_NUM 255 +#define MAX_HOS_CLIENT_FD_NUM 65535 + typedef struct hos_client_handle_s { Aws::S3::S3Client *S3Client; - size_t append_size; - size_t thread_sum; Aws::SDKOptions options; Aws::Vector buckets; + /* options */ + size_t cache_size; + size_t cache_times; + size_t thread_sum; }hos_client_handle_t; -#define MAX_THREAD_NUM 255 -#define MAX_FD_NUM 65535 -hos_info_t *hash_hos_info[MAX_THREAD_NUM]; -size_t fd_info[MAX_THREAD_NUM][MAX_FD_NUM]; +hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM]; +size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM]; static size_t hash_get_min_free_fd(size_t thread_id) { size_t i = 0; - for (i = 1; i < MAX_FD_NUM; i++) + for (i = 1; i < MAX_HOS_CLIENT_FD_NUM; i++) { if (!fd_info[thread_id][i]) return i; @@ -80,12 +83,31 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, } } -hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum, size_t pool_size) +void set_cache_size(hos_client_handle client, size_t cache_size) +{ + client->cache_size = cache_size; + return ; +} + +void set_cache_times(hos_client_handle client, size_t cache_times) +{ + client->cache_times = cache_times; + return ; +} + +void set_thread_sum(hos_client_handle client, size_t thread_sum) +{ + client->thread_sum = thread_sum; + return ; +} + +hos_client_handle hos_client_create(const char *endpoint, const 
char *accesskeyid, const char *secretkey, size_t pool_size) { - if (!endpoint || !accesskeyid || !secretkey || thread_sum > MAX_THREAD_NUM) + if (!endpoint || !accesskeyid || !secretkey) { return NULL; } + Aws::SDKOptions options; //options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Debug; Aws::InitAPI(options); @@ -102,8 +124,6 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi config.executor = std::shared_ptr(std::make_shared(pool_size));//支持线程池 handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - handle->append_size = 30 * 1024 * 1024; - handle->thread_sum = thread_sum; handle->options = options; /* 获取当前用户的所有的buckets */ Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); @@ -113,6 +133,10 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi handle->buckets = outcome.GetResult().GetBuckets(); } + handle->cache_size = 0; + handle->cache_times = 1; + handle->thread_sum = 1; + return handle; } @@ -166,8 +190,6 @@ int hos_create_bucket(hos_client_handle handle, const char *bucket) } } - //handle->buckets.push_back(); - return HOS_CLIENT_OK; } @@ -217,7 +239,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, }; + hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, NULL, 0, 0, 0 }; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; @@ -250,21 +272,23 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_FD_NOT_ENOUGH; } - hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, }; + hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, NULL, handle->cache_size, handle->cache_times, 0, }; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; return fd; } -int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position) +int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id) { struct stat buffer; hos_info_t *hos_info = NULL; hos_client_handle handle = NULL; char num[128]; char buf[128]; - if ((fd == 0) || (stream == NULL) || (thread_id > MAX_THREAD_NUM)) + int flag = 0; // 0, 一次处理就可以完成;1,需要多次处理才能处理完 + int rest; // stream 剩余未处理的数据长度 + if ((fd == 0) || (stream == NULL) || (thread_id > MAX_HOS_CLIENT_THREAD_NUM)) { return HOS_PARAMETER_ERROR; } @@ -286,34 +310,56 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id request.SetBucket(hos_info->bucket); request.SetKey(hos_info->object); - //TODO APPEND MODE - snprintf(num, 128, "%lu", position); - Aws::Map headers; - if (hos_info->mode & APPEND_MODE) - { - headers["x-hos-upload-type"] = "append"; - headers["x-hos-position"] = num; - request.SetMetadata(headers); -#if 0 - request.AddMetadata("x-hos-upload-type", "append"); - request.AddMetadata("x-hos-position", num); -#endif - } - //设置上传数据类型 if (hos_info->mode & BUFF_MODE) { //BUFF_MODE -#if 1 - const std::shared_ptr input_data = - Aws::MakeShared(stream, stream + stream_len); - Aws::String buffer (stream, stream_len); - *input_data << buffer; -#else - Aws::StringStream *buffer = new Aws::StringStream(Aws::String(stream, stream_len)); - const std::shared_ptr input_data(buffer); -#endif - 
request.SetBody(input_data); + if (hos_info->mode & APPEND_MODE) + { + //APPEND_MODE + if (hos_info->cache == NULL) + { + hos_info->cache = Aws::MakeShared("append mode"); + } + if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + { + // cache + Aws::String buffer (stream, stream_len); + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len > hos_info->cache_rest) + { + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; + }else + { + //over cache_times + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + } + request.SetBody(hos_info->cache); + + // add headers + snprintf(num, 128, "%lu", ++hos_info->position); + Aws::Map headers; + if (hos_info->mode & APPEND_MODE) + { + headers["x-hos-upload-type"] = "append"; + headers["x-hos-position"] = num; + request.SetMetadata(headers); + } + }else + { + const std::shared_ptr input_data = + Aws::MakeShared("buffer mode"); + request.SetBody(input_data); + } } else { @@ -334,6 +380,18 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id context->SetUUID(buf); S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + + //恢复fd 的cache设置 + if (hos_info->mode & APPEND_MODE) + { + hos_info->cache = NULL; + hos_info->cache_rest = hos_info->handle->cache_size; + hos_info->cache_times = hos_info->handle->cache_times; + } + while (flag == 1) + { + return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id); + } return HOS_CLIENT_OK; } diff --git a/src/hos_client.h b/src/hos_client.h index 668af9fd..adacbb90 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -82,7 +82,7 @@ typedef void (*put_finished_callback)(bool, const char *, void *); * size_t thread_sum 线程总数 * 返回值: 成功返回一个非空句柄,失败返回NULL。(失败原因都是因为输入参数不合法) *************************************************************************************/ -hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum, size_t pool_size); +hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t pool_size); /************************************************************************************* * 函数名: hos_create_bucket * 参数: hos_client_handle handle 非空句柄 @@ -97,6 +97,24 @@ bool hos_verify_bucket(hos_client_handle handle, const char *bucket); * 返回值: int 成功返回0,S3错误返回s3errors错误码,hos client错误返回hoserrors错误码 *************************************************************************************/ int hos_create_bucket(hos_client_handle handle, const char *bucket); +/************************************************************************************* + * 函数名: set_cache_size + * 参数: hos_client_handle handle 非空句柄 + * size_t cache_size append 模式每次追加的buffer大小 +*************************************************************************************/ +void set_cache_size(hos_client_handle handle, size_t cache_size); +/************************************************************************************* + * 函数名: set_cache_times + * 参数: hos_client_handle handle 非空句柄 + * size_t cache_times append 模式追加次数 +*************************************************************************************/ +void set_cache_times(hos_client_handle handle, size_t cache_times); +/************************************************************************************* + * 函数名: set_thread_sum 
+ * 参数: hos_client_handle handle 非空句柄
+ *       size_t thread_sum 线程总数
+*************************************************************************************/
+void set_thread_sum(hos_client_handle handle, size_t thread_sum);
 /*************************************************************************************
 * 函数名: hos_upload_async
 * 参数: hos_client_handle handle 非空句柄
@@ -142,7 +160,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
 *       size_t position append模式下的每段内容编号
 * 返回值 int 成功返回0,失败返回hoserros错误码
 *************************************************************************************/
-int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position);
+int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id);
 /*************************************************************************************
 * 函数名: hos_close_fd
 * 参数: size_t fd fd
diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp
index 592f041b..f6bc8838 100644
--- a/src/hos_hash.cpp
+++ b/src/hos_hash.cpp
@@ -23,6 +23,10 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input)
         value->object = input->object;
         value->callback = input->callback;
         value->userdata = input->userdata;
+        value->cache = input->cache;
+        value->cache_times = input->cache_times;
+        value->cache_rest = input->cache_rest;
+        value->position = input->position;
     }
 }
diff --git a/src/hos_hash.h b/src/hos_hash.h
index 7daf1b91..51e2a6b7 100644
--- a/src/hos_hash.h
+++ b/src/hos_hash.h
@@ -6,17 +6,24 @@
 #ifndef __HOS_HASH_H__
 #define __HOS_HASH_H__
 
+#include 
+#include "hos_client.h"
 #include "uthash.h"
 
 typedef struct hos_info_s
 {
     size_t fd;
     int mode;
-    void *handle;
+    hos_client_handle handle;
     const char *bucket;
     const char *object;
     void *callback;
     void *userdata;
+    std::shared_ptr cache;
+    //void *cache;
+    size_t cache_times;
+    size_t cache_rest;
+    size_t position;
     UT_hash_handle hh;
 }hos_info_t;
-- cgit v1.2.3

From 5df452205030b36e5fe76eec69c7cd32229a5697 Mon Sep 17 00:00:00 2001
From: pengxuanzheng 
Date: Wed, 11 Nov 2020 11:20:19 +0800
Subject: 修复堆栈空间使用不当导致的object丢失
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 src/hos_client.cpp | 184 +++++++++++++++++++++++++++++++++++++++++------------
 src/hos_hash.cpp   |  30 ++++++++-
 src/hos_hash.h     |  11 +++-
 3 files changed, 181 insertions(+), 44 deletions(-)

(limited to 'src/hos_hash.cpp')

diff --git a/src/hos_client.cpp b/src/hos_client.cpp
index 9b1fa454..27cced9a 100644
--- a/src/hos_client.cpp
+++ b/src/hos_client.cpp
@@ -30,10 +30,13 @@ typedef struct hos_client_handle_s
     Aws::S3::S3Client *S3Client;
     Aws::SDKOptions options;
     Aws::Vector buckets;
+    pthread_t fd_thread;
+    int fd_thread_status;
     /* options */
     size_t cache_size;
     size_t cache_times;
     size_t thread_sum;
+    size_t timeout;
     /* expand */
     screen_stat_handle_t fs2_handle;
     pthread_t fs2_thread;
@@ -52,10 +55,18 @@ typedef struct hos_client_handle_s
     int *rx_bytes_last;
 }hos_client_handle_t;
 
+hos_client_handle hos_handle;//一个进程只允许有一个hos_handle
 hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM];
 size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM];
 Aws::SDKOptions options;
 
+static inline size_t get_current_ms()
+{
+    struct timespec timenow;
+    clock_gettime(CLOCK_MONOTONIC, &timenow);
+    return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 );
+}
+
 static size_t hash_get_min_free_fd(size_t thread_id)
 {
     size_t i = 0;
@@ -67,6 +78,18 @@ static size_t hash_get_min_free_fd(size_t thread_id)
     return 0;
 }
 
+static int 
hos_delete_fd(size_t fd, size_t thread_id) +{ + if (fd == 0) + { + return HOS_PARAMETER_ERROR; + } + delete_info_by_fd(&hash_hos_info[thread_id], fd); + fd_info[thread_id][fd] = 0; + + return HOS_CLIENT_OK; +} + static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const Aws::S3::Model::PutObjectRequest& request, const Aws::S3::Model::PutObjectOutcome& outcome, @@ -95,10 +118,19 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, if (hos_info->mode & APPEND_MODE) { //APPEND MODE 保留fd + hos_info->recive_cnt++; +#if 0 + if (hos_info->fd_status == HOS_FD_INJECT) + { + if (hos_info->recive_cnt == hos_info->position) + hos_delete_fd(fd, thread_id); + } +#endif }else { //完整上传 删除fd - hos_close_fd(fd, thread_id); + //hos_delete_fd(fd, thread_id); + hos_info->fd_status = HOS_FD_INJECT; } } @@ -139,9 +171,12 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi return NULL; } - - hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); - memset(handle, 0, sizeof(hos_client_handle_t)); + if (hos_handle) + { + return hos_handle; + } + hos_handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); + memset(hos_handle, 0, sizeof(hos_client_handle_t)); Aws::Client::ClientConfiguration config; Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey); @@ -151,21 +186,23 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi config.enableEndpointDiscovery = true; config.executor = std::shared_ptr(std::make_shared(pool_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY));//支持线程池 - handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - handle->options = options; + hos_handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + hos_handle->options = options; /* 获取当前用户的所有的buckets */ - Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); + Aws::S3::Model::ListBucketsOutcome outcome = hos_handle->S3Client->ListBuckets(); - if (outcome.IsSuccess()) + if (!outcome.IsSuccess()) { - handle->buckets = outcome.GetResult().GetBuckets(); + return NULL; } - handle->cache_size = 0; - handle->cache_times = 1; - handle->thread_sum = 1; + hos_handle->buckets = outcome.GetResult().GetBuckets(); + hos_handle->cache_size = 0; + hos_handle->cache_times = 1; + hos_handle->thread_sum = 1; + hos_handle->timeout = 1000; - return handle; + return hos_handle; } static void *fs2_statistics(void *ptr) @@ -420,7 +457,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, NULL, 0, 0, 0 }; + hos_info_t info = {fd, 0, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; @@ -440,6 +477,37 @@ int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *obj return hos_upload_stream(handle, bucket, object, buf, buf_len, callback, userdata, thread_id, 1); } +static void *hos_fd_manage(void *ptr) +{ + hos_info_t *hos_info; + hos_client_handle handle = (hos_client_handle)ptr; + size_t thread_sum = handle->thread_sum; + size_t thread_num; + size_t fd; + while(1) + { + if (handle->fd_thread_status) + break; + for (thread_num = 0; thread_num < 
thread_sum; thread_num++) + { + for(fd = 0; fd < MAX_HOS_CLIENT_FD_NUM; fd++) + { + if (!fd_info[thread_num][fd]) + break; + hos_info = find_info_by_fd(hash_hos_info[thread_num], fd); + if (!hos_info) + break; + if (hos_info->fd_status == HOS_FD_REGISTER) + continue; + if ((hos_info->position == hos_info->recive_cnt) || (hos_info->overtime <= get_current_ms())) + hos_delete_fd(fd, thread_num); + } + } + usleep(1000); + } + pthread_exit(NULL); +} + int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode) { if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > handle->thread_sum)) @@ -453,10 +521,16 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_FD_NOT_ENOUGH; } - hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, }; + hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,}; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; - +#if 1 + if (handle->fd_thread == 0) + { + handle->fd_thread_status = 0; + pthread_create(&handle->fd_thread, NULL, hos_fd_manage, handle); + } +#endif return fd; } @@ -495,10 +569,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id Aws::S3::S3Client& S3Client = *(handle->S3Client); - // Create and configure the asynchronous put object request. + // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; - request.SetBucket(hos_info->bucket); - request.SetKey(hos_info->object); //设置上传数据类型 if (hos_info->mode & BUFF_MODE) @@ -511,27 +583,53 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id { hos_info->cache = Aws::MakeShared("append mode"); } - if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + if (hos_info->cache_times == 0) { - // cache - Aws::String buffer (stream, stream_len); - hos_info->cache_rest -= stream_len; - if (hos_info->cache_rest > 0) + //不设置cache_times的情况下 + if (stream_len < hos_info->cache_rest) + { + // cache + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len >= hos_info->cache_rest) { - return HOS_CLIENT_OK; + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; } - }else if (stream_len > hos_info->cache_rest) + }else { - // multi handle - flag = 1; - Aws::String buffer (stream, hos_info->cache_rest); - *hos_info->cache << buffer; - rest = stream_len - hos_info->cache_rest; - }else - { - //over cache_times - Aws::String buffer (stream, stream_len); - *hos_info->cache << buffer; + //设置cache times的情况下 + if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + { + // cache + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len > hos_info->cache_rest) + { + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; + }else + { + 
//over cache_times
+                Aws::String buffer (stream, stream_len);
+                *hos_info->cache << buffer;
+            }
         }
         request.SetBody(hos_info->cache);
@@ -565,6 +663,9 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
         request.SetBody(input_data);
     }
 
+    request.SetBucket(hos_info->bucket);
+    request.SetKey(hos_info->object);
+
     //设置回调函数
     std::shared_ptr context = Aws::MakeShared("");
@@ -657,9 +758,9 @@ int hos_close_fd(size_t fd, size_t thread_id)
             S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
         }
     }
-
-    delete_info_by_fd(&hash_hos_info[thread_id], fd);
-    fd_info[thread_id][fd] = 0;
+    hos_info->fd_status = HOS_FD_INJECT;
+    hos_info->cache.reset();
+    hos_info->overtime = get_current_ms() + hos_info->timeout;
 
     return HOS_CLIENT_OK;
 }
@@ -676,6 +777,11 @@ int hos_client_destory(hos_client_handle handle)
 
     Aws::Vector().swap(handle->buckets);
 
+    if (handle->fd_thread)
+    {
+        handle->fd_thread_status = 1;
+        pthread_join(handle->fd_thread, NULL);
+    }
     for (i = 0; i < handle->thread_sum; i++)
     {
         delete_all(&hash_hos_info[i]);
diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp
index f6bc8838..c10e9494 100644
--- a/src/hos_hash.cpp
+++ b/src/hos_hash.cpp
@@ -13,20 +13,28 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input)
     {
         value = (hos_info_t *)malloc(sizeof(hos_info_t));
         memcpy(value, input, sizeof(hos_info_t));
+        value->object = (char *)malloc(strlen(input->object) + 1);
+        value->bucket = (char *)malloc(strlen(input->bucket) + 1);
+        memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1);
+        memcpy(value->object, input->object, strlen(input->object) + 1);
         HASH_ADD_INT(*handle, fd, value);
     }
     else
     {
         value->mode = input->mode;
         value->handle = input->handle;
-        value->bucket = input->bucket;
-        value->object = input->object;
+        memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1);
+        memcpy(value->object, input->object, strlen(input->object) + 1);
         value->callback = input->callback;
         value->userdata = input->userdata;
         value->cache = input->cache;
         value->cache_times = input->cache_times;
         value->cache_rest = input->cache_rest;
         value->position = input->position;
+        value->recive_cnt = input->recive_cnt;
+        value->fd_status = input->fd_status;
+        value->overtime = input->overtime;
+        value->timeout = input->timeout;
     }
 }
 
@@ -40,9 +48,18 @@ hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd)
 void delete_info_by_fd(hos_info_t **handle, size_t fd)
 {
     hos_info_t *value = NULL;
+
     HASH_FIND_INT(*handle, &fd, value);
     if (value)
     {
+        if (value->bucket)
+        {
+            free(value->bucket);
+        }
+        if (value->object)
+        {
+            free(value->object);
+        }
         HASH_DEL(*handle, value);
         free(value);
     }
 }
@@ -53,6 +70,15 @@ void delete_all(hos_info_t **handle)
     hos_info_t *current, *tmp;
     HASH_ITER(hh, *handle, current, tmp)
     {
+        if (current->bucket)
+        {
+            free(current->bucket);
+        }
+        if (current->object)
+        {
+            free(current->object);
+        }
         HASH_DEL(*handle, current);
+        free(current);
     }
 }
diff --git a/src/hos_hash.h b/src/hos_hash.h
index 51e2a6b7..4bdf7b0c 100644
--- a/src/hos_hash.h
+++ b/src/hos_hash.h
@@ -15,15 +15,20 @@ typedef struct hos_info_s
     size_t fd;
     int mode;
     hos_client_handle handle;
-    const char *bucket;
-    const char *object;
+    char *bucket;
+    char *object;
     void *callback;
     void *userdata;
     std::shared_ptr cache;
-    //void *cache;
     size_t cache_times;
     size_t cache_rest;
     size_t position;
+    size_t recive_cnt;
+    int fd_status;
+#define HOS_FD_REGISTER 0
+#define HOS_FD_INJECT 1
+    size_t overtime;  //计算后超时的时间
+    size_t timeout;   //配置的超时时间,从status变成INJECT开始计时
     UT_hash_handle hh;
 }hos_info_t;
-- cgit v1.2.3
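
For reference, below is a minimal usage sketch of the API as it stands after these three patches: create the client, configure it through the new set-options calls, then drive an append-mode fd. This is an illustration only, not code from the repository: the endpoint, credentials, bucket/object names and the BUFF_MODE/APPEND_MODE flag names are assumptions; only the signatures visible in the hunks above (hos_client_create, set_cache_size, set_cache_times, set_thread_sum, hos_open_fd, hos_write, hos_close_fd, hos_client_destory) are used.

/* usage sketch (hypothetical caller, compiled as C++ against hos_client.h).
 * Endpoint, keys, bucket/object names and the mode macros are assumed. */
#include <cstdio>
#include "hos_client.h"

static void on_put_finished(bool ok, const char *object, void *userdata)
{
    /* invoked from an SDK executor thread once an async PutObject completes */
    std::printf("object %s: %s\n", object, ok ? "uploaded" : "failed");
}

int main()
{
    hos_client_handle h = hos_client_create("http://hos.example.com:9000",
                                            "ACCESS_KEY", "SECRET_KEY",
                                            8 /* pool_size */);
    if (h == NULL)
        return 1;

    /* the options replace the parameters dropped from hos_client_create() */
    set_thread_sum(h, 4);          /* caller threads that may own fds            */
    set_cache_size(h, 4 * 1024);   /* bytes cached before each append upload     */
    set_cache_times(h, 8);         /* writes cached before an upload is forced   */

    /* thread_id 0; flag names assumed from the hos_info->mode checks above */
    int fd = hos_open_fd(h, "mybucket", "myobject", on_put_finished, NULL,
                         0 /* thread_id */, BUFF_MODE | APPEND_MODE);
    if (fd == HOS_PARAMETER_ERROR || fd == HOS_FD_NOT_ENOUGH)
        return 1;

    const char data[] = "hello hos";
    if (hos_write((size_t)fd, data, sizeof(data) - 1, 0) != HOS_CLIENT_OK)
        std::printf("hos_write failed\n");

    hos_close_fd((size_t)fd, 0);   /* presumably flushes cached data; the fd thread reclaims the slot after timeout */
    hos_client_destory(h);
    return 0;
}

Note the consequence of the append-mode cache: data handed to hos_write may sit in memory until cache_size is filled, cache_times is exhausted, or the fd is closed, so a caller that needs data on the server promptly should close the fd or size the cache accordingly.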