summaryrefslogtreecommitdiff
path: root/src/objectscanner_kafka.cpp
diff options
context:
space:
mode:
authorzhangchengwei <[email protected]>2019-06-12 11:51:21 +0800
committerzhangchengwei <[email protected]>2019-06-12 11:51:21 +0800
commit36538c5b592a2cae350853da66ed40f93ddb39c7 (patch)
tree8b53c0c8002d8afd06909d6b22c292ebc7973bdf /src/objectscanner_kafka.cpp
创建
Diffstat (limited to 'src/objectscanner_kafka.cpp')
-rw-r--r--src/objectscanner_kafka.cpp331
1 files changed, 331 insertions, 0 deletions
diff --git a/src/objectscanner_kafka.cpp b/src/objectscanner_kafka.cpp
new file mode 100644
index 0000000..6b52447
--- /dev/null
+++ b/src/objectscanner_kafka.cpp
@@ -0,0 +1,331 @@
+#include <time.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/prctl.h>
+
+#include "objectscanner_main.h"
+#include "objectscanner_kafka.h"
+
+#define OBJSCAN_RESULT_TOPIC "NTC-HTTP-OBJSCAN-RESULT"
+#define OBJSCAN_HTTP_DOC_TOPIC "NTC-COLLECT-HTTP-DOC-LOG"
+#define OBJSCAN_HTTP_EXE_TOPIC "NTC-COLLECT-HTTP-EXE-LOG"
+
+extern objscan_global_info_t g_objscan_info;
+
+/* librdkafka log callback: forward client/broker log lines into the runtime
+ * log, but only when their level is at or below the configured threshold. */
+static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
+{
+    if(level > g_objscan_info.sys_log_level)
+    {
+        return; /* too verbose for the current log level */
+    }
+    MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "In log_cb, %s: %s", fac, buf);
+}
+
+/* librdkafka error callback: record asynchronous client/broker errors in the
+ * runtime log.  rk and opaque are unused. */
+static void kafka_conf_error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
+{
+ MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "In error_cb, error: %d, reason: %s", err, reason);
+}
+
+/* Log and print the partition list involved in a consumer-group rebalance.
+ * "type" is a short label such as "assigned" or "revoked". */
+static void print_partition_list (const char *type, const rd_kafka_topic_partition_list_t *partitions)
+{
+    int i;
+    size_t used;
+    char buffer[2048];
+
+    /* Use snprintf throughout (the old code started with an unbounded
+     * sprintf) and track the used length instead of calling strlen() on the
+     * whole buffer every iteration. */
+    used = snprintf(buffer, sizeof(buffer), "Consumer group rebalanced, type: %s, ", type);
+    for (i = 0 ; i < partitions->cnt && used < sizeof(buffer) ; i++)
+    {
+        /* partition is int32_t and offset is int64_t: cast explicitly so the
+         * specifiers are correct everywhere (the old "%u"/"%lu" pair
+         * mis-printed logical offsets such as RD_KAFKA_OFFSET_INVALID). */
+        used += snprintf(buffer+used, sizeof(buffer)-used, "%s %s[%d] offset %lld",
+            i > 0 ? ",":"", partitions->elems[i].topic,
+            (int)partitions->elems[i].partition,
+            (long long)partitions->elems[i].offset);
+    }
+    MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "%s", buffer);
+    printf("%s\n", buffer);
+}
+
+/* Reset every partition in the assignment to the logical "latest" offset so
+ * consumption starts from newly produced data only, skipping the backlog.
+ * (Translated from the original garbled Chinese comment.) */
+static void reset_kafka_offsets(rd_kafka_topic_partition_list_t *partitions)
+{
+    rd_kafka_resp_err_t err;
+    int i;
+
+    for (i = 0 ; i < partitions->cnt ; i++)
+    {
+        /* RD_KAFKA_OFFSET_END (-1) is librdkafka's "end of partition"
+         * sentinel; use the named constant instead of the bare -1. */
+        err = rd_kafka_topic_partition_list_set_offset(partitions,
+            partitions->elems[i].topic, partitions->elems[i].partition, RD_KAFKA_OFFSET_END);
+        if(err != RD_KAFKA_RESP_ERR_NO_ERROR)
+        {
+            printf("Set Kafka offset error: %s\n", rd_kafka_err2str(err));
+            MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "Set Kafka offset error: %s\n", rd_kafka_err2str(err));
+        }
+    }
+}
+
+/* Consumer-group rebalance callback.  On assignment, optionally rewind the
+ * offsets to "latest" before accepting the partitions; on revocation (or any
+ * unexpected error code) drop the current assignment. */
+static void kafka_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
+{
+    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+    {
+        print_partition_list("assigned", partitions);
+        if (g_objscan_info.consume_from_latest)
+        {
+            reset_kafka_offsets(partitions);
+        }
+        rd_kafka_assign(rk, partitions);
+    }
+    else if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
+    {
+        print_partition_list("revoked", partitions);
+        rd_kafka_assign(rk, NULL);
+    }
+    else
+    {
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "failed: %s\n", rd_kafka_err2str(err));
+        rd_kafka_assign(rk, NULL);
+    }
+}
+
+/* Create and configure the Kafka consumer handle, connect to the configured
+ * brokers and subscribe to the HTTP DOC/EXE collection topics.
+ *
+ * Returns 0 on success, -1 on configuration/connection failure, -2 when the
+ * subscription itself is rejected. */
+int32_t rdkafka_consumer_init(void)
+{
+    char errString[512] = {0};
+    rd_kafka_conf_t *config;
+    rd_kafka_topic_conf_t *topic_conf;
+    rd_kafka_topic_partition_list_t *topics;
+    rd_kafka_resp_err_t err;
+
+    /*Consumer configuration*/
+    config = rd_kafka_conf_new();
+    rd_kafka_conf_set(config, "topic.metadata.refresh.interval.ms", "60000", errString, sizeof(errString));
+    if(rd_kafka_conf_set(config, "group.id", g_objscan_info.kafka_consu_name, errString, sizeof(errString)) != RD_KAFKA_CONF_OK)
+    {
+        printf("rd_kafka_conf_set group.id error: %s.\n", errString);
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "rd_kafka_conf_set group.id error: %s.\n", errString);
+        rd_kafka_conf_destroy(config);      /* was leaked on this path */
+        return -1;
+    }
+
+    /*Topic configuration*/
+    /* NOTE(review): the previous code also set "consume.callback.max.messages"
+     * from an UNINITIALIZED stack buffer (undefined behavior); that call has
+     * been dropped so the librdkafka default applies.  Re-add it with a real
+     * value if a limit is actually wanted. */
+    topic_conf = rd_kafka_topic_conf_new();
+    rd_kafka_topic_conf_set(topic_conf, "auto.commit.enable", "true", errString, sizeof(errString));
+    /* "earliest" only applies when the group has no committed offset yet. */
+    rd_kafka_topic_conf_set(topic_conf, "auto.offset.reset", "earliest", errString, sizeof(errString));
+    if(rd_kafka_topic_conf_set(topic_conf, "offset.store.method", "broker", errString, sizeof(errString)) != RD_KAFKA_CONF_OK)
+    {
+        printf("rd_kafka_topic_conf_set offset.store.method(broker) error: %s.\n", errString);
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "rd_kafka_topic_conf_set offset.store.method(broker) error: %s.\n", errString);
+        rd_kafka_topic_conf_destroy(topic_conf);    /* both were leaked here */
+        rd_kafka_conf_destroy(config);
+        return -1;
+    }
+    rd_kafka_conf_set_default_topic_conf(config, topic_conf);   /* config now owns topic_conf */
+    rd_kafka_conf_set_rebalance_cb(config, kafka_rebalance_cb);
+    rd_kafka_conf_set_log_cb(config, kafka_conf_log_cb);
+    rd_kafka_conf_set_error_cb(config, kafka_conf_error_cb);
+
+    /* rd_kafka_new() takes ownership of config on success only. */
+    if(!(g_objscan_info.kafka_consumer = rd_kafka_new(RD_KAFKA_CONSUMER, config, errString, sizeof(errString))))
+    {
+        printf("rd_kafka_new error: %s.\n", errString);
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "rd_kafka_new error: %s.\n", errString);
+        rd_kafka_conf_destroy(config);
+        return -1;
+    }
+
+    if(rd_kafka_brokers_add(g_objscan_info.kafka_consumer, g_objscan_info.kafka_brokers) == 0)
+    {
+        printf("rd_kafka_brokers_add: %s error.\n", g_objscan_info.kafka_brokers);
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "rd_kafka_brokers_add %s error.", g_objscan_info.kafka_brokers);
+        return -1;
+    }
+    /* Route all consumer events through the consumer poll queue. */
+    rd_kafka_poll_set_consumer(g_objscan_info.kafka_consumer);
+
+    topics = rd_kafka_topic_partition_list_new(2);
+    rd_kafka_topic_partition_list_add(topics, OBJSCAN_HTTP_DOC_TOPIC, -1 /* RD_KAFKA_PARTITION_UA */);
+    rd_kafka_topic_partition_list_add(topics, OBJSCAN_HTTP_EXE_TOPIC, -1);
+    err = rd_kafka_subscribe(g_objscan_info.kafka_consumer, topics);
+    /* rd_kafka_subscribe() copies the list, so it must be destroyed here
+     * (the previous code leaked it on every path). */
+    rd_kafka_topic_partition_list_destroy(topics);
+    if(err)
+    {
+        printf("%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
+        return -2;
+    }
+    return 0;
+}
+
+/* Producer delivery-report callback: count every produced message as either
+ * a success or a failure in the runtime statistics. */
+static void produce_delivery_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
+{
+    int counter = rkmessage->err ? MESSAGE_OTH_FAIL : MESSAGE_SUCC;
+    atomic_inc(&(g_objscan_info.statistic.num[counter]));
+}
+
+/* Create and configure the Kafka producer handle, connect to the configured
+ * brokers and create the scan-result output topic.
+ *
+ * Returns 0 on success, -1 on any failure. */
+int32_t rdkafka_producer_init(void)
+{
+    char errString[512] = {0}, config_buf[128];
+    rd_kafka_conf_t *config;
+    rd_kafka_topic_conf_t *topic_conf;
+
+    /*Producer configuration*/       /* (comment said "Consumer" before) */
+    config = rd_kafka_conf_new();
+    /* snprintf instead of sprintf: config_buf is plenty large for a %u, but
+     * keep every buffer write bounded as a matter of policy. */
+    snprintf(config_buf, sizeof(config_buf), "%u", g_objscan_info.produce_q_size);
+    rd_kafka_conf_set(config, "queue.buffering.max.messages", config_buf, errString, sizeof(errString));
+    rd_kafka_conf_set(config, "topic.metadata.refresh.interval.ms", "60000", errString, sizeof(errString));
+    rd_kafka_conf_set_dr_msg_cb(config, produce_delivery_msg_cb);
+
+    /* rd_kafka_new() takes ownership of config on success only. */
+    if(!(g_objscan_info.kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, config, errString, sizeof(errString))))
+    {
+        printf("rd_kafka_new error: %s.\n", errString);
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "rd_kafka_new error: %s.\n", errString);
+        rd_kafka_conf_destroy(config);      /* was leaked on this path */
+        return -1;
+    }
+    if(rd_kafka_brokers_add(g_objscan_info.kafka_producer, g_objscan_info.kafka_brokers) == 0)
+    {
+        printf("rd_kafka_brokers_add: %s error.\n", g_objscan_info.kafka_brokers);
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "rd_kafka_brokers_add %s error.", g_objscan_info.kafka_brokers);
+        return -1;
+    }
+
+    /*Topic configuration*/
+    snprintf(config_buf, sizeof(config_buf), "%u", g_objscan_info.kafka_req_ack);
+    topic_conf = rd_kafka_topic_conf_new();
+    rd_kafka_topic_conf_set(topic_conf, "request.required.acks", config_buf, errString, sizeof(errString));
+    /* rd_kafka_topic_new() takes ownership of topic_conf on success. */
+    g_objscan_info.produc_topic= rd_kafka_topic_new(g_objscan_info.kafka_producer, OBJSCAN_RESULT_TOPIC, topic_conf);
+    if(NULL==g_objscan_info.produc_topic)
+    {
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_FATAL, "create topic: %s error.", OBJSCAN_RESULT_TOPIC);
+        return -1;
+    }
+    return 0;
+}
+
+/* Release a parsed message and everything it owns: its completion future,
+ * the downloaded content buffer, the parsed JSON metadata and the struct
+ * itself.  NOTE(review): free() and cJSON_Delete() accept NULL; presumably
+ * future_destroy() does too -- confirm before passing partially-built items. */
+void destroy_parsed_message(message_meta_item_t *message)
+{
+ future_destroy(message->future);
+ free(message->content);
+
+ cJSON_Delete(message->meta_json);
+ free(message);
+}
+
+/* Parse one raw Kafka message payload (a JSON object) into a
+ * message_meta_item_t.
+ *
+ * "res_body_file" must be a string of the form "http://<host>/<path>"; the
+ * <path> part after the first '/' following the host becomes object_uri
+ * (bounded to 256 bytes by snprintf).  "found_time" is taken from the JSON
+ * when present, otherwise the current wall-clock time is used.
+ *
+ * Returns a heap item that owns the parsed JSON (release it with
+ * destroy_parsed_message()), or NULL on any allocation/parse/format error. */
+static message_meta_item_t *parse_rkmessage_cjson(const char* data, int datalen)
+{
+    cJSON *root, *psub = NULL;
+    char *buffer, *pos_path;
+    message_meta_item_t *message;
+
+    /* The payload is not NUL-terminated; cJSON_Parse needs a C string, so
+     * parse a terminated private copy. */
+    buffer = (char *)malloc(datalen+1);
+    if(NULL == buffer)          /* allocation was unchecked before */
+    {
+        return NULL;
+    }
+    memcpy(buffer, data, datalen);
+    buffer[datalen] = '\0';
+    root = cJSON_Parse(buffer);
+    free(buffer);
+    if(NULL == root)
+    {
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_INFO, "recv rkmessage error: cJSON_Parse failed.");
+        return NULL;
+    }
+
+    if(NULL==(psub=cJSON_GetObjectItem(root, "res_body_file")))
+    {
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_INFO, "recv rkmessage fail: path is NULL.");
+        cJSON_Delete(root);
+        return NULL;
+    }
+
+    /* Must be a string URL "http://host/..." with a path component. */
+    if(psub->type!=cJSON_String || strncmp(psub->valuestring, "http://", strlen("http://")) ||
+        NULL==(pos_path=strchr(psub->valuestring+strlen("http://"), '/')))
+    {
+        MESA_HANDLE_RUNTIME_LOGV2(g_objscan_info.log_runtime, RLOG_LV_INFO, "recv rkmessage fail: message format error.");
+        cJSON_Delete(root);
+        return NULL;
+    }
+
+    message = (message_meta_item_t *)calloc(1, sizeof(message_meta_item_t));
+    if(NULL == message)         /* allocation was unchecked before */
+    {
+        cJSON_Delete(root);
+        return NULL;
+    }
+    message->meta_json = root;  /* JSON ownership transfers to the item */
+    snprintf(message->object_uri, 256, "%s", pos_path+1);
+
+    if(NULL!=(psub=cJSON_GetObjectItem(root, "found_time")))
+    {
+        message->found_time = psub->valuedouble;
+    }
+    else
+    {
+        message->found_time = time(NULL);
+    }
+    return message;
+}
+
+/* Delay-queue worker thread: takes parsed messages from queue_msg, waits
+ * until at least delay_time_s seconds have passed since the message's
+ * found_time, then moves the message to queue_delay.  Runs forever; arg is
+ * unused. */
+void* thread_delay_msg_queue(void *arg)
+{
+ message_meta_item_t *message;
+ long node_size;
+ time_t now;
+
+ prctl(PR_SET_NAME, "queue_delay");
+
+ while(1)
+ {
+ node_size = sizeof(message_meta_item_t *);
+ /* Non-zero return: nothing to dequeue -- back off for 100 ms. */
+ if(MESA_lqueue_get_head(g_objscan_info.queue_msg, (void *)&message, &node_size))
+ {
+ usleep(100000);
+ continue;
+ }
+ now = time(NULL);
+ if((message->found_time + g_objscan_info.delay_time_s) > now) //ensure at least delay_time_s seconds have elapsed since found_time (translated from the original garbled comment)
+ {
+ sleep(g_objscan_info.delay_time_s - (now - message->found_time));
+ }
+
+ /* If the hand-off to the delay queue fails, count the drop and free
+ * the message (this thread still owns it at that point). */
+ if(MESA_lqueue_join_tail(g_objscan_info.queue_delay, (void *)&message, sizeof(message)) < 0)
+ {
+ atomic_inc(&(g_objscan_info.statistic.num[MESSAGE_DROP]));
+ destroy_parsed_message(message);
+ }
+ }
+ return NULL;
+}
+
+/* Kafka receive thread: polls the consumer, parses each message payload into
+ * a message_meta_item_t, tags it with its source topic name and pushes it
+ * onto queue_msg for the delay thread.  Runs forever; arg is unused. */
+void* thread_recv_kafka_msg(void *arg)
+{
+ rd_kafka_message_t *rkmessage;
+ message_meta_item_t *parsed_msg;
+ int32_t ret;
+
+ prctl(PR_SET_NAME, "kafka_rcvmsg");
+ while(1)
+ {
+ rkmessage = rd_kafka_consumer_poll(g_objscan_info.kafka_consumer, g_objscan_info.consume_timeout_ms);
+ if(rkmessage)
+ {
+ if(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR)
+ {
+ /* Unparseable payloads are counted and skipped. */
+ if(NULL==(parsed_msg=parse_rkmessage_cjson((const char*)rkmessage->payload, rkmessage->len)))
+ {
+ atomic_inc(&(g_objscan_info.statistic.num[MESSAGE_RCV_ERROR]));
+ rd_kafka_message_destroy(rkmessage);
+ continue;
+ }
+ /* Record the originating topic in the message's metadata. */
+ cJSON_AddStringToObject(parsed_msg->meta_json, "topic_name", rd_kafka_topic_name(rkmessage->rkt));
+
+ /* Busy-retry while the queue lock is contended; any other failure
+ * is a real enqueue error.  NOTE(review): this spin has no backoff
+ * sleep -- presumably lock contention is brief; verify under load. */
+ ret = MESA_QUEUE_RET_CANT_GET_LOCK;
+ while(ret==MESA_QUEUE_RET_CANT_GET_LOCK || ret==MESA_QUEUE_RET_GET_LOCK_TMOUT)
+ {
+ ret = MESA_lqueue_try_join_tail(g_objscan_info.queue_msg, (void *)&parsed_msg, sizeof(parsed_msg));
+ }
+ if(ret < 0)
+ {
+ atomic_inc(&(g_objscan_info.statistic.num[MESSAGE_DROP]));
+ destroy_parsed_message(parsed_msg);
+ }
+ }
+ /* Messages carrying an error code are discarded without counting. */
+ rd_kafka_message_destroy(rkmessage);
+ }
+ else
+ {
+ /* Poll timeout with no message: idle for 200 ms before retrying. */
+ usleep(200000);
+ }
+ }
+
+ return NULL;
+}
+