Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3736: delta sync for cv #7141

Open
wants to merge 4 commits into
base: release/anemone
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions db/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ func TestBackupOldRevisionWithAttachments(t *testing.T) {
var rev2Body Body
rev2Data := `{"test": true, "updated": true, "_attachments": {"hello.txt": {"stub": true, "revpos": 1}}}`
require.NoError(t, base.JSONUnmarshal([]byte(rev2Data), &rev2Body))
_, _, err = collection.PutExistingRevWithBody(ctx, docID, rev2Body, []string{"2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV)
_, rev2ID, err := collection.PutExistingRevWithBody(ctx, docID, rev2Body, []string{"2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV)
require.NoError(t, err)
rev2ID := "2-abc"

// now in any case - we'll have rev 1 backed up
rev1OldBody, err = collection.getOldRevisionJSON(ctx, docID, rev1ID)
Expand Down
21 changes: 12 additions & 9 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sender, docID, revID string, deltaSrcRevID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int) error {
bsc.replicationStats.SendRevDeltaRequestedCount.Add(1)

revDelta, redactedRev, err := handleChangesResponseCollection.GetDelta(ctx, docID, deltaSrcRevID, revID)
revDelta, redactedRev, err := handleChangesResponseCollection.GetDelta(ctx, docID, deltaSrcRevID, revID, bsc.useHLV())
if err == ErrForbidden { // nolint: gocritic // can't convert if/else if to switch since base.IsFleeceDeltaError is not switchable
return err
} else if base.IsFleeceDeltaError(err) {
Expand All @@ -895,7 +895,12 @@ func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sen
}

if redactedRev != nil {
history := toHistory(redactedRev.History, knownRevs, maxHistory)
var history []string
if !bsc.useHLV() {
history = toHistory(redactedRev.History, knownRevs, maxHistory)
} else {
history = append(history, redactedRev.hlvHistory)
}
properties := blipRevMessageProperties(history, redactedRev.Deleted, seq, "")
return bsc.sendRevisionWithProperties(ctx, sender, docID, revID, collectionIdx, redactedRev.BodyBytes, nil, properties, seq, nil)
}
Expand Down Expand Up @@ -1091,9 +1096,11 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// revisions to malicious actors (in the scenario where that user has write but not read access).
var deltaSrcRev DocumentRevision
if bh.useHLV() {
cv := Version{}
cv.SourceID, cv.Value = incomingHLV.GetCurrentVersion()
deltaSrcRev, err = bh.collection.GetCV(bh.loggingCtx, docID, &cv)
deltaSrcVersion, parseErr := ParseVersion(deltaSrcRevID)
if parseErr != nil {
return base.HTTPErrorf(http.StatusUnprocessableEntity, "Unable to parse version for delta source for doc %s, error: %v", base.UD(docID), err)
}
deltaSrcRev, err = bh.collection.GetCV(bh.loggingCtx, docID, &deltaSrcVersion)
} else {
deltaSrcRev, err = bh.collection.GetRev(bh.loggingCtx, docID, deltaSrcRevID, false, nil)
}
Expand Down Expand Up @@ -1620,7 +1627,3 @@ func allowedAttachmentKey(docID, digest string, activeCBMobileSubprotocol CBMobi
func (bh *blipHandler) logEndpointEntry(profile, endpoint string) {
base.InfofCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s %s", bh.serialNumber, profile, endpoint)
}

func (bh *blipHandler) useHLV() bool {
return bh.activeCBMobileSubprotocol >= CBMobileReplicationV4
}
29 changes: 23 additions & 6 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b

var err error

// fall back to sending full revision v4 protocol, delta sync not yet implemented for v4
if deltaSrcRevID != "" && bsc.activeCBMobileSubprotocol <= CBMobileReplicationV3 {
// fallback to sending full revision if protocol is < v4
if deltaSrcRevID != "" && bsc.useHLV() {
err = bsc.sendRevAsDelta(ctx, sender, docID, rev, deltaSrcRevID, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx)
} else {
err = bsc.sendRevision(ctx, sender, docID, rev, seq, knownRevs, maxHistory, handleChangesResponseDbCollection, collectionIdx)
Expand Down Expand Up @@ -568,12 +568,23 @@ func (bsc *BlipSyncContext) setUseDeltas(clientCanUseDeltas bool) {

func (bsc *BlipSyncContext) sendDelta(ctx context.Context, sender *blip.Sender, docID string, collectionIdx *int, deltaSrcRevID string, revDelta *RevisionDelta, seq SequenceID, resendFullRevisionFunc func() error) error {

properties := blipRevMessageProperties(revDelta.RevisionHistory, revDelta.ToDeleted, seq, "")
var history []string
if bsc.useHLV() {
history = append(history, revDelta.HlvHistory)
} else {
history = revDelta.RevisionHistory
}
properties := blipRevMessageProperties(history, revDelta.ToDeleted, seq, "")
properties[RevMessageDeltaSrc] = deltaSrcRevID

base.DebugfCtx(ctx, base.KeySync, "Sending rev %q %s as delta. DeltaSrc:%s", base.UD(docID), revDelta.ToRevID, deltaSrcRevID)
return bsc.sendRevisionWithProperties(ctx, sender, docID, revDelta.ToRevID, collectionIdx, revDelta.DeltaBytes, revDelta.AttachmentStorageMeta,
properties, seq, resendFullRevisionFunc)
if bsc.useHLV() {
return bsc.sendRevisionWithProperties(ctx, sender, docID, revDelta.ToCV, collectionIdx, revDelta.DeltaBytes, revDelta.AttachmentStorageMeta,
properties, seq, resendFullRevisionFunc)
} else {
return bsc.sendRevisionWithProperties(ctx, sender, docID, revDelta.ToRevID, collectionIdx, revDelta.DeltaBytes, revDelta.AttachmentStorageMeta,
properties, seq, resendFullRevisionFunc)
}
}

// sendBLIPMessage is a simple wrapper around all sent BLIP messages
Expand Down Expand Up @@ -717,7 +728,9 @@ func (bsc *BlipSyncContext) sendRevision(ctx context.Context, sender *blip.Sende
if bsc.activeCBMobileSubprotocol <= CBMobileReplicationV3 {
history = toHistory(docRev.History, knownRevs, maxHistory)
} else {
history = append(history, docRev.hlvHistory)
if docRev.hlvHistory != "" {
history = append(history, docRev.hlvHistory)
}
}

properties := blipRevMessageProperties(history, docRev.Deleted, seq, replacedRevID)
Expand Down Expand Up @@ -792,3 +805,7 @@ func (bsc *BlipSyncContext) reportStats(updateImmediately bool) {
bsc.stats.lastReportTime.Store(currentTime)

}

func (bsc *BlipSyncContext) useHLV() bool {
return bsc.activeCBMobileSubprotocol >= CBMobileReplicationV4
}
49 changes: 36 additions & 13 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,25 @@ func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, c

// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
// returns nil.
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRev, toRev string, useCVRevCache bool) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {

if docID == "" || fromRevID == "" || toRevID == "" {
if docID == "" || fromRev == "" || toRev == "" {
return nil, nil, nil
}

fromRevision, err := db.revisionCache.GetWithRev(ctx, docID, fromRevID, RevCacheIncludeDelta)
var fromRevision DocumentRevision
var fromRevVrs Version
if useCVRevCache {
fromRevVrs, err = ParseVersion(fromRev)
if err != nil {
return nil, nil, err
}
fromRevision, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
} else {
return nil, nil, fmt.Errorf("delta sync for rev tree not yet implemented")
}

// If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just
// return 404 missing to indicate that the body of the revision is no longer available.
Expand All @@ -437,9 +449,9 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR

// If delta is found, check whether it is a delta for the toRevID we want
if fromRevision.Delta != nil {
if fromRevision.Delta.ToRevID == toRevID {
if fromRevision.Delta.ToCV == toRev {

isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, fromRevision.CV, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
if !isAuthorized {
return nil, &redactedBody, nil
}
Expand All @@ -454,15 +466,18 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
// Delta is unavailable, but the body is available.
if fromRevision.BodyBytes != nil {

// db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheMisses, 1)
db.dbStats().DeltaSync().DeltaCacheMiss.Add(1)
toRevision, err := db.revisionCache.GetWithRev(ctx, docID, toRevID, RevCacheIncludeDelta)
cv, err := ParseVersion(toRev)
if err != nil {
return nil, nil, err
}
toRevision, err := db.revisionCache.GetWithCV(ctx, docID, &cv, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}

deleted := toRevision.Deleted
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, toRevision.Channels, deleted, toRevision.History)
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, toRevision.CV, toRevision.Channels, deleted, toRevision.History)
if !isAuthorized {
return nil, &redactedBody, nil
}
Expand All @@ -473,8 +488,12 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR

// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta
if deleted {
revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRevID, toRevision, deleted, nil)
db.revisionCache.UpdateDelta(ctx, docID, fromRevID, revCacheDelta)
revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRev, toRevision, deleted, nil)
if useCVRevCache {
db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta)
} else {
db.revisionCache.UpdateDelta(ctx, docID, fromRev, revCacheDelta)
}
return &revCacheDelta, nil, nil
}

Expand Down Expand Up @@ -511,10 +530,14 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
if err != nil {
return nil, nil, err
}
revCacheDelta := newRevCacheDelta(deltaBytes, fromRevID, toRevision, deleted, toRevAttStorageMeta)
revCacheDelta := newRevCacheDelta(deltaBytes, fromRev, toRevision, deleted, toRevAttStorageMeta)

// Write the newly calculated delta back into the cache before returning
db.revisionCache.UpdateDelta(ctx, docID, fromRevID, revCacheDelta)
if useCVRevCache {
db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta)
} else {
db.revisionCache.UpdateDelta(ctx, docID, fromRev, revCacheDelta)
}
return &revCacheDelta, nil, nil
}

Expand Down
Loading
Loading