diff options
| author | 彭宣正 <[email protected]> | 2021-07-12 11:09:53 +0800 |
|---|---|---|
| committer | 彭宣正 <[email protected]> | 2021-07-12 11:09:53 +0800 |
| commit | 0c27c779a62d2abed5e56bbe2cbd6c1cbdc652e7 (patch) | |
| tree | ea68e544aa76d8b99aa733db8c6edda6af7dcf21 /src/hos_client.cpp | |
| parent | c7c6f5b25f02f2532e913182ecad864da59144fe (diff) | |
🐞 fix(src, gtest, example): 解决hos_delete_fd引起的多线程安全问题v2.0.2
Diffstat (limited to 'src/hos_client.cpp')
| -rw-r--r-- | src/hos_client.cpp | 151 |
1 files changed, 57 insertions, 94 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 0e9c41eb..6c02d181 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -46,6 +46,7 @@ extern "C" struct hos_instance_s g_hos_instance; hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle static std::mutex m_client_lock; +static std::mutex m_delete_lock; hos_fd_context_t **g_fd_context; size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd static Aws::SDKOptions g_options; @@ -57,19 +58,24 @@ static inline size_t get_current_ms() return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 ); } -static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context) +static int hos_delete_fd(size_t fd, size_t thread_id) { + std::lock_guard<std::mutex> locker(m_delete_lock); + hos_fd_context_t* context = find_context_by_fd(g_fd_context[thread_id], fd); if (context == NULL) { return HOS_PARAMETER_ERROR; } - - if (context) + + put_finished_callback callback = (put_finished_callback)context->callback; + if (callback) { - if (context->bucket) - { - free(context->bucket); - context->bucket = NULL; + callback(context->reslut, context->bucket, context->object, context->error, context->userdata); + } + if (context->bucket) + { + free(context->bucket); + context->bucket = NULL; } if (context->object) { @@ -78,7 +84,6 @@ static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context) } HASH_DEL(g_fd_context[thread_id], context); free(context); - } return HOS_CLIENT_OK; } @@ -143,8 +148,9 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, "debug: [%s:%s] upload success. stream size:%lu", a_fd_context->bucket, a_fd_context->object, stream_len); } } - put_finished_callback callback = (put_finished_callback)a_fd_context->callback; - callback(result, a_fd_context->bucket, a_fd_context->object, error, a_fd_context->userdata); + a_fd_context->reslut = result; + a_fd_context->error = error; + if (a_fd_context->mode & APPEND_MODE) { //APPEND MODE 保留fd @@ -156,7 +162,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); - hos_delete_fd(thread_id, a_fd_context); + hos_delete_fd(fd, thread_id); } } } @@ -166,7 +172,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); - hos_delete_fd(thread_id, a_fd_context); + hos_delete_fd(fd, thread_id); } } g_hos_handle.task_num[thread_id]--; @@ -175,7 +181,6 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, static void hos_client_create() { - std::lock_guard<std::mutex> locker(m_client_lock); hos_config_t *hos_conf = &g_hos_handle.hos_config; void *log = g_hos_handle.log; @@ -577,6 +582,7 @@ hos_instance hos_get_instance() hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket) { + std::lock_guard<std::mutex> locker(m_client_lock); hos_config_t *hos_conf = &g_hos_handle.hos_config; char hos_url[1024]; @@ -809,7 +815,7 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata, thread_id); } -int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode) +int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id) { if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { @@ -826,7 +832,7 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca size_t fd = ++g_fd_info[thread_id]; MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd); - hos_fd_context_t info = {fd, mode, (char *)bucket, (char *)object, (void *)callback, userdata, + hos_fd_context_t info = {fd, BUFF_MODE | APPEND_MODE, (char *)bucket, (char *)object, (void *)callback, userdata, NULL,/*cache*/ g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/ (long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/}; add_fd_context(&g_fd_context[thread_id], &info); @@ -836,7 +842,6 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id) { - struct stat buffer; hos_fd_context_t *a_fd_context = NULL; char num[128]; int ret = 0; @@ -869,92 +874,50 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; - //设置上传数据类型 - if (a_fd_context->mode & BUFF_MODE) + //field_stat2 record + if (hos_func->fs2_info.fs2_handle) { - //BUFF_MODE - - //field_stat2 record - if (hos_func->fs2_info.fs2_handle) + if (hos_func->fs2_info.reserved) { - if (hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->rx_pkts[thread_id]++; - data_info->rx_bytes[thread_id] += stream_len; - } + data_info = (data_info_t *)hos_func->fs2_info.reserved; + data_info->rx_pkts[thread_id]++; + data_info->rx_bytes[thread_id] += stream_len; } - if (a_fd_context->mode & APPEND_MODE) - { - //APPEND_MODE - if (a_fd_context->cache == NULL) - { - //a_fd_context->cache = Aws::MakeShared<Aws::StringStream>("hos_write append mode"); - a_fd_context->cache = std::make_shared<Aws::StringStream>(); - } - Aws::String buffer(stream, stream_len); - *a_fd_context->cache << buffer; - a_fd_context->cache_rest -= stream_len; - if (data_info != NULL) - data_info->cache[thread_id] += stream_len; - if (a_fd_context->cache_count == 0 || --a_fd_context->cache_count) - { - //cache_count == 0,不设置cache_count的情况 - //cache_count > 0,设置cache_count的情况 - if (a_fd_context->cache_rest > 0) - { - return HOS_CLIENT_OK; - } - } - request.SetBody(a_fd_context->cache); - - // add headers - atomic_add(&(a_fd_context->position), 1); - snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); - Aws::Map<Aws::String, Aws::String> headers; - headers["x-hos-upload-type"] = "append"; - headers["x-hos-position"] = num; - request.SetMetadata(headers); + } - a_fd_context->cache->seekg(0, std::ios_base::end); - upload_len = a_fd_context->cache->tellg(); - a_fd_context->cache->seekg(0, std::ios_base::beg); - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: x-hos-posotion:%s", num); - } - else - { - const std::shared_ptr<Aws::IOStream> input_data = - Aws::MakeShared<Aws::StringStream>("hos_write buffer mode"); - Aws::String buffer (stream, stream_len); - *input_data << buffer; - request.SetBody(input_data); - upload_len = stream_len; - } + if (a_fd_context->cache == NULL) + { + //a_fd_context->cache = Aws::MakeShared<Aws::StringStream>("hos_write append mode"); + a_fd_context->cache = std::make_shared<Aws::StringStream>(); } - else + Aws::String buffer(stream, stream_len); + *a_fd_context->cache << buffer; + a_fd_context->cache_rest -= stream_len; + if (data_info != NULL) + data_info->cache[thread_id] += stream_len; + if (a_fd_context->cache_count == 0 || --a_fd_context->cache_count) { - if (stat(stream, &buffer) == -1) + //cache_count == 0,不设置cache_count的情况 + //cache_count > 0,设置cache_count的情况 + if (a_fd_context->cache_rest > 0) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: The file:%s not exist", stream); - return HOS_FILE_NOT_EXIST; - } - //文件类型 - const std::shared_ptr<Aws::IOStream> input_data = - Aws::MakeShared<Aws::FStream>("hos_write file mode", stream, std::ios_base::in | std::ios_base::binary); - request.SetBody(input_data); - upload_len = buffer.st_size; - //field_stat2 record - if (hos_func->fs2_info.fs2_handle) - { - if (hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->rx_pkts[thread_id]++; - data_info->rx_bytes[thread_id] += upload_len; - } + return HOS_CLIENT_OK; } } + request.SetBody(a_fd_context->cache); + + // add headers + atomic_add(&(a_fd_context->position), 1); + snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); + Aws::Map<Aws::String, Aws::String> headers; + headers["x-hos-upload-type"] = "append"; + headers["x-hos-position"] = num; + request.SetMetadata(headers); + a_fd_context->cache->seekg(0, std::ios_base::end); + upload_len = a_fd_context->cache->tellg(); + a_fd_context->cache->seekg(0, std::ios_base::beg); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: x-hos-posotion:%s", num); request.SetBucket(a_fd_context->bucket); request.SetKey(a_fd_context->object); @@ -1053,7 +1016,7 @@ int hos_close_fd(size_t fd, size_t thread_id) if (hos_conf->pool_thread_size == 0) { //同步模式,立即释放fd - hos_delete_fd(thread_id, a_fd_context); + hos_delete_fd(fd, thread_id); } else { @@ -1064,7 +1027,7 @@ int hos_close_fd(size_t fd, size_t thread_id) MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); - hos_delete_fd(thread_id, a_fd_context); + hos_delete_fd(fd, thread_id); } } |
