@convex-dev/aggregate
Convex component to calculate counts and sums of values for efficient aggregation.
Note: Convex Components are currently in beta
This Convex component calculates counts and sums of values for efficient aggregation.
Suppose you have a leaderboard of game scores. These are some operations that the Aggregate component makes easy and efficient:
// Count all the scores.
await aggregate.count(ctx);
// Count the scores above a threshold of 65.
await aggregate.count(ctx, { lower: { key: 65, inclusive: false } });
// Get the score at the 95th-percentile offset.
await aggregate.at(ctx, Math.floor((await aggregate.count(ctx)) * 0.95));
// Compute the average score.
(await aggregate.sum(ctx)) / (await aggregate.count(ctx));
// Get the rank of a score of 65 in the sorted order of all scores.
await aggregate.indexOf(ctx, 65);
// aggregateScoreByUser is the leaderboard scores partitioned by username.
const bounds = { prefix: [username] };
const highScoreForUser = await aggregateScoreByUser.max(ctx, bounds);
const avgScoreForUser =
  (await aggregateScoreByUser.sum(ctx, bounds)) /
  (await aggregateScoreByUser.count(ctx, bounds));
The Aggregate component provides O(log(n))-time lookups, instead of the O(n) that would result from naive usage of .collect() in Convex or COUNT(*) in MySQL or Postgres.
With plain Convex indexes, you can insert new documents and you can paginate through all documents. But sometimes you want big-picture data that encompasses many of your individual data points, without having to fetch them all. That's where aggregates come in.
The Aggregate component keeps a data structure with denormalized counts and sums. It's effectively a key-value store sorted by the key, in which you can count the values and the number of keys that lie between any two keys.
The keys may be arbitrary Convex values, so you can choose how to sort your data. Use key=null for everything if you just want a total count, such as for random access. You can also use sorting to partition your data set, enabling namespacing, multitenancy, sharding, and more.
If you want to keep track of multiple games with scores for each user,
use a tuple of [game, username, score]
as the key.
Then you can bound your queries with a prefix of the key:
- aggregateByGame.count(ctx, { prefix: [game] }) counts how many times a given game has been played.
- aggregateByGame.count(ctx, { prefix: [game, username] }) counts how many times a given user has played a given game.
- aggregateByGame.max(ctx, { prefix: [game, username] }) returns the high score for a given user in a given game.

Pay attention to the sort order when aggregating. While aggregateByGame.max(ctx, { prefix: [game] }) looks like it might give the highest score for a game, it actually gives the entry for the user with the highest username who has played that game (like "Zach"). To get the highest score for a game, you would need to aggregate with key [game, score].
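To see the pitfall concretely, here's a pure-TypeScript toy (no Convex involved; the data is hypothetical) showing that lexicographic comparison of [game, username, score] keys makes the maximum under a [game] prefix depend on the username, not the score:

```typescript
// Keys sort lexicographically: first by game, then username, then score.
type Key = [string, string, number];

function compareKeys(a: Key, b: Key): number {
  if (a[0] !== b[0]) return a[0] < b[0] ? -1 : 1;
  if (a[1] !== b[1]) return a[1] < b[1] ? -1 : 1;
  return a[2] - b[2];
}

const keys: Key[] = [
  ["chess", "Alice", 9000], // the actual high score
  ["chess", "Zach", 10],
];

// The maximum key with prefix ["chess"] belongs to Zach, the highest
// username, even though Alice has the far higher score.
const maxKey = keys.reduce((a, b) => (compareKeys(a, b) >= 0 ? a : b));
console.log(maxKey[1]); // → "Zach"
```

Sorting by [game, score] instead would make the lexicographic maximum coincide with the highest score.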
To support different sorting and partitioning keys, you can define multiple instances. See below for details.
The Aggregate component can calculate all of the examples above efficiently.
You'll need an existing Convex project to use the component. Convex is a hosted backend platform, including a database, serverless functions, and a ton more you can learn about here.
Run npm create convex
or follow any of the quickstarts to set one up.
See example/
for a working demo.
npm install @convex-dev/aggregate
Create a convex.config.ts file in your app's convex/ folder and install the component by calling use:

// convex/convex.config.ts
import { defineApp } from "convex/server";
import aggregate from "@convex-dev/aggregate/convex.config";
const app = defineApp();
app.use(aggregate);
export default app;
You can aggregate multiple tables, multiple sort keys, or multiple values, but you need to make an instance of the aggregate component for each.
You do this by using the aggregate
component multiple times, giving each
usage its own name.
app.use(aggregate, { name: "aggregateScores" });
app.use(aggregate, { name: "aggregateByGame" });
You then use the named aggregate when initializing the TableAggregate, as we'll see below, using components.aggregateScores instead of components.aggregate.
Usually you want to aggregate data in a Convex table. If you're aggregating data that's not in a table, you can use the lower-level API.
For table-based data, you can use the TableAggregate
to define how table data
will be sorted and summed in the aggregate component.
import { components } from "./_generated/api";
import { DataModel } from "./_generated/dataModel";
import { mutation as rawMutation } from "./_generated/server";
import { TableAggregate } from "@convex-dev/aggregate";
const aggregate = new TableAggregate<number, DataModel, "mytable">(
components.aggregate,
{
sortKey: (doc) => doc._creationTime, // Allows querying across time ranges.
sumValue: (doc) => doc.value, // The value to be used in `.sum` calculations.
}
);
Since the table write and the aggregate update happen in the same mutation, you can rest assured that the table and its aggregate will update atomically.
Pick your key as described above. For example,
here's how you might define aggregateByGame
, as an aggregate on the "scores"
table:
const aggregateByGame = new TableAggregate<
[Id<"games">, string, number],
DataModel,
"scores"
>(components.aggregateByGame, {
sortKey: (doc) => [doc.gameId, doc.username, doc.score],
});
When the table changes, you should update the aggregate as well, in the same mutation.
// When you insert into the table, call `aggregate.insert`
const id = await ctx.db.insert("mytable", { foo, bar });
const doc = await ctx.db.get(id);
await aggregate.insert(ctx, doc!);
// If you update a document, use `aggregate.replace`
const oldDoc = await ctx.db.get(id);
await ctx.db.patch(id, { foo });
const newDoc = await ctx.db.get(id);
await aggregate.replace(ctx, oldDoc!, newDoc!);
// And if you delete a document, use `aggregate.delete`
const oldDoc = await ctx.db.get(id);
await ctx.db.delete(id);
await aggregate.delete(ctx, oldDoc!);
It's important that every modification to the table also updates the associated aggregate. See tips below.
Note that Convex mutations are atomic, so you don't need to worry about race conditions where the document is written but the aggregate isn't, and you don't need to worry about a query reading a document that isn't in the aggregate yet.
If the table already has data before attaching the aggregate, run a migration to backfill.
Now that your Aggregate component has all of the data from your table, you can call any of the methods on your instance to aggregate data.
// convex/myfunctions.ts
// then in your queries and mutations you can do
const tableCount = await aggregateByGame.count(ctx);
// or any of the other examples listed above.
See more examples in example/convex/leaderboard.ts, and see the docstrings on the Aggregate class.
To run the examples:

npm i && cd aggregate/example && npm i
npm run dev

and create a new project. Then try calling the leaderboard:addScore and leaderboard:userAverageScore functions.

If you don't need the ordering, partitioning, or summing behavior of TableAggregate, there's a simpler interface you can use: Randomize.
import { components } from "./_generated/api";
import { DataModel } from "./_generated/dataModel";
import { mutation as rawMutation } from "./_generated/server";
import { Randomize } from "@convex-dev/aggregate";
import { customMutation } from "convex-helpers/server/customFunctions";
// This is like TableAggregate but there's no key or sumValue.
const randomize = new Randomize<DataModel, "mytable">(components.aggregate);
// In a mutation, insert into the component when you insert into your table.
const id = await ctx.db.insert("mytable", data);
await randomize.insert(ctx, id);
// As before, delete from the component when you delete from your table
await ctx.db.delete(id);
await randomize.delete(ctx, id);
// in a query, get the total document count.
const totalCount = await randomize.count(ctx);
// get a random document's id.
const randomId = await randomize.random(ctx);
See more examples in example/convex/shuffle.ts, including a paginated random shuffle of some music.
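The idea behind random() can be pictured with plain arrays: draw a uniform offset into the set of documents, something the component can serve efficiently because it supports counting and offset lookup in O(log(n)). A toy sketch, not the component's implementation:

```typescript
// Toy: pick a uniformly random element by drawing a random offset,
// mirroring how a random document can be chosen via count + offset lookup.
const ids = ["a", "b", "c", "d", "e"];

function randomId(): string {
  const offset = Math.floor(Math.random() * ids.length); // 0 <= offset < count
  return ids[offset];
}

console.log(ids.includes(randomId())); // → true
```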
Convex supports infinite-scroll pagination which is reactive so you never have to worry about items going missing from your list. But sometimes you want to display separate pages of results on separate pages of your app.
For this example, imagine you have a table of photos:
// convex/schema.ts
defineSchema({
photos: defineTable({
url: v.string(),
}),
});
And an aggregate defined with key as _creationTime.
// convex/convex.config.ts
app.use(aggregate, { name: "photos" });
// convex/photos.ts
const photos = new TableAggregate<number, DataModel, "photos">(
components.photos,
{ sortKey: (doc) => doc._creationTime }
);
You can pick a page size and jump to any page once you have TableAggregate to map from offset to an index key. In this example, if offset is 100 and numItems is 10, we get the hundredth _creationTime (in ascending order) and starting there we get the next ten documents.
import { query } from "./_generated/server";

export const pageOfPhotos = query({
args: { offset: v.number(), numItems: v.number() },
handler: async (ctx, { offset, numItems }) => {
const { key } = await photos.at(ctx, offset);
return await ctx.db.query("photos")
.withIndex("by_creation_time", (q) => q.gte("_creationTime", key))
.take(numItems);
},
});
See the full example in example/convex/photos.ts.
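The offset-to-key mapping can be modeled without Convex: at(offset) returns the sort key at that position, and a page is everything from that key onward. A toy sketch with hypothetical creation times:

```typescript
// Toy model of offset-based pagination: map an offset to a sort key,
// then take the next numItems entries starting at that key.
const creationTimes = [10, 20, 30, 40, 50, 60]; // sorted ascending

function at(offset: number): number {
  return creationTimes[offset]; // the component does this lookup in O(log n)
}

function pageOfPhotos(offset: number, numItems: number): number[] {
  const key = at(offset);
  return creationTimes.filter((t) => t >= key).slice(0, numItems);
}

console.log(pageOfPhotos(2, 3)); // → [30, 40, 50]
```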
Often you're aggregating over a table of data, but sometimes you want to aggregate data that isn't stored anywhere else. For that, you can use the DirectAggregate interface, which is like TableAggregate except you handle insert, delete, and replace operations yourself.
import { components } from "./_generated/api";
import { DataModel } from "./_generated/dataModel";
import { DirectAggregate } from "@convex-dev/aggregate";
// The first generic parameter (number in this case) is the key.
// The second generic parameter (string in this case) should be unique to
// be a tie-breaker in case two data points have the same key.
const aggregate = new DirectAggregate<number, string>(components.aggregate);
// within a mutation, add values to be aggregated
await aggregate.insert(ctx, key, id);
// if you want to use `.sum` to aggregate sums of values, insert with a sumValue
await aggregate.insert(ctx, key, id, sumValue);
// or delete values that were previously added
await aggregate.delete(ctx, key, id);
// or update values
await aggregate.replace(ctx, oldKey, newKey, id);
See example/convex/stats.ts for an example.
You've set up your aggregate. Now let's see how to backfill it to account for existing data, keep the data in sync with mutations, or repair it when the data gets out of sync.
Adding aggregation to an existing table requires a migration. There are several ways to perform migrations, but here's an overview of one way:

1. Use insertIfDoesNotExist/replaceOrInsert/deleteIfExists in place of insert/replace/delete (or idempotentTrigger in place of trigger) to update items in the aggregate. These methods act the same, except they work even if the aggregate component isn't in sync with the table.
2. Run a migration over the table, calling insertIfDoesNotExist on each document. In the example, you would run runAggregateBackfill in leaderboard.ts.
3. Once the table and its Aggregate are in sync, you can start calling read methods like aggregate.count(ctx) and you can change the write methods back (insertIfDoesNotExist -> insert etc.).

It's important that every modification to the table also updates the associated aggregate. If they get out of sync then computed aggregates might be incorrect. Then you might have to fix them.
There are a few ways to go about keeping data in sync.

One option is to be careful to update the aggregate in every mutation that updates the source-of-truth table, for example by routing all writes through helper functions:

// Example of a mutation that calls `insertScore`.
export const playAGame = mutation(async (ctx) => {
...
await insertScore(ctx, gameId, user1, user1Score);
await insertScore(ctx, gameId, user2, user2Score);
});
// All inserts to the "scores" table go through this function.
async function insertScore(ctx, gameId, username, score) {
const id = await ctx.db.insert("scores", { gameId, username, score });
const doc = await ctx.db.get(id);
await aggregateByGame.insert(ctx, doc!);
}
Another option is to register a Trigger, which automatically runs code when a mutation changes the data in a table:

// Triggers hook up writes to the table to the TableAggregate.
import { Triggers } from "convex-helpers/server/triggers";
import { customCtx, customMutation } from "convex-helpers/server/customFunctions";

const triggers = new Triggers<DataModel>();
triggers.register("mytable", aggregate.trigger());
export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));
The example/convex/photos.ts example uses a trigger.
If some mutation or direct write in the Dashboard updated the source of truth data without writing to the aggregate, they can get out of sync and the returned aggregates may be incorrect.
The simplest way to fix is to start over. Either call
await aggregate.clear(ctx)
or rename the component like
app.use(aggregate, { name: "newName" })
which will reset it to be empty. Then
follow the instructions from above.
There is an alternative which doesn't clear the aggregates: compare the source
of truth to the aggregate table. You can use db.query("mytable").paginate()
on your Convex table and aggregate.paginate()
on the aggregate. Update the
aggregates based on the diff of these two paginated data streams.
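The diff-and-repair approach can be sketched over plain sorted arrays; the real version would walk db.query("mytable").paginate() and aggregate.paginate() the same way (the function name here is illustrative):

```typescript
// Toy: given sorted id lists from the source table and the aggregate,
// compute which ids to insert into / delete from the aggregate.
function diffSorted(table: string[], agg: string[]) {
  const toInsert: string[] = [];
  const toDelete: string[] = [];
  let i = 0;
  let j = 0;
  while (i < table.length || j < agg.length) {
    if (j >= agg.length || (i < table.length && table[i] < agg[j])) {
      toInsert.push(table[i++]); // in the table, missing from the aggregate
    } else if (i >= table.length || agg[j] < table[i]) {
      toDelete.push(agg[j++]); // in the aggregate, gone from the table
    } else {
      i++; j++; // present in both: nothing to repair
    }
  }
  return { toInsert, toDelete };
}

console.log(diffSorted(["a", "b", "d"], ["b", "c", "d"]));
// → { toInsert: ["a"], toDelete: ["c"] }
```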
Like all Convex queries, aggregates are reactive, and updating them is transactional.
If aggregated data updates infrequently, everything runs smoothly. However, if aggregated data updates frequently, the reactivity and atomicity can cause issues: reactive queries can rerun often, and mutations can slow down.
Reactivity means if you query an aggregate, like a count, sum, rank, offset-based page, etc. your UI will automatically update to reflect updates.
If someone gets a new high score, everyone else's leaderboard will show them moving down, and the total count of scores will increase. If I add a new song, it will automatically get shuffled into the music album.
Don't worry about polling to get new results. Don't worry about data needing a few seconds to propagate through the system. And you don't need to refresh your browser. As soon as the data is updated, the aggregates are updated everywhere, including the user's UI.
Transactionality means if you do multiple writes in the same mutation, like adding data to a table and inserting it into an aggregate, those operations are performed together. No query or mutation can observe a race condition where the data exists in the table but not in the aggregate. And if two mutations insert data into an aggregate, the count will go up by two, even if the mutations are running in parallel.
There's a special transactional property of components that is even better than the Convex guarantees you're used to. If you were to keep a denormalized count in a normal Convex mutation, you'd notice that the TypeScript code can run with its async operations interleaved in various orders, messing up the final result.
// You might try to do this before experiencing the wonders of the Aggregate component.
async function increment(ctx: MutationCtx) {
const doc = (await ctx.db.query("count").unique())!;
await ctx.db.patch(doc._id, { value: doc.value + 1 });
}
export const addTwo = mutation({
handler: async (ctx) => {
await Promise.all([increment(ctx), increment(ctx)]);
},
});
When you call the addTwo mutation, the count will increase by... one. That's because TypeScript runs both db.query calls before running the db.patch calls.
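You can reproduce the lost update in plain TypeScript: both increments read the old value before either write lands. A toy simulation, where the await stands in for the database round-trip:

```typescript
// Simulate two concurrent read-modify-write increments on one counter.
async function addTwoNaive(): Promise<number> {
  let value = 0;
  async function increment() {
    const current = value;   // both increments read 0 here
    await Promise.resolve(); // yield, like awaiting ctx.db
    value = current + 1;     // both write 1, losing one update
  }
  await Promise.all([increment(), increment()]);
  return value;
}

addTwoNaive().then((v) => console.log(v)); // → 1, not 2
```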
But with the Aggregate component, the count goes up by two as intended. That's
because component operations are atomic.
export const addTwo = mutation({
handler: async (ctx) => {
await Promise.all([
aggregate.insert(ctx, "some key", "a"),
aggregate.insert(ctx, "other key", "b"),
]);
},
});
You may have noticed that Aggregate methods can be called from actions, unlike ctx.db. This was an accident, but it works, so let's call it a feature! In particular, each Aggregate method called from any context, including from an action, will be atomic within itself. However, we recommend calling the methods from a mutation or query so they can be transactional with other database reads and writes.
Reactivity and transactionality can be amazing for user experience, but if you observe issues with queries rerunning often or mutations slowing down or throwing errors, you may need to learn about the internals of the aggregate component. Specifically, how reads and writes intersect.
When a query calls await aggregate.count(ctx), it depends on the entire aggregate data structure. When any mutation changes the data structure, i.e. insert, delete, or replace, the query reruns and sends new results to the frontend. This is necessary to keep the frontend looking snappy, but it can cause large function call and bandwidth usage on Convex.
When a mutation calls await aggregate.count(ctx), this mutation needs to run transactionally relative to other mutations. Another mutation that does an insert, delete, or replace can cause an OCC conflict.
In order to calculate in O(log(n)) time, the aggregate component stores denormalized counts in an internal data structure. Data points with nearby keys may have their counts accumulated in one place.
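A toy model of that idea (the component's actual internal structure is tree-shaped, so this is only an illustration): group sorted keys into buckets that each carry a denormalized count, so a bounded count reads whole buckets instead of individual keys:

```typescript
// Bucketed denormalized counts: each bucket stores min, max, and a count.
const keys = Array.from({ length: 32 }, (_, i) => i); // sorted keys 0..31
const bucketSize = 4;
const buckets: { min: number; max: number; count: number; keys: number[] }[] = [];
for (let i = 0; i < keys.length; i += bucketSize) {
  const slice = keys.slice(i, i + bucketSize);
  buckets.push({
    min: slice[0],
    max: slice[slice.length - 1],
    count: slice.length,
    keys: slice,
  });
}

// Count keys in [lo, hi]: buckets fully inside the range contribute their
// stored count; only the boundary buckets are scanned key by key.
function countRange(lo: number, hi: number): number {
  let total = 0;
  for (const b of buckets) {
    if (b.max < lo || b.min > hi) continue; // bucket entirely outside range
    if (b.min >= lo && b.max <= hi) total += b.count; // denormalized count
    else total += b.keys.filter((k) => k >= lo && k <= hi).length;
  }
  return total;
}

console.log(countRange(3, 17)); // → 15
```

Note how a write to any key in a bucket touches that bucket's count, which is why nearby keys can make reads rerun and writes conflict.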
Imagine the leaderboard aggregate defined with key=[username, score]. Users "Laura" and "Lauren" have adjacent keys, so there is a node internal to the Aggregate component that includes the counts of Laura and Lauren combined.
If Laura is looking at her own high score, this involves reading from the
internal node shared with Lauren. So when Lauren gets a new high score,
Laura's query reruns (but its result doesn't change). And when Laura and Lauren
both get new scores at the same time, their mutations will run slower to make
the change to the internal node transactional.
Corollary: if a table's aggregate uses a key on _creationTime, each new data point will be added to the same part of the data structure (the end), because _creationTime keeps increasing. Therefore all inserts will wait for each other and no mutations can run in parallel.
To reduce the read dependency footprint of your query, you can partition your
aggregate space and make sure you're using bounds
whenever possible. Examples:
// This query only reads scores between 95 and 100, so in a query it only reruns
// when a score in that range changes, and in a mutation it only conflicts with
// mutations that modify a score in that range.
await aggregateByScore.count(ctx, {
lower: { key: 95, inclusive: false },
upper: { key: 100, inclusive: true },
});
// This query only reads data from a specific user, so it will only rerun or
// conflict when a mutation modifies that user.
await aggregateScoreByUser.count(ctx, { prefix: [username] });
The aggregate data structure internally denormalizes counts so they can be calculated efficiently by only reading a few documents instead of every document in your table.
However, this isn't always required: we can trade off speed and database bandwidth for reduced impact of writes.
By default, the root aggregation document is lazy; it doesn't store a count.
This means aggregate.count(ctx)
has to look at several documents instead of
just one, but it also means that an insert at a very small key won't intersect
with a write or read on a very large key.
If you want to maximize query speed without worrying about conflicts, e.g. because the data changes infrequently but queries are frequent, you can turn off the default behavior by starting over with aggregate.clear(ctx, 16, false), which sets rootLazy to false.
Another way to optimize lazy aggregation is to increase the maxNodeSize of the aggregate data structure. E.g. if the root is lazy and maxNodeSize is the default of 16, each write updates some document that accumulates 1/16th of the entire data structure. So each write will intersect with 1/16th of all other writes, and reads may spuriously rerun 1/16th of the time. To increase maxNodeSize, run aggregate.clear(ctx, maxNodeSize) and start over.
Found a bug? Feature request? File it here.