telnetlib3
Advanced tools
| """ | ||
| ATASCII (Atari 8-bit) encoding. | ||
| ATASCII is the character encoding used by Atari 8-bit computers (400, 800, | ||
| XL, XE series). It shares the printable ASCII range 0x20-0x7A with standard | ||
| ASCII, but replaces control codes 0x00-0x1F with graphics characters and uses | ||
| 0x80-0xFF as inverse-video variants. | ||
| Mapping sources: | ||
| - https://www.kreativekorp.com/charset/map/atascii/ | ||
| - https://github.com/JSJvR/atari-8-bit-utils | ||
| The inverse-video range (0x80-0xFF) maps to the same Unicode characters as | ||
| the corresponding normal byte (byte & 0x7F), except where a distinct glyph | ||
| exists (e.g. complementary block elements). This makes encoding lossy for | ||
| inverse bytes, which is the same trade-off as the PETSCII codec. | ||
| Notable: 0x9B is the ATASCII end-of-line character (mapped to U+000A LF). | ||
| """ | ||
| # std imports | ||
| import codecs | ||
| from typing import Dict, Tuple, Union | ||
| # Decoding Table -- ATASCII, 256 entries. | ||
| # | ||
| # 0x00-0x1F : graphics characters (heart, box drawing, triangles, etc.) | ||
| # 0x20-0x5F : standard ASCII (space, digits, uppercase, punctuation) | ||
| # 0x60 : diamond suit | ||
| # 0x61-0x7A : lowercase a-z | ||
| # 0x7B-0x7F : spade suit, pipe, clear-screen, backspace, tab glyphs | ||
| # 0x80-0xFF : inverse video (mostly same glyphs as 0x00-0x7F) | ||
| DECODING_TABLE = ( | ||
| # 0x00-0x1F: Graphics characters | ||
| '\u2665' # 0x00 BLACK HEART SUIT | ||
| '\u251c' # 0x01 BOX DRAWINGS LIGHT VERTICAL AND RIGHT | ||
| '\u23b9' # 0x02 RIGHT VERTICAL BOX LINE | ||
| '\u2518' # 0x03 BOX DRAWINGS LIGHT UP AND LEFT | ||
| '\u2524' # 0x04 BOX DRAWINGS LIGHT VERTICAL AND LEFT | ||
| '\u2510' # 0x05 BOX DRAWINGS LIGHT DOWN AND LEFT | ||
| '\u2571' # 0x06 BOX DRAWINGS LIGHT DIAGONAL UPPER RIGHT TO LOWER LEFT | ||
| '\u2572' # 0x07 BOX DRAWINGS LIGHT DIAGONAL UPPER LEFT TO LOWER RIGHT | ||
| '\u25e2' # 0x08 BLACK LOWER RIGHT TRIANGLE | ||
| '\u2597' # 0x09 QUADRANT LOWER RIGHT | ||
| '\u25e3' # 0x0A BLACK LOWER LEFT TRIANGLE | ||
| '\u259d' # 0x0B QUADRANT UPPER RIGHT | ||
| '\u2598' # 0x0C QUADRANT UPPER LEFT | ||
| '\U0001fb82' # 0x0D UPPER ONE QUARTER BLOCK | ||
| '\u2582' # 0x0E LOWER ONE QUARTER BLOCK | ||
| '\u2596' # 0x0F QUADRANT LOWER LEFT | ||
| '\u2663' # 0x10 BLACK CLUB SUIT | ||
| '\u250c' # 0x11 BOX DRAWINGS LIGHT DOWN AND RIGHT | ||
| '\u2500' # 0x12 BOX DRAWINGS LIGHT HORIZONTAL | ||
| '\u253c' # 0x13 BOX DRAWINGS LIGHT VERTICAL AND HORIZONTAL | ||
| '\u25cf' # 0x14 BLACK CIRCLE | ||
| '\u2584' # 0x15 LOWER HALF BLOCK | ||
| '\u258e' # 0x16 LEFT ONE QUARTER BLOCK | ||
| '\u252c' # 0x17 BOX DRAWINGS LIGHT DOWN AND HORIZONTAL | ||
| '\u2534' # 0x18 BOX DRAWINGS LIGHT UP AND HORIZONTAL | ||
| '\u258c' # 0x19 LEFT HALF BLOCK | ||
| '\u2514' # 0x1A BOX DRAWINGS LIGHT UP AND RIGHT | ||
| '\u241b' # 0x1B SYMBOL FOR ESCAPE | ||
| '\u2191' # 0x1C UPWARDS ARROW (cursor up) | ||
| '\u2193' # 0x1D DOWNWARDS ARROW (cursor down) | ||
| '\u2190' # 0x1E LEFTWARDS ARROW (cursor left) | ||
| '\u2192' # 0x1F RIGHTWARDS ARROW (cursor right) | ||
| # 0x20-0x5F: Standard ASCII | ||
| ' ' # 0x20 SPACE | ||
| '!' # 0x21 | ||
| '"' # 0x22 | ||
| '#' # 0x23 | ||
| '$' # 0x24 | ||
| '%' # 0x25 | ||
| '&' # 0x26 | ||
| "'" # 0x27 | ||
| '(' # 0x28 | ||
| ')' # 0x29 | ||
| '*' # 0x2A | ||
| '+' # 0x2B | ||
| ',' # 0x2C | ||
| '-' # 0x2D | ||
| '.' # 0x2E | ||
| '/' # 0x2F | ||
| '0' # 0x30 | ||
| '1' # 0x31 | ||
| '2' # 0x32 | ||
| '3' # 0x33 | ||
| '4' # 0x34 | ||
| '5' # 0x35 | ||
| '6' # 0x36 | ||
| '7' # 0x37 | ||
| '8' # 0x38 | ||
| '9' # 0x39 | ||
| ':' # 0x3A | ||
| ';' # 0x3B | ||
| '<' # 0x3C | ||
| '=' # 0x3D | ||
| '>' # 0x3E | ||
| '?' # 0x3F | ||
| '@' # 0x40 | ||
| 'A' # 0x41 | ||
| 'B' # 0x42 | ||
| 'C' # 0x43 | ||
| 'D' # 0x44 | ||
| 'E' # 0x45 | ||
| 'F' # 0x46 | ||
| 'G' # 0x47 | ||
| 'H' # 0x48 | ||
| 'I' # 0x49 | ||
| 'J' # 0x4A | ||
| 'K' # 0x4B | ||
| 'L' # 0x4C | ||
| 'M' # 0x4D | ||
| 'N' # 0x4E | ||
| 'O' # 0x4F | ||
| 'P' # 0x50 | ||
| 'Q' # 0x51 | ||
| 'R' # 0x52 | ||
| 'S' # 0x53 | ||
| 'T' # 0x54 | ||
| 'U' # 0x55 | ||
| 'V' # 0x56 | ||
| 'W' # 0x57 | ||
| 'X' # 0x58 | ||
| 'Y' # 0x59 | ||
| 'Z' # 0x5A | ||
| '[' # 0x5B | ||
| '\\' # 0x5C | ||
| ']' # 0x5D | ||
| '^' # 0x5E | ||
| '_' # 0x5F | ||
| # 0x60-0x7F: Lowercase + special glyphs | ||
| '\u2666' # 0x60 BLACK DIAMOND SUIT | ||
| 'a' # 0x61 | ||
| 'b' # 0x62 | ||
| 'c' # 0x63 | ||
| 'd' # 0x64 | ||
| 'e' # 0x65 | ||
| 'f' # 0x66 | ||
| 'g' # 0x67 | ||
| 'h' # 0x68 | ||
| 'i' # 0x69 | ||
| 'j' # 0x6A | ||
| 'k' # 0x6B | ||
| 'l' # 0x6C | ||
| 'm' # 0x6D | ||
| 'n' # 0x6E | ||
| 'o' # 0x6F | ||
| 'p' # 0x70 | ||
| 'q' # 0x71 | ||
| 'r' # 0x72 | ||
| 's' # 0x73 | ||
| 't' # 0x74 | ||
| 'u' # 0x75 | ||
| 'v' # 0x76 | ||
| 'w' # 0x77 | ||
| 'x' # 0x78 | ||
| 'y' # 0x79 | ||
| 'z' # 0x7A | ||
| '\u2660' # 0x7B BLACK SPADE SUIT | ||
| '|' # 0x7C VERTICAL LINE | ||
| '\u21b0' # 0x7D UPWARDS ARROW WITH TIP LEFTWARDS (clear screen) | ||
| '\u25c0' # 0x7E BLACK LEFT-POINTING TRIANGLE (backspace) | ||
| '\u25b6' # 0x7F BLACK RIGHT-POINTING TRIANGLE (tab) | ||
| # 0x80-0xFF: Inverse video range | ||
| # Bytes with distinct glyphs get their own Unicode mapping; | ||
| # the rest share the same character as (byte & 0x7F). | ||
| '\u2665' # 0x80 = inverse of 0x00 (heart) | ||
| '\u251c' # 0x81 = inverse of 0x01 | ||
| '\u258a' # 0x82 LEFT THREE QUARTERS BLOCK (distinct) | ||
| '\u2518' # 0x83 = inverse of 0x03 | ||
| '\u2524' # 0x84 = inverse of 0x04 | ||
| '\u2510' # 0x85 = inverse of 0x05 | ||
| '\u2571' # 0x86 = inverse of 0x06 | ||
| '\u2572' # 0x87 = inverse of 0x07 | ||
| '\u25e4' # 0x88 BLACK UPPER LEFT TRIANGLE (distinct) | ||
| '\u259b' # 0x89 QUADRANT UPPER LEFT AND UPPER RIGHT AND LOWER LEFT (distinct) | ||
| '\u25e5' # 0x8A BLACK UPPER RIGHT TRIANGLE (distinct) | ||
| '\u2599' # 0x8B QUADRANT UPPER LEFT AND LOWER LEFT AND LOWER RIGHT (distinct) | ||
| '\u259f' # 0x8C QUADRANT UPPER RIGHT AND LOWER LEFT AND LOWER RIGHT (distinct) | ||
| '\u2586' # 0x8D LOWER THREE QUARTERS BLOCK (distinct) | ||
| '\U0001fb85' # 0x8E UPPER THREE QUARTERS BLOCK (distinct) | ||
| '\u259c' # 0x8F QUADRANT UPPER LEFT AND UPPER RIGHT AND LOWER RIGHT (distinct) | ||
| '\u2663' # 0x90 = inverse of 0x10 (club) | ||
| '\u250c' # 0x91 = inverse of 0x11 | ||
| '\u2500' # 0x92 = inverse of 0x12 | ||
| '\u253c' # 0x93 = inverse of 0x13 | ||
| '\u25d8' # 0x94 INVERSE BULLET (distinct) | ||
| '\u2580' # 0x95 UPPER HALF BLOCK (distinct) | ||
| '\U0001fb8a' # 0x96 RIGHT THREE QUARTERS BLOCK (distinct) | ||
| '\u252c' # 0x97 = inverse of 0x17 | ||
| '\u2534' # 0x98 = inverse of 0x18 | ||
| '\u2590' # 0x99 RIGHT HALF BLOCK (distinct) | ||
| '\u2514' # 0x9A = inverse of 0x1A | ||
| '\n' # 0x9B ATASCII END OF LINE | ||
| '\u2191' # 0x9C = inverse of 0x1C (up arrow) | ||
| '\u2193' # 0x9D = inverse of 0x1D (down arrow) | ||
| '\u2190' # 0x9E = inverse of 0x1E (left arrow) | ||
| '\u2192' # 0x9F = inverse of 0x1F (right arrow) | ||
| '\u2588' # 0xA0 FULL BLOCK (distinct) | ||
| '!' # 0xA1 = inverse of 0x21 | ||
| '"' # 0xA2 = inverse of 0x22 | ||
| '#' # 0xA3 = inverse of 0x23 | ||
| '$' # 0xA4 = inverse of 0x24 | ||
| '%' # 0xA5 = inverse of 0x25 | ||
| '&' # 0xA6 = inverse of 0x26 | ||
| "'" # 0xA7 = inverse of 0x27 | ||
| '(' # 0xA8 = inverse of 0x28 | ||
| ')' # 0xA9 = inverse of 0x29 | ||
| '*' # 0xAA = inverse of 0x2A | ||
| '+' # 0xAB = inverse of 0x2B | ||
| ',' # 0xAC = inverse of 0x2C | ||
| '-' # 0xAD = inverse of 0x2D | ||
| '.' # 0xAE = inverse of 0x2E | ||
| '/' # 0xAF = inverse of 0x2F | ||
| '0' # 0xB0 = inverse of 0x30 | ||
| '1' # 0xB1 = inverse of 0x31 | ||
| '2' # 0xB2 = inverse of 0x32 | ||
| '3' # 0xB3 = inverse of 0x33 | ||
| '4' # 0xB4 = inverse of 0x34 | ||
| '5' # 0xB5 = inverse of 0x35 | ||
| '6' # 0xB6 = inverse of 0x36 | ||
| '7' # 0xB7 = inverse of 0x37 | ||
| '8' # 0xB8 = inverse of 0x38 | ||
| '9' # 0xB9 = inverse of 0x39 | ||
| ':' # 0xBA = inverse of 0x3A | ||
| ';' # 0xBB = inverse of 0x3B | ||
| '<' # 0xBC = inverse of 0x3C | ||
| '=' # 0xBD = inverse of 0x3D | ||
| '>' # 0xBE = inverse of 0x3E | ||
| '?' # 0xBF = inverse of 0x3F | ||
| '@' # 0xC0 = inverse of 0x40 | ||
| 'A' # 0xC1 = inverse of 0x41 | ||
| 'B' # 0xC2 = inverse of 0x42 | ||
| 'C' # 0xC3 = inverse of 0x43 | ||
| 'D' # 0xC4 = inverse of 0x44 | ||
| 'E' # 0xC5 = inverse of 0x45 | ||
| 'F' # 0xC6 = inverse of 0x46 | ||
| 'G' # 0xC7 = inverse of 0x47 | ||
| 'H' # 0xC8 = inverse of 0x48 | ||
| 'I' # 0xC9 = inverse of 0x49 | ||
| 'J' # 0xCA = inverse of 0x4A | ||
| 'K' # 0xCB = inverse of 0x4B | ||
| 'L' # 0xCC = inverse of 0x4C | ||
| 'M' # 0xCD = inverse of 0x4D | ||
| 'N' # 0xCE = inverse of 0x4E | ||
| 'O' # 0xCF = inverse of 0x4F | ||
| 'P' # 0xD0 = inverse of 0x50 | ||
| 'Q' # 0xD1 = inverse of 0x51 | ||
| 'R' # 0xD2 = inverse of 0x52 | ||
| 'S' # 0xD3 = inverse of 0x53 | ||
| 'T' # 0xD4 = inverse of 0x54 | ||
| 'U' # 0xD5 = inverse of 0x55 | ||
| 'V' # 0xD6 = inverse of 0x56 | ||
| 'W' # 0xD7 = inverse of 0x57 | ||
| 'X' # 0xD8 = inverse of 0x58 | ||
| 'Y' # 0xD9 = inverse of 0x59 | ||
| 'Z' # 0xDA = inverse of 0x5A | ||
| '[' # 0xDB = inverse of 0x5B | ||
| '\\' # 0xDC = inverse of 0x5C | ||
| ']' # 0xDD = inverse of 0x5D | ||
| '^' # 0xDE = inverse of 0x5E | ||
| '_' # 0xDF = inverse of 0x5F | ||
| '\u2666' # 0xE0 = inverse of 0x60 (diamond) | ||
| 'a' # 0xE1 = inverse of 0x61 | ||
| 'b' # 0xE2 = inverse of 0x62 | ||
| 'c' # 0xE3 = inverse of 0x63 | ||
| 'd' # 0xE4 = inverse of 0x64 | ||
| 'e' # 0xE5 = inverse of 0x65 | ||
| 'f' # 0xE6 = inverse of 0x66 | ||
| 'g' # 0xE7 = inverse of 0x67 | ||
| 'h' # 0xE8 = inverse of 0x68 | ||
| 'i' # 0xE9 = inverse of 0x69 | ||
| 'j' # 0xEA = inverse of 0x6A | ||
| 'k' # 0xEB = inverse of 0x6B | ||
| 'l' # 0xEC = inverse of 0x6C | ||
| 'm' # 0xED = inverse of 0x6D | ||
| 'n' # 0xEE = inverse of 0x6E | ||
| 'o' # 0xEF = inverse of 0x6F | ||
| 'p' # 0xF0 = inverse of 0x70 | ||
| 'q' # 0xF1 = inverse of 0x71 | ||
| 'r' # 0xF2 = inverse of 0x72 | ||
| 's' # 0xF3 = inverse of 0x73 | ||
| 't' # 0xF4 = inverse of 0x74 | ||
| 'u' # 0xF5 = inverse of 0x75 | ||
| 'v' # 0xF6 = inverse of 0x76 | ||
| 'w' # 0xF7 = inverse of 0x77 | ||
| 'x' # 0xF8 = inverse of 0x78 | ||
| 'y' # 0xF9 = inverse of 0x79 | ||
| 'z' # 0xFA = inverse of 0x7A | ||
| '\u2660' # 0xFB = inverse of 0x7B (spade) | ||
| '|' # 0xFC = inverse of 0x7C | ||
| '\u21b0' # 0xFD = inverse of 0x7D (clear screen) | ||
| '\u25c0' # 0xFE = inverse of 0x7E (backspace) | ||
| '\u25b6' # 0xFF = inverse of 0x7F (tab) | ||
| ) | ||
| assert len(DECODING_TABLE) == 256 | ||
| def _normalize_eol(text: str) -> str: | ||
| r""" | ||
| Normalize CR and CRLF to LF for ATASCII encoding. | ||
| ATASCII uses byte 0x9B as its end-of-line character, mapped to ``\n`` (U+000A). Standard CR | ||
| (U+000D) has no native ATASCII representation (byte 0x0D is a graphics character), so CR and | ||
| CRLF are both folded to LF before charmap encoding. | ||
| """ | ||
| return text.replace('\r\n', '\n').replace('\r', '\n') | ||
| class Codec(codecs.Codec): | ||
| """ATASCII character map codec.""" | ||
| def encode( # pylint: disable=redefined-builtin | ||
| self, input: str, errors: str = 'strict' | ||
| ) -> Tuple[bytes, int]: | ||
| """Encode input string using ATASCII character map.""" | ||
| input = _normalize_eol(input) | ||
| return codecs.charmap_encode(input, errors, ENCODING_TABLE) | ||
| def decode( # pylint: disable=redefined-builtin | ||
| self, input: bytes, errors: str = 'strict' | ||
| ) -> Tuple[str, int]: | ||
| """Decode input bytes using ATASCII character map.""" | ||
| return codecs.charmap_decode( | ||
| input, errors, DECODING_TABLE # type: ignore[arg-type] | ||
| ) | ||
| class IncrementalEncoder(codecs.IncrementalEncoder): | ||
| """ATASCII incremental encoder with CR/CRLF → LF normalization.""" | ||
| def __init__(self, errors: str = 'strict') -> None: | ||
| """Initialize encoder with pending CR state.""" | ||
| super().__init__(errors) | ||
| self._pending_cr = False | ||
| def encode( # pylint: disable=redefined-builtin | ||
| self, input: str, final: bool = False | ||
| ) -> bytes: | ||
| """Encode input string incrementally.""" | ||
| if self._pending_cr: | ||
| input = '\r' + input | ||
| self._pending_cr = False | ||
| if not final and input.endswith('\r'): | ||
| input = input[:-1] | ||
| self._pending_cr = True | ||
| input = _normalize_eol(input) | ||
| return codecs.charmap_encode(input, self.errors, ENCODING_TABLE)[0] | ||
| def reset(self) -> None: | ||
| """Reset encoder state.""" | ||
| self._pending_cr = False | ||
| def getstate(self) -> int: | ||
| """Return encoder state as integer.""" | ||
| return int(self._pending_cr) | ||
| def setstate(self, state: Union[int, str]) -> None: | ||
| """Restore encoder state from integer.""" | ||
| self._pending_cr = bool(state) | ||
| class IncrementalDecoder(codecs.IncrementalDecoder): | ||
| """ATASCII incremental decoder.""" | ||
| def decode( # type: ignore[override] # pylint: disable=redefined-builtin | ||
| self, input: bytes, final: bool = False | ||
| ) -> str: | ||
| """Decode input bytes incrementally.""" | ||
| return codecs.charmap_decode( | ||
| input, self.errors, DECODING_TABLE # type: ignore[arg-type] | ||
| )[0] | ||
| class StreamWriter(Codec, codecs.StreamWriter): | ||
| """ATASCII stream writer.""" | ||
| class StreamReader(Codec, codecs.StreamReader): | ||
| """ATASCII stream reader.""" | ||
| def getregentry() -> codecs.CodecInfo: | ||
| """Return the codec registry entry.""" | ||
| return codecs.CodecInfo( | ||
| name='atascii', | ||
| encode=Codec().encode, | ||
| decode=Codec().decode, # type: ignore[arg-type] | ||
| incrementalencoder=IncrementalEncoder, | ||
| incrementaldecoder=IncrementalDecoder, | ||
| streamreader=StreamReader, | ||
| streamwriter=StreamWriter, | ||
| ) | ||
| def getaliases() -> Tuple[str, ...]: | ||
| """Return codec aliases.""" | ||
| return ('atari8bit', 'atari_8bit') | ||
| # Build encoding table preferring normal bytes (0x00-0x7F) over inverse | ||
| # (0x80-0xFF). codecs.charmap_build() picks the last occurrence, which | ||
| # maps printable characters to the inverse-video range. We fix this by | ||
| # overwriting with normal-range entries so they take priority. | ||
| # Exception: '\n' must still encode to 0x9B (ATASCII EOL), not 0x0A. | ||
| def _build_encoding_table() -> Dict[int, int]: | ||
| table: Dict[int, int] = codecs.charmap_build(DECODING_TABLE) | ||
| for byte_val in range(0x80): | ||
| table[ord(DECODING_TABLE[byte_val])] = byte_val | ||
| # '\n' at 0x0A must encode to 0x9B (ATASCII EOL), not 0x0A | ||
| table[ord('\n')] = 0x9B | ||
| return table | ||
| ENCODING_TABLE = _build_encoding_table() |
| """Tests for the ATASCII (Atari 8-bit) codec.""" | ||
| # std imports | ||
| import codecs | ||
| # 3rd party | ||
| import pytest | ||
| # local | ||
| import telnetlib3 # noqa: F401 - registers codecs | ||
| from telnetlib3.encodings import atascii | ||
| def test_codec_lookup(): | ||
| info = codecs.lookup('atascii') | ||
| assert info.name == 'atascii' | ||
| @pytest.mark.parametrize("alias", ['atari8bit', 'atari_8bit']) | ||
| def test_codec_aliases(alias): | ||
| info = codecs.lookup(alias) | ||
| assert info.name == 'atascii' | ||
| def test_ascii_letters_uppercase(): | ||
| data = bytes(range(0x41, 0x5B)) | ||
| assert data.decode('atascii') == 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' | ||
| def test_ascii_letters_lowercase(): | ||
| data = bytes(range(0x61, 0x7B)) | ||
| assert data.decode('atascii') == 'abcdefghijklmnopqrstuvwxyz' | ||
| def test_digits(): | ||
| data = bytes(range(0x30, 0x3A)) | ||
| assert data.decode('atascii') == '0123456789' | ||
| def test_space(): | ||
| assert b'\x20'.decode('atascii') == ' ' | ||
| @pytest.mark.parametrize("byte_val,expected", [ | ||
| pytest.param(0x00, '\u2665', id='heart'), | ||
| pytest.param(0x01, '\u251c', id='box_vert_right'), | ||
| pytest.param(0x08, '\u25e2', id='lower_right_triangle'), | ||
| pytest.param(0x09, '\u2597', id='quadrant_lower_right'), | ||
| pytest.param(0x10, '\u2663', id='club'), | ||
| pytest.param(0x12, '\u2500', id='horizontal_line'), | ||
| pytest.param(0x14, '\u25cf', id='black_circle'), | ||
| pytest.param(0x15, '\u2584', id='lower_half_block'), | ||
| pytest.param(0x19, '\u258c', id='left_half_block'), | ||
| pytest.param(0x1B, '\u241b', id='symbol_for_escape'), | ||
| ]) | ||
| def test_graphics_chars(byte_val, expected): | ||
| assert bytes([byte_val]).decode('atascii') == expected | ||
| @pytest.mark.parametrize("byte_val,expected", [ | ||
| pytest.param(0x1C, '\u2191', id='cursor_up'), | ||
| pytest.param(0x1D, '\u2193', id='cursor_down'), | ||
| pytest.param(0x1E, '\u2190', id='cursor_left'), | ||
| pytest.param(0x1F, '\u2192', id='cursor_right'), | ||
| ]) | ||
| def test_cursor_arrows(byte_val, expected): | ||
| assert bytes([byte_val]).decode('atascii') == expected | ||
| @pytest.mark.parametrize("byte_val,expected", [ | ||
| pytest.param(0x60, '\u2666', id='diamond'), | ||
| pytest.param(0x7B, '\u2660', id='spade'), | ||
| pytest.param(0x7C, '|', id='pipe'), | ||
| pytest.param(0x7D, '\u21b0', id='clear_screen'), | ||
| pytest.param(0x7E, '\u25c0', id='backspace_triangle'), | ||
| pytest.param(0x7F, '\u25b6', id='tab_triangle'), | ||
| ]) | ||
| def test_special_glyphs(byte_val, expected): | ||
| assert bytes([byte_val]).decode('atascii') == expected | ||
| def test_atascii_eol(): | ||
| assert b'\x9b'.decode('atascii') == '\n' | ||
| @pytest.mark.parametrize("byte_val,expected", [ | ||
| pytest.param(0x82, '\u258a', id='left_three_quarters'), | ||
| pytest.param(0x88, '\u25e4', id='upper_left_triangle'), | ||
| pytest.param(0x89, '\u259b', id='quadrant_UL_UR_LL'), | ||
| pytest.param(0x8A, '\u25e5', id='upper_right_triangle'), | ||
| pytest.param(0x8B, '\u2599', id='quadrant_UL_LL_LR'), | ||
| pytest.param(0x8C, '\u259f', id='quadrant_UR_LL_LR'), | ||
| pytest.param(0x8D, '\u2586', id='lower_three_quarters'), | ||
| pytest.param(0x8E, '\U0001fb85', id='upper_three_quarters'), | ||
| pytest.param(0x8F, '\u259c', id='quadrant_UL_UR_LR'), | ||
| pytest.param(0x94, '\u25d8', id='inverse_bullet'), | ||
| pytest.param(0x95, '\u2580', id='upper_half_block'), | ||
| pytest.param(0x96, '\U0001fb8a', id='right_three_quarters'), | ||
| pytest.param(0x99, '\u2590', id='right_half_block'), | ||
| pytest.param(0xA0, '\u2588', id='full_block'), | ||
| ]) | ||
| def test_inverse_distinct_glyphs(byte_val, expected): | ||
| assert bytes([byte_val]).decode('atascii') == expected | ||
| def test_inverse_shared_glyphs(): | ||
| for byte_val in (0x80, 0x81, 0x83, 0x84, 0x85, 0x86, 0x87): | ||
| normal = bytes([byte_val & 0x7F]).decode('atascii') | ||
| inverse = bytes([byte_val]).decode('atascii') | ||
| assert inverse == normal | ||
| def test_inverse_ascii_range(): | ||
| for byte_val in range(0xA1, 0xFB): | ||
| normal = bytes([byte_val & 0x7F]).decode('atascii') | ||
| inverse = bytes([byte_val]).decode('atascii') | ||
| assert inverse == normal | ||
| def test_full_decode_no_crash(): | ||
| data = bytes(range(256)) | ||
| result = data.decode('atascii') | ||
| assert len(result) == 256 | ||
| def test_encode_eol(): | ||
| encoded, length = codecs.lookup('atascii').encode('\n') | ||
| assert encoded == b'\x9b' | ||
| assert length == 1 | ||
| def test_encode_unique_chars(): | ||
| encoded, _ = codecs.lookup('atascii').encode('\u258a') | ||
| assert encoded == b'\x82' | ||
| encoded, _ = codecs.lookup('atascii').encode('\u25d8') | ||
| assert encoded == b'\x94' | ||
| encoded, _ = codecs.lookup('atascii').encode('\u2588') | ||
| assert encoded == b'\xa0' | ||
| def test_encode_charmap_prefers_normal_byte(): | ||
| encoded, _ = codecs.lookup('atascii').encode('\u2665') | ||
| assert encoded == b'\x00' | ||
| encoded, _ = codecs.lookup('atascii').encode('A') | ||
| assert encoded == b'\x41' | ||
| def test_incremental_decoder(): | ||
| decoder = codecs.getincrementaldecoder('atascii')() | ||
| assert decoder.decode(b'\x00', False) == '\u2665' | ||
| assert decoder.decode(b'AB', True) == 'AB' | ||
| def test_incremental_encoder(): | ||
| encoder = codecs.getincrementalencoder('atascii')() | ||
| assert encoder.encode('\u258a', False) == b'\x82' | ||
| assert encoder.encode('\n', True) == b'\x9b' | ||
| def test_strict_error_on_unencodable(): | ||
| with pytest.raises(UnicodeEncodeError): | ||
| '\u00e9'.encode('atascii') | ||
| def test_replace_error_mode(): | ||
| result = '\u00e9'.encode('atascii', errors='replace') | ||
| assert result == b'\x3f' | ||
| def test_ignore_error_mode(): | ||
| result = '\u00e9'.encode('atascii', errors='ignore') | ||
| assert result == b'' | ||
| def test_encode_cr_as_eol(): | ||
| encoded, length = codecs.lookup('atascii').encode('\r') | ||
| assert encoded == b'\x9b' | ||
| assert length == 1 | ||
| def test_encode_crlf_as_single_eol(): | ||
| encoded, length = codecs.lookup('atascii').encode('\r\n') | ||
| assert encoded == b'\x9b' | ||
| assert length == 1 | ||
| def test_encode_mixed_line_endings(): | ||
| encoded, _ = codecs.lookup('atascii').encode('hello\r\nworld\r') | ||
| hello_eol = 'hello\n'.encode('atascii') | ||
| world_eol = 'world\n'.encode('atascii') | ||
| assert encoded == hello_eol + world_eol | ||
| def test_incremental_encoder_cr_then_lf(): | ||
| encoder = codecs.getincrementalencoder('atascii')() | ||
| result = encoder.encode('hello\r', final=False) | ||
| assert result == 'hello'.encode('atascii') | ||
| result = encoder.encode('\nworld', final=True) | ||
| assert result == '\nworld'.encode('atascii') | ||
| def test_incremental_encoder_cr_then_other(): | ||
| encoder = codecs.getincrementalencoder('atascii')() | ||
| result = encoder.encode('A\r', final=False) | ||
| assert result == 'A'.encode('atascii') | ||
| result = encoder.encode('B', final=True) | ||
| assert result == '\nB'.encode('atascii') | ||
| def test_incremental_encoder_cr_final(): | ||
| encoder = codecs.getincrementalencoder('atascii')() | ||
| result = encoder.encode('end\r', final=True) | ||
| assert result == 'end\n'.encode('atascii') | ||
| def test_incremental_encoder_reset(): | ||
| encoder = codecs.getincrementalencoder('atascii')() | ||
| encoder.encode('A\r', final=False) | ||
| encoder.reset() | ||
| assert encoder.getstate() == 0 | ||
| def test_incremental_encoder_getstate_setstate(): | ||
| encoder = codecs.getincrementalencoder('atascii')() | ||
| encoder.encode('A\r', final=False) | ||
| assert encoder.getstate() == 1 | ||
| state = encoder.getstate() | ||
| encoder2 = codecs.getincrementalencoder('atascii')() | ||
| encoder2.setstate(state) | ||
| result = encoder2.encode('\n', final=True) | ||
| assert result == b'\x9b' | ||
| def test_native_graphics_0x0a_0x0d(): | ||
| assert b'\x0a'.decode('atascii') == '\u25e3' | ||
| assert b'\x0d'.decode('atascii') == '\U0001fb82' | ||
| def test_decoding_table_length(): | ||
| assert len(atascii.DECODING_TABLE) == 256 |
| """Tests for telnetlib3.client_shell — Terminal mode handling.""" | ||
| # std imports | ||
| import sys | ||
| import types | ||
| # 3rd party | ||
| import pytest | ||
| if sys.platform == "win32": | ||
| pytest.skip("POSIX-only tests", allow_module_level=True) | ||
| # std imports | ||
| # std imports (POSIX only) | ||
| import termios # noqa: E402 | ||
| # local | ||
| from telnetlib3.client_shell import ( # noqa: E402 | ||
| _INPUT_XLAT, | ||
| _INPUT_SEQ_XLAT, | ||
| Terminal, | ||
| InputFilter, | ||
| ) | ||
| def _make_writer(will_echo: bool = False, raw_mode: bool = False) -> object: | ||
| """Build a minimal mock writer with the attributes Terminal needs.""" | ||
| writer = types.SimpleNamespace( | ||
| will_echo=will_echo, | ||
| log=types.SimpleNamespace(debug=lambda *a, **kw: None), | ||
| ) | ||
| if raw_mode: | ||
| writer._raw_mode = True | ||
| return writer | ||
| def _cooked_mode() -> "Terminal.ModeDef": | ||
| """Return a typical cooked-mode ModeDef with canonical input enabled.""" | ||
| return Terminal.ModeDef( | ||
| iflag=termios.BRKINT | termios.ICRNL | termios.IXON, | ||
| oflag=termios.OPOST | termios.ONLCR, | ||
| cflag=termios.CS8 | termios.CREAD, | ||
| lflag=termios.ICANON | termios.ECHO | termios.ISIG | termios.IEXTEN, | ||
| ispeed=termios.B38400, | ||
| ospeed=termios.B38400, | ||
| cc=[b'\x00'] * termios.NCCS, | ||
| ) | ||
| class TestDetermineMode: | ||
| def test_linemode_when_no_echo_no_raw(self) -> None: | ||
| writer = _make_writer(will_echo=False, raw_mode=False) | ||
| term = Terminal.__new__(Terminal) | ||
| term.telnet_writer = writer | ||
| mode = _cooked_mode() | ||
| result = term.determine_mode(mode) | ||
| assert result is mode | ||
| def test_raw_mode_when_will_echo(self) -> None: | ||
| writer = _make_writer(will_echo=True, raw_mode=False) | ||
| term = Terminal.__new__(Terminal) | ||
| term.telnet_writer = writer | ||
| mode = _cooked_mode() | ||
| result = term.determine_mode(mode) | ||
| assert result is not mode | ||
| assert not (result.lflag & termios.ICANON) | ||
| assert not (result.lflag & termios.ECHO) | ||
| def test_raw_mode_when_force_raw(self) -> None: | ||
| writer = _make_writer(will_echo=False, raw_mode=True) | ||
| term = Terminal.__new__(Terminal) | ||
| term.telnet_writer = writer | ||
| mode = _cooked_mode() | ||
| result = term.determine_mode(mode) | ||
| assert result is not mode | ||
| assert not (result.lflag & termios.ICANON) | ||
| assert not (result.lflag & termios.ECHO) | ||
| assert not (result.oflag & termios.OPOST) | ||
| def test_raw_mode_when_both_echo_and_raw(self) -> None: | ||
| writer = _make_writer(will_echo=True, raw_mode=True) | ||
| term = Terminal.__new__(Terminal) | ||
| term.telnet_writer = writer | ||
| mode = _cooked_mode() | ||
| result = term.determine_mode(mode) | ||
| assert result is not mode | ||
| assert not (result.lflag & termios.ICANON) | ||
| assert not (result.lflag & termios.ECHO) | ||
| class TestInputXlat: | ||
| def test_atascii_del_to_backspace(self) -> None: | ||
| assert _INPUT_XLAT["atascii"][0x7F] == 0x7E | ||
| def test_atascii_bs_to_backspace(self) -> None: | ||
| assert _INPUT_XLAT["atascii"][0x08] == 0x7E | ||
| def test_atascii_cr_to_eol(self) -> None: | ||
| assert _INPUT_XLAT["atascii"][0x0D] == 0x9B | ||
| def test_atascii_lf_to_eol(self) -> None: | ||
| assert _INPUT_XLAT["atascii"][0x0A] == 0x9B | ||
| def test_petscii_del_to_backspace(self) -> None: | ||
| assert _INPUT_XLAT["petscii"][0x7F] == 0x14 | ||
| def test_petscii_bs_to_backspace(self) -> None: | ||
| assert _INPUT_XLAT["petscii"][0x08] == 0x14 | ||
| def test_normal_bytes_not_in_xlat(self) -> None: | ||
| xlat = _INPUT_XLAT["atascii"] | ||
| for b in (ord('a'), ord('A'), ord('1'), ord(' ')): | ||
| assert b not in xlat | ||
| class TestInputFilterAtascii: | ||
| @staticmethod | ||
| def _make_filter() -> InputFilter: | ||
| return InputFilter( | ||
| _INPUT_SEQ_XLAT["atascii"], _INPUT_XLAT["atascii"] | ||
| ) | ||
| @pytest.mark.parametrize("seq,expected", list(_INPUT_SEQ_XLAT["atascii"].items())) | ||
| def test_sequence_translated(self, seq: bytes, expected: bytes) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(seq) == expected | ||
| def test_passthrough_ascii(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"hello") == b"hello" | ||
| def test_mixed_text_and_sequence(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"hi\x1b[Alo") == b"hi\x1clo" | ||
| def test_multiple_sequences(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x1b[A\x1b[B\x1b[C\x1b[D") == b"\x1c\x1d\x1f\x1e" | ||
| def test_single_byte_xlat_applied(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x7f") == b"\x7e" | ||
| assert f.feed(b"\x08") == b"\x7e" | ||
| def test_cr_to_atascii_eol(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\r") == b"\x9b" | ||
| def test_lf_to_atascii_eol(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\n") == b"\x9b" | ||
| def test_text_with_enter(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"hello\r") == b"hello\x9b" | ||
| def test_split_sequence_buffered(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x1b") == b"" | ||
| assert f.feed(b"[A") == b"\x1c" | ||
| def test_split_sequence_mid_csi(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x1b[") == b"" | ||
| assert f.feed(b"A") == b"\x1c" | ||
| def test_bare_esc_flushed_on_non_prefix(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x1b") == b"" | ||
| assert f.feed(b"x") == b"\x1bx" | ||
| def test_delete_key_sequence(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x1b[3~") == b"\x7e" | ||
| def test_ss3_arrow_keys(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x1bOA") == b"\x1c" | ||
| assert f.feed(b"\x1bOD") == b"\x1e" | ||
| class TestInputFilterPetscii: | ||
| @staticmethod | ||
| def _make_filter() -> InputFilter: | ||
| return InputFilter( | ||
| _INPUT_SEQ_XLAT["petscii"], _INPUT_XLAT["petscii"] | ||
| ) | ||
| @pytest.mark.parametrize("seq,expected", list(_INPUT_SEQ_XLAT["petscii"].items())) | ||
| def test_sequence_translated(self, seq: bytes, expected: bytes) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(seq) == expected | ||
| def test_home_key(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x1b[H") == b"\x13" | ||
| def test_insert_key(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x1b[2~") == b"\x94" | ||
| def test_single_byte_xlat_applied(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.feed(b"\x7f") == b"\x14" | ||
| class TestInputFilterEmpty: | ||
| def test_no_xlat_passthrough(self) -> None: | ||
| f = InputFilter({}, {}) | ||
| assert f.feed(b"hello\x1b[Aworld") == b"hello\x1b[Aworld" | ||
| def test_empty_feed(self) -> None: | ||
| f = InputFilter({}, {}) | ||
| assert f.feed(b"") == b"" |
| """Tests for the PETSCII (Commodore 64/128) codec.""" | ||
| # std imports | ||
| import codecs | ||
| # 3rd party | ||
| import pytest | ||
| # local | ||
| import telnetlib3 # noqa: F401 - registers codecs | ||
| from telnetlib3.encodings import petscii | ||
| def test_codec_lookup(): | ||
| info = codecs.lookup('petscii') | ||
| assert info.name == 'petscii' | ||
| @pytest.mark.parametrize("alias", ['cbm', 'commodore', 'c64', 'c128']) | ||
| def test_codec_aliases(alias): | ||
| info = codecs.lookup(alias) | ||
| assert info.name == 'petscii' | ||
| def test_digits(): | ||
| data = bytes(range(0x30, 0x3A)) | ||
| assert data.decode('petscii') == '0123456789' | ||
| def test_space(): | ||
| assert b'\x20'.decode('petscii') == ' ' | ||
| def test_lowercase_at_41_5A(): | ||
| data = bytes(range(0x41, 0x5B)) | ||
| assert data.decode('petscii') == 'abcdefghijklmnopqrstuvwxyz' | ||
| def test_uppercase_at_C1_DA(): | ||
| data = bytes(range(0xC1, 0xDB)) | ||
| assert data.decode('petscii') == 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' | ||
| def test_return(): | ||
| assert b'\x0d'.decode('petscii') == '\r' | ||
| @pytest.mark.parametrize("byte_val,expected", [ | ||
| pytest.param(0x5C, '\u00a3', id='pound_sign'), | ||
| pytest.param(0x5E, '\u2191', id='up_arrow'), | ||
| pytest.param(0x5F, '\u2190', id='left_arrow'), | ||
| pytest.param(0x61, '\u2660', id='spade'), | ||
| pytest.param(0x7E, '\u03c0', id='pi'), | ||
| pytest.param(0x78, '\u2663', id='club'), | ||
| pytest.param(0x7A, '\u2666', id='diamond'), | ||
| pytest.param(0x73, '\u2665', id='heart'), | ||
| ]) | ||
| def test_graphics_chars(byte_val, expected): | ||
| assert bytes([byte_val]).decode('petscii') == expected | ||
| def test_full_decode_no_crash(): | ||
| data = bytes(range(256)) | ||
| result = data.decode('petscii') | ||
| assert len(result) == 256 | ||
| def test_encode_lowercase(): | ||
| encoded, length = codecs.lookup('petscii').encode('hello') | ||
| assert encoded == bytes([0x48, 0x45, 0x4c, 0x4c, 0x4f]) | ||
| assert length == 5 | ||
| def test_encode_uppercase(): | ||
| encoded, length = codecs.lookup('petscii').encode('HELLO') | ||
| assert encoded == b'\xc8\xc5\xcc\xcc\xcf' | ||
| assert length == 5 | ||
| def test_round_trip_digits(): | ||
| for byte_val in range(0x30, 0x3A): | ||
| original = bytes([byte_val]) | ||
| decoded = original.decode('petscii') | ||
| re_encoded = decoded.encode('petscii') | ||
| assert re_encoded == original | ||
| def test_incremental_decoder(): | ||
| decoder = codecs.getincrementaldecoder('petscii')() | ||
| assert decoder.decode(b'\xc1', False) == 'A' | ||
| assert decoder.decode(b'\x42\x43', True) == 'bc' | ||
| def test_incremental_encoder(): | ||
| encoder = codecs.getincrementalencoder('petscii')() | ||
| assert encoder.encode('A', False) == b'\xc1' | ||
| assert encoder.encode('bc', True) == b'\x42\x43' | ||
| def test_decoding_table_length(): | ||
| assert len(petscii.DECODING_TABLE) == 256 |
+2
-2
@@ -71,6 +71,6 @@ # std imports | ||
| # The short X.Y version. | ||
| version = "2.4" | ||
| version = "2.5" | ||
| # The full version, including alpha/beta/rc tags. | ||
| release = "2.4.0" # keep in sync with pyproject.toml and telnetlib3/accessories.py !! | ||
| release = "2.5.0" # keep in sync with pyproject.toml and telnetlib3/accessories.py !! | ||
@@ -77,0 +77,0 @@ # The language for content auto-generated by Sphinx. Refer to documentation |
+82
-0
@@ -208,2 +208,46 @@ ========= | ||
| Retro BBS Encodings | ||
| ~~~~~~~~~~~~~~~~~~~ | ||
| telnetlib3 includes custom codecs for retro computing platforms commonly | ||
| found on telnet BBS systems: | ||
| - **ATASCII** (``--encoding=atascii``) -- Atari 8-bit computers (400, 800, | ||
| XL, XE). Graphics characters at 0x00-0x1F, card suits, box drawing, and | ||
| an inverse-video range at 0x80-0xFF. The ATASCII end-of-line character | ||
| (0x9B) maps to newline. Aliases: ``atari8bit``, ``atari_8bit``. | ||
| - **PETSCII** (``--encoding=petscii``) -- Commodore 64/128 shifted | ||
| (lowercase) mode. Lowercase a-z at 0x41-0x5A, uppercase A-Z at | ||
| 0xC1-0xDA. Aliases: ``cbm``, ``commodore``, ``c64``, ``c128``. | ||
| - **Atari ST** (``--encoding=atarist``) -- Atari ST character set with | ||
| extended Latin, Greek, and math symbols. Alias: ``atari``. | ||
| These encodings use bytes 0x80-0xFF for standard glyphs, which conflicts | ||
| with the telnet protocol's default 7-bit NVT mode. When any of these | ||
| encodings is selected, ``--force-binary`` is automatically enabled so that | ||
| high-bit bytes are transmitted without requiring BINARY option negotiation. | ||
| PETSCII inline color codes are translated to ANSI 24-bit RGB using the | ||
| VIC-II C64 palette, and cursor control codes (up/down/left/right, HOME, | ||
| CLR, DEL) are translated to ANSI sequences. ATASCII control character | ||
| glyphs (cursor movement, backspace, clear screen) are similarly translated. | ||
| Keyboard input is also mapped: arrow keys, backspace, delete, and enter | ||
| produce the correct raw bytes for each encoding:: | ||
| telnetlib3-client --encoding=atascii area52.tk 5200 | ||
| telnetlib3-client --encoding=petscii bbs.example.com 6400 | ||
| ``telnetlib3-fingerprint`` decodes and translates banners with these | ||
| encodings, including PETSCII colors. | ||
| SyncTERM Font Detection | ||
| ^^^^^^^^^^^^^^^^^^^^^^^^ | ||
| When a server sends a SyncTERM/CTerm font selection sequence | ||
| (``CSI Ps1 ; Ps2 SP D``), both ``telnetlib3-client`` and | ||
| ``telnetlib3-fingerprint`` automatically switch the session encoding | ||
| to match the font (e.g. font 36 = ATASCII, 32-35 = PETSCII, 0 = CP437). | ||
| An explicit ``--encoding`` flag takes precedence over font detection. | ||
| Line Endings | ||
@@ -234,2 +278,40 @@ ~~~~~~~~~~~~ | ||
| Raw Mode and Line Mode | ||
| ~~~~~~~~~~~~~~~~~~~~~~ | ||
| ``telnetlib3-client`` defaults to **raw terminal mode** -- the local | ||
| terminal is set to raw (no line buffering, no local echo, no signal | ||
| processing), and each keystroke is sent to the server immediately. This | ||
| is the correct mode for most BBS and MUD servers that handle their own | ||
| echo and line editing. | ||
| Use ``--line-mode`` to switch to line-buffered input with local echo, | ||
| which is appropriate for simple command-line services that expect the | ||
| client to perform local line editing:: | ||
| # Default: raw mode (correct for most servers) | ||
| telnetlib3-client bbs.example.com | ||
| # Line mode: local echo and line buffering | ||
| telnetlib3-client --line-mode simple-service.example.com | ||
| Similarly, ``telnetlib3-server --pty-exec`` defaults to raw PTY mode | ||
| (disabling PTY echo), which is correct for programs that handle their own | ||
| terminal I/O (curses, blessed, etc.). Use ``--line-mode`` for programs | ||
| that expect cooked/canonical PTY mode:: | ||
| # Default: raw PTY (correct for curses programs) | ||
| telnetlib3-server --pty-exec /bin/bash -- --login | ||
| # Line mode: cooked PTY with echo (for simple programs like bc) | ||
| telnetlib3-server --pty-exec /bin/bc --line-mode | ||
| Debugging | ||
| ~~~~~~~~~ | ||
| Use ``--loglevel=trace`` to see hexdump-style output of all bytes sent | ||
| and received on the wire:: | ||
| telnetlib3-client --loglevel=trace --logfile=debug.log bbs.example.com | ||
| server_binary.py | ||
@@ -236,0 +318,0 @@ ~~~~~~~~~~~~~~~~ |
+51
-1
| History | ||
| ======= | ||
| 2.5.0 | ||
| * change: ``telnetlib3-client`` now defaults to raw terminal mode (no line buffering, no local | ||
| echo), which is correct for most servers. Use ``--line-mode`` to restore line-buffered | ||
| local-echo behavior. | ||
| * change: ``telnetlib3-server --pty-exec`` now defaults to raw PTY mode. Use ``--line-mode`` to | ||
| restore cooked PTY mode with echo. | ||
| * change: ``connect_minwait`` default reduced to 0 across | ||
| :class:`~telnetlib3.client_base.BaseClient`, :func:`~telnetlib3.client.open_connection`, and | ||
| ``telnetlib3-client``. Negotiation continues asynchronously. Use ``--connect-minwait`` to | ||
| restore a delay if needed, or, use :meth:`~telnetlib3.stream_writer.TelnetWriter.wait_for` in | ||
| server or client shells to await a specific negotiation state. | ||
| * new: Color, keyboard input translation and ``--encoding`` support for ATASCII (ATARI ASCII) and | ||
| PETSCII (Commodore ASCII). | ||
| * new: SyncTERM/CTerm font selection sequence detection (``CSI Ps1 ; Ps2 SP D``). Both | ||
| ``telnetlib3-fingerprint`` and ``telnetlib3-client`` detect font switching and auto-switch | ||
| encoding to the matching codec (e.g. font 36 = ATASCII, 32-35 = PETSCII, 0 = CP437). Explicit | ||
| ``--encoding`` takes precedence. | ||
| * new: :data:`~telnetlib3.accessories.TRACE` log level (5, below ``DEBUG``) with | ||
| :func:`~telnetlib3.accessories.hexdump` style output for all sent and received bytes. Use | ||
| ``--loglevel=trace``. | ||
| * bugfix: :func:`~telnetlib3.guard_shells.robot_check` now uses a narrow | ||
| character (space) instead of a wide Unicode character, allowing retro | ||
| terminal emulators to pass. | ||
| * bugfix: ATASCII codec now maps bytes 0x0D and 0x0A to CR and LF instead | ||
| of graphics characters, fixing garbled output when connecting to Atari | ||
| BBS systems. | ||
| * bugfix: ATASCII codec normalizes CR and CRLF to the native ATASCII | ||
| EOL (0x9B) during encoding, so the Return key works correctly. | ||
| * bugfix: PETSCII bare CR (0x0D) is now normalized to CRLF in raw | ||
| terminal mode and to LF in ``telnetlib3-fingerprint`` banners. | ||
| * bugfix: ``telnetlib3-fingerprint`` re-encodes prompt responses for retro | ||
| encodings so servers receive the correct EOL byte. | ||
| * bugfix: ``telnetlib3-fingerprint`` no longer crashes with | ||
| ``LookupError`` when the server negotiates an unknown charset. | ||
| Banner formatting falls back to ``latin-1``. | ||
| * bugfix: :meth:`~telnetlib3.client.TelnetClient.send_charset` normalises | ||
| non-standard encoding names (``iso-8859-02`` to ``iso-8859-2``, | ||
| ``cp-1250`` to ``cp1250``, etc.). | ||
| * enhancement: ``telnetlib3-fingerprint`` responds more like a terminal and to more | ||
| y/n prompts about colors, encoding, etc. to collect more banners for https://bbs.modem.xyz/ | ||
| project. | ||
| * enhancement: ``telnetlib3-fingerprint`` banner formatting uses | ||
| ``surrogateescape`` error handler, preserving raw high bytes (e.g. CP437 | ||
| art) as surrogates instead of replacing them with U+FFFD. | ||
@@ -31,3 +75,9 @@ 2.4.0 | ||
| default raised from 1024 to 65536. | ||
| * enhancement: new ``--encoding=petscii`` and ``--encoding=atarist`` | ||
| * new: ATASCII (Atari 8-bit) codec -- ``--encoding=atascii`` for connecting | ||
| to Atari BBS systems. Maps all 256 byte values to Unicode including | ||
| graphics characters, card suits, and the inverse-video range (0x80-0xFF). | ||
| ATASCII EOL (0x9B) maps to newline. Aliases: ``atari8bit``, ``atari_8bit``. | ||
| * enhancement: ``--encoding=atascii``, ``--encoding=petscii``, and | ||
| ``--encoding=atarist`` now auto-enable ``--force-binary`` for both client | ||
| and server, since these encodings use bytes 0x80-0xFF for standard glyphs. | ||
| * bugfix: rare LINEMODE ACK loop with misbehaving servers that re-send | ||
@@ -34,0 +84,0 @@ unchanged MODE without ACK. |
+5
-5
| Metadata-Version: 2.4 | ||
| Name: telnetlib3 | ||
| Version: 2.4.0 | ||
| Version: 2.5.0 | ||
| Summary: Python Telnet server and client CLI and Protocol library | ||
@@ -113,6 +113,6 @@ Project-URL: Homepage, https://github.com/jquast/telnetlib3 | ||
| telnetlib3-server 0.0.0.0 1984 --shell=bin.server_wargame.shell | ||
| # run an external program with a pseudo-terminal | ||
| telnetlib3-server --pty-exec /bin/bash --pty-raw -- --login | ||
| # or a simple linemode program, bc (calculator) | ||
| telnetlib3-server --pty-exec /bin/bc | ||
| # run an external program with a pseudo-terminal (raw mode is default) | ||
| telnetlib3-server --pty-exec /bin/bash -- --login | ||
| # or a linemode program, bc (calculator) | ||
| telnetlib3-server --pty-exec /bin/bc --line-mode | ||
@@ -119,0 +119,0 @@ |
+1
-1
@@ -7,3 +7,3 @@ [build-system] | ||
| name = "telnetlib3" | ||
| version = "2.4.0" | ||
| version = "2.5.0" | ||
| description = " Python Telnet server and client CLI and Protocol library" | ||
@@ -10,0 +10,0 @@ readme = "README.rst" |
+4
-4
@@ -75,6 +75,6 @@ .. image:: https://img.shields.io/pypi/v/telnetlib3.svg | ||
| telnetlib3-server 0.0.0.0 1984 --shell=bin.server_wargame.shell | ||
| # run an external program with a pseudo-terminal | ||
| telnetlib3-server --pty-exec /bin/bash --pty-raw -- --login | ||
| # or a simple linemode program, bc (calculator) | ||
| telnetlib3-server --pty-exec /bin/bc | ||
| # run an external program with a pseudo-terminal (raw mode is default) | ||
| telnetlib3-server --pty-exec /bin/bash -- --login | ||
| # or a linemode program, bc (calculator) | ||
| telnetlib3-server --pty-exec /bin/bc --line-mode | ||
@@ -81,0 +81,0 @@ |
@@ -12,2 +12,6 @@ """Accessory functions.""" | ||
| #: Custom TRACE log level, below DEBUG (10). | ||
| TRACE = 5 | ||
| logging.addLevelName(TRACE, "TRACE") | ||
| if TYPE_CHECKING: # pragma: no cover | ||
@@ -18,5 +22,7 @@ # local | ||
| __all__ = ( | ||
| "TRACE", | ||
| "encoding_from_lang", | ||
| "name_unicode", | ||
| "eightbits", | ||
| "hexdump", | ||
| "make_logger", | ||
@@ -42,3 +48,3 @@ "repr_mapping", | ||
| """Return the current version of telnetlib3.""" | ||
| return "2.3.0" # keep in sync with pyproject.toml and docs/conf.py !! | ||
| return "2.5.0" # keep in sync with pyproject.toml and docs/conf.py !! | ||
@@ -102,2 +108,25 @@ | ||
| def hexdump(data: bytes, prefix: str = "") -> str: | ||
| """ | ||
| Format *data* as ``hexdump -C`` style output. | ||
| Each 16-byte row shows the offset, hex bytes grouped 8+8, | ||
| and printable ASCII on the right:: | ||
| 00000000 48 65 6c 6c 6f 20 57 6f 72 6c 64 0d 0a |Hello World..| | ||
| :param data: Raw bytes to format. | ||
| :param prefix: String prepended to every line (e.g. ``">> "``). | ||
| :rtype: str | ||
| """ | ||
| lines: list[str] = [] | ||
| for offset in range(0, len(data), 16): | ||
| chunk = data[offset : offset + 16] | ||
| hex_left = " ".join(f"{b:02x}" for b in chunk[:8]) | ||
| hex_right = " ".join(f"{b:02x}" for b in chunk[8:]) | ||
| ascii_part = "".join(chr(b) if 0x20 <= b < 0x7F else "." for b in chunk) | ||
| lines.append(f"{prefix}{offset:08x} {hex_left:<23s} {hex_right:<23s} |{ascii_part}|") | ||
| return "\n".join(lines) | ||
| _DEFAULT_LOGFMT = " ".join( | ||
@@ -112,3 +141,5 @@ ("%(asctime)s", "%(levelname)s", "%(filename)s:%(lineno)d", "%(message)s") | ||
| """Create and return simple logger for given arguments.""" | ||
| lvl = getattr(logging, loglevel.upper()) | ||
| lvl = getattr(logging, loglevel.upper(), None) | ||
| if lvl is None: | ||
| lvl = logging.getLevelName(loglevel.upper()) | ||
@@ -115,0 +146,0 @@ _cfg: Dict[str, Any] = {"format": logfmt} |
@@ -19,2 +19,3 @@ """Module provides class BaseClient.""" | ||
| from .telopt import DO, WILL, theNULL, name_commands | ||
| from .accessories import TRACE, hexdump | ||
| from .stream_reader import TelnetReader, TelnetReaderUnicode | ||
@@ -48,3 +49,3 @@ from .stream_writer import TelnetWriter, TelnetWriterUnicode | ||
| force_binary: bool = False, | ||
| connect_minwait: float = 1.0, | ||
| connect_minwait: float = 0, | ||
| connect_maxwait: float = 4.0, | ||
@@ -217,4 +218,9 @@ limit: Optional[int] = None, | ||
| """ | ||
| if self.log.isEnabledFor(TRACE): | ||
| self.log.log(TRACE, "recv %d bytes\n%s", len(data), hexdump(data, prefix="<< ")) | ||
| self._last_received = datetime.datetime.now() | ||
| # Detect SyncTERM font switching sequences and auto-switch encoding. | ||
| self._detect_syncterm_font(data) | ||
| # Enqueue and account for buffered size | ||
@@ -239,2 +245,29 @@ self._rx_queue.append(data) | ||
| def _detect_syncterm_font(self, data: bytes) -> None: | ||
| """ | ||
| Scan *data* for SyncTERM font selection and switch encoding. | ||
| When :attr:`_encoding_explicit` is set on the writer (indicating | ||
| the user passed ``--encoding``), the font switch is logged but | ||
| does not override the encoding. | ||
| """ | ||
| if self.writer is None: | ||
| return | ||
| # local | ||
| from .server_fingerprinting import ( # pylint: disable=import-outside-toplevel | ||
| _SYNCTERM_BINARY_ENCODINGS, | ||
| detect_syncterm_font, | ||
| ) | ||
| encoding = detect_syncterm_font(data) | ||
| if encoding is not None: | ||
| self.log.debug("SyncTERM font switch: %s", encoding) | ||
| if getattr(self.writer, '_encoding_explicit', False): | ||
| self.log.debug( | ||
| "ignoring font switch, explicit encoding: %s", | ||
| self.writer.environ_encoding) | ||
| else: | ||
| self.writer.environ_encoding = encoding | ||
| if encoding in _SYNCTERM_BINARY_ENCODINGS: | ||
| self.force_binary = True | ||
| # public properties | ||
@@ -241,0 +274,0 @@ |
@@ -9,3 +9,3 @@ """Telnet client shell implementations for interactive terminal sessions.""" | ||
| import collections | ||
| from typing import Any, Tuple, Union, Optional | ||
| from typing import Any, Dict, Tuple, Union, Optional | ||
@@ -17,5 +17,117 @@ # local | ||
| __all__ = ("telnet_client_shell",) | ||
| __all__ = ("InputFilter", "telnet_client_shell") | ||
| # Input byte translation tables for retro encodings in raw mode. | ||
| # Maps terminal keyboard bytes to the raw bytes the BBS expects. | ||
| # Applied BEFORE decoding/encoding, bypassing the codec entirely for | ||
| # characters that can't round-trip through Unicode (e.g. ATASCII 0x7E | ||
| # shares its Unicode codepoint U+25C0 with 0xFE). | ||
| _INPUT_XLAT: Dict[str, Dict[int, int]] = { | ||
| "atascii": { | ||
| 0x7F: 0x7E, # DEL → ATASCII backspace (byte 0x7E) | ||
| 0x08: 0x7E, # BS → ATASCII backspace (byte 0x7E) | ||
| 0x0D: 0x9B, # CR → ATASCII EOL (byte 0x9B) | ||
| 0x0A: 0x9B, # LF → ATASCII EOL (byte 0x9B) | ||
| }, | ||
| "petscii": { | ||
| 0x7F: 0x14, # DEL → PETSCII DEL (byte 0x14) | ||
| 0x08: 0x14, # BS → PETSCII DEL (byte 0x14) | ||
| }, | ||
| } | ||
| # Multi-byte escape sequence translation tables for retro encodings. | ||
| # Maps common ANSI terminal escape sequences (arrow keys, delete, etc.) | ||
| # to the raw bytes the BBS expects. Inspired by blessed's | ||
| # DEFAULT_SEQUENCE_MIXIN but kept minimal for the sequences that matter. | ||
| _INPUT_SEQ_XLAT: Dict[str, Dict[bytes, bytes]] = { | ||
| "atascii": { | ||
| b"\x1b[A": b"\x1c", # cursor up (CSI) | ||
| b"\x1b[B": b"\x1d", # cursor down | ||
| b"\x1b[C": b"\x1f", # cursor right | ||
| b"\x1b[D": b"\x1e", # cursor left | ||
| b"\x1bOA": b"\x1c", # cursor up (SS3 / application mode) | ||
| b"\x1bOB": b"\x1d", # cursor down | ||
| b"\x1bOC": b"\x1f", # cursor right | ||
| b"\x1bOD": b"\x1e", # cursor left | ||
| b"\x1b[3~": b"\x7e", # delete → ATASCII backspace | ||
| b"\t": b"\x7f", # tab → ATASCII tab | ||
| }, | ||
| "petscii": { | ||
| b"\x1b[A": b"\x91", # cursor up (CSI) | ||
| b"\x1b[B": b"\x11", # cursor down | ||
| b"\x1b[C": b"\x1d", # cursor right | ||
| b"\x1b[D": b"\x9d", # cursor left | ||
| b"\x1bOA": b"\x91", # cursor up (SS3 / application mode) | ||
| b"\x1bOB": b"\x11", # cursor down | ||
| b"\x1bOC": b"\x1d", # cursor right | ||
| b"\x1bOD": b"\x9d", # cursor left | ||
| b"\x1b[3~": b"\x14", # delete → PETSCII DEL | ||
| b"\x1b[H": b"\x13", # home → PETSCII HOME | ||
| b"\x1b[2~": b"\x94", # insert → PETSCII INSERT | ||
| }, | ||
| } | ||
| class InputFilter: # pylint: disable=too-few-public-methods | ||
| """ | ||
| Translate terminal escape sequences and single bytes to retro encoding bytes. | ||
| Combines single-byte translation (backspace, delete) with multi-byte | ||
| escape sequence matching (arrow keys, function keys). Uses prefix-based | ||
| buffering inspired by blessed's ``get_leading_prefixes`` to handle | ||
| sequences split across reads. | ||
| :param seq_xlat: Multi-byte escape sequence → replacement bytes. | ||
| :param byte_xlat: Single input byte → replacement byte. | ||
| """ | ||
| def __init__( | ||
| self, seq_xlat: Dict[bytes, bytes], byte_xlat: Dict[int, int] | ||
| ) -> None: | ||
| """Initialize input filter with sequence and byte translation tables.""" | ||
| self._byte_xlat = byte_xlat | ||
| # Sort sequences longest-first so \x1b[3~ matches before \x1b[3 | ||
| self._seq_sorted: Tuple[Tuple[bytes, bytes], ...] = tuple( | ||
| sorted(seq_xlat.items(), key=lambda kv: len(kv[0]), reverse=True) | ||
| ) | ||
| # Prefix set for partial-match buffering (blessed's get_leading_prefixes) | ||
| self._prefixes: frozenset[bytes] = frozenset( | ||
| seq[:i] for seq in seq_xlat for i in range(1, len(seq)) | ||
| ) | ||
| self._buf = b"" | ||
| def feed(self, data: bytes) -> bytes: | ||
| """ | ||
| Process input bytes, returning raw bytes to send to the remote host. | ||
| Escape sequences are matched against the configured table and replaced. Partial sequences | ||
| are buffered until the next call. Single bytes are translated via the byte translation | ||
| table. | ||
| :param data: Raw bytes from terminal stdin. | ||
| :returns: Translated bytes ready to send to the remote BBS. | ||
| """ | ||
| self._buf += data | ||
| result = bytearray() | ||
| while self._buf: | ||
| # Try multi-byte sequence match at current position | ||
| matched = False | ||
| for seq, repl in self._seq_sorted: | ||
| if self._buf[:len(seq)] == seq: | ||
| result.extend(repl) | ||
| self._buf = self._buf[len(seq):] | ||
| matched = True | ||
| break | ||
| if matched: | ||
| continue | ||
| # Check if buffer is a prefix of any known sequence — wait for more | ||
| if self._buf in self._prefixes: | ||
| break | ||
| # No sequence match, emit single byte with translation | ||
| b = self._buf[0] | ||
| self._buf = self._buf[1:] | ||
| result.append(self._byte_xlat.get(b, b)) | ||
| return bytes(result) | ||
| if sys.platform == "win32": | ||
@@ -78,7 +190,10 @@ | ||
| """Return copy of 'mode' with changes suggested for telnet connection.""" | ||
| if not self.telnet_writer.will_echo: | ||
| # return mode as-is | ||
| raw_mode = getattr(self.telnet_writer, '_raw_mode', False) | ||
| if not self.telnet_writer.will_echo and not raw_mode: | ||
| self.telnet_writer.log.debug("local echo, linemode") | ||
| return mode | ||
| self.telnet_writer.log.debug("server echo, kludge mode") | ||
| if raw_mode and not self.telnet_writer.will_echo: | ||
| self.telnet_writer.log.debug("raw mode forced, no server echo") | ||
| else: | ||
| self.telnet_writer.log.debug("server echo, kludge mode") | ||
@@ -177,4 +292,6 @@ # "Raw mode", see tty.py function setraw. This allows sending | ||
| linesep = "\n" | ||
| if term._istty and telnet_writer.will_echo: # pylint: disable=protected-access | ||
| linesep = "\r\n" | ||
| if term._istty: # pylint: disable=protected-access | ||
| _raw = getattr(telnet_writer, '_raw_mode', False) | ||
| if telnet_writer.will_echo or _raw: | ||
| linesep = "\r\n" | ||
| stdin, stdout = await term.make_stdio() | ||
@@ -268,3 +385,9 @@ escape_name = accessories.name_unicode(keyboard_escape) | ||
| break | ||
| telnet_writer.write(inp.decode()) | ||
| _inf = getattr(telnet_writer, '_input_filter', None) | ||
| if _inf is not None: | ||
| translated = _inf.feed(inp) | ||
| if translated: | ||
| telnet_writer._write(translated) # pylint: disable=protected-access | ||
| else: | ||
| telnet_writer.write(inp.decode()) | ||
| stdin_task = accessories.make_reader_task(stdin) | ||
@@ -310,4 +433,11 @@ wait_for.add(stdin_task) | ||
| out = _cf.filter(out) | ||
| if getattr(telnet_writer, '_raw_mode', False): | ||
| # Normalize all line endings to LF, then to CRLF | ||
| # for the raw terminal (OPOST disabled). PETSCII | ||
| # BBSes send bare CR (0x0D) as line terminator. | ||
| out = (out.replace('\r\n', '\n') | ||
| .replace('\r', '\n') | ||
| .replace('\n', '\r\n')) | ||
| stdout.write(out.encode() or b":?!?:") | ||
| telnet_task = accessories.make_reader_task(telnet_reader, size=2**24) | ||
| wait_for.add(telnet_task) |
+136
-25
@@ -184,2 +184,40 @@ #!/usr/bin/env python3 | ||
| @staticmethod | ||
| def _normalize_charset_name(name: str) -> str: | ||
| """ | ||
| Normalize server-advertised charset names for :func:`codecs.lookup`. | ||
| Servers sometimes advertise non-standard encoding names that Python's | ||
| codec registry does not recognise. This tries progressively simpler | ||
| variations until one resolves: | ||
| 1. Original name (spaces → hyphens) | ||
| 2. Leading zeros stripped from numeric parts (``iso-8859-02`` → ``iso-8859-2``) | ||
| 3. Hyphens removed entirely (``cp-1250`` → ``cp1250``) | ||
| 4. Hyphens removed from all but the first segment (``iso-8859-2`` kept) | ||
| :param name: Raw charset name from the server. | ||
| :returns: Normalized name suitable for :func:`codecs.lookup`. | ||
| """ | ||
| # std imports | ||
| import re # pylint: disable=import-outside-toplevel | ||
| base = name.strip().replace(' ', '-') | ||
| # Strip leading zeros from numeric segments: iso-8859-02 → iso-8859-2 | ||
| no_leading_zeros = re.sub(r'-0+(\d)', r'-\1', base) | ||
| # All hyphens removed: cp-1250 → cp1250 | ||
| no_hyphens = base.replace('-', '') | ||
| # Keep first hyphen-segment, collapse the rest: iso-8859-2 stays | ||
| parts = no_leading_zeros.split('-') | ||
| if len(parts) > 2: | ||
| partial = parts[0] + '-' + ''.join(parts[1:]) | ||
| else: | ||
| partial = no_leading_zeros | ||
| for candidate in (base, no_leading_zeros, no_hyphens, partial): | ||
| try: | ||
| codecs.lookup(candidate) | ||
| return candidate | ||
| except LookupError: | ||
| continue | ||
| return base | ||
| def send_charset(self, offered: List[str]) -> str: | ||
@@ -222,3 +260,5 @@ """ | ||
| try: | ||
| canon = codecs.lookup(offer).name | ||
| canon = codecs.lookup( | ||
| self._normalize_charset_name(offer) | ||
| ).name | ||
@@ -378,3 +418,3 @@ # Record first viable encoding | ||
| shell: Optional[ShellCallback] = None, | ||
| connect_minwait: float = 2.0, | ||
| connect_minwait: float = 0, | ||
| connect_maxwait: float = 3.0, | ||
@@ -498,3 +538,3 @@ connect_timeout: Optional[float] = None, | ||
| async def run_client() -> None: | ||
| async def run_client() -> None: # pylint: disable=too-many-locals,too-many-statements,too-complex | ||
| """Command-line 'telnetlib3-client' entry point, via setuptools.""" | ||
@@ -512,24 +552,27 @@ args = _transform_args(_get_argument_parser().parse_args()) | ||
| # Wrap client factory to inject always_will/always_do before negotiation | ||
| client_factory: Optional[Callable[..., client_base.BaseClient]] = None | ||
| if always_will or always_do: | ||
| # Wrap client factory to inject always_will/always_do and encoding | ||
| # flags before negotiation starts. | ||
| encoding_explicit = args["encoding"] not in ("utf8", "utf-8", False) | ||
| def _client_factory(**kwargs: Any) -> client_base.BaseClient: | ||
| client: TelnetClient | ||
| if sys.platform != "win32" and sys.stdin.isatty(): | ||
| client = TelnetTerminalClient(**kwargs) | ||
| else: | ||
| client = TelnetClient(**kwargs) | ||
| orig_connection_made = client.connection_made | ||
| def _client_factory(**kwargs: Any) -> client_base.BaseClient: | ||
| client: TelnetClient | ||
| if sys.platform != "win32" and sys.stdin.isatty(): | ||
| client = TelnetTerminalClient(**kwargs) | ||
| else: | ||
| client = TelnetClient(**kwargs) | ||
| orig_connection_made = client.connection_made | ||
| def _patched_connection_made(transport: asyncio.BaseTransport) -> None: | ||
| orig_connection_made(transport) | ||
| assert client.writer is not None | ||
| def _patched_connection_made(transport: asyncio.BaseTransport) -> None: | ||
| orig_connection_made(transport) | ||
| assert client.writer is not None | ||
| if always_will: | ||
| client.writer.always_will = always_will | ||
| if always_do: | ||
| client.writer.always_do = always_do | ||
| client.writer._encoding_explicit = encoding_explicit # pylint: disable=protected-access | ||
| client.connection_made = _patched_connection_made # type: ignore[method-assign] | ||
| return client | ||
| client.connection_made = _patched_connection_made # type: ignore[method-assign] | ||
| return client | ||
| client_factory = _client_factory | ||
| client_factory: Optional[Callable[..., client_base.BaseClient]] = _client_factory | ||
@@ -545,4 +588,15 @@ # Wrap the shell callback to inject color filter when enabled | ||
| ColorFilter, | ||
| PetsciiColorFilter, | ||
| AtasciiControlFilter, | ||
| ) | ||
| # Auto-select encoding-specific filters | ||
| encoding_name: str = args.get("encoding", "") or "" | ||
| is_petscii = encoding_name.lower() in ("petscii", "cbm", "commodore", "c64", "c128") | ||
| is_atascii = encoding_name.lower() in ("atascii", "atari8bit", "atari_8bit") | ||
| if colormatch == "petscii": | ||
| colormatch = "c64" | ||
| if is_petscii and colormatch != "c64": | ||
| colormatch = "c64" | ||
| if colormatch not in PALETTES: | ||
@@ -561,3 +615,8 @@ print( | ||
| ) | ||
| color_filter = ColorFilter(color_config) | ||
| if is_petscii or colormatch == "c64": | ||
| color_filter_obj: object = PetsciiColorFilter(color_config) | ||
| elif is_atascii: | ||
| color_filter_obj = AtasciiControlFilter() | ||
| else: | ||
| color_filter_obj = ColorFilter(color_config) | ||
| original_shell = shell_callback | ||
@@ -570,3 +629,3 @@ | ||
| # pylint: disable-next=protected-access | ||
| writer_arg._color_filter = color_filter # type: ignore[union-attr] | ||
| writer_arg._color_filter = color_filter_obj # type: ignore[union-attr] | ||
| await original_shell(reader, writer_arg) | ||
@@ -576,2 +635,33 @@ | ||
| # Wrap shell to inject raw_mode flag and input translation for retro encodings | ||
| raw_mode: bool = args.get("raw_mode", False) | ||
| if raw_mode: | ||
| # local | ||
| from .client_shell import ( # pylint: disable=import-outside-toplevel | ||
| _INPUT_XLAT, | ||
| _INPUT_SEQ_XLAT, | ||
| InputFilter, | ||
| ) | ||
| enc_key = (args.get("encoding", "") or "").lower() | ||
| byte_xlat = _INPUT_XLAT.get(enc_key, {}) | ||
| seq_xlat = _INPUT_SEQ_XLAT.get(enc_key, {}) | ||
| input_filter: Optional[InputFilter] = ( | ||
| InputFilter(seq_xlat, byte_xlat) if (seq_xlat or byte_xlat) else None | ||
| ) | ||
| _inner_shell = shell_callback | ||
| async def _raw_shell( | ||
| reader: Union[TelnetReader, TelnetReaderUnicode], | ||
| writer_arg: Union[TelnetWriter, TelnetWriterUnicode], | ||
| ) -> None: | ||
| # pylint: disable-next=protected-access | ||
| writer_arg._raw_mode = True # type: ignore[union-attr] | ||
| if input_filter is not None: | ||
| # pylint: disable-next=protected-access | ||
| writer_arg._input_filter = input_filter # type: ignore[union-attr] | ||
| await _inner_shell(reader, writer_arg) | ||
| shell_callback = _raw_shell | ||
| # Build connection kwargs explicitly to avoid pylint false positive | ||
@@ -626,5 +716,13 @@ connection_kwargs: Dict[str, Any] = { | ||
| parser.add_argument( | ||
| "--connect-minwait", default=1.0, type=float, help="shell delay for negotiation" | ||
| "--line-mode", | ||
| action="store_true", | ||
| default=False, | ||
| help="use line-buffered input with local echo instead of raw terminal " | ||
| "mode. By default the client uses raw mode (no line buffering, no " | ||
| "local echo) which is correct for most BBS and MUD servers.", | ||
| ) | ||
| parser.add_argument( | ||
| "--connect-minwait", default=0, type=float, help="shell delay for negotiation" | ||
| ) | ||
| parser.add_argument( | ||
| "--connect-maxwait", default=4.0, type=float, help="timeout for pending negotiation" | ||
@@ -659,3 +757,3 @@ ) | ||
| "--colormatch", | ||
| default="ega", | ||
| default="vga", | ||
| metavar="PALETTE", | ||
@@ -730,2 +828,12 @@ help=( | ||
| def _transform_args(args: argparse.Namespace) -> Dict[str, Any]: | ||
| # Auto-enable force_binary for retro BBS encodings that use high-bit bytes. | ||
| # local | ||
| from .encodings import FORCE_BINARY_ENCODINGS # pylint: disable=import-outside-toplevel | ||
| force_binary = args.force_binary | ||
| raw_mode = not args.line_mode | ||
| if args.encoding.lower().replace('-', '_') in FORCE_BINARY_ENCODINGS: | ||
| force_binary = True | ||
| raw_mode = True | ||
| return { | ||
@@ -741,3 +849,3 @@ "host": args.host, | ||
| "term": args.term, | ||
| "force_binary": args.force_binary, | ||
| "force_binary": force_binary, | ||
| "encoding_errors": args.encoding_errors, | ||
@@ -754,2 +862,3 @@ "connect_minwait": args.connect_minwait, | ||
| "reverse_video": args.reverse_video, | ||
| "raw_mode": raw_mode, | ||
| } | ||
@@ -927,2 +1036,4 @@ | ||
| client.writer.environ_encoding = environ_encoding | ||
| # pylint: disable-next=protected-access | ||
| client.writer._encoding_explicit = environ_encoding != "ascii" | ||
| client.writer.always_will = fp_always_will | ||
@@ -951,3 +1062,3 @@ client.writer.always_do = fp_always_do | ||
| term=ttype, | ||
| connect_minwait=2.0, | ||
| connect_minwait=0, | ||
| connect_maxwait=4.0, | ||
@@ -954,0 +1065,0 @@ connect_timeout=args.connect_timeout, |
@@ -44,3 +44,9 @@ """ | ||
| __all__ = ("ColorConfig", "ColorFilter", "PALETTES") | ||
| __all__ = ( | ||
| "AtasciiControlFilter", | ||
| "ColorConfig", | ||
| "ColorFilter", | ||
| "PetsciiColorFilter", | ||
| "PALETTES", | ||
| ) | ||
@@ -169,2 +175,22 @@ # Type alias for a 16-color palette: 16 (R, G, B) tuples indexed 0-15. | ||
| ), | ||
| # VIC-II C64 palette (Pepto's colodore reference). | ||
| # Indexed by VIC-II color register 0-15, NOT ANSI SGR order. | ||
| "c64": ( | ||
| (0, 0, 0), # 0 black | ||
| (255, 255, 255), # 1 white | ||
| (136, 0, 0), # 2 red | ||
| (170, 255, 238), # 3 cyan | ||
| (204, 68, 204), # 4 purple | ||
| (0, 204, 85), # 5 green | ||
| (0, 0, 170), # 6 blue | ||
| (238, 238, 119), # 7 yellow | ||
| (221, 136, 85), # 8 orange | ||
| (102, 68, 0), # 9 brown | ||
| (255, 119, 119), # 10 pink / light red | ||
| (51, 51, 51), # 11 dark grey | ||
| (119, 119, 119), # 12 grey | ||
| (170, 255, 102), # 13 light green | ||
| (0, 136, 255), # 14 light blue | ||
| (187, 187, 187), # 15 light grey | ||
| ), | ||
| } | ||
@@ -434,1 +460,182 @@ | ||
| return result | ||
| # PETSCII decoded control character → VIC-II palette index (0-15). | ||
| _PETSCII_COLOR_CODES: Dict[str, int] = { | ||
| '\x05': 1, # WHT (white) | ||
| '\x1c': 2, # RED | ||
| '\x1e': 5, # GRN (green) | ||
| '\x1f': 6, # BLU (blue) | ||
| '\x81': 8, # ORN (orange) | ||
| '\x90': 0, # BLK (black) | ||
| '\x95': 9, # BRN (brown) | ||
| '\x96': 10, # LRD (pink / light red) | ||
| '\x97': 11, # GR1 (dark grey) | ||
| '\x98': 12, # GR2 (grey) | ||
| '\x99': 13, # LGR (light green) | ||
| '\x9a': 14, # LBL (light blue) | ||
| '\x9b': 15, # GR3 (light grey) | ||
| '\x9c': 4, # PUR (purple) | ||
| '\x9e': 7, # YEL (yellow) | ||
| '\x9f': 3, # CYN (cyan) | ||
| } | ||
| # PETSCII cursor/screen control codes → ANSI escape sequences. | ||
| _PETSCII_CURSOR_CODES: Dict[str, str] = { | ||
| '\x11': '\x1b[B', # cursor down | ||
| '\x91': '\x1b[A', # cursor up | ||
| '\x1d': '\x1b[C', # cursor right | ||
| '\x9d': '\x1b[D', # cursor left | ||
| '\x13': '\x1b[H', # HOME (cursor to top-left) | ||
| '\x93': '\x1b[2J', # CLR (clear screen) | ||
| '\x14': '\x08\x1b[P', # DEL (destructive backspace) | ||
| } | ||
| # All PETSCII control chars handled by the filter. | ||
| _PETSCII_FILTER_CHARS = ( | ||
| frozenset(_PETSCII_COLOR_CODES) | ||
| | frozenset(_PETSCII_CURSOR_CODES) | ||
| | {'\x12', '\x92'} | ||
| ) | ||
| # Precompiled pattern matching any single PETSCII control character that | ||
| # the filter should consume (color codes, cursor codes, RVS ON/OFF). | ||
| _PETSCII_CTRL_RE = re.compile( | ||
| '[' + re.escape(''.join(sorted(_PETSCII_FILTER_CHARS))) + ']' | ||
| ) | ||
| class PetsciiColorFilter: | ||
| r""" | ||
| Translate PETSCII control codes to ANSI sequences. | ||
| PETSCII uses single-byte control codes embedded in the text stream for | ||
| color changes, cursor movement, and screen control. This filter | ||
| translates them to ANSI equivalents: | ||
| - **Colors**: 16 VIC-II palette colors → ``\x1b[38;2;R;G;Bm`` (24-bit RGB) | ||
| - **Reverse video**: RVS ON/OFF → ``\x1b[7m`` / ``\x1b[27m`` | ||
| - **Cursor**: up/down/left/right → ``\x1b[A/B/C/D`` | ||
| - **Screen**: HOME → ``\x1b[H``, CLR → ``\x1b[2J`` | ||
| - **DEL**: destructive backspace → ``\x08\x1b[P`` | ||
| :param config: Color configuration (uses ``brightness`` and ``contrast`` | ||
| for palette adjustment; ``palette_name`` is ignored — always C64). | ||
| """ | ||
| def __init__(self, config: Optional[ColorConfig] = None) -> None: | ||
| """Initialize PETSCII filter with optional color configuration.""" | ||
| palette = PALETTES["c64"] | ||
| if config is not None: | ||
| brightness = config.brightness | ||
| contrast = config.contrast | ||
| else: | ||
| brightness = 0.9 | ||
| contrast = 0.8 | ||
| self._adjusted: List[Tuple[int, int, int]] = [ | ||
| _adjust_color(r, g, b, brightness, contrast) for r, g, b in palette | ||
| ] | ||
| def _sgr_for_index(self, idx: int) -> str: | ||
| """Return a 24-bit foreground SGR sequence for palette *idx*.""" | ||
| r, g, b = self._adjusted[idx] | ||
| return f'\x1b[38;2;{r};{g};{b}m' | ||
| def filter(self, text: str) -> str: | ||
| """ | ||
| Replace PETSCII control codes with ANSI sequences. | ||
| PETSCII control characters (colors, cursor, screen) are replaced with their ANSI | ||
| equivalents. All other characters pass through unchanged. | ||
| :param text: Decoded PETSCII text (Unicode string). | ||
| :returns: Text with PETSCII controls translated to ANSI. | ||
| """ | ||
| if not _PETSCII_CTRL_RE.search(text): | ||
| return text | ||
| return _PETSCII_CTRL_RE.sub(self._replace, text) | ||
| def _replace(self, match: Match[str]) -> str: | ||
| """Regex callback for a single PETSCII control character.""" | ||
| ch = match.group() | ||
| idx = _PETSCII_COLOR_CODES.get(ch) | ||
| if idx is not None: | ||
| return self._sgr_for_index(idx) | ||
| cursor = _PETSCII_CURSOR_CODES.get(ch) | ||
| if cursor is not None: | ||
| return cursor | ||
| if ch == '\x12': | ||
| return '\x1b[7m' | ||
| if ch == '\x92': | ||
| return '\x1b[27m' | ||
| return '' | ||
| def flush(self) -> str: | ||
| """ | ||
| Flush buffered state. | ||
| PETSCII color codes are single-byte, so no buffering is needed. | ||
| :returns: Always ``""``. | ||
| """ | ||
| return "" | ||
| # ATASCII decoded control character glyphs → ANSI terminal sequences. | ||
| # The atascii codec decodes control bytes to Unicode glyphs; this map | ||
| # translates those glyphs to the terminal actions they represent. | ||
| _ATASCII_CONTROL_CODES: Dict[str, str] = { | ||
| '\u25c0': '\x08\x1b[P', # ◀ backspace/delete (0x7E / 0xFE) | ||
| '\u25b6': '\t', # ▶ tab (0x7F / 0xFF) | ||
| '\u21b0': '\x1b[2J\x1b[H', # ↰ clear screen (0x7D / 0xFD) | ||
| '\u2191': '\x1b[A', # ↑ cursor up (0x1C / 0x9C) | ||
| '\u2193': '\x1b[B', # ↓ cursor down (0x1D / 0x9D) | ||
| '\u2190': '\x1b[D', # ← cursor left (0x1E / 0x9E) | ||
| '\u2192': '\x1b[C', # → cursor right (0x1F / 0x9F) | ||
| } | ||
| _ATASCII_CTRL_RE = re.compile( | ||
| '[' + re.escape(''.join(sorted(_ATASCII_CONTROL_CODES))) + ']' | ||
| ) | ||
| class AtasciiControlFilter: | ||
| r""" | ||
| Translate decoded ATASCII control character glyphs to ANSI sequences. | ||
| The ``atascii`` codec decodes ATASCII control bytes into Unicode glyphs | ||
| (e.g. byte 0x7E → U+25C0 ◀). This filter replaces those glyphs with | ||
| the ANSI terminal sequences that produce the intended effect: | ||
| - **Backspace/delete**: ◀ → ``\x08\x1b[P`` (destructive backspace) | ||
| - **Tab**: ▶ → ``\t`` | ||
| - **Clear screen**: ↰ → ``\x1b[2J\x1b[H`` | ||
| - **Cursor movement**: ↑↓←→ → ``\x1b[A/B/D/C`` | ||
| """ | ||
| def filter(self, text: str) -> str: | ||
| """ | ||
| Replace ATASCII control glyphs with ANSI sequences. | ||
| :param text: Decoded ATASCII text (Unicode string). | ||
| :returns: Text with control glyphs translated to ANSI. | ||
| """ | ||
| if not _ATASCII_CTRL_RE.search(text): | ||
| return text | ||
| return _ATASCII_CTRL_RE.sub(self._replace, text) | ||
| @staticmethod | ||
| def _replace(match: Match[str]) -> str: | ||
| """Regex callback for a single ATASCII control glyph.""" | ||
| return _ATASCII_CONTROL_CODES.get(match.group(), '') | ||
| @staticmethod | ||
| def flush() -> str: | ||
| """ | ||
| Flush buffered state. | ||
| ATASCII control glyphs are single characters, so no buffering is needed. | ||
| :returns: Always ``""``. | ||
| """ | ||
| return "" |
| """ | ||
| Custom BBS/retro-computing codecs for telnetlib3. | ||
| Registers petscii and atarist codecs with Python's codecs module on import. | ||
| These encodings are then available for use with ``bytes.decode()`` and the | ||
| ``--encoding`` CLI flag of ``telnetlib3-fingerprint``. | ||
| Registers atascii, petscii, and atarist codecs with Python's codecs module | ||
| on import. These encodings are then available for use with | ||
| ``bytes.decode()`` and the ``--encoding`` CLI flag of | ||
| ``telnetlib3-fingerprint``. | ||
| """ | ||
@@ -48,2 +49,10 @@ | ||
| #: Encoding names (and aliases) that require BINARY mode for high-bit bytes. | ||
| #: Used by CLI entry points to auto-enable ``--force-binary``. | ||
| FORCE_BINARY_ENCODINGS = frozenset({ | ||
| 'atascii', 'atari8bit', 'atari_8bit', | ||
| 'petscii', 'cbm', 'commodore', 'c64', 'c128', | ||
| 'atarist', 'atari', | ||
| }) | ||
| codecs.register(_search_function) |
@@ -7,5 +7,5 @@ """ | ||
| The ``robot_check`` function can reliably detect whether the remote end is a real terminal | ||
| emulator by measuring the rendered width of a wide Unicode character. Real terminals | ||
| render it as width 2, while bots typically see width 1 or timeout. | ||
| The ``robot_check`` function detects whether the remote end is a real terminal emulator | ||
| by requesting a cursor position report (CPR) after writing a single space character. | ||
| Real terminals respond to CPR, while bots typically timeout. | ||
@@ -34,4 +34,4 @@ These shells are used when normal shell access is denied due to connection limits or | ||
| # Wide character test - U+231A WATCH should render as width 2 | ||
| _WIDE_TEST_CHAR = "\u231a" | ||
| # Narrow test character - a plain space works on any terminal | ||
| _TEST_CHAR = " " | ||
@@ -219,9 +219,9 @@ # Input limit for guard shells | ||
| """ | ||
| Check if client can render wide characters. | ||
| Check if client responds to cursor position report. | ||
| :returns: True if client passes (renders wide char as width 2). | ||
| :returns: True if client passes (responds to CPR with expected width). | ||
| """ | ||
| with _latin1_reading(reader): | ||
| width = await _measure_width(reader, writer, _WIDE_TEST_CHAR, timeout) | ||
| return bool(width == 2) | ||
| width = await _measure_width(reader, writer, _TEST_CHAR, timeout) | ||
| return bool(width == 1) | ||
@@ -228,0 +228,0 @@ |
@@ -17,2 +17,3 @@ """Module provides class BaseServer.""" | ||
| from .telopt import theNULL | ||
| from .accessories import TRACE, hexdump | ||
| from .stream_reader import TelnetReader, TelnetReaderUnicode | ||
@@ -198,3 +199,3 @@ from .stream_writer import TelnetWriter, TelnetWriterUnicode | ||
| def data_received(self, data: bytes) -> None: | ||
| def data_received(self, data: bytes) -> None: # pylint: disable=too-complex | ||
| """ | ||
@@ -216,2 +217,4 @@ Process bytes received by transport. | ||
| # | ||
| if logger.isEnabledFor(TRACE): | ||
| logger.log(TRACE, "recv %d bytes\n%s", len(data), hexdump(data, prefix="<< ")) | ||
| self._last_received = datetime.datetime.now() | ||
@@ -218,0 +221,0 @@ self._rx_bytes += len(data) |
@@ -23,4 +23,8 @@ """ | ||
| import subprocess | ||
| from typing import Any | ||
| from typing import Any, NamedTuple | ||
| # 3rd party | ||
| import wcwidth as _wcwidth | ||
| from wcwidth.escape_sequences import ZERO_WIDTH_PATTERN as _ZERO_WIDTH_STR_PATTERN | ||
| # local | ||
@@ -69,26 +73,31 @@ from . import fingerprinting as _fps | ||
| _PROBE_TIMEOUT = 0.5 | ||
| _MAX_PROMPT_REPLIES = 5 | ||
| _JQ = shutil.which("jq") | ||
| # Match "yes/no" or "y/n" surrounded by non-alphanumeric chars (or at | ||
| # string boundaries). Used to auto-answer confirmation prompts. | ||
| _YN_RE = re.compile(rb"(?i)(?:^|[^a-zA-Z0-9])(yes/no|y/n)(?:[^a-zA-Z0-9]|$)") | ||
| # Match "yes/no", "y/n", "(Yes|No)", or "(Yn)"/"[Yn]" surrounded by | ||
| # non-alphanumeric chars (or at string boundaries). The "(Yn)" form is | ||
| # a common BBS convention where the capital letter indicates the default. | ||
| _YN_RE = re.compile( | ||
| rb"(?i)(?:^|[^a-zA-Z0-9])(yes/no|y/n|\(yes\|no\))(?:[^a-zA-Z0-9]|$)" | ||
| rb"|[(\[][Yy][Nn][)\]]" | ||
| rb"|[(\[][yY][nN][)\]]" | ||
| ) | ||
| # Match MUD/BBS login prompts that offer 'who' as a command. | ||
| # Quoted: "enter a name (or 'who')", or bare WHO without surrounding | ||
| # alphanumerics: "\nWHO to see players connected.\n" | ||
| _WHO_RE = re.compile(rb"(?i)(?:'who'|\"who\"|(?:^|[^a-zA-Z0-9])who(?:[^a-zA-Z0-9]|$))") | ||
| # Same pattern for 'help' — offered as a login-screen command on many MUDs. | ||
| _HELP_RE = re.compile(rb"(?i)(?:'help'|\"help\"|(?:^|[^a-zA-Z0-9])help(?:[^a-zA-Z0-9]|$))") | ||
| # Match "color?" prompts — many MUDs ask if the user wants color. | ||
| _COLOR_RE = re.compile(rb"(?i)color\s*\?") | ||
| # Match numbered menu items offering UTF-8, e.g. "5) UTF-8" or "3) utf8". | ||
| # Many BBS/MUD systems present a charset selection menu at connect time. | ||
| _MENU_UTF8_RE = re.compile(rb"(\d+)\s*\)\s*UTF-?8", re.IGNORECASE) | ||
| # Match numbered menu items offering UTF-8, e.g. "5) UTF-8", "[3] UTF-8", | ||
| # "2. UTF-8", or "1 ... UTF-8". Many BBS/MUD systems present a charset | ||
| # selection menu at connect time. The optional \S+/ prefix handles | ||
| # "Something/UTF-8" labels. | ||
| _MENU_UTF8_RE = re.compile( | ||
| rb"[\[(]?(\d+)\s*(?:[\])]|\.{1,3})\s*(?:\S+\s*/\s*)?UTF-?8", re.IGNORECASE | ||
| ) | ||
| # Match numbered menu items offering ANSI, e.g. "(1) Ansi" or "[2] ANSI". | ||
| # Brackets may be parentheses or square brackets (mixed allowed). | ||
| _MENU_ANSI_RE = re.compile(rb"[\[(](\d+)[\])]\s*ANSI", re.IGNORECASE) | ||
| # Match numbered menu items offering ANSI, e.g. "(1) Ansi", "[2] ANSI", | ||
| # "3. English/ANSI", or "1 ... English/ANSI". Brackets, dot, and | ||
| # ellipsis delimiters are accepted. | ||
| _MENU_ANSI_RE = re.compile( | ||
| rb"[\[(]?(\d+)\s*(?:[\])]|\.{1,3})\s*(?:\S+\s*/\s*)?ANSI", re.IGNORECASE | ||
| ) | ||
@@ -98,2 +107,204 @@ # Match "gb/big5" encoding selection prompts common on Chinese BBS systems. | ||
| # Strip ANSI/VT100 escape sequences from raw bytes for pattern matching. | ||
| # Reuse wcwidth's comprehensive pattern (CSI, OSC, DCS, APC, PM, charset, Fe, Fp). | ||
| _ANSI_STRIP_RE = re.compile(_ZERO_WIDTH_STR_PATTERN.pattern.encode("ascii")) | ||
| # Match "Press [.ESC.] twice" botcheck prompts (e.g. Mystic BBS). | ||
| _ESC_TWICE_RE = re.compile(rb"(?i)press\s+[\[<]?\.?esc\.?[\]>]?\s+twice") | ||
| # Match single "Press [ESC]" prompts without "twice" (e.g. Herbie's BBS). | ||
| _ESC_ONCE_RE = re.compile(rb"(?i)press\s+[\[<]?\.?esc\.?[\]>]?(?!\s+twice)") | ||
| # Match "HIT RETURN", "PRESS RETURN", "PRESS ENTER", "HIT ENTER", etc. | ||
| # Common on Worldgroup/MajorBBS and other vintage BBS systems. | ||
| _RETURN_PROMPT_RE = re.compile( | ||
| rb"(?i)(?:hit|press)\s+(?:return|enter)\s*[:\.]?" | ||
| ) | ||
| # Match "Press the BACKSPACE key" prompts — standard telnet terminal | ||
| # detection (e.g. TelnetBible.com). Respond with ASCII BS (0x08). | ||
| _BACKSPACE_KEY_RE = re.compile( | ||
| rb"(?i)press\s+the\s+backspace\s+key" | ||
| ) | ||
| # Match "press del/backspace" and "hit your backspace/delete" prompts from | ||
| # PETSCII BBS systems (e.g. Image BBS C/G detect). Respond with PETSCII | ||
| # DEL byte (0x14). | ||
| _DEL_BACKSPACE_RE = re.compile( | ||
| rb"(?i)(?:press|hit)\s+(?:your\s+)?" | ||
| rb"(?:del(?:ete)?(?:\s*/\s*backspace)?|backspace(?:\s*/\s*del(?:ete)?)?)" | ||
| rb"(?:\s+key)?\s*[:\.]?" | ||
| ) | ||
| # Match "More: (Y)es, (N)o, (C)ontinuous?" pagination prompts. | ||
| # Answer "C" (Continuous) to disable pagination and collect the full banner. | ||
| _MORE_PROMPT_RE = re.compile( | ||
| rb"(?i)more[:\s]*\(?[yY]\)?.*\(?[cC]\)?\s*(?:ontinuous|ont)" | ||
| ) | ||
| # Match DSR (Device Status Report) request: ESC [ 6 n. | ||
| # Servers send this to detect ANSI-capable terminals; we reply with a | ||
| # Cursor Position Report (CPR) so the server sees us as ANSI-capable. | ||
| _DSR_RE = re.compile(rb"\x1b\[6n") | ||
| # Match SyncTERM/CTerm font selection: CSI Ps1 ; Ps2 SP D | ||
| # Reference: https://syncterm.bbsdev.net/cterm.html | ||
| # Ps1 = font page (0 = primary), Ps2 = font ID (0-255). | ||
| _SYNCTERM_FONT_RE = re.compile(rb"\x1b\[(\d+);(\d+) D") | ||
| #: Map SyncTERM font IDs to Python codec names. | ||
| #: Font IDs from CTerm spec / Synchronet SBBS / x84 SYNCTERM_FONTMAP. | ||
| SYNCTERM_FONT_ENCODINGS: dict[int, str] = { | ||
| 0: "cp437", | ||
| 1: "cp1251", | ||
| 2: "koi8-r", | ||
| 3: "iso-8859-2", | ||
| 4: "iso-8859-4", | ||
| 5: "cp866", | ||
| 6: "iso-8859-9", | ||
| 8: "iso-8859-8", | ||
| 9: "koi8-u", | ||
| 10: "iso-8859-15", | ||
| 11: "iso-8859-4", | ||
| 12: "koi8-r", | ||
| 13: "iso-8859-4", | ||
| 14: "iso-8859-5", | ||
| 16: "iso-8859-15", | ||
| 17: "cp850", | ||
| 18: "cp850", | ||
| 20: "cp1251", | ||
| 21: "iso-8859-7", | ||
| 22: "koi8-r", | ||
| 23: "iso-8859-4", | ||
| 24: "iso-8859-1", | ||
| 25: "cp866", | ||
| 26: "cp437", | ||
| 27: "cp866", | ||
| 29: "cp866", | ||
| 30: "iso-8859-1", | ||
| 31: "cp1131", | ||
| 32: "petscii", | ||
| 33: "petscii", | ||
| 34: "petscii", | ||
| 35: "petscii", | ||
| 36: "atascii", | ||
| 37: "cp437", | ||
| 38: "cp437", | ||
| 39: "cp437", | ||
| 40: "cp437", | ||
| 41: "cp437", | ||
| 42: "cp437", | ||
| } | ||
| #: Encodings that require ``force_binary`` for high-bit bytes. | ||
| _SYNCTERM_BINARY_ENCODINGS = frozenset({ | ||
| "petscii", "atascii", | ||
| }) | ||
| log = logging.getLogger(__name__) | ||
| def detect_syncterm_font(data: bytes) -> str | None: | ||
| """ | ||
| Extract encoding from a SyncTERM font selection sequence in *data*. | ||
| Scans *data* for ``CSI Ps1 ; Ps2 SP D`` and returns the corresponding | ||
| Python codec name from :data:`SYNCTERM_FONT_ENCODINGS`, or ``None`` | ||
| if no font sequence is found or the font ID is unrecognised. | ||
| :param data: Raw bytes that may contain escape sequences. | ||
| :returns: Encoding name or ``None``. | ||
| """ | ||
| match = _SYNCTERM_FONT_RE.search(data) | ||
| if match is None: | ||
| return None | ||
| font_id = int(match.group(2)) | ||
| return SYNCTERM_FONT_ENCODINGS.get(font_id) | ||
| #: Encodings where standard telnet CR+LF must be re-encoded to the | ||
| #: codec's native EOL byte. The codec's ``encode()`` handles the | ||
| #: actual CR → LF normalization; we just gate the re-encoding step. | ||
| _RETRO_EOL_ENCODINGS = frozenset({ | ||
| 'atascii', 'atari8bit', 'atari_8bit', | ||
| }) | ||
| def _reencode_prompt(response: bytes, encoding: str) -> bytes: | ||
| r""" | ||
| Re-encode an ASCII prompt response for the server's encoding. | ||
| For retro encodings (ATASCII), the standard ``\r\n`` line ending | ||
| is re-encoded through the codec so the server receives its native | ||
| EOL byte. For all other encodings the response is returned as-is. | ||
| :param response: ASCII prompt response bytes (e.g. ``b"yes\r\n"``). | ||
| :param encoding: Remote server encoding name. | ||
| :returns: Response bytes suitable for the server's encoding. | ||
| """ | ||
| normalized = encoding.lower().replace('-', '_') | ||
| if normalized not in _RETRO_EOL_ENCODINGS: | ||
| return response | ||
| try: | ||
| text = response.decode('ascii') | ||
| return text.encode(encoding) | ||
| except (UnicodeDecodeError, UnicodeEncodeError, LookupError): | ||
| return response | ||
| class _VirtualCursor: | ||
| """ | ||
| Track virtual cursor column to generate position-aware CPR responses. | ||
| The server's robot-check sends DSR, writes a test character, then sends | ||
| DSR again. It compares the two cursor positions to verify the character | ||
| rendered at the expected width. By tracking what the server writes | ||
| between DSR requests and advancing the column using :func:`wcwidth.wcwidth`, | ||
| the scanner produces CPR responses that satisfy the width check. | ||
| When *encoding* is set to a single-byte encoding like ``cp437``, raw | ||
| bytes are decoded with that encoding before measuring — this gives | ||
| correct column widths for servers that use SyncTERM font switching | ||
| where the raw bytes are not valid UTF-8. | ||
| """ | ||
| def __init__(self, encoding: str = "utf-8") -> None: | ||
| self.col = 1 | ||
| self.encoding = encoding | ||
| self.dsr_requests = 0 | ||
| self.dsr_replies = 0 | ||
| def cpr(self) -> bytes: | ||
| """Return a CPR response (``ESC [ row ; col R``) at the current position.""" | ||
| self.dsr_replies += 1 | ||
| return f"\x1b[1;{self.col}R".encode("ascii") | ||
| def advance(self, data: bytes) -> None: | ||
| """ | ||
| Advance cursor column for *data* (non-DSR text from the server). | ||
| ANSI escape sequences are stripped first so they do not contribute | ||
| to cursor movement. Backspace and carriage return are handled. | ||
| Bytes are decoded using :attr:`encoding` so that single-byte | ||
| encodings like CP437 produce the correct character widths. | ||
| """ | ||
| stripped = _ANSI_STRIP_RE.sub(b"", data) | ||
| try: | ||
| text = stripped.decode(self.encoding, errors="replace") | ||
| # pylint: disable-next=broad-exception-caught,overlapping-except | ||
| except (LookupError, Exception): | ||
| text = stripped.decode("latin-1") | ||
| for ch in text: | ||
| cp = ord(ch) | ||
| if cp == 0x08: # backspace | ||
| self.col = max(1, self.col - 1) | ||
| elif cp == 0x0D: # \r | ||
| self.col = 1 | ||
| elif cp >= 0x20: | ||
| w = _wcwidth.wcwidth(ch) | ||
| if w > 0: | ||
| self.col += w | ||
| logger = logging.getLogger("telnetlib3.server_fingerprint") | ||
@@ -134,55 +345,57 @@ | ||
| def _detect_yn_prompt(banner: bytes) -> bytes: # pylint: disable=too-many-return-statements | ||
| r""" | ||
| Return an appropriate first-prompt response based on banner content. | ||
| class _PromptResult(NamedTuple): | ||
| """Result of prompt detection with optional encoding override.""" | ||
| If the banner contains a ``yes/no`` or ``y/n`` confirmation prompt | ||
| (case-insensitive, delimited by non-alphanumeric characters), returns | ||
| ``b"yes\r\n"`` or ``b"y\r\n"`` respectively. | ||
| response: bytes | None | ||
| encoding: str | None = None | ||
| If the banner contains a ``color?`` prompt (case-insensitive), | ||
| returns ``b"y\r\n"`` to accept color. | ||
| If the banner contains a numbered menu item for UTF-8 (e.g. | ||
| ``5) UTF-8``), returns the digit followed by ``b"\r\n"`` to select | ||
| the UTF-8 charset option. | ||
| def _detect_yn_prompt( # pylint: disable=too-many-return-statements | ||
| banner: bytes, | ||
| ) -> _PromptResult: | ||
| r""" | ||
| Return an appropriate first-prompt response based on banner content. | ||
| If the banner contains a bracketed numbered menu item for ANSI | ||
| (e.g. ``(1) Ansi`` or ``[2] ANSI``), returns the digit followed | ||
| by ``b"\r\n"`` to select the ANSI option. | ||
| ANSI escape sequences are stripped before pattern matching so that | ||
| embedded color/cursor codes do not interfere with detection. | ||
| If the banner contains a ``gb/big5`` encoding selection prompt | ||
| (common on Chinese BBS systems), returns ``b"big5\r\n"`` to select | ||
| Big5 encoding. | ||
| Returns a :class:`_PromptResult` whose *response* is ``None`` when | ||
| no recognizable prompt is found — the caller should fall back to | ||
| sending a bare ``\r\n``. When a UTF-8 charset menu is selected, | ||
| *encoding* is set to ``"utf-8"`` so the caller can update the | ||
| session encoding. | ||
| If the banner contains a MUD/BBS login prompt offering ``'who'`` as | ||
| an alternative (e.g. "enter a name (or 'who')"), returns | ||
| ``b"who\r\n"``. Similarly, ``'help'`` prompts return ``b"help\r\n"``. | ||
| Otherwise returns a bare ``b"\r\n"``. | ||
| :param banner: Raw banner bytes collected before the first prompt. | ||
| :returns: Response bytes to send. | ||
| :returns: Prompt result with response bytes and optional encoding. | ||
| """ | ||
| match = _YN_RE.search(banner) | ||
| stripped = _ANSI_STRIP_RE.sub(b"", banner) | ||
| if _ESC_TWICE_RE.search(stripped): | ||
| return _PromptResult(b"\x1b\x1b") | ||
| if _ESC_ONCE_RE.search(stripped): | ||
| return _PromptResult(b"\x1b") | ||
| if _MORE_PROMPT_RE.search(stripped): | ||
| return _PromptResult(b"C\r\n") | ||
| match = _YN_RE.search(stripped) | ||
| if match: | ||
| token = match.group(1).lower() | ||
| if token == b"yes/no": | ||
| return b"yes\r\n" | ||
| return b"y\r\n" | ||
| if _COLOR_RE.search(banner): | ||
| return b"y\r\n" | ||
| menu_match = _MENU_UTF8_RE.search(banner) | ||
| token = match.group(1) | ||
| if token is not None and token.lower() in (b"yes/no", b"(yes|no)"): | ||
| return _PromptResult(b"yes\r\n") | ||
| return _PromptResult(b"y\r\n") | ||
| if _COLOR_RE.search(stripped): | ||
| return _PromptResult(b"y\r\n") | ||
| menu_match = _MENU_UTF8_RE.search(stripped) | ||
| if menu_match: | ||
| return menu_match.group(1) + b"\r\n" | ||
| ansi_match = _MENU_ANSI_RE.search(banner) | ||
| return _PromptResult(menu_match.group(1) + b"\r\n", encoding="utf-8") | ||
| ansi_match = _MENU_ANSI_RE.search(stripped) | ||
| if ansi_match: | ||
| return ansi_match.group(1) + b"\r\n" | ||
| if _GB_BIG5_RE.search(banner): | ||
| return b"big5\r\n" | ||
| if _WHO_RE.search(banner): | ||
| return b"who\r\n" | ||
| if _HELP_RE.search(banner): | ||
| return b"help\r\n" | ||
| return b"\r\n" | ||
| return _PromptResult(ansi_match.group(1) + b"\r\n") | ||
| if _GB_BIG5_RE.search(stripped): | ||
| return _PromptResult(b"big5\r\n", encoding="big5") | ||
| if _BACKSPACE_KEY_RE.search(stripped): | ||
| return _PromptResult(b"\x08") | ||
| if _DEL_BACKSPACE_RE.search(stripped): | ||
| return _PromptResult(b"\x14") | ||
| if _RETURN_PROMPT_RE.search(stripped): | ||
| return _PromptResult(b"\r\n") | ||
| return _PromptResult(None) | ||
@@ -223,2 +436,4 @@ | ||
| ``"ascii"`` per :rfc:`1572`; use ``"cp037"`` for EBCDIC hosts. | ||
| When set to something other than ``"ascii"``, SyncTERM font | ||
| switches will not override this encoding. | ||
| :param scan_type: ``"quick"`` probes CORE + MUD options only (default); | ||
@@ -233,2 +448,3 @@ ``"full"`` includes all LEGACY options. | ||
| writer.environ_encoding = environ_encoding | ||
| writer._encoding_explicit = environ_encoding != "ascii" # pylint: disable=protected-access | ||
| try: | ||
@@ -254,3 +470,3 @@ await _fingerprint_session( | ||
| async def _fingerprint_session( # pylint: disable=too-many-locals | ||
| async def _fingerprint_session( # noqa: E501 ; pylint: disable=too-many-locals,too-many-branches,too-many-statements,too-complex | ||
| reader: TelnetReader, | ||
@@ -272,18 +488,68 @@ writer: TelnetWriter, | ||
| start_time = time.time() | ||
| cursor = _VirtualCursor(encoding=writer.environ_encoding) | ||
| # 1. Let straggler negotiation settle | ||
| await asyncio.sleep(_NEGOTIATION_SETTLE) | ||
| # 1. Let straggler negotiation settle — read (and respond to DSR) | ||
| # instead of sleeping blind so early DSR requests get a CPR reply. | ||
| settle_data = await _read_banner_until_quiet( | ||
| reader, quiet_time=_NEGOTIATION_SETTLE, max_wait=_NEGOTIATION_SETTLE, | ||
| max_bytes=banner_max_bytes, writer=writer, cursor=cursor, | ||
| ) | ||
| # 2. Read banner (pre-return) — wait until output stops | ||
| banner_before = await _read_banner_until_quiet( | ||
| reader, quiet_time=banner_quiet_time, max_wait=banner_max_wait, max_bytes=banner_max_bytes | ||
| banner_before_raw = await _read_banner_until_quiet( | ||
| reader, quiet_time=banner_quiet_time, max_wait=banner_max_wait, | ||
| max_bytes=banner_max_bytes, writer=writer, cursor=cursor, | ||
| ) | ||
| banner_before = settle_data + banner_before_raw | ||
| # 3. Send return (or "yes"/"y" if the banner contains a y/n prompt) | ||
| prompt_response = _detect_yn_prompt(banner_before) | ||
| writer.write(prompt_response) | ||
| await writer.drain() | ||
| banner_after = await _read_banner_until_quiet( | ||
| reader, quiet_time=banner_quiet_time, max_wait=banner_max_wait, max_bytes=banner_max_bytes | ||
| ) | ||
| # 3. Respond to prompts — some servers ask multiple questions in | ||
| # sequence (e.g. "color?" then a UTF-8 charset menu). Loop up to | ||
| # _MAX_PROMPT_REPLIES times, stopping early when no prompt is detected | ||
| # or the connection is lost. | ||
| after_chunks: list[bytes] = [] | ||
| latest_banner = banner_before | ||
| for _prompt_round in range(_MAX_PROMPT_REPLIES): | ||
| prompt_result = _detect_yn_prompt(latest_banner) | ||
| detected = prompt_result.response | ||
| # Skip if the ESC response was already sent inline during banner | ||
| # collection (time-sensitive botcheck countdowns). | ||
| if detected in (b"\x1b\x1b", b"\x1b") and getattr( | ||
| writer, '_esc_inline', False | ||
| ): | ||
| # pylint: disable-next=protected-access | ||
| writer._esc_inline = False # type: ignore[attr-defined] | ||
| detected = None | ||
| prompt_response = _reencode_prompt( | ||
| detected if detected is not None else b"\r\n", | ||
| writer.environ_encoding, | ||
| ) | ||
| writer.write(prompt_response) | ||
| await writer.drain() | ||
| # When the server presents a charset menu and we select an | ||
| # encoding (e.g. UTF-8 or Big5), switch the session encoding | ||
| # so that subsequent banner data is decoded correctly. | ||
| if prompt_result.encoding: | ||
| writer.environ_encoding = prompt_result.encoding | ||
| cursor.encoding = prompt_result.encoding | ||
| protocol = writer.protocol | ||
| if protocol is not None: | ||
| protocol.force_binary = True | ||
| previous_banner = latest_banner | ||
| latest_banner = await _read_banner_until_quiet( | ||
| reader, quiet_time=banner_quiet_time, max_wait=banner_max_wait, | ||
| max_bytes=banner_max_bytes, writer=writer, cursor=cursor, | ||
| ) | ||
| after_chunks.append(latest_banner) | ||
| if writer.is_closing() or not latest_banner: | ||
| break | ||
| # Stop when the server repeats the same banner — it is not | ||
| # advancing through prompts, just re-displaying the login screen. | ||
| if latest_banner == previous_banner: | ||
| break | ||
| # Stop when no prompt was detected AND the new banner has no | ||
| # prompt either (servers like Mystic BBS send a non-interactive | ||
| # preamble before the real botcheck prompt). | ||
| if detected is None and _detect_yn_prompt(latest_banner).response is None: | ||
| break | ||
| banner_after = b"".join(after_chunks) | ||
@@ -312,5 +578,13 @@ # 4. Snapshot option states before probing | ||
| "encoding": writer.environ_encoding, | ||
| "banner_before_return": _format_banner(banner_before, encoding=writer.environ_encoding), | ||
| "banner_after_return": _format_banner(banner_after, encoding=writer.environ_encoding), | ||
| "banner_before_return": _format_banner( | ||
| banner_before, | ||
| encoding=writer.environ_encoding, | ||
| ), | ||
| "banner_after_return": _format_banner( | ||
| banner_after, | ||
| encoding=writer.environ_encoding, | ||
| ), | ||
| "timing": {"probe": probe_time, "total": time.time() - start_time}, | ||
| "dsr_requests": cursor.dsr_requests, | ||
| "dsr_replies": cursor.dsr_replies, | ||
| } | ||
@@ -595,3 +869,3 @@ ) | ||
| def _format_banner(data: bytes, encoding: str = "utf-8") -> str: | ||
| """ | ||
| r""" | ||
| Format raw banner bytes for JSON serialization. | ||
@@ -602,9 +876,33 @@ | ||
| Uses ``surrogateescape`` so high bytes (common in CP437 BBS art) | ||
| are preserved as surrogates (e.g. byte ``0xB1`` → ``U+DCB1``) | ||
| rather than replaced with ``U+FFFD``. JSON serialization escapes | ||
| them as ``\udcXX``, which round-trips through :func:`json.load`. | ||
| Falls back to ``latin-1`` when the requested encoding is unavailable | ||
| (e.g. a server-advertised charset that Python does not recognise). | ||
| When *encoding* is ``petscii``, inline PETSCII color control codes | ||
| are translated to ANSI 24-bit RGB SGR sequences using the VIC-II | ||
| C64 palette so the saved banner is human-readable with colors. | ||
| :param data: Raw bytes from the server. | ||
| :param encoding: Character encoding to use for decoding. | ||
| :returns: Decoded text string (undecodable bytes replaced). | ||
| :returns: Decoded text string (raw bytes preserved as surrogates). | ||
| """ | ||
| return data.decode(encoding, errors="replace") | ||
| try: | ||
| text = data.decode(encoding, errors="surrogateescape") | ||
| except LookupError: | ||
| text = data.decode("latin-1") | ||
| if encoding.lower() in ("petscii", "cbm", "commodore", "c64", "c128"): | ||
| # local | ||
| from .color_filter import PetsciiColorFilter # pylint: disable=import-outside-toplevel | ||
| text = PetsciiColorFilter().filter(text) | ||
| # PETSCII uses CR (0x0D) as line terminator; normalize to LF. | ||
| text = text.replace('\r\n', '\n').replace('\r', '\n') | ||
| return text | ||
| async def _await_mssp_data(writer: TelnetWriter, deadline: float) -> None: | ||
@@ -641,3 +939,26 @@ """Wait for MSSP data until *deadline* if server acknowledged MSSP.""" | ||
| async def _read_banner_until_quiet( | ||
| def _respond_to_dsr( | ||
| chunk: bytes, writer: TelnetWriter, cursor: _VirtualCursor | None | ||
| ) -> None: | ||
| """ | ||
| Send CPR response(s) for each DSR found in *chunk*. | ||
| When *cursor* is provided, text between DSR sequences advances the | ||
| virtual cursor column so each CPR reflects the correct position. | ||
| Without a cursor, a static ``ESC [ 1 ; 1 R`` is sent for every DSR. | ||
| """ | ||
| if cursor is None: | ||
| for _ in _DSR_RE.finditer(chunk): | ||
| writer.write(b"\x1b[1;1R") | ||
| return | ||
| pos = 0 | ||
| for match in _DSR_RE.finditer(chunk): | ||
| cursor.dsr_requests += 1 | ||
| cursor.advance(chunk[pos:match.start()]) | ||
| writer.write(cursor.cpr()) | ||
| pos = match.end() | ||
| cursor.advance(chunk[pos:]) | ||
| async def _read_banner_until_quiet( # noqa: E501 ; pylint: disable=too-many-positional-arguments,too-complex,too-many-nested-blocks | ||
| reader: TelnetReader, | ||
@@ -647,2 +968,4 @@ quiet_time: float = 2.0, | ||
| max_bytes: int = _BANNER_MAX_BYTES, | ||
| writer: TelnetWriter | None = None, | ||
| cursor: _VirtualCursor | None = None, | ||
| ) -> bytes: | ||
@@ -656,2 +979,16 @@ """ | ||
| When *writer* is provided, any DSR (Device Status Report) request | ||
| ``ESC [ 6 n`` found in the incoming data is answered with a CPR | ||
| (Cursor Position Report) so the server detects an ANSI-capable | ||
| terminal. | ||
| When *cursor* is provided, the CPR response reflects the tracked | ||
| virtual cursor column (advanced by :func:`wcwidth.wcwidth` for each | ||
| printable character). This defeats robot-check guards that verify | ||
| cursor movement after writing a test character. | ||
| Time-sensitive prompts — ``Press [.ESC.] twice`` botcheck countdowns | ||
| — are detected inline and responded to immediately so the reply | ||
| arrives before the countdown expires. | ||
| :param reader: :class:`~telnetlib3.stream_reader.TelnetReader` instance. | ||
@@ -661,5 +998,9 @@ :param quiet_time: Seconds of silence before considering banner complete. | ||
| :param max_bytes: Maximum bytes per read call. | ||
| :param writer: Optional :class:`~telnetlib3.stream_writer.TelnetWriter` | ||
| used to send CPR replies to DSR requests. | ||
| :param cursor: Optional :class:`_VirtualCursor` for position-aware CPR. | ||
| :returns: Banner bytes (may be empty). | ||
| """ | ||
| chunks: list[bytes] = [] | ||
| esc_responded = False | ||
| loop = asyncio.get_event_loop() | ||
@@ -675,2 +1016,37 @@ deadline = loop.time() + max_wait | ||
| break | ||
| if writer is not None and _DSR_RE.search(chunk): | ||
| _respond_to_dsr(chunk, writer, cursor) | ||
| await writer.drain() | ||
| elif cursor is not None: | ||
| cursor.advance(chunk) | ||
| if writer is not None: | ||
| font_enc = detect_syncterm_font(chunk) | ||
| if font_enc is not None: | ||
| log.debug("SyncTERM font switch detected: %s", font_enc) | ||
| if getattr(writer, '_encoding_explicit', False): | ||
| log.debug( | ||
| "ignoring font switch, explicit encoding: %s", | ||
| writer.environ_encoding) | ||
| else: | ||
| writer.environ_encoding = font_enc | ||
| if cursor is not None: | ||
| cursor.encoding = font_enc | ||
| protocol = writer.protocol | ||
| if (protocol is not None | ||
| and font_enc in _SYNCTERM_BINARY_ENCODINGS): | ||
| protocol.force_binary = True | ||
| if not esc_responded: | ||
| stripped_chunk = _ANSI_STRIP_RE.sub(b"", chunk) | ||
| if _ESC_TWICE_RE.search(stripped_chunk): | ||
| writer.write(b"\x1b\x1b") | ||
| await writer.drain() | ||
| esc_responded = True | ||
| # pylint: disable-next=protected-access | ||
| writer._esc_inline = True # type: ignore[attr-defined] | ||
| elif _ESC_ONCE_RE.search(stripped_chunk): | ||
| writer.write(b"\x1b") | ||
| await writer.drain() | ||
| esc_responded = True | ||
| # pylint: disable-next=protected-access | ||
| writer._esc_inline = True # type: ignore[attr-defined] | ||
| chunks.append(chunk) | ||
@@ -677,0 +1053,0 @@ except (asyncio.TimeoutError, EOFError): |
@@ -251,21 +251,2 @@ """Telnet server shell implementations.""" | ||
| async def get_next_ascii( | ||
| reader: Union[TelnetReader, TelnetReaderUnicode], | ||
| writer: Union[TelnetWriter, TelnetWriterUnicode], | ||
| ) -> Optional[str]: | ||
| """Accept the next non-ANSI-escape character from reader.""" | ||
| _reader = cast(TelnetReaderUnicode, reader) | ||
| escape_sequence = False | ||
| while not writer.is_closing(): | ||
| next_char = await _reader.read(1) | ||
| if next_char == "\x1b": | ||
| escape_sequence = True | ||
| elif escape_sequence: | ||
| if 61 <= ord(next_char) <= 90 or 97 <= ord(next_char) <= 122: | ||
| escape_sequence = False | ||
| else: | ||
| return next_char | ||
| return None | ||
| @types.coroutine | ||
@@ -272,0 +253,0 @@ def readline( |
+26
-4
@@ -62,3 +62,3 @@ """ | ||
| pty_args: Optional[List[str]] = None | ||
| pty_raw: bool = False | ||
| pty_raw: bool = True | ||
| robot_check: bool = False | ||
@@ -993,7 +993,17 @@ pty_fork_limit: int = 0 | ||
| parser.add_argument( | ||
| "--line-mode", | ||
| action="store_true", | ||
| default=False, | ||
| help="use cooked PTY mode with echo for --pty-exec instead of raw " | ||
| "mode. By default PTY echo is disabled (raw mode), which is " | ||
| "correct for programs that handle their own terminal I/O " | ||
| "(curses, blessed, ucs-detect).", | ||
| ) | ||
| # Hidden backwards-compat: --pty-raw was the default since 2.5, | ||
| # keep it as a silent no-op so existing scripts don't break. | ||
| parser.add_argument( | ||
| "--pty-raw", | ||
| action="store_true", | ||
| default=_config.pty_raw, | ||
| help="raw mode for --pty-exec: disable PTY echo for programs that " | ||
| "handle their own terminal I/O (curses, blessed, ucs-detect)", | ||
| default=False, | ||
| help=argparse.SUPPRESS, | ||
| ) | ||
@@ -1026,2 +1036,6 @@ parser.add_argument( | ||
| result["pty_args"] = pty_args if PTY_SUPPORT else None | ||
| # --pty-raw is a hidden no-op (raw is now the default); | ||
| # --line-mode opts out of raw mode. | ||
| result.pop("pty_raw", None) | ||
| result["pty_raw"] = not result.pop("line_mode", False) | ||
| if not PTY_SUPPORT: | ||
@@ -1031,2 +1045,10 @@ result["pty_exec"] = None | ||
| result["pty_raw"] = False | ||
| # Auto-enable force_binary for retro BBS encodings that use high-bit bytes. | ||
| # local | ||
| from .encodings import FORCE_BINARY_ENCODINGS # pylint: disable=import-outside-toplevel | ||
| if result["encoding"].lower().replace('-', '_') in FORCE_BINARY_ENCODINGS: | ||
| result["force_binary"] = True | ||
| return result | ||
@@ -1033,0 +1055,0 @@ |
@@ -11,5 +11,50 @@ # std imports | ||
| # local | ||
| from telnetlib3.accessories import make_logger, repr_mapping, function_lookup, make_reader_task | ||
| from telnetlib3.accessories import ( | ||
| TRACE, | ||
| hexdump, | ||
| make_logger, | ||
| repr_mapping, | ||
| function_lookup, | ||
| make_reader_task, | ||
| ) | ||
| def test_trace_level_registered(): | ||
| assert TRACE == 5 | ||
| assert logging.getLevelName(TRACE) == "TRACE" | ||
| assert logging.getLevelName("TRACE") == TRACE | ||
| def test_hexdump_short(): | ||
| data = b"Hello World\r\n" | ||
| result = hexdump(data) | ||
| assert "48 65 6c 6c 6f 20 57 6f" in result | ||
| assert "72 6c 64 0d 0a" in result | ||
| assert "|Hello World..|" in result | ||
| def test_hexdump_two_rows(): | ||
| data = bytes(range(32)) | ||
| result = hexdump(data) | ||
| lines = result.splitlines() | ||
| assert len(lines) == 2 | ||
| assert lines[0].startswith("00000000") | ||
| assert lines[1].startswith("00000010") | ||
| def test_hexdump_prefix(): | ||
| result = hexdump(b"\xff\xfd\x18", prefix=">> ") | ||
| assert result.startswith(">> 00000000") | ||
| assert "ff fd 18" in result | ||
| def test_hexdump_empty(): | ||
| assert hexdump(b"") == "" | ||
| def test_make_logger_trace_level(): | ||
| logger = make_logger("acc_trace", loglevel="trace") | ||
| assert logger.isEnabledFor(TRACE) | ||
| def test_make_logger_no_file(): | ||
@@ -16,0 +61,0 @@ logger = make_logger("acc_no_file", loglevel="info") |
@@ -65,3 +65,33 @@ # std imports | ||
| @pytest.mark.parametrize( | ||
| "offered,expected", | ||
| [ | ||
| pytest.param(["iso-8859-02"], "iso-8859-02", id="iso_leading_zero"), | ||
| pytest.param(["iso 8859-02"], "iso 8859-02", id="iso_space_leading_zero"), | ||
| pytest.param(["cp-1250"], "cp-1250", id="cp_hyphen"), | ||
| ], | ||
| ) | ||
| @pytest.mark.asyncio | ||
| async def test_send_charset_normalization(offered, expected): | ||
| c = _make_client(encoding=False) | ||
| c.default_encoding = None | ||
| assert c.send_charset(offered) == expected | ||
| @pytest.mark.parametrize( | ||
| "name,expected", | ||
| [ | ||
| pytest.param("iso-8859-02", "iso-8859-2", id="iso_leading_zero"), | ||
| pytest.param("iso 8859-02", "iso-8859-2", id="iso_space_leading_zero"), | ||
| pytest.param("cp-1250", "cp1250", id="cp_hyphen"), | ||
| pytest.param("UTF-8", "UTF-8", id="passthrough"), | ||
| pytest.param("iso-8859-15", "iso-8859-15", id="no_leading_zero"), | ||
| pytest.param("x-penn-def", "x-penn-def", id="unknown_passthrough"), | ||
| ], | ||
| ) | ||
| def test_normalize_charset_name(name, expected): | ||
| assert cl.TelnetClient._normalize_charset_name(name) == expected | ||
| @pytest.mark.asyncio | ||
| async def test_send_env(): | ||
@@ -68,0 +98,0 @@ c = _make_client(term="xterm", cols=132, rows=43) |
@@ -11,2 +11,4 @@ """Tests for telnetlib3.color_filter — ANSI color palette translation.""" | ||
| ColorFilter, | ||
| PetsciiColorFilter, | ||
| AtasciiControlFilter, | ||
| _adjust_color, | ||
@@ -31,3 +33,3 @@ _is_foreground_code, | ||
| def test_all_expected_palettes_exist(self) -> None: | ||
| assert set(PALETTES.keys()) == {"ega", "cga", "vga", "amiga", "xterm"} | ||
| assert set(PALETTES.keys()) == {"ega", "cga", "vga", "amiga", "xterm", "c64"} | ||
@@ -457,3 +459,3 @@ | ||
| class TestColorFilterDifferentPalettes: | ||
| @pytest.mark.parametrize("name", list(PALETTES.keys())) | ||
| @pytest.mark.parametrize("name", [n for n in PALETTES if n != "c64"]) | ||
| def test_palette_red_foreground(self, name: str) -> None: | ||
@@ -464,1 +466,132 @@ f = ColorFilter(ColorConfig(palette_name=name, brightness=1.0, contrast=1.0)) | ||
| assert f"38;2;{rgb[0]};{rgb[1]};{rgb[2]}" in result | ||
| class TestPetsciiColorFilter: | ||
| def _make_filter(self, **kwargs: object) -> PetsciiColorFilter: | ||
| cfg = ColorConfig(brightness=1.0, contrast=1.0, **kwargs) # type: ignore[arg-type] | ||
| return PetsciiColorFilter(cfg) | ||
| @pytest.mark.parametrize("ctrl_char,palette_idx", [ | ||
| ('\x05', 1), | ||
| ('\x1c', 2), | ||
| ('\x1e', 5), | ||
| ('\x1f', 6), | ||
| ('\x81', 8), | ||
| ('\x90', 0), | ||
| ('\x95', 9), | ||
| ('\x96', 10), | ||
| ('\x97', 11), | ||
| ('\x98', 12), | ||
| ('\x99', 13), | ||
| ('\x9a', 14), | ||
| ('\x9b', 15), | ||
| ('\x9c', 4), | ||
| ('\x9e', 7), | ||
| ('\x9f', 3), | ||
| ]) | ||
| def test_color_code_to_24bit(self, ctrl_char: str, palette_idx: int) -> None: | ||
| f = self._make_filter() | ||
| result = f.filter(f"hello{ctrl_char}world") | ||
| rgb = PALETTES["c64"][palette_idx] | ||
| assert f"\x1b[38;2;{rgb[0]};{rgb[1]};{rgb[2]}m" in result | ||
| assert ctrl_char not in result | ||
| assert "hello" in result | ||
| assert "world" in result | ||
| def test_rvs_on(self) -> None: | ||
| f = self._make_filter() | ||
| result = f.filter("before\x12after") | ||
| assert "\x1b[7m" in result | ||
| assert "\x12" not in result | ||
| def test_rvs_off(self) -> None: | ||
| f = self._make_filter() | ||
| result = f.filter("before\x92after") | ||
| assert "\x1b[27m" in result | ||
| assert "\x92" not in result | ||
| def test_mixed_colors_and_rvs(self) -> None: | ||
| f = self._make_filter() | ||
| result = f.filter("\x1c\x12hello\x92\x05world") | ||
| red_rgb = PALETTES["c64"][2] | ||
| white_rgb = PALETTES["c64"][1] | ||
| assert f"\x1b[38;2;{red_rgb[0]};{red_rgb[1]};{red_rgb[2]}m" in result | ||
| assert "\x1b[7m" in result | ||
| assert "\x1b[27m" in result | ||
| assert f"\x1b[38;2;{white_rgb[0]};{white_rgb[1]};{white_rgb[2]}m" in result | ||
| assert "hello" in result | ||
| assert "world" in result | ||
| def test_plain_text_unchanged(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.filter("hello world") == "hello world" | ||
| def test_non_petscii_control_chars_unchanged(self) -> None: | ||
| f = self._make_filter() | ||
| result = f.filter("A\x07B\x0bC") | ||
| assert "A\x07B\x0bC" == result | ||
| def test_cursor_controls_translated(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.filter("A\x13B") == "A\x1b[HB" | ||
| assert f.filter("A\x93B") == "A\x1b[2JB" | ||
| assert f.filter("A\x11B") == "A\x1b[BB" | ||
| assert f.filter("A\x91B") == "A\x1b[AB" | ||
| assert f.filter("A\x1dB") == "A\x1b[CB" | ||
| assert f.filter("A\x9dB") == "A\x1b[DB" | ||
| assert f.filter("A\x14B") == "A\x08\x1b[PB" | ||
| def test_flush_returns_empty(self) -> None: | ||
| f = self._make_filter() | ||
| assert f.flush() == "" | ||
| def test_brightness_contrast_applied(self) -> None: | ||
| f_full = PetsciiColorFilter(ColorConfig(brightness=1.0, contrast=1.0)) | ||
| f_dim = PetsciiColorFilter(ColorConfig(brightness=0.5, contrast=0.5)) | ||
| result_full = f_full.filter("\x1c") | ||
| result_dim = f_dim.filter("\x1c") | ||
| assert result_full != result_dim | ||
| def test_default_config(self) -> None: | ||
| f = PetsciiColorFilter() | ||
| result = f.filter("\x1c") | ||
| assert "\x1b[38;2;" in result | ||
| class TestAtasciiControlFilter: | ||
| @pytest.mark.parametrize("glyph,expected", [ | ||
| ('\u25c0', '\x08\x1b[P'), | ||
| ('\u25b6', '\t'), | ||
| ('\u21b0', '\x1b[2J\x1b[H'), | ||
| ('\u2191', '\x1b[A'), | ||
| ('\u2193', '\x1b[B'), | ||
| ('\u2190', '\x1b[D'), | ||
| ('\u2192', '\x1b[C'), | ||
| ]) | ||
| def test_control_glyph_translated(self, glyph: str, expected: str) -> None: | ||
| f = AtasciiControlFilter() | ||
| result = f.filter(f"before{glyph}after") | ||
| assert f"before{expected}after" == result | ||
| def test_backspace_erases(self) -> None: | ||
| f = AtasciiControlFilter() | ||
| result = f.filter("DINGO\u25c0\u25c0\u25c0\u25c0\u25c0") | ||
| assert result == "DINGO" + "\x08\x1b[P" * 5 | ||
| def test_plain_text_unchanged(self) -> None: | ||
| f = AtasciiControlFilter() | ||
| assert f.filter("hello world") == "hello world" | ||
| def test_atascii_graphics_unchanged(self) -> None: | ||
| f = AtasciiControlFilter() | ||
| text = "\u2663\u2665\u2666\u2660" | ||
| assert f.filter(text) == text | ||
| def test_flush_returns_empty(self) -> None: | ||
| f = AtasciiControlFilter() | ||
| assert f.flush() == "" | ||
| def test_multiple_controls_in_one_string(self) -> None: | ||
| f = AtasciiControlFilter() | ||
| result = f.filter("\u2191\u2193\u2190\u2192") | ||
| assert result == "\x1b[A\x1b[B\x1b[D\x1b[C" |
@@ -453,3 +453,3 @@ """Test instantiation of basic server and client forms.""" | ||
| b"Escape character is '^]'.\r\n" | ||
| b"hello, space cadet.\r\r\n" | ||
| b"hello, space cadet.\r\n" | ||
| b"\x1b[m\r\n" | ||
@@ -482,3 +482,2 @@ b"Connection closed by foreign host.\r\n" | ||
| if inp: | ||
| writer.echo(inp) | ||
| writer.write("\ngoodbye.\n") | ||
@@ -505,3 +504,3 @@ await writer.drain() | ||
| b"Escape character is '^]'.\n" | ||
| b"Press Return to continue:\r\ngoodbye.\n" | ||
| b"Press Return to continue:\r\ngoodbye.\r\n" | ||
| b"\x1b[m\nConnection closed by foreign host.\n" | ||
@@ -508,0 +507,0 @@ ) |
| # std imports | ||
| import asyncio | ||
| import functools | ||
@@ -8,2 +9,3 @@ # 3rd party | ||
| # local | ||
| from telnetlib3 import server_fingerprinting as sfp | ||
| from telnetlib3.guard_shells import ( | ||
@@ -13,2 +15,3 @@ ConnectionCounter, | ||
| busy_shell, | ||
| robot_check, | ||
| robot_shell, | ||
@@ -407,1 +410,50 @@ _latin1_reading, | ||
| await reader.read(-1) | ||
| async def test_fingerprint_scanner_defeats_robot_check(unused_tcp_port): | ||
| """Fingerprint scanner's virtual cursor defeats the server's robot_check.""" | ||
| # local | ||
| from telnetlib3.guard_shells import _TEST_CHAR, _measure_width # noqa: PLC0415 | ||
| from telnetlib3.tests.accessories import create_server # noqa: PLC0415 | ||
| measured_width: list[int | None] = [] | ||
| async def guarded_shell(reader, writer): | ||
| with _latin1_reading(reader): | ||
| width = await _measure_width(reader, writer, _TEST_CHAR, timeout=5.0) | ||
| measured_width.append(width) | ||
| if width == 1: | ||
| writer.write("Robot check passed!\r\n") | ||
| await writer.drain() | ||
| writer.close() | ||
| await writer.wait_closed() | ||
| async with create_server( | ||
| host="127.0.0.1", | ||
| port=unused_tcp_port, | ||
| shell=guarded_shell, | ||
| connect_maxwait=0.5, | ||
| ): | ||
| # local | ||
| import telnetlib3 # noqa: PLC0415 | ||
| shell = functools.partial( | ||
| sfp.fingerprinting_client_shell, | ||
| host="127.0.0.1", | ||
| port=unused_tcp_port, | ||
| silent=True, | ||
| banner_quiet_time=1.0, | ||
| banner_max_wait=5.0, | ||
| ) | ||
| reader, writer = await telnetlib3.open_connection( | ||
| host="127.0.0.1", | ||
| port=unused_tcp_port, | ||
| encoding=False, | ||
| shell=shell, | ||
| connect_minwait=0.5, | ||
| ) | ||
| # Shell runs as a background task — wait for it to finish. | ||
| await asyncio.wait_for(writer.protocol.waiter_closed, timeout=10.0) | ||
| assert measured_width, "server shell never ran" | ||
| assert measured_width[0] == 1, f"expected width=1, got {measured_width[0]}" |
@@ -31,2 +31,7 @@ # std imports | ||
| class _MockProtocol: | ||
| def __init__(self): | ||
| self.force_binary = False | ||
| class MockWriter: | ||
@@ -49,2 +54,3 @@ def __init__(self, extra=None, will_options=None, wont_options=None): | ||
| self.comport_data: dict[str, object] | None = None | ||
| self.protocol = _MockProtocol() | ||
| self._closing = False | ||
@@ -89,2 +95,28 @@ | ||
| class InteractiveMockReader: | ||
| """ | ||
| MockReader that gates chunks behind writer responses. | ||
| The first chunk is available immediately. Each subsequent chunk is released only after the | ||
| writer has accumulated one more write than before, simulating a server that waits for client | ||
| input before sending the next prompt. | ||
| """ | ||
| def __init__(self, chunks, writer): | ||
| self._chunks = list(chunks) | ||
| self._writer = writer | ||
| self._idx = 0 | ||
| async def read(self, n): | ||
| if self._idx >= len(self._chunks): | ||
| await asyncio.sleep(10) | ||
| return b"" | ||
| needed_writes = self._idx | ||
| while len(self._writer._writes) < needed_writes: | ||
| await asyncio.sleep(0.001) | ||
| chunk = self._chunks[self._idx] | ||
| self._idx += 1 | ||
| return chunk[:n] | ||
| _BINARY_PROBE = {"BINARY": {"status": "WILL", "opt": fps.BINARY}} | ||
@@ -219,5 +251,86 @@ | ||
| assert not sfp._format_banner(b"") | ||
| assert sfp._format_banner(b"\xff\xfe\xfd") == "\ufffd\ufffd\ufffd" | ||
| def test_format_banner_surrogateescape(): | ||
| """High bytes are preserved as surrogates, not replaced with U+FFFD.""" | ||
| result = sfp._format_banner(b"\xff\xfe\xb1") | ||
| assert "\ufffd" not in result | ||
| assert result == "\udcff\udcfe\udcb1" | ||
| raw = result.encode("ascii", errors="surrogateescape") | ||
| assert raw == b"\xff\xfe\xb1" | ||
| def test_format_banner_json_roundtrip(): | ||
| """Surrogates survive JSON serialization and can recover raw bytes.""" | ||
| banner = sfp._format_banner(b"Hello\xb1\xb2World") | ||
| encoded = json.dumps(banner) | ||
| decoded = json.loads(encoded) | ||
| assert decoded == banner | ||
| raw = decoded.encode("ascii", errors="surrogateescape") | ||
| assert raw == b"Hello\xb1\xb2World" | ||
| def test_format_banner_unknown_encoding_fallback(): | ||
| """Unknown encoding falls back to latin-1 instead of raising LookupError.""" | ||
| result = sfp._format_banner(b"Hello\xb1World", encoding="x-no-such-codec") | ||
| assert result == "Hello\xb1World" | ||
| assert result == b"Hello\xb1World".decode("latin-1") | ||
| def test_format_banner_atascii(): | ||
| """ATASCII encoding decodes banner bytes through the registered codec.""" | ||
| result = sfp._format_banner(b"Hello\x9b", encoding="atascii") | ||
| assert result == "Hello\n" | ||
| def test_format_banner_petscii_color(): | ||
| """PETSCII color codes are translated to ANSI 24-bit RGB in banners.""" | ||
| result = sfp._format_banner(b"\x1c\xc8\xc9", encoding="petscii") | ||
| assert "\x1b[38;2;" in result | ||
| assert "HI" in result | ||
| assert "\x1c" not in result | ||
| def test_format_banner_petscii_rvs(): | ||
| """PETSCII RVS ON/OFF are translated to ANSI reverse in banners.""" | ||
| result = sfp._format_banner(b"\x12\xc8\xc9\x92", encoding="petscii") | ||
| assert "\x1b[7m" in result | ||
| assert "\x1b[27m" in result | ||
| def test_format_banner_petscii_newline(): | ||
| """PETSCII CR line terminators are normalized to LF in banners.""" | ||
| result = sfp._format_banner(b"\xc8\xc9\x0d\xca\xcb", encoding="petscii") | ||
| assert "HI\nJK" == result | ||
| def test_format_banner_petscii_cursor(): | ||
| """PETSCII cursor controls are translated to ANSI in banners.""" | ||
| result = sfp._format_banner(b"\x13\xc8\xc9", encoding="petscii") | ||
| assert "\x1b[H" in result | ||
| assert "HI" in result | ||
| @pytest.mark.parametrize( | ||
| "data,expected", | ||
| [ | ||
| pytest.param(b"\x1b[0;0 D", "cp437", id="cp437_font0"), | ||
| pytest.param(b"\x1b[0;36 D", "atascii", id="atascii_font36"), | ||
| pytest.param(b"\x1b[0;32 D", "petscii", id="petscii_c64_upper"), | ||
| pytest.param(b"\x1b[0;40 D", "cp437", id="topaz_plus_font40"), | ||
| pytest.param(b"\x1b[1;36 D", "atascii", id="atascii_secondary"), | ||
| pytest.param(b"hello world", None, id="no_sequence"), | ||
| pytest.param(b"\x1b[0;255 D", None, id="unknown_font_id"), | ||
| ], | ||
| ) | ||
| def test_detect_syncterm_font(data, expected): | ||
| assert sfp.detect_syncterm_font(data) == expected | ||
| def test_syncterm_font_in_banner(): | ||
| """Font sequence embedded in banner data is detected.""" | ||
| data = b"Welcome\x1b[0;36 Dto the BBS" | ||
| assert sfp.detect_syncterm_font(data) == "atascii" | ||
| @pytest.mark.asyncio | ||
@@ -722,4 +835,4 @@ async def test_read_banner(): | ||
| [ | ||
| pytest.param(b"Welcome\r\n", b"\r\n", id="no_prompt"), | ||
| pytest.param(b"", b"\r\n", id="empty"), | ||
| pytest.param(b"Welcome\r\n", None, id="no_prompt"), | ||
| pytest.param(b"", None, id="empty"), | ||
| pytest.param(b"Continue? (yes/no) ", b"yes\r\n", id="yes_no_parens"), | ||
@@ -732,35 +845,7 @@ pytest.param(b"Continue? (y/n) ", b"y\r\n", id="y_n_parens"), | ||
| pytest.param(b"Type yes/no please", b"yes\r\n", id="yes_no_space_delimited"), | ||
| pytest.param(b"systemd/network", b"\r\n", id="false_positive_word"), | ||
| pytest.param(b"beyond", b"\r\n", id="substring_y_n_not_matched"), | ||
| pytest.param( | ||
| b"Please enter a name: (or 'who' or 'finger'):", b"who\r\n", id="who_single_quotes" | ||
| ), | ||
| pytest.param(b'Enter your name (or "who"):', b"who\r\n", id="who_double_quotes"), | ||
| pytest.param(b"What is your name? (or 'WHO')", b"who\r\n", id="who_uppercase"), | ||
| pytest.param(b"Enter your name:", b"\r\n", id="name_prompt_no_who"), | ||
| pytest.param( | ||
| b"connect <name> <password>\r\n" | ||
| b"WHO to see players connected.\r\n" | ||
| b"QUIT to disconnect.\r\n", | ||
| b"who\r\n", | ||
| id="who_bare_command_listing", | ||
| ), | ||
| pytest.param(b"Type WHO to list users", b"who\r\n", id="who_bare_mid_sentence"), | ||
| pytest.param(b"somehow", b"\r\n", id="who_inside_word_not_matched"), | ||
| pytest.param(b"Type 'help' for a list of commands:", b"help\r\n", id="help_single_quotes"), | ||
| pytest.param(b'Enter your name (or "help"):', b"help\r\n", id="help_double_quotes"), | ||
| pytest.param( | ||
| b"HELP to see available commands.\r\n", | ||
| b"help\r\n", | ||
| id="help_bare_command_listing", | ||
| ), | ||
| pytest.param(b"Type HELP for info", b"help\r\n", id="help_bare_mid_sentence"), | ||
| pytest.param(b"helpful tips", b"\r\n", id="help_inside_word_not_matched"), | ||
| pytest.param( | ||
| b"connect <name>\r\n" | ||
| b"WHO to see players connected.\r\n" | ||
| b"HELP to see available commands.\r\n", | ||
| b"who\r\n", | ||
| id="who_preferred_over_help", | ||
| ), | ||
| pytest.param(b"Continue? (Yes|No) ", b"yes\r\n", id="yes_pipe_no_parens"), | ||
| pytest.param(b"Accept? (YES|NO):", b"yes\r\n", id="yes_pipe_no_upper"), | ||
| pytest.param(b"systemd/network", None, id="false_positive_word"), | ||
| pytest.param(b"beyond", None, id="substring_y_n_not_matched"), | ||
| pytest.param(b"Enter your name:", None, id="name_prompt_no_who"), | ||
| pytest.param(b"Color? ", b"y\r\n", id="color_question"), | ||
@@ -770,3 +855,3 @@ pytest.param(b"Do you want color? ", b"y\r\n", id="color_in_sentence"), | ||
| pytest.param(b"color ? ", b"y\r\n", id="color_space_before_question"), | ||
| pytest.param(b"colorful display", b"\r\n", id="color_no_question_mark"), | ||
| pytest.param(b"colorful display", None, id="color_no_question_mark"), | ||
| pytest.param( | ||
@@ -784,3 +869,7 @@ b"Select charset:\r\n1) ASCII\r\n2) ISO-8859-1\r\n5) UTF-8\r\n", | ||
| ), | ||
| pytest.param(b"1) ASCII\r\n2) Latin-1\r\n", b"\r\n", id="menu_no_utf8"), | ||
| pytest.param(b"[5] UTF-8\r\nSelect: ", b"5\r\n", id="menu_utf8_brackets"), | ||
| pytest.param(b"[2] utf-8\r\n", b"2\r\n", id="menu_utf8_brackets_lower"), | ||
| pytest.param(b"3. UTF-8\r\n", b"3\r\n", id="menu_utf8_dot"), | ||
| pytest.param(b" 5 ... UTF-8\r\n", b"5\r\n", id="menu_utf8_ellipsis"), | ||
| pytest.param(b"1) ASCII\r\n2) Latin-1\r\n", None, id="menu_no_utf8"), | ||
| pytest.param(b"(1) Ansi\r\n(2) VT100\r\n", b"1\r\n", id="menu_ansi_parens"), | ||
@@ -791,3 +880,16 @@ pytest.param(b"[1] ANSI\r\n[2] VT100\r\n", b"1\r\n", id="menu_ansi_brackets"), | ||
| pytest.param(b"(1] ANSI\r\n", b"1\r\n", id="menu_ansi_mixed_brackets"), | ||
| pytest.param(b"3. ANSI\r\n", b"3\r\n", id="menu_ansi_dot"), | ||
| pytest.param(b"3. English/ANSI\r\n", b"3\r\n", id="menu_english_ansi"), | ||
| pytest.param(b"2. English/ANSI\r\n", b"2\r\n", id="menu_english_ansi_2"), | ||
| pytest.param( | ||
| b" 1 ... English/ANSI The standard\r\n", | ||
| b"1\r\n", | ||
| id="menu_ansi_ellipsis", | ||
| ), | ||
| pytest.param( | ||
| b" 2 .. English/ANSI\r\n", | ||
| b"2\r\n", | ||
| id="menu_ansi_double_dot", | ||
| ), | ||
| pytest.param( | ||
| b"1) ASCII\r\n2) UTF-8\r\n(3) Ansi\r\n", | ||
@@ -797,2 +899,7 @@ b"2\r\n", | ||
| ), | ||
| pytest.param( | ||
| b"1. ASCII\r\n2. UTF-8\r\n3. English/ANSI\r\n", | ||
| b"2\r\n", | ||
| id="menu_utf8_dot_preferred_over_ansi_dot", | ||
| ), | ||
| pytest.param(b"gb/big5", b"big5\r\n", id="gb_big5"), | ||
@@ -802,9 +909,178 @@ pytest.param(b"GB/Big5\r\n", b"big5\r\n", id="gb_big5_mixed_case"), | ||
| pytest.param(b"gb/big 5\r\n", b"big5\r\n", id="gb_big5_space_before_5"), | ||
| pytest.param(b"bigfoot5", b"\r\n", id="big5_inside_word_not_matched"), | ||
| pytest.param(b"bigfoot5", None, id="big5_inside_word_not_matched"), | ||
| pytest.param( | ||
| b"Press [.ESC.] twice within 15 seconds to CONTINUE...", | ||
| b"\x1b\x1b", | ||
| id="esc_twice_mystic", | ||
| ), | ||
| pytest.param( | ||
| b"Press [ESC] twice to continue", | ||
| b"\x1b\x1b", | ||
| id="esc_twice_no_dots", | ||
| ), | ||
| pytest.param( | ||
| b"Press ESC twice to continue", | ||
| b"\x1b\x1b", | ||
| id="esc_twice_bare", | ||
| ), | ||
| pytest.param( | ||
| b"Press <Esc> twice for the BBS ... ", | ||
| b"\x1b\x1b", | ||
| id="esc_twice_angle_brackets", | ||
| ), | ||
| pytest.param( | ||
| b"\x1b[33mPress [.ESC.] twice within 10 seconds\x1b[0m", | ||
| b"\x1b\x1b", | ||
| id="esc_twice_ansi_wrapped", | ||
| ), | ||
| pytest.param( | ||
| b"\x1b[1;1H\x1b[2JPress [.ESC.] twice within 15 seconds to CONTINUE...", | ||
| b"\x1b\x1b", | ||
| id="esc_twice_after_clear_screen", | ||
| ), | ||
| pytest.param( | ||
| b"Please press [ESC] to continue", | ||
| b"\x1b", | ||
| id="esc_once_brackets", | ||
| ), | ||
| pytest.param( | ||
| b"Press ESC to continue", | ||
| b"\x1b", | ||
| id="esc_once_bare", | ||
| ), | ||
| pytest.param( | ||
| b"press <Esc> to continue", | ||
| b"\x1b", | ||
| id="esc_once_angle_brackets", | ||
| ), | ||
| pytest.param( | ||
| b"\x1b[33mPress [ESC] to continue\x1b[0m", | ||
| b"\x1b", | ||
| id="esc_once_ansi_wrapped", | ||
| ), | ||
| pytest.param(b"HIT RETURN:", b"\r\n", id="hit_return"), | ||
| pytest.param(b"Hit Return.", b"\r\n", id="hit_return_lower"), | ||
| pytest.param(b"PRESS RETURN:", b"\r\n", id="press_return"), | ||
| pytest.param(b"Press Enter:", b"\r\n", id="press_enter"), | ||
| pytest.param(b"press enter", b"\r\n", id="press_enter_lower"), | ||
| pytest.param(b"Hit Enter to continue", b"\r\n", id="hit_enter"), | ||
| pytest.param( | ||
| b"\x1b[1mHIT RETURN:\x1b[0m", | ||
| b"\r\n", | ||
| id="hit_return_ansi_wrapped", | ||
| ), | ||
| pytest.param( | ||
| b"\x1b[31mColor? \x1b[0m", | ||
| b"y\r\n", | ||
| id="color_ansi_wrapped", | ||
| ), | ||
| pytest.param( | ||
| b"\x1b[1mContinue? (y/n)\x1b[0m ", | ||
| b"y\r\n", | ||
| id="yn_ansi_wrapped", | ||
| ), | ||
| pytest.param( | ||
| b"Do you support the ANSI color standard (Yn)? ", | ||
| b"y\r\n", | ||
| id="yn_paren_capital_y", | ||
| ), | ||
| pytest.param( | ||
| b"Continue? [Yn]", | ||
| b"y\r\n", | ||
| id="yn_bracket_capital_y", | ||
| ), | ||
| pytest.param( | ||
| b"Do something (yN)", | ||
| b"y\r\n", | ||
| id="yn_paren_capital_n", | ||
| ), | ||
| pytest.param( | ||
| b"More: (Y)es, (N)o, (C)ontinuous?", | ||
| b"C\r\n", | ||
| id="more_continuous", | ||
| ), | ||
| pytest.param( | ||
| b"\x1b[33mMore: (Y)es, (N)o, (C)ontinuous?\x1b[0m", | ||
| b"C\r\n", | ||
| id="more_continuous_ansi", | ||
| ), | ||
| pytest.param( | ||
| b"more (Y/N/C)ontinuous: ", | ||
| b"C\r\n", | ||
| id="more_ync_compact", | ||
| ), | ||
| pytest.param( | ||
| b"Press the BACKSPACE key to detect your terminal type: ", | ||
| b"\x08", | ||
| id="backspace_key_telnetbible", | ||
| ), | ||
| pytest.param( | ||
| b"\x1b[1mPress the BACKSPACE key\x1b[0m", | ||
| b"\x08", | ||
| id="backspace_key_ansi_wrapped", | ||
| ), | ||
| pytest.param( | ||
| b"\x0cpress del/backspace:", | ||
| b"\x14", | ||
| id="petscii_del_backspace", | ||
| ), | ||
| pytest.param( | ||
| b"\x0c\r\npress del/backspace:", | ||
| b"\x14", | ||
| id="petscii_del_backspace_crlf", | ||
| ), | ||
| pytest.param( | ||
| b"press backspace:", | ||
| b"\x14", | ||
| id="petscii_backspace_only", | ||
| ), | ||
| pytest.param( | ||
| b"press del:", | ||
| b"\x14", | ||
| id="petscii_del_only", | ||
| ), | ||
| pytest.param( | ||
| b"PRESS DEL/BACKSPACE.", | ||
| b"\x14", | ||
| id="petscii_del_backspace_upper", | ||
| ), | ||
| pytest.param( | ||
| b"press backspace/del:", | ||
| b"\x14", | ||
| id="petscii_backspace_del_reversed", | ||
| ), | ||
| pytest.param( | ||
| b"PLEASE HIT YOUR BACKSPACE/DELETE\r\nKEY FOR C/G DETECT:", | ||
| b"\x14", | ||
| id="petscii_hit_your_backspace_delete", | ||
| ), | ||
| pytest.param( | ||
| b"hit your delete/backspace key:", | ||
| b"\x14", | ||
| id="petscii_hit_your_delete_backspace_key", | ||
| ), | ||
| ], | ||
| ) | ||
| def test_detect_yn_prompt(banner, expected): | ||
| assert sfp._detect_yn_prompt(banner) == expected | ||
| assert sfp._detect_yn_prompt(banner).response == expected | ||
| @pytest.mark.parametrize( | ||
| "banner, expected_encoding", | ||
| [ | ||
| pytest.param(b"5) UTF-8\r\n", "utf-8", id="utf8_menu"), | ||
| pytest.param(b"[2] utf-8\r\n", "utf-8", id="utf8_brackets"), | ||
| pytest.param(b"1) UTF8", "utf-8", id="utf8_no_hyphen"), | ||
| pytest.param(b"gb/big5", "big5", id="gb_big5"), | ||
| pytest.param(b"GB/Big5\r\n", "big5", id="gb_big5_mixed"), | ||
| pytest.param(b"(1) Ansi\r\n", None, id="ansi_no_encoding"), | ||
| pytest.param(b"yes/no", None, id="yn_no_encoding"), | ||
| pytest.param(b"Color? ", None, id="color_no_encoding"), | ||
| pytest.param(b"nothing special", None, id="none_no_encoding"), | ||
| ], | ||
| ) | ||
| def test_detect_yn_prompt_encoding(banner, expected_encoding): | ||
| assert sfp._detect_yn_prompt(banner).encoding == expected_encoding | ||
| @pytest.mark.asyncio | ||
@@ -855,2 +1131,24 @@ async def test_fingerprinting_shell_yn_prompt(tmp_path): | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_shell_esc_twice_prompt(tmp_path): | ||
| """Banner with ESC-twice botcheck sends two raw ESC bytes.""" | ||
| save_path = str(tmp_path / "result.json") | ||
| reader = MockReader([b"Press [.ESC.] twice within 15 seconds to CONTINUE..."]) | ||
| writer = MockWriter(will_options=[fps.SGA]) | ||
| await sfp.fingerprinting_client_shell( | ||
| reader, | ||
| writer, | ||
| host="localhost", | ||
| port=23, | ||
| save_path=save_path, | ||
| silent=True, | ||
| banner_quiet_time=0.01, | ||
| banner_max_wait=0.01, | ||
| mssp_wait=0.01, | ||
| ) | ||
| assert b"\x1b\x1b" in writer._writes | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_shell_no_yn_prompt(tmp_path): | ||
@@ -880,7 +1178,11 @@ """Banner without y/n prompt sends bare '\\r\\n'.""" | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_shell_who_prompt(tmp_path): | ||
| """Banner with 'who' option causes 'who\\r\\n' response.""" | ||
| async def test_fingerprinting_shell_multi_prompt(tmp_path): | ||
| """Server asks color first, then presents a UTF-8 charset menu.""" | ||
| save_path = str(tmp_path / "result.json") | ||
| reader = MockReader([b"Please enter a name: (or 'who' or 'finger'):"]) | ||
| writer = MockWriter(will_options=[fps.SGA]) | ||
| reader = InteractiveMockReader([ | ||
| b"Color? ", | ||
| b"Select charset:\r\n1) ASCII\r\n2) UTF-8\r\n", | ||
| b"Welcome!\r\n", | ||
| ], writer) | ||
@@ -899,11 +1201,17 @@ await sfp.fingerprinting_client_shell( | ||
| assert b"who\r\n" in writer._writes | ||
| assert b"y\r\n" in writer._writes | ||
| assert b"2\r\n" in writer._writes | ||
| assert writer.environ_encoding == "utf-8" | ||
| assert writer.protocol.force_binary is True | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_shell_help_prompt(tmp_path): | ||
| """Banner with 'help' option causes 'help\\r\\n' response.""" | ||
| async def test_fingerprinting_shell_multi_prompt_stops_on_bare_return(tmp_path): | ||
| """Loop stops after a bare \\r\\n response (no prompt detected).""" | ||
| save_path = str(tmp_path / "result.json") | ||
| reader = MockReader([b"Type 'help' for a list of commands:"]) | ||
| writer = MockWriter(will_options=[fps.SGA]) | ||
| reader = InteractiveMockReader([ | ||
| b"Color? ", | ||
| b"Welcome!\r\n", | ||
| ], writer) | ||
@@ -922,5 +1230,33 @@ await sfp.fingerprinting_client_shell( | ||
| assert b"help\r\n" in writer._writes | ||
| assert b"y\r\n" in writer._writes | ||
| prompt_writes = [w for w in writer._writes if w in (b"y\r\n", b"\r\n")] | ||
| assert len(prompt_writes) == 2 | ||
| assert prompt_writes == [b"y\r\n", b"\r\n"] | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_shell_multi_prompt_max_replies(tmp_path): | ||
| """Loop does not exceed _MAX_PROMPT_REPLIES rounds.""" | ||
| save_path = str(tmp_path / "result.json") | ||
| writer = MockWriter(will_options=[fps.SGA]) | ||
| banners = [f"Color? (round {i}) ".encode() | ||
| for i in range(sfp._MAX_PROMPT_REPLIES + 1)] | ||
| reader = InteractiveMockReader(banners, writer) | ||
| await sfp.fingerprinting_client_shell( | ||
| reader, | ||
| writer, | ||
| host="localhost", | ||
| port=23, | ||
| save_path=save_path, | ||
| silent=True, | ||
| banner_quiet_time=0.01, | ||
| banner_max_wait=0.01, | ||
| mssp_wait=0.01, | ||
| ) | ||
| y_writes = [w for w in writer._writes if w == b"y\r\n"] | ||
| assert len(y_writes) == sfp._MAX_PROMPT_REPLIES | ||
| class TestCullDisplay: | ||
@@ -948,1 +1284,276 @@ """Tests for _cull_display bytes conversion.""" | ||
| assert result == {} | ||
| @pytest.mark.asyncio | ||
| async def test_read_banner_until_quiet_responds_to_dsr(): | ||
| """DSR (ESC[6n) in banner data triggers a CPR response (ESC[1;1R).""" | ||
| reader = MockReader([b"Hello\x1b[6nWorld"]) | ||
| writer = MockWriter() | ||
| result = await sfp._read_banner_until_quiet( | ||
| reader, quiet_time=0.01, max_wait=0.05, writer=writer, | ||
| ) | ||
| assert result == b"Hello\x1b[6nWorld" | ||
| assert b"\x1b[1;1R" in writer._writes | ||
| @pytest.mark.asyncio | ||
| async def test_read_banner_until_quiet_multiple_dsr(): | ||
| """Multiple DSR requests each get a CPR response.""" | ||
| reader = MockReader([b"\x1b[6n", b"banner\x1b[6n"]) | ||
| writer = MockWriter() | ||
| await sfp._read_banner_until_quiet( | ||
| reader, quiet_time=0.01, max_wait=0.05, writer=writer, | ||
| ) | ||
| cpr_count = sum(1 for w in writer._writes if w == b"\x1b[1;1R") | ||
| assert cpr_count == 2 | ||
| @pytest.mark.asyncio | ||
| async def test_read_banner_until_quiet_no_dsr_no_write(): | ||
| """No DSR in banner means no CPR writes.""" | ||
| reader = MockReader([b"Welcome to BBS\r\n"]) | ||
| writer = MockWriter() | ||
| await sfp._read_banner_until_quiet( | ||
| reader, quiet_time=0.01, max_wait=0.05, writer=writer, | ||
| ) | ||
| assert not writer._writes | ||
| @pytest.mark.asyncio | ||
| async def test_read_banner_until_quiet_no_writer_ignores_dsr(): | ||
| """Without a writer, DSR is silently ignored.""" | ||
| reader = MockReader([b"Hello\x1b[6n"]) | ||
| result = await sfp._read_banner_until_quiet( | ||
| reader, quiet_time=0.01, max_wait=0.05, | ||
| ) | ||
| assert result == b"Hello\x1b[6n" | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_shell_dsr_response(tmp_path): | ||
| """Full session responds to DSR in the pre-return banner.""" | ||
| save_path = str(tmp_path / "result.json") | ||
| reader = MockReader([b"\x1b[6nWelcome to BBS\r\n"]) | ||
| writer = MockWriter(will_options=[fps.SGA]) | ||
| await sfp.fingerprinting_client_shell( | ||
| reader, | ||
| writer, | ||
| host="localhost", | ||
| port=23, | ||
| save_path=save_path, | ||
| silent=True, | ||
| banner_quiet_time=0.01, | ||
| banner_max_wait=0.01, | ||
| mssp_wait=0.01, | ||
| ) | ||
| assert b"\x1b[1;1R" in writer._writes | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_settle_dsr_response(tmp_path): | ||
| """DSR arriving during negotiation settle gets an immediate CPR reply.""" | ||
| save_path = str(tmp_path / "result.json") | ||
| reader = MockReader([b"\x1b[6nWelcome\r\n"]) | ||
| writer = MockWriter(will_options=[fps.SGA]) | ||
| await sfp.fingerprinting_client_shell( | ||
| reader, | ||
| writer, | ||
| host="localhost", | ||
| port=23, | ||
| save_path=save_path, | ||
| silent=True, | ||
| banner_quiet_time=0.01, | ||
| banner_max_wait=0.01, | ||
| mssp_wait=0.01, | ||
| ) | ||
| assert b"\x1b[1;1R" in writer._writes | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_shell_ansi_ellipsis_menu(tmp_path): | ||
| """Worldgroup/MajorBBS ellipsis-menu selects first numbered option.""" | ||
| save_path = str(tmp_path / "result.json") | ||
| writer = MockWriter(will_options=[fps.SGA, fps.ECHO]) | ||
| reader = InteractiveMockReader([ | ||
| (b"Please choose one of these languages/protocols:\r\n\r\n" | ||
| b" 1 ... English/ANSI The standard English language version\r\n" | ||
| b" 2 ... English/RIP The English version of RIPscrip graphics\r\n" | ||
| b"\r\nChoose a number from 1 to 2: "), | ||
| b"Welcome!\r\n", | ||
| ], writer) | ||
| await sfp.fingerprinting_client_shell( | ||
| reader, | ||
| writer, | ||
| host="localhost", | ||
| port=23, | ||
| save_path=save_path, | ||
| silent=True, | ||
| banner_quiet_time=0.01, | ||
| banner_max_wait=0.01, | ||
| mssp_wait=0.01, | ||
| ) | ||
| assert b"1\r\n" in writer._writes | ||
| @pytest.mark.asyncio | ||
| async def test_read_banner_inline_esc_twice(): | ||
| """ESC-twice botcheck is responded to inline during banner collection.""" | ||
| reader = MockReader([ | ||
| b"Mystic BBS v1.12\r\n", | ||
| b"Press [.ESC.] twice within 15 seconds to CONTINUE...\r\n", | ||
| b"Press [.ESC.] twice within 14 seconds to CONTINUE...\r\n", | ||
| ]) | ||
| writer = MockWriter() | ||
| await sfp._read_banner_until_quiet( | ||
| reader, quiet_time=0.01, max_wait=0.05, writer=writer, | ||
| ) | ||
| assert b"\x1b\x1b" in writer._writes | ||
| esc_count = sum(1 for w in writer._writes if w == b"\x1b\x1b") | ||
| assert esc_count == 1 | ||
| @pytest.mark.asyncio | ||
| async def test_read_banner_inline_esc_once(): | ||
| """ESC-once prompt is responded to inline during banner collection.""" | ||
| reader = MockReader([b"Press [ESC] to continue\r\n"]) | ||
| writer = MockWriter() | ||
| await sfp._read_banner_until_quiet( | ||
| reader, quiet_time=0.01, max_wait=0.05, writer=writer, | ||
| ) | ||
| assert b"\x1b" in writer._writes | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_shell_esc_inline_no_duplicate(tmp_path): | ||
| """Inline ESC response prevents duplicate in the prompt loop.""" | ||
| save_path = str(tmp_path / "result.json") | ||
| writer = MockWriter(will_options=[fps.SGA]) | ||
| reader = InteractiveMockReader([ | ||
| b"Press [.ESC.] twice within 15 seconds to CONTINUE...\r\n", | ||
| b"Welcome to Mystic BBS!\r\nLogin: ", | ||
| ], writer) | ||
| await sfp.fingerprinting_client_shell( | ||
| reader, | ||
| writer, | ||
| host="localhost", | ||
| port=23, | ||
| save_path=save_path, | ||
| silent=True, | ||
| banner_quiet_time=0.01, | ||
| banner_max_wait=0.01, | ||
| mssp_wait=0.01, | ||
| ) | ||
| esc_writes = [w for w in writer._writes if w == b"\x1b\x1b"] | ||
| assert len(esc_writes) == 1 | ||
| @pytest.mark.asyncio | ||
| async def test_fingerprinting_shell_delayed_prompt(tmp_path): | ||
| """Bare-return banner followed by ESC-twice prompt still gets answered.""" | ||
| save_path = str(tmp_path / "result.json") | ||
| writer = MockWriter(will_options=[fps.SGA]) | ||
| reader = InteractiveMockReader([ | ||
| b"Starting BBS-DOS...\r\n", | ||
| b"Press [.ESC.] twice within 15 seconds to CONTINUE...", | ||
| b"Welcome!\r\n", | ||
| ], writer) | ||
| await sfp.fingerprinting_client_shell( | ||
| reader, | ||
| writer, | ||
| host="localhost", | ||
| port=23, | ||
| save_path=save_path, | ||
| silent=True, | ||
| banner_quiet_time=0.01, | ||
| banner_max_wait=0.01, | ||
| mssp_wait=0.01, | ||
| ) | ||
| assert b"\x1b\x1b" in writer._writes | ||
| @pytest.mark.asyncio | ||
| async def test_read_banner_virtual_cursor_defeats_robot_check(): | ||
| """DSR-space-DSR produces CPR col=1 then col=2 (width=1).""" | ||
| reader = MockReader([b"\x1b[6n \x1b[6n"]) | ||
| writer = MockWriter() | ||
| cursor = sfp._VirtualCursor() | ||
| await sfp._read_banner_until_quiet( | ||
| reader, quiet_time=0.01, max_wait=0.05, writer=writer, cursor=cursor, | ||
| ) | ||
| cpr_writes = [w for w in writer._writes if b"R" in w] | ||
| assert cpr_writes[0] == b"\x1b[1;1R" | ||
| assert cpr_writes[1] == b"\x1b[1;2R" | ||
| @pytest.mark.asyncio | ||
| async def test_read_banner_virtual_cursor_separate_chunks(): | ||
| """DSR in separate chunks still tracks cursor correctly.""" | ||
| reader = MockReader([b"\x1b[6n", b" \x1b[6n"]) | ||
| writer = MockWriter() | ||
| cursor = sfp._VirtualCursor() | ||
| await sfp._read_banner_until_quiet( | ||
| reader, quiet_time=0.01, max_wait=0.05, writer=writer, cursor=cursor, | ||
| ) | ||
| cpr_writes = [w for w in writer._writes if b"R" in w] | ||
| assert cpr_writes[0] == b"\x1b[1;1R" | ||
| assert cpr_writes[1] == b"\x1b[1;2R" | ||
| @pytest.mark.asyncio | ||
| async def test_read_banner_virtual_cursor_wide_char(): | ||
| """Wide CJK character advances cursor by 2.""" | ||
| reader = MockReader([b"\x1b[6n\xe4\xb8\xad\x1b[6n"]) | ||
| writer = MockWriter() | ||
| cursor = sfp._VirtualCursor() | ||
| await sfp._read_banner_until_quiet( | ||
| reader, quiet_time=0.01, max_wait=0.05, writer=writer, cursor=cursor, | ||
| ) | ||
| cpr_writes = [w for w in writer._writes if b"R" in w] | ||
| assert cpr_writes[0] == b"\x1b[1;1R" | ||
| assert cpr_writes[1] == b"\x1b[1;3R" | ||
| def test_virtual_cursor_backspace(): | ||
| """Backspace moves cursor left.""" | ||
| cursor = sfp._VirtualCursor() | ||
| cursor.advance(b"AB\x08") | ||
| assert cursor.col == 2 | ||
| def test_virtual_cursor_cr(): | ||
| """Carriage return resets cursor to column 1.""" | ||
| cursor = sfp._VirtualCursor() | ||
| cursor.advance(b"Hello\r") | ||
| assert cursor.col == 1 | ||
| def test_virtual_cursor_ansi_stripped(): | ||
| """ANSI color codes do not advance cursor.""" | ||
| cursor = sfp._VirtualCursor() | ||
| cursor.advance(b"\x1b[31mX\x1b[0m") | ||
| assert cursor.col == 2 | ||
| @pytest.mark.parametrize("response,encoding,expected", [ | ||
| pytest.param(b"\r\n", "atascii", b"\x9b", id="atascii_bare_return"), | ||
| pytest.param(b"yes\r\n", "atascii", b"yes\x9b", id="atascii_yes"), | ||
| pytest.param(b"y\r\n", "atascii", b"y\x9b", id="atascii_y"), | ||
| pytest.param(b"\r\n", "ascii", b"\r\n", id="ascii_unchanged"), | ||
| pytest.param(b"\r\n", "utf-8", b"\r\n", id="utf8_unchanged"), | ||
| pytest.param(b"yes\r\n", "utf-8", b"yes\r\n", id="utf8_yes_unchanged"), | ||
| pytest.param(b"\x1b\x1b", "atascii", b"\x1b\x1b", id="atascii_esc_esc"), | ||
| ]) | ||
| def test_reencode_prompt(response, encoding, expected): | ||
| # local | ||
| import telnetlib3 # noqa: F401 - registers codecs | ||
| assert sfp._reencode_prompt(response, encoding) == expected |
@@ -272,18 +272,2 @@ # std imports | ||
| @pytest.mark.parametrize( | ||
| "input_chars,closing,expected", | ||
| [ | ||
| pytest.param(["a"], False, "a", id="normal"), | ||
| pytest.param(["\x1b", "A", "x"], False, "x", id="skips_escape"), | ||
| pytest.param([], True, None, id="returns_none_when_closing"), | ||
| pytest.param(["\x1b", "1", "A", "x"], False, "x", id="escape_non_letter"), | ||
| ], | ||
| ) | ||
| @pytest.mark.asyncio | ||
| async def test_get_next_ascii(input_chars, closing, expected): | ||
| writer = MockWriter() | ||
| writer._closing = closing | ||
| assert await ss.get_next_ascii(MockReader(input_chars), writer) == expected | ||
| @pytest.mark.parametrize( | ||
| "input_data,expected", | ||
@@ -361,4 +345,4 @@ [ | ||
| [ | ||
| pytest.param(2, True, id="width_2"), | ||
| pytest.param(1, False, id="width_1"), | ||
| pytest.param(1, True, id="width_1"), | ||
| pytest.param(2, False, id="width_2"), | ||
| pytest.param(None, False, id="width_none"), | ||
@@ -365,0 +349,0 @@ ], |
+1
-1
@@ -148,3 +148,3 @@ [tox] | ||
| codespell --skip="*.pyc,htmlcov*,_build,build,*.egg-info,.tox,.git" \ | ||
| --ignore-words-list="wont,nams,flushin,thirdparty,lient,caf" \ | ||
| --ignore-words-list="wont,nams,flushin,thirdparty,lient,caf,alo" \ | ||
| --uri-ignore-words-list "*" \ | ||
@@ -151,0 +151,0 @@ --summary --count |
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
1273082
8.87%110
3.77%27627
9.2%