fluent-plugin-openlineage

  • 0.1.3
  • Rubygems

Fluent-plugin-openlineage, a plugin for Fluentd

fluent-plugin-openlineage is a Fluentd plugin that verifies whether a JSON document matches the OpenLineage schema. It is intended to be used together with a Fluentd application.

Requirements

fluent-plugin-prometheus   fluentd       ruby
1.x.y                      >= v1.9.1     >= 2.4
1.[0-7].y                  >= v0.14.8    >= 2.1
0.x.y                      >= v0.12.0    >= 1.9

Installation

Add this line to your application's Gemfile:

gem 'fluent-plugin-openlineage'

And then execute:

$ bundle

Or install it yourself using one of the following:

$ gem install fluent-plugin-openlineage

$ fluent-gem install fluent-plugin-openlineage

Usage

fluent-plugin-openlineage includes only one plugin:

  • openlineage parse plugin

Why are Fluentd and OpenLineage a perfect match?

This is part of the OpenLineage Project repository at: https://github.com/OpenLineage/OpenLineage/tree/main/proxy/fluentd

Modern data collectors (Fluentd, Logstash, Vector, etc.) can be extremely useful when designing production-grade architectures for processing OpenLineage events.

They can be used for features such as:

  • A proxy server in front of the OpenLineage backend (such as Marquez) to handle load spikes and buffer incoming events when the backend is down (e.g., due to a maintenance window).
  • The ability to copy the event to multiple backends such as HTTP, Kafka or cloud object storage. Data collectors implement that out-of-the-box.

They have great potential except for a single missing feature: the ability to parse and validate OpenLineage events at the point of HTTP input. This is important as one would like to get a Bad Request response immediately when sending invalid OpenLineage events to an endpoint. Fortunately, this missing feature can be implemented as a plugin.

We decided to implement an OpenLineage parser plugin for Fluentd because:

  • Fluentd has a small footprint in terms of resource utilization and does not require a JVM to be installed,
  • Fluentd plugins can be installed from local files (no need to register in a plugin repository).

As a side effect, the Fluentd integration can also be used as an OpenLineage HTTP validation backend for development purposes.

Fluentd features

Some interesting Fluentd features are available according to the official documentation:

The official Fluentd documentation does not mention guarantees about event ordering. However, retrieving OpenLineage events and buffering them in a file or in memory should be considered a millisecond-long operation, while an HTTP backend cannot guarantee ordering in such a case. On the other hand, the number of threads used to flush the buffer defaults to 1 and is configurable (flush_thread_count).

Quickstart with Docker

Please refer to the Dockerfile and fluent.conf to see how to build and install the plugin with the example usage scenario provided in docker-compose.yml. To run the example setup, go to the docker directory and execute the following command:

docker-compose up

After all the containers have started, send some HTTP requests:

curl -X POST \
-d '{"test":"test"}' \
-H 'Content-Type: application/json' \
http://localhost:9880/api/v1/lineage

In response, you should see the following message:

Openlineage validation failed: path "/": "run" is a required property, path "/": "job" is a required property, path "/": "eventTime" is a required property, path "/": "producer" is a required property, path "/": "schemaURL" is a required property
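
The message above lists the five top-level properties that the schema requires. As a rough illustration only, the same required-property check can be approximated in a few lines of Python. This is not how the plugin works internally (it validates against the full OpenLineage JSON Schema), and the name missing_required is hypothetical:

```python
import json

# Top-level properties reported as required in the validation error above.
REQUIRED_PROPERTIES = ["run", "job", "eventTime", "producer", "schemaURL"]


def missing_required(event):
    """Return the required top-level properties that are absent from the event."""
    return [name for name in REQUIRED_PROPERTIES if name not in event]


if __name__ == "__main__":
    payload = json.loads('{"test": "test"}')
    for name in missing_required(payload):
        print(f'path "/": "{name}" is a required property')
```

A check like this can be handy for catching obviously incomplete payloads before they are sent, but only the plugin's schema validation decides what the endpoint accepts.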

Next, send some valid requests:

curl -X POST \
-d "$(cat test-start.json)" \
-H 'Content-Type: application/json' \
http://localhost:9880/api/v1/lineage
curl -X POST \
-d "$(cat test-complete.json)" \
-H 'Content-Type: application/json' \
http://localhost:9880/api/v1/lineage

After that you should see entities in Marquez (http://localhost:3000/) in the my-namespace namespace.
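
The contents of test-start.json and test-complete.json are not reproduced here. As a hedged sketch, a minimal pair of events carrying the required top-level properties could be built and posted from Python as shown below. Only the endpoint URL and the my-namespace namespace come from this README; the job name, the producer URL, and the schemaURL version are illustrative assumptions, and whether such an event passes validation depends on the schema bundled with the plugin.

```python
import json
import uuid
import urllib.request
from datetime import datetime, timezone

# Endpoint used by the curl examples in this README.
LINEAGE_URL = "http://localhost:9880/api/v1/lineage"


def build_event(event_type, run_id, job_name):
    """Build a minimal run event with the required top-level properties."""
    return {
        "eventType": event_type,  # e.g. "START" or "COMPLETE"
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": "my-namespace", "name": job_name},
        # Assumed placeholder values; replace with your own producer and spec version.
        "producer": "https://example.com/my-producer",
        "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
    }


def post_event(event, url=LINEAGE_URL):
    """POST the event as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    run_id = str(uuid.uuid4())  # the same run ID links the START and COMPLETE events
    for event_type in ("START", "COMPLETE"):
        print(event_type, post_event(build_event(event_type, run_id, "my-job")))
```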

To clean up, run:

docker-compose down

Configuration

Although an OpenLineage event is specified according to JSON Schema, real-life validation may vary, and backends like Marquez may take a less strict approach to validating certain types of facets. For example, Marquez allows an invalid DataQualityMetricsInputDatasetFacet. To give more flexibility, the Fluentd parser accepts the following configuration parameters:

validate_input_dataset_facets => true/false
validate_output_dataset_facets => true/false
validate_dataset_facets => true/false
validate_run_facets => true/false
validate_job_facets => true/false

By default, only validate_run_facets and validate_job_facets are set to true.
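
To make the defaults concrete, the following Python sketch overlays user overrides on the defaults stated above and renders them as a Fluentd parse section. It assumes that the parser is registered under the type name openlineage and that the parameters are written in the usual Fluentd "name value" form inside a parse block; neither is confirmed by this README, so check the result against the fluent.conf shipped in the repository. The helper name render_parse_section is hypothetical.

```python
# Defaults as stated in this README: only run and job facets are validated.
DEFAULT_FLAGS = {
    "validate_input_dataset_facets": False,
    "validate_output_dataset_facets": False,
    "validate_dataset_facets": False,
    "validate_run_facets": True,
    "validate_job_facets": True,
}


def render_parse_section(overrides=None):
    """Render a <parse> section with the defaults overlaid by the overrides."""
    overrides = overrides or {}
    unknown = set(overrides) - set(DEFAULT_FLAGS)
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    flags = {**DEFAULT_FLAGS, **overrides}
    # Assumption: the parser type is named "openlineage".
    lines = ["<parse>", "  @type openlineage"]
    for name, enabled in flags.items():
        lines.append(f"  {name} {'true' if enabled else 'false'}")
    lines.append("</parse>")
    return "\n".join(lines)


if __name__ == "__main__":
    # Enable dataset facet validation on top of the defaults.
    print(render_parse_section({"validate_dataset_facets": True}))
```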

Development

To build dependencies:

bundle install
bundle

To run the tests:

bundle exec rake test

Installation

The easiest way to install the plugin is to install Fluentd itself and, along with it, the following external packages, for example in a Dockerfile:

  • rusty_json_schema provides JSON Schema validation backed by a library written in Rust,
  • fluent-plugin-out-http allows non-bulk HTTP out requests (sending each OpenLineage event in a separate request).

fluent-gem install rusty_json_schema
fluent-gem install fluent-plugin-out-http
fluent-gem install fluent-plugin-openlineage

Fluentd proxy setup

Monitoring with Prometheus, and other configurations you can try with a separate fluent.conf file

You may also want to monitor the Fluentd application itself using Prometheus. To do so, add the fluent-plugin-prometheus plugin (https://github.com/fluent/fluent-plugin-prometheus) and include the following setup in your prometheus.yml file:

global:
  scrape_interval: 10s # Set the scrape interval to every 10 seconds. Default is every 1 minute.

#### A scrape configuration containing exactly one endpoint to scrape:
#### here, the Fluentd metrics endpoint on port 24231.
scrape_configs:
  - job_name: 'fluentd'
    static_configs:
      - targets: ['localhost:24231']

You may also want to add the following sections to your fluent.conf file:

#### source
<source>
  @type forward
  bind 0.0.0.0
  port 24224
</source>

#### count the number of incoming records per tag
<filter company.*>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total
    type counter
    desc The total number of incoming records
    <labels>
      tag ${tag}
      hostname ${hostname}
    </labels>
  </metric>
</filter>

#### count the number of outgoing records per tag
<match company.*>
  @type copy

  <store>
    @type forward
    <server>
      name myserver1
      host 192.168.1.3
      port 24224
      weight 60
    </server>
  </store>

  <store>
    @type prometheus
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>

</match>

#### expose metrics in prometheus format

<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

<source>
  @type prometheus_output_monitor
  interval 10
  <labels>
    hostname ${hostname}
  </labels>
</source>

For additional information, see the official Fluentd documentation at https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus#example-prometheus-queries

Package last updated on 05 Aug 2024
