Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 82 additions & 41 deletions rag/reindex-freshness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from "./indexer";

// Key under which RedisReindexStateStore.set() stores the latest ReindexState.
const REINDEX_STATE_KEY = "salem:docs:index:last";
// Key under which RedisReindexStateStore.recordCheck() stores the latest
// ReindexHeartbeat.
const REINDEX_HEARTBEAT_KEY = "salem:docs:index:lastcheck";

export type { IndexResult };

Expand All @@ -22,6 +23,9 @@ export interface ReindexState {
/**
 * Storage contract for reindex bookkeeping.
 *
 * - `get` resolves to a `ReindexState`, or `null` when none is available.
 * - `set` stores a `ReindexState`.
 * - `recordCheck` (optional) stores a `ReindexHeartbeat`.
 */
export interface ReindexStateStore {
get(): Promise<ReindexState | null>;
set(state: ReindexState): Promise<void>;
// Optional: record a per-run heartbeat. Stores without it simply skip
// heartbeat tracking (keeps existing/custom stores backward-compatible).
recordCheck?(heartbeat: ReindexHeartbeat): Promise<void>;
}

export interface ReindexDecision {
Expand All @@ -35,6 +39,19 @@ export interface ReindexDecision {
errors: string[];
}

/**
 * Recorded on every reindex evaluation — including skipped/unchanged runs that
 * otherwise leave no trace — so scheduled (cron) runs are observable: read
 * `salem:docs:index:lastcheck` from Redis to see when the cron last fired.
 */
export interface ReindexHeartbeat {
// ISO-8601 timestamp string of the evaluation (produced with
// `new Date().toISOString()` by the caller in this file).
checkedAt: string;
// The `trigger` value passed to the reindex call.
trigger: string;
// Outcome of the evaluation, copied from the resulting ReindexDecision.
status: ReindexDecision["status"];
// Reason for that outcome, copied from the resulting ReindexDecision.
reason: ReindexDecision["reason"];
// Hash of the docs that were evaluated in this run.
docsHash: string;
}

interface ReindexDocsOptions {
trigger: string;
force?: boolean;
Expand All @@ -57,6 +74,10 @@ class RedisReindexStateStore implements ReindexStateStore {
/**
 * Stores `state` under `REINDEX_STATE_KEY` via `this.redis.set`.
 * Any rejection from the underlying client propagates to the caller.
 */
async set(state: ReindexState): Promise<void> {
await this.redis.set(REINDEX_STATE_KEY, state);
}

/**
 * Stores `heartbeat` under `REINDEX_HEARTBEAT_KEY` via `this.redis.set`.
 * Does not catch errors itself; a rejection propagates to the caller.
 */
async recordCheck(heartbeat: ReindexHeartbeat): Promise<void> {
await this.redis.set(REINDEX_HEARTBEAT_KEY, heartbeat);
}
}

export function createRedisReindexStateStore(): ReindexStateStore | null {
Expand Down Expand Up @@ -96,8 +117,10 @@ export async function reindexDocsIfChanged({
const previousHash = previous?.docsHash ?? null;
const stateStorage = store ? (store instanceof RedisReindexStateStore ? "redis" : "custom") : "disabled";

let decision: ReindexDecision;

if (!force && previousHash === docsHash) {
return {
decision = {
status: "skipped",
reason: "unchanged",
docsHash,
Expand All @@ -116,48 +139,66 @@ export async function reindexDocsIfChanged({
stateStorage,
errors: [],
};
} else {
const result = await indexDocs();
const reason = force ? "forced" : previousHash ? "changed" : "first-index";

if (!result.success) {
decision = {
status: "error",
reason: "index-failed",
docsHash,
docsLength: docsText.length,
previousHash,
result,
stateStorage,
errors: result.errors,
};
} else {
if (store) {
await store.set({
docsHash,
docsLength: docsText.length,
docsUrl: LLMS_FULL_URL,
indexedAt: new Date().toISOString(),
trigger,
result: {
pagesProcessed: result.pagesProcessed,
chunksCreated: result.chunksCreated,
uniqueTerms: result.uniqueTerms,
duration: result.duration,
},
});
}

decision = {
status: "indexed",
reason,
docsHash,
docsLength: docsText.length,
previousHash,
result,
stateStorage,
errors: [],
};
}
}

const result = await indexDocs();
const reason = force ? "forced" : previousHash ? "changed" : "first-index";

if (!result.success) {
return {
status: "error",
reason: "index-failed",
docsHash,
docsLength: docsText.length,
previousHash,
result,
stateStorage,
errors: result.errors,
};
}

if (store) {
await store.set({
docsHash,
docsLength: docsText.length,
docsUrl: LLMS_FULL_URL,
indexedAt: new Date().toISOString(),
trigger,
result: {
pagesProcessed: result.pagesProcessed,
chunksCreated: result.chunksCreated,
uniqueTerms: result.uniqueTerms,
duration: result.duration,
},
});
// Heartbeat: record every evaluation (incl. skips) so scheduled runs are
// observable. Best-effort — a heartbeat failure must not fail the reindex.
if (store?.recordCheck) {
try {
await store.recordCheck({
checkedAt: new Date().toISOString(),
trigger,
status: decision.status,
reason: decision.reason,
docsHash,
});
} catch (error) {
console.warn("Reindex heartbeat write failed:", error);
}
}

return {
status: "indexed",
reason,
docsHash,
docsLength: docsText.length,
previousHash,
result,
stateStorage,
errors: [],
};
return decision;
}