New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

meshctrl

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

meshctrl - pypi Package Compare versions

Comparing version
0.1.11
to
0.1.12
+1
-1
LICENSE
MIT License
Copyright (c) [year] [fullname]
Copyright (c) 2022-present AmidaWare LLC

@@ -5,0 +5,0 @@ Permission is hereby granted, free of charge, to any person obtaining a copy

Metadata-Version: 2.1
Name: meshctrl
Version: 0.1.11
Version: 0.1.12
Summary: Python port of MeshCentral's Meshctrl.js program

@@ -5,0 +5,0 @@ Home-page: https://github.com/amidaware/meshctrl-py

@@ -7,13 +7,12 @@ import setuptools

setuptools.setup(
name='meshctrl',
version='0.1.11',
description='Python port of MeshCentral\'s Meshctrl.js program',
name="meshctrl",
version="0.1.12",
description="Python port of MeshCentral's Meshctrl.js program",
long_description=long_description,
long_description_content_type="text/markdown",
url='https://github.com/amidaware/meshctrl-py',
author='Josh Krawczyk',
author_email='josh@torchlake.com',
license='MIT',
install_requires=["websockets", "pycryptodome"],
url="https://github.com/amidaware/meshctrl-py",
author="Josh Krawczyk",
author_email="josh@torchlake.com",
license="MIT",
install_requires=["websockets==10.2", "pycryptodome==3.14.1"],
classifiers=[

@@ -24,3 +23,2 @@ "Programming Language :: Python :: 3",

],
package_dir={"": "src"},

@@ -27,0 +25,0 @@ packages=setuptools.find_packages(where="src"),

Metadata-Version: 2.1
Name: meshctrl
Version: 0.1.11
Version: 0.1.12
Summary: Python port of MeshCentral's Meshctrl.js program

@@ -5,0 +5,0 @@ Home-page: https://github.com/amidaware/meshctrl-py

@@ -1,2 +0,2 @@

websockets
pycryptodome
websockets==10.2
pycryptodome==3.14.1
import json
import websockets
import asyncio
from typing import Optional
from typing import Optional, Union, List, Dict
from . import utils

@@ -16,2 +16,3 @@

# TODO: allow getting token in a file
def __init__(

@@ -82,11 +83,11 @@ self,

async def _websocket_call(self, data: dict) -> dict:
async def _websocket_call(self, data: Dict) -> Dict:
"""Initiates the websocket connection to mesh and returns the data.
Args:
data (dict):
data (Dict):
The data passed to MeshCentral.
Returns:
dict: MeshCentral Response.
Dict: MeshCentral Response.
"""

@@ -109,3 +110,3 @@

def _send(self, data: dict) -> dict:
def _send(self, data: Dict) -> Dict:
"""Initiates asynchronous call"""

@@ -115,28 +116,28 @@

def server_info(self) -> dict:
def server_info(self) -> Dict:
"""Gets MeshCentral server info.
Returns:
dict:
Dict:
Returns server info.
Example:
{
'domain': '',
'name': 'mesh.example.com',
'mpsname': 'mesh.example.com',
'mpsport': 4433,
'port': 4443,
'emailcheck': True,
'domainauth': False,
'serverTime': 1645560067270,
'features': 9607777,
'features2': 16513,
'languages': ['en', 'cs', 'da', 'de', 'es', 'fi', 'fr', 'hi', 'it', 'ja', 'ko', 'nl', 'nn', 'pl', 'pt-br', 'pt', 'ru', 'sv', 'tr', 'zh-chs', 'zh-cht'],
'tlshash': '16D462CC0D306CFC7F242382A1606E2A57E6481B4EAAC7E5C6D91EFA306F9CABD0CD91566A8A35C3DA9580E1F51CF985',
'agentCertHash': 'V7IZUeuuIWMCY8e1SIb8fKqM1RkS4fUmCbCZzi4cMMzHAi3EJPi9Y8CP5XQfz2tZ',
'https': True,
'redirport': 8080,
'magenturl': 'mc://mesh.example.com:4443',
'domainsuffix': '',
'domain': '',
'name': 'mesh.example.com',
'mpsname': 'mesh.example.com',
'mpsport': 4433,
'port': 4443,
'emailcheck': True,
'domainauth': False,
'serverTime': 1645560067270,
'features': 9607777,
'features2': 16513,
'languages': ['en', 'cs', 'da', 'de', 'es', 'fi', 'fr', 'hi', 'it', 'ja', 'ko', 'nl', 'nn', 'pl', 'pt-br', 'pt', 'ru', 'sv', 'tr', 'zh-chs', 'zh-cht'],
'tlshash': '16D462CC0D306CFC7F242382A1606E2A57E6481B4EAAC7E5C6D91EFA306F9CABD0CD91566A8A35C3DA9580E1F51CF985',
'agentCertHash': 'V7IZUeuuIWMCY8e1SIb8fKqM1RkS4fUmCbCZzi4cMMzHAi3EJPi9Y8CP5XQfz2tZ',
'https': True,
'redirport': 8080,
'magenturl': 'mc://mesh.example.com:4443',
'domainsuffix': '',
'certExpire': 1652972190000

@@ -146,13 +147,11 @@ }

data = {
"action": "serverinfo"
}
data = {"action": "serverinfo"}
return self._send(data)["serverinfo"]
return self._send(data)["serverinfo"]
def user_info(self) -> dict:
def user_info(self) -> Dict:
"""Gets logged on user info.
Returns:
dict:
Dict:
Returns current user info

@@ -162,18 +161,18 @@

{
'_id': 'user//username',
'name': 'username',
'creation': 1643754241,
'_id': 'user//username',
'name': 'username',
'creation': 1643754241,
'links': {
'mesh//oAUeYE3HCqUFXWCkqwqfW@ElJ7orX6hrNv$r$RyCEsVgtUQNxYC6dLs4jlfQNTPA': {
'rights': 4294967295
},
},
'mesh//$lhtFH8ZYcVEZYSqLx1O2vxqgSdzX9bjZLAbmRMz3lJ@XLulbyhqeRUPF4MbaN64': {
'rights': 4294967295
}
},
'email': 'example@example.com',
'emailVerified': True,
'siteadmin': 4294967295,
'pastlogin': 1645505345,
'access': 1645558617,
},
'email': 'example@example.com',
'emailVerified': True,
'siteadmin': 4294967295,
'pastlogin': 1645505345,
'access': 1645558617,
'login': 1645505346

@@ -183,9 +182,23 @@ }

data = {"action": "userinfo"}
return self._send(data)["userinfo"]
def list_user_sessions(self) -> Dict:
"""List connected websockets users
Returns:
List:
Returns list of users that are connected to websockets
Example:
{'user//tactical': 1}
"""
data = {
"action": "userinfo"
"action": "wssessioncount",
}
return self._send(data)["userinfo"]
return self._send(data)["wssessions"]
def get_device_group_id_by_name(self, group: str) -> Optional[str]:

@@ -244,3 +257,3 @@ """Get the device group id by group name.

def list_device_groups(self, hex: bool = False) -> list:
def list_device_groups(self, hex: bool = False) -> List[Dict]:
"""List device groups

@@ -256,3 +269,3 @@

Returns:
list: Mesh device groups.
List[Dict]: Mesh device groups.
"""

@@ -275,3 +288,2 @@

# TODO: Don't create device group if name already exists
def add_device_group(

@@ -284,3 +296,3 @@ self,

consent: int = 0,
) -> dict:
) -> Dict:
"""Add device group

@@ -308,3 +320,3 @@

Returns:
dict: Returns a confirmation that the device group was created
Dict: Returns a confirmation that the device group was created

@@ -335,3 +347,3 @@ Example:

self, id: Optional[str] = None, group: Optional[str] = None
) -> dict:
) -> Dict:
"""Remove device group by group name or id

@@ -349,3 +361,3 @@

Returns:
dict: Returns a confirmation that the device group was deleted.
Dict: Returns a confirmation that the device group was deleted.

@@ -373,3 +385,2 @@ Example:

# TODO: look into inviteCodes options
# TODO: Don't create device group if name already exists
def edit_device_group(

@@ -383,3 +394,3 @@ self,

consent: Optional[int] = None,
) -> dict:
) -> Dict:
"""Edit device group by group name or id

@@ -405,3 +416,3 @@

Returns:
dict: Returns a confirmation that the device group was updated.
Dict: Returns a confirmation that the device group was updated.

@@ -493,12 +504,10 @@ Example:

def list_users(self) -> list:
def list_users(self) -> List[Dict]:
"""List users
Returns:
list: Mesh user accounts.
List[Dict]: Mesh user accounts.
"""
data = {
"action": "users"
}
data = {"action": "users"}

@@ -519,3 +528,3 @@ return self._send(data)["users"]

rights: Optional[str] = None,
) -> dict:
) -> Dict:
"""Add User

@@ -551,3 +560,3 @@

Returns:
dict: Returns a confirmation that the user was added
Dict: Returns a confirmation that the user was added

@@ -590,3 +599,3 @@ Example:

if rights:
data["siteadmin"] = utils.permissions_str_to_int(rights)
data["siteadmin"] = utils.user_permissions_str_to_int(rights)

@@ -605,3 +614,3 @@ return self._send(data)

rights: Optional[str] = None,
) -> dict:
) -> Dict:
"""Edit User

@@ -632,3 +641,3 @@

Returns:
dict: Returns a confirmation that the user was edited
Dict: Returns a confirmation that the user was edited

@@ -671,3 +680,3 @@ Example:

def remove_user(self, username: str, domain: str = "") -> dict:
def remove_user(self, username: str, domain: str = "") -> Dict:
"""Delete User

@@ -684,3 +693,3 @@

Returns:
dict: Returns a confirmation that the user was deleted.
Dict: Returns a confirmation that the user was deleted.

@@ -703,4 +712,536 @@ Example:

def list_user_groups(self, json: bool = False) -> Union[List[Dict], Dict]:
"""List user groups
Args:
json (bool):
Return a dictionary with the group id as the key. Can be
useful for group lookup without iterating over all groups
Returns:
List[Dict] or Dict: Mesh user groups.
"""
data = {"action": "usergroups"}
if json:
return self._send(data)["ugroups"]
else:
return [group for group in self._send(data)["ugroups"].values()]
def add_user_group(
self, name: str, desc: Optional[str] = None, domain: Optional[str] = None
) -> Dict:
"""Add user group
Args:
name (str):
Name of the user group.
desc (str, optional):
Description of user group.
domain (str, optional):
Domain of user group
Returns:
Dict: Returns confirmation that the user group was added.
Example:
{
'action': 'createusergroup',
'responseid': '26100a76-0057-459d-9881-acf5fe357883',
'result': 'ok',
'ugrpid': 'ugrp//4nGnRRX@Ii9sL29TSYomnsZtRgDGKInE0d43HGsGposFtMwkBtxvYtsT6rX2XtdB',
'links': {}
}
"""
data = {
"action": "createusergroup",
"name": utils.format_usergroup_id(name, domain),
"responseid": utils.gen_response_id(),
}
if desc:
data["desc"] = desc
if domain:
data["domain"] = domain
return self._send(data)
def remove_user_group(self, group_id: str, domain: str = "") -> Dict:
"""Remove user group
Args:
group_id (str):
Id of the user group.
domain (str, optional)
Domain for the user group
Returns:
Dict: Returns confirmation that the user group was removed.
Example:
{
'action': 'deleteusergroup',
'responseid': '9f131710-2548-4607-ad7b-a1c2814c1c5c',
'result': 'ok',
'ugrpid': 'ugrp//HR5E2E9ax5hc$FD9hSFQKI7mKiknUjXy5r3Q$don5iOa2fMDTU0AwnsHCC8KHkNX'
}
"""
data = {
"action": "deleteusergroup",
"ugrpid": utils.format_usergroup_id(group_id, domain),
"responseid": utils.gen_response_id(),
}
return self._send(data)
def add_to_user_group(
self, group_id: str, id: str, domain: str = "", rights: Optional[int] = 0
) -> Dict:
"""Add to user group
Add a user, device or device group to a user group.
Args:
group_id (str):
Id of the user group. Can start with ugrp// or be just the name.
id (str)
Id of the user, device, or device group you are adding. Should start with
user//, node//, or mesh//
domain (str, optional)
Domain of user group.
rights (int, optional)
Rights granted for adding device or device group.
- 4294967295 for full admin or the sum of the following numbers.
- 1 = Edit Device Group 2 = Manage Users
- 4 = Manage Computers 8 = Remote Control
- 16 = Agent Console 32 = Server Files
- 64 = Wake Device 128 = Set Notes
- 256 = Remote View Only 512 = No Terminal
- 1024 = No Files 2048 = No Intel AMT
- 4096 = Desktop Limited Input 8192 = Limit Events
- 16384 = Chat / Notify 32768 = Uninstall Agent
- 65536 = No Remote Desktop 131072 = Remote Commands
- 262144 = Reset / Power off
Returns:
Dict: Returns confirmation that the user, device, or device group was added to user group.
Example:
{
'action': 'addusertousergroup',
'responseid': 'aa09cc76-70f2-47b1-b680-92d4e363eaab',
'result': 'ok',
'added': 1,
'failed': 0
}
"""
data = {
"responseid": utils.gen_response_id(),
}
if id.startswith("user/"):
data["action"] = "addusertousergroup"
data["ugrpid"] = utils.format_usergroup_id(group_id, domain)
data["usernames"] = [id.split("/")[2]]
elif id.startswith("mesh/"):
data["action"] = "addmeshuser"
data["userid"] = utils.format_usergroup_id(group_id, domain)
data["meshid"] = id
data["meshadmin"] = rights if rights else 0
elif id.startswith("node/"):
data["action"] = "adddeviceuser"
data["nodeid"] = utils.format_node_id(id)
data["userids"] = [utils.format_usergroup_id(group_id, domain)]
data["meshadmin"] = rights
else:
raise ValueError(
"The id is incorrect. Must start with mesh//, user//, or node//."
)
return self._send(data)
def remove_from_user_group(self, group_id: str, id: str, domain: str = "") -> Dict:
"""Remove from user group
Remove a user, device or device group from a user group.
Args:
group_id (str):
Id of the user group. Can start with ugrp// or be just the name.
id (str)
Id of the user, device, or device group you are adding. Should start with
user//, node//, or mesh//
domain (str, optional)
Domain of user group.
Returns:
Dict: Returns confirmation that the user, device, or device group was removed from user group.
Example:
{
'action': 'removeuserfromusergroup',
'responseid': '65243852-4480-400c-baae-d48ca313164e',
'result': 'ok'
}
"""
data = {
"responseid": utils.gen_response_id(),
}
if id.startswith("user/"):
data["action"] = "removeuserfromusergroup"
data["ugrpid"] = utils.format_usergroup_id(group_id, domain)
data["userid"] = id
elif id.startswith("mesh/"):
data["action"] = "removemeshuser"
data["userid"] = utils.format_usergroup_id(group_id, domain)
data["meshid"] = id
elif id.startswith("node/"):
data["action"] = "adddeviceuser"
data["nodeid"] = utils.format_node_id(id)
data["userids"] = [utils.format_usergroup_id(group_id, domain)]
data["meshadmin"] = 0
data["remove"] = True
else:
raise ValueError("id must start with user/, mesh/, or node/")
return self._send(data)
def move_to_device_group(
self, group_id: str, dev_id: str, domain: str = ""
) -> Dict:
"""Move node to new device group
Group_id can be the group name or full id.
Args:
group_id (str):
Name or id of the device group. Can start with mesh// or be just the name.
dev_id (str)
Id of the device you are moving.
domain (str, optional)
Domain of user group.
Returns:
Dict: Returns confirmation that the device was moved to device group.
"""
data = {
"action": "changeDeviceMesh",
"responseid": utils.gen_response_id(),
"nodeids": [utils.format_node_id(dev_id)],
"meshid": utils.format_devicegroup_id(group_id, domain),
}
return self._send(data)
def add_user_to_device_group(
self, group_id: str, user_id: str, domain: str = "", rights: Optional[int] = 0
) -> Dict:
"""Add user to device group
Args:
group_id (str):
Name or id of the device group. Can start with mesh// or be just the name.
user_id (str)
Id of the user you are adding.
domain (str, optional)
Domain of user group.
rights (int, optional)
Rights granted for adding device or device group.
- 4294967295 for full admin or the sum of the following numbers.
- 1 = Edit Device Group 2 = Manage Users
- 4 = Manage Computers 8 = Remote Control
- 16 = Agent Console 32 = Server Files
- 64 = Wake Device 128 = Set Notes
- 256 = Remote View Only 512 = No Terminal
- 1024 = No Files 2048 = No Intel AMT
- 4096 = Desktop Limited Input 8192 = Limit Events
- 16384 = Chat / Notify 32768 = Uninstall Agent
- 65536 = No Remote Desktop 131072 = Remote Commands
- 262144 = Reset / Power off
Returns:
Dict: Returns confirmation that the user was added to device group.
"""
data = {
"action": "addmeshuser",
"responseid": utils.gen_response_id(),
"usernames": [utils.format_user_id(user_id, domain)],
"meshadmin": rights,
"meshid": utils.format_devicegroup_id(group_id, domain),
}
return self._send(data)
def remove_user_from_device_group(
self, group_id: str, user_id: str, domain: str = ""
) -> Dict:
"""Remove user from device group
Args:
group_id (str):
Name or id of the device group. Can start with mesh// or be just the name.
user_id (str)
Id of the user you are removing.
domain (str, optional)
Domain of user group.
Returns:
Dict: Returns confirmation that the user was removed from device group.
"""
data = {
"action": "removemeshuser",
"responseid": utils.gen_response_id(),
"usernames": [utils.format_user_id(user_id, domain)],
"meshid": utils.format_devicegroup_id(group_id, domain),
}
return self._send(data)
def add_user_to_device(
self, node_id: str, user_id: str, domain: str = "", rights: Optional[int] = 0
) -> Dict:
"""Add user to device
Args:
node_id (str):
id of device. Can start with node// or be just the name.
user_id (str)
Id of the user you are adding.
domain (str, optional)
Domain of user group.
rights (int, optional)
Rights granted for adding device or device group.
- 4294967295 for full admin or the sum of the following numbers.
- 1 = Edit Device Group 2 = Manage Users
- 4 = Manage Computers 8 = Remote Control
- 16 = Agent Console 32 = Server Files
- 64 = Wake Device 128 = Set Notes
- 256 = Remote View Only 512 = No Terminal
- 1024 = No Files 2048 = No Intel AMT
- 4096 = Desktop Limited Input 8192 = Limit Events
- 16384 = Chat / Notify 32768 = Uninstall Agent
- 65536 = No Remote Desktop 131072 = Remote Commands
- 262144 = Reset / Power off
Returns:
Dict: Returns confirmation that the user was added to device.
"""
data = {
"action": "adddeviceuser",
"responseid": utils.gen_response_id(),
"usernames": [utils.format_user_id(user_id, domain)],
"nodeid": utils.format_node_id(node_id, domain),
"rights": rights,
}
return self._send(data)
def remove_user_from_device(
self, node_id: str, user_id: str, domain: str = ""
) -> Dict:
"""Add user to device
Args:
node_id (str):
id of device. Can start with node// or be just the name.
user_id (str)
Id of the user you are removing.
domain (str, optional)
Domain of user group.
Returns:
Dict: Returns confirmation that the user was removed from device.
"""
data = {
"action": "adddeviceuser",
"responseid": utils.gen_response_id(),
"usernames": [utils.format_user_id(user_id, domain)],
"nodeid": utils.format_node_id(node_id, domain),
"rights": 0,
"remove": True,
}
return self._send(data)
def list_devices(
self,
group_id: Optional[str] = None,
domain: str = "",
count: bool = False,
details: bool = False,
json: bool = False,
filter: Optional[str] = None,
filter_ids: Optional[List[str]] = None,
) -> Union[List[Dict], int, Dict]:
"""List devices
List all devices or filter by a device group or filter by other properties
Args:
group_id (str, optional):
id of group. Can start with mesh// or be just the name.
domain (str, optional):
Domain of the devices and device group
count (bool, optional)
Only return the device count
json (bool, optional)
Returns the json device list result instead of a list
details (bool, optional):
Show all device details in output
filter_ids (List[str], optional):
List of devices to filter by
filter (str, optional):
filter strings. accepted values below:
x - Devices with x in the name.
user:x or u:x - Devices with x in the name of currently logged in user.
ip:x - Devices x IP address.
group:x or g:x - Devices with x in device group name.
tag:x or t:x - Devices with x in device tag.
atag:x or a:x - Devices with x in device agent tag.
os:x - Devices with x in the device OS description.
amt:x - Devices with Intel AMT provisioning state (0, 1, 2).
desc:x - Devices with x in device description.
wsc:ok - Devices with Windows Security Center ok.
wsc:noav - Devices with Windows Security Center with anti-virus problem.
wsc:noupdate - Devices with Windows Security Center with update problem.
wsc:nofirewall - Devices with Windows Security Center with firewall problem.
wsc:any - Devices with Windows Security Center with any problem.
a and b - Match both conditions with precedence over OR. For example: lab and g:home.
a or b - Math one of the conditions, for example: lab or g:home.
Returns:
Dict: Returns confirmation that the user was added to device.
"""
data = {}
if details:
data["action"] = "getDeviceDetails"
else:
data["action"] = "nodes"
if group_id:
data["meshid"] = utils.format_devicegroup_id(group_id, domain)
nodes = self._send(data)["nodes"]
if filter_ids:
for meshid in nodes:
nodes[meshid] = [
node
for node in nodes[meshid]
if node._id.split("/")[-1] in filter_ids
]
if filter:
for meshid in nodes:
nodes[meshid] = utils.parse_and_search_nodes(
nodes, filter, device_groups=self.list_device_groups()
)
if count:
return len([node for sublist in nodes.values() for node in list])
elif json:
return nodes
else:
return [node for sublist in nodes.values() for node in list]
def device_info(self, id: str) -> Dict:
"""Get device info
Args:
id (str):
id of device. Can start with node// or be just the id.
Returns:
Dict: Returns information about the device
"""
raise NotImplementedError()
def list_events(self):
# TODO
raise NotImplementedError()
def list_login_tokens(self):
# TODO
raise NotImplementedError()
def add_login_token(self):
# TODO
raise NotImplementedError()
def remove_login_token(self):
# TODO
raise NotImplementedError()
def broadcast_message(self):
# TODO
raise NotImplementedError()
def remove_all_users_from_user_group(self):
# TODO
raise NotImplementedError()
def send_invite_email(self):
# TODO
raise NotImplementedError()
def generate_invite_link(self):
# TODO
raise NotImplementedError()
def shell(self):
# TODO
raise NotImplementedError()
def device_power(self):
# TODO
raise NotImplementedError()
def device_sharing(self):
# TODO
raise NotImplementedError()
def agent_download(self):
# TODO
raise NotImplementedError()
def upload(self):
# TODO
raise NotImplementedError()
def download(self):
# TODO
raise NotImplementedError()
def device_open_url(self):
# TODO
raise NotImplementedError()
def device_message(self):
# TODO
raise NotImplementedError()
def device_toast(self):
# TODO
raise NotImplementedError()
# run command on an agent
def run_command(self, node_id: str, command: str, runAsUser: int = 0) -> dict:
def run_command(self, node_id: str, command: str, runAsUser: int = 0) -> Dict:

@@ -707,0 +1248,0 @@ data = {

@@ -6,2 +6,4 @@ import time

import string
import re
from typing import Dict, List, Optional

@@ -51,35 +53,32 @@ from Crypto.Cipher import AES

def permissions_str_to_int(perms: str) -> int:
site_perms = 0x00000000
perms_list = perms.lower().split(",")
for perm in perms_list:
if perm == "none":
break
elif perm == "full":
site_perms = 0xFFFFFFFF
break
elif perm == "backup":
site_perms |= 0x00000001
elif perm == "manageusers":
site_perms |= 0x00000002
elif perm == "restore":
site_perms |= 0x00000004
elif perm == "fileaccess":
site_perms |= 0x00000008
elif perm == "update":
site_perms |= 0x00000010
elif perm == "locked":
site_perms |= 0x00000020
elif perm == "nonewgroups":
site_perms |= 0x00000040
elif perm == "notools":
site_perms |= 0x00000080
elif perm == "usergroups":
site_perms |= 0x00000100
elif perm == "recording":
site_perms |= 0x00000200
elif perm == "locksettings":
site_perms |= 0x00000400
elif perm == "allevents":
site_perms |= 0x00000800
def user_permissions_str_to_int(perms: str) -> int:
site_perms = 0
if "none" in perms:
return 0
elif "full" in perms:
return 0xFFFFFFFF
elif "backup" in perms:
site_perms |= 1
elif "manageusers" in perms:
site_perms |= 2
elif "restore" in perms:
site_perms |= 4
elif "fileaccess" in perms:
site_perms |= 8
elif "update" in perms:
site_perms |= 16
elif "locked" in perms:
site_perms |= 32
elif "nonewgroups" in perms:
site_perms |= 64
elif "notools" in perms:
site_perms |= 128
elif "usergroups" in perms:
site_perms |= 256
elif "recording" in perms:
site_perms |= 512
elif "locksettings" in perms:
site_perms |= 1024
elif "allevents" in perms:
site_perms |= 2048

@@ -89,6 +88,207 @@ return int(site_perms)

def format_user_id(username, domain=""):
def devicegroup_permissions_str_to_int(perms: str) -> int:
rights = 0
if "fullrights" in perms:
return 0xFFFFFFFF
if "editgroup" in perms:
rights |= 1
if "manageusers" in perms:
rights |= 2
if "managedevices" in perms:
rights |= 4
if "remotecontrol" in perms:
rights |= 8
if "agentconsole" in perms:
rights |= 16
if "serverfiles" in perms:
rights |= 32
if "wakedevices" in perms:
rights |= 64
if "notes" in perms:
rights |= 128
if "desktopviewonly" in perms:
rights |= 256
if "noterminal" in perms:
rights |= 512
if "nofiles" in perms:
rights |= 1024
if "noamt" in perms:
rights |= 2048
if "limiteddesktop" in perms:
rights |= 4096
if "limitedevents" in perms:
rights |= 8192
if "chatnotify" in perms:
rights |= 16384
if "uninstall" in perms:
rights |= 32768
return rights
def format_user_id(username: str, domain="") -> str:
if "user/" not in username:
return f"user/{domain}/{username}"
return f"user/{domain}/{username.lower()}"
else:
return username
return username.lower()
def format_usergroup_id(group: str, domain="") -> str:
if "ugrp/" not in group:
return f"ugrp/{domain}/{group}"
else:
return group
def format_devicegroup_id(group: str, domain="") -> str:
if "mesh/" not in group:
return f"ugrp/{domain}/{group}"
else:
return group
def format_node_id(node: str, domain="") -> str:
if "node/" not in node:
return f"node/{domain}/{node}"
else:
return node
def parse_and_search_nodes(
nodes: List[Dict], filter: str, device_groups: Dict
) -> List[Dict]:
results = []
filters = filter.split(" or ")
for f in filters:
or_results = parse_search_and_input(nodes, f)
if not results:
results = or_results
else:
results = [node for node in or_results if node in results]
def parse_search_and_input(
nodes: List[Dict], f: str, device_groups: Dict
) -> List[Dict]:
filters = filter.split(" and ")
results = []
for f in filters:
and_results = filter_devices_by_filter(nodes, f)
if not results:
results = and_results
else:
results = [node for node in and_results if node in results]
return results
def filter_devices_by_filter(
nodes: List[Dict], f: str, device_groups: Dict
) -> List[Dict]:
results = []
user_search = f[5:] if f.startsWith("user:".lower()) else None
user_search = f[2:] if f.startsWith("u:".lower()) else None
ip_search = f[3:] if f.startsWith("ip:".lower()) else None
group_search = f[6:] if f.startsWith("group:".lower()) else None
group_search = f[2:] if f.startsWith("g:".lower()) else None
tag_search = f[4:] if f.startsWith("tag:".lower()) else None
tag_search = f[2:] if f.startsWith("t:".lower()) else None
agent_tag_search = f[5:] if f.startsWith("atag:".lower()) else None
os_search = f[3:] if f.startsWith("os:".lower()) else None
amt_search = f[4:] if f.startsWith("amt:".lower()) else None
desc_search = f[5:] if f.startsWith("desc:".lower()) else None
wsc_search = None
if f == "wsc:ok":
wsc_search = 1
elif f == "wsc:noav":
wsc_search = 2
elif f == "wsc:noupdate":
wsc_search = 3
elif f == "wsc:nofirewall":
wsc_search = 4
elif f == "wsc:any":
wsc_search = 5
if f == "":
results = nodes
elif ip_search:
results = [node for node in nodes if node["ip"] and ip_search in node["ip"]]
elif group_search:
if device_groups:
results = [
node
for node in nodes
if group_search in device_groups[node["meshid"]].name
]
elif tag_search or tag_search == "":
results = [
node
for node in nodes
if ((node["agent"] and not node["agent"]["tag"]) and tag_search == "")
or (
node["agent"]
and node["agent"]["tag"]
and node["agent"]["tag"].lower() in tag_search
)
]
elif agent_tag_search or agent_tag_search == "":
results = [
node
for node in nodes
if (node["agent"] and node["agent"]["tag"] and agent_tag_search == "")
or (
node["agent"]
and node["agent"]["tag"]
and node["agent"]["tag"].lower() in agent_tag_search
)
]
elif user_search:
results = [
node for node in nodes if node["users"] and user_search in node["users"]
]
elif os_search:
results = [
node
for node in nodes
if node["osdesc"] and os_search in node["osdesc"].lower()
]
elif amt_search or amt_search == "":
results = [
node
for node in nodes
if (node["intelamt"] and amt_search == "")
or node["intelamt"]["state"] == amt_search
]
elif desc_search or desc_search == "":
results = [
node
for node in nodes
if (
"desc" in node["desc"].keys()
and node["desc"] != ""
and (desc_search == "" or desc_search in node["desc"])
)
]
elif wsc_search:
for node in nodes:
if (
wsc_search == 1
and node["wsc"]["antiVirus"] == "OK"
and node["wsc"]["autoUpdate"] == "OK"
and node["wsc"]["firewall"] == "OK"
):
results.append(node)
elif (wsc_search == 2 or wsc_search == 5) and (
node["wsc"]["antiVirus"] != "OK"
):
results.append(node)
elif (wsc_search == 3 or wsc_search == 5) and (node["autoUpdate" != "OK"]):
results.append(node)
elif (wsc_search == 4 or wsc_search == 5) and (
node["wsc"]["firewall"] != "OK"
):
results.append(node)
else:
regex = re.compile("|".join(re.split(r"/\s+/", f)))
results = [node for node in nodes if regex.search(node["name"].lower())]
return results