summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/syncfile/consumer
diff options
context:
space:
mode:
authorhoujinchuan <[email protected]>2023-09-26 16:39:27 +0800
committerhoujinchuan <[email protected]>2023-09-26 16:39:27 +0800
commit8f64e062f9ea8d15bba9b2a8ddd36515af71412a (patch)
tree0fb37973d17737ae92d102257e07827b3b8d12ac /src/main/java/com/zdjizhi/syncfile/consumer
parentbea54cff553e868e69f2209918a1a2eab1f62ea3 (diff)
优化代码及性能HEADmain
Diffstat (limited to 'src/main/java/com/zdjizhi/syncfile/consumer')
-rw-r--r--src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java27
1 files changed, 20 insertions, 7 deletions
diff --git a/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java
index ff33ba2..4df5f4d 100644
--- a/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java
+++ b/src/main/java/com/zdjizhi/syncfile/consumer/KafkaConsumerListener.java
@@ -9,6 +9,7 @@ import com.zdjizhi.syncfile.core.SyncFiles;
import com.zdjizhi.syncfile.entity.Source;
import com.zdjizhi.syncfile.entity.SysFileSync;
import com.zdjizhi.syncfile.monitor.MonitorProperties;
+import com.zdjizhi.syncfile.utils.HttpUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
@@ -17,6 +18,7 @@ import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
@Component
public class KafkaConsumerListener {
@@ -26,6 +28,10 @@ public class KafkaConsumerListener {
SyncFiles syncFiles;
@Autowired
MonitorProperties monitorProperties;
+ @Autowired
+ HttpUtil httpUtil;
+ @Autowired
+ ThreadPoolExecutor threadPool;
@KafkaListener(topics = {"${kafka.consumer.topic}"}, containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
@@ -36,19 +42,26 @@ public class KafkaConsumerListener {
JSONObject jsonObj = (JSONObject) JSON.parse(record.value().toString());
SysFileSync sysFileSync = JSON.toJavaObject(jsonObj, SysFileSync.class);
if (sysFileSync != null) {
- List<Source> sourceList = sysFileSync.getSourceList();
- if(sourceList.size() < 1){
- log.error("kafka data error, sourceList is null. kafka data: "+record.value().toString());
+ List<Source> sourceList = sysFileSync.getSource_list();
+ if (sourceList.size() < 1) {
+ log.error("kafka data error, sourceList is null. kafka data: " + record.value().toString());
monitorProperties.addFileSyncError();
- }else {
+ } else {
fileList.add(sourceList);
}
- }else {
- log.error("parse kafka data error. kafka data: "+record.value().toString());
+ } else {
+ log.error("parse kafka data error. kafka data: " + record.value().toString());
monitorProperties.addFileSyncError();
}
+ monitorProperties.addKafkaRecordCount();
}
- syncFiles.syncFiles(fileList);
+ while (true) {
+ if (threadPool.getActiveCount() < threadPool.getMaximumPoolSize()) {
+ threadPool.submit(() -> syncFiles.syncFiles(fileList));
+ break;
+ }
+ }
+ ack.acknowledge();
} catch (Exception e) {
log.error("consume kafka data error.", e);
monitorProperties.addFileSyncError();