diff options
Diffstat (limited to 'src/hos_client.cpp')
| -rw-r--r-- | src/hos_client.cpp | 40 |
1 files changed, 27 insertions, 13 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 659db6c5..f49bf9ff 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -25,20 +25,21 @@ typedef struct hos_client_handle_s Aws::S3::S3Client *S3Client; size_t append_size; size_t thread_sum; - Aws::SDKOptions *options; + Aws::SDKOptions options; Aws::Vector<Aws::S3::Model::Bucket> buckets; }hos_client_handle_t; #define MAX_THREAD_NUM 255 #define MAX_FD_NUM 65535 hos_info_t *hash_hos_info[MAX_THREAD_NUM]; +size_t fd_info[MAX_THREAD_NUM][MAX_FD_NUM]; -static size_t hash_get_min_free_fd(hos_info_t *handle) +static size_t hash_get_min_free_fd(size_t thread_id) { size_t i = 0; for (i = 1; i < MAX_FD_NUM; i++) { - if (!find_info_by_fd(handle, i)) + if (!fd_info[thread_id][i]) return i; } return 0; @@ -50,6 +51,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) { const char *error = NULL; + hos_info_t *hos_info = NULL; bool result = outcome.IsSuccess(); if (!result) { @@ -58,8 +60,14 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const Aws::String& uuid = context->GetUUID(); size_t thread_id, fd; sscanf(uuid.c_str(), "%lu %lu", &thread_id, &fd); - hos_info_t *hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); - //put_finished_callback& callback = *(put_finished_callback *)hos_info->callback; + if (fd_info[thread_id][fd]) + { + hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + } + if (hos_info == NULL) + { + return ; + } put_finished_callback callback = (put_finished_callback)hos_info->callback; callback(result, error, hos_info->userdata); if (hos_info->mode & APPEND_MODE) @@ -78,8 +86,8 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi { return NULL; } - //Aws::SDKOptions *options = (Aws::SDKOptions *)malloc(sizeof(Aws::SDKOptions)); Aws::SDKOptions options; + //auto options = new Aws::SDKOptions; Aws::InitAPI(options); hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); @@ -96,7 +104,7 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); handle->append_size = 30 * 1024 * 1024; handle->thread_sum = thread_sum; - handle->options = &options; + handle->options = options; /* 获取当前用户的所有的buckets */ Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); @@ -168,7 +176,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const { struct stat buffer; char buf[128]; - size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]); + size_t fd = hash_get_min_free_fd(thread_id); if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (callback == NULL) || (thread_id > handle->thread_sum)) { @@ -211,6 +219,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, }; add_hos_info(&hash_hos_info[thread_id], &info); + fd_info[thread_id][fd] = 1; S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); return HOS_CLIENT_OK; @@ -235,7 +244,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_PARAMETER_ERROR; } - size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]); + size_t fd = hash_get_min_free_fd(thread_id); if (fd == 0) { return HOS_FD_NOT_ENOUGH; @@ -243,6 +252,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, }; add_hos_info(&hash_hos_info[thread_id], &info); + fd_info[thread_id][fd] = 1; return fd; } @@ -259,7 +269,10 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id return HOS_PARAMETER_ERROR; } - hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + if (fd_info[thread_id][fd]) + { + hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + } if (hos_info == NULL) { return HOS_HASH_NOT_FIND; @@ -279,7 +292,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id if (hos_info->mode & APPEND_MODE) { headers["x-hos-upload-type"] = "append"; - headers["x-hos_position"] = num; + headers["x-hos-position"] = num; request.SetMetadata(headers); #if 0 request.AddMetadata("x-hos-upload-type", "append"); @@ -333,6 +346,7 @@ int hos_close_fd(size_t fd, size_t thread_id) } delete_info_by_fd(&hash_hos_info[thread_id], fd); + fd_info[thread_id][fd] = 0; return HOS_CLIENT_OK; } @@ -348,13 +362,13 @@ int hos_client_destory(hos_client_handle handle) delete handle->S3Client; Aws::Vector<Aws::S3::Model::Bucket>().swap(handle->buckets); - Aws::ShutdownAPI(*(handle->options)); + Aws::ShutdownAPI((handle->options)); for (i = 0; i < handle->thread_sum; i++) { delete_all(&hash_hos_info[i]); } - //free(handle->options); + //delete(handle->options); free(handle); return HOS_CLIENT_OK; |
