@sanity/migrate
Advanced tools
Comparing version 3.25.1-canary.22 to 3.26.2-canary.47
import FIFO from 'fast-fifo'; | ||
import { open } from 'node:fs/promises'; | ||
import { SanityEncoder, patch, at, CompactFormatter } from '@bjoerge/mutiny'; | ||
import { createSafeJsonParser } from '@sanity/util/createSafeJsonParser'; | ||
import arrify from 'arrify'; | ||
import { SanityEncoder } from '@bjoerge/mutiny'; | ||
import { fromString } from '@sanity/util/paths'; | ||
import { tmpdir } from 'node:os'; | ||
import path from 'node:path'; | ||
import groq from 'groq-js'; | ||
const objectToString = Object.prototype.toString; | ||
@@ -388,3 +393,3 @@ const uint8ArrayStringified = "[object Uint8Array]"; | ||
} | ||
async function* streamAsyncIterator(stream) { | ||
async function* streamToAsyncIterator(stream) { | ||
const reader = stream.getReader(); | ||
@@ -431,5 +436,5 @@ try { | ||
async function* fromExportArchive(path) { | ||
for await (const [header, entry] of streamAsyncIterator(untar(await maybeDecompress(readFileAsWebStream(path))))) { | ||
for await (const [header, entry] of streamToAsyncIterator(untar(await maybeDecompress(readFileAsWebStream(path))))) { | ||
if (header.type === "file" && header.name.endsWith(".ndjson")) { | ||
for await (const chunk of streamAsyncIterator(entry)) { | ||
for await (const chunk of streamToAsyncIterator(entry)) { | ||
yield chunk; | ||
@@ -440,5 +445,9 @@ } | ||
} | ||
function assert2xx(res) { | ||
async function assert2xx(res) { | ||
if (res.status < 200 || res.status > 299) { | ||
const err = new Error("HTTP Error ".concat(res.status, ": ").concat(res.statusText)); | ||
const response = await res.json().catch(() => { | ||
throw new Error("Error parsing JSON ".concat(res.status, ": ").concat(res.statusText)); | ||
}); | ||
const message = response.error ? response.error.description : "HTTP Error ".concat(res.status, ": ").concat(res.statusText); | ||
const err = new Error(message); | ||
err.statusCode = res.status; | ||
@@ -448,3 +457,3 @@ throw err; | ||
} | ||
async function fetchAsyncIterator(_ref) { | ||
async function fetchStream(_ref) { | ||
let { | ||
@@ -455,6 +464,9 @@ url, | ||
const response = await fetch(url, init); | ||
assert2xx(response); | ||
await assert2xx(response); | ||
if (response.body === null) throw new Error("No response received"); | ||
return streamAsyncIterator(response.body); | ||
return response.body; | ||
} | ||
async function fetchAsyncIterator(options) { | ||
return streamToAsyncIterator(await fetchStream(options)); | ||
} | ||
function normalizeApiHost(apiHost) { | ||
@@ -510,7 +522,7 @@ return apiHost.replace(/^https?:\/\//, ""); | ||
}), | ||
export: dataset => ({ | ||
export: (dataset, documentTypes) => ({ | ||
global: false, | ||
method: "GET", | ||
path: "/data/export/".concat(dataset), | ||
searchParams: [] | ||
searchParams: documentTypes && (documentTypes == null ? void 0 : documentTypes.length) > 0 ? [["types", documentTypes.join(",")]] : [] | ||
}), | ||
@@ -530,3 +542,3 @@ mutate: (dataset, options) => { | ||
var _a; | ||
return fetchAsyncIterator(toFetchOptions({ | ||
return fetchStream(toFetchOptions({ | ||
projectId: options.projectId, | ||
@@ -536,5 +548,8 @@ apiVersion: options.apiVersion, | ||
apiHost: (_a = options.apiHost) != null ? _a : "api.sanity.io", | ||
endpoint: endpoints.data.export(options.dataset) | ||
endpoint: endpoints.data.export(options.dataset, options.documentTypes) | ||
})); | ||
} | ||
const safeJsonParser = createSafeJsonParser({ | ||
errorLabel: "Error streaming dataset" | ||
}); | ||
function* fromDocuments(documents) { | ||
@@ -563,5 +578,14 @@ for (const document of documents) { | ||
} | ||
async function* parseJSON(it) { | ||
for await (const chunk of it) { | ||
yield JSON.parse(chunk); | ||
function parseJSON(it) { | ||
try { | ||
let { | ||
parse = JSON.parse | ||
} = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {}; | ||
return async function* () { | ||
for await (const chunk of it) { | ||
yield parse(chunk); | ||
} | ||
}(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
@@ -576,3 +600,3 @@ } | ||
for await (const chunk of it) { | ||
if (predicate(chunk)) { | ||
if (await predicate(chunk)) { | ||
yield chunk; | ||
@@ -602,5 +626,10 @@ } | ||
} | ||
function ndjson(it) { | ||
return parseJSON(filter(split(decodeText(it), "\n"), line => Boolean(line && line.trim()))); | ||
function parse(it, options) { | ||
return parseJSON(filter(split(it, "\n"), line => Boolean(line && line.trim())), options); | ||
} | ||
async function* stringify(iterable) { | ||
for await (const doc of iterable) { | ||
yield "".concat(JSON.stringify(doc), "\n"); | ||
} | ||
} | ||
async function* take(it, count) { | ||
@@ -620,2 +649,32 @@ let i = 0; | ||
} | ||
async function* tap(it, interceptor) { | ||
for await (const chunk of it) { | ||
interceptor(chunk); | ||
yield chunk; | ||
} | ||
} | ||
async function mapAsync(it, project, concurrency) { | ||
const { | ||
pMapIterable | ||
} = await import('p-map'); | ||
return pMapIterable(it, v => project(v), { | ||
concurrency | ||
}); | ||
} | ||
async function lastValueFrom(it, options) { | ||
const defaultGiven = ("defaultValue" in (options != null ? options : {})); | ||
let latestValue; | ||
let didYield = false; | ||
for await (const value of it) { | ||
didYield = true; | ||
latestValue = value; | ||
} | ||
if (!didYield) { | ||
if (defaultGiven) { | ||
return options.defaultValue; | ||
} | ||
throw new Error("No value yielded from async iterable. If this iterable is empty, provide a default value."); | ||
} | ||
return latestValue; | ||
} | ||
async function* concatStr(it) { | ||
@@ -628,22 +687,268 @@ let buf = ""; | ||
} | ||
async function* toMutationEndpoint(options, mutations) { | ||
var _a; | ||
for await (const mutation of mutations) { | ||
const fetchOptions = toFetchOptions({ | ||
projectId: options.projectId, | ||
apiVersion: options.apiVersion, | ||
token: options.token, | ||
apiHost: (_a = options.apiHost) != null ? _a : "api.sanity.io", | ||
endpoint: endpoints.data.mutate(options.dataset, { | ||
returnIds: true | ||
}), | ||
body: JSON.stringify({ | ||
mutations: SanityEncoder.encode(Array.isArray(mutation) ? mutation : [mutation]) | ||
}) | ||
const CHUNK_SIZE = 1024; | ||
function bufferThroughFile(source, filename, options) { | ||
const signal = options == null ? void 0 : options.signal; | ||
let writeHandle; | ||
let readHandle; | ||
let bufferDone = false; | ||
signal == null ? void 0 : signal.addEventListener("abort", async () => { | ||
await Promise.all([writeHandle && writeHandle.close(), readHandle && (await readHandle).close()]); | ||
}); | ||
let readerCount = 0; | ||
let ready; | ||
async function pump(reader) { | ||
try { | ||
while (true) { | ||
const { | ||
done, | ||
value | ||
} = await reader.read(); | ||
if (done || (signal == null ? void 0 : signal.aborted)) { | ||
return; | ||
} | ||
await writeHandle.write(value); | ||
} | ||
} finally { | ||
await writeHandle.close(); | ||
bufferDone = true; | ||
reader.releaseLock(); | ||
} | ||
} | ||
function createBufferedReader() { | ||
let totalBytesRead = 0; | ||
return async function tryReadFromBuffer(handle) { | ||
const { | ||
bytesRead, | ||
buffer | ||
} = await handle.read(new Uint8Array(CHUNK_SIZE), 0, CHUNK_SIZE, totalBytesRead); | ||
if (bytesRead === 0 && !bufferDone && !(signal == null ? void 0 : signal.aborted)) { | ||
return tryReadFromBuffer(handle); | ||
} | ||
totalBytesRead += bytesRead; | ||
return { | ||
bytesRead, | ||
buffer | ||
}; | ||
}; | ||
} | ||
function init() { | ||
if (!ready) { | ||
ready = (async () => { | ||
writeHandle = await open(filename, "w"); | ||
pump(source.getReader()); | ||
})(); | ||
} | ||
return ready; | ||
} | ||
function getReadHandle() { | ||
if (!readHandle) { | ||
readHandle = open(filename, "r"); | ||
} | ||
return readHandle; | ||
} | ||
function onReaderStart() { | ||
readerCount++; | ||
} | ||
async function onReaderEnd() { | ||
readerCount--; | ||
if (readerCount === 0 && readHandle) { | ||
const handle = readHandle; | ||
readHandle = null; | ||
await (await handle).close(); | ||
} | ||
} | ||
return () => { | ||
const readChunk = createBufferedReader(); | ||
return new ReadableStream({ | ||
async start() { | ||
if (signal == null ? void 0 : signal.aborted) { | ||
throw new Error("Cannot create new buffered readers on aborted stream"); | ||
} | ||
onReaderStart(); | ||
await init(); | ||
await getReadHandle(); | ||
}, | ||
async pull(controller) { | ||
if (!readHandle) { | ||
throw new Error("Cannot read from closed handle"); | ||
} | ||
const { | ||
bytesRead, | ||
buffer | ||
} = await readChunk(await readHandle); | ||
if (bytesRead === 0 && bufferDone) { | ||
await onReaderEnd(); | ||
controller.close(); | ||
} else { | ||
controller.enqueue(buffer.subarray(0, bytesRead)); | ||
} | ||
} | ||
}); | ||
for await (const result of parseJSON(concatStr(decodeText(await fetchAsyncIterator(fetchOptions))))) { | ||
yield result; | ||
}; | ||
} | ||
function asyncIterableToStream(it) { | ||
return new ReadableStream({ | ||
async pull(controller) { | ||
const { | ||
value, | ||
done | ||
} = await it.next(); | ||
if (done) { | ||
controller.close(); | ||
} else { | ||
controller.enqueue(value); | ||
} | ||
} | ||
}); | ||
} | ||
async function* toSanityMutations(it) { | ||
for await (const mutation of it) { | ||
if (Array.isArray(mutation)) { | ||
yield SanityEncoder.encode(mutation); | ||
continue; | ||
} | ||
yield SanityEncoder.encode([mutation])[0]; | ||
} | ||
} | ||
const MUTATION_ENDPOINT_MAX_BODY_SIZE = 1024 * 256; | ||
const DEFAULT_MUTATION_CONCURRENCY = 6; | ||
const MAX_MUTATION_CONCURRENCY = 10; | ||
const PADDING_SIZE = '{"mutations":[]}'.length; | ||
async function* batchMutations(mutations, maxBatchSize, options) { | ||
let currentBatch = []; | ||
let currentBatchSize = 0; | ||
for await (const mutation of mutations) { | ||
if ((options == null ? void 0 : options.preserveTransactions) && Array.isArray(mutation)) { | ||
yield currentBatch; | ||
yield mutation; | ||
currentBatch = []; | ||
currentBatchSize = 0; | ||
continue; | ||
} | ||
const mutationSize = JSON.stringify(mutation).length; | ||
if (mutationSize >= maxBatchSize + PADDING_SIZE) { | ||
if (currentBatch.length) { | ||
yield currentBatch; | ||
} | ||
yield [...arrify(mutation)]; | ||
currentBatch = []; | ||
currentBatchSize = 0; | ||
continue; | ||
} | ||
currentBatchSize += mutationSize; | ||
if (currentBatchSize >= maxBatchSize + PADDING_SIZE) { | ||
yield currentBatch; | ||
currentBatch = []; | ||
currentBatchSize = 0; | ||
} | ||
currentBatch.push(...arrify(mutation)); | ||
} | ||
if (currentBatch.length > 0) { | ||
yield currentBatch; | ||
} | ||
} | ||
function create(document) { | ||
return { | ||
type: "create", | ||
document | ||
}; | ||
} | ||
function patch(id, patches, options) { | ||
return { | ||
type: "patch", | ||
id, | ||
patches: arrify(patches), | ||
...(options ? { | ||
options | ||
} : {}) | ||
}; | ||
} | ||
function at(path, operation) { | ||
return { | ||
path: typeof path === "string" ? fromString(path) : path, | ||
op: operation | ||
}; | ||
} | ||
function createIfNotExists(document) { | ||
return { | ||
type: "createIfNotExists", | ||
document | ||
}; | ||
} | ||
function createOrReplace(document) { | ||
return { | ||
type: "createOrReplace", | ||
document | ||
}; | ||
} | ||
function delete_(id) { | ||
return { | ||
type: "delete", | ||
id | ||
}; | ||
} | ||
const del = delete_; | ||
const set = value => ({ | ||
type: "set", | ||
value | ||
}); | ||
const setIfMissing = value => ({ | ||
type: "setIfMissing", | ||
value | ||
}); | ||
const unset = () => ({ | ||
type: "unset" | ||
}); | ||
const inc = function () { | ||
let amount = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : 1; | ||
return { | ||
type: "inc", | ||
amount | ||
}; | ||
}; | ||
const dec = function () { | ||
let amount = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : 1; | ||
return { | ||
type: "dec", | ||
amount | ||
}; | ||
}; | ||
const diffMatchPatch = value => ({ | ||
type: "diffMatchPatch", | ||
value | ||
}); | ||
function insert(items, position, indexOrReferenceItem) { | ||
return { | ||
type: "insert", | ||
referenceItem: indexOrReferenceItem, | ||
position, | ||
items: arrify(items) | ||
}; | ||
} | ||
function append(items) { | ||
return insert(items, "after", -1); | ||
} | ||
function prepend(items) { | ||
return insert(items, "before", 0); | ||
} | ||
function insertBefore(items, indexOrReferenceItem) { | ||
return insert(items, "before", indexOrReferenceItem); | ||
} | ||
const insertAfter = (items, indexOrReferenceItem) => { | ||
return insert(items, "after", indexOrReferenceItem); | ||
}; | ||
function truncate(startIndex, endIndex) { | ||
return { | ||
type: "truncate", | ||
startIndex, | ||
endIndex | ||
}; | ||
} | ||
function replace(items, referenceItem) { | ||
return { | ||
type: "replace", | ||
referenceItem, | ||
items: arrify(items) | ||
}; | ||
} | ||
function getValueType(value) { | ||
@@ -692,3 +997,3 @@ if (Array.isArray(value)) { | ||
filter: migration.filter, | ||
documentType: migration.documentType | ||
documentTypes: migration.documentTypes | ||
}); | ||
@@ -703,6 +1008,7 @@ } | ||
function createAsyncIterableMutation(migration, opts) { | ||
const documentTypesSet = new Set(opts.documentTypes); | ||
return async function* run(docs, context) { | ||
for await (const doc of docs) { | ||
if (doc._type !== opts.documentType) continue; | ||
const documentMutations = collectDocumentMutations(migration, doc, context); | ||
for await (const doc of docs()) { | ||
if (!documentTypesSet.has(doc._type)) continue; | ||
const documentMutations = await collectDocumentMutations(migration, doc, context); | ||
if (documentMutations.length > 0) { | ||
@@ -714,10 +1020,11 @@ yield documentMutations; | ||
} | ||
function collectDocumentMutations(migration, doc, context) { | ||
async function collectDocumentMutations(migration, doc, context) { | ||
var _a; | ||
const documentMutations = (_a = migration.document) == null ? void 0 : _a.call(migration, doc, context); | ||
const nodeMigrations = flatMapDeep(doc, (value, path) => { | ||
const documentMutations = Promise.resolve((_a = migration.document) == null ? void 0 : _a.call(migration, doc, context)); | ||
const nodeMigrations = flatMapDeep(doc, async (value, path) => { | ||
var _a2; | ||
return [...arrify((_a2 = migration.node) == null ? void 0 : _a2.call(migration, value, path, context)), ...arrify(migrateNodeType(migration, value, path, context))].map(change => normalizeNodeMutation(path, change)); | ||
const [nodeReturnValues, nodeTypeReturnValues] = await Promise.all([Promise.resolve((_a2 = migration.node) == null ? void 0 : _a2.call(migration, value, path, context)), Promise.resolve(migrateNodeType(migration, value, path, context))]); | ||
return [...arrify(nodeReturnValues), ...arrify(nodeTypeReturnValues)].map(change => normalizeNodeMutation(path, change)); | ||
}); | ||
return [...arrify(documentMutations), ...nodeMigrations].map(change => normalizeDocumentMutation(doc._id, change)); | ||
return (await Promise.all([...arrify(await documentMutations), ...nodeMigrations])).flat().map(change => normalizeDocumentMutation(doc._id, change)); | ||
} | ||
@@ -749,30 +1056,262 @@ function normalizeDocumentMutation(documentId, change) { | ||
} | ||
function collectMigrationMutations(migration, documents) { | ||
const ctx = { | ||
withDocument: () => { | ||
throw new Error("Not implemented yet"); | ||
function collectMigrationMutations(migration, documents, context) { | ||
const migrate = normalizeMigrateDefinition(migration); | ||
return migrate(documents, context); | ||
} | ||
function getBufferFilePath() { | ||
return path.join(tmpdir(), "/export-buffer-".concat(Date.now(), ".tmp")); | ||
} | ||
function parseGroq(query) { | ||
try { | ||
return groq.parse(query); | ||
} catch (err) { | ||
err.message = 'Failed to parse GROQ filter "'.concat(query, '": ').concat(err.message); | ||
throw err; | ||
} | ||
} | ||
async function groqQuery(it, query, params) { | ||
const parsedFilter = parseGroq(query); | ||
const all = await toArray(it); | ||
return (await groq.evaluate(parsedFilter, { | ||
dataset: all, | ||
params | ||
})).get(); | ||
} | ||
function createBufferFileContext(getReader) { | ||
function getAllDocumentsFromBuffer() { | ||
return parse(decodeText(streamToAsyncIterator(getReader())), { | ||
parse: safeJsonParser | ||
}); | ||
} | ||
async function getDocumentsFromBuffer(ids) { | ||
const found = {}; | ||
let remaining = ids.length; | ||
for await (const doc of getAllDocumentsFromBuffer()) { | ||
if (ids.includes(doc._id)) { | ||
remaining--; | ||
found[doc._id] = doc; | ||
} | ||
if (remaining === 0) break; | ||
} | ||
return ids.map(id => found[id]); | ||
} | ||
async function getDocumentFromBuffer(id) { | ||
return (await getDocumentsFromBuffer([id]))[0]; | ||
} | ||
function queryFromBuffer(query, params) { | ||
return groqQuery(getAllDocumentsFromBuffer(), query, params); | ||
} | ||
return { | ||
getDocument: getDocumentFromBuffer, | ||
getDocuments: getDocumentsFromBuffer, | ||
query: queryFromBuffer | ||
}; | ||
const migrate = normalizeMigrateDefinition(migration); | ||
return migrate(documents, ctx); | ||
} | ||
async function* run(config, migration) { | ||
const mutations = collectMigrationMutations(migration, ndjson(await fromExportEndpoint(config.api))); | ||
for await (const result of toMutationEndpoint(config.api, mutations)) { | ||
yield formatMutationResponse(result); | ||
function isSystemDocumentId(id) { | ||
return id.startsWith("_."); | ||
} | ||
function parseGroqFilter(filter) { | ||
try { | ||
return groq.parse("*[".concat(filter, "]")); | ||
} catch (err) { | ||
err.message = 'Failed to parse GROQ filter "'.concat(filter, '": ').concat(err.message); | ||
throw err; | ||
} | ||
} | ||
function formatMutationResponse(mutationResponse) { | ||
return "OK (transactionId = ".concat(mutationResponse.transactionId, ")\n").concat(mutationResponse.results.map(result => { | ||
return " - ".concat(result.operation, ": ").concat(result.id); | ||
}).join("\n")); | ||
async function matchesFilter(parsedFilter, document) { | ||
const result = await (await groq.evaluate(parsedFilter, { | ||
dataset: [document] | ||
})).get(); | ||
return result.length === 1; | ||
} | ||
async function* applyFilters(migration, documents) { | ||
const documentTypes = migration.documentTypes; | ||
const parsedFilter = migration.filter ? parseGroqFilter(migration.filter) : void 0; | ||
for await (const doc of documents) { | ||
if (isSystemDocumentId(doc._id)) { | ||
continue; | ||
} | ||
if (documentTypes && documentTypes.length > 0 && !documentTypes.includes(doc._type)) { | ||
continue; | ||
} | ||
if (parsedFilter && !(await matchesFilter(parsedFilter, doc))) { | ||
continue; | ||
} | ||
yield doc; | ||
} | ||
} | ||
async function* toFetchOptionsIterable(apiConfig, mutations) { | ||
var _a; | ||
for await (const mut of mutations) { | ||
yield toFetchOptions({ | ||
projectId: apiConfig.projectId, | ||
apiVersion: apiConfig.apiVersion, | ||
token: apiConfig.token, | ||
apiHost: (_a = apiConfig.apiHost) != null ? _a : "api.sanity.io", | ||
endpoint: endpoints.data.mutate(apiConfig.dataset, { | ||
returnIds: true | ||
}), | ||
body: JSON.stringify({ | ||
mutations: mut | ||
}) | ||
}); | ||
} | ||
} | ||
async function run(config, migration) { | ||
var _a, _b, _c; | ||
const stats = { | ||
documents: 0, | ||
mutations: 0, | ||
pending: 0, | ||
queuedBatches: 0, | ||
completedTransactions: [], | ||
currentMutations: [] | ||
}; | ||
const filteredDocuments = applyFilters(migration, parse(decodeText(streamToAsyncIterator(await fromExportEndpoint({ | ||
...config.api, | ||
documentTypes: migration.documentTypes | ||
}))), { | ||
parse: safeJsonParser | ||
})); | ||
const abortController = new AbortController(); | ||
const createReader = bufferThroughFile(asyncIterableToStream(stringify(filteredDocuments)), getBufferFilePath(), { | ||
signal: abortController.signal | ||
}); | ||
const context = createBufferFileContext(createReader); | ||
const documents = () => tap(parse(decodeText(streamToAsyncIterator(createReader())), { | ||
parse: safeJsonParser | ||
}), () => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
documents: ++stats.documents | ||
}); | ||
}); | ||
const mutations = tap(collectMigrationMutations(migration, documents, context), muts => { | ||
var _a2; | ||
stats.currentMutations = arrify(muts); | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
mutations: ++stats.mutations | ||
}); | ||
}); | ||
const concurrency = (_a = config == null ? void 0 : config.concurrency) != null ? _a : DEFAULT_MUTATION_CONCURRENCY; | ||
if (concurrency > MAX_MUTATION_CONCURRENCY) { | ||
throw new Error("Concurrency exceeds maximum allowed value (".concat(MAX_MUTATION_CONCURRENCY, ")")); | ||
} | ||
const batches = tap(batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), () => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
queuedBatches: ++stats.queuedBatches | ||
}); | ||
}); | ||
const submit = async opts => lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))); | ||
const commits = await mapAsync(toFetchOptionsIterable(config.api, batches), opts => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
pending: ++stats.pending | ||
}); | ||
return submit(opts); | ||
}, concurrency); | ||
for await (const result of commits) { | ||
stats.completedTransactions.push(result); | ||
(_b = config.onProgress) == null ? void 0 : _b.call(config, { | ||
...stats | ||
}); | ||
} | ||
(_c = config.onProgress) == null ? void 0 : _c.call(config, { | ||
...stats, | ||
done: true | ||
}); | ||
abortController.abort(); | ||
} | ||
async function runFromArchive(migration, path, config) { | ||
var _a, _b, _c; | ||
const stats = { | ||
documents: 0, | ||
mutations: 0, | ||
pending: 0, | ||
queuedBatches: 0, | ||
completedTransactions: [], | ||
currentMutations: [] | ||
}; | ||
const filteredDocuments = applyFilters(migration, parse(decodeText(fromExportArchive(path)), { | ||
parse: safeJsonParser | ||
})); | ||
const abortController = new AbortController(); | ||
const createReader = bufferThroughFile(asyncIterableToStream(stringify(filteredDocuments)), getBufferFilePath(), { | ||
signal: abortController.signal | ||
}); | ||
const context = createBufferFileContext(createReader); | ||
const documents = () => tap(parse(decodeText(streamToAsyncIterator(createReader())), { | ||
parse: safeJsonParser | ||
}), () => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
documents: ++stats.documents | ||
}); | ||
}); | ||
const mutations = tap(collectMigrationMutations(migration, documents, context), muts => { | ||
var _a2; | ||
stats.currentMutations = arrify(muts); | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
mutations: ++stats.mutations | ||
}); | ||
}); | ||
const batches = tap(batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), () => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
queuedBatches: ++stats.queuedBatches | ||
}); | ||
}); | ||
const concurrency = (_a = config == null ? void 0 : config.concurrency) != null ? _a : DEFAULT_MUTATION_CONCURRENCY; | ||
if (concurrency > MAX_MUTATION_CONCURRENCY) { | ||
throw new Error("Concurrency exceeds maximum allowed value (".concat(MAX_MUTATION_CONCURRENCY, ")")); | ||
} | ||
const commits = await mapAsync(toFetchOptionsIterable(config.api, batches), opts => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
pending: ++stats.pending | ||
}); | ||
return Promise.resolve(); | ||
}, concurrency); | ||
for await (const result of commits) { | ||
(_b = config.onProgress) == null ? void 0 : _b.call(config, { | ||
...stats | ||
}); | ||
} | ||
abortController.abort(); | ||
(_c = config.onProgress) == null ? void 0 : _c.call(config, { | ||
...stats, | ||
done: true | ||
}); | ||
} | ||
async function* dryRun(config, migration) { | ||
const mutations = collectMigrationMutations(migration, ndjson(await fromExportEndpoint(config.api))); | ||
const filteredDocuments = applyFilters(migration, parse(decodeText(streamToAsyncIterator(await fromExportEndpoint({ | ||
...config.api, | ||
documentTypes: migration.documentTypes | ||
}))), { | ||
parse: safeJsonParser | ||
})); | ||
const abortController = new AbortController(); | ||
const createReader = bufferThroughFile(asyncIterableToStream(filteredDocuments), getBufferFilePath(), { | ||
signal: abortController.signal | ||
}); | ||
const context = createBufferFileContext(createReader); | ||
const mutations = collectMigrationMutations(migration, () => parse(decodeText(streamToAsyncIterator(createReader())), { | ||
parse: safeJsonParser | ||
}), context); | ||
abortController.abort(); | ||
for await (const mutation of mutations) { | ||
if (!mutation) continue; | ||
yield CompactFormatter.format(Array.isArray(mutation) ? mutation : [mutation]); | ||
yield JSON.stringify(mutation, null, 2); | ||
} | ||
} | ||
export { collectMigrationMutations, decodeText, defineMigration, delay, dryRun, filter, fromDocuments, fromExportArchive, fromExportEndpoint, map, ndjson, parseJSON, run, split, stringifyJSON, take, toArray }; | ||
export { DEFAULT_MUTATION_CONCURRENCY, MAX_MUTATION_CONCURRENCY, append, at, collectMigrationMutations, create, createIfNotExists, createOrReplace, dec, decodeText, defineMigration, del, delay, delete_, diffMatchPatch, dryRun, filter, fromDocuments, fromExportArchive, fromExportEndpoint, inc, insert, insertAfter, insertBefore, map, parse, parseJSON, patch, prepend, replace, run, runFromArchive, safeJsonParser, set, setIfMissing, split, stringify, stringifyJSON, take, toArray, toFetchOptionsIterable, truncate, unset }; | ||
//# sourceMappingURL=index.esm.js.map |
@@ -8,4 +8,9 @@ 'use strict'; | ||
var promises = require('node:fs/promises'); | ||
var createSafeJsonParser = require('@sanity/util/createSafeJsonParser'); | ||
var arrify = require('arrify'); | ||
var mutiny = require('@bjoerge/mutiny'); | ||
var arrify = require('arrify'); | ||
var paths = require('@sanity/util/paths'); | ||
var node_os = require('node:os'); | ||
var path = require('node:path'); | ||
var groq = require('groq-js'); | ||
function _interopDefaultCompat(e) { | ||
@@ -18,2 +23,4 @@ return e && typeof e === 'object' && 'default' in e ? e : { | ||
var arrify__default = /*#__PURE__*/_interopDefaultCompat(arrify); | ||
var path__default = /*#__PURE__*/_interopDefaultCompat(path); | ||
var groq__default = /*#__PURE__*/_interopDefaultCompat(groq); | ||
const objectToString = Object.prototype.toString; | ||
@@ -402,3 +409,3 @@ const uint8ArrayStringified = "[object Uint8Array]"; | ||
} | ||
async function* streamAsyncIterator(stream) { | ||
async function* streamToAsyncIterator(stream) { | ||
const reader = stream.getReader(); | ||
@@ -445,5 +452,5 @@ try { | ||
async function* fromExportArchive(path) { | ||
for await (const [header, entry] of streamAsyncIterator(untar(await maybeDecompress(readFileAsWebStream(path))))) { | ||
for await (const [header, entry] of streamToAsyncIterator(untar(await maybeDecompress(readFileAsWebStream(path))))) { | ||
if (header.type === "file" && header.name.endsWith(".ndjson")) { | ||
for await (const chunk of streamAsyncIterator(entry)) { | ||
for await (const chunk of streamToAsyncIterator(entry)) { | ||
yield chunk; | ||
@@ -454,5 +461,9 @@ } | ||
} | ||
function assert2xx(res) { | ||
async function assert2xx(res) { | ||
if (res.status < 200 || res.status > 299) { | ||
const err = new Error("HTTP Error ".concat(res.status, ": ").concat(res.statusText)); | ||
const response = await res.json().catch(() => { | ||
throw new Error("Error parsing JSON ".concat(res.status, ": ").concat(res.statusText)); | ||
}); | ||
const message = response.error ? response.error.description : "HTTP Error ".concat(res.status, ": ").concat(res.statusText); | ||
const err = new Error(message); | ||
err.statusCode = res.status; | ||
@@ -462,3 +473,3 @@ throw err; | ||
} | ||
async function fetchAsyncIterator(_ref) { | ||
async function fetchStream(_ref) { | ||
let { | ||
@@ -469,6 +480,9 @@ url, | ||
const response = await fetch(url, init); | ||
assert2xx(response); | ||
await assert2xx(response); | ||
if (response.body === null) throw new Error("No response received"); | ||
return streamAsyncIterator(response.body); | ||
return response.body; | ||
} | ||
async function fetchAsyncIterator(options) { | ||
return streamToAsyncIterator(await fetchStream(options)); | ||
} | ||
function normalizeApiHost(apiHost) { | ||
@@ -524,7 +538,7 @@ return apiHost.replace(/^https?:\/\//, ""); | ||
}), | ||
export: dataset => ({ | ||
export: (dataset, documentTypes) => ({ | ||
global: false, | ||
method: "GET", | ||
path: "/data/export/".concat(dataset), | ||
searchParams: [] | ||
searchParams: documentTypes && (documentTypes == null ? void 0 : documentTypes.length) > 0 ? [["types", documentTypes.join(",")]] : [] | ||
}), | ||
@@ -544,3 +558,3 @@ mutate: (dataset, options) => { | ||
var _a; | ||
return fetchAsyncIterator(toFetchOptions({ | ||
return fetchStream(toFetchOptions({ | ||
projectId: options.projectId, | ||
@@ -550,5 +564,8 @@ apiVersion: options.apiVersion, | ||
apiHost: (_a = options.apiHost) != null ? _a : "api.sanity.io", | ||
endpoint: endpoints.data.export(options.dataset) | ||
endpoint: endpoints.data.export(options.dataset, options.documentTypes) | ||
})); | ||
} | ||
const safeJsonParser = createSafeJsonParser.createSafeJsonParser({ | ||
errorLabel: "Error streaming dataset" | ||
}); | ||
function* fromDocuments(documents) { | ||
@@ -577,5 +594,14 @@ for (const document of documents) { | ||
} | ||
async function* parseJSON(it) { | ||
for await (const chunk of it) { | ||
yield JSON.parse(chunk); | ||
function parseJSON(it) { | ||
try { | ||
let { | ||
parse = JSON.parse | ||
} = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {}; | ||
return async function* () { | ||
for await (const chunk of it) { | ||
yield parse(chunk); | ||
} | ||
}(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
@@ -590,3 +616,3 @@ } | ||
for await (const chunk of it) { | ||
if (predicate(chunk)) { | ||
if (await predicate(chunk)) { | ||
yield chunk; | ||
@@ -616,5 +642,10 @@ } | ||
} | ||
function ndjson(it) { | ||
return parseJSON(filter(split(decodeText(it), "\n"), line => Boolean(line && line.trim()))); | ||
function parse(it, options) { | ||
return parseJSON(filter(split(it, "\n"), line => Boolean(line && line.trim())), options); | ||
} | ||
async function* stringify(iterable) { | ||
for await (const doc of iterable) { | ||
yield "".concat(JSON.stringify(doc), "\n"); | ||
} | ||
} | ||
async function* take(it, count) { | ||
@@ -634,2 +665,32 @@ let i = 0; | ||
} | ||
async function* tap(it, interceptor) { | ||
for await (const chunk of it) { | ||
interceptor(chunk); | ||
yield chunk; | ||
} | ||
} | ||
async function mapAsync(it, project, concurrency) { | ||
const { | ||
pMapIterable | ||
} = await import('p-map'); | ||
return pMapIterable(it, v => project(v), { | ||
concurrency | ||
}); | ||
} | ||
async function lastValueFrom(it, options) { | ||
const defaultGiven = ("defaultValue" in (options != null ? options : {})); | ||
let latestValue; | ||
let didYield = false; | ||
for await (const value of it) { | ||
didYield = true; | ||
latestValue = value; | ||
} | ||
if (!didYield) { | ||
if (defaultGiven) { | ||
return options.defaultValue; | ||
} | ||
throw new Error("No value yielded from async iterable. If this iterable is empty, provide a default value."); | ||
} | ||
return latestValue; | ||
} | ||
async function* concatStr(it) { | ||
@@ -642,22 +703,268 @@ let buf = ""; | ||
} | ||
async function* toMutationEndpoint(options, mutations) { | ||
var _a; | ||
for await (const mutation of mutations) { | ||
const fetchOptions = toFetchOptions({ | ||
projectId: options.projectId, | ||
apiVersion: options.apiVersion, | ||
token: options.token, | ||
apiHost: (_a = options.apiHost) != null ? _a : "api.sanity.io", | ||
endpoint: endpoints.data.mutate(options.dataset, { | ||
returnIds: true | ||
}), | ||
body: JSON.stringify({ | ||
mutations: mutiny.SanityEncoder.encode(Array.isArray(mutation) ? mutation : [mutation]) | ||
}) | ||
const CHUNK_SIZE = 1024; | ||
function bufferThroughFile(source, filename, options) { | ||
const signal = options == null ? void 0 : options.signal; | ||
let writeHandle; | ||
let readHandle; | ||
let bufferDone = false; | ||
signal == null ? void 0 : signal.addEventListener("abort", async () => { | ||
await Promise.all([writeHandle && writeHandle.close(), readHandle && (await readHandle).close()]); | ||
}); | ||
let readerCount = 0; | ||
let ready; | ||
async function pump(reader) { | ||
try { | ||
while (true) { | ||
const { | ||
done, | ||
value | ||
} = await reader.read(); | ||
if (done || (signal == null ? void 0 : signal.aborted)) { | ||
return; | ||
} | ||
await writeHandle.write(value); | ||
} | ||
} finally { | ||
await writeHandle.close(); | ||
bufferDone = true; | ||
reader.releaseLock(); | ||
} | ||
} | ||
function createBufferedReader() { | ||
let totalBytesRead = 0; | ||
return async function tryReadFromBuffer(handle) { | ||
const { | ||
bytesRead, | ||
buffer | ||
} = await handle.read(new Uint8Array(CHUNK_SIZE), 0, CHUNK_SIZE, totalBytesRead); | ||
if (bytesRead === 0 && !bufferDone && !(signal == null ? void 0 : signal.aborted)) { | ||
return tryReadFromBuffer(handle); | ||
} | ||
totalBytesRead += bytesRead; | ||
return { | ||
bytesRead, | ||
buffer | ||
}; | ||
}; | ||
} | ||
function init() { | ||
if (!ready) { | ||
ready = (async () => { | ||
writeHandle = await promises.open(filename, "w"); | ||
pump(source.getReader()); | ||
})(); | ||
} | ||
return ready; | ||
} | ||
function getReadHandle() { | ||
if (!readHandle) { | ||
readHandle = promises.open(filename, "r"); | ||
} | ||
return readHandle; | ||
} | ||
function onReaderStart() { | ||
readerCount++; | ||
} | ||
async function onReaderEnd() { | ||
readerCount--; | ||
if (readerCount === 0 && readHandle) { | ||
const handle = readHandle; | ||
readHandle = null; | ||
await (await handle).close(); | ||
} | ||
} | ||
return () => { | ||
const readChunk = createBufferedReader(); | ||
return new ReadableStream({ | ||
async start() { | ||
if (signal == null ? void 0 : signal.aborted) { | ||
throw new Error("Cannot create new buffered readers on aborted stream"); | ||
} | ||
onReaderStart(); | ||
await init(); | ||
await getReadHandle(); | ||
}, | ||
async pull(controller) { | ||
if (!readHandle) { | ||
throw new Error("Cannot read from closed handle"); | ||
} | ||
const { | ||
bytesRead, | ||
buffer | ||
} = await readChunk(await readHandle); | ||
if (bytesRead === 0 && bufferDone) { | ||
await onReaderEnd(); | ||
controller.close(); | ||
} else { | ||
controller.enqueue(buffer.subarray(0, bytesRead)); | ||
} | ||
} | ||
}); | ||
for await (const result of parseJSON(concatStr(decodeText(await fetchAsyncIterator(fetchOptions))))) { | ||
yield result; | ||
}; | ||
} | ||
function asyncIterableToStream(it) { | ||
return new ReadableStream({ | ||
async pull(controller) { | ||
const { | ||
value, | ||
done | ||
} = await it.next(); | ||
if (done) { | ||
controller.close(); | ||
} else { | ||
controller.enqueue(value); | ||
} | ||
} | ||
}); | ||
} | ||
async function* toSanityMutations(it) { | ||
for await (const mutation of it) { | ||
if (Array.isArray(mutation)) { | ||
yield mutiny.SanityEncoder.encode(mutation); | ||
continue; | ||
} | ||
yield mutiny.SanityEncoder.encode([mutation])[0]; | ||
} | ||
} | ||
const MUTATION_ENDPOINT_MAX_BODY_SIZE = 1024 * 256; | ||
const DEFAULT_MUTATION_CONCURRENCY = 6; | ||
const MAX_MUTATION_CONCURRENCY = 10; | ||
const PADDING_SIZE = '{"mutations":[]}'.length; | ||
async function* batchMutations(mutations, maxBatchSize, options) { | ||
let currentBatch = []; | ||
let currentBatchSize = 0; | ||
for await (const mutation of mutations) { | ||
if ((options == null ? void 0 : options.preserveTransactions) && Array.isArray(mutation)) { | ||
yield currentBatch; | ||
yield mutation; | ||
currentBatch = []; | ||
currentBatchSize = 0; | ||
continue; | ||
} | ||
const mutationSize = JSON.stringify(mutation).length; | ||
if (mutationSize >= maxBatchSize + PADDING_SIZE) { | ||
if (currentBatch.length) { | ||
yield currentBatch; | ||
} | ||
yield [...arrify__default.default(mutation)]; | ||
currentBatch = []; | ||
currentBatchSize = 0; | ||
continue; | ||
} | ||
currentBatchSize += mutationSize; | ||
if (currentBatchSize >= maxBatchSize + PADDING_SIZE) { | ||
yield currentBatch; | ||
currentBatch = []; | ||
currentBatchSize = 0; | ||
} | ||
currentBatch.push(...arrify__default.default(mutation)); | ||
} | ||
if (currentBatch.length > 0) { | ||
yield currentBatch; | ||
} | ||
} | ||
function create(document) { | ||
return { | ||
type: "create", | ||
document | ||
}; | ||
} | ||
function patch(id, patches, options) { | ||
return { | ||
type: "patch", | ||
id, | ||
patches: arrify__default.default(patches), | ||
...(options ? { | ||
options | ||
} : {}) | ||
}; | ||
} | ||
function at(path, operation) { | ||
return { | ||
path: typeof path === "string" ? paths.fromString(path) : path, | ||
op: operation | ||
}; | ||
} | ||
function createIfNotExists(document) { | ||
return { | ||
type: "createIfNotExists", | ||
document | ||
}; | ||
} | ||
function createOrReplace(document) { | ||
return { | ||
type: "createOrReplace", | ||
document | ||
}; | ||
} | ||
function delete_(id) { | ||
return { | ||
type: "delete", | ||
id | ||
}; | ||
} | ||
const del = delete_; | ||
const set = value => ({ | ||
type: "set", | ||
value | ||
}); | ||
const setIfMissing = value => ({ | ||
type: "setIfMissing", | ||
value | ||
}); | ||
const unset = () => ({ | ||
type: "unset" | ||
}); | ||
const inc = function () { | ||
let amount = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : 1; | ||
return { | ||
type: "inc", | ||
amount | ||
}; | ||
}; | ||
const dec = function () { | ||
let amount = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : 1; | ||
return { | ||
type: "dec", | ||
amount | ||
}; | ||
}; | ||
const diffMatchPatch = value => ({ | ||
type: "diffMatchPatch", | ||
value | ||
}); | ||
function insert(items, position, indexOrReferenceItem) { | ||
return { | ||
type: "insert", | ||
referenceItem: indexOrReferenceItem, | ||
position, | ||
items: arrify__default.default(items) | ||
}; | ||
} | ||
function append(items) { | ||
return insert(items, "after", -1); | ||
} | ||
function prepend(items) { | ||
return insert(items, "before", 0); | ||
} | ||
function insertBefore(items, indexOrReferenceItem) { | ||
return insert(items, "before", indexOrReferenceItem); | ||
} | ||
const insertAfter = (items, indexOrReferenceItem) => { | ||
return insert(items, "after", indexOrReferenceItem); | ||
}; | ||
function truncate(startIndex, endIndex) { | ||
return { | ||
type: "truncate", | ||
startIndex, | ||
endIndex | ||
}; | ||
} | ||
function replace(items, referenceItem) { | ||
return { | ||
type: "replace", | ||
referenceItem, | ||
items: arrify__default.default(items) | ||
}; | ||
} | ||
function getValueType(value) { | ||
@@ -706,3 +1013,3 @@ if (Array.isArray(value)) { | ||
filter: migration.filter, | ||
documentType: migration.documentType | ||
documentTypes: migration.documentTypes | ||
}); | ||
@@ -717,6 +1024,7 @@ } | ||
function createAsyncIterableMutation(migration, opts) { | ||
const documentTypesSet = new Set(opts.documentTypes); | ||
return async function* run(docs, context) { | ||
for await (const doc of docs) { | ||
if (doc._type !== opts.documentType) continue; | ||
const documentMutations = collectDocumentMutations(migration, doc, context); | ||
for await (const doc of docs()) { | ||
if (!documentTypesSet.has(doc._type)) continue; | ||
const documentMutations = await collectDocumentMutations(migration, doc, context); | ||
if (documentMutations.length > 0) { | ||
@@ -728,16 +1036,17 @@ yield documentMutations; | ||
} | ||
function collectDocumentMutations(migration, doc, context) { | ||
async function collectDocumentMutations(migration, doc, context) { | ||
var _a; | ||
const documentMutations = (_a = migration.document) == null ? void 0 : _a.call(migration, doc, context); | ||
const nodeMigrations = flatMapDeep(doc, (value, path) => { | ||
const documentMutations = Promise.resolve((_a = migration.document) == null ? void 0 : _a.call(migration, doc, context)); | ||
const nodeMigrations = flatMapDeep(doc, async (value, path) => { | ||
var _a2; | ||
return [...arrify__default.default((_a2 = migration.node) == null ? void 0 : _a2.call(migration, value, path, context)), ...arrify__default.default(migrateNodeType(migration, value, path, context))].map(change => normalizeNodeMutation(path, change)); | ||
const [nodeReturnValues, nodeTypeReturnValues] = await Promise.all([Promise.resolve((_a2 = migration.node) == null ? void 0 : _a2.call(migration, value, path, context)), Promise.resolve(migrateNodeType(migration, value, path, context))]); | ||
return [...arrify__default.default(nodeReturnValues), ...arrify__default.default(nodeTypeReturnValues)].map(change => normalizeNodeMutation(path, change)); | ||
}); | ||
return [...arrify__default.default(documentMutations), ...nodeMigrations].map(change => normalizeDocumentMutation(doc._id, change)); | ||
return (await Promise.all([...arrify__default.default(await documentMutations), ...nodeMigrations])).flat().map(change => normalizeDocumentMutation(doc._id, change)); | ||
} | ||
function normalizeDocumentMutation(documentId, change) { | ||
return isMutation(change) ? change : mutiny.patch(documentId, change); | ||
return isMutation(change) ? change : patch(documentId, change); | ||
} | ||
function normalizeNodeMutation(path, change) { | ||
return isOperation(change) ? mutiny.at(path, change) : change; | ||
return isOperation(change) ? at(path, change) : change; | ||
} | ||
@@ -763,33 +1072,276 @@ function migrateNodeType(migration, value, path, context) { | ||
} | ||
function collectMigrationMutations(migration, documents) { | ||
const ctx = { | ||
withDocument: () => { | ||
throw new Error("Not implemented yet"); | ||
function collectMigrationMutations(migration, documents, context) { | ||
const migrate = normalizeMigrateDefinition(migration); | ||
return migrate(documents, context); | ||
} | ||
function getBufferFilePath() { | ||
return path__default.default.join(node_os.tmpdir(), "/export-buffer-".concat(Date.now(), ".tmp")); | ||
} | ||
function parseGroq(query) { | ||
try { | ||
return groq__default.default.parse(query); | ||
} catch (err) { | ||
err.message = 'Failed to parse GROQ filter "'.concat(query, '": ').concat(err.message); | ||
throw err; | ||
} | ||
} | ||
async function groqQuery(it, query, params) { | ||
const parsedFilter = parseGroq(query); | ||
const all = await toArray(it); | ||
return (await groq__default.default.evaluate(parsedFilter, { | ||
dataset: all, | ||
params | ||
})).get(); | ||
} | ||
function createBufferFileContext(getReader) { | ||
function getAllDocumentsFromBuffer() { | ||
return parse(decodeText(streamToAsyncIterator(getReader())), { | ||
parse: safeJsonParser | ||
}); | ||
} | ||
async function getDocumentsFromBuffer(ids) { | ||
const found = {}; | ||
let remaining = ids.length; | ||
for await (const doc of getAllDocumentsFromBuffer()) { | ||
if (ids.includes(doc._id)) { | ||
remaining--; | ||
found[doc._id] = doc; | ||
} | ||
if (remaining === 0) break; | ||
} | ||
return ids.map(id => found[id]); | ||
} | ||
async function getDocumentFromBuffer(id) { | ||
return (await getDocumentsFromBuffer([id]))[0]; | ||
} | ||
function queryFromBuffer(query, params) { | ||
return groqQuery(getAllDocumentsFromBuffer(), query, params); | ||
} | ||
return { | ||
getDocument: getDocumentFromBuffer, | ||
getDocuments: getDocumentsFromBuffer, | ||
query: queryFromBuffer | ||
}; | ||
const migrate = normalizeMigrateDefinition(migration); | ||
return migrate(documents, ctx); | ||
} | ||
async function* run(config, migration) { | ||
const mutations = collectMigrationMutations(migration, ndjson(await fromExportEndpoint(config.api))); | ||
for await (const result of toMutationEndpoint(config.api, mutations)) { | ||
yield formatMutationResponse(result); | ||
function isSystemDocumentId(id) { | ||
return id.startsWith("_."); | ||
} | ||
function parseGroqFilter(filter) { | ||
try { | ||
return groq__default.default.parse("*[".concat(filter, "]")); | ||
} catch (err) { | ||
err.message = 'Failed to parse GROQ filter "'.concat(filter, '": ').concat(err.message); | ||
throw err; | ||
} | ||
} | ||
function formatMutationResponse(mutationResponse) { | ||
return "OK (transactionId = ".concat(mutationResponse.transactionId, ")\n").concat(mutationResponse.results.map(result => { | ||
return " - ".concat(result.operation, ": ").concat(result.id); | ||
}).join("\n")); | ||
async function matchesFilter(parsedFilter, document) { | ||
const result = await (await groq__default.default.evaluate(parsedFilter, { | ||
dataset: [document] | ||
})).get(); | ||
return result.length === 1; | ||
} | ||
async function* applyFilters(migration, documents) { | ||
const documentTypes = migration.documentTypes; | ||
const parsedFilter = migration.filter ? parseGroqFilter(migration.filter) : void 0; | ||
for await (const doc of documents) { | ||
if (isSystemDocumentId(doc._id)) { | ||
continue; | ||
} | ||
if (documentTypes && documentTypes.length > 0 && !documentTypes.includes(doc._type)) { | ||
continue; | ||
} | ||
if (parsedFilter && !(await matchesFilter(parsedFilter, doc))) { | ||
continue; | ||
} | ||
yield doc; | ||
} | ||
} | ||
async function* toFetchOptionsIterable(apiConfig, mutations) { | ||
var _a; | ||
for await (const mut of mutations) { | ||
yield toFetchOptions({ | ||
projectId: apiConfig.projectId, | ||
apiVersion: apiConfig.apiVersion, | ||
token: apiConfig.token, | ||
apiHost: (_a = apiConfig.apiHost) != null ? _a : "api.sanity.io", | ||
endpoint: endpoints.data.mutate(apiConfig.dataset, { | ||
returnIds: true | ||
}), | ||
body: JSON.stringify({ | ||
mutations: mut | ||
}) | ||
}); | ||
} | ||
} | ||
async function run(config, migration) { | ||
var _a, _b, _c; | ||
const stats = { | ||
documents: 0, | ||
mutations: 0, | ||
pending: 0, | ||
queuedBatches: 0, | ||
completedTransactions: [], | ||
currentMutations: [] | ||
}; | ||
const filteredDocuments = applyFilters(migration, parse(decodeText(streamToAsyncIterator(await fromExportEndpoint({ | ||
...config.api, | ||
documentTypes: migration.documentTypes | ||
}))), { | ||
parse: safeJsonParser | ||
})); | ||
const abortController = new AbortController(); | ||
const createReader = bufferThroughFile(asyncIterableToStream(stringify(filteredDocuments)), getBufferFilePath(), { | ||
signal: abortController.signal | ||
}); | ||
const context = createBufferFileContext(createReader); | ||
const documents = () => tap(parse(decodeText(streamToAsyncIterator(createReader())), { | ||
parse: safeJsonParser | ||
}), () => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
documents: ++stats.documents | ||
}); | ||
}); | ||
const mutations = tap(collectMigrationMutations(migration, documents, context), muts => { | ||
var _a2; | ||
stats.currentMutations = arrify__default.default(muts); | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
mutations: ++stats.mutations | ||
}); | ||
}); | ||
const concurrency = (_a = config == null ? void 0 : config.concurrency) != null ? _a : DEFAULT_MUTATION_CONCURRENCY; | ||
if (concurrency > MAX_MUTATION_CONCURRENCY) { | ||
throw new Error("Concurrency exceeds maximum allowed value (".concat(MAX_MUTATION_CONCURRENCY, ")")); | ||
} | ||
const batches = tap(batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), () => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
queuedBatches: ++stats.queuedBatches | ||
}); | ||
}); | ||
const submit = async opts => lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))); | ||
const commits = await mapAsync(toFetchOptionsIterable(config.api, batches), opts => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
pending: ++stats.pending | ||
}); | ||
return submit(opts); | ||
}, concurrency); | ||
for await (const result of commits) { | ||
stats.completedTransactions.push(result); | ||
(_b = config.onProgress) == null ? void 0 : _b.call(config, { | ||
...stats | ||
}); | ||
} | ||
(_c = config.onProgress) == null ? void 0 : _c.call(config, { | ||
...stats, | ||
done: true | ||
}); | ||
abortController.abort(); | ||
} | ||
async function runFromArchive(migration, path, config) { | ||
var _a, _b, _c; | ||
const stats = { | ||
documents: 0, | ||
mutations: 0, | ||
pending: 0, | ||
queuedBatches: 0, | ||
completedTransactions: [], | ||
currentMutations: [] | ||
}; | ||
const filteredDocuments = applyFilters(migration, parse(decodeText(fromExportArchive(path)), { | ||
parse: safeJsonParser | ||
})); | ||
const abortController = new AbortController(); | ||
const createReader = bufferThroughFile(asyncIterableToStream(stringify(filteredDocuments)), getBufferFilePath(), { | ||
signal: abortController.signal | ||
}); | ||
const context = createBufferFileContext(createReader); | ||
const documents = () => tap(parse(decodeText(streamToAsyncIterator(createReader())), { | ||
parse: safeJsonParser | ||
}), () => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
documents: ++stats.documents | ||
}); | ||
}); | ||
const mutations = tap(collectMigrationMutations(migration, documents, context), muts => { | ||
var _a2; | ||
stats.currentMutations = arrify__default.default(muts); | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
mutations: ++stats.mutations | ||
}); | ||
}); | ||
const batches = tap(batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), () => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
queuedBatches: ++stats.queuedBatches | ||
}); | ||
}); | ||
const concurrency = (_a = config == null ? void 0 : config.concurrency) != null ? _a : DEFAULT_MUTATION_CONCURRENCY; | ||
if (concurrency > MAX_MUTATION_CONCURRENCY) { | ||
throw new Error("Concurrency exceeds maximum allowed value (".concat(MAX_MUTATION_CONCURRENCY, ")")); | ||
} | ||
const commits = await mapAsync(toFetchOptionsIterable(config.api, batches), opts => { | ||
var _a2; | ||
(_a2 = config.onProgress) == null ? void 0 : _a2.call(config, { | ||
...stats, | ||
pending: ++stats.pending | ||
}); | ||
return Promise.resolve(); | ||
}, concurrency); | ||
for await (const result of commits) { | ||
(_b = config.onProgress) == null ? void 0 : _b.call(config, { | ||
...stats | ||
}); | ||
} | ||
abortController.abort(); | ||
(_c = config.onProgress) == null ? void 0 : _c.call(config, { | ||
...stats, | ||
done: true | ||
}); | ||
} | ||
async function* dryRun(config, migration) { | ||
const mutations = collectMigrationMutations(migration, ndjson(await fromExportEndpoint(config.api))); | ||
const filteredDocuments = applyFilters(migration, parse(decodeText(streamToAsyncIterator(await fromExportEndpoint({ | ||
...config.api, | ||
documentTypes: migration.documentTypes | ||
}))), { | ||
parse: safeJsonParser | ||
})); | ||
const abortController = new AbortController(); | ||
const createReader = bufferThroughFile(asyncIterableToStream(filteredDocuments), getBufferFilePath(), { | ||
signal: abortController.signal | ||
}); | ||
const context = createBufferFileContext(createReader); | ||
const mutations = collectMigrationMutations(migration, () => parse(decodeText(streamToAsyncIterator(createReader())), { | ||
parse: safeJsonParser | ||
}), context); | ||
abortController.abort(); | ||
for await (const mutation of mutations) { | ||
if (!mutation) continue; | ||
yield mutiny.CompactFormatter.format(Array.isArray(mutation) ? mutation : [mutation]); | ||
yield JSON.stringify(mutation, null, 2); | ||
} | ||
} | ||
exports.DEFAULT_MUTATION_CONCURRENCY = DEFAULT_MUTATION_CONCURRENCY; | ||
exports.MAX_MUTATION_CONCURRENCY = MAX_MUTATION_CONCURRENCY; | ||
exports.append = append; | ||
exports.at = at; | ||
exports.collectMigrationMutations = collectMigrationMutations; | ||
exports.create = create; | ||
exports.createIfNotExists = createIfNotExists; | ||
exports.createOrReplace = createOrReplace; | ||
exports.dec = dec; | ||
exports.decodeText = decodeText; | ||
exports.defineMigration = defineMigration; | ||
exports.del = del; | ||
exports.delay = delay; | ||
exports.delete_ = delete_; | ||
exports.diffMatchPatch = diffMatchPatch; | ||
exports.dryRun = dryRun; | ||
@@ -800,10 +1352,25 @@ exports.filter = filter; | ||
exports.fromExportEndpoint = fromExportEndpoint; | ||
exports.inc = inc; | ||
exports.insert = insert; | ||
exports.insertAfter = insertAfter; | ||
exports.insertBefore = insertBefore; | ||
exports.map = map; | ||
exports.ndjson = ndjson; | ||
exports.parse = parse; | ||
exports.parseJSON = parseJSON; | ||
exports.patch = patch; | ||
exports.prepend = prepend; | ||
exports.replace = replace; | ||
exports.run = run; | ||
exports.runFromArchive = runFromArchive; | ||
exports.safeJsonParser = safeJsonParser; | ||
exports.set = set; | ||
exports.setIfMissing = setIfMissing; | ||
exports.split = split; | ||
exports.stringify = stringify; | ||
exports.stringifyJSON = stringifyJSON; | ||
exports.take = take; | ||
exports.toArray = toArray; | ||
exports.toFetchOptionsIterable = toFetchOptionsIterable; | ||
exports.truncate = truncate; | ||
exports.unset = unset; | ||
//# sourceMappingURL=index.js.map |
@@ -1,7 +0,11 @@ | ||
import {Mutation} from '@bjoerge/mutiny' | ||
import type {NodePatch} from '@bjoerge/mutiny' | ||
import type {Operation} from '@bjoerge/mutiny' | ||
import type {Path} from '@bjoerge/mutiny' | ||
import {SanityDocument} from '@sanity/types' | ||
import type {KeyedSegment} from '@sanity/types' | ||
import {MultipleMutationResult} from '@sanity/client' | ||
import {Mutation as Mutation_2} from '@sanity/client' | ||
import type {Path} from '@sanity/types' | ||
import {SanityDocument as SanityDocument_2} from '@sanity/types' | ||
declare type AnyArray<T = any> = T[] | readonly T[] | ||
export declare type AnyOp = SetOp<unknown> | SetIfMissingOp<unknown> | UnsetOp | ||
export declare interface APIConfig { | ||
@@ -15,12 +19,103 @@ projectId: string | ||
/** | ||
* Creates an `insert` operation that appends the provided items. | ||
* @param items - The items to append. | ||
* @returns An `insert` operation at the end of the array. | ||
* {@link https://www.sanity.io/docs/http-patches#Cw4vhD88} | ||
* @beta | ||
*/ | ||
export declare function append<const Items extends AnyArray<unknown>>( | ||
items: Items | ArrayElement<Items>, | ||
): InsertOp<NormalizeReadOnlyArray<Items>, 'after', -1> | ||
declare type ArrayElement<A> = A extends readonly (infer T)[] ? T : never | ||
export declare type ArrayOp = | ||
| InsertOp<AnyArray, RelativePosition, IndexedSegment | KeyedSegment> | ||
| UpsertOp<AnyArray, RelativePosition, IndexedSegment | KeyedSegment> | ||
| ReplaceOp<AnyArray, IndexedSegment | KeyedSegment> | ||
| TruncateOp | ||
export declare type AssignOp<T extends object = object> = { | ||
type: 'assign' | ||
value: T | ||
} | ||
export declare type AsyncIterableMigration = ( | ||
documents: AsyncIterableIterator<SanityDocument>, | ||
documents: () => AsyncIterableIterator<SanityDocument_2>, | ||
context: MigrationContext, | ||
) => AsyncGenerator<Mutation | Mutation[]> | ||
/** | ||
* Creates a node patch at a specific path. | ||
* @param path - The path where the operation should be applied. | ||
* @param operation - The operation to be applied. | ||
* @returns The node patch. | ||
* @beta | ||
*/ | ||
export declare function at<O extends Operation>( | ||
path: Path | string, | ||
operation: O, | ||
): NodePatch<Path, O> | ||
export declare function collectMigrationMutations( | ||
migration: Migration, | ||
documents: AsyncIterableIterator<SanityDocument>, | ||
documents: () => AsyncIterableIterator<SanityDocument_2>, | ||
context: MigrationContext, | ||
): AsyncGenerator<Mutation | Mutation[], any, unknown> | ||
/** | ||
* Creates a new document. | ||
* @param document - The document to be created. | ||
* @returns The mutation operation to create the document. | ||
* @beta | ||
*/ | ||
export declare function create<Doc extends Optional<SanityDocument, '_id'>>( | ||
document: Doc, | ||
): CreateMutation<Doc> | ||
/** | ||
* Creates a document if it does not exist. | ||
* @param document - The document to be created. | ||
* @returns The mutation operation to create the document if it does not exist. | ||
* @beta | ||
*/ | ||
export declare function createIfNotExists<Doc extends SanityDocument>( | ||
document: Doc, | ||
): CreateIfNotExistsMutation<Doc> | ||
export declare type CreateIfNotExistsMutation<Doc extends SanityDocument> = { | ||
type: 'createIfNotExists' | ||
document: Doc | ||
} | ||
export declare type CreateMutation<Doc extends Optional<SanityDocument, '_id'>> = { | ||
type: 'create' | ||
document: Doc | ||
} | ||
/** | ||
* Creates or replaces a document. | ||
* @param document - The document to be created or replaced. | ||
* @returns The mutation operation to create or replace the document. | ||
* @beta | ||
*/ | ||
export declare function createOrReplace<Doc extends SanityDocument>( | ||
document: Doc, | ||
): CreateOrReplaceMutation<Doc> | ||
export declare type CreateOrReplaceMutation<Doc extends SanityDocument> = { | ||
type: 'createOrReplace' | ||
document: Doc | ||
} | ||
/** | ||
* Creates a `dec` (decrement) operation with the provided amount. | ||
* @param amount - The amount to decrement by. | ||
* @returns A `dec` operation. | ||
* {@link https://www.sanity.io/docs/http-patches#vIT8WWQo} | ||
* @beta | ||
*/ | ||
export declare const dec: <const N extends number = 1>(amount?: N) => DecOp<N> | ||
export declare function decodeText( | ||
@@ -30,4 +125,16 @@ it: AsyncIterableIterator<Uint8Array>, | ||
export declare type DecOp<Amount extends number> = { | ||
type: 'dec' | ||
amount: Amount | ||
} | ||
export declare const DEFAULT_MUTATION_CONCURRENCY = 6 | ||
export declare function defineMigration<T extends Migration>(migration: T): T | ||
/** | ||
* Alias for delete_ | ||
*/ | ||
export declare const del: typeof delete_ | ||
export declare function delay<T>( | ||
@@ -38,2 +145,29 @@ it: AsyncIterableIterator<T>, | ||
/** | ||
* Deletes a document. | ||
* @param id - The id of the document to be deleted. | ||
* @returns The mutation operation to delete the document. | ||
* @beta | ||
*/ | ||
export declare function delete_(id: string): DeleteMutation | ||
export declare type DeleteMutation = { | ||
type: 'delete' | ||
id: string | ||
} | ||
/** | ||
* Creates a `diffMatchPatch` operation with the provided value. | ||
* @param value - The value for the diff match patch operation. | ||
* @returns A `diffMatchPatch` operation. | ||
* {@link https://www.sanity.io/docs/http-patches#aTbJhlAJ} | ||
* @beta | ||
*/ | ||
export declare const diffMatchPatch: (value: string) => DiffMatchPatchOp | ||
export declare type DiffMatchPatchOp = { | ||
type: 'diffMatchPatch' | ||
value: string | ||
} | ||
export declare type DocumentMigrationReturnValue = | ||
@@ -47,14 +181,23 @@ | undefined | ||
export declare function dryRun( | ||
config: MigrationRunnerOptions_2, | ||
config: MigrationRunnerOptions, | ||
migration: Migration, | ||
): AsyncGenerator<string, void, unknown> | ||
export declare interface ExportAPIConfig extends APIConfig { | ||
documentTypes?: string[] | ||
} | ||
declare interface FetchOptions { | ||
url: string | URL | ||
init: RequestInit | ||
} | ||
export declare function filter<T>( | ||
it: AsyncIterableIterator<T>, | ||
predicate: (value: T) => boolean, | ||
predicate: (value: T) => boolean | Promise<boolean>, | ||
): AsyncGenerator<Awaited<T>, void, unknown> | ||
export declare function fromDocuments( | ||
documents: SanityDocument[], | ||
): Generator<SanityDocument, void, unknown> | ||
documents: SanityDocument_2[], | ||
): Generator<SanityDocument_2, void, unknown> | ||
@@ -64,5 +207,82 @@ export declare function fromExportArchive(path: string): AsyncGenerator<Uint8Array, void, unknown> | ||
export declare function fromExportEndpoint( | ||
options: APIConfig, | ||
): Promise<AsyncGenerator<Uint8Array, void, unknown>> | ||
options: ExportAPIConfig, | ||
): Promise<ReadableStream<Uint8Array>> | ||
/** | ||
* Creates an `inc` (increment) operation with the provided amount. | ||
* @param amount - The amount to increment by. | ||
* @returns An `inc` operation. | ||
* {@link https://www.sanity.io/docs/http-patches#vIT8WWQo} | ||
* @beta | ||
*/ | ||
export declare const inc: <const N extends number = 1>(amount?: N) => IncOp<N> | ||
export declare type IncOp<Amount extends number> = { | ||
type: 'inc' | ||
amount: Amount | ||
} | ||
export declare type IndexedSegment = number | ||
/** | ||
* Creates an `insert` operation with the provided items, position, and reference item. | ||
* @param items - The items to insert. | ||
* @param position - The position to insert at. | ||
* @param indexOrReferenceItem - The index or reference item to insert before or after. | ||
* @returns An `insert` operation. | ||
* {@link https://www.sanity.io/docs/http-patches#febxf6Fk} | ||
* @beta | ||
*/ | ||
export declare function insert< | ||
const Items extends AnyArray<unknown>, | ||
const Pos extends RelativePosition, | ||
const ReferenceItem extends IndexedSegment | KeyedSegment, | ||
>( | ||
items: Items | ArrayElement<Items>, | ||
position: Pos, | ||
indexOrReferenceItem: ReferenceItem, | ||
): InsertOp<NormalizeReadOnlyArray<Items>, Pos, ReferenceItem> | ||
/** | ||
* Creates an `insert` operation that inserts the provided items after the provided index or reference item. | ||
* @param items - The items to insert. | ||
* @param indexOrReferenceItem - The index or reference item to insert after. | ||
* @returns An `insert` operation after the provided index or reference item. | ||
* {@link https://www.sanity.io/docs/http-patches#0SQmPlb6} | ||
* @beta | ||
*/ | ||
export declare const insertAfter: < | ||
const Items extends AnyArray<unknown>, | ||
const ReferenceItem extends number | KeyedSegment, | ||
>( | ||
items: Items | ArrayElement<Items>, | ||
indexOrReferenceItem: ReferenceItem, | ||
) => InsertOp<NormalizeReadOnlyArray<Items>, 'after', ReferenceItem> | ||
/** | ||
* Creates an `insert` operation that inserts the provided items before the provided index or reference item. | ||
* @param items - The items to insert. | ||
* @param indexOrReferenceItem - The index or reference item to insert before. | ||
* @returns An `insert` operation before the provided index or reference item. | ||
* {@link https://www.sanity.io/docs/http-patches#0SQmPlb6} | ||
*/ | ||
export declare function insertBefore< | ||
const Items extends AnyArray<unknown>, | ||
const ReferenceItem extends IndexedSegment | KeyedSegment, | ||
>( | ||
items: Items | ArrayElement<Items>, | ||
indexOrReferenceItem: ReferenceItem, | ||
): InsertOp<NormalizeReadOnlyArray<Items>, 'before', ReferenceItem> | ||
export declare type InsertOp< | ||
Items extends AnyArray, | ||
Pos extends RelativePosition, | ||
ReferenceItem extends IndexedSegment | KeyedSegment, | ||
> = { | ||
type: 'insert' | ||
referenceItem: ReferenceItem | ||
position: Pos | ||
items: Items | ||
} | ||
export declare type JsonArray = JsonValue[] | readonly JsonValue[] | ||
@@ -76,2 +296,8 @@ | ||
export declare interface JSONOptions<Type> { | ||
parse?: JSONParser<Type> | ||
} | ||
export declare type JSONParser<Type> = (line: string) => Type | ||
export declare type JsonPrimitive = string | number | boolean | null | ||
@@ -81,2 +307,4 @@ | ||
export {KeyedSegment} | ||
export declare function map<T, U>( | ||
@@ -87,2 +315,4 @@ it: AsyncIterableIterator<T>, | ||
export declare const MAX_MUTATION_CONCURRENCY = 10 | ||
export declare type MigrateDefinition = NodeMigration | AsyncIterableMigration | ||
@@ -96,3 +326,6 @@ | ||
filter?: string | ||
documentType: string | ||
/** | ||
* What document types to migrate | ||
*/ | ||
documentTypes?: string[] | ||
migrate: Def | ||
@@ -102,63 +335,76 @@ } | ||
export declare interface MigrationContext { | ||
withDocument(id: string): Promise<SanityDocument | null> | ||
getDocument<T extends SanityDocument_2>(id: string): Promise<T | undefined> | ||
getDocuments<T extends SanityDocument_2>(ids: string[]): Promise<T[]> | ||
query<T>(query: string, params?: Record<string, unknown>): Promise<T> | ||
} | ||
declare interface MigrationRunnerOptions { | ||
export declare type MigrationProgress = { | ||
documents: number | ||
mutations: number | ||
pending: number | ||
queuedBatches: number | ||
currentMutations: Mutation[] | ||
completedTransactions: MultipleMutationResult[] | ||
done?: boolean | ||
} | ||
export declare interface MigrationRunnerConfig { | ||
api: APIConfig | ||
concurrency?: number | ||
onProgress?: (event: MigrationProgress) => void | ||
} | ||
declare interface MigrationRunnerOptions_2 { | ||
declare interface MigrationRunnerOptions { | ||
api: APIConfig | ||
} | ||
export declare function ndjson( | ||
it: AsyncIterableIterator<Uint8Array>, | ||
): AsyncIterableIterator<unknown> | ||
export declare type Mutation<Doc extends SanityDocument = any> = | ||
| CreateMutation<Doc> | ||
| CreateIfNotExistsMutation<Doc> | ||
| CreateOrReplaceMutation<Doc> | ||
| DeleteMutation | ||
| PatchMutation | ||
export declare interface NodeMigration { | ||
document?: <Doc extends SanityDocument>( | ||
document?: <Doc extends SanityDocument_2>( | ||
doc: Doc, | ||
context: NodeMigrationContext, | ||
) => DocumentMigrationReturnValue | ||
context: MigrationContext, | ||
) => DocumentMigrationReturnValue | Promise<DocumentMigrationReturnValue> | ||
node?: <Node extends JsonValue>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
object?: <Node extends JsonObject>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
array?: <Node extends JsonArray>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
string?: <Node extends string>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
number?: <Node extends number>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
boolean?: <Node extends boolean>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
null?: <Node extends null>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
} | ||
export declare interface NodeMigrationContext { | ||
withDocument(id: string): Promise<SanityDocument | null> | ||
} | ||
export declare type NodeMigrationReturnValue = | ||
@@ -169,11 +415,156 @@ | DocumentMigrationReturnValue | ||
export declare function parseJSON(it: AsyncIterableIterator<string>): AsyncIterableIterator<unknown> | ||
export declare type NodePatch<P extends Path = Path, O extends Operation = Operation> = { | ||
path: P | ||
op: O | ||
} | ||
export declare type NodePatchList = | ||
| [NodePatch, ...NodePatch[]] | ||
| NodePatch[] | ||
| readonly NodePatch[] | ||
| readonly [NodePatch, ...NodePatch[]] | ||
declare type NormalizeReadOnlyArray<T> = T extends readonly [infer NP, ...infer Rest] | ||
? [NP, ...Rest] | ||
: T extends readonly (infer NP)[] | ||
? NP[] | ||
: T | ||
export declare type NumberOp = IncOp<number> | DecOp<number> | ||
export declare type ObjectOp = AssignOp | UnassignOp | ||
export declare type Operation = PrimitiveOp | ArrayOp | ObjectOp | ||
declare type Optional<T, K extends keyof T> = Omit<T, K> & Partial<Pick<T, K>> | ||
export declare function parse<Type>( | ||
it: AsyncIterableIterator<string>, | ||
options?: JSONOptions<Type>, | ||
): AsyncIterableIterator<Type> | ||
export declare function parseJSON<Type>( | ||
it: AsyncIterableIterator<string>, | ||
{parse}?: JSONOptions<Type>, | ||
): AsyncIterableIterator<Type> | ||
/** | ||
* Applies a patch to a document. | ||
* @param id - The id of the document to be patched. | ||
* @param patches - The patches to be applied. | ||
* @param options - Optional patch options. | ||
* @returns The mutation operation to patch the document. | ||
* @beta | ||
*/ | ||
export declare function patch<P extends NodePatchList | NodePatch>( | ||
id: string, | ||
patches: P, | ||
options?: PatchOptions, | ||
): PatchMutation<NormalizeReadOnlyArray<Tuplify<P>>> | ||
export declare type PatchMutation<Patches extends NodePatchList = NodePatchList> = { | ||
type: 'patch' | ||
id: string | ||
patches: Patches | ||
options?: PatchOptions | ||
} | ||
export declare type PatchOptions = { | ||
ifRevision?: string | ||
} | ||
export {Path} | ||
export declare function run( | ||
config: MigrationRunnerOptions, | ||
/** | ||
* Creates an `insert` operation that prepends the provided items. | ||
* @param items - The items to prepend. | ||
* @returns An `insert` operation at the beginning of the array. | ||
* {@link https://www.sanity.io/docs/http-patches#refAUsf0} | ||
* @beta | ||
*/ | ||
export declare function prepend<const Items extends AnyArray<unknown>>( | ||
items: Items | ArrayElement<Items>, | ||
): InsertOp<NormalizeReadOnlyArray<Items>, 'before', 0> | ||
export declare type PrimitiveOp = AnyOp | StringOp | NumberOp | ||
export declare type RelativePosition = 'before' | 'after' | ||
/** | ||
* Creates a `replace` operation with the provided items and reference item. | ||
* @param items - The items to replace. | ||
* @param referenceItem - The reference item to replace. | ||
* @returns A ReplaceOp operation. | ||
* @remarks This will be converted to an `insert`/`replace` patch when submitted to the API | ||
* {@link https://www.sanity.io/docs/http-patches#GnVSwcPa} | ||
* @beta | ||
*/ | ||
export declare function replace< | ||
Items extends any[], | ||
ReferenceItem extends IndexedSegment | KeyedSegment, | ||
>(items: Items | ArrayElement<Items>, referenceItem: ReferenceItem): ReplaceOp<Items, ReferenceItem> | ||
export declare type ReplaceOp< | ||
Items extends AnyArray, | ||
ReferenceItem extends IndexedSegment | KeyedSegment, | ||
> = { | ||
type: 'replace' | ||
referenceItem: ReferenceItem | ||
items: Items | ||
} | ||
export declare function run(config: MigrationRunnerConfig, migration: Migration): Promise<void> | ||
export declare function runFromArchive( | ||
migration: Migration, | ||
): AsyncGenerator<string, void, unknown> | ||
path: string, | ||
config: MigrationRunnerConfig, | ||
): Promise<void> | ||
/** | ||
* Safe JSON parser that is able to handle lines interrupted by an error object. | ||
* | ||
* This may occur when streaming NDJSON from the Export HTTP API. | ||
* | ||
* @internal | ||
* @see {@link https://github.com/sanity-io/sanity/pull/1787 | Initial pull request} | ||
*/ | ||
export declare const safeJsonParser: (line: string) => SanityDocument_2 | ||
export declare type SanityDocument = { | ||
_id?: string | ||
_type: string | ||
_createdAt?: string | ||
_updatedAt?: string | ||
_rev?: string | ||
} | ||
/** | ||
* Creates a `set` operation with the provided value. | ||
* @param value - The value to set. | ||
* @returns A `set` operation. | ||
* {@link https://www.sanity.io/docs/http-patches#6TPENSW3} | ||
* @beta | ||
*/ | ||
export declare const set: <const T>(value: T) => SetOp<T> | ||
/** | ||
* Creates a `setIfMissing` operation with the provided value. | ||
* @param value - The value to set if missing. | ||
* @returns A `setIfMissing` operation. | ||
* {@link https://www.sanity.io/docs/http-patches#A80781bT} | ||
* @beta | ||
*/ | ||
export declare const setIfMissing: <const T>(value: T) => SetIfMissingOp<T> | ||
export declare type SetIfMissingOp<T> = { | ||
type: 'setIfMissing' | ||
value: T | ||
} | ||
export declare type SetOp<T> = { | ||
type: 'set' | ||
value: T | ||
} | ||
export declare function split( | ||
@@ -184,2 +575,6 @@ it: AsyncIterableIterator<string>, | ||
export declare function stringify( | ||
iterable: AsyncIterableIterator<unknown>, | ||
): AsyncGenerator<string, void, unknown> | ||
export declare function stringifyJSON( | ||
@@ -189,2 +584,4 @@ it: AsyncIterableIterator<unknown>, | ||
export declare type StringOp = DiffMatchPatchOp | ||
export declare function take<T>( | ||
@@ -197,2 +594,58 @@ it: AsyncIterableIterator<T>, | ||
export declare function toFetchOptionsIterable( | ||
apiConfig: APIConfig, | ||
mutations: AsyncIterableIterator<Mutation_2[]>, | ||
): AsyncGenerator<FetchOptions, void, unknown> | ||
/** | ||
* Creates a `truncate` operation that will remove all items after `startIndex` until the end of the array or the provided `endIndex`. | ||
* @param startIndex - The start index for the truncate operation. | ||
* @param endIndex - The end index for the truncate operation. | ||
* @returns A `truncate` operation. | ||
* @remarks - This will be converted to an `unset` patch when submitted to the API | ||
* {@link https://www.sanity.io/docs/http-patches#xRtBjp8o} | ||
* @beta | ||
*/ | ||
export declare function truncate(startIndex: number, endIndex?: number): TruncateOp | ||
export declare type TruncateOp = { | ||
type: 'truncate' | ||
startIndex: number | ||
endIndex?: number | ||
} | ||
declare type Tuplify<T> = T extends readonly [infer NP, ...infer Rest] | ||
? [NP, ...Rest] | ||
: T extends readonly (infer NP)[] | ||
? NP[] | ||
: [T] | ||
export declare type UnassignOp<K extends readonly string[] = readonly string[]> = { | ||
type: 'unassign' | ||
keys: K | ||
} | ||
/** | ||
* Creates an `unset` operation. | ||
* @returns An `unset` operation. | ||
* {@link https://www.sanity.io/docs/http-patches#xRtBjp8o} | ||
* @beta | ||
*/ | ||
export declare const unset: () => UnsetOp | ||
export declare type UnsetOp = { | ||
type: 'unset' | ||
} | ||
export declare type UpsertOp< | ||
Items extends AnyArray, | ||
Pos extends RelativePosition, | ||
ReferenceItem extends IndexedSegment | KeyedSegment, | ||
> = { | ||
type: 'upsert' | ||
items: Items | ||
referenceItem: ReferenceItem | ||
position: Pos | ||
} | ||
export {} |
{ | ||
"name": "@sanity/migrate", | ||
"version": "3.25.1-canary.22+7467ec230d", | ||
"version": "3.26.2-canary.47+dc645319c7", | ||
"description": "Tooling for running data migrations on Sanity.io projects", | ||
@@ -37,13 +37,2 @@ "keywords": [ | ||
}, | ||
"./mutations": { | ||
"types": "./lib/dts/src/_exports/mutations.d.ts", | ||
"source": "./src/_exports/mutations.ts", | ||
"require": "./lib/_exports/mutations.js", | ||
"node": { | ||
"module": "./lib/_exports/mutations.esm.js", | ||
"import": "./lib/_exports/mutations.cjs.mjs" | ||
}, | ||
"import": "./lib/_exports/mutations.esm.js", | ||
"default": "./lib/_exports/mutations.esm.js" | ||
}, | ||
"./package.json": "./package.json" | ||
@@ -74,11 +63,17 @@ }, | ||
"lint": "eslint .", | ||
"watch": "pkg-utils watch --tsconfig tsconfig.lib.json" | ||
"watch": "pkg-utils watch --tsconfig tsconfig.lib.json", | ||
"test": "jest" | ||
}, | ||
"dependencies": { | ||
"@bjoerge/mutiny": "^0.5.1", | ||
"@sanity/types": "3.25.1-canary.22+7467ec230d", | ||
"@sanity/client": "^6.11.1", | ||
"@sanity/types": "3.26.2-canary.47+dc645319c7", | ||
"@sanity/util": "3.26.2-canary.47+dc645319c7", | ||
"arrify": "^2.0.1", | ||
"fast-fifo": "^1.3.2" | ||
"fast-fifo": "^1.3.2", | ||
"groq-js": "^1.4.1", | ||
"p-map": "^7.0.1" | ||
}, | ||
"devDependencies": { | ||
"@types/arrify": "^2.0.1", | ||
"rimraf": "^3.0.2" | ||
@@ -89,3 +84,3 @@ }, | ||
}, | ||
"gitHead": "7467ec230d743872295acc60992b3510fa2dc51f" | ||
"gitHead": "dc645319c7d109039871bc4b3385a0df47a10d55" | ||
} |
export {fromExportArchive} from '../sources/fromExportArchive' | ||
export {fromExportEndpoint} from '../sources/fromExportEndpoint' | ||
export {fromDocuments} from '../sources/fromDocuments' | ||
export {safeJsonParser} from '../sources/fromExportEndpoint' | ||
export * from '../types' | ||
@@ -8,3 +9,6 @@ export * from '../defineMigration' | ||
export * from '../runner/run' | ||
export * from '../runner/runFromArchive' | ||
export * from '../runner/dryRun' | ||
export * from '../runner/collectMigrationMutations' | ||
export {MAX_MUTATION_CONCURRENCY, DEFAULT_MUTATION_CONCURRENCY} from '../runner/constants' | ||
export * from '../mutations' |
@@ -25,7 +25,8 @@ type SupportedMethod = 'GET' | 'POST' | ||
}), | ||
export: (dataset: string): Endpoint => ({ | ||
export: (dataset: string, documentTypes?: string[]): Endpoint => ({ | ||
global: false, | ||
method: 'GET', | ||
path: `/data/export/${dataset}`, | ||
searchParams: [], | ||
searchParams: | ||
documentTypes && documentTypes?.length > 0 ? [['types', documentTypes.join(',')]] : [], | ||
}), | ||
@@ -32,0 +33,0 @@ mutate: ( |
@@ -1,2 +0,2 @@ | ||
import {streamAsyncIterator} from '../utils/streamToAsyncIterator' | ||
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator' | ||
@@ -11,5 +11,13 @@ export interface FetchOptions { | ||
export function assert2xx(res: Response) { | ||
export async function assert2xx(res: Response): Promise<void> { | ||
if (res.status < 200 || res.status > 299) { | ||
const err = new Error(`HTTP Error ${res.status}: ${res.statusText}`) as HTTPError | ||
const response = await res.json().catch(() => { | ||
throw new Error(`Error parsing JSON ${res.status}: ${res.statusText}`) | ||
}) | ||
const message = response.error | ||
? response.error.description | ||
: `HTTP Error ${res.status}: ${res.statusText}` | ||
const err = new Error(message) as HTTPError | ||
err.statusCode = res.status | ||
@@ -20,7 +28,11 @@ throw err | ||
export async function fetchAsyncIterator({url, init}: FetchOptions) { | ||
export async function fetchStream({url, init}: FetchOptions) { | ||
const response = await fetch(url, init) | ||
assert2xx(response) | ||
await assert2xx(response) | ||
if (response.body === null) throw new Error('No response received') | ||
return streamAsyncIterator(response.body) | ||
return response.body | ||
} | ||
export async function fetchAsyncIterator(options: FetchOptions) { | ||
return streamToAsyncIterator(await fetchStream(options)) | ||
} |
@@ -1,4 +0,7 @@ | ||
export async function* filter<T>(it: AsyncIterableIterator<T>, predicate: (value: T) => boolean) { | ||
export async function* filter<T>( | ||
it: AsyncIterableIterator<T>, | ||
predicate: (value: T) => boolean | Promise<boolean>, | ||
) { | ||
for await (const chunk of it) { | ||
if (predicate(chunk)) { | ||
if (await predicate(chunk)) { | ||
yield chunk | ||
@@ -5,0 +8,0 @@ } |
@@ -1,6 +0,13 @@ | ||
export async function* parseJSON( | ||
export type JSONParser<Type> = (line: string) => Type | ||
export interface JSONOptions<Type> { | ||
parse?: JSONParser<Type> | ||
} | ||
export async function* parseJSON<Type>( | ||
it: AsyncIterableIterator<string>, | ||
): AsyncIterableIterator<unknown> { | ||
{parse = JSON.parse}: JSONOptions<Type> = {}, | ||
): AsyncIterableIterator<Type> { | ||
for await (const chunk of it) { | ||
yield JSON.parse(chunk) | ||
yield parse(chunk) | ||
} | ||
@@ -7,0 +14,0 @@ } |
import {split} from './split' | ||
import {decodeText} from './decodeText' | ||
import {parseJSON} from './json' | ||
import {type JSONOptions, parseJSON} from './json' | ||
import {filter} from './filter' | ||
export function ndjson(it: AsyncIterableIterator<Uint8Array>) { | ||
return parseJSON(filter(split(decodeText(it), '\n'), (line) => Boolean(line && line.trim()))) | ||
export function parse<Type>( | ||
it: AsyncIterableIterator<string>, | ||
options?: JSONOptions<Type>, | ||
): AsyncIterableIterator<Type> { | ||
return parseJSON( | ||
filter(split(it, '\n'), (line) => Boolean(line && line.trim())), | ||
options, | ||
) | ||
} | ||
export async function* stringify(iterable: AsyncIterableIterator<unknown>) { | ||
for await (const doc of iterable) { | ||
yield `${JSON.stringify(doc)}\n` | ||
} | ||
} |
import {SanityDocument} from '@sanity/types' | ||
import {Migration} from '../types' | ||
import {Migration, MigrationContext} from '../types' | ||
import {normalizeMigrateDefinition} from './normalizeMigrateDefinition' | ||
@@ -7,11 +7,7 @@ | ||
migration: Migration, | ||
documents: AsyncIterableIterator<SanityDocument>, | ||
documents: () => AsyncIterableIterator<SanityDocument>, | ||
context: MigrationContext, | ||
) { | ||
const ctx = { | ||
withDocument: () => { | ||
throw new Error('Not implemented yet') | ||
}, | ||
} | ||
const migrate = normalizeMigrateDefinition(migration) | ||
return migrate(documents, ctx) | ||
return migrate(documents, context) | ||
} |
@@ -1,7 +0,13 @@ | ||
import {CompactFormatter} from '@bjoerge/mutiny' | ||
import {SanityDocument} from '@sanity/types' | ||
import {APIConfig, Migration} from '../types' | ||
import {ndjson} from '../it-utils/ndjson' | ||
import {fromExportEndpoint} from '../sources/fromExportEndpoint' | ||
import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint' | ||
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator' | ||
import {bufferThroughFile} from '../fs-webstream/bufferThroughFile' | ||
import {asyncIterableToStream} from '../utils/asyncIterableToStream' | ||
import {parse} from '../it-utils/ndjson' | ||
import {decodeText} from '../it-utils' | ||
import {collectMigrationMutations} from './collectMigrationMutations' | ||
import {getBufferFilePath} from './utils/getBufferFile' | ||
import {createBufferFileContext} from './utils/createBufferFileContext' | ||
import {applyFilters} from './utils/applyFilters' | ||
@@ -13,11 +19,36 @@ interface MigrationRunnerOptions { | ||
export async function* dryRun(config: MigrationRunnerOptions, migration: Migration) { | ||
const filteredDocuments = applyFilters( | ||
migration, | ||
parse<SanityDocument>( | ||
decodeText( | ||
streamToAsyncIterator( | ||
await fromExportEndpoint({...config.api, documentTypes: migration.documentTypes}), | ||
), | ||
), | ||
{parse: safeJsonParser}, | ||
), | ||
) | ||
const abortController = new AbortController() | ||
const createReader = bufferThroughFile( | ||
asyncIterableToStream(filteredDocuments), | ||
getBufferFilePath(), | ||
{signal: abortController.signal}, | ||
) | ||
const context = createBufferFileContext(createReader) | ||
const mutations = collectMigrationMutations( | ||
migration, | ||
ndjson(await fromExportEndpoint(config.api)) as AsyncIterableIterator<SanityDocument>, | ||
() => parse(decodeText(streamToAsyncIterator(createReader())), {parse: safeJsonParser}), | ||
context, | ||
) | ||
// stop buffering the export once we're done collecting all mutations | ||
abortController.abort() | ||
for await (const mutation of mutations) { | ||
if (!mutation) continue | ||
yield CompactFormatter.format(Array.isArray(mutation) ? mutation : [mutation]) | ||
yield JSON.stringify(mutation, null, 2) | ||
} | ||
} |
@@ -1,5 +0,5 @@ | ||
import {at, Mutation, NodePatch, Operation, patch, Path} from '@bjoerge/mutiny' | ||
import {SanityDocument} from '@sanity/types' | ||
import {Path, SanityDocument} from '@sanity/types' | ||
import arrify from 'arrify' | ||
import {AsyncIterableMigration, Migration, NodeMigration, NodeMigrationContext} from '../types' | ||
import {at, Mutation, NodePatch, Operation, patch} from '../mutations' | ||
import {AsyncIterableMigration, Migration, MigrationContext, NodeMigration} from '../types' | ||
import {JsonArray, JsonObject, JsonValue} from '../json' | ||
@@ -16,3 +16,3 @@ import {flatMapDeep} from './utils/flatMapDeep' | ||
filter: migration.filter, | ||
documentType: migration.documentType, | ||
documentTypes: migration.documentTypes, | ||
}) | ||
@@ -42,8 +42,11 @@ } | ||
migration: NodeMigration, | ||
opts: {filter?: string; documentType?: string}, | ||
opts: {filter?: string; documentTypes?: string[]}, | ||
): AsyncIterableMigration { | ||
const documentTypesSet = new Set(opts.documentTypes) | ||
return async function* run(docs, context) { | ||
for await (const doc of docs) { | ||
if (doc._type !== opts.documentType) continue | ||
const documentMutations = collectDocumentMutations(migration, doc, context) | ||
for await (const doc of docs()) { | ||
if (!documentTypesSet.has(doc._type)) continue | ||
const documentMutations = await collectDocumentMutations(migration, doc, context) | ||
if (documentMutations.length > 0) { | ||
@@ -56,18 +59,22 @@ yield documentMutations | ||
function collectDocumentMutations( | ||
async function collectDocumentMutations( | ||
migration: NodeMigration, | ||
doc: SanityDocument, | ||
context: NodeMigrationContext, | ||
context: MigrationContext, | ||
) { | ||
const documentMutations = migration.document?.(doc, context) | ||
const nodeMigrations = flatMapDeep(doc as JsonValue, (value, path) => { | ||
return [ | ||
...arrify(migration.node?.(value, path, context)), | ||
...arrify(migrateNodeType(migration, value, path, context)), | ||
].map((change) => normalizeNodeMutation(path, change)) | ||
const documentMutations = Promise.resolve(migration.document?.(doc, context)) | ||
const nodeMigrations = flatMapDeep(doc as JsonValue, async (value, path) => { | ||
const [nodeReturnValues, nodeTypeReturnValues] = await Promise.all([ | ||
Promise.resolve(migration.node?.(value, path, context)), | ||
Promise.resolve(migrateNodeType(migration, value, path, context)), | ||
]) | ||
return [...arrify(nodeReturnValues), ...arrify(nodeTypeReturnValues)].map((change) => | ||
normalizeNodeMutation(path, change), | ||
) | ||
}) | ||
return [...arrify(documentMutations), ...nodeMigrations].map((change) => | ||
normalizeDocumentMutation(doc._id, change), | ||
) | ||
return (await Promise.all([...arrify(await documentMutations), ...nodeMigrations])) | ||
.flat() | ||
.map((change) => normalizeDocumentMutation(doc._id, change)) | ||
} | ||
@@ -100,3 +107,3 @@ | ||
path: Path, | ||
context: NodeMigrationContext, | ||
context: MigrationContext, | ||
) { | ||
@@ -103,0 +110,0 @@ switch (getValueType(value)) { |
import {SanityDocument} from '@sanity/types' | ||
import {MultipleMutationResult} from '@sanity/client' | ||
import {APIConfig, Migration} from '../types' | ||
import {ndjson} from '../it-utils/ndjson' | ||
import {fromExportEndpoint} from '../sources/fromExportEndpoint' | ||
import {toMutationEndpoint} from '../targets/toMutationEndpoint' | ||
import {MultipleMutationResult, Mutation as SanityMutation} from '@sanity/client' | ||
import arrify from 'arrify' | ||
import {APIConfig, Migration, MigrationProgress} from '../types' | ||
import {parse, stringify} from '../it-utils/ndjson' | ||
import {fromExportEndpoint, safeJsonParser} from '../sources/fromExportEndpoint' | ||
import {endpoints} from '../fetch-utils/endpoints' | ||
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions' | ||
import {tap} from '../it-utils/tap' | ||
import {mapAsync} from '../it-utils/mapAsync' | ||
import {lastValueFrom} from '../it-utils/lastValueFrom' | ||
import {decodeText, parseJSON} from '../it-utils' | ||
import {concatStr} from '../it-utils/concatStr' | ||
import {fetchAsyncIterator, FetchOptions} from '../fetch-utils/fetchStream' | ||
import {bufferThroughFile} from '../fs-webstream/bufferThroughFile' | ||
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator' | ||
import {asyncIterableToStream} from '../utils/asyncIterableToStream' | ||
import {toSanityMutations} from './utils/toSanityMutations' | ||
import { | ||
DEFAULT_MUTATION_CONCURRENCY, | ||
MAX_MUTATION_CONCURRENCY, | ||
MUTATION_ENDPOINT_MAX_BODY_SIZE, | ||
} from './constants' | ||
import {batchMutations} from './utils/batchMutations' | ||
import {collectMigrationMutations} from './collectMigrationMutations' | ||
import {getBufferFilePath} from './utils/getBufferFile' | ||
import {createBufferFileContext} from './utils/createBufferFileContext' | ||
import {applyFilters} from './utils/applyFilters' | ||
interface MigrationRunnerOptions { | ||
export interface MigrationRunnerConfig { | ||
api: APIConfig | ||
concurrency?: number | ||
onProgress?: (event: MigrationProgress) => void | ||
} | ||
export async function* run(config: MigrationRunnerOptions, migration: Migration) { | ||
const mutations = collectMigrationMutations( | ||
export async function* toFetchOptionsIterable( | ||
apiConfig: APIConfig, | ||
mutations: AsyncIterableIterator<SanityMutation[]>, | ||
) { | ||
for await (const mut of mutations) { | ||
yield toFetchOptions({ | ||
projectId: apiConfig.projectId, | ||
apiVersion: apiConfig.apiVersion, | ||
token: apiConfig.token, | ||
apiHost: apiConfig.apiHost ?? 'api.sanity.io', | ||
endpoint: endpoints.data.mutate(apiConfig.dataset, {returnIds: true}), | ||
body: JSON.stringify({mutations: mut}), | ||
}) | ||
} | ||
} | ||
export async function run(config: MigrationRunnerConfig, migration: Migration) { | ||
const stats: MigrationProgress = { | ||
documents: 0, | ||
mutations: 0, | ||
pending: 0, | ||
queuedBatches: 0, | ||
completedTransactions: [], | ||
currentMutations: [], | ||
} | ||
const filteredDocuments = applyFilters( | ||
migration, | ||
ndjson(await fromExportEndpoint(config.api)) as AsyncIterableIterator<SanityDocument>, | ||
parse<SanityDocument>( | ||
decodeText( | ||
streamToAsyncIterator( | ||
await fromExportEndpoint({...config.api, documentTypes: migration.documentTypes}), | ||
), | ||
), | ||
{parse: safeJsonParser}, | ||
), | ||
) | ||
const abortController = new AbortController() | ||
for await (const result of toMutationEndpoint(config.api, mutations)) { | ||
yield formatMutationResponse(result) | ||
const createReader = bufferThroughFile( | ||
asyncIterableToStream(stringify(filteredDocuments)), | ||
getBufferFilePath(), | ||
{signal: abortController.signal}, | ||
) | ||
const context = createBufferFileContext(createReader) | ||
const documents = () => | ||
tap( | ||
parse<SanityDocument>(decodeText(streamToAsyncIterator(createReader())), { | ||
parse: safeJsonParser, | ||
}), | ||
() => { | ||
config.onProgress?.({...stats, documents: ++stats.documents}) | ||
}, | ||
) | ||
const mutations = tap(collectMigrationMutations(migration, documents, context), (muts) => { | ||
stats.currentMutations = arrify(muts) | ||
config.onProgress?.({ | ||
...stats, | ||
mutations: ++stats.mutations, | ||
}) | ||
}) | ||
const concurrency = config?.concurrency ?? DEFAULT_MUTATION_CONCURRENCY | ||
if (concurrency > MAX_MUTATION_CONCURRENCY) { | ||
throw new Error(`Concurrency exceeds maximum allowed value (${MAX_MUTATION_CONCURRENCY})`) | ||
} | ||
} | ||
function formatMutationResponse(mutationResponse: MultipleMutationResult) { | ||
return `OK (transactionId = ${mutationResponse.transactionId}) | ||
${mutationResponse.results | ||
.map((result) => { | ||
return ` - ${result.operation}: ${result.id}` | ||
const batches = tap( | ||
batchMutations(toSanityMutations(mutations), MUTATION_ENDPOINT_MAX_BODY_SIZE), | ||
() => { | ||
config.onProgress?.({...stats, queuedBatches: ++stats.queuedBatches}) | ||
}, | ||
) | ||
const submit = async (opts: FetchOptions): Promise<MultipleMutationResult> => | ||
lastValueFrom(parseJSON(concatStr(decodeText(await fetchAsyncIterator(opts))))) | ||
const commits = await mapAsync( | ||
toFetchOptionsIterable(config.api, batches), | ||
(opts) => { | ||
config.onProgress?.({...stats, pending: ++stats.pending}) | ||
return submit(opts) | ||
}, | ||
concurrency, | ||
) | ||
for await (const result of commits) { | ||
stats.completedTransactions.push(result) | ||
config.onProgress?.({ | ||
...stats, | ||
}) | ||
} | ||
config.onProgress?.({ | ||
...stats, | ||
done: true, | ||
}) | ||
.join('\n')}` | ||
// Cancel export/buffer stream, it's not needed anymore | ||
abortController.abort() | ||
} |
@@ -1,2 +0,2 @@ | ||
import {Path, PathElement} from '@bjoerge/mutiny' | ||
import {Path, PathSegment} from '@sanity/types' | ||
import {JsonArray, JsonObject, JsonValue} from '../../json' | ||
@@ -17,3 +17,3 @@ import {getValueType} from './getValueType' | ||
container: JsonArray | JsonObject, | ||
): PathElement { | ||
): PathSegment { | ||
if ( | ||
@@ -20,0 +20,0 @@ item && |
import {maybeDecompress} from '../fs-webstream/maybeDecompress' | ||
import {untar} from '../tar-webstream/untar' | ||
import {streamAsyncIterator} from '../utils/streamToAsyncIterator' | ||
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator' | ||
import {readFileAsWebStream} from '../fs-webstream/readFileAsWebStream' | ||
export async function* fromExportArchive(path: string) { | ||
for await (const [header, entry] of streamAsyncIterator( | ||
for await (const [header, entry] of streamToAsyncIterator( | ||
untar(await maybeDecompress(readFileAsWebStream(path))), | ||
)) { | ||
if (header.type === 'file' && header.name.endsWith('.ndjson')) { | ||
for await (const chunk of streamAsyncIterator(entry)) { | ||
for await (const chunk of streamToAsyncIterator(entry)) { | ||
yield chunk | ||
@@ -14,0 +14,0 @@ } |
@@ -1,8 +0,10 @@ | ||
import {fetchAsyncIterator} from '../fetch-utils/fetchStream' | ||
import {createSafeJsonParser} from '@sanity/util/createSafeJsonParser' | ||
import {SanityDocument} from '@sanity/types' | ||
import {fetchStream} from '../fetch-utils/fetchStream' | ||
import {toFetchOptions} from '../fetch-utils/sanityRequestOptions' | ||
import {endpoints} from '../fetch-utils/endpoints' | ||
import {APIConfig} from '../types' | ||
import {ExportAPIConfig} from '../types' | ||
export function fromExportEndpoint(options: APIConfig) { | ||
return fetchAsyncIterator( | ||
export function fromExportEndpoint(options: ExportAPIConfig) { | ||
return fetchStream( | ||
toFetchOptions({ | ||
@@ -13,5 +15,17 @@ projectId: options.projectId, | ||
apiHost: options.apiHost ?? 'api.sanity.io', | ||
endpoint: endpoints.data.export(options.dataset), | ||
endpoint: endpoints.data.export(options.dataset, options.documentTypes), | ||
}), | ||
) | ||
} | ||
/** | ||
* Safe JSON parser that is able to handle lines interrupted by an error object. | ||
* | ||
* This may occur when streaming NDJSON from the Export HTTP API. | ||
* | ||
* @internal | ||
* @see {@link https://github.com/sanity-io/sanity/pull/1787 | Initial pull request} | ||
*/ | ||
export const safeJsonParser = createSafeJsonParser<SanityDocument>({ | ||
errorLabel: 'Error streaming dataset', | ||
}) |
@@ -1,2 +0,2 @@ | ||
import {streamAsyncIterator} from '../../utils/streamToAsyncIterator' | ||
import {streamToAsyncIterator} from '../../utils/streamToAsyncIterator' | ||
import {untar} from '../untar' | ||
@@ -7,4 +7,4 @@ import {readFileAsWebStream} from '../../fs-webstream/readFileAsWebStream' | ||
const fileStream = readFileAsWebStream(file) | ||
for await (const [header, body] of streamAsyncIterator(untar(fileStream))) { | ||
yield [header.name, streamAsyncIterator(body)] | ||
for await (const [header, body] of streamToAsyncIterator(untar(fileStream))) { | ||
yield [header.name, streamToAsyncIterator(body)] | ||
} | ||
@@ -11,0 +11,0 @@ } |
@@ -1,2 +0,2 @@ | ||
import {streamAsyncIterator} from '../../utils/streamToAsyncIterator' | ||
import {streamToAsyncIterator} from '../../utils/streamToAsyncIterator' | ||
import {toArray} from '../../it-utils/toArray' | ||
@@ -33,5 +33,5 @@ import {untar} from '../untar' | ||
for await (const [header, body] of streamAsyncIterator(untar(fileStream))) { | ||
for await (const [header, body] of streamToAsyncIterator(untar(fileStream))) { | ||
if (header.name.includes(file)) { | ||
const chunks = await toArray(streamAsyncIterator(body)) | ||
const chunks = await toArray(streamToAsyncIterator(body)) | ||
const sum = await shasum(concatUint8Arrays(chunks)) | ||
@@ -38,0 +38,0 @@ expect(sum).toEqual('02c936cda5695fa4f43f5dc919c1f55c362faa6dd558dfb2d77d524f004069db') |
@@ -1,2 +0,2 @@ | ||
import {streamAsyncIterator} from '../../utils/streamToAsyncIterator' | ||
import {streamToAsyncIterator} from '../../utils/streamToAsyncIterator' | ||
import {toArray} from '../../it-utils/toArray' | ||
@@ -9,4 +9,4 @@ import {untar} from '../untar' | ||
const fileStream = readFileAsWebStream(file) | ||
for await (const [header, body] of streamAsyncIterator(untar(fileStream))) { | ||
const content = await toArray(decodeText(streamAsyncIterator(body))) | ||
for await (const [header, body] of streamToAsyncIterator(untar(fileStream))) { | ||
const content = await toArray(decodeText(streamToAsyncIterator(body))) | ||
yield [header.name, {type: header.type, content}] | ||
@@ -13,0 +13,0 @@ } |
@@ -1,2 +0,2 @@ | ||
import {streamAsyncIterator} from '../utils/streamToAsyncIterator' | ||
import {streamToAsyncIterator} from '../utils/streamToAsyncIterator' | ||
@@ -8,5 +8,5 @@ /** | ||
export async function drain(stream: ReadableStream<unknown>) { | ||
for await (const _ of streamAsyncIterator(stream)) { | ||
for await (const _ of streamToAsyncIterator(stream)) { | ||
// do nothing | ||
} | ||
} |
@@ -1,4 +0,5 @@ | ||
import type {SanityDocument} from '@sanity/types' | ||
import type {Mutation, NodePatch, Operation, Path} from '@bjoerge/mutiny' | ||
import type {SanityDocument, Path} from '@sanity/types' | ||
import {MultipleMutationResult} from '@sanity/client' | ||
import {JsonArray, JsonObject, JsonValue} from './json' | ||
import {Mutation, NodePatch, Operation} from './mutations' | ||
@@ -9,3 +10,3 @@ export type {Path} | ||
export type AsyncIterableMigration = ( | ||
documents: AsyncIterableIterator<SanityDocument>, | ||
documents: () => AsyncIterableIterator<SanityDocument>, | ||
context: MigrationContext, | ||
@@ -20,3 +21,8 @@ ) => AsyncGenerator<Mutation | Mutation[]> | ||
filter?: string | ||
documentType: string | ||
/** | ||
* What document types to migrate | ||
*/ | ||
documentTypes?: string[] | ||
migrate: Def | ||
@@ -27,4 +33,16 @@ } | ||
export type MigrationProgress = { | ||
documents: number | ||
mutations: number | ||
pending: number | ||
queuedBatches: number | ||
currentMutations: Mutation[] | ||
completedTransactions: MultipleMutationResult[] | ||
done?: boolean | ||
} | ||
export interface MigrationContext { | ||
withDocument(id: string): Promise<SanityDocument | null> | ||
getDocument<T extends SanityDocument>(id: string): Promise<T | undefined> | ||
getDocuments<T extends SanityDocument>(ids: string[]): Promise<T[]> | ||
query<T>(query: string, params?: Record<string, unknown>): Promise<T> | ||
} | ||
@@ -40,4 +58,4 @@ | ||
export interface NodeMigrationContext { | ||
withDocument(id: string): Promise<SanityDocument | null> | ||
export interface ExportAPIConfig extends APIConfig { | ||
documentTypes?: string[] | ||
} | ||
@@ -57,39 +75,39 @@ | ||
doc: Doc, | ||
context: NodeMigrationContext, | ||
) => DocumentMigrationReturnValue | ||
context: MigrationContext, | ||
) => DocumentMigrationReturnValue | Promise<DocumentMigrationReturnValue> | ||
node?: <Node extends JsonValue>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
object?: <Node extends JsonObject>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
array?: <Node extends JsonArray>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
string?: <Node extends string>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
number?: <Node extends number>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
boolean?: <Node extends boolean>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
null?: <Node extends null>( | ||
node: Node, | ||
path: Path, | ||
context: NodeMigrationContext, | ||
) => NodeMigrationReturnValue | ||
context: MigrationContext, | ||
) => NodeMigrationReturnValue | Promise<NodeMigrationReturnValue> | ||
} |
@@ -1,2 +0,2 @@ | ||
export async function* streamAsyncIterator<T>(stream: ReadableStream<T>) { | ||
export async function* streamToAsyncIterator<T>(stream: ReadableStream<T>) { | ||
// Get a lock on the stream | ||
@@ -3,0 +3,0 @@ const reader = stream.getReader() |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Manifest confusion
Supply chain riskThis package has inconsistent metadata. This could be malicious or caused by an error when publishing the package.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Manifest confusion
Supply chain riskThis package has inconsistent metadata. This could be malicious or caused by an error when publishing the package.
Found 1 instance in 1 package
5197042
286
8246
8
2
6
+ Added@sanity/client@^6.11.1
+ Addedgroq-js@^1.4.1
+ Addedp-map@^7.0.1
+ Added@sanity/client@6.22.0(transitive)
+ Added@sanity/eventsource@5.0.2(transitive)
+ Added@types/event-source-polyfill@1.0.5(transitive)
+ Added@types/eventsource@1.1.15(transitive)
+ Added@types/follow-redirects@1.14.4(transitive)
+ Added@types/node@22.7.4(transitive)
+ Added@types/progress-stream@2.0.5(transitive)
+ Addedcore-util-is@1.0.3(transitive)
+ Addeddebug@4.3.7(transitive)
+ Addeddecompress-response@7.0.0(transitive)
+ Addedevent-source-polyfill@1.0.31(transitive)
+ Addedeventsource@2.0.2(transitive)
+ Addedfollow-redirects@1.15.9(transitive)
+ Addedget-it@8.6.5(transitive)
+ Addedgroq-js@1.13.0(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedis-retry-allowed@2.2.0(transitive)
+ Addedisarray@1.0.0(transitive)
+ Addedmimic-response@3.1.0(transitive)
+ Addedms@2.1.3(transitive)
+ Addedp-map@7.0.2(transitive)
+ Addedprocess-nextick-args@2.0.1(transitive)
+ Addedprogress-stream@2.0.0(transitive)
+ Addedreadable-stream@2.3.8(transitive)
+ Addedrxjs@7.8.1(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedspeedometer@1.0.0(transitive)
+ Addedstring_decoder@1.1.1(transitive)
+ Addedthrough2@2.0.5(transitive)
+ Addedtslib@2.7.0(transitive)
+ Addedtunnel-agent@0.6.0(transitive)
+ Addedundici-types@6.19.8(transitive)
+ Addedutil-deprecate@1.0.2(transitive)
+ Addedxtend@4.0.2(transitive)