Socket
Socket
Sign inDemoInstall

@segment/analytics-node

Package Overview
Dependencies
Maintainers
233
Versions
43
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@segment/analytics-node - npm Package Compare versions

Comparing version 0.0.1-beta.5 to 0.0.1-beta.6

dist/cjs/src/app/dispatch-emit.js

7

CHANGELOG.md

@@ -7,2 +7,9 @@ # @segment/analytics-node

- Updated dependencies [[`ecb4b8d`](https://github.com/segmentio/analytics-next/commit/ecb4b8db0194e06a3ee3c8cae57d4f327d15dc02)]:
- @segment/analytics-core@1.1.4
## 0.0.1
### Patch Changes
- Updated dependencies [[`0b9f4d7`](https://github.com/segmentio/analytics-next/commit/0b9f4d7e82662f7d5fda3590e93b10b3fd2e9833)]:

@@ -9,0 +16,0 @@ - @segment/analytics-core@1.1.3

4

dist/cjs/package.json
{
"name": "@segment/analytics-node",
"version": "0.0.1-beta.5",
"version": "0.0.1-beta.6",
"main": "./dist/cjs/src/index.js",

@@ -36,3 +36,3 @@ "module": "./dist/esm/src/index.js",

"@lukeed/uuid": "^2.0.0",
"@segment/analytics-core": "1.1.3",
"@segment/analytics-core": "1.1.4",
"node-fetch": "^2.6.7",

@@ -39,0 +39,0 @@ "tslib": "^2.4.0"

@@ -9,2 +9,4 @@ "use strict";

const create_node_event_factory_1 = require("../lib/create-node-event-factory");
const dispatch_emit_1 = require("./dispatch-emit");
const emitter_1 = require("./emitter");
// create a derived class since we may want to add node specific things to Context later

@@ -27,3 +29,3 @@ class Context extends analytics_core_1.CoreContext {

}
class Analytics extends analytics_core_1.Emitter {
class Analytics extends emitter_1.NodeEmitter {
constructor(settings) {

@@ -35,6 +37,6 @@ super();

this._eventFactory = (0, create_node_event_factory_1.createNodeEventFactory)();
this.queue = new analytics_core_1.EventQueue(new NodePriorityQueue());
this._queue = new analytics_core_1.EventQueue(new NodePriorityQueue());
const flushInterval = settings.flushInterval ?? 10000;
this._closeAndFlushDefaultTimeout = flushInterval * 1.25; // add arbitrary multiplier in case an event is in a plugin.
this.ready = this.register((0, segmentio_1.configureNodePlugin)({
const { plugin, publisher } = (0, segmentio_1.createConfiguredNodePlugin)({
writeKey: settings.writeKey,

@@ -46,3 +48,5 @@ host: settings.host,

flushInterval,
})).then(() => undefined);
});
this._publisher = publisher;
this.ready = this.register(plugin).then(() => undefined);
this.emit('initialize', settings);

@@ -60,2 +64,3 @@ (0, analytics_core_1.bindAll)(this);

closeAndFlush({ timeout = this._closeAndFlushDefaultTimeout, } = {}) {
this._publisher.flushAfterClose(this._pendingEvents);
this._isClosed = true;

@@ -78,5 +83,3 @@ const promise = new Promise((resolve) => {

this._pendingEvents++;
(0, analytics_core_1.dispatchAndEmit)(segmentEvent, this.queue, this, {
callback: callback,
})
(0, dispatch_emit_1.dispatchAndEmit)(segmentEvent, this._queue, this, callback)
.catch((ctx) => ctx)

@@ -94,4 +97,8 @@ .finally(() => {

*/
alias({ userId, previousId, options, callback, }) {
const segmentEvent = this._eventFactory.alias(userId, previousId, options);
alias({ userId, previousId, context, timestamp, integrations, }, callback) {
const segmentEvent = this._eventFactory.alias(userId, previousId, {
context,
integrations,
timestamp,
});
this._dispatch(segmentEvent, callback);

@@ -103,7 +110,9 @@ }

*/
group({ groupId, userId, anonymousId, traits = {}, options = {}, callback, }) {
group({ timestamp, groupId, userId, anonymousId, traits = {}, context, integrations, }, callback) {
const segmentEvent = this._eventFactory.group(groupId, traits, {
...options,
context,
anonymousId,
userId,
timestamp,
integrations,
});

@@ -116,7 +125,8 @@ this._dispatch(segmentEvent, callback);

*/
identify({ userId, anonymousId, traits = {}, options, callback, }) {
identify({ userId, anonymousId, traits = {}, context, integrations, }, callback) {
const segmentEvent = this._eventFactory.identify(userId, traits, {
...options,
context,
anonymousId,
userId,
integrations,
});

@@ -129,4 +139,4 @@ this._dispatch(segmentEvent, callback);

*/
page({ userId, anonymousId, category, name, properties, options, timestamp, callback, }) {
const segmentEvent = this._eventFactory.page(category ?? null, name ?? null, properties, { ...options, anonymousId, userId, timestamp });
page({ userId, anonymousId, category, name, properties, context, timestamp, integrations, }, callback) {
const segmentEvent = this._eventFactory.page(category ?? null, name ?? null, properties, { context, anonymousId, userId, timestamp, integrations });
this._dispatch(segmentEvent, callback);

@@ -140,4 +150,4 @@ }

*/
screen({ userId, anonymousId, category, name, properties, options, callback, timestamp, }) {
const segmentEvent = this._eventFactory.screen(category ?? null, name ?? null, properties, { ...options, anonymousId, userId, timestamp });
screen({ userId, anonymousId, category, name, properties, context, timestamp, integrations, }, callback) {
const segmentEvent = this._eventFactory.screen(category ?? null, name ?? null, properties, { context, anonymousId, userId, timestamp, integrations });
this._dispatch(segmentEvent, callback);

@@ -149,7 +159,9 @@ }

*/
track({ userId, anonymousId, event, properties, options, callback, }) {
track({ userId, anonymousId, event, properties, context, timestamp, integrations, }, callback) {
const segmentEvent = this._eventFactory.track(event, properties, {
...options,
context,
userId,
anonymousId,
timestamp,
integrations,
});

@@ -163,5 +175,5 @@ this._dispatch(segmentEvent, callback);

async register(...plugins) {
return this.queue.criticalTasks.run(async () => {
return this._queue.criticalTasks.run(async () => {
const ctx = Context.system();
const registrations = plugins.map((xt) => this.queue.register(ctx, xt, this));
const registrations = plugins.map((xt) => this._queue.register(ctx, xt, this));
await Promise.all(registrations);

@@ -178,5 +190,5 @@ this.emit('register', plugins.map((el) => el.name));

const deregistrations = pluginNames.map(async (pl) => {
const plugin = this.queue.plugins.find((p) => p.name === pl);
const plugin = this._queue.plugins.find((p) => p.name === pl);
if (plugin) {
return this.queue.deregister(ctx, plugin, this);
return this._queue.deregister(ctx, plugin, this);
}

@@ -183,0 +195,0 @@ else {

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
tslib_1.__exportStar(require("./app/analytics-node"), exports);
exports.Context = exports.Analytics = void 0;
var analytics_node_1 = require("./app/analytics-node");
Object.defineProperty(exports, "Analytics", { enumerable: true, get: function () { return analytics_node_1.Analytics; } });
Object.defineProperty(exports, "Context", { enumerable: true, get: function () { return analytics_node_1.Context; } });
// export Analytics as both a named export and a default export (for backwards-compat. reasons)
const analytics_node_1 = require("./app/analytics-node");
exports.default = analytics_node_1.Analytics;
const analytics_node_2 = require("./app/analytics-node");
exports.default = analytics_node_2.Analytics;
//# sourceMappingURL=index.js.map

@@ -5,4 +5,4 @@ "use strict";

const uuid_1 = require("@lukeed/uuid");
const MAX_EVENT_SIZE_IN_BYTES = 32 * 1024; // 32 KB
const MAX_BATCH_SIZE_IN_BYTES = 480 * 1024; // 480 KB (500 KB is the limit, leaving some padding)
const MAX_EVENT_SIZE_IN_KB = 32;
const MAX_BATCH_SIZE_IN_KB = 480; // (500 KB is the limit, leaving some padding)
class ContextBatch {

@@ -17,14 +17,22 @@ constructor(maxEventCount) {

if (this.length === this.maxEventCount)
return false;
return {
success: false,
message: `Event limit of ${this.maxEventCount} has been exceeded.`,
};
const eventSize = this.calculateSize(item.context);
if (eventSize > MAX_EVENT_SIZE_IN_BYTES) {
// Event exceeds Segment's limits
return false;
if (eventSize > MAX_EVENT_SIZE_IN_KB * 1024) {
return {
success: false,
message: `Event exceeds maximum event size of ${MAX_EVENT_SIZE_IN_KB} KB`,
};
}
if (this.sizeInBytes + eventSize <= MAX_BATCH_SIZE_IN_BYTES) {
this.items.push(item);
this.sizeInBytes += eventSize;
return true;
if (this.sizeInBytes + eventSize > MAX_BATCH_SIZE_IN_KB * 1024) {
return {
success: false,
message: `Event has caused batch size to exceed ${MAX_BATCH_SIZE_IN_KB} KB`,
};
}
return false;
this.items.push(item);
this.sizeInBytes += eventSize;
return { success: true };
}

@@ -31,0 +39,0 @@ get length() {

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.configureNodePlugin = void 0;
exports.createConfiguredNodePlugin = exports.createNodePlugin = void 0;
const publisher_1 = require("./publisher");

@@ -11,4 +11,3 @@ const package_json_1 = require("../../../package.json");

}
function configureNodePlugin(props) {
const publisher = new publisher_1.Publisher(props);
function createNodePlugin(publisher) {
function action(ctx) {

@@ -32,3 +31,11 @@ normalizeEvent(ctx);

}
exports.configureNodePlugin = configureNodePlugin;
exports.createNodePlugin = createNodePlugin;
const createConfiguredNodePlugin = (props) => {
const publisher = new publisher_1.Publisher(props);
return {
publisher: publisher,
plugin: createNodePlugin(publisher),
};
};
exports.createConfiguredNodePlugin = createConfiguredNodePlugin;
//# sourceMappingURL=index.js.map

@@ -27,6 +27,6 @@ "use strict";

const batch = new context_batch_1.ContextBatch(this._maxEventsInBatch);
this.batch = batch;
this._batch = batch;
this.pendingFlushTimeout = setTimeout(() => {
if (batch === this.batch) {
this.batch = undefined;
if (batch === this._batch) {
this._batch = undefined;
}

@@ -42,4 +42,21 @@ this.pendingFlushTimeout = undefined;

this.pendingFlushTimeout && clearTimeout(this.pendingFlushTimeout);
this.batch = undefined;
this._batch = undefined;
}
flushAfterClose(pendingItemsCount) {
if (!pendingItemsCount) {
// if number of pending items is 0, there will never be anything else entering the batch, since the app is closed.
return;
}
this._closeAndFlushPendingItemsCount = pendingItemsCount;
// if batch is empty, there's nothing to flush, and when things come in, enqueue will handle them.
if (!this._batch)
return;
// the number of globally pending items will always be larger or the same as batch size.
// Any mismatch is because some globally pending items are in plugins.
const isExpectingNoMoreItems = this._batch.length === pendingItemsCount;
if (isExpectingNoMoreItems) {
this.send(this._batch).catch(noop);
this.clearBatch();
}
}
/**

@@ -51,3 +68,3 @@ * Enqueues the context for future delivery.

enqueue(ctx) {
const batch = this.batch ?? this.createBatch();
const batch = this._batch ?? this.createBatch();
const { promise: ctxPromise, resolve } = (0, extract_promise_parts_1.extractPromiseParts)();

@@ -63,3 +80,3 @@ const pendingItem = {

Add an event to the existing batch.
Success: Check if batch is full and send if it is.
Success: Check if batch is full or no more items are expected to come in (i.e. closing). If so, send batch.
Failure: Assume event is too big to fit in current batch - send existing batch.

@@ -70,4 +87,7 @@ Add an event to the new batch.

*/
if (batch.tryAdd(pendingItem)) {
if (batch.length === this._maxEventsInBatch) {
const addStatus = batch.tryAdd(pendingItem);
if (addStatus.success) {
const isExpectingNoMoreItems = batch.length === this._closeAndFlushPendingItemsCount;
const isFull = batch.length === this._maxEventsInBatch;
if (isFull || isExpectingNoMoreItems) {
this.send(batch).catch(noop);

@@ -78,3 +98,4 @@ this.clearBatch();

}
else if (batch.length) {
// If the new item causes the maximum event size to be exceeded, send the current batch and create a new one.
if (batch.length) {
this.send(batch).catch(noop);

@@ -84,10 +105,6 @@ this.clearBatch();

const fallbackBatch = this.createBatch();
if (!fallbackBatch.tryAdd(pendingItem)) {
ctx.setFailedDelivery({
reason: new Error(`Event exceeds maximum event size of 32 kb`),
});
return Promise.resolve(ctx);
}
else {
if (fallbackBatch.length === this._maxEventsInBatch) {
const fbAddStatus = fallbackBatch.tryAdd(pendingItem);
if (fbAddStatus.success) {
const isExpectingNoMoreItems = fallbackBatch.length === this._closeAndFlushPendingItemsCount;
if (isExpectingNoMoreItems) {
this.send(fallbackBatch).catch(noop);

@@ -98,4 +115,14 @@ this.clearBatch();

}
else {
// this should only occur if max event size is exceeded
ctx.setFailedDelivery({
reason: new Error(fbAddStatus.message),
});
return Promise.resolve(ctx);
}
}
async send(batch) {
if (this._closeAndFlushPendingItemsCount) {
this._closeAndFlushPendingItemsCount -= batch.length;
}
const events = batch.getEvents();

@@ -102,0 +129,0 @@ const payload = JSON.stringify({ batch: events });

{
"name": "@segment/analytics-node",
"version": "0.0.1-beta.5",
"version": "0.0.1-beta.6",
"main": "./dist/cjs/src/index.js",

@@ -36,3 +36,3 @@ "module": "./dist/esm/src/index.js",

"@lukeed/uuid": "^2.0.0",
"@segment/analytics-core": "1.1.3",
"@segment/analytics-core": "1.1.4",
"node-fetch": "^2.6.7",

@@ -39,0 +39,0 @@ "tslib": "^2.4.0"

@@ -1,6 +0,8 @@

import { Emitter, CoreContext, EventQueue, dispatchAndEmit, bindAll, PriorityQueue, pTimeout, } from '@segment/analytics-core';
import { CoreContext, EventQueue, bindAll, PriorityQueue, pTimeout, } from '@segment/analytics-core';
import { validateSettings } from './settings';
import { version } from '../../package.json';
import { configureNodePlugin } from '../plugins/segmentio';
import { createConfiguredNodePlugin } from '../plugins/segmentio';
import { createNodeEventFactory } from '../lib/create-node-event-factory';
import { dispatchAndEmit } from './dispatch-emit';
import { NodeEmitter } from './emitter';
// create a derived class since we may want to add node specific things to Context later

@@ -22,3 +24,3 @@ export class Context extends CoreContext {

}
export class Analytics extends Emitter {
export class Analytics extends NodeEmitter {
constructor(settings) {

@@ -30,6 +32,6 @@ super();

this._eventFactory = createNodeEventFactory();
this.queue = new EventQueue(new NodePriorityQueue());
this._queue = new EventQueue(new NodePriorityQueue());
const flushInterval = settings.flushInterval ?? 10000;
this._closeAndFlushDefaultTimeout = flushInterval * 1.25; // add arbitrary multiplier in case an event is in a plugin.
this.ready = this.register(configureNodePlugin({
const { plugin, publisher } = createConfiguredNodePlugin({
writeKey: settings.writeKey,

@@ -41,3 +43,5 @@ host: settings.host,

flushInterval,
})).then(() => undefined);
});
this._publisher = publisher;
this.ready = this.register(plugin).then(() => undefined);
this.emit('initialize', settings);

@@ -55,2 +59,3 @@ bindAll(this);

closeAndFlush({ timeout = this._closeAndFlushDefaultTimeout, } = {}) {
this._publisher.flushAfterClose(this._pendingEvents);
this._isClosed = true;

@@ -73,5 +78,3 @@ const promise = new Promise((resolve) => {

this._pendingEvents++;
dispatchAndEmit(segmentEvent, this.queue, this, {
callback: callback,
})
dispatchAndEmit(segmentEvent, this._queue, this, callback)
.catch((ctx) => ctx)

@@ -89,4 +92,8 @@ .finally(() => {

*/
alias({ userId, previousId, options, callback, }) {
const segmentEvent = this._eventFactory.alias(userId, previousId, options);
alias({ userId, previousId, context, timestamp, integrations, }, callback) {
const segmentEvent = this._eventFactory.alias(userId, previousId, {
context,
integrations,
timestamp,
});
this._dispatch(segmentEvent, callback);

@@ -98,7 +105,9 @@ }

*/
group({ groupId, userId, anonymousId, traits = {}, options = {}, callback, }) {
group({ timestamp, groupId, userId, anonymousId, traits = {}, context, integrations, }, callback) {
const segmentEvent = this._eventFactory.group(groupId, traits, {
...options,
context,
anonymousId,
userId,
timestamp,
integrations,
});

@@ -111,7 +120,8 @@ this._dispatch(segmentEvent, callback);

*/
identify({ userId, anonymousId, traits = {}, options, callback, }) {
identify({ userId, anonymousId, traits = {}, context, integrations, }, callback) {
const segmentEvent = this._eventFactory.identify(userId, traits, {
...options,
context,
anonymousId,
userId,
integrations,
});

@@ -124,4 +134,4 @@ this._dispatch(segmentEvent, callback);

*/
page({ userId, anonymousId, category, name, properties, options, timestamp, callback, }) {
const segmentEvent = this._eventFactory.page(category ?? null, name ?? null, properties, { ...options, anonymousId, userId, timestamp });
page({ userId, anonymousId, category, name, properties, context, timestamp, integrations, }, callback) {
const segmentEvent = this._eventFactory.page(category ?? null, name ?? null, properties, { context, anonymousId, userId, timestamp, integrations });
this._dispatch(segmentEvent, callback);

@@ -135,4 +145,4 @@ }

*/
screen({ userId, anonymousId, category, name, properties, options, callback, timestamp, }) {
const segmentEvent = this._eventFactory.screen(category ?? null, name ?? null, properties, { ...options, anonymousId, userId, timestamp });
screen({ userId, anonymousId, category, name, properties, context, timestamp, integrations, }, callback) {
const segmentEvent = this._eventFactory.screen(category ?? null, name ?? null, properties, { context, anonymousId, userId, timestamp, integrations });
this._dispatch(segmentEvent, callback);

@@ -144,7 +154,9 @@ }

*/
track({ userId, anonymousId, event, properties, options, callback, }) {
track({ userId, anonymousId, event, properties, context, timestamp, integrations, }, callback) {
const segmentEvent = this._eventFactory.track(event, properties, {
...options,
context,
userId,
anonymousId,
timestamp,
integrations,
});

@@ -158,5 +170,5 @@ this._dispatch(segmentEvent, callback);

async register(...plugins) {
return this.queue.criticalTasks.run(async () => {
return this._queue.criticalTasks.run(async () => {
const ctx = Context.system();
const registrations = plugins.map((xt) => this.queue.register(ctx, xt, this));
const registrations = plugins.map((xt) => this._queue.register(ctx, xt, this));
await Promise.all(registrations);

@@ -173,5 +185,5 @@ this.emit('register', plugins.map((el) => el.name));

const deregistrations = pluginNames.map(async (pl) => {
const plugin = this.queue.plugins.find((p) => p.name === pl);
const plugin = this._queue.plugins.find((p) => p.name === pl);
if (plugin) {
return this.queue.deregister(ctx, plugin, this);
return this._queue.deregister(ctx, plugin, this);
}

@@ -178,0 +190,0 @@ else {

@@ -1,2 +0,2 @@

export * from './app/analytics-node';
export { Analytics, Context } from './app/analytics-node';
// export Analytics as both a named export and a default export (for backwards-compat. reasons)

@@ -3,0 +3,0 @@ import { Analytics } from './app/analytics-node';

import { v4 as uuid } from '@lukeed/uuid';
const MAX_EVENT_SIZE_IN_BYTES = 32 * 1024; // 32 KB
const MAX_BATCH_SIZE_IN_BYTES = 480 * 1024; // 480 KB (500 KB is the limit, leaving some padding)
const MAX_EVENT_SIZE_IN_KB = 32;
const MAX_BATCH_SIZE_IN_KB = 480; // (500 KB is the limit, leaving some padding)
export class ContextBatch {

@@ -13,14 +13,22 @@ constructor(maxEventCount) {

if (this.length === this.maxEventCount)
return false;
return {
success: false,
message: `Event limit of ${this.maxEventCount} has been exceeded.`,
};
const eventSize = this.calculateSize(item.context);
if (eventSize > MAX_EVENT_SIZE_IN_BYTES) {
// Event exceeds Segment's limits
return false;
if (eventSize > MAX_EVENT_SIZE_IN_KB * 1024) {
return {
success: false,
message: `Event exceeds maximum event size of ${MAX_EVENT_SIZE_IN_KB} KB`,
};
}
if (this.sizeInBytes + eventSize <= MAX_BATCH_SIZE_IN_BYTES) {
this.items.push(item);
this.sizeInBytes += eventSize;
return true;
if (this.sizeInBytes + eventSize > MAX_BATCH_SIZE_IN_KB * 1024) {
return {
success: false,
message: `Event has caused batch size to exceed ${MAX_BATCH_SIZE_IN_KB} KB`,
};
}
return false;
this.items.push(item);
this.sizeInBytes += eventSize;
return { success: true };
}

@@ -27,0 +35,0 @@ get length() {

@@ -8,4 +8,3 @@ import { Publisher } from './publisher';

}
export function configureNodePlugin(props) {
const publisher = new Publisher(props);
export function createNodePlugin(publisher) {
function action(ctx) {

@@ -29,2 +28,9 @@ normalizeEvent(ctx);

}
export const createConfiguredNodePlugin = (props) => {
const publisher = new Publisher(props);
return {
publisher: publisher,
plugin: createNodePlugin(publisher),
};
};
//# sourceMappingURL=index.js.map

@@ -24,6 +24,6 @@ import { backoff } from '@segment/analytics-core';

const batch = new ContextBatch(this._maxEventsInBatch);
this.batch = batch;
this._batch = batch;
this.pendingFlushTimeout = setTimeout(() => {
if (batch === this.batch) {
this.batch = undefined;
if (batch === this._batch) {
this._batch = undefined;
}

@@ -39,4 +39,21 @@ this.pendingFlushTimeout = undefined;

this.pendingFlushTimeout && clearTimeout(this.pendingFlushTimeout);
this.batch = undefined;
this._batch = undefined;
}
flushAfterClose(pendingItemsCount) {
if (!pendingItemsCount) {
// if number of pending items is 0, there will never be anything else entering the batch, since the app is closed.
return;
}
this._closeAndFlushPendingItemsCount = pendingItemsCount;
// if batch is empty, there's nothing to flush, and when things come in, enqueue will handle them.
if (!this._batch)
return;
// the number of globally pending items will always be larger or the same as batch size.
// Any mismatch is because some globally pending items are in plugins.
const isExpectingNoMoreItems = this._batch.length === pendingItemsCount;
if (isExpectingNoMoreItems) {
this.send(this._batch).catch(noop);
this.clearBatch();
}
}
/**

@@ -48,3 +65,3 @@ * Enqueues the context for future delivery.

enqueue(ctx) {
const batch = this.batch ?? this.createBatch();
const batch = this._batch ?? this.createBatch();
const { promise: ctxPromise, resolve } = extractPromiseParts();

@@ -60,3 +77,3 @@ const pendingItem = {

Add an event to the existing batch.
Success: Check if batch is full and send if it is.
Success: Check if batch is full or no more items are expected to come in (i.e. closing). If so, send batch.
Failure: Assume event is too big to fit in current batch - send existing batch.

@@ -67,4 +84,7 @@ Add an event to the new batch.

*/
if (batch.tryAdd(pendingItem)) {
if (batch.length === this._maxEventsInBatch) {
const addStatus = batch.tryAdd(pendingItem);
if (addStatus.success) {
const isExpectingNoMoreItems = batch.length === this._closeAndFlushPendingItemsCount;
const isFull = batch.length === this._maxEventsInBatch;
if (isFull || isExpectingNoMoreItems) {
this.send(batch).catch(noop);

@@ -75,3 +95,4 @@ this.clearBatch();

}
else if (batch.length) {
// If the new item causes the maximum event size to be exceeded, send the current batch and create a new one.
if (batch.length) {
this.send(batch).catch(noop);

@@ -81,10 +102,6 @@ this.clearBatch();

const fallbackBatch = this.createBatch();
if (!fallbackBatch.tryAdd(pendingItem)) {
ctx.setFailedDelivery({
reason: new Error(`Event exceeds maximum event size of 32 kb`),
});
return Promise.resolve(ctx);
}
else {
if (fallbackBatch.length === this._maxEventsInBatch) {
const fbAddStatus = fallbackBatch.tryAdd(pendingItem);
if (fbAddStatus.success) {
const isExpectingNoMoreItems = fallbackBatch.length === this._closeAndFlushPendingItemsCount;
if (isExpectingNoMoreItems) {
this.send(fallbackBatch).catch(noop);

@@ -95,4 +112,14 @@ this.clearBatch();

}
else {
// this should only occur if max event size is exceeded
ctx.setFailedDelivery({
reason: new Error(fbAddStatus.message),
});
return Promise.resolve(ctx);
}
}
async send(batch) {
if (this._closeAndFlushPendingItemsCount) {
this._closeAndFlushPendingItemsCount -= batch.length;
}
const events = batch.getEvents();

@@ -99,0 +126,0 @@ const payload = JSON.stringify({ batch: events });

@@ -1,5 +0,10 @@

import { EventProperties, Traits, Emitter, CoreAnalytics, CoreContext, CorePlugin, EventQueue, CoreOptions, Callback, CoreSegmentEvent, CoreEmitterContract } from '@segment/analytics-core';
import { EventProperties, Traits, CoreAnalytics, CoreContext, CorePlugin, CoreSegmentEvent, Integrations } from '@segment/analytics-core';
import { AnalyticsSettings } from './settings';
import { Callback } from './dispatch-emit';
import { NodeEmitter } from './emitter';
export declare class Context extends CoreContext {
}
export interface Plugin extends CorePlugin {
}
declare type Timestamp = string | Date;
/**

@@ -15,26 +20,18 @@ * An ID associated with the user. Note: at least one of userId or anonymousId must be included.

};
/** Events from CoreOptions */
export interface SegmentEventOptions {
context?: Context;
timestamp?: CoreOptions['timestamp'];
}
/**
* Map of emitter event names to method args.
* A dictionary of extra context to attach to the call.
* Note: context differs from traits because it is not attributes of the user itself.
*/
declare type NodeEmitterEvents = CoreEmitterContract<Context> & {
initialize: [AnalyticsSettings];
call_after_close: [SegmentEvent];
drained: [];
};
declare type AdditionalContext = Record<string, any>;
declare type SegmentEventType = 'track' | 'page' | 'identify' | 'alias' | 'screen';
export interface SegmentEvent extends CoreSegmentEvent {
type: SegmentEventType;
options?: SegmentEventOptions;
}
export declare class Analytics extends Emitter<NodeEmitterEvents> implements CoreAnalytics {
private _eventFactory;
export declare class Analytics extends NodeEmitter implements CoreAnalytics {
private readonly _eventFactory;
private _isClosed;
private _pendingEvents;
private readonly _closeAndFlushDefaultTimeout;
queue: EventQueue;
private readonly _publisher;
private readonly _queue;
ready: Promise<void>;

@@ -57,8 +54,9 @@ constructor(settings: AnalyticsSettings);

*/
alias({ userId, previousId, options, callback, }: {
alias({ userId, previousId, context, timestamp, integrations, }: {
userId: string;
previousId: string;
options?: SegmentEventOptions;
callback?: Callback;
}): void;
context?: AdditionalContext;
timestamp?: Timestamp;
integrations?: Integrations;
}, callback?: Callback): void;
/**

@@ -68,8 +66,9 @@ * Associates an identified user with a collective.

*/
group({ groupId, userId, anonymousId, traits, options, callback, }: IdentityOptions & {
group({ timestamp, groupId, userId, anonymousId, traits, context, integrations, }: IdentityOptions & {
groupId: string;
traits?: Traits;
options?: SegmentEventOptions;
callback?: Callback;
}): void;
context?: AdditionalContext;
timestamp?: Timestamp;
integrations?: Integrations;
}, callback?: Callback): void;
/**

@@ -79,7 +78,7 @@ * Includes a unique userId and (maybe anonymousId) and any optional traits you know about them.

*/
identify({ userId, anonymousId, traits, options, callback, }: IdentityOptions & {
identify({ userId, anonymousId, traits, context, integrations, }: IdentityOptions & {
traits?: Traits;
options?: SegmentEventOptions;
callback?: Callback;
}): void;
context?: AdditionalContext;
integrations?: Integrations;
}, callback?: Callback): void;
/**

@@ -89,10 +88,10 @@ * The page method lets you record page views on your website, along with optional extra information about the page being viewed.

*/
page({ userId, anonymousId, category, name, properties, options, timestamp, callback, }: IdentityOptions & {
page({ userId, anonymousId, category, name, properties, context, timestamp, integrations, }: IdentityOptions & {
category?: string;
name?: string;
properties?: EventProperties;
callback?: Callback;
timestamp?: string | Date;
options?: SegmentEventOptions;
}): void;
timestamp?: Timestamp;
context?: AdditionalContext;
integrations?: Integrations;
}, callback?: Callback): void;
/**

@@ -104,3 +103,3 @@ * Records screen views on your app, along with optional extra information

*/
screen({ userId, anonymousId, category, name, properties, options, callback, timestamp, }: Parameters<Analytics['page']>[0]): void;
screen({ userId, anonymousId, category, name, properties, context, timestamp, integrations, }: Parameters<Analytics['page']>[0], callback?: Callback): void;
/**

@@ -110,8 +109,9 @@ * Records actions your users perform.

*/
track({ userId, anonymousId, event, properties, options, callback, }: IdentityOptions & {
track({ userId, anonymousId, event, properties, context, timestamp, integrations, }: IdentityOptions & {
event: string;
properties?: EventProperties;
options?: SegmentEventOptions;
callback?: Callback;
}): void;
context?: AdditionalContext;
timestamp?: Timestamp;
integrations?: Integrations;
}, callback?: Callback): void;
/**

@@ -118,0 +118,0 @@ * Registers one or more plugins to augment Analytics functionality.

@@ -1,2 +0,1 @@

import { CorePlugin } from '@segment/analytics-core';
export interface AnalyticsSettings {

@@ -8,5 +7,2 @@ /**

/**
* An optional array of additional plugins that are capable of augmenting analytics-node functionality and enriching data.
*/
plugins?: CorePlugin[];
/**

@@ -13,0 +9,0 @@ * The base URL of the API. Default: "https://api.segment.io"

@@ -1,2 +0,2 @@

export * from './app/analytics-node';
export { Analytics, Context, Plugin } from './app/analytics-node';
export type { AnalyticsSettings } from './app/settings';

@@ -3,0 +3,0 @@ import { Analytics } from './app/analytics-node';

@@ -12,3 +12,8 @@ import { CoreContext, CoreSegmentEvent } from '@segment/analytics-core';

constructor(maxEventCount: number);
tryAdd(item: PendingItem): boolean;
tryAdd(item: PendingItem): {
success: true;
} | {
success: false;
message: string;
};
get length(): number;

@@ -15,0 +20,0 @@ private calculateSize;

import { CorePlugin } from '@segment/analytics-core';
import { PublisherProps } from './publisher';
import { Publisher, PublisherProps } from './publisher';
declare type DefinedPluginFields = 'name' | 'type' | 'version' | 'isLoaded' | 'load' | 'alias' | 'group' | 'identify' | 'page' | 'screen' | 'track';
declare type SegmentNodePlugin = CorePlugin & Required<Pick<CorePlugin, DefinedPluginFields>>;
export declare type ConfigureNodePluginProps = PublisherProps;
export declare function configureNodePlugin(props: ConfigureNodePluginProps): SegmentNodePlugin;
export declare function createNodePlugin(publisher: Publisher): SegmentNodePlugin;
export declare const createConfiguredNodePlugin: (props: ConfigureNodePluginProps) => {
publisher: Publisher;
plugin: SegmentNodePlugin;
};
export {};
//# sourceMappingURL=index.d.ts.map

@@ -15,3 +15,3 @@ import { CoreContext } from '@segment/analytics-core';

private pendingFlushTimeout?;
private batch?;
private _batch?;
private _flushInterval;

@@ -22,5 +22,7 @@ private _maxEventsInBatch;

private _url;
private _closeAndFlushPendingItemsCount?;
constructor({ host, path, maxRetries, maxEventsInBatch, flushInterval, writeKey, }: PublisherProps);
private createBatch;
private clearBatch;
flushAfterClose(pendingItemsCount: number): void;
/**

@@ -27,0 +29,0 @@ * Enqueues the context for future delivery.

{
"name": "@segment/analytics-node",
"version": "0.0.1-beta.5",
"version": "0.0.1-beta.6",
"main": "./dist/cjs/src/index.js",

@@ -36,3 +36,3 @@ "module": "./dist/esm/src/index.js",

"@lukeed/uuid": "^2.0.0",
"@segment/analytics-core": "1.1.3",
"@segment/analytics-core": "1.1.4",
"node-fetch": "^2.6.7",

@@ -39,0 +39,0 @@ "tslib": "^2.4.0"

@@ -22,3 +22,6 @@ # @segment/analytics-node

import { Analytics } from '@segment/analytics-node'
// or, if you use require:
const { Analytics } = require('@segment/analytics-node')
// instantiation
const analytics = new Analytics({ writeKey: '<MY_WRITE_KEY>' })

@@ -40,3 +43,3 @@

})
res.sendStatus(200)
res.sendStatus(201)
});

@@ -53,4 +56,5 @@ ```

```ts
const analytics = new Analytics('YOUR_WRITE_KEY', {
host: "https://events.eu1.segmentapis.com"
const analytics = new Analytics({
...
host: "https://events.eu1.segmentapis.com"
});

@@ -64,3 +68,2 @@ ```

writeKey: '<MY_WRITE_KEY>',
plugins: [plugin1, plugin2],
host: 'https://api.segment.io',

@@ -73,3 +76,2 @@ path: '/v1/batch',

})
```

@@ -92,3 +94,6 @@

```ts
const analytics = new Analytics({ '<MY_WRITE_KEY>', { maxEventsInBatch: 1 });
const analytics = new Analytics({
...
maxEventsInBatch: 1
});
```

@@ -99,6 +104,9 @@ Batching means that your message might not get sent right away. But every method call takes an optional callback, which you can use to know when a particular message is flushed from the queue, like so:

analytics.track({
userId: '019mr8mf4r',
event: 'Ultimate Played'
callback: (ctx) => console.log(ctx)
})
userId: '019mr8mf4r',
event: 'Ultimate Played',
},
(err, ctx) => {
...
}
)
```

@@ -153,3 +161,2 @@ ## Error Handling

['SIGINT', 'SIGTERM'].forEach((code) => process.on(code, onExit))
```

@@ -172,6 +179,6 @@

```ts
// subscribe to identify calls
// subscribe to specific events
analytics.on('identify', (err) => console.error(err))
// subscribe to a specific event
analytics.on('track', (ctx) => console.log(ctx))

@@ -187,4 +194,4 @@ ```

const marketingAnalytics = new Analytics('MARKETING_WRITE_KEY');
const appAnalytics = new Analytics('APP_WRITE_KEY');
const marketingAnalytics = new Analytics({ writeKey: 'MARKETING_WRITE_KEY' });
const appAnalytics = new Analytics({ writeKey: 'APP_WRITE_KEY' });
```

@@ -197,3 +204,3 @@

3. Log events and errors the event emitter:
3. Log events.
```js

@@ -208,2 +215,16 @@ ['initialize', 'call_after_close',

## Development: Disabling Analytics for Tests
- If you want to intercept / disable analytics for integration tests, you can use something like [nock](https://github.com/nock/nock)
```ts
// Note: nock will _not_ work if you polyfill fetch with something like undici, as nock uses the http module. Undici has its own interception method.
import nock from 'nock'
nock('https://api.segment.io')
.post('/v1/batch')
.reply(201)
.persist()
```
## Differences from legacy analytics-node / Migration Guide

@@ -221,11 +242,9 @@

- Instantiation requires an object
- Instantiation now requires an _object_ as the first argument.
```ts
// old
var analytics = new Analytics('YOUR_WRITE_KEY'); // not supported
var analytics = new Analytics('YOUR_WRITE_KEY');
// new
const analytics = new Analytics({ writeKey: 'YOUR_WRITE_KEY' });
// new!
const analytics = new Analytics({ writeKey: '<MY_WRITE_KEY>' })
```

@@ -248,47 +267,87 @@ - Graceful shutdown (See Graceful Shutdown section)

- `flushAt` configuration option -> `maxEventsInBatch`.
- `callback` option is moved to configuration
- `callback` call signature is different
```ts
// old
analytics.track({
userId: '019mr8mf4r',
event: 'Ultimate Played'
}), function(err, batch){
if (err) {
console.error(err)
(err, batch) => void
// new
(err, ctx) => void
```
## Plugin Architecture
- See segment's [documentation for plugin architecture](https://segment.com/docs/connections/sources/catalog/libraries/website/javascript/#plugin-architecture).
```ts
import type { Plugin } from '@segment/analytics-node'
export const lowercase: Plugin = {
name: 'Lowercase events',
type: 'enrichment',
version: '1.0.0',
isLoaded: () => true,
load: () => Promise.resolve(),
track: (ctx) => {
ctx.updateEvent('event', ctx.event.event.toLowerCase())
return ctx
}
});
}
// new
analytics.register(lowercase)
```
## Selecting Destinations
The alias, group, identify, page and track calls can all be passed an object of integrations that lets you turn certain destinations on or off. By default all destinations are enabled.
Here’s an example with the integrations object shown:
```ts
analytics.track({
userId: '019mr8mf4r',
event: 'Ultimate Played',
callback: (ctx) => {
if (ctx.failedDelivery()) {
console.error(ctx)
}
event: 'Membership Upgraded',
userId: '97234974',
integrations: {
'All': false,
'Vero': true,
'Google Analytics': false
}
})
```
In this case, we’re specifying that we want this track to only go to Vero. All: false says that no destination should be enabled unless otherwise specified. Vero: true turns on Vero, etc.
## Development / Disabling Analytics
- If you want to disable analytics for unit tests, you can use something like [nock](https://github.com/nock/nock) or [jest mocks](https://jestjs.io/docs/manual-mocks).
Destination flags are case sensitive and match the [destination’s name in the docs](https://segment.com/docs/connections/destinations) (i.e. “AdLearn Open Platform”, “awe.sm”, “MailChimp”, etc.). In some cases, there may be several names for a destination; if that happens you’ll see an “Adding (destination name) to the Integrations Object” section in the destination’s doc page with a list of valid names.
You should prefer mocking. However, if you need to intercept the request, you can do:
Note:
- Available at the business level, filtering track calls can be done right from the Segment UI on your source schema page. We recommend using the UI if possible since it’s a much simpler way of managing your filters and can be updated with no code changes on your side.
- If you are on a grandfathered plan, events sent server-side that are filtered through the Segment dashboard will still count towards your API usage.
## Usage in AWS Lambda
- [AWS lambda execution environment](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html) is challenging for typically non-response-blocking async activities like tracking or logging, since the runtime terminates / freezes after a response is emitted.
Here is an example of using analytics.js within a handler:
```ts
// Note: nock will _not_ work if you polyfill fetch with something like undici, as nock uses the http module. Undici has its own interception method.
import nock from 'nock'
const { Analytics } = require("@segment/analytics-node");
const mockApiHost = 'https://foo.bar'
const mockPath = '/foo'
// since analytics has the potential to be stateful if there are any plugins added,
// to be on the safe side, we should instantiate a new instance of analytics on every request (the cost of instantiation is low).
const analytics = () => new Analytics({
maxEventsInBatch: 1,
writeKey: '<MY_WRITE_KEY>',
})
.on("error", console.error);
nock(mockApiHost) // using regex matching in nock changes the perf profile quite a bit
.post(mockPath, (body) => true)
.reply(201)
.persist()
module.exports.handler = async (event) => {
...
// we need to await before returning, otherwise the lambda will exit before sending the request.
await new Promise((resolve) => {
analytics().track({
event: 'Hello world',
anonymousId: 'foo',
}, () => resolve())
const analytics = new Analytics({ host: mockApiHost, path: mockPath })
return {
statusCode: 200,
};
....
};
```
import {
EventProperties,
Traits,
Emitter,
CoreAnalytics,

@@ -10,15 +9,14 @@ CoreContext,

EventQueue,
dispatchAndEmit,
CoreOptions,
Callback,
CoreSegmentEvent,
bindAll,
PriorityQueue,
CoreEmitterContract,
pTimeout,
Integrations,
} from '@segment/analytics-core'
import { AnalyticsSettings, validateSettings } from './settings'
import { version } from '../../package.json'
import { configureNodePlugin } from '../plugins/segmentio'
import { createConfiguredNodePlugin } from '../plugins/segmentio'
import { createNodeEventFactory } from '../lib/create-node-event-factory'
import { Callback, dispatchAndEmit } from './dispatch-emit'
import { NodeEmitter } from './emitter'

@@ -28,2 +26,5 @@ // create a derived class since we may want to add node specific things to Context later

export interface Plugin extends CorePlugin {}
type Timestamp = string | Date
/**

@@ -36,16 +37,7 @@ * An ID associated with the user. Note: at least one of userId or anonymousId must be included.

/** Events from CoreOptions */
export interface SegmentEventOptions {
context?: Context
timestamp?: CoreOptions['timestamp']
}
/**
* Map of emitter event names to method args.
* A dictionary of extra context to attach to the call.
* Note: context differs from traits because it is not attributes of the user itself.
*/
type NodeEmitterEvents = CoreEmitterContract<Context> & {
initialize: [AnalyticsSettings]
call_after_close: [SegmentEvent] // any event that did not get dispatched due to close
drained: []
}
type AdditionalContext = Record<string, any>

@@ -70,15 +62,14 @@ class NodePriorityQueue extends PriorityQueue<Context> {

type: SegmentEventType
options?: SegmentEventOptions
}
export class Analytics
extends Emitter<NodeEmitterEvents>
implements CoreAnalytics
{
private _eventFactory: EventFactory
export class Analytics extends NodeEmitter implements CoreAnalytics {
private readonly _eventFactory: EventFactory
private _isClosed = false
private _pendingEvents = 0
private readonly _closeAndFlushDefaultTimeout: number
private readonly _publisher: ReturnType<
typeof createConfiguredNodePlugin
>['publisher']
queue: EventQueue
private readonly _queue: EventQueue

@@ -92,3 +83,3 @@ ready: Promise<void>

this._eventFactory = createNodeEventFactory()
this.queue = new EventQueue(new NodePriorityQueue())
this._queue = new EventQueue(new NodePriorityQueue())

@@ -99,12 +90,12 @@ const flushInterval = settings.flushInterval ?? 10000

this.ready = this.register(
configureNodePlugin({
writeKey: settings.writeKey,
host: settings.host,
path: settings.path,
maxRetries: settings.maxRetries ?? 3,
maxEventsInBatch: settings.maxEventsInBatch ?? 15,
flushInterval,
})
).then(() => undefined)
const { plugin, publisher } = createConfiguredNodePlugin({
writeKey: settings.writeKey,
host: settings.host,
path: settings.path,
maxRetries: settings.maxRetries ?? 3,
maxEventsInBatch: settings.maxEventsInBatch ?? 15,
flushInterval,
})
this._publisher = publisher
this.ready = this.register(plugin).then(() => undefined)

@@ -131,2 +122,3 @@ this.emit('initialize', settings)

} = {}): Promise<void> {
this._publisher.flushAfterClose(this._pendingEvents)
this._isClosed = true

@@ -151,5 +143,3 @@ const promise = new Promise<void>((resolve) => {

dispatchAndEmit(segmentEvent, this.queue, this, {
callback: callback,
})
dispatchAndEmit(segmentEvent, this._queue, this, callback)
.catch((ctx) => ctx)

@@ -169,16 +159,25 @@ .finally(() => {

*/
alias({
userId,
previousId,
options,
callback,
}: {
/* The new user id you want to associate with the user. */
userId: string
/* The previous id that the user was recognized by (this can be either a userId or an anonymousId). */
previousId: string
options?: SegmentEventOptions
alias(
{
userId,
previousId,
context,
timestamp,
integrations,
}: {
/* The new user id you want to associate with the user. */
userId: string
/* The previous id that the user was recognized by (this can be either a userId or an anonymousId). */
previousId: string
context?: AdditionalContext
timestamp?: Timestamp
integrations?: Integrations
},
callback?: Callback
}): void {
const segmentEvent = this._eventFactory.alias(userId, previousId, options)
): void {
const segmentEvent = this._eventFactory.alias(userId, previousId, {
context,
integrations,
timestamp,
})
this._dispatch(segmentEvent, callback)

@@ -191,19 +190,26 @@ }

*/
group({
groupId,
userId,
anonymousId,
traits = {},
options = {},
callback,
}: IdentityOptions & {
groupId: string
traits?: Traits
options?: SegmentEventOptions
group(
{
timestamp,
groupId,
userId,
anonymousId,
traits = {},
context,
integrations,
}: IdentityOptions & {
groupId: string
traits?: Traits
context?: AdditionalContext
timestamp?: Timestamp
integrations?: Integrations
},
callback?: Callback
}): void {
): void {
const segmentEvent = this._eventFactory.group(groupId, traits, {
...options,
context,
anonymousId,
userId,
timestamp,
integrations,
})

@@ -218,17 +224,21 @@

*/
identify({
userId,
anonymousId,
traits = {},
options,
callback,
}: IdentityOptions & {
traits?: Traits
options?: SegmentEventOptions
identify(
{
userId,
anonymousId,
traits = {},
context,
integrations,
}: IdentityOptions & {
traits?: Traits
context?: AdditionalContext
integrations?: Integrations
},
callback?: Callback
}): void {
): void {
const segmentEvent = this._eventFactory.identify(userId, traits, {
...options,
context,
anonymousId,
userId,
integrations,
})

@@ -242,22 +252,25 @@ this._dispatch(segmentEvent, callback)

*/
page({
userId,
anonymousId,
category,
name,
properties,
options,
timestamp,
callback,
}: IdentityOptions & {
/* The category of the page. Useful for cases like ecommerce where many pages might live under a single category. */
category?: string
/* The name of the page.*/
name?: string
/* A dictionary of properties of the page. */
properties?: EventProperties
page(
{
userId,
anonymousId,
category,
name,
properties,
context,
timestamp,
integrations,
}: IdentityOptions & {
/* The category of the page. Useful for cases like ecommerce where many pages might live under a single category. */
category?: string
/* The name of the page.*/
name?: string
/* A dictionary of properties of the page. */
properties?: EventProperties
timestamp?: Timestamp
context?: AdditionalContext
integrations?: Integrations
},
callback?: Callback
timestamp?: string | Date
options?: SegmentEventOptions
}): void {
): void {
const segmentEvent = this._eventFactory.page(

@@ -267,3 +280,3 @@ category ?? null,

properties,
{ ...options, anonymousId, userId, timestamp }
{ context, anonymousId, userId, timestamp, integrations }
)

@@ -279,12 +292,15 @@ this._dispatch(segmentEvent, callback)

*/
screen({
userId,
anonymousId,
category,
name,
properties,
options,
callback,
timestamp,
}: Parameters<Analytics['page']>[0]): void {
screen(
{
userId,
anonymousId,
category,
name,
properties,
context,
timestamp,
integrations,
}: Parameters<Analytics['page']>[0],
callback?: Callback
): void {
const segmentEvent = this._eventFactory.screen(

@@ -294,3 +310,3 @@ category ?? null,

properties,
{ ...options, anonymousId, userId, timestamp }
{ context, anonymousId, userId, timestamp, integrations }
)

@@ -305,19 +321,26 @@

*/
track({
userId,
anonymousId,
event,
properties,
options,
callback,
}: IdentityOptions & {
event: string
properties?: EventProperties
options?: SegmentEventOptions
track(
{
userId,
anonymousId,
event,
properties,
context,
timestamp,
integrations,
}: IdentityOptions & {
event: string
properties?: EventProperties
context?: AdditionalContext
timestamp?: Timestamp
integrations?: Integrations
},
callback?: Callback
}): void {
): void {
const segmentEvent = this._eventFactory.track(event, properties, {
...options,
context,
userId,
anonymousId,
timestamp,
integrations,
})

@@ -333,7 +356,7 @@

async register(...plugins: CorePlugin<any, any>[]): Promise<void> {
return this.queue.criticalTasks.run(async () => {
return this._queue.criticalTasks.run(async () => {
const ctx = Context.system()
const registrations = plugins.map((xt) =>
this.queue.register(ctx, xt, this)
this._queue.register(ctx, xt, this)
)

@@ -356,5 +379,5 @@ await Promise.all(registrations)

const deregistrations = pluginNames.map(async (pl) => {
const plugin = this.queue.plugins.find((p) => p.name === pl)
const plugin = this._queue.plugins.find((p) => p.name === pl)
if (plugin) {
return this.queue.deregister(ctx, plugin, this)
return this._queue.deregister(ctx, plugin, this)
} else {

@@ -361,0 +384,0 @@ ctx.log('warn', `plugin ${pl} not found`)

@@ -1,2 +0,2 @@

import { CorePlugin, ValidationError } from '@segment/analytics-core'
import { ValidationError } from '@segment/analytics-core'

@@ -9,5 +9,2 @@ export interface AnalyticsSettings {

/**
* An optional array of additional plugins that are capable of augmenting analytics-node functionality and enriching data.
*/
plugins?: CorePlugin[]
/**

@@ -14,0 +11,0 @@ * The base URL of the API. Default: "https://api.segment.io"

@@ -1,2 +0,2 @@

export * from './app/analytics-node'
export { Analytics, Context, Plugin } from './app/analytics-node'
export type { AnalyticsSettings } from './app/settings'

@@ -3,0 +3,0 @@

import { v4 as uuid } from '@lukeed/uuid'
import { CoreContext, CoreSegmentEvent } from '@segment/analytics-core'
const MAX_EVENT_SIZE_IN_BYTES = 32 * 1024 // 32 KB
const MAX_BATCH_SIZE_IN_BYTES = 480 * 1024 // 480 KB (500 KB is the limit, leaving some padding)
const MAX_EVENT_SIZE_IN_KB = 32
const MAX_BATCH_SIZE_IN_KB = 480 // (500 KB is the limit, leaving some padding)

@@ -21,16 +21,29 @@ interface PendingItem {

}
public tryAdd(item: PendingItem) {
if (this.length === this.maxEventCount) return false
public tryAdd(
item: PendingItem
): { success: true } | { success: false; message: string } {
if (this.length === this.maxEventCount)
return {
success: false,
message: `Event limit of ${this.maxEventCount} has been exceeded.`,
}
const eventSize = this.calculateSize(item.context)
if (eventSize > MAX_EVENT_SIZE_IN_BYTES) {
// Event exceeds Segment's limits
return false
if (eventSize > MAX_EVENT_SIZE_IN_KB * 1024) {
return {
success: false,
message: `Event exceeds maximum event size of ${MAX_EVENT_SIZE_IN_KB} KB`,
}
}
if (this.sizeInBytes + eventSize <= MAX_BATCH_SIZE_IN_BYTES) {
this.items.push(item)
this.sizeInBytes += eventSize
return true
if (this.sizeInBytes + eventSize > MAX_BATCH_SIZE_IN_KB * 1024) {
return {
success: false,
message: `Event has caused batch size to exceed ${MAX_BATCH_SIZE_IN_KB} KB`,
}
}
return false
this.items.push(item)
this.sizeInBytes += eventSize
return { success: true }
}

@@ -37,0 +50,0 @@

@@ -29,7 +29,3 @@ import { CoreContext, CorePlugin } from '@segment/analytics-core'

export function configureNodePlugin(
props: ConfigureNodePluginProps
): SegmentNodePlugin {
const publisher = new Publisher(props)
export function createNodePlugin(publisher: Publisher): SegmentNodePlugin {
function action(ctx: CoreContext): Promise<CoreContext> {

@@ -54,1 +50,9 @@ normalizeEvent(ctx)

}
export const createConfiguredNodePlugin = (props: ConfigureNodePluginProps) => {
const publisher = new Publisher(props)
return {
publisher: publisher,
plugin: createNodePlugin(publisher),
}
}

@@ -32,3 +32,3 @@ import { backoff, CoreContext } from '@segment/analytics-core'

private pendingFlushTimeout?: ReturnType<typeof setTimeout>
private batch?: ContextBatch
private _batch?: ContextBatch

@@ -40,2 +40,3 @@ private _flushInterval: number

private _url: string
private _closeAndFlushPendingItemsCount?: number

@@ -63,6 +64,6 @@ constructor({

const batch = new ContextBatch(this._maxEventsInBatch)
this.batch = batch
this._batch = batch
this.pendingFlushTimeout = setTimeout(() => {
if (batch === this.batch) {
this.batch = undefined
if (batch === this._batch) {
this._batch = undefined
}

@@ -79,5 +80,25 @@ this.pendingFlushTimeout = undefined

this.pendingFlushTimeout && clearTimeout(this.pendingFlushTimeout)
this.batch = undefined
this._batch = undefined
}
flushAfterClose(pendingItemsCount: number) {
if (!pendingItemsCount) {
// if number of pending items is 0, there will never be anything else entering the batch, since the app is closed.
return
}
this._closeAndFlushPendingItemsCount = pendingItemsCount
// if batch is empty, there's nothing to flush, and when things come in, enqueue will handle them.
if (!this._batch) return
// the number of globally pending items will always be larger or the same as batch size.
// Any mismatch is because some globally pending items are in plugins.
const isExpectingNoMoreItems = this._batch.length === pendingItemsCount
if (isExpectingNoMoreItems) {
this.send(this._batch).catch(noop)
this.clearBatch()
}
}
/**

@@ -89,3 +110,3 @@ * Enqueues the context for future delivery.

enqueue(ctx: CoreContext): Promise<CoreContext> {
const batch = this.batch ?? this.createBatch()
const batch = this._batch ?? this.createBatch()

@@ -104,3 +125,3 @@ const { promise: ctxPromise, resolve } = extractPromiseParts<CoreContext>()

Add an event to the existing batch.
Success: Check if batch is full and send if it is.
Success: Check if batch is full or no more items are expected to come in (i.e. closing). If so, send batch.
Failure: Assume event is too big to fit in current batch - send existing batch.

@@ -111,5 +132,8 @@ Add an event to the new batch.

*/
if (batch.tryAdd(pendingItem)) {
if (batch.length === this._maxEventsInBatch) {
const addStatus = batch.tryAdd(pendingItem)
if (addStatus.success) {
const isExpectingNoMoreItems =
batch.length === this._closeAndFlushPendingItemsCount
const isFull = batch.length === this._maxEventsInBatch
if (isFull || isExpectingNoMoreItems) {
this.send(batch).catch(noop)

@@ -119,3 +143,6 @@ this.clearBatch()

return ctxPromise
} else if (batch.length) {
}
// If the new item causes the maximum event size to be exceeded, send the current batch and create a new one.
if (batch.length) {
this.send(batch).catch(noop)

@@ -127,9 +154,8 @@ this.clearBatch()

if (!fallbackBatch.tryAdd(pendingItem)) {
ctx.setFailedDelivery({
reason: new Error(`Event exceeds maximum event size of 32 kb`),
})
return Promise.resolve(ctx)
} else {
if (fallbackBatch.length === this._maxEventsInBatch) {
const fbAddStatus = fallbackBatch.tryAdd(pendingItem)
if (fbAddStatus.success) {
const isExpectingNoMoreItems =
fallbackBatch.length === this._closeAndFlushPendingItemsCount
if (isExpectingNoMoreItems) {
this.send(fallbackBatch).catch(noop)

@@ -139,2 +165,8 @@ this.clearBatch()

return ctxPromise
} else {
// this should only occur if max event size is exceeded
ctx.setFailedDelivery({
reason: new Error(fbAddStatus.message),
})
return Promise.resolve(ctx)
}

@@ -144,2 +176,5 @@ }

private async send(batch: ContextBatch) {
if (this._closeAndFlushPendingItemsCount) {
this._closeAndFlushPendingItemsCount -= batch.length
}
const events = batch.getEvents()

@@ -146,0 +181,0 @@ const payload = JSON.stringify({ batch: events })

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc