summaryrefslogtreecommitdiff
path: root/controller/PostgreSQL.cpp
diff options
context:
space:
mode:
authorGrant Limberg <[email protected]>2022-05-10 08:36:39 -0700
committerGrant Limberg <[email protected]>2022-05-10 08:36:39 -0700
commit9ddc0327d442cae13d9e3253eea6915621b21824 (patch)
tree2532b7fbef7dff93107c61ed9d25d0f21327256c /controller/PostgreSQL.cpp
parent5fcaed086d7216e87ba2d39643a9fcffc52b346c (diff)
enable redis member status again
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r--controller/PostgreSQL.cpp31
1 files changed, 24 insertions, 7 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index c6623d55..1f52da69 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -170,6 +170,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
, _rc(rc)
, _redis(NULL)
, _cluster(NULL)
+ , _redisMemberStatus(false)
{
char myAddress[64];
_myAddressStr = myId.address().toString(myAddress);
@@ -189,6 +190,11 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
// it will be padded at the end with zeroes. If longer, it'll be truncated.
Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk));
}
+ const char *redisMemberStatus = getenv("ZT_REDIS_MEMBER_STATUS");
+ if (redisMemberStatus) {
+ _redisMemberStatus = true;
+ fprintf(stderr, "Using redis for member status\n");
+ }
auto c = _pool->borrow();
pqxx::work txn{*c->c};
@@ -1390,8 +1396,12 @@ void PostgreSQL::commitThread()
void PostgreSQL::onlineNotificationThread()
{
- waitForReady();
- onlineNotification_Postgres();
+ waitForReady();
+ if (_redisMemberStatus) {
+ onlineNotification_Redis();
+ } else {
+ onlineNotification_Postgres();
+ }
}
void PostgreSQL::onlineNotification_Postgres()
@@ -1511,9 +1521,7 @@ void PostgreSQL::onlineNotification_Redis()
}
}
} catch (sw::redis::Error &e) {
-#ifdef ZT_TRACE
fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
-#endif
}
std::this_thread::sleep_for(std::chrono::seconds(10));
}
@@ -1524,6 +1532,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
{
nlohmann::json jtmp1, jtmp2;
+ uint64_t count = 0;
for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
uint64_t nwid_i = i->first.first;
uint64_t memberid_i = i->first.second;
@@ -1555,14 +1564,21 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
.zadd("active-networks:{"+controllerId+"}", networkId, ts)
.sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId)
.hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end());
+ ++count;
}
// expire records from all-nodes and network-nodes member list
uint64_t expireOld = OSUtils::now() - 300000;
- tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
- tx.zremrangebyscore("nodes-online2:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
- tx.zremrangebyscore("active-networks:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
+ tx.zremrangebyscore("nodes-online:{"+controllerId+"}",
+ sw::redis::RightBoundedInterval<double>(expireOld,
+ sw::redis::BoundType::LEFT_OPEN));
+ tx.zremrangebyscore("nodes-online2:{"+controllerId+"}",
+ sw::redis::RightBoundedInterval<double>(expireOld,
+ sw::redis::BoundType::LEFT_OPEN));
+ tx.zremrangebyscore("active-networks:{"+controllerId+"}",
+ sw::redis::RightBoundedInterval<double>(expireOld,
+ sw::redis::BoundType::LEFT_OPEN));
{
std::lock_guard<std::mutex> l(_networks_l);
for (const auto &it : _networks) {
@@ -1574,6 +1590,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
}
}
tx.exec();
+ fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), count);
}