From 164c9edf92b154e2e2c01473e59918e5446d6e93 Mon Sep 17 00:00:00 2001 From: leventeliu Date: Wed, 2 Jan 2019 00:36:48 -0800 Subject: [PATCH 01/10] Block sync head if BPs are unreachable --- blockproducer/chain.go | 70 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 12 deletions(-) diff --git a/blockproducer/chain.go b/blockproducer/chain.go index 068decaa7..d0b129729 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -22,6 +22,7 @@ import ( "math" "os" "sync" + "sync/atomic" "time" pi "github.com/CovenantSQL/CovenantSQL/blockproducer/interfaces" @@ -376,18 +377,32 @@ func (c *Chain) produceBlock(now time.Time) (err error) { } // advanceNextHeight does the check and runs block producing if its my turn. -func (c *Chain) advanceNextHeight(now time.Time) { +func (c *Chain) advanceNextHeight(now time.Time, d time.Duration) { + var elapsed = -d + log.WithFields(log.Fields{ - "next_height": c.getNextHeight(), - "bp_number": c.serversNum, - "node_index": c.locSvIndex, - }).Info("check turns") - defer c.increaseNextHeight() + "bp_number": c.serversNum, + "node_index": c.locSvIndex, + "enclosing_height": c.getNextHeight() - 1, + "using_timestamp": now.Format(time.RFC3339Nano), + "elapsed_seconds": elapsed.Seconds(), + }).Info("enclosing current height and advancing to next height") + defer c.increaseNextHeight() + // Skip if it's not my turn if !c.isMyTurn() { return } - + // Normally, a block producing should start right after the new period, but more time may also + // elapse since the last block synchronizing. + if elapsed > c.tick { // TODO(leventeliu): add threshold config for `elapsed`. + log.WithFields(log.Fields{ + "advanced_height": c.getNextHeight(), + "using_timestamp": now.Format(time.RFC3339Nano), + "elapsed_seconds": elapsed.Seconds(), + }).Warn("too much time elapsed in the new period, skip this block") + return + } log.WithField("height", c.getNextHeight()).Info("producing a new block") if err := c.produceBlock(now); err != nil { log.WithField("now", now.Format(time.RFC3339Nano)).WithError(err).Errorln( @@ -512,11 +527,20 @@ func (c *Chain) mainCycle(ctx context.Context) { for { select { case <-timer.C: - c.syncCurrentHead(ctx) // Try to fetch block at height `nextHeight-1` + // Try to fetch block at height `nextHeight-1` until enough peers are reachable + for !c.syncCurrentHead(ctx) { + if ctx.Err() != nil { + // Context is closed, break forever loop + log.WithError(ctx.Err()).Info("abort main cycle") + timer.Reset(0) + return + } + } + var t, d = c.nextTick() if d <= 0 { // Try to produce block at `nextHeight` if it's my turn, and increase height by 1 - c.advanceNextHeight(t) + c.advanceNextHeight(t, d) } else { log.WithFields(log.Fields{ "peer": c.peerInfo(), @@ -535,7 +559,10 @@ func (c *Chain) mainCycle(ctx context.Context) { } } -func (c *Chain) syncCurrentHead(ctx context.Context) { +// syncCurrentHead synchronizes a block at the current height of the local peer from the known +// remote peers. The return value `ok` indicates that there're at less `c.confirms-1` replies +// from these gossip calls. +func (c *Chain) syncCurrentHead(ctx context.Context) (ok bool) { var h = c.getNextHeight() - 1 if c.head().height >= h { return @@ -543,12 +570,29 @@ func (c *Chain) syncCurrentHead(ctx context.Context) { // Initiate blocking gossip calls to fetch block of the current height, // with timeout of one tick. var ( - wg = &sync.WaitGroup{} - cld, ccl = context.WithTimeout(ctx, c.tick) + wg = &sync.WaitGroup{} + cld, ccl = context.WithTimeout(ctx, c.tick) + unreachable uint32 ) defer func() { wg.Wait() ccl() + var needConfirms, serversNum = func() (cf, sn uint32) { + c.RLock() + defer c.RUnlock() + cf, sn = c.confirms, c.serversNum + return + }() + if unreachable+needConfirms > serversNum { + log.WithFields(log.Fields{ + "peer": c.peerInfo(), + "sync_head_height": h, + "unreachable_count": unreachable, + }).Warn("one or more block producers are currently unreachable") + ok = false + } else { + ok = true + } }() for _, s := range c.getPeers().Servers { if !s.IsEqual(&c.nodeID) { @@ -573,6 +617,7 @@ func (c *Chain) syncCurrentHead(ctx context.Context) { "remote": id, "height": h, }).WithError(err).Warn("failed to fetch block") + atomic.AddUint32(&unreachable, 1) return } log.WithFields(log.Fields{ @@ -590,6 +635,7 @@ func (c *Chain) syncCurrentHead(ctx context.Context) { }(s) } } + return } func (c *Chain) storeTx(tx pi.Transaction) (err error) { From c2323c951f3f863f1ae64ac3ae2449ce576b193b Mon Sep 17 00:00:00 2001 From: leventeliu Date: Wed, 2 Jan 2019 00:37:31 -0800 Subject: [PATCH 02/10] Change fetch block API, do NOT use error to indicate unknown block --- blockproducer/chain_service.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/blockproducer/chain_service.go b/blockproducer/chain_service.go index 0b94986b2..44dfcb237 100644 --- a/blockproducer/chain_service.go +++ b/blockproducer/chain_service.go @@ -66,10 +66,12 @@ func (c *Chain) fetchLastIrreversibleBlock() ( func (c *Chain) fetchBlockByHeight(h uint32) (b *types.BPBlock, count uint32, err error) { var node = c.head().ancestor(h) + // Not found if node == nil { - err = ErrNoSuchBlock return - } else if node.block != nil { + } + // OK, and block is cached + if node.block != nil { b = node.block count = node.count return @@ -84,10 +86,12 @@ func (c *Chain) fetchBlockByHeight(h uint32) (b *types.BPBlock, count uint32, er func (c *Chain) fetchBlockByCount(count uint32) (b *types.BPBlock, height uint32, err error) { var node = c.head().ancestorByCount(count) + // Not found if node == nil { - err = ErrNoSuchBlock return - } else if node.block != nil { + } + // OK, and block is cached + if node.block != nil { b = node.block height = node.height return From 1fd59211243bdedbf6e3b33b9e79f8c5d5a7bf44 Mon Sep 17 00:00:00 2001 From: leventeliu Date: Wed, 2 Jan 2019 00:37:45 -0800 Subject: [PATCH 03/10] Fix test cases --- blockproducer/chain_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/blockproducer/chain_test.go b/blockproducer/chain_test.go index 857d668c6..f91733dd0 100644 --- a/blockproducer/chain_test.go +++ b/blockproducer/chain_test.go @@ -284,11 +284,13 @@ func TestChain(t *testing.T) { count, height uint32 ) - _, _, err = chain.fetchBlockByHeight(100) - So(err, ShouldEqual, ErrNoSuchBlock) + bl, _, err = chain.fetchBlockByHeight(100) + So(bl, ShouldBeNil) + So(err, ShouldBeNil) - _, _, err = chain.fetchBlockByCount(100) - So(err, ShouldEqual, ErrNoSuchBlock) + bl, _, err = chain.fetchBlockByCount(100) + So(bl, ShouldBeNil) + So(err, ShouldBeNil) bl, count, err = chain.fetchBlockByHeight(0) So(err, ShouldBeNil) @@ -332,6 +334,7 @@ func TestChain(t *testing.T) { defer sv.Stop() chain.server = sv + chain.confirms = 1 chain.Start() defer func() { chain.Stop() From 0bb1eab195c8ff0e437b49f9a4d4cf700666feaf Mon Sep 17 00:00:00 2001 From: leventeliu Date: Wed, 2 Jan 2019 00:37:58 -0800 Subject: [PATCH 04/10] Remove unused variable --- blockproducer/errors.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/blockproducer/errors.go b/blockproducer/errors.go index 29409bcd6..7bc765b1f 100644 --- a/blockproducer/errors.go +++ b/blockproducer/errors.go @@ -40,8 +40,6 @@ var ( ErrInvalidMerkleTreeRoot = errors.New("Block merkle tree root does not match the tx hashes") // ErrParentNotMatch defines invalid parent hash. ErrParentNotMatch = errors.New("Block's parent hash cannot match best block") - // ErrNoSuchBlock defines no such block error. - ErrNoSuchBlock = errors.New("Cannot find such block") // ErrNoSuchTxBilling defines no such txbilling error. ErrNoSuchTxBilling = errors.New("Cannot find such txbilling") // ErrSmallerSequenceID defines that new sequence id is smaller the old one. From 201714cd6d487c3f73499187c999b044a95f3f35 Mon Sep 17 00:00:00 2001 From: leventeliu Date: Wed, 2 Jan 2019 00:49:24 -0800 Subject: [PATCH 05/10] Replace fixed timer with ticker --- blockproducer/rpc.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/blockproducer/rpc.go b/blockproducer/rpc.go index 7de2dfdaa..eed4be86c 100644 --- a/blockproducer/rpc.go +++ b/blockproducer/rpc.go @@ -154,21 +154,16 @@ func WaitDatabaseCreation( period time.Duration, ) (err error) { var ( - timer = time.NewTimer(0) - req = &types.QuerySQLChainProfileReq{ + ticker = time.NewTicker(period) + req = &types.QuerySQLChainProfileReq{ DBID: dbID, } resp = &types.QuerySQLChainProfileResp{} ) - defer func() { - if !timer.Stop() { - <-timer.C - } - }() + defer ticker.Stop() for { select { - case <-timer.C: - timer.Reset(period) + case <-ticker.C: if err = rpc.RequestBP( route.MCCQuerySQLChainProfile.String(), req, resp, ); err != nil { From 02d757685ac77ac9e42ce58afd425344d1f1c678 Mon Sep 17 00:00:00 2001 From: auxten Date: Wed, 2 Jan 2019 16:06:36 +0800 Subject: [PATCH 06/10] Use new salt for base58 private key --- crypto/hash/hashfuncs.go | 1 + crypto/kms/privatekeystore.go | 73 +++++++++++++++++++++--------- crypto/kms/privatekeystore_test.go | 11 +++-- crypto/symmetric/aes.go | 16 +++---- crypto/symmetric/aes_test.go | 15 +++--- 5 files changed, 74 insertions(+), 42 deletions(-) diff --git a/crypto/hash/hashfuncs.go b/crypto/hash/hashfuncs.go index d3cd98561..af14cf46d 100644 --- a/crypto/hash/hashfuncs.go +++ b/crypto/hash/hashfuncs.go @@ -19,6 +19,7 @@ package hash import ( "encoding/binary" "hash/fnv" + // "crypto/sha256" benchmark is at least 10% faster on // i7-4870HQ CPU @ 2.50GHz than "github.com/minio/sha256-simd" "crypto/sha256" diff --git a/crypto/kms/privatekeystore.go b/crypto/kms/privatekeystore.go index 0e3aa01d8..bcfd37795 100644 --- a/crypto/kms/privatekeystore.go +++ b/crypto/kms/privatekeystore.go @@ -22,12 +22,13 @@ import ( "io/ioutil" "os" + "github.com/btcsuite/btcutil/base58" + "github.com/CovenantSQL/CovenantSQL/conf" "github.com/CovenantSQL/CovenantSQL/crypto/asymmetric" "github.com/CovenantSQL/CovenantSQL/crypto/hash" "github.com/CovenantSQL/CovenantSQL/crypto/symmetric" "github.com/CovenantSQL/CovenantSQL/utils/log" - "github.com/btcsuite/btcutil/base58" ) var ( @@ -41,17 +42,30 @@ var ( ErrInvalidBase58Checksum = errors.New("invalid base58 checksum") // PrivateKeyStoreVersion defines the private key version byte. PrivateKeyStoreVersion byte = 0x23 + // oldPrivateKDFSalt is the old KDF salt for private key encryption + oldPrivateKDFSalt = "auxten-key-salt-auxten" + // privateKDFSalt is the KDF salt for private key encryption + privateKDFSalt = []byte{ + 0xC0, 0x4E, 0xA4, 0x71, 0x49, 0x65, 0x41, 0x31, + 0x79, 0x4b, 0x6a, 0x70, 0x2f, 0x39, 0x45, 0x43, + } ) // LoadPrivateKey loads private key from keyFilePath, and verifies the hash // head func LoadPrivateKey(keyFilePath string, masterKey []byte) (key *asymmetric.PrivateKey, err error) { + var ( + isBinaryKey bool + decData []byte + ) fileContent, err := ioutil.ReadFile(keyFilePath) if err != nil { log.WithField("path", keyFilePath).WithError(err).Error("read key file failed") return } + // It's very impossible to get an raw private key base58 decodable. + // So if it's not base58 decodable we just make fileContent the encData encData, version, err := base58.CheckDecode(string(fileContent)) switch err { case base58.ErrChecksum: @@ -59,6 +73,7 @@ func LoadPrivateKey(keyFilePath string, masterKey []byte) (key *asymmetric.Priva case base58.ErrInvalidFormat: // be compatible with the original binary private key format + isBinaryKey = true encData = fileContent } @@ -66,27 +81,45 @@ func LoadPrivateKey(keyFilePath string, masterKey []byte) (key *asymmetric.Priva return nil, ErrInvalidBase58Version } - decData, err := symmetric.DecryptWithPassword(encData, masterKey) - if err != nil { - log.Error("decrypt private key error") - return - } + if isBinaryKey { + decData, err = symmetric.DecryptWithPassword(encData, masterKey, []byte(oldPrivateKDFSalt)) + if err != nil { + log.Error("decrypt private key error") + return + } - // sha256 + privateKey - if len(decData) != hash.HashBSize+asymmetric.PrivateKeyBytesLen { - log.WithFields(log.Fields{ - "expected": hash.HashBSize + asymmetric.PrivateKeyBytesLen, - "actual": len(decData), - }).Error("wrong private key file size") - return nil, ErrNotKeyFile - } + // sha256 + privateKey + if len(decData) != hash.HashBSize+asymmetric.PrivateKeyBytesLen { + log.WithFields(log.Fields{ + "expected": hash.HashBSize + asymmetric.PrivateKeyBytesLen, + "actual": len(decData), + }).Error("wrong binary private key file size") + return nil, ErrNotKeyFile + } + + computedHash := hash.DoubleHashB(decData[hash.HashBSize:]) + if !bytes.Equal(computedHash, decData[:hash.HashBSize]) { + return nil, ErrHashNotMatch + } + key, _ = asymmetric.PrivKeyFromBytes(decData[hash.HashBSize:]) + } else { + decData, err = symmetric.DecryptWithPassword(encData, masterKey, privateKDFSalt) + if err != nil { + log.Error("decrypt private key error") + return + } - computedHash := hash.DoubleHashB(decData[hash.HashBSize:]) - if !bytes.Equal(computedHash, decData[:hash.HashBSize]) { - return nil, ErrHashNotMatch + // privateKey + if len(decData) != asymmetric.PrivateKeyBytesLen { + log.WithFields(log.Fields{ + "expected": asymmetric.PrivateKeyBytesLen, + "actual": len(decData), + }).Error("wrong base58 private key file size") + return nil, ErrNotKeyFile + } + key, _ = asymmetric.PrivKeyFromBytes(decData) } - key, _ = asymmetric.PrivKeyFromBytes(decData[hash.HashBSize:]) return } @@ -94,9 +127,7 @@ func LoadPrivateKey(keyFilePath string, masterKey []byte) (key *asymmetric.Priva // default perm is 0600 func SavePrivateKey(keyFilePath string, key *asymmetric.PrivateKey, masterKey []byte) (err error) { serializedKey := key.Serialize() - keyHash := hash.DoubleHashB(serializedKey) - rawData := append(keyHash, serializedKey...) - encKey, err := symmetric.EncryptWithPassword(rawData, masterKey) + encKey, err := symmetric.EncryptWithPassword(serializedKey, masterKey, privateKDFSalt) if err != nil { return } diff --git a/crypto/kms/privatekeystore_test.go b/crypto/kms/privatekeystore_test.go index 32710e0df..c33ba450f 100644 --- a/crypto/kms/privatekeystore_test.go +++ b/crypto/kms/privatekeystore_test.go @@ -34,6 +34,7 @@ import ( const ( privateKeyPath = "./.testprivatekey" password = "auxten" + salt = "auxten-key-salt-auxten" ) func TestSaveLoadPrivateKey(t *testing.T) { @@ -83,7 +84,7 @@ func TestLoadPrivateKey(t *testing.T) { }) Convey("not key file2", t, func() { defer os.Remove("./.notkey") - enc, _ := symmetric.EncryptWithPassword([]byte("aa"), []byte(password)) + enc, _ := symmetric.EncryptWithPassword([]byte("aa"), []byte(password), []byte(salt)) ioutil.WriteFile("./.notkey", enc, 0600) lk, err := LoadPrivateKey("./.notkey", []byte(password)) So(err, ShouldEqual, ErrNotKeyFile) @@ -91,7 +92,7 @@ func TestLoadPrivateKey(t *testing.T) { }) Convey("hash not match", t, func() { defer os.Remove("./.HashNotMatch") - enc, _ := symmetric.EncryptWithPassword(bytes.Repeat([]byte("a"), 64), []byte(password)) + enc, _ := symmetric.EncryptWithPassword(bytes.Repeat([]byte("a"), 64), []byte(password), []byte(salt)) ioutil.WriteFile("./.HashNotMatch", enc, 0600) lk, err := LoadPrivateKey("./.HashNotMatch", []byte(password)) So(err, ShouldEqual, ErrHashNotMatch) @@ -105,7 +106,7 @@ func TestLoadPrivateKey(t *testing.T) { serializedKey := privateKey.Serialize() keyHash := hash.DoubleHashB(serializedKey) rawData := append(keyHash, serializedKey...) - encKey, _ := symmetric.EncryptWithPassword(rawData, []byte(password)) + encKey, _ := symmetric.EncryptWithPassword(rawData, []byte(password), []byte(salt)) invalidBase58EncKey := base58.CheckEncode(encKey, invalidPrivateKeyStoreVersion) ioutil.WriteFile("./.Base58VersionNotMatch", []byte(invalidBase58EncKey), 0600) lk, err := LoadPrivateKey("./.Base58VersionNotMatch", []byte(password)) @@ -154,14 +155,14 @@ func TestInitLocalKeyPair(t *testing.T) { func TestInitLocalKeyPair_error(t *testing.T) { Convey("hash not match", t, func() { defer os.Remove("./.HashNotMatch") - enc, _ := symmetric.EncryptWithPassword(bytes.Repeat([]byte("a"), 64), []byte(password)) + enc, _ := symmetric.EncryptWithPassword(bytes.Repeat([]byte("a"), 64), []byte(password), []byte(salt)) ioutil.WriteFile("./.HashNotMatch", enc, 0600) err := InitLocalKeyPair("./.HashNotMatch", []byte(password)) So(err, ShouldEqual, ErrHashNotMatch) }) Convey("ErrNotKeyFile", t, func() { defer os.Remove("./.ErrNotKeyFile") - enc, _ := symmetric.EncryptWithPassword(bytes.Repeat([]byte("a"), 65), []byte(password)) + enc, _ := symmetric.EncryptWithPassword(bytes.Repeat([]byte("a"), 65), []byte(password), []byte(salt)) ioutil.WriteFile("./.ErrNotKeyFile", enc, 0600) err := InitLocalKeyPair("./.ErrNotKeyFile", []byte(password)) So(err, ShouldEqual, ErrNotKeyFile) diff --git a/crypto/symmetric/aes.go b/crypto/symmetric/aes.go index 41c7a0f1d..afaeef8ed 100644 --- a/crypto/symmetric/aes.go +++ b/crypto/symmetric/aes.go @@ -28,10 +28,6 @@ import ( "github.com/CovenantSQL/CovenantSQL/crypto/hash" ) -const ( - keySalt = "auxten-key-salt-auxten" -) - var ( // ErrInputSize indicates cipher data size is not expected, // maybe data is not encrypted by EncryptWithPassword in this package @@ -39,16 +35,16 @@ var ( ) // keyDerivation does sha256 twice to password -func keyDerivation(password []byte) (out []byte) { - return hash.DoubleHashB(append(password, keySalt...)) +func keyDerivation(password []byte, salt []byte) (out []byte) { + return hash.DoubleHashB(append(password, salt...)) } // EncryptWithPassword encrypts data with given password, iv will be placed // at head of cipher data -func EncryptWithPassword(in, password []byte) (out []byte, err error) { +func EncryptWithPassword(in, password []byte, salt []byte) (out []byte, err error) { // keyE will be 256 bits, so aes.NewCipher(keyE) will return // AES-256 Cipher. - keyE := keyDerivation(password) + keyE := keyDerivation(password, salt) paddedIn := crypto.AddPKCSPadding(in) // IV + padded cipher data out = make([]byte, aes.BlockSize+len(paddedIn)) @@ -70,8 +66,8 @@ func EncryptWithPassword(in, password []byte) (out []byte, err error) { } // DecryptWithPassword decrypts data with given password -func DecryptWithPassword(in, password []byte) (out []byte, err error) { - keyE := keyDerivation(password) +func DecryptWithPassword(in, password []byte, salt []byte) (out []byte, err error) { + keyE := keyDerivation(password, salt) // IV + padded cipher data == (n + 1 + 1) * aes.BlockSize if len(in)%aes.BlockSize != 0 || len(in)/aes.BlockSize < 2 { return nil, ErrInputSize diff --git a/crypto/symmetric/aes_test.go b/crypto/symmetric/aes_test.go index a5054dff1..4c469015b 100644 --- a/crypto/symmetric/aes_test.go +++ b/crypto/symmetric/aes_test.go @@ -24,16 +24,19 @@ import ( . "github.com/smartystreets/goconvey/convey" ) -const password = "CovenantSQL.io" +const ( + password = "CovenantSQL.io" + salt = "auxten-key-salt-auxten" +) func TestEncryptDecryptWithPassword(t *testing.T) { Convey("encrypt & decrypt 0 length bytes with aes256", t, func() { - enc, err := EncryptWithPassword([]byte(nil), []byte(password)) + enc, err := EncryptWithPassword([]byte(nil), []byte(password), []byte(salt)) So(enc, ShouldNotBeNil) So(len(enc), ShouldEqual, 2*aes.BlockSize) So(err, ShouldBeNil) - dec, err := DecryptWithPassword(enc, []byte(password)) + dec, err := DecryptWithPassword(enc, []byte(password), []byte(salt)) So(dec, ShouldNotBeNil) So(len(dec), ShouldEqual, 0) So(err, ShouldBeNil) @@ -41,12 +44,12 @@ func TestEncryptDecryptWithPassword(t *testing.T) { Convey("encrypt & decrypt 1747 length bytes", t, func() { in := bytes.Repeat([]byte{0xff}, 1747) - enc, err := EncryptWithPassword(in, []byte(password)) + enc, err := EncryptWithPassword(in, []byte(password), []byte(salt)) So(enc, ShouldNotBeNil) So(len(enc), ShouldEqual, (1747/aes.BlockSize+2)*aes.BlockSize) So(err, ShouldBeNil) - dec, err := DecryptWithPassword(enc, []byte(password)) + dec, err := DecryptWithPassword(enc, []byte(password), []byte(salt)) So(dec, ShouldNotBeNil) So(len(dec), ShouldEqual, 1747) So(err, ShouldBeNil) @@ -54,7 +57,7 @@ func TestEncryptDecryptWithPassword(t *testing.T) { Convey("decrypt error length bytes", t, func() { in := bytes.Repeat([]byte{0xff}, 1747) - dec, err := DecryptWithPassword(in, []byte(password)) + dec, err := DecryptWithPassword(in, []byte(password), []byte(salt)) So(dec, ShouldBeNil) So(err, ShouldEqual, ErrInputSize) }) From fcca3253c40a3d8bdc03aa8906f214af17f0bf8b Mon Sep 17 00:00:00 2001 From: auxten Date: Wed, 2 Jan 2019 16:07:00 +0800 Subject: [PATCH 07/10] Format code --- cmd/cql-adapter/storage/sqlite3.go | 1 + cmd/cql-faucet/persistence.go | 1 + storage/storage.go | 1 + xenomint/xxx_test.go | 1 + 4 files changed, 4 insertions(+) diff --git a/cmd/cql-adapter/storage/sqlite3.go b/cmd/cql-adapter/storage/sqlite3.go index eb794d511..9eeaab39f 100644 --- a/cmd/cql-adapter/storage/sqlite3.go +++ b/cmd/cql-adapter/storage/sqlite3.go @@ -24,6 +24,7 @@ import ( "math/rand" "os" "path/filepath" + // Import sqlite3 manually. _ "github.com/CovenantSQL/go-sqlite3-encrypt" ) diff --git a/cmd/cql-faucet/persistence.go b/cmd/cql-faucet/persistence.go index dfdc3d458..140a6c6d6 100644 --- a/cmd/cql-faucet/persistence.go +++ b/cmd/cql-faucet/persistence.go @@ -26,6 +26,7 @@ import ( "github.com/CovenantSQL/CovenantSQL/conf" "github.com/CovenantSQL/CovenantSQL/utils/log" uuid "github.com/satori/go.uuid" + // Load sqlite3 database driver. _ "github.com/CovenantSQL/go-sqlite3-encrypt" ) diff --git a/storage/storage.go b/storage/storage.go index 7e36bad0e..e059819af 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -26,6 +26,7 @@ import ( "github.com/CovenantSQL/CovenantSQL/twopc" "github.com/CovenantSQL/CovenantSQL/utils/log" + // Register CovenantSQL/go-sqlite3-encrypt engine. _ "github.com/CovenantSQL/go-sqlite3-encrypt" ) diff --git a/xenomint/xxx_test.go b/xenomint/xxx_test.go index 13a973878..6e19f1b08 100644 --- a/xenomint/xxx_test.go +++ b/xenomint/xxx_test.go @@ -22,6 +22,7 @@ import ( "os" "path" "sync/atomic" + //"runtime/trace" "sync" "syscall" From b8b1735ffe6391d547d7ec50d76691a9df70490a Mon Sep 17 00:00:00 2001 From: auxten Date: Wed, 2 Jan 2019 16:16:00 +0800 Subject: [PATCH 08/10] Remove Build func and its test --- utils/exec.go | 18 ------------------ utils/exec_test.go | 7 ------- 2 files changed, 25 deletions(-) diff --git a/utils/exec.go b/utils/exec.go index 401e51bee..0a4db3616 100644 --- a/utils/exec.go +++ b/utils/exec.go @@ -42,24 +42,6 @@ func GetProjectSrcDir() string { return FJ(filepath.Dir(testFile), "../") } -// Build runs make -func Build() (err error) { - wd := GetProjectSrcDir() - err = os.Chdir(wd) - if err != nil { - log.WithError(err).Error("change working dir failed") - return - } - exec.Command("make", "clean").Run() - cmd := exec.Command("make", "use_all_cores") - output, err := cmd.CombinedOutput() - if err != nil { - log.WithError(err).Error("build failed") - } - log.Debugf("build output info: %#v", string(output)) - return -} - // RunCommand runs a command and capture its output to a log file, // if toStd is true also output to stdout and stderr func RunCommand(bin string, args []string, processName string, workingDir string, logDir string, toStd bool) (err error) { diff --git a/utils/exec_test.go b/utils/exec_test.go index f0e37b55e..f413bca64 100644 --- a/utils/exec_test.go +++ b/utils/exec_test.go @@ -31,13 +31,6 @@ var ( logDir = FJ(testWorkingDir, "./log/") ) -func TestBuild(t *testing.T) { - Convey("build", t, func() { - log.SetLevel(log.DebugLevel) - So(Build(), ShouldBeNil) - }) -} - func TestRunServer(t *testing.T) { Convey("build", t, func() { log.SetLevel(log.DebugLevel) From f63d3bdfd9f4a4e7a7e1bacda52cf5bad70af3dd Mon Sep 17 00:00:00 2001 From: leventeliu Date: Wed, 2 Jan 2019 18:08:16 +0800 Subject: [PATCH 09/10] Fix nil pointer access and unset return value --- blockproducer/chain.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/blockproducer/chain.go b/blockproducer/chain.go index d0b129729..aaa06a389 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -565,6 +565,7 @@ func (c *Chain) mainCycle(ctx context.Context) { func (c *Chain) syncCurrentHead(ctx context.Context) (ok bool) { var h = c.getNextHeight() - 1 if c.head().height >= h { + ok = true return } // Initiate blocking gossip calls to fetch block of the current height, @@ -620,13 +621,18 @@ func (c *Chain) syncCurrentHead(ctx context.Context) (ok bool) { atomic.AddUint32(&unreachable, 1) return } - log.WithFields(log.Fields{ + var fields = log.Fields{ "local": c.peerInfo(), "remote": id, "height": h, - "parent": resp.Block.ParentHash().Short(4), - "hash": resp.Block.BlockHash().Short(4), - }).Debug("fetched new block from remote peer") + } + defer log.WithFields(fields).Debug("fetch block request reply") + if resp.Block == nil { + return + } + // Push new block from other peers + fields["parent"] = resp.Block.ParentHash().Short(4) + fields["hash"] = resp.Block.BlockHash().Short(4) select { case c.pendingBlocks <- resp.Block: case <-cld.Done(): From 96a8f3ff0e5aa24b6a2d02959fe9ed9c539fadb7 Mon Sep 17 00:00:00 2001 From: leventeliu Date: Wed, 2 Jan 2019 22:21:08 +0800 Subject: [PATCH 10/10] Add simple test cases for cqld --- blockproducer/chain.go | 34 +++++++++--- cmd/cqld/bench_test.go | 14 ++++- cmd/cqld/cqld_test.go | 114 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 8 deletions(-) create mode 100644 cmd/cqld/cqld_test.go diff --git a/blockproducer/chain.go b/blockproducer/chain.go index aaa06a389..40f3f1465 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -528,13 +528,10 @@ func (c *Chain) mainCycle(ctx context.Context) { select { case <-timer.C: // Try to fetch block at height `nextHeight-1` until enough peers are reachable - for !c.syncCurrentHead(ctx) { - if ctx.Err() != nil { - // Context is closed, break forever loop - log.WithError(ctx.Err()).Info("abort main cycle") - timer.Reset(0) - return - } + if err := c.blockingSyncCurrentHead(ctx); err != nil { + log.WithError(err).Info("abort main cycle") + timer.Reset(0) + return } var t, d = c.nextTick() @@ -559,6 +556,29 @@ func (c *Chain) mainCycle(ctx context.Context) { } } +func (c *Chain) blockingSyncCurrentHead(ctx context.Context) (err error) { + var ( + ticker *time.Ticker + interval = 1 * time.Second + ) + if c.tick < interval { + interval = c.tick + } + ticker = time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if c.syncCurrentHead(ctx) { + return + } + case <-ctx.Done(): + err = ctx.Err() + return + } + } +} + // syncCurrentHead synchronizes a block at the current height of the local peer from the known // remote peers. The return value `ok` indicates that there're at less `c.confirms-1` replies // from these gossip calls. diff --git a/cmd/cqld/bench_test.go b/cmd/cqld/bench_test.go index 086d92a4e..37771d128 100644 --- a/cmd/cqld/bench_test.go +++ b/cmd/cqld/bench_test.go @@ -21,6 +21,7 @@ package main import ( "context" "net" + "os" "os/exec" "path/filepath" "sync" @@ -60,6 +61,9 @@ func start3BPs() { // start 3bps var cmd *utils.CMD + os.Remove(FJ(testWorkingDir, "./node_0/chain.db")) + os.Remove(FJ(testWorkingDir, "./node_0/dht.db")) + os.Remove(FJ(testWorkingDir, "./node_0/public.keystore")) if cmd, err = utils.RunCommandNB( FJ(baseDir, "./bin/cqld.test"), []string{"-config", FJ(testWorkingDir, "./node_0/config.yaml"), @@ -71,6 +75,9 @@ func start3BPs() { } else { log.Errorf("start node failed: %v", err) } + os.Remove(FJ(testWorkingDir, "./node_1/chain.db")) + os.Remove(FJ(testWorkingDir, "./node_1/dht.db")) + os.Remove(FJ(testWorkingDir, "./node_1/public.keystore")) if cmd, err = utils.RunCommandNB( FJ(baseDir, "./bin/cqld.test"), []string{"-config", FJ(testWorkingDir, "./node_1/config.yaml"), @@ -82,6 +89,9 @@ func start3BPs() { } else { log.Errorf("start node failed: %v", err) } + os.Remove(FJ(testWorkingDir, "./node_2/chain.db")) + os.Remove(FJ(testWorkingDir, "./node_2/dht.db")) + os.Remove(FJ(testWorkingDir, "./node_2/public.keystore")) if cmd, err = utils.RunCommandNB( FJ(baseDir, "./bin/cqld.test"), []string{"-config", FJ(testWorkingDir, "./node_2/config.yaml"), @@ -93,6 +103,8 @@ func start3BPs() { } else { log.Errorf("start node failed: %v", err) } + os.Remove(FJ(testWorkingDir, "./node_c/dht.db")) + os.Remove(FJ(testWorkingDir, "./node_c/public.keystore")) } func stopNodes() { @@ -111,8 +123,8 @@ func stopNodes() { } }(nodeCmd) } - wg.Wait() + nodeCmds = nil } func TestStartBP_CallRPC(t *testing.T) { diff --git a/cmd/cqld/cqld_test.go b/cmd/cqld/cqld_test.go new file mode 100644 index 000000000..97337d71e --- /dev/null +++ b/cmd/cqld/cqld_test.go @@ -0,0 +1,114 @@ +// +build !testbinary + +/* + * Copyright 2018 The CovenantSQL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "strings" + "syscall" + "testing" + "time" + + "github.com/CovenantSQL/CovenantSQL/conf" + "github.com/CovenantSQL/CovenantSQL/crypto/kms" + "github.com/CovenantSQL/CovenantSQL/route" + "github.com/CovenantSQL/CovenantSQL/rpc" + "github.com/CovenantSQL/CovenantSQL/types" + "github.com/CovenantSQL/CovenantSQL/utils" + . "github.com/smartystreets/goconvey/convey" +) + +func waitBPChainService(ctx context.Context, period time.Duration) (err error) { + var ( + ticker = time.NewTicker(period) + req = &types.FetchBlockReq{ + Height: 0, // Genesis block + } + resp = &types.FetchTxBillingResp{} + ) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if err = rpc.RequestBP( + route.MCCFetchBlock.String(), req, resp, + ); err == nil || strings.Contains(err.Error(), "can't find service") { + return + } + case <-ctx.Done(): + err = ctx.Err() + return + } + } +} + +func TestCQLD(t *testing.T) { + Convey("Test cqld 3BPs", t, func() { + var ( + ctx1, ctx2 context.Context + ccl1, ccl2 context.CancelFunc + err error + ) + start3BPs() + defer stopNodes() + So(len(nodeCmds), ShouldEqual, 3) + + ctx1, ccl1 = context.WithTimeout(context.Background(), 30*time.Second) + defer ccl1() + + err = utils.WaitToConnect(ctx1, "127.0.0.1", []int{2122, 2121, 2120}, 10*time.Second) + So(err, ShouldBeNil) + + // Initialize local client + conf.GConf, err = conf.LoadConfig(FJ(testWorkingDir, "./node_c/config.yaml")) + So(err, ShouldBeNil) + route.InitKMS(conf.GConf.PubKeyStoreFile) + err = kms.InitLocalKeyPair(conf.GConf.PrivateKeyFile, []byte{}) + So(err, ShouldBeNil) + + // Wait BP chain service to be ready + ctx2, ccl2 = context.WithTimeout(context.Background(), 30*time.Second) + defer ccl2() + err = waitBPChainService(ctx2, 3*time.Second) + So(err, ShouldBeNil) + + // Wait for block producing + time.Sleep(15 * time.Second) + + // Kill one BP + err = nodeCmds[2].Cmd.Process.Signal(syscall.SIGTERM) + So(err, ShouldBeNil) + time.Sleep(15 * time.Second) + + // The other peers should be waiting + var ( + req = &types.FetchLastIrreversibleBlockReq{} + resp = &types.FetchLastIrreversibleBlockResp{} + + lastBlockCount uint32 + ) + err = rpc.RequestBP(route.MCCFetchLastIrreversibleBlock.String(), req, resp) + So(err, ShouldBeNil) + lastBlockCount = resp.Count + time.Sleep(15 * time.Second) + err = rpc.RequestBP(route.MCCFetchLastIrreversibleBlock.String(), req, resp) + So(err, ShouldBeNil) + So(resp.Count, ShouldEqual, lastBlockCount) + }) +}