Skip to content

Commit

Permalink
ING-828: Refactored http services to use endpoint identifiers.
Browse files Browse the repository at this point in the history
  • Loading branch information
Brett Lawson committed Jul 5, 2024
1 parent 6a07c2d commit 9ed6df8
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 221 deletions.
96 changes: 56 additions & 40 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -312,52 +311,64 @@ func (agent *Agent) genAgentComponentConfigsLocked() *agentComponentConfigs {
clientName := fmt.Sprintf("gocbcorex/%s", buildVersion)

latestConfig := agent.state.latestConfig
bootstrapHosts := latestConfig.AddressesGroupForNetworkType(agent.networkType)
netInfo := latestConfig.AddressesGroupForNetworkType(agent.networkType)

var kvDataHosts []string
var mgmtEndpoints []string
var queryEndpoints []string
var searchEndpoints []string
var analyticsEndpoints []string
kvDataNodeIds := make([]string, 0, len(netInfo.Nodes))
kvDataHosts := make(map[string]string, len(netInfo.Nodes))
mgmtEndpoints := make(map[string]string, len(netInfo.Nodes))
queryEndpoints := make(map[string]string, len(netInfo.Nodes))
searchEndpoints := make(map[string]string, len(netInfo.Nodes))
analyticsEndpoints := make(map[string]string, len(netInfo.Nodes))

tlsConfig := agent.state.tlsConfig
if tlsConfig == nil {
kvDataHosts = bootstrapHosts.NonSSL.KvData
for _, host := range bootstrapHosts.NonSSL.Mgmt {
mgmtEndpoints = append(mgmtEndpoints, "http://"+host)
for _, node := range netInfo.Nodes {
kvEpId := "kv" + node.NodeID
mgmtEpId := "mg" + node.NodeID
queryEpId := "qu" + node.NodeID
searchEpId := "se" + node.NodeID
analyticsEpId := "an" + node.NodeID

if node.HasData {
kvDataNodeIds = append(kvDataNodeIds, kvEpId)
}
for _, host := range bootstrapHosts.NonSSL.Query {
queryEndpoints = append(queryEndpoints, "http://"+host)
}
for _, host := range bootstrapHosts.NonSSL.Search {
searchEndpoints = append(searchEndpoints, "http://"+host)
}
for _, host := range bootstrapHosts.NonSSL.Analytics {
analyticsEndpoints = append(analyticsEndpoints, "http://"+host)
}
} else {
kvDataHosts = bootstrapHosts.SSL.KvData
for _, host := range bootstrapHosts.SSL.Mgmt {
mgmtEndpoints = append(mgmtEndpoints, "https://"+host)
}
for _, host := range bootstrapHosts.SSL.Query {
queryEndpoints = append(queryEndpoints, "https://"+host)
}
for _, host := range bootstrapHosts.SSL.Search {
searchEndpoints = append(searchEndpoints, "https://"+host)
}
for _, host := range bootstrapHosts.SSL.Analytics {
analyticsEndpoints = append(analyticsEndpoints, "https://"+host)

if tlsConfig == nil {
if node.NonSSLPorts.Kv > 0 {
kvDataHosts[kvEpId] = fmt.Sprintf("%s:%d", node.Hostname, node.NonSSLPorts.Kv)
}
if node.NonSSLPorts.Mgmt > 0 {
mgmtEndpoints[mgmtEpId] = fmt.Sprintf("http://%s:%d", node.Hostname, node.NonSSLPorts.Mgmt)
}
if node.NonSSLPorts.Query > 0 {
queryEndpoints[queryEpId] = fmt.Sprintf("http://%s:%d", node.Hostname, node.NonSSLPorts.Query)
}
if node.NonSSLPorts.Search > 0 {
searchEndpoints[searchEpId] = fmt.Sprintf("http://%s:%d", node.Hostname, node.NonSSLPorts.Search)
}
if node.NonSSLPorts.Analytics > 0 {
analyticsEndpoints[analyticsEpId] = fmt.Sprintf("http://%s:%d", node.Hostname, node.NonSSLPorts.Analytics)
}
} else {
if node.SSLPorts.Kv > 0 {
kvDataHosts[kvEpId] = fmt.Sprintf("%s:%d", node.Hostname, node.SSLPorts.Kv)
}
if node.SSLPorts.Mgmt > 0 {
mgmtEndpoints[mgmtEpId] = fmt.Sprintf("https://%s:%d", node.Hostname, node.SSLPorts.Mgmt)
}
if node.SSLPorts.Query > 0 {
queryEndpoints[queryEpId] = fmt.Sprintf("https://%s:%d", node.Hostname, node.SSLPorts.Query)
}
if node.SSLPorts.Search > 0 {
searchEndpoints[searchEpId] = fmt.Sprintf("https://%s:%d", node.Hostname, node.SSLPorts.Search)
}
if node.SSLPorts.Analytics > 0 {
analyticsEndpoints[analyticsEpId] = fmt.Sprintf("https://%s:%d", node.Hostname, node.SSLPorts.Analytics)
}
}
}
kvDataNodeIds := make([]string, len(bootstrapHosts.NonSSL.KvData))
for i, hostPort := range bootstrapHosts.NonSSL.KvData {
kvDataNodeIds[i] = "ep-" + strings.Replace(hostPort, ":", "-", -1)
}

clients := make(map[string]*KvClientConfig)
for addrIdx, addr := range kvDataHosts {
nodeId := kvDataNodeIds[addrIdx]
for nodeId, addr := range kvDataHosts {
clients[nodeId] = &KvClientConfig{
Address: addr,
TlsConfig: tlsConfig,
Expand All @@ -367,10 +378,15 @@ func (agent *Agent) genAgentComponentConfigsLocked() *agentComponentConfigs {
}
}

mgmtEndpointsList := make([]string, 0, len(mgmtEndpoints))
for _, ep := range mgmtEndpoints {
mgmtEndpointsList = append(mgmtEndpointsList, ep)
}

return &agentComponentConfigs{
ConfigWatcherHttpConfig: ConfigWatcherHttpConfig{
HttpRoundTripper: agent.state.httpTransport,
Endpoints: mgmtEndpoints,
Endpoints: mgmtEndpointsList,
UserAgent: clientName,
Authenticator: agent.state.authenticator,
BucketName: agent.state.bucket,
Expand Down
4 changes: 2 additions & 2 deletions analyticscomponent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type AnalyticsComponent struct {

type AnalyticsComponentConfig struct {
HttpRoundTripper http.RoundTripper
Endpoints []string
Endpoints map[string]string
Authenticator Authenticator
}

Expand All @@ -34,7 +34,7 @@ func OrchestrateAnalyticsEndpoint[RespT any](
w *AnalyticsComponent,
fn func(roundTripper http.RoundTripper, endpoint, username, password string) (RespT, error),
) (RespT, error) {
roundTripper, endpoint, username, password, err := w.SelectEndpoint(nil)
roundTripper, _, endpoint, username, password, err := w.SelectEndpoint(nil)
if err != nil {
var emptyResp RespT
return emptyResp, err
Expand Down
59 changes: 38 additions & 21 deletions basehttpcomponent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/couchbase/gocbcorex/cbmgmtx"
"github.com/couchbase/gocbcorex/cbqueryx"
"github.com/couchbase/gocbcorex/cbsearchx"
"golang.org/x/exp/slices"
)

type baseHttpComponent struct {
Expand All @@ -22,14 +23,15 @@ type baseHttpComponent struct {

type baseHttpComponentState struct {
httpRoundTripper http.RoundTripper
endpoints []string
endpoints map[string]string
authenticator Authenticator
}

type baseHttpTarget struct {
Endpoint string
Username string
Password string
EndpointId string
Endpoint string
Username string
Password string
}

func (c *baseHttpComponent) updateState(newState baseHttpComponentState) {
Expand All @@ -38,16 +40,21 @@ func (c *baseHttpComponent) updateState(newState baseHttpComponentState) {
c.lock.Unlock()
}

func (c *baseHttpComponent) GetAllTargets(ignoredEndpoints []string) (http.RoundTripper, []baseHttpTarget, error) {
func (c *baseHttpComponent) GetAllTargets(endpointIdsToIgnore []string) (http.RoundTripper, []baseHttpTarget, error) {
c.lock.RLock()
state := *c.state
c.lock.RUnlock()

// remove all the endpoints we've already tried
remainingEndpoints := filterStringsOut(state.endpoints, ignoredEndpoints)
remainingEndpoints := make(map[string]string, len(state.endpoints))
for epId, endpoint := range state.endpoints {
if !slices.Contains(endpointIdsToIgnore, epId) {
remainingEndpoints[epId] = endpoint
}
}

targets := make([]baseHttpTarget, len(remainingEndpoints))
for endpointIdx, endpoint := range remainingEndpoints {
targets := make([]baseHttpTarget, 0, len(remainingEndpoints))
for epId, endpoint := range remainingEndpoints {
host, err := getHostFromUri(endpoint)
if err != nil {
return nil, []baseHttpTarget{}, err
Expand All @@ -58,48 +65,58 @@ func (c *baseHttpComponent) GetAllTargets(ignoredEndpoints []string) (http.Round
return nil, []baseHttpTarget{}, err
}

targets[endpointIdx] = baseHttpTarget{
Endpoint: endpoint,
Username: username,
Password: password,
}
targets = append(targets, baseHttpTarget{
EndpointId: epId,
Endpoint: endpoint,
Username: username,
Password: password,
})
}

return state.httpRoundTripper, targets, nil
}

func (c *baseHttpComponent) SelectEndpoint(ignoredEndpoints []string) (http.RoundTripper, string, string, string, error) {
func (c *baseHttpComponent) SelectEndpoint(endpointIdsToIgnore []string) (http.RoundTripper, string, string, string, string, error) {
c.lock.RLock()
state := *c.state
c.lock.RUnlock()

// if there are no endpoints to query, we can't proceed
if len(state.endpoints) == 0 {
return nil, "", "", "", nil
return nil, "", "", "", "", nil
}

// remove all the endpoints we've already tried
remainingEndpoints := filterStringsOut(state.endpoints, ignoredEndpoints)
remainingEndpoints := make(map[string]string, len(state.endpoints))
endpointIds := make([]string, 0, len(state.endpoints))
for epId, endpoint := range state.endpoints {
if !slices.Contains(endpointIdsToIgnore, epId) {
remainingEndpoints[epId] = endpoint
endpointIds = append(endpointIds, epId)
}
}

// if there are no more endpoints to try, we can't proceed
if len(remainingEndpoints) == 0 {
return nil, "", "", "", nil
return nil, "", "", "", "", nil
}

// pick a random endpoint to attempt
endpoint := remainingEndpoints[rand.Intn(len(remainingEndpoints))]
endpointIdx := rand.Intn(len(remainingEndpoints))
endpointId := endpointIds[endpointIdx]
endpoint := remainingEndpoints[endpointId]

host, err := getHostFromUri(endpoint)
if err != nil {
return nil, "", "", "", err
return nil, "", "", "", "", err
}

username, password, err := state.authenticator.GetCredentials(c.serviceType, host)
if err != nil {
return nil, "", "", "", err
return nil, "", "", "", "", err
}

return state.httpRoundTripper, endpoint, username, password, nil
return state.httpRoundTripper, endpointId, endpoint, username, password, nil
}

type baseHttpTargets []baseHttpTarget
Expand Down
38 changes: 23 additions & 15 deletions bucketstracking_agentmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -191,17 +192,16 @@ func (m *BucketsTrackingAgentManager) startWatchers() {
}

func (m *BucketsTrackingAgentManager) mgmtEndpointsLocked(cfg *ParsedConfig) []string {
bootstrapHosts := cfg.AddressesGroupForNetworkType(m.networkType)
netInfo := cfg.AddressesGroupForNetworkType(m.networkType)

var mgmtEndpoints []string
tlsConfig := m.state.tlsConfig
if tlsConfig == nil {
for _, host := range bootstrapHosts.NonSSL.Mgmt {
mgmtEndpoints = append(mgmtEndpoints, "http://"+host)
}
} else {
for _, host := range bootstrapHosts.SSL.Mgmt {
mgmtEndpoints = append(mgmtEndpoints, "https://"+host)

for _, node := range netInfo.Nodes {
if tlsConfig == nil {
mgmtEndpoints = append(mgmtEndpoints, fmt.Sprintf("http://%s:%d", node.Hostname, node.NonSSLPorts.Mgmt))
} else {
mgmtEndpoints = append(mgmtEndpoints, fmt.Sprintf("http://%s:%d", node.Hostname, node.SSLPorts.Mgmt))
}
}

Expand Down Expand Up @@ -246,18 +246,26 @@ func (m *BucketsTrackingAgentManager) makeAgent(ctx context.Context, bucketName
defer m.stateLock.Unlock()

cfg := m.state.latestConfig.Load()
bootstrapHosts := cfg.AddressesGroupForNetworkType(m.networkType)
netInfo := cfg.AddressesGroupForNetworkType(m.networkType)

var kvDataHosts []string
var mgmtEndpoints []string

tlsConfig := m.state.tlsConfig
if tlsConfig == nil {
kvDataHosts = bootstrapHosts.NonSSL.KvData
mgmtEndpoints = bootstrapHosts.NonSSL.Mgmt
} else {
kvDataHosts = bootstrapHosts.SSL.KvData
mgmtEndpoints = bootstrapHosts.SSL.Mgmt
for _, node := range netInfo.Nodes {
if tlsConfig == nil {
if node.HasData {
kvDataHosts = append(kvDataHosts, fmt.Sprintf("%s:%d", node.Hostname, node.NonSSLPorts.Kv))
}

mgmtEndpoints = append(mgmtEndpoints, fmt.Sprintf("http://%s:%d", node.Hostname, node.NonSSLPorts.Mgmt))
} else {
if node.HasData {
kvDataHosts = append(kvDataHosts, fmt.Sprintf("%s:%d", node.Hostname, node.SSLPorts.Kv))
}

mgmtEndpoints = append(mgmtEndpoints, fmt.Sprintf("https://%s:%d", node.Hostname, node.SSLPorts.Mgmt))
}
}

ctx, cancel := context.WithTimeout(ctx, m.createAgentTimeout)
Expand Down
Loading

0 comments on commit 9ed6df8

Please sign in to comment.