| author | lijia <[email protected]> | 2024-10-18 16:47:51 +0800 |
|---|---|---|
| committer | lijia <[email protected]> | 2024-11-07 18:30:58 +0800 |
| commit | e734af76d81b07090c618b1c4af3b2fdd6b592f3 (patch) | |
| tree | c9b894fb0eaa9c56bd5f04bfab5628a97592091d | /deps/ringbuf |
| parent | 99a68d5c9efe500ab339165a2af515a0a7355ada (diff) | |
rebase onto develop-2.0
Diffstat (limited to 'deps/ringbuf')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | deps/ringbuf/CMakeLists.txt | 2 |
| -rw-r--r-- | deps/ringbuf/ringbuf.c | 430 |
| -rw-r--r-- | deps/ringbuf/ringbuf.h | 29 |
| -rw-r--r-- | deps/ringbuf/utils.h | 115 |
4 files changed, 576 insertions, 0 deletions
diff --git a/deps/ringbuf/CMakeLists.txt b/deps/ringbuf/CMakeLists.txt
new file mode 100644
index 0000000..16222e2
--- /dev/null
+++ b/deps/ringbuf/CMakeLists.txt
@@ -0,0 +1,2 @@
+add_library(ringbuf STATIC ringbuf.c)
+target_include_directories(ringbuf PUBLIC ${CMAKE_CURRENT_LIST_DIR})
\ No newline at end of file
diff --git a/deps/ringbuf/ringbuf.c b/deps/ringbuf/ringbuf.c
new file mode 100644
index 0000000..75cfee9
--- /dev/null
+++ b/deps/ringbuf/ringbuf.c
@@ -0,0 +1,430 @@
+/*
+ * Copyright (c) 2016-2017 Mindaugas Rasiukevicius <rmind at noxt eu>
+ * All rights reserved.
+ *
+ * Use is subject to license terms, as specified in the LICENSE file.
+ */
+
+/*
+ * Atomic multi-producer single-consumer ring buffer, which supports
+ * contiguous range operations and which can be conveniently used for
+ * message passing.
+ *
+ * There are three offsets -- think of clock hands:
+ * - NEXT: marks the beginning of the available space,
+ * - WRITTEN: the point up to which the data is actually written.
+ * - Observed READY: point up to which data is ready to be written.
+ *
+ * Producers
+ *
+ *	Observe and save the 'next' offset, then request N bytes from
+ *	the ring buffer by atomically advancing the 'next' offset.  Once
+ *	the data is written into the "reserved" buffer space, the thread
+ *	clears the saved value; these observed values are used to compute
+ *	the 'ready' offset.
+ *
+ * Consumer
+ *
+ *	Writes the data between 'written' and 'ready' offsets and updates
+ *	the 'written' value.  The consumer thread scans for the lowest
+ *	seen value by the producers.
+ *
+ * Key invariant
+ *
+ *	Producers cannot go beyond the 'written' offset; producers are
+ *	also not allowed to catch up with the consumer.  Only the consumer
+ *	is allowed to catch up with the producer i.e. set the 'written'
+ *	offset to be equal to the 'next' offset.
+ *
+ * Wrap-around
+ *
+ *	If the producer cannot acquire the requested length due to little
+ *	available space at the end of the buffer, then it will wraparound.
+ *	WRAP_LOCK_BIT in 'next' offset is used to lock the 'end' offset.
+ *
+ *	There is an ABA problem if one producer stalls while a pair of
+ *	producer and consumer would both successfully wrap-around and set
+ *	the 'next' offset to the stale value of the first producer, thus
+ *	letting it to perform a successful CAS violating the invariant.
+ *	A counter in the 'next' offset (masked by WRAP_COUNTER) is used
+ *	to prevent from this problem.  It is incremented on wraparounds.
+ *
+ *	The same ABA problem could also cause a stale 'ready' offset,
+ *	which could be observed by the consumer.  We set WRAP_LOCK_BIT in
+ *	the 'seen' value before advancing the 'next' and clear this bit
+ *	after the successful advancing; this ensures that only the stable
+ *	'ready' is observed by the consumer.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stddef.h>
+#include <stdbool.h>
+#include <inttypes.h>
+#include <string.h>
+#include <limits.h>
+#include <errno.h>
+
+#include "ringbuf.h"
+#include "utils.h"
+
+#define RBUF_OFF_MASK	(0x00000000ffffffffUL)
+#define WRAP_LOCK_BIT	(0x8000000000000000UL)
+#define RBUF_OFF_MAX	(UINT64_MAX & ~WRAP_LOCK_BIT)
+
+#define WRAP_COUNTER	(0x7fffffff00000000UL)
+#define WRAP_INCR(x)	(((x) + 0x100000000UL) & WRAP_COUNTER)
+
+typedef uint64_t	ringbuf_off_t;
+
+struct ringbuf_worker {
+	volatile ringbuf_off_t	seen_off;
+	int			registered;
+};
+
+struct ringbuf {
+	/* Ring buffer space. */
+	size_t			space;
+
+	/*
+	 * The NEXT hand is atomically updated by the producer.
+	 * WRAP_LOCK_BIT is set in case of wrap-around; in such case,
+	 * the producer can update the 'end' offset.
+	 */
+	volatile ringbuf_off_t	next;
+	ringbuf_off_t		end;
+
+	/* The following are updated by the consumer. */
+	ringbuf_off_t		written;
+	unsigned		nworkers;
+	ringbuf_worker_t	workers[];
+};
+
+/*
+ * ringbuf_setup: initialise a new ring buffer of a given length.
+ */
+int
+ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length)
+{
+	if (length >= RBUF_OFF_MASK) {
+		errno = EINVAL;
+		return -1;
+	}
+	memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers]));
+	rbuf->space = length;
+	rbuf->end = RBUF_OFF_MAX;
+	rbuf->nworkers = nworkers;
+	return 0;
+}
+
+/*
+ * ringbuf_get_sizes: return the sizes of the ringbuf_t and ringbuf_worker_t.
+ */
+void
+ringbuf_get_sizes(unsigned nworkers,
+    size_t *ringbuf_size, size_t *ringbuf_worker_size)
+{
+	if (ringbuf_size)
+		*ringbuf_size = offsetof(ringbuf_t, workers[nworkers]);
+	if (ringbuf_worker_size)
+		*ringbuf_worker_size = sizeof(ringbuf_worker_t);
+}
+
+/*
+ * ringbuf_register: register the worker (thread/process) as a producer
+ * and pass the pointer to its local store.
+ */
+ringbuf_worker_t *
+ringbuf_register(ringbuf_t *rbuf, unsigned i)
+{
+	ringbuf_worker_t *w = &rbuf->workers[i];
+
+	w->seen_off = RBUF_OFF_MAX;
+	atomic_store_explicit(&w->registered, true, memory_order_release);
+	return w;
+}
+
+void
+ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
+{
+	w->registered = false;
+	(void)rbuf;
+}
+
+/*
+ * stable_nextoff: capture and return a stable value of the 'next' offset.
+ */
+static inline ringbuf_off_t
+stable_nextoff(ringbuf_t *rbuf)
+{
+	unsigned count = SPINLOCK_BACKOFF_MIN;
+	ringbuf_off_t next;
+retry:
+	next = atomic_load_explicit(&rbuf->next, memory_order_acquire);
+	if (next & WRAP_LOCK_BIT) {
+		SPINLOCK_BACKOFF(count);
+		goto retry;
+	}
+	ASSERT((next & RBUF_OFF_MASK) < rbuf->space);
+	return next;
+}
+
+/*
+ * stable_seenoff: capture and return a stable value of the 'seen' offset.
+ */
+static inline ringbuf_off_t
+stable_seenoff(ringbuf_worker_t *w)
+{
+	unsigned count = SPINLOCK_BACKOFF_MIN;
+	ringbuf_off_t seen_off;
+retry:
+	seen_off = atomic_load_explicit(&w->seen_off, memory_order_acquire);
+	if (seen_off & WRAP_LOCK_BIT) {
+		SPINLOCK_BACKOFF(count);
+		goto retry;
+	}
+	return seen_off;
+}
+
+/*
+ * ringbuf_acquire: request a space of a given length in the ring buffer.
+ *
+ * => On success: returns the offset at which the space is available.
+ * => On failure: returns -1.
+ */
+ssize_t
+ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
+{
+	ringbuf_off_t seen, next, target;
+
+	ASSERT(len > 0 && len <= rbuf->space);
+	ASSERT(w->seen_off == RBUF_OFF_MAX);
+
+	do {
+		ringbuf_off_t written;
+
+		/*
+		 * Get the stable 'next' offset.  Save the observed 'next'
+		 * value (i.e. the 'seen' offset), but mark the value as
+		 * unstable (set WRAP_LOCK_BIT).
+		 *
+		 * Note: CAS will issue a memory_order_release for us and
+		 * thus ensures that it reaches global visibility together
+		 * with new 'next'.
+		 */
+		seen = stable_nextoff(rbuf);
+		next = seen & RBUF_OFF_MASK;
+		ASSERT(next < rbuf->space);
+		atomic_store_explicit(&w->seen_off, next | WRAP_LOCK_BIT,
+		    memory_order_relaxed);
+
+		/*
+		 * Compute the target offset.  Key invariant: we cannot
+		 * go beyond the WRITTEN offset or catch up with it.
+		 */
+		target = next + len;
+		written = rbuf->written;
+		if (__predict_false(next < written && target >= written)) {
+			/* The producer must wait. */
+			atomic_store_explicit(&w->seen_off,
+			    RBUF_OFF_MAX, memory_order_release);
+			return -1;
+		}
+
+		if (__predict_false(target >= rbuf->space)) {
+			const bool exceed = target > rbuf->space;
+
+			/*
+			 * Wrap-around and start from the beginning.
+			 *
+			 * If we would exceed the buffer, then attempt to
+			 * acquire the WRAP_LOCK_BIT and use the space in
+			 * the beginning.  If we used all space exactly to
+			 * the end, then reset to 0.
+			 *
+			 * Check the invariant again.
+			 */
+			target = exceed ? (WRAP_LOCK_BIT | len) : 0;
+			if ((target & RBUF_OFF_MASK) >= written) {
+				atomic_store_explicit(&w->seen_off,
+				    RBUF_OFF_MAX, memory_order_release);
+				return -1;
+			}
+			/* Increment the wrap-around counter. */
+			target |= WRAP_INCR(seen & WRAP_COUNTER);
+		} else {
+			/* Preserve the wrap-around counter. */
+			target |= seen & WRAP_COUNTER;
+		}
+	} while (!atomic_compare_exchange_weak(&rbuf->next, &seen, target));
+
+	/*
+	 * Acquired the range.  Clear WRAP_LOCK_BIT in the 'seen' value
+	 * thus indicating that it is stable now.
+	 *
+	 * No need for memory_order_release, since CAS issued a fence.
+	 */
+	atomic_store_explicit(&w->seen_off, w->seen_off & ~WRAP_LOCK_BIT,
+	    memory_order_relaxed);
+
+	/*
+	 * If we set the WRAP_LOCK_BIT in the 'next' (because we exceed
+	 * the remaining space and need to wrap-around), then save the
+	 * 'end' offset and release the lock.
+	 */
+	if (__predict_false(target & WRAP_LOCK_BIT)) {
+		/* Cannot wrap-around again if consumer did not catch-up. */
+		ASSERT(rbuf->written <= next);
+		ASSERT(rbuf->end == RBUF_OFF_MAX);
+		rbuf->end = next;
+		next = 0;
+
+		/*
+		 * Unlock: ensure the 'end' offset reaches global
+		 * visibility before the lock is released.
+		 */
+		atomic_store_explicit(&rbuf->next,
+		    (target & ~WRAP_LOCK_BIT), memory_order_release);
+	}
+	ASSERT((target & RBUF_OFF_MASK) <= rbuf->space);
+	return (ssize_t)next;
+}
+
+/*
+ * ringbuf_produce: indicate the acquired range in the buffer is produced
+ * and is ready to be consumed.
+ */
+void
+ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
+{
+	(void)rbuf;
+	ASSERT(w->registered);
+	ASSERT(w->seen_off != RBUF_OFF_MAX);
+	atomic_store_explicit(&w->seen_off, RBUF_OFF_MAX, memory_order_release);
+}
+
+/*
+ * ringbuf_consume: get a contiguous range which is ready to be consumed.
+ */
+size_t
+ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
+{
+	ringbuf_off_t written = rbuf->written, next, ready;
+	size_t towrite;
+retry:
+	/*
+	 * Get the stable 'next' offset.  Note: stable_nextoff() issued
+	 * a load memory barrier.  The area between the 'written' offset
+	 * and the 'next' offset will be the *preliminary* target buffer
+	 * area to be consumed.
+	 */
+	next = stable_nextoff(rbuf) & RBUF_OFF_MASK;
+	if (written == next) {
+		/* If producers did not advance, then nothing to do. */
+		return 0;
+	}
+
+	/*
+	 * Observe the 'ready' offset of each producer.
+	 *
+	 * At this point, some producer might have already triggered the
+	 * wrap-around and some (or all) seen 'ready' values might be in
+	 * the range between 0 and 'written'.  We have to skip them.
+	 */
+	ready = RBUF_OFF_MAX;
+
+	for (unsigned i = 0; i < rbuf->nworkers; i++) {
+		ringbuf_worker_t *w = &rbuf->workers[i];
+		ringbuf_off_t seen_off;
+
+		/*
+		 * Skip if the worker has not registered.
+		 *
+		 * Get a stable 'seen' value.  This is necessary since we
+		 * want to discard the stale 'seen' values.
+		 */
+		if (!atomic_load_explicit(&w->registered, memory_order_relaxed))
+			continue;
+		seen_off = stable_seenoff(w);
+
+		/*
+		 * Ignore the offsets after the possible wrap-around.
+		 * We are interested in the smallest seen offset that is
+		 * not behind the 'written' offset.
+		 */
+		if (seen_off >= written) {
+			ready = MIN(seen_off, ready);
+		}
+		ASSERT(ready >= written);
+	}
+
+	/*
+	 * Finally, we need to determine whether wrap-around occurred
+	 * and deduct the safe 'ready' offset.
+	 */
+	if (next < written) {
+		const ringbuf_off_t end = MIN(rbuf->space, rbuf->end);
+
+		/*
+		 * Wrap-around case.  Check for the cut off first.
+		 *
+		 * Reset the 'written' offset if it reached the end of
+		 * the buffer or the 'end' offset (if set by a producer).
+		 * However, we must check that the producer is actually
+		 * done (the observed 'ready' offsets are clear).
+		 */
+		if (ready == RBUF_OFF_MAX && written == end) {
+			/*
+			 * Clear the 'end' offset if was set.
+			 */
+			if (rbuf->end != RBUF_OFF_MAX) {
+				rbuf->end = RBUF_OFF_MAX;
+			}
+
+			/*
+			 * Wrap-around the consumer and start from zero.
+			 */
+			written = 0;
+			atomic_store_explicit(&rbuf->written,
+			    written, memory_order_release);
+			goto retry;
+		}
+
+		/*
+		 * We cannot wrap-around yet; there is data to consume at
+		 * the end.  The ready range is smallest of the observed
+		 * 'ready' or the 'end' offset.  If neither is set, then
+		 * the actual end of the buffer.
+		 */
+		ASSERT(ready > next);
+		ready = MIN(ready, end);
+		ASSERT(ready >= written);
+	} else {
+		/*
+		 * Regular case.  Up to the observed 'ready' (if set)
+		 * or the 'next' offset.
+		 */
+		ready = MIN(ready, next);
+	}
+	towrite = ready - written;
+	*offset = written;
+
+	ASSERT(ready >= written);
+	ASSERT(towrite <= rbuf->space);
+	return towrite;
+}
+
+/*
+ * ringbuf_release: indicate that the consumed range can now be released.
+ */
+void
+ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
+{
+	const size_t nwritten = rbuf->written + nbytes;
+
+	ASSERT(rbuf->written <= rbuf->space);
+	ASSERT(rbuf->written <= rbuf->end);
+	ASSERT(nwritten <= rbuf->space);
+
+	rbuf->written = (nwritten == rbuf->space) ? 0 : nwritten;
+}
diff --git a/deps/ringbuf/ringbuf.h b/deps/ringbuf/ringbuf.h
new file mode 100644
index 0000000..e8fc767
--- /dev/null
+++ b/deps/ringbuf/ringbuf.h
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2016 Mindaugas Rasiukevicius <rmind at noxt eu>
+ * All rights reserved.
+ *
+ * Use is subject to license terms, as specified in the LICENSE file.
+ */
+
+#ifndef _RINGBUF_H_
+#define _RINGBUF_H_
+
+__BEGIN_DECLS
+
+typedef struct ringbuf ringbuf_t;
+typedef struct ringbuf_worker ringbuf_worker_t;
+
+int		ringbuf_setup(ringbuf_t *, unsigned, size_t);
+void		ringbuf_get_sizes(unsigned, size_t *, size_t *);
+
+ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned);
+void		ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *);
+
+ssize_t		ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t);
+void		ringbuf_produce(ringbuf_t *, ringbuf_worker_t *);
+size_t		ringbuf_consume(ringbuf_t *, size_t *);
+void		ringbuf_release(ringbuf_t *, size_t);
+
+__END_DECLS
+
+#endif
diff --git a/deps/ringbuf/utils.h b/deps/ringbuf/utils.h
new file mode 100644
index 0000000..413157b
--- /dev/null
+++ b/deps/ringbuf/utils.h
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 1991, 1993
+ *	The Regents of the University of California.  All rights reserved.
+ *
+ * This code is derived from software contributed to Berkeley by
+ * Berkeley Software Design, Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the name of the University nor the names of its contributors
+ *    may be used to endorse or promote products derived from this software
+ *    without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ *	@(#)cdefs.h	8.8 (Berkeley) 1/9/95
+ */
+
+#ifndef _UTILS_H_
+#define _UTILS_H_
+
+#include <assert.h>
+
+/*
+ * A regular assert (debug/diagnostic only).
+ */
+#if defined(DEBUG)
+#define ASSERT		assert
+#else
+#define ASSERT(x)
+#endif
+
+/*
+ * Minimum, maximum and rounding macros.
+ */
+
+#ifndef MIN
+#define MIN(x, y)	((x) < (y) ? (x) : (y))
+#endif
+
+#ifndef MAX
+#define MAX(x, y)	((x) > (y) ? (x) : (y))
+#endif
+
+/*
+ * Branch prediction macros.
+ */
+#ifndef __predict_true
+#define __predict_true(x)	__builtin_expect((x) != 0, 1)
+#define __predict_false(x)	__builtin_expect((x) != 0, 0)
+#endif
+
+/*
+ * Atomic operations and memory barriers.  If C11 API is not available,
+ * then wrap the GCC builtin routines.
+ *
+ * Note: This atomic_compare_exchange_weak does not do the C11 thing of
+ * filling *(expected) with the actual value, because we don't need
+ * that here.
+ */
+#ifndef atomic_compare_exchange_weak
+#define atomic_compare_exchange_weak(ptr, expected, desired) \
+    __sync_bool_compare_and_swap(ptr, *(expected), desired)
+#endif
+
+#ifndef atomic_thread_fence
+#define memory_order_relaxed	__ATOMIC_RELAXED
+#define memory_order_acquire	__ATOMIC_ACQUIRE
+#define memory_order_release	__ATOMIC_RELEASE
+#define memory_order_seq_cst	__ATOMIC_SEQ_CST
+#define atomic_thread_fence(m)	__atomic_thread_fence(m)
+#endif
+#ifndef atomic_store_explicit
+#define atomic_store_explicit	__atomic_store_n
+#endif
+#ifndef atomic_load_explicit
+#define atomic_load_explicit	__atomic_load_n
+#endif
+
+/*
+ * Exponential back-off for the spinning paths.
+ */
+#define SPINLOCK_BACKOFF_MIN	4
+#define SPINLOCK_BACKOFF_MAX	128
+#if defined(__x86_64__) || defined(__i386__)
+#define SPINLOCK_BACKOFF_HOOK	__asm volatile("pause" ::: "memory")
+#else
+#define SPINLOCK_BACKOFF_HOOK
+#endif
+#define SPINLOCK_BACKOFF(count)					\
+do {								\
+	for (int __i = (count); __i != 0; __i--) {		\
+		SPINLOCK_BACKOFF_HOOK;				\
+	}							\
+	if ((count) < SPINLOCK_BACKOFF_MAX)			\
+		(count) += (count);				\
+} while (/* CONSTCOND */ 0);
+
+#endif
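For reviewers unfamiliar with this vendored library, here is a minimal sketch of how the API added above is typically driven, based only on the declarations in ringbuf.h. The buffer length (4096), worker index (0), and "hello" payload are illustrative assumptions, not part of this commit; in the intended multi-producer use, each producer thread would register its own worker slot and the consumer would run in a single separate thread.

```c
/*
 * Usage sketch for the vendored ringbuf API (assumptions: buffer size,
 * worker index and payload are made up for illustration).
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "ringbuf.h"

int
main(void)
{
	size_t rbuf_size;
	char data[4096];		/* caller-owned data storage */
	ringbuf_t *r;
	ringbuf_worker_t *w;
	ssize_t off;
	size_t len, coff;

	/* ringbuf_t is variable-sized: query its size for one worker. */
	ringbuf_get_sizes(1 /* nworkers */, &rbuf_size, NULL);
	r = malloc(rbuf_size);
	if (r == NULL || ringbuf_setup(r, 1, sizeof(data)) == -1)
		return 1;

	/* Producer side: register, acquire a range, fill it, publish it. */
	w = ringbuf_register(r, 0 /* worker index */);
	off = ringbuf_acquire(r, w, 5);
	if (off != -1) {
		memcpy(&data[off], "hello", 5);
		ringbuf_produce(r, w);
	}

	/* Consumer side: take a contiguous ready range, then release it. */
	len = ringbuf_consume(r, &coff);
	if (len != 0) {
		fwrite(&data[coff], 1, len, stdout);
		ringbuf_release(r, len);
	}

	ringbuf_unregister(r, w);
	free(r);
	return 0;
}
```

Note that the ringbuf_t object only manages offsets; the byte storage (data above) is owned and indexed by the caller, which is what makes the acquire/produce and consume/release split work for contiguous range operations.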
