diff options
Diffstat (limited to 'src/hos_client.cpp')
| -rw-r--r-- | src/hos_client.cpp | 625 |
1 files changed, 250 insertions, 375 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp index a589b26c..c3a9941e 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -43,12 +43,14 @@ extern "C" } while (0) #endif -struct hos_instance_s g_hos_instance; +#define FILE_MODE 0 +#define BUFF_MODE 1 +#define APPEND_MODE 2 + +struct hos_info_s g_hos_instance; hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle static std::mutex m_client_lock; static std::mutex m_delete_lock; -hos_fd_context_t **g_fd_context; -size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd static Aws::SDKOptions g_options; static inline size_t get_current_ms() @@ -58,34 +60,31 @@ static inline size_t get_current_ms() return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 ); } -static int hos_delete_fd(size_t fd, size_t thread_id) +static void hos_destroy_handle(hos_handle handle) { std::lock_guard<std::mutex> locker(m_delete_lock); - hos_fd_context_t* context = find_context_by_fd(g_fd_context[thread_id], fd); - if (context == NULL) + if (handle == NULL) { - return HOS_PARAMETER_ERROR; + return; } - - put_finished_callback callback = (put_finished_callback)context->callback; + put_finished_callback callback = (put_finished_callback)handle->callback; if (callback) { - callback(context->reslut, context->bucket, context->object, context->error, context->userdata); + callback(handle->reslut, handle->bucket, handle->object, handle->error, handle->userdata); } - if (context->bucket) + if (handle->bucket) { - free(context->bucket); - context->bucket = NULL; - } - if (context->object) - { - free(context->object); - context->object = NULL; - } - HASH_DEL(g_fd_context[thread_id], context); - free(context); - - return HOS_CLIENT_OK; + free(handle->bucket); + handle->bucket = NULL; + } + if (handle->object) + { + free(handle->object); + handle->object = NULL; + } + free(handle); + + return ; } static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, @@ -94,92 +93,77 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) { const char *error = NULL; - hos_fd_context_t *a_fd_context = NULL; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; data_info_t *data_info = NULL; const Aws::String& uuid = context->GetUUID(); - size_t thread_id, fd, stream_len; + size_t handle_addr = 0, stream_len = 0; + hos_handle_t *handle = NULL; + + sscanf(uuid.c_str(), "%lu %lu", &handle_addr, &stream_len); + handle = (hos_handle_t *)handle_addr; - sscanf(uuid.c_str(), "%lu %lu %lu", &thread_id, &fd, &stream_len); - a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); - if (a_fd_context == NULL) + bool result = outcome.IsSuccess(); + if (!result) { + error = outcome.GetError().GetMessage().c_str(); MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "error: Not find the info of [thread_id:%lu fd:%lu]", thread_id, fd); + "error: [%s:%s] upload failed. error:%s", handle->bucket, handle->object, error); - if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) + if (hos_func->fs2_info.fs2_handle) { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->tx_failed_pkts[thread_id]++; - data_info->tx_failed_bytes[thread_id] += stream_len; + data_info = (data_info_t *)&hos_func->fs2_info.data_info; + atomic_add(&data_info->tx_failed_pkts, 1); + atomic_add(&data_info->tx_failed_bytes, stream_len); } } else { - bool result = outcome.IsSuccess(); - if (!result) + if (hos_func->fs2_info.fs2_handle) { - error = outcome.GetError().GetMessage().c_str(); - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "error: [%s:%s] upload failed. error:%s", a_fd_context->bucket, a_fd_context->object, error); - - if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->tx_failed_pkts[thread_id]++; - data_info->tx_failed_bytes[thread_id] += stream_len; - } + data_info = (data_info_t *)&hos_func->fs2_info.data_info; + atomic_add(&data_info->tx_pkts, 1); + atomic_add(&data_info->tx_bytes, stream_len); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload success. tx_pkts:%lu, tx_bytes:%lu", + handle->bucket, handle->object, + atomic_read(&data_info->tx_pkts), atomic_read(&data_info->tx_bytes)); } else { - if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->tx_pkts[thread_id]++; - data_info->tx_bytes[thread_id] += stream_len; - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s:%s] upload success. tx_pkts:%lu, tx_bytes:%lu", - a_fd_context->bucket, a_fd_context->object, - data_info->tx_pkts[thread_id], data_info->tx_bytes[thread_id]); - } - else - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s:%s] upload success. stream size:%lu", a_fd_context->bucket, a_fd_context->object, stream_len); - } + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload success. stream size:%lu", handle->bucket, handle->object, stream_len); } - a_fd_context->reslut = result; - a_fd_context->error = error; + } + handle->reslut = result; + handle->error = error; - if (a_fd_context->mode & APPEND_MODE) + if (handle->mode & APPEND_MODE) + { + //APPEND MODE 保留fd + atomic_add(&(handle->recive_cnt), 1); + if (handle->handle_status == HOS_HANDLE_CANCEL) { - //APPEND MODE 保留fd - atomic_add(&(a_fd_context->recive_cnt), 1); - if (a_fd_context->fd_status == HOS_FD_INJECT) + if (handle->position == handle->recive_cnt) { - if (a_fd_context->position == a_fd_context->recive_cnt) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", - a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); - hos_delete_fd(fd, thread_id); - } + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload completed. delete", + handle->bucket, handle->object); + hos_destroy_handle(handle); } } - else - { - //完整上传 删除fd - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", - a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); - hos_delete_fd(fd, thread_id); - } } - atomic_sub(&g_hos_handle.task_num[thread_id], 1); - atomic_sub(&g_hos_handle.task_context[thread_id], stream_len); + else + { + //完整上传 删除fd + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload completed. handle destory", handle->bucket, handle->object); + hos_destroy_handle(handle); + } + atomic_sub(&g_hos_handle.task_num, 1); + atomic_sub(&g_hos_handle.task_context, stream_len); } -static void hos_client_create() +static bool hos_client_create() { hos_config_t *hos_conf = &g_hos_handle.hos_config; void *log = g_hos_handle.log; @@ -187,8 +171,7 @@ static void hos_client_create() if (g_hos_handle.S3Client != NULL) { g_hos_handle.count++; - g_hos_instance.result = true; - return ; + return true; } Aws::InitAPI(g_options); @@ -227,30 +210,18 @@ static void hos_client_create() Aws::ShutdownAPI(g_options); g_hos_instance.error_code = (size_t)outcome.GetError().GetErrorType() + 1; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, outcome.GetError().GetMessage().c_str()); - g_hos_instance.result = false; MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_FATAL, "hos_client_create", "error: %s", g_hos_instance.error_message); - return; + return false; } g_hos_handle.buckets = outcome.GetResult().GetBuckets(); g_hos_handle.count++; - g_hos_handle.executor = std::dynamic_pointer_cast<Aws::Utils::Threading::PooledThreadExecutor>(config.executor); - g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - - g_fd_context = (hos_fd_context_t **)calloc(hos_conf->thread_num, sizeof(hos_fd_context_t *)); - g_fd_info = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - - #if 0 - if (g_hos_handle.hos_func.fd_thread == 0) - { - g_hos_handle.hos_func.fd_thread_status = 0; - pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL); - } - #endif + //g_hos_handle.executor = std::dynamic_pointer_cast<Aws::Utils::Threading::PooledThreadExecutor>(config.executor); + g_hos_handle.task_num = 0; + g_hos_handle.task_context = 0; MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_DEBUG, "hos_client_create", "debug: hos s3client create success, url:%s.",endpoint); - g_hos_instance.result = true; + return true; } bool hos_verify_bucket(const char *bucket) @@ -261,7 +232,7 @@ bool hos_verify_bucket(const char *bucket) "debug: bucket is null"); return false; } - if (g_hos_instance.result != true || g_hos_handle.S3Client == NULL) + if (g_hos_handle.S3Client == NULL) { return false; } @@ -289,7 +260,6 @@ bool hos_verify_bucket(const char *bucket) static void *fs2_statistics(void *ptr) { - size_t i = 0; size_t rx_pkts_sum = 0; size_t rx_bytes_sum = 0; size_t tx_pkts_sum = 0; @@ -315,9 +285,7 @@ static void *fs2_statistics(void *ptr) size_t cache_last = 0; size_t req_overflow_last = 0; fs2_info_t *fs2_info = NULL; - hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; - size_t task_num = 0; while(1) { @@ -336,20 +304,14 @@ static void *fs2_statistics(void *ptr) cache_sum = 0; fs2_info = &hos_func->fs2_info; - data_info_t *data_info = (data_info_t *)fs2_info->reserved; - for (i = 0; i < hos_conf->thread_num; i++) - { - rx_pkts_sum += data_info->rx_pkts[i]; - rx_bytes_sum += data_info->rx_bytes[i]; - tx_pkts_sum += data_info->tx_pkts[i]; - tx_bytes_sum += data_info->tx_bytes[i]; - tx_failed_bytes_sum += data_info->tx_failed_bytes[i]; - tx_failed_pkts_sum += data_info->tx_failed_pkts[i]; - cache_sum += data_info->cache[i]; - - task_num += atomic_read(&g_hos_handle.task_num[i]); - } - req_overflow_sum = atomic_read(&data_info->tx_req_num_overflow); + data_info_t *data_info = (data_info_t *)&fs2_info->data_info; + rx_pkts_sum += atomic_read(&data_info->rx_pkts); + rx_bytes_sum += atomic_read(&data_info->rx_bytes); + tx_pkts_sum += atomic_read(&data_info->tx_pkts); + tx_bytes_sum += atomic_read(&data_info->tx_bytes); + tx_failed_bytes_sum += atomic_read(&data_info->tx_failed_bytes); + tx_failed_pkts_sum += atomic_read(&data_info->tx_failed_pkts); + cache_sum += atomic_read(&data_info->cache); rx_pkts_interval = rx_pkts_sum - rx_pkts_last; rx_bytes_interval = rx_bytes_sum - rx_bytes_last; @@ -433,7 +395,6 @@ static void hos_expand_fs2() { fs2_info_t *fs2_info = NULL; screen_stat_handle_t fs2_handle = NULL; - hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t i = 0; @@ -460,15 +421,6 @@ static void hos_expand_fs2() fs2_info->line_ids[1] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "total"); hos_func->fs2_status = HOS_FS2_START; - data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - fs2_info->reserved = (void *)data_info; - data_info->tx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->tx_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->rx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->rx_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->tx_failed_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->tx_failed_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->cache = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); FS_start(fs2_handle); pthread_create(&hos_func->fs2_thread, NULL, fs2_statistics, NULL); @@ -476,8 +428,7 @@ static void hos_expand_fs2() return ; } -static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, - size_t thread_id, size_t fd, const char *bucket, const char *object) +static int hos_putobject_async(hos_handle_t *handle, Aws::S3::Model::PutObjectRequest& request, size_t stream_len) { char buf[128]; int ret = 0; @@ -488,26 +439,22 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t //设置回调函数 std::shared_ptr<Aws::Client::AsyncCallerContext> context = Aws::MakeShared<Aws::Client::AsyncCallerContext>(""); - sprintf(buf, "%lu %lu %lu", thread_id, fd, stream_len); + sprintf(buf, "%lu %lu", (size_t)handle, stream_len); context->SetUUID(buf); if (hos_conf->max_request_num && hos_conf->max_request_context && - (atomic_read(&g_hos_handle.task_num[thread_id]) >= hos_conf->max_request_num || - atomic_read(&g_hos_handle.task_context[thread_id]) >= hos_conf->max_request_context)) + (atomic_read(&g_hos_handle.task_num) >= hos_conf->max_request_num || + atomic_read(&g_hos_handle.task_context) >= hos_conf->max_request_context)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: PutObjectAsync failed. [%s:%s]. task_num:%lu, task_context:%lu", - bucket, object, atomic_read(&g_hos_handle.task_num[thread_id]), atomic_read(&g_hos_handle.task_context[thread_id])); - + handle->bucket, handle->object, g_hos_handle.task_num, g_hos_handle.task_context); if (hos_func->fs2_info.fs2_handle) { - if (hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->tx_failed_pkts[thread_id]++; - data_info->tx_failed_bytes[thread_id] += stream_len; + data_info = (data_info_t *)&(hos_func->fs2_info.data_info); + atomic_add(&data_info->tx_failed_pkts, 1); + atomic_add(&data_info->tx_failed_bytes, stream_len); atomic_add(&data_info->tx_req_num_overflow, 1); - } } return HOS_SEND_FAILED; @@ -517,34 +464,30 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); if (ret) { - atomic_add(&g_hos_handle.task_num[thread_id], 1); - atomic_add(&g_hos_handle.task_context[thread_id], stream_len); + atomic_add(&g_hos_handle.task_num, 1); + atomic_add(&g_hos_handle.task_context, stream_len); //不算真正成功,需要等到PutObjectAsyncFinished的结果 MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: PutObjectAsync success. [%s:%s]", bucket, object); + "debug: PutObjectAsync success. [%s:%s]", handle->bucket, handle->object); return HOS_CLIENT_OK; } else { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: PutObjectAsync failed. [%s:%s]", bucket, object); + "debug: PutObjectAsync failed. [%s:%s]", handle->bucket, handle->object); if (hos_func->fs2_info.fs2_handle) { - if (hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->tx_failed_pkts[thread_id]++; - data_info->tx_failed_bytes[thread_id] += stream_len; - } + data_info = (data_info_t *)&hos_func->fs2_info.data_info; + atomic_add(&data_info->tx_failed_pkts, 1); + atomic_add(&data_info->tx_failed_bytes, stream_len); } return HOS_SEND_FAILED; } } -static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, size_t fd, - const char *bucket, const char *object) +static int hos_putobject_sync(hos_handle_t *handle, Aws::S3::Model::PutObjectRequest& request, size_t stream_len) { hos_func_thread_t *hos_func = &g_hos_handle.hos_func; data_info_t *data_info = NULL; @@ -553,19 +496,19 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t Aws::S3::Model::PutObjectOutcome Outcome = S3Client.PutObject(request); if (Outcome.IsSuccess()) { - if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) + if (hos_func->fs2_info.fs2_handle) { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->tx_pkts[thread_id]++; - data_info->tx_bytes[thread_id] += stream_len; + data_info = (data_info_t *)&hos_func->fs2_info.data_info; + atomic_add(&data_info->tx_pkts, 1); + atomic_add(&data_info->tx_bytes, stream_len); MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: PutObject success. [%s:%s] tx_pkts:%lu, tx_bytes:%lu", - bucket, object, data_info->tx_pkts[thread_id], data_info->tx_bytes[thread_id]); + handle->bucket, handle->object, atomic_read(&data_info->tx_pkts), atomic_read(&data_info->tx_bytes)); } else { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: PutObject success. [%s:%s]", bucket, object); + "debug: PutObject success. [%s:%s]", handle->bucket, handle->object); } return HOS_CLIENT_OK; @@ -573,45 +516,57 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t else { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: PutObject failed. [%s:%s] cause:%s", bucket, object, Outcome.GetError().GetMessage().c_str()); + "debug: PutObject failed. [%s:%s] cause:%s", handle->bucket, handle->object, Outcome.GetError().GetMessage().c_str()); - if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) + if (hos_func->fs2_info.fs2_handle) { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->tx_failed_pkts[thread_id]++; - data_info->tx_failed_bytes[thread_id] += stream_len; + data_info = (data_info_t *)&hos_func->fs2_info.data_info; + atomic_add(&data_info->tx_failed_pkts, 1); + atomic_add(&data_info->tx_failed_bytes, stream_len); } return (int)Outcome.GetError().GetErrorType() + 1; } } -hos_instance hos_get_instance() +bool hos_verify_instance() { if (g_hos_handle.S3Client != NULL) { g_hos_handle.count++; - g_hos_instance.result = true; - return &g_hos_instance; + return true; } memset(&g_hos_instance, 0, sizeof(g_hos_instance)); - g_hos_instance.result = false; - return &g_hos_instance; + return false; +} + +int hos_get_init_instance_errcode() +{ + return g_hos_instance.error_code; +} + +const char *hos_get_init_instance_errmsg() +{ + return g_hos_instance.error_message; +} + +const char *hos_get_url_prefix() +{ + return g_hos_instance.hos_url_prefix; } -hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket) +bool hos_init_instance(const char *conf_path, const char *module, const char *bucket) { std::lock_guard<std::mutex> locker(m_client_lock); hos_config_t *hos_conf = &g_hos_handle.hos_config; char hos_url[1024]; - if (conf_path == NULL || thread_num == 0 || module == NULL || bucket == NULL) + if (conf_path == NULL || module == NULL || bucket == NULL) { - g_hos_instance.result = false; g_hos_instance.error_code = HOS_PARAMETER_ERROR; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, - "param error:conf_path:%s, module:%s, thread_num:%lu, bucket:%s", conf_path, module, thread_num, bucket); - return &g_hos_instance; + "param error:conf_path:%s, module:%s, bucket:%s", conf_path, module, bucket); + return false; } MESA_load_profile_string_nodef(conf_path, module, "hos_serverip", hos_conf->ip, MAX_HOS_STRING_LEN); @@ -634,25 +589,22 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level); if (g_hos_handle.log == NULL) { - g_hos_instance.result = false; g_hos_instance.error_code = HOS_RUNTIME_LOG_FAILED; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "runtime log create failed."); - return &g_hos_instance; + return false; } snprintf(hos_url, sizeof(hos_url), "http://%s:%d/hos/", hos_conf->ip, hos_conf->port); - hos_conf->thread_num = thread_num; - hos_client_create(); - if (g_hos_instance.result == true) + if (hos_client_create()) { + //XXX 是否需要验证bucket if(hos_verify_bucket(bucket) == false) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: bucket:%s not exist.", bucket); hos_shutdown_instance(); - g_hos_instance.result = false; g_hos_instance.error_code = HOS_BUCKET_NOT_EXIST; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "bucket:%s not exits.", bucket); - return &g_hos_instance; + return false; } MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug:%s","Instance init completed"); hos_expand_fs2(); @@ -660,22 +612,25 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t g_hos_instance.error_message[0]='\0'; g_hos_instance.hos_url_prefix = (const char *)calloc(1, strlen(hos_url) + 1); memcpy((void *)g_hos_instance.hos_url_prefix, hos_url, strlen(hos_url)); + return true; + } + else + { + return false; } - return &g_hos_instance; } else { - g_hos_instance.result = false; g_hos_instance.error_code = HOS_CONF_ERROR; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "hos param error:hos ip:%s, hos port:%u, accesskeyid:%s, secretkey:%s", hos_conf->ip, hos_conf->port, hos_conf->accesskeyid, hos_conf->secretkey); - return &g_hos_instance; + return false; } } int hos_create_bucket(const char *bucket) { - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_handle.S3Client == NULL) { return HOS_INSTANCE_NOT_INIT; } @@ -718,7 +673,7 @@ int hos_create_bucket(const char *bucket) } static int hos_upload_stream(const char *bucket, const char *object, const char *data, size_t data_len, - put_finished_callback callback, void *userdata, size_t thread_id) + put_finished_callback callback, void *userdata) { data_info_t *data_info = NULL; hos_config_t *hos_conf = &g_hos_handle.hos_config; @@ -726,16 +681,16 @@ static int hos_upload_stream(const char *bucket, const char *object, const char int ret; int mode = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_handle.S3Client == NULL) { return HOS_INSTANCE_NOT_INIT; } - if ((bucket == NULL) || (object == NULL) || (thread_id > hos_conf->thread_num)) + if ((bucket == NULL) || (object == NULL)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_stream", - "error: s3client:%s, bucket:%s, object:%s, thread_id:%lu, thread_num:%u", - g_hos_handle.S3Client?"not null":"null", bucket, object, thread_id, hos_conf->thread_num); + "error: s3client:%s, bucket:%s, object:%s", + g_hos_handle.S3Client?"not null":"null", bucket, object); return HOS_PARAMETER_ERROR; } @@ -766,102 +721,99 @@ static int hos_upload_stream(const char *bucket, const char *object, const char //field_stat2 record if (hos_func->fs2_info.fs2_handle) { - if (hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->rx_pkts[thread_id]++; - data_info->rx_bytes[thread_id] += data_len; - } + data_info = (data_info_t *)&hos_func->fs2_info.data_info; + atomic_add(&data_info->rx_pkts, 1); + atomic_add(&data_info->rx_bytes, data_len); } //设置回调函数 - size_t fd = ++g_fd_info[thread_id]; - - hos_fd_context_t info = {fd, 0, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; - add_fd_context(&g_fd_context[thread_id], &info); + hos_handle_t *handle = (hos_handle_t *)calloc(1, sizeof(hos_handle_t)); + handle->bucket = (char *)calloc(1, strlen(bucket)+1); + memcpy(handle->bucket, bucket, strlen(bucket)); + handle->object = (char *)calloc(1, strlen(object)+1); + memcpy(handle->object, object, strlen(object)); + handle->callback = callback; + handle->userdata = userdata; if (hos_conf->pool_thread_size > 0) { - ret = hos_putobject_async(request, data_len, thread_id, fd, bucket, object); + ret = hos_putobject_async(handle, request, data_len); } else { - ret = hos_putobject_sync(request, data_len, thread_id, fd, bucket, object); + ret = hos_putobject_sync(handle, request, data_len); } return ret; } -int hos_upload_file(const char *bucket, const char *file_path, put_finished_callback callback, void *userdata, size_t thread_id) +int hos_upload_file(const char *bucket, const char *file_path, put_finished_callback callback, void *userdata) { struct stat buffer; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_handle.S3Client == NULL) { return HOS_INSTANCE_NOT_INIT; } - if ((bucket == NULL) || (file_path == NULL) || (thread_id > g_hos_handle.hos_config.thread_num)) + if ((bucket == NULL) || (file_path == NULL)) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_file", - "error: bucket:%s, file_path:%s, thread_id:%lu, thread_num:%u", - bucket, file_path, thread_id, g_hos_handle.hos_config.thread_num); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + "error: bucket:%s, file_path:%s", bucket, file_path); return HOS_PARAMETER_ERROR; } if (stat(file_path, &buffer) == -1) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_file", "error: The file:%s not exist", file_path); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: The file:%s not exist", file_path); return HOS_FILE_NOT_EXIST; } - return hos_upload_stream(bucket, file_path, NULL, buffer.st_size, callback, userdata, thread_id); + return hos_upload_stream(bucket, file_path, NULL, buffer.st_size, callback, userdata); } -int hos_upload_buf(const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id) +int hos_upload_buf(const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata) { - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_handle.S3Client == NULL) { return HOS_INSTANCE_NOT_INIT; } - if ((bucket == NULL) || (object == NULL) || (buf == NULL) || (buf_len == 0) - || (thread_id > g_hos_handle.hos_config.thread_num)) + if ((bucket == NULL) || (object == NULL) || (buf == NULL) || (buf_len == 0)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_buf", - "bucket:%s, object:%s, buf:%s, buf_len:%lu, thread_id:%lu, thread_num:%u", - bucket, object, buf?"not null":"null", buf_len, thread_id, g_hos_handle.hos_config.thread_num); + "bucket:%s, object:%s, buf:%s, buf_len:%lu", bucket, object, buf?"not null":"null", buf_len); return HOS_PARAMETER_ERROR; } - return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata, thread_id); + return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata); } -int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id) +int hos_open_handle(const char *bucket, const char *object, put_finished_callback callback, void *userdata, hos_handle handle) { - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_handle.S3Client == NULL) { return HOS_INSTANCE_NOT_INIT; } - if ((bucket == NULL) || (object == NULL) || (thread_id > g_hos_handle.hos_config.thread_num) || strlen(bucket) == 0 || strlen(object) == 0) + if ((bucket == NULL) || (object == NULL) || strlen(bucket) == 0 || strlen(object) == 0) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", - "error: bucket:%s, obejct:%s, thread_id:%lu", - (bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + "error: bucket:%s, obejct:%s", + (bucket == NULL)?"null":bucket, (object == NULL)?"null":object); return HOS_PARAMETER_ERROR; } - - size_t fd = ++g_fd_info[thread_id]; - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd); - hos_fd_context_t info = {fd, BUFF_MODE | APPEND_MODE, (char *)bucket, (char *)object, (void *)callback, userdata, - NULL,/*cache*/ g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/ - (long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/}; - add_fd_context(&g_fd_context[thread_id], &info); + handle = (hos_handle_t *)calloc(1, sizeof(hos_handle_t)); + handle->bucket = (char *)calloc(1, strlen(bucket)+1); + memcpy(handle->bucket, bucket, strlen(bucket)); + handle->object = (char *)calloc(1, strlen(object)+1); + memcpy(handle->object, object, strlen(object)); + handle->callback = callback; + handle->userdata = userdata; + handle->handle_status = HOS_HANDLE_REGISTER; - return fd; + return HOS_CLIENT_OK; } -int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id) +int hos_write(hos_handle handle, const char *stream, size_t stream_len) { - hos_fd_context_t *a_fd_context = NULL; char num[128]; int ret = 0; data_info_t *data_info = NULL; @@ -869,26 +821,23 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t upload_len = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_handle.S3Client == NULL) { return HOS_INSTANCE_NOT_INIT; } - if ((stream == NULL) || (thread_id > hos_conf->thread_num)) + if (stream == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, - "hos_write", "error: fd:%lu, stream:%s, stream_len:%lu, thread_id:%lu.", - fd, stream?"not null":"null", stream_len, thread_id); + "hos_write", "error: stream:%s, stream_len:%lu.", + stream?"not null":"null", stream_len); return HOS_PARAMETER_ERROR; } - - a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); - if (a_fd_context == NULL) + if (handle == NULL) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fd info not find. thread_id:%lu, fd:%lu", thread_id, fd); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fd info not find."); return HOS_HASH_NOT_FIND; } - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: Get fd_context, thread_id:%lu, fd:%lu", thread_id, fd); // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; @@ -896,157 +845,138 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id //field_stat2 record if (hos_func->fs2_info.fs2_handle) { - if (hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->rx_pkts[thread_id]++; - data_info->rx_bytes[thread_id] += stream_len; - } + data_info = (data_info_t *)&hos_func->fs2_info.data_info; + atomic_add(&data_info->rx_pkts, 1); + atomic_add(&data_info->rx_bytes, stream_len); } - if (a_fd_context->cache == NULL) + if (handle->cache == NULL) { - //a_fd_context->cache = Aws::MakeShared<Aws::StringStream>("hos_write append mode"); - a_fd_context->cache = std::make_shared<Aws::StringStream>(); + handle->cache = std::make_shared<Aws::StringStream>(); } Aws::String buffer(stream, stream_len); - *a_fd_context->cache << buffer; - a_fd_context->cache_rest -= stream_len; - if (data_info != NULL) - data_info->cache[thread_id] += stream_len; - if (a_fd_context->cache_count == 0 || --a_fd_context->cache_count) + *handle->cache << buffer; + handle->cache_rest -= stream_len; + atomic_add(&data_info->cache, stream_len); + if (handle->cache_count == 0 || --handle->cache_count) { //cache_count == 0,不设置cache_count的情况 //cache_count > 0,设置cache_count的情况 - if (a_fd_context->cache_rest > 0) + if (handle->cache_rest > 0) { return HOS_CLIENT_OK; } } - request.SetBody(a_fd_context->cache); + request.SetBody(handle->cache); // add headers - atomic_add(&(a_fd_context->position), 1); - snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); + atomic_add(&(handle->position), 1); + snprintf(num, 128, "%lu", atomic_read(&(handle->position))); Aws::Map<Aws::String, Aws::String> headers; headers["x-hos-upload-type"] = "append"; headers["x-hos-position"] = num; request.SetMetadata(headers); - a_fd_context->cache->seekg(0, std::ios_base::end); - upload_len = a_fd_context->cache->tellg(); - a_fd_context->cache->seekg(0, std::ios_base::beg); + handle->cache->seekg(0, std::ios_base::end); + upload_len = handle->cache->tellg(); + handle->cache->seekg(0, std::ios_base::beg); MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: x-hos-posotion:%s", num); - request.SetBucket(a_fd_context->bucket); - request.SetKey(a_fd_context->object); + request.SetBucket(handle->bucket); + request.SetKey(handle->object); if (hos_conf->pool_thread_size > 0) { - ret = hos_putobject_async(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + ret = hos_putobject_async(handle, request, upload_len); } else { - ret = hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + ret = hos_putobject_sync(handle, request, upload_len); } //恢复fd 的cache设置 - if (a_fd_context->mode & APPEND_MODE) + if (handle->mode & APPEND_MODE) { - if (data_info) - data_info->cache[thread_id] -= upload_len; - a_fd_context->cache.reset(); - a_fd_context->cache = NULL; - a_fd_context->cache_rest = hos_conf->cache_size; - a_fd_context->cache_count = hos_conf->cache_count; + atomic_sub(&data_info->cache, upload_len); + handle->cache.reset(); + handle->cache = NULL; + handle->cache_rest = hos_conf->cache_size; + handle->cache_count = hos_conf->cache_count; } return ret; } -int hos_close_fd(size_t fd, size_t thread_id) +int hos_close_handle(hos_handle handle) { - hos_fd_context_t *a_fd_context = NULL; char num[128]; hos_config_t *hos_conf = &g_hos_handle.hos_config; size_t upload_len = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_handle.S3Client == NULL) { return HOS_INSTANCE_NOT_INIT; } - if (thread_id > hos_conf->thread_num) + if (handle == NULL) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", - "error:fd:%lu, thread_id:%lu, thread_sum:%u.", - fd, thread_id, hos_conf->thread_num); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + "error: handle is null."); return HOS_PARAMETER_ERROR; } - a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); - if (a_fd_context == NULL) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, - "hos_close_fd", "debug: not find the a_fd_context of [thread:%lu fd:%lu]", - thread_id, fd); - return HOS_CLIENT_OK; - } //close fd 之前发送append的缓存中内容 - if ((a_fd_context->mode & BUFF_MODE) && (a_fd_context->mode & APPEND_MODE)) + if ((handle->mode & BUFF_MODE) && (handle->mode & APPEND_MODE)) { - if (a_fd_context->cache_rest != (long)hos_conf->cache_size && a_fd_context->cache != NULL) + if (handle->cache_rest != (long)hos_conf->cache_size && handle->cache != NULL) { // Create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; - request.SetBucket(a_fd_context->bucket); - request.SetKey(a_fd_context->object); - request.SetBody(a_fd_context->cache); + request.SetBucket(handle->bucket); + request.SetKey(handle->object); + request.SetBody(handle->cache); // add headers - atomic_add(&(a_fd_context->position), 1); - snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); + atomic_add(&(handle->position), 1); + snprintf(num, 128, "%lu", atomic_read(&(handle->position))); Aws::Map<Aws::String, Aws::String> headers; headers["x-hos-upload-type"] = "append"; headers["x-hos-position"] = num; request.SetMetadata(headers); - a_fd_context->cache->seekg(0, std::ios_base::end); - upload_len = a_fd_context->cache->tellg(); - a_fd_context->cache->seekg(0, std::ios_base::beg); + handle->cache->seekg(0, std::ios_base::end); + upload_len = handle->cache->tellg(); + handle->cache->seekg(0, std::ios_base::beg); if (hos_conf->pool_thread_size > 0) { - hos_putobject_async(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + hos_putobject_async(handle, request, upload_len); } else { - hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + hos_putobject_sync(handle, request, upload_len); } - data_info_t *data_info = (data_info_t *)(g_hos_handle.hos_func.fs2_info.reserved); - if (data_info) - data_info->cache[thread_id] = 0; + atomic_set(&g_hos_handle.hos_func.fs2_info.data_info.cache, 0); } } - a_fd_context->fd_status = HOS_FD_INJECT; - a_fd_context->cache.reset(); - a_fd_context->cache = NULL; - a_fd_context->cache_rest = hos_conf->cache_size; - a_fd_context->cache_count = hos_conf->cache_count; + handle->handle_status = HOS_HANDLE_CANCEL; + handle->cache.reset(); + handle->cache = NULL; + handle->cache_rest = hos_conf->cache_size; + handle->cache_count = hos_conf->cache_count; if (hos_conf->pool_thread_size == 0) { //同步模式,立即释放fd - hos_delete_fd(fd, thread_id); + hos_destroy_handle(handle); } else { //异步APPEND 模式,判断是否可以释放 //异步其他模式,在PutObjectAsyncFinished出释放fd - if (a_fd_context->mode == (BUFF_MODE | APPEND_MODE) && a_fd_context->position == a_fd_context->recive_cnt) + if (handle->mode == (BUFF_MODE | APPEND_MODE) && handle->position == handle->recive_cnt) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", - a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); - hos_delete_fd(fd, thread_id); + "debug: [%s:%s] upload completed. handle delete", handle->bucket, handle->object); + hos_destroy_handle(handle); } } @@ -1057,30 +987,23 @@ int hos_shutdown_instance() { std::lock_guard<std::mutex> locker(m_client_lock); size_t i = 0; - hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; - size_t task_num = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_handle.S3Client == NULL) { return HOS_INSTANCE_NOT_INIT; } if (g_hos_handle.count > 0 && --g_hos_handle.count) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "debug: hos client count:%lu.", g_hos_handle.count); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: hos client count:%lu.", g_hos_handle.count); return HOS_CLIENT_OK; } //先等待所有的task完成 while(1) { - task_num = 0; - for (uint32_t i = 0; i < g_hos_handle.hos_config.thread_num; i++) - { - task_num += atomic_read(&g_hos_handle.task_num[i]); - } - if (task_num == 0) + if (atomic_read(&g_hos_handle.task_num) == 0) break; usleep(500 * 1000); } @@ -1104,27 +1027,6 @@ int hos_shutdown_instance() FS_stop(fs2_handle); *fs2_handle = NULL; } - if (hos_func->fs2_info.reserved) - { - data_info_t *data_info = (data_info_t *)hos_func->fs2_info.reserved; - if (data_info->rx_pkts) - free(data_info->rx_pkts); - if (data_info->rx_bytes) - free(data_info->rx_bytes); - if (data_info->tx_pkts) - free(data_info->tx_pkts); - if (data_info->tx_bytes) - free(data_info->tx_bytes); - if (data_info->tx_failed_bytes) - free(data_info->tx_failed_bytes); - if (data_info->tx_failed_pkts) - ; - free(data_info->tx_failed_pkts); - if (data_info->cache) - free(data_info->cache); - free(hos_func->fs2_info.reserved); - hos_func->fs2_info.reserved = NULL; - } if (hos_func->fs2_info.line_ids) { free(hos_func->fs2_info.line_ids); @@ -1140,35 +1042,8 @@ int hos_shutdown_instance() delete g_hos_handle.S3Client; g_hos_handle.S3Client = NULL; - if (g_hos_handle.task_num != NULL) - { - free(g_hos_handle.task_num); - g_hos_handle.task_num = NULL; - } - if (g_hos_handle.task_context != NULL) - { - free(g_hos_handle.task_context); - g_hos_handle.task_context = NULL; - } MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: delete s3client."); - if (g_fd_info) - { - free(g_fd_info); - g_fd_info = NULL; - } - - for (i = 0; i < hos_conf->thread_num; i++) - { - delete_all(&g_fd_context[i]); - } - - if (g_fd_context) - { - free(g_fd_context); - g_fd_context = NULL; - } - Aws::ShutdownAPI(g_options); MESA_destroy_runtime_log_handle(g_hos_handle.log); g_hos_handle.log = NULL; |
