diff options
| author | “pengxuanzheng” <[email protected]> | 2022-03-02 10:35:26 +0000 |
|---|---|---|
| committer | “pengxuanzheng” <[email protected]> | 2022-03-04 04:35:47 +0000 |
| commit | 16d71d2fe65c457308512dfadeb7d15c17d5eeae (patch) | |
| tree | 9f04a8e32a487bb090516fa411d4f9b4b24aef44 /src/hos_client.cpp | |
| parent | d4e8b149c85b7310d3073b3eea2b0d5f90c0c351 (diff) | |
🐞 fix(TSG-9807): 修复cache_size设置为0导致的内存快速消耗v3.0.3
Diffstat (limited to 'src/hos_client.cpp')
| -rw-r--r-- | src/hos_client.cpp | 77 |
1 files changed, 41 insertions, 36 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 81989cba..b20fc689 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -14,10 +14,6 @@ extern "C" #include <aws/s3/model/CreateBucketRequest.h> #include <fstream> #include <iostream> -#include <aws/external/gtest.h> -#include <aws/testing/platform/PlatformTesting.h> -#include <aws/testing/TestingEnvironment.h> -#include <aws/testing/MemoryTesting.h> #ifdef HOS_MOCK #include "mock/hos_mock.h" #endif @@ -45,12 +41,12 @@ extern "C" struct hos_instance_s g_hos_instance; hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle -static std::mutex m_client_lock; -static std::mutex m_instance_lock; -static std::mutex m_delete_lock; -static Aws::SDKOptions g_options; -Aws::Auth::AWSCredentials g_credentials; -Aws::Client::ClientConfiguration *g_client_config; +static std::mutex m_hos_client_lock; +static std::mutex m_hos_instance_lock; +static std::mutex m_hos_delete_lock; +static Aws::SDKOptions g_hos_options; +Aws::Auth::AWSCredentials g_hos_credentials; +Aws::Client::ClientConfiguration *g_hos_client_config; static int hos_delete_fd(size_t fd, size_t thread_id) { @@ -152,7 +148,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, if (a_fd_context->mode & APPEND_MODE) { - std::lock_guard<std::mutex> locker(m_delete_lock); + std::lock_guard<std::mutex> locker(m_hos_delete_lock); //APPEND MODE 保留fd atomic_add(&(a_fd_context->recive_cnt), 1); if (a_fd_context->fd_status == HOS_FD_CANCEL) @@ -177,6 +173,8 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, } atomic_sub(&g_hos_handle.task_num[thread_id], 1); atomic_sub(&g_hos_handle.task_context[thread_id], stream_len); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "thread_id:%zu, task_num:%zu, task_content:%zu", thread_id, g_hos_handle.task_num[thread_id], g_hos_handle.task_context[thread_id]); } static int hos_attempt_connection() @@ -421,22 +419,22 @@ static void hos_client_create() { hos_config_t *hos_conf = &g_hos_handle.hos_config; - Aws::InitAPI(g_options); - g_client_config = new Aws::Client::ClientConfiguration(); - g_credentials.SetAWSAccessKeyId(hos_conf->accesskeyid); - g_credentials.SetAWSSecretKey(hos_conf->secretkey); + Aws::InitAPI(g_hos_options); + g_hos_client_config = new Aws::Client::ClientConfiguration(); + g_hos_credentials.SetAWSAccessKeyId(hos_conf->accesskeyid); + g_hos_credentials.SetAWSSecretKey(hos_conf->secretkey); //初始化 char endpoint[128]; snprintf(endpoint, 128, "http://%s:%u/hos/", hos_conf->ip, hos_conf->port); - g_client_config->endpointOverride.append(endpoint); - g_client_config->verifySSL = false; - g_client_config->enableEndpointDiscovery = true; + g_hos_client_config->endpointOverride.append(endpoint); + g_hos_client_config->verifySSL = false; + g_hos_client_config->enableEndpointDiscovery = true; if (hos_conf->pool_thread_size > 0) { //异步模式 //config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池 - g_client_config->executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池 + g_hos_client_config->executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池 } else { @@ -444,12 +442,12 @@ static void hos_client_create() } #ifndef HOS_MOCK - g_hos_handle.S3Client = new Aws::S3::S3Client(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + g_hos_handle.S3Client = new Aws::S3::S3Client(g_hos_credentials, *g_hos_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); #else - g_hos_handle.S3Client = new Aws::S3::S3ClientMock(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + g_hos_handle.S3Client = new Aws::S3::S3ClientMock(g_hos_credentials, *g_hos_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); #endif - g_hos_instance.hos_url_prefix = g_client_config->endpointOverride.c_str(); + g_hos_instance.hos_url_prefix = g_hos_client_config->endpointOverride.c_str(); //hos 检测服务端是否可以连接上 int ret = hos_attempt_connection(); if (ret != HOS_CLIENT_OK && ret != NETWORK_CONNECTION) @@ -558,7 +556,9 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t atomic_add(&g_hos_handle.task_context[thread_id], stream_len); //不算真正成功,需要等到PutObjectAsyncFinished的结果 MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s/%s/%s] PutObjectAsync success.", g_hos_instance.hos_url_prefix, bucket, object); + "debug: [%s/%s/%s] PutObjectAsync success. thread:%zu, task_num:%zu, task_content:%zu", + g_hos_instance.hos_url_prefix, bucket, object, + thread_id, g_hos_handle.task_num[thread_id], g_hos_handle.task_context[thread_id]); return HOS_CLIENT_OK; } @@ -653,7 +653,7 @@ const char *hos_get_upload_endpoint() hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num) { - std::lock_guard<std::mutex> locker(m_client_lock); + std::lock_guard<std::mutex> locker(m_hos_client_lock); hos_config_t *hos_conf = &g_hos_handle.hos_config; memset(&g_hos_handle, 0, sizeof(g_hos_handle)); @@ -778,7 +778,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char return HOS_PARAMETER_ERROR; } - mode = data?1:0; // 0, file mode; 1 buf mode + mode = data?FILE_MODE:BUFF_MODE; // 0, file mode; 1 buf mode // Create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; @@ -830,6 +830,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char else { ret = hos_putobject_sync(request, data_len, thread_id, &hos_fd); + hos_delete_fd((size_t)hos_fd, thread_id); } return ret; @@ -974,18 +975,24 @@ int hos_write(size_t fd, const char *stream, size_t stream_len) } Aws::String buffer(stream, stream_len); *a_fd_context->cache << buffer; - a_fd_context->cache_rest -= stream_len; + if (data_info != NULL) + { data_info->cache[thread_id] += stream_len; - if (a_fd_context->cache_count == 0 || --a_fd_context->cache_count) + } + + if (a_fd_context->cache_rest > stream_len) { - //cache_count == 0,不设置cache_count的情况 - //cache_count > 0,设置cache_count的情况 - if (a_fd_context->cache_rest > 0) + a_fd_context->cache_rest -= stream_len; + if (a_fd_context->cache_count > 0 && --a_fd_context->cache_count) { return HOS_IN_CACHE; } } + else + { + a_fd_context->cache_rest = 0; + } request.SetBody(a_fd_context->cache); // add headers @@ -1015,7 +1022,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len) //恢复fd 的cache设置 if (a_fd_context->mode & APPEND_MODE) { - if (data_info) + if (data_info && data_info->cache[thread_id]) data_info->cache[thread_id] -= upload_len; a_fd_context->cache.reset(); a_fd_context->cache = NULL; @@ -1106,7 +1113,7 @@ int hos_close_fd(size_t fd) { //异步APPEND 模式,判断是否可以释放 //异步其他模式,在PutObjectAsyncFinished出释放fd - std::lock_guard<std::mutex> locker(m_delete_lock); + std::lock_guard<std::mutex> locker(m_hos_delete_lock); a_fd_context->fd_status = HOS_FD_CANCEL; if (a_fd_context->mode == (BUFF_MODE | APPEND_MODE) && a_fd_context->position == a_fd_context->recive_cnt) { @@ -1129,7 +1136,7 @@ int hos_close_fd(size_t fd) int hos_shutdown_instance() { - std::lock_guard<std::mutex> locker(m_instance_lock); + std::lock_guard<std::mutex> locker(m_hos_instance_lock); hos_func_thread_t *hos_func = &g_hos_handle.hos_func; if (atomic_read(&g_hos_instance.status) == INSTANCE_UNINIT_STATE) @@ -1167,8 +1174,6 @@ int hos_shutdown_instance() usleep(500 * 1000); } - Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets); - if (hos_func->fd_thread) { hos_func->fd_thread_status = 1; @@ -1234,7 +1239,7 @@ int hos_shutdown_instance() } MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s] delete s3client.", g_hos_instance.hos_url_prefix); - Aws::ShutdownAPI(g_options); + Aws::ShutdownAPI(g_hos_options); MESA_destroy_runtime_log_handle(g_hos_handle.log); g_hos_handle.log = NULL; memset(&g_hos_handle, 0 , sizeof(g_hos_handle)); |
