Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@
name = "github.com/CovenantSQL/xurls"
branch = "master"

[[override]]
name = "github.com/xtaci/smux"
branch = "master"

[[override]]
name = "github.com/siddontang/go-mysql"
source = "github.com/CovenantSQL/go-mysql"
Expand Down
67 changes: 66 additions & 1 deletion cmd/cql-minerd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ import (
"time"

"github.com/CovenantSQL/CovenantSQL/client"
"github.com/CovenantSQL/CovenantSQL/conf"
"github.com/CovenantSQL/CovenantSQL/crypto/asymmetric"
"github.com/CovenantSQL/CovenantSQL/proto"
"github.com/CovenantSQL/CovenantSQL/rpc"
"github.com/CovenantSQL/CovenantSQL/utils"
"github.com/CovenantSQL/CovenantSQL/utils/log"
"github.com/CovenantSQL/go-sqlite3-encrypt"
Expand Down Expand Up @@ -316,7 +319,7 @@ func stopNodes() {
func TestFullProcess(t *testing.T) {
log.SetLevel(log.DebugLevel)

Convey("test full process", t, func() {
Convey("test full process", t, func(c C) {
startNodes()
defer stopNodes()
var err error
Expand Down Expand Up @@ -384,6 +387,68 @@ func TestFullProcess(t *testing.T) {
So(err, ShouldBeNil)
So(resultBytes, ShouldResemble, []byte("ha\001ppy"))

Convey("test query cancel", FailureContinues, func(c C) {
/* test cancel write query */
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
db.Exec("INSERT INTO test VALUES(sleep(10000000000))")
}()
time.Sleep(time.Second)
wg.Add(1)
go func() {
defer wg.Done()
var err error
_, err = db.Exec("UPDATE test SET test = 100;")
// should be canceled
c.So(err, ShouldNotBeNil)
}()
time.Sleep(time.Second)
for _, n := range conf.GConf.KnownNodes {
if n.Role == proto.Miner {
rpc.GetSessionPoolInstance().Remove(n.ID)
}
}
time.Sleep(time.Second)

// ensure connection
db.Query("SELECT 1")

// test before write operation complete
var result int
err = db.QueryRow("SELECT * FROM test WHERE test = 4 LIMIT 1").Scan(&result)
c.So(err, ShouldBeNil)
c.So(result, ShouldEqual, 4)

wg.Wait()

/* test cancel read query */
go func() {
_, err = db.Query("SELECT * FROM test WHERE test = sleep(10000000000)")
// call write query using read query interface
//_, err = db.Query("INSERT INTO test VALUES(sleep(10000000000))")
c.So(err, ShouldNotBeNil)
}()
time.Sleep(time.Second)
for _, n := range conf.GConf.KnownNodes {
if n.Role == proto.Miner {
rpc.GetSessionPoolInstance().Remove(n.ID)
}
}
time.Sleep(time.Second)
// ensure connection
db.Query("SELECT 1")

/* test long running write query */
row = db.QueryRow("SELECT * FROM test WHERE test = 10000000000 LIMIT 1")
err = row.Scan(&result)
c.So(err, ShouldBeNil)
c.So(result, ShouldEqual, 10000000000)

c.So(err, ShouldBeNil)
})

err = db.Close()
So(err, ShouldBeNil)

Expand Down
55 changes: 28 additions & 27 deletions kayak/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (

const (
// commit channel window size
commitWindow = 10
commitWindow = 0
// prepare window
trackerWindow = 10
)
Expand Down Expand Up @@ -246,6 +246,7 @@ func (r *Runtime) Shutdown() (err error) {
// Apply defines entry for Leader node.
func (r *Runtime) Apply(ctx context.Context, req interface{}) (result interface{}, logIndex uint64, err error) {
var commitFuture <-chan *commitResult
var cResult *commitResult

var tmStart, tmLeaderPrepare, tmFollowerPrepare, tmCommitEnqueue, tmLeaderRollback,
tmRollback, tmCommitDequeue, tmLeaderCommit, tmCommit time.Time
Expand Down Expand Up @@ -350,37 +351,36 @@ func (r *Runtime) Apply(ctx context.Context, req interface{}) (result interface{

tmCommitEnqueue = time.Now()

select {
case cResult := <-commitFuture:
if cResult != nil {
logIndex = prepareLog.Index
result = cResult.result
err = cResult.err

tmCommitDequeue = cResult.start
dbCost = cResult.dbCost
tmLeaderCommit = time.Now()

// wait until context deadline or commit done
if cResult.rpc != nil {
cResult.rpc.get(ctx)
}
} else {
log.Fatal("IMPOSSIBLE BRANCH")
select {
case <-ctx.Done():
err = errors.Wrap(ctx.Err(), "process commit timeout")
goto ROLLBACK
default:
}
}
case <-ctx.Done():
// pipeline commit timeout
if commitFuture == nil {
logIndex = prepareLog.Index
err = errors.Wrap(ctx.Err(), "enqueue commit timeout")
goto ROLLBACK
}

cResult = <-commitFuture
if cResult != nil {
logIndex = prepareLog.Index
result = cResult.result
err = cResult.err

tmCommitDequeue = cResult.start
dbCost = cResult.dbCost
tmLeaderCommit = time.Now()

// wait until context deadline or commit done
if cResult.rpc != nil {
cResult.rpc.get(ctx)
}
} else {
log.Fatal("IMPOSSIBLE BRANCH")
select {
case <-ctx.Done():
err = errors.Wrap(ctx.Err(), "process commit timeout")
goto ROLLBACK
default:
}
}

tmCommit = time.Now()

return
Expand Down Expand Up @@ -572,6 +572,7 @@ func (r *Runtime) leaderCommitResult(ctx context.Context, reqPayload interface{}

select {
case <-ctx.Done():
res = nil
case r.commitCh <- req:
}

Expand Down
25 changes: 21 additions & 4 deletions proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package proto

import (
"context"
"time"
)

Expand All @@ -30,19 +31,22 @@ type EnvelopeAPI interface {
GetTTL() time.Duration
GetExpire() time.Duration
GetNodeID() *RawNodeID
GetContext() context.Context

SetVersion(string)
SetTTL(time.Duration)
SetExpire(time.Duration)
SetNodeID(*RawNodeID)
SetContext(context.Context)
}

// Envelope is the protocol header
type Envelope struct {
Version string `json:"v"`
TTL time.Duration `json:"t"`
Expire time.Duration `json:"e"`
NodeID *RawNodeID `json:"id"`
Version string `json:"v"`
TTL time.Duration `json:"t"`
Expire time.Duration `json:"e"`
NodeID *RawNodeID `json:"id"`
_ctx context.Context `json:"-"`
}

// PingReq is Ping RPC request
Expand Down Expand Up @@ -120,6 +124,14 @@ func (e *Envelope) GetNodeID() *RawNodeID {
return e.NodeID
}

// GetContext returns context from envelop which is set in server Accept
func (e *Envelope) GetContext() context.Context {
if e._ctx == nil {
return context.Background()
}
return e._ctx
}

// SetVersion implements EnvelopeAPI.SetVersion
func (e *Envelope) SetVersion(ver string) {
e.Version = ver
Expand All @@ -140,5 +152,10 @@ func (e *Envelope) SetNodeID(nodeID *RawNodeID) {
e.NodeID = nodeID
}

// SetContext set a ctx in envelope
func (e *Envelope) SetContext(ctx context.Context) {
e._ctx = ctx
}

// DatabaseID is database name, will be generated from UUID
type DatabaseID string
7 changes: 7 additions & 0 deletions proto/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package proto

import (
"context"
"testing"
"time"

Expand All @@ -41,5 +42,11 @@ func TestEnvelope_GetSet(t *testing.T) {

env.SetVersion("0.0.1")
So(env.GetVersion(), ShouldEqual, "0.0.1")

ctx := env.GetContext()
So(ctx, ShouldEqual, context.Background())
cldCtx, _ := context.WithCancel(ctx)
env.SetContext(cldCtx)
So(env.GetContext(), ShouldEqual, cldCtx)
})
}
7 changes: 6 additions & 1 deletion rpc/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package rpc

import (
"context"
"net/rpc"

"github.com/CovenantSQL/CovenantSQL/proto"
Expand All @@ -26,13 +27,15 @@ import (
type NodeAwareServerCodec struct {
rpc.ServerCodec
NodeID *proto.RawNodeID
Ctx context.Context
}

// NewNodeAwareServerCodec returns new NodeAwareServerCodec with normal rpc.ServerCode and proto.RawNodeID
func NewNodeAwareServerCodec(codec rpc.ServerCodec, nodeID *proto.RawNodeID) *NodeAwareServerCodec {
func NewNodeAwareServerCodec(ctx context.Context, codec rpc.ServerCodec, nodeID *proto.RawNodeID) *NodeAwareServerCodec {
return &NodeAwareServerCodec{
ServerCodec: codec,
NodeID: nodeID,
Ctx: ctx,
}
}

Expand All @@ -51,6 +54,8 @@ func (nc *NodeAwareServerCodec) ReadRequestBody(body interface{}) (err error) {
if r, ok := body.(proto.EnvelopeAPI); ok {
// inject node id to rpc envelope
r.SetNodeID(nc.NodeID)
// inject context
r.SetContext(nc.Ctx)
}

return
Expand Down
21 changes: 10 additions & 11 deletions rpc/rpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"net"
"net/rpc"
"strings"
"sync"

"github.com/CovenantSQL/CovenantSQL/crypto/kms"
Expand Down Expand Up @@ -65,6 +66,7 @@ func (c *PersistentCaller) initClient(isAnonymous bool) (err error) {
c.Lock()
defer c.Unlock()
if c.client == nil {
log.Debug("init new rpc client")
var conn net.Conn
conn, err = DialToNode(c.TargetID, c.pool, isAnonymous)
if err != nil {
Expand Down Expand Up @@ -93,29 +95,26 @@ func (c *PersistentCaller) Call(method string, args interface{}, reply interface
if err == io.EOF ||
err == io.ErrUnexpectedEOF ||
err == io.ErrClosedPipe ||
err == rpc.ErrShutdown {
err == rpc.ErrShutdown ||
strings.Contains(strings.ToLower(err.Error()), "shut down") ||
strings.Contains(strings.ToLower(err.Error()), "broken pipe") {
// if got EOF, retry once
err = c.Reconnect(method)
if err != nil {
log.WithField("rpc", method).WithError(err).Error("reconnect failed")
reconnectErr := c.ResetClient(method)
if reconnectErr != nil {
log.WithField("rpc", method).WithError(reconnectErr).Error("reconnect failed")
}
}
log.WithField("rpc", method).WithError(err).Error("call RPC failed")
}
return
}

// Reconnect tries to rebuild RPC client
func (c *PersistentCaller) Reconnect(method string) (err error) {
// ResetClient resets client.
func (c *PersistentCaller) ResetClient(method string) (err error) {
c.Lock()
c.Close()
c.client = nil
c.Unlock()
err = c.initClient(method == route.DHTPing.String())
if err != nil {
log.WithField("rpc", method).WithError(err).Error("second init client for RPC failed")
return
}
return
}

Expand Down
8 changes: 7 additions & 1 deletion rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package rpc

import (
"context"
"io"
"net"
"net/rpc"
Expand Down Expand Up @@ -149,7 +150,12 @@ sessionLoop:
}
break sessionLoop
}
nodeAwareCodec := NewNodeAwareServerCodec(utils.GetMsgPackServerCodec(muxConn), remoteNodeID)
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
<-muxConn.GetDieCh()
cancelFunc()
}()
nodeAwareCodec := NewNodeAwareServerCodec(ctx, utils.GetMsgPackServerCodec(muxConn), remoteNodeID)
go s.rpcServer.ServeCodec(nodeAwareCodec)
}
}
Expand Down
Loading