@elastic.io/object-storage-client
Advanced tools
Comparing version 0.0.1-dev.9 to 0.0.1-dev.10
import ObjectStorage from './object-storage'; | ||
import Message from './message'; | ||
export { Message, ObjectStorage }; | ||
export { ObjectStorage }; | ||
export { ObjectDTO } from './object-storage'; |
@@ -8,3 +8,1 @@ "use strict"; | ||
exports.ObjectStorage = object_storage_1.default; | ||
const message_1 = __importDefault(require("./message")); | ||
exports.Message = message_1.default; |
/// <reference types="node" /> | ||
import { Readable } from 'stream'; | ||
import { Readable, Duplex } from 'stream'; | ||
interface JWTPayload { | ||
[index: string]: string; | ||
} | ||
export interface ObjectDTO { | ||
stream?: Readable; | ||
headers: any; | ||
} | ||
export declare type TransformMiddleware = Duplex; | ||
export default class ObjectStorage { | ||
private api; | ||
private message; | ||
private readonly jwtSecret?; | ||
private static httpAgent; | ||
private static httpsAgent; | ||
private client; | ||
private forwards; | ||
private reverses; | ||
constructor(config: { | ||
uri: string; | ||
jwtSecret?: string; | ||
cipher: { | ||
key: string; | ||
iv: string; | ||
}; | ||
}); | ||
private requestRetry; | ||
private getHeaders; | ||
getAsJSON(objectId: string, jwtPayloadOrToken: JWTPayload | string): Promise<any>; | ||
getAsStream(objectId: string, jwtPayloadOrToken: JWTPayload | string): Promise<Readable>; | ||
addAsStream(stream: Readable, jwtPayloadOrToken: JWTPayload | string): Promise<string>; | ||
use(forward: TransformMiddleware, reverse: TransformMiddleware): ObjectStorage; | ||
getAsJSON(objectId: string, jwtPayloadOrToken?: JWTPayload | string): Promise<any>; | ||
getAsStream(objectId: string, jwtPayloadOrToken?: JWTPayload | string): Promise<ObjectDTO>; | ||
addAsStream(stream: Readable, jwtPayloadOrToken?: JWTPayload | string): Promise<string>; | ||
private applyMiddlewares; | ||
addAsJSON(data: any, jwtPayloadOrToken: JWTPayload | string): Promise<string>; | ||
} | ||
export {}; |
@@ -7,103 +7,47 @@ "use strict"; | ||
const stream_1 = require("stream"); | ||
const uuid_1 = __importDefault(require("uuid")); | ||
const axios_1 = __importDefault(require("axios")); | ||
const http_1 = __importDefault(require("http")); | ||
const https_1 = __importDefault(require("https")); | ||
const logger_1 = __importDefault(require("./logger")); | ||
const message_1 = __importDefault(require("./message")); | ||
const get_stream_1 = __importDefault(require("get-stream")); | ||
const jsonwebtoken_1 = require("jsonwebtoken"); | ||
const util_1 = require("util"); | ||
const storage_client_1 = __importDefault(require("./storage-client")); | ||
; | ||
; | ||
class ObjectStorage { | ||
constructor(config) { | ||
this.api = axios_1.default.create({ | ||
baseURL: `${config.uri}/`, | ||
httpAgent: ObjectStorage.httpAgent, | ||
httpsAgent: ObjectStorage.httpsAgent, | ||
validateStatus: null, | ||
maxContentLength: Infinity, | ||
maxRedirects: 0 | ||
}); | ||
this.jwtSecret = config.jwtSecret; | ||
this.message = new message_1.default(config.cipher); | ||
this.client = new storage_client_1.default({ uri: config.uri, jwtSecret: config.jwtSecret }); | ||
this.forwards = []; | ||
this.reverses = []; | ||
} | ||
async requestRetry(request, { maxAttempts = 3, delay = 100, onResponse } = {}) { | ||
let attempts = 0; | ||
let res; | ||
let err; | ||
while (attempts < maxAttempts) { | ||
err = null; | ||
res = null; | ||
attempts++; | ||
try { | ||
res = await request(); | ||
} | ||
catch (e) { | ||
err = e; | ||
} | ||
if (onResponse && onResponse(err, res)) { | ||
continue; | ||
} | ||
// last attempt error should not be logged | ||
if ((err || res.status >= 400) && attempts < maxAttempts) { | ||
logger_1.default.warn('Error during object request: %s', err || `${res.status} (${res.statusText})`); | ||
await new Promise((resolve) => setTimeout(resolve, delay)); | ||
continue; | ||
} | ||
break; | ||
} | ||
if (err || res.status >= 400) { | ||
throw err || new Error(`HTTP error during object request: ${res.status} (${res.statusText})`); | ||
} | ||
return res; | ||
use(forward, reverse) { | ||
this.forwards.push(forward); | ||
this.reverses.unshift(reverse); | ||
return this; | ||
} | ||
async getHeaders(jwtPayloadOrToken, override) { | ||
if (typeof jwtPayloadOrToken !== 'string' && !this.jwtSecret) { | ||
throw new Error('Neither JWT token passed, nor JWT secret provided during initialization'); | ||
} | ||
const token = typeof jwtPayloadOrToken === 'string' ? jwtPayloadOrToken : await util_1.promisify(jsonwebtoken_1.sign)(jwtPayloadOrToken, this.jwtSecret); | ||
return Object.assign({ Authorization: `Bearer ${token}` }, override); | ||
} | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
async getAsJSON(objectId, jwtPayloadOrToken) { | ||
const content = await get_stream_1.default(await this.getAsStream(objectId, jwtPayloadOrToken)); | ||
return JSON.parse(content); | ||
const objectDTO = await this.getAsStream(objectId, jwtPayloadOrToken); | ||
const data = await get_stream_1.default(objectDTO.stream); | ||
return JSON.parse(data); | ||
} | ||
async getAsStream(objectId, jwtPayloadOrToken) { | ||
const res = await this.requestRetry(async () => this.api.get(`/objects/${objectId}`, { responseType: 'stream', headers: await this.getHeaders(jwtPayloadOrToken) })); | ||
return this.message.unpackStream(res.data); | ||
const res = await this.client.readStream(objectId, jwtPayloadOrToken); | ||
const resultStream = this.applyMiddlewares(res.data, this.reverses); | ||
return { stream: resultStream, headers: res.headers }; | ||
} | ||
async addAsStream(stream, jwtPayloadOrToken) { | ||
let objectId = uuid_1.default.v4(); | ||
const res = await this.api.put(`/objects/${objectId}`, this.message.packStream(stream), { headers: await this.getHeaders(jwtPayloadOrToken, { 'content-type': 'application/octet-stream' }) }); | ||
if (res.status >= 400) { | ||
throw new Error(`HTTP error during object add: ${res.status} (${res.statusText})`); | ||
} | ||
return objectId; | ||
const resultStream = this.applyMiddlewares(stream, this.forwards); | ||
const res = await this.client.writeStream(resultStream, jwtPayloadOrToken); | ||
return res.data.objectId; | ||
} | ||
applyMiddlewares(stream, middlewares) { | ||
return middlewares.reduce((stream, middleware) => { | ||
return stream.pipe(middleware); | ||
}, stream); | ||
} | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
async addAsJSON(data, jwtPayloadOrToken) { | ||
let objectId = uuid_1.default.v4(); | ||
const dataString = JSON.stringify(data); | ||
await this.requestRetry(async () => { | ||
const dataStream = new stream_1.Readable(); | ||
dataStream.push(dataString); | ||
dataStream.push(null); | ||
return this.api.put(`/objects/${objectId}`, this.message.packStream(dataStream), { headers: await this.getHeaders(jwtPayloadOrToken, { 'content-type': 'application/octet-stream' }) }); | ||
}, { | ||
onResponse: (err, res) => { | ||
if (!err && res.status === 409) { | ||
logger_1.default.warn('Generated already existing UUID'); | ||
objectId = uuid_1.default.v4(); | ||
return true; | ||
} | ||
} | ||
}); | ||
return objectId; | ||
const dataStream = new stream_1.Readable(); | ||
dataStream.push(dataString); | ||
dataStream.push(null); | ||
return this.addAsStream(dataStream, jwtPayloadOrToken); | ||
} | ||
} | ||
// you will be able to create a lot of ObjectStorage instances and they will all use single http agent | ||
ObjectStorage.httpAgent = new http_1.default.Agent({ keepAlive: true }); | ||
ObjectStorage.httpsAgent = new https_1.default.Agent({ keepAlive: true }); | ||
exports.default = ObjectStorage; |
{ | ||
"name": "@elastic.io/object-storage-client", | ||
"version": "0.0.1-dev.9", | ||
"version": "0.0.1-dev.10", | ||
"description": "Elastic.io Message Store Client", | ||
@@ -43,3 +43,3 @@ "main": "dist/index.js", | ||
"eslint-plugin-standard": "4.0.0", | ||
"mocha": "6.1.4", | ||
"mocha": "8.0.1", | ||
"nock": "10.0.6", | ||
@@ -46,0 +46,0 @@ "sinon": "7.3.2", |
import ObjectStorage from './object-storage'; | ||
import Message from './message'; | ||
export { Message, ObjectStorage }; | ||
export { ObjectStorage }; | ||
export { ObjectDTO } from './object-storage'; |
@@ -1,131 +0,65 @@ | ||
import { Readable } from 'stream'; | ||
import uuid from 'uuid'; | ||
import axios, { AxiosInstance, AxiosResponse } from 'axios'; | ||
import http from 'http'; | ||
import https from 'https'; | ||
import log from './logger'; | ||
import Message from './message'; | ||
import { Readable, Duplex } from 'stream'; | ||
import getStream from 'get-stream'; | ||
import { sign } from 'jsonwebtoken'; | ||
import { promisify } from 'util'; | ||
import StorageClient from './storage-client'; | ||
interface JWTPayload { [index: string]: string }; | ||
export interface ObjectDTO { | ||
stream?: Readable; | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
headers: any; | ||
}; | ||
export type TransformMiddleware = Duplex; | ||
export default class ObjectStorage { | ||
private api: AxiosInstance; | ||
private message: Message; | ||
private readonly jwtSecret?: string; | ||
// you will be able to create a lot of ObjectStorage instances and they will all use single http agent | ||
private static httpAgent = new http.Agent({ keepAlive: true }); | ||
private static httpsAgent = new https.Agent({ keepAlive: true }); | ||
private client: StorageClient; | ||
private forwards: TransformMiddleware[]; | ||
private reverses: TransformMiddleware[]; | ||
public constructor (config: {uri: string; jwtSecret?: string; cipher: {key: string; iv: string}}) { | ||
this.api = axios.create({ | ||
baseURL: `${config.uri}/`, | ||
httpAgent: ObjectStorage.httpAgent, | ||
httpsAgent: ObjectStorage.httpsAgent, | ||
validateStatus: null, | ||
maxContentLength: Infinity, | ||
maxRedirects: 0 | ||
}); | ||
this.jwtSecret = config.jwtSecret; | ||
this.message = new Message(config.cipher); | ||
public constructor(config: {uri: string; jwtSecret?: string }) { | ||
this.client = new StorageClient({ uri: config.uri, jwtSecret: config.jwtSecret }); | ||
this.forwards = []; | ||
this.reverses = []; | ||
} | ||
private async requestRetry (request: () => Promise<AxiosResponse>, { maxAttempts = 3, delay = 100, onResponse }: { maxAttempts?: number; delay?: number; onResponse?: (err: Error, res: AxiosResponse) => boolean } = {}): Promise<AxiosResponse> { | ||
let attempts = 0; | ||
let res; | ||
let err; | ||
while (attempts < maxAttempts) { | ||
err = null; | ||
res = null; | ||
attempts++; | ||
try { | ||
res = await request(); | ||
} catch (e) { | ||
err = e; | ||
} | ||
if (onResponse && onResponse(err, res)) { | ||
continue; | ||
} | ||
// last attempt error should not be logged | ||
if ((err || res.status >= 400) && attempts < maxAttempts) { | ||
log.warn('Error during object request: %s', err || `${res.status} (${res.statusText})`); | ||
await new Promise((resolve): NodeJS.Timeout => setTimeout(resolve, delay)); | ||
continue; | ||
} | ||
break; | ||
} | ||
if (err || res.status >= 400) { | ||
throw err || new Error(`HTTP error during object request: ${res.status} (${res.statusText})`); | ||
} | ||
return res; | ||
public use(forward: TransformMiddleware, reverse: TransformMiddleware): ObjectStorage { | ||
this.forwards.push(forward); | ||
this.reverses.unshift(reverse); | ||
return this; | ||
} | ||
private async getHeaders (jwtPayloadOrToken: JWTPayload | string, override?: { [index: string]: string }) { | ||
if (typeof jwtPayloadOrToken !== 'string' && !this.jwtSecret) { | ||
throw new Error('Neither JWT token passed, nor JWT secret provided during initialization'); | ||
} | ||
const token = typeof jwtPayloadOrToken === 'string' ? jwtPayloadOrToken : await promisify(sign)(jwtPayloadOrToken, this.jwtSecret); | ||
return { Authorization: `Bearer ${token}`, ...override }; | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
public async getAsJSON(objectId: string, jwtPayloadOrToken?: JWTPayload | string): Promise<any> { | ||
const objectDTO = await this.getAsStream(objectId, jwtPayloadOrToken); | ||
const data = await getStream(objectDTO.stream); | ||
return JSON.parse(data); | ||
} | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
public async getAsJSON (objectId: string, jwtPayloadOrToken: JWTPayload | string): Promise<any> { | ||
const content = await getStream(await this.getAsStream(objectId, jwtPayloadOrToken)); | ||
return JSON.parse(content); | ||
public async getAsStream(objectId: string, jwtPayloadOrToken?: JWTPayload | string): Promise<ObjectDTO> { | ||
const res = await this.client.readStream(objectId, jwtPayloadOrToken); | ||
const resultStream = this.applyMiddlewares(res.data, this.reverses); | ||
return { stream: resultStream, headers: res.headers }; | ||
} | ||
public async getAsStream (objectId: string, jwtPayloadOrToken: JWTPayload | string): Promise<Readable> { | ||
const res = await this.requestRetry( | ||
async (): Promise<AxiosResponse> => this.api.get(`/objects/${objectId}`, { responseType: 'stream', headers: await this.getHeaders(jwtPayloadOrToken) }) | ||
); | ||
return this.message.unpackStream(res.data); | ||
public async addAsStream(stream: Readable, jwtPayloadOrToken?: JWTPayload | string): Promise<string> { | ||
const resultStream = this.applyMiddlewares(stream, this.forwards); | ||
const res = await this.client.writeStream(resultStream, jwtPayloadOrToken); | ||
return res.data.objectId; | ||
} | ||
public async addAsStream (stream: Readable, jwtPayloadOrToken: JWTPayload | string): Promise<string> { | ||
let objectId = uuid.v4(); | ||
const res = await this.api.put( | ||
`/objects/${objectId}`, | ||
this.message.packStream(stream), | ||
{ headers: await this.getHeaders(jwtPayloadOrToken, { 'content-type': 'application/octet-stream' }) }); | ||
if (res.status >= 400) { | ||
throw new Error(`HTTP error during object add: ${res.status} (${res.statusText})`); | ||
} | ||
return objectId; | ||
private applyMiddlewares(stream: Readable, middlewares: TransformMiddleware[]): Readable { | ||
return middlewares.reduce((stream, middleware) => { | ||
return stream.pipe(middleware); | ||
}, stream); | ||
} | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
public async addAsJSON (data: any, jwtPayloadOrToken: JWTPayload | string): Promise<string> { | ||
let objectId = uuid.v4(); | ||
public async addAsJSON(data: any, jwtPayloadOrToken: JWTPayload | string): Promise<string> { | ||
const dataString = JSON.stringify(data); | ||
await this.requestRetry( | ||
async (): Promise<AxiosResponse> => { | ||
const dataStream = new Readable(); | ||
dataStream.push(dataString); | ||
dataStream.push(null); | ||
return this.api.put( | ||
`/objects/${objectId}`, | ||
this.message.packStream(dataStream), | ||
{ headers: await this.getHeaders(jwtPayloadOrToken, { 'content-type': 'application/octet-stream' }) } | ||
); | ||
}, | ||
{ | ||
onResponse: (err, res): boolean => { | ||
if (!err && res.status === 409) { | ||
log.warn('Generated already existing UUID'); | ||
objectId = uuid.v4(); | ||
return true; | ||
} | ||
} | ||
} | ||
); | ||
return objectId; | ||
const dataStream = new Readable(); | ||
dataStream.push(dataString); | ||
dataStream.push(null); | ||
return this.addAsStream(dataStream, jwtPayloadOrToken); | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
47720
21
1011
3