Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
394 changes: 340 additions & 54 deletions cacheaside.go

Large diffs are not rendered by default.

554 changes: 554 additions & 0 deletions cacheaside_test.go

Large diffs are not rendered by default.

48 changes: 48 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package redcache

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// ErrLockLost indicates the distributed lock was lost or expired before the value could be set.
// This can occur if the lock TTL expires during callback execution or if Redis invalidates the lock.
// Compare with errors.Is(err, ErrLockLost); callers may treat this as a transient, retryable condition.
var ErrLockLost = errors.New("lock was lost or expired before value could be set")

// BatchError represents partial failures in a multi-key operation.
// Some keys may have succeeded while others failed.
type BatchError struct {
// Failed maps each failed key to its error.
Failed map[string]error
// Succeeded lists the keys that were set successfully.
Succeeded []string
}

// Error returns a human-readable summary of the batch failure.
func (e *BatchError) Error() string {
var b strings.Builder
fmt.Fprintf(&b, "batch operation partially failed: %d succeeded, %d failed", len(e.Succeeded), len(e.Failed))
for key, err := range e.Failed {
fmt.Fprintf(&b, "; key %q: %s", key, err)
}
return b.String()
}

// HasFailures returns true if any keys failed.
func (e *BatchError) HasFailures() bool {
return len(e.Failed) > 0
}

// NewBatchError creates a BatchError from the given failures and successes.
// Returns nil (untyped) if there are no failures, so it is safe to return
// directly as an error interface value.
func NewBatchError(failed map[string]error, succeeded []string) error {
if len(failed) == 0 {
return nil
}
return &BatchError{
Failed: failed,
Succeeded: succeeded,
}
}
53 changes: 53 additions & 0 deletions errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package redcache_test

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dcbickfo/redcache"
)

func TestBatchError_Error(t *testing.T) {
	batchErr := &redcache.BatchError{
		Failed: map[string]error{
			"key1": errors.New("timeout"),
			"key2": errors.New("lock lost"),
		},
		Succeeded: []string{"key3"},
	}

	got := batchErr.Error()
	// The summary must mention the counts and every failed key.
	for _, fragment := range []string{"1 succeeded", "2 failed", "key1", "key2"} {
		assert.Contains(t, got, fragment)
	}
}

func TestBatchError_HasFailures(t *testing.T) {
	withFailure := &redcache.BatchError{
		Failed:    map[string]error{"key1": errors.New("err")},
		Succeeded: []string{"key2"},
	}
	withoutFailure := &redcache.BatchError{
		Failed:    map[string]error{},
		Succeeded: []string{"key1"},
	}

	assert.True(t, withFailure.HasFailures())
	assert.False(t, withoutFailure.HasFailures())
}

func TestNewBatchError_NilWhenNoFailures(t *testing.T) {
	// An empty failure map must yield an untyped nil error.
	err := redcache.NewBatchError(map[string]error{}, []string{"key1"})
	assert.Nil(t, err)
}

func TestNewBatchError_ReturnsErrorWhenFailures(t *testing.T) {
	wantFailed := map[string]error{"key1": errors.New("oops")}
	wantSucceeded := []string{"key2"}

	err := redcache.NewBatchError(wantFailed, wantSucceeded)
	require.NotNil(t, err)

	// The returned error must unwrap to *BatchError carrying both slices.
	var batchErr *redcache.BatchError
	require.ErrorAs(t, err, &batchErr)
	assert.Equal(t, wantFailed, batchErr.Failed)
	assert.Equal(t, wantSucceeded, batchErr.Succeeded)
}
15 changes: 13 additions & 2 deletions internal/cmdx/slot.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
// Package cmdx provides Redis cluster slot calculation utilities.
package cmdx

// https://redis.io/topics/cluster-spec

const (
	// RedisClusterSlots is the maximum slot number in a Redis cluster (16384 total slots, numbered 0-16383).
	// NOTE: despite the plural name, the value is the highest slot index (16383), not the slot count.
	RedisClusterSlots = 16383
)

// GroupBySlot partitions items by their Redis cluster slot. The slot for each
// item is computed from the key produced by keyFn, so items whose keys hash to
// the same slot land in the same bucket, preserving input order within a bucket.
func GroupBySlot[V any](items []V, keyFn func(V) string) map[uint16][]V {
	bySlot := make(map[uint16][]V)
	for _, it := range items {
		s := Slot(keyFn(it))
		bySlot[s] = append(bySlot[s], it)
	}
	return bySlot
}

// Slot returns the Redis cluster slot for the given key, following the CRC16 hash tag spec.
func Slot(key string) uint16 {
var s, e int
for ; s < len(key); s++ {
Expand Down
37 changes: 37 additions & 0 deletions internal/cmdx/slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,43 @@ func TestSlot_BoundaryValues(t *testing.T) {
}
}

func TestGroupBySlot(t *testing.T) {
	type item struct {
		key   string
		value int
	}

	input := []item{
		{key: "{user:1}:a", value: 1},
		{key: "{user:1}:b", value: 2},
		{key: "{user:2}:a", value: 3},
		{key: "standalone", value: 4},
	}

	groups := cmdx.GroupBySlot(input, func(i item) string { return i.key })

	// Keys sharing the {user:1} hash tag land in one bucket, in input order.
	userOneSlot := cmdx.Slot("{user:1}:a")
	assert.Len(t, groups[userOneSlot], 2)
	assert.Equal(t, 1, groups[userOneSlot][0].value)
	assert.Equal(t, 2, groups[userOneSlot][1].value)

	// A different hash tag gets a bucket of its own.
	userTwoSlot := cmdx.Slot("{user:2}:a")
	assert.Len(t, groups[userTwoSlot], 1)
	assert.Equal(t, 3, groups[userTwoSlot][0].value)

	// An untagged key is also grouped alone.
	plainSlot := cmdx.Slot("standalone")
	assert.Len(t, groups[plainSlot], 1)
	assert.Equal(t, 4, groups[plainSlot][0].value)
}

func TestGroupBySlot_Empty(t *testing.T) {
	identity := func(s string) string { return s }
	// No items means no buckets at all, not empty buckets.
	groups := cmdx.GroupBySlot([]string{}, identity)
	assert.Empty(t, groups)
}

func BenchmarkSlot(b *testing.B) {
keys := []string{
"simple",
Expand Down
37 changes: 37 additions & 0 deletions internal/lockpool/lockpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Package lockpool provides fast lock value generation using an atomic counter
// and a per-instance UUID prefix.
package lockpool

import (
"strconv"
"sync/atomic"

"github.com/google/uuid"
)

// Pool generates unique lock values by combining a fixed instance UUID with an
// atomic counter. This avoids calling uuid.NewV7() per lock, which is expensive
// under high concurrency.
type Pool struct {
	// prefix is prepended verbatim to every generated value.
	prefix string
	// instanceID is fixed at construction and distinguishes values produced
	// by different Pool instances (e.g. different processes).
	instanceID string
	// counter distinguishes successive values from this instance; atomic, so
	// Generate is safe for concurrent use.
	counter atomic.Uint64
}

// New creates a Pool with the given lock prefix (e.g., "__redcache:lock:").
// It draws a single UUIDv7 to identify this instance; the only failure mode
// is an error from the UUID generator.
func New(prefix string) (*Pool, error) {
	instance, err := uuid.NewV7()
	if err != nil {
		return nil, err
	}
	p := &Pool{
		prefix:     prefix,
		instanceID: instance.String(),
	}
	return p, nil
}

// Generate returns a unique lock value: prefix + instanceID + ":" + counter.
// Each call consumes one counter tick; safe for concurrent use.
func (p *Pool) Generate() string {
	return p.prefix + p.instanceID + ":" + strconv.FormatUint(p.counter.Add(1), 10)
}
65 changes: 65 additions & 0 deletions internal/lockpool/lockpool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package lockpool_test

import (
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dcbickfo/redcache/internal/lockpool"
)

func TestPool_Generate_Prefix(t *testing.T) {
	const prefix = "__redcache:lock:"
	pool, err := lockpool.New(prefix)
	require.NoError(t, err)

	got := pool.Generate()
	assert.True(t, strings.HasPrefix(got, prefix), "expected prefix %q, got %q", prefix, got)
}

func TestPool_Generate_Uniqueness(t *testing.T) {
	pool, err := lockpool.New("lock:")
	require.NoError(t, err)

	seen := make(map[string]struct{}, 1000)
	for range 1000 {
		v := pool.Generate()
		_, dup := seen[v]
		assert.False(t, dup, "duplicate lock value: %s", v)
		seen[v] = struct{}{}
	}
}

func TestPool_Generate_ConcurrentSafety(t *testing.T) {
	pool, err := lockpool.New("lock:")
	require.NoError(t, err)

	const (
		goroutines   = 100
		perGoroutine = 100
	)

	// Buffered to full capacity so producers never block on the collector.
	results := make(chan string, goroutines*perGoroutine)
	var wg sync.WaitGroup
	wg.Add(goroutines)
	for range goroutines {
		go func() {
			defer wg.Done()
			for range perGoroutine {
				results <- pool.Generate()
			}
		}()
	}
	wg.Wait()
	close(results)

	// Every value across all goroutines must be distinct.
	seen := make(map[string]struct{}, goroutines*perGoroutine)
	for v := range results {
		_, dup := seen[v]
		assert.False(t, dup, "duplicate lock value under concurrency: %s", v)
		seen[v] = struct{}{}
	}
	assert.Len(t, seen, goroutines*perGoroutine)
}
3 changes: 3 additions & 0 deletions internal/mapsx/maps.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Package mapsx provides generic helpers for map operations.
package mapsx

// Keys returns the keys of the map m in unspecified order.
func Keys[M ~map[K]V, K comparable, V any](m M) []K {
keys := make([]K, 0, len(m))
for k := range m {
Expand All @@ -8,6 +10,7 @@ func Keys[M ~map[K]V, K comparable, V any](m M) []K {
return keys
}

// Values returns the values of the map m in unspecified order.
func Values[M ~map[K]V, K comparable, V any](m M) []V {
values := make([]V, 0, len(m))
for _, v := range m {
Expand Down
2 changes: 2 additions & 0 deletions internal/syncx/map.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Package syncx provides generic typed wrappers around standard library sync primitives.
package syncx

import "sync"

// Map is a generic typed wrapper around sync.Map that avoids interface{} casts at call sites.
// K and V are fixed at the type level, giving compile-time type safety instead of
// runtime assertions. The zero value is ready to use (as with sync.Map itself).
type Map[K comparable, V any] struct {
	m sync.Map
}
Expand Down
56 changes: 31 additions & 25 deletions internal/syncx/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,41 @@ package syncx

import (
"context"
"iter"
"reflect"
"sync"
)

func WaitForAll[C ~<-chan V, V any](ctx context.Context, waitLock iter.Seq[C], length int) error {
cases := setupCases(ctx, waitLock, length)
for range length {
chosen, _, _ := reflect.Select(cases)
if ctx.Err() != nil {
return ctx.Err()
}
cases = append(cases[:chosen], cases[chosen+1:]...)
// WaitForAll blocks until all channels are closed (or receive a value) or the context is cancelled.
// Returns nil if all channels signaled, or the context error if cancelled first.
func WaitForAll[C ~<-chan V, V any](ctx context.Context, channels []C) error {
if len(channels) == 0 {
return nil
}
return nil
}

func setupCases[C ~<-chan V, V any](ctx context.Context, waitLock iter.Seq[C], length int) []reflect.SelectCase {
cases := make([]reflect.SelectCase, length+1)
i := 0
for ch := range waitLock {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
}
i++
// Merged channel: each goroutine sends one signal when its channel fires.
done := make(chan struct{}, len(channels))
var wg sync.WaitGroup
wg.Add(len(channels))

for _, ch := range channels {
go func() {
defer wg.Done()
select {
case <-ch:
done <- struct{}{}
case <-ctx.Done():
}
}()
}
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),

// Wait for all signals or context cancellation.
for range len(channels) {
select {
case <-done:
case <-ctx.Done():
// Ensure all goroutines finish before returning.
wg.Wait()
return ctx.Err()
}
}
return cases
return nil
}
Loading
Loading