Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

linear: be aware of stream state when responding (#540) #543

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/proto/otlp v0.15.0
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7
google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.28.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
46 changes: 36 additions & 10 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

type watches = map[chan Response]struct{}
type watches = map[chan Response]stream.StreamState

// LinearCache supports collections of opaque resources. This cache has a
// single collection indexed by resource names and manages resource versions
Expand Down Expand Up @@ -114,24 +114,30 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
}

func (cache *LinearCache) respond(value chan Response, staleResources []string) {
var resources []types.ResourceWithTTL
var (
resources []types.ResourceWithTTL
respondResourceNames []string
)

// TODO: optimize the resources slice creations across different clients
if len(staleResources) == 0 {
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
for _, resource := range cache.resources {
for name, resource := range cache.resources {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
respondResourceNames = append(respondResourceNames, name)
}
} else {
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
if resource != nil {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
respondResourceNames = append(respondResourceNames, name)
}
}
}
value <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Request: &Request{TypeUrl: cache.typeURL, ResourceNames: respondResourceNames},
Resources: resources,
Version: cache.getVersion(),
Ctx: context.Background(),
Expand All @@ -142,11 +148,25 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
// de-duplicate watches that need to be responded
notifyList := make(map[chan Response][]string)
for name := range modified {
for watch := range cache.watches[name] {
notifyList[watch] = append(notifyList[watch], name)
for watch, streamState := range cache.watches[name] {
resourceNames := streamState.GetKnownResourceNames(cache.typeURL)
modifiedNameInResourceName := false
for resourceName := range resourceNames {
if !modifiedNameInResourceName && resourceName == name {
modifiedNameInResourceName = true
}
// To avoid the stale in notifyList becomes empty slice.
// Don't skip resource name that has been deleted here.
// It would be filtered out in respond because the corresponding resource has been deleted.
notifyList[watch] = append(notifyList[watch], resourceName)
}
if !modifiedNameInResourceName {
notifyList[watch] = append(notifyList[watch], name)
}
}
delete(cache.watches, name)
}

for value, stale := range notifyList {
cache.respond(value, stale)
}
Expand Down Expand Up @@ -328,10 +348,16 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
stale = lastVersion != cache.version
} else {
for _, name := range request.ResourceNames {
_, has := streamState.GetKnownResourceNames(request.TypeUrl)[name]
version, exists := cache.versionVector[name]

// When a resource is removed, its version defaults 0 and it is not considered stale.
if lastVersion < cache.versionVector[name] {
if lastVersion < version || (!has && exists) {
stale = true
staleResources = append(staleResources, name)

// Here we collect all requested names.
// It would be filtered out in respond if the resource name doesn't appear in cache.
staleResources = request.ResourceNames
}
}
}
Expand All @@ -341,7 +367,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
}
// Create open watches since versions are up to date.
if len(request.ResourceNames) == 0 {
cache.watchAll[value] = struct{}{}
cache.watchAll[value] = streamState
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
Expand All @@ -354,7 +380,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
set = make(watches)
cache.watches[name] = set
}
set[value] = struct{}{}
set[value] = streamState
}
return func() {
cache.mu.Lock()
Expand Down
12 changes: 9 additions & 3 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ func TestLinearSetResources(t *testing.T) {

// Create new resources
w1 := make(chan Response, 1)
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
mustBlock(t, w1)
w2 := make(chan Response, 1)
Expand Down Expand Up @@ -341,6 +342,7 @@ func TestLinearVersionPrefix(t *testing.T) {
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
verifyResponse(t, w, "instance1-1", 1)

streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
Expand All @@ -350,6 +352,7 @@ func TestLinearDeletion(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
Expand All @@ -369,14 +372,15 @@ func TestLinearWatchTwo(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
streamState.SetKnownResourceNamesAsList(testType, []string{"a", "b"})
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
mustBlock(t, w)
w1 := make(chan Response, 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
mustBlock(t, w1)
require.NoError(t, c.UpdateResource("a", testResource("aa")))
// should only get the modified resource
verifyResponse(t, w, "1", 1)
verifyResponse(t, w, "1", 2)
verifyResponse(t, w1, "1", 2)
}

Expand All @@ -394,6 +398,7 @@ func TestLinearCancel(t *testing.T) {
checkWatchCount(t, c, "a", 0)

// cancel watch for "a"
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
Expand Down Expand Up @@ -737,6 +742,7 @@ func TestLinearMixedWatches(t *testing.T) {
assert.Equal(t, 2, c.NumResources())

sotwState := stream.NewStreamState(false, nil)
sotwState.SetKnownResourceNamesAsList(testType, []string{"a", "b"})
w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w)
mustBlock(t, w)
Expand All @@ -749,7 +755,7 @@ func TestLinearMixedWatches(t *testing.T) {
err = c.UpdateResources(map[string]types.Resource{"a": a}, nil)
assert.NoError(t, err)
// This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation
verifyResponse(t, w, c.getVersion(), 1)
verifyResponse(t, w, c.getVersion(), 2)
checkVersionMapNotSet(t, c)

c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w)
Expand All @@ -770,6 +776,6 @@ func TestLinearMixedWatches(t *testing.T) {
assert.NoError(t, err)
checkVersionMapSet(t, c)

verifyResponse(t, w, c.getVersion(), 0)
verifyResponse(t, w, c.getVersion(), 1)
verifyDeltaResponse(t, wd, nil, []string{"b"})
}
Loading