1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
|
/* Application, Process and Dataplane Manager
*
* This manager keeps track of applications, processes and dataplane threads.
* It assigns a unique id to each application, process and dataplane thread.
*
* Author : Lu Qiuwen<[email protected]>
* Date : 2016-08-19
*/
#include <sys/queue.h>
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>
#include <mr_common.h>
#include <mr_mask.h>
#include <mr_runtime.h>
#include <rte_malloc.h>
#include <rte_rwlock.h>
#include <rte_lcore.h>
#include <rte_per_lcore.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
/* Per-application descriptor. One instance per application, linked into
 * app_manager::appinfo_list. */
struct appinfo
{
/* List linkage (next descriptor in app_manager::appinfo_list) */
TAILQ_ENTRY(appinfo) next;
/* Application symbol (name); used as the lookup key */
char symbol[MR_SYMBOL_MAX];
/* Application id */
app_id_t app_id;
/* Global thread id range, start; the range is half-open [gsid_start, gsid_end) */
thread_id_t gsid_start;
/* Global thread id range, end (exclusive) */
thread_id_t gsid_end;
/* Process state. NOTE(review): not read or written anywhere in this file */
volatile unsigned int state;
/* Array of nr_max_thread per-thread descriptors */
struct thread_info * thread_info;
/* CPU binding mask */
cpu_mask_t cpu_mask;
/* Maximum number of dataplane threads of this process (size of thread_info[]) */
unsigned int nr_max_thread;
/* Next thread sequence id to hand out (= number of slots allocated so far) */
unsigned int cur_sid;
/* Next global thread id to hand out */
unsigned int cur_gsid;
/* Application private handle */
void * priv;
};
/* Per-thread descriptor; one slot of appinfo::thread_info[]. */
struct thread_info
{
/* Thread sequence id within its application */
thread_id_t sid;
/* Global thread sequence id (stored as the DPDK per-lcore id on registration) */
thread_id_t gsid;
/* Process id; filled from the owning application's app_id */
thread_id_t proc_id;
/* Physical CPU id the thread is pinned to */
cpu_id_t cpu_id;
/* Physical NUMA node (socket) id of cpu_id */
socket_id_t socket_id;
/* Thread private context */
void * priv;
/* Flag: whether this thread slot is currently in use */
unsigned int in_use;
};
/* Descriptor of the application this process runs (process-wide, shared by all threads) */
static struct appinfo * currect_app_info = NULL;
/* Descriptor of the calling thread (thread-local) */
static __thread struct thread_info * currect_thread_info = NULL;
/* List head type used by struct app_manager */
TAILQ_HEAD(appinfo_list, appinfo);
/* Application manager: registry of all known applications. */
struct app_manager
{
/* Spinlock serializing concurrent access to this manager */
mr_spinlock_t lock;
/* List of application descriptors */
struct appinfo_list appinfo_list;
/* Number of registered applications */
unsigned int nr_app;
};
/* Allocate and initialise an empty application manager.
 * The object comes from rte_zmalloc(); the allocation result is checked with
 * MR_CHECK. Returns the new manager. */
struct app_manager * app_manager_create()
{
	struct app_manager * manager = rte_zmalloc(NULL, sizeof(*manager), 0);
	MR_CHECK(manager != NULL, "AppObjectCreate, Cannot alloc memory for app manager");

	TAILQ_INIT(&manager->appinfo_list);
	mr_spin_init(&manager->lock);
	manager->nr_app = 0;
	return manager;
}
/* Destroy an application manager.
 * TODO: not implemented -- currently a no-op that always returns 0; it does not
 * free the manager or any application descriptor. */
int app_manager_destory(struct app_manager * object)
{
	(void)object;
	return 0;
}
/* Put an application into the running state.
 * Placeholder: does nothing yet and always returns 0. */
static int __rte_unused __app_run_unsafe(struct app_manager * object, struct appinfo * pinfo)
{
	(void)object;
	(void)pinfo;
	return 0;
}
/* Suspend an application while keeping its context.
 * Placeholder: does nothing yet and always returns 0. */
static int __rte_unused __app_suspend_unsafe(struct app_manager * object, struct appinfo * pinfo)
{
	(void)object;
	(void)pinfo;
	return 0;
}
/* Terminate an application and destroy its context.
 * Placeholder: does nothing yet and always returns 0. */
static int __rte_unused __app_exit_unsafe(struct app_manager * object, struct appinfo * pinfo)
{
	(void)object;
	(void)pinfo;
	return 0;
}
/* Find the application descriptor whose symbol equals `sym`.
 * Every caller in this file holds object->lock.
 * Returns the matching descriptor, or NULL if none is registered. */
static struct appinfo * __appinfo_lookup_unsafe(const struct app_manager * object,
	const char * sym)
{
	struct appinfo * it;
	TAILQ_FOREACH(it, &object->appinfo_list, next)
	{
		if (strncmp(it->symbol, sym, sizeof(it->symbol)) == 0)
			return it;
	}
	return NULL;
}
/* Delete an application descriptor and give back the ids it holds.
 * Called with object->lock held.
 * The application may only be deleted once none of its thread slots is marked
 * in_use. On success the application id and global thread id range are
 * released, the descriptor is unlinked and freed, and the current-application
 * pointer is cleared if it referred to this descriptor.
 * Returns 0 on success, -1 if a thread of the application is still in use. */
static int __appinfo_delete_unsafe(struct app_manager * object,
	struct appinfo * pinfo)
{
	// All threads of the application must have exited before it is destroyed.
	for (unsigned int i = 0; i < pinfo->nr_max_thread; i++)
	{
		if (pinfo->thread_info[i].in_use == 0) continue;
		MR_LOG(INFO, BASE, "AppInfo, AppInfoDelete, "
			"Thread %d in use, cannot delete app %s. \n",
			pinfo->thread_info[i].sid, pinfo->symbol);
		return -1;
	}

	// Return the application id and the global thread id range.
	mr_id_manager_release_app_id(pinfo->app_id);
	mr_id_manager_release_gsid(pinfo->gsid_start, pinfo->nr_max_thread);

	TAILQ_REMOVE(&object->appinfo_list, pinfo, next);
	object->nr_app--;

	// Only drop the current-application pointer if it is the descriptor being
	// freed; deleting a different application must not detach this process.
	if (currect_app_info == pinfo)
		currect_app_info = NULL;

	rte_free(pinfo->thread_info);
	rte_free(pinfo);
	return 0;
}
/* Create and register a new application descriptor.
 * Called with object->lock held.
 *
 * Allocates the descriptor and its nr_max_thread-entry thread_info array,
 * obtains an application id and a contiguous range of nr_max_thread global
 * thread ids, then appends the descriptor to the application list. The
 * descriptor is linked into the list only after every resource has been
 * acquired, and everything acquired so far is released on failure, so a
 * failed call leaves the manager unchanged.
 *
 * Returns 0 on success, or
 *   -1 an application with this symbol already exists,
 *   -2 the descriptor could not be allocated,
 *   -3 no application id could be obtained,
 *   -4 no global thread id range could be obtained,
 *   -5 the thread_info array could not be allocated. */
static int __appinfo_new_unsafe(struct app_manager * object, const char * sym,
	cpu_mask_t cpu_mask, unsigned int nr_max_thread)
{
	struct appinfo * pinfo;
	app_id_t app_id;
	thread_id_t gsid_start;
	int ret;

	// Application symbols must be unique.
	if (__appinfo_lookup_unsafe(object, sym) != NULL)
	{
		MR_LOG(INFO, BASE, "AppInfo, AppInfoNew, "
			"Application %s is already exists, failed. \n", sym);
		return -1;
	}

	pinfo = rte_zmalloc(NULL, sizeof(*pinfo), 0);
	if (unlikely(pinfo == NULL))
	{
		MR_LOG(INFO, BASE, "AppInfo, AppInfoNew, "
			"Cannot alloc appinfo mem for app %s info.\n", sym);
		return -2;
	}
	snprintf(pinfo->symbol, sizeof(pinfo->symbol), "%s", sym);

	if (unlikely(mr_id_manager_apply_app_id(&app_id) < 0))
	{
		MR_LOG(INFO, BASE, "AppInfo, AppInfoNew, "
			"Cannot apply app_id for app %s. \n", sym);
		ret = -3;
		goto err_free_pinfo;
	}

	if (unlikely(mr_id_manager_apply_gsid(nr_max_thread, &gsid_start) < 0))
	{
		MR_LOG(INFO, BASE, "AppInfo, AppInfoNew, "
			"Cannot apply gsid for app %s. \n", sym);
		ret = -4;
		goto err_release_app_id;
	}

	pinfo->thread_info = (struct thread_info *)rte_zmalloc(NULL,
		sizeof(struct thread_info) * nr_max_thread, 0);
	if (unlikely(pinfo->thread_info == NULL))
	{
		MR_LOG(INFO, BASE, "AppInfo, AppInfoNew, "
			"Cannot alloc memory for thread_info sturcture, failed\n");
		ret = -5;
		goto err_release_gsid;
	}

	pinfo->app_id = app_id;
	pinfo->gsid_start = gsid_start;
	// Global thread ids are contiguous within one application: [gsid_start, gsid_end).
	pinfo->gsid_end = gsid_start + nr_max_thread;
	pinfo->cpu_mask = cpu_mask;
	pinfo->nr_max_thread = nr_max_thread;
	pinfo->cur_sid = 0;
	pinfo->cur_gsid = pinfo->gsid_start;

	// Publish only a fully initialised descriptor; keep nr_app in step with
	// the decrement done on deletion.
	TAILQ_INSERT_TAIL(&object->appinfo_list, pinfo, next);
	object->nr_app++;
	return 0;

err_release_gsid:
	mr_id_manager_release_gsid(gsid_start, nr_max_thread);
err_release_app_id:
	mr_id_manager_release_app_id(app_id);
err_free_pinfo:
	rte_free(pinfo);
	return ret;
}
/* Allocate the next unused thread slot of application `pinfo`.
 * Called with the manager lock held (via __thread_register_unsafe()).
 * The slot receives the next thread sequence id and global thread id, and
 * both counters are advanced.
 * Returns the slot, or NULL when all nr_max_thread slots are already taken. */
struct thread_info * __thread_new_unsafe(struct appinfo * pinfo)
{
	// thread_info[] holds exactly nr_max_thread entries; going further would
	// write past the end of the array.
	if (unlikely(pinfo->cur_sid >= pinfo->nr_max_thread))
		return NULL;

	struct thread_info * tinfo = &pinfo->thread_info[pinfo->cur_sid];
	tinfo->proc_id = pinfo->app_id;
	tinfo->sid = pinfo->cur_sid;
	tinfo->gsid = pinfo->cur_gsid;
	tinfo->priv = NULL;

	pinfo->cur_sid++;
	pinfo->cur_gsid++;
	return tinfo;
}
/* Look up an already-allocated thread slot by sequence id.
 * Returns the slot if `sid` is below the number of slots handed out so far
 * (pinfo->cur_sid), otherwise NULL.
 * TODO: thread recovery -- how to restore a thread's context after it exits
 * (normally or abnormally). */
struct thread_info * __thread_lookup_unsafe(struct appinfo * pinfo, thread_id_t sid)
{
	return (sid < pinfo->cur_sid) ? pinfo->thread_info + sid : NULL;
}
/* Pin the calling thread to the single CPU `cpu_id`.
 * Returns 0 on success, otherwise an error number: EINVAL if cpu_id does not
 * fit in a cpu_set_t, or the value returned by pthread_setaffinity_np().
 * pthread_setaffinity_np() reports failure through its return value and does
 * not set errno. */
int __set_affinity(unsigned int cpu_id)
{
	// CPU_SET() can only address CPU_SETSIZE CPUs in a fixed-size cpu_set_t.
	if (cpu_id >= CPU_SETSIZE)
		return EINVAL;

	cpu_set_t cpu_set;
	CPU_ZERO(&cpu_set);
	CPU_SET(cpu_id, &cpu_set);

	// The size argument must be the size of the set actually passed. The
	// 64-bit cpu_mask_t is smaller than cpu_set_t, and with that size only the
	// first 64 CPUs of the set would be seen.
	return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set), &cpu_set);
}
/* Pin thread `tinfo` of application `pinfo` to the CPU picked for it from the
 * application's CPU mask (mask_location(cpu_mask, sid)), and record that CPU
 * and its NUMA socket in `tinfo`.
 * Returns 0 on success, or
 *   -1 the selected CPU id is not below the physical CPU count,
 *   -2 no valid CPU id was selected (invalid CPU mask or sid),
 *   -3 setting the pthread affinity failed.
 * tinfo->cpu_id and tinfo->socket_id are only updated on success. */
int __thread_set_affinity(struct appinfo * pinfo,
	struct thread_info * tinfo)
{
	cpu_id_t cpu_id = mask_location(pinfo->cpu_mask, tinfo->sid);

	// Reject negative ids first. If the CPU count is unsigned, a negative id
	// would convert to a huge value and be reported as "exceeds CPU count".
	if (unlikely(cpu_id < 0))
	{
		MR_LOG(INFO, BASE, "AppInfo, ThreadSetAffinity, "
			"Thread %d in Process %s, Invailided CPU_ID %d"
			"(Invailed CPU Mask or SID)\n", tinfo->sid,
			pinfo->symbol, cpu_id);
		return -2;
	}

	if (unlikely(cpu_id >= mr_hwinfo_nr_cpus()))
	{
		MR_LOG(INFO, BASE, "Procman, ThreadSetAffinity, "
			"Thread %d in Process %s, Invalided CPU_ID %d "
			"(Exceed Physical CPU Count %d)\n",
			tinfo->sid, pinfo->symbol, cpu_id, mr_hwinfo_nr_cpus());
		return -1;
	}

	socket_id_t socket_id = mr_hwinfo_socket_id(cpu_id);
	assert(socket_id >= 0 && socket_id < mr_hwinfo_nr_sockets());

	// __set_affinity() returns pthread_setaffinity_np()'s error number, which
	// is not mirrored in errno, so report that value.
	int err = __set_affinity(cpu_id);
	if (err != 0)
	{
		MR_LOG(INFO, BASE, "AppInfo, ThreadSetAffinity, "
			"Thread %d in Process %s, Call Pthread Error : %s\n",
			tinfo->sid, pinfo->symbol, strerror(err));
		return -3;
	}

	tinfo->cpu_id = cpu_id;
	tinfo->socket_id = socket_id;
	return 0;
}
/* Bind the calling thread to a thread slot of application `pinfo`.
 * Called with the manager lock held.
 * If slot `suppose_sid` has already been allocated, it is reused ("recovered").
 * Otherwise a new slot is allocated. The slot then becomes the calling thread's
 * descriptor, and its gsid is stored as the DPDK per-lcore id. The thread is
 * pinned to its CPU and, on success, the slot is marked in_use.
 * Returns 0 on success, -1 if no slot could be obtained, -2 if setting the
 * CPU affinity failed. */
int __thread_register_unsafe(struct appinfo * pinfo, thread_id_t suppose_sid)
{
	struct thread_info * tinfo = __thread_lookup_unsafe(pinfo, suppose_sid);

	if (tinfo != NULL)
	{
		// Slot was allocated before: recover it.
		MR_LOG(INFO, BASE, "Thread %d in Application %s existed, recoverd.\n",
			suppose_sid, pinfo->symbol);
	}
	else
	{
		// Not registered before: allocate a new slot.
		MR_LOG(INFO, BASE, "Thread %d in Application %s registed as a new thread. \n",
			suppose_sid, pinfo->symbol);
		tinfo = __thread_new_unsafe(pinfo);
	}

	// Having no slot at this point is an error condition.
	if (unlikely(tinfo == NULL))
	{
		MR_LOG(WARNING, BASE, "Thread %d in Application %s Info Structure is NULL, failed.\n",
			suppose_sid, pinfo->symbol);
		return -1;
	}

	// Install the thread context.
	RTE_PER_LCORE(_lcore_id) = tinfo->gsid;
	currect_thread_info = tinfo;

	if (__thread_set_affinity(pinfo, tinfo) < 0)
	{
		MR_LOG(WARNING, BASE, "Thread %d in Application %s, SetThreadAffinity failed.\n",
			tinfo->sid, pinfo->symbol);
		return -2;
	}

	// Mark the slot occupied. __appinfo_delete_unsafe() refuses to destroy an
	// application while any slot is in_use, and __thread_unregister_unsafe()
	// clears the flag again. Without this, that guard could never trigger.
	tinfo->in_use = 1;

	MR_LOG(DEBUG, BASE, "Thread Registered (ThreadID=%d, GThreadID=%d, App=%s). \n",
		tinfo->sid, tinfo->gsid, pinfo->symbol);
	return 0;
}
/* Release the calling thread's slot by clearing its in_use flag.
 * Called with the manager lock held. The thread-local descriptor pointer
 * itself is left in place.
 * Returns 0 on success, -1 if the calling thread never registered. */
int __thread_unregister_unsafe()
{
	assert(currect_thread_info != NULL);
	// The assert disappears under NDEBUG; fail cleanly instead of
	// dereferencing NULL in release builds.
	if (unlikely(currect_thread_info == NULL))
		return -1;

	currect_thread_info->in_use = 0;
	return 0;
}
/* Create the application descriptor `sym`, whose threads run on the CPUs in
 * `cpu_mask`. The number of thread slots is mask_popcnt(cpu_mask), presumably
 * the number of CPUs set in the mask.
 * Returns 0 on success, -1 if the mask yields no thread slot, otherwise the
 * negative code returned by __appinfo_new_unsafe(). */
int app_manager_appinfo_create(struct app_manager * object,
	const char * sym, cpu_mask_t cpu_mask)
{
	int nr_max_thread = mask_popcnt(cpu_mask);
	// A mask that selects no CPU would produce an application with zero thread
	// slots (and a zero-size thread_info allocation); reject it, as
	// app_manager_appinfo_register() does.
	if (unlikely(nr_max_thread <= 0))
	{
		MR_LOG(WARNING, BASE, "AppInfo, AppInfoNew, "
			"Process %s CPU Mask error(mask=%"PRIx64", nr_max_thread=%d)\n",
			sym, cpu_mask, nr_max_thread);
		return -1;
	}

	mr_spin_lock(&object->lock);
	int ret = __appinfo_new_unsafe(object, sym, cpu_mask, nr_max_thread);
	mr_spin_unlock(&object->lock);
	return ret;
}
/* Attach the calling process to application `sym`, creating the application
 * first (with mask_popcnt(cpu_mask) thread slots) if it is not registered.
 * An already registered application is reused as-is and cpu_mask is ignored.
 * On success the application becomes this process's current application.
 * Returns 0 on success, -1 if the mask yields no thread slot, or the negative
 * code returned by __appinfo_new_unsafe(). */
int app_manager_appinfo_register(struct app_manager * object,
	const char * sym, cpu_mask_t cpu_mask)
{
	int ret = 0;

	mr_spin_lock(&object->lock);

	struct appinfo * pinfo = __appinfo_lookup_unsafe(object, sym);
	int attach = (pinfo != NULL);

	if (!attach)
	{
		// Unknown application: create it before attaching.
		int nr_max_thread = mask_popcnt(cpu_mask);
		if (nr_max_thread == 0)
		{
			MR_LOG(WARNING, BASE, "AppInfo, AppInfoNew, "
				"App %s CPU Mask error(mask=%"PRIx64", nr_max_thread=%d)\n",
				sym, cpu_mask, nr_max_thread);
			ret = -1;
		}
		else
		{
			ret = __appinfo_new_unsafe(object, sym, cpu_mask, nr_max_thread);
			if (ret >= 0)
			{
				// Created successfully: fetch the new descriptor.
				pinfo = __appinfo_lookup_unsafe(object, sym);
				assert(pinfo != NULL);
				attach = 1;
			}
		}
	}

	if (attach)
	{
		__app_run_unsafe(object, pinfo);
		currect_app_info = pinfo;
	}

	mr_spin_unlock(&object->lock);
	return ret;
}
/* Detach the calling process from its current application and delete that
 * application's descriptor.
 * Returns -1 if no application is registered in this process, otherwise the
 * result of __appinfo_delete_unsafe(). On success the current-application
 * pointer is cleared so the freed descriptor cannot be referenced again. */
int app_manager_appinfo_unregister(struct app_manager * object)
{
	struct appinfo * current = currect_app_info;
	if (current == NULL)
		return -1;

	mr_spin_lock(&object->lock);
	int rc = __appinfo_delete_unsafe(object, current);
	mr_spin_unlock(&object->lock);

	if (rc >= 0)
		currect_app_info = NULL;
	return rc;
}
int app_manager_thread_register(struct app_manager * object)
{
struct appinfo * pinfo = currect_app_info;
// 进程重启后,原来的线程可能注册过。因此设置一个变量supposed_sid
// 在进程重启后清零。如果线程号以前注册过,就不再创建线程的描述符
// 直接恢复就可以了。因此,需要两个计数器,一个全局共享,表示多少
// 个线程已经创建描述符,另一个计数器用于进程恢复,表示当前进程的
// 线程计数。
// supposed_sid在临界区中访问,不存在线程安全问题。
static thread_id_t suppose_sid = 0;
int ret = 0;
mr_spin_lock(&object->lock);
ret = __thread_register_unsafe(pinfo, suppose_sid);
if (ret >= 0) suppose_sid++;
mr_spin_unlock(&object->lock);
return ret;
}
/* Unregister the calling thread (clear its slot's in_use flag) while holding
 * the manager lock. Returns the result of __thread_unregister_unsafe(). */
int app_manager_thread_unregister(struct app_manager * object)
{
	int rc;

	mr_spin_lock(&object->lock);
	rc = __thread_unregister_unsafe();
	mr_spin_unlock(&object->lock);

	return rc;
}
/* Cursor-style iteration over the registered applications.
 * Set *appinfo to NULL to start at the head of the list. Each call moves
 * *appinfo to the next descriptor.
 * Returns 0 while *appinfo points to a descriptor. Returns -ENOENT (with
 * *appinfo == NULL) once the list is exhausted, including on the first call
 * when the list is empty.
 * NOTE(review): object->lock is not taken; the caller must prevent concurrent
 * modification of the list while iterating. */
int app_manager_appinfo_iterate(struct app_manager * object, struct appinfo ** appinfo)
{
	if (*appinfo == NULL)
		*appinfo = TAILQ_FIRST(&object->appinfo_list);
	else
		*appinfo = TAILQ_NEXT(*appinfo, next);

	// An empty list must report -ENOENT too. Returning 0 with *appinfo == NULL
	// makes the next call start over from the head, so a loop that iterates
	// until failure would never terminate.
	return (*appinfo == NULL) ? -ENOENT : 0;
}
/* Copy up to nr_max_infos application descriptor pointers, in list order,
 * into infos[] while holding the manager lock.
 * Returns the number of entries written (0 if nr_max_infos <= 0). */
int app_manager_list_all_appinfo(struct app_manager * object,
	struct appinfo * infos[], int nr_max_infos)
{
	struct appinfo * pinfo_iter;
	int count = 0;

	mr_spin_lock(&object->lock);
	TAILQ_FOREACH(pinfo_iter, &object->appinfo_list, next)
	{
		// Stop once infos[] is full. Testing with `>` would still write
		// infos[nr_max_infos], one element past the end of the caller's array.
		if (count >= nr_max_infos) break;
		infos[count++] = pinfo_iter;
	}
	mr_spin_unlock(&object->lock);
	return count;
}
/* Index-based iteration over all nr_max_thread thread slots of an application
 * (used and unused alike).
 * *iterate is the cursor; start it at 0. Each call sets *tinfo to slot
 * *iterate, advances the cursor, and returns the index of the returned slot.
 * Returns -ENOENT once every slot has been visited, or if the cursor is negative. */
int app_manager_tinfo_iterate(struct appinfo * appinfo,
	struct thread_info ** tinfo, int * iterate)
{
	// Make the negative-cursor case explicit instead of relying on the
	// signed-to-unsigned conversion in the bound check.
	if (*iterate < 0 || (unsigned int)*iterate >= appinfo->nr_max_thread)
		return -ENOENT;

	*tinfo = &appinfo->thread_info[*iterate];
	// Parenthesised so the caller's cursor is advanced: `*iterate++` parses as
	// `*(iterate++)` and only bumps the local pointer, leaving the cursor stuck.
	return (*iterate)++;
}
// 外部接口,获取当前应用、线程的信息
thread_id_t mr_thread_id()
{
assert(currect_thread_info != NULL);
return currect_thread_info->sid;
}
thread_id_t mr_gsid_id()
{
assert(currect_thread_info != NULL);
return currect_thread_info->gsid;
}
socket_id_t mr_socket_id()
{
assert(currect_thread_info != NULL);
return currect_thread_info->socket_id;
}
cpu_id_t mr_cpu_id()
{
assert(currect_thread_info != NULL);
return currect_thread_info->cpu_id;
}
app_id_t mr_app_id()
{
assert(currect_app_info != NULL);
return currect_app_info->app_id;
}
/* Application id stored in descriptor `appinfo`. */
app_id_t mr_appinfo_get_app_id(struct appinfo * appinfo)
{
	return (*appinfo).app_id;
}

/* Symbol (name) of descriptor `appinfo`; points into the descriptor itself. */
const char * mr_appinfo_get_symbol(struct appinfo * appinfo)
{
	return &(*appinfo).symbol[0];
}

/* Maximum number of threads (size of thread_info[]) of descriptor `appinfo`. */
unsigned int mr_appinfo_get_nr_max_thread(struct appinfo * appinfo)
{
	return (*appinfo).nr_max_thread;
}

/* Current application descriptor of this process, or NULL if none is registered. */
struct appinfo * mr_app_info_get()
{
	struct appinfo * app = currect_app_info;
	return app;
}

/* Descriptor of the calling thread, or NULL if it never registered. */
struct thread_info * mr_thread_info_get()
{
	struct thread_info * self = currect_thread_info;
	return self;
}
// 外部接口,读取(或写入)应用级、线程级私有句柄指针
void * mr_app_priv_get()
{
assert(currect_app_info != NULL);
return currect_app_info->priv;
}
void mr_app_priv_set(void * ptr)
{
assert(currect_app_info != NULL);
currect_app_info->priv = ptr;
}
void * mr_thread_priv_get()
{
assert(currect_thread_info != NULL);
return currect_thread_info->priv;
}
void mr_thread_priv_set(void * ptr)
{
assert(currect_thread_info != NULL);
currect_thread_info->priv = ptr;
}
|