Skip to content

Commit 4fe27e1

Browse files
chore(posting): remove dead commented-out code
Two large dead-code blocks left over from the original WIP: - posting/index.go: ~175 lines of an alternate InsertTokenizerIndexes implementation, fully commented out. The live implementation directly above it supersedes it; keeping the commented variant just made the file harder to follow. Also drop the scattered "//fmt.Println(...)" leftovers next to live code. - worker/draft_test.go: BenchmarkProcessListIndex was added entirely commented out and references methods (DefaultPipeline, ProcessListWithoutIndex, ProcessListIndex) that don't exist on MutationPipeline. If we want a benchmark for the pipeline, we should write one against the real API. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a2fcc09 commit 4fe27e1

2 files changed

Lines changed: 0 additions & 284 deletions

File tree

posting/index.go

Lines changed: 0 additions & 183 deletions
Original file line numberDiff line numberDiff line change
@@ -284,183 +284,6 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline
284284
return nil
285285
}
286286

287-
// func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline *PredicatePipeline, postings *map[uint64]*pb.PostingList, info predicateInfo) error {
288-
// startTime := time.Now()
289-
// defer func() {
290-
// fmt.Println("Inserting tokenizer indexes for predicate", pipeline.attr, "took", time.Since(startTime))
291-
// }()
292-
293-
// tokenizers := schema.State().Tokenizer(ctx, pipeline.attr)
294-
// if len(tokenizers) == 0 {
295-
// return nil
296-
// }
297-
298-
// values := make(map[string]*pb.PostingList, len(tokenizers)*len(*postings))
299-
// valPost := make(map[string]*pb.Posting)
300-
301-
// indexEdge1 := &pb.DirectedEdge{
302-
// Attr: pipeline.attr,
303-
// }
304-
305-
// for uid, postingList := range *postings {
306-
// fmt.Println("POSTING", uid, postingList)
307-
// for _, posting := range postingList.Postings {
308-
// key := fmt.Sprintf("%s,%s", posting.LangTag, posting.Value)
309-
// valPl, ok := values[key]
310-
// if !ok {
311-
// valPl = &pb.PostingList{}
312-
// }
313-
314-
// indexEdge1.Op = GetPostingOp(posting.Op)
315-
// indexEdge1.ValueId = uid
316-
317-
// mpost := makePostingFromEdge(mp.txn.StartTs, indexEdge1)
318-
// valPl.Postings = append(valPl.Postings, mpost)
319-
// values[key] = valPl
320-
321-
// newPosting := new(pb.Posting)
322-
// newPosting.ValType = posting.ValType
323-
// newPosting.Value = posting.Value
324-
// newPosting.LangTag = posting.LangTag
325-
// valPost[key] = newPosting
326-
// }
327-
// }
328-
329-
// keysCreated := make([]string, 0, len(values))
330-
// for i := range values {
331-
// keysCreated = append(keysCreated, i)
332-
// }
333-
334-
// //fmt.Println("START")
335-
336-
// f := func(numGo int) *types.LockedShardedMap[string, *MutableLayer] {
337-
// wg := &sync.WaitGroup{}
338-
339-
// globalMap := types.NewLockedShardedMap[string, *MutableLayer]()
340-
// process := func(start int) {
341-
// factorySpecs, err := schema.State().FactoryCreateSpec(ctx, pipeline.attr)
342-
// if err != nil {
343-
// pipeline.errCh <- err
344-
// return
345-
// }
346-
347-
// defer wg.Done()
348-
// localMap := make(map[string]*pb.PostingList, len(values)/numGo)
349-
// for i := start; i < len(values); i += numGo {
350-
// key := keysCreated[i]
351-
// valPl := values[key]
352-
// if len(valPl.Postings) == 0 {
353-
// continue
354-
// }
355-
356-
// posting := valPost[key]
357-
// // Build info per iteration without indexEdge.
358-
// info := &indexMutationInfo{
359-
// tokenizers: tokenizers,
360-
// factorySpecs: factorySpecs,
361-
// op: pb.DirectedEdge_SET,
362-
// val: types.Val{
363-
// Tid: types.TypeID(posting.ValType),
364-
// Value: posting.Value,
365-
// },
366-
// }
367-
368-
// info.edge = &pb.DirectedEdge{
369-
// Attr: pipeline.attr,
370-
// Op: pb.DirectedEdge_SET,
371-
// Lang: string(posting.LangTag),
372-
// Value: posting.Value,
373-
// }
374-
375-
// tokens, erri := indexTokens(ctx, info)
376-
// if erri != nil {
377-
// fmt.Println("ERRORRRING", erri)
378-
// x.Panic(erri)
379-
// }
380-
381-
// for _, token := range tokens {
382-
// key := x.IndexKey(pipeline.attr, token)
383-
// pk, _ := x.Parse([]byte(key))
384-
// fmt.Println("TOKENS", key, i, numGo, pk)
385-
// val, ok := localMap[string(key)]
386-
// if !ok {
387-
// val = &pb.PostingList{}
388-
// }
389-
// val.Postings = append(val.Postings, valPl.Postings...)
390-
// localMap[string(key)] = val
391-
// }
392-
// }
393-
394-
// for key, value := range localMap {
395-
// pk, _ := x.Parse([]byte(key))
396-
// fmt.Println("LOCAL MAP", pk, numGo, value)
397-
// globalMap.Update(key, func(val *MutableLayer, ok bool) *MutableLayer {
398-
// if !ok {
399-
// val = newMutableLayer()
400-
// val.currentEntries = &pb.PostingList{}
401-
// }
402-
// for _, posting := range value.Postings {
403-
// val.insertPosting(posting, false)
404-
// }
405-
// return val
406-
// })
407-
// }
408-
// }
409-
410-
// for i := range numGo {
411-
// wg.Add(1)
412-
// go process(i)
413-
// }
414-
415-
// wg.Wait()
416-
417-
// return globalMap
418-
// }
419-
420-
// globalMapI := f(1)
421-
422-
// mp.txn.cache.Lock()
423-
// defer mp.txn.cache.Unlock()
424-
425-
// globalMap := mp.txn.cache.deltas.GetIndexMapForPredicate(pipeline.attr)
426-
// if globalMap == nil {
427-
// globalMap = types.NewLockedShardedMap[string, *pb.PostingList]()
428-
// mp.txn.cache.deltas.indexMap[pipeline.attr] = globalMap
429-
// }
430-
431-
// updateFn := func(key string, value *MutableLayer) {
432-
// globalMap.Update(key, func(val *pb.PostingList, ok bool) *pb.PostingList {
433-
// if !ok {
434-
// val = &pb.PostingList{}
435-
// }
436-
// val.Postings = append(val.Postings, value.currentEntries.Postings...)
437-
// return val
438-
// })
439-
// }
440-
441-
// if info.hasUpsert {
442-
// err := globalMapI.Iterate(func(key string, value *MutableLayer) error {
443-
// updateFn(key, value)
444-
// mp.txn.addConflictKey(farm.Fingerprint64([]byte(key)))
445-
// return nil
446-
// })
447-
// if err != nil {
448-
// return err
449-
// }
450-
// } else {
451-
// err := globalMapI.Iterate(func(key string, value *MutableLayer) error {
452-
// updateFn(key, value)
453-
// mp.txn.addConflictKeyWithUid([]byte(key), value.currentEntries, info.hasUpsert, info.noConflict)
454-
// return nil
455-
// })
456-
// if err != nil {
457-
// return err
458-
// }
459-
// }
460-
461-
// return nil
462-
// }
463-
464287
type predicateInfo struct {
465288
isList bool
466289
index bool
@@ -752,8 +575,6 @@ func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *Predicat
752575
continue
753576
}
754577

755-
//fmt.Println("COUNT STATS", uid, prevCount, newCount, postingList, list.Print())
756-
757578
edge.ValueId = uid
758579
edge.Op = pb.DirectedEdge_DEL
759580
if prevCount > 0 {
@@ -766,7 +587,6 @@ func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *Predicat
766587
}
767588

768589
for c, pl := range countMap {
769-
//fmt.Println("COUNT", c, pl)
770590
ck := x.CountKey(pipeline.attr, uint32(c), isReverseEdge)
771591
if newPl, err := mp.txn.AddDelta(string(ck), pl, true, true); err != nil {
772592
return err
@@ -788,7 +608,6 @@ func (mp *MutationPipeline) ProcessSingle(ctx context.Context, pipeline *Predica
788608

789609
var oldVal *pb.Posting
790610
for edge := range pipeline.edges {
791-
// fmt.Println("SINGLE EDGE", edge)
792611
if edge.Op != pb.DirectedEdge_DEL && !schemaExists {
793612
return errors.Errorf("runMutation: Unable to find schema for %s", edge.Attr)
794613
}
@@ -886,7 +705,6 @@ func (mp *MutationPipeline) ProcessSingle(ctx context.Context, pipeline *Predica
886705
baseKey := string(dataKey[:len(dataKey)-8]) // Avoid repeated conversion
887706

888707
for uid, pl := range postings {
889-
//fmt.Println("ADDING DELTA", uid, pipeline.attr, pl)
890708
binary.BigEndian.PutUint64(dataKey[len(dataKey)-8:], uid)
891709
key := baseKey + string(dataKey[len(dataKey)-8:])
892710

@@ -1102,7 +920,6 @@ func (mp *MutationPipeline) Process(ctx context.Context, edges []*pb.DirectedEdg
1102920
numWg := 0
1103921
eg, egCtx := errgroup.WithContext(ctx)
1104922
for _, edge := range edges {
1105-
//fmt.Println("PROCESSING EDGE", edge)
1106923
if edge.Op == pb.DirectedEdge_DEL && string(edge.Value) == x.Star {
1107924
l, err := mp.txn.Get(x.DataKey(edge.Attr, edge.Entity))
1108925
if err != nil {

worker/draft_test.go

Lines changed: 0 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -38,107 +38,6 @@ func getEntryForCommit(index, startTs, commitTs uint64) raftpb.Entry {
3838
return raftpb.Entry{Index: index, Term: 1, Type: raftpb.EntryNormal, Data: data}
3939
}
4040

41-
// func BenchmarkProcessListIndex(b *testing.B) {
42-
// dir, err := os.MkdirTemp("", "storetest_")
43-
// x.Check(err)
44-
// defer os.RemoveAll(dir)
45-
46-
// opt := badger.DefaultOptions(dir)
47-
// ps, err := badger.OpenManaged(opt)
48-
// x.Check(err)
49-
// pstore = ps
50-
// // Not using posting list cache
51-
// posting.Init(ps, 0, false)
52-
// Init(ps)
53-
// err = schema.ParseBytes([]byte("testAttr: [string] @index(exact) ."), 1)
54-
// require.NoError(b, err)
55-
56-
// ctx := context.Background()
57-
// pipeline := &PredicatePipeline{
58-
// attr: "0-testAttr",
59-
// edges: make(chan *pb.DirectedEdge, 1000),
60-
// wg: &sync.WaitGroup{},
61-
// errCh: make(chan error, 1),
62-
// }
63-
64-
// txn := posting.Oracle().RegisterStartTs(5)
65-
// mp := &MutationPipeline{txn: txn}
66-
67-
// // Generate 1000 edges
68-
// populatePipeline := func() {
69-
// pipeline = &PredicatePipeline{
70-
// attr: "0-testAttr",
71-
// edges: make(chan *pb.DirectedEdge, 1000),
72-
// wg: &sync.WaitGroup{},
73-
// errCh: make(chan error, 1),
74-
// }
75-
76-
// txn = posting.Oracle().RegisterStartTs(5)
77-
// mp = &MutationPipeline{txn: txn}
78-
79-
// for i := 0; i < 1000; i++ {
80-
// edge := &pb.DirectedEdge{
81-
// Entity: uint64(i + 1),
82-
// Attr: "0-testAttr",
83-
// Value: []byte(fmt.Sprintf("value%d", rand.Intn(1000))),
84-
// ValueType: pb.Posting_STRING,
85-
// Op: pb.DirectedEdge_SET,
86-
// }
87-
// pipeline.edges <- edge
88-
// }
89-
// }
90-
91-
// b.ResetTimer()
92-
93-
// b.Run("Baseline", func(b *testing.B) {
94-
// for i := 0; i < b.N; i++ {
95-
// populatePipeline()
96-
// }
97-
// })
98-
99-
// b.Run("DefaultPipeline", func(b *testing.B) {
100-
// for i := 0; i < b.N; i++ {
101-
// populatePipeline()
102-
// var wg sync.WaitGroup
103-
// wg.Add(1)
104-
// go func() {
105-
// mp.DefaultPipeline(ctx, pipeline)
106-
// wg.Done()
107-
// }()
108-
// close(pipeline.edges)
109-
// wg.Wait()
110-
// }
111-
// })
112-
113-
// b.Run("ProcessListWithoutIndex", func(b *testing.B) {
114-
// for i := 0; i < b.N; i++ {
115-
// populatePipeline()
116-
// var wg sync.WaitGroup
117-
// wg.Add(1)
118-
// go func() {
119-
// mp.ProcessListWithoutIndex(ctx, pipeline)
120-
// wg.Done()
121-
// }()
122-
// close(pipeline.edges)
123-
// wg.Wait()
124-
// }
125-
// })
126-
127-
// b.Run("ProcessListIndex", func(b *testing.B) {
128-
// for i := 0; i < b.N; i++ {
129-
// populatePipeline()
130-
// var wg sync.WaitGroup
131-
// wg.Add(1)
132-
// go func() {
133-
// mp.ProcessListIndex(ctx, pipeline)
134-
// wg.Done()
135-
// }()
136-
// close(pipeline.edges)
137-
// wg.Wait()
138-
// }
139-
// })
140-
// }
141-
14241
func TestCalculateSnapshot(t *testing.T) {
14342
dir := t.TempDir()
14443
ds := raftwal.Init(dir)

0 commit comments

Comments
 (0)