diff options
| author | Grant Limberg <[email protected]> | 2022-06-13 13:09:36 -0700 |
|---|---|---|
| committer | Grant Limberg <[email protected]> | 2022-06-13 13:09:36 -0700 |
| commit | c6fc3560f2029a6ebec4a60e2ab9959fe0bbf17d (patch) | |
| tree | f11b90a07d47180b82b23d24c00b7f676c0505cd /controller/PostgreSQL.cpp | |
| parent | 9ddc0327d442cae13d9e3253eea6915621b21824 (diff) | |
| parent | b4cec0b4a8acb41cc1a52f24e0d423131b5b71df (diff) | |
Merge branch 'dev' into redisrection
Diffstat (limited to 'controller/PostgreSQL.cpp')
| -rw-r--r-- | controller/PostgreSQL.cpp | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 1f52da69..76b20877 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -28,7 +28,7 @@ #include <chrono> -// #define ZT_TRACE 1 +// #define REDIS_TRACE 1 using json = nlohmann::json; @@ -783,6 +783,7 @@ void PostgreSQL::initializeMembers() std::string assignedAddresses = std::get<20>(row); config["id"] = memberId; + config["address"] = memberId; config["nwid"] = networkId; config["activeBridge"] = activeBridge.value_or(false); config["authorized"] = authorized.value_or(false); @@ -942,30 +943,31 @@ void PostgreSQL::_membersWatcher_Postgres() { void PostgreSQL::_membersWatcher_Redis() { char buf[11] = {0}; std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; + std::string lastID = "0"; fprintf(stderr, "Listening to member stream: %s\n", key.c_str()); while (_run == 1) { try { json tmp; std::unordered_map<std::string, ItemStream> result; if (_rc->clusterMode) { - _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end())); } else { - _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end())); } if (!result.empty()) { for (auto element : result) { - #ifdef ZT_TRACE + #ifdef REDIS_TRACE fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); #endif for (auto rec : element.second) { std::string id = rec.first; auto attrs = rec.second; - #ifdef ZT_TRACE + #ifdef REDIS_TRACE fprintf(stdout, "Record ID: %s\n", id.c_str()); fprintf(stdout, "attrs len: %lu\n", attrs.size()); #endif for (auto a : attrs) { - #ifdef ZT_TRACE + #ifdef REDIS_TRACE fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); #endif try { @@ -987,6 +989,7 @@ void PostgreSQL::_membersWatcher_Redis() { } else { _redis->xdel(key, id); } + lastID = id; } } } @@ -1029,31 +1032,31 @@ void PostgreSQL::_networksWatcher_Postgres() { void PostgreSQL::_networksWatcher_Redis() { char buf[11] = {0}; std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}"; - + std::string lastID = "0"; while (_run == 1) { try { json tmp; std::unordered_map<std::string, ItemStream> result; if (_rc->clusterMode) { - _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end())); } else { - _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end())); } if (!result.empty()) { for (auto element : result) { -#ifdef ZT_TRACE +#ifdef REDIS_TRACE fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); #endif for (auto rec : element.second) { std::string id = rec.first; auto attrs = rec.second; -#ifdef ZT_TRACE +#ifdef REDIS_TRACE fprintf(stdout, "Record ID: %s\n", id.c_str()); fprintf(stdout, "attrs len: %lu\n", attrs.size()); #endif for (auto a : attrs) { -#ifdef ZT_TRACE +#ifdef REDIS_TRACE fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); #endif try { @@ -1075,6 +1078,7 @@ void PostgreSQL::_networksWatcher_Redis() { } else { _redis->xdel(key, id); } + lastID = id; } } } |
