summaryrefslogtreecommitdiff
path: root/controller/PostgreSQL.cpp
diff options
context:
space:
mode:
authorGrant Limberg <[email protected]>2021-10-06 09:39:30 -0700
committerGrant Limberg <[email protected]>2021-10-06 09:39:30 -0700
commit3818351287eab9c734011c7be61e0011a56f30c1 (patch)
treeb255205dd6179a4de3ba194ae4ce65ebb2fc3aae /controller/PostgreSQL.cpp
parent4d26b5a8688f38f125409f65d51d935b35aacd6b (diff)
use pqxx::pipeline for online update thread
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r--controller/PostgreSQL.cpp48
1 files changed, 21 insertions, 27 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 53d88d6f..9edcf059 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -1337,6 +1337,7 @@ void PostgreSQL::onlineNotification_Postgres()
nlohmann::json jtmp1, jtmp2;
while (_run == 1) {
auto c = _pool->borrow();
+ auto c2 = _pool->borrow();
try {
fprintf(stderr, "%s onlineNotification_Postgres\n", _myAddressStr.c_str());
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
@@ -1346,15 +1347,16 @@ void PostgreSQL::onlineNotification_Postgres()
}
pqxx::work w(*c->c);
+ pqxx::work w2(*c2->c);
- // using pqxx::stream_to would be a really nice alternative here, but
- // unfortunately it doesn't support upserts.
- // fprintf(stderr, "online notification tick\n");
- std::stringstream memberUpdate;
- memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ";
+ fprintf(stderr, "online notification tick\n");
+
bool firstRun = true;
bool memberAdded = false;
int updateCount = 0;
+
+ pqxx::pipeline pipe(w);
+
for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
updateCount += 1;
uint64_t nwid_i = i->first.first;
@@ -1371,16 +1373,10 @@ void PostgreSQL::onlineNotification_Postgres()
std::string networkId(nwidTmp);
std::string memberId(memTmp);
- const char *qvals[2] = {
- networkId.c_str(),
- memberId.c_str()
- };
-
try {
- pqxx::row r = w.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2",
+ pqxx::row r = w2.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2",
networkId, memberId);
} catch (pqxx::unexpected_rows &e) {
- // fprintf(stderr, "Member count failed: %s\n", e.what());
continue;
}
@@ -1388,32 +1384,30 @@ void PostgreSQL::onlineNotification_Postgres()
std::string ipAddr = i->second.second.toIpString(ipTmp);
std::string timestamp = std::to_string(ts);
- if (firstRun) {
- firstRun = false;
- } else {
- memberUpdate << ", ";
- }
-
- memberUpdate << "('" << networkId << "', '" << memberId << "', ";
+ std::stringstream memberUpdate;
+ memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "
+ << "('" << networkId << "', '" << memberId << "', ";
if (ipAddr.empty()) {
memberUpdate << "NULL, ";
} else {
memberUpdate << "'" << ipAddr << "', ";
}
- memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))";
- memberAdded = true;
- }
- memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;";
+ memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000)) "
+ << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated";
- if (memberAdded) {
- //fprintf(stderr, "%s\n", memberUpdate.str().c_str());
- pqxx::result res = w.exec0(memberUpdate.str());
- w.commit();
+ pipe.insert(memberUpdate.str());
+ }
+ while(!pipe.empty()) {
+ pipe.retrieve();
}
+
+ pipe.complete();
+ w.commit();
fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount);
} catch (std::exception &e) {
fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what());
}
+ _pool->unborrow(c2);
_pool->unborrow(c);
ConnectionPoolStats stats = _pool->get_stats();