summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
authorJoseph Henry <[email protected]>2023-08-15 12:59:58 -0700
committerJoseph Henry <[email protected]>2023-08-15 12:59:58 -0700
commitb8b5d0bff8b05d7b66439f25f1b4358f2a556f21 (patch)
tree1778a989892447b79f44bd6647b39c27ca34c3ee /node
parente18172d434606336946e4f4f46b518e00f3a109c (diff)
Add setmtu command, fix bond lifetime issue
Diffstat (limited to 'node')
-rw-r--r--node/Bond.cpp46
-rw-r--r--node/Bond.hpp20
-rw-r--r--node/IncomingPacket.cpp28
-rw-r--r--node/Node.cpp9
-rw-r--r--node/Path.hpp8
-rw-r--r--node/Peer.cpp23
-rw-r--r--node/Peer.hpp81
7 files changed, 179 insertions, 36 deletions
diff --git a/node/Bond.cpp b/node/Bond.cpp
index 4c43ad50..9bd9ee41 100644
--- a/node/Bond.cpp
+++ b/node/Bond.cpp
@@ -102,6 +102,43 @@ SharedPtr<Bond> Bond::getBondByPeerId(int64_t identity)
return _bonds.count(identity) ? _bonds[identity] : SharedPtr<Bond>();
}
+bool Bond::setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr)
+{
+ Mutex::Lock _l(_bonds_m);
+ std::map<int64_t, SharedPtr<Bond> >::iterator bondItr = _bonds.begin();
+ bool found = false;
+ while (bondItr != _bonds.end()) {
+ if (bondItr->second->setMtuByTuple(mtu,ifStr,ipStr)) {
+ found = true;
+ }
+ ++bondItr;
+ }
+ return found;
+}
+
+bool Bond::setMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr)
+{
+ Mutex::Lock _lp(_paths_m);
+ bool found = false;
+ for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
+ if (_paths[i].p) {
+ SharedPtr<Link> sl = getLink(_paths[i].p);
+ if (sl) {
+ if (sl->ifname() == ifStr) {
+ char ipBuf[64] = { 0 };
+ _paths[i].p->address().toIpString(ipBuf);
+ std::string newString = std::string(ipBuf);
+ if (newString == ipStr) {
+ _paths[i].p->_mtu = mtu;
+ found = true;
+ }
+ }
+ }
+ }
+ }
+ return found;
+}
+
SharedPtr<Bond> Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr<Peer>& peer)
{
Mutex::Lock _l(_bonds_m);
@@ -162,8 +199,8 @@ void Bond::destroyBond(uint64_t peerId)
auto iter = _bonds.find(peerId);
if (iter != _bonds.end()) {
iter->second->stopBond();
+ _bonds.erase(iter);
}
- _bonds.erase(peerId);
}
void Bond::stopBond()
@@ -978,7 +1015,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
// Whether we've waited long enough since the link last came online
bool satisfiedUpDelay = (now - _paths[i].lastAliveToggle) >= _upDelay;
// How long since the last QoS was received (Must be less than ZT_PEER_PATH_EXPIRATION since the remote peer's _qosSendInterval isn't known)
- bool acceptableQoSAge = _paths[i].lastQoSReceived == 0 || ((now - _paths[i].lastQoSReceived) < ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD);
+ bool acceptableQoSAge = (_paths[i].lastQoSReceived == 0 && inTrial) || ((now - _paths[i].lastQoSReceived) < ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD);
currEligibility = _paths[i].allowed() && ((acceptableAge && satisfiedUpDelay && acceptableQoSAge) || inTrial);
if (currEligibility) {
@@ -1070,7 +1107,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
// Bond a spare link if required (no viable primary links left)
if (! foundUsablePrimaryPath) {
- debug("no usable primary links remain, will attempt to use spare if available");
+ // debug("no usable primary links remain, will attempt to use spare if available");
for (int j = 0; j < it->second.size(); j++) {
int idx = it->second.at(j);
if (! _paths[idx].p || ! _paths[idx].eligible || ! _paths[idx].allowed() || ! _paths[idx].isSpare()) {
@@ -1244,7 +1281,8 @@ void Bond::estimatePathQuality(int64_t now)
if (link) {
int linkSpeed = link->capacity();
_paths[i].p->_givenLinkSpeed = linkSpeed;
- _paths[i].p->_mtu = link->mtu();
+ _paths[i].p->_mtu = link->mtu() ? link->mtu() : _paths[i].p->_mtu;
+ _paths[i].p->_assignedFlowCount = _paths[i].assignedFlowCount;
maxObservedLinkCap = linkSpeed > maxObservedLinkCap ? linkSpeed : maxObservedLinkCap;
}
}
diff --git a/node/Bond.hpp b/node/Bond.hpp
index 81b4691b..c6347a8c 100644
--- a/node/Bond.hpp
+++ b/node/Bond.hpp
@@ -457,6 +457,26 @@ class Bond {
static SharedPtr<Bond> getBondByPeerId(int64_t identity);
/**
+ * Set MTU for link by given interface name and IP address (across all bonds)
+ *
+ * @param mtu MTU to be used on this link
+ * @param ifStr interface name to match
+ * @param ipStr IP address to match
+ * @return Whether the MTU was set
+ */
+ static bool setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr);
+
+ /**
+ * Set MTU for link by given interface name and IP address
+ *
+ * @param mtu MTU to be used on this link
+ * @param ifStr interface name to match
+ * @param ipStr IP address to match
+ * @return Whether the MTU was set
+ */
+ bool setMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr);
+
+ /**
* Add a new bond to the bond controller.
*
* @param renv Runtime environment
diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp
index f748bdb3..a5dd7701 100644
--- a/node/IncomingPacket.cpp
+++ b/node/IncomingPacket.cpp
@@ -317,8 +317,7 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,void *tPtr,const Shar
bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr<Peer>& peer)
{
/*
- SharedPtr<Bond> bond = peer->bond();
- if (! bond || ! bond->rateGateACK(RR->node->now())) {
+ if (! peer->rateGateACK(RR->node->now())) {
return true;
}
int32_t ackedBytes;
@@ -326,9 +325,7 @@ bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const Shar
return true; // ignore
}
memcpy(&ackedBytes, payload(), sizeof(ackedBytes));
- if (bond) {
- bond->receivedAck(_path, RR->node->now(), Utils::ntoh(ackedBytes));
- }
+ peer->receivedAck(_path, RR->node->now(), Utils::ntoh(ackedBytes));
*/
Metrics::pkt_ack_in++;
return true;
@@ -338,7 +335,7 @@ bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr
{
Metrics::pkt_qos_in++;
SharedPtr<Bond> bond = peer->bond();
- if (! bond || ! bond->rateGateQoS(RR->node->now(), _path)) {
+ if (! peer->rateGateQoS(RR->node->now(), _path)) {
return true;
}
if (payloadLength() > ZT_QOS_MAX_PACKET_SIZE || payloadLength() < ZT_QOS_MIN_PACKET_SIZE) {
@@ -359,9 +356,7 @@ bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr
ptr += sizeof(uint16_t);
count++;
}
- if (bond) {
- bond->receivedQoS(_path, now, count, rx_id, rx_ts);
- }
+ peer->receivedQoS(_path, now, count, rx_id, rx_ts);
return true;
}
@@ -626,10 +621,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedP
}
if (!hops()) {
- SharedPtr<Bond> bond = peer->bond();
- if (!bond) {
- _path->updateLatency((unsigned int)latency,RR->node->now());
- }
+ _path->updateLatency((unsigned int)latency,RR->node->now());
}
peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision);
@@ -801,8 +793,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
{
Metrics::pkt_frame_in++;
int32_t _flowId = ZT_QOS_NO_FLOW;
- SharedPtr<Bond> bond = peer->bond();
- if (bond && bond->flowHashingSupported()) {
+ if (peer->flowHashingSupported()) {
if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
@@ -1481,8 +1472,7 @@ bool IncomingPacket::_doPATH_NEGOTIATION_REQUEST(const RuntimeEnvironment *RR,vo
{
Metrics::pkt_path_negotiation_request_in++;
uint64_t now = RR->node->now();
- SharedPtr<Bond> bond = peer->bond();
- if (!bond || !bond->rateGatePathNegotiation(now, _path)) {
+ if (!peer->rateGatePathNegotiation(now, _path)) {
return true;
}
if (payloadLength() != sizeof(int16_t)) {
@@ -1490,9 +1480,7 @@ bool IncomingPacket::_doPATH_NEGOTIATION_REQUEST(const RuntimeEnvironment *RR,vo
}
int16_t remoteUtility = 0;
memcpy(&remoteUtility, payload(), sizeof(int16_t));
- if (peer->bond()) {
- peer->bond()->processIncomingPathNegotiationRequest(now, _path, Utils::ntoh(remoteUtility));
- }
+ peer->processIncomingPathNegotiationRequest(now, _path, Utils::ntoh(remoteUtility));
return true;
}
diff --git a/node/Node.cpp b/node/Node.cpp
index 0657cbd0..4913d1a4 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -589,6 +589,7 @@ ZT_PeerList *Node::peers() const
p->paths[p->pathCount].latencyVariance = (*path)->latencyVariance();
p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio();
p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio();
+ p->paths[p->pathCount].assignedFlowCount = (*path)->assignedFlowCount();
p->paths[p->pathCount].relativeQuality = (*path)->relativeQuality();
p->paths[p->pathCount].linkSpeed = (*path)->givenLinkSpeed();
p->paths[p->pathCount].bonded = (*path)->bonded();
@@ -602,9 +603,9 @@ ZT_PeerList *Node::peers() const
}
if (pi->second->bond()) {
p->isBonded = pi->second->bond();
- p->bondingPolicy = pi->second->bond()->policy();
- p->numAliveLinks = pi->second->bond()->getNumAliveLinks();
- p->numTotalLinks = pi->second->bond()->getNumTotalLinks();
+ p->bondingPolicy = pi->second->bondingPolicy();
+ p->numAliveLinks = pi->second->getNumAliveLinks();
+ p->numTotalLinks = pi->second->getNumTotalLinks();
}
}
@@ -851,7 +852,7 @@ void Node::ncSendError(uint64_t nwid,uint64_t requestPacketId,const Address &des
case NetworkController::NC_ERROR_AUTHENTICATION_REQUIRED: {
//fprintf(stderr, "\n\nGot auth required\n\n");
break;
- }
+ }
default:
break;
diff --git a/node/Path.hpp b/node/Path.hpp
index 31d8e60d..b7694920 100644
--- a/node/Path.hpp
+++ b/node/Path.hpp
@@ -89,6 +89,7 @@ public:
_latencyVariance(0.0),
_packetLossRatio(0.0),
_packetErrorRatio(0.0),
+ _assignedFlowCount(0),
_valid(true),
_eligible(false),
_bonded(false),
@@ -110,6 +111,7 @@ public:
_latencyVariance(0.0),
_packetLossRatio(0.0),
_packetErrorRatio(0.0),
+ _assignedFlowCount(0),
_valid(true),
_eligible(false),
_bonded(false),
@@ -321,6 +323,11 @@ public:
inline float packetErrorRatio() const { return _packetErrorRatio; }
/**
+ * @return Number of flows assigned to this path
+ */
+ inline unsigned int assignedFlowCount() const { return _assignedFlowCount; }
+
+ /**
* @return Whether this path is valid as reported by the bonding layer. The bonding layer
* actually checks with Phy to see if the interface is still up
*/
@@ -374,6 +381,7 @@ private:
volatile float _latencyVariance;
volatile float _packetLossRatio;
volatile float _packetErrorRatio;
+ volatile uint16_t _assignedFlowCount;
volatile bool _valid;
volatile bool _eligible;
volatile bool _bonded;
diff --git a/node/Peer.cpp b/node/Peer.cpp
index d7f543ea..2040a3b4 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -28,7 +28,7 @@ namespace ZeroTier {
static unsigned char s_freeRandomByteCounter = 0;
-Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity)
+Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity)
: RR(renv)
, _lastReceive(0)
, _lastNontrivialReceive(0)
@@ -487,20 +487,29 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now)
void Peer::performMultipathStateCheck(void *tPtr, int64_t now)
{
Mutex::Lock _l(_bond_m);
- if (_bond) {
- // Once enabled the Bond object persists, no need to update state
- return;
- }
/**
* Check for conditions required for multipath bonding and create a bond
* if allowed.
*/
int numAlivePaths = 0;
+ bool atLeastOneNonExpired = false;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
- if (_paths[i].p && _paths[i].p->alive(now)) {
- numAlivePaths++;
+ if (_paths[i].p) {
+ if(_paths[i].p->alive(now)) {
+ numAlivePaths++;
+ }
+ if ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) {
+ atLeastOneNonExpired = true;
+ }
}
}
+ if (_bond) {
+ if (numAlivePaths == 0 && !atLeastOneNonExpired) {
+ _bond = SharedPtr<Bond>();
+ RR->bc->destroyBond(_id.address().toInt());
+ }
+ return;
+ }
_localMultipathSupported = ((numAlivePaths >= 1) && (RR->bc->inUse()) && (ZT_PROTO_VERSION > 9));
if (_localMultipathSupported && !_bond) {
if (RR->bc) {
diff --git a/node/Peer.hpp b/node/Peer.hpp
index d03e8f88..e29975b1 100644
--- a/node/Peer.hpp
+++ b/node/Peer.hpp
@@ -56,7 +56,6 @@ private:
public:
~Peer() {
Utils::burn(_key,sizeof(_key));
- RR->bc->destroyBond(_id.address().toInt());
}
/**
@@ -435,6 +434,64 @@ public:
}
/**
+ * See definition in Bond
+ */
+ inline bool rateGateQoS(int64_t now, SharedPtr<Path>& path)
+ {
+ Mutex::Lock _l(_bond_m);
+ if(_bond) {
+ return _bond->rateGateQoS(now, path);
+ }
+ return false; // Default behavior. If there is no bond, we drop these
+ }
+
+ /**
+ * See definition in Bond
+ */
+ void receivedQoS(const SharedPtr<Path>& path, int64_t now, int count, uint64_t* rx_id, uint16_t* rx_ts)
+ {
+ Mutex::Lock _l(_bond_m);
+ if(_bond) {
+ _bond->receivedQoS(path, now, count, rx_id, rx_ts);
+ }
+ }
+
+ /**
+ * See definition in Bond
+ */
+ void processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>& path, int16_t remoteUtility)
+ {
+ Mutex::Lock _l(_bond_m);
+ if(_bond) {
+ _bond->processIncomingPathNegotiationRequest(now, path, remoteUtility);
+ }
+ }
+
+ /**
+ * See definition in Bond
+ */
+ inline bool rateGatePathNegotiation(int64_t now, SharedPtr<Path>& path)
+ {
+ Mutex::Lock _l(_bond_m);
+ if(_bond) {
+ return _bond->rateGatePathNegotiation(now, path);
+ }
+ return false; // Default behavior. If there is no bond, we drop these
+ }
+
+ /**
+ * See definition in Bond
+ */
+ bool flowHashingSupported()
+ {
+ Mutex::Lock _l(_bond_m);
+ if(_bond) {
+ return _bond->flowHashingSupported();
+ }
+ return false;
+ }
+
+ /**
* Serialize a peer for storage in local cache
*
* This does not serialize everything, just non-ephemeral information.
@@ -533,6 +590,28 @@ public:
return ZT_BOND_POLICY_NONE;
}
+ /**
+ * @return the number of links in this bond which are considered alive
+ */
+ inline uint8_t getNumAliveLinks() {
+ Mutex::Lock _l(_paths_m);
+ if (_bond) {
+ return _bond->getNumAliveLinks();
+ }
+ return 0;
+ }
+
+ /**
+ * @return the number of links in this bond
+ */
+ inline uint8_t getNumTotalLinks() {
+ Mutex::Lock _l(_paths_m);
+ if (_bond) {
+ return _bond->getNumTotalLinks();
+ }
+ return 0;
+ }
+
//inline const AES *aesKeysIfSupported() const
//{ return (const AES *)0; }