High-Performance Persistent Messaging Queue
A fast, scalable messaging queue designed for high-throughput transactional event processing, built to integrate seamlessly with existing infrastructure like MySQL.
* Data collected on M1 notebook. View benchmark code. Higher throughput achievable in production.
Read Technical Deep Dives:
Optimized for low-latency operations with a MySQL backend, supporting a sharded table architecture for parallelism and scalability.
Ensures strictly monotonic ordering via DbId, with acknowledgments tracked in dedicated pointer tables, supporting effectively-once processing when consumers are idempotent.
Adjust performance via parameters like shard count (TableParallelism) and rate limits (MaxRPSPerThread). Designed for horizontal scaling.
Guarantees message durability through asynchronous acknowledgment management, ensuring no data loss during transient backend failures.
Encrypted offsite archiving to S3/B2, accelerated external processing, and optimized for AI/ML & reporting workloads.
NerveBus's remarkable throughput isn't magic; it's a direct result of leveraging Go's concurrency model to its fullest. At its heart, the Synapse struct acts as an orchestrator, but the real work happens in a series of decoupled goroutines communicating via channels.

When you call SendPack, you're not directly writing to the database. Instead, your packets are handed off to a dedicated channel managed by getQueueRunnerChannel (see synapse_internal_channels.go). A single queueRunner goroutine consumes from this channel, performs the crucial task of assigning a globally sequential DbId by coordinating with the backend's writer pointer, and then dispatches the packet to a shard-specific channel managed by getQueueWriterChannel.

Each of these shard channels is served by a queueWriter goroutine, responsible for batching writes (getDefaultIOBatchSize). Finally, after a successful database write via the SynapseBackend, confirmations flow through yet another channel to the queueAckMan, which handles pointer updates and notifies the original sender.
This pipeline design, built entirely on goroutines and channels, means each stage operates largely independently. Buffering between stages provides resilience to temporary slowdowns (e.g., network latency to the DB), preventing backpressure from immediately halting ingestion. It allows NerveBus to effectively utilize multi-core processors by running these stages in parallel. This deep decoupling is fundamental to NerveBus's ability to sustain high message rates.
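To make the shape of this pipeline concrete, here is a minimal, self-contained sketch of the same pattern: a runner assigns sequential IDs, fans packets out to shard-specific writer channels, and writers report confirmations to an ack stage. This is illustrative only, not NerveBus source; the packet type, channel sizes, and function layout are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

type packet struct {
	DbId uint64
	Data []byte
}

func main() {
	const shards = 4
	ingress := make(chan packet, 1024) // stands in for the queue-runner channel
	writerChans := make([]chan packet, shards)
	acks := make(chan uint64, 1024)

	var wg sync.WaitGroup
	for i := range writerChans {
		writerChans[i] = make(chan packet, 1024)
		wg.Add(1)
		go func(in <-chan packet) { // stands in for a queueWriter goroutine
			defer wg.Done()
			for p := range in {
				// A real writer would batch packets and call the backend here.
				acks <- p.DbId
			}
		}(writerChans[i])
	}

	// Runner: assigns monotonically increasing IDs and shards by DbId % shards.
	go func() {
		var nextID uint64
		for p := range ingress {
			p.DbId = nextID
			nextID++
			writerChans[p.DbId%shards] <- p
		}
		for _, ch := range writerChans {
			close(ch)
		}
	}()

	// Ack stage: in NerveBus this would advance the writer pointer; here we just count.
	done := make(chan struct{})
	go func() {
		n := 0
		for range acks {
			n++
		}
		fmt.Println("acked", n, "packets")
		close(done)
	}()

	for i := 0; i < 10; i++ {
		ingress <- packet{Data: []byte("hello")}
	}
	close(ingress)
	wg.Wait()
	close(acks)
	<-done
}
```

Each stage owns its own goroutine(s) and only communicates through channels, which is what lets the stages run on separate cores and absorb bursts in the buffers between them.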
How does NerveBus reportedly achieve write speeds nearing 500,000 messages per second on suitable hardware? Let's trace the optimized write path:
1. SendPack is the preferred entry point, allowing the client to amortize call overhead across many packets.
2. The queueRunner goroutine fetches the last writer pointer (GetPtr(queueName, "")) on startup and then atomically increments it for each incoming message, ensuring a strict, monotonic DbId. This sequential ID is key for simplifying downstream logic.
3. The queueRunner dispatches each packet to a specific queueWriter based on DbId % TableParallelism. This distributes the load across multiple concurrent writers.
4. Each queueWriter (in synapse_writer.go) collects packets into an internal buffer, aiming for getDefaultIOBatchSize (e.g., 10,000) before triggering a backend write. This minimizes DB round trips.
5. The SMysqlBackend.WriteBatch function constructs a bulk INSERT ... ON DUPLICATE KEY UPDATE statement for all packets destined for the same shard within the batch. This is highly efficient in InnoDB.
6. Once WriteBatch succeeds, packets are sent to the queueAckMan. This manager (in synapse_ack_manager.go) buffers these confirmations, sorts them by DbId, and only updates the master writer pointer in the DB (saveQueueWriterPtr) when it detects a contiguous sequence starting from the last saved pointer. Only then are the original senders' confirmation channels unblocked.

This combination of client-side batching, sharding, deep internal batching, efficient SQL, and drastically minimized pointer update frequency allows NerveBus to push writes through at an extremely high rate.
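As a rough illustration of step 5, the following sketch builds a single multi-row upsert statement for one shard-batch. It is a hypothetical stand-in, not the actual SMysqlBackend.WriteBatch code; the table name and packet type are placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

type packet struct {
	DbId uint64
	Data []byte
}

// buildShardUpsert returns one multi-row statement plus its arguments, mirroring
// the INSERT ... ON DUPLICATE KEY UPDATE pattern described above.
func buildShardUpsert(table string, batch []packet) (string, []interface{}) {
	placeholders := make([]string, 0, len(batch))
	args := make([]interface{}, 0, len(batch)*2)
	for _, p := range batch {
		placeholders = append(placeholders, "(?, ?)")
		args = append(args, p.DbId, p.Data)
	}
	query := fmt.Sprintf(
		"INSERT INTO %s (id, data) VALUES %s ON DUPLICATE KEY UPDATE data=VALUES(data)",
		table, strings.Join(placeholders, ", "))
	return query, args
}

func main() {
	q, args := buildShardUpsert("queue_events_4_2", []packet{
		{DbId: 100, Data: []byte("a")},
		{DbId: 104, Data: []byte("b")},
	})
	fmt.Println(q, len(args))
}
```

Batching the whole shard-batch into one statement is what keeps the number of round trips per message so low.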
Efficiently reading from a high-throughput queue like NerveBus requires careful state management. The Receiver struct encapsulates this logic for a specific consumer (ConsumerId).

When GetReceiver is called, it initializes a Receiver and kicks off two key goroutines: receiverBody and readerAckManager.
The receiverBody runs a continuous loop:

1. Reads the consumer's last acknowledged pointer (GetPtr(r.QueueName, r.ConsumerId)).
2. Reads the current writer pointer (GetPtr(r.QueueName, "")).
3. Computes a read range bounded by the reader limit (getDefaultReaderLimit) and calls Synapse.Backend.ReadBatch. This backend method efficiently fetches records by their DbIds, potentially across multiple shards.
4. Delivers the fetched packets, ordered by DbId, to the consumer via the buffered DataChan.

The consumer processes messages from DataChan and calls receiver.Ack(packet). This doesn't immediately update the database. Instead, the packet.DbId is sent to the AckChannel.
The readerAckManager listens on AckChannel, buffers the incoming acknowledged DbIds (ackBuffer), sorts them, and checks whether the sequence starting from lastAckedId + 1 can be advanced. If so, it calls Synapse.Backend.WritePtr to update the consumer's pointer in the database, writing only the new highest sequential ID.
This asynchronous acknowledgement means Ack returns quickly, but the persistence of the consumer's progress is delayed and batched. This implies at-least-once delivery semantics: if a crash occurs after processing but before the pointer update is persisted, messages may be redelivered. Consumers must be designed with idempotency in mind.
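The sketch below shows one way a consumer loop could guard against redelivery. The Receiver and Packet types here are placeholders standing in for the real API (only DataChan and Ack are mentioned in the text), and the in-memory dedup set is purely illustrative; a production consumer would use a durable dedup mechanism such as a unique key in its own store.

```go
package main

import "log"

type Packet struct {
	DbId uint64
	Data []byte
}

type Receiver struct {
	DataChan chan *Packet
}

// Ack stands in for the real receiver.Ack, which forwards packet.DbId to AckChannel.
func (r *Receiver) Ack(p *Packet) {}

func consume(r *Receiver, handle func(*Packet) error) {
	seen := make(map[uint64]bool) // in-memory dedup, for the sketch only
	for p := range r.DataChan {
		if !seen[p.DbId] {
			if err := handle(p); err != nil {
				log.Printf("handler failed for DbId %d: %v", p.DbId, err)
				continue // do not ack; the message will be redelivered
			}
			seen[p.DbId] = true
		}
		r.Ack(p) // fast: the pointer update is batched asynchronously
	}
}

func main() {
	r := &Receiver{DataChan: make(chan *Packet, 4)}
	r.DataChan <- &Packet{DbId: 1, Data: []byte("x")}
	close(r.DataChan)
	consume(r, func(p *Packet) error { log.Printf("processed %d", p.DbId); return nil })
}
```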
While NerveBus's architecture allows for different backends via the SynapseBackend interface, the provided SMysqlBackend is heavily optimized for performance. It tackles MySQL's potential limitations head-on:
- Sharded tables: SMysqlBackend creates TableParallelism data tables (queue_<Name>_<P>_<ShardId>) and PointersParallelism pointer tables (..._pointers). Data is distributed using DbId % TableParallelism. This spreads I/O load and reduces lock contention within InnoDB.
- Minimal schema: data tables use just id BIGINT UNSIGNED PRIMARY KEY, data LONGBLOB. This simple structure is fast for primary key lookups and insertions. Pointer tables use id VARCHAR(255) PRIMARY KEY, ptr BIGINT UNSIGNED, efficiently storing progress for the writer (<QueueName>:) and consumers (<QueueName>:<ConsumerId>).
- Bulk upserts: WriteBatch uses INSERT INTO ... VALUES (...), (...) ON DUPLICATE KEY UPDATE data=VALUES(data). This handles both new inserts and potential retries/overwrites efficiently in a single statement per shard-batch.
- Lightweight pointer tracking: pointer sharding is kept small (PointersParallelism often equals 1, as per NQLocalTest), minimizing the overhead of tracking progress. Updates use INSERT ... ON DUPLICATE KEY UPDATE ptr=VALUES(ptr).
- Rate limiting (MaxRPSPerThread): crucially, the backend throttles itself. Before executing WriteBatch or WritePtr, it checks the time since the last operation for that effective "thread" (shard ID or pointer type). If too fast, it injects a time.Sleep. This prevents overwhelming MySQL, ensuring stability and smoother performance under sustained load.
- Automatic setup: ensureTablesExists creates the necessary sharded tables and pointer tables on demand if they don't exist, simplifying deployment.

The SMysqlBackend isn't just using MySQL; it's carefully tailoring its interaction pattern (sharding, bulk operations, rate limiting) to extract maximum performance from the InnoDB engine.
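The DDL implied by the schema above could look roughly like the following sketch. It is an assumption-laden illustration, not the real ensureTablesExists: the go-sql-driver/mysql import, the DSN, and the reading of <P> as the parallelism value are all placeholders.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // anonymous import registers the MySQL driver
)

// ensureTables creates the sharded data tables and the pointer table on demand.
func ensureTables(db *sql.DB, queue string, tableParallelism int) error {
	for shard := 0; shard < tableParallelism; shard++ {
		// Data shard: minimal two-column layout, fast PK inserts and lookups.
		ddl := fmt.Sprintf(
			"CREATE TABLE IF NOT EXISTS queue_%s_%d_%d ("+
				" id BIGINT UNSIGNED PRIMARY KEY,"+
				" data LONGBLOB"+
				") ENGINE=InnoDB", queue, tableParallelism, shard)
		if _, err := db.Exec(ddl); err != nil {
			return err
		}
	}
	// Pointer table: one row per writer ("<queue>:") or consumer ("<queue>:<consumer>").
	_, err := db.Exec(fmt.Sprintf(
		"CREATE TABLE IF NOT EXISTS queue_%s_pointers ("+
			" id VARCHAR(255) PRIMARY KEY,"+
			" ptr BIGINT UNSIGNED"+
			") ENGINE=InnoDB", queue))
	return err
}

func main() {
	// Placeholder DSN; substitute your own credentials and host.
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/nervebus")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := ensureTables(db, "events", 4); err != nil {
		log.Fatal(err)
	}
}
```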
NerveBus is designed for speed, but it also incorporates elements of flexibility:
The QueueConfig and BackendConfig structs expose key tuning parameters:

- TableParallelism: controls the number of data shards in the backend. Higher values distribute load further but might increase connection/management overhead. Optimal values depend on the underlying hardware and MySQL config.
- PointersParallelism: usually lower (often 1); controls sharding for pointer tables.
- MaxRPSPerThread: directly controls the maximum query rate per writer shard/pointer type sent to the backend, crucial for stability.
- Buffer sizing: settings like perChannelMemory (synapse_config.go) influence buffer sizes, trading memory usage for burst absorption capacity.

The SynapseBackend interface (synapse_types.go) defines the essential operations: WriteBatch, ReadBatch, WritePtr, GetPtr, etc. While only SMysqlBackend is provided, this interface opens the door for implementing backends on other storage systems (e.g., PostgreSQL, Cassandra, potentially even log-structured systems or object storage if latency requirements allow) without altering the core Synapse concurrency logic.

Payloads are opaque []byte values within the Packet struct. Helper functions like SendProtoPack and SendSourcedPack show it can easily handle specific serialization formats (like Protobuf) or structured metadata (NerveSourcedPacket), but the core is agnostic.

Key Trade-offs:
This flexibility comes with considerations:
- Centralized DbId generation assumes a single logical publisher (a single queueRunner per queue) for guaranteed ordering.

NerveBus offers a powerful, configurable engine for high-throughput messaging, allowing users to tune its parallelism and database interaction, and even swap storage layers, provided they understand and design for its inherent operational characteristics.
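A hypothetical shape of this tuning surface is sketched below. The field names come from the text (TableParallelism, PointersParallelism, MaxRPSPerThread, perChannelMemory, IOBatchSize); the exact struct layout and defaults are illustrative assumptions, not the library's real API.

```go
package main

import "fmt"

type BackendConfig struct {
	TableParallelism    int // number of data shards
	PointersParallelism int // number of pointer tables (often 1)
	MaxRPSPerThread     int // per-shard/pointer query-rate cap
}

type QueueConfig struct {
	Backend          BackendConfig
	PerChannelMemory int // channel buffer depth between pipeline stages
	IOBatchSize      int // packets accumulated per backend write
}

func main() {
	cfg := QueueConfig{
		Backend: BackendConfig{
			TableParallelism:    4,
			PointersParallelism: 1,
			MaxRPSPerThread:     50,
		},
		PerChannelMemory: 16384,
		IOBatchSize:      10000,
	}
	fmt.Printf("%+v\n", cfg)
}
```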
High-throughput systems inevitably encounter transient backend issues. NerveBus is designed with persistence in mind, particularly on the critical write path, employing straightforward retry mechanisms.
Observe the queueWriter's writeBuffer function: if a call to s.Backend.WriteBatch(queueName, *writeBuffer) fails, it logs the error but enters an infinite loop, retrying the same batch until success. Similarly, saveQueueWriterPtr (responsible for advancing the main queue pointer in queueAckMan) will loop indefinitely until s.Backend.WritePtr succeeds. This ensures that once a message has been assigned a DbId, NerveBus commits to eventually persisting it and its corresponding pointer update, preventing data loss during temporary database unavailability or network glitches.
On startup, components also show resilience. The queueRunner retries s.Backend.GetPtr with a delay if it fails initially, ensuring it correctly identifies the starting DbId. The Receiver's receiverBody also appears to retry GetPtr calls within its main loop.
It's worth noting that the SMysqlBackend itself doesn't implement internal retries; it relies on these caller-side loops. While robust for writes, error handling within the receiverBody after a failed ReadBatch might warrant closer inspection in consumer logic, as the provided code seems to log the error but potentially continues its loop. Overall, NerveBus prioritizes write durability through persistent retries, a crucial feature for a reliable queue.
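The retry-forever pattern looks roughly like the sketch below. The Backend interface and persistBatch function are stand-ins (the real writeBuffer and saveQueueWriterPtr are not reproduced here), and the back-off sleep is an illustrative choice; the text only says the loop retries until success.

```go
package main

import (
	"errors"
	"log"
	"time"
)

type Packet struct {
	DbId uint64
	Data []byte
}

type Backend interface {
	WriteBatch(queue string, batch []Packet) error
}

// persistBatch blocks until the batch is durably written: a DbId, once assigned,
// is never dropped because of a transient backend failure.
func persistBatch(b Backend, queue string, batch []Packet) {
	for {
		if err := b.WriteBatch(queue, batch); err != nil {
			log.Printf("WriteBatch failed, retrying: %v", err)
			time.Sleep(time.Second) // back off instead of hot-looping (illustrative)
			continue
		}
		return
	}
}

// flakyBackend fails a fixed number of times, then succeeds.
type flakyBackend struct{ fails int }

func (f *flakyBackend) WriteBatch(string, []Packet) error {
	if f.fails > 0 {
		f.fails--
		return errors.New("transient DB error")
	}
	return nil
}

func main() {
	persistBatch(&flakyBackend{fails: 2}, "events", []Packet{{DbId: 1}})
	log.Println("batch persisted")
}
```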
NerveBus's impressive speed relies heavily on decoupling processing stages with buffers, primarily Go channels and internal slices. However, this buffering directly impacts memory consumption and latency characteristics. Understanding this trade-off is key for tuning.
- Channel buffers: sized by perChannelMemory (e.g., 16Ki elements) in synapse_config.go, these buffers sit between stages: API ingress (queueRunnerChannel), runner-to-writer (queueWriterChannel), and writer-to-ack (queueAckManChannel). Larger buffers absorb bigger, temporary bursts where one stage outpaces the next, preventing stalls. The cost is increased RAM usage proportional to buffer size and element count.
- Write batches: each queueWriter accumulates packets in a slice (buffer) before calling Backend.WriteBatch, aiming for getDefaultIOBatchSize (e.g., 10,000). Larger batches reduce the frequency of database calls, boosting overall throughput, but increase the latency for any single message within that batch and require more memory per active writer goroutine.
- Acknowledgement buffers: queueAckMan and readerAckManager use slices (confirmationsBuffer, ackBuffer) to hold pending acknowledgements before updating DB pointers. Their effective size impacts how many acks can be buffered before potentially blocking, especially relevant if pointer updates lag.
- Receiver prefetch: Receiver.DataChan is buffered (bufSize parameter in newReceiver). This allows the receiverBody to pre-fetch messages from the backend, smoothing out delivery to the consumer, but again consumes memory.

Tuning these buffer sizes requires profiling your specific workload. High-burst traffic might benefit from larger channel buffers, while consistent high throughput might optimize better with large write batches. It's a constant negotiation between maximizing throughput, controlling latency, and managing the application's memory footprint.
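The throughput-versus-latency trade-off of write batching can be seen in a tiny accumulate-then-flush loop like the one below. This is a generic sketch, not queueWriter itself; the idle-timeout flush policy in particular is an assumption, since the text only states that queueWriter accumulates toward getDefaultIOBatchSize.

```go
package main

import (
	"fmt"
	"time"
)

// writerLoop drains its channel into a slice and flushes either when the batch
// is full (throughput) or after a short idle period (latency cap).
func writerLoop(in <-chan []byte, batchSize int, flush func([][]byte)) {
	batch := make([][]byte, 0, batchSize)
	timer := time.NewTimer(10 * time.Millisecond)
	defer timer.Stop()
	for {
		select {
		case msg, ok := <-in:
			if !ok {
				if len(batch) > 0 {
					flush(batch)
				}
				return
			}
			batch = append(batch, msg)
			if len(batch) >= batchSize { // full batch: fewer DB round trips
				flush(batch)
				batch = batch[:0]
			}
		case <-timer.C: // idle: flush whatever we have to bound latency
			if len(batch) > 0 {
				flush(batch)
				batch = batch[:0]
			}
			timer.Reset(10 * time.Millisecond)
		}
	}
}

func main() {
	in := make(chan []byte, 8)
	for i := 0; i < 5; i++ {
		in <- []byte{byte(i)}
	}
	close(in)
	writerLoop(in, 3, func(b [][]byte) { fmt.Println("flush", len(b)) })
}
```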
Within NerveBus's architecture, the QueueElementIndex (often seen as packet.DbId) is far more than a simple unique identifier; it's the central element coordinating multiple critical functions:
- ID assignment: there is a single queueRunner per queue, which fetches the last writer ID and monotonically increments it for every message entering the system. This provides a strict, predictable sequence at the point of ingestion.
- Sharding: the SMysqlBackend relies directly on DbId for distributing load: shardIdx := data[i].DbId % QueueElementIndex(s.config.TableParallelism). This simple modulo operation ensures packets are deterministically assigned to a specific backend shard table and its corresponding queueWriter goroutine.
- Ordered delivery: the Receiver uses DbId ranges to query the backend (ReadBatch). While the backend might return data from different shards, the Receiver implicitly ensures ordered delivery to the consumer's DataChan because read operations fetch contiguous DbId ranges and process them sequentially.
- Progress tracking: pointers are expressed entirely in terms of DbId. Both the main writer pointer (identifying the highest assigned DbId) and individual consumer pointers (identifying the highest acknowledged DbId) are stored and updated based on this value. The acknowledgement managers (queueAckMan, readerAckManager) perform their core logic by sorting received DbIds and finding the highest contiguous value to advance the relevant pointer.

Without the strictly sequential, centrally assigned DbId, NerveBus's mechanisms for sharding, ordered delivery, and efficient, batched pointer updates would be vastly more complex, if not impossible. It's the linchpin that enables coherent, high-performance operation across its distributed components.
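The contiguous-advance logic attributed to queueAckMan and readerAckManager amounts to the small routine sketched below: sort the pending DbIds and move the pointer only through an unbroken run starting at lastAcked+1. This mirrors the described behaviour but is not the actual code; advancePointer is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
)

// advancePointer returns the new pointer value and the DbIds still pending
// (those beyond the first gap).
func advancePointer(lastAcked uint64, pending []uint64) (uint64, []uint64) {
	sort.Slice(pending, func(i, j int) bool { return pending[i] < pending[j] })
	ptr := lastAcked
	i := 0
	for ; i < len(pending) && pending[i] == ptr+1; i++ {
		ptr = pending[i]
	}
	return ptr, pending[i:]
}

func main() {
	ptr, rest := advancePointer(10, []uint64{12, 11, 13, 15})
	fmt.Println(ptr, rest) // 13 [15]: 14 is missing, so the pointer stops at 13
}
```

Persisting the pointer only at contiguous boundaries is what allows thousands of acknowledgements to collapse into a single WritePtr call.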
The Synapse struct serves as the primary public interface to NerveBus, but internally it acts as a stateful manager and factory for the components driving the queue pipelines. Let's peek inside:
- Storage: the Backend SynapseBackend field holds the reference to the storage layer implementation (e.g., SMysqlBackend), providing the connection to persistence.
- Lazy channel creation: Synapse doesn't pre-allocate channels for all possible queues. Instead, it uses maps (queueChannels, queueAckManChannels, queueWriterChannels) protected by sync.RWMutex. Helper functions (getOrAddItem, getOrAddNthItem in synapse_map_tools.go) ensure that channels and their associated goroutines (queueRunner, queueWriter, queueAckMan) are created lazily and safely only when a queue is first used.
- Control signals: a controlChannels slice holds channels intended for sending control signals (like ControlCommand{terminate: true}) to the background goroutines. While the snippets show checks for termination, a full graceful shutdown mechanism isn't fully detailed, but the infrastructure exists.
- Observability: a trace bool flag controls verbose logging. The SetTrace method propagates this setting down to the Synapse instance and its associated Backend, allowing runtime debugging. A *zerolog.Logger provides structured logging capabilities.
- Internal events: an infoChan chan ControlChanInfo serves as an internal channel, presumably for reporting significant events or errors (like ErrorSavingPointerInAckManSyncSection) asynchronously from background tasks back to a monitoring or main loop (though the consumption side of infoChan outside the NewSynapse init isn't fully shown).

In essence, Synapse is the bootstrapping and coordination point. It dynamically manages the lifecycle of the necessary concurrent machinery for each queue, holds the connection to the storage backend, and provides basic control and observability hooks.
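The lazy, mutex-guarded map lookup works roughly as sketched below, in the spirit of getOrAddItem; the real helper's signature and the registry type here are illustrative assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu       sync.RWMutex
	channels map[string]chan []byte
}

// channelFor returns the existing channel for a queue, creating it (and, in the
// real system, starting its goroutines) on first use.
func (r *registry) channelFor(queue string, buf int, start func(chan []byte)) chan []byte {
	r.mu.RLock()
	ch, ok := r.channels[queue]
	r.mu.RUnlock()
	if ok {
		return ch // fast path: read lock only
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if ch, ok = r.channels[queue]; ok { // re-check: another goroutine may have won
		return ch
	}
	ch = make(chan []byte, buf)
	r.channels[queue] = ch
	start(ch) // e.g., spawn the queueRunner for this queue
	return ch
}

func main() {
	r := &registry{channels: make(map[string]chan []byte)}
	ch := r.channelFor("events", 16, func(chan []byte) { fmt.Println("started runner") })
	_ = ch
}
```

The double-checked pattern keeps the common case on a cheap read lock while still guaranteeing exactly one set of goroutines per queue.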
NerveBus's performance potential is high, but unlocking it often requires moving beyond default settings. The synapse_bench_test.go file provides clues, demonstrating patterns like high client concurrency (writeWorkers = 2000) and client-side batching (packSize = 128). How do we translate this to tuning?
- TableParallelism: this dictates backend sharding. The benchmark uses nIOThreads = 4. In production, align this with your backend's capabilities. Consider MySQL's configured I/O threads (innodb_read_io_threads, innodb_write_io_threads) and available CPU cores. Monitor database contention: too many shards might increase overhead. Start with a moderate number (e.g., 4-16) and test.
- MaxRPSPerThread: the benchmark uses 50. This is critical for stability. Determine the sustainable query rate your tuned MySQL instance can handle per connection without significant latency increase. Use load-testing tools against MySQL directly, or monitor NerveBus's backend query times (SMysqlBackend logs slow queries) under load. Start conservatively and increase cautiously.
- Batch and buffer sizes: defaults are getDefaultIOBatchSize = 10_000 and perChannelMemory = 16384. Larger internal batches (IOBatchSize) generally improve throughput by reducing DB calls per message but increase individual message latency. Larger channel buffers (perChannelMemory) handle bursts better but use more RAM. Profile with realistic message sizes and traffic patterns to find the sweet spot for your latency vs. throughput needs.
- MySQL itself: the importance of a well-tuned MySQL instance (adequate innodb_buffer_pool_size, proper transaction log settings, sufficient connections) cannot be overstated. Monitor backend CPU, I/O wait, disk latency, and InnoDB metrics closely.
- Client-side batching (SendPack) is the intended high-throughput pattern. Tune the number of client workers and their batch sizes based on application logic and network capacity (a scaled-down sketch of this pattern follows below).

Tuning NerveBus is an iterative process: establish a baseline, change one parameter (TableParallelism, MaxRPS, buffer sizes) at a time, load test, measure backend and application metrics, and repeat.
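The benchmark-style client pattern mentioned above (many concurrent workers, each sending fixed-size packs) could look like the sketch below. The sender interface and SendPack signature are hypothetical stand-ins, not NerveBus's actual API, and the worker counts are scaled down from the benchmark's writeWorkers = 2000 and packSize = 128.

```go
package main

import (
	"fmt"
	"sync"
)

// sender is a hypothetical abstraction over whatever object exposes SendPack.
type sender interface {
	SendPack(queue string, payloads [][]byte) error
}

type noopSender struct{}

func (noopSender) SendPack(string, [][]byte) error { return nil }

// runWriters launches `workers` goroutines, each sending `packsPerWorker` packs
// of `packSize` payloads, amortizing per-call overhead across the pack.
func runWriters(s sender, queue string, workers, packsPerWorker, packSize int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			pack := make([][]byte, packSize)
			for i := range pack {
				pack[i] = []byte("payload")
			}
			for n := 0; n < packsPerWorker; n++ {
				if err := s.SendPack(queue, pack); err != nil {
					fmt.Println("send failed:", err)
				}
			}
		}()
	}
	wg.Wait()
}

func main() {
	runWriters(noopSender{}, "events", 20, 10, 128)
	fmt.Println("done")
}
```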
Additional support: $85/hour (technical), $95/hour (integration)
Complete the questionnaire below to receive a custom quote and solutions tailored to your specific needs.
NerveBus bridges traditional databases to modern event-driven architectures with enterprise-grade durability.