summaryrefslogtreecommitdiff
path: root/src/KafkaProducer.cpp
blob: 197c2df080555b99a8a99c298ba3f44e7f8ab972 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/*
 * KafkaProducer.cpp
 *
 *  Created on: 
 *      Author: 
 */
#include "KafkaProducer.h"

KafkaProducer::KafkaProducer(const string& b):brokers(b)
{
	partition = RD_KAFKA_PARTITION_UA;
};

int KafkaProducer::KafkaConnection()
{
	config = rd_kafka_conf_new();
	rd_kafka_conf_set(config, "queue.buffering.max.messages", "1000000", NULL, 0);
	rd_kafka_conf_set(config, "topic.metadata.refresh.interval.ms", "600000", NULL, 0);

	if (!(kafka = rd_kafka_new(RD_KAFKA_PRODUCER, config, errString, sizeof(errString))))
	{
		return -1;
	}

	if (rd_kafka_brokers_add(kafka, brokers.c_str()) == 0)
	{
		return -2;
	}

	return 0;
}

KafkaProducer::~KafkaProducer()
{
	rd_kafka_destroy(kafka);
	for(iter = topicHandleMap.begin(); iter!=topicHandleMap.end(); ++iter)
	{
		rd_kafka_topic_destroy(iter->second);
	}
	rd_kafka_wait_destroyed(5000);
}

rd_kafka_topic_t* KafkaProducer::CreateTopicHandle(const string& topicName)
{
	if(!topicHandleMap.count(topicName))
	{
		rd_kafka_topic_conf_t* config = rd_kafka_topic_conf_new();
		rd_kafka_topic_t* rkt = rd_kafka_topic_new(kafka, topicName.c_str(), config);
		topicHandleMap[topicName] = rkt;
	}
	return topicHandleMap[topicName];
}

int KafkaProducer::SendData(string& topicName, void *payload, size_t paylen)
{
	rd_kafka_topic_t* currentTopicHandle = topicHandleMap[topicName];
		
	int status = rd_kafka_produce(currentTopicHandle, partition, RD_KAFKA_MSG_F_COPY, payload,
			paylen, NULL, 0, NULL);
	return status;

}

int KafkaProducer::MessageInQueue()
{
	return rd_kafka_outq_len(kafka);
}

void KafkaProducer::KafkaPoll(int interval)
{
	rd_kafka_poll(kafka, interval);
}