summaryrefslogtreecommitdiff
path: root/src/trace_output.c
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-11 10:19:06 +0000
committer童宗振 <[email protected]>2024-04-11 10:19:06 +0000
commit6b01bae42db789e5373bb9fab41365619f03517c (patch)
tree396e5dd7f96df14d8b3faf20dea659420ae3957a /src/trace_output.c
parentde1bd5cdd8b6f7ecf3e3c512bed48b9154141e70 (diff)
Adjust code
Diffstat (limited to 'src/trace_output.c')
-rw-r--r--src/trace_output.c94
1 files changed, 47 insertions, 47 deletions
diff --git a/src/trace_output.c b/src/trace_output.c
index e3adfdc..79e9d14 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -15,6 +15,8 @@
#define BURST_MAX 64
extern struct mr_instance * mr_instance;
+static rd_kafka_t * kafka_handle = NULL;
+static rd_kafka_topic_t * kafka_topic = NULL;
int dp_trace_file_mutex_lock(job_bitmap_t job_id);
int dp_trace_file_mutex_unlock(job_bitmap_t job_id);
@@ -22,7 +24,8 @@ bool dp_trace_file_reach_max_size(job_bitmap_t job_id);
void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id);
void dp_trace_file_rollbak(job_bitmap_t job_id);
int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs,
- marsio_buff_t * class_mbufs[], int nr_class_mbufs, int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]);
+ marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][nr_mbufs],
+ int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]);
static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size);
static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, unsigned int size);
@@ -35,12 +38,14 @@ struct dp_trace_output
pthread_mutex_t file_mutex;
};
-struct dp_trace_output dp_trace_output[DP_TRACE_JOB_NUM_MAX] = {};
+static struct dp_trace_output dp_trace_output[DP_TRACE_JOB_NUM_MAX] = {};
void dp_trace_output_init()
{
+ const struct config * conf = global_config_get();
+
// pcapng file path init
- char * dp_trace_dir = g_conf->dp_trace_dir;
+ const char * dp_trace_dir = conf->dp_trace_dir;
int ret = mkdir(dp_trace_dir, 0755);
if (ret != 0 && errno != EEXIST)
{
@@ -69,25 +74,26 @@ void dp_trace_output_init()
}
}
- g_conf->kafka_handle = kafka_handle_create(g_conf->broker_list, g_conf->sasl_password, g_conf->sasl_username);
- g_conf->kafka_topic = kafka_topic_new(g_conf->kafka_handle, g_conf->topic_name, NULL);
+ kafka_handle = kafka_handle_create(conf->broker_list, conf->sasl_password, conf->sasl_username);
+ kafka_topic = kafka_topic_new(kafka_handle, conf->topic_name, NULL);
}
void * dp_trace_process_thread(void * arg)
{
+ const struct config * conf = global_config_get();
+
uintptr_t qid = (uintptr_t)arg;
marsio_buff_t * rx_buff[BURST_MAX];
- unsigned int nr_burst = TELEMETRY_DIM(rx_buff);
- unsigned int nr_class_mbufs = nr_burst * DP_TRACE_JOB_NUM_MAX;
- marsio_buff_t * class_mbufs[nr_class_mbufs];
+ marsio_buff_t * tx_buff[BURST_MAX];
+ marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][BURST_MAX];
int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX];
marsio_thread_init(mr_instance);
while (1)
{
- int nr_recv = marsio_dp_trace_recv_burst(mr_instance, qid, rx_buff, nr_burst);
+ unsigned int nr_recv = marsio_dp_trace_recv_burst(mr_instance, qid, rx_buff, TELEMETRY_DIM(rx_buff));
if (nr_recv == 0)
{
// todo: use epoll+eventfd
@@ -95,9 +101,9 @@ void * dp_trace_process_thread(void * arg)
continue;
}
- dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_class_mbufs, nr_jobs_mbufs);
+ dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs);
- for (unsigned int i = 0; i < 8; i++)
+ for (unsigned int i = 0; i < DP_TRACE_JOB_NUM_MAX; i++)
{
unsigned int nr_mbufs = nr_jobs_mbufs[i];
if (nr_mbufs == 0)
@@ -105,46 +111,40 @@ void * dp_trace_process_thread(void * arg)
continue;
}
- for (unsigned int j = 0; j < nr_mbufs; j++)
+ uint8_t role = job_id_role_get(index_to_job_id(i));
+ if (role == DP_TRACE_ROLE)
{
- rx_buff[j] = class_mbufs[i * nr_recv + j];
+ for (unsigned int j = 0; j < nr_mbufs; j++)
+ {
+ tx_buff[j] = class_mbufs[i][j];
+ }
+ job_bitmap_t job_id = index_to_job_id(i);
+ cli_job_mbufs_write_process(tx_buff, nr_mbufs, job_id);
}
- job_bitmap_t job_id = index_to_job_id(i);
- cli_job_mbufs_write_process(rx_buff, nr_mbufs, job_id);
- }
-
- for (unsigned int i = 8; i < 16; i++)
- {
- unsigned int nr_mbufs = nr_jobs_mbufs[i];
- if (nr_mbufs == 0)
+ else if (role == DP_TELEMETRY_ROLE)
{
- continue;
+ for (unsigned int j = 0; j < nr_mbufs; j++)
+ {
+ char * data;
+ size_t size;
+ dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size);
+ kafka_produce(kafka_topic, (void *)data, size);
+ }
}
-
- for (unsigned int j = 0; j < nr_mbufs; j++)
- {
- rx_buff[j] = class_mbufs[i * nr_recv + j];
- }
- for (unsigned int j = 0; j < nr_mbufs; j++)
+ else
{
- char * data;
- size_t size;
- dp_trace_decode_to_message_pack(rx_buff[j], &data, &size);
- kafka_produce(g_conf->kafka_topic, (void *)data, size);
+ syslog(LOG_INFO,
+ "The job has been deleted. The trace content corresponding to the job has been discarded.");
+ marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs);
}
}
}
}
int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs,
- marsio_buff_t * class_mbufs[], int nr_class_mbufs, int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX])
+ marsio_buff_t * class_mbufs[DP_TRACE_JOB_NUM_MAX][BURST_MAX],
+ int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX])
{
- assert(nr_class_mbufs >= DP_TRACE_JOB_NUM_MAX * nr_mbufs);
- if (unlikely(nr_class_mbufs < DP_TRACE_JOB_NUM_MAX * nr_mbufs))
- {
- return -1;
- }
-
memset((void *)nr_jobs_mbufs, 0, DP_TRACE_JOB_NUM_MAX * sizeof(int));
for (unsigned int i = 0; i < nr_mbufs; i++)
@@ -163,7 +163,7 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs
for (unsigned int j = 0; j < DP_TRACE_JOB_NUM_MAX; j++)
{
- uint16_t job_id_mask = 1 << j;
+ uint16_t job_id_mask = index_to_job_id(j);
uint16_t job_id = jobs_id & job_id_mask;
if (job_id == 0)
{
@@ -171,7 +171,7 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs
}
unsigned int nr_job = nr_jobs_mbufs[j];
- class_mbufs[j * nr_mbufs + nr_job] = mbufs[i];
+ class_mbufs[j][nr_job] = mbufs[i];
nr_jobs_mbufs[j]++;
}
}
@@ -261,7 +261,7 @@ bool dp_trace_file_reach_max_size(job_bitmap_t job_id)
dp_trace_file_mutex_lock(job_id);
- unsigned int max_size = g_conf->dp_trace_file_max_size_in_KB / 2;
+ unsigned int max_size = global_config_get()->dp_trace_file_max_size_in_KB / 2;
if (max_size == 0)
{
@@ -342,8 +342,8 @@ void dp_trace_pcapng_merger(job_bitmap_t job_id)
}
char command[2 * PATH_MAX];
- snprintf(command, sizeof(command), "timeout -v %us mergecap -w %s %s %s 2>&1", g_conf->dp_trace_merge_timeout,
- file_path, file_middle_path, file_bak_path);
+ snprintf(command, sizeof(command), "timeout -v %us mergecap -w %s %s %s 2>&1",
+ global_config_get()->dp_trace_merge_timeout, file_path, file_middle_path, file_bak_path);
syslog(LOG_INFO, "merge trace file: %s", command);
FILE * fp;
@@ -433,9 +433,9 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
mpack_write_cstr(&writer, job_id_str);
mpack_write_cstr(&writer, "sled_ip");
- if (g_conf->sled_ip != NULL)
+ if (global_config_get()->sled_ip != NULL)
{
- mpack_write_cstr(&writer, g_conf->sled_ip);
+ mpack_write_cstr(&writer, global_config_get()->sled_ip);
}
else
{
@@ -443,7 +443,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
}
mpack_write_cstr(&writer, "device_group");
- mpack_write_cstr(&writer, g_conf->device_group);
+ mpack_write_cstr(&writer, global_config_get()->device_group);
mpack_write_cstr(&writer, "source_ip");
mpack_write_cstr(&writer, pkt_info.src_addr_str);