@segment/analytics-node - npm Package Compare versions

Comparing version 0.0.1-rc.2 to 0.0.1-rc.3

dist/cjs/app/analytics-node.js


CHANGELOG.md

@@ -7,2 +7,62 @@ # @segment/analytics-node

- Updated dependencies [[`69154c3`](https://github.com/segmentio/analytics-next/commit/69154c31f0739c3d1e31c3fd4d0f075fac721289)]:
- @segment/analytics-core@1.2.2
## 0.0.1
### Patch Changes
- Updated dependencies [[`43897d6`](https://github.com/segmentio/analytics-next/commit/43897d6ffc5f6c7be6a9dec569997348b8c93e51)]:
- @segment/analytics-core@1.2.1
## 0.1.0
### Minor Changes
- [#738](https://github.com/segmentio/analytics-next/pull/738) [`fed489c`](https://github.com/segmentio/analytics-next/commit/fed489cbf2e5b4c0f8423453e24831ec5dcdd7ce) Thanks [@silesky](https://github.com/silesky)! - Make trait fields nullable. Type traits for group() differently than identify() call.
### Patch Changes
- Updated dependencies [[`fed489c`](https://github.com/segmentio/analytics-next/commit/fed489cbf2e5b4c0f8423453e24831ec5dcdd7ce), [`61688e2`](https://github.com/segmentio/analytics-next/commit/61688e251ad2f60dae4cfd65cf59401c29ec66bd)]:
- @segment/analytics-core@1.2.0
## 0.0.1
### Patch Changes
- Updated dependencies [[`80e0d0a`](https://github.com/segmentio/analytics-next/commit/80e0d0a7d074422654cbebe0c3edb90e1d42ad62)]:
- @segment/analytics-core@1.1.6
## 0.0.1
### Patch Changes
- Updated dependencies [[`90b915a`](https://github.com/segmentio/analytics-next/commit/90b915ac3447d76673e98661c54bf5a0ced2a555), [`108c77e`](https://github.com/segmentio/analytics-next/commit/108c77e81a4e9d2a64eb56e78f707ae6c2ea6ed2)]:
- @segment/analytics-core@1.1.5
## 0.0.1
### Patch Changes
- Updated dependencies [[`ecb4b8d`](https://github.com/segmentio/analytics-next/commit/ecb4b8db0194e06a3ee3c8cae57d4f327d15dc02)]:
- @segment/analytics-core@1.1.4
## 0.0.1
### Patch Changes
- Updated dependencies [[`0b9f4d7`](https://github.com/segmentio/analytics-next/commit/0b9f4d7e82662f7d5fda3590e93b10b3fd2e9833)]:
- @segment/analytics-core@1.1.3
## 0.0.1
### Patch Changes
- Updated dependencies [[`98d1b12`](https://github.com/segmentio/analytics-next/commit/98d1b127082f5fc7904980a561220c64c26edff3)]:
- @segment/analytics-core@1.1.2
## 0.0.1
### Patch Changes
- Updated dependencies [[`409cae4`](https://github.com/segmentio/analytics-next/commit/409cae4b9ac404277aa44bab7428186129b42a35)]:

@@ -9,0 +69,0 @@ - @segment/analytics-core@1.1.1


package.json
{
"name": "@segment/analytics-node",
- "version": "0.0.1-rc.2",
- "main": "./dist/cjs/src/index.js",
- "module": "./dist/esm/src/index.js",
- "types": "./dist/types/src/index.d.ts",
+ "version": "0.0.1-rc.3",
+ "main": "./dist/cjs/index.js",
+ "module": "./dist/esm/index.js",
+ "types": "./dist/types/index.d.ts",
"exports": {
- "require": "./dist/cjs/src/index.js",
- "import": "./dist/esm/src/index.js"
+ "require": "./dist/cjs/index.js",
+ "import": "./dist/esm/index.js",
+ "types": "./dist/types/index.d.ts"
},

@@ -18,3 +19,3 @@ "files": [

"engines": {
- "node": ">=12"
+ "node": ">=14"
},

@@ -24,6 +25,7 @@ "scripts": {

"lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'",
- "build": "yarn concurrently 'yarn:build:*'",
+ "build": "rm -rf dist && yarn concurrently 'yarn:build:*'",
"build:cjs": "yarn tsc -p tsconfig.build.json --outDir ./dist/cjs --module commonjs",
+ "version": "sh scripts/version.sh",
"build:esm": "yarn tsc -p tsconfig.build.json",
- "watch": "yarn build:esm --watch --incremental",
+ "watch": "yarn build:esm --watch",
"watch:test": "yarn test --watch",

@@ -37,9 +39,10 @@ "tsc": "yarn run -T tsc",

"dependencies": {
- "@segment/analytics-core": "1.1.1",
+ "@lukeed/uuid": "^2.0.0",
+ "@segment/analytics-core": "1.2.2",
"node-fetch": "^2.6.7",
- "tslib": "^2.4.0"
+ "tslib": "^2.4.1"
},
"devDependencies": {
"@internal/config": "0.0.0",
- "@types/node": "^12.12.14"
+ "@types/node": "^14"
},

@@ -46,0 +49,0 @@ "packageManager": "yarn@3.2.1",

@@ -0,6 +1,7 @@

# @segment/analytics-node
> ### Warning: This library is in [public beta](https://segment.com/legal/first-access-beta-preview) ⚠️
## Warning: Until 1.x release, use this library at your own risk!
While the API is very similar, the documentation for the legacy SDK (`analytics-node`) is here: https://segment.com/docs/connections/sources/catalog/libraries/server/node/
## Requirements
- Node.js >= 14
## Quick Start

@@ -17,9 +18,12 @@ ### Install library

### Usage (assuming some express-like web framework)
### Usage
Assuming some express-like web framework.
```ts
import { Analytics } from '@segment/analytics-node'
// or, if you use require:
const { Analytics } = require('@segment/analytics-node')
// instantiation
const analytics = new Analytics({ writeKey: '<MY_WRITE_KEY>' })
app.post('/login', (req, res) => {

@@ -30,2 +34,3 @@ analytics.identify({

})
res.sendStatus(200)
})

@@ -39,21 +44,9 @@

})
res.sendStatus(201)
});
```
## Complete Settings / Configuration
See the complete list of settings in the [AnalyticsSettings interface](src/app/settings.ts).
```ts
new Analytics({
writeKey: '<MY_WRITE_KEY>',
host: 'https://api.segment.io',
path: '/v1/batch',
flushInterval: 10000,
plugins: [plugin1, plugin2],
// ... and more!
})
```
## Graceful Shutdown
### Avoid losing events on exit!
## Graceful Exit
Avoid losing events on exit!
* Call `.closeAndFlush()` to stop collecting new events and flush all existing events.

@@ -66,29 +59,15 @@ * If a callback on an event call is included, this also waits for all callbacks to be called, and any of their subsequent promises to be resolved.

### Graceful Shutdown: Advanced Example
#### Advanced Example
```ts
import { Analytics } from '@segment/analytics-node'
import express from 'express'
const analytics = new Analytics({ writeKey: '<MY_WRITE_KEY>' })
const app = express()
app.post('/cart', (req, res) => {
analytics.track({
userId: req.body.userId,
event: 'Add to cart',
properties: { productId: '123456' }
})
});
const server = app.listen(3000)
const onExit = async () => {
console.log("Gracefully closing server...");
await analytics.closeAndFlush() // flush all existing events
server.close(() => process.exit());
};
process.on("SIGINT", onExit);
process.on("SIGTERM", onExit);
await analytics.closeAndFlush()
server.close(() => {
console.log("Gracefully closing server...")
process.exit()
})
}
['SIGINT', 'SIGTERM'].forEach((code) => process.on(code, onExit))
```

@@ -105,16 +84,273 @@

```ts
console.log(unflushedEvents) // all events that came in after closeAndFlush was called
```
## Configuration Settings
See the complete list of settings in the [AnalyticsSettings interface](src/app/settings.ts).
```ts
const analytics = new Analytics({
writeKey: '<MY_WRITE_KEY>',
host: 'https://api.segment.io',
path: '/v1/batch',
maxRetries: 3,
maxEventsInBatch: 15,
flushInterval: 10000,
// ... and more!
})
```
## Regional configuration
For Business plans with access to Regional Segment, you can use the host configuration parameter to send data to the desired region:
- Oregon (Default) — api.segment.io/v1
- Dublin — events.eu1.segmentapis.com
An example of setting the host to the EU endpoint using the Node library would be:
```ts
const analytics = new Analytics({
...
host: "https://events.eu1.segmentapis.com"
});
```
## Event Emitter
```ts
// listen globally to events
analytics.on('identify', (ctx) => console.log(ctx.event))
// listen for errors (if needed)
analytics.on('error', (err) => console.error(err))
```
## Batching
Our libraries are built to support high-performance environments, so it is safe to use our Node library on a web server that’s serving thousands of requests per second.
Calling a method does not trigger an HTTP request immediately; the message is queued in memory instead. Messages are then flushed in batches in the background, which allows for much faster operation.
By default, our library will flush:
- Every 15 messages (controlled by `settings.maxEventsInBatch`).
- If 10 seconds have passed since the last flush (controlled by `settings.flushInterval`).
There is a maximum of 500KB per batch request and 32KB per call.
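As a rough sketch (an approximation for illustration, not the library's exact accounting), you can pre-check whether a single message fits under the 32KB per-call limit by measuring the UTF-8 byte length of its JSON serialization:

```ts
// Rough sketch only: the library's own size accounting may differ slightly.
const MAX_EVENT_SIZE_IN_BYTES = 32 * 1024

// Each percent-encoded sequence (%XX) stands for exactly one UTF-8 byte.
function utf8ByteLength(s: string): number {
  return encodeURIComponent(s).replace(/%[0-9A-F]{2}/gi, 'x').length
}

function fitsEventSizeLimit(event: unknown): boolean {
  return utf8ByteLength(JSON.stringify(event)) <= MAX_EVENT_SIZE_IN_BYTES
}
```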
If you don’t want to batch messages, you can turn batching off by setting the `maxEventsInBatch` setting to 1, like so:
```ts
const analytics = new Analytics({
...
maxEventsInBatch: 1
});
```
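The two flush triggers described above can be sketched as a small decision function (illustrative only; this is not the library's internal implementation):

```ts
// Illustrative sketch of the flush rules: flush when the batch is full,
// or when the flush interval has elapsed since the last flush.
function shouldFlush(
  queuedCount: number,
  msSinceLastFlush: number,
  maxEventsInBatch = 15,
  flushInterval = 10000
): boolean {
  if (queuedCount >= maxEventsInBatch) return true
  return msSinceLastFlush >= flushInterval
}
```

With `maxEventsInBatch: 1`, the first condition is met on every call, which is why that setting effectively disables batching.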
Batching means that your message might not get sent right away. But every method call takes an optional callback, which you can use to know when a particular message is flushed from the queue, like so:
```ts
analytics.track({
userId: '019mr8mf4r',
event: 'Ultimate Played',
},
(err, ctx) => {
...
}
)
```
## Error Handling
Subscribe and log all event delivery errors.
```ts
const analytics = new Analytics({ writeKey: '<MY_WRITE_KEY>' })
analytics.on('error', (err) => console.error(err))
```
## Event Emitter Interface
You can see the complete list of emitted events in [emitter.ts](src/app/emitter.ts)
```ts
analytics.on('error', (err) => console.error(err))
analytics.on('identify', (ctx) => console.log(ctx))
analytics.on('track', (ctx) => console.log(ctx))
```
#### You can use the emitter to log all HTTP Requests
```ts
analytics.on('http_request', (event) => console.log(event))
// when triggered, emits an event of the shape:
{
url: 'https://api.segment.io/v1/batch',
method: 'POST',
headers: {
'Content-Type': 'application/json',
...
},
body: '...',
}
```
## Multiple Clients
Different parts of your application may require different types of batching, or even sending to multiple Segment sources. In that case, you can initialize multiple instances of Analytics with different settings:
```ts
const marketingAnalytics = new Analytics({ writeKey: 'MARKETING_WRITE_KEY' });
const appAnalytics = new Analytics({ writeKey: 'APP_WRITE_KEY' });
```
## Troubleshooting
1. Double check that you’ve followed all the steps in the Quick Start.
2. Make sure that you’re calling a Segment API method once the library is successfully installed: identify, track, etc.
3. Log events.
```js
['initialize', 'call_after_close',
'screen', 'identify', 'group',
'track', 'ready', 'alias',
'page', 'error', 'register',
'deregister'].forEach((event) => analytics.on(event, console.log))
```
## Development: Disabling Analytics for Tests
- If you want to intercept / disable analytics for integration tests, you can use something like [nock](https://github.com/nock/nock)
```ts
// Note: nock will _not_ work if you polyfill fetch with something like undici, as nock uses the http module. Undici has its own interception method.
import nock from 'nock'
nock('https://api.segment.io')
.post('/v1/batch')
.reply(201)
.persist()
```
## Differences from legacy analytics-node / Migration Guide
- Named imports.
```ts
// old
import Analytics from 'analytics-node'
// new
import { Analytics } from '@segment/analytics-node'
```
- Instantiation now requires an _object_ as the first argument.
```ts
// old
var analytics = new Analytics('YOUR_WRITE_KEY'); // not supported
// new!
const analytics = new Analytics({ writeKey: '<MY_WRITE_KEY>' })
```
- Graceful shutdown (See Graceful Shutdown section)
```ts
// old
await analytics.flush(function(err, batch) {
console.log('Flushed, and now this program can exit!');
});
// new
await analytics.closeAndFlush()
```
Other Differences:
- The `enable` configuration option has been removed; see the "Disabling Analytics" section.
- The `errorHandler` configuration option has been removed; see the "Error Handling" section.
- The `flushAt` configuration option has been renamed to `maxEventsInBatch`.
- The `callback` call signature is different:
```ts
// old
(err, batch) => void
// new
(err, ctx) => void
```
- Undocumented behavior around `track` properties has been removed.
```ts
// old, undocumented behavior
analytics.track({
...
event: 'Ultimate Played',
myProp: 'abc'
})
// new
analytics.track({
...
event: 'Ultimate Played',
properties: {
myProp: 'abc'
}
})
```
## Plugin Architecture
- See segment's [documentation for plugin architecture](https://segment.com/docs/connections/sources/catalog/libraries/website/javascript/#plugin-architecture).
```ts
import type { Plugin } from '@segment/analytics-node'
export const lowercase: Plugin = {
name: 'Lowercase events',
type: 'enrichment',
version: '1.0.0',
isLoaded: () => true,
load: () => Promise.resolve(),
track: (ctx) => {
ctx.updateEvent('event', ctx.event.event.toLowerCase())
return ctx
}
}
analytics.register(lowercase)
```
## Selecting Destinations
The alias, group, identify, page and track calls can all be passed an object of integrations that lets you turn certain destinations on or off. By default all destinations are enabled.
Here’s an example with the integrations object shown:
```ts
analytics.track({
event: 'Membership Upgraded',
userId: '97234974',
integrations: {
'All': false,
'Vero': true,
'Google Analytics': false
}
})
```
In this case, we’re specifying that we want this track to only go to Vero. All: false says that no destination should be enabled unless otherwise specified. Vero: true turns on Vero, etc.
Destination flags are case sensitive and match the [destination’s name in the docs](https://segment.com/docs/connections/destinations) (i.e. “AdLearn Open Platform”, “awe.sm”, “MailChimp”, etc.). In some cases, there may be several names for a destination; if that happens you’ll see a “Adding (destination name) to the Integrations Object” section in the destination’s doc page with a list of valid names.
Note:
- For Business plans, filtering track calls can be done right from the Segment UI on your source schema page. We recommend using the UI if possible since it’s a much simpler way of managing your filters and can be updated with no code changes on your side.
- If you are on a grandfathered plan, events sent server-side that are filtered through the Segment dashboard will still count towards your API usage.
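The resolution rules described above (an explicit per-destination flag wins; otherwise fall back to `All`, which defaults to enabled) can be sketched as follows. This is illustrative logic only, not Segment's server-side implementation:

```ts
// Illustrative only: resolve whether a destination is enabled for an event,
// given an integrations object like { All: false, Vero: true }.
type Integrations = { [destination: string]: boolean | undefined }

function isDestinationEnabled(name: string, integrations: Integrations): boolean {
  // An explicit per-destination flag takes precedence...
  const explicit = integrations[name]
  if (explicit !== undefined) return explicit
  // ...otherwise fall back to 'All'; destinations default to enabled.
  return integrations['All'] ?? true
}
```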
## Usage in AWS Lambda
- The [AWS lambda execution environment](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html) is challenging for typically non-response-blocking async activities like tracking or logging, since the runtime terminates / freezes after a response is emitted.
Here is an example of using the library within a handler:
```ts
const { Analytics } = require("@segment/analytics-node");
// since analytics has the potential to be stateful if there are any plugins added,
// to be on the safe side, we should instantiate a new instance of analytics on every request (the cost of instantiation is low).
const analytics = () => new Analytics({
maxEventsInBatch: 1,
writeKey: '<MY_WRITE_KEY>',
})
.on("error", console.error);
module.exports.handler = async (event) => {
...
// we need to await before returning, otherwise the lambda will exit before sending the request.
await new Promise((resolve) =>
analytics().track({ event: 'My Event', anonymousId: 'foo' }, resolve)
)
return {
statusCode: 200,
};
};
```

@@ -0,79 +1,30 @@

import { CoreAnalytics, bindAll, pTimeout } from '@segment/analytics-core'
import { AnalyticsSettings, validateSettings } from './settings'
import { version } from '../generated/version'
import { createConfiguredNodePlugin } from '../plugins/segmentio'
import { NodeEventFactory } from './event-factory'
import { Callback, dispatchAndEmit } from './dispatch-emit'
import { NodeEmitter } from './emitter'
import {
EventProperties,
Traits,
Emitter,
CoreAnalytics,
CoreContext,
CorePlugin,
EventFactory,
EventQueue,
dispatchAndEmit,
CoreOptions,
Callback,
CoreSegmentEvent,
bindAll,
PriorityQueue,
CoreEmitterContract,
pTimeout,
} from '@segment/analytics-core'
import { AnalyticsSettings, validateSettings } from './settings'
import { version } from '../../package.json'
import { configureNodePlugin } from '../plugins/segmentio'
AliasParams,
GroupParams,
IdentifyParams,
PageParams,
TrackParams,
Plugin,
SegmentEvent,
} from './types'
import { Context } from './context'
import { NodeEventQueue } from './event-queue'
// create a derived class since we may want to add node specific things to Context later
export class Context extends CoreContext {}
/**
* An ID associated with the user. Note: at least one of userId or anonymousId must be included.
**/
type IdentityOptions =
| { userId: string; anonymousId?: string }
| { userId?: string; anonymousId: string }
/** Events from CoreOptions */
export interface SegmentEventOptions {
context?: Context
timestamp?: CoreOptions['timestamp']
}
/**
* Map of emitter event names to method args.
*/
type NodeEmitterEvents = CoreEmitterContract<Context> & {
initialize: [AnalyticsSettings]
call_after_close: [SegmentEvent] // any event that did not get dispatched due to close
drained: []
}
class NodePriorityQueue extends PriorityQueue<Context> {
constructor() {
super(3, [])
}
// do not use an internal "seen" map
getAttempts(ctx: Context): number {
return ctx.attempts ?? 0
}
updateAttempts(ctx: Context): number {
ctx.attempts = this.getAttempts(ctx) + 1
return this.getAttempts(ctx)
}
}
type SegmentEventType = 'track' | 'page' | 'identify' | 'alias' | 'screen'
export interface SegmentEvent extends CoreSegmentEvent {
type: SegmentEventType
options?: SegmentEventOptions
}
export class Analytics
extends Emitter<NodeEmitterEvents>
implements CoreAnalytics
{
private _eventFactory: EventFactory
export class Analytics extends NodeEmitter implements CoreAnalytics {
private readonly _eventFactory: NodeEventFactory
private _isClosed = false
private _pendingEvents = 0
private readonly _closeAndFlushDefaultTimeout: number
private readonly _publisher: ReturnType<
typeof createConfiguredNodePlugin
>['publisher']
queue: EventQueue
private readonly _queue: NodeEventQueue

@@ -85,5 +36,6 @@ ready: Promise<void>

validateSettings(settings)
this._eventFactory = new EventFactory()
this.queue = new EventQueue(new NodePriorityQueue())
this._eventFactory = new NodeEventFactory()
this._queue = new NodeEventQueue()
const flushInterval = settings.flushInterval ?? 10000

@@ -93,4 +45,4 @@

this.ready = this.register(
configureNodePlugin({
const { plugin, publisher } = createConfiguredNodePlugin(
{
writeKey: settings.writeKey,

@@ -101,5 +53,9 @@ host: settings.host,

maxEventsInBatch: settings.maxEventsInBatch ?? 15,
httpRequestTimeout: settings.httpRequestTimeout,
flushInterval,
})
).then(() => undefined)
},
this as NodeEmitter
)
this._publisher = publisher
this.ready = this.register(plugin).then(() => undefined)

@@ -126,2 +82,3 @@ this.emit('initialize', settings)

} = {}): Promise<void> {
this._publisher.flushAfterClose(this._pendingEvents)
this._isClosed = true

@@ -138,3 +95,3 @@ const promise = new Promise<void>((resolve) => {

private _dispatch(segmentEvent: CoreSegmentEvent, callback?: Callback) {
private _dispatch(segmentEvent: SegmentEvent, callback?: Callback) {
if (this._isClosed) {

@@ -147,5 +104,3 @@ this.emit('call_after_close', segmentEvent as SegmentEvent)

dispatchAndEmit(segmentEvent, this.queue, this, {
callback: callback,
})
dispatchAndEmit(segmentEvent, this._queue, this, callback)
.catch((ctx) => ctx)

@@ -165,16 +120,11 @@ .finally(() => {

*/
alias({
userId,
previousId,
options,
callback,
}: {
/* The new user id you want to associate with the user. */
userId: string
/* The previous id that the user was recognized by (this can be either a userId or an anonymousId). */
previousId: string
options?: SegmentEventOptions
alias(
{ userId, previousId, context, timestamp, integrations }: AliasParams,
callback?: Callback
}): void {
const segmentEvent = this._eventFactory.alias(userId, previousId, options)
): void {
const segmentEvent = this._eventFactory.alias(userId, previousId, {
context,
integrations,
timestamp,
})
this._dispatch(segmentEvent, callback)

@@ -187,19 +137,20 @@ }

*/
group({
groupId,
userId,
anonymousId,
traits = {},
options = {},
callback,
}: IdentityOptions & {
groupId: string
traits?: Traits
options?: SegmentEventOptions
group(
{
timestamp,
groupId,
userId,
anonymousId,
traits = {},
context,
integrations,
}: GroupParams,
callback?: Callback
}): void {
): void {
const segmentEvent = this._eventFactory.group(groupId, traits, {
...options,
context,
anonymousId,
userId,
timestamp,
integrations,
})

@@ -214,17 +165,19 @@

*/
identify({
userId,
anonymousId,
traits = {},
options,
callback,
}: IdentityOptions & {
traits?: Traits
options?: SegmentEventOptions
identify(
{
userId,
anonymousId,
traits = {},
context,
timestamp,
integrations,
}: IdentifyParams,
callback?: Callback
}): void {
): void {
const segmentEvent = this._eventFactory.identify(userId, traits, {
...options,
context,
anonymousId,
userId,
timestamp,
integrations,
})

@@ -238,22 +191,15 @@ this._dispatch(segmentEvent, callback)

*/
page({
userId,
anonymousId,
category,
name,
properties,
options,
timestamp,
callback,
}: IdentityOptions & {
/* The category of the page. Useful for cases like ecommerce where many pages might live under a single category. */
category?: string
/* The name of the page.*/
name?: string
/* A dictionary of properties of the page. */
properties?: EventProperties
page(
{
userId,
anonymousId,
category,
name,
properties,
context,
timestamp,
integrations,
}: PageParams,
callback?: Callback
timestamp?: string | Date
options?: SegmentEventOptions
}): void {
): void {
const segmentEvent = this._eventFactory.page(

@@ -263,3 +209,3 @@ category ?? null,

properties,
{ ...options, anonymousId, userId, timestamp }
{ context, anonymousId, userId, timestamp, integrations }
)

@@ -275,12 +221,15 @@ this._dispatch(segmentEvent, callback)

*/
screen({
userId,
anonymousId,
category,
name,
properties,
options,
callback,
timestamp,
}: Parameters<Analytics['page']>[0]): void {
screen(
{
userId,
anonymousId,
category,
name,
properties,
context,
timestamp,
integrations,
}: PageParams,
callback?: Callback
): void {
const segmentEvent = this._eventFactory.screen(

@@ -290,3 +239,3 @@ category ?? null,

properties,
{ ...options, anonymousId, userId, timestamp }
{ context, anonymousId, userId, timestamp, integrations }
)

@@ -301,19 +250,20 @@

*/
track({
userId,
anonymousId,
event,
properties,
options,
callback,
}: IdentityOptions & {
event: string
properties?: EventProperties
options?: SegmentEventOptions
track(
{
userId,
anonymousId,
event,
properties,
context,
timestamp,
integrations,
}: TrackParams,
callback?: Callback
}): void {
): void {
const segmentEvent = this._eventFactory.track(event, properties, {
...options,
context,
userId,
anonymousId,
timestamp,
integrations,
})

@@ -328,8 +278,8 @@

*/
async register(...plugins: CorePlugin<any, any>[]): Promise<void> {
return this.queue.criticalTasks.run(async () => {
register(...plugins: Plugin[]): Promise<void> {
return this._queue.criticalTasks.run(async () => {
const ctx = Context.system()
const registrations = plugins.map((xt) =>
this.queue.register(ctx, xt, this)
this._queue.register(ctx, xt, this)
)

@@ -351,6 +301,6 @@ await Promise.all(registrations)

const deregistrations = pluginNames.map(async (pl) => {
const plugin = this.queue.plugins.find((p) => p.name === pl)
const deregistrations = pluginNames.map((pl) => {
const plugin = this._queue.plugins.find((p) => p.name === pl)
if (plugin) {
return this.queue.deregister(ctx, plugin, this)
return this._queue.deregister(ctx, plugin, this)
} else {

@@ -357,0 +307,0 @@ ctx.log('warn', `plugin ${pl} not found`)

@@ -1,2 +0,2 @@

import { CorePlugin, ValidationError } from '@segment/analytics-core'
import { ValidationError } from '@segment/analytics-core'

@@ -9,5 +9,2 @@ export interface AnalyticsSettings {

/**
* An optional array of additional plugins that are capable of augmenting analytics-node functionality and enriching data.
*/
plugins?: CorePlugin[]
/**

@@ -33,2 +30,6 @@ * The base URL of the API. Default: "https://api.segment.io"

flushInterval?: number
/**
* The maximum number of milliseconds to wait for an http request. Default: 10000
*/
httpRequestTimeout?: number
}

@@ -35,0 +36,0 @@

@@ -1,2 +0,17 @@

export * from './app/analytics-node'
export { Analytics } from './app/analytics-node'
export { Context } from './app/context'
export type {
Plugin,
GroupTraits,
UserTraits,
TrackParams,
IdentifyParams,
AliasParams,
GroupParams,
PageParams,
} from './app/types'
export type { AnalyticsSettings } from './app/settings'
// export Analytics as both a named export and a default export (for backwards-compat. reasons)
import { Analytics } from './app/analytics-node'
export default Analytics
import { v4 as uuid } from '@lukeed/uuid'
import { CoreContext, CoreSegmentEvent } from '@segment/analytics-core'
import type { Context } from '../../app/context'
import { SegmentEvent } from '../../app/types'
const MAX_EVENT_SIZE_IN_BYTES = 32 * 1024 // 32 KB
const MAX_BATCH_SIZE_IN_BYTES = 480 * 1024 // 480 KB (500 KB is the limit, leaving some padding)
const MAX_EVENT_SIZE_IN_KB = 32
const MAX_BATCH_SIZE_IN_KB = 480 // (500 KB is the limit, leaving some padding)
interface PendingItem {
resolver: (ctx: CoreContext) => void
context: CoreContext
resolver: (ctx: Context) => void
context: Context
}

@@ -21,16 +22,29 @@

}
public tryAdd(item: PendingItem) {
if (this.length === this.maxEventCount) return false
public tryAdd(
item: PendingItem
): { success: true } | { success: false; message: string } {
if (this.length === this.maxEventCount)
return {
success: false,
message: `Event limit of ${this.maxEventCount} has been exceeded.`,
}
const eventSize = this.calculateSize(item.context)
if (eventSize > MAX_EVENT_SIZE_IN_BYTES) {
// Event exceeds Segment's limits
return false
if (eventSize > MAX_EVENT_SIZE_IN_KB * 1024) {
return {
success: false,
message: `Event exceeds maximum event size of ${MAX_EVENT_SIZE_IN_KB} KB`,
}
}
if (this.sizeInBytes + eventSize <= MAX_BATCH_SIZE_IN_BYTES) {
this.items.push(item)
this.sizeInBytes += eventSize
return true
if (this.sizeInBytes + eventSize > MAX_BATCH_SIZE_IN_KB * 1024) {
return {
success: false,
message: `Event has caused batch size to exceed ${MAX_BATCH_SIZE_IN_KB} KB`,
}
}
return false
this.items.push(item)
this.sizeInBytes += eventSize
return { success: true }
}

@@ -42,7 +56,7 @@

private calculateSize(ctx: CoreContext): number {
private calculateSize(ctx: Context): number {
return encodeURI(JSON.stringify(ctx.event)).split(/%..|i/).length
}
getEvents(): CoreSegmentEvent[] {
getEvents(): SegmentEvent[] {
const events = this.items.map(({ context }) => context.event)

@@ -52,3 +66,3 @@ return events

getContexts(): CoreContext[] {
getContexts(): Context[] {
return this.items.map((item) => item.context)

@@ -55,0 +69,0 @@ }

@@ -1,8 +0,16 @@

import { CoreContext, CorePlugin } from '@segment/analytics-core'
import { Publisher, PublisherProps } from './publisher'
import { version } from '../../generated/version'
import { detectRuntime } from '../../lib/env'
import { Plugin } from '../../app/types'
import { Context } from '../../app/context'
import { NodeEmitter } from '../../app/emitter'
function normalizeEvent(ctx: CoreContext) {
ctx.updateEvent('context.library.name', 'AnalyticsNode')
ctx.updateEvent('context.library.version', '1.0.0')
ctx.updateEvent('_metadata.nodeVersion', process.versions.node)
function normalizeEvent(ctx: Context) {
ctx.updateEvent('context.library.name', '@segment/analytics-node')
ctx.updateEvent('context.library.version', version)
const runtime = detectRuntime()
if (runtime === 'node') {
ctx.updateEvent('_metadata.nodeVersion', process.versions.node)
}
ctx.updateEvent('_metadata.jsRuntime', runtime)
}

@@ -23,13 +31,8 @@

type SegmentNodePlugin = CorePlugin &
Required<Pick<CorePlugin, DefinedPluginFields>>
type SegmentNodePlugin = Plugin & Required<Pick<Plugin, DefinedPluginFields>>
export type ConfigureNodePluginProps = PublisherProps
export function configureNodePlugin(
props: ConfigureNodePluginProps
): SegmentNodePlugin {
const publisher = new Publisher(props)
function action(ctx: CoreContext): Promise<CoreContext> {
export function createNodePlugin(publisher: Publisher): SegmentNodePlugin {
function action(ctx: Context): Promise<Context> {
normalizeEvent(ctx)

@@ -53,1 +56,12 @@ return publisher.enqueue(ctx)

}
export const createConfiguredNodePlugin = (
props: ConfigureNodePluginProps,
emitter: NodeEmitter
) => {
const publisher = new Publisher(props, emitter)
return {
publisher: publisher,
plugin: createNodePlugin(publisher),
}
}

@@ -1,6 +0,10 @@

import { backoff, CoreContext } from '@segment/analytics-core'
import fetch from 'node-fetch'
import { backoff } from '@segment/analytics-core'
import { abortSignalAfterTimeout } from '../../lib/abort'
import type { Context } from '../../app/context'
import { tryCreateFormattedUrl } from '../../lib/create-url'
import { extractPromiseParts } from '../../lib/extract-promise-parts'
import { fetch } from '../../lib/fetch'
import { ContextBatch } from './context-batch'
import { NodeEmitter } from '../../app/emitter'
import { b64encode } from '../../lib/base-64-encode'

@@ -14,4 +18,4 @@ function sleep(timeoutInMs: number): Promise<void> {

interface PendingItem {
resolver: (ctx: CoreContext) => void
context: CoreContext
resolver: (ctx: Context) => void
context: Context
}

@@ -26,2 +30,3 @@

writeKey: string
httpRequestTimeout?: number
}

@@ -34,3 +39,3 @@

private pendingFlushTimeout?: ReturnType<typeof setTimeout>
private batch?: ContextBatch
private _batch?: ContextBatch

@@ -42,15 +47,22 @@ private _flushInterval: number

private _url: string
constructor({
host,
path,
maxRetries,
maxEventsInBatch,
flushInterval,
writeKey,
}: PublisherProps) {
private _closeAndFlushPendingItemsCount?: number
private _httpRequestTimeout: number
private _emitter: NodeEmitter
constructor(
{
host,
path,
maxRetries,
maxEventsInBatch,
flushInterval,
writeKey,
httpRequestTimeout,
}: PublisherProps,
emitter: NodeEmitter
) {
this._emitter = emitter
this._maxRetries = maxRetries
this._maxEventsInBatch = Math.max(maxEventsInBatch, 1)
this._flushInterval = flushInterval
this._auth = Buffer.from(`${writeKey}:`).toString('base64')
this._auth = b64encode(`${writeKey}:`)
this._url = tryCreateFormattedUrl(

@@ -60,2 +72,3 @@ host ?? 'https://api.segment.io',

)
this._httpRequestTimeout = httpRequestTimeout ?? 10000
}

@@ -66,6 +79,6 @@

const batch = new ContextBatch(this._maxEventsInBatch)
this.batch = batch
this._batch = batch
this.pendingFlushTimeout = setTimeout(() => {
if (batch === this.batch) {
this.batch = undefined
if (batch === this._batch) {
this._batch = undefined
}

@@ -82,5 +95,25 @@ this.pendingFlushTimeout = undefined

this.pendingFlushTimeout && clearTimeout(this.pendingFlushTimeout)
this.batch = undefined
this._batch = undefined
}
flushAfterClose(pendingItemsCount: number) {
if (!pendingItemsCount) {
// if number of pending items is 0, there will never be anything else entering the batch, since the app is closed.
return
}
this._closeAndFlushPendingItemsCount = pendingItemsCount
// if batch is empty, there's nothing to flush, and when things come in, enqueue will handle them.
if (!this._batch) return
// the number of globally pending items will always be larger or the same as batch size.
// Any mismatch is because some globally pending items are in plugins.
const isExpectingNoMoreItems = this._batch.length === pendingItemsCount
if (isExpectingNoMoreItems) {
this.send(this._batch).catch(noop)
this.clearBatch()
}
}
/**

@@ -91,6 +124,6 @@ * Enqueues the context for future delivery.

*/
enqueue(ctx: CoreContext): Promise<CoreContext> {
const batch = this.batch ?? this.createBatch()
enqueue(ctx: Context): Promise<Context> {
const batch = this._batch ?? this.createBatch()
const { promise: ctxPromise, resolve } = extractPromiseParts<CoreContext>()
const { promise: ctxPromise, resolve } = extractPromiseParts<Context>()

@@ -107,3 +140,3 @@ const pendingItem: PendingItem = {

Add an event to the existing batch.
Success: Check if batch is full and send if it is.
Success: Check if batch is full or no more items are expected to come in (i.e. closing). If so, send batch.
Failure: Assume event is too big to fit in current batch - send existing batch.

@@ -114,5 +147,8 @@ Add an event to the new batch.

*/
if (batch.tryAdd(pendingItem)) {
if (batch.length === this._maxEventsInBatch) {
const addStatus = batch.tryAdd(pendingItem)
if (addStatus.success) {
const isExpectingNoMoreItems =
batch.length === this._closeAndFlushPendingItemsCount
const isFull = batch.length === this._maxEventsInBatch
if (isFull || isExpectingNoMoreItems) {
this.send(batch).catch(noop)

@@ -122,3 +158,6 @@ this.clearBatch()

return ctxPromise
} else if (batch.length) {
}
// If the new item causes the maximum event size to be exceeded, send the current batch and create a new one.
if (batch.length) {
this.send(batch).catch(noop)

@@ -130,9 +169,8 @@ this.clearBatch()

if (!fallbackBatch.tryAdd(pendingItem)) {
ctx.setFailedDelivery({
reason: new Error(`Event exceeds maximum event size of 32 kb`),
})
return Promise.resolve(ctx)
} else {
if (fallbackBatch.length === this._maxEventsInBatch) {
const fbAddStatus = fallbackBatch.tryAdd(pendingItem)
if (fbAddStatus.success) {
const isExpectingNoMoreItems =
fallbackBatch.length === this._closeAndFlushPendingItemsCount
if (isExpectingNoMoreItems) {
this.send(fallbackBatch).catch(noop)

@@ -142,2 +180,8 @@ this.clearBatch()

return ctxPromise
} else {
// this should only occur if max event size is exceeded
ctx.setFailedDelivery({
reason: new Error(fbAddStatus.message),
})
return Promise.resolve(ctx)
}

@@ -147,2 +191,5 @@ }

private async send(batch: ContextBatch) {
if (this._closeAndFlushPendingItemsCount) {
this._closeAndFlushPendingItemsCount -= batch.length
}
const events = batch.getEvents()

@@ -157,4 +204,8 @@ const payload = JSON.stringify({ batch: events })

let failureReason: unknown
const [signal, timeoutId] = abortSignalAfterTimeout(
this._httpRequestTimeout
)
try {
const response = await fetch(this._url, {
const requestInit = {
signal: signal,
method: 'POST',

@@ -167,4 +218,15 @@ headers: {

body: payload,
}
this._emitter.emit('http_request', {
url: this._url,
method: requestInit.method,
headers: requestInit.headers,
body: requestInit.body,
})
const response = await fetch(this._url, requestInit)
clearTimeout(timeoutId)
if (response.ok) {

@@ -171,0 +233,0 @@ // Successfully sent events, so exit!
