-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.go
More file actions
364 lines (301 loc) · 13.6 KB
/
database.go
File metadata and controls
364 lines (301 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
package main
import (
"database/sql"; "fmt"; "io/fs"; "log"; "os"; "path/filepath"; "strings"; "time"; "sync"; "runtime"
_ "github.com/mattn/go-sqlite3"
_ "github.com/knaka/go-sqlite3-fts5"
)
// DB is the shared handle to the application's SQLite database.
// It is opened by InitDB and used by every query/upsert helper in this file.
var DB *sql.DB
// Deleting the database files and recreating them is necessary when these options change. For example:
// The getNodeInfo function extracts the outlinks and attachments based on the ONLY_PUBLIC option (using the isServed function), so some lines can be excluded. getNodeInfo is used inside upsertNodes.
// We can't use a column named "public" to determine whether we are going to serve the node, because:
// If ONLY_PUBLIC=yes is set at first, links in the excluded lines are ignored, so outlinks and attachments will not contain them.
// When it is switched to ONLY_PUBLIC=no, the excluded lines should no longer be excluded; however, we never inserted the links from those lines into the database.
// If ONLY_PUBLIC=no is set at first, links in the excluded lines are also inserted into the outlinks and attachments tables.
// When it is switched to ONLY_PUBLIC=yes, we should exclude the links inside the excluded lines; however, they are already in the tables and queries will fetch them.
func syncMarker(cacheDir, filename, envValue, disableValue string) (change bool) {
markerPath := filepath.Join(cacheDir, filename)
_, err := os.Stat(markerPath)
markerExists := (err == nil)
wantsDisabled := envValue == disableValue
// If (marker doesn't exist but the option is wanted) OR (marker exists but the option is not wanted)
if (!markerExists && !wantsDisabled) || (markerExists && wantsDisabled) {
// If the database exists
if _,err = os.Stat(filepath.Join(cacheDir, "mandos.db")); err == nil {
fmt.Printf("%s variable has changed. The database will be regenerated.\n", strings.ToUpper(filename))
}
if !markerExists { os.WriteFile(markerPath, []byte{}, 0644)
} else { os.Remove(markerPath) }
return true // Signal that a change happened
}
return false
}
// checkDatabaseConsistency compares the ONLY_PUBLIC and CONTENT_SEARCH
// environment options against their on-disk markers. If either option changed
// since the last run, the SQLite database and its WAL side files are deleted so
// the whole database gets rebuilt with the new settings.
func checkDatabaseConsistency(cacheDir string) {
	// Both markers must be synchronized unconditionally, so evaluate them first.
	onlyPublicChanged := syncMarker(cacheDir, "only_public", getEnvValue("ONLY_PUBLIC"), "no")
	searchChanged := syncMarker(cacheDir, "content_search", getEnvValue("CONTENT_SEARCH"), "false")
	if !onlyPublicChanged && !searchChanged {
		return
	}
	// An option changed: remove the database together with its -shm/-wal files.
	for _, name := range []string{"mandos.db", "mandos.db-shm", "mandos.db-wal"} {
		os.Remove(filepath.Join(cacheDir, name))
	}
}
// InitDB prepares the cache directory, reconciles the option markers, opens
// (or creates) the SQLite database, applies performance pragmas and makes sure
// the schema exists. Any failure during initialization is fatal.
func InitDB() {
	cacheDir := getEnvValue("CACHE_FOLDER")
	if err := os.MkdirAll(cacheDir, 0755); err != nil {
		log.Fatalln("Cache dir could not be created.", err)
	}
	checkDatabaseConsistency(cacheDir)
	// Open (creates file if not exists)
	var err error
	DB, err = sql.Open("sqlite3", "file:"+filepath.Join(cacheDir, "mandos.db"))
	if err != nil {
		log.Fatal(err)
	}
	// Ensure connection is alive
	if err := DB.Ping(); err != nil {
		log.Fatal(err)
	}
	// Optional pragmas for performance; failures here are non-fatal by design.
	for _, pragma := range []string{
		"PRAGMA journal_mode=WAL;",   // Enable parallel reading on writes.
		"PRAGMA synchronous=NORMAL;", // Fewer fsyncs; safe enough with WAL.
		"PRAGMA foreign_keys = ON;",  // Enable foreign keys.
	} {
		_, _ = DB.Exec(pragma)
	}
	// Create tables if they don't exist
	if err := ensureSchema(DB); err != nil {
		log.Fatal(err)
	}
}
// ensureSchema creates all tables, indexes and (optionally) the FTS5 virtual
// table plus its delete-sync trigger, if they don't already exist. Everything
// runs inside one transaction so a partially created schema is never committed.
func ensureSchema(db *sql.DB) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()
	// Nodes: file is text (filepath), mtime as INTEGER, date as INTEGER (unix seconds), title TEXT
	_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS nodes (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		file TEXT UNIQUE,
		mtime INTEGER NOT NULL,
		date INTEGER,
		title TEXT
	);
	CREATE INDEX IF NOT EXISTS idx_node_file ON nodes(file);
	CREATE INDEX IF NOT EXISTS idx_node_date ON nodes(date);
	`)
	if err != nil {
		return err
	}
	_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS outlinks (
		"from" TEXT NOT NULL,
		"to" TEXT NOT NULL,
		PRIMARY KEY ("from", "to"),
		FOREIGN KEY ("from") REFERENCES nodes(file) ON DELETE CASCADE
	) WITHOUT ROWID;
	CREATE INDEX IF NOT EXISTS idx_outlink_to ON outlinks("to");
	`)
	if err != nil {
		return err
	}
	// FIX: SQLite requires a comma between table constraints; the comma between
	// PRIMARY KEY and FOREIGN KEY was missing here (and in params below), which
	// made these CREATE TABLE statements fail with a syntax error.
	_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS attachments (
		"from" TEXT NOT NULL,
		file TEXT NOT NULL,
		PRIMARY KEY ("from", file),
		FOREIGN KEY ("from") REFERENCES nodes(file) ON DELETE CASCADE
	) WITHOUT ROWID;
	CREATE INDEX IF NOT EXISTS idx_attachment_file ON attachments(file);
	`)
	if err != nil {
		return err
	}
	// Params: one row per (from, key, value). The composite primary key prevents duplicates.
	_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS params (
		"from" TEXT NOT NULL,
		key TEXT NOT NULL,
		value TEXT NOT NULL,
		PRIMARY KEY ("from", key, value),
		FOREIGN KEY ("from") REFERENCES nodes(file) ON DELETE CASCADE
	) WITHOUT ROWID;
	CREATE INDEX IF NOT EXISTS idx_params_key_val_from ON params(key, value, "from");
	CREATE INDEX IF NOT EXISTS idx_params_from ON params("from");
	`)
	if err != nil {
		return err
	}
	// FTS5 Virtual Table for content searching
	if getEnvValue("CONTENT_SEARCH") == "true" {
		_, err = tx.Exec(`CREATE VIRTUAL TABLE IF NOT EXISTS nodes_fts USING fts5(
			title, content,
			content='', contentless_delete=1,
			tokenize="unicode61 remove_diacritics 2 tokenchars '#'"
		);`)
		if err != nil {
			return err
		}
		// Sync On Delete (Like ON DELETE CASCADE); virtual tables cannot use foreign keys.
		_, err = tx.Exec(`CREATE TRIGGER IF NOT EXISTS nodes_ad AFTER DELETE ON nodes BEGIN
			DELETE FROM nodes_fts WHERE rowid = old.id;
		END;`)
		if err != nil {
			return err
		}
		// We first delete, then insert. No need for update sync.
	}
	return tx.Commit()
}
// sqlNodeMtimes holds the modification times of the nodes currently in the db.
// key: path of the markdown node, considering notesPath as root
// value: modification time of the node (unix seconds)
var sqlNodeMtimes = make(map[string]int64)
// Synchronize the filesystem with the database. Update modified nodes, remove deleted nodes and add new nodes.
func initialSyncWithDB() {
fmt.Println("Syncing the database with the filesystem.")
syncStartTime := time.Now()
rows, err := DB.Query(`SELECT file, mtime FROM nodes;`)
if err != nil { log.Fatalln(err) }
defer rows.Close()
for rows.Next() {
var file string; var mtime int64
if err := rows.Scan(&file, &mtime); err != nil { log.Fatalln(err) }
sqlNodeMtimes[file] = mtime
}
var newNodes = make(map[string]int64) // New and modified nodes.
err = filepath.WalkDir(notesPath, func(npath string, d fs.DirEntry, err error) error {
if err != nil {return err}
fileName := filepath.Base(d.Name())
// Get only the non-hidden markdown files
if !d.IsDir() && strings.HasSuffix(fileName, ".md") && !strings.HasPrefix(fileName,".") {
relPath := strings.TrimPrefix(npath, notesPath)
fileinf,err := d.Info(); if err!=nil{ log.Println(relPath, err) }
mTime := fileinf.ModTime().Unix()
// Add to the newNodes table if it does not exists in the sqlNodeMtimes map or has a modified mtime
if sqlNodeMtimes[relPath] == 0 || (sqlNodeMtimes[relPath] != 0 && mTime != sqlNodeMtimes[relPath]) { newNodes[relPath]=mTime }
// Delete the node from the sqlNodeMtimes map if it exists in the filesystem. The remaining will be the deleted nodes.
if sqlNodeMtimes[relPath] != 0 { delete(sqlNodeMtimes, relPath) }
// TODO: This also skips the directories named "static" and "mandos" that are not in the root. Fix that.
}else if d.IsDir() && (d.Name() == "mandos") { return filepath.SkipDir }
return nil
})
if err != nil {fmt.Println("Error walking the path:", err)}
// The remaining sqlNodeMtimes fields are deleted ones. If they were exist in the filesystem, the code above would remove them from the map.
// Delete them from the database.
var deletedNodes []string
for deletedId := range sqlNodeMtimes {deletedNodes = append(deletedNodes, deletedId)}
deleteNodes(deletedNodes)
fmt.Println(len(deletedNodes), "node(s) are deleted from the database.")
// Add new nodes and update updated
fmt.Println(upsertNodes(newNodes), "node(s) are upserted in the database.")
fmt.Printf("Database synchronization is completed in %v ms\n", time.Since(syncStartTime).Milliseconds())
}
// deleteNodes removes the given node files from the nodes table in a single
// transaction (cascading to outlinks/attachments/params via foreign keys) and
// evicts each node from the in-memory cache. Errors are logged, not fatal.
func deleteNodes(nodeIds []string) {
	if len(nodeIds) == 0 {
		return
	}
	tx, err := DB.Begin()
	if err != nil {
		log.Println(err)
		return
	}
	defer tx.Rollback()
	// FIX: the Prepare error was previously discarded; a failed Prepare would
	// have caused a nil-pointer panic on the statement below.
	delNodes, err := tx.Prepare(`DELETE FROM nodes WHERE file = ?`)
	if err != nil {
		log.Println(err)
		return
	}
	defer delNodes.Close()
	for _, id := range nodeIds {
		if _, err := delNodes.Exec(id); err != nil {
			log.Println("Error deleting node:", id, err)
		}
		// Remove the node from the cache.
		nodeCache.Delete(id)
	}
	if err := tx.Commit(); err != nil {
		log.Println(err)
	}
}
// upsertNodes (re)inserts the given nodes (relative path -> mtime) into the
// database. Node files are parsed concurrently by NumCPU worker goroutines;
// the resulting rows are written by this goroutine inside one transaction.
// Each node's old rows are deleted first, so nodes that became private are
// removed and not re-inserted. Returns the number of nodes actually inserted.
func upsertNodes(nodeIdMTimeMap map[string]int64) (count int) {
	if len(nodeIdMTimeMap) == 0 {
		return 0
	}
	// 1. Setup Channel and WaitGroup
	type result struct {
		node  Node
		mtime int64
	}
	batchSize := 1000
	// Channel to hold the results of getNodeInfo
	jobs := make(chan result, batchSize)
	var wg sync.WaitGroup
	// Distribute work
	pathChan := make(chan string, batchSize/2)
	for range runtime.NumCPU() {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for path := range pathChan {
				node, err := getNodeInfo(path, false)
				if err != nil {
					log.Println("Error getting node info:", path, err)
					continue
				}
				// Update the node in the cache if it exists, without moving it forward.
				nodeCache.Update(node.File, node)
				jobs <- result{node: node, mtime: nodeIdMTimeMap[path]}
			}
		}()
	}
	// Feed workers with filepaths and close the path channel
	go func() {
		for p := range nodeIdMTimeMap {
			pathChan <- p
		}
		close(pathChan)
		wg.Wait()
		close(jobs) // Close jobs once all workers are done
	}()
	// drainJobs unblocks the workers if we bail out before consuming the whole
	// channel; otherwise they could block forever on a full jobs channel.
	drainJobs := func() {
		for range jobs {
		}
	}
	// FIX: the Begin/Prepare errors were previously discarded, which would have
	// caused nil-pointer panics on the transaction or statements below.
	tx, err := DB.Begin() // Start the transaction
	if err != nil {
		log.Println("Error starting transaction:", err)
		drainJobs()
		return 0
	}
	defer tx.Rollback() // Rollback if a critical error happens.
	// --- PREPARE STATEMENTS --- //
	// prepare wraps tx.Prepare and records the first error it encounters.
	var prepErr error
	prepare := func(query string) *sql.Stmt {
		stmt, err := tx.Prepare(query)
		if err != nil && prepErr == nil {
			prepErr = err
		}
		return stmt
	}
	// For cleaning the non-public nodes and old attachments, params and outlinks that no longer exist.
	delNodes := prepare(`DELETE FROM nodes WHERE file = ?`)
	stmtNode := prepare(`INSERT INTO nodes (file, mtime, date, title) VALUES (?, ?, ?, ?)`)
	stmtLink := prepare(`INSERT INTO outlinks ("from", "to") VALUES (?, ?)`)
	stmtAtt := prepare(`INSERT INTO attachments ("from", "file") VALUES (?, ?)`)
	// Using INSERT OR IGNORE to handle potential duplicate params/tags gracefully
	stmtParam := prepare(`INSERT OR IGNORE INTO params ("from", "key", "value") VALUES (?, ?, ?)`)
	// Read the option once instead of once per row.
	contentSearch := getEnvValue("CONTENT_SEARCH") == "true"
	var stmtNodeFTS *sql.Stmt
	if contentSearch {
		stmtNodeFTS = prepare(`INSERT INTO nodes_fts (rowid, title, content) VALUES (?, ?, ?)`)
	}
	if prepErr != nil {
		log.Println("Error preparing statements:", prepErr)
		drainJobs()
		return 0
	}
	defer delNodes.Close()
	defer stmtNode.Close()
	defer stmtLink.Close()
	defer stmtAtt.Close()
	defer stmtParam.Close()
	if stmtNodeFTS != nil {
		defer stmtNodeFTS.Close()
	}
	// This loop runs in the main thread, pulling data as it becomes available
	for res := range jobs {
		node, mtime := res.node, res.mtime
		// Delete existing node. This will also delete the private nodes.
		if _, err := delNodes.Exec(node.File); err != nil {
			log.Println("Error deleting node:", node.File, err)
		}
		// Skip the private nodes. We will not reinsert them.
		if !isServed(node.Public) {
			continue
		}
		// Insert the node
		insertRes, err := stmtNode.Exec(node.File, mtime, node.Date, node.Title)
		// If it gives an error, skip inserting things related to this node completely.
		if err != nil {
			log.Println("Error inserting node:", node.File, err)
			continue
		}
		// Insert the index of the node content.
		if contentSearch {
			newNodeRowId, err := insertRes.LastInsertId()
			if err != nil {
				log.Println("Error while getting node's last insert id:", node.File, err)
			}
			if _, err = stmtNodeFTS.Exec(newNodeRowId, node.Title, node.Content); err != nil {
				log.Println("Error while inserting index of the node content:", node.File, err)
			}
		}
		// Insert Outlinks
		for _, target := range node.OutLinks {
			if _, err := stmtLink.Exec(node.File, target); err != nil {
				log.Println(node.File, target, err)
			}
		}
		// Insert Attachments
		for _, att := range node.Attachments {
			if _, err := stmtAtt.Exec(node.File, att); err != nil {
				log.Println(node.File, att, err)
			}
		}
		// Insert Params. Params is map[string]any, but values can only be string or []string
		for key, val := range node.Params {
			switch v := val.(type) {
			case string:
				stmtParam.Exec(node.File, key, v)
			case []string:
				for _, subVal := range v {
					stmtParam.Exec(node.File, key, subVal)
				}
			default:
				// Use fmt.Sprint as a fallback for unexpected value types.
				stmtParam.Exec(node.File, key, fmt.Sprint(val))
			}
		}
		count++
	}
	// Do the final commit. If it fails, nothing was persisted.
	if err := tx.Commit(); err != nil {
		log.Println("Error committing transaction:", err)
		return 0
	}
	return count
}
// Execute the queryStr with queryVals values, then return the rows in []map[string]any where key is the column name and value is the column value.
func Query(queryStr string, queryVals []any) (returnData []map[string]any) {
// Prefer the cached data.
returnData, exists := queryCache.Get(GetQueryKey(queryStr, queryVals...))
if exists {return returnData}
rows, err := DB.Query(queryStr, queryVals...)
if err!=nil{log.Println(err); return returnData}
defer rows.Close()
// Get the column names
columns,err := rows.Columns()
if err!=nil{log.Println("Columns error:",err); return returnData}
for rows.Next() {
// Prepare a slice of 'any' to hold the data and a slice of pointers to those 'any'
values := make([]any, len(columns))
valuePointers := make([]any, len(columns))
for i := range values { valuePointers[i] = &values[i] }
// Scan columns in the row and set their values to values slice.
if err := rows.Scan(valuePointers...); err != nil {
log.Println("failed to scan node row: %w", err); return returnData
}
rowMap := make(map[string]any)
for i, colName := range columns {
val := values[i]
// SQLite returns texts as []byte. We need to convert them to strings.
if b, ok := val.([]byte); ok { rowMap[colName] = string(b)
} else { rowMap[colName] = val }
}
returnData = append(returnData, rowMap)
}
// Cache the returned data if not empty.
if len(returnData) > 0 {
queryCache.Set(GetQueryKey(queryStr, queryVals...), returnData, time.Second*10)
}
return returnData
}