From 002cfe6ff53f517f050bd7d5ad73016eee2ac10a Mon Sep 17 00:00:00 2001 From: Bator Tsyrendylykov Date: Fri, 17 Mar 2023 11:37:04 +0300 Subject: [PATCH 01/18] Add option to skip host resolving for redis cluster --- rediscluster/cluster.go | 30 ++++++++++++++++++------------ redisconn/conn.go | 4 +++- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/rediscluster/cluster.go b/rediscluster/cluster.go index eab6833..8701dc1 100644 --- a/rediscluster/cluster.go +++ b/rediscluster/cluster.go @@ -59,7 +59,7 @@ const ( preferConnected ) -// Opts is a options for Cluster +// Opts is an options for Cluster type Opts struct { // HostOpts - per host options // Note that HostOpts.Handle will be overwritten to ClusterHandle{ cluster.opts.Handle, conn.address} @@ -68,7 +68,7 @@ type Opts struct { // if ConnsPerHost < 1 then ConnsPerHost = 2 ConnsPerHost int // ConnHostPolicy - either prefer to send to first connection until it is disconnected, or - // send to all connections in round robin maner. + // send to all connections in round-robin manner. // default: ConnHostPreferFirst ConnHostPolicy ConnHostPolicyEnum // Handle is returned with Cluster.Handle() @@ -79,7 +79,7 @@ type Opts struct { // Check interval - default cluster configuration reloading interval // default: 5 seconds, min: 100 millisecond, max: 10 minutes // Note, that MOVE and ASK redis errors will force configuration reloading, - // therefore there is not need to make it very frequent. + // therefore there is no need to make it very frequent. CheckInterval time.Duration // MovedRetries - follow MOVED|ASK redirections this number of times // default: 3, min: 1, max: 10 @@ -95,8 +95,12 @@ type Opts struct { RoundRobinSeed RoundRobinSeed // LatencyOrientedRR - when MasterAndSlaves is used, prefer hosts with lower latency LatencyOrientedRR bool - TLSEnabled bool - TLSConfig *tls.Config + // Enable connection with TLS + TLSEnabled bool + // Config for TLS connection + TLSConfig *tls.Config + // Do not resolve host address to ip + SkipHostResolving bool } // Cluster is implementation of redis.Sender which represents connection to redis-cluster. @@ -105,7 +109,7 @@ type Opts struct { // There could be several connections to single redis server, it is controlled by Opts.ConnsPerHost, // and Opts.ConnHostPolicy specifies how to use them. // -// By default requests are always sent to known master of replica-set. But you could override it with +// By default, requests are always sent to known master of replica-set. But you could override it with // Cluster.WithPolicy. Write commands still will be sent to master, unless you specify ForceMasterAndSlaves // or ForcePreferSlaves policy. Note: read-only commands are hard-coded in UPCASE format, therefore command // will not be recognized as read-only if it is Camel-case or low-case. @@ -239,15 +243,17 @@ func NewCluster(ctx context.Context, initAddrs []string, opts Opts) (*Cluster, e var err error for _, addr := range initAddrs { - // If redis hosts are mentioned by names, couple of connections will be established and closed shortly. - // Lets resolve them to ip addresses. - addr, err = redisclusterutil.Resolve(addr) - if err != nil { - return nil, ErrAddressNotResolved.WrapWithNoMessage(err) + if !opts.SkipHostResolving { + // If redis hosts are mentioned by names, a couple of connections will be established and closed shortly. + // Let's resolve them to ip addresses. + addr, err = redisclusterutil.Resolve(addr) + if err != nil { + return nil, ErrAddressNotResolved.WrapWithNoMessage(err) + } } if _, ok := config.masters[addr]; !ok { config.nodes[addr], err = cluster.newNode(addr, true) - // since we connecting asynchronously, it can be only configuration error + // since we're connecting asynchronously, it can be only configuration error if err != nil { cluster.cancel() return nil, err diff --git a/redisconn/conn.go b/redisconn/conn.go index d87daae..43a75c7 100644 --- a/redisconn/conn.go +++ b/redisconn/conn.go @@ -68,8 +68,10 @@ type Opts struct { // It will allow to use this connector in script like (ie single threaded) environment // where it is ok to use blocking commands and pipelining gives no gain. ScriptMode bool + // Enable connection with TLS TLSEnabled bool - TLSConfig *tls.Config + // Config for TLS connection + TLSConfig *tls.Config } // Connection is implementation of redis.Sender which represents single connection to single redis instance. From 6dbeadda0be41dc3ac331edfa5c38614d26867e6 Mon Sep 17 00:00:00 2001 From: Bator Tsyrendylykov Date: Mon, 20 Mar 2023 15:41:55 +0300 Subject: [PATCH 02/18] Move address resolving closer to redis connection. Preserve original hostname for TLS verification. --- rediscluster/cluster.go | 8 ------- rediscluster/mapping.go | 28 +++++++++++++++++++++--- rediscluster/redisclusterutil/resolve.go | 4 ++++ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/rediscluster/cluster.go b/rediscluster/cluster.go index 8701dc1..c555541 100644 --- a/rediscluster/cluster.go +++ b/rediscluster/cluster.go @@ -243,14 +243,6 @@ func NewCluster(ctx context.Context, initAddrs []string, opts Opts) (*Cluster, e var err error for _, addr := range initAddrs { - if !opts.SkipHostResolving { - // If redis hosts are mentioned by names, a couple of connections will be established and closed shortly. - // Let's resolve them to ip addresses. - addr, err = redisclusterutil.Resolve(addr) - if err != nil { - return nil, ErrAddressNotResolved.WrapWithNoMessage(err) - } - } if _, ok := config.masters[addr]; !ok { config.nodes[addr], err = cluster.newNode(addr, true) // since we're connecting asynchronously, it can be only configuration error diff --git a/rediscluster/mapping.go b/rediscluster/mapping.go index 69cb53c..9ec63b7 100644 --- a/rediscluster/mapping.go +++ b/rediscluster/mapping.go @@ -1,6 +1,7 @@ package rediscluster import ( + "crypto/tls" "fmt" "sync/atomic" "unsafe" @@ -8,6 +9,7 @@ import ( "github.com/joomcode/errorx" "github.com/joomcode/redispipe/redis" + "github.com/joomcode/redispipe/rediscluster/redisclusterutil" "github.com/joomcode/redispipe/redisconn" ) @@ -33,8 +35,29 @@ type ClusterHandle struct { // newNode creates handle for a connection, that will be established in a future. func (c *Cluster) newNode(addr string, initial bool) (*node, error) { + var err error + connectionAddr := addr + nodeOpts := c.opts.HostOpts + + if !c.opts.SkipHostResolving { + // If redis hosts are mentioned by names, a couple of connections will be established and closed shortly. + // Let's resolve them to ip addresses. + connectionAddr, err = redisclusterutil.Resolve(connectionAddr) + if err != nil { + return nil, ErrAddressNotResolved.WrapWithNoMessage(err) + } + } + + if nodeOpts.TLSEnabled && !redisclusterutil.IsIPAddress(addr) { + // preserve original hostname for TLS verification + if nodeOpts.TLSConfig != nil { + nodeOpts.TLSConfig = &tls.Config{} + } + nodeOpts.TLSConfig.ServerName = addr + } + node := &node{ - opts: c.opts.HostOpts, + opts: nodeOpts, addr: addr, refcnt: 1, } @@ -42,8 +65,7 @@ func (c *Cluster) newNode(addr string, initial bool) (*node, error) { node.conns = make([]*redisconn.Connection, c.opts.ConnsPerHost) for i := range node.conns { node.opts.Handle = ClusterHandle{c.opts.Handle, addr, i} - var err error - node.conns[i], err = redisconn.Connect(c.ctx, addr, node.opts) + node.conns[i], err = redisconn.Connect(c.ctx, connectionAddr, node.opts) if err != nil { if initial { return nil, err diff --git a/rediscluster/redisclusterutil/resolve.go b/rediscluster/redisclusterutil/resolve.go index d213e45..61184f7 100644 --- a/rediscluster/redisclusterutil/resolve.go +++ b/rediscluster/redisclusterutil/resolve.go @@ -14,3 +14,7 @@ func Resolve(addr string) (string, error) { } return net.JoinHostPort(ips[0], port), nil } + +func IsIPAddress(addr string) bool { + return net.ParseIP(addr) != nil +} From 1863ca91abfc79c327af44937e46cc8746608671 Mon Sep 17 00:00:00 2001 From: Bator Tsyrendylykov Date: Mon, 20 Mar 2023 16:49:34 +0300 Subject: [PATCH 03/18] Fix tls config server name --- rediscluster/mapping.go | 15 ++++++++------- rediscluster/redisclusterutil/resolve.go | 10 +++++----- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/rediscluster/mapping.go b/rediscluster/mapping.go index 9ec63b7..8683a12 100644 --- a/rediscluster/mapping.go +++ b/rediscluster/mapping.go @@ -42,18 +42,19 @@ func (c *Cluster) newNode(addr string, initial bool) (*node, error) { if !c.opts.SkipHostResolving { // If redis hosts are mentioned by names, a couple of connections will be established and closed shortly. // Let's resolve them to ip addresses. - connectionAddr, err = redisclusterutil.Resolve(connectionAddr) + var originalHost string + connectionAddr, originalHost, err = redisclusterutil.Resolve(connectionAddr) if err != nil { return nil, ErrAddressNotResolved.WrapWithNoMessage(err) } - } - if nodeOpts.TLSEnabled && !redisclusterutil.IsIPAddress(addr) { - // preserve original hostname for TLS verification - if nodeOpts.TLSConfig != nil { - nodeOpts.TLSConfig = &tls.Config{} + if nodeOpts.TLSEnabled && !redisclusterutil.IsIPAddress(originalHost) { + // preserve original hostname for TLS verification + if nodeOpts.TLSConfig != nil { + nodeOpts.TLSConfig = &tls.Config{} + } + nodeOpts.TLSConfig.ServerName = originalHost } - nodeOpts.TLSConfig.ServerName = addr } node := &node{ diff --git a/rediscluster/redisclusterutil/resolve.go b/rediscluster/redisclusterutil/resolve.go index 61184f7..07466f8 100644 --- a/rediscluster/redisclusterutil/resolve.go +++ b/rediscluster/redisclusterutil/resolve.go @@ -2,17 +2,17 @@ package redisclusterutil import "net" -// Resolve just resolves hostname:port to ipaddr:port -func Resolve(addr string) (string, error) { +// Resolve just resolves hostname:port to ipaddr:port and also returns original hostname +func Resolve(addr string) (string, string, error) { ip, port, err := net.SplitHostPort(addr) if err != nil { - return "", err + return "", "", err } ips, err := net.LookupHost(ip) if err != nil { - return "", err + return "", "", err } - return net.JoinHostPort(ips[0], port), nil + return net.JoinHostPort(ips[0], port), ip, nil } func IsIPAddress(addr string) bool { From e8e2b4701a910774d3d9f730c23f422069dd551e Mon Sep 17 00:00:00 2001 From: Bator Tsyrendylykov Date: Mon, 20 Mar 2023 17:55:24 +0300 Subject: [PATCH 04/18] Get host from address explicitly --- rediscluster/mapping.go | 8 ++++++-- rediscluster/redisclusterutil/resolve.go | 19 ++++++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/rediscluster/mapping.go b/rediscluster/mapping.go index 8683a12..c221893 100644 --- a/rediscluster/mapping.go +++ b/rediscluster/mapping.go @@ -42,12 +42,16 @@ func (c *Cluster) newNode(addr string, initial bool) (*node, error) { if !c.opts.SkipHostResolving { // If redis hosts are mentioned by names, a couple of connections will be established and closed shortly. // Let's resolve them to ip addresses. - var originalHost string - connectionAddr, originalHost, err = redisclusterutil.Resolve(connectionAddr) + connectionAddr, err = redisclusterutil.Resolve(connectionAddr) if err != nil { return nil, ErrAddressNotResolved.WrapWithNoMessage(err) } + originalHost, err := redisclusterutil.GetHost(addr) + if err != nil { + return nil, err + } + if nodeOpts.TLSEnabled && !redisclusterutil.IsIPAddress(originalHost) { // preserve original hostname for TLS verification if nodeOpts.TLSConfig != nil { diff --git a/rediscluster/redisclusterutil/resolve.go b/rediscluster/redisclusterutil/resolve.go index 07466f8..7695802 100644 --- a/rediscluster/redisclusterutil/resolve.go +++ b/rediscluster/redisclusterutil/resolve.go @@ -2,17 +2,26 @@ package redisclusterutil import "net" -// Resolve just resolves hostname:port to ipaddr:port and also returns original hostname -func Resolve(addr string) (string, string, error) { +// Resolve just resolves hostname:port to ipaddr:port +func Resolve(addr string) (string, error) { ip, port, err := net.SplitHostPort(addr) if err != nil { - return "", "", err + return "", err } ips, err := net.LookupHost(ip) if err != nil { - return "", "", err + return "", err } - return net.JoinHostPort(ips[0], port), ip, nil + return net.JoinHostPort(ips[0], port), nil +} + +func GetHost(addr string) (string, error) { + host, _, err := net.SplitHostPort(addr) + if err != nil { + return "", err + } + + return host, nil } func IsIPAddress(addr string) bool { From ece74bf5d37c898601fac398f7a58619ea7e6451 Mon Sep 17 00:00:00 2001 From: Bator Tsyrendylykov Date: Mon, 20 Mar 2023 18:00:42 +0300 Subject: [PATCH 05/18] Clone TLS Config for modification --- rediscluster/mapping.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rediscluster/mapping.go b/rediscluster/mapping.go index c221893..9fce619 100644 --- a/rediscluster/mapping.go +++ b/rediscluster/mapping.go @@ -55,9 +55,13 @@ func (c *Cluster) newNode(addr string, initial bool) (*node, error) { if nodeOpts.TLSEnabled && !redisclusterutil.IsIPAddress(originalHost) { // preserve original hostname for TLS verification if nodeOpts.TLSConfig != nil { - nodeOpts.TLSConfig = &tls.Config{} + nodeOpts.TLSConfig = nodeOpts.TLSConfig.Clone() + nodeOpts.TLSConfig.ServerName = originalHost + } else { + nodeOpts.TLSConfig = &tls.Config{ + ServerName: originalHost, + } } - nodeOpts.TLSConfig.ServerName = originalHost } } From 0cbeb54a1100273efeca8083829f19e7fdc32079 Mon Sep 17 00:00:00 2001 From: Bator Tsyrendylykov Date: Tue, 21 Mar 2023 10:17:02 +0300 Subject: [PATCH 06/18] Remove SkipHostResolving option --- rediscluster/cluster.go | 4 +-- rediscluster/mapping.go | 62 ++++++++++++++++++++++++----------------- 2 files changed, 38 insertions(+), 28 deletions(-) diff --git a/rediscluster/cluster.go b/rediscluster/cluster.go index c555541..a50eb34 100644 --- a/rediscluster/cluster.go +++ b/rediscluster/cluster.go @@ -59,7 +59,7 @@ const ( preferConnected ) -// Opts is an options for Cluster +// Opts holds the options for Cluster type Opts struct { // HostOpts - per host options // Note that HostOpts.Handle will be overwritten to ClusterHandle{ cluster.opts.Handle, conn.address} @@ -99,8 +99,6 @@ type Opts struct { TLSEnabled bool // Config for TLS connection TLSConfig *tls.Config - // Do not resolve host address to ip - SkipHostResolving bool } // Cluster is implementation of redis.Sender which represents connection to redis-cluster. diff --git a/rediscluster/mapping.go b/rediscluster/mapping.go index 9fce619..0d458d7 100644 --- a/rediscluster/mapping.go +++ b/rediscluster/mapping.go @@ -37,36 +37,21 @@ type ClusterHandle struct { func (c *Cluster) newNode(addr string, initial bool) (*node, error) { var err error connectionAddr := addr - nodeOpts := c.opts.HostOpts - - if !c.opts.SkipHostResolving { - // If redis hosts are mentioned by names, a couple of connections will be established and closed shortly. - // Let's resolve them to ip addresses. - connectionAddr, err = redisclusterutil.Resolve(connectionAddr) - if err != nil { - return nil, ErrAddressNotResolved.WrapWithNoMessage(err) - } - originalHost, err := redisclusterutil.GetHost(addr) - if err != nil { - return nil, err - } + // If redis hosts are mentioned by names, a couple of connections will be established and closed shortly. + // Let's resolve them to ip addresses. + connectionAddr, err = redisclusterutil.Resolve(connectionAddr) + if err != nil { + return nil, ErrAddressNotResolved.WrapWithNoMessage(err) + } - if nodeOpts.TLSEnabled && !redisclusterutil.IsIPAddress(originalHost) { - // preserve original hostname for TLS verification - if nodeOpts.TLSConfig != nil { - nodeOpts.TLSConfig = nodeOpts.TLSConfig.Clone() - nodeOpts.TLSConfig.ServerName = originalHost - } else { - nodeOpts.TLSConfig = &tls.Config{ - ServerName: originalHost, - } - } - } + nodeOpts, err := c.nodeOpts(addr) + if err != nil { + return nil, err } node := &node{ - opts: nodeOpts, + opts: *nodeOpts, addr: addr, refcnt: 1, } @@ -88,6 +73,33 @@ func (c *Cluster) newNode(addr string, initial bool) (*node, error) { return node, nil } +func (c *Cluster) nodeOpts(addr string) (*redisconn.Opts, error) { + nodeOpts := c.opts.HostOpts + + if !nodeOpts.TLSEnabled { + return &nodeOpts, nil + } + + originalHost, err := redisclusterutil.GetHost(addr) + if err != nil { + return nil, err + } + + if !redisclusterutil.IsIPAddress(originalHost) { + // preserve original hostname for TLS verification + if nodeOpts.TLSConfig != nil { + nodeOpts.TLSConfig = nodeOpts.TLSConfig.Clone() + nodeOpts.TLSConfig.ServerName = originalHost + } else { + nodeOpts.TLSConfig = &tls.Config{ + ServerName: originalHost, + } + } + } + + return &nodeOpts, nil +} + type connThen func(conn *redisconn.Connection, err error) // Call callback with connection to specified address. From e46d20d84e7ab2ada70c83404224e75b93109e07 Mon Sep 17 00:00:00 2001 From: Bator Tsyrendylykov Date: Tue, 21 Mar 2023 10:55:08 +0300 Subject: [PATCH 07/18] Define ErrAddressHostname error. Set TLS config ServerName refactoring --- rediscluster/error.go | 2 ++ rediscluster/mapping.go | 8 +++----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rediscluster/error.go b/rediscluster/error.go index ce429aa..eac9abe 100644 --- a/rediscluster/error.go +++ b/rediscluster/error.go @@ -13,6 +13,8 @@ var ( // ErrAddressNotResolved - address could not be resolved // Cluster resolves named hosts specified as start points. If this resolution fails, this error returned. ErrAddressNotResolved = ErrCluster.NewType("resolve_address") + // ErrAddressHostname - hostname could not be extracted from address + ErrAddressHostname = ErrCluster.NewType("address_hostname") // ErrClusterConfigEmpty - no addresses found in config. ErrClusterConfigEmpty = ErrCluster.NewType("config_empty") // ErrNoAliveConnection - no alive connection to shard diff --git a/rediscluster/mapping.go b/rediscluster/mapping.go index 0d458d7..2f0113d 100644 --- a/rediscluster/mapping.go +++ b/rediscluster/mapping.go @@ -82,19 +82,17 @@ func (c *Cluster) nodeOpts(addr string) (*redisconn.Opts, error) { originalHost, err := redisclusterutil.GetHost(addr) if err != nil { - return nil, err + return nil, ErrAddressHostname.WrapWithNoMessage(err) } if !redisclusterutil.IsIPAddress(originalHost) { // preserve original hostname for TLS verification if nodeOpts.TLSConfig != nil { nodeOpts.TLSConfig = nodeOpts.TLSConfig.Clone() - nodeOpts.TLSConfig.ServerName = originalHost } else { - nodeOpts.TLSConfig = &tls.Config{ - ServerName: originalHost, - } + nodeOpts.TLSConfig = &tls.Config{} } + nodeOpts.TLSConfig.ServerName = originalHost } return &nodeOpts, nil From 55754e9f651b39ef03dfeddb48939bb0b01cf801 Mon Sep 17 00:00:00 2001 From: Sergey Zagursky Date: Fri, 14 Apr 2023 12:04:13 +0300 Subject: [PATCH 08/18] Add byte stats --- .gitignore | 3 +++ redis/reader.go | 56 ++++++++++++++++++++++-------------------- redis/reader_test.go | 3 ++- rediscluster/logger.go | 10 ++++---- redisconn/conn.go | 26 ++++++++++++-------- redisconn/logger.go | 6 ++--- redisconn/request.go | 8 +++--- redisdumb/conn.go | 4 +-- 8 files changed, 66 insertions(+), 50 deletions(-) diff --git a/.gitignore b/.gitignore index f458559..eb2d8ff 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,6 @@ #our version with redis with patched cluster redis-server + +# temporary test files +rediscluster/redis_test_* diff --git a/redis/reader.go b/redis/reader.go index d568474..e49e535 100644 --- a/redis/reader.go +++ b/redis/reader.go @@ -10,24 +10,24 @@ import ( ) // ReadResponse reads single RESP answer from bufio.Reader -func ReadResponse(b *bufio.Reader) interface{} { +func ReadResponse(b *bufio.Reader) (interface{}, int) { line, isPrefix, err := b.ReadLine() if err != nil { - return ErrIO.WrapWithNoMessage(err) + return ErrIO.WrapWithNoMessage(err), 0 } if isPrefix { - return ErrHeaderlineTooLarge.NewWithNoMessage().WithProperty(EKLine, line) + return ErrHeaderlineTooLarge.NewWithNoMessage().WithProperty(EKLine, line), len(line) } if len(line) == 0 { - return ErrHeaderlineEmpty.NewWithNoMessage() + return ErrHeaderlineEmpty.NewWithNoMessage(), 0 } var v int64 switch line[0] { case '+': - return string(line[1:]) + return string(line[1:]), len(line) case '-': // detect MOVED and ASK txt := string(line[1:]) @@ -36,68 +36,72 @@ func ReadResponse(b *bufio.Reader) interface{} { if moved || ask { parts := bytes.Split(line, []byte(" ")) if len(parts) < 3 { - return ErrResponseFormat.NewWithNoMessage().WithProperty(EKLine, line) + return ErrResponseFormat.NewWithNoMessage().WithProperty(EKLine, line), len(line) } slot, err := parseInt(parts[1]) if err != nil { - return err.WithProperty(EKLine, line) + return err.WithProperty(EKLine, line), len(line) } kind := ErrAsk if moved { kind = ErrMoved } - return kind.New(txt).WithProperty(EKMovedTo, string(parts[2])).WithProperty(EKSlot, slot) + return kind.New(txt).WithProperty(EKMovedTo, string(parts[2])).WithProperty(EKSlot, slot), len(line) } if strings.HasPrefix(txt, "LOADING") { - return ErrLoading.New(txt) + return ErrLoading.New(txt), len(line) } if strings.HasPrefix(txt, "EXECABORT") { - return ErrExecAbort.New(txt) + return ErrExecAbort.New(txt), len(line) } if strings.HasPrefix(txt, "TRYAGAIN") { - return ErrTryAgain.New(txt) + return ErrTryAgain.New(txt), len(line) } - return ErrResult.New(txt) + return ErrResult.New(txt), len(line) case ':': v, err := parseInt(line[1:]) if err != nil { - return err.WithProperty(EKLine, line) + return err.WithProperty(EKLine, line), len(line) } - return v + return v, len(line) case '$': var rerr *errorx.Error if v, rerr = parseInt(line[1:]); rerr != nil { - return rerr.WithProperty(EKLine, line) + return rerr.WithProperty(EKLine, line), len(line) } if v < 0 { - return nil + return nil, len(line) } + nBytes := 0 buf := make([]byte, v+2, v+2) - if _, err = io.ReadFull(b, buf); err != nil { - return ErrIO.WrapWithNoMessage(err) + if nBytes, err = io.ReadFull(b, buf); err != nil { + return ErrIO.WrapWithNoMessage(err), nBytes + len(line) } if buf[v] != '\r' || buf[v+1] != '\n' { - return ErrNoFinalRN.NewWithNoMessage() + return ErrNoFinalRN.NewWithNoMessage(), nBytes + len(line) } - return buf[:v:v] + return buf[:v:v], nBytes + len(line) case '*': var rerr *errorx.Error if v, rerr = parseInt(line[1:]); rerr != nil { - return rerr.WithProperty(EKLine, line) + return rerr.WithProperty(EKLine, line), len(line) } if v < 0 { - return nil + return nil, len(line) } + totalResponseBytes := len(line) result := make([]interface{}, v) for i := int64(0); i < v; i++ { - result[i] = ReadResponse(b) + currentResponseBytes := 0 + result[i], currentResponseBytes = ReadResponse(b) + totalResponseBytes += currentResponseBytes if e, ok := result[i].(*errorx.Error); ok && !e.IsOfType(ErrResult) { - return e + return e, totalResponseBytes } } - return result + return result, totalResponseBytes default: - return ErrUnknownHeaderType.NewWithNoMessage() + return ErrUnknownHeaderType.NewWithNoMessage(), len(line) } } diff --git a/redis/reader_test.go b/redis/reader_test.go index 0a0c478..18b1761 100644 --- a/redis/reader_test.go +++ b/redis/reader_test.go @@ -18,7 +18,8 @@ func lines2bufio(lines ...string) *bufio.Reader { } func readLines(lines ...string) interface{} { - return ReadResponse(lines2bufio(lines...)) + r, _ := ReadResponse(lines2bufio(lines...)) + return r } func checkErrType(t *testing.T, res interface{}, kind *errorx.Type) bool { diff --git a/rediscluster/logger.go b/rediscluster/logger.go index 2b6d36b..f5f89db 100644 --- a/rediscluster/logger.go +++ b/rediscluster/logger.go @@ -14,7 +14,7 @@ type Logger interface { // ReqStat is called after request receives it's answer with request/result information // and time spend to fulfill request. // Default implementation is no-op. - ReqStat(c *Cluster, conn *redisconn.Connection, req Request, res interface{}, nanos int64) + ReqStat(c *Cluster, conn *redisconn.Connection, req Request, res interface{}, nanos, bytesIn, bytesOut int64) } func (c *Cluster) report(event LogEvent) { @@ -87,7 +87,7 @@ func (d DefaultLogger) Report(cluster *Cluster, event LogEvent) { } // ReqStat implements Logger.ReqStat as no-op. -func (d DefaultLogger) ReqStat(c *Cluster, conn *redisconn.Connection, req Request, res interface{}, nanos int64) { +func (d DefaultLogger) ReqStat(_ *Cluster, _ *redisconn.Connection, _ Request, _ interface{}, _, _, _ int64) { // noop } @@ -102,8 +102,8 @@ func (d defaultConnLogger) Report(conn *redisconn.Connection, event redisconn.Lo } // Report implements redisconn.Logger.ReqStat -func (d defaultConnLogger) ReqStat(conn *redisconn.Connection, req Request, res interface{}, nanos int64) { - d.Cluster.opts.Logger.ReqStat(d.Cluster, conn, req, res, nanos) +func (d defaultConnLogger) ReqStat(conn *redisconn.Connection, req Request, res interface{}, nanos, bytesIn, bytesOut int64) { + d.Cluster.opts.Logger.ReqStat(d.Cluster, conn, req, res, nanos, bytesIn, bytesOut) } // NoopLogger implements Logger with no logging at all. @@ -113,5 +113,5 @@ type NoopLogger struct{} func (d NoopLogger) Report(conn *Cluster, event LogEvent) {} // ReqStat implements Logger.ReqStat -func (d NoopLogger) ReqStat(c *Cluster, conn *redisconn.Connection, req Request, res interface{}, nanos int64) { +func (d NoopLogger) ReqStat(c *Cluster, conn *redisconn.Connection, req Request, res interface{}, nanos, bytesIn, bytesOut int64) { } diff --git a/redisconn/conn.go b/redisconn/conn.go index 43a75c7..72a2907 100644 --- a/redisconn/conn.go +++ b/redisconn/conn.go @@ -336,9 +336,9 @@ func (conn *Connection) doSend(req Request, cb Future, n uint64, asking bool) *e futures := conn.futures if asking { // send ASKING request before actual - futures = append(futures, future{&dumb, 0, 0, Request{"ASKING", nil}}) + futures = append(futures, future{&dumb, 0, 0, 0, 0, Request{"ASKING", nil}}) } - futures = append(futures, future{cb, n, nownano(), req}) + futures = append(futures, future{cb, n, nownano(), 0, 0, req}) // should notify writer about this shard having queries. // Since we are under shard lock, it is safe to send notification before assigning futures. @@ -448,22 +448,22 @@ func (conn *Connection) doSendBatch(requests []Request, cb Future, start uint64, futures := conn.futures if flags&DoAsking != 0 { // send ASKING request before actual - futures = append(futures, future{&dumb, 0, 0, Request{"ASKING", nil}}) + futures = append(futures, future{&dumb, 0, 0, 0, 0, Request{"ASKING", nil}}) } if flags&DoTransaction != 0 { // send MULTI request for transaction start - futures = append(futures, future{&dumb, 0, 0, Request{"MULTI", nil}}) + futures = append(futures, future{&dumb, 0, 0, 0, 0, Request{"MULTI", nil}}) } now := nownano() for i, req := range requests { - futures = append(futures, future{cb, start + uint64(i), now, req}) + futures = append(futures, future{cb, start + uint64(i), now, 0, 0, req}) } if flags&DoTransaction != 0 { // send EXEC request for transaction end - futures = append(futures, future{cb, start + uint64(len(requests)), now, Request{"EXEC", nil}}) + futures = append(futures, future{cb, start + uint64(len(requests)), now, 0, 0, Request{"EXEC", nil}}) } // should notify writer about this shard having queries @@ -581,7 +581,7 @@ func (conn *Connection) dial() error { var res interface{} // Password response if conn.opts.Password != "" { - res = redis.ReadResponse(r) + res, _ = redis.ReadResponse(r) if err := redis.AsErrorx(res); err != nil { connection.Close() if !err.IsOfType(redis.ErrIO) { @@ -591,7 +591,7 @@ func (conn *Connection) dial() error { } } // PING Response - res = redis.ReadResponse(r) + res, _ = redis.ReadResponse(r) if err := redis.AsErrorx(res); err != nil { connection.Close() if !err.IsOfType(redis.ErrIO) { @@ -607,7 +607,7 @@ func (conn *Connection) dial() error { } // SELECT DB Response if conn.opts.DB != 0 { - res = redis.ReadResponse(r) + res, _ = redis.ReadResponse(r) if err := redis.AsErrorx(res); err != nil { connection.Close() if !err.IsOfType(redis.ErrIO) { @@ -845,12 +845,16 @@ func (conn *Connection) writer(one *oneconn) { // serialize requests for i, fut := range futures { + bytesBefore := len(packet) + var err error if packet, err = redis.AppendRequest(packet, fut.req); err != nil { // since we checked arguments in doSend and doSendBatch, error here is a signal of programmer error. // lets just panic and die. panic(err) } + + futures[i].bytesOut = int64(len(packet) - bytesBefore) if fut.req.Cmd == "PING" { futures[i].start = nownano() } @@ -893,7 +897,8 @@ func (conn *Connection) reader(r *bufio.Reader, one *oneconn) { for { // try to read response from buffered socket. // Here is IOTimeout handled as well (through deadlineIO wrapper around socket). - res = redis.ReadResponse(r) + resBytes := 0 + res, resBytes = redis.ReadResponse(r) if rerr := redis.AsErrorx(res); rerr != nil { if !rerr.IsOfType(redis.ErrResult) { // it is not redis-sended error, then close connection @@ -924,6 +929,7 @@ func (conn *Connection) reader(r *bufio.Reader, one *oneconn) { if rerr := redis.AsErrorx(res); rerr != nil { res = conn.addProps(rerr).WithProperty(redis.EKRequest, fut.req) } + fut.bytesIn = int64(resBytes) conn.resolve(fut, res) } diff --git a/redisconn/logger.go b/redisconn/logger.go index 48d64a3..f574ba3 100644 --- a/redisconn/logger.go +++ b/redisconn/logger.go @@ -10,7 +10,7 @@ type Logger interface { // ReqStat is called after request receives it's answer with request/result information // and time spend to fulfill request. // Default implementation is no-op. - ReqStat(conn *Connection, req Request, res interface{}, nanos int64) + ReqStat(conn *Connection, req Request, res interface{}, nanos, bytesIn, bytesOut int64) } // LogEvent is a sum-type for events to be logged. @@ -79,7 +79,7 @@ func (d DefaultLogger) Report(conn *Connection, event LogEvent) { } // ReqStat implements Logger.ReqStat -func (d DefaultLogger) ReqStat(conn *Connection, req Request, res interface{}, nanos int64) { +func (d DefaultLogger) ReqStat(_ *Connection, _ Request, _ interface{}, _, _, _ int64) { // noop } @@ -91,4 +91,4 @@ type NoopLogger struct{} func (d NoopLogger) Report(*Connection, LogEvent) {} // ReqStat implements Logger.ReqStat -func (d NoopLogger) ReqStat(conn *Connection, req Request, res interface{}, nanos int64) {} +func (d NoopLogger) ReqStat(_ *Connection, _ Request, _ interface{}, _, _, _ int64) {} diff --git a/redisconn/request.go b/redisconn/request.go index fd09b62..cdac9e8 100644 --- a/redisconn/request.go +++ b/redisconn/request.go @@ -16,8 +16,10 @@ type future struct { Future N uint64 - start int64 - req Request + start int64 + bytesIn int64 + bytesOut int64 + req Request } var epoch = time.Now() @@ -29,7 +31,7 @@ func nownano() int64 { func (c *Connection) resolve(f future, res interface{}) { if f.start != 0 && f.req.Cmd != "" { delta := nownano() - f.start - c.opts.Logger.ReqStat(c, f.req, res, delta) + c.opts.Logger.ReqStat(c, f.req, res, delta, f.bytesIn, f.bytesOut) if f.req.Cmd == "PING" { c.storePingLatency(time.Duration(delta)) } diff --git a/redisdumb/conn.go b/redisdumb/conn.go index 498af0f..68bc1f3 100644 --- a/redisdumb/conn.go +++ b/redisdumb/conn.go @@ -74,7 +74,7 @@ func (c *Conn) Do(cmd string, args ...interface{}) interface{} { req, err = redis.AppendRequest(nil, redis.Request{cmd, args}) if err == nil { if _, err = c.C.Write(req); err == nil { - res := redis.ReadResponse(c.R) + res, _ := redis.ReadResponse(c.R) rerr := redis.AsErrorx(res) if rerr == nil { return res @@ -213,6 +213,6 @@ func Do(addr string, cmd string, args ...interface{}) interface{} { if _, err = conn.Write(req); err != nil { return redis.ErrIO.WrapWithNoMessage(err) } - res := redis.ReadResponse(bufio.NewReader(conn)) + res, _ := redis.ReadResponse(bufio.NewReader(conn)) return res } From 6c9b1712068350cfde41c3f7c45bd7ebb2179483 Mon Sep 17 00:00:00 2001 From: Artem Klyukvin Date: Wed, 24 May 2023 15:57:03 +0700 Subject: [PATCH 09/18] refactor shard selection (#27) --- rediscluster/cluster.go | 21 ++++-- rediscluster/mapping.go | 151 ++++++++++++++++++++++---------------- rediscluster/slotrange.go | 12 +-- 3 files changed, 106 insertions(+), 78 deletions(-) diff --git a/rediscluster/cluster.go b/rediscluster/cluster.go index a50eb34..ebd8d0d 100644 --- a/rediscluster/cluster.go +++ b/rediscluster/cluster.go @@ -59,6 +59,11 @@ const ( preferConnected ) +const ( + disabled = 0 + enabled = 1 +) + // Opts holds the options for Cluster type Opts struct { // HostOpts - per host options @@ -144,10 +149,10 @@ type clusterConfig struct { } type shard struct { - rr uint32 - good uint32 - addr []string - weights []uint32 + rr uint32 + good uint32 + addr []string + pingWeights []uint32 } type shardMap map[uint16]*shard type masterMap map[string]uint16 @@ -223,9 +228,9 @@ func NewCluster(ctx context.Context, initAddrs []string, opts Opts) (*Cluster, e cluster.opts.WaitToMigrate = 100 * time.Millisecond } - cluster.latencyAwareness = 0 + cluster.latencyAwareness = disabled if cluster.opts.LatencyOrientedRR { - cluster.latencyAwareness = 1 + cluster.latencyAwareness = enabled } cluster.opts.HostOpts.TLSEnabled = opts.TLSEnabled cluster.opts.HostOpts.TLSConfig = opts.TLSConfig @@ -290,9 +295,9 @@ func (c *Cluster) Handle() interface{} { // SetLatencyOrientedRR changes "latency awareness" on the fly. func (c *Cluster) SetLatencyOrientedRR(v bool) { if v { - atomic.StoreUint32(&c.latencyAwareness, 1) + atomic.StoreUint32(&c.latencyAwareness, enabled) } else { - atomic.StoreUint32(&c.latencyAwareness, 0) + atomic.StoreUint32(&c.latencyAwareness, disabled) } } diff --git a/rediscluster/mapping.go b/rediscluster/mapping.go index 2f0113d..af70cc0 100644 --- a/rediscluster/mapping.go +++ b/rediscluster/mapping.go @@ -247,90 +247,113 @@ var rr, rs = func() ([32]uint32, [32]uint32) { // connForSlot returns established connection for slot, if it exists. func (c *Cluster) connForSlot(slot uint16, policy ReplicaPolicyEnum, seen []*redisconn.Connection) (*redisconn.Connection, *errorx.Error) { - var conn *redisconn.Connection cfg := c.getConfig() shard := cfg.slot2shard(slot) - nodes := cfg.nodes if shard == nil { return nil, c.err(ErrClusterConfigEmpty).WithProperty(redis.EKSlot, slot) } - var addr string + conn := c.connForPolicy(policy, seen, shard, cfg) + if conn == nil { + c.ForceReloading() + return nil, c.err(ErrNoAliveConnection).WithProperty(redis.EKSlot, slot).WithProperty(EKPolicy, policy) + } + return conn, nil +} + +func (c *Cluster) connForPolicy(policy ReplicaPolicyEnum, seen []*redisconn.Connection, shard *shard, cfg *clusterConfig) *redisconn.Connection { switch policy { case MasterOnly: - addr = shard.addr[0] - node := nodes[addr] - if node == nil { - break /*switch*/ - } - conn = node.getConn(c.opts.ConnHostPolicy, preferConnected, seen) + return c.connForPolicyMaster(seen, shard, cfg) case MasterAndSlaves, PreferSlaves: - var ws [32]uint32 - if atomic.LoadUint32(&c.latencyAwareness) == 0 { - ws = rr - if policy == PreferSlaves { - ws = rs - } - } else { - for i := range shard.weights { - ws[i] = atomic.LoadUint32(&shard.weights[i]) - } - } - weights := ws[:len(shard.weights)] + return c.connForPolicySlaves(policy, seen, shard, cfg) + default: + panic("unknown policy") + } +} - health := atomic.LoadUint32(&shard.good) // load health information - healthWeight := uint32(0) - for i, w := range weights { - if health&(1< Date: Thu, 25 May 2023 15:10:34 +0700 Subject: [PATCH 10/18] add possibility to set replica weights explicitly (#28) --- rediscluster/cluster.go | 12 +++++++++++- rediscluster/mapping.go | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/rediscluster/cluster.go b/rediscluster/cluster.go index ebd8d0d..eb55689 100644 --- a/rediscluster/cluster.go +++ b/rediscluster/cluster.go @@ -64,6 +64,14 @@ const ( enabled = 1 ) +// WeightProvider explicitly provides weights for redis replicas +type WeightProvider interface { + // GetWeightByHost provides weight by given name. If implementation does not have weight for a given host + // it must return `false` as the second return value. In this case scheduler will fallback to default + // scheduling strategy (either constant weights random or ping latency based random) + GetWeightByHost(host string) (uint32, bool) +} + // Opts holds the options for Cluster type Opts struct { // HostOpts - per host options @@ -98,8 +106,10 @@ type Opts struct { // RoundRobinSeed - used to choose between master and replica. RoundRobinSeed RoundRobinSeed - // LatencyOrientedRR - when MasterAndSlaves is used, prefer hosts with lower latency + // LatencyOrientedRR - when MasterAndSlaves is used, prefer hosts with lower latency (has lower priority than WeightProvider) LatencyOrientedRR bool + // WeightProvider - enables to explicitly set weights of replicas (has higher priority than LatencyOrientedRR) + WeightProvider WeightProvider // Enable connection with TLS TLSEnabled bool // Config for TLS connection diff --git a/rediscluster/mapping.go b/rediscluster/mapping.go index af70cc0..1c13ce8 100644 --- a/rediscluster/mapping.go +++ b/rediscluster/mapping.go @@ -341,6 +341,25 @@ func (*Cluster) getHealthWeight(weights []uint32, health uint32) uint32 { } func (c *Cluster) weightsForPolicySlaves(policy ReplicaPolicyEnum, shard *shard) []uint32 { + if c.opts.WeightProvider == nil { + return c.weightsForPolicySlavesDefault(policy, shard) + } + + weights := make([]uint32, 0, len(shard.addr)) + for _, addr := range shard.addr { + weight, found := c.opts.WeightProvider.GetWeightByHost(addr) + if !found { + // there was some reconfiguration, so we fallback to default weights + return c.weightsForPolicySlavesDefault(policy, shard) + } + + weights = append(weights, weight) + } + + return weights +} + +func (c *Cluster) weightsForPolicySlavesDefault(policy ReplicaPolicyEnum, shard *shard) []uint32 { var ws []uint32 if atomic.LoadUint32(&c.latencyAwareness) == disabled { ws = rr[:] From dda13443c0623b84fe67b86babd1b6cab63ac509 Mon Sep 17 00:00:00 2001 From: Artem Klyukvin Date: Wed, 14 Jun 2023 12:59:04 +0400 Subject: [PATCH 11/18] force min ping replica under option (#29) --- rediscluster/cluster.go | 18 +++++++++++++++++- rediscluster/slotrange.go | 20 ++++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/rediscluster/cluster.go b/rediscluster/cluster.go index eb55689..6d01e0d 100644 --- a/rediscluster/cluster.go +++ b/rediscluster/cluster.go @@ -108,6 +108,8 @@ type Opts struct { RoundRobinSeed RoundRobinSeed // LatencyOrientedRR - when MasterAndSlaves is used, prefer hosts with lower latency (has lower priority than WeightProvider) LatencyOrientedRR bool + // ForceMinLatencyReplica - when LatencyOrientedRR is used forces min latency replica instead of using `weight[i] = sum(ping_latency[i]) / ping_latency[i]` algorithm + ForceMinLatencyReplica bool // WeightProvider - enables to explicitly set weights of replicas (has higher priority than LatencyOrientedRR) WeightProvider WeightProvider // Enable connection with TLS @@ -132,7 +134,8 @@ type Cluster struct { opts Opts - latencyAwareness uint32 + latencyAwareness uint32 + forceMinLatencyReplica uint32 m sync.Mutex @@ -242,6 +245,10 @@ func NewCluster(ctx context.Context, initAddrs []string, opts Opts) (*Cluster, e if cluster.opts.LatencyOrientedRR { cluster.latencyAwareness = enabled } + cluster.forceMinLatencyReplica = disabled + if cluster.opts.ForceMinLatencyReplica { + cluster.forceMinLatencyReplica = enabled + } cluster.opts.HostOpts.TLSEnabled = opts.TLSEnabled cluster.opts.HostOpts.TLSConfig = opts.TLSConfig @@ -311,6 +318,15 @@ func (c *Cluster) SetLatencyOrientedRR(v bool) { } } +// SetForceMinLatencyReplica changes "min latency replica forcing" on the fly +func (c *Cluster) SetForceMinLatencyReplica(v bool) { + if v { + atomic.StoreUint32(&c.forceMinLatencyReplica, enabled) + } else { + atomic.StoreUint32(&c.forceMinLatencyReplica, disabled) + } +} + func (c *Cluster) control() { t := time.NewTicker(c.opts.CheckInterval) defer t.Stop() diff --git a/rediscluster/slotrange.go b/rediscluster/slotrange.go index 18672fc..a65891d 100644 --- a/rediscluster/slotrange.go +++ b/rediscluster/slotrange.go @@ -2,6 +2,7 @@ package rediscluster import ( "bytes" + "math" "sync/atomic" "time" @@ -173,13 +174,28 @@ func (c *Cluster) updateMappings(slotRanges []redisclusterutil.SlotsRange) { } for _, shard := range newConfig.shards { sumLatency := uint32(0) - for _, addr := range shard.addr { + minLatencyID := 0 + minLatency := uint32(math.MaxUint32) + + for i, addr := range shard.addr { node := newConfig.nodes[addr] - sumLatency += atomic.LoadUint32(&node.ping) + pingLatency := atomic.LoadUint32(&node.ping) + if pingLatency < minLatency { + minLatency = pingLatency + minLatencyID = i + } + + sumLatency += pingLatency } for i, addr := range shard.addr { node := newConfig.nodes[addr] + weight := sumLatency / atomic.LoadUint32(&node.ping) + if atomic.LoadUint32(&c.forceMinLatencyReplica) == enabled && i == minLatencyID { + const alwaysPrefer = 1_000_000 + weight = alwaysPrefer + } + atomic.StoreUint32(&shard.pingWeights[i], weight) } } From c7ea52dc325cd2c1bb145a5b4905e9ad96fbfb8c Mon Sep 17 00:00:00 2001 From: Anna Tikhonova Date: Fri, 26 Apr 2024 11:40:27 +0300 Subject: [PATCH 12/18] Indicate if redis node is disconnected in InstanceInfo --- rediscluster/redisclusterutil/cluster.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rediscluster/redisclusterutil/cluster.go b/rediscluster/redisclusterutil/cluster.go index 8ec9093..c0508d9 100644 --- a/rediscluster/redisclusterutil/cluster.go +++ b/rediscluster/redisclusterutil/cluster.go @@ -105,6 +105,7 @@ type InstanceInfo struct { // More probably, redis instance with other UUID were started on the same port. NoAddr bool SlaveOf string + Connected bool Slots [][2]uint16 Migrating []SlotMigration } @@ -312,6 +313,7 @@ func ParseClusterNodes(res interface{}) (InstanceInfos, error) { } node.NoAddr = strings.Contains(parts[2], "noaddr") node.MySelf = strings.Contains(parts[2], "myself") + node.Connected = parts[7] == "connected" for _, slot := range parts[8:] { if slot[0] == '[' { From 2bf65814be8a7608ef95f8aacc24305804749c43 Mon Sep 17 00:00:00 2001 From: Marat Khasanov Date: Wed, 18 Sep 2024 11:50:43 +0100 Subject: [PATCH 13/18] Ignore zero addresses in CLUSTER SLOTS response --- rediscluster/redisclusterutil/cluster.go | 15 +- rediscluster/redisclusterutil/cluster_test.go | 182 ++++++++++++++++++ 2 files changed, 193 insertions(+), 4 deletions(-) create mode 100644 rediscluster/redisclusterutil/cluster_test.go diff --git a/rediscluster/redisclusterutil/cluster.go b/rediscluster/redisclusterutil/cluster.go index c0508d9..fec10c6 100644 --- a/rediscluster/redisclusterutil/cluster.go +++ b/rediscluster/redisclusterutil/cluster.go @@ -72,12 +72,19 @@ func ParseSlotsInfo(res interface{}) ([]SlotsRange, error) { for j := 2; j < len(rawrange); j++ { rawaddr, ok := rawrange[j].([]interface{}) if !ok || len(rawaddr) < 2 { - return errf("address format mismatch: res[%d][%d] = %+v", + return errf("address format mismatch: res[%d][%d] = %+v, missing lines", i, j, rawrange[j]) } - host, ok := rawaddr[0].([]byte) - port, ok2 := rawaddr[1].(int64) - if !ok || !ok2 || port <= 0 || port+10000 > 65535 { + host, hasHost := rawaddr[0].([]byte) + port, hasPort := rawaddr[1].(int64) + if !hasHost && hasPort && port == 0 { + // fallback to skip zero address + arr, isArray := rawaddr[0].([]interface{}) + if isArray && len(arr) == 0 { + continue + } + } + if !hasHost || !hasPort || port <= 0 || port+10000 > 65535 { return errf("address format mismatch: res[%d][%d] = %+v", i, j, rawaddr) } diff --git a/rediscluster/redisclusterutil/cluster_test.go b/rediscluster/redisclusterutil/cluster_test.go new file mode 100644 index 0000000..c4e3eca --- /dev/null +++ b/rediscluster/redisclusterutil/cluster_test.go @@ -0,0 +1,182 @@ +package redisclusterutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseSlotsInfo(t *testing.T) { + clusterSlotsResponse := []interface{}{ + []interface{}{ + int64(0), + int64(5460), + []interface{}{ + []byte("127.0.0.1"), + int64(30001), + []byte("09dbe9720cda62f7865eabc5fd8857c5d2678366"), + []interface{}{ + "hostname", + "host-1.redis.example.com", + }, + }, + []interface{}{ + []byte("127.0.0.1"), + int64(30004), + []byte("821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"), + []interface{}{ + "hostname", + "host-2.redis.example.com", + }, + }, + }, + []interface{}{ + int64(5461), + int64(10922), + []interface{}{ + []byte("127.0.0.1"), + int64(30002), + []byte("c9d93d9f2c0c524ff34cc11838c2003d8c29e013"), + []interface{}{ + "hostname", + "host-3.redis.example.com", + }, + }, + []interface{}{ + []byte("127.0.0.1"), + int64(30005), + []byte("faadb3eb99009de4ab72ad6b6ed87634c7ee410f"), + []interface{}{ + "hostname", + "host-4.redis.example.com", + }, + }, + }, + []interface{}{ + int64(10923), + int64(16383), + []interface{}{ + []byte("192.168.11.131"), + int64(30003), + []byte("044ec91f325b7595e76dbcb18cc688b6a5b434a1"), + []interface{}{ + "hostname", + "host-5.redis.example.com", + }, + }, + []interface{}{ + []byte("127.0.0.1"), + int64(30006), + []byte("58e6e48d41228013e5d9c1c37c5060693925e97e"), + []interface{}{ + "hostname", + "host-6.redis.example.com", + }, + }, + }, + } + + expectedSlots := []SlotsRange{ + { + From: 0, + To: 5460, + Addrs: []string{ + "127.0.0.1:30001", + "127.0.0.1:30004", + }, + }, + { + From: 5461, + To: 10922, + Addrs: []string{ + "127.0.0.1:30002", + "127.0.0.1:30005", + }, + }, + { + From: 10923, + To: 16383, + Addrs: []string{ + "192.168.11.131:30003", + "127.0.0.1:30006", + }, + }, + } + + slots, err := ParseSlotsInfo(clusterSlotsResponse) + require.NoError(t, err) + + assert.Equal(t, expectedSlots, slots) +} + +func TestParseSlotsInfo_EmptyAddress(t *testing.T) { + clusterSlotsResponse := []interface{}{ + []interface{}{ + int64(0), + int64(5460), + []interface{}{ + []byte("127.0.0.1"), + int64(30001), + []byte("09dbe9720cda62f7865eabc5fd8857c5d2678366"), + []interface{}{ + "hostname", + "host-1.redis.example.com", + }, + }, + []interface{}{ + []interface{}{}, + int64(0), + []byte("821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"), + []interface{}{ + "hostname", + "host-2.redis.example.com", + }, + }, + }, + []interface{}{ + int64(5461), + int64(10922), + []interface{}{ + []interface{}{}, + int64(0), + []byte("c9d93d9f2c0c524ff34cc11838c2003d8c29e013"), + []interface{}{ + "hostname", + "host-3.redis.example.com", + }, + }, + []interface{}{ + []byte("127.0.0.1"), + int64(30005), + []byte("faadb3eb99009de4ab72ad6b6ed87634c7ee410f"), + []interface{}{ + "hostname", + "host-4.redis.example.com", + }, + }, + }, + } + + expectedSlots := []SlotsRange{ + { + From: 0, + To: 5460, + Addrs: []string{ + "127.0.0.1:30001", + }, + }, + { + From: 5461, + To: 10922, + Addrs: []string{ + "127.0.0.1:30005", + }, + }, + } + + slots, err := ParseSlotsInfo(clusterSlotsResponse) + require.NoError(t, err) + + assert.Equal(t, expectedSlots, slots) +} From 090aef1edb3ccd76334d12e2a7b2d75880cc6d91 Mon Sep 17 00:00:00 2001 From: Marat Khasanov Date: Wed, 18 Sep 2024 17:05:38 +0100 Subject: [PATCH 14/18] Describe why we skip zero address --- rediscluster/redisclusterutil/cluster.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rediscluster/redisclusterutil/cluster.go b/rediscluster/redisclusterutil/cluster.go index fec10c6..c6a7659 100644 --- a/rediscluster/redisclusterutil/cluster.go +++ b/rediscluster/redisclusterutil/cluster.go @@ -78,7 +78,9 @@ func ParseSlotsInfo(res interface{}) ([]SlotsRange, error) { host, hasHost := rawaddr[0].([]byte) port, hasPort := rawaddr[1].(int64) if !hasHost && hasPort && port == 0 { - // fallback to skip zero address + // Due to possible Redis cluster misconfiguration we can receive zero address + // for one of the replicas. It is totally fine to skip the misconfigured replica + // and go with the remaining ones without inducing denial of service. arr, isArray := rawaddr[0].([]interface{}) if isArray && len(arr) == 0 { continue From 4b4139e86dcb9477a1a9d5fe564e7d255af067cd Mon Sep 17 00:00:00 2001 From: Yohay Date: Sun, 12 Jan 2025 21:54:05 +0200 Subject: [PATCH 15/18] conn with username --- redisconn/conn.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/redisconn/conn.go b/redisconn/conn.go index 72a2907..42db138 100644 --- a/redisconn/conn.go +++ b/redisconn/conn.go @@ -39,6 +39,8 @@ type Opts struct { DB int // Password for AUTH Password string + // Password for AUTH + Username string // IOTimeout - timeout on read/write to socket. // If IOTimeout == 0, then it is set to 1 second // If IOTimeout < 0, then timeout is disabled @@ -556,7 +558,9 @@ func (conn *Connection) dial() error { // Password request var req []byte - if conn.opts.Password != "" { + if conn.opts.Password != "" && conn.opts.Username != "" { + req, _ = redis.AppendRequest(req, redis.Req("AUTH", conn.opts.Username, conn.opts.Password)) + } else if conn.opts.Password != "" { req, _ = redis.AppendRequest(req, redis.Req("AUTH", conn.opts.Password)) } const pingReq = "*1\r\n$4\r\nPING\r\n" From 667456148d5fdd57d7164bb7de5bd7756f3d8a9d Mon Sep 17 00:00:00 2001 From: Yohay Date: Mon, 13 Jan 2025 07:18:22 +0200 Subject: [PATCH 16/18] Update conn.go --- redisconn/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redisconn/conn.go b/redisconn/conn.go index 42db138..889903f 100644 --- a/redisconn/conn.go +++ b/redisconn/conn.go @@ -39,7 +39,7 @@ type Opts struct { DB int // Password for AUTH Password string - // Password for AUTH + // Username for AUTH Username string // IOTimeout - timeout on read/write to socket. // If IOTimeout == 0, then it is set to 1 second From 1e74fb9177de6c54d7d3024792a2db644ce090e9 Mon Sep 17 00:00:00 2001 From: Sergey Zagursky Date: Wed, 16 Apr 2025 16:02:25 +0100 Subject: [PATCH 17/18] Trigger topology refresh on CLUSTERDOWN When Redis cluster topology changes quickly, we may encounter a replica that was detached from its master and removed from the cluster. The driver, however, may still have this replica in topology. Sending requests to this replica trigger CLUSTERDOWN responses. To facilitate faster recovery we may force topology request. This is exactly what this commit does. --- redis/error.go | 2 ++ redis/reader.go | 3 +++ 2 files changed, 5 insertions(+) diff --git a/redis/error.go b/redis/error.go index 3aa490f..8442ce2 100644 --- a/redis/error.go +++ b/redis/error.go @@ -69,6 +69,8 @@ var ( ErrMoved = ErrResult.NewSubtype("moved", ErrTraitClusterMove) // ErrAsk - ASK response ErrAsk = ErrResult.NewSubtype("ask", ErrTraitClusterMove) + // ErrClusterDown - CLUSTERDOWN response + ErrClusterDown = ErrResult.NewSubtype("clusterdown", ErrTraitNotSent) // ErrLoading - redis didn't finish start ErrLoading = ErrResult.NewSubtype("loading", ErrTraitNotSent) // ErrExecEmpty - EXEC returns nil (WATCH failed) (it is strange, cause we don't support WATCH) diff --git a/redis/reader.go b/redis/reader.go index e49e535..ac05ffd 100644 --- a/redis/reader.go +++ b/redis/reader.go @@ -48,6 +48,9 @@ func ReadResponse(b *bufio.Reader) (interface{}, int) { } return kind.New(txt).WithProperty(EKMovedTo, string(parts[2])).WithProperty(EKSlot, slot), len(line) } + if strings.HasPrefix(txt, "CLUSTERDOWN") { + return ErrClusterDown.New(txt), len(line) + } if strings.HasPrefix(txt, "LOADING") { return ErrLoading.New(txt), len(line) } From 9c0687ecc74e2c09210f62199885381c68735306 Mon Sep 17 00:00:00 2001 From: Sergey Zagursky Date: Wed, 16 Apr 2025 16:17:13 +0100 Subject: [PATCH 18/18] Update GitHub Actions --- .github/workflows/ci.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8eddca1..7d10318 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,21 +9,21 @@ jobs: strategy: matrix: - go: ["1.19", "1.20"] + go: ["1.19", "1.20", "1.21", "1.22", "1.23", "1.24"] stage: [testredis, testconn, testcluster] steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Set up Go ${{ matrix.go }} - uses: actions/setup-go@v1 + uses: actions/setup-go@v5 with: go-version: ${{ matrix.go }} - name: Cache go modules - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/go/pkg/mod key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} @@ -31,7 +31,7 @@ jobs: ${{ runner.os }}-go- - name: Cache redis build - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: | /tmp/redis-server