Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

binlog

Package Overview
Dependencies
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

binlog - npm Package Compare versions

Comparing version
0.4.0
to
0.5.0
+1
-28
Cargo.lock

@@ -35,6 +35,5 @@ # This file is automatically @generated by Cargo.

name = "binlog"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"byteorder",
"crossbeam-channel",
"pyo3",

@@ -94,22 +93,2 @@ "r2d2",

[[package]]
name = "crossbeam-channel"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "dtoa"

@@ -225,8 +204,2 @@ version = "0.4.8"

[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"

@@ -233,0 +206,0 @@ version = "0.2.121"

+1
-2
[package]
name = "binlog"
version = "0.4.0"
version = "0.5.0"
edition = "2021"

@@ -24,3 +24,2 @@ authors = ["Yusuf Simonson <simonson@gmail.com>"]

[dependencies]
crossbeam-channel = "0.5.4"
string_cache = "0.8.4"

@@ -27,0 +26,0 @@

# Changelog
## 0.5.0 (5/8/2022)
* Support for pub/sub timeouts
* Redis store
* Removed separate thread for pub/sub iterators
## 0.4.0 (4/21/2022)

@@ -4,0 +10,0 @@

Metadata-Version: 2.1
Name: binlog
Version: 0.4.0
Version: 0.5.0
Classifier: Programming Language :: Rust

@@ -5,0 +5,0 @@ Classifier: Programming Language :: Python :: Implementation :: CPython

@@ -11,6 +11,7 @@ import tempfile

store.push(entry)
sub_entry = next(sub)
sub_entry = sub.next(None)
assert entry.timestamp == sub_entry.timestamp
assert entry.name == sub_entry.name
assert entry.value == sub_entry.value
sub_entry = sub.next(0.01)
assert sub_entry == None

@@ -20,8 +20,8 @@ #![cfg_attr(feature = "benches", feature(test))]

pub use self::errors::Error;
pub use self::stores::memory::{MemoryRange, MemoryStore, MemoryStreamIterator};
pub use self::stores::traits::{Range, RangeableStore, Store, SubscribeableStore};
pub use self::stores::memory::{MemoryRange, MemoryStore, MemoryStreamSubscription};
pub use self::stores::traits::{Range, RangeableStore, Store, SubscribeableStore, Subscription};
#[cfg(feature = "redis-store")]
pub use self::stores::redis::{RedisStreamIterator, RedisStreamStore};
pub use self::stores::redis::{RedisStreamStore, RedisStreamSubscription};
#[cfg(feature = "sqlite-store")]
pub use self::stores::sqlite::{SqliteRange, SqliteRangeIterator, SqliteStore};
use std::borrow::Cow;
use std::ops::Bound;
use std::time::Duration;
use crate::{Error, Range, RangeableStore, Store, SubscribeableStore};
use crate::{Error, Range, RangeableStore, Store, SubscribeableStore, Subscription};

@@ -93,8 +94,5 @@ use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};

impl SqliteRange {
pub fn count(&self) -> PyResult<u64> {
// Don't consume `self.range` so further operation on `self` can be
// run. The downside of this is that we can't release the GIL, so
// count is not as cheap as it should be.
pub fn count(&self, py: Python) -> PyResult<u64> {
if let Some(range) = &self.range {
map_result(range.count())
py.allow_threads(move || map_result(range.count()))
} else {

@@ -113,6 +111,8 @@ Err(PyValueError::new_err("range already consumed"))

pub fn iter(&mut self) -> PyResult<SqliteRangeIterator> {
pub fn iter(&mut self, py: Python) -> PyResult<SqliteRangeIterator> {
if let Some(range) = self.range.take() {
let iter = map_result(range.iter())?;
Ok(SqliteRangeIterator { iter })
py.allow_threads(move || {
let iter = map_result(range.iter())?;
Ok(SqliteRangeIterator { iter })
})
} else {

@@ -163,5 +163,5 @@ Err(PyValueError::new_err("range already consumed"))

pub fn subscribe(&self, name: String) -> PyResult<RedisStreamIterator> {
let iter = map_result(self.store.subscribe(name))?;
Ok(RedisStreamIterator { iter })
pub fn subscribe(&self, name: String) -> PyResult<RedisStreamSubscription> {
let subscription = map_result(self.store.subscribe(name))?;
Ok(RedisStreamSubscription { subscription })
}

@@ -171,19 +171,15 @@ }

#[pyclass]
pub struct RedisStreamIterator {
iter: crate::RedisStreamIterator,
pub struct RedisStreamSubscription {
subscription: crate::RedisStreamSubscription,
}
#[pymethods]
impl RedisStreamIterator {
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
impl RedisStreamSubscription {
pub fn next(&mut self, py: Python, duration: Option<f32>) -> PyResult<Option<Entry>> {
let duration = duration.map(Duration::from_secs_f32);
py.allow_threads(move || {
let entry = map_result(self.subscription.next(duration))?;
Ok(entry.map(|e| e.into()))
})
}
fn __next__(mut slf: PyRefMut<'_, Self>, py: Python) -> Option<PyObject> {
match slf.iter.next() {
Some(Ok(entry)) => Some(Entry::from(entry).into_py(py)),
Some(Err(err)) => Some(map_result::<()>(Err(err)).unwrap_err().into_py(py)),
None => None,
}
}
}

@@ -198,4 +194,4 @@

m.add_class::<RedisStreamStore>()?;
m.add_class::<RedisStreamIterator>()?;
m.add_class::<RedisStreamSubscription>()?;
Ok(())
}
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::time::Duration;
use std::vec::IntoIter as VecIter;
use crate::{utils, Entry, Error, Range, RangeableStore, Store, SubscribeableStore};
use crate::{utils, Entry, Error, Range, RangeableStore, Store, SubscribeableStore, Subscription};
use crossbeam_channel::{unbounded, Receiver, Sender};
use string_cache::DefaultAtom as Atom;

@@ -15,5 +15,18 @@

entries: BTreeMap<(i64, Atom), Vec<Vec<u8>>>,
subscribers: HashMap<Atom, Vec<Weak<MemoryStreamIteratorInternal>>>,
subscribers: HashMap<Atom, Vec<Weak<MemoryStreamSubscriptionInternal>>>,
}
struct MemoryStreamSubscriptionInternal {
latest: Mutex<Option<Entry>>,
cvar: Condvar,
}
impl MemoryStreamSubscriptionInternal {
fn notify(&self, entry: Entry) {
let mut latest = self.latest.lock().unwrap();
*latest = Some(entry);
self.cvar.notify_all();
}
}
#[derive(Clone, Default)]

@@ -34,3 +47,3 @@ pub struct MemoryStore(Arc<Mutex<MemoryStoreInternal>>);

let entry = entry.into_owned();
let mut new_subscribers = Vec::<Weak<MemoryStreamIteratorInternal>>::default();
let mut new_subscribers = Vec::<Weak<MemoryStreamSubscriptionInternal>>::default();
for subscriber in subscribers.drain(..) {

@@ -166,15 +179,21 @@ if let Some(subscriber) = Weak::upgrade(&subscriber) {

impl SubscribeableStore for MemoryStore {
type Subscription = MemoryStreamIterator;
type Subscription = MemoryStreamSubscription;
fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> {
let (tx, rx) = unbounded();
let iterator_internal = Arc::new(MemoryStreamIteratorInternal { tx });
let name = name.into();
let latest = self.latest(&name)?;
let subscription_internal = Arc::new(MemoryStreamSubscriptionInternal {
latest: Mutex::new(latest),
cvar: Condvar::new(),
});
let mut internal = self.0.lock().unwrap();
internal
.subscribers
.entry(name.into())
.entry(name)
.or_insert_with(Vec::default)
.push(Arc::downgrade(&iterator_internal));
Ok(MemoryStreamIterator {
_internal: iterator_internal,
rx,
.push(Arc::downgrade(&subscription_internal));
Ok(MemoryStreamSubscription {
internal: subscription_internal,
last_timestamp: None,
})

@@ -184,24 +203,35 @@ }

struct MemoryStreamIteratorInternal {
tx: Sender<Entry>,
#[derive(Clone)]
pub struct MemoryStreamSubscription {
internal: Arc<MemoryStreamSubscriptionInternal>,
last_timestamp: Option<i64>,
}
impl MemoryStreamIteratorInternal {
fn notify(&self, entry: Entry) {
self.tx.send(entry).unwrap();
}
}
impl Subscription for MemoryStreamSubscription {
fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error> {
let mut latest = self.internal.latest.lock().unwrap();
#[derive(Clone)]
pub struct MemoryStreamIterator {
_internal: Arc<MemoryStreamIteratorInternal>,
rx: Receiver<Entry>,
}
loop {
if let Some(latest) = &*latest {
if let Some(last_timestamp) = self.last_timestamp {
if last_timestamp < latest.timestamp {
self.last_timestamp = Some(latest.timestamp);
return Ok(Some(latest.clone()));
}
} else {
self.last_timestamp = Some(latest.timestamp);
return Ok(Some(latest.clone()));
}
}
impl Iterator for MemoryStreamIterator {
type Item = Result<Entry, Error>;
fn next(&mut self) -> Option<Self::Item> {
let value = self.rx.recv().unwrap();
Some(Ok(value))
if let Some(timeout) = timeout {
let result = self.internal.cvar.wait_timeout(latest, timeout).unwrap();
if result.1.timed_out() {
return Ok(None);
}
latest = result.0;
} else {
latest = self.internal.cvar.wait(latest).unwrap();
}
}
}

@@ -208,0 +238,0 @@ }

use std::borrow::Cow;
use std::error::Error as StdError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use crate::{Entry, Error, Store, SubscribeableStore};
use crate::{Entry, Error, Store, SubscribeableStore, Subscription};
use byteorder::{ByteOrder, LittleEndian};
use crossbeam_channel::{unbounded, Receiver, Sender};
use redis::streams::{StreamId, StreamMaxlen, StreamRangeReply, StreamReadOptions, StreamReadReply};

@@ -100,3 +98,3 @@ use redis::{Client, Cmd, Commands, Connection, ConnectionLike, IntoConnectionInfo, RedisError, Value};

channel,
StreamMaxlen::Approx(1),
StreamMaxlen::Equals(1),
"*",

@@ -137,48 +135,21 @@ &[

impl SubscribeableStore for RedisStreamStore {
type Subscription = RedisStreamIterator;
type Subscription = RedisStreamSubscription;
fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> {
let conn = self.client.get_connection()?;
RedisStreamIterator::new(conn, name.into())
Ok(RedisStreamSubscription::new(conn, name.into()))
}
}
pub struct RedisStreamIterator {
shutdown: Arc<AtomicBool>,
rx: Option<Receiver<Result<Entry, Error>>>,
listener_thread: Option<thread::JoinHandle<()>>,
pub struct RedisStreamSubscription {
conn: Connection,
name: Atom,
last_id: String,
}
impl RedisStreamIterator {
fn new(conn: Connection, name: Atom) -> Result<Self, Error> {
let (tx, rx) = unbounded::<Result<Entry, Error>>();
let shutdown = Arc::new(AtomicBool::new(false));
let listener_thread = {
let shutdown = shutdown.clone();
thread::spawn(|| stream_listener(conn, name, tx, shutdown))
};
Ok(RedisStreamIterator {
shutdown,
rx: Some(rx),
listener_thread: Some(listener_thread),
})
}
}
impl Drop for RedisStreamIterator {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::Relaxed);
let rx = self.rx.take().unwrap();
drop(rx);
let listener_thread = self.listener_thread.take().unwrap();
listener_thread.join().unwrap();
}
}
impl Iterator for RedisStreamIterator {
type Item = Result<Entry, Error>;
fn next(&mut self) -> Option<Self::Item> {
match self.rx.as_ref().unwrap().recv() {
Ok(value) => Some(value),
Err(_) => None,
impl RedisStreamSubscription {
fn new(conn: Connection, name: Atom) -> Self {
RedisStreamSubscription {
conn,
name,
last_id: "0".to_string(),
}

@@ -188,28 +159,20 @@ }

fn stream_listener(mut conn: Connection, name: Atom, tx: Sender<Result<Entry, Error>>, shutdown: Arc<AtomicBool>) {
let channels = vec![redis_channel(&name)];
let mut last_id = "0".to_string();
let opts = StreamReadOptions::default().block(STREAM_READ_BLOCK_MS);
loop {
let reply: StreamReadReply = match conn.xread_options(&channels, &[&last_id], &opts) {
Ok(reply) => reply,
Err(err) => {
if tx.send(Err(err.into())).is_err() || shutdown.load(Ordering::Relaxed) {
return;
} else {
continue;
impl Subscription for RedisStreamSubscription {
fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error> {
let channels = vec![redis_channel(&self.name)];
let opts = StreamReadOptions::default().block(match timeout {
Some(timeout) => timeout.as_millis().try_into().unwrap(),
None => STREAM_READ_BLOCK_MS,
});
loop {
let reply: StreamReadReply = self.conn.xread_options(&channels, &[&self.last_id], &opts)?;
if let Some(stream_key) = reply.keys.into_iter().next() {
if let Some(stream_id) = stream_key.ids.into_iter().next() {
let value = entry_from_stream_id(&stream_id, self.name.clone())?;
self.last_id = stream_id.id;
return Ok(Some(value));
}
}
};
if reply.keys.is_empty() && shutdown.load(Ordering::Relaxed) {
return;
}
for stream_key in reply.keys {
for stream_id in stream_key.ids {
if tx.send(entry_from_stream_id(&stream_id, name.clone())).is_err() {
return;
}
last_id = stream_id.id;
if timeout.is_some() {
return Ok(None);
}

@@ -216,0 +179,0 @@ }

use std::borrow::Cow;
use std::ops::RangeBounds;
use std::time::Duration;

@@ -26,4 +27,8 @@ use crate::{Entry, Error};

pub trait SubscribeableStore: Store {
type Subscription: Iterator<Item = Result<Entry, Error>>;
type Subscription: Subscription;
fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error>;
}
pub trait Subscription {
fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error>;
}
use std::borrow::Cow;
use std::collections::VecDeque;
use std::time::Duration;
use crate::{Entry, Error, Range, RangeableStore, Store, SubscribeableStore, Subscription};
use string_cache::DefaultAtom as Atom;
use crate::{Entry, Error, Range, RangeableStore, Store, SubscribeableStore};
/// Defines a unit test function.

@@ -78,6 +80,10 @@ #[doc(hidden)]

pub fn pubsub<S: SubscribeableStore + Clone>(store: &S) {
let subscriber = store.subscribe("test_pubsub").unwrap();
let mut subscriber = store.subscribe("test_pubsub").unwrap();
insert_sample_data(store, "test_pubsub").unwrap();
let results: VecDeque<Result<Entry, Error>> = subscriber.take(10).collect();
check_sample_data(results, "test_pubsub").unwrap();
// should get just the last entry
let entry = subscriber.next(None).unwrap().unwrap();
assert_eq!(entry, Entry::new_with_timestamp(10, "test_pubsub", vec![10]));
// should timeout
let entry = subscriber.next(Some(Duration::from_millis(10))).unwrap();
assert!(entry.is_none());
}

@@ -84,0 +90,0 @@