Skip to content
This repository has been archived by the owner on Dec 15, 2022. It is now read-only.

Resolve conflicts during remote insertion integration in O(log n) #4

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions lib/document-tree.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,11 @@ class DocumentTree extends SplayTree {
throw new Error('No segment found')
}

insertBetween (prev, next, newSegment) {
this.splayNode(prev)
insertBefore (next, newSegment) {
this.splayNode(next)
this.root = newSegment
newSegment.documentLeft = prev
prev.documentParent = newSegment
newSegment.documentLeft = next.documentLeft
if (next.documentLeft) next.documentLeft.documentParent = newSegment
newSegment.documentRight = next
next.documentParent = newSegment
next.documentLeft = null
Expand All @@ -108,16 +107,31 @@ class DocumentTree extends SplayTree {
updateSubtreeExtent (node, undoCountOverrides) {
node.documentSubtreeExtent = ZERO_POINT
node.documentSubtreeSize = 1
node.minClock = node.insertionClock
node.minSiteId = node.spliceId.site

if (node.documentLeft) {
node.documentSubtreeExtent = traverse(node.documentSubtreeExtent, node.documentLeft.documentSubtreeExtent)
node.documentSubtreeSize += node.documentLeft.documentSubtreeSize
if (node.documentLeft.minClock < node.minClock) {
node.minClock = node.documentLeft.minClock
node.minSiteId = node.documentLeft.minSiteId
} else if (node.documentLeft.minClock === node.minClock) {
node.minSiteId = Math.min(node.minSiteId, node.documentLeft.minSiteId)
}
}
if (this.isSegmentVisible(node, undoCountOverrides)) {
node.documentSubtreeExtent = traverse(node.documentSubtreeExtent, node.extent)
}
if (node.documentRight) {
node.documentSubtreeExtent = traverse(node.documentSubtreeExtent, node.documentRight.documentSubtreeExtent)
node.documentSubtreeSize += node.documentRight.documentSubtreeSize
if (node.documentRight.minClock < node.minClock) {
node.minClock = node.documentRight.minClock
node.minSiteId = node.documentRight.minSiteId
} else if (node.documentRight.minClock === node.minClock) {
node.minSiteId = Math.min(node.minSiteId, node.documentRight.minSiteId)
}
}
}

Expand Down
67 changes: 35 additions & 32 deletions lib/document.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Document {
constructor ({siteId, text, history}) {
assert(siteId !== 0, 'siteId 0 is reserved')
this.siteId = siteId
this.insertionClock = 0
this.nextSequenceNumber = 1
this.splitTreesBySpliceId = new Map()
this.deletionsBySpliceId = new Map()
Expand All @@ -23,10 +24,10 @@ class Document {
this.redoStack = []
this.nextCheckpointId = 1

const firstSegment = {spliceId: {site: 0, seq: 0}, offset: ZERO_POINT, text: '', extent: ZERO_POINT, nextSplit: null, deletions: new Set()}
const firstSegment = {spliceId: {site: 0, seq: 0}, offset: ZERO_POINT, text: '', extent: ZERO_POINT, nextSplit: null, deletions: new Set(), insertionClock: Infinity}
this.splitTreesBySpliceId.set(spliceIdToString(firstSegment.spliceId), new SplitTree(firstSegment))

const lastSegment = {spliceId: {site: 0, seq: 1}, offset: ZERO_POINT, text: '', extent: ZERO_POINT, nextSplit: null, deletions: new Set()}
const lastSegment = {spliceId: {site: 0, seq: 1}, offset: ZERO_POINT, text: '', extent: ZERO_POINT, nextSplit: null, deletions: new Set(), insertionClock: -Infinity}
this.splitTreesBySpliceId.set(spliceIdToString(lastSegment.spliceId), new SplitTree(lastSegment))

this.documentTree = new DocumentTree(
Expand Down Expand Up @@ -477,9 +478,11 @@ class Document {
}

insert (spliceId, position, text) {
const insertionClock = this.insertionClock++
const [left, right] = this.findLocalSegmentBoundary(position)
const newSegment = {
spliceId,
insertionClock,
text,
extent: extentForText(text),
offset: ZERO_POINT,
Expand All @@ -488,11 +491,12 @@ class Document {
nextSplit: null,
deletions: new Set()
}
this.documentTree.insertBetween(left, right, newSegment)
this.documentTree.insertBefore(right, newSegment)
this.splitTreesBySpliceId.set(spliceIdToString(spliceId), new SplitTree(newSegment))

return {
text,
insertionClock,
leftDependencyId: left.spliceId,
offsetInLeftDependency: traverse(left.offset, left.extent),
rightDependencyId: right.spliceId,
Expand Down Expand Up @@ -665,47 +669,46 @@ class Document {
}

integrateInsertion (spliceId, operation) {
const {text, leftDependencyId, offsetInLeftDependency, rightDependencyId, offsetInRightDependency} = operation

const originalRightDependency = this.findSegmentStart(rightDependencyId, offsetInRightDependency)
const originalLeftDependency = this.findSegmentEnd(leftDependencyId, offsetInLeftDependency)

this.documentTree.splayNode(originalLeftDependency)
this.documentTree.splayNode(originalRightDependency)

let currentSegment = this.documentTree.getSuccessor(originalLeftDependency)
let leftDependency = originalLeftDependency
let rightDependency = originalRightDependency
while (currentSegment !== rightDependency) {
const leftDependencyIndex = this.documentTree.getSegmentIndex(leftDependency)
const rightDependencyIndex = this.documentTree.getSegmentIndex(rightDependency)
const currentSegmentLeftDependencyIndex = this.documentTree.getSegmentIndex(currentSegment.leftDependency)
const currentSegmentRightDependencyIndex = this.documentTree.getSegmentIndex(currentSegment.rightDependency)

if (currentSegmentLeftDependencyIndex <= leftDependencyIndex && currentSegmentRightDependencyIndex >= rightDependencyIndex) {
if (spliceId.site < currentSegment.spliceId.site) {
rightDependency = currentSegment
} else {
leftDependency = currentSegment
}

currentSegment = this.documentTree.getSuccessor(leftDependency)
const {text, insertionClock, leftDependencyId, offsetInLeftDependency, rightDependencyId, offsetInRightDependency} = operation
this.insertionClock = Math.max(this.insertionClock, insertionClock) + 1

const rightDependency = this.findSegmentStart(rightDependencyId, offsetInRightDependency)
const leftDependency = this.findSegmentEnd(leftDependencyId, offsetInLeftDependency)

this.documentTree.splayNode(leftDependency)
this.documentTree.splayNode(rightDependency)
if (rightDependency.documentLeft !== leftDependency) {
this.documentTree.rotateNodeRight(leftDependency)
}

let successor = rightDependency
let segment = leftDependency.documentRight
while (segment) {
const left = segment.documentLeft
const right = segment.documentRight
if (left && (left.minClock < insertionClock || (left.minClock === insertionClock && left.minSiteId < spliceId.site))) {
successor = segment
segment = left
} else if (insertionClock > segment.insertionClock || (insertionClock === segment.insertionClock && spliceId.site > segment.spliceId.site)) {
successor = segment
break
} else {
currentSegment = this.documentTree.getSuccessor(currentSegment)
segment = right
}
}

const newSegment = {
spliceId,
insertionClock,
offset: ZERO_POINT,
text,
extent: extentForText(text),
leftDependency: originalLeftDependency,
rightDependency: originalRightDependency,
leftDependency,
rightDependency,
nextSplit: null,
deletions: new Set()
}
this.documentTree.insertBetween(leftDependency, rightDependency, newSegment)
this.documentTree.insertBefore(successor, newSegment)
this.splitTreesBySpliceId.set(spliceIdToString(spliceId), new SplitTree(newSegment))
}

Expand Down
2 changes: 2 additions & 0 deletions lib/serialization.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ function serializeSplice (splice) {
function serializeInsertion (insertion) {
const insertionMessage = new Operation.Splice.Insertion()
insertionMessage.setText(insertion.text)
insertionMessage.setInsertionClock(insertion.insertionClock)
insertionMessage.setLeftDependencyId(serializeSpliceId(insertion.leftDependencyId))
insertionMessage.setOffsetInLeftDependency(serializePoint(insertion.offsetInLeftDependency))
insertionMessage.setRightDependencyId(serializeSpliceId(insertion.rightDependencyId))
Expand Down Expand Up @@ -148,6 +149,7 @@ function deserializeSplice (spliceMessage) {
function deserializeInsertion (insertionMessage) {
return {
text: insertionMessage.getText(),
insertionClock: insertionMessage.getInsertionClock(),
leftDependencyId: deserializeSpliceId(insertionMessage.getLeftDependencyId()),
offsetInLeftDependency: deserializePoint(insertionMessage.getOffsetInLeftDependency()),
rightDependencyId: deserializeSpliceId(insertionMessage.getRightDependencyId()),
Expand Down
29 changes: 28 additions & 1 deletion lib/teletype-crdt_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ proto.Operation.Splice.Insertion.toObject = function(includeInstance, msg) {
leftDependencyId: (f = msg.getLeftDependencyId()) && proto.Operation.SpliceId.toObject(includeInstance, f),
offsetInLeftDependency: (f = msg.getOffsetInLeftDependency()) && proto.Operation.Point.toObject(includeInstance, f),
rightDependencyId: (f = msg.getRightDependencyId()) && proto.Operation.SpliceId.toObject(includeInstance, f),
offsetInRightDependency: (f = msg.getOffsetInRightDependency()) && proto.Operation.Point.toObject(includeInstance, f)
offsetInRightDependency: (f = msg.getOffsetInRightDependency()) && proto.Operation.Point.toObject(includeInstance, f),
insertionClock: jspb.Message.getFieldWithDefault(msg, 7, 0)
};

if (includeInstance) {
Expand Down Expand Up @@ -476,6 +477,10 @@ proto.Operation.Splice.Insertion.deserializeBinaryFromReader = function(msg, rea
reader.readMessage(value,proto.Operation.Point.deserializeBinaryFromReader);
msg.setOffsetInRightDependency(value);
break;
case 7:
var value = /** @type {number} */ (reader.readUint32());
msg.setInsertionClock(value);
break;
default:
reader.skipField();
break;
Expand Down Expand Up @@ -544,6 +549,13 @@ proto.Operation.Splice.Insertion.serializeBinaryToWriter = function(message, wri
proto.Operation.Point.serializeBinaryToWriter
);
}
f = message.getInsertionClock();
if (f !== 0) {
writer.writeUint32(
7,
f
);
}
};


Expand Down Expand Up @@ -682,6 +694,21 @@ proto.Operation.Splice.Insertion.prototype.hasOffsetInRightDependency = function
};


/**
* optional uint32 insertion_clock = 7;
* @return {number}
*/
proto.Operation.Splice.Insertion.prototype.getInsertionClock = function() {
return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 7, 0));
};


/** @param {number} value */
proto.Operation.Splice.Insertion.prototype.setInsertionClock = function(value) {
jspb.Message.setField(this, 7, value);
};



/**
* Generated by JsPbCodeGenerator.
Expand Down
1 change: 1 addition & 0 deletions teletype-crdt.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ message Operation {
Point offset_in_left_dependency = 4;
SpliceId right_dependency_id = 5;
Point offset_in_right_dependency = 6;
uint32 insertion_clock = 7;
}

message Deletion {
Expand Down
10 changes: 5 additions & 5 deletions test/document.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ suite('Document', () => {
integrateOperations(replica1, ops2)
integrateOperations(replica2, ops1)

assert.equal(replica1.testLocalDocument.text, 'ab')
assert.equal(replica2.testLocalDocument.text, 'ab')
assert.equal(replica1.testLocalDocument.text, 'ba')
assert.equal(replica2.testLocalDocument.text, 'ba')
})

test('concurrent inserts at the same position inside a previous insertion', () => {
Expand All @@ -29,8 +29,8 @@ suite('Document', () => {
integrateOperations(replica1, ops2)
integrateOperations(replica2, ops1)

assert.equal(replica1.testLocalDocument.text, 'AB+++***CDEFG')
assert.equal(replica2.testLocalDocument.text, 'AB+++***CDEFG')
assert.equal(replica1.testLocalDocument.text, 'AB***+++CDEFG')
assert.equal(replica2.testLocalDocument.text, 'AB***+++CDEFG')
})

test('concurrent inserts at different positions inside a previous insertion', () => {
Expand Down Expand Up @@ -1032,7 +1032,7 @@ suite('Document', () => {
for (var i = 0; i < 1000; i++) {
const peers = Peer.buildNetwork(peerCount, '')
let seed = initialSeed + i
// seed = 1504270975436
// seed = 1510334047209
// global.enableLog = true
const failureMessage = `Random seed: ${seed}`
try {
Expand Down
1 change: 1 addition & 0 deletions test/serialization.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ suite('serialization/deserialization', () => {
spliceId: {site: 1, seq: 2},
insertion: {
text: 'hello',
insertionClock: 3,
leftDependencyId: {site: 1, seq: 1},
offsetInLeftDependency: {row: 0, column: 5},
rightDependencyId: {site: 1, seq: 1},
Expand Down