summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortanghao <[email protected]>2024-11-20 16:35:39 +0800
committertanghao <[email protected]>2024-11-20 16:35:39 +0800
commit0ee83d3795136f79a4bb49521374b696f820f83b (patch)
tree8e6b69c1fe9679a91f242495580dd8472adac08b
parentf20c14744b7c8a0baba2fae0ee7879b5637aea8d (diff)
fix: sources详情增加引用属性,上传接口增加header
-rw-r--r--cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java10
1 file changed, 9 insertions, 1 deletion
diff --git a/cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java b/cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java
index f365769..22ba456 100644
--- a/cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java
+++ b/cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java
@@ -36,6 +36,8 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
@@ -138,6 +140,8 @@ public class EntitySourcesServiceImpl extends ServiceImpl<EntitySourcesDao, Enti
@Override
public EntitySources queryById(Integer id) {
EntitySources entitySources = this.baseMapper.selectById(id);
+ List<EntityProfiles> profiles = entityProfilesService.list(new QueryWrapper<EntityProfiles>().lambda().eq(EntityProfiles::getSourceId, entitySources.getId()));
+ entitySources.setUsage(ObjectUtil.isNotEmpty(profiles)?profiles.size():0);
return entitySources;
}
@@ -273,8 +277,12 @@ public class EntitySourcesServiceImpl extends ServiceImpl<EntitySourcesDao, Enti
private void uploadDataToKafka(Integer sourceId, List<String> datas) {
try {
+ // 创建Headers实例
+ Headers headers = new RecordHeaders();
+ // 添加header
+ headers.add("source_id",sourceId.toString().getBytes());
datas.stream().forEach(data -> {
- kafkaProducer.send(new ProducerRecord<>(Constant.ENTITY_SOURCES_TOPIC_NAME, sourceId.toString(), data),new Callback() {
+ kafkaProducer.send(new ProducerRecord<>(Constant.ENTITY_SOURCES_TOPIC_NAME, null,null, null,data,headers),new Callback(){
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
log.error("[uploadDataToKafka] [error] [msg: {}]",e.getMessage());