Skip to content

Commit

Permalink
ING-959: Refactored hostname handling when parsing configs.
Browse files Browse the repository at this point in the history
This commit also refactors how we deal with remote hostnames and addresses
with the various clients and connections inside of the library.  Primarily
with the intent of simplifying the interfaces that we expose, and ensuring
the correct details are available in the right places.
  • Loading branch information
Brett Lawson committed Oct 29, 2024
1 parent b80f485 commit 7d767c4
Show file tree
Hide file tree
Showing 23 changed files with 403 additions and 326 deletions.
16 changes: 5 additions & 11 deletions configwatcher_memd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package gocbcorex

import (
"context"
"encoding/json"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -67,22 +65,18 @@ func configWatcherMemd_pollOne(
return nil, err
}

host, _, _ := client.RemoteHostPort()
if host == "" {
return nil, errors.New("unexpected cccp endpoint format")
}

logger.Debug("Polling for new config",
zap.String("host", host),
zap.String("endpoint", endpoint),
zap.String("endpoint", endpoint))

resp, err := client.GetClusterConfig(ctx, &memdx.GetClusterConfigRequest{})
if err != nil {
return nil, err
}

var config cbconfig.TerseConfigJson
err = json.Unmarshal(resp.Config, &config)
hostname := client.RemoteHostname()

config, err := cbconfig.ParseTerseConfig(resp.Config, hostname)
if err != nil {
return nil, err
}
Expand All @@ -91,7 +85,7 @@ func configWatcherMemd_pollOne(
zap.Int("config", config.Rev),
zap.Int("configRevEpoch", config.RevEpoch))

parsedConfig, err := ConfigParser{}.ParseTerseConfig(&config, host)
parsedConfig, err := ConfigParser{}.ParseTerseConfig(config, hostname)
if err != nil {
return nil, err
}
Expand Down
16 changes: 16 additions & 0 deletions contrib/cbconfig/parse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cbconfig

import (
"bytes"
"encoding/json"
)

func ParseTerseConfig(config []byte, sourceHostname string) (*TerseConfigJson, error) {
config = bytes.ReplaceAll(config, []byte("$HOST"), []byte(sourceHostname))
var configOut *TerseConfigJson
err := json.Unmarshal(config, &configOut)
if err != nil {
return nil, err
}
return configOut, nil
}
2 changes: 1 addition & 1 deletion generate-mocks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:generate moq -out mock_collectionresolver_test.go . CollectionResolver
//go:generate moq -out mock_vbucketrouter_test.go . VbucketRouter
//go:generate moq -out mock_kvclient_test.go . KvClient MemdxDispatcherCloser
//go:generate moq -out mock_kvclient_test.go . KvClient MemdxClient
//go:generate moq -out mock_kvclientpool_test.go . KvClientPool
//go:generate moq -out mock_kvclientmanager_test.go . KvClientManager
//go:generate moq -out mock_retrymanager_test.go . RetryManager RetryController
Expand Down
52 changes: 19 additions & 33 deletions kvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/couchbase/gocbcorex/memdx"
)

type GetMemdxClientFunc func(opts *memdx.ClientOptions) MemdxDispatcherCloser
type GetMemdxClientFunc func(opts *memdx.ClientOptions) MemdxClient

type KvClientConfig struct {
Address string
Expand Down Expand Up @@ -91,27 +91,20 @@ type KvClient interface {

LoadFactor() float64

RemoteHostPort() (string, string, int)
LocalHostPort() (string, int)
RemoteHostname() string
RemoteAddr() net.Addr
LocalAddr() net.Addr

KvClientOps
}

type MemdxDispatcherCloser interface {
memdx.Dispatcher
Close() error
}

type kvClient struct {
logger *zap.Logger
remoteHostName string
remoteHost string
remotePort int
localHost string
localPort int
remoteHostname string

pendingOperations uint64
cli MemdxDispatcherCloser
cli MemdxClient
durationMetric metric.Float64Histogram

lock sync.Mutex
Expand All @@ -132,20 +125,12 @@ type kvClient struct {
var _ KvClient = (*kvClient)(nil)

func NewKvClient(ctx context.Context, config *KvClientConfig, opts *KvClientOptions) (*kvClient, error) {
parseHostPort := func(addr string) (string, int) {
host, portStr, _ := net.SplitHostPort(addr)
parsedPort, _ := strconv.ParseInt(portStr, 10, 64)
return host, int(parsedPort)
}

logger := loggerOrNop(opts.Logger)
// We namespace the pool to improve debugging,
logger = logger.With(
zap.String("clientId", uuid.NewString()[:8]),
)

remoteHostName, remotePort := parseHostPort(config.Address)

durationMetric, err := meter.Float64Histogram("db.client.operation.duration",
metric.WithExplicitBucketBoundaries(0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10))
if err != nil {
Expand All @@ -154,8 +139,7 @@ func NewKvClient(ctx context.Context, config *KvClientConfig, opts *KvClientOpti

kvCli := &kvClient{
currentConfig: *config,
remoteHostName: remoteHostName,
remotePort: remotePort,
remoteHostname: hostnameFromAddrStr(config.Address),
logger: logger,
closeHandler: opts.CloseHandler,
durationMetric: durationMetric,
Expand Down Expand Up @@ -244,12 +228,6 @@ func NewKvClient(ctx context.Context, config *KvClientConfig, opts *KvClientOpti
kvCli.cli = opts.NewMemdxClient(memdxClientOpts)
}

remoteHost, _ := parseHostPort(kvCli.cli.RemoteAddr())
localHost, localPort := parseHostPort(kvCli.cli.LocalAddr())
kvCli.remoteHost = remoteHost
kvCli.localHost = localHost
kvCli.localPort = localPort

if shouldBootstrap {
if bootstrapSelectBucket != nil {
kvCli.selectedBucket.Store(ptr.To(bootstrapSelectBucket.BucketName))
Expand Down Expand Up @@ -368,12 +346,20 @@ func (c *kvClient) LoadFactor() float64 {
return (float64)(atomic.LoadUint64(&c.pendingOperations))
}

func (c *kvClient) RemoteHostPort() (string, string, int) {
return c.remoteHostName, c.remoteHost, c.remotePort
func (c *kvClient) RemoteHostname() string {
return c.remoteHostname
}

func (c *kvClient) RemoteAddr() net.Addr {
return c.cli.RemoteAddr()
}

func (c *kvClient) LocalAddr() net.Addr {
return c.cli.LocalAddr()
}

func (c *kvClient) LocalHostPort() (string, int) {
return c.localHost, c.localPort
func (c *kvClient) Dispatch(packet *memdx.Packet, cb memdx.DispatchCallback) (memdx.PendingOp, error) {
return c.cli.Dispatch(packet, cb)
}

func (c *kvClient) SelectedBucket() string {
Expand Down
4 changes: 2 additions & 2 deletions kvclient_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func kvClient_SimpleCall[Encoder any, ReqT memdx.OpRequest, RespT memdx.OpRespon
req ReqT,
) (RespT, error) {
bucketName := c.SelectedBucket()
localHost, localPort := c.LocalHostPort()
_, remoteHost, remotePort := c.RemoteHostPort()
localHost, localPort := hostPortFromNetAddr(c.LocalAddr())
remoteHost, remotePort := hostPortFromNetAddr(c.RemoteAddr())

stime := time.Now()

Expand Down
Loading

0 comments on commit 7d767c4

Please sign in to comment.