Comparing version 0.0.52 to 0.0.53
{ | ||
"name": "mps3", | ||
"description": "Provide clientside multiplayer and optimistic updates over any s3-compatible storage API", | ||
"version": "0.0.52", | ||
"version": "0.0.53", | ||
"license": "MIT", | ||
@@ -6,0 +6,0 @@ "author": { |
@@ -5,2 +5,3 @@ import { S3 } from "@aws-sdk/client-s3"; | ||
import { uuid } from "types"; | ||
import { CentralisedCausalSystem } from "./consistency"; | ||
@@ -62,3 +63,7 @@ describe("mps3", () => { | ||
}); | ||
const getClient = () => new MPS3(variant.config); | ||
const getClient = (args?: { label?: string }) => | ||
new MPS3({ | ||
label: args?.label, | ||
...variant.config, | ||
}); | ||
@@ -346,4 +351,61 @@ test("Subscription deduplicate undefined", async (done) => { | ||
}); | ||
test("causal consistency all-to-all, single key", async (done) => { | ||
const key = "causal"; | ||
await getClient().delete(key); | ||
const system = new CentralisedCausalSystem(); | ||
const max_steps = 100; | ||
type Message = { | ||
sender: number; | ||
send_time: number; | ||
}; | ||
// Setup all clients to forward messages to the observer | ||
const clients = [...Array(3)].map((_, client_id) => { | ||
const client = getClient({ | ||
label: system.client_labels[client_id], | ||
}); | ||
client.subscribe(key, (val) => { | ||
if (val) { | ||
const message: Message = <Message>val; | ||
system.observe({ | ||
...message, | ||
receiver: client_id, | ||
}); | ||
} | ||
if (system.global_time < max_steps) { | ||
// Check facts are causally consistent so far | ||
const check_result = system.causallyConsistent(); | ||
if (!check_result) { | ||
console.error(system.grounding); | ||
console.error(system.knowledge_base); | ||
} | ||
expect(check_result).toBe(true); | ||
// Write a new message | ||
system.observe({ | ||
receiver: client_id, | ||
sender: client_id, | ||
send_time: system.client_clocks[client_id] - 1, | ||
}); | ||
expect(check_result).toBe(true); | ||
client.put(key, { | ||
sender: client_id, | ||
send_time: system.client_clocks[client_id] - 1, | ||
}); | ||
} else if (system.global_time === max_steps) { | ||
clients.forEach((c) => | ||
c.manifests.forEach((m) => m.subscribers.clear()) | ||
); | ||
done(); | ||
} | ||
}); | ||
return client; | ||
}); | ||
}); | ||
}) | ||
); | ||
}); |
@@ -42,18 +42,2 @@ import { OMap } from "OMap"; | ||
const isManifest = (obj: any): obj is ManifestState => { | ||
if (!obj) return false; | ||
return ( | ||
obj.version !== undefined && | ||
typeof obj.version === "number" && | ||
obj.files !== undefined && | ||
typeof obj.files === "object" && | ||
Object.values(obj.files).every( | ||
(file: any) => | ||
typeof file === "object" && | ||
file.version !== undefined && | ||
typeof file.version === "string" | ||
) | ||
); | ||
}; | ||
export class Subscriber { | ||
@@ -71,5 +55,10 @@ ref: ResolvedRef; | ||
notify(version: VersionId | undefined, value: JSONValue | DeleteValue) { | ||
notify( | ||
label: string, | ||
version: VersionId | undefined, | ||
value: JSONValue | DeleteValue | ||
) { | ||
if (version === this.lastVersion) return; | ||
else { | ||
console.log(`${label} NOTIFY ${url(this.ref)} ${version}`); | ||
this.lastVersion = version; | ||
@@ -87,2 +76,3 @@ this.handler(value); | ||
cache?: HttpCacheEntry<ManifestState>; | ||
pollInProgress: boolean = false; | ||
@@ -101,2 +91,3 @@ // Pending writes iterate in insertion order | ||
observeVersionId(versionId: VersionId) { | ||
/* | ||
console.log( | ||
@@ -106,5 +97,5 @@ `observeVersionId ${versionId} in ${[ | ||
]} pending ${this.pendingWrites.size}` | ||
); | ||
);*/ | ||
if (this.writtenOperations.has(versionId)) { | ||
console.log(`clearing pending write for observeVersionId ${versionId}`); | ||
//console.log(`clearing pending write for observeVersionId ${versionId}`); | ||
const operation = this.writtenOperations.get(versionId)!; | ||
@@ -121,6 +112,5 @@ this.pendingWrites.delete(operation); | ||
async getLatest(): Promise<ManifestState | undefined> { | ||
console.log("getLatest"); | ||
try { | ||
console.log("getLatest: s1"); | ||
const response = await this.service._getObject2<ManifestState>({ | ||
const response = await this.service._getObject<ManifestState>({ | ||
operation: "POLL_TIME", | ||
ref: this.ref, | ||
@@ -140,3 +130,2 @@ ifNoneMatch: this.cache?.etag, | ||
console.log("getLatest: s2"); | ||
const objects = await this.service.s3Client.send( | ||
@@ -152,7 +141,7 @@ new ListObjectsV2Command({ | ||
console.log(`replay state ${JSON.stringify(objects.Contents)}`); | ||
let state = undefined; | ||
for (let index = 0; index < objects.Contents.length; index++) { | ||
const key = objects.Contents[index].Key!; | ||
const step = await this.service._getObject2<ManifestState>({ | ||
const step = await this.service._getObject<ManifestState>({ | ||
operation: "SWEEP", | ||
ref: { | ||
@@ -164,6 +153,4 @@ bucket: this.ref.bucket, | ||
if (!state) { | ||
console.log(`base ${key} ${JSON.stringify(step.data)}`); | ||
state = step.data; | ||
} else { | ||
console.log(`patch ${key} ${JSON.stringify(step.data?.update)}`); | ||
state = apply(state, step.data?.update); | ||
@@ -183,3 +170,2 @@ } | ||
}; | ||
console.log("resolved data", JSON.stringify(state)); | ||
return state; | ||
@@ -196,2 +182,5 @@ } catch (err: any) { | ||
async poll() { | ||
if (this.pollInProgress) return; | ||
this.pollInProgress = true; | ||
if (this.subscriberCount === 0 && this.poller) { | ||
@@ -208,5 +197,6 @@ clearInterval(this.poller); | ||
const state = await this.getLatest(); | ||
if (state === undefined) return; // no changes | ||
console.log(`POLL ${JSON.stringify(state)}`); | ||
if (state === undefined) { | ||
this.pollInProgress = false; | ||
return; // no changes | ||
} | ||
this.subscribers.forEach(async (subscriber) => { | ||
@@ -216,13 +206,17 @@ const fileState: FileState | null | undefined = | ||
if (fileState) { | ||
console.log(`NOTIFY ${url(subscriber.ref)} ${fileState.version}`); | ||
const fileContent = await this.service._getObject({ | ||
const fileContent = await this.service._getObject<any>({ | ||
operation: "GET_CONTENT", | ||
ref: subscriber.ref, | ||
version: fileState.version, | ||
}); | ||
subscriber.notify(fileState.version, fileContent); | ||
subscriber.notify( | ||
this.service.config.label, | ||
fileState.version, | ||
fileContent.data | ||
); | ||
} else if (fileState === null) { | ||
console.log(`NOTIFY ${url(subscriber.ref)} DELETE`); | ||
subscriber.notify(undefined, undefined); | ||
subscriber.notify(this.service.config.label, undefined, undefined); | ||
} | ||
}); | ||
this.pollInProgress = false; | ||
} | ||
@@ -235,3 +229,3 @@ | ||
this.pendingWrites.set(write, values); | ||
console.log(`updateContent pending ${this.pendingWrites.size}`); | ||
// console.log(`updateContent pending ${this.pendingWrites.size}`); | ||
@@ -263,2 +257,3 @@ try { | ||
await this.service._putObject({ | ||
operation: "PUT_MANIFEST", | ||
ref: { | ||
@@ -272,2 +267,3 @@ key: this.ref.key + "@" + version, | ||
const response = await this.service._putObject({ | ||
operation: "PUT_TIME", | ||
ref: { | ||
@@ -291,3 +287,2 @@ key: this.ref.key, | ||
async getVersion(ref: ResolvedRef): Promise<string | undefined> { | ||
console.log("getVersion"); | ||
const state = await this.get(); | ||
@@ -294,0 +289,0 @@ return state.files[url(ref)]?.version; |
128
src/mps3.ts
@@ -19,2 +19,3 @@ import { | ||
export interface MPS3Config { | ||
label?: string; | ||
defaultBucket: string; | ||
@@ -29,2 +30,3 @@ defaultManifest?: Ref; | ||
interface ResolvedMPS3Config extends MPS3Config { | ||
label: string; | ||
defaultManifest: ResolvedRef; | ||
@@ -44,2 +46,3 @@ useVersioning: boolean; | ||
...config, | ||
label: config.label || uuid().substring(0, 3), | ||
useChecksum: config.useChecksum === false ? false : true, | ||
@@ -91,3 +94,3 @@ useVersioning: config.useVersioning || false, | ||
if (inCache) { | ||
console.log(`get (cached) ${url(contentRef)}`); | ||
console.log(`${this.config.label} get (cached) ${url(contentRef)}`); | ||
return cachedValue; | ||
@@ -98,12 +101,25 @@ } | ||
if (version === undefined) return undefined; | ||
return this._getObject({ | ||
ref: contentRef, | ||
version: version, | ||
}); | ||
return ( | ||
await this._getObject<any>({ | ||
operation: "GET", | ||
ref: contentRef, | ||
version: version, | ||
}) | ||
).data; | ||
} | ||
async _getObject(args: { | ||
getCache = new OMap< | ||
GetObjectCommandInput, | ||
Promise<GetObjectCommandOutput & { data: any }> | ||
>( | ||
(input) => | ||
`${input.Bucket}${input.Key}${input.VersionId}${input.IfNoneMatch}` | ||
); | ||
async _getObject<T>(args: { | ||
operation: string; | ||
ref: ResolvedRef; | ||
version?: string; | ||
}): Promise<JSONValue | DeleteValue> { | ||
ifNoneMatch?: string; | ||
}): Promise<GetObjectCommandOutput & { data: T | undefined }> { | ||
let command: GetObjectCommandInput; | ||
@@ -114,2 +130,3 @@ if (this.config.useVersioning) { | ||
Key: args.ref.key, | ||
IfNoneMatch: args.ifNoneMatch, | ||
...(args.version && { VersionId: args.version }), | ||
@@ -121,61 +138,41 @@ }; | ||
Key: `${args.ref.key}${args.version ? `@${args.version}` : ""}`, | ||
IfNoneMatch: args.ifNoneMatch, | ||
}; | ||
} | ||
try { | ||
const response = await this.s3Client.send(new GetObjectCommand(command)); | ||
if (!response.Body) return undefined; | ||
else { | ||
const payload = await response.Body.transformToString("utf-8"); | ||
console.log( | ||
`GET ${command.Bucket}/${command.Key} => ${response.VersionId}\n${payload}` | ||
); | ||
return JSON.parse(payload); | ||
} | ||
} catch (err: any) { | ||
if (err.name === "NoSuchKey") { | ||
console.log(`GET ${command.Bucket}/${command.Key} => NOT FOUND`); | ||
return undefined; | ||
} else throw err; | ||
if (this.getCache.has(command)) { | ||
return await this.getCache.get(command)!; | ||
} | ||
} | ||
async _getObject2<T>(args: { | ||
ref: ResolvedRef; | ||
version?: string; | ||
ifNoneMatch?: string; | ||
}): Promise<GetObjectCommandOutput & { data: T | undefined }> { | ||
const command: GetObjectCommandInput = { | ||
Bucket: args.ref.bucket, | ||
Key: args.ref.key, | ||
IfNoneMatch: args.ifNoneMatch, | ||
...(args.version && { VersionId: args.version }), | ||
}; | ||
try { | ||
const response = { | ||
...(await this.s3Client.send(new GetObjectCommand(command))), | ||
data: <T | undefined>undefined, | ||
}; | ||
if (response.Body) { | ||
response.data = <T>( | ||
JSON.parse(await response.Body.transformToString("utf-8")) | ||
); | ||
console.log( | ||
`GET ${args.ref.bucket}/${args.ref.key}@${args.version} => ${ | ||
response.VersionId | ||
}\n${JSON.stringify(response.data)}` | ||
); | ||
} | ||
return response; | ||
} catch (err: any) { | ||
if (err?.name === "304") { | ||
return { | ||
$metadata: { | ||
httpStatusCode: 304, | ||
}, | ||
data: undefined, | ||
const work = this.s3Client | ||
.send(new GetObjectCommand(command)) | ||
.then(async (apiResponse) => { | ||
const response = { | ||
...apiResponse, | ||
data: <T | undefined>undefined, | ||
}; | ||
} else { | ||
throw err; | ||
} | ||
} | ||
if (response.Body) { | ||
response.data = <T>( | ||
JSON.parse(await response.Body.transformToString("utf-8")) | ||
); | ||
console.log( | ||
`${this.config.label} ${args.operation} ${args.ref.bucket}/${args.ref.key}@${args.version} => ${response.VersionId}` | ||
); | ||
this.getCache.set(command, work); // it be nice to cache this earlier but I hit some race conditions | ||
} | ||
return response; | ||
}) | ||
.catch((err: any) => { | ||
if (err?.name === "304") { | ||
return { | ||
$metadata: { | ||
httpStatusCode: 304, | ||
}, | ||
data: undefined, | ||
}; | ||
} else { | ||
throw err; | ||
} | ||
}); | ||
return work; | ||
} | ||
@@ -249,2 +246,3 @@ | ||
this._putObject({ | ||
operation: "PUT_CONTENT", | ||
ref: contentRef, | ||
@@ -290,2 +288,3 @@ value, | ||
async _putObject(args: { | ||
operation: string; | ||
ref: ResolvedRef; | ||
@@ -295,3 +294,2 @@ value: any; | ||
}): Promise<PutObjectCommandOutput> { | ||
console.log(`putObject ${url(args.ref)}`); | ||
const content: string = JSON.stringify(args.value, null, 2); | ||
@@ -323,3 +321,3 @@ let command: PutObjectCommandInput; | ||
console.log( | ||
`PUT ${command.Bucket}/${command.Key} => ${response.VersionId}\n${content}` | ||
`${this.config.label} ${args.operation} ${command.Bucket}/${command.Key} => ${response.VersionId}` | ||
); | ||
@@ -339,3 +337,3 @@ | ||
console.log( | ||
`DELETE ${args.ref.bucket}/${args.ref.key} => ${response.VersionId}` | ||
`${this.config.label} DELETE ${args.ref.bucket}/${args.ref.key} => ${response.VersionId}` | ||
); | ||
@@ -367,3 +365,3 @@ return response; | ||
}).then((initial) => { | ||
console.log(`NOTIFY (initial) ${url(keyRef)}`); | ||
console.log(`${this.config.label} NOTIFY (initial) ${url(keyRef)}`); | ||
// if the data is cached we don't want the subscriber called in the same tick as | ||
@@ -370,0 +368,0 @@ // the unsubscribe retun value will not be initialized |
export class OMap<K, V> { | ||
key: (key: K) => string; | ||
vals: Map<string, V>; | ||
keys: Map<string, K>; | ||
private _vals: Map<string, V>; | ||
private _keys: Map<string, K>; | ||
constructor(key: (key: K) => string, values?: Iterable<readonly [K, V]>) { | ||
this.key = key; | ||
this.vals = new Map(); | ||
this.keys = new Map(); | ||
this._vals = new Map(); | ||
this._keys = new Map(); | ||
if (values) { | ||
@@ -16,21 +16,31 @@ for (const [k, v] of values) { | ||
} | ||
get size(): number { | ||
return this._vals.size; | ||
} | ||
set(key: K, value: V): this { | ||
const k = this.key(key); | ||
this.vals.set(k, value); | ||
this.keys.set(k, key); | ||
this._vals.set(k, value); | ||
this._keys.set(k, key); | ||
return this; | ||
} | ||
get(key: K): V | undefined { | ||
return this.vals.get(this.key(key)); | ||
return this._vals.get(this.key(key)); | ||
} | ||
delete(key: K): boolean { | ||
const k = this.key(key); | ||
this._keys.delete(k); | ||
return this._vals.delete(k); | ||
} | ||
has(key: K): boolean { | ||
return this.vals.has(this.key(key)); | ||
return this._vals.has(this.key(key)); | ||
} | ||
values(): IterableIterator<V> { | ||
return this.vals.values(); | ||
return this._vals.values(); | ||
} | ||
keys(): IterableIterator<K> { | ||
return this._keys.values(); | ||
} | ||
forEach(callback: (value: V, key: K) => void) { | ||
return this.vals.forEach((v, k, map) => callback(v, this.keys.get(k)!)); | ||
return this._vals.forEach((v, k, map) => callback(v, this._keys.get(k)!)); | ||
} | ||
} |
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
418088
17
2564