New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm. Details →
Socket
Book a DemoSign in
Socket

centralcli

Package Overview
Dependencies
Maintainers
1
Versions
322
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

centralcli - pypi Package Compare versions

Comparing version
9.1.0
to
9.2.3
+125
centralcli/clitree/generate.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pathlib import Path
import typer
from centralcli import common, config, render
from centralcli.clicommon import Mac
from rich.markup import escape
app = typer.Typer()  # sub-app mounted as `generate` in the main CLI via app.add_typer
def _generate_bssids_from_file(file: Path, out: Path = None, num_bssids: int = 6):
    """Generate BSSIDs for each AP in an import file and write the result as CSV.

    Args:
        file: Input file (one row per AP, each row must include a "mac" field).
        out: Optional output path.  Defaults to the input file name with "_out"
            inserted before the suffix (e.g. aps.csv -> aps_out.csv).
        num_bssids: Number of BSSIDs to generate per radio.

    Returns:
        Number of characters written to the output file.

    Exits via common.exit when the file is empty or contains invalid MACs.
    """
    out_file = out or Path(str(file).replace(file.suffix, f"_out{file.suffix}"))
    file_data = config.get_file_data(file)
    if not file_data:
        common.exit(f"No data found in {file}")
    # FIX: was hard-coded num_bssids=6 here, silently ignoring the num_bssids parameter.
    macs = [Mac(ap["mac"], bssids=True, num_bssids=num_bssids) for ap in file_data]
    # start=2 so reported row numbers match the spreadsheet (row 1 is the header).
    invalid_macs = [(idx, m) for idx, m in enumerate(macs, start=2) if not m.ok]
    if invalid_macs:
        # FIX: error message typo "in invalid" -> "is invalid".
        err = "\n".join([f"Mac: {m.orig} on row {row} is invalid." for row, m in invalid_macs])
        common.exit(err)
    out_data = []
    for ap, mac_obj in zip(file_data, macs):
        # Fields re-added explicitly below so they land at the end of each output row.
        to_end = ["BSSID", "LocationId", "LocationValue", "Description"]
        # Flatten per-radio BSSID lists, keeping at most num_bssids per radio.
        bssids = [bssid for radio_bssid_list in mac_obj.bssids.values() for bssid in radio_bssid_list[0:num_bssids]]
        out_data += [{**{k: v for k, v in ap.items() if k not in to_end}, "BSSID": bssid, "LocationId": ap.get("LocationId"), "Description": ap.get("Description")} for bssid in bssids]
    headers = ",".join(out_data[0].keys())
    values = "\n".join([",".join(['' if v is None else v for v in inner.values()]) for inner in out_data])
    csv_data = f"{headers}\n{values}"
    written = out_file.write_text(csv_data)
    render.console.print(csv_data)
    render.console.print(f"Output written to {out_file}")
    return written
# F070-11-1600 Market Street-Floor11
def _build_description(ap_name: str, site_name: str | None, description: str | None) -> str:
if site_name is None and description:
return description
_, site_code, floor, _, _ = map(lambda txt: txt.removeprefix("WAP").removeprefix("wap"), ap_name.split("-"))
padded_floor = floor if len(floor) == 2 else f"{floor:02s}"
floor = padded_floor.removeprefix("0")
return f"{site_code}-{padded_floor}-{site_name}-Floor{floor}"
def bssids_from_xls(
    file: Path,
    site: str = None,
    out: Path = None,
    bssids: bool = True,
    num_bssids: int = 6,
    bssid_out: Path = None
):
    """Convert an xlsx AP workbook to csv, then (optionally) generate BSSIDs from the result.

    Args:
        file: Input .xlsx workbook.  All sheets are processed except those with
            "summary" in the title.
        site: Official site name used to build the Description field.
        out: Optional path for the intermediate csv.  Defaults to the xlsx name with
            a .csv suffix in the same directory.
        bssids: When True (default), also run BSSID generation on the prepped csv.
        num_bssids: Number of BSSIDs to generate per radio (passed through).
        bssid_out: Optional output path for the generated BSSID csv.

    Exits the program via common.exit on every path (never returns normally).
    """
    prepped_file = out or file.parent / f"{file.stem}.csv"
    # Local imports: xlsx support is an optional extra; only needed on this path.
    import tablib
    import importlib.util
    if importlib.util.find_spec("et_xmlfile") is None:
        common.exit(f"Missing optional xlsx support. re-install centralcli with optional dependency to add support for xlsx files. [cyan]uv tool install {escape('centralcli[xlsx]')}[/]\n[italic]No need to uninstall, just re-run as described to add support for xlsx[/]")
    book = tablib.Databook()
    book.xlsx = file.read_bytes()  # type: ignore
    # Skip summary sheets; each remaining sheet holds AP rows.
    datasets = [ds for ds in book.sheets() if "summary" not in ds.title.lower()]
    out_keys = ["name", "serial", "mac", "BSSID", "LocationId", "Description"]
    output = []
    for ds in datasets:
        # Normalize headers: lower-case and drop " number"/"-address" suffixes
        # (e.g. "Serial Number" -> "serial", "MAC-Address" -> "mac").
        as_dict = [{k.lower().removesuffix(" number").removesuffix("-address"): v for k, v in inner.items()} for inner in ds.dict]
        # Drop fully-empty rows.  Sheets titled "spare..." get no Description;
        # everything else gets one built from the AP name / site / import description.
        as_dict = [{**inner, "BSSID": None, "LocationId": None, "Description": None if ds.title.lower().startswith("spare") else _build_description(inner['name'], site_name=site, description=inner.get("Description", inner.get("description")))} for inner in as_dict if any(inner.values())]
        output += as_dict
    header = ",".join(out_keys)
    written = prepped_file.write_text(
        "\n".join([header, *[",".join('' if ap.get(key) is None else ap[key] for key in out_keys) for ap in output]])
    )
    if not bssids or not written:
        common.exit(f"Wrote {written} bytes to {prepped_file.name}", code=0 if written else 1)
    written = _generate_bssids_from_file(prepped_file, out=bssid_out, num_bssids=num_bssids)
    common.exit(code=0 if written else 1)
@app.command()
def bssids(
    ap_mac: str = typer.Argument(None, help="AP Mac Address", show_default=False),
    file: Path = typer.Option(None, help="fetch MACs from file for bssid calc", exists=True, show_default=False),
    dir: Path = typer.Option(None, help="process all files in dir for bssid calc", exists=True, show_default=False),
    site: str = typer.Option(None, "--site", "-s", help="The official name of the site (from SFDC), not required if [cyan]description[/] field is in the input file. [dim italic]Only applies with [cyan]--file[/][/] :triangular_flag:"),
    num_bssids: int = typer.Option(None, "-n", help="The number of BSSIDs to generate in output file. [dim italic]Only applies with [cyan]--file[/][/] :triangular_flag:"),  # TODO render.help_block add helper for this common... Valid/Only applies with --{flag} :triangular_flag:
    vertical: bool = typer.Option(False, "-V", "--vertical", help="Display BSSIDs vertically with no headers"),
    out: Path = typer.Option(None, help="Output to file. --file option will always create an output file ... by default will create new file with same name as --file with _out appended to the file name.",),
):
    """Generate bssids based on AP MAC(s)

    Using --file :triangular_flag: will result in a file containing the headers necessary for import into MS Teams for e911 location.
    """
    if ap_mac and Path(ap_mac).exists():
        file = Path(ap_mac)  # allow them to specify the file as the first arg
    if file:
        if file.suffix == ".xlsx":
            # bssids_from_xls exits the program itself; the lines below only run for non-xlsx input.
            bssids_from_xls(file, site=site, num_bssids=num_bssids, bssid_out=out)
        written = _generate_bssids_from_file(file, out=out, num_bssids=num_bssids)
        common.exit(code=0 if written else 1)
    elif dir:
        # Process every supported file type in the directory.
        files = [file for file in dir.iterdir() if file.suffix in [".csv", ".yaml", ".yml", ".json"]]
        written = [_generate_bssids_from_file(file, out=out, num_bssids=num_bssids) for file in files]
        common.exit(code=0 if all(written) else 1)
    else:
        # Single-MAC mode: render directly to stdout, table or vertical layout.
        typer.echo(Mac(ap_mac, bssids=True, num_bssids=num_bssids, tablefmt="table" if not vertical else "vertical"))
@app.callback()
def callback():
    """Generate bssids based on AP MAC(s)"""
# Allow running this module directly (outside the installed CLI entry point).
if __name__ == "__main__":
    app()
from __future__ import annotations
from typing import TYPE_CHECKING
from centralcli import utils
from centralcli.client import Session
from centralcli.constants import DeviceStatusFilter, APDeployment
if TYPE_CHECKING:
from centralcli.response import Response
class MonitoringAPI:
    """Thin wrapper around the network-monitoring API endpoints."""

    def __init__(self, session: Session):
        self.session = session

    async def get_aps(
        self,
        site_id: int | None = None,
        model: str | None = None,
        status: DeviceStatusFilter | None = None,
        deployment: APDeployment | None = None,
        limit: int = 100,
        next: int | None = 1
    ) -> Response:
        """Fetch APs from /network-monitoring/v1alpha1/aps, optionally filtered.

        None-valued filters are dropped; the rest are combined into a single
        "<field> eq '<value>'" expression joined with " and ".
        """
        url = "/network-monitoring/v1alpha1/aps"

        def _raw(value):
            # Enum members are sent as their .value; plain values pass through untouched.
            return value if value is None or not hasattr(value, "value") else value.value

        filters = utils.strip_none(
            {
                "siteId": site_id,
                "model": model,
                "status": _raw(status),
                "deployment": _raw(deployment),
            }
        )
        # TODO need to test if Enum works without sending .value
        filter_expr = " and ".join(f"{k} eq '{v}'" for k, v in filters.items()) or None
        params = {
            "filter": filter_expr,
            "limit": limit,
            "next": next,
        }
        return await self.session.get(url, params=params)
+24
-0

@@ -27,2 +27,3 @@ #!/usr/bin/env python3

from typing import Callable, Iterable, List, Literal, Optional, Sequence, overload
from functools import cached_property

@@ -260,3 +261,26 @@ import click

from .classic.api import ClassicAPI
from .cnx.api import CentralAPI, GreenLakeAPI
class APIClients:  # TODO play with cached property vs setting in init to see how it impacts import performance across the numerous files that need this
    """Lazy container for the three API client flavors (classic, glp, cnx).

    Each client is constructed on first attribute access via cached_property, so
    importing this module stays cheap for files that never touch a given API.
    """

    def __init__(self, *, classic_base_url: str = config.classic.base_url, glp_base_url: str = config.glp.base_url, cnx_base_url: str = config.cnx.base_url, silent: bool = False):
        self.classic_base_url = classic_base_url
        self.glp_base_url = glp_base_url
        self.cnx_base_url = cnx_base_url
        self.silent = silent

    @cached_property
    def classic(self):
        # The classic client is always constructed (no config.ok gate like the others).
        return ClassicAPI(self.classic_base_url, silent=self.silent)

    @cached_property
    def glp(self):
        # None when GreenLake is not configured (config.glp.ok is falsy).
        return None if not config.glp.ok else GreenLakeAPI(self.glp_base_url, silent=self.silent)

    @cached_property
    def cnx(self):
        # None when cnx is not configured (config.cnx.ok is falsy).
        return None if not config.cnx.ok else CentralAPI(self.cnx_base_url, silent=self.silent)
api_clients = APIClients()
from .cache import Cache, CacheCert, CacheClient, CacheDevice, CacheGroup, CacheGuest, CacheInvDevice, CacheLabel, CacheMpsk, CacheMpskNetwork, CachePortal, CacheSite, CacheTemplate, CacheFloorPlanAP, CacheBuilding

@@ -263,0 +287,0 @@

+2
-3

@@ -5,8 +5,7 @@ import csv

from centralcli import common, config, render, utils
from centralcli import api_clients, common, config, render, utils
from .classic.api import ClassicAPI
from .response import Response
api = ClassicAPI(config.classic.base_url)
api = api_clients.classic

@@ -13,0 +12,0 @@

+12
-22

@@ -13,3 +13,3 @@ #!/usr/bin/env python3

from centralcli.client import BatchRequest
from centralcli.clitree import add, assign, caas, cancel, check, clone, convert, export, kick, refresh, rename, test, ts, unassign, update, upgrade
from centralcli.clitree import add, assign, caas, cancel, check, clone, convert, export, kick, refresh, rename, test, ts, unassign, update, upgrade, generate
from centralcli.clitree import dev as clidev

@@ -55,2 +55,3 @@ from centralcli.clitree.batch import batch

app.add_typer(convert.app, name="convert",)
app.add_typer(generate.app, name="generate",)
app.add_typer(clidev.app, name="dev", hidden=True)

@@ -403,3 +404,3 @@

collect: bool = typer.Option(False, "--collect", "-c", help="Store raw webhooks in local json file", hidden=True),
yes: bool = common.options.yes,
yes: int = common.options.yes_int,
debug: bool = common.options.debug,

@@ -593,5 +594,5 @@ default: bool = common.options.default,

# TOGLP
@app.command(hidden=True)
@app.command(hidden=False)
def enable(
what: EnableDisableArgs = typer.Argument("auto-sub"),
what: EnableDisableArgs = typer.Argument(...),
services: list[common.cache.LicenseTypes] = typer.Argument(..., show_default=False), # type: ignore

@@ -608,10 +609,5 @@ yes: bool = common.options.yes,

"""
_msg = "[bright_green]Enable[/] auto-subscribe for license"
if len(services) > 1: # pragma: no cover
_svc_msg = '\n '.join([s.name for s in services])
_msg = f'{_msg}s:\n {_svc_msg}\n'
else:
svc = services[0]
_msg = f'{_msg} {svc.name}'
render.econsole.print(_msg)
services: list[LicenseTypes] = services # retyping common.cache.LicenseTypes
_msg = "[bright_green]Enable[/] auto-subscribe for the following subscription tiers:"
render.econsole.print(f"{_msg} {utils.summarize_list(services, max=None)}")
render.econsole.print('\n[dark_orange]!![/] Enabling auto-subscribe applies the specified tier (i.e. foundation/advanced) for [green bold]all[/] devices of the same type.')

@@ -627,5 +623,5 @@ render.econsole.print('[cyan]enable auto-sub advanced-switch-6300[/] will result in [green bold]all[/] switch models being set to auto-subscribe the advanced license appropriate for that model.')

@app.command(hidden=True)
@app.command(hidden=False)
def disable(
what: EnableDisableArgs = typer.Argument("auto-sub"),
what: EnableDisableArgs = typer.Argument(...),
services: list[common.cache.LicenseTypes] = typer.Argument(..., show_default=False), # type: ignore

@@ -643,10 +639,4 @@ yes: bool = common.options.yes,

services: list[LicenseTypes] = services # retyping common.cache.LicenseTypes
_msg = "[bright_green]Disable[/] auto-subscribe for license"
if len(services) > 1: # pragma: no cover
_svc_msg = '\n '.join([s.name for s in services])
_msg = f'{_msg}s:\n {_svc_msg}\n'
else:
svc = services[0]
_msg = f'{_msg} {svc.name}'
render.econsole.print(_msg)
_msg = "[bright_green]Disable[/] auto-subscribe for the following subscription tiers:"
render.econsole.print(f"{_msg} {utils.summarize_list(services, max=None)}")
render.econsole.print('\n[dark_orange3]:warning:[/] Disabling auto subscribe removes auto-subscribe for all models of the same type.')

@@ -653,0 +643,0 @@ render.econsole.print('[cyan]disable auto-sub advanced-switch-6300[/] will result in auto-subscribe being disabled for [green bold]all[/] switch models.')

@@ -38,2 +38,3 @@ from __future__ import annotations

class LoggedRequests:

@@ -369,2 +370,7 @@ def __init__(self, url: str, method: str = "GET", ok: bool = None):

self.auth.handle_expired_token()
elif resp.status == 401 and self.is_cnx:
spin_txt_retry = "(retry after token refresh)"
self.spinner.start(f"Attempting to Refresh {'' if not self.is_cnx else '[green]GLP[/] '}Token")
self.auth.handle_expired_token()
self.spinner.succeed()
elif resp.status == 500:

@@ -511,14 +517,36 @@ spin_txt_retry = ":shit: [bright_red blink]retry[/] after 500: [cyan]Internal Server Error[/]"

# total is provided for some calls with the total # of records available
# TODO # TOGLP need to use "next" as pagination field
# if params.get("limit") and params.get("next") and isinstance(r.raw, dict) and r.raw.get("total") and (len(r.output) + (params.get("limit", 0) * params["next"]) < r.raw["total"]):
is_events = True if url.endswith("/monitoring/v2/events") else False
do_next = do_pagination = False
if params.get("limit") and params.get("next") and isinstance(r.raw, dict) and r.raw.get("total") and (len(r.output) if params["next"] in [None, 1] else len(r.output) + (params.get("limit", 0)) * params["next"]) < r.raw["total"]:
do_pagination = True
do_next = True
if params.get(offset_key, 99) == 0 and isinstance(r.raw, dict) and r.raw.get("total") and (len(r.output) + params.get("limit", 0) < r.raw.get("total", 0)):
do_pagination = True
if do_pagination:
_total = count or r.raw["total"] if not is_events or r.raw["total"] <= 10_000 else 10_000 # events endpoint will fail if offset + limit > 10,000
if _total > len(r.output):
_limit = params.get("limit", 100)
_offset = params.get(offset_key, 0)
br = BatchRequest
_reqs = [
br(self.exec_api_call, url, data=data, json_data=json_data, method=method, headers=headers, params={**params, offset_key: i, "limit": _limit}, **kwargs)
for i in range(len(r.output), _total, _limit)
]
if not do_next:
_offset = params.get(offset_key, 0)
br = BatchRequest
_reqs = [
br(self.exec_api_call, url, data=data, json_data=json_data, method=method, headers=headers, params={**params, offset_key: i, "limit": _limit}, **kwargs)
for i in range(len(r.output), _total, _limit)
]
else:
_next_possible_range = list(range(int(r.raw["next"]), _total + 1))
_next_range = [i for i in _next_possible_range if i * _limit <= _total]
if _next_range[-1] * _limit < _total:
_next_range += [min([i for i in _next_possible_range if i * _limit > _total])] # TODO more elegant way to do this.
br = BatchRequest
_reqs = [
br(self.exec_api_call, url, data=data, json_data=json_data, method=method, headers=headers, params={**params, "next": i, "limit": _limit}, **kwargs)
for i in _next_range
]
batch_res: List[Response] = await self._batch_request(_reqs)

@@ -728,5 +756,8 @@ failures: List[Response] = [r for r in batch_res if not r.ok] # A failure means both the original attempt and the retry failed.

_tot_start = time.perf_counter()
max_calls_per_chunk = MAX_CALLS_PER_CHUNK
if self.base_url == config.cnx.base_url and not any([call.args and call.args[0].startswith("http") for call in api_calls]):
max_calls_per_chunk = 10
if self.requests: # a call has been made no need to verify first call (token refresh)
chunked_calls = utils.chunker(api_calls, MAX_CALLS_PER_CHUNK)
chunked_calls = utils.chunker(api_calls, max_calls_per_chunk)
else:

@@ -741,3 +772,3 @@ resp: Response = await api_calls[0].func(

m_resp: List[Response] = utils.listify(resp)
chunked_calls = utils.chunker(api_calls[1:], MAX_CALLS_PER_CHUNK)
chunked_calls = utils.chunker(api_calls[1:], max_calls_per_chunk)

@@ -744,0 +775,0 @@ # Make calls 6 at a time ensuring timing so that 7 per second limit is not exceeded

@@ -18,7 +18,7 @@ # -*- coding: utf-8 -*-

ArgumentType = Literal["cache", "name", "device", "devices", "device_type", "what", "group", "groups", "group_dev", "site", "import_file", "wid", "version", "session_id", "ssid", "portal", "portals"]
ArgumentType = Literal["cache", "name", "device", "devices", "device_type", "what", "group", "groups", "group_dev", "site", "import_file", "wid", "version", "session_id", "ssid", "portal", "portals", "banner_file"]
OptionType = Literal[
"client", "group", "group_many", "site", "site_many", "label", "label_many", "debug", "debugv", "device_type", "do_json", "do_yaml", "do_csv", "do_table",
"outfile", "reverse", "pager", "ssid", "yes", "yes_int", "device_many", "device", "swarm_device", "swarm", "sort_by", "default", "workspace", "verbose",
"raw", "end", "update_cache", "show_example", "at", "in", "reboot", "start", "past", "subscription", "version", "not_version", "band"
"raw", "end", "update_cache", "show_example", "at", "in", "reboot", "start", "past", "subscription", "version", "not_version", "band", "banner", "banner_file",
]

@@ -41,2 +41,3 @@

self.import_file: ArgumentInfo = typer.Argument(None, exists=True, show_default=False,)
self.banner_file: ArgumentInfo = typer.Argument(None, help="The file with the desired banner text. [dim italic]supports .j2 (Jinja2) template[/]", exists=True, show_default=False)
self.wid: ArgumentInfo = typer.Argument(..., help="Use [cyan]show webhooks[/] to get the wid", show_default=False,)

@@ -194,2 +195,4 @@ self.portal: ArgumentInfo = typer.Argument(..., metavar=iden_meta.portal, autocompletion=cache.portal_completion, show_default=False,)

self.band: OptionInfo = typer.Option(None, help=f"Show Bandwidth for a specific band [dim]{escape('[ap must be provided]')}[/]", show_default=False)
self.banner_file: OptionInfo = typer.Option(None, "--banner-file", help="The file with the desired banner text. [dim italic]supports .j2 (Jinja2) template[/]", exists=True, show_default=False)
self.banner: OptionInfo = typer.Option(False, "--banner", help="Update banner text. This option will prompt for banner text (paste into terminal)", show_default=False)
self.yes: OptionInfo = typer.Option(False, "-Y", "-y", "--yes", help="Bypass confirmation prompts - Assume Yes",)

@@ -196,0 +199,0 @@ self.yes_int: OptionInfo = typer.Option(

@@ -15,3 +15,3 @@ #!/usr/bin/env python3

from centralcli import common, config, log, render, utils
from centralcli import api_clients, common, config, log, render, utils
from centralcli.client import BatchRequest

@@ -25,11 +25,8 @@ from centralcli.constants import DevTypes, GatewayRole, NotifyToArgs, iden_meta, state_abbrev_to_pretty

from ..clicommon import APIClients
api_clients = APIClients()
api = api_clients.classic
app = typer.Typer()
color = utils.color
class AddWlanArgs(str, Enum):

@@ -36,0 +33,0 @@ type = "type"

@@ -7,5 +7,4 @@ #!/usr/bin/env python3

from centralcli import common, log, render, utils
from centralcli import api_clients, common, log, render, utils
from centralcli.cache import CacheDevice, CacheLabel, CacheSub
from centralcli.clicommon import APIClients
from centralcli.client import BatchRequest

@@ -15,3 +14,3 @@ from centralcli.constants import iden_meta

api_clients = APIClients()
api = api_clients.classic

@@ -82,3 +81,3 @@ glp_api = api_clients.glp

sub: CacheSub = common.cache.get_sub_identifier(sub_name_or_id, end_date=end_date)
sub: CacheSub = common.cache.get_sub_identifier(sub_name_or_id, end_date=end_date, best_match=True) # TODO add qty param and return list of sub objects if best sub object can not satisfy the qty necessary
if len(devices) > sub.available:

@@ -85,0 +84,0 @@ log.warning(f"{len(devices)} devices exceeds {sub.available}... the number of available subscriptions for [bright_green]{sub.name}[/bright_green]|[medium_spring_green]{sub.key}[/]. [dim italic]As of last Subscription cache update[/]", show=True)

@@ -15,2 +15,3 @@ from __future__ import annotations

from centralcli.cache import api
from centralcli.constants import APIAction
from centralcli.models.imports import ImportMACs, ImportMPSKs

@@ -189,2 +190,25 @@

@app.command()
def variables(
    import_file: Path = common.arguments.get("import_file", help="Path to file with variables"),
    show_example: bool = common.options.show_example,
    yes: bool = common.options.yes,
    debug: bool = common.options.debug,
    default: bool = common.options.default,
    workspace: str = common.options.workspace,
) -> None:
    """Batch add variables for devices based on data from required import file.

    Use [cyan]cencli batch add variables --example[/] to see example import file formats.

    [italic]Accepts same format as Aruba Central UI, but also accepts .yaml[/]
    """
    # --example short-circuits: print the sample import format and return.
    if show_example:
        render.console.print(examples.add_variables)
        return
    if not import_file:
        common.exit(render._batch_invalid_msg("cencli batch add variables [OPTIONS] [IMPORT_FILE]"))
    # Shared helper handles add/update/replace; ADD here.
    common.batch_add_update_replace_variables(import_file, action=APIAction.ADD, yes=yes)
@app.command()
def macs(

@@ -191,0 +215,0 @@ import_file: Path = common.arguments.import_file,

@@ -10,2 +10,3 @@ #!/usr/bin/env python3

from centralcli import common, render
from centralcli.constants import APIAction

@@ -22,2 +23,3 @@ from . import examples

reboot: bool = typer.Option(False, "--reboot", "-R", help="Automatically reboot device if IP or VLAN is changed [dim italic]Reboot is required for changes to take effect when IP or VLAN settings are changed[/]"),
banner_file: Path = common.options.banner_file,
yes: bool = common.options.yes,

@@ -29,4 +31,7 @@ debug: bool = common.options.debug,

) -> None:
"""Update per-ap-settings or ap-altitude (at AP level) in mass based on settings from import file
"""Update per-ap-settings, ap-altitude, banner, etc. (at AP level) in mass based on settings from import file
When [cyan]--banner-file <file>[/] is provided. Only the banner is processed. The import_file is used as the variable file if the banner_file is a .j2 file.
i.e. Most common scenario... banner_file is a j2 with {{ hostname }} which is converted to the value from the hostname field in the import file.
Use [cyan]--example[/] to see expected import file format and required fields.

@@ -41,3 +46,8 @@ """

data = common._get_import_file(import_file, "devices")
if banner_file:
render.econsole.print(f"[deep_sky_blue]:information:[/] When --banner-file is provided. Only the banner is processed. re-run the command without banner to process any other updates from {import_file.name}")
common.batch_update_aps(data, yes=yes, reboot=reboot)

@@ -73,5 +83,90 @@

@app.command()
def ap_banner(
    import_file: Path = common.arguments.import_file,
    banner_file: Path = common.arguments.banner_file,
    # Hidden option form of the same argument, used as a fallback below.
    _banner_file: Path = common.options.get("banner_file", hidden=True),
    banner: bool = common.options.banner,
    group_level: bool = typer.Option(False, "-G", "--groups", help=f"Treat import file as group import, update ap group level configs. {render.help_block('Update applied at device level, import expected to be device import')}"),
    show_example: bool = common.options.show_example,
    yes: bool = common.options.yes,
    debug: bool = common.options.debug,
    debugv: bool = common.options.debugv,
    default: bool = common.options.default,
    workspace: str = common.options.workspace,
) -> None:
    """Update banner (MOTD) text for APs at group or device level

    When the banner_file is a [cyan].j2[/] file. It is processed as a jinja2 template with variables coming from the import_file.
    i.e. Most common scenario... banner_file is a j2 with {{ hostname }} which is converted to the value from the hostname field in the import file.

    Use [cyan]--example[/] to see expected import file format and required fields.
    """
    if show_example:
        render.console.print("Expects .yaml, .json, or .csv file with [cyan]serial[/] unless [cyan]-G[/]|[cyan]--groups[/] is used. Then expects the same with [cyan]name[/] [dim italic](The group name)[/]")
        render.console.print("Any other keys/values provided in the import will be used as variables if [cyan]banner_file[/] provided is a Jinja2 template [dim italic](.j2 file)[/]")
        render.econsole.print(
            "--------------------- [bright_green].yaml example for[/] [magenta]APs[/] ---------------------\n"
            "- serial: CN12345678\n"
            "  hostname: barn.615.ab12\n"
        )
        render.econsole.print(
            "--------------------- [bright_green].csv example for[/] [magenta]Groups[/] ---------------------\n"
            "name,some_var\n"
            "group_name,some_value\n"
        )
        render.econsole.print("[dark_olive_green2]See [cyan]batch update aps --example[/cyan] for expanded example device import_file format[/]")
        render.econsole.print("[dark_olive_green2]See [cyan]batch add groups --example[/cyan] for expanded example group import_file format[/]")
        return
    if not import_file:
        common.exit(render._batch_invalid_msg("cencli batch update ap-banner [OPTIONS] [IMPORT_FILE] [BANNER_FILE]"))
    is_tmp_file = False
    # --banner prompts interactively and writes the pasted text to a temp file.
    if banner:  # pragma: no cover requires tty
        banner_file = common.get_banner_from_user()
        is_tmp_file = True
    # Argument form wins; hidden option form is the fallback.
    banner_file = banner_file or _banner_file
    if not banner_file:
        common.exit("Missing required argument 'banner_file'")
    data = common._get_import_file(import_file, "devices" if not group_level else "groups")
    common.batch_update_ap_banner(data, banner_file, group_level=group_level, yes=yes)
    # Clean up the temp file created by the interactive --banner path.
    if is_tmp_file:  # pragma: no cover
        banner_file.unlink(missing_ok=True)
@app.command()
def variables(
    import_file: Path = common.arguments.get("import_file", help="Path to file with variables"),
    replace: bool = typer.Option(False, "-R", "--replace", help=f"Replace all existing variables with the variables provided {render.help_block('existing variables are retained unless updated in this payload')}"),
    show_example: bool = common.options.show_example,
    yes: bool = common.options.yes,
    debug: bool = common.options.debug,
    default: bool = common.options.default,
    workspace: str = common.options.workspace,
) -> None:
    """Batch update/replace variables for devices based on data from required import file.

    Use [cyan]cencli batch update variables --example[/] to see example import file formats.

    [italic]Accepts same format as Aruba Central UI, but also accepts .yaml[/]

    [cyan]-R[/]|[cyan]--replace[/] :triangular_flag: Will flush all existing variables (for the devices in the import_file) and replace with the variables from the import_file.
    By default: Existing variables not in the import_file are left intact.
    """
    if show_example:
        render.console.print(examples.update_variables)
        return
    if not import_file:
        common.exit(render._batch_invalid_msg("cencli batch update variables [OPTIONS] [IMPORT_FILE]"))
    # --replace flushes existing variables; default merges/updates.
    action = APIAction.REPLACE if replace else APIAction.UPDATE
    common.batch_add_update_replace_variables(import_file, action=action, yes=yes)
@app.callback()
def callback():
"""Batch update devices (GreenLake Inventory) / aps."""
"""Batch update devices (GreenLake Inventory) / aps / variables."""
pass

@@ -78,0 +173,0 @@

@@ -9,4 +9,4 @@ #!/usr/bin/env python3

from centralcli import common, config, render, utils
from centralcli.cache import CacheCert, CacheLabel, CacheSite, api
from centralcli import api_clients, common, config, render, utils
from centralcli.cache import CacheCert, CacheLabel, CacheSite
from centralcli.client import BatchRequest

@@ -25,2 +25,3 @@ from centralcli.constants import iden_meta

api = api_clients.classic

@@ -27,0 +28,0 @@ @app.command()

@@ -84,5 +84,8 @@ #!/usr/bin/env python3

"""
common.cache(refresh=True)
res = common.cache(refresh=True)
exit_code = 0 if all([r.ok if not hasattr(r, "all_ok") else r.all_ok for r in res]) else 1
common.exit(code=exit_code)
# CACHE add cache for webhooks

@@ -89,0 +92,0 @@ @app.command()

@@ -9,3 +9,3 @@ #!/usr/bin/env python3

from centralcli import cleaner, common, log, render
from centralcli import cleaner, common, render
from centralcli.cache import api

@@ -80,6 +80,3 @@ from centralcli.constants import CloudAuthMacSortBy, CloudAuthUploadType, TimeRange

if resp.ok:
try:
resp.output = cleaner.cloudauth_upload_status(resp.output)
except Exception as e: # pragma: no cover
log.error(f"Error cleaning output of cloud auth mac upload {repr(e)}")
resp.output = cleaner.cloudauth_upload_status(resp.output)

@@ -86,0 +83,0 @@ render.display_results(

@@ -7,7 +7,5 @@ #!/usr/bin/env python3

from centralcli import cleaner, common, render
from centralcli.clicommon import APIClients
from centralcli import api_clients, cleaner, common, render
from centralcli.constants import SortTsCmdOptions, TSDevTypes, iden_meta, lib_to_api # noqa
api_clients = APIClients()
api = api_clients.classic

@@ -14,0 +12,0 @@

@@ -10,10 +10,8 @@ #!/usr/bin/env python3

from centralcli import common, config, log, render
from centralcli import api_clients, common, config, log, render
from centralcli.client import Session
from centralcli.response import Response
from ..cache import api
from ..constants import SortGroupOptions
app = typer.Typer()
api = api_clients.classic

@@ -192,4 +190,3 @@

def command(
import_file: Path = common.arguments.import_file,
sort_by: SortGroupOptions = common.options.sort_by,
sort_by: str = common.options.sort_by,
reverse: bool = common.options.reverse,

@@ -216,2 +213,12 @@ yes: bool = common.options.yes,

"""
from ..cnx.api import CentralAPI
api = CentralAPI()
tablefmt = common.get_format(do_json=do_json, do_yaml=do_yaml, do_csv=do_csv, do_table=do_table)
resp = api.session.request(api.monitoring.get_aps) # deployment=APDeployment.STANDALONE, status=DeviceStatusFilter.ONLINE, limit=3)
render.display_results(resp, tablefmt=tablefmt, outfile=outfile, pager=pager, sort_by=sort_by, reverse=reverse, output_by_key="deviceName")
# from centralcli import api_clients
# resp = api_clients.glp.session.request(api_clients.glp.devices.get_glp_devices, sort_by="archived", reverse=True)
# render.display_results(resp, tablefmt=tablefmt, outfile=outfile, pager=pager)
...

@@ -218,0 +225,0 @@

@@ -12,5 +12,4 @@ #!/usr/bin/env python3

from centralcli import cleaner, common, render, utils
from centralcli import api_clients, cleaner, common, render, utils
from centralcli.cache import CacheDevice
from centralcli.clicommon import APIClients
from centralcli.constants import iden_meta, lib_to_api

@@ -25,5 +24,3 @@

api_clients = APIClients()
api = api_clients.classic
app = typer.Typer()

@@ -30,0 +27,0 @@ typer.Argument = partial(typer.Argument, show_default=False)

@@ -9,5 +9,4 @@ #!/usr/bin/env python3

from centralcli import common, log, render, utils
from centralcli import api_clients, common, log, render, utils
from centralcli.cache import CacheDevice, CacheLabel, CacheSub
from centralcli.clicommon import APIClients
from centralcli.client import BatchRequest

@@ -17,9 +16,7 @@ from centralcli.constants import iden_meta

api_clients = APIClients()
app = typer.Typer()
api = api_clients.classic
glp_api = api_clients.glp
app = typer.Typer()
# TOGLP

@@ -26,0 +23,0 @@ @app.command(deprecated=True, hidden=glp_api is not None)

@@ -13,3 +13,3 @@ #!/usr/bin/env python3

from centralcli import cleaner, common, log, render, utils
from centralcli import cleaner, common, log, render, utils, config
from centralcli.caas import CaasAPI

@@ -108,6 +108,8 @@ from centralcli.cache import CacheCert, CacheDevice, CacheGroup, CachePortal, CacheTemplate, api

@app.command(help="Update existing or add new Variables for a device/template")
@app.command()
def variables(
device: str = typer.Argument(..., metavar=iden_meta.dev, autocompletion=common.cache.dev_completion, show_default=False,),
var_value: list[str] = typer.Argument(..., help="comma seperated list 'variable = value, variable2 = value2'", show_default=False,),
device: str = common.arguments.device,
var_value: list[str] = typer.Argument(None, help=f"comma seperated list 'variable = value, variable2 = value2' [dim]{escape('[')}[red]required[/red] unless [cyan]--var-file[/] is specified{escape(']')}[/dim]", show_default=False,),
var_file: Path = typer.Option(None, "-F", "--file", help="Path to file with variables", exists=True, show_default=False,),
replace: bool = typer.Option(False, "-R", "--replace", help=f"Replace all existing variables with the variables provided {render.help_block('existing variables are retained unless updated in this payload')}"),
yes: bool = common.options.yes,

@@ -118,20 +120,37 @@ debug: bool = common.options.debug,

) -> None:
"""Update/replace existing or add new Variables for a device
Use [cyan]cencli batch update variables [IMPORT_FILE][/] to update multiple devices.
If providing [cyan]--file[/] :triangular_flag: Only the device specified via the device argument will be processed, even if the file has variables for other devices defined.
[cyan]-R[/]|[cyan]--replace[/] :triangular_flag: Will flush all existing variables leaving only the variables provided.
"""
dev = common.cache.get_inv_identifier(device)
serial = dev.serial
var_dict = common.parse_var_value_list(var_value)
msg = "Sending Update" if yes else "Please Confirm: [bright_green]Update[/]"
render.econsole.print(f"{msg} {dev.rich_help_text}", emoji=False)
_ = [render.econsole.print(f' {k}: [bright_green]{v}[/]', emoji=False) for k, v in var_dict.items()]
if render.confirm(yes):
resp = api.session.request(
api.configuration.update_device_template_variables,
serial,
dev.mac,
var_dict=var_dict
var_dict = {} if not var_file else utils.unlistify(config.get_file_data(var_file))
var_dict = var_dict.get(dev.serial, var_dict) # json by serial
if var_value:
var_dict = {**var_dict, **common.parse_var_value_list(var_value)}
if not var_dict:
common.exit(
"Missing required paramerter. [cyan]var_value[/] (args) and/or [cyan]--file[/] is required.\n"
"See [cyan]cencli update variables --help[/] for more details."
)
render.display_results(resp, tablefmt="action")
render.econsole.print(f"[bright_green]Update{'ing' if yes else ''}[/] {dev.rich_help_text}", emoji=False)
render.econsole.print(*[f' {k}: [bright_green]{v}[/]' for k, v in var_dict.items()], emoji=False, sep="\n")
if replace:
render.econsole.print(f"\n[dark_orange3]:warning:[/] [cyan]-R[/]|[cyan]--replace[/] :triangular_flag: used. [bright_red]All existing variables will be flushed[/]. Only the variables above will be defined for [cyan]{dev.serial}[/]")
render.confirm(yes)
resp = api.session.request(
api.configuration.update_device_template_variables,
serial,
dev.mac,
var_dict=var_dict,
replace=replace
)
render.display_results(resp, tablefmt="action")
@app.command()

@@ -251,4 +270,6 @@ def group(

group_dev: str = common.arguments.get("group_dev", autocompletion=common.cache.group_dev_ap_gw_completion),
cli_file: Path = typer.Argument(..., help="File containing desired config/template in CLI format.", exists=True, show_default=False,),
cli_file: Path = typer.Argument(None, help="File containing desired config/template in CLI format.", exists=True, show_default=False,),
var_file: Path = typer.Argument(None, help="File containing variables for j2 config template.", exists=True, show_default=False,),
banner_file: Path = common.options.banner_file,
banner: bool = common.options.banner,
do_gw: bool = common.options.do_gw,

@@ -261,3 +282,19 @@ do_ap: bool = common.options.do_ap,

) -> None:
is_tmp_file = False
group_dev: CacheDevice | CacheGroup = common.cache.get_identifier(group_dev, qry_funcs=["group", "dev"], device_type=["ap", "gw"])
if banner: # pragma: no cover requires tty
banner_file = common.get_banner_from_user()
is_tmp_file = True
if not cli_file and not banner_file:
common.exit("cli_file or --banner-file <banner file> is required")
if banner_file:
if do_gw or group_dev.is_dev and not group_dev.type == "ap":
common.exit("banner update only valid for APs or AP Groups")
common.batch_update_ap_banner(data=dict(group_dev), banner_file=banner_file, group_level=group_dev.is_group, yes=yes)
if is_tmp_file: # pragma: no cover
banner_file.unlink(missing_ok=True)
common.exit(code=0) # will exit from batch_update_ap_banner in display_results if it failed.
config_out = utils.generate_template(cli_file, var_file=var_file)

@@ -264,0 +301,0 @@ cli_cmds = utils.validate_config(config_out)

@@ -9,2 +9,3 @@ from __future__ import annotations

from ... import config
from .central.monitoring import MonitoringAPI

@@ -36,3 +37,20 @@

class CentralAPI:
    """Entry point for the New Central (cnx) API, bound to a single Session."""

    def __init__(self, base_url: StrOrURL = None, *, aio_session: ClientSession = None, silent: bool = True):
        # cnx=True flags the underlying Session as a New Central connection.
        # base_url falls back to the configured cnx base URL when not provided.
        self._session = Session(base_url=base_url or config.cnx.base_url, aio_session=aio_session, silent=silent, cnx=True)

    @property
    def session(self) -> Session:
        # Shared HTTP session used by the API namespaces below.
        return self._session

    @session.setter
    def session(self, session: Session) -> None:
        self._session = session  # pragma: no cover We don't use this currently

    @cached_property
    def monitoring(self) -> MonitoringAPI:
        # Monitoring API namespace; built lazily and cached after first access.
        return MonitoringAPI(self.session)

@@ -15,3 +15,2 @@ # (C) Copyright 2025 Hewlett Packard Enterprise Development LP.

class NewCentralURLs:

@@ -117,3 +116,3 @@ Authentication = {"OAUTH": "https://sso.common.cloud.hpe.com/as/token.oauth2"}

def get_access_token(self, app_name: Literal["new_central", "glp"] = "glp"):
def get_access_token(self, app_name: Literal["new_central", "glp"] = "glp") -> str:
"""

@@ -177,4 +176,4 @@ Create a new access token for the specified application.

"""
log.info(f"{app_name} access Token has expired.", show=True)
log.info("Handling Token Expiry...", show=True)
log.info(f"{app_name} access Token expired/invalid. Fetching new token...")
client_id, client_secret = self._return_client_credentials(app_name)

@@ -181,0 +180,0 @@ if any(credential is None for credential in [client_id, client_secret]):

@@ -555,3 +555,4 @@ from __future__ import annotations

except Exception as e:
raise UserWarning(f'Unable to load configuration from {import_file}\n{e.__class__.__name__}\n\n{e}')
e.add_note(f'Unable to load data from {import_file} due to error above')
raise e

@@ -558,0 +559,0 @@ if isinstance(import_data, list):

@@ -163,2 +163,10 @@ #!/usr/bin/env python3

class DeviceStatusFilter(str, Enum):
    """Device status filter values (str mixin allows direct use as a string)."""
    ONLINE = "ONLINE"
    OFFLINE = "OFFLINE"
class APDeployment(str, Enum):
    """AP deployment modes (str mixin allows direct use as a string)."""
    STANDALONE = "Standalone"
    CLUSTER = "Cluster"
class GroupDevTypes(str, Enum):

@@ -187,2 +195,14 @@ ap = "ap"

class MacFormat(str, Enum):
COLS = "COLS"
DASHES = "DASHES"
DOTS = "DOTS"
CLEAN = "CLEAN"
cols = "cols"
dashes = "dashes"
dots = "dots"
clean = "clean"
# Here are all the types for the below Enum

@@ -403,2 +423,7 @@ # 3: Bridge (Switch)

class APIAction(str, Enum):
    """Action values for API payloads (str mixin allows direct use as a string)."""
    ADD = "ADD"
    UPDATE = "UPDATE"
    REPLACE = "REPLACE"
class RadioMode(str, Enum):

@@ -2408,3 +2433,2 @@ access = "access"

"--version",
"-v",
"-V"

@@ -2411,0 +2435,0 @@ ]

@@ -59,3 +59,7 @@ from __future__ import annotations

@property
def ok(self) -> bool:
return self.base_url is not None
class Tokens(BaseModel):

@@ -62,0 +66,0 @@ access: Optional[str] = Field(..., alias=AliasChoices("access", "access_token", "access-token"))

from pydantic import BaseModel, field_serializer, ConfigDict
from typing import Dict, Any
from typing import Any
from datetime import datetime

@@ -21,7 +21,7 @@ import pendulum

)
details: Dict[str, Any]
details: dict[str, Any] | None
status: str
stats: CloudAuthUploadStats
submittedAt: datetime
lastUpdatedAt: datetime
submittedAt: datetime | None
lastUpdatedAt: datetime | None
durationNanos: int

@@ -33,2 +33,4 @@ fileName: str

def pretty_dt(cls, dt: datetime) -> DateTime:
if datetime(1, 1, 1, 0, 0, tzinfo=dt.tzinfo) == dt:
return None
return DateTime(dt.timestamp())

@@ -287,2 +287,1 @@ """object Classes"""

return valid

@@ -158,5 +158,6 @@ #!/usr/bin/env python3

# TODO Partial support for sending rich.Text needs to cleaned up and make all output returned to display_results rich.text, use console.print vs typer.echo in display_results
# Output class can likely be elliminated and return rich.Text from render.output
class Output():
def __init__(self, rawdata: str = "", prettydata: str = "", config: Config = None, tablefmt: TableFormat | None = None):
def __init__(self, rawdata: str = "", prettydata: str | Text = "", config: Config = None, tablefmt: TableFormat | None = None):
self.config = config

@@ -172,5 +173,6 @@ self._file = rawdata # found typer.unstyle AFTER I built this

if self.tty:
out = self.tty if not isinstance(self.tty, Text) else str(self.tty)
pretty_up = typer.style("Up\n", fg="green")
pretty_down = typer.style("Down\n", fg="red")
out = self.tty.replace("Up\n", pretty_up).replace("Down\n", pretty_down)
out = out.replace("Up\n", pretty_up).replace("Down\n", pretty_down)
else:

@@ -183,7 +185,15 @@ out = self.file

def __rich__(self):
pretty_up = "[green]Up[/]\n"
pretty_down = "[red]Down[/]\n"
out = self.tty.replace("Up\n", pretty_up).replace("Down\n", pretty_down)
is_text = False
if isinstance(self.tty, Text):
is_text = True
out = self.format_rich_text()
out = self.tty.markup
else:
pretty_up = "[green]Up[/]\n"
pretty_down = "[red]Down[/]\n"
out = self.tty.replace("Up\n", pretty_up).replace("Down\n", pretty_down)
out = self.sanitize_strings(out)
if all([is_text, out]):
out = Text.from_markup(out, emoji=":cd:" not in out)
return out if out else "\u26a0 No Data. This may be normal."

@@ -201,2 +211,7 @@

def format_rich_text(self) -> Text:
self.tty.highlight_words([" Up ", "UP "], "bright_green")
self.tty.highlight_words([" Down ", " DOWN "], "red")
return self.tty
def sanitize_strings(self, strings: str, config=None) -> str: # pragma: no cover

@@ -226,3 +241,3 @@ """Sanitize Output for demos

def menu(self, data_len: int = None) -> str:
def menu(self, data_len: int = None) -> str: # pragma: no cover requires tty
def isborder(line: str) -> bool:

@@ -258,3 +273,3 @@ return all(not c.isalnum() for c in list(line))

try:
return typer.unstyle(self._file)
return typer.unstyle(self.tty)
except TypeError:

@@ -413,3 +428,3 @@ return self._file

if not isinstance(data, list) or group_by not in data[0]:
log.error(f"Error in render.do_group_by_table invalid type {type(data)} or {group_by} not found in header.")
log.error(f"Error in render.build_rich_table_rows invalid type {type(data)} or {group_by} not found in header.")
[table.add_row(*list(in_dict.values())) for in_dict in data]

@@ -618,3 +633,6 @@ return table

raw_data = yaml.safe_dump(json.loads(json.dumps(outdata, cls=Encoder)), sort_keys=False)
table_data = rich_capture(raw_data)
# table_data = rich_capture(raw_data.replace("'", ""))
table_data = rich_capture(Syntax(rich_capture(raw_data.replace("'", "")), "yaml", background_color=None, theme='native'))
table_data = Text.from_ansi(table_data, overflow="fold")
...

@@ -627,2 +645,4 @@ elif tablefmt == "csv":

return str(value.iso)
elif value in ["❌", "✅"]:
return False if value == "❌" else True
else:

@@ -794,3 +814,3 @@ return str(value) if "," not in str(value) else f'"{value}"'

"""
style = "dim" if help_type == "default" else "dim red"
style = "dim red" if help_type == "requires" else "dim"
return f"[{style}]{escape(f'[{help_type}: {default_txt}]')}[/{style}]"

@@ -958,3 +978,12 @@

typer.echo_via_pager(outdata) if pager and tty and len(outdata) > tty.rows else typer.echo(outdata)
# display output to screen.
if isinstance(outdata.tty, Text):
emoji = ":cd:" not in outdata # HACK prevent :cd: often found in MAC addresses from being rendered as 💿
if pager and tty and len(outdata) > tty.rows:
with console.pager:
console.print(outdata, emoji=emoji)
else:
console.print(outdata, emoji=emoji)
else:
typer.echo_via_pager(outdata) if pager and tty and len(outdata) > tty.rows else typer.echo(outdata)

@@ -961,0 +990,0 @@ if caption and outdata.tablefmt != "rich": # rich prints the caption by default for all others we need to add it to the output

@@ -12,3 +12,2 @@ #!/usr/bin/env python3

from rich.console import Console
from rich.text import Text
from yarl import URL

@@ -160,3 +159,3 @@

self.url = response.url if isinstance(response.url, URL) else URL(response.url)
self.error = response.reason or "OK" if response.ok else "ERROR" # visualrf does not send OK for reason when call is successful
self.error = response.reason or ("OK" if response.ok else "ERROR") # visualrf does not send OK for reason when call is successful
if isinstance(response, ClientResponse):

@@ -335,3 +334,3 @@ self.status = response.status

r = render.output([data], tablefmt="yaml")
r = Text.from_ansi(r.tty)
# r = Text.from_ansi(r.tty) # now yaml now set r.tty to rich.Text
r = "\n".join([f" {line}" for line in str(r).splitlines()])

@@ -338,0 +337,0 @@ else:

@@ -32,3 +32,3 @@ #!/usr/bin/env python3

TabLibFormats = Literal['json', 'yaml', 'csv', 'tsv', 'dbf', 'html', 'jira', 'latex', 'df', 'rst', 'cli']
ExampleType = Literal["devices", "sites", "groups", "labels", "macs", "mpsk"]
ExampleType = Literal["devices", "sites", "groups", "labels", "macs", "mpsk", "variables"]
Action = Literal["add", "delete", "move", "rename", "other"]

@@ -160,3 +160,3 @@

self.ds = tablib.Dataset().load(self.data, format=data_format)
self.clean = self._get_clean_data(self.ds)
self.clean = self._get_clean_data(self.ds,)
self.json = self.get_json()

@@ -200,3 +200,7 @@ self.yaml = self.get_yaml()

out = self._handle_bools(data)
return [utils.strip_none(inner) for inner in out]
out = [utils.strip_none(inner) for inner in out]
if not self.type == "variables":
return out
else:
return {dev["_sys_serial"]: dev for dev in out}

@@ -644,3 +648,18 @@ def _handle_bools(self, data: tablib.Dataset) -> List[Dict[str, Any]]:

# -- // VARIABLES \\ --
data="""_sys_serial,_sys_lan_mac,_sys_hostname,_sys_gateway,_sys_module_command,user_var1,user_var2
US12345678,aabbccddeeff,snantx-idf1-sw1,10.0.30.1,type jl728a,value1,value2
SG12345679,ffee.ddcc.bbaa,snantx-idf1-sw2,10.0.30.1,type jl728a,value1,value2
TW12345680,01:aa:bb:cc:dd:ee,snantx-idf1-sw3,10.0.30.1,type jl728a,value1,value2"""
example = Example(data, type="variables", action="other")
clibatch_update_variables = f"""[italic cyan]cencli batch update variables IMPORT_FILE[/]:
Requires the following keys (include as header row for csv import):
[cyan]_sys_serial[/], [cyan]_sys_lan_mac[/] [italic](both are required)[/].
{example}
"""
# -- // ARCHIVE / UNARCHIVE \\ --

@@ -817,2 +836,4 @@ clibatch_archive = f"""[italic cyan]cencli batch archive IMPORT_FILE[/]:

self.update_aps = clibatch_update_aps
self.update_variables = clibatch_update_variables
self.add_variables = clibatch_update_variables.replace("update variables", "add variables")
self.update_devices = clibatch_update_aps.replace("update aps", "update devices")

@@ -822,3 +843,3 @@ self.assign_subscriptions = clibatch_assign_subscriptions

def __getattr__(self, key: str):
if key not in self.__dict__.keys(): # pragma: no cover
if key != "__iter__" and key not in self.__dict__.keys(): # pragma: no cover
log.error(f"An attempt was made to get {key} attr from ImportExamples which is not defined.")

@@ -825,0 +846,0 @@ return f":warning: [bright_red]Error[/] no str defined for [cyan]ImportExamples.{key}[/]"

@@ -31,3 +31,3 @@ import os

PortalAuthTypes = Sequence[PortalAuthType]
CacheTableName = Literal["devices", "sites", "groups", "labels", "macs", "mpsk", "subscriptions"]
ImportType = Literal["devices", "sites", "groups", "labels", "macs", "mpsk", "subscriptions", "variables"]
DynamicAntenna = Literal["narrow", "wide"]

@@ -34,0 +34,0 @@ RadioType = Literal["2.4", "5", "6"]

@@ -27,3 +27,3 @@ # -*- coding: utf-8 -*-

from centralcli.typedefs import StrOrURL
from centralcli.typedefs import StrOrURL, StrEnum

@@ -456,3 +456,3 @@ if TYPE_CHECKING:

@staticmethod
def generate_template(template_file: Path | str, var_file: Path | str | None,) -> str:
def generate_template(template_file: Path | str, var_file: Path | str | None = None, *, config_data: list | dict = None) -> str:
"""Generate configuration files based on j2 templates and provided variables."""

@@ -464,13 +464,14 @@ template_file = Path(str(template_file)) if not isinstance(template_file, Path) else template_file

if template_file.suffix == ".j2":
if not var_file: # no var file specified look for file in same dir as template with same base name and yaml/json suffix
for file in template_file.parent.iterdir():
if file.stem == template_file.stem and file.suffix in [".yaml", ".yml", ".json"]:
var_file = file
break
if not var_file:
econsole = Console(stderr=True)
econsole.print("[dark_orange3]:warning:[/] [cyan].j2[/] file provided with no matching variable file")
raise typer.Exit(1)
if not config_data:
if not var_file: # no var file specified look for file in same dir as template with same base name and yaml/json suffix
for file in template_file.parent.iterdir():
if file.stem == template_file.stem and file.suffix in [".yaml", ".yml", ".json"]:
var_file = file
break
if not var_file:
econsole = Console(stderr=True)
econsole.print("[dark_orange3]:warning:[/] [cyan].j2[/] file provided with no matching variable file")
raise typer.Exit(1)
config_data = yaml.load(var_file.read_text(), Loader=yaml.SafeLoader)
config_data = yaml.load(var_file.read_text(), Loader=yaml.SafeLoader)

@@ -612,3 +613,3 @@ env = Environment(loader=FileSystemLoader(str(template_file.parent)), trim_blocks=True, lstrip_blocks=True)

@staticmethod
def summarize_list(items: List[str], max: int = 6, pad: int = 4, sep: str = '\n', color: str | None = 'cyan', italic: bool = False, bold: bool = False) -> str:
def summarize_list(items: List[str, StrEnum], max: int = 6, pad: int = 4, sep: str = '\n', color: str | None = 'cyan', italic: bool = False, bold: bool = False, use_enum_name: bool = False) -> str:
if not items:

@@ -627,2 +628,4 @@ return ""

enum_attr = "value" if not use_enum_name else "name"
items = [item if not hasattr(item, enum_attr) else getattr(item, enum_attr) for item in items]
items = [f'{"" if not pad else " " * pad}{fmt}{item}{"[/]" if fmt else ""}' for item in items]

@@ -629,0 +632,0 @@ if len(items) == 1: # If there is only 1 item we return it with just the formatting and strip the pad

@@ -30,8 +30,8 @@ #!/usr/bin/env python

* http://pygments.org/docs/api/#pygments.lexer.Lexer.name
aliases {list} – languages, against whose GFM block names CsvLexer will apply
aliases {list} - languages, against whose GFM block names CsvLexer will apply
* https://git.io/fhjla
filenames {list} – file name patterns, for whose contents CsvLexer will apply
tokens {dict} – regular expressions internally matching CsvLexer’s components
filenames {list} - file name patterns, for whose contents CsvLexer will apply
tokens {dict} - regular expressions internally matching CsvLexer's components
Based on StackOverflow user Adobe’s code:
Based on StackOverflow user Adobe's code:
* https://stackoverflow.com/a/25508711/298171

@@ -46,60 +46,60 @@ """

'root': [
(r'^[^,\n]*', Operator, 'second'),
(r'^[^,\n]*', Operator, 'second'),
],
'second': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'third'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'third'),
],
'third': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Declaration), 'fourth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Declaration), 'fourth'),
],
'fourth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'fifth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'fifth'),
],
'fifth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'sixth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'sixth'),
],
'sixth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'seventh'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'seventh'),
],
'seventh': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Namespace), 'eighth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Namespace), 'eighth'),
],
'eighth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'ninth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'ninth'),
],
'ninth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'tenth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'tenth'),
],
'tenth': [
(r'(,)([^,\n]*)', Operator, 'eleventh'),
(r'(,)([^,\n]*)', Operator, 'eleventh'),
],
'eleventh': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'twelfth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'twelfth'),
],
'twelfth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Declaration), 'thirteenth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Declaration), 'thirteenth'),
],
'thirteenth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'fourteenth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'fourteenth'),
],
'fourteenth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'fifteenth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'fifteenth'),
],
'fifteenth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'sixteenth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'sixteenth'),
],
'sixteenth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Namespace), 'seventeenth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Namespace), 'seventeenth'),
],
'seventeenth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'eighteenth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'eighteenth'),
],
'eighteenth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'nineteenth'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'nineteenth'),
],
'nineteenth': [
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Type), 'unsupported'),
(r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Type), 'unsupported'),
],
'unsupported': [
(r'(.+)', bygroups(Punctuation)),
(r'(.+)', bygroups(Punctuation)),
],

@@ -106,0 +106,0 @@ }

@@ -10,3 +10,3 @@ import sys

batch_dir = Path().home() / "git/myrepos/cencli-batch"
batch_dir = Path().home() / "git/cencli-batch"

@@ -13,0 +13,0 @@ # -- break up arguments passed as single string from vscode promptString --

# pragma: exclude file Still a WIP
import base64
from functools import lru_cache
import ipaddress
from itertools import groupby
from typing import Literal
import aiohttp
from google.protobuf.json_format import MessageToDict
from rich import inspect

@@ -10,7 +14,7 @@ from rich.console import Console

from centralcli import render
from centralcli.cache import api
from centralcli.cache import CacheDevice, api
from centralcli.models.config import WSSConfig
from centralcli.typedefs import LogType
from . import config, log
from . import config, log, utils, cache
from .objects import DateTime

@@ -28,13 +32,86 @@ from .protobuf import audit_pb2, monitoring_pb2, streaming_pb2

# Streamed message types that are filtered out before display.
IGNORED_MSG_TYPES = ["STAT_UPLINK", "STAT_CLIENT"]
# rich markup substitutions used to colorize well-known status/action values.
pretty_value = {
    "ADD": "[bright_green]ADD[/]",
    "DELETE": "[red]DELETE[/]",
    "UPDATE": "[dark_orange3]UPDATE[/]",
    "UP": "[bright_green]UP[/]",
    "DOWN": "[red]DOWN[/]",
}
# TODO need to convert mac / ip / essid fields as described in readme of https://github.com/aruba/central-python-workflows/tree/main/streaming-api-client
async def _decode(data, field_type: FieldType = "mac"):
def _decode(data, field_type: FieldType = "ip"):
"""
Decode fields from protobuf payloads.
- If `data` is a base64-encoded string (MessageToDict output), decode to bytes.
- If already bytes/bytearray, use as-is.
- For 'essid' return UTF-8 string (fallback to bytes on decode error).
- For 'ip' return human-readable address (IPv4 or IPv6) via ipaddress.
- For 'mac' return colon-separated lower-case MAC using utils.Mac.
"""
try:
_data = base64.b64decode(data)
if field_type == "essid":
return _data
raw = None
# MessageToDict may return nested dicts for complex types like IpAddress
if isinstance(data, dict):
# Try to extract the nested 'addr' or numeric representation
if "addr" in data:
data = data["addr"]
elif "ip" in data:
data = data["ip"]
else:
# nothing to decode
return data
if isinstance(data, str):
# MessageToDict encodes bytes as base64 strings; try to decode
try:
raw = base64.b64decode(data)
except Exception:
# not base64 — assume already human readable string
if field_type in ["essid", "network"]:
return data
if field_type == "ip":
if data.is_digit(): # Some ip address fields represented as str(int) i.e. probeIp
raw = int(data)
else:
return data
if field_type == "mac":
try:
return utils.Mac(data).cols
except Exception:
return data
return data
elif isinstance(data, (bytes, bytearray)):
raw = bytes(data)
elif isinstance(data, int):
raw = data
else:
return data
if field_type in ["essid", "network"]:
try:
return raw.decode("utf-8", errors="replace")
except Exception:
return raw
if field_type == "ip":
return '.'.join('%d' % byte for byte in _data)
# raw might be bytes (base64-decoded), or an integer already (MessageToDict can return ints for some proto variants)
try:
if isinstance(raw, int):
return str(ipaddress.ip_address(raw))
# bytes -> integer big endian
addr = ipaddress.ip_address(int.from_bytes(raw, "big"))
return str(addr)
except Exception:
# fallback: dotted decimal for 4-byte IPv4
if isinstance(raw, (bytes, bytearray)) and len(raw) == 4:
return '.'.join(str(b) for b in raw)
return raw
if field_type == "mac":
return utils.Mac(raw).cols
return ':'.join('%02x' % ord(byte) for byte in _data)
return ':'.join(f'{b:02x}' for b in raw)
except Exception as e:

@@ -97,2 +174,170 @@ log.exception(f"Exception while attempting to decode {field_type} in wss payload. \n{e}")

def decode_mac_field(data):
    """Return a MAC address as a colon-separated lower-case string.

    Accepts either raw bytes/bytearray (parsed protobuf message) or a
    base64-encoded string (as produced by MessageToDict for bytes fields).
    Any other type -- or a string that fails base64 decoding -- is returned
    unchanged.
    """
    if isinstance(data, (bytes, bytearray)):
        raw = bytes(data)
    elif isinstance(data, str):
        try:
            raw = base64.b64decode(data)
        except Exception:
            return data  # not base64; pass through untouched
    else:
        return data
    # utils.Mac handles the byte -> 'aa:bb:cc:dd:ee:ff' formatting
    return utils.Mac(raw).cols
def extract_ranges(nums):
    """Collapse a collection of ints into a compact range string.

    e.g. [3, 1, 2, 5, 8, 7] -> "1-3,5,7-8".  Duplicates are removed and the
    values sorted first.  Returns "" for empty input.
    """
    nums = sorted(set(nums))
    if not nums:  # guard: the fallback below would IndexError on empty input
        return ""
    try:
        # Consecutive ints share the same (value - index) delta, so grouping
        # on that delta yields the runs.  Materialize and join INSIDE the try:
        # the original built a lazy generator here, so any error actually
        # surfaced at join-time, outside the except.
        groups = [list(g) for _, g in groupby(enumerate(nums), key=lambda iv: iv[1] - iv[0])]
        return ",".join([f"{g[0][1]}-{g[-1][1]}" if g[0][1] != g[-1][1] else str(g[0][1]) for g in groups])
    except Exception as e:
        log.error(f"{repr(e)} in extract_ranges, {nums = }", show=True)
        return f"{nums[0]} - {nums[-1]}"
@lru_cache
def _get_device(_key, _value):
    """Resolve a device serial to its cached rich help text.

    Returns the original value unchanged when no cache entry exists.
    Memoized with lru_cache since the same device repeats across payloads.
    """
    # NOTE(review): _key is unused in the body but participates in the cache
    # key, so the same value seen under different field names caches
    # separately -- confirm that is intentional.
    dev = cache.devices_by_serial.get(_value)
    if not dev:
        return _value
    return CacheDevice(dev).rich_help_text
def get_devices(as_dict, key):
    """Return a copy of as_dict with device-id values under as_dict[key]
    replaced by their rich cache representation (via _get_device)."""
    device_fields = ("deviceId", "associatedDevice")
    updated = [
        {k: _get_device(k, v) if k in device_fields else v for k, v in entry.items()}
        for entry in as_dict[key]
    ]
    return {**as_dict, key: updated}
def colorize_fields(as_dict: dict, key) -> dict:
    """Return a copy of as_dict where status/action style values under
    as_dict[key] are replaced with their rich-colorized form (pretty_value)."""
    color_fields = ("status", "action", "operState", "adminState")
    recolored = [
        {k: pretty_value.get(v, v) if k in color_fields else v for k, v in entry.items()}
        for entry in as_dict[key]
    ]
    return {**as_dict, key: recolored}
def get_macs(as_dict, key) -> dict:
    """Normalize one repeated section (as_dict[key]) of a MessageToDict payload.

    Colorizes status fields, then for each known field name: formats
    timestamps, resolves device ids via the cache, humanizes uptime, and
    decodes base64/bytes mac, ip, and essid values.  Decoded fields are
    renamed to the snake_case form from mac_keys, preserving key order.
    """
    as_dict = colorize_fields(as_dict, key=key)
    # MessageToDict field name -> output key.  For decoded fields the suffix
    # after the last "_" in the output key selects _decode's field_type
    # (e.g. "radio_mac" -> "mac", "probe_ip" -> "ip").
    mac_keys = {
        "timestamp": "timestamp",
        "uptime": "uptime",
        "deviceId": "device",
        "associatedDevice": "associated_device",
        "network": "network",
        "essid": "essid",
        "ipAddress": "ip",
        "probeIpAddr": "probe_ip",
        "macaddr": "mac",
        "radioMac": "radio_mac",
        "interfaceMac": "interface_mac",
        "peerMac": "peer_mac",
        "localMac": "local_mac",
        "srcIp": "src_ip",
        "dstIp": "dst_ip"
    }
    for mkey in mac_keys.keys():
        if mkey == "timestamp":
            macs = [DateTime(iface[mkey]) for iface in as_dict[key] if mkey in iface]
        elif mkey in ["deviceId", "associatedDevice"]:
            # Device ids are replaced in place by get_devices.
            # NOTE(review): the rename to mac_keys[mkey] at the bottom is
            # skipped for these (continue), so "deviceId" keeps its original
            # key -- confirm that is intended.
            as_dict = get_devices(as_dict, key=key)
            continue
        elif mkey == "uptime":
            # uptime is reformatted in place (no rename), so skip the tail.
            as_dict = {**as_dict, key: [{k: v if k != "uptime" else DateTime(v, "durwords-short", round_to_minute=True) for k, v in inner.items()} for inner in as_dict[key]]}
            continue
        elif mkey in ["essid", "network"]:
            macs = [_decode(iface[mkey], field_type=mkey) for iface in as_dict[key] if mkey in iface]
        elif mkey in ["probeIpAddr"]:
            # probe ip is a flat value, not nested under "addr" like the rest
            macs = [_decode(iface[mkey], field_type=mac_keys[mkey].split("_")[-1]) for iface in as_dict[key] if mkey in iface]
        else:
            macs = [_decode(iface[mkey]["addr"], field_type=mac_keys[mkey].split("_")[-1]) for iface in as_dict[key] if mkey in iface]
        if macs:
            # NOTE(review): zip assumes either every entry in as_dict[key] has
            # mkey or none do; a mix would misalign decoded values with
            # entries -- confirm upstream payload shape.
            as_dict[key] = [{k if k != mkey else mac_keys[mkey]: v if k != mkey else mac for k, v in iface.items()} for iface, mac in zip(as_dict[key], macs)]
    return as_dict
def get_ips(as_dict, key) -> dict:
    """Decode nested ip fields for each entry in as_dict[key].

    For every entry carrying one of the keys in ip_keys, the value's "addr"
    member is decoded to a human readable IP and the key renamed to its short
    form (e.g. "ipAddress" -> "ip") with key order preserved.  Entries
    without the field are left untouched.
    """
    ip_keys = {
        "ipAddress": "ip",
    }
    for ip_key, out_key in ip_keys.items():
        # Decode per entry.  The previous implementation zipped a *filtered*
        # decoded list against the unfiltered entry list, which misaligned
        # values whenever only some entries carried the field.
        # field_type is passed explicitly rather than relying on _decode's default.
        as_dict[key] = [
            iface if ip_key not in iface else {
                (out_key if k == ip_key else k): (_decode(iface[ip_key]["addr"], field_type="ip") if k == ip_key else v)
                for k, v in iface.items()
            }
            for iface in as_dict[key]
        ]
    return as_dict
def format_pb_data(pb_data: monitoring_pb2.MonitoringInformation) -> dict:
    """Convert a protobuf MonitoringInformation message to a display-ready dict.

    Runs MessageToDict, then post-processes every repeated section present on
    the message: collapses interface allowedVlan lists into range strings and
    runs get_macs on each section (mac/ip/essid decode, timestamp/uptime
    formatting, status colorizing).  Finally moves the top-level timestamp to
    the first key, formats it, and drops customerId.
    """
    as_dict = MessageToDict(pb_data)
    if pb_data.interfaces:
        # Collapse each raw VLAN id list (e.g. [1, 2, 3, 5]) into "1-3,5".
        # Done per entry so interfaces lacking allowedVlan don't shift values
        # onto the wrong interface (the previous zip of a filtered list did).
        as_dict["interfaces"] = [
            iface if "allowedVlan" not in iface else {**iface, "allowedVlan": extract_ranges(iface["allowedVlan"])}
            for iface in as_dict["interfaces"]
        ]
        as_dict = get_macs(as_dict, "interfaces")
    # (protobuf attribute, MessageToDict key) for every other repeated section
    # that needs formatting.  Processed in the original hard-coded order.
    sections = (
        ("interface_stats", "interfaceStats"),
        ("client_stats", "clientStats"),
        ("wired_clients", "wiredClients"),
        ("wireless_clients", "wirelessClients"),
        ("vap_stats", "vapStats"),
        ("radio_stats", "radioStats"),
        ("ipprobe_stats", "ipprobeStats"),
        ("ssid_stats", "ssidStats"),
        ("ike_tunnels", "ikeTunnels"),
        ("device_stats", "deviceStats"),
        ("uplink_probe_stats", "uplinkProbeStats"),
        ("uplink_wan_stats", "uplinkWanStats"),
        ("vlan_stats", "vlanStats"),
        ("device_neighbours", "deviceNeighbours"),
        ("tunnel_stats", "tunnelStats"),
        ("uplink_stats", "uplinkStats"),
        ("switches", "switches"),
        ("aps", "aps"),
        ("mobility_controllers", "mobilityControllers"),
        ("radios", "radios"),
        ("vaps", "vaps"),
        ("networks", "networks"),
        ("tunnels", "tunnels"),
    )
    for attr, dict_key in sections:
        if getattr(pb_data, attr):
            as_dict = get_macs(as_dict, dict_key)
    # Move timestamp to the top and format it.
    as_dict = {"timestamp": DateTime(pb_data.timestamp), **{k: v for k, v in as_dict.items() if k != "timestamp"}}
    # customerId adds no value in CLI output.  pop (vs del) because
    # MessageToDict omits fields left at their default value, so the key may
    # be absent and del would raise KeyError.
    as_dict.pop("customerId", None)
    return as_dict
# TODO base_url will be required once not hardcoded, need to determine if base-url can be determined reliably from central base and provide config option for it.

@@ -135,4 +380,5 @@ async def follow_logs(wss_config: WSSConfig, log_type: LogType = "event"):

# asyncio.create_task(_clean_data(monitoring_data))
console.print(f"-- {DateTime(stream_data.timestamp / 1000 / 1000 / 1000)} --\n{pb_data}")
...
if [t for t in MessageToDict(pb_data).get("dataElements", []) if t not in IGNORED_MSG_TYPES]:
pb_dict = format_pb_data(pb_data)
render.display_results(data=pb_dict, tablefmt="yaml")
elif msg.type == aiohttp.WSMsgType.ERROR:

@@ -139,0 +385,0 @@ econsole.print(msg.data)

+11
-86
Metadata-Version: 2.4
Name: centralcli
Version: 9.1.0
Version: 9.2.3
Summary: A CLI for interacting with Aruba Central (Cloud Management Platform). Facilitates bulk imports, exports, reporting. A handy tool if you have devices managed by Aruba Central.

@@ -25,2 +25,3 @@ License-Expression: MIT

Provides-Extra: speedups
Provides-Extra: xlsx
Requires-Dist: Brotli ; (platform_python_implementation == "CPython") and (extra == "speedups")

@@ -53,2 +54,3 @@ Requires-Dist: PyYAML (>=6)

Requires-Dist: tablib (>=3)
Requires-Dist: tablib[xlsx] ; extra == "xlsx"
Requires-Dist: tabulate (>=0.8)

@@ -69,2 +71,3 @@ Requires-Dist: tinydb (>=4)

[![Coverage](https://raw.githubusercontent.com/Pack3tL0ss/central-api-cli/master/docs/img/coverage.svg)](https://raw.githubusercontent.com/Pack3tL0ss/central-api-cli/master/docs/img/coverage.svg)
[![CodeFactor](https://www.codefactor.io/repository/github/pack3tl0ss/central-api-cli/badge/master)](https://www.codefactor.io/repository/github/pack3tl0ss/central-api-cli/overview/master)
[![Downloads](https://static.pepy.tech/badge/centralcli)](https://pepy.tech/project/centralcli)

@@ -105,3 +108,3 @@ [![PyPI - Installs](https://img.shields.io/pypi/dm/centralcli.svg?color=blue&label=Installs&logo=pypi&logoColor=gold)](https://pypi.org/project/centralcli/)

### uv
#### Install `uv` on Linx/Mac:
#### Install `uv` on Linux/Mac:
```bash

@@ -124,9 +127,11 @@ # quick install on MacOS/Linux

```bash
# install centralcli (showing command with optional `--python 3.13` flag see note below)
uv tool install --python 3.13 centralcli
# install centralcli (showing command with optional `--python 3.14` flag see note below)
uv tool install --python 3.14 centralcli
```
> Including `--python 3.13` above is optional provided you have python installed on the system. However uv facilitates deploying a stand-alone version of python with the package. There are likely speed advantages to doing this if your current version is dated (check with `python -V`)
> Including `--python 3.14` above is optional. However uv facilitates deploying a stand-alone version of python with the package. There are likely speed advantages to doing this if your current version is dated (check with `python -V`)
> Run `cencli --install-completion` to enable auto/tab completion in your shell once installed. *Shell needs to be restarted to take effect*.
Then to Upgrade `centralcli`

@@ -137,84 +142,4 @@ ```bash

### pipx
`centralcli` can also be installed via pipx. Similar to `uv`, pipx will install `centralcli` in an isolated environment, and expose the `cencli` command in PATH.
> Alternative (less preferred) installation methods using pipx or pip are described [here](other_install.md).
> The first section below is for Debian based systems; refer to the [pipx documentation](https://pipx.pypa.io/stable/installation/) for instructions on other OSs.
```bash
# install pipx (Debian)
sudo apt update
sudo apt install pipx
pipx ensurepath
# install central CLI
pipx install centralcli --include-deps
# optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios.
pipx install centralcli[speedups] --force # force if centralcli was already installed and you are just adding speedups
```
Then to Upgrade `centralcli`
```bash
pipx upgrade centralcli
```
### pip (manually install in virtual environment)
> The example below is for Debian based systems, where `apt` is referenced but should be easy to translate to other OSs given you have some familiarity with the package management commands (i.e. `dnf`). On Windows python should install with pip. The pip commands are still valid.
```shell
# (Debian) If you don't have pip
sudo apt update
sudo apt install python3-pip
sudo apt install python3-virtualenv
# create a directory to store the venv in
cd ~ # ensure you are in your home dir
mkdir .venvs # creates hidden .venvs dir to store venv in
cd .venvs # change to that directory
export DEB_PYTHON_INSTALL_LAYOUT='deb' # Just ensures the directory structure for simpler instructions (Ubuntu 22.04 changed the dir layout of venvs without it)
python3 -m virtualenv centralcli --prompt cencli # prompt is optional
source centralcli/bin/activate # activates the venv
# Install centralcli
pip install centralcli
# optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios.
pip install centralcli[speedups]
which centralcli # Should return ~/.venvs/centralcli/bin/centralcli
# for BASH shell Update .bashrc to update PATH on login (keep the single quotes)
echo 'export PATH="$PATH:$HOME/.venvs/centralcli/bin"' >> ~/.bashrc
# for zsh or others, do the equivalent... i.e. update .zshrc in a similar manner
```
Then to upgrade:
```bash
~/.venvs/centralcli/bin/pip install -U centralcli
```
### pip (in system python environment)
[Requires python3.10 or above and pip.](#if-you-don-t-have-python)
It's recommended to use the `uv` install method above. However, if you don't use a lot of python apps (meaning a dependency conflict with other apps is not a concern), then simply installing via pip is possible (*albeit not recommended*).
> This method is primarily feasible on Windows, as current versions of many Linux distributions do not allow installing apps in the system python environment.
```bash
pip install centralcli
# optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios.
pip install centralcli[speedups]
```
Then to upgrade:
```bash
pip install -U centralcli
```
### if you don't have python
- You can get it for any platform @ [https://www.python.org](https://www.python.org)
- On Windows 10 it's also available in the Windows store, and via winget.
## Configuration

@@ -221,0 +146,0 @@ ✨ pre-populating the config as described below is optional. Central CLI will prompt for the information it needs on first run if no config exists.

[project]
name = "centralcli"
version = "9.1.0"
version = "9.2.3"
description = "A CLI for interacting with Aruba Central (Cloud Management Platform). Facilitates bulk imports, exports, reporting. A handy tool if you have devices managed by Aruba Central."

@@ -66,9 +66,10 @@ license = "MIT"

[project.optional-dependencies]
hook-proxy = [ "fastapi", "uvicorn", "psutil"]
hook-proxy = ["fastapi", "uvicorn", "psutil"]
speedups = [
"aiodns>=3.2.0;platform_system != 'Windows'",
"aiodns>=3.2.0;platform_system != 'Windows'",
"pycares<5.0.0;platform_system != 'Windows'",
"Brotli;platform_python_implementation == \"CPython\"",
"Brotli;platform_python_implementation == \"CPython\"",
"brotlicffi;platform_python_implementation != 'CPython'"
]
xlsx = ["tablib[xlsx]"]

@@ -90,2 +91,3 @@ [project.urls]

jsonref = ">=1.1.0"
nox-uv = ">=0.6.3"

@@ -115,6 +117,4 @@ [build-system]

[dependency-groups]
nox = [
test = [
"nox-uv>=0.6.3",
]
test = [
"pytest",

@@ -128,2 +128,11 @@ "pytest-cov",

]
dev = [
"ruff>=0.8.0",
"pytest>=6",
"pytest-cov>=6.2.1",
"nox>=2025.5.1",
"pytest-asyncio>=1.2.0",
"jsonref>=1.1.0",
"nox-uv>=0.6.3",
]

@@ -130,0 +139,0 @@ [tool.pytest.ini_options]

@@ -6,2 +6,3 @@ # Aruba Central API CLI

[![Coverage](https://raw.githubusercontent.com/Pack3tL0ss/central-api-cli/master/docs/img/coverage.svg)](https://raw.githubusercontent.com/Pack3tL0ss/central-api-cli/master/docs/img/coverage.svg)
[![CodeFactor](https://www.codefactor.io/repository/github/pack3tl0ss/central-api-cli/badge/master)](https://www.codefactor.io/repository/github/pack3tl0ss/central-api-cli/overview/master)
[![Downloads](https://static.pepy.tech/badge/centralcli)](https://pepy.tech/project/centralcli)

@@ -42,3 +43,3 @@ [![PyPI - Installs](https://img.shields.io/pypi/dm/centralcli.svg?color=blue&label=Installs&logo=pypi&logoColor=gold)](https://pypi.org/project/centralcli/)

### uv
#### Install `uv` on Linx/Mac:
#### Install `uv` on Linux/Mac:
```bash

@@ -61,9 +62,11 @@ # quick install on MacOS/Linux

```bash
# install centralcli (showing command with optional `--python 3.13` flag see note below)
uv tool install --python 3.13 centralcli
# install centralcli (showing command with optional `--python 3.14` flag see note below)
uv tool install --python 3.14 centralcli
```
> Including `--python 3.13` above is optional provided you have python installed on the system. However uv facilitates deploying a stand-alone version of python with the package. There are likely speed advantages to doing this if your current version is dated (check with `python -V`)
> Including `--python 3.14` above is optional. However uv facilitates deploying a stand-alone version of python with the package. There are likely speed advantages to doing this if your current version is dated (check with `python -V`)
> Run `cencli --install-completion` to enable auto/tab completion in your shell once installed. *Shell needs to be restarted to take effect*.
Then to Upgrade `centralcli`

@@ -74,84 +77,4 @@ ```bash

### pipx
`centralcli` can also be installed via pipx. Similar to `uv`, pipx will install `centralcli` in an isolated environment, and expose the `cencli` command in PATH.
> Alternative (less preferred) installation methods using pipx or pip are described [here](other_install.md).
> The first section below is for Debian based systems; refer to the [pipx documentation](https://pipx.pypa.io/stable/installation/) for instructions on other OSs.
```bash
# install pipx (Debian)
sudo apt update
sudo apt install pipx
pipx ensurepath
# install central CLI
pipx install centralcli --include-deps
# optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios.
pipx install centralcli[speedups] --force # force if centralcli was already installed and you are just adding speedups
```
Then to Upgrade `centralcli`
```bash
pipx upgrade centralcli
```
### pip (manually install in virtual environment)
> The example below is for Debian based systems, where `apt` is referenced but should be easy to translate to other OSs given you have some familiarity with the package management commands (i.e. `dnf`). On Windows python should install with pip. The pip commands are still valid.
```shell
# (Debian) If you don't have pip
sudo apt update
sudo apt install python3-pip
sudo apt install python3-virtualenv
# create a directory to store the venv in
cd ~ # ensure you are in your home dir
mkdir .venvs # creates hidden .venvs dir to store venv in
cd .venvs # change to that directory
export DEB_PYTHON_INSTALL_LAYOUT='deb' # Just ensures the directory structure for simpler instructions (Ubuntu 22.04 changed the dir layout of venvs without it)
python3 -m virtualenv centralcli --prompt cencli # prompt is optional
source centralcli/bin/activate # activates the venv
# Install centralcli
pip install centralcli
# optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios.
pip install centralcli[speedups]
which centralcli # Should return ~/.venvs/centralcli/bin/centralcli
# for BASH shell Update .bashrc to update PATH on login (keep the single quotes)
echo 'export PATH="$PATH:$HOME/.venvs/centralcli/bin"' >> ~/.bashrc
# for zsh or others, do the equivalent... i.e. update .zshrc in a similar manner
```
Then to upgrade:
```bash
~/.venvs/centralcli/bin/pip install -U centralcli
```
### pip (in system python environment)
[Requires python3.10 or above and pip.](#if-you-don-t-have-python)
It's recommended to use the `uv` install method above. However, if you don't use a lot of python apps (meaning a dependency conflict with other apps is not a concern), then simply installing via pip is possible (*albeit not recommended*).
> This method is primarily feasible on Windows, as current versions of many Linux distributions do not allow installing apps in the system python environment.
```bash
pip install centralcli
# optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios.
pip install centralcli[speedups]
```
Then to upgrade:
```bash
pip install -U centralcli
```
### if you don't have python
- You can get it for any platform @ [https://www.python.org](https://www.python.org)
- On Windows 10 it's also available in the Windows store, and via winget.
## Configuration

@@ -158,0 +81,0 @@ ✨ pre-populating the config as described below is optional. Central CLI will prompt for the information it needs on first run if no config exists.

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display