From 857141701b957e99ce40c02714748103f7cdf66b Mon Sep 17 00:00:00 2001 From: Ivan Shakuta Date: Mon, 23 Mar 2026 20:58:51 +0100 Subject: [PATCH 1/3] feat: add streaming mode for prefix analysis (constant memory) The existing prefix command builds a full radix tree in memory, which OOMs on large RDB files. This adds a -streaming flag that uses a flat map[prefix]stats instead, keeping memory usage constant regardless of dump size. Usage: rdb -c prefix -streaming -max-depth 3 -prefix-sep ":" dump.rdb --- cmd.go | 10 ++- go.mod | 4 +- go.sum | 60 ++++++++++++++ helper/prefix_stream.go | 149 +++++++++++++++++++++++++++++++++++ helper/prefix_stream_test.go | 60 ++++++++++++++ 5 files changed, 280 insertions(+), 3 deletions(-) create mode 100644 helper/prefix_stream.go create mode 100644 helper/prefix_stream_test.go diff --git a/cmd.go b/cmd.go index da44bf0..3f4642e 100644 --- a/cmd.go +++ b/cmd.go @@ -74,6 +74,8 @@ func main() { var maxDepth int var concurrent int var showGlobalMeta bool + var sep string + var streaming bool var err error flagSet.StringVar(&cmd, "c", "", "command for rdb: json") flagSet.StringVar(&output, "o", "", "output file path") @@ -86,6 +88,8 @@ func main() { flagSet.StringVar(&expirationExpr, "expire", "", "expiration filter expression") flagSet.StringVar(&sizeExpr, "size", "", "size filter expression") flagSet.BoolVar(&noExpired, "no-expired", false, "filter expired keys(deprecated, please use expire)") + flagSet.StringVar(&sep, "prefix-sep", ":", "separator for streaming prefix analysis") + flagSet.BoolVar(&streaming, "streaming", false, "use streaming mode for prefix analysis (constant memory)") flagSet.BoolVar(&showGlobalMeta, "show-global-meta", false, "Show global meta likes redis-verion/ctime/functions") _ = flagSet.Parse(os.Args[1:]) // ExitOnError src := flagSet.Arg(0) @@ -142,7 +146,11 @@ func main() { case "bigkey": err = helper.FindBiggestKeys(src, n, outputFile, options...) case "prefix": - err = helper.PrefixAnalyse(src, n, maxDepth, outputFile, options...) + if streaming { + err = helper.StreamingPrefixAnalyse(src, n, maxDepth, sep, outputFile, options...) + } else { + err = helper.PrefixAnalyse(src, n, maxDepth, outputFile, options...) + } case "flamegraph": _, err = helper.FlameGraph(src, port, seps, options...) if err != nil { diff --git a/go.mod b/go.mod index e961701..141988c 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module github.com/hdt3213/rdb go 1.16 require ( - github.com/bytedance/sonic v1.12.1 + github.com/bytedance/sonic v1.15.0 github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect - github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect golang.org/x/arch v0.9.0 // indirect golang.org/x/sys v0.24.0 // indirect ) diff --git a/go.sum b/go.sum index 9fc9902..6e2a7af 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,25 @@ +github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= +github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.8.7 h1:d3sry5vGgVq/OpgozRUNP6xBsSo0mtNdwliApw+SAMQ= github.com/bytedance/sonic v1.8.7/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= github.com/bytedance/sonic v1.12.1 h1:jWl5Qz1fy7X1ioY74WqO0KjAMtAGQs4sYnjiEBiyX24= github.com/bytedance/sonic v1.12.1/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= +github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE= +github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/bytedance/sonic/loader v0.2.0 h1:zNprn+lsIP06C/IqCHs3gPQIvnvpKbbxyXQP1iU4kWM= github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE= +github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -20,26 +29,77 @@ github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBF github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= +github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.9.0 h1:ub9TgUInamJ8mrZIGlBG6/4TqWeMszd4N8lNorbrr6k= golang.org/x/arch v0.9.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/helper/prefix_stream.go b/helper/prefix_stream.go new file mode 100644 index 0000000..598f888 --- /dev/null +++ b/helper/prefix_stream.go @@ -0,0 +1,149 @@ +package helper + +import ( + "encoding/csv" + "errors" + "fmt" + "math" + "os" + "sort" + "strconv" + "strings" + + "github.com/hdt3213/rdb/bytefmt" + "github.com/hdt3213/rdb/core" + "github.com/hdt3213/rdb/model" +) + +type prefixStats struct { + size int + keyCount int +} + +// StreamingPrefixAnalyse reads an RDB file and aggregates memory usage by key prefix +// using constant memory (no radix tree). Keys are split by separator up to maxDepth. +func StreamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separator string, output *os.File, options ...interface{}) error { + if rdbFilename == "" { + return errors.New("src file path is required") + } + if topN <= 0 { + topN = math.MaxInt + } + if maxDepth <= 0 { + maxDepth = math.MaxInt + } + + rdbFile, err := os.Open(rdbFilename) + if err != nil { + return fmt.Errorf("open rdb %s failed, %v", rdbFilename, err) + } + defer rdbFile.Close() + + var dec decoder = core.NewDecoder(rdbFile) + if dec, err = wrapDecoder(dec, options...); err != nil { + return err + } + + // flat map: "db:prefix" -> stats. One entry per unique prefix, typically 1K-50K entries. + prefixes := make(map[string]*prefixStats) + var keysSeen int + + err = dec.Parse(func(object model.RedisObject) bool { + key := object.GetKey() + db := object.GetDBIndex() + size := object.GetSize() + + // extract prefix at each depth level and accumulate + var charMode bool + var parts []string + if separator == "" { + // per-character split: prefix at depth N = first N chars of key + charMode = true + for _, ch := range key { + parts = append(parts, string(ch)) + } + } else { + parts = strings.SplitN(key, separator, maxDepth+1) + } + for depth := 1; depth <= maxDepth && depth <= len(parts); depth++ { + var prefix string + if charMode { + prefix = strings.Join(parts[:depth], "") + } else { + prefix = strings.Join(parts[:depth], separator) + if depth < len(parts) { + prefix += separator + "*" + } + } + mapKey := strconv.Itoa(db) + "\x00" + prefix + + s := prefixes[mapKey] + if s == nil { + s = &prefixStats{} + prefixes[mapKey] = s + } + s.size += size + s.keyCount++ + } + + keysSeen++ + if keysSeen%1_000_000 == 0 { + fmt.Fprintf(os.Stderr, "Processed %d keys, %d unique prefixes\n", keysSeen, len(prefixes)) + } + return true + }) + if err != nil { + return err + } + + fmt.Fprintf(os.Stderr, "Done: %d keys, %d unique prefixes\n", keysSeen, len(prefixes)) + + // sort by size descending + type entry struct { + db string + prefix string + size int + keyCount int + } + entries := make([]entry, 0, len(prefixes)) + for mapKey, s := range prefixes { + idx := strings.Index(mapKey, "\x00") + entries = append(entries, entry{ + db: mapKey[:idx], + prefix: mapKey[idx+1:], + size: s.size, + keyCount: s.keyCount, + }) + } + sort.Slice(entries, func(i, j int) bool { + return entries[i].size > entries[j].size + }) + + // write CSV + _, err = output.WriteString("database,prefix,size,size_readable,key_count\n") + if err != nil { + return fmt.Errorf("write header failed: %v", err) + } + csvWriter := csv.NewWriter(output) + defer csvWriter.Flush() + + limit := topN + if limit > len(entries) { + limit = len(entries) + } + for i := 0; i < limit; i++ { + e := entries[i] + err = csvWriter.Write([]string{ + e.db, + e.prefix, + strconv.Itoa(e.size), + bytefmt.FormatSize(uint64(e.size)), + strconv.Itoa(e.keyCount), + }) + if err != nil { + return fmt.Errorf("csv write failed: %v", err) + } + } + + return nil +} diff --git a/helper/prefix_stream_test.go b/helper/prefix_stream_test.go new file mode 100644 index 0000000..14def1a --- /dev/null +++ b/helper/prefix_stream_test.go @@ -0,0 +1,60 @@ +package helper + +import ( + "os" + "path/filepath" + "testing" +) + +func TestStreamingPrefixAnalyse(t *testing.T) { + err := os.MkdirAll("tmp", os.ModePerm) + if err != nil { + return + } + defer func() { + _ = os.RemoveAll("tmp") + }() + srcRdb := filepath.Join("../cases", "tree.rdb") + + // test with empty separator (per-character), depth 0 (unlimited), top 3 + actualFile := filepath.Join("tmp", "stream_tree.csv") + f, err := os.Create(actualFile) + if err != nil { + t.Fatal(err) + } + err = StreamingPrefixAnalyse(srcRdb, 3, 0, "", f, ) + if err != nil { + t.Fatal(err) + } + _ = f.Close() + + expectFile := filepath.Join("../cases", "tree.csv") + equals, err := compareFileByLine(t, actualFile, expectFile) + if err != nil { + t.Fatalf("error comparing files: %v", err) + } + if !equals { + t.Error("streaming prefix top3 result does not match expected") + } + + // test with empty separator, depth 2 + actualFile2 := filepath.Join("tmp", "stream_tree2.csv") + f2, err := os.Create(actualFile2) + if err != nil { + t.Fatal(err) + } + err = StreamingPrefixAnalyse(srcRdb, 0, 2, "", f2) + if err != nil { + t.Fatal(err) + } + _ = f2.Close() + + expectFile2 := filepath.Join("../cases", "tree2.csv") + equals, err = compareFileByLine(t, actualFile2, expectFile2) + if err != nil { + t.Fatalf("error comparing files: %v", err) + } + if !equals { + t.Error("streaming prefix depth2 result does not match expected") + } +} From 65fc49a835db15ff09412b5d8d20fac6ba37912c Mon Sep 17 00:00:00 2001 From: Ivan Shakuta Date: Mon, 23 Mar 2026 22:28:14 +0100 Subject: [PATCH 2/3] fix: avoid storing full keys as prefixes + add memory tracking - Only emit prefix entries where depth < segment count, preventing full keys from being stored as individual "prefixes" (23M -> 2.5K entries) - Add -track-mem flag to print heap/sys stats to stderr every 1M keys - Result: 132M keys / 55GB RDB parsed with ~87MB total memory --- cmd.go | 6 ++++- helper/prefix_stream.go | 54 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/cmd.go b/cmd.go index 3f4642e..cf2bc85 100644 --- a/cmd.go +++ b/cmd.go @@ -76,6 +76,7 @@ func main() { var showGlobalMeta bool var sep string var streaming bool + var trackMem bool var err error flagSet.StringVar(&cmd, "c", "", "command for rdb: json") flagSet.StringVar(&output, "o", "", "output file path") @@ -90,6 +91,7 @@ func main() { flagSet.BoolVar(&noExpired, "no-expired", false, "filter expired keys(deprecated, please use expire)") flagSet.StringVar(&sep, "prefix-sep", ":", "separator for streaming prefix analysis") flagSet.BoolVar(&streaming, "streaming", false, "use streaming mode for prefix analysis (constant memory)") + flagSet.BoolVar(&trackMem, "track-mem", false, "print heap memory usage to stderr (streaming mode only)") flagSet.BoolVar(&showGlobalMeta, "show-global-meta", false, "Show global meta likes redis-verion/ctime/functions") _ = flagSet.Parse(os.Args[1:]) // ExitOnError src := flagSet.Arg(0) @@ -146,7 +148,9 @@ func main() { case "bigkey": err = helper.FindBiggestKeys(src, n, outputFile, options...) case "prefix": - if streaming { + if streaming && trackMem { + err = helper.StreamingPrefixAnalyseWithMemTrack(src, n, maxDepth, sep, outputFile, options...) + } else if streaming { err = helper.StreamingPrefixAnalyse(src, n, maxDepth, sep, outputFile, options...) } else { err = helper.PrefixAnalyse(src, n, maxDepth, outputFile, options...) diff --git a/helper/prefix_stream.go b/helper/prefix_stream.go index 598f888..d7a915e 100644 --- a/helper/prefix_stream.go +++ b/helper/prefix_stream.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "os" + "runtime" "sort" "strconv" "strings" @@ -22,7 +23,18 @@ type prefixStats struct { // StreamingPrefixAnalyse reads an RDB file and aggregates memory usage by key prefix // using constant memory (no radix tree). Keys are split by separator up to maxDepth. +// When trackMem is true, heap usage stats are printed to stderr every 1M keys. func StreamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separator string, output *os.File, options ...interface{}) error { + return streamingPrefixAnalyse(rdbFilename, topN, maxDepth, separator, false, output, options...) +} + +// StreamingPrefixAnalyseWithMemTrack is like StreamingPrefixAnalyse but prints +// heap memory usage to stderr periodically. +func StreamingPrefixAnalyseWithMemTrack(rdbFilename string, topN int, maxDepth int, separator string, output *os.File, options ...interface{}) error { + return streamingPrefixAnalyse(rdbFilename, topN, maxDepth, separator, true, output, options...) +} + +func streamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separator string, trackMem bool, output *os.File, options ...interface{}) error { if rdbFilename == "" { return errors.New("src file path is required") } @@ -65,15 +77,27 @@ func StreamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separato } else { parts = strings.SplitN(key, separator, maxDepth+1) } - for depth := 1; depth <= maxDepth && depth <= len(parts); depth++ { + var limit int + if charMode { + limit = len(parts) + if limit > maxDepth { + limit = maxDepth + } + } else { + // only emit prefixes that actually group keys — + // skip depth == len(parts) since that's the full key, not a prefix + limit = len(parts) - 1 + if limit > maxDepth { + limit = maxDepth + } + } + for depth := 1; depth <= limit; depth++ { var prefix string if charMode { prefix = strings.Join(parts[:depth], "") } else { prefix = strings.Join(parts[:depth], separator) - if depth < len(parts) { - prefix += separator + "*" - } + prefix += separator + "*" } mapKey := strconv.Itoa(db) + "\x00" + prefix @@ -88,7 +112,16 @@ func StreamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separato keysSeen++ if keysSeen%1_000_000 == 0 { - fmt.Fprintf(os.Stderr, "Processed %d keys, %d unique prefixes\n", keysSeen, len(prefixes)) + if trackMem { + var m runtime.MemStats + runtime.ReadMemStats(&m) + fmt.Fprintf(os.Stderr, "Processed %d keys, %d unique prefixes | heap: %s, sys: %s\n", + keysSeen, len(prefixes), + bytefmt.FormatSize(m.HeapAlloc), + bytefmt.FormatSize(m.Sys)) + } else { + fmt.Fprintf(os.Stderr, "Processed %d keys, %d unique prefixes\n", keysSeen, len(prefixes)) + } } return true }) @@ -96,7 +129,16 @@ func StreamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separato return err } - fmt.Fprintf(os.Stderr, "Done: %d keys, %d unique prefixes\n", keysSeen, len(prefixes)) + if trackMem { + var m runtime.MemStats + runtime.ReadMemStats(&m) + fmt.Fprintf(os.Stderr, "Done: %d keys, %d unique prefixes | peak heap: %s, sys: %s\n", + keysSeen, len(prefixes), + bytefmt.FormatSize(m.HeapAlloc), + bytefmt.FormatSize(m.Sys)) + } else { + fmt.Fprintf(os.Stderr, "Done: %d keys, %d unique prefixes\n", keysSeen, len(prefixes)) + } // sort by size descending type entry struct { From 053d988035f6b80c5bfcd0d1efa11932760c6294 Mon Sep 17 00:00:00 2001 From: hdt3213 Date: Sat, 18 Apr 2026 23:32:01 +0800 Subject: [PATCH 3/3] optimize prefix analyze args pattern --- README.md | 23 ++- README_CN.md | 23 ++- cmd.go | 21 ++- cmd_test.go | 5 + go.mod | 12 +- go.sum | 74 --------- helper/prefix_stream.go | 96 +++-------- helper/prefix_stream_test.go | 305 ++++++++++++++++++++++++++++++++--- 8 files changed, 373 insertions(+), 186 deletions(-) diff --git a/README.md b/README.md index d032ecf..bdd8c1d 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,9 @@ Options: -port listen port for flame graph web service -sep separator for flamegraph, rdb will separate key by it, default value is ":". supporting multi separators: -sep sep1 -sep sep2 + -prefix-sep separator for prefix analysis (flat-map mode, constant memory). + when specified, uses separator-based analysis instead of radix tree. + supporting multi separators: -prefix-sep sep1 -prefix-sep sep2 -regex using regex expression filter keys -expire filter keys by its expiration time 1. '1751731200~1751817600' get keys with expiration time in range [1751731200, 1751817600] @@ -86,7 +89,9 @@ parameters between '[' and ']' is optional rdb -c bigkey [-o dump.aof] [-n 10] dump.rdb 5. get number and memory size by prefix rdb -c prefix [-n 10] [-max-depth 3] [-o prefix-report.csv] dump.rdb -6. draw flamegraph +6. get number and memory size by prefix with separator (constant memory) + rdb -c prefix [-n 10] [-max-depth 3] -prefix-sep : [-o prefix-report.csv] dump.rdb +7. draw flamegraph rdb -c flamegraph [-port 16379] [-sep :] dump.rdb ``` @@ -417,6 +422,22 @@ Example: rdb -c prefix -n 10 -max-depth 2 -o prefix.csv cases/memory.rdb ``` +## Separator-based Prefix Analysis (Constant Memory) + +When you specify `-prefix-sep`, RDB uses a flat map instead of a radix tree for prefix analysis. This mode has constant memory usage regardless of the number of keys, making it suitable for very large RDB files. + +You need to specify the separator(s) used in your key naming convention. Multiple separators are supported and will be normalized to the first one. + +```bash +rdb -c prefix -prefix-sep : -n 10 -max-depth 2 -o prefix.csv dump.rdb +``` + +With multiple separators (e.g., keys use both `:` and `.`): + +```bash +rdb -c prefix -prefix-sep : -prefix-sep . -n 10 -o prefix.csv dump.rdb +``` + # Flame Graph In many cases there is not a few very large key but lots of small keys that occupied most memory. diff --git a/README_CN.md b/README_CN.md index c88138e..4ccc7e7 100644 --- a/README_CN.md +++ b/README_CN.md @@ -45,6 +45,9 @@ Options: -port listen port for flame graph web service -sep separator for flamegraph, rdb will separate key by it, default value is ":". supporting multi separators: -sep sep1 -sep sep2 + -prefix-sep separator for prefix analysis (flat-map mode, constant memory). + when specified, uses separator-based analysis instead of radix tree. + supporting multi separators: -prefix-sep sep1 -prefix-sep sep2 -regex using regex expression filter keys -expire filter keys by its expiration time 1. '1751731200~1751817600' get keys with expiration time in range [1751731200, 1751817600] @@ -72,7 +75,9 @@ parameters between '[' and ']' is optional rdb -c bigkey [-o dump.aof] [-n 10] dump.rdb 5. get number and memory size by prefix rdb -c prefix [-n 10] [-max-depth 3] [-o prefix-report.csv] dump.rdb -6. draw flamegraph +6. get number and memory size by prefix with separator (constant memory) + rdb -c prefix [-n 10] [-max-depth 3] -prefix-sep : [-o prefix-report.csv] dump.rdb +7. draw flamegraph rdb -c flamegraph [-port 16379] [-sep :] dump.rdb ``` @@ -343,6 +348,22 @@ Example: rdb -c prefix -n 10 -max-depth 2 -o prefix.csv cases/memory.rdb ``` +## 基于分隔符的前缀分析(恒定内存) + +当指定 `-prefix-sep` 参数时,RDB 将使用 flat map 代替 radix tree 进行前缀分析。这种模式的内存占用不随 key 数量增长,适合分析超大 RDB 文件。 + +您需要指定 key 命名规范中使用的分隔符。支持多个分隔符,所有分隔符会被归一化为第一个。 + +```bash +rdb -c prefix -prefix-sep : -n 10 -max-depth 2 -o prefix.csv dump.rdb +``` + +使用多个分隔符(例如 key 中同时使用 `:` 和 `.`): + +```bash +rdb -c prefix -prefix-sep : -prefix-sep . -n 10 -o prefix.csv dump.rdb +``` + # 火焰图 diff --git a/cmd.go b/cmd.go index cf2bc85..f71c62f 100644 --- a/cmd.go +++ b/cmd.go @@ -18,6 +18,9 @@ Options: -port listen port for flame graph web service -sep separator for flamegraph, rdb will separate key by it, default value is ":". supporting multi separators: -sep sep1 -sep sep2 + -prefix-sep separator for prefix analysis (flat-map mode, constant memory). + when specified, uses separator-based analysis instead of radix tree. + supporting multi separators: -prefix-sep sep1 -prefix-sep sep2 -regex using regex expression filter keys -expire filter keys by its expiration time 1. '1751731200~1751817600' get keys with expiration time in range [1751731200, 1751817600] @@ -45,7 +48,9 @@ parameters between '[' and ']' is optional rdb -c bigkey [-o dump.aof] [-n 10] dump.rdb 5. get number and memory size by prefix rdb -c prefix [-n 10] [-max-depth 3] [-o prefix-report.csv] dump.rdb -6. draw flamegraph +6. get number and memory size by prefix with separator (constant memory) + rdb -c prefix [-n 10] [-max-depth 3] -prefix-sep : [-o prefix-report.csv] dump.rdb +7. draw flamegraph rdb -c flamegraph [-port 16379] [-sep :] dump.rdb ` @@ -74,9 +79,7 @@ func main() { var maxDepth int var concurrent int var showGlobalMeta bool - var sep string - var streaming bool - var trackMem bool + var prefixSeps separators var err error flagSet.StringVar(&cmd, "c", "", "command for rdb: json") flagSet.StringVar(&output, "o", "", "output file path") @@ -89,9 +92,7 @@ func main() { flagSet.StringVar(&expirationExpr, "expire", "", "expiration filter expression") flagSet.StringVar(&sizeExpr, "size", "", "size filter expression") flagSet.BoolVar(&noExpired, "no-expired", false, "filter expired keys(deprecated, please use expire)") - flagSet.StringVar(&sep, "prefix-sep", ":", "separator for streaming prefix analysis") - flagSet.BoolVar(&streaming, "streaming", false, "use streaming mode for prefix analysis (constant memory)") - flagSet.BoolVar(&trackMem, "track-mem", false, "print heap memory usage to stderr (streaming mode only)") + flagSet.Var(&prefixSeps, "prefix-sep", "separator for prefix analysis (flat-map mode, constant memory)") flagSet.BoolVar(&showGlobalMeta, "show-global-meta", false, "Show global meta likes redis-verion/ctime/functions") _ = flagSet.Parse(os.Args[1:]) // ExitOnError src := flagSet.Arg(0) @@ -148,10 +149,8 @@ func main() { case "bigkey": err = helper.FindBiggestKeys(src, n, outputFile, options...) case "prefix": - if streaming && trackMem { - err = helper.StreamingPrefixAnalyseWithMemTrack(src, n, maxDepth, sep, outputFile, options...) - } else if streaming { - err = helper.StreamingPrefixAnalyse(src, n, maxDepth, sep, outputFile, options...) + if len(prefixSeps) > 0 { + err = helper.SepPrefixAnalyse(src, n, maxDepth, prefixSeps, outputFile, options...) } else { err = helper.PrefixAnalyse(src, n, maxDepth, outputFile, options...) } diff --git a/cmd_test.go b/cmd_test.go index a6cb3f4..2acb4ad 100644 --- a/cmd_test.go +++ b/cmd_test.go @@ -63,6 +63,11 @@ func TestCmd(t *testing.T) { if f, _ := os.Stat("tmp/tree.csv"); f == nil { t.Error("command prefix failed") } + os.Args = []string{"", "-c", "prefix", "-prefix-sep", ":", "-o", "tmp/tree_sep.csv", "cases/tree.rdb"} + main() + if f, _ := os.Stat("tmp/tree_sep.csv"); f == nil { + t.Error("command prefix with prefix-sep failed") + } // test error command line os.Args = []string{"", "-c", "json", "-o", "tmp/output", "/none/a"} diff --git a/go.mod b/go.mod index 141988c..5424681 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,15 @@ module github.com/hdt3213/rdb -go 1.16 +go 1.18 + +require github.com/bytedance/sonic v1.15.0 require ( - github.com/bytedance/sonic v1.15.0 - github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect - github.com/cloudwego/iasm v0.2.0 // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic/loader v0.5.0 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect + github.com/klauspost/cpuid/v2 v2.2.9 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect golang.org/x/arch v0.9.0 // indirect golang.org/x/sys v0.24.0 // indirect ) diff --git a/go.sum b/go.sum index 6e2a7af..af983a3 100644 --- a/go.sum +++ b/go.sum @@ -1,108 +1,34 @@ github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= -github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= -github.com/bytedance/sonic v1.8.7 h1:d3sry5vGgVq/OpgozRUNP6xBsSo0mtNdwliApw+SAMQ= -github.com/bytedance/sonic v1.8.7/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= -github.com/bytedance/sonic v1.12.1 h1:jWl5Qz1fy7X1ioY74WqO0KjAMtAGQs4sYnjiEBiyX24= -github.com/bytedance/sonic v1.12.1/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE= github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= -github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= -github.com/bytedance/sonic/loader v0.2.0 h1:zNprn+lsIP06C/IqCHs3gPQIvnvpKbbxyXQP1iU4kWM= -github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= -github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE= github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= -github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= -github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= -github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= -github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= -github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= -github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= -github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU= -golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.9.0 h1:ub9TgUInamJ8mrZIGlBG6/4TqWeMszd4N8lNorbrr6k= golang.org/x/arch v0.9.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= -rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/helper/prefix_stream.go b/helper/prefix_stream.go index d7a915e..a50a2f4 100644 --- a/helper/prefix_stream.go +++ b/helper/prefix_stream.go @@ -6,7 +6,6 @@ import ( "fmt" "math" "os" - "runtime" "sort" "strconv" "strings" @@ -21,23 +20,16 @@ type prefixStats struct { keyCount int } -// StreamingPrefixAnalyse reads an RDB file and aggregates memory usage by key prefix -// using constant memory (no radix tree). Keys are split by separator up to maxDepth. -// When trackMem is true, heap usage stats are printed to stderr every 1M keys. -func StreamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separator string, output *os.File, options ...interface{}) error { - return streamingPrefixAnalyse(rdbFilename, topN, maxDepth, separator, false, output, options...) -} - -// StreamingPrefixAnalyseWithMemTrack is like StreamingPrefixAnalyse but prints -// heap memory usage to stderr periodically. -func StreamingPrefixAnalyseWithMemTrack(rdbFilename string, topN int, maxDepth int, separator string, output *os.File, options ...interface{}) error { - return streamingPrefixAnalyse(rdbFilename, topN, maxDepth, separator, true, output, options...) -} - -func streamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separator string, trackMem bool, output *os.File, options ...interface{}) error { +// SepPrefixAnalyse reads an RDB file and aggregates memory usage by key prefix +// using a flat map (constant memory). Keys are split by the given separators up to maxDepth. +// Multiple separators are normalized to the first one before splitting. +func SepPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separators []string, output *os.File, options ...interface{}) error { if rdbFilename == "" { return errors.New("src file path is required") } + if len(separators) == 0 { + return errors.New("at least one separator is required") + } if topN <= 0 { topN = math.MaxInt } @@ -56,49 +48,33 @@ func streamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separato return err } - // flat map: "db:prefix" -> stats. One entry per unique prefix, typically 1K-50K entries. + primarySep := separators[0] + + // flat map: "db\x00prefix" -> stats prefixes := make(map[string]*prefixStats) - var keysSeen int err = dec.Parse(func(object model.RedisObject) bool { key := object.GetKey() db := object.GetDBIndex() size := object.GetSize() - // extract prefix at each depth level and accumulate - var charMode bool - var parts []string - if separator == "" { - // per-character split: prefix at depth N = first N chars of key - charMode = true - for _, ch := range key { - parts = append(parts, string(ch)) - } - } else { - parts = strings.SplitN(key, separator, maxDepth+1) + // normalize all separators to the primary one + normalizedKey := key + for i := 1; i < len(separators); i++ { + normalizedKey = strings.ReplaceAll(normalizedKey, separators[i], primarySep) } - var limit int - if charMode { - limit = len(parts) - if limit > maxDepth { - limit = maxDepth - } - } else { - // only emit prefixes that actually group keys — - // skip depth == len(parts) since that's the full key, not a prefix - limit = len(parts) - 1 - if limit > maxDepth { - limit = maxDepth - } + + parts := strings.SplitN(normalizedKey, primarySep, maxDepth+1) + + // only emit prefixes that actually group keys — + // skip depth == len(parts) since that's the full key, not a prefix + limit := len(parts) - 1 + if limit > maxDepth { + limit = maxDepth } + for depth := 1; depth <= limit; depth++ { - var prefix string - if charMode { - prefix = strings.Join(parts[:depth], "") - } else { - prefix = strings.Join(parts[:depth], separator) - prefix += separator + "*" - } + prefix := strings.Join(parts[:depth], primarySep) + primarySep + "*" mapKey := strconv.Itoa(db) + "\x00" + prefix s := prefixes[mapKey] @@ -110,36 +86,12 @@ func streamingPrefixAnalyse(rdbFilename string, topN int, maxDepth int, separato s.keyCount++ } - keysSeen++ - if keysSeen%1_000_000 == 0 { - if trackMem { - var m runtime.MemStats - runtime.ReadMemStats(&m) - fmt.Fprintf(os.Stderr, "Processed %d keys, %d unique prefixes | heap: %s, sys: %s\n", - keysSeen, len(prefixes), - bytefmt.FormatSize(m.HeapAlloc), - bytefmt.FormatSize(m.Sys)) - } else { - fmt.Fprintf(os.Stderr, "Processed %d keys, %d unique prefixes\n", keysSeen, len(prefixes)) - } - } return true }) if err != nil { return err } - if trackMem { - var m runtime.MemStats - runtime.ReadMemStats(&m) - fmt.Fprintf(os.Stderr, "Done: %d keys, %d unique prefixes | peak heap: %s, sys: %s\n", - keysSeen, len(prefixes), - bytefmt.FormatSize(m.HeapAlloc), - bytefmt.FormatSize(m.Sys)) - } else { - fmt.Fprintf(os.Stderr, "Done: %d keys, %d unique prefixes\n", keysSeen, len(prefixes)) - } - // sort by size descending type entry struct { db string diff --git a/helper/prefix_stream_test.go b/helper/prefix_stream_test.go index 14def1a..2b2451b 100644 --- a/helper/prefix_stream_test.go +++ b/helper/prefix_stream_test.go @@ -1,60 +1,319 @@ package helper import ( + "bufio" "os" "path/filepath" + "strings" "testing" + + "github.com/hdt3213/rdb/encoder" ) -func TestStreamingPrefixAnalyse(t *testing.T) { +// makeTestRDB creates a temporary RDB file with the given string keys (all values are "v"). +func makeTestRDB(t *testing.T, path string, keys []string) { + t.Helper() + f, err := os.Create(path) + if err != nil { + t.Fatalf("create rdb failed: %v", err) + } + defer f.Close() + + enc := encoder.NewEncoder(f) + if err = enc.WriteHeader(); err != nil { + t.Fatalf("write header: %v", err) + } + if err = enc.WriteDBHeader(0, uint64(len(keys)), 0); err != nil { + t.Fatalf("write db header: %v", err) + } + for _, key := range keys { + if err = enc.WriteStringObject(key, []byte("v")); err != nil { + t.Fatalf("write string %q: %v", key, err) + } + } + if err = enc.WriteEnd(); err != nil { + t.Fatalf("write end: %v", err) + } +} + +// readCSVLines reads a file and returns non-empty lines. +func readCSVLines(t *testing.T, path string) []string { + t.Helper() + f, err := os.Open(path) + if err != nil { + t.Fatalf("open %s: %v", path, err) + } + defer f.Close() + var lines []string + sc := bufio.NewScanner(f) + for sc.Scan() { + line := strings.TrimSpace(sc.Text()) + if line != "" { + lines = append(lines, line) + } + } + return lines +} + +func TestSepPrefixAnalyse(t *testing.T) { + err := os.MkdirAll("tmp", os.ModePerm) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = os.RemoveAll("tmp") + }() + + rdbPath := filepath.Join("tmp", "sep_test.rdb") + makeTestRDB(t, rdbPath, []string{ + "user:alice", + "user:bob", + "user:charlie", + "post:100", + "post:200", + "stat:user:active", + "stat:user:inactive", + "stat:post:views", + "solo", // no separator — should not generate any prefix + }) + + outPath := filepath.Join("tmp", "sep_test.csv") + outFile, err := os.Create(outPath) + if err != nil { + t.Fatal(err) + } + err = SepPrefixAnalyse(rdbPath, 0, 0, []string{":"}, outFile, ) + if err != nil { + t.Fatalf("SepPrefixAnalyse failed: %v", err) + } + _ = outFile.Close() + + lines := readCSVLines(t, outPath) + // header + data lines + if len(lines) < 2 { + t.Fatalf("expected at least 2 lines (header + data), got %d", len(lines)) + } + + // check header + if lines[0] != "database,prefix,size,size_readable,key_count" { + t.Errorf("unexpected header: %s", lines[0]) + } + + // collect prefixes from output + prefixSet := make(map[string]bool) + for _, line := range lines[1:] { + parts := strings.SplitN(line, ",", 3) + if len(parts) >= 2 { + prefixSet[parts[1]] = true + } + } + + // should contain expected prefixes + for _, expected := range []string{"user:*", "post:*", "stat:*", "stat:user:*", "stat:post:*"} { + if !prefixSet[expected] { + t.Errorf("expected prefix %q not found in output", expected) + } + } + + // "solo" has no separator so should NOT appear as a prefix + for prefix := range prefixSet { + if strings.HasPrefix(prefix, "solo") { + t.Errorf("unexpected prefix for key without separator: %s", prefix) + } + } +} + +func TestSepPrefixAnalyseTopN(t *testing.T) { + err := os.MkdirAll("tmp", os.ModePerm) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = os.RemoveAll("tmp") + }() + + rdbPath := filepath.Join("tmp", "sep_topn.rdb") + makeTestRDB(t, rdbPath, []string{ + "a:1", "a:2", "a:3", + "b:1", "b:2", + "c:1", + }) + + outPath := filepath.Join("tmp", "sep_topn.csv") + outFile, err := os.Create(outPath) + if err != nil { + t.Fatal(err) + } + err = SepPrefixAnalyse(rdbPath, 2, 0, []string{":"}, outFile) + if err != nil { + t.Fatalf("SepPrefixAnalyse failed: %v", err) + } + _ = outFile.Close() + + lines := readCSVLines(t, outPath) + // header + at most 2 data lines + dataLines := lines[1:] + if len(dataLines) != 2 { + t.Errorf("expected 2 data lines (topN=2), got %d", len(dataLines)) + } +} + +func TestSepPrefixAnalyseMaxDepth(t *testing.T) { err := os.MkdirAll("tmp", os.ModePerm) if err != nil { - return + t.Fatal(err) } defer func() { _ = os.RemoveAll("tmp") }() - srcRdb := filepath.Join("../cases", "tree.rdb") - // test with empty separator (per-character), depth 0 (unlimited), top 3 - actualFile := filepath.Join("tmp", "stream_tree.csv") - f, err := os.Create(actualFile) + rdbPath := filepath.Join("tmp", "sep_depth.rdb") + makeTestRDB(t, rdbPath, []string{ + "a:b:c:d", // depth 3 key + "a:b:x:y", // depth 3 key + }) + + outPath := filepath.Join("tmp", "sep_depth.csv") + outFile, err := os.Create(outPath) if err != nil { t.Fatal(err) } - err = StreamingPrefixAnalyse(srcRdb, 3, 0, "", f, ) + // maxDepth=1: should only see "a:*", not "a:b:*" or deeper + err = SepPrefixAnalyse(rdbPath, 0, 1, []string{":"}, outFile) + if err != nil { + t.Fatalf("SepPrefixAnalyse failed: %v", err) + } + _ = outFile.Close() + + lines := readCSVLines(t, outPath) + dataLines := lines[1:] + if len(dataLines) != 1 { + t.Errorf("expected 1 prefix with maxDepth=1, got %d", len(dataLines)) + } + if len(dataLines) > 0 && !strings.Contains(dataLines[0], "a:*") { + t.Errorf("expected prefix a:*, got: %s", dataLines[0]) + } +} + +func TestSepPrefixAnalyseMultiSep(t *testing.T) { + err := os.MkdirAll("tmp", os.ModePerm) if err != nil { t.Fatal(err) } - _ = f.Close() + defer func() { + _ = os.RemoveAll("tmp") + }() - expectFile := filepath.Join("../cases", "tree.csv") - equals, err := compareFileByLine(t, actualFile, expectFile) + rdbPath := filepath.Join("tmp", "sep_multi.rdb") + // keys use different separators: ":" and "." + makeTestRDB(t, rdbPath, []string{ + "user:alice", + "user.bob", // "." normalized to ":" + "post:100", + "post.200", // "." normalized to ":" + }) + + outPath := filepath.Join("tmp", "sep_multi.csv") + outFile, err := os.Create(outPath) + if err != nil { + t.Fatal(err) + } + err = SepPrefixAnalyse(rdbPath, 0, 0, []string{":", "."}, outFile) if err != nil { - t.Fatalf("error comparing files: %v", err) + t.Fatalf("SepPrefixAnalyse failed: %v", err) + } + _ = outFile.Close() + + lines := readCSVLines(t, outPath) + prefixMap := make(map[string]string) // prefix -> full line + for _, line := range lines[1:] { + parts := strings.SplitN(line, ",", 3) + if len(parts) >= 2 { + prefixMap[parts[1]] = line + } } - if !equals { - t.Error("streaming prefix top3 result does not match expected") + + // "user.bob" should be normalized to "user:bob", so "user:*" should have 2 keys + if line, ok := prefixMap["user:*"]; ok { + if !strings.HasSuffix(line, ",2") { + t.Errorf("expected user:* key_count=2, got: %s", line) + } + } else { + t.Error("expected prefix user:* not found") + } + + if line, ok := prefixMap["post:*"]; ok { + if !strings.HasSuffix(line, ",2") { + t.Errorf("expected post:* key_count=2, got: %s", line) + } + } else { + t.Error("expected prefix post:* not found") } +} - // test with empty separator, depth 2 - actualFile2 := filepath.Join("tmp", "stream_tree2.csv") - f2, err := os.Create(actualFile2) +func TestSepPrefixAnalyseErrors(t *testing.T) { + err := os.MkdirAll("tmp", os.ModePerm) if err != nil { t.Fatal(err) } - err = StreamingPrefixAnalyse(srcRdb, 0, 2, "", f2) + defer func() { + _ = os.RemoveAll("tmp") + }() + + outFile, _ := os.Create(filepath.Join("tmp", "err.csv")) + defer outFile.Close() + + // empty filename + err = SepPrefixAnalyse("", 0, 0, []string{":"}, outFile) + if err == nil { + t.Error("expected error for empty filename") + } + + // empty separators + err = SepPrefixAnalyse("tmp/test.rdb", 0, 0, []string{}, outFile) + if err == nil { + t.Error("expected error for empty separators") + } + + // non-existent file + err = SepPrefixAnalyse("/nonexistent/file.rdb", 0, 0, []string{":"}, outFile) + if err == nil { + t.Error("expected error for non-existent file") + } +} + +func TestSepPrefixAnalyseNoSepKeys(t *testing.T) { + err := os.MkdirAll("tmp", os.ModePerm) if err != nil { t.Fatal(err) } - _ = f2.Close() + defer func() { + _ = os.RemoveAll("tmp") + }() + + rdbPath := filepath.Join("tmp", "sep_nosep.rdb") + // all keys have no separator + makeTestRDB(t, rdbPath, []string{ + "alpha", + "beta", + "gamma", + }) - expectFile2 := filepath.Join("../cases", "tree2.csv") - equals, err = compareFileByLine(t, actualFile2, expectFile2) + outPath := filepath.Join("tmp", "sep_nosep.csv") + outFile, err := os.Create(outPath) + if err != nil { + t.Fatal(err) + } + err = SepPrefixAnalyse(rdbPath, 0, 0, []string{":"}, outFile) if err != nil { - t.Fatalf("error comparing files: %v", err) + t.Fatalf("SepPrefixAnalyse failed: %v", err) } - if !equals { - t.Error("streaming prefix depth2 result does not match expected") + _ = outFile.Close() + + lines := readCSVLines(t, outPath) + // header only — no prefixes when no keys contain the separator + dataLines := lines[1:] + if len(dataLines) != 0 { + t.Errorf("expected 0 data lines for keys without separator, got %d", len(dataLines)) } }