@fluidframework/protocol-base
Advanced tools
Comparing version 0.1025.0-25508 to 0.1025.0-25645
@@ -8,3 +8,3 @@ /*! | ||
export declare const pkgName = "@fluidframework/protocol-base"; | ||
export declare const pkgVersion = "0.1025.0-25508"; | ||
export declare const pkgVersion = "0.1025.0-25645"; | ||
//# sourceMappingURL=packageVersion.d.ts.map |
@@ -11,3 +11,3 @@ "use strict"; | ||
exports.pkgName = "@fluidframework/protocol-base"; | ||
exports.pkgVersion = "0.1025.0-25508"; | ||
exports.pkgVersion = "0.1025.0-25645"; | ||
//# sourceMappingURL=packageVersion.js.map |
@@ -26,4 +26,7 @@ /*! | ||
processMessage(message: ISequencedDocumentMessage, local: boolean): IProcessMessageResult; | ||
/** | ||
* Gets the scribe protocol state | ||
*/ | ||
getProtocolState(): IScribeProtocolState; | ||
} | ||
//# sourceMappingURL=protocol.d.ts.map |
@@ -35,3 +35,3 @@ "use strict"; | ||
this.term = term !== null && term !== void 0 ? term : 1; | ||
this.quorum = new quorum_1.Quorum(minimumSequenceNumber, members, proposals, values, sendProposal, sendReject); | ||
this.quorum = new quorum_1.Quorum(members, proposals, values, sendProposal, sendReject); | ||
} | ||
@@ -42,2 +42,10 @@ close() { | ||
processMessage(message, local) { | ||
// verify it's moving sequentially | ||
if (message.sequenceNumber !== this.sequenceNumber + 1) { | ||
throw new Error(`Protocol state is not moving sequentially. ` + | ||
`Current is ${this.sequenceNumber}. Next is ${message.sequenceNumber}`); | ||
} | ||
// Update tracked sequence numbers | ||
this.sequenceNumber = message.sequenceNumber; | ||
this.minimumSequenceNumber = message.minimumSequenceNumber; | ||
let immediateNoOp = false; | ||
@@ -71,5 +79,2 @@ switch (message.type) { | ||
} | ||
// Update tracked sequence numbers | ||
this.minimumSequenceNumber = message.minimumSequenceNumber; | ||
this.sequenceNumber = message.sequenceNumber; | ||
// Notify the quorum of the MSN from the message. We rely on it to handle duplicate values but may | ||
@@ -80,11 +85,9 @@ // want to move that logic to this class. | ||
} | ||
/** | ||
* Gets the scribe protocol state | ||
*/ | ||
getProtocolState() { | ||
const quorumSnapshot = this.quorum.snapshot(); | ||
return { | ||
members: quorumSnapshot.members, | ||
minimumSequenceNumber: this.minimumSequenceNumber, | ||
proposals: quorumSnapshot.proposals, | ||
sequenceNumber: this.sequenceNumber, | ||
values: quorumSnapshot.values, | ||
}; | ||
// return a new object every time | ||
// this ensures future state changes will not affect outside callers | ||
return Object.assign({ sequenceNumber: this.sequenceNumber, minimumSequenceNumber: this.minimumSequenceNumber }, this.quorum.snapshot()); | ||
} | ||
@@ -91,0 +94,0 @@ } |
@@ -17,3 +17,2 @@ /*! | ||
export declare class Quorum extends TypedEventEmitter<IQuorumEvents> implements IQuorum { | ||
private minimumSequenceNumber; | ||
private readonly sendProposal; | ||
@@ -28,6 +27,32 @@ private readonly sendReject; | ||
private readonly localProposals; | ||
constructor(minimumSequenceNumber: number | undefined, members: [string, ISequencedClient][], proposals: [number, ISequencedProposal, string[]][], values: [string, ICommittedProposal][], sendProposal: (key: string, value: any) => number, sendReject: (sequenceNumber: number) => void); | ||
/** | ||
* Cached snapshot state | ||
* The quorum consists of 3 properties: members, values, and proposals. | ||
* Depending on the op being processed, some or none of those properties may change. | ||
* Each property will be cached and the cache for each property will be cleared when an op causes a change. | ||
*/ | ||
private readonly snapshotCache; | ||
constructor(members: [string, ISequencedClient][], proposals: [number, ISequencedProposal, string[]][], values: [string, ICommittedProposal][], sendProposal: (key: string, value: any) => number, sendReject: (sequenceNumber: number) => void); | ||
close(): void; | ||
/** | ||
* Snapshots the entire quorum | ||
* @returns a quorum snapshot | ||
*/ | ||
snapshot(): IQuorumSnapshot; | ||
/** | ||
* Snapshots quorum members | ||
* @returns a deep cloned array of members | ||
*/ | ||
snapshotMembers(): IQuorumSnapshot["members"]; | ||
/** | ||
* Snapshots quorum proposals | ||
* @returns a deep cloned array of proposals | ||
*/ | ||
snapshotProposals(): IQuorumSnapshot["proposals"]; | ||
/** | ||
* Snapshots quorum values | ||
* @returns a deep cloned array of values | ||
*/ | ||
snapshotValues(): IQuorumSnapshot["values"]; | ||
/** | ||
* Returns whether the quorum has achieved a consensus for the given key. | ||
@@ -34,0 +59,0 @@ */ |
@@ -50,5 +50,4 @@ "use strict"; | ||
class Quorum extends common_utils_1.TypedEventEmitter { | ||
constructor(minimumSequenceNumber, members, proposals, values, sendProposal, sendReject) { | ||
constructor(members, proposals, values, sendProposal, sendReject) { | ||
super(); | ||
this.minimumSequenceNumber = minimumSequenceNumber; | ||
this.sendProposal = sendProposal; | ||
@@ -59,2 +58,9 @@ this.sendReject = sendReject; | ||
this.localProposals = new Map(); | ||
/** | ||
* Cached snapshot state | ||
* The quorum consists of 3 properties: members, values, and proposals. | ||
* Depending on the op being processed, some or none of those properties may change. | ||
* Each property will be cached and the cache for each property will be cleared when an op causes a change. | ||
*/ | ||
this.snapshotCache = {}; | ||
this.members = new Map(members); | ||
@@ -75,4 +81,27 @@ this.proposals = new Map(proposals.map(([, proposal, rejections]) => { | ||
} | ||
/** | ||
* Snapshots the entire quorum | ||
* @returns a quorum snapshot | ||
*/ | ||
snapshot() { | ||
const serializedProposals = Array.from(this.proposals).map(([sequenceNumber, proposal]) => [ | ||
var _a, _b, _c; | ||
var _d, _e, _f; | ||
(_a = (_d = this.snapshotCache).members) !== null && _a !== void 0 ? _a : (_d.members = this.snapshotMembers()); | ||
(_b = (_e = this.snapshotCache).proposals) !== null && _b !== void 0 ? _b : (_e.proposals = this.snapshotProposals()); | ||
(_c = (_f = this.snapshotCache).values) !== null && _c !== void 0 ? _c : (_f.values = this.snapshotValues()); | ||
return Object.assign({}, this.snapshotCache); | ||
} | ||
/** | ||
* Snapshots quorum members | ||
* @returns a deep cloned array of members | ||
*/ | ||
snapshotMembers() { | ||
return cloneDeep_1.default(Array.from(this.members)); | ||
} | ||
/** | ||
* Snapshots quorum proposals | ||
* @returns a deep cloned array of proposals | ||
*/ | ||
snapshotProposals() { | ||
return Array.from(this.proposals).map(([sequenceNumber, proposal]) => [ | ||
sequenceNumber, | ||
@@ -82,10 +111,11 @@ { sequenceNumber, key: proposal.key, value: proposal.value }, | ||
]); | ||
const snapshot = { | ||
members: [...this.members], | ||
proposals: serializedProposals, | ||
values: [...this.values], | ||
}; | ||
return cloneDeep_1.default(snapshot); | ||
} | ||
/** | ||
* Snapshots quorum values | ||
* @returns a deep cloned array of values | ||
*/ | ||
snapshotValues() { | ||
return cloneDeep_1.default(Array.from(this.values)); | ||
} | ||
/** | ||
* Returns whether the quorum has achieved a consensus for the given key. | ||
@@ -120,2 +150,4 @@ */ | ||
this.emit("addMember", clientId, details); | ||
// clear the members cache | ||
this.snapshotCache.members = undefined; | ||
} | ||
@@ -129,2 +161,4 @@ /** | ||
this.emit("removeMember", clientId); | ||
// clear the members cache | ||
this.snapshotCache.members = undefined; | ||
} | ||
@@ -175,2 +209,4 @@ /** | ||
} | ||
// clear the proposal cache | ||
this.snapshotCache.proposals = undefined; | ||
} | ||
@@ -189,5 +225,6 @@ /** | ||
} | ||
// clear the proposal cache | ||
this.snapshotCache.proposals = undefined; | ||
// We will emit approval and rejection messages once the MSN advances past the sequence number of the | ||
// proposal. This will allow us to convey all clients who rejected the proposal. | ||
return; | ||
} | ||
@@ -201,17 +238,4 @@ /** | ||
updateMinimumSequenceNumber(message) { | ||
const value = message.minimumSequenceNumber; | ||
if (this.minimumSequenceNumber !== undefined) { | ||
if (value < this.minimumSequenceNumber) { | ||
this.emit("error", { | ||
currentValue: this.minimumSequenceNumber, | ||
eventName: "QuorumMinSeqNumberError", | ||
newValue: value, | ||
}); | ||
} | ||
if (value <= this.minimumSequenceNumber) { | ||
return false; | ||
} | ||
} | ||
this.minimumSequenceNumber = value; | ||
let immediateNoOp = false; | ||
const msn = message.minimumSequenceNumber; | ||
// Accept proposals and reject proposals whose sequenceNumber is <= the minimumSequenceNumber | ||
@@ -222,3 +246,3 @@ // Return a sorted list of approved proposals. We sort so that we apply them in their sequence number order | ||
for (const [sequenceNumber, proposal] of this.proposals) { | ||
if (sequenceNumber <= this.minimumSequenceNumber) { | ||
if (sequenceNumber <= msn) { | ||
completed.push(proposal); | ||
@@ -250,2 +274,4 @@ } | ||
this.pendingCommit.set(committedProposal.key, committedProposal); | ||
// clear the values cache | ||
this.snapshotCache.values = undefined; | ||
// Send no-op on approval to expedite commit | ||
@@ -261,2 +287,4 @@ // accept means that all clients have seen the proposal and nobody has rejected it | ||
this.proposals.delete(proposal.sequenceNumber); | ||
// clear the proposals cache | ||
this.snapshotCache.proposals = undefined; | ||
} | ||
@@ -266,3 +294,3 @@ // Move values to the committed stage and notify | ||
Array.from(this.pendingCommit.values()) | ||
.filter((pendingCommit) => pendingCommit.approvalSequenceNumber <= value) | ||
.filter((pendingCommit) => pendingCommit.approvalSequenceNumber <= msn) | ||
.sort((a, b) => a.sequenceNumber - b.sequenceNumber) | ||
@@ -269,0 +297,0 @@ .forEach((pendingCommit) => { |
@@ -8,3 +8,3 @@ /*! | ||
export declare const pkgName = "@fluidframework/protocol-base"; | ||
export declare const pkgVersion = "0.1025.0-25508"; | ||
export declare const pkgVersion = "0.1025.0-25645"; | ||
//# sourceMappingURL=packageVersion.d.ts.map |
@@ -8,3 +8,3 @@ /*! | ||
export const pkgName = "@fluidframework/protocol-base"; | ||
export const pkgVersion = "0.1025.0-25508"; | ||
export const pkgVersion = "0.1025.0-25645"; | ||
//# sourceMappingURL=packageVersion.js.map |
@@ -26,4 +26,7 @@ /*! | ||
processMessage(message: ISequencedDocumentMessage, local: boolean): IProcessMessageResult; | ||
/** | ||
* Gets the scribe protocol state | ||
*/ | ||
getProtocolState(): IScribeProtocolState; | ||
} | ||
//# sourceMappingURL=protocol.d.ts.map |
@@ -31,3 +31,3 @@ /*! | ||
this.term = term !== null && term !== void 0 ? term : 1; | ||
this.quorum = new Quorum(minimumSequenceNumber, members, proposals, values, sendProposal, sendReject); | ||
this.quorum = new Quorum(members, proposals, values, sendProposal, sendReject); | ||
} | ||
@@ -38,2 +38,10 @@ close() { | ||
processMessage(message, local) { | ||
// verify it's moving sequentially | ||
if (message.sequenceNumber !== this.sequenceNumber + 1) { | ||
throw new Error(`Protocol state is not moving sequentially. ` + | ||
`Current is ${this.sequenceNumber}. Next is ${message.sequenceNumber}`); | ||
} | ||
// Update tracked sequence numbers | ||
this.sequenceNumber = message.sequenceNumber; | ||
this.minimumSequenceNumber = message.minimumSequenceNumber; | ||
let immediateNoOp = false; | ||
@@ -67,5 +75,2 @@ switch (message.type) { | ||
} | ||
// Update tracked sequence numbers | ||
this.minimumSequenceNumber = message.minimumSequenceNumber; | ||
this.sequenceNumber = message.sequenceNumber; | ||
// Notify the quorum of the MSN from the message. We rely on it to handle duplicate values but may | ||
@@ -76,13 +81,11 @@ // want to move that logic to this class. | ||
} | ||
/** | ||
* Gets the scribe protocol state | ||
*/ | ||
getProtocolState() { | ||
const quorumSnapshot = this.quorum.snapshot(); | ||
return { | ||
members: quorumSnapshot.members, | ||
minimumSequenceNumber: this.minimumSequenceNumber, | ||
proposals: quorumSnapshot.proposals, | ||
sequenceNumber: this.sequenceNumber, | ||
values: quorumSnapshot.values, | ||
}; | ||
// return a new object every time | ||
// this ensures future state changes will not affect outside callers | ||
return Object.assign({ sequenceNumber: this.sequenceNumber, minimumSequenceNumber: this.minimumSequenceNumber }, this.quorum.snapshot()); | ||
} | ||
} | ||
//# sourceMappingURL=protocol.js.map |
@@ -17,3 +17,2 @@ /*! | ||
export declare class Quorum extends TypedEventEmitter<IQuorumEvents> implements IQuorum { | ||
private minimumSequenceNumber; | ||
private readonly sendProposal; | ||
@@ -28,6 +27,32 @@ private readonly sendReject; | ||
private readonly localProposals; | ||
constructor(minimumSequenceNumber: number | undefined, members: [string, ISequencedClient][], proposals: [number, ISequencedProposal, string[]][], values: [string, ICommittedProposal][], sendProposal: (key: string, value: any) => number, sendReject: (sequenceNumber: number) => void); | ||
/** | ||
* Cached snapshot state | ||
* The quorum consists of 3 properties: members, values, and proposals. | ||
* Depending on the op being processed, some or none of those properties may change. | ||
* Each property will be cached and the cache for each property will be cleared when an op causes a change. | ||
*/ | ||
private readonly snapshotCache; | ||
constructor(members: [string, ISequencedClient][], proposals: [number, ISequencedProposal, string[]][], values: [string, ICommittedProposal][], sendProposal: (key: string, value: any) => number, sendReject: (sequenceNumber: number) => void); | ||
close(): void; | ||
/** | ||
* Snapshots the entire quorum | ||
* @returns a quorum snapshot | ||
*/ | ||
snapshot(): IQuorumSnapshot; | ||
/** | ||
* Snapshots quorum members | ||
* @returns a deep cloned array of members | ||
*/ | ||
snapshotMembers(): IQuorumSnapshot["members"]; | ||
/** | ||
* Snapshots quorum proposals | ||
* @returns a deep cloned array of proposals | ||
*/ | ||
snapshotProposals(): IQuorumSnapshot["proposals"]; | ||
/** | ||
* Snapshots quorum values | ||
* @returns a deep cloned array of values | ||
*/ | ||
snapshotValues(): IQuorumSnapshot["values"]; | ||
/** | ||
* Returns whether the quorum has achieved a consensus for the given key. | ||
@@ -34,0 +59,0 @@ */ |
@@ -44,5 +44,4 @@ /*! | ||
export class Quorum extends TypedEventEmitter { | ||
constructor(minimumSequenceNumber, members, proposals, values, sendProposal, sendReject) { | ||
constructor(members, proposals, values, sendProposal, sendReject) { | ||
super(); | ||
this.minimumSequenceNumber = minimumSequenceNumber; | ||
this.sendProposal = sendProposal; | ||
@@ -53,2 +52,9 @@ this.sendReject = sendReject; | ||
this.localProposals = new Map(); | ||
/** | ||
* Cached snapshot state | ||
* The quorum consists of 3 properties: members, values, and proposals. | ||
* Depending on the op being processed, some or none of those properties may change. | ||
* Each property will be cached and the cache for each property will be cleared when an op causes a change. | ||
*/ | ||
this.snapshotCache = {}; | ||
this.members = new Map(members); | ||
@@ -69,4 +75,27 @@ this.proposals = new Map(proposals.map(([, proposal, rejections]) => { | ||
} | ||
/** | ||
* Snapshots the entire quorum | ||
* @returns a quorum snapshot | ||
*/ | ||
snapshot() { | ||
const serializedProposals = Array.from(this.proposals).map(([sequenceNumber, proposal]) => [ | ||
var _a, _b, _c; | ||
var _d, _e, _f; | ||
(_a = (_d = this.snapshotCache).members) !== null && _a !== void 0 ? _a : (_d.members = this.snapshotMembers()); | ||
(_b = (_e = this.snapshotCache).proposals) !== null && _b !== void 0 ? _b : (_e.proposals = this.snapshotProposals()); | ||
(_c = (_f = this.snapshotCache).values) !== null && _c !== void 0 ? _c : (_f.values = this.snapshotValues()); | ||
return Object.assign({}, this.snapshotCache); | ||
} | ||
/** | ||
* Snapshots quorum members | ||
* @returns a deep cloned array of members | ||
*/ | ||
snapshotMembers() { | ||
return cloneDeep(Array.from(this.members)); | ||
} | ||
/** | ||
* Snapshots quorum proposals | ||
* @returns a deep cloned array of proposals | ||
*/ | ||
snapshotProposals() { | ||
return Array.from(this.proposals).map(([sequenceNumber, proposal]) => [ | ||
sequenceNumber, | ||
@@ -76,10 +105,11 @@ { sequenceNumber, key: proposal.key, value: proposal.value }, | ||
]); | ||
const snapshot = { | ||
members: [...this.members], | ||
proposals: serializedProposals, | ||
values: [...this.values], | ||
}; | ||
return cloneDeep(snapshot); | ||
} | ||
/** | ||
* Snapshots quorum values | ||
* @returns a deep cloned array of values | ||
*/ | ||
snapshotValues() { | ||
return cloneDeep(Array.from(this.values)); | ||
} | ||
/** | ||
* Returns whether the quorum has achieved a consensus for the given key. | ||
@@ -114,2 +144,4 @@ */ | ||
this.emit("addMember", clientId, details); | ||
// clear the members cache | ||
this.snapshotCache.members = undefined; | ||
} | ||
@@ -123,2 +155,4 @@ /** | ||
this.emit("removeMember", clientId); | ||
// clear the members cache | ||
this.snapshotCache.members = undefined; | ||
} | ||
@@ -169,2 +203,4 @@ /** | ||
} | ||
// clear the proposal cache | ||
this.snapshotCache.proposals = undefined; | ||
} | ||
@@ -183,5 +219,6 @@ /** | ||
} | ||
// clear the proposal cache | ||
this.snapshotCache.proposals = undefined; | ||
// We will emit approval and rejection messages once the MSN advances past the sequence number of the | ||
// proposal. This will allow us to convey all clients who rejected the proposal. | ||
return; | ||
} | ||
@@ -195,17 +232,4 @@ /** | ||
updateMinimumSequenceNumber(message) { | ||
const value = message.minimumSequenceNumber; | ||
if (this.minimumSequenceNumber !== undefined) { | ||
if (value < this.minimumSequenceNumber) { | ||
this.emit("error", { | ||
currentValue: this.minimumSequenceNumber, | ||
eventName: "QuorumMinSeqNumberError", | ||
newValue: value, | ||
}); | ||
} | ||
if (value <= this.minimumSequenceNumber) { | ||
return false; | ||
} | ||
} | ||
this.minimumSequenceNumber = value; | ||
let immediateNoOp = false; | ||
const msn = message.minimumSequenceNumber; | ||
// Accept proposals and reject proposals whose sequenceNumber is <= the minimumSequenceNumber | ||
@@ -216,3 +240,3 @@ // Return a sorted list of approved proposals. We sort so that we apply them in their sequence number order | ||
for (const [sequenceNumber, proposal] of this.proposals) { | ||
if (sequenceNumber <= this.minimumSequenceNumber) { | ||
if (sequenceNumber <= msn) { | ||
completed.push(proposal); | ||
@@ -244,2 +268,4 @@ } | ||
this.pendingCommit.set(committedProposal.key, committedProposal); | ||
// clear the values cache | ||
this.snapshotCache.values = undefined; | ||
// Send no-op on approval to expedite commit | ||
@@ -255,2 +281,4 @@ // accept means that all clients have seen the proposal and nobody has rejected it | ||
this.proposals.delete(proposal.sequenceNumber); | ||
// clear the proposals cache | ||
this.snapshotCache.proposals = undefined; | ||
} | ||
@@ -260,3 +288,3 @@ // Move values to the committed stage and notify | ||
Array.from(this.pendingCommit.values()) | ||
.filter((pendingCommit) => pendingCommit.approvalSequenceNumber <= value) | ||
.filter((pendingCommit) => pendingCommit.approvalSequenceNumber <= msn) | ||
.sort((a, b) => a.sequenceNumber - b.sequenceNumber) | ||
@@ -263,0 +291,0 @@ .forEach((pendingCommit) => { |
{ | ||
"name": "@fluidframework/protocol-base", | ||
"version": "0.1025.0-25508", | ||
"version": "0.1025.0-25645", | ||
"description": "Fluid protocol base", | ||
@@ -58,3 +58,3 @@ "homepage": "https://fluidframework.com", | ||
"@fluidframework/common-utils": "^0.30.0-0", | ||
"@fluidframework/gitresources": "0.1025.0-25508", | ||
"@fluidframework/gitresources": "0.1025.0-25645", | ||
"@fluidframework/protocol-definitions": "^0.1024.0", | ||
@@ -61,0 +61,0 @@ "assert": "^2.0.0", |
@@ -9,2 +9,2 @@ /*! | ||
export const pkgName = "@fluidframework/protocol-base"; | ||
export const pkgVersion = "0.1025.0-25508"; | ||
export const pkgVersion = "0.1025.0-25645"; |
@@ -50,2 +50,3 @@ /*! | ||
public readonly term: number; | ||
constructor( | ||
@@ -62,3 +63,2 @@ public minimumSequenceNumber: number, | ||
this.quorum = new Quorum( | ||
minimumSequenceNumber, | ||
members, | ||
@@ -76,2 +76,13 @@ proposals, | ||
public processMessage(message: ISequencedDocumentMessage, local: boolean): IProcessMessageResult { | ||
// verify it's moving sequentially | ||
if (message.sequenceNumber !== this.sequenceNumber + 1) { | ||
throw new Error( | ||
`Protocol state is not moving sequentially. ` + | ||
`Current is ${this.sequenceNumber}. Next is ${message.sequenceNumber}`); | ||
} | ||
// Update tracked sequence numbers | ||
this.sequenceNumber = message.sequenceNumber; | ||
this.minimumSequenceNumber = message.minimumSequenceNumber; | ||
let immediateNoOp = false; | ||
@@ -88,3 +99,2 @@ | ||
this.quorum.addMember(join.clientId, member); | ||
break; | ||
@@ -119,6 +129,2 @@ | ||
// Update tracked sequence numbers | ||
this.minimumSequenceNumber = message.minimumSequenceNumber; | ||
this.sequenceNumber = message.sequenceNumber; | ||
// Notify the quorum of the MSN from the message. We rely on it to handle duplicate values but may | ||
@@ -131,13 +137,14 @@ // want to move that logic to this class. | ||
/** | ||
* Gets the scribe protocol state | ||
*/ | ||
public getProtocolState(): IScribeProtocolState { | ||
const quorumSnapshot = this.quorum.snapshot(); | ||
// return a new object every time | ||
// this ensures future state changes will not affect outside callers | ||
return { | ||
members: quorumSnapshot.members, | ||
sequenceNumber: this.sequenceNumber, | ||
minimumSequenceNumber: this.minimumSequenceNumber, | ||
proposals: quorumSnapshot.proposals, | ||
sequenceNumber: this.sequenceNumber, | ||
values: quorumSnapshot.values, | ||
...this.quorum.snapshot(), | ||
}; | ||
} | ||
} |
@@ -83,4 +83,11 @@ /*! | ||
/** | ||
* Cached snapshot state | ||
* The quorum consists of 3 properties: members, values, and proposals. | ||
* Depending on the op being processed, some or none of those properties may change. | ||
* Each property will be cached and the cache for each property will be cleared when an op causes a change. | ||
*/ | ||
private readonly snapshotCache: Partial<IQuorumSnapshot> = {}; | ||
constructor( | ||
private minimumSequenceNumber: number | undefined, | ||
members: [string, ISequencedClient][], | ||
@@ -115,4 +122,30 @@ proposals: [number, ISequencedProposal, string[]][], | ||
/** | ||
* Snapshots the entire quorum | ||
* @returns a quorum snapshot | ||
*/ | ||
public snapshot(): IQuorumSnapshot { | ||
const serializedProposals = Array.from(this.proposals).map( | ||
this.snapshotCache.members ??= this.snapshotMembers(); | ||
this.snapshotCache.proposals ??= this.snapshotProposals(); | ||
this.snapshotCache.values ??= this.snapshotValues(); | ||
return { | ||
...this.snapshotCache as IQuorumSnapshot, | ||
}; | ||
} | ||
/** | ||
* Snapshots quorum members | ||
* @returns a deep cloned array of members | ||
*/ | ||
public snapshotMembers(): IQuorumSnapshot["members"] { | ||
return cloneDeep(Array.from(this.members)); | ||
} | ||
/** | ||
* Snapshots quorum proposals | ||
* @returns a deep cloned array of proposals | ||
*/ | ||
public snapshotProposals(): IQuorumSnapshot["proposals"] { | ||
return Array.from(this.proposals).map( | ||
([sequenceNumber, proposal]) => [ | ||
@@ -122,10 +155,10 @@ sequenceNumber, | ||
Array.from(proposal.rejections)] as [number, ISequencedProposal, string[]]); | ||
} | ||
const snapshot = { | ||
members: [...this.members], | ||
proposals: serializedProposals, | ||
values: [...this.values], | ||
}; | ||
return cloneDeep(snapshot); | ||
/** | ||
* Snapshots quorum values | ||
* @returns a deep cloned array of values | ||
*/ | ||
public snapshotValues(): IQuorumSnapshot["values"] { | ||
return cloneDeep(Array.from(this.values)); | ||
} | ||
@@ -166,2 +199,5 @@ | ||
this.emit("addMember", clientId, details); | ||
// clear the members cache | ||
this.snapshotCache.members = undefined; | ||
} | ||
@@ -176,2 +212,5 @@ | ||
this.emit("removeMember", clientId); | ||
// clear the members cache | ||
this.snapshotCache.members = undefined; | ||
} | ||
@@ -243,2 +282,5 @@ | ||
} | ||
// clear the proposal cache | ||
this.snapshotCache.proposals = undefined; | ||
} | ||
@@ -249,3 +291,3 @@ | ||
*/ | ||
public rejectProposal(clientId: string, sequenceNumber: number): void { | ||
public rejectProposal(clientId: string, sequenceNumber: number) { | ||
// Proposals require unanimous approval so any rejection results in a rejection of the proposal. For error | ||
@@ -261,6 +303,7 @@ // detection we will keep a rejected proposal in the pending list until the MSN advances so that we can | ||
// clear the proposal cache | ||
this.snapshotCache.proposals = undefined; | ||
// We will emit approval and rejection messages once the MSN advances past the sequence number of the | ||
// proposal. This will allow us to convey all clients who rejected the proposal. | ||
return; | ||
} | ||
@@ -275,19 +318,6 @@ | ||
public updateMinimumSequenceNumber(message: ISequencedDocumentMessage): boolean { | ||
const value = message.minimumSequenceNumber; | ||
if (this.minimumSequenceNumber !== undefined) { | ||
if (value < this.minimumSequenceNumber) { | ||
this.emit("error", { | ||
currentValue: this.minimumSequenceNumber, | ||
eventName: "QuorumMinSeqNumberError", | ||
newValue: value, | ||
}); | ||
} | ||
if (value <= this.minimumSequenceNumber) { | ||
return false; | ||
} | ||
} | ||
this.minimumSequenceNumber = value; | ||
let immediateNoOp = false; | ||
const msn = message.minimumSequenceNumber; | ||
// Accept proposals and reject proposals whose sequenceNumber is <= the minimumSequenceNumber | ||
@@ -299,3 +329,3 @@ | ||
for (const [sequenceNumber, proposal] of this.proposals) { | ||
if (sequenceNumber <= this.minimumSequenceNumber) { | ||
if (sequenceNumber <= msn) { | ||
completed.push(proposal); | ||
@@ -333,2 +363,5 @@ } | ||
// clear the values cache | ||
this.snapshotCache.values = undefined; | ||
// Send no-op on approval to expedite commit | ||
@@ -355,2 +388,5 @@ // accept means that all clients have seen the proposal and nobody has rejected it | ||
this.proposals.delete(proposal.sequenceNumber); | ||
// clear the proposals cache | ||
this.snapshotCache.proposals = undefined; | ||
} | ||
@@ -361,3 +397,3 @@ | ||
Array.from(this.pendingCommit.values()) | ||
.filter((pendingCommit) => pendingCommit.approvalSequenceNumber <= value) | ||
.filter((pendingCommit) => pendingCommit.approvalSequenceNumber <= msn) | ||
.sort((a, b) => a.sequenceNumber - b.sequenceNumber) | ||
@@ -364,0 +400,0 @@ .forEach((pendingCommit) => { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
230146
2834
+ Added@fluidframework/gitresources@0.1025.0-25645(transitive)
- Removed@fluidframework/gitresources@0.1025.0-25508(transitive)