-rw-r--r--  dns/dns_lrj.c  336
1 files changed, 336 insertions, 0 deletions
diff --git a/dns/dns_lrj.c b/dns/dns_lrj.c
new file mode 100644
index 0000000..07cce6d
--- /dev/null
+++ b/dns/dns_lrj.c
@@ -0,0 +1,336 @@
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <sys/time.h> /* gettimeofday() */
+#include <arpa/inet.h>
+#include <pcap/pcap.h>
+#include <assert.h>
+#include <syslog.h>
+#include <signal.h>
+#include <MESA/stream.h>
+#include <MESA/dns.h>
+#include <MESA/MESA_prof_load.h>
+#include <MESA/MESA_handle_logger.h>
+#include "rdkafka.h"
+#include "cJSON.h"
+
+#define MAX_LOG_PATH_LEN 256
+// #define DEBUG 1
+#ifndef MAX_MODULE_NAME_LEN
+#define MAX_MODULE_NAME_LEN 256
+#endif
+#ifndef MAX_BROKERS_LEN
+#define MAX_BROKERS_LEN 256
+#endif
+#ifndef MAX_TOPIC_LEN
+#define MAX_TOPIC_LEN 256
+#endif
+
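+/*
+ * Runtime configuration is read from dns_lrj_conf_file by dns_lrj_init().
+ * The [DNS] section is expected to provide LOG_LEVEL, LOG_PATH,
+ * KAFKA_LOG_PATH, BROKERS, TOPIC and MODULE_NAME.
+ * A minimal sketch, assuming an INI-style profile format and placeholder values:
+ *   [DNS]
+ *   LOG_LEVEL = 30
+ *   LOG_PATH = ./log/dns_lrj/runtime.log
+ *   KAFKA_LOG_PATH = ./log/dns_lrj/kafka.log
+ *   BROKERS = 127.0.0.1:9092
+ *   TOPIC = dns_records
+ *   MODULE_NAME = DNS_LRJ
+ */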
+const char *dns_lrj_conf_file = "./conf/dns_lrj/dns_lrj.conf";
+static void *runtime_log_handler;
+char dns_log_path[MAX_LOG_PATH_LEN];
+char module_name[MAX_MODULE_NAME_LEN];
+// static const char *module_name = "DNS_LRJ";
+// static const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log";
+/* kafka */
+char kafka_log_path[MAX_LOG_PATH_LEN];
+static void *kafka_log_handler;
+static const int PRODUCER_INIT_FAILED = -1;
+static const int PRODUCER_INIT_SUCCESS = 0;
+static const int PUSH_DATA_FAILED = -1;
+static const int PUSH_DATA_SUCCESS = 0;
+static int partition;
+//rd
+static rd_kafka_t *kafka_producer;
+static rd_kafka_conf_t *conf;
+// topic
+static rd_kafka_topic_t *rkt;
+static rd_kafka_topic_conf_t *topic_conf;
+// char errstr[512]={0};
+char brokers[MAX_BROKERS_LEN];
+char topic[MAX_TOPIC_LEN];
+
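+/* Log callback registered with librdkafka: forwards client/broker log lines
+ * to the MESA runtime log instead of stderr. */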
+static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
+{
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ /*fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
+ (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
+ level, fac, rk ? rd_kafka_name(rk) : NULL, buf);*/
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%u.%03u RDKAFKA-%i-%s: %s: %s",
+ (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
+ level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
+}
+
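+/* Create and configure the global Kafka producer and topic handle.
+ * Returns PRODUCER_INIT_SUCCESS on success, PRODUCER_INIT_FAILED on any error. */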
+static int init_kafka(int partition_, char *brokers_, char *topic_)
+{
+ char tmp[16];
+ char errstr[1024];
+ partition = partition_;
+ /* Kafka configuration */
+ conf = rd_kafka_conf_new();
+ if (conf == NULL)
+ {
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** Failed to create new conf *******");
+ return PRODUCER_INIT_FAILED;
+ }
+ /* Register logger() so librdkafka log lines go to the MESA runtime log */
+ rd_kafka_conf_set_log_cb(conf, logger);
+ /* Quick termination */
+ snprintf(tmp, sizeof(tmp), "%i", SIGIO);
+ rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
+ rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr));
+ /* Topic configuration */
+ topic_conf = rd_kafka_topic_conf_new();
+
+ kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr));
+ if (kafka_producer == NULL)
+ {
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** Failed to create new producer: %s *******", errstr);
+ return PRODUCER_INIT_FAILED;
+ }
+
+ rd_kafka_set_log_level(kafka_producer, LOG_DEBUG);
+
+ /* Add brokers */
+ if (rd_kafka_brokers_add(kafka_producer, brokers_) == 0)
+ {
+ //fprintf(stderr, "****** No valid brokers specified********\n");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "****** No valid brokers specified********");
+ return PRODUCER_INIT_FAILED;
+ }
+ /* Create topic handle (topic_conf ownership passes to it on success) */
+ rkt = rd_kafka_topic_new(kafka_producer, topic_, topic_conf);
+ if (rkt == NULL)
+ return PRODUCER_INIT_FAILED;
+
+ return PRODUCER_INIT_SUCCESS;
+}
+
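+/* Tear down the topic handle and the producer. Note that messages still queued
+ * locally are not flushed here; waiting on rd_kafka_outq_len()/rd_kafka_poll()
+ * (or rd_kafka_flush() on librdkafka versions that provide it) before destroying
+ * would avoid dropping them. */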
+static void kafka_destroy()
+{
+ rd_kafka_topic_destroy(rkt);
+ rd_kafka_destroy(kafka_producer);
+}
+
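+/* Enqueue one message on the configured topic. The payload is copied by
+ * librdkafka (RD_KAFKA_MSG_F_COPY), so the caller may free the buffer as soon
+ * as this call returns. */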
+static int push_data_to_kafka(char *buffer, int buf_len)
+{
+ int ret;
+ if (buffer == NULL)
+ {
+ return PUSH_DATA_SUCCESS;
+ }
+ /* Let librdkafka choose the partition (RD_KAFKA_PARTITION_UA) */
+ // ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
+ ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
+ if (ret == -1)
+ {
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%% Failed to produce to topic %s : %s",
+ rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error()));
+ return PUSH_DATA_FAILED;
+ }
+ /* Serve log/error events without blocking */
+ rd_kafka_poll(kafka_producer, 0);
+ return PUSH_DATA_SUCCESS;
+}
+/******************************************** */
+
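+/* Per-packet entry point: for DNS responses that carry answer records,
+ * serialize the header counts, questions and resource records to JSON and
+ * push the result to Kafka. Always returns PROT_STATE_GIVEME. */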
+char dns_lrj_entry(stSessionInfo *session_info, void **pme, int thread_seq, struct streaminfo *a_udp, void *a_packet)
+{
+ // printf("********************* DNS_ENTRY success ***************************\n");
+ // int dns_sec = 0;
+
+ int i = 0;
+ char ip_str[128];
+ dns_rr_t *dns_rr = NULL;
+
+ dns_info_t *dns_info = (dns_info_t *)session_info->app_info;
+
+ if (dns_info != NULL && dns_info->hdr_info.qr == 1 && dns_info->hdr_info.ancount > 0)
+ {
+ int is_exist = 0;
+ cJSON *root, *req_array, *rr_array;
+ root = cJSON_CreateObject();
+ cJSON_AddNumberToObject(root, "qdcount", dns_info->hdr_info.qdcount);
+ cJSON_AddNumberToObject(root, "ancount", dns_info->hdr_info.ancount);
+ cJSON_AddNumberToObject(root, "aucount", dns_info->hdr_info.aucount);
+ cJSON_AddNumberToObject(root, "adcount", dns_info->hdr_info.adcount);
+ // printf("ANSWER RSS:%d\n", dns_info->hdr_info.ancount);
+ // printf("Authority RSS RSS:%d\n", dns_info->hdr_info.aucount);
+ // printf("Additional RSS:%d\n", dns_info->hdr_info.adcount);
+ cJSON_AddItemToObject(root, "question", req_array = cJSON_CreateArray());
+ for (i = 0; i < dns_info->hdr_info.qdcount && dns_info->query_question != NULL; i++)
+ {
+ // printf("Req name:%s\n", dns_info->query_question[i].qname);
+ cJSON *req = cJSON_CreateObject();
+ cJSON_AddItemToArray(req_array, req);
+ cJSON_AddStringToObject(req, "qname", (const char * )dns_info->query_question[i].qname);
+ cJSON_AddNumberToObject(req, "qtype", dns_info->query_question[i].qtype);
+ cJSON_AddNumberToObject(req, "qclass", dns_info->query_question[i].qclass);
+ }
+ cJSON_AddItemToObject(root, "rr", rr_array = cJSON_CreateArray());
+ for (i = 0; i < dns_info->rr_count; i++)
+ {
+ dns_rr = &(dns_info->rr[i]);
+
+ cJSON *rr = cJSON_CreateObject();
+ cJSON_AddItemToArray(rr_array, rr);
+ cJSON_AddStringToObject(rr, "name", (const char * )dns_rr->name);
+ cJSON_AddNumberToObject(rr, "type", dns_rr->type);
+ cJSON_AddNumberToObject(rr, "ttl", dns_rr->ttl);
+ switch (dns_rr->type)
+ {
+ case DNS_TYPE_A:
+ inet_ntop(AF_INET, (void *)(dns_rr->rdata.a), ip_str, sizeof(ip_str));
+ // printf("A: %s\n", ip_str);
+ cJSON_AddStringToObject(rr, "a", ip_str);
+ is_exist = 1;
+ break;
+ case DNS_TYPE_NS:
+ case DNS_TYPE_MD:
+ case DNS_TYPE_MF:
+ break;
+ case DNS_TYPE_CNAME:
+ cJSON_AddStringToObject(rr, "cname", (const char * )dns_rr->rdata.cname);
+ is_exist = 1;
+ break;
+ case DNS_TYPE_SOA:
+ case DNS_TYPE_MB:
+ case DNS_TYPE_MG:
+ case DNS_TYPE_MR:
+ case DNS_TYPE_NULL:
+ case DNS_TYPE_WKS:
+ case DNS_TYPE_PTR:
+ case DNS_TYPE_HINFO:
+ case DNS_TYPE_MINFO:
+ case DNS_TYPE_MX:
+ case DNS_TYPE_TXT:
+ case DNS_TYPE_RP:
+ break;
+ case DNS_TYPE_AAAA:
+ inet_ntop(AF_INET6, dns_rr->rdata.aaaa, ip_str, sizeof(ip_str));
+ // used_len += snprintf(buf + used_len, buflen - used_len, "[AAAA: %s]\n", ip_str);
+ // printf("AAAA: %s\n", ip_str);
+ cJSON_AddStringToObject(rr, "aaaa", ip_str);
+ is_exist = 1;
+ break;
+ case DNS_TYPE_OPT:
+ case DNS_TYPE_DS:
+ case DNS_TYPE_RRSIG:
+ case DNS_TYPE_NSEC:
+ case DNS_TYPE_DNSKEY:
+ case DNS_TYPE_NSEC3:
+ case DNS_TYPE_NSEC3PARAM:
+ case DNS_TYPE_UNKNOWN:
+ break;
+ case DNS_QTYPE_AXFR:
+ case DNS_QTYPE_MAILB:
+ case DNS_QTYPE_MAILA:
+ case DNS_QTYPE_ANY:
+ default:
+ /* Query-only types and anything unrecognized: nothing further to record */
+ break;
+ }
+ }
+ // char *s = cJSON_Print(root);
+ char *s = cJSON_PrintUnformatted(root);
+ if (s)
+ {
+ // printf("%s\n", s);
+ /* Use strlen(s): sizeof(s) is only the size of the pointer and would truncate the JSON */
+ push_data_to_kafka(s, (int)strlen(s));
+ free(s);
+ }
+ if (root)
+ {
+ cJSON_Delete(root);
+ }
+ }
+ return PROT_STATE_GIVEME;
+}
+
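+/* Plugin initialization: load the [DNS] section of the configuration file,
+ * create the runtime and Kafka log handles, then initialize the Kafka
+ * producer. Returns a plugin id (currently 0) on success, -1 on failure. */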
+int dns_lrj_init()
+{
+ // printf("********************* DNS_INIT success ***************************\n");
+ /**edited by lrj 20190812 */
+ int demo_plugid = 0;
+ runtime_log_handler = NULL;
+ kafka_log_handler = NULL;
+ int log_level = 30;
+
+ MESA_load_profile_int_def(dns_lrj_conf_file, "DNS", "LOG_LEVEL", &log_level, 30);
+ MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "KAFKA_LOG_PATH", kafka_log_path, MAX_LOG_PATH_LEN, NULL);
+ MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "LOG_PATH", dns_log_path, MAX_LOG_PATH_LEN, NULL);
+ MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "BROKERS", brokers, MAX_BROKERS_LEN, NULL);
+ MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "TOPIC", topic, MAX_TOPIC_LEN, NULL);
+ MESA_load_profile_string_def(dns_lrj_conf_file, "DNS", "MODULE_NAME", module_name, MAX_MODULE_NAME_LEN, NULL);
+ kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, log_level);
+ runtime_log_handler = MESA_create_runtime_log_handle(dns_log_path, log_level);
+
+ if (runtime_log_handler == NULL || kafka_log_handler == NULL)
+ {
+ printf("********************* DNS_INIT error ***************************\n");
+ return -1;
+ }
+ // printf("module_name:%s\n", module_name);
+ // printf("log:%s\n", dns_log_path);
+ // printf("kafka log path:%s\n", kafka_log_path);
+ // printf("kafka brokers:%s\n", brokers);
+ // printf("kafka topic:%s\n", topic);
+
+ /* kafka init */
+ if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS)
+ {
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"kafka init failed!!!");
+ return -1;
+ }
+ // printf("********************* DNS_INIT success ***************************\n");
+ return demo_plugid;
+}
+
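+/* Plugin teardown: release the Kafka producer/topic and the log handles. */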
+void dns_lrj_destory()
+{
+ kafka_destroy();
+ if (runtime_log_handler != NULL)
+ {
+ MESA_destroy_runtime_log_handle(runtime_log_handler);
+ }
+ if (kafka_log_handler != NULL)
+ {
+ MESA_destroy_runtime_log_handle(kafka_log_handler);
+ }
+}
\ No newline at end of file