@@ -284,183 +284,6 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline
284284 return nil
285285}
286286
287- // func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline *PredicatePipeline, postings *map[uint64]*pb.PostingList, info predicateInfo) error {
288- // startTime := time.Now()
289- // defer func() {
290- // fmt.Println("Inserting tokenizer indexes for predicate", pipeline.attr, "took", time.Since(startTime))
291- // }()
292-
293- // tokenizers := schema.State().Tokenizer(ctx, pipeline.attr)
294- // if len(tokenizers) == 0 {
295- // return nil
296- // }
297-
298- // values := make(map[string]*pb.PostingList, len(tokenizers)*len(*postings))
299- // valPost := make(map[string]*pb.Posting)
300-
301- // indexEdge1 := &pb.DirectedEdge{
302- // Attr: pipeline.attr,
303- // }
304-
305- // for uid, postingList := range *postings {
306- // fmt.Println("POSTING", uid, postingList)
307- // for _, posting := range postingList.Postings {
308- // key := fmt.Sprintf("%s,%s", posting.LangTag, posting.Value)
309- // valPl, ok := values[key]
310- // if !ok {
311- // valPl = &pb.PostingList{}
312- // }
313-
314- // indexEdge1.Op = GetPostingOp(posting.Op)
315- // indexEdge1.ValueId = uid
316-
317- // mpost := makePostingFromEdge(mp.txn.StartTs, indexEdge1)
318- // valPl.Postings = append(valPl.Postings, mpost)
319- // values[key] = valPl
320-
321- // newPosting := new(pb.Posting)
322- // newPosting.ValType = posting.ValType
323- // newPosting.Value = posting.Value
324- // newPosting.LangTag = posting.LangTag
325- // valPost[key] = newPosting
326- // }
327- // }
328-
329- // keysCreated := make([]string, 0, len(values))
330- // for i := range values {
331- // keysCreated = append(keysCreated, i)
332- // }
333-
334- // //fmt.Println("START")
335-
336- // f := func(numGo int) *types.LockedShardedMap[string, *MutableLayer] {
337- // wg := &sync.WaitGroup{}
338-
339- // globalMap := types.NewLockedShardedMap[string, *MutableLayer]()
340- // process := func(start int) {
341- // factorySpecs, err := schema.State().FactoryCreateSpec(ctx, pipeline.attr)
342- // if err != nil {
343- // pipeline.errCh <- err
344- // return
345- // }
346-
347- // defer wg.Done()
348- // localMap := make(map[string]*pb.PostingList, len(values)/numGo)
349- // for i := start; i < len(values); i += numGo {
350- // key := keysCreated[i]
351- // valPl := values[key]
352- // if len(valPl.Postings) == 0 {
353- // continue
354- // }
355-
356- // posting := valPost[key]
357- // // Build info per iteration without indexEdge.
358- // info := &indexMutationInfo{
359- // tokenizers: tokenizers,
360- // factorySpecs: factorySpecs,
361- // op: pb.DirectedEdge_SET,
362- // val: types.Val{
363- // Tid: types.TypeID(posting.ValType),
364- // Value: posting.Value,
365- // },
366- // }
367-
368- // info.edge = &pb.DirectedEdge{
369- // Attr: pipeline.attr,
370- // Op: pb.DirectedEdge_SET,
371- // Lang: string(posting.LangTag),
372- // Value: posting.Value,
373- // }
374-
375- // tokens, erri := indexTokens(ctx, info)
376- // if erri != nil {
377- // fmt.Println("ERRORRRING", erri)
378- // x.Panic(erri)
379- // }
380-
381- // for _, token := range tokens {
382- // key := x.IndexKey(pipeline.attr, token)
383- // pk, _ := x.Parse([]byte(key))
384- // fmt.Println("TOKENS", key, i, numGo, pk)
385- // val, ok := localMap[string(key)]
386- // if !ok {
387- // val = &pb.PostingList{}
388- // }
389- // val.Postings = append(val.Postings, valPl.Postings...)
390- // localMap[string(key)] = val
391- // }
392- // }
393-
394- // for key, value := range localMap {
395- // pk, _ := x.Parse([]byte(key))
396- // fmt.Println("LOCAL MAP", pk, numGo, value)
397- // globalMap.Update(key, func(val *MutableLayer, ok bool) *MutableLayer {
398- // if !ok {
399- // val = newMutableLayer()
400- // val.currentEntries = &pb.PostingList{}
401- // }
402- // for _, posting := range value.Postings {
403- // val.insertPosting(posting, false)
404- // }
405- // return val
406- // })
407- // }
408- // }
409-
410- // for i := range numGo {
411- // wg.Add(1)
412- // go process(i)
413- // }
414-
415- // wg.Wait()
416-
417- // return globalMap
418- // }
419-
420- // globalMapI := f(1)
421-
422- // mp.txn.cache.Lock()
423- // defer mp.txn.cache.Unlock()
424-
425- // globalMap := mp.txn.cache.deltas.GetIndexMapForPredicate(pipeline.attr)
426- // if globalMap == nil {
427- // globalMap = types.NewLockedShardedMap[string, *pb.PostingList]()
428- // mp.txn.cache.deltas.indexMap[pipeline.attr] = globalMap
429- // }
430-
431- // updateFn := func(key string, value *MutableLayer) {
432- // globalMap.Update(key, func(val *pb.PostingList, ok bool) *pb.PostingList {
433- // if !ok {
434- // val = &pb.PostingList{}
435- // }
436- // val.Postings = append(val.Postings, value.currentEntries.Postings...)
437- // return val
438- // })
439- // }
440-
441- // if info.hasUpsert {
442- // err := globalMapI.Iterate(func(key string, value *MutableLayer) error {
443- // updateFn(key, value)
444- // mp.txn.addConflictKey(farm.Fingerprint64([]byte(key)))
445- // return nil
446- // })
447- // if err != nil {
448- // return err
449- // }
450- // } else {
451- // err := globalMapI.Iterate(func(key string, value *MutableLayer) error {
452- // updateFn(key, value)
453- // mp.txn.addConflictKeyWithUid([]byte(key), value.currentEntries, info.hasUpsert, info.noConflict)
454- // return nil
455- // })
456- // if err != nil {
457- // return err
458- // }
459- // }
460-
461- // return nil
462- // }
463-
464287type predicateInfo struct {
465288 isList bool
466289 index bool
@@ -752,8 +575,6 @@ func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *Predicat
752575 continue
753576 }
754577
755- //fmt.Println("COUNT STATS", uid, prevCount, newCount, postingList, list.Print())
756-
757578 edge .ValueId = uid
758579 edge .Op = pb .DirectedEdge_DEL
759580 if prevCount > 0 {
@@ -766,7 +587,6 @@ func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *Predicat
766587 }
767588
768589 for c , pl := range countMap {
769- //fmt.Println("COUNT", c, pl)
770590 ck := x .CountKey (pipeline .attr , uint32 (c ), isReverseEdge )
771591 if newPl , err := mp .txn .AddDelta (string (ck ), pl , true , true ); err != nil {
772592 return err
@@ -788,7 +608,6 @@ func (mp *MutationPipeline) ProcessSingle(ctx context.Context, pipeline *Predica
788608
789609 var oldVal * pb.Posting
790610 for edge := range pipeline .edges {
791- // fmt.Println("SINGLE EDGE", edge)
792611 if edge .Op != pb .DirectedEdge_DEL && ! schemaExists {
793612 return errors .Errorf ("runMutation: Unable to find schema for %s" , edge .Attr )
794613 }
@@ -886,7 +705,6 @@ func (mp *MutationPipeline) ProcessSingle(ctx context.Context, pipeline *Predica
886705 baseKey := string (dataKey [:len (dataKey )- 8 ]) // Avoid repeated conversion
887706
888707 for uid , pl := range postings {
889- //fmt.Println("ADDING DELTA", uid, pipeline.attr, pl)
890708 binary .BigEndian .PutUint64 (dataKey [len (dataKey )- 8 :], uid )
891709 key := baseKey + string (dataKey [len (dataKey )- 8 :])
892710
@@ -1102,7 +920,6 @@ func (mp *MutationPipeline) Process(ctx context.Context, edges []*pb.DirectedEdg
1102920 numWg := 0
1103921 eg , egCtx := errgroup .WithContext (ctx )
1104922 for _ , edge := range edges {
1105- //fmt.Println("PROCESSING EDGE", edge)
1106923 if edge .Op == pb .DirectedEdge_DEL && string (edge .Value ) == x .Star {
1107924 l , err := mp .txn .Get (x .DataKey (edge .Attr , edge .Entity ))
1108925 if err != nil {
0 commit comments