diff options
| author | tanghao <[email protected]> | 2024-11-20 16:35:39 +0800 |
|---|---|---|
| committer | tanghao <[email protected]> | 2024-11-20 16:35:39 +0800 |
| commit | 0ee83d3795136f79a4bb49521374b696f820f83b (patch) | |
| tree | 8e6b69c1fe9679a91f242495580dd8472adac08b | |
| parent | f20c14744b7c8a0baba2fae0ee7879b5637aea8d (diff) | |
fix: sources详情增加引用属性,上传接口增加header
| -rw-r--r-- | cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java b/cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java index f365769..22ba456 100644 --- a/cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java +++ b/cn-admin/src/main/java/net/geedge/modules/entity/service/impl/EntitySourcesServiceImpl.java @@ -36,6 +36,8 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; @@ -138,6 +140,8 @@ public class EntitySourcesServiceImpl extends ServiceImpl<EntitySourcesDao, Enti @Override public EntitySources queryById(Integer id) { EntitySources entitySources = this.baseMapper.selectById(id); + List<EntityProfiles> profiles = entityProfilesService.list(new QueryWrapper<EntityProfiles>().lambda().eq(EntityProfiles::getSourceId, entitySources.getId())); + entitySources.setUsage(ObjectUtil.isNotEmpty(profiles)?profiles.size():0); return entitySources; } @@ -273,8 +277,12 @@ public class EntitySourcesServiceImpl extends ServiceImpl<EntitySourcesDao, Enti private void uploadDataToKafka(Integer sourceId, List<String> datas) { try { + // 创建Headers实例 + Headers headers = new RecordHeaders(); + // 添加header + headers.add("source_id",sourceId.toString().getBytes()); datas.stream().forEach(data -> { - kafkaProducer.send(new ProducerRecord<>(Constant.ENTITY_SOURCES_TOPIC_NAME, sourceId.toString(), data),new Callback() { + kafkaProducer.send(new ProducerRecord<>(Constant.ENTITY_SOURCES_TOPIC_NAME, null,null, null,data,headers),new Callback(){ public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) { log.error("[uploadDataToKafka] [error] [msg: {}]",e.getMessage()); |
