diff options
| author | “pengxuanzheng” <[email protected]> | 2021-09-02 16:04:15 +0800 |
|---|---|---|
| committer | “pengxuanzheng” <[email protected]> | 2021-09-02 16:04:15 +0800 |
| commit | 0de85d51bcb42319126ee7fca8079df13021e6b3 (patch) | |
| tree | 46f8e6da18cd967192381a71d4c1494a4c118daf | |
| parent | d775b9d7e4d817e64ef3c959cd9b483474f1b880 (diff) | |
🐞 fix(TSG-7599): hos client 初始化失败,定期尝试重连hos服务
| -rw-r--r-- | CMakeLists.txt | 1 | ||||
| -rw-r--r-- | example/demo/conf/default.conf | 2 | ||||
| -rw-r--r-- | gtest/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/hos_client.cpp | 248 | ||||
| -rw-r--r-- | src/hos_client.h | 15 | ||||
| -rw-r--r-- | src/hos_common.h | 20 |
6 files changed, 197 insertions, 91 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 78d82f3e..4365f138 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,7 @@ set(SUPPORT_INSTALL_PREFIX /usr/local/lib64) add_subdirectory(support) add_subdirectory(src) +#add_subdirectory(gtest) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/src/libhos-client-cpp.so DESTINATION ${CMAKE_INSTALL_PREFIX}/lib COMPONENT LIBRARIES) install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/src/hos_client.h DESTINATION ${CMAKE_INSTALL_PREFIX}/include COMPONENT HEADER) diff --git a/example/demo/conf/default.conf b/example/demo/conf/default.conf index 3d95a693..97d3fb19 100644 --- a/example/demo/conf/default.conf +++ b/example/demo/conf/default.conf @@ -13,4 +13,4 @@ hos_fd_live_time_ms=1000 #default hos_fs2_serverip=127.0.0.1 hos_fs2_serverport=10086 hos_fs2_path="./log/hos_fs2_log" #default -hos_fs2_format=0 #defaul
\ No newline at end of file +hos_fs2_format=0 #defaul diff --git a/gtest/CMakeLists.txt b/gtest/CMakeLists.txt index db8c5f56..dc244b81 100644 --- a/gtest/CMakeLists.txt +++ b/gtest/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.0) +cmake_minimum_required(VERSION 2.8) project(gtest_hos_client) aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SRCS) diff --git a/src/hos_client.cpp b/src/hos_client.cpp index fb523b8c..188cd97c 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -51,6 +51,8 @@ static std::mutex m_delete_lock; hos_fd_context_t **g_fd_context; size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd static Aws::SDKOptions g_options; +Aws::Auth::AWSCredentials g_credentials; +Aws::Client::ClientConfiguration g_client_config; static inline size_t get_current_ms() { @@ -180,43 +182,23 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, atomic_sub(&g_hos_handle.task_context[thread_id], stream_len); } -static void hos_client_create() +//TODO 解决atempt 不加锁问题 +/** + * 全局唯一一个示例,没有必要获取到实例,只要获取实例状态 + * 实例状态分为:uninit_state, attempt_state, enable_state + * 初始化前 uninit_state,初始化成功enable_state, 初始化失败attempt_state + * 只有状态为 uninit_state才允许调用hos_init_instance + * 只有状态为 enable_state才允许进行hos上传操作 + * filed_stat2功能只有在enable_state状态才允许运行 +*/ + +static int hos_attempt_connection() { hos_config_t *hos_conf = &g_hos_handle.hos_config; - void *log = g_hos_handle.log; - - if (g_hos_handle.S3Client != NULL) - { - g_hos_handle.count++; - g_hos_instance.result = true; - return ; - } - - Aws::InitAPI(g_options); - Aws::Client::ClientConfiguration config; - Aws::Auth::AWSCredentials credentials(hos_conf->accesskeyid, hos_conf->secretkey); - - //初始化 - char endpoint[128]; - snprintf(endpoint, 128, "http://%s:%d/hos/", hos_conf->ip, hos_conf->port); - config.endpointOverride = endpoint; - config.verifySSL = false; - config.enableEndpointDiscovery = true; - if (hos_conf->pool_thread_size > 0) - { - //异步模式 - //config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池 - config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池 - } - else - { - //同步模式 - } - #ifndef HOS_MOCK - g_hos_handle.S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + g_hos_handle.S3Client = new Aws::S3::S3Client(g_credentials, g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); #else - g_hos_handle.S3Client = new Aws::S3::S3ClientMock(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + g_hos_handle.S3Client = new Aws::S3::S3ClientMock(credentials, g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); #endif /* 获取当前用户的所有的buckets */ Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); @@ -228,30 +210,89 @@ static void hos_client_create() Aws::ShutdownAPI(g_options); g_hos_instance.error_code = (size_t)outcome.GetError().GetErrorType() + 1; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, outcome.GetError().GetMessage().c_str()); - g_hos_instance.result = false; - MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_FATAL, "hos_client_create", "error: %s", g_hos_instance.error_message); - return; + atomic_set(&g_hos_instance.status, INSTANCE_ATTEMPT_STATE); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "%s error: %s", g_client_config.endpointOverride.c_str(), g_hos_instance.error_message); + return g_hos_instance.error_code; } g_hos_handle.buckets = outcome.GetResult().GetBuckets(); g_hos_handle.count++; - g_hos_handle.executor = std::dynamic_pointer_cast<Aws::Utils::Threading::PooledThreadExecutor>(config.executor); g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); g_fd_context = (hos_fd_context_t **)calloc(hos_conf->thread_num, sizeof(hos_fd_context_t *)); g_fd_info = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: hos s3client create success, url:%s.",g_client_config.endpointOverride.c_str()); + g_hos_instance.error_code = 0; + g_hos_instance.error_message[0] = '\0'; + g_hos_instance.hos_url_prefix = g_client_config.endpointOverride.c_str(); + atomic_set(&g_hos_instance.status, INSTANCE_ENABLE_STATE); + + return HOS_CLIENT_OK; +} + +static void *hos_attempt_connection_exhaustively(void *ptr) +{ + while(atomic_read(&g_hos_handle.hos_func.hos_client_retry_thread_status) == 0) + { + if (hos_attempt_connection() == HOS_CLIENT_OK) + { + atomic_set(&g_hos_handle.hos_func.hos_client_retry_thread_status, 1); + hos_expand_fs2(); + pthread_exit(NULL); + } + sleep(60); + } + pthread_exit(NULL); +} + + +static void hos_client_create() +{ + hos_config_t *hos_conf = &g_hos_handle.hos_config; + void *log = g_hos_handle.log; + + //每调用一次此函数,那么就必须调用一次destroy函数 + atomic_add(&g_hos_handle.count, 1); + if (atomic_read(&g_hos_instance.status) != INSTANCE_UNINIT_STATE) + { + return; + } + + Aws::InitAPI(g_options); + g_credentials.SetAWSAccessKeyId(hos_conf->accesskeyid); + g_credentials.SetAWSSecretKey(hos_conf->secretkey); + + //初始化 + char endpoint[128]; + snprintf(endpoint, 128, "http://%s:%d/hos/", hos_conf->ip, hos_conf->port); + g_client_config.endpointOverride = endpoint; + g_client_config.verifySSL = false; + g_client_config.enableEndpointDiscovery = true; + if (hos_conf->pool_thread_size > 0) + { + //异步模式 + //g_client_config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池 + g_client_config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池 + } + else + { + //同步模式 + } - #if 0 - if (g_hos_handle.hos_func.fd_thread == 0) + if (hos_attempt_connection() != HOS_CLIENT_OK) { - g_hos_handle.hos_func.fd_thread_status = 0; - pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL); + if (g_hos_handle.hos_func.hos_client_retry_thread_id == 0) + { + g_hos_handle.hos_func.hos_client_retry_thread_status = 0; + pthread_create(&g_hos_handle.hos_func.hos_client_retry_thread_id, NULL, hos_attempt_connection_exhaustively, NULL); + } } + +#if 0 #endif - MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_DEBUG, "hos_client_create", "debug: hos s3client create success, url:%s.",endpoint); - g_hos_instance.result = true; } bool hos_verify_bucket(const char *bucket) @@ -262,7 +303,7 @@ bool hos_verify_bucket(const char *bucket) "debug: bucket is null"); return false; } - if (g_hos_instance.result != true || g_hos_handle.S3Client == NULL) + if (atomic_read(&g_hos_instance.status) != INSTANCE_ENABLE_STATE) { return false; } @@ -587,6 +628,9 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t } } +//XXX 处于对count的考虑,删除hos_get_instance函数 +//将其放到hos_init_instance函数中去 +#if 0 hos_instance hos_get_instance() { if (g_hos_handle.S3Client != NULL) @@ -597,7 +641,35 @@ hos_instance hos_get_instance() } memset(&g_hos_instance, 0, sizeof(g_hos_instance)); g_hos_instance.result = false; - return &g_hos_instance; + return NULL; +} +#endif + +int hos_get_instance_status() +{ + switch (atomic_read(&g_hos_instance.status)) + { + case INSTANCE_UNINIT_STATE: + return INSTANCE_UNINIT_STATE; + default: + atomic_add(&g_hos_handle.count, 1); + return atomic_read(&g_hos_instance.status); + } +} + +int hos_get_init_instance_errorcode() +{ + return g_hos_instance.error_code; +} + +const char *hos_get_init_instance_errormsg() +{ + return g_hos_instance.error_message; +} + +const char *hos_get_upload_endpoint() +{ + return g_hos_instance.hos_url_prefix; } hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket) @@ -608,14 +680,33 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t if (conf_path == NULL || thread_num == 0 || module == NULL || bucket == NULL) { - g_hos_instance.result = false; + g_hos_instance.status = INSTANCE_UNINIT_STATE; g_hos_instance.error_code = HOS_PARAMETER_ERROR; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "param error:conf_path:%s, module:%s, thread_num:%lu, bucket:%s", conf_path, module, thread_num, bucket); + return NULL; + } + + if (atomic_read(&g_hos_instance.status) == INSTANCE_ENABLE_STATE) + { + atomic_add(&g_hos_handle.count, 1); return &g_hos_instance; } + else if (atomic_read(&g_hos_instance.status) == INSTANCE_ATTEMPT_STATE) + { + atomic_add(&g_hos_handle.count, 1); + return NULL; + } + else + { + // INSTANCE_UNINIT_STATE + //GO ON + memset(&g_hos_handle, 0, sizeof(g_hos_handle)); + memset(&g_hos_instance, 0, sizeof(g_hos_instance)); + } + - MESA_load_profile_string_nodef(conf_path, module, "hos_serverip", hos_conf->ip, MAX_HOS_STRING_LEN); + MESA_load_profile_string_nodef(conf_path, module, "hos_serverip", hos_conf->ip, INET6_ADDRSTRLEN); MESA_load_profile_uint_nodef(conf_path, module, "hos_serverport", &hos_conf->port); MESA_load_profile_string_nodef(conf_path, module, "hos_accesskeyid", hos_conf->accesskeyid, MAX_HOS_STRING_LEN); MESA_load_profile_string_nodef(conf_path, module, "hos_secretkey", hos_conf->secretkey, MAX_HOS_STRING_LEN); @@ -635,50 +726,45 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level); if (g_hos_handle.log == NULL) { - g_hos_instance.result = false; g_hos_instance.error_code = HOS_RUNTIME_LOG_FAILED; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "runtime log create failed."); - return &g_hos_instance; + return NULL; } snprintf(hos_url, sizeof(hos_url), "http://%s:%d/hos/", hos_conf->ip, hos_conf->port); hos_conf->thread_num = thread_num; hos_client_create(); - if (g_hos_instance.result == true) + //不验证bucket是否存在,允许中途建桶 + #if 0 + if (atomic_read(&g_hos_instance.status) == true) { if(hos_verify_bucket(bucket) == false) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: bucket:%s not exist.", bucket); hos_shutdown_instance(); - g_hos_instance.result = false; + g_hos_instance.status = INSTANCE_UNINIT_STATE; g_hos_instance.error_code = HOS_BUCKET_NOT_EXIST; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "bucket:%s not exits.", bucket); - return &g_hos_instance; + return NULL; } - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug:%s","Instance init completed"); - hos_expand_fs2(); - g_hos_instance.error_code = 0; - g_hos_instance.error_message[0]='\0'; - g_hos_instance.hos_url_prefix = (const char *)calloc(1, strlen(hos_url) + 1); - memcpy((void *)g_hos_instance.hos_url_prefix, hos_url, strlen(hos_url)); } + #endif return &g_hos_instance; } else { - g_hos_instance.result = false; g_hos_instance.error_code = HOS_CONF_ERROR; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "hos param error:hos ip:%s, hos port:%u, accesskeyid:%s, secretkey:%s", hos_conf->ip, hos_conf->port, hos_conf->accesskeyid, hos_conf->secretkey); - return &g_hos_instance; + return NULL; } } int hos_create_bucket(const char *bucket) { - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (atomic_read(&g_hos_instance.status) == INSTANCE_ENABLE_STATE) { - return HOS_INSTANCE_NOT_INIT; + return HOS_INSTANCE_NOT_ENABLE; } if (bucket == NULL) { @@ -727,9 +813,9 @@ static int hos_upload_stream(const char *bucket, const char *object, const char int ret; int mode = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (atomic_read(&g_hos_instance.status) != INSTANCE_ENABLE_STATE) { - return HOS_INSTANCE_NOT_INIT; + return HOS_INSTANCE_NOT_ENABLE; } if ((bucket == NULL) || (object == NULL) || (thread_id > hos_conf->thread_num)) @@ -796,9 +882,9 @@ static int hos_upload_stream(const char *bucket, const char *object, const char int hos_upload_file(const char *bucket, const char *file_path, put_finished_callback callback, void *userdata, size_t thread_id) { struct stat buffer; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (atomic_read(&g_hos_instance.status) != INSTANCE_ENABLE_STATE) { - return HOS_INSTANCE_NOT_INIT; + return HOS_INSTANCE_NOT_ENABLE; } if ((bucket == NULL) || (file_path == NULL) || (thread_id > g_hos_handle.hos_config.thread_num)) @@ -819,9 +905,9 @@ int hos_upload_file(const char *bucket, const char *file_path, put_finished_call int hos_upload_buf(const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id) { - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (atomic_read(&g_hos_instance.status) != INSTANCE_ENABLE_STATE) { - return HOS_INSTANCE_NOT_INIT; + return HOS_INSTANCE_NOT_ENABLE; } if ((bucket == NULL) || (object == NULL) || (buf == NULL) || (buf_len == 0) @@ -837,20 +923,20 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id) { - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) - { - return HOS_INSTANCE_NOT_INIT; + if (atomic_read(&g_hos_instance.status) != INSTANCE_ENABLE_STATE) + { + return HOS_INSTANCE_NOT_ENABLE; } if ((bucket == NULL) || (object == NULL) || (thread_id > g_hos_handle.hos_config.thread_num) || strlen(bucket) == 0 || strlen(object) == 0) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: bucket:%s, obejct:%s, thread_id:%lu", (bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id); return HOS_PARAMETER_ERROR; } size_t fd = ++g_fd_info[thread_id]; - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: thread_id:%lu, fd:%lu", thread_id, fd); hos_fd_context_t info = {fd, BUFF_MODE | APPEND_MODE, (char *)bucket, (char *)object, (void *)callback, userdata, NULL,/*cache*/ g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/ @@ -870,9 +956,9 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t upload_len = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (atomic_read(&g_hos_instance.status) != INSTANCE_ENABLE_STATE) { - return HOS_INSTANCE_NOT_INIT; + return HOS_INSTANCE_NOT_ENABLE; } if ((stream == NULL) || (thread_id > hos_conf->thread_num)) @@ -971,9 +1057,9 @@ int hos_close_fd(size_t fd, size_t thread_id) hos_config_t *hos_conf = &g_hos_handle.hos_config; size_t upload_len = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (atomic_read(&g_hos_instance.status) != INSTANCE_ENABLE_STATE) { - return HOS_INSTANCE_NOT_INIT; + return HOS_INSTANCE_NOT_ENABLE; } if (thread_id > hos_conf->thread_num) @@ -1062,7 +1148,7 @@ int hos_shutdown_instance() hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t task_num = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (atomic_read(&g_hos_instance.status) == INSTANCE_UNINIT_STATE) { return HOS_INSTANCE_NOT_INIT; } @@ -1073,6 +1159,12 @@ int hos_shutdown_instance() return HOS_CLIENT_OK; } + if (atomic_read(&g_hos_instance.status) == INSTANCE_ATTEMPT_STATE) + { + atomic_set(&g_hos_handle.hos_func.hos_client_retry_thread_status, 1); + pthread_join(g_hos_handle.hos_func.hos_client_retry_thread_id, NULL); + } + //先等待所有的task完成 while(1) { diff --git a/src/hos_client.h b/src/hos_client.h index f1e5e51d..ace6300b 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -6,13 +6,8 @@ #ifndef __HOS_CLIENT_INIT__ #define __HOS_CLIENT_INIT__ -/*hos instance */ -typedef struct hos_instance_s{ - bool result; - int error_code; - char error_message[1024]; - const char *hos_url_prefix; -}* hos_instance; +struct hos_instance_s; +typedef struct hos_instance_s *hos_instance; #define HOS_CLIENT_OK 0 @@ -33,6 +28,7 @@ enum hoserrors HOS_CONF_ERROR = -7, HOS_BUCKET_NOT_EXIST = -8, HOS_INSTANCE_NOT_INIT = -9, + HOS_INSTANCE_NOT_ENABLE = -10, }; @@ -94,9 +90,12 @@ typedef void (*put_finished_callback)(bool result, const char *bucket, const cha hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket); /************************************************************************************* * 函数名: hos_get_instance - * 返回值: hos_instance 成功,result 为true + * 返回值: hos_instance *************************************************************************************/ hos_instance hos_get_instance(); +bool hos_get_init_instance_result(); +char *hos_get_upload_prefix(); +int hos_get_init_instance_errorcode(); /************************************************************************************* * 函数名: hos_upload_file * 参数: hos_instance instance 非空句柄 diff --git a/src/hos_common.h b/src/hos_common.h index 8d4425f7..2fdfc03e 100644 --- a/src/hos_common.h +++ b/src/hos_common.h @@ -14,10 +14,12 @@ #define atomic_add(x,y) __sync_add_and_fetch((x),(y)) #define atomic_read(x) __sync_add_and_fetch((x),0) #define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) +#define atomic_set(x,y) __sync_lock_test_and_set((x),(y)) #else #define atomic_add(x,y) ((*(x))+=(y)) #define atomic_read(x) (*(x)) #define atomic_sub(x,y) ((*(x))-=(y)) +#define atomic_set(x,y) (*(x)=(y)) #endif #define MAX_HOS_STRING_LEN 1024 @@ -25,6 +27,17 @@ #define MAX_HOS_CLIENT_FD_NUM 65535 #define HOS_LOG_PATH "./tsglog/hoslog" +/*hos instance */ +typedef struct hos_instance_s{ +#define INSTANCE_UNINIT_STATE 0 +#define INSTANCE_ATTEMPT_STATE 1 +#define INSTANCE_ENABLE_STATE 2 + int status; + int error_code; + char error_message[1024]; + const char *hos_url_prefix; +}* hos_instance; + typedef struct data_info_s { size_t *tx_pkts; @@ -54,8 +67,8 @@ enum typedef struct hos_config_s { - char ip[INET6_ADDRSTRLEN]; - char fs2_ip[INET6_ADDRSTRLEN]; + char ip[INET6_ADDRSTRLEN+1]; + char fs2_ip[INET6_ADDRSTRLEN+1]; char accesskeyid[MAX_HOS_STRING_LEN]; char secretkey[MAX_HOS_STRING_LEN]; char log_path[MAX_HOS_STRING_LEN]; @@ -82,6 +95,8 @@ typedef struct hos_func_thread_s fs2_info_t fs2_info; pthread_t fs2_thread; int fs2_status; + pthread_t hos_client_retry_thread_id; + int hos_client_retry_thread_status; #define HOS_FS2_START 1 #define HOS_FS2_STOP 2 }hos_func_thread_t; @@ -94,7 +109,6 @@ typedef struct hos_client_handle_s Aws::S3::S3ClientMock *S3Client; #endif Aws::Vector<Aws::S3::Model::Bucket> buckets; - std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> executor; size_t count; /* 记录了有多少个对象在使用hos */ hos_config_t hos_config; hos_func_thread_t hos_func; |
