Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 88 additions & 16 deletions blockproducer/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math"
"os"
"sync"
"sync/atomic"
"time"

pi "github.com/CovenantSQL/CovenantSQL/blockproducer/interfaces"
Expand Down Expand Up @@ -376,18 +377,32 @@ func (c *Chain) produceBlock(now time.Time) (err error) {
}

// advanceNextHeight does the check and runs block producing if its my turn.
func (c *Chain) advanceNextHeight(now time.Time) {
func (c *Chain) advanceNextHeight(now time.Time, d time.Duration) {
var elapsed = -d

log.WithFields(log.Fields{
"next_height": c.getNextHeight(),
"bp_number": c.serversNum,
"node_index": c.locSvIndex,
}).Info("check turns")
defer c.increaseNextHeight()
"bp_number": c.serversNum,
"node_index": c.locSvIndex,
"enclosing_height": c.getNextHeight() - 1,
"using_timestamp": now.Format(time.RFC3339Nano),
"elapsed_seconds": elapsed.Seconds(),
}).Info("enclosing current height and advancing to next height")

defer c.increaseNextHeight()
// Skip if it's not my turn
if !c.isMyTurn() {
return
}

// Normally, a block producing should start right after the new period, but more time may also
// elapse since the last block synchronizing.
if elapsed > c.tick { // TODO(leventeliu): add threshold config for `elapsed`.
log.WithFields(log.Fields{
"advanced_height": c.getNextHeight(),
"using_timestamp": now.Format(time.RFC3339Nano),
"elapsed_seconds": elapsed.Seconds(),
}).Warn("too much time elapsed in the new period, skip this block")
return
}
log.WithField("height", c.getNextHeight()).Info("producing a new block")
if err := c.produceBlock(now); err != nil {
log.WithField("now", now.Format(time.RFC3339Nano)).WithError(err).Errorln(
Expand Down Expand Up @@ -512,11 +527,17 @@ func (c *Chain) mainCycle(ctx context.Context) {
for {
select {
case <-timer.C:
c.syncCurrentHead(ctx) // Try to fetch block at height `nextHeight-1`
// Try to fetch block at height `nextHeight-1` until enough peers are reachable
if err := c.blockingSyncCurrentHead(ctx); err != nil {
log.WithError(err).Info("abort main cycle")
timer.Reset(0)
return
}

var t, d = c.nextTick()
if d <= 0 {
// Try to produce block at `nextHeight` if it's my turn, and increase height by 1
c.advanceNextHeight(t)
c.advanceNextHeight(t, d)
} else {
log.WithFields(log.Fields{
"peer": c.peerInfo(),
Expand All @@ -535,20 +556,64 @@ func (c *Chain) mainCycle(ctx context.Context) {
}
}

func (c *Chain) syncCurrentHead(ctx context.Context) {
func (c *Chain) blockingSyncCurrentHead(ctx context.Context) (err error) {
var (
ticker *time.Ticker
interval = 1 * time.Second
)
if c.tick < interval {
interval = c.tick
}
ticker = time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if c.syncCurrentHead(ctx) {
return
}
case <-ctx.Done():
err = ctx.Err()
return
}
}
}

// syncCurrentHead synchronizes a block at the current height of the local peer from the known
// remote peers. The return value `ok` indicates that there're at less `c.confirms-1` replies
// from these gossip calls.
func (c *Chain) syncCurrentHead(ctx context.Context) (ok bool) {
var h = c.getNextHeight() - 1
if c.head().height >= h {
ok = true
return
}
// Initiate blocking gossip calls to fetch block of the current height,
// with timeout of one tick.
var (
wg = &sync.WaitGroup{}
cld, ccl = context.WithTimeout(ctx, c.tick)
wg = &sync.WaitGroup{}
cld, ccl = context.WithTimeout(ctx, c.tick)
unreachable uint32
)
defer func() {
wg.Wait()
ccl()
var needConfirms, serversNum = func() (cf, sn uint32) {
c.RLock()
defer c.RUnlock()
cf, sn = c.confirms, c.serversNum
return
}()
if unreachable+needConfirms > serversNum {
log.WithFields(log.Fields{
"peer": c.peerInfo(),
"sync_head_height": h,
"unreachable_count": unreachable,
}).Warn("one or more block producers are currently unreachable")
ok = false
} else {
ok = true
}
}()
for _, s := range c.getPeers().Servers {
if !s.IsEqual(&c.nodeID) {
Expand All @@ -573,15 +638,21 @@ func (c *Chain) syncCurrentHead(ctx context.Context) {
"remote": id,
"height": h,
}).WithError(err).Warn("failed to fetch block")
atomic.AddUint32(&unreachable, 1)
return
}
log.WithFields(log.Fields{
var fields = log.Fields{
"local": c.peerInfo(),
"remote": id,
"height": h,
"parent": resp.Block.ParentHash().Short(4),
"hash": resp.Block.BlockHash().Short(4),
}).Debug("fetched new block from remote peer")
}
defer log.WithFields(fields).Debug("fetch block request reply")
if resp.Block == nil {
return
}
// Push new block from other peers
fields["parent"] = resp.Block.ParentHash().Short(4)
fields["hash"] = resp.Block.BlockHash().Short(4)
select {
case c.pendingBlocks <- resp.Block:
case <-cld.Done():
Expand All @@ -590,6 +661,7 @@ func (c *Chain) syncCurrentHead(ctx context.Context) {
}(s)
}
}
return
}

func (c *Chain) storeTx(tx pi.Transaction) (err error) {
Expand Down
12 changes: 8 additions & 4 deletions blockproducer/chain_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ func (c *Chain) fetchLastIrreversibleBlock() (

func (c *Chain) fetchBlockByHeight(h uint32) (b *types.BPBlock, count uint32, err error) {
var node = c.head().ancestor(h)
// Not found
if node == nil {
err = ErrNoSuchBlock
return
} else if node.block != nil {
}
// OK, and block is cached
if node.block != nil {
b = node.block
count = node.count
return
Expand All @@ -84,10 +86,12 @@ func (c *Chain) fetchBlockByHeight(h uint32) (b *types.BPBlock, count uint32, er

func (c *Chain) fetchBlockByCount(count uint32) (b *types.BPBlock, height uint32, err error) {
var node = c.head().ancestorByCount(count)
// Not found
if node == nil {
err = ErrNoSuchBlock
return
} else if node.block != nil {
}
// OK, and block is cached
if node.block != nil {
b = node.block
height = node.height
return
Expand Down
11 changes: 7 additions & 4 deletions blockproducer/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,13 @@ func TestChain(t *testing.T) {
count, height uint32
)

_, _, err = chain.fetchBlockByHeight(100)
So(err, ShouldEqual, ErrNoSuchBlock)
bl, _, err = chain.fetchBlockByHeight(100)
So(bl, ShouldBeNil)
So(err, ShouldBeNil)

_, _, err = chain.fetchBlockByCount(100)
So(err, ShouldEqual, ErrNoSuchBlock)
bl, _, err = chain.fetchBlockByCount(100)
So(bl, ShouldBeNil)
So(err, ShouldBeNil)

bl, count, err = chain.fetchBlockByHeight(0)
So(err, ShouldBeNil)
Expand Down Expand Up @@ -340,6 +342,7 @@ func TestChain(t *testing.T) {
defer sv.Stop()

chain.server = sv
chain.confirms = 1
chain.Start()
defer func() {
chain.Stop()
Expand Down
2 changes: 0 additions & 2 deletions blockproducer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ var (
ErrInvalidMerkleTreeRoot = errors.New("Block merkle tree root does not match the tx hashes")
// ErrParentNotMatch defines invalid parent hash.
ErrParentNotMatch = errors.New("Block's parent hash cannot match best block")
// ErrNoSuchBlock defines no such block error.
ErrNoSuchBlock = errors.New("Cannot find such block")

// ErrBalanceOverflow indicates that there will be an overflow after balance manipulation.
ErrBalanceOverflow = errors.New("balance overflow")
Expand Down
13 changes: 4 additions & 9 deletions blockproducer/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,16 @@ func WaitDatabaseCreation(
period time.Duration,
) (err error) {
var (
timer = time.NewTimer(0)
req = &types.QuerySQLChainProfileReq{
ticker = time.NewTicker(period)
req = &types.QuerySQLChainProfileReq{
DBID: dbID,
}
resp = &types.QuerySQLChainProfileResp{}
)
defer func() {
if !timer.Stop() {
<-timer.C
}
}()
defer ticker.Stop()
for {
select {
case <-timer.C:
timer.Reset(period)
case <-ticker.C:
if err = rpc.RequestBP(
route.MCCQuerySQLChainProfile.String(), req, resp,
); err != nil {
Expand Down
14 changes: 13 additions & 1 deletion cmd/cqld/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package main
import (
"context"
"net"
"os"
"os/exec"
"path/filepath"
"sync"
Expand Down Expand Up @@ -60,6 +61,9 @@ func start3BPs() {

// start 3bps
var cmd *utils.CMD
os.Remove(FJ(testWorkingDir, "./node_0/chain.db"))
os.Remove(FJ(testWorkingDir, "./node_0/dht.db"))
os.Remove(FJ(testWorkingDir, "./node_0/public.keystore"))
if cmd, err = utils.RunCommandNB(
FJ(baseDir, "./bin/cqld.test"),
[]string{"-config", FJ(testWorkingDir, "./node_0/config.yaml"),
Expand All @@ -71,6 +75,9 @@ func start3BPs() {
} else {
log.Errorf("start node failed: %v", err)
}
os.Remove(FJ(testWorkingDir, "./node_1/chain.db"))
os.Remove(FJ(testWorkingDir, "./node_1/dht.db"))
os.Remove(FJ(testWorkingDir, "./node_1/public.keystore"))
if cmd, err = utils.RunCommandNB(
FJ(baseDir, "./bin/cqld.test"),
[]string{"-config", FJ(testWorkingDir, "./node_1/config.yaml"),
Expand All @@ -82,6 +89,9 @@ func start3BPs() {
} else {
log.Errorf("start node failed: %v", err)
}
os.Remove(FJ(testWorkingDir, "./node_2/chain.db"))
os.Remove(FJ(testWorkingDir, "./node_2/dht.db"))
os.Remove(FJ(testWorkingDir, "./node_2/public.keystore"))
if cmd, err = utils.RunCommandNB(
FJ(baseDir, "./bin/cqld.test"),
[]string{"-config", FJ(testWorkingDir, "./node_2/config.yaml"),
Expand All @@ -93,6 +103,8 @@ func start3BPs() {
} else {
log.Errorf("start node failed: %v", err)
}
os.Remove(FJ(testWorkingDir, "./node_c/dht.db"))
os.Remove(FJ(testWorkingDir, "./node_c/public.keystore"))
}

func stopNodes() {
Expand All @@ -111,8 +123,8 @@ func stopNodes() {
}
}(nodeCmd)
}

wg.Wait()
nodeCmds = nil
}

func TestStartBP_CallRPC(t *testing.T) {
Expand Down
Loading