New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

telnetlib3

Package Overview
Dependencies
Maintainers
1
Versions
36
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

telnetlib3 - pypi Package Compare versions

Comparing version
2.4.0
to
2.5.0
+418
telnetlib3/encodings/atascii.py
"""
ATASCII (Atari 8-bit) encoding.
ATASCII is the character encoding used by Atari 8-bit computers (400, 800,
XL, XE series). It shares the printable ASCII range 0x20-0x7A with standard
ASCII, but replaces control codes 0x00-0x1F with graphics characters and uses
0x80-0xFF as inverse-video variants.
Mapping sources:
- https://www.kreativekorp.com/charset/map/atascii/
- https://github.com/JSJvR/atari-8-bit-utils
The inverse-video range (0x80-0xFF) maps to the same Unicode characters as
the corresponding normal byte (byte & 0x7F), except where a distinct glyph
exists (e.g. complementary block elements). This makes encoding lossy for
inverse bytes, which is the same trade-off as the PETSCII codec.
Notable: 0x9B is the ATASCII end-of-line character (mapped to U+000A LF).
"""
# std imports
import codecs
from typing import Dict, Tuple, Union
# Decoding Table -- ATASCII, 256 entries.
#
# 0x00-0x1F : graphics characters (heart, box drawing, triangles, etc.)
# 0x20-0x5F : standard ASCII (space, digits, uppercase, punctuation)
# 0x60 : diamond suit
# 0x61-0x7A : lowercase a-z
# 0x7B-0x7F : spade suit, pipe, clear-screen, backspace, tab glyphs
# 0x80-0xFF : inverse video (mostly same glyphs as 0x00-0x7F)
DECODING_TABLE = (
# 0x00-0x1F: Graphics characters
'\u2665' # 0x00 BLACK HEART SUIT
'\u251c' # 0x01 BOX DRAWINGS LIGHT VERTICAL AND RIGHT
'\u23b9' # 0x02 RIGHT VERTICAL BOX LINE
'\u2518' # 0x03 BOX DRAWINGS LIGHT UP AND LEFT
'\u2524' # 0x04 BOX DRAWINGS LIGHT VERTICAL AND LEFT
'\u2510' # 0x05 BOX DRAWINGS LIGHT DOWN AND LEFT
'\u2571' # 0x06 BOX DRAWINGS LIGHT DIAGONAL UPPER RIGHT TO LOWER LEFT
'\u2572' # 0x07 BOX DRAWINGS LIGHT DIAGONAL UPPER LEFT TO LOWER RIGHT
'\u25e2' # 0x08 BLACK LOWER RIGHT TRIANGLE
'\u2597' # 0x09 QUADRANT LOWER RIGHT
'\u25e3' # 0x0A BLACK LOWER LEFT TRIANGLE
'\u259d' # 0x0B QUADRANT UPPER RIGHT
'\u2598' # 0x0C QUADRANT UPPER LEFT
'\U0001fb82' # 0x0D UPPER ONE QUARTER BLOCK
'\u2582' # 0x0E LOWER ONE QUARTER BLOCK
'\u2596' # 0x0F QUADRANT LOWER LEFT
'\u2663' # 0x10 BLACK CLUB SUIT
'\u250c' # 0x11 BOX DRAWINGS LIGHT DOWN AND RIGHT
'\u2500' # 0x12 BOX DRAWINGS LIGHT HORIZONTAL
'\u253c' # 0x13 BOX DRAWINGS LIGHT VERTICAL AND HORIZONTAL
'\u25cf' # 0x14 BLACK CIRCLE
'\u2584' # 0x15 LOWER HALF BLOCK
'\u258e' # 0x16 LEFT ONE QUARTER BLOCK
'\u252c' # 0x17 BOX DRAWINGS LIGHT DOWN AND HORIZONTAL
'\u2534' # 0x18 BOX DRAWINGS LIGHT UP AND HORIZONTAL
'\u258c' # 0x19 LEFT HALF BLOCK
'\u2514' # 0x1A BOX DRAWINGS LIGHT UP AND RIGHT
'\u241b' # 0x1B SYMBOL FOR ESCAPE
'\u2191' # 0x1C UPWARDS ARROW (cursor up)
'\u2193' # 0x1D DOWNWARDS ARROW (cursor down)
'\u2190' # 0x1E LEFTWARDS ARROW (cursor left)
'\u2192' # 0x1F RIGHTWARDS ARROW (cursor right)
# 0x20-0x5F: Standard ASCII
' ' # 0x20 SPACE
'!' # 0x21
'"' # 0x22
'#' # 0x23
'$' # 0x24
'%' # 0x25
'&' # 0x26
"'" # 0x27
'(' # 0x28
')' # 0x29
'*' # 0x2A
'+' # 0x2B
',' # 0x2C
'-' # 0x2D
'.' # 0x2E
'/' # 0x2F
'0' # 0x30
'1' # 0x31
'2' # 0x32
'3' # 0x33
'4' # 0x34
'5' # 0x35
'6' # 0x36
'7' # 0x37
'8' # 0x38
'9' # 0x39
':' # 0x3A
';' # 0x3B
'<' # 0x3C
'=' # 0x3D
'>' # 0x3E
'?' # 0x3F
'@' # 0x40
'A' # 0x41
'B' # 0x42
'C' # 0x43
'D' # 0x44
'E' # 0x45
'F' # 0x46
'G' # 0x47
'H' # 0x48
'I' # 0x49
'J' # 0x4A
'K' # 0x4B
'L' # 0x4C
'M' # 0x4D
'N' # 0x4E
'O' # 0x4F
'P' # 0x50
'Q' # 0x51
'R' # 0x52
'S' # 0x53
'T' # 0x54
'U' # 0x55
'V' # 0x56
'W' # 0x57
'X' # 0x58
'Y' # 0x59
'Z' # 0x5A
'[' # 0x5B
'\\' # 0x5C
']' # 0x5D
'^' # 0x5E
'_' # 0x5F
# 0x60-0x7F: Lowercase + special glyphs
'\u2666' # 0x60 BLACK DIAMOND SUIT
'a' # 0x61
'b' # 0x62
'c' # 0x63
'd' # 0x64
'e' # 0x65
'f' # 0x66
'g' # 0x67
'h' # 0x68
'i' # 0x69
'j' # 0x6A
'k' # 0x6B
'l' # 0x6C
'm' # 0x6D
'n' # 0x6E
'o' # 0x6F
'p' # 0x70
'q' # 0x71
'r' # 0x72
's' # 0x73
't' # 0x74
'u' # 0x75
'v' # 0x76
'w' # 0x77
'x' # 0x78
'y' # 0x79
'z' # 0x7A
'\u2660' # 0x7B BLACK SPADE SUIT
'|' # 0x7C VERTICAL LINE
'\u21b0' # 0x7D UPWARDS ARROW WITH TIP LEFTWARDS (clear screen)
'\u25c0' # 0x7E BLACK LEFT-POINTING TRIANGLE (backspace)
'\u25b6' # 0x7F BLACK RIGHT-POINTING TRIANGLE (tab)
# 0x80-0xFF: Inverse video range
# Bytes with distinct glyphs get their own Unicode mapping;
# the rest share the same character as (byte & 0x7F).
'\u2665' # 0x80 = inverse of 0x00 (heart)
'\u251c' # 0x81 = inverse of 0x01
'\u258a' # 0x82 LEFT THREE QUARTERS BLOCK (distinct)
'\u2518' # 0x83 = inverse of 0x03
'\u2524' # 0x84 = inverse of 0x04
'\u2510' # 0x85 = inverse of 0x05
'\u2571' # 0x86 = inverse of 0x06
'\u2572' # 0x87 = inverse of 0x07
'\u25e4' # 0x88 BLACK UPPER LEFT TRIANGLE (distinct)
'\u259b' # 0x89 QUADRANT UPPER LEFT AND UPPER RIGHT AND LOWER LEFT (distinct)
'\u25e5' # 0x8A BLACK UPPER RIGHT TRIANGLE (distinct)
'\u2599' # 0x8B QUADRANT UPPER LEFT AND LOWER LEFT AND LOWER RIGHT (distinct)
'\u259f' # 0x8C QUADRANT UPPER RIGHT AND LOWER LEFT AND LOWER RIGHT (distinct)
'\u2586' # 0x8D LOWER THREE QUARTERS BLOCK (distinct)
'\U0001fb85' # 0x8E UPPER THREE QUARTERS BLOCK (distinct)
'\u259c' # 0x8F QUADRANT UPPER LEFT AND UPPER RIGHT AND LOWER RIGHT (distinct)
'\u2663' # 0x90 = inverse of 0x10 (club)
'\u250c' # 0x91 = inverse of 0x11
'\u2500' # 0x92 = inverse of 0x12
'\u253c' # 0x93 = inverse of 0x13
'\u25d8' # 0x94 INVERSE BULLET (distinct)
'\u2580' # 0x95 UPPER HALF BLOCK (distinct)
'\U0001fb8a' # 0x96 RIGHT THREE QUARTERS BLOCK (distinct)
'\u252c' # 0x97 = inverse of 0x17
'\u2534' # 0x98 = inverse of 0x18
'\u2590' # 0x99 RIGHT HALF BLOCK (distinct)
'\u2514' # 0x9A = inverse of 0x1A
'\n' # 0x9B ATASCII END OF LINE
'\u2191' # 0x9C = inverse of 0x1C (up arrow)
'\u2193' # 0x9D = inverse of 0x1D (down arrow)
'\u2190' # 0x9E = inverse of 0x1E (left arrow)
'\u2192' # 0x9F = inverse of 0x1F (right arrow)
'\u2588' # 0xA0 FULL BLOCK (distinct)
'!' # 0xA1 = inverse of 0x21
'"' # 0xA2 = inverse of 0x22
'#' # 0xA3 = inverse of 0x23
'$' # 0xA4 = inverse of 0x24
'%' # 0xA5 = inverse of 0x25
'&' # 0xA6 = inverse of 0x26
"'" # 0xA7 = inverse of 0x27
'(' # 0xA8 = inverse of 0x28
')' # 0xA9 = inverse of 0x29
'*' # 0xAA = inverse of 0x2A
'+' # 0xAB = inverse of 0x2B
',' # 0xAC = inverse of 0x2C
'-' # 0xAD = inverse of 0x2D
'.' # 0xAE = inverse of 0x2E
'/' # 0xAF = inverse of 0x2F
'0' # 0xB0 = inverse of 0x30
'1' # 0xB1 = inverse of 0x31
'2' # 0xB2 = inverse of 0x32
'3' # 0xB3 = inverse of 0x33
'4' # 0xB4 = inverse of 0x34
'5' # 0xB5 = inverse of 0x35
'6' # 0xB6 = inverse of 0x36
'7' # 0xB7 = inverse of 0x37
'8' # 0xB8 = inverse of 0x38
'9' # 0xB9 = inverse of 0x39
':' # 0xBA = inverse of 0x3A
';' # 0xBB = inverse of 0x3B
'<' # 0xBC = inverse of 0x3C
'=' # 0xBD = inverse of 0x3D
'>' # 0xBE = inverse of 0x3E
'?' # 0xBF = inverse of 0x3F
'@' # 0xC0 = inverse of 0x40
'A' # 0xC1 = inverse of 0x41
'B' # 0xC2 = inverse of 0x42
'C' # 0xC3 = inverse of 0x43
'D' # 0xC4 = inverse of 0x44
'E' # 0xC5 = inverse of 0x45
'F' # 0xC6 = inverse of 0x46
'G' # 0xC7 = inverse of 0x47
'H' # 0xC8 = inverse of 0x48
'I' # 0xC9 = inverse of 0x49
'J' # 0xCA = inverse of 0x4A
'K' # 0xCB = inverse of 0x4B
'L' # 0xCC = inverse of 0x4C
'M' # 0xCD = inverse of 0x4D
'N' # 0xCE = inverse of 0x4E
'O' # 0xCF = inverse of 0x4F
'P' # 0xD0 = inverse of 0x50
'Q' # 0xD1 = inverse of 0x51
'R' # 0xD2 = inverse of 0x52
'S' # 0xD3 = inverse of 0x53
'T' # 0xD4 = inverse of 0x54
'U' # 0xD5 = inverse of 0x55
'V' # 0xD6 = inverse of 0x56
'W' # 0xD7 = inverse of 0x57
'X' # 0xD8 = inverse of 0x58
'Y' # 0xD9 = inverse of 0x59
'Z' # 0xDA = inverse of 0x5A
'[' # 0xDB = inverse of 0x5B
'\\' # 0xDC = inverse of 0x5C
']' # 0xDD = inverse of 0x5D
'^' # 0xDE = inverse of 0x5E
'_' # 0xDF = inverse of 0x5F
'\u2666' # 0xE0 = inverse of 0x60 (diamond)
'a' # 0xE1 = inverse of 0x61
'b' # 0xE2 = inverse of 0x62
'c' # 0xE3 = inverse of 0x63
'd' # 0xE4 = inverse of 0x64
'e' # 0xE5 = inverse of 0x65
'f' # 0xE6 = inverse of 0x66
'g' # 0xE7 = inverse of 0x67
'h' # 0xE8 = inverse of 0x68
'i' # 0xE9 = inverse of 0x69
'j' # 0xEA = inverse of 0x6A
'k' # 0xEB = inverse of 0x6B
'l' # 0xEC = inverse of 0x6C
'm' # 0xED = inverse of 0x6D
'n' # 0xEE = inverse of 0x6E
'o' # 0xEF = inverse of 0x6F
'p' # 0xF0 = inverse of 0x70
'q' # 0xF1 = inverse of 0x71
'r' # 0xF2 = inverse of 0x72
's' # 0xF3 = inverse of 0x73
't' # 0xF4 = inverse of 0x74
'u' # 0xF5 = inverse of 0x75
'v' # 0xF6 = inverse of 0x76
'w' # 0xF7 = inverse of 0x77
'x' # 0xF8 = inverse of 0x78
'y' # 0xF9 = inverse of 0x79
'z' # 0xFA = inverse of 0x7A
'\u2660' # 0xFB = inverse of 0x7B (spade)
'|' # 0xFC = inverse of 0x7C
'\u21b0' # 0xFD = inverse of 0x7D (clear screen)
'\u25c0' # 0xFE = inverse of 0x7E (backspace)
'\u25b6' # 0xFF = inverse of 0x7F (tab)
)
assert len(DECODING_TABLE) == 256
def _normalize_eol(text: str) -> str:
r"""
Normalize CR and CRLF to LF for ATASCII encoding.
ATASCII uses byte 0x9B as its end-of-line character, mapped to ``\n`` (U+000A). Standard CR
(U+000D) has no native ATASCII representation (byte 0x0D is a graphics character), so CR and
CRLF are both folded to LF before charmap encoding.
"""
return text.replace('\r\n', '\n').replace('\r', '\n')
class Codec(codecs.Codec):
"""ATASCII character map codec."""
def encode( # pylint: disable=redefined-builtin
self, input: str, errors: str = 'strict'
) -> Tuple[bytes, int]:
"""Encode input string using ATASCII character map."""
input = _normalize_eol(input)
return codecs.charmap_encode(input, errors, ENCODING_TABLE)
def decode( # pylint: disable=redefined-builtin
self, input: bytes, errors: str = 'strict'
) -> Tuple[str, int]:
"""Decode input bytes using ATASCII character map."""
return codecs.charmap_decode(
input, errors, DECODING_TABLE # type: ignore[arg-type]
)
class IncrementalEncoder(codecs.IncrementalEncoder):
"""ATASCII incremental encoder with CR/CRLF → LF normalization."""
def __init__(self, errors: str = 'strict') -> None:
"""Initialize encoder with pending CR state."""
super().__init__(errors)
self._pending_cr = False
def encode( # pylint: disable=redefined-builtin
self, input: str, final: bool = False
) -> bytes:
"""Encode input string incrementally."""
if self._pending_cr:
input = '\r' + input
self._pending_cr = False
if not final and input.endswith('\r'):
input = input[:-1]
self._pending_cr = True
input = _normalize_eol(input)
return codecs.charmap_encode(input, self.errors, ENCODING_TABLE)[0]
def reset(self) -> None:
"""Reset encoder state."""
self._pending_cr = False
def getstate(self) -> int:
"""Return encoder state as integer."""
return int(self._pending_cr)
def setstate(self, state: Union[int, str]) -> None:
"""Restore encoder state from integer."""
self._pending_cr = bool(state)
class IncrementalDecoder(codecs.IncrementalDecoder):
"""ATASCII incremental decoder."""
def decode( # type: ignore[override] # pylint: disable=redefined-builtin
self, input: bytes, final: bool = False
) -> str:
"""Decode input bytes incrementally."""
return codecs.charmap_decode(
input, self.errors, DECODING_TABLE # type: ignore[arg-type]
)[0]
class StreamWriter(Codec, codecs.StreamWriter):
"""ATASCII stream writer."""
class StreamReader(Codec, codecs.StreamReader):
"""ATASCII stream reader."""
def getregentry() -> codecs.CodecInfo:
"""Return the codec registry entry."""
return codecs.CodecInfo(
name='atascii',
encode=Codec().encode,
decode=Codec().decode, # type: ignore[arg-type]
incrementalencoder=IncrementalEncoder,
incrementaldecoder=IncrementalDecoder,
streamreader=StreamReader,
streamwriter=StreamWriter,
)
def getaliases() -> Tuple[str, ...]:
"""Return codec aliases."""
return ('atari8bit', 'atari_8bit')
# Build encoding table preferring normal bytes (0x00-0x7F) over inverse
# (0x80-0xFF). codecs.charmap_build() picks the last occurrence, which
# maps printable characters to the inverse-video range. We fix this by
# overwriting with normal-range entries so they take priority.
# Exception: '\n' must still encode to 0x9B (ATASCII EOL), not 0x0A.
def _build_encoding_table() -> Dict[int, int]:
table: Dict[int, int] = codecs.charmap_build(DECODING_TABLE)
for byte_val in range(0x80):
table[ord(DECODING_TABLE[byte_val])] = byte_val
# '\n' at 0x0A must encode to 0x9B (ATASCII EOL), not 0x0A
table[ord('\n')] = 0x9B
return table
ENCODING_TABLE = _build_encoding_table()
"""Tests for the ATASCII (Atari 8-bit) codec."""
# std imports
import codecs
# 3rd party
import pytest
# local
import telnetlib3 # noqa: F401 - registers codecs
from telnetlib3.encodings import atascii
def test_codec_lookup():
info = codecs.lookup('atascii')
assert info.name == 'atascii'
@pytest.mark.parametrize("alias", ['atari8bit', 'atari_8bit'])
def test_codec_aliases(alias):
info = codecs.lookup(alias)
assert info.name == 'atascii'
def test_ascii_letters_uppercase():
data = bytes(range(0x41, 0x5B))
assert data.decode('atascii') == 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
def test_ascii_letters_lowercase():
data = bytes(range(0x61, 0x7B))
assert data.decode('atascii') == 'abcdefghijklmnopqrstuvwxyz'
def test_digits():
data = bytes(range(0x30, 0x3A))
assert data.decode('atascii') == '0123456789'
def test_space():
assert b'\x20'.decode('atascii') == ' '
@pytest.mark.parametrize("byte_val,expected", [
pytest.param(0x00, '\u2665', id='heart'),
pytest.param(0x01, '\u251c', id='box_vert_right'),
pytest.param(0x08, '\u25e2', id='lower_right_triangle'),
pytest.param(0x09, '\u2597', id='quadrant_lower_right'),
pytest.param(0x10, '\u2663', id='club'),
pytest.param(0x12, '\u2500', id='horizontal_line'),
pytest.param(0x14, '\u25cf', id='black_circle'),
pytest.param(0x15, '\u2584', id='lower_half_block'),
pytest.param(0x19, '\u258c', id='left_half_block'),
pytest.param(0x1B, '\u241b', id='symbol_for_escape'),
])
def test_graphics_chars(byte_val, expected):
assert bytes([byte_val]).decode('atascii') == expected
@pytest.mark.parametrize("byte_val,expected", [
pytest.param(0x1C, '\u2191', id='cursor_up'),
pytest.param(0x1D, '\u2193', id='cursor_down'),
pytest.param(0x1E, '\u2190', id='cursor_left'),
pytest.param(0x1F, '\u2192', id='cursor_right'),
])
def test_cursor_arrows(byte_val, expected):
assert bytes([byte_val]).decode('atascii') == expected
@pytest.mark.parametrize("byte_val,expected", [
pytest.param(0x60, '\u2666', id='diamond'),
pytest.param(0x7B, '\u2660', id='spade'),
pytest.param(0x7C, '|', id='pipe'),
pytest.param(0x7D, '\u21b0', id='clear_screen'),
pytest.param(0x7E, '\u25c0', id='backspace_triangle'),
pytest.param(0x7F, '\u25b6', id='tab_triangle'),
])
def test_special_glyphs(byte_val, expected):
assert bytes([byte_val]).decode('atascii') == expected
def test_atascii_eol():
assert b'\x9b'.decode('atascii') == '\n'
@pytest.mark.parametrize("byte_val,expected", [
pytest.param(0x82, '\u258a', id='left_three_quarters'),
pytest.param(0x88, '\u25e4', id='upper_left_triangle'),
pytest.param(0x89, '\u259b', id='quadrant_UL_UR_LL'),
pytest.param(0x8A, '\u25e5', id='upper_right_triangle'),
pytest.param(0x8B, '\u2599', id='quadrant_UL_LL_LR'),
pytest.param(0x8C, '\u259f', id='quadrant_UR_LL_LR'),
pytest.param(0x8D, '\u2586', id='lower_three_quarters'),
pytest.param(0x8E, '\U0001fb85', id='upper_three_quarters'),
pytest.param(0x8F, '\u259c', id='quadrant_UL_UR_LR'),
pytest.param(0x94, '\u25d8', id='inverse_bullet'),
pytest.param(0x95, '\u2580', id='upper_half_block'),
pytest.param(0x96, '\U0001fb8a', id='right_three_quarters'),
pytest.param(0x99, '\u2590', id='right_half_block'),
pytest.param(0xA0, '\u2588', id='full_block'),
])
def test_inverse_distinct_glyphs(byte_val, expected):
assert bytes([byte_val]).decode('atascii') == expected
def test_inverse_shared_glyphs():
for byte_val in (0x80, 0x81, 0x83, 0x84, 0x85, 0x86, 0x87):
normal = bytes([byte_val & 0x7F]).decode('atascii')
inverse = bytes([byte_val]).decode('atascii')
assert inverse == normal
def test_inverse_ascii_range():
for byte_val in range(0xA1, 0xFB):
normal = bytes([byte_val & 0x7F]).decode('atascii')
inverse = bytes([byte_val]).decode('atascii')
assert inverse == normal
def test_full_decode_no_crash():
data = bytes(range(256))
result = data.decode('atascii')
assert len(result) == 256
def test_encode_eol():
encoded, length = codecs.lookup('atascii').encode('\n')
assert encoded == b'\x9b'
assert length == 1
def test_encode_unique_chars():
encoded, _ = codecs.lookup('atascii').encode('\u258a')
assert encoded == b'\x82'
encoded, _ = codecs.lookup('atascii').encode('\u25d8')
assert encoded == b'\x94'
encoded, _ = codecs.lookup('atascii').encode('\u2588')
assert encoded == b'\xa0'
def test_encode_charmap_prefers_normal_byte():
encoded, _ = codecs.lookup('atascii').encode('\u2665')
assert encoded == b'\x00'
encoded, _ = codecs.lookup('atascii').encode('A')
assert encoded == b'\x41'
def test_incremental_decoder():
decoder = codecs.getincrementaldecoder('atascii')()
assert decoder.decode(b'\x00', False) == '\u2665'
assert decoder.decode(b'AB', True) == 'AB'
def test_incremental_encoder():
encoder = codecs.getincrementalencoder('atascii')()
assert encoder.encode('\u258a', False) == b'\x82'
assert encoder.encode('\n', True) == b'\x9b'
def test_strict_error_on_unencodable():
with pytest.raises(UnicodeEncodeError):
'\u00e9'.encode('atascii')
def test_replace_error_mode():
result = '\u00e9'.encode('atascii', errors='replace')
assert result == b'\x3f'
def test_ignore_error_mode():
result = '\u00e9'.encode('atascii', errors='ignore')
assert result == b''
def test_encode_cr_as_eol():
encoded, length = codecs.lookup('atascii').encode('\r')
assert encoded == b'\x9b'
assert length == 1
def test_encode_crlf_as_single_eol():
encoded, length = codecs.lookup('atascii').encode('\r\n')
assert encoded == b'\x9b'
assert length == 1
def test_encode_mixed_line_endings():
encoded, _ = codecs.lookup('atascii').encode('hello\r\nworld\r')
hello_eol = 'hello\n'.encode('atascii')
world_eol = 'world\n'.encode('atascii')
assert encoded == hello_eol + world_eol
def test_incremental_encoder_cr_then_lf():
encoder = codecs.getincrementalencoder('atascii')()
result = encoder.encode('hello\r', final=False)
assert result == 'hello'.encode('atascii')
result = encoder.encode('\nworld', final=True)
assert result == '\nworld'.encode('atascii')
def test_incremental_encoder_cr_then_other():
encoder = codecs.getincrementalencoder('atascii')()
result = encoder.encode('A\r', final=False)
assert result == 'A'.encode('atascii')
result = encoder.encode('B', final=True)
assert result == '\nB'.encode('atascii')
def test_incremental_encoder_cr_final():
encoder = codecs.getincrementalencoder('atascii')()
result = encoder.encode('end\r', final=True)
assert result == 'end\n'.encode('atascii')
def test_incremental_encoder_reset():
encoder = codecs.getincrementalencoder('atascii')()
encoder.encode('A\r', final=False)
encoder.reset()
assert encoder.getstate() == 0
def test_incremental_encoder_getstate_setstate():
encoder = codecs.getincrementalencoder('atascii')()
encoder.encode('A\r', final=False)
assert encoder.getstate() == 1
state = encoder.getstate()
encoder2 = codecs.getincrementalencoder('atascii')()
encoder2.setstate(state)
result = encoder2.encode('\n', final=True)
assert result == b'\x9b'
def test_native_graphics_0x0a_0x0d():
assert b'\x0a'.decode('atascii') == '\u25e3'
assert b'\x0d'.decode('atascii') == '\U0001fb82'
def test_decoding_table_length():
assert len(atascii.DECODING_TABLE) == 256
"""Tests for telnetlib3.client_shell — Terminal mode handling."""
# std imports
import sys
import types
# 3rd party
import pytest
if sys.platform == "win32":
pytest.skip("POSIX-only tests", allow_module_level=True)
# std imports
# std imports (POSIX only)
import termios # noqa: E402
# local
from telnetlib3.client_shell import ( # noqa: E402
_INPUT_XLAT,
_INPUT_SEQ_XLAT,
Terminal,
InputFilter,
)
def _make_writer(will_echo: bool = False, raw_mode: bool = False) -> object:
"""Build a minimal mock writer with the attributes Terminal needs."""
writer = types.SimpleNamespace(
will_echo=will_echo,
log=types.SimpleNamespace(debug=lambda *a, **kw: None),
)
if raw_mode:
writer._raw_mode = True
return writer
def _cooked_mode() -> "Terminal.ModeDef":
"""Return a typical cooked-mode ModeDef with canonical input enabled."""
return Terminal.ModeDef(
iflag=termios.BRKINT | termios.ICRNL | termios.IXON,
oflag=termios.OPOST | termios.ONLCR,
cflag=termios.CS8 | termios.CREAD,
lflag=termios.ICANON | termios.ECHO | termios.ISIG | termios.IEXTEN,
ispeed=termios.B38400,
ospeed=termios.B38400,
cc=[b'\x00'] * termios.NCCS,
)
class TestDetermineMode:
def test_linemode_when_no_echo_no_raw(self) -> None:
writer = _make_writer(will_echo=False, raw_mode=False)
term = Terminal.__new__(Terminal)
term.telnet_writer = writer
mode = _cooked_mode()
result = term.determine_mode(mode)
assert result is mode
def test_raw_mode_when_will_echo(self) -> None:
writer = _make_writer(will_echo=True, raw_mode=False)
term = Terminal.__new__(Terminal)
term.telnet_writer = writer
mode = _cooked_mode()
result = term.determine_mode(mode)
assert result is not mode
assert not (result.lflag & termios.ICANON)
assert not (result.lflag & termios.ECHO)
def test_raw_mode_when_force_raw(self) -> None:
writer = _make_writer(will_echo=False, raw_mode=True)
term = Terminal.__new__(Terminal)
term.telnet_writer = writer
mode = _cooked_mode()
result = term.determine_mode(mode)
assert result is not mode
assert not (result.lflag & termios.ICANON)
assert not (result.lflag & termios.ECHO)
assert not (result.oflag & termios.OPOST)
def test_raw_mode_when_both_echo_and_raw(self) -> None:
writer = _make_writer(will_echo=True, raw_mode=True)
term = Terminal.__new__(Terminal)
term.telnet_writer = writer
mode = _cooked_mode()
result = term.determine_mode(mode)
assert result is not mode
assert not (result.lflag & termios.ICANON)
assert not (result.lflag & termios.ECHO)
class TestInputXlat:
def test_atascii_del_to_backspace(self) -> None:
assert _INPUT_XLAT["atascii"][0x7F] == 0x7E
def test_atascii_bs_to_backspace(self) -> None:
assert _INPUT_XLAT["atascii"][0x08] == 0x7E
def test_atascii_cr_to_eol(self) -> None:
assert _INPUT_XLAT["atascii"][0x0D] == 0x9B
def test_atascii_lf_to_eol(self) -> None:
assert _INPUT_XLAT["atascii"][0x0A] == 0x9B
def test_petscii_del_to_backspace(self) -> None:
assert _INPUT_XLAT["petscii"][0x7F] == 0x14
def test_petscii_bs_to_backspace(self) -> None:
assert _INPUT_XLAT["petscii"][0x08] == 0x14
def test_normal_bytes_not_in_xlat(self) -> None:
xlat = _INPUT_XLAT["atascii"]
for b in (ord('a'), ord('A'), ord('1'), ord(' ')):
assert b not in xlat
class TestInputFilterAtascii:
@staticmethod
def _make_filter() -> InputFilter:
return InputFilter(
_INPUT_SEQ_XLAT["atascii"], _INPUT_XLAT["atascii"]
)
@pytest.mark.parametrize("seq,expected", list(_INPUT_SEQ_XLAT["atascii"].items()))
def test_sequence_translated(self, seq: bytes, expected: bytes) -> None:
f = self._make_filter()
assert f.feed(seq) == expected
def test_passthrough_ascii(self) -> None:
f = self._make_filter()
assert f.feed(b"hello") == b"hello"
def test_mixed_text_and_sequence(self) -> None:
f = self._make_filter()
assert f.feed(b"hi\x1b[Alo") == b"hi\x1clo"
def test_multiple_sequences(self) -> None:
f = self._make_filter()
assert f.feed(b"\x1b[A\x1b[B\x1b[C\x1b[D") == b"\x1c\x1d\x1f\x1e"
def test_single_byte_xlat_applied(self) -> None:
f = self._make_filter()
assert f.feed(b"\x7f") == b"\x7e"
assert f.feed(b"\x08") == b"\x7e"
def test_cr_to_atascii_eol(self) -> None:
f = self._make_filter()
assert f.feed(b"\r") == b"\x9b"
def test_lf_to_atascii_eol(self) -> None:
f = self._make_filter()
assert f.feed(b"\n") == b"\x9b"
def test_text_with_enter(self) -> None:
f = self._make_filter()
assert f.feed(b"hello\r") == b"hello\x9b"
def test_split_sequence_buffered(self) -> None:
f = self._make_filter()
assert f.feed(b"\x1b") == b""
assert f.feed(b"[A") == b"\x1c"
def test_split_sequence_mid_csi(self) -> None:
f = self._make_filter()
assert f.feed(b"\x1b[") == b""
assert f.feed(b"A") == b"\x1c"
def test_bare_esc_flushed_on_non_prefix(self) -> None:
f = self._make_filter()
assert f.feed(b"\x1b") == b""
assert f.feed(b"x") == b"\x1bx"
def test_delete_key_sequence(self) -> None:
f = self._make_filter()
assert f.feed(b"\x1b[3~") == b"\x7e"
def test_ss3_arrow_keys(self) -> None:
f = self._make_filter()
assert f.feed(b"\x1bOA") == b"\x1c"
assert f.feed(b"\x1bOD") == b"\x1e"
class TestInputFilterPetscii:
@staticmethod
def _make_filter() -> InputFilter:
return InputFilter(
_INPUT_SEQ_XLAT["petscii"], _INPUT_XLAT["petscii"]
)
@pytest.mark.parametrize("seq,expected", list(_INPUT_SEQ_XLAT["petscii"].items()))
def test_sequence_translated(self, seq: bytes, expected: bytes) -> None:
f = self._make_filter()
assert f.feed(seq) == expected
def test_home_key(self) -> None:
f = self._make_filter()
assert f.feed(b"\x1b[H") == b"\x13"
def test_insert_key(self) -> None:
f = self._make_filter()
assert f.feed(b"\x1b[2~") == b"\x94"
def test_single_byte_xlat_applied(self) -> None:
f = self._make_filter()
assert f.feed(b"\x7f") == b"\x14"
class TestInputFilterEmpty:
def test_no_xlat_passthrough(self) -> None:
f = InputFilter({}, {})
assert f.feed(b"hello\x1b[Aworld") == b"hello\x1b[Aworld"
def test_empty_feed(self) -> None:
f = InputFilter({}, {})
assert f.feed(b"") == b""
"""Tests for the PETSCII (Commodore 64/128) codec."""
# std imports
import codecs
# 3rd party
import pytest
# local
import telnetlib3 # noqa: F401 - registers codecs
from telnetlib3.encodings import petscii
def test_codec_lookup():
info = codecs.lookup('petscii')
assert info.name == 'petscii'
@pytest.mark.parametrize("alias", ['cbm', 'commodore', 'c64', 'c128'])
def test_codec_aliases(alias):
info = codecs.lookup(alias)
assert info.name == 'petscii'
def test_digits():
data = bytes(range(0x30, 0x3A))
assert data.decode('petscii') == '0123456789'
def test_space():
assert b'\x20'.decode('petscii') == ' '
def test_lowercase_at_41_5A():
data = bytes(range(0x41, 0x5B))
assert data.decode('petscii') == 'abcdefghijklmnopqrstuvwxyz'
def test_uppercase_at_C1_DA():
data = bytes(range(0xC1, 0xDB))
assert data.decode('petscii') == 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
def test_return():
assert b'\x0d'.decode('petscii') == '\r'
@pytest.mark.parametrize("byte_val,expected", [
pytest.param(0x5C, '\u00a3', id='pound_sign'),
pytest.param(0x5E, '\u2191', id='up_arrow'),
pytest.param(0x5F, '\u2190', id='left_arrow'),
pytest.param(0x61, '\u2660', id='spade'),
pytest.param(0x7E, '\u03c0', id='pi'),
pytest.param(0x78, '\u2663', id='club'),
pytest.param(0x7A, '\u2666', id='diamond'),
pytest.param(0x73, '\u2665', id='heart'),
])
def test_graphics_chars(byte_val, expected):
assert bytes([byte_val]).decode('petscii') == expected
def test_full_decode_no_crash():
data = bytes(range(256))
result = data.decode('petscii')
assert len(result) == 256
def test_encode_lowercase():
encoded, length = codecs.lookup('petscii').encode('hello')
assert encoded == bytes([0x48, 0x45, 0x4c, 0x4c, 0x4f])
assert length == 5
def test_encode_uppercase():
encoded, length = codecs.lookup('petscii').encode('HELLO')
assert encoded == b'\xc8\xc5\xcc\xcc\xcf'
assert length == 5
def test_round_trip_digits():
for byte_val in range(0x30, 0x3A):
original = bytes([byte_val])
decoded = original.decode('petscii')
re_encoded = decoded.encode('petscii')
assert re_encoded == original
def test_incremental_decoder():
decoder = codecs.getincrementaldecoder('petscii')()
assert decoder.decode(b'\xc1', False) == 'A'
assert decoder.decode(b'\x42\x43', True) == 'bc'
def test_incremental_encoder():
encoder = codecs.getincrementalencoder('petscii')()
assert encoder.encode('A', False) == b'\xc1'
assert encoder.encode('bc', True) == b'\x42\x43'
def test_decoding_table_length():
assert len(petscii.DECODING_TABLE) == 256
+2
-2

@@ -71,6 +71,6 @@ # std imports

# The short X.Y version.
version = "2.4"
version = "2.5"
# The full version, including alpha/beta/rc tags.
release = "2.4.0" # keep in sync with pyproject.toml and telnetlib3/accessories.py !!
release = "2.5.0" # keep in sync with pyproject.toml and telnetlib3/accessories.py !!

@@ -77,0 +77,0 @@ # The language for content auto-generated by Sphinx. Refer to documentation

@@ -208,2 +208,46 @@ =========

Retro BBS Encodings
~~~~~~~~~~~~~~~~~~~
telnetlib3 includes custom codecs for retro computing platforms commonly
found on telnet BBS systems:
- **ATASCII** (``--encoding=atascii``) -- Atari 8-bit computers (400, 800,
XL, XE). Graphics characters at 0x00-0x1F, card suits, box drawing, and
an inverse-video range at 0x80-0xFF. The ATASCII end-of-line character
(0x9B) maps to newline. Aliases: ``atari8bit``, ``atari_8bit``.
- **PETSCII** (``--encoding=petscii``) -- Commodore 64/128 shifted
(lowercase) mode. Lowercase a-z at 0x41-0x5A, uppercase A-Z at
0xC1-0xDA. Aliases: ``cbm``, ``commodore``, ``c64``, ``c128``.
- **Atari ST** (``--encoding=atarist``) -- Atari ST character set with
extended Latin, Greek, and math symbols. Alias: ``atari``.
These encodings use bytes 0x80-0xFF for standard glyphs, which conflicts
with the telnet protocol's default 7-bit NVT mode. When any of these
encodings is selected, ``--force-binary`` is automatically enabled so that
high-bit bytes are transmitted without requiring BINARY option negotiation.
PETSCII inline color codes are translated to ANSI 24-bit RGB using the
VIC-II C64 palette, and cursor control codes (up/down/left/right, HOME,
CLR, DEL) are translated to ANSI sequences. ATASCII control character
glyphs (cursor movement, backspace, clear screen) are similarly translated.
Keyboard input is also mapped: arrow keys, backspace, delete, and enter
produce the correct raw bytes for each encoding::
telnetlib3-client --encoding=atascii area52.tk 5200
telnetlib3-client --encoding=petscii bbs.example.com 6400
``telnetlib3-fingerprint`` decodes and translates banners with these
encodings, including PETSCII colors.
SyncTERM Font Detection
^^^^^^^^^^^^^^^^^^^^^^^^
When a server sends a SyncTERM/CTerm font selection sequence
(``CSI Ps1 ; Ps2 SP D``), both ``telnetlib3-client`` and
``telnetlib3-fingerprint`` automatically switch the session encoding
to match the font (e.g. font 36 = ATASCII, 32-35 = PETSCII, 0 = CP437).
An explicit ``--encoding`` flag takes precedence over font detection.
Line Endings

@@ -234,2 +278,40 @@ ~~~~~~~~~~~~

Raw Mode and Line Mode
~~~~~~~~~~~~~~~~~~~~~~
``telnetlib3-client`` defaults to **raw terminal mode** -- the local
terminal is set to raw (no line buffering, no local echo, no signal
processing), and each keystroke is sent to the server immediately. This
is the correct mode for most BBS and MUD servers that handle their own
echo and line editing.
Use ``--line-mode`` to switch to line-buffered input with local echo,
which is appropriate for simple command-line services that expect the
client to perform local line editing::
# Default: raw mode (correct for most servers)
telnetlib3-client bbs.example.com
# Line mode: local echo and line buffering
telnetlib3-client --line-mode simple-service.example.com
Similarly, ``telnetlib3-server --pty-exec`` defaults to raw PTY mode
(disabling PTY echo), which is correct for programs that handle their own
terminal I/O (curses, blessed, etc.). Use ``--line-mode`` for programs
that expect cooked/canonical PTY mode::
# Default: raw PTY (correct for curses programs)
telnetlib3-server --pty-exec /bin/bash -- --login
# Line mode: cooked PTY with echo (for simple programs like bc)
telnetlib3-server --pty-exec /bin/bc --line-mode
Debugging
~~~~~~~~~
Use ``--loglevel=trace`` to see hexdump-style output of all bytes sent
and received on the wire::
telnetlib3-client --loglevel=trace --logfile=debug.log bbs.example.com
server_binary.py

@@ -236,0 +318,0 @@ ~~~~~~~~~~~~~~~~

History
=======
2.5.0
* change: ``telnetlib3-client`` now defaults to raw terminal mode (no line buffering, no local
echo), which is correct for most servers. Use ``--line-mode`` to restore line-buffered
local-echo behavior.
* change: ``telnetlib3-server --pty-exec`` now defaults to raw PTY mode. Use ``--line-mode`` to
restore cooked PTY mode with echo.
* change: ``connect_minwait`` default reduced to 0 across
:class:`~telnetlib3.client_base.BaseClient`, :func:`~telnetlib3.client.open_connection`, and
``telnetlib3-client``. Negotiation continues asynchronously. Use ``--connect-minwait`` to
restore a delay if needed, or, use :meth:`~telnetlib3.stream_writer.TelnetWriter.wait_for` in
server or client shells to await a specific negotiation state.
* new: Color, keyboard input translation and ``--encoding`` support for ATASCII (ATARI ASCII) and
PETSCII (Commodore ASCII).
* new: SyncTERM/CTerm font selection sequence detection (``CSI Ps1 ; Ps2 SP D``). Both
``telnetlib3-fingerprint`` and ``telnetlib3-client`` detect font switching and auto-switch
encoding to the matching codec (e.g. font 36 = ATASCII, 32-35 = PETSCII, 0 = CP437). Explicit
``--encoding`` takes precedence.
* new: :data:`~telnetlib3.accessories.TRACE` log level (5, below ``DEBUG``) with
:func:`~telnetlib3.accessories.hexdump` style output for all sent and received bytes. Use
``--loglevel=trace``.
* bugfix: :func:`~telnetlib3.guard_shells.robot_check` now uses a narrow
character (space) instead of a wide Unicode character, allowing retro
terminal emulators to pass.
* bugfix: ATASCII codec now maps bytes 0x0D and 0x0A to CR and LF instead
of graphics characters, fixing garbled output when connecting to Atari
BBS systems.
* bugfix: ATASCII codec normalizes CR and CRLF to the native ATASCII
EOL (0x9B) during encoding, so the Return key works correctly.
* bugfix: PETSCII bare CR (0x0D) is now normalized to CRLF in raw
terminal mode and to LF in ``telnetlib3-fingerprint`` banners.
* bugfix: ``telnetlib3-fingerprint`` re-encodes prompt responses for retro
encodings so servers receive the correct EOL byte.
* bugfix: ``telnetlib3-fingerprint`` no longer crashes with
``LookupError`` when the server negotiates an unknown charset.
Banner formatting falls back to ``latin-1``.
* bugfix: :meth:`~telnetlib3.client.TelnetClient.send_charset` normalises
non-standard encoding names (``iso-8859-02`` to ``iso-8859-2``,
``cp-1250`` to ``cp1250``, etc.).
* enhancement: ``telnetlib3-fingerprint`` responds more like a terminal and to more
y/n prompts about colors, encoding, etc. to collect more banners for https://bbs.modem.xyz/
project.
* enhancement: ``telnetlib3-fingerprint`` banner formatting uses
``surrogateescape`` error handler, preserving raw high bytes (e.g. CP437
art) as surrogates instead of replacing them with U+FFFD.

@@ -31,3 +75,9 @@ 2.4.0

default raised from 1024 to 65536.
* enhancement: new ``--encoding=petscii`` and ``--encoding=atarist``
* new: ATASCII (Atari 8-bit) codec -- ``--encoding=atascii`` for connecting
to Atari BBS systems. Maps all 256 byte values to Unicode including
graphics characters, card suits, and the inverse-video range (0x80-0xFF).
ATASCII EOL (0x9B) maps to newline. Aliases: ``atari8bit``, ``atari_8bit``.
* enhancement: ``--encoding=atascii``, ``--encoding=petscii``, and
``--encoding=atarist`` now auto-enable ``--force-binary`` for both client
and server, since these encodings use bytes 0x80-0xFF for standard glyphs.
* bugfix: rare LINEMODE ACK loop with misbehaving servers that re-send

@@ -34,0 +84,0 @@ unchanged MODE without ACK.

Metadata-Version: 2.4
Name: telnetlib3
Version: 2.4.0
Version: 2.5.0
Summary: Python Telnet server and client CLI and Protocol library

@@ -113,6 +113,6 @@ Project-URL: Homepage, https://github.com/jquast/telnetlib3

telnetlib3-server 0.0.0.0 1984 --shell=bin.server_wargame.shell
# run an external program with a pseudo-terminal
telnetlib3-server --pty-exec /bin/bash --pty-raw -- --login
# or a simple linemode program, bc (calculator)
telnetlib3-server --pty-exec /bin/bc
# run an external program with a pseudo-terminal (raw mode is default)
telnetlib3-server --pty-exec /bin/bash -- --login
# or a linemode program, bc (calculator)
telnetlib3-server --pty-exec /bin/bc --line-mode

@@ -119,0 +119,0 @@

@@ -7,3 +7,3 @@ [build-system]

name = "telnetlib3"
version = "2.4.0"
version = "2.5.0"
description = " Python Telnet server and client CLI and Protocol library"

@@ -10,0 +10,0 @@ readme = "README.rst"

@@ -75,6 +75,6 @@ .. image:: https://img.shields.io/pypi/v/telnetlib3.svg

telnetlib3-server 0.0.0.0 1984 --shell=bin.server_wargame.shell
# run an external program with a pseudo-terminal
telnetlib3-server --pty-exec /bin/bash --pty-raw -- --login
# or a simple linemode program, bc (calculator)
telnetlib3-server --pty-exec /bin/bc
# run an external program with a pseudo-terminal (raw mode is default)
telnetlib3-server --pty-exec /bin/bash -- --login
# or a linemode program, bc (calculator)
telnetlib3-server --pty-exec /bin/bc --line-mode

@@ -81,0 +81,0 @@

@@ -12,2 +12,6 @@ """Accessory functions."""

#: Custom TRACE log level, below DEBUG (10).
TRACE = 5
logging.addLevelName(TRACE, "TRACE")
if TYPE_CHECKING: # pragma: no cover

@@ -18,5 +22,7 @@ # local

__all__ = (
"TRACE",
"encoding_from_lang",
"name_unicode",
"eightbits",
"hexdump",
"make_logger",

@@ -42,3 +48,3 @@ "repr_mapping",

"""Return the current version of telnetlib3."""
return "2.3.0" # keep in sync with pyproject.toml and docs/conf.py !!
return "2.5.0" # keep in sync with pyproject.toml and docs/conf.py !!

@@ -102,2 +108,25 @@

def hexdump(data: bytes, prefix: str = "") -> str:
"""
Format *data* as ``hexdump -C`` style output.
Each 16-byte row shows the offset, hex bytes grouped 8+8,
and printable ASCII on the right::
00000000 48 65 6c 6c 6f 20 57 6f 72 6c 64 0d 0a |Hello World..|
:param data: Raw bytes to format.
:param prefix: String prepended to every line (e.g. ``">> "``).
:rtype: str
"""
lines: list[str] = []
for offset in range(0, len(data), 16):
chunk = data[offset : offset + 16]
hex_left = " ".join(f"{b:02x}" for b in chunk[:8])
hex_right = " ".join(f"{b:02x}" for b in chunk[8:])
ascii_part = "".join(chr(b) if 0x20 <= b < 0x7F else "." for b in chunk)
lines.append(f"{prefix}{offset:08x} {hex_left:<23s} {hex_right:<23s} |{ascii_part}|")
return "\n".join(lines)
_DEFAULT_LOGFMT = " ".join(

@@ -112,3 +141,5 @@ ("%(asctime)s", "%(levelname)s", "%(filename)s:%(lineno)d", "%(message)s")

"""Create and return simple logger for given arguments."""
lvl = getattr(logging, loglevel.upper())
lvl = getattr(logging, loglevel.upper(), None)
if lvl is None:
lvl = logging.getLevelName(loglevel.upper())

@@ -115,0 +146,0 @@ _cfg: Dict[str, Any] = {"format": logfmt}

@@ -19,2 +19,3 @@ """Module provides class BaseClient."""

from .telopt import DO, WILL, theNULL, name_commands
from .accessories import TRACE, hexdump
from .stream_reader import TelnetReader, TelnetReaderUnicode

@@ -48,3 +49,3 @@ from .stream_writer import TelnetWriter, TelnetWriterUnicode

force_binary: bool = False,
connect_minwait: float = 1.0,
connect_minwait: float = 0,
connect_maxwait: float = 4.0,

@@ -217,4 +218,9 @@ limit: Optional[int] = None,

"""
if self.log.isEnabledFor(TRACE):
self.log.log(TRACE, "recv %d bytes\n%s", len(data), hexdump(data, prefix="<< "))
self._last_received = datetime.datetime.now()
# Detect SyncTERM font switching sequences and auto-switch encoding.
self._detect_syncterm_font(data)
# Enqueue and account for buffered size

@@ -239,2 +245,29 @@ self._rx_queue.append(data)

def _detect_syncterm_font(self, data: bytes) -> None:
"""
Scan *data* for SyncTERM font selection and switch encoding.
When :attr:`_encoding_explicit` is set on the writer (indicating
the user passed ``--encoding``), the font switch is logged but
does not override the encoding.
"""
if self.writer is None:
return
# local
from .server_fingerprinting import ( # pylint: disable=import-outside-toplevel
_SYNCTERM_BINARY_ENCODINGS,
detect_syncterm_font,
)
encoding = detect_syncterm_font(data)
if encoding is not None:
self.log.debug("SyncTERM font switch: %s", encoding)
if getattr(self.writer, '_encoding_explicit', False):
self.log.debug(
"ignoring font switch, explicit encoding: %s",
self.writer.environ_encoding)
else:
self.writer.environ_encoding = encoding
if encoding in _SYNCTERM_BINARY_ENCODINGS:
self.force_binary = True
# public properties

@@ -241,0 +274,0 @@

@@ -9,3 +9,3 @@ """Telnet client shell implementations for interactive terminal sessions."""

import collections
from typing import Any, Tuple, Union, Optional
from typing import Any, Dict, Tuple, Union, Optional

@@ -17,5 +17,117 @@ # local

__all__ = ("telnet_client_shell",)
__all__ = ("InputFilter", "telnet_client_shell")
# Input byte translation tables for retro encodings in raw mode.
# Maps terminal keyboard bytes to the raw bytes the BBS expects.
# Applied BEFORE decoding/encoding, bypassing the codec entirely for
# characters that can't round-trip through Unicode (e.g. ATASCII 0x7E
# shares its Unicode codepoint U+25C0 with 0xFE).
_INPUT_XLAT: Dict[str, Dict[int, int]] = {
"atascii": {
0x7F: 0x7E, # DEL → ATASCII backspace (byte 0x7E)
0x08: 0x7E, # BS → ATASCII backspace (byte 0x7E)
0x0D: 0x9B, # CR → ATASCII EOL (byte 0x9B)
0x0A: 0x9B, # LF → ATASCII EOL (byte 0x9B)
},
"petscii": {
0x7F: 0x14, # DEL → PETSCII DEL (byte 0x14)
0x08: 0x14, # BS → PETSCII DEL (byte 0x14)
},
}
# Multi-byte escape sequence translation tables for retro encodings.
# Maps common ANSI terminal escape sequences (arrow keys, delete, etc.)
# to the raw bytes the BBS expects. Inspired by blessed's
# DEFAULT_SEQUENCE_MIXIN but kept minimal for the sequences that matter.
_INPUT_SEQ_XLAT: Dict[str, Dict[bytes, bytes]] = {
"atascii": {
b"\x1b[A": b"\x1c", # cursor up (CSI)
b"\x1b[B": b"\x1d", # cursor down
b"\x1b[C": b"\x1f", # cursor right
b"\x1b[D": b"\x1e", # cursor left
b"\x1bOA": b"\x1c", # cursor up (SS3 / application mode)
b"\x1bOB": b"\x1d", # cursor down
b"\x1bOC": b"\x1f", # cursor right
b"\x1bOD": b"\x1e", # cursor left
b"\x1b[3~": b"\x7e", # delete → ATASCII backspace
b"\t": b"\x7f", # tab → ATASCII tab
},
"petscii": {
b"\x1b[A": b"\x91", # cursor up (CSI)
b"\x1b[B": b"\x11", # cursor down
b"\x1b[C": b"\x1d", # cursor right
b"\x1b[D": b"\x9d", # cursor left
b"\x1bOA": b"\x91", # cursor up (SS3 / application mode)
b"\x1bOB": b"\x11", # cursor down
b"\x1bOC": b"\x1d", # cursor right
b"\x1bOD": b"\x9d", # cursor left
b"\x1b[3~": b"\x14", # delete → PETSCII DEL
b"\x1b[H": b"\x13", # home → PETSCII HOME
b"\x1b[2~": b"\x94", # insert → PETSCII INSERT
},
}
class InputFilter: # pylint: disable=too-few-public-methods
"""
Translate terminal escape sequences and single bytes to retro encoding bytes.
Combines single-byte translation (backspace, delete) with multi-byte
escape sequence matching (arrow keys, function keys). Uses prefix-based
buffering inspired by blessed's ``get_leading_prefixes`` to handle
sequences split across reads.
:param seq_xlat: Multi-byte escape sequence → replacement bytes.
:param byte_xlat: Single input byte → replacement byte.
"""
def __init__(
self, seq_xlat: Dict[bytes, bytes], byte_xlat: Dict[int, int]
) -> None:
"""Initialize input filter with sequence and byte translation tables."""
self._byte_xlat = byte_xlat
# Sort sequences longest-first so \x1b[3~ matches before \x1b[3
self._seq_sorted: Tuple[Tuple[bytes, bytes], ...] = tuple(
sorted(seq_xlat.items(), key=lambda kv: len(kv[0]), reverse=True)
)
# Prefix set for partial-match buffering (blessed's get_leading_prefixes)
self._prefixes: frozenset[bytes] = frozenset(
seq[:i] for seq in seq_xlat for i in range(1, len(seq))
)
self._buf = b""
def feed(self, data: bytes) -> bytes:
"""
Process input bytes, returning raw bytes to send to the remote host.
Escape sequences are matched against the configured table and replaced. Partial sequences
are buffered until the next call. Single bytes are translated via the byte translation
table.
:param data: Raw bytes from terminal stdin.
:returns: Translated bytes ready to send to the remote BBS.
"""
self._buf += data
result = bytearray()
while self._buf:
# Try multi-byte sequence match at current position
matched = False
for seq, repl in self._seq_sorted:
if self._buf[:len(seq)] == seq:
result.extend(repl)
self._buf = self._buf[len(seq):]
matched = True
break
if matched:
continue
# Check if buffer is a prefix of any known sequence — wait for more
if self._buf in self._prefixes:
break
# No sequence match, emit single byte with translation
b = self._buf[0]
self._buf = self._buf[1:]
result.append(self._byte_xlat.get(b, b))
return bytes(result)
if sys.platform == "win32":

@@ -78,7 +190,10 @@

"""Return copy of 'mode' with changes suggested for telnet connection."""
if not self.telnet_writer.will_echo:
# return mode as-is
raw_mode = getattr(self.telnet_writer, '_raw_mode', False)
if not self.telnet_writer.will_echo and not raw_mode:
self.telnet_writer.log.debug("local echo, linemode")
return mode
self.telnet_writer.log.debug("server echo, kludge mode")
if raw_mode and not self.telnet_writer.will_echo:
self.telnet_writer.log.debug("raw mode forced, no server echo")
else:
self.telnet_writer.log.debug("server echo, kludge mode")

@@ -177,4 +292,6 @@ # "Raw mode", see tty.py function setraw. This allows sending

linesep = "\n"
if term._istty and telnet_writer.will_echo: # pylint: disable=protected-access
linesep = "\r\n"
if term._istty: # pylint: disable=protected-access
_raw = getattr(telnet_writer, '_raw_mode', False)
if telnet_writer.will_echo or _raw:
linesep = "\r\n"
stdin, stdout = await term.make_stdio()

@@ -268,3 +385,9 @@ escape_name = accessories.name_unicode(keyboard_escape)

break
telnet_writer.write(inp.decode())
_inf = getattr(telnet_writer, '_input_filter', None)
if _inf is not None:
translated = _inf.feed(inp)
if translated:
telnet_writer._write(translated) # pylint: disable=protected-access
else:
telnet_writer.write(inp.decode())
stdin_task = accessories.make_reader_task(stdin)

@@ -310,4 +433,11 @@ wait_for.add(stdin_task)

out = _cf.filter(out)
if getattr(telnet_writer, '_raw_mode', False):
# Normalize all line endings to LF, then to CRLF
# for the raw terminal (OPOST disabled). PETSCII
# BBSes send bare CR (0x0D) as line terminator.
out = (out.replace('\r\n', '\n')
.replace('\r', '\n')
.replace('\n', '\r\n'))
stdout.write(out.encode() or b":?!?:")
telnet_task = accessories.make_reader_task(telnet_reader, size=2**24)
wait_for.add(telnet_task)

@@ -184,2 +184,40 @@ #!/usr/bin/env python3

@staticmethod
def _normalize_charset_name(name: str) -> str:
"""
Normalize server-advertised charset names for :func:`codecs.lookup`.
Servers sometimes advertise non-standard encoding names that Python's
codec registry does not recognise. This tries progressively simpler
variations until one resolves:
1. Original name (spaces → hyphens)
2. Leading zeros stripped from numeric parts (``iso-8859-02`` → ``iso-8859-2``)
3. Hyphens removed entirely (``cp-1250`` → ``cp1250``)
4. Hyphens removed from all but the first segment (``iso-8859-2`` kept)
:param name: Raw charset name from the server.
:returns: Normalized name suitable for :func:`codecs.lookup`.
"""
# std imports
import re # pylint: disable=import-outside-toplevel
base = name.strip().replace(' ', '-')
# Strip leading zeros from numeric segments: iso-8859-02 → iso-8859-2
no_leading_zeros = re.sub(r'-0+(\d)', r'-\1', base)
# All hyphens removed: cp-1250 → cp1250
no_hyphens = base.replace('-', '')
# Keep first hyphen-segment, collapse the rest: iso-8859-2 stays
parts = no_leading_zeros.split('-')
if len(parts) > 2:
partial = parts[0] + '-' + ''.join(parts[1:])
else:
partial = no_leading_zeros
for candidate in (base, no_leading_zeros, no_hyphens, partial):
try:
codecs.lookup(candidate)
return candidate
except LookupError:
continue
return base
def send_charset(self, offered: List[str]) -> str:

@@ -222,3 +260,5 @@ """

try:
canon = codecs.lookup(offer).name
canon = codecs.lookup(
self._normalize_charset_name(offer)
).name

@@ -378,3 +418,3 @@ # Record first viable encoding

shell: Optional[ShellCallback] = None,
connect_minwait: float = 2.0,
connect_minwait: float = 0,
connect_maxwait: float = 3.0,

@@ -498,3 +538,3 @@ connect_timeout: Optional[float] = None,

async def run_client() -> None:
async def run_client() -> None: # pylint: disable=too-many-locals,too-many-statements,too-complex
"""Command-line 'telnetlib3-client' entry point, via setuptools."""

@@ -512,24 +552,27 @@ args = _transform_args(_get_argument_parser().parse_args())

# Wrap client factory to inject always_will/always_do before negotiation
client_factory: Optional[Callable[..., client_base.BaseClient]] = None
if always_will or always_do:
# Wrap client factory to inject always_will/always_do and encoding
# flags before negotiation starts.
encoding_explicit = args["encoding"] not in ("utf8", "utf-8", False)
def _client_factory(**kwargs: Any) -> client_base.BaseClient:
client: TelnetClient
if sys.platform != "win32" and sys.stdin.isatty():
client = TelnetTerminalClient(**kwargs)
else:
client = TelnetClient(**kwargs)
orig_connection_made = client.connection_made
def _client_factory(**kwargs: Any) -> client_base.BaseClient:
client: TelnetClient
if sys.platform != "win32" and sys.stdin.isatty():
client = TelnetTerminalClient(**kwargs)
else:
client = TelnetClient(**kwargs)
orig_connection_made = client.connection_made
def _patched_connection_made(transport: asyncio.BaseTransport) -> None:
orig_connection_made(transport)
assert client.writer is not None
def _patched_connection_made(transport: asyncio.BaseTransport) -> None:
orig_connection_made(transport)
assert client.writer is not None
if always_will:
client.writer.always_will = always_will
if always_do:
client.writer.always_do = always_do
client.writer._encoding_explicit = encoding_explicit # pylint: disable=protected-access
client.connection_made = _patched_connection_made # type: ignore[method-assign]
return client
client.connection_made = _patched_connection_made # type: ignore[method-assign]
return client
client_factory = _client_factory
client_factory: Optional[Callable[..., client_base.BaseClient]] = _client_factory

@@ -545,4 +588,15 @@ # Wrap the shell callback to inject color filter when enabled

ColorFilter,
PetsciiColorFilter,
AtasciiControlFilter,
)
# Auto-select encoding-specific filters
encoding_name: str = args.get("encoding", "") or ""
is_petscii = encoding_name.lower() in ("petscii", "cbm", "commodore", "c64", "c128")
is_atascii = encoding_name.lower() in ("atascii", "atari8bit", "atari_8bit")
if colormatch == "petscii":
colormatch = "c64"
if is_petscii and colormatch != "c64":
colormatch = "c64"
if colormatch not in PALETTES:

@@ -561,3 +615,8 @@ print(

)
color_filter = ColorFilter(color_config)
if is_petscii or colormatch == "c64":
color_filter_obj: object = PetsciiColorFilter(color_config)
elif is_atascii:
color_filter_obj = AtasciiControlFilter()
else:
color_filter_obj = ColorFilter(color_config)
original_shell = shell_callback

@@ -570,3 +629,3 @@

# pylint: disable-next=protected-access
writer_arg._color_filter = color_filter # type: ignore[union-attr]
writer_arg._color_filter = color_filter_obj # type: ignore[union-attr]
await original_shell(reader, writer_arg)

@@ -576,2 +635,33 @@

# Wrap shell to inject raw_mode flag and input translation for retro encodings
raw_mode: bool = args.get("raw_mode", False)
if raw_mode:
# local
from .client_shell import ( # pylint: disable=import-outside-toplevel
_INPUT_XLAT,
_INPUT_SEQ_XLAT,
InputFilter,
)
enc_key = (args.get("encoding", "") or "").lower()
byte_xlat = _INPUT_XLAT.get(enc_key, {})
seq_xlat = _INPUT_SEQ_XLAT.get(enc_key, {})
input_filter: Optional[InputFilter] = (
InputFilter(seq_xlat, byte_xlat) if (seq_xlat or byte_xlat) else None
)
_inner_shell = shell_callback
async def _raw_shell(
reader: Union[TelnetReader, TelnetReaderUnicode],
writer_arg: Union[TelnetWriter, TelnetWriterUnicode],
) -> None:
# pylint: disable-next=protected-access
writer_arg._raw_mode = True # type: ignore[union-attr]
if input_filter is not None:
# pylint: disable-next=protected-access
writer_arg._input_filter = input_filter # type: ignore[union-attr]
await _inner_shell(reader, writer_arg)
shell_callback = _raw_shell
# Build connection kwargs explicitly to avoid pylint false positive

@@ -626,5 +716,13 @@ connection_kwargs: Dict[str, Any] = {

parser.add_argument(
"--connect-minwait", default=1.0, type=float, help="shell delay for negotiation"
"--line-mode",
action="store_true",
default=False,
help="use line-buffered input with local echo instead of raw terminal "
"mode. By default the client uses raw mode (no line buffering, no "
"local echo) which is correct for most BBS and MUD servers.",
)
parser.add_argument(
"--connect-minwait", default=0, type=float, help="shell delay for negotiation"
)
parser.add_argument(
"--connect-maxwait", default=4.0, type=float, help="timeout for pending negotiation"

@@ -659,3 +757,3 @@ )

"--colormatch",
default="ega",
default="vga",
metavar="PALETTE",

@@ -730,2 +828,12 @@ help=(

def _transform_args(args: argparse.Namespace) -> Dict[str, Any]:
# Auto-enable force_binary for retro BBS encodings that use high-bit bytes.
# local
from .encodings import FORCE_BINARY_ENCODINGS # pylint: disable=import-outside-toplevel
force_binary = args.force_binary
raw_mode = not args.line_mode
if args.encoding.lower().replace('-', '_') in FORCE_BINARY_ENCODINGS:
force_binary = True
raw_mode = True
return {

@@ -741,3 +849,3 @@ "host": args.host,

"term": args.term,
"force_binary": args.force_binary,
"force_binary": force_binary,
"encoding_errors": args.encoding_errors,

@@ -754,2 +862,3 @@ "connect_minwait": args.connect_minwait,

"reverse_video": args.reverse_video,
"raw_mode": raw_mode,
}

@@ -927,2 +1036,4 @@

client.writer.environ_encoding = environ_encoding
# pylint: disable-next=protected-access
client.writer._encoding_explicit = environ_encoding != "ascii"
client.writer.always_will = fp_always_will

@@ -951,3 +1062,3 @@ client.writer.always_do = fp_always_do

term=ttype,
connect_minwait=2.0,
connect_minwait=0,
connect_maxwait=4.0,

@@ -954,0 +1065,0 @@ connect_timeout=args.connect_timeout,

@@ -44,3 +44,9 @@ """

__all__ = ("ColorConfig", "ColorFilter", "PALETTES")
__all__ = (
"AtasciiControlFilter",
"ColorConfig",
"ColorFilter",
"PetsciiColorFilter",
"PALETTES",
)

@@ -169,2 +175,22 @@ # Type alias for a 16-color palette: 16 (R, G, B) tuples indexed 0-15.

),
# VIC-II C64 palette (Pepto's colodore reference).
# Indexed by VIC-II color register 0-15, NOT ANSI SGR order.
"c64": (
(0, 0, 0), # 0 black
(255, 255, 255), # 1 white
(136, 0, 0), # 2 red
(170, 255, 238), # 3 cyan
(204, 68, 204), # 4 purple
(0, 204, 85), # 5 green
(0, 0, 170), # 6 blue
(238, 238, 119), # 7 yellow
(221, 136, 85), # 8 orange
(102, 68, 0), # 9 brown
(255, 119, 119), # 10 pink / light red
(51, 51, 51), # 11 dark grey
(119, 119, 119), # 12 grey
(170, 255, 102), # 13 light green
(0, 136, 255), # 14 light blue
(187, 187, 187), # 15 light grey
),
}

@@ -434,1 +460,182 @@

return result
# PETSCII decoded control character → VIC-II palette index (0-15).
_PETSCII_COLOR_CODES: Dict[str, int] = {
'\x05': 1, # WHT (white)
'\x1c': 2, # RED
'\x1e': 5, # GRN (green)
'\x1f': 6, # BLU (blue)
'\x81': 8, # ORN (orange)
'\x90': 0, # BLK (black)
'\x95': 9, # BRN (brown)
'\x96': 10, # LRD (pink / light red)
'\x97': 11, # GR1 (dark grey)
'\x98': 12, # GR2 (grey)
'\x99': 13, # LGR (light green)
'\x9a': 14, # LBL (light blue)
'\x9b': 15, # GR3 (light grey)
'\x9c': 4, # PUR (purple)
'\x9e': 7, # YEL (yellow)
'\x9f': 3, # CYN (cyan)
}
# PETSCII cursor/screen control codes → ANSI escape sequences.
_PETSCII_CURSOR_CODES: Dict[str, str] = {
'\x11': '\x1b[B', # cursor down
'\x91': '\x1b[A', # cursor up
'\x1d': '\x1b[C', # cursor right
'\x9d': '\x1b[D', # cursor left
'\x13': '\x1b[H', # HOME (cursor to top-left)
'\x93': '\x1b[2J', # CLR (clear screen)
'\x14': '\x08\x1b[P', # DEL (destructive backspace)
}
# All PETSCII control chars handled by the filter.
_PETSCII_FILTER_CHARS = (
frozenset(_PETSCII_COLOR_CODES)
| frozenset(_PETSCII_CURSOR_CODES)
| {'\x12', '\x92'}
)
# Precompiled pattern matching any single PETSCII control character that
# the filter should consume (color codes, cursor codes, RVS ON/OFF).
_PETSCII_CTRL_RE = re.compile(
'[' + re.escape(''.join(sorted(_PETSCII_FILTER_CHARS))) + ']'
)
class PetsciiColorFilter:
r"""
Translate PETSCII control codes to ANSI sequences.
PETSCII uses single-byte control codes embedded in the text stream for
color changes, cursor movement, and screen control. This filter
translates them to ANSI equivalents:
- **Colors**: 16 VIC-II palette colors → ``\x1b[38;2;R;G;Bm`` (24-bit RGB)
- **Reverse video**: RVS ON/OFF → ``\x1b[7m`` / ``\x1b[27m``
- **Cursor**: up/down/left/right → ``\x1b[A/B/C/D``
- **Screen**: HOME → ``\x1b[H``, CLR → ``\x1b[2J``
- **DEL**: destructive backspace → ``\x08\x1b[P``
:param config: Color configuration (uses ``brightness`` and ``contrast``
for palette adjustment; ``palette_name`` is ignored — always C64).
"""
def __init__(self, config: Optional[ColorConfig] = None) -> None:
"""Initialize PETSCII filter with optional color configuration."""
palette = PALETTES["c64"]
if config is not None:
brightness = config.brightness
contrast = config.contrast
else:
brightness = 0.9
contrast = 0.8
self._adjusted: List[Tuple[int, int, int]] = [
_adjust_color(r, g, b, brightness, contrast) for r, g, b in palette
]
def _sgr_for_index(self, idx: int) -> str:
"""Return a 24-bit foreground SGR sequence for palette *idx*."""
r, g, b = self._adjusted[idx]
return f'\x1b[38;2;{r};{g};{b}m'
def filter(self, text: str) -> str:
"""
Replace PETSCII control codes with ANSI sequences.
PETSCII control characters (colors, cursor, screen) are replaced with their ANSI
equivalents. All other characters pass through unchanged.
:param text: Decoded PETSCII text (Unicode string).
:returns: Text with PETSCII controls translated to ANSI.
"""
if not _PETSCII_CTRL_RE.search(text):
return text
return _PETSCII_CTRL_RE.sub(self._replace, text)
def _replace(self, match: Match[str]) -> str:
"""Regex callback for a single PETSCII control character."""
ch = match.group()
idx = _PETSCII_COLOR_CODES.get(ch)
if idx is not None:
return self._sgr_for_index(idx)
cursor = _PETSCII_CURSOR_CODES.get(ch)
if cursor is not None:
return cursor
if ch == '\x12':
return '\x1b[7m'
if ch == '\x92':
return '\x1b[27m'
return ''
def flush(self) -> str:
"""
Flush buffered state.
PETSCII color codes are single-byte, so no buffering is needed.
:returns: Always ``""``.
"""
return ""
# ATASCII decoded control character glyphs → ANSI terminal sequences.
# The atascii codec decodes control bytes to Unicode glyphs; this map
# translates those glyphs to the terminal actions they represent.
_ATASCII_CONTROL_CODES: Dict[str, str] = {
'\u25c0': '\x08\x1b[P', # ◀ backspace/delete (0x7E / 0xFE)
'\u25b6': '\t', # ▶ tab (0x7F / 0xFF)
'\u21b0': '\x1b[2J\x1b[H', # ↰ clear screen (0x7D / 0xFD)
'\u2191': '\x1b[A', # ↑ cursor up (0x1C / 0x9C)
'\u2193': '\x1b[B', # ↓ cursor down (0x1D / 0x9D)
'\u2190': '\x1b[D', # ← cursor left (0x1E / 0x9E)
'\u2192': '\x1b[C', # → cursor right (0x1F / 0x9F)
}
_ATASCII_CTRL_RE = re.compile(
'[' + re.escape(''.join(sorted(_ATASCII_CONTROL_CODES))) + ']'
)
class AtasciiControlFilter:
r"""
Translate decoded ATASCII control character glyphs to ANSI sequences.
The ``atascii`` codec decodes ATASCII control bytes into Unicode glyphs
(e.g. byte 0x7E → U+25C0 ◀). This filter replaces those glyphs with
the ANSI terminal sequences that produce the intended effect:
- **Backspace/delete**: ◀ → ``\x08\x1b[P`` (destructive backspace)
- **Tab**: ▶ → ``\t``
- **Clear screen**: ↰ → ``\x1b[2J\x1b[H``
- **Cursor movement**: ↑↓←→ → ``\x1b[A/B/D/C``
"""
def filter(self, text: str) -> str:
"""
Replace ATASCII control glyphs with ANSI sequences.
:param text: Decoded ATASCII text (Unicode string).
:returns: Text with control glyphs translated to ANSI.
"""
if not _ATASCII_CTRL_RE.search(text):
return text
return _ATASCII_CTRL_RE.sub(self._replace, text)
@staticmethod
def _replace(match: Match[str]) -> str:
"""Regex callback for a single ATASCII control glyph."""
return _ATASCII_CONTROL_CODES.get(match.group(), '')
@staticmethod
def flush() -> str:
"""
Flush buffered state.
ATASCII control glyphs are single characters, so no buffering is needed.
:returns: Always ``""``.
"""
return ""
"""
Custom BBS/retro-computing codecs for telnetlib3.
Registers petscii and atarist codecs with Python's codecs module on import.
These encodings are then available for use with ``bytes.decode()`` and the
``--encoding`` CLI flag of ``telnetlib3-fingerprint``.
Registers atascii, petscii, and atarist codecs with Python's codecs module
on import. These encodings are then available for use with
``bytes.decode()`` and the ``--encoding`` CLI flag of
``telnetlib3-fingerprint``.
"""

@@ -48,2 +49,10 @@

#: Encoding names (and aliases) that require BINARY mode for high-bit bytes.
#: Used by CLI entry points to auto-enable ``--force-binary``.
FORCE_BINARY_ENCODINGS = frozenset({
'atascii', 'atari8bit', 'atari_8bit',
'petscii', 'cbm', 'commodore', 'c64', 'c128',
'atarist', 'atari',
})
codecs.register(_search_function)

@@ -7,5 +7,5 @@ """

The ``robot_check`` function can reliably detect whether the remote end is a real terminal
emulator by measuring the rendered width of a wide Unicode character. Real terminals
render it as width 2, while bots typically see width 1 or timeout.
The ``robot_check`` function detects whether the remote end is a real terminal emulator
by requesting a cursor position report (CPR) after writing a single space character.
Real terminals respond to CPR, while bots typically timeout.

@@ -34,4 +34,4 @@ These shells are used when normal shell access is denied due to connection limits or

# Wide character test - U+231A WATCH should render as width 2
_WIDE_TEST_CHAR = "\u231a"
# Narrow test character - a plain space works on any terminal
_TEST_CHAR = " "

@@ -219,9 +219,9 @@ # Input limit for guard shells

"""
Check if client can render wide characters.
Check if client responds to cursor position report.
:returns: True if client passes (renders wide char as width 2).
:returns: True if client passes (responds to CPR with expected width).
"""
with _latin1_reading(reader):
width = await _measure_width(reader, writer, _WIDE_TEST_CHAR, timeout)
return bool(width == 2)
width = await _measure_width(reader, writer, _TEST_CHAR, timeout)
return bool(width == 1)

@@ -228,0 +228,0 @@

@@ -17,2 +17,3 @@ """Module provides class BaseServer."""

from .telopt import theNULL
from .accessories import TRACE, hexdump
from .stream_reader import TelnetReader, TelnetReaderUnicode

@@ -198,3 +199,3 @@ from .stream_writer import TelnetWriter, TelnetWriterUnicode

def data_received(self, data: bytes) -> None:
def data_received(self, data: bytes) -> None: # pylint: disable=too-complex
"""

@@ -216,2 +217,4 @@ Process bytes received by transport.

#
if logger.isEnabledFor(TRACE):
logger.log(TRACE, "recv %d bytes\n%s", len(data), hexdump(data, prefix="<< "))
self._last_received = datetime.datetime.now()

@@ -218,0 +221,0 @@ self._rx_bytes += len(data)

@@ -23,4 +23,8 @@ """

import subprocess
from typing import Any
from typing import Any, NamedTuple
# 3rd party
import wcwidth as _wcwidth
from wcwidth.escape_sequences import ZERO_WIDTH_PATTERN as _ZERO_WIDTH_STR_PATTERN
# local

@@ -69,26 +73,31 @@ from . import fingerprinting as _fps

_PROBE_TIMEOUT = 0.5
_MAX_PROMPT_REPLIES = 5
_JQ = shutil.which("jq")
# Match "yes/no" or "y/n" surrounded by non-alphanumeric chars (or at
# string boundaries). Used to auto-answer confirmation prompts.
_YN_RE = re.compile(rb"(?i)(?:^|[^a-zA-Z0-9])(yes/no|y/n)(?:[^a-zA-Z0-9]|$)")
# Match "yes/no", "y/n", "(Yes|No)", or "(Yn)"/"[Yn]" surrounded by
# non-alphanumeric chars (or at string boundaries). The "(Yn)" form is
# a common BBS convention where the capital letter indicates the default.
_YN_RE = re.compile(
rb"(?i)(?:^|[^a-zA-Z0-9])(yes/no|y/n|\(yes\|no\))(?:[^a-zA-Z0-9]|$)"
rb"|[(\[][Yy][Nn][)\]]"
rb"|[(\[][yY][nN][)\]]"
)
# Match MUD/BBS login prompts that offer 'who' as a command.
# Quoted: "enter a name (or 'who')", or bare WHO without surrounding
# alphanumerics: "\nWHO to see players connected.\n"
_WHO_RE = re.compile(rb"(?i)(?:'who'|\"who\"|(?:^|[^a-zA-Z0-9])who(?:[^a-zA-Z0-9]|$))")
# Same pattern for 'help' — offered as a login-screen command on many MUDs.
_HELP_RE = re.compile(rb"(?i)(?:'help'|\"help\"|(?:^|[^a-zA-Z0-9])help(?:[^a-zA-Z0-9]|$))")
# Match "color?" prompts — many MUDs ask if the user wants color.
_COLOR_RE = re.compile(rb"(?i)color\s*\?")
# Match numbered menu items offering UTF-8, e.g. "5) UTF-8" or "3) utf8".
# Many BBS/MUD systems present a charset selection menu at connect time.
_MENU_UTF8_RE = re.compile(rb"(\d+)\s*\)\s*UTF-?8", re.IGNORECASE)
# Match numbered menu items offering UTF-8, e.g. "5) UTF-8", "[3] UTF-8",
# "2. UTF-8", or "1 ... UTF-8". Many BBS/MUD systems present a charset
# selection menu at connect time. The optional \S+/ prefix handles
# "Something/UTF-8" labels.
_MENU_UTF8_RE = re.compile(
rb"[\[(]?(\d+)\s*(?:[\])]|\.{1,3})\s*(?:\S+\s*/\s*)?UTF-?8", re.IGNORECASE
)
# Match numbered menu items offering ANSI, e.g. "(1) Ansi" or "[2] ANSI".
# Brackets may be parentheses or square brackets (mixed allowed).
_MENU_ANSI_RE = re.compile(rb"[\[(](\d+)[\])]\s*ANSI", re.IGNORECASE)
# Match numbered menu items offering ANSI, e.g. "(1) Ansi", "[2] ANSI",
# "3. English/ANSI", or "1 ... English/ANSI". Brackets, dot, and
# ellipsis delimiters are accepted.
_MENU_ANSI_RE = re.compile(
rb"[\[(]?(\d+)\s*(?:[\])]|\.{1,3})\s*(?:\S+\s*/\s*)?ANSI", re.IGNORECASE
)

@@ -98,2 +107,204 @@ # Match "gb/big5" encoding selection prompts common on Chinese BBS systems.

# Strip ANSI/VT100 escape sequences from raw bytes for pattern matching.
# Reuse wcwidth's comprehensive pattern (CSI, OSC, DCS, APC, PM, charset, Fe, Fp).
_ANSI_STRIP_RE = re.compile(_ZERO_WIDTH_STR_PATTERN.pattern.encode("ascii"))
# Match "Press [.ESC.] twice" botcheck prompts (e.g. Mystic BBS).
_ESC_TWICE_RE = re.compile(rb"(?i)press\s+[\[<]?\.?esc\.?[\]>]?\s+twice")
# Match single "Press [ESC]" prompts without "twice" (e.g. Herbie's BBS).
_ESC_ONCE_RE = re.compile(rb"(?i)press\s+[\[<]?\.?esc\.?[\]>]?(?!\s+twice)")
# Match "HIT RETURN", "PRESS RETURN", "PRESS ENTER", "HIT ENTER", etc.
# Common on Worldgroup/MajorBBS and other vintage BBS systems.
_RETURN_PROMPT_RE = re.compile(
rb"(?i)(?:hit|press)\s+(?:return|enter)\s*[:\.]?"
)
# Match "Press the BACKSPACE key" prompts — standard telnet terminal
# detection (e.g. TelnetBible.com). Respond with ASCII BS (0x08).
_BACKSPACE_KEY_RE = re.compile(
rb"(?i)press\s+the\s+backspace\s+key"
)
# Match "press del/backspace" and "hit your backspace/delete" prompts from
# PETSCII BBS systems (e.g. Image BBS C/G detect). Respond with PETSCII
# DEL byte (0x14).
_DEL_BACKSPACE_RE = re.compile(
rb"(?i)(?:press|hit)\s+(?:your\s+)?"
rb"(?:del(?:ete)?(?:\s*/\s*backspace)?|backspace(?:\s*/\s*del(?:ete)?)?)"
rb"(?:\s+key)?\s*[:\.]?"
)
# Match "More: (Y)es, (N)o, (C)ontinuous?" pagination prompts.
# Answer "C" (Continuous) to disable pagination and collect the full banner.
_MORE_PROMPT_RE = re.compile(
rb"(?i)more[:\s]*\(?[yY]\)?.*\(?[cC]\)?\s*(?:ontinuous|ont)"
)
# Match DSR (Device Status Report) request: ESC [ 6 n.
# Servers send this to detect ANSI-capable terminals; we reply with a
# Cursor Position Report (CPR) so the server sees us as ANSI-capable.
_DSR_RE = re.compile(rb"\x1b\[6n")
# Match SyncTERM/CTerm font selection: CSI Ps1 ; Ps2 SP D
# Reference: https://syncterm.bbsdev.net/cterm.html
# Ps1 = font page (0 = primary), Ps2 = font ID (0-255).
_SYNCTERM_FONT_RE = re.compile(rb"\x1b\[(\d+);(\d+) D")
#: Map SyncTERM font IDs to Python codec names.
#: Font IDs from CTerm spec / Synchronet SBBS / x84 SYNCTERM_FONTMAP.
SYNCTERM_FONT_ENCODINGS: dict[int, str] = {
0: "cp437",
1: "cp1251",
2: "koi8-r",
3: "iso-8859-2",
4: "iso-8859-4",
5: "cp866",
6: "iso-8859-9",
8: "iso-8859-8",
9: "koi8-u",
10: "iso-8859-15",
11: "iso-8859-4",
12: "koi8-r",
13: "iso-8859-4",
14: "iso-8859-5",
16: "iso-8859-15",
17: "cp850",
18: "cp850",
20: "cp1251",
21: "iso-8859-7",
22: "koi8-r",
23: "iso-8859-4",
24: "iso-8859-1",
25: "cp866",
26: "cp437",
27: "cp866",
29: "cp866",
30: "iso-8859-1",
31: "cp1131",
32: "petscii",
33: "petscii",
34: "petscii",
35: "petscii",
36: "atascii",
37: "cp437",
38: "cp437",
39: "cp437",
40: "cp437",
41: "cp437",
42: "cp437",
}
#: Encodings that require ``force_binary`` for high-bit bytes.
_SYNCTERM_BINARY_ENCODINGS = frozenset({
"petscii", "atascii",
})
log = logging.getLogger(__name__)
def detect_syncterm_font(data: bytes) -> str | None:
"""
Extract encoding from a SyncTERM font selection sequence in *data*.
Scans *data* for ``CSI Ps1 ; Ps2 SP D`` and returns the corresponding
Python codec name from :data:`SYNCTERM_FONT_ENCODINGS`, or ``None``
if no font sequence is found or the font ID is unrecognised.
:param data: Raw bytes that may contain escape sequences.
:returns: Encoding name or ``None``.
"""
match = _SYNCTERM_FONT_RE.search(data)
if match is None:
return None
font_id = int(match.group(2))
return SYNCTERM_FONT_ENCODINGS.get(font_id)
#: Encodings where standard telnet CR+LF must be re-encoded to the
#: codec's native EOL byte. The codec's ``encode()`` handles the
#: actual CR → LF normalization; we just gate the re-encoding step.
_RETRO_EOL_ENCODINGS = frozenset({
'atascii', 'atari8bit', 'atari_8bit',
})
def _reencode_prompt(response: bytes, encoding: str) -> bytes:
r"""
Re-encode an ASCII prompt response for the server's encoding.
For retro encodings (ATASCII), the standard ``\r\n`` line ending
is re-encoded through the codec so the server receives its native
EOL byte. For all other encodings the response is returned as-is.
:param response: ASCII prompt response bytes (e.g. ``b"yes\r\n"``).
:param encoding: Remote server encoding name.
:returns: Response bytes suitable for the server's encoding.
"""
normalized = encoding.lower().replace('-', '_')
if normalized not in _RETRO_EOL_ENCODINGS:
return response
try:
text = response.decode('ascii')
return text.encode(encoding)
except (UnicodeDecodeError, UnicodeEncodeError, LookupError):
return response
class _VirtualCursor:
"""
Track virtual cursor column to generate position-aware CPR responses.
The server's robot-check sends DSR, writes a test character, then sends
DSR again. It compares the two cursor positions to verify the character
rendered at the expected width. By tracking what the server writes
between DSR requests and advancing the column using :func:`wcwidth.wcwidth`,
the scanner produces CPR responses that satisfy the width check.
When *encoding* is set to a single-byte encoding like ``cp437``, raw
bytes are decoded with that encoding before measuring — this gives
correct column widths for servers that use SyncTERM font switching
where the raw bytes are not valid UTF-8.
"""
def __init__(self, encoding: str = "utf-8") -> None:
self.col = 1
self.encoding = encoding
self.dsr_requests = 0
self.dsr_replies = 0
def cpr(self) -> bytes:
"""Return a CPR response (``ESC [ row ; col R``) at the current position."""
self.dsr_replies += 1
return f"\x1b[1;{self.col}R".encode("ascii")
def advance(self, data: bytes) -> None:
"""
Advance cursor column for *data* (non-DSR text from the server).
ANSI escape sequences are stripped first so they do not contribute
to cursor movement. Backspace and carriage return are handled.
Bytes are decoded using :attr:`encoding` so that single-byte
encodings like CP437 produce the correct character widths.
"""
stripped = _ANSI_STRIP_RE.sub(b"", data)
try:
text = stripped.decode(self.encoding, errors="replace")
# pylint: disable-next=broad-exception-caught,overlapping-except
except (LookupError, Exception):
text = stripped.decode("latin-1")
for ch in text:
cp = ord(ch)
if cp == 0x08: # backspace
self.col = max(1, self.col - 1)
elif cp == 0x0D: # \r
self.col = 1
elif cp >= 0x20:
w = _wcwidth.wcwidth(ch)
if w > 0:
self.col += w
logger = logging.getLogger("telnetlib3.server_fingerprint")

@@ -134,55 +345,57 @@

def _detect_yn_prompt(banner: bytes) -> bytes: # pylint: disable=too-many-return-statements
r"""
Return an appropriate first-prompt response based on banner content.
class _PromptResult(NamedTuple):
"""Result of prompt detection with optional encoding override."""
If the banner contains a ``yes/no`` or ``y/n`` confirmation prompt
(case-insensitive, delimited by non-alphanumeric characters), returns
``b"yes\r\n"`` or ``b"y\r\n"`` respectively.
response: bytes | None
encoding: str | None = None
If the banner contains a ``color?`` prompt (case-insensitive),
returns ``b"y\r\n"`` to accept color.
If the banner contains a numbered menu item for UTF-8 (e.g.
``5) UTF-8``), returns the digit followed by ``b"\r\n"`` to select
the UTF-8 charset option.
def _detect_yn_prompt( # pylint: disable=too-many-return-statements
banner: bytes,
) -> _PromptResult:
r"""
Return an appropriate first-prompt response based on banner content.
If the banner contains a bracketed numbered menu item for ANSI
(e.g. ``(1) Ansi`` or ``[2] ANSI``), returns the digit followed
by ``b"\r\n"`` to select the ANSI option.
ANSI escape sequences are stripped before pattern matching so that
embedded color/cursor codes do not interfere with detection.
If the banner contains a ``gb/big5`` encoding selection prompt
(common on Chinese BBS systems), returns ``b"big5\r\n"`` to select
Big5 encoding.
Returns a :class:`_PromptResult` whose *response* is ``None`` when
no recognizable prompt is found — the caller should fall back to
sending a bare ``\r\n``. When a UTF-8 charset menu is selected,
*encoding* is set to ``"utf-8"`` so the caller can update the
session encoding.
If the banner contains a MUD/BBS login prompt offering ``'who'`` as
an alternative (e.g. "enter a name (or 'who')"), returns
``b"who\r\n"``. Similarly, ``'help'`` prompts return ``b"help\r\n"``.
Otherwise returns a bare ``b"\r\n"``.
:param banner: Raw banner bytes collected before the first prompt.
:returns: Response bytes to send.
:returns: Prompt result with response bytes and optional encoding.
"""
match = _YN_RE.search(banner)
stripped = _ANSI_STRIP_RE.sub(b"", banner)
if _ESC_TWICE_RE.search(stripped):
return _PromptResult(b"\x1b\x1b")
if _ESC_ONCE_RE.search(stripped):
return _PromptResult(b"\x1b")
if _MORE_PROMPT_RE.search(stripped):
return _PromptResult(b"C\r\n")
match = _YN_RE.search(stripped)
if match:
token = match.group(1).lower()
if token == b"yes/no":
return b"yes\r\n"
return b"y\r\n"
if _COLOR_RE.search(banner):
return b"y\r\n"
menu_match = _MENU_UTF8_RE.search(banner)
token = match.group(1)
if token is not None and token.lower() in (b"yes/no", b"(yes|no)"):
return _PromptResult(b"yes\r\n")
return _PromptResult(b"y\r\n")
if _COLOR_RE.search(stripped):
return _PromptResult(b"y\r\n")
menu_match = _MENU_UTF8_RE.search(stripped)
if menu_match:
return menu_match.group(1) + b"\r\n"
ansi_match = _MENU_ANSI_RE.search(banner)
return _PromptResult(menu_match.group(1) + b"\r\n", encoding="utf-8")
ansi_match = _MENU_ANSI_RE.search(stripped)
if ansi_match:
return ansi_match.group(1) + b"\r\n"
if _GB_BIG5_RE.search(banner):
return b"big5\r\n"
if _WHO_RE.search(banner):
return b"who\r\n"
if _HELP_RE.search(banner):
return b"help\r\n"
return b"\r\n"
return _PromptResult(ansi_match.group(1) + b"\r\n")
if _GB_BIG5_RE.search(stripped):
return _PromptResult(b"big5\r\n", encoding="big5")
if _BACKSPACE_KEY_RE.search(stripped):
return _PromptResult(b"\x08")
if _DEL_BACKSPACE_RE.search(stripped):
return _PromptResult(b"\x14")
if _RETURN_PROMPT_RE.search(stripped):
return _PromptResult(b"\r\n")
return _PromptResult(None)

@@ -223,2 +436,4 @@

``"ascii"`` per :rfc:`1572`; use ``"cp037"`` for EBCDIC hosts.
When set to something other than ``"ascii"``, SyncTERM font
switches will not override this encoding.
:param scan_type: ``"quick"`` probes CORE + MUD options only (default);

@@ -233,2 +448,3 @@ ``"full"`` includes all LEGACY options.

writer.environ_encoding = environ_encoding
writer._encoding_explicit = environ_encoding != "ascii" # pylint: disable=protected-access
try:

@@ -254,3 +470,3 @@ await _fingerprint_session(

async def _fingerprint_session( # pylint: disable=too-many-locals
async def _fingerprint_session( # noqa: E501 ; pylint: disable=too-many-locals,too-many-branches,too-many-statements,too-complex
reader: TelnetReader,

@@ -272,18 +488,68 @@ writer: TelnetWriter,

start_time = time.time()
cursor = _VirtualCursor(encoding=writer.environ_encoding)
# 1. Let straggler negotiation settle
await asyncio.sleep(_NEGOTIATION_SETTLE)
# 1. Let straggler negotiation settle — read (and respond to DSR)
# instead of sleeping blind so early DSR requests get a CPR reply.
settle_data = await _read_banner_until_quiet(
reader, quiet_time=_NEGOTIATION_SETTLE, max_wait=_NEGOTIATION_SETTLE,
max_bytes=banner_max_bytes, writer=writer, cursor=cursor,
)
# 2. Read banner (pre-return) — wait until output stops
banner_before = await _read_banner_until_quiet(
reader, quiet_time=banner_quiet_time, max_wait=banner_max_wait, max_bytes=banner_max_bytes
banner_before_raw = await _read_banner_until_quiet(
reader, quiet_time=banner_quiet_time, max_wait=banner_max_wait,
max_bytes=banner_max_bytes, writer=writer, cursor=cursor,
)
banner_before = settle_data + banner_before_raw
# 3. Send return (or "yes"/"y" if the banner contains a y/n prompt)
prompt_response = _detect_yn_prompt(banner_before)
writer.write(prompt_response)
await writer.drain()
banner_after = await _read_banner_until_quiet(
reader, quiet_time=banner_quiet_time, max_wait=banner_max_wait, max_bytes=banner_max_bytes
)
# 3. Respond to prompts — some servers ask multiple questions in
# sequence (e.g. "color?" then a UTF-8 charset menu). Loop up to
# _MAX_PROMPT_REPLIES times, stopping early when no prompt is detected
# or the connection is lost.
after_chunks: list[bytes] = []
latest_banner = banner_before
for _prompt_round in range(_MAX_PROMPT_REPLIES):
prompt_result = _detect_yn_prompt(latest_banner)
detected = prompt_result.response
# Skip if the ESC response was already sent inline during banner
# collection (time-sensitive botcheck countdowns).
if detected in (b"\x1b\x1b", b"\x1b") and getattr(
writer, '_esc_inline', False
):
# pylint: disable-next=protected-access
writer._esc_inline = False # type: ignore[attr-defined]
detected = None
prompt_response = _reencode_prompt(
detected if detected is not None else b"\r\n",
writer.environ_encoding,
)
writer.write(prompt_response)
await writer.drain()
# When the server presents a charset menu and we select an
# encoding (e.g. UTF-8 or Big5), switch the session encoding
# so that subsequent banner data is decoded correctly.
if prompt_result.encoding:
writer.environ_encoding = prompt_result.encoding
cursor.encoding = prompt_result.encoding
protocol = writer.protocol
if protocol is not None:
protocol.force_binary = True
previous_banner = latest_banner
latest_banner = await _read_banner_until_quiet(
reader, quiet_time=banner_quiet_time, max_wait=banner_max_wait,
max_bytes=banner_max_bytes, writer=writer, cursor=cursor,
)
after_chunks.append(latest_banner)
if writer.is_closing() or not latest_banner:
break
# Stop when the server repeats the same banner — it is not
# advancing through prompts, just re-displaying the login screen.
if latest_banner == previous_banner:
break
# Stop when no prompt was detected AND the new banner has no
# prompt either (servers like Mystic BBS send a non-interactive
# preamble before the real botcheck prompt).
if detected is None and _detect_yn_prompt(latest_banner).response is None:
break
banner_after = b"".join(after_chunks)

@@ -312,5 +578,13 @@ # 4. Snapshot option states before probing

"encoding": writer.environ_encoding,
"banner_before_return": _format_banner(banner_before, encoding=writer.environ_encoding),
"banner_after_return": _format_banner(banner_after, encoding=writer.environ_encoding),
"banner_before_return": _format_banner(
banner_before,
encoding=writer.environ_encoding,
),
"banner_after_return": _format_banner(
banner_after,
encoding=writer.environ_encoding,
),
"timing": {"probe": probe_time, "total": time.time() - start_time},
"dsr_requests": cursor.dsr_requests,
"dsr_replies": cursor.dsr_replies,
}

@@ -595,3 +869,3 @@ )

def _format_banner(data: bytes, encoding: str = "utf-8") -> str:
"""
r"""
Format raw banner bytes for JSON serialization.

@@ -602,9 +876,33 @@

Uses ``surrogateescape`` so high bytes (common in CP437 BBS art)
are preserved as surrogates (e.g. byte ``0xB1`` → ``U+DCB1``)
rather than replaced with ``U+FFFD``. JSON serialization escapes
them as ``\udcXX``, which round-trips through :func:`json.load`.
Falls back to ``latin-1`` when the requested encoding is unavailable
(e.g. a server-advertised charset that Python does not recognise).
When *encoding* is ``petscii``, inline PETSCII color control codes
are translated to ANSI 24-bit RGB SGR sequences using the VIC-II
C64 palette so the saved banner is human-readable with colors.
:param data: Raw bytes from the server.
:param encoding: Character encoding to use for decoding.
:returns: Decoded text string (undecodable bytes replaced).
:returns: Decoded text string (raw bytes preserved as surrogates).
"""
return data.decode(encoding, errors="replace")
try:
text = data.decode(encoding, errors="surrogateescape")
except LookupError:
text = data.decode("latin-1")
if encoding.lower() in ("petscii", "cbm", "commodore", "c64", "c128"):
# local
from .color_filter import PetsciiColorFilter # pylint: disable=import-outside-toplevel
text = PetsciiColorFilter().filter(text)
# PETSCII uses CR (0x0D) as line terminator; normalize to LF.
text = text.replace('\r\n', '\n').replace('\r', '\n')
return text
async def _await_mssp_data(writer: TelnetWriter, deadline: float) -> None:

@@ -641,3 +939,26 @@ """Wait for MSSP data until *deadline* if server acknowledged MSSP."""

async def _read_banner_until_quiet(
def _respond_to_dsr(
chunk: bytes, writer: TelnetWriter, cursor: _VirtualCursor | None
) -> None:
"""
Send CPR response(s) for each DSR found in *chunk*.
When *cursor* is provided, text between DSR sequences advances the
virtual cursor column so each CPR reflects the correct position.
Without a cursor, a static ``ESC [ 1 ; 1 R`` is sent for every DSR.
"""
if cursor is None:
for _ in _DSR_RE.finditer(chunk):
writer.write(b"\x1b[1;1R")
return
pos = 0
for match in _DSR_RE.finditer(chunk):
cursor.dsr_requests += 1
cursor.advance(chunk[pos:match.start()])
writer.write(cursor.cpr())
pos = match.end()
cursor.advance(chunk[pos:])
async def _read_banner_until_quiet( # noqa: E501 ; pylint: disable=too-many-positional-arguments,too-complex,too-many-nested-blocks
reader: TelnetReader,

@@ -647,2 +968,4 @@ quiet_time: float = 2.0,

max_bytes: int = _BANNER_MAX_BYTES,
writer: TelnetWriter | None = None,
cursor: _VirtualCursor | None = None,
) -> bytes:

@@ -656,2 +979,16 @@ """

When *writer* is provided, any DSR (Device Status Report) request
``ESC [ 6 n`` found in the incoming data is answered with a CPR
(Cursor Position Report) so the server detects an ANSI-capable
terminal.
When *cursor* is provided, the CPR response reflects the tracked
virtual cursor column (advanced by :func:`wcwidth.wcwidth` for each
printable character). This defeats robot-check guards that verify
cursor movement after writing a test character.
Time-sensitive prompts — ``Press [.ESC.] twice`` botcheck countdowns
— are detected inline and responded to immediately so the reply
arrives before the countdown expires.
:param reader: :class:`~telnetlib3.stream_reader.TelnetReader` instance.

@@ -661,5 +998,9 @@ :param quiet_time: Seconds of silence before considering banner complete.

:param max_bytes: Maximum bytes per read call.
:param writer: Optional :class:`~telnetlib3.stream_writer.TelnetWriter`
used to send CPR replies to DSR requests.
:param cursor: Optional :class:`_VirtualCursor` for position-aware CPR.
:returns: Banner bytes (may be empty).
"""
chunks: list[bytes] = []
esc_responded = False
loop = asyncio.get_event_loop()

@@ -675,2 +1016,37 @@ deadline = loop.time() + max_wait

break
if writer is not None and _DSR_RE.search(chunk):
_respond_to_dsr(chunk, writer, cursor)
await writer.drain()
elif cursor is not None:
cursor.advance(chunk)
if writer is not None:
font_enc = detect_syncterm_font(chunk)
if font_enc is not None:
log.debug("SyncTERM font switch detected: %s", font_enc)
if getattr(writer, '_encoding_explicit', False):
log.debug(
"ignoring font switch, explicit encoding: %s",
writer.environ_encoding)
else:
writer.environ_encoding = font_enc
if cursor is not None:
cursor.encoding = font_enc
protocol = writer.protocol
if (protocol is not None
and font_enc in _SYNCTERM_BINARY_ENCODINGS):
protocol.force_binary = True
if not esc_responded:
stripped_chunk = _ANSI_STRIP_RE.sub(b"", chunk)
if _ESC_TWICE_RE.search(stripped_chunk):
writer.write(b"\x1b\x1b")
await writer.drain()
esc_responded = True
# pylint: disable-next=protected-access
writer._esc_inline = True # type: ignore[attr-defined]
elif _ESC_ONCE_RE.search(stripped_chunk):
writer.write(b"\x1b")
await writer.drain()
esc_responded = True
# pylint: disable-next=protected-access
writer._esc_inline = True # type: ignore[attr-defined]
chunks.append(chunk)

@@ -677,0 +1053,0 @@ except (asyncio.TimeoutError, EOFError):

@@ -251,21 +251,2 @@ """Telnet server shell implementations."""

async def get_next_ascii(
reader: Union[TelnetReader, TelnetReaderUnicode],
writer: Union[TelnetWriter, TelnetWriterUnicode],
) -> Optional[str]:
"""Accept the next non-ANSI-escape character from reader."""
_reader = cast(TelnetReaderUnicode, reader)
escape_sequence = False
while not writer.is_closing():
next_char = await _reader.read(1)
if next_char == "\x1b":
escape_sequence = True
elif escape_sequence:
if 61 <= ord(next_char) <= 90 or 97 <= ord(next_char) <= 122:
escape_sequence = False
else:
return next_char
return None
@types.coroutine

@@ -272,0 +253,0 @@ def readline(

@@ -62,3 +62,3 @@ """

pty_args: Optional[List[str]] = None
pty_raw: bool = False
pty_raw: bool = True
robot_check: bool = False

@@ -993,7 +993,17 @@ pty_fork_limit: int = 0

parser.add_argument(
"--line-mode",
action="store_true",
default=False,
help="use cooked PTY mode with echo for --pty-exec instead of raw "
"mode. By default PTY echo is disabled (raw mode), which is "
"correct for programs that handle their own terminal I/O "
"(curses, blessed, ucs-detect).",
)
# Hidden backwards-compat: --pty-raw was the default since 2.5,
# keep it as a silent no-op so existing scripts don't break.
parser.add_argument(
"--pty-raw",
action="store_true",
default=_config.pty_raw,
help="raw mode for --pty-exec: disable PTY echo for programs that "
"handle their own terminal I/O (curses, blessed, ucs-detect)",
default=False,
help=argparse.SUPPRESS,
)

@@ -1026,2 +1036,6 @@ parser.add_argument(

result["pty_args"] = pty_args if PTY_SUPPORT else None
# --pty-raw is a hidden no-op (raw is now the default);
# --line-mode opts out of raw mode.
result.pop("pty_raw", None)
result["pty_raw"] = not result.pop("line_mode", False)
if not PTY_SUPPORT:

@@ -1031,2 +1045,10 @@ result["pty_exec"] = None

result["pty_raw"] = False
# Auto-enable force_binary for retro BBS encodings that use high-bit bytes.
# local
from .encodings import FORCE_BINARY_ENCODINGS # pylint: disable=import-outside-toplevel
if result["encoding"].lower().replace('-', '_') in FORCE_BINARY_ENCODINGS:
result["force_binary"] = True
return result

@@ -1033,0 +1055,0 @@

@@ -11,5 +11,50 @@ # std imports

# local
from telnetlib3.accessories import make_logger, repr_mapping, function_lookup, make_reader_task
from telnetlib3.accessories import (
TRACE,
hexdump,
make_logger,
repr_mapping,
function_lookup,
make_reader_task,
)
def test_trace_level_registered():
assert TRACE == 5
assert logging.getLevelName(TRACE) == "TRACE"
assert logging.getLevelName("TRACE") == TRACE
def test_hexdump_short():
data = b"Hello World\r\n"
result = hexdump(data)
assert "48 65 6c 6c 6f 20 57 6f" in result
assert "72 6c 64 0d 0a" in result
assert "|Hello World..|" in result
def test_hexdump_two_rows():
data = bytes(range(32))
result = hexdump(data)
lines = result.splitlines()
assert len(lines) == 2
assert lines[0].startswith("00000000")
assert lines[1].startswith("00000010")
def test_hexdump_prefix():
result = hexdump(b"\xff\xfd\x18", prefix=">> ")
assert result.startswith(">> 00000000")
assert "ff fd 18" in result
def test_hexdump_empty():
assert hexdump(b"") == ""
def test_make_logger_trace_level():
logger = make_logger("acc_trace", loglevel="trace")
assert logger.isEnabledFor(TRACE)
def test_make_logger_no_file():

@@ -16,0 +61,0 @@ logger = make_logger("acc_no_file", loglevel="info")

@@ -65,3 +65,33 @@ # std imports

@pytest.mark.parametrize(
"offered,expected",
[
pytest.param(["iso-8859-02"], "iso-8859-02", id="iso_leading_zero"),
pytest.param(["iso 8859-02"], "iso 8859-02", id="iso_space_leading_zero"),
pytest.param(["cp-1250"], "cp-1250", id="cp_hyphen"),
],
)
@pytest.mark.asyncio
async def test_send_charset_normalization(offered, expected):
c = _make_client(encoding=False)
c.default_encoding = None
assert c.send_charset(offered) == expected
@pytest.mark.parametrize(
"name,expected",
[
pytest.param("iso-8859-02", "iso-8859-2", id="iso_leading_zero"),
pytest.param("iso 8859-02", "iso-8859-2", id="iso_space_leading_zero"),
pytest.param("cp-1250", "cp1250", id="cp_hyphen"),
pytest.param("UTF-8", "UTF-8", id="passthrough"),
pytest.param("iso-8859-15", "iso-8859-15", id="no_leading_zero"),
pytest.param("x-penn-def", "x-penn-def", id="unknown_passthrough"),
],
)
def test_normalize_charset_name(name, expected):
assert cl.TelnetClient._normalize_charset_name(name) == expected
@pytest.mark.asyncio
async def test_send_env():

@@ -68,0 +98,0 @@ c = _make_client(term="xterm", cols=132, rows=43)

@@ -11,2 +11,4 @@ """Tests for telnetlib3.color_filter — ANSI color palette translation."""

ColorFilter,
PetsciiColorFilter,
AtasciiControlFilter,
_adjust_color,

@@ -31,3 +33,3 @@ _is_foreground_code,

def test_all_expected_palettes_exist(self) -> None:
assert set(PALETTES.keys()) == {"ega", "cga", "vga", "amiga", "xterm"}
assert set(PALETTES.keys()) == {"ega", "cga", "vga", "amiga", "xterm", "c64"}

@@ -457,3 +459,3 @@

class TestColorFilterDifferentPalettes:
@pytest.mark.parametrize("name", list(PALETTES.keys()))
@pytest.mark.parametrize("name", [n for n in PALETTES if n != "c64"])
def test_palette_red_foreground(self, name: str) -> None:

@@ -464,1 +466,132 @@ f = ColorFilter(ColorConfig(palette_name=name, brightness=1.0, contrast=1.0))

assert f"38;2;{rgb[0]};{rgb[1]};{rgb[2]}" in result
class TestPetsciiColorFilter:
def _make_filter(self, **kwargs: object) -> PetsciiColorFilter:
cfg = ColorConfig(brightness=1.0, contrast=1.0, **kwargs) # type: ignore[arg-type]
return PetsciiColorFilter(cfg)
@pytest.mark.parametrize("ctrl_char,palette_idx", [
('\x05', 1),
('\x1c', 2),
('\x1e', 5),
('\x1f', 6),
('\x81', 8),
('\x90', 0),
('\x95', 9),
('\x96', 10),
('\x97', 11),
('\x98', 12),
('\x99', 13),
('\x9a', 14),
('\x9b', 15),
('\x9c', 4),
('\x9e', 7),
('\x9f', 3),
])
def test_color_code_to_24bit(self, ctrl_char: str, palette_idx: int) -> None:
f = self._make_filter()
result = f.filter(f"hello{ctrl_char}world")
rgb = PALETTES["c64"][palette_idx]
assert f"\x1b[38;2;{rgb[0]};{rgb[1]};{rgb[2]}m" in result
assert ctrl_char not in result
assert "hello" in result
assert "world" in result
def test_rvs_on(self) -> None:
f = self._make_filter()
result = f.filter("before\x12after")
assert "\x1b[7m" in result
assert "\x12" not in result
def test_rvs_off(self) -> None:
f = self._make_filter()
result = f.filter("before\x92after")
assert "\x1b[27m" in result
assert "\x92" not in result
def test_mixed_colors_and_rvs(self) -> None:
f = self._make_filter()
result = f.filter("\x1c\x12hello\x92\x05world")
red_rgb = PALETTES["c64"][2]
white_rgb = PALETTES["c64"][1]
assert f"\x1b[38;2;{red_rgb[0]};{red_rgb[1]};{red_rgb[2]}m" in result
assert "\x1b[7m" in result
assert "\x1b[27m" in result
assert f"\x1b[38;2;{white_rgb[0]};{white_rgb[1]};{white_rgb[2]}m" in result
assert "hello" in result
assert "world" in result
def test_plain_text_unchanged(self) -> None:
f = self._make_filter()
assert f.filter("hello world") == "hello world"
def test_non_petscii_control_chars_unchanged(self) -> None:
f = self._make_filter()
result = f.filter("A\x07B\x0bC")
assert "A\x07B\x0bC" == result
def test_cursor_controls_translated(self) -> None:
f = self._make_filter()
assert f.filter("A\x13B") == "A\x1b[HB"
assert f.filter("A\x93B") == "A\x1b[2JB"
assert f.filter("A\x11B") == "A\x1b[BB"
assert f.filter("A\x91B") == "A\x1b[AB"
assert f.filter("A\x1dB") == "A\x1b[CB"
assert f.filter("A\x9dB") == "A\x1b[DB"
assert f.filter("A\x14B") == "A\x08\x1b[PB"
def test_flush_returns_empty(self) -> None:
f = self._make_filter()
assert f.flush() == ""
def test_brightness_contrast_applied(self) -> None:
f_full = PetsciiColorFilter(ColorConfig(brightness=1.0, contrast=1.0))
f_dim = PetsciiColorFilter(ColorConfig(brightness=0.5, contrast=0.5))
result_full = f_full.filter("\x1c")
result_dim = f_dim.filter("\x1c")
assert result_full != result_dim
def test_default_config(self) -> None:
f = PetsciiColorFilter()
result = f.filter("\x1c")
assert "\x1b[38;2;" in result
class TestAtasciiControlFilter:
@pytest.mark.parametrize("glyph,expected", [
('\u25c0', '\x08\x1b[P'),
('\u25b6', '\t'),
('\u21b0', '\x1b[2J\x1b[H'),
('\u2191', '\x1b[A'),
('\u2193', '\x1b[B'),
('\u2190', '\x1b[D'),
('\u2192', '\x1b[C'),
])
def test_control_glyph_translated(self, glyph: str, expected: str) -> None:
f = AtasciiControlFilter()
result = f.filter(f"before{glyph}after")
assert f"before{expected}after" == result
def test_backspace_erases(self) -> None:
f = AtasciiControlFilter()
result = f.filter("DINGO\u25c0\u25c0\u25c0\u25c0\u25c0")
assert result == "DINGO" + "\x08\x1b[P" * 5
def test_plain_text_unchanged(self) -> None:
f = AtasciiControlFilter()
assert f.filter("hello world") == "hello world"
def test_atascii_graphics_unchanged(self) -> None:
f = AtasciiControlFilter()
text = "\u2663\u2665\u2666\u2660"
assert f.filter(text) == text
def test_flush_returns_empty(self) -> None:
f = AtasciiControlFilter()
assert f.flush() == ""
def test_multiple_controls_in_one_string(self) -> None:
f = AtasciiControlFilter()
result = f.filter("\u2191\u2193\u2190\u2192")
assert result == "\x1b[A\x1b[B\x1b[D\x1b[C"

@@ -453,3 +453,3 @@ """Test instantiation of basic server and client forms."""

b"Escape character is '^]'.\r\n"
b"hello, space cadet.\r\r\n"
b"hello, space cadet.\r\n"
b"\x1b[m\r\n"

@@ -482,3 +482,2 @@ b"Connection closed by foreign host.\r\n"

if inp:
writer.echo(inp)
writer.write("\ngoodbye.\n")

@@ -505,3 +504,3 @@ await writer.drain()

b"Escape character is '^]'.\n"
b"Press Return to continue:\r\ngoodbye.\n"
b"Press Return to continue:\r\ngoodbye.\r\n"
b"\x1b[m\nConnection closed by foreign host.\n"

@@ -508,0 +507,0 @@ )

# std imports
import asyncio
import functools

@@ -8,2 +9,3 @@ # 3rd party

# local
from telnetlib3 import server_fingerprinting as sfp
from telnetlib3.guard_shells import (

@@ -13,2 +15,3 @@ ConnectionCounter,

busy_shell,
robot_check,
robot_shell,

@@ -407,1 +410,50 @@ _latin1_reading,

await reader.read(-1)
async def test_fingerprint_scanner_defeats_robot_check(unused_tcp_port):
"""Fingerprint scanner's virtual cursor defeats the server's robot_check."""
# local
from telnetlib3.guard_shells import _TEST_CHAR, _measure_width # noqa: PLC0415
from telnetlib3.tests.accessories import create_server # noqa: PLC0415
measured_width: list[int | None] = []
async def guarded_shell(reader, writer):
with _latin1_reading(reader):
width = await _measure_width(reader, writer, _TEST_CHAR, timeout=5.0)
measured_width.append(width)
if width == 1:
writer.write("Robot check passed!\r\n")
await writer.drain()
writer.close()
await writer.wait_closed()
async with create_server(
host="127.0.0.1",
port=unused_tcp_port,
shell=guarded_shell,
connect_maxwait=0.5,
):
# local
import telnetlib3 # noqa: PLC0415
shell = functools.partial(
sfp.fingerprinting_client_shell,
host="127.0.0.1",
port=unused_tcp_port,
silent=True,
banner_quiet_time=1.0,
banner_max_wait=5.0,
)
reader, writer = await telnetlib3.open_connection(
host="127.0.0.1",
port=unused_tcp_port,
encoding=False,
shell=shell,
connect_minwait=0.5,
)
# Shell runs as a background task — wait for it to finish.
await asyncio.wait_for(writer.protocol.waiter_closed, timeout=10.0)
assert measured_width, "server shell never ran"
assert measured_width[0] == 1, f"expected width=1, got {measured_width[0]}"

@@ -31,2 +31,7 @@ # std imports

class _MockProtocol:
def __init__(self):
self.force_binary = False
class MockWriter:

@@ -49,2 +54,3 @@ def __init__(self, extra=None, will_options=None, wont_options=None):

self.comport_data: dict[str, object] | None = None
self.protocol = _MockProtocol()
self._closing = False

@@ -89,2 +95,28 @@

class InteractiveMockReader:
"""
MockReader that gates chunks behind writer responses.
The first chunk is available immediately. Each subsequent chunk is released only after the
writer has accumulated one more write than before, simulating a server that waits for client
input before sending the next prompt.
"""
def __init__(self, chunks, writer):
self._chunks = list(chunks)
self._writer = writer
self._idx = 0
async def read(self, n):
if self._idx >= len(self._chunks):
await asyncio.sleep(10)
return b""
needed_writes = self._idx
while len(self._writer._writes) < needed_writes:
await asyncio.sleep(0.001)
chunk = self._chunks[self._idx]
self._idx += 1
return chunk[:n]
_BINARY_PROBE = {"BINARY": {"status": "WILL", "opt": fps.BINARY}}

@@ -219,5 +251,86 @@

assert not sfp._format_banner(b"")
assert sfp._format_banner(b"\xff\xfe\xfd") == "\ufffd\ufffd\ufffd"
def test_format_banner_surrogateescape():
"""High bytes are preserved as surrogates, not replaced with U+FFFD."""
result = sfp._format_banner(b"\xff\xfe\xb1")
assert "\ufffd" not in result
assert result == "\udcff\udcfe\udcb1"
raw = result.encode("ascii", errors="surrogateescape")
assert raw == b"\xff\xfe\xb1"
def test_format_banner_json_roundtrip():
"""Surrogates survive JSON serialization and can recover raw bytes."""
banner = sfp._format_banner(b"Hello\xb1\xb2World")
encoded = json.dumps(banner)
decoded = json.loads(encoded)
assert decoded == banner
raw = decoded.encode("ascii", errors="surrogateescape")
assert raw == b"Hello\xb1\xb2World"
def test_format_banner_unknown_encoding_fallback():
"""Unknown encoding falls back to latin-1 instead of raising LookupError."""
result = sfp._format_banner(b"Hello\xb1World", encoding="x-no-such-codec")
assert result == "Hello\xb1World"
assert result == b"Hello\xb1World".decode("latin-1")
def test_format_banner_atascii():
"""ATASCII encoding decodes banner bytes through the registered codec."""
result = sfp._format_banner(b"Hello\x9b", encoding="atascii")
assert result == "Hello\n"
def test_format_banner_petscii_color():
"""PETSCII color codes are translated to ANSI 24-bit RGB in banners."""
result = sfp._format_banner(b"\x1c\xc8\xc9", encoding="petscii")
assert "\x1b[38;2;" in result
assert "HI" in result
assert "\x1c" not in result
def test_format_banner_petscii_rvs():
"""PETSCII RVS ON/OFF are translated to ANSI reverse in banners."""
result = sfp._format_banner(b"\x12\xc8\xc9\x92", encoding="petscii")
assert "\x1b[7m" in result
assert "\x1b[27m" in result
def test_format_banner_petscii_newline():
"""PETSCII CR line terminators are normalized to LF in banners."""
result = sfp._format_banner(b"\xc8\xc9\x0d\xca\xcb", encoding="petscii")
assert "HI\nJK" == result
def test_format_banner_petscii_cursor():
"""PETSCII cursor controls are translated to ANSI in banners."""
result = sfp._format_banner(b"\x13\xc8\xc9", encoding="petscii")
assert "\x1b[H" in result
assert "HI" in result
@pytest.mark.parametrize(
"data,expected",
[
pytest.param(b"\x1b[0;0 D", "cp437", id="cp437_font0"),
pytest.param(b"\x1b[0;36 D", "atascii", id="atascii_font36"),
pytest.param(b"\x1b[0;32 D", "petscii", id="petscii_c64_upper"),
pytest.param(b"\x1b[0;40 D", "cp437", id="topaz_plus_font40"),
pytest.param(b"\x1b[1;36 D", "atascii", id="atascii_secondary"),
pytest.param(b"hello world", None, id="no_sequence"),
pytest.param(b"\x1b[0;255 D", None, id="unknown_font_id"),
],
)
def test_detect_syncterm_font(data, expected):
assert sfp.detect_syncterm_font(data) == expected
def test_syncterm_font_in_banner():
"""Font sequence embedded in banner data is detected."""
data = b"Welcome\x1b[0;36 Dto the BBS"
assert sfp.detect_syncterm_font(data) == "atascii"
@pytest.mark.asyncio

@@ -722,4 +835,4 @@ async def test_read_banner():

[
pytest.param(b"Welcome\r\n", b"\r\n", id="no_prompt"),
pytest.param(b"", b"\r\n", id="empty"),
pytest.param(b"Welcome\r\n", None, id="no_prompt"),
pytest.param(b"", None, id="empty"),
pytest.param(b"Continue? (yes/no) ", b"yes\r\n", id="yes_no_parens"),

@@ -732,35 +845,7 @@ pytest.param(b"Continue? (y/n) ", b"y\r\n", id="y_n_parens"),

pytest.param(b"Type yes/no please", b"yes\r\n", id="yes_no_space_delimited"),
pytest.param(b"systemd/network", b"\r\n", id="false_positive_word"),
pytest.param(b"beyond", b"\r\n", id="substring_y_n_not_matched"),
pytest.param(
b"Please enter a name: (or 'who' or 'finger'):", b"who\r\n", id="who_single_quotes"
),
pytest.param(b'Enter your name (or "who"):', b"who\r\n", id="who_double_quotes"),
pytest.param(b"What is your name? (or 'WHO')", b"who\r\n", id="who_uppercase"),
pytest.param(b"Enter your name:", b"\r\n", id="name_prompt_no_who"),
pytest.param(
b"connect <name> <password>\r\n"
b"WHO to see players connected.\r\n"
b"QUIT to disconnect.\r\n",
b"who\r\n",
id="who_bare_command_listing",
),
pytest.param(b"Type WHO to list users", b"who\r\n", id="who_bare_mid_sentence"),
pytest.param(b"somehow", b"\r\n", id="who_inside_word_not_matched"),
pytest.param(b"Type 'help' for a list of commands:", b"help\r\n", id="help_single_quotes"),
pytest.param(b'Enter your name (or "help"):', b"help\r\n", id="help_double_quotes"),
pytest.param(
b"HELP to see available commands.\r\n",
b"help\r\n",
id="help_bare_command_listing",
),
pytest.param(b"Type HELP for info", b"help\r\n", id="help_bare_mid_sentence"),
pytest.param(b"helpful tips", b"\r\n", id="help_inside_word_not_matched"),
pytest.param(
b"connect <name>\r\n"
b"WHO to see players connected.\r\n"
b"HELP to see available commands.\r\n",
b"who\r\n",
id="who_preferred_over_help",
),
pytest.param(b"Continue? (Yes|No) ", b"yes\r\n", id="yes_pipe_no_parens"),
pytest.param(b"Accept? (YES|NO):", b"yes\r\n", id="yes_pipe_no_upper"),
pytest.param(b"systemd/network", None, id="false_positive_word"),
pytest.param(b"beyond", None, id="substring_y_n_not_matched"),
pytest.param(b"Enter your name:", None, id="name_prompt_no_who"),
pytest.param(b"Color? ", b"y\r\n", id="color_question"),

@@ -770,3 +855,3 @@ pytest.param(b"Do you want color? ", b"y\r\n", id="color_in_sentence"),

pytest.param(b"color ? ", b"y\r\n", id="color_space_before_question"),
pytest.param(b"colorful display", b"\r\n", id="color_no_question_mark"),
pytest.param(b"colorful display", None, id="color_no_question_mark"),
pytest.param(

@@ -784,3 +869,7 @@ b"Select charset:\r\n1) ASCII\r\n2) ISO-8859-1\r\n5) UTF-8\r\n",

),
pytest.param(b"1) ASCII\r\n2) Latin-1\r\n", b"\r\n", id="menu_no_utf8"),
pytest.param(b"[5] UTF-8\r\nSelect: ", b"5\r\n", id="menu_utf8_brackets"),
pytest.param(b"[2] utf-8\r\n", b"2\r\n", id="menu_utf8_brackets_lower"),
pytest.param(b"3. UTF-8\r\n", b"3\r\n", id="menu_utf8_dot"),
pytest.param(b" 5 ... UTF-8\r\n", b"5\r\n", id="menu_utf8_ellipsis"),
pytest.param(b"1) ASCII\r\n2) Latin-1\r\n", None, id="menu_no_utf8"),
pytest.param(b"(1) Ansi\r\n(2) VT100\r\n", b"1\r\n", id="menu_ansi_parens"),

@@ -791,3 +880,16 @@ pytest.param(b"[1] ANSI\r\n[2] VT100\r\n", b"1\r\n", id="menu_ansi_brackets"),

pytest.param(b"(1] ANSI\r\n", b"1\r\n", id="menu_ansi_mixed_brackets"),
pytest.param(b"3. ANSI\r\n", b"3\r\n", id="menu_ansi_dot"),
pytest.param(b"3. English/ANSI\r\n", b"3\r\n", id="menu_english_ansi"),
pytest.param(b"2. English/ANSI\r\n", b"2\r\n", id="menu_english_ansi_2"),
pytest.param(
b" 1 ... English/ANSI The standard\r\n",
b"1\r\n",
id="menu_ansi_ellipsis",
),
pytest.param(
b" 2 .. English/ANSI\r\n",
b"2\r\n",
id="menu_ansi_double_dot",
),
pytest.param(
b"1) ASCII\r\n2) UTF-8\r\n(3) Ansi\r\n",

@@ -797,2 +899,7 @@ b"2\r\n",

),
pytest.param(
b"1. ASCII\r\n2. UTF-8\r\n3. English/ANSI\r\n",
b"2\r\n",
id="menu_utf8_dot_preferred_over_ansi_dot",
),
pytest.param(b"gb/big5", b"big5\r\n", id="gb_big5"),

@@ -802,9 +909,178 @@ pytest.param(b"GB/Big5\r\n", b"big5\r\n", id="gb_big5_mixed_case"),

pytest.param(b"gb/big 5\r\n", b"big5\r\n", id="gb_big5_space_before_5"),
pytest.param(b"bigfoot5", b"\r\n", id="big5_inside_word_not_matched"),
pytest.param(b"bigfoot5", None, id="big5_inside_word_not_matched"),
pytest.param(
b"Press [.ESC.] twice within 15 seconds to CONTINUE...",
b"\x1b\x1b",
id="esc_twice_mystic",
),
pytest.param(
b"Press [ESC] twice to continue",
b"\x1b\x1b",
id="esc_twice_no_dots",
),
pytest.param(
b"Press ESC twice to continue",
b"\x1b\x1b",
id="esc_twice_bare",
),
pytest.param(
b"Press <Esc> twice for the BBS ... ",
b"\x1b\x1b",
id="esc_twice_angle_brackets",
),
pytest.param(
b"\x1b[33mPress [.ESC.] twice within 10 seconds\x1b[0m",
b"\x1b\x1b",
id="esc_twice_ansi_wrapped",
),
pytest.param(
b"\x1b[1;1H\x1b[2JPress [.ESC.] twice within 15 seconds to CONTINUE...",
b"\x1b\x1b",
id="esc_twice_after_clear_screen",
),
pytest.param(
b"Please press [ESC] to continue",
b"\x1b",
id="esc_once_brackets",
),
pytest.param(
b"Press ESC to continue",
b"\x1b",
id="esc_once_bare",
),
pytest.param(
b"press <Esc> to continue",
b"\x1b",
id="esc_once_angle_brackets",
),
pytest.param(
b"\x1b[33mPress [ESC] to continue\x1b[0m",
b"\x1b",
id="esc_once_ansi_wrapped",
),
pytest.param(b"HIT RETURN:", b"\r\n", id="hit_return"),
pytest.param(b"Hit Return.", b"\r\n", id="hit_return_lower"),
pytest.param(b"PRESS RETURN:", b"\r\n", id="press_return"),
pytest.param(b"Press Enter:", b"\r\n", id="press_enter"),
pytest.param(b"press enter", b"\r\n", id="press_enter_lower"),
pytest.param(b"Hit Enter to continue", b"\r\n", id="hit_enter"),
pytest.param(
b"\x1b[1mHIT RETURN:\x1b[0m",
b"\r\n",
id="hit_return_ansi_wrapped",
),
pytest.param(
b"\x1b[31mColor? \x1b[0m",
b"y\r\n",
id="color_ansi_wrapped",
),
pytest.param(
b"\x1b[1mContinue? (y/n)\x1b[0m ",
b"y\r\n",
id="yn_ansi_wrapped",
),
pytest.param(
b"Do you support the ANSI color standard (Yn)? ",
b"y\r\n",
id="yn_paren_capital_y",
),
pytest.param(
b"Continue? [Yn]",
b"y\r\n",
id="yn_bracket_capital_y",
),
pytest.param(
b"Do something (yN)",
b"y\r\n",
id="yn_paren_capital_n",
),
pytest.param(
b"More: (Y)es, (N)o, (C)ontinuous?",
b"C\r\n",
id="more_continuous",
),
pytest.param(
b"\x1b[33mMore: (Y)es, (N)o, (C)ontinuous?\x1b[0m",
b"C\r\n",
id="more_continuous_ansi",
),
pytest.param(
b"more (Y/N/C)ontinuous: ",
b"C\r\n",
id="more_ync_compact",
),
pytest.param(
b"Press the BACKSPACE key to detect your terminal type: ",
b"\x08",
id="backspace_key_telnetbible",
),
pytest.param(
b"\x1b[1mPress the BACKSPACE key\x1b[0m",
b"\x08",
id="backspace_key_ansi_wrapped",
),
pytest.param(
b"\x0cpress del/backspace:",
b"\x14",
id="petscii_del_backspace",
),
pytest.param(
b"\x0c\r\npress del/backspace:",
b"\x14",
id="petscii_del_backspace_crlf",
),
pytest.param(
b"press backspace:",
b"\x14",
id="petscii_backspace_only",
),
pytest.param(
b"press del:",
b"\x14",
id="petscii_del_only",
),
pytest.param(
b"PRESS DEL/BACKSPACE.",
b"\x14",
id="petscii_del_backspace_upper",
),
pytest.param(
b"press backspace/del:",
b"\x14",
id="petscii_backspace_del_reversed",
),
pytest.param(
b"PLEASE HIT YOUR BACKSPACE/DELETE\r\nKEY FOR C/G DETECT:",
b"\x14",
id="petscii_hit_your_backspace_delete",
),
pytest.param(
b"hit your delete/backspace key:",
b"\x14",
id="petscii_hit_your_delete_backspace_key",
),
],
)
def test_detect_yn_prompt(banner, expected):
assert sfp._detect_yn_prompt(banner) == expected
assert sfp._detect_yn_prompt(banner).response == expected
@pytest.mark.parametrize(
"banner, expected_encoding",
[
pytest.param(b"5) UTF-8\r\n", "utf-8", id="utf8_menu"),
pytest.param(b"[2] utf-8\r\n", "utf-8", id="utf8_brackets"),
pytest.param(b"1) UTF8", "utf-8", id="utf8_no_hyphen"),
pytest.param(b"gb/big5", "big5", id="gb_big5"),
pytest.param(b"GB/Big5\r\n", "big5", id="gb_big5_mixed"),
pytest.param(b"(1) Ansi\r\n", None, id="ansi_no_encoding"),
pytest.param(b"yes/no", None, id="yn_no_encoding"),
pytest.param(b"Color? ", None, id="color_no_encoding"),
pytest.param(b"nothing special", None, id="none_no_encoding"),
],
)
def test_detect_yn_prompt_encoding(banner, expected_encoding):
assert sfp._detect_yn_prompt(banner).encoding == expected_encoding
@pytest.mark.asyncio

@@ -855,2 +1131,24 @@ async def test_fingerprinting_shell_yn_prompt(tmp_path):

@pytest.mark.asyncio
async def test_fingerprinting_shell_esc_twice_prompt(tmp_path):
"""Banner with ESC-twice botcheck sends two raw ESC bytes."""
save_path = str(tmp_path / "result.json")
reader = MockReader([b"Press [.ESC.] twice within 15 seconds to CONTINUE..."])
writer = MockWriter(will_options=[fps.SGA])
await sfp.fingerprinting_client_shell(
reader,
writer,
host="localhost",
port=23,
save_path=save_path,
silent=True,
banner_quiet_time=0.01,
banner_max_wait=0.01,
mssp_wait=0.01,
)
assert b"\x1b\x1b" in writer._writes
@pytest.mark.asyncio
async def test_fingerprinting_shell_no_yn_prompt(tmp_path):

@@ -880,7 +1178,11 @@ """Banner without y/n prompt sends bare '\\r\\n'."""

@pytest.mark.asyncio
async def test_fingerprinting_shell_who_prompt(tmp_path):
"""Banner with 'who' option causes 'who\\r\\n' response."""
async def test_fingerprinting_shell_multi_prompt(tmp_path):
"""Server asks color first, then presents a UTF-8 charset menu."""
save_path = str(tmp_path / "result.json")
reader = MockReader([b"Please enter a name: (or 'who' or 'finger'):"])
writer = MockWriter(will_options=[fps.SGA])
reader = InteractiveMockReader([
b"Color? ",
b"Select charset:\r\n1) ASCII\r\n2) UTF-8\r\n",
b"Welcome!\r\n",
], writer)

@@ -899,11 +1201,17 @@ await sfp.fingerprinting_client_shell(

assert b"who\r\n" in writer._writes
assert b"y\r\n" in writer._writes
assert b"2\r\n" in writer._writes
assert writer.environ_encoding == "utf-8"
assert writer.protocol.force_binary is True
@pytest.mark.asyncio
async def test_fingerprinting_shell_help_prompt(tmp_path):
"""Banner with 'help' option causes 'help\\r\\n' response."""
async def test_fingerprinting_shell_multi_prompt_stops_on_bare_return(tmp_path):
"""Loop stops after a bare \\r\\n response (no prompt detected)."""
save_path = str(tmp_path / "result.json")
reader = MockReader([b"Type 'help' for a list of commands:"])
writer = MockWriter(will_options=[fps.SGA])
reader = InteractiveMockReader([
b"Color? ",
b"Welcome!\r\n",
], writer)

@@ -922,5 +1230,33 @@ await sfp.fingerprinting_client_shell(

assert b"help\r\n" in writer._writes
assert b"y\r\n" in writer._writes
prompt_writes = [w for w in writer._writes if w in (b"y\r\n", b"\r\n")]
assert len(prompt_writes) == 2
assert prompt_writes == [b"y\r\n", b"\r\n"]
@pytest.mark.asyncio
async def test_fingerprinting_shell_multi_prompt_max_replies(tmp_path):
"""Loop does not exceed _MAX_PROMPT_REPLIES rounds."""
save_path = str(tmp_path / "result.json")
writer = MockWriter(will_options=[fps.SGA])
banners = [f"Color? (round {i}) ".encode()
for i in range(sfp._MAX_PROMPT_REPLIES + 1)]
reader = InteractiveMockReader(banners, writer)
await sfp.fingerprinting_client_shell(
reader,
writer,
host="localhost",
port=23,
save_path=save_path,
silent=True,
banner_quiet_time=0.01,
banner_max_wait=0.01,
mssp_wait=0.01,
)
y_writes = [w for w in writer._writes if w == b"y\r\n"]
assert len(y_writes) == sfp._MAX_PROMPT_REPLIES
class TestCullDisplay:

@@ -948,1 +1284,276 @@ """Tests for _cull_display bytes conversion."""

assert result == {}
@pytest.mark.asyncio
async def test_read_banner_until_quiet_responds_to_dsr():
"""DSR (ESC[6n) in banner data triggers a CPR response (ESC[1;1R)."""
reader = MockReader([b"Hello\x1b[6nWorld"])
writer = MockWriter()
result = await sfp._read_banner_until_quiet(
reader, quiet_time=0.01, max_wait=0.05, writer=writer,
)
assert result == b"Hello\x1b[6nWorld"
assert b"\x1b[1;1R" in writer._writes
@pytest.mark.asyncio
async def test_read_banner_until_quiet_multiple_dsr():
"""Multiple DSR requests each get a CPR response."""
reader = MockReader([b"\x1b[6n", b"banner\x1b[6n"])
writer = MockWriter()
await sfp._read_banner_until_quiet(
reader, quiet_time=0.01, max_wait=0.05, writer=writer,
)
cpr_count = sum(1 for w in writer._writes if w == b"\x1b[1;1R")
assert cpr_count == 2
@pytest.mark.asyncio
async def test_read_banner_until_quiet_no_dsr_no_write():
"""No DSR in banner means no CPR writes."""
reader = MockReader([b"Welcome to BBS\r\n"])
writer = MockWriter()
await sfp._read_banner_until_quiet(
reader, quiet_time=0.01, max_wait=0.05, writer=writer,
)
assert not writer._writes
@pytest.mark.asyncio
async def test_read_banner_until_quiet_no_writer_ignores_dsr():
"""Without a writer, DSR is silently ignored."""
reader = MockReader([b"Hello\x1b[6n"])
result = await sfp._read_banner_until_quiet(
reader, quiet_time=0.01, max_wait=0.05,
)
assert result == b"Hello\x1b[6n"
@pytest.mark.asyncio
async def test_fingerprinting_shell_dsr_response(tmp_path):
"""Full session responds to DSR in the pre-return banner."""
save_path = str(tmp_path / "result.json")
reader = MockReader([b"\x1b[6nWelcome to BBS\r\n"])
writer = MockWriter(will_options=[fps.SGA])
await sfp.fingerprinting_client_shell(
reader,
writer,
host="localhost",
port=23,
save_path=save_path,
silent=True,
banner_quiet_time=0.01,
banner_max_wait=0.01,
mssp_wait=0.01,
)
assert b"\x1b[1;1R" in writer._writes
@pytest.mark.asyncio
async def test_fingerprinting_settle_dsr_response(tmp_path):
"""DSR arriving during negotiation settle gets an immediate CPR reply."""
save_path = str(tmp_path / "result.json")
reader = MockReader([b"\x1b[6nWelcome\r\n"])
writer = MockWriter(will_options=[fps.SGA])
await sfp.fingerprinting_client_shell(
reader,
writer,
host="localhost",
port=23,
save_path=save_path,
silent=True,
banner_quiet_time=0.01,
banner_max_wait=0.01,
mssp_wait=0.01,
)
assert b"\x1b[1;1R" in writer._writes
@pytest.mark.asyncio
async def test_fingerprinting_shell_ansi_ellipsis_menu(tmp_path):
"""Worldgroup/MajorBBS ellipsis-menu selects first numbered option."""
save_path = str(tmp_path / "result.json")
writer = MockWriter(will_options=[fps.SGA, fps.ECHO])
reader = InteractiveMockReader([
(b"Please choose one of these languages/protocols:\r\n\r\n"
b" 1 ... English/ANSI The standard English language version\r\n"
b" 2 ... English/RIP The English version of RIPscrip graphics\r\n"
b"\r\nChoose a number from 1 to 2: "),
b"Welcome!\r\n",
], writer)
await sfp.fingerprinting_client_shell(
reader,
writer,
host="localhost",
port=23,
save_path=save_path,
silent=True,
banner_quiet_time=0.01,
banner_max_wait=0.01,
mssp_wait=0.01,
)
assert b"1\r\n" in writer._writes
@pytest.mark.asyncio
async def test_read_banner_inline_esc_twice():
"""ESC-twice botcheck is responded to inline during banner collection."""
reader = MockReader([
b"Mystic BBS v1.12\r\n",
b"Press [.ESC.] twice within 15 seconds to CONTINUE...\r\n",
b"Press [.ESC.] twice within 14 seconds to CONTINUE...\r\n",
])
writer = MockWriter()
await sfp._read_banner_until_quiet(
reader, quiet_time=0.01, max_wait=0.05, writer=writer,
)
assert b"\x1b\x1b" in writer._writes
esc_count = sum(1 for w in writer._writes if w == b"\x1b\x1b")
assert esc_count == 1
@pytest.mark.asyncio
async def test_read_banner_inline_esc_once():
"""ESC-once prompt is responded to inline during banner collection."""
reader = MockReader([b"Press [ESC] to continue\r\n"])
writer = MockWriter()
await sfp._read_banner_until_quiet(
reader, quiet_time=0.01, max_wait=0.05, writer=writer,
)
assert b"\x1b" in writer._writes
@pytest.mark.asyncio
async def test_fingerprinting_shell_esc_inline_no_duplicate(tmp_path):
"""Inline ESC response prevents duplicate in the prompt loop."""
save_path = str(tmp_path / "result.json")
writer = MockWriter(will_options=[fps.SGA])
reader = InteractiveMockReader([
b"Press [.ESC.] twice within 15 seconds to CONTINUE...\r\n",
b"Welcome to Mystic BBS!\r\nLogin: ",
], writer)
await sfp.fingerprinting_client_shell(
reader,
writer,
host="localhost",
port=23,
save_path=save_path,
silent=True,
banner_quiet_time=0.01,
banner_max_wait=0.01,
mssp_wait=0.01,
)
esc_writes = [w for w in writer._writes if w == b"\x1b\x1b"]
assert len(esc_writes) == 1
@pytest.mark.asyncio
async def test_fingerprinting_shell_delayed_prompt(tmp_path):
"""Bare-return banner followed by ESC-twice prompt still gets answered."""
save_path = str(tmp_path / "result.json")
writer = MockWriter(will_options=[fps.SGA])
reader = InteractiveMockReader([
b"Starting BBS-DOS...\r\n",
b"Press [.ESC.] twice within 15 seconds to CONTINUE...",
b"Welcome!\r\n",
], writer)
await sfp.fingerprinting_client_shell(
reader,
writer,
host="localhost",
port=23,
save_path=save_path,
silent=True,
banner_quiet_time=0.01,
banner_max_wait=0.01,
mssp_wait=0.01,
)
assert b"\x1b\x1b" in writer._writes
@pytest.mark.asyncio
async def test_read_banner_virtual_cursor_defeats_robot_check():
"""DSR-space-DSR produces CPR col=1 then col=2 (width=1)."""
reader = MockReader([b"\x1b[6n \x1b[6n"])
writer = MockWriter()
cursor = sfp._VirtualCursor()
await sfp._read_banner_until_quiet(
reader, quiet_time=0.01, max_wait=0.05, writer=writer, cursor=cursor,
)
cpr_writes = [w for w in writer._writes if b"R" in w]
assert cpr_writes[0] == b"\x1b[1;1R"
assert cpr_writes[1] == b"\x1b[1;2R"
@pytest.mark.asyncio
async def test_read_banner_virtual_cursor_separate_chunks():
"""DSR in separate chunks still tracks cursor correctly."""
reader = MockReader([b"\x1b[6n", b" \x1b[6n"])
writer = MockWriter()
cursor = sfp._VirtualCursor()
await sfp._read_banner_until_quiet(
reader, quiet_time=0.01, max_wait=0.05, writer=writer, cursor=cursor,
)
cpr_writes = [w for w in writer._writes if b"R" in w]
assert cpr_writes[0] == b"\x1b[1;1R"
assert cpr_writes[1] == b"\x1b[1;2R"
@pytest.mark.asyncio
async def test_read_banner_virtual_cursor_wide_char():
"""Wide CJK character advances cursor by 2."""
reader = MockReader([b"\x1b[6n\xe4\xb8\xad\x1b[6n"])
writer = MockWriter()
cursor = sfp._VirtualCursor()
await sfp._read_banner_until_quiet(
reader, quiet_time=0.01, max_wait=0.05, writer=writer, cursor=cursor,
)
cpr_writes = [w for w in writer._writes if b"R" in w]
assert cpr_writes[0] == b"\x1b[1;1R"
assert cpr_writes[1] == b"\x1b[1;3R"
def test_virtual_cursor_backspace():
"""Backspace moves cursor left."""
cursor = sfp._VirtualCursor()
cursor.advance(b"AB\x08")
assert cursor.col == 2
def test_virtual_cursor_cr():
"""Carriage return resets cursor to column 1."""
cursor = sfp._VirtualCursor()
cursor.advance(b"Hello\r")
assert cursor.col == 1
def test_virtual_cursor_ansi_stripped():
"""ANSI color codes do not advance cursor."""
cursor = sfp._VirtualCursor()
cursor.advance(b"\x1b[31mX\x1b[0m")
assert cursor.col == 2
@pytest.mark.parametrize("response,encoding,expected", [
pytest.param(b"\r\n", "atascii", b"\x9b", id="atascii_bare_return"),
pytest.param(b"yes\r\n", "atascii", b"yes\x9b", id="atascii_yes"),
pytest.param(b"y\r\n", "atascii", b"y\x9b", id="atascii_y"),
pytest.param(b"\r\n", "ascii", b"\r\n", id="ascii_unchanged"),
pytest.param(b"\r\n", "utf-8", b"\r\n", id="utf8_unchanged"),
pytest.param(b"yes\r\n", "utf-8", b"yes\r\n", id="utf8_yes_unchanged"),
pytest.param(b"\x1b\x1b", "atascii", b"\x1b\x1b", id="atascii_esc_esc"),
])
def test_reencode_prompt(response, encoding, expected):
# local
import telnetlib3 # noqa: F401 - registers codecs
assert sfp._reencode_prompt(response, encoding) == expected

@@ -272,18 +272,2 @@ # std imports

@pytest.mark.parametrize(
"input_chars,closing,expected",
[
pytest.param(["a"], False, "a", id="normal"),
pytest.param(["\x1b", "A", "x"], False, "x", id="skips_escape"),
pytest.param([], True, None, id="returns_none_when_closing"),
pytest.param(["\x1b", "1", "A", "x"], False, "x", id="escape_non_letter"),
],
)
@pytest.mark.asyncio
async def test_get_next_ascii(input_chars, closing, expected):
writer = MockWriter()
writer._closing = closing
assert await ss.get_next_ascii(MockReader(input_chars), writer) == expected
@pytest.mark.parametrize(
"input_data,expected",

@@ -361,4 +345,4 @@ [

[
pytest.param(2, True, id="width_2"),
pytest.param(1, False, id="width_1"),
pytest.param(1, True, id="width_1"),
pytest.param(2, False, id="width_2"),
pytest.param(None, False, id="width_none"),

@@ -365,0 +349,0 @@ ],

@@ -148,3 +148,3 @@ [tox]

codespell --skip="*.pyc,htmlcov*,_build,build,*.egg-info,.tox,.git" \
--ignore-words-list="wont,nams,flushin,thirdparty,lient,caf" \
--ignore-words-list="wont,nams,flushin,thirdparty,lient,caf,alo" \
--uri-ignore-words-list "*" \

@@ -151,0 +151,0 @@ --summary --count

Sorry, the diff of this file is too big to display