diff options
Diffstat (limited to 'src/hos_client.cpp')
| -rw-r--r-- | src/hos_client.cpp | 225 |
1 files changed, 186 insertions, 39 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 5d251aeb..27cced9a 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -30,10 +30,13 @@ typedef struct hos_client_handle_s Aws::S3::S3Client *S3Client; Aws::SDKOptions options; Aws::Vector<Aws::S3::Model::Bucket> buckets; + pthread_t fd_thread; + int fd_thread_status; /* options */ size_t cache_size; size_t cache_times; size_t thread_sum; + size_t timeout; /* expand */ screen_stat_handle_t fs2_handle; pthread_t fs2_thread; @@ -52,10 +55,18 @@ typedef struct hos_client_handle_s int *rx_bytes_last; }hos_client_handle_t; +hos_client_handle hos_handle;//一个进程只允许有一个hos_handle hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM]; size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM]; Aws::SDKOptions options; +static inline size_t get_current_ms() +{ + struct timespec timenow; + clock_gettime(CLOCK_MONOTONIC, &timenow); + return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 ); +} + static size_t hash_get_min_free_fd(size_t thread_id) { size_t i = 0; @@ -67,6 +78,18 @@ static size_t hash_get_min_free_fd(size_t thread_id) return 0; } +static int hos_delete_fd(size_t fd, size_t thread_id) +{ + if (fd == 0) + { + return HOS_PARAMETER_ERROR; + } + delete_info_by_fd(&hash_hos_info[thread_id], fd); + fd_info[thread_id][fd] = 0; + + return HOS_CLIENT_OK; +} + static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const Aws::S3::Model::PutObjectRequest& request, const Aws::S3::Model::PutObjectOutcome& outcome, @@ -91,14 +114,23 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, return ; } put_finished_callback callback = (put_finished_callback)hos_info->callback; - callback(result, error, hos_info->userdata); + callback(result, hos_info->bucket, hos_info->object, error, hos_info->userdata); if (hos_info->mode & APPEND_MODE) { //APPEND MODE 保留fd + hos_info->recive_cnt++; +#if 0 + if (hos_info->fd_status == HOS_FD_INJECT) + { + if (hos_info->recive_cnt == hos_info->position) + hos_delete_fd(fd, thread_id); + } +#endif }else { //完整上传 删除fd - hos_close_fd(fd, thread_id); + //hos_delete_fd(fd, thread_id); + hos_info->fd_status = HOS_FD_INJECT; } } @@ -139,9 +171,12 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi return NULL; } - - hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); - memset(handle, 0, sizeof(hos_client_handle_t)); + if (hos_handle) + { + return hos_handle; + } + hos_handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); + memset(hos_handle, 0, sizeof(hos_client_handle_t)); Aws::Client::ClientConfiguration config; Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey); @@ -151,21 +186,23 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi config.enableEndpointDiscovery = true; config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(pool_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY));//支持线程池 - handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - handle->options = options; + hos_handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + hos_handle->options = options; /* 获取当前用户的所有的buckets */ - Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); + Aws::S3::Model::ListBucketsOutcome outcome = hos_handle->S3Client->ListBuckets(); - if (outcome.IsSuccess()) + if (!outcome.IsSuccess()) { - handle->buckets = outcome.GetResult().GetBuckets(); + return NULL; } - handle->cache_size = 0; - handle->cache_times = 1; - handle->thread_sum = 1; + hos_handle->buckets = outcome.GetResult().GetBuckets(); + hos_handle->cache_size = 0; + hos_handle->cache_times = 1; + hos_handle->thread_sum = 1; + hos_handle->timeout = 1000; - return handle; + return hos_handle; } static void *fs2_statistics(void *ptr) @@ -420,7 +457,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, NULL, 0, 0, 0 }; + hos_info_t info = {fd, 0, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; @@ -440,6 +477,37 @@ int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *obj return hos_upload_stream(handle, bucket, object, buf, buf_len, callback, userdata, thread_id, 1); } +static void *hos_fd_manage(void *ptr) +{ + hos_info_t *hos_info; + hos_client_handle handle = (hos_client_handle)ptr; + size_t thread_sum = handle->thread_sum; + size_t thread_num; + size_t fd; + while(1) + { + if (handle->fd_thread_status) + break; + for (thread_num = 0; thread_num < thread_sum; thread_num++) + { + for(fd = 0; fd < MAX_HOS_CLIENT_FD_NUM; fd++) + { + if (!fd_info[thread_num][fd]) + break; + hos_info = find_info_by_fd(hash_hos_info[thread_num], fd); + if (!hos_info) + break; + if (hos_info->fd_status == HOS_FD_REGISTER) + continue; + if ((hos_info->position == hos_info->recive_cnt) || (hos_info->overtime <= get_current_ms())) + hos_delete_fd(fd, thread_num); + } + } + usleep(1000); + } + pthread_exit(NULL); +} + int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode) { if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > handle->thread_sum)) @@ -453,10 +521,16 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_FD_NOT_ENOUGH; } - hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, }; + hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,}; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; - +#if 1 + if (handle->fd_thread == 0) + { + handle->fd_thread_status = 0; + pthread_create(&handle->fd_thread, NULL, hos_fd_manage, handle); + } +#endif return fd; } @@ -495,10 +569,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id Aws::S3::S3Client& S3Client = *(handle->S3Client); - // Create and configure the asynchronous put object request. + // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; - request.SetBucket(hos_info->bucket); - request.SetKey(hos_info->object); //设置上传数据类型 if (hos_info->mode & BUFF_MODE) @@ -511,27 +583,53 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id { hos_info->cache = Aws::MakeShared<Aws::StringStream>("append mode"); } - if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + if (hos_info->cache_times == 0) { - // cache - Aws::String buffer (stream, stream_len); - hos_info->cache_rest -= stream_len; - if (hos_info->cache_rest > 0) + //不设置cache_times的情况下 + if (stream_len < hos_info->cache_rest) + { + // cache + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len >= hos_info->cache_rest) { - return HOS_CLIENT_OK; + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; } - }else if (stream_len > hos_info->cache_rest) - { - // multi handle - flag = 1; - Aws::String buffer (stream, hos_info->cache_rest); - *hos_info->cache << buffer; - rest = stream_len - hos_info->cache_rest; - }else + }else { - //over cache_times - Aws::String buffer (stream, stream_len); - *hos_info->cache << buffer; + //设置cache times的情况下 + if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + { + // cache + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len > hos_info->cache_rest) + { + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; + }else + { + //over cache_times + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + } } request.SetBody(hos_info->cache); @@ -565,6 +663,9 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id request.SetBody(input_data); } + request.SetBucket(hos_info->bucket); + request.SetKey(hos_info->object); + //设置回调函数 std::shared_ptr<Aws::Client::AsyncCallerContext> context = Aws::MakeShared<Aws::Client::AsyncCallerContext>(""); @@ -612,13 +713,54 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id int hos_close_fd(size_t fd, size_t thread_id) { + hos_info_t *hos_info; + char num[128]; + char buf[128]; if (fd == 0) { return HOS_PARAMETER_ERROR; } + if (fd_info[thread_id][fd]) + { + hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + } + if (hos_info == NULL) + { + return HOS_CLIENT_OK; + } - delete_info_by_fd(&hash_hos_info[thread_id], fd); - fd_info[thread_id][fd] = 0; + //close fd 之前发送append的缓存中内容 + if ((hos_info->mode & BUFF_MODE) && (hos_info->mode & APPEND_MODE)) + { + if (hos_info->cache_rest != hos_info->handle->cache_size) + { + //handle = (hos_client_handle)hos_info->handle; + Aws::S3::S3Client& S3Client = *(hos_info->handle->S3Client); + + // Create and configure the asynchronous put object request. + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(hos_info->bucket); + request.SetKey(hos_info->object); + request.SetBody(hos_info->cache); + + // add headers + snprintf(num, 128, "%lu", ++hos_info->position); + Aws::Map<Aws::String, Aws::String> headers; + headers["x-hos-upload-type"] = "append"; + headers["x-hos-position"] = num; + request.SetMetadata(headers); + + std::shared_ptr<Aws::Client::AsyncCallerContext> context = + Aws::MakeShared<Aws::Client::AsyncCallerContext>(""); + sprintf(buf, "%lu %lu", thread_id, fd); + context->SetUUID(buf); + + S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + } + } + hos_info->fd_status = HOS_FD_INJECT; + hos_info->cache.reset(); + hos_info->overtime = get_current_ms() + hos_info->timeout; return HOS_CLIENT_OK; } @@ -635,6 +777,11 @@ int hos_client_destory(hos_client_handle handle) Aws::Vector<Aws::S3::Model::Bucket>().swap(handle->buckets); + if (handle->fd_thread) + { + handle->fd_thread_status = 1; + pthread_join(handle->fd_thread, NULL); + } for (i = 0; i < handle->thread_sum; i++) { delete_all(&hash_hos_info[i]); |
