binlog
Advanced tools
+1
-28
@@ -35,6 +35,5 @@ # This file is automatically @generated by Cargo. | ||
| name = "binlog" | ||
| version = "0.4.0" | ||
| version = "0.5.0" | ||
| dependencies = [ | ||
| "byteorder", | ||
| "crossbeam-channel", | ||
| "pyo3", | ||
@@ -94,22 +93,2 @@ "r2d2", | ||
| [[package]] | ||
| name = "crossbeam-channel" | ||
| version = "0.5.4" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" | ||
| dependencies = [ | ||
| "cfg-if", | ||
| "crossbeam-utils", | ||
| ] | ||
| [[package]] | ||
| name = "crossbeam-utils" | ||
| version = "0.8.8" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" | ||
| dependencies = [ | ||
| "cfg-if", | ||
| "lazy_static", | ||
| ] | ||
| [[package]] | ||
| name = "dtoa" | ||
@@ -225,8 +204,2 @@ version = "0.4.8" | ||
| [[package]] | ||
| name = "lazy_static" | ||
| version = "1.4.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" | ||
| [[package]] | ||
| name = "libc" | ||
@@ -233,0 +206,0 @@ version = "0.2.121" |
+1
-2
| [package] | ||
| name = "binlog" | ||
| version = "0.4.0" | ||
| version = "0.5.0" | ||
| edition = "2021" | ||
@@ -24,3 +24,2 @@ authors = ["Yusuf Simonson <simonson@gmail.com>"] | ||
| [dependencies] | ||
| crossbeam-channel = "0.5.4" | ||
| string_cache = "0.8.4" | ||
@@ -27,0 +26,0 @@ |
+6
-0
| # Changelog | ||
| ## 0.5.0 (5/8/2022) | ||
| * Support for pub/sub timeouts | ||
| * Redis store | ||
| * Removed separate thread for pub/sub iterators | ||
| ## 0.4.0 (4/21/2022) | ||
@@ -4,0 +10,0 @@ |
+1
-1
| Metadata-Version: 2.1 | ||
| Name: binlog | ||
| Version: 0.4.0 | ||
| Version: 0.5.0 | ||
| Classifier: Programming Language :: Rust | ||
@@ -5,0 +5,0 @@ Classifier: Programming Language :: Python :: Implementation :: CPython |
@@ -11,6 +11,7 @@ import tempfile | ||
| store.push(entry) | ||
| sub_entry = next(sub) | ||
| sub_entry = sub.next(None) | ||
| assert entry.timestamp == sub_entry.timestamp | ||
| assert entry.name == sub_entry.name | ||
| assert entry.value == sub_entry.value | ||
| sub_entry = sub.next(0.01) | ||
| assert sub_entry == None |
+3
-3
@@ -20,8 +20,8 @@ #![cfg_attr(feature = "benches", feature(test))] | ||
| pub use self::errors::Error; | ||
| pub use self::stores::memory::{MemoryRange, MemoryStore, MemoryStreamIterator}; | ||
| pub use self::stores::traits::{Range, RangeableStore, Store, SubscribeableStore}; | ||
| pub use self::stores::memory::{MemoryRange, MemoryStore, MemoryStreamSubscription}; | ||
| pub use self::stores::traits::{Range, RangeableStore, Store, SubscribeableStore, Subscription}; | ||
| #[cfg(feature = "redis-store")] | ||
| pub use self::stores::redis::{RedisStreamIterator, RedisStreamStore}; | ||
| pub use self::stores::redis::{RedisStreamStore, RedisStreamSubscription}; | ||
| #[cfg(feature = "sqlite-store")] | ||
| pub use self::stores::sqlite::{SqliteRange, SqliteRangeIterator, SqliteStore}; |
+22
-26
| use std::borrow::Cow; | ||
| use std::ops::Bound; | ||
| use std::time::Duration; | ||
| use crate::{Error, Range, RangeableStore, Store, SubscribeableStore}; | ||
| use crate::{Error, Range, RangeableStore, Store, SubscribeableStore, Subscription}; | ||
@@ -93,8 +94,5 @@ use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError}; | ||
| impl SqliteRange { | ||
| pub fn count(&self) -> PyResult<u64> { | ||
| // Don't consume `self.range` so further operation on `self` can be | ||
| // run. The downside of this is that we can't release the GIL, so | ||
| // count is not as cheap as it should be. | ||
| pub fn count(&self, py: Python) -> PyResult<u64> { | ||
| if let Some(range) = &self.range { | ||
| map_result(range.count()) | ||
| py.allow_threads(move || map_result(range.count())) | ||
| } else { | ||
@@ -113,6 +111,8 @@ Err(PyValueError::new_err("range already consumed")) | ||
| pub fn iter(&mut self) -> PyResult<SqliteRangeIterator> { | ||
| pub fn iter(&mut self, py: Python) -> PyResult<SqliteRangeIterator> { | ||
| if let Some(range) = self.range.take() { | ||
| let iter = map_result(range.iter())?; | ||
| Ok(SqliteRangeIterator { iter }) | ||
| py.allow_threads(move || { | ||
| let iter = map_result(range.iter())?; | ||
| Ok(SqliteRangeIterator { iter }) | ||
| }) | ||
| } else { | ||
@@ -163,5 +163,5 @@ Err(PyValueError::new_err("range already consumed")) | ||
| pub fn subscribe(&self, name: String) -> PyResult<RedisStreamIterator> { | ||
| let iter = map_result(self.store.subscribe(name))?; | ||
| Ok(RedisStreamIterator { iter }) | ||
| pub fn subscribe(&self, name: String) -> PyResult<RedisStreamSubscription> { | ||
| let subscription = map_result(self.store.subscribe(name))?; | ||
| Ok(RedisStreamSubscription { subscription }) | ||
| } | ||
@@ -171,19 +171,15 @@ } | ||
| #[pyclass] | ||
| pub struct RedisStreamIterator { | ||
| iter: crate::RedisStreamIterator, | ||
| pub struct RedisStreamSubscription { | ||
| subscription: crate::RedisStreamSubscription, | ||
| } | ||
| #[pymethods] | ||
| impl RedisStreamIterator { | ||
| fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { | ||
| slf | ||
| impl RedisStreamSubscription { | ||
| pub fn next(&mut self, py: Python, duration: Option<f32>) -> PyResult<Option<Entry>> { | ||
| let duration = duration.map(Duration::from_secs_f32); | ||
| py.allow_threads(move || { | ||
| let entry = map_result(self.subscription.next(duration))?; | ||
| Ok(entry.map(|e| e.into())) | ||
| }) | ||
| } | ||
| fn __next__(mut slf: PyRefMut<'_, Self>, py: Python) -> Option<PyObject> { | ||
| match slf.iter.next() { | ||
| Some(Ok(entry)) => Some(Entry::from(entry).into_py(py)), | ||
| Some(Err(err)) => Some(map_result::<()>(Err(err)).unwrap_err().into_py(py)), | ||
| None => None, | ||
| } | ||
| } | ||
| } | ||
@@ -198,4 +194,4 @@ | ||
| m.add_class::<RedisStreamStore>()?; | ||
| m.add_class::<RedisStreamIterator>()?; | ||
| m.add_class::<RedisStreamSubscription>()?; | ||
| Ok(()) | ||
| } |
+61
-31
| use std::borrow::Cow; | ||
| use std::collections::{BTreeMap, HashMap}; | ||
| use std::ops::{Bound, RangeBounds}; | ||
| use std::sync::{Arc, Mutex, Weak}; | ||
| use std::sync::{Arc, Condvar, Mutex, Weak}; | ||
| use std::time::Duration; | ||
| use std::vec::IntoIter as VecIter; | ||
| use crate::{utils, Entry, Error, Range, RangeableStore, Store, SubscribeableStore}; | ||
| use crate::{utils, Entry, Error, Range, RangeableStore, Store, SubscribeableStore, Subscription}; | ||
| use crossbeam_channel::{unbounded, Receiver, Sender}; | ||
| use string_cache::DefaultAtom as Atom; | ||
@@ -15,5 +15,18 @@ | ||
| entries: BTreeMap<(i64, Atom), Vec<Vec<u8>>>, | ||
| subscribers: HashMap<Atom, Vec<Weak<MemoryStreamIteratorInternal>>>, | ||
| subscribers: HashMap<Atom, Vec<Weak<MemoryStreamSubscriptionInternal>>>, | ||
| } | ||
| struct MemoryStreamSubscriptionInternal { | ||
| latest: Mutex<Option<Entry>>, | ||
| cvar: Condvar, | ||
| } | ||
| impl MemoryStreamSubscriptionInternal { | ||
| fn notify(&self, entry: Entry) { | ||
| let mut latest = self.latest.lock().unwrap(); | ||
| *latest = Some(entry); | ||
| self.cvar.notify_all(); | ||
| } | ||
| } | ||
| #[derive(Clone, Default)] | ||
@@ -34,3 +47,3 @@ pub struct MemoryStore(Arc<Mutex<MemoryStoreInternal>>); | ||
| let entry = entry.into_owned(); | ||
| let mut new_subscribers = Vec::<Weak<MemoryStreamIteratorInternal>>::default(); | ||
| let mut new_subscribers = Vec::<Weak<MemoryStreamSubscriptionInternal>>::default(); | ||
| for subscriber in subscribers.drain(..) { | ||
@@ -166,15 +179,21 @@ if let Some(subscriber) = Weak::upgrade(&subscriber) { | ||
| impl SubscribeableStore for MemoryStore { | ||
| type Subscription = MemoryStreamIterator; | ||
| type Subscription = MemoryStreamSubscription; | ||
| fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> { | ||
| let (tx, rx) = unbounded(); | ||
| let iterator_internal = Arc::new(MemoryStreamIteratorInternal { tx }); | ||
| let name = name.into(); | ||
| let latest = self.latest(&name)?; | ||
| let subscription_internal = Arc::new(MemoryStreamSubscriptionInternal { | ||
| latest: Mutex::new(latest), | ||
| cvar: Condvar::new(), | ||
| }); | ||
| let mut internal = self.0.lock().unwrap(); | ||
| internal | ||
| .subscribers | ||
| .entry(name.into()) | ||
| .entry(name) | ||
| .or_insert_with(Vec::default) | ||
| .push(Arc::downgrade(&iterator_internal)); | ||
| Ok(MemoryStreamIterator { | ||
| _internal: iterator_internal, | ||
| rx, | ||
| .push(Arc::downgrade(&subscription_internal)); | ||
| Ok(MemoryStreamSubscription { | ||
| internal: subscription_internal, | ||
| last_timestamp: None, | ||
| }) | ||
@@ -184,24 +203,35 @@ } | ||
| struct MemoryStreamIteratorInternal { | ||
| tx: Sender<Entry>, | ||
| #[derive(Clone)] | ||
| pub struct MemoryStreamSubscription { | ||
| internal: Arc<MemoryStreamSubscriptionInternal>, | ||
| last_timestamp: Option<i64>, | ||
| } | ||
| impl MemoryStreamIteratorInternal { | ||
| fn notify(&self, entry: Entry) { | ||
| self.tx.send(entry).unwrap(); | ||
| } | ||
| } | ||
| impl Subscription for MemoryStreamSubscription { | ||
| fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error> { | ||
| let mut latest = self.internal.latest.lock().unwrap(); | ||
| #[derive(Clone)] | ||
| pub struct MemoryStreamIterator { | ||
| _internal: Arc<MemoryStreamIteratorInternal>, | ||
| rx: Receiver<Entry>, | ||
| } | ||
| loop { | ||
| if let Some(latest) = &*latest { | ||
| if let Some(last_timestamp) = self.last_timestamp { | ||
| if last_timestamp < latest.timestamp { | ||
| self.last_timestamp = Some(latest.timestamp); | ||
| return Ok(Some(latest.clone())); | ||
| } | ||
| } else { | ||
| self.last_timestamp = Some(latest.timestamp); | ||
| return Ok(Some(latest.clone())); | ||
| } | ||
| } | ||
| impl Iterator for MemoryStreamIterator { | ||
| type Item = Result<Entry, Error>; | ||
| fn next(&mut self) -> Option<Self::Item> { | ||
| let value = self.rx.recv().unwrap(); | ||
| Some(Ok(value)) | ||
| if let Some(timeout) = timeout { | ||
| let result = self.internal.cvar.wait_timeout(latest, timeout).unwrap(); | ||
| if result.1.timed_out() { | ||
| return Ok(None); | ||
| } | ||
| latest = result.0; | ||
| } else { | ||
| latest = self.internal.cvar.wait(latest).unwrap(); | ||
| } | ||
| } | ||
| } | ||
@@ -208,0 +238,0 @@ } |
+31
-68
| use std::borrow::Cow; | ||
| use std::error::Error as StdError; | ||
| use std::io::{Error as IoError, ErrorKind as IoErrorKind}; | ||
| use std::sync::atomic::{AtomicBool, Ordering}; | ||
| use std::sync::{Arc, Mutex}; | ||
| use std::thread; | ||
| use std::time::Duration; | ||
| use crate::{Entry, Error, Store, SubscribeableStore}; | ||
| use crate::{Entry, Error, Store, SubscribeableStore, Subscription}; | ||
| use byteorder::{ByteOrder, LittleEndian}; | ||
| use crossbeam_channel::{unbounded, Receiver, Sender}; | ||
| use redis::streams::{StreamId, StreamMaxlen, StreamRangeReply, StreamReadOptions, StreamReadReply}; | ||
@@ -100,3 +98,3 @@ use redis::{Client, Cmd, Commands, Connection, ConnectionLike, IntoConnectionInfo, RedisError, Value}; | ||
| channel, | ||
| StreamMaxlen::Approx(1), | ||
| StreamMaxlen::Equals(1), | ||
| "*", | ||
@@ -137,48 +135,21 @@ &[ | ||
| impl SubscribeableStore for RedisStreamStore { | ||
| type Subscription = RedisStreamIterator; | ||
| type Subscription = RedisStreamSubscription; | ||
| fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error> { | ||
| let conn = self.client.get_connection()?; | ||
| RedisStreamIterator::new(conn, name.into()) | ||
| Ok(RedisStreamSubscription::new(conn, name.into())) | ||
| } | ||
| } | ||
| pub struct RedisStreamIterator { | ||
| shutdown: Arc<AtomicBool>, | ||
| rx: Option<Receiver<Result<Entry, Error>>>, | ||
| listener_thread: Option<thread::JoinHandle<()>>, | ||
| pub struct RedisStreamSubscription { | ||
| conn: Connection, | ||
| name: Atom, | ||
| last_id: String, | ||
| } | ||
| impl RedisStreamIterator { | ||
| fn new(conn: Connection, name: Atom) -> Result<Self, Error> { | ||
| let (tx, rx) = unbounded::<Result<Entry, Error>>(); | ||
| let shutdown = Arc::new(AtomicBool::new(false)); | ||
| let listener_thread = { | ||
| let shutdown = shutdown.clone(); | ||
| thread::spawn(|| stream_listener(conn, name, tx, shutdown)) | ||
| }; | ||
| Ok(RedisStreamIterator { | ||
| shutdown, | ||
| rx: Some(rx), | ||
| listener_thread: Some(listener_thread), | ||
| }) | ||
| } | ||
| } | ||
| impl Drop for RedisStreamIterator { | ||
| fn drop(&mut self) { | ||
| self.shutdown.store(true, Ordering::Relaxed); | ||
| let rx = self.rx.take().unwrap(); | ||
| drop(rx); | ||
| let listener_thread = self.listener_thread.take().unwrap(); | ||
| listener_thread.join().unwrap(); | ||
| } | ||
| } | ||
| impl Iterator for RedisStreamIterator { | ||
| type Item = Result<Entry, Error>; | ||
| fn next(&mut self) -> Option<Self::Item> { | ||
| match self.rx.as_ref().unwrap().recv() { | ||
| Ok(value) => Some(value), | ||
| Err(_) => None, | ||
| impl RedisStreamSubscription { | ||
| fn new(conn: Connection, name: Atom) -> Self { | ||
| RedisStreamSubscription { | ||
| conn, | ||
| name, | ||
| last_id: "0".to_string(), | ||
| } | ||
@@ -188,28 +159,20 @@ } | ||
| fn stream_listener(mut conn: Connection, name: Atom, tx: Sender<Result<Entry, Error>>, shutdown: Arc<AtomicBool>) { | ||
| let channels = vec![redis_channel(&name)]; | ||
| let mut last_id = "0".to_string(); | ||
| let opts = StreamReadOptions::default().block(STREAM_READ_BLOCK_MS); | ||
| loop { | ||
| let reply: StreamReadReply = match conn.xread_options(&channels, &[&last_id], &opts) { | ||
| Ok(reply) => reply, | ||
| Err(err) => { | ||
| if tx.send(Err(err.into())).is_err() || shutdown.load(Ordering::Relaxed) { | ||
| return; | ||
| } else { | ||
| continue; | ||
| impl Subscription for RedisStreamSubscription { | ||
| fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error> { | ||
| let channels = vec![redis_channel(&self.name)]; | ||
| let opts = StreamReadOptions::default().block(match timeout { | ||
| Some(timeout) => timeout.as_millis().try_into().unwrap(), | ||
| None => STREAM_READ_BLOCK_MS, | ||
| }); | ||
| loop { | ||
| let reply: StreamReadReply = self.conn.xread_options(&channels, &[&self.last_id], &opts)?; | ||
| if let Some(stream_key) = reply.keys.into_iter().next() { | ||
| if let Some(stream_id) = stream_key.ids.into_iter().next() { | ||
| let value = entry_from_stream_id(&stream_id, self.name.clone())?; | ||
| self.last_id = stream_id.id; | ||
| return Ok(Some(value)); | ||
| } | ||
| } | ||
| }; | ||
| if reply.keys.is_empty() && shutdown.load(Ordering::Relaxed) { | ||
| return; | ||
| } | ||
| for stream_key in reply.keys { | ||
| for stream_id in stream_key.ids { | ||
| if tx.send(entry_from_stream_id(&stream_id, name.clone())).is_err() { | ||
| return; | ||
| } | ||
| last_id = stream_id.id; | ||
| if timeout.is_some() { | ||
| return Ok(None); | ||
| } | ||
@@ -216,0 +179,0 @@ } |
| use std::borrow::Cow; | ||
| use std::ops::RangeBounds; | ||
| use std::time::Duration; | ||
@@ -26,4 +27,8 @@ use crate::{Entry, Error}; | ||
| pub trait SubscribeableStore: Store { | ||
| type Subscription: Iterator<Item = Result<Entry, Error>>; | ||
| type Subscription: Subscription; | ||
| fn subscribe<A: Into<Atom>>(&self, name: A) -> Result<Self::Subscription, Error>; | ||
| } | ||
| pub trait Subscription { | ||
| fn next(&mut self, timeout: Option<Duration>) -> Result<Option<Entry>, Error>; | ||
| } |
+11
-5
| use std::borrow::Cow; | ||
| use std::collections::VecDeque; | ||
| use std::time::Duration; | ||
| use crate::{Entry, Error, Range, RangeableStore, Store, SubscribeableStore, Subscription}; | ||
| use string_cache::DefaultAtom as Atom; | ||
| use crate::{Entry, Error, Range, RangeableStore, Store, SubscribeableStore}; | ||
| /// Defines a unit test function. | ||
@@ -78,6 +80,10 @@ #[doc(hidden)] | ||
| pub fn pubsub<S: SubscribeableStore + Clone>(store: &S) { | ||
| let subscriber = store.subscribe("test_pubsub").unwrap(); | ||
| let mut subscriber = store.subscribe("test_pubsub").unwrap(); | ||
| insert_sample_data(store, "test_pubsub").unwrap(); | ||
| let results: VecDeque<Result<Entry, Error>> = subscriber.take(10).collect(); | ||
| check_sample_data(results, "test_pubsub").unwrap(); | ||
| // should get just the last entry | ||
| let entry = subscriber.next(None).unwrap().unwrap(); | ||
| assert_eq!(entry, Entry::new_with_timestamp(10, "test_pubsub", vec![10])); | ||
| // should timeout | ||
| let entry = subscriber.next(Some(Duration::from_millis(10))).unwrap(); | ||
| assert!(entry.is_none()); | ||
| } | ||
@@ -84,0 +90,0 @@ |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
84
2.44%114265
-0.18%