Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

servicing

Package Overview
Dependencies
Maintainers
1
Versions
12
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

servicing - npm Package Compare versions

Comparing version
0.0.6
to
0.0.7
+70
-19
Cargo.lock

@@ -139,5 +139,5 @@ # This file is automatically @generated by Cargo.

name = "cc"
version = "1.0.94"
version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7"
checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b"

@@ -278,2 +278,17 @@ [[package]]

[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"

@@ -285,2 +300,3 @@ version = "0.3.30"

"futures-core",
"futures-sink",
]

@@ -295,2 +311,30 @@

[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"

@@ -313,6 +357,12 @@ version = "0.3.30"

dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]

@@ -776,5 +826,5 @@

name = "proc-macro2"
version = "1.0.80"
version = "1.0.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a56dea16b0a29e94408b9aa5e2940a4eedbd128a1ba20e8f7ae60fd3d465af0e"
checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba"
dependencies = [

@@ -907,5 +957,5 @@ "unicode-ident",

name = "reqwest"
version = "0.12.3"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e6cc1e89e689536eb5aeede61520e874df5a4707df811cd5da4aa5fbb2aae19"
checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10"
dependencies = [

@@ -956,5 +1006,5 @@ "base64",

name = "rustix"
version = "0.38.32"
version = "0.38.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89"
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
dependencies = [

@@ -980,5 +1030,5 @@ "bitflags 2.5.0",

name = "rustls-pki-types"
version = "1.4.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247"
checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54"

@@ -1087,3 +1137,3 @@ [[package]]

name = "servicing"
version = "0.0.6"
version = "0.0.7"
dependencies = [

@@ -1094,2 +1144,3 @@ "base64",

"env_logger",
"futures",
"log",

@@ -1108,5 +1159,5 @@ "pyo3",

name = "signal-hook-registry"
version = "1.4.1"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [

@@ -1143,5 +1194,5 @@ "libc",

name = "syn"
version = "2.0.59"
version = "2.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a"
checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3"
dependencies = [

@@ -1200,5 +1251,5 @@ "proc-macro2",

name = "thiserror"
version = "1.0.58"
version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297"
checksum = "f0126ad08bff79f29fc3ae6a55cc72352056dfff61e3ff8bb7129476d44b23aa"
dependencies = [

@@ -1210,5 +1261,5 @@ "thiserror-impl",

name = "thiserror-impl"
version = "1.0.58"
version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7"
checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66"
dependencies = [

@@ -1215,0 +1266,0 @@ "proc-macro2",

[package]
name = "servicing"
version = "0.0.6"
version = "0.0.7"
edition = "2021"

@@ -25,1 +25,2 @@

base64 = "0.22.0"
futures = "0.3.30"
Metadata-Version: 2.3
Name: servicing
Version: 0.0.6
Version: 0.0.7
Classifier: Programming Language :: Rust

@@ -5,0 +5,0 @@ Classifier: Programming Language :: Python :: Implementation :: CPython

@@ -19,3 +19,4 @@ from typing import List, Optional

def __init__(self, port: Optional[int] = None,
def __init__(self,
port: Optional[int] = None,
replicas: Optional[int] = None,

@@ -62,3 +63,3 @@ cloud: Optional[str] = None,

def down(self, name: str) -> None:
def down(self, name: str, force: Optional[bool] = None) -> None:
"""

@@ -68,2 +69,3 @@ Stop a service

:param name: the name of the service to stop
:param force: whether to force stop the service
"""

@@ -70,0 +72,0 @@

@@ -8,3 +8,2 @@ #![allow(dead_code)] // Remove this later

sync::{Arc, Mutex, OnceLock},
thread::{sleep, spawn},
time::Duration,

@@ -14,2 +13,3 @@ };

use base64::Engine;
use futures::future::join_all;
use log::{error, info, warn};

@@ -20,2 +20,6 @@ use pyo3::{pyclass, pymethods, Bound, PyAny};

use serde::{Deserialize, Serialize};
use tokio::{
runtime::{self, Runtime},
time::sleep,
};

@@ -31,3 +35,4 @@ use crate::{

static CLUSTER_ORCHESTRATOR: &str = "skypilot";
static SEVICE_CHECK_INTERVAL: Duration = Duration::from_secs(5);
static SERVICE_CHECK_INTERVAL: Duration = Duration::from_secs(5);
static REPLICA_UP_CHECK: &str = "no ready replicas";

@@ -41,2 +46,3 @@ static REGEX_URL: OnceLock<Regex> = OnceLock::new();

client: Client,
rt: Runtime,
service: Arc<Mutex<HashMap<String, Service>>>,

@@ -70,4 +76,15 @@ }

// tokio runtime with one dedicated worker
let rt = runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("servicing")
.enable_all()
.build()?;
Ok(Self {
client: Client::new(),
client: Client::builder()
.pool_max_idle_per_host(0)
.timeout(Duration::from_secs(10))
.build()?,
rt,
service,

@@ -202,12 +219,12 @@ })

let url = url.to_string();
let url = url.to_string() + &service.template.service.readiness_probe;
// spawn a thread to check when service comes online, then update the service status
spawn(move || {
// spawn a green thread to check when service comes online, then update the service status
let fut = async move {
let url = format!("http://{}", url);
loop {
match helper::fetch(&client_clone, &url) {
match helper::fetch(&client_clone, &url).await {
Ok(resp) => {
if resp.to_lowercase().contains("no ready replicas") {
sleep(SEVICE_CHECK_INTERVAL);
if resp.to_lowercase().contains(REPLICA_UP_CHECK) {
sleep(SERVICE_CHECK_INTERVAL).await;
continue;

@@ -237,3 +254,4 @@ }

}
});
};
self.rt.spawn(fut);

@@ -245,35 +263,73 @@ return Ok(());

pub fn down(&mut self, name: String) -> Result<(), ServicingError> {
pub fn down(&mut self, name: String, force: Option<bool>) -> Result<(), ServicingError> {
// get the service configuration
match self.service.lock()?.get_mut(&name) {
Some(service) if service.up => {
info!("Destroying the service with the configuration: {:?}", name);
// launch the cluster
let mut child = Command::new("sky")
.arg("serve")
.arg("down")
.arg(&name)
.spawn()?;
child.wait()?;
// Update service status
service.url = None;
service.up = false;
Ok(())
}
Some(_) => Err(ServicingError::ServiceNotUp(name)),
None => Err(ServicingError::ServiceNotFound(name)),
Some(_) => match force {
Some(true) => {}
Some(false) | None => {
return Err(ServicingError::ServiceNotUp(name));
}
},
None => return Err(ServicingError::ServiceNotFound(name)),
}
info!("Destroying the service with the configuration: {:?}", name);
// launch the cluster
let mut child = Command::new("sky")
.arg("serve")
.arg("down")
.arg(&name)
.spawn()?;
child.wait()?;
Ok(())
}
pub fn status(&self, name: String, pretty: Option<bool>) -> Result<String, ServicingError> {
pub fn status(&mut self, name: String, pretty: Option<bool>) -> Result<String, ServicingError> {
// Check if the service exists
if let Some(service) = self.service.lock()?.get(&name) {
if let Some(service) = self.service.lock()?.get_mut(&name) {
info!("Checking the status of the service: {:?}", name);
// if service is up poll once to see if it's still up
if let (true, Some(url)) = (service.up, &service.url) {
let url = format!(
"http://{}{}",
url, &service.template.service.readiness_probe
);
let r = self.rt.block_on(async {
let res = helper::fetch(&self.client, &url).await;
match res {
Ok(resp) => {
if resp.to_lowercase().contains(REPLICA_UP_CHECK) {
Err(ServicingError::ServiceNotUp(name.clone()))
} else {
// it's up
Ok(())
}
}
Err(e) => Err::<(), _>(ServicingError::General(e.to_string())),
}
});
match r {
Ok(_) => {
//No-op
info!("Service {} is up", name);
}
Err(e) => {
warn!("{:?}", e);
service.up = false;
}
}
}
return Ok(match pretty {
Some(true) => serde_json::to_string_pretty(service)?,
Some(false) => serde_json::to_string(service)?,
None => serde_json::to_string(service)?,
_ => serde_json::to_string(service)?,
});

@@ -315,3 +371,7 @@ }

pub fn load(&mut self, location: Option<PathBuf>) -> Result<(), ServicingError> {
pub fn load(
&mut self,
location: Option<PathBuf>,
update_status: Option<bool>,
) -> Result<(), ServicingError> {
let location = if let Some(location) = location {

@@ -335,2 +395,83 @@ helper::create_directory(

if let Some(true) = update_status {
info!("Checking for services that may come up while you were away...");
// Clones to pass to threads
let service_clone = self.service.clone();
let client_clone = self.client.clone();
let mut service_to_check = Vec::new();
// iterate through the services and find that are down
self.service
.lock()?
.iter()
.filter(|(_, service)| !service.up && service.url.is_some())
.for_each(|(name, service)| {
service_to_check.push((
name.clone(),
service
.url
.clone()
.expect("Gettting url, this should never be None")
+ &service.template.service.readiness_probe,
))
});
if service_to_check.is_empty() {
info!("No services to check");
return Ok(());
}
info!("Services to check: {:?}", service_to_check);
self.rt.spawn(async move {
let mut handles = Vec::new();
for (name, url) in service_to_check {
let client_clone = client_clone.clone();
let url = format!("http://{}", url);
let handle = tokio::spawn(async move {
match helper::fetch_and_check(
&client_clone,
&url,
REPLICA_UP_CHECK,
Some(SERVICE_CHECK_INTERVAL),
)
.await
{
Ok(_) => {}
Err(e) => {
return Err(e);
}
}
Ok(name)
});
handles.push(handle);
}
for res in join_all(handles).await {
let mut service = match service_clone.lock() {
Ok(s) => s,
Err(e) => {
error!("Poisoned lock {e}");
return;
}
};
match res {
Ok(Ok(r)) => {
if let Some(service) = service.get_mut(&r) {
service.up = true;
info!("Service {} is up", r);
}
}
Ok(Err(e)) => {
warn!("{e}");
}
Err(e) => {
error!("{e}");
}
}
}
});
}
Ok(())

@@ -388,2 +529,5 @@ }

run: None,
disk_size: None,
cpu: None,
memory: None,
}),

@@ -393,2 +537,5 @@ )

// test the runtime... should NOT panic
dis.rt.block_on(async { "" });
dis.save(None).unwrap();

@@ -408,3 +555,3 @@

dis.load(None).unwrap();
dis.load(None, None).unwrap();
{

@@ -411,0 +558,0 @@ let services = dis.service.lock().unwrap();

@@ -0,1 +1,2 @@

//! Helper module houses all the helper functions used by the service module.
use std::{

@@ -8,2 +9,3 @@ fs,

thread::{spawn, JoinHandle},
time::Duration,
};

@@ -13,2 +15,3 @@

use reqwest::{header::ACCEPT, Client};
use tokio::time::sleep;

@@ -164,18 +167,36 @@ use crate::error::ServicingError;

pub fn fetch(client: &Client, url: &str) -> Result<String, ServicingError> {
// create tokio runtime that is single threaded
let result = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
.block_on(async {
let res = client
.get(url)
.header(ACCEPT, "application/json")
.send()
.await?;
let body = res.text().await?;
Ok::<_, ServicingError>(body)
})?;
pub async fn fetch(client: &Client, url: &str) -> Result<String, reqwest::Error> {
let res = client
.get(url)
.header(ACCEPT, "application/json")
.send()
.await?;
let body = res.text().await?;
Ok(body)
}
Ok(result)
pub async fn fetch_and_check(
client: &Client,
url: &str,
expected: &str,
delay: Option<Duration>,
) -> Result<(), ServicingError> {
loop {
let res = client
.get(url)
.header(ACCEPT, "application/json")
.send()
.await?;
let body = res.text().await?;
if !body.to_lowercase().contains(expected) {
break;
}
if let Some(delay) = delay {
sleep(delay).await;
}
}
Ok(())
}

@@ -14,3 +14,3 @@ use env_logger::Builder;

fn servicing(m: &Bound<'_, PyModule>) -> PyResult<()> {
// if release mode, set log level to info
// if release mode, set log level to warn
if cfg!(not(debug_assertions)) {

@@ -17,0 +17,0 @@ Builder::new().filter_level(log::LevelFilter::Warn).init();

@@ -21,2 +21,3 @@ use pyo3::{pyclass, pymethods};

#[new]
#[allow(clippy::too_many_arguments)]
pub fn new(

@@ -23,0 +24,0 @@ port: Option<u16>,