summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author彭宣正 <[email protected]>2021-04-26 18:17:37 +0800
committer彭宣正 <[email protected]>2021-06-04 14:02:22 +0800
commit87537258b3568ec89dd0082a9ed6d5c3399bc7fc (patch)
treeb4195db6ab3295efecdeabadc23fc1b2e996a9f9
parentc5727b4c841523783b30264023b48876b5ee4e6a (diff)
优化shutdown函数
-rw-r--r--example/demo/hos_write_demo.cpp32
-rw-r--r--src/hos_client.cpp188
2 files changed, 118 insertions, 102 deletions
diff --git a/example/demo/hos_write_demo.cpp b/example/demo/hos_write_demo.cpp
index 9a8ecb1f..c4520614 100644
--- a/example/demo/hos_write_demo.cpp
+++ b/example/demo/hos_write_demo.cpp
@@ -27,23 +27,23 @@ static size_t calc_time(struct timespec start, struct timespec end)
(start.tv_sec * 1000 * 1000 * 1000 + start.tv_nsec));
}
-int file_to_buffer(const char *file, char *buffer, size_t *len)
+int file_to_buffer(const char *file, char *buffer)
{
FILE *fp = fopen(file, "r");
int num = 0;
- *len = 0;
+ int len = 0;
if (fp == NULL)
{
printf("fopen file failed:%s\n", file);
return -1;
}
do{
- num = fread(&buffer[*len], 1, 4096, fp);
+ num = fread(&buffer[len], 1, 4096, fp);
if (num < 0)
{
return -1;
}
- *len += num;
+ len += num;
}while(num == 4096);
fclose(fp);
return 0;
@@ -73,7 +73,7 @@ int main(int argc, char *argv[])
char *buf = NULL;
size_t buf_size;
int mode = FILE_MODE;
- size_t fd[10001] = {0};
+ size_t fd = 0;
userdata_t data = {&finished};
hos_instance hos_instance = NULL;
char object[1024];
@@ -86,7 +86,7 @@ int main(int argc, char *argv[])
buf = (char *)calloc(1, buffer.st_size);
- if (file_to_buffer(file_name, buf, &buf_size) == -1)
+ if (file_to_buffer(file_name, buf) == -1)
{
free(buf);
return -1;
@@ -109,38 +109,38 @@ int main(int argc, char *argv[])
mode = FILE_MODE;
printf("hos_write file start ...\n");
snprintf(object, 1023, "%s_write_file", file_name);
- fd[0] = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode);
- if (hos_write(fd[i], file_name, 0, 0) != HOS_CLIENT_OK)
+ fd = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode);
+ if (hos_write(fd, file_name, 0, 0) != HOS_CLIENT_OK)
{
printf("error: hos_write fialed!\n");
}
- hos_close_fd(fd[1], 0);
+ hos_close_fd(fd, 0);
printf("hos_write file end ...\n");
mode = BUFF_MODE;
printf("hos_write buff start ...\n");
snprintf(object, 1023, "%s_write_buff", file_name);
- fd[1] = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode);
- if (hos_write(fd[i], buf, buffer.st_size, 0) != HOS_CLIENT_OK)
+ fd = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode);
+ if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK)
{
printf("error: hos_write failed!\n");
}
- hos_close_fd(fd[1], 0);
+ hos_close_fd(fd, 0);
printf("hos_write buff end ...\n");
mode = BUFF_MODE | APPEND_MODE;
printf("hos_write buff start ...\n");
snprintf(object, 1023, "%s_write_APPEND", file_name);
- fd[2] = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode);
- if (hos_write(fd[i], buf, buffer.st_size, 0) != HOS_CLIENT_OK)
+ fd = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode);
+ if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK)
{
printf("error: hos_write failed 1st!\n");
}
- if (hos_write(fd[i], buf, buffer.st_size, 0) != HOS_CLIENT_OK)
+ if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK)
{
printf("error: hos_write failed 2nd!\n");
}
- hos_close_fd(fd[2], 0);
+ hos_close_fd(fd, 0);
printf("hos_write buff end ...\n");
printf("hos_shutdown_instance start ...\n");
diff --git a/src/hos_client.cpp b/src/hos_client.cpp
index 4d5dbd4d..d290081e 100644
--- a/src/hos_client.cpp
+++ b/src/hos_client.cpp
@@ -415,86 +415,85 @@ static void *fs2_statistics(void *ptr)
FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[6], FS_OP_SET, cache_sum);
//PoolThread State
- *busy = g_hos_handle.executor->GetTaskSize();
- *top_busy = (*busy) > (*top_busy) ? (*busy) : (*top_busy);
- pool_history_sum += *busy;
-
- fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
- for (i = 0; i < 3; i++)
+ if (hos_conf->pool_thread_size > 0)
{
- FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[i], FS_OP_SET, PoolThread_state[i]);
+ *busy = g_hos_handle.executor->GetTaskSize();
+ *top_busy = (*busy) > (*top_busy) ? (*busy) : (*top_busy);
+ pool_history_sum += *busy;
+
+ fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
+ for (i = 0; i < 3; i++)
+ {
+ FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[i], FS_OP_SET, PoolThread_state[i]);
+ }
}
-
+
sleep(1);
}
pthread_exit(NULL);
}
-static void hos_expand_fs2(const char * path, int format, char *server_ip, int port)
+static screen_stat_handle_t hos_init_fs2(char *app_name, int app_name_size)
{
- fs2_info_t *fs2_info = NULL;
- screen_stat_handle_t fs2_handle = NULL;
- const char *app_name[] = {"hos-data", "hos-poolthread"};
int value = 0;
+ screen_stat_handle_t fs2_handle = FS_create_handle();
hos_config_t *hos_conf = &g_hos_handle.hos_config;
- hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
- size_t i = 0;
- if (hos_func->fs2_info[0].fs2_handle)
- return;
- //fs2 init
- for (i = 0; i < FS2_RECORD_EVENTS; i++)
+ FS_set_para(fs2_handle, APP_NAME, app_name, app_name_size + 1);
+ value = 1; //true
+ FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value));
+ if (hos_conf->fs2_path != NULL)
+ {
+ FS_set_para(fs2_handle, OUTPUT_DEVICE, hos_conf->fs2_path, strlen(hos_conf->fs2_path) + 1);
+ }
+ value = 2;
+ FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value));
+ value = 1;
+ FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value));
+ FS_set_para(fs2_handle, METRIS_FORMAT, &hos_conf->fs2_fmt, sizeof(hos_conf->fs2_fmt));
+ FS_set_para(fs2_handle, STAT_CYCLE, &value, sizeof(value));
+ value = 4096;
+ FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
+ if (hos_conf->fs2_ip == NULL)
+ {
+ FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1"));
+ }
+ else
{
- hos_func->fs2_info[i].fs2_handle = FS_create_handle();
- fs2_handle = hos_func->fs2_info[i].fs2_handle;
+ FS_set_para(fs2_handle, STATS_SERVER_IP, hos_conf->fs2_ip, strlen(hos_conf->fs2_ip));
+ }
- FS_set_para(fs2_handle, APP_NAME, app_name[i], strlen(app_name[i]) + 1);
- value = 1;//true
- FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value));
- if (path != NULL)
- {
- if (FS_set_para(fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1) != 0)
- {
- MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fs2 OUTOUT_DEVICE:%s", path);
- return;
- }
- }
- value = 2;
- FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value));
- value = 1;
- FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value));
- FS_set_para(fs2_handle, METRIS_FORMAT, &format, sizeof(format));
- FS_set_para(fs2_handle, STAT_CYCLE, &value, sizeof(value));
- value = 4096;
- FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
- if (server_ip == NULL)
- {
- FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1"));
- }
- else
- {
- FS_set_para(fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip));
- }
+ FS_set_para(fs2_handle, STATS_SERVER_PORT, &hos_conf->fs2_port, sizeof(hos_conf->fs2_port));
- FS_set_para(fs2_handle, STATS_SERVER_PORT, &port, sizeof(port));
+ value = FS_OUTPUT_STATSD;
+ FS_set_para(fs2_handle, STATS_FORMAT, &value, sizeof(value));
- value = FS_OUTPUT_STATSD;
- FS_set_para(fs2_handle, STATS_FORMAT, &value, sizeof(value));
- }
+ return fs2_handle;
+}
- //pkts and bytes info
- fs2_info = &hos_func->fs2_info[FS2_DATA_FLOW_STATE];
- fs2_handle = hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle;
- fs2_info->line_ids = (int *)calloc(2, sizeof(int));
- fs2_info->column_ids = (int *)calloc(6, sizeof(int));
+static void hos_expand_fs2()
+{
+ fs2_info_t *fs2_info = NULL;
+ screen_stat_handle_t fs2_handle = NULL;
+ hos_config_t *hos_conf = &g_hos_handle.hos_config;
+ hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
+ size_t i = 0;
+ if (hos_func->fs2_info[0].fs2_handle)
+ return;
//data info
/**********************************************************************************************************
* rx_pkts rx_bytes tx_pkts tx_bytes tx_failed_p tx_failed_b cache_bytes
* current 10 100 1 100 0 0 100
* total 100 1000 10 1000 0 0 100(无实意)
***********************************************************************************************************/
- const char *data_col[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "tx_failed_b", "cache_bytes"};
+ fs2_info = &hos_func->fs2_info[FS2_DATA_FLOW_STATE];
+ hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle = hos_init_fs2((char *)"hos-data", strlen("hos-data"));
+ fs2_handle = hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle;
+ fs2_info->line_ids = (int *)calloc(2, sizeof(int));
+ fs2_info->column_ids = (int *)calloc(7, sizeof(int));
+
+ const char *data_col[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "tx_failed_p", "tx_failed_b", "cache_bytes"};
for (i = 0; i < sizeof(data_col) / sizeof(const char *); i++)
{
fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, data_col[i]);
@@ -503,10 +502,8 @@ static void hos_expand_fs2(const char * path, int format, char *server_ip, int p
fs2_info->line_ids[1] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "total");
hos_func->fs2_status = HOS_FS2_START;
-
data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t));
fs2_info->reserved = (void *)data_info;
- #if 1
data_info->tx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->tx_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->rx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
@@ -514,34 +511,30 @@ static void hos_expand_fs2(const char * path, int format, char *server_ip, int p
data_info->tx_failed_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->tx_failed_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->cache = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
- #else
- data_info->tx_pkts_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
- data_info->tx_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
- data_info->rx_pkts_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
- data_info->rx_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
- data_info->tx_failed_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
- #endif
- //FS_start(hos_func->fs2_info[0].fs2_handle);
FS_start(fs2_handle);
- //PoolThread state
- /*******************************************************
- * PoolSize Busy TopBusy AveBusy
- * ThreadNum 1000 500 800 650
- ********************************************************/
- fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
- fs2_handle = hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle;
- fs2_info->line_ids = (int *)calloc(1, sizeof(int));
- fs2_info->column_ids = (int *)calloc(3, sizeof(int));
-
- const char *poolthread_col[3] = {"PoolSize", "Busy", "TopBusy"};
- for (i = 0; i < sizeof(poolthread_col) / sizeof(const char *); i++)
+ if (hos_conf->pool_thread_size > 0)
{
- fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]);
- }
- fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum");
+ //PoolThread state
+ /*******************************************************
+ * PoolSize Busy TopBusy AveBusy
+ * ThreadNum 1000 500 800 650
+ ********************************************************/
+ fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
+ hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle = hos_init_fs2((char *)"hos-poolthread", strlen("hos-poolthread"));
+ fs2_handle = hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle;
+ fs2_info->line_ids = (int *)calloc(1, sizeof(int));
+ fs2_info->column_ids = (int *)calloc(3, sizeof(int));
- FS_start(fs2_handle);
+ const char *poolthread_col[3] = {"PoolSize", "Busy", "TopBusy"};
+ for (i = 0; i < sizeof(poolthread_col) / sizeof(const char *); i++)
+ {
+ fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]);
+ }
+ fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum");
+
+ FS_start(fs2_handle);
+ }
pthread_create(&hos_func->fs2_thread, NULL, fs2_statistics, NULL);
@@ -633,6 +626,11 @@ static bool hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t
}
}
+void hos_init_log()
+{
+ MESA_handle_runtime_log_creation("./log");
+}
+
hos_instance hos_get_instance()
{
if (g_hos_handle.S3Client != NULL)
@@ -675,7 +673,6 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
MESA_load_profile_uint_def(conf_path, module, "hos_fs2_format", &hos_conf->fs2_fmt, 0);
if (hos_conf->ip && hos_conf->port && strlen(hos_conf->accesskeyid) && strlen(hos_conf->secretkey))
{
- MESA_handle_runtime_log_creation("./log");
g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level);
if (log == NULL)
{
@@ -703,7 +700,7 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "Instance init completed");
if (hos_conf->fs2_ip && hos_conf->fs2_port)
{
- hos_expand_fs2(hos_conf->fs2_path, hos_conf->fs2_fmt, hos_conf->fs2_ip, hos_conf->fs2_port);
+ hos_expand_fs2();
}
else
{
@@ -1211,7 +1208,11 @@ int hos_shutdown_instance()
for (i = 0; i < FS2_RECORD_EVENTS; i++)
{
screen_stat_handle_t *fs2_handle = &hos_func->fs2_info[i].fs2_handle;
- FS_stop(fs2_handle);
+ if (*fs2_handle)
+ {
+ FS_stop(fs2_handle);
+ *fs2_handle = NULL;
+ }
if (hos_func->fs2_info[i].reserved)
{
if (i == 0)
@@ -1244,11 +1245,18 @@ int hos_shutdown_instance()
#endif
}
free(hos_func->fs2_info[i].reserved);
+ hos_func->fs2_info[i].reserved = NULL;
}
if (hos_func->fs2_info[i].line_ids)
+ {
free(hos_func->fs2_info[i].line_ids);
+ hos_func->fs2_info[i].line_ids=NULL;
+ }
if (hos_func->fs2_info[i].column_ids)
+ {
free(hos_func->fs2_info[i].column_ids);
+ hos_func->fs2_info[i].column_ids=NULL;
+ }
}
}
@@ -1259,6 +1267,7 @@ int hos_shutdown_instance()
if (g_fd_info)
{
free(g_fd_info);
+ g_fd_info = NULL;
}
for (i = 0; i < hos_conf->thread_num; i++)
@@ -1269,9 +1278,16 @@ int hos_shutdown_instance()
if (g_fd_context)
{
free(g_fd_context);
+ g_fd_context = NULL;
}
Aws::ShutdownAPI(g_options);
+ MESA_destroy_runtime_log_handle(g_hos_handle.log);
+ g_hos_handle.log = NULL;
+ memset(&g_hos_handle, 0 , sizeof(g_hos_handle));
+ if (g_hos_instance.hos_url_prefix)
+ free((void *)g_hos_instance.hos_url_prefix);
+ memset(&g_hos_instance, 0, sizeof(g_hos_handle));
return HOS_CLIENT_OK;
}