Obj
The obj module implements NATS ObjectStore functionality on top of JetStream for
JavaScript clients. JetStream clients can use streams to store and access data.
Obj is a materialized view over a stream: it exposes the data stored in the
stream through an ObjectStore API, which should be familiar to many application
developers.
Installation
Note that this library is distributed in two different registries:
- npm: a node-specific library supporting CJS (require) and ESM (import)
- jsr: an ESM (import) library for node and other compatible runtimes (deno, browser, node)
If your application doesn't use require, you can simply depend on the JSR
version.
NPM
The NPM registry hosts a node-only compatible version of the library,
@nats-io/obj, supporting both CJS and ESM:
npm install @nats-io/obj
JSR
The JSR registry hosts the ESM-only version of the @nats-io/obj library.
deno add @nats-io/obj
npx jsr add @nats-io/obj
yarn dlx jsr add @nats-io/obj
bunx jsr add @nats-io/obj
Referencing the library
Once you import the library, you can reference it in your code as:
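// ESM:
import { Objm } from "@nats-io/obj";
// or CJS (node only); use one of the two, not both:
const { Objm } = require("@nats-io/obj");
// nc is an already established NATS connection
const objm = new Objm(nc);
await objm.list();
await objm.create("myobj");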
If you want to customize some of the JetStream options when working with
ObjectStore, you can:
import { jetstream } from "@nats-io/jetstream";
import { Objm } from "@nats-io/obj";
// create a JetStream client with custom options and hand it to Objm
const js = jetstream(nc, { timeout: 10_000 });
const objm = new Objm(js);
const sc = StringCodec();
const objm = new Objm(nc);
const os = await objm.create("testing", { storage: StorageType.File });
function readableStreamFrom(data: Uint8Array): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    pull(controller) {
      controller.enqueue(data);
      controller.close();
    },
  });
}
async function fromReadableStream(
  rs: ReadableStream<Uint8Array>,
) {
  let i = 1;
  const reader = rs.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      break;
    }
    if (value && value.length) {
      console.log(`chunk ${i++}: ${sc.decode(value)}`);
    }
  }
}
let e = await os.get("hello");
console.log(`hello entry exists? ${e !== null}`);
const watch = await os.watch();
(async () => {
  for await (const i of watch) {
    if (i === null) {
      continue;
    }
    console.log(`watch: ${i!.name} deleted?: ${i!.deleted}`);
  }
})();
const info = await os.put({
  name: "hello",
  description: "first entry",
  options: {
    max_chunk_size: 1,
  },
}, readableStreamFrom(sc.encode("hello world")));
console.log(
  `object size: ${info.size} number of chunks: ${info.chunks} deleted: ${info.deleted}`,
);
const r = await os.get("hello");
r?.error.then((err) => {
  if (err) {
    console.error("reading the readable stream failed:", err);
  }
});
await fromReadableStream(r!.data);
const list = await os.list();
list.forEach((i) => {
  console.log(`list: ${i.name}`);
});
const status = await os.status();
console.log(`bucket: '${status.bucket}' size in bytes: ${status.size}`);
const final = await os.seal();
console.log(`bucket: '${final.bucket}' sealed: ${final.sealed}`);
const destroyed = await os.destroy();
console.log(`destroyed: ${destroyed}`);
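The example above writes and reads object data through ReadableStreams, and uses max_chunk_size to control how the payload is split. The sketch below illustrates that idea using only the standard Web Streams API, without a NATS connection. The helpers chunkedStreamFrom and collect are hypothetical names introduced here for illustration; they are not part of @nats-io/obj and do not claim to mirror how the library chunks data internally.

```typescript
// Hypothetical helper (not part of @nats-io/obj): emit `data` as a
// ReadableStream of chunks that are at most `chunkSize` bytes long.
function chunkedStreamFrom(
  data: Uint8Array,
  chunkSize: number,
): ReadableStream<Uint8Array> {
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError("chunkSize must be a positive integer");
  }
  let offset = 0;
  return new ReadableStream<Uint8Array>({
    pull(controller) {
      // close the stream once all bytes have been handed out
      if (offset >= data.length) {
        controller.close();
        return;
      }
      // subarray clamps the end index, so the last chunk may be shorter
      controller.enqueue(data.subarray(offset, offset + chunkSize));
      offset += chunkSize;
    },
  });
}

// Hypothetical helper: drain a stream and return the number of non-empty
// chunks seen together with the concatenated bytes.
async function collect(
  rs: ReadableStream<Uint8Array>,
): Promise<{ chunks: number; bytes: Uint8Array }> {
  const parts: Uint8Array[] = [];
  let size = 0;
  const reader = rs.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      break;
    }
    if (value && value.length) {
      parts.push(value);
      size += value.length;
    }
  }
  // copy every chunk into one contiguous buffer
  const bytes = new Uint8Array(size);
  let at = 0;
  for (const p of parts) {
    bytes.set(p, at);
    at += p.length;
  }
  return { chunks: parts.length, bytes };
}
```

With the 11-byte payload "hello world" and a chunk size of 1, collect(chunkedStreamFrom(payload, 1)) reports 11 chunks and returns the original bytes. Since os.put accepts a ReadableStream<Uint8Array>, a stream built this way could presumably be passed to it in place of readableStreamFrom; how the store re-chunks the data is still governed by max_chunk_size.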