diff options
Diffstat (limited to 'src/hos_client.cpp')
| -rw-r--r-- | src/hos_client.cpp | 135 |
1 files changed, 31 insertions, 104 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 5132182d..ace52c9f 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -8,119 +8,32 @@ extern "C" #include <string.h> #include <sys/stat.h> #include <unistd.h> -#include <netinet/in.h> } -#include <aws/core/Aws.h> -#include <aws/s3/S3Client.h> #include <aws/s3/model/PutObjectRequest.h> #include <aws/s3/model/CreateBucketRequest.h> -#include <aws/core/auth/AWSCredentials.h> -#include <aws/core/utils/threading/Executor.h> #include <fstream> #include <iostream> -#include <mutex> #include <aws/external/gtest.h> #include <aws/testing/platform/PlatformTesting.h> #include <aws/testing/TestingEnvironment.h> #include <aws/testing/MemoryTesting.h> +#ifdef HOS_MOCK +#include "mock/hos_mock.h" +#endif #include "hos_client.h" -#include "hos_hash.h" -#include "field_stat2.h" #include "MESA_handle_logger.h" #include "MESA_prof_load.h" +#include "hos_common.h" -#if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410) -#define atomic_add(x,y) __sync_add_and_fetch((x),(y)) -#define atomic_read(x) __sync_add_and_fetch((x),0) -#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) -#else -#define atomic_add(x,y) ((*(x))+=(y)) -#define atomic_read(x) (*(x)) -#define atomic_sub(x,y) ((*(x))-=(y)) -#endif - -#define MAX_HOS_STRING_LEN 1024 -#define HOS_ERROR_MESSAGE_SIZE (MAX_HOS_STRING_LEN - 1) -#define MAX_HOS_CLIENT_FD_NUM 65535 -#define HOS_LOG_PATH "./tsglog/hoslog" - -typedef struct data_info_s -{ - size_t *tx_pkts; - size_t *tx_bytes; - size_t *rx_pkts; - size_t *rx_bytes; - size_t *tx_failed_pkts; - size_t *tx_failed_bytes; - size_t *cache; -}data_info_t; - -typedef struct fs2_info_s -{ - screen_stat_handle_t fs2_handle; - int *line_ids; - int *column_ids; - void *reserved; //预留给每个fs2 handle用来存储自定义的数据 -}fs2_info_t; - -enum -{ - FS2_DATA_FLOW_STATE = 0, - FS2_POOL_THREAD_STATE, - FS2_RECORD_EVENTS, -}; - -typedef struct hos_config_s -{ - char ip[INET6_ADDRSTRLEN]; - char fs2_ip[INET6_ADDRSTRLEN]; - char accesskeyid[MAX_HOS_STRING_LEN]; - char secretkey[MAX_HOS_STRING_LEN]; - char log_path[MAX_HOS_STRING_LEN]; - char fs2_path[MAX_HOS_STRING_LEN]; - - uint32_t port; - uint32_t fs2_port; - uint32_t fs2_fmt; - uint32_t log_level; - uint32_t pool_thread_size; - uint32_t thread_num; - uint32_t cache_size; - uint32_t cache_count; - uint32_t timeout; -}hos_config_t; - -typedef struct hos_func_thread_s -{ - /* fd 管理线程 */ - pthread_t fd_thread; - int fd_thread_status; - /* fs2 管理线程 */ - fs2_info_t fs2_info[FS2_RECORD_EVENTS]; //0: data info; 1: fd info; 2 cache info; 3 PoolThread state - pthread_t fs2_thread; - int fs2_status; -#define HOS_FS2_START 1 -#define HOS_FS2_STOP 2 -}hos_func_thread_t; - -typedef struct hos_client_handle_s -{ - Aws::S3::S3Client *S3Client; - Aws::Vector<Aws::S3::Model::Bucket> buckets; - std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> executor; - size_t count; /* 记录了有多少个对象在使用hos */ - hos_config_t hos_config; - hos_func_thread_t hos_func; - void *log; -}hos_client_handle_t; - -static struct hos_instance_s g_hos_instance; -static hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle +struct hos_instance_s g_hos_instance; +hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle static std::mutex m_client_lock; -static hos_fd_context_t **g_fd_context; -static size_t (*g_fd_info)[MAX_HOS_CLIENT_FD_NUM + 1]; //fd 实际从3开始, fd[thread_id][0]记录register的fd,fd[thread_id][1]记录inject的fd +hos_fd_context_t **g_fd_context; +size_t (*g_fd_info)[MAX_HOS_CLIENT_FD_NUM + 1]; //fd 实际从3开始, fd[thread_id][0]记录register的fd,fd[thread_id][1]记录inject的fd static Aws::SDKOptions g_options; +static void *hos_fd_manage(void *ptr); + static inline size_t get_current_ms() { struct timespec timenow; @@ -268,7 +181,11 @@ static void hos_client_create() //同步模式 } + #ifndef HOS_MOCK g_hos_handle.S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + #else + g_hos_handle.S3Client = new Aws::S3::S3ClientMock(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + #endif /* 获取当前用户的所有的buckets */ Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); @@ -571,7 +488,7 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t sprintf(buf, "%lu %lu %lu", thread_id, fd, stream_len); context->SetUUID(buf); - Aws::S3::S3Client& S3Client = *(g_hos_handle.S3Client); + auto &S3Client = *(g_hos_handle.S3Client); ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); if (ret) { @@ -605,7 +522,7 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t hos_func_thread_t *hos_func = &g_hos_handle.hos_func; data_info_t *data_info = NULL; - Aws::S3::S3Client& S3Client = *(g_hos_handle.S3Client); + auto& S3Client = *(g_hos_handle.S3Client); Aws::S3::Model::PutObjectOutcome Outcome = S3Client.PutObject(request); if (Outcome.IsSuccess()) { @@ -650,6 +567,7 @@ hos_instance hos_get_instance() g_hos_instance.result = true; return &g_hos_instance; } + memset(&g_hos_instance, 0, sizeof(g_hos_instance)); g_hos_instance.result = false; return &g_hos_instance; } @@ -742,7 +660,7 @@ int hos_create_bucket(const char *bucket) "error:bucket:%s, s3client:%s", bucket, g_hos_handle.S3Client?"not null":"null"); return HOS_PARAMETER_ERROR; } - Aws::S3::S3Client& S3Client = *g_hos_handle.S3Client; + auto& S3Client = *g_hos_handle.S3Client; /* 本地检查是否已经存在该bucket */ for (Aws::S3::Model::Bucket& new_bucket : g_hos_handle.buckets) @@ -777,7 +695,6 @@ int hos_create_bucket(const char *bucket) static int hos_upload_stream(const char *bucket, const char *object, const char *data, size_t data_len, put_finished_callback callback, void *userdata, size_t thread_id) { - char buf[128]; data_info_t *data_info = NULL; hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; @@ -833,6 +750,15 @@ static int hos_upload_stream(const char *bucket, const char *object, const char hos_fd_context_t info = {fd, 0, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; add_fd_context(&g_fd_context[thread_id], &info); + { + std::lock_guard<std::mutex> locker(m_client_lock); + if (g_hos_handle.hos_func.fd_thread == 0) + { + g_hos_handle.hos_func.fd_thread_status = 0; + pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL); + } + } + if (hos_conf->pool_thread_size > 0) { ret = hos_putobject_async(request, data_len, thread_id, fd, bucket, object); @@ -942,13 +868,14 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca { MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", - g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null")); + g_hos_instance.result, (g_hos_handle.S3Client == NULL)?("null"):("not null")); return HOS_INSTANCE_NOT_INIT; } if ((bucket == NULL) || (object == NULL) || (thread_id > g_hos_handle.hos_config.thread_num) || strlen(bucket) == 0 || strlen(object) == 0) { MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", - "bucket:%s, obejct:%s, thread_id:%s", + "bucket:%s, obejct:%s, thread_id:%d", + //(bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id); bucket, object, thread_id); return HOS_PARAMETER_ERROR; } @@ -1007,7 +934,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id if ((fd < 3) || fd > MAX_HOS_CLIENT_FD_NUM || (stream == NULL) || (thread_id > hos_conf->thread_num)) { MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, - "hos_write", "error: fd:%d, stream:%s, stream_len:%s, thread_id:%d.", + "hos_write", "error: fd:%d, stream:%s, stream_len:%d, thread_id:%d.", fd, stream?"not null":"null", stream_len, thread_id); return HOS_PARAMETER_ERROR; } |
