From 7d767c4aba4c5e33494fa21312a50007c73c4b7d Mon Sep 17 00:00:00 2001 From: Brett Lawson <> Date: Mon, 28 Oct 2024 16:37:55 -0700 Subject: [PATCH] ING-959: Refactored hostname handling when parsing configs. This commit also refactors how we deal with remote hostnames and addresses with the various clients and connections inside of the library. Primarily with the intent of simplifying the interfaces that we expose, and ensuring the correct details are available in the right places. --- configwatcher_memd.go | 16 +-- contrib/cbconfig/parse.go | 16 +++ generate-mocks.go | 2 +- kvclient.go | 52 +++----- kvclient_ops.go | 4 +- kvclient_test.go | 75 ++++------- kvclientmanager.go | 31 ++++- kvclientpool.go | 49 ++++---- memdx/client.go | 5 +- memdx/conn.go | 8 +- memdx/dispatcher.go | 2 - memdx/errors.go | 14 +-- memdx/errors_test.go | 6 +- memdx/harness_int_test.go | 15 +-- memdx/ops_core.go | 42 +++---- memdx/ops_core_errors_test.go | 15 +-- memdx/ops_crud.go | 44 +++---- memdx/ops_crud_rangescan.go | 6 +- memdx/ops_utils.go | 8 +- memdxclient.go | 16 +++ mock_kvclient_test.go | 228 +++++++++++++++++++++++----------- utils.go | 13 ++ vbucketrouter.go | 62 ++++----- 23 files changed, 403 insertions(+), 326 deletions(-) create mode 100644 contrib/cbconfig/parse.go create mode 100644 memdxclient.go diff --git a/configwatcher_memd.go b/configwatcher_memd.go index 622382e8..c3690571 100644 --- a/configwatcher_memd.go +++ b/configwatcher_memd.go @@ -2,8 +2,6 @@ package gocbcorex import ( "context" - "encoding/json" - "errors" "sync" "time" @@ -67,13 +65,8 @@ func configWatcherMemd_pollOne( return nil, err } - host, _, _ := client.RemoteHostPort() - if host == "" { - return nil, errors.New("unexpected cccp endpoint format") - } - logger.Debug("Polling for new config", - zap.String("host", host), + zap.String("endpoint", endpoint), zap.String("endpoint", endpoint)) resp, err := client.GetClusterConfig(ctx, &memdx.GetClusterConfigRequest{}) @@ -81,8 +74,9 @@ func configWatcherMemd_pollOne( return nil, err } - var config cbconfig.TerseConfigJson - err = json.Unmarshal(resp.Config, &config) + hostname := client.RemoteHostname() + + config, err := cbconfig.ParseTerseConfig(resp.Config, hostname) if err != nil { return nil, err } @@ -91,7 +85,7 @@ func configWatcherMemd_pollOne( zap.Int("config", config.Rev), zap.Int("configRevEpoch", config.RevEpoch)) - parsedConfig, err := ConfigParser{}.ParseTerseConfig(&config, host) + parsedConfig, err := ConfigParser{}.ParseTerseConfig(config, hostname) if err != nil { return nil, err } diff --git a/contrib/cbconfig/parse.go b/contrib/cbconfig/parse.go new file mode 100644 index 00000000..7605f209 --- /dev/null +++ b/contrib/cbconfig/parse.go @@ -0,0 +1,16 @@ +package cbconfig + +import ( + "bytes" + "encoding/json" +) + +func ParseTerseConfig(config []byte, sourceHostname string) (*TerseConfigJson, error) { + config = bytes.ReplaceAll(config, []byte("$HOST"), []byte(sourceHostname)) + var configOut *TerseConfigJson + err := json.Unmarshal(config, &configOut) + if err != nil { + return nil, err + } + return configOut, nil +} diff --git a/generate-mocks.go b/generate-mocks.go index d7db6ad9..a82b3cc1 100644 --- a/generate-mocks.go +++ b/generate-mocks.go @@ -1,6 +1,6 @@ //go:generate moq -out mock_collectionresolver_test.go . CollectionResolver //go:generate moq -out mock_vbucketrouter_test.go . VbucketRouter -//go:generate moq -out mock_kvclient_test.go . KvClient MemdxDispatcherCloser +//go:generate moq -out mock_kvclient_test.go . KvClient MemdxClient //go:generate moq -out mock_kvclientpool_test.go . KvClientPool //go:generate moq -out mock_kvclientmanager_test.go . KvClientManager //go:generate moq -out mock_retrymanager_test.go . RetryManager RetryController diff --git a/kvclient.go b/kvclient.go index 4c56b0d6..8e2218e4 100644 --- a/kvclient.go +++ b/kvclient.go @@ -19,7 +19,7 @@ import ( "github.com/couchbase/gocbcorex/memdx" ) -type GetMemdxClientFunc func(opts *memdx.ClientOptions) MemdxDispatcherCloser +type GetMemdxClientFunc func(opts *memdx.ClientOptions) MemdxClient type KvClientConfig struct { Address string @@ -91,27 +91,20 @@ type KvClient interface { LoadFactor() float64 - RemoteHostPort() (string, string, int) - LocalHostPort() (string, int) + RemoteHostname() string + RemoteAddr() net.Addr + LocalAddr() net.Addr KvClientOps -} - -type MemdxDispatcherCloser interface { memdx.Dispatcher - Close() error } type kvClient struct { logger *zap.Logger - remoteHostName string - remoteHost string - remotePort int - localHost string - localPort int + remoteHostname string pendingOperations uint64 - cli MemdxDispatcherCloser + cli MemdxClient durationMetric metric.Float64Histogram lock sync.Mutex @@ -132,20 +125,12 @@ type kvClient struct { var _ KvClient = (*kvClient)(nil) func NewKvClient(ctx context.Context, config *KvClientConfig, opts *KvClientOptions) (*kvClient, error) { - parseHostPort := func(addr string) (string, int) { - host, portStr, _ := net.SplitHostPort(addr) - parsedPort, _ := strconv.ParseInt(portStr, 10, 64) - return host, int(parsedPort) - } - logger := loggerOrNop(opts.Logger) // We namespace the pool to improve debugging, logger = logger.With( zap.String("clientId", uuid.NewString()[:8]), ) - remoteHostName, remotePort := parseHostPort(config.Address) - durationMetric, err := meter.Float64Histogram("db.client.operation.duration", metric.WithExplicitBucketBoundaries(0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10)) if err != nil { @@ -154,8 +139,7 @@ func NewKvClient(ctx context.Context, config *KvClientConfig, opts *KvClientOpti kvCli := &kvClient{ currentConfig: *config, - remoteHostName: remoteHostName, - remotePort: remotePort, + remoteHostname: hostnameFromAddrStr(config.Address), logger: logger, closeHandler: opts.CloseHandler, durationMetric: durationMetric, @@ -244,12 +228,6 @@ func NewKvClient(ctx context.Context, config *KvClientConfig, opts *KvClientOpti kvCli.cli = opts.NewMemdxClient(memdxClientOpts) } - remoteHost, _ := parseHostPort(kvCli.cli.RemoteAddr()) - localHost, localPort := parseHostPort(kvCli.cli.LocalAddr()) - kvCli.remoteHost = remoteHost - kvCli.localHost = localHost - kvCli.localPort = localPort - if shouldBootstrap { if bootstrapSelectBucket != nil { kvCli.selectedBucket.Store(ptr.To(bootstrapSelectBucket.BucketName)) @@ -368,12 +346,20 @@ func (c *kvClient) LoadFactor() float64 { return (float64)(atomic.LoadUint64(&c.pendingOperations)) } -func (c *kvClient) RemoteHostPort() (string, string, int) { - return c.remoteHostName, c.remoteHost, c.remotePort +func (c *kvClient) RemoteHostname() string { + return c.remoteHostname +} + +func (c *kvClient) RemoteAddr() net.Addr { + return c.cli.RemoteAddr() +} + +func (c *kvClient) LocalAddr() net.Addr { + return c.cli.LocalAddr() } -func (c *kvClient) LocalHostPort() (string, int) { - return c.localHost, c.localPort +func (c *kvClient) Dispatch(packet *memdx.Packet, cb memdx.DispatchCallback) (memdx.PendingOp, error) { + return c.cli.Dispatch(packet, cb) } func (c *kvClient) SelectedBucket() string { diff --git a/kvclient_ops.go b/kvclient_ops.go index 30897cdc..f5704bdf 100644 --- a/kvclient_ops.go +++ b/kvclient_ops.go @@ -68,8 +68,8 @@ func kvClient_SimpleCall[Encoder any, ReqT memdx.OpRequest, RespT memdx.OpRespon req ReqT, ) (RespT, error) { bucketName := c.SelectedBucket() - localHost, localPort := c.LocalHostPort() - _, remoteHost, remotePort := c.RemoteHostPort() + localHost, localPort := hostPortFromNetAddr(c.LocalAddr()) + remoteHost, remotePort := hostPortFromNetAddr(c.RemoteAddr()) stime := time.Now() diff --git a/kvclient_test.go b/kvclient_test.go index ba4163ec..b672f9a1 100644 --- a/kvclient_test.go +++ b/kvclient_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "errors" + "net" "testing" "github.com/couchbase/gocbcorex/memdx" @@ -23,11 +24,7 @@ func (mpo memdxPendingOpMock) Cancel(err error) { func TestKvClientReconfigureBucketOverExistingBucket(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ - DispatchFunc: nil, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, - } + memdxCli := &MemdxClientMock{} cli, err := NewKvClient(context.Background(), &KvClientConfig{ Address: "endpoint1", @@ -38,7 +35,7 @@ func TestKvClientReconfigureBucketOverExistingBucket(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, }) @@ -58,11 +55,7 @@ func TestKvClientReconfigureBucketOverExistingBucket(t *testing.T) { func TestKvClientReconfigureTLSConfig(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ - DispatchFunc: nil, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, - } + memdxCli := &MemdxClientMock{} cli, err := NewKvClient(context.Background(), &KvClientConfig{ Address: "endpoint1", @@ -73,7 +66,7 @@ func TestKvClientReconfigureTLSConfig(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, }) @@ -93,11 +86,7 @@ func TestKvClientReconfigureTLSConfig(t *testing.T) { func TestKvClientReconfigureUsername(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ - DispatchFunc: nil, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, - } + memdxCli := &MemdxClientMock{} cli, err := NewKvClient(context.Background(), &KvClientConfig{ Address: "endpoint1", @@ -108,7 +97,7 @@ func TestKvClientReconfigureUsername(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, }) @@ -128,11 +117,7 @@ func TestKvClientReconfigureUsername(t *testing.T) { func TestKvClientReconfigurePassword(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ - DispatchFunc: nil, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, - } + memdxCli := &MemdxClientMock{} cli, err := NewKvClient(context.Background(), &KvClientConfig{ Address: "endpoint1", @@ -143,7 +128,7 @@ func TestKvClientReconfigurePassword(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, }) @@ -163,11 +148,7 @@ func TestKvClientReconfigurePassword(t *testing.T) { func TestKvClientReconfigureAddress(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ - DispatchFunc: nil, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, - } + memdxCli := &MemdxClientMock{} cli, err := NewKvClient(context.Background(), &KvClientConfig{ Address: "endpoint1", @@ -178,7 +159,7 @@ func TestKvClientReconfigureAddress(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, }) @@ -199,12 +180,10 @@ func TestKvClientReconfigureAddress(t *testing.T) { func TestKvClientOrphanResponseHandler(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ + memdxCli := &MemdxClientMock{ DispatchFunc: func(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { return memdxPendingOpMock{}, nil }, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, } cli, err := NewKvClient(context.Background(), &KvClientConfig{ @@ -216,7 +195,7 @@ func TestKvClientOrphanResponseHandler(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, }) @@ -228,12 +207,10 @@ func TestKvClientOrphanResponseHandler(t *testing.T) { func TestKvClientConnCloseHandlerDefault(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ + memdxCli := &MemdxClientMock{ DispatchFunc: func(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { return memdxPendingOpMock{}, nil }, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, } cli, err := NewKvClient(context.Background(), &KvClientConfig{ @@ -245,7 +222,7 @@ func TestKvClientConnCloseHandlerDefault(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, }) @@ -258,12 +235,10 @@ func TestKvClientConnCloseHandlerDefault(t *testing.T) { func TestKvClientConnCloseHandlerCallsUpstream(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ + memdxCli := &MemdxClientMock{ DispatchFunc: func(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { return memdxPendingOpMock{}, nil }, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, } var closedCli KvClient @@ -277,7 +252,7 @@ func TestKvClientConnCloseHandlerCallsUpstream(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, CloseHandler: func(client KvClient, err error) { @@ -296,12 +271,12 @@ func TestKvClientConnCloseHandlerCallsUpstream(t *testing.T) { func TestKvClientWrapsDispatchError(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ + memdxCli := &MemdxClientMock{ DispatchFunc: func(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { return nil, memdx.ErrDispatch }, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, + LocalAddrFunc: func() net.Addr { return &net.TCPAddr{} }, + RemoteAddrFunc: func() net.Addr { return &net.TCPAddr{} }, } cli, err := NewKvClient(context.Background(), &KvClientConfig{ @@ -313,7 +288,7 @@ func TestKvClientWrapsDispatchError(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, }) @@ -329,12 +304,12 @@ func TestKvClientWrapsDispatchError(t *testing.T) { func TestKvClientDoesNotWrapNonDispatchError(t *testing.T) { logger, _ := zap.NewDevelopment() - memdxCli := &MemdxDispatcherCloserMock{ + memdxCli := &MemdxClientMock{ DispatchFunc: func(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { return nil, memdx.ErrProtocol }, - RemoteAddrFunc: func() string { return "remote:1" }, - LocalAddrFunc: func() string { return "local:2" }, + LocalAddrFunc: func() net.Addr { return &net.TCPAddr{} }, + RemoteAddrFunc: func() net.Addr { return &net.TCPAddr{} }, } cli, err := NewKvClient(context.Background(), &KvClientConfig{ @@ -346,7 +321,7 @@ func TestKvClientDoesNotWrapNonDispatchError(t *testing.T) { DisableErrorMap: true, }, &KvClientOptions{ Logger: logger, - NewMemdxClient: func(opts *memdx.ClientOptions) MemdxDispatcherCloser { + NewMemdxClient: func(opts *memdx.ClientOptions) MemdxClient { return memdxCli }, }) diff --git a/kvclientmanager.go b/kvclientmanager.go index c175f90a..0c66fe82 100644 --- a/kvclientmanager.go +++ b/kvclientmanager.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net" "sync" "sync/atomic" @@ -285,6 +286,22 @@ func (m *kvClientManager) Close() error { return nil } +type KvClientError struct { + Cause error + RemoteHostname string + RemoteAddr net.Addr + LocalAddr net.Addr +} + +func (e *KvClientError) Error() string { + return fmt.Sprintf("kv client error: %s (remote-host: %s, remote-addr: %s, local-addr: %s)", + e.Cause, e.RemoteHostname, e.RemoteAddr, e.LocalAddr) +} + +func (e *KvClientError) Unwrap() error { + return e.Cause +} + func OrchestrateMemdClient[RespT any]( ctx context.Context, cm KvClientManager, @@ -308,7 +325,12 @@ func OrchestrateMemdClient[RespT any]( continue } - return res, err + return res, &KvClientError{ + Cause: err, + RemoteHostname: cli.RemoteHostname(), + RemoteAddr: cli.RemoteAddr(), + LocalAddr: cli.LocalAddr(), + } } return res, nil @@ -337,7 +359,12 @@ func OrchestrateRandomMemdClient[RespT any]( continue } - return res, err + return res, &KvClientError{ + Cause: err, + RemoteHostname: cli.RemoteHostname(), + RemoteAddr: cli.RemoteAddr(), + LocalAddr: cli.LocalAddr(), + } } return res, nil diff --git a/kvclientpool.go b/kvclientpool.go index f664258b..7ecf59d1 100644 --- a/kvclientpool.go +++ b/kvclientpool.go @@ -3,7 +3,6 @@ package gocbcorex import ( "context" "errors" - "fmt" "sync" "sync/atomic" "time" @@ -61,6 +60,7 @@ type kvClientPool struct { lock sync.Mutex config KvClientPoolConfig + poolName string connectErr error connectErrTime time.Time closeSig chan struct{} @@ -121,9 +121,12 @@ func NewKvClientPool(config *KvClientPoolConfig, opts *KvClientPoolOptions) (*kv logger.Warn("failed to create connection failure metric") } + poolName := config.ClientConfig.Address + "/" + config.ClientConfig.SelectedBucket + p := &kvClientPool{ logger: logger, config: *config, + poolName: poolName, connectTimeout: connectTimeout, connectErrThrottlePeriod: connectErrThrottlePeriod, @@ -238,6 +241,7 @@ func (p *kvClientPool) startNewClientLocked() <-chan struct{} { } p.addPendingClientLocked(pendingClient) + poolName := p.poolName clientConfig := p.config.ClientConfig // create the goroutine to actually create the client @@ -280,7 +284,6 @@ func (p *kvClientPool) startNewClientLocked() <-chan struct{} { connDTime := connETime.Sub(connStime) connDTimeSecs := float64(connDTime) / float64(time.Second) - poolName := clientConfig.Address + "/" + clientConfig.SelectedBucket if err != nil { p.connFailureMetric.Add(context.Background(), 1, metric.WithAttributes( semconv.DBSystemCouchbase, @@ -422,17 +425,14 @@ func (p *kvClientPool) rebuildActiveClientsLocked() { } func (p *kvClientPool) handleClientClosed(client KvClient, err error) { - host, _, port := client.RemoteHostPort() - poolName := fmt.Sprintf("%s:%d", host, port) + p.lock.Lock() + defer p.lock.Unlock() p.connCountMetric.Record(context.Background(), -1, metric.WithAttributes( semconv.DBSystemCouchbase, - semconv.DBClientConnectionsPoolName(poolName), + semconv.DBClientConnectionsPoolName(p.poolName), )) - p.lock.Lock() - defer p.lock.Unlock() - if !p.removeCurrentClientLocked(client) { // If the client is no longer current anyways, we have nothing to do... // We can get here when we Close a client so this prevents us from taking action there too. @@ -453,11 +453,11 @@ func (p *kvClientPool) Reconfigure(config *KvClientPoolConfig, cb func(error)) e p.logger.Debug("reconfiguring") - oldConfig := p.config.ClientConfig - oldPoolName := oldConfig.Address + "/" + oldConfig.SelectedBucket - newPoolName := oldConfig.Address + "/" + config.ClientConfig.SelectedBucket + oldPoolName := p.poolName + newPoolName := config.ClientConfig.Address + "/" + config.ClientConfig.SelectedBucket p.config = *config + p.poolName = newPoolName numClientsReconfiguring := int64(len(p.currentClients)) markClientReconfigureDone := func() { @@ -473,7 +473,6 @@ func (p *kvClientPool) Reconfigure(config *KvClientPoolConfig, cb func(error)) e clientsToReconfigure := make([]KvClient, len(p.currentClients)) copy(clientsToReconfigure, p.currentClients) - numReconfigured := 0 for _, client := range clientsToReconfigure { client := client @@ -505,24 +504,24 @@ func (p *kvClientPool) Reconfigure(config *KvClientPoolConfig, cb func(error)) e // reconfiguring is successful up until this point, so it can stay in the // current list of clients. it may be moved later by the Reconfigure callback. - numReconfigured++ + + // if the pool name changed as part of this reconfigure, move it. + if oldPoolName != newPoolName { + p.connCountMetric.Record(context.Background(), -1, metric.WithAttributes( + semconv.DBSystemCouchbase, + semconv.DBClientConnectionsPoolName(oldPoolName), + )) + + p.connCountMetric.Record(context.Background(), 1, metric.WithAttributes( + semconv.DBSystemCouchbase, + semconv.DBClientConnectionsPoolName(newPoolName), + )) + } } p.rebuildActiveClientsLocked() p.checkConnectionsLocked() - if oldPoolName != newPoolName { - p.connCountMetric.Record(context.Background(), -int64(numReconfigured), metric.WithAttributes( - semconv.DBSystemCouchbase, - semconv.DBClientConnectionsPoolName(oldPoolName), - )) - - p.connCountMetric.Record(context.Background(), int64(numReconfigured), metric.WithAttributes( - semconv.DBSystemCouchbase, - semconv.DBClientConnectionsPoolName(newPoolName), - )) - } - return nil } diff --git a/memdx/client.go b/memdx/client.go index 0bfbcdbe..cd167001 100644 --- a/memdx/client.go +++ b/memdx/client.go @@ -2,6 +2,7 @@ package memdx import ( "errors" + "net" "os" "sync" @@ -246,10 +247,10 @@ func (c *Client) Dispatch(req *Packet, handler DispatchCallback) (PendingOp, err }, nil } -func (c *Client) LocalAddr() string { +func (c *Client) LocalAddr() net.Addr { return c.conn.LocalAddr() } -func (c *Client) RemoteAddr() string { +func (c *Client) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } diff --git a/memdx/conn.go b/memdx/conn.go index 817116e2..eae94404 100644 --- a/memdx/conn.go +++ b/memdx/conn.go @@ -91,10 +91,10 @@ func (c *Conn) Close() error { return c.conn.Close() } -func (c *Conn) LocalAddr() string { - return c.conn.LocalAddr().String() +func (c *Conn) LocalAddr() net.Addr { + return c.conn.LocalAddr() } -func (c *Conn) RemoteAddr() string { - return c.conn.RemoteAddr().String() +func (c *Conn) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() } diff --git a/memdx/dispatcher.go b/memdx/dispatcher.go index caeed0a4..a75eabaf 100644 --- a/memdx/dispatcher.go +++ b/memdx/dispatcher.go @@ -8,6 +8,4 @@ type DispatchCallback func(*Packet, error) bool type Dispatcher interface { Dispatch(*Packet, DispatchCallback) (PendingOp, error) - LocalAddr() string - RemoteAddr() string } diff --git a/memdx/errors.go b/memdx/errors.go index 7b56ef0f..067e6537 100644 --- a/memdx/errors.go +++ b/memdx/errors.go @@ -114,22 +114,18 @@ func (e invalidArgError) Unwrap() error { } type ServerError struct { - OpCode OpCode - Status Status - Cause error - DispatchedTo string - DispatchedFrom string - Opaque uint32 + OpCode OpCode + Status Status + Cause error + Opaque uint32 } func (e ServerError) Error() string { return fmt.Sprintf( - "server error: %s, status: 0x%x, opcode: %s, dispatched from: %s, dispatched to: %s, opaque: %d", + "server error: %s, status: 0x%x, opcode: %s, opaque: %d", e.Cause, uint16(e.Status), e.OpCode.String(), - e.DispatchedFrom, - e.DispatchedTo, e.Opaque, ) } diff --git a/memdx/errors_test.go b/memdx/errors_test.go index 37635837..7a4a5f10 100644 --- a/memdx/errors_test.go +++ b/memdx/errors_test.go @@ -14,10 +14,8 @@ func TestServerErrorWithContextText(t *testing.T) { err := &ServerErrorWithContext{ Cause: ServerError{ - Cause: errors.New("invalid"), - DispatchedTo: "", - DispatchedFrom: "", - Opaque: 1, + Cause: errors.New("invalid"), + Opaque: 1, }, ContextJson: text, } diff --git a/memdx/harness_int_test.go b/memdx/harness_int_test.go index 777527c7..8c704324 100644 --- a/memdx/harness_int_test.go +++ b/memdx/harness_int_test.go @@ -2,9 +2,10 @@ package memdx_test import ( "context" - "encoding/json" + "net" "testing" + "github.com/couchbase/gocbcorex/contrib/cbconfig" "github.com/couchbase/gocbcorex/memdx" "github.com/couchbase/gocbcorex/testutilsint" "github.com/stretchr/testify/require" @@ -22,15 +23,9 @@ func createTestClient(t *testing.T) *memdx.Client { // As we tie commands to a vbucket we have to ensure that the client we're returning is // actually connected to the right node. - type vbucketServerMap struct { - ServerList []string `json:"serverList"` - VBucketMap [][]int `json:"vBucketMap,omitempty"` - } - type cbConfig struct { - VBucketServerMap vbucketServerMap `json:"vBucketServerMap"` - } - var config cbConfig - require.NoError(t, json.Unmarshal(resp.ClusterConfig.Config, &config)) + nodeHostname, _, _ := net.SplitHostPort(testAddress) + config, err := cbconfig.ParseTerseConfig(resp.ClusterConfig.Config, nodeHostname) + require.NoError(t, err) // This is all a bit rough and can be improved, in time. vbIdx := config.VBucketServerMap.VBucketMap[defaultTestVbucketID][0] diff --git a/memdx/ops_core.go b/memdx/ops_core.go index d5055ba6..bed206d2 100644 --- a/memdx/ops_core.go +++ b/memdx/ops_core.go @@ -1,10 +1,8 @@ package memdx import ( - "bytes" "encoding/binary" "errors" - "net" "strings" ) @@ -27,14 +25,12 @@ type CoreResponseMeta struct { type OpsCore struct { } -func (o OpsCore) decodeErrorContext(resp *Packet, err error, dispatchedTo string, dispatchedFrom string) error { +func (o OpsCore) decodeErrorContext(resp *Packet, err error) error { baseCause := &ServerError{ - OpCode: resp.OpCode, - Status: resp.Status, - Cause: err, - DispatchedTo: dispatchedTo, - DispatchedFrom: dispatchedFrom, - Opaque: resp.Opaque, + OpCode: resp.OpCode, + Status: resp.Status, + Cause: err, + Opaque: resp.Opaque, } if len(resp.Value) > 0 { @@ -54,7 +50,7 @@ func (o OpsCore) decodeErrorContext(resp *Packet, err error, dispatchedTo string return baseCause } -func (o OpsCore) decodeError(resp *Packet, dispatchedTo string, dispatchedFrom string) error { +func (o OpsCore) decodeError(resp *Packet) error { var err error if resp.Status == StatusNotMyVBucket { err = ErrNotMyVbucket @@ -64,7 +60,7 @@ func (o OpsCore) decodeError(resp *Packet, dispatchedTo string, dispatchedFrom s err = errors.New("unexpected status: " + resp.Status.String()) } - return o.decodeErrorContext(resp, err, dispatchedTo, dispatchedFrom) + return o.decodeErrorContext(resp, err) } type HelloRequest struct { @@ -98,7 +94,7 @@ func (o OpsCore) Hello(d Dispatcher, req *HelloRequest, cb func(*HelloResponse, } if resp.Status != StatusSuccess { - cb(nil, o.decodeError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, o.decodeError(resp)) return false } @@ -147,7 +143,7 @@ func (o OpsCore) GetErrorMap(d Dispatcher, req *GetErrorMapRequest, cb func(*Get } if resp.Status != StatusSuccess { - cb(nil, o.decodeError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, o.decodeError(resp)) return false } @@ -189,20 +185,12 @@ func (o OpsCore) GetClusterConfig(d Dispatcher, req *GetClusterConfigRequest, cb } if resp.Status != StatusSuccess { - cb(nil, o.decodeError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, o.decodeError(resp)) return false } - host, _, _ := net.SplitHostPort(d.RemoteAddr()) - if host == "" { - cb(nil, errors.New("failed to identify memd hostname for $HOST replacement")) - return false - } - - outValue := bytes.ReplaceAll(resp.Value, []byte("$HOST"), []byte(host)) - cb(&GetClusterConfigResponse{ - Config: outValue, + Config: resp.Value, }, nil) return false }) @@ -242,7 +230,7 @@ func (o OpsCore) SelectBucket(d Dispatcher, req *SelectBucketRequest, cb func(*S } if resp.Status != StatusSuccess { - cb(nil, o.decodeError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, o.decodeError(resp)) return false } @@ -273,7 +261,7 @@ func (o OpsCore) SASLListMechs(d Dispatcher, req *SASLListMechsRequest, cb func( } if resp.Status != StatusSuccess { - cb(nil, o.decodeError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, o.decodeError(resp)) return false } @@ -329,7 +317,7 @@ func (o OpsCore) SASLAuth(d Dispatcher, req *SASLAuthRequest, cb func(*SASLAuthR } if resp.Status != StatusSuccess { - cb(nil, o.decodeError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, o.decodeError(resp)) return false } @@ -379,7 +367,7 @@ func (o OpsCore) SASLStep(d Dispatcher, req *SASLStepRequest, cb func(*SASLStepR } if resp.Status != StatusSuccess { - cb(nil, o.decodeError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, o.decodeError(resp)) return false } diff --git a/memdx/ops_core_errors_test.go b/memdx/ops_core_errors_test.go index 277305a2..71ca9bcd 100644 --- a/memdx/ops_core_errors_test.go +++ b/memdx/ops_core_errors_test.go @@ -15,9 +15,6 @@ func TestOpsCoreDecodeError(t *testing.T) { ExpectedError error } - dispatchedTo := "endpoint1" - dispatchedFrom := "local1" - tests := []test{ { Name: "NotMyVbucket", @@ -30,12 +27,10 @@ func TestOpsCoreDecodeError(t *testing.T) { }, ExpectedError: &ServerErrorWithConfig{ Cause: ServerError{ - OpCode: OpCodeReplace, - Status: StatusNotMyVBucket, - Cause: ErrNotMyVbucket, - DispatchedTo: dispatchedTo, - DispatchedFrom: dispatchedFrom, - Opaque: 0x34, + OpCode: OpCodeReplace, + Status: StatusNotMyVBucket, + Cause: ErrNotMyVbucket, + Opaque: 0x34, }, ConfigJson: []byte("impretendingtobeaconfig"), }, @@ -44,7 +39,7 @@ func TestOpsCoreDecodeError(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(tt *testing.T) { - err := OpsCore{}.decodeError(test.Pkt, dispatchedTo, dispatchedFrom) + err := OpsCore{}.decodeError(test.Pkt) assert.Equal(t, test.ExpectedError, err) }) diff --git a/memdx/ops_crud.go b/memdx/ops_crud.go index 82c9cc51..72e97dc9 100644 --- a/memdx/ops_crud.go +++ b/memdx/ops_crud.go @@ -139,13 +139,13 @@ func (o OpsCrud) decodeCommonStatus(status Status) error { return nil } } -func (o OpsCrud) decodeCommonError(resp *Packet, dispatchedTo string, dispatchedFrom string) error { +func (o OpsCrud) decodeCommonError(resp *Packet) error { err := OpsCrud{}.decodeCommonStatus(resp.Status) if err != nil { return err } - return OpsCore{}.decodeError(resp, dispatchedTo, dispatchedFrom) + return OpsCore{}.decodeError(resp) } type GetRequest struct { @@ -194,7 +194,7 @@ func (o OpsCrud) Get(d Dispatcher, req *GetRequest, cb func(*GetResponse, error) } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -278,7 +278,7 @@ func (o OpsCrud) GetAndTouch(d Dispatcher, req *GetAndTouchRequest, cb func(*Get } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -354,7 +354,7 @@ func (o OpsCrud) GetReplica(d Dispatcher, req *GetReplicaRequest, cb func(*GetRe } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -438,7 +438,7 @@ func (o OpsCrud) GetAndLock(d Dispatcher, req *GetAndLockRequest, cb func(*GetAn } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -514,7 +514,7 @@ func (o OpsCrud) GetRandom(d Dispatcher, req *GetRandomRequest, cb func(*GetRand } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -625,7 +625,7 @@ func (o OpsCrud) Set(d Dispatcher, req *SetRequest, cb func(*SetResponse, error) } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -706,7 +706,7 @@ func (o OpsCrud) Unlock(d Dispatcher, req *UnlockRequest, cb func(*UnlockRespons } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -786,7 +786,7 @@ func (o OpsCrud) Touch(d Dispatcher, req *TouchRequest, cb func(*TouchResponse, } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -878,7 +878,7 @@ func (o OpsCrud) Delete(d Dispatcher, req *DeleteRequest, cb func(*DeleteRespons } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -981,7 +981,7 @@ func (o OpsCrud) Add(d Dispatcher, req *AddRequest, cb func(*AddResponse, error) } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -1097,7 +1097,7 @@ func (o OpsCrud) Replace(d Dispatcher, req *ReplaceRequest, cb func(*ReplaceResp } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -1201,7 +1201,7 @@ func (o OpsCrud) Append(d Dispatcher, req *AppendRequest, cb func(*AppendRespons } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -1305,7 +1305,7 @@ func (o OpsCrud) Prepend(d Dispatcher, req *PrependRequest, cb func(*PrependResp } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -1413,7 +1413,7 @@ func (o OpsCrud) Increment(d Dispatcher, req *IncrementRequest, cb func(*Increme } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -1528,7 +1528,7 @@ func (o OpsCrud) Decrement(d Dispatcher, req *DecrementRequest, cb func(*Decreme } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -1620,7 +1620,7 @@ func (o OpsCrud) GetMeta(d Dispatcher, req *GetMetaRequest, cb func(*GetMetaResp } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -1719,7 +1719,7 @@ func (o OpsCrud) SetMeta(d Dispatcher, req *SetMetaRequest, cb func(*SetMetaResp } if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -1810,7 +1810,7 @@ func (o OpsCrud) DeleteMeta(d Dispatcher, req *DeleteMetaRequest, cb func(*Delet cb(nil, ErrDocNotFound) return false } else if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -1936,7 +1936,7 @@ func (o OpsCrud) LookupIn(d Dispatcher, req *LookupInRequest, cb func(*LookupInR docIsDeleted = true // considered a success still } else if resp.Status != StatusSuccess && resp.Status != StatusSubDocMultiPathFailure { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -2216,7 +2216,7 @@ func (o OpsCrud) MutateIn(d Dispatcher, req *MutateInRequest, cb func(*MutateInR docIsDeleted = true // considered a success still } else if resp.Status != StatusSuccess && resp.Status != StatusSubDocMultiPathFailure { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } diff --git a/memdx/ops_crud_rangescan.go b/memdx/ops_crud_rangescan.go index 3148bf9f..0e34fa44 100644 --- a/memdx/ops_crud_rangescan.go +++ b/memdx/ops_crud_rangescan.go @@ -45,7 +45,7 @@ func (o OpsCrud) RangeScanCreate(d Dispatcher, req *RangeScanCreateRequest, cb f cb(nil, ErrRangeScanVbUuidMismatch) return false } else if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -98,7 +98,7 @@ func (o OpsCrud) RangeScanContinue(d Dispatcher, req *RangeScanContinueRequest, return false } else if resp.Status != StatusSuccess && resp.Status != StatusRangeScanMore && resp.Status != StatusRangeScanComplete { - actionCb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + actionCb(nil, OpsCrud{}.decodeCommonError(resp)) return false } @@ -169,7 +169,7 @@ func (o OpsCrud) RangeScanCancel(d Dispatcher, req *RangeScanCancelRequest, cb f cb(nil, ErrRangeScanNotFound) return false } else if resp.Status != StatusSuccess { - cb(nil, OpsCrud{}.decodeCommonError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCrud{}.decodeCommonError(resp)) return false } diff --git a/memdx/ops_utils.go b/memdx/ops_utils.go index 0a5dd075..fbe04631 100644 --- a/memdx/ops_utils.go +++ b/memdx/ops_utils.go @@ -70,7 +70,7 @@ func (o OpsUtils) Stats(d Dispatcher, req *StatsRequest, cb func(*StatsResponse, } if resp.Status != StatusSuccess { - cb(nil, OpsCore{}.decodeError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCore{}.decodeError(resp)) return false } @@ -122,13 +122,13 @@ func (o OpsUtils) GetCollectionID(d Dispatcher, req *GetCollectionIDRequest, cb if resp.Status == StatusScopeUnknown { cb(nil, &ResourceError{ - Cause: OpsCore{}.decodeErrorContext(resp, ErrUnknownScopeName, "", ""), + Cause: OpsCore{}.decodeErrorContext(resp, ErrUnknownScopeName), ScopeName: req.ScopeName, }) return false } else if resp.Status == StatusCollectionUnknown { cb(nil, &ResourceError{ - Cause: OpsCore{}.decodeErrorContext(resp, ErrUnknownCollectionName, "", ""), + Cause: OpsCore{}.decodeErrorContext(resp, ErrUnknownCollectionName), ScopeName: req.ScopeName, CollectionName: req.CollectionName, }) @@ -136,7 +136,7 @@ func (o OpsUtils) GetCollectionID(d Dispatcher, req *GetCollectionIDRequest, cb } if resp.Status != StatusSuccess { - cb(nil, OpsCore{}.decodeError(resp, d.RemoteAddr(), d.LocalAddr())) + cb(nil, OpsCore{}.decodeError(resp)) return false } diff --git a/memdxclient.go b/memdxclient.go new file mode 100644 index 00000000..74e8bd1c --- /dev/null +++ b/memdxclient.go @@ -0,0 +1,16 @@ +package gocbcorex + +import ( + "net" + + "github.com/couchbase/gocbcorex/memdx" +) + +type MemdxClient interface { + memdx.Dispatcher + LocalAddr() net.Addr + RemoteAddr() net.Addr + Close() error +} + +var _ MemdxClient = (*memdx.Client)(nil) diff --git a/mock_kvclient_test.go b/mock_kvclient_test.go index 679f2aca..735fcce2 100644 --- a/mock_kvclient_test.go +++ b/mock_kvclient_test.go @@ -5,6 +5,7 @@ package gocbcorex import ( "context" + "net" "sync" "github.com/couchbase/gocbcorex/memdx" @@ -38,6 +39,9 @@ var _ KvClient = &KvClientMock{} // DeleteMetaFunc: func(ctx context.Context, req *memdx.DeleteMetaRequest) (*memdx.DeleteMetaResponse, error) { // panic("mock out the DeleteMeta method") // }, +// DispatchFunc: func(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { +// panic("mock out the Dispatch method") +// }, // GetFunc: func(ctx context.Context, req *memdx.GetRequest) (*memdx.GetResponse, error) { // panic("mock out the Get method") // }, @@ -71,8 +75,8 @@ var _ KvClient = &KvClientMock{} // LoadFactorFunc: func() float64 { // panic("mock out the LoadFactor method") // }, -// LocalHostPortFunc: func() (string, int) { -// panic("mock out the LocalHostPort method") +// LocalAddrFunc: func() net.Addr { +// panic("mock out the LocalAddr method") // }, // LookupInFunc: func(ctx context.Context, req *memdx.LookupInRequest) (*memdx.LookupInResponse, error) { // panic("mock out the LookupIn method") @@ -95,8 +99,11 @@ var _ KvClient = &KvClientMock{} // ReconfigureFunc: func(config *KvClientConfig, cb func(error)) error { // panic("mock out the Reconfigure method") // }, -// RemoteHostPortFunc: func() (string, string, int) { -// panic("mock out the RemoteHostPort method") +// RemoteAddrFunc: func() net.Addr { +// panic("mock out the RemoteAddr method") +// }, +// RemoteHostnameFunc: func() string { +// panic("mock out the RemoteHostname method") // }, // ReplaceFunc: func(ctx context.Context, req *memdx.ReplaceRequest) (*memdx.ReplaceResponse, error) { // panic("mock out the Replace method") @@ -138,6 +145,9 @@ type KvClientMock struct { // DeleteMetaFunc mocks the DeleteMeta method. DeleteMetaFunc func(ctx context.Context, req *memdx.DeleteMetaRequest) (*memdx.DeleteMetaResponse, error) + // DispatchFunc mocks the Dispatch method. + DispatchFunc func(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) + // GetFunc mocks the Get method. GetFunc func(ctx context.Context, req *memdx.GetRequest) (*memdx.GetResponse, error) @@ -171,8 +181,8 @@ type KvClientMock struct { // LoadFactorFunc mocks the LoadFactor method. LoadFactorFunc func() float64 - // LocalHostPortFunc mocks the LocalHostPort method. - LocalHostPortFunc func() (string, int) + // LocalAddrFunc mocks the LocalAddr method. + LocalAddrFunc func() net.Addr // LookupInFunc mocks the LookupIn method. LookupInFunc func(ctx context.Context, req *memdx.LookupInRequest) (*memdx.LookupInResponse, error) @@ -195,8 +205,11 @@ type KvClientMock struct { // ReconfigureFunc mocks the Reconfigure method. ReconfigureFunc func(config *KvClientConfig, cb func(error)) error - // RemoteHostPortFunc mocks the RemoteHostPort method. - RemoteHostPortFunc func() (string, string, int) + // RemoteAddrFunc mocks the RemoteAddr method. + RemoteAddrFunc func() net.Addr + + // RemoteHostnameFunc mocks the RemoteHostname method. + RemoteHostnameFunc func() string // ReplaceFunc mocks the Replace method. ReplaceFunc func(ctx context.Context, req *memdx.ReplaceRequest) (*memdx.ReplaceResponse, error) @@ -253,6 +266,13 @@ type KvClientMock struct { // Req is the req argument value. Req *memdx.DeleteMetaRequest } + // Dispatch holds details about calls to the Dispatch method. + Dispatch []struct { + // Packet is the packet argument value. + Packet *memdx.Packet + // DispatchCallback is the dispatchCallback argument value. + DispatchCallback memdx.DispatchCallback + } // Get holds details about calls to the Get method. Get []struct { // Ctx is the ctx argument value. @@ -324,8 +344,8 @@ type KvClientMock struct { // LoadFactor holds details about calls to the LoadFactor method. LoadFactor []struct { } - // LocalHostPort holds details about calls to the LocalHostPort method. - LocalHostPort []struct { + // LocalAddr holds details about calls to the LocalAddr method. + LocalAddr []struct { } // LookupIn holds details about calls to the LookupIn method. LookupIn []struct { @@ -378,8 +398,11 @@ type KvClientMock struct { // Cb is the cb argument value. Cb func(error) } - // RemoteHostPort holds details about calls to the RemoteHostPort method. - RemoteHostPort []struct { + // RemoteAddr holds details about calls to the RemoteAddr method. + RemoteAddr []struct { + } + // RemoteHostname holds details about calls to the RemoteHostname method. + RemoteHostname []struct { } // Replace holds details about calls to the Replace method. Replace []struct { @@ -423,6 +446,7 @@ type KvClientMock struct { lockDecrement sync.RWMutex lockDelete sync.RWMutex lockDeleteMeta sync.RWMutex + lockDispatch sync.RWMutex lockGet sync.RWMutex lockGetAndLock sync.RWMutex lockGetAndTouch sync.RWMutex @@ -434,7 +458,7 @@ type KvClientMock struct { lockHasFeature sync.RWMutex lockIncrement sync.RWMutex lockLoadFactor sync.RWMutex - lockLocalHostPort sync.RWMutex + lockLocalAddr sync.RWMutex lockLookupIn sync.RWMutex lockMutateIn sync.RWMutex lockPrepend sync.RWMutex @@ -442,7 +466,8 @@ type KvClientMock struct { lockRangeScanContinue sync.RWMutex lockRangeScanCreate sync.RWMutex lockReconfigure sync.RWMutex - lockRemoteHostPort sync.RWMutex + lockRemoteAddr sync.RWMutex + lockRemoteHostname sync.RWMutex lockReplace sync.RWMutex lockSet sync.RWMutex lockSetMeta sync.RWMutex @@ -657,6 +682,42 @@ func (mock *KvClientMock) DeleteMetaCalls() []struct { return calls } +// Dispatch calls DispatchFunc. +func (mock *KvClientMock) Dispatch(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { + if mock.DispatchFunc == nil { + panic("KvClientMock.DispatchFunc: method is nil but KvClient.Dispatch was just called") + } + callInfo := struct { + Packet *memdx.Packet + DispatchCallback memdx.DispatchCallback + }{ + Packet: packet, + DispatchCallback: dispatchCallback, + } + mock.lockDispatch.Lock() + mock.calls.Dispatch = append(mock.calls.Dispatch, callInfo) + mock.lockDispatch.Unlock() + return mock.DispatchFunc(packet, dispatchCallback) +} + +// DispatchCalls gets all the calls that were made to Dispatch. +// Check the length with: +// +// len(mockedKvClient.DispatchCalls()) +func (mock *KvClientMock) DispatchCalls() []struct { + Packet *memdx.Packet + DispatchCallback memdx.DispatchCallback +} { + var calls []struct { + Packet *memdx.Packet + DispatchCallback memdx.DispatchCallback + } + mock.lockDispatch.RLock() + calls = mock.calls.Dispatch + mock.lockDispatch.RUnlock() + return calls +} + // Get calls GetFunc. func (mock *KvClientMock) Get(ctx context.Context, req *memdx.GetRequest) (*memdx.GetResponse, error) { if mock.GetFunc == nil { @@ -1040,30 +1101,30 @@ func (mock *KvClientMock) LoadFactorCalls() []struct { return calls } -// LocalHostPort calls LocalHostPortFunc. -func (mock *KvClientMock) LocalHostPort() (string, int) { - if mock.LocalHostPortFunc == nil { - panic("KvClientMock.LocalHostPortFunc: method is nil but KvClient.LocalHostPort was just called") +// LocalAddr calls LocalAddrFunc. +func (mock *KvClientMock) LocalAddr() net.Addr { + if mock.LocalAddrFunc == nil { + panic("KvClientMock.LocalAddrFunc: method is nil but KvClient.LocalAddr was just called") } callInfo := struct { }{} - mock.lockLocalHostPort.Lock() - mock.calls.LocalHostPort = append(mock.calls.LocalHostPort, callInfo) - mock.lockLocalHostPort.Unlock() - return mock.LocalHostPortFunc() + mock.lockLocalAddr.Lock() + mock.calls.LocalAddr = append(mock.calls.LocalAddr, callInfo) + mock.lockLocalAddr.Unlock() + return mock.LocalAddrFunc() } -// LocalHostPortCalls gets all the calls that were made to LocalHostPort. +// LocalAddrCalls gets all the calls that were made to LocalAddr. // Check the length with: // -// len(mockedKvClient.LocalHostPortCalls()) -func (mock *KvClientMock) LocalHostPortCalls() []struct { +// len(mockedKvClient.LocalAddrCalls()) +func (mock *KvClientMock) LocalAddrCalls() []struct { } { var calls []struct { } - mock.lockLocalHostPort.RLock() - calls = mock.calls.LocalHostPort - mock.lockLocalHostPort.RUnlock() + mock.lockLocalAddr.RLock() + calls = mock.calls.LocalAddr + mock.lockLocalAddr.RUnlock() return calls } @@ -1323,30 +1384,57 @@ func (mock *KvClientMock) ReconfigureCalls() []struct { return calls } -// RemoteHostPort calls RemoteHostPortFunc. -func (mock *KvClientMock) RemoteHostPort() (string, string, int) { - if mock.RemoteHostPortFunc == nil { - panic("KvClientMock.RemoteHostPortFunc: method is nil but KvClient.RemoteHostPort was just called") +// RemoteAddr calls RemoteAddrFunc. +func (mock *KvClientMock) RemoteAddr() net.Addr { + if mock.RemoteAddrFunc == nil { + panic("KvClientMock.RemoteAddrFunc: method is nil but KvClient.RemoteAddr was just called") + } + callInfo := struct { + }{} + mock.lockRemoteAddr.Lock() + mock.calls.RemoteAddr = append(mock.calls.RemoteAddr, callInfo) + mock.lockRemoteAddr.Unlock() + return mock.RemoteAddrFunc() +} + +// RemoteAddrCalls gets all the calls that were made to RemoteAddr. +// Check the length with: +// +// len(mockedKvClient.RemoteAddrCalls()) +func (mock *KvClientMock) RemoteAddrCalls() []struct { +} { + var calls []struct { + } + mock.lockRemoteAddr.RLock() + calls = mock.calls.RemoteAddr + mock.lockRemoteAddr.RUnlock() + return calls +} + +// RemoteHostname calls RemoteHostnameFunc. +func (mock *KvClientMock) RemoteHostname() string { + if mock.RemoteHostnameFunc == nil { + panic("KvClientMock.RemoteHostnameFunc: method is nil but KvClient.RemoteHostname was just called") } callInfo := struct { }{} - mock.lockRemoteHostPort.Lock() - mock.calls.RemoteHostPort = append(mock.calls.RemoteHostPort, callInfo) - mock.lockRemoteHostPort.Unlock() - return mock.RemoteHostPortFunc() + mock.lockRemoteHostname.Lock() + mock.calls.RemoteHostname = append(mock.calls.RemoteHostname, callInfo) + mock.lockRemoteHostname.Unlock() + return mock.RemoteHostnameFunc() } -// RemoteHostPortCalls gets all the calls that were made to RemoteHostPort. +// RemoteHostnameCalls gets all the calls that were made to RemoteHostname. // Check the length with: // -// len(mockedKvClient.RemoteHostPortCalls()) -func (mock *KvClientMock) RemoteHostPortCalls() []struct { +// len(mockedKvClient.RemoteHostnameCalls()) +func (mock *KvClientMock) RemoteHostnameCalls() []struct { } { var calls []struct { } - mock.lockRemoteHostPort.RLock() - calls = mock.calls.RemoteHostPort - mock.lockRemoteHostPort.RUnlock() + mock.lockRemoteHostname.RLock() + calls = mock.calls.RemoteHostname + mock.lockRemoteHostname.RUnlock() return calls } @@ -1530,35 +1618,35 @@ func (mock *KvClientMock) UnlockCalls() []struct { return calls } -// Ensure, that MemdxDispatcherCloserMock does implement MemdxDispatcherCloser. +// Ensure, that MemdxClientMock does implement MemdxClient. // If this is not the case, regenerate this file with moq. -var _ MemdxDispatcherCloser = &MemdxDispatcherCloserMock{} +var _ MemdxClient = &MemdxClientMock{} -// MemdxDispatcherCloserMock is a mock implementation of MemdxDispatcherCloser. +// MemdxClientMock is a mock implementation of MemdxClient. // -// func TestSomethingThatUsesMemdxDispatcherCloser(t *testing.T) { +// func TestSomethingThatUsesMemdxClient(t *testing.T) { // -// // make and configure a mocked MemdxDispatcherCloser -// mockedMemdxDispatcherCloser := &MemdxDispatcherCloserMock{ +// // make and configure a mocked MemdxClient +// mockedMemdxClient := &MemdxClientMock{ // CloseFunc: func() error { // panic("mock out the Close method") // }, // DispatchFunc: func(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { // panic("mock out the Dispatch method") // }, -// LocalAddrFunc: func() string { +// LocalAddrFunc: func() net.Addr { // panic("mock out the LocalAddr method") // }, -// RemoteAddrFunc: func() string { +// RemoteAddrFunc: func() net.Addr { // panic("mock out the RemoteAddr method") // }, // } // -// // use mockedMemdxDispatcherCloser in code that requires MemdxDispatcherCloser +// // use mockedMemdxClient in code that requires MemdxClient // // and then make assertions. // // } -type MemdxDispatcherCloserMock struct { +type MemdxClientMock struct { // CloseFunc mocks the Close method. CloseFunc func() error @@ -1566,10 +1654,10 @@ type MemdxDispatcherCloserMock struct { DispatchFunc func(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) // LocalAddrFunc mocks the LocalAddr method. - LocalAddrFunc func() string + LocalAddrFunc func() net.Addr // RemoteAddrFunc mocks the RemoteAddr method. - RemoteAddrFunc func() string + RemoteAddrFunc func() net.Addr // calls tracks calls to the methods. calls struct { @@ -1597,9 +1685,9 @@ type MemdxDispatcherCloserMock struct { } // Close calls CloseFunc. -func (mock *MemdxDispatcherCloserMock) Close() error { +func (mock *MemdxClientMock) Close() error { if mock.CloseFunc == nil { - panic("MemdxDispatcherCloserMock.CloseFunc: method is nil but MemdxDispatcherCloser.Close was just called") + panic("MemdxClientMock.CloseFunc: method is nil but MemdxClient.Close was just called") } callInfo := struct { }{} @@ -1612,8 +1700,8 @@ func (mock *MemdxDispatcherCloserMock) Close() error { // CloseCalls gets all the calls that were made to Close. // Check the length with: // -// len(mockedMemdxDispatcherCloser.CloseCalls()) -func (mock *MemdxDispatcherCloserMock) CloseCalls() []struct { +// len(mockedMemdxClient.CloseCalls()) +func (mock *MemdxClientMock) CloseCalls() []struct { } { var calls []struct { } @@ -1624,9 +1712,9 @@ func (mock *MemdxDispatcherCloserMock) CloseCalls() []struct { } // Dispatch calls DispatchFunc. -func (mock *MemdxDispatcherCloserMock) Dispatch(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { +func (mock *MemdxClientMock) Dispatch(packet *memdx.Packet, dispatchCallback memdx.DispatchCallback) (memdx.PendingOp, error) { if mock.DispatchFunc == nil { - panic("MemdxDispatcherCloserMock.DispatchFunc: method is nil but MemdxDispatcherCloser.Dispatch was just called") + panic("MemdxClientMock.DispatchFunc: method is nil but MemdxClient.Dispatch was just called") } callInfo := struct { Packet *memdx.Packet @@ -1644,8 +1732,8 @@ func (mock *MemdxDispatcherCloserMock) Dispatch(packet *memdx.Packet, dispatchCa // DispatchCalls gets all the calls that were made to Dispatch. // Check the length with: // -// len(mockedMemdxDispatcherCloser.DispatchCalls()) -func (mock *MemdxDispatcherCloserMock) DispatchCalls() []struct { +// len(mockedMemdxClient.DispatchCalls()) +func (mock *MemdxClientMock) DispatchCalls() []struct { Packet *memdx.Packet DispatchCallback memdx.DispatchCallback } { @@ -1660,9 +1748,9 @@ func (mock *MemdxDispatcherCloserMock) DispatchCalls() []struct { } // LocalAddr calls LocalAddrFunc. -func (mock *MemdxDispatcherCloserMock) LocalAddr() string { +func (mock *MemdxClientMock) LocalAddr() net.Addr { if mock.LocalAddrFunc == nil { - panic("MemdxDispatcherCloserMock.LocalAddrFunc: method is nil but MemdxDispatcherCloser.LocalAddr was just called") + panic("MemdxClientMock.LocalAddrFunc: method is nil but MemdxClient.LocalAddr was just called") } callInfo := struct { }{} @@ -1675,8 +1763,8 @@ func (mock *MemdxDispatcherCloserMock) LocalAddr() string { // LocalAddrCalls gets all the calls that were made to LocalAddr. // Check the length with: // -// len(mockedMemdxDispatcherCloser.LocalAddrCalls()) -func (mock *MemdxDispatcherCloserMock) LocalAddrCalls() []struct { +// len(mockedMemdxClient.LocalAddrCalls()) +func (mock *MemdxClientMock) LocalAddrCalls() []struct { } { var calls []struct { } @@ -1687,9 +1775,9 @@ func (mock *MemdxDispatcherCloserMock) LocalAddrCalls() []struct { } // RemoteAddr calls RemoteAddrFunc. -func (mock *MemdxDispatcherCloserMock) RemoteAddr() string { +func (mock *MemdxClientMock) RemoteAddr() net.Addr { if mock.RemoteAddrFunc == nil { - panic("MemdxDispatcherCloserMock.RemoteAddrFunc: method is nil but MemdxDispatcherCloser.RemoteAddr was just called") + panic("MemdxClientMock.RemoteAddrFunc: method is nil but MemdxClient.RemoteAddr was just called") } callInfo := struct { }{} @@ -1702,8 +1790,8 @@ func (mock *MemdxDispatcherCloserMock) RemoteAddr() string { // RemoteAddrCalls gets all the calls that were made to RemoteAddr. // Check the length with: // -// len(mockedMemdxDispatcherCloser.RemoteAddrCalls()) -func (mock *MemdxDispatcherCloserMock) RemoteAddrCalls() []struct { +// len(mockedMemdxClient.RemoteAddrCalls()) +func (mock *MemdxClientMock) RemoteAddrCalls() []struct { } { var calls []struct { } diff --git a/utils.go b/utils.go index 0942cb75..670803c9 100644 --- a/utils.go +++ b/utils.go @@ -22,6 +22,19 @@ func getHostFromUri(uri string) (string, error) { return parsedUrl.Host, nil } +func hostnameFromAddrStr(address string) string { + host, _, err := net.SplitHostPort(address) + if err != nil { + return address + } + return host +} + +func hostPortFromNetAddr(addr net.Addr) (string, int) { + tcpAddr := addr.(*net.TCPAddr) + return tcpAddr.IP.String(), tcpAddr.Port +} + func hostFromHostPort(hostport string) (string, error) { host, _, err := net.SplitHostPort(hostport) if err != nil { diff --git a/vbucketrouter.go b/vbucketrouter.go index 473a3a06..0a99cec4 100644 --- a/vbucketrouter.go +++ b/vbucketrouter.go @@ -1,9 +1,7 @@ package gocbcorex import ( - "bytes" "context" - "encoding/json" "errors" "sync/atomic" @@ -118,8 +116,14 @@ type NotMyVbucketConfigHandler interface { HandleNotMyVbucketConfig(config *cbconfig.TerseConfigJson, sourceHostname string) } -func OrchestrateMemdRouting[RespT any](ctx context.Context, vb VbucketRouter, ch NotMyVbucketConfigHandler, key []byte, vbServerIdx uint32, - fn func(endpoint string, vbID uint16) (RespT, error)) (RespT, error) { +func OrchestrateMemdRouting[RespT any]( + ctx context.Context, + vb VbucketRouter, + ch NotMyVbucketConfigHandler, + key []byte, + vbServerIdx uint32, + fn func(endpoint string, vbID uint16) (RespT, error), +) (RespT, error) { endpoint, vbID, err := vb.DispatchByKey(key, vbServerIdx) if err != nil { var emptyResp RespT @@ -127,43 +131,31 @@ func OrchestrateMemdRouting[RespT any](ctx context.Context, vb VbucketRouter, ch } for { - // Implement me properly res, err := fn(endpoint, vbID) if err != nil { if errors.Is(err, memdx.ErrNotMyVbucket) { - if ch == nil { - // if we have no config handler, no point in trying to parse the config - return res, &VbucketMapOutdatedError{ - Cause: err, - } - } - - var nmvErr *memdx.ServerErrorWithConfig - if !errors.As(err, &nmvErr) { - // if there is no new config available, we cant make any assumptions - // about the meaning of this error and propagate it upwards. - // log.Printf("received a not-my-vbucket without config information") - return res, &VbucketMapOutdatedError{ - Cause: err, + // if we have a config handler, lets try to parse the config and update + if ch != nil { + var nmvErr *memdx.ServerErrorWithConfig + if errors.As(err, &nmvErr) { + var kvCliErr *KvClientError + if errors.As(err, &kvCliErr) { + sourceHostname := kvCliErr.RemoteHostname + + config, parseErr := cbconfig.ParseTerseConfig( + nmvErr.ConfigJson, + sourceHostname) + if parseErr != nil { + return res, &VbucketMapOutdatedError{ + Cause: err, + } + } + + ch.HandleNotMyVbucketConfig(config, sourceHostname) + } } } - // configs can contain $HOST, which needs to be replaced with the querying endpoint... - configJsonBytes := bytes.ReplaceAll( - nmvErr.ConfigJson, - []byte("$HOST"), - []byte(endpoint)) - - var configJson *cbconfig.TerseConfigJson - unmarshalErr := json.Unmarshal(configJsonBytes, &configJson) - if unmarshalErr != nil { - return res, &VbucketMapOutdatedError{ - Cause: err, - } - } - - ch.HandleNotMyVbucketConfig(configJson, endpoint) - newEndpoint, newVbID, err := vb.DispatchByKey(key, vbServerIdx) if err != nil { var emptyResp RespT