
@sanity/migrate

@sanity/migrate - npm Package Compare versions

Comparing version 3.25.1-canary.22 to 3.26.2-canary.47
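The diff below tracks a rework of the migration runner: the Migration type now takes a documentTypes?: string[] array instead of a single documentType, exported documents are buffered through a temporary file so the new MigrationContext can expose getDocument, getDocuments and query, the run/runFromArchive/dryRun entry points gain progress reporting and a concurrency limit, and the mutation and patch creators (create, patch, at, set, insert, ...) are now defined and exported by the package itself rather than pulled in from @bjoerge/mutiny. For orientation, a migration written against the new shape might look like the sketch below; it is not part of the diff, the title field and document values are assumptions, and only documentTypes, filter and migrate are visible in the typings further down.

import {defineMigration, set} from '@sanity/migrate'

export default defineMigration({
  title: 'Rename a legacy value', // assumed field, not visible in this excerpt
  documentTypes: ['article', 'page'], // replaces the old single documentType
  filter: 'defined(legacy)', // optional GROQ filter, wrapped as *[ ... ] by the runner
  migrate: {
    // Node-level hook: called for every string node with its path and the new MigrationContext.
    string(node: string) {
      return node === 'legacy' ? set('renamed') : undefined
    },
  },
})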

lib/dts/src/_exports/index.d.ts.map


lib/_exports/index.esm.js
import FIFO from 'fast-fifo';
import { open } from 'node:fs/promises';
import { SanityEncoder, patch, at, CompactFormatter } from '@bjoerge/mutiny';
import { createSafeJsonParser } from '@sanity/util/createSafeJsonParser';
import arrify from 'arrify';
import { SanityEncoder } from '@bjoerge/mutiny';
import { fromString } from '@sanity/util/paths';
import { tmpdir } from 'node:os';
import path from 'node:path';
import groq from 'groq-js';
const objectToString = Object.prototype.toString;

@@ -388,3 +393,3 @@ const uint8ArrayStringified = "[object Uint8Array]";

}
async function* streamAsyncIterator(stream) {
async function* streamToAsyncIterator(stream) {
const reader = stream.getReader();

@@ -431,5 +436,5 @@ try {

async function* fromExportArchive(path) {
for await (const [header, entry] of streamAsyncIterator(untar(await maybeDecompress(readFileAsWebStream(path))))) {
for await (const [header, entry] of streamToAsyncIterator(untar(await maybeDecompress(readFileAsWebStream(path))))) {
if (header.type === "file" && header.name.endsWith(".ndjson")) {
for await (const chunk of streamAsyncIterator(entry)) {
for await (const chunk of streamToAsyncIterator(entry)) {
yield chunk;

@@ -440,5 +445,9 @@ }

}
function assert2xx(res) {
async function assert2xx(res) {
if (res.status < 200 || res.status > 299) {
const err = new Error("HTTP Error ".concat(res.status, ": ").concat(res.statusText));
const response = await res.json().catch(() => {
throw new Error("Error parsing JSON ".concat(res.status, ": ").concat(res.statusText));
});
const message = response.error ? response.error.description : "HTTP Error ".concat(res.status, ": ").concat(res.statusText);
const err = new Error(message);
err.statusCode = res.status;

@@ -448,3 +457,3 @@ throw err;

}
async function fetchAsyncIterator(_ref) {
async function fetchStream(_ref) {
let {

@@ -455,6 +464,9 @@ url,

const response = await fetch(url, init);
assert2xx(response);
await assert2xx(response);
if (response.body === null) throw new Error("No response received");
return streamAsyncIterator(response.body);
return response.body;
}
async function fetchAsyncIterator(options) {
return streamToAsyncIterator(await fetchStream(options));
}
function normalizeApiHost(apiHost) {

@@ -510,7 +522,7 @@ return apiHost.replace(/^https?:\/\//, "");

}),
export: dataset => ({
export: (dataset, documentTypes) => ({
global: false,
method: "GET",
path: "/data/export/".concat(dataset),
searchParams: []
searchParams: documentTypes && (documentTypes == null ? void 0 : documentTypes.length) > 0 ? [["types", documentTypes.join(",")]] : []
}),

@@ -530,3 +542,3 @@ mutate: (dataset, options) => {

var _a;
return fetchAsyncIterator(toFetchOptions({
return fetchStream(toFetchOptions({
projectId: options.projectId,

@@ -536,5 +548,8 @@ apiVersion: options.apiVersion,

apiHost: (_a = options.apiHost) != null ? _a : "api.sanity.io",
endpoint: endpoints.data.export(options.dataset)
endpoint: endpoints.data.export(options.dataset, options.documentTypes)
}));
}
const safeJsonParser = createSafeJsonParser({
errorLabel: "Error streaming dataset"
});
function* fromDocuments(documents) {

@@ -563,5 +578,14 @@ for (const document of documents) {

}
async function* parseJSON(it) {
for await (const chunk of it) {
yield JSON.parse(chunk);
function parseJSON(it) {
try {
let {
parse = JSON.parse
} = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
return async function* () {
for await (const chunk of it) {
yield parse(chunk);
}
}();
} catch (e) {
return Promise.reject(e);
}

@@ -576,3 +600,3 @@ }

for await (const chunk of it) {
if (predicate(chunk)) {
if (await predicate(chunk)) {
yield chunk;

@@ -602,5 +626,10 @@ }

}
function ndjson(it) {
return parseJSON(filter(split(decodeText(it), "\n"), line => Boolean(line && line.trim())));
function parse(it, options) {
return parseJSON(filter(split(it, "\n"), line => Boolean(line && line.trim())), options);
}
async function* stringify(iterable) {
for await (const doc of iterable) {
yield "".concat(JSON.stringify(doc), "\n");
}
}
async function* take(it, count) {

@@ -620,2 +649,32 @@ let i = 0;

}
async function* tap(it, interceptor) {
for await (const chunk of it) {
interceptor(chunk);
yield chunk;
}
}
async function mapAsync(it, project, concurrency) {
const {
pMapIterable
} = await import('p-map');
return pMapIterable(it, v => project(v), {
concurrency
});
}
async function lastValueFrom(it, options) {
const defaultGiven = ("defaultValue" in (options != null ? options : {}));
let latestValue;
let didYield = false;
for await (const value of it) {
didYield = true;
latestValue = value;
}
if (!didYield) {
if (defaultGiven) {
return options.defaultValue;
}
throw new Error("No value yielded from async iterable. If this iterable is empty, provide a default value.");
}
return latestValue;
}
async function* concatStr(it) {

@@ -628,22 +687,268 @@ let buf = "";

}
async function* toMutationEndpoint(options, mutations) {
var _a;
for await (const mutation of mutations) {
const fetchOptions = toFetchOptions({
projectId: options.projectId,
apiVersion: options.apiVersion,
token: options.token,
apiHost: (_a = options.apiHost) != null ? _a : "api.sanity.io",
endpoint: endpoints.data.mutate(options.dataset, {
returnIds: true
}),
body: JSON.stringify({
mutations: SanityEncoder.encode(Array.isArray(mutation) ? mutation : [mutation])
})
const CHUNK_SIZE = 1024;
function bufferThroughFile(source, filename, options) {
const signal = options == null ? void 0 : options.signal;
let writeHandle;
let readHandle;
let bufferDone = false;
signal == null ? void 0 : signal.addEventListener("abort", async () => {
await Promise.all([writeHandle && writeHandle.close(), readHandle && (await readHandle).close()]);
});
let readerCount = 0;
let ready;
async function pump(reader) {
try {
while (true) {
const {
done,
value
} = await reader.read();
if (done || (signal == null ? void 0 : signal.aborted)) {
return;
}
await writeHandle.write(value);
}
} finally {
await writeHandle.close();
bufferDone = true;
reader.releaseLock();
}
}
function createBufferedReader() {
let totalBytesRead = 0;
return async function tryReadFromBuffer(handle) {
const {
bytesRead,
buffer
} = await handle.read(new Uint8Array(CHUNK_SIZE), 0, CHUNK_SIZE, totalBytesRead);
if (bytesRead === 0 && !bufferDone && !(signal == null ? void 0 : signal.aborted)) {
return tryReadFromBuffer(handle);
}
totalBytesRead += bytesRead;
return {
bytesRead,
buffer
};
};
}
function init() {
if (!ready) {
ready = (async () => {
writeHandle = await open(filename, "w");
pump(source.getReader());
})();
}
return ready;
}
function getReadHandle() {
if (!readHandle) {
readHandle = open(filename, "r");
}
return readHandle;
}
function onReaderStart() {
readerCount++;
}
async function onReaderEnd() {
readerCount--;
if (readerCount === 0 && readHandle) {
const handle = readHandle;
readHandle = null;
await (await handle).close();
}
}
return () => {
const readChunk = createBufferedReader();
return new ReadableStream({
async start() {
if (signal == null ? void 0 : signal.aborted) {
throw new Error("Cannot create new buffered readers on aborted stream");
}
onReaderStart();
await init();
await getReadHandle();
},
async pull(controller) {
if (!readHandle) {
throw new Error("Cannot read from closed handle");
}
const {
bytesRead,
buffer
} = await readChunk(await readHandle);
if (bytesRead === 0 && bufferDone) {
await onReaderEnd();
controller.close();
} else {
controller.enqueue(buffer.subarray(0, bytesRead));
}
}
});
for await (const result of parseJSON(concatStr(decodeText(await fetchAsyncIterator(fetchOptions))))) {
yield result;
};
}
function asyncIterableToStream(it) {
return new ReadableStream({
async pull(controller) {
const {
value,
done
} = await it.next();
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
}
});
}
async function* toSanityMutations(it) {
for await (const mutation of it) {
if (Array.isArray(mutation)) {
yield SanityEncoder.encode(mutation);
continue;
}
yield SanityEncoder.encode([mutation])[0];
}
}
const MUTATION_ENDPOINT_MAX_BODY_SIZE = 1024 * 256;
const DEFAULT_MUTATION_CONCURRENCY = 6;
const MAX_MUTATION_CONCURRENCY = 10;
const PADDING_SIZE = '{"mutations":[]}'.length;
async function* batchMutations(mutations, maxBatchSize, options) {
let currentBatch = [];
let currentBatchSize = 0;
for await (const mutation of mutations) {
if ((options == null ? void 0 : options.preserveTransactions) && Array.isArray(mutation)) {
yield currentBatch;
yield mutation;
currentBatch = [];
currentBatchSize = 0;
continue;
}
const mutationSize = JSON.stringify(mutation).length;
if (mutationSize >= maxBatchSize + PADDING_SIZE) {
if (currentBatch.length) {
yield currentBatch;
}
yield [...arrify(mutation)];
currentBatch = [];
currentBatchSize = 0;
continue;
}
currentBatchSize += mutationSize;
if (currentBatchSize >= maxBatchSize + PADDING_SIZE) {
yield currentBatch;
currentBatch = [];
currentBatchSize = 0;
}
currentBatch.push(...arrify(mutation));
}
if (currentBatch.length > 0) {
yield currentBatch;
}
}
function create(document) {
return {
type: "create",
document
};
}
function patch(id, patches, options) {
return {
type: "patch",
id,
patches: arrify(patches),
...(options ? {
options
} : {})
};
}
function at(path, operation) {
return {
path: typeof path === "string" ? fromString(path) : path,
op: operation
};
}
function createIfNotExists(document) {
return {
type: "createIfNotExists",
document
};
}
function createOrReplace(document) {
return {
type: "createOrReplace",
document
};
}
function delete_(id) {
return {
type: "delete",
id
};
}
const del = delete_;
const set = value => ({
type: "set",
value
});
const setIfMissing = value => ({
type: "setIfMissing",
value
});
const unset = () => ({
type: "unset"
});
const inc = function () {
let amount = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : 1;
return {
type: "inc",
amount
};
};
const dec = function () {
let amount = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : 1;
return {
type: "dec",
amount
};
};
const diffMatchPatch = value => ({
type: "diffMatchPatch",
value
});
function insert(items, position, indexOrReferenceItem) {
return {
type: "insert",
referenceItem: indexOrReferenceItem,
position,
items: arrify(items)
};
}
function append(items) {
return insert(items, "after", -1);
}
function prepend(items) {
return insert(items, "before", 0);
}
function insertBefore(items, indexOrReferenceItem) {
return insert(items, "before", indexOrReferenceItem);
}
const insertAfter = (items, indexOrReferenceItem) => {
return insert(items, "after", indexOrReferenceItem);
};
function truncate(startIndex, endIndex) {
return {
type: "truncate",
startIndex,
endIndex
};
}
function replace(items, referenceItem) {
return {
type: "replace",
referenceItem,
items: arrify(items)
};
}
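The block above introduces local mutation and patch creators (previously used via @bjoerge/mutiny) that are now part of the public export list. A small illustrative sketch of composing them follows; it is not part of the package source, and the document id and field names are made up:

import {patch, at, set, setIfMissing, append, unset} from '@sanity/migrate'

// One patch mutation built from several node patches.
const mutation = patch('movie_123', [
  at('releaseYear', set(1999)),       // set a primitive value
  at('tags', setIfMissing([])),       // initialise the array if it is missing
  at('tags', append(['remastered'])), // insert('after', -1) under the hood
  at('tagline', unset()),             // remove the field
])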
function getValueType(value) {

@@ -692,3 +997,3 @@ if (Array.isArray(value)) {

filter: migration.filter,
documentType: migration.documentType
documentTypes: migration.documentTypes
});

@@ -703,6 +1008,7 @@ }

function createAsyncIterableMutation(migration, opts) {
const documentTypesSet = new Set(opts.documentTypes);
return async function* run(docs, context) {
for await (const doc of docs) {
if (doc._type !== opts.documentType) continue;
const documentMutations = collectDocumentMutations(migration, doc, context);
for await (const doc of docs()) {
if (!documentTypesSet.has(doc._type)) continue;
const documentMutations = await collectDocumentMutations(migration, doc, context);
if (documentMutations.length > 0) {

@@ -714,10 +1020,11 @@ yield documentMutations;

}
function collectDocumentMutations(migration, doc, context) {
async function collectDocumentMutations(migration, doc, context) {
var _a;
const documentMutations = (_a = migration.document) == null ? void 0 : _a.call(migration, doc, context);
const nodeMigrations = flatMapDeep(doc, (value, path) => {
const documentMutations = Promise.resolve((_a = migration.document) == null ? void 0 : _a.call(migration, doc, context));
const nodeMigrations = flatMapDeep(doc, async (value, path) => {
var _a2;
return [...arrify((_a2 = migration.node) == null ? void 0 : _a2.call(migration, value, path, context)), ...arrify(migrateNodeType(migration, value, path, context))].map(change => normalizeNodeMutation(path, change));
const [nodeReturnValues, nodeTypeReturnValues] = await Promise.all([Promise.resolve((_a2 = migration.node) == null ? void 0 : _a2.call(migration, value, path, context)), Promise.resolve(migrateNodeType(migration, value, path, context))]);
return [...arrify(nodeReturnValues), ...arrify(nodeTypeReturnValues)].map(change => normalizeNodeMutation(path, change));
});
return [...arrify(documentMutations), ...nodeMigrations].map(change => normalizeDocumentMutation(doc._id, change));
return (await Promise.all([...arrify(await documentMutations), ...nodeMigrations])).flat().map(change => normalizeDocumentMutation(doc._id, change));
}

@@ -749,30 +1056,262 @@ function normalizeDocumentMutation(documentId, change) {

}
function collectMigrationMutations(migration, documents) {
const ctx = {
withDocument: () => {
throw new Error("Not implemented yet");
function collectMigrationMutations(migration, documents, context) {
const migrate = normalizeMigrateDefinition(migration);
return migrate(documents, context);
}
function getBufferFilePath() {
return path.join(tmpdir(), "/export-buffer-".concat(Date.now(), ".tmp"));
}
function parseGroq(query) {
try {
return groq.parse(query);
} catch (err) {
err.message = 'Failed to parse GROQ filter "'.concat(query, '": ').concat(err.message);
throw err;
}
}
async function groqQuery(it, query, params) {
const parsedFilter = parseGroq(query);
const all = await toArray(it);
return (await groq.evaluate(parsedFilter, {
dataset: all,
params
})).get();
}
function createBufferFileContext(getReader) {
function getAllDocumentsFromBuffer() {
return parse(decodeText(streamToAsyncIterator(getReader())), {
parse: safeJsonParser
});
}
async function getDocumentsFromBuffer(ids) {
const found = {};
let remaining = ids.length;
for await (const doc of getAllDocumentsFromBuffer()) {
if (ids.includes(doc._id)) {
remaining--;
found[doc._id] = doc;
}
if (remaining === 0) break;
}
return ids.map(id => found[id]);
}
async function getDocumentFromBuffer(id) {
return (await getDocumentsFromBuffer([id]))[0];
}
function queryFromBuffer(query, params) {
return groqQuery(getAllDocumentsFromBuffer(), query, params);
}
return {
getDocument: getDocumentFromBuffer,
getDocuments: getDocumentsFromBuffer,
query: queryFromBuffer
};
const migrate = normalizeMigrateDefinition(migration);
return migrate(documents, ctx);
}
async function* run(config, migration) {
const mutations = collectMigrationMutations(migration, ndjson(await fromExportEndpoint(config.api)));
for await (const result of toMutationEndpoint(config.api, mutations)) {
yield formatMutationResponse(result);
function isSystemDocumentId(id) {
return id.startsWith("_.");
}
function parseGroqFilter(filter) {
try {
return groq.parse("*[".concat(filter, "]"));
} catch (err) {
err.message = 'Failed to parse GROQ filter "'.concat(filter, '": ').concat(err.message);
throw err;
}
}
function formatMutationResponse(mutationResponse) {
return "OK (transactionId = ".concat(mutationResponse.transactionId, ")\n").concat(mutationResponse.results.map(result => {
return " - ".concat(result.operation, ": ").concat(result.id);
}).join("\n"));
async function matchesFilter(parsedFilter, document) {
const result = await (await groq.evaluate(parsedFilter, {
dataset: [document]
})).get();
return result.length === 1;
}
async function* applyFilters(migration, documents) {
const documentTypes = migration.documentTypes;
const parsedFilter = migration.filter ? parseGroqFilter(migration.filter) : void 0;
for await (const doc of documents) {
if (isSystemDocumentId(doc._id)) {
continue;
}
if (documentTypes && documentTypes.length > 0 && !documentTypes.includes(doc._type)) {
continue;
}
if (parsedFilter && !(await matchesFilter(parsedFilter, doc))) {
continue;
}
yield doc;
}
}
async function* toFetchOptionsIterable(apiConfig, mutations) {
var _a;
for await (const mut of mutations) {
yield toFetchOptions({
projectId: apiConfig.projectId,
apiVersion: apiConfig.apiVersion,
token: apiConfig.token,
apiHost: (_a = apiConfig.apiHost) != null ? _a : "api.sanity.io",
endpoint: endpoints.data.mutate(apiConfig.dataset, {
returnIds: true
}),
body: JSON.stringify({
mutations: mut
})
});
}
}
async function run(config, migration) {
var _a, _b, _c;
const stats = {
documents: 0,
mutations: 0,
pending: 0,
queuedBatches: 0,
completedTransactions: [],
currentMutations: []
};
const filteredDocuments = applyFilters(migration, parse(decodeText(streamToAsyncIterator(await fromExportEndpoint({
...config.api,
documentTypes: migration.documentTypes
}))), {
parse: safeJsonParser
}));
const abortController = new AbortController();
const createReader = bufferThroughFile(asyncIterableToStream(stringify(filteredDocuments)), getBufferFilePath(), {
signal: abortController.signal
});
const context = createBufferFileContext(createReader);
const documents = () => tap(parse(decodeText(streamToAsyncIterator(createReader())), {
parse: safeJsonParser
}), () => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
documents: ++stats.documents
});
});
const mutations = tap(collectMigrationMutations(migration, documents, context), muts => {
var _a2;
stats.currentMutations = arrify(muts);
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
mutations: ++stats.mutations
});
});
const concurrency = (_a = config == null ? void 0 : config.concurrency) != null ? _a : DEFAULT_MUTATION_CONCURRENCY;
if (concurrency > MAX_MUTATION_CONCURRENCY) {
throw new Error("Concurrency exceeds maximum allowed value (".concat(MAX_MUTATION_CONCURRENCY, ")"));
}
const batches = tap(batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), () => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
queuedBatches: ++stats.queuedBatches
});
});
const submit = async opts => lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts)))));
const commits = await mapAsync(toFetchOptionsIterable(config.api, batches), opts => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
pending: ++stats.pending
});
return submit(opts);
}, concurrency);
for await (const result of commits) {
stats.completedTransactions.push(result);
(_b = config.onProgress) == null ? void 0 : _b.call(config, {
...stats
});
}
(_c = config.onProgress) == null ? void 0 : _c.call(config, {
...stats,
done: true
});
abortController.abort();
}
async function runFromArchive(migration, path, config) {
var _a, _b, _c;
const stats = {
documents: 0,
mutations: 0,
pending: 0,
queuedBatches: 0,
completedTransactions: [],
currentMutations: []
};
const filteredDocuments = applyFilters(migration, parse(decodeText(fromExportArchive(path)), {
parse: safeJsonParser
}));
const abortController = new AbortController();
const createReader = bufferThroughFile(asyncIterableToStream(stringify(filteredDocuments)), getBufferFilePath(), {
signal: abortController.signal
});
const context = createBufferFileContext(createReader);
const documents = () => tap(parse(decodeText(streamToAsyncIterator(createReader())), {
parse: safeJsonParser
}), () => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
documents: ++stats.documents
});
});
const mutations = tap(collectMigrationMutations(migration, documents, context), muts => {
var _a2;
stats.currentMutations = arrify(muts);
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
mutations: ++stats.mutations
});
});
const batches = tap(batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), () => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
queuedBatches: ++stats.queuedBatches
});
});
const concurrency = (_a = config == null ? void 0 : config.concurrency) != null ? _a : DEFAULT_MUTATION_CONCURRENCY;
if (concurrency > MAX_MUTATION_CONCURRENCY) {
throw new Error("Concurrency exceeds maximum allowed value (".concat(MAX_MUTATION_CONCURRENCY, ")"));
}
const commits = await mapAsync(toFetchOptionsIterable(config.api, batches), opts => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
pending: ++stats.pending
});
return Promise.resolve();
}, concurrency);
for await (const result of commits) {
(_b = config.onProgress) == null ? void 0 : _b.call(config, {
...stats
});
}
abortController.abort();
(_c = config.onProgress) == null ? void 0 : _c.call(config, {
...stats,
done: true
});
}
async function* dryRun(config, migration) {
const mutations = collectMigrationMutations(migration, ndjson(await fromExportEndpoint(config.api)));
const filteredDocuments = applyFilters(migration, parse(decodeText(streamToAsyncIterator(await fromExportEndpoint({
...config.api,
documentTypes: migration.documentTypes
}))), {
parse: safeJsonParser
}));
const abortController = new AbortController();
const createReader = bufferThroughFile(asyncIterableToStream(filteredDocuments), getBufferFilePath(), {
signal: abortController.signal
});
const context = createBufferFileContext(createReader);
const mutations = collectMigrationMutations(migration, () => parse(decodeText(streamToAsyncIterator(createReader())), {
parse: safeJsonParser
}), context);
abortController.abort();
for await (const mutation of mutations) {
if (!mutation) continue;
yield CompactFormatter.format(Array.isArray(mutation) ? mutation : [mutation]);
yield JSON.stringify(mutation, null, 2);
}
}
export { collectMigrationMutations, decodeText, defineMigration, delay, dryRun, filter, fromDocuments, fromExportArchive, fromExportEndpoint, map, ndjson, parseJSON, run, split, stringifyJSON, take, toArray };
export { DEFAULT_MUTATION_CONCURRENCY, MAX_MUTATION_CONCURRENCY, append, at, collectMigrationMutations, create, createIfNotExists, createOrReplace, dec, decodeText, defineMigration, del, delay, delete_, diffMatchPatch, dryRun, filter, fromDocuments, fromExportArchive, fromExportEndpoint, inc, insert, insertAfter, insertBefore, map, parse, parseJSON, patch, prepend, replace, run, runFromArchive, safeJsonParser, set, setIfMissing, split, stringify, stringifyJSON, take, toArray, toFetchOptionsIterable, truncate, unset };
//# sourceMappingURL=index.esm.js.map
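End of the ESM bundle. For orientation, a hedged sketch of driving the new runner from user code follows; it is not part of the package, the project id, dataset, token and apiVersion values are placeholders, the ./migration module is hypothetical, and the config fields mirror MigrationRunnerConfig and APIConfig from the typings further down.

import {dryRun, run, DEFAULT_MUTATION_CONCURRENCY} from '@sanity/migrate'
import type {MigrationProgress} from '@sanity/migrate'
import migration from './migration' // hypothetical module exporting a defineMigration(...) result

const config = {
  api: {
    projectId: '<projectId>',
    dataset: 'production',
    token: '<token>',
    apiVersion: '2024-01-01',
  },
  concurrency: DEFAULT_MUTATION_CONCURRENCY, // values above MAX_MUTATION_CONCURRENCY (10) make the runner throw
  onProgress: (progress: MigrationProgress) =>
    console.log(`${progress.documents} documents, ${progress.mutations} mutations`),
}

// Preview the mutations as formatted JSON without writing anything:
for await (const preview of dryRun(config, migration)) {
  console.log(preview)
}

// Batch and submit the mutations to the mutation endpoint:
await run(config, migration)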

@@ -8,4 +8,9 @@ 'use strict';

var promises = require('node:fs/promises');
var createSafeJsonParser = require('@sanity/util/createSafeJsonParser');
var arrify = require('arrify');
var mutiny = require('@bjoerge/mutiny');
var arrify = require('arrify');
var paths = require('@sanity/util/paths');
var node_os = require('node:os');
var path = require('node:path');
var groq = require('groq-js');
function _interopDefaultCompat(e) {

@@ -18,2 +23,4 @@ return e && typeof e === 'object' && 'default' in e ? e : {

var arrify__default = /*#__PURE__*/_interopDefaultCompat(arrify);
var path__default = /*#__PURE__*/_interopDefaultCompat(path);
var groq__default = /*#__PURE__*/_interopDefaultCompat(groq);
const objectToString = Object.prototype.toString;

@@ -402,3 +409,3 @@ const uint8ArrayStringified = "[object Uint8Array]";

}
async function* streamAsyncIterator(stream) {
async function* streamToAsyncIterator(stream) {
const reader = stream.getReader();

@@ -445,5 +452,5 @@ try {

async function* fromExportArchive(path) {
for await (const [header, entry] of streamAsyncIterator(untar(await maybeDecompress(readFileAsWebStream(path))))) {
for await (const [header, entry] of streamToAsyncIterator(untar(await maybeDecompress(readFileAsWebStream(path))))) {
if (header.type === "file" && header.name.endsWith(".ndjson")) {
for await (const chunk of streamAsyncIterator(entry)) {
for await (const chunk of streamToAsyncIterator(entry)) {
yield chunk;

@@ -454,5 +461,9 @@ }

}
function assert2xx(res) {
async function assert2xx(res) {
if (res.status < 200 || res.status > 299) {
const err = new Error("HTTP Error ".concat(res.status, ": ").concat(res.statusText));
const response = await res.json().catch(() => {
throw new Error("Error parsing JSON ".concat(res.status, ": ").concat(res.statusText));
});
const message = response.error ? response.error.description : "HTTP Error ".concat(res.status, ": ").concat(res.statusText);
const err = new Error(message);
err.statusCode = res.status;

@@ -462,3 +473,3 @@ throw err;

}
async function fetchAsyncIterator(_ref) {
async function fetchStream(_ref) {
let {

@@ -469,6 +480,9 @@ url,

const response = await fetch(url, init);
assert2xx(response);
await assert2xx(response);
if (response.body === null) throw new Error("No response received");
return streamAsyncIterator(response.body);
return response.body;
}
async function fetchAsyncIterator(options) {
return streamToAsyncIterator(await fetchStream(options));
}
function normalizeApiHost(apiHost) {

@@ -524,7 +538,7 @@ return apiHost.replace(/^https?:\/\//, "");

}),
export: dataset => ({
export: (dataset, documentTypes) => ({
global: false,
method: "GET",
path: "/data/export/".concat(dataset),
searchParams: []
searchParams: documentTypes && (documentTypes == null ? void 0 : documentTypes.length) > 0 ? [["types", documentTypes.join(",")]] : []
}),

@@ -544,3 +558,3 @@ mutate: (dataset, options) => {

var _a;
return fetchAsyncIterator(toFetchOptions({
return fetchStream(toFetchOptions({
projectId: options.projectId,

@@ -550,5 +564,8 @@ apiVersion: options.apiVersion,

apiHost: (_a = options.apiHost) != null ? _a : "api.sanity.io",
endpoint: endpoints.data.export(options.dataset)
endpoint: endpoints.data.export(options.dataset, options.documentTypes)
}));
}
const safeJsonParser = createSafeJsonParser.createSafeJsonParser({
errorLabel: "Error streaming dataset"
});
function* fromDocuments(documents) {

@@ -577,5 +594,14 @@ for (const document of documents) {

}
async function* parseJSON(it) {
for await (const chunk of it) {
yield JSON.parse(chunk);
function parseJSON(it) {
try {
let {
parse = JSON.parse
} = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
return async function* () {
for await (const chunk of it) {
yield parse(chunk);
}
}();
} catch (e) {
return Promise.reject(e);
}

@@ -590,3 +616,3 @@ }

for await (const chunk of it) {
if (predicate(chunk)) {
if (await predicate(chunk)) {
yield chunk;

@@ -616,5 +642,10 @@ }

}
function ndjson(it) {
return parseJSON(filter(split(decodeText(it), "\n"), line => Boolean(line && line.trim())));
function parse(it, options) {
return parseJSON(filter(split(it, "\n"), line => Boolean(line && line.trim())), options);
}
async function* stringify(iterable) {
for await (const doc of iterable) {
yield "".concat(JSON.stringify(doc), "\n");
}
}
async function* take(it, count) {

@@ -634,2 +665,32 @@ let i = 0;

}
async function* tap(it, interceptor) {
for await (const chunk of it) {
interceptor(chunk);
yield chunk;
}
}
async function mapAsync(it, project, concurrency) {
const {
pMapIterable
} = await import('p-map');
return pMapIterable(it, v => project(v), {
concurrency
});
}
async function lastValueFrom(it, options) {
const defaultGiven = ("defaultValue" in (options != null ? options : {}));
let latestValue;
let didYield = false;
for await (const value of it) {
didYield = true;
latestValue = value;
}
if (!didYield) {
if (defaultGiven) {
return options.defaultValue;
}
throw new Error("No value yielded from async iterable. If this iterable is empty, provide a default value.");
}
return latestValue;
}
async function* concatStr(it) {

@@ -642,22 +703,268 @@ let buf = "";

}
async function* toMutationEndpoint(options, mutations) {
var _a;
for await (const mutation of mutations) {
const fetchOptions = toFetchOptions({
projectId: options.projectId,
apiVersion: options.apiVersion,
token: options.token,
apiHost: (_a = options.apiHost) != null ? _a : "api.sanity.io",
endpoint: endpoints.data.mutate(options.dataset, {
returnIds: true
}),
body: JSON.stringify({
mutations: mutiny.SanityEncoder.encode(Array.isArray(mutation) ? mutation : [mutation])
})
const CHUNK_SIZE = 1024;
function bufferThroughFile(source, filename, options) {
const signal = options == null ? void 0 : options.signal;
let writeHandle;
let readHandle;
let bufferDone = false;
signal == null ? void 0 : signal.addEventListener("abort", async () => {
await Promise.all([writeHandle && writeHandle.close(), readHandle && (await readHandle).close()]);
});
let readerCount = 0;
let ready;
async function pump(reader) {
try {
while (true) {
const {
done,
value
} = await reader.read();
if (done || (signal == null ? void 0 : signal.aborted)) {
return;
}
await writeHandle.write(value);
}
} finally {
await writeHandle.close();
bufferDone = true;
reader.releaseLock();
}
}
function createBufferedReader() {
let totalBytesRead = 0;
return async function tryReadFromBuffer(handle) {
const {
bytesRead,
buffer
} = await handle.read(new Uint8Array(CHUNK_SIZE), 0, CHUNK_SIZE, totalBytesRead);
if (bytesRead === 0 && !bufferDone && !(signal == null ? void 0 : signal.aborted)) {
return tryReadFromBuffer(handle);
}
totalBytesRead += bytesRead;
return {
bytesRead,
buffer
};
};
}
function init() {
if (!ready) {
ready = (async () => {
writeHandle = await promises.open(filename, "w");
pump(source.getReader());
})();
}
return ready;
}
function getReadHandle() {
if (!readHandle) {
readHandle = promises.open(filename, "r");
}
return readHandle;
}
function onReaderStart() {
readerCount++;
}
async function onReaderEnd() {
readerCount--;
if (readerCount === 0 && readHandle) {
const handle = readHandle;
readHandle = null;
await (await handle).close();
}
}
return () => {
const readChunk = createBufferedReader();
return new ReadableStream({
async start() {
if (signal == null ? void 0 : signal.aborted) {
throw new Error("Cannot create new buffered readers on aborted stream");
}
onReaderStart();
await init();
await getReadHandle();
},
async pull(controller) {
if (!readHandle) {
throw new Error("Cannot read from closed handle");
}
const {
bytesRead,
buffer
} = await readChunk(await readHandle);
if (bytesRead === 0 && bufferDone) {
await onReaderEnd();
controller.close();
} else {
controller.enqueue(buffer.subarray(0, bytesRead));
}
}
});
for await (const result of parseJSON(concatStr(decodeText(await fetchAsyncIterator(fetchOptions))))) {
yield result;
};
}
function asyncIterableToStream(it) {
return new ReadableStream({
async pull(controller) {
const {
value,
done
} = await it.next();
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
}
});
}
async function* toSanityMutations(it) {
for await (const mutation of it) {
if (Array.isArray(mutation)) {
yield mutiny.SanityEncoder.encode(mutation);
continue;
}
yield mutiny.SanityEncoder.encode([mutation])[0];
}
}
const MUTATION_ENDPOINT_MAX_BODY_SIZE = 1024 * 256;
const DEFAULT_MUTATION_CONCURRENCY = 6;
const MAX_MUTATION_CONCURRENCY = 10;
const PADDING_SIZE = '{"mutations":[]}'.length;
async function* batchMutations(mutations, maxBatchSize, options) {
let currentBatch = [];
let currentBatchSize = 0;
for await (const mutation of mutations) {
if ((options == null ? void 0 : options.preserveTransactions) && Array.isArray(mutation)) {
yield currentBatch;
yield mutation;
currentBatch = [];
currentBatchSize = 0;
continue;
}
const mutationSize = JSON.stringify(mutation).length;
if (mutationSize >= maxBatchSize + PADDING_SIZE) {
if (currentBatch.length) {
yield currentBatch;
}
yield [...arrify__default.default(mutation)];
currentBatch = [];
currentBatchSize = 0;
continue;
}
currentBatchSize += mutationSize;
if (currentBatchSize >= maxBatchSize + PADDING_SIZE) {
yield currentBatch;
currentBatch = [];
currentBatchSize = 0;
}
currentBatch.push(...arrify__default.default(mutation));
}
if (currentBatch.length > 0) {
yield currentBatch;
}
}
function create(document) {
return {
type: "create",
document
};
}
function patch(id, patches, options) {
return {
type: "patch",
id,
patches: arrify__default.default(patches),
...(options ? {
options
} : {})
};
}
function at(path, operation) {
return {
path: typeof path === "string" ? paths.fromString(path) : path,
op: operation
};
}
function createIfNotExists(document) {
return {
type: "createIfNotExists",
document
};
}
function createOrReplace(document) {
return {
type: "createOrReplace",
document
};
}
function delete_(id) {
return {
type: "delete",
id
};
}
const del = delete_;
const set = value => ({
type: "set",
value
});
const setIfMissing = value => ({
type: "setIfMissing",
value
});
const unset = () => ({
type: "unset"
});
const inc = function () {
let amount = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : 1;
return {
type: "inc",
amount
};
};
const dec = function () {
let amount = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : 1;
return {
type: "dec",
amount
};
};
const diffMatchPatch = value => ({
type: "diffMatchPatch",
value
});
function insert(items, position, indexOrReferenceItem) {
return {
type: "insert",
referenceItem: indexOrReferenceItem,
position,
items: arrify__default.default(items)
};
}
function append(items) {
return insert(items, "after", -1);
}
function prepend(items) {
return insert(items, "before", 0);
}
function insertBefore(items, indexOrReferenceItem) {
return insert(items, "before", indexOrReferenceItem);
}
const insertAfter = (items, indexOrReferenceItem) => {
return insert(items, "after", indexOrReferenceItem);
};
function truncate(startIndex, endIndex) {
return {
type: "truncate",
startIndex,
endIndex
};
}
function replace(items, referenceItem) {
return {
type: "replace",
referenceItem,
items: arrify__default.default(items)
};
}
function getValueType(value) {

@@ -706,3 +1013,3 @@ if (Array.isArray(value)) {

filter: migration.filter,
documentType: migration.documentType
documentTypes: migration.documentTypes
});

@@ -717,6 +1024,7 @@ }

function createAsyncIterableMutation(migration, opts) {
const documentTypesSet = new Set(opts.documentTypes);
return async function* run(docs, context) {
for await (const doc of docs) {
if (doc._type !== opts.documentType) continue;
const documentMutations = collectDocumentMutations(migration, doc, context);
for await (const doc of docs()) {
if (!documentTypesSet.has(doc._type)) continue;
const documentMutations = await collectDocumentMutations(migration, doc, context);
if (documentMutations.length > 0) {

@@ -728,16 +1036,17 @@ yield documentMutations;

}
function collectDocumentMutations(migration, doc, context) {
async function collectDocumentMutations(migration, doc, context) {
var _a;
const documentMutations = (_a = migration.document) == null ? void 0 : _a.call(migration, doc, context);
const nodeMigrations = flatMapDeep(doc, (value, path) => {
const documentMutations = Promise.resolve((_a = migration.document) == null ? void 0 : _a.call(migration, doc, context));
const nodeMigrations = flatMapDeep(doc, async (value, path) => {
var _a2;
return [...arrify__default.default((_a2 = migration.node) == null ? void 0 : _a2.call(migration, value, path, context)), ...arrify__default.default(migrateNodeType(migration, value, path, context))].map(change => normalizeNodeMutation(path, change));
const [nodeReturnValues, nodeTypeReturnValues] = await Promise.all([Promise.resolve((_a2 = migration.node) == null ? void 0 : _a2.call(migration, value, path, context)), Promise.resolve(migrateNodeType(migration, value, path, context))]);
return [...arrify__default.default(nodeReturnValues), ...arrify__default.default(nodeTypeReturnValues)].map(change => normalizeNodeMutation(path, change));
});
return [...arrify__default.default(documentMutations), ...nodeMigrations].map(change => normalizeDocumentMutation(doc._id, change));
return (await Promise.all([...arrify__default.default(await documentMutations), ...nodeMigrations])).flat().map(change => normalizeDocumentMutation(doc._id, change));
}
function normalizeDocumentMutation(documentId, change) {
return isMutation(change) ? change : mutiny.patch(documentId, change);
return isMutation(change) ? change : patch(documentId, change);
}
function normalizeNodeMutation(path, change) {
return isOperation(change) ? mutiny.at(path, change) : change;
return isOperation(change) ? at(path, change) : change;
}

@@ -763,33 +1072,276 @@ function migrateNodeType(migration, value, path, context) {

}
function collectMigrationMutations(migration, documents) {
const ctx = {
withDocument: () => {
throw new Error("Not implemented yet");
function collectMigrationMutations(migration, documents, context) {
const migrate = normalizeMigrateDefinition(migration);
return migrate(documents, context);
}
function getBufferFilePath() {
return path__default.default.join(node_os.tmpdir(), "/export-buffer-".concat(Date.now(), ".tmp"));
}
function parseGroq(query) {
try {
return groq__default.default.parse(query);
} catch (err) {
err.message = 'Failed to parse GROQ filter "'.concat(query, '": ').concat(err.message);
throw err;
}
}
async function groqQuery(it, query, params) {
const parsedFilter = parseGroq(query);
const all = await toArray(it);
return (await groq__default.default.evaluate(parsedFilter, {
dataset: all,
params
})).get();
}
function createBufferFileContext(getReader) {
function getAllDocumentsFromBuffer() {
return parse(decodeText(streamToAsyncIterator(getReader())), {
parse: safeJsonParser
});
}
async function getDocumentsFromBuffer(ids) {
const found = {};
let remaining = ids.length;
for await (const doc of getAllDocumentsFromBuffer()) {
if (ids.includes(doc._id)) {
remaining--;
found[doc._id] = doc;
}
if (remaining === 0) break;
}
return ids.map(id => found[id]);
}
async function getDocumentFromBuffer(id) {
return (await getDocumentsFromBuffer([id]))[0];
}
function queryFromBuffer(query, params) {
return groqQuery(getAllDocumentsFromBuffer(), query, params);
}
return {
getDocument: getDocumentFromBuffer,
getDocuments: getDocumentsFromBuffer,
query: queryFromBuffer
};
const migrate = normalizeMigrateDefinition(migration);
return migrate(documents, ctx);
}
async function* run(config, migration) {
const mutations = collectMigrationMutations(migration, ndjson(await fromExportEndpoint(config.api)));
for await (const result of toMutationEndpoint(config.api, mutations)) {
yield formatMutationResponse(result);
function isSystemDocumentId(id) {
return id.startsWith("_.");
}
function parseGroqFilter(filter) {
try {
return groq__default.default.parse("*[".concat(filter, "]"));
} catch (err) {
err.message = 'Failed to parse GROQ filter "'.concat(filter, '": ').concat(err.message);
throw err;
}
}
function formatMutationResponse(mutationResponse) {
return "OK (transactionId = ".concat(mutationResponse.transactionId, ")\n").concat(mutationResponse.results.map(result => {
return " - ".concat(result.operation, ": ").concat(result.id);
}).join("\n"));
async function matchesFilter(parsedFilter, document) {
const result = await (await groq__default.default.evaluate(parsedFilter, {
dataset: [document]
})).get();
return result.length === 1;
}
async function* applyFilters(migration, documents) {
const documentTypes = migration.documentTypes;
const parsedFilter = migration.filter ? parseGroqFilter(migration.filter) : void 0;
for await (const doc of documents) {
if (isSystemDocumentId(doc._id)) {
continue;
}
if (documentTypes && documentTypes.length > 0 && !documentTypes.includes(doc._type)) {
continue;
}
if (parsedFilter && !(await matchesFilter(parsedFilter, doc))) {
continue;
}
yield doc;
}
}
async function* toFetchOptionsIterable(apiConfig, mutations) {
var _a;
for await (const mut of mutations) {
yield toFetchOptions({
projectId: apiConfig.projectId,
apiVersion: apiConfig.apiVersion,
token: apiConfig.token,
apiHost: (_a = apiConfig.apiHost) != null ? _a : "api.sanity.io",
endpoint: endpoints.data.mutate(apiConfig.dataset, {
returnIds: true
}),
body: JSON.stringify({
mutations: mut
})
});
}
}
async function run(config, migration) {
var _a, _b, _c;
const stats = {
documents: 0,
mutations: 0,
pending: 0,
queuedBatches: 0,
completedTransactions: [],
currentMutations: []
};
const filteredDocuments = applyFilters(migration, parse(decodeText(streamToAsyncIterator(await fromExportEndpoint({
...config.api,
documentTypes: migration.documentTypes
}))), {
parse: safeJsonParser
}));
const abortController = new AbortController();
const createReader = bufferThroughFile(asyncIterableToStream(stringify(filteredDocuments)), getBufferFilePath(), {
signal: abortController.signal
});
const context = createBufferFileContext(createReader);
const documents = () => tap(parse(decodeText(streamToAsyncIterator(createReader())), {
parse: safeJsonParser
}), () => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
documents: ++stats.documents
});
});
const mutations = tap(collectMigrationMutations(migration, documents, context), muts => {
var _a2;
stats.currentMutations = arrify__default.default(muts);
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
mutations: ++stats.mutations
});
});
const concurrency = (_a = config == null ? void 0 : config.concurrency) != null ? _a : DEFAULT_MUTATION_CONCURRENCY;
if (concurrency > MAX_MUTATION_CONCURRENCY) {
throw new Error("Concurrency exceeds maximum allowed value (".concat(MAX_MUTATION_CONCURRENCY, ")"));
}
const batches = tap(batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), () => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
queuedBatches: ++stats.queuedBatches
});
});
const submit = async opts => lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts)))));
const commits = await mapAsync(toFetchOptionsIterable(config.api, batches), opts => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
pending: ++stats.pending
});
return submit(opts);
}, concurrency);
for await (const result of commits) {
stats.completedTransactions.push(result);
(_b = config.onProgress) == null ? void 0 : _b.call(config, {
...stats
});
}
(_c = config.onProgress) == null ? void 0 : _c.call(config, {
...stats,
done: true
});
abortController.abort();
}
async function runFromArchive(migration, path, config) {
var _a, _b, _c;
const stats = {
documents: 0,
mutations: 0,
pending: 0,
queuedBatches: 0,
completedTransactions: [],
currentMutations: []
};
const filteredDocuments = applyFilters(migration, parse(decodeText(fromExportArchive(path)), {
parse: safeJsonParser
}));
const abortController = new AbortController();
const createReader = bufferThroughFile(asyncIterableToStream(stringify(filteredDocuments)), getBufferFilePath(), {
signal: abortController.signal
});
const context = createBufferFileContext(createReader);
const documents = () => tap(parse(decodeText(streamToAsyncIterator(createReader())), {
parse: safeJsonParser
}), () => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
documents: ++stats.documents
});
});
const mutations = tap(collectMigrationMutations(migration, documents, context), muts => {
var _a2;
stats.currentMutations = arrify__default.default(muts);
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
mutations: ++stats.mutations
});
});
const batches = tap(batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), () => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
queuedBatches: ++stats.queuedBatches
});
});
const concurrency = (_a = config == null ? void 0 : config.concurrency) != null ? _a : DEFAULT_MUTATION_CONCURRENCY;
if (concurrency > MAX_MUTATION_CONCURRENCY) {
throw new Error("Concurrency exceeds maximum allowed value (".concat(MAX_MUTATION_CONCURRENCY, ")"));
}
const commits = await mapAsync(toFetchOptionsIterable(config.api, batches), opts => {
var _a2;
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, {
...stats,
pending: ++stats.pending
});
return Promise.resolve();
}, concurrency);
for await (const result of commits) {
(_b = config.onProgress) == null ? void 0 : _b.call(config, {
...stats
});
}
abortController.abort();
(_c = config.onProgress) == null ? void 0 : _c.call(config, {
...stats,
done: true
});
}
async function* dryRun(config, migration) {
const mutations = collectMigrationMutations(migration, ndjson(await fromExportEndpoint(config.api)));
const filteredDocuments = applyFilters(migration, parse(decodeText(streamToAsyncIterator(await fromExportEndpoint({
...config.api,
documentTypes: migration.documentTypes
}))), {
parse: safeJsonParser
}));
const abortController = new AbortController();
const createReader = bufferThroughFile(asyncIterableToStream(filteredDocuments), getBufferFilePath(), {
signal: abortController.signal
});
const context = createBufferFileContext(createReader);
const mutations = collectMigrationMutations(migration, () => parse(decodeText(streamToAsyncIterator(createReader())), {
parse: safeJsonParser
}), context);
abortController.abort();
for await (const mutation of mutations) {
if (!mutation) continue;
yield mutiny.CompactFormatter.format(Array.isArray(mutation) ? mutation : [mutation]);
yield JSON.stringify(mutation, null, 2);
}
}
exports.DEFAULT_MUTATION_CONCURRENCY = DEFAULT_MUTATION_CONCURRENCY;
exports.MAX_MUTATION_CONCURRENCY = MAX_MUTATION_CONCURRENCY;
exports.append = append;
exports.at = at;
exports.collectMigrationMutations = collectMigrationMutations;
exports.create = create;
exports.createIfNotExists = createIfNotExists;
exports.createOrReplace = createOrReplace;
exports.dec = dec;
exports.decodeText = decodeText;
exports.defineMigration = defineMigration;
exports.del = del;
exports.delay = delay;
exports.delete_ = delete_;
exports.diffMatchPatch = diffMatchPatch;
exports.dryRun = dryRun;

@@ -800,10 +1352,25 @@ exports.filter = filter;

exports.fromExportEndpoint = fromExportEndpoint;
exports.inc = inc;
exports.insert = insert;
exports.insertAfter = insertAfter;
exports.insertBefore = insertBefore;
exports.map = map;
exports.ndjson = ndjson;
exports.parse = parse;
exports.parseJSON = parseJSON;
exports.patch = patch;
exports.prepend = prepend;
exports.replace = replace;
exports.run = run;
exports.runFromArchive = runFromArchive;
exports.safeJsonParser = safeJsonParser;
exports.set = set;
exports.setIfMissing = setIfMissing;
exports.split = split;
exports.stringify = stringify;
exports.stringifyJSON = stringifyJSON;
exports.take = take;
exports.toArray = toArray;
exports.toFetchOptionsIterable = toFetchOptionsIterable;
exports.truncate = truncate;
exports.unset = unset;
//# sourceMappingURL=index.js.map
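End of the CommonJS bundle, which exposes the same API surface via exports.*; the updated type declarations follow. The new runFromArchive entry point drives the same pipeline from a dataset export archive instead of the export endpoint. A hedged sketch, not part of the package source; the archive path and API settings are placeholders and the ./migration module is hypothetical:

import {runFromArchive} from '@sanity/migrate'
import type {MigrationProgress} from '@sanity/migrate'
import migration from './migration' // hypothetical defineMigration(...) module

// The archive is a (possibly gzipped) tarball whose .ndjson entries are read by fromExportArchive.
await runFromArchive(migration, '/tmp/production-export.tar.gz', {
  api: {projectId: '<projectId>', dataset: 'production', token: '<token>', apiVersion: '2024-01-01'},
  onProgress: (progress: MigrationProgress) => {
    if (progress.done) console.log(`processed ${progress.documents} documents from the archive`)
  },
})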

@@ -1,7 +0,11 @@

import {Mutation} from '@bjoerge/mutiny'
import type {NodePatch} from '@bjoerge/mutiny'
import type {Operation} from '@bjoerge/mutiny'
import type {Path} from '@bjoerge/mutiny'
import {SanityDocument} from '@sanity/types'
import type {KeyedSegment} from '@sanity/types'
import {MultipleMutationResult} from '@sanity/client'
import {Mutation as Mutation_2} from '@sanity/client'
import type {Path} from '@sanity/types'
import {SanityDocument as SanityDocument_2} from '@sanity/types'
declare type AnyArray<T = any> = T[] | readonly T[]
export declare type AnyOp = SetOp<unknown> | SetIfMissingOp<unknown> | UnsetOp
export declare interface APIConfig {

@@ -15,12 +19,103 @@ projectId: string

/**
* Creates an `insert` operation that appends the provided items.
* @param items - The items to append.
* @returns An `insert` operation at the end of the array.
* {@link https://www.sanity.io/docs/http-patches#Cw4vhD88}
* @beta
*/
export declare function append<const Items extends AnyArray<unknown>>(
items: Items | ArrayElement<Items>,
): InsertOp<NormalizeReadOnlyArray<Items>, 'after', -1>
declare type ArrayElement<A> = A extends readonly (infer T)[] ? T : never
export declare type ArrayOp =
| InsertOp<AnyArray, RelativePosition, IndexedSegment | KeyedSegment>
| UpsertOp<AnyArray, RelativePosition, IndexedSegment | KeyedSegment>
| ReplaceOp<AnyArray, IndexedSegment | KeyedSegment>
| TruncateOp
export declare type AssignOp<T extends object = object> = {
type: 'assign'
value: T
}
export declare type AsyncIterableMigration = (
documents: AsyncIterableIterator<SanityDocument>,
documents: () => AsyncIterableIterator<SanityDocument_2>,
context: MigrationContext,
) => AsyncGenerator<Mutation | Mutation[]>
/**
* Creates a node patch at a specific path.
* @param path - The path where the operation should be applied.
* @param operation - The operation to be applied.
* @returns The node patch.
* @beta
*/
export declare function at<O extends Operation>(
path: Path | string,
operation: O,
): NodePatch<Path, O>
export declare function collectMigrationMutations(
migration: Migration,
documents: AsyncIterableIterator<SanityDocument>,
documents: () => AsyncIterableIterator<SanityDocument_2>,
context: MigrationContext,
): AsyncGenerator<Mutation | Mutation[], any, unknown>
/**
* Creates a new document.
* @param document - The document to be created.
* @returns The mutation operation to create the document.
* @beta
*/
export declare function create<Doc extends Optional<SanityDocument, '_id'>>(
document: Doc,
): CreateMutation<Doc>
/**
* Creates a document if it does not exist.
* @param document - The document to be created.
* @returns The mutation operation to create the document if it does not exist.
* @beta
*/
export declare function createIfNotExists<Doc extends SanityDocument>(
document: Doc,
): CreateIfNotExistsMutation<Doc>
export declare type CreateIfNotExistsMutation<Doc extends SanityDocument> = {
type: 'createIfNotExists'
document: Doc
}
export declare type CreateMutation<Doc extends Optional<SanityDocument, '_id'>> = {
type: 'create'
document: Doc
}
/**
* Creates or replaces a document.
* @param document - The document to be created or replaced.
* @returns The mutation operation to create or replace the document.
* @beta
*/
export declare function createOrReplace<Doc extends SanityDocument>(
document: Doc,
): CreateOrReplaceMutation<Doc>
export declare type CreateOrReplaceMutation<Doc extends SanityDocument> = {
type: 'createOrReplace'
document: Doc
}
/**
* Creates a `dec` (decrement) operation with the provided amount.
* @param amount - The amount to decrement by.
* @returns A `dec` operation.
* {@link https://www.sanity.io/docs/http-patches#vIT8WWQo}
* @beta
*/
export declare const dec: <const N extends number = 1>(amount?: N) => DecOp<N>
export declare function decodeText(

@@ -30,4 +125,16 @@ it: AsyncIterableIterator<Uint8Array>,

export declare type DecOp<Amount extends number> = {
type: 'dec'
amount: Amount
}
export declare const DEFAULT_MUTATION_CONCURRENCY = 6
export declare function defineMigration<T extends Migration>(migration: T): T
/**
* Alias for delete_
*/
export declare const del: typeof delete_
export declare function delay<T>(

@@ -38,2 +145,29 @@ it: AsyncIterableIterator<T>,

/**
* Deletes a document.
* @param id - The id of the document to be deleted.
* @returns The mutation operation to delete the document.
* @beta
*/
export declare function delete_(id: string): DeleteMutation
export declare type DeleteMutation = {
type: 'delete'
id: string
}
/**
* Creates a `diffMatchPatch` operation with the provided value.
* @param value - The value for the diff match patch operation.
* @returns A `diffMatchPatch` operation.
* {@link https://www.sanity.io/docs/http-patches#aTbJhlAJ}
* @beta
*/
export declare const diffMatchPatch: (value: string) => DiffMatchPatchOp
export declare type DiffMatchPatchOp = {
type: 'diffMatchPatch'
value: string
}
export declare type DocumentMigrationReturnValue =

@@ -47,14 +181,23 @@ | undefined

export declare function dryRun(
config: MigrationRunnerOptions_2,
config: MigrationRunnerOptions,
migration: Migration,
): AsyncGenerator<string, void, unknown>
export declare interface ExportAPIConfig extends APIConfig {
documentTypes?: string[]
}
declare interface FetchOptions {
url: string | URL
init: RequestInit
}
export declare function filter<T>(
it: AsyncIterableIterator<T>,
predicate: (value: T) => boolean,
predicate: (value: T) => boolean | Promise<boolean>,
): AsyncGenerator<Awaited<T>, void, unknown>
export declare function fromDocuments(
documents: SanityDocument[],
): Generator<SanityDocument, void, unknown>
documents: SanityDocument_2[],
): Generator<SanityDocument_2, void, unknown>

@@ -64,5 +207,82 @@ export declare function fromExportArchive(path: string): AsyncGenerator<Uint8Array, void, unknown>

export declare function fromExportEndpoint(
options: APIConfig,
): Promise<AsyncGenerator<Uint8Array, void, unknown>>
options: ExportAPIConfig,
): Promise<ReadableStream<Uint8Array>>
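fromExportEndpoint now resolves to a web ReadableStream<Uint8Array> instead of an async generator, and accepts the wider ExportAPIConfig with documentTypes. A hedged sketch of consuming it with the exported streaming helpers; the small reader loop stands in for the package's internal streamToAsyncIterator (which is not exported), and the API settings are placeholders:

import {fromExportEndpoint, decodeText, parse, safeJsonParser} from '@sanity/migrate'

// Minimal stand-in for the internal streamToAsyncIterator helper.
async function* iterate(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader()
  try {
    while (true) {
      const {done, value} = await reader.read()
      if (done) return
      yield value
    }
  } finally {
    reader.releaseLock()
  }
}

const stream = await fromExportEndpoint({
  projectId: '<projectId>',
  dataset: 'production',
  token: '<token>',
  apiVersion: '2024-01-01',
  documentTypes: ['article'], // forwarded as the export endpoint's ?types= parameter
})

// NDJSON lines, parsed with the same safeJsonParser the runner uses internally.
for await (const doc of parse(decodeText(iterate(stream)), {parse: safeJsonParser})) {
  console.log(doc)
}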
/**
* Creates an `inc` (increment) operation with the provided amount.
* @param amount - The amount to increment by.
* @returns An `inc` operation.
* {@link https://www.sanity.io/docs/http-patches#vIT8WWQo}
* @beta
*/
export declare const inc: <const N extends number = 1>(amount?: N) => IncOp<N>
export declare type IncOp<Amount extends number> = {
type: 'inc'
amount: Amount
}
export declare type IndexedSegment = number
/**
* Creates an `insert` operation with the provided items, position, and reference item.
* @param items - The items to insert.
* @param position - The position to insert at.
* @param indexOrReferenceItem - The index or reference item to insert before or after.
* @returns An `insert` operation.
* {@link https://www.sanity.io/docs/http-patches#febxf6Fk}
* @beta
*/
export declare function insert<
const Items extends AnyArray<unknown>,
const Pos extends RelativePosition,
const ReferenceItem extends IndexedSegment | KeyedSegment,
>(
items: Items | ArrayElement<Items>,
position: Pos,
indexOrReferenceItem: ReferenceItem,
): InsertOp<NormalizeReadOnlyArray<Items>, Pos, ReferenceItem>
/**
* Creates an `insert` operation that inserts the provided items after the provided index or reference item.
* @param items - The items to insert.
* @param indexOrReferenceItem - The index or reference item to insert after.
* @returns An `insert` operation after the provided index or reference item.
* {@link https://www.sanity.io/docs/http-patches#0SQmPlb6}
* @beta
*/
export declare const insertAfter: <
const Items extends AnyArray<unknown>,
const ReferenceItem extends number | KeyedSegment,
>(
items: Items | ArrayElement<Items>,
indexOrReferenceItem: ReferenceItem,
) => InsertOp<NormalizeReadOnlyArray<Items>, 'after', ReferenceItem>
/**
* Creates an `insert` operation that inserts the provided items before the provided index or reference item.
* @param items - The items to insert.
* @param indexOrReferenceItem - The index or reference item to insert before.
* @returns An `insert` operation before the provided index or reference item.
* {@link https://www.sanity.io/docs/http-patches#0SQmPlb6}
*/
export declare function insertBefore<
const Items extends AnyArray<unknown>,
const ReferenceItem extends IndexedSegment | KeyedSegment,
>(
items: Items | ArrayElement<Items>,
indexOrReferenceItem: ReferenceItem,
): InsertOp<NormalizeReadOnlyArray<Items>, 'before', ReferenceItem>
export declare type InsertOp<
Items extends AnyArray,
Pos extends RelativePosition,
ReferenceItem extends IndexedSegment | KeyedSegment,
> = {
type: 'insert'
referenceItem: ReferenceItem
position: Pos
items: Items
}
export declare type JsonArray = JsonValue[] | readonly JsonValue[]

@@ -76,2 +296,8 @@

export declare interface JSONOptions<Type> {
parse?: JSONParser<Type>
}
export declare type JSONParser<Type> = (line: string) => Type
export declare type JsonPrimitive = string | number | boolean | null

@@ -81,2 +307,4 @@

export {KeyedSegment}
export declare function map<T, U>(

@@ -87,2 +315,4 @@ it: AsyncIterableIterator<T>,

export declare const MAX_MUTATION_CONCURRENCY = 10
export declare type MigrateDefinition = NodeMigration | AsyncIterableMigration

@@ -96,3 +326,6 @@

filter?: string
documentType: string
/**
* What document types to migrate
*/
documentTypes?: string[]
migrate: Def

@@ -102,63 +335,76 @@ }

export declare interface MigrationContext {
withDocument(id: string): Promise<SanityDocument | null>
getDocument<T extends SanityDocument_2>(id: string): Promise<T | undefined>
getDocuments<T extends SanityDocument_2>(ids: string[]): Promise<T[]>
query<T>(query: string, params?: Record<string, unknown>): Promise<T>
}
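MigrationContext replaces the old withDocument lookup with getDocument, getDocuments and query, all served from the temporary export buffer built in the implementation above. A hedged sketch of a document-level hook using it; the document type, field names and reference shape are illustrative only and the title field is assumed:

import {defineMigration, patch, at, set} from '@sanity/migrate'
import type {MigrationContext} from '@sanity/migrate'
import type {SanityDocument} from '@sanity/types'

export default defineMigration({
  title: 'Copy author names onto articles', // assumed field, not visible in this excerpt
  documentTypes: ['article'],
  migrate: {
    // Document hooks may now be async and can read other documents through the buffered context.
    async document(doc: SanityDocument, context: MigrationContext) {
      const authorRef: string | undefined = (doc as any).author?._ref
      if (!authorRef) return undefined
      const author = await context.getDocument<any>(authorRef)
      if (!author) return undefined
      return patch(doc._id, at('authorName', set(author.name)))
    },
  },
})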
declare interface MigrationRunnerOptions {
export declare type MigrationProgress = {
documents: number
mutations: number
pending: number
queuedBatches: number
currentMutations: Mutation[]
completedTransactions: MultipleMutationResult[]
done?: boolean
}
export declare interface MigrationRunnerConfig {
api: APIConfig
concurrency?: number
onProgress?: (event: MigrationProgress) => void
}
declare interface MigrationRunnerOptions_2 {
declare interface MigrationRunnerOptions {
api: APIConfig
}
export declare function ndjson(
it: AsyncIterableIterator<Uint8Array>,
): AsyncIterableIterator<unknown>
export declare type Mutation<Doc extends SanityDocument = any> =
| CreateMutation<Doc>
| CreateIfNotExistsMutation<Doc>
| CreateOrReplaceMutation<Doc>
| DeleteMutation
| PatchMutation
export declare interface NodeMigration {
document?: <Doc extends SanityDocument>(
document?: <Doc extends SanityDocument_2>(
doc: Doc,
context: NodeMigrationContext,
) => DocumentMigrationReturnValue
context: MigrationContext,
) => DocumentMigrationReturnValue | Promise<DocumentMigrationReturnValue>
node?: <Node extends JsonValue>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
object?: <Node extends JsonObject>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
array?: <Node extends JsonArray>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
string?: <Node extends string>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
number?: <Node extends number>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
boolean?: <Node extends boolean>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
null?: <Node extends null>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
}
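To show how these hooks fit together, a minimal sketch of a migration definition, assuming `defineMigration` (re-exported from `../defineMigration` by the package entry) accepts this shape and that `title` is a field on the broader Migration type not shown in this excerpt; the document type and field name are hypothetical:

import {defineMigration, set} from '@sanity/migrate'

export default defineMigration({
  title: 'Uppercase post titles', // assumed field, not part of this excerpt
  documentTypes: ['post'],
  migrate: {
    // Runs for every string node in each matching document.
    string(node, path, context) {
      // `path` locates the node; `context` offers getDocument/getDocuments/query.
      if (path[path.length - 1] === 'title') {
        return set(node.toUpperCase())
      }
    },
  },
})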
export declare interface NodeMigrationContext {
withDocument(id: string): Promise<SanityDocument | null>
}
export declare type NodeMigrationReturnValue =

@@ -169,11 +415,156 @@ | DocumentMigrationReturnValue

export declare function parseJSON(it: AsyncIterableIterator<string>): AsyncIterableIterator<unknown>
export declare type NodePatch<P extends Path = Path, O extends Operation = Operation> = {
path: P
op: O
}
export declare type NodePatchList =
| [NodePatch, ...NodePatch[]]
| NodePatch[]
| readonly NodePatch[]
| readonly [NodePatch, ...NodePatch[]]
declare type NormalizeReadOnlyArray<T> = T extends readonly [infer NP, ...infer Rest]
? [NP, ...Rest]
: T extends readonly (infer NP)[]
? NP[]
: T
export declare type NumberOp = IncOp<number> | DecOp<number>
export declare type ObjectOp = AssignOp | UnassignOp
export declare type Operation = PrimitiveOp | ArrayOp | ObjectOp
declare type Optional<T, K extends keyof T> = Omit<T, K> & Partial<Pick<T, K>>
export declare function parse<Type>(
it: AsyncIterableIterator<string>,
options?: JSONOptions<Type>,
): AsyncIterableIterator<Type>
export declare function parseJSON<Type>(
it: AsyncIterableIterator<string>,
{parse}?: JSONOptions<Type>,
): AsyncIterableIterator<Type>
/**
* Applies a patch to a document.
* @param id - The id of the document to be patched.
* @param patches - The patches to be applied.
* @param options - Optional patch options.
* @returns The mutation operation to patch the document.
* @beta
*/
export declare function patch<P extends NodePatchList | NodePatch>(
id: string,
patches: P,
options?: PatchOptions,
): PatchMutation<NormalizeReadOnlyArray<Tuplify<P>>>
export declare type PatchMutation<Patches extends NodePatchList = NodePatchList> = {
type: 'patch'
id: string
patches: Patches
options?: PatchOptions
}
export declare type PatchOptions = {
ifRevision?: string
}
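To make the `patch` signature concrete, a small hedged sketch that builds a patch mutation by hand; the document id, field, and revision are placeholders, and `inc` is the operation declared at the top of this listing:

import {patch, inc} from '@sanity/migrate'

// Increment a hypothetical 'viewCount' field, but only if the document
// revision is still the one we read (optimistic locking via ifRevision).
const bumpViews = patch(
  'post-123',
  [{path: ['viewCount'], op: inc(1)}],
  {ifRevision: 'rev-abc'},
)
// bumpViews is a PatchMutation that a migration can yield alongside other mutations.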
export {Path}
export declare function run(
config: MigrationRunnerOptions,
/**
* Creates an `insert` operation that prepends the provided items.
* @param items - The items to prepend.
* @returns An `insert` operation at the beginning of the array.
* {@link https://www.sanity.io/docs/http-patches#refAUsf0}
* @beta
*/
export declare function prepend<const Items extends AnyArray<unknown>>(
items: Items | ArrayElement<Items>,
): InsertOp<NormalizeReadOnlyArray<Items>, 'before', 0>
export declare type PrimitiveOp = AnyOp | StringOp | NumberOp
export declare type RelativePosition = 'before' | 'after'
/**
* Creates a `replace` operation with the provided items and reference item.
* @param items - The items to replace.
* @param referenceItem - The reference item to replace.
* @returns A ReplaceOp operation.
* @remarks This will be converted to an `insert`/`replace` patch when submitted to the API
* {@link https://www.sanity.io/docs/http-patches#GnVSwcPa}
* @beta
*/
export declare function replace<
Items extends any[],
ReferenceItem extends IndexedSegment | KeyedSegment,
>(items: Items | ArrayElement<Items>, referenceItem: ReferenceItem): ReplaceOp<Items, ReferenceItem>
export declare type ReplaceOp<
Items extends AnyArray,
ReferenceItem extends IndexedSegment | KeyedSegment,
> = {
type: 'replace'
referenceItem: ReferenceItem
items: Items
}
export declare function run(config: MigrationRunnerConfig, migration: Migration): Promise<void>
export declare function runFromArchive(
migration: Migration,
): AsyncGenerator<string, void, unknown>
path: string,
config: MigrationRunnerConfig,
): Promise<void>
/**
* Safe JSON parser that is able to handle lines interrupted by an error object.
*
* This may occur when streaming NDJSON from the Export HTTP API.
*
* @internal
* @see {@link https://github.com/sanity-io/sanity/pull/1787 | Initial pull request}
*/
export declare const safeJsonParser: (line: string) => SanityDocument_2
export declare type SanityDocument = {
_id?: string
_type: string
_createdAt?: string
_updatedAt?: string
_rev?: string
}
/**
* Creates a `set` operation with the provided value.
* @param value - The value to set.
* @returns A `set` operation.
* {@link https://www.sanity.io/docs/http-patches#6TPENSW3}
* @beta
*/
export declare const set: <const T>(value: T) => SetOp<T>
/**
* Creates a `setIfMissing` operation with the provided value.
* @param value - The value to set if missing.
* @returns A `setIfMissing` operation.
* {@link https://www.sanity.io/docs/http-patches#A80781bT}
* @beta
*/
export declare const setIfMissing: <const T>(value: T) => SetIfMissingOp<T>
export declare type SetIfMissingOp<T> = {
type: 'setIfMissing'
value: T
}
export declare type SetOp<T> = {
type: 'set'
value: T
}
export declare function split(

@@ -184,2 +575,6 @@ it: AsyncIterableIterator<string>,

export declare function stringify(
iterable: AsyncIterableIterator<unknown>,
): AsyncGenerator<string, void, unknown>
export declare function stringifyJSON(

@@ -189,2 +584,4 @@ it: AsyncIterableIterator<unknown>,

export declare type StringOp = DiffMatchPatchOp
export declare function take<T>(

@@ -197,2 +594,58 @@ it: AsyncIterableIterator<T>,

export declare function toFetchOptionsIterable(
apiConfig: APIConfig,
mutations: AsyncIterableIterator<Mutation_2[]>,
): AsyncGenerator<FetchOptions, void, unknown>
/**
* Creates a `truncate` operation that will remove all items after `startIndex` until the end of the array or the provided `endIndex`.
* @param startIndex - The start index for the truncate operation.
* @param endIndex - The end index for the truncate operation.
* @returns A `truncate` operation.
* @remarks - This will be converted to an `unset` patch when submitted to the API
* {@link https://www.sanity.io/docs/http-patches#xRtBjp8o}
* @beta
*/
export declare function truncate(startIndex: number, endIndex?: number): TruncateOp
export declare type TruncateOp = {
type: 'truncate'
startIndex: number
endIndex?: number
}
declare type Tuplify<T> = T extends readonly [infer NP, ...infer Rest]
? [NP, ...Rest]
: T extends readonly (infer NP)[]
? NP[]
: [T]
export declare type UnassignOp<K extends readonly string[] = readonly string[]> = {
type: 'unassign'
keys: K
}
/**
* Creates an `unset` operation.
* @returns An `unset` operation.
* {@link https://www.sanity.io/docs/http-patches#xRtBjp8o}
* @beta
*/
export declare const unset: () => UnsetOp
export declare type UnsetOp = {
type: 'unset'
}
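A brief hedged sketch that combines the primitive operations above into NodePatch objects; the field names and values are hypothetical:

import {set, setIfMissing, unset, truncate} from '@sanity/migrate'

const patches = [
  {path: ['status'], op: set('published')},   // overwrite the value
  {path: ['tags'], op: setIfMissing([])},     // initialize only when absent
  {path: ['legacyField'], op: unset()},       // remove the field entirely
  {path: ['comments'], op: truncate(100)},    // drop items from index 100 onwards
]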
export declare type UpsertOp<
Items extends AnyArray,
Pos extends RelativePosition,
ReferenceItem extends IndexedSegment | KeyedSegment,
> = {
type: 'upsert'
items: Items
referenceItem: ReferenceItem
position: Pos
}
export {}


package.json
{
"name": "@sanity/migrate",
"version": "3.25.1-canary.22+7467ec230d",
"version": "3.26.2-canary.47+dc645319c7",
"description": "Tooling for running data migrations on Sanity.io projects",

@@ -37,13 +37,2 @@ "keywords": [

},
"./mutations": {
"types": "./lib/dts/src/_exports/mutations.d.ts",
"source": "./src/_exports/mutations.ts",
"require": "./lib/_exports/mutations.js",
"node": {
"module": "./lib/_exports/mutations.esm.js",
"import": "./lib/_exports/mutations.cjs.mjs"
},
"import": "./lib/_exports/mutations.esm.js",
"default": "./lib/_exports/mutations.esm.js"
},
"./package.json": "./package.json"

@@ -74,11 +63,17 @@ },

"lint": "eslint .",
"watch": "pkg-utils watch --tsconfig tsconfig.lib.json"
"watch": "pkg-utils watch --tsconfig tsconfig.lib.json",
"test": "jest"
},
"dependencies": {
"@bjoerge/mutiny": "^0.5.1",
"@sanity/types": "3.25.1-canary.22+7467ec230d",
"@sanity/client": "^6.11.1",
"@sanity/types": "3.26.2-canary.47+dc645319c7",
"@sanity/util": "3.26.2-canary.47+dc645319c7",
"arrify": "^2.0.1",
"fast-fifo": "^1.3.2"
"fast-fifo": "^1.3.2",
"groq-js": "^1.4.1",
"p-map": "^7.0.1"
},
"devDependencies": {
"@types/arrify": "^2.0.1",
"rimraf": "^3.0.2"

@@ -89,3 +84,3 @@ },

},
"gitHead": "7467ec230d743872295acc60992b3510fa2dc51f"
"gitHead": "dc645319c7d109039871bc4b3385a0df47a10d55"
}
export {fromExportArchive} from '../sources/fromExportArchive'
export {fromExportEndpoint} from '../sources/fromExportEndpoint'
export {fromDocuments} from '../sources/fromDocuments'
export {safeJsonParser} from '../sources/fromExportEndpoint'
export * from '../types'

@@ -8,3 +9,6 @@ export * from '../defineMigration'

export * from '../runner/run'
export * from '../runner/runFromArchive'
export * from '../runner/dryRun'
export * from '../runner/collectMigrationMutations'
export {MAX_MUTATION_CONCURRENCY, DEFAULT_MUTATION_CONCURRENCY} from '../runner/constants'
export * from '../mutations'

@@ -25,7 +25,8 @@ type SupportedMethod = 'GET' | 'POST'

}),
export: (dataset: string): Endpoint => ({
export: (dataset: string, documentTypes?: string[]): Endpoint => ({
global: false,
method: 'GET',
path: `/data/export/${dataset}`,
searchParams: [],
searchParams:
documentTypes && documentTypes?.length > 0 ? [['types', documentTypes.join(',')]] : [],
}),
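For reference, a hedged illustration of what the updated export endpoint builder produces when document types are passed; the dataset and type names are placeholders:

// endpoints.data.export('production', ['post', 'author']) would yield roughly:
const exportEndpoint = {
  global: false,
  method: 'GET',
  path: '/data/export/production',
  searchParams: [['types', 'post,author']],
}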

@@ -32,0 +33,0 @@ mutate: (

@@ -1,2 +0,2 @@

import {streamAsyncIterator} from '../utils/streamToAsyncIterator'
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator'

@@ -11,5 +11,13 @@ export interface FetchOptions {

export function assert2xx(res: Response) {
export async function assert2xx(res: Response): Promise<void> {
if (res.status < 200 || res.status > 299) {
const err = new Error(`HTTP Error ${res.status}: ${res.statusText}`) as HTTPError
const response = await res.json().catch(() => {
throw new Error(`Error parsing JSON ${res.status}: ${res.statusText}`)
})
const message = response.error
? response.error.description
: `HTTP Error ${res.status}: ${res.statusText}`
const err = new Error(message) as HTTPError
err.statusCode = res.status

@@ -20,7 +28,11 @@ throw err

export async function fetchAsyncIterator({url, init}: FetchOptions) {
export async function fetchStream({url, init}: FetchOptions) {
const response = await fetch(url, init)
assert2xx(response)
await assert2xx(response)
if (response.body === null) throw new Error('No response received')
return streamAsyncIterator(response.body)
return response.body
}
export async function fetchAsyncIterator(options: FetchOptions) {
return streamToAsyncIterator(await fetchStream(options))
}
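A short hedged usage sketch for the two helpers above, within this module; the URL is a placeholder and the body is assumed to stream Uint8Array chunks as the Export API does:

// fetchStream resolves to the raw ReadableStream; fetchAsyncIterator wraps it
// so the body can be consumed with `for await`.
const chunks = await fetchAsyncIterator({
  url: 'https://api.sanity.io/v1/data/export/production', // placeholder URL
  init: {method: 'GET'},
})
for await (const chunk of chunks) {
  process.stdout.write(chunk) // each chunk is a Uint8Array
}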

@@ -1,4 +0,7 @@

export async function* filter<T>(it: AsyncIterableIterator<T>, predicate: (value: T) => boolean) {
export async function* filter<T>(
it: AsyncIterableIterator<T>,
predicate: (value: T) => boolean | Promise<boolean>,
) {
for await (const chunk of it) {
if (predicate(chunk)) {
if (await predicate(chunk)) {
yield chunk

@@ -5,0 +8,0 @@ }

@@ -1,6 +0,13 @@

export async function* parseJSON(
export type JSONParser<Type> = (line: string) => Type
export interface JSONOptions<Type> {
parse?: JSONParser<Type>
}
export async function* parseJSON<Type>(
it: AsyncIterableIterator<string>,
): AsyncIterableIterator<unknown> {
{parse = JSON.parse}: JSONOptions<Type> = {},
): AsyncIterableIterator<Type> {
for await (const chunk of it) {
yield JSON.parse(chunk)
yield parse(chunk)
}

@@ -7,0 +14,0 @@ }

import {split} from './split'
import {decodeText} from './decodeText'
import {parseJSON} from './json'
import {type JSONOptions, parseJSON} from './json'
import {filter} from './filter'
export function ndjson(it: AsyncIterableIterator<Uint8Array>) {
return parseJSON(filter(split(decodeText(it), '\n'), (line) => Boolean(line && line.trim())))
export function parse<Type>(
it: AsyncIterableIterator<string>,
options?: JSONOptions<Type>,
): AsyncIterableIterator<Type> {
return parseJSON(
filter(split(it, '\n'), (line) => Boolean(line && line.trim())),
options,
)
}
export async function* stringify(iterable: AsyncIterableIterator<unknown>) {
for await (const doc of iterable) {
yield `${JSON.stringify(doc)}\n`
}
}
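A hedged sketch of round-tripping NDJSON with `parse` and `stringify`; the lines are hypothetical, and the optional parser mirrors how the runner later plugs in `safeJsonParser`:

async function* lines() {
  yield '{"_id":"a","_type":"post"}\n{"_id":"b","_type":"post"}\n'
}

// Split on newlines, skip blank lines, parse each line, then serialize back to NDJSON.
const docs = parse<{_id: string; _type: string}>(lines(), {parse: JSON.parse})
for await (const line of stringify(docs)) {
  process.stdout.write(line)
}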
import {SanityDocument} from '@sanity/types'
import {Migration} from '../types'
import {Migration, MigrationContext} from '../types'
import {normalizeMigrateDefinition} from './normalizeMigrateDefinition'

@@ -7,11 +7,7 @@

migration: Migration,
documents: AsyncIterableIterator<SanityDocument>,
documents: () => AsyncIterableIterator<SanityDocument>,
context: MigrationContext,
) {
const ctx = {
withDocument: () => {
throw new Error('Not implemented yet')
},
}
const migrate = normalizeMigrateDefinition(migration)
return migrate(documents, ctx)
return migrate(documents, context)
}

@@ -1,7 +0,13 @@

import {CompactFormatter} from '@bjoerge/mutiny'
import {SanityDocument} from '@sanity/types'
import {APIConfig, Migration} from '../types'
import {ndjson} from '../it-utils/ndjson'
import {fromExportEndpoint} from '../sources/fromExportEndpoint'
import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint'
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator'
import {bufferThroughFile} from '../fs-webstream/bufferThroughFile'
import {asyncIterableToStream} from '../utils/asyncIterableToStream'
import {parse} from '../it-utils/ndjson'
import {decodeText} from '../it-utils'
import {collectMigrationMutations} from './collectMigrationMutations'
import {getBufferFilePath} from './utils/getBufferFile'
import {createBufferFileContext} from './utils/createBufferFileContext'
import {applyFilters} from './utils/applyFilters'

@@ -13,11 +19,36 @@ interface MigrationRunnerOptions {

export async function* dryRun(config: MigrationRunnerOptions, migration: Migration) {
const filteredDocuments = applyFilters(
migration,
parse<SanityDocument>(
decodeText(
streamToAsyncIterator(
await fromExportEndpoint({...config.api, documentTypes: migration.documentTypes}),
),
),
{parse: safeJsonParser},
),
)
const abortController = new AbortController()
const createReader = bufferThroughFile(
asyncIterableToStream(filteredDocuments),
getBufferFilePath(),
{signal: abortController.signal},
)
const context = createBufferFileContext(createReader)
const mutations = collectMigrationMutations(
migration,
ndjson(await fromExportEndpoint(config.api)) as AsyncIterableIterator<SanityDocument>,
() => parse(decodeText(streamToAsyncIterator(createReader())), {parse: safeJsonParser}),
context,
)
// stop buffering the export once we're done collecting all mutations
abortController.abort()
for await (const mutation of mutations) {
if (!mutation) continue
yield CompactFormatter.format(Array.isArray(mutation) ? mutation : [mutation])
yield JSON.stringify(mutation, null, 2)
}
}
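A hedged sketch of consuming the dry-run generator above; the API config values are placeholders and `migration` is assumed to be a Migration definition such as the one sketched earlier:

for await (const formatted of dryRun(
  {api: {projectId: 'abc123', dataset: 'production', apiVersion: 'v2024-01-01', token: '<token>'}},
  migration,
)) {
  console.log(formatted) // pretty-printed JSON for one mutation; nothing is written
}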

@@ -1,5 +0,5 @@

import {at, Mutation, NodePatch, Operation, patch, Path} from '@bjoerge/mutiny'
import {SanityDocument} from '@sanity/types'
import {Path, SanityDocument} from '@sanity/types'
import arrify from 'arrify'
import {AsyncIterableMigration, Migration, NodeMigration, NodeMigrationContext} from '../types'
import {at, Mutation, NodePatch, Operation, patch} from '../mutations'
import {AsyncIterableMigration, Migration, MigrationContext, NodeMigration} from '../types'
import {JsonArray, JsonObject, JsonValue} from '../json'

@@ -16,3 +16,3 @@ import {flatMapDeep} from './utils/flatMapDeep'

filter: migration.filter,
documentType: migration.documentType,
documentTypes: migration.documentTypes,
})

@@ -42,8 +42,11 @@ }

migration: NodeMigration,
opts: {filter?: string; documentType?: string},
opts: {filter?: string; documentTypes?: string[]},
): AsyncIterableMigration {
const documentTypesSet = new Set(opts.documentTypes)
return async function* run(docs, context) {
for await (const doc of docs) {
if (doc._type !== opts.documentType) continue
const documentMutations = collectDocumentMutations(migration, doc, context)
for await (const doc of docs()) {
if (!documentTypesSet.has(doc._type)) continue
const documentMutations = await collectDocumentMutations(migration, doc, context)
if (documentMutations.length > 0) {

@@ -56,18 +59,22 @@ yield documentMutations

function collectDocumentMutations(
async function collectDocumentMutations(
migration: NodeMigration,
doc: SanityDocument,
context: NodeMigrationContext,
context: MigrationContext,
) {
const documentMutations = migration.document?.(doc, context)
const nodeMigrations = flatMapDeep(doc as JsonValue, (value, path) => {
return [
...arrify(migration.node?.(value, path, context)),
...arrify(migrateNodeType(migration, value, path, context)),
].map((change) => normalizeNodeMutation(path, change))
const documentMutations = Promise.resolve(migration.document?.(doc, context))
const nodeMigrations = flatMapDeep(doc as JsonValue, async (value, path) => {
const [nodeReturnValues, nodeTypeReturnValues] = await Promise.all([
Promise.resolve(migration.node?.(value, path, context)),
Promise.resolve(migrateNodeType(migration, value, path, context)),
])
return [...arrify(nodeReturnValues), ...arrify(nodeTypeReturnValues)].map((change) =>
normalizeNodeMutation(path, change),
)
})
return [...arrify(documentMutations), ...nodeMigrations].map((change) =>
normalizeDocumentMutation(doc._id, change),
)
return (await Promise.all([...arrify(await documentMutations), ...nodeMigrations]))
.flat()
.map((change) => normalizeDocumentMutation(doc._id, change))
}

@@ -100,3 +107,3 @@

path: Path,
context: NodeMigrationContext,
context: MigrationContext,
) {

@@ -103,0 +110,0 @@ switch (getValueType(value)) {

import {SanityDocument} from '@sanity/types'
import {MultipleMutationResult} from '@sanity/client'
import {APIConfig, Migration} from '../types'
import {ndjson} from '../it-utils/ndjson'
import {fromExportEndpoint} from '../sources/fromExportEndpoint'
import {toMutationEndpoint} from '../targets/toMutationEndpoint'
import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client'
import arrify from 'arrify'
import {APIConfig, Migration, MigrationProgress} from '../types'
import {parse, stringify} from '../it-utils/ndjson'
import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint'
import {endpoints} from '../fetch-utils/endpoints'
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions'
import {tap} from '../it-utils/tap'
import {mapAsync} from '../it-utils/mapAsync'
import {lastValueFrom} from '../it-utils/lastValueFrom'
import {decodeText, parseJSON} from '../it-utils'
import {concatStr} from '../it-utils/concatStr'
import {fetchAsyncIterator, FetchOptions} from '../fetch-utils/fetchStream'
import {bufferThroughFile} from '../fs-webstream/bufferThroughFile'
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator'
import {asyncIterableToStream} from '../utils/asyncIterableToStream'
import {toSanityMutations} from './utils/toSanityMutations'
import {
DEFAULT_MUTATION_CONCURRENCY,
MAX_MUTATION_CONCURRENCY,
MUTATION_ENDPOINT_MAX_BODY_SIZE,
} from './constants'
import {batchMutations} from './utils/batchMutations'
import {collectMigrationMutations} from './collectMigrationMutations'
import {getBufferFilePath} from './utils/getBufferFile'
import {createBufferFileContext} from './utils/createBufferFileContext'
import {applyFilters} from './utils/applyFilters'
interface MigrationRunnerOptions {
export interface MigrationRunnerConfig {
api: APIConfig
concurrency?: number
onProgress?: (event: MigrationProgress) => void
}
export async function* run(config: MigrationRunnerOptions, migration: Migration) {
const mutations = collectMigrationMutations(
export async function* toFetchOptionsIterable(
apiConfig: APIConfig,
mutations: AsyncIterableIterator<SanityMutation[]>,
) {
for await (const mut of mutations) {
yield toFetchOptions({
projectId: apiConfig.projectId,
apiVersion: apiConfig.apiVersion,
token: apiConfig.token,
apiHost: apiConfig.apiHost ?? 'api.sanity.io',
endpoint: endpoints.data.mutate(apiConfig.dataset, {returnIds: true}),
body: JSON.stringify({mutations: mut}),
})
}
}
export async function run(config: MigrationRunnerConfig, migration: Migration) {
const stats: MigrationProgress = {
documents: 0,
mutations: 0,
pending: 0,
queuedBatches: 0,
completedTransactions: [],
currentMutations: [],
}
const filteredDocuments = applyFilters(
migration,
ndjson(await fromExportEndpoint(config.api)) as AsyncIterableIterator<SanityDocument>,
parse<SanityDocument>(
decodeText(
streamToAsyncIterator(
await fromExportEndpoint({...config.api, documentTypes: migration.documentTypes}),
),
),
{parse: safeJsonParser},
),
)
const abortController = new AbortController()
for await (const result of toMutationEndpoint(config.api, mutations)) {
yield formatMutationResponse(result)
const createReader = bufferThroughFile(
asyncIterableToStream(stringify(filteredDocuments)),
getBufferFilePath(),
{signal: abortController.signal},
)
const context = createBufferFileContext(createReader)
const documents = () =>
tap(
parse<SanityDocument>(decodeText(streamToAsyncIterator(createReader())), {
parse: safeJsonParser,
}),
() => {
config.onProgress?.({...stats, documents: ++stats.documents})
},
)
const mutations = tap(collectMigrationMutations(migration, documents, context), (muts) => {
stats.currentMutations = arrify(muts)
config.onProgress?.({
...stats,
mutations: ++stats.mutations,
})
})
const concurrency = config?.concurrency ?? DEFAULT_MUTATION_CONCURRENCY
if (concurrency > MAX_MUTATION_CONCURRENCY) {
throw new Error(`Concurrency exceeds maximum allowed value (${MAX_MUTATION_CONCURRENCY})`)
}
}
function formatMutationResponse(mutationResponse: MultipleMutationResult) {
return `OK (transactionId = ${mutationResponse.transactionId})
${mutationResponse.results
.map((result) => {
return ` - ${result.operation}: ${result.id}`
const batches = tap(
batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE),
() => {
config.onProgress?.({...stats, queuedBatches: ++stats.queuedBatches})
},
)
const submit = async (opts: FetchOptions): Promise<MultipleMutationResult> =>
lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts)))))
const commits = await mapAsync(
toFetchOptionsIterable(config.api, batches),
(opts) => {
config.onProgress?.({...stats, pending: ++stats.pending})
return submit(opts)
},
concurrency,
)
for await (const result of commits) {
stats.completedTransactions.push(result)
config.onProgress?.({
...stats,
})
}
config.onProgress?.({
...stats,
done: true,
})
.join('\n')}`
// Cancel export/buffer stream, it's not needed anymore
abortController.abort()
}
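Finally, a hedged usage sketch for the new `run` signature; the project details are placeholders, and the progress callback simply logs a few counters from the MigrationProgress shape above:

await run(
  {
    api: {projectId: 'abc123', dataset: 'production', apiVersion: 'v2024-01-01', token: '<token>'},
    concurrency: 4, // must not exceed MAX_MUTATION_CONCURRENCY (10)
    onProgress: (progress) => {
      console.log(
        `documents: ${progress.documents}, mutations: ${progress.mutations}, ` +
          `transactions: ${progress.completedTransactions.length}${progress.done ? ' (done)' : ''}`,
      )
    },
  },
  migration, // a Migration definition, e.g. from defineMigration(...)
)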

@@ -1,2 +0,2 @@

import {Path, PathElement} from '@bjoerge/mutiny'
import {Path, PathSegment} from '@sanity/types'
import {JsonArray, JsonObject, JsonValue} from '../../json'

@@ -17,3 +17,3 @@ import {getValueType} from './getValueType'

container: JsonArray | JsonObject,
): PathElement {
): PathSegment {
if (

@@ -20,0 +20,0 @@ item &&

import {maybeDecompress} from '../fs-webstream/maybeDecompress'
import {untar} from '../tar-webstream/untar'
import {streamAsyncIterator} from '../utils/streamToAsyncIterator'
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator'
import {readFileAsWebStream} from '../fs-webstream/readFileAsWebStream'
export async function* fromExportArchive(path: string) {
for await (const [header, entry] of streamAsyncIterator(
for await (const [header, entry] of streamToAsyncIterator(
untar(await maybeDecompress(readFileAsWebStream(path))),
)) {
if (header.type === 'file' && header.name.endsWith('.ndjson')) {
for await (const chunk of streamAsyncIterator(entry)) {
for await (const chunk of streamToAsyncIterator(entry)) {
yield chunk

@@ -14,0 +14,0 @@ }

@@ -1,8 +0,10 @@

import {fetchAsyncIterator} from '../fetch-utils/fetchStream'
import {createSafeJsonParser} from '@sanity/util/createSafeJsonParser'
import {SanityDocument} from '@sanity/types'
import {fetchStream} from '../fetch-utils/fetchStream'
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions'
import {endpoints} from '../fetch-utils/endpoints'
import {APIConfig} from '../types'
import {ExportAPIConfig} from '../types'
export function fromExportEndpoint(options: APIConfig) {
return fetchAsyncIterator(
export function fromExportEndpoint(options: ExportAPIConfig) {
return fetchStream(
toFetchOptions({

@@ -13,5 +15,17 @@ projectId: options.projectId,

apiHost: options.apiHost ?? 'api.sanity.io',
endpoint: endpoints.data.export(options.dataset),
endpoint: endpoints.data.export(options.dataset, options.documentTypes),
}),
)
}
/**
* Safe JSON parser that is able to handle lines interrupted by an error object.
*
* This may occur when streaming NDJSON from the Export HTTP API.
*
* @internal
* @see {@link https://github.com/sanity-io/sanity/pull/1787 | Initial pull request}
*/
export const safeJsonParser = createSafeJsonParser<SanityDocument>({
errorLabel: 'Error streaming dataset',
})

@@ -1,2 +0,2 @@

import {streamAsyncIterator} from '../../utils/streamToAsyncIterator'
import {streamToAsyncIterator} from '../../utils/streamToAsyncIterator'
import {untar} from '../untar'

@@ -7,4 +7,4 @@ import {readFileAsWebStream} from '../../fs-webstream/readFileAsWebStream'

const fileStream = readFileAsWebStream(file)
for await (const [header, body] of streamAsyncIterator(untar(fileStream))) {
yield [header.name, streamAsyncIterator(body)]
for await (const [header, body] of streamToAsyncIterator(untar(fileStream))) {
yield [header.name, streamToAsyncIterator(body)]
}

@@ -11,0 +11,0 @@ }

@@ -1,2 +0,2 @@

import {streamAsyncIterator} from '../../utils/streamToAsyncIterator'
import {streamToAsyncIterator} from '../../utils/streamToAsyncIterator'
import {toArray} from '../../it-utils/toArray'

@@ -33,5 +33,5 @@ import {untar} from '../untar'

for await (const [header, body] of streamAsyncIterator(untar(fileStream))) {
for await (const [header, body] of streamToAsyncIterator(untar(fileStream))) {
if (header.name.includes(file)) {
const chunks = await toArray(streamAsyncIterator(body))
const chunks = await toArray(streamToAsyncIterator(body))
const sum = await shasum(concatUint8Arrays(chunks))

@@ -38,0 +38,0 @@ expect(sum).toEqual('02c936cda5695fa4f43f5dc919c1f55c362faa6dd558dfb2d77d524f004069db')

@@ -1,2 +0,2 @@

import {streamAsyncIterator} from '../../utils/streamToAsyncIterator'
import {streamToAsyncIterator} from '../../utils/streamToAsyncIterator'
import {toArray} from '../../it-utils/toArray'

@@ -9,4 +9,4 @@ import {untar} from '../untar'

const fileStream = readFileAsWebStream(file)
for await (const [header, body] of streamAsyncIterator(untar(fileStream))) {
const content = await toArray(decodeText(streamAsyncIterator(body)))
for await (const [header, body] of streamToAsyncIterator(untar(fileStream))) {
const content = await toArray(decodeText(streamToAsyncIterator(body)))
yield [header.name, {type: header.type, content}]

@@ -13,0 +13,0 @@ }

@@ -1,2 +0,2 @@

import {streamAsyncIterator} from '../utils/streamToAsyncIterator'
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator'

@@ -8,5 +8,5 @@ /**

export async function drain(stream: ReadableStream<unknown>) {
for await (const _ of streamAsyncIterator(stream)) {
for await (const _ of streamToAsyncIterator(stream)) {
// do nothing
}
}

@@ -1,4 +0,5 @@

import type {SanityDocument} from '@sanity/types'
import type {Mutation, NodePatch, Operation, Path} from '@bjoerge/mutiny'
import type {SanityDocument, Path} from '@sanity/types'
import {MultipleMutationResult} from '@sanity/client'
import {JsonArray, JsonObject, JsonValue} from './json'
import {Mutation, NodePatch, Operation} from './mutations'

@@ -9,3 +10,3 @@ export type {Path}

export type AsyncIterableMigration = (
documents: AsyncIterableIterator<SanityDocument>,
documents: () => AsyncIterableIterator<SanityDocument>,
context: MigrationContext,

@@ -20,3 +21,8 @@ ) => AsyncGenerator<Mutation | Mutation[]>

filter?: string
documentType: string
/**
* What document types to migrate
*/
documentTypes?: string[]
migrate: Def

@@ -27,4 +33,16 @@ }

export type MigrationProgress = {
documents: number
mutations: number
pending: number
queuedBatches: number
currentMutations: Mutation[]
completedTransactions: MultipleMutationResult[]
done?: boolean
}
export interface MigrationContext {
withDocument(id: string): Promise<SanityDocument | null>
getDocument<T extends SanityDocument>(id: string): Promise<T | undefined>
getDocuments<T extends SanityDocument>(ids: string[]): Promise<T[]>
query<T>(query: string, params?: Record<string, unknown>): Promise<T>
}

@@ -40,4 +58,4 @@

export interface NodeMigrationContext {
withDocument(id: string): Promise<SanityDocument | null>
export interface ExportAPIConfig extends APIConfig {
documentTypes?: string[]
}

@@ -57,39 +75,39 @@

doc: Doc,
context: NodeMigrationContext,
) => DocumentMigrationReturnValue
context: MigrationContext,
) => DocumentMigrationReturnValue | Promise<DocumentMigrationReturnValue>
node?: <Node extends JsonValue>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
object?: <Node extends JsonObject>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
array?: <Node extends JsonArray>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
string?: <Node extends string>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
number?: <Node extends number>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
boolean?: <Node extends boolean>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
null?: <Node extends null>(
node: Node,
path: Path,
context: NodeMigrationContext,
) => NodeMigrationReturnValue
context: MigrationContext,
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue>
}

@@ -1,2 +0,2 @@

export async function* streamAsyncIterator<T>(stream: ReadableStream<T>) {
export async function* streamToAsyncIterator<T>(stream: ReadableStream<T>) {
// Get a lock on the stream

@@ -3,0 +3,0 @@ const reader = stream.getReader()
