Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

process-street-tap-postgres

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

process-street-tap-postgres

Singer.io tap for extracting data from PostgresSQL - PipelineWise compatible

  • 2.2.0
  • PyPI
  • Socket score

Maintainers
1

process-street-tap-postgres

PyPI version PyPI - Python Version License: MIT

Singer tap that extracts data from a PostgreSQL database and produces JSON-formatted data following the Singer spec.

This is a PipelineWise compatible tap connector.

IMPORTANT

This is a custom fork that adds support for ltree to the pipelinewise-tap-postgres tap.

If a new release is made from the upstream repo, this fork will need to be updated to match, deployed again to PyPI, and then the Dockerfile updated to match the new version number.

You can find the PyPI API key in 1Password.

To create a new PyPI package run:

> make venv
> . ./venv/bin/activate
> python setup.py sdist
> 

How to use it

The recommended method of running this tap is to use it from PipelineWise. When running it from PipelineWise you don't need to configure this tap with JSON files and most of things are automated. Please check the related documentation at Tap Postgres

If you want to run this Singer Tap independently please read further.

Install and Run

First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.

It's recommended to use a virtualenv:

  python3 -m venv venv
  pip install pipelinewise-tap-postgres

or

  make venv

Create a config.json

{
  "host": "localhost",
  "port": 5432,
  "user": "postgres",
  "password": "secret",
  "dbname": "db"
}

These are the same basic configuration properties used by the PostgreSQL command-line client (psql).

Full list of options in config.json:

PropertyTypeRequired?DefaultDescription
hostStringYes-PostgreSQL host
portIntegerYes-PostgreSQL port
userStringYes-PostgreSQL user
passwordStringYes-PostgreSQL password
dbnameStringYes-PostgreSQL database name
filter_schemasStringNoNoneComma separated schema names to scan only the required schemas to improve the performance of data extraction.
sslStringNoNoneIf set to "true" then use SSL via postgres sslmode require option. If the server does not accept SSL connections or the client certificate is not recognized the connection will fail.
logical_poll_total_secondsIntegerNo10800Stop running the tap when no data received from wal after certain number of seconds.
break_at_end_lsnBooleanNotrueStop running the tap if the newly received lsn is after the max lsn that was detected when the tap started.
max_run_secondsIntegerNo43200Stop running the tap after certain number of seconds.
debug_lsnStringNoNoneIf set to "true" then add _sdc_lsn property to the singer messages to debug postgres LSN position in the WAL stream.
tap_idStringNoNoneID of the pipeline/tap
itersizeIntegerNo20000Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE
default_replication_methodStringNoNoneDefault replication method to use when no one is provided in the catalog (Values: LOG_BASED, INCREMENTAL or FULL_TABLE)
use_secondaryBooleanNoFalseUse a database replica for INCREMENTAL and FULL_TABLE replication
secondary_hostStringNo-PostgreSQL Replica host (required if use_secondary is True)
secondary_portIntegerNo-PostgreSQL Replica port (required if use_secondary is True)
limitIntegerNoNoneAdds a limit to INCREMENTAL queries to limit the number of records returns per run

Run the tap in Discovery Mode

tap-postgres --config config.json --discover                # Should dump a Catalog to stdout
tap-postgres --config config.json --discover > catalog.json # Capture the Catalog

Add Metadata to the Catalog

Each entry under the Catalog's "stream" key will need the following metadata:

{
  "streams": [
    {
      "stream_name": "my_topic"
      "metadata": [{
        "breadcrumb": [],
        "metadata": {
          "selected": true,
          "replication-method": "LOG_BASED",
        }
      }]
    }
  ]
}

The replication method can be one of FULL_TABLE, INCREMENTAL or LOG_BASED.

Note: Log based replication requires a few adjustments in the source postgres database, please read further for more information.

Run the tap in Sync Mode

tap-postgres --config config.json --catalog catalog.json

The tap will write bookmarks to stdout which can be captured and passed as an optional --state state.json parameter to the tap for the next sync.

Log Based replication requirements

  • PostgreSQL databases running PostgreSQL versions 9.4.x or greater. To avoid a critical PostgreSQL bug, use at least one of the following minor versions:

    • PostgreSQL 12.0
    • PostgreSQL 11.2
    • PostgreSQL 10.7
    • PostgreSQL 9.6.12
    • PostgreSQL 9.5.16
    • PostgreSQL 9.4.21
  • A connection to the master instance. Log-based replication will only work by connecting to the master instance.

  • wal2json plugin: To use Log Based for your PostgreSQL integration, you must install the wal2json plugin version >= 2.3. The wal2json plugin outputs JSON objects for logical decoding, which the tap then uses to perform Log-based Replication. Steps for installing the plugin vary depending on your operating system. Instructions for each operating system type are in the wal2json’s GitHub repository:

  • postgres config file: Locate the database configuration file (usually postgresql.conf) and define the parameters as follows:

    wal_level=logical
    max_replication_slots=5
    max_wal_senders=5
    

    Restart your PostgreSQL service to ensure the changes take effect.

    Note: For max_replication_slots and max_wal_senders, we’re defaulting to a value of 5. This should be sufficient unless you have a large number of read replicas connected to the master instance.

  • Existing replication slot: Log based replication requires a dedicated logical replication slot. In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a client in the order they were made on the original server. Each slot streams a sequence of changes from a single database.

    Login to the master instance as a superuser and using the wal2json plugin, create a logical replication slot:

      SELECT *
      FROM pg_create_logical_replication_slot('pipelinewise_<database_name>', 'wal2json');
    

    Note: Replication slots are specific to a given database in a cluster. If you want to connect multiple databases - whether in one integration or several - you’ll need to create a replication slot for each database.

To run tests:

  1. Install python test dependencies in a virtual env:
 make venv
  1. You need to have a postgres database to run the tests and export its credentials.

You can make use of the local docker-compose to spin up a test database by running make start_db

Test objects will be created in the postgres database.

  1. To run the unit tests:
  make unit_test
  1. To run the integration tests:
  make integration_test

To run pylint:

Install python dependencies and run python linter

  make venv
  make pylint

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc