/* * author:liujun * date:2016-10-09 * func:simple tcpdump function for filtering the invalidate packets * diff:multi-pthread and the packets provided by libpag */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include struct mr_instance * g_mr_instance = NULL; struct mr_vdev * g_mr_vdev_handler = NULL; #ifndef DUMP_DEFAULT_STR_DUMPFILE #define DUMP_DEFAULT_STR_DUMPFILE "default.pcap" #endif #ifndef DUMP_DEFAULT_NR_THREAD #define DUMP_DEFAULT_NR_THREAD 1 #endif #ifndef DUMP_BURST_MAX #define DUMP_BURST_MAX 32 #endif /* 每线程句柄 */ struct dump_thread_instance { /* 使能标记,用于部分线程捕包 */ int enable; /* 线程句柄 */ pthread_t pid; /* 线程号 */ unsigned int thread_id; /* 本线程写入的DUMPFILE的文件名称 */ char str_thread_dumpfile[MR_STRING_MAX]; /* PCAP写入句柄 */ pcap_dumper_t * pcap_dumper; /* BPF代码 */ struct bpf_program bpf_prog; /* 本线程预期报文数 */ uint64_t nr_expect_packets; /* 本线程报文数统计 */ uint64_t nr_dump_packets; /* 本线程到达报文数统计 */ uint64_t nr_recv_packets; }; /* 全局句柄 */ struct dump_instance { /* 应用名称,MARSIO句柄初始化使用 */ char appsym[MR_SYMBOL_MAX]; /* 捕获网卡名称 */ char devsym[MR_SYMBOL_MAX]; /* MARSIO句柄 */ struct mr_instance * mr_instance; /* MARSIO设备句柄 */ struct mr_vdev * mr_vdev_handler; /* 线程绑定掩码 */ cpu_mask_t cpu_mask; /* 捕包线程掩码,只有在掩码中的线程才真正执行捕包逻辑 */ cpu_mask_t capture_cpu_mask; /* 线程总数 */ unsigned int nr_thread; /* DUMPFILE文件基准名称,线程加后缀 */ char str_dumpfile[MR_STRING_MAX]; /* DUMPFILE流句柄 */ FILE * fp_dumpfile; /* BPF表达式 */ char str_bpf_expr[MR_STRING_MAX]; /* 全局PCAP句柄 */ pcap_t * pcap_handle_dead; /* 总收报文数量 */ uint64_t nr_expect_packets; /* 总收报文统计 */ uint64_t nr_dump_packets; /* 各线程句柄 */ struct dump_thread_instance * th_instances[MR_SID_MAX]; /* 继续运行标志位 */ volatile unsigned int keep_running; }; struct dump_instance * g_dump_instance = NULL; void * dump_capture_thread(void * arg) { struct dump_thread_instance * th_instance = (struct dump_thread_instance *)arg; struct dump_instance * instance = g_dump_instance; unsigned int sid = th_instance->thread_id; assert(th_instance != NULL); assert(instance != NULL); while (g_dump_instance->keep_running == 1) { if (th_instance->nr_expect_packets != 0 && th_instance->nr_dump_packets >= th_instance->nr_expect_packets) break; marsio_buff_t * mbufs[DUMP_BURST_MAX]; unsigned int nr_mbufs; marsio_buff_t * mbufs_dump[DUMP_BURST_MAX]; unsigned int nr_mbufs_dump = 0; nr_mbufs = marsio_recv_all_burst(instance->mr_instance, sid, mbufs, DUMP_BURST_MAX); if (nr_mbufs == 0) continue; /* BPF过滤 */ for (int i = 0; i < nr_mbufs; i++) { const char * pkt_ptr = marsio_buff_mtod(mbufs[i]); unsigned int pkt_len = marsio_buff_datalen(mbufs[i]); if (bpf_filter(th_instance->bpf_prog.bf_insns, (const u_char *)pkt_ptr, pkt_len, pkt_len) == 0 || pkt_len > 65535) continue; mbufs_dump[nr_mbufs_dump++] = mbufs[i]; } struct pcap_pkthdr pcap_pkthdr[MR_BURST_MAX]; /* 写入DUMPFILE文件 */ for (int i = 0; i < nr_mbufs_dump; i++) { const char * pkt_ptr = marsio_buff_mtod(mbufs_dump[i]); unsigned int pkt_len = marsio_buff_datalen(mbufs_dump[i]); gettimeofday(&pcap_pkthdr->ts, NULL); pcap_pkthdr[i].caplen = pkt_len; pcap_pkthdr[i].len = pkt_len; /* 写DUMPFILE */ pcap_dump((u_char *)th_instance->pcap_dumper, &pcap_pkthdr[i], (const u_char *)pkt_ptr); } th_instance->nr_recv_packets += nr_mbufs; th_instance->nr_dump_packets += nr_mbufs_dump; marsio_buff_free(instance->mr_instance, mbufs, nr_mbufs, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY); } return NULL; } void usage() { exit(0); } void sig_interupt_handle(int signo) { g_dump_instance->keep_running = 0; } void dump_capture_result(struct dump_instance * instance) { fprintf(stderr, "\nCapture statistics:\n"); uint64_t total_recv_packets = 0; uint64_t total_dump_packets = 0; for (int i = 0; i < instance->nr_thread; i++) { if (instance->th_instances[i]->enable == 0) continue; fprintf(stderr, " Thread %d: dumpfile %s, %" PRIu64 " packets recved, %" PRIu64 " packets captured.\n", i, instance->th_instances[i]->str_thread_dumpfile, instance->th_instances[i]->nr_recv_packets, instance->th_instances[i]->nr_dump_packets); total_recv_packets += instance->th_instances[i]->nr_recv_packets; total_dump_packets += instance->th_instances[i]->nr_dump_packets; } fprintf(stderr, " Summary : dumpfile %s, %" PRIu64 " packets recved, %" PRIu64 " packets captured.\n\n", instance->str_dumpfile, total_recv_packets, total_dump_packets); return; } void __thread_dumpfile_generate(const char * filename, unsigned int thread_id, char * out_filename, size_t sz_max_out) { char _filename[LINE_MAX] = {0}; strncpy(_filename, filename, sizeof(_filename)); char * str_last_delimiter = strrchr(_filename, '.'); /* 没有拓展名,直接在文件后拼接线程号 */ if (str_last_delimiter == NULL) { snprintf(out_filename, sz_max_out, "%s_%d", _filename, thread_id); return; } char __basename[LINE_MAX] = {0}; char __typename[LINE_MAX] = {0}; /* 生成basename,即最后一个点前面的部分 */ *str_last_delimiter = '\0'; snprintf(__basename, sizeof(__basename), "%s", _filename); /* 生成拓展名,包含点和后面的部分 */ *str_last_delimiter = '.'; snprintf(__typename, sizeof(__typename), "%s", str_last_delimiter); /* 拼接文件名,在basename后拼接线程号 */ snprintf(out_filename, sz_max_out, "%s_%d%s", __basename, thread_id, __typename); return; } int dump_thread_init(struct dump_instance * instance, struct dump_thread_instance * th_instance, unsigned int thread_id) { /* 设置了线程掩码,需要判断本线程是否需要启动 */ if (mask_is_set(instance->capture_cpu_mask, thread_id) == 0) { th_instance->enable = 0; return RT_SUCCESS; } if (instance->fp_dumpfile) { /* 如果设置了FP,直接打开FILE结构体,通常是STDOUT */ th_instance->pcap_dumper = pcap_dump_fopen(instance->pcap_handle_dead, instance->fp_dumpfile); } else { /* 生成本线程使用的DUMPFILE文件名 */ __thread_dumpfile_generate(instance->str_dumpfile, thread_id, th_instance->str_thread_dumpfile, sizeof(th_instance->str_thread_dumpfile)); /* 打开本线程使用的Dumper */ th_instance->pcap_dumper = pcap_dump_open(instance->pcap_handle_dead, th_instance->str_thread_dumpfile); } if (th_instance->pcap_dumper == NULL) { fprintf(stderr, "Cannot open pcap dumper for thread %d, dumpfile is %s : %s.\n", thread_id, instance->str_bpf_expr, pcap_geterr(instance->pcap_handle_dead)); return RT_ERR; } /* 编译BPF规则 */ int ret = pcap_compile(instance->pcap_handle_dead, &th_instance->bpf_prog, instance->str_bpf_expr, 1, PCAP_NETMASK_UNKNOWN); if (ret < 0) { fprintf(stderr, "Cannot compile bpf rules for thread %d, bpf rule is %s: %s.\n", thread_id, instance->str_bpf_expr, pcap_geterr(instance->pcap_handle_dead)); return RT_ERR; } th_instance->nr_expect_packets = instance->nr_expect_packets / instance->nr_thread; th_instance->thread_id = thread_id; /* 创建线程 */ ret = pthread_create(&th_instance->pid, NULL, dump_capture_thread, (void *)th_instance); if (ret != 0) { fprintf(stderr, "Cannot create capture thread %d : %s\n", thread_id, strerror(ret)); return RT_ERR; } th_instance->enable = 1; return RT_SUCCESS; } int main(int argc, char ** argv) { g_dump_instance = malloc(sizeof(struct dump_instance)); memset(g_dump_instance, 0, sizeof(struct dump_instance)); assert(argc > 0); snprintf(g_dump_instance->appsym, sizeof(g_dump_instance->appsym), "%s", basename(argv[0])); snprintf(g_dump_instance->str_dumpfile, sizeof(g_dump_instance->str_dumpfile), "%s", DUMP_DEFAULT_STR_DUMPFILE); g_dump_instance->nr_thread = DUMP_DEFAULT_NR_THREAD; g_dump_instance->keep_running = 1; g_dump_instance->capture_cpu_mask = (mask_t)-1; /* 处理命令行参数 */ int opt = 0; while ((opt = getopt(argc, argv, "p:w:c:i:q:")) != -1) { char * endptr = NULL; switch (opt) { case 'p': /* 线程数 */ g_dump_instance->nr_thread = strtoull(optarg, &endptr, 0); if (g_dump_instance->nr_thread == 0 && endptr == optarg) usage(); break; case 'c': /* 总预期报文数量 */ g_dump_instance->nr_expect_packets = strtoull(optarg, &endptr, 0); if (g_dump_instance->nr_expect_packets == 0 && endptr == optarg) usage(); break; case 'w': /* DUMPFILE文件模板 */ snprintf(g_dump_instance->str_dumpfile, sizeof(g_dump_instance->str_dumpfile), "%s", optarg); break; case 'i': /* 网卡名称 */ snprintf(g_dump_instance->devsym, sizeof(g_dump_instance->devsym), "%s", optarg); break; case 'q': /* 线程掩码 */ if (str_to_mask(optarg, &g_dump_instance->capture_cpu_mask) < 0) usage(); break; default: usage(); } } /* 合成BPF规则 */ char str_bpf_expr[MR_STRING_MAX]; unsigned int curser_str_bpf_expr = 0; for (int i = optind; i < argc; i++) { curser_str_bpf_expr += snprintf(str_bpf_expr + curser_str_bpf_expr, sizeof(str_bpf_expr) - curser_str_bpf_expr, "%s ", argv[i]); } strncpy(g_dump_instance->str_bpf_expr, str_bpf_expr, sizeof(g_dump_instance->str_bpf_expr)); /* 打开PCAP句柄 */ g_dump_instance->pcap_handle_dead = pcap_open_dead(DLT_EN10MB, 65535); if (g_dump_instance->pcap_handle_dead == NULL) { fprintf(stderr, "Cannot open pcap handle: %s.\n", strerror(errno)); return RT_ERR; } /* 打开MARSIO句柄 */ g_dump_instance->mr_instance = marsio_create(); int ret = marsio_init(g_dump_instance->mr_instance, g_dump_instance->appsym); if (ret < 0) { fprintf(stderr, "Cannot initialize mrzcpd handle. \n"); return RT_ERR; } /* 打开MARSIO设备 */ g_dump_instance->mr_vdev_handler = marsio_open_device(g_dump_instance->mr_instance, g_dump_instance->devsym, g_dump_instance->nr_thread, g_dump_instance->nr_thread); if (g_dump_instance->mr_vdev_handler == NULL) { fprintf(stderr, "Cannot open device %s.\n", g_dump_instance->devsym); return RT_ERR; } /* 输出到标准输出,便于配合管道调用系统TCPDUMP解析 */ if (strcmp(g_dump_instance->str_dumpfile, "-") == 0) { g_dump_instance->fp_dumpfile = stdout; g_dump_instance->nr_thread = 1; } signal(SIGINT, sig_interupt_handle); /* 捕包线程初始化 */ for (int i = 0; i < g_dump_instance->nr_thread; i++) { g_dump_instance->th_instances[i] = malloc(sizeof(struct dump_thread_instance)); memset(g_dump_instance->th_instances[i], 0, sizeof(struct dump_thread_instance)); int ret = dump_thread_init(g_dump_instance, g_dump_instance->th_instances[i], i); if (ret < 0) return ret; } /* 等待线程结束 */ for (int i = 0; i < g_dump_instance->nr_thread; i++) { if (g_dump_instance->th_instances[i]->enable == 0) continue; pthread_join(g_dump_instance->th_instances[i]->pid, NULL); } pcap_close(g_dump_instance->pcap_handle_dead); dump_capture_result(g_dump_instance); return 0; }