author    liuxueli <[email protected]>    2024-01-26 14:42:23 +0800
committer liuxueli <[email protected]>    2024-02-01 16:46:42 +0800
commit    01f6c61298d56cdf0a99300636bbb8ed5bf0b0fa (patch)
tree      0a0d4601e3ba6ee3c1c811cdd162026adfc6d85f
parent    82ce053f67b78b03d89641a2ee53b7825c2b2fa9 (diff)
Change synchronous log sending to asynchronous log sending (feature-aync-sendlog-ringbuff)
-rw-r--r--  CMakeLists.txt                 4
-rw-r--r--  deps/ringbuf/LICENSE          25
-rw-r--r--  deps/ringbuf/README.md       104
-rw-r--r--  deps/ringbuf/ringbuf.c       430
-rw-r--r--  deps/ringbuf/ringbuf.h        29
-rw-r--r--  deps/ringbuf/utils.h         115
-rw-r--r--  inc/tsg_send_log.h             3
-rw-r--r--  src/CMakeLists.txt            20
-rw-r--r--  src/tsg_send_log.cpp         889
-rw-r--r--  src/tsg_send_log_internal.h   39
-rw-r--r--  test/src/CMakeLists.txt       39
11 files changed, 1324 insertions(+), 373 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f6a2582..268e7ee 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -8,9 +8,13 @@ include(Version)
set(CMAKE_MACOSX_RPATH 0)
+set(CMAKE_C_STANDARD 99)
+set(CMAKE_CXX_STANDARD 17)
+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g")
+include_directories(${PROJECT_SOURCE_DIR}/deps/)
include_directories(${PROJECT_SOURCE_DIR}/inc/)
include_directories(/opt/MESA/include/)
diff --git a/deps/ringbuf/LICENSE b/deps/ringbuf/LICENSE
new file mode 100644
index 0000000..4f86ee0
--- /dev/null
+++ b/deps/ringbuf/LICENSE
@@ -0,0 +1,25 @@
+/*-
+ * Copyright (c) 2016-2017 Mindaugas Rasiukevicius <rmind at noxt eu>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
diff --git a/deps/ringbuf/README.md b/deps/ringbuf/README.md
new file mode 100644
index 0000000..e5b849d
--- /dev/null
+++ b/deps/ringbuf/README.md
@@ -0,0 +1,104 @@
+# Lock-free ring buffer
+
+[![Build Status](https://travis-ci.org/rmind/ringbuf.svg?branch=master)](https://travis-ci.org/rmind/ringbuf)
+
+Lock-free multi-producer single-consumer (MPSC) ring buffer which supports
+contiguous range operations and which can be conveniently used for message
+passing. The implementation is written in C11 and distributed under the
+2-clause BSD license.
+
+## API
+
+* `int ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length)`
+ * Setup a new ring buffer of a given _length_. The `rbuf` is a pointer
+  to the opaque ring buffer object; the caller is responsible for allocating
+  the space for this object.  Typically, the object would be allocated
+  dynamically if using threads or reserved in a shared memory block if
+ using processes. The allocation size for the object shall be obtained
+ using the `ringbuf_get_sizes` function. Returns 0 on success and -1
+ on failure.
+
+* `void ringbuf_get_sizes(unsigned nworkers, size_t *ringbuf_obj_size, size_t *ringbuf_worker_size)`
+ * Returns the size of the opaque `ringbuf_t` and, optionally, `ringbuf_worker_t` structures.
+ The size of the `ringbuf_t` structure depends on the number of workers,
+ specified by the `nworkers` parameter.
+
+* `ringbuf_worker_t *ringbuf_register(ringbuf_t *rbuf, unsigned i)`
+ * Register the current worker (thread or process) as a producer. Each
+ producer MUST register itself. The `i` is a worker number, starting
+  from zero (i.e. it shall be less than the `nworkers` used in the setup).
+  On success, returns a pointer to an opaque `ringbuf_worker_t` structure,
+  which is a part of the `ringbuf_t` memory block.  On failure, returns `NULL`.
+
+* `void ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *worker)`
+ * Unregister the specified worker from the list of producers.
+
+* `ssize_t ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *worker, size_t len)`
+ * Request a space of a given length in the ring buffer. Returns the
+ offset at which the space is available or -1 on failure. Once the data
+ is ready (typically, when writing to the ring buffer is complete), the
+ `ringbuf_produce` function must be called to indicate that. Nested
+ acquire calls are not allowed.
+
+* `void ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *worker)`
+ * Indicate that the acquired range in the buffer is produced and is ready
+ to be consumed.
+
+* `size_t ringbuf_consume(ringbuf_t *rbuf, size_t *offset)`
+ * Get a contiguous range which is ready to be consumed. Returns zero
+ if there is no data available for consumption. Once the data is
+ consumed (typically, when reading from the ring buffer is complete),
+ the `ringbuf_release` function must be called to indicate that.
+
+* `void ringbuf_release(ringbuf_t *rbuf, size_t nbytes)`
+ * Indicate that the consumed range can now be released and may now be
+ reused by the producers.
+
+## Notes
+
+The consumer will return a contiguous block of produced ranges, i.e. the
+`ringbuf_consume` call will not return partial ranges.  If you think of a
+produced range as a message, then the consumer will return a block of
+messages, always ending at a message boundary.  Such behaviour allows us
+to use this ring buffer implementation as a message queue.
+
+The implementation was extensively tested on a 24-core x86 machine,
+see [the stress test](src/t_stress.c) for the details on the technique.
+It also provides an example of how the mechanism can be used for message
+passing.
+
+## Caveats
+
+This ring buffer implementation always provides a contiguous range of
+space for the producer.  This is achieved by an early wrap-around if the
+requested range cannot fit at the end of the buffer.  The implication is that
+the `ringbuf_acquire` call may fail if the requested range is greater
+than half of the buffer size. Hence, it may be necessary to ensure that
+the ring buffer size is at least twice as large as the maximum production
+unit size.
+
+It should also be noted that one of the trade-offs of such a design is that
+the consumer currently performs an O(n) scan on the list of producers.
+
+## Example
+
+Producers:
+```c
+if ((w = ringbuf_register(r, worker_id)) == NULL)
+	err(EXIT_FAILURE, "ringbuf_register");
+
+...
+
+if ((off = ringbuf_acquire(r, w, len)) != -1) {
+ memcpy(&buf[off], payload, len);
+	ringbuf_produce(r, w);
+}
+```
+
+Consumer:
+```c
+if ((len = ringbuf_consume(r, &off)) != 0) {
+ process(&buf[off], len);
+ ringbuf_release(r, len);
+}
+```
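For reference, the fragments above combine into a minimal single-threaded, end-to-end sketch of this vendored API (buffer size, message contents, and error handling are illustrative only; in real use each producer runs in its own thread):

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include "ringbuf.h"

int main(void)
{
	const unsigned nworkers = 1;
	const size_t space = 4096;
	size_t obj_size;

	/* The ringbuf_t object is opaque: query its size, then allocate it
	 * together with the data area it manages. */
	ringbuf_get_sizes(nworkers, &obj_size, NULL);
	ringbuf_t *r = (ringbuf_t *)malloc(obj_size);
	char *buf = (char *)malloc(space);
	if (r == NULL || buf == NULL || ringbuf_setup(r, nworkers, space) == -1)
		return EXIT_FAILURE;

	/* Register as producer 0; the worker number must be < nworkers. */
	ringbuf_worker_t *w = ringbuf_register(r, 0);
	if (w == NULL)
		return EXIT_FAILURE;

	/* Produce one message: acquire space, copy, mark it ready. */
	const char msg[] = "hello";
	ssize_t off = ringbuf_acquire(r, w, sizeof(msg));
	if (off != -1) {
		memcpy(&buf[off], msg, sizeof(msg));
		ringbuf_produce(r, w);
	}

	/* Consume whatever contiguous range is ready, then release it. */
	size_t coff, len;
	if ((len = ringbuf_consume(r, &coff)) != 0) {
		printf("consumed %zu bytes: %s\n", len, &buf[coff]);
		ringbuf_release(r, len);
	}

	ringbuf_unregister(r, w);
	free(buf);
	free(r);
	return EXIT_SUCCESS;
}
```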
diff --git a/deps/ringbuf/ringbuf.c b/deps/ringbuf/ringbuf.c
new file mode 100644
index 0000000..75cfee9
--- /dev/null
+++ b/deps/ringbuf/ringbuf.c
@@ -0,0 +1,430 @@
+/*
+ * Copyright (c) 2016-2017 Mindaugas Rasiukevicius <rmind at noxt eu>
+ * All rights reserved.
+ *
+ * Use is subject to license terms, as specified in the LICENSE file.
+ */
+
+/*
+ * Atomic multi-producer single-consumer ring buffer, which supports
+ * contiguous range operations and which can be conveniently used for
+ * message passing.
+ *
+ * There are three offsets -- think of clock hands:
+ * - NEXT: marks the beginning of the available space,
+ * - WRITTEN: the point up to which the data is actually written.
+ * - Observed READY: the point up to which the data is ready to be consumed.
+ *
+ * Producers
+ *
+ * Observe and save the 'next' offset, then request N bytes from
+ * the ring buffer by atomically advancing the 'next' offset. Once
+ * the data is written into the "reserved" buffer space, the thread
+ * clears the saved value; these observed values are used to compute
+ * the 'ready' offset.
+ *
+ * Consumer
+ *
+ * Consumes the data between the 'written' and 'ready' offsets and
+ * updates the 'written' value.  The consumer thread scans for the
+ * lowest 'seen' value among the producers.
+ *
+ * Key invariant
+ *
+ * Producers cannot go beyond the 'written' offset; producers are
+ * also not allowed to catch up with the consumer. Only the consumer
+ * is allowed to catch up with the producer i.e. set the 'written'
+ * offset to be equal to the 'next' offset.
+ *
+ * Wrap-around
+ *
+ * If the producer cannot acquire the requested length due to
+ * insufficient space at the end of the buffer, then it will wrap around.
+ * WRAP_LOCK_BIT in 'next' offset is used to lock the 'end' offset.
+ *
+ * There is an ABA problem if one producer stalls while a pair of
+ * producer and consumer would both successfully wrap-around and set
+ * the 'next' offset to the stale value of the first producer, thus
+ * letting it perform a successful CAS that violates the invariant.
+ * A counter in the 'next' offset (masked by WRAP_COUNTER) is used
+ * to prevent this problem.  It is incremented on wrap-arounds.
+ *
+ * The same ABA problem could also cause a stale 'ready' offset,
+ * which could be observed by the consumer. We set WRAP_LOCK_BIT in
+ * the 'seen' value before advancing the 'next' and clear this bit
+ * after the successful advancing; this ensures that only the stable
+ * 'ready' is observed by the consumer.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stddef.h>
+#include <stdbool.h>
+#include <inttypes.h>
+#include <string.h>
+#include <limits.h>
+#include <errno.h>
+
+#include "ringbuf.h"
+#include "utils.h"
+
+#define RBUF_OFF_MASK (0x00000000ffffffffUL)
+#define WRAP_LOCK_BIT (0x8000000000000000UL)
+#define RBUF_OFF_MAX (UINT64_MAX & ~WRAP_LOCK_BIT)
+
+#define WRAP_COUNTER (0x7fffffff00000000UL)
+#define WRAP_INCR(x) (((x) + 0x100000000UL) & WRAP_COUNTER)
+
+typedef uint64_t ringbuf_off_t;
+
+struct ringbuf_worker {
+ volatile ringbuf_off_t seen_off;
+ int registered;
+};
+
+struct ringbuf {
+ /* Ring buffer space. */
+ size_t space;
+
+ /*
+ * The NEXT hand is atomically updated by the producer.
+ * WRAP_LOCK_BIT is set in case of wrap-around; in such case,
+ * the producer can update the 'end' offset.
+ */
+ volatile ringbuf_off_t next;
+ ringbuf_off_t end;
+
+ /* The following are updated by the consumer. */
+ ringbuf_off_t written;
+ unsigned nworkers;
+ ringbuf_worker_t workers[];
+};
+
+/*
+ * ringbuf_setup: initialise a new ring buffer of a given length.
+ */
+int
+ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length)
+{
+ if (length >= RBUF_OFF_MASK) {
+ errno = EINVAL;
+ return -1;
+ }
+ memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers]));
+ rbuf->space = length;
+ rbuf->end = RBUF_OFF_MAX;
+ rbuf->nworkers = nworkers;
+ return 0;
+}
+
+/*
+ * ringbuf_get_sizes: return the sizes of the ringbuf_t and ringbuf_worker_t.
+ */
+void
+ringbuf_get_sizes(unsigned nworkers,
+ size_t *ringbuf_size, size_t *ringbuf_worker_size)
+{
+ if (ringbuf_size)
+ *ringbuf_size = offsetof(ringbuf_t, workers[nworkers]);
+ if (ringbuf_worker_size)
+ *ringbuf_worker_size = sizeof(ringbuf_worker_t);
+}
+
+/*
+ * ringbuf_register: register the worker (thread/process) as a producer
+ * and pass the pointer to its local store.
+ */
+ringbuf_worker_t *
+ringbuf_register(ringbuf_t *rbuf, unsigned i)
+{
+ ringbuf_worker_t *w = &rbuf->workers[i];
+
+ w->seen_off = RBUF_OFF_MAX;
+ atomic_store_explicit(&w->registered, true, memory_order_release);
+ return w;
+}
+
+void
+ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
+{
+ w->registered = false;
+ (void)rbuf;
+}
+
+/*
+ * stable_nextoff: capture and return a stable value of the 'next' offset.
+ */
+static inline ringbuf_off_t
+stable_nextoff(ringbuf_t *rbuf)
+{
+ unsigned count = SPINLOCK_BACKOFF_MIN;
+ ringbuf_off_t next;
+retry:
+ next = atomic_load_explicit(&rbuf->next, memory_order_acquire);
+ if (next & WRAP_LOCK_BIT) {
+ SPINLOCK_BACKOFF(count);
+ goto retry;
+ }
+ ASSERT((next & RBUF_OFF_MASK) < rbuf->space);
+ return next;
+}
+
+/*
+ * stable_seenoff: capture and return a stable value of the 'seen' offset.
+ */
+static inline ringbuf_off_t
+stable_seenoff(ringbuf_worker_t *w)
+{
+ unsigned count = SPINLOCK_BACKOFF_MIN;
+ ringbuf_off_t seen_off;
+retry:
+ seen_off = atomic_load_explicit(&w->seen_off, memory_order_acquire);
+ if (seen_off & WRAP_LOCK_BIT) {
+ SPINLOCK_BACKOFF(count);
+ goto retry;
+ }
+ return seen_off;
+}
+
+/*
+ * ringbuf_acquire: request a space of a given length in the ring buffer.
+ *
+ * => On success: returns the offset at which the space is available.
+ * => On failure: returns -1.
+ */
+ssize_t
+ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
+{
+ ringbuf_off_t seen, next, target;
+
+ ASSERT(len > 0 && len <= rbuf->space);
+ ASSERT(w->seen_off == RBUF_OFF_MAX);
+
+ do {
+ ringbuf_off_t written;
+
+ /*
+ * Get the stable 'next' offset. Save the observed 'next'
+ * value (i.e. the 'seen' offset), but mark the value as
+ * unstable (set WRAP_LOCK_BIT).
+ *
+ * Note: CAS will issue a memory_order_release for us and
+ * thus ensures that it reaches global visibility together
+ * with new 'next'.
+ */
+ seen = stable_nextoff(rbuf);
+ next = seen & RBUF_OFF_MASK;
+ ASSERT(next < rbuf->space);
+ atomic_store_explicit(&w->seen_off, next | WRAP_LOCK_BIT,
+ memory_order_relaxed);
+
+ /*
+ * Compute the target offset. Key invariant: we cannot
+ * go beyond the WRITTEN offset or catch up with it.
+ */
+ target = next + len;
+ written = rbuf->written;
+ if (__predict_false(next < written && target >= written)) {
+ /* The producer must wait. */
+ atomic_store_explicit(&w->seen_off,
+ RBUF_OFF_MAX, memory_order_release);
+ return -1;
+ }
+
+ if (__predict_false(target >= rbuf->space)) {
+ const bool exceed = target > rbuf->space;
+
+ /*
+ * Wrap-around and start from the beginning.
+ *
+ * If we would exceed the buffer, then attempt to
+ * acquire the WRAP_LOCK_BIT and use the space in
+ * the beginning. If we used all space exactly to
+ * the end, then reset to 0.
+ *
+ * Check the invariant again.
+ */
+ target = exceed ? (WRAP_LOCK_BIT | len) : 0;
+ if ((target & RBUF_OFF_MASK) >= written) {
+ atomic_store_explicit(&w->seen_off,
+ RBUF_OFF_MAX, memory_order_release);
+ return -1;
+ }
+ /* Increment the wrap-around counter. */
+ target |= WRAP_INCR(seen & WRAP_COUNTER);
+ } else {
+ /* Preserve the wrap-around counter. */
+ target |= seen & WRAP_COUNTER;
+ }
+ } while (!atomic_compare_exchange_weak(&rbuf->next, &seen, target));
+
+ /*
+ * Acquired the range. Clear WRAP_LOCK_BIT in the 'seen' value
+ * thus indicating that it is stable now.
+ *
+ * No need for memory_order_release, since CAS issued a fence.
+ */
+ atomic_store_explicit(&w->seen_off, w->seen_off & ~WRAP_LOCK_BIT,
+ memory_order_relaxed);
+
+ /*
+ * If we set the WRAP_LOCK_BIT in the 'next' (because we exceed
+ * the remaining space and need to wrap-around), then save the
+ * 'end' offset and release the lock.
+ */
+ if (__predict_false(target & WRAP_LOCK_BIT)) {
+ /* Cannot wrap-around again if consumer did not catch-up. */
+ ASSERT(rbuf->written <= next);
+ ASSERT(rbuf->end == RBUF_OFF_MAX);
+ rbuf->end = next;
+ next = 0;
+
+ /*
+ * Unlock: ensure the 'end' offset reaches global
+ * visibility before the lock is released.
+ */
+ atomic_store_explicit(&rbuf->next,
+ (target & ~WRAP_LOCK_BIT), memory_order_release);
+ }
+ ASSERT((target & RBUF_OFF_MASK) <= rbuf->space);
+ return (ssize_t)next;
+}
+
+/*
+ * ringbuf_produce: indicate the acquired range in the buffer is produced
+ * and is ready to be consumed.
+ */
+void
+ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
+{
+ (void)rbuf;
+ ASSERT(w->registered);
+ ASSERT(w->seen_off != RBUF_OFF_MAX);
+ atomic_store_explicit(&w->seen_off, RBUF_OFF_MAX, memory_order_release);
+}
+
+/*
+ * ringbuf_consume: get a contiguous range which is ready to be consumed.
+ */
+size_t
+ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
+{
+ ringbuf_off_t written = rbuf->written, next, ready;
+ size_t towrite;
+retry:
+ /*
+ * Get the stable 'next' offset. Note: stable_nextoff() issued
+ * a load memory barrier. The area between the 'written' offset
+ * and the 'next' offset will be the *preliminary* target buffer
+ * area to be consumed.
+ */
+ next = stable_nextoff(rbuf) & RBUF_OFF_MASK;
+ if (written == next) {
+ /* If producers did not advance, then nothing to do. */
+ return 0;
+ }
+
+ /*
+ * Observe the 'ready' offset of each producer.
+ *
+ * At this point, some producer might have already triggered the
+ * wrap-around and some (or all) seen 'ready' values might be in
+ * the range between 0 and 'written'. We have to skip them.
+ */
+ ready = RBUF_OFF_MAX;
+
+ for (unsigned i = 0; i < rbuf->nworkers; i++) {
+ ringbuf_worker_t *w = &rbuf->workers[i];
+ ringbuf_off_t seen_off;
+
+ /*
+ * Skip if the worker has not registered.
+ *
+ * Get a stable 'seen' value. This is necessary since we
+ * want to discard the stale 'seen' values.
+ */
+ if (!atomic_load_explicit(&w->registered, memory_order_relaxed))
+ continue;
+ seen_off = stable_seenoff(w);
+
+ /*
+ * Ignore the offsets after the possible wrap-around.
+ * We are interested in the smallest seen offset that is
+ * not behind the 'written' offset.
+ */
+ if (seen_off >= written) {
+ ready = MIN(seen_off, ready);
+ }
+ ASSERT(ready >= written);
+ }
+
+ /*
+ * Finally, we need to determine whether wrap-around occurred
+ * and deduct the safe 'ready' offset.
+ */
+ if (next < written) {
+ const ringbuf_off_t end = MIN(rbuf->space, rbuf->end);
+
+ /*
+ * Wrap-around case. Check for the cut off first.
+ *
+ * Reset the 'written' offset if it reached the end of
+ * the buffer or the 'end' offset (if set by a producer).
+ * However, we must check that the producer is actually
+ * done (the observed 'ready' offsets are clear).
+ */
+ if (ready == RBUF_OFF_MAX && written == end) {
+ /*
+ * Clear the 'end' offset if was set.
+ */
+ if (rbuf->end != RBUF_OFF_MAX) {
+ rbuf->end = RBUF_OFF_MAX;
+ }
+
+ /*
+ * Wrap-around the consumer and start from zero.
+ */
+ written = 0;
+ atomic_store_explicit(&rbuf->written,
+ written, memory_order_release);
+ goto retry;
+ }
+
+ /*
+ * We cannot wrap-around yet; there is data to consume at
+ * the end. The ready range is smallest of the observed
+ * 'ready' or the 'end' offset. If neither is set, then
+ * the actual end of the buffer.
+ */
+ ASSERT(ready > next);
+ ready = MIN(ready, end);
+ ASSERT(ready >= written);
+ } else {
+ /*
+ * Regular case. Up to the observed 'ready' (if set)
+ * or the 'next' offset.
+ */
+ ready = MIN(ready, next);
+ }
+ towrite = ready - written;
+ *offset = written;
+
+ ASSERT(ready >= written);
+ ASSERT(towrite <= rbuf->space);
+ return towrite;
+}
+
+/*
+ * ringbuf_release: indicate that the consumed range can now be released.
+ */
+void
+ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
+{
+ const size_t nwritten = rbuf->written + nbytes;
+
+ ASSERT(rbuf->written <= rbuf->space);
+ ASSERT(rbuf->written <= rbuf->end);
+ ASSERT(nwritten <= rbuf->space);
+
+ rbuf->written = (nwritten == rbuf->space) ? 0 : nwritten;
+}
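The early wrap-around described in the README's Caveats section falls out of the `target >= rbuf->space` branch above and is easy to observe. A hedged sketch (the 1000/600/400 sizes are arbitrary test values): after one 600-byte produce/consume cycle on a 1000-byte ring, a second 600-byte acquire fails even though the ring is empty, because the wrapped range would catch up with the consumer's 'written' offset:

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include "ringbuf.h"

int main(void)
{
	size_t obj_size, coff, len;

	ringbuf_get_sizes(1, &obj_size, NULL);
	ringbuf_t *r = (ringbuf_t *)malloc(obj_size);
	if (r == NULL || ringbuf_setup(r, 1, 1000) == -1)
		return EXIT_FAILURE;
	ringbuf_worker_t *w = ringbuf_register(r, 0);

	/* Fill 60% of the ring, then drain it completely. */
	assert(ringbuf_acquire(r, w, 600) == 0);
	ringbuf_produce(r, w);
	len = ringbuf_consume(r, &coff);
	assert(len == 600);
	ringbuf_release(r, len);

	/* The ring is empty, yet a second 600-byte acquire fails: the 400
	 * bytes left at the end cannot hold it, and the wrapped range
	 * [0, 600) would catch up with 'written' at offset 600. */
	assert(ringbuf_acquire(r, w, 600) == -1);

	/* A request that fits in the tail still succeeds. */
	assert(ringbuf_acquire(r, w, 400) == 600);
	ringbuf_produce(r, w);

	puts("early wrap-around demonstrated");
	free(r);
	return EXIT_SUCCESS;
}
```

This is why the README recommends sizing the ring at least twice as large as the maximum production unit; the same constraint applies to the logger_scratch slots that tsg_send_log.cpp stores in the ring further below.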
diff --git a/deps/ringbuf/ringbuf.h b/deps/ringbuf/ringbuf.h
new file mode 100644
index 0000000..e8fc767
--- /dev/null
+++ b/deps/ringbuf/ringbuf.h
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2016 Mindaugas Rasiukevicius <rmind at noxt eu>
+ * All rights reserved.
+ *
+ * Use is subject to license terms, as specified in the LICENSE file.
+ */
+
+#ifndef _RINGBUF_H_
+#define _RINGBUF_H_
+
+__BEGIN_DECLS
+
+typedef struct ringbuf ringbuf_t;
+typedef struct ringbuf_worker ringbuf_worker_t;
+
+int ringbuf_setup(ringbuf_t *, unsigned, size_t);
+void ringbuf_get_sizes(unsigned, size_t *, size_t *);
+
+ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned);
+void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *);
+
+ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t);
+void ringbuf_produce(ringbuf_t *, ringbuf_worker_t *);
+size_t ringbuf_consume(ringbuf_t *, size_t *);
+void ringbuf_release(ringbuf_t *, size_t);
+
+__END_DECLS
+
+#endif
diff --git a/deps/ringbuf/utils.h b/deps/ringbuf/utils.h
new file mode 100644
index 0000000..413157b
--- /dev/null
+++ b/deps/ringbuf/utils.h
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 1991, 1993
+ * The Regents of the University of California. All rights reserved.
+ *
+ * This code is derived from software contributed to Berkeley by
+ * Berkeley Software Design, Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. Neither the name of the University nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * @(#)cdefs.h 8.8 (Berkeley) 1/9/95
+ */
+
+#ifndef _UTILS_H_
+#define _UTILS_H_
+
+#include <assert.h>
+
+/*
+ * A regular assert (debug/diagnostic only).
+ */
+#if defined(DEBUG)
+#define ASSERT assert
+#else
+#define ASSERT(x)
+#endif
+
+/*
+ * Minimum, maximum and rounding macros.
+ */
+
+#ifndef MIN
+#define MIN(x, y) ((x) < (y) ? (x) : (y))
+#endif
+
+#ifndef MAX
+#define MAX(x, y) ((x) > (y) ? (x) : (y))
+#endif
+
+/*
+ * Branch prediction macros.
+ */
+#ifndef __predict_true
+#define __predict_true(x) __builtin_expect((x) != 0, 1)
+#define __predict_false(x) __builtin_expect((x) != 0, 0)
+#endif
+
+/*
+ * Atomic operations and memory barriers. If C11 API is not available,
+ * then wrap the GCC builtin routines.
+ *
+ * Note: This atomic_compare_exchange_weak does not do the C11 thing of
+ * filling *(expected) with the actual value, because we don't need
+ * that here.
+ */
+#ifndef atomic_compare_exchange_weak
+#define atomic_compare_exchange_weak(ptr, expected, desired) \
+ __sync_bool_compare_and_swap(ptr, *(expected), desired)
+#endif
+
+#ifndef atomic_thread_fence
+#define memory_order_relaxed __ATOMIC_RELAXED
+#define memory_order_acquire __ATOMIC_ACQUIRE
+#define memory_order_release __ATOMIC_RELEASE
+#define memory_order_seq_cst __ATOMIC_SEQ_CST
+#define atomic_thread_fence(m) __atomic_thread_fence(m)
+#endif
+#ifndef atomic_store_explicit
+#define atomic_store_explicit __atomic_store_n
+#endif
+#ifndef atomic_load_explicit
+#define atomic_load_explicit __atomic_load_n
+#endif
+
+/*
+ * Exponential back-off for the spinning paths.
+ */
+#define SPINLOCK_BACKOFF_MIN 4
+#define SPINLOCK_BACKOFF_MAX 128
+#if defined(__x86_64__) || defined(__i386__)
+#define SPINLOCK_BACKOFF_HOOK __asm volatile("pause" ::: "memory")
+#else
+#define SPINLOCK_BACKOFF_HOOK
+#endif
+#define SPINLOCK_BACKOFF(count) \
+do { \
+ for (int __i = (count); __i != 0; __i--) { \
+ SPINLOCK_BACKOFF_HOOK; \
+ } \
+ if ((count) < SPINLOCK_BACKOFF_MAX) \
+ (count) += (count); \
+} while (/* CONSTCOND */ 0);
+
+#endif
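The spin-wait paths above (`stable_nextoff`, `stable_seenoff` in ringbuf.c) rely on `SPINLOCK_BACKOFF`. A toy program showing how the back-off budget grows, assuming the `deps/` include path added to CMakeLists.txt earlier so the header resolves as `ringbuf/utils.h`:

```c
#include <stdio.h>
#include "ringbuf/utils.h"

int main(void)
{
	unsigned count = SPINLOCK_BACKOFF_MIN;

	/* Each failed poll spins `count` iterations (a PAUSE hint per
	 * iteration on x86), then the budget doubles until it saturates
	 * at SPINLOCK_BACKOFF_MAX: 4, 8, 16, 32, 64, 128, 128, ... */
	for (int round = 0; round < 8; round++) {
		SPINLOCK_BACKOFF(count);
		printf("round %d: next backoff budget = %u\n", round, count);
	}
	return 0;
}
```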
diff --git a/inc/tsg_send_log.h b/inc/tsg_send_log.h
index 5951be8..6a2177a 100644
--- a/inc/tsg_send_log.h
+++ b/inc/tsg_send_log.h
@@ -29,7 +29,8 @@ enum LOG_TYPE
LOG_TYPE_TRANSACTION_RECORD,
LOG_TYPE_GTPC_RECORD,
LOG_TYPE_BGP_RECORD,
- LOG_TYPE_INTERCEPT_EVENT
+ LOG_TYPE_INTERCEPT_EVENT,
+ LOG_TYPE_MAX
};
struct TLD_handle_t;
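The new `LOG_TYPE_MAX` sentinel serves two purposes in the changes below: it can size per-type tables, and it is passed as the scratch log_type to mean "per-rule event logs only, no base record". A hedged illustration of the idiom, assuming the enum starts at 0 (the counter array and helper here are hypothetical, not part of the patch):

```c
/* Hypothetical per-type counters sized by the sentinel. */
static unsigned long log_counters[LOG_TYPE_MAX];

static void count_log(enum LOG_TYPE t)
{
	if (t < LOG_TYPE_MAX)   /* the sentinel itself means "none" */
		log_counters[t]++;
}
```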
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index d48f4a5..6b371b7 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -2,7 +2,25 @@ cmake_minimum_required(VERSION 2.8...3.10)
add_definitions(-fPIC)
-set(SRC tsg_entry.cpp tsg_rule.cpp tsg_ssl_utils.cpp tsg_send_log.cpp tsg_statistic.cpp tsg_ssh_utils.cpp tsg_gtp_signaling.cpp tsg_action.cpp tsg_leaky_bucket.cpp tsg_dns.cpp tsg_icmp.cpp tsg_tamper.cpp tsg_bridge.cpp tsg_sync_state.cpp tsg_variable.cpp tsg_proxy.cpp mpack.c tsg_stat.cpp tsg_ssl_ja3_fingerprint.cpp)
+set(SRC tsg_entry.cpp
+ tsg_rule.cpp
+ tsg_ssl_utils.cpp
+ tsg_send_log.cpp
+ tsg_statistic.cpp
+ tsg_ssh_utils.cpp
+ tsg_gtp_signaling.cpp
+ tsg_action.cpp
+ tsg_leaky_bucket.cpp
+    tsg_dns.cpp
+    tsg_icmp.cpp
+    tsg_tamper.cpp
+    tsg_bridge.cpp
+ tsg_sync_state.cpp
+ tsg_variable.cpp
+ tsg_proxy.cpp
+ mpack.c
+ tsg_stat.cpp
+ tsg_ssl_ja3_fingerprint.cpp
+ ${PROJECT_SOURCE_DIR}/deps/ringbuf/ringbuf.c
+ )
include_directories(${CMAKE_SOURCE_DIR}/inc)
include_directories(/opt/MESA/include/MESA/)
diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp
index 62bb7e2..3d8bbee 100644
--- a/src/tsg_send_log.cpp
+++ b/src/tsg_send_log.cpp
@@ -56,6 +56,21 @@ const id2field_t tld_type[TLD_TYPE_MAX]={{TLD_TYPE_UNKNOWN, TLD_TYPE_UNKNOWN, "
extern "C" int MESA_get_dev_ipv4(const char *device, int *ip_add);
+char *tsg_string_dup(char *string)
+{
+ if(string==NULL)
+ {
+ return NULL;
+ }
+
+ size_t len=strlen(string);
+    char *dup=(char *)malloc(len+1);
+    if(dup==NULL)
+    {
+        return NULL;
+    }
+
+    memcpy(dup, string, len);
+    dup[len]='\0';
+
+ return dup;
+}
+
static int string_cat(char *dst, int dst_len, char *src)
{
if(dst==NULL || dst_len<=0 || src==NULL)
@@ -1195,48 +1210,6 @@ int TLD_convert_json(struct TLD_handle_t *_handle, char *buff, unsigned int buff
return 1;
}
-static int set_mail_eml(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream)
-{
- struct tsg_conn_sketch_notify_data *notify_mail=(struct tsg_conn_sketch_notify_data *)session_conn_sketch_notify_data_get(a_stream);
- if(notify_mail!=NULL && notify_mail->pdata.mail_eml_filename!=NULL && notify_mail->protocol==PROTO_MAIL)
- {
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_MAIL_EML_FILE].name);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_MAIL_EML_FILE].name, (void *)notify_mail->pdata.mail_eml_filename, TLD_TYPE_STRING);
- return 1;
- }
-
- return 0;
-}
-
-static int set_s3_filename(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream)
-{
- struct business_notify_data *bnd_label=(struct business_notify_data *)session_business_data_get(a_stream);
- if(bnd_label==NULL || bnd_label->pdata==NULL)
- {
- return 0;
- }
-
- switch(bnd_label->proto)
- {
- case PROTO_HTTP:
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name);
-
- if(bnd_label->s3_http==NULL)
- {
- break;
- }
-
- TLD_append(_handle, _instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name, bnd_label->s3_http->request_filename, TLD_TYPE_STRING);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name, bnd_label->s3_http->response_filename, TLD_TYPE_STRING);
- break;
- default:
- break;
- }
-
- return 1;
-}
-
int set_nat_linkinfo(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream)
{
if(_instance->send_nat_linkinfo==0 || a_stream==NULL)
@@ -1578,14 +1551,11 @@ static int is_hitted_shunt(const struct streaminfo *a_stream)
return 0;
}
-static int set_xxxx_from_user_region(struct TLD_handle_t *_handle, struct tsg_log_instance_t *_instance, struct maat_rule *p_result, int thread_seq)
+static int set_xxxx_from_user_region(struct TLD_handle_t *_handle, struct tsg_log_instance_t *_instance, struct maat_rule *p_result)
{
cJSON *item=NULL;
cJSON *object=NULL;
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_USER_REGION].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_SUB_ACTION].name);
-
if(p_result->action!=TSG_ACTION_NONE)
{
struct maat_compile *compile=(struct maat_compile *)matched_rule_cites_security_compile(g_tsg_maat_feather, p_result->rule_id);
@@ -1630,40 +1600,6 @@ int set_application_behavior(struct tsg_log_instance_t *_instance, struct TLD_ha
return 1;
}
-int set_policy_action_para_exec_result(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, struct maat_rule *p_result)
-{
- struct tsg_notify_execution_result *execution_result=(struct tsg_notify_execution_result *)session_mirrored_and_capture_packets_exec_result_get(a_stream);
- if(execution_result==NULL)
- {
- return 0;
- }
-
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_MIRRORED_PKTS].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_MIRRORED_BYTES].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_PACKET_CAPTURE_FILE].name);
-
- for(int i=0; i<execution_result->stat_mirrored_cnt; i++)
- {
- if(execution_result->stat_mirrored[i].compile_id==p_result->rule_id)
- {
- TLD_append(_handle, _instance->id2field[LOG_COMMON_MIRRORED_PKTS].name, (void *)(execution_result->stat_mirrored[i].packets), TLD_TYPE_LONG);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_MIRRORED_BYTES].name, (void *)(execution_result->stat_mirrored[i].bytes), TLD_TYPE_LONG);
- break;
- }
- }
-
- for(int i=0; i<execution_result->capture_result_cnt; i++)
- {
- if(execution_result->capture_result[i].compile_id==p_result->rule_id)
- {
- TLD_append(_handle, _instance->id2field[LOG_COMMON_PACKET_CAPTURE_FILE].name, (void *)(execution_result->capture_result[i].packet_path), TLD_TYPE_STRING);
- break;
- }
- }
-
- return 1;
-}
-
int set_session_attributes(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream)
{
struct session_runtime_attribute *srt_attribute=(struct session_runtime_attribute *)session_runtime_attribute_get(a_stream);
@@ -1867,24 +1803,6 @@ static int session_record_limit(struct tsg_log_instance_t *_instance, const stru
return 0;
}
-int append_common_field(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream)
-{
- TLD_append_streaminfo(_instance, _handle, a_stream);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_SLED_IP].name, (void *)(_instance->local_ip_str), TLD_TYPE_STRING);
- if(strlen(g_tsg_para.device_sn)>0)
- {
- TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_ID].name, (void *)(g_tsg_para.device_sn), TLD_TYPE_STRING);
- }
-
- TLD_append(_handle, _instance->id2field[LOG_COMMON_DATA_CENTER].name, (void *)tsg_data_center_get(), TLD_TYPE_STRING);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_TAG].name, (void *)tsg_device_tag_get(), TLD_TYPE_STRING);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_TRAFFIC_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG);
-
- set_application_behavior(_instance, _handle, a_stream);
-
- return 1;
-}
-
int log_common_fields_new(const char *filename, id2field_t *id2field, struct topic_stat **service2topic, int *max_service)
{
int i=0,flag=0;
@@ -1987,221 +1905,271 @@ int log_common_fields_new(const char *filename, id2field_t *id2field, struct top
return 0;
}
-struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
+void tsg_enforce_rule_result_free(struct enforce_rule_result *enforce_rule, size_t n_enforce_rule)
{
- char override_sled_ip[32]={0};
- char kafka_errstr[1024]={0};
- unsigned int local_ip_nr=0;
- rd_kafka_conf_t *rdkafka_conf = NULL;
- char broker_list[1024]={0};
- struct tsg_log_instance_t *_instance=NULL;
- char common_field_file[128]={0};
- char log_path[128]={0};
-
- _instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t));
+ for(size_t i=0; i<n_enforce_rule && enforce_rule!=NULL; i++)
+ {
+ if(enforce_rule[i].mail_eml_filename!=NULL)
+ {
+ free(enforce_rule[i].mail_eml_filename);
+ enforce_rule[i].mail_eml_filename=NULL;
+ }
- MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(_instance->level), 30);
- MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), "./log/tsglog");
- MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_USER_REGION", &(_instance->send_user_region), 0);
- MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_DATA_CENTER_SWITCH", &(_instance->send_data_center), 0);
- MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_APP_ID_SWITCH", &(_instance->send_app_id), 0);
- MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_INTERCEPT_LOG", &(_instance->send_intercept_log), 0);
- MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_NAT_LINKINFO_SWITCH", &(_instance->send_nat_linkinfo), 0);
- MESA_load_profile_int_def(conffile, "TSG_LOG", "RAPIDJSON_CHUNK_CAPACITY", &(_instance->rapidjson_chunk_capacity), 2048);
+ if(enforce_rule[i].http_request_filename!=NULL)
+ {
+ free(enforce_rule[i].http_request_filename);
+ enforce_rule[i].http_request_filename=NULL;
+ }
- MESA_load_profile_int_def(conffile, "TSG_LOG", "VSYSTEM_ID", &(_instance->vsystem_id), 1);
- MESA_load_profile_int_def(conffile, "SYSTEM","UNKNOWN_APP_ID", &_instance->unknown_app_id, 4);
+ if(enforce_rule[i].http_response_filename!=NULL)
+ {
+ free(enforce_rule[i].http_response_filename);
+ enforce_rule[i].http_response_filename=NULL;
+ }
- MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_INTERIM_RECORD", &(_instance->send_interim_log), 1);
- MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_TRANSCATION_RECORD", &(_instance->send_transcation_log), 1);
- MESA_load_profile_int_def(conffile, "TSG_LOG","TCP_MIN_PKTS", &_instance->tcp_min_log_pkts, 3);
- MESA_load_profile_int_def(conffile, "TSG_LOG","TCP_MIN_BYTES", &_instance->tcp_min_log_bytes, 5);
- MESA_load_profile_int_def(conffile, "TSG_LOG","UDP_MIN_PKTS", &_instance->udp_min_log_pkts, 3);
- MESA_load_profile_int_def(conffile, "TSG_LOG","UDP_MIN_BYTES", &_instance->udp_min_log_bytes, 5);
-
- _instance->logger=MESA_create_runtime_log_handle(log_path, _instance->level);
- if(_instance->logger==NULL)
- {
- printf("MESA_create_runtime_log_handle failed ..., path: %s level: %d", log_path, _instance->level);
- free(_instance);
- _instance=NULL;
- return NULL;
+ if(enforce_rule[i].packet_capture_file!=NULL)
+ {
+ free(enforce_rule[i].packet_capture_file);
+ enforce_rule[i].packet_capture_file=NULL;
+ }
}
-
- MESA_load_profile_int_def(conffile, "TSG_LOG", "MODE",&(_instance->mode), 0);
- if(_instance->mode==CLOSE)
+}
+
+std::string tsg_rapidjson_serialize(Document *document, rapidjson::StringBuffer& sbuff)
+{
+ sbuff.Clear();
+ rapidjson::Writer<rapidjson::StringBuffer> writer(sbuff);
+ document->Accept(writer);
+ return std::string(sbuff.GetString());
+}
+
+void *tsg_log_ringbuf_consume(void *arg)
+{
+ struct logger_ringbuf_schema *ringbuf=(struct logger_ringbuf_schema *)arg;
+ rapidjson::StringBuffer sbuff(0, 1024*16);
+
+ while(1)
{
- MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "Disable tsg_send_log");
- return _instance;
- }
-
- MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30);
+ size_t offset=0;
+ size_t scratch_sz=ringbuf_consume(ringbuf->ring, &offset);
+ if(scratch_sz==0)
+ {
+ tsg_stat_sendlog_update(ringbuf->consume_row_idx, LOG_STATUS_FAIL, 1);
+ continue;
+ }
+
+ struct logger_scratch *scratch=(struct logger_scratch *)(ringbuf->buff+offset);
- MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", common_field_file, sizeof(common_field_file), NULL);
- MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", broker_list, sizeof(broker_list), NULL);
- MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_USERNAME", _instance->sasl_username, sizeof(_instance->sasl_username), ""); //admin
- MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_PASSWD", _instance->sasl_passwd, sizeof(_instance->sasl_passwd), "");
- MESA_load_profile_string_def(conffile, "TSG_LOG", "COMPRESSION_TYPE", _instance->compression, sizeof(_instance->compression), ""); //snappy
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_VSYSTEM_ID].name);
- MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000");
- MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000");
- MESA_load_profile_string_def(conffile, "TSG_LOG", "REQUIRE_ACK", _instance->require_ack, sizeof(_instance->require_ack), "1");
+ for(int i=0; i<scratch->n_enforce_rule && scratch->enforce_rule!=NULL; i++)
+ {
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_POLICY_ID].name, (void *)(long)(scratch->enforce_rule[i].rule.rule_id), TLD_TYPE_LONG);
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_SERVICE].name, (void *)(long)(scratch->enforce_rule[i].rule.service_id), TLD_TYPE_LONG);
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)scratch->enforce_rule[i].rule.vsys_id, TLD_TYPE_LONG);
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_ACTION].name, (void *)(long)(scratch->enforce_rule[i].rule.action), TLD_TYPE_LONG);
- MESA_load_profile_string_def(conffile, "SYSTEM", "TCP_LABEL", _instance->tcp_label, sizeof(_instance->tcp_label), "tcp_flow_stat");
- MESA_load_profile_string_def(conffile, "SYSTEM", "UDP_LABEL", _instance->udp_label, sizeof(_instance->udp_label), "udp_flow_stat");
+ if(scratch->enforce_rule[i].mirrored_packets>0)
+ {
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_MIRRORED_PKTS].name, (void *)(scratch->enforce_rule[i].mirrored_packets), TLD_TYPE_LONG);
+ }
- _instance->tcp_flow_project_id=project_customer_register(_instance->tcp_label, "struct");
- _instance->udp_flow_project_id=project_customer_register(_instance->udp_label, "struct");
- if(_instance->tcp_flow_project_id<0 || _instance->udp_flow_project_id<0)
- {
- MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG,
- "project_customer_register is error, tcp_label: %s udp_label: %s, please check etc/project.conf",
- _instance->tcp_label,
- _instance->udp_label
- );
- }
+ if(scratch->enforce_rule[i].mirrored_bytes>0)
+ {
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_MIRRORED_BYTES].name, (void *)(scratch->enforce_rule[i].mirrored_bytes), TLD_TYPE_LONG);
+ }
- MESA_load_profile_string_def(conffile, "SYSTEM", "OVERRIDE_SLED_IP", override_sled_ip, sizeof(override_sled_ip), "OVERRIDE_SLED_IP");
- char *sled_ip=getenv(override_sled_ip);
- if(sled_ip==NULL)
- {
- char nic_name[32]={0};
- MESA_load_profile_string_def(conffile, "SYSTEM", "NIC_NAME", nic_name, sizeof(nic_name), "lo");
- int ret=MESA_get_dev_ipv4(nic_name, (int *)&local_ip_nr);
- if(ret<0)
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_MAIL_EML_FILE].name, (void *)(scratch->enforce_rule[i].mail_eml_filename), TLD_TYPE_STRING);
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name, (void *)(scratch->enforce_rule[i].http_request_filename), TLD_TYPE_STRING);
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name, (void *)(scratch->enforce_rule[i].http_response_filename), TLD_TYPE_STRING);
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_PACKET_CAPTURE_FILE].name, (void *)(scratch->enforce_rule[i].packet_capture_file), TLD_TYPE_STRING);
+
+ set_xxxx_from_user_region(scratch->handle, scratch->instance, &(scratch->enforce_rule[i].rule));
+
+ std::string sbuff_str=tsg_rapidjson_serialize(scratch->handle->document, sbuff);
+ tsg_send_payload(scratch->instance, scratch->enforce_rule[i].log_type, (char *)sbuff_str.c_str(), sbuff_str.length(), scratch->thread_id);
+
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_POLICY_ID].name);
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_SERVICE].name);
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_ACTION].name);
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_USER_REGION].name);
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_VSYSTEM_ID].name);
+
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_MIRRORED_PKTS].name);
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_MIRRORED_BYTES].name);
+
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_MAIL_EML_FILE].name);
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name);
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name);
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_PACKET_CAPTURE_FILE].name);
+
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_USER_REGION].name);
+ TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_SUB_ACTION].name);
+
+ fs3_rule_stat_update(RULE_STAT_SEND, (int)(scratch->enforce_rule[i].rule.action), 1);
+ }
+
+ if(scratch->log_type!=LOG_TYPE_MAX)
{
- MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG,
- "GET_LOCAL_IP MESA_get_dev_ipv4 is error, nic_name: %s, please check tsgconf/main.conf",
- nic_name
- );
- return NULL;
+ TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)scratch->instance->vsystem_id, TLD_TYPE_LONG);
+ std::string sbuff_str=tsg_rapidjson_serialize(scratch->handle->document, sbuff);
+ tsg_send_payload(scratch->instance, scratch->log_type, (char *)sbuff_str.c_str(), sbuff_str.length(), scratch->thread_id);
}
- inet_ntop(AF_INET,&(local_ip_nr),_instance->local_ip_str,sizeof(_instance->local_ip_str));
- }
- else
- {
- memcpy(_instance->local_ip_str, sled_ip, MIN(sizeof(_instance->local_ip_str)-1, strlen(sled_ip)));
- }
- rdkafka_conf = rd_kafka_conf_new();
- rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", _instance->send_queue_max_msg, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", _instance->refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", broker_list, kafka_errstr, sizeof(kafka_errstr));
-
- if(strlen(_instance->compression)>0)
- {
- rd_kafka_conf_set(rdkafka_conf, "compression.codec", _instance->compression, kafka_errstr, sizeof(kafka_errstr));
- }
+ tsg_stat_sendlog_update(ringbuf->consume_row_idx, LOG_STATUS_SUCCESS, 1);
- if(strlen(_instance->sasl_username)> 0 && strlen(_instance->sasl_passwd)>0)
- {
- rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.username", _instance->sasl_username, kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rdkafka_conf, "sasl.password", _instance->sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
+ TLD_cancel(scratch->handle);
+ if(scratch->enforce_rule!=NULL)
+ {
+ tsg_enforce_rule_result_free(scratch->enforce_rule, scratch->n_enforce_rule);
+ free(scratch->enforce_rule);
+ }
+
+ memset(scratch, 0, sizeof(struct logger_scratch));
+ ringbuf_release(ringbuf->ring, sizeof(struct logger_scratch));
}
-
- if(!(_instance->kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr))))
+
+ pthread_exit(NULL);
+}
+
+void tsg_log_ringbuf_produce(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, enum LOG_TYPE log_type, struct enforce_rule_result *enforce_rule, size_t n_enforce_rule, int thread_id)
+{
+ int i=(thread_id%instance->ringbuf_num);
+ struct logger_ringbuf_schema *ringbuf=&(instance->ringbuf[i]);
+ ssize_t offset=ringbuf_acquire(ringbuf->ring, ringbuf->worker[thread_id], sizeof(struct logger_scratch));
+ if(offset==-1)
{
- MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "KAFKA_INIT rd_kafka_new is error");
- return NULL;
+ TLD_cancel(handle);
+ tsg_enforce_rule_result_free(enforce_rule, n_enforce_rule);
+ tsg_stat_sendlog_update(ringbuf->produce_row_idx, LOG_STATUS_FAIL, 1);
+ return;
}
- log_common_fields_new(common_field_file, _instance->id2field, &(_instance->service2topic), &(_instance->max_service));
+ struct logger_scratch *scratch=(struct logger_scratch *)(ringbuf->buff+offset);
+ scratch->handle=handle;
+ scratch->instance=instance;
+ scratch->log_type=log_type;
+ scratch->thread_id=thread_id;
- if(_instance->service2topic!=NULL)
+ if(enforce_rule!=NULL && n_enforce_rule>0)
{
- _instance->sum_stat_row_id=tsg_stat_sendlog_row_init("sum");
- for(int i=0; i<_instance->max_service; i++)
- {
- if(_instance->service2topic[i].type==TLD_TYPE_MAX && strlen(_instance->service2topic[i].name)>0)
- {
- register_topic(_instance, &( _instance->service2topic[i]));
- }
- }
+ scratch->n_enforce_rule=MIN(n_enforce_rule, LOGGER_SCRATCH_RULE_MAX);
+ scratch->enforce_rule=(struct enforce_rule_result *)calloc(scratch->n_enforce_rule, sizeof(struct enforce_rule_result));
+ memcpy(scratch->enforce_rule, enforce_rule, sizeof(struct enforce_rule_result)*scratch->n_enforce_rule);
}
else
{
- MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "KAFKA_INIT log_common_fields_new is error, please check %s", common_field_file);
+ scratch->n_enforce_rule=0;
+ scratch->enforce_rule=NULL;
}
- return _instance;
+ ringbuf_produce(ringbuf->ring, ringbuf->worker[thread_id]);
+ tsg_stat_sendlog_update(ringbuf->produce_row_idx, LOG_STATUS_SUCCESS, 1);
}
-void tsg_sendlog_destroy(struct tsg_log_instance_t * instance)
+
+void tsg_rule_log_enable_additional_set(const struct streaminfo *a_stream, struct enforce_rule_result *enforce_rule, size_t n_enforce_rule)
{
- if(instance==NULL)
- {
- return ;
- }
-
- if(instance->mode!=CLOSE)
+ struct tsg_notify_execution_result *execution_result=(struct tsg_notify_execution_result *)session_mirrored_and_capture_packets_exec_result_get(a_stream);
+
+ for(size_t i=0;i<n_enforce_rule; i++)
{
- for(int i=0; i<instance->max_service; i++)
+ if(enforce_rule[i].rule.do_log==LOG_ABORT)
{
- if(instance->service2topic[i].type!=TLD_TYPE_MAX || i==1) //i=1 equal i=0, service id of security event is 0 and 1
- {
- continue;
- }
-
- if(instance->service2topic[i].topic_rkt!=NULL)
- {
- rd_kafka_topic_destroy(instance->service2topic[i].topic_rkt);
- }
+ continue;
+ }
- if(instance->service2topic[i].drop_start!=NULL)
+ if(execution_result!=NULL)
+ {
+ for(int j=0; j<execution_result->stat_mirrored_cnt; j++)
{
- free(instance->service2topic[i].drop_start);
- instance->service2topic[i].drop_start=NULL;
+ if(execution_result->stat_mirrored[j].compile_id==enforce_rule[i].rule.rule_id)
+ {
+ enforce_rule[i].mirrored_bytes=execution_result->stat_mirrored[j].bytes;
+ enforce_rule[i].mirrored_packets=execution_result->stat_mirrored[j].packets;
+ break;
+ }
}
- if(instance->service2topic[i].send_log_percent!=NULL)
- {
- free(instance->service2topic[i].send_log_percent);
- instance->service2topic[i].send_log_percent=NULL;
+ for(int j=0; j<execution_result->capture_result_cnt; j++)
+ {
+ if(execution_result->capture_result[j].compile_id!=enforce_rule[i].rule.rule_id)
+ {
+ continue;
+ }
+
+ if(execution_result->capture_result[j].packet_path!=NULL)
+ {
+ enforce_rule[i].packet_capture_file=tsg_string_dup(execution_result->capture_result[j].packet_path);
+ }
+
+ break;
}
}
-
- //rd_kafka_destroy_flags(instance->kafka_handle, 4);
- rd_kafka_destroy(instance->kafka_handle);
-
- free(instance->service2topic);
- instance->service2topic=NULL;
- }
-
- MESA_destroy_runtime_log_handle(instance->logger);
- instance->logger=NULL;
+ if(enforce_rule[i].rule.do_log==LOG_NOFILE)
+ {
+ continue;
+ }
- free(instance);
- instance=NULL;
- return ;
+ struct business_notify_data *bnd_label=(struct business_notify_data *)session_business_data_get(a_stream);
+        if(bnd_label!=NULL && bnd_label->pdata!=NULL && bnd_label->proto==PROTO_HTTP && bnd_label->s3_http!=NULL)
+ {
+ enforce_rule[i].http_request_filename=tsg_string_dup(bnd_label->s3_http->request_filename);
+ enforce_rule[i].http_response_filename=tsg_string_dup(bnd_label->s3_http->response_filename);
+ continue;
+ }
+
+ struct tsg_conn_sketch_notify_data *notify_mail=(struct tsg_conn_sketch_notify_data *)session_conn_sketch_notify_data_get(a_stream);
+ if(notify_mail!=NULL && notify_mail->pdata.mail_eml_filename!=NULL && notify_mail->protocol==PROTO_MAIL)
+ {
+ enforce_rule[i].mail_eml_filename=tsg_string_dup(notify_mail->pdata.mail_eml_filename);
+ }
+ }
}
-int send_log_by_type(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, LOG_TYPE log_type, int thread_id)
+void tsg_rule_log_enable_type_set(struct enforce_rule_result *enforce_rule, size_t n_enforce_rule, enum LOG_TYPE log_type)
{
- int ret=update_percent(_instance, log_type, LOG_STATUS_DROP, thread_id);
- if(ret==1)
+ for(size_t i=0;i<n_enforce_rule; i++)
{
- MASTER_LOG(_instance->logger, RLOG_LV_DEBUG, LOG_MODULE_SENDLOG,
- "drop log: log_type=%d send_log_percent: %d addr=%s",
- log_type,
- _instance->service2topic[log_type].send_log_percent[thread_id],
- (a_stream==NULL ? "" : printaddr(&(a_stream->addr), thread_id))
- );
+ enforce_rule[i].log_type=log_type;
}
+}
+
+void tsg_rule_log_enable_select(struct maat_rule *rules, size_t n_rules, struct enforce_rule_result *enforce_rule, size_t n_enforce_rule, size_t *enforce_rule_offset)
+{
+ int repeat_rule=0;
+ for(size_t i=0;i<n_rules; i++)
+ {
+ if(rules[i].do_log==LOG_ABORT)
+ {
+ continue;
+ }
- StringBuffer sb(0, _instance->rapidjson_chunk_capacity);
- Writer<StringBuffer> writer(sb);
- _handle->document->Accept(writer);
+ for(size_t j=0; j<(*enforce_rule_offset); j++)
+ {
+ if(enforce_rule[j].rule.rule_id==rules[i].rule_id)
+ {
+ repeat_rule=1;
+ continue;
+ }
+ }
- tsg_send_payload(_instance, log_type, (char *)sb.GetString(), sb.GetSize(), thread_id);
-
- return 0;
+ if(repeat_rule==1)
+ {
+ repeat_rule=0;
+ continue;
+ }
+
+ if(n_enforce_rule < (*enforce_rule_offset)+1)
+ {
+ break;
+ }
+
+ enforce_rule[(*enforce_rule_offset)++].rule=rules[i];
+ }
}
-int send_event_log(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, LOG_TYPE log_type, struct maat_rule *rules, size_t n_rules, int thread_id)
+void tsg_hits_rule_metrics_set(const struct streaminfo *a_stream, struct maat_rule *rules, size_t n_rules, int thread_id)
{
int repeat_cnt=0;
int policy_id[MAX_RESULT_NUM]={0};
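The structures threaded through tsg_log_ringbuf_produce/consume, together with struct enforce_rule_result, are defined in src/tsg_send_log_internal.h, whose diff is not part of this excerpt. A plausible shape inferred purely from the usage above — every field name and the LOGGER_SCRATCH_RULE_MAX value are assumptions, not the actual header:

```c
/* Inferred sketch; the real definitions live in src/tsg_send_log_internal.h. */
#define LOGGER_SCRATCH_RULE_MAX 32                /* assumed bound */

struct enforce_rule_result {
	struct maat_rule rule;                    /* matched rule, by value */
	enum LOG_TYPE log_type;                   /* event type for this rule */
	long mirrored_packets;
	long mirrored_bytes;
	char *mail_eml_filename;                  /* the four strings are owned */
	char *http_request_filename;              /* copies from tsg_string_dup, */
	char *http_response_filename;             /* freed on the consumer side */
	char *packet_capture_file;
};

struct logger_scratch {                           /* one slot in the ring */
	struct TLD_handle_t *handle;              /* half-built log record */
	struct tsg_log_instance_t *instance;
	enum LOG_TYPE log_type;                   /* LOG_TYPE_MAX: rule events only */
	int thread_id;
	size_t n_enforce_rule;
	struct enforce_rule_result *enforce_rule; /* heap copy, freed by consumer */
};

struct logger_ringbuf_schema {
	ringbuf_t *ring;                          /* MPSC control structure */
	char *buff;                               /* backing store for the slots */
	ringbuf_worker_t **worker;                /* one producer handle per thread */
	int produce_row_idx;                      /* stat-table rows for the */
	int consume_row_idx;                      /* produce and consume sides */
};
```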
@@ -2210,12 +2178,6 @@ int send_event_log(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_h
{
if(is_multi_hit_same_policy(&(rules[i]), policy_id, &repeat_cnt))
{
- MASTER_LOG(_instance->logger, RLOG_LV_DEBUG, LOG_MODULE_SENDLOG,
- "tsg same log:cfg_id=%d service=%d addr=%s",
- rules[i].rule_id,
- rules[i].service_id,
- (a_stream==NULL ? "" : printaddr(&(a_stream->addr), thread_id))
- );
continue;
}
@@ -2223,86 +2185,26 @@ int send_event_log(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_h
{
tsg_set_policy_flow(a_stream, &(rules[i]), thread_id);
}
-
- switch(rules[i].do_log)
- {
- case LOG_ABORT:
- MASTER_LOG(_instance->logger, RLOG_LV_DEBUG, LOG_MODULE_SENDLOG,
- "tsg abort log:cfg_id=%d service=%d addr=%s",
- rules[i].rule_id,
- rules[i].service_id,
- (a_stream==NULL ? "" : printaddr(&(a_stream->addr), thread_id))
- );
-
- fs3_rule_stat_update(RULE_STAT_ABORT, (int)rules[i].action, 1);
- continue;
- break;
- case LOG_ALL:
- if(rules[i].action==TSG_ACTION_MONITOR)
- {
- set_s3_filename(_instance, _handle, a_stream);
- set_mail_eml(_instance, _handle, a_stream);
- }
- break;
- case LOG_NOFILE:
- if(rules[i].action==TSG_ACTION_MONITOR)
- {
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_MAIL_EML_FILE].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name);
- }
- break;
- default:
- break;
- }
-
- TLD_append(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name, (void *)(long)(rules[i].rule_id), TLD_TYPE_LONG);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_SERVICE].name, (void *)(long)(rules[i].service_id), TLD_TYPE_LONG);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)rules[i].vsys_id, TLD_TYPE_LONG);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_ACTION].name, (void *)(long)(rules[i].action), TLD_TYPE_LONG);
-
- set_policy_action_para_exec_result(_instance, _handle, a_stream, &(rules[i]));
-
- set_xxxx_from_user_region(_handle, _instance, &(rules[i]), thread_id);
-
- send_log_by_type(_instance, _handle, a_stream, log_type, thread_id);
-
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_SERVICE].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_ACTION].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_USER_REGION].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_VSYSTEM_ID].name);
-
- fs3_rule_stat_update(RULE_STAT_SEND, (int)rules[i].action, 1);
}
-
- return 0;
}
-int deal_event_rules(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, LOG_TYPE log_type, TSG_SERVICE service, int *is_append_common_field, int thread_id)
+int tsg_enforce_rule_append(const struct streaminfo *a_stream, enum TSG_SERVICE service, struct enforce_rule_result *enforce_rule, size_t n_enforce_rule)
{
- struct matched_policy_rules *matched_rules=(struct matched_policy_rules *)session_matched_rules_get(a_stream, service);
- if(matched_rules==NULL || matched_rules->n_rules==0)
+ size_t offset=0;
+ struct matched_policy_rules *matched_policy=(struct matched_policy_rules *)session_matched_rules_get(a_stream, service);
+ if(matched_policy!=NULL && matched_policy->n_rules>0)
{
- return 0;
- }
+ tsg_hits_rule_metrics_set(a_stream, matched_policy->rules, matched_policy->n_rules, a_stream->threadnum);
+ size_t n_rules=MIN(n_enforce_rule, matched_policy->n_rules);
- if((*is_append_common_field)==0)
- {
- (*is_append_common_field)=1;
- append_common_field(_instance, _handle, a_stream);
- }
-
- if(service==TSG_SERVICE_INTERCEPT)
- {
- set_intercept_info(_instance, _handle, a_stream);
+ memset(enforce_rule, 0, sizeof(struct enforce_rule_result)*n_rules);
+ tsg_rule_log_enable_select(matched_policy->rules, n_rules, enforce_rule, n_enforce_rule, &offset);
+
+ session_matched_rules_free(a_stream, service, (void *)matched_policy);
+ session_matched_rules_async(a_stream, service, NULL);
}
-
- send_event_log(_instance, _handle, a_stream, log_type, matched_rules->rules, matched_rules->n_rules, thread_id);
- session_matched_rules_free(a_stream, service, (void *)matched_rules);
- session_matched_rules_async(a_stream, service, NULL);
- return 1;
+ return offset;
}
int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, const struct streaminfo *a_stream, enum LOG_TYPE log_type, struct maat_rule *rules, size_t n_rules, int thread_id)
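Nothing in this excerpt shows where the consumer thread is started or where the ring itself is allocated; that presumably happens in the parts of tsg_sendlog_init rewritten above. A hedged wiring sketch, reusing the assumed schema from the earlier note (helper name and sizing policy are hypothetical):

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical start-up helper: nworkers must cover every engine thread
 * id that may call tsg_log_ringbuf_produce, and space should be a
 * multiple of sizeof(struct logger_scratch) -- and, per the ringbuf
 * caveats, at least twice that size. */
static int tsg_log_ringbuf_start(struct logger_ringbuf_schema *rb,
                                 unsigned nworkers, size_t space)
{
	size_t obj_size;
	ringbuf_get_sizes(nworkers, &obj_size, NULL);

	rb->ring = (ringbuf_t *)malloc(obj_size);
	rb->buff = (char *)malloc(space);
	rb->worker = (ringbuf_worker_t **)calloc(nworkers, sizeof(*rb->worker));
	if (rb->ring == NULL || rb->buff == NULL || rb->worker == NULL ||
	    ringbuf_setup(rb->ring, nworkers, space) == -1)
		return -1;

	for (unsigned i = 0; i < nworkers; i++)
		rb->worker[i] = ringbuf_register(rb->ring, i);

	/* One dedicated consumer per ring: the MPSC buffer allows many
	 * producers but exactly one consumer. */
	pthread_t tid;
	return pthread_create(&tid, NULL, tsg_log_ringbuf_consume, rb);
}
```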
@@ -2336,60 +2238,87 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
return 0;
}
- int is_append_common_field=0;
+ TLD_append_streaminfo(_instance, _handle, a_stream);
+ TLD_append(_handle, _instance->id2field[LOG_COMMON_SLED_IP].name, (void *)(_instance->local_ip_str), TLD_TYPE_STRING);
+ if(strlen(g_tsg_para.device_sn)>0)
+ {
+ TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_ID].name, (void *)(g_tsg_para.device_sn), TLD_TYPE_STRING);
+ }
+ TLD_append(_handle, _instance->id2field[LOG_COMMON_DATA_CENTER].name, (void *)tsg_data_center_get(), TLD_TYPE_STRING);
+ TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_TAG].name, (void *)tsg_device_tag_get(), TLD_TYPE_STRING);
+ TLD_append(_handle, _instance->id2field[LOG_COMMON_TRAFFIC_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG);
+
+ set_application_behavior(_instance, _handle, a_stream);
+
+ size_t enforce_rule_offset=0;
+ size_t n_enforce_rule=LOGGER_SCRATCH_RULE_MAX;
+ struct enforce_rule_result enforce_rule[LOGGER_SCRATCH_RULE_MAX]; /* fixed size: a VLA here is non-standard C++17 */
+
switch(log_type)
{
case LOG_TYPE_SECURITY_EVENT:
- append_common_field(instance, handle, a_stream);
- send_event_log(_instance, _handle, a_stream, LOG_TYPE_SECURITY_EVENT, rules, n_rules, thread_id);
- break;
case LOG_TYPE_INTERCEPT_EVENT:
- append_common_field(instance, handle, a_stream);
- send_event_log(_instance, _handle, a_stream, LOG_TYPE_INTERCEPT_EVENT, rules, n_rules, thread_id);
+ memset(enforce_rule, 0, sizeof(struct enforce_rule_result)*n_enforce_rule);
+ tsg_hits_rule_metrics_set(a_stream, rules, n_rules, thread_id);
+ tsg_rule_log_enable_select(rules, n_rules, enforce_rule, n_enforce_rule, &enforce_rule_offset);
+ tsg_rule_log_enable_additional_set(a_stream, enforce_rule, enforce_rule_offset);
+ tsg_rule_log_enable_type_set(enforce_rule, enforce_rule_offset, log_type);
+ tsg_log_ringbuf_produce(_instance, _handle, log_type, enforce_rule, enforce_rule_offset, thread_id);
break;
case LOG_TYPE_SESSION_RECORD:
if(_instance->send_intercept_log==1)
{
- deal_event_rules(_instance, _handle, a_stream, LOG_TYPE_INTERCEPT_EVENT, TSG_SERVICE_INTERCEPT, &is_append_common_field, thread_id);
+ size_t offset=tsg_enforce_rule_append(a_stream, TSG_SERVICE_INTERCEPT, enforce_rule, n_enforce_rule);
+ if(offset>0)
+ {
+ set_intercept_info(_instance, _handle, a_stream);
+ tsg_rule_log_enable_type_set(enforce_rule, offset, LOG_TYPE_INTERCEPT_EVENT);
+ enforce_rule_offset+=offset;
+ }
}
// no break;
case LOG_TYPE_TRANSACTION_RECORD:
- deal_event_rules(_instance, _handle, a_stream, LOG_TYPE_SECURITY_EVENT, TSG_SERVICE_SECURITY, &is_append_common_field, thread_id);
+ {
+ size_t offset=tsg_enforce_rule_append(a_stream, TSG_SERVICE_SECURITY, enforce_rule+enforce_rule_offset, n_enforce_rule-enforce_rule_offset);
+ if(offset>0)
+ {
+ tsg_rule_log_enable_type_set(enforce_rule+enforce_rule_offset, offset, LOG_TYPE_SECURITY_EVENT);
+ tsg_rule_log_enable_additional_set(a_stream, enforce_rule+enforce_rule_offset, offset);
+ enforce_rule_offset+=offset;
+ }
+ }
// no break;
case LOG_TYPE_INTERIM_SESSION_RECORD:
if(session_record_limit(_instance, a_stream, log_type))
{
+ if(enforce_rule_offset>0)
+ {
+ tsg_log_ringbuf_produce(_instance, _handle, LOG_TYPE_MAX, enforce_rule, enforce_rule_offset, thread_id);
+ }
+ else
+ {
+ TLD_cancel(_handle);
+ }
break;
}
- if(is_append_common_field==0)
- {
- append_common_field(_instance, _handle, a_stream);
- }
-
- TLD_append(_handle, _instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG);
-
set_sce_profile_ids(_instance, _handle, a_stream);
set_shaping_profile_ids(_instance, _handle, a_stream);
set_shaping_rule_ids(_instance, _handle, a_stream);
set_nat_linkinfo(_instance, _handle, a_stream);
- send_log_by_type(_instance, _handle, a_stream, log_type, thread_id);
+ tsg_log_ringbuf_produce(_instance, _handle, log_type, enforce_rule, enforce_rule_offset, thread_id);
break;
case LOG_TYPE_BGP_RECORD:
case LOG_TYPE_VOIP_RECORD:
case LOG_TYPE_GTPC_RECORD:
case LOG_TYPE_INTERNAL_RTP_RECORD:
- append_common_field(_instance, _handle, a_stream);
- TLD_append(_handle, _instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG);
- send_log_by_type(_instance, _handle, a_stream, log_type, thread_id);
+ tsg_log_ringbuf_produce(_instance, _handle, log_type, NULL, 0, thread_id);
break;
default:
- return 0;
+ break;
}
- TLD_cancel(handle);
-
return 0;
}
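
The synchronous `send_log_by_type()` calls above are replaced by `tsg_log_ringbuf_produce()`, whose body lies outside this hunk. A minimal sketch of what such a produce path over the vendored `deps/ringbuf` could look like, assuming one fixed-size `logger_scratch` record per publish (the record layout and error handling here are assumptions, not the actual implementation):

```c
#include <string.h>
/* Assumes tsg_send_log_internal.h (logger_ringbuf_schema, logger_scratch)
 * and ringbuf/ringbuf.h are included. Reserve contiguous space in the MPSC
 * ring, copy the record, then publish it to the single consumer thread. */
static int logger_ring_put(struct logger_ringbuf_schema *rb,
                           const struct logger_scratch *rec, int thread_id)
{
    ssize_t off = ringbuf_acquire(rb->ring, rb->worker[thread_id],
                                  sizeof(*rec));
    if (off == -1) {
        return -1; /* ring full: the caller must drop or retry */
    }
    memcpy(rb->buff + off, rec, sizeof(*rec));
    ringbuf_produce(rb->ring, rb->worker[thread_id]);
    return 0;
}
```
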
@@ -2468,3 +2397,231 @@ int tsg_unknown_app_id_get(struct tsg_log_instance_t *instance)
return 0;
}
+
+struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
+{
+ char override_sled_ip[32]={0};
+ char kafka_errstr[1024]={0};
+ unsigned int local_ip_nr=0;
+ rd_kafka_conf_t *rdkafka_conf = NULL;
+ char broker_list[1024]={0};
+ struct tsg_log_instance_t *_instance=NULL;
+ char common_field_file[128]={0};
+ char log_path[128]={0};
+
+ _instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t));
+
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(_instance->level), 30);
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), "./log/tsglog");
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_USER_REGION", &(_instance->send_user_region), 0);
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_DATA_CENTER_SWITCH", &(_instance->send_data_center), 0);
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_APP_ID_SWITCH", &(_instance->send_app_id), 0);
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_INTERCEPT_LOG", &(_instance->send_intercept_log), 0);
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_NAT_LINKINFO_SWITCH", &(_instance->send_nat_linkinfo), 0);
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "RAPIDJSON_CHUNK_CAPACITY", &(_instance->rapidjson_chunk_capacity), 2048);
+
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "VSYSTEM_ID", &(_instance->vsystem_id), 1);
+ MESA_load_profile_int_def(conffile, "SYSTEM","UNKNOWN_APP_ID", &_instance->unknown_app_id, 4);
+
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_INTERIM_RECORD", &(_instance->send_interim_log), 1);
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_TRANSCATION_RECORD", &(_instance->send_transcation_log), 1);
+ MESA_load_profile_int_def(conffile, "TSG_LOG","TCP_MIN_PKTS", &_instance->tcp_min_log_pkts, 3);
+ MESA_load_profile_int_def(conffile, "TSG_LOG","TCP_MIN_BYTES", &_instance->tcp_min_log_bytes, 5);
+ MESA_load_profile_int_def(conffile, "TSG_LOG","UDP_MIN_PKTS", &_instance->udp_min_log_pkts, 3);
+ MESA_load_profile_int_def(conffile, "TSG_LOG","UDP_MIN_BYTES", &_instance->udp_min_log_bytes, 5);
+
+ _instance->logger=MESA_create_runtime_log_handle(log_path, _instance->level);
+ if(_instance->logger==NULL)
+ {
+ printf("MESA_create_runtime_log_handle failed ..., path: %s level: %d", log_path, _instance->level);
+ free(_instance);
+ _instance=NULL;
+ return NULL;
+ }
+
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "MODE",&(_instance->mode), 0);
+ if(_instance->mode==CLOSE)
+ {
+ MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "Disable tsg_send_log");
+ return _instance;
+ }
+
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30);
+
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", common_field_file, sizeof(common_field_file), NULL);
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", broker_list, sizeof(broker_list), NULL);
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_USERNAME", _instance->sasl_username, sizeof(_instance->sasl_username), ""); //admin
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_PASSWD", _instance->sasl_passwd, sizeof(_instance->sasl_passwd), "");
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "COMPRESSION_TYPE", _instance->compression, sizeof(_instance->compression), ""); //snappy
+
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000");
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000");
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "REQUIRE_ACK", _instance->require_ack, sizeof(_instance->require_ack), "1");
+
+ MESA_load_profile_string_def(conffile, "SYSTEM", "TCP_LABEL", _instance->tcp_label, sizeof(_instance->tcp_label), "tcp_flow_stat");
+ MESA_load_profile_string_def(conffile, "SYSTEM", "UDP_LABEL", _instance->udp_label, sizeof(_instance->udp_label), "udp_flow_stat");
+
+ _instance->tcp_flow_project_id=project_customer_register(_instance->tcp_label, "struct");
+ _instance->udp_flow_project_id=project_customer_register(_instance->udp_label, "struct");
+ if(_instance->tcp_flow_project_id<0 || _instance->udp_flow_project_id<0)
+ {
+ MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG,
+ "project_customer_register is error, tcp_label: %s udp_label: %s, please check etc/project.conf",
+ _instance->tcp_label,
+ _instance->udp_label
+ );
+ }
+
+ MESA_load_profile_string_def(conffile, "SYSTEM", "OVERRIDE_SLED_IP", override_sled_ip, sizeof(override_sled_ip), "OVERRIDE_SLED_IP");
+ char *sled_ip=getenv(override_sled_ip);
+ if(sled_ip==NULL)
+ {
+ char nic_name[32]={0};
+ MESA_load_profile_string_def(conffile, "SYSTEM", "NIC_NAME", nic_name, sizeof(nic_name), "lo");
+ int ret=MESA_get_dev_ipv4(nic_name, (int *)&local_ip_nr);
+ if(ret<0)
+ {
+ MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG,
+ "GET_LOCAL_IP MESA_get_dev_ipv4 is error, nic_name: %s, please check tsgconf/main.conf",
+ nic_name
+ );
+ return NULL;
+ }
+ inet_ntop(AF_INET,&(local_ip_nr),_instance->local_ip_str,sizeof(_instance->local_ip_str));
+ }
+ else
+ {
+ memcpy(_instance->local_ip_str, sled_ip, MIN(sizeof(_instance->local_ip_str)-1, strlen(sled_ip)));
+ }
+
+ rdkafka_conf = rd_kafka_conf_new();
+ rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", _instance->send_queue_max_msg, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", _instance->refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", broker_list, kafka_errstr, sizeof(kafka_errstr));
+
+ if(strlen(_instance->compression)>0)
+ {
+ rd_kafka_conf_set(rdkafka_conf, "compression.codec", _instance->compression, kafka_errstr, sizeof(kafka_errstr));
+ }
+
+ if(strlen(_instance->sasl_username)> 0 && strlen(_instance->sasl_passwd)>0)
+ {
+ rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "sasl.username", _instance->sasl_username, kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rdkafka_conf, "sasl.password", _instance->sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
+ }
+
+ if(!(_instance->kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr))))
+ {
+ MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "KAFKA_INIT rd_kafka_new is error");
+ return NULL;
+ }
+
+ log_common_fields_new(common_field_file, _instance->id2field, &(_instance->service2topic), &(_instance->max_service));
+
+ if(_instance->service2topic!=NULL)
+ {
+ _instance->sum_stat_row_id=tsg_stat_sendlog_row_init("sum");
+ for(int i=0; i<_instance->max_service; i++)
+ {
+ if(_instance->service2topic[i].type==TLD_TYPE_MAX && strlen(_instance->service2topic[i].name)>0)
+ {
+ register_topic(_instance, &( _instance->service2topic[i]));
+ }
+ }
+ }
+ else
+ {
+ MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "KAFKA_INIT log_common_fields_new is error, please check %s", common_field_file);
+ }
+
+ int32_t ringbuf_size=0;
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "RINGBUFF_SIZE", &(ringbuf_size), 100000);
+ MESA_load_profile_int_def(conffile, "TSG_LOG", "RINGBUFF_NUM", &(_instance->ringbuf_num), 1);
+
+ _instance->ringbuf=(struct logger_ringbuf_schema *)calloc(1, _instance->ringbuf_num * sizeof(struct logger_ringbuf_schema));
+
+ for(int i=0; i<_instance->ringbuf_num; i++)
+ {
+ struct logger_ringbuf_schema *ringbuf=&(_instance->ringbuf[i]);
+
+ int thread_num=get_thread_count();
+ size_t ringbuf_obj_size=0;
+ ringbuf_get_sizes(thread_num, &ringbuf_obj_size, NULL);
+ ringbuf->ring=(ringbuf_t *)calloc(1, ringbuf_obj_size);
+
+ ringbuf->size=ringbuf_size;
+ ringbuf->buff=(char *)calloc(1, sizeof(struct logger_scratch)*ringbuf->size);
+ ringbuf_setup(ringbuf->ring, thread_num, sizeof(struct logger_scratch)*ringbuf->size);
+ ringbuf->worker=(ringbuf_worker_t **)calloc(1, sizeof(ringbuf_worker_t *)*thread_num);
+ for(int j=0; j<thread_num; j++)
+ {
+ ringbuf->worker[j]=ringbuf_register(ringbuf->ring, j);
+ }
+
+ char ringbuf_name[32]={0};
+
+ snprintf(ringbuf_name, sizeof(ringbuf_name), "ringbuf_produce_%d", i);
+ ringbuf->produce_row_idx=tsg_stat_sendlog_row_init(ringbuf_name);
+ snprintf(ringbuf_name, sizeof(ringbuf_name), "ringbuf_consume_%d", i);
+ ringbuf->consume_row_idx=tsg_stat_sendlog_row_init(ringbuf_name);
+
+ pthread_create(&(ringbuf->pthread_id), NULL, tsg_log_ringbuf_consume, (void *)ringbuf);
+ pthread_setname_np(ringbuf->pthread_id, "SENDLOG_RINGBUF");
+ }
+
+ return _instance;
+}
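
Each ring's consumer thread runs `tsg_log_ringbuf_consume()`, which is also outside this hunk. A hedged sketch of a single-consumer loop over the same ring, where `drain_record()` is a hypothetical stand-in for the JSON build and `rd_kafka_produce()` step:

```c
#include <string.h>
#include <unistd.h>
/* Assumes tsg_send_log_internal.h and ringbuf/ringbuf.h are included. */
static void drain_record(const struct logger_scratch *rec); /* hypothetical */

/* Sketch: consume contiguous ready ranges, process fixed-size records,
 * then release the consumed bytes back to the producers. */
static void *logger_ring_consume(void *arg)
{
    struct logger_ringbuf_schema *rb = (struct logger_ringbuf_schema *)arg;
    for (;;) {
        size_t off;
        size_t len = ringbuf_consume(rb->ring, &off);
        if (len == 0) {
            usleep(1000); /* nothing queued; back off briefly */
            continue;
        }
        for (size_t done = 0; done + sizeof(struct logger_scratch) <= len;
             done += sizeof(struct logger_scratch)) {
            struct logger_scratch rec;
            memcpy(&rec, rb->buff + off + done, sizeof(rec));
            drain_record(&rec); /* hypothetical: serialize + send to Kafka */
        }
        ringbuf_release(rb->ring, len);
    }
    return NULL;
}
```
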
+void tsg_sendlog_destroy(struct tsg_log_instance_t *instance)
+{
+ if(instance==NULL)
+ {
+ return ;
+ }
+
+ if(instance->mode!=CLOSE)
+ {
+ for(int i=0; i<instance->max_service; i++)
+ {
+ if(instance->service2topic[i].type!=TLD_TYPE_MAX || i==1) // service IDs 0 and 1 share the security-event topic; skip i==1 so the shared topic is not destroyed twice
+ {
+ continue;
+ }
+
+ if(instance->service2topic[i].topic_rkt!=NULL)
+ {
+ rd_kafka_topic_destroy(instance->service2topic[i].topic_rkt);
+ }
+
+ if(instance->service2topic[i].drop_start!=NULL)
+ {
+ free(instance->service2topic[i].drop_start);
+ instance->service2topic[i].drop_start=NULL;
+ }
+
+ if(instance->service2topic[i].send_log_percent!=NULL)
+ {
+ free(instance->service2topic[i].send_log_percent);
+ instance->service2topic[i].send_log_percent=NULL;
+ }
+ }
+
+ //rd_kafka_destroy_flags(instance->kafka_handle, 4);
+ rd_kafka_destroy(instance->kafka_handle);
+
+ free(instance->service2topic);
+ instance->service2topic=NULL;
+ }
+
+ MESA_destroy_runtime_log_handle(instance->logger);
+ instance->logger=NULL;
+
+ free(instance);
+ instance=NULL;
+ return ;
+}
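
One teardown caveat: `rd_kafka_destroy()` does not wait for messages still queued inside the producer, so records handed over by the ring consumer can be lost at shutdown. librdkafka's documented shutdown sequence is to flush first; a hedged variant of this destroy path (the 10 s timeout is an illustrative choice):

```c
/* Drain outstanding produce requests before tearing the handle down. */
rd_kafka_flush(instance->kafka_handle, 10 * 1000 /* ms */);
rd_kafka_destroy(instance->kafka_handle);
```
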
diff --git a/src/tsg_send_log_internal.h b/src/tsg_send_log_internal.h
index 25a9c89..51e8b91 100644
--- a/src/tsg_send_log_internal.h
+++ b/src/tsg_send_log_internal.h
@@ -5,6 +5,7 @@
#include <librdkafka/rdkafka.h>
#include <MESA/cJSON.h>
#include <time.h>
+#include "ringbuf/ringbuf.h"
#define MIN_L7_PROTO_ID 100
#define MAX_L7_PROTO_ID 150
@@ -175,10 +176,47 @@ struct topic_stat
rd_kafka_topic_t *topic_rkt;
};
+struct enforce_rule_result
+{
+ int log_type; //enum LOG_TYPE
+ struct maat_rule rule;
+
+ char *mail_eml_filename;
+ char *http_request_filename;
+ char *http_response_filename;
+
+ long mirrored_bytes;
+ long mirrored_packets;
+ char *packet_capture_file;
+};
+
+#define LOGGER_SCRATCH_RULE_MAX 16
+struct logger_scratch
+{
+ int thread_id;
+ int n_enforce_rule;
+ int log_type; // enum LOG_TYPE
+ struct TLD_handle_t *handle;
+ struct tsg_log_instance_t *instance;
+ struct enforce_rule_result *enforce_rule;
+};
+
+struct logger_ringbuf_schema
+{
+ int produce_row_idx;
+ int consume_row_idx;
+ pthread_t pthread_id;
+ size_t size;
+ char *buff;
+ ringbuf_t *ring;
+ ringbuf_worker_t **worker;
+};
+
struct tsg_log_instance_t
{
int mode;
int level;
+ int ringbuf_num;
int max_service;
int vsystem_id;
int unknown_app_id;
@@ -210,6 +248,7 @@ struct tsg_log_instance_t
id2field_t id2field[LOG_COMMON_MAX];
rd_kafka_t *kafka_handle;
struct topic_stat *service2topic;
+ struct logger_ringbuf_schema *ringbuf;
void *logger;
};
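
For a rough sense of the memory each ring commits, `tsg_sendlog_init()` sizes the data buffer as `RINGBUFF_SIZE * sizeof(struct logger_scratch)`, 100000 slots by default. A hypothetical standalone check, assuming this header is on the include path and an LP64 target where the struct is about 40 bytes (three ints plus padding plus three pointers), i.e. roughly 4 MB per ring:

```c
#include <stdio.h>
#include "tsg_send_log_internal.h"

int main(void)
{
    size_t slots = 100000; /* default RINGBUFF_SIZE */
    printf("per record: %zu B, per ring: %zu B\n",
           sizeof(struct logger_scratch),
           slots * sizeof(struct logger_scratch));
    return 0;
}
```
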
diff --git a/test/src/CMakeLists.txt b/test/src/CMakeLists.txt
index 2a25309..2915259 100644
--- a/test/src/CMakeLists.txt
+++ b/test/src/CMakeLists.txt
@@ -9,20 +9,48 @@ include_directories(${PROJECT_SOURCE_DIR}/src/)
add_definitions(-std=c++11)
LINK_DIRECTORIES(/opt/MESA/lib)
-add_executable(gtest_rule ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_rule.cpp)
+add_executable(gtest_rule ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp
+ gtest_common.cpp gtest_rule.cpp
+ )
target_link_libraries(gtest_rule gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 fieldstat3)
-add_executable(gtest_tableinfo ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_tableinfo.cpp)
+add_executable(gtest_tableinfo ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp
+ gtest_common.cpp gtest_tableinfo.cpp
+ )
target_link_libraries(gtest_tableinfo gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 fieldstat3)
-add_executable(gtest_bridge ${PROJECT_SOURCE_DIR}/src/tsg_bridge.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_bridge.cpp)
+add_executable(gtest_bridge ${PROJECT_SOURCE_DIR}/src/tsg_bridge.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp
+ gtest_common.cpp gtest_bridge.cpp
+ )
target_link_libraries(gtest_bridge gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 fieldstat3)
-add_executable(gtest_action ${PROJECT_SOURCE_DIR}/src/tsg_action.cpp ${PROJECT_SOURCE_DIR}/src/tsg_leaky_bucket.cpp ${PROJECT_SOURCE_DIR}/src/tsg_dns.cpp ${PROJECT_SOURCE_DIR}/src/tsg_icmp.cpp ${PROJECT_SOURCE_DIR}/src/tsg_tamper.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_action.cpp)
+add_executable(gtest_action ${PROJECT_SOURCE_DIR}/src/tsg_action.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_leaky_bucket.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_dns.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_icmp.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_tamper.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp
+ gtest_common.cpp
+ gtest_action.cpp
+ )
target_link_libraries(gtest_action gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 fieldstat3)
-add_executable(gtest_sendlog ${PROJECT_SOURCE_DIR}/src/tsg_send_log.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_kafka.cpp gtest_sendlog.cpp)
+add_executable(gtest_sendlog ${PROJECT_SOURCE_DIR}/src/tsg_send_log.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp
+ ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp
+ ${PROJECT_SOURCE_DIR}/deps/ringbuf/ringbuf.c
+ gtest_common.cpp
+ gtest_kafka.cpp
+ gtest_sendlog.cpp
+ )
target_link_libraries(gtest_sendlog gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 rdkafka fieldstat3)
set(TSG_MASTER_SRC ${PROJECT_SOURCE_DIR}/src/tsg_entry.cpp
@@ -44,6 +72,7 @@ set(TSG_MASTER_SRC ${PROJECT_SOURCE_DIR}/src/tsg_entry.cpp
${PROJECT_SOURCE_DIR}/src/mpack.c
${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp
${PROJECT_SOURCE_DIR}/src/tsg_ssl_ja3_fingerprint.cpp
+ ${PROJECT_SOURCE_DIR}/deps/ringbuf/ringbuf.c
)
add_executable(gtest_master ${TSG_MASTER_SRC} gtest_kafka.cpp gtest_common.cpp gtest_master.cpp)