qmb - npm Package Compare versions

Comparing version 0.0.10 to 0.0.51
.github/dependabot.yml
version: 2
updates:
  - package-ecosystem: "github-actions"
    directory: "/"
    schedule:
      interval: "weekly"
  - package-ecosystem: "pip"
    directory: "/"
    schedule:
      interval: "weekly"
  - package-ecosystem: "docker"
    directory: "/"
    schedule:
      interval: "weekly"
name: "🐛 Bug Report"
description: Create a new ticket for a bug.
title: "🐛 [BUG] - <title>"
labels: ["bug"]
body:
  - type: textarea
    id: description
    attributes:
      label: "Description"
      description: "Provide a clear and detailed description of the issue."
      placeholder: "Briefly summarize the problem..."
    validations:
      required: true
  - type: textarea
    id: version
    attributes:
      label: "Package Version or Git Commit Hash"
      description: "Specify the version of the package or the exact Git commit hash where the bug occurs."
      placeholder: "e.g., v1.2.3 or abc1234..."
    validations:
      required: true
  - type: textarea
    id: reprod
    attributes:
      label: "Steps to Reproduce"
      description: "Outline the steps necessary to reproduce the issue."
      placeholder: "Step 1: ... Step 2: ... Step 3: ..."
    validations:
      required: false
  - type: textarea
    id: logs
    attributes:
      label: "Relevant Logs"
      description: "Attach any relevant logs or error messages (if applicable)."
      placeholder: "Paste logs here..."
      render: bash
    validations:
      required: false
  - type: textarea
    id: additional_context
    attributes:
      label: "Additional Context"
      description: "Provide any extra information or context that might help address the issue."
      placeholder: "Add any relevant details, links, or references..."
    validations:
      required: false
name: "📄 Documentation Issue"
description: Create a new ticket for improvements on the documentation.
title: "📄 [DOCS] - <title>"
labels: ["documentation"]
body:
  - type: textarea
    id: description
    attributes:
      label: "Description"
      description: "Provide a clear explanation of the documentation issue or improvement suggestion."
      placeholder: "Briefly describe the problem or enhancement..."
    validations:
      required: true
  - type: textarea
    id: location
    attributes:
      label: "Documentation Location"
      description: "Specify the section or page where the issue exists or the improvement is needed."
      placeholder: "e.g., Getting Started Guide, API Reference, etc."
    validations:
      required: true
  - type: textarea
    id: current_content
    attributes:
      label: "Current Content (if applicable)"
      description: "Copy the current content that needs to be corrected or improved."
      placeholder: "Paste the existing text here..."
    validations:
      required: false
  - type: textarea
    id: suggested_change
    attributes:
      label: "Suggested Change"
      description: "Propose how the documentation should be updated or improved."
      placeholder: "Describe your suggested changes or additions..."
    validations:
      required: false
  - type: textarea
    id: additional_context
    attributes:
      label: "Additional Context"
      description: "Provide any extra information or context that might help address the issue."
      placeholder: "Add any relevant details, links, or references..."
    validations:
      required: false
name: "💡 Feature Request"
description: Create a new ticket for a new feature request
title: "💡 [REQUEST] - <title>"
labels: ["enhancement"]
body:
  - type: textarea
    id: description
    attributes:
      label: "Description"
      description: "Provide a concise overview of the proposed feature."
      placeholder: "Briefly describe the feature you'd like to request..."
    validations:
      required: true
  - type: textarea
    id: basic_example
    attributes:
      label: "Basic Example"
      description: "Provide a simple example or use case to illustrate the feature."
      placeholder: "Describe how the feature could be used or implemented..."
    validations:
      required: false
  - type: textarea
    id: additional_context
    attributes:
      label: "Additional Context"
      description: "Provide any extra information or context that might help address the issue."
      placeholder: "Add any relevant details, links, or references..."
    validations:
      required: false
<!-- Thank you for contributing to qmb! -->
# Description
<!-- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. -->
# Checklist:
- [ ] I have read the [CONTRIBUTING.md](/CONTRIBUTING.md).
name: Build docker images
on:
  push:
    tags:
      - '*'
jobs:
  docker:
    runs-on: ubuntu-latest
    steps:
      - uses: endersonmenezes/free-disk-space@v2
        with:
          remove_android: true
          remove_dotnet: true
          remove_haskell: true
          remove_tool_cache: true
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - uses: docker/setup-qemu-action@v3
      - uses: docker/setup-buildx-action@v3
      - uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_TOKEN }}
      - uses: docker/build-push-action@v6
        with:
          context: .
          platforms: linux/amd64
          push: true
          tags: |
            ${{ secrets.DOCKERHUB_USERNAME }}/qmb:latest
            ${{ secrets.DOCKERHUB_USERNAME }}/qmb:${{ github.ref_name }}
when:
  - event: push
  - event: pull_request
clone:
  git:
    image: woodpeckerci/plugin-git:2.6
    settings:
      tags: true
      partial: false
steps:
  - name: tagging
    image: mcp/git:latest
    commands:
      - git describe --tags | tee .tag
  - name: credential
    image: python:3.12
    commands:
      - echo AWS_ACCESS_KEY_ID=$${AWS_ACCESS_KEY_ID} >> .env
      - echo AWS_SECRET_ACCESS_KEY=$${AWS_SECRET_ACCESS_KEY} >> .env
      - cat .env
    environment:
      AWS_ACCESS_KEY_ID:
        from_secret: minio_access_key
      AWS_SECRET_ACCESS_KEY:
        from_secret: minio_secret_key
    when:
      - event: push
  - name: credential
    image: python:3.12
    commands:
      - echo AWS_ACCESS_KEY_ID=$${AWS_ACCESS_KEY_ID} >> .env
      - echo AWS_SECRET_ACCESS_KEY=$${AWS_SECRET_ACCESS_KEY} >> .env
      - cat .env
    environment:
      AWS_ACCESS_KEY_ID:
        from_secret: readonly_minio_access_key
      AWS_SECRET_ACCESS_KEY:
        from_secret: readonly_minio_secret_key
    when:
      - event: pull_request
  - name: docker
    image: woodpeckerci/plugin-docker-buildx:6-insecure
    settings:
      repo: git.kclab.cloud/hzhangxyz/qmb
      username: hzhangxyz
      password:
        from_secret: gitea_package
      registry: git.kclab.cloud
      tags_file: .tag
      cache-to: type=s3,bucket=cache-53030,region=local,endpoint_url=https://s3.kclab.cloud,prefix=${CI_REPO}/docker/,mode=max
      cache-from:
        - type=s3\\,bucket=cache-53030\\,region=local\\,endpoint_url=https://s3.kclab.cloud\\,prefix=${CI_REPO}/docker/
      env_file: .env
      buildkit_config: |
        [registry."docker.io"]
        mirrors = ["https://docker.mirrors.kclab.cloud/"]
      build_args:
        PYPI_MIRROR: https://mirrors.ustc.edu.cn/pypi/simple
    when:
      - event: push
  - name: docker
    image: woodpeckerci/plugin-docker-buildx:6-insecure
    settings:
      repo: git.kclab.cloud/hzhangxyz/qmb
      dry_run: true
      tags_file: .tag
      cache-from:
        - type=s3\\,bucket=cache-53030\\,region=local\\,endpoint_url=https://s3.kclab.cloud\\,prefix=${CI_REPO}/docker/
      env_file: .env
      buildkit_config: |
        [registry."docker.io"]
        mirrors = ["https://docker.mirrors.kclab.cloud/"]
      build_args:
        PYPI_MIRROR: https://mirrors.ustc.edu.cn/pypi/simple
    when:
      - event: pull_request
when:
  - event: push
  - event: pull_request
variables:
  - &pip_config
    PIP_INDEX_URL: https://mirrors.ustc.edu.cn/pypi/simple
  - &pre_commit_config
    PRE_COMMIT_HOME: /woodpecker/cache/pre-commit
  - &minio_config
    MINIO_HOST: https://s3.kclab.cloud
    MINIO_BUCKET: cache-53030
steps:
  - name: open cache
    image: minio/mc:latest
    environment:
      <<: [*pre_commit_config, *minio_config]
      MINIO_ACCESS_KEY:
        from_secret: readonly_minio_access_key
      MINIO_SECRET_KEY:
        from_secret: readonly_minio_secret_key
    commands:
      - mkdir --parents $${PRE_COMMIT_HOME}
      - touch $${PRE_COMMIT_HOME}/.keep
      - mc alias set minio $${MINIO_HOST} $${MINIO_ACCESS_KEY} $${MINIO_SECRET_KEY}
      - mc cp --recursive minio/$${MINIO_BUCKET}/${CI_REPO}/pre-commit/ $${PRE_COMMIT_HOME}/
    failure: ignore
  - name: pre-commit
    image: python:3.12
    environment:
      <<: [*pre_commit_config, *pip_config]
    commands:
      - pip install pre-commit
      - pip install '.[dev]'
      - pre-commit run --all-files
  - name: save cache
    image: minio/mc:latest
    environment:
      <<: [*pre_commit_config, *minio_config]
      MINIO_ACCESS_KEY:
        from_secret: minio_access_key
      MINIO_SECRET_KEY:
        from_secret: minio_secret_key
    commands:
      - mc alias set minio $${MINIO_HOST} $${MINIO_ACCESS_KEY} $${MINIO_SECRET_KEY}
      - mc cp --recursive $${PRE_COMMIT_HOME}/ minio/$${MINIO_BUCKET}/${CI_REPO}/pre-commit/
    when:
      event: push
This folder contains scripts for testing on our own CI/CD platform, based on Woodpecker CI and hosted at the USTC Knowledge Computing Laboratory. Because these scripts hard-code many absolute URLs, they should not be used elsewhere.
when:
  - event: push
  - event: pull_request
variables:
  - &pip_config
    PIP_INDEX_URL: https://mirrors.ustc.edu.cn/pypi/simple
  - &repo_config
    TWINE_REPOSITORY_URL: https://git.kclab.cloud/api/packages/hzhangxyz/pypi
    TWINE_USERNAME: hzhangxyz
    TWINE_PASSWORD:
      from_secret: gitea_package
clone:
  git:
    image: woodpeckerci/plugin-git:2.6
    settings:
      tags: true
      partial: false
steps:
  - name: build
    image: python:3.12
    environment:
      <<: *pip_config
    commands:
      - pip install pipx
      - pipx run build
  - name: upload
    image: python:3.12
    environment:
      <<: [*pip_config, *repo_config]
    commands:
      - pip install pipx
      - pipx run twine upload dist/* --verbose
    when:
      event: push
# Contributing to qmb
Thanks for your interest in contributing to qmb!
All kinds of contributions are welcome.
## How to contribute to qmb
If you have an idea (or have implemented one), or have found or fixed a bug, please open a GitHub issue or pull request.
For bug reports, providing a minimum working example is very helpful.
If you have any questions, please use GitHub Discussions.
## Code structure
qmb follows the standard Python package layout.
The package code lives in the folder named `qmb`, and the tests in the folder named `tests`.
The file `pyproject.toml` defines the package metadata.
The file `Dockerfile` builds the Docker image for the CLI application.
There are also other files, such as `.clang-format` and `.pre-commit-config.yaml`, used to format and lint the code.
## How to get involved
Please learn basic Git usage before contributing to qmb.
We use the git-flow model to merge pull requests.
Please provide the essential information, with proper formatting, in the commit message and the pull request description.
Please make sure that your code is properly formatted, linted, and typed when you submit a pull request.
As a Python code convention, use double quotes (") for string literals and single quotes (') for character literals.
The comments in the code should be sufficient for other developers to understand it.
Please add docstrings in NumPy style.
If necessary, please update the documentation and add tests for your changes.
Any warning should be fixed before submitting a pull request.
Finally, please check the correctness and robustness of your code carefully when submitting a pull request.
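As an illustration of these conventions, here is a small function with a NumPy-style docstring, double-quoted string literals, and single-quoted character literals. The function name and behavior are invented for this sketch, not taken from qmb:

```python
def count_char(text: str, target: str) -> int:
    """
    Count the occurrences of a character in a string.

    Parameters
    ----------
    text : str
        The string to search.
    target : str
        The single character to count.

    Returns
    -------
    int
        The number of occurrences of the character in the string.
    """
    if len(target) != 1:
        # String literals use double quotes.
        raise ValueError("target must be a single character")
    # Character literals like 'a' would use single quotes at call sites.
    return sum(1 for character in text if character == target)
```

For example, `count_char("banana", 'a')` returns 3.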
---
Thanks! :heart: :heart: :heart:
USTC Knowledge Computing Lab
# Use the specified CUDA base image with Rocky Linux 9
FROM nvidia/cuda:12.9.1-cudnn-devel-rockylinux9
# Install dependencies
WORKDIR /app
RUN dnf update --assumeyes && \
    dnf install --assumeyes git python3.12-devel libcusparselt-devel && \
    dnf clean all
ARG PYPI_MIRROR=https://pypi.org/simple
RUN python3.12 -m ensurepip && \
    pip3.12 install --index-url=${PYPI_MIRROR} --upgrade pip
# Install the package
COPY . /app
RUN pip3.12 install --index-url=${PYPI_MIRROR} .
ENV XDG_DATA_HOME=/tmp/data XDG_CONFIG_HOME=/tmp/config XDG_CACHE_HOME=/tmp/cache XDG_STATE_HOME=/tmp/state
# Set the entrypoint
ENTRYPOINT ["qmb"]
CMD []
"""
This is the main entry point for the command line application.
For the details of the command line application, run `qmb --help` or `python -m qmb --help`.
"""
import tyro
from . import openfermion as _ # type: ignore[no-redef]
from . import fcidump as _ # type: ignore[no-redef]
from . import hubbard as _ # type: ignore[no-redef]
from . import ising as _ # type: ignore[no-redef]
from . import vmc as _ # type: ignore[no-redef]
from . import haar as _ # type: ignore[no-redef]
from . import rldiag as _ # type: ignore[no-redef]
from . import precompile as _ # type: ignore[no-redef]
from . import list_loss as _ # type: ignore[no-redef]
from . import chop_imag as _ # type: ignore[no-redef]
from . import run as _ # type: ignore[no-redef]
from .subcommand_dict import subcommand_dict
def main() -> None:
    """
    The main function for the command line application.
    """
    tyro.extras.subcommand_cli_from_dict(subcommand_dict).main()


if __name__ == "__main__":
    main()
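The entry point dispatches on `subcommand_dict`, which the imports above populate as a side effect of importing each subcommand module. A minimal, tyro-free sketch of that registry pattern follows; all names below are invented for illustration, and a real CLI (tyro here) would also parse per-subcommand arguments:

```python
# Hypothetical stand-in for qmb's subcommand registry: each module
# registers its entry callable under a name, and the entry point
# dispatches on that dict.
subcommand_dict = {}

def register(name):
    """Record a callable in the subcommand dict; runs at import time."""
    def wrap(function):
        subcommand_dict[name] = function
        return function
    return wrap

@register("greet")
def greet():
    return "hello"

@register("version")
def version():
    return "0.0.51"

def dispatch(name):
    # Look up the registered callable by subcommand name and invoke it.
    return subcommand_dict[name]()
```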
#include <torch/extension.h>

namespace qmb_hamiltonian_cpu {

constexpr torch::DeviceType device = torch::kCPU;

template<typename T, std::int64_t size>
struct array_less {
    bool operator()(const std::array<T, size>& lhs, const std::array<T, size>& rhs) const {
        for (std::int64_t i = 0; i < size; ++i) {
            if (lhs[i] < rhs[i]) {
                return true;
            }
            if (lhs[i] > rhs[i]) {
                return false;
            }
        }
        return false;
    }
};

template<typename T, std::int64_t size>
struct array_square_greater {
    T square(const std::array<T, size>& value) const {
        T result = 0;
        for (std::int64_t i = 0; i < size; ++i) {
            result += value[i] * value[i];
        }
        return result;
    }
    bool operator()(const std::array<T, size>& lhs, const std::array<T, size>& rhs) const {
        return square(lhs) > square(rhs);
    }
};

bool get_bit(std::uint8_t* data, std::uint8_t index) {
    return ((*data) >> index) & 1;
}

void set_bit(std::uint8_t* data, std::uint8_t index, bool value) {
    if (value) {
        *data |= (1 << index);
    } else {
        *data &= ~(1 << index);
    }
}
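`get_bit` and `set_bit` address one qubit per bit inside a packed uint8 array (the "qubytes" layout used throughout this file). A Python sketch of the same helpers, with names mirroring the C++:

```python
def get_bit(data: bytearray, index: int) -> bool:
    """Return bit `index` of a little-endian packed bit array (bit i lives
    in byte i // 8 at position i % 8, as in the C++ kernels)."""
    return bool((data[index // 8] >> (index % 8)) & 1)

def set_bit(data: bytearray, index: int, value: bool) -> None:
    """Set or clear bit `index` in place."""
    if value:
        data[index // 8] |= 1 << (index % 8)
    else:
        # Mask to 8 bits, since Python integers are unbounded.
        data[index // 8] &= ~(1 << (index % 8)) & 0xFF
```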
template<std::int64_t max_op_number, std::int64_t n_qubytes, std::int64_t particle_cut>
std::pair<bool, bool> hamiltonian_apply_kernel(
    std::array<std::uint8_t, n_qubytes>& current_configs,
    std::int64_t term_index,
    std::int64_t batch_index,
    const std::array<std::int16_t, max_op_number>* site, // term_number
    const std::array<std::uint8_t, max_op_number>* kind  // term_number
) {
    static_assert(particle_cut == 1 || particle_cut == 2, "particle_cut != 1 or 2 not implemented");
    bool success = true;
    bool parity = false;
    for (std::int64_t op_index = max_op_number; op_index-- > 0;) {
        std::int16_t site_single = site[term_index][op_index];
        std::uint8_t kind_single = kind[term_index][op_index];
        if (kind_single == 2) {
            continue;
        }
        std::uint8_t to_what = kind_single;
        if (get_bit(&current_configs[site_single / 8], site_single % 8) == to_what) {
            success = false;
            break;
        }
        set_bit(&current_configs[site_single / 8], site_single % 8, to_what);
        if constexpr (particle_cut == 1) {
            for (std::int16_t s = 0; s < site_single; ++s) {
                parity ^= get_bit(&current_configs[s / 8], s % 8);
            }
        }
    }
    return std::make_pair(success, parity);
}
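Reading the kernel: operators are applied right to left; kind 2 appears to be an identity placeholder, kind 1 sets a bit (creation, requiring it to currently be 0), kind 0 clears it (annihilation, requiring it to be 1), and for particle_cut == 1 a fermionic Jordan-Wigner parity is accumulated over occupied sites below the acted one. A Python sketch of that logic on a plain bit list (names invented for this sketch):

```python
def apply_term(bits, sites, kinds):
    """Apply one Hamiltonian term to a configuration (list of 0/1).

    Returns (success, parity, new_bits). kind 2 is skipped, kind 1
    creates (bit must be 0), kind 0 annihilates (bit must be 1);
    parity collects the fermionic sign, as for particle_cut == 1.
    """
    bits = list(bits)
    parity = False
    for site, kind in zip(reversed(sites), reversed(kinds)):
        if kind == 2:  # identity placeholder: no operator in this slot
            continue
        if bits[site] == kind:  # creating an occupied / annihilating an empty site
            return False, parity, bits
        bits[site] = kind
        # XOR in the number of occupied sites below the acted site (mod 2).
        parity ^= sum(bits[:site]) % 2 == 1
    return True, parity, bits
```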
template<std::int64_t max_op_number, std::int64_t n_qubytes, std::int64_t particle_cut>
void apply_within_kernel(
    std::int64_t term_index,
    std::int64_t batch_index,
    std::int64_t term_number,
    std::int64_t batch_size,
    std::int64_t result_batch_size,
    const std::array<std::int16_t, max_op_number>* site,       // term_number
    const std::array<std::uint8_t, max_op_number>* kind,       // term_number
    const std::array<double, 2>* coef,                         // term_number
    const std::array<std::uint8_t, n_qubytes>* configs,        // batch_size
    const std::array<double, 2>* psi,                          // batch_size
    const std::array<std::uint8_t, n_qubytes>* result_configs, // result_batch_size
    std::array<double, 2>* result_psi
) {
    std::array<std::uint8_t, n_qubytes> current_configs = configs[batch_index];
    auto [success, parity] = hamiltonian_apply_kernel<max_op_number, n_qubytes, particle_cut>(
        /*current_configs=*/current_configs,
        /*term_index=*/term_index,
        /*batch_index=*/batch_index,
        /*site=*/site,
        /*kind=*/kind
    );
    if (!success) {
        return;
    }
    success = false;
    std::int64_t low = 0;
    std::int64_t high = result_batch_size - 1;
    std::int64_t mid = 0;
    auto less = array_less<std::uint8_t, n_qubytes>();
    while (low <= high) {
        mid = (low + high) / 2;
        if (less(current_configs, result_configs[mid])) {
            high = mid - 1;
        } else if (less(result_configs[mid], current_configs)) {
            low = mid + 1;
        } else {
            success = true;
            break;
        }
    }
    if (!success) {
        return;
    }
    std::int8_t sign = parity ? -1 : +1;
    result_psi[mid][0] += sign * (coef[term_index][0] * psi[batch_index][0] - coef[term_index][1] * psi[batch_index][1]);
    result_psi[mid][1] += sign * (coef[term_index][0] * psi[batch_index][1] + coef[term_index][1] * psi[batch_index][0]);
}
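The second half of this kernel is a binary search over the sorted result configurations, followed by a signed complex accumulation of coef * psi, with both complex numbers stored as [real, imag] pairs. The same pattern in Python, using the standard bisect module (function and parameter names are invented for this sketch):

```python
import bisect

def accumulate(result_configs, result_psi, config, coef, amplitude, parity):
    """Accumulate sign * coef * amplitude into the slot matching `config`.

    result_configs must be sorted; coef and amplitude are (real, imag)
    pairs, as in the C++ kernel. Returns False if `config` is absent.
    """
    i = bisect.bisect_left(result_configs, config)
    if i == len(result_configs) or result_configs[i] != config:
        return False  # not in the target basis: the contribution is dropped
    sign = -1 if parity else +1
    # Complex multiplication on (real, imag) pairs.
    result_psi[i][0] += sign * (coef[0] * amplitude[0] - coef[1] * amplitude[1])
    result_psi[i][1] += sign * (coef[0] * amplitude[1] + coef[1] * amplitude[0])
    return True
```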
template<std::int64_t max_op_number, std::int64_t n_qubytes, std::int64_t particle_cut>
void apply_within_kernel_interface(
    std::int64_t term_number,
    std::int64_t batch_size,
    std::int64_t result_batch_size,
    const std::array<std::int16_t, max_op_number>* site,       // term_number
    const std::array<std::uint8_t, max_op_number>* kind,       // term_number
    const std::array<double, 2>* coef,                         // term_number
    const std::array<std::uint8_t, n_qubytes>* configs,        // batch_size
    const std::array<double, 2>* psi,                          // batch_size
    const std::array<std::uint8_t, n_qubytes>* result_configs, // result_batch_size
    std::array<double, 2>* result_psi
) {
    for (std::int64_t term_index = 0; term_index < term_number; ++term_index) {
        for (std::int64_t batch_index = 0; batch_index < batch_size; ++batch_index) {
            apply_within_kernel<max_op_number, n_qubytes, particle_cut>(
                /*term_index=*/term_index,
                /*batch_index=*/batch_index,
                /*term_number=*/term_number,
                /*batch_size=*/batch_size,
                /*result_batch_size=*/result_batch_size,
                /*site=*/site,
                /*kind=*/kind,
                /*coef=*/coef,
                /*configs=*/configs,
                /*psi=*/psi,
                /*result_configs=*/result_configs,
                /*result_psi=*/result_psi
            );
        }
    }
}
template<std::int64_t max_op_number, std::int64_t n_qubytes, std::int64_t particle_cut>
auto apply_within_interface(
    const torch::Tensor& configs,
    const torch::Tensor& psi,
    const torch::Tensor& result_configs,
    const torch::Tensor& site,
    const torch::Tensor& kind,
    const torch::Tensor& coef
) -> torch::Tensor {
    std::int64_t device_id = configs.device().index();
    std::int64_t batch_size = configs.size(0);
    std::int64_t result_batch_size = result_configs.size(0);
    std::int64_t term_number = site.size(0);
    TORCH_CHECK(configs.device().type() == torch::kCPU, "configs must be on CPU.");
    TORCH_CHECK(configs.device().index() == device_id, "configs must be on the same device as others.");
    TORCH_CHECK(configs.is_contiguous(), "configs must be contiguous.");
    TORCH_CHECK(configs.dtype() == torch::kUInt8, "configs must be uint8.");
    TORCH_CHECK(configs.dim() == 2, "configs must be 2D.");
    TORCH_CHECK(configs.size(0) == batch_size, "configs batch size must match the provided batch_size.");
    TORCH_CHECK(configs.size(1) == n_qubytes, "configs must have the same number of qubits as the provided n_qubytes.");
    TORCH_CHECK(psi.device().type() == torch::kCPU, "psi must be on CPU.");
    TORCH_CHECK(psi.device().index() == device_id, "psi must be on the same device as others.");
    TORCH_CHECK(psi.is_contiguous(), "psi must be contiguous.");
    TORCH_CHECK(psi.dtype() == torch::kFloat64, "psi must be float64.");
    TORCH_CHECK(psi.dim() == 2, "psi must be 2D.");
    TORCH_CHECK(psi.size(0) == batch_size, "psi batch size must match the provided batch_size.");
    TORCH_CHECK(psi.size(1) == 2, "psi must contain 2 elements for each batch.");
    TORCH_CHECK(result_configs.device().type() == torch::kCPU, "result_configs must be on CPU.");
    TORCH_CHECK(result_configs.device().index() == device_id, "result_configs must be on the same device as others.");
    TORCH_CHECK(result_configs.is_contiguous(), "result_configs must be contiguous.");
    TORCH_CHECK(result_configs.dtype() == torch::kUInt8, "result_configs must be uint8.");
    TORCH_CHECK(result_configs.dim() == 2, "result_configs must be 2D.");
    TORCH_CHECK(result_configs.size(0) == result_batch_size, "result_configs batch size must match the provided result_batch_size.");
    TORCH_CHECK(result_configs.size(1) == n_qubytes, "result_configs must have the same number of qubits as the provided n_qubytes.");
    TORCH_CHECK(site.device().type() == torch::kCPU, "site must be on CPU.");
    TORCH_CHECK(site.device().index() == device_id, "site must be on the same device as others.");
    TORCH_CHECK(site.is_contiguous(), "site must be contiguous.");
    TORCH_CHECK(site.dtype() == torch::kInt16, "site must be int16.");
    TORCH_CHECK(site.dim() == 2, "site must be 2D.");
    TORCH_CHECK(site.size(0) == term_number, "site size must match the provided term_number.");
    TORCH_CHECK(site.size(1) == max_op_number, "site must match the provided max_op_number.");
    TORCH_CHECK(kind.device().type() == torch::kCPU, "kind must be on CPU.");
    TORCH_CHECK(kind.device().index() == device_id, "kind must be on the same device as others.");
    TORCH_CHECK(kind.is_contiguous(), "kind must be contiguous.");
    TORCH_CHECK(kind.dtype() == torch::kUInt8, "kind must be uint8.");
    TORCH_CHECK(kind.dim() == 2, "kind must be 2D.");
    TORCH_CHECK(kind.size(0) == term_number, "kind size must match the provided term_number.");
    TORCH_CHECK(kind.size(1) == max_op_number, "kind must match the provided max_op_number.");
    TORCH_CHECK(coef.device().type() == torch::kCPU, "coef must be on CPU.");
    TORCH_CHECK(coef.device().index() == device_id, "coef must be on the same device as others.");
    TORCH_CHECK(coef.is_contiguous(), "coef must be contiguous.");
    TORCH_CHECK(coef.dtype() == torch::kFloat64, "coef must be float64.");
    TORCH_CHECK(coef.dim() == 2, "coef must be 2D.");
    TORCH_CHECK(coef.size(0) == term_number, "coef size must match the provided term_number.");
    TORCH_CHECK(coef.size(1) == 2, "coef must contain 2 elements for each term.");
    auto result_sort_index = torch::arange(result_batch_size, torch::TensorOptions().dtype(torch::kInt64).device(device, device_id));
    std::sort(
        reinterpret_cast<std::int64_t*>(result_sort_index.data_ptr()),
        reinterpret_cast<std::int64_t*>(result_sort_index.data_ptr()) + result_batch_size,
        [&result_configs](std::int64_t i1, std::int64_t i2) {
            return array_less<std::uint8_t, n_qubytes>()(
                reinterpret_cast<const std::array<std::uint8_t, n_qubytes>*>(result_configs.data_ptr())[i1],
                reinterpret_cast<const std::array<std::uint8_t, n_qubytes>*>(result_configs.data_ptr())[i2]
            );
        }
    );
    auto sorted_result_configs = result_configs.index({result_sort_index});
    auto sorted_result_psi = torch::zeros({result_batch_size, 2}, torch::TensorOptions().dtype(torch::kFloat64).device(device, device_id));
    apply_within_kernel_interface<max_op_number, n_qubytes, particle_cut>(
        /*term_number=*/term_number,
        /*batch_size=*/batch_size,
        /*result_batch_size=*/result_batch_size,
        /*site=*/reinterpret_cast<const std::array<std::int16_t, max_op_number>*>(site.data_ptr()),
        /*kind=*/reinterpret_cast<const std::array<std::uint8_t, max_op_number>*>(kind.data_ptr()),
        /*coef=*/reinterpret_cast<const std::array<double, 2>*>(coef.data_ptr()),
        /*configs=*/reinterpret_cast<const std::array<std::uint8_t, n_qubytes>*>(configs.data_ptr()),
        /*psi=*/reinterpret_cast<const std::array<double, 2>*>(psi.data_ptr()),
        /*result_configs=*/reinterpret_cast<const std::array<std::uint8_t, n_qubytes>*>(sorted_result_configs.data_ptr()),
        /*result_psi=*/reinterpret_cast<std::array<double, 2>*>(sorted_result_psi.data_ptr())
    );
    auto result_psi = torch::zeros_like(sorted_result_psi);
    result_psi.index_put_({result_sort_index}, sorted_result_psi);
    return result_psi;
}
template<typename T, typename Less = std::less<T>>
void add_into_heap(T* heap, std::int64_t heap_size, const T& value) {
    auto less = Less();
    std::int64_t index = 0;
    // If the new value is smaller than the current minimum (the root), discard it;
    // otherwise replace the root and sift the new value down to its position.
    if (!less(value, heap[index])) {
        while (true) {
            // Calculate the indices of the left and right children
            std::int64_t left = (index << 1) + 1;
            std::int64_t right = (index << 1) + 2;
            bool left_present = left < heap_size;
            bool right_present = right < heap_size;
            if (left_present) {
                if (right_present) {
                    // Both left and right children are present
                    if (less(value, heap[left])) {
                        if (less(value, heap[right])) {
                            // Both children are greater than the value, break
                            break;
                        } else {
                            // The left child is greater than the value
                            heap[index] = heap[right];
                            index = right;
                        }
                    } else {
                        if (less(value, heap[right])) {
                            // The right child is greater than the value
                            heap[index] = heap[left];
                            index = left;
                        } else {
                            if (less(heap[left], heap[right])) {
                                heap[index] = heap[left];
                                index = left;
                            } else {
                                heap[index] = heap[right];
                                index = right;
                            }
                        }
                    }
                } else {
                    // Only the left child is present
                    if (less(value, heap[left])) {
                        break;
                    } else {
                        heap[index] = heap[left];
                        index = left;
                    }
                }
            } else {
                if (right_present) {
                    // Only the right child is present
                    if (less(value, heap[right])) {
                        break;
                    } else {
                        heap[index] = heap[right];
                        index = right;
                    }
                } else {
                    // No children are present
                    break;
                }
            }
        }
        heap[index] = value;
    }
}
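This routine maintains a fixed-size min-heap of the best candidates seen so far: a new value either loses to the root (the current minimum) and is dropped, or replaces the root and sifts down. Python's heapq expresses the same top-k selection with heapreplace; a sketch (function name invented):

```python
import heapq

def top_k(items, k):
    """Return the k largest items, using a size-k min-heap."""
    heap = []
    for item in items:
        if len(heap) < k:
            heapq.heappush(heap, item)
        elif item > heap[0]:
            # The new item beats the current minimum: replace the root
            # and sift down, exactly what add_into_heap does in place.
            heapq.heapreplace(heap, item)
    return sorted(heap, reverse=True)
```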
template<typename T, std::int64_t size>
struct array_first_double_less {
    double first_double(const std::array<T, size + sizeof(double) / sizeof(T)>& value) const {
        double result;
        for (std::int64_t i = 0; i < sizeof(double); ++i) {
            reinterpret_cast<std::uint8_t*>(&result)[i] = reinterpret_cast<const std::uint8_t*>(&value[0])[i];
        }
        return result;
    }
    bool operator()(const std::array<T, size + sizeof(double) / sizeof(T)>& lhs, const std::array<T, size + sizeof(double) / sizeof(T)>& rhs) const {
        return first_double(lhs) < first_double(rhs);
    }
};
template<std::int64_t max_op_number, std::int64_t n_qubytes, std::int64_t particle_cut>
void find_relative_kernel(
    std::int64_t term_index,
    std::int64_t batch_index,
    std::int64_t term_number,
    std::int64_t batch_size,
    std::int64_t exclude_size,
    const std::array<std::int16_t, max_op_number>* site,        // term_number
    const std::array<std::uint8_t, max_op_number>* kind,        // term_number
    const std::array<double, 2>* coef,                          // term_number
    const std::array<std::uint8_t, n_qubytes>* configs,         // batch_size
    const std::array<double, 2>* psi,                           // batch_size
    const std::array<std::uint8_t, n_qubytes>* exclude_configs, // exclude_size
    std::array<std::uint8_t, n_qubytes + sizeof(double) / sizeof(std::uint8_t)>* heap,
    std::int64_t heap_size
) {
    std::array<std::uint8_t, n_qubytes> current_configs = configs[batch_index];
    auto [success, parity] = hamiltonian_apply_kernel<max_op_number, n_qubytes, particle_cut>(
        /*current_configs=*/current_configs,
        /*term_index=*/term_index,
        /*batch_index=*/batch_index,
        /*site=*/site,
        /*kind=*/kind
    );
    if (!success) {
        return;
    }
    success = true;
    std::int64_t low = 0;
    std::int64_t high = exclude_size - 1;
    std::int64_t mid = 0;
    auto less = array_less<std::uint8_t, n_qubytes>();
    while (low <= high) {
        mid = (low + high) / 2;
        if (less(current_configs, exclude_configs[mid])) {
            high = mid - 1;
        } else if (less(exclude_configs[mid], current_configs)) {
            low = mid + 1;
        } else {
            success = false;
            break;
        }
    }
    if (!success) {
        return;
    }
    std::int8_t sign = parity ? -1 : +1;
    double real = sign * (coef[term_index][0] * psi[batch_index][0] - coef[term_index][1] * psi[batch_index][1]);
    double imag = sign * (coef[term_index][0] * psi[batch_index][1] + coef[term_index][1] * psi[batch_index][0]);
    // Currently, the weight is calculated as the probability of the state, but it can be changed to other values in the future.
    double weight = real * real + imag * imag;
    std::array<std::uint8_t, n_qubytes + sizeof(double) / sizeof(std::uint8_t)> value;
    for (std::int64_t i = 0; i < sizeof(double) / sizeof(std::uint8_t); ++i) {
        value[i] = reinterpret_cast<const std::uint8_t*>(&weight)[i];
    }
    for (std::int64_t i = 0; i < n_qubytes; ++i) {
        value[i + sizeof(double) / sizeof(std::uint8_t)] = current_configs[i];
    }
    add_into_heap<std::array<std::uint8_t, n_qubytes + sizeof(double) / sizeof(std::uint8_t)>, array_first_double_less<std::uint8_t, n_qubytes>>(
        heap,
        heap_size,
        value
    );
}
template<std::int64_t max_op_number, std::int64_t n_qubytes, std::int64_t particle_cut>
void find_relative_kernel_interface(
    std::int64_t term_number,
    std::int64_t batch_size,
    std::int64_t exclude_size,
    const std::array<std::int16_t, max_op_number>* site,        // term_number
    const std::array<std::uint8_t, max_op_number>* kind,        // term_number
    const std::array<double, 2>* coef,                          // term_number
    const std::array<std::uint8_t, n_qubytes>* configs,         // batch_size
    const std::array<double, 2>* psi,                           // batch_size
    const std::array<std::uint8_t, n_qubytes>* exclude_configs, // exclude_size
    std::array<std::uint8_t, n_qubytes + sizeof(double) / sizeof(std::uint8_t)>* heap,
    std::int64_t heap_size
) {
    for (std::int64_t term_index = 0; term_index < term_number; ++term_index) {
        for (std::int64_t batch_index = 0; batch_index < batch_size; ++batch_index) {
            find_relative_kernel<max_op_number, n_qubytes, particle_cut>(
                /*term_index=*/term_index,
                /*batch_index=*/batch_index,
                /*term_number=*/term_number,
                /*batch_size=*/batch_size,
                /*exclude_size=*/exclude_size,
                /*site=*/site,
                /*kind=*/kind,
                /*coef=*/coef,
                /*configs=*/configs,
                /*psi=*/psi,
                /*exclude_configs=*/exclude_configs,
                /*heap=*/heap,
                /*heap_size=*/heap_size
            );
        }
    }
}
template<std::int64_t max_op_number, std::int64_t n_qubytes, std::int64_t particle_cut>
auto find_relative_interface(
    const torch::Tensor& configs,
    const torch::Tensor& psi,
    const std::int64_t count_selected,
    const torch::Tensor& site,
    const torch::Tensor& kind,
    const torch::Tensor& coef,
    const torch::Tensor& exclude_configs
) -> torch::Tensor {
    std::int64_t device_id = configs.device().index();
    std::int64_t batch_size = configs.size(0);
    std::int64_t term_number = site.size(0);
    std::int64_t exclude_size = exclude_configs.size(0);
    TORCH_CHECK(configs.device().type() == torch::kCPU, "configs must be on CPU.");
    TORCH_CHECK(configs.device().index() == device_id, "configs must be on the same device as others.");
    TORCH_CHECK(configs.is_contiguous(), "configs must be contiguous.");
    TORCH_CHECK(configs.dtype() == torch::kUInt8, "configs must be uint8.");
    TORCH_CHECK(configs.dim() == 2, "configs must be 2D.");
    TORCH_CHECK(configs.size(0) == batch_size, "configs batch size must match the provided batch_size.");
    TORCH_CHECK(configs.size(1) == n_qubytes, "configs must have the same number of qubits as the provided n_qubytes.");
    TORCH_CHECK(psi.device().type() == torch::kCPU, "psi must be on CPU.");
    TORCH_CHECK(psi.device().index() == device_id, "psi must be on the same device as others.");
    TORCH_CHECK(psi.is_contiguous(), "psi must be contiguous.");
    TORCH_CHECK(psi.dtype() == torch::kFloat64, "psi must be float64.");
    TORCH_CHECK(psi.dim() == 2, "psi must be 2D.");
    TORCH_CHECK(psi.size(0) == batch_size, "psi batch size must match the provided batch_size.");
    TORCH_CHECK(psi.size(1) == 2, "psi must contain 2 elements for each batch.");
    TORCH_CHECK(site.device().type() == torch::kCPU, "site must be on CPU.");
    TORCH_CHECK(site.device().index() == device_id, "site must be on the same device as others.");
    TORCH_CHECK(site.is_contiguous(), "site must be contiguous.");
    TORCH_CHECK(site.dtype() == torch::kInt16, "site must be int16.");
    TORCH_CHECK(site.dim() == 2, "site must be 2D.");
    TORCH_CHECK(site.size(0) == term_number, "site size must match the provided term_number.");
    TORCH_CHECK(site.size(1) == max_op_number, "site must match the provided max_op_number.");
    TORCH_CHECK(kind.device().type() == torch::kCPU, "kind must be on CPU.");
    TORCH_CHECK(kind.device().index() == device_id, "kind must be on the same device as others.");
    TORCH_CHECK(kind.is_contiguous(), "kind must be contiguous.");
    TORCH_CHECK(kind.dtype() == torch::kUInt8, "kind must be uint8.");
    TORCH_CHECK(kind.dim() == 2, "kind must be 2D.");
    TORCH_CHECK(kind.size(0) == term_number, "kind size must match the provided term_number.");
    TORCH_CHECK(kind.size(1) == max_op_number, "kind must match the provided max_op_number.");
    TORCH_CHECK(coef.device().type() == torch::kCPU, "coef must be on CPU.");
    TORCH_CHECK(coef.device().index() == device_id, "coef must be on the same device as others.");
    TORCH_CHECK(coef.is_contiguous(), "coef must be contiguous.");
    TORCH_CHECK(coef.dtype() == torch::kFloat64, "coef must be float64.");
    TORCH_CHECK(coef.dim() == 2, "coef must be 2D.");
    TORCH_CHECK(coef.size(0) == term_number, "coef size must match the provided term_number.");
    TORCH_CHECK(coef.size(1) == 2, "coef must contain 2 elements for each term.");
    TORCH_CHECK(exclude_configs.device().type() == torch::kCPU, "exclude_configs must be on CPU.");
    TORCH_CHECK(exclude_configs.device().index() == device_id, "exclude_configs must be on the same device as others.");
    TORCH_CHECK(exclude_configs.is_contiguous(), "exclude_configs must be contiguous.");
    TORCH_CHECK(exclude_configs.dtype() == torch::kUInt8, "exclude_configs must be uint8.");
    TORCH_CHECK(exclude_configs.dim() == 2, "exclude_configs must be 2D.");
    TORCH_CHECK(exclude_configs.size(0) == exclude_size, "exclude_configs batch size must match the provided exclude_size.");
    TORCH_CHECK(exclude_configs.size(1) == n_qubytes, "exclude_configs must have the same number of qubits as the provided n_qubytes.");
    auto sorted_exclude_configs = exclude_configs.clone(torch::MemoryFormat::Contiguous);
    std::sort(
        reinterpret_cast<std::array<std::uint8_t, n_qubytes>*>(sorted_exclude_configs.data_ptr()),
        reinterpret_cast<std::array<std::uint8_t, n_qubytes>*>(sorted_exclude_configs.data_ptr()) + exclude_size,
        array_less<std::uint8_t, n_qubytes>()
    );
    auto result_pool = torch::zeros(
        {count_selected, n_qubytes + sizeof(double) / sizeof(std::uint8_t)},
        torch::TensorOptions().dtype(torch::kUInt8).device(device, device_id)
    );
    std::array<std::uint8_t, n_qubytes + sizeof(double) / sizeof(std::uint8_t)>* heap =
        reinterpret_cast<std::array<std::uint8_t, n_qubytes + sizeof(double) / sizeof(std::uint8_t)>*>(result_pool.data_ptr());
    find_relative_kernel_interface<max_op_number, n_qubytes, particle_cut>(
        /*term_number=*/term_number,
        /*batch_size=*/batch_size,
        /*exclude_size=*/exclude_size,
        /*site=*/reinterpret_cast<const std::array<std::int16_t, max_op_number>*>(site.data_ptr()),
        /*kind=*/reinterpret_cast<const std::array<std::uint8_t, max_op_number>*>(kind.data_ptr()),
        /*coef=*/reinterpret_cast<const std::array<double, 2>*>(coef.data_ptr()),
        /*configs=*/reinterpret_cast<const std::array<std::uint8_t, n_qubytes>*>(configs.data_ptr()),
        /*psi=*/reinterpret_cast<const std::array<double, 2>*>(psi.data_ptr()),
        /*exclude_configs=*/reinterpret_cast<const std::array<std::uint8_t, n_qubytes>*>(sorted_exclude_configs.data_ptr()),
        /*heap=*/heap,
        /*heap_size=*/count_selected
);
// In result_pool, the first sizeof(double) / sizeof(std::uint8_t) bytes of each row hold the weight, and the remaining bytes hold the config.
// We need to drop rows with zero weight, then sort and deduplicate the configs.
auto result_config =
result_pool.index({torch::indexing::Slice(), torch::indexing::Slice(sizeof(double) / sizeof(std::uint8_t), torch::indexing::None)});
auto result_weight =
result_pool.index({torch::indexing::Slice(), torch::indexing::Slice(torch::indexing::None, sizeof(double) / sizeof(std::uint8_t))});
auto nonzero = torch::any(result_weight != 0, /*dim=*/1);
auto nonzero_result_config = result_config.index({nonzero});
auto unique_nonzero_result_config =
std::get<0>(torch::unique_dim(/*self=*/nonzero_result_config, /*dim=*/0, /*sorted=*/true, /*return_inverse=*/false, /*return_counts=*/false));
return unique_nonzero_result_config;
}
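The comment above describes the post-processing: each row of `result_pool` packs an 8-byte weight followed by the config bytes; rows whose weight bytes are all zero are dropped, and the surviving configs are sorted and deduplicated (mirroring `torch::unique_dim` with `sorted=true`). A minimal pure-Python sketch of the same filtering, on rows represented as byte lists (the helper name `postprocess` is ours, not part of the extension):

```python
def postprocess(result_pool, weight_bytes=8):
    """Keep configs whose weight bytes are not all zero, then sort and deduplicate.

    Each row is weight_bytes of packed double followed by the config bytes,
    matching the layout of result_pool in the C++ code above.
    """
    configs = []
    for row in result_pool:
        weight, config = row[:weight_bytes], row[weight_bytes:]
        if any(b != 0 for b in weight):  # torch.any(result_weight != 0, dim=1)
            configs.append(tuple(config))
    # sorted unique rows, as torch::unique_dim(dim=0, sorted=true) returns
    return sorted(set(configs))
```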
#ifndef N_QUBYTES
#define N_QUBYTES 0
#endif
#ifndef PARTICLE_CUT
#define PARTICLE_CUT 0
#endif
#if N_QUBYTES != 0
#define QMB_LIBRARY_HELPER(x, y) qmb_hamiltonian_##x##_##y
#define QMB_LIBRARY(x, y) QMB_LIBRARY_HELPER(x, y)
TORCH_LIBRARY_IMPL(QMB_LIBRARY(N_QUBYTES, PARTICLE_CUT), CPU, m) {
m.impl("apply_within", apply_within_interface</*max_op_number=*/4, /*n_qubytes=*/N_QUBYTES, /*particle_cut=*/PARTICLE_CUT>);
m.impl("find_relative", find_relative_interface</*max_op_number=*/4, /*n_qubytes=*/N_QUBYTES, /*particle_cut=*/PARTICLE_CUT>);
}
#undef QMB_LIBRARY
#undef QMB_LIBRARY_HELPER
#endif
} // namespace qmb_hamiltonian_cpu


#include <pybind11/complex.h>
#include <torch/extension.h>
namespace qmb_hamiltonian {
// The `prepare` function is responsible for parsing a raw Python dictionary representing Hamiltonian terms
// and transforming it into a structured tuple of tensors. This tuple is then stored on the Python side
// and utilized in subsequent calls to the PyTorch operators for further processing.
//
// The function takes a Python dictionary `hamiltonian` as input, where each key-value pair represents a term
// in the Hamiltonian. The key is a tuple of tuples, where each inner tuple contains two elements:
// - The first element is an integer representing the site index of the operator.
// - The second element is an integer representing the type of operator (0 for annihilation, 1 for creation).
// The value is either a float or a complex number representing the coefficient of the term.
//
// The function processes the dictionary and constructs three tensors:
// - `site`: An int16 tensor of shape [term_number, max_op_number], representing the site indices of the operators for
// each term.
// - `kind`: A uint8 tensor of shape [term_number, max_op_number], representing the type of operator for each term.
// The values are encoded as follows:
// - 0: Annihilation operator
// - 1: Creation operator
// - 2: Empty (identity operator)
// - `coef`: A float64 tensor of shape [term_number, 2], representing the coefficients of each term, with two elements
// for real and imaginary parts.
//
// The `max_op_number` template argument specifies the maximum number of operators per term, typically set to 4 for
// 2-body interactions.
template<std::int64_t max_op_number>
auto prepare(py::dict hamiltonian) {
std::int64_t term_number = hamiltonian.size();
auto site = torch::empty({term_number, max_op_number}, torch::TensorOptions().dtype(torch::kInt16).device(torch::kCPU));
// No need to initialize
auto kind = torch::full({term_number, max_op_number}, 2, torch::TensorOptions().dtype(torch::kUInt8).device(torch::kCPU));
// Initialize to 2 for identity as default
auto coef = torch::empty({term_number, 2}, torch::TensorOptions().dtype(torch::kFloat64).device(torch::kCPU));
// No need to initialize
auto site_accessor = site.accessor<std::int16_t, 2>();
auto kind_accessor = kind.accessor<std::uint8_t, 2>();
auto coef_accessor = coef.accessor<double, 2>();
std::int64_t index = 0;
for (auto& item : hamiltonian) {
auto key = item.first.cast<py::tuple>();
auto value_is_float = py::isinstance<py::float_>(item.second);
auto value = value_is_float ? std::complex<double>(item.second.cast<double>()) : item.second.cast<std::complex<double>>();
std::int64_t op_number = key.size();
for (std::int64_t i = 0; i < op_number; ++i) {
auto tuple = key[i].cast<py::tuple>();
site_accessor[index][i] = tuple[0].cast<std::int16_t>();
kind_accessor[index][i] = tuple[1].cast<std::uint8_t>();
}
coef_accessor[index][0] = value.real();
coef_accessor[index][1] = value.imag();
++index;
}
return std::make_tuple(site, kind, coef);
}
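The dictionary format expected by `prepare` can be sketched in pure Python. This is a hypothetical reimplementation for illustration only; note one deliberate difference: the real extension leaves unused `site` slots uninitialized (`torch::empty`), whereas here they are zero-padded for determinism:

```python
def prepare(hamiltonian, max_op_number=4):
    """Encode a {((site, kind), ...): coefficient} dict into flat rows.

    kind: 0 = annihilation, 1 = creation; unused kind slots are padded
    with 2 (identity), matching the torch::full(..., 2) initialization.
    Coefficients may be real or complex; they are stored as [real, imag].
    """
    site_rows, kind_rows, coef_rows = [], [], []
    for ops, coefficient in hamiltonian.items():
        c = complex(coefficient)
        pad = max_op_number - len(ops)
        site_rows.append([s for s, _ in ops] + [0] * pad)  # real code: uninitialized
        kind_rows.append([k for _, k in ops] + [2] * pad)
        coef_rows.append([c.real, c.imag])
    return site_rows, kind_rows, coef_rows
```

For example, a hopping term `c†_0 c_1` with coefficient 0.5 is written as `{((0, 1), (1, 0)): 0.5}`.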
#ifndef N_QUBYTES
#define N_QUBYTES 0
#endif
#ifndef PARTICLE_CUT
#define PARTICLE_CUT 0
#endif
#if N_QUBYTES == 0
// Expose the `prepare` function to Python.
PYBIND11_MODULE(qmb_hamiltonian, m) {
m.def("prepare", prepare</*max_op_number=*/4>, py::arg("hamiltonian"));
}
#endif
#if N_QUBYTES != 0
#define QMB_LIBRARY_HELPER(x, y) qmb_hamiltonian_##x##_##y
#define QMB_LIBRARY(x, y) QMB_LIBRARY_HELPER(x, y)
TORCH_LIBRARY_FRAGMENT(QMB_LIBRARY(N_QUBYTES, PARTICLE_CUT), m) {
m.def("apply_within(Tensor configs_i, Tensor psi_i, Tensor configs_j, Tensor site, Tensor kind, Tensor coef) -> Tensor");
m.def("find_relative(Tensor configs_i, Tensor psi_i, int count_selected, Tensor site, Tensor kind, Tensor coef, Tensor configs_exclude) -> Tensor"
);
m.def("single_relative(Tensor configs, Tensor site, Tensor kind, Tensor coef) -> Tensor");
}
#undef QMB_LIBRARY
#undef QMB_LIBRARY_HELPER
#endif
} // namespace qmb_hamiltonian
"""
This file implements an auto-regressive transformer network with the sampling method introduced in https://arxiv.org/pdf/2408.07625.
This network makes use of DeepSeekMoE architecture introduced in https://arxiv.org/pdf/2401.06066.
"""
import typing
import torch
from .bitspack import pack_int, unpack_int
class FeedForward(torch.nn.Module):
"""
A feed-forward layer for transformer architectures.
"""
def __init__(self, embedding_dim: int, hidden_dim: int) -> None:
super().__init__()
self.model: torch.nn.Module = torch.nn.Sequential(
torch.nn.Linear(embedding_dim, hidden_dim),
torch.nn.GELU(),
torch.nn.Linear(hidden_dim, embedding_dim),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass of the feed-forward layer.
"""
# x: batch * site * embedding
x = self.model(x)
# x: batch * site * embedding
return x
class SelfAttention(torch.nn.Module):
"""
Self-attention unit with support for key-value cache.
"""
def __init__(self, embedding_dim: int, heads_num: int) -> None:
super().__init__()
self.heads_num: int = heads_num
self.heads_dim: int = embedding_dim // heads_num
assert self.heads_num * self.heads_dim == embedding_dim
self.qkv: torch.nn.Module = torch.nn.Linear(embedding_dim, 3 * embedding_dim)
self.out: torch.nn.Module = torch.nn.Linear(embedding_dim, embedding_dim)
def forward(
self,
x: torch.Tensor,
kv_cache: typing.Optional[tuple[torch.Tensor, torch.Tensor]],
mask: typing.Optional[torch.Tensor],
) -> tuple[torch.Tensor, tuple[torch.Tensor, torch.Tensor]]:
"""
Forward pass of the self-attention layer.
"""
# x: batch * site * embedding
batch_size, sites, embedding_dim = x.shape
q, k, v = self.qkv(x).split(embedding_dim, dim=-1)
# q, k, v: batch * site * embedding
q = q.view([batch_size, sites, self.heads_num, self.heads_dim])
k = k.view([batch_size, sites, self.heads_num, self.heads_dim])
v = v.view([batch_size, sites, self.heads_num, self.heads_dim])
# q, k, v: batch, site, heads_num, heads_dim
q = q.transpose(1, 2)
k = k.transpose(1, 2)
v = v.transpose(1, 2)
# q, k, v: batch, heads_num, site, heads_dim
if kv_cache is not None:
k_cache, v_cache = kv_cache
k = torch.cat([k_cache, k], dim=2)
v = torch.cat([v_cache, v], dim=2)
# q: batch, heads_num, site, heads_dim
# k, v: batch, heads_num, total_site, heads_dim
if mask is None:
total_sites = k.shape[-2]
mask = torch.ones(sites, total_sites, dtype=torch.bool, device=x.device).tril(diagonal=total_sites - sites)
# call scaled dot product attention
attn = torch.nn.functional.scaled_dot_product_attention(q, k, v, attn_mask=mask) # pylint: disable=not-callable
# attn: batch, heads_num, site, heads_dim
out = attn.transpose(1, 2).reshape([batch_size, sites, embedding_dim])
# out: batch, site, embedding_dim
return self.out(out), (k, v)
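The default mask built in `forward` is `tril(diagonal=total_sites - sites)`: with a KV cache, each of the `sites` new query positions may attend to every cached key plus its own causal prefix among the new positions. A pure-Python sketch of that rule (the helper name is ours):

```python
def causal_mask(sites, total_sites):
    """Boolean attention mask of shape sites x total_sites.

    New query position i may attend to key position j iff
    j <= i + (total_sites - sites), which is exactly
    torch.ones(sites, total_sites).tril(diagonal=total_sites - sites).
    """
    offset = total_sites - sites  # number of already-cached key positions
    return [[j <= i + offset for j in range(total_sites)] for i in range(sites)]
```

With no cache (`sites == total_sites`) this reduces to the ordinary lower-triangular causal mask; with a single new token it allows attention to everything, as expected for incremental decoding.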
class DecoderUnit(torch.nn.Module):
"""
A decoder unit within the transformer architecture, integrating both self-attention and feed-forward layers.
"""
# pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
*,
embedding_dim: int,
heads_num: int,
feed_forward_dim: int,
shared_num: int,
routed_num: int,
selected_num: int,
) -> None:
super().__init__()
self.shared_num = shared_num
self.routed_num = routed_num
self.selected_num = selected_num
self.attention: torch.nn.Module = SelfAttention(embedding_dim, heads_num)
self.norm1: torch.nn.Module = torch.nn.LayerNorm(embedding_dim)
self.feed_forward_shared: torch.nn.ModuleList = torch.nn.ModuleList([FeedForward(embedding_dim, feed_forward_dim) for _ in range(shared_num)])
self.feed_forward_routed: torch.nn.ModuleList = torch.nn.ModuleList([FeedForward(embedding_dim, feed_forward_dim) for _ in range(routed_num)])
self.centroid: torch.nn.Parameter = torch.nn.Parameter(torch.randn(routed_num, embedding_dim))
self.norm2: torch.nn.Module = torch.nn.LayerNorm(embedding_dim)
self.bias: torch.Tensor
self.register_buffer("bias", torch.zeros([self.routed_num]))
self.accumulater: torch.Tensor
self.register_buffer("accumulater", torch.zeros([self.routed_num]))
self.count: torch.Tensor
self.register_buffer("count", torch.zeros([]))
def forward(
self,
x: torch.Tensor,
kv_cache: typing.Optional[tuple[torch.Tensor, torch.Tensor]],
mask: typing.Optional[torch.Tensor],
) -> tuple[torch.Tensor, tuple[torch.Tensor, torch.Tensor]]:
"""
Forward pass of the decoder unit.
"""
# x, y: batch * site * embedding
y, result_cache = self.attention(x, kv_cache, mask)
x = self.norm1(x + y)
# The following code segment implements the DeepSeekMoE architecture.
y = x
for i, expert in enumerate(self.feed_forward_shared):
y = y + expert(x)
# similarity: batch * site * routed
similarity = torch.nn.functional.softmax(x @ self.centroid.t(), dim=-1)
# top_k_indices: batch * site * selected
_, top_k_indices = torch.topk(similarity + self.bias, self.selected_num, dim=-1)
# gate_prime, gate: batch * site * routed
gate_prime = torch.zeros_like(similarity).scatter_(-1, top_k_indices, similarity.gather(-1, top_k_indices))
gate = gate_prime / gate_prime.sum(dim=-1).unsqueeze(-1)
for i, expert in enumerate(self.feed_forward_routed):
y = y + expert(x) * gate[:, :, i].unsqueeze(-1)
x = self.norm2(y)
if self.training:
self.accumulater = self.accumulater + similarity.sum([0, 1])
self.count = self.count + similarity.size(0) * similarity.size(1)
return x, result_cache
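The routing logic above has a subtlety worth spelling out: the top-k experts are selected using `similarity + bias` (the bias buffer acts as a load-balancing correction), but the gate weights themselves come from the *unbiased* similarity of the selected experts, renormalized to sum to 1. A pure-Python sketch of that gating for a single token (hypothetical helper; in the module, `logits` would be `x @ centroid.t()` for one position):

```python
import math

def moe_gate(logits, bias, selected_num):
    """DeepSeekMoE-style gating sketch for one token.

    similarity = softmax(logits); experts ranked by similarity + bias;
    gate weights are the unbiased similarities of the top-k experts,
    renormalized so the nonzero gates sum to 1 (others are 0).
    """
    m = max(logits)  # stabilized softmax
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    similarity = [e / total for e in exps]
    ranked = sorted(range(len(logits)), key=lambda i: similarity[i] + bias[i], reverse=True)
    top_k = set(ranked[:selected_num])
    denom = sum(similarity[i] for i in top_k)
    return [similarity[i] / denom if i in top_k else 0.0 for i in range(len(logits))]
```

The `accumulater`/`count` buffers updated during training track the mean routing probability per expert, which is presumably what feeds the bias adjustment between steps.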
class Transformers(torch.nn.Module):
"""
A transformer model consisting of multiple decoder units.
"""
def __init__( # pylint: disable=too-many-arguments
self,
*,
embedding_dim: int,
heads_num: int,
feed_forward_dim: int,
shared_num: int,
routed_num: int,
selected_num: int,
depth: int,
) -> None:
super().__init__()
self.layers: torch.nn.ModuleList = torch.nn.ModuleList(
DecoderUnit(
embedding_dim=embedding_dim,
heads_num=heads_num,
feed_forward_dim=feed_forward_dim,
shared_num=shared_num,
routed_num=routed_num,
selected_num=selected_num,
) for _ in range(depth))
def forward(
self,
x: torch.Tensor,
kv_cache: typing.Optional[list[tuple[torch.Tensor, torch.Tensor]]],
mask: typing.Optional[torch.Tensor],
) -> tuple[torch.Tensor, list[tuple[torch.Tensor, torch.Tensor]]]:
"""
Forward pass of the transformer model.
"""
# x: batch * site * embedding
result_cache: list[tuple[torch.Tensor, torch.Tensor]] = []
for i, layer in enumerate(self.layers):
if kv_cache is None:
x, cache = layer(x, None, mask)
else:
x, cache = layer(x, kv_cache[i], mask)
result_cache.append(cache)
return x, result_cache
class Tail(torch.nn.Module):
"""
The Tail layer for the transformer model, responsible for mapping the final embeddings to the desired output dimensions.
"""
def __init__(self, embedding_dim: int, hidden_dim: int, output_dim: int) -> None:
super().__init__()
self.model: torch.nn.Module = torch.nn.Sequential(
torch.nn.Linear(embedding_dim, hidden_dim),
torch.nn.GELU(),
torch.nn.Linear(hidden_dim, output_dim),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass of the Tail layer.
"""
# x: batch * site * embedding_dim
x = self.model(x)
# x: batch * site * output_dim
return x
class Embedding(torch.nn.Module):
"""
Embedding layer for transforming input data into a format suitable for the transformer model.
"""
def __init__(self, sites: int, physical_dim: int, embedding_dim: int) -> None:
super().__init__()
self.parameter: torch.nn.Parameter = torch.nn.Parameter(torch.randn([sites, physical_dim, embedding_dim]))
self.sites: int = sites
self.physical_dim: int = physical_dim
self.embedding_dim: int = embedding_dim
def forward(self, x: torch.Tensor, base: int) -> torch.Tensor:
"""
Forward pass of the Embedding layer.
"""
# x: batch * sites
batch, sites = x.shape
x = x.unsqueeze(-1).unsqueeze(-1).expand(-1, -1, -1, self.embedding_dim)
# x: batch * sites * config=1 * embedding
parameter = self.parameter[base:base + sites].unsqueeze(0).expand(batch, -1, -1, -1)
# parameter: batch * sites * config * embedding
result = torch.gather(parameter, -2, x.to(dtype=torch.int64))
# result: batch * site * 1 * embedding
return result.squeeze(-2)
class WaveFunctionElectronUpDown(torch.nn.Module):
"""
The wave function for the transformers network.
This module maintains the conservation of particle number of spin-up and spin-down electrons.
"""
# pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
*,
double_sites: int, # Number of qubits, where each pair of qubits represents a site
physical_dim: int, # Dimension of the physical space, which is always 2
is_complex: bool, # Indicates whether the wave function is complex-valued, which is always true
spin_up: int, # Number of spin-up electrons
spin_down: int, # Number of spin-down electrons
embedding_dim: int, # Dimension of the embedding space used in the transformer layers
heads_num: int, # Number of attention heads in the transformer's self-attention mechanism
feed_forward_dim: int, # Dimension of the feed-forward network within the transformer layers
shared_num: int, # Number of the shared expert in the DeepSeekMoE architecture
routed_num: int, # Number of the routed expert in the DeepSeekMoE architecture
selected_num: int, # Number of the selected expert in the DeepSeekMoE architecture
depth: int, # Number of decoder layers in the transformer model
ordering: int | list[int], # Ordering of sites: +1 for normal order, -1 for reversed order, or a custom order list
) -> None:
super().__init__()
assert double_sites % 2 == 0
self.double_sites: int = double_sites
self.sites: int = double_sites // 2
assert physical_dim == 2
assert is_complex == True # pylint: disable=singleton-comparison
self.spin_up: int = spin_up
self.spin_down: int = spin_down
self.embedding_dim: int = embedding_dim
self.heads_num: int = heads_num
self.feed_forward_dim: int = feed_forward_dim
self.shared_num: int = shared_num
self.routed_num: int = routed_num
self.selected_num: int = selected_num
self.depth: int = depth
# Embed configurations for each site, considering the four possible states of two qubits.
self.embedding: torch.nn.Module = Embedding(self.sites, 4, self.embedding_dim)
# Main body of the wave function computation.
self.transformers: torch.nn.Module = Transformers(
embedding_dim=self.embedding_dim,
heads_num=self.heads_num,
feed_forward_dim=self.feed_forward_dim,
shared_num=self.shared_num,
routed_num=self.routed_num,
selected_num=self.selected_num,
depth=self.depth,
)
# Tail layer mapping from embedding space to amplitude and phase space.
# (amplitude and phase) * (4 possible states)
self.tail: torch.nn.Module = Tail(self.embedding_dim, self.feed_forward_dim, 8)
# Site Ordering Configuration
# +1 for normal order, -1 for reversed order
if isinstance(ordering, int) and ordering == +1:
ordering = list(range(self.sites))
if isinstance(ordering, int) and ordering == -1:
ordering = list(reversed(range(self.sites)))
self.ordering: torch.Tensor
self.register_buffer("ordering", torch.tensor(ordering, dtype=torch.int64))
self.ordering_reversed: torch.Tensor
self.register_buffer("ordering_reversed", torch.scatter(torch.zeros(self.sites, dtype=torch.int64), 0, self.ordering, torch.arange(self.sites, dtype=torch.int64)))
# Dummy Parameter for Device and Dtype Retrieval
# This parameter is used to infer the device and dtype of the model.
self.dummy_param = torch.nn.Parameter(torch.empty(0))
@torch.jit.export
def _mask(self, x: torch.Tensor) -> torch.Tensor:
"""
Determine whether a spin-up or spin-down electron/hole can be appended to uncompleted configurations.
"""
# pylint: disable=too-many-locals
# x: batch_size * current_site * 2
# x represents the uncompleted configurations
current_site = x.shape[1]
# number: batch_size * 2
# number denotes the total electron count for each uncompleted configuration
number = x.sum(dim=1)
# up/down_electron/hole: batch_size
# These variables store the count of electrons and holes for each uncompleted configuration
up_electron = number[:, 0]
down_electron = number[:, 1]
up_hole = current_site - up_electron
down_hole = current_site - down_electron
# add_up/down_electron/hole: batch_size
# These variables determine whether it is possible to append an up/down electron/hole
add_up_electron = up_electron < self.spin_up
add_down_electron = down_electron < self.spin_down
add_up_hole = up_hole < self.sites - self.spin_up
add_down_hole = down_hole < self.sites - self.spin_down
# add_up: batch_size * 2 * 1
# add_down: batch_size * 1 * 2
# These tensors represent the feasibility of adding up/down electrons/holes
add_up = torch.stack([add_up_hole, add_up_electron], dim=-1).unsqueeze(-1)
add_down = torch.stack([add_down_hole, add_down_electron], dim=-1).unsqueeze(-2)
# add: batch_size * 2 * 2
# add represents the logical AND of add_up and add_down, indicating the feasibility of appending specific electron/hole combinations
# add[_, 0, 0] indicates the possibility of adding an up hole and a down hole
# add[_, 0, 1] indicates the possibility of adding an up hole and a down electron
# add[_, 1, 0] indicates the possibility of adding an up electron and a down hole
# add[_, 1, 1] indicates the possibility of adding an up electron and a down electron
add = torch.logical_and(add_up, add_down)
return add
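The mask logic above can be checked on scalars. For one uncompleted configuration, an electron may still be added while its count is below the target (`spin_up`/`spin_down`), and a hole while its count is below `sites - target`. A pure-Python sketch returning the same 2x2 feasibility table (helper name is ours; index 0 = hole, 1 = electron, as in the tensor version):

```python
def feasibility_mask(up_electron, down_electron, current_site, sites, spin_up, spin_down):
    """2x2 table add[u][d]: may we append (up state u, down state d)?

    u, d in {0: hole, 1: electron}. Mirrors _mask for a single configuration:
    add = add_up (as a column) AND add_down (as a row).
    """
    add_up = [current_site - up_electron < sites - spin_up,   # room for an up hole
              up_electron < spin_up]                          # room for an up electron
    add_down = [current_site - down_electron < sites - spin_down,
                down_electron < spin_down]
    return [[u and d for d in add_down] for u in add_up]
```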
@torch.jit.export
def _normalize_amplitude(self, x: torch.Tensor) -> torch.Tensor:
"""
Normalize the log amplitude of uncompleted configurations.
"""
# x : ... * 2 * 2
# param : ...
param = (2 * x).exp().sum(dim=[-2, -1]).log() / 2
x = x - param.unsqueeze(-1).unsqueeze(-1)
# Now sqrt(sum(exp(x)^2)) == 1, i.e. the squared amplitudes are normalized.
return x
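Since `x` stores log-amplitudes, subtracting `param = log(sum(exp(2x))) / 2` makes the squared amplitudes sum to one. A minimal sketch on a flat list (hypothetical helper, same arithmetic as `_normalize_amplitude`):

```python
import math

def normalize_log_amplitude(x):
    """Subtract log(sqrt(sum(exp(x_i)^2))) so that sum(exp(x_i)^2) == 1."""
    param = math.log(sum(math.exp(2 * v) for v in x)) / 2
    return [v - param for v in x]
```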
@torch.jit.export
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Compute the wave function psi for the given configurations.
"""
device: torch.device = self.dummy_param.device
batch_size: int = x.shape[0]
# x : batch_size * sites * 2
x = unpack_int(x, size=1, last_dim=self.double_sites).view([batch_size, self.sites, 2])
# Apply ordering
x = torch.index_select(x, 1, self.ordering_reversed)
# x4: batch_size * sites
# Prepare x4 as the input for the neural network
# The last configuration is excluded, and a zero vector is prepended at the beginning to represent the initial state.
x4: torch.Tensor = x[:, :-1, 0] * 2 + x[:, :-1, 1]
x4 = torch.cat([torch.zeros([batch_size, 1], device=device, dtype=torch.uint8), x4], dim=1)
# emb: batch_size * sites * embedding
# Embed the input tensor `x4` using the embedding layer.
# In embedding layer, 0 for bos, 1 for site 0, ...
emb: torch.Tensor = self.embedding(x4, 0)
# post_transformers: batch_size * sites * embedding
post_transformers, _ = self.transformers(emb, None, None)
# tail: batch_size * sites * 8
tail: torch.Tensor = self.tail(post_transformers)
# amplitude/phase : batch_size * sites * 2 * 2
amplitude: torch.Tensor = tail[:, :, :4].view(batch_size, self.sites, 2, 2)
phase: torch.Tensor = tail[:, :, 4:].view(batch_size, self.sites, 2, 2)
# Apply a filter mask to the amplitude to ensure the conservation of particle number.
amplitude = amplitude + torch.stack([torch.where(self._mask(x[:, :i]), 0, -torch.inf) for i in range(self.sites)], dim=1)
# Normalize the delta amplitude.
amplitude = self._normalize_amplitude(amplitude)
# batch/sites_indices: batch_size * sites
batch_indices: torch.Tensor = torch.arange(batch_size).unsqueeze(1).expand(-1, self.sites)
sites_indices: torch.Tensor = torch.arange(self.sites).unsqueeze(0).expand(batch_size, -1)
# selected_amplitude/phase: batch_size * sites
x_int32: torch.Tensor = x.to(dtype=torch.int32)
selected_amplitude: torch.Tensor = amplitude[batch_indices, sites_indices, x_int32[:, :, 0], x_int32[:, :, 1]]
selected_phase: torch.Tensor = phase[batch_indices, sites_indices, x_int32[:, :, 0], x_int32[:, :, 1]]
return torch.view_as_complex(torch.stack([selected_amplitude.double().sum(dim=1), selected_phase.double().sum(dim=1)], dim=-1)).exp()
@torch.jit.export
def _blocked_forward_for_generate_unique( # pylint: disable=too-many-arguments
self,
*,
x: torch.Tensor,
cache_input: typing.Optional[list[tuple[torch.Tensor, torch.Tensor]]],
block_num: int,
device: torch.device,
i: int,
) -> tuple[torch.Tensor, list[tuple[torch.Tensor, torch.Tensor]]]:
# pylint: disable=too-many-locals
local_batch_size: int = x.size(0)
local_batch_size_block = local_batch_size // block_num
remainder = local_batch_size % block_num
tail_list: list[torch.Tensor] = []
cache_list: list[list[tuple[torch.Tensor, torch.Tensor]]] = []
for j in range(block_num):
if j < remainder:
current_local_batch_size_block = local_batch_size_block + 1
else:
current_local_batch_size_block = local_batch_size_block
start_index = j * local_batch_size_block + min(j, remainder)
end_index = start_index + current_local_batch_size_block
batch_indices_block = torch.arange(start_index, end_index, device=device, dtype=torch.int64)
x_block: torch.Tensor = x[batch_indices_block]
xi_block: torch.Tensor = x_block[:, -1:, :]
xi4_block: torch.Tensor = xi_block[:, :, 0] * 2 + xi_block[:, :, 1]
emb_block: torch.Tensor = self.embedding(xi4_block, i)
if cache_input is None:
post_transformers_block, cache_block = self.transformers(emb_block, None, None)
else:
cache_block_input = [(cache_layer[0][batch_indices_block], cache_layer[1][batch_indices_block]) for cache_layer in cache_input]
post_transformers_block, cache_block = self.transformers(emb_block, cache_block_input, None)
tail_block: torch.Tensor = self.tail(post_transformers_block)
tail_list.append(tail_block)
cache_list.append(cache_block)
tail: torch.Tensor = torch.cat(tail_list)
cache: list[tuple[torch.Tensor, torch.Tensor]] = [
(torch.cat([cache_block[depth][0] for cache_block in cache_list]), torch.cat([cache_block[depth][1] for cache_block in cache_list])) for depth in range(self.depth)
]
return tail, cache
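The block partitioning used here (and again when evaluating amplitudes at the end of `generate_unique`) splits a batch into `block_num` contiguous blocks, giving the first `batch_size % block_num` blocks one extra row. A pure-Python sketch of just the index arithmetic (helper name is ours):

```python
def block_bounds(batch_size, block_num):
    """Contiguous (start, end) index pairs for block_num blocks covering
    range(batch_size); the first batch_size % block_num blocks get one
    extra row, matching the loop in _blocked_forward_for_generate_unique."""
    base, remainder = divmod(batch_size, block_num)
    bounds = []
    for j in range(block_num):
        start = j * base + min(j, remainder)
        end = start + base + (1 if j < remainder else 0)
        bounds.append((start, end))
    return bounds
```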
@torch.jit.export
def generate_unique(self, batch_size: int, block_num: int = 1) -> tuple[torch.Tensor, torch.Tensor, None, None]:
"""
Generate configurations uniquely.
see https://arxiv.org/pdf/2408.07625.
"""
# pylint: disable=too-many-locals
# pylint: disable=invalid-name
# pylint: disable=too-many-statements
device: torch.device = self.dummy_param.device
dtype: torch.dtype = self.dummy_param.dtype
cache: typing.Optional[list[tuple[torch.Tensor, torch.Tensor]]] = None
# x: local_batch_size * current_site * 2
x: torch.Tensor = torch.zeros([1, 1, 2], device=device, dtype=torch.uint8) # site=1, since the first is bos
# (un)perturbed_log_probability: local_batch_size
unperturbed_probability: torch.Tensor = torch.tensor([0], dtype=dtype, device=device)
perturbed_probability: torch.Tensor = torch.tensor([0], dtype=dtype, device=device)
for i in range(self.sites):
local_batch_size: int = x.size(0)
# The commented-out block below is the unblocked equivalent of the
# _blocked_forward_for_generate_unique call that follows; it is kept for reference.
# # xi: batch * (sites=1) * 2
# xi: torch.Tensor = x[:, -1:, :]
# # xi4: batch * (sites=1)
# xi4: torch.Tensor = xi[:, :, 0] * 2 + xi[:, :, 1]
# # emb: batch * (sites=1) * embedding
# emb: torch.Tensor = self.embedding(xi4, i)
# # post_transformers: batch * (sites=1) * embedding
# post_transformers, cache = self.transformers(emb, cache, None)
# # tail: batch * (sites=1) * 8
# tail: torch.Tensor = self.tail(post_transformers)
tail, cache = self._blocked_forward_for_generate_unique(x=x, cache_input=cache, block_num=block_num, device=device, i=i)
# The first 4 items are amplitudes
# delta_amplitude: batch * 2 * 2
# delta_amplitude represents the amplitude changes for the configurations at the new site.
delta_amplitude: torch.Tensor = tail[:, :, :4].view([local_batch_size, 2, 2])
# Apply a filter mask to the amplitude to ensure the conservation of particle number.
delta_amplitude = delta_amplitude + torch.where(self._mask(x[:, 1:]), 0, -torch.inf)
# normalized_delta_amplitude: batch_size * 2 * 2
# Normalize the delta amplitude.
normalized_delta_amplitude: torch.Tensor = self._normalize_amplitude(delta_amplitude)
# The delta unperturbed prob for all batch and 4 adds
l: torch.Tensor = (2 * normalized_delta_amplitude).view([local_batch_size, 4])
# and add to get the current unperturbed prob
l = unperturbed_probability.view([local_batch_size, 1]) + l
# Get perturbed prob by adding GUMBEL(0)
L: torch.Tensor = l - (-torch.rand_like(l).log()).log()
# Get max perturbed prob
Z: torch.Tensor = L.max(dim=-1).values.view([local_batch_size, 1])
# Evaluate the conditioned prob
tildeL: torch.Tensor = -torch.log(torch.exp(-perturbed_probability.view([local_batch_size, 1])) - torch.exp(-Z) + torch.exp(-L))
assert cache is not None
# Calculate appended configurations for 4 adds
# local_batch_size * current_site * 2 + local_batch_size * 1 * 2
x0: torch.Tensor = torch.cat([x, torch.tensor([[0, 0]], device=device, dtype=torch.uint8).expand(local_batch_size, -1, -1)], dim=1)
x1: torch.Tensor = torch.cat([x, torch.tensor([[0, 1]], device=device, dtype=torch.uint8).expand(local_batch_size, -1, -1)], dim=1)
x2: torch.Tensor = torch.cat([x, torch.tensor([[1, 0]], device=device, dtype=torch.uint8).expand(local_batch_size, -1, -1)], dim=1)
x3: torch.Tensor = torch.cat([x, torch.tensor([[1, 1]], device=device, dtype=torch.uint8).expand(local_batch_size, -1, -1)], dim=1)
# Cat all configurations to get x : new_local_batch_size * (current_size+1) * 2
# (un)perturbed prob : new_local_batch_size
x = torch.cat([x0, x1, x2, x3])
unperturbed_probability = l.permute(1, 0).reshape([4 * local_batch_size])
perturbed_probability = tildeL.permute(1, 0).reshape([4 * local_batch_size])
cache_indices = torch.arange(local_batch_size, device=device, dtype=torch.int64).unsqueeze(0).expand(4, -1).reshape([4 * local_batch_size])
# kv cache: batch * heads_num * site * heads_dim, so just repeat first dimension
# Filter results, only use largest batch_size ones
selected = perturbed_probability.argsort(descending=True)[:batch_size]
x = x[selected]
unperturbed_probability = unperturbed_probability[selected]
perturbed_probability = perturbed_probability[selected]
cache_indices = cache_indices[selected]
# Forcibly filter out configurations whose perturbed probability is not finite (i.e. prob = 0)
selected = perturbed_probability.isfinite()
x = x[selected]
unperturbed_probability = unperturbed_probability[selected]
perturbed_probability = perturbed_probability[selected]
cache_indices = cache_indices[selected]
cache = [(cache_layer[0][cache_indices], cache_layer[1][cache_indices]) for cache_layer in cache]
# Apply ordering
x = torch.index_select(x[:, 1:], 1, self.ordering)
# Flatten site part of x
x = x.view([x.size(0), self.double_sites])
# It should return configurations, amplitudes, probabilities and multiplicities,
# but this is a unique generator, so the last two fields are None.
x = pack_int(x, size=1)
# Calculate the amplitude for the generated configurations in the batch.
real_batch_size = len(x)
real_batch_size_block = real_batch_size // block_num
remainder = real_batch_size % block_num
amplitude_list = []
for j in range(block_num):
if j < remainder:
current_real_batch_size_block = real_batch_size_block + 1
else:
current_real_batch_size_block = real_batch_size_block
start_index = j * real_batch_size_block + min(j, remainder)
end_index = start_index + current_real_batch_size_block
batch_indices_block = torch.arange(start_index, end_index, device=device, dtype=torch.int64)
amplitude_block = self(x[batch_indices_block])
amplitude_list.append(amplitude_block)
amplitude: torch.Tensor = torch.cat(amplitude_list)
return x, amplitude, None, None
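The sampling step inside `generate_unique` follows the Gumbel top-k scheme of https://arxiv.org/pdf/2408.07625: each candidate's log-probability `l` is perturbed with Gumbel(0) noise (`l - log(-log(U))`), then the children's perturbed values are conditioned so their maximum equals the parent's perturbed value, via `tildeL = -log(exp(-parent) - exp(-Z) + exp(-L))` with `Z = max(L)`. A pure-Python sketch of one such expansion step for a single parent (helper name is ours):

```python
import math
import random

def perturb(log_probs, parent_perturbed):
    """One expansion step of Gumbel top-k sampling without replacement.

    log_probs: children's cumulative log-probabilities (the tensor `l`).
    parent_perturbed: the parent's perturbed value (perturbed_probability).
    Returns tildeL; by construction max(tildeL) == parent_perturbed.
    """
    # L = l + Gumbel(0); -log(-log(U)) is a standard Gumbel sample
    L = [lp - math.log(-math.log(random.random())) for lp in log_probs]
    Z = max(L)
    # condition so the maximum child equals the parent's perturbed value
    return [-math.log(math.exp(-parent_perturbed) - math.exp(-Z) + math.exp(-v)) for v in L]
```

Keeping the `batch_size` largest perturbed values at every site (the `argsort(descending=True)[:batch_size]` line) then yields distinct samples whose unperturbed probabilities are tracked exactly, which is why the generator can return `None` for probabilities and multiplicities.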
class WaveFunctionNormal(torch.nn.Module):
"""
The wave function for the transformers model.
This module does not maintain any conservation.
"""
# pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
*,
sites: int, # Number of qubits
physical_dim: int, # Dimension of the physical space,
is_complex: bool, # Indicates whether the wave function is complex-valued, which is always true
embedding_dim: int, # Dimension of the embedding space used in the transformer layers
heads_num: int, # Number of attention heads in the transformer's self-attention mechanism
feed_forward_dim: int, # Dimension of the feed-forward network within the transformer layers
shared_num: int, # Number of the shared expert in the DeepSeekMoE architecture
routed_num: int, # Number of the routed expert in the DeepSeekMoE architecture
selected_num: int, # Number of the selected expert in the DeepSeekMoE architecture
depth: int, # Number of decoder layers in the transformer model
ordering: int | list[int], # Ordering of sites: +1 for normal order, -1 for reversed order, or a custom order list
) -> None:
super().__init__()
self.sites: int = sites
self.physical_dim: int = physical_dim
assert is_complex == True # pylint: disable=singleton-comparison
self.embedding_dim: int = embedding_dim
self.heads_num: int = heads_num
self.feed_forward_dim: int = feed_forward_dim
self.shared_num: int = shared_num
self.routed_num: int = routed_num
self.selected_num: int = selected_num
self.depth: int = depth
# Embed configurations for each site, considering the physical_dim possible states of each qubit.
self.embedding: torch.nn.Module = Embedding(self.sites, self.physical_dim, self.embedding_dim)
# Main body of the wave function computation.
self.transformers: torch.nn.Module = Transformers(
embedding_dim=self.embedding_dim,
heads_num=self.heads_num,
feed_forward_dim=self.feed_forward_dim,
shared_num=self.shared_num,
routed_num=self.routed_num,
selected_num=self.selected_num,
depth=self.depth,
)
# Tail layer mapping from embedding space to amplitude and phase space.
# (amplitude and phase) * (all possible states)
self.tail: torch.nn.Module = Tail(self.embedding_dim, self.feed_forward_dim, 2 * self.physical_dim)
# Site Ordering Configuration
# +1 for normal order, -1 for reversed order
if isinstance(ordering, int) and ordering == +1:
ordering = list(range(self.sites))
if isinstance(ordering, int) and ordering == -1:
ordering = list(reversed(range(self.sites)))
self.ordering: torch.Tensor
self.register_buffer("ordering", torch.tensor(ordering, dtype=torch.int64))
self.ordering_reversed: torch.Tensor
self.register_buffer("ordering_reversed", torch.scatter(torch.zeros(self.sites, dtype=torch.int64), 0, self.ordering, torch.arange(self.sites, dtype=torch.int64)))
# Dummy Parameter for Device and Dtype Retrieval
# This parameter is used to infer the device and dtype of the model.
self.dummy_param = torch.nn.Parameter(torch.empty(0))
@torch.jit.export
def _normalize_amplitude(self, x: torch.Tensor) -> torch.Tensor:
"""
Normalize the log amplitudes of incomplete configurations.
"""
# x : ... * physical_dim
# param : ...
param = (2 * x).exp().sum(dim=[-1]).log() / 2
x = x - param.unsqueeze(-1)
# Now sqrt(sum(exp(x)^2)) == 1, i.e. the conditional probabilities sum to one
return x
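The `_normalize_amplitude` step works entirely in the log domain: subtracting `param = log(sum(exp(2x))) / 2` guarantees the squared moduli sum to one. A minimal pure-Python sketch of the same arithmetic, using hypothetical log-amplitude values:

```python
import math

def normalize_log_amplitude(x):
    # x: list of log-amplitudes for the possible states of one site.
    # param = log(sum(exp(2 * x_i))) / 2, mirroring _normalize_amplitude above.
    param = math.log(sum(math.exp(2 * xi) for xi in x)) / 2
    return [xi - param for xi in x]

normalized = normalize_log_amplitude([0.3, -1.2])
# The squared moduli exp(x)^2 now form a probability distribution.
total = sum(math.exp(xi) ** 2 for xi in normalized)
```

Working in the log domain avoids underflow when many small conditional amplitudes are multiplied across sites.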
@torch.jit.export
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Compute the wave function psi for the given configurations.
"""
device: torch.device = self.dummy_param.device
batch_size: int = x.shape[0]
# x : batch_size * sites
x = unpack_int(x, size=self._bit_size(), last_dim=self.sites).view([batch_size, self.sites])
# Apply ordering
x = torch.index_select(x, 1, self.ordering_reversed)
# x_for_emb: batch_size * sites
# Prepare x for emb as the input for the neural network
# The last configuration is excluded, and a zero vector is prepended at the beginning to represent the initial state.
x_for_emb: torch.Tensor = x[:, :-1]
x_for_emb = torch.cat([torch.zeros([batch_size, 1], device=device, dtype=torch.uint8), x_for_emb], dim=1)
# emb: batch_size * sites * embedding
# Embed the input tensor `x_for_emb` using the embedding layer.
# In embedding layer, 0 for bos, 1 for site 0, ...
emb: torch.Tensor = self.embedding(x_for_emb, 0)
# post_transformers: batch_size * sites * embedding
post_transformers, _ = self.transformers(emb, None, None)
# tail: batch_size * sites * (2*physical_dim)
tail: torch.Tensor = self.tail(post_transformers)
# amplitude/phase : batch_size * sites * physical_dim
amplitude: torch.Tensor = tail[:, :, :self.physical_dim]
phase: torch.Tensor = tail[:, :, self.physical_dim:]
# Normalize the delta amplitude.
amplitude = self._normalize_amplitude(amplitude)
# batch/sites_indices: batch_size * sites
batch_indices: torch.Tensor = torch.arange(batch_size).unsqueeze(1).expand(-1, self.sites)
sites_indices: torch.Tensor = torch.arange(self.sites).unsqueeze(0).expand(batch_size, -1)
# selected_amplitude/phase: batch_size * sites
x_int32: torch.Tensor = x.to(dtype=torch.int32)
selected_amplitude: torch.Tensor = amplitude[batch_indices, sites_indices, x_int32]
selected_phase: torch.Tensor = phase[batch_indices, sites_indices, x_int32]
return torch.view_as_complex(torch.stack([selected_amplitude.double().sum(dim=1), selected_phase.double().sum(dim=1)], dim=-1)).exp()
@torch.jit.export
def _blocked_forward_for_generate_unique( # pylint: disable=too-many-arguments
self,
*,
x: torch.Tensor,
cache_input: typing.Optional[list[tuple[torch.Tensor, torch.Tensor]]],
block_num: int,
device: torch.device,
i: int,
) -> tuple[torch.Tensor, list[tuple[torch.Tensor, torch.Tensor]]]:
# pylint: disable=too-many-locals
local_batch_size: int = x.size(0)
local_batch_size_block = local_batch_size // block_num
remainder = local_batch_size % block_num
tail_list: list[torch.Tensor] = []
cache_list: list[list[tuple[torch.Tensor, torch.Tensor]]] = []
for j in range(block_num):
if j < remainder:
current_local_batch_size_block = local_batch_size_block + 1
else:
current_local_batch_size_block = local_batch_size_block
start_index = j * local_batch_size_block + min(j, remainder)
end_index = start_index + current_local_batch_size_block
batch_indices_block = torch.arange(start_index, end_index, device=device, dtype=torch.int64)
x_block: torch.Tensor = x[batch_indices_block]
xi_block: torch.Tensor = x_block[:, -1:]
emb_block: torch.Tensor = self.embedding(xi_block, i)
if cache_input is None:
post_transformers_block, cache_block = self.transformers(emb_block, None, None)
else:
cache_block_input = [(cache_layer[0][batch_indices_block], cache_layer[1][batch_indices_block]) for cache_layer in cache_input]
post_transformers_block, cache_block = self.transformers(emb_block, cache_block_input, None)
tail_block: torch.Tensor = self.tail(post_transformers_block)
tail_list.append(tail_block)
cache_list.append(cache_block)
tail: torch.Tensor = torch.cat(tail_list)
cache: list[tuple[torch.Tensor, torch.Tensor]] = [
(torch.cat([cache_block[depth][0] for cache_block in cache_list]), torch.cat([cache_block[depth][1] for cache_block in cache_list])) for depth in range(self.depth)
]
return tail, cache
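The blocked loop above splits a batch into `block_num` chunks whose sizes differ by at most one, with the first `remainder` blocks taking the extra element. A stand-alone sketch of the start/end index arithmetic (pure Python, illustrative only):

```python
def block_bounds(batch_size, block_num):
    # Mirror the blocked loops: the first (batch_size % block_num) blocks
    # each receive one extra element, so all chunks differ in size by at most one.
    base = batch_size // block_num
    remainder = batch_size % block_num
    bounds = []
    for j in range(block_num):
        size = base + 1 if j < remainder else base
        start = j * base + min(j, remainder)
        bounds.append((start, start + size))
    return bounds

bounds = block_bounds(10, 3)  # → [(0, 4), (4, 7), (7, 10)]
```

The chunks tile the batch exactly, so concatenating the per-block outputs reproduces the unblocked result while bounding peak memory.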
@torch.jit.export
def generate_unique(self, batch_size: int, block_num: int = 1) -> tuple[torch.Tensor, torch.Tensor, None, None]:
"""
Generate unique configurations via Gumbel top-k sampling without replacement.
See https://arxiv.org/pdf/2408.07625.
"""
# pylint: disable=too-many-locals
# pylint: disable=invalid-name
# pylint: disable=too-many-statements
device: torch.device = self.dummy_param.device
dtype: torch.dtype = self.dummy_param.dtype
cache: typing.Optional[list[tuple[torch.Tensor, torch.Tensor]]] = None
# x: local_batch_size * current_site
x: torch.Tensor = torch.zeros([1, 1], device=device, dtype=torch.uint8) # site=1, since the first is bos
# (un)perturbed_log_probability: local_batch_size
unperturbed_probability: torch.Tensor = torch.tensor([0], dtype=dtype, device=device)
perturbed_probability: torch.Tensor = torch.tensor([0], dtype=dtype, device=device)
for i in range(self.sites):
local_batch_size: int = x.size(0)
# # xi: batch * (sites=1)
# xi: torch.Tensor = x[:, -1:]
# # emb: batch * (sites=1) * embedding
# emb: torch.Tensor = self.embedding(xi, i)
# # post_transformers: batch * (sites=1) * embedding
# post_transformers, cache = self.transformers(emb, cache, None)
# # tail: batch * (sites=1) * (2*physical_dim)
# tail: torch.Tensor = self.tail(post_transformers)
tail, cache = self._blocked_forward_for_generate_unique(x=x, cache_input=cache, block_num=block_num, device=device, i=i)
# The first physical_dim items are amplitudes
# delta_amplitude: batch * physical_dim
# delta_amplitude represents the amplitude changes for the configurations at the new site.
delta_amplitude: torch.Tensor = tail[:, :, :self.physical_dim].view([local_batch_size, self.physical_dim])
# normalized_delta_amplitude: batch_size * physical_dim
# Normalize the delta amplitude.
normalized_delta_amplitude: torch.Tensor = self._normalize_amplitude(delta_amplitude)
# The delta unperturbed log probability for all batch entries and all possible new states
l: torch.Tensor = (2 * normalized_delta_amplitude).view([local_batch_size, self.physical_dim])
# Accumulate to get the current unperturbed log probability
l = unperturbed_probability.view([local_batch_size, 1]) + l
# Get the perturbed log probability by adding Gumbel(0) noise
L: torch.Tensor = l - (-torch.rand_like(l).log()).log()
# Get max perturbed prob
Z: torch.Tensor = L.max(dim=-1).values.view([local_batch_size, 1])
# Evaluate the conditional perturbed log probability
tildeL: torch.Tensor = -torch.log(torch.exp(-perturbed_probability.view([local_batch_size, 1])) - torch.exp(-Z) + torch.exp(-L))
assert cache is not None
# Calculate appended configurations for all new possible states
# local_batch_size * current_site + local_batch_size * 1
xs: list[torch.Tensor] = [torch.cat([x, torch.tensor([j], device=device, dtype=torch.uint8).expand(local_batch_size, -1)], dim=1) for j in range(self.physical_dim)]
# Concatenate all configurations to get x : new_local_batch_size * (current_site+1)
# (un)perturbed prob : new_local_batch_size
x = torch.cat(xs)
unperturbed_probability = l.permute(1, 0).reshape([self.physical_dim * local_batch_size])
perturbed_probability = tildeL.permute(1, 0).reshape([self.physical_dim * local_batch_size])
cache_indices = torch.arange(local_batch_size, device=device, dtype=torch.int64).unsqueeze(0).expand(self.physical_dim, -1).reshape([self.physical_dim * local_batch_size])
# kv cache: batch * heads_num * site * heads_dim, so just repeat first dimension
# Filter results, only use largest batch_size ones
selected = perturbed_probability.argsort(descending=True)[:batch_size]
x = x[selected]
unperturbed_probability = unperturbed_probability[selected]
perturbed_probability = perturbed_probability[selected]
cache_indices = cache_indices[selected]
# If the probability is zero, forcibly filter it out
selected = perturbed_probability.isfinite()
x = x[selected]
unperturbed_probability = unperturbed_probability[selected]
perturbed_probability = perturbed_probability[selected]
cache_indices = cache_indices[selected]
cache = [(cache_layer[0][cache_indices], cache_layer[1][cache_indices]) for cache_layer in cache]
# Apply ordering
x = torch.index_select(x[:, 1:], 1, self.ordering)
# Flatten site part of x
x = x.view([x.size(0), self.sites])
# The interface returns configurations, amplitudes, probabilities and multiplicities.
# Since this is a unique generator, the last two fields are None.
x = pack_int(x, size=self._bit_size())
# Calculate the amplitude for the generated configurations in the batch.
real_batch_size = len(x)
real_batch_size_block = real_batch_size // block_num
remainder = real_batch_size % block_num
amplitude_list = []
for j in range(block_num):
if j < remainder:
current_real_batch_size_block = real_batch_size_block + 1
else:
current_real_batch_size_block = real_batch_size_block
start_index = j * real_batch_size_block + min(j, remainder)
end_index = start_index + current_real_batch_size_block
batch_indices_block = torch.arange(start_index, end_index, device=device, dtype=torch.int64)
amplitude_block = self(x[batch_indices_block])
amplitude_list.append(amplitude_block)
amplitude: torch.Tensor = torch.cat(amplitude_list)
return x, amplitude, None, None
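The core of `generate_unique` is the perturb-and-select scheme used above: add Gumbel(0) noise to each cumulative log probability and keep the `batch_size` largest perturbed values, which draws distinct configurations without replacement. A torch-free sketch of that trick, with hypothetical log-probabilities:

```python
import math
import random

def gumbel_top_k(log_probs, k, rng):
    # Perturb each log probability with Gumbel(0) noise: g = -log(-log(u)),
    # matching the l - (-rand.log()).log() expression in generate_unique.
    perturbed = [lp - math.log(-math.log(rng.random())) for lp in log_probs]
    # Keep the indices of the k largest perturbed values: top-k sampling without replacement.
    return sorted(range(len(log_probs)), key=lambda i: perturbed[i], reverse=True)[:k]

rng = random.Random(0)
selected = gumbel_top_k([math.log(0.5), math.log(0.3), math.log(0.2)], 2, rng)
```

Because the noise is added independently per candidate, the argmax of the perturbed values is distributed according to the underlying probabilities, and taking the top k yields k distinct samples.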
def _bit_size(self) -> int:
if self.physical_dim <= 1 << 1:
return 1
if self.physical_dim <= 1 << 2:
return 2
if self.physical_dim <= 1 << 4:
return 4
if self.physical_dim <= 1 << 8:
return 8
raise ValueError("physical_dim should be less than or equal to 256")
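`_bit_size` above picks the smallest supported bit width (1, 2, 4, or 8) able to hold a state index below `physical_dim`. The same ladder as a stand-alone sketch:

```python
def bit_size(physical_dim):
    # Smallest of the supported widths 1/2/4/8 such that 2**width >= physical_dim,
    # mirroring the if-chain in _bit_size.
    for width in (1, 2, 4, 8):
        if physical_dim <= 1 << width:
            return width
    raise ValueError("physical_dim should be less than or equal to 256")

sizes = [bit_size(d) for d in (2, 3, 4, 16, 17, 256)]  # → [1, 2, 2, 4, 8, 8]
```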
"""
This module provides functions to pack multiple int1, int2, int4 or int8 values into bytes and unpack them again.
"""
import torch
@torch.jit.script
def pack_int(tensor: torch.Tensor, size: int) -> torch.Tensor:
"""
Combines multiple small int values into a single byte.
Parameters
----------
tensor : torch.Tensor
The input tensor with shape ... * last_dimension.
size : 1 | 2 | 4 | 8
The size of the int values in bits.
Returns
-------
torch.Tensor
The packed tensor with shape ... * ((last_dimension + elements_per_byte - 1) // elements_per_byte).
"""
assert tensor.dtype == torch.uint8
assert size in [1, 2, 4, 8]
# Get the shape of the input tensor
shape = list(tensor.shape)
# Get the size of the last dimension
last_dim = shape.pop()
# Get the number of elements per byte
elements_per_byte = 8 // size
# Calculate how many zeros need to be padded
if last_dim % elements_per_byte != 0:
pad = elements_per_byte - (last_dim % elements_per_byte)
tensor = torch.nn.functional.pad(tensor, (0, pad))
last_dim = last_dim + pad
# Calculate the number of groups
num_bytes = last_dim // elements_per_byte
# Reshape the tensor to (..., num_bytes, elements_per_bytes)
shape.append(num_bytes)
shape.append(elements_per_byte)
tensor = tensor.view(shape)
# Define the weights tensor
weights = torch.tensor([1 << i for i in range(0, 8, size)], device=tensor.device, dtype=torch.uint8)
# Calculate the byte value for each group
packed = torch.sum(tensor * weights, dim=-1, dtype=torch.uint8)
assert packed.dtype == torch.uint8
return packed
@torch.jit.script
def unpack_int(tensor: torch.Tensor, size: int, last_dim: int) -> torch.Tensor:
"""
Unpacks bytes into multiple small int values based on the specified size.
Parameters
----------
tensor : torch.Tensor
The packed tensor with bytes.
size : 1 | 2 | 4 | 8
The size of the int values in bits.
last_dim : int
The original size of the last dimension before packing.
Returns
-------
torch.Tensor
The unpacked tensor with shape ... * last_dim.
"""
assert tensor.dtype == torch.uint8
assert size in [1, 2, 4, 8]
# Define the weights tensor and shifts tensor
weights = torch.tensor([1 << i for i in range(8)], device=tensor.device, dtype=torch.uint8).view([8 // size, size]).sum(dim=-1, dtype=torch.uint8)
shifts = torch.tensor(list(range(0, 8, size)), device=tensor.device, dtype=torch.uint8)
# Calculate the unpacked values
result = torch.bitwise_right_shift(torch.bitwise_and(tensor.unsqueeze(-1), weights), shifts)
# Reshape the result to the original shape
shape = list(result.shape)
shape.append(shape.pop() * shape.pop())
unpacked = result.view(shape)[..., :last_dim]
assert unpacked.dtype == torch.uint8
return unpacked
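The two kernels above implement little-endian sub-byte packing: each byte holds `8 // size` values, with the first value in the lowest bits. A pure-Python reference of the same layout (illustrative only, not the TorchScript path):

```python
def pack(values, size):
    # Pack small ints (each < 2**size) into bytes, first value in the lowest bits,
    # matching the weights [1 << i for i in range(0, 8, size)] used by pack_int.
    per_byte = 8 // size
    out = []
    for i in range(0, len(values), per_byte):
        byte = 0
        for j, v in enumerate(values[i:i + per_byte]):
            byte |= v << (j * size)
        out.append(byte)
    return out

def unpack(packed, size, last_dim):
    # Inverse of pack: extract 8 // size values per byte, then drop the padding.
    per_byte = 8 // size
    mask = (1 << size) - 1
    values = [(byte >> (j * size)) & mask for byte in packed for j in range(per_byte)]
    return values[:last_dim]

roundtrip = unpack(pack([1, 0, 3, 2, 1], size=2), size=2, last_dim=5)
```

The `last_dim` argument is what lets `unpack_int` discard the zero padding added when the original last dimension is not a multiple of `8 // size`.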
"""
This file implements the subspace chopping for the result of the imag script.
"""
import logging
import typing
import dataclasses
import tyro
import torch.utils.tensorboard
from .common import CommonConfig
from .subcommand_dict import subcommand_dict
@dataclasses.dataclass
class ChopImagConfig:
"""
The subspace chopping for the result of the imag script.
"""
common: typing.Annotated[CommonConfig, tyro.conf.OmitArgPrefixes]
# The number of configurations to eliminate every iteration
chop_size: typing.Annotated[int, tyro.conf.arg(aliases=["-c"])] = 10000
# The estimated magnitude of the second order term
second_order_magnitude: typing.Annotated[float, tyro.conf.arg(aliases=["-s"])] = 0.0
def main(self) -> None:
"""
The main function for the subspace chopping.
"""
# pylint: disable=too-many-locals
model, _, data = self.common.main()
logging.info(
"Arguments Summary: "
"Chop Size: %d, "
"Second Order Magnitude: %.10f",
self.chop_size,
self.second_order_magnitude,
)
configs, psi = data["imag"]["pool"]
configs = configs.to(device=self.common.device)
psi = psi.to(device=self.common.device)
writer = torch.utils.tensorboard.SummaryWriter(log_dir=self.common.folder()) # type: ignore[no-untyped-call]
original_configs = configs
original_psi = psi
ordered_configs: list[torch.Tensor] = []
ordered_psi: list[torch.Tensor] = []
mapping: dict[int, tuple[float, float]] = {}
i = 0
while True:
num_configs = len(configs)
logging.info("The number of configurations: %d", num_configs)
writer.add_scalar("chop_imag/num_configs", num_configs, i) # type: ignore[no-untyped-call]
psi = psi / psi.norm()
hamiltonian_psi = model.apply_within(configs, psi, configs)
psi_hamiltonian_psi = (psi.conj() @ hamiltonian_psi).real
energy = psi_hamiltonian_psi
logging.info("The energy: %.10f, The energy error is %.10f", energy.item(), energy.item() - model.ref_energy)
writer.add_scalar("chop_imag/energy", energy.item(), i) # type: ignore[no-untyped-call]
writer.add_scalar("chop_imag/error", energy.item() - model.ref_energy, i) # type: ignore[no-untyped-call]
writer.flush() # type: ignore[no-untyped-call]
mapping[num_configs] = (energy.item(), energy.item() - model.ref_energy)
if self.second_order_magnitude >= 0:
grad = hamiltonian_psi - psi_hamiltonian_psi * psi
delta = -psi.conj() * grad
real_delta = 2 * delta.real
second_order = (psi.conj() * psi).real * self.second_order_magnitude
rate = (real_delta + second_order).argsort()
else:
second_order = (psi.conj() * psi).real
rate = second_order.argsort()
unselected = rate[:self.chop_size]
ordered_configs.append(configs[unselected])
ordered_psi.append(psi[unselected])
selected = rate[self.chop_size:]
if len(selected) == 0:
break
configs = configs[selected]
psi = psi[selected]
i += 1
data["chop_imag"] = {
"ordered_configs": torch.cat(ordered_configs, dim=0),
"ordered_psi": torch.cat(ordered_psi, dim=0),
"original_configs": original_configs,
"original_psi": original_psi,
"mapping": mapping,
}
self.common.save(data, 0)
subcommand_dict["chop_imag"] = ChopImagConfig
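The chopping loop in `ChopImagConfig.main` ranks configurations by an importance score and repeatedly discards the `chop_size` least important ones until the pool is exhausted, recording the energy at each pool size. A torch-free sketch of the pruning schedule, with hypothetical weights:

```python
def chop_schedule(weights, chop_size):
    # Repeatedly drop the chop_size smallest-weight entries,
    # recording the pool size at each iteration as the loop above does.
    pool = sorted(range(len(weights)), key=lambda i: weights[i], reverse=True)
    sizes = []
    while pool:
        sizes.append(len(pool))
        pool = pool[:-chop_size] if len(pool) > chop_size else []
    return sizes

sizes = chop_schedule([0.5, 0.1, 0.9, 0.3, 0.7, 0.2, 0.4], chop_size=3)  # → [7, 4, 1]
```

The recorded (pool size, energy) pairs correspond to the `mapping` dictionary saved to the checkpoint.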
"""
This file contains the common steps to create a model and network for various scripts.
"""
import sys
import logging
import typing
import pathlib
import dataclasses
import torch
import tyro
from .model_dict import model_dict, ModelProto, NetworkProto
from .random_engine import dump_random_engine_state, load_random_engine_state
@dataclasses.dataclass
class CommonConfig:
"""
This class defines the common settings needed to create a model and network.
"""
# pylint: disable=too-many-instance-attributes
# The model name
model_name: typing.Annotated[str, tyro.conf.Positional, tyro.conf.arg(metavar="MODEL")]
# The network name
network_name: typing.Annotated[str, tyro.conf.Positional, tyro.conf.arg(metavar="NETWORK")]
# Arguments for physical model
physics_args: typing.Annotated[tuple[str, ...], tyro.conf.arg(aliases=["-P"]), tyro.conf.UseAppendAction] = ()
# Arguments for network
network_args: typing.Annotated[tuple[str, ...], tyro.conf.arg(aliases=["-N"]), tyro.conf.UseAppendAction] = ()
# The log path
log_path: typing.Annotated[pathlib.Path, tyro.conf.arg(aliases=["-L"])] = pathlib.Path("logs")
# The group name, leave empty to use the preset one given by the model
group_name: typing.Annotated[str | None, tyro.conf.arg(aliases=["-G"])] = None
# The job name, where it is recommended to use distinct job names for runs with varying parameters
current_job_name: typing.Annotated[str, tyro.conf.arg(aliases=["-J"])] = "main"
# The parent job name, it is only used for loading the checkpoint from the parent job, leave empty to use the current job name
parent_job_name: typing.Annotated[str | None, tyro.conf.arg(aliases=["-F"])] = None
# The manual random seed, leave empty for set seed automatically
random_seed: typing.Annotated[int | None, tyro.conf.arg(aliases=["-S"])] = None
# The interval to save the checkpoint
checkpoint_interval: typing.Annotated[int, tyro.conf.arg(aliases=["-I"])] = 5
# The device to run on
device: typing.Annotated[torch.device, tyro.conf.arg(aliases=["-D"])] = torch.device(type="cuda", index=0)
# The dtype of the network, leave empty to skip modifying the dtype
dtype: typing.Annotated[str | None, tyro.conf.arg(aliases=["-T"])] = None
# The maximum absolute step for the process, leave empty to loop forever
max_absolute_step: typing.Annotated[int | None, tyro.conf.arg(aliases=["-A"])] = None
# The maximum relative step for the process, leave empty to loop forever
max_relative_step: typing.Annotated[int | None, tyro.conf.arg(aliases=["-R"])] = None
def __post_init__(self) -> None:
if self.log_path is not None:
self.log_path = pathlib.Path(self.log_path)
if self.device is not None:
self.device = torch.device(self.device)
if self.max_absolute_step is not None and self.max_relative_step is not None:
raise ValueError("Both max_absolute_step and max_relative_step are set, please set only one of them.")
def folder(self) -> pathlib.Path:
"""
Get the folder name for the current job.
"""
assert self.group_name is not None
return self.log_path / self.group_name / self.current_job_name
def parent_folder(self) -> pathlib.Path:
"""
Get the folder name for the parent job, falling back to the current job.
"""
assert self.group_name is not None
return self.log_path / self.group_name / (self.parent_job_name if self.parent_job_name is not None else self.current_job_name)
def save(self, data: typing.Any, step: int) -> None:
"""
Save data to checkpoint.
"""
data["random"] = {"host": torch.get_rng_state(), "device": dump_random_engine_state(self.device), "device_type": self.device.type}
data_pth = self.folder() / "data.pth"
local_data_pth = self.folder() / f"data.{step}.pth"
torch.save(data, local_data_pth)
data_pth.unlink(missing_ok=True)
if step % self.checkpoint_interval == 0:
data_pth.symlink_to(f"data.{step}.pth")
else:
local_data_pth.rename(data_pth)
if self.max_relative_step is not None:
self.max_absolute_step = step + self.max_relative_step - 1
self.max_relative_step = None
if step == self.max_absolute_step:
logging.info("Reached the maximum step, exiting.")
sys.exit(0)
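The `save` method above writes every step to `data.{step}.pth`, but only steps on the checkpoint interval keep their numbered file (with `data.pth` symlinked to it); other steps are renamed to the rolling `data.pth` and overwritten later. A filesystem-free sketch of which files survive under that retention rule:

```python
def retained_files(steps, interval):
    # Numbered checkpoints survive only on interval steps; on other steps the
    # file is renamed to the rolling "data.pth", which later steps overwrite.
    kept = [f"data.{s}.pth" for s in steps if s % interval == 0]
    return kept + ["data.pth"]

files = retained_files(range(1, 8), interval=5)  # → ['data.5.pth', 'data.pth']
```

This keeps disk usage bounded while always leaving a valid `data.pth` to resume from.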
def main(self, *, model_param: typing.Any = None, network_param: typing.Any = None) -> tuple[ModelProto, NetworkProto, typing.Any]:
"""
The main function to create the model and network.
"""
# pylint: disable=too-many-statements
# pylint: disable=too-many-branches
model_t = model_dict[self.model_name]
model_config_t = model_t.config_t
network_config_t = model_t.network_dict[self.network_name]
if "-h" in self.network_args or "--help" in self.network_args:
tyro.cli(network_config_t, args=self.network_args)
if self.group_name is None:
model_param_for_group_name = tyro.cli(model_config_t, args=self.physics_args)
self.group_name = model_t.default_group_name(model_param_for_group_name)
self.folder().mkdir(parents=True, exist_ok=True)
logging.basicConfig(
handlers=[logging.StreamHandler(), logging.FileHandler(self.folder() / "run.log")],
level=logging.INFO,
format=f"[%(process)d] %(asctime)s {self.group_name}({self.current_job_name}) %(levelname)s: %(message)s",
)
logging.info("Starting script with arguments: %a", sys.argv)
logging.info("Model: %s, Network: %s", self.model_name, self.network_name)
logging.info("Log directory: %s, Group name: %s, Job name: %s", self.log_path, self.group_name, self.current_job_name)
if model_param is not None:
logging.info("Model parameters: %a", model_param)
else:
logging.info("Physics arguments: %a", self.physics_args)
if network_param is not None:
logging.info("Network parameters: %a", network_param)
else:
logging.info("Network arguments: %a", self.network_args)
logging.info("Disabling PyTorch's default gradient computation")
torch.set_grad_enabled(False)
logging.info("Attempting to load checkpoint")
data: typing.Any = {}
checkpoint_path = self.parent_folder() / "data.pth"
if checkpoint_path.exists():
logging.info("Checkpoint found at: %s, loading...", checkpoint_path)
data = torch.load(checkpoint_path, map_location="cpu", weights_only=True)
logging.info("Checkpoint loaded successfully")
else:
if self.parent_job_name is not None:
raise FileNotFoundError(f"Checkpoint not found at: {checkpoint_path}")
logging.info("Checkpoint not found at: %s", checkpoint_path)
if self.random_seed is not None:
logging.info("Setting random seed to: %d", self.random_seed)
torch.manual_seed(self.random_seed)
elif "random" in data:
logging.info("Loading random seed from the checkpoint")
torch.set_rng_state(data["random"]["host"])
if data["random"]["device_type"] == self.device.type:
load_random_engine_state(data["random"]["device"], self.device)
else:
logging.info("Skipping loading random engine state for device since the device type does not match")
else:
logging.info("Random seed not specified, using current seed: %d", torch.seed())
if model_param is None:
logging.info("Parsing configurations for model: %s with arguments: %a", self.model_name, self.physics_args)
model_param = tyro.cli(model_config_t, args=self.physics_args)
else:
logging.info("The model parameters are given as %a, skipping parsing model arguments", model_param)
logging.info("Loading the model")
model: ModelProto = model_t(model_param)
logging.info("Physical model loaded successfully")
if network_param is None:
logging.info("Parsing configurations for network: %s with arguments: %a", self.network_name, self.network_args)
network_param = tyro.cli(network_config_t, args=self.network_args)
else:
logging.info("The network parameters are given as %a, skipping parsing network arguments", network_param)
logging.info("Initializing the network")
network: NetworkProto = network_param.create(model)
logging.info("Network initialized successfully")
if "network" in data:
logging.info("Loading state dict of the network")
network.load_state_dict(data["network"])
else:
logging.info("Skipping loading state dict of the network")
logging.info("Moving model to the device: %a", self.device)
network.to(device=self.device)
if self.dtype is not None:
logging.info("Changing network dtype to: %s", self.dtype)
match self.dtype:
case "bfloat16":
network.bfloat16()
case "float16":
network.half()
case "float32":
network.float()
case "float64":
network.double()
case _:
raise ValueError(f"Unknown dtype: {self.dtype}")
logging.info("Compiling the network")
network = torch.jit.script(network) # type: ignore[assignment]
logging.info("The checkpoints will be saved every %d steps", self.checkpoint_interval)
return model, network, data
"""
This file implements a cross MLP network.
"""
import itertools
import typing
import torch
from .bitspack import unpack_int
class FakeLinear(torch.nn.Module):
"""
A fake linear layer with zero input dimension to avoid PyTorch initialization warnings.
"""
def __init__(self, dim_in: int, dim_out: int) -> None:
super().__init__()
assert dim_in == 0
self.bias: torch.nn.Parameter = torch.nn.Parameter(torch.zeros([dim_out]))
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass for the fake linear layer.
"""
batch, _ = x.shape
return self.bias.view([1, -1]).expand([batch, -1])
def select_linear_layer(dim_in: int, dim_out: int) -> torch.nn.Module:
"""
Selects between a fake linear layer and a standard one to avoid initialization warnings when dim_in is zero.
"""
if dim_in == 0: # pylint: disable=no-else-return
return FakeLinear(dim_in, dim_out)
else:
return torch.nn.Linear(dim_in, dim_out)
class MLP(torch.nn.Module):
"""
This module implements a multi-layer MLP with the given dim_input, dim_output and hidden_size.
"""
def __init__(self, dim_input: int, dim_output: int, hidden_size: tuple[int, ...]) -> None:
super().__init__()
self.dim_input: int = dim_input
self.dim_output: int = dim_output
self.hidden_size: tuple[int, ...] = hidden_size
dimensions: list[int] = [dim_input] + list(hidden_size) + [dim_output]
linears: list[torch.nn.Module] = [select_linear_layer(i, j) for i, j in itertools.pairwise(dimensions)]
modules: list[torch.nn.Module] = [module for linear in linears for module in (linear, torch.nn.SiLU())][:-1]
self.model: torch.nn.Module = torch.nn.Sequential(*modules)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass for the MLP.
"""
return self.model(x)
class WaveFunction(torch.nn.Module):
"""
The wave function for the cross MLP network.
"""
# pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
*,
sites: int, # Number of qubits
physical_dim: int, # Dimension of the physical space, which is always 2 for this network
is_complex: bool, # Indicates whether the wave function is complex-valued, which is always false for this network
embedding_hidden_size: tuple[int, ...], # Hidden layer sizes for embedding part
embedding_size: int, # The dimension of the embedding
momentum_hidden_size: tuple[int, ...], # Hidden layer sizes for momentum part
momentum_count: int, # The number of max momentum order
tail_hidden_size: tuple[int, ...], # Hidden layer size for tail part
kind: typing.Literal[0, 1, 2], # Kind of the crossmlp forward function
ordering: int | list[int], # Ordering of sites: +1 for normal order, -1 for reversed order, or a custom order list
) -> None:
super().__init__()
self.sites: int = sites
assert physical_dim == 2
# This module is only used in reinforcement learning, which expects real values for the weights.
assert is_complex == False # pylint: disable=singleton-comparison
self.embedding_hidden_size: tuple[int, ...] = embedding_hidden_size
self.embedding_size: int = embedding_size
self.momentum_hidden_size: tuple[int, ...] = momentum_hidden_size
self.momentum_count: int = momentum_count
self.tail_hidden_size: tuple[int, ...] = tail_hidden_size
self.kind: typing.Literal[0, 1, 2] = kind
self.emb = MLP(self.sites, self.embedding_size, self.embedding_hidden_size)
self.momentum = torch.nn.ModuleList([MLP(self.embedding_size, self.embedding_size, momentum_hidden_size) for _ in range(self.momentum_count)])
self.tail = MLP(self.embedding_size, 1, tail_hidden_size)
# Site Ordering Configuration
# +1 for normal order, -1 for reversed order
if isinstance(ordering, int) and ordering == +1:
ordering = list(range(self.sites))
if isinstance(ordering, int) and ordering == -1:
ordering = list(reversed(range(self.sites)))
self.ordering: torch.Tensor
self.register_buffer("ordering", torch.tensor(ordering, dtype=torch.int64))
self.ordering_reversed: torch.Tensor
self.register_buffer("ordering_reversed", torch.scatter(torch.zeros(self.sites, dtype=torch.int64), 0, self.ordering, torch.arange(self.sites, dtype=torch.int64)))
# Dummy Parameter for Device and Dtype Retrieval
# This parameter is used to infer the device and dtype of the model.
self.dummy_param = torch.nn.Parameter(torch.empty(0))
@torch.jit.export
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Compute the wave function psi for the given configurations.
"""
dtype = self.dummy_param.dtype
# x: batch_size * sites
x = unpack_int(x, size=1, last_dim=self.sites)
# Apply ordering
x = torch.index_select(x, 1, self.ordering_reversed)
# Dtype conversion
x = x.to(dtype=dtype)
# emb: batch_size * embedding_size
emb = self.emb(x)
if self.kind == 0:
# x' = F(x - E[x]) + x
for layer in self.momentum:
new_emb = emb - emb.mean(dim=0, keepdim=True)
new_emb = layer(new_emb)
emb = emb + new_emb
emb = emb / emb.norm(p=2, dim=1, keepdim=True)
elif self.kind == 1:
# x' = F(x) - E[F(x)] + x
for layer in self.momentum:
new_emb = layer(emb)
new_emb = new_emb - new_emb.mean(dim=0, keepdim=True)
emb = emb + new_emb
emb = emb / emb.norm(p=2, dim=1, keepdim=True)
elif self.kind == 2:
# x' = (F(x) + x) - E[F(x) + x]
for layer in self.momentum:
new_emb = layer(emb)
new_emb = new_emb + emb
emb = new_emb - new_emb.mean(dim=0, keepdim=True)
emb = emb / emb.norm(p=2, dim=1, keepdim=True)
else:
raise ValueError(f"Invalid kind: {self.kind}")
tail = self.tail(emb).squeeze(-1)
return tail
@torch.jit.export
def generate_unique(self, batch_size: int, block_num: int = 1) -> tuple[torch.Tensor, torch.Tensor, None, None]:
"""
This module does not support generating unique configurations.
"""
# This module is only used in reinforcement learning, which does not require configurations sampling.
raise NotImplementedError("The generate_unique method is not implemented for this class.")
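The three `kind` variants in the forward pass differ only in where the batch mean is subtracted relative to the residual connection. A scalar sketch of one momentum step for each variant (illustrative; `layer` stands in for an MLP):

```python
def momentum_step(batch, layer, kind):
    # One update of the crossmlp momentum block on a batch of scalars.
    # kind 0: x' = F(x - E[x]) + x
    # kind 1: x' = F(x) - E[F(x)] + x
    # kind 2: x' = (F(x) + x) - E[F(x) + x]
    mean = lambda v: sum(v) / len(v)
    if kind == 0:
        f = [layer(x - mean(batch)) for x in batch]
        return [x + fx for x, fx in zip(batch, f)]
    if kind == 1:
        f = [layer(x) for x in batch]
        return [x + fx - mean(f) for x, fx in zip(batch, f)]
    if kind == 2:
        y = [layer(x) + x for x in batch]
        return [yx - mean(y) for yx in y]
    raise ValueError(f"Invalid kind: {kind}")

out2 = momentum_step([1.0, 3.0], layer=lambda x: 2 * x, kind=2)  # → [-3.0, 3.0]
```

Note that kind 2 centers the output itself, so the updated batch always has zero mean, whereas kinds 0 and 1 only center the correction.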
"""
This file provides an interface to work with FCIDUMP files.
"""
import os
import typing
import logging
import dataclasses
import re
import gzip
import pathlib
import hashlib
import torch
import yaml
import tyro
import openfermion
import platformdirs
from .mlp import WaveFunctionElectronUpDown as MlpWaveFunction
from .attention import WaveFunctionElectronUpDown as AttentionWaveFunction
from .crossmlp import WaveFunction as CrossMlpWaveFunction
from .hamiltonian import Hamiltonian
from .model_dict import model_dict, ModelProto, NetworkProto, NetworkConfigProto
QMB_MODEL_PATH = "QMB_MODEL_PATH"
@dataclasses.dataclass
class ModelConfig:
"""
The configuration of the model.
"""
# The openfermion model name
model_name: typing.Annotated[str, tyro.conf.Positional, tyro.conf.arg(metavar="MODEL")]
# The path of models folder
model_path: typing.Annotated[pathlib.Path | None, tyro.conf.arg(aliases=["-M"], help_behavior_hint=f"default: \"models\", can be overridden by `${QMB_MODEL_PATH}'")] = None
# The ref energy of the model, leave empty to read from FCIDUMP.yaml
ref_energy: typing.Annotated[float | None, tyro.conf.arg(aliases=["-r"])] = None
def __post_init__(self) -> None:
if self.model_path is not None:
self.model_path = pathlib.Path(self.model_path)
else:
if QMB_MODEL_PATH in os.environ:
self.model_path = pathlib.Path(os.environ[QMB_MODEL_PATH])
else:
self.model_path = pathlib.Path("models")
def _read_fcidump(file_name: pathlib.Path, *, cached: bool = False) -> tuple[tuple[int, int, int], dict[tuple[tuple[int, int], ...], complex]]:
# pylint: disable=too-many-locals
with gzip.open(file_name, "rt", encoding="utf-8") if file_name.name.endswith(".gz") else open(file_name, "rt", encoding="utf-8") as file:
n_orbit: int | None = None
n_electron: int | None = None
n_spin: int | None = None
for line in file:
data: str = line.lower()
if (match := re.search(r"norb\s*=\s*(\d+)", data)) is not None:
n_orbit = int(match.group(1))
if (match := re.search(r"nelec\s*=\s*(\d+)", data)) is not None:
n_electron = int(match.group(1))
if (match := re.search(r"ms2\s*=\s*(\d+)", data)) is not None:
n_spin = int(match.group(1))
if "&end" in data:
break
assert n_orbit is not None
assert n_electron is not None
assert n_spin is not None
if cached:
return (n_orbit, n_electron, n_spin), {}
energy_0: float = 0.0
energy_1: torch.Tensor = torch.zeros([n_orbit, n_orbit], dtype=torch.float64)
energy_2: torch.Tensor = torch.zeros([n_orbit, n_orbit, n_orbit, n_orbit], dtype=torch.float64)
for line in file:
pieces: list[str] = line.split()
coefficient: float = float(pieces[0])
sites: tuple[int, ...] = tuple(int(i) - 1 for i in pieces[1:])
match sites:
case (-1, -1, -1, -1):
energy_0 = coefficient
case (_, -1, -1, -1):
# Psi4 writes additional non-standard one-electron integrals in this format, which we omit.
pass
case (i, j, -1, -1):
energy_1[i, j] = coefficient
energy_1[j, i] = coefficient
case (_, _, _, -1):
# In the standard FCIDUMP format, there is no such term.
raise ValueError(f"Invalid FCIDUMP format: {sites}")
case (i, j, k, l):
energy_2[i, j, k, l] = coefficient
energy_2[i, j, l, k] = coefficient
energy_2[j, i, k, l] = coefficient
energy_2[j, i, l, k] = coefficient
energy_2[l, k, j, i] = coefficient
energy_2[k, l, j, i] = coefficient
energy_2[l, k, i, j] = coefficient
energy_2[k, l, i, j] = coefficient
case _:
raise ValueError(f"Invalid FCIDUMP format: {sites}")
energy_2 = energy_2.permute(0, 2, 3, 1).contiguous() / 2
energy_1_b: torch.Tensor = torch.zeros([n_orbit * 2, n_orbit * 2], dtype=torch.float64)
energy_2_b: torch.Tensor = torch.zeros([n_orbit * 2, n_orbit * 2, n_orbit * 2, n_orbit * 2], dtype=torch.float64)
energy_1_b[0::2, 0::2] = energy_1
energy_1_b[1::2, 1::2] = energy_1
energy_2_b[0::2, 0::2, 0::2, 0::2] = energy_2
energy_2_b[0::2, 1::2, 1::2, 0::2] = energy_2
energy_2_b[1::2, 0::2, 0::2, 1::2] = energy_2
energy_2_b[1::2, 1::2, 1::2, 1::2] = energy_2
interaction_operator: openfermion.InteractionOperator = openfermion.InteractionOperator(energy_0, energy_1_b.numpy(), energy_2_b.numpy()) # type: ignore[no-untyped-call]
fermion_operator: openfermion.FermionOperator = openfermion.get_fermion_operator(interaction_operator) # type: ignore[no-untyped-call]
return (n_orbit, n_electron, n_spin), {k: complex(v) for k, v in openfermion.normal_ordered(fermion_operator).terms.items()} # type: ignore[no-untyped-call]
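The header loop above pulls `norb`, `nelec`, and `ms2` out of the lower-cased FORTRAN namelist with three regexes and stops at `&end`. A minimal standalone sketch of that parsing, using a hypothetical two-orbital header (the header string is an illustration, not taken from any real file):

```python
import re

# Hypothetical FCIDUMP namelist header for a 2-orbital, 2-electron singlet.
header = "&FCI NORB= 2,NELEC=2,MS2=0,\n ORBSYM=1,1,\n ISYM=1,\n&END\n"

n_orbit = n_electron = n_spin = None
for line in header.splitlines():
    data = line.lower()
    if (match := re.search(r"norb\s*=\s*(\d+)", data)) is not None:
        n_orbit = int(match.group(1))
    if (match := re.search(r"nelec\s*=\s*(\d+)", data)) is not None:
        n_electron = int(match.group(1))
    if (match := re.search(r"ms2\s*=\s*(\d+)", data)) is not None:
        n_spin = int(match.group(1))
    if "&end" in data:
        break

print(n_orbit, n_electron, n_spin)  # 2 2 0
```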
class Model(ModelProto[ModelConfig]):
"""
This class handles the models from FCIDUMP files.
"""
network_dict: dict[str, type[NetworkConfigProto["Model"]]] = {}
config_t = ModelConfig
@classmethod
def default_group_name(cls, config: ModelConfig) -> str:
return config.model_name
def __init__(self, args: ModelConfig) -> None:
# pylint: disable=too-many-locals
logging.info("Input arguments successfully parsed")
logging.info("Model name: %s, Model path: %s", args.model_name, args.model_path)
model_name = args.model_name
model_path = args.model_path
ref_energy = args.ref_energy
assert model_path is not None
model_file_name = model_path / f"{model_name}.FCIDUMP.gz"
model_file_name = model_file_name if model_file_name.exists() else model_path / model_name
checksum = hashlib.sha256(model_file_name.read_bytes()).hexdigest() + "v5"
cache_file = platformdirs.user_cache_path("qmb", "kclab") / checksum
if cache_file.exists():
logging.info("Loading FCIDUMP metadata '%s' from file: %s", model_name, model_file_name)
(n_orbit, n_electron, n_spin), _ = _read_fcidump(model_file_name, cached=True)
logging.info("FCIDUMP metadata '%s' successfully loaded", model_name)
logging.info("Loading FCIDUMP Hamiltonian '%s' from cache", model_name)
openfermion_hamiltonian_data = torch.load(cache_file, map_location="cpu", weights_only=True)
logging.info("FCIDUMP Hamiltonian '%s' successfully loaded", model_name)
logging.info("Recovering internal Hamiltonian representation for model '%s'", model_name)
self.hamiltonian = Hamiltonian(openfermion_hamiltonian_data, kind="fermi")
logging.info("Internal Hamiltonian representation for model '%s' successfully recovered", model_name)
else:
logging.info("Loading FCIDUMP Hamiltonian '%s' from file: %s", model_name, model_file_name)
(n_orbit, n_electron, n_spin), openfermion_hamiltonian_dict = _read_fcidump(model_file_name)
logging.info("FCIDUMP Hamiltonian '%s' successfully loaded", model_name)
logging.info("Converting OpenFermion Hamiltonian to internal Hamiltonian representation")
self.hamiltonian = Hamiltonian(openfermion_hamiltonian_dict, kind="fermi")
logging.info("Internal Hamiltonian representation for model '%s' has been successfully created", model_name)
logging.info("Caching OpenFermion Hamiltonian for model '%s'", model_name)
cache_file.parent.mkdir(parents=True, exist_ok=True)
torch.save((self.hamiltonian.site, self.hamiltonian.kind, self.hamiltonian.coef), cache_file)
logging.info("OpenFermion Hamiltonian for model '%s' successfully cached", model_name)
self.n_qubit: int = n_orbit * 2
self.n_electron: int = n_electron
self.n_spin: int = n_spin
        logging.info("Identified %d qubits, %d electrons and spin %d for model '%s'", self.n_qubit, self.n_electron, self.n_spin, model_name)
self.ref_energy: float
if ref_energy is not None:
self.ref_energy = ref_energy
else:
fcidump_ref_energy_file = model_file_name.parent / "FCIDUMP.yaml"
if fcidump_ref_energy_file.exists():
with open(fcidump_ref_energy_file, "rt", encoding="utf-8") as file:
fcidump_ref_energy_data = yaml.safe_load(file)
self.ref_energy = fcidump_ref_energy_data.get(pathlib.Path(model_name).name, 0)
else:
self.ref_energy = 0
logging.info("Reference energy for model '%s' is %.10f", model_name, self.ref_energy)
def apply_within(self, configs_i: torch.Tensor, psi_i: torch.Tensor, configs_j: torch.Tensor) -> torch.Tensor:
return self.hamiltonian.apply_within(configs_i, psi_i, configs_j)
def find_relative(self, configs_i: torch.Tensor, psi_i: torch.Tensor, count_selected: int, configs_exclude: torch.Tensor | None = None) -> torch.Tensor:
return self.hamiltonian.find_relative(configs_i, psi_i, count_selected, configs_exclude)
def single_relative(self, configs: torch.Tensor) -> torch.Tensor:
return self.hamiltonian.single_relative(configs)
def show_config(self, config: torch.Tensor) -> str:
string = "".join(f"{i:08b}"[::-1] for i in config.cpu().numpy())
return "[" + "".join(self._show_config_site(string[index:index + 2]) for index in range(0, self.n_qubit, 2)) + "]"
def _show_config_site(self, string: str) -> str:
match string:
case "00":
return " "
case "10":
return "↑"
case "01":
return "↓"
case "11":
return "↕"
case _:
raise ValueError(f"Invalid string: {string}")
model_dict["fcidump"] = Model
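`show_config` unpacks each configuration byte least-significant-bit first (via `f"{i:08b}"[::-1]`) and renders consecutive bit pairs as spin arrows through `_show_config_site`. A self-contained sketch of that decoding with a hypothetical packed byte, without depending on torch:

```python
# Hypothetical packed configuration: one byte covers 8 qubits, least
# significant bit first, mirroring show_config's f"{i:08b}"[::-1] unpacking.
config_bytes = [0b00001101]

# Same pair-to-symbol mapping as _show_config_site.
site_symbol = {"00": " ", "10": "↑", "01": "↓", "11": "↕"}

bits = "".join(f"{byte:08b}"[::-1] for byte in config_bytes)
n_qubit = 4  # two spatial orbitals; spin-up and spin-down bits interleaved
rendered = "[" + "".join(site_symbol[bits[index:index + 2]] for index in range(0, n_qubit, 2)) + "]"
print(rendered)  # [↑↕]
```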
@dataclasses.dataclass
class MlpConfig:
"""
The configuration of the MLP network.
"""
# The hidden widths of the network
hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-w"])] = (512,)
def create(self, model: Model) -> NetworkProto:
"""
        Create an MLP network for the model.
"""
logging.info("Hidden layer widths: %a", self.hidden)
network = MlpWaveFunction(
double_sites=model.n_qubit,
physical_dim=2,
is_complex=True,
spin_up=(model.n_electron + model.n_spin) // 2,
spin_down=(model.n_electron - model.n_spin) // 2,
hidden_size=self.hidden,
ordering=+1,
)
return network
Model.network_dict["mlp"] = MlpConfig
@dataclasses.dataclass
class AttentionConfig:
"""
The configuration of the attention network.
"""
# Embedding dimension
embedding_dim: typing.Annotated[int, tyro.conf.arg(aliases=["-e"])] = 512
    # Number of attention heads
heads_num: typing.Annotated[int, tyro.conf.arg(aliases=["-m"])] = 8
# Feedforward dimension
feed_forward_dim: typing.Annotated[int, tyro.conf.arg(aliases=["-f"])] = 2048
    # Number of shared experts
shared_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-s"])] = 1
    # Number of routed experts
routed_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-r"])] = 0
    # Number of selected experts
selected_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-c"])] = 0
# Network depth
depth: typing.Annotated[int, tyro.conf.arg(aliases=["-d"])] = 6
def create(self, model: Model) -> NetworkProto:
"""
Create an attention network for the model.
"""
logging.info(
"Attention network configuration: "
"embedding dimension: %d, "
"number of heads: %d, "
"feed-forward dimension: %d, "
"shared expert number: %d, "
"routed expert number: %d, "
"selected expert number: %d, "
"depth: %d",
self.embedding_dim,
self.heads_num,
self.feed_forward_dim,
self.shared_expert_num,
self.routed_expert_num,
self.selected_expert_num,
self.depth,
)
network = AttentionWaveFunction(
double_sites=model.n_qubit,
physical_dim=2,
is_complex=True,
spin_up=(model.n_electron + model.n_spin) // 2,
spin_down=(model.n_electron - model.n_spin) // 2,
embedding_dim=self.embedding_dim,
heads_num=self.heads_num,
feed_forward_dim=self.feed_forward_dim,
shared_num=self.shared_expert_num,
routed_num=self.routed_expert_num,
selected_num=self.selected_expert_num,
depth=self.depth,
ordering=+1,
)
return network
Model.network_dict["attention"] = AttentionConfig
@dataclasses.dataclass
class CrossMlpConfig:
"""
The configuration of the cross MLP network.
"""
# The hidden widths of the embedding subnetwork
embedding_hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-w"])] = (64,)
# The dimension of the embedding
embedding_size: typing.Annotated[int, tyro.conf.arg(aliases=["-e"])] = 16
# The hidden widths of the momentum subnetwork
momentum_hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-m"])] = (64,)
    # The maximum momentum order
momentum_count: typing.Annotated[int, tyro.conf.arg(aliases=["-n"])] = 1
# The hidden widths of the tail part
tail_hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-t"])] = (64,)
# The kind of the crossmlp forward function
kind: typing.Annotated[typing.Literal[0, 1, 2], tyro.conf.arg(aliases=["-k"])] = 0
# The ordering of the sites
ordering: typing.Annotated[int | list[int], tyro.conf.arg(aliases=["-o"])] = +1
def create(self, model: Model) -> NetworkProto:
"""
Create a cross MLP network for the model.
"""
logging.info(
"Cross MLP network configuration: "
"embedding hidden widths: %a, "
"embedding size: %d, "
"momentum hidden widths: %a, "
"momentum count: %d, "
"tail hidden widths: %a, "
"kind: %d, "
"ordering: %s",
self.embedding_hidden,
self.embedding_size,
self.momentum_hidden,
self.momentum_count,
self.tail_hidden,
self.kind,
self.ordering,
)
network = CrossMlpWaveFunction(
sites=model.n_qubit,
physical_dim=2,
is_complex=False,
embedding_hidden_size=self.embedding_hidden,
embedding_size=self.embedding_size,
momentum_hidden_size=self.momentum_hidden,
momentum_count=self.momentum_count,
tail_hidden_size=self.tail_hidden,
kind=self.kind,
ordering=self.ordering,
)
return network
Model.network_dict["crossmlp"] = CrossMlpConfig
"""
This file implements a two-step optimization process for solving quantum many-body problems based on imaginary time.
"""
import copy
import logging
import typing
import dataclasses
import functools
import scipy
import torch
import torch.utils.tensorboard
import tyro
from . import losses
from .common import CommonConfig
from .subcommand_dict import subcommand_dict
from .model_dict import ModelProto
from .optimizer import initialize_optimizer, scale_learning_rate
@dataclasses.dataclass
class _DynamicLanczos:
"""
This class implements the dynamic Lanczos algorithm for solving quantum many-body problems.
"""
# pylint: disable=too-many-instance-attributes
model: ModelProto
configs: torch.Tensor
psi: torch.Tensor
step: int
threshold: float
count_extend: int
batch_count_apply_within: int
single_extend: bool
first_extend: bool
def _extend(self, psi: torch.Tensor, basic_configs: torch.Tensor | None = None) -> None:
if basic_configs is None:
basic_configs = self.configs
logging.info("Extending basis...")
count_core = len(self.configs)
logging.info("Number of core configurations: %d", count_core)
self.configs = torch.cat([self.configs, self.model.find_relative(basic_configs, psi, self.count_extend, self.configs)])
count_selected = len(self.configs)
self.psi = torch.nn.functional.pad(self.psi, (0, count_selected - count_core))
logging.info("Basis extended from %d to %d", count_core, count_selected)
def run(self) -> typing.Iterable[tuple[torch.Tensor, torch.Tensor, torch.Tensor]]:
"""
Run the Lanczos algorithm.
Yields
------
energy : torch.Tensor
The ground energy.
configs : torch.Tensor
The configurations.
psi : torch.Tensor
The wavefunction amplitude on the configurations.
"""
alpha: list[torch.Tensor]
beta: list[torch.Tensor]
v: list[torch.Tensor]
energy: torch.Tensor
psi: torch.Tensor
if self.count_extend == 0:
            # Do not extend the configurations; run the standard Lanczos.
for _, [alpha, beta, v] in zip(range(1 + self.step), self._run()):
energy, psi = self._eigh_tridiagonal(alpha, beta, v)
yield energy, self.configs, psi
elif self.first_extend:
            # Extend the configurations before all iterations.
psi = self.psi
for _ in range(self.step):
selected = (psi.conj() * psi).real.argsort(descending=True)[:self.count_extend]
configs = self.configs
self._extend(psi[selected], self.configs[selected])
psi = self.model.apply_within(configs, psi, self.configs) # pylint: disable=assignment-from-no-return
for _, [alpha, beta, v] in zip(range(1 + self.step), self._run()):
energy, psi = self._eigh_tridiagonal(alpha, beta, v)
yield energy, self.configs, psi
elif self.single_extend:
            # Extend the configurations only once after the whole iteration.
for _, [alpha, beta, v] in zip(range(1 + self.step), self._run()):
energy, psi = self._eigh_tridiagonal(alpha, beta, v)
yield energy, self.configs, psi
            # Extend based on all vectors in v.
v_sum = functools.reduce(torch.add, ((vi.conj() * vi).real.cpu() for vi in v)).sqrt().to(device=self.configs.device)
self._extend(v_sum)
for _, [alpha, beta, v] in zip(range(1 + self.step), self._run()):
energy, psi = self._eigh_tridiagonal(alpha, beta, v)
yield energy, self.configs, psi
else:
            # Extend the configurations during the dynamic Lanczos iterations.
for step in range(1 + self.step):
for _, [alpha, beta, v] in zip(range(1 + step), self._run()):
pass
energy, psi = self._eigh_tridiagonal(alpha, beta, v)
yield energy, self.configs, psi
if step != self.step:
self._extend(v[-1])
def _run(self) -> typing.Iterable[tuple[list[torch.Tensor], list[torch.Tensor], list[torch.Tensor]]]:
"""
        Run the standard Lanczos iteration.
Yields
------
alpha : list[torch.Tensor]
The alpha values.
beta : list[torch.Tensor]
The beta values.
v : list[torch.Tensor]
The v values.
"""
# In this function, we distribute data to the GPU and CPU.
# The details are as follows:
# All data other than v is always on the GPU.
# The last v is always on the GPU and the rest are moved to the CPU immediately after necessary calculations.
v: list[torch.Tensor] = [self.psi / torch.linalg.norm(self.psi)] # pylint: disable=not-callable
alpha: list[torch.Tensor] = []
beta: list[torch.Tensor] = []
w: torch.Tensor
w = self._apply_within(self.configs, v[-1], self.configs)
alpha.append((w.conj() @ v[-1]).real)
yield (alpha, beta, v)
w = w - alpha[-1] * v[-1]
while True:
norm_w = torch.linalg.norm(w) # pylint: disable=not-callable
if norm_w < self.threshold:
break
beta.append(norm_w)
v.append(w / beta[-1])
w = self._apply_within(self.configs, v[-1], self.configs)
alpha.append((w.conj() @ v[-1]).real)
yield (alpha, beta, v)
w = w - alpha[-1] * v[-1] - beta[-1] * v[-2]
            v[-2] = v[-2].cpu()  # v may be very large, so move older vectors to the CPU
def _apply_within(self, configs_i: torch.Tensor, psi_i: torch.Tensor, configs_j: torch.Tensor) -> torch.Tensor:
batch_size = len(configs_i)
local_batch_size = batch_size // self.batch_count_apply_within
remainder = batch_size % self.batch_count_apply_within
result: torch.Tensor | None = None
for i in range(self.batch_count_apply_within):
if i < remainder:
current_local_batch_size = local_batch_size + 1
else:
current_local_batch_size = local_batch_size
start_index = i * local_batch_size + min(i, remainder)
end_index = start_index + current_local_batch_size
local_result = self.model.apply_within( # pylint: disable=assignment-from-no-return
configs_i[start_index:end_index],
psi_i[start_index:end_index],
configs_j,
)
if result is None:
result = local_result
else:
result = result + local_result
assert result is not None
return result
def _eigh_tridiagonal(
self,
alpha: list[torch.Tensor],
beta: list[torch.Tensor],
v: list[torch.Tensor],
) -> tuple[torch.Tensor, torch.Tensor]:
if len(beta) == 0:
return alpha[0], v[0]
# Currently, PyTorch does not support eigh_tridiagonal natively, so we resort to using SciPy for this operation.
# We can only use 'stebz' or 'stemr' drivers in the current version of SciPy.
# However, 'stemr' consumes a lot of memory, so we opt for 'stebz' here.
# 'stebz' is efficient and only takes a few seconds even for large matrices with dimensions up to 10,000,000.
vals, vecs = scipy.linalg.eigh_tridiagonal(torch.stack(alpha, dim=0).cpu(), torch.stack(beta, dim=0).cpu(), lapack_driver="stebz", select="i", select_range=(0, 0))
energy = torch.as_tensor(vals[0])
result = functools.reduce(torch.add, (weight[0] * vector.to(device=self.configs.device) for weight, vector in zip(vecs, v)))
return energy, result
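The comment above explains the choice of the `stebz` LAPACK driver with an index-based selection. A small standalone sketch of the same SciPy call on a 3x3 tridiagonal matrix with known spectrum (the matrix here is an illustration, not data from the solver):

```python
import numpy as np
import scipy.linalg

# Tridiagonal matrix with diagonal alpha and off-diagonal beta:
# [[ 2, -1,  0],
#  [-1,  2, -1],
#  [ 0, -1,  2]]
alpha = np.array([2.0, 2.0, 2.0])
beta = np.array([-1.0, -1.0])

# select="i" with select_range=(0, 0) requests only the smallest eigenpair;
# the exact eigenvalues of this matrix are 2 - sqrt(2), 2, and 2 + sqrt(2).
vals, vecs = scipy.linalg.eigh_tridiagonal(alpha, beta, lapack_driver="stebz", select="i", select_range=(0, 0))
print(vals[0])  # ≈ 0.5857864376
```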
def _sampling_from_last_iteration(pool: tuple[torch.Tensor, torch.Tensor] | None, number: int) -> tuple[torch.Tensor, torch.Tensor] | tuple[None, None]:
"""
Sample configurations and wavefunction amplitudes from the last iteration.
Parameters
----------
pool : tuple[torch.Tensor, torch.Tensor] | None
The pool of configurations and wavefunction amplitudes from the last iteration.
number : int
The number of samples to draw.
Returns
-------
tuple[torch.Tensor, torch.Tensor] | tuple[None, None]
A tuple containing the sampled configurations and wavefunction amplitudes, or (None, None) if the pool is empty.
"""
if pool is None:
return None, None
configs, psi = pool
probabilities = (psi.conj() * psi).real
phi = probabilities.log()
g = phi - (-torch.rand_like(phi).log()).log()
_, indices = torch.topk(g, min(number, len(g)))
return configs[indices], psi[indices]
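The `g = phi - (-torch.rand_like(phi).log()).log()` line above is the Gumbel-top-k trick: adding independent Gumbel noise to the log-probabilities and taking the top k indices samples k configurations without replacement in proportion to their probabilities. A minimal standalone sketch with a hypothetical three-entry distribution:

```python
import torch

torch.manual_seed(0)  # fix the noise for reproducibility

probabilities = torch.tensor([0.7, 0.2, 0.1])
phi = probabilities.log()

# Gumbel noise: G = -log(-log(U)) with U ~ Uniform(0, 1). argmax(phi + G)
# follows softmax(phi); taking the top k generalizes this to sampling
# k distinct indices without replacement.
g = phi - (-torch.rand_like(phi).log()).log()
_, indices = torch.topk(g, 2)
print(indices)  # two distinct indices, biased toward the high-probability entries
```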
def _merge_pool_from_neural_network_and_pool_from_last_iteration(
configs_a: torch.Tensor,
psi_a: torch.Tensor,
configs_b: torch.Tensor | None,
psi_b: torch.Tensor | None,
) -> tuple[torch.Tensor, torch.Tensor]:
"""
Merge the pool of configurations and psi values from the neural network and the last iteration.
Parameters
----------
configs_a : torch.Tensor
Configurations from the neural network.
psi_a : torch.Tensor
Psi values from the neural network.
configs_b : torch.Tensor | None
Configurations from the last iteration.
psi_b : torch.Tensor | None
Psi values from the last iteration.
Returns
-------
tuple[torch.Tensor, torch.Tensor]
Merged configurations and psi values.
"""
if configs_b is None and psi_b is None:
return configs_a, psi_a
assert configs_b is not None
assert psi_b is not None
configs_both = torch.cat([configs_a, configs_b])
psi_both = torch.cat([psi_a, psi_b])
configs_result, indices = torch.unique(configs_both, return_inverse=True, dim=0)
# If the configurations are not unique, we prefer the psi from the last iteration.
psi_result = torch.zeros(len(configs_result), device=psi_both.device, dtype=psi_both.dtype).scatter_(0, indices, psi_both)
return configs_result, psi_result
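The merge above relies on `torch.unique(..., return_inverse=True, dim=0)` to deduplicate rows and on `scatter_` writing the concatenated psi values back, so for a duplicated configuration the later write (from the last iteration, appended second) survives. A small sketch with hypothetical two-qubit configurations; note that `scatter_` makes no ordering guarantee for duplicate indices on all backends, although CPU writes are sequential in practice:

```python
import torch

configs_a = torch.tensor([[0, 1], [1, 0]])  # from the neural network
psi_a = torch.tensor([1.0, 2.0])
configs_b = torch.tensor([[1, 0], [1, 1]])  # from the last iteration; [1, 0] is a duplicate
psi_b = torch.tensor([5.0, 6.0])

configs_both = torch.cat([configs_a, configs_b])
psi_both = torch.cat([psi_a, psi_b])
# unique(dim=0) sorts rows and returns, for each input row, its index in the output.
configs_result, indices = torch.unique(configs_both, return_inverse=True, dim=0)
# For the duplicate row [1, 0], the later psi_b value overwrites the psi_a one on CPU.
psi_result = torch.zeros(len(configs_result), dtype=psi_both.dtype).scatter_(0, indices, psi_both)
print(configs_result.tolist())  # [[0, 1], [1, 0], [1, 1]]
print(psi_result.tolist())
```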
@dataclasses.dataclass
class HaarConfig:
"""
The two-step optimization process for solving quantum many-body problems based on imaginary time.
"""
# pylint: disable=too-many-instance-attributes
common: typing.Annotated[CommonConfig, tyro.conf.OmitArgPrefixes]
# The sampling count from neural network
sampling_count_from_neural_network: typing.Annotated[int, tyro.conf.arg(aliases=["-n"])] = 1024
# The sampling count from last iteration
sampling_count_from_last_iteration: typing.Annotated[int, tyro.conf.arg(aliases=["-f"])] = 1024
# The extend count for the Krylov subspace
krylov_extend_count: typing.Annotated[int, tyro.conf.arg(aliases=["-c"], help_behavior_hint="default: 2048 if krylov_single_extend else 64")] = -1
# Whether to extend Krylov subspace before all iterations
krylov_extend_first: typing.Annotated[bool, tyro.conf.arg(aliases=["-z"])] = False
# Whether to extend only once for Krylov subspace
krylov_single_extend: typing.Annotated[bool, tyro.conf.arg(aliases=["-1"])] = False
# The number of Krylov iterations to perform
krylov_iteration: typing.Annotated[int, tyro.conf.arg(aliases=["-k"])] = 32
# The threshold for the Krylov iteration
krylov_threshold: typing.Annotated[float, tyro.conf.arg(aliases=["-d"])] = 1e-8
# The name of the loss function to use
loss_name: typing.Annotated[str, tyro.conf.arg(aliases=["-l"])] = "sum_filtered_angle_scaled_log"
# Whether to use the global optimizer
global_opt: typing.Annotated[bool, tyro.conf.arg(aliases=["-g"])] = False
# Whether to use LBFGS instead of Adam
use_lbfgs: typing.Annotated[bool, tyro.conf.arg(aliases=["-2"])] = False
# The learning rate for the local optimizer
learning_rate: typing.Annotated[float, tyro.conf.arg(aliases=["-r"], help_behavior_hint="(default: 1e-3 for Adam, 1 for LBFGS)")] = -1
# The number of steps for the local optimizer
local_step: typing.Annotated[int, tyro.conf.arg(aliases=["-s"], help_behavior_hint="(default: 10000 for Adam, 1000 for LBFGS)")] = -1
# The early break loss threshold for local optimization
local_loss: typing.Annotated[float, tyro.conf.arg(aliases=["-t"])] = 1e-8
# The number of psi values to log after local optimization
logging_psi: typing.Annotated[int, tyro.conf.arg(aliases=["-p"])] = 30
# The local batch count used to avoid memory overflow in generating configurations
local_batch_count_generation: typing.Annotated[int, tyro.conf.arg(aliases=["-m"])] = 1
# The local batch count used to avoid memory overflow in apply within
local_batch_count_apply_within: typing.Annotated[int, tyro.conf.arg(aliases=["-a"])] = 1
# The local batch count used to avoid memory overflow in loss function
local_batch_count_loss_function: typing.Annotated[int, tyro.conf.arg(aliases=["-b"])] = 1
def __post_init__(self) -> None:
if self.learning_rate == -1:
self.learning_rate = 1 if self.use_lbfgs else 1e-3
if self.local_step == -1:
self.local_step = 1000 if self.use_lbfgs else 10000
if self.krylov_extend_count == -1:
self.krylov_extend_count = 2048 if self.krylov_single_extend else 64
def main(self, *, model_param: typing.Any = None, network_param: typing.Any = None) -> None:
"""
        The main function of the two-step optimization process based on imaginary time.
"""
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
# pylint: disable=too-many-branches
model, network, data = self.common.main(model_param=model_param, network_param=network_param)
logging.info(
"Arguments Summary: "
            "Sampling Count From Neural Network: %d, "
            "Sampling Count From Last Iteration: %d, "
"Krylov Extend Count: %d, "
"Krylov Extend First: %s, "
"Krylov Single Extend: %s, "
            "Krylov Iteration: %d, "
            "Krylov Threshold: %.10f, "
"Loss Function: %s, "
"Global Optimizer: %s, "
"Use LBFGS: %s, "
"Learning Rate: %.10f, "
"Local Steps: %d, "
"Local Loss Threshold: %.10f, "
"Logging Psi: %d, "
"Local Batch Count For Generation: %d, "
"Local Batch Count For Apply Within: %d, "
"Local Batch Count For Loss Function: %d",
self.sampling_count_from_neural_network,
self.sampling_count_from_last_iteration,
self.krylov_extend_count,
"Yes" if self.krylov_extend_first else "No",
"Yes" if self.krylov_single_extend else "No",
self.krylov_iteration,
self.krylov_threshold,
self.loss_name,
"Yes" if self.global_opt else "No",
"Yes" if self.use_lbfgs else "No",
self.learning_rate,
self.local_step,
self.local_loss,
self.logging_psi,
self.local_batch_count_generation,
self.local_batch_count_apply_within,
self.local_batch_count_loss_function,
)
optimizer = initialize_optimizer(
network.parameters(),
use_lbfgs=self.use_lbfgs,
learning_rate=self.learning_rate,
state_dict=data.get("optimizer"),
)
if "haar" not in data and "imag" in data:
logging.warning("The 'imag' subcommand is deprecated, please use 'haar' instead.")
data["haar"] = data["imag"]
del data["imag"]
if "haar" not in data:
data["haar"] = {"global": 0, "local": 0, "lanczos": 0, "pool": None}
else:
pool_configs, pool_psi = data["haar"]["pool"]
data["haar"]["pool"] = (pool_configs.to(device=self.common.device), pool_psi.to(device=self.common.device))
writer = torch.utils.tensorboard.SummaryWriter(log_dir=self.common.folder()) # type: ignore[no-untyped-call]
while True:
logging.info("Starting a new optimization cycle")
logging.info("Sampling configurations from neural network")
configs_from_neural_network, psi_from_neural_network, _, _ = network.generate_unique(self.sampling_count_from_neural_network, self.local_batch_count_generation)
logging.info("Sampling configurations from last iteration")
configs_from_last_iteration, psi_from_last_iteration = _sampling_from_last_iteration(data["haar"]["pool"], self.sampling_count_from_last_iteration)
logging.info("Merging configurations from neural network and last iteration")
configs, original_psi = _merge_pool_from_neural_network_and_pool_from_last_iteration(
configs_from_neural_network,
psi_from_neural_network,
configs_from_last_iteration,
psi_from_last_iteration,
)
logging.info("Sampling completed, unique configurations count: %d", len(configs))
logging.info("Computing the target for local optimization")
target_energy: torch.Tensor
for target_energy, configs, original_psi in _DynamicLanczos(
model=model,
configs=configs,
psi=original_psi,
step=self.krylov_iteration,
threshold=self.krylov_threshold,
count_extend=self.krylov_extend_count,
batch_count_apply_within=self.local_batch_count_apply_within,
single_extend=self.krylov_single_extend,
first_extend=self.krylov_extend_first,
).run():
logging.info("The current energy is %.10f where the sampling count is %d", target_energy.item(), len(configs))
writer.add_scalar("haar/lanczos/energy", target_energy, data["haar"]["lanczos"]) # type: ignore[no-untyped-call]
writer.add_scalar("haar/lanczos/error", target_energy - model.ref_energy, data["haar"]["lanczos"]) # type: ignore[no-untyped-call]
data["haar"]["lanczos"] += 1
max_index = original_psi.abs().argmax()
target_psi = original_psi / original_psi[max_index]
logging.info("Local optimization target calculated, the target energy is %.10f, the sampling count is %d", target_energy.item(), len(configs))
loss_func: typing.Callable[[torch.Tensor, torch.Tensor], torch.Tensor] = getattr(losses, self.loss_name)
optimizer = initialize_optimizer(
network.parameters(),
use_lbfgs=self.use_lbfgs,
learning_rate=self.learning_rate,
new_opt=not self.global_opt,
optimizer=optimizer,
)
def closure() -> torch.Tensor:
optimizer.zero_grad()
total_size = len(configs)
batch_size = total_size // self.local_batch_count_loss_function
remainder = total_size % self.local_batch_count_loss_function
total_loss = 0.0
total_psi = []
for i in range(self.local_batch_count_loss_function):
if i < remainder:
current_batch_size = batch_size + 1
else:
current_batch_size = batch_size
start_index = i * batch_size + min(i, remainder)
end_index = start_index + current_batch_size
batch_indices = torch.arange(start_index, end_index, device=configs.device, dtype=torch.int64)
psi_batch = target_psi[batch_indices]
batch_indices = torch.cat((batch_indices, torch.tensor([max_index], device=configs.device, dtype=torch.int64)))
batch_configs = configs[batch_indices]
psi = network(batch_configs)
psi_max = psi[-1]
psi = psi[:-1]
psi = psi / psi_max
loss = loss_func(psi, psi_batch)
loss = loss * (current_batch_size / total_size)
loss.backward() # type: ignore[no-untyped-call]
total_loss += loss.item()
total_psi.append(psi.detach())
total_loss_tensor = torch.tensor(total_loss)
total_loss_tensor.psi = torch.cat(total_psi) # type: ignore[attr-defined]
return total_loss_tensor
loss: torch.Tensor
try_index = 0
while True:
state_backup = copy.deepcopy(network.state_dict())
optimizer_backup = copy.deepcopy(optimizer.state_dict())
logging.info("Starting local optimization process")
success = True
last_loss: float = 0.0
local_step: int = data["haar"]["local"]
scale_learning_rate(optimizer, 1 / (1 << try_index))
for i in range(self.local_step):
loss = optimizer.step(closure) # type: ignore[assignment,arg-type]
logging.info("Local optimization in progress, step %d, current loss: %.10f", i, loss.item())
writer.add_scalar(f"haar/loss/{self.loss_name}", loss, local_step) # type: ignore[no-untyped-call]
local_step += 1
if torch.isnan(loss) or torch.isinf(loss):
                        logging.warning("Loss is NaN or Inf, restoring the previous state and exiting the optimization loop")
success = False
break
if loss < self.local_loss:
logging.info("Local optimization halted as the loss threshold has been met")
break
if abs(loss - last_loss) < self.local_loss:
logging.info("Local optimization halted as the loss difference is too small")
break
last_loss = loss.item()
scale_learning_rate(optimizer, 1 << try_index)
if success:
if any(torch.isnan(param).any() or torch.isinf(param).any() for param in network.parameters()):
                        logging.warning("NaN or Inf detected in parameters, restoring the previous state and exiting the optimization loop")
success = False
if success:
logging.info("Local optimization process completed")
data["haar"]["local"] = local_step
break
network.load_state_dict(state_backup)
optimizer.load_state_dict(optimizer_backup)
try_index = try_index + 1
            logging.info("Local optimization stage completed")
loss = typing.cast(torch.Tensor, torch.enable_grad(closure)()) # type: ignore[no-untyped-call,call-arg]
psi: torch.Tensor = loss.psi # type: ignore[attr-defined]
final_energy = ((psi.conj() @ model.apply_within(configs, psi, configs)) / (psi.conj() @ psi)).real
logging.info(
"Loss during local optimization: %.10f, Final energy: %.10f, Target energy: %.10f, Reference energy: %.10f, Final error: %.10f",
loss.item(),
final_energy.item(),
target_energy.item(),
model.ref_energy,
final_energy.item() - model.ref_energy,
)
writer.add_scalar("haar/energy/state", final_energy, data["haar"]["global"]) # type: ignore[no-untyped-call]
writer.add_scalar("haar/energy/target", target_energy, data["haar"]["global"]) # type: ignore[no-untyped-call]
writer.add_scalar("haar/error/state", final_energy - model.ref_energy, data["haar"]["global"]) # type: ignore[no-untyped-call]
writer.add_scalar("haar/error/target", target_energy - model.ref_energy, data["haar"]["global"]) # type: ignore[no-untyped-call]
logging.info("Displaying the largest amplitudes")
indices = target_psi.abs().argsort(descending=True)
text = []
for index in indices[:self.logging_psi]:
this_config = model.show_config(configs[index])
logging.info("Configuration: %s, Target amplitude: %s, Final amplitude: %s", this_config, f"{target_psi[index].item():.8f}", f"{psi[index].item():.8f}")
text.append(f"Configuration: {this_config}, Target amplitude: {target_psi[index].item():.8f}, Final amplitude: {psi[index].item():.8f}")
writer.add_text("config", "\n".join(text), data["haar"]["global"]) # type: ignore[no-untyped-call]
writer.flush() # type: ignore[no-untyped-call]
logging.info("Saving model checkpoint")
data["haar"]["pool"] = (configs, original_psi)
data["haar"]["global"] += 1
data["network"] = network.state_dict()
data["optimizer"] = optimizer.state_dict()
self.common.save(data, data["haar"]["global"])
logging.info("Checkpoint successfully saved")
logging.info("Current optimization cycle completed")
subcommand_dict["haar"] = HaarConfig
class ImagConfig(HaarConfig):
"""
Deprecated, use "haar" instead.
"""
# pylint: disable=too-few-public-methods
def __post_init__(self) -> None:
logging.warning("The 'imag' subcommand is deprecated, please use 'haar' instead.")
super().__post_init__()
subcommand_dict["imag"] = ImagConfig
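The deprecated-alias pattern above (a subclass that warns in `__post_init__` and is registered under the old name) can be reduced to a minimal, self-contained sketch; the registry and config bodies here are stand-ins, not the package's real implementations:

```python
import dataclasses
import logging

# Hypothetical stand-in for the package's subcommand registry.
subcommand_dict: dict[str, type] = {}

@dataclasses.dataclass
class HaarConfig:
    """The current subcommand configuration (body elided)."""
    def __post_init__(self) -> None:
        pass

subcommand_dict["haar"] = HaarConfig

@dataclasses.dataclass
class ImagConfig(HaarConfig):
    """Deprecated alias: identical behavior, but warns on construction."""
    def __post_init__(self) -> None:
        logging.warning("The 'imag' subcommand is deprecated, please use 'haar' instead.")
        super().__post_init__()

subcommand_dict["imag"] = ImagConfig
```

Dispatching through the dict (`subcommand_dict[name]()`) keeps old command lines working while steering users to the new name.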
"""
This file contains the Hamiltonian class, which is used to store the Hamiltonian and process iteration over each term in the Hamiltonian for given configurations.
"""
import os
import typing
import platformdirs
import torch
import torch.utils.cpp_extension
class Hamiltonian:
"""
The Hamiltonian type, which stores the Hamiltonian and processes iteration over each term in the Hamiltonian for given configurations.
"""
_hamiltonian_module: dict[tuple[str, int, int], object] = {}
@classmethod
def _set_torch_cuda_arch_list(cls) -> None:
"""
Set the CUDA architecture list for PyTorch to use when compiling the PyTorch extensions.
"""
if not torch.cuda.is_available():
return
if "TORCH_CUDA_ARCH_LIST" in os.environ:
return
supported_sm = [int(arch[3:]) for arch in torch.cuda.get_arch_list() if arch.startswith("sm_")]
max_supported_sm = max((sm // 10, sm % 10) for sm in supported_sm)
arch_list = set()
for i in range(torch.cuda.device_count()):
capability = min(max_supported_sm, torch.cuda.get_device_capability(i))
arch = f'{capability[0]}.{capability[1]}'
arch_list.add(arch)
os.environ["TORCH_CUDA_ARCH_LIST"] = ";".join(sorted(arch_list))
@classmethod
def _load_module(cls, device_type: str = "declaration", n_qubytes: int = 0, particle_cut: int = 0) -> object:
cls._set_torch_cuda_arch_list()
if device_type != "declaration":
cls._load_module("declaration", n_qubytes, particle_cut) # Ensure the declaration module is loaded first
key = (device_type, n_qubytes, particle_cut)
is_prepare = key == ("declaration", 0, 0)
name = "qmb_hamiltonian" if is_prepare else f"qmb_hamiltonian_{n_qubytes}_{particle_cut}"
if key not in cls._hamiltonian_module:
build_directory = platformdirs.user_cache_path("qmb", "kclab") / name / device_type
build_directory.mkdir(parents=True, exist_ok=True)
folder = os.path.dirname(__file__)
match device_type:
case "declaration":
sources = [f"{folder}/_hamiltonian.cpp"]
case "cpu":
sources = [f"{folder}/_hamiltonian_cpu.cpp"]
case "cuda":
sources = [f"{folder}/_hamiltonian_cuda.cu"]
case _:
raise ValueError("Unsupported device type")
cls._hamiltonian_module[key] = torch.utils.cpp_extension.load(
name=name,
sources=sources,
is_python_module=is_prepare,
extra_cflags=["-O3", "-ffast-math", "-march=native", f"-DN_QUBYTES={n_qubytes}", f"-DPARTICLE_CUT={particle_cut}", "-std=c++20"],
extra_cuda_cflags=["-O3", "--use_fast_math", f"-DN_QUBYTES={n_qubytes}", f"-DPARTICLE_CUT={particle_cut}", "-std=c++20"],
build_directory=build_directory,
)
if is_prepare: # pylint: disable=no-else-return
return cls._hamiltonian_module[key]
else:
return getattr(torch.ops, name)
@classmethod
def _prepare(cls, hamiltonian: dict[tuple[tuple[int, int], ...], complex]) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
return getattr(cls._load_module(), "prepare")(hamiltonian)
def __init__(self, hamiltonian: dict[tuple[tuple[int, int], ...], complex] | tuple[torch.Tensor, torch.Tensor, torch.Tensor], *, kind: str) -> None:
self.site: torch.Tensor
self.kind: torch.Tensor
self.coef: torch.Tensor
if isinstance(hamiltonian, dict):
self.site, self.kind, self.coef = self._prepare(hamiltonian)
self._sort_site_kind_coef()
else:
self.site, self.kind, self.coef = hamiltonian
self.particle_cut: int
match kind:
case "fermi":
self.particle_cut = 1
case "bose2":
self.particle_cut = 2
case _:
raise ValueError(f"Unknown kind: {kind}")
def _sort_site_kind_coef(self) -> None:
order = self.coef.norm(dim=1).argsort(descending=True)
self.site = self.site[order]
self.kind = self.kind[order]
self.coef = self.coef[order]
def _prepare_data(self, device: torch.device) -> None:
self.site = self.site.to(device=device).contiguous()
self.kind = self.kind.to(device=device).contiguous()
self.coef = self.coef.to(device=device).contiguous()
def apply_within(
self,
configs_i: torch.Tensor,
psi_i: torch.Tensor,
configs_j: torch.Tensor,
) -> torch.Tensor:
"""
Applies the Hamiltonian to the given vector.
Parameters
----------
configs_i : torch.Tensor
A uint8 tensor of shape [batch_size_i, n_qubytes] representing the input configurations.
psi_i : torch.Tensor
A complex64 tensor of shape [batch_size_i] representing the input amplitudes on the given configurations.
configs_j : torch.Tensor
A uint8 tensor of shape [batch_size_j, n_qubytes] representing the output configurations.
Returns
-------
torch.Tensor
A tensor of shape [batch_size_j] representing the output amplitudes on the given configurations.
"""
self._prepare_data(configs_i.device)
_apply_within: typing.Callable[[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], torch.Tensor]
_apply_within = getattr(self._load_module(configs_i.device.type, configs_i.size(1), self.particle_cut), "apply_within")
psi_j = torch.view_as_complex(_apply_within(configs_i, torch.view_as_real(psi_i), configs_j, self.site, self.kind, self.coef))
return psi_j
def find_relative(
self,
configs_i: torch.Tensor,
psi_i: torch.Tensor,
count_selected: int,
configs_exclude: torch.Tensor | None = None,
) -> torch.Tensor:
"""
Find configurations related to the given configurations by the Hamiltonian.
Parameters
----------
configs_i : torch.Tensor
A uint8 tensor of shape [batch_size, n_qubytes] representing the input configurations.
psi_i : torch.Tensor
A complex64 tensor of shape [batch_size] representing the input amplitudes on the given configurations.
count_selected : int
The number of selected configurations to be returned.
configs_exclude : torch.Tensor, optional
A uint8 tensor of shape [batch_size_exclude, n_qubytes] representing the configurations to be excluded from the result, by default None
Returns
-------
torch.Tensor
The resulting configurations after applying the Hamiltonian; only the first `count_selected` configurations are guaranteed to be returned.
The returned configurations are guaranteed to be sorted by their estimated psi.
"""
if configs_exclude is None:
configs_exclude = configs_i
self._prepare_data(configs_i.device)
_find_relative: typing.Callable[[torch.Tensor, torch.Tensor, int, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], torch.Tensor]
_find_relative = getattr(self._load_module(configs_i.device.type, configs_i.size(1), self.particle_cut), "find_relative")
configs_j = _find_relative(configs_i, torch.view_as_real(psi_i), count_selected, self.site, self.kind, self.coef, configs_exclude)
return configs_j
def single_relative(self, configs: torch.Tensor) -> torch.Tensor:
"""
Find a single relative configuration for each input configuration.
Parameters
----------
configs : torch.Tensor
A uint8 tensor of shape [batch_size, n_qubytes] representing the input configurations.
Returns
-------
torch.Tensor
A uint8 tensor of shape [batch_size, n_qubytes] representing the resulting configurations.
"""
self._prepare_data(configs.device)
_single_relative: typing.Callable[[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], torch.Tensor]
_single_relative = getattr(self._load_module(configs.device.type, configs.size(1), self.particle_cut), "single_relative")
configs_result = _single_relative(configs, self.site, self.kind, self.coef)
return configs_result
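The `Hamiltonian` constructor accepts a plain dict encoding of the operator. Judging from the Hubbard model later in this package, each key is a tuple of `(site, kind)` pairs where kind `1` appears to denote a creation operator and `0` an annihilation operator, and each value is the term's coefficient. A minimal sketch of that encoding for a two-site system (the coefficients here are illustrative):

```python
# Hamiltonian-dict encoding sketch: key = tuple of (site, kind) operator
# pairs (kind 1 = creation, kind 0 = annihilation, as inferred from the
# Hubbard model in this package); value = complex coefficient.
t, u = 1.0, 4.0
hamiltonian: dict[tuple[tuple[int, int], ...], complex] = {
    # c†_0 c_1 and c†_1 c_0: hopping between sites 0 and 1
    ((0, 1), (1, 0)): -t,
    ((1, 1), (0, 0)): -t,
    # c†_0 c_0 c†_1 c_1 = n_0 n_1: density-density interaction
    ((0, 1), (0, 0), (1, 1), (1, 0)): u,
}
# A dict like this is what Hamiltonian(hamiltonian, kind="fermi") consumes.
```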
"""
This file offers an interface for defining Hubbard models on a two-dimensional lattice.
"""
import typing
import logging
import dataclasses
import torch
import tyro
from .mlp import WaveFunctionElectronUpDown as MlpWaveFunction
from .attention import WaveFunctionElectronUpDown as AttentionWaveFunction
from .hamiltonian import Hamiltonian
from .model_dict import model_dict, ModelProto, NetworkProto, NetworkConfigProto
@dataclasses.dataclass
class ModelConfig:
"""
The configuration for the Hubbard model.
"""
# The width of the Hubbard lattice
m: typing.Annotated[int, tyro.conf.Positional]
# The height of the Hubbard lattice
n: typing.Annotated[int, tyro.conf.Positional]
# The coefficient of t
t: typing.Annotated[float, tyro.conf.arg(aliases=["-t"])] = 1
# The coefficient of U
u: typing.Annotated[float, tyro.conf.arg(aliases=["-u"])] = 0
# The electron number, left empty for half-filling
electron_number: typing.Annotated[int | None, tyro.conf.arg(aliases=["-e"])] = None
# The reference energy of the model
ref_energy: typing.Annotated[float, tyro.conf.arg(aliases=["-r"])] = 0
def __post_init__(self) -> None:
if self.m <= 0 or self.n <= 0:
raise ValueError("The dimensions of the Hubbard model must be positive integers.")
if self.electron_number is None:
self.electron_number = self.m * self.n
if self.electron_number < 0 or self.electron_number > 2 * self.m * self.n:
raise ValueError(f"The electron number {self.electron_number} is out of bounds for a {self.m}x{self.n} lattice. Each site can host up to two electrons (spin up and spin down).")
class Model(ModelProto[ModelConfig]):
"""
This class handles the Hubbard model.
"""
network_dict: dict[str, type[NetworkConfigProto["Model"]]] = {}
config_t = ModelConfig
@classmethod
def default_group_name(cls, config: ModelConfig) -> str:
return f"Hubbard_{config.m}x{config.n}_t{config.t}_u{config.u}_e{config.electron_number}"
@classmethod
def _prepare_hamiltonian(cls, args: ModelConfig) -> dict[tuple[tuple[int, int], ...], complex]:
def _index(i: int, j: int, o: int) -> int:
return (i + j * args.m) * 2 + o
hamiltonian_dict: dict[tuple[tuple[int, int], ...], complex] = {}
for i in range(args.m):
for j in range(args.n):
# On-site interaction
hamiltonian_dict[(_index(i, j, 0), 1), (_index(i, j, 0), 0), (_index(i, j, 1), 1), (_index(i, j, 1), 0)] = args.u
# Nearest neighbor hopping
if i != 0:
hamiltonian_dict[(_index(i, j, 0), 1), (_index(i - 1, j, 0), 0)] = -args.t
hamiltonian_dict[(_index(i - 1, j, 0), 1), (_index(i, j, 0), 0)] = -args.t
hamiltonian_dict[(_index(i, j, 1), 1), (_index(i - 1, j, 1), 0)] = -args.t
hamiltonian_dict[(_index(i - 1, j, 1), 1), (_index(i, j, 1), 0)] = -args.t
if j != 0:
hamiltonian_dict[(_index(i, j, 0), 1), (_index(i, j - 1, 0), 0)] = -args.t
hamiltonian_dict[(_index(i, j - 1, 0), 1), (_index(i, j, 0), 0)] = -args.t
hamiltonian_dict[(_index(i, j, 1), 1), (_index(i, j - 1, 1), 0)] = -args.t
hamiltonian_dict[(_index(i, j - 1, 1), 1), (_index(i, j, 1), 0)] = -args.t
return hamiltonian_dict
def __init__(self, args: ModelConfig):
logging.info("Input arguments successfully parsed")
assert args.electron_number is not None
self.m: int = args.m
self.n: int = args.n
self.electron_number: int = args.electron_number
logging.info("Constructing Hubbard model with dimensions: width = %d, height = %d", self.m, self.n)
logging.info("The parameters of the model are: t = %.10f, U = %.10f, N = %d", args.t, args.u, args.electron_number)
logging.info("Initializing Hamiltonian for the lattice")
hamiltonian_dict: dict[tuple[tuple[int, int], ...], complex] = self._prepare_hamiltonian(args)
logging.info("Hamiltonian initialization complete")
self.ref_energy: float = args.ref_energy
logging.info("The ref energy is set to %.10f", self.ref_energy)
logging.info("Converting the Hamiltonian to internal Hamiltonian representation")
self.hamiltonian: Hamiltonian = Hamiltonian(hamiltonian_dict, kind="fermi")
logging.info("Internal Hamiltonian representation for model has been successfully created")
def apply_within(self, configs_i: torch.Tensor, psi_i: torch.Tensor, configs_j: torch.Tensor) -> torch.Tensor:
return self.hamiltonian.apply_within(configs_i, psi_i, configs_j)
def find_relative(self, configs_i: torch.Tensor, psi_i: torch.Tensor, count_selected: int, configs_exclude: torch.Tensor | None = None) -> torch.Tensor:
return self.hamiltonian.find_relative(configs_i, psi_i, count_selected, configs_exclude)
def single_relative(self, configs: torch.Tensor) -> torch.Tensor:
return self.hamiltonian.single_relative(configs)
def show_config(self, config: torch.Tensor) -> str:
string = "".join(f"{i:08b}"[::-1] for i in config.cpu().numpy())
return "[" + ".".join("".join(self._show_config_site(string[(i + j * self.m) * 2:(i + j * self.m) * 2 + 2]) for i in range(self.m)) for j in range(self.n)) + "]"
def _show_config_site(self, string: str) -> str:
match string:
case "00":
return " "
case "10":
return "↑"
case "01":
return "↓"
case "11":
return "↕"
case _:
raise ValueError(f"Invalid string: {string}")
model_dict["hubbard"] = Model
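The term construction in `Model._prepare_hamiltonian` can be mirrored in a compact, self-contained sketch that is handy for checking the expected term count on a small lattice. The index layout `(i + j * m) * 2 + spin` follows the source; `hubbard_terms` itself is a hypothetical helper, not part of the package API:

```python
# Pure-Python mirror of the Hubbard term construction: on-site U terms plus
# nearest-neighbor hopping for both spin species and both hopping directions.
def hubbard_terms(m: int, n: int, t: float, u: float) -> dict:
    def idx(i: int, j: int, o: int) -> int:
        # Site (i, j) with spin o, matching the source's _index layout.
        return (i + j * m) * 2 + o
    h: dict = {}
    for i in range(m):
        for j in range(n):
            # On-site interaction: U * n_up * n_down
            h[(idx(i, j, 0), 1), (idx(i, j, 0), 0), (idx(i, j, 1), 1), (idx(i, j, 1), 0)] = u
            for di, dj in ((1, 0), (0, 1)):  # horizontal and vertical bonds
                if i - di >= 0 and j - dj >= 0:
                    for o in (0, 1):  # both spin species
                        h[(idx(i, j, o), 1), (idx(i - di, j - dj, o), 0)] = -t
                        h[(idx(i - di, j - dj, o), 1), (idx(i, j, o), 0)] = -t
    return h
```

On a 2x2 lattice this yields 4 on-site terms plus 4 bonds x 2 spins x 2 directions = 16 hopping terms, 20 keys in total.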
@dataclasses.dataclass
class MlpConfig:
"""
The configuration of the MLP network.
"""
# The hidden widths of the network
hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-w"])] = (512,)
def create(self, model: Model) -> NetworkProto:
"""
Create an MLP network for the model.
"""
logging.info("Hidden layer widths: %a", self.hidden)
network = MlpWaveFunction(
double_sites=model.m * model.n * 2,
physical_dim=2,
is_complex=True,
spin_up=model.electron_number // 2,
spin_down=model.electron_number - model.electron_number // 2,
hidden_size=self.hidden,
ordering=+1,
)
return network
Model.network_dict["mlp"] = MlpConfig
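Both network factories above partition the total electron number into the two spin species as `spin_up = N // 2` and `spin_down = N - N // 2`, so any odd electron lands in the spin-down sector. A tiny sketch of that split (the helper name is illustrative):

```python
# How the wavefunction networks split electron_number into spin sectors:
# integer halves, with the odd electron (if any) assigned to spin down.
def spin_split(electron_number: int) -> tuple[int, int]:
    spin_up = electron_number // 2
    spin_down = electron_number - electron_number // 2
    return spin_up, spin_down
```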
@dataclasses.dataclass
class AttentionConfig:
"""
The configuration of the attention network.
"""
# Embedding dimension
embedding_dim: typing.Annotated[int, tyro.conf.arg(aliases=["-e"])] = 512
# Heads number
heads_num: typing.Annotated[int, tyro.conf.arg(aliases=["-m"])] = 8
# Feedforward dimension
feed_forward_dim: typing.Annotated[int, tyro.conf.arg(aliases=["-f"])] = 2048
# Shared expert number
shared_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-s"])] = 1
# Routed expert number
routed_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-r"])] = 0
# Selected expert number
selected_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-c"])] = 0
# Network depth
depth: typing.Annotated[int, tyro.conf.arg(aliases=["-d"])] = 6
def create(self, model: Model) -> NetworkProto:
"""
Create an attention network for the model.
"""
logging.info(
"Attention network configuration: "
"embedding dimension: %d, "
"number of heads: %d, "
"feed-forward dimension: %d, "
"shared expert number: %d, "
"routed expert number: %d, "
"selected expert number: %d, "
"depth: %d",
self.embedding_dim,
self.heads_num,
self.feed_forward_dim,
self.shared_expert_num,
self.routed_expert_num,
self.selected_expert_num,
self.depth,
)
network = AttentionWaveFunction(
double_sites=model.m * model.n * 2,
physical_dim=2,
is_complex=True,
spin_up=model.electron_number // 2,
spin_down=model.electron_number - model.electron_number // 2,
embedding_dim=self.embedding_dim,
heads_num=self.heads_num,
feed_forward_dim=self.feed_forward_dim,
shared_num=self.shared_expert_num,
routed_num=self.routed_expert_num,
selected_num=self.selected_expert_num,
depth=self.depth,
ordering=+1,
)
return network
Model.network_dict["attention"] = AttentionConfig
"""
This file offers an interface for defining Ising-like models on a two-dimensional lattice.
"""
import typing
import logging
import dataclasses
import collections
import torch
import tyro
from .mlp import WaveFunctionNormal as MlpWaveFunction
from .attention import WaveFunctionNormal as AttentionWaveFunction
from .hamiltonian import Hamiltonian
from .model_dict import model_dict, ModelProto, NetworkProto, NetworkConfigProto
@dataclasses.dataclass
class ModelConfig:
"""
The configuration for the Ising-like model.
"""
# pylint: disable=too-many-instance-attributes
# The width of the Ising lattice
m: typing.Annotated[int, tyro.conf.Positional]
# The height of the Ising lattice
n: typing.Annotated[int, tyro.conf.Positional]
# The coefficient of X
x: typing.Annotated[float, tyro.conf.arg(aliases=["-xe"])] = 0
# The coefficient of Y
y: typing.Annotated[float, tyro.conf.arg(aliases=["-ye"])] = 0
# The coefficient of Z
z: typing.Annotated[float, tyro.conf.arg(aliases=["-ze"])] = 0
# The coefficient of XX for horizontal bond
xh: typing.Annotated[float, tyro.conf.arg(aliases=["-xh"])] = 0
# The coefficient of YY for horizontal bond
yh: typing.Annotated[float, tyro.conf.arg(aliases=["-yh"])] = 0
# The coefficient of ZZ for horizontal bond
zh: typing.Annotated[float, tyro.conf.arg(aliases=["-zh"])] = 0
# The coefficient of XX for vertical bond
xv: typing.Annotated[float, tyro.conf.arg(aliases=["-xv"])] = 0
# The coefficient of YY for vertical bond
yv: typing.Annotated[float, tyro.conf.arg(aliases=["-yv"])] = 0
# The coefficient of ZZ for vertical bond
zv: typing.Annotated[float, tyro.conf.arg(aliases=["-zv"])] = 0
# The coefficient of XX for diagonal bond
xd: typing.Annotated[float, tyro.conf.arg(aliases=["-xd"])] = 0
# The coefficient of YY for diagonal bond
yd: typing.Annotated[float, tyro.conf.arg(aliases=["-yd"])] = 0
# The coefficient of ZZ for diagonal bond
zd: typing.Annotated[float, tyro.conf.arg(aliases=["-zd"])] = 0
# The coefficient of XX for antidiagonal bond
xa: typing.Annotated[float, tyro.conf.arg(aliases=["-xa"])] = 0
# The coefficient of YY for antidiagonal bond
ya: typing.Annotated[float, tyro.conf.arg(aliases=["-ya"])] = 0
# The coefficient of ZZ for antidiagonal bond
za: typing.Annotated[float, tyro.conf.arg(aliases=["-za"])] = 0
# The reference energy of the model
ref_energy: typing.Annotated[float, tyro.conf.arg(aliases=["-r"])] = 0
class Model(ModelProto[ModelConfig]):
"""
This class handles the Ising-like model.
"""
network_dict: dict[str, type[NetworkConfigProto["Model"]]] = {}
config_t = ModelConfig
@classmethod
def default_group_name(cls, config: ModelConfig) -> str:
# pylint: disable=too-many-locals
x = f"_x{config.x}" if config.x != 0 else ""
y = f"_y{config.y}" if config.y != 0 else ""
z = f"_z{config.z}" if config.z != 0 else ""
xh = f"_xh{config.xh}" if config.xh != 0 else ""
yh = f"_yh{config.yh}" if config.yh != 0 else ""
zh = f"_zh{config.zh}" if config.zh != 0 else ""
xv = f"_xv{config.xv}" if config.xv != 0 else ""
yv = f"_yv{config.yv}" if config.yv != 0 else ""
zv = f"_zv{config.zv}" if config.zv != 0 else ""
xd = f"_xd{config.xd}" if config.xd != 0 else ""
yd = f"_yd{config.yd}" if config.yd != 0 else ""
zd = f"_zd{config.zd}" if config.zd != 0 else ""
xa = f"_xa{config.xa}" if config.xa != 0 else ""
ya = f"_ya{config.ya}" if config.ya != 0 else ""
za = f"_za{config.za}" if config.za != 0 else ""
desc = x + y + z + xh + yh + zh + xv + yv + zv + xd + yd + zd + xa + ya + za
return f"Ising_{config.m}_{config.n}" + desc
@classmethod
def _prepare_hamiltonian(cls, args: ModelConfig) -> dict[tuple[tuple[int, int], ...], complex]:
# pylint: disable=too-many-branches
# pylint: disable=too-many-nested-blocks
def _index(i: int, j: int) -> int:
return i + j * args.m
def _x(i: int, j: int) -> tuple[tuple[tuple[tuple[int, int], ...], complex], ...]:
return (
(((_index(i, j), 1),), +1),
(((_index(i, j), 0),), +1),
)
def _y(i: int, j: int) -> tuple[tuple[tuple[tuple[int, int], ...], complex], ...]:
return (
(((_index(i, j), 1),), -1j),
(((_index(i, j), 0),), +1j),
)
def _z(i: int, j: int) -> tuple[tuple[tuple[tuple[int, int], ...], complex], ...]:
return (
(((_index(i, j), 1), (_index(i, j), 0)), +1),
(((_index(i, j), 0), (_index(i, j), 1)), -1),
)
hamiltonian: dict[tuple[tuple[int, int], ...], complex] = collections.defaultdict(complex)
# Express spin pauli matrix in hard core boson language.
for i in range(args.m):
for j in range(args.n):
k: tuple[tuple[int, int], ...]
k1: tuple[tuple[int, int], ...]
k2: tuple[tuple[int, int], ...]
v: complex
v1: complex
v2: complex
if True: # pylint: disable=using-constant-test
if args.x != 0:
for k, v in _x(i, j):
hamiltonian[k] += v * args.x
if args.y != 0:
for k, v in _y(i, j):
hamiltonian[k] += v * args.y
if args.z != 0:
for k, v in _z(i, j):
hamiltonian[k] += v * args.z
if i != 0:
if args.xh != 0:
for k1, v1 in _x(i, j):
for k2, v2 in _x(i - 1, j):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.xh
if args.yh != 0:
for k1, v1 in _y(i, j):
for k2, v2 in _y(i - 1, j):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.yh
if args.zh != 0:
for k1, v1 in _z(i, j):
for k2, v2 in _z(i - 1, j):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.zh
if j != 0:
if args.xv != 0:
for k1, v1 in _x(i, j):
for k2, v2 in _x(i, j - 1):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.xv
if args.yv != 0:
for k1, v1 in _y(i, j):
for k2, v2 in _y(i, j - 1):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.yv
if args.zv != 0:
for k1, v1 in _z(i, j):
for k2, v2 in _z(i, j - 1):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.zv
if i != 0 and j != 0:
if args.xd != 0:
for k1, v1 in _x(i, j):
for k2, v2 in _x(i - 1, j - 1):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.xd
if args.yd != 0:
for k1, v1 in _y(i, j):
for k2, v2 in _y(i - 1, j - 1):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.yd
if args.zd != 0:
for k1, v1 in _z(i, j):
for k2, v2 in _z(i - 1, j - 1):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.zd
if i != 0 and j != args.n - 1:
if args.xa != 0:
for k1, v1 in _x(i, j):
for k2, v2 in _x(i - 1, j + 1):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.xa
if args.ya != 0:
for k1, v1 in _y(i, j):
for k2, v2 in _y(i - 1, j + 1):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.ya
if args.za != 0:
for k1, v1 in _z(i, j):
for k2, v2 in _z(i - 1, j + 1):
hamiltonian[(*k1, *k2)] += v1 * v2 * args.za
return hamiltonian
def __init__(self, args: ModelConfig) -> None:
logging.info("Input arguments successfully parsed")
self.m: int = args.m
self.n: int = args.n
logging.info("Constructing Ising model with dimensions: width = %d, height = %d", self.m, self.n)
logging.info("Element-wise coefficients: X = %.10f, Y = %.10f, Z = %.10f", args.x, args.y, args.z)
logging.info("Horizontal bond coefficients: X = %.10f, Y = %.10f, Z = %.10f", args.xh, args.yh, args.zh)
logging.info("Vertical bond coefficients: X = %.10f, Y = %.10f, Z = %.10f", args.xv, args.yv, args.zv)
logging.info("Diagonal bond coefficients: X = %.10f, Y = %.10f, Z = %.10f", args.xd, args.yd, args.zd)
logging.info("Anti-diagonal bond coefficients: X = %.10f, Y = %.10f, Z = %.10f", args.xa, args.ya, args.za)
logging.info("Initializing Hamiltonian for the lattice")
hamiltonian_dict: dict[tuple[tuple[int, int], ...], complex] = self._prepare_hamiltonian(args)
logging.info("Hamiltonian initialization complete")
self.ref_energy: float = args.ref_energy
logging.info("The ref energy is set to %.10f", self.ref_energy)
logging.info("Converting the Hamiltonian to internal Hamiltonian representation")
self.hamiltonian: Hamiltonian = Hamiltonian(hamiltonian_dict, kind="bose2")
logging.info("Internal Hamiltonian representation for model has been successfully created")
def apply_within(self, configs_i: torch.Tensor, psi_i: torch.Tensor, configs_j: torch.Tensor) -> torch.Tensor:
return self.hamiltonian.apply_within(configs_i, psi_i, configs_j)
def find_relative(self, configs_i: torch.Tensor, psi_i: torch.Tensor, count_selected: int, configs_exclude: torch.Tensor | None = None) -> torch.Tensor:
return self.hamiltonian.find_relative(configs_i, psi_i, count_selected, configs_exclude)
def single_relative(self, configs: torch.Tensor) -> torch.Tensor:
return self.hamiltonian.single_relative(configs)
def show_config(self, config: torch.Tensor) -> str:
string = "".join(f"{i:08b}"[::-1] for i in config.cpu().numpy())
return "[" + ".".join("".join("↑" if string[i + j * self.m] == "0" else "↓" for i in range(self.m)) for j in range(self.n)) + "]"
model_dict["ising"] = Model
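`show_config` unpacks each uint8 of the configuration tensor into its 8 site occupations, least significant bit first (note the `[::-1]` reversal of the binary string). The decoding step in isolation, as a pure-Python sketch:

```python
# Mirror of the bit decoding in show_config: each byte becomes 8 characters,
# least significant bit first, so site k of byte b is bit (b >> k) & 1.
def decode_bits(config_bytes: list[int]) -> str:
    return "".join(f"{byte:08b}"[::-1] for byte in config_bytes)
```

For example, the byte `0b00000001` decodes to `"10000000"`: only the first site is occupied (spin down in the Ising convention above, where `"0"` renders as up).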
@dataclasses.dataclass
class MlpConfig:
"""
The configuration of the MLP network.
"""
# The hidden widths of the network
hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-w"])] = (512,)
def create(self, model: Model) -> NetworkProto:
"""
Create an MLP network for the model.
"""
logging.info("Hidden layer widths: %a", self.hidden)
network = MlpWaveFunction(
sites=model.m * model.n,
physical_dim=2,
is_complex=True,
hidden_size=self.hidden,
ordering=+1,
)
return network
Model.network_dict["mlp"] = MlpConfig
@dataclasses.dataclass
class AttentionConfig:
"""
The configuration of the attention network.
"""
# Embedding dimension
embedding_dim: typing.Annotated[int, tyro.conf.arg(aliases=["-e"])] = 512
# Heads number
heads_num: typing.Annotated[int, tyro.conf.arg(aliases=["-m"])] = 8
# Feedforward dimension
feed_forward_dim: typing.Annotated[int, tyro.conf.arg(aliases=["-f"])] = 2048
# Shared expert number
shared_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-s"])] = 1
# Routed expert number
routed_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-r"])] = 0
# Selected expert number
selected_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-c"])] = 0
# Network depth
depth: typing.Annotated[int, tyro.conf.arg(aliases=["-d"])] = 6
def create(self, model: Model) -> NetworkProto:
"""
Create an attention network for the model.
"""
logging.info(
"Attention network configuration: "
"embedding dimension: %d, "
"number of heads: %d, "
"feed-forward dimension: %d, "
"shared expert number: %d, "
"routed expert number: %d, "
"selected expert number: %d, "
"depth: %d",
self.embedding_dim,
self.heads_num,
self.feed_forward_dim,
self.shared_expert_num,
self.routed_expert_num,
self.selected_expert_num,
self.depth,
)
network = AttentionWaveFunction(
sites=model.m * model.n,
physical_dim=2,
is_complex=True,
embedding_dim=self.embedding_dim,
heads_num=self.heads_num,
feed_forward_dim=self.feed_forward_dim,
shared_num=self.shared_expert_num,
routed_num=self.routed_expert_num,
selected_num=self.selected_expert_num,
depth=self.depth,
ordering=+1,
)
return network
Model.network_dict["attention"] = AttentionConfig
"""
This file lists the loss functions used in the direct optimization of the wavefunction.
"""
import dataclasses
import inspect
import torch
from . import losses
from .subcommand_dict import subcommand_dict
@dataclasses.dataclass
class ListLossConfig:
"""
The list of loss functions used in the direct optimization of the wavefunction.
"""
def main(self) -> None:
"""
The main function for listing loss functions.
"""
for name, member in inspect.getmembers(losses):
if isinstance(member, torch.jit.ScriptFunction) and not name.startswith("_"):
print(name, member.__doc__)
subcommand_dict["list_loss"] = ListLossConfig
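The discovery in `ListLossConfig.main` is the standard `inspect.getmembers` pattern: enumerate a module's attributes and keep the public callables. A self-contained sketch using a stand-in module (the real code additionally restricts to `torch.jit.ScriptFunction` objects rather than plain functions):

```python
import inspect
import types

# Hypothetical stand-in for the losses module.
mod = types.ModuleType("fake_losses")
def hybrid(s, t): "hybrid loss"
def log(s, t): "log loss"
def _helper(x): "private helper, should be skipped"
mod.hybrid, mod.log, mod._helper = hybrid, log, _helper

# Collect public functions, skipping underscore-prefixed helpers;
# getmembers returns (name, value) pairs sorted by name.
public = [name for name, member in inspect.getmembers(mod)
          if inspect.isfunction(member) and not name.startswith("_")]
```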
"""
This file contains various loss functions used by the other scripts.
These functions quantify the difference between the target wave function and the current state wave function.
"""
import math
import torch
@torch.jit.script
def _scaled_abs(s_abs: torch.Tensor, min_magnitude: float) -> torch.Tensor:
s_large = torch.log(s_abs)
s_small = (s_abs - min_magnitude) / min_magnitude + math.log(min_magnitude)
return torch.where(s_abs > min_magnitude, s_large, s_small)
@torch.jit.script
def _scaled_angle(scale: torch.Tensor, min_magnitude: float) -> torch.Tensor:
return 1 / (1 + min_magnitude / scale)
@torch.jit.script
def hybrid(s: torch.Tensor, t: torch.Tensor, min_magnitude: float = 1e-12) -> torch.Tensor:
"""
Compute the loss using a hybrid strategy that specifically accounts for the small magnitudes of the wave function.
"""
# In typical data scales, taking the difference of the log of s and t as the loss is appropriate.
# However, issues arise when s or t is particularly small.
# The log function amplifies the loss near small values and diminishes the loss near large values.
# This is generally reasonable, as we expect the same level of effort for changes like 0.1 to 0.01 and 1 to 0.1.
# But when the absolute value is extremely small, such as wanting 1e-20 to become 1e-30, we have little motivation to optimize this.
# Therefore, we need to reduce the gradient of the mapping function (currently log) near small values.
# We decide to make it linear near small values because we do not want to optimize it further.
# Even if the number of Hamiltonian terms reaches 1e8, the accumulated error of 1e-12 terms would only be 1e-4, less than chemical precision.
# On the other hand, changes in the angle near small values are also meaningless.
# However, for sufficiently large values, we want them to be optimized uniformly.
# For example, we want the effort for changes from -1 to +1 to be similar to that from -0.1 to +0.1.
# Therefore, the angle difference should be multiplied by a factor that is constant at 1 near large values but linearly converges to 0 near small values.
s_abs = torch.sqrt(s.real**2 + s.imag**2)
t_abs = torch.sqrt(t.real**2 + t.imag**2)
s_angle = torch.atan2(s.imag, s.real)
t_angle = torch.atan2(t.imag, t.real)
s_magnitude = _scaled_abs(s_abs, min_magnitude)
t_magnitude = _scaled_abs(t_abs, min_magnitude)
error_real = (s_magnitude - t_magnitude) / (2 * torch.pi)
error_imag = (s_angle - t_angle) / (2 * torch.pi)
error_imag = error_imag - error_imag.round()
scale = torch.where(s_abs > t_abs, s_abs, t_abs)
error_imag = error_imag * _scaled_angle(scale, min_magnitude)
loss = error_real**2 + error_imag**2
return loss.mean()
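The magnitude map `_scaled_abs` described in the comments above is logarithmic above `min_magnitude` and linear (with slope `1/min_magnitude`, matching the log's derivative at the joint) below it, so the two branches glue continuously. A pure-Python mirror that makes the continuity easy to check:

```python
import math

# Pure-Python mirror of _scaled_abs: log above min_magnitude, linear below,
# continuous at s == min_magnitude where both branches give log(min_magnitude).
def scaled_abs(s: float, min_magnitude: float) -> float:
    if s > min_magnitude:
        return math.log(s)
    return (s - min_magnitude) / min_magnitude + math.log(min_magnitude)
```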
@torch.jit.script
def log(s: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
"""
Calculate the loss based on the difference between the log of the current state wave function and the target wave function.
"""
s_abs = torch.sqrt(s.real**2 + s.imag**2)
t_abs = torch.sqrt(t.real**2 + t.imag**2)
s_angle = torch.atan2(s.imag, s.real)
t_angle = torch.atan2(t.imag, t.real)
s_magnitude = torch.log(s_abs)
t_magnitude = torch.log(t_abs)
error_real = (s_magnitude - t_magnitude) / (2 * torch.pi)
error_imag = (s_angle - t_angle) / (2 * torch.pi)
error_imag = error_imag - error_imag.round()
loss = error_real**2 + error_imag**2
return loss.mean()
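Every loss in this file wraps the phase difference with `error_imag - error_imag.round()`: expressed in turns (fractions of 2*pi), the wrapped error always lies in [-0.5, 0.5], so phases just above +pi and just below -pi are treated as close. A scalar sketch of that wrapping:

```python
import math

# Phase-difference wrapping used by the losses: convert the angle difference
# to turns, then subtract the nearest integer so the result is in [-0.5, 0.5].
def wrapped_phase_error(s_angle: float, t_angle: float) -> float:
    d = (s_angle - t_angle) / (2 * math.pi)
    return d - round(d)
```

For instance, angles 3.0 and -3.0 differ by 6.0 radians, but the wrapped error corresponds to the short way around the circle, about -0.283 radians.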
@torch.jit.script
def sum_reweighted_log(s: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
"""
Calculate the loss based on the difference between the log of the current state wave function and the target wave function,
but reweighted by the sum of the abs of the current state wave function and the target wave function.
"""
s_abs = torch.sqrt(s.real**2 + s.imag**2)
t_abs = torch.sqrt(t.real**2 + t.imag**2)
s_angle = torch.atan2(s.imag, s.real)
t_angle = torch.atan2(t.imag, t.real)
s_magnitude = torch.log(s_abs)
t_magnitude = torch.log(t_abs)
error_real = (s_magnitude - t_magnitude) / (2 * torch.pi)
error_imag = (s_angle - t_angle) / (2 * torch.pi)
error_imag = error_imag - error_imag.round()
loss = error_real**2 + error_imag**2
loss = loss * (t_abs + s_abs)
return loss.mean()
@torch.jit.script
def sum_filtered_log(s: torch.Tensor, t: torch.Tensor, min_magnitude: float = 1e-10) -> torch.Tensor:
"""
Calculate the loss based on the difference between the log of the current state wave function and the target wave function,
but filtered by the sum of the abs of the current state wave function and the target wave function.
"""
s_abs = torch.sqrt(s.real**2 + s.imag**2)
t_abs = torch.sqrt(t.real**2 + t.imag**2)
s_angle = torch.atan2(s.imag, s.real)
t_angle = torch.atan2(t.imag, t.real)
s_magnitude = torch.log(s_abs)
t_magnitude = torch.log(t_abs)
error_real = (s_magnitude - t_magnitude) / (2 * torch.pi)
error_imag = (s_angle - t_angle) / (2 * torch.pi)
error_imag = error_imag - error_imag.round()
loss = error_real**2 + error_imag**2
loss = loss * _scaled_angle(t_abs + s_abs, min_magnitude)
return loss.mean()
@torch.jit.script
def sum_filtered_scaled_log(s: torch.Tensor, t: torch.Tensor, min_magnitude: float = 1e-10) -> torch.Tensor:
"""
Calculate the loss based on the difference between the scaled log of the current state wave function and the target wave function,
but filtered by the sum of the abs of the current state wave function and the target wave function.
"""
s_abs = torch.sqrt(s.real**2 + s.imag**2)
t_abs = torch.sqrt(t.real**2 + t.imag**2)
s_angle = torch.atan2(s.imag, s.real)
t_angle = torch.atan2(t.imag, t.real)
s_magnitude = _scaled_abs(s_abs, min_magnitude)
t_magnitude = _scaled_abs(t_abs, min_magnitude)
error_real = (s_magnitude - t_magnitude) / (2 * torch.pi)
error_imag = (s_angle - t_angle) / (2 * torch.pi)
error_imag = error_imag - error_imag.round()
loss = error_real**2 + error_imag**2
loss = loss * _scaled_angle(t_abs + s_abs, min_magnitude)
return loss.mean()
@torch.jit.script
def sum_reweighted_angle_log(s: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
"""
Calculate the loss based on the difference between the logs of the current state wave function and the target wave function,
with only the angle error reweighted by the sum of the absolute values of the two wave functions.
"""
s_abs = torch.sqrt(s.real**2 + s.imag**2)
t_abs = torch.sqrt(t.real**2 + t.imag**2)
s_angle = torch.atan2(s.imag, s.real)
t_angle = torch.atan2(t.imag, t.real)
s_magnitude = torch.log(s_abs)
t_magnitude = torch.log(t_abs)
error_real = (s_magnitude - t_magnitude) / (2 * torch.pi)
error_imag = (s_angle - t_angle) / (2 * torch.pi)
error_imag = error_imag - error_imag.round()
error_imag = error_imag * (t_abs + s_abs)
loss = error_real**2 + error_imag**2
return loss.mean()
@torch.jit.script
def sum_filtered_angle_log(s: torch.Tensor, t: torch.Tensor, min_magnitude: float = 1e-10) -> torch.Tensor:
"""
Calculate the loss based on the difference between the logs of the current state wave function and the target wave function,
with only the angle error filtered by the sum of the absolute values of the two wave functions.
"""
s_abs = torch.sqrt(s.real**2 + s.imag**2)
t_abs = torch.sqrt(t.real**2 + t.imag**2)
s_angle = torch.atan2(s.imag, s.real)
t_angle = torch.atan2(t.imag, t.real)
s_magnitude = torch.log(s_abs)
t_magnitude = torch.log(t_abs)
error_real = (s_magnitude - t_magnitude) / (2 * torch.pi)
error_imag = (s_angle - t_angle) / (2 * torch.pi)
error_imag = error_imag - error_imag.round()
error_imag = error_imag * _scaled_angle(t_abs + s_abs, min_magnitude)
loss = error_real**2 + error_imag**2
return loss.mean()
@torch.jit.script
def sum_filtered_angle_scaled_log(s: torch.Tensor, t: torch.Tensor, min_magnitude: float = 1e-10) -> torch.Tensor:
"""
Calculate the loss based on the difference between the scaled logs of the current state wave function and the target wave function,
with only the angle error filtered by the sum of the absolute values of the two wave functions.
"""
s_abs = torch.sqrt(s.real**2 + s.imag**2)
t_abs = torch.sqrt(t.real**2 + t.imag**2)
s_angle = torch.atan2(s.imag, s.real)
t_angle = torch.atan2(t.imag, t.real)
s_magnitude = _scaled_abs(s_abs, min_magnitude)
t_magnitude = _scaled_abs(t_abs, min_magnitude)
error_real = (s_magnitude - t_magnitude) / (2 * torch.pi)
error_imag = (s_angle - t_angle) / (2 * torch.pi)
error_imag = error_imag - error_imag.round()
error_imag = error_imag * _scaled_angle(t_abs + s_abs, min_magnitude)
loss = error_real**2 + error_imag**2
return loss.mean()
@torch.jit.script
def direct(s: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
"""
Calculate the loss based on the difference between the current state wave function and the target wave function directly.
"""
error = s - t
loss = error.real**2 + error.imag**2
return loss.mean()
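All of the phase-aware losses above rely on the same wrap-around trick: dividing the angle difference by 2π and subtracting the rounded value maps the error into half-open turns around zero, so phases that differ by a whole revolution incur no penalty. A minimal dependency-free sketch of that step (the function name is illustrative, not part of this module):

```python
import math

def wrapped_phase_error(s_angle: float, t_angle: float) -> float:
    # Divide by 2*pi to work in turns, then subtract the nearest integer
    # so any whole-revolution difference maps to zero error.
    error = (s_angle - t_angle) / (2 * math.pi)
    return error - round(error)
```

For example, an angle difference of 2π − 0.1 radians is penalized as if it were −0.1 radians. Note that Python's `round` and `torch.round` both round halves to even, so the two agree even at exactly half a turn.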
"""
This file implements the MLP network from https://arxiv.org/pdf/2109.12606 with the sampling method introduced in https://arxiv.org/pdf/2408.07625.
"""
import itertools
import torch
from .bitspack import pack_int, unpack_int
class FakeLinear(torch.nn.Module):
"""
A fake linear layer with zero input dimension to avoid PyTorch initialization warnings.
"""
def __init__(self, dim_in: int, dim_out: int) -> None:
super().__init__()
assert dim_in == 0
self.bias: torch.nn.Parameter = torch.nn.Parameter(torch.zeros([dim_out]))
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass for the fake linear layer.
"""
batch, _ = x.shape
return self.bias.view([1, -1]).expand([batch, -1])
def select_linear_layer(dim_in: int, dim_out: int) -> torch.nn.Module:
"""
Selects between a fake linear layer and a standard one to avoid initialization warnings when dim_in is zero.
"""
if dim_in == 0: # pylint: disable=no-else-return
return FakeLinear(dim_in, dim_out)
else:
return torch.nn.Linear(dim_in, dim_out)
class MLP(torch.nn.Module):
"""
This module implements a multi-layer MLP with the given dim_input, dim_output and hidden_size.
"""
def __init__(self, dim_input: int, dim_output: int, hidden_size: tuple[int, ...]) -> None:
super().__init__()
self.dim_input: int = dim_input
self.dim_output: int = dim_output
self.hidden_size: tuple[int, ...] = hidden_size
dimensions: list[int] = [dim_input] + list(hidden_size) + [dim_output]
linears: list[torch.nn.Module] = [select_linear_layer(i, j) for i, j in itertools.pairwise(dimensions)]
modules: list[torch.nn.Module] = [module for linear in linears for module in (linear, torch.nn.SiLU())][:-1]
self.model: torch.nn.Module = torch.nn.Sequential(*modules)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass for the MLP.
"""
return self.model(x)
class WaveFunctionElectronUpDown(torch.nn.Module):
"""
The wave function for the MLP network.
This module maintains the conservation of particle number of spin-up and spin-down electrons.
"""
# pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
*,
double_sites: int, # Number of qubits, where each pair of qubits represents a site in the MLP model
physical_dim: int, # Dimension of the physical space, which is always 2 for MLP
is_complex: bool, # Indicates whether the wave function is complex-valued, which is always true for MLP
spin_up: int, # Number of spin-up electrons
spin_down: int, # Number of spin-down electrons
hidden_size: tuple[int, ...], # Hidden layer sizes for the MLPs used in the amplitude and phase networks
ordering: int | list[int], # Ordering of sites: +1 for normal order, -1 for reversed order, or a custom order list
) -> None:
super().__init__()
assert double_sites % 2 == 0
self.double_sites: int = double_sites
self.sites: int = double_sites // 2
assert physical_dim == 2
assert is_complex == True # pylint: disable=singleton-comparison
self.spin_up: int = spin_up
self.spin_down: int = spin_down
self.hidden_size: tuple[int, ...] = hidden_size
# Amplitude and Phase Networks for Each Site
# The amplitude network accepts qubits from previous sites and outputs a vector of dimension 4,
# representing the configuration of the two qubits on the current site.
# The phase network accepts qubits from all sites and outputs the phase.
self.amplitude: torch.nn.ModuleList = torch.nn.ModuleList([MLP(i * 2, 4, self.hidden_size) for i in range(self.sites)])
self.phase: torch.nn.Module = MLP(self.double_sites, 1, self.hidden_size)
# Site Ordering Configuration
# +1 for normal order, -1 for reversed order
if isinstance(ordering, int) and ordering == +1:
ordering = list(range(self.sites))
if isinstance(ordering, int) and ordering == -1:
ordering = list(reversed(range(self.sites)))
self.ordering: torch.Tensor
self.register_buffer("ordering", torch.tensor(ordering, dtype=torch.int64))
self.ordering_reversed: torch.Tensor
self.register_buffer("ordering_reversed", torch.scatter(torch.zeros(self.sites, dtype=torch.int64), 0, self.ordering, torch.arange(self.sites, dtype=torch.int64)))
# Dummy Parameter for Device and Dtype Retrieval
# This parameter is used to infer the device and dtype of the model.
self.dummy_param = torch.nn.Parameter(torch.empty(0))
@torch.jit.export
def _mask(self, x: torch.Tensor) -> torch.Tensor:
"""
Determine whether a spin-up or spin-down electron can be appended to the incomplete configurations.
"""
# pylint: disable=too-many-locals
# x: batch_size * current_site * 2
# x represents the uncompleted configurations
current_site = x.shape[1]
# number: batch_size * 2
# number denotes the total electron count for each uncompleted configuration
number = x.sum(dim=1)
# up/down_electron/hole: batch_size
# These variables store the count of electrons and holes for each uncompleted configuration
up_electron = number[:, 0]
down_electron = number[:, 1]
up_hole = current_site - up_electron
down_hole = current_site - down_electron
# add_up/down_electron/hole: batch_size
# These variables determine whether it is possible to append an up/down electron/hole
add_up_electron = up_electron < self.spin_up
add_down_electron = down_electron < self.spin_down
add_up_hole = up_hole < self.sites - self.spin_up
add_down_hole = down_hole < self.sites - self.spin_down
# add_up: batch_size * 2 * 1
# add_down: batch_size * 1 * 2
# These tensors represent the feasibility of adding up/down electrons/holes
add_up = torch.stack([add_up_hole, add_up_electron], dim=-1).unsqueeze(-1)
add_down = torch.stack([add_down_hole, add_down_electron], dim=-1).unsqueeze(-2)
# add: batch_size * 2 * 2
# add represents the logical AND of add_up and add_down, indicating the feasibility of appending specific electron/hole combinations
# add[_, 0, 0] indicates the possibility of adding an up hole and a down hole
# add[_, 0, 1] indicates the possibility of adding an up hole and a down electron
# add[_, 1, 0] indicates the possibility of adding an up electron and a down hole
# add[_, 1, 1] indicates the possibility of adding an up electron and a down electron
add = torch.logical_and(add_up, add_down)
return add
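`_mask` encodes a simple counting argument: an electron of a given spin can be appended only while its count is below the target number, and a hole only while its count is below the number of sites minus the target. A plain-Python sketch of the same feasibility table (the function and argument names are illustrative):

```python
def append_feasibility(up_e: int, down_e: int, site: int, spin_up: int, spin_down: int, sites: int) -> list[list[bool]]:
    # Rows index the spin-up choice (hole, electron); columns index the spin-down choice.
    # An electron can still be added while its count is below the target,
    # a hole while its count is below sites minus the target.
    add_up = [site - up_e < sites - spin_up, up_e < spin_up]
    add_down = [site - down_e < sites - spin_down, down_e < spin_down]
    return [[u and d for d in add_down] for u in add_up]
```

For a two-site system with one electron of each spin, a configuration that already holds its spin-up electron after one site can only append a spin-up hole.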
@torch.jit.export
def _normalize_amplitude(self, x: torch.Tensor) -> torch.Tensor:
"""
Normalize the log amplitude of incomplete configurations.
"""
# x: batch_size * 2 * 2
# param: batch_size
param = (2 * x).exp().sum(dim=[-2, -1]).log() / 2
x = x - param.unsqueeze(-1).unsqueeze(-1)
# Now sqrt(sum(x.exp()^2)) = 1, i.e. the squared amplitudes are normalized
return x
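`_normalize_amplitude` works entirely in log space: subtracting 0.5 · log(Σ exp(2x)) makes the squared magnitudes exp(2x) sum to one. The same identity in plain Python (the function name is illustrative):

```python
import math

def normalize_log_amplitudes(xs: list[float]) -> list[float]:
    # Subtract half the log of the summed squared magnitudes,
    # so that sum(exp(2 * x)) == 1 afterwards.
    param = 0.5 * math.log(sum(math.exp(2 * x) for x in xs))
    return [x - param for x in xs]
```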
@torch.jit.export
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Compute the wave function psi for the given configurations.
"""
device: torch.device = self.dummy_param.device
dtype: torch.dtype = self.dummy_param.dtype
batch_size: int = x.shape[0]
# x: batch_size * sites * 2
x = unpack_int(x, size=1, last_dim=self.double_sites).view([batch_size, self.sites, 2])
# Apply ordering
x = torch.index_select(x, 1, self.ordering_reversed)
x_float: torch.Tensor = x.to(dtype=dtype)
arange: torch.Tensor = torch.arange(batch_size, device=device)
total_amplitude: torch.Tensor = torch.zeros([batch_size], device=device, dtype=dtype).double()
for i, amplitude_m in enumerate(self.amplitude):
# delta_amplitude: batch_size * 2 * 2
# delta_amplitude represents the amplitude changes for the configurations at the new site.
delta_amplitude: torch.Tensor = amplitude_m(x_float[:, :i].view([batch_size, 2 * i])).view([batch_size, 2, 2])
# Apply a filter mask to the amplitude to ensure the conservation of particle number.
delta_amplitude = delta_amplitude + torch.where(self._mask(x[:, :i]), 0, -torch.inf)
# normalized_delta_amplitude: batch_size * 2 * 2
# Normalize the delta amplitude.
normalized_delta_amplitude: torch.Tensor = self._normalize_amplitude(delta_amplitude)
# selected_delta_amplitude: batch
# Select the delta amplitude for the current site.
xi_int32: torch.Tensor = x[:, i, :].to(dtype=torch.int32)
selected_delta_amplitude: torch.Tensor = normalized_delta_amplitude[arange, xi_int32[:, 0], xi_int32[:, 1]]
total_amplitude = total_amplitude + selected_delta_amplitude.double()
total_phase: torch.Tensor = self.phase(x_float.view([batch_size, self.double_sites])).view([batch_size])
return torch.view_as_complex(torch.stack([total_amplitude, total_phase.double()], dim=-1)).exp()
@torch.jit.export
def generate_unique(self, batch_size: int, block_num: int = 1) -> tuple[torch.Tensor, torch.Tensor, None, None]:
"""
Generate configurations uniquely.
See https://arxiv.org/pdf/2408.07625.
"""
# pylint: disable=too-many-locals
# pylint: disable=invalid-name
# pylint: disable=too-many-statements
device: torch.device = self.dummy_param.device
dtype: torch.dtype = self.dummy_param.dtype
# x: local_batch_size * current_site * 2
x: torch.Tensor = torch.empty([1, 0, 2], device=device, dtype=torch.uint8)
# (un)perturbed_log_probability : local_batch_size
unperturbed_probability: torch.Tensor = torch.tensor([0], dtype=dtype, device=device)
perturbed_probability: torch.Tensor = torch.tensor([0], dtype=dtype, device=device)
for i, amplitude_m in enumerate(self.amplitude):
local_batch_size: int = x.shape[0]
x_float: torch.Tensor = x.to(dtype=dtype)
# delta_amplitude: batch * 2 * 2
# delta_amplitude represents the amplitude changes for the configurations at the new site.
local_batch_size_block = local_batch_size // block_num
remainder = local_batch_size % block_num
delta_amplitude_block_list: list[torch.Tensor] = []
for j in range(block_num):
if j < remainder:
current_local_batch_size_block = local_batch_size_block + 1
else:
current_local_batch_size_block = local_batch_size_block
start_index = j * local_batch_size_block + min(j, remainder)
end_index = start_index + current_local_batch_size_block
batch_indices_block = torch.arange(start_index, end_index, device=device, dtype=torch.int64)
delta_amplitude_block = amplitude_m(x_float.view([local_batch_size, 2 * i])[batch_indices_block]).view([current_local_batch_size_block, 2, 2])
delta_amplitude_block_list.append(delta_amplitude_block)
delta_amplitude: torch.Tensor = torch.cat(delta_amplitude_block_list)
# Apply a filter mask to the amplitude to ensure the conservation of particle number.
delta_amplitude = delta_amplitude + torch.where(self._mask(x), 0, -torch.inf)
# normalized_delta_amplitude: batch_size * 2 * 2
# Normalize the delta amplitude.
normalized_delta_amplitude: torch.Tensor = self._normalize_amplitude(delta_amplitude)
# The delta unperturbed prob for all batch and 4 adds
l: torch.Tensor = (2 * normalized_delta_amplitude).view([local_batch_size, 4])
# and add to get the current unperturbed prob
l = unperturbed_probability.view([local_batch_size, 1]) + l
# Get perturbed prob by adding GUMBEL(0)
L: torch.Tensor = l - (-torch.rand_like(l).log()).log()
# Get max perturbed prob
Z: torch.Tensor = L.max(dim=-1).values.view([local_batch_size, 1])
# Evaluate the conditioned prob
tildeL: torch.Tensor = -torch.log(torch.exp(-perturbed_probability.view([local_batch_size, 1])) - torch.exp(-Z) + torch.exp(-L))
# Calculate appended configurations for 4 adds
# local_batch_size * current_site * 2 + local_batch_size * 1 * 2
x0: torch.Tensor = torch.cat([x, torch.tensor([[0, 0]], device=device, dtype=torch.uint8).expand(local_batch_size, -1, -1)], dim=1)
x1: torch.Tensor = torch.cat([x, torch.tensor([[0, 1]], device=device, dtype=torch.uint8).expand(local_batch_size, -1, -1)], dim=1)
x2: torch.Tensor = torch.cat([x, torch.tensor([[1, 0]], device=device, dtype=torch.uint8).expand(local_batch_size, -1, -1)], dim=1)
x3: torch.Tensor = torch.cat([x, torch.tensor([[1, 1]], device=device, dtype=torch.uint8).expand(local_batch_size, -1, -1)], dim=1)
# Concatenate all configurations to get x : new_local_batch_size * (current_site+1) * 2
# (un)perturbed prob : new_local_batch_size
x = torch.cat([x0, x1, x2, x3])
unperturbed_probability = l.permute(1, 0).reshape([4 * local_batch_size])
perturbed_probability = tildeL.permute(1, 0).reshape([4 * local_batch_size])
# Filter results, only use largest batch_size ones
selected = perturbed_probability.argsort(descending=True)[:batch_size]
x = x[selected]
unperturbed_probability = unperturbed_probability[selected]
perturbed_probability = perturbed_probability[selected]
# If the probability is zero, filter it out explicitly
selected = perturbed_probability.isfinite()
x = x[selected]
unperturbed_probability = unperturbed_probability[selected]
perturbed_probability = perturbed_probability[selected]
# Apply ordering
x = torch.index_select(x, 1, self.ordering)
# Flatten site part of x
x = x.view([x.size(0), self.double_sites])
# This function should return configurations, amplitudes, probabilities and multiplicities,
# but since this is a unique generator, the last two fields are None.
x = pack_int(x, size=1)
# Calculate the amplitude for the generated configurations in the batch.
real_batch_size = len(x)
real_batch_size_block = real_batch_size // block_num
remainder = real_batch_size % block_num
amplitude_list = []
for j in range(block_num):
if j < remainder:
current_real_batch_size_block = real_batch_size_block + 1
else:
current_real_batch_size_block = real_batch_size_block
start_index = j * real_batch_size_block + min(j, remainder)
end_index = start_index + current_real_batch_size_block
batch_indices_block = torch.arange(start_index, end_index, device=device, dtype=torch.int64)
amplitude_block = self(x[batch_indices_block])
amplitude_list.append(amplitude_block)
amplitude: torch.Tensor = torch.cat(amplitude_list)
return x, amplitude, None, None
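`generate_unique` samples without replacement via the Gumbel-top-k trick from the referenced paper: add independent Gumbel(0) noise, i.e. −log(−log U), to each log-probability and keep the k largest perturbed values. A toy standalone sketch on a flat categorical distribution (names and signature are illustrative):

```python
import math
import random

def gumbel_top_k(log_probs: list[float], k: int, seed: int = 0) -> list[int]:
    # Perturb each log-probability with Gumbel(0) noise, -log(-log(U)),
    # then keep the indices of the k largest perturbed values.
    rng = random.Random(seed)
    perturbed = [lp - math.log(-math.log(rng.random())) for lp in log_probs]
    return sorted(range(len(log_probs)), key=lambda i: perturbed[i], reverse=True)[:k]
```

The returned indices are distinct by construction, which is exactly why the method sidesteps the duplicated samples of ordinary autoregressive sampling.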
class WaveFunctionNormal(torch.nn.Module):
"""
The wave function for the MLP network.
This module does not maintain any conservation.
"""
def __init__( # pylint: disable=too-many-arguments
self,
*,
sites: int, # Number of qubits
physical_dim: int, # Dimension of the physical space, which is always 2 for MLP
is_complex: bool, # Indicates whether the wave function is complex-valued, which is always true for MLP
hidden_size: tuple[int, ...], # Hidden layer sizes for the MLPs used in the amplitude and phase networks
ordering: int | list[int], # Ordering of sites: +1 for normal order, -1 for reversed order, or a custom order list
) -> None:
super().__init__()
self.sites: int = sites
assert physical_dim == 2
assert is_complex == True # pylint: disable=singleton-comparison
self.hidden_size: tuple[int, ...] = hidden_size
# Amplitude and Phase Networks for Each Site
# The amplitude network takes in qubits from previous sites and outputs a vector of dimension 2, representing the configuration of the qubit at the current site.
# The phase network accepts qubits from all sites and outputs the phase.
self.amplitude: torch.nn.ModuleList = torch.nn.ModuleList([MLP(i, 2, self.hidden_size) for i in range(self.sites)])
self.phase: torch.nn.Module = MLP(self.sites, 1, self.hidden_size)
# Site Ordering Configuration
# +1 for normal order, -1 for reversed order
if isinstance(ordering, int) and ordering == +1:
ordering = list(range(self.sites))
if isinstance(ordering, int) and ordering == -1:
ordering = list(reversed(range(self.sites)))
self.ordering: torch.Tensor
self.register_buffer("ordering", torch.tensor(ordering, dtype=torch.int64))
self.ordering_reversed: torch.Tensor
self.register_buffer("ordering_reversed", torch.scatter(torch.zeros(self.sites, dtype=torch.int64), 0, self.ordering, torch.arange(self.sites, dtype=torch.int64)))
# Dummy Parameter for Device and Dtype Retrieval
# This parameter is used to infer the device and dtype of the model.
self.dummy_param = torch.nn.Parameter(torch.empty(0))
@torch.jit.export
def _normalize_amplitude(self, x: torch.Tensor) -> torch.Tensor:
"""
Normalize the log amplitude of incomplete configurations.
"""
# x: batch_size * 2
# param: batch_size
param = (2 * x).exp().sum(dim=[-1]).log() / 2
x = x - param.unsqueeze(-1)
# Now sqrt(sum(x.exp()^2)) = 1, i.e. the squared amplitudes are normalized
return x
@torch.jit.export
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Compute the wave function psi for the given configurations.
"""
device: torch.device = self.dummy_param.device
dtype: torch.dtype = self.dummy_param.dtype
batch_size: int = x.shape[0]
# x: batch_size * sites
x = unpack_int(x, size=1, last_dim=self.sites).view([batch_size, self.sites])
# Apply ordering
x = torch.index_select(x, 1, self.ordering_reversed)
x_float: torch.Tensor = x.to(dtype=dtype)
arange: torch.Tensor = torch.arange(batch_size, device=device)
total_amplitude: torch.Tensor = torch.zeros([batch_size], device=device, dtype=dtype).double()
for i, amplitude_m in enumerate(self.amplitude):
# delta_amplitude : batch_size * 2
# delta_amplitude represents the amplitude changes for the configurations at the new site.
delta_amplitude: torch.Tensor = amplitude_m(x_float[:, :i].view([batch_size, i])).view([batch_size, 2])
# normalized_delta_amplitude: batch_size * 2
# Normalize the delta amplitude.
normalized_delta_amplitude: torch.Tensor = self._normalize_amplitude(delta_amplitude)
# selected_delta_amplitude: batch
# Select the delta amplitude for the current site.
xi_int32: torch.Tensor = x[:, i].to(dtype=torch.int32)
selected_delta_amplitude: torch.Tensor = normalized_delta_amplitude[arange, xi_int32]
total_amplitude = total_amplitude + selected_delta_amplitude.double()
total_phase: torch.Tensor = self.phase(x_float.view([batch_size, self.sites])).view([batch_size])
return torch.view_as_complex(torch.stack([total_amplitude, total_phase.double()], dim=-1)).exp()
@torch.jit.export
def generate_unique(self, batch_size: int, block_num: int = 1) -> tuple[torch.Tensor, torch.Tensor, None, None]:
"""
Generate configurations uniquely.
See https://arxiv.org/pdf/2408.07625.
"""
# pylint: disable=too-many-locals
# pylint: disable=invalid-name
# pylint: disable=too-many-statements
device: torch.device = self.dummy_param.device
dtype: torch.dtype = self.dummy_param.dtype
# x: local_batch_size * current_site
x: torch.Tensor = torch.empty([1, 0], device=device, dtype=torch.uint8)
# (un)perturbed_log_probability : local_batch_size
unperturbed_probability: torch.Tensor = torch.tensor([0], dtype=dtype, device=device)
perturbed_probability: torch.Tensor = torch.tensor([0], dtype=dtype, device=device)
for i, amplitude_m in enumerate(self.amplitude):
local_batch_size: int = x.shape[0]
x_float: torch.Tensor = x.to(dtype=dtype)
# delta_amplitude: batch * 2
# delta_amplitude represents the amplitude changes for the configurations at the new site.
local_batch_size_block = local_batch_size // block_num
remainder = local_batch_size % block_num
delta_amplitude_block_list: list[torch.Tensor] = []
for j in range(block_num):
if j < remainder:
current_local_batch_size_block = local_batch_size_block + 1
else:
current_local_batch_size_block = local_batch_size_block
start_index = j * local_batch_size_block + min(j, remainder)
end_index = start_index + current_local_batch_size_block
batch_indices_block = torch.arange(start_index, end_index, device=device, dtype=torch.int64)
delta_amplitude_block = amplitude_m(x_float.view([local_batch_size, i])[batch_indices_block]).view([current_local_batch_size_block, 2])
delta_amplitude_block_list.append(delta_amplitude_block)
delta_amplitude: torch.Tensor = torch.cat(delta_amplitude_block_list)
# normalized_delta_amplitude: batch_size * 2
# Normalize the delta amplitude.
normalized_delta_amplitude: torch.Tensor = self._normalize_amplitude(delta_amplitude)
# The delta unperturbed prob for all batch and 2 adds
l: torch.Tensor = (2 * normalized_delta_amplitude).view([local_batch_size, 2])
# and add to get the current unperturbed prob
l = unperturbed_probability.view([local_batch_size, 1]) + l
# Get perturbed prob by adding GUMBEL(0)
L: torch.Tensor = l - (-torch.rand_like(l).log()).log()
# Get max perturbed prob
Z: torch.Tensor = L.max(dim=-1).values.view([local_batch_size, 1])
# Evaluate the conditioned prob
tildeL: torch.Tensor = -torch.log(torch.exp(-perturbed_probability.view([local_batch_size, 1])) - torch.exp(-Z) + torch.exp(-L))
# Calculate appended configurations for 2 adds
# local_batch_size * current_site + local_batch_size * 1
x0: torch.Tensor = torch.cat([x, torch.tensor([0], device=device, dtype=torch.uint8).expand(local_batch_size, -1)], dim=1)
x1: torch.Tensor = torch.cat([x, torch.tensor([1], device=device, dtype=torch.uint8).expand(local_batch_size, -1)], dim=1)
# Concatenate all configurations to get x : new_local_batch_size * (current_site+1)
# (un)perturbed prob : new_local_batch_size
x = torch.cat([x0, x1])
unperturbed_probability = l.permute(1, 0).reshape([2 * local_batch_size])
perturbed_probability = tildeL.permute(1, 0).reshape([2 * local_batch_size])
# Filter results, only use largest batch_size ones
selected = perturbed_probability.argsort(descending=True)[:batch_size]
x = x[selected]
unperturbed_probability = unperturbed_probability[selected]
perturbed_probability = perturbed_probability[selected]
# If the probability is zero, filter it out explicitly
selected = perturbed_probability.isfinite()
x = x[selected]
unperturbed_probability = unperturbed_probability[selected]
perturbed_probability = perturbed_probability[selected]
# Apply ordering
x = torch.index_select(x, 1, self.ordering)
# Flatten site part of x
x = x.view([x.size(0), self.sites])
# This function should return configurations, amplitudes, probabilities and multiplicities,
# but since this is a unique generator, the last two fields are None.
x = pack_int(x, size=1)
# Calculate the amplitude for the generated configurations in the batch.
real_batch_size = len(x)
real_batch_size_block = real_batch_size // block_num
remainder = real_batch_size % block_num
amplitude_list = []
for j in range(block_num):
if j < remainder:
current_real_batch_size_block = real_batch_size_block + 1
else:
current_real_batch_size_block = real_batch_size_block
start_index = j * real_batch_size_block + min(j, remainder)
end_index = start_index + current_real_batch_size_block
batch_indices_block = torch.arange(start_index, end_index, device=device, dtype=torch.int64)
amplitude_block = self(x[batch_indices_block])
amplitude_list.append(amplitude_block)
amplitude: torch.Tensor = torch.cat(amplitude_list)
return x, amplitude, None, None
"""
This module is used to store a dictionary that maps model names to their corresponding models.
Other packages or subpackages can register their models by adding entries to this dictionary.
"""
import typing
import torch
class NetworkProto(typing.Protocol):
"""
The Network protocol defines the interface that all networks must implement.
"""
def __call__(self, x: torch.Tensor) -> torch.Tensor:
"""
Calculate the amplitude for the given configurations.
Parameters
----------
x : torch.Tensor
The configurations to calculate the amplitude for.
The configurations are a two-dimensional uint8 tensor whose first dimension equals the batch size.
The second dimension contains the occupation of each qubit, bitwise encoded.
Returns
-------
torch.Tensor
The amplitudes of the configurations.
The amplitudes are a one-dimensional complex tensor with the only dimension equal to the batch size.
"""
def generate_unique(self, batch_size: int, block_num: int = 1) -> tuple[torch.Tensor, torch.Tensor, None, None]:
"""
Generate a batch of unique configurations.
Parameters
----------
batch_size : int
The number of configurations to generate.
block_num : int, default=1
The number of blocks to split the batch into. Splitting reduces peak memory usage during generation.
Returns
-------
tuple[torch.Tensor, torch.Tensor, None, None]
A tuple containing the generated configurations, their amplitudes, and two None values.
The configurations are a two-dimensional uint8 tensor with first dimension equal to `batch_size`.
The second dimension contains the occupation of each qubit, bitwise encoded.
The amplitudes are a one-dimensional complex tensor with the only dimension equal to `batch_size`.
The last two None values are reserved for future use.
"""
def load_state_dict(self, data: dict[str, torch.Tensor]) -> typing.Any:
"""torch.nn.Module function"""
def state_dict(self) -> dict[str, torch.Tensor]:
"""torch.nn.Module function"""
def to(self, device: torch.device) -> typing.Any:
"""torch.nn.Module function"""
def parameters(self) -> typing.Iterable[torch.Tensor]:
"""torch.nn.Module function"""
def bfloat16(self) -> typing.Self:
"""torch.nn.Module function"""
def half(self) -> typing.Self:
"""torch.nn.Module function"""
def float(self) -> typing.Self:
"""torch.nn.Module function"""
def double(self) -> typing.Self:
"""torch.nn.Module function"""
Model_contra = typing.TypeVar("Model_contra", contravariant=True)
class NetworkConfigProto(typing.Protocol[Model_contra]):
"""
The NetworkConfigProto protocol defines the interface that all network configs must implement.
"""
# pylint: disable=too-few-public-methods
def create(self, model: Model_contra) -> NetworkProto:
"""
Create the network from the given config for the given model.
"""
ModelConfig = typing.TypeVar("ModelConfig")
class ModelProto(typing.Protocol[ModelConfig]):
"""
The Model protocol defines the interface that all models must implement.
"""
network_dict: dict[str, type[NetworkConfigProto[typing.Self]]]
ref_energy: float
config_t: type[ModelConfig]
@classmethod
def default_group_name(cls, config: ModelConfig) -> str:
"""
Get the default group name for logging purposes.
Parameters
----------
config : ModelConfig
The config of model.
Returns
-------
str
The group name for logging purposes.
"""
def __init__(self, config: ModelConfig) -> None:
"""
Create a model from the given config.
Parameters
----------
config : ModelConfig
The config of model.
"""
def apply_within(self, configs_i: torch.Tensor, psi_i: torch.Tensor, configs_j: torch.Tensor) -> torch.Tensor:
"""
Applies the Hamiltonian to the given vector.
Parameters
----------
configs_i : torch.Tensor
The configurations to apply the Hamiltonian to.
psi_i : torch.Tensor
The amplitudes of the configurations.
configs_j : torch.Tensor
The configurations subspace for the result of the Hamiltonian application.
Returns
-------
torch.Tensor
The result of the Hamiltonian application on the selected configurations subspace.
"""
def find_relative(self, configs_i: torch.Tensor, psi_i: torch.Tensor, count_selected: int, configs_exclude: torch.Tensor | None = None) -> torch.Tensor:
"""
Find relative configurations to the given configurations.
Parameters
----------
configs_i : torch.Tensor
The configurations to find relative configurations for.
psi_i : torch.Tensor
The amplitudes of the configurations.
count_selected : int
The number of relative configurations to find.
configs_exclude : torch.Tensor, optional
The configurations to exclude from the result.
Returns
-------
torch.Tensor
The relative configurations.
"""
def single_relative(self, configs: torch.Tensor) -> torch.Tensor:
"""
Find a single relative configuration for each configuration.
Parameters
----------
configs : torch.Tensor
The configurations to find the single relative configuration for.
Returns
-------
torch.Tensor
The relative configuration.
"""
def show_config(self, config: torch.Tensor) -> str:
"""
Converts a configuration tensor to a string representation.
Parameters
----------
config : torch.Tensor
The configuration tensor to convert.
Returns
-------
str
The string representation of the configuration tensor.
"""
model_dict: dict[str, typing.Type[ModelProto]] = {}
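The registry pattern used here is deliberately plain: subpackages import the shared dictionary and insert their class under a string key, while the protocol only constrains what a registered class must look like. A self-contained miniature of the pattern (all names here are illustrative, not part of this package):

```python
import typing

class GreeterProto(typing.Protocol):
    # The interface every registered class must satisfy (structural typing).
    def greet(self) -> str: ...

# The shared registry other modules add entries to.
greeter_dict: dict[str, type] = {}

class English:
    def greet(self) -> str:
        return "hello"

# Registration is a single dictionary assignment, mirroring model_dict.
greeter_dict["en"] = English
```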
"""
This file provides an interface to work with openfermion models.
"""
import os
import typing
import logging
import dataclasses
import pathlib
import torch
import tyro
import openfermion
from .mlp import WaveFunctionElectronUpDown as MlpWaveFunction
from .attention import WaveFunctionElectronUpDown as AttentionWaveFunction
from .crossmlp import WaveFunction as CrossMlpWaveFunction
from .hamiltonian import Hamiltonian
from .model_dict import model_dict, ModelProto, NetworkProto, NetworkConfigProto
QMB_MODEL_PATH = "QMB_MODEL_PATH"
@dataclasses.dataclass
class ModelConfig:
"""
The configuration of the model.
"""
# The openfermion model name
model_name: typing.Annotated[str, tyro.conf.Positional, tyro.conf.arg(metavar="MODEL")]
# The path of models folder
model_path: typing.Annotated[pathlib.Path | None, tyro.conf.arg(aliases=["-M"], help_behavior_hint=f"default: \"models\", can be overridden by `${QMB_MODEL_PATH}'")] = None
def __post_init__(self) -> None:
if self.model_path is not None:
self.model_path = pathlib.Path(self.model_path)
else:
if QMB_MODEL_PATH in os.environ:
self.model_path = pathlib.Path(os.environ[QMB_MODEL_PATH])
else:
self.model_path = pathlib.Path("models")
class Model(ModelProto[ModelConfig]):
"""
This class handles the openfermion model.
"""
network_dict: dict[str, type[NetworkConfigProto["Model"]]] = {}
config_t = ModelConfig
@classmethod
def default_group_name(cls, config: ModelConfig) -> str:
return config.model_name
def __init__(self, args: ModelConfig) -> None:
logging.info("Input arguments successfully parsed")
logging.info("Model name: %s, Model path: %s", args.model_name, args.model_path)
model_name = args.model_name
model_path = args.model_path
assert model_path is not None
model_file_name = model_path / f"{model_name}.hdf5"
logging.info("Loading OpenFermion model '%s' from file: %s", model_name, model_file_name)
openfermion_model: openfermion.MolecularData = openfermion.MolecularData(filename=str(model_file_name)) # type: ignore[no-untyped-call]
logging.info("OpenFermion model '%s' successfully loaded", model_name)
self.n_qubits: int = int(openfermion_model.n_qubits) # type: ignore[arg-type]
self.n_electrons: int = int(openfermion_model.n_electrons) # type: ignore[arg-type]
logging.info("Identified %d qubits and %d electrons for model '%s'", self.n_qubits, self.n_electrons, model_name)
self.ref_energy: float = float(openfermion_model.fci_energy) # type: ignore[arg-type]
logging.info("Reference energy for model '%s' is %.10f", model_name, self.ref_energy)
logging.info("Converting OpenFermion Hamiltonian to internal Hamiltonian representation")
self.hamiltonian: Hamiltonian = Hamiltonian(
openfermion.transforms.get_fermion_operator(openfermion_model.get_molecular_hamiltonian()).terms, # type: ignore[no-untyped-call]
kind="fermi",
)
logging.info("Internal Hamiltonian representation for model '%s' has been successfully created", model_name)
def apply_within(self, configs_i: torch.Tensor, psi_i: torch.Tensor, configs_j: torch.Tensor) -> torch.Tensor:
return self.hamiltonian.apply_within(configs_i, psi_i, configs_j)
def find_relative(self, configs_i: torch.Tensor, psi_i: torch.Tensor, count_selected: int, configs_exclude: torch.Tensor | None = None) -> torch.Tensor:
return self.hamiltonian.find_relative(configs_i, psi_i, count_selected, configs_exclude)
def single_relative(self, configs: torch.Tensor) -> torch.Tensor:
return self.hamiltonian.single_relative(configs)
def show_config(self, config: torch.Tensor) -> str:
string = "".join(f"{i:08b}"[::-1] for i in config.cpu().numpy())
return "[" + "".join(self._show_config_site(string[index:index + 2]) for index in range(0, self.n_qubits, 2)) + "]"
def _show_config_site(self, string: str) -> str:
match string:
case "00":
return " "
case "10":
return "↑"
case "01":
return "↓"
case "11":
return "↕"
case _:
raise ValueError(f"Invalid string: {string}")
model_dict["openfermion"] = Model
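`show_config` above expands each packed byte least-significant-bit first (the `[::-1]` reversal) and renders consecutive (up, down) orbital pairs as arrows. A standalone sketch of that decoding, assuming the same bit order and two qubits per site (`render_config` is an illustrative helper, not qmb code):

```python
SITE_SYMBOLS = {"00": " ", "10": "↑", "01": "↓", "11": "↕"}


def render_config(packed: list[int], n_qubits: int) -> str:
    """Render packed configuration bytes the way Model.show_config does:
    each byte is expanded LSB-first, then every pair of bits (up, down)
    becomes one of " ", "↑", "↓", "↕"."""
    bits = "".join(f"{byte:08b}"[::-1] for byte in packed)
    sites = (SITE_SYMBOLS[bits[i:i + 2]] for i in range(0, n_qubits, 2))
    return "[" + "".join(sites) + "]"


# 13 = 0b00001101, LSB-first "10110000": an up electron, a doubly occupied site, two empty sites.
print(render_config([13], 8))  # → [↑↕  ]
```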
@dataclasses.dataclass
class MlpConfig:
"""
The configuration of the MLP network.
"""
# The hidden widths of the network
hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-w"])] = (512,)
def create(self, model: Model) -> NetworkProto:
"""
Create an MLP network for the model.
"""
logging.info("Hidden layer widths: %a", self.hidden)
network = MlpWaveFunction(
double_sites=model.n_qubits,
physical_dim=2,
is_complex=True,
spin_up=model.n_electrons // 2,
spin_down=model.n_electrons // 2,
hidden_size=self.hidden,
ordering=+1,
)
return network
Model.network_dict["mlp"] = MlpConfig
@dataclasses.dataclass
class AttentionConfig:
"""
The configuration of the attention network.
"""
# Embedding dimension
embedding_dim: typing.Annotated[int, tyro.conf.arg(aliases=["-e"])] = 512
# Heads number
heads_num: typing.Annotated[int, tyro.conf.arg(aliases=["-m"])] = 8
# Feedforward dimension
feed_forward_dim: typing.Annotated[int, tyro.conf.arg(aliases=["-f"])] = 2048
# Shared expert number
shared_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-s"])] = 1
# Routed expert number
routed_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-r"])] = 0
# Selected expert number
selected_expert_num: typing.Annotated[int, tyro.conf.arg(aliases=["-c"])] = 0
# Network depth
depth: typing.Annotated[int, tyro.conf.arg(aliases=["-d"])] = 6
def create(self, model: Model) -> NetworkProto:
"""
Create an attention network for the model.
"""
logging.info(
"Attention network configuration: "
"embedding dimension: %d, "
"number of heads: %d, "
"feed-forward dimension: %d, "
"shared expert number: %d, "
"routed expert number: %d, "
"selected expert number: %d, "
"depth: %d",
self.embedding_dim,
self.heads_num,
self.feed_forward_dim,
self.shared_expert_num,
self.routed_expert_num,
self.selected_expert_num,
self.depth,
)
network = AttentionWaveFunction(
double_sites=model.n_qubits,
physical_dim=2,
is_complex=True,
spin_up=model.n_electrons // 2,
spin_down=model.n_electrons // 2,
embedding_dim=self.embedding_dim,
heads_num=self.heads_num,
feed_forward_dim=self.feed_forward_dim,
shared_num=self.shared_expert_num,
routed_num=self.routed_expert_num,
selected_num=self.selected_expert_num,
depth=self.depth,
ordering=+1,
)
return network
Model.network_dict["attention"] = AttentionConfig
@dataclasses.dataclass
class CrossMlpConfig:
"""
The configuration of the cross MLP network.
"""
# The hidden widths of the embedding subnetwork
embedding_hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-w"])] = (64,)
# The dimension of the embedding
embedding_size: typing.Annotated[int, tyro.conf.arg(aliases=["-e"])] = 16
# The hidden widths of the momentum subnetwork
momentum_hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-m"])] = (64,)
# The maximum momentum order
momentum_count: typing.Annotated[int, tyro.conf.arg(aliases=["-n"])] = 1
# The hidden widths of the tail part
tail_hidden: typing.Annotated[tuple[int, ...], tyro.conf.arg(aliases=["-t"])] = (64,)
# The kind of the crossmlp forward function
kind: typing.Annotated[typing.Literal[0, 1, 2], tyro.conf.arg(aliases=["-k"])] = 0
# The ordering of the sites
ordering: typing.Annotated[int | list[int], tyro.conf.arg(aliases=["-o"])] = +1
def create(self, model: Model) -> NetworkProto:
"""
Create a cross MLP network for the model.
"""
logging.info(
"Cross MLP network configuration: "
"embedding hidden widths: %a, "
"embedding size: %d, "
"momentum hidden widths: %a, "
"momentum count: %d, "
"tail hidden widths: %a, "
"kind: %d, "
"ordering: %s",
self.embedding_hidden,
self.embedding_size,
self.momentum_hidden,
self.momentum_count,
self.tail_hidden,
self.kind,
self.ordering,
)
network = CrossMlpWaveFunction(
sites=model.n_qubits,
physical_dim=2,
is_complex=False,
embedding_hidden_size=self.embedding_hidden,
embedding_size=self.embedding_size,
momentum_hidden_size=self.momentum_hidden,
momentum_count=self.momentum_count,
tail_hidden_size=self.tail_hidden,
kind=self.kind,
ordering=self.ordering,
)
return network
Model.network_dict["crossmlp"] = CrossMlpConfig
"""
This module provides tools for PyTorch optimizers.
"""
import typing
import torch
def _migrate_tensor(tensor: torch.Tensor, device: torch.device) -> None:
"""
Migrates the tensor to the specified device.
"""
tensor.data = tensor.data.to(device=device)
if tensor.grad is not None:
tensor.grad.data = tensor.grad.data.to(device=device)
def _migrate_param(param: object, device: torch.device) -> None:
"""
Migrates the parameter to the specified device.
"""
if isinstance(param, torch.Tensor):
_migrate_tensor(param, device)
elif isinstance(param, list):
for subparam in param:
_migrate_param(subparam, device)
elif isinstance(param, dict):
for subparam in param.values():
_migrate_param(subparam, device)
elif isinstance(param, int | float | complex):
pass
else:
raise ValueError(f"Unexpected parameter type: {type(param)}")
def _migrate_optimizer(optimizer: torch.optim.Optimizer) -> None:
"""
Migrates the optimizer to the device of the model parameters.
"""
device: torch.device = optimizer.param_groups[0]["params"][0].device
_migrate_param(optimizer.state, device)
def initialize_optimizer( # pylint: disable=too-many-arguments
params: typing.Iterable[torch.Tensor],
*,
use_lbfgs: bool,
learning_rate: float,
new_opt: bool = True,
optimizer: torch.optim.Optimizer | None = None,
state_dict: typing.Any = None,
) -> torch.optim.Optimizer:
"""
Initialize an optimizer.
Parameters
----------
params : typing.Iterable[torch.Tensor]
The parameters to be optimized.
use_lbfgs : bool
Whether to use L-BFGS as the optimizer.
learning_rate : float
The initial learning rate.
new_opt : bool, default=True
Whether to create a new optimizer or use the given optimizer, by default True.
optimizer : torch.optim.Optimizer, optional
The optimizer to be used if new_opt is False, by default None.
state_dict : typing.Any, optional
The state dictionary of the optimizer to be loaded, by default None.
Returns
-------
torch.optim.Optimizer
The optimizer.
"""
if new_opt or optimizer is None:
if use_lbfgs:
optimizer = torch.optim.LBFGS(params, lr=learning_rate)
else:
optimizer = torch.optim.Adam(params, lr=learning_rate)
if state_dict is not None:
optimizer.load_state_dict(state_dict)
_migrate_optimizer(optimizer)
return optimizer
def scale_learning_rate(optimizer: torch.optim.Optimizer, scale: float) -> None:
"""
Scales the learning rate of all parameter groups in the optimizer by a given factor.
Parameters
----------
optimizer : torch.optim.Optimizer
The optimizer whose learning rate will be scaled.
scale : float
The factor by which the learning rate will be scaled.
"""
for param in optimizer.param_groups:
param["lr"] *= scale
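`scale_learning_rate` only touches the `lr` entry of each parameter group, so the logic can be exercised without a real optimizer. A sketch against a stand-in `param_groups` list (the stub below mimics the structure torch optimizers expose; it is not the torch API itself):

```python
def scale_lr(param_groups: list[dict], scale: float) -> None:
    """Multiply the learning rate of every parameter group in place,
    exactly as qmb's scale_learning_rate does on a torch optimizer."""
    for group in param_groups:
        group["lr"] *= scale


# Two groups with different base rates, as torch.optim.Optimizer.param_groups would hold.
groups = [{"lr": 0.1, "params": []}, {"lr": 0.01, "params": []}]
scale_lr(groups, 0.5)
print(groups[0]["lr"], groups[1]["lr"])  # → 0.05 0.005
```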
"""
This file precompiles the essential extensions needed to run a specific model.
"""
import typing
import dataclasses
import torch
import tyro
from .model_dict import model_dict, ModelProto, NetworkProto
from .subcommand_dict import subcommand_dict
@dataclasses.dataclass
class PrecompileConfig:
"""
The precompilation for solving quantum many-body problems.
"""
# The model name
model_name: typing.Annotated[str, tyro.conf.Positional, tyro.conf.arg(metavar="MODEL")]
# Arguments for physical model
physics_args: typing.Annotated[tuple[str, ...], tyro.conf.arg(aliases=["-P"]), tyro.conf.UseAppendAction] = ()
# The device to run on
device: typing.Annotated[torch.device, tyro.conf.arg(aliases=["-D"])] = torch.device(type="cuda", index=0)
def main(self) -> None:
"""
The main function for precompilation.
"""
model_t = model_dict[self.model_name]
model_config_t = model_t.config_t
network_config_t = model_t.network_dict["mlp"]
model: ModelProto = model_t(tyro.cli(model_config_t, args=self.physics_args))
network: NetworkProto = network_config_t().create(model).to(device=self.device)
configs_i, psi_i, _, _ = network.generate_unique(1)
model.apply_within(configs_i, psi_i, configs_i)
model.find_relative(configs_i, psi_i, 1)
subcommand_dict["precompile"] = PrecompileConfig
"""
Random engine utilities.
"""
import torch
def dump_random_engine_state(device: torch.device) -> torch.Tensor:
"""
Dump the random engine state for the specified device.
"""
match device.type:
case "cpu":
return torch.get_rng_state()
case "cuda":
return torch.cuda.get_rng_state(device)
case _:
raise ValueError(f"Unknown device: {device}")
def load_random_engine_state(state: torch.Tensor, device: torch.device) -> None:
"""
Load the random engine state for the specified device.
"""
match device.type:
case "cpu":
torch.set_rng_state(state)
case "cuda":
torch.cuda.set_rng_state(state, device)
case _:
raise ValueError(f"Unknown device: {device}")
"""
This file implements the reinforcement learning based subspace diagonalization algorithm.
"""
import logging
import typing
import dataclasses
import functools
import tyro
import scipy
import torch
import torch.utils.tensorboard
from .common import CommonConfig
from .subcommand_dict import subcommand_dict
from .model_dict import ModelProto
from .optimizer import initialize_optimizer
from .bitspack import pack_int
def lanczos_energy(model: ModelProto, configs: torch.Tensor, step: int, threshold: float) -> tuple[float, torch.Tensor]:
"""
Calculate the energy using the Lanczos method.
"""
vector = torch.randn([configs.size(0)], dtype=torch.complex128, device=configs.device)
v: list[torch.Tensor] = [vector / torch.linalg.norm(vector)] # pylint: disable=not-callable
alpha: list[torch.Tensor] = []
beta: list[torch.Tensor] = []
w: torch.Tensor
w = model.apply_within(configs, v[-1], configs)
alpha.append((w.conj() @ v[-1]).real)
w = w - alpha[-1] * v[-1]
i = 0
while True:
norm_w = torch.linalg.norm(w) # pylint: disable=not-callable
if norm_w < threshold:
break
beta.append(norm_w)
v.append(w / beta[-1])
w = model.apply_within(configs, v[-1], configs)
alpha.append((w.conj() @ v[-1]).real)
if i == step:
break
w = w - alpha[-1] * v[-1] - beta[-1] * v[-2]
v[-2] = v[-2].cpu()  # v may grow very large, so keep older Lanczos vectors on the CPU
i += 1
if len(beta) == 0:
return alpha[0].item(), v[0]
vals, vecs = scipy.linalg.eigh_tridiagonal(torch.stack(alpha, dim=0).cpu(), torch.stack(beta, dim=0).cpu(), lapack_driver="stebz", select="i", select_range=(0, 0))
energy = torch.as_tensor(vals[0])
result = functools.reduce(torch.add, (weight[0] * vector.to(device=configs.device) for weight, vector in zip(vecs, v)))
return energy.item(), result
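`lanczos_energy` above builds a tridiagonal matrix from the `alpha` (diagonal) and `beta` (off-diagonal) recurrence and diagonalizes it with `scipy.linalg.eigh_tridiagonal`. A plain-Python sketch of the same recurrence, restricted to a two-step Krylov space so the tridiagonal eigenvalue has a closed form (`lanczos_lowest` is an illustrative helper, not qmb code):

```python
import math


def lanczos_lowest(matvec, n: int, v0: list[float]) -> float:
    """Run the Lanczos alpha/beta recurrence on an n-dimensional symmetric
    operator and return the smallest eigenvalue of the tridiagonal matrix.
    Limited to at most two Krylov steps so the eigenvalue is explicit."""
    norm = math.sqrt(sum(x * x for x in v0))
    v = [[x / norm for x in v0]]
    alpha, beta = [], []
    w = matvec(v[-1])
    alpha.append(sum(wi * vi for wi, vi in zip(w, v[-1])))
    w = [wi - alpha[-1] * vi for wi, vi in zip(w, v[-1])]
    for _ in range(n - 1):
        norm_w = math.sqrt(sum(x * x for x in w))
        if norm_w < 1e-12:  # Krylov space exhausted, as in the threshold check above
            break
        beta.append(norm_w)
        v.append([x / norm_w for x in w])
        w = matvec(v[-1])
        alpha.append(sum(wi * vi for wi, vi in zip(w, v[-1])))
        w = [wi - alpha[-1] * vi - beta[-1] * ui for wi, vi, ui in zip(w, v[-1], v[-2])]
    if len(alpha) == 1:
        return alpha[0]
    # For a 2x2 tridiagonal [[a0, b0], [b0, a1]] the eigenvalues are explicit.
    a0, a1, b0 = alpha[0], alpha[1], beta[0]
    mean, half = (a0 + a1) / 2, math.sqrt(((a0 - a1) / 2) ** 2 + b0 * b0)
    return mean - half


# H = [[2, 1], [1, 2]] has eigenvalues 1 and 3; Lanczos recovers the lowest.
H = [[2.0, 1.0], [1.0, 2.0]]
matvec = lambda x: [sum(H[i][j] * x[j] for j in range(2)) for i in range(2)]
energy = lanczos_lowest(matvec, 2, [1.0, 0.0])
print(round(energy, 10))  # → 1.0
```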
def config_contributions(model: ModelProto, base_configs: torch.Tensor, active_configs: torch.Tensor, state: torch.Tensor) -> torch.Tensor:
"""
Calculate the contribution of each configuration in the given pool to the ground-state approximation.
"""
base_state = state[:base_configs.size(0)]
active_state = state[base_configs.size(0):]
num = (active_state.conj() * model.apply_within(base_configs, base_state, active_configs)).real * 2
den = (base_state.conj() @ base_state).real
result = num / den
return result
@dataclasses.dataclass
class RldiagConfig:
"""
The reinforcement learning based subspace diagonalization algorithm.
"""
# pylint: disable=too-many-instance-attributes
common: typing.Annotated[CommonConfig, tyro.conf.OmitArgPrefixes]
# The initial configuration for the first step, which is usually the Hartree-Fock state for quantum chemistry systems
initial_config: typing.Annotated[str | None, tyro.conf.arg(aliases=["-i"])] = None
# The maximum size of the configuration pool
max_pool_size: typing.Annotated[int, tyro.conf.arg(aliases=["-n"])] = 32768
# The learning rate for the local optimizer
learning_rate: typing.Annotated[float, tyro.conf.arg(aliases=["-r"])] = 1e-3
# The number of Lanczos iteration steps used to calculate the energy
lanczos_step: typing.Annotated[int, tyro.conf.arg(aliases=["-k"])] = 64
# The threshold for the Lanczos iteration
lanczos_threshold: typing.Annotated[float, tyro.conf.arg(aliases=["-d"])] = 1e-8
# The coefficient of configuration number for the sigma calculation
alpha: typing.Annotated[float, tyro.conf.arg(aliases=["-a"])] = 0.0
def main(self) -> None:
"""
The main function for the reinforcement learning based subspace diagonalization algorithm.
"""
# pylint: disable=too-many-statements
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
model, network, data = self.common.main()
logging.info(
"Arguments Summary: "
"Initial Configuration: %s, "
"Learning Rate: %.10f, "
"Lanczos step: %d, "
"Lanczos threshold: %.10f, "
"Alpha: %.10f",
self.initial_config if self.initial_config is not None else "None",
self.learning_rate,
self.lanczos_step,
self.lanczos_threshold,
self.alpha,
)
optimizer = initialize_optimizer(
network.parameters(),
use_lbfgs=False,
learning_rate=self.learning_rate,
state_dict=data.get("optimizer"),
)
if self.initial_config is None:
if "rldiag" not in data: # pylint: disable=no-else-raise
raise ValueError("The initial configuration is not set, please set it.")
else:
configs = data["rldiag"]["configs"].to(device=self.common.device)
else:
# The parameter initial_config accepts two formats:
# 1. The 0/1 string, such as "11111100000000000000000000000011110000000000110000110000"
# 2. The packed string, such as "63,0,0,192,3,48,12"
if all(i in "01" for i in self.initial_config):
# The 0/1 string
configs = pack_int(
torch.tensor([[int(i) for i in self.initial_config]], dtype=torch.bool, device=self.common.device),
size=1,
)
else:
# The packed string
configs = torch.tensor([[int(i) for i in self.initial_config.split(",")]], dtype=torch.uint8, device=self.common.device)
if "rldiag" not in data:
data["rldiag"] = {"global": 0, "local": 0, "configs": configs}
else:
data["rldiag"]["configs"] = configs
data["rldiag"]["local"] = 0
writer = torch.utils.tensorboard.SummaryWriter(log_dir=self.common.folder()) # type: ignore[no-untyped-call]
last_state = None
while True:
logging.info("Starting a new cycle")
logging.info("Evaluating each configuration")
score = network(configs)
logging.info("All configurations are evaluated")
logging.info("Applying the action")
# | old config pool |
# | pruned | remained | expanded |
# | new config pool |
action = score.real >= -self.alpha
_, topk = torch.topk(score.real, k=score.size(0) // 2, dim=0)
action[topk] = True
if score.size(0) > self.max_pool_size:
_, topk = torch.topk(-score.real, k=score.size(0) - self.max_pool_size)
action[topk] = False
action[0] = True
remained_configs = configs[action]
pruned_configs = configs[torch.logical_not(action)]
expanded_configs = model.single_relative(remained_configs)  # There may be duplicated configs here
effective_expanded_configs, previous_to_effective = torch.unique(expanded_configs, dim=0, return_inverse=True)
old_configs = torch.cat([remained_configs, pruned_configs])
new_configs = torch.cat([remained_configs, effective_expanded_configs])
configs = new_configs
logging.info("Action has been applied")
configs_size = configs.size(0)
logging.info("Configuration pool size: %d", configs_size)
writer.add_scalar("rldiag/configs/global", configs_size, data["rldiag"]["global"]) # type: ignore[no-untyped-call]
writer.add_scalar("rldiag/configs/local", configs_size, data["rldiag"]["local"]) # type: ignore[no-untyped-call]
if last_state is not None:
old_state = last_state[torch.cat([action.nonzero()[:, 0], torch.logical_not(action).nonzero()[:, 0]])]
else:
_, old_state = lanczos_energy(model, old_configs, self.lanczos_step, self.lanczos_threshold)
new_energy, new_state = lanczos_energy(model, configs, self.lanczos_step, self.lanczos_threshold)
last_state = new_state
energy = new_energy
logging.info("Current energy is %.10f, Reference energy is %.10f, Energy error is %.10f", energy, model.ref_energy, energy - model.ref_energy)
writer.add_scalar("rldiag/energy/state/global", energy, data["rldiag"]["global"]) # type: ignore[no-untyped-call]
writer.add_scalar("rldiag/energy/state/local", energy, data["rldiag"]["local"]) # type: ignore[no-untyped-call]
writer.add_scalar("rldiag/energy/error/global", energy - model.ref_energy, data["rldiag"]["global"]) # type: ignore[no-untyped-call]
writer.add_scalar("rldiag/energy/error/local", energy - model.ref_energy, data["rldiag"]["local"]) # type: ignore[no-untyped-call]
if "base" not in data["rldiag"]:
# This is the first time the energy is calculated, which is usually the energy of the Hartree-Fock state for quantum chemistry systems
# This value is not reset across different cycle chains.
data["rldiag"]["base"] = energy
old_contribution = config_contributions(model, remained_configs, pruned_configs, old_state)
effective_new_contribution = config_contributions(model, remained_configs, effective_expanded_configs, new_state)
new_contribution = effective_new_contribution[previous_to_effective]
contribution = torch.cat([old_contribution, new_contribution])
configs_for_training = torch.cat([pruned_configs, remained_configs])
with torch.enable_grad(): # type: ignore[no-untyped-call]
score_for_training = network(configs_for_training)
loss = torch.linalg.norm(score_for_training + contribution) # pylint: disable=not-callable
optimizer.zero_grad()
loss.backward()
optimizer.step() # pylint: disable=no-value-for-parameter
logging.info("Saving model checkpoint")
data["rldiag"]["configs"] = configs
data["rldiag"]["energy"] = energy
data["rldiag"]["global"] += 1
data["rldiag"]["local"] += 1
data["network"] = network.state_dict()
data["optimizer"] = optimizer.state_dict()
self.common.save(data, data["rldiag"]["global"])
logging.info("Checkpoint successfully saved")
logging.info("Current cycle completed")
subcommand_dict["rldiag"] = RldiagConfig
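The two `initial_config` formats accepted above are related by LSB-first packing: every 8 occupation bits become one byte, with the first bit stored in the lowest-order position (matching `pack_int` with `size=1`). A standalone sketch of that packing (`pack_bits` is an illustrative helper, not qmb code):

```python
def pack_bits(bitstring: str) -> list[int]:
    """Pack a 0/1 occupation string into bytes, least-significant bit first,
    reproducing the relation between the two initial_config formats."""
    values = []
    for start in range(0, len(bitstring), 8):
        chunk = bitstring[start:start + 8]
        values.append(sum(int(bit) << position for position, bit in enumerate(chunk)))
    return values


# The docstring example from RldiagConfig: both spellings describe the same state.
bits = "11111100000000000000000000000011110000000000110000110000"
print(pack_bits(bits))  # → [63, 0, 0, 192, 3, 48, 12]
```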
"""
This file implements the run command for the qmb package, which is different from the other commands.
It is used to run a configuration file that contains the settings for a model and network, instead of using command line arguments.
And it is not a specific algorithm, but a general command to run any other command with a configuration file.
"""
import typing
import dataclasses
import pathlib
import yaml
import tyro
from .subcommand_dict import subcommand_dict
from .model_dict import model_dict
from .common import CommonConfig
@dataclasses.dataclass
class RunConfig:
"""
The execution of the configuration file with other specific commands.
"""
# The configuration file name
file_name: typing.Annotated[pathlib.Path, tyro.conf.Positional, tyro.conf.arg(metavar="CONFIG")]
def main(self) -> None:
"""
Run the configuration file.
"""
with open(self.file_name, "rt", encoding="utf-8") as file:
data = yaml.safe_load(file)
common_data = data.pop("common")
physics_data = data.pop("physics")
network_data = data.pop("network")
script, param = next(iter(data.items()))
common = CommonConfig(**common_data)
run = subcommand_dict[script](**param, common=common)
model_t = model_dict[common.model_name]
model_config_t = model_t.config_t
network_config_t = model_t.network_dict[common.network_name]
network_param = network_config_t(**network_data)
model_param = model_config_t(**physics_data)
run.main(model_param=model_param, network_param=network_param) # type: ignore[call-arg]
subcommand_dict["run"] = RunConfig
"""
This module is used to store a dictionary that maps subcommand names to their corresponding dataclass types.
Other packages or subpackages can register their subcommands by adding entries to this dictionary, such as
```
from qmb.subcommand_dict import subcommand_dict
subcommand_dict["my_subcommand"] = MySubcommand
```
"""
import typing
class SubcommandProto(typing.Protocol):
"""
This protocol defines a dataclass with a `main` method, which will be called when the subcommand is executed.
"""
# pylint: disable=too-few-public-methods
def main(self) -> None:
"""
The main method to be called when the subcommand is executed.
"""
subcommand_dict: dict[str, typing.Callable[..., SubcommandProto]] = {}
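The registration pattern the docstring above describes can be shown end-to-end. A self-contained sketch (the `HelloConfig` subcommand is invented for illustration; qmb's real subcommands are the dataclasses registered throughout this package):

```python
import dataclasses
import typing


class SubcommandProto(typing.Protocol):
    """Mirror of qmb's protocol: any object with a no-argument main method."""

    def main(self) -> None:
        ...


subcommand_dict: dict[str, typing.Callable[..., SubcommandProto]] = {}


@dataclasses.dataclass
class HelloConfig:
    """A toy subcommand: stores a name and greets it when executed."""
    name: str = "world"

    def main(self) -> None:
        print(f"hello, {self.name}")


subcommand_dict["hello"] = HelloConfig

# Dispatch by name, as the qmb entry point does with parsed CLI arguments.
subcommand_dict["hello"](name="qmb").main()  # → hello, qmb
```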
"""
This file implements a variational Monte Carlo method for solving quantum many-body problems.
"""
import logging
import typing
import dataclasses
import torch
import torch.utils.tensorboard
import tyro
from .common import CommonConfig
from .subcommand_dict import subcommand_dict
from .optimizer import initialize_optimizer
@dataclasses.dataclass
class VmcConfig:
"""
The VMC optimization for solving quantum many-body problems.
"""
# pylint: disable=too-many-instance-attributes
common: typing.Annotated[CommonConfig, tyro.conf.OmitArgPrefixes]
# The sampling count
sampling_count: typing.Annotated[int, tyro.conf.arg(aliases=["-n"])] = 4000
# The number of relative configurations to be used in energy calculation
relative_count: typing.Annotated[int, tyro.conf.arg(aliases=["-c"])] = 40000
# Whether to use the global optimizer
global_opt: typing.Annotated[bool, tyro.conf.arg(aliases=["-g"])] = False
# Whether to use LBFGS instead of Adam
use_lbfgs: typing.Annotated[bool, tyro.conf.arg(aliases=["-2"])] = False
# The learning rate for the local optimizer
learning_rate: typing.Annotated[float, tyro.conf.arg(aliases=["-r"], help_behavior_hint="(default: 1e-3 for Adam, 1 for LBFGS)")] = -1
# The number of steps for the local optimizer
local_step: typing.Annotated[int, tyro.conf.arg(aliases=["-s"])] = 1000
def __post_init__(self) -> None:
if self.learning_rate == -1:
self.learning_rate = 1 if self.use_lbfgs else 1e-3
def main(self) -> None:
"""
The main function for the VMC optimization.
"""
# pylint: disable=too-many-statements
# pylint: disable=too-many-locals
model, network, data = self.common.main()
logging.info(
"Arguments Summary: "
"Sampling Count: %d, "
"Relative Count: %d, "
"Global Optimizer: %s, "
"Use LBFGS: %s, "
"Learning Rate: %.10f, "
"Local Steps: %d, ",
self.sampling_count,
self.relative_count,
"Yes" if self.global_opt else "No",
"Yes" if self.use_lbfgs else "No",
self.learning_rate,
self.local_step,
)
optimizer = initialize_optimizer(
network.parameters(),
use_lbfgs=self.use_lbfgs,
learning_rate=self.learning_rate,
state_dict=data.get("optimizer"),
)
if "vmc" not in data:
data["vmc"] = {"global": 0, "local": 0}
writer = torch.utils.tensorboard.SummaryWriter(log_dir=self.common.folder()) # type: ignore[no-untyped-call]
while True:
logging.info("Starting a new optimization cycle")
logging.info("Sampling configurations")
configs_i, psi_i, _, _ = network.generate_unique(self.sampling_count)
logging.info("Sampling completed, unique configurations count: %d", len(configs_i))
logging.info("Calculating relative configurations")
if self.relative_count <= len(configs_i):
configs_src = configs_i
configs_dst = configs_i
else:
configs_src = configs_i
configs_dst = torch.cat([configs_i, model.find_relative(configs_i, psi_i, self.relative_count - len(configs_i))])
logging.info("Relative configurations calculated, count: %d", len(configs_dst))
optimizer = initialize_optimizer(
network.parameters(),
use_lbfgs=self.use_lbfgs,
learning_rate=self.learning_rate,
new_opt=not self.global_opt,
optimizer=optimizer,
)
def closure() -> torch.Tensor:
# Optimizing energy
optimizer.zero_grad()
psi_src = network(configs_src)
with torch.no_grad():
psi_dst = network(configs_dst)
hamiltonian_psi_dst = model.apply_within(configs_dst, psi_dst, configs_src)
num = psi_src.conj() @ hamiltonian_psi_dst
den = psi_src.conj() @ psi_src.detach()
energy = num / den
energy = energy.real
energy.backward() # type: ignore[no-untyped-call]
return energy
logging.info("Starting local optimization process")
for i in range(self.local_step):
energy: torch.Tensor = optimizer.step(closure) # type: ignore[assignment,arg-type]
logging.info("Local optimization in progress, step: %d, energy: %.10f, ref energy: %.10f", i, energy.item(), model.ref_energy)
writer.add_scalar("vmc/energy", energy, data["vmc"]["local"]) # type: ignore[no-untyped-call]
writer.add_scalar("vmc/error", energy - model.ref_energy, data["vmc"]["local"]) # type: ignore[no-untyped-call]
data["vmc"]["local"] += 1
logging.info("Local optimization process completed")
writer.flush() # type: ignore[no-untyped-call]
logging.info("Saving model checkpoint")
data["vmc"]["global"] += 1
data["network"] = network.state_dict()
data["optimizer"] = optimizer.state_dict()
self.common.save(data, data["vmc"]["global"])
logging.info("Checkpoint successfully saved")
logging.info("Current optimization cycle completed")
subcommand_dict["vmc"] = VmcConfig
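The `closure` above estimates the variational energy as a Rayleigh quotient: numerator ⟨ψ_src|H|ψ_dst⟩, denominator ⟨ψ_src|ψ_src⟩, keeping only the real part. The same estimate on a tiny dense example in plain complex arithmetic (a sketch, not the torch implementation; `rayleigh_energy` is an illustrative helper):

```python
def rayleigh_energy(h: list[list[complex]], psi: list[complex]) -> float:
    """Return Re(<psi|H|psi> / <psi|psi>), the energy estimate optimized
    by the VMC closure, for a dense Hermitian matrix h."""
    h_psi = [sum(h[i][j] * psi[j] for j in range(len(psi))) for i in range(len(psi))]
    num = sum(a.conjugate() * b for a, b in zip(psi, h_psi))
    den = sum(a.conjugate() * b for a, b in zip(psi, psi))
    return (num / den).real


# Diagonal Hamiltonian with levels 1 and 3: an equal superposition sits halfway.
H = [[1.0 + 0j, 0j], [0j, 3.0 + 0j]]
print(rayleigh_energy(H, [1 + 0j, 1 + 0j]))  # → 2.0
```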
"""
Test module for bitspack.
"""
import torch
import qmb.bitspack
# pylint: disable=missing-function-docstring
def test_pack_unpack_int_size_1() -> None:
tensor = torch.tensor([1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 0, 0, 1], dtype=torch.uint8)
size = 1
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_size_2() -> None:
tensor = torch.tensor([3, 2, 1, 0, 3, 2, 1, 0, 3, 2, 1, 0, 3, 2, 1, 0], dtype=torch.uint8)
size = 2
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_size_4() -> None:
tensor = torch.tensor([15, 10, 5, 0, 15, 10, 5, 0, 15, 10, 5, 0, 15, 10, 5, 0], dtype=torch.uint8)
size = 4
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_size_8() -> None:
tensor = torch.tensor([15, 210, 5, 123, 151, 130, 5, 0, 15, 10, 75, 0, 15, 10, 5, 0], dtype=torch.uint8)
size = 8
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_with_padding() -> None:
tensor = torch.tensor([1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 0, 0], dtype=torch.uint8)
size = 1
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_multi_dimensional() -> None:
tensor = torch.tensor([[1, 0, 1, 1], [0, 0, 1, 0], [1, 1, 0, 1], [1, 0, 0, 1]], dtype=torch.uint8)
size = 1
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_size_2_with_padding() -> None:
tensor = torch.tensor([3, 2, 1, 0, 3, 2, 1, 0, 3, 2, 1, 0, 3, 2], dtype=torch.uint8)
size = 2
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_size_4_with_padding() -> None:
tensor = torch.tensor([15, 10, 5, 0, 15, 10, 5, 0, 15, 10, 5, 0, 15, 10], dtype=torch.uint8)
size = 4
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_size_1_large_tensor() -> None:
tensor = torch.randint(0, 2, (1000,), dtype=torch.uint8)
size = 1
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_size_2_large_tensor() -> None:
tensor = torch.randint(0, 4, (1000,), dtype=torch.uint8)
size = 2
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
def test_pack_unpack_int_size_4_large_tensor() -> None:
tensor = torch.randint(0, 16, (1000,), dtype=torch.uint8)
size = 4
packed = qmb.bitspack.pack_int(tensor, size)
unpacked = qmb.bitspack.unpack_int(packed, size, tensor.shape[-1])
assert torch.all(unpacked == tensor)
"""
Test module for PyTorch C++ extension.
"""
import qmb.hamiltonian
def test_import() -> None:
"""
Test the import and availability of the PyTorch C++ extension operations.
"""
# pylint: disable=protected-access
extension = qmb.hamiltonian.Hamiltonian._load_module()
_ = getattr(extension, "prepare")
+11
-2
name: Pre-commit Hooks
-on: [push, pull_request]
+on:
+  - push
+  - pull_request

@@ -8,7 +10,14 @@ jobs:

     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
       - uses: actions/setup-python@v5
         with:
-          python-version: "3.12"
+          python-version: '3.12'
+          cache: 'pip'
+      - name: Install dependencies
+        run: pip install '.[dev]'
       - uses: pre-commit/action@v3.0.1
+24
-35

@@ -1,41 +0,27 @@

name: Build
name: Build wheels
on: [push, pull_request]
on:
- push
- pull_request
jobs:
build_wheels:
name: Build wheels on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
build_sdist:
name: Build source distribution
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up QEMU
if: runner.os == 'Linux'
uses: docker/setup-qemu-action@v3
with:
platforms: all
fetch-depth: 0
- uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Build sdist
run: pipx run build --sdist
- name: Install cibuildwheel
run: python -m pip install cibuildwheel==2.21.1
- name: Build wheels
run: python -m cibuildwheel --output-dir wheelhouse
env:
CIBW_TEST_SKIP: "*-win_arm64"
- uses: actions/upload-artifact@v4
with:
name: cibw-wheels-${{ matrix.os }}-${{ strategy.job-index }}
path: ./wheelhouse/*.whl
name: build-sdist
path: dist/*.tar.gz
build_sdist:
name: Build source distribution
build_wheel:
name: Build wheel
runs-on: ubuntu-latest

@@ -45,10 +31,12 @@

- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Build sdist
run: pipx run build --sdist
- name: Build wheel
run: pipx run build --wheel
- uses: actions/upload-artifact@v4
with:
name: cibw-sdist
path: dist/*.tar.gz
name: build-wheel
path: dist/*.whl

@@ -58,3 +46,3 @@ upload_pypi:

runs-on: ubuntu-latest
needs: [build_wheels, build_sdist]
needs: [build_wheel, build_sdist]
environment:

@@ -65,3 +53,3 @@ name: release

id-token: write
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v')
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v') && github.event.repository.visibility == 'public'

@@ -71,5 +59,6 @@ steps:

with:
pattern: cibw-*
pattern: build-*
path: dist
merge-multiple: true
- uses: pypa/gh-action-pypi-publish@release/v1

@@ -5,4 +5,8 @@ __pycache__

 dist
 models
 _version.py
 .idea
+.vscode
+.coverage*
+events.*
+*.so

@@ -12,3 +16,4 @@ *.hdf5

 *.pt
 *.pth
+*.log
 *.egg-info

@@ -52,2 +52,14 @@ # See https://pre-commit.com for more information

+  - repo: https://github.com/pylint-dev/pylint
+    rev: v3.3.1
+    hooks:
+      - id: pylint
+        language: system
+  - repo: https://github.com/pre-commit/mirrors-mypy
+    rev: v1.13.0
+    hooks:
+      - id: mypy
+        language: system
   - repo: https://github.com/google/yapf

@@ -57,2 +69,3 @@ rev: v0.40.2

       - id: yapf
+        language: system

@@ -59,0 +72,0 @@ - repo: https://github.com/Lucas-C/pre-commit-hooks


@@ -1,7 +0,7 @@

Metadata-Version: 2.1
Metadata-Version: 2.4
Name: qmb
Version: 0.0.10
Version: 0.0.51
Summary: Quantum Manybody Problem
Author-email: Hao Zhang <hzhangxyz@outlook.com>
License: GPLv3
License-Expression: GPL-3.0-or-later
Project-URL: Homepage, https://github.com/USTC-KnowledgeComputingLab/qmb

@@ -14,5 +14,5 @@ Project-URL: Documentation, https://github.com/USTC-KnowledgeComputingLab/qmb

Classifier: Environment :: Console
Classifier: License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Classifier: Programming Language :: C++
Classifier: Programming Language :: Python :: 3
Classifier: Environment :: GPU :: NVIDIA CUDA
Classifier: Topic :: Scientific/Engineering :: Mathematics

@@ -25,8 +25,133 @@ Classifier: Topic :: Scientific/Engineering :: Physics

License-File: LICENSE.md
Requires-Dist: numpy
Requires-Dist: scipy
Requires-Dist: torch
Requires-Dist: tyro
Requires-Dist: openfermion
Requires-Dist: platformdirs~=4.3.7
Requires-Dist: numpy~=1.26.4
Requires-Dist: scipy~=1.15.2
Requires-Dist: torch<2.8,>=2.6
Requires-Dist: pybind11~=2.13.6
Requires-Dist: ninja~=1.11.1.4
Requires-Dist: tyro~=0.9.18
Requires-Dist: pyyaml~=6.0.2
Requires-Dist: openfermion~=1.7.0
Requires-Dist: tensorboard~=2.19.0
Requires-Dist: standard-imghdr~=3.13.0; python_version >= "3.13"
Provides-Extra: dev
Requires-Dist: yapf~=0.43.0; extra == "dev"
Requires-Dist: pylint~=3.3.6; extra == "dev"
Requires-Dist: mypy<1.18,>=1.15; extra == "dev"
Requires-Dist: pytest<8.5.0,>=8.3.5; extra == "dev"
Requires-Dist: pytest-cov~=6.0.0; extra == "dev"
Requires-Dist: types-pyyaml~=6.0.12.20250326; extra == "dev"
Dynamic: license-file
# Quantum-Many-Body
# An efficient Neural-Network Quantum States Architecture for Strongly Correlated Systems
The Quantum-Many-Body package (`qmb`) is a tool designed to solve quantum many-body problems.
## About The Project
This repository hosts a [Python][python-url] package named `qmb`, dedicated to solving the quantum many-body problem.
It implements a suite of algorithms and interfaces with various model descriptors, such as the [OpenFermion][openfermion-url] format and FCIDUMP.
Additionally, `qmb` can efficiently utilize accelerators such as GPU(s) to enhance its performance.
The package's main entry point is a command line interface (CLI) application, also named `qmb`.
## Getting Started
Users can run this application either using [Docker][docker-url] or locally.
Both approaches require GPU(s) with [CUDA][cuda-url] support and a properly installed GPU driver, which is typically included with the installation of the CUDA Toolkit.
### Run with Docker
After installing [Docker][docker-url] with [CUDA support][docker-cuda-url], pull [our prebuilt Docker image][our-docker-url] using:
```
docker pull hzhangxyz/qmb
```
If users experience network issues, consider [configuring Docker mirrors][docker-mirror-url].
Then, users can run `qmb` with:
```
docker run --device=nvidia.com/gpu=all --rm -it hzhangxyz/qmb --help
```
This command utilizes Docker's [CDI][docker-cuda-cdi-url] feature to enable CUDA devices in `--device=nvidia.com/gpu=all`.
Alternatively, for [legacy][docker-cuda-legacy-url] support, users can run:
```
docker run --gpus all --rm -it hzhangxyz/qmb --help
```
Please note that we currently provide Docker images for Linux/AMD64 only.
When running with Docker, users might want to [mount][docker-mount-url] a local folder to share storage between the container and the local machine, for example via the `-v` option.
### Run locally
To install locally, users first need to install the [CUDA toolkit][cuda-url].
`qmb` requires Python >= 3.12.
After setting up a compatible Python environment such as using [Anaconda][anaconda-url], [Miniconda][miniconda-url], [venv][venv-url] or [pyenv][pyenv-url], users can install [our prebuilt package][our-pypi-url] using:
```
pip install qmb
```
If users face network issues, consider setting up a mirror with the `-i` option.
Users can then invoke the `qmb` script with:
```
qmb --help
```
Please note that if the CUDA toolkit version is too old, users must install a compatible PyTorch version before running `pip install qmb`.
For example, use `pip install torch --index-url https://download.pytorch.org/whl/cu118` for CUDA 11.8 (see [PyTorch’s guide][pytorch-install-url] for details).
This older CUDA-compatible PyTorch must be installed first; otherwise, users will need to uninstall all existing PyTorch/CUDA-related Python packages before reinstalling the correct version.
## Usage
The main entry point of this package is a CLI script named `qmb`.
Use the following command to view its usage:
```
qmb --help
```
This command provides a collection of subcommands, such as `imag`.
To access detailed help for a specific subcommand, users can append `--help` to the command.
For example, use `qmb imag --help` to view the help information for the `imag` subcommand.
Typically, `qmb` requires a specific descriptor for a particular physical or chemical model to execute.
We have collected a set of such models [here][models-url].
Users can clone or download this dataset into a folder named `models` within their current working directory.
The `models` folder is the default location in which `qmb` searches for the necessary model files.
Alternatively, users can specify a custom path by setting the `$QMB_MODEL_PATH` environment variable, thereby overriding the default behavior.
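The search order described above can be sketched as follows (a hypothetical illustration of the documented behavior, not `qmb`'s actual implementation; the function name `resolve_model_file` is invented):

```python
import os
from pathlib import Path

def resolve_model_file(name: str) -> Path:
    # Documented behavior: qmb looks in ./models by default, or in the
    # directory given by $QMB_MODEL_PATH when that variable is set.
    base = Path(os.environ.get("QMB_MODEL_PATH", "models"))
    return base / f"{name}.hdf5"
```

With `QMB_MODEL_PATH` unset, `resolve_model_file("N2")` yields `models/N2.hdf5`, which is how the `-PN2` flag finds its model file.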
After cloning or downloading the dataset, users can calculate the ground state of the $N_2$ system by running the command:
```
qmb haar openfermion mlp -PN2
```
This command invokes the `haar` subcommand with a descriptor in the OpenFermion format and the [mlp network][naqs-url].
The `-PN2` flag selects the $N_2$ model, which is loaded from the file `N2.hdf5` in the `models` folder.
For more detailed information, please refer to the help command and the documentation.
## Contributing
Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for detailed guidelines.
## License
This project is distributed under the GPLv3 License. See [LICENSE.md](LICENSE.md) for more information.
[python-url]: https://www.python.org/
[openfermion-url]: https://quantumai.google/openfermion
[docker-url]: https://www.docker.com/
[cuda-url]: https://docs.nvidia.com/cuda/
[docker-cuda-url]: https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/index.html
[our-docker-url]: https://hub.docker.com/r/hzhangxyz/qmb
[docker-mirror-url]: https://docs.docker.com/docker-hub/image-library/mirror/
[docker-cuda-cdi-url]: https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/cdi-support.html
[docker-cuda-legacy-url]: https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html
[anaconda-url]: https://www.anaconda.com/
[miniconda-url]: https://docs.anaconda.com/miniconda/
[venv-url]: https://docs.python.org/3/library/venv.html
[pyenv-url]: https://github.com/pyenv/pyenv
[our-pypi-url]: https://pypi.org/project/qmb/
[docker-mount-url]: https://docs.docker.com/engine/storage/volumes/
[pytorch-install-url]: https://pytorch.org/get-started/locally/
[models-url]: https://huggingface.co/datasets/USTC-KnowledgeComputingLab/qmb-models
[naqs-url]: https://github.com/tomdbar/naqs-for-quantum-chemistry
[build-system]
requires = ["setuptools>=69.5", "setuptools_scm>=8.1", "pybind11>=2.13"]
requires = ["setuptools>=78.0,<80.10", "setuptools_scm~=8.2.0"]
build-backend = "setuptools.build_meta"

@@ -8,3 +8,15 @@

dynamic = ["version"]
dependencies = ["numpy", "scipy", "torch", "tyro", "openfermion"]
dependencies = [
"platformdirs~=4.3.7",
"numpy~=1.26.4",
"scipy~=1.15.2",
"torch>=2.6,<2.8",
"pybind11~=2.13.6",
"ninja~=1.11.1.4",
"tyro~=0.9.18",
"pyyaml~=6.0.2",
"openfermion~=1.7.0",
"tensorboard~=2.19.0",
"standard-imghdr~=3.13.0 ; python_version>='3.13'",
]
requires-python = ">=3.12"

@@ -14,3 +26,3 @@ authors = [{ name = "Hao Zhang", email = "hzhangxyz@outlook.com" }]

readme = "README.md"
license = {text = "GPLv3"}
license = "GPL-3.0-or-later"
keywords = ["quantum", "manybody", "quantum-chemistry", "quantum-simulation", "molecular-simulation", "algorithms", "simulation", "wave-function", "ground-state", "ansatz", "torch", "pytorch"]

@@ -20,5 +32,5 @@ classifiers = [

"Environment :: Console",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Programming Language :: C++",
"Programming Language :: Python :: 3",
"Environment :: GPU :: NVIDIA CUDA",
"Topic :: Scientific/Engineering :: Mathematics",

@@ -30,2 +42,12 @@ "Topic :: Scientific/Engineering :: Physics",

[project.optional-dependencies]
dev = [
"yapf~=0.43.0",
"pylint~=3.3.6",
"mypy>=1.15,<1.18",
"pytest>=8.3.5,<8.5.0",
"pytest-cov~=6.0.0",
"types-pyyaml~=6.0.12.20250326",
]
[project.urls]

@@ -43,5 +65,9 @@ Homepage = "https://github.com/USTC-KnowledgeComputingLab/qmb"

[tool.setuptools.package-data]
qmb = ["*.cpp", "*.cu"]
[tool.setuptools_scm]
version_file = "qmb/_version.py"
version_scheme = "no-guess-dev"
fallback_version = "0.0.0"

@@ -52,9 +78,14 @@ [tool.yapf]

[tool.cibuildwheel.linux]
archs = ["x86_64", "i686", "aarch64", "ppc64le", "s390x"]
[tool.pylint]
max-line-length = 200
ignore-paths = ["qmb/_version.py"]
disable = ["duplicate-code"]
[tool.cibuildwheel.macos]
archs = ["x86_64", "arm64", "universal2"]
[tool.mypy]
disallow_untyped_calls = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
[tool.cibuildwheel.windows]
archs = ["AMD64", "x86", "ARM64"]
[tool.pytest.ini_options]
addopts = "--cov=qmb"


@@ -1,5 +0,21 @@

numpy
scipy
torch
tyro
openfermion
platformdirs~=4.3.7
numpy~=1.26.4
scipy~=1.15.2
torch<2.8,>=2.6
pybind11~=2.13.6
ninja~=1.11.1.4
tyro~=0.9.18
pyyaml~=6.0.2
openfermion~=1.7.0
tensorboard~=2.19.0
[:python_version >= "3.13"]
standard-imghdr~=3.13.0
[dev]
yapf~=0.43.0
pylint~=3.3.6
mypy<1.18,>=1.15
pytest<8.5.0,>=8.3.5
pytest-cov~=6.0.0
types-pyyaml~=6.0.12.20250326
.clang-format
.gitignore
.pre-commit-config.yaml
CONTRIBUTING.md
Dockerfile
LICENSE.md
README.md
pyproject.toml
setup.py
.github/PULL_REQUEST_TEMPLATE.md
.github/dependabot.yml
.github/ISSUE_TEMPLATE/BUG-REPORT.yml
.github/ISSUE_TEMPLATE/DOCUMENTATION.yml
.github/ISSUE_TEMPLATE/FEATURE-REQUEST.yml
.github/workflows/docker.yml
.github/workflows/pre-commit.yml
.github/workflows/wheels.yml
.woodpecker/README.md
.woodpecker/docker.yml
.woodpecker/pre-commit.yml
.woodpecker/wheels.yml
qmb/__init__.py
qmb/__main__.py
qmb/_hamiltonian.cpp
qmb/_hamiltonian_cpu.cpp
qmb/_hamiltonian_cuda.cu
qmb/_version.py
qmb/attention.py
qmb/bitspack.py
qmb/chop_imag.py
qmb/common.py
qmb/crossmlp.py
qmb/fcidump.py
qmb/haar.py
qmb/hamiltonian.py
qmb/hubbard.py
qmb/ising.py
qmb/list_loss.py
qmb/losses.py
qmb/mlp.py
qmb/model_dict.py
qmb/openfermion.py
qmb/optimizer.py
qmb/precompile.py
qmb/random_engine.py
qmb/rldiag.py
qmb/run.py
qmb/subcommand_dict.py
qmb/version.py
qmb/vmc.py
qmb.egg-info/PKG-INFO

@@ -18,2 +55,4 @@ qmb.egg-info/SOURCES.txt

qmb.egg-info/requires.txt
qmb.egg-info/top_level.txt
qmb.egg-info/top_level.txt
tests/bitspack_test.py
tests/extension_test.py

@@ -1,3 +0,7 @@

__all__ = ["version", "__version__"]
"""
The qmb package provides tools and algorithms to solve problems related to quantum many-body systems.
For more details, check out the README.md file.
"""
from .version import version, __version__

@@ -1,6 +0,11 @@

# file generated by setuptools_scm
# file generated by setuptools-scm
# don't change, don't track in version control
__all__ = ["__version__", "__version_tuple__", "version", "version_tuple"]
TYPE_CHECKING = False
if TYPE_CHECKING:
from typing import Tuple, Union
from typing import Tuple
from typing import Union
VERSION_TUPLE = Tuple[Union[int, str], ...]

@@ -15,3 +20,3 @@ else:

__version__ = version = '0.0.10'
__version_tuple__ = version_tuple = (0, 0, 10)
__version__ = version = '0.0.51'
__version_tuple__ = version_tuple = (0, 0, 51)

@@ -0,4 +1,11 @@

"""
This module provides the version information for the package.
It attempts to import the version number generated by setuptools_scm.
If that fails, it defaults to "0.0.0".
"""
try:
from ._version import version, __version__
from ._version import version, __version__ # pylint: disable=unused-import
except ImportError:
version = __version__ = "0.0.0"
import os
import glob

from setuptools import setup
from pybind11.setup_helpers import Pybind11Extension

# Build one pybind11 extension module per C++ source file under qmb/.
cpp_files = glob.glob(os.path.join("qmb", "*.cpp"))
ext_modules = [
    Pybind11Extension(
        f"qmb.{os.path.splitext(os.path.basename(cpp_file))[0]}",
        [cpp_file],
        cxx_std=17,
    )
    for cpp_file in cpp_files
]
setup(ext_modules=ext_modules)
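Each extension's import name in the comprehension above is derived from its source path; a minimal sketch of that mapping (the helper name `ext_name` is invented for illustration):

```python
import os

def ext_name(cpp_file: str) -> str:
    # Mirrors setup.py's derivation: drop the directory and the .cpp
    # suffix, then prefix with the qmb package name.
    stem = os.path.splitext(os.path.basename(cpp_file))[0]
    return f"qmb.{stem}"

print(ext_name(os.path.join("qmb", "_hamiltonian.cpp")))  # qmb._hamiltonian
```

So `qmb/_hamiltonian.cpp` is compiled into the module `qmb._hamiltonian`, importable alongside the pure-Python modules in the package.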
