summaryrefslogtreecommitdiff
path: root/controller/PostgreSQL.cpp
diff options
context:
space:
mode:
authorGrant Limberg <[email protected]>2023-10-30 08:09:40 -0700
committerGitHub <[email protected]>2023-10-30 08:09:40 -0700
commitf89cde81869b5efc130c4e3c96608316e204907e (patch)
treef55f2b309df87ad926f2fe50cc6d62fd5730ed34 /controller/PostgreSQL.cpp
parent9ae8b0b3b60b27cf06d7e74629c17e4a0f248364 (diff)
parentc89683fb0f9d157b3301731e69b1c83cd7697924 (diff)
Merge pull request #2163 from zerotier:temporal
Temporal integration with hosted controllers
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r--controller/PostgreSQL.cpp153
1 files changed, 120 insertions, 33 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 319c0268..b1682e7e 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -21,6 +21,8 @@
#include "../version.h"
#include "Redis.hpp"
+#include <smeeclient.h>
+
#include <libpq-fe.h>
#include <sstream>
#include <iomanip>
@@ -159,6 +161,8 @@ using Attrs = std::vector<std::pair<std::string, std::string>>;
using Item = std::pair<std::string, Attrs>;
using ItemStream = std::vector<Item>;
+
+
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc)
: DB()
, _pool()
@@ -173,6 +177,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
, _redis(NULL)
, _cluster(NULL)
, _redisMemberStatus(false)
+ , _smee(NULL)
{
char myAddress[64];
_myAddressStr = myId.address().toString(myAddress);
@@ -248,10 +253,17 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
_commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
}
_onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);
+
+ configureSmee();
}
PostgreSQL::~PostgreSQL()
{
+ if (_smee != NULL) {
+ smeeclient::smee_client_delete(_smee);
+ _smee = NULL;
+ }
+
_run = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@@ -265,6 +277,31 @@ PostgreSQL::~PostgreSQL()
_onlineNotificationThread.join();
}
+void PostgreSQL::configureSmee()
+{
+ const char *TEMPORAL_SCHEME = "ZT_TEMPORAL_SCHEME";
+ const char *TEMPORAL_HOST = "ZT_TEMPORAL_HOST";
+ const char *TEMPORAL_PORT = "ZT_TEMPORAL_PORT";
+ const char *TEMPORAL_NAMESPACE = "ZT_TEMPORAL_NAMESPACE";
+ const char *SMEE_TASK_QUEUE = "ZT_SMEE_TASK_QUEUE";
+
+ const char *scheme = getenv(TEMPORAL_SCHEME);
+ if (scheme == NULL) {
+ scheme = "http";
+ }
+ const char *host = getenv(TEMPORAL_HOST);
+ const char *port = getenv(TEMPORAL_PORT);
+ const char *ns = getenv(TEMPORAL_NAMESPACE);
+ const char *task_queue = getenv(SMEE_TASK_QUEUE);
+
+ if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) {
+ fprintf(stderr, "creating smee client\n");
+ std::string hostPort = std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port);
+ this->_smee = smeeclient::smee_client_new(hostPort.c_str(), ns, task_queue);
+ } else {
+ fprintf(stderr, "Smee client not configured\n");
+ }
+}
bool PostgreSQL::waitForReady()
{
@@ -1306,40 +1343,72 @@ void PostgreSQL::commitThread()
continue;
}
- pqxx::result res = w.exec_params0(
- "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
- "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
- "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
- "VALUES ($1, $2, $3, $4, $5, $6, "
- "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
- "$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
- "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
- "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
- "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
- "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
- "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
- "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
- memberId,
- networkId,
- (bool)config["activeBridge"],
- (bool)config["authorized"],
- OSUtils::jsonDump(config["capabilities"], -1),
- OSUtils::jsonString(config["identity"], ""),
- (uint64_t)config["lastAuthorizedTime"],
- (uint64_t)config["lastDeauthorizedTime"],
- (bool)config["noAutoAssignIps"],
- (int)config["remoteTraceLevel"],
- target,
- (uint64_t)config["revision"],
- OSUtils::jsonDump(config["tags"], -1),
- (int)config["vMajor"],
- (int)config["vMinor"],
- (int)config["vRev"],
- (int)config["vProto"]);
+ pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId);
+ int membercount = mrow[0].as<int>();
+
+ bool isNewMember = false;
+ if (membercount == 0) {
+ // new member
+ isNewMember = true;
+ pqxx::result res = w.exec_params0(
+ "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
+ "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
+ "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
+ "VALUES ($1, $2, $3, $4, $5, $6, "
+ "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
+ "$9, $10, $11, $12, $13, $14, $15, $16, $17)",
+ memberId,
+ networkId,
+ (bool)config["activeBridge"],
+ (bool)config["authorized"],
+ OSUtils::jsonDump(config["capabilities"], -1),
+ OSUtils::jsonString(config["identity"], ""),
+ (uint64_t)config["lastAuthorizedTime"],
+ (uint64_t)config["lastDeauthorizedTime"],
+ (bool)config["noAutoAssignIps"],
+ (int)config["remoteTraceLevel"],
+ target,
+ (uint64_t)config["revision"],
+ OSUtils::jsonDump(config["tags"], -1),
+ (int)config["vMajor"],
+ (int)config["vMinor"],
+ (int)config["vRev"],
+ (int)config["vProto"]);
+ } else {
+ // existing member
+ pqxx::result res = w.exec_params0(
+ "UPDATE ztc_member "
+ "SET active_bridge = $3, authorized = $4, capabilities = $5, identity = $6, "
+ "last_authorized_time = TO_TIMESTAMP($7::double precision/1000), "
+ "last_deauthorized_time = TO_TIMESTAMP($8::double precision/1000), "
+ "no_auto_assign_ips = $9, remote_trace_level = $10, remote_trace_target= $11, "
+ "revision = $12, tags = $13, v_major = $14, v_minor = $15, v_rev = $16, v_proto = $17 "
+ "WHERE id = $1 AND network_id = $2",
+ memberId,
+ networkId,
+ (bool)config["activeBridge"],
+ (bool)config["authorized"],
+ OSUtils::jsonDump(config["capabilities"], -1),
+ OSUtils::jsonString(config["identity"], ""),
+ (uint64_t)config["lastAuthorizedTime"],
+ (uint64_t)config["lastDeauthorizedTime"],
+ (bool)config["noAutoAssignIps"],
+ (int)config["remoteTraceLevel"],
+ target,
+ (uint64_t)config["revision"],
+ OSUtils::jsonDump(config["tags"], -1),
+ (int)config["vMajor"],
+ (int)config["vMinor"],
+ (int)config["vRev"],
+ (int)config["vProto"]
+ );
+ }
- res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
- memberId, networkId);
+ if (!isNewMember) {
+ pqxx::result res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
+ memberId, networkId);
+ }
std::vector<std::string> assignments;
bool ipAssignError = false;
@@ -1350,7 +1419,7 @@ void PostgreSQL::commitThread()
continue;
}
- res = w.exec_params0(
+ pqxx::result res = w.exec_params0(
"INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING",
memberId, networkId, addr);
@@ -1366,6 +1435,17 @@ void PostgreSQL::commitThread()
w.commit();
+ if (_smee != NULL && isNewMember) {
+ notifyNewMember(networkId, memberId);
+ } else {
+ if (_smee == NULL) {
+ fprintf(stderr, "smee is NULL\n");
+ }
+ if (!isNewMember) {
+ fprintf(stderr, "nt a new member\n");
+ }
+ }
+
const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL);
if (nwidInt && memberidInt) {
@@ -1609,6 +1689,13 @@ void PostgreSQL::commitThread()
fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str());
}
+void PostgreSQL::notifyNewMember(const std::string &networkID, const std::string &memberID) {
+ smeeclient::smee_client_notify_network_joined(
+ _smee,
+ networkID.c_str(),
+ memberID.c_str());
+}
+
void PostgreSQL::onlineNotificationThread()
{
waitForReady();