diff options
Diffstat (limited to 'src/main/java/cn/ac/iie/dao/KafkaDB.java')
| -rw-r--r-- | src/main/java/cn/ac/iie/dao/KafkaDB.java | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/dao/KafkaDB.java b/src/main/java/cn/ac/iie/dao/KafkaDB.java new file mode 100644 index 0000000..4d15449 --- /dev/null +++ b/src/main/java/cn/ac/iie/dao/KafkaDB.java @@ -0,0 +1,81 @@ +package cn.ac.iie.dao; + +import cn.ac.iie.bean.voipSipFromToLog.RouteRelationLog; +import cn.ac.iie.bean.voipSipOrigin.SipOriginALL; +import cn.ac.iie.common.RealtimeCountConfig; +import com.alibaba.fastjson.JSONObject; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.log4j.Logger; + +import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.Properties; + +/** + * 发送数据的kafka工具类 + */ +public class KafkaDB { + private static Logger logger = Logger.getLogger(KafkaDB.class); + + private static Producer<String, String> producer; + + private static KafkaDB kafkaDB; + + private KafkaDB() { + getProducer(); + } + + public static KafkaDB getInstance() { + if (kafkaDB == null) { + kafkaDB = new KafkaDB(); + } + return kafkaDB; + } + + public void siporiLog2KafkaFromSipInsertBoltDC(LinkedList<String> sipOriJsonS) { + long time = System.currentTimeMillis() / 1000L; + for (String sipOriJson : sipOriJsonS) { + try { + SipOriginALL sipOriginLog = JSONObject.parseObject(sipOriJson, SipOriginALL.class); + sipOriginLog.setStat_time(time); + producer.send(new ProducerRecord<>(RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, JSONObject.toJSONString(sipOriginLog))); + } catch (Exception e) { + e.printStackTrace(); + } + } + producer.flush(); + } + + public void routeRelatLog2KafkaFromSipInsertBoltDC(LinkedList<String> routeJsonS) { + long time = System.currentTimeMillis() / 1000L; + for (String routeJson : routeJsonS) { + try { + RouteRelationLog routeRelationLog = JSONObject.parseObject(routeJson, RouteRelationLog.class); + routeRelationLog.setTimestamp(time); + producer.send(new ProducerRecord<>(RealtimeCountConfig.KAFKA_ROUTE_RELATION_TOPIC, JSONObject.toJSONString(routeRelationLog))); + } catch (Exception e) { + e.printStackTrace(); + } + } + producer.flush(); + } + + /** + * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次 + */ + private void getProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", RealtimeCountConfig.BOOTSTRAP_OUTPUT_SERVERS); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); + properties.put("linger.ms", "2"); + properties.put("request.timeout.ms", 20000); + properties.put("batch.size", 262144); + properties.put("buffer.memory", 33554432); + producer = new KafkaProducer<String, String>(properties); + } + +} |
