Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/WAL/WAL.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type WAL struct {

// lastFlushedLSN tracks the last LSN that was persisted to disk
lastFlushedLSN uint64

// asyncBuf is the CAS slot for the active async write buffer.
// Nil means no buffer is running; AddToBufferWAL creates one on demand.
asyncBuf atomic.Pointer[AsyncBuffer]
}

// WALEntry represents a single event in the WAL
Expand Down
238 changes: 238 additions & 0 deletions common/WAL/async_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package WAL

import (
"log"
"sync"
"sync/atomic"
"time"

wal_types "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/wal"
)

const (
asyncFlushInterval = 60 * time.Millisecond
asyncIdleTimeout = 10 * time.Second
)

type entryKind uint8

const (
kindData entryKind = iota
kindCheckpoint
)

// pendingEntry holds a pre-serialised WAL event waiting to be assigned an LSN.
// Serialisation happens at enqueue time (no lock); LSN is assigned at flush time
// inside the WAL mutex so ordering is always correct even with multiple buffers.
type pendingEntry struct {
walType wal_types.WALType
data []byte
}

// stackEntry is one node in the coalescing stack.
// Adjacent data pushes are merged into a single node; checkpoint nodes act as
// barriers and are deduplicated when consecutive.
type stackEntry struct {
kind entryKind
entries []pendingEntry // non-nil for kindData only
}

// AsyncBuffer is a coalescing write buffer that batches WAL entries and flushes
// them to the underlying WAL every asyncFlushInterval.
//
// Merge rules (applied at push time against the current stack top):
// push data + top=data → append into top entry (merge)
// push data + top=checkpoint → new data node
// push checkpoint + top=checkpoint → deduplicate (no-op)
// push checkpoint + top=data → new checkpoint node
//
// The buffer self-closes after asyncIdleTimeout of inactivity: it drains,
// CAS-nils the global slot, and exits. The next AddToBufferWAL call creates a
// fresh buffer transparently.
type AsyncBuffer struct {
mu sync.Mutex
stack []stackEntry
closed bool

wal *WAL
global *atomic.Pointer[AsyncBuffer]

done chan struct{}
drainDone chan struct{}
closeOnce sync.Once

lastActivity atomic.Int64 // unix nanoseconds of last push
}

func newAsyncBuffer(w *WAL, global *atomic.Pointer[AsyncBuffer]) *AsyncBuffer {
b := &AsyncBuffer{
wal: w,
global: global,
done: make(chan struct{}),
drainDone: make(chan struct{}),
}
b.lastActivity.Store(time.Now().UnixNano())
return b
}

// pushData enqueues a pre-serialised entry.
// Returns false if the buffer is already closed; the caller must create a new one.
func (b *AsyncBuffer) pushData(e pendingEntry) bool {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return false
}
if len(b.stack) > 0 && b.stack[len(b.stack)-1].kind == kindData {
b.stack[len(b.stack)-1].entries = append(b.stack[len(b.stack)-1].entries, e)
} else {
b.stack = append(b.stack, stackEntry{kind: kindData, entries: []pendingEntry{e}})
}
b.lastActivity.Store(time.Now().UnixNano())
return true
}

// pushCheckpoint enqueues a checkpoint sentinel.
// Returns false if the buffer is closed.
func (b *AsyncBuffer) pushCheckpoint() bool {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return false
}
if len(b.stack) > 0 && b.stack[len(b.stack)-1].kind == kindCheckpoint {
// consecutive checkpoints collapse to one
return true
}
b.stack = append(b.stack, stackEntry{kind: kindCheckpoint})
b.lastActivity.Store(time.Now().UnixNano())
return true
}

// flushAndReport swaps the stack and writes it to WAL.
// Returns true if there was anything to flush.
func (b *AsyncBuffer) flushAndReport() bool {
b.mu.Lock()
if len(b.stack) == 0 {
b.mu.Unlock()
return false
}
snapshot := b.stack
b.stack = make([]stackEntry, 0, cap(snapshot))
b.mu.Unlock()

b.writeToWAL(snapshot)
return true
}

// closeAndDrain marks the buffer closed, CAS-nils the global slot so new writers
// immediately see a nil slot and create a fresh buffer, then writes any remaining
// stack entries to WAL and signals drainDone.
func (b *AsyncBuffer) closeAndDrain() {
b.mu.Lock()
b.closed = true
snapshot := b.stack
b.stack = nil
b.mu.Unlock()

// Unregister before flush: new writers can create a fresh buffer in parallel
// while this buffer is draining.
b.global.CompareAndSwap(b, nil)

b.writeToWAL(snapshot)
close(b.drainDone)
}

// writeToWAL processes a stack snapshot into the underlying WAL.
// For kindData nodes: acquires WAL.Mu, assigns LSNs, appends to WAL buffer,
// then calls flushBuffer once per node.
// For kindCheckpoint nodes: calls WAL.CreateCheckpoint (has its own lock).
func (b *AsyncBuffer) writeToWAL(snapshot []stackEntry) {
for _, se := range snapshot {
switch se.kind {
case kindData:
b.wal.Mu.Lock()
for _, pe := range se.entries {
lsn := b.wal.nextLSN()
b.wal.Buffer = append(b.wal.Buffer, WALEntry{
Type: pe.walType,
Data: pe.data,
LSN: lsn,
Timestamp: time.Now().Unix(),
})
if len(b.wal.Buffer) >= b.wal.BatchSize {
if err := b.wal.flushBuffer(); err != nil {
log.Printf("async WAL: mid-batch flush error: %v", err)
}
}
}
var flushErr error
var lastLSN uint64
if flushErr = b.wal.flushBuffer(); flushErr != nil {
log.Printf("async WAL: flush error: %v", flushErr)
} else {
lastLSN = b.wal.lastFlushedLSN
}
b.wal.Mu.Unlock()
if flushErr == nil {
log.Printf("[WAL] %d coalesced entries committed to disk — last LSN %d",
len(se.entries), lastLSN)
}

case kindCheckpoint:
if _, err := b.wal.CreateCheckpoint(); err != nil {
log.Printf("async WAL: checkpoint error: %v", err)
}
}
}
}

// Drain signals the buffer to stop accepting writes, flushes all remaining
// entries to WAL, and blocks until complete. Safe to call multiple times.
func (b *AsyncBuffer) Drain() {
b.closeOnce.Do(func() { close(b.done) })
<-b.drainDone
}

// run is the background goroutine: periodic flush every asyncFlushInterval,
// idle shutdown after asyncIdleTimeout of inactivity.
func (b *AsyncBuffer) run() {
ticker := time.NewTicker(asyncFlushInterval)
defer ticker.Stop()

idleTimer := time.NewTimer(asyncIdleTimeout)
defer idleTimer.Stop()

resetIdle := func() {
if !idleTimer.Stop() {
select {
case <-idleTimer.C:
default:
}
}
idleTimer.Reset(asyncIdleTimeout)
}

for {
select {
case <-ticker.C:
if b.flushAndReport() {
resetIdle()
}

case <-idleTimer.C:
since := time.Since(time.Unix(0, b.lastActivity.Load()))
if since < asyncIdleTimeout {
// spurious fire (activity happened after timer was set)
idleTimer.Reset(asyncIdleTimeout - since)
continue
}
b.closeAndDrain()
return

case <-b.done:
b.closeAndDrain()
return
}
}
}
76 changes: 71 additions & 5 deletions common/WAL/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"time"

wal_types "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/wal"
Expand Down Expand Up @@ -297,24 +298,89 @@ func (w *WAL) GetLastFlushedLSN() uint64 {
return w.lastFlushedLSN
}

// Close ensures all buffered data is flushed before closing the underlying log.
// Close drains the async buffer (if any), flushes all remaining buffered entries,
// then closes the underlying log.
func (w *WAL) Close() error {
// Drain async buffer before acquiring Mu: writeToWAL inside Drain needs Mu.Lock.
w.DrainAsyncBuffer()

w.Mu.Lock()
defer w.Mu.Unlock()

// Flush remaining buffer
if err := w.flushBuffer(); err != nil {
return fmt.Errorf("failed to flush on close: %w", err)
}

// Close the WAL
if err := w.Log.Close(); err != nil {
return fmt.Errorf("failed to close WAL: %w", err)
}

return nil
}

// AddToBufferWAL serialises event immediately (no lock) and enqueues it into the
// async write buffer. The LSN is assigned and the entry is flushed to disk by the
// background goroutine every asyncFlushInterval (60 ms).
//
// The buffer is created on the first call and reused until it idles out.
// If a buffer is in the process of closing, this spins (rare, nanoseconds) until
// the CAS slot is clear, then creates a fresh buffer.
func (w *WAL) AddToBufferWAL(event wal_types.EventAdapter) error {
data, err := event.Serialize()
if err != nil {
return fmt.Errorf("async WAL: serialize: %w", err)
}
pe := pendingEntry{walType: event.GetType(), data: data}

for {
buf := w.asyncBuf.Load()
if buf != nil {
if buf.pushData(pe) {
return nil
}
// Buffer is closed but hasn't CAS-nil'd the slot yet. Yield and retry.
runtime.Gosched()
continue
}
nb := newAsyncBuffer(w, &w.asyncBuf)
if w.asyncBuf.CompareAndSwap(nil, nb) {
go nb.run()
nb.pushData(pe)
return nil
}
// Lost the CAS race to another goroutine — retry from top.
}
}

// AddCheckpointToBuffer enqueues a checkpoint sentinel into the async buffer.
// The checkpoint fires after all data entries already in the buffer have been
// flushed to WAL. Creates a buffer if none is active.
func (w *WAL) AddCheckpointToBuffer() {
for {
buf := w.asyncBuf.Load()
if buf != nil {
if buf.pushCheckpoint() {
return
}
runtime.Gosched()
continue
}
nb := newAsyncBuffer(w, &w.asyncBuf)
if w.asyncBuf.CompareAndSwap(nil, nb) {
go nb.run()
nb.pushCheckpoint()
return
}
}
}

// DrainAsyncBuffer synchronously flushes all pending async entries to WAL
// and closes the buffer. No-op if no buffer is active.
// Must be called before WAL.Close() and before the process exits.
func (w *WAL) DrainAsyncBuffer() {
if buf := w.asyncBuf.Load(); buf != nil {
buf.Drain()
}
}

// TruncateBefore eliminates all log entries with an LSN lower than the specified value.
// CAUTION: This is a destructive operation. Use only after state is safely checkpointed.
func (w *WAL) TruncateBefore(lsn uint64) error {
Expand Down
2 changes: 1 addition & 1 deletion core/protocol/router/data_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,4 +1659,4 @@ func (router *Datarouter) HandleAccountsFetch(ctx context.Context, req *accounts
},
}
}
}
}
Loading
Loading