Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
type App struct {
dcsDivergeTime time.Time
replFailTime time.Time
lostSince time.Time
critical atomic.Value
ctx context.Context
dcs dcs.DCS
Expand Down Expand Up @@ -108,6 +109,22 @@ func (app *App) connectDCS() error {
return nil
}

// reconnectDCS swaps the application's DCS client for a newly connected one.
//
// The client currently held in app.dcs is remembered before connectDCS runs.
// If connectDCS fails, the error is logged, the remembered client is written
// back to app.dcs, and the same error is returned to the caller. If it
// succeeds, the client now in app.dcs gets a disconnect callback that invokes
// handleCritical, the shard is pointed at it through SetDCS, and only after
// that is the previous client closed.
//
// NOTE(review): this relies on connectDCS storing the new client in app.dcs;
// its body is not visible here, so confirm against its implementation.
func (app *App) reconnectDCS() error {
	app.logger.Info("Attempting DCS reconnection after prolonged Lost state")

	previous := app.dcs
	if connErr := app.connectDCS(); connErr != nil {
		app.logger.Error("DCS reconnection failed", slog.Any("error", connErr))
		// Roll back so app.dcs does not keep whatever the failed attempt left there.
		app.dcs = previous
		return connErr
	}

	// Wire up the replacement first; the old client is released last.
	current := app.dcs
	current.SetDisconnectCallback(func() error { return app.handleCritical() })
	app.shard.SetDCS(current)
	previous.Close()

	app.logger.Info("DCS reconnection successful")
	return nil
}

func (app *App) lockDaemonFile() {
app.daemonLock = flock.New(app.config.DaemonLockFile)
if locked, err := app.daemonLock.TryLock(); !locked {
Expand Down Expand Up @@ -184,6 +201,11 @@ func (app *App) Run() int {
if nextState == app.state {
break
}
if nextState == stateLost && app.state != stateLost {
app.lostSince = time.Now()
} else if nextState != stateLost && app.state == stateLost {
app.lostSince = time.Time{}
}
app.state = nextState
}
case <-app.ctx.Done():
Expand Down
10 changes: 10 additions & 0 deletions internal/app/lost.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,22 @@ package app
import (
"fmt"
"log/slog"
"time"
)

func (app *App) stateLost() appState {
if app.dcs.IsConnected() {
return stateCandidate
}
if !app.lostSince.IsZero() && time.Since(app.lostSince) >= app.config.DcsReconnectTimeout {
app.logger.Warn(fmt.Sprintf("Lost state persisted for %s, attempting DCS reconnection", time.Since(app.lostSince).Truncate(time.Second)))
err := app.reconnectDCS()
app.lostSince = time.Now()
if err != nil {
app.logger.Error("DCS reconnection attempt failed, will retry later", slog.Any("error", err))
}
return stateLost
}
if len(app.shard.Hosts()) == 1 {
return stateLost
}
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Config struct {
InfoFileHandlerInterval time.Duration `yaml:"info_file_handler_interval"`
InactivationDelay time.Duration `yaml:"inactivation_delay"`
DcsWaitTimeout time.Duration `yaml:"dcs_wait_timeout"`
DcsReconnectTimeout time.Duration `yaml:"dcs_reconnect_timeout"`
TickInterval time.Duration `yaml:"tick_interval"`
PingStable int `yaml:"ping_stable"`
}
Expand Down Expand Up @@ -178,6 +179,7 @@ func DefaultConfig() (Config, error) {
PprofAddr: "",
Zookeeper: zkConfig,
DcsWaitTimeout: 10 * time.Second,
DcsReconnectTimeout: 2 * time.Minute,
Valkey: DefaultValkeyConfig(),
SentinelMode: sentinelConf,
}
Expand Down
7 changes: 7 additions & 0 deletions internal/valkey/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ func NewShard(config *config.Config, logger *slog.Logger, dcs dcs.DCS) *Shard {
return s
}

// SetDCS replaces the DCS handle stored on the shard with newDCS.
// The assignment is made while holding the shard's lock.
func (s *Shard) SetDCS(newDCS dcs.DCS) {
	s.Lock()
	s.dcs = newDCS
	s.Unlock()
}

// GetShardHostsFromDcs returns current shard hosts from dcs state
func (s *Shard) GetShardHostsFromDcs() ([]string, error) {
fqdns, err := s.dcs.GetChildren(dcs.PathHANodesPrefix)
Expand Down
Loading