centralcli
Advanced tools
| #!/usr/bin/env python3 | ||
| # -*- coding: utf-8 -*- | ||
| from pathlib import Path | ||
| import typer | ||
| from centralcli import common, config, render | ||
| from centralcli.clicommon import Mac | ||
| from rich.markup import escape | ||
| app = typer.Typer() | ||
| def _generate_bssids_from_file(file: Path, out: Path = None, num_bssids: int = 6): | ||
| out_file = out or Path(str(file).replace(file.suffix, f"_out{file.suffix}")) | ||
| file_data = config.get_file_data(file) | ||
| if not file_data: | ||
| common.exit(f"No data found in {file}") | ||
| macs = [Mac(ap["mac"] , bssids=True, num_bssids=6) for ap in file_data] | ||
| invalid_macs = [(idx, m) for idx, m in enumerate(macs, start=2) if not m.ok] | ||
| if invalid_macs: | ||
| err = "\n".join([f"Mac: {m.orig} on row {row} in invalid." for row, m in invalid_macs]) | ||
| common.exit(err) | ||
| out_data = [] | ||
| for ap, mac_obj in zip(file_data, macs): | ||
| to_end = ["BSSID", "LocationId", "LocationValue", "Description"] | ||
| bssids = [bssid for radio_bssid_list in mac_obj.bssids.values() for bssid in radio_bssid_list[0:num_bssids]] | ||
| out_data += [{**{k: v for k, v in ap.items() if k not in to_end}, "BSSID": bssid, "LocationId": ap.get("LocationId"), "Description": ap.get("Description")} for bssid in bssids] | ||
| # out_data += [{**{k: v for k, v in ap.items() if k not in to_end}, "BSSID": bssid, "LocationId": ap.get("LocationId"), "LocationValue": ap.get("LocationValue"), "Description": ap.get("Description")} for bssid in bssids] | ||
| headers = ",".join(out_data[0].keys()) | ||
| values = "\n".join([",".join(['' if v is None else v for v in inner.values()]) for inner in out_data]) | ||
| csv_data = f"{headers}\n{values}" | ||
| written = out_file.write_text(csv_data) | ||
| render.console.print(csv_data) | ||
| render.console.print(f"Output written to {out_file}") | ||
| return written | ||
| # F070-11-1600 Market Street-Floor11 | ||
| def _build_description(ap_name: str, site_name: str | None, description: str | None) -> str: | ||
| if site_name is None and description: | ||
| return description | ||
| _, site_code, floor, _, _ = map(lambda txt: txt.removeprefix("WAP").removeprefix("wap"), ap_name.split("-")) | ||
| padded_floor = floor if len(floor) == 2 else f"{floor:02s}" | ||
| floor = padded_floor.removeprefix("0") | ||
| return f"{site_code}-{padded_floor}-{site_name}-Floor{floor}" | ||
| def bssids_from_xls( | ||
| file: Path, | ||
| site: str = None, | ||
| out: Path = None, | ||
| bssids: bool = True, | ||
| num_bssids: int = 6, | ||
| bssid_out: Path = None | ||
| ): | ||
| prepped_file = out or file.parent / f"{file.stem}.csv" | ||
| import tablib | ||
| import importlib.util | ||
| if importlib.util.find_spec("et_xmlfile") is None: | ||
| common.exit(f"Missing optional xlsx support. re-install centralcli with optional dependency to add support for xlsx files. [cyan]uv tool install {escape('centralcli[xlsx]')}[/]\n[italic]No need to uninstall, just re-run as described to add support for xlsx[/]") | ||
| book = tablib.Databook() | ||
| book.xlsx = file.read_bytes() # type: ignore | ||
| datasets = [ds for ds in book.sheets() if "summary" not in ds.title.lower()] | ||
| out_keys = ["name", "serial", "mac", "BSSID", "LocationId", "Description"] | ||
| output = [] | ||
| for ds in datasets: | ||
| as_dict = [{k.lower().removesuffix(" number").removesuffix("-address"): v for k, v in inner.items()} for inner in ds.dict] | ||
| as_dict = [{**inner, "BSSID": None, "LocationId": None, "Description": None if ds.title.lower().startswith("spare") else _build_description(inner['name'], site_name=site, description=inner.get("Description", inner.get("description")))} for inner in as_dict if any(inner.values())] | ||
| output += as_dict | ||
| header = ",".join(out_keys) | ||
| written = prepped_file.write_text( | ||
| "\n".join([header, *[",".join('' if ap.get(key) is None else ap[key] for key in out_keys) for ap in output]]) | ||
| ) | ||
| if not bssids or not written: | ||
| common.exit(f"Wrote {written} bytes to {prepped_file.name}", code=0 if written else 1) | ||
| written = _generate_bssids_from_file(prepped_file, out=bssid_out, num_bssids=num_bssids) | ||
| common.exit(code=0 if written else 1) | ||
| @app.command() | ||
| def bssids( | ||
| ap_mac: str = typer.Argument(None, help="AP Mac Address", show_default=False), | ||
| file: Path = typer.Option(None, help="fetch MACs from file for bssid calc", exists=True, show_default=False), | ||
| dir: Path = typer.Option(None, help="process all files in dir for bssid calc", exists=True, show_default=False), | ||
| site: str = typer.Option(None, "--site", "-s", help="The official name of the site (from SFDC), not required if [cyan]description[/] field is in the input file. [dim italic]Only applies with [cyan]--file[/][/] :triangular_flag:"), | ||
| num_bssids: int = typer.Option(None, "-n", help="The number of BSSIDs to generate in output file. [dim italic]Only applies with [cyan]--file[/][/] :triangular_flag:"), # TODO render.help_block add helper for this common... Valid/Only applies with --{flag} :triangular_flag: | ||
| vertical: bool = typer.Option(False, "-V", "--vertical", help="Display BSSIDs vertically with no headers"), | ||
| out: Path = typer.Option(None, help="Output to file. --file option will always create an output file ... by default will create new file with same name as --file with _out appended to the file name.",), | ||
| ): | ||
| """Generate bssids based on AP MAC(s) | ||
| Using --file :triangular_flag: will result in a file containing the headers necessary for import into MS Teams for e911 location. | ||
| """ | ||
| if ap_mac and Path(ap_mac).exists(): | ||
| file = Path(ap_mac) # allow them to specify the file as the first arg | ||
| if file: | ||
| if file.suffix == ".xlsx": | ||
| bssids_from_xls(file, site=site, num_bssids=num_bssids, bssid_out=out) | ||
| written = _generate_bssids_from_file(file, out=out, num_bssids=num_bssids) | ||
| common.exit(code=0 if written else 1) | ||
| elif dir: | ||
| files = [file for file in dir.iterdir() if file.suffix in [".csv", ".yaml", ".yml", ".json"]] | ||
| written = [_generate_bssids_from_file(file, out=out, num_bssids=num_bssids) for file in files] | ||
| common.exit(code=0 if all(written) else 1) | ||
| else: | ||
| typer.echo(Mac(ap_mac, bssids=True, num_bssids=num_bssids, tablefmt="table" if not vertical else "vertical")) | ||
| @app.callback() | ||
| def callback(): | ||
| """Generate bssids based on AP MAC(s)""" | ||
| pass | ||
| if __name__ == "__main__": | ||
| app() |
| from __future__ import annotations | ||
| from typing import TYPE_CHECKING | ||
| from centralcli import utils | ||
| from centralcli.client import Session | ||
| from centralcli.constants import DeviceStatusFilter, APDeployment | ||
| if TYPE_CHECKING: | ||
| from centralcli.response import Response | ||
| class MonitoringAPI: | ||
| def __init__(self, session: Session): | ||
| self.session = session | ||
| async def get_aps( | ||
| self, | ||
| site_id: int | None = None, | ||
| model: str | None = None, | ||
| status: DeviceStatusFilter | None = None, | ||
| deployment: APDeployment | None = None, | ||
| limit: int = 100, | ||
| next: int | None = 1 | ||
| ) -> Response: | ||
| url = "/network-monitoring/v1alpha1/aps" | ||
| filters = { | ||
| "siteId": site_id, | ||
| "model": model, | ||
| "status": status if status is None or not hasattr(status, "value") else status.value, | ||
| "deployment": deployment if deployment is None or not hasattr(deployment, "value") else deployment.value, | ||
| } | ||
| filters = utils.strip_none(filters) | ||
| params = { | ||
| "filter": " and ".join(f"{k} eq '{v}'" for k, v in filters.items()) or None, # TODO need to test if Enum works without sending .value | ||
| "limit": limit, | ||
| "next": next | ||
| } | ||
| return await self.session.get(url, params=params) | ||
@@ -27,2 +27,3 @@ #!/usr/bin/env python3 | ||
| from typing import Callable, Iterable, List, Literal, Optional, Sequence, overload | ||
| from functools import cached_property | ||
@@ -260,3 +261,26 @@ import click | ||
| from .classic.api import ClassicAPI | ||
| from .cnx.api import CentralAPI, GreenLakeAPI | ||
| class APIClients: # TODO play with cached property vs setting in init to see how it impacts import performance across the numerous files that need this | ||
| def __init__(self, *, classic_base_url: str = config.classic.base_url, glp_base_url: str = config.glp.base_url, cnx_base_url: str = config.cnx.base_url, silent: bool = False): | ||
| self.classic_base_url = classic_base_url | ||
| self.glp_base_url = glp_base_url | ||
| self.cnx_base_url = cnx_base_url | ||
| self.silent = silent | ||
| @cached_property | ||
| def classic(self): | ||
| return ClassicAPI(self.classic_base_url, silent=self.silent) | ||
| @cached_property | ||
| def glp(self): | ||
| return None if not config.glp.ok else GreenLakeAPI(self.glp_base_url, silent=self.silent) | ||
| @cached_property | ||
| def cnx(self): | ||
| return None if not config.cnx.ok else CentralAPI(self.cnx_base_url, silent=self.silent) | ||
| api_clients = APIClients() | ||
| from .cache import Cache, CacheCert, CacheClient, CacheDevice, CacheGroup, CacheGuest, CacheInvDevice, CacheLabel, CacheMpsk, CacheMpskNetwork, CachePortal, CacheSite, CacheTemplate, CacheFloorPlanAP, CacheBuilding | ||
@@ -263,0 +287,0 @@ |
@@ -5,8 +5,7 @@ import csv | ||
| from centralcli import common, config, render, utils | ||
| from centralcli import api_clients, common, config, render, utils | ||
| from .classic.api import ClassicAPI | ||
| from .response import Response | ||
| api = ClassicAPI(config.classic.base_url) | ||
| api = api_clients.classic | ||
@@ -13,0 +12,0 @@ |
+12
-22
@@ -13,3 +13,3 @@ #!/usr/bin/env python3 | ||
| from centralcli.client import BatchRequest | ||
| from centralcli.clitree import add, assign, caas, cancel, check, clone, convert, export, kick, refresh, rename, test, ts, unassign, update, upgrade | ||
| from centralcli.clitree import add, assign, caas, cancel, check, clone, convert, export, kick, refresh, rename, test, ts, unassign, update, upgrade, generate | ||
| from centralcli.clitree import dev as clidev | ||
@@ -55,2 +55,3 @@ from centralcli.clitree.batch import batch | ||
| app.add_typer(convert.app, name="convert",) | ||
| app.add_typer(generate.app, name="generate",) | ||
| app.add_typer(clidev.app, name="dev", hidden=True) | ||
@@ -403,3 +404,3 @@ | ||
| collect: bool = typer.Option(False, "--collect", "-c", help="Store raw webhooks in local json file", hidden=True), | ||
| yes: bool = common.options.yes, | ||
| yes: int = common.options.yes_int, | ||
| debug: bool = common.options.debug, | ||
@@ -593,5 +594,5 @@ default: bool = common.options.default, | ||
| # TOGLP | ||
| @app.command(hidden=True) | ||
| @app.command(hidden=False) | ||
| def enable( | ||
| what: EnableDisableArgs = typer.Argument("auto-sub"), | ||
| what: EnableDisableArgs = typer.Argument(...), | ||
| services: list[common.cache.LicenseTypes] = typer.Argument(..., show_default=False), # type: ignore | ||
@@ -608,10 +609,5 @@ yes: bool = common.options.yes, | ||
| """ | ||
| _msg = "[bright_green]Enable[/] auto-subscribe for license" | ||
| if len(services) > 1: # pragma: no cover | ||
| _svc_msg = '\n '.join([s.name for s in services]) | ||
| _msg = f'{_msg}s:\n {_svc_msg}\n' | ||
| else: | ||
| svc = services[0] | ||
| _msg = f'{_msg} {svc.name}' | ||
| render.econsole.print(_msg) | ||
| services: list[LicenseTypes] = services # retyping common.cache.LicenseTypes | ||
| _msg = "[bright_green]Enable[/] auto-subscribe for the following subscription tiers:" | ||
| render.econsole.print(f"{_msg} {utils.summarize_list(services, max=None)}") | ||
| render.econsole.print('\n[dark_orange]!![/] Enabling auto-subscribe applies the specified tier (i.e. foundation/advanced) for [green bold]all[/] devices of the same type.') | ||
@@ -627,5 +623,5 @@ render.econsole.print('[cyan]enable auto-sub advanced-switch-6300[/] will result in [green bold]all[/] switch models being set to auto-subscribe the advanced license appropriate for that model.') | ||
| @app.command(hidden=True) | ||
| @app.command(hidden=False) | ||
| def disable( | ||
| what: EnableDisableArgs = typer.Argument("auto-sub"), | ||
| what: EnableDisableArgs = typer.Argument(...), | ||
| services: list[common.cache.LicenseTypes] = typer.Argument(..., show_default=False), # type: ignore | ||
@@ -643,10 +639,4 @@ yes: bool = common.options.yes, | ||
| services: list[LicenseTypes] = services # retyping common.cache.LicenseTypes | ||
| _msg = "[bright_green]Disable[/] auto-subscribe for license" | ||
| if len(services) > 1: # pragma: no cover | ||
| _svc_msg = '\n '.join([s.name for s in services]) | ||
| _msg = f'{_msg}s:\n {_svc_msg}\n' | ||
| else: | ||
| svc = services[0] | ||
| _msg = f'{_msg} {svc.name}' | ||
| render.econsole.print(_msg) | ||
| _msg = "[bright_green]Disable[/] auto-subscribe for the following subscription tiers:" | ||
| render.econsole.print(f"{_msg} {utils.summarize_list(services, max=None)}") | ||
| render.econsole.print('\n[dark_orange3]:warning:[/] Disabling auto subscribe removes auto-subscribe for all models of the same type.') | ||
@@ -653,0 +643,0 @@ render.econsole.print('[cyan]disable auto-sub advanced-switch-6300[/] will result in auto-subscribe being disabled for [green bold]all[/] switch models.') |
+39
-8
@@ -38,2 +38,3 @@ from __future__ import annotations | ||
| class LoggedRequests: | ||
@@ -369,2 +370,7 @@ def __init__(self, url: str, method: str = "GET", ok: bool = None): | ||
| self.auth.handle_expired_token() | ||
| elif resp.status == 401 and self.is_cnx: | ||
| spin_txt_retry = "(retry after token refresh)" | ||
| self.spinner.start(f"Attempting to Refresh {'' if not self.is_cnx else '[green]GLP[/] '}Token") | ||
| self.auth.handle_expired_token() | ||
| self.spinner.succeed() | ||
| elif resp.status == 500: | ||
@@ -511,14 +517,36 @@ spin_txt_retry = ":shit: [bright_red blink]retry[/] after 500: [cyan]Internal Server Error[/]" | ||
| # total is provided for some calls with the total # of records available | ||
| # TODO # TOGLP need to use "next" as pagination field | ||
| # if params.get("limit") and params.get("next") and isinstance(r.raw, dict) and r.raw.get("total") and (len(r.output) + (params.get("limit", 0) * params["next"]) < r.raw["total"]): | ||
| is_events = True if url.endswith("/monitoring/v2/events") else False | ||
| do_next = do_pagination = False | ||
| if params.get("limit") and params.get("next") and isinstance(r.raw, dict) and r.raw.get("total") and (len(r.output) if params["next"] in [None, 1] else len(r.output) + (params.get("limit", 0)) * params["next"]) < r.raw["total"]: | ||
| do_pagination = True | ||
| do_next = True | ||
| if params.get(offset_key, 99) == 0 and isinstance(r.raw, dict) and r.raw.get("total") and (len(r.output) + params.get("limit", 0) < r.raw.get("total", 0)): | ||
| do_pagination = True | ||
| if do_pagination: | ||
| _total = count or r.raw["total"] if not is_events or r.raw["total"] <= 10_000 else 10_000 # events endpoint will fail if offset + limit > 10,000 | ||
| if _total > len(r.output): | ||
| _limit = params.get("limit", 100) | ||
| _offset = params.get(offset_key, 0) | ||
| br = BatchRequest | ||
| _reqs = [ | ||
| br(self.exec_api_call, url, data=data, json_data=json_data, method=method, headers=headers, params={**params, offset_key: i, "limit": _limit}, **kwargs) | ||
| for i in range(len(r.output), _total, _limit) | ||
| ] | ||
| if not do_next: | ||
| _offset = params.get(offset_key, 0) | ||
| br = BatchRequest | ||
| _reqs = [ | ||
| br(self.exec_api_call, url, data=data, json_data=json_data, method=method, headers=headers, params={**params, offset_key: i, "limit": _limit}, **kwargs) | ||
| for i in range(len(r.output), _total, _limit) | ||
| ] | ||
| else: | ||
| _next_possible_range = list(range(int(r.raw["next"]), _total + 1)) | ||
| _next_range = [i for i in _next_possible_range if i * _limit <= _total] | ||
| if _next_range[-1] * _limit < _total: | ||
| _next_range += [min([i for i in _next_possible_range if i * _limit > _total])] # TODO more elegant way to do this. | ||
| br = BatchRequest | ||
| _reqs = [ | ||
| br(self.exec_api_call, url, data=data, json_data=json_data, method=method, headers=headers, params={**params, "next": i, "limit": _limit}, **kwargs) | ||
| for i in _next_range | ||
| ] | ||
| batch_res: List[Response] = await self._batch_request(_reqs) | ||
@@ -728,5 +756,8 @@ failures: List[Response] = [r for r in batch_res if not r.ok] # A failure means both the original attempt and the retry failed. | ||
| _tot_start = time.perf_counter() | ||
| max_calls_per_chunk = MAX_CALLS_PER_CHUNK | ||
| if self.base_url == config.cnx.base_url and not any([call.args and call.args[0].startswith("http") for call in api_calls]): | ||
| max_calls_per_chunk = 10 | ||
| if self.requests: # a call has been made no need to verify first call (token refresh) | ||
| chunked_calls = utils.chunker(api_calls, MAX_CALLS_PER_CHUNK) | ||
| chunked_calls = utils.chunker(api_calls, max_calls_per_chunk) | ||
| else: | ||
@@ -741,3 +772,3 @@ resp: Response = await api_calls[0].func( | ||
| m_resp: List[Response] = utils.listify(resp) | ||
| chunked_calls = utils.chunker(api_calls[1:], MAX_CALLS_PER_CHUNK) | ||
| chunked_calls = utils.chunker(api_calls[1:], max_calls_per_chunk) | ||
@@ -744,0 +775,0 @@ # Make calls 6 at a time ensuring timing so that 7 per second limit is not exceeded |
@@ -18,7 +18,7 @@ # -*- coding: utf-8 -*- | ||
| ArgumentType = Literal["cache", "name", "device", "devices", "device_type", "what", "group", "groups", "group_dev", "site", "import_file", "wid", "version", "session_id", "ssid", "portal", "portals"] | ||
| ArgumentType = Literal["cache", "name", "device", "devices", "device_type", "what", "group", "groups", "group_dev", "site", "import_file", "wid", "version", "session_id", "ssid", "portal", "portals", "banner_file"] | ||
| OptionType = Literal[ | ||
| "client", "group", "group_many", "site", "site_many", "label", "label_many", "debug", "debugv", "device_type", "do_json", "do_yaml", "do_csv", "do_table", | ||
| "outfile", "reverse", "pager", "ssid", "yes", "yes_int", "device_many", "device", "swarm_device", "swarm", "sort_by", "default", "workspace", "verbose", | ||
| "raw", "end", "update_cache", "show_example", "at", "in", "reboot", "start", "past", "subscription", "version", "not_version", "band" | ||
| "raw", "end", "update_cache", "show_example", "at", "in", "reboot", "start", "past", "subscription", "version", "not_version", "band", "banner", "banner_file", | ||
| ] | ||
@@ -41,2 +41,3 @@ | ||
| self.import_file: ArgumentInfo = typer.Argument(None, exists=True, show_default=False,) | ||
| self.banner_file: ArgumentInfo = typer.Argument(None, help="The file with the desired banner text. [dim italic]supports .j2 (Jinja2) template[/]", exists=True, show_default=False) | ||
| self.wid: ArgumentInfo = typer.Argument(..., help="Use [cyan]show webhooks[/] to get the wid", show_default=False,) | ||
@@ -194,2 +195,4 @@ self.portal: ArgumentInfo = typer.Argument(..., metavar=iden_meta.portal, autocompletion=cache.portal_completion, show_default=False,) | ||
| self.band: OptionInfo = typer.Option(None, help=f"Show Bandwidth for a specific band [dim]{escape('[ap must be provided]')}[/]", show_default=False) | ||
| self.banner_file: OptionInfo = typer.Option(None, "--banner-file", help="The file with the desired banner text. [dim italic]supports .j2 (Jinja2) template[/]", exists=True, show_default=False) | ||
| self.banner: OptionInfo = typer.Option(False, "--banner", help="Update banner text. This option will prompt for banner text (paste into terminal)", show_default=False) | ||
| self.yes: OptionInfo = typer.Option(False, "-Y", "-y", "--yes", help="Bypass confirmation prompts - Assume Yes",) | ||
@@ -196,0 +199,0 @@ self.yes_int: OptionInfo = typer.Option( |
@@ -15,3 +15,3 @@ #!/usr/bin/env python3 | ||
| from centralcli import common, config, log, render, utils | ||
| from centralcli import api_clients, common, config, log, render, utils | ||
| from centralcli.client import BatchRequest | ||
@@ -25,11 +25,8 @@ from centralcli.constants import DevTypes, GatewayRole, NotifyToArgs, iden_meta, state_abbrev_to_pretty | ||
| from ..clicommon import APIClients | ||
| api_clients = APIClients() | ||
| api = api_clients.classic | ||
| app = typer.Typer() | ||
| color = utils.color | ||
| class AddWlanArgs(str, Enum): | ||
@@ -36,0 +33,0 @@ type = "type" |
@@ -7,5 +7,4 @@ #!/usr/bin/env python3 | ||
| from centralcli import common, log, render, utils | ||
| from centralcli import api_clients, common, log, render, utils | ||
| from centralcli.cache import CacheDevice, CacheLabel, CacheSub | ||
| from centralcli.clicommon import APIClients | ||
| from centralcli.client import BatchRequest | ||
@@ -15,3 +14,3 @@ from centralcli.constants import iden_meta | ||
| api_clients = APIClients() | ||
| api = api_clients.classic | ||
@@ -82,3 +81,3 @@ glp_api = api_clients.glp | ||
| sub: CacheSub = common.cache.get_sub_identifier(sub_name_or_id, end_date=end_date) | ||
| sub: CacheSub = common.cache.get_sub_identifier(sub_name_or_id, end_date=end_date, best_match=True) # TODO add qty param and return list of sub objects if best sub object can not satisfy the qty necessary | ||
| if len(devices) > sub.available: | ||
@@ -85,0 +84,0 @@ log.warning(f"{len(devices)} devices exceeds {sub.available}... the number of available subscriptions for [bright_green]{sub.name}[/bright_green]|[medium_spring_green]{sub.key}[/]. [dim italic]As of last Subscription cache update[/]", show=True) |
@@ -15,2 +15,3 @@ from __future__ import annotations | ||
| from centralcli.cache import api | ||
| from centralcli.constants import APIAction | ||
| from centralcli.models.imports import ImportMACs, ImportMPSKs | ||
@@ -189,2 +190,25 @@ | ||
| @app.command() | ||
| def variables( | ||
| import_file: Path = common.arguments.get("import_file", help="Path to file with variables"), | ||
| show_example: bool = common.options.show_example, | ||
| yes: bool = common.options.yes, | ||
| debug: bool = common.options.debug, | ||
| default: bool = common.options.default, | ||
| workspace: str = common.options.workspace, | ||
| ) -> None: | ||
| """Batch add variables for devices based on data from required import file. | ||
| Use [cyan]cencli batch add variables --example[/] to see example import file formats. | ||
| [italic]Accepts same format as Aruba Central UI, but also accepts .yaml[/] | ||
| """ | ||
| if show_example: | ||
| render.console.print(examples.add_variables) | ||
| return | ||
| if not import_file: | ||
| common.exit(render._batch_invalid_msg("cencli batch add variables [OPTIONS] [IMPORT_FILE]")) | ||
| common.batch_add_update_replace_variables(import_file, action=APIAction.ADD, yes=yes) | ||
| @app.command() | ||
| def macs( | ||
@@ -191,0 +215,0 @@ import_file: Path = common.arguments.import_file, |
@@ -10,2 +10,3 @@ #!/usr/bin/env python3 | ||
| from centralcli import common, render | ||
| from centralcli.constants import APIAction | ||
@@ -22,2 +23,3 @@ from . import examples | ||
| reboot: bool = typer.Option(False, "--reboot", "-R", help="Automatically reboot device if IP or VLAN is changed [dim italic]Reboot is required for changes to take effect when IP or VLAN settings are changed[/]"), | ||
| banner_file: Path = common.options.banner_file, | ||
| yes: bool = common.options.yes, | ||
@@ -29,4 +31,7 @@ debug: bool = common.options.debug, | ||
| ) -> None: | ||
| """Update per-ap-settings or ap-altitude (at AP level) in mass based on settings from import file | ||
| """Update per-ap-settings, ap-altitude, banner, etc. (at AP level) in mass based on settings from import file | ||
| When [cyan]--banner-file <file>[/] is provided. Only the banner is processed. The import_file is used as the variable file if the banner_file is a .j2 file. | ||
| i.e. Most common scenario... banner_file is a j2 with {{ hostname }} which is converted to the value from the hostname field in the import file. | ||
| Use [cyan]--example[/] to see expected import file format and required fields. | ||
@@ -41,3 +46,8 @@ """ | ||
| data = common._get_import_file(import_file, "devices") | ||
| if banner_file: | ||
| render.econsole.print(f"[deep_sky_blue]:information:[/] When --banner-file is provided. Only the banner is processed. re-run the command without banner to process any other updates from {import_file.name}") | ||
| common.batch_update_aps(data, yes=yes, reboot=reboot) | ||
@@ -73,5 +83,90 @@ | ||
| @app.command() | ||
| def ap_banner( | ||
| import_file: Path = common.arguments.import_file, | ||
| banner_file: Path = common.arguments.banner_file, | ||
| _banner_file: Path = common.options.get("banner_file", hidden=True), | ||
| banner: bool = common.options.banner, | ||
| group_level: bool = typer.Option(False, "-G", "--groups", help=f"Treat import file as group import, update ap group level configs. {render.help_block('Update applied at device level, import expected to be device import')}"), | ||
| show_example: bool = common.options.show_example, | ||
| yes: bool = common.options.yes, | ||
| debug: bool = common.options.debug, | ||
| debugv: bool = common.options.debugv, | ||
| default: bool = common.options.default, | ||
| workspace: str = common.options.workspace, | ||
| ) -> None: | ||
| """Update banner (MOTD) text for APs at group or device level | ||
| When the banner_file is a [cyan].j2[/] file. It is processed as a jinja2 template with variables coming from the import_file. | ||
| i.e. Most common scenario... banner_file is a j2 with {{ hostname }} which is converted to the value from the hostname field in the import file. | ||
| Use [cyan]--example[/] to see expected import file format and required fields. | ||
| """ | ||
| if show_example: | ||
| render.console.print("Expects .yaml, .json, or .csv file with [cyan]serial[/] unless [cyan]-G[/]|[cyan]--groups[/] is used. Then expects the same with [cyan]name[/] [dim italic](The group name)[/]") | ||
| render.console.print("Any other keys/values provided in the import will be used as variables if [cyan]banner_file[/] provided is a Jinja2 template [dim italic](.j2 file)[/]") | ||
| render.econsole.print( | ||
| "--------------------- [bright_green].yaml example for[/] [magenta]APs[/] ---------------------\n" | ||
| "- serial: CN12345678\n" | ||
| " hostname: barn.615.ab12\n" | ||
| ) | ||
| render.econsole.print( | ||
| "--------------------- [bright_green].csv example for[/] [magenta]Groups[/] ---------------------\n" | ||
| "name,some_var\n" | ||
| "group_name,some_value\n" | ||
| ) | ||
| render.econsole.print("[dark_olive_green2]See [cyan]batch update aps --example[/cyan] for expanded example device import_file format[/]") | ||
| render.econsole.print("[dark_olive_green2]See [cyan]batch add groups --example[/cyan] for expanded example group import_file format[/]") | ||
| return | ||
| if not import_file: | ||
| common.exit(render._batch_invalid_msg("cencli batch update ap-banner [OPTIONS] [IMPORT_FILE] [BANNER_FILE]")) | ||
| is_tmp_file = False | ||
| if banner: # pragma: no cover requires tty | ||
| banner_file = common.get_banner_from_user() | ||
| is_tmp_file = True | ||
| banner_file = banner_file or _banner_file | ||
| if not banner_file: | ||
| common.exit("Missing required argument 'banner_file'") | ||
| data = common._get_import_file(import_file, "devices" if not group_level else "groups") | ||
| common.batch_update_ap_banner(data, banner_file, group_level=group_level, yes=yes) | ||
| if is_tmp_file: # pragma: no cover | ||
| banner_file.unlink(missing_ok=True) | ||
| @app.command() | ||
| def variables( | ||
| import_file: Path = common.arguments.get("import_file", help="Path to file with variables"), | ||
| replace: bool = typer.Option(False, "-R", "--replace", help=f"Replace all existing variables with the variables provided {render.help_block('existing variables are retained unless updated in this payload')}"), | ||
| show_example: bool = common.options.show_example, | ||
| yes: bool = common.options.yes, | ||
| debug: bool = common.options.debug, | ||
| default: bool = common.options.default, | ||
| workspace: str = common.options.workspace, | ||
| ) -> None: | ||
| """Batch update/replace variables for devices based on data from required import file. | ||
| Use [cyan]cencli batch update variables --example[/] to see example import file formats. | ||
| [italic]Accepts same format as Aruba Central UI, but also accepts .yaml[/] | ||
| [cyan]-R[/]|[cyan]--replace[/] :triangular_flag: Will flush all existing variables (for the devices in the import_file) and replace with the variables from the import_file. | ||
| By default: Existing variables not in the import_file are left intact. | ||
| """ | ||
| if show_example: | ||
| render.console.print(examples.update_variables) | ||
| return | ||
| if not import_file: | ||
| common.exit(render._batch_invalid_msg("cencli batch update variables [OPTIONS] [IMPORT_FILE]")) | ||
| action = APIAction.REPLACE if replace else APIAction.UPDATE | ||
| common.batch_add_update_replace_variables(import_file, action=action, yes=yes) | ||
| @app.callback() | ||
| def callback(): | ||
| """Batch update devices (GreenLake Inventory) / aps.""" | ||
| """Batch update devices (GreenLake Inventory) / aps / variables.""" | ||
| pass | ||
@@ -78,0 +173,0 @@ |
@@ -9,4 +9,4 @@ #!/usr/bin/env python3 | ||
| from centralcli import common, config, render, utils | ||
| from centralcli.cache import CacheCert, CacheLabel, CacheSite, api | ||
| from centralcli import api_clients, common, config, render, utils | ||
| from centralcli.cache import CacheCert, CacheLabel, CacheSite | ||
| from centralcli.client import BatchRequest | ||
@@ -25,2 +25,3 @@ from centralcli.constants import iden_meta | ||
| api = api_clients.classic | ||
@@ -27,0 +28,0 @@ @app.command() |
@@ -84,5 +84,8 @@ #!/usr/bin/env python3 | ||
| """ | ||
| common.cache(refresh=True) | ||
| res = common.cache(refresh=True) | ||
| exit_code = 0 if all([r.ok if not hasattr(r, "all_ok") else r.all_ok for r in res]) else 1 | ||
| common.exit(code=exit_code) | ||
| # CACHE add cache for webhooks | ||
@@ -89,0 +92,0 @@ @app.command() |
@@ -9,3 +9,3 @@ #!/usr/bin/env python3 | ||
| from centralcli import cleaner, common, log, render | ||
| from centralcli import cleaner, common, render | ||
| from centralcli.cache import api | ||
@@ -80,6 +80,3 @@ from centralcli.constants import CloudAuthMacSortBy, CloudAuthUploadType, TimeRange | ||
| if resp.ok: | ||
| try: | ||
| resp.output = cleaner.cloudauth_upload_status(resp.output) | ||
| except Exception as e: # pragma: no cover | ||
| log.error(f"Error cleaning output of cloud auth mac upload {repr(e)}") | ||
| resp.output = cleaner.cloudauth_upload_status(resp.output) | ||
@@ -86,0 +83,0 @@ render.display_results( |
@@ -7,7 +7,5 @@ #!/usr/bin/env python3 | ||
| from centralcli import cleaner, common, render | ||
| from centralcli.clicommon import APIClients | ||
| from centralcli import api_clients, cleaner, common, render | ||
| from centralcli.constants import SortTsCmdOptions, TSDevTypes, iden_meta, lib_to_api # noqa | ||
| api_clients = APIClients() | ||
| api = api_clients.classic | ||
@@ -14,0 +12,0 @@ |
@@ -10,10 +10,8 @@ #!/usr/bin/env python3 | ||
| from centralcli import common, config, log, render | ||
| from centralcli import api_clients, common, config, log, render | ||
| from centralcli.client import Session | ||
| from centralcli.response import Response | ||
| from ..cache import api | ||
| from ..constants import SortGroupOptions | ||
| app = typer.Typer() | ||
| api = api_clients.classic | ||
@@ -192,4 +190,3 @@ | ||
| def command( | ||
| import_file: Path = common.arguments.import_file, | ||
| sort_by: SortGroupOptions = common.options.sort_by, | ||
| sort_by: str = common.options.sort_by, | ||
| reverse: bool = common.options.reverse, | ||
@@ -216,2 +213,12 @@ yes: bool = common.options.yes, | ||
| """ | ||
| from ..cnx.api import CentralAPI | ||
| api = CentralAPI() | ||
| tablefmt = common.get_format(do_json=do_json, do_yaml=do_yaml, do_csv=do_csv, do_table=do_table) | ||
| resp = api.session.request(api.monitoring.get_aps) # deployment=APDeployment.STANDALONE, status=DeviceStatusFilter.ONLINE, limit=3) | ||
| render.display_results(resp, tablefmt=tablefmt, outfile=outfile, pager=pager, sort_by=sort_by, reverse=reverse, output_by_key="deviceName") | ||
| # from centralcli import api_clients | ||
| # resp = api_clients.glp.session.request(api_clients.glp.devices.get_glp_devices, sort_by="archived", reverse=True) | ||
| # render.display_results(resp, tablefmt=tablefmt, outfile=outfile, pager=pager) | ||
| ... | ||
@@ -218,0 +225,0 @@ |
@@ -12,5 +12,4 @@ #!/usr/bin/env python3 | ||
| from centralcli import cleaner, common, render, utils | ||
| from centralcli import api_clients, cleaner, common, render, utils | ||
| from centralcli.cache import CacheDevice | ||
| from centralcli.clicommon import APIClients | ||
| from centralcli.constants import iden_meta, lib_to_api | ||
@@ -25,5 +24,3 @@ | ||
| api_clients = APIClients() | ||
| api = api_clients.classic | ||
| app = typer.Typer() | ||
@@ -30,0 +27,0 @@ typer.Argument = partial(typer.Argument, show_default=False) |
@@ -9,5 +9,4 @@ #!/usr/bin/env python3 | ||
| from centralcli import common, log, render, utils | ||
| from centralcli import api_clients, common, log, render, utils | ||
| from centralcli.cache import CacheDevice, CacheLabel, CacheSub | ||
| from centralcli.clicommon import APIClients | ||
| from centralcli.client import BatchRequest | ||
@@ -17,9 +16,7 @@ from centralcli.constants import iden_meta | ||
| api_clients = APIClients() | ||
| app = typer.Typer() | ||
| api = api_clients.classic | ||
| glp_api = api_clients.glp | ||
| app = typer.Typer() | ||
| # TOGLP | ||
@@ -26,0 +23,0 @@ @app.command(deprecated=True, hidden=glp_api is not None) |
@@ -13,3 +13,3 @@ #!/usr/bin/env python3 | ||
| from centralcli import cleaner, common, log, render, utils | ||
| from centralcli import cleaner, common, log, render, utils, config | ||
| from centralcli.caas import CaasAPI | ||
@@ -108,6 +108,8 @@ from centralcli.cache import CacheCert, CacheDevice, CacheGroup, CachePortal, CacheTemplate, api | ||
| @app.command(help="Update existing or add new Variables for a device/template") | ||
| @app.command() | ||
| def variables( | ||
| device: str = typer.Argument(..., metavar=iden_meta.dev, autocompletion=common.cache.dev_completion, show_default=False,), | ||
| var_value: list[str] = typer.Argument(..., help="comma seperated list 'variable = value, variable2 = value2'", show_default=False,), | ||
| device: str = common.arguments.device, | ||
| var_value: list[str] = typer.Argument(None, help=f"comma seperated list 'variable = value, variable2 = value2' [dim]{escape('[')}[red]required[/red] unless [cyan]--var-file[/] is specified{escape(']')}[/dim]", show_default=False,), | ||
| var_file: Path = typer.Option(None, "-F", "--file", help="Path to file with variables", exists=True, show_default=False,), | ||
| replace: bool = typer.Option(False, "-R", "--replace", help=f"Replace all existing variables with the variables provided {render.help_block('existing variables are retained unless updated in this payload')}"), | ||
| yes: bool = common.options.yes, | ||
@@ -118,20 +120,37 @@ debug: bool = common.options.debug, | ||
| ) -> None: | ||
| """Update/replace existing or add new Variables for a device | ||
| Use [cyan]cencli batch update variables [IMPORT_FILE][/] to update multiple devices. | ||
| If providing [cyan]--file[/] :triangular_flag: Only the device specified via the device argument will be processed, even if the file has variables for other devices defined. | ||
| [cyan]-R[/]|[cyan]--replace[/] :triangular_flag: Will flush all existing variables leaving only the variables provided. | ||
| """ | ||
| dev = common.cache.get_inv_identifier(device) | ||
| serial = dev.serial | ||
| var_dict = common.parse_var_value_list(var_value) | ||
| msg = "Sending Update" if yes else "Please Confirm: [bright_green]Update[/]" | ||
| render.econsole.print(f"{msg} {dev.rich_help_text}", emoji=False) | ||
| _ = [render.econsole.print(f' {k}: [bright_green]{v}[/]', emoji=False) for k, v in var_dict.items()] | ||
| if render.confirm(yes): | ||
| resp = api.session.request( | ||
| api.configuration.update_device_template_variables, | ||
| serial, | ||
| dev.mac, | ||
| var_dict=var_dict | ||
| var_dict = {} if not var_file else utils.unlistify(config.get_file_data(var_file)) | ||
| var_dict = var_dict.get(dev.serial, var_dict) # json by serial | ||
| if var_value: | ||
| var_dict = {**var_dict, **common.parse_var_value_list(var_value)} | ||
| if not var_dict: | ||
| common.exit( | ||
| "Missing required paramerter. [cyan]var_value[/] (args) and/or [cyan]--file[/] is required.\n" | ||
| "See [cyan]cencli update variables --help[/] for more details." | ||
| ) | ||
| render.display_results(resp, tablefmt="action") | ||
| render.econsole.print(f"[bright_green]Update{'ing' if yes else ''}[/] {dev.rich_help_text}", emoji=False) | ||
| render.econsole.print(*[f' {k}: [bright_green]{v}[/]' for k, v in var_dict.items()], emoji=False, sep="\n") | ||
| if replace: | ||
| render.econsole.print(f"\n[dark_orange3]:warning:[/] [cyan]-R[/]|[cyan]--replace[/] :triangular_flag: used. [bright_red]All existing variables will be flushed[/]. Only the variables above will be defined for [cyan]{dev.serial}[/]") | ||
| render.confirm(yes) | ||
| resp = api.session.request( | ||
| api.configuration.update_device_template_variables, | ||
| serial, | ||
| dev.mac, | ||
| var_dict=var_dict, | ||
| replace=replace | ||
| ) | ||
| render.display_results(resp, tablefmt="action") | ||
| @app.command() | ||
@@ -251,4 +270,6 @@ def group( | ||
| group_dev: str = common.arguments.get("group_dev", autocompletion=common.cache.group_dev_ap_gw_completion), | ||
| cli_file: Path = typer.Argument(..., help="File containing desired config/template in CLI format.", exists=True, show_default=False,), | ||
| cli_file: Path = typer.Argument(None, help="File containing desired config/template in CLI format.", exists=True, show_default=False,), | ||
| var_file: Path = typer.Argument(None, help="File containing variables for j2 config template.", exists=True, show_default=False,), | ||
| banner_file: Path = common.options.banner_file, | ||
| banner: bool = common.options.banner, | ||
| do_gw: bool = common.options.do_gw, | ||
@@ -261,3 +282,19 @@ do_ap: bool = common.options.do_ap, | ||
| ) -> None: | ||
| is_tmp_file = False | ||
| group_dev: CacheDevice | CacheGroup = common.cache.get_identifier(group_dev, qry_funcs=["group", "dev"], device_type=["ap", "gw"]) | ||
| if banner: # pragma: no cover requires tty | ||
| banner_file = common.get_banner_from_user() | ||
| is_tmp_file = True | ||
| if not cli_file and not banner_file: | ||
| common.exit("cli_file or --banner-file <banner file> is required") | ||
| if banner_file: | ||
| if do_gw or group_dev.is_dev and not group_dev.type == "ap": | ||
| common.exit("banner update only valid for APs or AP Groups") | ||
| common.batch_update_ap_banner(data=dict(group_dev), banner_file=banner_file, group_level=group_dev.is_group, yes=yes) | ||
| if is_tmp_file: # pragma: no cover | ||
| banner_file.unlink(missing_ok=True) | ||
| common.exit(code=0) # will exit from batch_update_ap_banner in display_results if it failed. | ||
| config_out = utils.generate_template(cli_file, var_file=var_file) | ||
@@ -264,0 +301,0 @@ cli_cmds = utils.validate_config(config_out) |
@@ -9,2 +9,3 @@ from __future__ import annotations | ||
| from ... import config | ||
| from .central.monitoring import MonitoringAPI | ||
@@ -36,3 +37,20 @@ | ||
| class CentralAPI: | ||
| def __init__(self, base_url: StrOrURL = None, *, aio_session: ClientSession = None, silent: bool = True): | ||
| self._session = Session(base_url=base_url or config.cnx.base_url, aio_session=aio_session, silent=silent, cnx=True) | ||
| @property | ||
| def session(self) -> Session: | ||
| return self._session | ||
| @session.setter | ||
| def session(self, session: Session) -> None: | ||
| self._session = session # pragma: no cover We don't use this currently | ||
| @cached_property | ||
| def monitoring(self) -> MonitoringAPI: | ||
| return MonitoringAPI(self.session) | ||
@@ -15,3 +15,2 @@ # (C) Copyright 2025 Hewlett Packard Enterprise Development LP. | ||
| class NewCentralURLs: | ||
@@ -117,3 +116,3 @@ Authentication = {"OAUTH": "https://sso.common.cloud.hpe.com/as/token.oauth2"} | ||
| def get_access_token(self, app_name: Literal["new_central", "glp"] = "glp"): | ||
| def get_access_token(self, app_name: Literal["new_central", "glp"] = "glp") -> str: | ||
| """ | ||
@@ -177,4 +176,4 @@ Create a new access token for the specified application. | ||
| """ | ||
| log.info(f"{app_name} access Token has expired.", show=True) | ||
| log.info("Handling Token Expiry...", show=True) | ||
| log.info(f"{app_name} access Token expired/invalid. Fetching new token...") | ||
| client_id, client_secret = self._return_client_credentials(app_name) | ||
@@ -181,0 +180,0 @@ if any(credential is None for credential in [client_id, client_secret]): |
@@ -555,3 +555,4 @@ from __future__ import annotations | ||
| except Exception as e: | ||
| raise UserWarning(f'Unable to load configuration from {import_file}\n{e.__class__.__name__}\n\n{e}') | ||
| e.add_note(f'Unable to load data from {import_file} due to error above') | ||
| raise e | ||
@@ -558,0 +559,0 @@ if isinstance(import_data, list): |
@@ -163,2 +163,10 @@ #!/usr/bin/env python3 | ||
| class DeviceStatusFilter(str, Enum): | ||
| ONLINE = "ONLINE" | ||
| OFFLINE = "OFFLINE" | ||
| class APDeployment(str, Enum): | ||
| STANDALONE = "Standalone" | ||
| CLUSTER = "Cluster" | ||
| class GroupDevTypes(str, Enum): | ||
@@ -187,2 +195,14 @@ ap = "ap" | ||
| class MacFormat(str, Enum): | ||
| COLS = "COLS" | ||
| DASHES = "DASHES" | ||
| DOTS = "DOTS" | ||
| CLEAN = "CLEAN" | ||
| cols = "cols" | ||
| dashes = "dashes" | ||
| dots = "dots" | ||
| clean = "clean" | ||
| # Here are all the types for the below Enum | ||
@@ -403,2 +423,7 @@ # 3: Bridge (Switch) | ||
| class APIAction(str, Enum): | ||
| ADD = "ADD" | ||
| UPDATE = "UPDATE" | ||
| REPLACE = "REPLACE" | ||
| class RadioMode(str, Enum): | ||
@@ -2408,3 +2433,2 @@ access = "access" | ||
| "--version", | ||
| "-v", | ||
| "-V" | ||
@@ -2411,0 +2435,0 @@ ] |
@@ -59,3 +59,7 @@ from __future__ import annotations | ||
| @property | ||
| def ok(self) -> bool: | ||
| return self.base_url is not None | ||
| class Tokens(BaseModel): | ||
@@ -62,0 +66,0 @@ access: Optional[str] = Field(..., alias=AliasChoices("access", "access_token", "access-token")) |
| from pydantic import BaseModel, field_serializer, ConfigDict | ||
| from typing import Dict, Any | ||
| from typing import Any | ||
| from datetime import datetime | ||
@@ -21,7 +21,7 @@ import pendulum | ||
| ) | ||
| details: Dict[str, Any] | ||
| details: dict[str, Any] | None | ||
| status: str | ||
| stats: CloudAuthUploadStats | ||
| submittedAt: datetime | ||
| lastUpdatedAt: datetime | ||
| submittedAt: datetime | None | ||
| lastUpdatedAt: datetime | None | ||
| durationNanos: int | ||
@@ -33,2 +33,4 @@ fileName: str | ||
| def pretty_dt(cls, dt: datetime) -> DateTime: | ||
| if datetime(1, 1, 1, 0, 0, tzinfo=dt.tzinfo) == dt: | ||
| return None | ||
| return DateTime(dt.timestamp()) |
@@ -287,2 +287,1 @@ """object Classes""" | ||
| return valid | ||
+41
-12
@@ -158,5 +158,6 @@ #!/usr/bin/env python3 | ||
| # TODO Partial support for sending rich.Text needs to cleaned up and make all output returned to display_results rich.text, use console.print vs typer.echo in display_results | ||
| # Output class can likely be elliminated and return rich.Text from render.output | ||
| class Output(): | ||
| def __init__(self, rawdata: str = "", prettydata: str = "", config: Config = None, tablefmt: TableFormat | None = None): | ||
| def __init__(self, rawdata: str = "", prettydata: str | Text = "", config: Config = None, tablefmt: TableFormat | None = None): | ||
| self.config = config | ||
@@ -172,5 +173,6 @@ self._file = rawdata # found typer.unstyle AFTER I built this | ||
| if self.tty: | ||
| out = self.tty if not isinstance(self.tty, Text) else str(self.tty) | ||
| pretty_up = typer.style("Up\n", fg="green") | ||
| pretty_down = typer.style("Down\n", fg="red") | ||
| out = self.tty.replace("Up\n", pretty_up).replace("Down\n", pretty_down) | ||
| out = out.replace("Up\n", pretty_up).replace("Down\n", pretty_down) | ||
| else: | ||
@@ -183,7 +185,15 @@ out = self.file | ||
| def __rich__(self): | ||
| pretty_up = "[green]Up[/]\n" | ||
| pretty_down = "[red]Down[/]\n" | ||
| out = self.tty.replace("Up\n", pretty_up).replace("Down\n", pretty_down) | ||
| is_text = False | ||
| if isinstance(self.tty, Text): | ||
| is_text = True | ||
| out = self.format_rich_text() | ||
| out = self.tty.markup | ||
| else: | ||
| pretty_up = "[green]Up[/]\n" | ||
| pretty_down = "[red]Down[/]\n" | ||
| out = self.tty.replace("Up\n", pretty_up).replace("Down\n", pretty_down) | ||
| out = self.sanitize_strings(out) | ||
| if all([is_text, out]): | ||
| out = Text.from_markup(out, emoji=":cd:" not in out) | ||
| return out if out else "\u26a0 No Data. This may be normal." | ||
@@ -201,2 +211,7 @@ | ||
| def format_rich_text(self) -> Text: | ||
| self.tty.highlight_words([" Up ", "UP "], "bright_green") | ||
| self.tty.highlight_words([" Down ", " DOWN "], "red") | ||
| return self.tty | ||
| def sanitize_strings(self, strings: str, config=None) -> str: # pragma: no cover | ||
@@ -226,3 +241,3 @@ """Sanitize Output for demos | ||
| def menu(self, data_len: int = None) -> str: | ||
| def menu(self, data_len: int = None) -> str: # pragma: no cover requires tty | ||
| def isborder(line: str) -> bool: | ||
@@ -258,3 +273,3 @@ return all(not c.isalnum() for c in list(line)) | ||
| try: | ||
| return typer.unstyle(self._file) | ||
| return typer.unstyle(self.tty) | ||
| except TypeError: | ||
@@ -413,3 +428,3 @@ return self._file | ||
| if not isinstance(data, list) or group_by not in data[0]: | ||
| log.error(f"Error in render.do_group_by_table invalid type {type(data)} or {group_by} not found in header.") | ||
| log.error(f"Error in render.build_rich_table_rows invalid type {type(data)} or {group_by} not found in header.") | ||
| [table.add_row(*list(in_dict.values())) for in_dict in data] | ||
@@ -618,3 +633,6 @@ return table | ||
| raw_data = yaml.safe_dump(json.loads(json.dumps(outdata, cls=Encoder)), sort_keys=False) | ||
| table_data = rich_capture(raw_data) | ||
| # table_data = rich_capture(raw_data.replace("'", "")) | ||
| table_data = rich_capture(Syntax(rich_capture(raw_data.replace("'", "")), "yaml", background_color=None, theme='native')) | ||
| table_data = Text.from_ansi(table_data, overflow="fold") | ||
| ... | ||
@@ -627,2 +645,4 @@ elif tablefmt == "csv": | ||
| return str(value.iso) | ||
| elif value in ["❌", "✅"]: | ||
| return False if value == "❌" else True | ||
| else: | ||
@@ -794,3 +814,3 @@ return str(value) if "," not in str(value) else f'"{value}"' | ||
| """ | ||
| style = "dim" if help_type == "default" else "dim red" | ||
| style = "dim red" if help_type == "requires" else "dim" | ||
| return f"[{style}]{escape(f'[{help_type}: {default_txt}]')}[/{style}]" | ||
@@ -958,3 +978,12 @@ | ||
| typer.echo_via_pager(outdata) if pager and tty and len(outdata) > tty.rows else typer.echo(outdata) | ||
| # display output to screen. | ||
| if isinstance(outdata.tty, Text): | ||
| emoji = ":cd:" not in outdata # HACK prevent :cd: often found in MAC addresses from being rendered as 💿 | ||
| if pager and tty and len(outdata) > tty.rows: | ||
| with console.pager: | ||
| console.print(outdata, emoji=emoji) | ||
| else: | ||
| console.print(outdata, emoji=emoji) | ||
| else: | ||
| typer.echo_via_pager(outdata) if pager and tty and len(outdata) > tty.rows else typer.echo(outdata) | ||
@@ -961,0 +990,0 @@ if caption and outdata.tablefmt != "rich": # rich prints the caption by default for all others we need to add it to the output |
@@ -12,3 +12,2 @@ #!/usr/bin/env python3 | ||
| from rich.console import Console | ||
| from rich.text import Text | ||
| from yarl import URL | ||
@@ -160,3 +159,3 @@ | ||
| self.url = response.url if isinstance(response.url, URL) else URL(response.url) | ||
| self.error = response.reason or "OK" if response.ok else "ERROR" # visualrf does not send OK for reason when call is successful | ||
| self.error = response.reason or ("OK" if response.ok else "ERROR") # visualrf does not send OK for reason when call is successful | ||
| if isinstance(response, ClientResponse): | ||
@@ -335,3 +334,3 @@ self.status = response.status | ||
| r = render.output([data], tablefmt="yaml") | ||
| r = Text.from_ansi(r.tty) | ||
| # r = Text.from_ansi(r.tty) # now yaml now set r.tty to rich.Text | ||
| r = "\n".join([f" {line}" for line in str(r).splitlines()]) | ||
@@ -338,0 +337,0 @@ else: |
@@ -32,3 +32,3 @@ #!/usr/bin/env python3 | ||
| TabLibFormats = Literal['json', 'yaml', 'csv', 'tsv', 'dbf', 'html', 'jira', 'latex', 'df', 'rst', 'cli'] | ||
| ExampleType = Literal["devices", "sites", "groups", "labels", "macs", "mpsk"] | ||
| ExampleType = Literal["devices", "sites", "groups", "labels", "macs", "mpsk", "variables"] | ||
| Action = Literal["add", "delete", "move", "rename", "other"] | ||
@@ -160,3 +160,3 @@ | ||
| self.ds = tablib.Dataset().load(self.data, format=data_format) | ||
| self.clean = self._get_clean_data(self.ds) | ||
| self.clean = self._get_clean_data(self.ds,) | ||
| self.json = self.get_json() | ||
@@ -200,3 +200,7 @@ self.yaml = self.get_yaml() | ||
| out = self._handle_bools(data) | ||
| return [utils.strip_none(inner) for inner in out] | ||
| out = [utils.strip_none(inner) for inner in out] | ||
| if not self.type == "variables": | ||
| return out | ||
| else: | ||
| return {dev["_sys_serial"]: dev for dev in out} | ||
@@ -644,3 +648,18 @@ def _handle_bools(self, data: tablib.Dataset) -> List[Dict[str, Any]]: | ||
| # -- // VARIABLES \\ -- | ||
| data="""_sys_serial,_sys_lan_mac,_sys_hostname,_sys_gateway,_sys_module_command,user_var1,user_var2 | ||
| US12345678,aabbccddeeff,snantx-idf1-sw1,10.0.30.1,type jl728a,value1,value2 | ||
| SG12345679,ffee.ddcc.bbaa,snantx-idf1-sw2,10.0.30.1,type jl728a,value1,value2 | ||
| TW12345680,01:aa:bb:cc:dd:ee,snantx-idf1-sw3,10.0.30.1,type jl728a,value1,value2""" | ||
| example = Example(data, type="variables", action="other") | ||
| clibatch_update_variables = f"""[italic cyan]cencli batch update variables IMPORT_FILE[/]: | ||
| Requires the following keys (include as header row for csv import): | ||
| [cyan]_sys_serial[/], [cyan]_sys_lan_mac[/] [italic](both are required)[/]. | ||
| {example} | ||
| """ | ||
| # -- // ARCHIVE / UNARCHIVE \\ -- | ||
@@ -817,2 +836,4 @@ clibatch_archive = f"""[italic cyan]cencli batch archive IMPORT_FILE[/]: | ||
| self.update_aps = clibatch_update_aps | ||
| self.update_variables = clibatch_update_variables | ||
| self.add_variables = clibatch_update_variables.replace("update variables", "add variables") | ||
| self.update_devices = clibatch_update_aps.replace("update aps", "update devices") | ||
@@ -822,3 +843,3 @@ self.assign_subscriptions = clibatch_assign_subscriptions | ||
| def __getattr__(self, key: str): | ||
| if key not in self.__dict__.keys(): # pragma: no cover | ||
| if key != "__iter__" and key not in self.__dict__.keys(): # pragma: no cover | ||
| log.error(f"An attempt was made to get {key} attr from ImportExamples which is not defined.") | ||
@@ -825,0 +846,0 @@ return f":warning: [bright_red]Error[/] no str defined for [cyan]ImportExamples.{key}[/]" |
@@ -31,3 +31,3 @@ import os | ||
| PortalAuthTypes = Sequence[PortalAuthType] | ||
| CacheTableName = Literal["devices", "sites", "groups", "labels", "macs", "mpsk", "subscriptions"] | ||
| ImportType = Literal["devices", "sites", "groups", "labels", "macs", "mpsk", "subscriptions", "variables"] | ||
| DynamicAntenna = Literal["narrow", "wide"] | ||
@@ -34,0 +34,0 @@ RadioType = Literal["2.4", "5", "6"] |
+16
-13
@@ -27,3 +27,3 @@ # -*- coding: utf-8 -*- | ||
| from centralcli.typedefs import StrOrURL | ||
| from centralcli.typedefs import StrOrURL, StrEnum | ||
@@ -456,3 +456,3 @@ if TYPE_CHECKING: | ||
| @staticmethod | ||
| def generate_template(template_file: Path | str, var_file: Path | str | None,) -> str: | ||
| def generate_template(template_file: Path | str, var_file: Path | str | None = None, *, config_data: list | dict = None) -> str: | ||
| """Generate configuration files based on j2 templates and provided variables.""" | ||
@@ -464,13 +464,14 @@ template_file = Path(str(template_file)) if not isinstance(template_file, Path) else template_file | ||
| if template_file.suffix == ".j2": | ||
| if not var_file: # no var file specified look for file in same dir as template with same base name and yaml/json suffix | ||
| for file in template_file.parent.iterdir(): | ||
| if file.stem == template_file.stem and file.suffix in [".yaml", ".yml", ".json"]: | ||
| var_file = file | ||
| break | ||
| if not var_file: | ||
| econsole = Console(stderr=True) | ||
| econsole.print("[dark_orange3]:warning:[/] [cyan].j2[/] file provided with no matching variable file") | ||
| raise typer.Exit(1) | ||
| if not config_data: | ||
| if not var_file: # no var file specified look for file in same dir as template with same base name and yaml/json suffix | ||
| for file in template_file.parent.iterdir(): | ||
| if file.stem == template_file.stem and file.suffix in [".yaml", ".yml", ".json"]: | ||
| var_file = file | ||
| break | ||
| if not var_file: | ||
| econsole = Console(stderr=True) | ||
| econsole.print("[dark_orange3]:warning:[/] [cyan].j2[/] file provided with no matching variable file") | ||
| raise typer.Exit(1) | ||
| config_data = yaml.load(var_file.read_text(), Loader=yaml.SafeLoader) | ||
| config_data = yaml.load(var_file.read_text(), Loader=yaml.SafeLoader) | ||
@@ -612,3 +613,3 @@ env = Environment(loader=FileSystemLoader(str(template_file.parent)), trim_blocks=True, lstrip_blocks=True) | ||
| @staticmethod | ||
| def summarize_list(items: List[str], max: int = 6, pad: int = 4, sep: str = '\n', color: str | None = 'cyan', italic: bool = False, bold: bool = False) -> str: | ||
| def summarize_list(items: List[str, StrEnum], max: int = 6, pad: int = 4, sep: str = '\n', color: str | None = 'cyan', italic: bool = False, bold: bool = False, use_enum_name: bool = False) -> str: | ||
| if not items: | ||
@@ -627,2 +628,4 @@ return "" | ||
| enum_attr = "value" if not use_enum_name else "name" | ||
| items = [item if not hasattr(item, enum_attr) else getattr(item, enum_attr) for item in items] | ||
| items = [f'{"" if not pad else " " * pad}{fmt}{item}{"[/]" if fmt else ""}' for item in items] | ||
@@ -629,0 +632,0 @@ if len(items) == 1: # If there is only 1 item we return it with just the formatting and strip the pad |
@@ -30,8 +30,8 @@ #!/usr/bin/env python | ||
| * http://pygments.org/docs/api/#pygments.lexer.Lexer.name | ||
| aliases {list} – languages, against whose GFM block names CsvLexer will apply | ||
| aliases {list} - languages, against whose GFM block names CsvLexer will apply | ||
| * https://git.io/fhjla | ||
| filenames {list} – file name patterns, for whose contents CsvLexer will apply | ||
| tokens {dict} – regular expressions internally matching CsvLexer’s components | ||
| filenames {list} - file name patterns, for whose contents CsvLexer will apply | ||
| tokens {dict} - regular expressions internally matching CsvLexer's components | ||
| Based on StackOverflow user Adobe’s code: | ||
| Based on StackOverflow user Adobe's code: | ||
| * https://stackoverflow.com/a/25508711/298171 | ||
@@ -46,60 +46,60 @@ """ | ||
| 'root': [ | ||
| (r'^[^,\n]*', Operator, 'second'), | ||
| (r'^[^,\n]*', Operator, 'second'), | ||
| ], | ||
| 'second': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'third'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'third'), | ||
| ], | ||
| 'third': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Declaration), 'fourth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Declaration), 'fourth'), | ||
| ], | ||
| 'fourth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'fifth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'fifth'), | ||
| ], | ||
| 'fifth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'sixth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'sixth'), | ||
| ], | ||
| 'sixth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'seventh'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'seventh'), | ||
| ], | ||
| 'seventh': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Namespace), 'eighth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Namespace), 'eighth'), | ||
| ], | ||
| 'eighth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'ninth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'ninth'), | ||
| ], | ||
| 'ninth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'tenth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'tenth'), | ||
| ], | ||
| 'tenth': [ | ||
| (r'(,)([^,\n]*)', Operator, 'eleventh'), | ||
| (r'(,)([^,\n]*)', Operator, 'eleventh'), | ||
| ], | ||
| 'eleventh': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'twelfth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'twelfth'), | ||
| ], | ||
| 'twelfth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Declaration), 'thirteenth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Declaration), 'thirteenth'), | ||
| ], | ||
| 'thirteenth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'fourteenth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'fourteenth'), | ||
| ], | ||
| 'fourteenth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'fifteenth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'fifteenth'), | ||
| ], | ||
| 'fifteenth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'sixteenth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Name.Constant), 'sixteenth'), | ||
| ], | ||
| 'sixteenth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Namespace), 'seventeenth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Namespace), 'seventeenth'), | ||
| ], | ||
| 'seventeenth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'eighteenth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.Number), 'eighteenth'), | ||
| ], | ||
| 'eighteenth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'nineteenth'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Literal.String.Single), 'nineteenth'), | ||
| ], | ||
| 'nineteenth': [ | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Type), 'unsupported'), | ||
| (r'(,)([^,\n]*)', bygroups(Punctuation, Keyword.Type), 'unsupported'), | ||
| ], | ||
| 'unsupported': [ | ||
| (r'(.+)', bygroups(Punctuation)), | ||
| (r'(.+)', bygroups(Punctuation)), | ||
| ], | ||
@@ -106,0 +106,0 @@ } |
@@ -10,3 +10,3 @@ import sys | ||
| batch_dir = Path().home() / "git/myrepos/cencli-batch" | ||
| batch_dir = Path().home() / "git/cencli-batch" | ||
@@ -13,0 +13,0 @@ # -- break up arguments passed as single string from vscode promptString -- |
+256
-10
| # pragma: exclude file Still a WIP | ||
| import base64 | ||
| from functools import lru_cache | ||
| import ipaddress | ||
| from itertools import groupby | ||
| from typing import Literal | ||
| import aiohttp | ||
| from google.protobuf.json_format import MessageToDict | ||
| from rich import inspect | ||
@@ -10,7 +14,7 @@ from rich.console import Console | ||
| from centralcli import render | ||
| from centralcli.cache import api | ||
| from centralcli.cache import CacheDevice, api | ||
| from centralcli.models.config import WSSConfig | ||
| from centralcli.typedefs import LogType | ||
| from . import config, log | ||
| from . import config, log, utils, cache | ||
| from .objects import DateTime | ||
@@ -28,13 +32,86 @@ from .protobuf import audit_pb2, monitoring_pb2, streaming_pb2 | ||
| IGNORED_MSG_TYPES = ["STAT_UPLINK", "STAT_CLIENT"] | ||
| pretty_value = { | ||
| "ADD": "[bright_green]ADD[/]", | ||
| "DELETE": "[red]DELETE[/]", | ||
| "UPDATE": "[dark_orange3]UPDATE[/]", | ||
| "UP": "[bright_green]UP[/]", | ||
| "DOWN": "[red]DOWN[/]", | ||
| } | ||
| # TODO need to convert mac / ip / essid fields as described in readme of https://github.com/aruba/central-python-workflows/tree/main/streaming-api-client | ||
| async def _decode(data, field_type: FieldType = "mac"): | ||
| def _decode(data, field_type: FieldType = "ip"): | ||
| """ | ||
| Decode fields from protobuf payloads. | ||
| - If `data` is a base64-encoded string (MessageToDict output), decode to bytes. | ||
| - If already bytes/bytearray, use as-is. | ||
| - For 'essid' return UTF-8 string (fallback to bytes on decode error). | ||
| - For 'ip' return human-readable address (IPv4 or IPv6) via ipaddress. | ||
| - For 'mac' return colon-separated lower-case MAC using utils.Mac. | ||
| """ | ||
| try: | ||
| _data = base64.b64decode(data) | ||
| if field_type == "essid": | ||
| return _data | ||
| raw = None | ||
| # MessageToDict may return nested dicts for complex types like IpAddress | ||
| if isinstance(data, dict): | ||
| # Try to extract the nested 'addr' or numeric representation | ||
| if "addr" in data: | ||
| data = data["addr"] | ||
| elif "ip" in data: | ||
| data = data["ip"] | ||
| else: | ||
| # nothing to decode | ||
| return data | ||
| if isinstance(data, str): | ||
| # MessageToDict encodes bytes as base64 strings; try to decode | ||
| try: | ||
| raw = base64.b64decode(data) | ||
| except Exception: | ||
| # not base64 — assume already human readable string | ||
| if field_type in ["essid", "network"]: | ||
| return data | ||
| if field_type == "ip": | ||
| if data.is_digit(): # Some ip address fields represented as str(int) i.e. probeIp | ||
| raw = int(data) | ||
| else: | ||
| return data | ||
| if field_type == "mac": | ||
| try: | ||
| return utils.Mac(data).cols | ||
| except Exception: | ||
| return data | ||
| return data | ||
| elif isinstance(data, (bytes, bytearray)): | ||
| raw = bytes(data) | ||
| elif isinstance(data, int): | ||
| raw = data | ||
| else: | ||
| return data | ||
| if field_type in ["essid", "network"]: | ||
| try: | ||
| return raw.decode("utf-8", errors="replace") | ||
| except Exception: | ||
| return raw | ||
| if field_type == "ip": | ||
| return '.'.join('%d' % byte for byte in _data) | ||
| # raw might be bytes (base64-decoded), or an integer already (MessageToDict can return ints for some proto variants) | ||
| try: | ||
| if isinstance(raw, int): | ||
| return str(ipaddress.ip_address(raw)) | ||
| # bytes -> integer big endian | ||
| addr = ipaddress.ip_address(int.from_bytes(raw, "big")) | ||
| return str(addr) | ||
| except Exception: | ||
| # fallback: dotted decimal for 4-byte IPv4 | ||
| if isinstance(raw, (bytes, bytearray)) and len(raw) == 4: | ||
| return '.'.join(str(b) for b in raw) | ||
| return raw | ||
| if field_type == "mac": | ||
| return utils.Mac(raw).cols | ||
| return ':'.join('%02x' % ord(byte) for byte in _data) | ||
| return ':'.join(f'{b:02x}' for b in raw) | ||
| except Exception as e: | ||
@@ -97,2 +174,170 @@ log.exception(f"Exception while attempting to decode {field_type} in wss payload. \n{e}") | ||
| def decode_mac_field(data): | ||
| """ | ||
| Accepts either: | ||
| - a base64 string (as produced by MessageToDict for bytes fields), or | ||
| - raw bytes (as produced by parsed protobuf message) | ||
| Returns MAC as colon-separated lower-case string: '00:11:22:33:44:55' | ||
| """ | ||
| # convert base64 string -> bytes, or accept bytes as-is | ||
| if isinstance(data, str): | ||
| try: | ||
| raw = base64.b64decode(data) | ||
| except Exception: | ||
| # not base64 → return original | ||
| return data | ||
| elif isinstance(data, (bytes, bytearray)): | ||
| raw = bytes(data) | ||
| else: | ||
| return data | ||
| # Use helper to format nicely | ||
| return utils.Mac(raw).cols | ||
| def extract_ranges(nums): | ||
| nums = sorted(set(nums)) | ||
| try: | ||
| groups = (list(g) for _, g in groupby(enumerate(nums), key=lambda iv: iv[1] - iv[0])) | ||
| except Exception as e: | ||
| log.error(f"{repr(e)} in extract_ranges, {nums = }", show=True) | ||
| return f"{nums[0]} - {nums[-1]}" | ||
| return ",".join([f"{g[0][1]}-{g[-1][1]}" if g[0][1] != g[-1][1] else str(g[0][1]) for g in groups]) | ||
| @lru_cache | ||
| def _get_device(_key, _value): | ||
| dev = cache.devices_by_serial.get(_value) | ||
| if not dev: | ||
| return _value | ||
| return CacheDevice(dev).rich_help_text | ||
| def get_devices(as_dict, key): | ||
| device_fields = ["deviceId", "associatedDevice"] | ||
| return {**as_dict, key: [{k: v if k not in device_fields else _get_device(k, v) for k, v in inner.items()} for inner in as_dict[key]]} | ||
| def colorize_fields(as_dict: dict, key) -> dict: | ||
| color_fields = ["status", "action", "operState", "adminState"] | ||
| return {**as_dict, key: [{k: v if k not in color_fields else pretty_value.get(v, v) for k, v in inner.items()} for inner in as_dict[key]]} | ||
| def get_macs(as_dict, key) -> dict: | ||
| as_dict = colorize_fields(as_dict, key=key) | ||
| mac_keys = { | ||
| "timestamp": "timestamp", | ||
| "uptime": "uptime", | ||
| "deviceId": "device", | ||
| "associatedDevice": "associated_device", | ||
| "network": "network", | ||
| "essid": "essid", | ||
| "ipAddress": "ip", | ||
| "probeIpAddr": "probe_ip", | ||
| "macaddr": "mac", | ||
| "radioMac": "radio_mac", | ||
| "interfaceMac": "interface_mac", | ||
| "peerMac": "peer_mac", | ||
| "localMac": "local_mac", | ||
| "srcIp": "src_ip", | ||
| "dstIp": "dst_ip" | ||
| } | ||
| for mkey in mac_keys.keys(): | ||
| if mkey == "timestamp": | ||
| macs = [DateTime(iface[mkey]) for iface in as_dict[key] if mkey in iface] | ||
| elif mkey in ["deviceId", "associatedDevice"]: | ||
| as_dict = get_devices(as_dict, key=key) | ||
| continue | ||
| elif mkey == "uptime": | ||
| as_dict = {**as_dict, key: [{k: v if k != "uptime" else DateTime(v, "durwords-short", round_to_minute=True) for k, v in inner.items()} for inner in as_dict[key]]} | ||
| continue | ||
| elif mkey in ["essid", "network"]: | ||
| macs = [_decode(iface[mkey], field_type=mkey) for iface in as_dict[key] if mkey in iface] | ||
| elif mkey in ["probeIpAddr"]: | ||
| macs = [_decode(iface[mkey], field_type=mac_keys[mkey].split("_")[-1]) for iface in as_dict[key] if mkey in iface] | ||
| else: | ||
| macs = [_decode(iface[mkey]["addr"], field_type=mac_keys[mkey].split("_")[-1]) for iface in as_dict[key] if mkey in iface] | ||
| if macs: | ||
| as_dict[key] = [{k if k != mkey else mac_keys[mkey]: v if k != mkey else mac for k, v in iface.items()} for iface, mac in zip(as_dict[key], macs)] | ||
| return as_dict | ||
| def get_ips(as_dict, key) -> dict: | ||
| ip_keys = { | ||
| "ipAddress": "ip", | ||
| } | ||
| for ip_key in ip_keys.keys(): | ||
| ips = [_decode(iface[ip_key]["addr"]) for iface in as_dict[key] if ip_key in iface] | ||
| if ips: | ||
| as_dict[key] = [{k if k != ip_key else ip_keys[ip_key]: v if k != ip_key else ip for k, v in iface.items()} for iface, ip in zip(as_dict[key], ips)] | ||
| return as_dict | ||
| def format_pb_data(pb_data: monitoring_pb2.MonitoringInformation) -> dict: | ||
| as_dict = MessageToDict(pb_data) | ||
| if pb_data.interfaces: | ||
| allowed_vlans = [extract_ranges(iface["allowedVlan"]) for iface in as_dict["interfaces"] if "allowedVlan" in iface] | ||
| if allowed_vlans: | ||
| as_dict["interfaces"] = [{**iface, "allowedVlan": v} for iface, v in zip(as_dict["interfaces"], allowed_vlans)] | ||
| # macs = get_macs(as_dict["interfaces"]) | ||
| # as_dict["interfaces"] = [{**iface, "mac": v} for iface, v in zip(as_dict["interfaces"], macs)] | ||
| as_dict = get_macs(as_dict, "interfaces") | ||
| if pb_data.interface_stats: | ||
| as_dict = get_macs(as_dict, "interfaceStats") | ||
| if pb_data.client_stats: | ||
| as_dict = get_macs(as_dict, "clientStats") | ||
| if pb_data.wired_clients: | ||
| as_dict = get_macs(as_dict, "wiredClients") | ||
| if pb_data.wireless_clients: | ||
| as_dict = get_macs(as_dict, "wirelessClients") | ||
| if pb_data.vap_stats: | ||
| as_dict = get_macs(as_dict, "vapStats") | ||
| if pb_data.radio_stats: | ||
| as_dict = get_macs(as_dict, "radioStats") | ||
| if pb_data.ipprobe_stats: | ||
| as_dict = get_macs(as_dict, "ipprobeStats") | ||
| if pb_data.ssid_stats: | ||
| as_dict = get_macs(as_dict, "ssidStats") | ||
| if pb_data.ike_tunnels: | ||
| as_dict = get_macs(as_dict, "ikeTunnels") | ||
| if pb_data.device_stats: | ||
| as_dict = get_macs(as_dict, "deviceStats") | ||
| if pb_data.uplink_probe_stats: | ||
| as_dict = get_macs(as_dict, "uplinkProbeStats") | ||
| if pb_data.uplink_wan_stats: | ||
| as_dict = get_macs(as_dict, "uplinkWanStats") | ||
| if pb_data.vlan_stats: | ||
| as_dict = get_macs(as_dict, "vlanStats") | ||
| if pb_data.device_neighbours: | ||
| as_dict = get_macs(as_dict, "deviceNeighbours") | ||
| if pb_data.tunnel_stats: | ||
| as_dict = get_macs(as_dict, "tunnelStats") | ||
| if pb_data.uplink_stats: | ||
| as_dict = get_macs(as_dict, "uplinkStats") | ||
| if pb_data.switches: | ||
| as_dict = get_macs(as_dict, "switches") | ||
| if pb_data.aps: | ||
| as_dict = get_macs(as_dict, "aps") | ||
| if pb_data.mobility_controllers: | ||
| as_dict = get_macs(as_dict, "mobilityControllers") | ||
| if pb_data.radios: | ||
| as_dict = get_macs(as_dict, "radios") | ||
| if pb_data.vaps: | ||
| as_dict = get_macs(as_dict, "vaps") | ||
| if pb_data.networks: | ||
| as_dict = get_macs(as_dict, "networks") | ||
| if pb_data.tunnels: | ||
| as_dict = get_macs(as_dict, "tunnels") | ||
| as_dict = {"timestamp": DateTime(pb_data.timestamp), **{k: v for k, v in as_dict.items() if k != "timestamp"}} # Move timestamp to top and format | ||
| del as_dict["customerId"] | ||
| return as_dict | ||
| # TODO base_url will be required once not hardcoded, need to determine if base-url can be determined reliably from central base and provide config option for it. | ||
@@ -135,4 +380,5 @@ async def follow_logs(wss_config: WSSConfig, log_type: LogType = "event"): | ||
| # asyncio.create_task(_clean_data(monitoring_data)) | ||
| console.print(f"-- {DateTime(stream_data.timestamp / 1000 / 1000 / 1000)} --\n{pb_data}") | ||
| ... | ||
| if [t for t in MessageToDict(pb_data).get("dataElements", []) if t not in IGNORED_MSG_TYPES]: | ||
| pb_dict = format_pb_data(pb_data) | ||
| render.display_results(data=pb_dict, tablefmt="yaml") | ||
| elif msg.type == aiohttp.WSMsgType.ERROR: | ||
@@ -139,0 +385,0 @@ econsole.print(msg.data) |
+11
-86
| Metadata-Version: 2.4 | ||
| Name: centralcli | ||
| Version: 9.1.0 | ||
| Version: 9.2.3 | ||
| Summary: A CLI for interacting with Aruba Central (Cloud Management Platform). Facilitates bulk imports, exports, reporting. A handy tool if you have devices managed by Aruba Central. | ||
@@ -25,2 +25,3 @@ License-Expression: MIT | ||
| Provides-Extra: speedups | ||
| Provides-Extra: xlsx | ||
| Requires-Dist: Brotli ; (platform_python_implementation == "CPython") and (extra == "speedups") | ||
@@ -53,2 +54,3 @@ Requires-Dist: PyYAML (>=6) | ||
| Requires-Dist: tablib (>=3) | ||
| Requires-Dist: tablib[xlsx] ; extra == "xlsx" | ||
| Requires-Dist: tabulate (>=0.8) | ||
@@ -69,2 +71,3 @@ Requires-Dist: tinydb (>=4) | ||
| [](https://raw.githubusercontent.com/Pack3tL0ss/central-api-cli/master/docs/img/coverage.svg) | ||
| [](https://www.codefactor.io/repository/github/pack3tl0ss/central-api-cli/overview/master) | ||
| [](https://pepy.tech/project/centralcli) | ||
@@ -105,3 +108,3 @@ [](https://pypi.org/project/centralcli/) | ||
| ### uv | ||
| #### Install `uv` on Linx/Mac: | ||
| #### Install `uv` on Linux/Mac: | ||
| ```bash | ||
@@ -124,9 +127,11 @@ # quick install on MacOS/Linux | ||
| ```bash | ||
| # install centralcli (showing command with optional `--python 3.13` flag see note below) | ||
| uv tool install --python 3.13 centralcli | ||
| # install centralcli (showing command with optional `--python 3.14` flag see note below) | ||
| uv tool install --python 3.14 centralcli | ||
| ``` | ||
| > Including `--python 3.13` above is optional provided you have python installed on the system. However uv facilitates deploying a stand-alone version of python with the package. There are likely speed advantages to doing this if your current version is dated (check with `python -V`) | ||
| > Including `--python 3.14` above is optional. However uv facilitates deploying a stand-alone version of python with the package. There are likely speed advantages to doing this if your current version is dated (check with `python -V`) | ||
| > Run `cencli --install-completion` to enable auto/tab completion in your shell once installed. *Shell needs to be restarted to take effect*. | ||
| Then to Upgrade `centralcli` | ||
@@ -137,84 +142,4 @@ ```bash | ||
| ### pipx | ||
| `centralcli` can also be installed via pipx, similar to `uv` pipx will install `centralcli` in an isolated environment, and expose the `cencli` command in PATH. | ||
| > Alternative (less preferred) installation methods using pipx or pip are described [here](other_install.md). | ||
| > The first section below is for Debian based systems refer to [pipx documentation](https://pipx.pypa.io/stable/installation/) for instructions on other OSs. | ||
| ```bash | ||
| # install pipx (Debian) | ||
| sudo apt update | ||
| sudo apt install pipx | ||
| pipx ensurepath | ||
| # install central CLI | ||
| pipx install centralcli --include-deps | ||
| # optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios. | ||
| pipx install centralcli[speedups] --force # force if centralcli was already installed and you are just adding speedups | ||
| ``` | ||
| Then to Upgrade `centralcli` | ||
| ```bash | ||
| pip upgrade centralcli | ||
| ``` | ||
| ### pip (manually install in virtual environment) | ||
| > The example below is for Debian based systems, where `apt` is referenced but should be easy to translate to other OSs given you have some familiarity with the package management commands (i.e. `dnf`). On Windows python should install with pip. The pip commands are still valid. | ||
| ```shell | ||
| # (Debian) If you don't have pip | ||
| sudo apt update | ||
| sudo apt install python3-pip | ||
| sudo apt install python3-virtualenv | ||
| # create a directory to store the venv in | ||
| cd ~ # ensure you are in your home dir | ||
| mkdir .venvs # creates hidden .venvs dir to store venv in | ||
| cd .venvs # change to that directory | ||
| export DEB_PYTHON_INSTALL_LAYOUT='deb' # Just ensures the directory structure for simpler instructions (Ubuntu 22.04 changed the dir layout of venvs without it) | ||
| python3 -m virtualenv centralcli --prompt cencli # prompt is optional | ||
| source centralcli/bin/activate # activates the venv | ||
| # Install centralcli | ||
| pip install centralcli | ||
| # optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios. | ||
| pip install centralcli[speedups] | ||
| which centralcli # Should return ~/.venvs/centralcli/bin/centralcli | ||
| # for BASH shell Update .bashrc to update PATH on login (keep the single quotes) | ||
| echo 'export PATH="$PATH:$HOME/.venvs/centralcli/bin"' >> ~/.bashrc | ||
| # for zsh or others, do the equivalent... i.e. update .zshrc in a similar manner | ||
| ``` | ||
| Then to upgrade: | ||
| ```bash | ||
| ~/.venvs/centralcli/bin/pip install -U centralcli | ||
| ``` | ||
| ### pip (in system python environment) | ||
| [Requires python3.10 or above and pip.](#if-you-don-t-have-python) | ||
| It's recommended to use the `uv` install method above, however if you don't use a lot of python apps (meaning a dependency conflict with other apps is not a concern). Then simply installing via pip is possible (*albeit not recommended*). | ||
| > This method is primarily feasible on Windows, as current versions of many Linux distributions do not allow installing apps in the system python environment. | ||
| ```bash | ||
| pip install centralcli | ||
| # optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios. | ||
| pip install centralcli[speedups] | ||
| ``` | ||
| Then to upgrade: | ||
| ```bash | ||
| pip install -U centralcli | ||
| ``` | ||
| ### if you don't have python | ||
| - You can get it for any platform @ [https://www.python.org](https://www.python.org) | ||
| - On Windows 10 it's also available in the Windows store, and via winget. | ||
| ## Configuration | ||
@@ -221,0 +146,0 @@ ✨ pre-populating the config as described below is optional. Central CLI will prompt for the information it needs on first run if no config exists. |
+16
-7
| [project] | ||
| name = "centralcli" | ||
| version = "9.1.0" | ||
| version = "9.2.3" | ||
| description = "A CLI for interacting with Aruba Central (Cloud Management Platform). Facilitates bulk imports, exports, reporting. A handy tool if you have devices managed by Aruba Central." | ||
@@ -66,9 +66,10 @@ license = "MIT" | ||
| [project.optional-dependencies] | ||
| hook-proxy = [ "fastapi", "uvicorn", "psutil"] | ||
| hook-proxy = ["fastapi", "uvicorn", "psutil"] | ||
| speedups = [ | ||
| "aiodns>=3.2.0;platform_system != 'Windows'", | ||
| "aiodns>=3.2.0;platform_system != 'Windows'", | ||
| "pycares<5.0.0;platform_system != 'Windows'", | ||
| "Brotli;platform_python_implementation == \"CPython\"", | ||
| "Brotli;platform_python_implementation == \"CPython\"", | ||
| "brotlicffi;platform_python_implementation != 'CPython'" | ||
| ] | ||
| xlsx = ["tablib[xlsx]"] | ||
@@ -90,2 +91,3 @@ [project.urls] | ||
| jsonref = ">=1.1.0" | ||
| nox-uv = ">=0.6.3" | ||
@@ -115,6 +117,4 @@ [build-system] | ||
| [dependency-groups] | ||
| nox = [ | ||
| test = [ | ||
| "nox-uv>=0.6.3", | ||
| ] | ||
| test = [ | ||
| "pytest", | ||
@@ -128,2 +128,11 @@ "pytest-cov", | ||
| ] | ||
| dev = [ | ||
| "ruff>=0.8.0", | ||
| "pytest>=6", | ||
| "pytest-cov>=6.2.1", | ||
| "nox>=2025.5.1", | ||
| "pytest-asyncio>=1.2.0", | ||
| "jsonref>=1.1.0", | ||
| "nox-uv>=0.6.3", | ||
| ] | ||
@@ -130,0 +139,0 @@ [tool.pytest.ini_options] |
+8
-85
@@ -6,2 +6,3 @@ # Aruba Central API CLI | ||
| [](https://raw.githubusercontent.com/Pack3tL0ss/central-api-cli/master/docs/img/coverage.svg) | ||
| [](https://www.codefactor.io/repository/github/pack3tl0ss/central-api-cli/overview/master) | ||
| [](https://pepy.tech/project/centralcli) | ||
@@ -42,3 +43,3 @@ [](https://pypi.org/project/centralcli/) | ||
| ### uv | ||
| #### Install `uv` on Linx/Mac: | ||
| #### Install `uv` on Linux/Mac: | ||
| ```bash | ||
@@ -61,9 +62,11 @@ # quick install on MacOS/Linux | ||
| ```bash | ||
| # install centralcli (showing command with optional `--python 3.13` flag see note below) | ||
| uv tool install --python 3.13 centralcli | ||
| # install centralcli (showing command with optional `--python 3.14` flag see note below) | ||
| uv tool install --python 3.14 centralcli | ||
| ``` | ||
| > Including `--python 3.13` above is optional provided you have python installed on the system. However uv facilitates deploying a stand-alone version of python with the package. There are likely speed advantages to doing this if your current version is dated (check with `python -V`) | ||
| > Including `--python 3.14` above is optional. However uv facilitates deploying a stand-alone version of python with the package. There are likely speed advantages to doing this if your current version is dated (check with `python -V`) | ||
| > Run `cencli --install-completion` to enable auto/tab completion in your shell once installed. *Shell needs to be restarted to take effect*. | ||
| Then to Upgrade `centralcli` | ||
@@ -74,84 +77,4 @@ ```bash | ||
| ### pipx | ||
| `centralcli` can also be installed via pipx, similar to `uv` pipx will install `centralcli` in an isolated environment, and expose the `cencli` command in PATH. | ||
| > Alternative (less preferred) installation methods using pipx or pip are described [here](other_install.md). | ||
| > The first section below is for Debian based systems refer to [pipx documentation](https://pipx.pypa.io/stable/installation/) for instructions on other OSs. | ||
| ```bash | ||
| # install pipx (Debian) | ||
| sudo apt update | ||
| sudo apt install pipx | ||
| pipx ensurepath | ||
| # install central CLI | ||
| pipx install centralcli --include-deps | ||
| # optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios. | ||
| pipx install centralcli[speedups] --force # force if centralcli was already installed and you are just adding speedups | ||
| ``` | ||
| Then to Upgrade `centralcli` | ||
| ```bash | ||
| pip upgrade centralcli | ||
| ``` | ||
| ### pip (manually install in virtual environment) | ||
| > The example below is for Debian based systems, where `apt` is referenced but should be easy to translate to other OSs given you have some familiarity with the package management commands (i.e. `dnf`). On Windows python should install with pip. The pip commands are still valid. | ||
| ```shell | ||
| # (Debian) If you don't have pip | ||
| sudo apt update | ||
| sudo apt install python3-pip | ||
| sudo apt install python3-virtualenv | ||
| # create a directory to store the venv in | ||
| cd ~ # ensure you are in your home dir | ||
| mkdir .venvs # creates hidden .venvs dir to store venv in | ||
| cd .venvs # change to that directory | ||
| export DEB_PYTHON_INSTALL_LAYOUT='deb' # Just ensures the directory structure for simpler instructions (Ubuntu 22.04 changed the dir layout of venvs without it) | ||
| python3 -m virtualenv centralcli --prompt cencli # prompt is optional | ||
| source centralcli/bin/activate # activates the venv | ||
| # Install centralcli | ||
| pip install centralcli | ||
| # optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios. | ||
| pip install centralcli[speedups] | ||
| which centralcli # Should return ~/.venvs/centralcli/bin/centralcli | ||
| # for BASH shell Update .bashrc to update PATH on login (keep the single quotes) | ||
| echo 'export PATH="$PATH:$HOME/.venvs/centralcli/bin"' >> ~/.bashrc | ||
| # for zsh or others, do the equivalent... i.e. update .zshrc in a similar manner | ||
| ``` | ||
| Then to upgrade: | ||
| ```bash | ||
| ~/.venvs/centralcli/bin/pip install -U centralcli | ||
| ``` | ||
| ### pip (in system python environment) | ||
| [Requires python3.10 or above and pip.](#if-you-don-t-have-python) | ||
| It's recommended to use the `uv` install method above, however if you don't use a lot of python apps (meaning a dependency conflict with other apps is not a concern). Then simply installing via pip is possible (*albeit not recommended*). | ||
| > This method is primarily feasible on Windows, as current versions of many Linux distributions do not allow installing apps in the system python environment. | ||
| ```bash | ||
| pip install centralcli | ||
| # optional install speedups for centralcli (this pulls in additional optional dependencies, that can improve performance.) Minimal impact in most scenarios. | ||
| pip install centralcli[speedups] | ||
| ``` | ||
| Then to upgrade: | ||
| ```bash | ||
| pip install -U centralcli | ||
| ``` | ||
| ### if you don't have python | ||
| - You can get it for any platform @ [https://www.python.org](https://www.python.org) | ||
| - On Windows 10 it's also available in the Windows store, and via winget. | ||
| ## Configuration | ||
@@ -158,0 +81,0 @@ ✨ pre-populating the config as described below is optional. Central CLI will prompt for the information it needs on first run if no config exists. |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
4884761
0.92%116
1.75%112392
0.79%