Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More β†’
Socket
Sign inDemoInstall
Socket

@thi.ng/rstream

Package Overview
Dependencies
Maintainers
0
Versions
399
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@thi.ng/rstream - npm Package Compare versions

Comparing version 9.0.9 to 9.1.0

8

CHANGELOG.md
# Change Log
- **Last updated**: 2024-11-24T18:15:49Z
- **Last updated**: 2024-12-04T15:08:17Z
- **Generator**: [thi.ng/monopub](https://thi.ng/monopub)

@@ -12,2 +12,8 @@

## [9.1.0](https://github.com/thi-ng/umbrella/tree/@thi.ng/rstream@9.1.0) (2024-12-04)
#### πŸš€ Features
- update fromObject(), re-implement as full subscription ([6903507](https://github.com/thi-ng/umbrella/commit/6903507))
# [9.0.0](https://github.com/thi-ng/umbrella/tree/@thi.ng/rstream@9.0.0) (2024-08-20)

@@ -14,0 +20,0 @@

45

object.d.ts

@@ -1,27 +0,7 @@

import type { Keys, Predicate2 } from "@thi.ng/api";
import type { IObjectOf, Keys, NumOrString, Predicate2 } from "@thi.ng/api";
import type { CommonOpts, ISubscription } from "./api.js";
import { Subscription } from "./subscription.js";
export type KeyStreams<T, K extends Keys<T>> = {
[id in K]-?: ISubscription<T[id], T[id]>;
};
/**
* Result object type for {@link fromObject}.
*/
export interface StreamObj<T, K extends Keys<T>> {
/**
* Object of managed & typed streams for registered keys.
*/
streams: KeyStreams<T, K>;
/**
* Feeds new values from `x` to each registered key's stream.
* Satifies {@link ISubscriber.next} interface.
*
* @param x -
*/
next(x: T): void;
/**
* Calls {@link ISubscriber.done} for all streams created. Satifies
* {@link ISubscriber.done} interface.
*/
done(): void;
}
export interface StreamObjOpts<T, K extends Keys<T>> extends CommonOpts {

@@ -134,2 +114,23 @@ /**

export declare const fromObject: <T extends object, K extends Keys<T>>(src: T, opts?: Partial<StreamObjOpts<T, K>>) => StreamObj<T, K>;
export declare class StreamObj<T extends object, K extends Keys<T>> extends Subscription<T, T> {
/**
* Object of managed & typed streams for registered keys.
*/
keys: NumOrString[];
streams: IObjectOf<Subscription<any, any>>;
defaults?: Partial<T>;
constructor(src: T, opts?: Partial<StreamObjOpts<T, K>>);
/**
* Feeds new values from `x` to each registered key's stream.
* Satifies {@link ISubscriber.next} interface.
*
* @param x -
*/
next(x: T): void;
/**
* Calls {@link ISubscriber.done} for all streams created. Satifies
* {@link ISubscriber.done} interface.
*/
done(): void;
}
//# sourceMappingURL=object.d.ts.map
import { dedupe } from "@thi.ng/transducers/dedupe";
import { __nextID } from "./idgen.js";
import { subscription } from "./subscription.js";
const fromObject = (src, opts = {}) => {
const id = opts.id || `obj${__nextID()}`;
const keys = opts.keys || Object.keys(src);
const _opts = opts.dedupe !== false ? {
xform: dedupe(opts.equiv || ((a, b) => a === b)),
...opts
} : opts;
const streams = {};
for (let k of keys) {
streams[k] = subscription(void 0, {
..._opts,
id: `${id}-${String(k)}`
});
import { __optsWithID } from "./idgen.js";
import { Subscription, subscription } from "./subscription.js";
const fromObject = (src, opts = {}) => new StreamObj(src, opts);
class StreamObj extends Subscription {
/**
* Object of managed & typed streams for registered keys.
*/
keys;
streams = {};
defaults;
constructor(src, opts = {}) {
super(void 0, __optsWithID("obj", opts));
this.keys = opts.keys || Object.keys(src);
this.defaults = opts.defaults;
const _opts = opts.dedupe !== false ? {
xform: dedupe(opts.equiv || ((a, b) => a === b)),
...opts
} : opts;
for (let k of this.keys) {
this.streams[k] = subscription(void 0, {
..._opts,
id: `${this.id}-${k}`
});
}
opts.initial !== false && this.next(src);
}
const res = {
streams,
next(state) {
for (let k of keys) {
const val = state[k];
streams[k].next(
opts.defaults && val === void 0 ? opts.defaults[k] : val
);
}
},
done() {
for (let k of keys) {
streams[k].done();
}
/**
* Feeds new values from `x` to each registered key's stream.
* Satifies {@link ISubscriber.next} interface.
*
* @param x -
*/
next(x) {
this.cacheLast && (this.last = x);
for (let k of this.keys) {
const val = x[k];
this.streams[k].next(
this.defaults && val === void 0 ? this.defaults[k] : val
);
}
};
opts.initial !== false && res.next(src);
return res;
};
}
/**
* Calls {@link ISubscriber.done} for all streams created. Satifies
* {@link ISubscriber.done} interface.
*/
done() {
for (let k of this.keys) {
this.streams[k].done();
}
}
}
export {
StreamObj,
fromObject
};
{
"name": "@thi.ng/rstream",
"version": "9.0.9",
"version": "9.1.0",
"description": "Reactive streams & subscription primitives for constructing dataflow graphs / pipelines",

@@ -222,3 +222,3 @@ "type": "module",

},
"gitHead": "85e2f0935b58bde5d165fbe754fafec5da0b731e\n"
"gitHead": "7f97d3a454bb605afabf785af1736cb155ecced4\n"
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚑️ by Socket Inc