diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 5 | ||||
| -rw-r--r-- | src/hos_client.cpp | 340 | ||||
| -rw-r--r-- | src/hos_common.h | 1 |
3 files changed, 217 insertions, 129 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 557d66ca..218e8400 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -6,6 +6,7 @@ include_directories(${CMAKE_INSTALL_PREFIX}/include/MESA) link_directories(${CMAKE_INSTALL_PREFIX}/lib) option(HOS_MOCK "If enabled, the SDK will be built using a MOCK .cpp file for S3." OFF) +option(HOS_MESA_LOG "If enabled, the SDK will be built using a MOCK .cpp file for S3." ON) file(GLOB HOS_HEADERS "*.h") file(GLOB HOS_SOURCE "*.cpp") @@ -15,6 +16,10 @@ if (HOS_MOCK) file(GLOB HOS_MOCK_SOURCE "mock/hos_mock.cpp") endif() +if (HOS_MESA_LOG) + add_definitions(-DHOS_MESA_LOG) +endif() + file(GLOB HOS_SRC ${HOS_SOURCE} ${HOS_HEADERS} diff --git a/src/hos_client.cpp b/src/hos_client.cpp index ace52c9f..2a5438b6 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -8,6 +8,7 @@ extern "C" #include <string.h> #include <sys/stat.h> #include <unistd.h> +#include <stdio.h> } #include <aws/s3/model/PutObjectRequest.h> #include <aws/s3/model/CreateBucketRequest.h> @@ -21,9 +22,26 @@ extern "C" #include "mock/hos_mock.h" #endif #include "hos_client.h" -#include "MESA_handle_logger.h" #include "MESA_prof_load.h" #include "hos_common.h" +#ifdef HOS_MESA_LOG +#include "MESA_handle_logger.h" +#else +#define RLOG_LV_DEBUG 10 +#define RLOG_LV_INFO 20 +#define RLOG_LV_FATAL 30 + +#define MESA_create_runtime_log_handle(path, lv) \ + (void *)fopen((path), "a+") +#define MESA_destroy_runtime_log_handle(handle) \ + fclose((FILE *)handle) +#define MESA_HANDLE_RUNTIME_LOG(handle, lv, mod, fmt, args...) \ + do{ \ + fprintf(((FILE *) handle), "line:%d, level:%d, module:%s, ", __LINE__, lv, mod); \ + fprintf(((FILE *) handle), fmt, ##args);\ + fprintf(((FILE *) handle), "\n");\ + } while (0) +#endif struct hos_instance_s g_hos_instance; hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle @@ -58,13 +76,30 @@ static size_t hash_get_min_free_fd(size_t thread_id) return 0; } -static int hos_delete_fd(size_t fd, size_t thread_id) +static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context) { - if (fd == 0) + if (context == NULL) { return HOS_PARAMETER_ERROR; } - delete_context_by_fd(&g_fd_context[thread_id], fd); + size_t fd = context->fd; + + if (context) + { + if (context->bucket) + { + free(context->bucket); + context->bucket = NULL; + } + if (context->object) + { + free(context->object); + context->object = NULL; + } + HASH_DEL(g_fd_context[thread_id], context); + free(context); + } + g_fd_info[thread_id][fd] = 0; g_fd_info[thread_id][HOS_FD_FREE]++; g_fd_info[thread_id][HOS_FD_INJECT]--; @@ -90,8 +125,8 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, } if (a_fd_context == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "Not find the info of [thread_id:%d fd:%d]", thread_id, fd); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + "error: Not find the info of [thread_id:%lu fd:%lu]", thread_id, fd); if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) { @@ -99,53 +134,55 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, data_info->tx_failed_pkts[thread_id]++; data_info->tx_failed_bytes[thread_id] += stream_len; } - return ; } - - bool result = outcome.IsSuccess(); - if (!result) + else { - error = outcome.GetError().GetMessage().c_str(); - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "[%s:%s] upload failed. error:%s",a_fd_context->bucket, a_fd_context->object, error); + bool result = outcome.IsSuccess(); + if (!result) + { + error = outcome.GetError().GetMessage().c_str(); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + "error: [%s:%s] upload failed. error:%s", a_fd_context->bucket, a_fd_context->object, error); - if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + { + data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + data_info->tx_failed_pkts[thread_id]++; + data_info->tx_failed_bytes[thread_id] += stream_len; + } + } + else { - data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; - data_info->tx_failed_pkts[thread_id]++; - data_info->tx_failed_bytes[thread_id] += stream_len; + if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + { + data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + data_info->tx_pkts[thread_id]++; + data_info->tx_bytes[thread_id] += stream_len; + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload success. tx_pkts:%lu, tx_bytes:%lu", + a_fd_context->bucket, a_fd_context->object, + data_info->tx_pkts[thread_id], data_info->tx_bytes[thread_id]); + } + else + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload success. stream size:%lu", a_fd_context->bucket, a_fd_context->object, stream_len); + } } - } - else - { - if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + put_finished_callback callback = (put_finished_callback)a_fd_context->callback; + callback(result, a_fd_context->bucket, a_fd_context->object, error, a_fd_context->userdata); + if (a_fd_context->mode & APPEND_MODE) { - data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; - data_info->tx_pkts[thread_id]++; - data_info->tx_bytes[thread_id] += stream_len; - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "[%s:%s] upload success. tx_pkts:%d, tx_bytes:%d", - a_fd_context->bucket, a_fd_context->object, - data_info->tx_pkts[thread_id], data_info->tx_bytes[thread_id]); + //APPEND MODE 保留fd + atomic_add(&(a_fd_context->recive_cnt), 1); } else { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "[%s:%s] upload success. stream size:%d", a_fd_context->bucket, a_fd_context->object, stream_len); + //完整上传 删除fd + hos_close_fd(fd, thread_id); } } - put_finished_callback callback = (put_finished_callback)a_fd_context->callback; - callback(result, a_fd_context->bucket, a_fd_context->object, error, a_fd_context->userdata); - if (a_fd_context->mode & APPEND_MODE) - { - //APPEND MODE 保留fd - atomic_add(&(a_fd_context->recive_cnt), 1); - } - else - { - //完整上传 删除fd - hos_close_fd(fd, thread_id); - } + g_hos_handle.task_num[thread_id]--; } static void hos_client_create() @@ -174,7 +211,8 @@ static void hos_client_create() if (hos_conf->pool_thread_size > 0) { //异步模式 - config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池 + //config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池 + config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池 } else { @@ -197,13 +235,14 @@ static void hos_client_create() g_hos_instance.error_code = (size_t)outcome.GetError().GetErrorType() + 1; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, outcome.GetError().GetMessage().c_str()); g_hos_instance.result = false; - MESA_handle_runtime_log(log, RLOG_LV_FATAL, "hos_client_create", g_hos_instance.error_message); + MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_FATAL, "hos_client_create", "error: %s", g_hos_instance.error_message); return; } g_hos_handle.buckets = outcome.GetResult().GetBuckets(); g_hos_handle.count++; g_hos_handle.executor = std::dynamic_pointer_cast<Aws::Utils::Threading::PooledThreadExecutor>(config.executor); + g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); g_fd_context = (hos_fd_context_t **)calloc(hos_conf->thread_num, sizeof(hos_fd_context_t *)); g_fd_info = (size_t (*)[MAX_HOS_CLIENT_FD_NUM + 1])calloc(hos_conf->thread_num, sizeof(size_t [MAX_HOS_CLIENT_FD_NUM + 1])); @@ -213,7 +252,13 @@ static void hos_client_create() g_fd_info[i][0] = 65533; } - MESA_handle_runtime_log(log, RLOG_LV_DEBUG, "hos_client_create", "hos s3client create success, url:%s.",endpoint); + if (g_hos_handle.hos_func.fd_thread == 0) + { + g_hos_handle.hos_func.fd_thread_status = 0; + pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL); + } + + MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_DEBUG, "hos_client_create", "debug: hos s3client create success, url:%s.",endpoint); g_hos_instance.result = true; } @@ -221,15 +266,15 @@ bool hos_verify_bucket(const char *bucket) { if (bucket == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "bucket is null"); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: bucket is null"); return false; } if (g_hos_instance.result != true || g_hos_handle.S3Client == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "g_hos_instance.result:%s, g_hos_handle.S3Client:%s", - g_hos_instance.result, (g_hos_handle.S3Client==NULL)?(NULL):("not null")); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: g_hos_instance.result:%d, g_hos_handle.S3Client:%s", + g_hos_instance.result, (g_hos_handle.S3Client==NULL)?("null"):("not null")); return false; } Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); @@ -242,14 +287,14 @@ bool hos_verify_bucket(const char *bucket) { if (strcmp(new_bucket.GetName().c_str(), bucket) == 0) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","bucket:%s exits", bucket); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: bucket:%s exits", bucket); return true; } } } else { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_verify_bucket","error:%s", outcome.GetError().GetMessage().c_str()); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_verify_bucket","error:%s", outcome.GetError().GetMessage().c_str()); } return false; } @@ -336,8 +381,8 @@ static void *fs2_statistics(void *ptr) FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[1], FS_OP_SET, rx_bytes_interval); FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[2], FS_OP_SET, tx_pkts_interval); FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[3], FS_OP_SET, tx_bytes_interval); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[5], FS_OP_SET, tx_failed_pkts_interval); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[4], FS_OP_SET, tx_failed_bytes_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[4], FS_OP_SET, tx_failed_pkts_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[5], FS_OP_SET, tx_failed_bytes_interval); FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[6], FS_OP_SET, cache_interval); FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[0], FS_OP_SET, rx_pkts_sum); @@ -492,16 +537,17 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); if (ret) { + g_hos_handle.task_num[thread_id]++; //不算真正成功,需要等到PutObjectAsyncFinished的结果 - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "PutObjectAsync success. [%s:%s]", bucket, object); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: PutObjectAsync success. [%s:%s]", bucket, object); return HOS_CLIENT_OK; } else { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "PutObjectAsync failed. [%s:%s]", bucket, object); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: PutObjectAsync failed. [%s:%s]", bucket, object); if (hos_func->fs2_info[0].fs2_handle) { @@ -531,22 +577,22 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; data_info->tx_pkts[thread_id]++; data_info->tx_bytes[thread_id] += stream_len; - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "PutObject success. [%s:%s] tx_pkts:%d, tx_bytes:%d", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: PutObject success. [%s:%s] tx_pkts:%lu, tx_bytes:%lu", bucket, object, data_info->tx_pkts[thread_id], data_info->tx_bytes[thread_id]); } else { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "PutObject success. [%s:%s]", bucket, object); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: PutObject success. [%s:%s]", bucket, object); } return HOS_CLIENT_OK; } else { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "PutObject failed. [%s:%s] cause:%s", bucket, object, Outcome.GetError().GetMessage().c_str()); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: PutObject failed. [%s:%s] cause:%s", bucket, object, Outcome.GetError().GetMessage().c_str()); if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) { @@ -618,21 +664,21 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t { if(hos_verify_bucket(bucket) == false) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "bucket:%s not exist.", bucket); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: bucket:%s not exist.", bucket); hos_shutdown_instance(); g_hos_instance.result = false; g_hos_instance.error_code = HOS_BUCKET_NOT_EXIST; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "bucket:%s not exits.", bucket); return &g_hos_instance; } - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "Instance init completed"); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug:%s","Instance init completed"); if (hos_conf->fs2_ip && hos_conf->fs2_port) { hos_expand_fs2(); } else { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "hos fs2 function not starup"); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,"error: hos fs2 function not starup"); } g_hos_instance.error_code = 0; g_hos_instance.error_message[0]='\0'; @@ -656,7 +702,7 @@ int hos_create_bucket(const char *bucket) { if ((bucket == NULL) || (g_hos_handle.S3Client == NULL)) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_create_bucket", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_create_bucket", "error:bucket:%s, s3client:%s", bucket, g_hos_handle.S3Client?"not null":"null"); return HOS_PARAMETER_ERROR; } @@ -667,7 +713,7 @@ int hos_create_bucket(const char *bucket) { if (strcmp(new_bucket.GetName().c_str(), bucket) == 0) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "%s was exits", bucket); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: %s was exits", bucket); return HOS_CLIENT_OK; } } @@ -682,12 +728,12 @@ int hos_create_bucket(const char *bucket) Aws::S3::S3Errors errorcode = createBucketOutcome.GetError().GetErrorType(); if (errorcode != Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,"error: %s create failed. %s", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,"error: %s create failed. %s", bucket, createBucketOutcome.GetError().GetMessage().c_str()); return (int)errorcode + 1; } } - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "%s create successful", bucket); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: %s create successful", bucket); return HOS_CLIENT_OK; } @@ -703,13 +749,13 @@ static int hos_upload_stream(const char *bucket, const char *object, const char if ((g_hos_handle.S3Client == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > hos_conf->thread_num)) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_stream", - "s3client:%s, bucket:%s, object:%s, thread_id:%d, thread_num:%d", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_stream", + "error: s3client:%s, bucket:%s, object:%s, thread_id:%lu, thread_num:%u", g_hos_handle.S3Client?"not null":"null", bucket, object, thread_id, hos_conf->thread_num); return HOS_PARAMETER_ERROR; } - mode = data?1:0; // 1, file mode; 0 buf mode + mode = data?1:0; // 0, file mode; 1 buf mode // Create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; @@ -776,7 +822,7 @@ int hos_upload_file(const char *bucket, const char *file_path, put_finished_call struct stat buffer; if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null")); return HOS_INSTANCE_NOT_INIT; @@ -784,15 +830,15 @@ int hos_upload_file(const char *bucket, const char *file_path, put_finished_call if ((bucket == NULL) || (file_path == NULL) || (thread_id > g_hos_handle.hos_config.thread_num)) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_file", - "s3client:%s, bucket:%s, file_path:%s, thread_id:%d, thread_num:%d", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_file", + "error: bucket:%s, file_path:%s, thread_id:%lu, thread_num:%u", bucket, file_path, thread_id, g_hos_handle.hos_config.thread_num); return HOS_PARAMETER_ERROR; } if (stat(file_path, &buffer) == -1) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_file", "The file:%s not exist", file_path); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_file", "error: The file:%s not exist", file_path); return HOS_FILE_NOT_EXIST; } return hos_upload_stream(bucket, file_path, NULL, buffer.st_size, callback, userdata, thread_id); @@ -802,7 +848,7 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size { if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null")); return HOS_INSTANCE_NOT_INIT; @@ -811,8 +857,8 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size if ((bucket == NULL) || (object == NULL) || (buf == NULL) || (buf_len == 0) || (thread_id > g_hos_handle.hos_config.thread_num)) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_buf", - "bucket:%s, object:%s, buf:%s, buf_len:%d, thread_id:%d, thread_num:%d", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_buf", + "bucket:%s, object:%s, buf:%s, buf_len:%lu, thread_id:%lu, thread_num:%u", bucket, object, buf?"not null":"null", buf_len, thread_id, g_hos_handle.hos_config.thread_num); return HOS_PARAMETER_ERROR; } @@ -824,35 +870,39 @@ static void *hos_fd_manage(void *ptr) hos_fd_context_t *a_fd_context; size_t thread_sum = g_hos_handle.hos_config.thread_num; size_t thread_num; - size_t fd; + //size_t fd; while(1) { if (g_hos_handle.hos_func.fd_thread_status) break; for (thread_num = 0; thread_num < thread_sum; thread_num++) { - for(fd = 3; fd < MAX_HOS_CLIENT_FD_NUM + 1; fd++) - { - if (!g_fd_info[thread_num][fd]) - continue; +#if 0 a_fd_context = find_context_by_fd(g_fd_context[thread_num], fd); if (!a_fd_context) continue; +#endif + hos_fd_context_t *tmp = NULL; + HASH_ITER(hh, g_fd_context[thread_num], a_fd_context, tmp) + { + if (!a_fd_context) + break; + if (a_fd_context->fd_status == HOS_FD_INJECT) { if (a_fd_context->position == a_fd_context->recive_cnt) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "[%s:%s] upload completed. [thread:%d fd:%d] delete", - a_fd_context->bucket, a_fd_context->object, thread_num, fd); - hos_delete_fd(fd, thread_num); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", + a_fd_context->bucket, a_fd_context->object, thread_num, a_fd_context->fd); + hos_delete_fd(thread_num, a_fd_context); } else if (a_fd_context->overtime <= get_current_ms()) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "[%s:%s] upload not completed, but the live-time of [thread_id:%d fd:%d] is over.", - a_fd_context->bucket, a_fd_context->object, thread_num, fd); - hos_delete_fd(fd, thread_num); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + "error: [%s:%s] upload not completed, but the live-time of [thread_id:%lu fd:%lu] is over.", + a_fd_context->bucket, a_fd_context->object, thread_num, a_fd_context->fd); + hos_delete_fd(thread_num, a_fd_context); } } } @@ -866,15 +916,15 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca { if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", g_hos_instance.result, (g_hos_handle.S3Client == NULL)?("null"):("not null")); return HOS_INSTANCE_NOT_INIT; } if ((bucket == NULL) || (object == NULL) || (thread_id > g_hos_handle.hos_config.thread_num) || strlen(bucket) == 0 || strlen(object) == 0) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", - "bucket:%s, obejct:%s, thread_id:%d", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", + "error: bucket:%s, obejct:%s, thread_id:%lu", //(bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id); bucket, object, thread_id); return HOS_PARAMETER_ERROR; @@ -883,16 +933,16 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca size_t fd = hash_get_min_free_fd(thread_id); if (fd == 0) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", - "error:fd not enough, thread_id:%d, fd free: %d, fd register:%d, fd inject:%d", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", + "error:fd not enough, thread_id:%lu, fd free: %lu, fd register:%lu, fd inject:%lu", thread_id, g_fd_info[thread_id][HOS_FD_FREE], g_fd_info[thread_id][HOS_FD_REGISTER], g_fd_info[thread_id][HOS_FD_INJECT]); - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "thread_id:%d, fd:%d", thread_id, fd); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd); return HOS_FD_NOT_ENOUGH; } - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "thread_id:%d, fd:%d", thread_id, fd); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd); hos_fd_context_t info = {fd, mode, (char *)bucket, (char *)object, (void *)callback, userdata, NULL,/*cache*/ g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/ @@ -900,6 +950,7 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca 0,/*overtime*/ g_hos_handle.hos_config.timeout,}; add_fd_context(&g_fd_context[thread_id], &info); +#if 0 { std::lock_guard<std::mutex> locker(m_client_lock); if (g_hos_handle.hos_func.fd_thread == 0) @@ -908,6 +959,7 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL); } } +#endif return fd; } @@ -925,7 +977,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null")); return HOS_INSTANCE_NOT_INIT; @@ -933,8 +985,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id if ((fd < 3) || fd > MAX_HOS_CLIENT_FD_NUM || (stream == NULL) || (thread_id > hos_conf->thread_num)) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, - "hos_write", "error: fd:%d, stream:%s, stream_len:%d, thread_id:%d.", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, + "hos_write", "error: fd:%lu, stream:%s, stream_len:%lu, thread_id:%lu.", fd, stream?"not null":"null", stream_len, thread_id); return HOS_PARAMETER_ERROR; } @@ -945,22 +997,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id } if (a_fd_context == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "fd info not find. thread_id:%d, fd:%d", thread_id, fd); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fd info not find. thread_id:%lu, fd:%lu", thread_id, fd); return HOS_HASH_NOT_FIND; } - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "Get fd_context, thread_id:%d, fd:%d", thread_id, fd); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: Get fd_context, thread_id:%lu, fd:%lu", thread_id, fd); - //field_stat2 record - if (hos_func->fs2_info[0].fs2_handle) - { - if (hos_func->fs2_info[0].reserved) - { - data_info = (data_info_t *)hos_func->fs2_info[0].reserved; - data_info->rx_pkts[thread_id]++; - data_info->rx_bytes[thread_id] += stream_len; - } - } - // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; @@ -968,6 +1009,17 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id if (a_fd_context->mode & BUFF_MODE) { //BUFF_MODE + + //field_stat2 record + if (hos_func->fs2_info[0].fs2_handle) + { + if (hos_func->fs2_info[0].reserved) + { + data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + data_info->rx_pkts[thread_id]++; + data_info->rx_bytes[thread_id] += stream_len; + } + } if (a_fd_context->mode & APPEND_MODE) { //APPEND_MODE @@ -1002,7 +1054,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id a_fd_context->cache->seekg(0, std::ios_base::end); upload_len = a_fd_context->cache->tellg(); a_fd_context->cache->seekg(0, std::ios_base::beg); - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "x-hos-posotion:%s", num); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: x-hos-posotion:%s", num); } else { @@ -1018,14 +1070,24 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id { if (stat(stream, &buffer) == -1) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "The file:%s not exist", stream); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: The file:%s not exist", stream); return HOS_FILE_NOT_EXIST; } //文件类型 const std::shared_ptr<Aws::IOStream> input_data = - Aws::MakeShared<Aws::FStream>("hos_write file mode", a_fd_context->object, std::ios_base::in | std::ios_base::binary); + Aws::MakeShared<Aws::FStream>("hos_write file mode", stream, std::ios_base::in | std::ios_base::binary); request.SetBody(input_data); upload_len = buffer.st_size; + //field_stat2 record + if (hos_func->fs2_info[0].fs2_handle) + { + if (hos_func->fs2_info[0].reserved) + { + data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + data_info->rx_pkts[thread_id]++; + data_info->rx_bytes[thread_id] += upload_len; + } + } } request.SetBucket(a_fd_context->bucket); @@ -1062,16 +1124,16 @@ int hos_close_fd(size_t fd, size_t thread_id) if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", - g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null")); + g_hos_instance.result, (g_hos_handle.S3Client == NULL)?("null"):("not null")); return HOS_INSTANCE_NOT_INIT; } if (fd < 3 || fd > 65533 || thread_id > hos_conf->thread_num) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", - "error:fd:%d, thread_id:%d, thread_sum:%d.", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", + "error:fd:%lu, thread_id:%lu, thread_sum:%u.", fd, thread_id, hos_conf->thread_num); return HOS_PARAMETER_ERROR; } @@ -1081,8 +1143,8 @@ int hos_close_fd(size_t fd, size_t thread_id) } if (a_fd_context == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, - "hos_close_fd", "not find the a_fd_context of [fd:%d thread:%d]", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, + "hos_close_fd", "debug: not find the a_fd_context of [fd:%lu thread:%lu]", fd, thread_id); return HOS_CLIENT_OK; } @@ -1117,6 +1179,7 @@ int hos_close_fd(size_t fd, size_t thread_id) { hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); } + ((data_info_t *)(g_hos_handle.hos_func.fs2_info->reserved))->cache[thread_id] = 0; } } a_fd_context->fd_status = HOS_FD_INJECT; @@ -1138,19 +1201,33 @@ int hos_shutdown_instance() size_t i = 0; hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; + size_t task_num = 0; if (g_hos_handle.S3Client == NULL) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "There is no hos client."); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "debug: There is no hos client."); return HOS_CLIENT_OK; } if (g_hos_handle.count > 0 && --g_hos_handle.count) { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "hos client count:%d.", g_hos_handle.count); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "debug: hos client count:%lu.", g_hos_handle.count); return HOS_CLIENT_OK; } + //先等待所有的task完成 + while(1) + { + task_num = 0; + for (uint32_t i = 0; i < g_hos_handle.hos_config.thread_num; i++) + { + task_num += g_hos_handle.task_num[i]; + } + if (task_num == 0) + break; + usleep(500 * 1000); + } + Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets); if (hos_func->fd_thread) @@ -1208,7 +1285,12 @@ int hos_shutdown_instance() delete g_hos_handle.S3Client; g_hos_handle.S3Client = NULL; - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "delete s3client."); + if (g_hos_handle.task_num != NULL) + { + free(g_hos_handle.task_num); + g_hos_handle.task_num = NULL; + } + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: delete s3client."); if (g_fd_info) { diff --git a/src/hos_common.h b/src/hos_common.h index 85d415e8..9c9db6a0 100644 --- a/src/hos_common.h +++ b/src/hos_common.h @@ -97,6 +97,7 @@ typedef struct hos_client_handle_s hos_config_t hos_config; hos_func_thread_t hos_func; void *log; + size_t *task_num; }hos_client_handle_t; extern struct hos_instance_s g_hos_instance; |
