author    houjinchuan <[email protected]>  2023-12-07 10:49:38 +0800
committer houjinchuan <[email protected]>  2023-12-07 10:49:38 +0800
commit    6d89848e5cdea7d5c95c6ccdb3acaedd553bbfd3
tree      83bf57130b318ffa80ff56cd4180d681f4b6ff81
parent    12265c10d1d981f34530f9f17f0d501c5b15ce30
[improve][bootstrap] Optimize the IPFIX connector: change how the Nacos and Consul configuration is obtained, and change the UDP communication mode
-rw-r--r--  config/grootstream_job_template.yaml                                                                                                                                                    |  24
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFIXUtils.java (renamed from groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/util/IPFIXUtils.java)  |  19
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptions.java                                                        |  56
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptionsUtil.java                                                    |  36
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java                                                          |  67
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java                                                            |  11
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/UDPSource.java                                                                    | 192
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/source/UDPSource.java                                                             | 167
8 files changed, 334 insertions, 238 deletions
diff --git a/config/grootstream_job_template.yaml b/config/grootstream_job_template.yaml
index 08424e7..c125ebb 100644
--- a/config/grootstream_job_template.yaml
+++ b/config/grootstream_job_template.yaml
@@ -38,6 +38,30 @@ sources:
format: json
json.ignore.parse.errors: false
+ ipfix_source:
+ type: ipfix
+# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+# - name: log_id
+# type: bigint
+ properties:
+ initial.port: 12345
+ port.number: 3
+ max.attempts: 3
+ buffer.size: 65535
+ receive.buffer: 104857600
+      service.discovery.registry.mode: 0 # 0 = Nacos, 1 = Consul; any other value disables service discovery. Default is 0.
+ service.discovery.service.name: udp_ipfix
+ service.discovery.health.check.interval: 5 # The time interval for reporting health status to the service registry center, in seconds.
+ service.discovery.nacos.server.addr: 192.168.44.12:8848
+ service.discovery.nacos.username: nacos
+ service.discovery.nacos.password: nacos
+ service.discovery.nacos.namespace: test
+ service.discovery.nacos.group: IPFIX
+# service.discovery.consul.server.addr: 192.168.41.30
+# service.discovery.consul.server.port: 8500
+# service.discovery.consul.token:
+
+
filters:
schema_type_filter:
type: com.geedgenetworks.core.filter.AviatorFilter
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/util/IPFIXUtils.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFIXUtils.java
index 81dff59..50569f3 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/util/IPFIXUtils.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFIXUtils.java
@@ -1,7 +1,6 @@
-package com.geedgenetworks.connectors.ipfix.collector.util;
+package com.geedgenetworks.connectors.ipfix.collector;
import cn.hutool.json.JSONArray;
-import com.geedgenetworks.connectors.ipfix.collector.IPFixSourceProvider;
import com.geedgenetworks.core.pojo.ipfix.Record;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
@@ -12,8 +11,6 @@ import scala.xml.Node;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
public class IPFIXUtils {
private static final Log logger = LogFactory.get();
@@ -42,18 +39,4 @@ public class IPFIXUtils {
}
return nodes;
}
-
- public static Map<String, Object> dataConverter(Map<String, Object> extractedFields, Map<String, IPFixSourceProvider.ValueConverter> filedConverters) throws Exception {
- Map<String, Object> newExtractedFields = new HashMap<>();
- if (filedConverters != null && filedConverters.size() > 0) {
- for (String field : filedConverters.keySet()) {
- if (extractedFields.containsKey(field)) {
- newExtractedFields.put(field, filedConverters.get(field).convert(extractedFields.get(field)));
- }
- }
- return newExtractedFields;
- } else {
- return extractedFields;
- }
- }
}
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptions.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptions.java
index d084f3e..d1d5594 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptions.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptions.java
@@ -11,12 +11,6 @@ public class IPFixConnectorOptions {
.noDefaultValue()
.withDescription("Initial port of the IPFix server.");
- public static final ConfigOption<String> SERVICE_DISCOVERY_SERVICE_NAME =
- ConfigOptions.key("service.discovery.service.name")
- .stringType()
- .noDefaultValue()
- .withDescription("The service name used for service discovery.");
-
public static final ConfigOption<Integer> PORT_NUMBER =
ConfigOptions.key("port.number")
.intType()
@@ -38,16 +32,62 @@ public class IPFixConnectorOptions {
.intType()
.defaultValue(104857600);
- public static final ConfigOption<Integer> SERVICE_DISCOVERY_REGISTER_MODE =
- ConfigOptions.key("service.discovery.register.mode")
+ public static final ConfigOption<Integer> SERVICE_DISCOVERY_REGISTRY_MODE =
+ ConfigOptions.key("service.discovery.registry.mode")
.intType()
.defaultValue(0)
.withDescription("Component used by service discovery.");
+ public static final ConfigOption<String> SERVICE_DISCOVERY_SERVICE_NAME =
+ ConfigOptions.key("service.discovery.service.name")
+ .stringType()
+ .defaultValue("udp_ipfix")
+ .withDescription("The service name used for service discovery.");
+
public static final ConfigOption<Integer> SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL =
ConfigOptions.key("service.discovery.health.check.interval")
.intType()
.defaultValue(5)
.withDescription("The time interval, in seconds, for reporting health status to the service registry center.");
+ public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_SERVER_ADDR =
+ ConfigOptions.key("service.discovery.nacos.server.addr")
+ .stringType()
+ .noDefaultValue();
+
+ public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_USERNAME =
+ ConfigOptions.key("service.discovery.nacos.username")
+ .stringType()
+ .defaultValue("");
+
+ public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_PASSWORD =
+ ConfigOptions.key("service.discovery.nacos.password")
+ .stringType()
+ .defaultValue("");
+
+ public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_NAMESPACE =
+ ConfigOptions.key("service.discovery.nacos.namespace")
+ .stringType()
+ .defaultValue("public");
+
+ public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_GROUP =
+ ConfigOptions.key("service.discovery.nacos.group")
+ .stringType()
+ .defaultValue("DEFAULT_GROUP");
+
+ public static final ConfigOption<String> SERVICE_DISCOVERY_CONSUL_SERVER_ADDR =
+ ConfigOptions.key("service.discovery.consul.server.addr")
+ .stringType()
+ .noDefaultValue();
+
+ public static final ConfigOption<Integer> SERVICE_DISCOVERY_CONSUL_SERVER_PORT =
+ ConfigOptions.key("service.discovery.consul.server.port")
+ .intType()
+ .noDefaultValue();
+
+ public static final ConfigOption<String> SERVICE_DISCOVERY_CONSUL_TOKEN =
+ ConfigOptions.key("service.discovery.consul.token")
+ .stringType()
+ .defaultValue("");
+
}
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptionsUtil.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptionsUtil.java
new file mode 100644
index 0000000..53778b0
--- /dev/null
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptionsUtil.java
@@ -0,0 +1,36 @@
+package com.geedgenetworks.connectors.ipfix.collector;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class IPFixConnectorOptionsUtil {
+
+ public static final String PROPERTIES_PREFIX = "service.";
+
+ public static Properties getServiceDiscoveryProperties(Map<String, String> tableOptions) {
+ final Properties serviceDiscoveryProperties = new Properties();
+
+ if (hasServiceDiscoveryProperties(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ if (!StringUtils.isBlank(value)) {
+ serviceDiscoveryProperties.put(key, value);
+ }
+ });
+ }
+ return serviceDiscoveryProperties;
+ }
+
+ /**
+     * Decides whether the table options contain service discovery properties that start with
+     * the prefix 'service.'.
+ */
+ private static boolean hasServiceDiscoveryProperties(Map<String, String> tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+ }
+}
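
Note: a minimal usage sketch of the helper above (the class name and option values are illustrative, taken from the job template earlier in this commit). Only non-blank options whose keys start with the "service." prefix end up in the Properties object that is later passed to UDPSource; everything else, e.g. initial.port, keeps going through the regular ConfigOption path.

    import com.geedgenetworks.connectors.ipfix.collector.IPFixConnectorOptionsUtil;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class ServiceDiscoveryPropertiesSketch {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("initial.port", "12345");                                      // no "service." prefix -> not copied
            tableOptions.put("service.discovery.registry.mode", "0");                       // copied
            tableOptions.put("service.discovery.service.name", "udp_ipfix");                // copied
            tableOptions.put("service.discovery.nacos.server.addr", "192.168.44.12:8848");  // copied
            tableOptions.put("service.discovery.consul.token", "");                         // blank value -> skipped

            Properties props = IPFixConnectorOptionsUtil.getServiceDiscoveryProperties(tableOptions);
            // Prints only the three non-blank service.discovery.* entries.
            props.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
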
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
index 34c31fd..e014d87 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
@@ -2,16 +2,10 @@ package com.geedgenetworks.connectors.ipfix.collector;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.EngineConfig;
-import com.geedgenetworks.connectors.ipfix.collector.source.UDPSource;
-import com.geedgenetworks.connectors.ipfix.collector.util.IPFIXUtils;
import com.geedgenetworks.core.connector.source.SourceProvider;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.types.*;
import com.geedgenetworks.core.types.DataType;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -20,75 +14,70 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.cert.netsa.io.ipfix.*;
import scala.collection.Iterator;
-import scala.collection.concurrent.TrieMap;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
+import static com.geedgenetworks.connectors.ipfix.collector.IPFixConnectorOptionsUtil.getServiceDiscoveryProperties;
+
public class IPFixSourceProvider implements SourceProvider {
private static final Log logger = LogFactory.get();
private final StructType physicalDataType;
private final ReadableConfig config;
+ private final Map<String, String> options;
private final Map<String, ValueConverter> filedConverters;
private static SessionGroup sessionGroup;
- public IPFixSourceProvider(StructType physicalDataType, ReadableConfig config, SessionGroup sessionGroup) {
+ public IPFixSourceProvider(StructType physicalDataType, ReadableConfig config, Map<String, String> options, SessionGroup sessionGroup) {
this.physicalDataType = physicalDataType;
this.config = config;
+ this.options = options;
IPFixSourceProvider.sessionGroup = sessionGroup;
this.filedConverters = physicalDataType != null ? Arrays.stream(physicalDataType.fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType))) : null;
}
@Override
public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
- ExecutionConfig.GlobalJobParameters executionConfig = env.getConfig().getGlobalJobParameters();
- EngineConfig engineConfig = JSON.parseObject(executionConfig.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
+ Properties serviceDiscoveryProperties = getServiceDiscoveryProperties(options);
UDPSource udpSource = new UDPSource(config.get(IPFixConnectorOptions.INITIAL_PORT),
config.get(IPFixConnectorOptions.MAX_ATTEMPTS),
config.get(IPFixConnectorOptions.BUFFER_SIZE),
config.get(IPFixConnectorOptions.RECEIVE_BUFFER),
- config.get(IPFixConnectorOptions.SERVICE_DISCOVERY_SERVICE_NAME),
- config.get(IPFixConnectorOptions.SERVICE_DISCOVERY_REGISTER_MODE),
- config.get(IPFixConnectorOptions.SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL),
- engineConfig.getNacosConfig(),
- engineConfig.getConsulConfig());
- DataStream<byte[]> source = env.addSource(udpSource).setParallelism(config.getOptional(IPFixConnectorOptions.PORT_NUMBER).get());
+ serviceDiscoveryProperties);
+ DataStream<byte[]> source = env.addSource(udpSource).setParallelism(config.get(IPFixConnectorOptions.PORT_NUMBER));
return source.flatMap(new FlatMapFunction<byte[], Event>() {
@Override
public void flatMap(byte[] data, Collector<Event> out) {
try {
ByteBuffer buffer = ByteBuffer.wrap(data);
RecordReader reader = new RecordReader(new ByteBufferMessageReader(buffer, sessionGroup));
- if (reader.isEmpty()) {
- Iterator<Session> iterator = sessionGroup.iterator();
- while (iterator.hasNext()) {
- Session next = iterator.next();
- logger.info("domain: " + next.observationDomain());
- TrieMap<Object, Template> objectTemplateTrieMap = next.idGetTmpl();
- logger.info("Template: " + objectTemplateTrieMap.toString());
- }
- } else {
- while (reader.hasNext()) {
- Event event = new Event();
- Record record = reader.next();
- Iterator<Field> fields = record.fields();
- Map<String, Object> fieldMap = new HashMap<>();
- while (fields.hasNext()) {
- Field field = fields.next();
- fieldMap.put(field.ie().name(), field.value());
- }
- fieldMap.put(Event.INTERNAL_TIMESTAMP_KEY, record.exportTime().get().getEpochSecond());
+ while (reader.hasNext()) {
+ Event event = new Event();
+ Record record = reader.next();
+ Iterator<Field> fields = record.fields();
+ Map<String, Object> fieldMap = new HashMap<>();
+ while (fields.hasNext()) {
+ Field field = fields.next();
+ String fieldName = field.ie().name();
if (filedConverters != null) {
- fieldMap = IPFIXUtils.dataConverter(fieldMap, filedConverters);
+ ValueConverter valueConverter = filedConverters.get(field.ie().name());
+ if (valueConverter != null) {
+ fieldMap.put(fieldName, valueConverter.convert(field.value()));
+ }
+ } else {
+ fieldMap.put(fieldName, field.value());
}
- event.setExtractedFields(fieldMap);
- event.setFinal(true);
- out.collect(event);
}
+ fieldMap.put(Event.INTERNAL_TIMESTAMP_KEY, record.exportTime().get().getEpochSecond() * 1000);
+ event.setExtractedFields(fieldMap);
+ event.setFinal(true);
+ out.collect(event);
}
+ } catch (NoTemplateException e) {
+                        logger.error(e.getMessage() + ". The client did not send the template, or there was a delay in processing the template message.");
} catch (Exception e) {
logger.error("IPFix parsing failed.", e);
}
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
index 59f5504..056f612 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
@@ -1,6 +1,5 @@
package com.geedgenetworks.connectors.ipfix.collector;
-import com.geedgenetworks.connectors.ipfix.collector.util.IPFIXUtils;
import com.geedgenetworks.core.connector.source.SourceProvider;
import com.geedgenetworks.core.factories.FactoryUtil;
import com.geedgenetworks.core.factories.SourceTableFactory;
@@ -13,8 +12,10 @@ import scala.xml.Node;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import static com.geedgenetworks.connectors.ipfix.collector.IPFixConnectorOptionsUtil.PROPERTIES_PREFIX;
public class IPFixTableFactory implements SourceTableFactory {
public static final String IDENTIFIER = "ipfix";
@@ -22,16 +23,17 @@ public class IPFixTableFactory implements SourceTableFactory {
@Override
public SourceProvider getSourceProvider(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- helper.validate();
+ helper.validateExcept(PROPERTIES_PREFIX);
StructType physicalDataType = context.getPhysicalDataType();
ReadableConfig config = context.getConfiguration();
+ Map<String, String> options = context.getOptions();
InfoModel model = InfoModel.getStandardInfoModel(true);
SessionGroup sessionGroup = new SessionGroup(model, null, false);
ArrayList<Node> elements = IPFIXUtils.getElements();
for (Node element : elements) {
model.importElements(element);
}
- return new IPFixSourceProvider(physicalDataType, config, sessionGroup);
+ return new IPFixSourceProvider(physicalDataType, config, options, sessionGroup);
}
@Override
@@ -43,7 +45,6 @@ public class IPFixTableFactory implements SourceTableFactory {
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(IPFixConnectorOptions.INITIAL_PORT);
- options.add(IPFixConnectorOptions.SERVICE_DISCOVERY_SERVICE_NAME);
return options;
}
@@ -54,8 +55,6 @@ public class IPFixTableFactory implements SourceTableFactory {
options.add(IPFixConnectorOptions.MAX_ATTEMPTS);
options.add(IPFixConnectorOptions.PORT_NUMBER);
options.add(IPFixConnectorOptions.RECEIVE_BUFFER);
- options.add(IPFixConnectorOptions.SERVICE_DISCOVERY_REGISTER_MODE);
- options.add(IPFixConnectorOptions.SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL);
return options;
}
}
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/UDPSource.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/UDPSource.java
new file mode 100644
index 0000000..c9ee498
--- /dev/null
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/UDPSource.java
@@ -0,0 +1,192 @@
+package com.geedgenetworks.connectors.ipfix.collector;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.NamingFactory;
+import com.alibaba.nacos.api.naming.NamingService;
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.agent.model.NewCheck;
+import com.ecwid.consul.v1.agent.model.NewService;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class UDPSource extends RichParallelSourceFunction<byte[]> {
+ private static final Log logger = LogFactory.get();
+
+ private static ScheduledExecutorService executorService;
+ private final int initialPort;
+ private final int maxAttempts;
+ private final int bufferSize;
+ private final int receiveBuffer;
+ private final String serviceName;
+ private int currentPort;
+ private String serverIp;
+ private final int registryMode;
+ private final Integer healthCheckInterval;
+ private final String nacosServerAddr;
+ private final String nacosUsername;
+ private final String nacosPassword;
+ private final String nacosNamespace;
+ private final String nacosGroup;
+ private final String consulServerAddr;
+ private final Integer consulServerPort;
+ private final String consulToken;
+ private transient DatagramChannel channel;
+ private transient NamingService namingService;
+ private transient ConsulClient consulClient;
+ private transient boolean stop;
+
+ public UDPSource(int initialPort, int maxAttempts, int bufferSize, int receiveBuffer, Properties serviceDiscoveryProperties) {
+ this.initialPort = initialPort;
+ this.bufferSize = bufferSize;
+ this.maxAttempts = maxAttempts;
+ this.receiveBuffer = receiveBuffer;
+ this.registryMode = Integer.parseInt(String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_REGISTRY_MODE.key())));
+ this.serviceName = registryMode == 0 || registryMode == 1 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_SERVICE_NAME.key())) : null;
+ this.healthCheckInterval = registryMode == 0 || registryMode == 1 ? Integer.parseInt(String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL.key()))) : null;
+ this.nacosServerAddr = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_SERVER_ADDR.key())) : null;
+ this.nacosUsername = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_USERNAME.key())) : null;
+ this.nacosPassword = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_PASSWORD.key())) : null;
+ this.nacosNamespace = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_NAMESPACE.key())) : null;
+ this.nacosGroup = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_GROUP.key())) : null;
+ this.consulServerAddr = registryMode == 1 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_CONSUL_SERVER_ADDR.key())) : null;
+ this.consulServerPort = registryMode == 1 ? Integer.parseInt(String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_CONSUL_SERVER_PORT.key()))) : null;
+ this.consulToken = registryMode == 1 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_CONSUL_TOKEN.key())) : null;
+ }
+
+ @Override
+ public void run(SourceContext<byte[]> ctx) {
+ try {
+ executorService = Executors.newScheduledThreadPool(maxAttempts);
+ channel = DatagramChannel.open();
+ channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBuffer);
+ channel.configureBlocking(false);
+ Selector selector = Selector.open();
+ currentPort = initialPort;
+ int attempt = 1;
+ while (attempt <= maxAttempts) {
+ try {
+ channel.bind(new InetSocketAddress(currentPort));
+ channel.register(selector, SelectionKey.OP_READ);
+ serverIp = InetAddress.getLocalHost().getHostAddress();
+ registerInstance(serverIp, currentPort);
+ break;
+ } catch (IOException e) {
+ if (e.getMessage().contains("Address already in use")) {
+                        logger.info("Port " + currentPort + " is already in use");
+ currentPort++;
+ attempt++;
+ } else {
+ logger.error(e.toString(), e);
+ channel = null;
+ break;
+ }
+ } catch (NacosException e) {
+ logger.error(e.toString(), e);
+ channel = null;
+ }
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+ while (!stop) {
+ selector.select();
+ Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
+ while (keyIterator.hasNext()) {
+ SelectionKey selectionKey = keyIterator.next();
+ if (selectionKey.isReadable()) {
+ DatagramChannel currentDatagramChannel = (DatagramChannel) selectionKey.channel();
+ currentDatagramChannel.receive(buffer);
+ buffer.flip();
+ byte[] data = new byte[buffer.remaining()];
+ buffer.get(data);
+ ctx.collect(data);
+ buffer.clear();
+ }
+ keyIterator.remove();
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e.toString(), e);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ executorService.shutdownNow();
+ IoUtil.close(channel);
+ deregisterInstance(serverIp, currentPort);
+ }
+
+ private void registerInstance(String ip, int port) throws NacosException {
+ if (registryMode == 0) {
+ Properties properties = new Properties();
+ properties.put(PropertyKeyConst.SERVER_ADDR, nacosServerAddr);
+ properties.put(PropertyKeyConst.USERNAME, nacosUsername);
+ properties.put(PropertyKeyConst.PASSWORD, nacosPassword);
+ properties.put(PropertyKeyConst.NAMESPACE, nacosNamespace);
+ namingService = NamingFactory.createNamingService(properties);
+ Instance instance = new Instance();
+ instance.setIp(ip);
+ instance.setPort(port);
+ instance.setEphemeral(true);
+ namingService.registerInstance(serviceName, nacosGroup, instance);
+ executorService.scheduleAtFixedRate(() -> {
+ try {
+ namingService.registerInstance(serviceName, nacosGroup, instance);
+ } catch (NacosException e) {
+ logger.error("nacos check instance healthy error.", e);
+ }
+ }, 0, healthCheckInterval, TimeUnit.SECONDS);
+ } else if (registryMode == 1) {
+ consulClient = new ConsulClient(consulServerAddr, consulServerPort);
+ NewService newService = new NewService();
+ newService.setName(serviceName);
+ newService.setId(ip + ":" + port);
+ newService.setAddress(ip);
+ newService.setPort(port);
+ consulClient.agentServiceRegister(newService, consulToken);
+ NewCheck newCheck = new NewCheck();
+ newCheck.setTtl(healthCheckInterval + "s");
+ newCheck.setName(ip + ":" + port);
+ newCheck.setId(ip + ":" + port);
+ newCheck.setServiceId(ip + ":" + port);
+ newCheck.setDeregisterCriticalServiceAfter("60s");
+ consulClient.agentCheckRegister(newCheck);
+ executorService.scheduleAtFixedRate(() -> consulClient.agentCheckPass(ip + ":" + port, null, consulToken),
+ 0, healthCheckInterval - 1, TimeUnit.SECONDS);
+ }
+ }
+
+ private void deregisterInstance(String ip, int port) {
+ try {
+ if (registryMode == 0) {
+ Instance instance = new Instance();
+ instance.setIp(ip);
+ instance.setPort(port);
+ instance.setEphemeral(true);
+ namingService.deregisterInstance(serviceName, nacosGroup, instance);
+ } else if (registryMode == 1) {
+ consulClient.agentServiceDeregister(ip + ":" + port, consulToken);
+ }
+ } catch (Exception e) {
+ logger.error(e.toString(), e);
+ }
+ }
+}
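
Note: for context, a rough sketch (not part of this commit) of how an IPFIX exporter might resolve the collector's dynamically bound UDP port through the same Nacos registration before sending data. The class name, credentials, and addresses are illustrative and mirror the job template; it assumes the Nacos Java client is on the classpath and that the collector registered under service name udp_ipfix in group IPFIX.

    import com.alibaba.nacos.api.PropertyKeyConst;
    import com.alibaba.nacos.api.naming.NamingFactory;
    import com.alibaba.nacos.api.naming.NamingService;
    import com.alibaba.nacos.api.naming.pojo.Instance;

    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.net.InetAddress;
    import java.util.List;
    import java.util.Properties;

    public class IpfixExporterLookupSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
            props.put(PropertyKeyConst.USERNAME, "nacos");
            props.put(PropertyKeyConst.PASSWORD, "nacos");
            props.put(PropertyKeyConst.NAMESPACE, "test");
            NamingService naming = NamingFactory.createNamingService(props);

            // Each healthy instance carries the ip:port that UDPSource registered after binding.
            List<Instance> instances = naming.selectInstances("udp_ipfix", "IPFIX", true);
            if (instances.isEmpty()) {
                throw new IllegalStateException("No healthy udp_ipfix collector registered.");
            }
            Instance target = instances.get(0);

            byte[] ipfixMessage = new byte[0]; // placeholder for an encoded IPFIX message
            try (DatagramSocket socket = new DatagramSocket()) {
                socket.send(new DatagramPacket(ipfixMessage, ipfixMessage.length,
                        InetAddress.getByName(target.getIp()), target.getPort()));
            }
        }
    }
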
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/source/UDPSource.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/source/UDPSource.java
deleted file mode 100644
index 7329af2..0000000
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/source/UDPSource.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package com.geedgenetworks.connectors.ipfix.collector.source;
-
-import cn.hutool.core.io.IoUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.alibaba.nacos.api.naming.NamingFactory;
-import com.alibaba.nacos.api.naming.NamingService;
-import com.alibaba.nacos.api.naming.pojo.Instance;
-import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.agent.model.NewCheck;
-import com.ecwid.consul.v1.agent.model.NewService;
-import com.geedgenetworks.common.config.ConsulConfig;
-import com.geedgenetworks.common.config.NacosConfig;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class UDPSource extends RichParallelSourceFunction<byte[]> {
- private static final Log logger = LogFactory.get();
-
- private final int initialPort;
- private final int maxAttempts;
- private final int bufferSize;
- private final int receiveBuffer;
- private final String serviceName;
- private transient DatagramChannel channel;
- private transient NamingService namingService;
- private transient ConsulClient consulClient;
- private int currentPort;
- private String serverIp;
- private int registerMode;
- private int healthCheckInterval;
- private NacosConfig nacosConfig;
- private ConsulConfig consulConfig;
-
- public UDPSource(int initialPort, int maxAttempts, int bufferSize, int receiveBuffer, String serviceName, int registerMode, int healthCheckInterval, NacosConfig nacosConfig, ConsulConfig consulConfig) {
- this.initialPort = initialPort;
- this.bufferSize = bufferSize;
- this.maxAttempts = maxAttempts;
- this.receiveBuffer = receiveBuffer;
- this.serviceName = serviceName;
- this.registerMode = registerMode;
- this.healthCheckInterval = healthCheckInterval;
- this.nacosConfig = nacosConfig;
- this.consulConfig = consulConfig;
- }
-
- @Override
- public void run(SourceContext<byte[]> ctx) {
- try {
- channel = DatagramChannel.open();
- channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBuffer);
- channel.configureBlocking(false);
- currentPort = initialPort;
- int attempt = 1;
- while (attempt <= maxAttempts) {
- try {
- channel.bind(new InetSocketAddress(currentPort));
- InetAddress inetAddress = InetAddress.getLocalHost();
- serverIp = inetAddress.getHostAddress();
- registerInstance(serverIp, currentPort, nacosConfig, consulConfig);
- break;
- } catch (IOException e) {
-                        // Catch the case where the port is already in use
- if (e.getMessage().contains("Address already in use")) {
-                            logger.info("Port " + currentPort + " is already in use");
- currentPort++;
- attempt++;
- } else {
- logger.error(e.toString(), e);
- channel = null;
- break;
- }
- } catch (NacosException e) {
- logger.error(e.toString(), e);
- channel = null;
- }
- }
- ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- while (true) {
- if (channel.receive(buffer) != null) {
- buffer.flip();
- byte[] data = new byte[buffer.remaining()];
- buffer.get(data);
- ctx.collect(data);
- buffer.clear();
- }
- }
- } catch (Exception e) {
- logger.error(e.toString(), e);
- }
- }
-
- @Override
- public void cancel() {
- IoUtil.close(channel);
- deregisterInstance(serverIp, currentPort, consulConfig);
- }
-
- private void registerInstance(String ip, int port, NacosConfig nacosConfig, ConsulConfig consulConfig) throws NacosException {
- ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- if (registerMode == 0) {
- Properties properties = new Properties();
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosConfig.getServerAddr());
- properties.setProperty(PropertyKeyConst.USERNAME, nacosConfig.getUserName());
- properties.setProperty(PropertyKeyConst.PASSWORD, nacosConfig.getPassword());
- properties.setProperty(PropertyKeyConst.NAMESPACE, nacosConfig.getNamespace());
- namingService = NamingFactory.createNamingService(properties);
- Instance instance = new Instance();
- instance.setIp(ip);
- instance.setPort(port);
- instance.setEphemeral(true);
- namingService.registerInstance(serviceName, instance);
- executorService.scheduleAtFixedRate(() -> {
- try {
- namingService.registerInstance(serviceName, instance);
- } catch (NacosException e) {
- logger.error("nacos check instance healthy error.", e);
- }
- }, 0, healthCheckInterval, TimeUnit.SECONDS);
- } else if (registerMode == 1) {
- consulClient = new ConsulClient(consulConfig.getServerAddr(), consulConfig.getServerPort());
- NewService newService = new NewService();
- newService.setName(serviceName);
- newService.setId(ip + ":" + port);
- newService.setAddress(ip);
- newService.setPort(port);
- consulClient.agentServiceRegister(newService, consulConfig.getToken());
- NewCheck newCheck = new NewCheck();
- newCheck.setTtl(healthCheckInterval + "s");
- newCheck.setName(ip + ":" + port);
- newCheck.setId(ip + ":" + port);
- newCheck.setServiceId(ip + ":" + port);
- newCheck.setDeregisterCriticalServiceAfter("60s");
- consulClient.agentCheckRegister(newCheck);
- executorService.scheduleAtFixedRate(() -> consulClient.agentCheckPass(ip + ":" + port, null, consulConfig.getToken()),
- 0, healthCheckInterval - 1, TimeUnit.SECONDS);
- }
- }
-
- private void deregisterInstance(String ip, int port, ConsulConfig consulConfig) {
- try {
- if (registerMode == 0) {
- Instance instance = new Instance();
- instance.setIp(ip);
- instance.setPort(port);
- instance.setEphemeral(false);
- namingService.deregisterInstance(serviceName, instance);
- } else if (registerMode == 1) {
- consulClient.agentServiceDeregister(ip + ":" + port, consulConfig.getToken());
- }
- } catch (Exception e) {
- logger.error(e.toString(), e);
- }
- }
-}