summaryrefslogtreecommitdiff
path: root/src/hos_client.cpp
diff options
context:
space:
mode:
author“pengxuanzheng” <[email protected]>2022-03-02 10:35:26 +0000
committer“pengxuanzheng” <[email protected]>2022-03-04 04:35:47 +0000
commit16d71d2fe65c457308512dfadeb7d15c17d5eeae (patch)
tree9f04a8e32a487bb090516fa411d4f9b4b24aef44 /src/hos_client.cpp
parentd4e8b149c85b7310d3073b3eea2b0d5f90c0c351 (diff)
🐞 fix(TSG-9807): 修复cache_size设置为0导致的内存快速消耗v3.0.3
Diffstat (limited to 'src/hos_client.cpp')
-rw-r--r--src/hos_client.cpp77
1 files changed, 41 insertions, 36 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp
index 81989cba..b20fc689 100644
--- a/src/hos_client.cpp
+++ b/src/hos_client.cpp
@@ -14,10 +14,6 @@ extern "C"
#include <aws/s3/model/CreateBucketRequest.h>
#include <fstream>
#include <iostream>
-#include <aws/external/gtest.h>
-#include <aws/testing/platform/PlatformTesting.h>
-#include <aws/testing/TestingEnvironment.h>
-#include <aws/testing/MemoryTesting.h>
#ifdef HOS_MOCK
#include "mock/hos_mock.h"
#endif
@@ -45,12 +41,12 @@ extern "C"
struct hos_instance_s g_hos_instance;
hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle
-static std::mutex m_client_lock;
-static std::mutex m_instance_lock;
-static std::mutex m_delete_lock;
-static Aws::SDKOptions g_options;
-Aws::Auth::AWSCredentials g_credentials;
-Aws::Client::ClientConfiguration *g_client_config;
+static std::mutex m_hos_client_lock;
+static std::mutex m_hos_instance_lock;
+static std::mutex m_hos_delete_lock;
+static Aws::SDKOptions g_hos_options;
+Aws::Auth::AWSCredentials g_hos_credentials;
+Aws::Client::ClientConfiguration *g_hos_client_config;
static int hos_delete_fd(size_t fd, size_t thread_id)
{
@@ -152,7 +148,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
if (a_fd_context->mode & APPEND_MODE)
{
- std::lock_guard<std::mutex> locker(m_delete_lock);
+ std::lock_guard<std::mutex> locker(m_hos_delete_lock);
//APPEND MODE 保留fd
atomic_add(&(a_fd_context->recive_cnt), 1);
if (a_fd_context->fd_status == HOS_FD_CANCEL)
@@ -177,6 +173,8 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
}
atomic_sub(&g_hos_handle.task_num[thread_id], 1);
atomic_sub(&g_hos_handle.task_context[thread_id], stream_len);
+ MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
+ "thread_id:%zu, task_num:%zu, task_content:%zu", thread_id, g_hos_handle.task_num[thread_id], g_hos_handle.task_context[thread_id]);
}
static int hos_attempt_connection()
@@ -421,22 +419,22 @@ static void hos_client_create()
{
hos_config_t *hos_conf = &g_hos_handle.hos_config;
- Aws::InitAPI(g_options);
- g_client_config = new Aws::Client::ClientConfiguration();
- g_credentials.SetAWSAccessKeyId(hos_conf->accesskeyid);
- g_credentials.SetAWSSecretKey(hos_conf->secretkey);
+ Aws::InitAPI(g_hos_options);
+ g_hos_client_config = new Aws::Client::ClientConfiguration();
+ g_hos_credentials.SetAWSAccessKeyId(hos_conf->accesskeyid);
+ g_hos_credentials.SetAWSSecretKey(hos_conf->secretkey);
//初始化
char endpoint[128];
snprintf(endpoint, 128, "http://%s:%u/hos/", hos_conf->ip, hos_conf->port);
- g_client_config->endpointOverride.append(endpoint);
- g_client_config->verifySSL = false;
- g_client_config->enableEndpointDiscovery = true;
+ g_hos_client_config->endpointOverride.append(endpoint);
+ g_hos_client_config->verifySSL = false;
+ g_hos_client_config->enableEndpointDiscovery = true;
if (hos_conf->pool_thread_size > 0)
{
//异步模式
//config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池
- g_client_config->executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池
+ g_hos_client_config->executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池
}
else
{
@@ -444,12 +442,12 @@ static void hos_client_create()
}
#ifndef HOS_MOCK
- g_hos_handle.S3Client = new Aws::S3::S3Client(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
+ g_hos_handle.S3Client = new Aws::S3::S3Client(g_hos_credentials, *g_hos_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
#else
- g_hos_handle.S3Client = new Aws::S3::S3ClientMock(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
+ g_hos_handle.S3Client = new Aws::S3::S3ClientMock(g_hos_credentials, *g_hos_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
#endif
- g_hos_instance.hos_url_prefix = g_client_config->endpointOverride.c_str();
+ g_hos_instance.hos_url_prefix = g_hos_client_config->endpointOverride.c_str();
//hos 检测服务端是否可以连接上
int ret = hos_attempt_connection();
if (ret != HOS_CLIENT_OK && ret != NETWORK_CONNECTION)
@@ -558,7 +556,9 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t
atomic_add(&g_hos_handle.task_context[thread_id], stream_len);
//不算真正成功,需要等到PutObjectAsyncFinished的结果
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
- "debug: [%s/%s/%s] PutObjectAsync success.", g_hos_instance.hos_url_prefix, bucket, object);
+ "debug: [%s/%s/%s] PutObjectAsync success. thread:%zu, task_num:%zu, task_content:%zu",
+ g_hos_instance.hos_url_prefix, bucket, object,
+ thread_id, g_hos_handle.task_num[thread_id], g_hos_handle.task_context[thread_id]);
return HOS_CLIENT_OK;
}
@@ -653,7 +653,7 @@ const char *hos_get_upload_endpoint()
hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num)
{
- std::lock_guard<std::mutex> locker(m_client_lock);
+ std::lock_guard<std::mutex> locker(m_hos_client_lock);
hos_config_t *hos_conf = &g_hos_handle.hos_config;
memset(&g_hos_handle, 0, sizeof(g_hos_handle));
@@ -778,7 +778,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
return HOS_PARAMETER_ERROR;
}
- mode = data?1:0; // 0, file mode; 1 buf mode
+ mode = data?FILE_MODE:BUFF_MODE; // 0, file mode; 1 buf mode
// Create and configure the asynchronous put object request.
Aws::S3::Model::PutObjectRequest request;
@@ -830,6 +830,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
else
{
ret = hos_putobject_sync(request, data_len, thread_id, &hos_fd);
+ hos_delete_fd((size_t)hos_fd, thread_id);
}
return ret;
@@ -974,18 +975,24 @@ int hos_write(size_t fd, const char *stream, size_t stream_len)
}
Aws::String buffer(stream, stream_len);
*a_fd_context->cache << buffer;
- a_fd_context->cache_rest -= stream_len;
+
if (data_info != NULL)
+ {
data_info->cache[thread_id] += stream_len;
- if (a_fd_context->cache_count == 0 || --a_fd_context->cache_count)
+ }
+
+ if (a_fd_context->cache_rest > stream_len)
{
- //cache_count == 0,不设置cache_count的情况
- //cache_count > 0,设置cache_count的情况
- if (a_fd_context->cache_rest > 0)
+ a_fd_context->cache_rest -= stream_len;
+ if (a_fd_context->cache_count > 0 && --a_fd_context->cache_count)
{
return HOS_IN_CACHE;
}
}
+ else
+ {
+ a_fd_context->cache_rest = 0;
+ }
request.SetBody(a_fd_context->cache);
// add headers
@@ -1015,7 +1022,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len)
//恢复fd 的cache设置
if (a_fd_context->mode & APPEND_MODE)
{
- if (data_info)
+ if (data_info && data_info->cache[thread_id])
data_info->cache[thread_id] -= upload_len;
a_fd_context->cache.reset();
a_fd_context->cache = NULL;
@@ -1106,7 +1113,7 @@ int hos_close_fd(size_t fd)
{
//异步APPEND 模式,判断是否可以释放
//异步其他模式,在PutObjectAsyncFinished出释放fd
- std::lock_guard<std::mutex> locker(m_delete_lock);
+ std::lock_guard<std::mutex> locker(m_hos_delete_lock);
a_fd_context->fd_status = HOS_FD_CANCEL;
if (a_fd_context->mode == (BUFF_MODE | APPEND_MODE) && a_fd_context->position == a_fd_context->recive_cnt)
{
@@ -1129,7 +1136,7 @@ int hos_close_fd(size_t fd)
int hos_shutdown_instance()
{
- std::lock_guard<std::mutex> locker(m_instance_lock);
+ std::lock_guard<std::mutex> locker(m_hos_instance_lock);
hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
if (atomic_read(&g_hos_instance.status) == INSTANCE_UNINIT_STATE)
@@ -1167,8 +1174,6 @@ int hos_shutdown_instance()
usleep(500 * 1000);
}
- Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
-
if (hos_func->fd_thread)
{
hos_func->fd_thread_status = 1;
@@ -1234,7 +1239,7 @@ int hos_shutdown_instance()
}
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s] delete s3client.", g_hos_instance.hos_url_prefix);
- Aws::ShutdownAPI(g_options);
+ Aws::ShutdownAPI(g_hos_options);
MESA_destroy_runtime_log_handle(g_hos_handle.log);
g_hos_handle.log = NULL;
memset(&g_hos_handle, 0 , sizeof(g_hos_handle));