Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4321: store skipped sequences as single entries below a certain threshold #7173

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,13 @@ func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq u
base.InfofCtx(ctx, base.KeyCache, "cannot push negative skipped sequence range to skipped list: %d %d", startSeq, endSeq)
return
}
c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq))
numSeqs := (endSeq - startSeq) + 1
if numSeqs > MinSequencesForRange {
c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq))
} else {
// push sequences as separate entries
c.skippedSeqs.PushSkippedSequenceEntries(startSeq, endSeq, int64(numSeqs))
}
}

// waitForSequence blocks up to maxWaitTime until the given sequence has been received.
Expand Down
135 changes: 118 additions & 17 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2056,7 +2056,7 @@ func TestProcessSkippedEntry(t *testing.T) {

// assert this pushes an entry on the skipped sequence slice
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list))
assert.Equal(c, 19, len(testChangeCache.skippedSeqs.list))
}, time.Second*10, time.Millisecond*100)

// process some sequences over cache
Expand Down Expand Up @@ -2130,13 +2130,13 @@ func TestProcessSkippedEntryStats(t *testing.T) {

// assert this pushes an entry on the skipped sequence slice
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list))
assert.Equal(c, 19, len(testChangeCache.skippedSeqs.list))
}, time.Second*10, time.Millisecond*100)

// expected values for stats on skipped slice
arrivingSeqs := []uint64{3, 15, 18, 2, 1}
expSliceLen := []int64{2, 3, 4, 4, 3}
expSliceCap := []int64{2, 4, 4, 4, 4}
expSliceLen := []int64{18, 17, 16, 15, 14}
expSliceCap := []int64{32, 32, 32, 32, 32}

numSeqsInList := dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()
for j := 0; j < len(arrivingSeqs); j++ {
Expand Down Expand Up @@ -2181,7 +2181,7 @@ func TestSkippedSequenceCompact(t *testing.T) {
testChangeCache := &changeCache{}
if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{
CachePendingSeqMaxWait: 5 * time.Millisecond,
CacheSkippedSeqMaxWait: 2 * time.Second,
CacheSkippedSeqMaxWait: 1 * time.Second,
}, dbContext.MetadataKeys); err != nil {
log.Printf("Init failed for testChangeCache: %v", err)
t.Fail()
Expand All @@ -2206,9 +2206,12 @@ func TestSkippedSequenceCompact(t *testing.T) {

// assert this pushes an entry on the skipped sequence slice
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list))
assert.Equal(c, 19, len(testChangeCache.skippedSeqs.list))
}, time.Second*10, time.Millisecond*100)

// manually call clean skipped sequence queue, avoiding race with actual wait time omn skipped
require.NoError(t, testChangeCache.CleanSkippedSequenceQueue(ctx))

// assert that compaction empties the skipped slice and we have correct value for abandoned sequences
require.EventuallyWithT(t, func(c *assert.CollectT) {
testChangeCache.updateStats(ctx)
Expand Down Expand Up @@ -2264,7 +2267,7 @@ func TestReleasedSequenceRangeHandlingEverythingSkipped(t *testing.T) {
// assert that skipped list is filled and next seq at cache is updated
require.EventuallyWithT(t, func(c *assert.CollectT) {
testChangeCache.updateStats(ctx)
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, uint64(21), testChangeCache.nextSequence)
}, time.Second*10, time.Millisecond*100)
Expand Down Expand Up @@ -2712,7 +2715,7 @@ func TestReleasedSequenceRangeHandlingEdgeCase2(t *testing.T) {
// assert that the skipped list is filled
require.EventuallyWithT(t, func(c *assert.CollectT) {
testChangeCache.updateStats(ctx)
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
dbContext.UpdateCalculatedStats(ctx)
Expand Down Expand Up @@ -2794,22 +2797,22 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T)
require.EventuallyWithT(t, func(c *assert.CollectT) {
testChangeCache.updateStats(ctx)
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(16), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(16), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, uint64(19), testChangeCache.nextSequence)
dbContext.UpdateCalculatedStats(ctx)
assert.Equal(c, int64(18), dbContext.DbStats.CacheStats.HighSeqCached.Value())
}, time.Second*10, time.Millisecond*100)

// process unusedSeq range with range containing duplicate sipped sequences
// Skipped should contain: (1-13), (15-17) before processing this range
// Skipped should contain: (1,2,3,4,5,6,7,8,9,10,11,12,13), (15,16,17) before processing this range
testChangeCache.releaseUnusedSequenceRange(ctx, 10, 17, time.Now())

// assert skipped list altered to reflect the above range is processed
require.EventuallyWithT(t, func(c *assert.CollectT) {
testChangeCache.updateStats(ctx)
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(9), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(9), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, uint64(19), testChangeCache.nextSequence)
dbContext.UpdateCalculatedStats(ctx)
Expand Down Expand Up @@ -2852,6 +2855,104 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T)
}, time.Second*10, time.Millisecond*100)
}

// TestRangeInSkippedSplit:
// - Test skipped handling when skipped list has ranges
// - Purpose to ensure range handling isn't broken now changes are to have more single entries
func TestRangeInSkippedSplit(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache)

ctx := base.TestCtx(t)
bucket := base.GetTestBucket(t)
dbContext, err := NewDatabaseContext(ctx, "db", bucket, false, DatabaseContextOptions{
Scopes: GetScopesOptions(t, bucket, 1),
})
require.NoError(t, err)
defer dbContext.Close(ctx)

ctx = dbContext.AddDatabaseLogContext(ctx)
err = dbContext.StartOnlineProcesses(ctx)
require.NoError(t, err)

testChangeCache := &changeCache{}
if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{
CachePendingSeqMaxWait: 20 * time.Minute,
CacheSkippedSeqMaxWait: 20 * time.Minute,
CachePendingSeqMaxNum: 0,
}, dbContext.MetadataKeys); err != nil {
log.Printf("Init failed for testChangeCache: %v", err)
t.Fail()
}

if err := testChangeCache.Start(0); err != nil {
log.Printf("Start error for testChangeCache: %v", err)
t.Fail()
}
defer testChangeCache.Stop(ctx)
require.NoError(t, err)

// entry that will create a skipped range entry on the list
entry := &LogEntry{
Sequence: uint64(MinSequencesForRange + 10),
DocID: fmt.Sprintf("doc_%d", 50),
RevID: "1-abcdefabcdefabcdef",
TimeReceived: time.Now(),
TimeSaved: time.Now(),
}
_ = testChangeCache.processEntry(ctx, entry)

// process unusedSeq range with range in middle of skipped entry
testChangeCache.releaseUnusedSequenceRange(ctx, 10, 17, time.Now())

// assert on stats
require.EventuallyWithT(t, func(c *assert.CollectT) {
testChangeCache.updateStats(ctx)
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(31), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, uint64(41), testChangeCache.nextSequence)
dbContext.UpdateCalculatedStats(ctx)
assert.Equal(c, int64(40), dbContext.DbStats.CacheStats.HighSeqCached.Value())
}, time.Second*10, time.Millisecond*100)

// expected values for stats on skipped slice
arrivingSeqs := []uint64{3, 30, 25, 27}
expSliceLen := []int64{3, 4, 5, 6}
expSliceCap := []int64{4, 4, 8, 8}

testChangeCache.updateStats(ctx)
numSeqsInList := dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()
for j := 0; j < len(arrivingSeqs); j++ {
newEntry := &LogEntry{
DocID: fmt.Sprintf("doc_%d", arrivingSeqs),
RevID: "1-abcdefabcdefabcdef",
Sequence: arrivingSeqs[j],
}

_ = testChangeCache.processEntry(ctx, newEntry)
// assert on skipped sequence slice stats
testChangeCache.updateStats(ctx)
assert.Equal(t, numSeqsInList-1, dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(t, expSliceLen[j], dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(t, int64(39), dbContext.DbStats.CacheStats.NumSkippedSeqs.Value())
assert.Equal(t, expSliceCap[j], dbContext.DbStats.CacheStats.SkippedSeqCap.Value())
numSeqsInList = dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()
}

// empty list
testChangeCache.releaseUnusedSequenceRange(ctx, 1, 40, time.Now())

// assert on stats
require.EventuallyWithT(t, func(c *assert.CollectT) {
testChangeCache.updateStats(ctx)
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, uint64(41), testChangeCache.nextSequence)
dbContext.UpdateCalculatedStats(ctx)
assert.Equal(c, int64(40), dbContext.DbStats.CacheStats.HighSeqCached.Value())
}, time.Second*10, time.Millisecond*100)
}

// getChanges is a synchronous convenience function that returns all changes as a simple array. This will fail the test if an error is returned.
func getChanges(t *testing.T, collection *DatabaseCollectionWithUser, channels base.Set, options ChangesOptions) []*ChangeEntry {
require.NotNil(t, options.ChangesCtx)
Expand Down Expand Up @@ -2942,27 +3043,27 @@ func TestAddPendingLogs(t *testing.T) {
// overlapping ranges, low range arrives first
incoming: []sequenceRange{{4, 8}, {6, 10}},
expectedNextSequence: 11,
expectedSkipped: []sequenceRange{{1, 3}},
expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}},
},
{
// completely overlapping ranges, larger range arrives first
incoming: []sequenceRange{{4, 8}, {6, 8}},
expectedNextSequence: 9,
expectedSkipped: []sequenceRange{{1, 3}},
expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}},
},
{
// range arrives then partly overlapping range arrives
incoming: []sequenceRange{{4, 8}, {6, 10}},
expectedNextSequence: 11,
expectedSkipped: []sequenceRange{{1, 3}},
expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}},
},
{
// range arrives, partly overlapping range arrives the single overlapping range
incoming: []sequenceRange{{4, 8}, {8, 10}, {9, 0}},
channelName: "B",
expectedNextSequence: 10,
expectedCached: []uint64{9},
expectedSkipped: []sequenceRange{{1, 3}},
expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}},
},
{
// single range arrives, left side overlapping range arrives
Expand All @@ -2976,13 +3077,13 @@ func TestAddPendingLogs(t *testing.T) {
incoming: []sequenceRange{{4, 0}, {4, 5}},
channelName: "C",
expectedNextSequence: 6,
expectedSkipped: []sequenceRange{{1, 3}},
expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}},
},
{
// range arrives, lower end overlapping range arrives
incoming: []sequenceRange{{6, 10}, {4, 8}},
expectedNextSequence: 11,
expectedSkipped: []sequenceRange{{1, 3}},
expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}},
},
}

Expand Down
40 changes: 36 additions & 4 deletions db/skipped_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

const (
DefaultClipCapacityHeadroom = 1000
MinSequencesForRange = 30 // minimum number of sequences required to store entry as range
)

// SkippedSequenceSlice stores the set of skipped sequences as an ordered slice of single skipped sequences
Expand Down Expand Up @@ -231,7 +232,7 @@ func (s *SkippedSequenceSlice) _removeSeqRange(ctx context.Context, startSeq, en
// put this below a check for !found to avoid out of bound error
rangeElem := s.list[startIndex]
if endSeq > rangeElem.getLastSeq() {
base.DebugfCtx(ctx, base.KeyCache, "sequence range %d to %d specified has sequences in that are not present in skipped list", startSeq, endSeq)
base.DebugfCtx(ctx, base.KeyCache, "sequence range %d to %d specified has sequences in that are not present in skipped list, or sequence range spans multiple skipped entries", startSeq, endSeq)
return base.ErrSkippedSequencesMissing
}

Expand Down Expand Up @@ -327,6 +328,35 @@ func (s *SkippedSequenceSlice) _insert(index int, entry *SkippedSequenceListEntr
s.list[index] = entry
}

// PushSkippedSequenceEntries will push seq range to end of slice as separate single sequence entries unless the range
// being pushed in contiguous with end of the slice and that contiguous range is grater than the min range threshold
func (s *SkippedSequenceSlice) PushSkippedSequenceEntries(startSeq, endSeq uint64, numSeqsIncoming int64) {
s.lock.Lock()
defer s.lock.Unlock()

// update num current skipped sequences count + the cumulative count of skipped sequences
s.NumCurrentSkippedSequences += numSeqsIncoming
s.NumCumulativeSkippedSequences += numSeqsIncoming

// check if we should be extending the last entry on the slice
if len(s.list) != 0 {
index := len(s.list) - 1
lastEntryLastSeq := s.list[index].getLastSeq()
totalSeqs := s.list[index].getNumSequencesInEntry() + numSeqsIncoming
if (lastEntryLastSeq+1) == startSeq && totalSeqs > MinSequencesForRange {
// adding contiguous sequence, and we are above the min threshold to hold a range in list
// set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp
s.list[index].extendRange(endSeq, time.Now().Unix())
return
}
}

// push all items separate to end of slice
for i := startSeq; i <= endSeq; i++ {
s.list = append(s.list, NewSingleSkippedSequenceEntry(i))
}
}

// PushSkippedSequenceEntry will append a new skipped sequence entry to the end of the slice, if adding a contiguous
// sequence function will expand the last entry of the slice to reflect this
func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceListEntry) {
Expand All @@ -345,11 +375,13 @@ func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceLi
// get index of last entry + last seq of entry
index := len(s.list) - 1
lastEntryLastSeq := s.list[index].getLastSeq()
if (lastEntryLastSeq + 1) == entry.getStartSeq() {
// adding contiguous sequence
totalSeqs := s.list[index].getNumSequencesInEntry() + entry.getNumSequencesInEntry()
if (lastEntryLastSeq+1) == entry.getStartSeq() && totalSeqs > MinSequencesForRange {
// adding contiguous sequence, and we are above the min threshold to hold a range in list
// set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp
s.list[index].extendRange(entry.getLastSeq(), entry.getTimestamp())
} else {
// add new entry as separate item
s.list = append(s.list, entry)
}

Expand Down Expand Up @@ -397,7 +429,7 @@ func (s *SkippedSequenceSlice) processUnusedSequenceRangeAtSkipped(ctx context.C
}
} else if err != nil {
// if we get here then the skipped list must be empty
base.InfofCtx(ctx, base.KeyCache, "error attempting to remove unused sequence range form skipped: %v", err)
base.InfofCtx(ctx, base.KeyCache, "error attempting to remove unused sequence range from skipped: %v", err)
}
}

Expand Down
Loading
Loading