@directus/storage-driver-s3
Advanced tools
Comparing version 10.0.24-rc.0 to 10.1.0
import { ObjectCannedACL, ServerSideEncryption } from '@aws-sdk/client-s3'; | ||
import { Driver, Range } from '@directus/storage'; | ||
import { TusDriver, Range, ChunkedUploadContext } from '@directus/storage'; | ||
import { Readable } from 'node:stream'; | ||
@@ -15,7 +15,15 @@ | ||
forcePathStyle?: boolean; | ||
tus?: { | ||
chunkSize?: number; | ||
}; | ||
}; | ||
declare class DriverS3 implements Driver { | ||
declare class DriverS3 implements TusDriver { | ||
private config; | ||
private client; | ||
private root; | ||
private readonly client; | ||
private readonly root; | ||
private partUploadSemaphore; | ||
private readonly preferredPartSize; | ||
maxMultipartParts: 10000; | ||
minPartSize: 5242880; | ||
maxUploadSize: 5497558138880; | ||
constructor(config: DriverS3Config); | ||
@@ -35,4 +43,14 @@ private getClient; | ||
list(prefix?: string): AsyncGenerator<string, void, unknown>; | ||
get tusExtensions(): string[]; | ||
createChunkedUpload(filepath: string, context: ChunkedUploadContext): Promise<ChunkedUploadContext>; | ||
deleteChunkedUpload(filepath: string, context: ChunkedUploadContext): Promise<void>; | ||
finishChunkedUpload(filepath: string, context: ChunkedUploadContext): Promise<void>; | ||
writeChunk(filepath: string, content: Readable, offset: number, context: ChunkedUploadContext): Promise<number>; | ||
private uploadPart; | ||
private uploadParts; | ||
private retrieveParts; | ||
private finishMultipartUpload; | ||
private calcOptimalPartSize; | ||
} | ||
export { DriverS3, type DriverS3Config, DriverS3 as default }; |
// src/index.ts | ||
import { | ||
AbortMultipartUploadCommand, | ||
CompleteMultipartUploadCommand, | ||
CopyObjectCommand, | ||
CreateMultipartUploadCommand, | ||
DeleteObjectCommand, | ||
DeleteObjectsCommand, | ||
GetObjectCommand, | ||
HeadObjectCommand, | ||
ListObjectsV2Command, | ||
S3Client | ||
ListPartsCommand, | ||
S3Client, | ||
UploadPartCommand | ||
} from "@aws-sdk/client-s3"; | ||
@@ -13,6 +19,11 @@ import { Upload } from "@aws-sdk/lib-storage"; | ||
import { isReadableStream } from "@directus/utils/node"; | ||
import { Semaphore } from "@shopify/semaphore"; | ||
import { NodeHttpHandler } from "@smithy/node-http-handler"; | ||
import { ERRORS, StreamSplitter, TUS_RESUMABLE } from "@tus/utils"; | ||
import fs, { promises as fsProm } from "fs"; | ||
import { Agent as HttpAgent } from "http"; | ||
import { Agent as HttpsAgent } from "https"; | ||
import os from "os"; | ||
import { join } from "path"; | ||
import { promises as streamProm } from "stream"; | ||
var DriverS3 = class { | ||
@@ -22,2 +33,10 @@ config; | ||
root; | ||
// TUS specific members | ||
partUploadSemaphore; | ||
preferredPartSize; | ||
maxMultipartParts = 1e4; | ||
minPartSize = 5242880; | ||
// 5MiB | ||
maxUploadSize = 5497558138880; | ||
// 5TiB | ||
constructor(config) { | ||
@@ -27,2 +46,4 @@ this.config = config; | ||
this.root = this.config.root ? normalizePath(this.config.root, { removeLeading: true }) : ""; | ||
this.preferredPartSize = config.tus?.chunkSize ?? this.minPartSize; | ||
this.partUploadSemaphore = new Semaphore(60); | ||
} | ||
@@ -79,7 +100,7 @@ getClient() { | ||
} | ||
const { Body: stream } = await this.client.send(new GetObjectCommand(commandInput)); | ||
if (!stream || !isReadableStream(stream)) { | ||
const { Body: stream2 } = await this.client.send(new GetObjectCommand(commandInput)); | ||
if (!stream2 || !isReadableStream(stream2)) { | ||
throw new Error(`No stream returned for file "${filepath}"`); | ||
} | ||
return stream; | ||
return stream2; | ||
} | ||
@@ -173,2 +194,199 @@ async stat(filepath) { | ||
} | ||
// TUS implementation based on https://github.com/tus/tus-node-server | ||
get tusExtensions() { | ||
return ["creation", "termination", "expiration"]; | ||
} | ||
async createChunkedUpload(filepath, context) { | ||
const command = new CreateMultipartUploadCommand({ | ||
Bucket: this.config.bucket, | ||
Key: this.fullPath(filepath), | ||
Metadata: { "tus-version": TUS_RESUMABLE }, | ||
...context.metadata?.["contentType"] ? { | ||
ContentType: context.metadata["contentType"] | ||
} : {}, | ||
...context.metadata?.["cacheControl"] ? { | ||
CacheControl: context.metadata["cacheControl"] | ||
} : {} | ||
}); | ||
const res = await this.client.send(command); | ||
context.metadata["upload-id"] = res.UploadId; | ||
return context; | ||
} | ||
async deleteChunkedUpload(filepath, context) { | ||
const key = this.fullPath(filepath); | ||
try { | ||
const { "upload-id": uploadId } = context.metadata; | ||
if (uploadId) { | ||
await this.client.send( | ||
new AbortMultipartUploadCommand({ | ||
Bucket: this.config.bucket, | ||
Key: key, | ||
UploadId: uploadId | ||
}) | ||
); | ||
} | ||
} catch (error) { | ||
if (error?.code && ["NotFound", "NoSuchKey", "NoSuchUpload"].includes(error.Code)) { | ||
throw ERRORS.FILE_NOT_FOUND; | ||
} | ||
throw error; | ||
} | ||
await this.client.send( | ||
new DeleteObjectsCommand({ | ||
Bucket: this.config.bucket, | ||
Delete: { | ||
Objects: [{ Key: key }] | ||
} | ||
}) | ||
); | ||
} | ||
async finishChunkedUpload(filepath, context) { | ||
const key = this.fullPath(filepath); | ||
const uploadId = context.metadata["upload-id"]; | ||
const size = context.size; | ||
const chunkSize = this.calcOptimalPartSize(size); | ||
const expectedParts = Math.ceil(size / chunkSize); | ||
let parts = await this.retrieveParts(key, uploadId); | ||
let retries = 0; | ||
while (parts.length !== expectedParts && retries < 3) { | ||
++retries; | ||
await new Promise((resolve) => setTimeout(resolve, 500 * retries)); | ||
parts = await this.retrieveParts(key, uploadId); | ||
} | ||
if (parts.length !== expectedParts) { | ||
throw { | ||
status_code: 500, | ||
body: "Failed to upload all parts to S3." | ||
}; | ||
} | ||
await this.finishMultipartUpload(key, uploadId, parts); | ||
} | ||
async writeChunk(filepath, content, offset, context) { | ||
const key = this.fullPath(filepath); | ||
const uploadId = context.metadata["upload-id"]; | ||
const size = context.size; | ||
const parts = await this.retrieveParts(key, uploadId); | ||
const partNumber = parts.length > 0 ? parts[parts.length - 1].PartNumber : 0; | ||
const nextPartNumber = partNumber + 1; | ||
const requestedOffset = offset; | ||
const bytesUploaded = await this.uploadParts(key, uploadId, size, content, nextPartNumber, offset); | ||
return requestedOffset + bytesUploaded; | ||
} | ||
async uploadPart(key, uploadId, readStream, partNumber) { | ||
const data = await this.client.send( | ||
new UploadPartCommand({ | ||
Bucket: this.config.bucket, | ||
Key: key, | ||
UploadId: uploadId, | ||
PartNumber: partNumber, | ||
Body: readStream | ||
}) | ||
); | ||
return data.ETag; | ||
} | ||
async uploadParts(key, uploadId, size, readStream, currentPartNumber, offset) { | ||
const promises = []; | ||
let pendingChunkFilepath = null; | ||
let bytesUploaded = 0; | ||
let permit = void 0; | ||
const splitterStream = new StreamSplitter({ | ||
chunkSize: this.calcOptimalPartSize(size), | ||
directory: os.tmpdir() | ||
}).on("beforeChunkStarted", async () => { | ||
permit = await this.partUploadSemaphore.acquire(); | ||
}).on("chunkStarted", (filepath) => { | ||
pendingChunkFilepath = filepath; | ||
}).on("chunkFinished", ({ path, size: partSize }) => { | ||
pendingChunkFilepath = null; | ||
const partNumber = currentPartNumber++; | ||
const acquiredPermit = permit; | ||
offset += partSize; | ||
const isFinalPart = size === offset; | ||
const deferred = new Promise(async (resolve, reject) => { | ||
try { | ||
const readable = fs.createReadStream(path); | ||
readable.on("error", reject); | ||
if (partSize >= this.minPartSize || isFinalPart) { | ||
await this.uploadPart(key, uploadId, readable, partNumber); | ||
bytesUploaded += partSize; | ||
} else { | ||
} | ||
resolve(); | ||
} catch (error) { | ||
reject(error); | ||
} finally { | ||
fsProm.rm(path).catch(() => { | ||
}); | ||
acquiredPermit?.release(); | ||
} | ||
}); | ||
promises.push(deferred); | ||
}).on("chunkError", () => { | ||
permit?.release(); | ||
}); | ||
try { | ||
await streamProm.pipeline(readStream, splitterStream); | ||
} catch (error) { | ||
if (pendingChunkFilepath !== null) { | ||
try { | ||
await fsProm.rm(pendingChunkFilepath); | ||
} catch { | ||
} | ||
} | ||
promises.push(Promise.reject(error)); | ||
} finally { | ||
await Promise.all(promises); | ||
} | ||
return bytesUploaded; | ||
} | ||
async retrieveParts(key, uploadId, partNumberMarker) { | ||
const data = await this.client.send( | ||
new ListPartsCommand({ | ||
Bucket: this.config.bucket, | ||
Key: key, | ||
UploadId: uploadId, | ||
PartNumberMarker: partNumberMarker | ||
}) | ||
); | ||
let parts = data.Parts ?? []; | ||
if (data.IsTruncated) { | ||
const rest = await this.retrieveParts(key, uploadId, data.NextPartNumberMarker); | ||
parts = [...parts, ...rest]; | ||
} | ||
if (!partNumberMarker) { | ||
parts.sort((a, b) => a.PartNumber - b.PartNumber); | ||
} | ||
return parts; | ||
} | ||
async finishMultipartUpload(key, uploadId, parts) { | ||
const command = new CompleteMultipartUploadCommand({ | ||
Bucket: this.config.bucket, | ||
Key: key, | ||
UploadId: uploadId, | ||
MultipartUpload: { | ||
Parts: parts.map((part) => { | ||
return { | ||
ETag: part.ETag, | ||
PartNumber: part.PartNumber | ||
}; | ||
}) | ||
} | ||
}); | ||
const response = await this.client.send(command); | ||
return response.Location; | ||
} | ||
calcOptimalPartSize(size) { | ||
if (size === void 0) { | ||
size = this.maxUploadSize; | ||
} | ||
let optimalPartSize; | ||
if (size <= this.preferredPartSize) { | ||
optimalPartSize = size; | ||
} else if (size <= this.preferredPartSize * this.maxMultipartParts) { | ||
optimalPartSize = this.preferredPartSize; | ||
} else { | ||
optimalPartSize = Math.ceil(size / this.maxMultipartParts); | ||
} | ||
return optimalPartSize; | ||
} | ||
}; | ||
@@ -175,0 +393,0 @@ var src_default = DriverS3; |
{ | ||
"name": "@directus/storage-driver-s3", | ||
"version": "10.0.24-rc.0", | ||
"version": "10.1.0", | ||
"description": "S3 file storage abstraction for `@directus/storage`", | ||
@@ -26,5 +26,7 @@ "homepage": "https://directus.io", | ||
"@aws-sdk/lib-storage": "3.569.0", | ||
"@shopify/semaphore": "3.1.0", | ||
"@smithy/node-http-handler": "2.5.0", | ||
"@directus/storage": "10.0.13", | ||
"@directus/utils": "12.0.0-rc.0" | ||
"@tus/utils": "0.2.0", | ||
"@directus/storage": "10.1.0", | ||
"@directus/utils": "11.0.10" | ||
}, | ||
@@ -31,0 +33,0 @@ "devDependencies": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
20847
444
0
7
1
+ Added@shopify/semaphore@3.1.0
+ Added@tus/utils@0.2.0
+ Added@directus/constants@11.0.4(transitive)
+ Added@directus/storage@10.1.0(transitive)
+ Added@directus/system-data@1.1.0(transitive)
+ Added@directus/utils@11.0.10(transitive)
+ Added@shopify/semaphore@3.1.0(transitive)
+ Added@tus/utils@0.2.0(transitive)
+ Addedjoi@17.13.3(transitive)
- Removed@directus/constants@11.1.0-rc.0(transitive)
- Removed@directus/storage@10.0.13(transitive)
- Removed@directus/system-data@2.0.0-rc.0(transitive)
- Removed@directus/utils@12.0.0-rc.0(transitive)
- Removedjoi@17.13.1(transitive)
Updated@directus/storage@10.1.0
Updated@directus/utils@11.0.10