diff options
Diffstat (limited to 'src/hos_client.cpp')
| -rw-r--r-- | src/hos_client.cpp | 109 |
1 files changed, 51 insertions, 58 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp index fb523b8c..3029af92 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -48,8 +48,6 @@ hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle static std::mutex m_client_lock; static std::mutex m_instance_lock; static std::mutex m_delete_lock; -hos_fd_context_t **g_fd_context; -size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd static Aws::SDKOptions g_options; static inline size_t get_current_ms() @@ -62,7 +60,7 @@ static inline size_t get_current_ms() static int hos_delete_fd(size_t fd, size_t thread_id) { std::lock_guard<std::mutex> locker(m_delete_lock); - hos_fd_context_t* context = find_context_by_fd(g_fd_context[thread_id], fd); + hos_fd_context_t* context = (hos_fd_context_t *)fd; if (context == NULL) { return HOS_PARAMETER_ERROR; @@ -83,7 +81,6 @@ static int hos_delete_fd(size_t fd, size_t thread_id) free(context->object); context->object = NULL; } - HASH_DEL(g_fd_context[thread_id], context); free(context); return HOS_CLIENT_OK; @@ -102,7 +99,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, size_t thread_id, fd, stream_len; sscanf(uuid.c_str(), "%lu %lu %lu", &thread_id, &fd, &stream_len); - a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); + a_fd_context = (hos_fd_context_t *)fd; if (a_fd_context == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, @@ -156,13 +153,13 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, { //APPEND MODE 保留fd atomic_add(&(a_fd_context->recive_cnt), 1); - if (a_fd_context->fd_status == HOS_FD_INJECT) + if (a_fd_context->fd_status == HOS_FD_CANCEL) { if (a_fd_context->position == a_fd_context->recive_cnt) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", - a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); + a_fd_context->bucket, a_fd_context->object, thread_id, fd); hos_delete_fd(fd, thread_id); } } @@ -172,7 +169,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, //完整上传 删除fd MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", - a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); + a_fd_context->bucket, a_fd_context->object, thread_id, fd); hos_delete_fd(fd, thread_id); } } @@ -239,9 +236,6 @@ static void hos_client_create() g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - g_fd_context = (hos_fd_context_t **)calloc(hos_conf->thread_num, sizeof(hos_fd_context_t *)); - g_fd_info = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - #if 0 if (g_hos_handle.hos_func.fd_thread == 0) { @@ -279,6 +273,10 @@ bool hos_verify_bucket(const char *bucket) MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: bucket:%s exits", bucket); return true; } + else + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: Get bucket list:%s", new_bucket.GetName().c_str()); + } } } else @@ -477,19 +475,20 @@ static void hos_expand_fs2() return ; } -static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, - size_t thread_id, size_t fd, const char *bucket, const char *object) +static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, hos_fd_context_t **fd) { char buf[128]; int ret = 0; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; data_info_t *data_info = NULL; hos_config_t *hos_conf = &g_hos_handle.hos_config; + char *bucket = (*fd)->bucket; + char *object = (*fd)->object; //设置回调函数 std::shared_ptr<Aws::Client::AsyncCallerContext> context = Aws::MakeShared<Aws::Client::AsyncCallerContext>(""); - sprintf(buf, "%lu %lu %lu", thread_id, fd, stream_len); + sprintf(buf, "%lu %lu %lu", thread_id, (long)*fd, stream_len); context->SetUUID(buf); if (hos_conf->max_request_num && hos_conf->max_request_context && @@ -544,11 +543,12 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t } } -static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, size_t fd, - const char *bucket, const char *object) +static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, hos_fd_context_t **fd) { hos_func_thread_t *hos_func = &g_hos_handle.hos_func; data_info_t *data_info = NULL; + char *bucket = (*fd)->bucket; + char *object = (*fd)->object; auto& S3Client = *(g_hos_handle.S3Client); Aws::S3::Model::PutObjectOutcome Outcome = S3Client.PutObject(request); @@ -748,7 +748,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char request.SetKey(object); //设置上传数据类型 - if (mode == 0) + if (mode == FILE_MODE) { //文件类型 const std::shared_ptr<Aws::IOStream> input_data = @@ -776,18 +776,22 @@ static int hos_upload_stream(const char *bucket, const char *object, const char } //设置回调函数 - size_t fd = ++g_fd_info[thread_id]; - - hos_fd_context_t info = {fd, 0, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; - add_fd_context(&g_fd_context[thread_id], &info); + hos_fd_context_t *hos_fd = (hos_fd_context_t *)calloc(1, sizeof(hos_fd_context_t)); + hos_fd->mode = mode; + hos_fd->bucket = (char *)malloc(strlen(bucket) + 1); + memcpy(hos_fd->bucket, bucket, strlen(bucket) + 1); + hos_fd->object = (char *)malloc(strlen(object) + 1); + memcpy(hos_fd->object, object, strlen(object) + 1); + hos_fd->callback = (void *)callback; + hos_fd->userdata = userdata; if (hos_conf->pool_thread_size > 0) { - ret = hos_putobject_async(request, data_len, thread_id, fd, bucket, object); + ret = hos_putobject_async(request, data_len, thread_id, &hos_fd); } else { - ret = hos_putobject_sync(request, data_len, thread_id, fd, bucket, object); + ret = hos_putobject_sync(request, data_len, thread_id, &hos_fd); } return ret; @@ -835,7 +839,7 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata, thread_id); } -int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id) +long hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id) { if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { @@ -849,15 +853,22 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca return HOS_PARAMETER_ERROR; } - size_t fd = ++g_fd_info[thread_id]; - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd); - - hos_fd_context_t info = {fd, BUFF_MODE | APPEND_MODE, (char *)bucket, (char *)object, (void *)callback, userdata, - NULL,/*cache*/ g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/ - (long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/}; - add_fd_context(&g_fd_context[thread_id], &info); + hos_fd_context_t *hos_fd = (hos_fd_context_t *)calloc(1, sizeof(hos_fd_context_t)); + hos_fd->mode = BUFF_MODE | APPEND_MODE; + hos_fd->bucket = (char *)malloc(strlen(bucket) + 1); + memcpy(hos_fd->bucket, bucket, strlen(bucket) + 1); + hos_fd->object = (char *)malloc(strlen(object) + 1); + memcpy(hos_fd->object, object, strlen(object) + 1); + hos_fd->callback = (void *)callback; + hos_fd->userdata = userdata; + hos_fd->cache_count = g_hos_handle.hos_config.cache_count; + hos_fd->cache_rest = g_hos_handle.hos_config.cache_size; + hos_fd->fd_status = HOS_FD_REGISTER; + hos_fd->reslut = true; + + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, (long)&hos_fd); - return fd; + return (long)hos_fd; } int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id) @@ -883,7 +894,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id return HOS_PARAMETER_ERROR; } - a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); + a_fd_context = (hos_fd_context_t *)fd; if (a_fd_context == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fd info not find. thread_id:%lu, fd:%lu", thread_id, fd); @@ -943,11 +954,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id if (hos_conf->pool_thread_size > 0) { - ret = hos_putobject_async(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + ret = hos_putobject_async(request, upload_len, thread_id, &a_fd_context); } else { - ret = hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + ret = hos_putobject_sync(request, upload_len, thread_id, &a_fd_context); } //恢复fd 的cache设置 @@ -983,7 +994,7 @@ int hos_close_fd(size_t fd, size_t thread_id) fd, thread_id, hos_conf->thread_num); return HOS_PARAMETER_ERROR; } - a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); + a_fd_context = (hos_fd_context_t *)fd; if (a_fd_context == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, @@ -1016,18 +1027,18 @@ int hos_close_fd(size_t fd, size_t thread_id) if (hos_conf->pool_thread_size > 0) { - hos_putobject_async(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + hos_putobject_async(request, upload_len, thread_id, &a_fd_context); } else { - hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + hos_putobject_sync(request, upload_len, thread_id, &a_fd_context); } data_info_t *data_info = (data_info_t *)(g_hos_handle.hos_func.fs2_info.reserved); if (data_info) data_info->cache[thread_id] = 0; } } - a_fd_context->fd_status = HOS_FD_INJECT; + a_fd_context->fd_status = HOS_FD_CANCEL; a_fd_context->cache.reset(); a_fd_context->cache = NULL; a_fd_context->cache_rest = hos_conf->cache_size; @@ -1046,7 +1057,7 @@ int hos_close_fd(size_t fd, size_t thread_id) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", - a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); + a_fd_context->bucket, a_fd_context->object, thread_id, fd); hos_delete_fd(fd, thread_id); } } @@ -1058,7 +1069,6 @@ int hos_shutdown_instance() { std::lock_guard<std::mutex> locker(m_instance_lock); size_t i = 0; - hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t task_num = 0; @@ -1153,23 +1163,6 @@ int hos_shutdown_instance() } MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: delete s3client."); - if (g_fd_info) - { - free(g_fd_info); - g_fd_info = NULL; - } - - for (i = 0; i < hos_conf->thread_num; i++) - { - delete_all(&g_fd_context[i]); - } - - if (g_fd_context) - { - free(g_fd_context); - g_fd_context = NULL; - } - Aws::ShutdownAPI(g_options); MESA_destroy_runtime_log_handle(g_hos_handle.log); g_hos_handle.log = NULL; |
