path: root/infra/monitor/monitor_rpc.c
#include <stddef.h>
#include <stdlib.h>  /* calloc, free */
#include <sys/uio.h> /* struct iovec */
#include <unistd.h>  /* usleep */
#include "stellar/monitor.h"
#include "monitor_private.h"
#include "monitor_rpc.h"
#include "stellar/module.h"

#define RPC_WORKER_THREAD_BUSY 1
#define RPC_WORKER_THREAD_IDLE 0

struct monitor_rpc_msg_hdr
{
    unsigned int type;
    unsigned int length; // total message length, including this header: payload length + sizeof(struct monitor_rpc_msg_hdr)
    char value[0];       // variable-length payload, laid out contiguously after the header
} __attribute__((packed));
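
/*
 * Illustrative sketch (not part of the build): composing a variable-length
 * message with this header. The message type value and the "ping" payload
 * are made up for the example.
 *
 *   size_t payload_len = 5; // strlen("ping") + 1
 *   struct monitor_rpc_msg_hdr *msg = malloc(sizeof(*msg) + payload_len);
 *   msg->type = 1;                            // hypothetical message type
 *   msg->length = sizeof(*msg) + payload_len; // header + payload
 *   memcpy(msg->value, "ping", payload_len);
 */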

enum monitor_rpc_ringbuf_dir
{ // full duplex; dir 0: worker thread to monitor thread, dir 1: monitor thread to worker thread
    RPC_RINGBUF_DIR_W2M = 0,
    RPC_RINGBUF_DIR_M2W = 1,
    RPC_RINGBUF_DIR_MAX = 2,
};

struct monitor_rpc
{
    volatile long atomic_val;

    monitor_rpc_callabck *rpc_cb;
    void *rpc_args;
    struct iovec rpc_request;
    struct iovec rpc_response;
};

struct iovec stm_rpc_call(struct monitor_rpc *rpc_ins, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args)
{
    // OR-with-0 is an atomic read of atomic_val.
    while (__sync_or_and_fetch(&rpc_ins->atomic_val, 0) == RPC_WORKER_THREAD_BUSY)
    {
        // wait for the previous rpc to finish; concurrent rpc is not supported yet!
        usleep(1000);
    }
    rpc_ins->rpc_cb = cb;
    rpc_ins->rpc_args = user_args;
    rpc_ins->rpc_request = rpc_request;
    // publish the request: mark the worker busy (the full barrier orders the stores above)
    __sync_fetch_and_or(&rpc_ins->atomic_val, RPC_WORKER_THREAD_BUSY);

    while (__sync_or_and_fetch(&rpc_ins->atomic_val, 0) == RPC_WORKER_THREAD_BUSY)
    {
        // wait for the rpc response...
        usleep(1000);
    }
    return rpc_ins->rpc_response;
}

int stm_rpc_exec(int thread_idx, struct monitor_rpc *rpc_ins)
{
    // OR-with-0 (RPC_WORKER_THREAD_IDLE) is an atomic read of atomic_val.
    if (RPC_WORKER_THREAD_IDLE == __sync_or_and_fetch(&rpc_ins->atomic_val, RPC_WORKER_THREAD_IDLE))
    {
        // no pending request
        return 0;
    }
    rpc_ins->rpc_response = rpc_ins->rpc_cb(thread_idx, rpc_ins->rpc_request, rpc_ins->rpc_args);
    // AND-with-0 clears the flag back to idle, releasing the waiting caller
    __sync_fetch_and_and(&rpc_ins->atomic_val, RPC_WORKER_THREAD_IDLE);
    return 1;
}
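
/*
 * How the single atomic_val flag sequences one rpc (summarized from the two
 * functions above):
 *
 *   monitor thread (stm_rpc_call)      worker thread (stm_rpc_exec)
 *   -----------------------------      ----------------------------
 *   wait until IDLE                    sees IDLE -> returns 0, no-op
 *   store cb/args/request
 *   set BUSY ---------------------->   sees BUSY
 *                                      runs rpc_cb, stores rpc_response
 *   wait until IDLE <---------------   clears flag back to IDLE
 *   read rpc_response
 */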

/*
 * Cross-thread RPC: the monitor thread publishes a request to the target
 * worker thread's rpc instance and busy-waits for the response.
 */
struct iovec monitor_worker_thread_rpc(struct stellar_monitor *stm, int worker_thread_idx, struct iovec rpc_request, monitor_rpc_callabck *cb, void *user_args)
{
    int worker_thread_num = module_manager_get_max_thread_num(stm->mod_mgr_ref);
    if (worker_thread_idx < 0 || worker_thread_idx >= worker_thread_num)
    {
        // out-of-range thread index: return an empty iovec
        struct iovec response = {0};
        return response;
    }
    struct monitor_rpc *rpc_ins = stm->rpc_ins_array[worker_thread_idx];
    return stm_rpc_call(rpc_ins, rpc_request, cb, user_args);
}
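
/*
 * Usage sketch (illustrative only; the callback name and payload are
 * hypothetical). The callback runs on the chosen worker thread and returns
 * the response as an iovec, matching the call made in stm_rpc_exec():
 *
 *   static struct iovec dump_stats_cb(int thread_idx, struct iovec req, void *args)
 *   {
 *       static char buf[64];
 *       struct iovec resp = { .iov_base = buf, .iov_len = sizeof(buf) };
 *       // ... fill buf from this worker thread's private state ...
 *       return resp;
 *   }
 *
 *   // on the monitor thread:
 *   struct iovec req = { .iov_base = "ping", .iov_len = 5 };
 *   struct iovec resp = monitor_worker_thread_rpc(stm, 0, req, dump_stats_cb, NULL);
 */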

__thread long long rpc_idle_num = 0;

// Runs in every worker thread's polling loop; executes a pending rpc request, if any.
void module_rpc_worker_thread_polling_cb(struct module_manager *mod_mgr, void *polling_arg)
{
    struct stellar_monitor *stm = (struct stellar_monitor *)polling_arg;
    int thread_idx = module_manager_get_thread_id(mod_mgr);
    struct monitor_rpc *rpc_ins = stm->rpc_ins_array[thread_idx];

    stm_rpc_exec(thread_idx, rpc_ins);
}

struct monitor_rpc *stm_rpc_new(void)
{
    struct monitor_rpc *rpc_ins = (struct monitor_rpc *)calloc(1, sizeof(struct monitor_rpc));
    return rpc_ins;
}

void stm_rpc_free(struct monitor_rpc *rpc_ins)
{
    if (NULL == rpc_ins)
    {
        return;
    }
    free(rpc_ins);
}

struct monitor_rpc *monitor_rpc_new(struct stellar_monitor *stm, struct module_manager *mod_mgr)
{
    module_manager_register_polling_node(mod_mgr, module_rpc_worker_thread_polling_cb, (void *)stm);
    return stm_rpc_new();
}

void monitor_rpc_free(struct monitor_rpc *rpc_ins)
{
    stm_rpc_free(rpc_ins);
}