diff options
| author | houjinchuan <[email protected]> | 2023-09-26 16:39:27 +0800 |
|---|---|---|
| committer | houjinchuan <[email protected]> | 2023-09-26 16:39:27 +0800 |
| commit | 8f64e062f9ea8d15bba9b2a8ddd36515af71412a (patch) | |
| tree | 0fb37973d17737ae92d102257e07827b3b8d12ac /src/main/java/com/zdjizhi/syncfile/consumer | |
| parent | bea54cff553e868e69f2209918a1a2eab1f62ea3 (diff) | |
Diffstat (limited to 'src/main/java/com/zdjizhi/syncfile/consumer')
| -rw-r--r-- | src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java | 27 |
1 files changed, 20 insertions, 7 deletions
diff --git a/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java index ff33ba2..4df5f4d 100644 --- a/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java +++ b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java @@ -9,6 +9,7 @@ import com.zdjizhi.syncfile.core.SyncFiles; import com.zdjizhi.syncfile.entity.Source; import com.zdjizhi.syncfile.entity.SysFileSync; import com.zdjizhi.syncfile.monitor.MonitorProperties; +import com.zdjizhi.syncfile.utils.HttpUtil; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; @@ -17,6 +18,7 @@ import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; @Component public class KafkaConsumerListener { @@ -26,6 +28,10 @@ public class KafkaConsumerListener { SyncFiles syncFiles; @Autowired MonitorProperties monitorProperties; + @Autowired + HttpUtil httpUtil; + @Autowired + ThreadPoolExecutor threadPool; @KafkaListener(topics = {"${kafka.consumer.topic}"}, containerFactory = "kafkaListenerContainerFactory") public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) { @@ -36,19 +42,26 @@ public class KafkaConsumerListener { JSONObject jsonObj = (JSONObject) JSON.parse(record.value().toString()); SysFileSync sysFileSync = JSON.toJavaObject(jsonObj, SysFileSync.class); if (sysFileSync != null) { - List<Source> sourceList = sysFileSync.getSourceList(); - if(sourceList.size() < 1){ - log.error("kafka data error, sourceList is null. kafka data: "+record.value().toString()); + List<Source> sourceList = sysFileSync.getSource_list(); + if (sourceList.size() < 1) { + log.error("kafka data error, sourceList is null. kafka data: " + record.value().toString()); monitorProperties.addFileSyncError(); - }else { + } else { fileList.add(sourceList); } - }else { - log.error("parse kafka data error. kafka data: "+record.value().toString()); + } else { + log.error("parse kafka data error. kafka data: " + record.value().toString()); monitorProperties.addFileSyncError(); } + monitorProperties.addKafkaRecordCount(); } - syncFiles.syncFiles(fileList); + while (true) { + if (threadPool.getActiveCount() < threadPool.getMaximumPoolSize()) { + threadPool.submit(() -> syncFiles.syncFiles(fileList)); + break; + } + } + ack.acknowledge(); } catch (Exception e) { log.error("consume kafka data error.", e); monitorProperties.addFileSyncError(); |
