-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathsession.go
143 lines (118 loc) · 3.74 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package cassandra
import (
"time"
"github.com/gocql/gocql"
)
// Session is an interface that describes the surface area of interacting with
// a cassandra store.
type Session interface {
ExecQuery(string) error
ExecIterQuery(query string) (ScanIter, CloseIter)
Close()
}
// CQLSession implements a session to cassandra.
type CQLSession struct {
session *gocql.Session
}
// ScanIter defines the function type returned from ExecIterQuery.
type ScanIter func(...interface{}) bool
// CloseIter is a type returned from ExecIterQuery. When used it wraps the
// close of the underlying iterator.
type CloseIter func() error
func NewInsecureCQLConfig(ips []string, port int, keyspace string, consistency string, timeout string) (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(ips...)
cluster.Keyspace = keyspace
if port != 0 {
cluster.Port = port
}
cluster.Consistency = gocql.ParseConsistency(consistency)
var err error
cluster.Timeout, err = time.ParseDuration(timeout)
if err != nil {
return nil, err
}
return cluster, nil
}
func NewSecuredCQLConfig(ips []string, port int, keyspace string, consistency string, timeout string, capath string, username string, passwd string) (*gocql.ClusterConfig, error) {
cluster, err := NewInsecureCQLConfig(ips, port, keyspace, consistency, timeout)
if err != nil {
return nil, err
}
cluster.DisableInitialHostLookup = true
cluster.SslOpts = &gocql.SslOptions{
CaPath: capath,
}
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: username,
Password: passwd,
}
return cluster, nil
}
func NewCQLSessionFromConfig(cluster *gocql.ClusterConfig) (*CQLSession, error) {
s, err := cluster.CreateSession()
if err != nil {
return nil, err
}
return &CQLSession{
session: s,
}, nil
}
// NewCQLSession returns a populated CQLSession struct, or an error using the
// underlying cassandra driver.
func NewCQLSession(ips []string, port int, keyspace string, consistency string, timeout string) (*CQLSession, error) {
cluster, err := NewInsecureCQLConfig(ips, port, keyspace, consistency, timeout)
if err != nil {
return nil, err
}
return NewCQLSessionFromConfig(cluster)
}
func NewSecuredCQLSession(ips []string, port int, keyspace string, consistency string, timeout string, capath string, username string, passwd string) (*CQLSession, error) {
cluster, err := NewSecuredCQLConfig(ips, port, keyspace, consistency, timeout, capath, username, passwd)
if err != nil {
return nil, err
}
return NewCQLSessionFromConfig(cluster)
}
// ExecQuery executes the provided query against the underlying cassandra
// session.
func (s *CQLSession) ExecQuery(query string) error {
return s.session.Query(query).Exec()
}
// ExecIterQuery performs an iterated query against the underlying session.
func (s *CQLSession) ExecIterQuery(query string) (ScanIter, CloseIter) {
iter := s.session.Query(query).Iter()
return func(dest ...interface{}) bool {
return iter.Scan(dest...)
}, func() error {
return iter.Close()
}
}
// Close closes the underlying cassandra session.
func (s *CQLSession) Close() {
s.session.Close()
}
// MockCassSession is used in testing.
type MockCassSession struct {
query string
}
// ExecQuery implements the interface for testing.
func (s *MockCassSession) ExecQuery(query string) error {
s.query = query
return nil
}
// ExecIterQuery implements the interface for testing.
func (s *MockCassSession) ExecIterQuery(query string) (ScanIter, CloseIter) {
s.query = query
return func(dest ...interface{}) bool {
return false
},
func() error {
return nil
}
}
// Close implements the interface for testing.
func (s *MockCassSession) Close() {}
// LastQuery is used during tests to validate the generated query.
func (s *MockCassSession) LastQuery() string {
return s.query
}