Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

binlog

Package Overview
Dependencies
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

binlog - npm Package Compare versions

Comparing version
0.2.0
to
0.3.0
+220
src/stores/memory.rs
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, Mutex, Weak};
use std::vec::IntoIter as VecIter;
use crate::{utils, Entry, Error, Range, RangeableStore, Store, SubscribeableStore};
use crossbeam_channel::{unbounded, Receiver, Sender};
use string_cache::DefaultAtom as Atom;
// Shared, lock-protected state behind a `MemoryStore`.
#[derive(Clone, Default)]
struct MemoryStoreInternal {
    // Entries keyed by (timestamp, name); several values may share one key,
    // so each key maps to a list of raw value blobs.
    entries: BTreeMap<(i64, Atom), Vec<Vec<u8>>>,
    // Live subscriptions per entry name. Weak references let subscribers
    // whose iterators were dropped be pruned on the next push.
    subscribers: HashMap<Atom, Vec<Weak<MemoryStreamIteratorInternal>>>,
}

/// An in-memory, non-persistent store backed by an ordered map.
#[derive(Clone, Default)]
pub struct MemoryStore(Arc<Mutex<MemoryStoreInternal>>);
impl Store for MemoryStore {
    /// Appends `entry` and fans it out to any live subscribers of its name.
    fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
        let mut internal = self.0.lock().unwrap();
        internal
            .entries
            .entry((entry.timestamp, entry.name.clone()))
            .or_insert_with(Vec::default)
            .push(entry.value.clone());
        if let Some(subscribers) = internal.subscribers.get_mut(&entry.name) {
            let entry = entry.into_owned();
            // Rebuild the subscriber list while notifying, dropping weak
            // references whose iterators have since been dropped.
            let mut new_subscribers = Vec::<Weak<MemoryStreamIteratorInternal>>::default();
            for subscriber in subscribers.drain(..) {
                if let Some(subscriber) = Weak::upgrade(&subscriber) {
                    subscriber.notify(entry.clone());
                    new_subscribers.push(Arc::downgrade(&subscriber));
                }
            }
            *subscribers = new_subscribers;
        }
        Ok(())
    }

    /// Returns the most recent entry with `name`, or `None` if absent.
    /// NOTE(review): keys are ordered by timestamp first, so this scans the
    /// whole map in reverse — O(n) in total entries, not per name.
    fn latest<A: Into<Atom>>(&self, name: A) -> Result<Option<Entry>, Error> {
        let name = name.into();
        let internal = self.0.lock().unwrap();
        for ((map_timestamp, map_name), map_values) in internal.entries.iter().rev() {
            if map_name != &name {
                continue;
            }
            // First match in reverse iteration carries the latest timestamp
            // for this name; the last value is the most recently pushed.
            if let Some(value) = map_values.last() {
                return Ok(Some(Entry::new_with_timestamp(*map_timestamp, name, value.clone())));
            }
        }
        Ok(None)
    }
}
impl RangeableStore for MemoryStore {
type Range = MemoryRange;
fn range<A: Into<Atom>, R: RangeBounds<i64>>(&self, range: R, name: Option<A>) -> Result<Self::Range, Error> {
utils::check_bounds(range.start_bound(), range.end_bound())?;
Ok(Self::Range {
internal: self.0.clone(),
start_bound: range.start_bound().cloned(),
end_bound: range.end_bound().cloned(),
name: name.map(|n| n.into()),
})
}
}
/// A lazily-evaluated range query against a `MemoryStore`; holds a handle to
/// the shared state plus the requested bounds and optional name filter.
pub struct MemoryRange {
    internal: Arc<Mutex<MemoryStoreInternal>>,
    start_bound: Bound<i64>,
    end_bound: Bound<i64>,
    name: Option<Atom>,
}
impl MemoryRange {
    /// Lowest `(timestamp, name)` key that can fall inside this range; the
    /// empty atom sorts before every real name at a given timestamp.
    fn full_start_bound(&self) -> (i64, Atom) {
        match self.start_bound {
            Bound::Included(timestamp) => (timestamp, Atom::from("")),
            // NOTE(review): `timestamp + 1` would overflow on i64::MAX;
            // assumed to be rejected earlier by `utils::check_bounds` — confirm.
            Bound::Excluded(timestamp) => (timestamp + 1, Atom::from("")),
            Bound::Unbounded => (i64::min_value(), Atom::from("")),
        }
    }

    /// Returns true once `timestamp` has moved past the end bound, i.e. an
    /// ascending scan over the ordered map can stop.
    ///
    /// Fixed: the comparisons were inverted (`<=`/`<`), which made every
    /// bounded range break at the first in-range key, producing empty
    /// counts/iterations and no-op removals.
    fn done_iterating_in_range(&self, timestamp: i64) -> bool {
        match self.end_bound {
            Bound::Included(end_bound_timestamp) => timestamp > end_bound_timestamp,
            Bound::Excluded(end_bound_timestamp) => timestamp >= end_bound_timestamp,
            Bound::Unbounded => false,
        }
    }

    /// Returns true when `name` should be skipped: a name filter is set and
    /// this name does not match it.
    fn filter_name_in_range(&self, name: &Atom) -> bool {
        if let Some(ref expected_name) = self.name {
            name != expected_name
        } else {
            false
        }
    }
}
impl Range for MemoryRange {
    type Iter = VecIter<Result<Entry, Error>>;

    /// Counts the values inside the range, honoring the optional name filter.
    fn count(&self) -> Result<u64, Error> {
        let mut count: u64 = 0;
        let internal = self.internal.lock().unwrap();
        for ((timestamp, name), values) in internal.entries.range(self.full_start_bound()..) {
            // Keys are ordered by timestamp, so stop at the first key the
            // helper reports as past the end bound.
            if self.done_iterating_in_range(*timestamp) {
                break;
            }
            if self.filter_name_in_range(name) {
                continue;
            }
            count += values.len() as u64;
        }
        Ok(count)
    }

    /// Removes every key that falls inside the range. Keys are collected
    /// first because the map cannot be mutated while being iterated.
    fn remove(self) -> Result<(), Error> {
        let mut removeable_keys = Vec::default();
        let mut internal = self.internal.lock().unwrap();
        for ((timestamp, name), _values) in internal.entries.range(self.full_start_bound()..) {
            if self.done_iterating_in_range(*timestamp) {
                break;
            }
            if self.filter_name_in_range(name) {
                continue;
            }
            removeable_keys.push((*timestamp, name.clone()));
        }
        for key in removeable_keys {
            internal.entries.remove(&key);
        }
        Ok(())
    }

    /// Snapshots the matching entries into a vector and returns an iterator
    /// over it; results do not reflect later mutations of the store.
    fn iter(self) -> Result<Self::Iter, Error> {
        let mut returnable_entries = Vec::default();
        let internal = self.internal.lock().unwrap();
        for ((timestamp, name), values) in internal.entries.range(self.full_start_bound()..) {
            if self.done_iterating_in_range(*timestamp) {
                break;
            }
            if self.filter_name_in_range(name) {
                continue;
            }
            for value in values.iter() {
                returnable_entries.push(Ok(Entry::new_with_timestamp(*timestamp, name.clone(), value.clone())));
            }
        }
        Ok(returnable_entries.into_iter())
    }
}
impl SubscribeableStore for MemoryStore {
    type Subscription = MemoryStreamIterator;

    /// Registers a subscription for `name`. The store keeps only a weak
    /// reference to the sender, so dropping the returned iterator ends the
    /// subscription (it is pruned on a subsequent push).
    fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> {
        let (tx, rx) = unbounded();
        let iterator_internal = Arc::new(MemoryStreamIteratorInternal { tx });
        let mut internal = self.0.lock().unwrap();
        internal
            .subscribers
            .entry(name.into())
            .or_insert_with(Vec::default)
            .push(Arc::downgrade(&iterator_internal));
        Ok(MemoryStreamIterator {
            _internal: iterator_internal,
            rx,
        })
    }
}
// Sender half of a subscription; the store holds this only weakly.
struct MemoryStreamIteratorInternal {
    tx: Sender<Entry>,
}

impl MemoryStreamIteratorInternal {
    // Forwards a pushed entry to the subscriber's channel. The iterator owns
    // the receiver and (via Arc) this sender, so while the store can still
    // upgrade its weak reference the channel cannot be disconnected.
    fn notify(&self, entry: Entry) {
        self.tx.send(entry).unwrap();
    }
}

/// A blocking iterator over entries pushed after `subscribe` was called.
#[derive(Clone)]
pub struct MemoryStreamIterator {
    // Kept alive so the store's weak reference stays upgradable.
    _internal: Arc<MemoryStreamIteratorInternal>,
    rx: Receiver<Entry>,
}

impl Iterator for MemoryStreamIterator {
    type Item = Result<Entry, Error>;

    /// Blocks until the next entry arrives. Never yields `None`: this
    /// iterator itself keeps the sender alive, so `recv` cannot disconnect.
    fn next(&mut self) -> Option<Self::Item> {
        let value = self.rx.recv().unwrap();
        Some(Ok(value))
    }
}
#[cfg(test)]
mod tests {
    // Shared trait-conformance suites, generated by the crate's test macros.
    use crate::{define_test, test_rangeable_store_impl, test_store_impl, test_subscribeable_store_impl, MemoryStore};
    test_store_impl!(MemoryStore::default());
    test_rangeable_store_impl!(MemoryStore::default());
    test_subscribeable_store_impl!(MemoryStore::default());
}

#[cfg(test)]
#[cfg(feature = "benches")]
mod benches {
    // Shared benchmark suites; only built with the `benches` feature.
    use crate::{bench_rangeable_store_impl, bench_store_impl, define_bench, MemoryStore};
    bench_store_impl!(MemoryStore::default());
    bench_rangeable_store_impl!(MemoryStore::default());
}
// Store implementations. The redis and sqlite backends are gated behind
// cargo features so their heavyweight dependencies stay optional.
pub mod memory;
#[cfg(feature = "redis-store")]
pub mod redis;
#[cfg(feature = "sqlite-store")]
pub mod sqlite;
pub mod traits;
use std::borrow::Cow;
use std::error::Error as StdError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use crate::{Entry, Error, Store, SubscribeableStore};
use byteorder::{ByteOrder, LittleEndian};
use crossbeam_channel::{unbounded, Receiver, Sender};
use redis::streams::{StreamId, StreamMaxlen, StreamRangeReply, StreamReadOptions, StreamReadReply};
use redis::{Client, Cmd, Commands, Connection, ConnectionLike, IntoConnectionInfo, RedisError, Value};
use string_cache::DefaultAtom as Atom;
// How long a blocking XREAD waits before returning an empty reply (ms); this
// also bounds how quickly the listener thread can notice a shutdown request.
static STREAM_READ_BLOCK_MS: usize = 1000;
// Maximum number of idle connections kept around for reuse.
static CONN_POOL_MAX_COUNT: usize = 4;

// Redis failures are surfaced uniformly as database errors.
impl From<RedisError> for Error {
    fn from(err: RedisError) -> Self {
        Error::Database(Box::new(err))
    }
}

/// Redis stream key for a given entry name; versioned so the layout can be
/// changed without colliding with older data.
fn redis_channel(name: &Atom) -> String {
    format!("binlog:stream:v0:{}", name)
}

/// Wraps `msg` in an `InvalidData` I/O error.
fn invalid_data_err<E: Into<Box<dyn StdError + Send + Sync>>>(msg: E) -> Error {
    IoError::new(IoErrorKind::InvalidData, msg).into()
}

/// Error used when a reply does not match the expected stream entry layout.
fn unexpected_data_format() -> Error {
    invalid_data_err("unexpected data format received from redis")
}
/// Decodes one stream entry into an `Entry`. The `timestamp` field is stored
/// as little-endian i64 bytes and `value` as raw bytes (see `push`); any
/// other shape yields an "unexpected data format" error.
fn entry_from_stream_id(stream_id: &StreamId, name: Atom) -> Result<Entry, Error> {
    let (timestamp, value) = match (stream_id.map.get("timestamp"), stream_id.map.get("value")) {
        (Some(Value::Data(timestamp_bytes)), Some(Value::Data(value_bytes))) => {
            (LittleEndian::read_i64(timestamp_bytes), value_bytes)
        }
        _ => {
            return Err(unexpected_data_format());
        }
    };
    Ok(Entry::new_with_timestamp(timestamp, name, value.clone()))
}
/// A store backed by redis streams (XADD / XREAD / XREVRANGE).
#[derive(Clone)]
pub struct RedisStreamStore {
    client: Client,
    // Idle connections available for reuse, capped at CONN_POOL_MAX_COUNT.
    conn_pool: Arc<Mutex<Vec<Connection>>>,
    // Streams are trimmed approximately to this length on every push.
    max_stream_len: StreamMaxlen,
}

impl RedisStreamStore {
    /// Creates a store from an existing client. Streams are trimmed to
    /// roughly `max_stream_len` entries (XADD with approximate MAXLEN).
    pub fn new_with_client(client: Client, max_stream_len: usize) -> Self {
        Self {
            client,
            conn_pool: Arc::new(Mutex::new(Vec::default())),
            max_stream_len: StreamMaxlen::Approx(max_stream_len),
        }
    }

    /// Connects to redis using `params` (e.g. a `redis://` URL).
    pub fn new<T: IntoConnectionInfo>(params: T, max_stream_len: usize) -> Result<Self, Error> {
        Ok(Self::new_with_client(Client::open(params)?, max_stream_len))
    }

    /// Runs `f` with a pooled connection, creating a fresh one if the pool
    /// is empty, and returns the connection to the pool on success.
    fn with_connection<T, F>(&self, f: F) -> Result<T, Error>
    where
        F: FnOnce(&mut Connection) -> Result<T, Error>,
    {
        let mut conn = {
            let mut conn_pool = self.conn_pool.lock().unwrap();
            if let Some(conn) = conn_pool.pop() {
                conn
            } else {
                self.client.get_connection()?
            }
        };
        // It's possible that the connection is in a bad state, so don't return
        // it to the pool if an error occurred.
        let result = f(&mut conn)?;
        let mut conn_pool = self.conn_pool.lock().unwrap();
        if conn_pool.len() < CONN_POOL_MAX_COUNT {
            conn_pool.push(conn);
        }
        Ok(result)
    }
}
impl Store for RedisStreamStore {
    /// XADDs the entry to its name's stream, trimming the stream to the
    /// configured approximate maximum length. The timestamp is encoded as
    /// little-endian i64 bytes (mirrored by `entry_from_stream_id`).
    fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
        let channel = redis_channel(&entry.name);
        let mut timestamp_bytes = [0; 8];
        LittleEndian::write_i64(&mut timestamp_bytes, entry.timestamp);
        let cmd = Cmd::xadd_maxlen(
            channel,
            self.max_stream_len,
            "*",
            &[
                ("timestamp", timestamp_bytes.as_slice()),
                ("value", entry.value.as_slice()),
            ],
        );
        self.with_connection(|conn| {
            conn.req_command(&cmd)?;
            Ok(())
        })
    }

    /// Fetches the newest stream entry for `name` via `XREVRANGE ... COUNT 1`.
    fn latest<A: Into<Atom>>(&self, name: A) -> Result<Option<Entry>, Error> {
        let name = name.into();
        let channel = redis_channel(&name);
        let reply: StreamRangeReply = self.with_connection(move |conn| {
            let value = conn.xrevrange_count(channel, "+", "-", 1i8)?;
            Ok(value)
        })?;
        // COUNT 1 guarantees at most one id in the reply.
        debug_assert!(reply.ids.len() <= 1);
        if reply.ids.is_empty() {
            Ok(None)
        } else {
            match entry_from_stream_id(&reply.ids[0], name) {
                Ok(entry) => Ok(Some(entry)),
                Err(err) => Err(err),
            }
        }
    }
}
impl SubscribeableStore for RedisStreamStore {
    type Subscription = RedisStreamIterator;

    /// Starts a subscription on a dedicated connection: the listener issues
    /// blocking XREADs, so it cannot share the pooled connections.
    fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> {
        let conn = self.client.get_connection()?;
        RedisStreamIterator::new(conn, name.into())
    }
}
/// A blocking iterator over entries pushed to a redis stream after the
/// subscription started; entries are produced by a background listener thread.
pub struct RedisStreamIterator {
    // Signals the listener thread to exit on its next XREAD timeout.
    shutdown: Arc<AtomicBool>,
    // `Option`s so `drop` can take ownership of both halves.
    rx: Option<Receiver<Result<Entry, Error>>>,
    listener_thread: Option<thread::JoinHandle<()>>,
}

impl RedisStreamIterator {
    // Spawns the listener thread that feeds `rx`.
    fn new(conn: Connection, name: Atom) -> Result<Self, Error> {
        let (tx, rx) = unbounded::<Result<Entry, Error>>();
        let shutdown = Arc::new(AtomicBool::new(false));
        let listener_thread = {
            let shutdown = shutdown.clone();
            thread::spawn(|| stream_listener(conn, name, tx, shutdown))
        };
        Ok(RedisStreamIterator {
            shutdown,
            rx: Some(rx),
            listener_thread: Some(listener_thread),
        })
    }
}

impl Drop for RedisStreamIterator {
    /// Requests shutdown, drops the receiver so pending sends fail fast,
    /// then joins the listener thread. May block for up to one XREAD
    /// timeout (STREAM_READ_BLOCK_MS) while the listener notices.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Relaxed);
        let rx = self.rx.take().unwrap();
        drop(rx);
        let listener_thread = self.listener_thread.take().unwrap();
        listener_thread.join().unwrap();
    }
}

impl Iterator for RedisStreamIterator {
    type Item = Result<Entry, Error>;

    /// Blocks for the next entry; yields `None` once the listener thread
    /// has exited and dropped its sender.
    fn next(&mut self) -> Option<Self::Item> {
        match self.rx.as_ref().unwrap().recv() {
            Ok(value) => Some(value),
            Err(_) => None,
        }
    }
}
/// Background loop for `RedisStreamIterator`: repeatedly issues blocking
/// XREADs and forwards decoded entries (or errors) over `tx`. Exits when the
/// receiver has been dropped or `shutdown` is observed.
fn stream_listener(mut conn: Connection, name: Atom, tx: Sender<Result<Entry, Error>>, shutdown: Arc<AtomicBool>) {
    let channels = vec![redis_channel(&name)];
    // "$" asks redis for entries added after this listener started.
    let mut last_id = "$".to_string();
    let opts = StreamReadOptions::default().block(STREAM_READ_BLOCK_MS);
    loop {
        let reply: StreamReadReply = match conn.xread_options(&channels, &[&last_id], &opts) {
            Ok(reply) => reply,
            Err(err) => {
                // A failed send means the receiver is gone; stop either way
                // if shutdown was requested.
                if tx.send(Err(err.into())).is_err() || shutdown.load(Ordering::Relaxed) {
                    return;
                } else {
                    continue;
                }
            }
        };
        // Shutdown is only polled on an empty (timed-out) reply, so exit
        // latency is bounded by STREAM_READ_BLOCK_MS.
        if reply.keys.is_empty() && shutdown.load(Ordering::Relaxed) {
            return;
        }
        for stream_key in reply.keys {
            for stream_id in stream_key.ids {
                if tx.send(entry_from_stream_id(&stream_id, name.clone())).is_err() {
                    return;
                }
                // Resume after the last delivered id on the next XREAD.
                last_id = stream_id.id;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    // NOTE(review): these tests require a redis server at localhost:6379.
    use crate::{define_test, test_store_impl, test_subscribeable_store_impl, RedisStreamStore};
    test_store_impl!(RedisStreamStore::new("redis://localhost:6379", 100).unwrap());
    test_subscribeable_store_impl!(RedisStreamStore::new("redis://localhost:6379", 100).unwrap());
}

#[cfg(test)]
#[cfg(feature = "benches")]
mod benches {
    // Benchmarks also require a local redis server.
    use crate::{bench_store_impl, define_bench, RedisStreamStore};
    bench_store_impl!(RedisStreamStore::new("redis://localhost:6379", 100).unwrap());
}
use std::borrow::Cow;
use std::collections::VecDeque;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use std::vec::IntoIter as VecIter;
use crate::{utils, Entry, Error, Range, RangeableStore, Store};
use r2d2::{Error as R2d2Error, Pool};
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{params, params_from_iter, Error as SqliteError, OptionalExtension, ParamsFromIter};
use string_cache::DefaultAtom as Atom;
use zstd::bulk::{compress, Decompressor};
// Schema applied on every store construction; both statements must be
// idempotent so reopening an existing database succeeds. Fixed: the index
// DDL lacked `if not exists`, which made the second open of the same
// database file fail with "index idx_log_ts already exists".
static SCHEMA: &str = r#"
create table if not exists log (
    id integer primary key,
    ts integer not null,
    name text not null,
    size integer not null,
    value blob not null
);
create index if not exists idx_log_ts on log(ts);
"#;
// Do not compress entries smaller than this size
static MIN_SIZE_TO_COMPRESS: usize = 32;
static DEFAULT_COMPRESSION_LEVEL: i32 = 1;
// Rows fetched per page when iterating a range.
static PAGINATION_LIMIT: usize = 1000;
// Database-layer failures from rusqlite and r2d2 are wrapped uniformly.
impl From<SqliteError> for Error {
    fn from(err: SqliteError) -> Self {
        Error::Database(Box::new(err))
    }
}

impl From<R2d2Error> for Error {
    fn from(err: R2d2Error) -> Self {
        Error::Database(Box::new(err))
    }
}
/// Reconstructs an `Entry` from a row. A stored `size > 0` marks the blob as
/// zstd-compressed, with `size` being its decompressed length; `size == 0`
/// means the blob was stored uncompressed (see `push`).
fn entry_from_row<S: Into<Atom>>(
    decompressor: &mut Decompressor<'_>,
    timestamp: i64,
    name: S,
    size: usize,
    blob: Vec<u8>,
) -> Result<Entry, Error> {
    if size > 0 {
        let blob_decompressed = decompressor.decompress(&blob, size)?;
        Ok(Entry::new_with_timestamp(timestamp, name.into(), blob_decompressed))
    } else {
        Ok(Entry::new_with_timestamp(timestamp, name.into(), blob))
    }
}
// Renders the shared WHERE clause for range queries over the log table.
struct StatementBuilder {
    start_bound: Bound<i64>,
    end_bound: Bound<i64>,
    name: Option<Atom>,
}

impl StatementBuilder {
    // Captures the bounds and optional name filter of a range request.
    fn new<R: RangeBounds<i64>>(range: R, name: Option<Atom>) -> StatementBuilder {
        Self {
            start_bound: range.start_bound().cloned(),
            end_bound: range.end_bound().cloned(),
            name,
        }
    }

    /// Bind parameters matching `statement`: just the name, when one is set.
    fn params(&self) -> ParamsFromIter<VecIter<String>> {
        if let Some(name) = &self.name {
            params_from_iter(vec![name.to_string()].into_iter())
        } else {
            params_from_iter(vec![].into_iter())
        }
    }

    /// Renders `prefix [where ...] suffix`. Timestamp bounds are interpolated
    /// directly — safe from injection since they are `i64`s — while the name
    /// goes through a `?` placeholder (see `params`).
    fn statement<'a>(&self, prefix: &'a str, suffix: &'a str) -> Cow<'a, str> {
        let mut clauses = Vec::new();
        match self.start_bound {
            Bound::Included(s) => clauses.push(format!("ts >= {}", s)),
            Bound::Excluded(s) => clauses.push(format!("ts > {}", s)),
            Bound::Unbounded => {}
        }
        match self.end_bound {
            Bound::Included(e) => clauses.push(format!("ts <= {}", e)),
            Bound::Excluded(e) => clauses.push(format!("ts < {}", e)),
            Bound::Unbounded => {}
        }
        if self.name.is_some() {
            clauses.push("name = ?".to_string());
        }
        let where_clause = if clauses.is_empty() {
            "".to_string()
        } else {
            format!("where {}", clauses.join(" and "))
        };
        // Borrow the prefix unchanged in the fully-unconstrained case to
        // avoid an allocation.
        if where_clause.is_empty() && suffix.is_empty() {
            Cow::Borrowed(prefix)
        } else {
            Cow::Owned(format!("{} {} {}", prefix, where_clause, suffix))
        }
    }
}
/// A store backed by a sqlite database, with zstd compression of values at
/// or above `MIN_SIZE_TO_COMPRESS` bytes. Cloning shares the pool.
#[derive(Clone)]
pub struct SqliteStore {
    pool: Pool<SqliteConnectionManager>,
    compression_level: i32,
}
impl SqliteStore {
    /// Creates a store backed by `pool`, enabling WAL2 journaling and
    /// applying the schema. `compression_level` defaults to
    /// `DEFAULT_COMPRESSION_LEVEL` when `None`.
    pub fn new_with_pool(pool: Pool<SqliteConnectionManager>, compression_level: Option<i32>) -> Result<Self, Error> {
        {
            let conn = pool.get()?;
            conn.pragma_update(None, "journal_mode", "wal2")?;
            // Fixed: SCHEMA contains multiple statements, which rusqlite's
            // `execute` rejects (it compiles a single statement and errors on
            // trailing SQL); `execute_batch` runs the whole script.
            conn.execute_batch(SCHEMA)?;
        }
        Ok(Self {
            pool,
            compression_level: compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL),
        })
    }

    /// Opens (or creates) the database at `path` with a fresh pool.
    pub fn new<P: AsRef<Path>>(path: P, compression_level: Option<i32>) -> Result<Self, Error> {
        let manager = SqliteConnectionManager::file(path);
        let pool = r2d2::Pool::new(manager)?;
        Self::new_with_pool(pool, compression_level)
    }
}
impl Store for SqliteStore {
    /// Inserts `entry`, zstd-compressing values of at least
    /// `MIN_SIZE_TO_COMPRESS` bytes. The stored `size` column is 0 for
    /// uncompressed values, otherwise the original (decompressed) length —
    /// the convention `entry_from_row` relies on when reading.
    fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
        let (blob_compressed, size) = if entry.value.len() >= MIN_SIZE_TO_COMPRESS {
            (compress(&entry.value, self.compression_level)?, entry.value.len())
        } else {
            (Vec::default(), 0)
        };
        let blob_ref = if blob_compressed.is_empty() {
            &entry.value
        } else {
            &blob_compressed
        };
        let conn = self.pool.get()?;
        let mut stmt = conn.prepare_cached("insert into log (ts, name, size, value) values (?, ?, ?, ?)")?;
        stmt.execute(params![entry.timestamp, entry.name.as_ref(), size, blob_ref])?;
        Ok(())
    }

    /// Returns the most recent entry with `name`, or `None` if absent.
    fn latest<A: Into<Atom>>(&self, name: A) -> Result<Option<Entry>, Error> {
        let name = name.into();
        let conn = self.pool.get()?;
        // `limit 1` added: `query_row` only ever consumes the first row, so
        // bound the statement explicitly rather than preparing an unbounded
        // ordered scan.
        let mut stmt =
            conn.prepare_cached("select ts, size, value from log where name = ? order by ts desc limit 1")?;
        let row = stmt
            .query_row(params![name.as_ref()], |row| {
                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
            })
            .optional()?;
        if let Some((timestamp, size, blob)) = row {
            let mut decompressor = Decompressor::new()?;
            let entry = entry_from_row(&mut decompressor, timestamp, name, size, blob)?;
            Ok(Some(entry))
        } else {
            Ok(None)
        }
    }
}
impl RangeableStore for SqliteStore {
    type Range = SqliteRange;

    /// Returns a lazy SQL-backed view over `range`, optionally filtered by
    /// `name`. Bounds are validated up front; no query runs yet.
    fn range<A: Into<Atom>, R: RangeBounds<i64>>(&self, range: R, name: Option<A>) -> Result<Self::Range, Error> {
        utils::check_bounds(range.start_bound(), range.end_bound())?;
        Ok(SqliteRange {
            pool: self.pool.clone(),
            statement_builder: StatementBuilder::new(range, name.map(|n| n.into())),
        })
    }
}
/// A lazily-evaluated range query against a `SqliteStore`.
pub struct SqliteRange {
    pool: Pool<SqliteConnectionManager>,
    statement_builder: StatementBuilder,
}

impl Range for SqliteRange {
    type Iter = SqliteRangeIterator;

    /// Counts matching rows with a single `select count(id)` query.
    fn count(&self) -> Result<u64, Error> {
        let conn = self.pool.get()?;
        let mut stmt = conn.prepare(&self.statement_builder.statement("select count(id) from log", ""))?;
        let len: u64 = stmt.query_row(self.statement_builder.params(), |row| row.get(0))?;
        Ok(len)
    }

    /// Deletes all matching rows.
    fn remove(self) -> Result<(), Error> {
        let conn = self.pool.get()?;
        let mut stmt = conn.prepare(&self.statement_builder.statement("delete from log", ""))?;
        stmt.execute(self.statement_builder.params())?;
        Ok(())
    }

    /// Converts into a paginating iterator; no query runs until `next`.
    fn iter(self) -> Result<Self::Iter, Error> {
        Ok(SqliteRangeIterator {
            pool: self.pool,
            statement_builder: self.statement_builder,
            entries: VecDeque::default(),
            offset: 0,
            done: false,
        })
    }
}
/// Iterates range results in pages of `PAGINATION_LIMIT` rows, ordered by
/// timestamp.
pub struct SqliteRangeIterator {
    pool: Pool<SqliteConnectionManager>,
    statement_builder: StatementBuilder,
    // Buffered rows from the most recently fetched page.
    entries: VecDeque<Entry>,
    // Row offset of the next page to fetch.
    offset: usize,
    // Set once a short page indicates no further rows exist.
    done: bool,
}

impl SqliteRangeIterator {
    /// Fetches the next page of rows into `entries`.
    /// NOTE(review): offset-based pagination assumes matching rows are not
    /// removed between pages; concurrent deletion could shift offsets and
    /// skip rows — confirm whether callers rely on that.
    fn fill_entries(&mut self) -> Result<(), Error> {
        let conn = self.pool.get()?;
        let mut stmt = conn.prepare(&self.statement_builder.statement(
            "select ts, name, size, value from log",
            &format!("order by ts limit {} offset {}", PAGINATION_LIMIT, self.offset),
        ))?;
        let mut rows = stmt.query(self.statement_builder.params())?;
        let mut decompressor = Decompressor::new()?;
        let mut added = 0;
        while let Some(row) = rows.next()? {
            let timestamp: i64 = row.get(0)?;
            let name: String = row.get(1)?;
            let size: usize = row.get(2)?;
            let blob: Vec<u8> = row.get(3)?;
            self.entries
                .push_back(entry_from_row(&mut decompressor, timestamp, name, size, blob)?);
            added += 1;
        }
        // A partial page means we have reached the end of the results.
        if added < PAGINATION_LIMIT {
            self.done = true;
        }
        self.offset += PAGINATION_LIMIT;
        Ok(())
    }
}
impl Iterator for SqliteRangeIterator {
    type Item = Result<Entry, Error>;

    /// Yields buffered rows, fetching the next page on demand; a failed
    /// fetch is surfaced as an `Err` item.
    fn next(&mut self) -> Option<Self::Item> {
        let buffer_exhausted = self.entries.is_empty();
        if buffer_exhausted && !self.done {
            match self.fill_entries() {
                Ok(()) => {}
                Err(err) => return Some(Err(err)),
            }
        }
        match self.entries.pop_front() {
            Some(entry) => Some(Ok(entry)),
            None => None,
        }
    }
}
#[cfg(test)]
mod tests {
    // Each suite runs against a fresh temporary database file.
    use crate::{define_test, test_rangeable_store_impl, test_store_impl, SqliteStore};
    use tempfile::NamedTempFile;
    test_store_impl!({
        let file = NamedTempFile::new().unwrap().into_temp_path();
        SqliteStore::new(file, None).unwrap()
    });
    test_rangeable_store_impl!({
        let file = NamedTempFile::new().unwrap().into_temp_path();
        SqliteStore::new(file, None).unwrap()
    });
}

#[cfg(test)]
#[cfg(feature = "benches")]
mod benches {
    // Benchmarks likewise use throwaway temporary databases.
    use crate::{bench_rangeable_store_impl, bench_store_impl, define_bench, SqliteStore};
    use tempfile::NamedTempFile;
    bench_store_impl!({
        let file = NamedTempFile::new().unwrap().into_temp_path();
        SqliteStore::new(file, None).unwrap()
    });
    bench_rangeable_store_impl!({
        let file = NamedTempFile::new().unwrap().into_temp_path();
        SqliteStore::new(file, None).unwrap()
    });
}
use std::borrow::Cow;
use std::ops::RangeBounds;
use crate::{Entry, Error};
use string_cache::DefaultAtom as Atom;
/// The base trait implemented by every store backend.
pub trait Store: Send + Sync {
    /// Appends an entry to the log.
    fn push(&self, entry: Cow<Entry>) -> Result<(), Error>;
    /// Returns the most recent entry with the given name, if any.
    fn latest<A: Into<Atom>>(&self, name: A) -> Result<Option<Entry>, Error>;
}

/// Stores that support timestamp-range queries.
pub trait RangeableStore: Store {
    type Range: Range;
    /// Returns a lazy view over `range`, optionally filtered by `name`.
    fn range<A: Into<Atom>, R: RangeBounds<i64>>(&self, range: R, name: Option<A>) -> Result<Self::Range, Error>;
}

/// A queryable view produced by `RangeableStore::range`.
pub trait Range {
    type Iter: Iterator<Item = Result<Entry, Error>>;
    /// Number of entries in the range.
    fn count(&self) -> Result<u64, Error>;
    /// Deletes every entry in the range, consuming the view.
    fn remove(self) -> Result<(), Error>;
    /// Consumes the view, yielding its entries.
    fn iter(self) -> Result<Self::Iter, Error>;
}

/// Stores that support live subscriptions to newly pushed entries.
pub trait SubscribeableStore: Store {
    type Subscription: Iterator<Item = Result<Entry, Error>>;
    /// Subscribes to entries subsequently pushed with the given name.
    fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error>;
}
+1
-1

@@ -35,3 +35,3 @@ # This file is automatically @generated by Cargo.

name = "binlog"
version = "0.2.0"
version = "0.3.0"
dependencies = [

@@ -38,0 +38,0 @@ "byteorder",

[package]
name = "binlog"
version = "0.2.0"
version = "0.3.0"
edition = "2021"

@@ -5,0 +5,0 @@ authors = ["Yusuf Simonson <simonson@gmail.com>"]

# Changelog
## 0.2.0 (not yet released)
## 0.3.0 (4/7/2022)
* Merged the python and rust libraries into one
* Rust library
* Support for getting the latest value (PR #30)
* Accept `Into<Atom>` rather than `Atom`, providing a more ergonomic interface (PR #36)
* Fixed range check error found by the fuzzer (PR #35)
* Hide test macros from the docs (PR #32)
* Reorganize code (PR #31, #27)
* Python library
* Added support for sqlite ranges (PR #37)
* Release the GIL where possible for better performance (PR #37)
* Sqlite store
* Removed the global compressor/decompressor for better multithreaded performance (PR #34)
## 0.2.0 (4/6/2022)
* Redis streaming store (PR #22)
* Python interface for the redis streaming store (PR #23)
* Use `i64`s for timestamps instead of `Duration` (PR #19)
* Split up the store trait (PR #18)
* Merged the python and rust libraries into one (PR #7)
* Added tests, CI (PR #9)
* Added examples (PR #12)
* Added benchmarks (PR #16)
* Added `Cargo.lock` since the package can now be built as a cdylib
* Added tests
* Added examples
* Added CI
* Python: Support for pushing to sqlite stores
* Sqlite: Enabled WAL2 mode (PR #17)

@@ -12,0 +31,0 @@ ## 0.1.0 (3/31/2022)

use binlog::{Entry, Error, Range, RangeableStore, SqliteStore, Store};
use std::borrow::Cow;
use string_cache::Atom;
use string_cache::DefaultAtom as Atom;

@@ -20,3 +20,3 @@ /// Demonstrates the sqlite store, with results in `example.db`. You may want to delete that before

// Queries are done via `range`. Here we grab entries with any timestamp and any name.
let range = store.range(.., None)?;
let range = store.range(.., Option::<String>::None)?;
// Count the number of entries.

@@ -23,0 +23,0 @@ println!("initial count: {}", range.count()?);

Metadata-Version: 2.1
Name: binlog
Version: 0.2.0
Version: 0.3.0
Classifier: Programming Language :: Rust

@@ -8,2 +8,3 @@ Classifier: Programming Language :: Python :: Implementation :: CPython

Requires-Dist: cffi
License-File: LICENSE
Summary: A binary data log library

@@ -23,5 +24,5 @@ Keywords: database

A rust library for creating and managing logs of arbitrary binary data. Presently it's used to collect sensor data on a robot. But should be generally helpful in cases where you need to store timeseries data, in a nearly (but not strictly) append-only fashion.
A rust library for creating and managing logs of arbitrary binary data. Presently it's used to collect sensor data. But it should generally be helpful in cases where you need to store timeseries data, in a nearly (but not strictly) append-only fashion.
The underlying storage of logs are pluggable via a few [traits](https://github.com/ysimonson/binlog/blob/main/src/traits.rs). Binlog includes built-in implementations via sqlite, redis, and in-memory-only. Additionally, python bindings allow you to use (a subset of) binlog from python.
The underlying storage of logs are pluggable via a few [traits](https://github.com/ysimonson/binlog/blob/main/src/stores/traits.rs). Binlog includes built-in implementations via sqlite, redis, and in-memory-only. Additionally, python bindings allow you to use (a subset of) binlog from python.

@@ -54,3 +55,3 @@ ## Usage

// Queries are done via `range`. Here we grab entries with any timestamp and any name.
let range = store.range(.., None)?;
let range = store.range(.., Option::<String>::None)?;
// Count the number of entries.

@@ -89,3 +90,3 @@ println!("initial count: {}", range.count()?);

Stores implement the [`Store` trait, and zero or more optional extensions](https://github.com/ysimonson/binlog/blob/main/src/traits.rs) depending on their supported functionality. A few stores implementations are built-in to `binlog`:
Stores implement the [`Store` trait, and zero or more optional extensions](https://github.com/ysimonson/binlog/blob/main/src/stores/traits.rs) depending on their supported functionality. A few stores implementations are built-in to `binlog`:

@@ -92,0 +93,0 @@ ### In-memory-only

@@ -13,4 +13,2 @@ import tempfile

sub_entry = next(sub)
print(dir(entry))
print(dir(sub_entry))
assert entry.timestamp == sub_entry.timestamp

@@ -17,0 +15,0 @@ assert entry.name == sub_entry.name

@@ -7,5 +7,5 @@ # binlog

A rust library for creating and managing logs of arbitrary binary data. Presently it's used to collect sensor data on a robot. But should be generally helpful in cases where you need to store timeseries data, in a nearly (but not strictly) append-only fashion.
A rust library for creating and managing logs of arbitrary binary data. Presently it's used to collect sensor data. But it should generally be helpful in cases where you need to store timeseries data, in a nearly (but not strictly) append-only fashion.
The underlying storage of logs are pluggable via a few [traits](https://github.com/ysimonson/binlog/blob/main/src/traits.rs). Binlog includes built-in implementations via sqlite, redis, and in-memory-only. Additionally, python bindings allow you to use (a subset of) binlog from python.
The underlying storage of logs are pluggable via a few [traits](https://github.com/ysimonson/binlog/blob/main/src/stores/traits.rs). Binlog includes built-in implementations via sqlite, redis, and in-memory-only. Additionally, python bindings allow you to use (a subset of) binlog from python.

@@ -38,3 +38,3 @@ ## Usage

// Queries are done via `range`. Here we grab entries with any timestamp and any name.
let range = store.range(.., None)?;
let range = store.range(.., Option::<String>::None)?;
// Count the number of entries.

@@ -73,3 +73,3 @@ println!("initial count: {}", range.count()?);

Stores implement the [`Store` trait, and zero or more optional extensions](https://github.com/ysimonson/binlog/blob/main/src/traits.rs) depending on their supported functionality. A few stores implementations are built-in to `binlog`:
Stores implement the [`Store` trait, and zero or more optional extensions](https://github.com/ysimonson/binlog/blob/main/src/stores/traits.rs) depending on their supported functionality. A few stores implementations are built-in to `binlog`:

@@ -76,0 +76,0 @@ ### In-memory-only

@@ -6,6 +6,7 @@ use std::borrow::Cow;

use string_cache::Atom;
use string_cache::DefaultAtom as Atom;
use test::Bencher;
/// Defines a benchmark function.
#[doc(hidden)]
#[macro_export]

@@ -22,2 +23,3 @@ macro_rules! define_bench {

#[doc(hidden)]
#[macro_export]

@@ -28,5 +30,7 @@ macro_rules! bench_store_impl {

define_bench!(push_parallel, $code);
define_bench!(latest, $code);
};
}
#[doc(hidden)]
#[macro_export]

@@ -40,3 +44,3 @@ macro_rules! bench_rangeable_store_impl {

pub fn push<S: Store>(b: &mut Bencher, store: &S) {
let entry = Entry::new_with_timestamp(1, Atom::from("bench_push"), vec![1, 2, 3]);
let entry = Entry::new_with_timestamp(1, "bench_push", vec![1, 2, 3]);
b.iter(|| {

@@ -53,5 +57,6 @@ store.push(Cow::Borrowed(&entry)).unwrap();

threads.push(thread::spawn(move || {
for j in 1..1001 {
let name = Atom::from("bench_push_parallel");
for j in 1..101 {
let idx = i * j;
let entry = Entry::new_with_timestamp(idx, Atom::from("bench_push_parallel"), vec![1, 2, 3]);
let entry = Entry::new_with_timestamp(idx, name.clone(), vec![1, 2, 3]);
store.push(Cow::Owned(entry)).unwrap();

@@ -67,10 +72,24 @@ }

pub fn latest<S: Store + Clone + 'static>(b: &mut Bencher, store: &S) {
let name = Atom::from("bench_latest");
store
.push(Cow::Owned(Entry::new_with_timestamp(1, name.clone(), vec![1, 2, 3])))
.unwrap();
b.iter(|| {
store.latest(name.clone()).unwrap();
});
}
pub fn iter<S: RangeableStore>(b: &mut Bencher, store: &S) {
for i in 0..=255u8 {
let entry = Entry::new_with_timestamp(i.into(), Atom::from("bench_iter"), vec![i]);
let entry = Entry::new_with_timestamp(i.into(), "bench_iter", vec![i]);
store.push(Cow::Owned(entry)).unwrap();
}
b.iter(|| {
assert_eq!(store.range(.., None).unwrap().iter().unwrap().count(), 256);
assert_eq!(
store.range(.., Option::<Atom>::None).unwrap().iter().unwrap().count(),
256
);
});
}

@@ -13,3 +13,3 @@ use std::time::{SystemTime, UNIX_EPOCH};

impl Entry {
pub fn new(name: Atom, value: Vec<u8>) -> Entry {
pub fn new<A: Into<Atom>>(name: A, value: Vec<u8>) -> Entry {
let now = SystemTime::now()

@@ -21,8 +21,12 @@ .duration_since(UNIX_EPOCH)

.expect("great scott!!");
Self::new_with_timestamp(now, name, value)
Self::new_with_timestamp(now, name.into(), value)
}
pub fn new_with_timestamp(timestamp: i64, name: Atom, value: Vec<u8>) -> Entry {
Self { timestamp, name, value }
pub fn new_with_timestamp<A: Into<Atom>>(timestamp: i64, name: A, value: Vec<u8>) -> Entry {
Self {
timestamp,
name: name.into(),
value,
}
}
}

@@ -7,4 +7,3 @@ #![cfg_attr(feature = "benches", feature(test))]

mod errors;
mod memory;
mod traits;
mod stores;
mod utils;

@@ -16,6 +15,2 @@ #[macro_use]

mod python;
#[cfg(feature = "redis-store")]
mod redis;
#[cfg(feature = "sqlite-store")]
mod sqlite;
#[cfg(feature = "benches")]

@@ -27,8 +22,8 @@ #[macro_use]

pub use self::errors::Error;
pub use self::memory::MemoryStore;
pub use self::traits::{Range, RangeableStore, Store, SubscribeableStore};
pub use self::stores::memory::{MemoryRange, MemoryStore, MemoryStreamIterator};
pub use self::stores::traits::{Range, RangeableStore, Store, SubscribeableStore};
#[cfg(feature = "redis-store")]
pub use self::redis::{RedisStreamIterator, RedisStreamStore};
pub use self::stores::redis::{RedisStreamIterator, RedisStreamStore};
#[cfg(feature = "sqlite-store")]
pub use self::sqlite::{SqliteRange, SqliteRangeIterator, SqliteStore};
pub use self::stores::sqlite::{SqliteRange, SqliteRangeIterator, SqliteStore};
use std::borrow::Cow;
use std::ops::Bound;
use crate::{Error, Store, SubscribeableStore};
use crate::{Error, Range, RangeableStore, Store, SubscribeableStore};
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use string_cache::DefaultAtom as Atom;

@@ -38,3 +38,3 @@ fn map_result<T>(res: Result<T, Error>) -> PyResult<T> {

fn from(entry: Entry) -> crate::Entry {
crate::Entry::new_with_timestamp(entry.timestamp, Atom::from(entry.name), entry.value)
crate::Entry::new_with_timestamp(entry.timestamp, entry.name, entry.value)
}

@@ -63,8 +63,83 @@ }

pub fn push(&self, entry: Entry) -> PyResult<()> {
map_result(self.store.push(Cow::Owned(entry.into())))
pub fn push(&self, py: Python, entry: Entry) -> PyResult<()> {
let entry = Cow::Owned(entry.into());
py.allow_threads(move || map_result(self.store.push(entry)))
}
pub fn range(
&self,
start_bound: Option<i64>,
end_bound: Option<i64>,
name: Option<String>,
) -> PyResult<SqliteRange> {
let start_bound = match start_bound {
Some(ts) => Bound::Included(ts),
None => Bound::Unbounded,
};
let end_bound = match end_bound {
Some(ts) => Bound::Excluded(ts),
None => Bound::Unbounded,
};
let range = map_result(self.store.range((start_bound, end_bound), name))?;
Ok(SqliteRange { range: Some(range) })
}
}
/// Python wrapper around a `crate::SqliteRange` query view.
#[pyclass]
pub struct SqliteRange {
    // `None` once the range has been consumed by `remove` or `iter`.
    range: Option<crate::SqliteRange>,
}
#[pymethods]
impl SqliteRange {
    /// Counts the entries in the range without consuming it.
    pub fn count(&self) -> PyResult<u64> {
        // Don't consume `self.range` so further operation on `self` can be
        // run. The downside of this is that we can't release the GIL, so
        // count is not as cheap as it should be.
        if let Some(range) = &self.range {
            map_result(range.count())
        } else {
            Err(PyValueError::new_err("range already consumed"))
        }
    }
    /// Deletes every entry in the range, releasing the GIL while the
    /// store works. Consumes the range.
    pub fn remove(&mut self, py: Python) -> PyResult<()> {
        if let Some(range) = self.range.take() {
            py.allow_threads(move || map_result(range.remove()))
        } else {
            Err(PyValueError::new_err("range already consumed"))
        }
    }
    /// Returns an iterator over the range's entries. Consumes the range.
    pub fn iter(&mut self) -> PyResult<SqliteRangeIterator> {
        if let Some(range) = self.range.take() {
            let iter = map_result(range.iter())?;
            Ok(SqliteRangeIterator { iter })
        } else {
            Err(PyValueError::new_err("range already consumed"))
        }
    }
}
/// Python wrapper around a `crate::SqliteRangeIterator`.
#[pyclass]
pub struct SqliteRangeIterator {
    iter: crate::SqliteRangeIterator,
}
#[pymethods]
impl SqliteRangeIterator {
    /// Python iterator protocol: the iterator is its own iterable.
    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
        slf
    }
    /// Yields the next entry. Store errors are converted into Python
    /// exception objects and yielded as values, not raised.
    /// NOTE(review): confirm that returning (rather than raising) the
    /// exception object is the intended Python-facing behaviour.
    fn __next__(mut slf: PyRefMut<'_, Self>, py: Python) -> Option<PyObject> {
        match slf.iter.next() {
            Some(Ok(entry)) => Some(Entry::from(entry).into_py(py)),
            Some(Err(err)) => Some(map_result::<()>(Err(err)).unwrap_err().into_py(py)),
            None => None,
        }
    }
}
#[pyclass]
pub struct RedisStreamStore {

@@ -83,8 +158,9 @@ store: crate::RedisStreamStore,

pub fn push(&self, entry: Entry) -> PyResult<()> {
map_result(self.store.push(Cow::Owned(entry.into())))
pub fn push(&self, py: Python, entry: Entry) -> PyResult<()> {
let entry = Cow::Owned(entry.into());
py.allow_threads(move || map_result(self.store.push(entry)))
}
pub fn subscribe(&self, name: String) -> PyResult<RedisStreamIterator> {
let iter = map_result(self.store.subscribe(Atom::from(name)))?;
let iter = map_result(self.store.subscribe(name))?;
Ok(RedisStreamIterator { iter })

@@ -117,5 +193,8 @@ }

m.add_class::<Entry>()?;
m.add_class::<SqliteStore>()?;
m.add_class::<SqliteRange>()?;
m.add_class::<SqliteRangeIterator>()?;
m.add_class::<RedisStreamStore>()?;
m.add_class::<SqliteStore>()?;
m.add_class::<RedisStreamIterator>()?;
Ok(())
}
use std::borrow::Cow;
use std::collections::VecDeque;
use string_cache::DefaultAtom as Atom;
use crate::{Entry, Error, Range, RangeableStore, Store, SubscribeableStore};
use string_cache::DefaultAtom as Atom;
/// Defines a unit test function.
#[doc(hidden)]
#[macro_export]

@@ -20,3 +20,11 @@ macro_rules! define_test {

// Expands to the shared `Store` test-suite (currently just `latest`) for a
// store constructed by `$code`.
#[doc(hidden)]
#[macro_export]
macro_rules! test_store_impl {
    ($code:expr) => {
        define_test!(latest, $code);
    };
}
#[macro_export]
macro_rules! test_rangeable_store_impl {

@@ -29,2 +37,3 @@ ($code:expr) => {

#[doc(hidden)]
#[macro_export]

@@ -37,5 +46,5 @@ macro_rules! test_subscribeable_store_impl {

fn insert_sample_data<S: Store>(store: &S, name: Atom) -> Result<(), Error> {
fn insert_sample_data<S: Store>(store: &S, name: &str) -> Result<(), Error> {
for i in 1..11 {
let entry = Entry::new_with_timestamp(i.into(), name.clone(), vec![i]);
let entry = Entry::new_with_timestamp(i.into(), name, vec![i]);
store.push(Cow::Owned(entry))?;

@@ -46,7 +55,7 @@ }

fn check_sample_data(mut results: VecDeque<Result<Entry, Error>>, name: Atom) -> Result<(), Error> {
fn check_sample_data(mut results: VecDeque<Result<Entry, Error>>, name: &str) -> Result<(), Error> {
assert_eq!(results.len(), 10);
for i in 1..11u8 {
let result = results.pop_front().unwrap()?;
assert_eq!(result, Entry::new_with_timestamp(i.into(), name.clone(), vec![i]));
assert_eq!(result, Entry::new_with_timestamp(i.into(), name, vec![i]));
}

@@ -57,27 +66,33 @@ Ok(())

/// Shared test: exercises `RangeableStore::remove` with both timestamp-
/// bounded and name-filtered deletions over freshly inserted sample data.
///
/// NOTE(review): this span contained two interleaved bodies (botched merge
/// residue); restored the variant matching the new `&str`-name API used by
/// the sibling `latest` helper — confirm against the current trait
/// signatures.
pub fn remove<S: RangeableStore>(store: &S) {
    insert_sample_data(store, "test_remove").unwrap();
    assert_eq!(store.range(.., Option::<Atom>::None).unwrap().count().unwrap(), 10);
    // Delete everything with ts >= 2; only the ts == 1 entry survives.
    store.range(2.., Option::<Atom>::None).unwrap().remove().unwrap();
    assert_eq!(store.range(.., Option::<Atom>::None).unwrap().count().unwrap(), 1);
    // A name-filtered remove over the full range clears the remainder.
    store.range(.., Some("test_remove")).unwrap().remove().unwrap();
    assert_eq!(store.range(.., Option::<Atom>::None).unwrap().count().unwrap(), 0);
}
/// Shared test: exercises `Range::iter`, verifying all sample entries come
/// back in order.
///
/// NOTE(review): this span contained two interleaved bodies (merge
/// residue); restored the variant matching the new `&str`-name API.
pub fn iter<S: RangeableStore>(store: &S) {
    insert_sample_data(store, "test_iter").unwrap();
    let results: VecDeque<Result<Entry, Error>> =
        store.range(.., Option::<Atom>::None).unwrap().iter().unwrap().collect();
    check_sample_data(results, "test_iter").unwrap();
}
/// Shared test: subscribes to a name, pushes sample data, and checks the
/// subscriber observes exactly those entries.
///
/// NOTE(review): this span contained two interleaved bodies (merge
/// residue); restored the variant matching the new `&str`-name API.
pub fn pubsub<S: SubscribeableStore + Clone>(store: &S) {
    let subscriber = store.subscribe("test_pubsub").unwrap();
    // Give enough time for the thread to start up
    std::thread::sleep(std::time::Duration::from_millis(100));
    insert_sample_data(store, "test_pubsub").unwrap();
    let results: VecDeque<Result<Entry, Error>> = subscriber.take(10).collect();
    check_sample_data(results, "test_pubsub").unwrap();
}
/// Shared test: `latest` yields `None` for an unknown name and, after the
/// sample insert, the entry with the greatest timestamp (ts == 10).
/// NOTE(review): relies on a `latest` method on `S`; the `Store` trait
/// visible in this file does not declare one — confirm against the current
/// trait definition.
pub fn latest<S: Store + Clone>(store: &S) {
    assert_eq!(store.latest("test_latest").unwrap(), None);
    insert_sample_data(store, "test_latest").unwrap();
    assert_eq!(
        store.latest("test_latest").unwrap(),
        Some(Entry::new_with_timestamp(10, "test_latest", vec![10]))
    );
}

@@ -0,12 +1,27 @@

use super::Error;
use std::cmp::Ordering;
use std::ops::Bound;
use super::Error;
/// Extracts the timestamp from a bound, treating `Included` and `Excluded`
/// alike; `Unbounded` yields `None`.
fn unwrap_bound(bound: Bound<&i64>) -> Option<i64> {
    match bound {
        Bound::Included(ts) | Bound::Excluded(ts) => Some(*ts),
        Bound::Unbounded => None,
    }
}
pub(crate) fn check_bounds(start_bound: Bound<&i64>, end_bound: Bound<&i64>) -> Result<(), Error> {
match (start_bound, end_bound) {
(Bound::Included(s), Bound::Included(e)) if s < e => Err(Error::BadRange),
(Bound::Included(s), Bound::Excluded(e)) if s <= e => Err(Error::BadRange),
(Bound::Excluded(s), Bound::Included(e)) if s <= e => Err(Error::BadRange),
_ => Ok(()),
if let (Some(start_ts), Some(end_ts)) = (unwrap_bound(start_bound), unwrap_bound(end_bound)) {
match start_ts.cmp(&end_ts) {
Ordering::Less => {}
Ordering::Equal => {
if matches!(start_bound, Bound::Excluded(_)) || matches!(end_bound, Bound::Excluded(_)) {
return Err(Error::BadRange);
}
}
Ordering::Greater => return Err(Error::BadRange),
}
}
Ok(())
}

@@ -13,0 +28,0 @@

use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, Mutex, Weak};
use std::vec::IntoIter as VecIter;
use super::{utils, Entry, Error, Range, RangeableStore, Store, SubscribeableStore};
use crossbeam_channel::{unbounded, Receiver, Sender};
use string_cache::DefaultAtom as Atom;
/// Shared state behind a `MemoryStore`: entries bucketed by timestamp plus
/// the live subscribers per entry name.
#[derive(Clone, Default)]
struct MemoryStoreInternal {
    // Each timestamp maps to the (name, value) pairs pushed at that instant.
    entries: BTreeMap<i64, Vec<(Atom, Vec<u8>)>>,
    // Weak refs let dropped subscribers be pruned lazily during `push`.
    subscribers: HashMap<Atom, Vec<Weak<MemoryStreamIteratorInternal>>>,
}
/// An in-memory, thread-safe store; clones share the same underlying data.
#[derive(Clone, Default)]
pub struct MemoryStore(Arc<Mutex<MemoryStoreInternal>>);
impl Store for MemoryStore {
    /// Records the entry under its timestamp and fans it out to every live
    /// subscriber registered for the entry's name. Subscribers whose
    /// iterators have been dropped are pruned as a side effect.
    fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
        let mut guard = self.0.lock().unwrap();
        guard
            .entries
            .entry(entry.timestamp)
            .or_insert_with(Vec::default)
            .push((entry.name.clone(), entry.value.clone()));
        if let Some(subscribers) = guard.subscribers.get_mut(&entry.name) {
            let owned = entry.into_owned();
            // Notify subscribers that are still alive; drop the dead weak refs.
            subscribers.retain(|weak| match Weak::upgrade(weak) {
                Some(subscriber) => {
                    subscriber.notify(owned.clone());
                    true
                }
                None => false,
            });
        }
        Ok(())
    }
}
impl RangeableStore for MemoryStore {
    type Range = MemoryRange;
    /// Builds a lazy range view over the store; bounds are validated
    /// eagerly but no data is copied until the range is used.
    fn range<R: RangeBounds<i64>>(&self, range: R, name: Option<Atom>) -> Result<Self::Range, Error> {
        utils::check_bounds(range.start_bound(), range.end_bound())?;
        Ok(Self::Range {
            internal: self.0.clone(),
            start_bound: range.start_bound().cloned(),
            end_bound: range.end_bound().cloned(),
            name,
        })
    }
}
/// A validated timestamp range (optionally name-filtered) over a
/// `MemoryStore`. Holds a handle to the store's shared state, so it
/// observes writes made after it was created.
pub struct MemoryRange {
    internal: Arc<Mutex<MemoryStoreInternal>>,
    start_bound: Bound<i64>,
    end_bound: Bound<i64>,
    name: Option<Atom>,
}
impl Range for MemoryRange {
    type Iter = VecIter<Result<Entry, Error>>;

    /// Counts entries inside the range, honouring the optional name filter.
    fn count(&self) -> Result<u64, Error> {
        let internal = self.internal.lock().unwrap();
        let mut count: u64 = 0;
        for (_, bucket) in internal.entries.range((self.start_bound, self.end_bound)) {
            if let Some(ref name) = self.name {
                count += bucket.iter().filter(|e| &e.0 == name).count() as u64;
            } else {
                count += bucket.len() as u64;
            }
        }
        Ok(count)
    }

    /// Deletes every matching entry in the range. Timestamp buckets are
    /// emptied in place (their map keys are retained).
    fn remove(self) -> Result<(), Error> {
        let mut internal = self.internal.lock().unwrap();
        for (_, bucket) in internal.entries.range_mut((self.start_bound, self.end_bound)) {
            if let Some(ref name) = self.name {
                // `retain` filters in place; avoids the drain+collect realloc.
                bucket.retain(|e| &e.0 != name);
            } else {
                bucket.clear();
            }
        }
        Ok(())
    }

    /// Snapshots the matching entries into a Vec and returns an iterator
    /// over it; writes made after this call are not reflected.
    fn iter(self) -> Result<Self::Iter, Error> {
        let internal = self.internal.lock().unwrap();
        let mut snapshot = Vec::default();
        for (timestamp, bucket) in internal.entries.range((self.start_bound, self.end_bound)) {
            // One filtered pass replaces the previously duplicated loops.
            for entry in bucket
                .iter()
                .filter(|e| self.name.as_ref().map_or(true, |name| &e.0 == name))
            {
                snapshot.push(Ok(Entry::new_with_timestamp(
                    *timestamp,
                    entry.0.clone(),
                    entry.1.clone(),
                )));
            }
        }
        Ok(snapshot.into_iter())
    }
}
impl SubscribeableStore for MemoryStore {
    type Subscription = MemoryStreamIterator;
    /// Registers a new subscriber for `name` and returns its receiving
    /// iterator. The store keeps only a weak handle, so dropping the
    /// iterator implicitly unsubscribes.
    fn subscribe(&self, name: Atom) -> Result<Self::Subscription, Error> {
        let (tx, rx) = unbounded();
        let shared = Arc::new(MemoryStreamIteratorInternal { tx });
        let weak = Arc::downgrade(&shared);
        self.0
            .lock()
            .unwrap()
            .subscribers
            .entry(name)
            .or_insert_with(Vec::default)
            .push(weak);
        Ok(MemoryStreamIterator {
            _internal: shared,
            rx,
        })
    }
}
/// Sender half of a subscription, shared (via `Arc`) between the
/// subscriber's iterator and the store's weak subscriber list.
struct MemoryStreamIteratorInternal {
    tx: Sender<Entry>,
}
impl MemoryStreamIteratorInternal {
    /// Forwards a pushed entry to the subscriber.
    fn notify(&self, entry: Entry) {
        // The send can fail if the subscriber is dropped concurrently: the
        // weak upgrade in `push` may succeed just before the iterator's
        // receiver is dropped. That race is benign, so ignore the error
        // instead of panicking (which would poison the store mutex).
        let _ = self.tx.send(entry);
    }
}
/// Blocking iterator over entries pushed for a subscribed name.
#[derive(Clone)]
pub struct MemoryStreamIterator {
    // Keeps the sender alive so `recv` below never sees a disconnected
    // channel.
    _internal: Arc<MemoryStreamIteratorInternal>,
    rx: Receiver<Entry>,
}
impl Iterator for MemoryStreamIterator {
    type Item = Result<Entry, Error>;
    /// Blocks until the next entry arrives; never yields `None`.
    fn next(&mut self) -> Option<Self::Item> {
        // Safe unwrap: `self._internal` holds the sender, so the channel
        // cannot disconnect while `self` exists.
        let value = self.rx.recv().unwrap();
        Some(Ok(value))
    }
}
#[cfg(test)]
mod tests {
    use crate::{define_test, test_rangeable_store_impl, test_subscribeable_store_impl};
    // Run the shared store test-suites against a fresh in-memory store.
    test_rangeable_store_impl!({
        use super::MemoryStore;
        MemoryStore::default()
    });
    test_subscribeable_store_impl!({
        use super::MemoryStore;
        MemoryStore::default()
    });
}
#[cfg(feature = "benches")]
mod benches {
    use crate::{bench_rangeable_store_impl, bench_store_impl, define_bench};
    // Run the shared store benchmarks against a fresh in-memory store.
    bench_store_impl!({
        use crate::MemoryStore;
        MemoryStore::default()
    });
    bench_rangeable_store_impl!({
        use crate::MemoryStore;
        MemoryStore::default()
    });
}
use std::borrow::Cow;
use std::error::Error as StdError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use super::{Entry, Error, Store, SubscribeableStore};
use byteorder::{ByteOrder, LittleEndian};
use crossbeam_channel::{unbounded, Receiver, Sender};
use redis::streams::{StreamMaxlen, StreamReadOptions, StreamReadReply};
use redis::{Client, Cmd, Commands, Connection, ConnectionLike, IntoConnectionInfo, RedisError, Value};
use string_cache::DefaultAtom as Atom;
// How long each blocking XREAD waits (ms) before the listener re-checks
// its shutdown flag.
static STREAM_READ_BLOCK_MS: usize = 1000;
// Upper bound on idle connections kept in the `push` pool.
static PUSH_CONNS_MAX_COUNT: usize = 4;
// Pulls a named binary field out of a Redis stream record. On malformed
// data it sends an error to the subscriber: if the subscriber is gone
// (`send` fails) it `return`s from the enclosing function; otherwise it
// `break`s out of the enclosing loop to skip the rest of the record.
// NOTE(review): only valid inside a loop within a fn returning `()`.
macro_rules! get_field_from_stream_map {
    ($tx:expr, $stream_id:expr, $field_name:expr) => {
        match $stream_id.map.get($field_name) {
            Some(Value::Data(bytes)) => bytes,
            _ => {
                let err = invalid_data_err("unexpected data format received from redis");
                if $tx.send(Err(err)).is_err() {
                    return;
                } else {
                    break;
                }
            }
        }
    };
}
impl From<RedisError> for Error {
    /// Wraps Redis client errors in the store-agnostic `Error::Database`.
    fn from(err: RedisError) -> Self {
        Error::Database(Box::new(err))
    }
}
/// Maps an entry name to its versioned Redis stream key.
fn redis_channel(name: &Atom) -> String {
    format!("binlog:stream:v0:{}", name)
}
/// Builds an `Error` describing malformed data received from Redis.
fn invalid_data_err<E: Into<Box<dyn StdError + Send + Sync>>>(msg: E) -> Error {
    IoError::new(IoErrorKind::InvalidData, msg).into()
}
/// A Redis-streams-backed store. Pushes reuse a small connection pool;
/// each subscription gets its own dedicated connection.
#[derive(Clone)]
pub struct RedisStreamStore {
    client: Client,
    // LIFO pool of idle `push` connections, capped at PUSH_CONNS_MAX_COUNT.
    push_conns: Arc<Mutex<Vec<Connection>>>,
    // Approximate per-stream length cap applied via XADD MAXLEN.
    max_stream_len: StreamMaxlen,
}
impl RedisStreamStore {
    /// Builds a store from an existing client; each stream is trimmed
    /// approximately to `max_stream_len` entries on push.
    pub fn new_with_client(client: Client, max_stream_len: usize) -> Self {
        Self {
            client,
            push_conns: Arc::new(Mutex::new(Vec::default())),
            max_stream_len: StreamMaxlen::Approx(max_stream_len),
        }
    }
    /// Connects to Redis using `params` and builds a store.
    pub fn new<T: IntoConnectionInfo>(params: T, max_stream_len: usize) -> Result<Self, Error> {
        Ok(Self::new_with_client(Client::open(params)?, max_stream_len))
    }
}
impl Store for RedisStreamStore {
    /// Appends the entry to the stream named after `entry.name` via
    /// XADD MAXLEN, borrowing a connection from the push pool (or opening
    /// a fresh one) and returning it afterwards if the pool has room.
    fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
        let channel = redis_channel(&entry.name);
        // The timestamp travels as little-endian i64 bytes in "timestamp".
        let mut timestamp_bytes = [0; 8];
        LittleEndian::write_i64(&mut timestamp_bytes, entry.timestamp);
        let payload = &[
            ("timestamp", timestamp_bytes.as_slice()),
            ("value", entry.value.as_slice()),
        ];
        let cmd = Cmd::xadd_maxlen(channel, self.max_stream_len, "*", payload);
        // Borrow an idle connection, or open a new one if the pool is empty.
        let mut push_conn = {
            let mut push_conns = self.push_conns.lock().unwrap();
            if let Some(conn) = push_conns.pop() {
                conn
            } else {
                self.client.get_connection()?
            }
        };
        // On failure `?` returns early and the connection is dropped rather
        // than returned to the pool, so broken connections are not reused.
        push_conn.req_command(&cmd)?;
        let mut push_conns = self.push_conns.lock().unwrap();
        if push_conns.len() < PUSH_CONNS_MAX_COUNT {
            push_conns.push(push_conn);
        }
        Ok(())
    }
}
impl SubscribeableStore for RedisStreamStore {
    type Subscription = RedisStreamIterator;
    /// Opens a dedicated connection and spawns a background listener
    /// thread for the given entry name.
    fn subscribe(&self, name: Atom) -> Result<Self::Subscription, Error> {
        let conn = self.client.get_connection()?;
        RedisStreamIterator::new(conn, name)
    }
}
/// Iterator over entries arriving on a Redis stream, fed by a background
/// listener thread that is stopped and joined on drop.
pub struct RedisStreamIterator {
    // Flag polled by the listener between blocking reads.
    shutdown: Arc<AtomicBool>,
    // Wrapped in `Option` so `drop` can take ownership for shutdown.
    rx: Option<Receiver<Result<Entry, Error>>>,
    listener_thread: Option<thread::JoinHandle<()>>,
}
impl RedisStreamIterator {
    /// Spawns the listener thread and wires its channel to the returned
    /// iterator.
    fn new(conn: Connection, name: Atom) -> Result<Self, Error> {
        let (tx, rx) = unbounded::<Result<Entry, Error>>();
        let shutdown = Arc::new(AtomicBool::new(false));
        let listener_thread = {
            let shutdown = shutdown.clone();
            thread::spawn(|| stream_listener(conn, name, tx, shutdown))
        };
        Ok(RedisStreamIterator {
            shutdown,
            rx: Some(rx),
            listener_thread: Some(listener_thread),
        })
    }
}
impl Drop for RedisStreamIterator {
    /// Requests listener shutdown, drops the receiver so pending sends
    /// fail fast, then joins the thread (it wakes from its blocking read
    /// at least every STREAM_READ_BLOCK_MS).
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Relaxed);
        let rx = self.rx.take().unwrap();
        drop(rx);
        let listener_thread = self.listener_thread.take().unwrap();
        listener_thread.join().unwrap();
    }
}
impl Iterator for RedisStreamIterator {
    type Item = Result<Entry, Error>;
    /// Blocks for the next event from the listener thread; yields `None`
    /// once the listener has exited and the channel is drained.
    fn next(&mut self) -> Option<Self::Item> {
        // `rx` is only `None` after `drop` has begun, so unwrap is safe.
        self.rx.as_ref().unwrap().recv().ok()
    }
}
/// Listener-thread body: repeatedly issues a blocking XREAD, decodes each
/// record into an `Entry`, and forwards it to the subscriber. Exits when
/// the subscriber hangs up or shutdown is requested.
fn stream_listener(mut conn: Connection, name: Atom, tx: Sender<Result<Entry, Error>>, shutdown: Arc<AtomicBool>) {
    let channels = vec![redis_channel(&name)];
    // "$" asks Redis for entries newer than the subscription moment; after
    // the first batch we resume from the last id seen.
    let mut last_id = "$".to_string();
    let opts = StreamReadOptions::default().block(STREAM_READ_BLOCK_MS);
    loop {
        let reply: StreamReadReply = match conn.xread_options(&channels, &[&last_id], &opts) {
            Ok(reply) => reply,
            Err(err) => {
                // Stop if the subscriber is gone or shutdown was requested;
                // otherwise report the error and retry.
                if tx.send(Err(err.into())).is_err() || shutdown.load(Ordering::Relaxed) {
                    return;
                } else {
                    continue;
                }
            }
        };
        // A timed-out blocking read returns no keys; use that window to
        // honour shutdown requests.
        if reply.keys.is_empty() && shutdown.load(Ordering::Relaxed) {
            return;
        }
        for stream_key in reply.keys {
            for stream_id in stream_key.ids {
                let timestamp_bytes = get_field_from_stream_map!(tx, stream_id, "timestamp");
                let timestamp = LittleEndian::read_i64(timestamp_bytes);
                let value = get_field_from_stream_map!(tx, stream_id, "value");
                let entry = Entry::new_with_timestamp(timestamp, name.clone(), value.clone());
                if tx.send(Ok(entry)).is_err() {
                    // Subscriber dropped; shut the listener down.
                    return;
                }
                last_id = stream_id.id;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::define_test;
    // Requires a Redis server listening on localhost:6379.
    test_subscribeable_store_impl!({ super::RedisStreamStore::new("redis://localhost:6379", 100).unwrap() });
}
#[cfg(feature = "benches")]
mod benches {
    use crate::{bench_store_impl, define_bench};
    // Requires a Redis server listening on localhost:6379.
    bench_store_impl!({ super::RedisStreamStore::new("redis://localhost:6379", 100).unwrap() });
}
use std::borrow::Cow;
use std::collections::VecDeque;
use std::ops::{Bound, RangeBounds};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter as VecIter;
use super::{utils, Entry, Error, Range, RangeableStore, Store};
use r2d2::{Error as R2d2Error, Pool};
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::Error as SqliteError;
use rusqlite::{params, params_from_iter, ParamsFromIter};
use string_cache::DefaultAtom as Atom;
use zstd::bulk::{Compressor, Decompressor};
// Database schema, applied on every open. `if not exists` guards make it
// idempotent; previously the index lacked the guard, so opening an
// existing database file would fail on `create index`.
static SCHEMA: &str = r#"
create table if not exists log (
id integer primary key,
ts integer not null,
name text not null,
size integer not null,
value blob not null
);
create index if not exists idx_log_ts on log(ts);
"#;
// Do not compress entries smaller than this size
static MIN_SIZE_TO_COMPRESS: usize = 32;
// zstd level used when the caller does not specify one.
static DEFAULT_COMPRESSION_LEVEL: i32 = 1;
// Rows fetched per query page by SqliteRangeIterator.
static PAGINATION_LIMIT: usize = 1000;
impl From<SqliteError> for Error {
    /// Wraps rusqlite errors in the store-agnostic `Error::Database`.
    fn from(err: SqliteError) -> Self {
        Error::Database(Box::new(err))
    }
}
impl From<R2d2Error> for Error {
    /// Wraps connection-pool errors in the store-agnostic `Error::Database`.
    fn from(err: R2d2Error) -> Self {
        Error::Database(Box::new(err))
    }
}
/// Captures a range query's timestamp bounds and optional name filter, and
/// renders them into SQL `where` clauses plus bind parameters.
struct StatementBuilder {
    start_bound: Bound<i64>,
    end_bound: Bound<i64>,
    name: Option<Atom>,
}
impl StatementBuilder {
    /// Snapshots the bounds of `range` together with the name filter.
    fn new<R: RangeBounds<i64>>(range: R, name: Option<Atom>) -> StatementBuilder {
        Self {
            start_bound: range.start_bound().cloned(),
            end_bound: range.end_bound().cloned(),
            name,
        }
    }
    /// Bind parameters for the rendered statement: just the name filter,
    /// when present (timestamp bounds are inlined by `statement`).
    fn params(&self) -> ParamsFromIter<VecIter<String>> {
        if let Some(name) = &self.name {
            params_from_iter(vec![name.to_string()].into_iter())
        } else {
            params_from_iter(vec![].into_iter())
        }
    }
    /// Renders `prefix [where …] suffix`. Timestamp bounds are embedded
    /// directly as literals (safe: they are `i64` values, not strings);
    /// the name stays a `?` placeholder bound via `params`.
    fn statement<'a>(&self, prefix: &'a str, suffix: &'a str) -> Cow<'a, str> {
        let mut clauses = Vec::new();
        match self.start_bound {
            Bound::Included(s) => clauses.push(format!("ts >= {}", s)),
            Bound::Excluded(s) => clauses.push(format!("ts > {}", s)),
            Bound::Unbounded => {}
        }
        match self.end_bound {
            Bound::Included(e) => clauses.push(format!("ts <= {}", e)),
            Bound::Excluded(e) => clauses.push(format!("ts < {}", e)),
            Bound::Unbounded => {}
        }
        if self.name.is_some() {
            clauses.push("name = ?".to_string());
        }
        let where_clause = if clauses.is_empty() {
            "".to_string()
        } else {
            format!("where {}", clauses.join(" and "))
        };
        // Borrow the prefix unchanged when nothing needs appending.
        if where_clause.is_empty() && suffix.is_empty() {
            Cow::Borrowed(prefix)
        } else {
            Cow::Owned(format!("{} {} {}", prefix, where_clause, suffix))
        }
    }
}
/// SQLite-backed store with zstd compression for large values. Clones
/// share the connection pool and compressor.
#[derive(Clone)]
pub struct SqliteStore {
    pool: Pool<SqliteConnectionManager>,
    // Reused across pushes; mutex-guarded for exclusive access.
    compressor: Arc<Mutex<Compressor<'static>>>,
}
impl SqliteStore {
    /// Creates a store over an existing connection pool: applies the
    /// journal-mode pragma and the schema, then initialises the shared
    /// zstd compressor.
    pub fn new_with_pool(pool: Pool<SqliteConnectionManager>, compression_level: Option<i32>) -> Result<Self, Error> {
        {
            let conn = pool.get()?;
            // NOTE(review): "wal2" is only available in SQLite's wal2
            // branch — confirm the linked SQLite supports it.
            conn.pragma_update(None, "journal_mode", "wal2")?;
            // SCHEMA contains multiple statements; `execute` rejects more
            // than one, so it must run through `execute_batch`.
            conn.execute_batch(SCHEMA)?;
        }
        let compressor = Compressor::new(compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL))?;
        Ok(Self {
            pool,
            compressor: Arc::new(Mutex::new(compressor)),
        })
    }
    /// Opens (or creates) a SQLite-backed store at `path`.
    pub fn new<P: AsRef<Path>>(path: P, compression_level: Option<i32>) -> Result<Self, Error> {
        let manager = SqliteConnectionManager::file(path);
        let pool = r2d2::Pool::new(manager)?;
        Self::new_with_pool(pool, compression_level)
    }
}
impl Store for SqliteStore {
    /// Inserts one entry, zstd-compressing values of at least
    /// MIN_SIZE_TO_COMPRESS bytes. A stored `size` of 0 marks an
    /// uncompressed value; otherwise `size` records the uncompressed
    /// length needed later for decompression.
    fn push(&self, entry: Cow<Entry>) -> Result<(), Error> {
        let mut compressed = Vec::default();
        let mut original_size = 0;
        if entry.value.len() >= MIN_SIZE_TO_COMPRESS {
            let mut compressor = self.compressor.lock().unwrap();
            compressed = compressor.compress(&entry.value)?;
            original_size = entry.value.len();
        }
        // Store the compressed bytes when compression ran, else the raw value.
        let blob: &[u8] = if compressed.is_empty() {
            &entry.value
        } else {
            &compressed
        };
        let conn = self.pool.get()?;
        let mut stmt = conn.prepare_cached("insert into log (ts, name, size, value) values (?, ?, ?, ?)")?;
        stmt.execute(params![entry.timestamp, entry.name.as_ref(), original_size, blob])?;
        Ok(())
    }
}
impl RangeableStore for SqliteStore {
    type Range = SqliteRange;
    /// Validates the bounds and captures them in a `SqliteRange`; no SQL
    /// runs until the range is counted, removed, or iterated.
    fn range<R: RangeBounds<i64>>(&self, range: R, name: Option<Atom>) -> Result<Self::Range, Error> {
        utils::check_bounds(range.start_bound(), range.end_bound())?;
        Ok(SqliteRange {
            pool: self.pool.clone(),
            statement_builder: StatementBuilder::new(range, name),
        })
    }
}
/// A validated, lazily-evaluated query over a timestamp range in a
/// `SqliteStore`.
pub struct SqliteRange {
    pool: Pool<SqliteConnectionManager>,
    statement_builder: StatementBuilder,
}
impl Range for SqliteRange {
    type Iter = SqliteRangeIterator;

    /// Counts matching rows with a single `select count(id)` query.
    fn count(&self) -> Result<u64, Error> {
        let conn = self.pool.get()?;
        let sql = self.statement_builder.statement("select count(id) from log", "");
        let total: u64 = conn
            .prepare(&sql)?
            .query_row(self.statement_builder.params(), |row| row.get(0))?;
        Ok(total)
    }

    /// Deletes every matching row.
    fn remove(self) -> Result<(), Error> {
        let conn = self.pool.get()?;
        let sql = self.statement_builder.statement("delete from log", "");
        conn.prepare(&sql)?.execute(self.statement_builder.params())?;
        Ok(())
    }

    /// Converts the range into a paginating iterator; no query is issued
    /// until the iterator is first polled.
    fn iter(self) -> Result<Self::Iter, Error> {
        Ok(SqliteRangeIterator {
            pool: self.pool,
            statement_builder: self.statement_builder,
            entries: VecDeque::default(),
            offset: 0,
            done: false,
        })
    }
}
/// Streaming iterator over a `SqliteRange`, fetching rows in pages of
/// PAGINATION_LIMIT.
pub struct SqliteRangeIterator {
    pool: Pool<SqliteConnectionManager>,
    statement_builder: StatementBuilder,
    // Buffered rows from the most recently fetched page.
    entries: VecDeque<Entry>,
    // Row offset of the next page to fetch.
    offset: usize,
    // Set once a short page shows the result set is exhausted.
    done: bool,
}
impl SqliteRangeIterator {
    /// Fetches the next page (up to PAGINATION_LIMIT rows) into
    /// `self.entries`, decompressing values where needed, and marks the
    /// iterator done when a short page is returned.
    ///
    /// NOTE(review): pagination is `order by ts … offset N` and `ts` is
    /// not unique, so rows may be skipped or repeated if the table changes
    /// between pages — confirm whether callers need a stable snapshot.
    fn fill_entries(&mut self) -> Result<(), Error> {
        let conn = self.pool.get()?;
        let mut stmt = conn.prepare(&self.statement_builder.statement(
            "select ts, name, size, value from log",
            &format!("order by ts limit {} offset {}", PAGINATION_LIMIT, self.offset),
        ))?;
        let mut rows = stmt.query(self.statement_builder.params())?;
        let mut decompressor = Decompressor::new()?;
        let mut added = 0;
        while let Some(row) = rows.next()? {
            let timestamp: i64 = row.get(0)?;
            let name: String = row.get(1)?;
            let name: Atom = Atom::from(name);
            // `size` > 0 means the blob is zstd-compressed and records the
            // decompressed length (see `SqliteStore::push`).
            let size: usize = row.get(2)?;
            let mut blob: Vec<u8> = row.get(3)?;
            if size > 0 {
                blob = decompressor.decompress(&blob, size)?;
            }
            self.entries.push_back(Entry::new_with_timestamp(timestamp, name, blob));
            added += 1;
        }
        if added < PAGINATION_LIMIT {
            self.done = true;
        }
        self.offset += PAGINATION_LIMIT;
        Ok(())
    }
}
impl Iterator for SqliteRangeIterator {
    type Item = Result<Entry, Error>;
    /// Yields the next entry, transparently fetching another page when the
    /// local buffer runs dry; a failed fetch is surfaced as an `Err` item.
    fn next(&mut self) -> Option<Self::Item> {
        let needs_fetch = self.entries.is_empty() && !self.done;
        if needs_fetch {
            if let Err(err) = self.fill_entries() {
                return Some(Err(err));
            }
        }
        self.entries.pop_front().map(Ok)
    }
}
#[cfg(test)]
mod tests {
    use crate::define_test;
    // Run the shared range test-suite against a store on a temp file.
    test_rangeable_store_impl!({
        use super::SqliteStore;
        use tempfile::NamedTempFile;
        let file = NamedTempFile::new().unwrap().into_temp_path();
        SqliteStore::new(file, None).unwrap()
    });
}
#[cfg(feature = "benches")]
mod benches {
    use crate::{bench_rangeable_store_impl, bench_store_impl, define_bench};
    // Run the shared benchmarks against stores on temp files.
    bench_store_impl!({
        use super::SqliteStore;
        use tempfile::NamedTempFile;
        let file = NamedTempFile::new().unwrap().into_temp_path();
        SqliteStore::new(file, None).unwrap()
    });
    bench_rangeable_store_impl!({
        use super::SqliteStore;
        use tempfile::NamedTempFile;
        let file = NamedTempFile::new().unwrap().into_temp_path();
        SqliteStore::new(file, None).unwrap()
    });
}
use std::borrow::Cow;
use std::ops::RangeBounds;
use super::{Entry, Error};
use string_cache::DefaultAtom as Atom;
/// The minimal write interface shared by every binlog store.
pub trait Store: Send + Sync {
    /// Persists a single entry. `Cow` lets callers hand over either a
    /// borrowed or an owned entry.
    fn push(&self, entry: Cow<Entry>) -> Result<(), Error>;
}
/// A store whose entries can be queried and deleted by timestamp range.
pub trait RangeableStore: Store {
    /// Range-view type produced by `range`.
    type Range: Range;
    /// Builds a view over entries whose timestamp lies in `range`,
    /// optionally filtered to a single entry name.
    fn range<R: RangeBounds<i64>>(&self, range: R, name: Option<Atom>) -> Result<Self::Range, Error>;
}
/// Operations available on a store's range view.
pub trait Range {
    /// Iterator type yielded by `iter`.
    type Iter: Iterator<Item = Result<Entry, Error>>;
    /// Number of entries in the range.
    fn count(&self) -> Result<u64, Error>;
    /// Deletes every entry in the range, consuming the view.
    fn remove(self) -> Result<(), Error>;
    /// Consumes the view, yielding its entries.
    fn iter(self) -> Result<Self::Iter, Error>;
}
/// A store supporting live subscriptions to entries pushed under a name.
pub trait SubscribeableStore: Store {
    /// Blocking iterator over entries observed after subscribing.
    type Subscription: Iterator<Item = Result<Entry, Error>>;
    /// Subscribes to all future entries pushed under `name`.
    fn subscribe(&self, name: Atom) -> Result<Self::Subscription, Error>;
}
</subscribe>