From c92f403aebd8065281418c361091df0c8881c9f6 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Tue, 29 Oct 2024 14:35:16 +0000 Subject: [PATCH 1/3] CBG-4321: store skipped sequences as single entries below a certain thredhold --- db/change_cache.go | 9 ++- db/change_cache_test.go | 135 +++++++++++++++++++++++++++++++----- db/skipped_sequence.go | 9 ++- db/skipped_sequence_test.go | 31 ++++++--- rest/changes_test.go | 2 +- 5 files changed, 155 insertions(+), 31 deletions(-) diff --git a/db/change_cache.go b/db/change_cache.go index bbeecdcbee..ab4f428514 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -989,7 +989,14 @@ func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq u base.InfofCtx(ctx, base.KeyCache, "cannot push negative skipped sequence range to skipped list: %d %d", startSeq, endSeq) return } - c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq)) + numSeqs := (endSeq - startSeq) + 1 + if numSeqs > MinSequencesForRange { + c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq)) + } else { + for i := startSeq; i <= endSeq; i++ { + c.skippedSeqs.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(i)) + } + } } // waitForSequence blocks up to maxWaitTime until the given sequence has been received. diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 0cd1ad5dd1..ecc35d05fb 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -2056,7 +2056,7 @@ func TestProcessSkippedEntry(t *testing.T) { // assert this pushes an entry on the skipped sequence slice require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list)) + assert.Equal(c, 19, len(testChangeCache.skippedSeqs.list)) }, time.Second*10, time.Millisecond*100) // process some sequences over cache @@ -2130,13 +2130,13 @@ func TestProcessSkippedEntryStats(t *testing.T) { // assert this pushes an entry on the skipped sequence slice require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list)) + assert.Equal(c, 19, len(testChangeCache.skippedSeqs.list)) }, time.Second*10, time.Millisecond*100) // expected values for stats on skipped slice arrivingSeqs := []uint64{3, 15, 18, 2, 1} - expSliceLen := []int64{2, 3, 4, 4, 3} - expSliceCap := []int64{2, 4, 4, 4, 4} + expSliceLen := []int64{18, 17, 16, 15, 14} + expSliceCap := []int64{32, 32, 32, 32, 32} numSeqsInList := dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value() for j := 0; j < len(arrivingSeqs); j++ { @@ -2181,7 +2181,7 @@ func TestSkippedSequenceCompact(t *testing.T) { testChangeCache := &changeCache{} if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{ CachePendingSeqMaxWait: 5 * time.Millisecond, - CacheSkippedSeqMaxWait: 2 * time.Second, + CacheSkippedSeqMaxWait: 1 * time.Second, }, dbContext.MetadataKeys); err != nil { log.Printf("Init failed for testChangeCache: %v", err) t.Fail() @@ -2206,9 +2206,12 @@ func TestSkippedSequenceCompact(t *testing.T) { // assert this pushes an entry on the skipped sequence slice require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, 1, len(testChangeCache.skippedSeqs.list)) + assert.Equal(c, 19, len(testChangeCache.skippedSeqs.list)) }, time.Second*10, time.Millisecond*100) + // manually call clean skipped sequence queue, avoiding race with actual wait time omn skipped + require.NoError(t, testChangeCache.CleanSkippedSequenceQueue(ctx)) + // assert that compaction empties the skipped slice and we have correct value for abandoned sequences require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) @@ -2264,7 +2267,7 @@ func TestReleasedSequenceRangeHandlingEverythingSkipped(t *testing.T) { // assert that skipped list is filled and next seq at cache is updated require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) - assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) assert.Equal(c, uint64(21), testChangeCache.nextSequence) }, time.Second*10, time.Millisecond*100) @@ -2712,7 +2715,7 @@ func TestReleasedSequenceRangeHandlingEdgeCase2(t *testing.T) { // assert that the skipped list is filled require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) - assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) assert.Equal(c, int64(19), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) dbContext.UpdateCalculatedStats(ctx) @@ -2794,7 +2797,7 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T) require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) - assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(16), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) assert.Equal(c, int64(16), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) assert.Equal(c, uint64(19), testChangeCache.nextSequence) dbContext.UpdateCalculatedStats(ctx) @@ -2802,14 +2805,14 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T) }, time.Second*10, time.Millisecond*100) // process unusedSeq range with range containing duplicate sipped sequences - // Skipped should contain: (1-13), (15-17) before processing this range + // Skipped should contain: (1,2,3,4,5,6,7,8,9,10,11,12,13), (15,16,17) before processing this range testChangeCache.releaseUnusedSequenceRange(ctx, 10, 17, time.Now()) // assert skipped list altered to reflect the above range is processed require.EventuallyWithT(t, func(c *assert.CollectT) { testChangeCache.updateStats(ctx) assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) - assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(9), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) assert.Equal(c, int64(9), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) assert.Equal(c, uint64(19), testChangeCache.nextSequence) dbContext.UpdateCalculatedStats(ctx) @@ -2852,6 +2855,104 @@ func TestReleasedSequenceRangeHandlingDuplicateSequencesInSkipped(t *testing.T) }, time.Second*10, time.Millisecond*100) } +// TestRangeInSkippedSplit: +// - Test skipped handling when skipped list has ranges +// - Purpose to ensure range handling isn't broken now changes are to have more single entries +func TestRangeInSkippedSplit(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache) + + ctx := base.TestCtx(t) + bucket := base.GetTestBucket(t) + dbContext, err := NewDatabaseContext(ctx, "db", bucket, false, DatabaseContextOptions{ + Scopes: GetScopesOptions(t, bucket, 1), + }) + require.NoError(t, err) + defer dbContext.Close(ctx) + + ctx = dbContext.AddDatabaseLogContext(ctx) + err = dbContext.StartOnlineProcesses(ctx) + require.NoError(t, err) + + testChangeCache := &changeCache{} + if err := testChangeCache.Init(ctx, dbContext, dbContext.channelCache, nil, &CacheOptions{ + CachePendingSeqMaxWait: 20 * time.Minute, + CacheSkippedSeqMaxWait: 20 * time.Minute, + CachePendingSeqMaxNum: 0, + }, dbContext.MetadataKeys); err != nil { + log.Printf("Init failed for testChangeCache: %v", err) + t.Fail() + } + + if err := testChangeCache.Start(0); err != nil { + log.Printf("Start error for testChangeCache: %v", err) + t.Fail() + } + defer testChangeCache.Stop(ctx) + require.NoError(t, err) + + // entry that will create a skipped range entry on the list + entry := &LogEntry{ + Sequence: uint64(MinSequencesForRange + 10), + DocID: fmt.Sprintf("doc_%d", 50), + RevID: "1-abcdefabcdefabcdef", + TimeReceived: time.Now(), + TimeSaved: time.Now(), + } + _ = testChangeCache.processEntry(ctx, entry) + + // process unusedSeq range with range in middle of skipped entry + testChangeCache.releaseUnusedSequenceRange(ctx, 10, 17, time.Now()) + + // assert on stats + require.EventuallyWithT(t, func(c *assert.CollectT) { + testChangeCache.updateStats(ctx) + assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) + assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(31), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) + assert.Equal(c, uint64(41), testChangeCache.nextSequence) + dbContext.UpdateCalculatedStats(ctx) + assert.Equal(c, int64(40), dbContext.DbStats.CacheStats.HighSeqCached.Value()) + }, time.Second*10, time.Millisecond*100) + + // expected values for stats on skipped slice + arrivingSeqs := []uint64{3, 30, 25, 27} + expSliceLen := []int64{3, 4, 5, 6} + expSliceCap := []int64{4, 4, 8, 8} + + testChangeCache.updateStats(ctx) + numSeqsInList := dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value() + for j := 0; j < len(arrivingSeqs); j++ { + newEntry := &LogEntry{ + DocID: fmt.Sprintf("doc_%d", arrivingSeqs), + RevID: "1-abcdefabcdefabcdef", + Sequence: arrivingSeqs[j], + } + + _ = testChangeCache.processEntry(ctx, newEntry) + // assert on skipped sequence slice stats + testChangeCache.updateStats(ctx) + assert.Equal(t, numSeqsInList-1, dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) + assert.Equal(t, expSliceLen[j], dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(t, int64(39), dbContext.DbStats.CacheStats.NumSkippedSeqs.Value()) + assert.Equal(t, expSliceCap[j], dbContext.DbStats.CacheStats.SkippedSeqCap.Value()) + numSeqsInList = dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value() + } + + // empty list + testChangeCache.releaseUnusedSequenceRange(ctx, 1, 40, time.Now()) + + // assert on stats + require.EventuallyWithT(t, func(c *assert.CollectT) { + testChangeCache.updateStats(ctx) + assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) + assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) + assert.Equal(c, uint64(41), testChangeCache.nextSequence) + dbContext.UpdateCalculatedStats(ctx) + assert.Equal(c, int64(40), dbContext.DbStats.CacheStats.HighSeqCached.Value()) + }, time.Second*10, time.Millisecond*100) +} + // getChanges is a synchronous convenience function that returns all changes as a simple array. This will fail the test if an error is returned. func getChanges(t *testing.T, collection *DatabaseCollectionWithUser, channels base.Set, options ChangesOptions) []*ChangeEntry { require.NotNil(t, options.ChangesCtx) @@ -2942,19 +3043,19 @@ func TestAddPendingLogs(t *testing.T) { // overlapping ranges, low range arrives first incoming: []sequenceRange{{4, 8}, {6, 10}}, expectedNextSequence: 11, - expectedSkipped: []sequenceRange{{1, 3}}, + expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, }, { // completely overlapping ranges, larger range arrives first incoming: []sequenceRange{{4, 8}, {6, 8}}, expectedNextSequence: 9, - expectedSkipped: []sequenceRange{{1, 3}}, + expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, }, { // range arrives then partly overlapping range arrives incoming: []sequenceRange{{4, 8}, {6, 10}}, expectedNextSequence: 11, - expectedSkipped: []sequenceRange{{1, 3}}, + expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, }, { // range arrives, partly overlapping range arrives the single overlapping range @@ -2962,7 +3063,7 @@ func TestAddPendingLogs(t *testing.T) { channelName: "B", expectedNextSequence: 10, expectedCached: []uint64{9}, - expectedSkipped: []sequenceRange{{1, 3}}, + expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, }, { // single range arrives, left side overlapping range arrives @@ -2976,13 +3077,13 @@ func TestAddPendingLogs(t *testing.T) { incoming: []sequenceRange{{4, 0}, {4, 5}}, channelName: "C", expectedNextSequence: 6, - expectedSkipped: []sequenceRange{{1, 3}}, + expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, }, { // range arrives, lower end overlapping range arrives incoming: []sequenceRange{{6, 10}, {4, 8}}, expectedNextSequence: 11, - expectedSkipped: []sequenceRange{{1, 3}}, + expectedSkipped: []sequenceRange{{1, 1}, {2, 2}, {3, 3}}, }, } diff --git a/db/skipped_sequence.go b/db/skipped_sequence.go index 687e9cf382..965809f549 100644 --- a/db/skipped_sequence.go +++ b/db/skipped_sequence.go @@ -23,6 +23,7 @@ import ( const ( DefaultClipCapacityHeadroom = 1000 + MinSequencesForRange = 30 // minimum number of sequences required to store entry as range ) // SkippedSequenceSlice stores the set of skipped sequences as an ordered slice of single skipped sequences @@ -231,7 +232,7 @@ func (s *SkippedSequenceSlice) _removeSeqRange(ctx context.Context, startSeq, en // put this below a check for !found to avoid out of bound error rangeElem := s.list[startIndex] if endSeq > rangeElem.getLastSeq() { - base.DebugfCtx(ctx, base.KeyCache, "sequence range %d to %d specified has sequences in that are not present in skipped list", startSeq, endSeq) + base.DebugfCtx(ctx, base.KeyCache, "sequence range %d to %d specified has sequences in that are not present in skipped list, or sequence range spans multiple skipped entries", startSeq, endSeq) return base.ErrSkippedSequencesMissing } @@ -345,11 +346,13 @@ func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceLi // get index of last entry + last seq of entry index := len(s.list) - 1 lastEntryLastSeq := s.list[index].getLastSeq() - if (lastEntryLastSeq + 1) == entry.getStartSeq() { - // adding contiguous sequence + totalSeqs := s.list[index].getNumSequencesInEntry() + entry.getNumSequencesInEntry() + if (lastEntryLastSeq+1) == entry.getStartSeq() && totalSeqs > MinSequencesForRange { + // adding contiguous sequence, and we are above the min threshold to hold a range in list // set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp s.list[index].extendRange(entry.getLastSeq(), entry.getTimestamp()) } else { + // add new entry as separate item s.list = append(s.list, entry) } diff --git a/db/skipped_sequence_test.go b/db/skipped_sequence_test.go index 58c671adee..828d494e35 100644 --- a/db/skipped_sequence_test.go +++ b/db/skipped_sequence_test.go @@ -23,6 +23,7 @@ import ( // - Populate 10 single skipped sequence items in the slice // - Assert that each one is added in the correct order // - Assert that timestamp is increasing from the last entry (or equal to) +// - Add contiguous sequence to slice and assert that it is added as single range // - Add contiguous sequence to slice and assert that it extends the last element with a range func TestPushSingleSkippedSequence(t *testing.T) { skippedSlice := NewSkippedSequenceSlice(DefaultClipCapacityHeadroom) @@ -37,17 +38,28 @@ func TestPushSingleSkippedSequence(t *testing.T) { assert.GreaterOrEqual(t, skippedSlice.list[j].getTimestamp(), prevTime) prevTime = skippedSlice.list[j].getTimestamp() } - // add a new single entry that is contiguous with end of the slice which should replace last - // single entry with a range + // add a new single entry that is contiguous with end of the slice which is results in thee range being less than + // MinRangeThreshold so wil be added as single entry skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(19)) // grab last entry in list index := len(skippedSlice.list) - 1 entry := skippedSlice.list[index] + // assert last entry is single entry and start + end sequence on range is as expected + assert.True(t, entry.singleEntry()) + assert.Equal(t, uint64(19), entry.getStartSeq()) + assert.Equal(t, uint64(19), entry.getLastSeq()) + + // add a new single entry that is contiguous with end of the slice which is results in the range being greater than + // MinRangeThreshold so wil be added as range entry + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(20, 50)) + // grab last entry in list + index = len(skippedSlice.list) - 1 + entry = skippedSlice.list[index] // assert last entry is range entry and start + end sequence on range is as expected assert.False(t, entry.singleEntry()) - assert.Equal(t, uint64(18), entry.getStartSeq()) - assert.Equal(t, uint64(19), entry.getLastSeq()) + assert.Equal(t, uint64(19), entry.getStartSeq()) + assert.Equal(t, uint64(50), entry.getLastSeq()) } // TestPushSkippedSequenceRange: @@ -72,8 +84,9 @@ func TestPushSkippedSequenceRange(t *testing.T) { prevTime = skippedSlice.list[j].getTimestamp() } - // add a new range entry that is contiguous with end of the slice which should alter range last element in list - skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(96, 110)) + // add a new range entry that is contiguous with end of the slice which is greater then range threshold so should + // extend the existing last range + skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(96, 142)) // grab last entry in list index := len(skippedSlice.list) - 1 entry := skippedSlice.list[index] @@ -81,13 +94,13 @@ func TestPushSkippedSequenceRange(t *testing.T) { // assert last entry is range entry and start + end sequence on range is as expected assert.False(t, entry.singleEntry()) assert.Equal(t, uint64(90), entry.getStartSeq()) - assert.Equal(t, uint64(110), entry.getLastSeq()) + assert.Equal(t, uint64(142), entry.getLastSeq()) // add new single entry that is not contiguous with last element on slice skippedSlice.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(500)) // add new range that is contiguous with the single entry on the last element of the slice + garbage timestamp - // for later assertion + // for later assertion. This new range will not be greater than min threshold so wil be added a separate range newTimeStamp := time.Now().Unix() + 10000 skippedSlice.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntryAt(501, 510, newTimeStamp)) @@ -96,7 +109,7 @@ func TestPushSkippedSequenceRange(t *testing.T) { // assert that last element in list is a range and holds sequences we expect + timestamp // is what the new pushed range above holds assert.False(t, entry.singleEntry()) - assert.Equal(t, uint64(500), entry.getStartSeq()) + assert.Equal(t, uint64(501), entry.getStartSeq()) assert.Equal(t, uint64(510), entry.getLastSeq()) assert.Equal(t, newTimeStamp, entry.getTimestamp()) } diff --git a/rest/changes_test.go b/rest/changes_test.go index 20525f7b71..56ae42c7c8 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -315,7 +315,7 @@ func TestJumpInSequencesAtAllocatorSkippedSequenceFill(t *testing.T) { // wait for value to move from pending to cache and skipped list to fill require.EventuallyWithT(t, func(c *assert.CollectT) { rt.GetDatabase().UpdateCalculatedStats(ctx) - assert.Equal(c, int64(1), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(18), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value()) }, time.Second*10, time.Millisecond*100) docVrs := rt.UpdateDoc("doc", vrs, `{"prob": "lol"}`) From 065a5bb41a8d24b8e3a14b2c1735602c779c3402 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Tue, 29 Oct 2024 14:41:33 +0000 Subject: [PATCH 2/3] fix spelling mistake in logging --- db/skipped_sequence.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/skipped_sequence.go b/db/skipped_sequence.go index 965809f549..5157e63b7a 100644 --- a/db/skipped_sequence.go +++ b/db/skipped_sequence.go @@ -400,7 +400,7 @@ func (s *SkippedSequenceSlice) processUnusedSequenceRangeAtSkipped(ctx context.C } } else if err != nil { // if we get here then the skipped list must be empty - base.InfofCtx(ctx, base.KeyCache, "error attempting to remove unused sequence range form skipped: %v", err) + base.InfofCtx(ctx, base.KeyCache, "error attempting to remove unused sequence range from skipped: %v", err) } } From 025e2a49f594beeb745d38ad605f0f56bab5a7d0 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Wed, 30 Oct 2024 10:23:38 +0000 Subject: [PATCH 3/3] move code to be more mutex efficicent --- db/change_cache.go | 5 ++--- db/skipped_sequence.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/db/change_cache.go b/db/change_cache.go index ab4f428514..daa92b15c1 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -993,9 +993,8 @@ func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq u if numSeqs > MinSequencesForRange { c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq)) } else { - for i := startSeq; i <= endSeq; i++ { - c.skippedSeqs.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(i)) - } + // push sequences as separate entries + c.skippedSeqs.PushSkippedSequenceEntries(startSeq, endSeq, int64(numSeqs)) } } diff --git a/db/skipped_sequence.go b/db/skipped_sequence.go index 5157e63b7a..a26eb83153 100644 --- a/db/skipped_sequence.go +++ b/db/skipped_sequence.go @@ -328,6 +328,35 @@ func (s *SkippedSequenceSlice) _insert(index int, entry *SkippedSequenceListEntr s.list[index] = entry } +// PushSkippedSequenceEntries will push seq range to end of slice as separate single sequence entries unless the range +// being pushed in contiguous with end of the slice and that contiguous range is grater than the min range threshold +func (s *SkippedSequenceSlice) PushSkippedSequenceEntries(startSeq, endSeq uint64, numSeqsIncoming int64) { + s.lock.Lock() + defer s.lock.Unlock() + + // update num current skipped sequences count + the cumulative count of skipped sequences + s.NumCurrentSkippedSequences += numSeqsIncoming + s.NumCumulativeSkippedSequences += numSeqsIncoming + + // check if we should be extending the last entry on the slice + if len(s.list) != 0 { + index := len(s.list) - 1 + lastEntryLastSeq := s.list[index].getLastSeq() + totalSeqs := s.list[index].getNumSequencesInEntry() + numSeqsIncoming + if (lastEntryLastSeq+1) == startSeq && totalSeqs > MinSequencesForRange { + // adding contiguous sequence, and we are above the min threshold to hold a range in list + // set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp + s.list[index].extendRange(endSeq, time.Now().Unix()) + return + } + } + + // push all items separate to end of slice + for i := startSeq; i <= endSeq; i++ { + s.list = append(s.list, NewSingleSkippedSequenceEntry(i)) + } +} + // PushSkippedSequenceEntry will append a new skipped sequence entry to the end of the slice, if adding a contiguous // sequence function will expand the last entry of the slice to reflect this func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceListEntry) {