diff --git a/blockproducer/chain.go b/blockproducer/chain.go
index a9f90edf8..fca7d2510 100644
--- a/blockproducer/chain.go
+++ b/blockproducer/chain.go
@@ -22,6 +22,7 @@ import (
 	"math"
 	"os"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	pi "github.com/CovenantSQL/CovenantSQL/blockproducer/interfaces"
@@ -376,18 +377,32 @@ func (c *Chain) produceBlock(now time.Time) (err error) {
 }
 
 // advanceNextHeight does the check and runs block producing if its my turn.
-func (c *Chain) advanceNextHeight(now time.Time) {
+func (c *Chain) advanceNextHeight(now time.Time, d time.Duration) {
+	var elapsed = -d
 	log.WithFields(log.Fields{
-		"next_height": c.getNextHeight(),
-		"bp_number":   c.serversNum,
-		"node_index":  c.locSvIndex,
-	}).Info("check turns")
-	defer c.increaseNextHeight()
+		"bp_number":        c.serversNum,
+		"node_index":       c.locSvIndex,
+		"enclosing_height": c.getNextHeight() - 1,
+		"using_timestamp":  now.Format(time.RFC3339Nano),
+		"elapsed_seconds":  elapsed.Seconds(),
+	}).Info("enclosing current height and advancing to next height")
+
+	defer c.increaseNextHeight()
+	// Skip if it's not my turn
 	if !c.isMyTurn() {
 		return
 	}
-
+	// Normally, block producing should start right after a new period begins, but more time may
+	// also have elapsed since the last block synchronization.
+	if elapsed > c.tick { // TODO(leventeliu): add threshold config for `elapsed`.
+		log.WithFields(log.Fields{
+			"advanced_height": c.getNextHeight(),
+			"using_timestamp": now.Format(time.RFC3339Nano),
+			"elapsed_seconds": elapsed.Seconds(),
+		}).Warn("too much time elapsed in the new period, skip this block")
+		return
+	}
 	log.WithField("height", c.getNextHeight()).Info("producing a new block")
 	if err := c.produceBlock(now); err != nil {
 		log.WithField("now", now.Format(time.RFC3339Nano)).WithError(err).Errorln(
@@ -512,11 +527,17 @@ func (c *Chain) mainCycle(ctx context.Context) {
 		for {
 			select {
 			case <-timer.C:
-				c.syncCurrentHead(ctx) // Try to fetch block at height `nextHeight-1`
+				// Try to fetch block at height `nextHeight-1` until enough peers are reachable
+				if err := c.blockingSyncCurrentHead(ctx); err != nil {
+					log.WithError(err).Info("abort main cycle")
+					timer.Reset(0)
+					return
+				}
+
 				var t, d = c.nextTick()
 				if d <= 0 {
 					// Try to produce block at `nextHeight` if it's my turn, and increase height by 1
-					c.advanceNextHeight(t)
+					c.advanceNextHeight(t, d)
 				} else {
 					log.WithFields(log.Fields{
 						"peer": c.peerInfo(),
@@ -535,20 +556,64 @@ func (c *Chain) mainCycle(ctx context.Context) {
 	}
 }
 
-func (c *Chain) syncCurrentHead(ctx context.Context) {
+func (c *Chain) blockingSyncCurrentHead(ctx context.Context) (err error) {
+	var (
+		ticker   *time.Ticker
+		interval = 1 * time.Second
+	)
+	if c.tick < interval {
+		interval = c.tick
+	}
+	ticker = time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			if c.syncCurrentHead(ctx) {
+				return
+			}
+		case <-ctx.Done():
+			err = ctx.Err()
+			return
+		}
+	}
+}
+
+// syncCurrentHead synchronizes a block at the current height of the local peer from the known
+// remote peers. The return value `ok` indicates that there are at least `c.confirms-1` replies
+// from these gossip calls.
+func (c *Chain) syncCurrentHead(ctx context.Context) (ok bool) {
 	var h = c.getNextHeight() - 1
 	if c.head().height >= h {
+		ok = true
 		return
 	}
 	// Initiate blocking gossip calls to fetch block of the current height,
 	// with timeout of one tick.
 	var (
-		wg       = &sync.WaitGroup{}
-		cld, ccl = context.WithTimeout(ctx, c.tick)
+		wg          = &sync.WaitGroup{}
+		cld, ccl    = context.WithTimeout(ctx, c.tick)
+		unreachable uint32
 	)
 	defer func() {
 		wg.Wait()
 		ccl()
+		var needConfirms, serversNum = func() (cf, sn uint32) {
+			c.RLock()
+			defer c.RUnlock()
+			cf, sn = c.confirms, c.serversNum
+			return
+		}()
+		if unreachable+needConfirms > serversNum {
+			log.WithFields(log.Fields{
+				"peer":              c.peerInfo(),
+				"sync_head_height":  h,
+				"unreachable_count": unreachable,
+			}).Warn("one or more block producers are currently unreachable")
+			ok = false
+		} else {
+			ok = true
+		}
 	}()
 	for _, s := range c.getPeers().Servers {
 		if !s.IsEqual(&c.nodeID) {
@@ -573,15 +638,21 @@ func (c *Chain) syncCurrentHead(ctx context.Context) {
 					"remote": id,
 					"height": h,
 				}).WithError(err).Warn("failed to fetch block")
+				atomic.AddUint32(&unreachable, 1)
 				return
 			}
-			log.WithFields(log.Fields{
+			var fields = log.Fields{
 				"local":  c.peerInfo(),
 				"remote": id,
 				"height": h,
-				"parent": resp.Block.ParentHash().Short(4),
-				"hash":   resp.Block.BlockHash().Short(4),
-			}).Debug("fetched new block from remote peer")
+			}
+			defer log.WithFields(fields).Debug("fetch block request reply")
+			if resp.Block == nil {
+				return
+			}
+			// Push the new block from the remote peer
+			fields["parent"] = resp.Block.ParentHash().Short(4)
+			fields["hash"] = resp.Block.BlockHash().Short(4)
 			select {
 			case c.pendingBlocks <- resp.Block:
 			case <-cld.Done():
@@ -590,6 +661,7 @@ func (c *Chain) syncCurrentHead(ctx context.Context) {
 			}(s)
 		}
 	}
+	return
 }
 
 func (c *Chain) storeTx(tx pi.Transaction) (err error) {
diff --git a/blockproducer/chain_service.go b/blockproducer/chain_service.go
index 0b94986b2..44dfcb237 100644
--- a/blockproducer/chain_service.go
+++ b/blockproducer/chain_service.go
@@ -66,10 +66,12 @@ func (c *Chain) fetchLastIrreversibleBlock() (
 
 func (c *Chain) fetchBlockByHeight(h uint32) (b *types.BPBlock, count uint32, err error) {
 	var node = c.head().ancestor(h)
+	// Not found
 	if node == nil {
-		err = ErrNoSuchBlock
 		return
-	} else if node.block != nil {
+	}
+	// OK, and block is cached
+	if node.block != nil {
 		b = node.block
 		count = node.count
 		return
@@ -84,10 +86,12 @@ func (c *Chain) fetchBlockByCount(count uint32) (b *types.BPBlock, height uint32, err error) {
 	var node = c.head().ancestorByCount(count)
+	// Not found
 	if node == nil {
-		err = ErrNoSuchBlock
 		return
-	} else if node.block != nil {
+	}
+	// OK, and block is cached
+	if node.block != nil {
 		b = node.block
 		height = node.height
 		return
diff --git a/blockproducer/chain_test.go b/blockproducer/chain_test.go
index e44cc0e5c..1e4afe108 100644
--- a/blockproducer/chain_test.go
+++ b/blockproducer/chain_test.go
@@ -292,11 +292,13 @@ func TestChain(t *testing.T) {
 			count, height uint32
 		)
 
-		_, _, err = chain.fetchBlockByHeight(100)
-		So(err, ShouldEqual, ErrNoSuchBlock)
+		bl, _, err = chain.fetchBlockByHeight(100)
+		So(bl, ShouldBeNil)
+		So(err, ShouldBeNil)
 
-		_, _, err = chain.fetchBlockByCount(100)
-		So(err, ShouldEqual, ErrNoSuchBlock)
+		bl, _, err = chain.fetchBlockByCount(100)
+		So(bl, ShouldBeNil)
+		So(err, ShouldBeNil)
 
 		bl, count, err = chain.fetchBlockByHeight(0)
 		So(err, ShouldBeNil)
@@ -340,6 +342,7 @@ func TestChain(t *testing.T) {
 		defer sv.Stop()
 
 		chain.server = sv
+		chain.confirms = 1
 		chain.Start()
 		defer func() {
 			chain.Stop()
diff --git a/blockproducer/errors.go b/blockproducer/errors.go
index fbea655a6..1357a2a29 100644
--- a/blockproducer/errors.go
+++ b/blockproducer/errors.go
@@ -31,8 +31,6 @@ var (
 	ErrInvalidMerkleTreeRoot = errors.New("Block merkle tree root does not match the tx hashes")
 	// ErrParentNotMatch defines invalid parent hash.
 	ErrParentNotMatch = errors.New("Block's parent hash cannot match best block")
-	// ErrNoSuchBlock defines no such block error.
-	ErrNoSuchBlock = errors.New("Cannot find such block")
 	// ErrBalanceOverflow indicates that there will be an overflow after balance manipulation.
 	ErrBalanceOverflow = errors.New("balance overflow")
 
diff --git a/blockproducer/rpc.go b/blockproducer/rpc.go
index 7de2dfdaa..eed4be86c 100644
--- a/blockproducer/rpc.go
+++ b/blockproducer/rpc.go
@@ -154,21 +154,16 @@ func WaitDatabaseCreation(
 	period time.Duration,
 ) (err error) {
 	var (
-		timer = time.NewTimer(0)
-		req   = &types.QuerySQLChainProfileReq{
+		ticker = time.NewTicker(period)
+		req    = &types.QuerySQLChainProfileReq{
 			DBID: dbID,
 		}
 		resp = &types.QuerySQLChainProfileResp{}
 	)
-	defer func() {
-		if !timer.Stop() {
-			<-timer.C
-		}
-	}()
+	defer ticker.Stop()
 	for {
 		select {
-		case <-timer.C:
-			timer.Reset(period)
+		case <-ticker.C:
 			if err = rpc.RequestBP(
 				route.MCCQuerySQLChainProfile.String(), req, resp,
 			); err != nil {
diff --git a/cmd/cqld/bench_test.go b/cmd/cqld/bench_test.go
index 086d92a4e..37771d128 100644
--- a/cmd/cqld/bench_test.go
+++ b/cmd/cqld/bench_test.go
@@ -21,6 +21,7 @@ package main
 import (
 	"context"
 	"net"
+	"os"
 	"os/exec"
 	"path/filepath"
 	"sync"
@@ -60,6 +61,9 @@ func start3BPs() {
 	// start 3bps
 	var cmd *utils.CMD
+	os.Remove(FJ(testWorkingDir, "./node_0/chain.db"))
+	os.Remove(FJ(testWorkingDir, "./node_0/dht.db"))
+	os.Remove(FJ(testWorkingDir, "./node_0/public.keystore"))
 	if cmd, err = utils.RunCommandNB(
 		FJ(baseDir, "./bin/cqld.test"),
 		[]string{"-config", FJ(testWorkingDir, "./node_0/config.yaml"),
@@ -71,6 +75,9 @@ func start3BPs() {
 	} else {
 		log.Errorf("start node failed: %v", err)
 	}
+	os.Remove(FJ(testWorkingDir, "./node_1/chain.db"))
+	os.Remove(FJ(testWorkingDir, "./node_1/dht.db"))
+	os.Remove(FJ(testWorkingDir, "./node_1/public.keystore"))
 	if cmd, err = utils.RunCommandNB(
 		FJ(baseDir, "./bin/cqld.test"),
 		[]string{"-config", FJ(testWorkingDir, "./node_1/config.yaml"),
@@ -82,6 +89,9 @@ func start3BPs() {
 	} else {
 		log.Errorf("start node failed: %v", err)
 	}
+	os.Remove(FJ(testWorkingDir, "./node_2/chain.db"))
+	os.Remove(FJ(testWorkingDir, "./node_2/dht.db"))
+	os.Remove(FJ(testWorkingDir, "./node_2/public.keystore"))
 	if cmd, err = utils.RunCommandNB(
 		FJ(baseDir, "./bin/cqld.test"),
 		[]string{"-config", FJ(testWorkingDir, "./node_2/config.yaml"),
@@ -93,6 +103,8 @@ func start3BPs() {
 	} else {
 		log.Errorf("start node failed: %v", err)
 	}
+	os.Remove(FJ(testWorkingDir, "./node_c/dht.db"))
+	os.Remove(FJ(testWorkingDir, "./node_c/public.keystore"))
 }
 
 func stopNodes() {
@@ -111,8 +123,8 @@ func stopNodes() {
 			}
 		}(nodeCmd)
 	}
-	wg.Wait()
+
+	wg.Wait()
+	nodeCmds = nil
 }
 
 func TestStartBP_CallRPC(t *testing.T) {
diff --git a/cmd/cqld/cqld_test.go b/cmd/cqld/cqld_test.go
new file mode 100644
index 000000000..97337d71e
--- /dev/null
+++ b/cmd/cqld/cqld_test.go
@@ -0,0 +1,116 @@
+// +build !testbinary
+
+/*
+ * Copyright 2018 The CovenantSQL Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"context"
+	"strings"
+	"syscall"
+	"testing"
+	"time"
+
+	"github.com/CovenantSQL/CovenantSQL/conf"
+	"github.com/CovenantSQL/CovenantSQL/crypto/kms"
+	"github.com/CovenantSQL/CovenantSQL/route"
+	"github.com/CovenantSQL/CovenantSQL/rpc"
+	"github.com/CovenantSQL/CovenantSQL/types"
+	"github.com/CovenantSQL/CovenantSQL/utils"
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+// waitBPChainService polls the BP chain service until it responds or the context expires
+func waitBPChainService(ctx context.Context, period time.Duration) (err error) {
+	var (
+		ticker = time.NewTicker(period)
+		req    = &types.FetchBlockReq{
+			Height: 0, // Genesis block
+		}
+		resp = &types.FetchBlockResp{}
+	)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			if err = rpc.RequestBP(
+				route.MCCFetchBlock.String(), req, resp,
+			); err == nil || !strings.Contains(err.Error(), "can't find service") {
+				// Return on success or on any unexpected error; keep polling while not registered
+				return
+			}
+		case <-ctx.Done():
+			err = ctx.Err()
+			return
+		}
+	}
+}
+
+func TestCQLD(t *testing.T) {
+	Convey("Test cqld 3BPs", t, func() {
+		var (
+			ctx1, ctx2 context.Context
+			ccl1, ccl2 context.CancelFunc
+			err        error
+		)
+		start3BPs()
+		defer stopNodes()
+		So(len(nodeCmds), ShouldEqual, 3)
+
+		ctx1, ccl1 = context.WithTimeout(context.Background(), 30*time.Second)
+		defer ccl1()
+
+		err = utils.WaitToConnect(ctx1, "127.0.0.1", []int{2122, 2121, 2120}, 10*time.Second)
+		So(err, ShouldBeNil)
+
+		// Initialize local client
+		conf.GConf, err = conf.LoadConfig(FJ(testWorkingDir, "./node_c/config.yaml"))
+		So(err, ShouldBeNil)
+		route.InitKMS(conf.GConf.PubKeyStoreFile)
+		err = kms.InitLocalKeyPair(conf.GConf.PrivateKeyFile, []byte{})
+		So(err, ShouldBeNil)
+
+		// Wait for the BP chain service to be ready
+		ctx2, ccl2 = context.WithTimeout(context.Background(), 30*time.Second)
+		defer ccl2()
+		err = waitBPChainService(ctx2, 3*time.Second)
+		So(err, ShouldBeNil)
+
+		// Wait for block producing
+		time.Sleep(15 * time.Second)
+
+		// Kill one BP
+		err = nodeCmds[2].Cmd.Process.Signal(syscall.SIGTERM)
+		So(err, ShouldBeNil)
+		time.Sleep(15 * time.Second)
+
+		// The other peers should now stall, since `confirms` cannot be met with one BP down
+		var (
+			req  = &types.FetchLastIrreversibleBlockReq{}
+			resp = &types.FetchLastIrreversibleBlockResp{}
+
+			lastBlockCount uint32
+		)
+		err = rpc.RequestBP(route.MCCFetchLastIrreversibleBlock.String(), req, resp)
+		So(err, ShouldBeNil)
+		lastBlockCount = resp.Count
+		time.Sleep(15 * time.Second)
+		err = rpc.RequestBP(route.MCCFetchLastIrreversibleBlock.String(), req, resp)
+		So(err, ShouldBeNil)
+		So(resp.Count, ShouldEqual, lastBlockCount)
+	})
+}
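Editor's note (not part of the patch): the quorum condition in `syncCurrentHead` is the subtle piece of this change. The deferred closure counts failed gossip calls and reports the head as synced only while `unreachable + confirms <= serversNum`, i.e. at least `confirms-1` of the other `serversNum-1` block producers replied. Below is a minimal standalone sketch of that arithmetic; `headIsSynced` and its inputs are hypothetical illustrations, not CovenantSQL APIs.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// headIsSynced mirrors the deferred check in syncCurrentHead: concurrent
// fetch-block calls bump `unreachable` atomically on failure, and the head
// only counts as synced while unreachable+confirms <= serversNum.
func headIsSynced(confirms, serversNum uint32, remoteReplied []bool) bool {
	var (
		wg          sync.WaitGroup
		unreachable uint32
	)
	for _, replied := range remoteReplied {
		wg.Add(1)
		go func(replied bool) {
			defer wg.Done()
			if !replied { // a fetch-block gossip call failed
				atomic.AddUint32(&unreachable, 1)
			}
		}(replied)
	}
	wg.Wait()
	return unreachable+confirms <= serversNum
}

func main() {
	// 3 block producers with confirms=2; the local node gossips to the other 2.
	// One unreachable peer is tolerated (1+2 <= 3)...
	fmt.Println(headIsSynced(2, 3, []bool{true, false})) // true
	// ...but two are not (2+2 > 3), so blockingSyncCurrentHead keeps retrying.
	fmt.Println(headIsSynced(2, 3, []bool{false, false})) // false
}
```

This is also why `chain_test.go` sets `chain.confirms = 1` for its single-node test, and why `TestCQLD` expects the last irreversible block count to stop advancing after one of the three BPs receives SIGTERM.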