summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author彭宣正 <[email protected]>2020-12-01 18:24:20 +0800
committer彭宣正 <[email protected]>2020-12-02 09:48:01 +0800
commita5b96484152a5a6321ea373a0e550da68712aa04 (patch)
tree4ac226d792669a4b01c31dfcead402ba4815783e /src
parent63d45c50b7043165481d61c5d93849a47b362f6e (diff)
取消cache模式下,多次发送的机制
Diffstat (limited to 'src')
-rw-r--r--src/hos_client.cpp61
-rw-r--r--src/hos_hash.h2
2 files changed, 19 insertions, 44 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp
index 3497e97e..18207471 100644
--- a/src/hos_client.cpp
+++ b/src/hos_client.cpp
@@ -641,7 +641,8 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
return HOS_FD_NOT_ENOUGH;
}
- hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_count, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,};
+ hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata,
+ NULL,/*cache*/ handle->cache_count, 0,/*position*/ 0,/*recive_cnt*/(long)handle->cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/ 0,/*overtime*/ handle->timeout,};
add_hos_info(&hash_hos_info[thread_id], &info);
#if 1
if (handle->fd_thread == 0)
@@ -660,8 +661,6 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
hos_client_handle handle = NULL;
char num[128];
char buf[128];
- int flag = 0; // 0, 一次处理就可以完成;1,需要多次处理才能处理完
- int rest; // stream 剩余未处理的数据长度
int ret = 0;
data_info_t *data_info = NULL;
@@ -711,51 +710,28 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
if (hos_info->cache_count == 0)
{
//不设置cache_count的情况下
- if (stream_len < hos_info->cache_rest)
+ Aws::String buffer (stream, stream_len);
+ *hos_info->cache << buffer;
+ hos_info->cache_rest -= stream_len;
+ hos_cache[thread_id] += stream_len;
+ if (hos_info->cache_rest > 0)
{
- // cache
- Aws::String buffer (stream, stream_len);
- *hos_info->cache << buffer;
- hos_info->cache_rest -= stream_len;
- if (hos_info->cache_rest > 0)
- {
- hos_cache[thread_id] += stream_len;
- return HOS_CLIENT_OK;
- }
- }else if (stream_len >= hos_info->cache_rest)
- {
- // multi handle
- flag = 1;
- Aws::String buffer (stream, hos_info->cache_rest);
- *hos_info->cache << buffer;
- rest = stream_len - hos_info->cache_rest;
+ return HOS_CLIENT_OK;
}
}else
{
+ // cache
+ Aws::String buffer (stream, stream_len);
+ *hos_info->cache << buffer;
+ hos_info->cache_rest -= stream_len;
+ hos_cache[thread_id] += stream_len;
//设置cache times的情况下
- if ((--hos_info->cache_count) && (stream_len <= hos_info->cache_rest))
+ if (--hos_info->cache_count)
{
- // cache
- Aws::String buffer (stream, stream_len);
- *hos_info->cache << buffer;
- hos_info->cache_rest -= stream_len;
if (hos_info->cache_rest > 0)
{
- hos_cache[thread_id] += stream_len;
return HOS_CLIENT_OK;
}
- }else if (stream_len > hos_info->cache_rest)
- {
- // multi handle
- flag = 1;
- Aws::String buffer (stream, hos_info->cache_rest);
- *hos_info->cache << buffer;
- rest = stream_len - hos_info->cache_rest;
- }else
- {
- //over cache_count
- Aws::String buffer (stream, stream_len);
- *hos_info->cache << buffer;
}
}
request.SetBody(hos_info->cache);
@@ -805,6 +781,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
//恢复fd 的cache设置
if (hos_info->mode & APPEND_MODE)
{
+ hos_info->cache.reset();
hos_info->cache = NULL;
hos_info->cache_rest = hos_info->handle->cache_size;
hos_info->cache_count = hos_info->handle->cache_count;
@@ -828,10 +805,6 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
data_info->tx_bytes[thread_id] += buffer.st_size;
}
}
- while (flag == 1)
- {
- return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id);
- }
}else
{
return HOS_SEND_FAILED;
@@ -863,7 +836,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
//close fd 之前发送append的缓存中内容
if ((hos_info->mode & BUFF_MODE) && (hos_info->mode & APPEND_MODE))
{
- if (hos_info->cache_rest != hos_info->handle->cache_size)
+ if (hos_info->cache_rest != (long)hos_info->handle->cache_size)
{
//handle = (hos_client_handle)hos_info->handle;
Aws::S3::S3Client& S3Client = *(hos_info->handle->S3Client);
@@ -900,6 +873,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
}
}
hos_info->fd_status = HOS_FD_INJECT;
+ hos_info->cache.reset();
hos_info->overtime = get_current_ms() + hos_info->timeout;
fd_info[thread_id][HOS_FD_REGISTER]--;
@@ -955,6 +929,7 @@ int hos_client_destory(hos_client_handle handle)
if (data_info->tx_bytes_last)
free(data_info->tx_bytes_last);
}
+ free(handle->fs2_info[i].reserved);
}
if (handle->fs2_info[i].line_ids)
free(handle->fs2_info[i].line_ids);
diff --git a/src/hos_hash.h b/src/hos_hash.h
index 24dac240..3936b3ec 100644
--- a/src/hos_hash.h
+++ b/src/hos_hash.h
@@ -21,9 +21,9 @@ typedef struct hos_info_s
void *userdata;
std::shared_ptr<Aws::IOStream> cache;
size_t cache_count;
- size_t cache_rest;
size_t position;
size_t recive_cnt;
+ long cache_rest;
int fd_status;
#define HOS_FD_FREE 0
#define HOS_FD_REGISTER 1