@directus/storage-driver-s3 - npm Package Compare versions

Comparing version 10.0.24-rc.0 to 10.1.0

dist/index.d.ts  (26 lines changed)
 import { ObjectCannedACL, ServerSideEncryption } from '@aws-sdk/client-s3';
-import { Driver, Range } from '@directus/storage';
+import { TusDriver, Range, ChunkedUploadContext } from '@directus/storage';
 import { Readable } from 'node:stream';

@@ -15,7 +15,15 @@

     forcePathStyle?: boolean;
+    tus?: {
+        chunkSize?: number;
+    };
 };
-declare class DriverS3 implements Driver {
+declare class DriverS3 implements TusDriver {
     private config;
-    private client;
-    private root;
+    private readonly client;
+    private readonly root;
+    private partUploadSemaphore;
+    private readonly preferredPartSize;
+    maxMultipartParts: 10000;
+    minPartSize: 5242880;
+    maxUploadSize: 5497558138880;
     constructor(config: DriverS3Config);

@@ -35,4 +43,14 @@ private getClient;

     list(prefix?: string): AsyncGenerator<string, void, unknown>;
+    get tusExtensions(): string[];
+    createChunkedUpload(filepath: string, context: ChunkedUploadContext): Promise<ChunkedUploadContext>;
+    deleteChunkedUpload(filepath: string, context: ChunkedUploadContext): Promise<void>;
+    finishChunkedUpload(filepath: string, context: ChunkedUploadContext): Promise<void>;
+    writeChunk(filepath: string, content: Readable, offset: number, context: ChunkedUploadContext): Promise<number>;
+    private uploadPart;
+    private uploadParts;
+    private retrieveParts;
+    private finishMultipartUpload;
+    private calcOptimalPartSize;
 }
 export { DriverS3, type DriverS3Config, DriverS3 as default };
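
The only public configuration change above is the new optional `tus` block on `DriverS3Config`. A minimal usage sketch of that option follows; `bucket` and `root` appear in the driver code below, while the credential fields belonging to the unchanged part of the config are omitted here, so treat this as illustrative rather than a complete config:

// Sketch only: assumes the existing DriverS3Config fields not shown in this diff.
import { DriverS3 } from '@directus/storage-driver-s3';

const driver = new DriverS3({
  bucket: 'my-bucket', // referenced as this.config.bucket in the driver below
  root: 'uploads',
  tus: {
    chunkSize: 10 * 1024 * 1024, // preferred part size; defaults to minPartSize (5 MiB)
  },
});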
dist/index.js

 // src/index.ts
 import {
+  AbortMultipartUploadCommand,
+  CompleteMultipartUploadCommand,
   CopyObjectCommand,
+  CreateMultipartUploadCommand,
   DeleteObjectCommand,
   DeleteObjectsCommand,
   GetObjectCommand,
   HeadObjectCommand,
   ListObjectsV2Command,
-  S3Client
+  ListPartsCommand,
+  S3Client,
+  UploadPartCommand
 } from "@aws-sdk/client-s3";

@@ -13,6 +19,11 @@ import { Upload } from "@aws-sdk/lib-storage";

 import { isReadableStream } from "@directus/utils/node";
+import { Semaphore } from "@shopify/semaphore";
 import { NodeHttpHandler } from "@smithy/node-http-handler";
+import { ERRORS, StreamSplitter, TUS_RESUMABLE } from "@tus/utils";
+import fs, { promises as fsProm } from "fs";
 import { Agent as HttpAgent } from "http";
 import { Agent as HttpsAgent } from "https";
+import os from "os";
 import { join } from "path";
+import { promises as streamProm } from "stream";
 var DriverS3 = class {

@@ -22,2 +33,10 @@ config;

   root;
+  // TUS specific members
+  partUploadSemaphore;
+  preferredPartSize;
+  maxMultipartParts = 1e4;
+  minPartSize = 5242880;
+  // 5MiB
+  maxUploadSize = 5497558138880;
+  // 5TiB
   constructor(config) {

@@ -27,2 +46,4 @@ this.config = config;

     this.root = this.config.root ? normalizePath(this.config.root, { removeLeading: true }) : "";
+    this.preferredPartSize = config.tus?.chunkSize ?? this.minPartSize;
+    this.partUploadSemaphore = new Semaphore(60);
   }

@@ -79,7 +100,7 @@ getClient() {

     }
-    const { Body: stream } = await this.client.send(new GetObjectCommand(commandInput));
-    if (!stream || !isReadableStream(stream)) {
+    const { Body: stream2 } = await this.client.send(new GetObjectCommand(commandInput));
+    if (!stream2 || !isReadableStream(stream2)) {
       throw new Error(`No stream returned for file "${filepath}"`);
     }
-    return stream;
+    return stream2;
   }

@@ -173,2 +194,199 @@ async stat(filepath) {

   }
+  // TUS implementation based on https://github.com/tus/tus-node-server
+  get tusExtensions() {
+    return ["creation", "termination", "expiration"];
+  }
+  async createChunkedUpload(filepath, context) {
+    const command = new CreateMultipartUploadCommand({
+      Bucket: this.config.bucket,
+      Key: this.fullPath(filepath),
+      Metadata: { "tus-version": TUS_RESUMABLE },
+      ...context.metadata?.["contentType"] ? {
+        ContentType: context.metadata["contentType"]
+      } : {},
+      ...context.metadata?.["cacheControl"] ? {
+        CacheControl: context.metadata["cacheControl"]
+      } : {}
+    });
+    const res = await this.client.send(command);
+    context.metadata["upload-id"] = res.UploadId;
+    return context;
+  }
+  async deleteChunkedUpload(filepath, context) {
+    const key = this.fullPath(filepath);
+    try {
+      const { "upload-id": uploadId } = context.metadata;
+      if (uploadId) {
+        await this.client.send(
+          new AbortMultipartUploadCommand({
+            Bucket: this.config.bucket,
+            Key: key,
+            UploadId: uploadId
+          })
+        );
+      }
+    } catch (error) {
+      if (error?.code && ["NotFound", "NoSuchKey", "NoSuchUpload"].includes(error.Code)) {
+        throw ERRORS.FILE_NOT_FOUND;
+      }
+      throw error;
+    }
+    await this.client.send(
+      new DeleteObjectsCommand({
+        Bucket: this.config.bucket,
+        Delete: {
+          Objects: [{ Key: key }]
+        }
+      })
+    );
+  }
+  async finishChunkedUpload(filepath, context) {
+    const key = this.fullPath(filepath);
+    const uploadId = context.metadata["upload-id"];
+    const size = context.size;
+    const chunkSize = this.calcOptimalPartSize(size);
+    const expectedParts = Math.ceil(size / chunkSize);
+    let parts = await this.retrieveParts(key, uploadId);
+    let retries = 0;
+    while (parts.length !== expectedParts && retries < 3) {
+      ++retries;
+      await new Promise((resolve) => setTimeout(resolve, 500 * retries));
+      parts = await this.retrieveParts(key, uploadId);
+    }
+    if (parts.length !== expectedParts) {
+      throw {
+        status_code: 500,
+        body: "Failed to upload all parts to S3."
+      };
+    }
+    await this.finishMultipartUpload(key, uploadId, parts);
+  }
+  async writeChunk(filepath, content, offset, context) {
+    const key = this.fullPath(filepath);
+    const uploadId = context.metadata["upload-id"];
+    const size = context.size;
+    const parts = await this.retrieveParts(key, uploadId);
+    const partNumber = parts.length > 0 ? parts[parts.length - 1].PartNumber : 0;
+    const nextPartNumber = partNumber + 1;
+    const requestedOffset = offset;
+    const bytesUploaded = await this.uploadParts(key, uploadId, size, content, nextPartNumber, offset);
+    return requestedOffset + bytesUploaded;
+  }
+  async uploadPart(key, uploadId, readStream, partNumber) {
+    const data = await this.client.send(
+      new UploadPartCommand({
+        Bucket: this.config.bucket,
+        Key: key,
+        UploadId: uploadId,
+        PartNumber: partNumber,
+        Body: readStream
+      })
+    );
+    return data.ETag;
+  }
+  async uploadParts(key, uploadId, size, readStream, currentPartNumber, offset) {
+    const promises = [];
+    let pendingChunkFilepath = null;
+    let bytesUploaded = 0;
+    let permit = void 0;
+    const splitterStream = new StreamSplitter({
+      chunkSize: this.calcOptimalPartSize(size),
+      directory: os.tmpdir()
+    }).on("beforeChunkStarted", async () => {
+      permit = await this.partUploadSemaphore.acquire();
+    }).on("chunkStarted", (filepath) => {
+      pendingChunkFilepath = filepath;
+    }).on("chunkFinished", ({ path, size: partSize }) => {
+      pendingChunkFilepath = null;
+      const partNumber = currentPartNumber++;
+      const acquiredPermit = permit;
+      offset += partSize;
+      const isFinalPart = size === offset;
+      const deferred = new Promise(async (resolve, reject) => {
+        try {
+          const readable = fs.createReadStream(path);
+          readable.on("error", reject);
+          if (partSize >= this.minPartSize || isFinalPart) {
+            await this.uploadPart(key, uploadId, readable, partNumber);
+            bytesUploaded += partSize;
+          } else {
+          }
+          resolve();
+        } catch (error) {
+          reject(error);
+        } finally {
+          fsProm.rm(path).catch(() => {
+          });
+          acquiredPermit?.release();
+        }
+      });
+      promises.push(deferred);
+    }).on("chunkError", () => {
+      permit?.release();
+    });
+    try {
+      await streamProm.pipeline(readStream, splitterStream);
+    } catch (error) {
+      if (pendingChunkFilepath !== null) {
+        try {
+          await fsProm.rm(pendingChunkFilepath);
+        } catch {
+        }
+      }
+      promises.push(Promise.reject(error));
+    } finally {
+      await Promise.all(promises);
+    }
+    return bytesUploaded;
+  }
+  async retrieveParts(key, uploadId, partNumberMarker) {
+    const data = await this.client.send(
+      new ListPartsCommand({
+        Bucket: this.config.bucket,
+        Key: key,
+        UploadId: uploadId,
+        PartNumberMarker: partNumberMarker
+      })
+    );
+    let parts = data.Parts ?? [];
+    if (data.IsTruncated) {
+      const rest = await this.retrieveParts(key, uploadId, data.NextPartNumberMarker);
+      parts = [...parts, ...rest];
+    }
+    if (!partNumberMarker) {
+      parts.sort((a, b) => a.PartNumber - b.PartNumber);
+    }
+    return parts;
+  }
+  async finishMultipartUpload(key, uploadId, parts) {
+    const command = new CompleteMultipartUploadCommand({
+      Bucket: this.config.bucket,
+      Key: key,
+      UploadId: uploadId,
+      MultipartUpload: {
+        Parts: parts.map((part) => {
+          return {
+            ETag: part.ETag,
+            PartNumber: part.PartNumber
+          };
+        })
+      }
+    });
+    const response = await this.client.send(command);
+    return response.Location;
+  }
+  calcOptimalPartSize(size) {
+    if (size === void 0) {
+      size = this.maxUploadSize;
+    }
+    let optimalPartSize;
+    if (size <= this.preferredPartSize) {
+      optimalPartSize = size;
+    } else if (size <= this.preferredPartSize * this.maxMultipartParts) {
+      optimalPartSize = this.preferredPartSize;
+    } else {
+      optimalPartSize = Math.ceil(size / this.maxMultipartParts);
+    }
+    return optimalPartSize;
+  }
 };

@@ -175,0 +393,0 @@ var src_default = DriverS3;
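
The part-size rule in calcOptimalPartSize is worth making concrete. Below is a standalone restatement of its three branches using the class defaults (preferredPartSize = minPartSize = 5 MiB, maxMultipartParts = 10000); this is a sketch that mirrors the logic above rather than importing the driver:

// Restatement of calcOptimalPartSize's three branches with the default constants.
const preferredPartSize = 5242880; // 5 MiB
const maxMultipartParts = 10000;

function optimalPartSize(size: number): number {
  if (size <= preferredPartSize) return size; // whole file fits in a single part
  if (size <= preferredPartSize * maxMultipartParts) return preferredPartSize; // up to ~48.8 GiB
  return Math.ceil(size / maxMultipartParts); // grow parts to stay under the 10000-part cap
}

console.log(optimalPartSize(3 * 1024 ** 2));   // 3145728  -> one 3 MiB part
console.log(optimalPartSize(1024 ** 3));       // 5242880  -> 205 parts of 5 MiB
console.log(optimalPartSize(100 * 1024 ** 3)); // 10737419 -> 10000 parts of ~10.2 MiB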

package.json  (8 lines changed)
 {
   "name": "@directus/storage-driver-s3",
-  "version": "10.0.24-rc.0",
+  "version": "10.1.0",
   "description": "S3 file storage abstraction for `@directus/storage`",

@@ -26,5 +26,7 @@ "homepage": "https://directus.io",

"@aws-sdk/lib-storage": "3.569.0",
"@shopify/semaphore": "3.1.0",
"@smithy/node-http-handler": "2.5.0",
"@directus/storage": "10.0.13",
"@directus/utils": "12.0.0-rc.0"
"@tus/utils": "0.2.0",
"@directus/storage": "10.1.0",
"@directus/utils": "11.0.10"
},

@@ -31,0 +33,0 @@ "devDependencies": {
