
Research
/Security News
10 npm Typosquatted Packages Deploy Multi-Stage Credential Harvester
Socket researchers found 10 typosquatted npm packages that auto-run on install, show fake CAPTCHAs, fingerprint by IP, and deploy a credential stealer.
@castore/event-storage-adapter-dynamodb
Advanced tools
DRY Castore EventStorageAdapter implementation using DynamoDB
DRY Castore EventStorageAdapter implementation using AWS DynamoDB.
# npm
npm install @castore/event-storage-adapter-dynamodb
# yarn
yarn add @castore/event-storage-adapter-dynamodb
This package has @castore/core and @aws-sdk/client-dynamodb (above v3) as peer dependencies, so you will have to install them as well:
# npm
npm install @castore/core @aws-sdk/client-dynamodb
# yarn
yarn add @castore/core @aws-sdk/client-dynamodb
This library exposes two adapters:
DynamoDBSingleTableEventStorageAdapter which can plug several event stores to a single DynamoDB table.DynamoDBEventStorageAdapter which needs a DynamoDB table per event store.The legacy DynamoDBEventStorageAdapter is still exposed for backward compatibility. It will be deprecated and renamed LegacyDynamoDBEventStorageAdapter in the v2, to be finally removed in the v3.
Documentation:
DynamoDBSingleTableEventStorageAdapterimport { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBSingleTableEventStorageAdapter } from '@castore/event-storage-adapter-dynamodb';
const dynamoDBClient = new DynamoDBClient({});
const pokemonsEventStorageAdapter = new DynamoDBSingleTableEventStorageAdapter(
{
tableName: 'my-table-name',
dynamoDBClient,
},
);
// π Alternatively, provide a getter
const pokemonsEventStorageAdapter =
new DynamoDBSingleTableEventStorageAdapter({
tableName: () => process.env.MY_TABLE_NAME,
dynamoDBClient,
});
const pokemonsEventStore = new EventStore({
...
eventStorageAdapter: pokemonsEventStorageAdapter,
});
This will directly plug your EventStore to DynamoDB π
This adapter persists aggregates in separate partitions: When persisting an event, its aggregateId, prefixed by the eventStoreId, is used as partition key (string attribute) and its version is used as sort key (number attribute).
A Global Secondary Index is also required to efficiently retrieve the event store aggregates ids (listAggregateIds operation). Only initial events (version = 1) are projected. A KEYS_ONLY projection type is sufficient.
// π Initial event
{
"aggregateId": "POKEMONS#123", // <= Partition key
"version": 1, // <= Sort key
"eventStoreId": "POKEMONS", // <= initialEvents index partition key
"timestamp": "2022-01-01T00:00:00.000Z", // <= initialEvents index sort key
"type": "POKEMON_APPEARED",
"payload": { "name": "Pikachu", "level": 42 },
"metadata": { "trigger": "random" }
}
// π Non-initial event
{
"aggregateId": "POKEMONS#123",
"version": 2,
// Event is not projected on initialEvents index (to limit costs)
"timestamp": "2023-01-01T00:00:00.000Z",
"type": "POKEMON_LEVELED_UP"
}
The getEvents method (which is used by the getAggregate and getExistingAggregate methods of the EventStore class) uses consistent reads, so is always consistent.
The pushEvent method is a write operation and so is always consistent. It is conditioned to avoid race conditions, as required by the Castore specifications.
By design, the listAggregateIds operation can only be eventually consistent (GSIs reads cannot be consistent).
Note that if you define your infrastructure as code in TypeScript, you can directly use this package instead of hard-coding the below values:
import {
EVENT_TABLE_PK, // => aggregateId
EVENT_TABLE_SK, // => version
EVENT_TABLE_INITIAL_EVENT_INDEX_NAME, // => initialEvents
EVENT_TABLE_EVENT_STORE_ID_KEY, // => eventStoreId
EVENT_TABLE_TIMESTAMP_KEY, // => timestamp
} from '@castore/event-storage-adapter-dynamodb';
{
"Type": "AWS::DynamoDB::Table",
"Properties": {
"AttributeDefinitions": [
{ "AttributeName": "aggregateId", "AttributeType": "S" },
{ "AttributeName": "version", "AttributeType": "N" }
{ "AttributeName": "eventStoreId", "AttributeType": "S" },
{ "AttributeName": "timestamp", "AttributeType": "S" }
],
"KeySchema": [
{ "AttributeName": "aggregateId", "KeyType": "HASH" },
{ "AttributeName": "version", "KeyType": "RANGE" }
],
"GlobalSecondaryIndexes": [
{
"IndexName": "initialEvents",
"KeySchema": [
{ "AttributeName": "eventStoreId", "KeyType": "HASH" },
{ "AttributeName": "timestamp", "KeyType": "RANGE" }
],
"Projection": "KEYS_ONLY"
}
]
}
}
import { Table, AttributeType, ProjectionType } from 'aws-cdk-lib/aws-dynamodb';
const { STRING, NUMBER } = AttributeType;
const { KEYS_ONLY } = ProjectionType;
const pokemonsEventsTable = new Table(scope, 'PokemonEvents', {
partitionKey: {
name: 'aggregateId',
type: STRING,
},
sortKey: {
name: 'version',
type: NUMBER,
},
});
pokemonsEventsTable.addGlobalSecondaryIndex({
indexName: 'initialEvents',
partitionKey: {
name: 'eventStoreId',
type: STRING,
},
sortKey: {
name: 'timestamp',
type: STRING,
},
projectionType: KEYS_ONLY,
});
resource "aws_dynamodb_table" "pokemons-events-table" {
hash_key = "aggregateId"
range_key = "version"
attribute {
name = "aggregateId"
type = "S"
}
attribute {
name = "version"
type = "N"
}
attribute {
name = "eventStoreId"
type = "S"
}
attribute {
name = "timestamp"
type = "S"
}
global_secondary_index {
name = "initialEvents"
hash_key = "eventStoreId"
range_key = "timestamp"
projection_type = "KEYS_ONLY"
}
}
This adapter implements the EventGroups API using the DynamoDB Transactions API:
import { EventStore } from '@castore/core';
// π TransactWriteItems N events simultaneously
await EventStore.pushEventGroup(
// events are correctly typed π
eventStoreA.groupEvent(eventA1),
eventStoreA.groupEvent(eventA2),
eventStoreB.groupEvent(eventB),
...
);
Note that:
DynamoDBSingleTableEventStorageAdapterTransactWriteItem API limitations: It can target up to 100 distinct events in one or more DynamoDB tables within the same AWS account and in the same Region.Required IAM permissions for each operations:
getEvents (+ getAggregate, getExistingAggregate): dynamodb:QuerypushEvent: dynamodb:PutItemlistAggregateIds: dynamodb:Query on the initialEvents indexImageParserThis library also exposes a useful ImageParser class to parse DynamoDB stream images from a DynamoDBSingleTableEventStorageAdapter. It will build a correctly typed NotificationMessage ouf of a stream image, unmarshalling it, removing the prefix of the aggregateId in the process and validating the eventStoreId:
import { ImageParser } from '@castore/event-storage-adapter-dynamodb';
const imageParser = new ImageParser({
sourceEventStores: [pokemonsEventStore, trainersEventStore],
});
// π Typed as EventStoreNotificationMessage<
// typeof pokemonsEventStore
// | typeof trainersEventStore...
// >
const notificationMessage = imageParser.parseImage(
streamImage,
// π Optional options
unmarshallOptions,
);
DynamoDBEventStorageAdapterimport { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBEventStorageAdapter } from '@castore/event-storage-adapter-dynamodb';
const dynamoDBClient = new DynamoDBClient({});
const pokemonsEventStorageAdapter = new DynamoDBEventStorageAdapter({
tableName: 'my-table-name',
dynamoDBClient,
});
// π Alternatively, provide a getter
const pokemonsEventStorageAdapter = new DynamoDBEventStorageAdapter({
tableName: () => process.env.MY_TABLE_NAME,
dynamoDBClient,
});
const pokemonsEventStore = new EventStore({
...
eventStorageAdapter: pokemonsEventStorageAdapter
})
This will directly plug your EventStore to DynamoDB π
This adapter persists aggregates in separate partitions: When persisting an event, its aggregateId is used as partition key (string attribute) and its version is used as sort key (number attribute).
A Global Secondary Index is also required to efficiently retrieve the event store aggregates ids (listAggregateIds operation). Only initial events (version = 1) are projected. A KEYS_ONLY projection type is sufficient.
// π Initial event
{
"aggregateId": "123", // <= Partition key
"version": 1, // <= Sort key
"isInitialEvent": 1, // <= initialEvents index partition key
"timestamp": "2022-01-01T00:00:00.000Z", // <= initialEvents index sort key
"type": "POKEMON_APPEARED",
"payload": { "name": "Pikachu", "level": 42 },
"metadata": { "trigger": "random" }
}
// π Non-initial event
{
"aggregateId": "123",
"version": 2,
// Event is not projected on initialEvents index (to limit costs)
"timestamp": "2023-01-01T00:00:00.000Z",
"type": "POKEMON_LEVELED_UP"
}
The getEvents method (which is used by the getAggregate and getExistingAggregate methods of the EventStore class) uses consistent reads, so is always consistent.
The pushEvent method is a write operation and so is always consistent. It is conditioned to avoid race conditions, as required by the Castore specifications.
By design, the listAggregateIds operation can only be eventually consistent (GSIs reads cannot be consistent).
Note that if you define your infrastructure as code in TypeScript, you can directly use this package instead of hard-coding the below values:
import {
EVENT_TABLE_PK, // => aggregateId
EVENT_TABLE_SK, // => version
EVENT_TABLE_INITIAL_EVENT_INDEX_NAME, // => initialEvents
EVENT_TABLE_IS_INITIAL_EVENT_KEY, // => isInitialEvent
EVENT_TABLE_TIMESTAMP_KEY, // => timestamp
} from '@castore/event-storage-adapter-dynamodb';
{
"Type": "AWS::DynamoDB::Table",
"Properties": {
"AttributeDefinitions": [
{ "AttributeName": "aggregateId", "AttributeType": "S" },
{ "AttributeName": "version", "AttributeType": "N" }
{ "AttributeName": "isInitialEvent", "AttributeType": "N" },
{ "AttributeName": "timestamp", "AttributeType": "S" }
],
"KeySchema": [
{ "AttributeName": "aggregateId", "KeyType": "HASH" },
{ "AttributeName": "version", "KeyType": "RANGE" }
],
"GlobalSecondaryIndexes": [
{
"IndexName": "initialEvents",
"KeySchema": [
{ "AttributeName": "isInitialEvent", "KeyType": "HASH" },
{ "AttributeName": "timestamp", "KeyType": "RANGE" }
],
"Projection": "KEYS_ONLY"
}
]
}
}
import { Table, AttributeType, ProjectionType } from 'aws-cdk-lib/aws-dynamodb';
const { STRING, NUMBER } = AttributeType;
const { KEYS_ONLY } = ProjectionType;
const pokemonsEventsTable = new Table(scope, 'PokemonEvents', {
partitionKey: {
name: 'aggregateId',
type: STRING,
},
sortKey: {
name: 'version',
type: NUMBER,
},
});
pokemonsEventsTable.addGlobalSecondaryIndex({
indexName: 'initialEvents',
partitionKey: {
name: 'isInitialEvent',
type: NUMBER,
},
sortKey: {
name: 'timestamp',
type: STRING,
},
projectionType: KEYS_ONLY,
});
resource "aws_dynamodb_table" "pokemons-events-table" {
hash_key = "aggregateId"
range_key = "version"
attribute {
name = "aggregateId"
type = "S"
}
attribute {
name = "version"
type = "N"
}
attribute {
name = "isInitialEvent"
type = "N"
}
attribute {
name = "timestamp"
type = "S"
}
global_secondary_index {
name = "initialEvents"
hash_key = "isInitialEvent"
range_key = "timestamp"
projection_type = "KEYS_ONLY"
}
}
This adapter implements the EventGroups API using the DynamoDB Transactions API:
import { EventStore } from '@castore/core';
// π TransactWriteItems N events simultaneously
await EventStore.pushEventGroup(
// events are correctly typed π
eventStoreA.groupEvent(eventA1),
eventStoreA.groupEvent(eventA2),
eventStoreB.groupEvent(eventB),
...
);
Note that:
DynamoDBEventStorageAdapterTransactWriteItem API limitations: It can target up to 100 distinct events in one or more DynamoDB tables within the same AWS account and in the same Region.Required IAM permissions for each operations:
getEvents (+ getAggregate, getExistingAggregate): dynamodb:QuerypushEvent: dynamodb:PutItemlistAggregateIds: dynamodb:Query on the initialEvents indexFAQs
DRY Castore EventStorageAdapter implementation using DynamoDB
We found that @castore/event-storage-adapter-dynamodb demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago.Β It has 4 open source maintainers collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Research
/Security News
Socket researchers found 10 typosquatted npm packages that auto-run on install, show fake CAPTCHAs, fingerprint by IP, and deploy a credential stealer.

Product
Socket Firewall Enterprise is now available with flexible deployment, configurable policies, and expanded language support.

Security News
Open source dashboard CNAPulse tracks CVE Numbering Authoritiesβ publishing activity, highlighting trends and transparency across the CVE ecosystem.