diff options
| author | luwenpeng <[email protected]> | 2020-04-17 18:48:16 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2020-04-27 14:24:07 +0800 |
| commit | 444ebc09878905ba3d8cdb51bdb48f3cdb138659 (patch) | |
| tree | 790c020b6a13fff6658a317e23080d57eed223a7 | |
| parent | a56d9fe8dfeaae55495ea433906a78469be091b2 (diff) | |
TSG-748 tfe 增加 ssh proxy 功能
| -rw-r--r-- | common/include/tfe_stream.h | 1 | ||||
| -rw-r--r-- | conf/tfe/tfe.conf | 46 | ||||
| -rw-r--r-- | platform/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | platform/include/internal/platform.h | 1 | ||||
| -rw-r--r-- | platform/include/internal/proxy.h | 1 | ||||
| -rw-r--r-- | platform/include/internal/ssh_stream.h | 125 | ||||
| -rw-r--r-- | platform/src/acceptor_kni_v1.cpp | 5 | ||||
| -rw-r--r-- | platform/src/proxy.cpp | 11 | ||||
| -rw-r--r-- | platform/src/ssh_stream.cpp | 1164 | ||||
| -rw-r--r-- | platform/src/tcp_stream.cpp | 34 | ||||
| -rw-r--r-- | vendor/CMakeLists.txt | 20 | ||||
| -rw-r--r-- | vendor/libssh-0.9.3.tar.gz | bin | 0 -> 741826 bytes |
12 files changed, 1410 insertions, 1 deletions
diff --git a/common/include/tfe_stream.h b/common/include/tfe_stream.h index e35ccca..7e07421 100644 --- a/common/include/tfe_stream.h +++ b/common/include/tfe_stream.h @@ -10,6 +10,7 @@ enum tfe_stream_proto { STREAM_PROTO_PLAIN = 0, STREAM_PROTO_SSL, + STREAM_PROTO_SSH, STREAM_PROTO_QUIC, STREAM_PROTO_SPDY }; diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index 918dbd1..5bae9b4 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -34,6 +34,52 @@ mc_cache_broker_list=192.168.40.224:9092 # default PXY-EXCH-INTERMEDIA-CERT mc_cache_topic=PXY-EXCH-INTERMEDIA-CERT +[ssh] +# default 0 +# If set 0, Passthough all ssh traffic +ssh_enable=0 +# default 24 +# If the number of ssh threads reaches the limit, Passthough new ssh traffic +ssh_max_thread_num=24 +# default 0 +# If set 0, Force users to use password authentication, otherwise ssh cannot be used. +# If set 1, Hijack as much as possible, Passthough if the hijacking is unsuccessful +ssh_unpassword_auth_bypass=0 +# default 10 +ssh_connect_timeout_sec=10 +# default /etc/ssh/ssh_host_rsa_key +host_rsa_key_path=/etc/ssh/ssh_host_rsa_key +# default /etc/ssh/ssh_host_ecdsa_key +host_ecdsa_key_path=/etc/ssh/ssh_host_ecdsa_key +# default /etc/ssh/ssh_host_dsa_key +host_dsa_key_path=/etc/ssh/ssh_host_dsa_key +# default /etc/ssh/ssh_known_hosts +known_hosts_path=/etc/ssh/ssh_known_hosts +# ssh_verbosity is libssh log switch: 0 or 1, 2, 3, 4 +# 0 :- SSH_LOG_NOLOG: No logging +# 1 :- SSH_LOG_WARNING: Only warnings +# 2 :- SSH_LOG_PROTOCOL: High level protocol information +# 3 :- SSH_LOG_PACKET: Lower level protocol infomations, packet level +# 4 :- SSH_LOG_FUNCTIONS: Every function path +# If ssh_verbosity == 5, ssh proxy data will write to stdout +ssh_verbosity=0 + +# default 0 +ssh_kafka_enable=0 +# default eth0 +ssh_nic_name=eth0 +# default PXY-SSH-PASSWORD +ssh_kafka_topic=PXY-SSH-PASSWORD +# default NULL +ssh_kafka_brokerlist=NULL + +# defalut 4194304 (4 * 1024 * 1024) +ssh_htable_slot_size=4194304 +# default 16777216 (4 * ssh_htable_slot_size) +ssh_htable_max_element_num=16777216 +# default 1800 (30 * 60) +ssh_htable_expire_seconds=1800 + [key_keeper] #Mode: debug - generate cert with ca_path, normal - generate cert with cert store #0 on cache 1 off cache diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index 85a76d7..e9cc029 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -3,7 +3,7 @@ find_package(SYSTEMD REQUIRED) add_executable(tfe src/acceptor_kni_v1.cpp src/acceptor_kni_v2.cpp src/ssl_stream.cpp src/key_keeper.cpp src/ssl_fetch_cert.cpp src/ssl_sess_cache.cpp src/ssl_sess_ticket.cpp src/ssl_service_cache.cpp src/ssl_trusted_cert_storage.cpp src/ev_root_ca_metadata.cpp src/ssl_utils.cpp - src/tcp_stream.cpp src/main.cpp src/proxy.cpp src/sender_scm.cpp src/watchdog_kni.cpp) + src/tcp_stream.cpp src/main.cpp src/proxy.cpp src/sender_scm.cpp src/watchdog_kni.cpp src/ssh_stream.cpp) target_include_directories(tfe PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/external) target_include_directories(tfe PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include/internal) @@ -11,6 +11,7 @@ target_include_directories(tfe PRIVATE ${SYSTEMD_INCLUDE_DIRS}) target_link_libraries(tfe common tango-cache-client) target_link_libraries(tfe pthread dl + libssh-static openssl-ssl-static openssl-crypto-static libevent-static diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 35f6647..c9390f2 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -73,6 +73,7 @@ struct tfe_stream_private struct tfe_stream head; char * str_stream_addr; void * stream_logger; + pthread_t thread; struct tfe_proxy * proxy_ref; struct tfe_thread_ctx * thread_ref; diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index b575741..4184b7e 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -38,6 +38,7 @@ enum TFE_STAT_FIELD /* Protocol */ STAT_STREAM_TCP_PLAIN, STAT_STREAM_TCP_SSL, + STAT_STREAM_TCP_SSH, TFE_STAT_MAX }; diff --git a/platform/include/internal/ssh_stream.h b/platform/include/internal/ssh_stream.h new file mode 100644 index 0000000..24f261a --- /dev/null +++ b/platform/include/internal/ssh_stream.h @@ -0,0 +1,125 @@ +#ifndef _SSH_STREAM_H +#define _SSH_STREAM_H + +#ifdef __cpluscplus +extern "C" +{ +#endif + +#include <platform.h> +#include <poll.h> + +#define LIBSSH_STATIC 1 +#include <libssh/libssh.h> +#include <libssh/server.h> +#include <libssh/callbacks.h> + +#define KEY_EXIST 0 +#define KEY_NOFOND -1 +#define MAX_CHANNES 50 + + typedef struct ssh_stream_ctx_s ssh_stream_ctx_t; + typedef struct ssh_global_conf_s ssh_global_conf_t; + typedef struct ssh_channel_pair_s ssh_channel_pair_t; + typedef struct ssh_channel_wapper_s ssh_channel_wapper_t; + + typedef enum ssh_direct_e + { + DOWNSTREAM = 0, + UPSTREAM = 1, + } ssh_direct_t; + + struct ssh_channel_wapper_s + { + ssh_channel channel; + ssh_direct_t direct; + ssh_stream_ctx_t *ctx; + ssh_channel_wapper_t *peer; + ssh_channel_callbacks_struct cb; + }; + + struct ssh_channel_pair_s + { + ssh_channel_wapper_t down_stream; + ssh_channel_wapper_t up_stream; + }; + + struct ssh_stream_ctx_s + { + int fd_downstream; + int fd_upstream; + + int is_upstream_connected; + int is_downstream_connected; + + ssh_session downstream_session; // need to free + ssh_session upstream_session; // need to free + + ssh_channel base_downstream_channel; + ssh_channel base_upstream_channel; + + ssh_bind bind; // need to free + ssh_event mainloop; // need to free + + struct ssh_callbacks_struct *upstream_cb; // need to free,for x11 cb + + ssh_channel_pair_t *channel_pair_arry[MAX_CHANNES]; + int channel_index; + + char *srcip; // need to free + char *hostname; // need to free + char *port; // need to free + + char *username; // need to free + char *password; // need to free + + char *src_dst_addr; // need to free + int is_upstream_session_open; + int is_authenticated; + int auth_attempts; + + int is_upstream_error; + int is_downstream_error; + + int is_unsupport_auth_method; + int is_upstream_subsystem_enable; + }; + + /* ssh_verbosity: 0 or 1, 2, 3, 4 + * 0 :- SSH_LOG_NOLOG: No logging + * 1 :- SSH_LOG_WARNING: Only warnings + * 2 :- SSH_LOG_PROTOCOL: High level protocol information + * 3 :- SSH_LOG_PACKET: Lower level protocol infomations, packet level + * 4 :- SSH_LOG_FUNCTIONS: Every function path + * if ssh_verbosity == 5, ssh proxy data will write to stdout + */ + struct ssh_global_conf_s + { + int ssh_enable; + int ssh_max_thread_num; + int ssh_verbosity; // 0,1,2,3,4 + int ssh_unpassword_auth_bypass; + int ssh_connect_timeout_sec; + char host_rsa_key_path[TFE_STRING_MAX]; + char host_ecdsa_key_path[TFE_STRING_MAX]; + char host_dsa_key_path[TFE_STRING_MAX]; + char known_hosts_path[TFE_STRING_MAX]; + }; + + extern ssh_global_conf_t ssh_global_ctx; + + int ssh_global_init(const char *profile, const char *section); + void ssh_global_finalize(); + + int ssh_downstream_connect(ssh_stream_ctx_t *ctx); + int ssh_upstream_connect(ssh_stream_ctx_t *ctx); + void ssh_proxy_main(ssh_stream_ctx_t *ctx); + + void *ssh_proxy_thread_entry(void *args); + int ssh_whitelist_get(const char *srcip, const char *hostname, const char *port); + +#ifdef __cpluscplus +} +#endif + +#endif diff --git a/platform/src/acceptor_kni_v1.cpp b/platform/src/acceptor_kni_v1.cpp index 9dad068..d2148ed 100644 --- a/platform/src/acceptor_kni_v1.cpp +++ b/platform/src/acceptor_kni_v1.cpp @@ -65,6 +65,7 @@ enum KNI_TLV_VALUE { KNI_TLV_VALUE_HTTP = 0x1, KNI_TLV_VALUE_SSL = 0x2, + KNI_TLV_VALUE_SSH = 0x3, }; struct kni_tlv_header @@ -176,6 +177,10 @@ static int __kni_parse_tlv_data(struct acceptor_kni_v1 * ctx, { out_para->session_type = STREAM_PROTO_SSL; } + if (__value == KNI_TLV_VALUE_SSH) + { + out_para->session_type = STREAM_PROTO_SSH; + } assert(tlv_info->len == sizeof(uint32_t)); break; } diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index a4663a6..9335448 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -53,6 +53,9 @@ /* Systemd */ #include <systemd/sd-daemon.h> +/* ssh */ +#include <ssh_stream.h> + extern struct ssl_policy_enforcer* ssl_policy_enforcer_create(void* logger); extern enum ssl_stream_action ssl_policy_enforce(struct ssl_stream *upstream, void* u_para); @@ -334,6 +337,7 @@ static const char * __str_stat_spec_map[] = [STAT_STREAM_INCPT_UP_BYTES] = "ustm_incpt_B", [STAT_STREAM_TCP_PLAIN] = "plain", [STAT_STREAM_TCP_SSL] = "ssl", + [STAT_STREAM_TCP_SSH] = "ssh", [TFE_STAT_MAX] = NULL }; @@ -697,6 +701,10 @@ int main(int argc, char * argv[]) g_default_proxy->key_keeper_handler = key_keeper_init(main_profile, "key_keeper", g_default_logger); CHECK_OR_EXIT(g_default_proxy->key_keeper_handler, "Failed at init Key keeper. Exit."); + /* SSH INIT */ + ret = ssh_global_init(main_profile, "ssh"); + CHECK_OR_EXIT(ret == 0, "Failed at init ssh. Exit."); + /* SSL INIT */ g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl", g_default_proxy->evbase, g_default_proxy->key_keeper_handler, g_default_logger); CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit."); @@ -751,6 +759,9 @@ int main(int argc, char * argv[]) } event_base_dispatch(g_default_proxy->evbase); + + ssh_global_finalize(); + return 0; } diff --git a/platform/src/ssh_stream.cpp b/platform/src/ssh_stream.cpp new file mode 100644 index 0000000..65d2171 --- /dev/null +++ b/platform/src/ssh_stream.cpp @@ -0,0 +1,1164 @@ +#include <proxy.h> +#include <ssh_stream.h> +#include <cjson/cJSON.h> +#include <tfe_kafka_logger.h> +#include <MESA/MESA_prof_load.h> + +#define get_direct_str_by_num(direct) (direct == DOWNSTREAM ? "DOWNSTREAM" : "UPSTREAM") + +static tfe_kafka_logger_t *ssh_kafka_logger = NULL; +ssh_global_conf_t ssh_global_ctx; + +/////////////////////// htable utils /////////////////////// + +static MESA_htable_handle htable = NULL; + +static void htable_free_cb(void *data) +{ + if (data) + { + free(data); + } +} + +static int htable_timeout_cb(void *data, int eliminate_type) +{ + TFE_LOG_DEBUG(g_default_logger, "ssh whitelist data timeout: %s", data); + return 1; +} + +static long htable_set_cb(void *data, const uchar *key, uint size, void *user_arg) +{ + if (data) + { + return KEY_EXIST; + } + else + { + UNUSED int ret = MESA_htable_add(htable, key, size, user_arg); + assert(ret >= 0); + return KEY_NOFOND; + } +} + +static long htable_get_cb(void *data, const uchar *key, uint size, void *user_arg) +{ + if (data) + { + return KEY_EXIST; + } + else + { + return KEY_NOFOND; + } +} + +static int ssh_whitelist_create(int slot_size, int max_element_num, int expire_seconds) +{ + UNUSED int ret; + int opt_val; + + htable = MESA_htable_born(); + assert(htable); + + opt_val = 0; + ret = MESA_htable_set_opt(htable, MHO_SCREEN_PRINT_CTRL, &opt_val, sizeof(opt_val)); + assert(ret == 0); + + opt_val = 1; + ret = MESA_htable_set_opt(htable, MHO_THREAD_SAFE, &opt_val, sizeof(opt_val)); + assert(ret == 0); + + opt_val = 16; + ret = MESA_htable_set_opt(htable, MHO_MUTEX_NUM, &opt_val, sizeof(opt_val)); + assert(ret == 0); + + ret = MESA_htable_set_opt(htable, MHO_HASH_SLOT_SIZE, &slot_size, sizeof(slot_size)); + assert(ret == 0); + + ret = MESA_htable_set_opt(htable, MHO_HASH_MAX_ELEMENT_NUM, &max_element_num, sizeof(max_element_num)); + assert(ret == 0); + + ret = MESA_htable_set_opt(htable, MHO_EXPIRE_TIME, &expire_seconds, sizeof(expire_seconds)); + assert(ret == 0); + + opt_val = HASH_ELIMINATE_ALGO_FIFO; + ret = MESA_htable_set_opt(htable, MHO_ELIMIMINATE_TYPE, &opt_val, sizeof(int)); + assert(ret == 0); + + ret = MESA_htable_set_opt(htable, MHO_CBFUN_DATA_FREE, (void *)htable_free_cb, sizeof(&htable_free_cb)); + assert(ret == 0); + + ret = MESA_htable_set_opt(htable, MHO_CBFUN_DATA_EXPIRE_NOTIFY, (void *)htable_timeout_cb, sizeof(&htable_timeout_cb)); + assert(ret == 0); + + ret = MESA_htable_mature(htable); + assert(ret == 0); + + return 0; +} + +static void ssh_whitelist_destory() +{ + if (htable) + MESA_htable_destroy(htable, NULL); +} + +static void ssh_whitelist_set(const char *srcip, const char *hostname, const char *port) +{ + long cb_ret = 0; + char *key = NULL; + char *val = NULL; + if (!ssh_global_ctx.ssh_enable) + return; + + asprintf(&key, "%s %s %s", srcip, hostname, port); + asprintf(&val, "%s %s %s", srcip, hostname, port); + MESA_htable_search_cb(htable, (const unsigned char *)key, strlen(key), htable_set_cb, val, &cb_ret); + if (cb_ret == KEY_EXIST) + { + TFE_LOG_DEBUG(g_default_logger, "KEY_EXIST : %s %s %s", srcip, hostname, port); + htable_free_cb(val); + } + else + { + TFE_LOG_DEBUG(g_default_logger, "KEY_ADDED : %s %s %s", srcip, hostname, port); + } + + free(key); +} + +int ssh_whitelist_get(const char *srcip, const char *hostname, const char *port) +{ + if (!ssh_global_ctx.ssh_enable) + return 0; + + long cb_ret = 0; + char *key = NULL; + asprintf(&key, "%s %s %s", srcip, hostname, port); + MESA_htable_search_cb(htable, (const unsigned char *)key, strlen(key), htable_get_cb, NULL, &cb_ret); + free(key); + if (cb_ret == KEY_EXIST) + { + TFE_LOG_DEBUG(g_default_logger, "KEY_FOUND : %s %s %s", srcip, hostname, port); + return 1; + } + else + { + TFE_LOG_DEBUG(g_default_logger, "KEY_NOFOUND : %s %s %s", srcip, hostname, port); + return 0; + } +} + +/////////////////////// ssh utils /////////////////////// + +static int ssh_verify_knownhost(ssh_session session) +{ + int rc; + int state; + char *hexa = NULL; + unsigned char *hash = NULL; + size_t hlen; + ssh_key srv_pubkey; + + rc = ssh_get_server_publickey(session, &srv_pubkey); + if (rc != SSH_OK) + { + return -1; + } + + rc = ssh_get_publickey_hash(srv_pubkey, SSH_PUBLICKEY_HASH_SHA1, &hash, &hlen); + ssh_key_free(srv_pubkey); + if (rc < 0) + { + return -1; + } + + hexa = ssh_get_hexa(hash, hlen); + if (hexa == NULL) + { + ssh_clean_pubkey_hash(&hash); + return -1; + } + + state = ssh_session_is_known_server(session); + switch (state) + { + case SSH_SERVER_KNOWN_OK: + TFE_LOG_INFO(g_default_logger, "ssh upstream verify knownhost: host key known, Public key hash: %s\n", hexa); + goto ok; + case SSH_SERVER_KNOWN_CHANGED: + TFE_LOG_INFO(g_default_logger, "ssh upstream verify knownhost: host key changed, Public key hash: %s", hexa); + goto ok; + case SSH_SERVER_FOUND_OTHER: + // may be attack + TFE_LOG_INFO(g_default_logger, "ssh upstream verify knownhost: host key not found but an other type of key exists, Public key hash: %s", hexa); + goto error; + case SSH_SERVER_FILE_NOT_FOUND: + /* fallback to SSH_SERVER_NOT_KNOWN behavior */ + case SSH_SERVER_NOT_KNOWN: + TFE_LOG_INFO(g_default_logger, "ssh upstream verify knownhost: host key not known, Public key hash: %s\n", hexa); + if (ssh_session_update_known_hosts(session) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream: error to write knownhost, %s\n", ssh_get_error(session)); + goto error; + } + else + { + goto ok; + } + case SSH_SERVER_ERROR: + TFE_LOG_ERROR(g_default_logger, "ssh upstream: error to verify is server known, %s\n", ssh_get_error(session)); + goto error; + } + +ok: + ssh_string_free_char(hexa); + ssh_clean_pubkey_hash(&hash); + return 0; +error: + ssh_string_free_char(hexa); + ssh_clean_pubkey_hash(&hash); + return -1; +} + +static ssh_channel on_channel_open_request_x11(ssh_session session, const char *originator_address, int originator_port, void *userdata); + +static int ssh_upstream_channel_readcb(socket_t fd, int revents, void *userdata) +{ + ssh_stream_ctx_t *ctx = (ssh_stream_ctx_t *)userdata; + int nread = -1; + int nwrite = 0; + int ret; + int total = 0; + char buffer[2048]; + + ssh_channel_set_blocking(ctx->base_downstream_channel, 1); + while (1) + { + memset(buffer, 0, sizeof(buffer)); + nread = ssh_channel_read_nonblocking(ctx->base_upstream_channel, buffer, sizeof(buffer), 0); + if (nread > 0) + { + nwrite = 0; + total += nread; + + if (ssh_global_ctx.ssh_verbosity == 5) + write(STDOUT_FILENO, buffer, nread); + + do + { + ret = ssh_channel_write(ctx->base_downstream_channel, buffer + nwrite, nread - nwrite); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to write, %s", ssh_get_error(ctx->downstream_session)); + ctx->is_downstream_error = 1; + ssh_channel_set_blocking(ctx->base_downstream_channel, 0); + return total; + } + else + { + nwrite += ret; + } + } while (nwrite != nread); + } + else + { + ssh_channel_set_blocking(ctx->base_downstream_channel, 0); + return total; + } + } +} + +/////////////////////// auth callback /////////////////////// + +// returns SSH_AUTH_SUCCESS Authentication is accepted. +// returns SSH_AUTH_PARTIAL Partial authentication, more authentication means are needed. +// returns SSH_AUTH_DENIED Authentication failed. + +static int auth_password_cb(ssh_session session, const char *user, const char *password, void *userdata) +{ + + TFE_LOG_INFO(g_default_logger, "ssh downstream authenticate use password, user:%s, password:%s", user, password); + + ssh_stream_ctx_t *ctx = (ssh_stream_ctx_t *)userdata; + ctx->auth_attempts++; + if (ctx->username == NULL) + ctx->username = strdup(user); + if (ssh_upstream_connect(ctx)) + { + ctx->is_upstream_error = 1; + return SSH_AUTH_DENIED; + } + + int rc = ssh_userauth_password(ctx->upstream_session, NULL, password); + if (rc == SSH_AUTH_SUCCESS) + { + ctx->password = strdup(password); + ctx->is_authenticated = 1; + ctx->is_unsupport_auth_method = 0; + return SSH_AUTH_SUCCESS; + } + else if (rc == SSH_AUTH_PARTIAL) + { + ctx->is_unsupport_auth_method = 1; + return SSH_AUTH_PARTIAL; + } + else if (rc == SSH_AUTH_ERROR) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to userauth password, %s", ssh_get_error(ctx->upstream_session)); + return SSH_AUTH_DENIED; + } + else + { + return SSH_AUTH_DENIED; + } +} + +static int auth_none_cb(ssh_session session, const char *user, void *userdata) +{ + TFE_LOG_INFO(g_default_logger, "ssh downstream authenticate use none, user:%s", user); + + ssh_stream_ctx_t *ctx = (ssh_stream_ctx_t *)userdata; + if (ctx->username == NULL) + ctx->username = strdup(user); + if (ssh_upstream_connect(ctx)) + { + ctx->is_upstream_error = 1; + return SSH_AUTH_DENIED; + } + + int rc = ssh_userauth_none(ctx->upstream_session, NULL); + if (rc == SSH_AUTH_SUCCESS) + { + ctx->is_authenticated = 1; + return SSH_AUTH_SUCCESS; + } + else if (rc == SSH_AUTH_ERROR) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to userauth none, %s", ssh_get_error(ctx->upstream_session)); + } + else + { + int ssh_upstream_support_auth_methods = ssh_userauth_list(ctx->upstream_session, NULL); + // if ssh_unpassword_auth_bypass set 0, Force users to use password authentication, otherwise ssh cannot be used. + if (!ssh_global_ctx.ssh_unpassword_auth_bypass && ssh_upstream_support_auth_methods & SSH_AUTH_METHOD_PASSWORD) + ssh_set_auth_methods(ctx->downstream_session, SSH_AUTH_METHOD_PASSWORD); + else + ssh_set_auth_methods(ctx->downstream_session, ssh_upstream_support_auth_methods); + } + + return SSH_AUTH_DENIED; +} + +static int auth_gssapi_mic_cb(ssh_session session, const char *user, const char *principal, void *userdata) +{ + TFE_LOG_INFO(g_default_logger, "ssh downstream authenticate use gssapi mic, user:%s, principal:%s", user, principal); + + ssh_stream_ctx_t *ctx = (ssh_stream_ctx_t *)userdata; + ctx->is_unsupport_auth_method = 1; + + return SSH_AUTH_DENIED; +} + +static int auth_pubkey_cb(ssh_session session, const char *user, struct ssh_key_struct *pubkey, char signature_state, void *userdata) +{ + TFE_LOG_INFO(g_default_logger, "ssh downstream authenticate use pubkey, user:%s", user); + + ssh_stream_ctx_t *ctx = (ssh_stream_ctx_t *)userdata; + ctx->is_unsupport_auth_method = 1; + + return SSH_AUTH_DENIED; +} + +/////////////////////// request callback /////////////////////// + +static int on_channel_data(ssh_session session, ssh_channel channel, void *data, uint32_t len, int is_stderr, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + // TFE_LOG_INFO(g_default_logger, "ssh sub channel on data[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + int nwrite = ssh_channel_write(wapper->peer->channel, data, len); + if (nwrite == SSH_ERROR) + return 0; + else + return nwrite; +} + +static void on_channel_eof(ssh_session session, ssh_channel channel, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on eof[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + + ssh_channel_send_eof(wapper->peer->channel); +} + +static void on_channel_close(ssh_session session, ssh_channel channel, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on close[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + + ssh_channel_close(wapper->channel); + ssh_channel_close(wapper->peer->channel); +} + +static void on_channel_signal(ssh_session session, ssh_channel channel, const char *signal, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on signal[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + + ssh_channel_request_send_signal(wapper->peer->channel, signal); +} + +static void on_channel_exit_status(ssh_session session, ssh_channel channel, int exit_status, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on exit status[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + + ssh_channel_request_send_exit_status(wapper->peer->channel, exit_status); +} + +static void on_channel_exit_signal(ssh_session session, ssh_channel channel, const char *signal, int core, const char *errmsg, const char *lang, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on exit signal[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + + ssh_channel_request_send_exit_signal(wapper->peer->channel, signal, core, errmsg, lang); +} + +static int on_channel_pty_request(ssh_session session, ssh_channel channel, const char *term, int width, int height, int pxwidth, int pwheight, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on pty request[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + + ssh_channel_set_blocking(wapper->peer->channel, 1); + if (ssh_channel_request_pty_size(wapper->peer->channel, term, width, height) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error on pty request: %s", ssh_get_error(wapper->ctx->upstream_session)); + ssh_channel_set_blocking(wapper->peer->channel, 0); + return -1; + } + ssh_channel_set_blocking(wapper->peer->channel, 0); + + return 0; +} + +static int on_channel_shell_request(ssh_session session, ssh_channel channel, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on shell request[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + + ssh_channel_set_blocking(wapper->peer->channel, 1); + if (ssh_channel_request_shell(wapper->peer->channel) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error on shell request: %s", ssh_get_error(wapper->ctx->upstream_session)); + ssh_channel_set_blocking(wapper->peer->channel, 0); + return 1; + } + ssh_channel_set_blocking(wapper->peer->channel, 0); + + return 0; +} + +static void on_channel_x11_req(ssh_session session, ssh_channel channel, int single_connection, const char *auth_protocol, const char *auth_cookie, uint32_t screen_number, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on x11 request[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + + ssh_channel_set_blocking(wapper->peer->channel, 1); + if (ssh_channel_request_x11(wapper->peer->channel, single_connection, auth_protocol, auth_cookie, screen_number) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error on x11 request: %s", ssh_get_error(wapper->ctx->upstream_session)); + ssh_channel_set_blocking(wapper->peer->channel, 0); + return; + } + ssh_channel_set_blocking(wapper->peer->channel, 0); + if (wapper->ctx->upstream_cb == NULL) + { + wapper->ctx->upstream_cb = (struct ssh_callbacks_struct *)calloc(1, sizeof(struct ssh_callbacks_struct)); + assert(wapper->ctx->upstream_cb); + } + wapper->ctx->upstream_cb->userdata = wapper; + wapper->ctx->upstream_cb->channel_open_request_x11_function = on_channel_open_request_x11; + ssh_callbacks_init(wapper->ctx->upstream_cb); + ssh_set_callbacks(wapper->ctx->upstream_session, wapper->ctx->upstream_cb); +} + +static int on_channel_pty_window_change(ssh_session session, ssh_channel channel, int width, int height, int pxwidth, int pwheight, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on pty window change[%s] %p -> %p", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel); + + ssh_channel_set_blocking(wapper->peer->channel, 1); + if (ssh_channel_change_pty_size(wapper->peer->channel, width, height) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error on pty window change: %s", ssh_get_error(wapper->ctx->upstream_session)); + ssh_channel_set_blocking(wapper->peer->channel, 0); + return -1; + } + ssh_channel_set_blocking(wapper->peer->channel, 0); + + return 0; +} + +static int on_channel_exec_request(ssh_session session, ssh_channel channel, const char *command, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on exec request[%s] %p -> %p, %s", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel, command); + + ssh_channel_set_blocking(wapper->peer->channel, 1); + if (ssh_channel_request_exec(wapper->peer->channel, command) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error on exec request: %s", ssh_get_error(wapper->ctx->upstream_session)); + ssh_channel_set_blocking(wapper->peer->channel, 0); + return 1; + } + ssh_channel_set_blocking(wapper->peer->channel, 0); + + return 0; +} + +static int on_channel_env_request(ssh_session session, ssh_channel channel, const char *env_name, const char *env_value, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on env request[%s] %p -> %p, %s:%s", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel, env_name, env_value); + + ssh_channel_set_blocking(wapper->peer->channel, 1); + if (ssh_channel_request_env(wapper->peer->channel, env_name, env_value) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to env request: %s", ssh_get_error(wapper->ctx->upstream_session)); + ssh_channel_set_blocking(wapper->peer->channel, 0); + return 1; + } + ssh_channel_set_blocking(wapper->peer->channel, 0); + + return 0; +} + +static int on_channel_subsystem_request(ssh_session session, ssh_channel channel, const char *subsystem, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + TFE_LOG_INFO(g_default_logger, "ssh sub channel on subsystem request[%s] %p -> %p, %s", get_direct_str_by_num(wapper->direct), wapper->channel, wapper->peer->channel, subsystem); + + if (wapper->ctx->is_upstream_subsystem_enable) + return 0; + ssh_channel_set_blocking(wapper->peer->channel, 1); + if (ssh_channel_request_subsystem(wapper->peer->channel, subsystem) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error on subsystem request: %s", ssh_get_error(wapper->ctx->upstream_session)); + ssh_channel_set_blocking(wapper->peer->channel, 0); + return 1; + } + ssh_channel_set_blocking(wapper->peer->channel, 0); + wapper->ctx->is_upstream_subsystem_enable = 1; + + return 0; +} + +static ssh_channel on_channel_open_request_x11(ssh_session session, const char *originator_address, int originator_port, void *userdata) +{ + ssh_channel_wapper_t *wapper = (ssh_channel_wapper_t *)userdata; + ssh_stream_ctx_t *ctx = wapper->ctx; + TFE_LOG_INFO(g_default_logger, "ssh upstream channel open req x11: %s:%d, count:%d", originator_address, originator_port, ctx->channel_index); + + if (ctx->channel_index >= MAX_CHANNES - 1) + { + TFE_LOG_ERROR(g_default_logger, "ssh sub channels too much: %d", ctx->channel_index); + return NULL; + } + + ssh_channel_pair_t *pairt = (ssh_channel_pair_t *)calloc(1, sizeof(ssh_channel_pair_t)); + ctx->channel_pair_arry[ctx->channel_index++] = pairt; + + pairt->up_stream.channel = ssh_channel_new(ctx->upstream_session); + if (ssh_channel_open_x11(pairt->up_stream.channel, originator_address, originator_port) == SSH_ERROR) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error on open x11: %s", ssh_get_error(ctx->upstream_session)); + return NULL; + } + pairt->down_stream.channel = ssh_channel_new(ctx->downstream_session); + if (ssh_channel_open_x11(pairt->down_stream.channel, originator_address, originator_port) == SSH_ERROR) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error on open x11: %s", ssh_get_error(ctx->downstream_session)); + return NULL; + } + ssh_channel_set_blocking(pairt->up_stream.channel, 0); + ssh_channel_set_blocking(pairt->down_stream.channel, 0); + pairt->up_stream.peer = &pairt->down_stream; + pairt->down_stream.peer = &pairt->up_stream; + + pairt->up_stream.ctx = ctx; + pairt->down_stream.ctx = ctx; + + pairt->up_stream.direct = UPSTREAM; + pairt->down_stream.direct = DOWNSTREAM; + + TFE_LOG_INFO(g_default_logger, "ssh sub channel on open[%s] %p - [%s] %p, %s", + get_direct_str_by_num(pairt->down_stream.direct), pairt->down_stream.channel, + get_direct_str_by_num(pairt->up_stream.direct), pairt->up_stream.channel, ctx->src_dst_addr); + + pairt->up_stream.cb.userdata = &pairt->up_stream; + pairt->down_stream.cb.userdata = &pairt->down_stream; + pairt->up_stream.cb.channel_data_function = pairt->down_stream.cb.channel_data_function = on_channel_data; + pairt->up_stream.cb.channel_eof_function = pairt->down_stream.cb.channel_eof_function = on_channel_eof; + pairt->up_stream.cb.channel_close_function = pairt->down_stream.cb.channel_close_function = on_channel_close; + pairt->up_stream.cb.channel_signal_function = pairt->down_stream.cb.channel_signal_function = on_channel_signal; + pairt->up_stream.cb.channel_exit_status_function = pairt->down_stream.cb.channel_exit_status_function = on_channel_exit_status; + pairt->up_stream.cb.channel_exit_signal_function = pairt->down_stream.cb.channel_exit_signal_function = on_channel_exit_signal; + + ssh_callbacks_init(&pairt->up_stream.cb); + ssh_callbacks_init(&pairt->down_stream.cb); + ssh_set_channel_callbacks(pairt->up_stream.channel, &pairt->up_stream.cb); + ssh_set_channel_callbacks(pairt->down_stream.channel, &pairt->down_stream.cb); + + if (ssh_event_add_session(ctx->mainloop, ctx->upstream_session) == SSH_ERROR) + { + ctx->is_upstream_error = 1; + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to add session to event"); + return NULL; + } + if (ssh_event_add_session(ctx->mainloop, ctx->downstream_session) == SSH_ERROR) + { + ctx->is_downstream_error = 1; + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to add session to event"); + return NULL; + } + + return pairt->up_stream.channel; +} + +/////////////////////// channel callback /////////////////////// + +// returns a valid ssh_channel handle if the request is to be allowed +// returns NULL if the request should not be allowed +static ssh_channel channel_open_cb(ssh_session session, void *userdata) +{ + TFE_LOG_INFO(g_default_logger, "ssh downstream channel open"); + ssh_stream_ctx_t *ctx = (ssh_stream_ctx_t *)userdata; + + if (ctx->base_downstream_channel) + return ctx->base_downstream_channel; + + if (ctx->channel_index >= MAX_CHANNES - 1) + { + TFE_LOG_ERROR(g_default_logger, "ssh sub channels too much: %d", ctx->channel_index); + return NULL; + } + + ssh_channel_pair_t *pairt = (ssh_channel_pair_t *)calloc(1, sizeof(ssh_channel_pair_t)); + ctx->channel_pair_arry[ctx->channel_index++] = pairt; + + pairt->up_stream.channel = ctx->base_upstream_channel = ssh_channel_new(ctx->upstream_session); + pairt->down_stream.channel = ctx->base_downstream_channel = ssh_channel_new(ctx->downstream_session); + + if (ssh_channel_open_session(pairt->up_stream.channel) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error on open session: %s", ssh_get_error(ctx->upstream_session)); + return NULL; + } + ctx->is_upstream_session_open = 1; + + ssh_channel_set_blocking(pairt->up_stream.channel, 0); + ssh_channel_set_blocking(pairt->down_stream.channel, 0); + + pairt->up_stream.ctx = ctx; + pairt->down_stream.ctx = ctx; + + pairt->up_stream.direct = UPSTREAM; + pairt->down_stream.direct = DOWNSTREAM; + + pairt->up_stream.peer = &pairt->down_stream; + pairt->down_stream.peer = &pairt->up_stream; + + pairt->up_stream.cb.userdata = &pairt->up_stream; + pairt->down_stream.cb.userdata = &pairt->down_stream; + pairt->down_stream.cb.channel_data_function = on_channel_data; // OK + + //pairt->up_stream.cb.channel_data_function = on_channel_data; + + pairt->up_stream.cb.channel_eof_function = pairt->down_stream.cb.channel_eof_function = on_channel_eof; // OK + pairt->up_stream.cb.channel_close_function = pairt->down_stream.cb.channel_close_function = on_channel_close; // OK + pairt->up_stream.cb.channel_signal_function = pairt->down_stream.cb.channel_signal_function = on_channel_signal; + pairt->up_stream.cb.channel_exit_status_function = pairt->down_stream.cb.channel_exit_status_function = on_channel_exit_status; // OK + pairt->up_stream.cb.channel_exit_signal_function = pairt->down_stream.cb.channel_exit_signal_function = on_channel_exit_signal; + + pairt->down_stream.cb.channel_pty_request_function = on_channel_pty_request; // OK + pairt->down_stream.cb.channel_shell_request_function = on_channel_shell_request; // OK + pairt->down_stream.cb.channel_x11_req_function = on_channel_x11_req; // OK + pairt->down_stream.cb.channel_pty_window_change_function = on_channel_pty_window_change; // OK + pairt->down_stream.cb.channel_exec_request_function = on_channel_exec_request; // OK + pairt->down_stream.cb.channel_env_request_function = on_channel_env_request; // OK + pairt->down_stream.cb.channel_subsystem_request_function = on_channel_subsystem_request; // OK + + TFE_LOG_INFO(g_default_logger, "ssh base channel on open[%s] %p - [%s] %p, %s", + get_direct_str_by_num(pairt->down_stream.direct), pairt->down_stream.channel, + get_direct_str_by_num(pairt->up_stream.direct), pairt->up_stream.channel, ctx->src_dst_addr); + + ssh_callbacks_init(&pairt->up_stream.cb); + ssh_callbacks_init(&pairt->down_stream.cb); + ssh_set_channel_callbacks(pairt->up_stream.channel, &pairt->up_stream.cb); + ssh_set_channel_callbacks(pairt->down_stream.channel, &pairt->down_stream.cb); + + if (ssh_event_add_session(ctx->mainloop, ctx->upstream_session) == SSH_ERROR) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to add session to event"); + } + if (ssh_event_add_session(ctx->mainloop, ctx->downstream_session) == SSH_ERROR) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to add session to event"); + } + + return pairt->down_stream.channel; +} + +/////////////////////////////////////////////////////////////// + +static void ssh_kafka_logger_sender(const char *username, const char *password, const char *hostname, const char *port) +{ + cJSON *obj = NULL; + cJSON *dup = NULL; + char *msg = NULL; + if (!ssh_kafka_logger || ssh_kafka_logger->enable == 0) + { + return; + } + + obj = cJSON_CreateObject(); + cJSON_AddStringToObject(obj, "username", username); + cJSON_AddStringToObject(obj, "password", password); + cJSON_AddStringToObject(obj, "hostname", hostname); + cJSON_AddStringToObject(obj, "port", port); + cJSON_AddStringToObject(obj, "sender", ssh_kafka_logger->local_ip_str); + dup = cJSON_Duplicate(obj, 1); + msg = cJSON_PrintUnformatted(dup); + tfe_kafka_logger_send(ssh_kafka_logger, msg, strlen(msg)); + free(msg); + cJSON_Delete(dup); + cJSON_Delete(obj); +} + +// return 0 on success, +// return -1 if an error occured +int ssh_global_init(const char *profile, const char *section) +{ + int enable_kafka = 0; + char nic_name[64] = {0}; + char broker_list[TFE_SYMBOL_MAX] = {0}; + char topic_name[TFE_SYMBOL_MAX] = {0}; + int slot_size = 0; + int max_element_num = 0; + int expire_seconds = 0; + + MESA_load_profile_int_def(profile, section, "ssh_enable", &(ssh_global_ctx.ssh_enable), 0); + if (!ssh_global_ctx.ssh_enable) + return 0; + if (ssh_init() == -1) + { + TFE_LOG_INFO(g_default_logger, "Error to init ssh"); + return -1; + } + + MESA_load_profile_int_def(profile, section, "ssh_max_thread_num", &(ssh_global_ctx.ssh_max_thread_num), 24); + MESA_load_profile_int_def(profile, section, "ssh_verbosity", &(ssh_global_ctx.ssh_verbosity), 0); + MESA_load_profile_int_def(profile, section, "ssh_unpassword_auth_bypass", &(ssh_global_ctx.ssh_unpassword_auth_bypass), 0); + MESA_load_profile_int_def(profile, section, "ssh_connect_timeout_sec", &(ssh_global_ctx.ssh_connect_timeout_sec), 10); + MESA_load_profile_string_def(profile, section, "host_rsa_key_path", ssh_global_ctx.host_rsa_key_path, sizeof(ssh_global_ctx.host_rsa_key_path), "/etc/ssh/ssh_host_rsa_key"); + MESA_load_profile_string_def(profile, section, "host_ecdsa_key_path", ssh_global_ctx.host_ecdsa_key_path, sizeof(ssh_global_ctx.host_ecdsa_key_path), "/etc/ssh/ssh_host_ecdsa_key"); + MESA_load_profile_string_def(profile, section, "host_dsa_key_path", ssh_global_ctx.host_dsa_key_path, sizeof(ssh_global_ctx.host_dsa_key_path), "/etc/ssh/ssh_host_dsa_key"); + MESA_load_profile_string_def(profile, section, "known_hosts_path", ssh_global_ctx.known_hosts_path, sizeof(ssh_global_ctx.known_hosts_path), "/etc/ssh/ssh_known_hosts"); + + MESA_load_profile_int_def(profile, section, "ssh_htable_slot_size", &slot_size, 4 * 1024 * 1024); + MESA_load_profile_int_def(profile, section, "ssh_htable_max_element_num", &max_element_num, 4 * 4 * 1024 * 1024); + MESA_load_profile_int_def(profile, section, "ssh_htable_expire_seconds", &expire_seconds, 30 * 60); + ssh_whitelist_create(slot_size, max_element_num, expire_seconds); + + MESA_load_profile_int_def(profile, section, "ssh_kafka_enable", &enable_kafka, 0); + MESA_load_profile_string_def(profile, section, "ssh_nic_name", nic_name, sizeof(nic_name), "eth0"); + MESA_load_profile_string_def(profile, section, "ssh_kafka_topic", topic_name, sizeof(topic_name), "PXY-SSH-PASSWORD"); + if (!enable_kafka) // is disable,skip broker list + goto skip; + if (MESA_load_profile_string_def(profile, section, "ssh_kafka_brokerlist", broker_list, sizeof(broker_list), NULL) < 0) + { + TFE_LOG_ERROR(g_default_logger, "Error to get \"ssh_kafka_brokerlist\" in profile \"%s\" section \"%s\".", profile, section); + return -1; + } +skip: + ssh_kafka_logger = tfe_kafka_logger_create(enable_kafka, nic_name, broker_list, topic_name, g_default_logger); + if (ssh_kafka_logger) + return 0; + else + return -1; +} + +void ssh_global_finalize() +{ + if (!ssh_global_ctx.ssh_enable) + return; + + ssh_finalize(); + ssh_whitelist_destory(); + tfe_kafka_logger_destroy(ssh_kafka_logger); +} + +// return 0 on success, +// return -1 if an error occured +// need set ctx->fd_downstream +int ssh_downstream_connect(ssh_stream_ctx_t *ctx) +{ + if (ctx == NULL || ctx->fd_downstream < 0) + return -1; + + // downstream_session created, don`t recreate + if (ctx->downstream_session) + return 0; + + ctx->bind = ssh_bind_new(); + if (ctx->bind == NULL) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to create ssh bind"); + return -1; + } + + if (ssh_bind_options_set(ctx->bind, SSH_BIND_OPTIONS_DSAKEY, ssh_global_ctx.host_dsa_key_path)) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to set bind options \"SSH_BIND_OPTIONS_DSAKEY\", %s", ssh_get_error(ctx->bind)); + ssh_bind_free(ctx->bind); + ctx->bind = NULL; + return -1; + } + + if (ssh_bind_options_set(ctx->bind, SSH_BIND_OPTIONS_RSAKEY, ssh_global_ctx.host_rsa_key_path)) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to set bind options \"SSH_BIND_OPTIONS_RSAKEY\", %s", ssh_get_error(ctx->bind)); + ssh_bind_free(ctx->bind); + ctx->bind = NULL; + return -1; + } + + if (ssh_bind_options_set(ctx->bind, SSH_BIND_OPTIONS_ECDSAKEY, ssh_global_ctx.host_ecdsa_key_path)) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to set bind options \"SSH_BIND_OPTIONS_ECDSAKEY\", %s", ssh_get_error(ctx->bind)); + ssh_bind_free(ctx->bind); + ctx->bind = NULL; + return -1; + } + + if (ssh_bind_options_set(ctx->bind, SSH_BIND_OPTIONS_LOG_VERBOSITY, &(ssh_global_ctx.ssh_verbosity))) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to set bind options \"SSH_BIND_OPTIONS_LOG_VERBOSITY\", %s", ssh_get_error(ctx->bind)); + ssh_bind_free(ctx->bind); + ctx->bind = NULL; + return -1; + } + + ctx->downstream_session = ssh_new(); + if (ctx->downstream_session == NULL) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to create ssh session"); + ssh_bind_free(ctx->bind); + ctx->bind = NULL; + return -1; + } + + if (ssh_options_set(ctx->downstream_session, SSH_OPTIONS_TIMEOUT, &(ssh_global_ctx.ssh_connect_timeout_sec))) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to set options \"SSH_OPTIONS_TIMEOUT\", %s", ssh_get_error(ctx->downstream_session)); + ssh_bind_free(ctx->bind); + ctx->bind = NULL; + ssh_free(ctx->downstream_session); + ctx->downstream_session = NULL; + return -1; + } + + if (ssh_bind_accept_fd(ctx->bind, ctx->downstream_session, ctx->fd_downstream) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh downstream, error to bind accept fd, %s", ssh_get_error(ctx->bind)); + ssh_bind_free(ctx->bind); + ctx->bind = NULL; + ssh_free(ctx->downstream_session); + ctx->downstream_session = NULL; + return -1; + } + + TFE_LOG_DEBUG(g_default_logger, "ssh downstream connect [done]\n\n"); + + return 0; +} + +// return 0 on success +// return -1 if an error occured +// need set ctx->hostname | ctx->port | ctx->user | ctx->fd_upstream +int ssh_upstream_connect(ssh_stream_ctx_t *ctx) +{ + if (ctx == NULL || ctx->fd_upstream < 0) + return -1; + + // upstream_session created, don`t recreate + if (ctx->upstream_session) + return 0; + + ctx->upstream_session = ssh_new(); + if (ctx->upstream_session == NULL) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to create ssh session"); + return -1; + } + + if (ssh_options_set(ctx->upstream_session, SSH_OPTIONS_TIMEOUT, &(ssh_global_ctx.ssh_connect_timeout_sec))) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to set options \"SSH_OPTIONS_TIMEOUT\", %s", ssh_get_error(ctx->upstream_session)); + goto error; + } + + if (ssh_options_set(ctx->upstream_session, SSH_OPTIONS_KNOWNHOSTS, ssh_global_ctx.known_hosts_path)) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to set options \"SSH_OPTIONS_KNOWNHOSTS\", %s", ssh_get_error(ctx->upstream_session)); + goto error; + } + + if (ssh_options_set(ctx->upstream_session, SSH_OPTIONS_HOST, ctx->hostname)) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to set options \"SSH_OPTIONS_HOST\", %s", ssh_get_error(ctx->upstream_session)); + goto error; + } + + if (ssh_options_set(ctx->upstream_session, SSH_OPTIONS_PORT_STR, ctx->port)) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to set options \"SSH_OPTIONS_PORT_STR\", %s", ssh_get_error(ctx->upstream_session)); + goto error; + } + + if (ssh_options_set(ctx->upstream_session, SSH_OPTIONS_USER, ctx->username)) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to set options \"SSH_OPTIONS_USER\", %s", ssh_get_error(ctx->upstream_session)); + goto error; + } + + if (ssh_options_set(ctx->upstream_session, SSH_OPTIONS_LOG_VERBOSITY, &(ssh_global_ctx.ssh_verbosity))) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to set options \"SSH_OPTIONS_LOG_VERBOSITY\", %s", ssh_get_error(ctx->upstream_session)); + goto error; + } + + if (ssh_options_set(ctx->upstream_session, SSH_OPTIONS_FD, &(ctx->fd_upstream))) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to set options \"SSH_OPTIONS_FD\", %s", ssh_get_error(ctx->upstream_session)); + goto error; + } + + if (ssh_connect(ctx->upstream_session) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream, error to create ssh connect, %s", ssh_get_error(ctx->upstream_session)); + goto error; + } + /* + if (ssh_verify_knownhost(ctx->upstream_session)) + { + TFE_LOG_ERROR(g_default_logger, "ssh upstream: error to verify is server known, %s\n", ssh_get_error(ctx->upstream_session)); + ssh_disconnect(ctx->upstream_session); + goto error; + } + */ + + TFE_LOG_DEBUG(g_default_logger, "ssh upstream connect [done]\n\n"); + ctx->is_upstream_connected = 1; + + return 0; +error: + ssh_free(ctx->upstream_session); + ctx->upstream_session = NULL; + return -1; +} + +void ssh_proxy_main(ssh_stream_ctx_t *ctx) +{ + struct ssh_server_callbacks_struct downstream_server_cb = {0}; + downstream_server_cb.userdata = ctx; + downstream_server_cb.auth_none_function = auth_none_cb; + downstream_server_cb.auth_pubkey_function = auth_pubkey_cb; + downstream_server_cb.auth_password_function = auth_password_cb; + downstream_server_cb.auth_gssapi_mic_function = auth_gssapi_mic_cb; + downstream_server_cb.channel_open_request_session_function = channel_open_cb; + //downstream_server_cb.service_request_function + //downstream_server_cb.gssapi_select_oid_function + //downstream_server_cb.gssapi_accept_sec_ctx_function + //downstream_server_cb.gssapi_verify_mic_function + + ssh_callbacks_init(&downstream_server_cb); + ssh_set_server_callbacks(ctx->downstream_session, &downstream_server_cb); + + if (ssh_handle_key_exchange(ctx->downstream_session) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssh stream %s error on key exchange, downstream: '%s', upstream: '%s'", ctx->src_dst_addr, + ctx->downstream_session ? ssh_get_error(ctx->downstream_session) : "NULL", + ctx->upstream_session ? ssh_get_error(ctx->upstream_session) : "NULL"); + return; + } + ctx->is_downstream_connected = 1; + + ssh_set_auth_methods(ctx->downstream_session, SSH_AUTH_METHOD_PASSWORD); + + ctx->mainloop = ssh_event_new(); + if (ctx->mainloop == NULL) + { + TFE_LOG_ERROR(g_default_logger, "ssh stream %s error on new event, downstream: '%s', upstream: '%s'", ctx->src_dst_addr, + ctx->downstream_session ? ssh_get_error(ctx->downstream_session) : "NULL", + ctx->upstream_session ? ssh_get_error(ctx->upstream_session) : "NULL"); + return; + } + if (ssh_event_add_session(ctx->mainloop, ctx->downstream_session) == SSH_ERROR) + { + TFE_LOG_ERROR(g_default_logger, "ssh stream %s error on add session to event, downstream: '%s', upstream: '%s'", ctx->src_dst_addr, + ctx->downstream_session ? ssh_get_error(ctx->downstream_session) : "NULL", + ctx->upstream_session ? ssh_get_error(ctx->upstream_session) : "NULL"); + return; + } + + int n = 0; + while (ctx->is_authenticated == 0 || ctx->base_downstream_channel == NULL) + { + if (ctx->auth_attempts >= 3 || n >= 1000) + { + TFE_LOG_INFO(g_default_logger, "ssh stream %s auth timeout", ctx->src_dst_addr); + return; + } + + if (ssh_event_dopoll(ctx->mainloop, 100) == SSH_ERROR) + { + TFE_LOG_ERROR(g_default_logger, "ssh stream %s break on event poll, downstream: '%s', upstream: '%s'", ctx->src_dst_addr, + ctx->downstream_session ? ssh_get_error(ctx->downstream_session) : "NULL", + ctx->upstream_session ? ssh_get_error(ctx->upstream_session) : "NULL"); + return; + } + n++; + } + + TFE_LOG_INFO(g_default_logger, "ssh stream %s auth success, user:%s, password:%s", ctx->src_dst_addr, ctx->username, ctx->password); + ssh_kafka_logger_sender(ctx->username, ctx->password, ctx->hostname, ctx->port); + + do + { + if (ssh_event_dopoll(ctx->mainloop, -1) == SSH_ERROR) + { + TFE_LOG_ERROR(g_default_logger, "ssh stream %s break on event poll, downstream: '%s', upstream: '%s'", ctx->src_dst_addr, + ctx->downstream_session ? ssh_get_error(ctx->downstream_session) : "NULL", + ctx->upstream_session ? ssh_get_error(ctx->upstream_session) : "NULL"); + return; + } + if (ctx->is_upstream_session_open == 1) + { + if (ssh_event_add_fd(ctx->mainloop, ctx->fd_upstream, POLLIN, ssh_upstream_channel_readcb, ctx) != SSH_OK) + { + TFE_LOG_ERROR(g_default_logger, "ssl stream %s error on add fd to event, downstream: '%s', upstream: '%s'", ctx->src_dst_addr, + ctx->downstream_session ? ssh_get_error(ctx->downstream_session) : "NULL", + ctx->upstream_session ? ssh_get_error(ctx->upstream_session) : "NULL"); + return; + } + ctx->is_upstream_session_open = 2; + } + } while (ssh_channel_is_open(ctx->base_downstream_channel) && !ctx->is_downstream_error && !ctx->is_upstream_error); +} + +void *ssh_proxy_thread_entry(void *args) +{ + const char *sip = NULL; + const char *dip = NULL; + const char *sport = NULL; + const char *dport = NULL; + ssh_stream_ctx_t *ctx = NULL; + struct tfe_stream_private *_stream = NULL; + + if (!ssh_global_ctx.ssh_enable) + return 0; + + _stream = (struct tfe_stream_private *)args; + assert(_stream != NULL && _stream->thread == pthread_self() && _stream->session_type == STREAM_PROTO_SSH); + + char *temp = strdup(_stream->str_stream_addr); + tfe_stream_addr_str_split(temp, &sip, &sport, &dip, &dport); + assert(sip && dip && dport); + TFE_LOG_INFO(g_default_logger, "ssh stream %s on thread %ld running", _stream->str_stream_addr, _stream->thread); + + ctx = (ssh_stream_ctx_t *)calloc(1, sizeof(ssh_stream_ctx_t)); + assert(ctx); + ctx->src_dst_addr = strdup(_stream->str_stream_addr); + ctx->port = strdup(dport); + ctx->hostname = strdup(dip); + ctx->srcip = strdup(sip); + free(temp); + ctx->fd_upstream = _stream->defer_fd_upstream; + ctx->fd_downstream = _stream->defer_fd_downstream; + + if (ssh_downstream_connect(ctx)) + goto error; + + ssh_proxy_main(ctx); + + if (ctx && ctx->is_unsupport_auth_method && ssh_global_ctx.ssh_unpassword_auth_bypass) + { + TFE_LOG_INFO(g_default_logger, "ssh dynamic bypass enable, stream %s need bypass", ctx->src_dst_addr); + ssh_whitelist_set(ctx->srcip, ctx->hostname, ctx->port); + } + +error: + if (ctx) + { + if (ctx->bind) + ssh_bind_free(ctx->bind); + + if (ctx->channel_index) + { + for (int i = 0; i < ctx->channel_index; i++) + { + ssh_channel_free(ctx->channel_pair_arry[i]->down_stream.channel); + ssh_channel_free(ctx->channel_pair_arry[i]->up_stream.channel); + + free(ctx->channel_pair_arry[i]); + } + } + + if (ctx->downstream_session) + { + if (ctx->is_downstream_connected) + ssh_disconnect(ctx->downstream_session); + ssh_free(ctx->downstream_session); + } + if (ctx->upstream_session) + { + if (ctx->is_upstream_connected) + ssh_disconnect(ctx->upstream_session); + ssh_free(ctx->upstream_session); + } + + if (ctx->mainloop) + ssh_event_free(ctx->mainloop); + + if (ctx->srcip) + free(ctx->srcip); + if (ctx->hostname) + free(ctx->hostname); + if (ctx->port) + free(ctx->port); + + if (ctx->username) + free(ctx->username); + if (ctx->password) + free(ctx->password); + + if (ctx->fd_downstream) + close(ctx->fd_downstream); + if (ctx->fd_upstream) + close(ctx->fd_upstream); + + if (ctx->upstream_cb) + free(ctx->upstream_cb); + if (ctx->src_dst_addr) + free(ctx->src_dst_addr); + + free(ctx); + } + + TFE_LOG_INFO(g_default_logger, "ssh stream %s on thread %ld exited", _stream->str_stream_addr, _stream->thread); + ATOMIC_DEC(&g_default_proxy->stat_val[STAT_STREAM_TCP_SSH]); + + return NULL; +}
\ No newline at end of file diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index a8691fd..9dfe3da 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -26,6 +26,7 @@ #include <tfe_future.h> #include <tfe_plugin.h> #include <tfe_proxy.h> +#include <ssh_stream.h> #include <platform.h> #include <ssl_stream.h> @@ -1349,8 +1350,41 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); stream->str_stream_info = _stream->str_stream_addr; + if (_stream->session_type == STREAM_PROTO_SSH) + { + const char *sip = NULL; + const char *dip = NULL; + const char *dport = NULL; + char *temp = strdup(_stream->str_stream_addr); + tfe_stream_addr_str_split(temp, &sip, NULL, &dip, &dport); + if (!ssh_global_ctx.ssh_enable || // ssh proxy not enable, passthough + (ssh_global_ctx.ssh_unpassword_auth_bypass && ssh_whitelist_get(sip, dip, dport)) || // ssh proxy enable, unsupport auth method, passthough + ATOMIC_READ(&g_default_proxy->stat_val[STAT_STREAM_TCP_SSH]) >= ssh_global_ctx.ssh_max_thread_num) // ssh proxy enable, thread num too much, passthough + { + TFE_LOG_INFO(_stream->stream_logger, "ssh stream passthough: %s", _stream->str_stream_addr); + _stream->tcp_passthough = 1; + free(temp); + goto passthough; + } + free(temp); + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + /* Create a thread to process ssh traffic in blocking mode */ + int ret = pthread_create(&_stream->thread, &attr, ssh_proxy_thread_entry, (void *)_stream); + if (unlikely(ret < 0)) + { + TFE_LOG_ERROR(_stream->stream_logger, "Failed to create thread to process ssh traffic: %s", strerror(errno)); + goto __errout; + } + + TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSH, 1); + } + if (_stream->session_type == STREAM_PROTO_PLAIN) { +passthough: _stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream); if (_stream->conn_downstream != NULL) { diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index aa0b2f2..f9b71d4 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -13,6 +13,7 @@ ExternalProject_Add(OpenSSL PREFIX openssl BUILD_IN_SOURCE 1) ExternalProject_Get_Property(OpenSSL INSTALL_DIR) +set(OPENSSL_INSTSLL_DIR ${INSTALL_DIR}) set(OPENSSL_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include) set(OPENSSL_LINK_DIRECTORIES ${INSTALL_DIR}/lib) set(OPENSSL_PKGCONFIG_PATH ${INSTALL_DIR}/lib/pkgconfig/) @@ -28,6 +29,25 @@ add_dependencies(openssl-ssl-static OpenSSL) set_property(TARGET openssl-ssl-static PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib/libssl.a) set_property(TARGET openssl-ssl-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include) +### libssh +ExternalProject_Add(libssh PREFIX libssh + URL ${CMAKE_CURRENT_SOURCE_DIR}/libssh-0.9.3.tar.gz + URL_MD5 962c71808bc4d5fcc35411b0f91641f2 + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=<INSTALL_DIR> + -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + -DBUILD_SHARED_LIBS=OFF + -DWITH_GSSAPI=OFF + -DWITH_EXAMPLES=OFF + -DOPENSSL_ROOT_DIR=${OPENSSL_INSTSLL_DIR} + DEPENDS OpenSSL) +ExternalProject_Get_Property(libssh INSTALL_DIR) +file(MAKE_DIRECTORY ${INSTALL_DIR}/include) + +add_library(libssh-static STATIC IMPORTED GLOBAL) +add_dependencies(libssh-static libssh) +set_property(TARGET libssh-static PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib64/libssh.a) +set_property(TARGET libssh-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include) + ### Libevent 2.1.8 ExternalProject_Add(libevent PREFIX libevent URL ${CMAKE_CURRENT_SOURCE_DIR}/libevent-2.1.8-stable.tar.gz diff --git a/vendor/libssh-0.9.3.tar.gz b/vendor/libssh-0.9.3.tar.gz Binary files differnew file mode 100644 index 0000000..5095fb3 --- /dev/null +++ b/vendor/libssh-0.9.3.tar.gz |
