diff options
| author | 李仁杰 <[email protected]> | 2019-09-02 22:53:58 +0800 |
|---|---|---|
| committer | 李仁杰 <[email protected]> | 2019-09-02 22:53:58 +0800 |
| commit | f89b44bdf46af2b0e58d178aabcb74b53e78f13d (patch) | |
| tree | 45260831c0b355bb6c5dc0fb47b213c2cf47f442 | |
| parent | 783eef0d2efd4d2526372f88d7f049f2cdbd2105 (diff) | |
Upload New File
| -rw-r--r-- | dns/dns_lrj.c | 336 |
1 files changed, 336 insertions, 0 deletions
diff --git a/dns/dns_lrj.c b/dns/dns_lrj.c new file mode 100644 index 0000000..07cce6d --- /dev/null +++ b/dns/dns_lrj.c @@ -0,0 +1,336 @@ +#include <stdio.h> +#include <string.h> +#include <time.h> +#include <arpa/inet.h> +#include <pcap/pcap.h> +#include <assert.h> +#include <syslog.h> +#include <signal.h> +#include <MESA/stream.h> +#include <MESA/dns.h> +#include <MESA/MESA_prof_load.h> +#include <MESA/MESA_handle_logger.h> +#include "rdkafka.h" +#include "cJSON.h" + +#define MAX_LOG_PATH_LEN 256 +// #define DEBUG 1 +#ifndef MAX_MODULE_NAME_LEN +#define MAX_MODULE_NAME_LEN 256 +#endif +#ifndef MAX_BROKERS_LEN +#define MAX_BROKERS_LEN 256 +#endif +#ifndef MAX_TOPIC_LEN +#define MAX_TOPIC_LEN 256 +#endif + +const char *dns_lrj_conf_file = "./conf/dns_lrj/dns_lrj.conf"; +static void *runtime_log_handler; +char dns_log_path[MAX_LOG_PATH_LEN]; +char module_name[MAX_MODULE_NAME_LEN]; +// static const char *module_name = "DNS_LRJ"; +// static const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log"; +/* kafka */ +char kafka_log_path[MAX_LOG_PATH_LEN]; +static void *kafka_log_handler; +static const int PRODUCER_INIT_FAILED = -1; +static const int PRODUCER_INIT_SUCCESS = 0; +static const int PUSH_DATA_FAILED = -1; +static const int PUSH_DATA_SUCCESS = 0; +static int partition; +//rd +static rd_kafka_t *kafka_producer; +static rd_kafka_conf_t *conf; +// topic +static rd_kafka_topic_t *rkt; +static rd_kafka_topic_conf_t *topic_conf; +// char errstr[512]={0}; +char brokers[MAX_BROKERS_LEN]; +char topic[MAX_TOPIC_LEN]; + +static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + /*fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", + (int)tv.tv_sec, (int)(tv.tv_usec / 1000), + level, fac, rk ? rd_kafka_name(rk) : NULL, buf);*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%u.%03u RDKAFKA-%i-%s: %s: %s", + (int)tv.tv_sec, (int)(tv.tv_usec / 1000), + level, fac, rk ? rd_kafka_name(rk) : NULL, buf); +} + +static int init_kafka(int partition_, char *brokers_, char *topic_) +{ + char tmp[16]; + char errstr[1024]; + partition = partition_; + /* Kafka configuration */ + conf = rd_kafka_conf_new(); + //set logger :register log function + rd_kafka_conf_set_log_cb(conf, logger); + /* Quick termination */ + snprintf(tmp, sizeof(tmp), "%i", SIGIO); + rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); + //rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000",errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr)); + /*topic configuration*/ + topic_conf = rd_kafka_topic_conf_new(); + + if (conf == NULL) + { + //fprintf(stderr, "***** Failed to create new conf *******\n"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** Failed to create new conf *******"); + return PRODUCER_INIT_FAILED; + } + kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr)); + if (kafka_producer == NULL) + { + /*fprintf(stderr, "***** kafka_producer is null *******\n"); + fprintf(stderr, "*****Failed to create new producer: %s*******\n", errstr);*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** kafka_producer is null, *******"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "*****Failed to create new producer: %s*******\n", errstr); + return PRODUCER_INIT_FAILED; + } + + rd_kafka_set_log_level(kafka_producer, LOG_DEBUG); + + /* Add brokers */ + if (rd_kafka_brokers_add(kafka_producer, brokers_) == 0) + { + //fprintf(stderr, "****** No valid brokers specified********\n"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "****** No valid brokers specified********"); + return PRODUCER_INIT_FAILED; + } + /* Create topic */ + rkt = rd_kafka_topic_new(kafka_producer, topic_, topic_conf); + + return PRODUCER_INIT_SUCCESS; +} + +static void kafka_destroy() +{ + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(kafka_producer); +} + +static int push_data_to_kafka(char *buffer, int buf_len) +{ + int ret; + if (buffer == NULL) + { + return 0; + } + // ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + if (ret < 0) + { + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Failed to produce to topic %s : %s", + rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error())); + return PUSH_DATA_FAILED; + } + else if(ret == 0){ + // MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i", + // buf_len, rd_kafka_topic_name(rkt), partition); + return PUSH_DATA_SUCCESS; + } + return ret==0 ? PUSH_DATA_SUCCESS:PUSH_DATA_FAILED; +} +/******************************************** */ + +char dns_lrj_entry(stSessionInfo *session_info, void **pme, int thread_seq, struct streaminfo *a_udp, void *a_packet) +{ + // printf("********************* DNS_ENTRY success ***************************\n"); + // int dns_sec = 0; + + int i = 0; + char ip_str[128]; + dns_rr_t *dns_rr = NULL; + + dns_info_t *dns_info = (dns_info_t *)session_info->app_info; + + if (dns_info != NULL && dns_info->hdr_info.qr == 1 && dns_info->hdr_info.ancount > 0) + { + int is_exist = 0; + cJSON *root, *req_array, *rr_array; + root = cJSON_CreateObject(); + cJSON_AddNumberToObject(root, "qdcount", dns_info->hdr_info.qdcount); + cJSON_AddNumberToObject(root, "ancount", dns_info->hdr_info.ancount); + cJSON_AddNumberToObject(root, "aucount", dns_info->hdr_info.aucount); + cJSON_AddNumberToObject(root, "adcount", dns_info->hdr_info.adcount); + // printf("ANSWER RSS:%d\n", dns_info->hdr_info.ancount); + // printf("Authority RSS RSS:%d\n", dns_info->hdr_info.aucount); + // printf("Additional RSS:%d\n", dns_info->hdr_info.adcount); + cJSON_AddItemToObject(root, "question", req_array = cJSON_CreateArray()); + for (i = 0; i < dns_info->hdr_info.qdcount && (&dns_info->query_question[i]) != NULL; i++) + { + // printf("Req name:%s\n", dns_info->query_question[i].qname); + cJSON *req = cJSON_CreateObject(); + cJSON_AddItemToArray(req_array, req); + cJSON_AddStringToObject(req, "qname", (const char * )dns_info->query_question[i].qname); + cJSON_AddNumberToObject(req, "qtype", dns_info->query_question[i].qtype); + cJSON_AddNumberToObject(req, "qclass", dns_info->query_question[i].qclass); + } + cJSON_AddItemToObject(root, "rr", rr_array = cJSON_CreateArray()); + for (i = 0; i < dns_info->rr_count; i++) + { + dns_rr = &(dns_info->rr[i]); + + cJSON *rr = cJSON_CreateObject(); + cJSON_AddItemToArray(rr_array, rr); + cJSON_AddStringToObject(rr, "name", (const char * )dns_rr->name); + cJSON_AddNumberToObject(rr, "type", dns_rr->type); + cJSON_AddNumberToObject(rr, "ttl", dns_rr->ttl); + switch (dns_rr->type) + { + case DNS_TYPE_A: + inet_ntop(AF_INET, (void *)(dns_rr->rdata.a), ip_str, sizeof(ip_str)); + // printf("A: %s\n", ip_str); + cJSON_AddStringToObject(rr, "a", ip_str); + is_exist = 1; + break; + case DNS_TYPE_NS: + break; + case DNS_TYPE_MD: + break; + case DNS_TYPE_MF: + break; + case DNS_TYPE_CNAME: + cJSON_AddStringToObject(rr, "cname", (const char * )dns_rr->rdata.cname); + is_exist = 1; + break; + case DNS_TYPE_SOA: + break; + case DNS_TYPE_MB: + break; + case DNS_TYPE_MG: + break; + case DNS_TYPE_MR: + break; + case DNS_TYPE_NULL: + break; + case DNS_TYPE_WKS: + break; + case DNS_TYPE_PTR: + break; + case DNS_TYPE_HINFO: + break; + case DNS_TYPE_MINFO: + break; + case DNS_TYPE_MX: + break; + case DNS_TYPE_TXT: + break; + case DNS_TYPE_RP: + break; + case DNS_TYPE_AAAA: + inet_ntop(AF_INET6, dns_rr->rdata.aaaa, ip_str, sizeof(ip_str)); + // used_len += snprintf(buf + used_len, buflen - used_len, "[AAAA: %s]\n", ip_str); + // printf("AAAA: %s\n", ip_str); + cJSON_AddStringToObject(rr, "aaaa", ip_str); + is_exist = 1; + break; + case DNS_TYPE_OPT: + break; + case DNS_TYPE_DS: + break; + case DNS_TYPE_RRSIG: + break; + case DNS_TYPE_NSEC: + break; + case DNS_TYPE_DNSKEY: + break; + case DNS_TYPE_NSEC3: + break; + case DNS_TYPE_NSEC3PARAM: + break; + case DNS_TYPE_UNKNOWN: + break; + case DNS_QTYPE_AXFR: + continue; + break; + case DNS_QTYPE_MAILB: + continue; + break; + case DNS_QTYPE_MAILA: + continue; + break; + case DNS_QTYPE_ANY: + continue; + break; + default: + continue; + break; + } + } + // char *s = cJSON_Print(root); + char *s = cJSON_PrintUnformatted(root); + if (s) + { + // printf("%s\n", s); + push_data_to_kafka(s, sizeof(s)); + free(s); + } + if (root) + { + cJSON_Delete(root); + } + } + return PROT_STATE_GIVEME; +} + +int dns_lrj_init() +{ + // printf("********************* DNS_INIT success ***************************\n"); + /**edited by lrj 20190812 */ + int demo_plugid = 0; + runtime_log_handler = NULL; + kafka_log_handler = NULL; + int log_level = 30; + + MESA_load_profile_int_def(dns_lrj_conf_file, "DNS", "LOG_LEVEL", &log_level, 30); + MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "KAFKA_LOG_PATH", kafka_log_path, MAX_LOG_PATH_LEN, NULL); + MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "LOG_PATH", dns_log_path, MAX_LOG_PATH_LEN, NULL); + MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "BROKERS", brokers, MAX_BROKERS_LEN, NULL); + MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "TOPIC", topic, MAX_TOPIC_LEN, NULL); + MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "TOPIC", topic, MAX_TOPIC_LEN, NULL); + MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "MODULE_NAME", module_name, MAX_TOPIC_LEN, NULL); + kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, log_level); + runtime_log_handler = MESA_create_runtime_log_handle(dns_log_path, log_level); + + if (runtime_log_handler == NULL || kafka_log_handler == NULL) + { + printf("********************* DNS_INIT error ***************************\n"); + return -1; + } + // printf("module_name:%s\n", module_name); + // printf("log:%s\n", dns_log_path); + // printf("kafka log path:%s\n", kafka_log_path); + // printf("kafka brokers:%s\n", brokers); + // printf("kafka topic:%s\n", topic); + + /* kafka init */ + if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS) + { + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"kafka init failed!!!"); + return -1; + } + // printf("********************* DNS_INIT success ***************************\n"); + return demo_plugid; +} + +void dns_lrj_destory() +{ + // printf("DNS_DESTROY\n"); + kafka_destroy(); + if (runtime_log_handler == NULL) + { + printf("TEST_APP_DESTORY out...\n"); + return; + } + MESA_destroy_runtime_log_handle(runtime_log_handler); + MESA_destroy_runtime_log_handle(kafka_log_handler); +}
\ No newline at end of file |
