From ffdb5c88847996f159fc4c1a2957aa7afc85490a Mon Sep 17 00:00:00 2001 From: Neeraj Chowdary <57310710+neerajchowdary889@users.noreply.github.com> Date: Wed, 20 May 2026 18:55:34 +0530 Subject: [PATCH 1/2] Implement asynchronous write buffer for WAL operations Enhance the Write-Ahead Log (WAL) functionality by introducing an asynchronous buffer for event writes. The AddToBufferWAL method now serializes events and enqueues them into the buffer without blocking the main execution path. Additionally, a new DrainAsyncBuffer method ensures all pending entries are flushed before closing the WAL. This update improves performance by allowing non-blocking writes and efficient background flushing of log entries. --- common/WAL/WAL.go | 4 + common/WAL/async_buffer.go | 230 ++++++++++++++++++ common/WAL/operations.go | 76 +++++- core/protocol/router/data_router.go | 2 +- .../accounts/client_helper/client_helper.go | 35 +-- 5 files changed, 317 insertions(+), 30 deletions(-) create mode 100644 common/WAL/async_buffer.go diff --git a/common/WAL/WAL.go b/common/WAL/WAL.go index c67b293..dc75101 100644 --- a/common/WAL/WAL.go +++ b/common/WAL/WAL.go @@ -31,6 +31,10 @@ type WAL struct { // lastFlushedLSN tracks the last LSN that was persisted to disk lastFlushedLSN uint64 + + // asyncBuf is the CAS slot for the active async write buffer. + // Nil means no buffer is running; AddToBufferWAL creates one on demand. 
+ asyncBuf atomic.Pointer[AsyncBuffer] } // WALEntry represents a single event in the WAL diff --git a/common/WAL/async_buffer.go b/common/WAL/async_buffer.go new file mode 100644 index 0000000..fe67dc0 --- /dev/null +++ b/common/WAL/async_buffer.go @@ -0,0 +1,230 @@ +package WAL + +import ( + "log" + "sync" + "sync/atomic" + "time" + + wal_types "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/wal" +) + +const ( + asyncFlushInterval = 60 * time.Millisecond + asyncIdleTimeout = 10 * time.Second +) + +type entryKind uint8 + +const ( + kindData entryKind = iota + kindCheckpoint +) + +// pendingEntry holds a pre-serialised WAL event waiting to be assigned an LSN. +// Serialisation happens at enqueue time (no lock); LSN is assigned at flush time +// inside the WAL mutex so ordering is always correct even with multiple buffers. +type pendingEntry struct { + walType wal_types.WALType + data []byte +} + +// stackEntry is one node in the coalescing stack. +// Adjacent data pushes are merged into a single node; checkpoint nodes act as +// barriers and are deduplicated when consecutive. +type stackEntry struct { + kind entryKind + entries []pendingEntry // non-nil for kindData only +} + +// AsyncBuffer is a coalescing write buffer that batches WAL entries and flushes +// them to the underlying WAL every asyncFlushInterval. +// +// Merge rules (applied at push time against the current stack top): +// push data + top=data → append into top entry (merge) +// push data + top=checkpoint → new data node +// push checkpoint + top=checkpoint → deduplicate (no-op) +// push checkpoint + top=data → new checkpoint node +// +// The buffer self-closes after asyncIdleTimeout of inactivity: it drains, +// CAS-nils the global slot, and exits. The next AddToBufferWAL call creates a +// fresh buffer transparently. 
+type AsyncBuffer struct { + mu sync.Mutex + stack []stackEntry + closed bool + + wal *WAL + global *atomic.Pointer[AsyncBuffer] + + done chan struct{} + drainDone chan struct{} + closeOnce sync.Once + + lastActivity atomic.Int64 // unix nanoseconds of last push +} + +func newAsyncBuffer(w *WAL, global *atomic.Pointer[AsyncBuffer]) *AsyncBuffer { + b := &AsyncBuffer{ + wal: w, + global: global, + done: make(chan struct{}), + drainDone: make(chan struct{}), + } + b.lastActivity.Store(time.Now().UnixNano()) + return b +} + +// pushData enqueues a pre-serialised entry. +// Returns false if the buffer is already closed; the caller must create a new one. +func (b *AsyncBuffer) pushData(e pendingEntry) bool { + b.mu.Lock() + defer b.mu.Unlock() + if b.closed { + return false + } + if len(b.stack) > 0 && b.stack[len(b.stack)-1].kind == kindData { + b.stack[len(b.stack)-1].entries = append(b.stack[len(b.stack)-1].entries, e) + } else { + b.stack = append(b.stack, stackEntry{kind: kindData, entries: []pendingEntry{e}}) + } + b.lastActivity.Store(time.Now().UnixNano()) + return true +} + +// pushCheckpoint enqueues a checkpoint sentinel. +// Returns false if the buffer is closed. +func (b *AsyncBuffer) pushCheckpoint() bool { + b.mu.Lock() + defer b.mu.Unlock() + if b.closed { + return false + } + if len(b.stack) > 0 && b.stack[len(b.stack)-1].kind == kindCheckpoint { + // consecutive checkpoints collapse to one + return true + } + b.stack = append(b.stack, stackEntry{kind: kindCheckpoint}) + b.lastActivity.Store(time.Now().UnixNano()) + return true +} + +// flushAndReport swaps the stack and writes it to WAL. +// Returns true if there was anything to flush. 
+func (b *AsyncBuffer) flushAndReport() bool { + b.mu.Lock() + if len(b.stack) == 0 { + b.mu.Unlock() + return false + } + snapshot := b.stack + b.stack = make([]stackEntry, 0, cap(snapshot)) + b.mu.Unlock() + + b.writeToWAL(snapshot) + return true +} + +// closeAndDrain marks the buffer closed, CAS-nils the global slot so new writers +// immediately see a nil slot and create a fresh buffer, then writes any remaining +// stack entries to WAL and signals drainDone. +func (b *AsyncBuffer) closeAndDrain() { + b.mu.Lock() + b.closed = true + snapshot := b.stack + b.stack = nil + b.mu.Unlock() + + // Unregister before flush: new writers can create a fresh buffer in parallel + // while this buffer is draining. + b.global.CompareAndSwap(b, nil) + + b.writeToWAL(snapshot) + close(b.drainDone) +} + +// writeToWAL processes a stack snapshot into the underlying WAL. +// For kindData nodes: acquires WAL.Mu, assigns LSNs, appends to WAL buffer, +// then calls flushBuffer once per node. +// For kindCheckpoint nodes: calls WAL.CreateCheckpoint (has its own lock). +func (b *AsyncBuffer) writeToWAL(snapshot []stackEntry) { + for _, se := range snapshot { + switch se.kind { + case kindData: + b.wal.Mu.Lock() + for _, pe := range se.entries { + lsn := b.wal.nextLSN() + b.wal.Buffer = append(b.wal.Buffer, WALEntry{ + Type: pe.walType, + Data: pe.data, + LSN: lsn, + Timestamp: time.Now().Unix(), + }) + if len(b.wal.Buffer) >= b.wal.BatchSize { + if err := b.wal.flushBuffer(); err != nil { + log.Printf("async WAL: mid-batch flush error: %v", err) + } + } + } + if err := b.wal.flushBuffer(); err != nil { + log.Printf("async WAL: flush error: %v", err) + } + b.wal.Mu.Unlock() + + case kindCheckpoint: + if _, err := b.wal.CreateCheckpoint(); err != nil { + log.Printf("async WAL: checkpoint error: %v", err) + } + } + } +} + +// Drain signals the buffer to stop accepting writes, flushes all remaining +// entries to WAL, and blocks until complete. Safe to call multiple times. 
+func (b *AsyncBuffer) Drain() { + b.closeOnce.Do(func() { close(b.done) }) + <-b.drainDone +} + +// run is the background goroutine: periodic flush every asyncFlushInterval, +// idle shutdown after asyncIdleTimeout of inactivity. +func (b *AsyncBuffer) run() { + ticker := time.NewTicker(asyncFlushInterval) + defer ticker.Stop() + + idleTimer := time.NewTimer(asyncIdleTimeout) + defer idleTimer.Stop() + + resetIdle := func() { + if !idleTimer.Stop() { + select { + case <-idleTimer.C: + default: + } + } + idleTimer.Reset(asyncIdleTimeout) + } + + for { + select { + case <-ticker.C: + if b.flushAndReport() { + resetIdle() + } + + case <-idleTimer.C: + since := time.Since(time.Unix(0, b.lastActivity.Load())) + if since < asyncIdleTimeout { + // spurious fire (activity happened after timer was set) + idleTimer.Reset(asyncIdleTimeout - since) + continue + } + b.closeAndDrain() + return + + case <-b.done: + b.closeAndDrain() + return + } + } +} diff --git a/common/WAL/operations.go b/common/WAL/operations.go index f9d2adf..950e2d5 100644 --- a/common/WAL/operations.go +++ b/common/WAL/operations.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "time" wal_types "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/wal" @@ -297,24 +298,89 @@ func (w *WAL) GetLastFlushedLSN() uint64 { return w.lastFlushedLSN } -// Close ensures all buffered data is flushed before closing the underlying log. +// Close drains the async buffer (if any), flushes all remaining buffered entries, +// then closes the underlying log. func (w *WAL) Close() error { + // Drain async buffer before acquiring Mu: writeToWAL inside Drain needs Mu.Lock. 
+ w.DrainAsyncBuffer() + w.Mu.Lock() defer w.Mu.Unlock() - // Flush remaining buffer if err := w.flushBuffer(); err != nil { return fmt.Errorf("failed to flush on close: %w", err) } - - // Close the WAL if err := w.Log.Close(); err != nil { return fmt.Errorf("failed to close WAL: %w", err) } - return nil } +// AddToBufferWAL serialises event immediately (no lock) and enqueues it into the +// async write buffer. The LSN is assigned and the entry is flushed to disk by the +// background goroutine every asyncFlushInterval (60 ms). +// +// The buffer is created on the first call and reused until it idles out. +// If a buffer is in the process of closing, this spins (rare, nanoseconds) until +// the CAS slot is clear, then creates a fresh buffer. +func (w *WAL) AddToBufferWAL(event wal_types.EventAdapter) error { + data, err := event.Serialize() + if err != nil { + return fmt.Errorf("async WAL: serialize: %w", err) + } + pe := pendingEntry{walType: event.GetType(), data: data} + + for { + buf := w.asyncBuf.Load() + if buf != nil { + if buf.pushData(pe) { + return nil + } + // Buffer is closed but hasn't CAS-nil'd the slot yet. Yield and retry. + runtime.Gosched() + continue + } + nb := newAsyncBuffer(w, &w.asyncBuf) + if w.asyncBuf.CompareAndSwap(nil, nb) { + go nb.run() + nb.pushData(pe) + return nil + } + // Lost the CAS race to another goroutine — retry from top. + } +} + +// AddCheckpointToBuffer enqueues a checkpoint sentinel into the async buffer. +// The checkpoint fires after all data entries already in the buffer have been +// flushed to WAL. Creates a buffer if none is active. 
+func (w *WAL) AddCheckpointToBuffer() { + for { + buf := w.asyncBuf.Load() + if buf != nil { + if buf.pushCheckpoint() { + return + } + runtime.Gosched() + continue + } + nb := newAsyncBuffer(w, &w.asyncBuf) + if w.asyncBuf.CompareAndSwap(nil, nb) { + go nb.run() + nb.pushCheckpoint() + return + } + } +} + +// DrainAsyncBuffer synchronously flushes all pending async entries to WAL +// and closes the buffer. No-op if no buffer is active. +// WAL.Close() calls this itself; call it directly only if the process may exit without Close. +func (w *WAL) DrainAsyncBuffer() { + if buf := w.asyncBuf.Load(); buf != nil { + buf.Drain() + } +} + // TruncateBefore eliminates all log entries with an LSN lower than the specified value. // CAUTION: This is a destructive operation. Use only after state is safely checkpointed. func (w *WAL) TruncateBefore(lsn uint64) error { diff --git a/core/protocol/router/data_router.go b/core/protocol/router/data_router.go index 6450e44..2db9627 100644 --- a/core/protocol/router/data_router.go +++ b/core/protocol/router/data_router.go @@ -1659,4 +1659,4 @@ func (router *Datarouter) HandleAccountsFetch(ctx context.Context, req *accounts }, } } -} \ No newline at end of file +} diff --git a/core/protocol/router/helper/accounts/client_helper/client_helper.go b/core/protocol/router/helper/accounts/client_helper/client_helper.go index 307ecdd..871be52 100644 --- a/core/protocol/router/helper/accounts/client_helper/client_helper.go +++ b/core/protocol/router/helper/accounts/client_helper/client_helper.go @@ -45,11 +45,10 @@ func (clienthelper *clientHelper) GetSyncVars() *types.Syncvars { // WriteAccounts persists a page of accounts received from the server. // -// If WAL is configured: -// 1. Write AccountSyncEvent to WAL (crash-safe record before DB touch) -// 2. Flush WAL to disk -// 3. Write accounts to DB -// 4. 
Create WAL checkpoint (mark these entries as applied) +// WAL writes are async: the event is serialised and enqueued into the async +// buffer (no disk I/O on the hot path). The background buffer goroutine flushes +// to disk every 60 ms. The DB write happens immediately without waiting for the +// WAL flush — CreateAccount is idempotent so crash recovery via WAL replay is safe. // // If WAL is nil, writes directly to DB (e.g. in tests). func (clienthelper *clientHelper) WriteAccounts(protoaccounts []*accountspb.Account) (bool, error) { @@ -64,7 +63,7 @@ func (clienthelper *clientHelper) WriteAccounts(protoaccounts []*accountspb.Acco ctx := clienthelper.SyncVars.Ctx - // ── 1. WAL write ────────────────────────────────────────────────────────── + // ── 1. Async WAL enqueue (non-blocking) ─────────────────────────────────── if clienthelper.SyncVars.WAL != nil { event := &WAL.AccountSyncEvent{ BaseEvent: wal_types.BaseEvent{Operation: wal_types.OpAppend}, @@ -72,26 +71,17 @@ func (clienthelper *clientHelper) WriteAccounts(protoaccounts []*accountspb.Acco Accounts: protoaccounts, }, } - lsn, err := clienthelper.SyncVars.WAL.WriteEvent(event) - if err != nil { - return false, fmt.Errorf("accountsync client: WAL write failed: %w", err) + if err := clienthelper.SyncVars.WAL.AddToBufferWAL(event); err != nil { + return false, fmt.Errorf("accountsync client: async WAL enqueue failed: %w", err) } - Log.Logger(Log.Sync).Debug(ctx, "accountsync client: WAL event written", - ion.Int64("lsn", int64(lsn)), + Log.Logger(Log.Sync).Debug(ctx, "accountsync client: WAL entry enqueued async", ion.Int("account_count", len(protoaccounts))) - - // ── 2. 
WAL flush ────────────────────────────────────────────────────── - if err := clienthelper.SyncVars.WAL.Flush(); err != nil { - return false, fmt.Errorf("accountsync client: WAL flush failed: %w", err) - } - Log.Logger(Log.Sync).Debug(ctx, "accountsync client: WAL flushed", - ion.Int64("last_flushed_lsn", int64(clienthelper.SyncVars.WAL.GetLastFlushedLSN()))) } else { Log.Logger(Log.Sync).Warn(ctx, "accountsync client: WAL is nil — skipping WAL write", ion.Int("account_count", len(protoaccounts))) } - // ── 3. DB write ─────────────────────────────────────────────────────────── + // ── 2. DB write ─────────────────────────────────────────────────────────── if err := clienthelper.SyncVars.NodeInfo.BlockInfo.NewAccountManager().WriteAccounts(accounts); err != nil { return false, fmt.Errorf("accountsync client: DB write failed: %w", err) } @@ -99,12 +89,9 @@ func (clienthelper *clientHelper) WriteAccounts(protoaccounts []*accountspb.Acco Log.Logger(Log.Sync).Info(ctx, "accountsync client: accounts written to DB", ion.Int("account_count", len(accounts))) - // ── 4. WAL checkpoint ───────────────────────────────────────────────────── + // ── 3. Async checkpoint enqueue ─────────────────────────────────────────── if clienthelper.SyncVars.WAL != nil { - if _, err := clienthelper.SyncVars.WAL.CreateCheckpoint(); err != nil { - Log.Logger(Log.Sync).Warn(ctx, "accountsync client: WAL checkpoint failed after DB write", - ion.Err(err)) - } + clienthelper.SyncVars.WAL.AddCheckpointToBuffer() } return true, nil From 293a5decc9465b0d3c0f457a64f7dfb1f7396b3d Mon Sep 17 00:00:00 2001 From: Neeraj Chowdary <57310710+neerajchowdary889@users.noreply.github.com> Date: Wed, 20 May 2026 18:58:23 +0530 Subject: [PATCH 2/2] Improve WAL flushing error handling and logging Refactor the writeToWAL method in the AsyncBuffer to enhance error handling during the buffer flush operation. 
Introduce a variable to capture the last flushed LSN and log the number of coalesced entries committed to disk upon successful flush. This update improves the clarity of log messages and ensures better tracking of WAL operations. --- common/WAL/async_buffer.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/common/WAL/async_buffer.go b/common/WAL/async_buffer.go index fe67dc0..3e89fe7 100644 --- a/common/WAL/async_buffer.go +++ b/common/WAL/async_buffer.go @@ -166,10 +166,18 @@ func (b *AsyncBuffer) writeToWAL(snapshot []stackEntry) { } } } - if err := b.wal.flushBuffer(); err != nil { - log.Printf("async WAL: flush error: %v", err) + var flushErr error + var lastLSN uint64 + if flushErr = b.wal.flushBuffer(); flushErr != nil { + log.Printf("async WAL: flush error: %v", flushErr) + } else { + lastLSN = b.wal.lastFlushedLSN } b.wal.Mu.Unlock() + if flushErr == nil { + log.Printf("[WAL] %d coalesced entries committed to disk — last LSN %d", + len(se.entries), lastLSN) + } case kindCheckpoint: if _, err := b.wal.CreateCheckpoint(); err != nil {