cloudflare-workers-dynamodb
Advanced tools
Comparing version 0.0.2 to 0.0.3
256
index.js
var SHA256 = require('crypto-js/sha256'); | ||
var HmacSHA256 = require('crypto-js/hmac-sha256'); | ||
var uuidv4 = require('uuid/v4'); | ||
@@ -7,2 +8,4 @@ var accessKey = null; | ||
var primaryKey = null; | ||
var sortKey = null; | ||
var valueKey = null; | ||
var tableName = null; | ||
@@ -15,2 +18,4 @@ var regionList = null; | ||
primaryKey = config.primaryKey; | ||
sortKey = config.sortKey; | ||
valueKey = config.valueKey; | ||
tableName = config.tableName; | ||
@@ -114,3 +119,3 @@ regionList = config.regionList; | ||
async function getItem(key, waitUntil) { | ||
async function getItemRace(pkey, skey, waitUntil) { | ||
@@ -121,3 +126,4 @@ const requestObject = { | ||
} | ||
requestObject.Key[primaryKey] = { "S": key }; | ||
requestObject.Key[primaryKey] = { "S": pkey }; | ||
requestObject.Key[sortKey] = { "N": '' + skey }; | ||
@@ -147,3 +153,3 @@ const request = JSON.stringify(requestObject); | ||
if (!response.ok && promiseMap.size === 0) { | ||
throw new Error(`unable to get item: ${key}`) | ||
throw new Error(`unable to get item: ${pkey}`) | ||
} | ||
@@ -169,12 +175,252 @@ | ||
let item = null; | ||
if (responseBody.Item && responseBody.Item.content) { | ||
item = responseBody.Item.content.S; | ||
if (responseBody.Item && responseBody.Item[valueKey]) { | ||
item = responseBody.Item[valueKey].S; | ||
} | ||
return { value: item, region } | ||
} | ||
async function getDynamoItem(pkey, skey, type, region) { | ||
const requestObject = { | ||
"TableName": tableName, | ||
"Key": { } | ||
} | ||
requestObject.Key[primaryKey] = { "S": pkey }; | ||
requestObject.Key[sortKey] = { "N": '' + skey }; | ||
const request = JSON.stringify(requestObject); | ||
//console.log('getDynamoItem() body ' + request) | ||
const firstRegion = (region === undefined) ? regionList[0] : region; // TODO: assign a primary read/write region | ||
const promise = signAndSendRequest(firstRegion, 'GetItem', request) | ||
.then(rsp => ({response: rsp})) | ||
.catch(ex => ({response: { ok: false, status: 999, statusText: ex.toString()}})) | ||
var { response } = await promise; | ||
if (!response.ok) { | ||
throw new Error(`unable to get item ${pkey} ${skey} because of error ${response.statusText}`); | ||
} | ||
let responseBody = await response.json(); | ||
let item = null; | ||
if (responseBody.Item && responseBody.Item[valueKey]) { | ||
if (type === 'arrayBuffer') { | ||
let base64text = responseBody.Item[valueKey].B; | ||
let binaryString = atob(base64text); | ||
var byteArray = new Uint8Array(binaryString.length); | ||
for (let i = 0; i < binaryString.length; i++) { | ||
byteArray[i] = binaryString.charCodeAt(i); | ||
} | ||
item = byteArray; | ||
} | ||
else { | ||
item = responseBody.Item[valueKey].S; | ||
} | ||
} | ||
return item; | ||
} | ||
async function putDynamoItem(pkey, skey, value) { | ||
let itemValue = (value instanceof Uint8Array) ? | ||
{ "B": btoa(String.fromCharCode(...value)) } : { "S": value }; | ||
const requestObject = { | ||
"TableName": tableName, | ||
"Item": { } | ||
} | ||
requestObject.Item[primaryKey] = { "S": pkey }; | ||
requestObject.Item[sortKey] = { "N": '' + skey }; | ||
requestObject.Item[valueKey] = itemValue; | ||
const request = JSON.stringify(requestObject); | ||
//console.log('putDynamoItem() body ' + request) | ||
const firstRegion = regionList[0]; // TODO: assign a primary write region | ||
const promise = signAndSendRequest(firstRegion, 'PutItem', request) | ||
.then(rsp => ({response: rsp, region: firstRegion})) | ||
.catch(ex => ({response: { ok: false, status: 999, statusText: ex.toString()}, region: firstRegion})) | ||
var { response, region } = await promise; | ||
if (!response.ok) { | ||
throw new Error(`unable to put item ${pkey} ${skey} because of error ${response.statusText}`); | ||
} | ||
return undefined; | ||
} | ||
async function deleteDynamoItem(pkey, skey) { | ||
const requestObject = { | ||
"TableName": tableName, | ||
"Key": { } | ||
} | ||
requestObject.Key[primaryKey] = { "S": pkey }; | ||
requestObject.Key[sortKey] = { "N": '' + skey }; | ||
const request = JSON.stringify(requestObject); | ||
//console.log('deleteDynamoItem() ' + request) | ||
const firstRegion = regionList[0]; // TODO: assign a primary write region | ||
const promise = signAndSendRequest(firstRegion, 'DeleteItem', request) | ||
.then(rsp => ({response: rsp, region: firstRegion})) | ||
.catch(ex => ({response: { ok: false, status: 999, statusText: ex.toString()}, region: firstRegion})) | ||
var { response, region } = await promise; | ||
if (!response.ok) { | ||
throw new Error(`unable to delete item ${pkey} ${skey} because of error ${response.statusText}`); | ||
} | ||
return undefined; | ||
} | ||
function parseBlockMeta(value) { | ||
let blockId = value.slice(3, 39); | ||
let blockCount = parseInt(value.slice(47), 10); | ||
return { blockId, blockCount }; | ||
} | ||
var BLOCK_SIZE = 400000; | ||
var BLOCK_REGEX = /^id=[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12};length=[0-9]{1,}$/ | ||
async function getItem(key, waitUntil) { | ||
let {value, region} = await getItemRace(key, 0, waitUntil); | ||
//console.log(`race winner: ${region} with value ${value}`); | ||
if (value === null || value.search(BLOCK_REGEX) === -1) { | ||
return value; | ||
} | ||
let { blockId, blockCount } = parseBlockMeta(value); | ||
let promiseList = []; | ||
for (let blockIndex = 0; blockIndex < blockCount; blockIndex++) { | ||
let blockPromise = getDynamoItem(blockId, blockIndex, 'arrayBuffer', region) | ||
promiseList.push(blockPromise) | ||
} | ||
let blockList = await Promise.all(promiseList); | ||
let finalValue = ''; | ||
let byteArraySize = 0; | ||
for (let blockData of blockList) { | ||
if (blockData === null) { | ||
let err = new Error(`key '${key}' has missing data blocks and needs deletion`); | ||
err.blockRecord = value; | ||
throw err; | ||
} | ||
if (blockData instanceof Uint8Array) { | ||
byteArraySize += blockData.byteLength; | ||
} | ||
else { | ||
finalValue += blockData; | ||
} | ||
} | ||
if (byteArraySize > 0) { | ||
let resultArray = new Uint8Array(byteArraySize); | ||
let offset = 0; | ||
for (let blockData of blockList) { | ||
resultArray.set(blockData, offset); | ||
offset += blockData.byteLength; | ||
} | ||
let decoder = new TextDecoder(); | ||
finalValue = decoder.decode(resultArray); | ||
} | ||
return finalValue; | ||
} | ||
async function putItem(key, value) { | ||
let oldValue = await getDynamoItem(key, 0); | ||
let oldBlock = undefined; | ||
if (oldValue !== null && oldValue.search(BLOCK_REGEX) === 0) { | ||
oldBlock = oldValue; | ||
} | ||
let encoder = new TextEncoder(); | ||
let encoded = encoder.encode(value); | ||
if (encoded.length <= BLOCK_SIZE) { | ||
await putDynamoItem(key, '0', value); | ||
return oldBlock; | ||
} | ||
let blockList = []; | ||
let blocks = Math.floor(encoded.length / BLOCK_SIZE); | ||
let lastBlock = (encoded.length % BLOCK_SIZE) > 0 ? 1 : 0; | ||
let totalBlocks = blocks + lastBlock; | ||
for (let i = 0; i < totalBlocks; i++) { | ||
let startIndex = i * BLOCK_SIZE; | ||
let endIndex = startIndex + BLOCK_SIZE; | ||
let block = encoded.slice(startIndex, endIndex); | ||
blockList.push(block); | ||
} | ||
let blockId = uuidv4(); | ||
let blockIndex = 0; | ||
for (let block of blockList) { | ||
try { | ||
await putDynamoItem(blockId, blockIndex, block); | ||
} | ||
catch (ex) { | ||
throw new Error(`${blockId} ${blockIndex} block put error: ${ex.message}`) | ||
} | ||
blockIndex++; | ||
} | ||
await putDynamoItem(key, 0, `id=${blockId};length=${blockList.length}`); | ||
return oldBlock; | ||
} | ||
async function delItem(key) { | ||
let value = await getDynamoItem(key, 0); | ||
if (value === null) { | ||
return false; | ||
} | ||
if (value.search(BLOCK_REGEX) === -1) { | ||
return deleteDynamoItem(key, 0); | ||
} | ||
let { blockId, blockCount } = parseBlockMeta(value); | ||
for (let blockIndex = 0; blockIndex < blockCount; blockIndex++) { | ||
await deleteDynamoItem(blockId, blockIndex); | ||
} | ||
return deleteDynamoItem(key, 0); | ||
} | ||
async function clean(block) { | ||
let { blockId, blockCount } = parseBlockMeta(block); | ||
for (let blockIndex = 0; blockIndex < blockCount; blockIndex++) { | ||
await deleteDynamoItem(blockId, blockIndex); | ||
} | ||
} | ||
exports.init = init; | ||
exports.listTables = listTables; | ||
exports.get = getItem; | ||
exports.put = putItem; | ||
exports.del = delItem; | ||
exports.clean = clean; |
{ | ||
"name": "cloudflare-workers-dynamodb", | ||
"version": "0.0.2", | ||
"version": "0.0.3", | ||
"description": "Query DynamoDB from Cloudflare Workers", | ||
@@ -24,4 +24,5 @@ "main": "index.js", | ||
"dependencies": { | ||
"crypto-js": "3.1.9-1" | ||
"crypto-js": "3.1.9-1", | ||
"uuid": "3.3.2" | ||
} | ||
} |
14889
325
2
+ Addeduuid@3.3.2
+ Addeduuid@3.3.2(transitive)