Socket
Socket
Sign inDemoInstall

leo-sdk

Package Overview
Dependencies
54
Maintainers
7
Versions
201
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

    leo-sdk

Load data onto the LEO Platform


Version published
Weekly downloads
1.9K
increased by3.35%
Maintainers
7
Install size
9.21 MB
Created
Weekly downloads
 

Readme

Source

A Node/Typescript SDK that pushes data to and pulls data from an instance of an RStreams bus.

Full documentation and getting started guide here: rstreams-site-url/rstreams-node-sdk/

For this SDK API documentation, get started with the RStreamsSdk.

Put

Send events one at a time to an instance of the RStreams bus.

Put Operation documentation

import { ConfigurationResources, RStreamsSdk } from "leo-sdk";
import { PersonRaw, PersonRawResults } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk  = new RStreamsSdk();
  const person = await getRandomPerson();
  await rsdk.putEvent('rstreams-example.load-people', 'rstreams-example.people', person);
}

async function getRandomPerson(): Promise<PersonRaw> {
  const NUM_EVENTS = 1;
  const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&exc=login,registered,phone,cell,picture,id&noinfo`;
  const {data, status} = await axios.get<PersonRawResults>(url);
  
  if (status !== 200) {
    throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
  }
  
  console.log('Person: ' + data.results[0].name.first + ' ' + data.results[0].name.last);

  return data.results[0];
}

(async () => {
  await main();
})()

Enrich

Read events from one queue, modify them and send them on to another queue of the RStreams bus.

Enrich Operation documentation

import { EnrichOptions,  RStreamsSdk } from "leo-sdk";
import { Person, PersonRaw } from "../lib/types";
import axios from "axios";

async function main() {
  const rsdk: RStreamsSdk  = new RStreamsSdk();
  const opts: EnrichOptions<PersonRaw, Person>  = {
    id: 'rstreams-example.people-to-peopleplus',
    inQueue: 'rstreams-example.people',
    outQueue: 'rstreams-example.peopleplus',
    start: 'z/2022/04/20',
    config: {
      limit: 2
    },
    transform: async (person: PersonRaw) => {
      const p: Person = translate(person);
      await addCountryCode(p);
      return p;
    }
  };

  await rsdk.enrichEvents<PersonRaw, Person>(opts);
}

interface CountryCode {cca2: string;}

/**
 * @param person The person to add addr.countryCode to by calling a public API to
 *               turn a country name in a 2 digit country code (iso cca2)
 */
 async function addCountryCode(person: Person): Promise<void> {
    const url = `https://restcountries.com/v3.1/name/${person.addr.country}?fullText=true&fields=cca2`;    
    const cc: CountryCode = await axios.get(url);
    person.addr.countryCode = cc.cca2;
  }

/**
 * @param p The type from the public API we want to modify
 * @returns The new type that is flatter and gets rid of some attributes don't need
 */
function translate(p: PersonRaw): Person {
  return {
    gender: p.gender,
    firstName: p.name.first,
    lastName: p.name.last,
    email: p.email,
    birthDate: p.dob.date,
    nationality: p.nat,
    addr: {
      addr1: p.location.street.number + ' ' + p.location.street.name,
      city: p.location.city, 
      state: p.location.state,
      country: p.location.country,
      postcode: p.location.postcode,
      longitude: p.location.coordinates.longitude,
      latitude: p.location.coordinates.latitude,
      tzOffset: p.location.timezone.offset,
      tzDesc: p.location.timezone.description
    }
  }
}

(async () => {
  await main();
})()

Offload

Stream data down from an RStreams bus queue at scale, optionally modify it and send it to another source such as a database or file.

Offload Operation documentation

import { OffloadOptions,  RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import axios, { AxiosResponse } from "axios";

async function main() {
  const rsdk: RStreamsSdk  = new RStreamsSdk();
  const opts: OffloadOptions<Person>  = {
    id: 'rstreams-example.offload-one-peopleplus',
    inQueue: 'rstreams-example.people',
    start: 'z/2022/04/20',
    limit: 2,
    transform: async (person: Person) => {
        await savePerson(person);
        return true;        
    }
  };

  await rsdk.offloadEvents<Person>(opts);
}

interface PostResponse {
    success: boolean;
}

/**
 * @param person Save the person to another system.
 */
async function savePerson(person: Person): Promise<void> {
  const url = `https://run.mocky.io/v3/83997150-ab13-43da-9fb9-66051ba06c10?mocky-delay=500ms`;    
  const {data, status}: AxiosResponse<PostResponse, any> = await axios.post<PostResponse>(url, person);
  if (status !== 200 || !data || data.success !== true) {
    throw new Error('Saving person to external system failed');
  }
}

(async () => {
  await main();
})()

FAQs

Last updated on 29 Nov 2023

Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc