summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorpengxuanzheng <[email protected]>2020-10-09 14:20:39 +0800
committerpengxuanzheng <[email protected]>2020-11-02 18:59:51 +0800
commita6bc240ec0afed54d0b493ed9f2dc207f34df447 (patch)
treecb3860d46e46d9e498518118c6e75107de012cbb /src
parent76477b253d9cd3a1f3250965bc25de472c94ad32 (diff)
优化fd的生成及记录
Diffstat (limited to 'src')
-rw-r--r--src/hos_client.cpp40
-rw-r--r--src/hos_client.h12
2 files changed, 37 insertions, 15 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp
index 659db6c5..f49bf9ff 100644
--- a/src/hos_client.cpp
+++ b/src/hos_client.cpp
@@ -25,20 +25,21 @@ typedef struct hos_client_handle_s
Aws::S3::S3Client *S3Client;
size_t append_size;
size_t thread_sum;
- Aws::SDKOptions *options;
+ Aws::SDKOptions options;
Aws::Vector<Aws::S3::Model::Bucket> buckets;
}hos_client_handle_t;
#define MAX_THREAD_NUM 255
#define MAX_FD_NUM 65535
hos_info_t *hash_hos_info[MAX_THREAD_NUM];
+size_t fd_info[MAX_THREAD_NUM][MAX_FD_NUM];
-static size_t hash_get_min_free_fd(hos_info_t *handle)
+static size_t hash_get_min_free_fd(size_t thread_id)
{
size_t i = 0;
for (i = 1; i < MAX_FD_NUM; i++)
{
- if (!find_info_by_fd(handle, i))
+ if (!fd_info[thread_id][i])
return i;
}
return 0;
@@ -50,6 +51,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context)
{
const char *error = NULL;
+ hos_info_t *hos_info = NULL;
bool result = outcome.IsSuccess();
if (!result)
{
@@ -58,8 +60,14 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
const Aws::String& uuid = context->GetUUID();
size_t thread_id, fd;
sscanf(uuid.c_str(), "%lu %lu", &thread_id, &fd);
- hos_info_t *hos_info = find_info_by_fd(hash_hos_info[thread_id], fd);
- //put_finished_callback& callback = *(put_finished_callback *)hos_info->callback;
+ if (fd_info[thread_id][fd])
+ {
+ hos_info = find_info_by_fd(hash_hos_info[thread_id], fd);
+ }
+ if (hos_info == NULL)
+ {
+ return ;
+ }
put_finished_callback callback = (put_finished_callback)hos_info->callback;
callback(result, error, hos_info->userdata);
if (hos_info->mode & APPEND_MODE)
@@ -78,8 +86,8 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
{
return NULL;
}
- //Aws::SDKOptions *options = (Aws::SDKOptions *)malloc(sizeof(Aws::SDKOptions));
Aws::SDKOptions options;
+ //auto options = new Aws::SDKOptions;
Aws::InitAPI(options);
hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t));
@@ -96,7 +104,7 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
handle->append_size = 30 * 1024 * 1024;
handle->thread_sum = thread_sum;
- handle->options = &options;
+ handle->options = options;
/* 获取当前用户的所有的buckets */
Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets();
@@ -168,7 +176,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
{
struct stat buffer;
char buf[128];
- size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]);
+ size_t fd = hash_get_min_free_fd(thread_id);
if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (callback == NULL) || (thread_id > handle->thread_sum))
{
@@ -211,6 +219,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, };
add_hos_info(&hash_hos_info[thread_id], &info);
+ fd_info[thread_id][fd] = 1;
S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
return HOS_CLIENT_OK;
@@ -235,7 +244,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
return HOS_PARAMETER_ERROR;
}
- size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]);
+ size_t fd = hash_get_min_free_fd(thread_id);
if (fd == 0)
{
return HOS_FD_NOT_ENOUGH;
@@ -243,6 +252,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, };
add_hos_info(&hash_hos_info[thread_id], &info);
+ fd_info[thread_id][fd] = 1;
return fd;
}
@@ -259,7 +269,10 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
return HOS_PARAMETER_ERROR;
}
- hos_info = find_info_by_fd(hash_hos_info[thread_id], fd);
+ if (fd_info[thread_id][fd])
+ {
+ hos_info = find_info_by_fd(hash_hos_info[thread_id], fd);
+ }
if (hos_info == NULL)
{
return HOS_HASH_NOT_FIND;
@@ -279,7 +292,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
if (hos_info->mode & APPEND_MODE)
{
headers["x-hos-upload-type"] = "append";
- headers["x-hos_position"] = num;
+ headers["x-hos-position"] = num;
request.SetMetadata(headers);
#if 0
request.AddMetadata("x-hos-upload-type", "append");
@@ -333,6 +346,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
}
delete_info_by_fd(&hash_hos_info[thread_id], fd);
+ fd_info[thread_id][fd] = 0;
return HOS_CLIENT_OK;
}
@@ -348,13 +362,13 @@ int hos_client_destory(hos_client_handle handle)
delete handle->S3Client;
Aws::Vector<Aws::S3::Model::Bucket>().swap(handle->buckets);
- Aws::ShutdownAPI(*(handle->options));
+ Aws::ShutdownAPI((handle->options));
for (i = 0; i < handle->thread_sum; i++)
{
delete_all(&hash_hos_info[i]);
}
- //free(handle->options);
+ //delete(handle->options);
free(handle);
return HOS_CLIENT_OK;
diff --git a/src/hos_client.h b/src/hos_client.h
index 961539ea..d1cb7db5 100644
--- a/src/hos_client.h
+++ b/src/hos_client.h
@@ -79,9 +79,10 @@ typedef void (*put_finished_callback)(bool, const char *, void *);
* 参数: const char *endpoint 目的地址,如”http://192.168.44.12:9098/hos“
* const char *accesskeyid AWS access key ID,如”default“
* const char *secretkey AWS secret key,如”default“
+ * size_t thread_sum 线程总数
* 返回值: 成功返回一个非空句柄,失败返回NULL。(失败原因都是因为输入参数为空)
*************************************************************************************/
-hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_id);
+hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum);
/*************************************************************************************
* 函数名: hos_create_bucket
* 参数: hos_client_handle handle 非空句柄
@@ -100,8 +101,10 @@ int hos_create_bucket(hos_client_handle handle, const char *bucket);
* 函数名: hos_upload_async
* 参数: hos_client_handle handle 非空句柄
* const char * bucket 桶名称
- * const char * object 上传对象名称
+ * const char * file_path 上传对象路径
* put_finished_callback callback upload操作结束时调用的回调函数
+ * void *userdata 用户自定义数据
+ * size_t thread_id 当前线程id
* 返回值 int 成功返回0,失败返回hoserros错误码
*************************************************************************************/
int hos_upload_file(hos_client_handle handle, const char *bucket, const char *file_path, put_finished_callback callback, void* userdata, size_t thread_id);
@@ -110,7 +113,11 @@ int hos_upload_file(hos_client_handle handle, const char *bucket, const char *fi
* 参数: hos_client_handle handle 非空句柄
* const char * bucket 桶名称
* const char * object 上传对象名称
+ * const char *buf 上传的buf
+ * size_t buf_len 上传的buf的长度
* put_finished_callback callback upload操作结束时调用的回调函数
+ * void *userdata 用户自定义数据
+ * size_t thread_id 当前线程id
* 返回值 int 成功返回0,失败返回hoserros错误码
*************************************************************************************/
int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id);
@@ -132,6 +139,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
* const char * stream 待上传的数据
* size_t stream 待上传的数据长度
* size_t thread_id 线程ID
+ * size_t position append模式下的每段内容编号
* 返回值 int 成功返回0,失败返回hoserros错误码
*************************************************************************************/
int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position);