Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

hiPhive

Package Overview
Dependencies
Maintainers
2
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

hiPhive - npm Package Compare versions

Comparing version
0.6
to
0.7
+87
hiphive/fitting/model_selection.py
from typing import Dict, List
import numpy as np
from sklearn.metrics import r2_score
def get_model_metrics(A: np.ndarray,
                      parameters: List[float],
                      y_target: np.ndarray,
                      y_predicted: np.ndarray) -> Dict[str, float]:
    """Computes model evaluation metrics.

    The returned dictionary contains
    * root-mean-square error over training set (``rmse_train``)
    * R^2 score over training set (``R2_train``; coefficient of
      determination, regression score function)
    * Akaike information criterion (``AIC``)
    * Bayesian information criterion (``BIC``)
    * number of non-zero parameters (``n_nonzero_parameters``)

    Parameters
    ----------
    A
        fit matrix used to train model
    parameters
        model parameters
    y_target
        target values used to train model
    y_predicted
        predicted values from model

    Returns
    -------
    Dict[str, float]
        dictionary with the metrics listed above
    """
    n_samples = A.shape[0]
    # only non-zero parameters count towards the effective model size
    # used in the information criteria
    n_parameters = np.count_nonzero(parameters)

    # root-mean-square error over the training set
    delta_y = y_predicted - y_target
    mse = np.mean(delta_y**2)
    rmse = np.sqrt(mse)

    # information criteria (penalize the number of parameters)
    aic = get_aic(mse, n_samples, n_parameters)
    bic = get_bic(mse, n_samples, n_parameters)

    # coefficient of determination
    r2 = r2_score(y_target, y_predicted)

    return dict(rmse_train=rmse,
                R2_train=r2,
                AIC=aic,
                BIC=bic,
                n_nonzero_parameters=n_parameters)
def get_aic(mse: float, n_samples: int, n_parameters: int) -> float:
    """Returns the Akaike information criterion (AIC)."""
    return n_samples * np.log(mse) + 2 * n_parameters
def get_bic(mse: float, n_samples: int, n_parameters: int) -> float:
    """Returns the Bayesian information criterion (BIC)."""
    return n_samples * np.log(mse) + n_parameters * np.log(n_samples)
def estimate_loocv(A: np.ndarray,
                   y_target: np.ndarray,
                   y_predicted: np.ndarray) -> float:
    """Calculates the approximative leave-one-out cross-validation
    (LOO-CV) root mean square error (RMSE) via the hat matrix.

    Parameters
    ----------
    A
        matrix in the OLS problem y = Ax; A.T @ A must be invertible
    y_target
        target values for y
    y_predicted
        OLS obtained prediction for y

    Returns
    -------
    float
        estimated LOO-CV RMSE

    Raises
    ------
    ValueError
        if the system is underdetermined (more columns than rows)
    """
    # use the shape directly; indexing A[1, :] would raise IndexError
    # (instead of the intended ValueError) for single-row matrices
    n_rows, n_cols = A.shape
    if n_cols > n_rows:
        raise ValueError('Matrix is underdetermined')
    # hat (projection) matrix H = A (A^T A)^-1 A^T; diag(H) = leverages
    H = A.dot(np.linalg.inv(A.T.dot(A))).dot(A.T)
    # PRESS residuals: each residual scaled by (1 - leverage)
    e = (y_target - y_predicted) / (1 - np.diag(H))
    return np.linalg.norm(e) / np.sqrt(len(e))
import pickle
def _write_pickle(fname, data):
""" Write data to pickle file with filename fname """
with open(fname, 'wb') as handle:
pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)
def _read_pickle(fname):
""" Read pickle file and return content """
with open(fname, 'rb') as handle:
return pickle.load(handle)
import numpy as np
from itertools import product
def write_fcs_gpumd(fname_fc, fname_clusters, fcs, order, tol=1e-10):
    """
    Writes force constants of a given order in GPUMD format.

    Parameters
    ----------
    fname_fc : str
        name of file which contains the lookup force constants
    fname_clusters : str
        name of file which contains the clusters and the fc lookup index
    fcs : ForceConstants
        force constants
    order : int
        force constants for this order will be written to file
    tol : float
        force constants with a norm below tol are not written;
        two force constants that agree within tol are considered equal
    """
    # group identical force constants, then write the two lookup files
    cluster_lookup, fc_lookup = _get_lookup_data_smart(fcs, order, tol)
    _write_clusters(fname_clusters, cluster_lookup, order)
    _write_fc_lookup(fname_fc, fc_lookup, order)
def _write_fc_lookup(fname, fc_lookup, order):
""" Writes the lookup force constants to file """
fmt = '{}' + ' {}'*order
with open(fname, 'w') as f:
f.write(str(len(fc_lookup)) + '\n\n')
for fc in fc_lookup:
for xyz in product(range(3), repeat=order):
f.write(fmt.format(*xyz, fc[xyz])+'\n')
f.write('\n')
def _write_clusters(fname, cluster_lookup, order):
""" Writes the cluster lookup to file """
fmt = '{}' + ' {}'*order
with open(fname, 'w') as f:
f.write(str(len(cluster_lookup)) + '\n\n')
for c, i in cluster_lookup.items():
line = fmt.format(*c, i) + '\n'
f.write(line)
def _get_lookup_data_smart(fcs, order, tol):
""" Groups force constants for a given order into groups for which the
force constant is identical. """
fc_lookup = []
cluster_lookup = dict()
axis = tuple(range(1, order+1))
clusters = [c for c in fcs._fc_dict.keys() if len(c) == order and np.linalg.norm(fcs[c]) > tol]
fc_all = np.array([fcs[c] for c in clusters])
indices = list(range(len(clusters)))
while len(indices) > 0:
i = indices[0]
delta = fc_all[indices] - fc_all[i]
delta_norm = np.sqrt(np.sum(delta**2, axis=axis))
inds_to_del = [indices[x] for x in np.where(delta_norm < tol)[0]]
assert i in inds_to_del
fc_lookup.append(fc_all[i])
for j in inds_to_del:
indices.remove(j)
cluster_lookup[clusters[j]] = len(fc_lookup)-1
return cluster_lookup, fc_lookup
def _get_lookup_data_naive(fcs, order, tol):
""" Groups force constants for a given order into groups for which the
force constant is identical. """
fc_lookup = []
cluster_lookup = dict()
clusters = [c for c in fcs._fc_dict.keys() if len(c) == order]
for c in clusters:
fc1 = fcs[c]
if np.linalg.norm(fc1) < tol:
continue
for i, fc2 in enumerate(fc_lookup):
if np.linalg.norm(fc1 - fc2) < tol:
cluster_lookup[c] = i
break
else:
cluster_lookup[c] = len(fc_lookup)
fc_lookup.append(fc1)
return cluster_lookup, fc_lookup
def write_fcp_txt(fname, path, n_types, max_order):
    """ Writes the driver potential file for GPUMD.

    The file is a simple text file of the form::

        fcp number_of_atom_types
        highest_order
        path_to_force_constant_files

    e.g. for a binary system with a sixth-order model::

        fcp 2
        6
        /path/to/your/folder

    Parameters
    ----------
    fname : str
        file name
    path : str
        path to directory with force constant file
    n_types : int
        number of atom types
    max_order : int
        maximum order of the force constant potential
    """
    lines = ['fcp {}'.format(n_types),
             '{}'.format(max_order),
             '{}'.format(path.rstrip('/'))]  # without a trailing '/'
    with open(fname, 'w') as fobj:
        fobj.write('\n'.join(lines))
def write_r0(fname, atoms):
    """
    Writes the GPUMD r0 file holding the reference atomic positions.

    Parameters
    ----------
    fname : str
        name of file to which to write the atomic positions
    atoms : ase.Atoms
        input structure
    """
    with open(fname, 'w') as fobj:
        for atom in atoms:
            x, y, z = atom.position
            fobj.write('{} {} {}\n'.format(x, y, z))
""" This module contains functions and variables to control hiPhive's logging
* `logger` - the module logger
* set_config - function to control level and logfile
"""
import logging
import sys
from timeit import default_timer as timer
# This is the root logger of hiPhive; all module loggers are children of it
logger = logging.getLogger('hiphive')
# Will process all levels of INFO or higher
logger.setLevel(logging.INFO)
# Keep hiPhive messages out of the root logger's handlers;
# if you know what you are doing you may set this to True
logger.propagate = False
# The hiPhive logger collects events from its children and by default
# prints them directly to stdout
ch = logging.StreamHandler(sys.stdout)
logger.addHandler(ch)
# when True, Progress continuously rewrites a status line on stdout
continuous_logging = False
# TODO: use Context management protocol instead
class Progress:
    """ Progress bar like functionality. """

    def __init__(self, tot=None, mode='frac', estimate_remaining=True):
        if tot is None:
            self._tot = '?'
            # without a total the remaining time cannot be estimated
            assert not estimate_remaining
        else:
            self._tot = tot
        self._progress = 0
        self._estimate_remaining = estimate_remaining
        self._last_update = 0
        self._start = timer()

    def tick(self):
        """ Advances the counter and, at most every two seconds,
        refreshes the status line (if continuous logging is enabled). """
        self._progress += 1
        if continuous_logging and timer() - self._last_update > 2:
            self._last_update = timer()
            # wipe the current line before rewriting it
            print('\r' + ' ' * 70 + '\r', end='', flush=True)
            print('{}/{}={:.3%}'.format(self._progress,
                  self._tot, self._progress/self._tot), end='', flush=True)
            if self._estimate_remaining and self._tot != '?':
                remaining_time = (self._tot - self._progress) * (
                    timer() - self._start) / self._progress
                print(' time remaining: {:.3}'.format(remaining_time), end='',
                      flush=True)

    def close(self):
        """ Clears the status line and logs the total elapsed time. """
        if continuous_logging:
            print('\r' + ' ' * 70 + '\r', end='', flush=True)
        elapsed = timer() - self._start
        d, remainder = divmod(elapsed, 60*60*24)
        h, remainder = divmod(remainder, 60*60)
        m, s = divmod(remainder, 60)
        logger.info('Done in {}d {}h {}m {:.3}s'
                    .format(int(d), int(h), int(m), s))
def set_config(filename=None, level=None, continuous=None):
    """
    Alters the way logging is handled.

    Parameters
    ----------
    filename : str
        name of file the log should be sent to
    level : int
        verbosity level; see `Python logging library
        <https://docs.python.org/3/library/logging.html>`_ for details
    continuous : bool
        if True the progress will be continuously updated
    """
    global continuous_logging
    if filename is not None:
        # attach an additional handler that mirrors the log to a file
        logger.addHandler(logging.FileHandler(filename))
    if level is not None:
        logger.setLevel(level)
    if continuous is not None:
        continuous_logging = continuous
"""
This module provides functions for reading and writing data files
in phonopy and phono3py formats.
"""
import os
import warnings
from itertools import product
import hiphive
import numpy as np
from .logging_tools import logger
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=FutureWarning)
import h5py
# child logger so messages from this module are tagged accordingly
logger = logger.getChild('io_phonopy')
def _filename_to_format(filename: str) -> str:
""" Tries to guess the format from the filename """
basename = os.path.basename(filename)
if basename == 'FORCE_CONSTANTS':
return 'text'
if '.' in basename:
extension = basename.split('.')[-1]
if extension == 'hdf5':
return 'hdf5'
raise ValueError('Could not guess file format')
def read_phonopy_fc2(filename: str, format: str = None) -> np.ndarray:
    """Parses a second-order force constant file in phonopy format.

    Parameters
    ----------
    filename : str
        input file name
    format : str
        file format; if None the format is guessed from the filename;
        possible values: "text", "hdf5"

    Returns
    -------
    numpy.ndarray
        second-order force constant matrix (`N,N,3,3` array)
        where `N` is the number of atoms
    """
    readers = {'text': _read_phonopy_fc2_text,
               'hdf5': _read_phonopy_fc2_hdf5}
    fmt = _filename_to_format(filename) if format is None else format
    if fmt not in readers:
        raise ValueError('Did not recognize format {}'.format(fmt))
    return readers[fmt](filename)
def write_phonopy_fc2(filename: str,
                      fc2,
                      format: str = None) -> None:
    """Writes second-order force constant matrix in phonopy format.

    Parameters
    ----------
    filename : str
        output file name
    fc2 : ForceConstants or numpy.ndarray
        second-order force constant matrix
    format : str
        file format; if None the format is guessed from the filename;
        possible values: "text", "hdf5"
    """
    writers = {'text': _write_phonopy_fc2_text,
               'hdf5': _write_phonopy_fc2_hdf5}

    # accept either a hiphive ForceConstants object or a plain array
    if isinstance(fc2, hiphive.ForceConstants):
        fc2_array = fc2.get_fc_array(order=2)
    elif isinstance(fc2, np.ndarray):
        fc2_array = fc2
    else:
        raise TypeError('fc2 should be ForceConstants or NumPy array')

    # sanity check: the array must have shape (N, N, 3, 3)
    n_atoms = fc2_array.shape[0]
    if fc2_array.shape != (n_atoms, n_atoms, 3, 3):
        raise ValueError('fc2 has wrong shape')

    fmt = _filename_to_format(filename) if format is None else format
    if fmt not in writers:
        raise ValueError('Did not recognize format {}'.format(fmt))
    writers[fmt](filename, fc2_array)
def read_phonopy_fc3(filename: str) -> np.ndarray:
    """Parses a third-order force constant file in phonopy hdf5 format.

    Parameters
    ----------
    filename : str
        input file name

    Returns
    -------
    numpy.ndarray
        third-order force constant matrix
    """
    with h5py.File(filename, 'r') as hf:
        if 'fc3' not in hf.keys():
            raise IOError('Could not find fc3 in file {}'.format(filename))
        fc3 = hf['fc3'][:]
    return fc3
def write_phonopy_fc3(filename: str, fc3) -> None:
    """Writes third-order force constant matrix in phonopy hdf5 format.

    Parameters
    ----------
    filename : str
        output file name
    fc3 : ForceConstants or numpy.ndarray
        third-order force constant matrix
    """
    # accept either a hiphive ForceConstants object or a plain array
    if isinstance(fc3, hiphive.ForceConstants):
        fc3_array = fc3.get_fc_array(order=3)
    elif isinstance(fc3, np.ndarray):
        fc3_array = fc3
    else:
        raise TypeError('fc3 should be ForceConstants or NumPy array')

    # sanity check: the array must have shape (N, N, N, 3, 3, 3)
    n_atoms = fc3_array.shape[0]
    if fc3_array.shape != (n_atoms,) * 3 + (3,) * 3:
        raise ValueError('fc3 has wrong shape')

    with h5py.File(filename, 'w') as hf:
        hf.create_dataset('fc3', data=fc3_array, compression='gzip')
        hf.flush()
def _read_phonopy_fc2_text(filename: str) -> np.ndarray:
""" Reads phonopy-fc2 file in text format. """
with open(filename, 'r') as f:
# read shape of force constants
line = f.readline()
line_ints = [int(x) for x in line.split()]
if len(line_ints) == 1:
n_atoms = line_ints[0]
elif len(line_ints) == 2:
assert line_ints[0] == line_ints[1]
n_atoms = line_ints[0]
else:
raise ValueError('Unsupported or incorrect phonopy format')
fc2 = np.full((n_atoms, n_atoms, 3, 3), np.nan)
# read force constants
lines = f.readlines()
for n, line in enumerate(lines):
flds = line.split()
if len(flds) == 2:
i = int(flds[0]) - 1 # phonopy index starts with 1
j = int(flds[1]) - 1
for x in range(3):
fc_row = lines[n + x + 1].split()
for y in range(3):
fc2[i][j][x][y] = float(fc_row[y])
return fc2
def _read_phonopy_fc2_hdf5(filename: str) -> np.ndarray:
    """ Reads phonopy-fc2 file in hdf5 format. """
    with h5py.File(filename, 'r') as hf:
        # phonopy has used both dataset names over time
        for key in ('force_constants', 'fc2'):
            if key in hf.keys():
                return hf[key][:]
    raise IOError('Could not find fc2 in file {}'.format(filename))
def _write_phonopy_fc2_text(filename: str, fc2: np.ndarray) -> None:
""" Writes second-order force constants to file in text format. """
n_atoms = fc2.shape[0]
with open(filename, 'w') as f:
f.write('{:5d} {:5d}\n'.format(n_atoms, n_atoms))
for i, j in product(range(n_atoms), repeat=2):
fc2_ij = fc2[(i, j)]
if fc2_ij.shape != (3, 3):
raise ValueError('Invalid shape for fc2[({},{})]'.format(i, j))
f.write('{:-5d}{:5d}\n'.format(i + 1, j + 1))
for row in fc2_ij:
f.write((3*' {:22.15f}'+'\n').format(*tuple(row)))
def _write_phonopy_fc2_hdf5(filename: str, fc2: np.ndarray) -> None:
    """ Writes second-order force constants to file in hdf5 format. """
    with h5py.File(filename, 'w') as hf:
        # gzip compression keeps the file small for large systems
        hf.create_dataset('fc2', data=fc2, compression='gzip')
        hf.flush()
from itertools import product
import numpy as np
def _obj2str(a, none_char='-'):
""" Casts object a to str. """
if isinstance(a, float):
# if the float is 2.49999999 then round
if str(a)[::-1].find('.') > 5:
return '{:.5f}'.format(a)
elif a is None:
return none_char
return str(a)
# element-wise version of _obj2str: maps an array of objects to strings
_array2str = np.vectorize(_obj2str)
def print_table(matrix, sum_=False):
    """ Prints matrix data in a nice table format.

    The matrix element matrix[i][j] should correspond to information about
    order j+2 and n-body i+1.

    Example
    --------
    >>> matrix = numpy.array([[None, None], [4.0, 3.0]])
    >>> print_table(matrix)
    body/order |  2  |  3
    ------------------------
        1      |  -  |  -
        2      | 4.0 | 3.0

    Parameters
    ----------
    matrix : numpy.ndarray
        matrix to be printed
    sum_ : bool
        whether or not to print the sum along each row and column
    """
    print(table_array_to_string(matrix, sum_))
def table_array_to_string(matrix, sum_=False):
    """ Generates a nicely formatted table string from a numpy array of
    floats/ints. """
    # build the framed table, stringify every cell, then lay it out
    filled = _generate_table_array(matrix, sum_)
    return _generate_table_str(_array2str(filled))
def _generate_table_array(table_array, sum_=False):
    """ Generates the table in numpy array format """
    n_rows, n_cols = table_array.shape
    # frame with header row/column (and optional sum row/column)
    A = _build_table_frame(order=n_cols + 1, nbody=n_rows, sum_=sum_)
    # copy data into the frame; cells with nbody > order stay empty
    for nbody in range(1, n_rows + 1):
        for order in range(2, n_cols + 2):
            if nbody <= order:
                A[nbody, order - 1] = table_array[nbody - 1, order - 2]
    if sum_:
        body = A[1:-1, 1:-1]
        for i in range(1, body.shape[0] + 1):
            A[i, -1] = sum(val for val in body[i - 1] if val is not None)
        for j in range(1, body.shape[1] + 1):
            A[-1, j] = sum(val for val in body[:, j - 1] if val is not None)
        A[-1, -1] = ''
    return A
def _generate_table_str(table_array):
""" Generate a string from a numpy array of strings """
table_str = []
n_rows, n_cols = table_array.shape
# find maximum widths for each column
widths = []
for i in range(n_cols):
widths.append(max(len(val) for val in table_array[:, i])+2)
# formatting str for each row
row_format = '|'.join('{:^'+str(width)+'}' for width in widths)
# finalize
for i in range(n_rows):
if i == 1:
table_str.append('-' * (sum(widths)+n_cols-1))
table_str.append(row_format.format(*table_array[i, :]))
table_str = '\n'.join(table_str)
return table_str
def _build_table_frame(order, nbody, sum_=False):
""" Builds/initializes table/array. """
if sum_:
A = np.empty((nbody+2, order+1), dtype='object')
A[0, -1] = 'sum'
A[-1, 0] = 'sum'
else:
A = np.empty((nbody+1, order), dtype='object')
A[0][0] = 'body/order'
A[0, 1:order] = range(2, order+1)
A[1:nbody+1, 0] = range(1, nbody+1)
return A
if __name__ == "__main__":
    # demo: dummy cutoff table; the first row represents nbody=1
    cutoffs = np.array([[None, None, None, None, None],
                        [6.0, 6.0, 6.0, 3.7, 3.7],
                        [5.0, 5.0, 5.0, 3.0, 3.0],
                        [3.7, 3.7, 3.7, 0.0, 0.0]])
    # demo: dummy cluster count table
    cluster_counts = np.array([[1, 3, 5, 5, 2],
                               [12, 22, 39, 42, 58],
                               [19, 41, 123, 421, 912],
                               [42, 112, 410, 617, 3271]])
    print_table(cutoffs)
    print('\n')
    print_table(cluster_counts)
    print('\n')
    print_table(cluster_counts, sum_=True)
"""
Helper functions for reading and writing objects to tar files
"""
import pickle
import tempfile
import warnings
import ase.io as aseIO
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=FutureWarning)
import h5py
def add_ase_atoms_to_tarfile(tar_file, atoms, arcname, format='json'):
    """ Adds an ase.Atoms object to tar_file """
    temp_file = tempfile.NamedTemporaryFile()
    # serialize via ase, then register the temp file with the archive
    aseIO.write(temp_file.name, atoms, format=format)
    temp_file.seek(0)
    info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
    tar_file.addfile(info, temp_file)
def read_ase_atoms_from_tarfile(tar_file, arcname, format='json'):
    """ Reads an ase.Atoms object from tar_file """
    # copy the archive member to a named temp file so ase can read it
    temp_file = tempfile.NamedTemporaryFile()
    temp_file.write(tar_file.extractfile(arcname).read())
    temp_file.seek(0)
    return aseIO.read(temp_file.name, format=format)
def add_items_to_tarfile_hdf5(tar_file, items, arcname):
    """ Adds items to a single hdf5 file inside tar_file """
    temp_file = tempfile.NamedTemporaryFile()
    # write all items to one hdf5 file, then add it to the archive
    hf = h5py.File(temp_file.name, 'w')
    for key, value in items.items():
        hf.create_dataset(key, data=value, compression='gzip')
    hf.close()
    temp_file.seek(0)
    info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
    tar_file.addfile(info, temp_file)
    temp_file.close()
def add_items_to_tarfile_pickle(tar_file, items, arcname):
    """ Adds items to tar_file by pickling them """
    with tempfile.TemporaryFile() as temp_file:
        pickle.dump(items, temp_file)
        temp_file.seek(0)
        info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
        tar_file.addfile(info, temp_file)
def add_items_to_tarfile_custom(tar_file, items):
    """ Adds items that provide their own write function to tar_file """
    for arcname, obj in items.items():
        with tempfile.TemporaryFile() as temp_file:
            obj.write(temp_file)
            temp_file.seek(0)
            info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
            tar_file.addfile(info, temp_file)
def add_list_to_tarfile_custom(tar_file, objects, arcname):
    """ Adds a list of objects with custom write functions to tar_file;
    entries are stored as '<arcname>_<index>'. """
    for index, obj in enumerate(objects):
        with tempfile.TemporaryFile() as temp_file:
            obj.write(temp_file)
            temp_file.seek(0)
            entry_name = '{}_{}'.format(arcname, index)
            info = tar_file.gettarinfo(arcname=entry_name, fileobj=temp_file)
            tar_file.addfile(info, temp_file)
def read_items_hdf5(tar_file, arcname):
    """ Reads items from an hdf5 file inside tar_file """
    # copy the archive member to a named temp file so h5py can open it
    temp_file = tempfile.NamedTemporaryFile()
    temp_file.write(tar_file.extractfile(arcname).read())
    temp_file.seek(0)
    items = dict()
    with h5py.File(temp_file.name, 'r') as hf:
        for key in hf.keys():
            items[key] = hf[key][:]
    return items
def read_items_pickle(tar_file, arcname):
    """ Reads and unpickles the items stored under arcname in tar_file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive
    arcname : str
        name of the archive member to unpickle

    Returns
    -------
    the unpickled content of the archive member
    """
    # NOTE: pickle.load must only be used on trusted archives
    return pickle.load(tar_file.extractfile(arcname))
def read_list_custom(tar_file, arcname, read_function, **kwargs):
    """ Reads a list of objects stored as '<arcname>_<index>' using the
    supplied read_function; stops at the first missing index. """
    objects = []
    index = 0
    while True:
        entry_name = '{}_{}'.format(arcname, index)
        try:
            fobj = tar_file.extractfile(entry_name)
            objects.append(read_function(fobj, **kwargs))
            fobj.close()
        except KeyError:
            # no more entries with this arcname
            break
        index += 1
    return objects
"""
The ``io.shengBTE`` module provides functions for reading and writing data
files in shengBTE format.
"""
import numpy as np
from itertools import product
from collections import namedtuple
from ..core.structures import Supercell
import hiphive
def read_shengBTE_fc3(filename, prim, supercell, symprec=1e-5):
    """Reads a third-order force constants file in shengBTE format.

    Parameters
    ----------
    filename : str
        input file name
    prim : ase.Atoms
        primitive cell for the force constants
    supercell : ase.Atoms
        supercell onto which to map force constants
    symprec : float
        structural symmetry tolerance

    Returns
    -------
    fcs : ForceConstants
        third order force constants for the specified supercell
    """
    # parse file -> raw entries -> namedtuple entries -> ForceConstants
    raw_entries = _read_raw_sheng(filename)
    entries = _raw_to_fancy(raw_entries, prim.cell)
    return _sheng_to_fcs(entries, prim, supercell, symprec)
def write_shengBTE_fc3(filename, fcs, prim, symprec=1e-5, cutoff=np.inf,
                       fc_tol=1e-8):
    """Writes a third-order force constants file in shengBTE format.

    Parameters
    -----------
    filename : str
        output file name
    fcs : ForceConstants
        force constants; the supercell associated with this object must be
        based on prim
    prim : ase.Atoms
        primitive configuration (must be equivalent to the structure used in
        the shengBTE calculation)
    symprec : float
        structural symmetry tolerance
    cutoff : float
        all atoms in a cluster must be within this cutoff
    fc_tol : float
        force constants whose largest absolute entry is below this value
        are not written
    """
    # convert to namedtuple entries, then to the raw on-disk layout
    entries = _fcs_to_sheng(fcs, prim, symprec, cutoff, fc_tol)
    _write_raw_sheng(_fancy_to_raw(entries), filename)
def _read_raw_sheng(filename):
""" Read shengBTE fc3 file.
Parameters
----------
filename : str
input file name
Returns
-------
list
list with shengBTE block entries, where an entry consist of
[i, j, k, cell_pos2, cell_pos3, fc3]
"""
lines_per_fc_block = 32
fc3_data_shengBTE = []
with open(filename, 'r') as f:
lines = f.readlines()
num_fcs = int(lines[0])
lind = 2
for i in range(1, num_fcs+1):
# sanity check
assert int(lines[lind]) == i, (int(lines[lind]), i)
# read cell poses
cell_pos2 = np.array([float(fld) for fld in lines[lind+1].split()])
cell_pos3 = np.array([float(fld) for fld in lines[lind+2].split()])
# read basis indices
i, j, k = (int(fld) for fld in lines[lind+3].split())
# read fc
fc3_ijk = np.zeros((3, 3, 3))
for n in range(27):
x, y, z = [int(fld) - 1 for fld in lines[lind+4+n].split()[:3]]
fc3_ijk[x, y, z] = float(lines[lind+4+n].split()[-1])
# append entry
entry = [i, j, k, cell_pos2, cell_pos3, fc3_ijk]
fc3_data_shengBTE.append(entry)
lind += lines_per_fc_block
return fc3_data_shengBTE
# One third-order force constant in shengBTE terms: 0-based basis-site
# indices, Cartesian cell positions, the 3x3x3 fc tensor, and the
# corresponding integer cell offsets (see _raw_to_fancy/_fancy_to_raw).
_ShengEntry = namedtuple('Entry', ['site_0', 'site_1', 'site_2', 'pos_1',
                                   'pos_2', 'fc', 'offset_1', 'offset_2'])
def _raw_to_fancy(raw_sheng, cell):
    """
    Converts force constants as read from a shengBTE file into the
    _ShengEntry namedtuple format defined above.
    """
    entries = []
    for raw in raw_sheng:
        pos_1, pos_2 = raw[3:5]
        # express the Cartesian cell positions as integer cell offsets
        offset_1 = np.linalg.solve(cell.T, pos_1).round(0).astype(int)
        offset_2 = np.linalg.solve(cell.T, pos_2).round(0).astype(int)
        # shengBTE site indices are 1-based; shift them to 0-based
        sites = (index - 1 for index in raw[:3])
        entries.append(_ShengEntry(*sites, *raw[3:], offset_1, offset_2))
    return entries
def _fancy_to_raw(sheng):
"""
Converts force constants namedtuple format defined above (_ShengEntry) to
format used for writing shengBTE files.
"""
raw_sheng = []
for entry in sheng:
raw_entry = list(entry[:6])
raw_entry[0] += 1
raw_entry[1] += 1
raw_entry[2] += 1
raw_sheng.append(raw_entry)
return raw_sheng
def _write_raw_sheng(raw_sheng, filename):
""" See corresponding read function. """
with open(filename, 'w') as f:
f.write('{}\n\n'.format(len(raw_sheng)))
for index, fc3_row in enumerate(raw_sheng, start=1):
i, j, k, cell_pos2, cell_pos3, fc3_ijk = fc3_row
f.write('{:5d}\n'.format(index))
f.write((3*'{:14.10f} '+'\n').format(*cell_pos2))
f.write((3*'{:14.10f} '+'\n').format(*cell_pos3))
f.write((3*'{:5d}'+'\n').format(i, j, k))
for x, y, z in product(range(3), repeat=3):
f.write((3*' {:}').format(x+1, y+1, z+1))
f.write(' {:14.10f}\n'.format(fc3_ijk[x, y, z]))
f.write('\n')
def _fcs_to_sheng(fcs, prim, symprec, cutoff, fc_tol):
    """ Produces a list of _ShengEntry objects from the given force
    constants.

    prim and fcs.supercell must be aligned.

    Parameters
    ----------
    fcs : ForceConstants
        third-order force constants defined for fcs.supercell
    prim : ase.Atoms
        primitive configuration corresponding to the supercell
    symprec : float
        structural symmetry tolerance
    cutoff : float
        all pair distances within a cluster must be below this cutoff
    fc_tol : float
        entries whose largest absolute fc component is below this value
        are dropped from the output
    """
    supercell = Supercell(fcs.supercell, prim, symprec)
    assert all(fcs.supercell.pbc) and all(prim.pbc)
    n_atoms = len(supercell)
    # pair vectors without and with the minimum-image convention (MIC)
    D = fcs.supercell.get_all_distances(mic=False, vector=True)
    D_mic = fcs.supercell.get_all_distances(mic=True, vector=True)
    M = np.eye(n_atoms, dtype=bool)
    for i in range(n_atoms):
        for j in range(i + 1, n_atoms):
            # a pair is admissible if its direct vector equals the MIC
            # vector (no ambiguity from periodic images) and its length
            # is within the cutoff
            M[i, j] = (np.allclose(D[i, j], D_mic[i, j], atol=symprec, rtol=0)
                       and np.linalg.norm(D[i, j]) < cutoff)
            M[j, i] = M[i, j]
    data = {}
    for a0 in supercell:
        for a1 in supercell:
            if not M[a0.index, a1.index]:
                continue
            for a2 in supercell:
                # all three pairs in the triplet must be admissible
                if not (M[a0.index, a2.index] and M[a1.index, a2.index]):
                    continue
                # key: basis sites plus cell offsets relative to a0
                offset_1 = np.subtract(a1.offset, a0.offset)
                offset_2 = np.subtract(a2.offset, a0.offset)
                sites = (a0.site, a1.site, a2.site)
                key = sites + tuple(offset_1) + tuple(offset_2)
                ijk = (a0.index, a1.index, a2.index)
                fc = fcs[ijk]
                if key in data:
                    # equivalent clusters must carry the same fc
                    assert np.allclose(data[key], fc, atol=fc_tol)
                else:
                    data[key] = fc
    sheng = []
    for k, fc in data.items():
        # skip negligible force constants
        if np.max(np.abs(fc)) < fc_tol:
            continue
        offset_1 = k[3:6]
        pos_1 = np.dot(offset_1, prim.cell)
        offset_2 = k[6:9]
        pos_2 = np.dot(offset_2, prim.cell)
        entry = _ShengEntry(*k[:3], pos_1, pos_2, fc, offset_1, offset_2)
        sheng.append(entry)
    return sheng
def _sheng_to_fcs(sheng, prim, supercell, symprec):
    """ Maps a list of _ShengEntry objects onto a supercell and returns the
    result as a ForceConstants object.

    Parameters
    ----------
    sheng : list of _ShengEntry
        third-order force constant entries relative to the primitive cell
    prim : ase.Atoms
        primitive cell
    supercell : ase.Atoms
        supercell onto which the force constants are mapped
    symprec : float
        structural symmetry tolerance

    Raises
    ------
    Exception
        if two entries map onto the same cluster, i.e. the supercell is
        too small to hold the force constants unambiguously
    """
    supercell_map = Supercell(supercell, prim, symprec)
    fc_array = np.zeros((len(supercell),) * 3 + (3,) * 3)
    # tracks clusters that have already received a force constant
    mapped_clusters = np.zeros((len(supercell),) * 3, dtype=bool)
    for atom in supercell_map:
        i = atom.index
        for entry in sheng:
            if atom.site != entry.site_0:
                continue
            # translate the entry's cell offsets into supercell indices
            j = supercell_map.index(entry.site_1, entry.offset_1 + atom.offset)
            k = supercell_map.index(entry.site_2, entry.offset_2 + atom.offset)
            ijk = (i, j, k)
            if mapped_clusters[ijk]:
                raise Exception('Ambiguous force constant.'
                                ' Supercell is too small')
            fc_array[ijk] = entry.fc
            mapped_clusters[ijk] = True
    fcs = hiphive.force_constants.ForceConstants.from_arrays(
        supercell, fc3_array=fc_array)
    return fcs
+5
-4
Metadata-Version: 1.2
Name: hiphive
Version: 0.6
Version: 0.7
Summary: High-order force constants for the masses

@@ -16,3 +16,4 @@ Home-page: http://hiphive.materialsmodeling.org/

extensive tutorial can be found in the
`user guide <https://hiphive.materialsmodeling.org/>`_.
`user guide <https://hiphive.materialsmodeling.org/>`_. Complete examples of
using hiphive for force constants extraction can be found at `hiphive examples <https://gitlab.com/materials-modeling/hiphive-examples/>`_.

@@ -83,4 +84,4 @@ **hiPhive** is written in Python, which allows

Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>` page
of the documentation for additional references.
Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>`_
page of the documentation for additional references.

@@ -87,0 +88,0 @@ **hiphive** and its development are hosted on

@@ -5,4 +5,5 @@ ase

numpy>=1.12
sklearn
scikit-learn
scipy
spglib
sympy
sympy>=1.1

@@ -39,12 +39,14 @@ README.rst

hiphive/fitting/fit_methods.py
hiphive/fitting/io.py
hiphive/fitting/model_selection.py
hiphive/fitting/oi.py
hiphive/fitting/optimizer.py
hiphive/fitting/split_bregman.py
hiphive/fitting/tools.py
hiphive/io/__init__.py
hiphive/io/logging.py
hiphive/io/phonopy.py
hiphive/io/pretty_table_prints.py
hiphive/io/read_write_files.py
hiphive/io/shengBTE.py
hiphive/input_output/__init__.py
hiphive/input_output/gpumd.py
hiphive/input_output/logging_tools.py
hiphive/input_output/phonopy.py
hiphive/input_output/pretty_table_prints.py
hiphive/input_output/read_write_files.py
hiphive/input_output/shengBTE.py
hiphive/md_tools/__init__.py

@@ -51,0 +53,0 @@ hiphive/md_tools/spectral_energy_density.py

@@ -33,3 +33,3 @@ """

'Paul Erhart']
__copyright__ = '2019'
__copyright__ = '2020'
__license__ = 'MIT'

@@ -39,3 +39,3 @@ __credits__ = ['Fredrik Eriksson',

'Paul Erhart']
__version__ = '0.6'
__version__ = '0.7'
__all__ = ['ClusterSpace',

@@ -46,4 +46,3 @@ 'StructureContainer',

'config',
'enforce_rotational_sum_rules',
'io']
'enforce_rotational_sum_rules']
__maintainer__ = 'The hiPhive developers team'

@@ -50,0 +49,0 @@ __maintainer_email__ = 'hiphive@materialsmodeling.org'

@@ -5,9 +5,11 @@ """Contains a calculator which given an arbitrary list of clusters and

"""
import numpy as np
import math
from collections import defaultdict as dd
from typing import List, Tuple
import numpy as np
from ase import Atoms
from ase.calculators.calculator import Calculator, all_changes
from hiphive.force_constants import SortedForceConstants
from ase.calculators.calculator import Calculator, all_changes
from ase.geometry import find_mic
from hiphive.utilities import get_displacements
from .numba_calc import _get_forces

@@ -25,5 +27,5 @@

-----------
fcs: ForceConstants
fcs : ForceConstants
the force constants instance must include the atomic configuration
max_disp: float
max_disp : float
maximum allowed displacement before calculator raises ValueError

@@ -34,3 +36,3 @@ """

def __init__(self, fcs, max_disp=3.0):
def __init__(self, fcs: SortedForceConstants, max_disp: float = 3.0):
Calculator.__init__(self)

@@ -79,3 +81,5 @@

def calculate(self, atoms=None, properties=['energy'], system_changes=all_changes):
def calculate(self, atoms: Atoms = None,
properties: List[str] = ['energy'],
system_changes: List[str] = all_changes) -> None:
Calculator.calculate(self, atoms, properties, system_changes)

@@ -90,19 +94,15 @@ self._check_atoms()

def _check_atoms(self):
"""Check that the atomic configuration, with which the calculator is
def _check_atoms(self) -> None:
"""Checks that the atomic configuration, with which the calculator is
associated, is compatible with the ideal configuration provided during
initialization."""
if len(self.atoms) != len(self.atoms_ideal):
raise ValueError('Length of atoms does not match reference atoms')
raise ValueError('Length of atoms does not match reference structure')
if not all(self.atoms.numbers == self.atoms_ideal.numbers):
raise ValueError('Atomic numbers does not match reference atoms')
raise ValueError('Atomic numbers do not match reference structure')
def _compute_displacements(self):
"""Evaluate the atomic displacements between the current and the ideal
def _compute_displacements(self) -> None:
"""Evaluates the atomic displacements between the current and the ideal
(reference) configuration."""
displacements = []
for pos, ideal_pos in zip(self.atoms.positions, self.atoms_ideal.positions):
v_ij = np.array([pos - ideal_pos])
displacements.append(find_mic(v_ij, self.atoms.cell, pbc=True)[0][0])
self.displacements = np.array(displacements)
self.displacements = get_displacements(self.atoms, self.atoms_ideal)

@@ -115,4 +115,4 @@ # sanity check that displacements are not too large

def compute_energy_and_forces(self):
"""Compute energy and forces.
def compute_energy_and_forces(self) -> Tuple[float, list]:
"""Computes energy and forces.

@@ -140,5 +140,5 @@ Returns

def __repr__(self):
def __repr__(self) -> str:
fc_dict_str = '{{{}: {}, ...}}'.format(self.clusters[0], self.force_constants[0])
fcs_str = 'ForceConstants(fc_dict={}, atoms={!r})'.format(fc_dict_str, self.atoms_ideal)
return 'ForceConstantCalculator({})'.format(fcs_str)

@@ -15,9 +15,9 @@ """

from .core.orbits import Orbit
from .io.logging import logger
from .io.pretty_table_prints import print_table
from .io.read_write_files import (add_items_to_tarfile_pickle,
add_items_to_tarfile_custom,
add_list_to_tarfile_custom,
read_items_pickle,
read_list_custom)
from .input_output.logging_tools import logger
from .input_output.pretty_table_prints import print_table
from .input_output.read_write_files import (add_items_to_tarfile_pickle,
add_items_to_tarfile_custom,
add_list_to_tarfile_custom,
read_items_pickle,
read_list_custom)
from .cutoffs import Cutoffs, CutoffMaximumBody, BaseClusterFilter

@@ -24,0 +24,0 @@

@@ -1,2 +0,2 @@

from hiphive.io.logging import logger
from hiphive.input_output.logging_tools import logger

@@ -3,0 +3,0 @@

@@ -10,3 +10,3 @@ """

import numpy as np
from ..io.logging import logger
from ..input_output.logging_tools import logger

@@ -116,6 +116,3 @@ # TODO: Rename logger

try:
pickle.dump(data, f)
except Exception:
raise Exception('Failed writing to file.')
pickle.dump(data, f)

@@ -135,6 +132,3 @@ @staticmethod

"""
try:
data = pickle.load(f)
except Exception:
raise Exception('Failed loading from file.')
data = pickle.load(f)
atoms = aseAtoms(numbers=data['numbers'],

@@ -141,0 +135,0 @@ scaled_positions=data['basis'],

@@ -10,3 +10,3 @@ import itertools

from .orbits import get_orbits
from ..io.logging import logger
from ..input_output.logging_tools import logger
from .eigentensors import create_eigentensors as _create_eigentensors

@@ -117,3 +117,2 @@ from .tensors import rotate_tensor_precalc, rotation_tensor_as_matrix

orders = cs.cutoffs.orders
permutations = BiMap()

@@ -123,3 +122,2 @@ for order in orders:

permutations.append(permutation)
cs._permutations = permutations

@@ -130,4 +128,2 @@

# object
# TODO: Add kw prim to CS init. To use the supplied primitive cell instead of
# having spglib return a funky rotated cell
# TODO: Does the basis check need to be done? There might not be a problem that

@@ -154,4 +150,3 @@ # it is close to 1 instead of 0 anymore. If thats the case it is better to keep

# TODO: is symprec the best tolerance to use for volume check?
cell_volume_match = np.isclose(spg_cell_volume, prototype_cell_volume,
atol=cs.symprec, rtol=0)
cell_volume_match = np.isclose(spg_cell_volume, prototype_cell_volume, atol=cs.symprec, rtol=0)

@@ -163,13 +158,8 @@ if numbers_match and cell_volume_match:

basis = spgPrim[1]
if np.any(spgPrim[1] > (1 - cs.symprec)):
logger.debug('Found basis close to 1:\n' +
' {}'.format(str(spgPrim[1])))
basis = spgPrim[1].round(8) % 1 # TODO
logger.debug('Wrapping to:\n' +
' {}'.format(str(basis)))
if np.any(basis > (1 - cs.symprec)):
logger.debug('Found basis close to 1:\n {}'.format(str(basis)))
basis = basis.round(8) % 1 # TODO
logger.debug('Wrapping to:\n {}'.format(str(basis)))
prim = Atoms(cell=spgPrim[0], scaled_positions=basis, numbers=spgPrim[2], pbc=True)
prim = Atoms(cell=spgPrim[0],
scaled_positions=basis,
numbers=spgPrim[2], pbc=True)
# log primitive cell information

@@ -183,8 +173,6 @@ logger.info('Primitive cell:')

for symbol, spos in zip(prim.get_chemical_symbols(), prim.basis):
logger.info(' {:2} [{:9.5f} {:9.5f} {:9.5f}]'.format(
symbol, *spos))
logger.info(' {:2} [{:9.5f} {:9.5f} {:9.5f}]'.format(symbol, *spos))
else:
for sym, spos in zip(prim[:3].get_chemical_symbols(), prim[:3].basis):
logger.info(' {:2} [{:9.5f} {:9.5f} {:9.5f}]'.format(
sym, *spos))
logger.info(' {:2} [{:9.5f} {:9.5f} {:9.5f}]'.format(sym, *spos))
logger.info(' ...')

@@ -202,5 +190,3 @@ logger.info('')

prim = cs._prim
symmetry_dataset = spg.get_symmetry_dataset(prim, symprec=cs.symprec)
cs._symmetry_dataset = symmetry_dataset

@@ -210,6 +196,4 @@

logger.info(' Spacegroup: {}'.format(cs.spacegroup))
logger.info(' Unique atoms: {}'.format(
len(set(cs.wyckoff_sites))))
logger.info(' Symmetry operations: {}'.format(
len(cs.rotation_matrices)))
logger.info(' Unique site: {}'.format(len(set(cs.wyckoff_sites))))
logger.info(' Symmetry operations: {}'.format(len(cs.rotation_matrices)))
logger.info(' symprec: {:.2e}'.format(cs.symprec))

@@ -227,3 +211,2 @@ logger.info('')

tol = cs.symprec
atom_list = BiMap()

@@ -237,7 +220,7 @@

logger.info(' Maximum cutoff: {}'.format(cs.cutoffs.max_cutoff))
# Find all the atoms which is neighbors to the atoms in the center cell
# The pair cutoff should be larger or equal than the others
cutoffs = [(cs.cutoffs.max_cutoff - tol) / 2] * len(cs._prim)
nl = NeighborList(cutoffs=cutoffs, skin=0, self_interaction=True,
bothways=True)
nl = NeighborList(cutoffs=cutoffs, skin=0, self_interaction=True, bothways=True)
nl.update(cs._prim)

@@ -251,5 +234,4 @@ for i in range(len(cs._prim)):

nl = NeighborList(
cutoffs=[(cs.cutoffs.max_cutoff + tol) / 2] *
len(cs._prim), skin=0, self_interaction=True,
bothways=True)
cutoffs=[(cs.cutoffs.max_cutoff + tol) / 2] * len(cs._prim),
skin=0, self_interaction=True, bothways=True)
nl.update(cs._prim)

@@ -268,4 +250,3 @@ distance_from_cutoff = tol

if distance_from_cutoff != tol:
msg = 'Maximum cutoff close to neighbor shell, change cutoff'
raise Exception(msg)
raise Exception('Maximum cutoff close to neighbor shell, change cutoff')

@@ -288,10 +269,7 @@ msg = ' Found {} center atom{} with {} images totaling {} atoms'.format(

numbers = [cs._prim.numbers[a.site] for a in cs._atom_list]
# Make an atoms object out of the scaled positions
atoms = Atoms(cell=cs._prim.cell,
scaled_positions=spos,
numbers=numbers,
pbc=False)
atoms = Atoms(cell=cs._prim.cell, scaled_positions=spos, numbers=numbers, pbc=False)
cs._cluster_filter.setup(atoms)
cs._cluster_list = get_clusters(atoms, cs.cutoffs, len(cs._prim))

@@ -302,4 +280,3 @@

logger.info(' Clusters: {}'.format(dict(counter)))
logger.info(' Total number of clusters: {}\n'
.format(sum(counter.values())))
logger.info(' Total number of clusters: {}\n'.format(sum(counter.values())))

@@ -323,3 +300,2 @@

cs._dropped_orbits = []
for i in range(len(cs.orbits)):

@@ -335,4 +311,3 @@ if i in orbits_to_drop:

logger.info(' Orbits: {}'.format(dict(counter)))
logger.info(' Total number of orbits: {}\n'
.format(sum(counter.values())))
logger.info(' Total number of orbits: {}\n'.format(sum(counter.values())))

@@ -358,12 +333,9 @@

for order in cs.cutoffs.orders:
n_ets[order] = sum(len(orb.eigentensors) for orb in cs.orbits
if orb.order == order)
n_ets[order] = sum(len(orb.eigentensors) for orb in cs.orbits if orb.order == order)
logger.info(' Eigentensors: {}'.format(n_ets))
logger.info(' Total number of parameters: {}'
.format(sum(n_ets.values())))
logger.info(' Total number of parameters: {}'.format(sum(n_ets.values())))
if len(cs._dropped_orbits) > 0:
logger.info(' Discarded orbits:')
for orb in cs._dropped_orbits:
logger.info(' {}'
.format(cs.cluster_list[orb.prototype_index]))
logger.info(' {}'.format(cs.cluster_list[orb.prototype_index]))
logger.info('')

@@ -370,0 +342,0 @@

@@ -8,3 +8,3 @@ # Contains the get_clusters function which generates clusters

from .utilities import BiMap
from ..io.logging import logger
from ..input_output.logging_tools import logger

@@ -11,0 +11,0 @@ logger = logger.getChild('get_clusters')

@@ -5,3 +5,3 @@ import numpy as np

from .tensors import rotate_tensor
from ..io.logging import logger
from ..input_output.logging_tools import logger
from .utilities import SparseMatrix

@@ -8,0 +8,0 @@

@@ -10,9 +10,9 @@ """

from .atoms import Atom
from ..io.logging import logger
from ..io.read_write_files import (add_items_to_tarfile_hdf5,
add_items_to_tarfile_pickle,
add_list_to_tarfile_custom,
read_items_hdf5,
read_items_pickle,
read_list_custom)
from ..input_output.logging_tools import logger
from ..input_output.read_write_files import (add_items_to_tarfile_hdf5,
add_items_to_tarfile_pickle,
add_list_to_tarfile_custom,
read_items_hdf5,
read_items_pickle,
read_list_custom)
logger = logger.getChild('orbits')

@@ -335,6 +335,3 @@

"""
try:
pickle.dump(self, f)
except Exception:
raise Exception('Failed writing to file.')
pickle.dump(self, f)

@@ -354,6 +351,3 @@ @staticmethod

"""
try:
return pickle.load(f)
except Exception:
raise Exception('Failed loading from file.')
return pickle.load(f)

@@ -360,0 +354,0 @@

@@ -10,3 +10,3 @@ """

def enforce_rotational_sum_rules(cs, parameters, sum_rules, **kwargs):
def enforce_rotational_sum_rules(cs, parameters, sum_rules=None, alpha=1e-6):
""" Enforces rotational sum rules by projecting parameters.

@@ -30,6 +30,2 @@

i.e. less correction but higher stability [default: 1e-6]
iterations : int
number of iterations to run the projection since each step projects
the solution down to each nullspace in serial; keyword argument
passed to the optimizer [default: 10]

@@ -55,27 +51,50 @@ Returns

fcp = ForceConstantPotential(cs, new_params)
"""
all_sum_rules = ['Huang', 'Born-Huang']
# setup
parameters = parameters.copy()
# handle optional kwargs input
iterations = kwargs.pop('iterations', 10)
ridge_alpha = kwargs.pop('ridge_alpha', 1e-6)
if len(kwargs) != 0:
raise ValueError('Unrecognised kwargs: {}'.format(kwargs))
if sum_rules is None:
sum_rules = all_sum_rules
# get constraint-matrix
M = get_rotational_constraint_matrix(cs, sum_rules)
# before fit
d = M.dot(parameters)
delta = np.linalg.norm(d)
print('Rotational sum-rules before, ||Ax|| = {:20.15f}'.format(delta))
# fitting
ridge = Ridge(alpha=alpha, fit_intercept=False, solver='sparse_cg')
ridge.fit(M, d)
parameters -= ridge.coef_
# after fit
d = M.dot(parameters)
delta = np.linalg.norm(d)
print('Rotational sum-rules after, ||Ax|| = {:20.15f}'.format(delta))
return parameters
def get_rotational_constraint_matrix(cs, sum_rules=None):
all_sum_rules = ['Huang', 'Born-Huang']
if sum_rules is None:
sum_rules = all_sum_rules
# setup
assert len(sum_rules) > 0
for s in sum_rules:
if s not in all_sum_rules:
raise ValueError('Sum rule {} not allowed, select from {}'.format(s, all_sum_rules))
# make orbit-parameter index map
params = []
n = 0
for orbit_index, orbit in enumerate(cs.orbits):
number_params_in_orbit = len(orbit.eigentensors)
params.append(list(range(n, n + number_params_in_orbit)))
n += number_params_in_orbit
params = _get_orbit_parameter_map(cs)
lookup = _get_fc_lookup(cs)
# create lookuptable for force constants
lookup = {}
for orbit_index, orbit in enumerate(cs.orbits):
for of in orbit.orientation_families:
for cluster_index, perm_index in zip(of.cluster_indices, of.permutation_indices):
cluster = cs._cluster_list[cluster_index]
perm = cs._permutations[perm_index]
lookup[tuple(cluster)] = [et.transpose(perm) for et in of.eigentensors], orbit_index
# append the sum rule matrices

@@ -89,7 +108,5 @@ Ms = []

Ms.append(_create_Born_Huang_constraint(*args))
else:
raise ValueError('At least one unknown sum rule: {}'.format(sum_rule))
# transform and stack matrices
cvs_trans = cs._cvs
for i, M in enumerate(Ms):

@@ -102,14 +119,32 @@ row, col, data = [], [], []

M = coo_matrix((data, (row, col)), shape=M.shape)
Ms[i] = M.dot(cvs_trans)
M = M.dot(cvs_trans)
M = M.toarray()
Ms[i] = M
for _ in range(iterations):
for M in Ms:
clf = Ridge(alpha=ridge_alpha)
d = M.dot(parameters)
clf.fit(M, d)
parameters -= clf.coef_ # subtract the correction
return np.vstack(Ms)
return parameters
def _get_orbit_parameter_map(cs):
# make orbit-parameter index map
params = []
n = 0
for orbit_index, orbit in enumerate(cs.orbits):
n_params_in_orbit = len(orbit.eigentensors)
params.append(list(range(n, n + n_params_in_orbit)))
n += n_params_in_orbit
return params
def _get_fc_lookup(cs):
# create lookuptable for force constants
lookup = {}
for orbit_index, orbit in enumerate(cs.orbits):
for of in orbit.orientation_families:
for cluster_index, perm_index in zip(of.cluster_indices, of.permutation_indices):
cluster = cs._cluster_list[cluster_index]
perm = cs._permutations[perm_index]
lookup[tuple(cluster)] = [et.transpose(perm) for et in of.eigentensors], orbit_index
return lookup
def _create_Huang_constraint(lookup, parameter_map, atom_list, prim):

@@ -116,0 +151,0 @@

@@ -5,3 +5,3 @@ import itertools

from . import atoms as atoms_module
from ..io.logging import logger
from ..input_output.logging_tools import logger

@@ -8,0 +8,0 @@ logger = logger.getChild('relate_structures')

import numpy as np
from ase import Atoms

@@ -113,6 +112,2 @@

def to_atoms(self):
return Atoms(cell=self.cell, positions=self.pos,
numbers=self.numbers, pbc=True)
def __len__(self):

@@ -129,10 +124,2 @@ return len(self._spos)

@property
def pos(self):
return np.dot(self._spos, self._cell)
@property
def numbers(self):
return self._numbers
def __getitem__(self, index):

@@ -187,5 +174,1 @@ if index >= len(self):

return len(self._supercell)
@property
def cell(self):
return self._supercell.cell

@@ -9,3 +9,3 @@ """

from ..io.logging import logger
from ..input_output.logging_tools import logger
from .utilities import SparseMatrix, BiMap

@@ -20,4 +20,2 @@ from .orbits import get_orbits

logger.debug('Starting constructing constraint matrices')
# First we need to know the global index of a eigentensor given which

@@ -40,2 +38,33 @@ # orbit it belongs to.

M = get_translational_constraint_matrix(cs)
nullspace = M.nullspace()
logger.debug('Assembling solutions...')
row = []
col = []
data = []
for c, vec in enumerate(nullspace):
for r, _, v in vec.row_list():
row.append(r)
col.append(c)
data.append(np.float64(v))
# This is the final product, the constraint map (or constraint vectors)
cs._cvs = coo_matrix((data, (row, col)), shape=(len(vec), len(nullspace)))
logger.debug('{} degrees of freedom'.format(cs._cvs.shape[1]))
logger.debug('Finished constructing constraint vectors')
def get_translational_constraint_matrix(cs):
logger.debug('Starting constructing constraint matrices')
# First we need to know the global index of a eigentensor given which
# orbit it belongs to.
params = []
n = 0
for orbit_index, orbit in enumerate(cs.orbits):
nParams_in_orbit = len(orbit.eigentensors)
params.append(list(range(n, n + nParams_in_orbit)))
n += nParams_in_orbit
# The lookup is a fast way to get the eigentensors given a cluster Also the

@@ -107,18 +136,3 @@ # orbit index is bundled to quickly find the correct parameter indices

logger.debug('Finished constructing constraint matrices')
logger.debug('Starting constructing constraint vectors')
nullspace = M.nullspace()
logger.debug('Assembling solutions...')
row = []
col = []
data = []
for c, vec in enumerate(nullspace):
for r, _, v in vec.row_list():
row.append(r)
col.append(c)
data.append(np.float64(v))
# This is the final product, the constraint map (or constraint vectors)
cs._cvs = coo_matrix((data, (row, col)), shape=(len(vec), len(nullspace)))
logger.debug('{} degrees of freedom'.format(cs._cvs.shape[1]))
logger.debug('Finished constructing constraint vectors')
return M
"""
The ``utilities`` module contains various support functions and classes.
"""
import sympy
import numpy as np
from scipy.sparse import coo_matrix
from collections import defaultdict
from ..io.logging import Progress
from sympy.core import S
import sympy
from ..input_output.logging_tools import Progress
__all__ = ['Progress']

@@ -113,3 +117,13 @@

def to_array(self):
""" Cast SparseMatrix instance to numpy array """
row, col, data = [], [], []
for r, c, v in self.row_list():
row.append(r)
col.append(c)
data.append(np.float64(v))
M = coo_matrix((data, (row, col)), shape=self.shape)
return M.toarray()
class BiMap:

@@ -146,3 +160,2 @@ """Simple list like structure with fast dict-lookup

except KeyError:
print(self._dict)
raise ValueError('{} is not in list'.format(value))

@@ -149,0 +162,0 @@

import pickle
import numpy as np
from ase.neighborlist import NeighborList
from hiphive.io.pretty_table_prints import table_array_to_string
from hiphive.input_output.pretty_table_prints import table_array_to_string

@@ -27,8 +27,6 @@

try:
self._cutoff_matrix = np.array(cutoff_matrix, dtype=np.float)
except Exception:
raise Exception('Please specify cutoffs as a matrix of numbers')
self._cutoff_matrix = np.array(cutoff_matrix, dtype=np.float)
if len(self._cutoff_matrix.shape) != 2:
raise TypeError('Please specify cutoff matrix as a 2D array')
raise ValueError('Please specify cutoff matrix as a 2D array')
for i, row in enumerate(self._cutoff_matrix):

@@ -287,33 +285,1 @@ if np.any(row[i:] < 0):

return True
class MaxTripletDistance(BaseClusterFilter):
    """Rejects large three-body clusters.

    A triplet is rejected when two or more of its three pair distances
    exceed the specified maximum distance, i.e. it is accepted only if at
    least two pairs lie within the cutoff.  Clusters of any other order
    are always accepted.

    Parameters
    ----------
    max_dist : float
        maximum distance allowed for the two closest pairs
    """

    def __init__(self, max_dist):
        self._max_dist = max_dist

    def __call__(self, cluster):
        sites = tuple(set(cluster))
        if len(sites) != 3:
            # filter only applies to proper three-body clusters
            return True
        a, b, c = sites
        close_ab = self._atoms.get_distance(a, b) < self._max_dist
        close_ac = self._atoms.get_distance(a, c) < self._max_dist
        close_bc = self._atoms.get_distance(b, c) < self._max_dist
        # accept when at least two of the three pairs are within range
        return ((close_ab and close_ac) or (close_ab and close_bc)
                or (close_ac and close_bc))

@@ -5,3 +5,3 @@ from .optimizer import Optimizer

from .fit_methods import fit, available_fit_methods
from .io import _read_pickle as read_summary
from .oi import _read_pickle as read_summary

@@ -8,0 +8,0 @@ __all__ = ['fit',

@@ -9,3 +9,3 @@ """

from .fit_methods import available_fit_methods
from .io import _write_pickle
from .oi import _write_pickle

@@ -29,4 +29,4 @@

method to be used for training; possible choice are
"least-squares", "lasso", "elasticnet", "bayesian-ridge", "ardr",
"rfe", "split-bregman"
"ardr", "bayesian-ridge", "elasticnet", "lasso", "least-squares",
"omp", "rfe", "ridge", "split-bregman"
standardize : bool

@@ -157,7 +157,8 @@ if True the fit matrix and target values are standardized before fitting,

def __str__(self) -> str:
info = self.summary
width = 54
s = []
s.append(' {} '.format(self.__class__.__name__).center(width, '='))
for key in sorted(self.summary.keys()):
value = self.summary[key]
for key in info.keys():
value = info[key]
if isinstance(value, (str, int, np.integer)):

@@ -164,0 +165,0 @@ s.append('{:30} : {}'.format(key, value))

@@ -7,2 +7,3 @@ """

from sklearn.model_selection import KFold, ShuffleSplit
from sklearn.metrics import r2_score
from typing import Any, Dict, Tuple

@@ -13,2 +14,3 @@ from .base_optimizer import BaseOptimizer

from .tools import ScatterData
from .model_selection import get_model_metrics

@@ -40,3 +42,3 @@

----------
fit_data : tupe(numpy.ndarray, numpy.ndarray)
fit_data : tuple(numpy.ndarray, numpy.ndarray)
the first element of the tuple represents the fit matrix `A`

@@ -49,4 +51,4 @@ (`N, M` array) while the second element represents the vector

method to be used for training; possible choice are
"least-squares", "lasso", "elasticnet", "bayesian-ridge", "ardr",
"rfe", "split-bregman"
"ardr", "bayesian-ridge", "elasticnet", "lasso", "least-squares",
"omp", "rfe", "ridge", "split-bregman"
standardize : bool

@@ -91,4 +93,3 @@ if True the fit matrix and target values are standardized before fitting,

super().__init__(fit_data, fit_method, standardize, check_condition,
seed)
super().__init__(fit_data, fit_method, standardize, check_condition, seed)

@@ -103,3 +104,2 @@ if validation_method not in validation_methods.keys():

self._n_splits = n_splits
self._set_kwargs(kwargs)

@@ -118,3 +118,3 @@

self._rmse_valid_splits = None
self._rmse_train_final = None
self.model_metrics = {}

@@ -126,4 +126,11 @@ def train(self) -> None:

**self._fit_kwargs)
self._rmse_train_final = self.compute_rmse(self._A, self._y)
y_train_predicted = np.dot(self._A, self.parameters)
metrics = get_model_metrics(self._A, self.parameters, self._y, y_train_predicted)
# finalize metrics
self.model_metrics['rmse_train_final'] = metrics['rmse_train']
self.model_metrics['R2_train'] = metrics['R2_train']
self.model_metrics['AIC'] = metrics['AIC']
self.model_metrics['BIC'] = metrics['BIC']
def validate(self) -> None:

@@ -160,2 +167,5 @@ """ Runs validation. """

self.model_metrics['rmse_validation'] = np.sqrt(np.mean(self._rmse_valid_splits**2))
self.model_metrics['R2_validation'] = r2_score(valid_target, valid_predicted)
def _set_kwargs(self, kwargs: dict) -> None:

@@ -186,2 +196,3 @@ """

""" comprehensive information about the optimizer """
info = super().summary

@@ -192,3 +203,2 @@

info['n_splits'] = self.n_splits
info['rmse_train_final'] = self.rmse_train_final
info['rmse_train'] = self.rmse_train

@@ -201,2 +211,5 @@ info['rmse_train_splits'] = self.rmse_train_splits

# add metrics
info = {**info, **self.model_metrics}
# add kwargs used for fitting and splitting

@@ -244,3 +257,5 @@ info = {**info, **self._fit_kwargs, **self._split_kwargs}

"""
return self._rmse_train_final
if 'rmse_train_final' not in self.model_metrics:
return None
return self.model_metrics['rmse_train_final']

@@ -247,0 +262,0 @@ @property

@@ -39,4 +39,4 @@ """

method to be used for training; possible choice are
"least-squares", "lasso", "elasticnet", "bayesian-ridge", "ardr",
"rfe", "split-bregman"
"ardr", "bayesian-ridge", "elasticnet", "lasso", "least-squares",
"omp", "rfe", "ridge", "split-bregman"
standardize : bool

@@ -73,4 +73,3 @@ if True the fit matrix and target values are standardized before fitting,

super().__init__(fit_data, fit_method, standardize, check_condition,
seed)
super().__init__(fit_data, fit_method, standardize, check_condition, seed)

@@ -90,3 +89,3 @@ # set training size

self._test_set_list = None
self._parameter_vectors = None
self._parameters_splits = None
self._parameters_std = None

@@ -113,4 +112,3 @@ self._rmse_train_ensemble = None

self.train_size, replace=self.bootstrap)
test_set = np.setdiff1d(
range(self.n_target_values), train_set)
test_set = np.setdiff1d(range(self.n_target_values), train_set)

@@ -127,11 +125,7 @@ # train

# collect data from each fit
self._parameter_vectors = np.array(
[opt.parameters for opt in optimizers])
self._parameters_splits = np.array([opt.parameters for opt in optimizers])
self._train_set_list = [opt.train_set for opt in optimizers]
self._test_set_list = [opt.test_set for opt in optimizers]
self._rmse_train_ensemble = np.array(
[opt.rmse_train for opt in optimizers])
self._rmse_test_ensemble = np.array(
[opt.rmse_test for opt in optimizers])
self._rmse_train_ensemble = np.array([opt.rmse_train for opt in optimizers])
self._rmse_test_ensemble = np.array([opt.rmse_test for opt in optimizers])

@@ -142,9 +136,6 @@ def _construct_final_model(self) -> None:

"""
self._fit_results['parameters'] = np.mean(
self.parameter_vectors, axis=0)
self._parameters_std = np.std(self.parameter_vectors, axis=0)
self._fit_results['parameters'] = np.mean(self.parameters_splits, axis=0)
self._parameters_std = np.std(self.parameters_splits, axis=0)
def predict(self,
A: np.ndarray,
return_std: bool = False) \
def predict(self, A: np.ndarray, return_std: bool = False) \
-> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:

@@ -172,3 +163,3 @@ """

if return_std:
predictions = np.dot(A, self.parameter_vectors.T)
predictions = np.dot(A, self.parameters_splits.T)
if len(predictions.shape) == 1: # shape is (N, )

@@ -188,6 +179,6 @@ std = np.std(predictions)

"""
if self.parameter_vectors is None:
if self.parameters_splits is None:
return None
error_matrix = np.zeros((self._n_rows, self.ensemble_size))
for i, parameters in enumerate(self.parameter_vectors):
for i, parameters in enumerate(self.parameters_splits):
error_matrix[:, i] = np.dot(self._A, parameters) - self._y

@@ -232,5 +223,5 @@ return error_matrix

@property
def parameter_vectors(self) -> List[np.ndarray]:
""" all parameter vectors in the ensemble """
return self._parameter_vectors
def parameters_splits(self) -> List[np.ndarray]:
""" all parameters vectors in the ensemble """
return self._parameters_splits

@@ -237,0 +228,0 @@ @property

@@ -22,2 +22,4 @@ """

Ridge,
OrthogonalMatchingPursuit,
OrthogonalMatchingPursuitCV,
RidgeCV,

@@ -32,3 +34,4 @@ ElasticNet,

from typing import Any, Dict, List, Union
from ..io.logging import logger
from ..input_output.logging_tools import logger
from .split_bregman import fit_split_bregman

@@ -76,3 +79,3 @@

if check_condition:
if check_condition and X.shape[0] >= X.shape[1]:
cond = np.linalg.cond(X)

@@ -207,2 +210,99 @@ if cond > 1e10:

def _fit_adaptive_lasso(X, y, stop_tol=1e-5, n_lasso_iters=30, alpha=None, **kwargs):
    """
    Returns the solution `a` to the linear problem `Xa=y` obtained by
    using an adaptive LASSO method, i.e. iteratively re-weighted LASSO
    fits where each column of the fit matrix is scaled by weights
    derived from the previous iteration's coefficients.

    Parameters
    ----------
    X
        fit matrix
    y
        target array
    stop_tol
        convergence criterion; iteration stops once the norm of the
        coefficient change between successive fits falls below this value
    n_lasso_iters
        maximum number of LASSO fits to carry out
    alpha
        LASSO regularization strength; if None the optimal value is
        determined via a cross-validated line scan
    kwargs
        additional keyword arguments forwarded to the LASSO fits
    """
    def update_weights(x):
        # re-weighting kernel; the small constant guards against division
        # by zero for coefficients that are exactly zero
        return 1 / (2 * np.sqrt(np.abs(x)) + 1e-12)
    # line scan
    if alpha is None:
        return _fit_adaptive_lasso_line_scan(
            X, y, n_lasso_iters=n_lasso_iters, stop_tol=stop_tol, **kwargs)
    # adaptive-lasso fit
    n_cols = X.shape[1]
    weights = np.ones(n_cols)
    x = np.zeros(n_cols)
    for it in range(n_lasso_iters):
        # rescale columns by the current weights and fit a plain LASSO
        X_w = X / weights.reshape(1, -1)
        lasso = Lasso(alpha=alpha, fit_intercept=False, **kwargs)
        lasso.fit(X_w, y)
        # map the coefficients back to the original (unweighted) problem
        x_new = lasso.coef_ / weights
        dx = np.linalg.norm(x_new - x)
        x = x_new
        weights = update_weights(x)
        print('adaptive-lasso iter {:3d}, |x|: {:11.8f}, |dx|: {:11.8f}, |dx|/|x|: {:11.8f}'
              .format(it, np.linalg.norm(x), dx, dx/np.linalg.norm(x)))
        if dx < stop_tol:
            break
    results = dict(parameters=x, weights=weights)
    return results
def _fit_adaptive_lasso_line_scan(X: np.ndarray,
                                  y: np.ndarray,
                                  cv_splits: int = 5,
                                  alphas: List[float] = None,
                                  **kwargs) -> Dict[str, Any]:
    """Adaptive-lasso fit with a line scan over alpha values.

    Each candidate alpha is scored by shuffle-split cross validation and
    the value with the lowest validation RMSE is used for the final fit.

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    alphas
        list of alpha values to be evaluated. The optimal
        alpha found will be used in the final fit.
    cv_splits
        how many CV splits to carry out when evaluating each lambda value.
    """
    from .cross_validation import CrossValidationEstimator
    # default alpha values to scan
    if alphas is None:
        alphas = np.logspace(-7, -1, 15)
    # score each candidate alpha by cross validation
    scores = []
    for alpha in alphas:
        cve = CrossValidationEstimator((X, y), fit_method='adaptive-lasso',
                                       validation_method='shuffle-split',
                                       alpha=alpha, test_size=0.1, train_size=0.9,
                                       n_splits=cv_splits, **kwargs)
        cve.validate()
        print('adaptive-lasso alpha {}, cv-rmse {:11.8f}'.format(alpha, cve.rmse_validation))
        scores.append([alpha, cve.rmse_validation])
    # pick the alpha with the lowest validation RMSE
    scores = np.array(scores)
    alpha_optimal = scores[scores[:, 1].argmin(), 0]
    # final fit with the optimal alpha
    results = _fit_adaptive_lasso(X, y, alpha=alpha_optimal, **kwargs)
    results['alpha_optimal'] = alpha_optimal
    return results
def _fit_ridge(X, y, alpha=None, fit_intercept=False, **kwargs):

@@ -399,3 +499,2 @@ results = dict()

"""
from .cross_validation import CrossValidationEstimator

@@ -451,14 +550,18 @@

    def _get_tags(self):
        # NOTE(review): relies on sklearn's private ``_DEFAULT_TAGS`` —
        # private API may move or disappear between sklearn versions;
        # confirm on dependency upgrades.
        from sklearn.base import _DEFAULT_TAGS
        return _DEFAULT_TAGS
def fit_rfe(X: np.ndarray,
y: np.ndarray,
n_features: int = None,
step: Union[int, float] = 0.04,
estimator: str = 'least-squares',
final_estimator: str = None,
estimator_kwargs: dict = {},
final_estimator_kwargs: dict = {},
cv_splits: int = 5,
n_jobs: int = -1,
**rfe_kwargs):
def _fit_rfe(X: np.ndarray,
y: np.ndarray,
n_features: int = None,
step: Union[int, float] = 0.04,
estimator: str = 'least-squares',
final_estimator: str = None,
estimator_kwargs: dict = {},
final_estimator_kwargs: dict = {},
cv_splits: int = 5,
n_jobs: int = -1,
**rfe_kwargs):
"""

@@ -526,2 +629,45 @@ Returns the solution `a` to the linear problem `Xa=y` obtained by

def _fit_omp(X: np.ndarray, y: np.ndarray,
             n_nonzero_coefs: int = None, fit_intercept: bool = False,
             cv_splits: int = 5,
             n_jobs: int = -1,
             **kwargs) -> Dict[str, Any]:
    """
    Fitting using Orthogonal Matching Pursuit (OMP), see
    https://scikit-learn.org/stable/modules/linear_model.html#omp

    Parameters
    -----------
    X
        fit matrix
    y
        target array
    n_nonzero_coefs
        number of non-zero parameters to obtain, if None will find the
        optimal number via CV
    cv_splits
        number of cv-splits to carry out if finding optimal n_features
    n_jobs
        number of cores to use during the cross validation.
        -1 means using all processors.
    """
    if n_nonzero_coefs is None:
        # sparsity level not given; determine it by cross validation
        splitter = ShuffleSplit(train_size=0.9, test_size=0.1, n_splits=cv_splits)
        estimator = OrthogonalMatchingPursuitCV(
            fit_intercept=fit_intercept, max_iter=X.shape[1],
            cv=splitter, n_jobs=n_jobs, **kwargs)
    else:
        estimator = OrthogonalMatchingPursuit(
            n_nonzero_coefs=n_nonzero_coefs, fit_intercept=fit_intercept, **kwargs)
    estimator.fit(X, y)
    return dict(parameters=estimator.coef_)
fit_methods = OrderedDict([

@@ -531,2 +677,3 @@ ('least-squares', _fit_least_squares),

('ridge', _fit_ridge),
('omp', _fit_omp),
('bayesian-ridge', _fit_bayesian_ridge),

@@ -536,4 +683,6 @@ ('elasticnet', _fit_elasticnet),

('ardr', _fit_ardr),
('rfe', fit_rfe),
('adaptive-lasso', _fit_adaptive_lasso),
('rfe', _fit_rfe),
])
available_fit_methods = sorted(fit_methods.keys())

@@ -6,2 +6,4 @@ """

from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
from .model_selection import get_model_metrics
from typing import Any, Dict, List, Tuple, Union

@@ -39,4 +41,4 @@ from .base_optimizer import BaseOptimizer

method to be used for training; possible choice are
"least-squares", "lasso", "elasticnet", "bayesian-ridge", "ardr",
"rfe", "split-bregman"
"ardr", "bayesian-ridge", "elasticnet", "lasso", "least-squares",
"omp", "rfe", "ridge", "split-bregman"
standardize : bool

@@ -77,3 +79,3 @@ if True the fit matrix and target values are standardized before fitting,

standardize: bool = True,
train_size: Union[int, float] = 0.75,
train_size: Union[int, float] = 0.9,
test_size: Union[int, float] = None,

@@ -95,8 +97,5 @@ train_set: Union[Tuple[int], List[int]] = None,

# will be populate once running train
self._rmse_train = None
self._rmse_test = None
self._contributions_train = None
self._contributions_test = None
self.train_scatter_data = None
self.test_scatter_data = None
self.model_metrics = {}

@@ -114,5 +113,8 @@ def train(self) -> None:

**self._kwargs)
self._rmse_train = self.compute_rmse(A_train, y_train)
self.train_scatter_data = ScatterData(y_train, self.predict(A_train))
y_train_predicted = self.predict(A_train)
self.train_scatter_data = ScatterData(y_train, y_train_predicted)
# get training metrics
metrics = get_model_metrics(A_train, self.parameters, y_train, y_train_predicted)
# perform testing

@@ -122,8 +124,19 @@ if self.test_set is not None:

y_test = self._y[self.test_set]
self._rmse_test = self.compute_rmse(A_test, y_test)
self.test_scatter_data = ScatterData(y_test, self.predict(A_test))
y_test_predicted = self.predict(A_test)
rmse_test = self.compute_rmse(A_test, y_test)
r2_test = r2_score(y_test, y_test_predicted)
self.test_scatter_data = ScatterData(y_test, y_test_predicted)
else:
self._rmse_test = None
rmse_test = None
r2_test = None
self.test_scatter_data = None
# finalize metrics
self.model_metrics['rmse_train'] = metrics['rmse_train']
self.model_metrics['rmse_test'] = rmse_test
self.model_metrics['R2_train'] = metrics['R2_train']
self.model_metrics['R2_test'] = r2_test
self.model_metrics['AIC'] = metrics['AIC']
self.model_metrics['BIC'] = metrics['BIC']
def _setup_rows(self,

@@ -207,5 +220,6 @@ train_size: Union[int, float],

# add metrics
info = {**info, **self.model_metrics}
# Add class specific data
info['rmse_train'] = self.rmse_train
info['rmse_test'] = self.rmse_test
info['train_size'] = self.train_size

@@ -237,3 +251,5 @@ info['train_set'] = self.train_set

""" root mean squared error for training set """
return self._rmse_train
if 'rmse_train' not in self.model_metrics:
return None
return self.model_metrics['rmse_train']

@@ -243,17 +259,7 @@ @property

""" root mean squared error for test set """
return self._rmse_test
if 'rmse_test' not in self.model_metrics:
return None
return self.model_metrics['rmse_test']
@property
def contributions_train(self) -> np.ndarray:
    """ average contribution from each parameter to the predicted
    values for the training set """
    return self._contributions_train
@property
def contributions_test(self) -> np.ndarray:
    """ average contribution from each parameter to the predicted
    values for the test set """
    return self._contributions_test
@property
def train_set(self) -> List[int]:

@@ -260,0 +266,0 @@ """ indices of rows included in the training set """

@@ -5,16 +5,139 @@ """

doi:10.1137/080725891
This implementation was based on the CSLD code written by Fei Zhou (周非),
Weston Nielson, Yi Xia, and Vidvuds Ozoliņš and makes use of a right
preconditioner for increased efficiency.
The preconditioning algorithm is described in Compressive sensing lattice
dynamics. I. General formalism Fei Zhou (周非), Weston Nielson, Yi Xia,
and Vidvuds Ozoliņš Phys. Rev. B 100, 184308
"""
import numpy as np
from scipy.optimize import minimize
from typing import Any, Dict
import scipy.linalg
from typing import Dict, Optional, Any, List, Union
from hiphive.input_output.logging_tools import logger
logger = logger.getChild(__name__)
def fit_split_bregman(A: np.ndarray,
                      y: np.ndarray,
                      mu: Optional[Union[float, List[float]]] = None,
                      lmbda: float = 3.,
                      n_iters: int = 1000,
                      tol: float = 1e-4,
                      cg_tol: float = 1e-1,
                      cg_n_iters: Optional[int] = None,
                      iprint: Optional[int] = None,
                      cv_splits: int = 5,
                      ) -> Dict[str, Any]:
    r"""
    Determines the solution :math:`\boldsymbol{x}` to the linear
    problem :math:`\boldsymbol{A}\boldsymbol{x}=\boldsymbol{y}` using
    the split-Bregman algorithm described in T. Goldstein and S. Osher,
    SIAM J. Imaging Sci. 2, 323 (2009); doi:10.1137/080725891.
    The obtained parameters are returned in the form of a
    dictionary with a key named `parameters`.

    In compressive sensing, the solution vector :math:`\boldsymbol{x}` is
    given by

    .. math::

       \boldsymbol{x} =
       \arg\min_{\boldsymbol{x}} \left\Vert\boldsymbol{x}\right\Vert_1
       + \mu \left\Vert\boldsymbol{A}\boldsymbol{x}
       - \boldsymbol{y}\right\Vert^2_2,

    where :math:`\mu` and :math:`\lambda` are hyperparameters that control
    the sparseness of the solution and the efficiency of the algorithm.

    This implementation was based on the CSLD code written by Fei Zhou (周非),
    Weston Nielson, Yi Xia, and Vidvuds Ozoliņš and makes use of a right
    preconditioner for increased efficiency.
    The preconditioning algorithm is described in Compressive sensing
    lattice dynamics. I. General formalism, Fei Zhou (周非), Weston Nielson,
    Yi Xia, and Vidvuds Ozoliņš, Phys. Rev. B 100, 184308.

    Parameters
    ----------
    A
        fit matrix
    y
        target array
    mu
        sparseness parameter; can be given as a single value, a list of
        values, or None. If a list of values is given, cross validation
        will be used to determine the optimal value. If None is given, a
        default set of values will be tested using cross validation.
    lmbda
        weight of additional L2-norm in split-Bregman
    n_iters
        maximal number of split-Bregman iterations
    tol
        convergence criterion of the iterative minimization
    cg_tol
        relative tolerance for converging the conjugate gradient step
    cg_n_iters
        maximum number of conjugate gradient iterations; if ``None``, a
        reasonable guess will be made based on the number of free parameters
    iprint
        controls the frequency of logging output; ``iprint=None`` means no
        output, ``iprint=n`` means print the status of the minimization
        every n steps
    cv_splits
        if mu is None or multiple mu values are provided, this controls how
        many CV splits to carry out when evaluating each mu value
    """
    if mu is None:
        # default grid of sparseness parameters scanned via cross validation
        mu = np.logspace(2, 6, 5)
    # np.float/np.int were removed in NumPy 1.24; np.floating/np.integer
    # accept any numpy scalar type without the deprecated aliases
    if isinstance(mu, (float, int, np.floating, np.integer)):
        results = _split_bregman(A, y, mu, lmbda, n_iters, tol, cg_tol, cg_n_iters, iprint)
    else:
        # multiple mu values given, use CV to select the best one
        from .cross_validation import CrossValidationEstimator
        cv_data = []
        for mu_value in mu:
            cve = CrossValidationEstimator(
                (A, y), fit_method='split-bregman',
                n_splits=cv_splits, mu=mu_value, lmbda=lmbda, n_iters=n_iters,
                tol=tol, cg_tol=cg_tol, cg_n_iters=cg_n_iters, iprint=iprint)
            cve.validate()
            rmse = cve.rmse_validation
            cv_data.append([mu_value, rmse])
            if iprint:
                logger.info("mu: {}, rmse: {:.4f}".format(mu_value, rmse))
        # select the mu with the smallest validation RMSE
        cv_data = np.array(cv_data)
        optimal_ind = cv_data[:, 1].argmin()
        mu_optimal = cv_data[optimal_ind, 0]
        # final fit with the optimal mu
        results = _split_bregman(A, y, mu_optimal, lmbda,
                                 n_iters, tol, cg_tol,
                                 cg_n_iters, iprint)
        results['mu_optimal'] = mu_optimal
    return results
def _split_bregman(A: np.ndarray,
                   y: np.ndarray,
                   mu: float,
                   lmbda: float,
                   n_iters: int,
                   tol: float,
                   cg_tol: float,
                   cg_n_iters: int,
                   iprint: Optional[int],
                   ) -> Dict[str, np.ndarray]:
    """
    Determines the solution to the linear problem Ax = y for a single
    value of the sparseness parameter mu using the split-Bregman
    algorithm described in T. Goldstein and S. Osher,
    SIAM J. Imaging Sci. 2, 323 (2009); doi:10.1137/080725891.

    This implementation was based on the CSLD code written by Fei Zhou (周非),
    Weston Nielson, Yi Xia, and Vidvuds Ozoliņš and makes use of a right
    preconditioner for increased efficiency.
    The preconditioning algorithm is described in Compressive sensing
    lattice dynamics. I. General formalism, Fei Zhou (周非), Weston Nielson,
    Yi Xia, and Vidvuds Ozoliņš, Phys. Rev. B 100, 184308.

    Parameters
    ----------
    A
        fit matrix
    y
        target array
    mu
        sparseness parameter
    lmbda
        weight of additional L2-norm in split-Bregman
    n_iters
        maximal number of split-Bregman iterations
    tol
        convergence criterion of the iterative minimization
    cg_tol
        relative tolerance for converging the conjugate gradient step
    cg_n_iters
        maximum number of conjugate gradient iterations; if ``None``, a
        reasonable guess will be made based on the number of free parameters
    iprint
        controls the frequency of logging output; ``iprint=None`` means no
        output, ``iprint=n`` means print the status of the minimization
        every n steps
    """
    def _shrink(y: np.ndarray) -> np.ndarray:
        """
        Shrinkage operator as defined in Eq. (11) of the paper by Nelson
        et al.
        """
        return np.sign(y) * np.maximum(np.abs(y) - 1 / lmbda, 0.0)

    if not cg_n_iters:
        # heuristic cap tied to the number of free parameters
        cg_n_iters = int(np.max([10, A.shape[1] / 2.]))

    # right-precondition the fit matrix; the solution is mapped back below
    preconditioner = _get_right_preconditioner(A, 1 / mu, lmbda)
    A = np.dot(A, preconditioner)

    n_cols = A.shape[1]
    d = np.zeros(n_cols)
    # NOTE(review): b initialization inferred from its usage below (the
    # corresponding line is elided in this view) — confirm against upstream
    b = np.zeros(n_cols)
    x = np.zeros(n_cols)

    dx_norm = 1.
    for k in range(n_iters):
        x_prev = x
        # The next two lines cover equations 18, 21, and 20 in the Nelson paper.
        # except we apply mu to the L2 term instead of the L1 term
        x = _minimize_cg(x, A, y, d, b, mu, lmbda, cg_n_iters, cg_tol * dx_norm)
        d = _shrink(x + b)
        b = b + x - d
        dx_norm = np.linalg.norm(x - x_prev) / np.linalg.norm(x)
        if iprint and (k + 1) % iprint == 1:
            logger.info("Split Bregman it={}, |dU|/|U|={:.6}".format(k + 1,
                                                                     dx_norm))
        if dx_norm <= tol:
            break
    if dx_norm > tol:
        logger.warning("Split Bregman did not converge: |dU|/|U|={:.6} "
                       "Try increasing mu".format(dx_norm))
    # undo the preconditioning to recover parameters of the original problem
    fit_results = {'parameters': np.dot(preconditioner, x)}
    return fit_results
def _objective_function(x: np.ndarray, A: np.ndarray, y: np.ndarray,
mu: float, lmbda: float, d: np.ndarray, b: np.ndarray,
AtA: np.ndarray, ftA: np.ndarray) -> np.ndarray:
def _minimize_cg(x, A, y, d, b, mu, lmbda, n_iters, gtol):
"""
Returns the objective function to be minimized.
Minimises the objective function using conjugate gradient.
The conjugate gradient algorithm is described in S. Boyd and
L. Vandenberghe, "Convex Optimization" (Cambridge University Press, 2004).
This implementation was based on the CSLD code written by Fei Zhou (周非),
Weston Nielson, Yi Xia, and Vidvuds Ozoliņš.
Parameters
-----------
X
x
fit matrix

@@ -104,63 +255,61 @@ y

d
same notation as Nelson et al. paper
same notation as Nelson, Hart paper
b
same notation as Nelson et al. paper
AtA
sensing matrix transpose times sensing matrix.
ftA
np.dot(y.conj().transpose(), A)
same notation as Nelson, Hart paper
n_iters
maximum number of iterations
gtol
gradient tolerance for convergence
"""
error_vector = np.matmul(A, x) - y
r = -(mu * np.matmul(A.T, error_vector) - lmbda * (d - b - x))
p = r
delta = np.dot(r, r)
error_vector = np.dot(A, x) - y
for k in range(n_iters):
error_vector = np.matmul(A, p)
rp = mu * np.matmul(A.T, error_vector) + lmbda * p
alpha = delta / np.dot(p, rp)
obj_function = 0.5*np.vdot(error_vector, error_vector)
x = x + alpha * p
r = r - alpha * rp
if obj_function.imag > 0.0:
raise RuntimeError(
'Objective function contains non-zero imaginary part.)')
delta_prev = delta
delta = np.dot(r, r)
sparseness_correction = d - b - mu*x
obj_function += 0.5*lmbda * \
np.vdot(sparseness_correction, sparseness_correction)
if np.sqrt(delta) < gtol:
break
if obj_function.imag > 0.0:
raise RuntimeError(
'Objective function contains non-zero imaginary part.)')
beta = delta / delta_prev
p = beta * p + r
return obj_function
sdelta = np.sqrt(delta)
if sdelta > gtol:
logger.warning("unconverged gradient, CGmin = {:12.6}".format(sdelta))
return x
def _objective_function_derivative(x: np.ndarray,
A: np.ndarray,
y: np.ndarray,
mu: float,
lmbda: float,
d: np.ndarray,
b: np.ndarray,
AtA: np.ndarray,
ftA: np.ndarray) -> np.ndarray:
def _get_right_preconditioner(A, mu, lmbda):
"""
Returns the derivative of the objective function.
Returns the preconditioning matrix.
Calculates equation 34 of "Compressive sensing lattice dynamics. I. General
formalism Fei Zhou (周非), Weston Nielson, Yi Xia, and Vidvuds Ozoliņš
Phys. Rev. B 100, 184308"
Parameters
-----------
X
A
fit matrix
y
target array
mu
the parameter that adjusts sparseness.
preconditioner mu parameter.
lmbda
Split Bregman parameter
d
same notation as Nelson, Hart paper
b
same notation as Nelson, Hart paper
AtA
sensing matrix transpose times sensing matrix.
ftA
np.dot(y.conj().transpose(), A)
preconditioner lmbda value
"""
ret = np.squeeze(np.dot(x[np.newaxis, :], AtA) -
ftA - lmbda*mu*(d - mu * x - b))
return ret
ata = np.matmul(A.T, A)
ata = (ata.T + ata) / 2
eig_val, eig_vec = scipy.linalg.eigh(ata)
over_sq_q = 1 / np.sqrt(eig_val + mu * lmbda)
return np.matmul(eig_vec, np.matmul(np.diag(over_sq_q), eig_vec.T))

@@ -17,3 +17,3 @@ """

from .force_constants import SortedForceConstants
from .io.logging import logger
from .input_output.logging_tools import logger
from .calculators.numba_calc import (clusters_force_contribution,

@@ -94,3 +94,3 @@ cluster_force_contribution)

return new_cluster
logger.info('Populating orbits')
logger.debug('Populating orbits')
bar = Progress(len(cs.orbits))

@@ -97,0 +97,0 @@ scR_tensormatrix_lookup = dict()

@@ -7,6 +7,5 @@ """

import pickle
import numpy as np
from collections import Counter
import numpy as np
from .force_constant_model import ForceConstantModel

@@ -101,9 +100,12 @@ from .core.orbits import Orbit

# get cell
if hasattr(_prim, '_cell'): # 3.17
cell = _prim._cell
pbc = _prim._pbc
else: # 3.18
cell = _prim.cell[:]
pbc = _prim.pbc
# assume PBC True (as it has to be True in hiphive)
pbc = [True, True, True]
# finalize
new_prim = Atoms(

@@ -148,4 +150,4 @@ symbols=_prim.symbols, positions=_prim.positions, cell=cell, pbc=pbc)

def orbit_data(self):
""" list : list of dictionaries containing detailed information for
each orbit, e.g. cluster radius and force constant
""" List[dict] : list of dictionaries containing detailed information for
each orbit, e.g. cluster radius and force constant
"""

@@ -152,0 +154,0 @@ data = []

@@ -11,9 +11,12 @@ """

from string import ascii_lowercase, ascii_uppercase
from typing import List, Tuple
from scipy.linalg import eig
from ase import units
from ase import Atoms, units
from .io import shengBTE as shengBTEIO
from .io import phonopy as phonopyIO
from .io.read_write_files import add_items_to_tarfile_pickle, read_items_pickle,\
from .input_output import shengBTE as shengBTEIO
from .input_output import phonopy as phonopyIO
from .input_output.read_write_files import add_items_to_tarfile_pickle, read_items_pickle,\
add_ase_atoms_to_tarfile, read_ase_atoms_from_tarfile
from .input_output import gpumd as gpumdIO

@@ -24,3 +27,3 @@

def __init__(self, supercell):
def __init__(self, supercell: Atoms):
if not all(supercell.pbc):

@@ -35,21 +38,21 @@ raise ValueError('configuration must have periodic boundary conditions')

@property
def n_atoms(self) -> int:
    """ number of atoms """
    return len(self.supercell)
@property
def supercell(self) -> Atoms:
    """ supercell associated with force constants """
    # return a copy so callers cannot mutate the internal structure
    return self._supercell.copy()
@property
def clusters(self) -> list:
    """ sorted list of clusters """
    # sort by order (cluster length) first, then lexicographically
    return sorted(self._fc_dict.keys(), key=lambda c: (len(c), c))
def __len__(self) -> int:
    """ int : number of clusters (or force constants) """
    return len(self._fc_dict)
def __add__(self, fcs_other):
def __add__(self, fcs_other) -> None:
""" ForceConstants : addition of two force constants """

@@ -71,3 +74,3 @@ if type(self) != type(fcs_other):

def get_fc_dict(self, order=None):
def get_fc_dict(self, order: int = None) -> dict:
""" Returns force constant dictionary for one specific order.

@@ -80,3 +83,3 @@

----------
order : int
order
force constants returned for this order

@@ -86,5 +89,4 @@

-------
dict
dictionary with keys corresponding to clusters and values to
respective force constant
dictionary with keys corresponding to clusters and values to
respective force constant
"""

@@ -103,3 +105,3 @@ if order is None:

def get_fc_array(self, order, format='phonopy'):
def get_fc_array(self, order: int, format: str = 'phonopy') -> np.ndarray:
""" Returns force constants in array format for specified order.

@@ -109,5 +111,5 @@

----------
order : int
order
force constants for this order will be returned
format : str
format
specify which format (shape) the NumPy array should have,

@@ -118,5 +120,4 @@ possible values are `phonopy` and `ase`

-------
numpy.ndarray
NumPy array with shape `(N,)*order + (3,)*order` where `N` is
the number of atoms
NumPy array with shape `(N,)*order + (3,)*order` where `N` is
the number of atoms
"""

@@ -141,10 +142,5 @@ if order not in self.orders:

def compute_gamma_frequencies(self):
""" Computes gamma frequencies using the second-order force constants.
Returns
-------
gamma_frequencies : numpy.ndarray
Gamma frequencies in THz
"""
def compute_gamma_frequencies(self) -> np.ndarray:
""" Returns the Gamma frequencies in THz using the second-order force
constants. """
fc2_array = self.get_fc_array(order=2)

@@ -154,10 +150,10 @@ masses = self.supercell.get_masses()

def assert_acoustic_sum_rules(self, order=None, tol=1e-6):
""" Asserts that acoustic sum rules are enforced for force constants.
def assert_acoustic_sum_rules(self, order: int = None, tol: float = 1e-6):
""" Asserts that force constants obey acoustic sum rules.
Parameters
----------
order : int
order
specifies which order to check, if None all are checked
tol : float
tol
numeric tolerance for checking sum rules

@@ -168,3 +164,3 @@

AssertionError
if acoustic sum rules are not enforced
if acoustic sum rules are violated
"""

@@ -182,3 +178,3 @@

for order in orders:
assert_msg = 'Acoustic sum rule order {} not enforced for atom'
assert_msg = 'Acoustic sum rule for order {} violated for atom'
assert_msg += ' {}' * (order - 1) + ' x'

@@ -190,6 +186,5 @@ for ijk in product(atomic_indices, repeat=order-1):

fc_sum_ijk += self[cluster]
assert np.linalg.norm(fc_sum_ijk) < tol, \
assert_msg.format(order, *ijk)
assert np.linalg.norm(fc_sum_ijk) < tol, assert_msg.format(order, *ijk)
def print_force_constant(self, cluster):
def print_force_constant(self, cluster: Tuple[int]) -> None:
"""

@@ -200,3 +195,3 @@ Prints force constants for a cluster in a nice format.

----------
cluster : tuple(int)
cluster
sites belonging to the cluster

@@ -230,3 +225,3 @@ """

def __str__(self):
def __str__(self) -> str:
s = []

@@ -248,3 +243,3 @@ s.append(' ForceConstants '.center(54, '='))

def __repr__(self):
def __repr__(self) -> str:
fc_dict_str = '{{{}: {}, ..., {}: {}}}'.format(

@@ -256,3 +251,4 @@ self.clusters[0], self[self.clusters[0]].round(5),

def _repr_fc(self, cluster, precision=5, suppress=True):
def _repr_fc(self, cluster: Tuple[int],
precision: float = 5, suppress: bool = True) -> str:
"""

@@ -263,3 +259,3 @@ Representation for single cluster and its force constant.

----------
cluster : tuple
cluster
tuple of ints indicating the sites belonging to the cluster

@@ -276,3 +272,3 @@ """

def _sanity_check_dict(self, fc_dict):
def _sanity_check_dict(self, fc_dict: dict) -> None:
""" Checks that all indices in clusters are between 0 and number of

@@ -285,3 +281,4 @@ atoms. """

@classmethod
def from_arrays(cls, supercell, fc2_array=None, fc3_array=None):
def from_arrays(cls, supercell: Atoms,
fc2_array: np.ndarray = None, fc3_array: np.ndarray = None):
""" Constructs FCs from numpy arrays.

@@ -293,7 +290,7 @@

----------
supercell : ase.Atoms
supercell
supercell structure
fc2_array : np.ndarray
fc2_array
second-order force constant in phonopy format, i.e. must have shape (N, N, 3, 3)
fc3_array : np.ndarray
fc3_array
third-order force constant in phonopy format, i.e. must have shape (N, N, N, 3, 3, 3)

@@ -323,3 +320,3 @@ """

@classmethod
def from_sparse_dict(cls, fc_dict, supercell):
def from_sparse_dict(cls, fc_dict: dict, supercell: Atoms):
""" Assumes label symmetries, meaning only one cluster for each

@@ -330,5 +327,6 @@ permuation should be included

----------
fc_dict : dict
fc_dict
keys corresponding to clusters and values to the force constants
supercell : ase.Atoms
supercell
atomic configuration
"""

@@ -338,3 +336,3 @@ return SortedForceConstants(fc_dict, supercell=supercell)

@classmethod
def from_dense_dict(cls, fc_dict, supercell):
def from_dense_dict(cls, fc_dict: dict, supercell: Atoms):
""" All permutations of clusters that are not zero must be listed,

@@ -345,5 +343,6 @@ if label symmetries are fullfilled will return a SortedForceConstants

----------
fc_dict : dict
fc_dict
keys corresponding to clusters and values to the force constants
supercell : ase.Atoms
supercell
atomic configuration
"""

@@ -357,3 +356,3 @@ if check_label_symmetries(fc_dict):

@classmethod
def read_phonopy(cls, supercell, fname, format=None):
def read_phonopy(cls, supercell: Atoms, fname: str, format: str = None):
""" Reads force constants from a phonopy calculation.

@@ -363,8 +362,9 @@

----------
supercell : ase.Atoms
supercell
supercell structure (`SPOSCAR`)
fname : str
fname
name of second-order force constant file
format : str
format for second-order force constants
format
format for second-order force constants;
possible values: "text", "hdf5"
"""

@@ -375,3 +375,3 @@ fc2_array = phonopyIO.read_phonopy_fc2(fname, format=format)

@classmethod
def read_phono3py(cls, supercell, fname):
def read_phono3py(cls, supercell: Atoms, fname: str):
""" Reads force constants from a phono3py calculation.

@@ -381,5 +381,5 @@

----------
supercell : ase.Atoms
supercell
supercell structure (`SPOSCAR`)
fname : str
fname
name of third-order force constant file

@@ -391,3 +391,3 @@ """

@classmethod
def read_shengBTE(cls, supercell, fname, prim):
def read_shengBTE(cls, supercell: Atoms, fname: str, prim: Atoms):
""" Reads third order force constants from a shengBTE calculation.

@@ -399,7 +399,7 @@

----------
supercell : str
supercell
supercell structure
fname : str
fname
name of third-order force constant file
prim : ase.Atoms
prim
primitive configuration (must be equivalent to structure used in

@@ -412,3 +412,3 @@ the shengBTE calculation)

@classmethod
def read(cls, fname):
def read(cls, fname: str):
""" Reads ForceConstants from file.

@@ -418,3 +418,3 @@

----------
fname : str
fname
name of file from which to read

@@ -435,3 +435,3 @@ """

def write(self, fname):
def write(self, fname: str) -> None:
""" Writes entire ForceConstants object to file.

@@ -441,3 +441,3 @@

----------
fname : str
fname
name of file to which to write

@@ -451,3 +451,3 @@ """

def write_to_phonopy(self, fname, format=None):
def write_to_phonopy(self, fname: str, format: str = None) -> None:
"""

@@ -458,10 +458,11 @@ Writes force constants in phonopy format.

----------
fname : str
fname
name of file to which to write second-order force constant
format : str
format for second-order force constants
format
format for second-order force constants;
possible values: "text", "hdf5"
"""
phonopyIO.write_phonopy_fc2(fname, self, format=format)
def write_to_phono3py(self, fname):
def write_to_phono3py(self, fname: str) -> None:
"""

@@ -472,3 +473,3 @@ Writes force constants in phono3py format.

----------
fname : str
fname
name of file to which to write third-order force constant

@@ -478,3 +479,3 @@ """

def write_to_shengBTE(self, fname, prim, **kwargs):
def write_to_shengBTE(self, fname: str, prim: Atoms, **kwargs) -> None:
"""

@@ -485,5 +486,5 @@ Writes third order force constants in shengBTE format.

----------
fname : str
fname
name of file to which to write third-order force constant
prim : ase.Atoms
prim
primitive configuration (must be equivalent to structure used in

@@ -506,3 +507,3 @@ the shengBTE calculation)

def __init__(self, fc_dict, supercell):
def __init__(self, fc_dict: dict, supercell: Atoms) -> None:
super().__init__(supercell)

@@ -513,3 +514,3 @@ self._sanity_check_dict(fc_dict)

def __getitem__(self, cluster):
def __getitem__(self, cluster: Tuple[int]):
sorted_cluster = tuple(sorted(cluster))

@@ -526,7 +527,7 @@

@property
def orders(self):
""" list : orders for which force constants exist """
def orders(self) -> List[int]:
""" orders for which force constants exist """
return self._orders.copy()
def _sanity_check_dict(self, fc_dict):
def _sanity_check_dict(self, fc_dict: dict) -> None:
super()._sanity_check_dict(fc_dict)

@@ -539,3 +540,21 @@

def write_to_GPUMD(self, fname_fc: str, fname_clusters: str,
                   order: int, tol: float = 1e-10) -> None:
    """
    Writes force constants of the specified order in GPUMD format.

    Parameters
    ----------
    fname_fc
        name of file which contains the lookup force constants
    fname_clusters
        name of file which contains the clusters and the fc lookup index
    order
        force constants for this order will be written to file
    tol
        if the norm of a force constant is less than tol then it is not written.
        if two force-constants are within tol; they are considered equal.
    """
    gpumdIO.write_fcs_gpumd(fname_fc, fname_clusters, self, order, tol)
class RawForceConstants(ForceConstants):

@@ -552,3 +571,3 @@ """ Force constants without label symmetries.

def __init__(self, fc_dict, supercell):
def __init__(self, fc_dict: dict, supercell: Atoms) -> None:
super().__init__(supercell)

@@ -559,3 +578,3 @@ self._sanity_check_dict(fc_dict)

def __getitem__(self, cluster):
def __getitem__(self, cluster: Tuple[int]):
if cluster not in self._fc_dict.keys():

@@ -566,4 +585,4 @@ return np.zeros((3,)*len(cluster))

@property
def orders(self):
""" list : orders for which force constants exist """
def orders(self) -> List[int]:
""" orders for which force constants exist """
return self._orders.copy()

@@ -577,3 +596,3 @@

def array_to_dense_dict(fc_array, fc_tol=1e-10):
def array_to_dense_dict(fc_array: np.ndarray, fc_tol: float = 1e-10) -> dict:
""" Constructs a dense dict from an fc array in phonopy format.

@@ -586,5 +605,5 @@

----------
fc_array : np.ndarray
fc_array
force constant array in phonopy format
fc_tol : float
fc_tol
tolerance for considering force constants zero or not

@@ -608,3 +627,3 @@ """

def check_label_symmetries(fc_dict):
def check_label_symmetries(fc_dict: dict) -> bool:
""" Checks label symmetries for dense fc dict.

@@ -618,3 +637,3 @@

----------
fc_dict : dict
fc_dict
keys corresponding to clusters and values to the force constants

@@ -632,3 +651,3 @@ """

def dense_dict_to_sparse_dict(fc_dict):
def dense_dict_to_sparse_dict(fc_dict: dict) -> dict:
""" Converts dense dict to sparse dict.

@@ -641,3 +660,3 @@

----------
fc_dict : dict
fc_dict
keys corresponding to clusters and values to the force constants

@@ -652,3 +671,3 @@ """

def symbolize_force_constant(fc, tol=1e-10):
def symbolize_force_constant(fc: np.ndarray, tol: float = 1e-10) -> np.ndarray:
"""Carries out a symbolic symmetrization of a force constant tensor.

@@ -658,5 +677,5 @@

----------
fc : numpy.ndarray
fc
force constant tensor
tol : float
tol
tolerance used to decide whether two elements are identical

@@ -666,4 +685,3 @@

-------
numpy.ndarray
symbolic representation of force constant matrix
symbolic representation of force constant matrix
"""

@@ -687,5 +705,4 @@ fc_int = np.round(fc / tol).astype(int)

def _compute_gamma_frequencies(fc2, masses):
""" Computes Gamma frequencies using second-order force constants
def _compute_gamma_frequencies(fc2: np.ndarray, masses: List[float]) -> np.ndarray:
""" Computes Gamma frequencies using second-order force constants.
Assumes fc2 is in units of eV/A2.

@@ -695,5 +712,5 @@

----------
fc2 : numpy.ndarray
fc2
second-order force constants in phonopy format
masses : list(float)
masses
mass of each atom

@@ -703,4 +720,3 @@

-------
gamma_frequencies : numpy.ndarray
Gamma frequencies in THz
Gamma frequencies in THz
"""

@@ -707,0 +723,0 @@

@@ -8,3 +8,3 @@ import numpy as np

generate_phonon_rattled_structures
from hiphive.io.logging import set_config
from hiphive.input_output.logging_tools import set_config

@@ -67,5 +67,4 @@

print('Creating initial model')
rattled_structures = generate_rattled_structures(
atoms_ideal, n_structures, 0.03)
prepare_structures(rattled_structures, atoms_ideal, calc)
rattled_structures = generate_rattled_structures(atoms_ideal, n_structures, 0.03)
rattled_structures = prepare_structures(rattled_structures, atoms_ideal, calc)
for structure in rattled_structures:

@@ -89,3 +88,3 @@ sc.add_structure(structure)

atoms_ideal, fc2, n_structures, T)
prepare_structures(phonon_rattled_structures, atoms_ideal, calc)
phonon_rattled_structures = prepare_structures(phonon_rattled_structures, atoms_ideal, calc)

@@ -92,0 +91,0 @@ # fit new model

@@ -11,13 +11,13 @@ """

from .io.read_write_files import (add_items_to_tarfile_hdf5,
add_items_to_tarfile_pickle,
add_items_to_tarfile_custom,
add_list_to_tarfile_custom,
read_items_hdf5,
read_items_pickle,
read_list_custom)
from .input_output.read_write_files import (add_items_to_tarfile_hdf5,
add_items_to_tarfile_pickle,
add_items_to_tarfile_custom,
add_list_to_tarfile_custom,
read_items_hdf5,
read_items_pickle,
read_list_custom)
from .cluster_space import ClusterSpace
from .force_constant_model import ForceConstantModel
from .io.logging import logger
from .input_output.logging_tools import logger
logger = logger.getChild('structure_container')

@@ -181,3 +181,3 @@

logger.info('Adding structure')
logger.debug('Adding structure')
M = self._compute_fit_matrix(atoms)

@@ -260,2 +260,4 @@ structure = FitStructure(atoms_copy, M, **meta_data)

s += ['{:22} : {}'.format('Total number of structures', len(self))]
_, target_forces = self.get_fit_data()
s += ['{:22} : {}'.format('Number of force components', len(target_forces))]
s.append(''.center(n, '-'))

@@ -273,8 +275,8 @@ s.append(str_structure(-1, dummy))

""" Compute fit matrix for a single atoms object """
logger.info('Computing fit matrix')
logger.debug('Computing fit matrix')
if atoms != getattr(self._previous_fcm, 'atoms', None):
logger.info(' Building new FCM object')
logger.debug(' Building new FCM object')
self._previous_fcm = ForceConstantModel(atoms, self._cs)
else:
logger.info(' Reusing old FCM object')
logger.debug(' Reusing old FCM object')
return self._previous_fcm.get_fit_matrix(atoms.get_array('displacements'))

@@ -281,0 +283,0 @@

import numpy as np
from ase.units import kB
from hiphive.io.logging import logger
from hiphive.input_output.logging_tools import logger

@@ -29,3 +29,3 @@ logger = logger.getChild(__name__)

See: West and Estreicher PRL 96, 115504 (2006)
See: West and Estreicher, Physical Review Letters **96**, 115504 (2006)

@@ -60,3 +60,3 @@ Parameters

""" Thermal excitation of phonon modes as described by West and
Estreicher PRL 96, 115504 (2006).
Estreicher, Physical Review Letters **96**, 115504 (2006).

@@ -63,0 +63,0 @@ _s is a mode index

@@ -58,3 +58,3 @@ """

accepted with a probability determined from the minimum
interatomic distance :math:`d_{ij}`. If :math`\\min(d_{ij})` is
interatomic distance :math:`d_{ij}`. If :math:`\\min(d_{ij})` is
smaller than :math:`d_{min}` the move is only accepted with a low

@@ -88,3 +88,3 @@ probability.

rattle amplitude (standard deviation in normal distribution);
note this value is not *directly* connected to the final
note this value is not connected to the final
average displacement for the structures

@@ -97,2 +97,4 @@ d_min : float

generated
n_iter : int
number of Monte Carlo cycles

@@ -99,0 +101,0 @@ Returns

@@ -5,12 +5,13 @@ """

from typing import List
from typing import List, Tuple
import numpy as np
from ase import Atoms
from ase.calculators.calculator import Calculator
from ase.calculators.singlepoint import SinglePointCalculator
from ase.geometry import find_mic
from ase.geometry import get_distances
from hiphive import ClusterSpace, ForceConstants
from hiphive.force_constant_model import ForceConstantModel
from .io.logging import logger
from ase.neighborlist import neighbor_list
from .cluster_space import ClusterSpace
from .force_constants import ForceConstants
from .input_output.logging_tools import logger

@@ -21,3 +22,5 @@

def get_displacements(atoms: Atoms,
                      atoms_ideal: Atoms,
                      cell_tol: float = 1e-4) -> np.ndarray:
    """Returns the smallest possible displacements of a displaced
    configuration relative to an ideal (reference) configuration,
    applying the minimum image convention to account for periodic
    boundary conditions.

    Parameters
    ----------
    atoms
        displaced configuration
    atoms_ideal
        ideal configuration relative to which displacements are computed
    cell_tol
        cell tolerance; if the cells mismatch by more than this value a
        ValueError is raised

    Raises
    ------
    ValueError
        if the atomic numbers or the cells of the two configurations do
        not match
    """
    if not np.array_equal(atoms.numbers, atoms_ideal.numbers):
        raise ValueError('Atomic numbers do not match.')
    if np.linalg.norm(atoms.cell - atoms_ideal.cell) > cell_tol:
        raise ValueError('Cells do not match.')
    # vectorized minimum-image wrapping of all displacement vectors at once
    raw_position_diff = atoms.positions - atoms_ideal.positions
    wrapped_mic_displacements = find_mic(raw_position_diff, atoms_ideal.cell, pbc=True)[0]
    return wrapped_mic_displacements
def prepare_structure(atoms: Atoms,
                      atoms_ideal: Atoms,
                      calc: SinglePointCalculator = None) -> Atoms:
    """Prepares a structure in the format suitable for a
    :class:`StructureContainer <hiphive.StructureContainer>`.

    Forces are taken, in order of priority, from (1) a ``forces`` array
    already attached to `atoms`, (2) the provided calculator, or (3) a
    `SinglePointCalculator` attached to `atoms`.

    Parameters
    ----------
    atoms
        input structure
    atoms_ideal
        reference structure relative to which displacements are computed
    calc
        ASE calculator used for computing forces

    Returns
    -------
    ASE atoms object
        prepared ASE atoms object with forces and displacements as arrays

    Raises
    ------
    ValueError
        if forces are not available from any of the three sources
    """
    # get forces
    if 'forces' in atoms.arrays:
        forces = atoms.get_array('forces')
    elif calc is not None:
        atoms_tmp = atoms.copy()
        atoms_tmp.calc = calc
        forces = atoms_tmp.get_forces()
    elif isinstance(atoms.calc, SinglePointCalculator):
        forces = atoms.get_forces()
    else:
        # previously `forces` was left unbound here, producing a confusing
        # NameError further down; fail early with a clear message instead
        raise ValueError('Forces could not be obtained: attach them as an '
                         'array, provide a calculator, or attach a '
                         'SinglePointCalculator to the structure.')
    # setup new atoms: permute so the atom order matches the reference
    perm = find_permutation(atoms, atoms_ideal)
    atoms_new = atoms.copy()
    atoms_new = atoms_new[perm]
    atoms_new.arrays['forces'] = forces[perm]
    disps = get_displacements(atoms_new, atoms_ideal)
    atoms_new.arrays['displacements'] = disps
    # reset positions to the ideal reference structure
    atoms_new.positions = atoms_ideal.positions
    return atoms_new
def prepare_structures(structures: List[Atoms],
                       atoms_ideal: Atoms,
                       calc: SinglePointCalculator = None) -> List[Atoms]:
    """Prepares a set of structures in the format suitable for adding them to
    a :class:`StructureContainer <hiphive.StructureContainer>`.

    `structures` should represent a list of supercells with displacements
    while `atoms_ideal` should provide the ideal reference structure (without
    displacements) for the given structures.

    The structures that are returned will have their positions reset to the
    ideal structure. Displacements and forces will be added as arrays to the
    atoms objects.

    If no calculator is provided, then there must be an ASE
    `SinglePointCalculator <ase.calculators.singlepoint>` object attached to
    the structures or the forces should already be attached as arrays to the
    structures. If a calculator is provided then it will be used to compute
    the forces for all structures.

    Example
    -------
    >>> db = connect('dft_training_structures.db')
    >>> training_structures = [row.toatoms() for row in db.select()]
    >>> training_structures = prepare_structures(training_structures, atoms_ideal)
    >>> for s in training_structures:
    >>>     sc.add_structure(s)

    Parameters
    ----------
    structures
        list of input displaced structures
    atoms_ideal
        reference structure relative to which displacements are computed
    calc
        ASE calculator used for computing forces

    Returns
    -------
    list of prepared structures with forces and displacements as arrays
    """
    # note: the leftover in-place preparation loop from an older version was
    # removed here; it mutated the inputs and crashed when calc was None
    return [prepare_structure(s, atoms_ideal, calc) for s in structures]
def find_permutation(atoms: Atoms, atoms_ref: Atoms) -> List[int]:
def find_permutation(atoms: Atoms,
atoms_ref: Atoms) -> List[int]:
""" Returns the best permutation of atoms for mapping one

@@ -131,3 +195,6 @@ configuration onto another.

def __init__(self, types, distance, count=0):
def __init__(self,
types: List[str],
distance: float,
count: int = 0):
self.types = types

@@ -144,3 +211,5 @@ self.distance = distance

def get_neighbor_shells(atoms: Atoms, cutoff: float, dist_tol: float = 1e-5) -> List[Shell]:
def get_neighbor_shells(atoms: Atoms,
cutoff: float,
dist_tol: float = 1e-5) -> List[Shell]:
""" Returns a list of neighbor shells.

@@ -168,9 +237,2 @@

# TODO: Remove this once this feature have been in ASE long enough
try:
from ase.neighborlist import neighbor_list
except ImportError:
raise ImportError('get_neighbor_shells uses new functionality from ASE'
', please update ASE in order to use this function.')
# get distances

@@ -206,5 +268,8 @@ ijd = neighbor_list('ijd', atoms, cutoff)

def extract_parameters(fcs: ForceConstants,
                       cs: ClusterSpace) -> np.ndarray:
    """ Extracts parameters from force constants.

    This function can be used to extract parameters to create a
    ForceConstantPotential from a known set of force constants.

    TODO: Rename this function with more explanatory name?

    Parameters
    ----------
    fcs
        force constants
    cs
        cluster space

    Returns
    -------
    numpy.ndarray
        least-squares solution of the sensing problem, i.e. the parameter
        vector that best reproduces the given force constants in the
        provided cluster space (only the solution ``x`` of
        ``numpy.linalg.lstsq`` is returned, not residuals/rank/singular
        values)
    """
    # local import to avoid a circular dependency at module load time
    from .force_constant_model import ForceConstantModel
    fcm = ForceConstantModel(fcs.supercell, cs)
    return np.linalg.lstsq(*fcm.get_fcs_sensing(fcs), rcond=None)[0]
Metadata-Version: 1.2
Name: hiphive
Version: 0.6
Version: 0.7
Summary: High-order force constants for the masses

@@ -16,3 +16,4 @@ Home-page: http://hiphive.materialsmodeling.org/

extensive tutorial can be found in the
`user guide <https://hiphive.materialsmodeling.org/>`_.
`user guide <https://hiphive.materialsmodeling.org/>`_. Complete examples of
using hiphive for force constants extraction can be found at `hiphive examples <https://gitlab.com/materials-modeling/hiphive-examples/>`_.

@@ -83,4 +84,4 @@ **hiPhive** is written in Python, which allows

Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>` page
of the documentation for additional references.
Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>`_
page of the documentation for additional references.

@@ -87,0 +88,0 @@ **hiphive** and its development are hosted on

@@ -8,3 +8,4 @@ hiPhive

extensive tutorial can be found in the
`user guide <https://hiphive.materialsmodeling.org/>`_.
`user guide <https://hiphive.materialsmodeling.org/>`_. Complete examples of
using hiphive for force constants extraction can be found at `hiphive examples <https://gitlab.com/materials-modeling/hiphive-examples/>`_.

@@ -75,4 +76,4 @@ **hiPhive** is written in Python, which allows

Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>` page
of the documentation for additional references.
Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>`_
page of the documentation for additional references.

@@ -79,0 +80,0 @@ **hiphive** and its development are hosted on

@@ -38,5 +38,6 @@ #!/usr/bin/env python3

'numpy>=1.12',
'sklearn',
'scipy',
'scikit-learn',
'spglib',
'sympy'],
'sympy>=1.1'],
packages=find_packages(),

@@ -43,0 +44,0 @@ classifiers=[

import pickle
def _write_pickle(fname, data):
""" Write data to pickle file with filename fname """
with open(fname, 'wb') as handle:
pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)
def _read_pickle(fname):
""" Read pickle file and return content """
with open(fname, 'rb') as handle:
return pickle.load(handle)
""" This module contains functions and variables to control hiPhive's logging
* `logger` - the module logger
* set_config - function to control level and logfile
"""
import logging
import sys
from timeit import default_timer as timer
# This is the root logger of hiPhive.
logger = logging.getLogger('hiphive')

# Process all events of level INFO or higher.
logger.setLevel(logging.INFO)

# Keep hiPhive log records out of the root logger; set this to True only if
# you know what you are doing.
logger.propagate = False

# The hiPhive logger collects events from its children and, by default,
# prints them directly to stdout.
ch = logging.StreamHandler(sys.stdout)
logger.addHandler(ch)

# When True, Progress prints continuously updating status lines
# (toggled via set_config).
continuous_logging = False
# TODO: use Context management protocol instead
class Progress:
    """ Progress bar like functionality.

    Parameters
    ----------
    tot : int
        total number of expected ticks; if None the total is unknown
    mode : str
        kept for backward compatibility; currently unused
    estimate_remaining : bool
        if True an estimate of the remaining time is printed on each update;
        requires tot to be provided
    """

    def __init__(self, tot=None, mode='frac', estimate_remaining=True):
        if tot is None:
            self._tot = '?'
            # cannot estimate remaining time without knowing the total
            assert not estimate_remaining
        else:
            self._tot = tot
        self._progress = 0
        self._estimate_remaining = estimate_remaining
        self._last_update = 0
        self._start = timer()

    def tick(self):
        """ Registers one unit of progress; updates the printout at most
        once every two seconds (and only if continuous logging is on). """
        self._progress += 1
        delta = timer() - self._last_update
        if continuous_logging and delta > 2:
            self._last_update = timer()
            print('\r' + ' ' * 70 + '\r', end='', flush=True)
            if self._tot == '?':
                # unknown total: a percentage cannot be computed
                # (previously this divided by the string '?' and raised)
                print('{}/{}'.format(self._progress, self._tot),
                      end='', flush=True)
            else:
                print('{}/{}={:.3%}'.format(self._progress, self._tot,
                                            self._progress / self._tot),
                      end='', flush=True)
                if self._estimate_remaining:
                    remaining_time = (self._tot - self._progress) * (
                        timer() - self._start) / self._progress
                    print(' time remaining: {:.3}'.format(remaining_time),
                          end='', flush=True)

    def close(self):
        """ Clears the status line (if any) and logs the total elapsed time. """
        if continuous_logging:
            print('\r' + ' ' * 70 + '\r', end='', flush=True)
        s = timer() - self._start
        d, remainder = divmod(s, 60*60*24)
        h, remainder = divmod(remainder, 60*60)
        m, s = divmod(remainder, 60)
        logger.info('Done in {}d {}h {}m {:.3}s'
                    .format(int(d), int(h), int(m), s))
def set_config(filename=None, level=None, continuous=None):
    """
    Alters the way logging is handled.

    Parameters
    ----------
    filename : str
        name of file the log should be sent to
    level : int
        verbosity level; see `Python logging library
        <https://docs.python.org/3/library/logging.html>`_ for details
    continuous : bool
        if True the progress will be continuously updated
    """
    global continuous_logging
    # attach an additional file handler when a logfile was requested
    if filename is not None:
        logger.addHandler(logging.FileHandler(filename))
    # adjust the verbosity threshold
    if level is not None:
        logger.setLevel(level)
    # toggle continuously updating progress output
    if continuous is not None:
        continuous_logging = continuous
"""
This module provides functions for reading and writing data files
in phonopy and phono3py formats.
"""
import warnings
import os
import numpy as np
from itertools import product
import hiphive
from ..io.logging import logger
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=FutureWarning)
import h5py
logger = logger.getChild('io_phonopy')
def _filename_to_format(filename):
""" Tries to guess the format from the filename """
basename = os.path.basename(filename)
if basename == 'FORCE_CONSTANTS':
return 'text'
if '.' in basename:
extension = basename.split('.')[-1]
if extension == 'hdf5':
return 'hdf5'
raise ValueError('Could not guess file format')
def read_phonopy_fc2(filename, format=None):
    """Parses a second-order force constant file in phonopy format.

    Parameters
    ----------
    filename : str
        input file name
    format : str
        specify the file-format, if None tries to guess the format from
        filename

    Returns
    -------
    numpy.ndarray
        second-order force constant matrix (`N,N,3,3` array)
        where `N` is the number of atoms
    """
    readers = {'text': _read_phonopy_fc2_text,
               'hdf5': _read_phonopy_fc2_hdf5}
    if format is None:
        format = _filename_to_format(filename)
    try:
        reader = readers[format]
    except KeyError:
        raise ValueError('Did not recognize format {}'.format(format))
    return reader(filename)
def write_phonopy_fc2(filename, fc2, format=None):
    """Writes second-order force constant matrix in phonopy format.

    Parameters
    ----------
    filename : str
        output file name
    fc2 : ForceConstants or numpy.ndarray
        second-order force constant matrix
    format : str
        specify the file-format, if None try to guess the format from filename
    """
    writers = {'text': _write_phonopy_fc2_text,
               'hdf5': _write_phonopy_fc2_hdf5}

    # obtain the raw array representation
    if isinstance(fc2, hiphive.force_constants.ForceConstants):
        fc2_array = fc2.get_fc_array(order=2)
    elif isinstance(fc2, np.ndarray):
        fc2_array = fc2
    else:
        raise TypeError('fc2 should be ForceConstants or NumPy array')

    # the array must be (N, N, 3, 3) for N atoms
    n_atoms = fc2_array.shape[0]
    if fc2_array.shape != (n_atoms, n_atoms, 3, 3):
        raise ValueError('fc2 has wrong shape')

    # dispatch to the requested writer
    if format is None:
        format = _filename_to_format(filename)
    if format not in writers:
        raise ValueError('Did not recognize format {}'.format(format))
    writers[format](filename, fc2_array)
def read_phonopy_fc3(filename):
    """Parses a third order force constant file in phonopy hdf5 format.

    Parameters
    ----------
    filename : str
        input file name

    Returns
    -------
    numpy.ndarray
        third order force constant matrix
    """
    with h5py.File(filename, 'r') as hf:
        if 'fc3' not in hf.keys():
            raise IOError('Could not find fc3 in file {}'.format(filename))
        return hf['fc3'][:]
def write_phonopy_fc3(filename, fc3):
    """Writes third order force constant matrix in phonopy hdf5 format.

    Parameters
    ----------
    filename : str
        output file name
    fc3 : ForceConstants or numpy.ndarray
        third order force constant matrix
    """
    # obtain the raw array representation
    if isinstance(fc3, hiphive.force_constants.ForceConstants):
        fc3_array = fc3.get_fc_array(order=3)
    elif isinstance(fc3, np.ndarray):
        fc3_array = fc3
    else:
        raise TypeError('fc3 should be ForceConstants or NumPy array')

    # the array must be (N, N, N, 3, 3, 3) for N atoms
    n_atoms = fc3_array.shape[0]
    if fc3_array.shape != (n_atoms, n_atoms, n_atoms, 3, 3, 3):
        raise ValueError('fc3 has wrong shape')

    with h5py.File(filename, 'w') as hf:
        hf.create_dataset('fc3', data=fc3_array, compression='gzip')
        hf.flush()
def _read_phonopy_fc2_text(filename):
""" Reads phonopy-fc2 file in text format. """
with open(filename, 'r') as f:
# read shape of force constants
line = f.readline()
line_ints = [int(x) for x in line.split()]
if len(line_ints) == 1:
n_atoms = line_ints[0]
elif len(line_ints) == 2:
assert line_ints[0] == line_ints[1]
n_atoms = line_ints[0]
else:
raise ValueError('Unsupported or incorrect phonopy format')
fc2 = np.full((n_atoms, n_atoms, 3, 3), np.nan)
# read force constants
lines = f.readlines()
for n, line in enumerate(lines):
flds = line.split()
if len(flds) == 2:
i = int(flds[0]) - 1 # phonopy index starts with 1
j = int(flds[1]) - 1
for x in range(3):
fc_row = lines[n + x + 1].split()
for y in range(3):
fc2[i][j][x][y] = float(fc_row[y])
return fc2
def _read_phonopy_fc2_hdf5(filename):
    """ Reads phonopy-fc2 file in hdf5 format. """
    with h5py.File(filename, 'r') as hf:
        # phonopy has used both dataset names over time
        for key in ('force_constants', 'fc2'):
            if key in hf.keys():
                return hf[key][:]
        raise IOError('Could not find fc2 in file {}'.format(filename))
def _write_phonopy_fc2_text(filename, fc2):
""" Writes second-order force constants to file in text format. """
n_atoms = fc2.shape[0]
with open(filename, 'w') as f:
f.write('{:5d} {:5d}\n'.format(n_atoms, n_atoms))
for i, j in product(range(n_atoms), repeat=2):
fc2_ij = fc2[(i, j)]
if fc2_ij.shape != (3, 3):
raise ValueError('Invalid shape for fc2[({},{})]'.format(i, j))
f.write('{:-5d}{:5d}\n'.format(i + 1, j + 1))
for row in fc2_ij:
f.write((3*' {:22.15f}'+'\n').format(*tuple(row)))
def _write_phonopy_fc2_hdf5(filename, fc2):
    """ Writes second-order force constants to file in hdf5 format. """
    with h5py.File(filename, 'w') as hf:
        # gzip compression keeps file size down for large supercells
        hf.create_dataset('fc2', data=fc2, compression='gzip')
        hf.flush()
from itertools import product
import numpy as np
def _obj2str(a, none_char='-'):
""" Casts object a to str. """
if isinstance(a, float):
# if the float is 2.49999999 then round
if str(a)[::-1].find('.') > 5:
return '{:.5f}'.format(a)
elif a is None:
return none_char
return str(a)
_array2str = np.vectorize(_obj2str)
def print_table(matrix, sum_=False):
    """ Prints matrix data in a nice table format.

    The matrix element matrix[i][j] should correspond to information about
    order j+2 and n-body i+1.

    Example
    --------
    >> matrix = numpy.array([[None, None], [4.0, 3.0]])
    >> print_table(matrix)

    body/order |  2  |  3
    ------------------------
        1      |  -  |  -
        2      | 4.0 | 3.0

    Parameters
    ----------
    matrix : numpy.ndarray
        matrix to be printed
    sum: : bool
        whether or not to print the sum along each row and column
    """
    print(table_array_to_string(matrix, sum_))
def table_array_to_string(matrix, sum_=False):
    """ Generate nice table string from a numpy array with floats/ints """
    filled = _generate_table_array(matrix, sum_)
    # convert every cell to str, then lay the table out
    return _generate_table_str(_array2str(filled))
def _generate_table_array(table_array, sum_=False):
    """ Generate table in numpy array format """
    n_rows, n_cols = table_array.shape
    # frame with header row/column (and sum row/column if requested)
    A = _build_table_frame(order=n_cols+1, nbody=n_rows, sum_=sum_)

    # copy the data into the frame; only cells with nbody <= order are valid
    for nbody in range(1, n_rows + 1):
        for order in range(2, n_cols + 2):
            if nbody <= order:
                A[nbody, order-1] = table_array[nbody-1, order-2]

    if sum_:
        # row sums into the last column, column sums into the last row
        for i in range(1, A.shape[0] - 1):
            A[i, -1] = sum(v for v in A[i, 1:-1] if v is not None)
        for j in range(1, A.shape[1] - 1):
            A[-1, j] = sum(v for v in A[1:-1, j] if v is not None)
        A[-1, -1] = ''
    return A
def _generate_table_str(table_array):
""" Generate a string from a numpy array of strings """
table_str = []
n_rows, n_cols = table_array.shape
# find maximum widths for each column
widths = []
for i in range(n_cols):
widths.append(max(len(val) for val in table_array[:, i])+2)
# formatting str for each row
row_format = '|'.join('{:^'+str(width)+'}' for width in widths)
# finalize
for i in range(n_rows):
if i == 1:
table_str.append('-' * (sum(widths)+n_cols-1))
table_str.append(row_format.format(*table_array[i, :]))
table_str = '\n'.join(table_str)
return table_str
def _build_table_frame(order, nbody, sum_=False):
""" Builds/initializes table/array. """
if sum_:
A = np.empty((nbody+2, order+1), dtype='object')
A[0, -1] = 'sum'
A[-1, 0] = 'sum'
else:
A = np.empty((nbody+1, order), dtype='object')
A[0][0] = 'body/order'
A[0, 1:order] = range(2, order+1)
A[1:nbody+1, 0] = range(1, nbody+1)
return A
def _print_table(table):
""" Prints 2D array with str elements. """
n_rows, n_cols = table.shape
# find maximum widths for each column
widths = []
for i in range(n_cols):
widths.append(max(len(val) for val in table[:, i])+2)
# formatting str for each row
row_format = '|'.join('{:^'+str(width)+'}' for width in widths)
# print
for i in range(n_rows):
if i == 1:
print('-'*(sum(widths)+n_rows-1))
print(row_format.format(*table[i, :]))
# Small demonstration of print_table when the module is run as a script.
if __name__ == "__main__":
    # input dummy cutoff table
    # insert row for nbody=1
    cutoffs = np.array([[None, None, None, None, None],
                        [6.0, 6.0, 6.0, 3.7, 3.7],
                        [5.0, 5.0, 5.0, 3.0, 3.0],
                        [3.7, 3.7, 3.7, 0.0, 0.0]])

    # input dummy cluster count table
    cluster_counts = np.array([[1, 3, 5, 5, 2],
                               [12, 22, 39, 42, 58],
                               [19, 41, 123, 421, 912],
                               [42, 112, 410, 617, 3271]])

    # print the tables, with and without row/column sums
    print_table(cutoffs)
    print('\n')
    print_table(cluster_counts, sum_=False)
    print('\n')
    print_table(cluster_counts, sum_=True)
"""
Helper functions for reading and writing objects to tar files
"""
import pickle
import tempfile
import warnings
import ase.io as aseIO
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=FutureWarning)
import h5py
def add_ase_atoms_to_tarfile(tar_file, atoms, arcname, format='json'):
    """ Adds an ase.Atoms object to tar_file """
    tmp = tempfile.NamedTemporaryFile()
    # let ASE serialize the structure to the temporary file
    aseIO.write(tmp.name, atoms, format=format)
    tmp.seek(0)
    info = tar_file.gettarinfo(arcname=arcname, fileobj=tmp)
    tar_file.addfile(info, tmp)
def read_ase_atoms_from_tarfile(tar_file, arcname, format='json'):
    """ Reads ase.Atoms from tar file """
    tmp = tempfile.NamedTemporaryFile()
    # copy the member to disk so ASE can parse it from a named file
    tmp.write(tar_file.extractfile(arcname).read())
    tmp.seek(0)
    return aseIO.read(tmp.name, format=format)
def add_items_to_tarfile_hdf5(tar_file, items, arcname):
    """ Adds a dict of items to tar_file as a single hdf5 member.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar file to which the member is added
    items : dict
        mapping from dataset name to array-like data
    arcname : str
        name of the member inside the tar file
    """
    temp_file = tempfile.NamedTemporaryFile()
    # the mode must be explicit: since h5py 3.0 the default File mode is
    # 'r', which would fail when writing here
    hf = h5py.File(temp_file.name, 'w')
    for key, value in items.items():
        hf.create_dataset(key, data=value, compression='gzip')
    hf.close()
    temp_file.seek(0)
    tar_info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
    tar_file.addfile(tar_info, temp_file)
    temp_file.close()
def add_items_to_tarfile_pickle(tar_file, items, arcname):
    """ Add items by pickling them """
    with tempfile.TemporaryFile() as tmp:
        pickle.dump(items, tmp)
        tmp.seek(0)
        info = tar_file.gettarinfo(arcname=arcname, fileobj=tmp)
        tar_file.addfile(info, tmp)
def add_items_to_tarfile_custom(tar_file, items):
    """ Add items assuming they have a custom write function """
    for key, value in items.items():
        with tempfile.TemporaryFile() as tmp:
            value.write(tmp)
            tmp.seek(0)
            info = tar_file.gettarinfo(arcname=key, fileobj=tmp)
            tar_file.addfile(info, tmp)
def add_list_to_tarfile_custom(tar_file, objects, arcname):
    """ Add list of objects assuming they have a custom write function """
    for i, obj in enumerate(objects):
        # members are named <arcname>_0, <arcname>_1, ...
        member_name = '{}_{}'.format(arcname, i)
        with tempfile.TemporaryFile() as tmp:
            obj.write(tmp)
            tmp.seek(0)
            info = tar_file.gettarinfo(arcname=member_name, fileobj=tmp)
            tar_file.addfile(info, tmp)
def read_items_hdf5(tar_file, arcname):
    """ Read items from hdf5file inside tar_file """
    # copy the member to a named temporary file so h5py can open it
    tmp = tempfile.NamedTemporaryFile()
    tmp.write(tar_file.extractfile(arcname).read())
    tmp.seek(0)
    with h5py.File(tmp.name, 'r') as hf:
        # materialize each dataset into a plain array
        return {key: value[:] for key, value in hf.items()}
def read_items_pickle(tar_file, arcname):
    """ Reads pickled items from a member of tar_file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar file containing the member
    arcname : str
        name of the member to unpickle

    Returns
    -------
    the unpickled content (a dict in this package's usage)
    """
    # the previous dead `items = dict()` initialization was removed
    return pickle.load(tar_file.extractfile(arcname))
def read_list_custom(tar_file, arcname, read_function, **kwargs):
    """ Reads a list of objects written as <arcname>_0, <arcname>_1, ... """
    objects = []
    index = 0
    while True:
        member_name = '{}_{}'.format(arcname, index)
        try:
            f = tar_file.extractfile(member_name)
            objects.append(read_function(f, **kwargs))
            f.close()
        except KeyError:
            # extractfile raises KeyError once the member does not exist
            break
        index += 1
    return objects
"""
The ``io.shengBTE`` module provides functions for reading and writing data
files in shengBTE format.
"""
import numpy as np
from itertools import product
from collections import namedtuple
from ..core.structures import Supercell
import hiphive
def read_shengBTE_fc3(filename, prim, supercell, symprec=1e-5):
    """Reads third-order force constants file in shengBTE format.

    Parameters
    ----------
    filename : str
        input file name
    prim : ase.Atoms
        primitive cell for the force constants
    supercell : ase.Atoms
        supercell onto which to map force constants
    symprec : float
        structural symmetry tolerance

    Returns
    -------
    fcs : ForceConstants
        third order force constants for the specified supercell
    """
    # parse the raw blocks, convert to namedtuples, then map to the supercell
    entries = _raw_to_fancy(_read_raw_sheng(filename), prim.cell)
    return _sheng_to_fcs(entries, prim, supercell, symprec)
def write_shengBTE_fc3(filename, fcs, prim, symprec=1e-5, cutoff=np.inf,
                       fc_tol=1e-8):
    """Writes third-order force constants file in shengBTE format.

    Parameters
    -----------
    filename : str
        input file name
    fcs : ForceConstants
        force constants; the supercell associated with this object must be
        based on prim
    prim : ase.Atoms
        primitive configuration (must be equivalent to structure used in the
        shengBTE calculation)
    symprec : float
        structural symmetry tolerance
    cutoff : float
        all atoms in cluster must be within this cutoff
    fc_tol : float
        if the absolute value of the largest entry in a force constant is less
        than fc_tol it will not be written
    """
    # collect the entries, convert to the raw format, then write to disk
    entries = _fcs_to_sheng(fcs, prim, symprec, cutoff, fc_tol)
    _write_raw_sheng(_fancy_to_raw(entries), filename)
def _read_raw_sheng(filename):
""" Read shengBTE fc3 file.
Parameters
----------
filename : str
input file name
Returns
-------
list
list with shengBTE block entries, where an entry consist of
[i, j, k, cell_pos2, cell_pos3, fc3]
"""
lines_per_fc_block = 32
fc3_data_shengBTE = []
with open(filename, 'r') as f:
lines = f.readlines()
num_fcs = int(lines[0])
lind = 2
for i in range(1, num_fcs+1):
# sanity check
assert int(lines[lind]) == i, (int(lines[lind]), i)
# read cell poses
cell_pos2 = np.array([float(fld) for fld in lines[lind+1].split()])
cell_pos3 = np.array([float(fld) for fld in lines[lind+2].split()])
# read basis indices
i, j, k = (int(fld) for fld in lines[lind+3].split())
# read fc
fc3_ijk = np.zeros((3, 3, 3))
for n in range(27):
x, y, z = [int(fld) - 1 for fld in lines[lind+4+n].split()[:3]]
fc3_ijk[x, y, z] = float(lines[lind+4+n].split()[-1])
# append entry
entry = [i, j, k, cell_pos2, cell_pos3, fc3_ijk]
fc3_data_shengBTE.append(entry)
lind += lines_per_fc_block
return fc3_data_shengBTE
# One shengBTE fc3 entry: site_* are 0-based basis-site indices, pos_* the
# Cartesian cell positions of the second and third atom, fc the (3, 3, 3)
# force constant block, and offset_* the corresponding integer cell offsets.
_ShengEntry = namedtuple('Entry', ['site_0', 'site_1', 'site_2', 'pos_1',
                                   'pos_2', 'fc', 'offset_1', 'offset_2'])
def _raw_to_fancy(raw_sheng, cell):
    """
    Converts force constants as read from shengBTE file to the namedtuple
    format defined above (_ShengEntry).
    """
    fancy = []
    for raw in raw_sheng:
        pos_1, pos_2 = raw[3], raw[4]
        # express the Cartesian cell positions as integer cell offsets
        off_1 = np.linalg.solve(cell.T, pos_1).round(0).astype(int)
        off_2 = np.linalg.solve(cell.T, pos_2).round(0).astype(int)
        # basis indices in the file are 1-based
        sites = [index - 1 for index in raw[:3]]
        fancy.append(_ShengEntry(*sites, *raw[3:], off_1, off_2))
    return fancy
def _fancy_to_raw(sheng):
"""
Converts force constants namedtuple format defined above (_ShengEntry) to
format used for writing shengBTE files.
"""
raw_sheng = []
for entry in sheng:
raw_entry = list(entry[:6])
raw_entry[0] += 1
raw_entry[1] += 1
raw_entry[2] += 1
raw_sheng.append(raw_entry)
return raw_sheng
def _write_raw_sheng(raw_sheng, filename):
""" See corresponding read function. """
with open(filename, 'w') as f:
f.write('{}\n\n'.format(len(raw_sheng)))
for index, fc3_row in enumerate(raw_sheng, start=1):
i, j, k, cell_pos2, cell_pos3, fc3_ijk = fc3_row
f.write('{:5d}\n'.format(index))
f.write((3*'{:14.10f} '+'\n').format(*cell_pos2))
f.write((3*'{:14.10f} '+'\n').format(*cell_pos3))
f.write((3*'{:5d}'+'\n').format(i, j, k))
for x, y, z in product(range(3), repeat=3):
f.write((3*' {:}').format(x+1, y+1, z+1))
f.write(' {:14.10f}\n'.format(fc3_ijk[x, y, z]))
f.write('\n')
def _fcs_to_sheng(fcs, prim, symprec, cutoff, fc_tol):
    """ Produces a list containing fcs in shengBTE format

    prim and fcs.supercell must be aligned.
    """
    # NOTE(review): Supercell is a project type; it appears to expose each
    # supercell atom with .index, .site (basis site) and .offset (cell
    # offset) attributes — confirm against core.structures.Supercell.
    supercell = Supercell(fcs.supercell, prim, symprec)
    assert all(fcs.supercell.pbc) and all(prim.pbc)

    n_atoms = len(supercell)

    # pair vectors with and without the minimum-image convention
    D = fcs.supercell.get_all_distances(mic=False, vector=True)
    D_mic = fcs.supercell.get_all_distances(mic=True, vector=True)

    # M[i, j] is True when the direct i-j vector coincides with its MIC
    # image (pair is unambiguous in this supercell) and is within cutoff
    M = np.eye(n_atoms, dtype=bool)
    for i in range(n_atoms):
        for j in range(i + 1, n_atoms):
            M[i, j] = (np.allclose(D[i, j], D_mic[i, j], atol=symprec, rtol=0)
                       and np.linalg.norm(D[i, j]) < cutoff)
            M[j, i] = M[i, j]

    data = {}
    # collect one force constant per symmetry-distinct cluster; the key
    # combines the basis sites with the relative cell offsets
    for a0 in supercell:
        for a1 in supercell:
            if not M[a0.index, a1.index]:
                continue
            for a2 in supercell:
                if not (M[a0.index, a2.index] and M[a1.index, a2.index]):
                    continue
                offset_1 = np.subtract(a1.offset, a0.offset)
                offset_2 = np.subtract(a2.offset, a0.offset)

                sites = (a0.site, a1.site, a2.site)

                key = sites + tuple(offset_1) + tuple(offset_2)

                ijk = (a0.index, a1.index, a2.index)
                fc = fcs[ijk]

                if key in data:
                    # equivalent clusters must carry identical force constants
                    assert np.allclose(data[key], fc, atol=fc_tol)
                else:
                    data[key] = fc

    sheng = []
    for k, fc in data.items():
        # drop numerically negligible force constants
        if np.max(np.abs(fc)) < fc_tol:
            continue
        offset_1 = k[3:6]
        # Cartesian position of the offset cell in the primitive lattice
        pos_1 = np.dot(offset_1, prim.cell)
        offset_2 = k[6:9]
        pos_2 = np.dot(offset_2, prim.cell)
        entry = _ShengEntry(*k[:3], pos_1, pos_2, fc, offset_1, offset_2)
        sheng.append(entry)

    return sheng
def _sheng_to_fcs(sheng, prim, supercell, symprec):
    """ Maps shengBTE entries onto the given supercell and returns the
    corresponding ForceConstants object. """
    # NOTE(review): Supercell is a project type; assumed to map (site,
    # offset) pairs to supercell atom indices via .index() — confirm.
    supercell_map = Supercell(supercell, prim, symprec)
    # full (N, N, N, 3, 3, 3) third-order force constant array
    fc_array = np.zeros((len(supercell),) * 3 + (3,) * 3)
    # tracks which (i, j, k) triplets have already been assigned
    mapped_clusters = np.zeros((len(supercell),) * 3, dtype=bool)

    for atom in supercell_map:
        i = atom.index
        for entry in sheng:
            # only entries whose first site matches this atom's basis site
            if atom.site != entry.site_0:
                continue
            # translate the entry's relative cell offsets to supercell indices
            j = supercell_map.index(entry.site_1, entry.offset_1 + atom.offset)
            k = supercell_map.index(entry.site_2, entry.offset_2 + atom.offset)
            ijk = (i, j, k)
            if mapped_clusters[ijk]:
                # two distinct entries resolved to the same triplet
                raise Exception('Ambiguous force constant.'
                                ' Supercell is too small')
            fc_array[ijk] = entry.fc
            mapped_clusters[ijk] = True

    fcs = hiphive.force_constants.ForceConstants.from_arrays(
        supercell, fc3_array=fc_array)
    return fcs