# iotoolz

`iotoolz` is an improvement over `e2fyi-utils` and is inspired partly by `toolz`.

`iotoolz` is a library that provides a consistent developer experience for interacting with any IO resource. It provides an abstract class `iotoolz.AbcStream` which mimics Python's native `open` very closely (with some additional parameters and methods, such as `save`).

API documentation can be found at https://iotoolz.readthedocs.io/en/latest/.

Change logs are available in CHANGELOG.md.
## Supported streams

Currently, the following streams are supported:

- `iotoolz.FileStream`: wrapper over the built-in `open` function (`file://`)
- `iotoolz.TempStream`: in-memory stream that will roll over to disk (`tmp://`, `temp://`)
- `iotoolz.HttpStream`: HTTP or HTTPS stream implemented with `requests` (`http://`, `https://`)
- `iotoolz.extensions.S3Stream`: S3 stream implemented with `boto3` (`s3://`, `s3a://`, `s3n://`)
- `iotoolz.extensions.MinioStream`: S3 stream implemented with `minio` (`minio://`)
## Installation

```bash
# install the default dependencies only (most lightweight)
pip install iotoolz

# install the dependencies for a specific extension (e.g. boto3)
pip install iotoolz[boto3]

# install all the extras
pip install iotoolz[all]
```
Available extras:

- `all`: all the optional dependencies
- `boto3`: `boto3` for `iotoolz.extensions.S3Stream`
- `minio`: `minio` for `iotoolz.extensions.MinioStream`
- `chardet`: install the LGPL-licensed `chardet` for guessing encoding; otherwise `charset_normalizer` will be used
## Quickstart

The helper object `iotoolz.streams.stream_factory` is a default singleton of `iotoolz.streams.Streams` provided to support most of the common use cases.

`iotoolz.streams.open_stream` (alias `iotoolz.streams.Stream`) is a util method provided by the singleton helper to create a stream object. This method accepts the same arguments as Python's `open` method, with the following additional parameters:

- `data`: optional str or bytes that will be passed into the stream
- `fileobj`: optional file-like object which will be copied into the stream
- `content_type`: optional mime type information to describe the stream (e.g. `application/json`)
- `inmem_size`: determines how much memory to allocate to the stream before rolling over to the local file system. Defaults to no limit (may result in `MemoryError`).
- `schema_kwargs`: optional mapping of schemas to their default kwargs
Basic Setup
from iotoolz.streams import (
set_schema_kwargs,
set_buffer_rollover_size,
)
set_schema_kwargs(
"https",
verify=False,
use_post=True
)
set_schema_kwargs(
"s3",
client=boto3.client(
"s3",
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
aws_session_token=SESSION_TOKEN,
)
)
set_schema_kwargs(
"minio",
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
secure=True,
)
set_buffer_rollover_size(10**8)
### Opening streams

You can open any stream just like Python's built-in `open` method.

```python
import pandas as pd

from iotoolz import open_stream

# print, line by line, the data from a https endpoint
with open_stream(
    "https://foo/bar/data.txt",
    mode="r",
    schema_kwargs={"https": {"verify": False}},  # do not verify ssl cert
) as stream:
    for line in stream:
        print(line)

# POST bytes to a http endpoint
with open_stream("https://foo.bar/api/data", "wb", use_post=True) as stream:
    stream.write(b"hello world")

# copy a local csv file to a s3 bucket
with open_stream("path/to/data.csv", "r") as csv_source, \
     open_stream("s3://bucket/foobar.txt?StorageClass=STANDARD", "w") as s3_sink:
    csv_source.pipe(s3_sink)

# read a csv from s3 into a pandas dataframe
with open_stream("s3://bucket/foobar.csv", "r") as csv:
    df = pd.read_csv(csv)
```
### TempStream

`TempStream` is a stream that functions like a virtual file system in memory.

```python
import gc

from iotoolz import Stream, exists, glob, iter_dir

# create a temp stream (registered while a reference to it exists)
Stream("tmp://foo/bar/data.txt", data="foobar")
exists("tmp://foo/bar/data.txt")  # True

# temp streams are garbage-collected once no reference remains
gc.collect()
exists("tmp://foo/bar/data.txt")  # False

# keep references so the streams stay alive
s1 = Stream("tmp://foo/bar/data.txt", data="foobar")
s2 = Stream("tmp://foo/example.txt", data="...")

# iterate over the streams under tmp://foo/
iter_dir("tmp://foo/")

# match streams against a glob pattern
glob("tmp://foo/bar/*.txt")
```
### Stream-like operations

`Stream` is an alias of `open_stream`; both methods return a concrete `AbcStream` object. You can treat the object as both a "file-like" and "stream-like" object - i.e. you can read, write, seek, flush, and close the object.

> **NOTE:** By default, the underlying buffer is in-memory. You can enable rollover to disk by passing the `inmem_size` arg to the method, or update the default `inmem_size` value with the `iotoolz.streams.set_buffer_rollover_size` method.

```python
from iotoolz import open_stream, Stream, set_buffer_rollover_size

# `Stream` is an alias of `open_stream`
assert open_stream == Stream

# roll over to disk if data exceeds 100 MB
set_buffer_rollover_size(10**8)

stream = Stream(
    "path/to/data.txt",
    mode="rw",  # open for both read and write
)

data = stream.read()
print(stream.encoding)      # the encoding of the stream
print(stream.content_type)  # the mime type of the stream

stream.seek(5)  # seek to offset 5
stream.write("replace with this text")  # overwrite from the current position
stream.seek(0, whence=2)  # seek to eof
stream.write("additional text after original eof")
stream.save()   # save the changes to the actual resource
stream.close()  # close the stream
```
### Path-like operations

`exists`, `mkdir`, `iter_dir` and `glob` are path-like methods that are available on the stream object. These methods mimic their equivalents in `pathlib.Path` where appropriate.

| method | supported streams | description |
| --- | --- | --- |
| `stats` | All Streams | Return the `StreamInfo` for an existing resource. |
| `unlink`, `delete`, `remove` | All Streams | Delete and remove the stream (except for `TempStream`, where the buffer is cleared instead). |
| `exists` | All Streams | Check if a stream points to an existing resource. |
| `mkdir` | `FileStream` | Create a directory. |
| `rmdir` | `FileStream`, `TempStream`, and `S3Stream` | Remove recursively everything in the directory. |
| `iter_dir` | `FileStream`, `TempStream`, and `S3Stream` | Iterate through the streams in the directory. |
| `glob` | `FileStream`, `TempStream`, and `S3Stream` | Iterate through the streams in the directory that match a pattern. |
```python
from iotoolz import Stream, exists, glob, iter_dir, mkdir, rmdir, stats, unlink

# make a directory (only applicable for FileStream)
mkdir("path/to/folder", parents=True, exist_ok=True)
Stream("path/to/folder").mkdir(parents=True, exist_ok=True)

# iterate through the streams inside a directory
iter_dir("s3://bucket/prefix/")
for stream in Stream("s3://bucket/prefix/").iter_dir():
    print(stream.uri)

# iterate through the streams that match a glob pattern
glob("s3://bucket/prefix/*txt")
for stream in Stream("s3://bucket/prefix/").glob("*.txt"):
    print(stream.uri)

# check if a resource exists
exists("s3://bucket/prefix/foo.txt")

# get the StreamInfo for a resource
info = stats("s3://bucket/prefix/foo.txt")
print(info.name)
print(info.content_type)
print(info.encoding)
print(info.last_modified)
print(info.etag)
print(info.extras)

# delete a resource
unlink("s3://bucket/prefix/foo.txt")

# delete everything under a prefix
rmdir("s3://bucket/prefix/")
```
### Piping streams

`pipe` is a method to push data to a sink (similar to NodeJS streams, except it has no watermark or buffering).

```python
from iotoolz.streams import open_stream

local_file = open_stream(
    "path/to/google.html", content_type="text/html", mode="w"
)
temp_file = open_stream(
    "tmp://google.html", content_type="text/html", mode="wb"
)

# pipe the data from the https stream into a temp stream,
# then into a local file
with open_stream("https://google.com") as source:
    source.pipe(temp_file).pipe(local_file)

local_file2 = open_stream(
    "path/to/google1.html", content_type="text/html", mode="w"
)
local_file3 = open_stream(
    "path/to/google2.html", content_type="text/html", mode="w"
)

# any data written to the source is piped to both sinks
with open_stream("tmp://foo_src", mode="w") as source:
    source.pipe(local_file2)
    source.pipe(local_file3)
    source.write("hello world")
```

TODO: support transform streams so that `pipe` can be more useful.
## License