replace comment out tests with t.Skip()

This commit is contained in:
Jianrong 2021-08-29 11:47:19 +10:00
parent 7f581be3be
commit d17f1fdd75
11 changed files with 837 additions and 772 deletions

View file

@ -245,6 +245,15 @@ func loadKeyStoreTestV1(file string, t *testing.T) map[string]KeyStoreTestV1 {
return tests
}
func TestKeyForDirectICAP(t *testing.T) {
t.Skip("Test unresponsive")
t.Parallel()
key := NewKeyForDirectICAP(rand.Reader)
if !strings.HasPrefix(key.Address.Hex(), "0x00") {
t.Errorf("Expected first address byte to be zero, have: %s", key.Address.Hex())
}
}
func TestV3_31_Byte_Key(t *testing.T) {
t.Parallel()
tests := loadKeyStoreTestV3("testdata/v3_test_vector.json", t)

View file

@ -19,7 +19,15 @@
package main
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/log"
)
type testFile struct {
@ -53,197 +61,198 @@ func TestCLISwarmFsDefaultIPCPath(t *testing.T) {
// and without any log messages in the log:
// /Library/Filesystems/osxfuse.fs/Contents/Resources/load_osxfuse.
// This is the reason for this file not being built on darwin architecture.
// func TestCLISwarmFs(t *testing.T) {
// cluster := newTestCluster(t, 3)
// defer cluster.Shutdown()
func TestCLISwarmFs(t *testing.T) {
t.Skip("Test fail on travis")
cluster := newTestCluster(t, 3)
defer cluster.Shutdown()
// // create a tmp dir
// mountPoint, err := ioutil.TempDir("", "swarm-test")
// log.Debug("swarmfs cli test", "1st mount", mountPoint)
// if err != nil {
// t.Fatal(err)
// }
// defer os.RemoveAll(mountPoint)
// create a tmp dir
mountPoint, err := ioutil.TempDir("", "swarm-test")
log.Debug("swarmfs cli test", "1st mount", mountPoint)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(mountPoint)
// handlingNode := cluster.Nodes[0]
// mhash := doUploadEmptyDir(t, handlingNode)
// log.Debug("swarmfs cli test: mounting first run", "ipc path", filepath.Join(handlingNode.Dir, handlingNode.IpcPath))
handlingNode := cluster.Nodes[0]
mhash := doUploadEmptyDir(t, handlingNode)
log.Debug("swarmfs cli test: mounting first run", "ipc path", filepath.Join(handlingNode.Dir, handlingNode.IpcPath))
// mount := runSwarm(t, []string{
// fmt.Sprintf("--%s", utils.IPCPathFlag.Name), filepath.Join(handlingNode.Dir, handlingNode.IpcPath),
// "fs",
// "mount",
// mhash,
// mountPoint,
// }...)
// mount.ExpectExit()
mount := runSwarm(t, []string{
fmt.Sprintf("--%s", utils.IPCPathFlag.Name), filepath.Join(handlingNode.Dir, handlingNode.IpcPath),
"fs",
"mount",
mhash,
mountPoint,
}...)
mount.ExpectExit()
// filesToAssert := []*testFile{}
filesToAssert := []*testFile{}
// dirPath, err := createDirInDir(mountPoint, "testSubDir")
// if err != nil {
// t.Fatal(err)
// }
// dirPath2, err := createDirInDir(dirPath, "AnotherTestSubDir")
// if err != nil {
// t.Fatal(err)
// }
dirPath, err := createDirInDir(mountPoint, "testSubDir")
if err != nil {
t.Fatal(err)
}
dirPath2, err := createDirInDir(dirPath, "AnotherTestSubDir")
if err != nil {
t.Fatal(err)
}
// dummyContent := "somerandomtestcontentthatshouldbeasserted"
// dirs := []string{
// mountPoint,
// dirPath,
// dirPath2,
// }
// files := []string{"f1.tmp", "f2.tmp"}
// for _, d := range dirs {
// for _, entry := range files {
// tFile, err := createTestFileInPath(d, entry, dummyContent)
// if err != nil {
// t.Fatal(err)
// }
// filesToAssert = append(filesToAssert, tFile)
// }
// }
// if len(filesToAssert) != len(dirs)*len(files) {
// t.Fatalf("should have %d files to assert now, got %d", len(dirs)*len(files), len(filesToAssert))
// }
// //hashRegexp := `[a-f\d]{64}`
// //log.Debug("swarmfs cli test: unmounting first run...", "ipc path", filepath.Join(handlingNode.Dir, handlingNode.IpcPath))
dummyContent := "somerandomtestcontentthatshouldbeasserted"
dirs := []string{
mountPoint,
dirPath,
dirPath2,
}
files := []string{"f1.tmp", "f2.tmp"}
for _, d := range dirs {
for _, entry := range files {
tFile, err := createTestFileInPath(d, entry, dummyContent)
if err != nil {
t.Fatal(err)
}
filesToAssert = append(filesToAssert, tFile)
}
}
if len(filesToAssert) != len(dirs)*len(files) {
t.Fatalf("should have %d files to assert now, got %d", len(dirs)*len(files), len(filesToAssert))
}
//hashRegexp := `[a-f\d]{64}`
//log.Debug("swarmfs cli test: unmounting first run...", "ipc path", filepath.Join(handlingNode.Dir, handlingNode.IpcPath))
// // unmount := runSwarm(t, []string{
// // fmt.Sprintf("--%s", utils.IPCPathFlag.Name), filepath.Join(handlingNode.Dir, handlingNode.IpcPath),
// // "fs",
// // "unmount",
// // mountPoint,
// // }...)
// // _, matches := unmount.ExpectRegexp(hashRegexp)
// // unmount.ExpectExit()
// //
// // hash := matches[0]
// // if hash == mhash {
// // t.Fatal("this should not be equal")
// // }
// // log.Debug("swarmfs cli test: asserting no files in mount point")
// //
// // //check that there's nothing in the mount folder
// // filesInDir, err := ioutil.ReadDir(mountPoint)
// // if err != nil {
// // t.Fatalf("had an error reading the directory: %v", err)
// // }
// //
// // if len(filesInDir) != 0 {
// // t.Fatal("there shouldn't be anything here")
// // }
// //
// // secondMountPoint, err := ioutil.TempDir("", "swarm-test")
// // log.Debug("swarmfs cli test", "2nd mount point at", secondMountPoint)
// // if err != nil {
// // t.Fatal(err)
// // }
// // defer os.RemoveAll(secondMountPoint)
// //
// // log.Debug("swarmfs cli test: remounting at second mount point", "ipc path", filepath.Join(handlingNode.Dir, handlingNode.IpcPath))
// //
// // //remount, check files
// // newMount := runSwarm(t, []string{
// // fmt.Sprintf("--%s", utils.IPCPathFlag.Name), filepath.Join(handlingNode.Dir, handlingNode.IpcPath),
// // "fs",
// // "mount",
// // hash, // the latest hash
// // secondMountPoint,
// // }...)
// //
// // newMount.ExpectExit()
// // time.Sleep(1 * time.Second)
// //
// // filesInDir, err = ioutil.ReadDir(secondMountPoint)
// // if err != nil {
// // t.Fatal(err)
// // }
// //
// // if len(filesInDir) == 0 {
// // t.Fatal("there should be something here")
// // }
// //
// // log.Debug("swarmfs cli test: traversing file tree to see it matches previous mount")
// //
// // for _, file := range filesToAssert {
// // file.filePath = strings.Replace(file.filePath, mountPoint, secondMountPoint, -1)
// // fileBytes, err := ioutil.ReadFile(file.filePath)
// //
// // if err != nil {
// // t.Fatal(err)
// // }
// // if !bytes.Equal(fileBytes, bytes.NewBufferString(file.content).Bytes()) {
// // t.Fatal("this should be equal")
// // }
// // }
// //
// // log.Debug("swarmfs cli test: unmounting second run", "ipc path", filepath.Join(handlingNode.Dir, handlingNode.IpcPath))
// //
// // unmountSec := runSwarm(t, []string{
// // fmt.Sprintf("--%s", utils.IPCPathFlag.Name), filepath.Join(handlingNode.Dir, handlingNode.IpcPath),
// // "fs",
// // "unmount",
// // secondMountPoint,
// // }...)
// //
// // _, matches = unmountSec.ExpectRegexp(hashRegexp)
// // unmountSec.ExpectExit()
// //
// // if matches[0] != hash {
// // t.Fatal("these should be equal - no changes made")
// // }
// }
// unmount := runSwarm(t, []string{
// fmt.Sprintf("--%s", utils.IPCPathFlag.Name), filepath.Join(handlingNode.Dir, handlingNode.IpcPath),
// "fs",
// "unmount",
// mountPoint,
// }...)
// _, matches := unmount.ExpectRegexp(hashRegexp)
// unmount.ExpectExit()
//
// hash := matches[0]
// if hash == mhash {
// t.Fatal("this should not be equal")
// }
// log.Debug("swarmfs cli test: asserting no files in mount point")
//
// //check that there's nothing in the mount folder
// filesInDir, err := ioutil.ReadDir(mountPoint)
// if err != nil {
// t.Fatalf("had an error reading the directory: %v", err)
// }
//
// if len(filesInDir) != 0 {
// t.Fatal("there shouldn't be anything here")
// }
//
// secondMountPoint, err := ioutil.TempDir("", "swarm-test")
// log.Debug("swarmfs cli test", "2nd mount point at", secondMountPoint)
// if err != nil {
// t.Fatal(err)
// }
// defer os.RemoveAll(secondMountPoint)
//
// log.Debug("swarmfs cli test: remounting at second mount point", "ipc path", filepath.Join(handlingNode.Dir, handlingNode.IpcPath))
//
// //remount, check files
// newMount := runSwarm(t, []string{
// fmt.Sprintf("--%s", utils.IPCPathFlag.Name), filepath.Join(handlingNode.Dir, handlingNode.IpcPath),
// "fs",
// "mount",
// hash, // the latest hash
// secondMountPoint,
// }...)
//
// newMount.ExpectExit()
// time.Sleep(1 * time.Second)
//
// filesInDir, err = ioutil.ReadDir(secondMountPoint)
// if err != nil {
// t.Fatal(err)
// }
//
// if len(filesInDir) == 0 {
// t.Fatal("there should be something here")
// }
//
// log.Debug("swarmfs cli test: traversing file tree to see it matches previous mount")
//
// for _, file := range filesToAssert {
// file.filePath = strings.Replace(file.filePath, mountPoint, secondMountPoint, -1)
// fileBytes, err := ioutil.ReadFile(file.filePath)
//
// if err != nil {
// t.Fatal(err)
// }
// if !bytes.Equal(fileBytes, bytes.NewBufferString(file.content).Bytes()) {
// t.Fatal("this should be equal")
// }
// }
//
// log.Debug("swarmfs cli test: unmounting second run", "ipc path", filepath.Join(handlingNode.Dir, handlingNode.IpcPath))
//
// unmountSec := runSwarm(t, []string{
// fmt.Sprintf("--%s", utils.IPCPathFlag.Name), filepath.Join(handlingNode.Dir, handlingNode.IpcPath),
// "fs",
// "unmount",
// secondMountPoint,
// }...)
//
// _, matches = unmountSec.ExpectRegexp(hashRegexp)
// unmountSec.ExpectExit()
//
// if matches[0] != hash {
// t.Fatal("these should be equal - no changes made")
// }
}
// func doUploadEmptyDir(t *testing.T, node *testNode) string {
// // create a tmp dir
// tmpDir, err := ioutil.TempDir("", "swarm-test")
// if err != nil {
// t.Fatal(err)
// }
// defer os.RemoveAll(tmpDir)
func doUploadEmptyDir(t *testing.T, node *testNode) string {
// create a tmp dir
tmpDir, err := ioutil.TempDir("", "swarm-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
// hashRegexp := `[a-f\d]{64}`
hashRegexp := `[a-f\d]{64}`
// flags := []string{
// "--bzzapi", node.URL,
// "--recursive",
// "up",
// tmpDir}
flags := []string{
"--bzzapi", node.URL,
"--recursive",
"up",
tmpDir}
// log.Info("swarmfs cli test: uploading dir with 'swarm up'")
// up := runSwarm(t, flags...)
// _, matches := up.ExpectRegexp(hashRegexp)
// up.ExpectExit()
// hash := matches[0]
// log.Info("swarmfs cli test: dir uploaded", "hash", hash)
// return hash
// }
log.Info("swarmfs cli test: uploading dir with 'swarm up'")
up := runSwarm(t, flags...)
_, matches := up.ExpectRegexp(hashRegexp)
up.ExpectExit()
hash := matches[0]
log.Info("swarmfs cli test: dir uploaded", "hash", hash)
return hash
}
// func createDirInDir(createInDir string, dirToCreate string) (string, error) {
// fullpath := filepath.Join(createInDir, dirToCreate)
// err := os.MkdirAll(fullpath, 0777)
// if err != nil {
// return "", err
// }
// return fullpath, nil
// }
func createDirInDir(createInDir string, dirToCreate string) (string, error) {
fullpath := filepath.Join(createInDir, dirToCreate)
err := os.MkdirAll(fullpath, 0777)
if err != nil {
return "", err
}
return fullpath, nil
}
// func createTestFileInPath(dir, filename, content string) (*testFile, error) {
// tFile := &testFile{}
// filePath := filepath.Join(dir, filename)
// if file, err := os.Create(filePath); err == nil {
// tFile.content = content
// tFile.filePath = filePath
func createTestFileInPath(dir, filename, content string) (*testFile, error) {
tFile := &testFile{}
filePath := filepath.Join(dir, filename)
if file, err := os.Create(filePath); err == nil {
tFile.content = content
tFile.filePath = filePath
// _, err = io.WriteString(file, content)
// if err != nil {
// return nil, err
// }
// file.Close()
// }
_, err = io.WriteString(file, content)
if err != nil {
return nil, err
}
file.Close()
}
// return tFile, nil
// }
return tFile, nil
}

View file

@ -16,305 +16,306 @@
package protocols
// import (
// "context"
// "flag"
// "fmt"
// "io/ioutil"
// "math/rand"
// "os"
// "path/filepath"
// "reflect"
// "sync"
// "testing"
// "time"
import (
"context"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
"time"
// "github.com/mattn/go-colorable"
"github.com/mattn/go-colorable"
// "github.com/ethereum/go-ethereum/log"
// "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
// "github.com/ethereum/go-ethereum/node"
// "github.com/ethereum/go-ethereum/p2p"
// "github.com/ethereum/go-ethereum/p2p/enode"
// "github.com/ethereum/go-ethereum/p2p/simulations"
// "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
// )
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)
// const (
// content = "123456789"
// )
const (
content = "123456789"
)
// var (
// nodes = flag.Int("nodes", 30, "number of nodes to create (default 30)")
// msgs = flag.Int("msgs", 100, "number of messages sent by node (default 100)")
// loglevel = flag.Int("loglevel", 0, "verbosity of logs")
// rawlog = flag.Bool("rawlog", false, "remove terminal formatting from logs")
// )
var (
nodes = flag.Int("nodes", 30, "number of nodes to create (default 30)")
msgs = flag.Int("msgs", 100, "number of messages sent by node (default 100)")
loglevel = flag.Int("loglevel", 0, "verbosity of logs")
rawlog = flag.Bool("rawlog", false, "remove terminal formatting from logs")
)
// func init() {
// flag.Parse()
// log.PrintOrigins(true)
// log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(!*rawlog))))
// }
func init() {
// flag.Parse()
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(!*rawlog))))
}
// //TestAccountingSimulation runs a p2p/simulations simulation
// //It creates a *nodes number of nodes, connects each one with each other,
// //then sends out a random selection of messages up to *msgs amount of messages
// //from the test protocol spec.
// //The spec has some accounted messages defined through the Prices interface.
// //The test does accounting for all the message exchanged, and then checks
// //that every node has the same balance with a peer, but with opposite signs.
// //Balance(AwithB) = 0 - Balance(BwithA) or Abs|Balance(AwithB)| == Abs|Balance(BwithA)|
// func TestAccountingSimulation(t *testing.T) {
// //setup the balances objects for every node
// bal := newBalances(*nodes)
// //setup the metrics system or tests will fail trying to write metrics
// dir, err := ioutil.TempDir("", "account-sim")
// if err != nil {
// t.Fatal(err)
// }
// defer os.RemoveAll(dir)
// SetupAccountingMetrics(1*time.Second, filepath.Join(dir, "metrics.db"))
// //define the node.Service for this test
// services := adapters.Services{
// "accounting": func(ctx *adapters.ServiceContext) (node.Service, error) {
// return bal.newNode(), nil
// },
// }
// //setup the simulation
// adapter := adapters.NewSimAdapter(services)
// net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{DefaultService: "accounting"})
// defer net.Shutdown()
//TestAccountingSimulation runs a p2p/simulations simulation
//It creates a *nodes number of nodes, connects each one with each other,
//then sends out a random selection of messages up to *msgs amount of messages
//from the test protocol spec.
//The spec has some accounted messages defined through the Prices interface.
//The test does accounting for all the message exchanged, and then checks
//that every node has the same balance with a peer, but with opposite signs.
//Balance(AwithB) = 0 - Balance(BwithA) or Abs|Balance(AwithB)| == Abs|Balance(BwithA)|
func TestAccountingSimulation(t *testing.T) {
t.Skip("Test no longer works")
//setup the balances objects for every node
bal := newBalances(*nodes)
//setup the metrics system or tests will fail trying to write metrics
dir, err := ioutil.TempDir("", "account-sim")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
SetupAccountingMetrics(1*time.Second, filepath.Join(dir, "metrics.db"))
//define the node.Service for this test
services := adapters.Services{
"accounting": func(ctx *adapters.ServiceContext) (node.Service, error) {
return bal.newNode(), nil
},
}
//setup the simulation
adapter := adapters.NewSimAdapter(services)
net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{DefaultService: "accounting"})
defer net.Shutdown()
// // we send msgs messages per node, wait for all messages to arrive
// bal.wg.Add(*nodes * *msgs)
// trigger := make(chan enode.ID)
// go func() {
// // wait for all of them to arrive
// bal.wg.Wait()
// // then trigger a check
// // the selected node for the trigger is irrelevant,
// // we just want to trigger the end of the simulation
// trigger <- net.Nodes[0].ID()
// }()
// we send msgs messages per node, wait for all messages to arrive
bal.wg.Add(*nodes * *msgs)
trigger := make(chan enode.ID)
go func() {
// wait for all of them to arrive
bal.wg.Wait()
// then trigger a check
// the selected node for the trigger is irrelevant,
// we just want to trigger the end of the simulation
trigger <- net.Nodes[0].ID()
}()
// // create nodes and start them
// for i := 0; i < *nodes; i++ {
// conf := adapters.RandomNodeConfig()
// bal.id2n[conf.ID] = i
// if _, err := net.NewNodeWithConfig(conf); err != nil {
// t.Fatal(err)
// }
// if err := net.Start(conf.ID); err != nil {
// t.Fatal(err)
// }
// }
// // fully connect nodes
// for i, n := range net.Nodes {
// for _, m := range net.Nodes[i+1:] {
// if err := net.Connect(n.ID(), m.ID()); err != nil {
// t.Fatal(err)
// }
// }
// }
// create nodes and start them
for i := 0; i < *nodes; i++ {
conf := adapters.RandomNodeConfig()
bal.id2n[conf.ID] = i
if _, err := net.NewNodeWithConfig(conf); err != nil {
t.Fatal(err)
}
if err := net.Start(conf.ID); err != nil {
t.Fatal(err)
}
}
// fully connect nodes
for i, n := range net.Nodes {
for _, m := range net.Nodes[i+1:] {
if err := net.Connect(n.ID(), m.ID()); err != nil {
t.Fatal(err)
}
}
}
// // empty action
// action := func(ctx context.Context) error {
// return nil
// }
// // check always checks out
// check := func(ctx context.Context, id enode.ID) (bool, error) {
// return true, nil
// }
// empty action
action := func(ctx context.Context) error {
return nil
}
// check always checks out
check := func(ctx context.Context, id enode.ID) (bool, error) {
return true, nil
}
// // run simulation
// timeout := 30 * time.Second
// ctx, cancel := context.WithTimeout(context.Background(), timeout)
// defer cancel()
// result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
// Action: action,
// Trigger: trigger,
// Expect: &simulations.Expectation{
// Nodes: []enode.ID{net.Nodes[0].ID()},
// Check: check,
// },
// })
// run simulation
timeout := 30 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
Action: action,
Trigger: trigger,
Expect: &simulations.Expectation{
Nodes: []enode.ID{net.Nodes[0].ID()},
Check: check,
},
})
// if result.Error != nil {
// t.Fatal(result.Error)
// }
if result.Error != nil {
t.Fatal(result.Error)
}
// // check if balance matrix is symmetric
// if err := bal.symmetric(); err != nil {
// t.Fatal(err)
// }
// }
// check if balance matrix is symmetric
if err := bal.symmetric(); err != nil {
t.Fatal(err)
}
}
// // matrix is a matrix of nodes and its balances
// // matrix is in fact a linear array of size n*n,
// // so the balance for any node A with B is at index
// // A*n + B, while the balance of node B with A is at
// // B*n + A
// // (n entries in the array will not be filled -
// // the balance of a node with itself)
// type matrix struct {
// n int //number of nodes
// m []int64 //array of balances
// }
// matrix is a matrix of nodes and its balances
// matrix is in fact a linear array of size n*n,
// so the balance for any node A with B is at index
// A*n + B, while the balance of node B with A is at
// B*n + A
// (n entries in the array will not be filled -
// the balance of a node with itself)
type matrix struct {
n int //number of nodes
m []int64 //array of balances
}
// // create a new matrix
// func newMatrix(n int) *matrix {
// return &matrix{
// n: n,
// m: make([]int64, n*n),
// }
// }
// create a new matrix
func newMatrix(n int) *matrix {
return &matrix{
n: n,
m: make([]int64, n*n),
}
}
// // called from the testBalance's Add accounting function: register balance change
// func (m *matrix) add(i, j int, v int64) error {
// // index for the balance of local node i with remote nodde j is
// // i * number of nodes + remote node
// mi := i*m.n + j
// // register that balance
// m.m[mi] += v
// return nil
// }
// called from the testBalance's Add accounting function: register balance change
func (m *matrix) add(i, j int, v int64) error {
// index for the balance of local node i with remote nodde j is
// i * number of nodes + remote node
mi := i*m.n + j
// register that balance
m.m[mi] += v
return nil
}
// // check that the balances are symmetric:
// // balance of node i with node j is the same as j with i but with inverted signs
// func (m *matrix) symmetric() error {
// //iterate all nodes
// for i := 0; i < m.n; i++ {
// //iterate starting +1
// for j := i + 1; j < m.n; j++ {
// log.Debug("bal", "1", i, "2", j, "i,j", m.m[i*m.n+j], "j,i", m.m[j*m.n+i])
// if m.m[i*m.n+j] != -m.m[j*m.n+i] {
// return fmt.Errorf("value mismatch. m[%v, %v] = %v; m[%v, %v] = %v", i, j, m.m[i*m.n+j], j, i, m.m[j*m.n+i])
// }
// }
// }
// return nil
// }
// check that the balances are symmetric:
// balance of node i with node j is the same as j with i but with inverted signs
func (m *matrix) symmetric() error {
//iterate all nodes
for i := 0; i < m.n; i++ {
//iterate starting +1
for j := i + 1; j < m.n; j++ {
log.Debug("bal", "1", i, "2", j, "i,j", m.m[i*m.n+j], "j,i", m.m[j*m.n+i])
if m.m[i*m.n+j] != -m.m[j*m.n+i] {
return fmt.Errorf("value mismatch. m[%v, %v] = %v; m[%v, %v] = %v", i, j, m.m[i*m.n+j], j, i, m.m[j*m.n+i])
}
}
}
return nil
}
// // all the balances
// type balances struct {
// i int
// *matrix
// id2n map[enode.ID]int
// wg *sync.WaitGroup
// }
// all the balances
type balances struct {
i int
*matrix
id2n map[enode.ID]int
wg *sync.WaitGroup
}
// func newBalances(n int) *balances {
// return &balances{
// matrix: newMatrix(n),
// id2n: make(map[enode.ID]int),
// wg: &sync.WaitGroup{},
// }
// }
func newBalances(n int) *balances {
return &balances{
matrix: newMatrix(n),
id2n: make(map[enode.ID]int),
wg: &sync.WaitGroup{},
}
}
// // create a new testNode for every node created as part of the service
// func (b *balances) newNode() *testNode {
// defer func() { b.i++ }()
// return &testNode{
// bal: b,
// i: b.i,
// peers: make([]*testPeer, b.n), //a node will be connected to n-1 peers
// }
// }
// create a new testNode for every node created as part of the service
func (b *balances) newNode() *testNode {
defer func() { b.i++ }()
return &testNode{
bal: b,
i: b.i,
peers: make([]*testPeer, b.n), //a node will be connected to n-1 peers
}
}
// type testNode struct {
// bal *balances
// i int
// lock sync.Mutex
// peers []*testPeer
// peerCount int
// }
type testNode struct {
bal *balances
i int
lock sync.Mutex
peers []*testPeer
peerCount int
}
// // do the accounting for the peer's test protocol
// // testNode implements protocols.Balance
// func (t *testNode) Add(a int64, p *Peer) error {
// //get the index for the remote peer
// remote := t.bal.id2n[p.ID()]
// log.Debug("add", "local", t.i, "remote", remote, "amount", a)
// return t.bal.add(t.i, remote, a)
// }
// do the accounting for the peer's test protocol
// testNode implements protocols.Balance
func (t *testNode) Add(a int64, p *Peer) error {
//get the index for the remote peer
remote := t.bal.id2n[p.ID()]
log.Debug("add", "local", t.i, "remote", remote, "amount", a)
return t.bal.add(t.i, remote, a)
}
// //run the p2p protocol
// //for every node, represented by testNode, create a remote testPeer
// func (t *testNode) run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
// spec := createTestSpec()
// //create accounting hook
// spec.Hook = NewAccounting(t, &dummyPrices{})
//run the p2p protocol
//for every node, represented by testNode, create a remote testPeer
func (t *testNode) run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
spec := createTestSpec()
//create accounting hook
spec.Hook = NewAccounting(t, &dummyPrices{})
// //create a peer for this node
// tp := &testPeer{NewPeer(p, rw, spec), t.i, t.bal.id2n[p.ID()], t.bal.wg}
// t.lock.Lock()
// t.peers[t.bal.id2n[p.ID()]] = tp
// t.peerCount++
// if t.peerCount == t.bal.n-1 {
// //when all peer connections are established, start sending messages from this peer
// go t.send()
// }
// t.lock.Unlock()
// return tp.Run(tp.handle)
// }
//create a peer for this node
tp := &testPeer{NewPeer(p, rw, spec), t.i, t.bal.id2n[p.ID()], t.bal.wg}
t.lock.Lock()
t.peers[t.bal.id2n[p.ID()]] = tp
t.peerCount++
if t.peerCount == t.bal.n-1 {
//when all peer connections are established, start sending messages from this peer
go t.send()
}
t.lock.Unlock()
return tp.Run(tp.handle)
}
// // p2p message receive handler function
// func (tp *testPeer) handle(ctx context.Context, msg interface{}) error {
// tp.wg.Done()
// log.Debug("receive", "from", tp.remote, "to", tp.local, "type", reflect.TypeOf(msg), "msg", msg)
// return nil
// }
// p2p message receive handler function
func (tp *testPeer) handle(ctx context.Context, msg interface{}) error {
tp.wg.Done()
log.Debug("receive", "from", tp.remote, "to", tp.local, "type", reflect.TypeOf(msg), "msg", msg)
return nil
}
// type testPeer struct {
// *Peer
// local, remote int
// wg *sync.WaitGroup
// }
type testPeer struct {
*Peer
local, remote int
wg *sync.WaitGroup
}
// func (t *testNode) send() {
// log.Debug("start sending")
// for i := 0; i < *msgs; i++ {
// //determine randomly to which peer to send
// whom := rand.Intn(t.bal.n - 1)
// if whom >= t.i {
// whom++
// }
// t.lock.Lock()
// p := t.peers[whom]
// t.lock.Unlock()
func (t *testNode) send() {
log.Debug("start sending")
for i := 0; i < *msgs; i++ {
//determine randomly to which peer to send
whom := rand.Intn(t.bal.n - 1)
if whom >= t.i {
whom++
}
t.lock.Lock()
p := t.peers[whom]
t.lock.Unlock()
// //determine a random message from the spec's messages to be sent
// which := rand.Intn(len(p.spec.Messages))
// msg := p.spec.Messages[which]
// switch msg.(type) {
// case *perBytesMsgReceiverPays:
// msg = &perBytesMsgReceiverPays{Content: content[:rand.Intn(len(content))]}
// case *perBytesMsgSenderPays:
// msg = &perBytesMsgSenderPays{Content: content[:rand.Intn(len(content))]}
// }
// log.Debug("send", "from", t.i, "to", whom, "type", reflect.TypeOf(msg), "msg", msg)
// p.Send(context.TODO(), msg)
// }
// }
//determine a random message from the spec's messages to be sent
which := rand.Intn(len(p.spec.Messages))
msg := p.spec.Messages[which]
switch msg.(type) {
case *perBytesMsgReceiverPays:
msg = &perBytesMsgReceiverPays{Content: content[:rand.Intn(len(content))]}
case *perBytesMsgSenderPays:
msg = &perBytesMsgSenderPays{Content: content[:rand.Intn(len(content))]}
}
log.Debug("send", "from", t.i, "to", whom, "type", reflect.TypeOf(msg), "msg", msg)
p.Send(context.TODO(), msg)
}
}
// // define the protocol
// func (t *testNode) Protocols() []p2p.Protocol {
// return []p2p.Protocol{{
// Length: 100,
// Run: t.run,
// }}
// }
// define the protocol
func (t *testNode) Protocols() []p2p.Protocol {
return []p2p.Protocol{{
Length: 100,
Run: t.run,
}}
}
// func (t *testNode) APIs() []rpc.API {
// return nil
// }
func (t *testNode) APIs() []rpc.API {
return nil
}
// func (t *testNode) Start(server *p2p.Server) error {
// return nil
// }
func (t *testNode) Start(server *p2p.Server) error {
return nil
}
// func (t *testNode) Stop() error {
// return nil
// }
func (t *testNode) Stop() error {
return nil
}

View file

@ -17,10 +17,12 @@
package network
import (
"bytes"
"context"
"fmt"
"math/rand"
"strings"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
@ -62,48 +64,49 @@ Nodes should only connect with other nodes with the same network ID.
After the setup phase, the test checks on each node if it has the
expected node connections (excluding those not sharing the network ID).
*/
// func TestNetworkID(t *testing.T) {
// log.Debug("Start test")
// //arbitrarily set the number of nodes. It could be any number
// numNodes := 24
// //the nodeMap maps all nodes (slice value) with the same network ID (key)
// nodeMap = make(map[int][]enode.ID)
// //set up the network and connect nodes
// net, err := setupNetwork(numNodes)
// if err != nil {
// t.Fatalf("Error setting up network: %v", err)
// }
// //let's sleep to ensure all nodes are connected
// time.Sleep(1 * time.Second)
// // shutdown the the network to avoid race conditions
// // on accessing kademlias global map while network nodes
// // are accepting messages
// net.Shutdown()
// //for each group sharing the same network ID...
// for _, netIDGroup := range nodeMap {
// log.Trace("netIDGroup size", "size", len(netIDGroup))
// //...check that their size of the kademlia is of the expected size
// //the assumption is that it should be the size of the group minus 1 (the node itself)
// for _, node := range netIDGroup {
// if kademlias[node].addrs.Size() != len(netIDGroup)-1 {
// t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1)
// }
// kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int) bool {
// found := false
// for _, nd := range netIDGroup {
// if bytes.Equal(kademlias[nd].BaseAddr(), addr.Address()) {
// found = true
// }
// }
// if !found {
// t.Fatalf("Expected node not found for node %s", node.String())
// }
// return true
// })
// }
// }
// log.Info("Test terminated successfully")
// }
func TestNetworkID(t *testing.T) {
t.Skip("Test no longer work for XDC")
log.Debug("Start test")
//arbitrarily set the number of nodes. It could be any number
numNodes := 24
//the nodeMap maps all nodes (slice value) with the same network ID (key)
nodeMap = make(map[int][]enode.ID)
//set up the network and connect nodes
net, err := setupNetwork(numNodes)
if err != nil {
t.Fatalf("Error setting up network: %v", err)
}
//let's sleep to ensure all nodes are connected
time.Sleep(1 * time.Second)
// shutdown the the network to avoid race conditions
// on accessing kademlias global map while network nodes
// are accepting messages
net.Shutdown()
//for each group sharing the same network ID...
for _, netIDGroup := range nodeMap {
log.Trace("netIDGroup size", "size", len(netIDGroup))
//...check that their size of the kademlia is of the expected size
//the assumption is that it should be the size of the group minus 1 (the node itself)
for _, node := range netIDGroup {
if kademlias[node].addrs.Size() != len(netIDGroup)-1 {
t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1)
}
kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int) bool {
found := false
for _, nd := range netIDGroup {
if bytes.Equal(kademlias[nd].BaseAddr(), addr.Address()) {
found = true
}
}
if !found {
t.Fatalf("Expected node not found for node %s", node.String())
}
return true
})
}
}
log.Info("Test terminated successfully")
}
// setup simulated network with bzz/discovery and pss services.
// connects nodes in a circle

View file

@ -17,12 +17,18 @@
package simulation
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/network"
)
func TestUpDownNodeIDs(t *testing.T) {
@ -270,43 +276,44 @@ func TestAddNodesAndConnectStar(t *testing.T) {
}
//To test that uploading a snapshot works
// func TestUploadSnapshot(t *testing.T) {
// log.Debug("Creating simulation")
// s := New(map[string]ServiceFunc{
// "bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
// addr := network.NewAddr(ctx.Config.Node())
// hp := network.NewHiveParams()
// hp.Discovery = false
// config := &network.BzzConfig{
// OverlayAddr: addr.Over(),
// UnderlayAddr: addr.Under(),
// HiveParams: hp,
// }
// kad := network.NewKademlia(addr.Over(), network.NewKadParams())
// return network.NewBzz(config, kad, nil, nil, nil), nil, nil
// },
// })
// defer s.Close()
func TestUploadSnapshot(t *testing.T) {
t.Skip("Broken test for XDC")
log.Debug("Creating simulation")
s := New(map[string]ServiceFunc{
"bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
hp := network.NewHiveParams()
hp.Discovery = false
config := &network.BzzConfig{
OverlayAddr: addr.Over(),
UnderlayAddr: addr.Under(),
HiveParams: hp,
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
return network.NewBzz(config, kad, nil, nil, nil), nil, nil
},
})
defer s.Close()
// nodeCount := 16
// log.Debug("Uploading snapshot")
// err := s.UploadSnapshot(fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount))
// if err != nil {
// t.Fatalf("Error uploading snapshot to simulation network: %v", err)
// }
nodeCount := 16
log.Debug("Uploading snapshot")
err := s.UploadSnapshot(fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount))
if err != nil {
t.Fatalf("Error uploading snapshot to simulation network: %v", err)
}
// ctx := context.Background()
// log.Debug("Starting simulation...")
// s.Run(ctx, func(ctx context.Context, sim *Simulation) error {
// log.Debug("Checking")
// nodes := sim.UpNodeIDs()
// if len(nodes) != nodeCount {
// t.Fatal("Simulation network node number doesn't match snapshot node number")
// }
// return nil
// })
// log.Debug("Done.")
// }
ctx := context.Background()
log.Debug("Starting simulation...")
s.Run(ctx, func(ctx context.Context, sim *Simulation) error {
log.Debug("Checking")
nodes := sim.UpNodeIDs()
if len(nodes) != nodeCount {
t.Fatal("Simulation network node number doesn't match snapshot node number")
}
return nil
})
log.Debug("Done.")
}
func TestStartStopNode(t *testing.T) {
sim := New(noopServiceFuncMap)

View file

@ -121,9 +121,10 @@ func BenchmarkDiscovery_64_4(b *testing.B) { benchmarkDiscovery(b, 64, 4) }
func BenchmarkDiscovery_128_4(b *testing.B) { benchmarkDiscovery(b, 128, 4) }
func BenchmarkDiscovery_256_4(b *testing.B) { benchmarkDiscovery(b, 256, 4) }
// func TestDiscoverySimulationExecAdapter(t *testing.T) {
// testDiscoverySimulationExecAdapter(t, *nodeCount, *initCount)
// }
func TestDiscoverySimulationExecAdapter(t *testing.T) {
t.Skip("Test no longer work for XDC")
testDiscoverySimulationExecAdapter(t, *nodeCount, *initCount)
}
func testDiscoverySimulationExecAdapter(t *testing.T, nodes, conns int) {
baseDir, err := ioutil.TempDir("", "swarm-test")
@ -134,13 +135,15 @@ func testDiscoverySimulationExecAdapter(t *testing.T, nodes, conns int) {
testDiscoverySimulation(t, nodes, conns, adapters.NewExecAdapter(baseDir))
}
// func TestDiscoverySimulationSimAdapter(t *testing.T) {
// testDiscoverySimulationSimAdapter(t, *nodeCount, *initCount)
// }
func TestDiscoverySimulationSimAdapter(t *testing.T) {
t.Skip("Test no longer work for XDC")
testDiscoverySimulationSimAdapter(t, *nodeCount, *initCount)
}
// func TestDiscoveryPersistenceSimulationSimAdapter(t *testing.T) {
// testDiscoveryPersistenceSimulationSimAdapter(t, *nodeCount, *initCount)
// }
func TestDiscoveryPersistenceSimulationSimAdapter(t *testing.T) {
t.Skip("Test no longer work for XDC")
testDiscoveryPersistenceSimulationSimAdapter(t, *nodeCount, *initCount)
}
func testDiscoveryPersistenceSimulationSimAdapter(t *testing.T, nodes, conns int) {
testDiscoveryPersistenceSimulation(t, nodes, conns, adapters.NewSimAdapter(services))

View file

@ -42,27 +42,28 @@ const (
//provided to the test.
//Files are uploaded to nodes, other nodes try to retrieve the file
//Number of nodes can be provided via commandline too.
// func TestFileRetrieval(t *testing.T) {
// if *nodes != 0 {
// err := runFileRetrievalTest(*nodes)
// if err != nil {
// t.Fatal(err)
// }
// } else {
// nodeCnt := []int{16}
// //if the `longrunning` flag has been provided
// //run more test combinations
// if *longrunning {
// nodeCnt = append(nodeCnt, 32, 64, 128)
// }
// for _, n := range nodeCnt {
// err := runFileRetrievalTest(n)
// if err != nil {
// t.Fatal(err)
// }
// }
// }
// }
func TestFileRetrieval(t *testing.T) {
t.Skip("Test no longer work for XDC")
if *nodes != 0 {
err := runFileRetrievalTest(*nodes)
if err != nil {
t.Fatal(err)
}
} else {
nodeCnt := []int{16}
//if the `longrunning` flag has been provided
//run more test combinations
if *longrunning {
nodeCnt = append(nodeCnt, 32, 64, 128)
}
for _, n := range nodeCnt {
err := runFileRetrievalTest(n)
if err != nil {
t.Fatal(err)
}
}
}
}
//This test is a retrieval test for nodes.
//One node is randomly selected to be the pivot node.
@ -70,39 +71,40 @@ const (
//provided to the test, the number of chunks is uploaded
//to the pivot node and other nodes try to retrieve the chunk(s).
//Number of chunks and nodes can be provided via commandline too.
// func TestRetrieval(t *testing.T) {
// //if nodes/chunks have been provided via commandline,
// //run the tests with these values
// if *nodes != 0 && *chunks != 0 {
// err := runRetrievalTest(t, *chunks, *nodes)
// if err != nil {
// t.Fatal(err)
// }
// } else {
// var nodeCnt []int
// var chnkCnt []int
// //if the `longrunning` flag has been provided
// //run more test combinations
// if *longrunning {
// nodeCnt = []int{16, 32, 128}
// chnkCnt = []int{4, 32, 256}
// } else {
// //default test
// nodeCnt = []int{16}
// chnkCnt = []int{32}
// }
// for _, n := range nodeCnt {
// for _, c := range chnkCnt {
// t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
// err := runRetrievalTest(t, c, n)
// if err != nil {
// t.Fatal(err)
// }
// })
// }
// }
// }
// }
func TestRetrieval(t *testing.T) {
t.Skip("Test no longer work for XDC")
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
err := runRetrievalTest(t, *chunks, *nodes)
if err != nil {
t.Fatal(err)
}
} else {
var nodeCnt []int
var chnkCnt []int
//if the `longrunning` flag has been provided
//run more test combinations
if *longrunning {
nodeCnt = []int{16, 32, 128}
chnkCnt = []int{4, 32, 256}
} else {
//default test
nodeCnt = []int{16}
chnkCnt = []int{32}
}
for _, n := range nodeCnt {
for _, c := range chnkCnt {
t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
err := runRetrievalTest(t, c, n)
if err != nil {
t.Fatal(err)
}
})
}
}
}
}
var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {

View file

@ -20,6 +20,8 @@ import (
"errors"
"fmt"
"io/ioutil"
"os"
"runtime"
"sync"
"testing"
"time"
@ -74,45 +76,46 @@ func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID,
//to the pivot node, and we check that nodes get the chunks
//they are expected to store based on the syncing protocol.
//Number of chunks and nodes can be provided via commandline too.
// func TestSyncingViaGlobalSync(t *testing.T) {
// if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
// t.Skip("Flaky on mac on travis")
// }
// //if nodes/chunks have been provided via commandline,
// //run the tests with these values
// if *nodes != 0 && *chunks != 0 {
// log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
// testSyncingViaGlobalSync(t, *chunks, *nodes)
// } else {
// var nodeCnt []int
// var chnkCnt []int
// //if the `longrunning` flag has been provided
// //run more test combinations
// if *longrunning {
// chnkCnt = []int{1, 8, 32, 256, 1024}
// nodeCnt = []int{16, 32, 64, 128, 256}
// } else if raceTest {
// // TestSyncingViaGlobalSync allocates a lot of memory
// // with race detector. By reducing the number of chunks
// // and nodes, memory consumption is lower and data races
// // are still checked, while correctness of syncing is
// // tested with more chunks and nodes in regular (!race)
// // tests.
// chnkCnt = []int{4}
// nodeCnt = []int{16}
// } else {
// //default test
// chnkCnt = []int{4, 32}
// nodeCnt = []int{32, 16}
// }
// for _, chnk := range chnkCnt {
// for _, n := range nodeCnt {
// log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
// testSyncingViaGlobalSync(t, chnk, n)
// }
// }
// }
// }
func TestSyncingViaGlobalSync(t *testing.T) {
t.Skip("Flaky test")
if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
t.Skip("Flaky on mac on travis")
}
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
testSyncingViaGlobalSync(t, *chunks, *nodes)
} else {
var nodeCnt []int
var chnkCnt []int
//if the `longrunning` flag has been provided
//run more test combinations
if *longrunning {
chnkCnt = []int{1, 8, 32, 256, 1024}
nodeCnt = []int{16, 32, 64, 128, 256}
} else if raceTest {
// TestSyncingViaGlobalSync allocates a lot of memory
// with race detector. By reducing the number of chunks
// and nodes, memory consumption is lower and data races
// are still checked, while correctness of syncing is
// tested with more chunks and nodes in regular (!race)
// tests.
chnkCnt = []int{4}
nodeCnt = []int{16}
} else {
//default test
chnkCnt = []int{4, 32}
nodeCnt = []int{32, 16}
}
for _, chnk := range chnkCnt {
for _, n := range nodeCnt {
log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
testSyncingViaGlobalSync(t, chnk, n)
}
}
}
}
var simServiceMap = map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {

View file

@ -22,15 +22,20 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"golang.org/x/crypto/sha3"
)
@ -1171,162 +1176,163 @@ TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
starts the simulation, waits for SyncUpdateDelay in order to kick off
stream registration, then tests that there are subscriptions.
*/
// func TestGetSubscriptionsRPC(t *testing.T) {
func TestGetSubscriptionsRPC(t *testing.T) {
t.Skip("Test no longer work for XDC")
// // arbitrarily set to 4
// nodeCount := 4
// // run with more nodes if `longrunning` flag is set
// if *longrunning {
// nodeCount = 64
// }
// // set the syncUpdateDelay for sync registrations to start
// syncUpdateDelay := 200 * time.Millisecond
// // holds the msg code for SubscribeMsg
// var subscribeMsgCode uint64
// var ok bool
// var expectedMsgCount counter
// arbitrarily set to 4
nodeCount := 4
// run with more nodes if `longrunning` flag is set
if *longrunning {
nodeCount = 64
}
// set the syncUpdateDelay for sync registrations to start
syncUpdateDelay := 200 * time.Millisecond
// holds the msg code for SubscribeMsg
var subscribeMsgCode uint64
var ok bool
var expectedMsgCount counter
// // this channel signalizes that the expected amount of subscriptiosn is done
// allSubscriptionsDone := make(chan struct{})
// // after the test, we need to reset the subscriptionFunc to the default
// defer func() { subscriptionFunc = doRequestSubscription }()
// this channel signalizes that the expected amount of subscriptiosn is done
allSubscriptionsDone := make(chan struct{})
// after the test, we need to reset the subscriptionFunc to the default
defer func() { subscriptionFunc = doRequestSubscription }()
// // we use this subscriptionFunc for this test: just increases count and calls the actual subscription
// subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
// expectedMsgCount.inc()
// doRequestSubscription(r, p, bin, subs)
// return true
// }
// // create a standard sim
// sim := simulation.New(map[string]simulation.ServiceFunc{
// "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
// addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
// if err != nil {
// return nil, nil, err
// }
// we use this subscriptionFunc for this test: just increases count and calls the actual subscription
subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
expectedMsgCount.inc()
doRequestSubscription(r, p, bin, subs)
return true
}
// create a standard sim
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
if err != nil {
return nil, nil, err
}
// // configure so that sync registrations actually happen
// r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
// Retrieval: RetrievalEnabled,
// Syncing: SyncingAutoSubscribe, //enable sync registrations
// SyncUpdateDelay: syncUpdateDelay,
// }, nil)
// configure so that sync registrations actually happen
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe, //enable sync registrations
SyncUpdateDelay: syncUpdateDelay,
}, nil)
// // get the SubscribeMsg code
// subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
// if !ok {
// t.Fatal("Message code for SubscribeMsg not found")
// }
// get the SubscribeMsg code
subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
if !ok {
t.Fatal("Message code for SubscribeMsg not found")
}
// cleanup = func() {
// r.Close()
// clean()
// }
cleanup = func() {
r.Close()
clean()
}
// return r, cleanup, nil
// },
// })
// defer sim.Close()
return r, cleanup, nil
},
})
defer sim.Close()
// ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
// defer cancelSimRun()
ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancelSimRun()
// // upload a snapshot
// err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
// if err != nil {
// t.Fatal(err)
// }
// upload a snapshot
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
if err != nil {
t.Fatal(err)
}
// // setup the filter for SubscribeMsg
// msgs := sim.PeerEvents(
// context.Background(),
// sim.NodeIDs(),
// simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
// )
// setup the filter for SubscribeMsg
msgs := sim.PeerEvents(
context.Background(),
sim.NodeIDs(),
simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
)
// // strategy: listen to all SubscribeMsg events; after every event we wait
// // if after `waitDuration` no more messages are being received, we assume the
// // subscription phase has terminated!
// strategy: listen to all SubscribeMsg events; after every event we wait
// if after `waitDuration` no more messages are being received, we assume the
// subscription phase has terminated!
// // the loop in this go routine will either wait for new message events
// // or times out after 1 second, which signals that we are not receiving
// // any new subscriptions any more
// go func() {
// //for long running sims, waiting 1 sec will not be enough
// waitDuration := time.Duration(nodeCount/16) * time.Second
// for {
// select {
// case <-ctx.Done():
// return
// case m := <-msgs: // just reset the loop
// if m.Error != nil {
// log.Error("stream message", "err", m.Error)
// continue
// }
// log.Trace("stream message", "node", m.NodeID, "peer", m.PeerID)
// case <-time.After(waitDuration):
// // one second passed, don't assume more subscriptions
// allSubscriptionsDone <- struct{}{}
// log.Info("All subscriptions received")
// return
// the loop in this go routine will either wait for new message events
// or times out after 1 second, which signals that we are not receiving
// any new subscriptions any more
go func() {
//for long running sims, waiting 1 sec will not be enough
waitDuration := time.Duration(nodeCount/16) * time.Second
for {
select {
case <-ctx.Done():
return
case m := <-msgs: // just reset the loop
if m.Error != nil {
log.Error("stream message", "err", m.Error)
continue
}
log.Trace("stream message", "node", m.NodeID, "peer", m.PeerID)
case <-time.After(waitDuration):
// one second passed, don't assume more subscriptions
allSubscriptionsDone <- struct{}{}
log.Info("All subscriptions received")
return
// }
// }
// }()
}
}
}()
// //run the simulation
// result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
// log.Info("Simulation running")
// nodes := sim.Net.Nodes
//run the simulation
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
log.Info("Simulation running")
nodes := sim.Net.Nodes
// //wait until all subscriptions are done
// select {
// case <-allSubscriptionsDone:
// case <-ctx.Done():
// return errors.New("Context timed out")
// }
//wait until all subscriptions are done
select {
case <-allSubscriptionsDone:
case <-ctx.Done():
return errors.New("Context timed out")
}
// log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count())
// //now iterate again, this time we call each node via RPC to get its subscriptions
// realCount := 0
// for _, node := range nodes {
// //create rpc client
// client, err := node.Client()
// if err != nil {
// return fmt.Errorf("create node 1 rpc client fail: %v", err)
// }
log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count())
//now iterate again, this time we call each node via RPC to get its subscriptions
realCount := 0
for _, node := range nodes {
//create rpc client
client, err := node.Client()
if err != nil {
return fmt.Errorf("create node 1 rpc client fail: %v", err)
}
// //ask it for subscriptions
// pstreams := make(map[string][]string)
// err = client.Call(&pstreams, "stream_getPeerSubscriptions")
// if err != nil {
// return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
// }
// //length of the subscriptions can not be smaller than number of peers
// log.Debug("node subscriptions", "node", node.String())
// for p, ps := range pstreams {
// log.Debug("... with", "peer", p)
// for _, s := range ps {
// log.Debug(".......", "stream", s)
// // each node also has subscriptions to RETRIEVE_REQUEST streams,
// // we need to ignore those, we are only counting SYNC streams
// if !strings.HasPrefix(s, "RETRIEVE_REQUEST") {
// realCount++
// }
// }
// }
// }
// // every node is mutually subscribed to each other, so the actual count is half of it
// emc := expectedMsgCount.count()
// if realCount/2 != emc {
// return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
// }
// return nil
// })
// if result.Error != nil {
// t.Fatal(result.Error)
// }
// }
//ask it for subscriptions
pstreams := make(map[string][]string)
err = client.Call(&pstreams, "stream_getPeerSubscriptions")
if err != nil {
return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
}
//length of the subscriptions can not be smaller than number of peers
log.Debug("node subscriptions", "node", node.String())
for p, ps := range pstreams {
log.Debug("... with", "peer", p)
for _, s := range ps {
log.Debug(".......", "stream", s)
// each node also has subscriptions to RETRIEVE_REQUEST streams,
// we need to ignore those, we are only counting SYNC streams
if !strings.HasPrefix(s, "RETRIEVE_REQUEST") {
realCount++
}
}
}
}
// every node is mutually subscribed to each other, so the actual count is half of it
emc := expectedMsgCount.count()
if realCount/2 != emc {
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
}
return nil
})
if result.Error != nil {
t.Fatal(result.Error)
}
}
// counter is used to concurrently increment
// and read an integer value.

View file

@ -16,6 +16,8 @@
package feed
import "testing"
func getTestQuery() *Query {
id := getTestID()
return &Query{
@ -25,10 +27,11 @@ func getTestQuery() *Query {
}
}
// func TestQueryValues(t *testing.T) {
// var expected = KV{"hint.level": "25", "hint.time": "1000", "time": "5000", "topic": "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000", "user": "0x876A8936A7Cd0b79Ef0735AD0896c1AFe278781c"}
func TestQueryValues(t *testing.T) {
t.Skip("Test no longer work for XDC")
var expected = KV{"hint.level": "25", "hint.time": "1000", "time": "5000", "topic": "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000", "user": "0x876A8936A7Cd0b79Ef0735AD0896c1AFe278781c"}
// query := getTestQuery()
// testValueSerializer(t, query, expected)
query := getTestQuery()
testValueSerializer(t, query, expected)
// }
}

View file

@ -87,6 +87,25 @@ var sharedKey = []byte("some arbitrary data here")
var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0}
var expectedMessage = []byte("per rectum ad astra")
// This test does the following:
// 1. creates a chain of whisper nodes,
// 2. installs the filters with shared (predefined) parameters,
// 3. each node sends a number of random (undecryptable) messages,
// 4. first node sends one expected (decryptable) message,
// 5. checks if each node have received and decrypted exactly one message.
func TestSimulation(t *testing.T) {
t.Skip("Test no longer work for XDC")
initialize(t)
for i := 0; i < NumNodes; i++ {
sendMsg(t, false, i)
}
sendMsg(t, true, 0)
checkPropagation(t)
stopServers()
}
func initialize(t *testing.T) {
var err error