Skip to content
This repository was archived by the owner on Jul 4, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 29 additions & 15 deletions bitswap-tuning/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,37 @@ module github.com/ipfs/test-plans/bitswap-tuning
go 1.14

require (
github.com/ipfs/go-bitswap v0.1.9
github.com/ipfs/go-blockservice v0.1.2
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-datastore v0.1.1
github.com/ipfs/go-ipfs-blockstore v0.1.0
github.com/ipfs/go-ipfs-chunker v0.0.3
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/google/gopacket v1.1.18 // indirect
github.com/ipfs/go-bitswap v0.2.19
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-delay v0.0.1
github.com/ipfs/go-ipfs-files v0.0.6
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipld-format v0.0.2
github.com/ipfs/go-merkledag v0.2.4
github.com/ipfs/go-unixfs v0.2.2
github.com/libp2p/go-libp2p v0.4.0
github.com/libp2p/go-libp2p-core v0.3.0
github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multihash v0.0.10
github.com/ipfs/go-ipld-cbor v0.0.4 // indirect
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.2
github.com/ipfs/go-unixfs v0.2.4
github.com/libp2p/go-libp2p v0.10.2
github.com/libp2p/go-libp2p-autonat v0.3.2 // indirect
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-mplex v0.1.3 // indirect
github.com/libp2p/go-reuseport-transport v0.0.4 // indirect
github.com/libp2p/go-sockaddr v0.1.0 // indirect
github.com/libp2p/go-yamux v1.3.8 // indirect
github.com/multiformats/go-multiaddr v0.2.2
github.com/multiformats/go-multihash v0.0.14
github.com/pkg/errors v0.9.1
github.com/testground/sdk-go v0.1.1
github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1 // indirect
github.com/testground/sdk-go v0.2.3
github.com/whyrusleeping/cbor-gen v0.0.0-20200723185710-6a3894a6352b // indirect
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de // indirect
golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642 // indirect
golang.org/x/text v0.3.3 // indirect
)
356 changes: 356 additions & 0 deletions bitswap-tuning/go.sum

Large diffs are not rendered by default.

18 changes: 5 additions & 13 deletions bitswap-tuning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,12 @@ package main

import (
test "github.com/ipfs/test-plans/bitswap-tuning/test"
"github.com/testground/sdk-go/runtime"
"github.com/testground/sdk-go/run"
)

func main() {
runtime.Invoke(run)
}

func run(runenv *runtime.RunEnv) error {
switch c := runenv.TestCase; c {
case "transfer":
return test.Transfer(runenv)
case "fuzz":
return test.Fuzz(runenv)
default:
panic("unrecognized test case")
}
run.InvokeMap(map[string]interface{}{
"transfer": test.Transfer,
"fuzz": test.Fuzz,
})
}
1 change: 1 addition & 0 deletions bitswap-tuning/manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ instances = { min = 2, max = 64, default = 2 }
bandwidth_mb = { type = "int", desc = "bandwidth", unit = "Mib", default = 1024 }
parallel_gen_mb = { type = "int", desc = "maximum allowed size of seed data to generate in parallel", unit = "Mib", default = 100 }


[[testcases]]
name = "fuzz"
instances = { min = 2, max = 64, default = 2 }
Expand Down
173 changes: 173 additions & 0 deletions bitswap-tuning/test/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package test

import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"

"github.com/testground/sdk-go/runtime"
"github.com/testground/sdk-go/sync"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/test-plans/bitswap-tuning/utils"
)

func parseType(ctx context.Context, runenv *runtime.RunEnv, client *sync.DefaultClient, h host.Host, seq int64) (int64, utils.NodeType, int, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI some of this parseType function and getNodeSetSeq is most probably redundant, as with the latest SDKs we have 2 types of global sequence:

  1. one unique global sequence for every single test plan instance (no matter which group it comes from)
  2. one unique group sequence for every instance within a group (something that this part of the code appears to be implementing, as we didn't have it before).

See https://github.com/testground/sdk-go/blob/master/run/init_context.go#L36-L37

leechCount := runenv.IntParam("leech_count")
passiveCount := runenv.IntParam("passive_count")

grpCountOverride := false
if runenv.TestGroupID != "" {
grpLchLabel := runenv.TestGroupID + "_leech_count"
if runenv.IsParamSet(grpLchLabel) {
leechCount = runenv.IntParam(grpLchLabel)
grpCountOverride = true
}
grpPsvLabel := runenv.TestGroupID + "_passive_count"
if runenv.IsParamSet(grpPsvLabel) {
passiveCount = runenv.IntParam(grpPsvLabel)
grpCountOverride = true
}
}

var nodetp utils.NodeType
var tpindex int
grpseq := seq
seqstr := fmt.Sprintf("- seq %d / %d", seq, runenv.TestInstanceCount)
grpPrefix := ""
if grpCountOverride {
grpPrefix = runenv.TestGroupID + " "

var err error
grpseq, err = getNodeSetSeq(ctx, client, h, runenv.TestGroupID)
if err != nil {
return grpseq, nodetp, tpindex, err
}

seqstr = fmt.Sprintf("%s (%d / %d of %s)", seqstr, grpseq, runenv.TestGroupInstanceCount, runenv.TestGroupID)
}

// Note: seq starts at 1 (not 0)
switch {
case grpseq <= int64(leechCount):
nodetp = utils.Leech
tpindex = int(grpseq) - 1
case grpseq > int64(leechCount+passiveCount):
nodetp = utils.Seed
tpindex = int(grpseq) - 1 - (leechCount + passiveCount)
default:
nodetp = utils.Passive
tpindex = int(grpseq) - 1 - leechCount
}

runenv.RecordMessage("I am %s %d %s", grpPrefix+nodetp.String(), tpindex, seqstr)

return grpseq, nodetp, tpindex, nil
}

func getNodeSetSeq(ctx context.Context, client *sync.DefaultClient, h host.Host, setID string) (int64, error) {
topic := sync.NewTopic("nodes"+setID, &peer.AddrInfo{})

return client.Publish(ctx, topic, host.InfoFromHost(h))
}

func setupSeed(ctx context.Context, runenv *runtime.RunEnv, node *utils.Node, fileSize int, seedIndex int) (cid.Cid, error) {
tmpFile := utils.RandReader(fileSize)
ipldNode, err := node.Add(ctx, tmpFile)
if err != nil {
return cid.Cid{}, err
}

if !runenv.IsParamSet("seed_fraction") {
return ipldNode.Cid(), nil
}
seedFrac := runenv.StringParam("seed_fraction")
if seedFrac == "" {
return ipldNode.Cid(), nil
}

parts := strings.Split(seedFrac, "/")
if len(parts) != 2 {
return cid.Cid{}, fmt.Errorf("Invalid seed fraction %s", seedFrac)
}
numerator, nerr := strconv.ParseInt(parts[0], 10, 64)
denominator, derr := strconv.ParseInt(parts[1], 10, 64)
if nerr != nil || derr != nil {
return cid.Cid{}, fmt.Errorf("Invalid seed fraction %s", seedFrac)
}

nodes, err := getLeafNodes(ctx, ipldNode, node.Dserv)
if err != nil {
return cid.Cid{}, err
}
var del []cid.Cid
for i := 0; i < len(nodes); i++ {
idx := i + seedIndex
if idx%int(denominator) >= int(numerator) {
del = append(del, nodes[i].Cid())
}
}
if err := node.Dserv.RemoveMany(ctx, del); err != nil {
return cid.Cid{}, err
}

runenv.RecordMessage("Retained %d / %d of blocks from seed, removed %d / %d blocks", numerator, denominator, len(del), len(nodes))
return ipldNode.Cid(), nil
}

func getLeafNodes(ctx context.Context, node ipld.Node, dserv ipld.DAGService) ([]ipld.Node, error) {
if len(node.Links()) == 0 {
return []ipld.Node{node}, nil
}

var leaves []ipld.Node
for _, l := range node.Links() {
child, err := l.GetNode(ctx, dserv)
if err != nil {
return nil, err
}
childLeaves, err := getLeafNodes(ctx, child, dserv)
if err != nil {
return nil, err
}
leaves = append(leaves, childLeaves...)
}

return leaves, nil
}

func getRootCidTopic(id int) *sync.Topic {
return sync.NewTopic(fmt.Sprintf("root-cid-%d", id), &cid.Cid{})
}

func emitMetrics(runenv *runtime.RunEnv, bsnode *utils.Node, runNum int, seq int64, grpseq int64,
latency time.Duration, bandwidthMB int, fileSize int, nodetp utils.NodeType, tpindex int, timeToFetch time.Duration) error {

stats, err := bsnode.Bitswap.Stat()
if err != nil {
return fmt.Errorf("Error getting stats from Bitswap: %w", err)
}

latencyMS := latency.Milliseconds()
id := fmt.Sprintf("latencyMS:%d/bandwidthMB:%d/run:%d/seq:%d/groupName:%s/groupSeq:%d/fileSize:%d/nodeType:%s/nodeTypeIndex:%d",
latencyMS, bandwidthMB, runNum, seq, runenv.TestGroupID, grpseq, fileSize, nodetp, tpindex)
if nodetp == utils.Leech {
runenv.R().RecordPoint(fmt.Sprintf("%s/name:time_to_fetch", id), float64(timeToFetch))
}
runenv.R().RecordPoint(fmt.Sprintf("%s/name:msgs_rcvd", id), float64(stats.MessagesReceived))
runenv.R().RecordPoint(fmt.Sprintf("%s/name:data_sent", id), float64(stats.DataSent))
runenv.R().RecordPoint(fmt.Sprintf("%s/name:data_rcvd", id), float64(stats.DataReceived))
runenv.R().RecordPoint(fmt.Sprintf("%s/name:dup_data_rcvd", id), float64(stats.DupDataReceived))
runenv.R().RecordPoint(fmt.Sprintf("%s/name:blks_sent", id), float64(stats.BlocksSent))
runenv.R().RecordPoint(fmt.Sprintf("%s/name:blks_rcvd", id), float64(stats.BlocksReceived))
runenv.R().RecordPoint(fmt.Sprintf("%s/name:dup_blks_rcvd", id), float64(stats.DupBlksReceived))

return nil
}
29 changes: 12 additions & 17 deletions bitswap-tuning/test/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ipfs/go-cid"
"golang.org/x/sync/errgroup"

"github.com/testground/sdk-go/network"
"github.com/testground/sdk-go/runtime"
"github.com/testground/sdk-go/sync"

Expand Down Expand Up @@ -54,6 +55,7 @@ func Fuzz(runenv *runtime.RunEnv) error {
defer cancel()

client := sync.MustBoundClient(ctx, runenv)
nwClient := network.NewClient(client, runenv)

/// --- Tear down
defer func() {
Expand Down Expand Up @@ -98,7 +100,7 @@ func Fuzz(runenv *runtime.RunEnv) error {
runenv.RecordMessage("I am %s with addrs: %v", h.ID(), h.Addrs())

// Set up network (with traffic shaping)
err = setupFuzzNetwork(ctx, runenv, client)
err = setupFuzzNetwork(ctx, runenv, nwClient)
if err != nil {
return fmt.Errorf("Failed to set up network: %w", err)
}
Expand Down Expand Up @@ -364,19 +366,13 @@ func Fuzz(runenv *runtime.RunEnv) error {
}

// Set up traffic shaping with random latency and bandwidth
func setupFuzzNetwork(ctx context.Context, runenv *runtime.RunEnv, client *sync.Client) error {
func setupFuzzNetwork(ctx context.Context, runenv *runtime.RunEnv, nwClient *network.Client) error {
if !runenv.TestSidecar {
return nil
}

// Wait for the network to be initialized.
if err := client.WaitNetworkInitialized(ctx, runenv); err != nil {
return err
}

// TODO: just put the unique testplan id inside the runenv?
hostname, err := os.Hostname()
if err != nil {
if err := nwClient.WaitNetworkInitialized(ctx); err != nil {
return err
}

Expand All @@ -385,21 +381,20 @@ func setupFuzzNetwork(ctx context.Context, runenv *runtime.RunEnv, client *sync.
bandwidth := 1 + rnd.Intn(100)

state := sync.State("network-configured")
topic := sync.NetworkTopic(hostname)
cfg := &sync.NetworkConfig{

cfg := &network.Config{
Network: "default",
Enable: true,
Default: sync.LinkShape{
Default: network.LinkShape{
Latency: latency,
Bandwidth: uint64(bandwidth * 1024 * 1024),
Jitter: (latency * 10) / 100,
},
State: state,
CallbackState: state,
CallbackTarget: runenv.TestInstanceCount,
}

_, err = client.PublishAndWait(ctx, topic, cfg, state, runenv.TestInstanceCount)
if err != nil {
return fmt.Errorf("failed to configure network: %w", err)
}
nwClient.ConfigureNetwork(ctx, cfg)

return nil
}
Loading