servicing
Advanced tools
+70
-19
@@ -139,5 +139,5 @@ # This file is automatically @generated by Cargo. | ||
| name = "cc" | ||
| version = "1.0.94" | ||
| version = "1.0.95" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" | ||
| checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b" | ||
@@ -278,2 +278,17 @@ [[package]] | ||
| [[package]] | ||
| name = "futures" | ||
| version = "0.3.30" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" | ||
| dependencies = [ | ||
| "futures-channel", | ||
| "futures-core", | ||
| "futures-executor", | ||
| "futures-io", | ||
| "futures-sink", | ||
| "futures-task", | ||
| "futures-util", | ||
| ] | ||
| [[package]] | ||
| name = "futures-channel" | ||
@@ -285,2 +300,3 @@ version = "0.3.30" | ||
| "futures-core", | ||
| "futures-sink", | ||
| ] | ||
@@ -295,2 +311,30 @@ | ||
| [[package]] | ||
| name = "futures-executor" | ||
| version = "0.3.30" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" | ||
| dependencies = [ | ||
| "futures-core", | ||
| "futures-task", | ||
| "futures-util", | ||
| ] | ||
| [[package]] | ||
| name = "futures-io" | ||
| version = "0.3.30" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" | ||
| [[package]] | ||
| name = "futures-macro" | ||
| version = "0.3.30" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" | ||
| dependencies = [ | ||
| "proc-macro2", | ||
| "quote", | ||
| "syn", | ||
| ] | ||
| [[package]] | ||
| name = "futures-sink" | ||
@@ -313,6 +357,12 @@ version = "0.3.30" | ||
| dependencies = [ | ||
| "futures-channel", | ||
| "futures-core", | ||
| "futures-io", | ||
| "futures-macro", | ||
| "futures-sink", | ||
| "futures-task", | ||
| "memchr", | ||
| "pin-project-lite", | ||
| "pin-utils", | ||
| "slab", | ||
| ] | ||
@@ -776,5 +826,5 @@ | ||
| name = "proc-macro2" | ||
| version = "1.0.80" | ||
| version = "1.0.81" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "a56dea16b0a29e94408b9aa5e2940a4eedbd128a1ba20e8f7ae60fd3d465af0e" | ||
| checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" | ||
| dependencies = [ | ||
@@ -907,5 +957,5 @@ "unicode-ident", | ||
| name = "reqwest" | ||
| version = "0.12.3" | ||
| version = "0.12.4" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "3e6cc1e89e689536eb5aeede61520e874df5a4707df811cd5da4aa5fbb2aae19" | ||
| checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" | ||
| dependencies = [ | ||
@@ -956,5 +1006,5 @@ "base64", | ||
| name = "rustix" | ||
| version = "0.38.32" | ||
| version = "0.38.34" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" | ||
| checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" | ||
| dependencies = [ | ||
@@ -980,5 +1030,5 @@ "bitflags 2.5.0", | ||
| name = "rustls-pki-types" | ||
| version = "1.4.1" | ||
| version = "1.5.0" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" | ||
| checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54" | ||
@@ -1087,3 +1137,3 @@ [[package]] | ||
| name = "servicing" | ||
| version = "0.0.6" | ||
| version = "0.0.7" | ||
| dependencies = [ | ||
@@ -1094,2 +1144,3 @@ "base64", | ||
| "env_logger", | ||
| "futures", | ||
| "log", | ||
@@ -1108,5 +1159,5 @@ "pyo3", | ||
| name = "signal-hook-registry" | ||
| version = "1.4.1" | ||
| version = "1.4.2" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" | ||
| checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" | ||
| dependencies = [ | ||
@@ -1143,5 +1194,5 @@ "libc", | ||
| name = "syn" | ||
| version = "2.0.59" | ||
| version = "2.0.60" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" | ||
| checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" | ||
| dependencies = [ | ||
@@ -1200,5 +1251,5 @@ "proc-macro2", | ||
| name = "thiserror" | ||
| version = "1.0.58" | ||
| version = "1.0.59" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" | ||
| checksum = "f0126ad08bff79f29fc3ae6a55cc72352056dfff61e3ff8bb7129476d44b23aa" | ||
| dependencies = [ | ||
@@ -1210,5 +1261,5 @@ "thiserror-impl", | ||
| name = "thiserror-impl" | ||
| version = "1.0.58" | ||
| version = "1.0.59" | ||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||
| checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" | ||
| checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66" | ||
| dependencies = [ | ||
@@ -1215,0 +1266,0 @@ "proc-macro2", |
+2
-1
| [package] | ||
| name = "servicing" | ||
| version = "0.0.6" | ||
| version = "0.0.7" | ||
| edition = "2021" | ||
@@ -25,1 +25,2 @@ | ||
| base64 = "0.22.0" | ||
| futures = "0.3.30" |
+1
-1
| Metadata-Version: 2.3 | ||
| Name: servicing | ||
| Version: 0.0.6 | ||
| Version: 0.0.7 | ||
| Classifier: Programming Language :: Rust | ||
@@ -5,0 +5,0 @@ Classifier: Programming Language :: Python :: Implementation :: CPython |
+4
-2
@@ -19,3 +19,4 @@ from typing import List, Optional | ||
| def __init__(self, port: Optional[int] = None, | ||
| def __init__(self, | ||
| port: Optional[int] = None, | ||
| replicas: Optional[int] = None, | ||
@@ -62,3 +63,3 @@ cloud: Optional[str] = None, | ||
| def down(self, name: str) -> None: | ||
| def down(self, name: str, force: Optional[bool] = None) -> None: | ||
| """ | ||
@@ -68,2 +69,3 @@ Stop a service | ||
| :param name: the name of the service to stop | ||
| :param force: whether to force stop the service | ||
| """ | ||
@@ -70,0 +72,0 @@ |
+178
-31
@@ -8,3 +8,2 @@ #![allow(dead_code)] // Remove this later | ||
| sync::{Arc, Mutex, OnceLock}, | ||
| thread::{sleep, spawn}, | ||
| time::Duration, | ||
@@ -14,2 +13,3 @@ }; | ||
| use base64::Engine; | ||
| use futures::future::join_all; | ||
| use log::{error, info, warn}; | ||
@@ -20,2 +20,6 @@ use pyo3::{pyclass, pymethods, Bound, PyAny}; | ||
| use serde::{Deserialize, Serialize}; | ||
| use tokio::{ | ||
| runtime::{self, Runtime}, | ||
| time::sleep, | ||
| }; | ||
@@ -31,3 +35,4 @@ use crate::{ | ||
| static CLUSTER_ORCHESTRATOR: &str = "skypilot"; | ||
| static SEVICE_CHECK_INTERVAL: Duration = Duration::from_secs(5); | ||
| static SERVICE_CHECK_INTERVAL: Duration = Duration::from_secs(5); | ||
| static REPLICA_UP_CHECK: &str = "no ready replicas"; | ||
@@ -41,2 +46,3 @@ static REGEX_URL: OnceLock<Regex> = OnceLock::new(); | ||
| client: Client, | ||
| rt: Runtime, | ||
| service: Arc<Mutex<HashMap<String, Service>>>, | ||
@@ -70,4 +76,15 @@ } | ||
| // tokio runtime with one dedicated worker | ||
| let rt = runtime::Builder::new_multi_thread() | ||
| .worker_threads(1) | ||
| .thread_name("servicing") | ||
| .enable_all() | ||
| .build()?; | ||
| Ok(Self { | ||
| client: Client::new(), | ||
| client: Client::builder() | ||
| .pool_max_idle_per_host(0) | ||
| .timeout(Duration::from_secs(10)) | ||
| .build()?, | ||
| rt, | ||
| service, | ||
@@ -202,12 +219,12 @@ }) | ||
| let url = url.to_string(); | ||
| let url = url.to_string() + &service.template.service.readiness_probe; | ||
| // spawn a thread to check when service comes online, then update the service status | ||
| spawn(move || { | ||
| // spawn a green thread to check when service comes online, then update the service status | ||
| let fut = async move { | ||
| let url = format!("http://{}", url); | ||
| loop { | ||
| match helper::fetch(&client_clone, &url) { | ||
| match helper::fetch(&client_clone, &url).await { | ||
| Ok(resp) => { | ||
| if resp.to_lowercase().contains("no ready replicas") { | ||
| sleep(SEVICE_CHECK_INTERVAL); | ||
| if resp.to_lowercase().contains(REPLICA_UP_CHECK) { | ||
| sleep(SERVICE_CHECK_INTERVAL).await; | ||
| continue; | ||
@@ -237,3 +254,4 @@ } | ||
| } | ||
| }); | ||
| }; | ||
| self.rt.spawn(fut); | ||
@@ -245,35 +263,73 @@ return Ok(()); | ||
| pub fn down(&mut self, name: String) -> Result<(), ServicingError> { | ||
| pub fn down(&mut self, name: String, force: Option<bool>) -> Result<(), ServicingError> { | ||
| // get the service configuration | ||
| match self.service.lock()?.get_mut(&name) { | ||
| Some(service) if service.up => { | ||
| info!("Destroying the service with the configuration: {:?}", name); | ||
| // launch the cluster | ||
| let mut child = Command::new("sky") | ||
| .arg("serve") | ||
| .arg("down") | ||
| .arg(&name) | ||
| .spawn()?; | ||
| child.wait()?; | ||
| // Update service status | ||
| service.url = None; | ||
| service.up = false; | ||
| Ok(()) | ||
| } | ||
| Some(_) => Err(ServicingError::ServiceNotUp(name)), | ||
| None => Err(ServicingError::ServiceNotFound(name)), | ||
| Some(_) => match force { | ||
| Some(true) => {} | ||
| Some(false) | None => { | ||
| return Err(ServicingError::ServiceNotUp(name)); | ||
| } | ||
| }, | ||
| None => return Err(ServicingError::ServiceNotFound(name)), | ||
| } | ||
| info!("Destroying the service with the configuration: {:?}", name); | ||
| // launch the cluster | ||
| let mut child = Command::new("sky") | ||
| .arg("serve") | ||
| .arg("down") | ||
| .arg(&name) | ||
| .spawn()?; | ||
| child.wait()?; | ||
| Ok(()) | ||
| } | ||
| pub fn status(&self, name: String, pretty: Option<bool>) -> Result<String, ServicingError> { | ||
| pub fn status(&mut self, name: String, pretty: Option<bool>) -> Result<String, ServicingError> { | ||
| // Check if the service exists | ||
| if let Some(service) = self.service.lock()?.get(&name) { | ||
| if let Some(service) = self.service.lock()?.get_mut(&name) { | ||
| info!("Checking the status of the service: {:?}", name); | ||
| // if service is up poll once to see if it's still up | ||
| if let (true, Some(url)) = (service.up, &service.url) { | ||
| let url = format!( | ||
| "http://{}{}", | ||
| url, &service.template.service.readiness_probe | ||
| ); | ||
| let r = self.rt.block_on(async { | ||
| let res = helper::fetch(&self.client, &url).await; | ||
| match res { | ||
| Ok(resp) => { | ||
| if resp.to_lowercase().contains(REPLICA_UP_CHECK) { | ||
| Err(ServicingError::ServiceNotUp(name.clone())) | ||
| } else { | ||
| // it's up | ||
| Ok(()) | ||
| } | ||
| } | ||
| Err(e) => Err::<(), _>(ServicingError::General(e.to_string())), | ||
| } | ||
| }); | ||
| match r { | ||
| Ok(_) => { | ||
| //No-op | ||
| info!("Service {} is up", name); | ||
| } | ||
| Err(e) => { | ||
| warn!("{:?}", e); | ||
| service.up = false; | ||
| } | ||
| } | ||
| } | ||
| return Ok(match pretty { | ||
| Some(true) => serde_json::to_string_pretty(service)?, | ||
| Some(false) => serde_json::to_string(service)?, | ||
| None => serde_json::to_string(service)?, | ||
| _ => serde_json::to_string(service)?, | ||
| }); | ||
@@ -315,3 +371,7 @@ } | ||
| pub fn load(&mut self, location: Option<PathBuf>) -> Result<(), ServicingError> { | ||
| pub fn load( | ||
| &mut self, | ||
| location: Option<PathBuf>, | ||
| update_status: Option<bool>, | ||
| ) -> Result<(), ServicingError> { | ||
| let location = if let Some(location) = location { | ||
@@ -335,2 +395,83 @@ helper::create_directory( | ||
| if let Some(true) = update_status { | ||
| info!("Checking for services that may come up while you were away..."); | ||
| // Clones to pass to threads | ||
| let service_clone = self.service.clone(); | ||
| let client_clone = self.client.clone(); | ||
| let mut service_to_check = Vec::new(); | ||
| // iterate through the services and find that are down | ||
| self.service | ||
| .lock()? | ||
| .iter() | ||
| .filter(|(_, service)| !service.up && service.url.is_some()) | ||
| .for_each(|(name, service)| { | ||
| service_to_check.push(( | ||
| name.clone(), | ||
| service | ||
| .url | ||
| .clone() | ||
| .expect("Gettting url, this should never be None") | ||
| + &service.template.service.readiness_probe, | ||
| )) | ||
| }); | ||
| if service_to_check.is_empty() { | ||
| info!("No services to check"); | ||
| return Ok(()); | ||
| } | ||
| info!("Services to check: {:?}", service_to_check); | ||
| self.rt.spawn(async move { | ||
| let mut handles = Vec::new(); | ||
| for (name, url) in service_to_check { | ||
| let client_clone = client_clone.clone(); | ||
| let url = format!("http://{}", url); | ||
| let handle = tokio::spawn(async move { | ||
| match helper::fetch_and_check( | ||
| &client_clone, | ||
| &url, | ||
| REPLICA_UP_CHECK, | ||
| Some(SERVICE_CHECK_INTERVAL), | ||
| ) | ||
| .await | ||
| { | ||
| Ok(_) => {} | ||
| Err(e) => { | ||
| return Err(e); | ||
| } | ||
| } | ||
| Ok(name) | ||
| }); | ||
| handles.push(handle); | ||
| } | ||
| for res in join_all(handles).await { | ||
| let mut service = match service_clone.lock() { | ||
| Ok(s) => s, | ||
| Err(e) => { | ||
| error!("Poisoned lock {e}"); | ||
| return; | ||
| } | ||
| }; | ||
| match res { | ||
| Ok(Ok(r)) => { | ||
| if let Some(service) = service.get_mut(&r) { | ||
| service.up = true; | ||
| info!("Service {} is up", r); | ||
| } | ||
| } | ||
| Ok(Err(e)) => { | ||
| warn!("{e}"); | ||
| } | ||
| Err(e) => { | ||
| error!("{e}"); | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| } | ||
| Ok(()) | ||
@@ -388,2 +529,5 @@ } | ||
| run: None, | ||
| disk_size: None, | ||
| cpu: None, | ||
| memory: None, | ||
| }), | ||
@@ -393,2 +537,5 @@ ) | ||
| // test the runtime... should NOT panic | ||
| dis.rt.block_on(async { "" }); | ||
| dis.save(None).unwrap(); | ||
@@ -408,3 +555,3 @@ | ||
| dis.load(None).unwrap(); | ||
| dis.load(None, None).unwrap(); | ||
| { | ||
@@ -411,0 +558,0 @@ let services = dis.service.lock().unwrap(); |
+36
-15
@@ -0,1 +1,2 @@ | ||
| //! Helper module houses all the helper functions used by the service module. | ||
| use std::{ | ||
@@ -8,2 +9,3 @@ fs, | ||
| thread::{spawn, JoinHandle}, | ||
| time::Duration, | ||
| }; | ||
@@ -13,2 +15,3 @@ | ||
| use reqwest::{header::ACCEPT, Client}; | ||
| use tokio::time::sleep; | ||
@@ -164,18 +167,36 @@ use crate::error::ServicingError; | ||
| pub fn fetch(client: &Client, url: &str) -> Result<String, ServicingError> { | ||
| // create tokio runtime that is single threaded | ||
| let result = tokio::runtime::Builder::new_current_thread() | ||
| .enable_all() | ||
| .build()? | ||
| .block_on(async { | ||
| let res = client | ||
| .get(url) | ||
| .header(ACCEPT, "application/json") | ||
| .send() | ||
| .await?; | ||
| let body = res.text().await?; | ||
| Ok::<_, ServicingError>(body) | ||
| })?; | ||
| pub async fn fetch(client: &Client, url: &str) -> Result<String, reqwest::Error> { | ||
| let res = client | ||
| .get(url) | ||
| .header(ACCEPT, "application/json") | ||
| .send() | ||
| .await?; | ||
| let body = res.text().await?; | ||
| Ok(body) | ||
| } | ||
| Ok(result) | ||
| pub async fn fetch_and_check( | ||
| client: &Client, | ||
| url: &str, | ||
| expected: &str, | ||
| delay: Option<Duration>, | ||
| ) -> Result<(), ServicingError> { | ||
| loop { | ||
| let res = client | ||
| .get(url) | ||
| .header(ACCEPT, "application/json") | ||
| .send() | ||
| .await?; | ||
| let body = res.text().await?; | ||
| if !body.to_lowercase().contains(expected) { | ||
| break; | ||
| } | ||
| if let Some(delay) = delay { | ||
| sleep(delay).await; | ||
| } | ||
| } | ||
| Ok(()) | ||
| } |
+1
-1
@@ -14,3 +14,3 @@ use env_logger::Builder; | ||
| fn servicing(m: &Bound<'_, PyModule>) -> PyResult<()> { | ||
| // if release mode, set log level to info | ||
| // if release mode, set log level to warn | ||
| if cfg!(not(debug_assertions)) { | ||
@@ -17,0 +17,0 @@ Builder::new().filter_level(log::LevelFilter::Warn).init(); |
@@ -21,2 +21,3 @@ use pyo3::{pyclass, pymethods}; | ||
| #[new] | ||
| #[allow(clippy::too_many_arguments)] | ||
| pub fn new( | ||
@@ -23,0 +24,0 @@ port: Option<u16>, |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
84874
8.88%96
2.13%