python-pkcs11
Advanced tools
| name: install-opencryptoki | ||
| author: Matthias Valvekens | ||
| description: Install opencryptoki and configure an empty token | ||
| inputs: | ||
| os: | ||
| description: OS to target | ||
| required: true | ||
| token-label: | ||
| description: Label assigned to the token | ||
| required: true | ||
| token-user-pin: | ||
| description: User PIN to configure on the token | ||
| required: true | ||
| token-so-pin: | ||
| description: Security officer PIN to configure on the token | ||
| required: true | ||
| outputs: | ||
| module: | ||
| description: Path to PKCS#11 module | ||
| value: ${{ steps.install.outputs.module }} | ||
| runs: | ||
| using: "composite" | ||
| steps: | ||
| - name: Install opencryptoki | ||
| id: install | ||
| shell: bash | ||
| run: | | ||
| if [[ "${OS_NAME:0:6}" == 'ubuntu' ]]; then | ||
| sudo apt install libcap-dev libldap-dev | ||
| git clone https://github.com/opencryptoki/opencryptoki | ||
| cd opencryptoki | ||
| ./bootstrap.sh | ||
| ./configure --prefix=/usr --sysconfdir=/etc \ | ||
| --with-pkcs-group=users \ | ||
| --disable-tpmtok --disable-ccatok --disable-ep11tok --disable-icsftok \ | ||
| --disable-p11sak --disable-pkcstok_migrate --disable-pkcsstats | ||
| make | ||
| sudo make install | ||
| sudo ldconfig | ||
| echo -e "slot 0\n{\nstdll = libpkcs11_sw.so\ntokversion = 3.12\n}" > /tmp/opencryptoki.conf | ||
| sudo cp /tmp/opencryptoki.conf /etc/opencryptoki/ | ||
| sudo chown root:root /etc/opencryptoki/opencryptoki.conf | ||
| echo "module=/usr/lib/opencryptoki/libopencryptoki.so" >> "$GITHUB_OUTPUT" | ||
| else | ||
| echo "$OS_NAME is not a supported target system" | ||
| exit 1 | ||
| fi | ||
| env: | ||
| OS_NAME: ${{ inputs.os }} | ||
| - name: Run opencryptoki daemon | ||
| shell: bash | ||
| run: sudo -u pkcsslotd pkcsslotd | ||
| - name: Initialize token | ||
| shell: bash | ||
| run: | | ||
| echo "${{ inputs.token-label }}" | pkcsconf -I -c 0 -S 87654321 | ||
| pkcsconf -P -c 0 -S 87654321 -n "${{ inputs.token-so-pin }}" | ||
| pkcsconf -u -c 0 -S "${{ inputs.token-so-pin }}" -n "${{ inputs.token-user-pin }}" |
| name: test-setup | ||
| author: Matthias Valvekens | ||
| description: Perform set-up for python-pkcs11 CI | ||
| inputs: | ||
| os: | ||
| description: OS to target | ||
| required: true | ||
| python-version: | ||
| description: Python version to target | ||
| required: true | ||
| dependency-group: | ||
| description: UV dependency group to install | ||
| required: true | ||
| pkcs11-platform: | ||
| description: PKCS#11 platform to target | ||
| required: true | ||
| token-label: | ||
| description: Label assigned to the token | ||
| required: true | ||
| token-user-pin: | ||
| description: User PIN to configure on the token | ||
| required: true | ||
| token-so-pin: | ||
| description: Security officer PIN to configure on the token | ||
| required: true | ||
| outputs: | ||
| module: | ||
| description: Path to PKCS#11 module | ||
| value: ${{ steps.install-result.outputs.module }} | ||
| module2: | ||
| description: Path to alternative PKCS#11 module ('multi' only) | ||
| value: ${{ steps.install-result.outputs.module2 }} | ||
| runs: | ||
| using: "composite" | ||
| steps: | ||
| - name: Setup Python | ||
| uses: actions/setup-python@v5 | ||
| with: | ||
| python-version: ${{ inputs.python-version }} | ||
| - uses: ./.github/actions/install-softhsm | ||
| if: inputs.pkcs11-platform == 'softhsm' || inputs.pkcs11-platform == 'multi' | ||
| id: softhsm | ||
| with: | ||
| os: ${{ inputs.os }} | ||
| token-label: ${{ inputs.token-label }} | ||
| token-so-pin: ${{ inputs.token-so-pin }} | ||
| token-user-pin: ${{ inputs.token-user-pin }} | ||
| - uses: ./.github/actions/install-opencryptoki | ||
| # only run opencryptoki tests on ubuntu | ||
| # (macos and windows don't seem to be supported) | ||
| if: inputs.pkcs11-platform == 'opencryptoki' || inputs.pkcs11-platform == 'multi' | ||
| id: opencryptoki | ||
| with: | ||
| os: ${{ inputs.os }} | ||
| token-label: ${{ inputs.token-label }} | ||
| token-so-pin: ${{ inputs.token-so-pin }} | ||
| token-user-pin: ${{ inputs.token-user-pin }} | ||
| - name: Set module path | ||
| id: install-result | ||
| shell: bash | ||
| run: | | ||
| if [[ "$PLATFORM" == 'opencryptoki' ]]; then | ||
| echo "module=${{ steps.opencryptoki.outputs.module }}" >> "$GITHUB_OUTPUT" | ||
| elif [[ "$PLATFORM" == 'softhsm' ]]; then | ||
| echo "module=${{ steps.softhsm.outputs.module }}" >> "$GITHUB_OUTPUT" | ||
| elif [[ "$PLATFORM" == 'multi' ]]; then | ||
| # NOTE: the 'multi' platform is only used for testing the code that | ||
| # swaps between multiple PKCS#11 implementations. As such, any two | ||
| # PKCS#11 implementations will do. If we add a 3rd platform | ||
| # to the CI at a later stage that is faster to install than opencryptoki, | ||
| # switching is an option. | ||
| echo "module=${{ steps.softhsm.outputs.module }}" >> "$GITHUB_OUTPUT" | ||
| echo "module2=${{ steps.opencryptoki.outputs.module }}" >> "$GITHUB_OUTPUT" | ||
| else | ||
| echo "$PLATFORM is not a valid PKCS#11 platform choice" | ||
| exit 1 | ||
| fi | ||
| env: | ||
| PLATFORM: ${{ inputs.pkcs11-platform }} | ||
| - name: Install uv | ||
| uses: astral-sh/setup-uv@v4 | ||
| with: | ||
| enable-cache: true | ||
| python-version: ${{ inputs.python-version }} | ||
| - name: Install testing dependencies | ||
| shell: bash | ||
| run: uv sync --no-dev --exact --group "${{ inputs.dependency-group }}" |
| name: Coverage | ||
| on: | ||
| pull_request: {} | ||
| workflow_dispatch: {} | ||
| env: | ||
| UV_PYTHON_PREFERENCE: only-system | ||
| UV_NO_SYNC: "1" | ||
| PKCS11_TOKEN_LABEL: "TEST" | ||
| PKCS11_TOKEN_PIN: "1234" | ||
| PKCS11_TOKEN_SO_PIN: "5678" | ||
| jobs: | ||
| # For now, we run the coverage as a separate job. | ||
| # At the time of writing, the latest version of Cython's line tracing | ||
| # seems to lead to segfaults in Python 3.13 -> TODO: investigate | ||
| pytest-coverage: | ||
| runs-on: ubuntu-latest | ||
| strategy: | ||
| matrix: | ||
| pkcs11-platform: | ||
| - softhsm | ||
| - opencryptoki | ||
| steps: | ||
| - name: Acquire sources | ||
| uses: actions/checkout@v4 | ||
| - name: Arm coverage-only compiler directives | ||
| # Unfortunately, it doesn't seem to be possible to pass directives | ||
| # to Cython through environment variables: https://github.com/cython/cython/issues/3930 | ||
| # Doing it here is still better than introducing a non-declarative setup.py into the | ||
| # build again. | ||
| run: sed -i 's/#coverage#cython/#cython/g' pkcs11/*.pyx | ||
| - uses: ./.github/actions/test-setup | ||
| id: setup | ||
| with: | ||
| os: ubuntu-latest | ||
| python-version: "3.12" | ||
| dependency-group: coverage | ||
| token-label: ${{ env.PKCS11_TOKEN_LABEL }} | ||
| token-so-pin: ${{ env.PKCS11_TOKEN_SO_PIN }} | ||
| token-user-pin: ${{ env.PKCS11_TOKEN_PIN }} | ||
| pkcs11-platform: ${{ matrix.pkcs11-platform }} | ||
| env: | ||
| CFLAGS: "-DCYTHON_TRACE_NOGIL=1" | ||
| EXT_BUILD_DEBUG: "1" | ||
| - name: Run tests | ||
| run: uv run pytest -v --cov=pkcs11 --cov-branch --cov-report=xml:${{ matrix.pkcs11-platform }}-coverage.xml | ||
| env: | ||
| PKCS11_MODULE: ${{ steps.setup.outputs.module }} | ||
| - name: Stash coverage report | ||
| uses: actions/upload-artifact@v4 | ||
| with: | ||
| name: coverage-${{ strategy.job-index }} | ||
| path: "*-coverage.xml" | ||
| pytest-coverage-multilib: | ||
| runs-on: ubuntu-latest | ||
| steps: | ||
| - name: Acquire sources | ||
| uses: actions/checkout@v4 | ||
| - uses: ./.github/actions/test-setup | ||
| id: setup | ||
| with: | ||
| os: ubuntu-latest | ||
| pkcs11-platform: multi | ||
| token-label: ${{ env.PKCS11_TOKEN_LABEL }} | ||
| token-so-pin: ${{ env.PKCS11_TOKEN_SO_PIN }} | ||
| token-user-pin: ${{ env.PKCS11_TOKEN_PIN }} | ||
| python-version: "3.12" | ||
| dependency-group: coverage | ||
| - name: Run tests | ||
| run: uv run pytest -v --cov=pkcs11 --cov-branch --cov-report=xml:multilib-coverage.xml tests/test_multilib.py | ||
| env: | ||
| PKCS11_MODULE: ${{ steps.setup.outputs.module }} | ||
| PKCS11_MODULE2: ${{ steps.setup.outputs.module2 }} | ||
| - name: Stash coverage report | ||
| uses: actions/upload-artifact@v4 | ||
| with: | ||
| name: coverage-multilib | ||
| path: "*-coverage.xml" | ||
| codecov-upload: | ||
| permissions: | ||
| actions: write | ||
| contents: read | ||
| runs-on: ubuntu-latest | ||
| needs: [pytest-coverage] | ||
| steps: | ||
| # checkout necessary to ensure the uploaded report contains the correct paths | ||
| - uses: actions/checkout@v4 | ||
| - name: Retrieve coverage reports | ||
| uses: actions/download-artifact@v4 | ||
| with: | ||
| pattern: coverage-* | ||
| path: ./reports/ | ||
| - name: Upload all coverage reports to Codecov | ||
| uses: codecov/codecov-action@v5 | ||
| with: | ||
| token: ${{ secrets.CODECOV_TOKEN }} | ||
| directory: ./reports/ | ||
| flags: unittests | ||
| env_vars: OS,PYTHON | ||
| name: codecov-umbrella |
| from datetime import datetime | ||
| from struct import Struct | ||
| from pkcs11.constants import ( | ||
| Attribute, | ||
| CertificateType, | ||
| MechanismFlag, | ||
| ObjectClass, | ||
| ) | ||
| from pkcs11.mechanisms import KeyType, Mechanism | ||
| # (Pack Function, Unpack Function) functions | ||
| handle_bool = (Struct("?").pack, lambda v: False if len(v) == 0 else Struct("?").unpack(v)[0]) | ||
| handle_ulong = (Struct("L").pack, lambda v: Struct("L").unpack(v)[0]) | ||
| handle_str = (lambda s: s.encode("utf-8"), lambda b: b.decode("utf-8")) | ||
| handle_date = ( | ||
| lambda s: s.strftime("%Y%m%d").encode("ascii"), | ||
| lambda s: datetime.strptime(s.decode("ascii"), "%Y%m%d").date(), | ||
| ) | ||
| handle_bytes = (bytes, bytes) | ||
| # The PKCS#11 biginteger type is an array of bytes in network byte order. | ||
| # If you have an int type, wrap it in biginteger() | ||
| handle_biginteger = handle_bytes | ||
| def _enum(type_): | ||
| """Factory to pack/unpack ints into IntEnums.""" | ||
| pack, unpack = handle_ulong | ||
| return (lambda v: pack(int(v)), lambda v: type_(unpack(v))) | ||
| ATTRIBUTE_TYPES = { | ||
| Attribute.ALWAYS_AUTHENTICATE: handle_bool, | ||
| Attribute.ALWAYS_SENSITIVE: handle_bool, | ||
| Attribute.APPLICATION: handle_str, | ||
| Attribute.BASE: handle_biginteger, | ||
| Attribute.CERTIFICATE_TYPE: _enum(CertificateType), | ||
| Attribute.CHECK_VALUE: handle_bytes, | ||
| Attribute.CLASS: _enum(ObjectClass), | ||
| Attribute.COEFFICIENT: handle_biginteger, | ||
| Attribute.DECRYPT: handle_bool, | ||
| Attribute.DERIVE: handle_bool, | ||
| Attribute.EC_PARAMS: handle_bytes, | ||
| Attribute.EC_POINT: handle_bytes, | ||
| Attribute.ENCRYPT: handle_bool, | ||
| Attribute.END_DATE: handle_date, | ||
| Attribute.EXPONENT_1: handle_biginteger, | ||
| Attribute.EXPONENT_2: handle_biginteger, | ||
| Attribute.EXTRACTABLE: handle_bool, | ||
| Attribute.HASH_OF_ISSUER_PUBLIC_KEY: handle_bytes, | ||
| Attribute.HASH_OF_SUBJECT_PUBLIC_KEY: handle_bytes, | ||
| Attribute.ID: handle_bytes, | ||
| Attribute.ISSUER: handle_bytes, | ||
| Attribute.KEY_GEN_MECHANISM: _enum(Mechanism), | ||
| Attribute.KEY_TYPE: _enum(KeyType), | ||
| Attribute.LABEL: handle_str, | ||
| Attribute.LOCAL: handle_bool, | ||
| Attribute.MODIFIABLE: handle_bool, | ||
| Attribute.COPYABLE: handle_bool, | ||
| Attribute.MODULUS: handle_biginteger, | ||
| Attribute.MODULUS_BITS: handle_ulong, | ||
| Attribute.NEVER_EXTRACTABLE: handle_bool, | ||
| Attribute.OBJECT_ID: handle_bytes, | ||
| Attribute.PRIME: handle_biginteger, | ||
| Attribute.PRIME_BITS: handle_ulong, | ||
| Attribute.PRIME_1: handle_biginteger, | ||
| Attribute.PRIME_2: handle_biginteger, | ||
| Attribute.PRIVATE: handle_bool, | ||
| Attribute.PRIVATE_EXPONENT: handle_biginteger, | ||
| Attribute.PUBLIC_EXPONENT: handle_biginteger, | ||
| Attribute.SENSITIVE: handle_bool, | ||
| Attribute.SERIAL_NUMBER: handle_bytes, | ||
| Attribute.SIGN: handle_bool, | ||
| Attribute.SIGN_RECOVER: handle_bool, | ||
| Attribute.START_DATE: handle_date, | ||
| Attribute.SUBJECT: handle_bytes, | ||
| Attribute.SUBPRIME: handle_biginteger, | ||
| Attribute.SUBPRIME_BITS: handle_ulong, | ||
| Attribute.TOKEN: handle_bool, | ||
| Attribute.TRUSTED: handle_bool, | ||
| Attribute.UNIQUE_ID: handle_str, | ||
| Attribute.UNWRAP: handle_bool, | ||
| Attribute.URL: handle_str, | ||
| Attribute.VALUE: handle_biginteger, | ||
| Attribute.VALUE_BITS: handle_ulong, | ||
| Attribute.VALUE_LEN: handle_ulong, | ||
| Attribute.VERIFY: handle_bool, | ||
| Attribute.VERIFY_RECOVER: handle_bool, | ||
| Attribute.WRAP: handle_bool, | ||
| Attribute.WRAP_WITH_TRUSTED: handle_bool, | ||
| } | ||
| """ | ||
| Map of attributes to (serialize, deserialize) functions. | ||
| """ | ||
| ALL_CAPABILITIES = ( | ||
| Attribute.ENCRYPT, | ||
| Attribute.DECRYPT, | ||
| Attribute.WRAP, | ||
| Attribute.UNWRAP, | ||
| Attribute.SIGN, | ||
| Attribute.VERIFY, | ||
| Attribute.DERIVE, | ||
| ) | ||
| def _apply_common(template, id_, label, store): | ||
| if id_: | ||
| template[Attribute.ID] = id_ | ||
| if label: | ||
| template[Attribute.LABEL] = label | ||
| template[Attribute.TOKEN] = bool(store) | ||
| def _apply_capabilities(template, possible_capas, capabilities): | ||
| for attr in possible_capas: | ||
| template[attr] = _capa_attr_to_mechanism_flag[attr] & capabilities | ||
| _capa_attr_to_mechanism_flag = { | ||
| Attribute.ENCRYPT: MechanismFlag.ENCRYPT, | ||
| Attribute.DECRYPT: MechanismFlag.DECRYPT, | ||
| Attribute.WRAP: MechanismFlag.WRAP, | ||
| Attribute.UNWRAP: MechanismFlag.UNWRAP, | ||
| Attribute.SIGN: MechanismFlag.SIGN, | ||
| Attribute.VERIFY: MechanismFlag.VERIFY, | ||
| Attribute.DERIVE: MechanismFlag.DERIVE, | ||
| } | ||
| class AttributeMapper: | ||
| """ | ||
| Class mapping PKCS#11 attributes to and from Python values. | ||
| """ | ||
| def __init__(self): | ||
| self.attribute_types = dict(ATTRIBUTE_TYPES) | ||
| self.default_secret_key_template = { | ||
| Attribute.CLASS: ObjectClass.SECRET_KEY, | ||
| Attribute.ID: b"", | ||
| Attribute.LABEL: "", | ||
| Attribute.PRIVATE: True, | ||
| Attribute.SENSITIVE: True, | ||
| } | ||
| self.default_public_key_template = { | ||
| Attribute.CLASS: ObjectClass.PUBLIC_KEY, | ||
| Attribute.ID: b"", | ||
| Attribute.LABEL: "", | ||
| } | ||
| self.default_private_key_template = { | ||
| Attribute.CLASS: ObjectClass.PRIVATE_KEY, | ||
| Attribute.ID: b"", | ||
| Attribute.LABEL: "", | ||
| Attribute.PRIVATE: True, | ||
| Attribute.SENSITIVE: True, | ||
| } | ||
| def register_handler(self, key, pack, unpack): | ||
| self.attribute_types[key] = (pack, unpack) | ||
| def _handler(self, key): | ||
| try: | ||
| return self.attribute_types[key] | ||
| except KeyError as e: | ||
| raise NotImplementedError(f"Can't handle attribute type {hex(key)}.") from e | ||
| def pack_attribute(self, key, value): | ||
| """Pack a Attribute value into a bytes array.""" | ||
| pack, _ = self._handler(key) | ||
| return pack(value) | ||
| def unpack_attributes(self, key, value): | ||
| """Unpack a Attribute bytes array into a Python value.""" | ||
| _, unpack = self._handler(key) | ||
| return unpack(value) | ||
| def public_key_template( | ||
| self, | ||
| *, | ||
| capabilities, | ||
| id_, | ||
| label, | ||
| store, | ||
| ): | ||
| template = self.default_public_key_template | ||
| _apply_capabilities( | ||
| template, (Attribute.ENCRYPT, Attribute.WRAP, Attribute.VERIFY), capabilities | ||
| ) | ||
| _apply_common(template, id_, label, store) | ||
| return template | ||
| def private_key_template( | ||
| self, | ||
| *, | ||
| capabilities, | ||
| id_, | ||
| label, | ||
| store, | ||
| ): | ||
| template = self.default_private_key_template | ||
| _apply_capabilities( | ||
| template, | ||
| (Attribute.DECRYPT, Attribute.UNWRAP, Attribute.SIGN, Attribute.DERIVE), | ||
| capabilities, | ||
| ) | ||
| _apply_common(template, id_, label, store) | ||
| return template | ||
| def secret_key_template( | ||
| self, | ||
| *, | ||
| capabilities, | ||
| id_, | ||
| label, | ||
| store, | ||
| ): | ||
| return self.generic_key_template( | ||
| self.default_secret_key_template, | ||
| capabilities=capabilities, | ||
| id_=id_, | ||
| label=label, | ||
| store=store, | ||
| ) | ||
| def generic_key_template( | ||
| self, | ||
| base_template, | ||
| *, | ||
| capabilities, | ||
| id_, | ||
| label, | ||
| store, | ||
| ): | ||
| template = dict(base_template) | ||
| _apply_capabilities(template, ALL_CAPABILITIES, capabilities) | ||
| _apply_common(template, id_, label, store) | ||
| return template |
| """ | ||
| PKCS#11 Slots and Tokens | ||
| """ | ||
| import os | ||
| import unittest | ||
| import pkcs11 | ||
| from . import LIB | ||
| @unittest.skipUnless("PKCS11_MODULE2" in os.environ, "Requires an additional PKCS#11 module") | ||
| class MultilibTests(unittest.TestCase): | ||
| def test_double_initialise_different_libs(self): | ||
| lib1 = pkcs11.lib(LIB) | ||
| lib2 = pkcs11.lib(os.environ["PKCS11_MODULE2"]) | ||
| self.assertIsNotNone(lib1) | ||
| self.assertIsNotNone(lib2) | ||
| self.assertIsNot(lib1, lib2) | ||
| slots1 = lib1.get_slots() | ||
| slots2 = lib2.get_slots() | ||
| self.assertGreaterEqual(len(slots1), 1) | ||
| self.assertGreaterEqual(len(slots2), 1) |
@@ -10,12 +10,13 @@ name: install-softhsm | ||
| description: Label assigned to the token | ||
| required: false | ||
| default: TEST | ||
| required: true | ||
| token-user-pin: | ||
| description: User PIN to configure on the token | ||
| required: false | ||
| default: "1234" | ||
| required: true | ||
| token-so-pin: | ||
| description: Security officer PIN to configure on the token | ||
| required: false | ||
| default: "5678" | ||
| required: true | ||
| outputs: | ||
| module: | ||
| description: Path to PKCS#11 module | ||
| value: ${{ steps.install.outputs.module }} | ||
| runs: | ||
@@ -25,2 +26,3 @@ using: "composite" | ||
| - name: Install SoftHSM | ||
| id: install | ||
| shell: bash | ||
@@ -33,10 +35,10 @@ run: | | ||
| echo "SOFTHSM2_CONF=/tmp/softhsm2.conf" >> "$GITHUB_ENV" | ||
| echo "PKCS11_MODULE=/usr/lib/softhsm/libsofthsm2.so" >> "$GITHUB_ENV" | ||
| echo "module=/usr/lib/softhsm/libsofthsm2.so" >> "$GITHUB_OUTPUT" | ||
| elif [[ "${OS_NAME:0:5}" == 'macos' ]]; then | ||
| brew install softhsm | ||
| echo "PKCS11_MODULE=/opt/homebrew/lib/softhsm/libsofthsm2.so" >> "$GITHUB_ENV" | ||
| echo "module=/opt/homebrew/lib/softhsm/libsofthsm2.so" >> "$GITHUB_OUTPUT" | ||
| elif [[ "${OS_NAME:0:7}" == 'windows' ]]; then | ||
| choco install softhsm.install | ||
| echo "SOFTHSM2_CONF=D:/SoftHSM2/etc/softhsm2.conf" >> "$GITHUB_ENV" | ||
| echo "PKCS11_MODULE=D:/SoftHSM2/lib/softhsm2-x64.dll" >> "$GITHUB_ENV" | ||
| echo "module=D:/SoftHSM2/lib/softhsm2-x64.dll" >> "$GITHUB_OUTPUT" | ||
| echo "D:/SoftHSM2/bin" >> "$GITHUB_PATH" | ||
@@ -55,4 +57,1 @@ echo "D:/SoftHSM2/lib" >> "$GITHUB_PATH" | ||
| --pin "${{ inputs.token-user-pin }}" --so-pin "${{ inputs.token-so-pin }}" | ||
| echo "PKCS11_TOKEN_LABEL=${{ inputs.token-label }}" >> "$GITHUB_ENV" | ||
| echo "PKCS11_TOKEN_PIN=${{ inputs.token-user-pin }}" >> "$GITHUB_ENV" | ||
| echo "PKCS11_TOKEN_SO_PIN=${{ inputs.token-so-pin }}" >> "$GITHUB_ENV" |
@@ -144,2 +144,2 @@ name: Publish release to PyPI | ||
| fail_on_unmatched_files: true | ||
| name: python-pkcs11 ${{ needs.extract-params.outputs.version }} | ||
| name: v${{ needs.extract-params.outputs.version }} |
@@ -10,6 +10,11 @@ name: Tests | ||
| UV_NO_SYNC: "1" | ||
| PKCS11_TOKEN_LABEL: "TEST" | ||
| PKCS11_TOKEN_PIN: "1234" | ||
| PKCS11_TOKEN_SO_PIN: "5678" | ||
| jobs: | ||
| run: | ||
| tests: | ||
| runs-on: ${{ matrix.os }} | ||
| strategy: | ||
| # Our test suite is pretty fast, so fail-fast: false allows for better troubleshooting. | ||
| fail-fast: false | ||
| matrix: | ||
@@ -26,22 +31,57 @@ os: | ||
| - "3.13" | ||
| pkcs11-platform: | ||
| - softhsm | ||
| - opencryptoki | ||
| exclude: | ||
| # only run opencryptoki tests on ubuntu | ||
| # (macos and windows don't seem to be supported) | ||
| - pkcs11-platform: opencryptoki | ||
| os: windows-latest | ||
| - pkcs11-platform: opencryptoki | ||
| os: macos-latest | ||
| steps: | ||
| - name: Acquire sources | ||
| uses: actions/checkout@v4 | ||
| - name: Setup Python | ||
| uses: actions/setup-python@v5 | ||
| - uses: ./.github/actions/test-setup | ||
| id: setup | ||
| with: | ||
| os: ${{ matrix.os }} | ||
| token-label: ${{ env.PKCS11_TOKEN_LABEL }} | ||
| token-so-pin: ${{ env.PKCS11_TOKEN_SO_PIN }} | ||
| token-user-pin: ${{ env.PKCS11_TOKEN_PIN }} | ||
| python-version: ${{ matrix.python-version }} | ||
| - uses: ./.github/actions/install-softhsm | ||
| pkcs11-platform: ${{ matrix.pkcs11-platform }} | ||
| dependency-group: testing | ||
| - name: Run tests | ||
| run: uv run pytest -v | ||
| env: | ||
| PKCS11_MODULE: ${{ steps.setup.outputs.module }} | ||
| multilib-tests: | ||
| runs-on: ubuntu-latest | ||
| strategy: | ||
| fail-fast: false | ||
| matrix: | ||
| python-version: | ||
| - "3.9" | ||
| - "3.10" | ||
| - "3.11" | ||
| - "3.12" | ||
| - "3.13" | ||
| steps: | ||
| - name: Acquire sources | ||
| uses: actions/checkout@v4 | ||
| - uses: ./.github/actions/test-setup | ||
| id: setup | ||
| with: | ||
| os: ${{ matrix.os }} | ||
| - name: Install uv | ||
| uses: astral-sh/setup-uv@v4 | ||
| with: | ||
| enable-cache: true | ||
| os: ubuntu-latest | ||
| pkcs11-platform: multi | ||
| token-label: ${{ env.PKCS11_TOKEN_LABEL }} | ||
| token-so-pin: ${{ env.PKCS11_TOKEN_SO_PIN }} | ||
| token-user-pin: ${{ env.PKCS11_TOKEN_PIN }} | ||
| python-version: ${{ matrix.python-version }} | ||
| - name: Install testing dependencies | ||
| run: uv sync --no-dev --exact --group testing | ||
| dependency-group: testing | ||
| - name: Run tests | ||
| run: uv run pytest -v | ||
| run: uv run pytest -v tests/test_multilib.py | ||
| env: | ||
| PKCS11_MODULE: ${{ steps.setup.outputs.module }} | ||
| PKCS11_MODULE2: ${{ steps.setup.outputs.module2 }} |
+4
-1
@@ -9,2 +9,5 @@ *.so | ||
| /python_pkcs11.egg-info/ | ||
| /.eggs/ | ||
| /.eggs/ | ||
| .coverage | ||
| *coverage.xml | ||
| *.html |
+25
-17
@@ -5,10 +5,9 @@ """ | ||
| from .constants import * # noqa: F403 | ||
| from .exceptions import * # noqa: F403 | ||
| from .mechanisms import * # noqa: F403 | ||
| from .types import * # noqa: F403 | ||
| from .util import dh, dsa, ec, rsa, x509 # noqa: F401 | ||
| from pkcs11.constants import * # noqa: F403 | ||
| from pkcs11.exceptions import * # noqa: F403 | ||
| from pkcs11.mechanisms import * # noqa: F403 | ||
| from pkcs11.types import * # noqa: F403 | ||
| from pkcs11.util import dh, dsa, ec, rsa, x509 # noqa: F401 | ||
| _so = None | ||
| _lib = None | ||
| _loaded = {} | ||
@@ -21,12 +20,11 @@ | ||
| """ | ||
| global _lib | ||
| global _so | ||
| global _loaded | ||
| if _lib: | ||
| if _so != so: | ||
| raise AlreadyInitialized( # noqa: F405 | ||
| "Already initialized with %s" % _so | ||
| ) | ||
| else: | ||
| return _lib | ||
| try: | ||
| _lib = _loaded[so] | ||
| if not _lib.initialized: | ||
| _lib.initialize() | ||
| return _lib | ||
| except KeyError: | ||
| pass | ||
@@ -36,4 +34,14 @@ from . import _pkcs11 | ||
| _lib = _pkcs11.lib(so) | ||
| _so = so | ||
| _loaded[so] = _lib | ||
| return _lib | ||
| def unload(so): | ||
| global _loaded | ||
| try: | ||
| loaded_lib = _loaded[so] | ||
| except KeyError: | ||
| return | ||
| del _loaded[so] | ||
| loaded_lib.unload() |
+197
-20
@@ -7,3 +7,3 @@ """ | ||
| from defaults import * | ||
| from pkcs11.exceptions import * | ||
@@ -143,2 +143,4 @@ cdef extern from '../extern/cryptoki.h': | ||
| CKR_OPERATION_CANCEL_FAILED, | ||
| CKR_VENDOR_DEFINED, | ||
@@ -151,2 +153,3 @@ | ||
| CKU_CONTEXT_SPECIFIC, | ||
| CKU_USER_NOBODY, | ||
@@ -257,2 +260,14 @@ cdef enum: | ||
| ctypedef struct CK_AES_CTR_PARAMS: | ||
| CK_ULONG ulCounterBits | ||
| CK_BYTE[16] cb | ||
| ctypedef struct CK_GCM_PARAMS: | ||
| CK_BYTE *pIv | ||
| CK_ULONG ulIvLen | ||
| CK_ULONG ulIvBits | ||
| CK_BYTE *pAAD | ||
| CK_ULONG ulAADLen | ||
| CK_ULONG ulTagBits | ||
| ctypedef struct CK_KEY_DERIVATION_STRING_DATA: | ||
@@ -596,2 +611,24 @@ CK_BYTE *pData | ||
| ctypedef CK_RV (*KeyOperationInit) ( | ||
| CK_SESSION_HANDLE session, | ||
| CK_MECHANISM *mechanism, | ||
| CK_OBJECT_HANDLE key | ||
| ) nogil | ||
| ctypedef CK_RV (*OperationUpdateWithResult) ( | ||
| CK_SESSION_HANDLE session, | ||
| CK_BYTE *part_in, | ||
| CK_ULONG part_in_len, | ||
| CK_BYTE *part_out, | ||
| CK_ULONG *part_out_len | ||
| ) nogil | ||
| ctypedef CK_RV (*OperationUpdate) ( | ||
| CK_SESSION_HANDLE session, | ||
| CK_BYTE *part_in, | ||
| CK_ULONG part_in_len | ||
| ) nogil | ||
| ctypedef CK_RV (*OperationWithResult) ( | ||
| CK_SESSION_HANDLE session, | ||
| CK_BYTE *part_out, | ||
| CK_ULONG *part_out_len | ||
| ) nogil | ||
@@ -608,21 +645,161 @@ cdef inline CK_BYTE_buffer(length): | ||
| cdef inline bytes _pack_attribute(key, value): | ||
| """Pack a Attribute value into a bytes array.""" | ||
| # Note: this `cdef inline` declaration doesn't seem to be consistently labelled | ||
| # as executed by Cython's line tracing, so we flag it as nocover | ||
| # to avoid noise in the metrics. | ||
| try: | ||
| pack, _ = ATTRIBUTE_TYPES[key] | ||
| return pack(value) | ||
| except KeyError: | ||
| raise NotImplementedError("Can't pack this %s. " | ||
| "Expand ATTRIBUTE_TYPES!" % key) | ||
| cdef inline _unpack_attributes(key, value): | ||
| """Unpack a Attribute bytes array into a Python value.""" | ||
| try: | ||
| _, unpack = ATTRIBUTE_TYPES[key] | ||
| return unpack(bytes(value)) | ||
| except KeyError: | ||
| raise NotImplementedError("Can't unpack this %s. " | ||
| "Expand ATTRIBUTE_TYPES!" % key) | ||
| cdef inline object map_rv_to_error(CK_RV rv): # pragma: nocover | ||
| if rv == CKR_ATTRIBUTE_TYPE_INVALID: | ||
| exc = AttributeTypeInvalid() | ||
| elif rv == CKR_ATTRIBUTE_VALUE_INVALID: | ||
| exc = AttributeValueInvalid() | ||
| elif rv == CKR_ATTRIBUTE_READ_ONLY: | ||
| exc = AttributeReadOnly() | ||
| elif rv == CKR_ATTRIBUTE_SENSITIVE: | ||
| exc = AttributeSensitive() | ||
| elif rv == CKR_ARGUMENTS_BAD: | ||
| exc = ArgumentsBad() | ||
| elif rv == CKR_BUFFER_TOO_SMALL: | ||
| exc = PKCS11Error("Buffer was too small. Should never see this.") | ||
| elif rv == CKR_CRYPTOKI_ALREADY_INITIALIZED: | ||
| exc = PKCS11Error("Initialisation error (already initialized). Should never see this.") | ||
| elif rv == CKR_CRYPTOKI_NOT_INITIALIZED: | ||
| exc = PKCS11Error("Initialisation error (not initialized). Should never see this.") | ||
| elif rv == CKR_DATA_INVALID: | ||
| exc = DataInvalid() | ||
| elif rv == CKR_DATA_LEN_RANGE: | ||
| exc = DataLenRange() | ||
| elif rv == CKR_DOMAIN_PARAMS_INVALID: | ||
| exc = DomainParamsInvalid() | ||
| elif rv == CKR_DEVICE_ERROR: | ||
| exc = DeviceError() | ||
| elif rv == CKR_DEVICE_MEMORY: | ||
| exc = DeviceMemory() | ||
| elif rv == CKR_DEVICE_REMOVED: | ||
| exc = DeviceRemoved() | ||
| elif rv == CKR_ENCRYPTED_DATA_INVALID: | ||
| exc = EncryptedDataInvalid() | ||
| elif rv == CKR_ENCRYPTED_DATA_LEN_RANGE: | ||
| exc = EncryptedDataLenRange() | ||
| elif rv == CKR_EXCEEDED_MAX_ITERATIONS: | ||
| exc = ExceededMaxIterations() | ||
| elif rv == CKR_FUNCTION_CANCELED: | ||
| exc = FunctionCancelled() | ||
| elif rv == CKR_FUNCTION_FAILED: | ||
| exc = FunctionFailed() | ||
| elif rv == CKR_FUNCTION_REJECTED: | ||
| exc = FunctionRejected() | ||
| elif rv == CKR_FUNCTION_NOT_SUPPORTED: | ||
| exc = FunctionNotSupported() | ||
| elif rv == CKR_KEY_HANDLE_INVALID: | ||
| exc = KeyHandleInvalid() | ||
| elif rv == CKR_KEY_INDIGESTIBLE: | ||
| exc = KeyIndigestible() | ||
| elif rv == CKR_KEY_NEEDED: | ||
| exc = KeyNeeded() | ||
| elif rv == CKR_KEY_NOT_NEEDED: | ||
| exc = KeyNotNeeded() | ||
| elif rv == CKR_KEY_SIZE_RANGE: | ||
| exc = KeySizeRange() | ||
| elif rv == CKR_KEY_NOT_WRAPPABLE: | ||
| exc = KeyNotWrappable() | ||
| elif rv == CKR_KEY_TYPE_INCONSISTENT: | ||
| exc = KeyTypeInconsistent() | ||
| elif rv == CKR_KEY_UNEXTRACTABLE: | ||
| exc = KeyUnextractable() | ||
| elif rv == CKR_GENERAL_ERROR: | ||
| exc = GeneralError() | ||
| elif rv == CKR_HOST_MEMORY: | ||
| exc = HostMemory() | ||
| elif rv == CKR_MECHANISM_INVALID: | ||
| exc = MechanismInvalid() | ||
| elif rv == CKR_MECHANISM_PARAM_INVALID: | ||
| exc = MechanismParamInvalid() | ||
| elif rv == CKR_NO_EVENT: | ||
| exc = NoEvent() | ||
| elif rv == CKR_OBJECT_HANDLE_INVALID: | ||
| exc = ObjectHandleInvalid() | ||
| elif rv == CKR_OPERATION_ACTIVE: | ||
| exc = OperationActive() | ||
| elif rv == CKR_OPERATION_NOT_INITIALIZED: | ||
| exc = OperationNotInitialized() | ||
| elif rv == CKR_PIN_EXPIRED: | ||
| exc = PinExpired() | ||
| elif rv == CKR_PIN_INCORRECT: | ||
| exc = PinIncorrect() | ||
| elif rv == CKR_PIN_INVALID: | ||
| exc = PinInvalid() | ||
| elif rv == CKR_PIN_LEN_RANGE: | ||
| exc = PinLenRange() | ||
| elif rv == CKR_PIN_LOCKED: | ||
| exc = PinLocked() | ||
| elif rv == CKR_PIN_TOO_WEAK: | ||
| exc = PinTooWeak() | ||
| elif rv == CKR_PUBLIC_KEY_INVALID: | ||
| exc = PublicKeyInvalid() | ||
| elif rv == CKR_RANDOM_NO_RNG: | ||
| exc = RandomNoRNG() | ||
| elif rv == CKR_RANDOM_SEED_NOT_SUPPORTED: | ||
| exc = RandomSeedNotSupported() | ||
| elif rv == CKR_SESSION_CLOSED: | ||
| exc = SessionClosed() | ||
| elif rv == CKR_SESSION_COUNT: | ||
| exc = SessionCount() | ||
| elif rv == CKR_SESSION_EXISTS: | ||
| exc = SessionExists() | ||
| elif rv == CKR_SESSION_HANDLE_INVALID: | ||
| exc = SessionHandleInvalid() | ||
| elif rv == CKR_SESSION_PARALLEL_NOT_SUPPORTED: | ||
| exc = PKCS11Error("Parallel not supported. Should never see this.") | ||
| elif rv == CKR_SESSION_READ_ONLY: | ||
| exc = SessionReadOnly() | ||
| elif rv == CKR_SESSION_READ_ONLY_EXISTS: | ||
| exc = SessionReadOnlyExists() | ||
| elif rv == CKR_SESSION_READ_WRITE_SO_EXISTS: | ||
| exc = SessionReadWriteSOExists() | ||
| elif rv == CKR_SIGNATURE_LEN_RANGE: | ||
| exc = SignatureLenRange() | ||
| elif rv == CKR_SIGNATURE_INVALID: | ||
| exc = SignatureInvalid() | ||
| elif rv == CKR_TEMPLATE_INCOMPLETE: | ||
| exc = TemplateIncomplete() | ||
| elif rv == CKR_TEMPLATE_INCONSISTENT: | ||
| exc = TemplateInconsistent() | ||
| elif rv == CKR_SLOT_ID_INVALID: | ||
| exc = SlotIDInvalid() | ||
| elif rv == CKR_TOKEN_NOT_PRESENT: | ||
| exc = TokenNotPresent() | ||
| elif rv == CKR_TOKEN_NOT_RECOGNIZED: | ||
| exc = TokenNotRecognised() | ||
| elif rv == CKR_TOKEN_WRITE_PROTECTED: | ||
| exc = TokenWriteProtected() | ||
| elif rv == CKR_UNWRAPPING_KEY_HANDLE_INVALID: | ||
| exc = UnwrappingKeyHandleInvalid() | ||
| elif rv == CKR_UNWRAPPING_KEY_SIZE_RANGE: | ||
| exc = UnwrappingKeySizeRange() | ||
| elif rv == CKR_UNWRAPPING_KEY_TYPE_INCONSISTENT: | ||
| exc = UnwrappingKeyTypeInconsistent() | ||
| elif rv == CKR_USER_NOT_LOGGED_IN: | ||
| exc = UserNotLoggedIn() | ||
| elif rv == CKR_USER_ALREADY_LOGGED_IN: | ||
| exc = UserAlreadyLoggedIn() | ||
| elif rv == CKR_USER_ANOTHER_ALREADY_LOGGED_IN: | ||
| exc = AnotherUserAlreadyLoggedIn() | ||
| elif rv == CKR_USER_PIN_NOT_INITIALIZED: | ||
| exc = UserPinNotInitialized() | ||
| elif rv == CKR_USER_TOO_MANY_TYPES: | ||
| exc = UserTooManyTypes() | ||
| elif rv == CKR_USER_TYPE_INVALID: | ||
| exc = PKCS11Error("User type invalid. Should never see this.") | ||
| elif rv == CKR_WRAPPED_KEY_INVALID: | ||
| exc = WrappedKeyInvalid() | ||
| elif rv == CKR_WRAPPED_KEY_LEN_RANGE: | ||
| exc = WrappedKeyLenRange() | ||
| elif rv == CKR_WRAPPING_KEY_HANDLE_INVALID: | ||
| exc = WrappingKeyHandleInvalid() | ||
| elif rv == CKR_WRAPPING_KEY_SIZE_RANGE: | ||
| exc = WrappingKeySizeRange() | ||
| elif rv == CKR_WRAPPING_KEY_TYPE_INCONSISTENT: | ||
| exc = WrappingKeyTypeInconsistent() | ||
| else: | ||
| exc = PKCS11Error("Unmapped error code %s" % hex(rv)) | ||
| return exc |
+27
-0
@@ -88,2 +88,4 @@ """ | ||
| """Object label (:class:`str`).""" | ||
| UNIQUE_ID = 0x00000004 | ||
| """Unique identifier (:class:`str`).""" | ||
| APPLICATION = 0x00000010 | ||
@@ -516,1 +518,26 @@ VALUE = 0x00000011 | ||
| ERROR_STATE = 0x01000000 | ||
| @unique | ||
| class CancelStrategy(IntEnum): | ||
| """ | ||
| Strategy to cancel cryptographic operations | ||
| """ | ||
| DEFAULT = 0 | ||
| """ | ||
| Default strategy: attempt to cancel by finalising the operation using dummy data, | ||
| regardless of whether the operation is successful or not. | ||
| This approach should work universally, but is semantically difficult to identify | ||
| as a cancellation. | ||
| """ | ||
| CANCEL_WITH_INIT = 1 | ||
| """ | ||
| Attempt to cancel by calling the ``C_XYZInit`` function of the running operation | ||
| with a ``NULL`` mechanism. | ||
| This usage is defined in PKCS#11 3.0 but not universally implemented. | ||
| """ | ||
| # TODO support cancelling with C_SessionCancel on PKCS#11 3.0 and up |
+2
-93
@@ -8,12 +8,6 @@ """ | ||
| from datetime import datetime | ||
| from struct import Struct | ||
| from .constants import ( | ||
| Attribute, | ||
| CertificateType, | ||
| from pkcs11.constants import ( | ||
| MechanismFlag, | ||
| ObjectClass, | ||
| ) | ||
| from .mechanisms import MGF, KeyType, Mechanism | ||
| from pkcs11.mechanisms import MGF, KeyType, Mechanism | ||
@@ -114,86 +108,1 @@ DEFAULT_GENERATE_MECHANISMS = { | ||
| """ | ||
| # (Pack Function, Unpack Function) functions | ||
| _bool = (Struct("?").pack, lambda v: Struct("?").unpack(v)[0]) | ||
| _ulong = (Struct("L").pack, lambda v: Struct("L").unpack(v)[0]) | ||
| _str = (lambda s: s.encode("utf-8"), lambda b: b.decode("utf-8")) | ||
| _date = ( | ||
| lambda s: s.strftime("%Y%m%d").encode("ascii"), | ||
| lambda s: datetime.strptime(s.decode("ascii"), "%Y%m%d").date(), | ||
| ) | ||
| _bytes = (bytes, bytes) | ||
| # The PKCS#11 biginteger type is an array of bytes in network byte order. | ||
| # If you have an int type, wrap it in biginteger() | ||
| _biginteger = _bytes | ||
| def _enum(type_): | ||
| """Factory to pack/unpack intos into IntEnums.""" | ||
| pack, unpack = _ulong | ||
| return (lambda v: pack(int(v)), lambda v: type_(unpack(v))) | ||
| ATTRIBUTE_TYPES = { | ||
| Attribute.ALWAYS_AUTHENTICATE: _bool, | ||
| Attribute.ALWAYS_SENSITIVE: _bool, | ||
| Attribute.APPLICATION: _str, | ||
| Attribute.BASE: _biginteger, | ||
| Attribute.CERTIFICATE_TYPE: _enum(CertificateType), | ||
| Attribute.CHECK_VALUE: _bytes, | ||
| Attribute.CLASS: _enum(ObjectClass), | ||
| Attribute.COEFFICIENT: _biginteger, | ||
| Attribute.DECRYPT: _bool, | ||
| Attribute.DERIVE: _bool, | ||
| Attribute.EC_PARAMS: _bytes, | ||
| Attribute.EC_POINT: _bytes, | ||
| Attribute.ENCRYPT: _bool, | ||
| Attribute.END_DATE: _date, | ||
| Attribute.EXPONENT_1: _biginteger, | ||
| Attribute.EXPONENT_2: _biginteger, | ||
| Attribute.EXTRACTABLE: _bool, | ||
| Attribute.HASH_OF_ISSUER_PUBLIC_KEY: _bytes, | ||
| Attribute.HASH_OF_SUBJECT_PUBLIC_KEY: _bytes, | ||
| Attribute.ID: _bytes, | ||
| Attribute.ISSUER: _bytes, | ||
| Attribute.KEY_GEN_MECHANISM: _enum(Mechanism), | ||
| Attribute.KEY_TYPE: _enum(KeyType), | ||
| Attribute.LABEL: _str, | ||
| Attribute.LOCAL: _bool, | ||
| Attribute.MODIFIABLE: _bool, | ||
| Attribute.COPYABLE: _bool, | ||
| Attribute.MODULUS: _biginteger, | ||
| Attribute.MODULUS_BITS: _ulong, | ||
| Attribute.NEVER_EXTRACTABLE: _bool, | ||
| Attribute.OBJECT_ID: _bytes, | ||
| Attribute.PRIME: _biginteger, | ||
| Attribute.PRIME_BITS: _ulong, | ||
| Attribute.PRIME_1: _biginteger, | ||
| Attribute.PRIME_2: _biginteger, | ||
| Attribute.PRIVATE: _bool, | ||
| Attribute.PRIVATE_EXPONENT: _biginteger, | ||
| Attribute.PUBLIC_EXPONENT: _biginteger, | ||
| Attribute.SENSITIVE: _bool, | ||
| Attribute.SERIAL_NUMBER: _bytes, | ||
| Attribute.SIGN: _bool, | ||
| Attribute.SIGN_RECOVER: _bool, | ||
| Attribute.START_DATE: _date, | ||
| Attribute.SUBJECT: _bytes, | ||
| Attribute.SUBPRIME: _biginteger, | ||
| Attribute.SUBPRIME_BITS: _ulong, | ||
| Attribute.TOKEN: _bool, | ||
| Attribute.TRUSTED: _bool, | ||
| Attribute.UNWRAP: _bool, | ||
| Attribute.URL: _str, | ||
| Attribute.VALUE: _biginteger, | ||
| Attribute.VALUE_BITS: _ulong, | ||
| Attribute.VALUE_LEN: _ulong, | ||
| Attribute.VERIFY: _bool, | ||
| Attribute.VERIFY_RECOVER: _bool, | ||
| Attribute.WRAP: _bool, | ||
| Attribute.WRAP_WITH_TRUSTED: _bool, | ||
| } | ||
| """ | ||
| Map of attributes to (serialize, deserialize) functions. | ||
| """ |
@@ -13,8 +13,2 @@ """ | ||
| class AlreadyInitialized(PKCS11Error): | ||
| """ | ||
| pkcs11 was already initialized with another library. | ||
| """ | ||
| class AnotherUserAlreadyLoggedIn(PKCS11Error): | ||
@@ -21,0 +15,0 @@ pass |
+21
-0
| from enum import IntEnum | ||
| from pkcs11.exceptions import ArgumentsBad | ||
| class KeyType(IntEnum): | ||
@@ -797,1 +799,20 @@ """ | ||
| return "<MGF.%s>" % self.name | ||
| class GCMParams: | ||
| def __init__(self, nonce, aad=None, tag_bits=128): | ||
| if len(nonce) > 12: | ||
| raise ArgumentsBad("IV must be less than 12 bytes") | ||
| self.nonce = nonce | ||
| self.aad = aad | ||
| self.tag_bits = tag_bits | ||
| class CTRParams: | ||
| def __init__(self, nonce): | ||
| if len(nonce) >= 16: | ||
| raise ArgumentsBad( | ||
| f"{nonce.hex()} is too long to serve as a CTR nonce, must be 15 bytes or less " | ||
| f"to leave room for the block counter." | ||
| ) | ||
| self.nonce = nonce |
+217
-169
@@ -8,18 +8,11 @@ """ | ||
| from binascii import hexlify | ||
| from threading import RLock | ||
| from functools import cached_property | ||
| try: | ||
| from functools import cached_property | ||
| except ImportError: | ||
| from cached_property import cached_property | ||
| from .constants import ( | ||
| from pkcs11 import CancelStrategy | ||
| from pkcs11.constants import ( | ||
| Attribute, | ||
| MechanismFlag, | ||
| ObjectClass, | ||
| SlotFlag, | ||
| TokenFlag, | ||
| UserType, | ||
| ) | ||
| from .exceptions import ( | ||
| from pkcs11.exceptions import ( | ||
| ArgumentsBad, | ||
@@ -32,3 +25,3 @@ AttributeTypeInvalid, | ||
| ) | ||
| from .mechanisms import KeyType, Mechanism | ||
| from pkcs11.mechanisms import KeyType, Mechanism | ||
@@ -39,7 +32,25 @@ PROTECTED_AUTH = object() | ||
| class IdentifiedBy: | ||
| __slots__ = () | ||
| def _identity(self): | ||
| raise NotImplementedError() | ||
| def __eq__(self, other): | ||
| return isinstance(other, IdentifiedBy) and self._identity() == other._identity() | ||
| def __hash__(self): | ||
| return hash(self._identity()) | ||
| def _CK_UTF8CHAR_to_str(data): | ||
| """Convert CK_UTF8CHAR to string.""" | ||
| return data.rstrip(b"\0").decode("utf-8").rstrip() | ||
| """ | ||
| Convert CK_UTF8CHAR to string. | ||
| Substitutes invalid bytes with the replacement character to make usage more | ||
| robust with non-compliant tokens. | ||
| """ | ||
| return data.rstrip(b"\0").decode("utf-8", errors="replace").rstrip() | ||
| def _CK_VERSION_to_tuple(data): | ||
@@ -91,3 +102,3 @@ """Convert CK_VERSION to tuple.""" | ||
| class Slot: | ||
| class Slot(IdentifiedBy): | ||
| """ | ||
@@ -101,27 +112,38 @@ A PKCS#11 device slot. | ||
| def __init__( | ||
| self, | ||
| lib, | ||
| slot_id, | ||
| slotDescription=None, | ||
| manufacturerID=None, | ||
| hardwareVersion=None, | ||
| firmwareVersion=None, | ||
| flags=None, | ||
| **kwargs, | ||
| ): | ||
| self._lib = lib # Hold a reference to the lib to prevent gc | ||
| __slots__ = () | ||
| self.slot_id = slot_id | ||
| @property | ||
| def flags(self): | ||
| """Capabilities of this slot (:class:`SlotFlag`).""" | ||
| raise NotImplementedError() | ||
| @property | ||
| def hardware_version(self): | ||
| """Hardware version (:class:`tuple`).""" | ||
| raise NotImplementedError() | ||
| @property | ||
| def firmware_version(self): | ||
| """Firmware version (:class:`tuple`).""" | ||
| raise NotImplementedError() | ||
| @property | ||
| def cryptoki_version(self): | ||
| """PKCS#11 API version (:class: `tuple`)""" | ||
| raise NotImplementedError() | ||
| @property | ||
| def slot_id(self): | ||
| """Slot identifier (opaque).""" | ||
| self.slot_description = _CK_UTF8CHAR_to_str(slotDescription) | ||
| raise NotImplementedError() | ||
| @property | ||
| def slot_description(self): | ||
| """Slot name (:class:`str`).""" | ||
| self.manufacturer_id = _CK_UTF8CHAR_to_str(manufacturerID) | ||
| raise NotImplementedError() | ||
| @property | ||
| def manufacturer_id(self): | ||
| """Slot/device manufacturer's name (:class:`str`).""" | ||
| self.hardware_version = _CK_VERSION_to_tuple(hardwareVersion) | ||
| """Hardware version (:class:`tuple`).""" | ||
| self.firmware_version = _CK_VERSION_to_tuple(firmwareVersion) | ||
| """Firmware version (:class:`tuple`).""" | ||
| self.flags = SlotFlag(flags) | ||
| """Capabilities of this slot (:class:`SlotFlag`).""" | ||
| raise NotImplementedError() | ||
@@ -153,23 +175,4 @@ def get_token(self): | ||
| def __eq__(self, other): | ||
| return self.slot_id == other.slot_id | ||
| def __str__(self): | ||
| return "\n".join( | ||
| ( | ||
| "Slot Description: %s" % self.slot_description, | ||
| "Manufacturer ID: %s" % self.manufacturer_id, | ||
| "Hardware Version: %s.%s" % self.hardware_version, | ||
| "Firmware Version: %s.%s" % self.firmware_version, | ||
| "Flags: %s" % self.flags, | ||
| ) | ||
| ) | ||
| def __repr__(self): | ||
| return "<{klass} (slotID={slot_id} flags={flags})>".format( | ||
| klass=type(self).__name__, slot_id=self.slot_id, flags=str(self.flags) | ||
| ) | ||
| class Token: | ||
| class Token(IdentifiedBy): | ||
| """ | ||
@@ -182,35 +185,53 @@ A PKCS#11 token. | ||
| def __init__( | ||
| self, | ||
| slot, | ||
| label=None, | ||
| serialNumber=None, | ||
| model=None, | ||
| manufacturerID=None, | ||
| hardwareVersion=None, | ||
| firmwareVersion=None, | ||
| flags=None, | ||
| **kwargs, | ||
| ): | ||
| self.slot = slot | ||
| __slots__ = () | ||
| @property | ||
| def flags(self): | ||
| """Capabilities of this token (:class:`TokenFlag`).""" | ||
| raise NotImplementedError() | ||
| @property | ||
| def hardware_version(self): | ||
| """Hardware version (:class:`tuple`).""" | ||
| raise NotImplementedError() | ||
| @property | ||
| def firmware_version(self): | ||
| """Firmware version (:class:`tuple`).""" | ||
| raise NotImplementedError() | ||
| @property | ||
| def slot(self): | ||
| """The :class:`Slot` this token is installed in.""" | ||
| self.label = _CK_UTF8CHAR_to_str(label) | ||
| raise NotImplementedError() | ||
| @property | ||
| def label(self): | ||
| """Label of this token (:class:`str`).""" | ||
| self.serial = serialNumber.rstrip() | ||
| raise NotImplementedError() | ||
| @property | ||
| def serial(self): | ||
| """Serial number of this token (:class:`bytes`).""" | ||
| self.manufacturer_id = _CK_UTF8CHAR_to_str(manufacturerID) | ||
| """Manufacturer ID.""" | ||
| self.model = _CK_UTF8CHAR_to_str(model) | ||
| """Model name.""" | ||
| self.hardware_version = _CK_VERSION_to_tuple(hardwareVersion) | ||
| """Hardware version (:class:`tuple`).""" | ||
| self.firmware_version = _CK_VERSION_to_tuple(firmwareVersion) | ||
| """Firmware version (:class:`tuple`).""" | ||
| self.flags = TokenFlag(flags) | ||
| """Capabilities of this token (:class:`pkcs11.flags.TokenFlag`).""" | ||
| raise NotImplementedError() | ||
| def __eq__(self, other): | ||
| return self.slot == other.slot | ||
| @property | ||
| def manufacturer_id(self): | ||
| """Manufacturer ID (:class:`str`).""" | ||
| raise NotImplementedError() | ||
| def open(self, rw=False, user_pin=None, so_pin=None, user_type=None): | ||
| @property | ||
| def model(self): | ||
| """Model name (:class:`str`).""" | ||
| raise NotImplementedError() | ||
| def open( | ||
| self, | ||
| rw=False, | ||
| user_pin=None, | ||
| so_pin=None, | ||
| user_type=None, | ||
| attribute_mapper=None, | ||
| cancel_strategy=CancelStrategy.DEFAULT, | ||
| ): | ||
| """ | ||
@@ -236,2 +257,6 @@ Open a session on the token and optionally log in as a user or | ||
| so_pin is set, otherwise UserType.USER. | ||
| :param attribute_mapper: | ||
| Optionally pass in a custom :class:`pkcs11.attributes.AttributeMapper`. | ||
| :param cancel_strategy: | ||
| Cancellation strategy for interrupted cryptographic operations. | ||
@@ -242,12 +267,4 @@ :rtype: Session | ||
| def __str__(self): | ||
| return self.label | ||
| def __repr__(self): | ||
| return "<{klass} (label='{label}' serial={serial} flags={flags})>".format( | ||
| klass=type(self).__name__, label=self.label, serial=self.serial, flags=str(self.flags) | ||
| ) | ||
| class Session: | ||
| class Session(IdentifiedBy): | ||
| """ | ||
@@ -263,23 +280,4 @@ A PKCS#11 :class:`Token` session. | ||
| def __init__(self, token, handle, rw=False, user_type=UserType.NOBODY): | ||
| self.token = token | ||
| """:class:`Token` this session is on.""" | ||
| __slots__ = () | ||
| self._handle = handle | ||
| # Big operation lock prevents other threads from entering/reentering | ||
| # operations. If the same thread enters the lock, they will get a | ||
| # Cryptoki warning | ||
| self._operation_lock = RLock() | ||
| self.rw = rw | ||
| """True if this is a read/write session.""" | ||
| self.user_type = user_type | ||
| """User type for this session (:class:`pkcs11.constants.UserType`).""" | ||
| def __eq__(self, other): | ||
| return self.token == other.token and self._handle == other._handle | ||
| def __hash__(self): | ||
| return hash(self._handle) | ||
| def __enter__(self): | ||
@@ -291,2 +289,12 @@ return self | ||
| @property | ||
| def token(self): | ||
| """:class:`Token` this session is on.""" | ||
| raise NotImplementedError() | ||
| @property | ||
| def rw(self): | ||
| """True if this is a read/write session.""" | ||
| raise NotImplementedError() | ||
| def close(self): | ||
@@ -296,2 +304,5 @@ """Close the session.""" | ||
| def reaffirm_credentials(self, pin): | ||
| raise NotImplementedError() | ||
| def get_key(self, object_class=None, key_type=None, label=None, id=None): | ||
@@ -333,19 +344,14 @@ """ | ||
| iterator = self.get_objects(attrs) | ||
| try: | ||
| key = next(iterator) | ||
| except StopIteration as ex: | ||
| raise NoSuchKey("No key matching %s" % attrs) from ex | ||
| try: | ||
| try: | ||
| key = next(iterator) | ||
| except StopIteration as ex: | ||
| raise NoSuchKey("No key matching %s" % attrs) from ex | ||
| next(iterator) | ||
| raise MultipleObjectsReturned("More than 1 key matches %s" % attrs) | ||
| except StopIteration: | ||
| pass | ||
| return key | ||
| try: | ||
| next(iterator) | ||
| raise MultipleObjectsReturned("More than 1 key matches %s" % attrs) | ||
| except StopIteration: | ||
| return key | ||
| finally: | ||
| # Force finalizing SearchIter rather than waiting for garbage | ||
| # collection, so that we release the operation lock. | ||
| iterator._finalize() | ||
| def get_objects(self, attrs=None): | ||
@@ -452,3 +458,3 @@ """ | ||
| :param KeyType key_type: Key type these parameters are for | ||
| :param int params_length: Size of the parameters (e.g. prime length) | ||
| :param int param_length: Size of the parameters (e.g. prime length) | ||
| in bits. | ||
@@ -538,2 +544,5 @@ :param store: Store these parameters in the HSM | ||
| def _generate_keypair(self, key_type, key_length=None, **kwargs): | ||
| raise NotImplementedError() | ||
| def seed_random(self, seed): | ||
@@ -585,4 +594,23 @@ """ | ||
| def set_pin(self, old_pin, new_pin): | ||
| """Change the user pin.""" | ||
| raise NotImplementedError() | ||
| class Object: | ||
| def init_pin(self, pin): | ||
| """ | ||
| Initializes the user PIN. | ||
| Differs from set_pin in that it sets the user PIN for the first time. | ||
| Once set, the pin can be changed using set_pin. | ||
| """ | ||
| raise NotImplementedError() | ||
| def _digest(self, data, mechanism=None, mechanism_param=None): | ||
| raise NotImplementedError() | ||
| def _digest_generator(self, data, mechanism=None, mechanism_param=None): | ||
| raise NotImplementedError() | ||
| class Object(IdentifiedBy): | ||
| """ | ||
@@ -602,13 +630,19 @@ A PKCS#11 object residing on a :class:`Token`. | ||
| def __init__(self, session, handle): | ||
| self.session = session | ||
| """:class:`Session` this object is valid for.""" | ||
| self._handle = handle | ||
| @property | ||
| def session(self): | ||
| raise NotImplementedError() | ||
| def __eq__(self, other): | ||
| return self.session == other.session and self._handle == other._handle | ||
| @property | ||
| def handle(self): | ||
| raise NotImplementedError() | ||
| def __hash__(self): | ||
| return hash((self.session, self._handle)) | ||
| def __getitem__(self, key): | ||
| raise NotImplementedError() | ||
| def __setitem__(self, key, value): | ||
| raise NotImplementedError() | ||
| def get_attributes(self, keys): | ||
| raise NotImplementedError() | ||
| def copy(self, attrs): | ||
@@ -657,21 +691,2 @@ """ | ||
| def __init__(self, session, handle, params=None): | ||
| super().__init__(session, handle) | ||
| self.params = params | ||
| def __getitem__(self, key): | ||
| if self._handle is None: | ||
| try: | ||
| return self.params[key] | ||
| except KeyError as ex: | ||
| raise AttributeTypeInvalid from ex | ||
| else: | ||
| return super().__getitem__(key) | ||
| def __setitem__(self, key, value): | ||
| if self._handle is None: | ||
| self.params[key] = value | ||
| else: | ||
| super().__setitem__(key, value) | ||
| @cached_property | ||
@@ -715,5 +730,43 @@ def key_type(self): | ||
| class Key(Object): | ||
| class LocalDomainParameters(DomainParameters): | ||
| def __init__(self, session, params): | ||
| self._session = session | ||
| self.params = params | ||
| @property | ||
| def session(self): | ||
| return self._session | ||
| @property | ||
| def handle(self): | ||
| return None | ||
| def __getitem__(self, key): | ||
| try: | ||
| return self.params[key] | ||
| except KeyError as ex: | ||
| raise AttributeTypeInvalid from ex | ||
| def get_attributes(self, keys): | ||
| return {key: self.params[key] for key in keys if key in self.params} | ||
| def __setitem__(self, key, value): | ||
| self.params[key] = value | ||
| class HasKeyType(Object): | ||
| @cached_property | ||
| def key_type(self): | ||
| """Key type (:class:`pkcs11.mechanisms.KeyType`).""" | ||
| return self[Attribute.KEY_TYPE] | ||
| class Key(HasKeyType): | ||
| """Base class for all key :class:`Object` types.""" | ||
| @property | ||
| def key_length(self): | ||
| """Key length in bits.""" | ||
| raise NotImplementedError | ||
| @cached_property | ||
@@ -730,7 +783,2 @@ def id(self): | ||
| @cached_property | ||
| def key_type(self): | ||
| """Key type (:class:`pkcs11.mechanisms.KeyType`).""" | ||
| return self[Attribute.KEY_TYPE] | ||
| @cached_property | ||
| def _key_description(self): | ||
@@ -831,3 +879,3 @@ """A description of the key.""" | ||
| class EncryptMixin(Object): | ||
| class EncryptMixin(HasKeyType): | ||
| """ | ||
@@ -917,3 +965,3 @@ This :class:`Object` supports the encrypt capability. | ||
| class DecryptMixin(Object): | ||
| class DecryptMixin(HasKeyType): | ||
| """ | ||
@@ -950,3 +998,3 @@ This :class:`Object` supports the decrypt capability. | ||
| class SignMixin(Object): | ||
| class SignMixin(HasKeyType): | ||
| """ | ||
@@ -988,3 +1036,3 @@ This :class:`Object` supports the sign capability. | ||
| class VerifyMixin(Object): | ||
| class VerifyMixin(HasKeyType): | ||
| """ | ||
@@ -1033,3 +1081,3 @@ This :class:`Object` supports the verify capability. | ||
| class WrapMixin(Object): | ||
| class WrapMixin(HasKeyType): | ||
| """ | ||
@@ -1055,3 +1103,3 @@ This :class:`Object` supports the wrap capability. | ||
| class UnwrapMixin(Object): | ||
| class UnwrapMixin(HasKeyType): | ||
| """ | ||
@@ -1095,3 +1143,3 @@ This :class:`Object` supports the unwrap capability. | ||
| class DeriveMixin(Object): | ||
| class DeriveMixin(HasKeyType): | ||
| """ | ||
@@ -1098,0 +1146,0 @@ This :class:`Object` supports the derive capability. |
@@ -8,4 +8,4 @@ """ | ||
| from ..constants import Attribute | ||
| from . import biginteger | ||
| from pkcs11.constants import Attribute | ||
| from pkcs11.util import biginteger | ||
@@ -12,0 +12,0 @@ |
@@ -9,4 +9,4 @@ """ | ||
| from ..constants import Attribute | ||
| from . import biginteger | ||
| from pkcs11.constants import Attribute | ||
| from pkcs11.util import biginteger | ||
@@ -13,0 +13,0 @@ |
@@ -15,4 +15,4 @@ """ | ||
| from ..constants import Attribute, ObjectClass | ||
| from ..mechanisms import KeyType | ||
| from pkcs11.constants import Attribute, ObjectClass | ||
| from pkcs11.mechanisms import KeyType | ||
@@ -19,0 +19,0 @@ |
@@ -7,6 +7,6 @@ """ | ||
| from ..constants import Attribute, MechanismFlag, ObjectClass | ||
| from ..defaults import DEFAULT_KEY_CAPABILITIES | ||
| from ..mechanisms import KeyType | ||
| from . import biginteger | ||
| from pkcs11.constants import Attribute, MechanismFlag, ObjectClass | ||
| from pkcs11.defaults import DEFAULT_KEY_CAPABILITIES | ||
| from pkcs11.mechanisms import KeyType | ||
| from pkcs11.util import biginteger | ||
@@ -13,0 +13,0 @@ |
@@ -5,6 +5,7 @@ """ | ||
| from asn1crypto.core import OctetString | ||
| from asn1crypto.x509 import Certificate | ||
| from ..constants import Attribute, CertificateType, ObjectClass | ||
| from ..mechanisms import KeyType | ||
| from pkcs11.constants import Attribute, CertificateType, ObjectClass | ||
| from pkcs11.mechanisms import KeyType | ||
@@ -59,2 +60,6 @@ | ||
| # bytes(key_info['public_key']) returns the binary encoding | ||
| # of the EC point itself (decoded from its BitString representation in X.509), | ||
| # but PKCS#11 expects this as a DER OctetString. | ||
| key = OctetString(key).dump() | ||
| attrs.update( | ||
@@ -61,0 +66,0 @@ { |
+3
-2
| Metadata-Version: 2.4 | ||
| Name: python-pkcs11 | ||
| Version: 0.8.1 | ||
| Version: 0.9.0 | ||
| Summary: PKCS#11 support for Python | ||
@@ -48,3 +48,4 @@ Author-email: Andrey Kislyuk <kislyuk@gmail.com>, Danielle Madeley <danielle@madeley.id.au> | ||
| HSM platforms, but this test setup has not been maintained over time. Currently, | ||
| the integration tests in GitHub Actions use SoftHSMv2 as a baseline. If you would like | ||
| the integration tests in GitHub Actions use SoftHSMv2 as a baseline. | ||
| We also test against ``opencryptoki`` in CI. If you would like | ||
| to contribute some CI setup with additional PKCS#11 implementations or actual HSMs, | ||
@@ -51,0 +52,0 @@ let's chat! |
+23
-0
@@ -70,2 +70,19 @@ [build-system] | ||
| [tool.coverage.run] | ||
| plugins = ["Cython.Coverage"] | ||
| [tool.coverage.report] | ||
| exclude_lines = [ | ||
| "pragma: no cover", | ||
| "pragma: nocover", | ||
| "raise AssertionError", | ||
| "raise NotImplementedError", | ||
| "raise MemoryError", | ||
| "raise TypeError", | ||
| "TYPE_CHECKING", | ||
| "^\\s*\\.\\.\\.", | ||
| "noqa" | ||
| ] | ||
| precision = 2 | ||
| [dependency-groups] | ||
@@ -77,2 +94,8 @@ testing = [ | ||
| ] | ||
| coverage = [ | ||
| { include-group = "testing" }, | ||
| "coverage>=7.9.1", | ||
| "pytest-cov>=4.0,<6.3", | ||
| "cython", | ||
| ] | ||
| docs = [ | ||
@@ -79,0 +102,0 @@ "sphinx>=7.4.7", |
| Metadata-Version: 2.4 | ||
| Name: python-pkcs11 | ||
| Version: 0.8.1 | ||
| Version: 0.9.0 | ||
| Summary: PKCS#11 support for Python | ||
@@ -48,3 +48,4 @@ Author-email: Andrey Kislyuk <kislyuk@gmail.com>, Danielle Madeley <danielle@madeley.id.au> | ||
| HSM platforms, but this test setup has not been maintained over time. Currently, | ||
| the integration tests in GitHub Actions use SoftHSMv2 as a baseline. If you would like | ||
| the integration tests in GitHub Actions use SoftHSMv2 as a baseline. | ||
| We also test against ``opencryptoki`` in CI. If you would like | ||
| to contribute some CI setup with additional PKCS#11 implementations or actual HSMs, | ||
@@ -51,0 +52,0 @@ let's chat! |
| .gitignore | ||
| .python-version | ||
| .readthedocs.yaml | ||
| Dockerfile | ||
| LICENSE | ||
@@ -11,3 +10,6 @@ MANIFEST.in | ||
| .github/release-template.md | ||
| .github/actions/install-opencryptoki/action.yml | ||
| .github/actions/install-softhsm/action.yml | ||
| .github/actions/test-setup/action.yml | ||
| .github/workflows/coverage.yml | ||
| .github/workflows/quality.yml | ||
@@ -34,2 +36,3 @@ .github/workflows/release.yml | ||
| pkcs11/_pkcs11.pyx | ||
| pkcs11/attributes.py | ||
| pkcs11/constants.py | ||
@@ -59,2 +62,3 @@ pkcs11/defaults.py | ||
| tests/test_iterators.py | ||
| tests/test_multilib.py | ||
| tests/test_public_key_external.py | ||
@@ -61,0 +65,0 @@ tests/test_rsa.py |
+2
-1
@@ -23,3 +23,4 @@ .. image:: https://travis-ci.org/danni/python-pkcs11.svg?branch=master | ||
| HSM platforms, but this test setup has not been maintained over time. Currently, | ||
| the integration tests in GitHub Actions use SoftHSMv2 as a baseline. If you would like | ||
| the integration tests in GitHub Actions use SoftHSMv2 as a baseline. | ||
| We also test against ``opencryptoki`` in CI. If you would like | ||
| to contribute some CI setup with additional PKCS#11 implementations or actual HSMs, | ||
@@ -26,0 +27,0 @@ let's chat! |
+358
-1
@@ -8,3 +8,3 @@ """ | ||
| import pkcs11 | ||
| from pkcs11 import Mechanism | ||
| from pkcs11 import ArgumentsBad, CTRParams, GCMParams, Mechanism, PKCS11Error | ||
@@ -37,2 +37,11 @@ from . import FIXME, TestCase, requires | ||
| @requires(Mechanism.AES_CBC_PAD) | ||
| def test_encrypt_undersized_buffer(self): | ||
| data = b"INPUT DATA" * 200 | ||
| iv = b"0" * 16 | ||
| crypttext = self.key.encrypt(data, mechanism_param=iv, buffer_size=32) | ||
| text = self.key.decrypt(crypttext, mechanism_param=iv, buffer_size=32) | ||
| self.assertEqual(data, text) | ||
| @requires(Mechanism.AES_CBC_PAD) | ||
| def test_encrypt_stream(self): | ||
@@ -64,2 +73,68 @@ data = ( | ||
| @requires(Mechanism.AES_CBC_PAD) | ||
| def test_encrypt_stream_interrupt_releases_operation(self): | ||
| data = ( | ||
| b"I" * 16, | ||
| b"N" * 16, | ||
| b"P" * 16, | ||
| b"U" * 16, | ||
| b"T" * 10, | ||
| ) | ||
| iv = b"0" * 16 | ||
| def _data_with_error(): | ||
| yield data[0] | ||
| yield data[1] | ||
| yield data[2] | ||
| raise ValueError | ||
| def attempt_encrypt(): | ||
| list(self.key.encrypt(_data_with_error(), mechanism_param=iv)) | ||
| self.assertRaises(ValueError, attempt_encrypt) | ||
| cryptblocks = list(self.key.encrypt(data, mechanism_param=iv)) | ||
| text = b"".join(self.key.decrypt(cryptblocks, mechanism_param=iv)) | ||
| self.assertEqual(b"".join(data), text) | ||
| @requires(Mechanism.AES_CBC_PAD) | ||
| def test_encrypt_stream_gc_releases_operation(self): | ||
| data = ( | ||
| b"I" * 16, | ||
| b"N" * 16, | ||
| b"P" * 16, | ||
| b"U" * 16, | ||
| b"T" * 10, | ||
| ) | ||
| iv = b"0" * 16 | ||
| attempt = self.key.encrypt(data, mechanism_param=iv) | ||
| next(attempt) | ||
| del attempt | ||
| cryptblocks = list(self.key.encrypt(data, mechanism_param=iv)) | ||
| text = b"".join(self.key.decrypt(cryptblocks, mechanism_param=iv)) | ||
| self.assertEqual(b"".join(data), text) | ||
| @requires(Mechanism.AES_CBC_PAD) | ||
| def test_encrypt_stream_undersized_buffers(self): | ||
| data = ( | ||
| b"I" * 3189, | ||
| b"N" * 3284, | ||
| b"P" * 5812, | ||
| b"U" * 2139, | ||
| b"T" * 5851, | ||
| ) | ||
| iv = b"0" * 16 | ||
| cryptblocks = list(self.key.encrypt(data, mechanism_param=iv, buffer_size=256)) | ||
| self.assertEqual(len(cryptblocks), len(data) + 1) | ||
| crypttext = b"".join(cryptblocks) | ||
| self.assertNotEqual(b"".join(data), crypttext) | ||
| text = b"".join(self.key.decrypt(cryptblocks, mechanism_param=iv, buffer_size=5)) | ||
| self.assertEqual(b"".join(data), text) | ||
| @requires(Mechanism.AES_CBC_PAD) | ||
| def test_encrypt_whacky_sizes(self): | ||
@@ -393,1 +468,283 @@ data = [(char * ord(char)).encode("utf-8") for char in "HELLO WORLD"] | ||
| self.assertEqual(text, data) | ||
| @requires(Mechanism.AES_GCM) | ||
| def test_encrypt_gcm(self): | ||
| data = b"INPUT DATA" | ||
| nonce = b"0" * 12 | ||
| crypttext = self.key.encrypt( | ||
| data, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce) | ||
| ) | ||
| self.assertIsInstance(crypttext, bytes) | ||
| self.assertNotEqual(data, crypttext) | ||
| text = self.key.decrypt( | ||
| crypttext, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce) | ||
| ) | ||
| self.assertEqual(data, text) | ||
| def test_gcm_nonce_size_limit(self): | ||
| def _inst(): | ||
| return GCMParams(nonce=b"0" * 13) | ||
| self.assertRaises(ArgumentsBad, _inst) | ||
| @requires(Mechanism.AES_GCM) | ||
| def test_encrypt_gcm_with_aad(self): | ||
| data = b"INPUT DATA" | ||
| nonce = b"0" * 12 | ||
| crypttext = self.key.encrypt( | ||
| data, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce, b"foo") | ||
| ) | ||
| self.assertIsInstance(crypttext, bytes) | ||
| self.assertNotEqual(data, crypttext) | ||
| text = self.key.decrypt( | ||
| crypttext, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce, b"foo") | ||
| ) | ||
| self.assertEqual(data, text) | ||
| @requires(Mechanism.AES_GCM) | ||
| def test_encrypt_gcm_with_mismatching_nonces(self): | ||
| data = b"INPUT DATA" | ||
| nonce1 = b"0" * 12 | ||
| nonce2 = b"1" * 12 | ||
| crypttext = self.key.encrypt( | ||
| data, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce1, b"foo") | ||
| ) | ||
| self.assertIsInstance(crypttext, bytes) | ||
| self.assertNotEqual(data, crypttext) | ||
| # This should be EncryptedDataInvalid, but in practice not all tokens support this | ||
| with self.assertRaises(PKCS11Error): | ||
| self.key.decrypt( | ||
| crypttext, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce2, b"foo") | ||
| ) | ||
| @requires(Mechanism.AES_GCM) | ||
| def test_encrypt_gcm_with_mismatching_aad(self): | ||
| data = b"INPUT DATA" | ||
| nonce = b"0" * 12 | ||
| crypttext = self.key.encrypt( | ||
| data, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce, b"foo") | ||
| ) | ||
| self.assertIsInstance(crypttext, bytes) | ||
| self.assertNotEqual(data, crypttext) | ||
| with self.assertRaises(PKCS11Error): | ||
| self.key.decrypt( | ||
| crypttext, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce, b"bar") | ||
| ) | ||
| @requires(Mechanism.AES_GCM) | ||
| def test_encrypt_gcm_with_custom_tag_length(self): | ||
| data = b"INPUT DATA" | ||
| nonce = b"0" * 12 | ||
| crypttext = self.key.encrypt( | ||
| data, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce, b"foo", 120) | ||
| ) | ||
| self.assertIsInstance(crypttext, bytes) | ||
| self.assertNotEqual(data, crypttext) | ||
| text = self.key.decrypt( | ||
| crypttext, mechanism=Mechanism.AES_GCM, mechanism_param=GCMParams(nonce, b"foo", 120) | ||
| ) | ||
| self.assertEqual(data, text) | ||
| # This should be EncryptedDataInvalid, but in practice not all tokens support this | ||
| with self.assertRaises(PKCS11Error): | ||
| text = self.key.decrypt( | ||
| crypttext, | ||
| mechanism=Mechanism.AES_GCM, | ||
| mechanism_param=GCMParams(nonce, b"foo", 128), | ||
| ) | ||
| @parameterized.expand( | ||
| [ | ||
| (b""), | ||
| (b"0" * 12), | ||
| (b"0" * 15), | ||
| ] | ||
| ) | ||
| @requires(Mechanism.AES_CTR) | ||
| @FIXME.opencryptoki # opencryptoki incorrectly forces AES-CTR input to be padded | ||
| def test_encrypt_ctr(self, nonce): | ||
| data = b"INPUT DATA SEVERAL BLOCKS LONG SO THE COUNTER GOES UP A FEW TIMES" * 20 | ||
| crypttext = self.key.encrypt( | ||
| data, mechanism=Mechanism.AES_CTR, mechanism_param=CTRParams(nonce) | ||
| ) | ||
| self.assertIsInstance(crypttext, bytes) | ||
| self.assertNotEqual(data, crypttext) | ||
| text = self.key.decrypt( | ||
| crypttext, mechanism=Mechanism.AES_CTR, mechanism_param=CTRParams(nonce) | ||
| ) | ||
| self.assertEqual(data, text) | ||
| @requires(Mechanism.AES_CTR) | ||
| def test_encrypt_ctr_exactly_padded(self): | ||
| # let's still verify the "restricted" AES-CTR supported by opencryptoki | ||
| data = b"PADDED INPUT DATA TO MAKE OPENCRYPTOKI HAPPY" * 16 | ||
| nonce = b"0" * 15 | ||
| crypttext = self.key.encrypt( | ||
| data, mechanism=Mechanism.AES_CTR, mechanism_param=CTRParams(nonce) | ||
| ) | ||
| self.assertIsInstance(crypttext, bytes) | ||
| self.assertNotEqual(data, crypttext) | ||
| text = self.key.decrypt( | ||
| crypttext, mechanism=Mechanism.AES_CTR, mechanism_param=CTRParams(nonce) | ||
| ) | ||
| self.assertEqual(data, text) | ||
| def test_ctr_nonce_size_limit(self): | ||
| def _inst(): | ||
| return CTRParams(nonce=b"0" * 16) | ||
| self.assertRaises(ArgumentsBad, _inst) | ||
| @requires(Mechanism.AES_CTR) | ||
| @FIXME.opencryptoki # opencryptoki incorrectly forces AES-CTR input to be padded | ||
| def test_encrypt_ctr_nonce_mismatch(self): | ||
| data = b"INPUT DATA SEVERAL BLOCKS LONG SO THE COUNTER GOES UP A FEW TIMES" * 20 | ||
| crypttext = self.key.encrypt( | ||
| data, mechanism=Mechanism.AES_CTR, mechanism_param=CTRParams(b"0" * 12) | ||
| ) | ||
| self.assertIsInstance(crypttext, bytes) | ||
| self.assertNotEqual(data, crypttext) | ||
| text = self.key.decrypt( | ||
| crypttext, mechanism=Mechanism.AES_CTR, mechanism_param=CTRParams(b"1" * 12) | ||
| ) | ||
| self.assertNotEqual(data, text) | ||
| @parameterized.expand( | ||
| [ | ||
| ( | ||
| "ae6852f8121067cc4bf7a5765577f39e", | ||
| b"Single block msg", | ||
| "00000030", | ||
| "0000000000000000", | ||
| "e4095d4fb7a7b3792d6175a3261311b8", | ||
| ), | ||
| ( | ||
| "7e24067817fae0d743d6ce1f32539163", | ||
| bytes(range(0x20)), | ||
| "006cb6db", | ||
| "c0543b59da48d90b", | ||
| "5104a106168a72d9790d41ee8edad388eb2e1efc46da57c8fce630df9141be28", | ||
| ), | ||
| ( | ||
| "7691be035e5020a8ac6e618529f9a0dc", | ||
| bytes(range(0x24)), | ||
| "00e0017b", | ||
| "27777f3f4a1786f0", | ||
| "c1cf48a89f2ffdd9cf4652e9efdb72d74540a42bde6d7836d59a5ceaaef3105325b2072f", | ||
| ), | ||
| ( | ||
| "16af5b145fc9f579c175f93e3bfb0eed863d06ccfdb78515", | ||
| b"Single block msg", | ||
| "00000048", | ||
| "36733c147d6d93cb", | ||
| "4b55384fe259c9c84e7935a003cbe928", | ||
| ), | ||
| ( | ||
| "7c5cb2401b3dc33c19e7340819e0f69c678c3db8e6f6a91a", | ||
| bytes(range(0x20)), | ||
| "0096b03b", | ||
| "020c6eadc2cb500d", | ||
| "453243fc609b23327edfaafa7131cd9f8490701c5ad4a79cfc1fe0ff42f4fb00", | ||
| ), | ||
| ( | ||
| "02bf391ee8ecb159b959617b0965279bf59b60a786d3e0fe", | ||
| bytes(range(0x24)), | ||
| "0007bdfd", | ||
| "5cbd60278dcc0912", | ||
| "96893fc55e5c722f540b7dd1ddf7e758d288bc95c69165884536c811662f2188abee0935", | ||
| ), | ||
| ( | ||
| "776beff2851db06f4c8a0542c8696f6c6a81af1eec96b4d37fc1d689e6c1c104", | ||
| b"Single block msg", | ||
| "00000060", | ||
| "db5672c97aa8f0b2", | ||
| "145ad01dbf824ec7560863dc71e3e0c0", | ||
| ), | ||
| ( | ||
| "f6d66d6bd52d59bb0796365879eff886c66dd51a5b6a99744b50590c87a23884", | ||
| bytes(range(0x20)), | ||
| "00faac24", | ||
| "c1585ef15a43d875", | ||
| "f05e231b3894612c49ee000b804eb2a9b8306b508f839d6a5530831d9344af1c", | ||
| ), | ||
| ] | ||
| ) | ||
| # https://github.com/opencryptoki/opencryptoki/issues/881 | ||
| @FIXME.opencryptoki | ||
| @requires(Mechanism.AES_CTR) | ||
| def test_aes_ctr_test_vector(self, key, plaintext, nonce, iv, expected_ciphertext): | ||
| """Official test vectors from RFC 3686""" | ||
| key = self.session.create_object( | ||
| { | ||
| pkcs11.Attribute.CLASS: pkcs11.ObjectClass.SECRET_KEY, | ||
| pkcs11.Attribute.KEY_TYPE: pkcs11.KeyType.AES, | ||
| pkcs11.Attribute.VALUE: bytes.fromhex(key), | ||
| } | ||
| ) | ||
| params = CTRParams(bytes.fromhex(nonce) + bytes.fromhex(iv)) | ||
| ciphertext = key.encrypt(plaintext, mechanism_param=params, mechanism=Mechanism.AES_CTR) | ||
| self.assertEqual(bytes.fromhex(expected_ciphertext), ciphertext) | ||
| @parameterized.expand( | ||
| [ | ||
| ( | ||
| "00000000000000000000000000000000", | ||
| "", | ||
| "", | ||
| "000000000000000000000000", | ||
| "", | ||
| "58e2fccefa7e3061367f1d57a4e7455a", | ||
| ), | ||
| ( | ||
| "00000000000000000000000000000000", | ||
| "00000000000000000000000000000000", | ||
| "", | ||
| "000000000000000000000000", | ||
| "0388dace60b6a392f328c2b971b2fe78", | ||
| "ab6e47d42cec13bdf53a67b21257bddf", | ||
| ), | ||
| ( | ||
| "feffe9928665731c6d6a8f9467308308", | ||
| "d9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a721c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b391aafd255", | ||
| "", | ||
| "cafebabefacedbaddecaf888", | ||
| "42831ec2217774244b7221b784d0d49ce3aa212f2c02a4e035c17e2329aca12e21d514b25466931c7d8f6a5aac84aa051ba30b396a0aac973d58e091473f5985", | ||
| "4d5c2af327cd64a62cf35abd2ba6fab4", | ||
| ), | ||
| ( | ||
| "feffe9928665731c6d6a8f9467308308", | ||
| "d9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a721c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39", | ||
| "feedfacedeadbeeffeedfacedeadbeefabaddad2", | ||
| "cafebabefacedbaddecaf888", | ||
| "42831ec2217774244b7221b784d0d49ce3aa212f2c02a4e035c17e2329aca12e21d514b25466931c7d8f6a5aac84aa051ba30b396a0aac973d58e091", | ||
| "5bc94fbc3221a5db94fae95ae7121a47", | ||
| ), | ||
| ], | ||
| ) | ||
| @requires(Mechanism.AES_GCM) | ||
| def test_aes_gcm_test_vector( | ||
| self, key, plaintext, aad, nonce, expected_ciphertext, expected_tag | ||
| ): | ||
| """Some test vectors from McGrew-Viega""" | ||
| key = self.session.create_object( | ||
| { | ||
| pkcs11.Attribute.CLASS: pkcs11.ObjectClass.SECRET_KEY, | ||
| pkcs11.Attribute.KEY_TYPE: pkcs11.KeyType.AES, | ||
| pkcs11.Attribute.VALUE: bytes.fromhex(key), | ||
| } | ||
| ) | ||
| params = GCMParams(nonce=bytes.fromhex(nonce), aad=bytes.fromhex(aad)) | ||
| result = key.encrypt( | ||
| bytes.fromhex(plaintext), mechanism_param=params, mechanism=Mechanism.AES_GCM | ||
| ) | ||
| expected_output = bytes.fromhex(expected_ciphertext) + bytes.fromhex(expected_tag) | ||
| self.assertEqual(expected_output, result) |
+27
-0
@@ -76,1 +76,28 @@ """ | ||
| self.assertEqual(digest, m.digest()) | ||
| @requires(Mechanism.SHA256) | ||
| def test_digest_stream_interrupt_releases_operation(self): | ||
| data = ( | ||
| b"I" * 16, | ||
| b"N" * 16, | ||
| b"P" * 16, | ||
| b"U" * 16, | ||
| b"T" * 10, | ||
| ) | ||
| def _data_with_error(): | ||
| yield data[0] | ||
| yield data[1] | ||
| yield data[2] | ||
| raise ValueError | ||
| def attempt_digest(): | ||
| self.session.digest(_data_with_error(), mechanism=Mechanism.SHA256) | ||
| self.assertRaises(ValueError, attempt_digest) | ||
| # ...try again | ||
| digest = self.session.digest(data, mechanism=Mechanism.SHA256) | ||
| m = hashlib.sha256() | ||
| for d in data: | ||
| m.update(d) | ||
| self.assertEqual(digest, m.digest()) |
+9
-29
@@ -19,3 +19,3 @@ """ | ||
| from . import TestCase, requires | ||
| from . import Only, TestCase, requires | ||
@@ -73,4 +73,6 @@ | ||
| @requires(Mechanism.ECDSA) | ||
| @Only.softhsm2 | ||
| def test_import_key_params(self): | ||
| # Using explicit curve params is bad practice and many HSMs | ||
| # don't support this usage, so we only test it on SoftHSM | ||
| der = base64.b64decode(""" | ||
@@ -121,16 +123,5 @@ MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH///////////// | ||
| priv = base64.b64decode(""" | ||
| MIICnAIBAQRB9JsyE7khj/d2jm5RkE9T2DKgr/y3gn4Ju+8oWfdIpurNKM4hh3Oo | ||
| 0T+ilc0BEy/SfJ5iqUxU5TocdFRpOUzfUIKgggHGMIIBwgIBATBNBgcqhkjOPQEB | ||
| AkIB//////////////////////////////////////////////////////////// | ||
| //////////////////////////8wgZ4EQgH///////////////////////////// | ||
| /////////////////////////////////////////////////////////ARBUZU+ | ||
| uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8QnhVhk5Uex+k3sWUsC9O7G/BzVz | ||
| 34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5MoSqoNpkugSBhQQAxoWOBrcE | ||
| BOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUted+/nWSj+HcEnov+o3jNIs8GF | ||
| akKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY9URJV5tEaBevvRcnPmYsl+5y | ||
| mV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlACQgH///////////////////// | ||
| //////////////////////pRhoeDvy+Wa3/MAUj3CaXQO7XJuImcR667b7cekThk | ||
| CQIBAaGBiQOBhgAEATC4LYExQRq9H+2K1sGbAj6S8WlEL1Cr89guoIYhZsXNhMwY | ||
| MQ2PssJ5huE/vhFWYSR0z3iDp1UXB114r5EXvmDEAWx/32cqnwnuNbyJd/W8IapY | ||
| vN/QAI/1qMV2bopaSmlwabxm8dt/NFCIa3nNYxYyLTjoP16fXTnnI0GSu2dMFatV | ||
| MHcCAQEEIMu1c8rEExH5jAfFy9bIS8RbMoHaKqoyvzrRz5rTUip2oAoGCCqGSM49 | ||
| AwEHoUQDQgAEdrKww7nWyfHoT2jqgGK3wFaJGssJJZD0bIY7RsIISqeaT88bU/HK | ||
| 44HxKoBkOs/JWHX5m/zrblnz40kjOuPZeA== | ||
| """) | ||
@@ -140,15 +131,4 @@ priv = self.session.create_object(decode_ec_private_key(priv)) | ||
| pub = base64.b64decode(""" | ||
| MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH///////////// | ||
| //////////////////////////////////////////////////////////////// | ||
| /////////zCBngRCAf////////////////////////////////////////////// | ||
| ///////////////////////////////////////8BEFRlT65YY4cmh+SmiGgtoVA | ||
| 7qLacluZsxXzuLSJkY7xCeFWGTlR7H6TexZSwL07sb8HNXPfiD0sNPHvRR/Ua1A/ | ||
| AAMVANCeiAApHLhTlsxnFzkyhKqg2mS6BIGFBADGhY4GtwQE6c2ePstmI5W0Qpxk | ||
| gTkFP7Uh+CivYGtNPbqhS1537+dZKP4dwSei/6jeM0izwYVqQpv5fn4xwuW9ZgEY | ||
| OSlqeJo7wARcil+0LH0b2Zj1RElXm0RoF6+9Fyc+ZiyX7nKZXvQmQMVQuQE/rQdh | ||
| NTxwhqJywkCIvpR2n9FmUAJCAf////////////////////////////////////// | ||
| ////+lGGh4O/L5Zrf8wBSPcJpdA7tcm4iZxHrrtvtx6ROGQJAgEBA4GGAAQBMLgt | ||
| gTFBGr0f7YrWwZsCPpLxaUQvUKvz2C6ghiFmxc2EzBgxDY+ywnmG4T++EVZhJHTP | ||
| eIOnVRcHXXivkRe+YMQBbH/fZyqfCe41vIl39bwhqli839AAj/WoxXZuilpKaXBp | ||
| vGbx2380UIhrec1jFjItOOg/Xp9dOecjQZK7Z0wVq1U= | ||
| MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEdrKww7nWyfHoT2jqgGK3wFaJGssJ | ||
| JZD0bIY7RsIISqeaT88bU/HK44HxKoBkOs/JWHX5m/zrblnz40kjOuPZeA== | ||
| """) | ||
@@ -155,0 +135,0 @@ pub = self.session.create_object(decode_ec_public_key(pub)) |
@@ -5,4 +5,2 @@ """ | ||
| import unittest | ||
| import pkcs11 | ||
@@ -35,5 +33,2 @@ | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN, pkcs11.Mechanism.AES_CBC_PAD) | ||
| # Ideally deleting iterator #1 would terminate the operation, but it | ||
| # currently does not. | ||
| @unittest.expectedFailure | ||
| def test_close_iterators(self): | ||
@@ -40,0 +35,0 @@ self.session.generate_key(pkcs11.KeyType.AES, 128, label="LOOK ME UP") |
+132
-2
@@ -8,3 +8,3 @@ """ | ||
| from . import FIXME, TestCase, requires | ||
| from . import FIXME, TOKEN_PIN, TestCase, requires | ||
@@ -19,2 +19,6 @@ | ||
| def test_key_length(self): | ||
| self.assertEqual(1024, self.private.key_length) | ||
| self.assertEqual(1024, self.public.key_length) | ||
| @requires(Mechanism.RSA_PKCS) | ||
@@ -31,2 +35,14 @@ def test_sign_pkcs_v15(self): | ||
| @requires(Mechanism.SHA512_RSA_PKCS) | ||
| def test_sign_with_reauthentication(self): | ||
| public, private = self.session.generate_keypair( | ||
| KeyType.RSA, 1024, private_template={Attribute.ALWAYS_AUTHENTICATE: True} | ||
| ) | ||
| data = "INPUT" | ||
| signature = private.sign(data, pin=TOKEN_PIN) | ||
| self.assertIsNotNone(signature) | ||
| self.assertIsInstance(signature, bytes) | ||
| self.assertTrue(public.verify(data, signature)) | ||
| @requires(Mechanism.SHA512_RSA_PKCS) | ||
| def test_sign_default(self): | ||
@@ -48,3 +64,3 @@ data = b"HELLO WORLD" * 1024 | ||
| b"U" * 16, | ||
| b"T" * 10, # don't align to the blocksize | ||
| b"T" * 10, | ||
| ) | ||
@@ -57,2 +73,103 @@ | ||
| @requires(Mechanism.SHA512_RSA_PKCS) | ||
| def test_sign_stream_with_reauthentication(self): | ||
| public, private = self.session.generate_keypair( | ||
| KeyType.RSA, 1024, private_template={Attribute.ALWAYS_AUTHENTICATE: True} | ||
| ) | ||
| data = ( | ||
| b"I" * 16, | ||
| b"N" * 16, | ||
| b"P" * 16, | ||
| b"U" * 16, | ||
| b"T" * 10, | ||
| ) | ||
| signature = private.sign(data, pin=TOKEN_PIN) | ||
| self.assertIsNotNone(signature) | ||
| self.assertIsInstance(signature, bytes) | ||
| self.assertTrue(public.verify(data, signature)) | ||
| @requires(Mechanism.SHA512_RSA_PKCS) | ||
| def test_sign_stream_with_empty_blocks(self): | ||
| data = ( | ||
| b"I" * 16, | ||
| b"N" * 16, | ||
| b"", | ||
| b"P" * 16, | ||
| b"" * 10, | ||
| b"U" * 16, | ||
| b"T" * 10, | ||
| ) | ||
| signature = self.private.sign(data) | ||
| self.assertIsNotNone(signature) | ||
| self.assertIsInstance(signature, bytes) | ||
| self.assertTrue(self.public.verify(data, signature)) | ||
| @requires(Mechanism.SHA512_RSA_PKCS) | ||
| def test_sign_stream_undersized_buffer(self): | ||
| data = ( | ||
| b"I" * 16, | ||
| b"N" * 16, | ||
| b"P" * 16, | ||
| b"U" * 16, | ||
| b"T" * 10, | ||
| ) | ||
| signature = self.private.sign(data, buffer_size=16) | ||
| self.assertIsNotNone(signature) | ||
| self.assertIsInstance(signature, bytes) | ||
| self.assertTrue(self.public.verify(data, signature)) | ||
| @requires(Mechanism.SHA512_RSA_PKCS) | ||
| def test_sign_stream_interrupt_releases_operation(self): | ||
| data = ( | ||
| b"I" * 16, | ||
| b"N" * 16, | ||
| b"P" * 16, | ||
| b"U" * 16, | ||
| b"T" * 10, | ||
| ) | ||
| def _data_with_error(): | ||
| yield data[0] | ||
| yield data[1] | ||
| yield data[2] | ||
| raise ValueError | ||
| def attempt_sign(): | ||
| self.private.sign(_data_with_error()) | ||
| self.assertRaises(ValueError, attempt_sign) | ||
| # ...try again | ||
| signature = self.private.sign(data) | ||
| self.assertIsNotNone(signature) | ||
| self.assertIsInstance(signature, bytes) | ||
| self.assertTrue(self.public.verify(data, signature)) | ||
| @requires(Mechanism.SHA512_RSA_PKCS) | ||
| def test_verify_stream_interrupt_releases_operation(self): | ||
| data = ( | ||
| b"I" * 16, | ||
| b"N" * 16, | ||
| b"P" * 16, | ||
| b"U" * 16, | ||
| b"T" * 10, | ||
| ) | ||
| def _data_with_error(): | ||
| yield data[0] | ||
| yield data[1] | ||
| yield data[2] | ||
| raise ValueError | ||
| signature = self.private.sign(data) | ||
| def attempt_verify(): | ||
| self.public.verify(_data_with_error(), signature) | ||
| self.assertRaises(ValueError, attempt_verify) | ||
| # ...try again | ||
| self.assertTrue(self.public.verify(data, signature)) | ||
| @requires(Mechanism.RSA_PKCS_OAEP) | ||
@@ -118,2 +235,15 @@ @FIXME.opencryptoki # can't set key attributes | ||
| @requires(Mechanism.SHA1_RSA_PKCS_PSS) | ||
| def test_sign_pss_undersized_buffer(self): | ||
| data = b"SOME DATA" | ||
| signature = self.private.sign( | ||
| data, | ||
| mechanism=Mechanism.SHA1_RSA_PKCS_PSS, | ||
| mechanism_param=(Mechanism.SHA_1, MGF.SHA1, 20), | ||
| buffer_size=16, | ||
| ) | ||
| self.assertTrue(self.public.verify(data, signature, mechanism=Mechanism.SHA1_RSA_PKCS_PSS)) | ||
| @requires(Mechanism.RSA_PKCS_OAEP) | ||
@@ -120,0 +250,0 @@ def test_encrypt_too_much_data(self): |
+186
-2
@@ -6,4 +6,13 @@ """ | ||
| import pkcs11 | ||
| from pkcs11 import ( | ||
| Attribute, | ||
| AttributeSensitive, | ||
| AttributeTypeInvalid, | ||
| ObjectClass, | ||
| PKCS11Error, | ||
| ) | ||
| from pkcs11.attributes import AttributeMapper, handle_bool, handle_str | ||
| from pkcs11.exceptions import PinIncorrect, PinLenRange | ||
| from . import FIXME, TOKEN_PIN, TOKEN_SO_PIN, Not, Only, TestCase, requires | ||
| from . import TOKEN_PIN, TOKEN_SO_PIN, Not, Only, TestCase, requires | ||
@@ -90,3 +99,2 @@ | ||
| @FIXME.opencryptoki | ||
| def test_create_object(self): | ||
@@ -150,2 +158,11 @@ with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_key_search_by_id(self): | ||
| with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| key1 = session.generate_key(pkcs11.KeyType.AES, 128, label="KEY", id=b"1") | ||
| key2 = session.generate_key(pkcs11.KeyType.AES, 128, label="KEY", id=b"2") | ||
| self.assertEqual(session.get_key(id=b"1"), key1) | ||
| self.assertEqual(session.get_key(id=b"2"), key2) | ||
| self.assertNotEqual(session.get_key(id=b"1"), session.get_key(id=b"2")) | ||
| @Not.nfast # Not supported | ||
@@ -163,1 +180,168 @@ @Not.opencryptoki # Not supported | ||
| self.assertTrue(all(c != "\0" for c in random)) | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_attribute_reading_failures(self): | ||
| with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") | ||
| with self.assertRaises(AttributeSensitive): | ||
| key.__getitem__(Attribute.VALUE) | ||
| with self.assertRaises(AttributeTypeInvalid): | ||
| key.__getitem__(Attribute.CERTIFICATE_TYPE) | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_bulk_attribute_raise_error_if_no_result(self): | ||
| with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") | ||
| with self.assertRaises(AttributeSensitive): | ||
| key.get_attributes([Attribute.VALUE]) | ||
| with self.assertRaises(AttributeTypeInvalid): | ||
| key.get_attributes([Attribute.CERTIFICATE_TYPE]) | ||
| # we can't know which error code the token will choose here | ||
| with self.assertRaises(PKCS11Error): | ||
| key.get_attributes([Attribute.VALUE, Attribute.CERTIFICATE_TYPE]) | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_bulk_attribute_partial_success_sensitive_attribute(self): | ||
| with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") | ||
| result = key.get_attributes([Attribute.LABEL, Attribute.VALUE, Attribute.CLASS]) | ||
| expected = {Attribute.LABEL: "SAMPLE KEY", Attribute.CLASS: ObjectClass.SECRET_KEY} | ||
| self.assertDictEqual(expected, result) | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_bulk_attribute_partial_success_irrelevant_attribute(self): | ||
| with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY", id=b"a") | ||
| result = key.get_attributes( | ||
| [Attribute.LABEL, Attribute.CERTIFICATE_TYPE, Attribute.CLASS, Attribute.ID] | ||
| ) | ||
| expected = { | ||
| Attribute.LABEL: "SAMPLE KEY", | ||
| Attribute.CLASS: ObjectClass.SECRET_KEY, | ||
| Attribute.ID: b"a", | ||
| } | ||
| self.assertDictEqual(expected, result) | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_bulk_attribute_partial_success_with_some_empty_attrs(self): | ||
| with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, label="", id=b"") | ||
| result = key.get_attributes( | ||
| [Attribute.LABEL, Attribute.CLASS, Attribute.VALUE, Attribute.ID] | ||
| ) | ||
| expected = { | ||
| Attribute.LABEL: "", | ||
| Attribute.CLASS: ObjectClass.SECRET_KEY, | ||
| Attribute.ID: b"", | ||
| } | ||
| self.assertDictEqual(expected, result) | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_bulk_attribute_only_empty_attrs(self): | ||
| with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, label="", id=b"") | ||
| result = key.get_attributes([Attribute.LABEL, Attribute.ID]) | ||
| expected = { | ||
| Attribute.LABEL: "", | ||
| Attribute.ID: b"", | ||
| } | ||
| self.assertDictEqual(expected, result) | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_bulk_attribute_empty_key_list(self): | ||
| with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, label="SAMPLE KEY") | ||
| result = key.get_attributes([]) | ||
| self.assertDictEqual({}, result) | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_custom_attribute_mapper(self): | ||
| custom_mapper = AttributeMapper() | ||
| custom_mapper.register_handler(Attribute.ID, *handle_str) | ||
| with self.token.open(user_pin=TOKEN_PIN, attribute_mapper=custom_mapper) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, id="SAMPLE KEY") | ||
| id_attr = key[Attribute.ID] | ||
| self.assertIsInstance(id_attr, str) | ||
| self.assertEqual("SAMPLE KEY", id_attr) | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_set_unsupported_attribute(self): | ||
| with self.token.open(user_pin=TOKEN_PIN) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, label="TEST") | ||
| with self.assertRaises(NotImplementedError): | ||
| key[0xDEADBEEF] = b"1234" | ||
| @requires(pkcs11.Mechanism.AES_KEY_GEN) | ||
| def test_treat_empty_bool_as_false(self): | ||
| class CustomMapper(AttributeMapper): | ||
| # contrived handler that decodes the 'ID' attribute as a bool | ||
| def _handler(self, key): | ||
| orig = super()._handler(key) | ||
| if key == Attribute.ID: | ||
| return orig[0], handle_bool[1] | ||
| return orig | ||
| with self.token.open(user_pin=TOKEN_PIN, attribute_mapper=CustomMapper()) as session: | ||
| key = session.generate_key(pkcs11.KeyType.AES, 128, id=b"") | ||
| bool_read = key[Attribute.ID] | ||
| self.assertIsInstance(bool_read, bool) | ||
| self.assertFalse(bool_read, False) | ||
| @Only.softhsm2 | ||
| def test_set_pin(self): | ||
| old_token_pin = TOKEN_PIN | ||
| new_token_pin = f"{TOKEN_PIN}56" | ||
| with self.token.open(rw=True, user_pin=old_token_pin) as session: | ||
| session.set_pin(old_token_pin, new_token_pin) | ||
| with self.token.open(user_pin=new_token_pin) as session: | ||
| self.assertIsInstance(session, pkcs11.Session) | ||
| with self.token.open(rw=True, user_pin=new_token_pin) as session: | ||
| session.set_pin(new_token_pin, old_token_pin) | ||
| with self.token.open(user_pin=old_token_pin) as session: | ||
| self.assertIsInstance(session, pkcs11.Session) | ||
| with self.token.open(rw=True, user_pin=old_token_pin) as session: | ||
| with self.assertRaises(AttributeError): | ||
| session.set_pin(None, new_token_pin) | ||
| with self.assertRaises(AttributeError): | ||
| session.set_pin(old_token_pin, None) | ||
| with self.assertRaises(PinLenRange): | ||
| session.set_pin(old_token_pin, "") | ||
| with self.assertRaises(PinIncorrect): | ||
| session.set_pin("", new_token_pin) | ||
| @Only.softhsm2 | ||
| def test_init_pin(self): | ||
| new_token_pin = f"{TOKEN_PIN}56" | ||
| with self.token.open(rw=True, so_pin=TOKEN_SO_PIN) as session: | ||
| session.init_pin(new_token_pin) | ||
| with self.token.open(rw=True, user_pin=new_token_pin) as session: | ||
| self.assertIsInstance(session, pkcs11.Session) | ||
| with self.token.open(rw=True, so_pin=TOKEN_SO_PIN) as session: | ||
| session.init_pin(TOKEN_PIN) | ||
| with self.token.open(rw=True, user_pin=TOKEN_PIN) as session: | ||
| self.assertIsInstance(session, pkcs11.Session) | ||
| with self.token.open(rw=True, so_pin=TOKEN_SO_PIN) as session: | ||
| with self.assertRaises(AttributeError): | ||
| session.init_pin(None) | ||
| with self.assertRaises(PinLenRange): | ||
| session.init_pin("") |
@@ -8,2 +8,3 @@ """ | ||
| import pkcs11 | ||
| from pkcs11 import PKCS11Error | ||
@@ -15,4 +16,7 @@ from . import LIB, TOKEN, Not, Only | ||
| def test_double_initialise(self): | ||
| self.assertIsNotNone(pkcs11.lib(LIB)) | ||
| self.assertIsNotNone(pkcs11.lib(LIB)) | ||
| attempt1 = pkcs11.lib(LIB) | ||
| attempt2 = pkcs11.lib(LIB) | ||
| self.assertIsNotNone(attempt1) | ||
| self.assertIsNotNone(attempt2) | ||
| self.assertIs(attempt1, attempt2) | ||
@@ -23,5 +27,5 @@ def test_nonexistent_lib(self): | ||
| def test_double_initialise_different_libs(self): | ||
| def test_double_initialise_nonexistent_lib(self): | ||
| self.assertIsNotNone(pkcs11.lib(LIB)) | ||
| with self.assertRaises(pkcs11.AlreadyInitialized): | ||
| with self.assertRaises(RuntimeError): | ||
| pkcs11.lib("somethingelse.so") | ||
@@ -46,2 +50,39 @@ | ||
| def test_reinitialize(self): | ||
| lib = pkcs11.lib(LIB) | ||
| slots = lib.get_slots() | ||
| self.assertGreaterEqual(len(slots), 1) | ||
| lib.reinitialize() | ||
| self.assertTrue(lib.initialized) | ||
| lib = pkcs11.lib(LIB) | ||
| slots = lib.get_slots() | ||
| self.assertGreaterEqual(len(slots), 1) | ||
| def test_finalize(self): | ||
| lib = pkcs11.lib(LIB) | ||
| slots = lib.get_slots() | ||
| self.assertGreaterEqual(len(slots), 1) | ||
| lib.finalize() | ||
| self.assertFalse(lib.initialized) | ||
| self.assertRaises(PKCS11Error, lib.get_slots) | ||
| def test_auto_reinitialise(self): | ||
| lib = pkcs11.lib(LIB) | ||
| lib.finalize() | ||
| self.assertFalse(lib.initialized) | ||
| lib = pkcs11.lib(LIB) | ||
| slots = lib.get_slots() | ||
| self.assertGreaterEqual(len(slots), 1) | ||
| def test_unload_reload(self): | ||
| pkcs11.lib(LIB) | ||
| pkcs11.unload(LIB) | ||
| lib = pkcs11.lib(LIB) | ||
| slots = lib.get_slots() | ||
| self.assertGreaterEqual(len(slots), 1) | ||
| def test_get_mechanism_info(self): | ||
@@ -48,0 +89,0 @@ lib = pkcs11.lib(LIB) |
+17
-20
@@ -128,23 +128,20 @@ """ | ||
| @requires(Mechanism.ECDSA_SHA1) | ||
| @requires(Mechanism.ECDSA_SHA256) | ||
| def test_verify_certificate_ecdsa(self): | ||
| # Warning: proof of concept code only! | ||
| CERT = base64.b64decode(""" | ||
| MIIDGjCCAsKgAwIBAgIJAL+PbwiJUZB1MAkGByqGSM49BAEwRTELMAkGA1UEBhMC | ||
| QVUxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdp | ||
| dHMgUHR5IEx0ZDAeFw0xNzA3MDMxMTUxMTBaFw0xOTA3MDMxMTUxMTBaMEUxCzAJ | ||
| BgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5l | ||
| dCBXaWRnaXRzIFB0eSBMdGQwggFLMIIBAwYHKoZIzj0CATCB9wIBATAsBgcqhkjO | ||
| PQEBAiEA/////wAAAAEAAAAAAAAAAAAAAAD///////////////8wWwQg/////wAA | ||
| AAEAAAAAAAAAAAAAAAD///////////////wEIFrGNdiqOpPns+u9VXaYhrxlHQaw | ||
| zFOw9jvOPD4n0mBLAxUAxJ02CIbnBJNqZnjhE50mt4GffpAEQQRrF9Hy4SxCR/i8 | ||
| 5uVjpEDydwN9gS3rM6D0oTlF2JjClk/jQuL+Gn+bjufrSnwPnhYrzjNXazFezsu2 | ||
| QGg3v1H1AiEA/////wAAAAD//////////7zm+q2nF56E87nKwvxjJVECAQEDQgAE | ||
| royPJHkCQMq55egxmQxkFWqiz+yJx0MZP98is99SrkiK5UadFim3r3ZSt5kfh/cc | ||
| Ccmy94BZCmihhGJ0F4eB2qOBpzCBpDAdBgNVHQ4EFgQURNXKlYGsAMItf4Ad8fkg | ||
| Rg9ATqEwdQYDVR0jBG4wbIAURNXKlYGsAMItf4Ad8fkgRg9ATqGhSaRHMEUxCzAJ | ||
| BgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5l | ||
| dCBXaWRnaXRzIFB0eSBMdGSCCQC/j28IiVGQdTAMBgNVHRMEBTADAQH/MAkGByqG | ||
| SM49BAEDRwAwRAIgAdJp/S9vSjS6EvRy/9zl5k2DBKGI52A3Ygsp1a96UicCIDul | ||
| m/eL2OcGdNbzqzsC11alhemJX7Qt9GOcVqQwROIm | ||
| MIICgzCCAgmgAwIBAgICEAAwCgYIKoZIzj0EAwIwUTELMAkGA1UEBhMCQkUxFDAS | ||
| BgNVBAoMC0V4YW1wbGUgSW5jMRowGAYDVQQLDBFUZXN0aW5nIEF1dGhvcml0eTEQ | ||
| MA4GA1UEAwwHUm9vdCBDQTAgFw0wMDAxMDEwMDAwMDBaGA8yNTAwMDEwMTAwMDAw | ||
| MFowUTELMAkGA1UEBhMCQkUxFDASBgNVBAoMC0V4YW1wbGUgSW5jMRowGAYDVQQL | ||
| DBFUZXN0aW5nIEF1dGhvcml0eTEQMA4GA1UEAwwHUm9vdCBDQTB2MBAGByqGSM49 | ||
| AgEGBSuBBAAiA2IABIxRV+HCT5hbggdOa0CxbOyLRgCRQIFHnsjwk7UZCBeb+SHb | ||
| r4zHM447nASOEwJKvc37UttkdC4lpdOjw9OkwltCSMCS2s22v18//MqjRoQ8wAiX | ||
| hk1mR499ltu1jKicDKOBsTCBrjAdBgNVHQ4EFgQUJNkIpnJ27yAJidmTShDvCLfz | ||
| PJYwHwYDVR0jBBgwFoAUJNkIpnJ27yAJidmTShDvCLfzPJYwDwYDVR0TAQH/BAUw | ||
| AwEB/zAOBgNVHQ8BAf8EBAMCAYYwSwYDVR0fBEQwQjBAoD6gPIY6aHR0cDovL3B5 | ||
| aGFua28udGVzdHMvdGVzdGluZy1jYS1lY2RzYS9jcmxzL3Jvb3QvbGF0ZXN0LmNy | ||
| bDAKBggqhkjOPQQDAgNoADBlAjApktbaE81Qil3bbI5UFWqpH4JsW1pgucZTlQN+ | ||
| VmXMRT/0SVHTMM64IK1B8CzVhI8CMQCFbdX+K7KZYNDYuA7gTQHdp7l12PXMoBGE | ||
| dcda0K/1qwvA2w6mNU1qi/b0Is7oA0I= | ||
| """) | ||
@@ -159,7 +156,7 @@ | ||
| assert x509.signature_algo == "ecdsa" | ||
| assert x509.hash_algo == "sha1" | ||
| assert x509.hash_algo == "sha256" | ||
| signature = decode_ecdsa_signature(x509.signature) | ||
| self.assertTrue(key.verify(value, signature, mechanism=Mechanism.ECDSA_SHA1)) | ||
| self.assertTrue(key.verify(value, signature, mechanism=Mechanism.ECDSA_SHA256)) | ||
@@ -166,0 +163,0 @@ @requires(Mechanism.RSA_PKCS_KEY_PAIR_GEN, Mechanism.SHA256_RSA_PKCS) |
-27
| ARG IMAGE=debian:stable | ||
| FROM $IMAGE | ||
| RUN apt-get update && \ | ||
| DEBIAN_FRONTEND="noninteractive" apt-get install -y gcc python3 python3-dev softhsm2 openssl && \ | ||
| rm -rf /var/lib/apt/lists/* | ||
| COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ | ||
| WORKDIR /test | ||
| ADD uv.lock pyproject.toml setup.py . | ||
| ADD pkcs11/ pkcs11/ | ||
| ADD extern/ extern/ | ||
| ENV UV_LINK_MODE=copy | ||
| RUN --mount=type=cache,target=/root/.cache/uv \ | ||
| uv sync --all-extras | ||
| ENV PKCS11_MODULE=/usr/lib/softhsm/libsofthsm2.so | ||
| ENV PKCS11_TOKEN_LABEL=TEST | ||
| ENV PKCS11_TOKEN_PIN=1234 | ||
| ENV PKCS11_TOKEN_SO_PIN=5678 | ||
| RUN softhsm2-util --init-token --free --label TEST --pin 1234 --so-pin 5678 | ||
| ADD tests/ tests/ | ||
| CMD ["uv", "run", "pytest", "-v"] |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
762018
11.88%68
6.25%5680
16.92%