diff options
Diffstat (limited to 'src/hos_client.cpp')
| -rw-r--r-- | src/hos_client.cpp | 288 |
1 files changed, 246 insertions, 42 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp index f2a092e2..9e5acfde 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -3,6 +3,10 @@ > Author: pxz > Created Time: Thu 10 Sep 2020 03:00:23 PM CST ************************************************************************/ +extern "C" +{ +#include<string.h> +} #include <aws/core/Aws.h> #include <aws/s3/S3Client.h> #include <aws/s3/model/PutObjectRequest.h> @@ -13,111 +17,311 @@ #include <mutex> #include <sys/stat.h> #include "hos_client.h" +#include "hos_hash.h" + +typedef struct hos_client_handle_s +{ + Aws::S3::S3Client *S3Client; + size_t append_size; + int thread_sum; + Aws::Vector<Aws::S3::Model::Bucket> buckets; +}hos_client_handle_t; -std::mutex upload_mutex; +#define MAX_THREAD_NUM 255 +#define MAX_FD_NUM 65535 +hos_info_t *hash_hos_info[MAX_THREAD_NUM]; -static void PutObjectAsyncFinished(const Aws::S3::S3Client* s3Client, +static size_t hash_get_min_free_fd(hos_info_t *handle) +{ + size_t i = 0; + for (i = 1; i < MAX_FD_NUM; i++) + { + if (!find_info_by_fd(handle, i)) + return i; + } + return 0; +} + +static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const Aws::S3::Model::PutObjectRequest& request, const Aws::S3::Model::PutObjectOutcome& outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) { - if (outcome.IsSuccess()) { - std::cout << "Success: PutObjectAsyncFinished: Finished uploading '" - << context->GetUUID() << "'." << std::endl; - } - else { - std::cout << "Error: PutObjectAsyncFinished: " << - outcome.GetError() << std::endl; + const char *error = NULL; + bool result = outcome.IsSuccess(); + if (!result) + { + error = outcome.GetError().GetMessage().c_str(); } - + const Aws::String& uuid = context->GetUUID(); + size_t thread_id, fd; + sscanf(uuid.c_str(), "%llu %llu", &thread_id, &fd); + hos_info_t *hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + //put_finished_callback& callback = *(put_finished_callback *)hos_info->callback; + put_finished_callback callback = (put_finished_callback)hos_info->callback; + callback(result, error, hos_info->userdata); } -hos_client_handle hos_client_init(const char *endpoint, const char *accesskeyid, const char *secretkey) +hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum) { - if (!endpoint || !accesskeyid || !secretkey) + int i; + if (!endpoint || !accesskeyid || !secretkey || thread_sum > MAX_THREAD_NUM) { return NULL; } Aws::SDKOptions options; Aws::InitAPI(options); - hos_client_handle handle = NULL; + hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); + memset(handle, 0, sizeof(hos_client_handle_t)); Aws::Client::ClientConfiguration config; Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey); - //std::cout << "accesskeyid: " << credentials.GetAWSAccessKeyId() << "\n" << std::endl; - //std::cout << "secretkey: " << credentials.GetAWSSecretKey() << "\n" << std::endl; config.endpointOverride = endpoint; config.verifySSL = false; config.enableEndpointDiscovery = true; - handle = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + handle->append_size = 30 * 1024 * 1024; + handle->thread_sum = thread_sum; + /* 获取当前用户的所有的buckets */ + Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); + + if (outcome.IsSuccess()) + { + handle->buckets = outcome.GetResult().GetBuckets(); + } + return handle; } +bool hos_verify_bucket(hos_client_handle handle, const char *bucket) +{ + Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); + + if (outcome.IsSuccess()) + { + handle->buckets = outcome.GetResult().GetBuckets(); + + for (Aws::S3::Model::Bucket& new_bucket : handle->buckets) + { + if (strcmp(new_bucket.GetName().c_str(), bucket) == 0) + { + return true; + } + } + } + return false; +} + int hos_create_bucket(hos_client_handle handle, const char *bucket) { - if (!bucket) + if ((bucket == NULL) || (handle == NULL)) { - return -1; + return HOS_PARAMETER_ERROR; } - Aws::S3::S3Client& s3Client = *(Aws::S3::S3Client *) handle; + Aws::S3::S3Client& S3Client = *handle->S3Client; + + /* 本地检查是否已经存在该bucket */ + for (Aws::S3::Model::Bucket& new_bucket : handle->buckets) + { + if (strcmp(new_bucket.GetName().c_str(), bucket) == 0) + { + return HOS_CLIENT_OK; + } + } + Aws::S3::Model::CreateBucketRequest createBucketRequest; createBucketRequest.SetBucket(bucket); - //std::cout << "bucket name: " << createBucketRequest.GetBucket() << "\n" << std::endl; - Aws::S3::Model::CreateBucketOutcome createBucketOutcome = s3Client.CreateBucket(createBucketRequest); + Aws::S3::Model::CreateBucketOutcome createBucketOutcome = S3Client.CreateBucket(createBucketRequest); if (!createBucketOutcome.IsSuccess()) { Aws::S3::S3Errors errorcode = createBucketOutcome.GetError().GetErrorType(); if (errorcode != Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU) { - //std::cout << "Failed to create bucket: " << bucket << "\n" << createBucketOutcome.GetError() << std::endl; - return errorcode; + return (int)errorcode + 1; } } - return 0; + //handle->buckets.push_back(); + + return HOS_CLIENT_OK; } -bool hos_upload_async(hos_client_handle handle, const char *bucket, const char *object) +static int hos_upload_stream(hos_client_handle handle, const char *bucket, const char *object, + const char *data, size_t data_len, put_finished_callback callback, void *userdata, size_t thread_id, int file_type) { - Aws::S3::S3Client& s3Client = *(Aws::S3::S3Client *) handle; struct stat buffer; + char buf[128]; + size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]); - std::unique_lock<std::mutex> lock(upload_mutex); - if (stat(object, &buffer) == -1) + if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (callback == NULL) || (thread_id > handle->thread_sum)) { - //error: file does not exist. - return false; + return HOS_PARAMETER_ERROR; } + Aws::S3::S3Client& S3Client = *handle->S3Client; // Create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; request.SetBucket(bucket); request.SetKey(object); + + //设置上传数据类型 + if (file_type == 0) + { + if (stat(object, &buffer) == -1) + { + return HOS_FILE_NOT_EXITS; + } + //文件类型 + const std::shared_ptr<Aws::IOStream> input_data = + Aws::MakeShared<Aws::FStream>("SampleAllocationTag", object, std::ios_base::in | std::ios_base::binary); + request.SetBody(input_data); + } + else + { + //内存块 + const std::shared_ptr<Aws::IOStream> input_data = + Aws::MakeShared<Aws::StringStream>(data); + Aws::String stream (data, data_len); + *input_data << stream; + request.SetBody(input_data); + } + + //设置回调函数 + std::shared_ptr<Aws::Client::AsyncCallerContext> context = + Aws::MakeShared<Aws::Client::AsyncCallerContext>(""); + sprintf(buf, "%ld %d", thread_id, fd); + context->SetUUID(buf); + + hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, }; + add_hos_info(&hash_hos_info[thread_id], &info); + + S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + return HOS_CLIENT_OK; +} + +int hos_upload_file(hos_client_handle handle, const char *bucket, const char *file_path, + put_finished_callback callback, void *userdata, size_t thread_id) +{ + return hos_upload_stream(handle, bucket, file_path, NULL, 0, callback, userdata, thread_id, 0); +} + +int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *object, + const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id) +{ + return hos_upload_stream(handle, bucket, object, buf, buf_len, callback, userdata, thread_id, 1); +} + +int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode) +{ + if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > handle->thread_sum)) + { + return HOS_PARAMETER_ERROR; + } + + size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]); + if (fd == 0) + { + return HOS_FD_NOT_ENOUGH; + } + + hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, }; + add_hos_info(&hash_hos_info[thread_id], &info); + + return fd; +} + +int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id) +{ + struct stat buffer; + hos_info_t *hos_info = NULL; + hos_client_handle handle = NULL; + char buf[128]; + if ((fd == 0) || (stream == NULL) || (thread_id > MAX_THREAD_NUM)) + { + return HOS_PARAMETER_ERROR; + } + + hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + if (hos_info == NULL) + { + return HOS_HASH_NOT_FIND; + } + + handle = (hos_client_handle)hos_info->handle; + Aws::S3::S3Client& S3Client = *(handle->S3Client); + + // Create and configure the asynchronous put object request. + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(hos_info->bucket); + request.SetKey(hos_info->object); + + //TODO APPEND MODE + + //设置上传数据类型 + if (hos_info->mode & BUFF_MODE) + { + //BUFF_MODE + const std::shared_ptr<Aws::IOStream> input_data = + Aws::MakeShared<Aws::StringStream>(stream, stream + stream_len); + Aws::String buffer (stream, stream_len); + *input_data << buffer; + request.SetBody(input_data); + } + else + { + //BUFF_MODE + if (stat(hos_info->object, &buffer) == -1) + { + return HOS_FILE_NOT_EXITS; + } + //文件类型 + const std::shared_ptr<Aws::IOStream> input_data = + Aws::MakeShared<Aws::FStream>("SampleAllocationTag", hos_info->object, std::ios_base::in | std::ios_base::binary); + request.SetBody(input_data); + } - const std::shared_ptr<Aws::IOStream> input_data = - Aws::MakeShared<Aws::FStream>("SampleAllocationTag", object, std::ios_base::in | std::ios_base::binary); + //设置回调函数 + std::shared_ptr<Aws::Client::AsyncCallerContext> context = + Aws::MakeShared<Aws::Client::AsyncCallerContext>(""); + sprintf(buf, "%ld %d", thread_id, fd); + context->SetUUID(buf); + + S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + return HOS_CLIENT_OK; +} - request.SetBody(input_data); +int hos_close_fd(size_t fd, size_t thread_id) +{ + if (fd == 0) + { + return HOS_PARAMETER_ERROR; + } - std::shared_ptr<Aws::Client::AsyncCallerContext> context = - Aws::MakeShared<Aws::Client::AsyncCallerContext>("PutObjectAllocationTag"); - context->SetUUID(object); - s3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); - return true; + delete_info_by_fd(hash_hos_info[thread_id], fd); + + return HOS_CLIENT_OK; } -void hos_client_close(hos_client_handle handle) +int hos_client_destory(hos_client_handle handle) { + int i = 0; if (handle == NULL) { - return; + return HOS_PARAMETER_ERROR; + } + + delete handle->S3Client; + + for (i = 0; i < handle->thread_sum; i++) + { + delete_all(hash_hos_info[i]); } - delete (Aws::S3::S3Client *)handle; + free(handle); - return ; + return HOS_CLIENT_OK; } |
