Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mps3

Package Overview
Dependencies
Maintainers
1
Versions
41
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mps3 - npm Package Compare versions

Comparing version 0.0.52 to 0.0.53

docs/causal_consistency_checking.md

2

package.json
{
"name": "mps3",
"description": "Provide clientside multiplayer and optimistic updates over any s3-compatible storage API",
"version": "0.0.52",
"version": "0.0.53",
"license": "MIT",

@@ -6,0 +6,0 @@ "author": {

@@ -5,2 +5,3 @@ import { S3 } from "@aws-sdk/client-s3";

import { uuid } from "types";
import { CentralisedCausalSystem } from "./consistency";

@@ -62,3 +63,7 @@ describe("mps3", () => {

});
const getClient = () => new MPS3(variant.config);
const getClient = (args?: { label?: string }) =>
new MPS3({
label: args?.label,
...variant.config,
});

@@ -346,4 +351,61 @@ test("Subscription deduplicate undefined", async (done) => {

});
test("causal consistency all-to-all, single key", async (done) => {
const key = "causal";
await getClient().delete(key);
const system = new CentralisedCausalSystem();
const max_steps = 100;
type Message = {
sender: number;
send_time: number;
};
// Setup all clients to forward messages to the observer
const clients = [...Array(3)].map((_, client_id) => {
const client = getClient({
label: system.client_labels[client_id],
});
client.subscribe(key, (val) => {
if (val) {
const message: Message = <Message>val;
system.observe({
...message,
receiver: client_id,
});
}
if (system.global_time < max_steps) {
// Check facts are causally consistent so far
const check_result = system.causallyConsistent();
if (!check_result) {
console.error(system.grounding);
console.error(system.knowledge_base);
}
expect(check_result).toBe(true);
// Write a new message
system.observe({
receiver: client_id,
sender: client_id,
send_time: system.client_clocks[client_id] - 1,
});
expect(check_result).toBe(true);
client.put(key, {
sender: client_id,
send_time: system.client_clocks[client_id] - 1,
});
} else if (system.global_time === max_steps) {
clients.forEach((c) =>
c.manifests.forEach((m) => m.subscribers.clear())
);
done();
}
});
return client;
});
});
})
);
});

@@ -42,18 +42,2 @@ import { OMap } from "OMap";

const isManifest = (obj: any): obj is ManifestState => {
if (!obj) return false;
return (
obj.version !== undefined &&
typeof obj.version === "number" &&
obj.files !== undefined &&
typeof obj.files === "object" &&
Object.values(obj.files).every(
(file: any) =>
typeof file === "object" &&
file.version !== undefined &&
typeof file.version === "string"
)
);
};
export class Subscriber {

@@ -71,5 +55,10 @@ ref: ResolvedRef;

notify(version: VersionId | undefined, value: JSONValue | DeleteValue) {
notify(
label: string,
version: VersionId | undefined,
value: JSONValue | DeleteValue
) {
if (version === this.lastVersion) return;
else {
console.log(`${label} NOTIFY ${url(this.ref)} ${version}`);
this.lastVersion = version;

@@ -87,2 +76,3 @@ this.handler(value);

cache?: HttpCacheEntry<ManifestState>;
pollInProgress: boolean = false;

@@ -101,2 +91,3 @@ // Pending writes iterate in insertion order

observeVersionId(versionId: VersionId) {
/*
console.log(

@@ -106,5 +97,5 @@ `observeVersionId ${versionId} in ${[

]} pending ${this.pendingWrites.size}`
);
);*/
if (this.writtenOperations.has(versionId)) {
console.log(`clearing pending write for observeVersionId ${versionId}`);
//console.log(`clearing pending write for observeVersionId ${versionId}`);
const operation = this.writtenOperations.get(versionId)!;

@@ -121,6 +112,5 @@ this.pendingWrites.delete(operation);

async getLatest(): Promise<ManifestState | undefined> {
console.log("getLatest");
try {
console.log("getLatest: s1");
const response = await this.service._getObject2<ManifestState>({
const response = await this.service._getObject<ManifestState>({
operation: "POLL_TIME",
ref: this.ref,

@@ -140,3 +130,2 @@ ifNoneMatch: this.cache?.etag,

console.log("getLatest: s2");
const objects = await this.service.s3Client.send(

@@ -152,7 +141,7 @@ new ListObjectsV2Command({

console.log(`replay state ${JSON.stringify(objects.Contents)}`);
let state = undefined;
for (let index = 0; index < objects.Contents.length; index++) {
const key = objects.Contents[index].Key!;
const step = await this.service._getObject2<ManifestState>({
const step = await this.service._getObject<ManifestState>({
operation: "SWEEP",
ref: {

@@ -164,6 +153,4 @@ bucket: this.ref.bucket,

if (!state) {
console.log(`base ${key} ${JSON.stringify(step.data)}`);
state = step.data;
} else {
console.log(`patch ${key} ${JSON.stringify(step.data?.update)}`);
state = apply(state, step.data?.update);

@@ -183,3 +170,2 @@ }

};
console.log("resolved data", JSON.stringify(state));
return state;

@@ -196,2 +182,5 @@ } catch (err: any) {

async poll() {
if (this.pollInProgress) return;
this.pollInProgress = true;
if (this.subscriberCount === 0 && this.poller) {

@@ -208,5 +197,6 @@ clearInterval(this.poller);

const state = await this.getLatest();
if (state === undefined) return; // no changes
console.log(`POLL ${JSON.stringify(state)}`);
if (state === undefined) {
this.pollInProgress = false;
return; // no changes
}
this.subscribers.forEach(async (subscriber) => {

@@ -216,13 +206,17 @@ const fileState: FileState | null | undefined =

if (fileState) {
console.log(`NOTIFY ${url(subscriber.ref)} ${fileState.version}`);
const fileContent = await this.service._getObject({
const fileContent = await this.service._getObject<any>({
operation: "GET_CONTENT",
ref: subscriber.ref,
version: fileState.version,
});
subscriber.notify(fileState.version, fileContent);
subscriber.notify(
this.service.config.label,
fileState.version,
fileContent.data
);
} else if (fileState === null) {
console.log(`NOTIFY ${url(subscriber.ref)} DELETE`);
subscriber.notify(undefined, undefined);
subscriber.notify(this.service.config.label, undefined, undefined);
}
});
this.pollInProgress = false;
}

@@ -235,3 +229,3 @@

this.pendingWrites.set(write, values);
console.log(`updateContent pending ${this.pendingWrites.size}`);
// console.log(`updateContent pending ${this.pendingWrites.size}`);

@@ -263,2 +257,3 @@ try {

await this.service._putObject({
operation: "PUT_MANIFEST",
ref: {

@@ -272,2 +267,3 @@ key: this.ref.key + "@" + version,

const response = await this.service._putObject({
operation: "PUT_TIME",
ref: {

@@ -291,3 +287,2 @@ key: this.ref.key,

async getVersion(ref: ResolvedRef): Promise<string | undefined> {
console.log("getVersion");
const state = await this.get();

@@ -294,0 +289,0 @@ return state.files[url(ref)]?.version;

@@ -19,2 +19,3 @@ import {

export interface MPS3Config {
label?: string;
defaultBucket: string;

@@ -29,2 +30,3 @@ defaultManifest?: Ref;

interface ResolvedMPS3Config extends MPS3Config {
label: string;
defaultManifest: ResolvedRef;

@@ -44,2 +46,3 @@ useVersioning: boolean;

...config,
label: config.label || uuid().substring(0, 3),
useChecksum: config.useChecksum === false ? false : true,

@@ -91,3 +94,3 @@ useVersioning: config.useVersioning || false,

if (inCache) {
console.log(`get (cached) ${url(contentRef)}`);
console.log(`${this.config.label} get (cached) ${url(contentRef)}`);
return cachedValue;

@@ -98,12 +101,25 @@ }

if (version === undefined) return undefined;
return this._getObject({
ref: contentRef,
version: version,
});
return (
await this._getObject<any>({
operation: "GET",
ref: contentRef,
version: version,
})
).data;
}
async _getObject(args: {
getCache = new OMap<
GetObjectCommandInput,
Promise<GetObjectCommandOutput & { data: any }>
>(
(input) =>
`${input.Bucket}${input.Key}${input.VersionId}${input.IfNoneMatch}`
);
async _getObject<T>(args: {
operation: string;
ref: ResolvedRef;
version?: string;
}): Promise<JSONValue | DeleteValue> {
ifNoneMatch?: string;
}): Promise<GetObjectCommandOutput & { data: T | undefined }> {
let command: GetObjectCommandInput;

@@ -114,2 +130,3 @@ if (this.config.useVersioning) {

Key: args.ref.key,
IfNoneMatch: args.ifNoneMatch,
...(args.version && { VersionId: args.version }),

@@ -121,61 +138,41 @@ };

Key: `${args.ref.key}${args.version ? `@${args.version}` : ""}`,
IfNoneMatch: args.ifNoneMatch,
};
}
try {
const response = await this.s3Client.send(new GetObjectCommand(command));
if (!response.Body) return undefined;
else {
const payload = await response.Body.transformToString("utf-8");
console.log(
`GET ${command.Bucket}/${command.Key} => ${response.VersionId}\n${payload}`
);
return JSON.parse(payload);
}
} catch (err: any) {
if (err.name === "NoSuchKey") {
console.log(`GET ${command.Bucket}/${command.Key} => NOT FOUND`);
return undefined;
} else throw err;
if (this.getCache.has(command)) {
return await this.getCache.get(command)!;
}
}
async _getObject2<T>(args: {
ref: ResolvedRef;
version?: string;
ifNoneMatch?: string;
}): Promise<GetObjectCommandOutput & { data: T | undefined }> {
const command: GetObjectCommandInput = {
Bucket: args.ref.bucket,
Key: args.ref.key,
IfNoneMatch: args.ifNoneMatch,
...(args.version && { VersionId: args.version }),
};
try {
const response = {
...(await this.s3Client.send(new GetObjectCommand(command))),
data: <T | undefined>undefined,
};
if (response.Body) {
response.data = <T>(
JSON.parse(await response.Body.transformToString("utf-8"))
);
console.log(
`GET ${args.ref.bucket}/${args.ref.key}@${args.version} => ${
response.VersionId
}\n${JSON.stringify(response.data)}`
);
}
return response;
} catch (err: any) {
if (err?.name === "304") {
return {
$metadata: {
httpStatusCode: 304,
},
data: undefined,
const work = this.s3Client
.send(new GetObjectCommand(command))
.then(async (apiResponse) => {
const response = {
...apiResponse,
data: <T | undefined>undefined,
};
} else {
throw err;
}
}
if (response.Body) {
response.data = <T>(
JSON.parse(await response.Body.transformToString("utf-8"))
);
console.log(
`${this.config.label} ${args.operation} ${args.ref.bucket}/${args.ref.key}@${args.version} => ${response.VersionId}`
);
this.getCache.set(command, work); // it be nice to cache this earlier but I hit some race conditions
}
return response;
})
.catch((err: any) => {
if (err?.name === "304") {
return {
$metadata: {
httpStatusCode: 304,
},
data: undefined,
};
} else {
throw err;
}
});
return work;
}

@@ -249,2 +246,3 @@

this._putObject({
operation: "PUT_CONTENT",
ref: contentRef,

@@ -290,2 +288,3 @@ value,

async _putObject(args: {
operation: string;
ref: ResolvedRef;

@@ -295,3 +294,2 @@ value: any;

}): Promise<PutObjectCommandOutput> {
console.log(`putObject ${url(args.ref)}`);
const content: string = JSON.stringify(args.value, null, 2);

@@ -323,3 +321,3 @@ let command: PutObjectCommandInput;

console.log(
`PUT ${command.Bucket}/${command.Key} => ${response.VersionId}\n${content}`
`${this.config.label} ${args.operation} ${command.Bucket}/${command.Key} => ${response.VersionId}`
);

@@ -339,3 +337,3 @@

console.log(
`DELETE ${args.ref.bucket}/${args.ref.key} => ${response.VersionId}`
`${this.config.label} DELETE ${args.ref.bucket}/${args.ref.key} => ${response.VersionId}`
);

@@ -367,3 +365,3 @@ return response;

}).then((initial) => {
console.log(`NOTIFY (initial) ${url(keyRef)}`);
console.log(`${this.config.label} NOTIFY (initial) ${url(keyRef)}`);
// if the data is cached we don't want the subscriber called in the same tick as

@@ -370,0 +368,0 @@ // the unsubscribe retun value will not be initialized

export class OMap<K, V> {
key: (key: K) => string;
vals: Map<string, V>;
keys: Map<string, K>;
private _vals: Map<string, V>;
private _keys: Map<string, K>;
constructor(key: (key: K) => string, values?: Iterable<readonly [K, V]>) {
this.key = key;
this.vals = new Map();
this.keys = new Map();
this._vals = new Map();
this._keys = new Map();
if (values) {

@@ -16,21 +16,31 @@ for (const [k, v] of values) {

}
get size(): number {
return this._vals.size;
}
set(key: K, value: V): this {
const k = this.key(key);
this.vals.set(k, value);
this.keys.set(k, key);
this._vals.set(k, value);
this._keys.set(k, key);
return this;
}
get(key: K): V | undefined {
return this.vals.get(this.key(key));
return this._vals.get(this.key(key));
}
delete(key: K): boolean {
const k = this.key(key);
this._keys.delete(k);
return this._vals.delete(k);
}
has(key: K): boolean {
return this.vals.has(this.key(key));
return this._vals.has(this.key(key));
}
values(): IterableIterator<V> {
return this.vals.values();
return this._vals.values();
}
keys(): IterableIterator<K> {
return this._keys.values();
}
forEach(callback: (value: V, key: K) => void) {
return this.vals.forEach((v, k, map) => callback(v, this.keys.get(k)!));
return this._vals.forEach((v, k, map) => callback(v, this._keys.get(k)!));
}
}

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc