diff options
| author | liuxueli <[email protected]> | 2024-01-26 14:42:23 +0800 |
|---|---|---|
| committer | liuxueli <[email protected]> | 2024-02-01 16:46:42 +0800 |
| commit | 01f6c61298d56cdf0a99300636bbb8ed5bf0b0fa (patch) | |
| tree | 0a0d4601e3ba6ee3c1c811cdd162026adfc6d85f | |
| parent | 82ce053f67b78b03d89641a2ee53b7825c2b2fa9 (diff) | |
同步发送日志调整为异步发送日志feature-aync-sendlog-ringbuff
| -rw-r--r-- | CMakeLists.txt | 4 | ||||
| -rw-r--r-- | deps/ringbuf/LICENSE | 25 | ||||
| -rw-r--r-- | deps/ringbuf/README.md | 104 | ||||
| -rw-r--r-- | deps/ringbuf/ringbuf.c | 430 | ||||
| -rw-r--r-- | deps/ringbuf/ringbuf.h | 29 | ||||
| -rw-r--r-- | deps/ringbuf/utils.h | 115 | ||||
| -rw-r--r-- | inc/tsg_send_log.h | 3 | ||||
| -rw-r--r-- | src/CMakeLists.txt | 20 | ||||
| -rw-r--r-- | src/tsg_send_log.cpp | 889 | ||||
| -rw-r--r-- | src/tsg_send_log_internal.h | 39 | ||||
| -rw-r--r-- | test/src/CMakeLists.txt | 39 |
11 files changed, 1324 insertions, 373 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index f6a2582..268e7ee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,9 +8,13 @@ include(Version) set(CMAKE_MACOSX_RPATH 0) +set(CMAKE_C_STANDARD 99) +set(CMAKE_CXX_STANDARD 17) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g") +include_directories(${PROJECT_SOURCE_DIR}/deps/) include_directories(${PROJECT_SOURCE_DIR}/inc/) include_directories(/opt/MESA/include/) diff --git a/deps/ringbuf/LICENSE b/deps/ringbuf/LICENSE new file mode 100644 index 0000000..4f86ee0 --- /dev/null +++ b/deps/ringbuf/LICENSE @@ -0,0 +1,25 @@ +/*- + * Copyright (c) 2016-2017 Mindaugas Rasiukevicius <rmind at noxt eu> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ diff --git a/deps/ringbuf/README.md b/deps/ringbuf/README.md new file mode 100644 index 0000000..e5b849d --- /dev/null +++ b/deps/ringbuf/README.md @@ -0,0 +1,104 @@ +# Lock-free ring buffer + +[](https://travis-ci.org/rmind/ringbuf) + +Lock-free multi-producer single-consumer (MPSC) ring buffer which supports +contiguous range operations and which can be conveniently used for message +passing. The implementation is written in C11 and distributed under the +2-clause BSD license. + +## API + +* `int ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length)` + * Setup a new ring buffer of a given _length_. The `rbuf` is a pointer + to the opaque ring buffer object; the caller is responsible to allocate + the space for this object. Typically, the object would be allocated + dynamically if using threads or reserved in a shared memory blocked if + using processes. The allocation size for the object shall be obtained + using the `ringbuf_get_sizes` function. Returns 0 on success and -1 + on failure. + +* `void ringbuf_get_sizes(unsigned nworkers, size_t *ringbuf_obj_size, size_t *ringbuf_worker_size)` + * Returns the size of the opaque `ringbuf_t` and, optionally, `ringbuf_worker_t` structures. + The size of the `ringbuf_t` structure depends on the number of workers, + specified by the `nworkers` parameter. + +* `ringbuf_worker_t *ringbuf_register(ringbuf_t *rbuf, unsigned i)` + * Register the current worker (thread or process) as a producer. Each + producer MUST register itself. The `i` is a worker number, starting + from zero (i.e. shall be than `nworkers` used in the setup). On success, + returns a pointer to an opaque `ringbuf_worker_t` structured, which is + a part of the `ringbuf_t` memory block. On failure, returns `NULL`. + +* `void ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *worker)` + * Unregister the specified worker from the list of producers. + +* `ssize_t ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *worker, size_t len)` + * Request a space of a given length in the ring buffer. Returns the + offset at which the space is available or -1 on failure. Once the data + is ready (typically, when writing to the ring buffer is complete), the + `ringbuf_produce` function must be called to indicate that. Nested + acquire calls are not allowed. + +* `void ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *worker)` + * Indicate that the acquired range in the buffer is produced and is ready + to be consumed. + +* `size_t ringbuf_consume(ringbuf_t *rbuf, size_t *offset)` + * Get a contiguous range which is ready to be consumed. Returns zero + if there is no data available for consumption. Once the data is + consumed (typically, when reading from the ring buffer is complete), + the `ringbuf_release` function must be called to indicate that. + +* `void ringbuf_release(ringbuf_t *rbuf, size_t nbytes)` + * Indicate that the consumed range can now be released and may now be + reused by the producers. + +## Notes + +The consumer will return a contiguous block of ranges produced i.e. the +`ringbuf_consume` call will not return partial ranges. If you think of +produced range as a message, then consumer will return a block of messages, +always ending at the message boundary. Such behaviour allows us to use +this ring buffer implementation as a message queue. + +The implementation was extensively tested on a 24-core x86 machine, +see [the stress test](src/t_stress.c) for the details on the technique. +It also provides an example how the mechanism can be used for message +passing. + +## Caveats + +This ring buffer implementation always provides a contiguous range of +space for the producer. It is achieved by an early wrap-around if the +requested range cannot fit in the end. The implication of this is that +the `ringbuf_acquire` call may fail if the requested range is greater +than half of the buffer size. Hence, it may be necessary to ensure that +the ring buffer size is at least twice as large as the maximum production +unit size. + +It should also be noted that one of the trade-offs of such design is that +the consumer currently performs an O(n) scan on the list of producers. + +## Example + +Producers: +```c +if ((w = ringbuf_register(r, worker_id)) == NULL) + err(EXIT_FAILURE, "ringbuf_register") + +... + +if ((off = ringbuf_acquire(r, w, len)) != -1) { + memcpy(&buf[off], payload, len); + ringbuf_produce(r, tls); +} +``` + +Consumer: +```c +if ((len = ringbuf_consume(r, &off)) != 0) { + process(&buf[off], len); + ringbuf_release(r, len); +} +``` diff --git a/deps/ringbuf/ringbuf.c b/deps/ringbuf/ringbuf.c new file mode 100644 index 0000000..75cfee9 --- /dev/null +++ b/deps/ringbuf/ringbuf.c @@ -0,0 +1,430 @@ +/* + * Copyright (c) 2016-2017 Mindaugas Rasiukevicius <rmind at noxt eu> + * All rights reserved. + * + * Use is subject to license terms, as specified in the LICENSE file. + */ + +/* + * Atomic multi-producer single-consumer ring buffer, which supports + * contiguous range operations and which can be conveniently used for + * message passing. + * + * There are three offsets -- think of clock hands: + * - NEXT: marks the beginning of the available space, + * - WRITTEN: the point up to which the data is actually written. + * - Observed READY: point up to which data is ready to be written. + * + * Producers + * + * Observe and save the 'next' offset, then request N bytes from + * the ring buffer by atomically advancing the 'next' offset. Once + * the data is written into the "reserved" buffer space, the thread + * clears the saved value; these observed values are used to compute + * the 'ready' offset. + * + * Consumer + * + * Writes the data between 'written' and 'ready' offsets and updates + * the 'written' value. The consumer thread scans for the lowest + * seen value by the producers. + * + * Key invariant + * + * Producers cannot go beyond the 'written' offset; producers are + * also not allowed to catch up with the consumer. Only the consumer + * is allowed to catch up with the producer i.e. set the 'written' + * offset to be equal to the 'next' offset. + * + * Wrap-around + * + * If the producer cannot acquire the requested length due to little + * available space at the end of the buffer, then it will wraparound. + * WRAP_LOCK_BIT in 'next' offset is used to lock the 'end' offset. + * + * There is an ABA problem if one producer stalls while a pair of + * producer and consumer would both successfully wrap-around and set + * the 'next' offset to the stale value of the first producer, thus + * letting it to perform a successful CAS violating the invariant. + * A counter in the 'next' offset (masked by WRAP_COUNTER) is used + * to prevent from this problem. It is incremented on wraparounds. + * + * The same ABA problem could also cause a stale 'ready' offset, + * which could be observed by the consumer. We set WRAP_LOCK_BIT in + * the 'seen' value before advancing the 'next' and clear this bit + * after the successful advancing; this ensures that only the stable + * 'ready' is observed by the consumer. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <stddef.h> +#include <stdbool.h> +#include <inttypes.h> +#include <string.h> +#include <limits.h> +#include <errno.h> + +#include "ringbuf.h" +#include "utils.h" + +#define RBUF_OFF_MASK (0x00000000ffffffffUL) +#define WRAP_LOCK_BIT (0x8000000000000000UL) +#define RBUF_OFF_MAX (UINT64_MAX & ~WRAP_LOCK_BIT) + +#define WRAP_COUNTER (0x7fffffff00000000UL) +#define WRAP_INCR(x) (((x) + 0x100000000UL) & WRAP_COUNTER) + +typedef uint64_t ringbuf_off_t; + +struct ringbuf_worker { + volatile ringbuf_off_t seen_off; + int registered; +}; + +struct ringbuf { + /* Ring buffer space. */ + size_t space; + + /* + * The NEXT hand is atomically updated by the producer. + * WRAP_LOCK_BIT is set in case of wrap-around; in such case, + * the producer can update the 'end' offset. + */ + volatile ringbuf_off_t next; + ringbuf_off_t end; + + /* The following are updated by the consumer. */ + ringbuf_off_t written; + unsigned nworkers; + ringbuf_worker_t workers[]; +}; + +/* + * ringbuf_setup: initialise a new ring buffer of a given length. + */ +int +ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length) +{ + if (length >= RBUF_OFF_MASK) { + errno = EINVAL; + return -1; + } + memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers])); + rbuf->space = length; + rbuf->end = RBUF_OFF_MAX; + rbuf->nworkers = nworkers; + return 0; +} + +/* + * ringbuf_get_sizes: return the sizes of the ringbuf_t and ringbuf_worker_t. + */ +void +ringbuf_get_sizes(unsigned nworkers, + size_t *ringbuf_size, size_t *ringbuf_worker_size) +{ + if (ringbuf_size) + *ringbuf_size = offsetof(ringbuf_t, workers[nworkers]); + if (ringbuf_worker_size) + *ringbuf_worker_size = sizeof(ringbuf_worker_t); +} + +/* + * ringbuf_register: register the worker (thread/process) as a producer + * and pass the pointer to its local store. + */ +ringbuf_worker_t * +ringbuf_register(ringbuf_t *rbuf, unsigned i) +{ + ringbuf_worker_t *w = &rbuf->workers[i]; + + w->seen_off = RBUF_OFF_MAX; + atomic_store_explicit(&w->registered, true, memory_order_release); + return w; +} + +void +ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w) +{ + w->registered = false; + (void)rbuf; +} + +/* + * stable_nextoff: capture and return a stable value of the 'next' offset. + */ +static inline ringbuf_off_t +stable_nextoff(ringbuf_t *rbuf) +{ + unsigned count = SPINLOCK_BACKOFF_MIN; + ringbuf_off_t next; +retry: + next = atomic_load_explicit(&rbuf->next, memory_order_acquire); + if (next & WRAP_LOCK_BIT) { + SPINLOCK_BACKOFF(count); + goto retry; + } + ASSERT((next & RBUF_OFF_MASK) < rbuf->space); + return next; +} + +/* + * stable_seenoff: capture and return a stable value of the 'seen' offset. + */ +static inline ringbuf_off_t +stable_seenoff(ringbuf_worker_t *w) +{ + unsigned count = SPINLOCK_BACKOFF_MIN; + ringbuf_off_t seen_off; +retry: + seen_off = atomic_load_explicit(&w->seen_off, memory_order_acquire); + if (seen_off & WRAP_LOCK_BIT) { + SPINLOCK_BACKOFF(count); + goto retry; + } + return seen_off; +} + +/* + * ringbuf_acquire: request a space of a given length in the ring buffer. + * + * => On success: returns the offset at which the space is available. + * => On failure: returns -1. + */ +ssize_t +ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) +{ + ringbuf_off_t seen, next, target; + + ASSERT(len > 0 && len <= rbuf->space); + ASSERT(w->seen_off == RBUF_OFF_MAX); + + do { + ringbuf_off_t written; + + /* + * Get the stable 'next' offset. Save the observed 'next' + * value (i.e. the 'seen' offset), but mark the value as + * unstable (set WRAP_LOCK_BIT). + * + * Note: CAS will issue a memory_order_release for us and + * thus ensures that it reaches global visibility together + * with new 'next'. + */ + seen = stable_nextoff(rbuf); + next = seen & RBUF_OFF_MASK; + ASSERT(next < rbuf->space); + atomic_store_explicit(&w->seen_off, next | WRAP_LOCK_BIT, + memory_order_relaxed); + + /* + * Compute the target offset. Key invariant: we cannot + * go beyond the WRITTEN offset or catch up with it. + */ + target = next + len; + written = rbuf->written; + if (__predict_false(next < written && target >= written)) { + /* The producer must wait. */ + atomic_store_explicit(&w->seen_off, + RBUF_OFF_MAX, memory_order_release); + return -1; + } + + if (__predict_false(target >= rbuf->space)) { + const bool exceed = target > rbuf->space; + + /* + * Wrap-around and start from the beginning. + * + * If we would exceed the buffer, then attempt to + * acquire the WRAP_LOCK_BIT and use the space in + * the beginning. If we used all space exactly to + * the end, then reset to 0. + * + * Check the invariant again. + */ + target = exceed ? (WRAP_LOCK_BIT | len) : 0; + if ((target & RBUF_OFF_MASK) >= written) { + atomic_store_explicit(&w->seen_off, + RBUF_OFF_MAX, memory_order_release); + return -1; + } + /* Increment the wrap-around counter. */ + target |= WRAP_INCR(seen & WRAP_COUNTER); + } else { + /* Preserve the wrap-around counter. */ + target |= seen & WRAP_COUNTER; + } + } while (!atomic_compare_exchange_weak(&rbuf->next, &seen, target)); + + /* + * Acquired the range. Clear WRAP_LOCK_BIT in the 'seen' value + * thus indicating that it is stable now. + * + * No need for memory_order_release, since CAS issued a fence. + */ + atomic_store_explicit(&w->seen_off, w->seen_off & ~WRAP_LOCK_BIT, + memory_order_relaxed); + + /* + * If we set the WRAP_LOCK_BIT in the 'next' (because we exceed + * the remaining space and need to wrap-around), then save the + * 'end' offset and release the lock. + */ + if (__predict_false(target & WRAP_LOCK_BIT)) { + /* Cannot wrap-around again if consumer did not catch-up. */ + ASSERT(rbuf->written <= next); + ASSERT(rbuf->end == RBUF_OFF_MAX); + rbuf->end = next; + next = 0; + + /* + * Unlock: ensure the 'end' offset reaches global + * visibility before the lock is released. + */ + atomic_store_explicit(&rbuf->next, + (target & ~WRAP_LOCK_BIT), memory_order_release); + } + ASSERT((target & RBUF_OFF_MASK) <= rbuf->space); + return (ssize_t)next; +} + +/* + * ringbuf_produce: indicate the acquired range in the buffer is produced + * and is ready to be consumed. + */ +void +ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w) +{ + (void)rbuf; + ASSERT(w->registered); + ASSERT(w->seen_off != RBUF_OFF_MAX); + atomic_store_explicit(&w->seen_off, RBUF_OFF_MAX, memory_order_release); +} + +/* + * ringbuf_consume: get a contiguous range which is ready to be consumed. + */ +size_t +ringbuf_consume(ringbuf_t *rbuf, size_t *offset) +{ + ringbuf_off_t written = rbuf->written, next, ready; + size_t towrite; +retry: + /* + * Get the stable 'next' offset. Note: stable_nextoff() issued + * a load memory barrier. The area between the 'written' offset + * and the 'next' offset will be the *preliminary* target buffer + * area to be consumed. + */ + next = stable_nextoff(rbuf) & RBUF_OFF_MASK; + if (written == next) { + /* If producers did not advance, then nothing to do. */ + return 0; + } + + /* + * Observe the 'ready' offset of each producer. + * + * At this point, some producer might have already triggered the + * wrap-around and some (or all) seen 'ready' values might be in + * the range between 0 and 'written'. We have to skip them. + */ + ready = RBUF_OFF_MAX; + + for (unsigned i = 0; i < rbuf->nworkers; i++) { + ringbuf_worker_t *w = &rbuf->workers[i]; + ringbuf_off_t seen_off; + + /* + * Skip if the worker has not registered. + * + * Get a stable 'seen' value. This is necessary since we + * want to discard the stale 'seen' values. + */ + if (!atomic_load_explicit(&w->registered, memory_order_relaxed)) + continue; + seen_off = stable_seenoff(w); + + /* + * Ignore the offsets after the possible wrap-around. + * We are interested in the smallest seen offset that is + * not behind the 'written' offset. + */ + if (seen_off >= written) { + ready = MIN(seen_off, ready); + } + ASSERT(ready >= written); + } + + /* + * Finally, we need to determine whether wrap-around occurred + * and deduct the safe 'ready' offset. + */ + if (next < written) { + const ringbuf_off_t end = MIN(rbuf->space, rbuf->end); + + /* + * Wrap-around case. Check for the cut off first. + * + * Reset the 'written' offset if it reached the end of + * the buffer or the 'end' offset (if set by a producer). + * However, we must check that the producer is actually + * done (the observed 'ready' offsets are clear). + */ + if (ready == RBUF_OFF_MAX && written == end) { + /* + * Clear the 'end' offset if was set. + */ + if (rbuf->end != RBUF_OFF_MAX) { + rbuf->end = RBUF_OFF_MAX; + } + + /* + * Wrap-around the consumer and start from zero. + */ + written = 0; + atomic_store_explicit(&rbuf->written, + written, memory_order_release); + goto retry; + } + + /* + * We cannot wrap-around yet; there is data to consume at + * the end. The ready range is smallest of the observed + * 'ready' or the 'end' offset. If neither is set, then + * the actual end of the buffer. + */ + ASSERT(ready > next); + ready = MIN(ready, end); + ASSERT(ready >= written); + } else { + /* + * Regular case. Up to the observed 'ready' (if set) + * or the 'next' offset. + */ + ready = MIN(ready, next); + } + towrite = ready - written; + *offset = written; + + ASSERT(ready >= written); + ASSERT(towrite <= rbuf->space); + return towrite; +} + +/* + * ringbuf_release: indicate that the consumed range can now be released. + */ +void +ringbuf_release(ringbuf_t *rbuf, size_t nbytes) +{ + const size_t nwritten = rbuf->written + nbytes; + + ASSERT(rbuf->written <= rbuf->space); + ASSERT(rbuf->written <= rbuf->end); + ASSERT(nwritten <= rbuf->space); + + rbuf->written = (nwritten == rbuf->space) ? 0 : nwritten; +} diff --git a/deps/ringbuf/ringbuf.h b/deps/ringbuf/ringbuf.h new file mode 100644 index 0000000..e8fc767 --- /dev/null +++ b/deps/ringbuf/ringbuf.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2016 Mindaugas Rasiukevicius <rmind at noxt eu> + * All rights reserved. + * + * Use is subject to license terms, as specified in the LICENSE file. + */ + +#ifndef _RINGBUF_H_ +#define _RINGBUF_H_ + +__BEGIN_DECLS + +typedef struct ringbuf ringbuf_t; +typedef struct ringbuf_worker ringbuf_worker_t; + +int ringbuf_setup(ringbuf_t *, unsigned, size_t); +void ringbuf_get_sizes(unsigned, size_t *, size_t *); + +ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned); +void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *); + +ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t); +void ringbuf_produce(ringbuf_t *, ringbuf_worker_t *); +size_t ringbuf_consume(ringbuf_t *, size_t *); +void ringbuf_release(ringbuf_t *, size_t); + +__END_DECLS + +#endif diff --git a/deps/ringbuf/utils.h b/deps/ringbuf/utils.h new file mode 100644 index 0000000..413157b --- /dev/null +++ b/deps/ringbuf/utils.h @@ -0,0 +1,115 @@ +/* + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved. + * + * This code is derived from software contributed to Berkeley by + * Berkeley Software Design, Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * @(#)cdefs.h 8.8 (Berkeley) 1/9/95 + */ + +#ifndef _UTILS_H_ +#define _UTILS_H_ + +#include <assert.h> + +/* + * A regular assert (debug/diagnostic only). + */ +#if defined(DEBUG) +#define ASSERT assert +#else +#define ASSERT(x) +#endif + +/* + * Minimum, maximum and rounding macros. + */ + +#ifndef MIN +#define MIN(x, y) ((x) < (y) ? (x) : (y)) +#endif + +#ifndef MAX +#define MAX(x, y) ((x) > (y) ? (x) : (y)) +#endif + +/* + * Branch prediction macros. + */ +#ifndef __predict_true +#define __predict_true(x) __builtin_expect((x) != 0, 1) +#define __predict_false(x) __builtin_expect((x) != 0, 0) +#endif + +/* + * Atomic operations and memory barriers. If C11 API is not available, + * then wrap the GCC builtin routines. + * + * Note: This atomic_compare_exchange_weak does not do the C11 thing of + * filling *(expected) with the actual value, because we don't need + * that here. + */ +#ifndef atomic_compare_exchange_weak +#define atomic_compare_exchange_weak(ptr, expected, desired) \ + __sync_bool_compare_and_swap(ptr, *(expected), desired) +#endif + +#ifndef atomic_thread_fence +#define memory_order_relaxed __ATOMIC_RELAXED +#define memory_order_acquire __ATOMIC_ACQUIRE +#define memory_order_release __ATOMIC_RELEASE +#define memory_order_seq_cst __ATOMIC_SEQ_CST +#define atomic_thread_fence(m) __atomic_thread_fence(m) +#endif +#ifndef atomic_store_explicit +#define atomic_store_explicit __atomic_store_n +#endif +#ifndef atomic_load_explicit +#define atomic_load_explicit __atomic_load_n +#endif + +/* + * Exponential back-off for the spinning paths. + */ +#define SPINLOCK_BACKOFF_MIN 4 +#define SPINLOCK_BACKOFF_MAX 128 +#if defined(__x86_64__) || defined(__i386__) +#define SPINLOCK_BACKOFF_HOOK __asm volatile("pause" ::: "memory") +#else +#define SPINLOCK_BACKOFF_HOOK +#endif +#define SPINLOCK_BACKOFF(count) \ +do { \ + for (int __i = (count); __i != 0; __i--) { \ + SPINLOCK_BACKOFF_HOOK; \ + } \ + if ((count) < SPINLOCK_BACKOFF_MAX) \ + (count) += (count); \ +} while (/* CONSTCOND */ 0); + +#endif diff --git a/inc/tsg_send_log.h b/inc/tsg_send_log.h index 5951be8..6a2177a 100644 --- a/inc/tsg_send_log.h +++ b/inc/tsg_send_log.h @@ -29,7 +29,8 @@ enum LOG_TYPE LOG_TYPE_TRANSACTION_RECORD, LOG_TYPE_GTPC_RECORD, LOG_TYPE_BGP_RECORD, - LOG_TYPE_INTERCEPT_EVENT + LOG_TYPE_INTERCEPT_EVENT, + LOG_TYPE_MAX }; struct TLD_handle_t; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d48f4a5..6b371b7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,7 +2,25 @@ cmake_minimum_required(VERSION 2.8...3.10) add_definitions(-fPIC) -set(SRC tsg_entry.cpp tsg_rule.cpp tsg_ssl_utils.cpp tsg_send_log.cpp tsg_statistic.cpp tsg_ssh_utils.cpp tsg_gtp_signaling.cpp tsg_action.cpp tsg_leaky_bucket.cpp tsg_dns.cpp tsg_icmp.cpp tsg_tamper.cpp tsg_bridge.cpp tsg_sync_state.cpp tsg_variable.cpp tsg_proxy.cpp mpack.c tsg_stat.cpp tsg_ssl_ja3_fingerprint.cpp) +set(SRC tsg_entry.cpp + tsg_rule.cpp + tsg_ssl_utils.cpp + tsg_send_log.cpp + tsg_statistic.cpp + tsg_ssh_utils.cpp + tsg_gtp_signaling.cpp + tsg_action.cpp + tsg_leaky_bucket.cpp + tsg_dns.cpp tsg_icmp.cpp + tsg_tamper.cpp tsg_bridge.cpp + tsg_sync_state.cpp + tsg_variable.cpp + tsg_proxy.cpp + mpack.c + tsg_stat.cpp + tsg_ssl_ja3_fingerprint.cpp + ${PROJECT_SOURCE_DIR}/deps/ringbuf/ringbuf.c + ) include_directories(${CMAKE_SOURCE_DIR}/inc) include_directories(/opt/MESA/include/MESA/) diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp index 62bb7e2..3d8bbee 100644 --- a/src/tsg_send_log.cpp +++ b/src/tsg_send_log.cpp @@ -56,6 +56,21 @@ const id2field_t tld_type[TLD_TYPE_MAX]={{TLD_TYPE_UNKNOWN, TLD_TYPE_UNKNOWN, " extern "C" int MESA_get_dev_ipv4(const char *device, int *ip_add); +char *tsg_string_dup(char *string) +{ + if(string==NULL) + { + return NULL; + } + + size_t len=strlen(string); + char *dup=(char *)malloc(len+1); + memcpy(dup, string, len); + dup[len]='\0'; + + return dup; +} + static int string_cat(char *dst, int dst_len, char *src) { if(dst==NULL || dst_len<=0 || src==NULL) @@ -1195,48 +1210,6 @@ int TLD_convert_json(struct TLD_handle_t *_handle, char *buff, unsigned int buff return 1; } -static int set_mail_eml(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream) -{ - struct tsg_conn_sketch_notify_data *notify_mail=(struct tsg_conn_sketch_notify_data *)session_conn_sketch_notify_data_get(a_stream); - if(notify_mail!=NULL && notify_mail->pdata.mail_eml_filename!=NULL && notify_mail->protocol==PROTO_MAIL) - { - TLD_delete(_handle, _instance->id2field[LOG_COMMON_MAIL_EML_FILE].name); - TLD_append(_handle, _instance->id2field[LOG_COMMON_MAIL_EML_FILE].name, (void *)notify_mail->pdata.mail_eml_filename, TLD_TYPE_STRING); - return 1; - } - - return 0; -} - -static int set_s3_filename(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream) -{ - struct business_notify_data *bnd_label=(struct business_notify_data *)session_business_data_get(a_stream); - if(bnd_label==NULL || bnd_label->pdata==NULL) - { - return 0; - } - - switch(bnd_label->proto) - { - case PROTO_HTTP: - TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name); - - if(bnd_label->s3_http==NULL) - { - break; - } - - TLD_append(_handle, _instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name, bnd_label->s3_http->request_filename, TLD_TYPE_STRING); - TLD_append(_handle, _instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name, bnd_label->s3_http->response_filename, TLD_TYPE_STRING); - break; - default: - break; - } - - return 1; -} - int set_nat_linkinfo(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream) { if(_instance->send_nat_linkinfo==0 || a_stream==NULL) @@ -1578,14 +1551,11 @@ static int is_hitted_shunt(const struct streaminfo *a_stream) return 0; } -static int set_xxxx_from_user_region(struct TLD_handle_t *_handle, struct tsg_log_instance_t *_instance, struct maat_rule *p_result, int thread_seq) +static int set_xxxx_from_user_region(struct TLD_handle_t *_handle, struct tsg_log_instance_t *_instance, struct maat_rule *p_result) { cJSON *item=NULL; cJSON *object=NULL; - TLD_delete(_handle, _instance->id2field[LOG_COMMON_USER_REGION].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_SUB_ACTION].name); - if(p_result->action!=TSG_ACTION_NONE) { struct maat_compile *compile=(struct maat_compile *)matched_rule_cites_security_compile(g_tsg_maat_feather, p_result->rule_id); @@ -1630,40 +1600,6 @@ int set_application_behavior(struct tsg_log_instance_t *_instance, struct TLD_ha return 1; } -int set_policy_action_para_exec_result(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, struct maat_rule *p_result) -{ - struct tsg_notify_execution_result *execution_result=(struct tsg_notify_execution_result *)session_mirrored_and_capture_packets_exec_result_get(a_stream); - if(execution_result==NULL) - { - return 0; - } - - TLD_delete(_handle, _instance->id2field[LOG_COMMON_MIRRORED_PKTS].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_MIRRORED_BYTES].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_PACKET_CAPTURE_FILE].name); - - for(int i=0; i<execution_result->stat_mirrored_cnt; i++) - { - if(execution_result->stat_mirrored[i].compile_id==p_result->rule_id) - { - TLD_append(_handle, _instance->id2field[LOG_COMMON_MIRRORED_PKTS].name, (void *)(execution_result->stat_mirrored[i].packets), TLD_TYPE_LONG); - TLD_append(_handle, _instance->id2field[LOG_COMMON_MIRRORED_BYTES].name, (void *)(execution_result->stat_mirrored[i].bytes), TLD_TYPE_LONG); - break; - } - } - - for(int i=0; i<execution_result->capture_result_cnt; i++) - { - if(execution_result->capture_result[i].compile_id==p_result->rule_id) - { - TLD_append(_handle, _instance->id2field[LOG_COMMON_PACKET_CAPTURE_FILE].name, (void *)(execution_result->capture_result[i].packet_path), TLD_TYPE_STRING); - break; - } - } - - return 1; -} - int set_session_attributes(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream) { struct session_runtime_attribute *srt_attribute=(struct session_runtime_attribute *)session_runtime_attribute_get(a_stream); @@ -1867,24 +1803,6 @@ static int session_record_limit(struct tsg_log_instance_t *_instance, const stru return 0; } -int append_common_field(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream) -{ - TLD_append_streaminfo(_instance, _handle, a_stream); - TLD_append(_handle, _instance->id2field[LOG_COMMON_SLED_IP].name, (void *)(_instance->local_ip_str), TLD_TYPE_STRING); - if(strlen(g_tsg_para.device_sn)>0) - { - TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_ID].name, (void *)(g_tsg_para.device_sn), TLD_TYPE_STRING); - } - - TLD_append(_handle, _instance->id2field[LOG_COMMON_DATA_CENTER].name, (void *)tsg_data_center_get(), TLD_TYPE_STRING); - TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_TAG].name, (void *)tsg_device_tag_get(), TLD_TYPE_STRING); - TLD_append(_handle, _instance->id2field[LOG_COMMON_TRAFFIC_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG); - - set_application_behavior(_instance, _handle, a_stream); - - return 1; -} - int log_common_fields_new(const char *filename, id2field_t *id2field, struct topic_stat **service2topic, int *max_service) { int i=0,flag=0; @@ -1987,221 +1905,271 @@ int log_common_fields_new(const char *filename, id2field_t *id2field, struct top return 0; } -struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) +void tsg_enforce_rule_result_free(struct enforce_rule_result *enforce_rule, size_t n_enforce_rule) { - char override_sled_ip[32]={0}; - char kafka_errstr[1024]={0}; - unsigned int local_ip_nr=0; - rd_kafka_conf_t *rdkafka_conf = NULL; - char broker_list[1024]={0}; - struct tsg_log_instance_t *_instance=NULL; - char common_field_file[128]={0}; - char log_path[128]={0}; - - _instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t)); + for(size_t i=0; i<n_enforce_rule && enforce_rule!=NULL; i++) + { + if(enforce_rule[i].mail_eml_filename!=NULL) + { + free(enforce_rule[i].mail_eml_filename); + enforce_rule[i].mail_eml_filename=NULL; + } - MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(_instance->level), 30); - MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), "./log/tsglog"); - MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_USER_REGION", &(_instance->send_user_region), 0); - MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_DATA_CENTER_SWITCH", &(_instance->send_data_center), 0); - MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_APP_ID_SWITCH", &(_instance->send_app_id), 0); - MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_INTERCEPT_LOG", &(_instance->send_intercept_log), 0); - MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_NAT_LINKINFO_SWITCH", &(_instance->send_nat_linkinfo), 0); - MESA_load_profile_int_def(conffile, "TSG_LOG", "RAPIDJSON_CHUNK_CAPACITY", &(_instance->rapidjson_chunk_capacity), 2048); + if(enforce_rule[i].http_request_filename!=NULL) + { + free(enforce_rule[i].http_request_filename); + enforce_rule[i].http_request_filename=NULL; + } - MESA_load_profile_int_def(conffile, "TSG_LOG", "VSYSTEM_ID", &(_instance->vsystem_id), 1); - MESA_load_profile_int_def(conffile, "SYSTEM","UNKNOWN_APP_ID", &_instance->unknown_app_id, 4); + if(enforce_rule[i].http_response_filename!=NULL) + { + free(enforce_rule[i].http_response_filename); + enforce_rule[i].http_response_filename=NULL; + } - MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_INTERIM_RECORD", &(_instance->send_interim_log), 1); - MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_TRANSCATION_RECORD", &(_instance->send_transcation_log), 1); - MESA_load_profile_int_def(conffile, "TSG_LOG","TCP_MIN_PKTS", &_instance->tcp_min_log_pkts, 3); - MESA_load_profile_int_def(conffile, "TSG_LOG","TCP_MIN_BYTES", &_instance->tcp_min_log_bytes, 5); - MESA_load_profile_int_def(conffile, "TSG_LOG","UDP_MIN_PKTS", &_instance->udp_min_log_pkts, 3); - MESA_load_profile_int_def(conffile, "TSG_LOG","UDP_MIN_BYTES", &_instance->udp_min_log_bytes, 5); - - _instance->logger=MESA_create_runtime_log_handle(log_path, _instance->level); - if(_instance->logger==NULL) - { - printf("MESA_create_runtime_log_handle failed ..., path: %s level: %d", log_path, _instance->level); - free(_instance); - _instance=NULL; - return NULL; + if(enforce_rule[i].packet_capture_file!=NULL) + { + free(enforce_rule[i].packet_capture_file); + enforce_rule[i].packet_capture_file=NULL; + } } - - MESA_load_profile_int_def(conffile, "TSG_LOG", "MODE",&(_instance->mode), 0); - if(_instance->mode==CLOSE) +} + +std::string tsg_rapidjson_serialize(Document *document, rapidjson::StringBuffer& sbuff) +{ + sbuff.Clear(); + rapidjson::Writer<rapidjson::StringBuffer> writer(sbuff); + document->Accept(writer); + return std::string(sbuff.GetString()); +} + +void *tsg_log_ringbuf_consume(void *arg) +{ + struct logger_ringbuf_schema *ringbuf=(struct logger_ringbuf_schema *)arg; + rapidjson::StringBuffer sbuff(0, 1024*16); + + while(1) { - MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "Disable tsg_send_log"); - return _instance; - } - - MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30); + size_t offset=0; + size_t scratch_sz=ringbuf_consume(ringbuf->ring, &offset); + if(scratch_sz==0) + { + tsg_stat_sendlog_update(ringbuf->consume_row_idx, LOG_STATUS_FAIL, 1); + continue; + } + + struct logger_scratch *scratch=(struct logger_scratch *)(ringbuf->buff+offset); - MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", common_field_file, sizeof(common_field_file), NULL); - MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", broker_list, sizeof(broker_list), NULL); - MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_USERNAME", _instance->sasl_username, sizeof(_instance->sasl_username), ""); //admin - MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_PASSWD", _instance->sasl_passwd, sizeof(_instance->sasl_passwd), ""); - MESA_load_profile_string_def(conffile, "TSG_LOG", "COMPRESSION_TYPE", _instance->compression, sizeof(_instance->compression), ""); //snappy + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_VSYSTEM_ID].name); - MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000"); - MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000"); - MESA_load_profile_string_def(conffile, "TSG_LOG", "REQUIRE_ACK", _instance->require_ack, sizeof(_instance->require_ack), "1"); + for(int i=0; i<scratch->n_enforce_rule && scratch->enforce_rule!=NULL; i++) + { + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_POLICY_ID].name, (void *)(long)(scratch->enforce_rule[i].rule.rule_id), TLD_TYPE_LONG); + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_SERVICE].name, (void *)(long)(scratch->enforce_rule[i].rule.service_id), TLD_TYPE_LONG); + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)scratch->enforce_rule[i].rule.vsys_id, TLD_TYPE_LONG); + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_ACTION].name, (void *)(long)(scratch->enforce_rule[i].rule.action), TLD_TYPE_LONG); - MESA_load_profile_string_def(conffile, "SYSTEM", "TCP_LABEL", _instance->tcp_label, sizeof(_instance->tcp_label), "tcp_flow_stat"); - MESA_load_profile_string_def(conffile, "SYSTEM", "UDP_LABEL", _instance->udp_label, sizeof(_instance->udp_label), "udp_flow_stat"); + if(scratch->enforce_rule[i].mirrored_packets>0) + { + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_MIRRORED_PKTS].name, (void *)(scratch->enforce_rule[i].mirrored_packets), TLD_TYPE_LONG); + } - _instance->tcp_flow_project_id=project_customer_register(_instance->tcp_label, "struct"); - _instance->udp_flow_project_id=project_customer_register(_instance->udp_label, "struct"); - if(_instance->tcp_flow_project_id<0 || _instance->udp_flow_project_id<0) - { - MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, - "project_customer_register is error, tcp_label: %s udp_label: %s, please check etc/project.conf", - _instance->tcp_label, - _instance->udp_label - ); - } + if(scratch->enforce_rule[i].mirrored_bytes>0) + { + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_MIRRORED_BYTES].name, (void *)(scratch->enforce_rule[i].mirrored_bytes), TLD_TYPE_LONG); + } - MESA_load_profile_string_def(conffile, "SYSTEM", "OVERRIDE_SLED_IP", override_sled_ip, sizeof(override_sled_ip), "OVERRIDE_SLED_IP"); - char *sled_ip=getenv(override_sled_ip); - if(sled_ip==NULL) - { - char nic_name[32]={0}; - MESA_load_profile_string_def(conffile, "SYSTEM", "NIC_NAME", nic_name, sizeof(nic_name), "lo"); - int ret=MESA_get_dev_ipv4(nic_name, (int *)&local_ip_nr); - if(ret<0) + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_MAIL_EML_FILE].name, (void *)(scratch->enforce_rule[i].mail_eml_filename), TLD_TYPE_STRING); + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name, (void *)(scratch->enforce_rule[i].http_request_filename), TLD_TYPE_STRING); + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name, (void *)(scratch->enforce_rule[i].http_response_filename), TLD_TYPE_STRING); + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_PACKET_CAPTURE_FILE].name, (void *)(scratch->enforce_rule[i].packet_capture_file), TLD_TYPE_STRING); + + set_xxxx_from_user_region(scratch->handle, scratch->instance, &(scratch->enforce_rule[i].rule)); + + std::string sbuff_str=tsg_rapidjson_serialize(scratch->handle->document, sbuff); + tsg_send_payload(scratch->instance, scratch->enforce_rule[i].log_type, (char *)sbuff_str.c_str(), sbuff_str.length(), scratch->thread_id); + + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_POLICY_ID].name); + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_SERVICE].name); + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_ACTION].name); + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_USER_REGION].name); + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_VSYSTEM_ID].name); + + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_MIRRORED_PKTS].name); + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_MIRRORED_BYTES].name); + + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_MAIL_EML_FILE].name); + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name); + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name); + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_PACKET_CAPTURE_FILE].name); + + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_USER_REGION].name); + TLD_delete(scratch->handle, scratch->instance->id2field[LOG_COMMON_SUB_ACTION].name); + + fs3_rule_stat_update(RULE_STAT_SEND, (int)(scratch->enforce_rule[i].rule.action), 1); + } + + if(scratch->log_type!=LOG_TYPE_MAX) { - MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, - "GET_LOCAL_IP MESA_get_dev_ipv4 is error, nic_name: %s, please check tsgconf/main.conf", - nic_name - ); - return NULL; + TLD_append(scratch->handle, scratch->instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)scratch->instance->vsystem_id, TLD_TYPE_LONG); + std::string sbuff_str=tsg_rapidjson_serialize(scratch->handle->document, sbuff); + tsg_send_payload(scratch->instance, scratch->log_type, (char *)sbuff_str.c_str(), sbuff_str.length(), scratch->thread_id); } - inet_ntop(AF_INET,&(local_ip_nr),_instance->local_ip_str,sizeof(_instance->local_ip_str)); - } - else - { - memcpy(_instance->local_ip_str, sled_ip, MIN(sizeof(_instance->local_ip_str)-1, strlen(sled_ip))); - } - rdkafka_conf = rd_kafka_conf_new(); - rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", _instance->send_queue_max_msg, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", _instance->refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", broker_list, kafka_errstr, sizeof(kafka_errstr)); - - if(strlen(_instance->compression)>0) - { - rd_kafka_conf_set(rdkafka_conf, "compression.codec", _instance->compression, kafka_errstr, sizeof(kafka_errstr)); - } + tsg_stat_sendlog_update(ringbuf->consume_row_idx, LOG_STATUS_SUCCESS, 1); - if(strlen(_instance->sasl_username)> 0 && strlen(_instance->sasl_passwd)>0) - { - rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.username", _instance->sasl_username, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "sasl.password", _instance->sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); + TLD_cancel(scratch->handle); + if(scratch->enforce_rule!=NULL) + { + tsg_enforce_rule_result_free(scratch->enforce_rule, scratch->n_enforce_rule); + free(scratch->enforce_rule); + } + + memset(scratch, 0, sizeof(struct logger_scratch)); + ringbuf_release(ringbuf->ring, sizeof(struct logger_scratch)); } - - if(!(_instance->kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)))) + + pthread_exit(NULL); +} + +void tsg_log_ringbuf_produce(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, enum LOG_TYPE log_type, struct enforce_rule_result *enforce_rule, size_t n_enforce_rule, int thread_id) +{ + int i=(thread_id%instance->ringbuf_num); + struct logger_ringbuf_schema *ringbuf=&(instance->ringbuf[i]); + ssize_t offset=ringbuf_acquire(ringbuf->ring, ringbuf->worker[thread_id], sizeof(struct logger_scratch)); + if(offset==-1) { - MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "KAFKA_INIT rd_kafka_new is error"); - return NULL; + TLD_cancel(handle); + tsg_enforce_rule_result_free(enforce_rule, n_enforce_rule); + tsg_stat_sendlog_update(ringbuf->produce_row_idx, LOG_STATUS_FAIL, 1); + return; } - log_common_fields_new(common_field_file, _instance->id2field, &(_instance->service2topic), &(_instance->max_service)); + struct logger_scratch *scratch=(struct logger_scratch *)(ringbuf->buff+offset); + scratch->handle=handle; + scratch->instance=instance; + scratch->log_type=log_type; + scratch->thread_id=thread_id; - if(_instance->service2topic!=NULL) + if(enforce_rule!=NULL && n_enforce_rule>0) { - _instance->sum_stat_row_id=tsg_stat_sendlog_row_init("sum"); - for(int i=0; i<_instance->max_service; i++) - { - if(_instance->service2topic[i].type==TLD_TYPE_MAX && strlen(_instance->service2topic[i].name)>0) - { - register_topic(_instance, &( _instance->service2topic[i])); - } - } + scratch->n_enforce_rule=MIN(n_enforce_rule, LOGGER_SCRATCH_RULE_MAX); + scratch->enforce_rule=(struct enforce_rule_result *)calloc(scratch->n_enforce_rule, sizeof(struct enforce_rule_result)); + memcpy(scratch->enforce_rule, enforce_rule, sizeof(struct enforce_rule_result)*scratch->n_enforce_rule); } else { - MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "KAFKA_INIT log_common_fields_new is error, please check %s", common_field_file); + scratch->n_enforce_rule=0; + scratch->enforce_rule=NULL; } - return _instance; + ringbuf_produce(ringbuf->ring, ringbuf->worker[thread_id]); + tsg_stat_sendlog_update(ringbuf->produce_row_idx, LOG_STATUS_SUCCESS, 1); } -void tsg_sendlog_destroy(struct tsg_log_instance_t * instance) + +void tsg_rule_log_enable_additional_set(const struct streaminfo *a_stream, struct enforce_rule_result *enforce_rule, size_t n_enforce_rule) { - if(instance==NULL) - { - return ; - } - - if(instance->mode!=CLOSE) + struct tsg_notify_execution_result *execution_result=(struct tsg_notify_execution_result *)session_mirrored_and_capture_packets_exec_result_get(a_stream); + + for(size_t i=0;i<n_enforce_rule; i++) { - for(int i=0; i<instance->max_service; i++) + if(enforce_rule[i].rule.do_log==LOG_ABORT) { - if(instance->service2topic[i].type!=TLD_TYPE_MAX || i==1) //i=1 equal i=0, service id of security event is 0 and 1 - { - continue; - } - - if(instance->service2topic[i].topic_rkt!=NULL) - { - rd_kafka_topic_destroy(instance->service2topic[i].topic_rkt); - } + continue; + } - if(instance->service2topic[i].drop_start!=NULL) + if(execution_result!=NULL) + { + for(int j=0; j<execution_result->stat_mirrored_cnt; j++) { - free(instance->service2topic[i].drop_start); - instance->service2topic[i].drop_start=NULL; + if(execution_result->stat_mirrored[j].compile_id==enforce_rule[i].rule.rule_id) + { + enforce_rule[i].mirrored_bytes=execution_result->stat_mirrored[j].bytes; + enforce_rule[i].mirrored_packets=execution_result->stat_mirrored[j].packets; + break; + } } - if(instance->service2topic[i].send_log_percent!=NULL) - { - free(instance->service2topic[i].send_log_percent); - instance->service2topic[i].send_log_percent=NULL; + for(int j=0; j<execution_result->capture_result_cnt; j++) + { + if(execution_result->capture_result[j].compile_id!=enforce_rule[i].rule.rule_id) + { + continue; + } + + if(execution_result->capture_result[j].packet_path!=NULL) + { + enforce_rule[i].packet_capture_file=tsg_string_dup(execution_result->capture_result[j].packet_path); + } + + break; } } - - //rd_kafka_destroy_flags(instance->kafka_handle, 4); - rd_kafka_destroy(instance->kafka_handle); - - free(instance->service2topic); - instance->service2topic=NULL; - } - - MESA_destroy_runtime_log_handle(instance->logger); - instance->logger=NULL; + if(enforce_rule[i].rule.do_log==LOG_NOFILE) + { + continue; + } - free(instance); - instance=NULL; - return ; + struct business_notify_data *bnd_label=(struct business_notify_data *)session_business_data_get(a_stream); + if(bnd_label!=NULL && bnd_label->pdata!=NULL && bnd_label->proto==PROTO_HTTP) + { + enforce_rule[i].http_request_filename=tsg_string_dup(bnd_label->s3_http->request_filename); + enforce_rule[i].http_response_filename=tsg_string_dup(bnd_label->s3_http->response_filename); + continue; + } + + struct tsg_conn_sketch_notify_data *notify_mail=(struct tsg_conn_sketch_notify_data *)session_conn_sketch_notify_data_get(a_stream); + if(notify_mail!=NULL && notify_mail->pdata.mail_eml_filename!=NULL && notify_mail->protocol==PROTO_MAIL) + { + enforce_rule[i].mail_eml_filename=tsg_string_dup(notify_mail->pdata.mail_eml_filename); + } + } } -int send_log_by_type(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, LOG_TYPE log_type, int thread_id) +void tsg_rule_log_enable_type_set(struct enforce_rule_result *enforce_rule, size_t n_enforce_rule, enum LOG_TYPE log_type) { - int ret=update_percent(_instance, log_type, LOG_STATUS_DROP, thread_id); - if(ret==1) + for(size_t i=0;i<n_enforce_rule; i++) { - MASTER_LOG(_instance->logger, RLOG_LV_DEBUG, LOG_MODULE_SENDLOG, - "drop log: log_type=%d send_log_percent: %d addr=%s", - log_type, - _instance->service2topic[log_type].send_log_percent[thread_id], - (a_stream==NULL ? "" : printaddr(&(a_stream->addr), thread_id)) - ); + enforce_rule[i].log_type=log_type; } +} + +void tsg_rule_log_enable_select(struct maat_rule *rules, size_t n_rules, struct enforce_rule_result *enforce_rule, size_t n_enforce_rule, size_t *enforce_rule_offset) +{ + int repeat_rule=0; + for(size_t i=0;i<n_rules; i++) + { + if(rules[i].do_log==LOG_ABORT) + { + continue; + } - StringBuffer sb(0, _instance->rapidjson_chunk_capacity); - Writer<StringBuffer> writer(sb); - _handle->document->Accept(writer); + for(size_t j=0; j<(*enforce_rule_offset); j++) + { + if(enforce_rule[j].rule.rule_id==rules[i].rule_id) + { + repeat_rule=1; + continue; + } + } - tsg_send_payload(_instance, log_type, (char *)sb.GetString(), sb.GetSize(), thread_id); - - return 0; + if(repeat_rule==1) + { + repeat_rule=0; + continue; + } + + if(n_enforce_rule < (*enforce_rule_offset)+1) + { + break; + } + + enforce_rule[(*enforce_rule_offset)++].rule=rules[i]; + } } -int send_event_log(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, LOG_TYPE log_type, struct maat_rule *rules, size_t n_rules, int thread_id) +void tsg_hits_rule_metrics_set(const struct streaminfo *a_stream, struct maat_rule *rules, size_t n_rules, int thread_id) { int repeat_cnt=0; int policy_id[MAX_RESULT_NUM]={0}; @@ -2210,12 +2178,6 @@ int send_event_log(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_h { if(is_multi_hit_same_policy(&(rules[i]), policy_id, &repeat_cnt)) { - MASTER_LOG(_instance->logger, RLOG_LV_DEBUG, LOG_MODULE_SENDLOG, - "tsg same log:cfg_id=%d service=%d addr=%s", - rules[i].rule_id, - rules[i].service_id, - (a_stream==NULL ? "" : printaddr(&(a_stream->addr), thread_id)) - ); continue; } @@ -2223,86 +2185,26 @@ int send_event_log(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_h { tsg_set_policy_flow(a_stream, &(rules[i]), thread_id); } - - switch(rules[i].do_log) - { - case LOG_ABORT: - MASTER_LOG(_instance->logger, RLOG_LV_DEBUG, LOG_MODULE_SENDLOG, - "tsg abort log:cfg_id=%d service=%d addr=%s", - rules[i].rule_id, - rules[i].service_id, - (a_stream==NULL ? "" : printaddr(&(a_stream->addr), thread_id)) - ); - - fs3_rule_stat_update(RULE_STAT_ABORT, (int)rules[i].action, 1); - continue; - break; - case LOG_ALL: - if(rules[i].action==TSG_ACTION_MONITOR) - { - set_s3_filename(_instance, _handle, a_stream); - set_mail_eml(_instance, _handle, a_stream); - } - break; - case LOG_NOFILE: - if(rules[i].action==TSG_ACTION_MONITOR) - { - TLD_delete(_handle, _instance->id2field[LOG_COMMON_MAIL_EML_FILE].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name); - } - break; - default: - break; - } - - TLD_append(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name, (void *)(long)(rules[i].rule_id), TLD_TYPE_LONG); - TLD_append(_handle, _instance->id2field[LOG_COMMON_SERVICE].name, (void *)(long)(rules[i].service_id), TLD_TYPE_LONG); - TLD_append(_handle, _instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)rules[i].vsys_id, TLD_TYPE_LONG); - TLD_append(_handle, _instance->id2field[LOG_COMMON_ACTION].name, (void *)(long)(rules[i].action), TLD_TYPE_LONG); - - set_policy_action_para_exec_result(_instance, _handle, a_stream, &(rules[i])); - - set_xxxx_from_user_region(_handle, _instance, &(rules[i]), thread_id); - - send_log_by_type(_instance, _handle, a_stream, log_type, thread_id); - - TLD_delete(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_SERVICE].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_ACTION].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_USER_REGION].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_VSYSTEM_ID].name); - - fs3_rule_stat_update(RULE_STAT_SEND, (int)rules[i].action, 1); } - - return 0; } -int deal_event_rules(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, LOG_TYPE log_type, TSG_SERVICE service, int *is_append_common_field, int thread_id) +int tsg_enforce_rule_append(const struct streaminfo *a_stream, enum TSG_SERVICE service, struct enforce_rule_result *enforce_rule, size_t n_enforce_rule) { - struct matched_policy_rules *matched_rules=(struct matched_policy_rules *)session_matched_rules_get(a_stream, service); - if(matched_rules==NULL || matched_rules->n_rules==0) + size_t offset=0; + struct matched_policy_rules *matched_policy=(struct matched_policy_rules *)session_matched_rules_get(a_stream, service); + if(matched_policy!=NULL && matched_policy->n_rules>0) { - return 0; - } + tsg_hits_rule_metrics_set(a_stream, matched_policy->rules, matched_policy->n_rules, a_stream->threadnum); + size_t n_rules=MIN(n_enforce_rule, matched_policy->n_rules); - if((*is_append_common_field)==0) - { - (*is_append_common_field)=1; - append_common_field(_instance, _handle, a_stream); - } - - if(service==TSG_SERVICE_INTERCEPT) - { - set_intercept_info(_instance, _handle, a_stream); + memset(enforce_rule, 0, sizeof(struct enforce_rule_result)*n_rules); + tsg_rule_log_enable_select(matched_policy->rules, n_rules, enforce_rule, n_enforce_rule, &offset); + + session_matched_rules_free(a_stream, service, (void *)matched_policy); + session_matched_rules_async(a_stream, service, NULL); } - - send_event_log(_instance, _handle, a_stream, log_type, matched_rules->rules, matched_rules->n_rules, thread_id); - session_matched_rules_free(a_stream, service, (void *)matched_rules); - session_matched_rules_async(a_stream, service, NULL); - return 1; + return offset; } int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, const struct streaminfo *a_stream, enum LOG_TYPE log_type, struct maat_rule *rules, size_t n_rules, int thread_id) @@ -2336,60 +2238,87 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl return 0; } - int is_append_common_field=0; + TLD_append_streaminfo(_instance, _handle, a_stream); + TLD_append(_handle, _instance->id2field[LOG_COMMON_SLED_IP].name, (void *)(_instance->local_ip_str), TLD_TYPE_STRING); + if(strlen(g_tsg_para.device_sn)>0) + { + TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_ID].name, (void *)(g_tsg_para.device_sn), TLD_TYPE_STRING); + } + TLD_append(_handle, _instance->id2field[LOG_COMMON_DATA_CENTER].name, (void *)tsg_data_center_get(), TLD_TYPE_STRING); + TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_TAG].name, (void *)tsg_device_tag_get(), TLD_TYPE_STRING); + TLD_append(_handle, _instance->id2field[LOG_COMMON_TRAFFIC_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG); + + set_application_behavior(_instance, _handle, a_stream); + + size_t enforce_rule_offset=0; + size_t n_enforce_rule=LOGGER_SCRATCH_RULE_MAX; + struct enforce_rule_result enforce_rule[n_enforce_rule]; + switch(log_type) { case LOG_TYPE_SECURITY_EVENT: - append_common_field(instance, handle, a_stream); - send_event_log(_instance, _handle, a_stream, LOG_TYPE_SECURITY_EVENT, rules, n_rules, thread_id); - break; case LOG_TYPE_INTERCEPT_EVENT: - append_common_field(instance, handle, a_stream); - send_event_log(_instance, _handle, a_stream, LOG_TYPE_INTERCEPT_EVENT, rules, n_rules, thread_id); + memset(enforce_rule, 0, sizeof(struct enforce_rule_result)*n_enforce_rule); + tsg_hits_rule_metrics_set(a_stream, rules, n_rules, thread_id); + tsg_rule_log_enable_select(rules, n_rules, enforce_rule, n_enforce_rule, &enforce_rule_offset); + tsg_rule_log_enable_additional_set(a_stream, enforce_rule, enforce_rule_offset); + tsg_rule_log_enable_type_set(enforce_rule, enforce_rule_offset, log_type); + tsg_log_ringbuf_produce(_instance, _handle, log_type, enforce_rule, enforce_rule_offset, thread_id); break; case LOG_TYPE_SESSION_RECORD: if(_instance->send_intercept_log==1) { - deal_event_rules(_instance, _handle, a_stream, LOG_TYPE_INTERCEPT_EVENT, TSG_SERVICE_INTERCEPT, &is_append_common_field, thread_id); + size_t offset=tsg_enforce_rule_append(a_stream, TSG_SERVICE_INTERCEPT, enforce_rule, n_enforce_rule); + if(offset>0) + { + set_intercept_info(_instance, _handle, a_stream); + tsg_rule_log_enable_type_set(enforce_rule, offset, LOG_TYPE_INTERCEPT_EVENT); + enforce_rule_offset+=offset; + } } // no break; case LOG_TYPE_TRANSACTION_RECORD: - deal_event_rules(_instance, _handle, a_stream, LOG_TYPE_SECURITY_EVENT, TSG_SERVICE_SECURITY, &is_append_common_field, thread_id); + { + size_t offset=tsg_enforce_rule_append(a_stream, TSG_SERVICE_SECURITY, enforce_rule+enforce_rule_offset, n_enforce_rule-enforce_rule_offset); + if(offset>0) + { + tsg_rule_log_enable_type_set(enforce_rule+enforce_rule_offset, offset, LOG_TYPE_SECURITY_EVENT); + tsg_rule_log_enable_additional_set(a_stream, enforce_rule+enforce_rule_offset, offset); + enforce_rule_offset+=offset; + } + } // no break; case LOG_TYPE_INTERIM_SESSION_RECORD: if(session_record_limit(_instance, a_stream, log_type)) { + if(enforce_rule_offset>0) + { + tsg_log_ringbuf_produce(_instance, _handle, LOG_TYPE_MAX, enforce_rule, enforce_rule_offset, thread_id); + } + else + { + TLD_cancel(_handle); + } break; } - if(is_append_common_field==0) - { - append_common_field(_instance, _handle, a_stream); - } - - TLD_append(_handle, _instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG); - set_sce_profile_ids(_instance, _handle, a_stream); set_shaping_profile_ids(_instance, _handle, a_stream); set_shaping_rule_ids(_instance, _handle, a_stream); set_nat_linkinfo(_instance, _handle, a_stream); - send_log_by_type(_instance, _handle, a_stream, log_type, thread_id); + tsg_log_ringbuf_produce(_instance, _handle, log_type, enforce_rule, enforce_rule_offset, thread_id); break; case LOG_TYPE_BGP_RECORD: case LOG_TYPE_VOIP_RECORD: case LOG_TYPE_GTPC_RECORD: case LOG_TYPE_INTERNAL_RTP_RECORD: - append_common_field(_instance, _handle, a_stream); - TLD_append(_handle, _instance->id2field[LOG_COMMON_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG); - send_log_by_type(_instance, _handle, a_stream, log_type, thread_id); + tsg_log_ringbuf_produce(_instance, _handle, log_type, NULL, 0, thread_id); break; default: - return 0; + break; } - TLD_cancel(handle); - return 0; } @@ -2468,3 +2397,231 @@ int tsg_unknown_app_id_get(struct tsg_log_instance_t *instance) return 0; } + +struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) +{ + char override_sled_ip[32]={0}; + char kafka_errstr[1024]={0}; + unsigned int local_ip_nr=0; + rd_kafka_conf_t *rdkafka_conf = NULL; + char broker_list[1024]={0}; + struct tsg_log_instance_t *_instance=NULL; + char common_field_file[128]={0}; + char log_path[128]={0}; + + _instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t)); + + MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(_instance->level), 30); + MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), "./log/tsglog"); + MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_USER_REGION", &(_instance->send_user_region), 0); + MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_DATA_CENTER_SWITCH", &(_instance->send_data_center), 0); + MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_APP_ID_SWITCH", &(_instance->send_app_id), 0); + MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_INTERCEPT_LOG", &(_instance->send_intercept_log), 0); + MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_NAT_LINKINFO_SWITCH", &(_instance->send_nat_linkinfo), 0); + MESA_load_profile_int_def(conffile, "TSG_LOG", "RAPIDJSON_CHUNK_CAPACITY", &(_instance->rapidjson_chunk_capacity), 2048); + + MESA_load_profile_int_def(conffile, "TSG_LOG", "VSYSTEM_ID", &(_instance->vsystem_id), 1); + MESA_load_profile_int_def(conffile, "SYSTEM","UNKNOWN_APP_ID", &_instance->unknown_app_id, 4); + + MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_INTERIM_RECORD", &(_instance->send_interim_log), 1); + MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_TRANSCATION_RECORD", &(_instance->send_transcation_log), 1); + MESA_load_profile_int_def(conffile, "TSG_LOG","TCP_MIN_PKTS", &_instance->tcp_min_log_pkts, 3); + MESA_load_profile_int_def(conffile, "TSG_LOG","TCP_MIN_BYTES", &_instance->tcp_min_log_bytes, 5); + MESA_load_profile_int_def(conffile, "TSG_LOG","UDP_MIN_PKTS", &_instance->udp_min_log_pkts, 3); + MESA_load_profile_int_def(conffile, "TSG_LOG","UDP_MIN_BYTES", &_instance->udp_min_log_bytes, 5); + + _instance->logger=MESA_create_runtime_log_handle(log_path, _instance->level); + if(_instance->logger==NULL) + { + printf("MESA_create_runtime_log_handle failed ..., path: %s level: %d", log_path, _instance->level); + free(_instance); + _instance=NULL; + return NULL; + } + + MESA_load_profile_int_def(conffile, "TSG_LOG", "MODE",&(_instance->mode), 0); + if(_instance->mode==CLOSE) + { + MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "Disable tsg_send_log"); + return _instance; + } + + MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30); + + MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", common_field_file, sizeof(common_field_file), NULL); + MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", broker_list, sizeof(broker_list), NULL); + MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_USERNAME", _instance->sasl_username, sizeof(_instance->sasl_username), ""); //admin + MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_PASSWD", _instance->sasl_passwd, sizeof(_instance->sasl_passwd), ""); + MESA_load_profile_string_def(conffile, "TSG_LOG", "COMPRESSION_TYPE", _instance->compression, sizeof(_instance->compression), ""); //snappy + + MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000"); + MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000"); + MESA_load_profile_string_def(conffile, "TSG_LOG", "REQUIRE_ACK", _instance->require_ack, sizeof(_instance->require_ack), "1"); + + MESA_load_profile_string_def(conffile, "SYSTEM", "TCP_LABEL", _instance->tcp_label, sizeof(_instance->tcp_label), "tcp_flow_stat"); + MESA_load_profile_string_def(conffile, "SYSTEM", "UDP_LABEL", _instance->udp_label, sizeof(_instance->udp_label), "udp_flow_stat"); + + _instance->tcp_flow_project_id=project_customer_register(_instance->tcp_label, "struct"); + _instance->udp_flow_project_id=project_customer_register(_instance->udp_label, "struct"); + if(_instance->tcp_flow_project_id<0 || _instance->udp_flow_project_id<0) + { + MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, + "project_customer_register is error, tcp_label: %s udp_label: %s, please check etc/project.conf", + _instance->tcp_label, + _instance->udp_label + ); + } + + MESA_load_profile_string_def(conffile, "SYSTEM", "OVERRIDE_SLED_IP", override_sled_ip, sizeof(override_sled_ip), "OVERRIDE_SLED_IP"); + char *sled_ip=getenv(override_sled_ip); + if(sled_ip==NULL) + { + char nic_name[32]={0}; + MESA_load_profile_string_def(conffile, "SYSTEM", "NIC_NAME", nic_name, sizeof(nic_name), "lo"); + int ret=MESA_get_dev_ipv4(nic_name, (int *)&local_ip_nr); + if(ret<0) + { + MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, + "GET_LOCAL_IP MESA_get_dev_ipv4 is error, nic_name: %s, please check tsgconf/main.conf", + nic_name + ); + return NULL; + } + inet_ntop(AF_INET,&(local_ip_nr),_instance->local_ip_str,sizeof(_instance->local_ip_str)); + } + else + { + memcpy(_instance->local_ip_str, sled_ip, MIN(sizeof(_instance->local_ip_str)-1, strlen(sled_ip))); + } + + rdkafka_conf = rd_kafka_conf_new(); + rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", _instance->send_queue_max_msg, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", _instance->refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", broker_list, kafka_errstr, sizeof(kafka_errstr)); + + if(strlen(_instance->compression)>0) + { + rd_kafka_conf_set(rdkafka_conf, "compression.codec", _instance->compression, kafka_errstr, sizeof(kafka_errstr)); + } + + if(strlen(_instance->sasl_username)> 0 && strlen(_instance->sasl_passwd)>0) + { + rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "sasl.username", _instance->sasl_username, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "sasl.password", _instance->sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); + } + + if(!(_instance->kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)))) + { + MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "KAFKA_INIT rd_kafka_new is error"); + return NULL; + } + + log_common_fields_new(common_field_file, _instance->id2field, &(_instance->service2topic), &(_instance->max_service)); + + if(_instance->service2topic!=NULL) + { + _instance->sum_stat_row_id=tsg_stat_sendlog_row_init("sum"); + for(int i=0; i<_instance->max_service; i++) + { + if(_instance->service2topic[i].type==TLD_TYPE_MAX && strlen(_instance->service2topic[i].name)>0) + { + register_topic(_instance, &( _instance->service2topic[i])); + } + } + } + else + { + MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "KAFKA_INIT log_common_fields_new is error, please check %s", common_field_file); + } + + int32_t ringbuf_size=0; + MESA_load_profile_int_def(conffile, "TSG_LOG", "RINGBUFF_SIZE", &(ringbuf_size), 100000); + MESA_load_profile_int_def(conffile, "TSG_LOG", "RINGBUFF_NUM", &(_instance->ringbuf_num), 1); + + _instance->ringbuf=(struct logger_ringbuf_schema *)calloc(1, _instance->ringbuf_num * sizeof(struct logger_ringbuf_schema)); + + for(int i=0; i<_instance->ringbuf_num; i++) + { + struct logger_ringbuf_schema *ringbuf=&(_instance->ringbuf[i]); + + int thread_num=get_thread_count(); + size_t ringbuf_obj_size=0; + ringbuf_get_sizes(thread_num, &ringbuf_obj_size, NULL); + ringbuf->ring=(ringbuf_t *)calloc(1, ringbuf_obj_size); + + ringbuf->size=ringbuf_size; + ringbuf->buff=(char *)calloc(1, sizeof(struct logger_scratch)*ringbuf->size); + ringbuf_setup(ringbuf->ring, thread_num, sizeof(struct logger_scratch)*ringbuf->size); + ringbuf->worker=(ringbuf_worker_t **)calloc(1, sizeof(ringbuf_worker_t *)*thread_num); + for(int j=0; j<thread_num; j++) + { + ringbuf->worker[j]=ringbuf_register(ringbuf->ring, j); + } + + char ringbuf_name[32]={0}; + + snprintf(ringbuf_name, sizeof(ringbuf_name), "ringbuf_produce_%d", i); + ringbuf->produce_row_idx=tsg_stat_sendlog_row_init(ringbuf_name); + snprintf(ringbuf_name, sizeof(ringbuf_name), "ringbuf_consume_%d", i); + ringbuf->consume_row_idx=tsg_stat_sendlog_row_init(ringbuf_name); + + pthread_create(&(ringbuf->pthread_id), NULL, tsg_log_ringbuf_consume, (void *)ringbuf); + pthread_setname_np(ringbuf->pthread_id, "SNEDLOG_RINGBUF"); + } + + + return _instance; +} +void tsg_sendlog_destroy(struct tsg_log_instance_t * instance) +{ + if(instance==NULL) + { + return ; + } + + if(instance->mode!=CLOSE) + { + for(int i=0; i<instance->max_service; i++) + { + if(instance->service2topic[i].type!=TLD_TYPE_MAX || i==1) //i=1 equal i=0, service id of security event is 0 and 1 + { + continue; + } + + if(instance->service2topic[i].topic_rkt!=NULL) + { + rd_kafka_topic_destroy(instance->service2topic[i].topic_rkt); + } + + if(instance->service2topic[i].drop_start!=NULL) + { + free(instance->service2topic[i].drop_start); + instance->service2topic[i].drop_start=NULL; + } + + if(instance->service2topic[i].send_log_percent!=NULL) + { + free(instance->service2topic[i].send_log_percent); + instance->service2topic[i].send_log_percent=NULL; + } + } + + //rd_kafka_destroy_flags(instance->kafka_handle, 4); + rd_kafka_destroy(instance->kafka_handle); + + free(instance->service2topic); + instance->service2topic=NULL; + } + + + MESA_destroy_runtime_log_handle(instance->logger); + instance->logger=NULL; + + free(instance); + instance=NULL; + return ; +} diff --git a/src/tsg_send_log_internal.h b/src/tsg_send_log_internal.h index 25a9c89..51e8b91 100644 --- a/src/tsg_send_log_internal.h +++ b/src/tsg_send_log_internal.h @@ -5,6 +5,7 @@ #include <librdkafka/rdkafka.h> #include <MESA/cJSON.h> #include <time.h> +#include "ringbuf/ringbuf.h" #define MIN_L7_PROTO_ID 100 #define MAX_L7_PROTO_ID 150 @@ -175,10 +176,47 @@ struct topic_stat rd_kafka_topic_t *topic_rkt; }; +struct enforce_rule_result +{ + int log_type; //enum LOG_TYPE + struct maat_rule rule; + + char *mail_eml_filename; + char *http_request_filename; + char *http_response_filename; + + long mirrored_bytes; + long mirrored_packets; + char *packet_capture_file; +}; + +#define LOGGER_SCRATCH_RULE_MAX 16 +struct logger_scratch +{ + int thread_id; + int n_enforce_rule; + int log_type; // enum LOG_TYPE + struct TLD_handle_t *handle; + struct tsg_log_instance_t *instance; + struct enforce_rule_result *enforce_rule; +}; + +struct logger_ringbuf_schema +{ + int produce_row_idx; + int consume_row_idx; + pthread_t pthread_id; + size_t size; + char *buff; + ringbuf_t *ring; + ringbuf_worker_t **worker; +}; + struct tsg_log_instance_t { int mode; int level; + int ringbuf_num; int max_service; int vsystem_id; int unknown_app_id; @@ -210,6 +248,7 @@ struct tsg_log_instance_t id2field_t id2field[LOG_COMMON_MAX]; rd_kafka_t *kafka_handle; struct topic_stat *service2topic; + struct logger_ringbuf_schema *ringbuf; void *logger; }; diff --git a/test/src/CMakeLists.txt b/test/src/CMakeLists.txt index 2a25309..2915259 100644 --- a/test/src/CMakeLists.txt +++ b/test/src/CMakeLists.txt @@ -9,20 +9,48 @@ include_directories(${PROJECT_SOURCE_DIR}/src/) add_definitions(-std=c++11) LINK_DIRECTORIES(/opt/MESA/lib) -add_executable(gtest_rule ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_rule.cpp) +add_executable(gtest_rule ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp + gtest_common.cpp gtest_rule.cpp + ) target_link_libraries(gtest_rule gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 fieldstat3) -add_executable(gtest_tableinfo ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_tableinfo.cpp) +add_executable(gtest_tableinfo ${PROJECT_SOURCE_DIR}/src/tsg_rule.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp + gtest_common.cpp gtest_tableinfo.cpp + ) target_link_libraries(gtest_tableinfo gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 fieldstat3) -add_executable(gtest_bridge ${PROJECT_SOURCE_DIR}/src/tsg_bridge.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_bridge.cpp) +add_executable(gtest_bridge ${PROJECT_SOURCE_DIR}/src/tsg_bridge.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp + gtest_common.cpp gtest_bridge.cpp + ) target_link_libraries(gtest_bridge gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 fieldstat3) -add_executable(gtest_action ${PROJECT_SOURCE_DIR}/src/tsg_action.cpp ${PROJECT_SOURCE_DIR}/src/tsg_leaky_bucket.cpp ${PROJECT_SOURCE_DIR}/src/tsg_dns.cpp ${PROJECT_SOURCE_DIR}/src/tsg_icmp.cpp ${PROJECT_SOURCE_DIR}/src/tsg_tamper.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_action.cpp) +add_executable(gtest_action ${PROJECT_SOURCE_DIR}/src/tsg_action.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_leaky_bucket.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_dns.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_icmp.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_tamper.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp + gtest_common.cpp + gtest_action.cpp + ) target_link_libraries(gtest_action gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 fieldstat3) -add_executable(gtest_sendlog ${PROJECT_SOURCE_DIR}/src/tsg_send_log.cpp ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp gtest_common.cpp gtest_kafka.cpp gtest_sendlog.cpp) +add_executable(gtest_sendlog ${PROJECT_SOURCE_DIR}/src/tsg_send_log.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_variable.cpp + ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp + ${PROJECT_SOURCE_DIR}/deps/ringbuf/ringbuf.c + gtest_common.cpp + gtest_kafka.cpp + gtest_sendlog.cpp + ) target_link_libraries(gtest_sendlog gtest-static ctemplate-static cjson MESA_prof_load MESA_handle_logger maat4 rdkafka fieldstat3) set(TSG_MASTER_SRC ${PROJECT_SOURCE_DIR}/src/tsg_entry.cpp @@ -44,6 +72,7 @@ set(TSG_MASTER_SRC ${PROJECT_SOURCE_DIR}/src/tsg_entry.cpp ${PROJECT_SOURCE_DIR}/src/mpack.c ${PROJECT_SOURCE_DIR}/src/tsg_stat.cpp ${PROJECT_SOURCE_DIR}/src/tsg_ssl_ja3_fingerprint.cpp + ${PROJECT_SOURCE_DIR}/deps/ringbuf/ringbuf.c ) add_executable(gtest_master ${TSG_MASTER_SRC} gtest_kafka.cpp gtest_common.cpp gtest_master.cpp) |
