diff options
Diffstat (limited to 'controller/PostgreSQL.cpp')
| -rw-r--r-- | controller/PostgreSQL.cpp | 153 |
1 files changed, 120 insertions, 33 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 319c0268..b1682e7e 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -21,6 +21,8 @@ #include "../version.h" #include "Redis.hpp" +#include <smeeclient.h> + #include <libpq-fe.h> #include <sstream> #include <iomanip> @@ -159,6 +161,8 @@ using Attrs = std::vector<std::pair<std::string, std::string>>; using Item = std::pair<std::string, Attrs>; using ItemStream = std::vector<Item>; + + PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc) : DB() , _pool() @@ -173,6 +177,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R , _redis(NULL) , _cluster(NULL) , _redisMemberStatus(false) + , _smee(NULL) { char myAddress[64]; _myAddressStr = myId.address().toString(myAddress); @@ -248,10 +253,17 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R _commitThread[i] = std::thread(&PostgreSQL::commitThread, this); } _onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this); + + configureSmee(); } PostgreSQL::~PostgreSQL() { + if (_smee != NULL) { + smeeclient::smee_client_delete(_smee); + _smee = NULL; + } + _run = 0; std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -265,6 +277,31 @@ PostgreSQL::~PostgreSQL() _onlineNotificationThread.join(); } +void PostgreSQL::configureSmee() +{ + const char *TEMPORAL_SCHEME = "ZT_TEMPORAL_SCHEME"; + const char *TEMPORAL_HOST = "ZT_TEMPORAL_HOST"; + const char *TEMPORAL_PORT = "ZT_TEMPORAL_PORT"; + const char *TEMPORAL_NAMESPACE = "ZT_TEMPORAL_NAMESPACE"; + const char *SMEE_TASK_QUEUE = "ZT_SMEE_TASK_QUEUE"; + + const char *scheme = getenv(TEMPORAL_SCHEME); + if (scheme == NULL) { + scheme = "http"; + } + const char *host = getenv(TEMPORAL_HOST); + const char *port = getenv(TEMPORAL_PORT); + const char *ns = getenv(TEMPORAL_NAMESPACE); + const char *task_queue = getenv(SMEE_TASK_QUEUE); + + if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) { + fprintf(stderr, "creating smee client\n"); + std::string hostPort = std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port); + this->_smee = smeeclient::smee_client_new(hostPort.c_str(), ns, task_queue); + } else { + fprintf(stderr, "Smee client not configured\n"); + } +} bool PostgreSQL::waitForReady() { @@ -1306,40 +1343,72 @@ void PostgreSQL::commitThread() continue; } - pqxx::result res = w.exec_params0( - "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " - "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " - "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " - "VALUES ($1, $2, $3, $4, $5, $6, " - "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " - "$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET " - "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, " - "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " - "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, " - "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " - "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, " - "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto", - memberId, - networkId, - (bool)config["activeBridge"], - (bool)config["authorized"], - OSUtils::jsonDump(config["capabilities"], -1), - OSUtils::jsonString(config["identity"], ""), - (uint64_t)config["lastAuthorizedTime"], - (uint64_t)config["lastDeauthorizedTime"], - (bool)config["noAutoAssignIps"], - (int)config["remoteTraceLevel"], - target, - (uint64_t)config["revision"], - OSUtils::jsonDump(config["tags"], -1), - (int)config["vMajor"], - (int)config["vMinor"], - (int)config["vRev"], - (int)config["vProto"]); + pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId); + int membercount = mrow[0].as<int>(); + + bool isNewMember = false; + if (membercount == 0) { + // new member + isNewMember = true; + pqxx::result res = w.exec_params0( + "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " + "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " + "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " + "VALUES ($1, $2, $3, $4, $5, $6, " + "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " + "$9, $10, $11, $12, $13, $14, $15, $16, $17)", + memberId, + networkId, + (bool)config["activeBridge"], + (bool)config["authorized"], + OSUtils::jsonDump(config["capabilities"], -1), + OSUtils::jsonString(config["identity"], ""), + (uint64_t)config["lastAuthorizedTime"], + (uint64_t)config["lastDeauthorizedTime"], + (bool)config["noAutoAssignIps"], + (int)config["remoteTraceLevel"], + target, + (uint64_t)config["revision"], + OSUtils::jsonDump(config["tags"], -1), + (int)config["vMajor"], + (int)config["vMinor"], + (int)config["vRev"], + (int)config["vProto"]); + } else { + // existing member + pqxx::result res = w.exec_params0( + "UPDATE ztc_member " + "SET active_bridge = $3, authorized = $4, capabilities = $5, identity = $6, " + "last_authorized_time = TO_TIMESTAMP($7::double precision/1000), " + "last_deauthorized_time = TO_TIMESTAMP($8::double precision/1000), " + "no_auto_assign_ips = $9, remote_trace_level = $10, remote_trace_target= $11, " + "revision = $12, tags = $13, v_major = $14, v_minor = $15, v_rev = $16, v_proto = $17 " + "WHERE id = $1 AND network_id = $2", + memberId, + networkId, + (bool)config["activeBridge"], + (bool)config["authorized"], + OSUtils::jsonDump(config["capabilities"], -1), + OSUtils::jsonString(config["identity"], ""), + (uint64_t)config["lastAuthorizedTime"], + (uint64_t)config["lastDeauthorizedTime"], + (bool)config["noAutoAssignIps"], + (int)config["remoteTraceLevel"], + target, + (uint64_t)config["revision"], + OSUtils::jsonDump(config["tags"], -1), + (int)config["vMajor"], + (int)config["vMinor"], + (int)config["vRev"], + (int)config["vProto"] + ); + } - res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", - memberId, networkId); + if (!isNewMember) { + pqxx::result res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", + memberId, networkId); + } std::vector<std::string> assignments; bool ipAssignError = false; @@ -1350,7 +1419,7 @@ void PostgreSQL::commitThread() continue; } - res = w.exec_params0( + pqxx::result res = w.exec_params0( "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING", memberId, networkId, addr); @@ -1366,6 +1435,17 @@ void PostgreSQL::commitThread() w.commit(); + if (_smee != NULL && isNewMember) { + notifyNewMember(networkId, memberId); + } else { + if (_smee == NULL) { + fprintf(stderr, "smee is NULL\n"); + } + if (!isNewMember) { + fprintf(stderr, "nt a new member\n"); + } + } + const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL); if (nwidInt && memberidInt) { @@ -1609,6 +1689,13 @@ void PostgreSQL::commitThread() fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str()); } +void PostgreSQL::notifyNewMember(const std::string &networkID, const std::string &memberID) { + smeeclient::smee_client_notify_network_joined( + _smee, + networkID.c_str(), + memberID.c_str()); +} + void PostgreSQL::onlineNotificationThread() { waitForReady(); |
