diff options
| author | 刘学利 <[email protected]> | 2021-08-24 05:28:08 +0000 |
|---|---|---|
| committer | 刘学利 <[email protected]> | 2021-08-24 05:28:08 +0000 |
| commit | e6e71dca4f7762590b1066eb3df9b2ca22a4bb73 (patch) | |
| tree | 9b217f277d827d28f27161d1f95d33f1f34d9990 | |
| parent | ea1875f229aecba6a2a3372ac96f7fcfa809874c (diff) | |
TSG-7510: tsg_master提供发送payload到kafka的接口
| -rw-r--r-- | inc/tsg_send_log.h | 4 | ||||
| -rw-r--r-- | src/tsg_entry.cpp | 4 | ||||
| -rw-r--r-- | src/tsg_entry.h | 5 | ||||
| -rw-r--r-- | src/tsg_send_log.cpp | 74 | ||||
| -rw-r--r-- | src/tsg_send_log_internal.h | 1 |
5 files changed, 83 insertions, 5 deletions
diff --git a/inc/tsg_send_log.h b/inc/tsg_send_log.h index 4542a1b..2f9caf1 100644 --- a/inc/tsg_send_log.h +++ b/inc/tsg_send_log.h @@ -39,6 +39,10 @@ int TLD_cancel(struct TLD_handle_t *handle); int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, tsg_log_t *log_msg, int thread_id); +//return topic_id; return >=0 if success,otherwise return -1; +int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name); +int tsg_send_payload(struct tsg_log_instance_t *instance, int topic_id, char *payload, int payload_len, int thread_id); + unsigned long long tsg_get_stream_id(struct streaminfo *a_stream); char *tsg_l7_protocol_id2name(unsigned int l7_protocol_id); unsigned int tsg_l7_protocol_name2id(const char *l7_protocol_name); diff --git a/src/tsg_entry.cpp b/src/tsg_entry.cpp index 40a95ef..80b8c78 100644 --- a/src/tsg_entry.cpp +++ b/src/tsg_entry.cpp @@ -75,7 +75,9 @@ id2field_t g_tsg_fs2_field[TSG_FS2_MAX]={{0, TSG_FS2_TCP_LINKS, "tcp_links"}, {0, TSG_FS2_MIRRORED_PKT_SUCCESS, "mirror_pkt_suc"}, {0, TSG_FS2_MIRRORED_BYTE_SUCCESS, "mirror_byte_suc"}, {0, TSG_FS2_MIRRORED_PKT_FAILED, "mirror_pkt_fai"}, - {0, TSG_FS2_MIRRORED_BYTE_FAILED, "mirror_byte_fai"} + {0, TSG_FS2_MIRRORED_BYTE_FAILED, "mirror_byte_fai"}, + {0, TSG_FS2_DDOS_SUCCESS_LOG, "ddos_suc_log"}, + {0, TSG_FS2_DDOS_FAILED_LOG, "ddos_fai_log"} }; id2field_t g_tsg_proto_name2id[PROTO_MAX]={{PROTO_UNKONWN, 0, "unknown"}, diff --git a/src/tsg_entry.h b/src/tsg_entry.h index 1384f7b..62cdbaa 100644 --- a/src/tsg_entry.h +++ b/src/tsg_entry.h @@ -109,6 +109,8 @@ enum TSG_FS2_TYPE{ TSG_FS2_MIRRORED_BYTE_SUCCESS, TSG_FS2_MIRRORED_PKT_FAILED, TSG_FS2_MIRRORED_BYTE_FAILED, + TSG_FS2_DDOS_SUCCESS_LOG, + TSG_FS2_DDOS_FAILED_LOG, TSG_FS2_MAX }; @@ -243,7 +245,8 @@ typedef struct tsg_para char device_id_command[MAX_DOMAIN_LEN/8]; char data_center[_MAX_TABLE_NAME_LEN]; char table_name[TABLE_MAX][_MAX_TABLE_NAME_LEN]; - void *logger; + void *logger; + void *maat_logger; struct reset_argv reset; screen_stat_handle_t fs2_handle; struct l7_protocol *name_by_id; diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp index 48b2bcd..8562c22 100644 --- a/src/tsg_send_log.cpp +++ b/src/tsg_send_log.cpp @@ -1358,7 +1358,6 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) char nic_name[32]={0}; char kafka_errstr[1024]={0}; unsigned int local_ip_nr=0; - rd_kafka_t *kafka_handle = NULL; rd_kafka_conf_t *rdkafka_conf = NULL; rd_kafka_topic_conf_t *topic_conf; struct tsg_log_instance_t *_instance=NULL; @@ -1473,7 +1472,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) rd_kafka_conf_set(rdkafka_conf, "sasl.password", _instance->sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); } - if(!(kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)))) + if(!(_instance->kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)))) { MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "rd_kafka_new is error"); return NULL; @@ -1490,7 +1489,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) if(_instance->service2topic[i].type==TLD_TYPE_MAX) { topic_conf=rd_kafka_topic_conf_new(); - _instance->topic_rkt[_instance->service2topic[i].id]=rd_kafka_topic_new(kafka_handle, _instance->service2topic[i].name, topic_conf); + _instance->topic_rkt[_instance->service2topic[i].id]=rd_kafka_topic_new(_instance->kafka_handle, _instance->service2topic[i].name, topic_conf); } } } @@ -1685,3 +1684,72 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl return 0; } +int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name) +{ + rd_kafka_topic_conf_t *topic_conf; + struct tsg_log_instance_t *_instance=(struct tsg_log_instance_t *)instance; + if(_instance->mode==CLOSE) + { + return 0; + } + + if(topic_name!=NULL && _instance!=NULL && _instance->kafka_handle!=NULL) + { + + _instance->service2topic=(id2field_t *)realloc(_instance->service2topic, (_instance->max_service+1)*sizeof(id2field_t)); + _instance->service2topic[_instance->max_service].id=_instance->max_service; + _instance->service2topic[_instance->max_service].type=TLD_TYPE_MAX; + memset(_instance->service2topic[_instance->max_service].name, 0, MAX_STRING_LEN); + memcpy(_instance->service2topic[_instance->max_service].name, topic_name, MIN(MAX_STRING_LEN-1, strlen(topic_name))); + + _instance->topic_rkt=(rd_kafka_topic_t **)realloc(_instance->topic_rkt, (_instance->max_service+1)*sizeof(rd_kafka_topic_t*)); + topic_conf=rd_kafka_topic_conf_new(); + _instance->topic_rkt[_instance->max_service]=rd_kafka_topic_new(_instance->kafka_handle, topic_name, topic_conf); + + _instance->max_service++; + } + else + { + return -1; + } + + return (_instance->max_service-1); +} + +int tsg_send_payload(struct tsg_log_instance_t *instance, int topic_id, char *payload, int payload_len, int thread_id) +{ + int status=0; + struct tsg_log_instance_t *_instance=instance; + + if(_instance->mode==CLOSE) + { + return 0; + } + + if(_instance==NULL || payload==NULL || payload_len<=0 || topic_id<0 || _instance->topic_rkt==NULL) + { + return -1; + } + + status=rd_kafka_produce(_instance->topic_rkt[topic_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, payload_len, NULL, 0, NULL); + if(status<0) + { + FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DDOS_FAILED_LOG], 0, FS_OP_ADD, 1); + + MESA_handle_runtime_log(_instance->logger, + RLOG_LV_INFO, + "TSG_SEND_LOG", + "tsg_send_log to kafka is error of %s(%s), status: %d, topic: %s", + rd_kafka_err2name(rd_kafka_last_error()), + rd_kafka_err2str(rd_kafka_last_error()), + status, + _instance->service2topic[topic_id].name + ); + } + else + { + FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DDOS_SUCCESS_LOG], 0, FS_OP_ADD, 1); + } + + return 0; +} diff --git a/src/tsg_send_log_internal.h b/src/tsg_send_log_internal.h index 8304d8a..7fca826 100644 --- a/src/tsg_send_log_internal.h +++ b/src/tsg_send_log_internal.h @@ -153,6 +153,7 @@ struct tsg_log_instance_t char l7_proto_id_file[MAX_STRING_LEN*4]; id2field_t id2field[LOG_COMMON_MAX]; rd_kafka_topic_t **topic_rkt; + rd_kafka_t *kafka_handle; id2field_t *service2topic; void *logger; }; |
