| field | value | date |
|---|---|---|
| author | 彭宣正 <[email protected]> | 2021-06-24 16:26:39 +0800 |
| committer | 彭宣正 <[email protected]> | 2021-06-29 18:12:16 +0800 |
| commit | 696fcecb7c2a4d0bb5f908e3c4fd8b77ba830b0d (patch) | |
| tree | 77d1e4204ed3cfbf96760c18e3d1c44194a27e33 /src | |
| parent | 8cb339003a3e7aeb0e85ee51ac1341900219ae86 (diff) | |
✨ feat(gtest & src): rework the fd management flow, TSG-6760v2.0.0
Diffstat (limited to 'src')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/hos_client.cpp | 372 |
| -rw-r--r-- | src/hos_common.h | 8 |
| -rw-r--r-- | src/hos_hash.cpp | 2 |
| -rw-r--r-- | src/hos_hash.h | 7 |
4 files changed, 135 insertions, 254 deletions
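
The fd-management rework shows up in the diff below as three related changes: the fixed per-thread fd table `g_fd_info[thread_id][0..MAX_HOS_CLIENT_FD_NUM]`, the free-slot scan `hash_get_min_free_fd()`, and the background sweeper thread `hos_fd_manage()` are all removed; each thread instead takes fds from its own monotonically increasing counter (`size_t *g_fd_info`, starting at 1), and the context is released directly in `PutObjectAsyncFinished()` or `hos_close_fd()` once every submitted chunk has been acknowledged. The sketch that follows only illustrates that scheme and is not code from this repository: it substitutes `std::unordered_map` for the uthash table behind `g_fd_context`, and the names `ThreadFdState`, `open_fd`, and `maybe_release` are invented for the example.

```cpp
// Illustration only: the per-thread fd scheme after this commit, with
// std::unordered_map standing in for the uthash table used by the real code.
#include <cstdio>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

struct FdContext {
    size_t fd = 0;
    std::string bucket;
    std::string object;
    size_t position = 0;    // chunks handed to the uploader
    size_t recive_cnt = 0;  // chunks acknowledged by the async callback
    bool injected = false;  // set when the caller closes the fd
};

struct ThreadFdState {
    size_t next_fd = 0;                              // plays the role of g_fd_info[thread_id]
    std::unordered_map<size_t, FdContext> contexts;  // plays the role of g_fd_context[thread_id]
};

// Replaces the old hash_get_min_free_fd(): no free-slot scan, just a counter.
size_t open_fd(ThreadFdState &st, std::string bucket, std::string object) {
    size_t fd = ++st.next_fd;  // fds start at 1 and are never reused
    FdContext ctx;
    ctx.fd = fd;
    ctx.bucket = std::move(bucket);
    ctx.object = std::move(object);
    st.contexts.emplace(fd, std::move(ctx));
    return fd;
}

// Replaces the old hos_fd_manage() sweeper thread: the context is dropped as
// soon as the last submitted chunk has been acknowledged after close.
void maybe_release(ThreadFdState &st, size_t fd) {
    auto it = st.contexts.find(fd);
    if (it == st.contexts.end())
        return;
    FdContext &ctx = it->second;
    if (ctx.injected && ctx.position == ctx.recive_cnt)
        st.contexts.erase(it);
}

int main() {
    std::vector<ThreadFdState> per_thread(4);   // one independent state per worker thread
    size_t fd = open_fd(per_thread[0], "bucket", "object");
    per_thread[0].contexts[fd].position = 1;    // one chunk submitted
    per_thread[0].contexts[fd].injected = true; // fd closed by the caller
    per_thread[0].contexts[fd].recive_cnt = 1;  // callback confirmed the chunk
    maybe_release(per_thread[0], fd);
    std::printf("contexts still tracked: %zu\n", per_thread[0].contexts.size());
    return 0;
}
```

Because each thread owns its own counter and its own table, the allocation path needs no lock, which is presumably why the dedicated fd-manager thread and its timeout bookkeeping (`overtime`, `timeout`) could be dropped.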
diff --git a/src/hos_client.cpp b/src/hos_client.cpp
index 2a5438b6..89871232 100644
--- a/src/hos_client.cpp
+++ b/src/hos_client.cpp
@@ -47,11 +47,9 @@ struct hos_instance_s g_hos_instance;
 hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle
 static std::mutex m_client_lock;
 hos_fd_context_t **g_fd_context;
-size_t (*g_fd_info)[MAX_HOS_CLIENT_FD_NUM + 1]; //fd 实际从3开始, fd[thread_id][0]记录register的fd,fd[thread_id][1]记录inject的fd
+size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd
 static Aws::SDKOptions g_options;
 
-static void *hos_fd_manage(void *ptr);
-
 static inline size_t get_current_ms()
 {
     struct timespec timenow;
@@ -59,30 +57,12 @@ static inline size_t get_current_ms()
     return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 );
 }
 
-static size_t hash_get_min_free_fd(size_t thread_id)
-{
-    size_t i = 0;
-    for (i = 3; i < MAX_HOS_CLIENT_FD_NUM + 1; i++)
-    {
-        if (!g_fd_info[thread_id][i])
-        {
-            g_fd_info[thread_id][i] = 1;
-            g_fd_info[thread_id][HOS_FD_REGISTER]++;
-            g_fd_info[thread_id][HOS_FD_FREE]--;
-
-            return i;
-        }
-    }
-    return 0;
-}
-
 static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context)
 {
     if (context == NULL)
     {
         return HOS_PARAMETER_ERROR;
     }
-    size_t fd = context->fd;
 
     if (context)
     {
@@ -99,10 +79,6 @@ static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context)
         HASH_DEL(g_fd_context[thread_id], context);
         free(context);
     }
-
-    g_fd_info[thread_id][fd] = 0;
-    g_fd_info[thread_id][HOS_FD_FREE]++;
-    g_fd_info[thread_id][HOS_FD_INJECT]--;
 
     return HOS_CLIENT_OK;
 }
@@ -118,19 +94,17 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
     data_info_t *data_info = NULL;
     const Aws::String& uuid = context->GetUUID();
     size_t thread_id, fd, stream_len;
+
     sscanf(uuid.c_str(), "%lu %lu %lu", &thread_id, &fd, &stream_len);
 
-    if (g_fd_info[thread_id][fd])
-    {
-        a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd);
-    }
+    a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd);
     if (a_fd_context == NULL)
     {
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
             "error: Not find the info of [thread_id:%lu fd:%lu]", thread_id, fd);
-        if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved)
+        if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved)
         {
-            data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved;
+            data_info = (data_info_t *)hos_func->fs2_info.reserved;
             data_info->tx_failed_pkts[thread_id]++;
             data_info->tx_failed_bytes[thread_id] += stream_len;
         }
@@ -144,18 +118,18 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
             "error: [%s:%s] upload failed. error:%s", a_fd_context->bucket, a_fd_context->object, error);
-        if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved)
+        if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved)
         {
-            data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved;
+            data_info = (data_info_t *)hos_func->fs2_info.reserved;
             data_info->tx_failed_pkts[thread_id]++;
             data_info->tx_failed_bytes[thread_id] += stream_len;
         }
     }
     else
     {
-        if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved)
+        if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved)
         {
-            data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved;
+            data_info = (data_info_t *)hos_func->fs2_info.reserved;
             data_info->tx_pkts[thread_id]++;
             data_info->tx_bytes[thread_id] += stream_len;
             MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
@@ -175,14 +149,28 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
         { //APPEND MODE 保留fd
             atomic_add(&(a_fd_context->recive_cnt), 1);
+            if (a_fd_context->fd_status == HOS_FD_INJECT)
+            {
+                if (a_fd_context->position == a_fd_context->recive_cnt)
+                {
+                    MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
+                        "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete",
+                        a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd);
+                    hos_delete_fd(thread_id, a_fd_context);
+                }
+            }
         }
         else
         { //完整上传 删除fd
-            hos_close_fd(fd, thread_id);
+            MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
+                "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete",
+                a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd);
+            hos_delete_fd(thread_id, a_fd_context);
         }
     }
     g_hos_handle.task_num[thread_id]--;
+    g_hos_handle.task_context[thread_id]--;
 }
 
 static void hos_client_create()
@@ -243,20 +231,18 @@ static void hos_client_create()
     g_hos_handle.count++;
     g_hos_handle.executor = std::dynamic_pointer_cast<Aws::Utils::Threading::PooledThreadExecutor>(config.executor);
     g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
+    g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
     g_fd_context = (hos_fd_context_t **)calloc(hos_conf->thread_num, sizeof(hos_fd_context_t *));
-    g_fd_info = (size_t (*)[MAX_HOS_CLIENT_FD_NUM + 1])calloc(hos_conf->thread_num, sizeof(size_t [MAX_HOS_CLIENT_FD_NUM + 1]));
-
-    for (size_t i = 0; i < hos_conf->thread_num; i++)
-    {
-        g_fd_info[i][0] = 65533;
-    }
+    g_fd_info = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
+
+    #if 0
     if (g_hos_handle.hos_func.fd_thread == 0)
     {
         g_hos_handle.hos_func.fd_thread_status = 0;
         pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL);
     }
+    #endif
     MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_DEBUG, "hos_client_create", "debug: hos s3client create success, url:%s.",endpoint);
     g_hos_instance.result = true;
@@ -272,9 +258,6 @@ bool hos_verify_bucket(const char *bucket)
     }
     if (g_hos_instance.result != true || g_hos_handle.S3Client == NULL)
     {
-        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
-            "debug: g_hos_instance.result:%d, g_hos_handle.S3Client:%s",
-            g_hos_instance.result, (g_hos_handle.S3Client==NULL)?("null"):("not null"));
         return false;
     }
     Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets();
@@ -324,14 +307,10 @@ static void *fs2_statistics(void *ptr)
     size_t tx_failed_pkts_last = 0;
     size_t cache_last = 0;
     fs2_info_t *fs2_info = NULL;
-    int PoolThread_state[3] = {0, 0, 0};//{PoolSize, Busy, TopBusy}
-    int *busy = &PoolThread_state[1];
-    int *top_busy = &PoolThread_state[2];
-    int pool_history_sum = 0;
     hos_config_t *hos_conf = &g_hos_handle.hos_config;
     hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
+    size_t task_num = 0;
 
-    PoolThread_state[0] = hos_conf->pool_thread_size;
     while(1)
     {
         if (hos_func->fs2_status == HOS_FS2_STOP)
@@ -348,7 +327,7 @@ static void *fs2_statistics(void *ptr)
         tx_failed_pkts_sum = 0;
         cache_sum = 0;
 
-        fs2_info = &hos_func->fs2_info[0];
+        fs2_info = &hos_func->fs2_info;
         data_info_t *data_info = (data_info_t *)fs2_info->reserved;
         for (i = 0; i < hos_conf->thread_num; i++)
         {
@@ -359,6 +338,8 @@ static void *fs2_statistics(void *ptr)
             tx_failed_bytes_sum += data_info->tx_failed_bytes[i];
             tx_failed_pkts_sum += data_info->tx_failed_pkts[i];
             cache_sum += data_info->cache[i];
+
+            task_num += g_hos_handle.task_num[i];
         }
 
         rx_pkts_interval = rx_pkts_sum - rx_pkts_last;
@@ -393,20 +374,6 @@ static void *fs2_statistics(void *ptr)
         FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[5], FS_OP_SET, tx_failed_bytes_sum);
         FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[6], FS_OP_SET, cache_sum);
 
-        //PoolThread State
-        if (hos_conf->pool_thread_size > 0)
-        {
-            *busy = g_hos_handle.executor->GetTaskSize();
-            *top_busy = (*busy) > (*top_busy) ? (*busy) : (*top_busy);
-            pool_history_sum += *busy;
-
-            fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
-            for (i = 0; i < 3; i++)
-            {
-                FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[i], FS_OP_SET, PoolThread_state[i]);
-            }
-        }
-
         sleep(1);
     }
     pthread_exit(NULL);
@@ -458,7 +425,7 @@ static void hos_expand_fs2()
     hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
     size_t i = 0;
 
-    if (hos_func->fs2_info[0].fs2_handle)
+    if (hos_func->fs2_info.fs2_handle)
         return;
     //data info
     /**********************************************************************************************************
@@ -466,9 +433,9 @@ static void hos_expand_fs2()
     * current 10 100 1 100 0 0 100
     * total 100 1000 10 1000 0 0 100(无实意)
     ***********************************************************************************************************/
-    fs2_info = &hos_func->fs2_info[FS2_DATA_FLOW_STATE];
-    hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle = hos_init_fs2((char *)"hos-data", strlen("hos-data"));
-    fs2_handle = hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle;
+    fs2_info = &hos_func->fs2_info;
+    hos_func->fs2_info.fs2_handle = hos_init_fs2((char *)"hos-data", strlen("hos-data"));
+    fs2_handle = hos_func->fs2_info.fs2_handle;
     fs2_info->line_ids = (int *)calloc(2, sizeof(int));
     fs2_info->column_ids = (int *)calloc(7, sizeof(int));
 
@@ -492,29 +459,6 @@ static void hos_expand_fs2()
     data_info->cache = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
 
     FS_start(fs2_handle);
-    if (hos_conf->pool_thread_size > 0)
-    {
-        //PoolThread state
-        /*******************************************************
-        *           PoolSize Busy TopBusy AveBusy
-        * ThreadNum 1000     500  800     650
-        ********************************************************/
-        fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
-        hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle = hos_init_fs2((char *)"hos-poolthread", strlen("hos-poolthread"));
-        fs2_handle = hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle;
-        fs2_info->line_ids = (int *)calloc(1, sizeof(int));
-        fs2_info->column_ids = (int *)calloc(3, sizeof(int));
-
-        const char *poolthread_col[3] = {"PoolSize", "Busy", "TopBusy"};
-        for (i = 0; i < sizeof(poolthread_col) / sizeof(const char *); i++)
-        {
-            fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]);
-        }
-        fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum");
-
-        FS_start(fs2_handle);
-    }
-
     pthread_create(&hos_func->fs2_thread, NULL, fs2_statistics, NULL);
 
     return ;
@@ -527,17 +471,31 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t
     int ret = 0;
     hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
     data_info_t *data_info = NULL;
+    hos_config_t *hos_conf = &g_hos_handle.hos_config;
+
     //设置回调函数
     std::shared_ptr<Aws::Client::AsyncCallerContext> context = Aws::MakeShared<Aws::Client::AsyncCallerContext>("");
     sprintf(buf, "%lu %lu %lu", thread_id, fd, stream_len);
     context->SetUUID(buf);
 
+    if (hos_conf->max_request_num && hos_conf->max_request_context &&
+        (g_hos_handle.task_num[thread_id] >= hos_conf->max_request_num ||
+        g_hos_handle.task_context[thread_id] >= hos_conf->max_request_context))
+    {
+        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
+            "debug: PutObjectAsync failed. [%s:%s]. task_num:%lu, task_context:%lu",
+            bucket, object, g_hos_handle.task_num[thread_id], g_hos_handle.task_context[thread_id]);
+
+        return HOS_SEND_FAILED;
+    }
+
     auto &S3Client = *(g_hos_handle.S3Client);
     ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
     if (ret)
     {
         g_hos_handle.task_num[thread_id]++;
+        g_hos_handle.task_context[thread_id] += stream_len;
         //不算真正成功,需要等到PutObjectAsyncFinished的结果
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
             "debug: PutObjectAsync success. [%s:%s]", bucket, object);
@@ -549,11 +507,11 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
             "debug: PutObjectAsync failed. [%s:%s]", bucket, object);
-        if (hos_func->fs2_info[0].fs2_handle)
+        if (hos_func->fs2_info.fs2_handle)
         {
-            if (hos_func->fs2_info[0].reserved)
+            if (hos_func->fs2_info.reserved)
             {
-                data_info = (data_info_t *)hos_func->fs2_info[0].reserved;
+                data_info = (data_info_t *)hos_func->fs2_info.reserved;
                 data_info->tx_failed_pkts[thread_id]++;
                 data_info->tx_failed_bytes[thread_id] += stream_len;
             }
@@ -572,9 +530,9 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t
     Aws::S3::Model::PutObjectOutcome Outcome = S3Client.PutObject(request);
     if (Outcome.IsSuccess())
     {
-        if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved)
+        if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved)
         {
-            data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved;
+            data_info = (data_info_t *)hos_func->fs2_info.reserved;
             data_info->tx_pkts[thread_id]++;
             data_info->tx_bytes[thread_id] += stream_len;
             MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
@@ -594,9 +552,9 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
             "debug: PutObject failed. [%s:%s] cause:%s", bucket, object, Outcome.GetError().GetMessage().c_str());
-        if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved)
+        if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved)
         {
-            data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved;
+            data_info = (data_info_t *)hos_func->fs2_info.reserved;
             data_info->tx_failed_pkts[thread_id]++;
             data_info->tx_failed_bytes[thread_id] += stream_len;
         }
@@ -641,15 +599,16 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
     MESA_load_profile_uint_def(conf_path, module, "hos_poolsize", &hos_conf->pool_thread_size, 0);
     MESA_load_profile_uint_def(conf_path, module, "hos_cache_size", &hos_conf->cache_size, 102400);
     MESA_load_profile_uint_def(conf_path, module, "hos_cache_count", &hos_conf->cache_count, 10);
-    MESA_load_profile_uint_def(conf_path, module, "hos_fd_live_time_ms", &hos_conf->timeout, 1000);
     MESA_load_profile_string_nodef(conf_path, module, "hos_fs2_serverip", hos_conf->fs2_ip, INET6_ADDRSTRLEN);
     MESA_load_profile_uint_nodef(conf_path, module, "hos_fs2_serverport", &hos_conf->fs2_port);
     MESA_load_profile_string_def(conf_path, module, "hos_fs2_path", hos_conf->fs2_path, sizeof(hos_conf->fs2_path), "./hos_fs2.stat");
     MESA_load_profile_uint_def(conf_path, module, "hos_fs2_format", &hos_conf->fs2_fmt, 0);
+    MESA_load_profile_uint_def(conf_path, module, "hos_request_num", &hos_conf->max_request_num, 100);
+    MESA_load_profile_uint_def(conf_path, module, "hos_request_context", &hos_conf->max_request_context, 10240000);
 
     if (hos_conf->ip && hos_conf->port && strlen(hos_conf->accesskeyid) && strlen(hos_conf->secretkey))
     {
         g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level);
-        if (log == NULL)
+        if (g_hos_handle.log == NULL)
         {
             g_hos_instance.result = false;
             g_hos_instance.error_code = HOS_RUNTIME_LOG_FAILED;
@@ -693,17 +652,20 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
         g_hos_instance.error_code = HOS_CONF_ERROR;
         snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "hos param error:hos ip:%s, hos port:%u, accesskeyid:%s, secretkey:%s", hos_conf->ip, hos_conf->port, hos_conf->accesskeyid, hos_conf->secretkey);
-        MESA_destroy_runtime_log_handle(g_hos_handle.log);
         return &g_hos_instance;
     }
 }
 
 int hos_create_bucket(const char *bucket)
 {
-    if ((bucket == NULL) || (g_hos_handle.S3Client == NULL))
+    if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL)
+    {
+        return HOS_INSTANCE_NOT_INIT;
+    }
+    if (bucket == NULL)
     {
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_create_bucket",
-            "error:bucket:%s, s3client:%s", bucket, g_hos_handle.S3Client?"not null":"null");
+            "error:bucket:%s", bucket);
         return HOS_PARAMETER_ERROR;
     }
     auto& S3Client = *g_hos_handle.S3Client;
@@ -747,7 +709,12 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
     int ret;
     int mode = 0;
 
-    if ((g_hos_handle.S3Client == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > hos_conf->thread_num))
+    if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL)
+    {
+        return HOS_INSTANCE_NOT_INIT;
+    }
+
+    if ((bucket == NULL) || (object == NULL) || (thread_id > hos_conf->thread_num))
     {
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_stream",
             "error: s3client:%s, bucket:%s, object:%s, thread_id:%lu, thread_num:%u",
@@ -780,31 +747,22 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
         request.SetBody(input_data);
     }
     //field_stat2 record
-    if (hos_func->fs2_info[0].fs2_handle)
+    if (hos_func->fs2_info.fs2_handle)
     {
-        if (hos_func->fs2_info[0].reserved)
+        if (hos_func->fs2_info.reserved)
         {
-            data_info = (data_info_t *)hos_func->fs2_info[0].reserved;
+            data_info = (data_info_t *)hos_func->fs2_info.reserved;
             data_info->rx_pkts[thread_id]++;
             data_info->rx_bytes[thread_id] += data_len;
         }
     }
     //设置回调函数
-    size_t fd = hash_get_min_free_fd(thread_id);
+    size_t fd = ++g_fd_info[thread_id];
     hos_fd_context_t info = {fd, 0, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 };
     add_fd_context(&g_fd_context[thread_id], &info);
-
-    {
-        std::lock_guard<std::mutex> locker(m_client_lock);
-        if (g_hos_handle.hos_func.fd_thread == 0)
-        {
-            g_hos_handle.hos_func.fd_thread_status = 0;
-            pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL);
-        }
-    }
-
+
     if (hos_conf->pool_thread_size > 0)
     {
         ret = hos_putobject_async(request, data_len, thread_id, fd, bucket, object);
@@ -822,9 +780,6 @@ int hos_upload_file(const char *bucket, const char *file_path, put_finished_call
     struct stat buffer;
     if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL)
     {
-        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
-            "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s",
-            g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null"));
         return HOS_INSTANCE_NOT_INIT;
     }
 
@@ -848,9 +803,6 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size
 {
     if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL)
     {
-        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
-            "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s",
-            g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null"));
         return HOS_INSTANCE_NOT_INIT;
     }
 
@@ -865,102 +817,28 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size
     return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata, thread_id);
 }
 
-static void *hos_fd_manage(void *ptr)
-{
-    hos_fd_context_t *a_fd_context;
-    size_t thread_sum = g_hos_handle.hos_config.thread_num;
-    size_t thread_num;
-    //size_t fd;
-    while(1)
-    {
-        if (g_hos_handle.hos_func.fd_thread_status)
-            break;
-        for (thread_num = 0; thread_num < thread_sum; thread_num++)
-        {
-#if 0
-            a_fd_context = find_context_by_fd(g_fd_context[thread_num], fd);
-            if (!a_fd_context)
-                continue;
-#endif
-            hos_fd_context_t *tmp = NULL;
-            HASH_ITER(hh, g_fd_context[thread_num], a_fd_context, tmp)
-            {
-                if (!a_fd_context)
-                    break;
-
-                if (a_fd_context->fd_status == HOS_FD_INJECT)
-                {
-                    if (a_fd_context->position == a_fd_context->recive_cnt)
-                    {
-                        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
-                            "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete",
-                            a_fd_context->bucket, a_fd_context->object, thread_num, a_fd_context->fd);
-                        hos_delete_fd(thread_num, a_fd_context);
-                    }
-                    else if (a_fd_context->overtime <= get_current_ms())
-                    {
-                        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
-                            "error: [%s:%s] upload not completed, but the live-time of [thread_id:%lu fd:%lu] is over.",
-                            a_fd_context->bucket, a_fd_context->object, thread_num, a_fd_context->fd);
-                        hos_delete_fd(thread_num, a_fd_context);
-                    }
-                }
-            }
-        }
-        usleep(500000);
-    }
-    pthread_exit(NULL);
-}
-
 int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode)
 {
     if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL)
     {
-        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
-            "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s",
-            g_hos_instance.result, (g_hos_handle.S3Client == NULL)?("null"):("not null"));
         return HOS_INSTANCE_NOT_INIT;
     }
     if ((bucket == NULL) || (object == NULL) || (thread_id > g_hos_handle.hos_config.thread_num) || strlen(bucket) == 0 || strlen(object) == 0)
     {
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", "error: bucket:%s, obejct:%s, thread_id:%lu",
-            //(bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id);
-            bucket, object, thread_id);
+            (bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id);
         return HOS_PARAMETER_ERROR;
     }
-    size_t fd = hash_get_min_free_fd(thread_id);
-    if (fd == 0)
-    {
-        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd",
-            "error:fd not enough, thread_id:%lu, fd free: %lu, fd register:%lu, fd inject:%lu",
-            thread_id,
-            g_fd_info[thread_id][HOS_FD_FREE],
-            g_fd_info[thread_id][HOS_FD_REGISTER],
-            g_fd_info[thread_id][HOS_FD_INJECT]);
-        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd);
-        return HOS_FD_NOT_ENOUGH;
-    }
+    size_t fd = ++g_fd_info[thread_id];
     MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd);
 
     hos_fd_context_t info = {fd, mode, (char *)bucket, (char *)object, (void *)callback, userdata, NULL,/*cache*/
         g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/
-        (long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/
-        0,/*overtime*/ g_hos_handle.hos_config.timeout,};
+        (long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/};
     add_fd_context(&g_fd_context[thread_id], &info);
-#if 0
-    {
-        std::lock_guard<std::mutex> locker(m_client_lock);
-        if (g_hos_handle.hos_func.fd_thread == 0)
-        {
-            g_hos_handle.hos_func.fd_thread_status = 0;
-            pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL);
-        }
-    }
-#endif
-
     return fd;
 }
 
@@ -977,13 +855,10 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
     if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL)
     {
-        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
-            "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s",
-            g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null"));
         return HOS_INSTANCE_NOT_INIT;
     }
 
-    if ((fd < 3) || fd > MAX_HOS_CLIENT_FD_NUM || (stream == NULL) || (thread_id > hos_conf->thread_num))
+    if ((stream == NULL) || (thread_id > hos_conf->thread_num))
     {
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_write",
             "error: fd:%lu, stream:%s, stream_len:%lu, thread_id:%lu.",
@@ -991,10 +866,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
         return HOS_PARAMETER_ERROR;
     }
 
-    if (g_fd_info[thread_id][fd])
-    {
-        a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd);
-    }
+    a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd);
     if (a_fd_context == NULL)
     {
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fd info not find. thread_id:%lu, fd:%lu", thread_id, fd);
@@ -1011,11 +883,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
     //BUFF_MODE
     //field_stat2 record
-    if (hos_func->fs2_info[0].fs2_handle)
+    if (hos_func->fs2_info.fs2_handle)
     {
-        if (hos_func->fs2_info[0].reserved)
+        if (hos_func->fs2_info.reserved)
         {
-            data_info = (data_info_t *)hos_func->fs2_info[0].reserved;
+            data_info = (data_info_t *)hos_func->fs2_info.reserved;
             data_info->rx_pkts[thread_id]++;
             data_info->rx_bytes[thread_id] += stream_len;
         }
@@ -1079,11 +951,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
         request.SetBody(input_data);
         upload_len = buffer.st_size;
         //field_stat2 record
-        if (hos_func->fs2_info[0].fs2_handle)
+        if (hos_func->fs2_info.fs2_handle)
         {
-            if (hos_func->fs2_info[0].reserved)
+            if (hos_func->fs2_info.reserved)
             {
-                data_info = (data_info_t *)hos_func->fs2_info[0].reserved;
+                data_info = (data_info_t *)hos_func->fs2_info.reserved;
                 data_info->rx_pkts[thread_id]++;
                 data_info->rx_bytes[thread_id] += upload_len;
             }
@@ -1124,28 +996,22 @@ int hos_close_fd(size_t fd, size_t thread_id)
     if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL)
     {
-        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
-            "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s",
-            g_hos_instance.result, (g_hos_handle.S3Client == NULL)?("null"):("not null"));
         return HOS_INSTANCE_NOT_INIT;
     }
-    if (fd < 3 || fd > 65533 || thread_id > hos_conf->thread_num)
+    if (thread_id > hos_conf->thread_num)
    {
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", "error:fd:%lu, thread_id:%lu, thread_sum:%u.",
             fd, thread_id, hos_conf->thread_num);
         return HOS_PARAMETER_ERROR;
     }
 
-    if (g_fd_info[thread_id][fd])
-    {
-        a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd);
-    }
+    a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd);
     if (a_fd_context == NULL)
     {
         MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG,
-            "hos_close_fd", "debug: not find the a_fd_context of [fd:%lu thread:%lu]",
-            fd, thread_id);
+            "hos_close_fd", "debug: not find the a_fd_context of [thread:%lu fd:%lu]",
+            thread_id, fd);
         return HOS_CLIENT_OK;
     }
 
@@ -1179,18 +1045,32 @@ int hos_close_fd(size_t fd, size_t thread_id)
             {
                 hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object);
             }
-            ((data_info_t *)(g_hos_handle.hos_func.fs2_info->reserved))->cache[thread_id] = 0;
+            ((data_info_t *)(g_hos_handle.hos_func.fs2_info.reserved))->cache[thread_id] = 0;
         }
     }
 
     a_fd_context->fd_status = HOS_FD_INJECT;
     a_fd_context->cache.reset();
     a_fd_context->cache = NULL;
-    a_fd_context->overtime = get_current_ms() + a_fd_context->timeout;
     a_fd_context->cache_rest = hos_conf->cache_size;
     a_fd_context->cache_count = hos_conf->cache_count;
-    g_fd_info[thread_id][HOS_FD_REGISTER]--;
-    g_fd_info[thread_id][HOS_FD_INJECT]++;
+    if (hos_conf->pool_thread_size == 0)
+    {
+        //同步模式,立即释放fd
+        hos_delete_fd(thread_id, a_fd_context);
+    }
+    else
+    {
+        //异步APPEND 模式,判断是否可以释放
+        //异步其他模式,在PutObjectAsyncFinished出释放fd
+        if (a_fd_context->mode == (BUFF_MODE | APPEND_MODE) && a_fd_context->position == a_fd_context->recive_cnt)
+        {
+            MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
+                "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete",
+                a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd);
+            hos_delete_fd(thread_id, a_fd_context);
+        }
+    }
 
     return HOS_CLIENT_OK;
 }
@@ -1203,10 +1083,9 @@ int hos_shutdown_instance()
     hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
     size_t task_num = 0;
 
-    if (g_hos_handle.S3Client == NULL)
+    if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL)
     {
-        MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "debug: There is no hos client.");
-        return HOS_CLIENT_OK;
+        return HOS_INSTANCE_NOT_INIT;
     }
 
     if (g_hos_handle.count > 0 && --g_hos_handle.count)
@@ -1241,17 +1120,17 @@ int hos_shutdown_instance()
         pthread_join(hos_func->fs2_thread, NULL);
         for (i = 0; i < FS2_RECORD_EVENTS; i++)
         {
-            screen_stat_handle_t *fs2_handle = &hos_func->fs2_info[i].fs2_handle;
+            screen_stat_handle_t *fs2_handle = &hos_func->fs2_info.fs2_handle;
             if (*fs2_handle)
             {
                 FS_stop(fs2_handle);
                 *fs2_handle = NULL;
             }
-            if (hos_func->fs2_info[i].reserved)
+            if (hos_func->fs2_info.reserved)
             {
                 if (i == 0)
                 {
-                    data_info_t * data_info = (data_info_t *)hos_func->fs2_info[i].reserved;
+                    data_info_t * data_info = (data_info_t *)hos_func->fs2_info.reserved;
                     if (data_info->rx_pkts)
                         free(data_info->rx_pkts);
                     if (data_info->rx_bytes)
@@ -1267,18 +1146,18 @@ int hos_shutdown_instance()
                     if (data_info->cache)
                         free(data_info->cache);
                 }
-                free(hos_func->fs2_info[i].reserved);
-                hos_func->fs2_info[i].reserved = NULL;
+                free(hos_func->fs2_info.reserved);
+                hos_func->fs2_info.reserved = NULL;
             }
-            if (hos_func->fs2_info[i].line_ids)
+            if (hos_func->fs2_info.line_ids)
             {
-                free(hos_func->fs2_info[i].line_ids);
-                hos_func->fs2_info[i].line_ids=NULL;
+                free(hos_func->fs2_info.line_ids);
+                hos_func->fs2_info.line_ids=NULL;
             }
-            if (hos_func->fs2_info[i].column_ids)
+            if (hos_func->fs2_info.column_ids)
             {
-                free(hos_func->fs2_info[i].column_ids);
-                hos_func->fs2_info[i].column_ids=NULL;
+                free(hos_func->fs2_info.column_ids);
+                hos_func->fs2_info.column_ids=NULL;
             }
         }
     }
@@ -1290,6 +1169,11 @@ int hos_shutdown_instance()
         free(g_hos_handle.task_num);
         g_hos_handle.task_num = NULL;
     }
+    if (g_hos_handle.task_context != NULL)
+    {
+        free(g_hos_handle.task_context);
+        g_hos_handle.task_context = NULL;
+    }
 
     MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: delete s3client.");
     if (g_fd_info)
diff --git a/src/hos_common.h b/src/hos_common.h
index 9c9db6a0..29d9dce0 100644
--- a/src/hos_common.h
+++ b/src/hos_common.h
@@ -68,7 +68,8 @@ typedef struct hos_config_s
     uint32_t thread_num;
     uint32_t cache_size;
     uint32_t cache_count;
-    uint32_t timeout;
+    uint32_t max_request_num;
+    uint32_t max_request_context;
 }hos_config_t;
 
 typedef struct hos_func_thread_s
@@ -77,7 +78,7 @@ typedef struct hos_func_thread_s
     pthread_t fd_thread;
     int fd_thread_status;
     /* fs2 管理线程 */
-    fs2_info_t fs2_info[FS2_RECORD_EVENTS]; //0: data info; 1: fd info; 2 cache info; 3 PoolThread state
+    fs2_info_t fs2_info;
     pthread_t fs2_thread;
     int fs2_status;
 #define HOS_FS2_START 1
@@ -98,11 +99,12 @@ typedef struct hos_client_handle_s
     hos_func_thread_t hos_func;
     void *log;
     size_t *task_num;
+    size_t *task_context;
 }hos_client_handle_t;
 
 extern struct hos_instance_s g_hos_instance;
 extern hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle
 extern hos_fd_context_t **g_fd_context;
-extern size_t (*g_fd_info)[MAX_HOS_CLIENT_FD_NUM + 1]; //fd 实际从3开始, fd[thread_id][0]记录register的fd,fd[thread_id][1]记录inject的fd
+extern size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd
 
 #endif
\ No newline at end of file
diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp
index fff9b474..bb7e2538 100644
--- a/src/hos_hash.cpp
+++ b/src/hos_hash.cpp
@@ -44,8 +44,6 @@ void add_fd_context(hos_fd_context_t **handle, hos_fd_context_t *input)
         value->position = input->position;
         value->recive_cnt = input->recive_cnt;
         value->fd_status = value->fd_status;
-        value->overtime = value->overtime;
-        value->timeout = value->timeout;
     }
 }
diff --git a/src/hos_hash.h b/src/hos_hash.h
index 1b196df3..c23e3d51 100644
--- a/src/hos_hash.h
+++ b/src/hos_hash.h
@@ -25,11 +25,8 @@ typedef struct hos_fd_context_s
     size_t recive_cnt;
     long cache_rest;
     int fd_status;
-#define HOS_FD_FREE 0
-#define HOS_FD_REGISTER 1
-#define HOS_FD_INJECT 2
-    size_t overtime; //计算后的时间点,超过即inject fd
-    size_t timeout; //配置的超时时间,从status变成INJECT开始计时
+#define HOS_FD_REGISTER 0
+#define HOS_FD_INJECT 1
     UT_hash_handle hh;
 }hos_fd_context_t;
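
Alongside the fd rework, the diff adds per-thread backpressure for asynchronous uploads: `hos_putobject_async` now rejects a request with `HOS_SEND_FAILED` when the thread's in-flight request count or in-flight byte total has reached the new `hos_request_num` / `hos_request_context` limits (defaults 100 and 10240000 in `hos_init_instance`). The following is only a self-contained illustration of that guard: `ThreadLoad`, `Limits`, `put_object_async`, and `on_put_finished` are invented stand-ins, the AWS `PutObjectAsync` call is elided, and the sketch pairs the decrement with the byte count, whereas the committed callback decrements `task_context` by one.

```cpp
// Illustration only: the per-thread submission guard added in this commit,
// with the real AWS SDK call replaced by a stub so the example stands alone.
#include <cstdio>

struct ThreadLoad {
    size_t task_num = 0;      // in-flight async requests for this thread
    size_t task_context = 0;  // in-flight bytes for this thread
};

struct Limits {
    size_t max_request_num = 100;           // conf key: hos_request_num
    size_t max_request_context = 10240000;  // conf key: hos_request_context
};

enum { HOS_CLIENT_OK = 0, HOS_SEND_FAILED = -1 };

int put_object_async(ThreadLoad &load, const Limits &lim, size_t stream_len) {
    if (lim.max_request_num && lim.max_request_context &&
        (load.task_num >= lim.max_request_num ||
         load.task_context >= lim.max_request_context))
        return HOS_SEND_FAILED;  // refuse instead of queuing without bound

    // ... S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context) here ...
    load.task_num++;
    load.task_context += stream_len;  // released again when the upload finishes
    return HOS_CLIENT_OK;
}

void on_put_finished(ThreadLoad &load, size_t stream_len) {
    load.task_num--;
    load.task_context -= stream_len;
}

int main() {
    ThreadLoad load;
    Limits lim;
    lim.max_request_num = 2;            // tiny limits so the demo trips the guard
    lim.max_request_context = 1 << 20;
    int r1 = put_object_async(load, lim, 100);
    int r2 = put_object_async(load, lim, 100);
    int r3 = put_object_async(load, lim, 100);  // third call is rejected
    std::printf("%d %d %d\n", r1, r2, r3);
    on_put_finished(load, 100);
    return 0;
}
```

Since both counters live in per-thread slots allocated in `hos_client_create`, the guard needs no locking; the trade-off is that a burst of rejected uploads is reported only through the tx_failed counters rather than being queued.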
