1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
|
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stddef.h>
#include <pthread.h>
#include <netinet/in.h>
#include <pcap/pcap.h>
#ifdef __cplusplus
extern "C"
{
#endif
#include "stellar/monitor.h"
#include "sds/sds.h"
#include "stellar/module.h"
#include "stellar/log.h"
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/http.h>
#include "monitor_rpc.h"
/********************************** limit definition *****************************************/
#ifndef STELLAR_MAX_THREAD_NUM
#define STELLAR_MAX_THREAD_NUM (256)
#endif
#define STM_RINGBUF_SIZE (1024 * 1024) /* per thread */
#define STM_CONNECTION_IDLE_TIMEOUT 300 /* How many seconds elapsed without input command, connection will closed */
#define STM_REQUEST_TIMEOUT 5
#define STM_SERVER_LISTEN_IP "127.0.0.1"
#define STM_SERVER_LISTEN_PORT 80
#define STM_TZSP_UDP_PORT 37008 /* default port of TZSP protocol: https://en.wikipedia.org/wiki/TZSP# */
#define STM_SESSION_DEFAULT_SEARCH_COUNT 100 /* if no count params, max search session number */
#define STM_SESSION_DEFAULT_LIMIT_NUM 10 /* if no limit params, max support result session number */
#define STM_SESSION_MAX_LIMIT_NUM 1000
#define STM_UINT64_READABLE_STRING_MAX_LEN 21 /* MAX value is: 18446744073709551615 */
#define STM_UINT32_READABLE_STRING_MAX_LEN 11 /* MAX value is: 4294967295 */
#define STM_CONNECTIVITY_DEFALUT_COUNT 5 /* ping default count */
#define STM_CONNECTIVITY_DEFALUT_SIZE 64 /* ping default bytes */
#define STM_CONNECTIVITY_MAX_COUNT 100 /* ping max count */
#define STM_CONNECTIVITY_MAX_SIZE 65535 /* ping max bytes */
/************************************************************************/
#define STM_CMD_CALLBACK_THREAD_LOCAL_MAGIC (0x1234ABCD)
#define STM_RINGBUF_HDR_MAGIC (0x0ABCD12345678)
#define STM_RINGBUF_THREAD_IDX_SERVER 0
#define STM_RINGBUF_THREAD_IDX_AGENT 1
#define STM_MONITOR_THREAD_ID 0 // There are only two threads, use fix id
#define STM_WORKER_THREAD_ID 1 // There are only two threads, use fix id
#define STM_LOG_MODULE_NAME "monitor"
#define STM_STAT_OUTPUT_PATH "log/monitor.fs4"
#define STM_STAT_OUTPUT_INTERVAL_MS 3000
#define STM_RESTFUL_VERSION "v1"
#define STM_RESTFUL_RESOURCE "stellar_monitor"
#define STM_RESTFUL_URI_CMD_KEY "raw_cmd" // example: http://127.0.0.1:80/v1/stellar_monitor?raw_cmd=show%20session
#define STM_CLIENT_SERVER_SYNC_CMD "show command verbose"
#define STM_CLI_CMD_HINTS_COLOR 90
#define STM_CLI_CMD_HINTS_BOLD 0
#ifndef UNUSED
#define UNUSED __attribute__((unused))
#endif
#ifdef NDEBUG // release version
#define STM_DBG_PRINT(fmt, args...)
#else
#define STM_DBG_PRINT(fmt, args...) fprintf(stderr, fmt, ##args)
#endif
#ifndef CALLOC
#define CALLOC(type, number) ((type *)calloc(sizeof(type), number))
#endif
#ifndef FREE
#define FREE(ptr) \
{ \
if (ptr) \
{ \
free((void *)ptr); \
ptr = NULL; \
} \
}
#endif
#define STM_TIME_START() \
struct timespec __start_time, __end_time; \
unsigned long long diff; \
clock_gettime(CLOCK_MONOTONIC, &__start_time);
#define STM_TIME_DIFF() \
{ \
clock_gettime(CLOCK_MONOTONIC, &__end_time); \
if (__start_time.tv_sec == __end_time.tv_sec) \
{ \
diff = (unsigned long long)(__end_time.tv_nsec - __start_time.tv_nsec); \
} \
diff = ((unsigned long long)__end_time.tv_sec * 1000 * 1000 * 1000 + __end_time.tv_nsec) - ((unsigned long long)__start_time.tv_sec * 1000 * 1000 * 1000 + __start_time.tv_nsec); \
}
#ifndef MIN
#define MIN(x, y) ((x) < (y) ? (x) : (y))
#endif
#ifndef MAX
#define MAX(x, y) ((x) > (y) ? (x) : (y))
#endif
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
#define STM_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
#define STM_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
#define STM_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
#define STM_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, STM_LOG_MODULE_NAME, format, ##__VA_ARGS__)
enum stm_http_response_code
{
STM_HTTP_200_OK = 200,
STM_HTTP_204_NO_CONTENT = 204,
STM_HTTP_403_FORBIDDEN = 403,
STM_HTTP_408_REQUEST_TIMEOUT = 408,
STM_HTTP_413_PAYLOAD_TOO_LARGE = 413,
};
enum stm_stat_type
{
STM_STAT_CLI_CONNECTION_NEW,
STM_STAT_CLI_CONNECTION_CLOSE,
STM_STAT_CLI_REQUEST_SUCC,
STM_STAT_CLI_RESPONSE_SUCC,
STM_STAT_CLI_REQUEST_ERR, // RESTFul Syntax error!
STM_STAT_CLI_RESPONSE_ERR, // attention: empty result is not error!
STM_STAT_MAX,
};
struct stm_spinlock;
struct stellar_monitor_config
{
// int thread_count;
size_t ringbuf_size; /* bytes */
int connection_idle_timeout;
int cli_request_timeout;
char *listen_ipaddr;
unsigned short listen_port_host_order;
unsigned short data_link_bind_port_host_order; // for TZSP protocol
int output_interval_ms;
char *output_path;
};
struct stm_key_value_tuple
{
char *key;
char *value;
};
struct stm_key_value
{
int tuple_num;
struct stm_key_value_tuple *tuple;
};
typedef struct evhttp_request stm_network_connection;
struct stm_cmd_transaction
{
struct stm_cmd_request *cmd_req;
struct stm_cmd_reply *cmd_res[STELLAR_MAX_THREAD_NUM]; // multi thread merge to one
};
struct stm_connection_manager
{
struct timeval link_start_time;
struct timeval last_active_time;
struct evhttp_connection *conn;
char peer_ipaddr[INET6_ADDRSTRLEN];
uint16_t peer_port_host_order;
struct stm_connection_manager *next, *prev;
};
struct stm_stat_counter
{
int counter_id;
uint64_t count;
uint64_t bytes;
};
struct stm_stat
{
void *fs4_ins;
struct stm_stat_counter counters[STM_STAT_MAX];
};
struct monitor_connection
{
struct evhttp_connection *current_evconn_ref;
};
/* optional API */
struct monitor_connection;
typedef void(monitor_connection_close_cb)(struct monitor_connection *conn, void *arg);
int monitor_register_connection_close_cb(struct stellar_monitor *monitor, monitor_connection_close_cb *cb, void *arg);
struct stm_conn_close_cb_manager
{
monitor_connection_close_cb *cb;
void *arg;
struct stm_conn_close_cb_manager *next, *prev;
};
struct stm_pktdump_runtime;
struct stellar_monitor
{
struct module_manager *mod_mgr_ref;
struct logger *logger_ref;
int worker_thread_num;
struct stellar_monitor_config *config;
struct stm_cmd_assistant *aide; // reference, share with stellar
struct stm_connection_manager *connection_mgr; // used to tracking all connections, for cli "who" command
struct stm_conn_close_cb_manager *conn_close_mgr;
// struct stm_ringbuf_mgr *ringbuf_mgr[STELLAR_MAX_THREAD_NUM];
struct event_base *evt_base;
// struct event *ev_timeout;
struct evhttp *evt_http_server;
pthread_t evt_main_loop_tid;
struct timeval time_now;
struct stm_stat *stat;
struct stm_spinlock *lock; // for dynamic register command, conn_close_cb
struct monitor_connection current_conn;
struct stm_pktdump_runtime *packet_dump;
struct monitor_rpc **rpc_ins_array; // multir threads
};
enum monitor_reply_type
{
MONITOR_REPLY_INTEGER,
MONITOR_REPLY_DOUBLE,
MONITOR_REPLY_STRING,
MONITOR_REPLY_ERROR,
MONITOR_REPLY_STATUS,
MONITOR_REPLY_NIL,
};
struct monitor_reply
{
enum monitor_reply_type type;
long long integer; /* The integer when type is SWARMKV_REPLY_INTEGER */
double dval; /* The double when type is SWARMKV_REPLY_DOUBLE */
int len; /* Length of string */
char *str;
int http_code;
const char *http_reason;
};
struct monitor_cli_args
{
const char *short_opt;
const char *long_opt;
int require_arg_value;
int value_is_multi_words; // "a b c d e f g"
char *value; // should be free after use
};
/************************************************************************************************************/
/* monitor call gettimeofday(2) by default */
struct stellar_monitor_config *stellar_monitor_config_new(const char *toml);
struct stellar_monitor *stellar_monitor_get(void);
struct stm_connection_manager *stm_connection_insert(struct evhttp_connection *evconn);
void stm_connection_update(struct stm_connection_manager *conn_mgr, const struct evhttp_connection *evconn);
void stm_connection_delete(struct evhttp_connection *evconn);
const struct stm_connection_manager *stm_connection_search(const struct stm_connection_manager *conn_mgr_head, const struct evhttp_connection *evconn);
struct stm_key_value *stm_cmd_key_value_new(void);
void stm_cmd_key_value_append(struct stm_key_value **kv, const char *key, const char *value);
void stm_cmd_key_value_free(struct stm_key_value *kv);
/************************************** command manager **********************************************/
struct stm_stat *stm_stat_init(struct stellar_monitor *stm);
sds stm_config_print(const struct stellar_monitor_config *config);
void stm_stat_free(struct stm_stat *stat);
void stm_stat_update(struct stm_stat *stat, int thread_idx, enum stm_stat_type type, long long value);
long long stm_get_stat_count(struct stm_stat *stat, enum stm_stat_type type);
long long stm_get_stat_bytes(struct stm_stat *stat, enum stm_stat_type type);
sds monitor_reply_to_string(const struct monitor_reply *reply);
void monitor_reply_free(struct monitor_reply *reply);
int monitor_util_parse_cmd_args(int argc, const char *argv[], struct monitor_cli_args cli_args[], size_t cli_args_array_size);
char *stm_http_url_encode(const char *originalText);
struct stm_spinlock *stm_spinlock_new(void);
void stm_spinlock_lock(struct stm_spinlock *splock);
void stm_spinlock_unlock(struct stm_spinlock *splock);
void stm_spinlock_free(struct stm_spinlock *splock);
struct stm_pktdump_runtime *stm_packet_dump_new(struct stellar_monitor *stm, const struct stellar_monitor_config *config);
void stm_pktdump_enforcer_free(struct stellar_monitor *stm);
struct monitor_rpc *stm_rpc_new(void);
void stm_rpc_free(struct monitor_rpc *rpc_ins);
int stm_rpc_exec(int thread_idx, struct monitor_rpc *rpc_ins);
struct iovec stm_rpc_call(struct monitor_rpc *rpc_ins, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args);
void monitor_rpc_free(struct monitor_rpc *rpc_ins);
struct monitor_rpc *monitor_rpc_new(struct stellar_monitor *stm, struct module_manager *mod_mgr);
struct stellar_monitor *monitor_new(const char *toml_file, struct module_manager *mod_mgr, struct logger *logh);
/* Must be called in 'monitor_cmd_cb' context */
struct monitor_connection *monitor_get_current_connection(struct stellar_monitor *monitor);
/* Get the remote address and port associated with this connection. */
int monitor_get_peer_addr(struct monitor_connection *conn, char **peer_ip, unsigned short *peer_port);
/* command enforcer */
int show_session_enforcer_init(struct module_manager *mod_mgr, struct stellar_monitor *stm);
#ifdef __cplusplus
}
#endif
|