summaryrefslogtreecommitdiff
path: root/rdns_scan/zmap4rdns/src/ztee.c
diff options
context:
space:
mode:
Diffstat (limited to 'rdns_scan/zmap4rdns/src/ztee.c')
-rw-r--r--rdns_scan/zmap4rdns/src/ztee.c555
1 files changed, 555 insertions, 0 deletions
diff --git a/rdns_scan/zmap4rdns/src/ztee.c b/rdns_scan/zmap4rdns/src/ztee.c
new file mode 100644
index 0000000..807cc78
--- /dev/null
+++ b/rdns_scan/zmap4rdns/src/ztee.c
@@ -0,0 +1,555 @@
+/*
+ * ZTee Copyright 2014 Regents of the University of Michigan
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+// without defining this, FreeBSD throws a warning.
+#define _WITH_GETLINE
+#include <stdio.h>
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include <errno.h>
+#include <getopt.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <signal.h>
+
+#include "../lib/lockfd.h"
+#include "../lib/logger.h"
+#include "../lib/queue.h"
+#include "../lib/util.h"
+#include "../lib/xalloc.h"
+#include "../lib/csv.h"
+
+#include "topt.h"
+
+typedef enum file_format { FORMAT_CSV, FORMAT_JSON, FORMAT_RAW } format_t;
+static const char *format_names[] = {"csv", "json", "raw"};
+
+typedef struct ztee_conf {
+ // Files
+ char *output_filename;
+ char *status_updates_filename;
+ char *log_file_name;
+ FILE *output_file;
+ FILE *status_updates_file;
+ FILE *log_file;
+
+ // Log level
+ int log_level;
+
+ // Input formats
+ format_t in_format;
+ format_t out_format;
+
+ // Output config
+ int success_only;
+
+ // Monitor config
+ int monitor;
+
+ // Field indices
+ size_t ip_field;
+ size_t success_field;
+
+} ztee_conf_t;
+
+static ztee_conf_t tconf;
+
+static int print_from_csv(char *line);
+
+static format_t test_input_format(char *line, size_t len)
+{
+ // Check for empty input, remember line contains '\n'
+ if (len < 2) {
+ return FORMAT_RAW;
+ }
+ if (len >= 3) {
+ // If the input is JSON, the line should look like
+ // {.......}\n
+ if (line[0] == '{' && line[len - 2] == '}') {
+ return FORMAT_JSON;
+ }
+ }
+ if (strchr(line, ',') != NULL) {
+ return FORMAT_CSV;
+ }
+ return FORMAT_RAW;
+}
+
+int done = 0;
+int process_done = 0;
+int total_read_in = 0;
+int read_in_last_sec = 0;
+int total_written = 0;
+
+double start_time;
+
+pthread_t threads[3];
+
+// one thread reads in
+// one thread writes out and parses
+
+// pops next element and determines what to do
+// if zqueue_t is empty and read_in is finished, then
+// it exits
+void *process_queue(void *my_q);
+
+// uses fgets to read from stdin and add it to the zqueue_t
+void *read_in(void *my_q);
+
+// does the same as find UP but finds only successful IPs, determined by the
+// is_successful field and flag
+void find_successful_IP(char *my_string);
+
+// finds IP in the string of csv and sends it to stdout for zgrab
+// you need to know what position is the csv string the ip field is in
+// zero indexed
+void find_IP(char *my_string);
+
+// writes a csv string out to csv file
+// fprintf(stderr, "Is empty inside if %i\n", is_empty(queue));
+void write_out_to_file(char *data);
+
+// figure out how many fields are present if it is a csv
+void figure_out_fields(char *data);
+
+// check that the output file is either in a csv form or json form
+// throws error is it is not either
+// NOTE: JSON OUTPUT NOT IMPLEMENTED
+void output_file_is_csv();
+
+void print_thread_error();
+
+// monitor code for ztee
+// executes every second
+void *monitor_ztee(void *my_q);
+
+#define SET_IF_GIVEN(DST, ARG) \
+ { \
+ if (args.ARG##_given) { \
+ (DST) = args.ARG##_arg; \
+ }; \
+ }
+#define SET_BOOL(DST, ARG) \
+ { \
+ if (args.ARG##_given) { \
+ (DST) = 1; \
+ }; \
+ }
+
+int main(int argc, char *argv[])
+{
+ struct gengetopt_args_info args;
+ struct cmdline_parser_params *params;
+ params = cmdline_parser_params_create();
+ assert(params);
+ params->initialize = 1;
+ params->override = 0;
+ params->check_required = 0;
+
+ if (cmdline_parser_ext(argc, argv, &args, params) != 0) {
+ exit(EXIT_SUCCESS);
+ }
+
+ signal(SIGPIPE, SIG_IGN);
+
+ // Handle help text and version
+ if (args.help_given) {
+ cmdline_parser_print_help();
+ exit(EXIT_SUCCESS);
+ }
+ if (args.version_given) {
+ cmdline_parser_print_version();
+ exit(EXIT_SUCCESS);
+ }
+
+ // Try opening the log file
+ tconf.log_level = ZLOG_WARN;
+ if (args.log_file_given) {
+ tconf.log_file = fopen(args.log_file_arg, "w");
+ } else {
+ tconf.log_file = stderr;
+ }
+
+ // Check for an error opening the log file
+ if (tconf.log_file == NULL) {
+ log_init(stderr, tconf.log_level, 0, "ztee");
+ log_fatal("ztee", "Could not open log file");
+ }
+
+ // Actually init the logging infrastructure
+ log_init(tconf.log_file, tconf.log_level, 0, "ztee");
+
+ // Check for an output file
+ if (args.inputs_num < 1) {
+ log_fatal("ztee", "No output file specified");
+ }
+ if (args.inputs_num > 1) {
+ log_fatal("ztee", "Extra positional arguments starting with %s",
+ args.inputs[1]);
+ }
+
+ tconf.output_filename = args.inputs[0];
+ tconf.output_file = fopen(tconf.output_filename, "w");
+ if (!tconf.output_file) {
+ log_fatal("ztee", "Could not open output file %s, %s",
+ tconf.output_filename, strerror(errno));
+ }
+
+ // Read actual options
+ int raw = 0;
+ SET_BOOL(tconf.success_only, success_only);
+ SET_BOOL(tconf.monitor, monitor);
+ SET_BOOL(raw, raw);
+
+ // Open the status update file if necessary
+ if (args.status_updates_file_given) {
+ // Try to open the status output file
+ char *filename = args.status_updates_file_arg;
+ FILE *file = fopen(filename, "w");
+ if (!file) {
+ char *err = strerror(errno);
+ log_fatal("ztee",
+ "unable to open status updates file %s (%s)",
+ filename, err);
+ }
+ // Set the variables in state
+ tconf.status_updates_filename = filename;
+ tconf.status_updates_file = file;
+ }
+
+ // Read the first line of the input file
+ size_t first_line_len = 1024;
+ char *first_line = xmalloc(first_line_len);
+ if (getline(&first_line, &first_line_len, stdin) < 0) {
+ log_fatal("ztee", "reading input to test format failed");
+ }
+ // Detect the input format
+ if (!raw) {
+ format_t format = test_input_format(first_line, first_line_len);
+ log_info("ztee", "detected input format %s",
+ format_names[format]);
+ tconf.in_format = format;
+ } else {
+ tconf.in_format = FORMAT_RAW;
+ log_info("ztee", "raw input");
+ }
+
+ if (tconf.in_format == FORMAT_JSON) {
+ log_fatal("ztee", "json input not implemented");
+ }
+
+ // Find fields if needed
+ char *header = strdup(first_line);
+ int found_success = 0;
+ int found_ip = 0;
+ if (tconf.in_format == FORMAT_CSV) {
+ static const char *success_names[] = {"success"};
+ static const char *ip_names[] = {"saddr", "ip"};
+ int success_idx = csv_find_index(header, success_names, 1);
+ if (success_idx >= 0) {
+ found_success = 1;
+ tconf.success_field = (size_t)success_idx;
+ }
+ int ip_idx = csv_find_index(header, ip_names, 2);
+ if (found_ip >= 0) {
+ found_ip = 1;
+ tconf.ip_field = (size_t)ip_idx;
+ }
+ if (!found_ip) {
+ log_fatal("ztee", "Unable to find IP/SADDR field");
+ }
+ }
+
+ if (tconf.success_only) {
+ if (tconf.in_format != FORMAT_CSV) {
+ log_fatal("ztee", "success filter requires csv input");
+ }
+ if (!found_success) {
+ log_fatal("ztee", "Could not find success field");
+ }
+ }
+
+ // Make the queue
+ zqueue_t *queue = queue_init();
+ assert(queue);
+
+ // Add the first line to the queue if needed
+ push_back(first_line, queue);
+
+ // Start the regular read thread
+ pthread_t read_thread;
+ if (pthread_create(&read_thread, NULL, read_in, queue)) {
+ log_fatal("ztee", "unable to start read thread");
+ }
+
+ // Record the start time
+ start_time = now();
+
+ // Start the process thread
+ pthread_t process_thread;
+ if (pthread_create(&process_thread, NULL, process_queue, queue)) {
+ log_fatal("ztee", "unable to start process thread");
+ }
+
+ // Start the monitor thread if necessary, and join to it
+ if (tconf.monitor || tconf.status_updates_file) {
+ pthread_t monitor_thread;
+ if (pthread_create(&monitor_thread, NULL, monitor_ztee,
+ queue)) {
+ log_fatal("ztee", "unable to create monitor thread");
+ }
+ pthread_join(monitor_thread, NULL);
+ }
+
+ // Join to the remaining threads,
+ pthread_join(read_thread, NULL);
+ pthread_join(process_thread, NULL);
+ return 0;
+}
+
+void *process_queue(void *arg)
+{
+ zqueue_t *queue = arg;
+ FILE *output_file = tconf.output_file;
+ while (!process_done) {
+
+ pthread_mutex_lock(&queue->lock);
+ while (!done && is_empty(queue)) {
+ pthread_cond_wait(&queue->empty, &queue->lock);
+ }
+ if (done && is_empty(queue)) {
+ process_done = 1;
+ pthread_mutex_unlock(&queue->lock);
+ continue;
+ }
+ znode_t *node = pop_front_unsafe(queue);
+ pthread_mutex_unlock(&queue->lock);
+
+ // Write raw data to output file
+ fprintf(output_file, "%s", node->data);
+ fflush(output_file);
+ if (ferror(output_file)) {
+ log_fatal("ztee", "Error writing to output file");
+ }
+
+ // Dump to stdout
+ switch (tconf.in_format) {
+ case FORMAT_JSON:
+ log_fatal("ztee", "JSON input format unimplemented");
+ break;
+ case FORMAT_CSV:
+ print_from_csv(node->data);
+ break;
+ default:
+ // Handle raw
+ fprintf(stdout, "%s", node->data);
+ break;
+ }
+
+ // Check to see if write failed
+ fflush(stdout);
+ if (ferror(stdout)) {
+ log_fatal("ztee", "%s", "Error writing to stdout");
+ }
+
+ // Record output lines
+ total_written++;
+
+ // Free the memory
+ free(node->data);
+ free(node);
+ }
+ process_done = 1;
+ fflush(output_file);
+ fclose(output_file);
+ return NULL;
+}
+
+void *read_in(void *arg)
+{
+ // Allocate buffers
+ zqueue_t *queue = (zqueue_t *)arg;
+ size_t length = 1000;
+ char *input = xcalloc(sizeof(char), length);
+ ;
+
+ // Read in from stdin and add to back of linked list
+ while (getline(&input, &length, stdin) > 0) {
+ push_back(input, queue);
+
+ total_read_in++;
+ read_in_last_sec++;
+ }
+ pthread_mutex_lock(&queue->lock);
+ done = 1;
+ pthread_cond_signal(&queue->empty);
+ pthread_mutex_unlock(&queue->lock);
+ return NULL;
+}
+
+int print_from_csv(char *line)
+{
+ if (total_written == 0) {
+ return 1;
+ }
+ if (tconf.success_only) {
+ char *success_entry = csv_get_index(line, tconf.success_field);
+ if (success_entry == NULL) {
+ return 1;
+ }
+ int success = 0;
+ if (atoi(success_entry)) {
+ success = 1;
+ } else if (strcasecmp(success_entry, "true") == 0) {
+ success = 1;
+ }
+ if (!success) {
+ return 1;
+ }
+ }
+ // Find the ip
+ char *ip = csv_get_index(line, tconf.ip_field);
+ int ret = fprintf(stdout, "%s\n", ip);
+ if (ferror(stdout)) {
+ log_fatal("ztee", "unable to write to stdout");
+ }
+ return ret;
+}
+
+void output_file_is_csv()
+{
+ return;
+ /*
+ char *dot = strrchr(output_filename);
+ if dot == NULL {
+ return;
+ }
+ */
+ /*
+ int length = strlen(output_filename);
+ char *end_of_file = malloc(sizeof(char*) *4);
+ strncpy(end_of_file, output_filename+(length - 3), 3);
+ end_of_file[4] = '\0';
+ const char *csv = "csv\n";
+ const char *json = "jso\n";
+ if(!strncmp(end_of_file, csv, 3) && !strncmp(end_of_file, json, 3)){
+ log_fatal("ztee", "Invalid output format");
+ }
+ if(!strncmp(end_of_file, csv, 3)) output_csv = 1;
+ if(!strncmp(end_of_file, json, 3)) output_csv = 0;
+ */
+}
+
+void print_thread_error(char *string)
+{
+ fprintf(stderr, "Could not create thread %s\n", string);
+ return;
+}
+
+#define TIME_STR_LEN 20
+
+typedef struct ztee_stats {
+ // Read stats
+ uint32_t total_read;
+ uint32_t read_per_sec_avg;
+ uint32_t read_last_sec;
+
+ // Buffer stats
+ uint32_t buffer_cur_size;
+ uint32_t buffer_avg_size;
+ uint64_t _buffer_size_sum;
+
+ // Duration
+ double _last_age;
+ uint32_t time_past;
+ char time_past_str[TIME_STR_LEN];
+} stats_t;
+
+void update_stats(stats_t *stats, zqueue_t *queue)
+{
+ double age = now() - start_time;
+ double delta = age - stats->_last_age;
+ stats->_last_age = age;
+
+ stats->time_past = age;
+ time_string((int)age, 0, stats->time_past_str, TIME_STR_LEN);
+
+ uint32_t total_read = total_read_in;
+ stats->read_last_sec = (total_read - stats->total_read) / delta;
+ stats->total_read = total_read;
+ stats->read_per_sec_avg = stats->total_read / age;
+
+ stats->buffer_cur_size = get_size(queue);
+ stats->_buffer_size_sum += stats->buffer_cur_size;
+ stats->buffer_avg_size = stats->_buffer_size_sum / age;
+}
+
+void *monitor_ztee(void *arg)
+{
+ zqueue_t *queue = (zqueue_t *)arg;
+ stats_t *stats = xmalloc(sizeof(stats_t));
+
+ if (tconf.status_updates_file) {
+ fprintf(
+ tconf.status_updates_file,
+ "time_past,total_read_in,read_in_last_sec,read_per_sec_avg,"
+ "buffer_current_size,buffer_avg_size\n");
+ fflush(tconf.status_updates_file);
+ if (ferror(tconf.status_updates_file)) {
+ log_fatal("ztee",
+ "unable to write to status updates file");
+ }
+ }
+ while (!process_done) {
+ sleep(1);
+
+ update_stats(stats, queue);
+ if (tconf.monitor) {
+ lock_file(stderr);
+ fprintf(
+ stderr,
+ "%5s read_rate: %u rows/s (avg %u rows/s), buffer_size: %u (avg %u)\n",
+ stats->time_past_str, stats->read_last_sec,
+ stats->read_per_sec_avg, stats->buffer_cur_size,
+ stats->buffer_avg_size);
+ fflush(stderr);
+ unlock_file(stderr);
+ if (ferror(stderr)) {
+ log_fatal(
+ "ztee",
+ "unable to write status updates to stderr");
+ }
+ }
+ if (tconf.status_updates_file) {
+ fprintf(tconf.status_updates_file,
+ "%u,%u,%u,%u,%u,%u\n", stats->time_past,
+ stats->total_read, stats->read_last_sec,
+ stats->read_per_sec_avg, stats->buffer_cur_size,
+ stats->buffer_avg_size);
+ fflush(tconf.status_updates_file);
+ if (ferror(tconf.status_updates_file)) {
+ log_fatal(
+ "ztee",
+ "unable to write to status updates file");
+ }
+ }
+ }
+ if (tconf.monitor) {
+ lock_file(stderr);
+ fflush(stderr);
+ unlock_file(stderr);
+ }
+ if (tconf.status_updates_file) {
+ fflush(tconf.status_updates_file);
+ fclose(tconf.status_updates_file);
+ }
+ return NULL;
+}