1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
/*
**********************************************************************************************
* File: main.cpp
* Description: stellar main entry
*
* Copyright: (c) 2018-2022 Geedge Networks, Inc. All rights reserved.
***********************************************************************************************
*/
#include <limits.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <netinet/ether.h>
#include <netinet/ip.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/syscall.h>
#include "logger.h"
#include "packet_io.h"
#include "packet_io_util.h"
#include "session_manager.h"
#include "plugin_manager.h"
#include "http.h"
#include "utils.h"
#include "util_errors.h"
#include "packet_io_internal.h"
struct worker_thread_ctx
{
pthread_t tid;
size_t dev_num;
struct packet_io_device **pdev;
struct session_manager *session_mgr;
struct plugin_manager *plugin_mgr;
int thread_id;
};
void *worker_thread_cycle(void *arg)
{
//uint64_t counter = 0;
//size_t data_len = 0;
//pid_t tid = syscall(SYS_gettid);
struct stellar_packet *rx_pkt[64];
struct stellar_session *session;
struct worker_thread_ctx *thread_arg = (struct worker_thread_ctx *)arg;
while (1) {
for (size_t i = 0; i < thread_arg->dev_num; i++) {
ssize_t fetch_num = packet_io_device_rx(thread_arg->pdev[i], thread_arg->thread_id, rx_pkt, 5);
for (ssize_t j = 0; j < fetch_num; j++) {
/*
char *buf_addr = get_stellar_packet_data(rx_pkt[j], &data_len);
struct ethhdr *eth_hdr = (struct ethhdr *)buf_addr;
uint16_t eth_type = ntohs(eth_hdr->h_proto);
if (eth_type == ETHERNET_TYPE_IP) {
struct ip *ip_hdr = (struct ip *)(eth_hdr + 1);
char sip_str[64] = {0};
char dip_str[64] = {0};
strncpy_safe(sip_str, inet_ntoa(ip_hdr->ip_src), sizeof(sip_str));
strncpy_safe(dip_str, inet_ntoa(ip_hdr->ip_dst), sizeof(dip_str));
//printf("sip: %s dip: %s, ip_len:%u\n", sip_str, dip_str, ntohs(ip_hdr->ip_len));
} else if (eth_type == ETHERNET_TYPE_IPV6) {
}
}
counter += fetch_num;
printf("tid: %d counter:%lu\n", tid, counter);
if (fetch_num == 0) {
sleep(1);
continue;
}
packet_io_pkts_free(thread_arg->pdev[i], thread_arg->tid, rx_pkt, fetch_num);
*/
session = session_manager_commit(thread_arg->session_mgr, rx_pkt[j], thread_arg->thread_id);
while(session) {
plugin_manager_dispatch(thread_arg->plugin_mgr, session, thread_arg->thread_id);
session = session_manager_fetch_session(thread_arg->session_mgr, session, thread_arg->thread_id);
}
//clean session_manager event queue
packet_io_device_tx(thread_arg->pdev[i], thread_arg->thread_id, &rx_pkt[j], 1);
}
if (fetch_num == 0) {
printf("no fetch num\n");
sleep(1);
//dispatch to time event
//dispatch to trigger polling event
}
}
}
return nullptr;
}
int main(int argc, char **argv)
{
int thread_num = 2;
char file_path[] = "./plugs/plugins.inf";
char packet_io_conf[PATH_MAX] = "./conf/packet_io/packet_io.toml";
struct packet_io_device *devices[64] = {0};
size_t nr_devices = 0;
struct packet_io_instance *pinst = packet_io_init("stellar", packet_io_conf, devices, &nr_devices);
if (nullptr == pinst) {
log_error(ST_ERR_PIO_INSTANCE, "packet_io init failed.");
return -1;
}
struct session_manager *session_mgr = session_manager_create(thread_num);
struct plugin_manager *plugin_mgr = plugin_manager_create(thread_num);
plugin_manager_register(plugin_mgr, "HTTP", SESSION_STATE_ALL, http_entry);
plugin_manager_load(plugin_mgr, file_path);
struct worker_thread_ctx *workers = (struct worker_thread_ctx *)calloc(sizeof(struct worker_thread_ctx), thread_num);
for (int i = 0; i < thread_num; i++)
{
workers[i].pdev = devices;
workers[i].dev_num = nr_devices;
workers[i].session_mgr = session_mgr;
workers[i].plugin_mgr = plugin_mgr;
workers[i].thread_id = i;
pthread_create(&workers[i].tid, nullptr, worker_thread_cycle, (void *)&workers[i]);
}
while (1)
{
/* main loop code */
usleep(1);
}
plugin_manager_unload(plugin_mgr);
plugin_manager_destory(plugin_mgr);
return 0;
}
|