diff options
| author | houjinchuan <[email protected]> | 2023-12-07 10:49:38 +0800 |
|---|---|---|
| committer | houjinchuan <[email protected]> | 2023-12-07 10:49:38 +0800 |
| commit | 6d89848e5cdea7d5c95c6ccdb3acaedd553bbfd3 (patch) | |
| tree | 83bf57130b318ffa80ff56cd4180d681f4b6ff81 /groot-connectors | |
| parent | 12265c10d1d981f34530f9f17f0d501c5b15ce30 (diff) | |
[improve][bootstrap] 优化ipfix connector,修改nacos和consul配置获取方式,修改udp通信模式
Diffstat (limited to 'groot-connectors')
7 files changed, 310 insertions, 238 deletions
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/util/IPFIXUtils.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFIXUtils.java index 81dff59..50569f3 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/util/IPFIXUtils.java +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFIXUtils.java @@ -1,7 +1,6 @@ -package com.geedgenetworks.connectors.ipfix.collector.util; +package com.geedgenetworks.connectors.ipfix.collector; import cn.hutool.json.JSONArray; -import com.geedgenetworks.connectors.ipfix.collector.IPFixSourceProvider; import com.geedgenetworks.core.pojo.ipfix.Record; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; @@ -12,8 +11,6 @@ import scala.xml.Node; import java.io.File; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; public class IPFIXUtils { private static final Log logger = LogFactory.get(); @@ -42,18 +39,4 @@ public class IPFIXUtils { } return nodes; } - - public static Map<String, Object> dataConverter(Map<String, Object> extractedFields, Map<String, IPFixSourceProvider.ValueConverter> filedConverters) throws Exception { - Map<String, Object> newExtractedFields = new HashMap<>(); - if (filedConverters != null && filedConverters.size() > 0) { - for (String field : filedConverters.keySet()) { - if (extractedFields.containsKey(field)) { - newExtractedFields.put(field, filedConverters.get(field).convert(extractedFields.get(field))); - } - } - return newExtractedFields; - } else { - return extractedFields; - } - } } diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptions.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptions.java index d084f3e..d1d5594 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptions.java +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptions.java @@ -11,12 +11,6 @@ public class IPFixConnectorOptions { .noDefaultValue() .withDescription("Initial port of the IPFix server."); - public static final ConfigOption<String> SERVICE_DISCOVERY_SERVICE_NAME = - ConfigOptions.key("service.discovery.service.name") - .stringType() - .noDefaultValue() - .withDescription("The service name used for service discovery."); - public static final ConfigOption<Integer> PORT_NUMBER = ConfigOptions.key("port.number") .intType() @@ -38,16 +32,62 @@ public class IPFixConnectorOptions { .intType() .defaultValue(104857600); - public static final ConfigOption<Integer> SERVICE_DISCOVERY_REGISTER_MODE = - ConfigOptions.key("service.discovery.register.mode") + public static final ConfigOption<Integer> SERVICE_DISCOVERY_REGISTRY_MODE = + ConfigOptions.key("service.discovery.registry.mode") .intType() .defaultValue(0) .withDescription("Component used by service discovery."); + public static final ConfigOption<String> SERVICE_DISCOVERY_SERVICE_NAME = + ConfigOptions.key("service.discovery.service.name") + .stringType() + .defaultValue("udp_ipfix") + .withDescription("The service name used for service discovery."); + public static final ConfigOption<Integer> SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL = ConfigOptions.key("service.discovery.health.check.interval") .intType() .defaultValue(5) .withDescription("Required Kafka server connection string"); + public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_SERVER_ADDR = + ConfigOptions.key("service.discovery.nacos.server.addr") + .stringType() + .noDefaultValue(); + + public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_USERNAME = + ConfigOptions.key("service.discovery.nacos.username") + .stringType() + .defaultValue(""); + + public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_PASSWORD = + ConfigOptions.key("service.discovery.nacos.password") + .stringType() + .defaultValue(""); + + public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_NAMESPACE = + ConfigOptions.key("service.discovery.nacos.namespace") + .stringType() + .defaultValue("public"); + + public static final ConfigOption<String> SERVICE_DISCOVERY_NACOS_GROUP = + ConfigOptions.key("service.discovery.nacos.group") + .stringType() + .defaultValue("DEFAULT_GROUP"); + + public static final ConfigOption<String> SERVICE_DISCOVERY_CONSUL_SERVER_ADDR = + ConfigOptions.key("service.discovery.consul.server.addr") + .stringType() + .noDefaultValue(); + + public static final ConfigOption<Integer> SERVICE_DISCOVERY_CONSUL_SERVER_PORT = + ConfigOptions.key("service.discovery.consul.server.port") + .intType() + .noDefaultValue(); + + public static final ConfigOption<String> SERVICE_DISCOVERY_CONSUL_TOKEN = + ConfigOptions.key("service.discovery.consul.token") + .stringType() + .defaultValue(""); + } diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptionsUtil.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptionsUtil.java new file mode 100644 index 0000000..53778b0 --- /dev/null +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixConnectorOptionsUtil.java @@ -0,0 +1,36 @@ +package com.geedgenetworks.connectors.ipfix.collector; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Map; +import java.util.Properties; + +public class IPFixConnectorOptionsUtil { + + public static final String PROPERTIES_PREFIX = "service."; + + public static Properties getServiceDiscoveryProperties(Map<String, String> tableOptions) { + final Properties serviceDiscoveryProperties = new Properties(); + + if (hasServiceDiscoveryProperties(tableOptions)) { + tableOptions.keySet().stream() + .filter(key -> key.startsWith(PROPERTIES_PREFIX)) + .forEach( + key -> { + final String value = tableOptions.get(key); + if (!StringUtils.isBlank(value)) { + serviceDiscoveryProperties.put(key, value); + } + }); + } + return serviceDiscoveryProperties; + } + + /** + * Decides if the table options contains Kafka client properties that start with prefix + * 'properties'. + */ + private static boolean hasServiceDiscoveryProperties(Map<String, String> tableOptions) { + return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); + } +} diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java index 34c31fd..e014d87 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java @@ -2,16 +2,10 @@ package com.geedgenetworks.connectors.ipfix.collector; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.alibaba.fastjson2.JSON; -import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.EngineConfig; -import com.geedgenetworks.connectors.ipfix.collector.source.UDPSource; -import com.geedgenetworks.connectors.ipfix.collector.util.IPFIXUtils; import com.geedgenetworks.core.connector.source.SourceProvider; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.types.*; import com.geedgenetworks.core.types.DataType; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.datastream.DataStream; @@ -20,75 +14,70 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.cert.netsa.io.ipfix.*; import scala.collection.Iterator; -import scala.collection.concurrent.TrieMap; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; +import static com.geedgenetworks.connectors.ipfix.collector.IPFixConnectorOptionsUtil.getServiceDiscoveryProperties; + public class IPFixSourceProvider implements SourceProvider { private static final Log logger = LogFactory.get(); private final StructType physicalDataType; private final ReadableConfig config; + private final Map<String, String> options; private final Map<String, ValueConverter> filedConverters; private static SessionGroup sessionGroup; - public IPFixSourceProvider(StructType physicalDataType, ReadableConfig config, SessionGroup sessionGroup) { + public IPFixSourceProvider(StructType physicalDataType, ReadableConfig config, Map<String, String> options, SessionGroup sessionGroup) { this.physicalDataType = physicalDataType; this.config = config; + this.options = options; IPFixSourceProvider.sessionGroup = sessionGroup; this.filedConverters = physicalDataType != null ? Arrays.stream(physicalDataType.fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType))) : null; } @Override public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) { - ExecutionConfig.GlobalJobParameters executionConfig = env.getConfig().getGlobalJobParameters(); - EngineConfig engineConfig = JSON.parseObject(executionConfig.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class); + Properties serviceDiscoveryProperties = getServiceDiscoveryProperties(options); UDPSource udpSource = new UDPSource(config.get(IPFixConnectorOptions.INITIAL_PORT), config.get(IPFixConnectorOptions.MAX_ATTEMPTS), config.get(IPFixConnectorOptions.BUFFER_SIZE), config.get(IPFixConnectorOptions.RECEIVE_BUFFER), - config.get(IPFixConnectorOptions.SERVICE_DISCOVERY_SERVICE_NAME), - config.get(IPFixConnectorOptions.SERVICE_DISCOVERY_REGISTER_MODE), - config.get(IPFixConnectorOptions.SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL), - engineConfig.getNacosConfig(), - engineConfig.getConsulConfig()); - DataStream<byte[]> source = env.addSource(udpSource).setParallelism(config.getOptional(IPFixConnectorOptions.PORT_NUMBER).get()); + serviceDiscoveryProperties); + DataStream<byte[]> source = env.addSource(udpSource).setParallelism(config.get(IPFixConnectorOptions.PORT_NUMBER)); return source.flatMap(new FlatMapFunction<byte[], Event>() { @Override public void flatMap(byte[] data, Collector<Event> out) { try { ByteBuffer buffer = ByteBuffer.wrap(data); RecordReader reader = new RecordReader(new ByteBufferMessageReader(buffer, sessionGroup)); - if (reader.isEmpty()) { - Iterator<Session> iterator = sessionGroup.iterator(); - while (iterator.hasNext()) { - Session next = iterator.next(); - logger.info("domain: " + next.observationDomain()); - TrieMap<Object, Template> objectTemplateTrieMap = next.idGetTmpl(); - logger.info("Template: " + objectTemplateTrieMap.toString()); - } - } else { - while (reader.hasNext()) { - Event event = new Event(); - Record record = reader.next(); - Iterator<Field> fields = record.fields(); - Map<String, Object> fieldMap = new HashMap<>(); - while (fields.hasNext()) { - Field field = fields.next(); - fieldMap.put(field.ie().name(), field.value()); - } - fieldMap.put(Event.INTERNAL_TIMESTAMP_KEY, record.exportTime().get().getEpochSecond()); + while (reader.hasNext()) { + Event event = new Event(); + Record record = reader.next(); + Iterator<Field> fields = record.fields(); + Map<String, Object> fieldMap = new HashMap<>(); + while (fields.hasNext()) { + Field field = fields.next(); + String fieldName = field.ie().name(); if (filedConverters != null) { - fieldMap = IPFIXUtils.dataConverter(fieldMap, filedConverters); + ValueConverter valueConverter = filedConverters.get(field.ie().name()); + if (valueConverter != null) { + fieldMap.put(fieldName, valueConverter.convert(field.value())); + } + } else { + fieldMap.put(fieldName, field.value()); } - event.setExtractedFields(fieldMap); - event.setFinal(true); - out.collect(event); } + fieldMap.put(Event.INTERNAL_TIMESTAMP_KEY, record.exportTime().get().getEpochSecond() * 1000); + event.setExtractedFields(fieldMap); + event.setFinal(true); + out.collect(event); } + } catch (NoTemplateException e) { + logger.error(e.getMessage()+". The client did not send the template, or there was a delay in processing the template message." ); } catch (Exception e) { logger.error("IPFix parsing failed.", e); } diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java index 59f5504..056f612 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java @@ -1,6 +1,5 @@ package com.geedgenetworks.connectors.ipfix.collector; -import com.geedgenetworks.connectors.ipfix.collector.util.IPFIXUtils; import com.geedgenetworks.core.connector.source.SourceProvider; import com.geedgenetworks.core.factories.FactoryUtil; import com.geedgenetworks.core.factories.SourceTableFactory; @@ -13,8 +12,10 @@ import scala.xml.Node; import java.util.ArrayList; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import static com.geedgenetworks.connectors.ipfix.collector.IPFixConnectorOptionsUtil.PROPERTIES_PREFIX; public class IPFixTableFactory implements SourceTableFactory { public static final String IDENTIFIER = "ipfix"; @@ -22,16 +23,17 @@ public class IPFixTableFactory implements SourceTableFactory { @Override public SourceProvider getSourceProvider(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - helper.validate(); + helper.validateExcept(PROPERTIES_PREFIX); StructType physicalDataType = context.getPhysicalDataType(); ReadableConfig config = context.getConfiguration(); + Map<String, String> options = context.getOptions(); InfoModel model = InfoModel.getStandardInfoModel(true); SessionGroup sessionGroup = new SessionGroup(model, null, false); ArrayList<Node> elements = IPFIXUtils.getElements(); for (Node element : elements) { model.importElements(element); } - return new IPFixSourceProvider(physicalDataType, config, sessionGroup); + return new IPFixSourceProvider(physicalDataType, config, options, sessionGroup); } @Override @@ -43,7 +45,6 @@ public class IPFixTableFactory implements SourceTableFactory { public Set<ConfigOption<?>> requiredOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(IPFixConnectorOptions.INITIAL_PORT); - options.add(IPFixConnectorOptions.SERVICE_DISCOVERY_SERVICE_NAME); return options; } @@ -54,8 +55,6 @@ public class IPFixTableFactory implements SourceTableFactory { options.add(IPFixConnectorOptions.MAX_ATTEMPTS); options.add(IPFixConnectorOptions.PORT_NUMBER); options.add(IPFixConnectorOptions.RECEIVE_BUFFER); - options.add(IPFixConnectorOptions.SERVICE_DISCOVERY_REGISTER_MODE); - options.add(IPFixConnectorOptions.SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL); return options; } } diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/UDPSource.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/UDPSource.java new file mode 100644 index 0000000..c9ee498 --- /dev/null +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/UDPSource.java @@ -0,0 +1,192 @@ +package com.geedgenetworks.connectors.ipfix.collector; + +import cn.hutool.core.io.IoUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingFactory; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.agent.model.NewCheck; +import com.ecwid.consul.v1.agent.model.NewService; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class UDPSource extends RichParallelSourceFunction<byte[]> { + private static final Log logger = LogFactory.get(); + + private static ScheduledExecutorService executorService; + private final int initialPort; + private final int maxAttempts; + private final int bufferSize; + private final int receiveBuffer; + private final String serviceName; + private int currentPort; + private String serverIp; + private final int registryMode; + private final Integer healthCheckInterval; + private final String nacosServerAddr; + private final String nacosUsername; + private final String nacosPassword; + private final String nacosNamespace; + private final String nacosGroup; + private final String consulServerAddr; + private final Integer consulServerPort; + private final String consulToken; + private transient DatagramChannel channel; + private transient NamingService namingService; + private transient ConsulClient consulClient; + private transient boolean stop; + + public UDPSource(int initialPort, int maxAttempts, int bufferSize, int receiveBuffer, Properties serviceDiscoveryProperties) { + this.initialPort = initialPort; + this.bufferSize = bufferSize; + this.maxAttempts = maxAttempts; + this.receiveBuffer = receiveBuffer; + this.registryMode = Integer.parseInt(String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_REGISTRY_MODE.key()))); + this.serviceName = registryMode == 0 || registryMode == 1 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_SERVICE_NAME.key())) : null; + this.healthCheckInterval = registryMode == 0 || registryMode == 1 ? Integer.parseInt(String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL.key()))) : null; + this.nacosServerAddr = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_SERVER_ADDR.key())) : null; + this.nacosUsername = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_USERNAME.key())) : null; + this.nacosPassword = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_PASSWORD.key())) : null; + this.nacosNamespace = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_NAMESPACE.key())) : null; + this.nacosGroup = registryMode == 0 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_NACOS_GROUP.key())) : null; + this.consulServerAddr = registryMode == 1 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_CONSUL_SERVER_ADDR.key())) : null; + this.consulServerPort = registryMode == 1 ? Integer.parseInt(String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_CONSUL_SERVER_PORT.key()))) : null; + this.consulToken = registryMode == 1 ? String.valueOf(serviceDiscoveryProperties.get(IPFixConnectorOptions.SERVICE_DISCOVERY_CONSUL_TOKEN.key())) : null; + } + + @Override + public void run(SourceContext<byte[]> ctx) { + try { + executorService = Executors.newScheduledThreadPool(maxAttempts); + channel = DatagramChannel.open(); + channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBuffer); + channel.configureBlocking(false); + Selector selector = Selector.open(); + currentPort = initialPort; + int attempt = 1; + while (attempt <= maxAttempts) { + try { + channel.bind(new InetSocketAddress(currentPort)); + channel.register(selector, SelectionKey.OP_READ); + serverIp = InetAddress.getLocalHost().getHostAddress(); + registerInstance(serverIp, currentPort); + break; + } catch (IOException e) { + if (e.getMessage().contains("Address already in use")) { + logger.info("端口 " + currentPort + " 已被占用"); + currentPort++; + attempt++; + } else { + logger.error(e.toString(), e); + channel = null; + break; + } + } catch (NacosException e) { + logger.error(e.toString(), e); + channel = null; + } + } + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + while (!stop) { + selector.select(); + Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); + while (keyIterator.hasNext()) { + SelectionKey selectionKey = keyIterator.next(); + if (selectionKey.isReadable()) { + DatagramChannel currentDatagramChannel = (DatagramChannel) selectionKey.channel(); + currentDatagramChannel.receive(buffer); + buffer.flip(); + byte[] data = new byte[buffer.remaining()]; + buffer.get(data); + ctx.collect(data); + buffer.clear(); + } + keyIterator.remove(); + } + } + } catch (Exception e) { + logger.error(e.toString(), e); + } + } + + @Override + public void cancel() { + stop = true; + executorService.shutdownNow(); + IoUtil.close(channel); + deregisterInstance(serverIp, currentPort); + } + + private void registerInstance(String ip, int port) throws NacosException { + if (registryMode == 0) { + Properties properties = new Properties(); + properties.put(PropertyKeyConst.SERVER_ADDR, nacosServerAddr); + properties.put(PropertyKeyConst.USERNAME, nacosUsername); + properties.put(PropertyKeyConst.PASSWORD, nacosPassword); + properties.put(PropertyKeyConst.NAMESPACE, nacosNamespace); + namingService = NamingFactory.createNamingService(properties); + Instance instance = new Instance(); + instance.setIp(ip); + instance.setPort(port); + instance.setEphemeral(true); + namingService.registerInstance(serviceName, nacosGroup, instance); + executorService.scheduleAtFixedRate(() -> { + try { + namingService.registerInstance(serviceName, nacosGroup, instance); + } catch (NacosException e) { + logger.error("nacos check instance healthy error.", e); + } + }, 0, healthCheckInterval, TimeUnit.SECONDS); + } else if (registryMode == 1) { + consulClient = new ConsulClient(consulServerAddr, consulServerPort); + NewService newService = new NewService(); + newService.setName(serviceName); + newService.setId(ip + ":" + port); + newService.setAddress(ip); + newService.setPort(port); + consulClient.agentServiceRegister(newService, consulToken); + NewCheck newCheck = new NewCheck(); + newCheck.setTtl(healthCheckInterval + "s"); + newCheck.setName(ip + ":" + port); + newCheck.setId(ip + ":" + port); + newCheck.setServiceId(ip + ":" + port); + newCheck.setDeregisterCriticalServiceAfter("60s"); + consulClient.agentCheckRegister(newCheck); + executorService.scheduleAtFixedRate(() -> consulClient.agentCheckPass(ip + ":" + port, null, consulToken), + 0, healthCheckInterval - 1, TimeUnit.SECONDS); + } + } + + private void deregisterInstance(String ip, int port) { + try { + if (registryMode == 0) { + Instance instance = new Instance(); + instance.setIp(ip); + instance.setPort(port); + instance.setEphemeral(true); + namingService.deregisterInstance(serviceName, nacosGroup, instance); + } else if (registryMode == 1) { + consulClient.agentServiceDeregister(ip + ":" + port, consulToken); + } + } catch (Exception e) { + logger.error(e.toString(), e); + } + } +} diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/source/UDPSource.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/source/UDPSource.java deleted file mode 100644 index 7329af2..0000000 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/source/UDPSource.java +++ /dev/null @@ -1,167 +0,0 @@ -package com.geedgenetworks.connectors.ipfix.collector.source; - -import cn.hutool.core.io.IoUtil; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.naming.NamingFactory; -import com.alibaba.nacos.api.naming.NamingService; -import com.alibaba.nacos.api.naming.pojo.Instance; -import com.ecwid.consul.v1.ConsulClient; -import com.ecwid.consul.v1.agent.model.NewCheck; -import com.ecwid.consul.v1.agent.model.NewService; -import com.geedgenetworks.common.config.ConsulConfig; -import com.geedgenetworks.common.config.NacosConfig; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class UDPSource extends RichParallelSourceFunction<byte[]> { - private static final Log logger = LogFactory.get(); - - private final int initialPort; - private final int maxAttempts; - private final int bufferSize; - private final int receiveBuffer; - private final String serviceName; - private transient DatagramChannel channel; - private transient NamingService namingService; - private transient ConsulClient consulClient; - private int currentPort; - private String serverIp; - private int registerMode; - private int healthCheckInterval; - private NacosConfig nacosConfig; - private ConsulConfig consulConfig; - - public UDPSource(int initialPort, int maxAttempts, int bufferSize, int receiveBuffer, String serviceName, int registerMode, int healthCheckInterval, NacosConfig nacosConfig, ConsulConfig consulConfig) { - this.initialPort = initialPort; - this.bufferSize = bufferSize; - this.maxAttempts = maxAttempts; - this.receiveBuffer = receiveBuffer; - this.serviceName = serviceName; - this.registerMode = registerMode; - this.healthCheckInterval = healthCheckInterval; - this.nacosConfig = nacosConfig; - this.consulConfig = consulConfig; - } - - @Override - public void run(SourceContext<byte[]> ctx) { - try { - channel = DatagramChannel.open(); - channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBuffer); - channel.configureBlocking(false); - currentPort = initialPort; - int attempt = 1; - while (attempt <= maxAttempts) { - try { - channel.bind(new InetSocketAddress(currentPort)); - InetAddress inetAddress = InetAddress.getLocalHost(); - serverIp = inetAddress.getHostAddress(); - registerInstance(serverIp, currentPort, nacosConfig, consulConfig); - break; - } catch (IOException e) { - // 捕获端口占用异常 - if (e.getMessage().contains("Address already in use")) { - logger.info("端口 " + currentPort + " 已被占用"); - currentPort++; - attempt++; - } else { - logger.error(e.toString(), e); - channel = null; - break; - } - } catch (NacosException e) { - logger.error(e.toString(), e); - channel = null; - } - } - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - while (true) { - if (channel.receive(buffer) != null) { - buffer.flip(); - byte[] data = new byte[buffer.remaining()]; - buffer.get(data); - ctx.collect(data); - buffer.clear(); - } - } - } catch (Exception e) { - logger.error(e.toString(), e); - } - } - - @Override - public void cancel() { - IoUtil.close(channel); - deregisterInstance(serverIp, currentPort, consulConfig); - } - - private void registerInstance(String ip, int port, NacosConfig nacosConfig, ConsulConfig consulConfig) throws NacosException { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - if (registerMode == 0) { - Properties properties = new Properties(); - properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosConfig.getServerAddr()); - properties.setProperty(PropertyKeyConst.USERNAME, nacosConfig.getUserName()); - properties.setProperty(PropertyKeyConst.PASSWORD, nacosConfig.getPassword()); - properties.setProperty(PropertyKeyConst.NAMESPACE, nacosConfig.getNamespace()); - namingService = NamingFactory.createNamingService(properties); - Instance instance = new Instance(); - instance.setIp(ip); - instance.setPort(port); - instance.setEphemeral(true); - namingService.registerInstance(serviceName, instance); - executorService.scheduleAtFixedRate(() -> { - try { - namingService.registerInstance(serviceName, instance); - } catch (NacosException e) { - logger.error("nacos check instance healthy error.", e); - } - }, 0, healthCheckInterval, TimeUnit.SECONDS); - } else if (registerMode == 1) { - consulClient = new ConsulClient(consulConfig.getServerAddr(), consulConfig.getServerPort()); - NewService newService = new NewService(); - newService.setName(serviceName); - newService.setId(ip + ":" + port); - newService.setAddress(ip); - newService.setPort(port); - consulClient.agentServiceRegister(newService, consulConfig.getToken()); - NewCheck newCheck = new NewCheck(); - newCheck.setTtl(healthCheckInterval + "s"); - newCheck.setName(ip + ":" + port); - newCheck.setId(ip + ":" + port); - newCheck.setServiceId(ip + ":" + port); - newCheck.setDeregisterCriticalServiceAfter("60s"); - consulClient.agentCheckRegister(newCheck); - executorService.scheduleAtFixedRate(() -> consulClient.agentCheckPass(ip + ":" + port, null, consulConfig.getToken()), - 0, healthCheckInterval - 1, TimeUnit.SECONDS); - } - } - - private void deregisterInstance(String ip, int port, ConsulConfig consulConfig) { - try { - if (registerMode == 0) { - Instance instance = new Instance(); - instance.setIp(ip); - instance.setPort(port); - instance.setEphemeral(false); - namingService.deregisterInstance(serviceName, instance); - } else if (registerMode == 1) { - consulClient.agentServiceDeregister(ip + ":" + port, consulConfig.getToken()); - } - } catch (Exception e) { - logger.error(e.toString(), e); - } - } -} |
