diff --git a/common/WAL/WAL.go b/common/WAL/WAL.go index c67b293..dc75101 100644 --- a/common/WAL/WAL.go +++ b/common/WAL/WAL.go @@ -31,6 +31,10 @@ type WAL struct { // lastFlushedLSN tracks the last LSN that was persisted to disk lastFlushedLSN uint64 + + // asyncBuf is the CAS slot for the active async write buffer. + // Nil means no buffer is running; AddToBufferWAL creates one on demand. + asyncBuf atomic.Pointer[AsyncBuffer] } // WALEntry represents a single event in the WAL diff --git a/common/WAL/async_buffer.go b/common/WAL/async_buffer.go new file mode 100644 index 0000000..3e89fe7 --- /dev/null +++ b/common/WAL/async_buffer.go @@ -0,0 +1,238 @@ +package WAL + +import ( + "log" + "sync" + "sync/atomic" + "time" + + wal_types "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/wal" +) + +const ( + asyncFlushInterval = 60 * time.Millisecond + asyncIdleTimeout = 10 * time.Second +) + +type entryKind uint8 + +const ( + kindData entryKind = iota + kindCheckpoint +) + +// pendingEntry holds a pre-serialised WAL event waiting to be assigned an LSN. +// Serialisation happens at enqueue time (no lock); LSN is assigned at flush time +// inside the WAL mutex so ordering is always correct even with multiple buffers. +type pendingEntry struct { + walType wal_types.WALType + data []byte +} + +// stackEntry is one node in the coalescing stack. +// Adjacent data pushes are merged into a single node; checkpoint nodes act as +// barriers and are deduplicated when consecutive. +type stackEntry struct { + kind entryKind + entries []pendingEntry // non-nil for kindData only +} + +// AsyncBuffer is a coalescing write buffer that batches WAL entries and flushes +// them to the underlying WAL every asyncFlushInterval. +// +// Merge rules (applied at push time against the current stack top): +// push data + top=data → append into top entry (merge) +// push data + top=checkpoint → new data node +// push checkpoint + top=checkpoint → deduplicate (no-op) +// push checkpoint + top=data → new checkpoint node +// +// The buffer self-closes after asyncIdleTimeout of inactivity: it drains, +// CAS-nils the global slot, and exits. The next AddToBufferWAL call creates a +// fresh buffer transparently. +type AsyncBuffer struct { + mu sync.Mutex + stack []stackEntry + closed bool + + wal *WAL + global *atomic.Pointer[AsyncBuffer] + + done chan struct{} + drainDone chan struct{} + closeOnce sync.Once + + lastActivity atomic.Int64 // unix nanoseconds of last push +} + +func newAsyncBuffer(w *WAL, global *atomic.Pointer[AsyncBuffer]) *AsyncBuffer { + b := &AsyncBuffer{ + wal: w, + global: global, + done: make(chan struct{}), + drainDone: make(chan struct{}), + } + b.lastActivity.Store(time.Now().UnixNano()) + return b +} + +// pushData enqueues a pre-serialised entry. +// Returns false if the buffer is already closed; the caller must create a new one. +func (b *AsyncBuffer) pushData(e pendingEntry) bool { + b.mu.Lock() + defer b.mu.Unlock() + if b.closed { + return false + } + if len(b.stack) > 0 && b.stack[len(b.stack)-1].kind == kindData { + b.stack[len(b.stack)-1].entries = append(b.stack[len(b.stack)-1].entries, e) + } else { + b.stack = append(b.stack, stackEntry{kind: kindData, entries: []pendingEntry{e}}) + } + b.lastActivity.Store(time.Now().UnixNano()) + return true +} + +// pushCheckpoint enqueues a checkpoint sentinel. +// Returns false if the buffer is closed. +func (b *AsyncBuffer) pushCheckpoint() bool { + b.mu.Lock() + defer b.mu.Unlock() + if b.closed { + return false + } + if len(b.stack) > 0 && b.stack[len(b.stack)-1].kind == kindCheckpoint { + // consecutive checkpoints collapse to one + return true + } + b.stack = append(b.stack, stackEntry{kind: kindCheckpoint}) + b.lastActivity.Store(time.Now().UnixNano()) + return true +} + +// flushAndReport swaps the stack and writes it to WAL. +// Returns true if there was anything to flush. +func (b *AsyncBuffer) flushAndReport() bool { + b.mu.Lock() + if len(b.stack) == 0 { + b.mu.Unlock() + return false + } + snapshot := b.stack + b.stack = make([]stackEntry, 0, cap(snapshot)) + b.mu.Unlock() + + b.writeToWAL(snapshot) + return true +} + +// closeAndDrain marks the buffer closed, CAS-nils the global slot so new writers +// immediately see a nil slot and create a fresh buffer, then writes any remaining +// stack entries to WAL and signals drainDone. +func (b *AsyncBuffer) closeAndDrain() { + b.mu.Lock() + b.closed = true + snapshot := b.stack + b.stack = nil + b.mu.Unlock() + + // Unregister before flush: new writers can create a fresh buffer in parallel + // while this buffer is draining. + b.global.CompareAndSwap(b, nil) + + b.writeToWAL(snapshot) + close(b.drainDone) +} + +// writeToWAL processes a stack snapshot into the underlying WAL. +// For kindData nodes: acquires WAL.Mu, assigns LSNs, appends to WAL buffer, +// then calls flushBuffer once per node. +// For kindCheckpoint nodes: calls WAL.CreateCheckpoint (has its own lock). +func (b *AsyncBuffer) writeToWAL(snapshot []stackEntry) { + for _, se := range snapshot { + switch se.kind { + case kindData: + b.wal.Mu.Lock() + for _, pe := range se.entries { + lsn := b.wal.nextLSN() + b.wal.Buffer = append(b.wal.Buffer, WALEntry{ + Type: pe.walType, + Data: pe.data, + LSN: lsn, + Timestamp: time.Now().Unix(), + }) + if len(b.wal.Buffer) >= b.wal.BatchSize { + if err := b.wal.flushBuffer(); err != nil { + log.Printf("async WAL: mid-batch flush error: %v", err) + } + } + } + var flushErr error + var lastLSN uint64 + if flushErr = b.wal.flushBuffer(); flushErr != nil { + log.Printf("async WAL: flush error: %v", flushErr) + } else { + lastLSN = b.wal.lastFlushedLSN + } + b.wal.Mu.Unlock() + if flushErr == nil { + log.Printf("[WAL] %d coalesced entries committed to disk — last LSN %d", + len(se.entries), lastLSN) + } + + case kindCheckpoint: + if _, err := b.wal.CreateCheckpoint(); err != nil { + log.Printf("async WAL: checkpoint error: %v", err) + } + } + } +} + +// Drain signals the buffer to stop accepting writes, flushes all remaining +// entries to WAL, and blocks until complete. Safe to call multiple times. +func (b *AsyncBuffer) Drain() { + b.closeOnce.Do(func() { close(b.done) }) + <-b.drainDone +} + +// run is the background goroutine: periodic flush every asyncFlushInterval, +// idle shutdown after asyncIdleTimeout of inactivity. +func (b *AsyncBuffer) run() { + ticker := time.NewTicker(asyncFlushInterval) + defer ticker.Stop() + + idleTimer := time.NewTimer(asyncIdleTimeout) + defer idleTimer.Stop() + + resetIdle := func() { + if !idleTimer.Stop() { + select { + case <-idleTimer.C: + default: + } + } + idleTimer.Reset(asyncIdleTimeout) + } + + for { + select { + case <-ticker.C: + if b.flushAndReport() { + resetIdle() + } + + case <-idleTimer.C: + since := time.Since(time.Unix(0, b.lastActivity.Load())) + if since < asyncIdleTimeout { + // spurious fire (activity happened after timer was set) + idleTimer.Reset(asyncIdleTimeout - since) + continue + } + b.closeAndDrain() + return + + case <-b.done: + b.closeAndDrain() + return + } + } +} diff --git a/common/WAL/operations.go b/common/WAL/operations.go index f9d2adf..950e2d5 100644 --- a/common/WAL/operations.go +++ b/common/WAL/operations.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "time" wal_types "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/wal" @@ -297,24 +298,89 @@ func (w *WAL) GetLastFlushedLSN() uint64 { return w.lastFlushedLSN } -// Close ensures all buffered data is flushed before closing the underlying log. +// Close drains the async buffer (if any), flushes all remaining buffered entries, +// then closes the underlying log. func (w *WAL) Close() error { + // Drain async buffer before acquiring Mu: writeToWAL inside Drain needs Mu.Lock. + w.DrainAsyncBuffer() + w.Mu.Lock() defer w.Mu.Unlock() - // Flush remaining buffer if err := w.flushBuffer(); err != nil { return fmt.Errorf("failed to flush on close: %w", err) } - - // Close the WAL if err := w.Log.Close(); err != nil { return fmt.Errorf("failed to close WAL: %w", err) } - return nil } +// AddToBufferWAL serialises event immediately (no lock) and enqueues it into the +// async write buffer. The LSN is assigned and the entry is flushed to disk by the +// background goroutine every asyncFlushInterval (60 ms). +// +// The buffer is created on the first call and reused until it idles out. +// If a buffer is in the process of closing, this spins (rare, nanoseconds) until +// the CAS slot is clear, then creates a fresh buffer. +func (w *WAL) AddToBufferWAL(event wal_types.EventAdapter) error { + data, err := event.Serialize() + if err != nil { + return fmt.Errorf("async WAL: serialize: %w", err) + } + pe := pendingEntry{walType: event.GetType(), data: data} + + for { + buf := w.asyncBuf.Load() + if buf != nil { + if buf.pushData(pe) { + return nil + } + // Buffer is closed but hasn't CAS-nil'd the slot yet. Yield and retry. + runtime.Gosched() + continue + } + nb := newAsyncBuffer(w, &w.asyncBuf) + if w.asyncBuf.CompareAndSwap(nil, nb) { + go nb.run() + nb.pushData(pe) + return nil + } + // Lost the CAS race to another goroutine — retry from top. + } +} + +// AddCheckpointToBuffer enqueues a checkpoint sentinel into the async buffer. +// The checkpoint fires after all data entries already in the buffer have been +// flushed to WAL. Creates a buffer if none is active. +func (w *WAL) AddCheckpointToBuffer() { + for { + buf := w.asyncBuf.Load() + if buf != nil { + if buf.pushCheckpoint() { + return + } + runtime.Gosched() + continue + } + nb := newAsyncBuffer(w, &w.asyncBuf) + if w.asyncBuf.CompareAndSwap(nil, nb) { + go nb.run() + nb.pushCheckpoint() + return + } + } +} + +// DrainAsyncBuffer synchronously flushes all pending async entries to WAL +// and closes the buffer. No-op if no buffer is active. +// Must be called before WAL.Close() and before the process exits. +func (w *WAL) DrainAsyncBuffer() { + if buf := w.asyncBuf.Load(); buf != nil { + buf.Drain() + } +} + // TruncateBefore eliminates all log entries with an LSN lower than the specified value. // CAUTION: This is a destructive operation. Use only after state is safely checkpointed. func (w *WAL) TruncateBefore(lsn uint64) error { diff --git a/core/protocol/router/data_router.go b/core/protocol/router/data_router.go index 6450e44..2db9627 100644 --- a/core/protocol/router/data_router.go +++ b/core/protocol/router/data_router.go @@ -1659,4 +1659,4 @@ func (router *Datarouter) HandleAccountsFetch(ctx context.Context, req *accounts }, } } -} \ No newline at end of file +} diff --git a/core/protocol/router/helper/accounts/client_helper/client_helper.go b/core/protocol/router/helper/accounts/client_helper/client_helper.go index 307ecdd..871be52 100644 --- a/core/protocol/router/helper/accounts/client_helper/client_helper.go +++ b/core/protocol/router/helper/accounts/client_helper/client_helper.go @@ -45,11 +45,10 @@ func (clienthelper *clientHelper) GetSyncVars() *types.Syncvars { // WriteAccounts persists a page of accounts received from the server. // -// If WAL is configured: -// 1. Write AccountSyncEvent to WAL (crash-safe record before DB touch) -// 2. Flush WAL to disk -// 3. Write accounts to DB -// 4. Create WAL checkpoint (mark these entries as applied) +// WAL writes are async: the event is serialised and enqueued into the async +// buffer (no disk I/O on the hot path). The background buffer goroutine flushes +// to disk every 60 ms. The DB write happens immediately without waiting for the +// WAL flush — CreateAccount is idempotent so crash recovery via WAL replay is safe. // // If WAL is nil, writes directly to DB (e.g. in tests). func (clienthelper *clientHelper) WriteAccounts(protoaccounts []*accountspb.Account) (bool, error) { @@ -64,7 +63,7 @@ func (clienthelper *clientHelper) WriteAccounts(protoaccounts []*accountspb.Acco ctx := clienthelper.SyncVars.Ctx - // ── 1. WAL write ────────────────────────────────────────────────────────── + // ── 1. Async WAL enqueue (non-blocking) ─────────────────────────────────── if clienthelper.SyncVars.WAL != nil { event := &WAL.AccountSyncEvent{ BaseEvent: wal_types.BaseEvent{Operation: wal_types.OpAppend}, @@ -72,26 +71,17 @@ func (clienthelper *clientHelper) WriteAccounts(protoaccounts []*accountspb.Acco Accounts: protoaccounts, }, } - lsn, err := clienthelper.SyncVars.WAL.WriteEvent(event) - if err != nil { - return false, fmt.Errorf("accountsync client: WAL write failed: %w", err) + if err := clienthelper.SyncVars.WAL.AddToBufferWAL(event); err != nil { + return false, fmt.Errorf("accountsync client: async WAL enqueue failed: %w", err) } - Log.Logger(Log.Sync).Debug(ctx, "accountsync client: WAL event written", - ion.Int64("lsn", int64(lsn)), + Log.Logger(Log.Sync).Debug(ctx, "accountsync client: WAL entry enqueued async", ion.Int("account_count", len(protoaccounts))) - - // ── 2. WAL flush ────────────────────────────────────────────────────── - if err := clienthelper.SyncVars.WAL.Flush(); err != nil { - return false, fmt.Errorf("accountsync client: WAL flush failed: %w", err) - } - Log.Logger(Log.Sync).Debug(ctx, "accountsync client: WAL flushed", - ion.Int64("last_flushed_lsn", int64(clienthelper.SyncVars.WAL.GetLastFlushedLSN()))) } else { Log.Logger(Log.Sync).Warn(ctx, "accountsync client: WAL is nil — skipping WAL write", ion.Int("account_count", len(protoaccounts))) } - // ── 3. DB write ─────────────────────────────────────────────────────────── + // ── 2. DB write ─────────────────────────────────────────────────────────── if err := clienthelper.SyncVars.NodeInfo.BlockInfo.NewAccountManager().WriteAccounts(accounts); err != nil { return false, fmt.Errorf("accountsync client: DB write failed: %w", err) } @@ -99,12 +89,9 @@ func (clienthelper *clientHelper) WriteAccounts(protoaccounts []*accountspb.Acco Log.Logger(Log.Sync).Info(ctx, "accountsync client: accounts written to DB", ion.Int("account_count", len(accounts))) - // ── 4. WAL checkpoint ───────────────────────────────────────────────────── + // ── 3. Async checkpoint enqueue ─────────────────────────────────────────── if clienthelper.SyncVars.WAL != nil { - if _, err := clienthelper.SyncVars.WAL.CreateCheckpoint(); err != nil { - Log.Logger(Log.Sync).Warn(ctx, "accountsync client: WAL checkpoint failed after DB write", - ion.Err(err)) - } + clienthelper.SyncVars.WAL.AddCheckpointToBuffer() } return true, nil