
datastream-direct
A Python client library for connecting to and querying the DataStream Direct API.
pip install datastream-direct
from datastream_direct import connect

# Connect to the API
connection = connect(
    username="your_username",
    password="your_password",
    host="your_host",
    port=5432,
    database="your_database"
)

# Get cursor and execute query
cursor = connection.cursor()
cursor.execute("SELECT * FROM well_combined LIMIT 10")
results = cursor.fetchall()

# Results will be a list of tuples
for row in results:
    print(row)

# Close the connection
connection.close()
from datastream_direct import connect, fetch_frame
import pandas as pd

# Connect
conn = connect(
    username="user",
    password="pass",
    host="localhost",
    port=5432,
    database="mydb"
)

# Get results as a DataFrame with proper type conversion
df = fetch_frame(conn.cursor(), "SELECT * FROM well_combined LIMIT 100")
print(df.dtypes)  # Shows proper column types
print(df.head())

# Clean up
conn.close()
connect(username, password, host, port, database)
Create a connection to the DataStream Direct API.
Parameters:
  username (str): Authentication username
  password (str): Authentication password
  host (str): Database host
  port (int): Database port
  database (str): Database name
Returns: DatastreamDirectConnection instance
Raises:
  ValueError: If any required parameter is invalid
  AuthenticationError: If authentication fails
  ConnectionError: If connection fails
Example:
from datastream_direct import connect

connection = connect(
    username="user",
    password="pass",
    host="data-api.energydomain.com",
    port=443,
    database="energy_domain"
)
fetch_frame(cursor, query)
Execute a query and return results as a pandas DataFrame with proper type conversion.
Parameters:
  cursor (DatastreamDirectCursor): Cursor instance
  query (str): SQL query string
Returns: pandas.DataFrame with properly typed columns
Example:
from datastream_direct import connect, fetch_frame

connection = connect(username="user", password="pass", ...)
cursor = connection.cursor()

# Get results as a DataFrame
df = fetch_frame(cursor, "SELECT * FROM well_combined LIMIT 100")
print(df.dtypes)  # Shows proper column types
data_stream_direct_to_spotfire_types(dataframe, column_metadata)
Set Spotfire-specific data types for a pandas DataFrame.
Parameters:
  dataframe (pandas.DataFrame): A pandas DataFrame to apply Spotfire types to
  column_metadata (List[DataStreamDirectColumnMetadata]): List of column metadata objects describing the columns in the DataFrame
Returns: None (modifies the DataFrame in-place)
Raises:
  ImportError: If the Spotfire package is not available
Note: This function is only available if the Spotfire package is installed.
Example:
from datastream_direct import connect, data_stream_direct_to_spotfire_types
import pandas as pd

connection = connect(username="user", password="pass", ...)
cursor = connection.cursor()
cursor.execute("SELECT * FROM well_combined LIMIT 100")

# fetchall() returns the rows as a list of tuples
rows = cursor.fetchall()
df = pd.DataFrame(rows, columns=[col.name for col in cursor.metadata])

# Apply Spotfire types to the DataFrame in place
data_stream_direct_to_spotfire_types(df, cursor.metadata)
DatastreamDirectConnection
Connection class for managing API connections.
Methods:
  cursor(): Create a new cursor for executing queries
  close(): Close the connection and clean up resources
Properties:
  is_closed (bool): Check if the connection is closed
DatastreamDirectCursor
Cursor class for executing SQL queries and retrieving results.
Methods:
  execute(sql): Execute a SQL query
  fetchall(): Fetch all results from the last executed query
  close(): Close the cursor and clean up resources
Properties:
  metadata: Column metadata from the last executed query
  is_closed (bool): Check if the cursor is closed
ConnectionConfig
Configuration dataclass for connecting to the DataStream Direct API.
Attributes:
  username (str): Username for authentication
  password (str): Password for authentication
  host (str): Hostname or IP address of the DataStream Direct service
  port (int): Port number (must be between 1 and 65535)
  database (str): Name of the database to connect to
Methods:
  get_base_url(): Get the base URL for the connection
Raises:
  ValueError: If any parameter is invalid or empty
Example:
from datastream_direct import ConnectionConfig

config = ConnectionConfig(
    username="user",
    password="pass",
    host="data-api.energydomain.com",
    port=443,
    database="energy_domain"
)
base_url = config.get_base_url()
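The validation rules listed for ConnectionConfig (no empty fields, port between 1 and 65535) can be illustrated with a small stand-in dataclass. This is only a sketch under stated assumptions: SketchConfig is a hypothetical name, not the library's class, and the https://host:port URL format is a guess, since the documentation does not say what get_base_url() returns.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical stand-in for ConnectionConfig; not the library's code."""

    username: str
    password: str
    host: str
    port: int
    database: str

    def __post_init__(self) -> None:
        # Reject empty or whitespace-only string fields.
        for name in ("username", "password", "host", "database"):
            value = getattr(self, name)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(f"{name} must be a non-empty string")
        # Enforce the documented port range of 1 to 65535.
        if isinstance(self.port, bool) or not isinstance(self.port, int):
            raise ValueError("port must be an integer")
        if not 1 <= self.port <= 65535:
            raise ValueError("port must be between 1 and 65535")

    def get_base_url(self) -> str:
        # Assumed format; the real method may build the URL differently.
        return f"https://{self.host}:{self.port}"


config = SketchConfig("user", "pass", "data-api.energydomain.com", 443, "energy_domain")
print(config.get_base_url())
```

Validating in __post_init__ means an invalid configuration fails at construction time, before any network call is attempted.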
QueryResult
Result of a SQL query execution. Supports iteration over rows.
Attributes:
  headers (List[str]): List of column names
  rows (List[Tuple[Any, ...]]): List of tuples containing row data
  results_metadata (Optional[Dict[str, Any]]): Optional metadata about the query results
Example:
from datastream_direct import QueryResult

result = QueryResult(
    headers=["id", "name"],
    rows=[(1, "Alice"), (2, "Bob")]
)
print(len(result))  # 2
for row in result:
    print(row)
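Because a result carries both headers and row tuples, the two can be zipped into dictionaries keyed by column name. The sketch below uses a hypothetical SketchQueryResult that mirrors only the documented attributes and iteration behavior, so it runs without the library installed; rows_as_dicts is likewise an illustrative helper, not part of the package.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional, Tuple


@dataclass
class SketchQueryResult:
    """Hypothetical stand-in mirroring the documented QueryResult attributes."""

    headers: List[str]
    rows: List[Tuple[Any, ...]]
    results_metadata: Optional[Dict[str, Any]] = None

    def __len__(self) -> int:
        return len(self.rows)

    def __iter__(self) -> Iterator[Tuple[Any, ...]]:
        return iter(self.rows)


def rows_as_dicts(result: SketchQueryResult) -> List[Dict[str, Any]]:
    # Pair each value in a row with its column name.
    return [dict(zip(result.headers, row)) for row in result]


result = SketchQueryResult(headers=["id", "name"], rows=[(1, "Alice"), (2, "Bob")])
records = rows_as_dicts(result)
print(records)
```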
DataStreamDirectError
Base exception class for all DataStream Direct errors. All exceptions raised by the client inherit from this class.
Attributes:
  message (str): The error message
  error_code (Optional[str]): Optional error code if provided
Example:
from datastream_direct import connect, DataStreamDirectError

try:
    connection = connect(...)
except DataStreamDirectError as e:
    print(f"Error: {e.message}")
    if e.error_code:
        print(f"Error Code: {e.error_code}")
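Since every client exception inherits from one base class, a single except clause on the base catches all of them. The sketch below demonstrates that pattern with hypothetical stand-in classes (SketchBaseError and its subclasses); the class bodies and the error code value AUTH_001 are made up for illustration and are not taken from the library.

```python
from typing import Callable, Optional


class SketchBaseError(Exception):
    """Hypothetical stand-in for DataStreamDirectError."""

    def __init__(self, message: str, error_code: Optional[str] = None) -> None:
        super().__init__(message)
        self.message = message
        self.error_code = error_code


class SketchAuthError(SketchBaseError):
    """Hypothetical stand-in for AuthenticationError."""


class SketchQueryError(SketchBaseError):
    """Hypothetical stand-in for QueryError."""


def describe(exc: SketchBaseError) -> str:
    # Include the error code only when one was provided.
    if exc.error_code:
        return f"{type(exc).__name__}: {exc.message} (code {exc.error_code})"
    return f"{type(exc).__name__}: {exc.message}"


def run(action: Callable[[], None]) -> str:
    # One handler on the base class covers every subclass.
    try:
        action()
    except SketchBaseError as exc:
        return describe(exc)
    return "ok"


def failing_login() -> None:
    raise SketchAuthError("bad credentials", error_code="AUTH_001")  # made-up code


def failing_query() -> None:
    raise SketchQueryError("syntax error near SELECT")


print(run(failing_login))
print(run(failing_query))
```

When different failures need different handling, the more specific except clauses go first and the base class last.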
AuthenticationError(DataStreamDirectError)
Raised when authentication fails. This exception is raised when the provided credentials are invalid or when the authentication process fails.
Example:
from datastream_direct import connect, AuthenticationError

try:
    connection = connect(username="user", password="wrong", ...)
except AuthenticationError as e:
    print(f"Authentication failed: {e.message}")
ConnectionError(DataStreamDirectError)
Raised when connection to the API fails. This exception is raised when there are network issues, the service is unavailable, or when attempting to use a closed connection.
Example:
# Note: this import shadows Python's built-in ConnectionError in this module
from datastream_direct import connect, ConnectionError

try:
    connection = connect(...)
    connection.close()
    # Later, try to use closed connection
    cursor = connection.cursor()
except ConnectionError as e:
    print(f"Connection error: {e.message}")
QueryError(DataStreamDirectError)
Raised when query execution fails. This exception is raised when a SQL query fails to execute, either due to syntax errors, permission issues, or other query-related problems.
Example:
from datastream_direct import QueryError

try:
    cursor.execute("INVALID SQL")
except QueryError as e:
    print(f"Query failed: {e.message}")
APIError(DataStreamDirectError)
Raised when the API returns an error response. This exception is raised for general API errors that don't fall into other specific error categories.
Attributes:
  message (str): The error message
  status_code (Optional[HttpStatus]): HTTP status code if available
  error_code (Optional[str]): Optional error code if provided
Example:
from datastream_direct import APIError

try:
    cursor.execute("SELECT * FROM nonexistent")
except APIError as e:
    print(f"API error: {e.message}")
    if e.status_code:
        print(f"Status code: {e.status_code}")
The library provides specific exception classes for different error scenarios:
  DataStreamDirectError: Base exception class
  AuthenticationError: Authentication failures
  ConnectionError: Connection issues
  QueryError: SQL execution failures
  APIError: General API errors
Example:
from datastream_direct import connect, AuthenticationError, QueryError

connection = None
try:
    connection = connect(username="user", password="pass", ...)
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM table")
    results = cursor.fetchall()
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
except QueryError as e:
    print(f"Query failed: {e}")
finally:
    # connect() may have failed before connection was assigned
    if connection is not None:
        connection.close()
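An alternative to a finally block is contextlib.closing from the standard library, which calls close() on exit for any object that has that method. The documentation does not say whether DatastreamDirectConnection supports the with statement directly, so the sketch below wraps a hypothetical StubConnection that exposes only close() and is_closed; the same wrapper should apply to a real connection, assuming close() behaves as documented.

```python
from contextlib import closing


class StubConnection:
    """Hypothetical stand-in exposing only close() and is_closed."""

    def __init__(self) -> None:
        self.is_closed = False

    def close(self) -> None:
        self.is_closed = True


def use_connection(conn: StubConnection) -> None:
    # closing() calls conn.close() on exit, even when the body raises.
    with closing(conn):
        raise RuntimeError("simulated failure inside the with block")


conn = StubConnection()
try:
    use_connection(conn)
except RuntimeError:
    pass

print(conn.is_closed)
```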
The library uses Python's standard logging module. To enable debug logging:
import logging
logging.basicConfig(level=logging.DEBUG)
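Setting the root logger to DEBUG also turns on debug output from every other library in the process. A narrower option is to raise the level on the client's logger only. The logger name "datastream_direct" below is an assumption based on the import name; the documentation does not state which logger name the library uses.

```python
import logging

# Keep the root logger at WARNING so other libraries stay quiet.
logging.basicConfig(level=logging.WARNING)

# "datastream_direct" is an assumed logger name, not confirmed by the docs.
client_logger = logging.getLogger("datastream_direct")
client_logger.setLevel(logging.DEBUG)
```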
Note: For connection configuration details, see the ConnectionConfig class in the API Reference section above.