diff options
| author | Joseph Henry <[email protected]> | 2023-08-15 12:59:58 -0700 |
|---|---|---|
| committer | Joseph Henry <[email protected]> | 2023-08-15 12:59:58 -0700 |
| commit | b8b5d0bff8b05d7b66439f25f1b4358f2a556f21 (patch) | |
| tree | 1778a989892447b79f44bd6647b39c27ca34c3ee /node | |
| parent | e18172d434606336946e4f4f46b518e00f3a109c (diff) | |
Add setmtu command, fix bond lifetime issue
Diffstat (limited to 'node')
| -rw-r--r-- | node/Bond.cpp | 46 | ||||
| -rw-r--r-- | node/Bond.hpp | 20 | ||||
| -rw-r--r-- | node/IncomingPacket.cpp | 28 | ||||
| -rw-r--r-- | node/Node.cpp | 9 | ||||
| -rw-r--r-- | node/Path.hpp | 8 | ||||
| -rw-r--r-- | node/Peer.cpp | 23 | ||||
| -rw-r--r-- | node/Peer.hpp | 81 |
7 files changed, 179 insertions, 36 deletions
diff --git a/node/Bond.cpp b/node/Bond.cpp index 4c43ad50..9bd9ee41 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -102,6 +102,43 @@ SharedPtr<Bond> Bond::getBondByPeerId(int64_t identity) return _bonds.count(identity) ? _bonds[identity] : SharedPtr<Bond>(); } +bool Bond::setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr) +{ + Mutex::Lock _l(_bonds_m); + std::map<int64_t, SharedPtr<Bond> >::iterator bondItr = _bonds.begin(); + bool found = false; + while (bondItr != _bonds.end()) { + if (bondItr->second->setMtuByTuple(mtu,ifStr,ipStr)) { + found = true; + } + ++bondItr; + } + return found; +} + +bool Bond::setMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr) +{ + Mutex::Lock _lp(_paths_m); + bool found = false; + for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (_paths[i].p) { + SharedPtr<Link> sl = getLink(_paths[i].p); + if (sl) { + if (sl->ifname() == ifStr) { + char ipBuf[64] = { 0 }; + _paths[i].p->address().toIpString(ipBuf); + std::string newString = std::string(ipBuf); + if (newString == ipStr) { + _paths[i].p->_mtu = mtu; + found = true; + } + } + } + } + } + return found; +} + SharedPtr<Bond> Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr<Peer>& peer) { Mutex::Lock _l(_bonds_m); @@ -162,8 +199,8 @@ void Bond::destroyBond(uint64_t peerId) auto iter = _bonds.find(peerId); if (iter != _bonds.end()) { iter->second->stopBond(); + _bonds.erase(iter); } - _bonds.erase(peerId); } void Bond::stopBond() @@ -978,7 +1015,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond) // Whether we've waited long enough since the link last came online bool satisfiedUpDelay = (now - _paths[i].lastAliveToggle) >= _upDelay; // How long since the last QoS was received (Must be less than ZT_PEER_PATH_EXPIRATION since the remote peer's _qosSendInterval isn't known) - bool acceptableQoSAge = _paths[i].lastQoSReceived == 0 || ((now - _paths[i].lastQoSReceived) < ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD); + bool acceptableQoSAge = (_paths[i].lastQoSReceived == 0 && inTrial) || ((now - _paths[i].lastQoSReceived) < ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD); currEligibility = _paths[i].allowed() && ((acceptableAge && satisfiedUpDelay && acceptableQoSAge) || inTrial); if (currEligibility) { @@ -1070,7 +1107,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond) // Bond a spare link if required (no viable primary links left) if (! foundUsablePrimaryPath) { - debug("no usable primary links remain, will attempt to use spare if available"); + // debug("no usable primary links remain, will attempt to use spare if available"); for (int j = 0; j < it->second.size(); j++) { int idx = it->second.at(j); if (! _paths[idx].p || ! _paths[idx].eligible || ! _paths[idx].allowed() || ! _paths[idx].isSpare()) { @@ -1244,7 +1281,8 @@ void Bond::estimatePathQuality(int64_t now) if (link) { int linkSpeed = link->capacity(); _paths[i].p->_givenLinkSpeed = linkSpeed; - _paths[i].p->_mtu = link->mtu(); + _paths[i].p->_mtu = link->mtu() ? link->mtu() : _paths[i].p->_mtu; + _paths[i].p->_assignedFlowCount = _paths[i].assignedFlowCount; maxObservedLinkCap = linkSpeed > maxObservedLinkCap ? linkSpeed : maxObservedLinkCap; } } diff --git a/node/Bond.hpp b/node/Bond.hpp index 81b4691b..c6347a8c 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -457,6 +457,26 @@ class Bond { static SharedPtr<Bond> getBondByPeerId(int64_t identity); /** + * Set MTU for link by given interface name and IP address (across all bonds) + * + * @param mtu MTU to be used on this link + * @param ifStr interface name to match + * @param ipStr IP address to match + * @return Whether the MTU was set + */ + static bool setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr); + + /** + * Set MTU for link by given interface name and IP address + * + * @param mtu MTU to be used on this link + * @param ifStr interface name to match + * @param ipStr IP address to match + * @return Whether the MTU was set + */ + bool setMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr); + + /** * Add a new bond to the bond controller. * * @param renv Runtime environment diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index f748bdb3..a5dd7701 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -317,8 +317,7 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,void *tPtr,const Shar bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr<Peer>& peer) { /* - SharedPtr<Bond> bond = peer->bond(); - if (! bond || ! bond->rateGateACK(RR->node->now())) { + if (! peer->rateGateACK(RR->node->now())) { return true; } int32_t ackedBytes; @@ -326,9 +325,7 @@ bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const Shar return true; // ignore } memcpy(&ackedBytes, payload(), sizeof(ackedBytes)); - if (bond) { - bond->receivedAck(_path, RR->node->now(), Utils::ntoh(ackedBytes)); - } + peer->receivedAck(_path, RR->node->now(), Utils::ntoh(ackedBytes)); */ Metrics::pkt_ack_in++; return true; @@ -338,7 +335,7 @@ bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr { Metrics::pkt_qos_in++; SharedPtr<Bond> bond = peer->bond(); - if (! bond || ! bond->rateGateQoS(RR->node->now(), _path)) { + if (! peer->rateGateQoS(RR->node->now(), _path)) { return true; } if (payloadLength() > ZT_QOS_MAX_PACKET_SIZE || payloadLength() < ZT_QOS_MIN_PACKET_SIZE) { @@ -359,9 +356,7 @@ bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr ptr += sizeof(uint16_t); count++; } - if (bond) { - bond->receivedQoS(_path, now, count, rx_id, rx_ts); - } + peer->receivedQoS(_path, now, count, rx_id, rx_ts); return true; } @@ -626,10 +621,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedP } if (!hops()) { - SharedPtr<Bond> bond = peer->bond(); - if (!bond) { - _path->updateLatency((unsigned int)latency,RR->node->now()); - } + _path->updateLatency((unsigned int)latency,RR->node->now()); } peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision); @@ -801,8 +793,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar { Metrics::pkt_frame_in++; int32_t _flowId = ZT_QOS_NO_FLOW; - SharedPtr<Bond> bond = peer->bond(); - if (bond && bond->flowHashingSupported()) { + if (peer->flowHashingSupported()) { if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) { const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE); const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD; @@ -1481,8 +1472,7 @@ bool IncomingPacket::_doPATH_NEGOTIATION_REQUEST(const RuntimeEnvironment *RR,vo { Metrics::pkt_path_negotiation_request_in++; uint64_t now = RR->node->now(); - SharedPtr<Bond> bond = peer->bond(); - if (!bond || !bond->rateGatePathNegotiation(now, _path)) { + if (!peer->rateGatePathNegotiation(now, _path)) { return true; } if (payloadLength() != sizeof(int16_t)) { @@ -1490,9 +1480,7 @@ bool IncomingPacket::_doPATH_NEGOTIATION_REQUEST(const RuntimeEnvironment *RR,vo } int16_t remoteUtility = 0; memcpy(&remoteUtility, payload(), sizeof(int16_t)); - if (peer->bond()) { - peer->bond()->processIncomingPathNegotiationRequest(now, _path, Utils::ntoh(remoteUtility)); - } + peer->processIncomingPathNegotiationRequest(now, _path, Utils::ntoh(remoteUtility)); return true; } diff --git a/node/Node.cpp b/node/Node.cpp index 0657cbd0..4913d1a4 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -589,6 +589,7 @@ ZT_PeerList *Node::peers() const p->paths[p->pathCount].latencyVariance = (*path)->latencyVariance(); p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio(); p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio(); + p->paths[p->pathCount].assignedFlowCount = (*path)->assignedFlowCount(); p->paths[p->pathCount].relativeQuality = (*path)->relativeQuality(); p->paths[p->pathCount].linkSpeed = (*path)->givenLinkSpeed(); p->paths[p->pathCount].bonded = (*path)->bonded(); @@ -602,9 +603,9 @@ ZT_PeerList *Node::peers() const } if (pi->second->bond()) { p->isBonded = pi->second->bond(); - p->bondingPolicy = pi->second->bond()->policy(); - p->numAliveLinks = pi->second->bond()->getNumAliveLinks(); - p->numTotalLinks = pi->second->bond()->getNumTotalLinks(); + p->bondingPolicy = pi->second->bondingPolicy(); + p->numAliveLinks = pi->second->getNumAliveLinks(); + p->numTotalLinks = pi->second->getNumTotalLinks(); } } @@ -851,7 +852,7 @@ void Node::ncSendError(uint64_t nwid,uint64_t requestPacketId,const Address &des case NetworkController::NC_ERROR_AUTHENTICATION_REQUIRED: { //fprintf(stderr, "\n\nGot auth required\n\n"); break; - } + } default: break; diff --git a/node/Path.hpp b/node/Path.hpp index 31d8e60d..b7694920 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -89,6 +89,7 @@ public: _latencyVariance(0.0), _packetLossRatio(0.0), _packetErrorRatio(0.0), + _assignedFlowCount(0), _valid(true), _eligible(false), _bonded(false), @@ -110,6 +111,7 @@ public: _latencyVariance(0.0), _packetLossRatio(0.0), _packetErrorRatio(0.0), + _assignedFlowCount(0), _valid(true), _eligible(false), _bonded(false), @@ -321,6 +323,11 @@ public: inline float packetErrorRatio() const { return _packetErrorRatio; } /** + * @return Number of flows assigned to this path + */ + inline unsigned int assignedFlowCount() const { return _assignedFlowCount; } + + /** * @return Whether this path is valid as reported by the bonding layer. The bonding layer * actually checks with Phy to see if the interface is still up */ @@ -374,6 +381,7 @@ private: volatile float _latencyVariance; volatile float _packetLossRatio; volatile float _packetErrorRatio; + volatile uint16_t _assignedFlowCount; volatile bool _valid; volatile bool _eligible; volatile bool _bonded; diff --git a/node/Peer.cpp b/node/Peer.cpp index d7f543ea..2040a3b4 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -28,7 +28,7 @@ namespace ZeroTier { static unsigned char s_freeRandomByteCounter = 0; -Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) +Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) : RR(renv) , _lastReceive(0) , _lastNontrivialReceive(0) @@ -487,20 +487,29 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now) void Peer::performMultipathStateCheck(void *tPtr, int64_t now) { Mutex::Lock _l(_bond_m); - if (_bond) { - // Once enabled the Bond object persists, no need to update state - return; - } /** * Check for conditions required for multipath bonding and create a bond * if allowed. */ int numAlivePaths = 0; + bool atLeastOneNonExpired = false; for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { - if (_paths[i].p && _paths[i].p->alive(now)) { - numAlivePaths++; + if (_paths[i].p) { + if(_paths[i].p->alive(now)) { + numAlivePaths++; + } + if ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) { + atLeastOneNonExpired = true; + } } } + if (_bond) { + if (numAlivePaths == 0 && !atLeastOneNonExpired) { + _bond = SharedPtr<Bond>(); + RR->bc->destroyBond(_id.address().toInt()); + } + return; + } _localMultipathSupported = ((numAlivePaths >= 1) && (RR->bc->inUse()) && (ZT_PROTO_VERSION > 9)); if (_localMultipathSupported && !_bond) { if (RR->bc) { diff --git a/node/Peer.hpp b/node/Peer.hpp index d03e8f88..e29975b1 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -56,7 +56,6 @@ private: public: ~Peer() { Utils::burn(_key,sizeof(_key)); - RR->bc->destroyBond(_id.address().toInt()); } /** @@ -435,6 +434,64 @@ public: } /** + * See definition in Bond + */ + inline bool rateGateQoS(int64_t now, SharedPtr<Path>& path) + { + Mutex::Lock _l(_bond_m); + if(_bond) { + return _bond->rateGateQoS(now, path); + } + return false; // Default behavior. If there is no bond, we drop these + } + + /** + * See definition in Bond + */ + void receivedQoS(const SharedPtr<Path>& path, int64_t now, int count, uint64_t* rx_id, uint16_t* rx_ts) + { + Mutex::Lock _l(_bond_m); + if(_bond) { + _bond->receivedQoS(path, now, count, rx_id, rx_ts); + } + } + + /** + * See definition in Bond + */ + void processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>& path, int16_t remoteUtility) + { + Mutex::Lock _l(_bond_m); + if(_bond) { + _bond->processIncomingPathNegotiationRequest(now, path, remoteUtility); + } + } + + /** + * See definition in Bond + */ + inline bool rateGatePathNegotiation(int64_t now, SharedPtr<Path>& path) + { + Mutex::Lock _l(_bond_m); + if(_bond) { + return _bond->rateGatePathNegotiation(now, path); + } + return false; // Default behavior. If there is no bond, we drop these + } + + /** + * See definition in Bond + */ + bool flowHashingSupported() + { + Mutex::Lock _l(_bond_m); + if(_bond) { + return _bond->flowHashingSupported(); + } + return false; + } + + /** * Serialize a peer for storage in local cache * * This does not serialize everything, just non-ephemeral information. @@ -533,6 +590,28 @@ public: return ZT_BOND_POLICY_NONE; } + /** + * @return the number of links in this bond which are considered alive + */ + inline uint8_t getNumAliveLinks() { + Mutex::Lock _l(_paths_m); + if (_bond) { + return _bond->getNumAliveLinks(); + } + return 0; + } + + /** + * @return the number of links in this bond + */ + inline uint8_t getNumTotalLinks() { + Mutex::Lock _l(_paths_m); + if (_bond) { + return _bond->getNumTotalLinks(); + } + return 0; + } + //inline const AES *aesKeysIfSupported() const //{ return (const AES *)0; } |
