
streaming-jupyter-integrations
The Streaming Jupyter Integrations project provides a set of magics for interactively running Flink SQL jobs in Jupyter Notebooks.
To use these magics, install the pip package along with jupyterlab-lsp:
python3 -m pip install jupyterlab-lsp streaming-jupyter-integrations
Register the magics in Jupyter by running the following in the first cell of a notebook with a running IPython kernel:
%load_ext streaming_jupyter_integrations.magics
Then choose an execution mode and an execution target:
%flink_connect --execution-mode [mode] --execution-target [target]
By default, the streaming execution mode and local execution target are used.
%flink_connect
Currently, Flink supports two execution modes: batch and streaming. Please see the Flink documentation for more details.
To specify the execution mode, add the --execution-mode parameter, for instance:
%flink_connect --execution-mode batch
Streaming Jupyter Integrations supports three execution targets: local, remote, and yarn-session.
Running Flink in local mode will start a MiniCluster in a local JVM with parallelism 1.
In order to run Flink locally, use:
%flink_connect --execution-target local
Alternatively, since the execution target is local by default, use:
%flink_connect
You can specify the port of the local JobManager (8099 by default). This is especially useful if you run multiple notebooks in a single JupyterLab instance.
%flink_connect --execution-target local --local-port 8123
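When several notebooks share one machine, it can help to pick an unused port before connecting. The following is a minimal sketch using only the Python standard library; the helper name find_free_port is hypothetical and not part of this project. Note that the port could still be taken by another process between the check and the actual connection.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the operating system for a TCP port that is currently unused on `host`."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 lets the OS choose any free port.
        sock.bind((host, 0))
        return sock.getsockname()[1]


port = find_free_port()
# Print the magic line to paste into a notebook cell.
print(f"%flink_connect --execution-target local --local-port {port}")
```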
Running Flink in remote mode will connect to an existing Flink session cluster. In addition to setting --execution-target
to remote, you also need to specify --remote-hostname and --remote-port, pointing to the Flink JobManager's
REST API address.
%flink_connect \
--execution-target remote \
--remote-hostname example.com \
--remote-port 8888
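If the connection parameters come from configuration rather than being typed by hand, the argument string can be assembled in Python. This is a sketch only: build_flink_connect_args is a hypothetical helper, it uses just the flags described above, and it assumes that --execution-mode can be combined with a remote target as the general syntax suggests.

```python
from typing import Optional


def build_flink_connect_args(
    execution_mode: str = "streaming",
    execution_target: str = "local",
    remote_hostname: Optional[str] = None,
    remote_port: Optional[int] = None,
) -> str:
    """Build the argument string for %flink_connect (hypothetical helper)."""
    if execution_mode not in ("batch", "streaming"):
        raise ValueError(f"unsupported execution mode: {execution_mode}")
    args = ["--execution-mode", execution_mode, "--execution-target", execution_target]
    if execution_target == "remote":
        # The remote target needs the JobManager REST API address.
        if remote_hostname is None or remote_port is None:
            raise ValueError("remote target requires remote_hostname and remote_port")
        args += ["--remote-hostname", remote_hostname, "--remote-port", str(remote_port)]
    return " ".join(args)


remote_args = build_flink_connect_args(
    execution_target="remote",
    remote_hostname="example.com",
    remote_port=8888,
)
print(remote_args)
```

Inside a notebook, the resulting string could then be passed to IPython, for example with get_ipython().run_line_magic("flink_connect", remote_args).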
Running Flink in yarn-session mode will connect to an existing Flink session cluster running on YARN. You may specify
the hostname and port of the YARN Resource Manager (--resource-manager-hostname and --resource-manager-port).
If the Resource Manager address is not provided, it is assumed that the notebook runs on the same node as the Resource Manager.
You can also specify the YARN application ID (--yarn-application-id) to which the notebook will connect.
If --yarn-application-id is not specified and there is one YARN application running on the cluster, the notebook will
try to connect to it. Otherwise, it will fail.
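The application selection rule described above can be sketched as follows. This is an illustration of the stated behavior, not the project's code: choose_yarn_application is a hypothetical function, and whether an explicitly given ID is validated against the running applications is not stated here, so the sketch does not validate it.

```python
from typing import List, Optional


def choose_yarn_application(
    running_app_ids: List[str],
    requested_app_id: Optional[str] = None,
) -> str:
    """Pick the YARN application to connect to (hypothetical helper)."""
    if requested_app_id is not None:
        # An explicit --yarn-application-id always wins.
        return requested_app_id
    if len(running_app_ids) == 1:
        # Exactly one running application: connect to it.
        return running_app_ids[0]
    # Zero or several running applications: the choice is ambiguous, so fail.
    raise RuntimeError(
        f"expected exactly one running YARN application, found {len(running_app_ids)}"
    )


chosen = choose_yarn_application(["application_1666172784500_0001"])
print(chosen)
```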
Connecting to a remote Flink session cluster running on a remote YARN cluster:
%flink_connect \
--execution-target yarn-session \
--resource-manager-hostname example.com \
--resource-manager-port 8888 \
--yarn-application-id application_1666172784500_0001
Connecting to a Flink session cluster running on a YARN cluster:
%flink_connect \
--execution-target yarn-session \
--yarn-application-id application_1666172784500_0001
Connecting to a Flink session cluster running on a dedicated YARN cluster:
%flink_connect --execution-target yarn-session
Magics allow for dynamic variable substitution in Flink SQL cells.
my_variable = 1
SELECT * FROM some_table WHERE product_id = {my_variable}
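To make the substitution idea concrete, here is a minimal sketch of how {name} placeholders could be filled from notebook variables. It is not the project's implementation: substitute_variables is a hypothetical function, and the choice to leave ${name} placeholders untouched is an assumption made for this sketch.

```python
import re
from typing import Any, Mapping

# Match {name} but not ${name}.
_PLACEHOLDER = re.compile(r"(?<!\$)\{(\w+)\}")


def substitute_variables(sql: str, namespace: Mapping[str, Any]) -> str:
    """Replace each {name} in `sql` with str(namespace[name]) (hypothetical helper)."""

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in namespace:
            raise KeyError(f"variable {name!r} is not defined")
        return str(namespace[name])

    return _PLACEHOLDER.sub(replace, sql)


my_variable = 1
query = substitute_variables(
    "SELECT * FROM some_table WHERE product_id = {my_variable}",
    {"my_variable": my_variable},
)
print(query)
```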
Moreover, you can mark sensitive variables, such as passwords, with the ${...} syntax so that they are read from environment variables or user input every time the cell is run:
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users',
'username' = '${my_username}',
'password' = '${my_password}'
);
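The lookup order for sensitive variables (environment first, then user input) can be sketched like this. The function resolve_secrets is hypothetical, and the assumption that the environment variable has exactly the same name as the placeholder is ours; the project may map names differently.

```python
import os
import re
from getpass import getpass
from typing import Callable


def resolve_secrets(sql: str, prompt: Callable[[str], str] = getpass) -> str:
    """Replace each ${name} with an environment value or a prompted value (hypothetical helper)."""

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        value = os.environ.get(name)
        if value is None:
            # Fall back to asking the user; getpass hides the typed input.
            value = prompt(f"{name}: ")
        return value

    return re.sub(r"\$\{(\w+)\}", replace, sql)


# Illustrative value only; in practice the variable would be set outside the notebook.
os.environ["my_username"] = "flink_user"
resolved = resolve_secrets("'username' = '${my_username}'")
print(resolved)
```

Because the values are resolved on every call, nothing sensitive needs to be stored in the notebook itself.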
%%flink_execute command
This command allows you to use the Python DataStream API and Table API. One handle is exposed for each API:
stream_env and table_env, respectively.
Table API example:
%%flink_execute
query = """
SELECT user_id, COUNT(*)
FROM orders
GROUP BY user_id
"""
execution_output = table_env.execute_sql(query)
When the Table API is used, the final result has to be assigned to the execution_output variable.
DataStream API example:
%%flink_execute
from pyflink.common.typeinfo import Types
execution_output = stream_env.from_collection(
collection=[(1, 'aaa'), (2, 'bb'), (3, 'cccc')],
type_info=Types.ROW([Types.INT(), Types.STRING()])
)
When the DataStream API is used, the final result has to be assigned to the execution_output variable. Please note that
the pipeline does not end with .execute(); the execution is triggered by the Jupyter magics under the hood.
There are currently 2 options for running streaming_jupyter_integrations for development. We can either
use a Docker image or install it on our machine.
You can build a Docker image of Jupyter Notebooks by running the command below.
It will contain functionality that was developed in this project.
docker build --tag streaming_jupyter_integrations_image .
After the image is built, we can run it using this command.
docker run --name streaming_jupyter_integrations -p 8888:8888 streaming_jupyter_integrations_image
After that, the Jupyter server running in Docker should be reachable at: http://127.0.0.1:8888/
Note: You will need NodeJS to build the extension package.
The jlpm command is JupyterLab's pinned version of yarn that is installed with JupyterLab. You may use yarn or npm in lieu of jlpm below. In order to use jlpm, you have to have jupyterlab installed (e.g., with brew install jupyterlab, if you use Homebrew as your package manager).
# Clone the repo to your local environment
# Change directory to the flink_sql_lsp_extension directory
# Install package in development mode
pip install -e .
# Link your development version of the extension with JupyterLab
jupyter labextension develop . --overwrite
# Rebuild extension Typescript source after making changes
jlpm build
The project uses pre-commit hooks to ensure code quality, mostly by linting. To use them, install pre-commit and then run
pre-commit install --install-hooks
From then on, the hooks will lint the files you have modified on every commit attempt.