diff options
Diffstat (limited to 'src/hos_client.cpp')
| -rw-r--r-- | src/hos_client.cpp | 44 |
1 files changed, 25 insertions, 19 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 52e82f81..fcd9742b 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -917,13 +917,14 @@ long hos_open_fd(const char *bucket, const char *object, put_finished_callback c hos_fd->cache_rest = g_hos_handle.hos_config.cache_size; hos_fd->fd_status = HOS_FD_REGISTER; hos_fd->reslut = true; + hos_fd->thread_id = thread_id; MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: [%s] thread_id:%lu, fd:%lu", g_hos_instance.hos_url_prefix, thread_id, (long)&hos_fd); return (long)hos_fd; } -int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id) +int hos_write(size_t fd, const char *stream, size_t stream_len) { hos_fd_context_t *a_fd_context = NULL; char num[128]; @@ -932,12 +933,22 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t upload_len = 0; + size_t thread_id = 0; if (g_hos_instance.status != INSTANCE_ENABLE_STATE) { return HOS_INSTANCE_NOT_ENABLE; } + a_fd_context = (hos_fd_context_t *)fd; + if (a_fd_context == NULL) + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: [%s] fd is NULL", g_hos_instance.hos_url_prefix); + return HOS_FD_IS_INVALID; + } + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s] Get fd_context", g_hos_instance.hos_url_prefix); + + thread_id = a_fd_context->thread_id; if ((stream == NULL) || (thread_id > hos_conf->thread_num)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, @@ -946,14 +957,6 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id return HOS_PARAMETER_ERROR; } - a_fd_context = (hos_fd_context_t *)fd; - if (a_fd_context == NULL) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: [%s] fd is NULL", g_hos_instance.hos_url_prefix); - return HOS_HASH_NOT_FIND; - } - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s] Get fd_context", g_hos_instance.hos_url_prefix); - // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; @@ -990,7 +993,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id request.SetBody(a_fd_context->cache); // add headers - atomic_add(&(a_fd_context->position), 1); + atomic_add(&(a_fd_context->position), 100001); snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); Aws::Map<Aws::String, Aws::String> headers; headers["x-hos-upload-type"] = "append"; @@ -1027,25 +1030,19 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id return ret; } -int hos_close_fd(size_t fd, size_t thread_id) +int hos_close_fd(size_t fd) { hos_fd_context_t *a_fd_context = NULL; char num[128]; hos_config_t *hos_conf = &g_hos_handle.hos_config; size_t upload_len = 0; + size_t thread_id = 0; if (g_hos_instance.status == INSTANCE_UNINIT_STATE) { return HOS_INSTANCE_NOT_INIT; } - if (thread_id > hos_conf->thread_num) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", - "error: [%s] fd:%lu, thread_id:%lu, thread_sum:%u.", - g_hos_instance.hos_url_prefix, fd, thread_id, hos_conf->thread_num); - return HOS_PARAMETER_ERROR; - } a_fd_context = (hos_fd_context_t *)fd; if (a_fd_context == NULL) { @@ -1055,6 +1052,15 @@ int hos_close_fd(size_t fd, size_t thread_id) return HOS_CLIENT_OK; } + thread_id = a_fd_context->thread_id; + if (thread_id > hos_conf->thread_num) + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", + "error: [%s] fd:%lu, thread_id:%lu, thread_sum:%u.", + g_hos_instance.hos_url_prefix, fd, thread_id, hos_conf->thread_num); + return HOS_PARAMETER_ERROR; + } + //close fd 之前发送append的缓存中内容 if ((a_fd_context->mode & BUFF_MODE) && (a_fd_context->mode & APPEND_MODE)) { @@ -1067,7 +1073,7 @@ int hos_close_fd(size_t fd, size_t thread_id) request.SetBody(a_fd_context->cache); // add headers - atomic_add(&(a_fd_context->position), 1); + atomic_add(&(a_fd_context->position), 100001); snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); Aws::Map<Aws::String, Aws::String> headers; headers["x-hos-upload-type"] = "append"; |
