1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
|
#include "tfe_rpc.h"
#include "event2/http.h"
#include "event2/http_struct.h"
#include "event2/event.h"
#include "event2/buffer.h"
#include "event2/dns.h"
#include "event2/thread.h"
#include "tfe_utils.h"
#include "MESA/MESA_handle_logger.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <sys/queue.h>
#define MODULE_NAME "tfe_rpc"
struct tfe_rpc_ctx
{
struct event_base * evbase;
enum TFE_RPC_FLAG flag;
struct evhttp_connection* connection;
};
/*
//will be called after receiving and parsing the full header. It allows analyzing the header and possibly closing the connection by returning a value < 0.
int read_header_done_cb(struct evhttp_request* response, void* arg)
{
//struct promise* p = (struct promise*)arg;
printf("call read_header_done_cb!\n");
printf("< HTTP/1.1 %d %s\n", evhttp_request_get_response_code(response), evhttp_request_get_response_code_line(response));
return 0;
struct evkeyvalq* headers = evhttp_request_get_input_headers(response);
struct evkeyval* header;
TAILQ_FOREACH(header, headers, next){
MESA_handle_runtime_log(logger, RLOG_LV_INFO, MODULE_NAME, "< %s: %s\n", header->key, header->value);
}
MESA_handle_runtime_log(logger, RLOG_LV_INFO, MODULE_NAME, "< \n");
}
*/
static void tfe_rpc_promise_free_ctx(void* ctx)
{
free(ctx);
ctx = NULL;
return;
}
static void _wrapped_promise_success(struct promise* p, void* result)
{
struct tfe_rpc_ctx* ctx = (struct tfe_rpc_ctx*)promise_get_ctx(p);
struct tfe_rpc_response_result* _result = (struct tfe_rpc_response_result*)result;
if(ctx->flag == CHUNK_CB && _result->len > 0)
{
return;
}
/*
if(ctx->evbase)
{
event_base_loopexit(ctx->evbase, 0);
}
*/
//promise_dettach_ctx(p);
//tfe_rpc_promise_free_ctx(ctx);
promise_success(p, result);
return;
}
static void _wrapped_promise_failed(struct promise * p, enum e_future_error error, const char * what)
{
/*
struct tfe_rpc_ctx* ctx = (struct tfe_rpc_ctx*)promise_get_ctx(p);
if(ctx->evbase)
{
event_base_loopexit(ctx->evbase, 0);
}
*/
promise_failed(p, error, what);
//promise_dettach_ctx(p);
//ctx_destroy_cb(ctx);
return;
}
//will be called after every read of data with the same argument as the completion callback. Will never be called on an empty response. May drain the input buffer; it will be drained automatically on return.
//will drain automaticly
void read_chunk_cb(struct evhttp_request* response, void* arg)
{
struct promise* p = (struct promise*)arg;
//printf("call get_chunk_cb\n");
if(response == NULL)
{
return;
}
struct evbuffer* evbuf = evhttp_request_get_input_buffer(response);
if(evbuf == NULL)
{
return;
}
size_t evbuf_len = evbuffer_get_length(evbuf);
char* data = (char*)evbuffer_pullup(evbuf, evbuf_len);
//printf("data is %s\n", data==NULL ? "NULL":"NOT NULL");
struct tfe_rpc_response_result* result = ALLOC(struct tfe_rpc_response_result, 1);
result->status_code = evhttp_request_get_response_code(response);
result->status_msg = evhttp_request_get_response_code_line(response);
result->data = data;
result->len = evbuf_len;
_wrapped_promise_success(p, result);
free(result);
}
//The callback is executed when the request completed or an error occurred
void get_response_cb(struct evhttp_request* response, void* arg)
{
//printf("call get_response_cb\n");
read_chunk_cb(response, arg);
}
//On error, both the error callback and the regular callback will be called, error callback is called before the regular callback.
void request_error_cb(enum evhttp_request_error error, void* arg)
{
//printf("call request_error_cb\n");
struct promise* p = (struct promise*)arg;
switch(error)
{
case EVREQ_HTTP_TIMEOUT:
_wrapped_promise_failed(p, FUTURE_ERROR_TIMEOUT, "EVREQ_HTTP_TIMEOUT");
break;
case EVREQ_HTTP_EOF:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_EOF");
break;
case EVREQ_HTTP_INVALID_HEADER:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_INVALID_HEADER");
break;
case EVREQ_HTTP_BUFFER_ERROR:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_BUFFER_ERROR");
break;
case EVREQ_HTTP_REQUEST_CANCEL:
_wrapped_promise_failed(p, FUTURE_ERROR_CANCEL, "EVREQ_HTTP_REQUEST_CANCEL");
break;
case EVREQ_HTTP_DATA_TOO_LONG:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_DATA_TOO_LONG");
break;
default:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_UNKOWN_EXCEPTION");
break;
}
}
//when to close a connection ???
//Set a callback for connection close
void connection_close_cb(struct evhttp_connection* connection, void* arg)
{
//printf("call connection_close_cb\n");
}
char* get_request_url(struct evhttp_uri* uri, size_t url_len)
{
const char* path = evhttp_uri_get_path(uri);
const char* query = evhttp_uri_get_query(uri);
char *request_url = NULL;
request_url = (char*)malloc(url_len);
if(path == NULL || strnlen(path, url_len) == 0)
{
snprintf(request_url, url_len, "/");
}
else
{
snprintf(request_url, url_len, "%s", path);
}
if(query && strnlen(query, url_len))
{
strncat(request_url, "?", url_len);
strncat(request_url, query, url_len);
}
return request_url;
}
//data is for POST. if method is GET, data should be NULL
void tfe_rpc_async_ask(struct future* f, const char* url, enum TFE_RPC_METHOD method, enum TFE_RPC_FLAG flag, char* data, int data_len, struct event_base * evbase, struct evdns_base* dnsbase, struct evhttp_connection *evhttp)
{
const char* host=NULL;
int port=0;
size_t url_len=0;
char* request_url=NULL;
struct evhttp_request* request=NULL;
struct promise* p = future_to_promise(f);
struct tfe_rpc_ctx* ctx = ALLOC(struct tfe_rpc_ctx, 1);
ctx->evbase = evbase;
ctx->flag = flag;
promise_set_ctx(p, (void*)ctx, tfe_rpc_promise_free_ctx);
assert(evbase&&dnsbase);
struct evhttp_uri* uri = evhttp_uri_parse(url);
if(NULL == uri)
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "parse url failed!");
goto error_out;
}
host = evhttp_uri_get_host(uri);
if(!host)
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "parse host failed!");
goto error_out;
}
port = evhttp_uri_get_port(uri);
if(port < 0)
{
port = 80;
}
ctx->connection = evhttp;
if (!ctx->connection)
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "create connection failed!");
goto error_out;
}
evhttp_connection_set_closecb(ctx->connection, connection_close_cb, evbase);
request = evhttp_request_new(get_response_cb, (void*)p);
//evhttp_request_set_header_cb(request, read_header_done_cb);
if(flag == CHUNK_CB)
{
evhttp_request_set_chunked_cb(request, read_chunk_cb);
}
evhttp_request_set_error_cb(request, request_error_cb);
evhttp_add_header(evhttp_request_get_output_headers(request), "Host", host);
evhttp_add_header(evhttp_request_get_output_headers(request), "Connection", "keep-alive");
url_len = strlen(url);
request_url = get_request_url(uri, url_len);
//printf("request url is %s\n", request_url);
if(request_url == NULL)
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "get request url failed");
goto error_out;
}
switch(method)
{
case GET:
evhttp_make_request(ctx->connection, request, EVHTTP_REQ_GET, request_url);
break;
case POST:
evbuffer_add(request->output_buffer, data, data_len);
evhttp_make_request(ctx->connection, request, EVHTTP_REQ_POST, request_url);
break;
default:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "method is invalid!");
goto error_out;
}
free(data);
free(request_url);
evhttp_uri_free(uri);
return;
error_out:
if(uri) evhttp_uri_free(uri);
if(data) free(data);
promise_dettach_ctx(p);
tfe_rpc_promise_free_ctx(ctx);
return;
}
struct tfe_rpc_response_result* tfe_rpc_release(void* result)
{
struct tfe_rpc_response_result* response = (struct tfe_rpc_response_result*)result;
return response;
}
|