summaryrefslogtreecommitdiff
path: root/controller/PostgreSQL.cpp
diff options
context:
space:
mode:
authorGrant Limberg <[email protected]>2022-04-28 11:13:44 -0700
committerGrant Limberg <[email protected]>2022-04-28 11:16:45 -0700
commitff18bacd94d38c744173caacc3280220810b41c1 (patch)
tree226181558fdf93db3a48395f1ccd7c7bb0964cb7 /controller/PostgreSQL.cpp
parentcaf1de3bcf380657519b8c6761cce4c0250da278 (diff)
fix XREAD commands for redis message queue
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r--controller/PostgreSQL.cpp13
1 files changed, 8 insertions, 5 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index f83ebc9b..7aefec9f 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -937,15 +937,16 @@ void PostgreSQL::_membersWatcher_Postgres() {
void PostgreSQL::_membersWatcher_Redis() {
char buf[11] = {0};
std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}";
+ std::string lastID = "0";
fprintf(stderr, "Listening to member stream: %s\n", key.c_str());
while (_run == 1) {
try {
json tmp;
std::unordered_map<std::string, ItemStream> result;
if (_rc->clusterMode) {
- _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+ _cluster->xread(key, lastID, std::chrono::seconds(1), 10, std::inserter(result, result.end()));
} else {
- _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+ _redis->xread(key, lastID, std::chrono::seconds(1), 10, std::inserter(result, result.end()));
}
if (!result.empty()) {
for (auto element : result) {
@@ -982,6 +983,7 @@ void PostgreSQL::_membersWatcher_Redis() {
} else {
_redis->xdel(key, id);
}
+ lastID = id;
}
}
}
@@ -1024,15 +1026,15 @@ void PostgreSQL::_networksWatcher_Postgres() {
void PostgreSQL::_networksWatcher_Redis() {
char buf[11] = {0};
std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}";
-
+ std::string lastID = "0";
while (_run == 1) {
try {
json tmp;
std::unordered_map<std::string, ItemStream> result;
if (_rc->clusterMode) {
- _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+ _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
} else {
- _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+ _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
}
if (!result.empty()) {
@@ -1070,6 +1072,7 @@ void PostgreSQL::_networksWatcher_Redis() {
} else {
_redis->xdel(key, id);
}
+ lastID = id;
}
}
}