summaryrefslogtreecommitdiff
path: root/src/hos_client.cpp
diff options
context:
space:
mode:
authorpengxuanzheng <[email protected]>2020-10-20 17:20:27 +0800
committerpengxuanzheng <[email protected]>2020-11-02 19:00:21 +0800
commite5f5062f51ea4ea7af418e1161e3139ae9fea984 (patch)
tree7067a04c639f1c8cb69a9710e86bb11bb95bc21e /src/hos_client.cpp
parent050a489e5560c82e078ebe71f60e154c345d6997 (diff)
支持field_stat2
Diffstat (limited to 'src/hos_client.cpp')
-rw-r--r--src/hos_client.cpp137
1 files changed, 130 insertions, 7 deletions
diff --git a/src/hos_client.cpp b/src/hos_client.cpp
index 805450b3..8398009d 100644
--- a/src/hos_client.cpp
+++ b/src/hos_client.cpp
@@ -6,6 +6,8 @@
extern "C"
{
#include<string.h>
+#include <sys/stat.h>
+#include <unistd.h>
}
#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
@@ -16,9 +18,9 @@ extern "C"
#include <fstream>
#include <iostream>
#include <mutex>
-#include <sys/stat.h>
#include "hos_client.h"
#include "hos_hash.h"
+#include "field_stat2.h"
#define MAX_HOS_CLIENT_THREAD_NUM 255
#define MAX_HOS_CLIENT_FD_NUM 65535
@@ -32,6 +34,18 @@ typedef struct hos_client_handle_s
size_t cache_size;
size_t cache_times;
size_t thread_sum;
+ /* expand */
+ screen_stat_handle_t fs2_handle;
+ pthread_t fs2_thread;
+ int fs2_status;
+#define HOS_FS2_START 1
+#define HOS_FS2_STOP 2
+ int *line_ids;
+ int *column_ids;
+ size_t tx_pkts;
+ size_t tx_bytes;
+ size_t rx_pkts;
+ size_t rx_bytes;
}hos_client_handle_t;
hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM];
@@ -140,6 +154,93 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
return handle;
}
+static void *fs2_statistics(void *ptr)
+{
+ hos_client_handle handle = (hos_client_handle)ptr;
+
+ while(1)
+ {
+ if (handle->fs2_status == HOS_FS2_STOP)
+ {
+ break;
+ }
+
+ FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[0], FS_OP_SET, handle->tx_pkts);
+ FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[1], FS_OP_SET, handle->tx_bytes);
+ FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[2], FS_OP_SET, handle->rx_pkts);
+ FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[3], FS_OP_SET, handle->rx_bytes);
+
+ FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[0], FS_OP_SET, handle->tx_pkts);
+ FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[1], FS_OP_SET, handle->tx_bytes);
+ FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[2], FS_OP_SET, handle->rx_pkts);
+ FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[3], FS_OP_SET, handle->rx_bytes);
+
+ sleep(1);
+ }
+ pthread_exit(NULL);
+}
+
+void hos_expand_fs2(hos_client_handle handle, const char * path, int format, char *server_ip, int port)
+{
+ screen_stat_handle_t fs2_handle = NULL;
+ const char *app_name = "hos-sdk-client-cpp";
+ int *line_ids = (int *)malloc(sizeof(int) * 2);
+ int *column_ids = (int *)malloc(sizeof(int) * 4);
+ int value = 0;
+ char buff[128];
+
+ fs2_handle = FS_create_handle();
+
+ FS_set_para(fs2_handle, APP_NAME, app_name, strlen(app_name) + 1);
+ value = 1;//true
+ FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value));
+ if (path != NULL)
+ {
+ FS_set_para(fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1);
+ }
+ value = 2;//append
+ FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value));
+ value = 1;
+ FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value));
+ FS_set_para(fs2_handle, METRIS_FORMAT, &format, sizeof(format));
+ value = 4096;
+ FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
+ if (server_ip == NULL)
+ {
+ FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1"));
+ }else
+ {
+ FS_set_para(fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip));
+ }
+
+ FS_set_para(fs2_handle, STATS_SERVER_PORT, &port, sizeof(port));
+
+ //line info
+ snprintf(buff, sizeof(buff), "tx_pkts(MB)");
+ line_ids[0] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff);
+ snprintf(buff, sizeof(buff), "tx_bytes(MB)");
+ line_ids[1] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff);
+ snprintf(buff, sizeof(buff), "rx_pkts(MB)");
+ line_ids[2] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff);
+ snprintf(buff, sizeof(buff), "rx_bytes(MB)");
+ line_ids[3] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff);
+ snprintf(buff, sizeof(buff), "total");
+ column_ids[0] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff);
+ snprintf(buff, sizeof(buff), "per-second");
+ column_ids[1] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, buff);
+
+ handle->fs2_handle = fs2_handle;
+ handle->line_ids = line_ids;
+ handle->column_ids = column_ids;
+ handle->fs2_status = HOS_FS2_START;
+
+ FS_start(fs2_handle);
+
+ pthread_create(&handle->fs2_thread, NULL, fs2_statistics, handle);
+
+ return ;
+}
+
bool hos_verify_bucket(hos_client_handle handle, const char *bucket)
{
Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets();
@@ -288,6 +389,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
char buf[128];
int flag = 0; // 0, 一次处理就可以完成;1,需要多次处理才能处理完
int rest; // stream 剩余未处理的数据长度
+ int ret = 0;
if ((fd == 0) || (stream == NULL) || (thread_id > MAX_HOS_CLIENT_THREAD_NUM))
{
return HOS_PARAMETER_ERROR;
@@ -303,6 +405,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
}
handle = (hos_client_handle)hos_info->handle;
+
+ //field_stat2 record
+ handle->tx_pkts++;
+ handle->tx_bytes += stream_len;
+
Aws::S3::S3Client& S3Client = *(handle->S3Client);
// Create and configure the asynchronous put object request.
@@ -379,8 +486,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
sprintf(buf, "%lu %lu", thread_id, fd);
context->SetUUID(buf);
- S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
-
+ ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
//恢复fd 的cache设置
if (hos_info->mode & APPEND_MODE)
{
@@ -388,11 +494,17 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
hos_info->cache_rest = hos_info->handle->cache_size;
hos_info->cache_times = hos_info->handle->cache_times;
}
- while (flag == 1)
+ if (ret == HOS_CLIENT_OK)
{
- return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id);
+ handle->rx_bytes += handle->cache_size;
+ handle->rx_pkts++;
+ while (flag == 1)
+ {
+ return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id);
+ }
}
- return HOS_CLIENT_OK;
+
+ return ret;
}
int hos_close_fd(size_t fd, size_t thread_id)
@@ -425,7 +537,18 @@ int hos_client_destory(hos_client_handle handle)
{
delete_all(&hash_hos_info[i]);
}
- //delete(handle->options);
+ if (handle->fs2_handle)
+ {
+ FS_stop(&handle->fs2_handle);
+ }
+ if (handle->line_ids)
+ {
+ free(handle->line_ids);
+ }
+ if (handle->column_ids)
+ {
+ free(handle->column_ids);
+ }
free(handle);
return HOS_CLIENT_OK;