1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
/*
* KafkaProducer.cpp
*
* Created on:
* Author:
*/
#include "KafkaProducer.h"
/*
 * Construct a producer for the given broker list.
 *
 * Only stores the broker string and selects RD_KAFKA_PARTITION_UA as the
 * partition used by SendData(); no librdkafka handle is created here.
 * KafkaConnection() must be called to actually create the handle.
 *
 * b: broker list, later passed verbatim to rd_kafka_brokers_add().
 */
KafkaProducer::KafkaProducer(const string& b):brokers(b)
{
    partition = RD_KAFKA_PARTITION_UA;
    // Start from a known state: the destructor and the other methods read
    // `kafka`, which would otherwise hold an indeterminate value if
    // KafkaConnection() was never called.
    kafka = NULL;
    config = NULL;
}
int KafkaProducer::KafkaConnection()
{
config = rd_kafka_conf_new();
rd_kafka_conf_set(config, "queue.buffering.max.messages", "1000000", NULL, 0);
rd_kafka_conf_set(config, "topic.metadata.refresh.interval.ms", "600000", NULL, 0);
if (!(kafka = rd_kafka_new(RD_KAFKA_PRODUCER, config, errString, sizeof(errString))))
{
return -1;
}
if (rd_kafka_brokers_add(kafka, brokers.c_str()) == 0)
{
return -2;
}
return 0;
}
/*
 * Release every cached topic handle, then the producer handle itself, and
 * finally wait up to 5000 ms for librdkafka to finish tearing down.
 */
KafkaProducer::~KafkaProducer()
{
    // Topic handles reference the producer handle, so they have to be
    // released before rd_kafka_destroy() is called (previously the order was
    // reversed).
    for (iter = topicHandleMap.begin(); iter != topicHandleMap.end(); ++iter)
    {
        // An entry can be NULL when topic creation failed or when a lookup
        // default-inserted it; rd_kafka_topic_destroy() must not see NULL.
        if (iter->second != NULL)
        {
            rd_kafka_topic_destroy(iter->second);
        }
    }
    topicHandleMap.clear();

    // `kafka` is NULL when rd_kafka_new() failed in KafkaConnection().
    if (kafka != NULL)
    {
        rd_kafka_destroy(kafka);
        kafka = NULL;
    }

    rd_kafka_wait_destroyed(5000);
}
/*
 * Return the librdkafka topic handle for topicName, creating and caching it
 * in topicHandleMap on first use.
 *
 * topicName: name of the topic to look up or create.
 *
 * Returns the cached or newly created handle, or NULL when there is no
 * producer handle or rd_kafka_topic_new() fails. A failed creation is not
 * cached, so a later call tries again.
 */
rd_kafka_topic_t* KafkaProducer::CreateTopicHandle(const string& topicName)
{
    if (topicHandleMap.count(topicName))
    {
        rd_kafka_topic_t* cached = topicHandleMap[topicName];
        if (cached != NULL)
        {
            return cached;
        }
        // A NULL entry is treated as "not created yet" and falls through.
    }

    // Without a producer handle there is nothing to attach the topic to.
    if (kafka == NULL)
    {
        topicHandleMap.erase(topicName);
        return NULL;
    }

    // Named topicConf so it no longer shadows the member `config`.
    // Per the librdkafka documentation rd_kafka_topic_new() takes ownership
    // of the topic conf, so it is not destroyed here.
    rd_kafka_topic_conf_t* topicConf = rd_kafka_topic_conf_new();
    rd_kafka_topic_t* rkt = rd_kafka_topic_new(kafka, topicName.c_str(), topicConf);
    if (rkt == NULL)
    {
        // Do not leave a NULL handle behind in the cache.
        topicHandleMap.erase(topicName);
        return NULL;
    }

    topicHandleMap[topicName] = rkt;
    return rkt;
}
/*
 * Enqueue one message for topicName on the configured partition.
 *
 * topicName: destination topic; its handle is created on demand through
 *            CreateTopicHandle() if it is not cached yet.
 * payload:   message bytes; RD_KAFKA_MSG_F_COPY is passed, so librdkafka is
 *            asked to copy them rather than keep the pointer.
 * paylen:    number of bytes in payload.
 *
 * Returns the status from rd_kafka_produce(), or -1 when no topic handle
 * could be obtained.
 */
int KafkaProducer::SendData(string& topicName, void *payload, size_t paylen)
{
    // Looking the topic up with topicHandleMap[topicName] would silently
    // insert a NULL handle for an unknown topic and hand it to
    // rd_kafka_produce(); go through CreateTopicHandle() instead.
    rd_kafka_topic_t* currentTopicHandle = CreateTopicHandle(topicName);
    if (currentTopicHandle == NULL)
    {
        return -1;
    }

    // No key (NULL, 0) and no per-message opaque (NULL).
    return rd_kafka_produce(currentTopicHandle, partition, RD_KAFKA_MSG_F_COPY,
                            payload, paylen, NULL, 0, NULL);
}
/*
 * Report the producer's outbound queue length, as returned by
 * rd_kafka_outq_len() for the current handle.
 */
int KafkaProducer::MessageInQueue()
{
    const int pending = rd_kafka_outq_len(kafka);
    return pending;
}
/*
 * Let librdkafka service the producer handle.
 *
 * interval: forwarded unchanged as the second argument of rd_kafka_poll()
 *           (its timeout, in milliseconds per the librdkafka documentation).
 */
void KafkaProducer::KafkaPoll(int interval)
{
    // The value returned by rd_kafka_poll() is deliberately discarded.
    static_cast<void>(rd_kafka_poll(kafka, interval));
}
|