summaryrefslogtreecommitdiff
path: root/controller/PostgreSQL.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <[email protected]>2020-08-25 08:07:23 -0700
committerAdam Ierymenko <[email protected]>2020-08-25 08:07:23 -0700
commit47e9fb3ddbbf59ee5814239609f09e7f7864a533 (patch)
tree60788f854989221d100eb960358d69f2538fb513 /controller/PostgreSQL.cpp
parent2ac49d99dd2159827330a890e99d2e828b39e106 (diff)
parentb1ddba0438bc3b33ebc0e28ac5c015fa63be1430 (diff)
Merge branch 'dev' of http://git.int.zerotier.com/zerotier/ZeroTierOne into dev
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r--controller/PostgreSQL.cpp265
1 files changed, 133 insertions, 132 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 4dd63a0a..718fd20d 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -273,18 +273,18 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
std::string setKey = "networks:{" + _myAddressStr + "}";
- if (_rc != NULL) {
- try {
- if (_rc->clusterMode) {
- _cluster->del(setKey);
- } else {
- _redis->del(setKey);
- }
- } catch (sw::redis::Error &e) {
- // del can throw an error if the key doesn't exist
- // swallow it and move along
- }
- }
+ // if (_rc != NULL) {
+ // try {
+ // if (_rc->clusterMode) {
+ // _cluster->del(setKey);
+ // } else {
+ // _redis->del(setKey);
+ // }
+ // } catch (sw::redis::Error &e) {
+ // // del can throw an error if the key doesn't exist
+ // // swallow it and move along
+ // }
+ // }
std::unordered_set<std::string> networkSet;
@@ -475,17 +475,17 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
PQclear(res);
- if(!networkSet.empty()) {
- if (_rc && _rc->clusterMode) {
- auto tx = _cluster->transaction(_myAddressStr, true);
- tx.sadd(setKey, networkSet.begin(), networkSet.end());
- tx.exec();
- } else if (_rc && !_rc->clusterMode) {
- auto tx = _redis->transaction(true);
- tx.sadd(setKey, networkSet.begin(), networkSet.end());
- tx.exec();
- }
- }
+ // if(!networkSet.empty()) {
+ // if (_rc && _rc->clusterMode) {
+ // auto tx = _cluster->transaction(_myAddressStr, true);
+ // tx.sadd(setKey, networkSet.begin(), networkSet.end());
+ // tx.exec();
+ // } else if (_rc && !_rc->clusterMode) {
+ // auto tx = _redis->transaction(true);
+ // tx.sadd(setKey, networkSet.begin(), networkSet.end());
+ // tx.exec();
+ // }
+ // }
if (++this->_ready == 2) {
if (_waitNoticePrinted) {
@@ -509,36 +509,36 @@ void PostgreSQL::initializeMembers(PGconn *conn)
fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
exit(1);
}
- std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
+ // std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
- if (_rc != NULL) {
- std::lock_guard<std::mutex> l(_networks_l);
- std::unordered_set<std::string> deletes;
- for ( auto it : _networks) {
- uint64_t nwid_i = it.first;
- char nwidTmp[64] = {0};
- OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
- std::string nwid(nwidTmp);
- std::string key = setKeyBase + nwid;
- deletes.insert(key);
- }
-
- if (!deletes.empty()) {
- if (_rc->clusterMode) {
- auto tx = _cluster->transaction(_myAddressStr, true);
- for (std::string k : deletes) {
- tx.del(k);
- }
- tx.exec();
- } else {
- auto tx = _redis->transaction(true);
- for (std::string k : deletes) {
- tx.del(k);
- }
- tx.exec();
- }
- }
- }
+ // if (_rc != NULL) {
+ // std::lock_guard<std::mutex> l(_networks_l);
+ // std::unordered_set<std::string> deletes;
+ // for ( auto it : _networks) {
+ // uint64_t nwid_i = it.first;
+ // char nwidTmp[64] = {0};
+ // OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
+ // std::string nwid(nwidTmp);
+ // std::string key = setKeyBase + nwid;
+ // deletes.insert(key);
+ // }
+
+ // if (!deletes.empty()) {
+ // if (_rc->clusterMode) {
+ // auto tx = _cluster->transaction(_myAddressStr, true);
+ // for (std::string k : deletes) {
+ // tx.del(k);
+ // }
+ // tx.exec();
+ // } else {
+ // auto tx = _redis->transaction(true);
+ // for (std::string k : deletes) {
+ // tx.del(k);
+ // }
+ // tx.exec();
+ // }
+ // }
+ // }
const char *params[1] = {
_myAddressStr.c_str()
@@ -578,7 +578,7 @@ void PostgreSQL::initializeMembers(PGconn *conn)
std::string memberId(PQgetvalue(res, i, 0));
std::string networkId(PQgetvalue(res, i, 1));
- networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
+ // networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
std::string ctime = PQgetvalue(res, i, 5);
config["id"] = memberId;
@@ -685,23 +685,23 @@ void PostgreSQL::initializeMembers(PGconn *conn)
PQclear(res);
- if (!networkMembers.empty()) {
- if (_rc != NULL) {
- if (_rc->clusterMode) {
- auto tx = _cluster->transaction(_myAddressStr, true);
- for (auto it : networkMembers) {
- tx.sadd(it.first, it.second);
- }
- tx.exec();
- } else {
- auto tx = _redis->transaction(true);
- for (auto it : networkMembers) {
- tx.sadd(it.first, it.second);
- }
- tx.exec();
- }
- }
- }
+ // if (!networkMembers.empty()) {
+ // if (_rc != NULL) {
+ // if (_rc->clusterMode) {
+ // auto tx = _cluster->transaction(_myAddressStr, true);
+ // for (auto it : networkMembers) {
+ // tx.sadd(it.first, it.second);
+ // }
+ // tx.exec();
+ // } else {
+ // auto tx = _redis->transaction(true);
+ // for (auto it : networkMembers) {
+ // tx.sadd(it.first, it.second);
+ // }
+ // tx.exec();
+ // }
+ // }
+ // }
if (++this->_ready == 2) {
if (_waitNoticePrinted) {
fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
@@ -755,7 +755,7 @@ void PostgreSQL::heartbeat()
std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
std::string now = std::to_string(ts);
std::string host_port = std::to_string(_listenPort);
- std::string use_redis = (_rc != NULL) ? "true" : "false";
+ std::string use_redis = "false"; // (_rc != NULL) ? "true" : "false";
const char *values[10] = {
controllerId,
hostname,
@@ -788,13 +788,13 @@ void PostgreSQL::heartbeat()
}
PQclear(res);
}
- if (_rc != NULL) {
- if (_rc->clusterMode) {
- _cluster->zadd("controllers", controllerId, ts);
- } else {
- _redis->zadd("controllers", controllerId, ts);
- }
- }
+ // if (_rc != NULL) {
+ // if (_rc->clusterMode) {
+ // _cluster->zadd("controllers", controllerId, ts);
+ // } else {
+ // _redis->zadd("controllers", controllerId, ts);
+ // }
+ // }
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
@@ -833,6 +833,7 @@ void PostgreSQL::membersDbWatcher()
void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
char buf[11] = {0};
std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
+ fprintf(stderr, "Listening to member stream: %s\n", cmd.c_str());
PGresult *res = PQexec(conn, cmd.c_str());
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
@@ -874,7 +875,7 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
void PostgreSQL::_membersWatcher_Redis() {
char buf[11] = {0};
std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}";
-
+ fprintf(stderr, "Listening to member stream: %s\n", key.c_str());
while (_run == 1) {
try {
json tmp;
@@ -1515,20 +1516,20 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
}
- if (_rc != NULL) {
- try {
- std::string id = (*config)["id"];
- std::string controllerId = _myAddressStr.c_str();
- std::string key = "networks:{" + controllerId + "}";
- if (_rc->clusterMode) {
- _cluster->sadd(key, id);
- } else {
- _redis->sadd(key, id);
- }
- } catch (sw::redis::Error &e) {
- fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
- }
- }
+ // if (_rc != NULL) {
+ // try {
+ // std::string id = (*config)["id"];
+ // std::string controllerId = _myAddressStr.c_str();
+ // std::string key = "networks:{" + controllerId + "}";
+ // if (_rc->clusterMode) {
+ // _cluster->sadd(key, id);
+ // } else {
+ // _redis->sadd(key, id);
+ // }
+ // } catch (sw::redis::Error &e) {
+ // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
+ // }
+ // }
} else if (objtype == "_delete_network") {
try {
std::string networkId = (*config)["nwid"];
@@ -1552,22 +1553,22 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
}
- if (_rc != NULL) {
- try {
- std::string id = (*config)["id"];
- std::string controllerId = _myAddressStr.c_str();
- std::string key = "networks:{" + controllerId + "}";
- if (_rc->clusterMode) {
- _cluster->srem(key, id);
- _cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
- } else {
- _redis->srem(key, id);
- _redis->del("network-nodes-online:{"+controllerId+"}:"+id);
- }
- } catch (sw::redis::Error &e) {
- fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
- }
- }
+ // if (_rc != NULL) {
+ // try {
+ // std::string id = (*config)["id"];
+ // std::string controllerId = _myAddressStr.c_str();
+ // std::string key = "networks:{" + controllerId + "}";
+ // if (_rc->clusterMode) {
+ // _cluster->srem(key, id);
+ // _cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
+ // } else {
+ // _redis->srem(key, id);
+ // _redis->del("network-nodes-online:{"+controllerId+"}:"+id);
+ // }
+ // } catch (sw::redis::Error &e) {
+ // fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
+ // }
+ // }
} else if (objtype == "_delete_member") {
try {
std::string memberId = (*config)["id"];
@@ -1595,23 +1596,23 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
}
- if (_rc != NULL) {
- try {
- std::string memberId = (*config)["id"];
- std::string networkId = (*config)["nwid"];
- std::string controllerId = _myAddressStr.c_str();
- std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
- if (_rc->clusterMode) {
- _cluster->srem(key, memberId);
- _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
- } else {
- _redis->srem(key, memberId);
- _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
- }
- } catch (sw::redis::Error &e) {
- fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
- }
- }
+ // if (_rc != NULL) {
+ // try {
+ // std::string memberId = (*config)["id"];
+ // std::string networkId = (*config)["nwid"];
+ // std::string controllerId = _myAddressStr.c_str();
+ // std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
+ // if (_rc->clusterMode) {
+ // _cluster->srem(key, memberId);
+ // _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
+ // } else {
+ // _redis->srem(key, memberId);
+ // _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
+ // }
+ // } catch (sw::redis::Error &e) {
+ // fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
+ // }
+ // }
} else {
fprintf(stderr, "ERROR: unknown objtype");
}
@@ -1619,7 +1620,7 @@ void PostgreSQL::commitThread()
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
}
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
PQfinish(conn);
@@ -1634,11 +1635,11 @@ void PostgreSQL::onlineNotificationThread()
{
waitForReady();
- if (_rc != NULL) {
- onlineNotification_Redis();
- } else {
+ // if (_rc != NULL) {
+ // onlineNotification_Redis();
+ // } else {
onlineNotification_Postgres();
- }
+ // }
}
void PostgreSQL::onlineNotification_Postgres()
@@ -1783,7 +1784,7 @@ void PostgreSQL::onlineNotification_Redis()
fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
#endif
}
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ std::this_thread::sleep_for(std::chrono::seconds(10));
}
}