hiPhive
Advanced tools
| from typing import Dict, List | ||
| import numpy as np | ||
| from sklearn.metrics import r2_score | ||
def get_model_metrics(A: np.ndarray,
                      parameters: List[float],
                      y_target: np.ndarray,
                      y_predicted: np.ndarray) -> Dict[str, float]:
    """Computes model parameters and model evaluation metrics including
    * root-mean square-error (RMSE) over training set
    * R^2 score over training set (coefficient of determination, regression score function)
    * Akaike information criterion (AIC)
    * Bayesian information criterion (BIC)
    * number of non-zero parameters (n_nonzero_parameters)
    * total number of parameters (n_parameters)

    Parameters
    ----------
    A
        fit matrix used to train model
    parameters
        model parameters
    y_target
        target values used to train model
    y_predicted
        predicted values from model

    Returns
    -------
    dict
        dictionary with the metrics listed above
    """
    n_samples, n_parameters_tot = A.shape
    # AIC/BIC penalize the number of effective (non-zero) parameters,
    # not the total number of columns in the fit matrix
    n_nonzero_parameters = np.count_nonzero(parameters)

    # root-mean-square error over the training set
    delta_y = y_predicted - y_target
    mse = np.mean(delta_y**2)
    rmse = np.sqrt(mse)

    # evaluate information criteria
    aic = get_aic(mse, n_samples, n_nonzero_parameters)
    bic = get_bic(mse, n_samples, n_nonzero_parameters)

    # r2 score
    r2 = r2_score(y_target, y_predicted)

    # summarize; n_parameters was documented but previously not returned
    metrics = dict(rmse_train=rmse,
                   R2_train=r2,
                   AIC=aic,
                   BIC=bic,
                   n_parameters=n_parameters_tot,
                   n_nonzero_parameters=n_nonzero_parameters)
    return metrics
def get_aic(mse: float, n_samples: int, n_parameters: int) -> float:
    """Returns the Akaike information criterion (AIC).

    Parameters
    ----------
    mse
        mean squared error over the training set
    n_samples
        number of training samples
    n_parameters
        number of (non-zero) model parameters
    """
    aic = n_samples * np.log(mse) + 2 * n_parameters
    return aic
def get_bic(mse: float, n_samples: int, n_parameters: int) -> float:
    """Returns the Bayesian information criterion (BIC).

    Parameters
    ----------
    mse
        mean squared error over the training set
    n_samples
        number of training samples
    n_parameters
        number of (non-zero) model parameters
    """
    # BIC = n ln(mse) + k ln(n)
    penalty = np.log(n_samples) * n_parameters
    return n_samples * np.log(mse) + penalty
def estimate_loocv(A: np.ndarray,
                   y_target: np.ndarray,
                   y_predicted: np.ndarray) -> float:
    """Calculates the approximative leave-one-out cross-validation
    (LOO-CV) root mean square error (RMSE).

    Parameters
    ----------
    A
        Matrix in OLS problem y=Ax, should be invertible (A.T A
        non-singular); must have at least as many rows as columns
    y_target
        Target values for y
    y_predicted
        OLS obtained prediction for y

    Raises
    ------
    ValueError
        if A has more columns than rows (underdetermined system)
    """
    # use the shape directly; the previous check indexed row/column 1,
    # which raised IndexError for matrices with a single row or column
    n_rows, n_cols = A.shape
    if n_cols > n_rows:
        raise ValueError('Matrix is underdetermined')
    # hat (projection) matrix of the OLS problem
    H = A.dot(np.linalg.inv(A.T.dot(A))).dot(A.T)
    # standard PRESS formula: residuals scaled by the leverage
    e = (y_target - y_predicted) / (1 - np.diag(H))
    return np.linalg.norm(e) / np.sqrt(len(e))
| import pickle | ||
| def _write_pickle(fname, data): | ||
| """ Write data to pickle file with filename fname """ | ||
| with open(fname, 'wb') as handle: | ||
| pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL) | ||
| def _read_pickle(fname): | ||
| """ Read pickle file and return content """ | ||
| with open(fname, 'rb') as handle: | ||
| return pickle.load(handle) |
| import numpy as np | ||
| from itertools import product | ||
def write_fcs_gpumd(fname_fc, fname_clusters, fcs, order, tol=1e-10):
    """
    Writes force constants of given order in GPUMD format.

    Parameters
    ----------
    fname_fc : str
        name of file which contains the lookup force constants
    fname_clusters : str
        name of file which contains the clusters and the fc lookup index
    fcs : ForceConstants
        force constants
    order : int
        force constants for this order will be written to file
    tol : float
        if the norm of a force constant is less than tol then it is not written.
        if two force-constants are within tol; they are considered equal.
    """
    # group clusters that share an identical force constant, then write
    # the two lookup files GPUMD expects (clusters + unique fcs)
    cluster_lookup, fc_lookup = _get_lookup_data_smart(fcs, order, tol)
    _write_clusters(fname_clusters, cluster_lookup, order)
    _write_fc_lookup(fname_fc, fc_lookup, order)
| def _write_fc_lookup(fname, fc_lookup, order): | ||
| """ Writes the lookup force constants to file """ | ||
| fmt = '{}' + ' {}'*order | ||
| with open(fname, 'w') as f: | ||
| f.write(str(len(fc_lookup)) + '\n\n') | ||
| for fc in fc_lookup: | ||
| for xyz in product(range(3), repeat=order): | ||
| f.write(fmt.format(*xyz, fc[xyz])+'\n') | ||
| f.write('\n') | ||
| def _write_clusters(fname, cluster_lookup, order): | ||
| """ Writes the cluster lookup to file """ | ||
| fmt = '{}' + ' {}'*order | ||
| with open(fname, 'w') as f: | ||
| f.write(str(len(cluster_lookup)) + '\n\n') | ||
| for c, i in cluster_lookup.items(): | ||
| line = fmt.format(*c, i) + '\n' | ||
| f.write(line) | ||
def _get_lookup_data_smart(fcs, order, tol):
    """ Groups force constants for a given order into groups for which the
    force constant is identical.

    Parameters
    ----------
    fcs : ForceConstants
        force constant container; accessed via ``_fc_dict`` and indexing
    order : int
        only clusters of this size are considered
    tol : float
        two force constants within tol of each other (Frobenius norm) are
        considered equal; force constants with norm below tol are skipped

    Returns
    -------
    tuple(dict, list)
        map from cluster to index into the list of unique force constants,
        and the list of unique force constants
    """
    fc_lookup = []
    cluster_lookup = dict()
    # axes to sum over when computing per-fc norms in one vectorized call
    # (axis 0 enumerates the clusters)
    axis = tuple(range(1, order+1))

    # keep only clusters of the requested order with non-negligible fc
    clusters = [c for c in fcs._fc_dict.keys() if len(c) == order and np.linalg.norm(fcs[c]) > tol]
    fc_all = np.array([fcs[c] for c in clusters])

    indices = list(range(len(clusters)))
    while len(indices) > 0:
        # compare all remaining fcs against the first remaining one
        i = indices[0]
        delta = fc_all[indices] - fc_all[i]
        delta_norm = np.sqrt(np.sum(delta**2, axis=axis))
        inds_to_del = [indices[x] for x in np.where(delta_norm < tol)[0]]
        # the reference fc always matches itself
        assert i in inds_to_del
        fc_lookup.append(fc_all[i])
        # all matching clusters point at the fc just appended
        for j in inds_to_del:
            indices.remove(j)
            cluster_lookup[clusters[j]] = len(fc_lookup)-1
    return cluster_lookup, fc_lookup
def _get_lookup_data_naive(fcs, order, tol):
    """ Groups force constants for a given order into groups for which the
    force constant is identical.

    Linear scan: each new force constant is compared against every unique
    force constant found so far.
    """
    fc_lookup = []
    cluster_lookup = dict()
    for cluster in (c for c in fcs._fc_dict.keys() if len(c) == order):
        fc = fcs[cluster]
        # skip negligible force constants
        if np.linalg.norm(fc) < tol:
            continue
        match = None
        for index, fc_ref in enumerate(fc_lookup):
            if np.linalg.norm(fc - fc_ref) < tol:
                match = index
                break
        if match is None:
            # new unique force constant
            cluster_lookup[cluster] = len(fc_lookup)
            fc_lookup.append(fc)
        else:
            cluster_lookup[cluster] = match
    return cluster_lookup, fc_lookup
def write_fcp_txt(fname, path, n_types, max_order):
    """ Write driver potential file for GPUMD.

    Parameters
    ----------
    fname : str
        file name
    path : str
        path to directory with force constant file
    n_types : int
        number of atom types
    max_order : int
        maximum order of the force constant potential

    Format
    ------
    Format is a simple file containing the following

        fcp number_of_atom_types
        highest_order
        path_to_force_constant_files

    which in practice for a binary system with a sixth order model would mean

        fcp 2
        6
        /path/to/your/folder
    """
    # GPUMD expects the directory path without a trailing '/'
    lines = ['fcp {}'.format(n_types),
             '{}'.format(max_order),
             path.rstrip('/')]
    with open(fname, 'w') as fobj:
        fobj.write('\n'.join(lines))
def write_r0(fname, atoms):
    """
    Write GPUMD r0 file, with reference atomic positions.

    Parameters
    ----------
    fname : str
        name of file to which to write the atomic positions
    atoms : ase.Atoms
        input structure
    """
    with open(fname, 'w') as fobj:
        for atom in atoms:
            x, y, z = atom.position
            fobj.write('{} {} {}\n'.format(x, y, z))
| """ This module contains functions and variables to control hiPhive's logging | ||
| * `logger` - the module logger | ||
| * set_config - function to control level and logfile | ||
| """ | ||
| import logging | ||
| import sys | ||
| from timeit import default_timer as timer | ||
# This is the root logger of hiPhive
logger = logging.getLogger('hiphive')

# Will process all levels of INFO or higher
logger.setLevel(logging.INFO)

# If you know what you are doing you may set this to True
# (propagating would forward records to the root logger as well)
logger.propagate = False

# The hiPhive logger collects events from its children; the default
# behaviour is to print them directly to stdout
ch = logging.StreamHandler(sys.stdout)
logger.addHandler(ch)

# module-level flag toggled via set_config(); controls whether Progress
# continuously rewrites its status line
continuous_logging = False
| # TODO: use Context management protocol instead | ||
class Progress:
    """ Progress bar like functionality.

    Parameters
    ----------
    tot : int
        total number of expected ticks; if None the total is unknown
    mode : str
        display mode (currently only 'frac' is used)
    estimate_remaining : bool
        whether to print an estimate of the remaining time; requires tot
    """

    def __init__(self, tot=None, mode='frac', estimate_remaining=True):
        if tot is None:
            self._tot = '?'
            # cannot estimate remaining time without knowing the total
            assert not estimate_remaining
        else:
            self._tot = tot
        self._progress = 0
        self._estimate_remaining = estimate_remaining
        self._last_update = 0
        self._start = timer()

    def tick(self):
        """ Advances the progress by one step and, if continuous logging is
        enabled, refreshes the status line at most every 2 seconds. """
        self._progress += 1
        delta = timer() - self._last_update
        if continuous_logging and delta > 2:
            self._last_update = timer()
            print('\r' + ' ' * 70 + '\r', end='', flush=True)
            if self._tot == '?':
                # bug fix: total unknown; dividing by '?' raised TypeError
                print('{}/?'.format(self._progress), end='', flush=True)
            else:
                print('{}/{}={:.3%}'.format(self._progress, self._tot,
                                            self._progress / self._tot),
                      end='', flush=True)
            if self._estimate_remaining and self._tot != '?':
                # linear extrapolation from the average time per tick
                remaining_time = (self._tot - self._progress) * (
                    timer() - self._start) / self._progress
                print(' time remaining: {:.3}'.format(remaining_time),
                      end='', flush=True)

    def close(self):
        """ Clears the status line and logs the total elapsed wall time. """
        if continuous_logging:
            print('\r' + ' ' * 70 + '\r', end='', flush=True)
        s = timer() - self._start
        d, remainder = divmod(s, 60 * 60 * 24)
        h, remainder = divmod(remainder, 60 * 60)
        m, s = divmod(remainder, 60)
        logger.info('Done in {}d {}h {}m {:.3}s'
                    .format(int(d), int(h), int(m), s))
def set_config(filename=None, level=None, continuous=None):
    """
    Alters the way logging is handled.

    Parameters
    ----------
    filename : str
        name of file the log should be sent to
    level : int
        verbosity level; see `Python logging library
        <https://docs.python.org/3/library/logging.html>`_ for details
    continuous : bool
        if True the progress will be continously updated
    """
    if filename is not None:
        # in addition to stdout, also send the log to a file
        file_handler = logging.FileHandler(filename)
        logger.addHandler(file_handler)

    if level is not None:
        logger.setLevel(level)

    if continuous is not None:
        global continuous_logging
        continuous_logging = continuous
| """ | ||
| This module provides functions for reading and writing data files | ||
| in phonopy and phono3py formats. | ||
| """ | ||
| import os | ||
| import warnings | ||
| from itertools import product | ||
| import hiphive | ||
| import numpy as np | ||
| from .logging_tools import logger | ||
| with warnings.catch_warnings(): | ||
| warnings.filterwarnings('ignore', category=FutureWarning) | ||
| import h5py | ||
| logger = logger.getChild('io_phonopy') | ||
| def _filename_to_format(filename: str) -> str: | ||
| """ Tries to guess the format from the filename """ | ||
| basename = os.path.basename(filename) | ||
| if basename == 'FORCE_CONSTANTS': | ||
| return 'text' | ||
| if '.' in basename: | ||
| extension = basename.split('.')[-1] | ||
| if extension == 'hdf5': | ||
| return 'hdf5' | ||
| raise ValueError('Could not guess file format') | ||
def read_phonopy_fc2(filename: str, format: str = None) -> np.ndarray:
    """Parses a second-order force constant file in phonopy format.

    Parameters
    ----------
    filename : str
        input file name
    format : str
        specify the file-format, if None tries to guess the format from filename;
        possible values: "text", "hdf5"

    Returns
    -------
    numpy.ndarray
        second-order force constant matrix (`N,N,3,3` array)
        where `N` is the number of atoms
    """
    if format is None:
        format = _filename_to_format(filename)
    if format == 'text':
        return _read_phonopy_fc2_text(filename)
    if format == 'hdf5':
        return _read_phonopy_fc2_hdf5(filename)
    raise ValueError('Did not recognize format {}'.format(format))
def write_phonopy_fc2(filename: str,
                      fc2,
                      format: str = None) -> None:
    """Writes second-order force constant matrix in phonopy format.

    Parameters
    ----------
    filename : str
        output file name
    fc2 : ForceConstants or numpy.ndarray
        second-order force constant matrix
    format : str
        specify the file-format, if None try to guess the format from filename;
        possible values: "text", "hdf5"
    """
    # obtain a plain array regardless of the input type
    if isinstance(fc2, hiphive.ForceConstants):
        fc2_array = fc2.get_fc_array(order=2)
    elif isinstance(fc2, np.ndarray):
        fc2_array = fc2
    else:
        raise TypeError('fc2 should be ForceConstants or NumPy array')

    # sanity check of the array shape
    n_atoms = fc2_array.shape[0]
    if fc2_array.shape != (n_atoms, n_atoms, 3, 3):
        raise ValueError('fc2 has wrong shape')

    # dispatch on (possibly guessed) format
    if format is None:
        format = _filename_to_format(filename)
    if format == 'text':
        _write_phonopy_fc2_text(filename, fc2_array)
    elif format == 'hdf5':
        _write_phonopy_fc2_hdf5(filename, fc2_array)
    else:
        raise ValueError('Did not recognize format {}'.format(format))
def read_phonopy_fc3(filename: str) -> np.ndarray:
    """Parses a third order force constant file in phonopy hdf5 format.

    Parameters
    ----------
    filename : str
        input file name

    Returns
    -------
    numpy.ndarray
        third order force constant matrix
    """
    with h5py.File(filename, 'r') as hf:
        if 'fc3' not in hf:
            raise IOError('Could not find fc3 in file {}'.format(filename))
        fc3 = hf['fc3'][:]
    return fc3
def write_phonopy_fc3(filename: str, fc3) -> None:
    """Writes third order force constant matrix in phonopy hdf5 format.

    Parameters
    ----------
    filename : str
        output file name
    fc3 : ForceConstants or numpy.ndarray
        third order force constant matrix
    """
    # obtain a plain array regardless of the input type
    if isinstance(fc3, hiphive.ForceConstants):
        fc3_array = fc3.get_fc_array(order=3)
    elif isinstance(fc3, np.ndarray):
        fc3_array = fc3
    else:
        raise TypeError('fc3 should be ForceConstants or NumPy array')

    # sanity check of the array shape
    n_atoms = fc3_array.shape[0]
    if fc3_array.shape != (n_atoms,) * 3 + (3,) * 3:
        raise ValueError('fc3 has wrong shape')

    with h5py.File(filename, 'w') as hf:
        hf.create_dataset('fc3', data=fc3_array, compression='gzip')
        hf.flush()
| def _read_phonopy_fc2_text(filename: str) -> np.ndarray: | ||
| """ Reads phonopy-fc2 file in text format. """ | ||
| with open(filename, 'r') as f: | ||
| # read shape of force constants | ||
| line = f.readline() | ||
| line_ints = [int(x) for x in line.split()] | ||
| if len(line_ints) == 1: | ||
| n_atoms = line_ints[0] | ||
| elif len(line_ints) == 2: | ||
| assert line_ints[0] == line_ints[1] | ||
| n_atoms = line_ints[0] | ||
| else: | ||
| raise ValueError('Unsupported or incorrect phonopy format') | ||
| fc2 = np.full((n_atoms, n_atoms, 3, 3), np.nan) | ||
| # read force constants | ||
| lines = f.readlines() | ||
| for n, line in enumerate(lines): | ||
| flds = line.split() | ||
| if len(flds) == 2: | ||
| i = int(flds[0]) - 1 # phonopy index starts with 1 | ||
| j = int(flds[1]) - 1 | ||
| for x in range(3): | ||
| fc_row = lines[n + x + 1].split() | ||
| for y in range(3): | ||
| fc2[i][j][x][y] = float(fc_row[y]) | ||
| return fc2 | ||
def _read_phonopy_fc2_hdf5(filename: str) -> np.ndarray:
    """ Reads phonopy-fc2 file in hdf5 format. """
    with h5py.File(filename, 'r') as hf:
        # phonopy has used both dataset names over time; prefer the older
        for key in ('force_constants', 'fc2'):
            if key in hf:
                return hf[key][:]
    raise IOError('Could not find fc2 in file {}'.format(filename))
| def _write_phonopy_fc2_text(filename: str, fc2: np.ndarray) -> None: | ||
| """ Writes second-order force constants to file in text format. """ | ||
| n_atoms = fc2.shape[0] | ||
| with open(filename, 'w') as f: | ||
| f.write('{:5d} {:5d}\n'.format(n_atoms, n_atoms)) | ||
| for i, j in product(range(n_atoms), repeat=2): | ||
| fc2_ij = fc2[(i, j)] | ||
| if fc2_ij.shape != (3, 3): | ||
| raise ValueError('Invalid shape for fc2[({},{})]'.format(i, j)) | ||
| f.write('{:-5d}{:5d}\n'.format(i + 1, j + 1)) | ||
| for row in fc2_ij: | ||
| f.write((3*' {:22.15f}'+'\n').format(*tuple(row))) | ||
def _write_phonopy_fc2_hdf5(filename: str, fc2: np.ndarray) -> None:
    """ Writes second-order force constants to file in hdf5 format.

    Parameters
    ----------
    filename
        output file name
    fc2
        second-order force constant matrix (N, N, 3, 3)
    """
    with h5py.File(filename, 'w') as hf:
        hf.create_dataset('fc2', data=fc2, compression='gzip')
        hf.flush()
| from itertools import product | ||
| import numpy as np | ||
| def _obj2str(a, none_char='-'): | ||
| """ Casts object a to str. """ | ||
| if isinstance(a, float): | ||
| # if the float is 2.49999999 then round | ||
| if str(a)[::-1].find('.') > 5: | ||
| return '{:.5f}'.format(a) | ||
| elif a is None: | ||
| return none_char | ||
| return str(a) | ||
| _array2str = np.vectorize(_obj2str) | ||
def print_table(matrix, sum_=False):
    """ Prints matrix data in a nice table format.

    The matrix element matrix[i][j] should correspond to information about
    order j+2 and n-body i+1.

    Example
    --------
    >> matrix = numpy.array([[None, None], [4.0, 3.0]])
    >> print_table(matrix)

       body/order |  2  |  3
      ------------------------
           1      |  -  |  -
           2      | 4.0 | 3.0

    Parameters
    ----------
    matrix : numpy.ndarray
        matrix to be printed
    sum_ : bool
        whether or not to print the sum along each row and column
    """
    print(table_array_to_string(matrix, sum_))
def table_array_to_string(matrix, sum_=False):
    """ Generate nice table string from a numpy array with floats/ints """
    # build the object table, stringify each cell, then lay out the rows
    filled = _generate_table_array(matrix, sum_)
    return _generate_table_str(_array2str(filled))
def _generate_table_array(table_array, sum_=False):
    """ Generate table in numpy array format.

    The input data matrix is embedded into a larger object array with a
    header row (orders), a header column (n-body), and optionally an extra
    row/column holding the sums.
    """
    # initialize table frame (headers and, if requested, sum row/column)
    n_rows, n_cols = table_array.shape
    A = _build_table_frame(order=n_cols+1, nbody=n_rows, sum_=sum_)

    # fill table; entry (nbody, order) sits at A[nbody, order-1] since
    # orders start at 2 while data columns start at 0
    for order, nbody in product(range(2, n_cols+2), range(1, n_rows+1)):
        if nbody <= order:
            A[nbody, order-1] = table_array[nbody-1, order-2]

    if sum_:
        # row sums in the last column, column sums in the last row;
        # None entries (e.g. nbody > order) are excluded from the sums
        for i, row in enumerate(A[1:-1, 1:-1], start=1):
            A[i, -1] = sum(val for val in row if val is not None)
        for i, col in enumerate(A[1:-1, 1:-1].T, start=1):
            A[-1, i] = sum(val for val in col if val is not None)
        # bottom-right corner carries no meaningful sum
        A[-1, -1] = ''
    return A
| def _generate_table_str(table_array): | ||
| """ Generate a string from a numpy array of strings """ | ||
| table_str = [] | ||
| n_rows, n_cols = table_array.shape | ||
| # find maximum widths for each column | ||
| widths = [] | ||
| for i in range(n_cols): | ||
| widths.append(max(len(val) for val in table_array[:, i])+2) | ||
| # formatting str for each row | ||
| row_format = '|'.join('{:^'+str(width)+'}' for width in widths) | ||
| # finalize | ||
| for i in range(n_rows): | ||
| if i == 1: | ||
| table_str.append('-' * (sum(widths)+n_cols-1)) | ||
| table_str.append(row_format.format(*table_array[i, :])) | ||
| table_str = '\n'.join(table_str) | ||
| return table_str | ||
| def _build_table_frame(order, nbody, sum_=False): | ||
| """ Builds/initializes table/array. """ | ||
| if sum_: | ||
| A = np.empty((nbody+2, order+1), dtype='object') | ||
| A[0, -1] = 'sum' | ||
| A[-1, 0] = 'sum' | ||
| else: | ||
| A = np.empty((nbody+1, order), dtype='object') | ||
| A[0][0] = 'body/order' | ||
| A[0, 1:order] = range(2, order+1) | ||
| A[1:nbody+1, 0] = range(1, nbody+1) | ||
| return A | ||
| if __name__ == "__main__": | ||
| # input dummy cutoff table | ||
| # insert row for nbody=1 | ||
| cutoffs = np.array([[None, None, None, None, None], | ||
| [6.0, 6.0, 6.0, 3.7, 3.7], | ||
| [5.0, 5.0, 5.0, 3.0, 3.0], | ||
| [3.7, 3.7, 3.7, 0.0, 0.0]]) | ||
| # input dummy cluster count table | ||
| cluster_counts = np.array([[1, 3, 5, 5, 2], | ||
| [12, 22, 39, 42, 58], | ||
| [19, 41, 123, 421, 912], | ||
| [42, 112, 410, 617, 3271]]) | ||
| print_table(cutoffs) | ||
| print('\n') | ||
| print_table(cluster_counts, sum_=False) | ||
| print('\n') | ||
| print_table(cluster_counts, sum_=True) |
| """ | ||
| Helper functions for reading and writing objects to tar files | ||
| """ | ||
| import pickle | ||
| import tempfile | ||
| import warnings | ||
| import ase.io as aseIO | ||
| with warnings.catch_warnings(): | ||
| warnings.filterwarnings('ignore', category=FutureWarning) | ||
| import h5py | ||
def add_ase_atoms_to_tarfile(tar_file, atoms, arcname, format='json'):
    """ Adds an ase.Atoms object to tar_file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive to add the structure to
    atoms : ase.Atoms
        structure to store
    arcname : str
        name of the member inside the archive
    format : str
        ase.io format used for serialization
    """
    # use a context manager so the temporary file (and its descriptor) is
    # closed once the member has been added; previously it leaked until GC
    with tempfile.NamedTemporaryFile() as temp_file:
        aseIO.write(temp_file.name, atoms, format=format)
        temp_file.seek(0)
        tar_info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
        tar_file.addfile(tar_info, temp_file)
def read_ase_atoms_from_tarfile(tar_file, arcname, format='json'):
    """ Reads ase.Atoms from tar file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive to read from
    arcname : str
        name of the member inside the archive
    format : str
        ase.io format used for deserialization
    """
    # close the temporary file deterministically; previously it leaked
    # a file descriptor until garbage collection
    with tempfile.NamedTemporaryFile() as temp_file:
        temp_file.write(tar_file.extractfile(arcname).read())
        temp_file.seek(0)
        atoms = aseIO.read(temp_file.name, format=format)
    return atoms
def add_items_to_tarfile_hdf5(tar_file, items, arcname):
    """ Add items to one hdf5 file """
    with tempfile.NamedTemporaryFile() as temp_file:
        # write all items into a single hdf5 file
        with h5py.File(temp_file.name, 'w') as hf:
            for key, value in items.items():
                hf.create_dataset(key, data=value, compression='gzip')
        # then add that file as one archive member
        temp_file.seek(0)
        tar_info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
        tar_file.addfile(tar_info, temp_file)
def add_items_to_tarfile_pickle(tar_file, items, arcname):
    """ Add items by pickling them """
    with tempfile.TemporaryFile() as temp_file:
        pickle.dump(items, temp_file)
        temp_file.seek(0)
        tar_info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
        tar_file.addfile(tar_info, temp_file)
def add_items_to_tarfile_custom(tar_file, items):
    """ Add items assuming they have a custom write function """
    # each item becomes its own archive member named after its key
    for arcname, obj in items.items():
        with tempfile.TemporaryFile() as temp_file:
            obj.write(temp_file)
            temp_file.seek(0)
            tar_info = tar_file.gettarinfo(arcname=arcname,
                                           fileobj=temp_file)
            tar_file.addfile(tar_info, temp_file)
def add_list_to_tarfile_custom(tar_file, objects, arcname):
    """ Add list of objects assuming they have a custom write function """
    # members are named <arcname>_0, <arcname>_1, ... so the list can be
    # reconstructed in order by read_list_custom
    for index, obj in enumerate(objects):
        with tempfile.TemporaryFile() as temp_file:
            obj.write(temp_file)
            temp_file.seek(0)
            member_name = '{}_{}'.format(arcname, index)
            tar_info = tar_file.gettarinfo(arcname=member_name,
                                           fileobj=temp_file)
            tar_file.addfile(tar_info, temp_file)
def read_items_hdf5(tar_file, arcname):
    """ Read items from hdf5file inside tar_file """
    # dump the member to a named temporary file so h5py can open it by path
    with tempfile.NamedTemporaryFile() as temp_file:
        temp_file.write(tar_file.extractfile(arcname).read())
        temp_file.seek(0)
        with h5py.File(temp_file.name, 'r') as hf:
            items = {key: value[:] for key, value in hf.items()}
    return items
def read_items_pickle(tar_file, arcname):
    """ Read pickled items from member arcname inside tar_file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive to read from
    arcname : str
        name of the member holding the pickled items
    """
    # the previous `items = dict()` was dead code; the pickle payload
    # is returned directly
    return pickle.load(tar_file.extractfile(arcname))
def read_list_custom(tar_file, arcname, read_function, **kwargs):
    """ Read a list of objects written by add_list_to_tarfile_custom.

    Members <arcname>_0, <arcname>_1, ... are read in order until no
    further member is found.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive to read from
    arcname : str
        common prefix of the member names
    read_function : callable
        function used to deserialize each member file object
    """
    objects = []
    i = 0
    while True:
        fname = '{}_{}'.format(arcname, i)
        # only the member lookup may legitimately raise KeyError; the
        # previous version also swallowed KeyErrors raised inside
        # read_function, silently truncating the list
        try:
            f = tar_file.extractfile(fname)
        except KeyError:
            break
        try:
            objects.append(read_function(f, **kwargs))
        finally:
            f.close()
        i += 1
    return objects
| """ | ||
| The ``io.shengBTE`` module provides functions for reading and writing data | ||
| files in shengBTE format. | ||
| """ | ||
| import numpy as np | ||
| from itertools import product | ||
| from collections import namedtuple | ||
| from ..core.structures import Supercell | ||
| import hiphive | ||
def read_shengBTE_fc3(filename, prim, supercell, symprec=1e-5):
    """Reads third-order force constants file in shengBTE format.

    Parameters
    ----------
    filename : str
        input file name
    prim : ase.Atoms
        primitive cell for the force constants
    supercell : ase.Atoms
        supercell onto which to map force constants
    symprec : float
        structural symmetry tolerance

    Returns
    -------
    fcs : ForceConstants
        third order force constants for the specified supercell
    """
    # parse the raw text blocks, convert them to namedtuple entries with
    # integer lattice offsets, then map onto the requested supercell
    raw_sheng = _read_raw_sheng(filename)
    sheng = _raw_to_fancy(raw_sheng, prim.cell)
    fcs = _sheng_to_fcs(sheng, prim, supercell, symprec)
    return fcs
def write_shengBTE_fc3(filename, fcs, prim, symprec=1e-5, cutoff=np.inf,
                       fc_tol=1e-8):
    """Writes third-order force constants file in shengBTE format.

    Parameters
    -----------
    filename : str
        output file name
    fcs : ForceConstants
        force constants; the supercell associated with this object must be
        based on prim
    prim : ase.Atoms
        primitive configuration (must be equivalent to structure used in the
        shengBTE calculation)
    symprec : float
        structural symmetry tolerance
    cutoff : float
        all atoms in cluster must be within this cutoff
    fc_tol : float
        if the absolute value of the largest entry in a force constant is less
        than fc_tol it will not be written
    """
    # convert to namedtuple entries, strip back to the raw list format,
    # and serialize to disk
    sheng = _fcs_to_sheng(fcs, prim, symprec, cutoff, fc_tol)
    raw_sheng = _fancy_to_raw(sheng)
    _write_raw_sheng(raw_sheng, filename)
| def _read_raw_sheng(filename): | ||
| """ Read shengBTE fc3 file. | ||
| Parameters | ||
| ---------- | ||
| filename : str | ||
| input file name | ||
| Returns | ||
| ------- | ||
| list | ||
| list with shengBTE block entries, where an entry consist of | ||
| [i, j, k, cell_pos2, cell_pos3, fc3] | ||
| """ | ||
| lines_per_fc_block = 32 | ||
| fc3_data_shengBTE = [] | ||
| with open(filename, 'r') as f: | ||
| lines = f.readlines() | ||
| num_fcs = int(lines[0]) | ||
| lind = 2 | ||
| for i in range(1, num_fcs+1): | ||
| # sanity check | ||
| assert int(lines[lind]) == i, (int(lines[lind]), i) | ||
| # read cell poses | ||
| cell_pos2 = np.array([float(fld) for fld in lines[lind+1].split()]) | ||
| cell_pos3 = np.array([float(fld) for fld in lines[lind+2].split()]) | ||
| # read basis indices | ||
| i, j, k = (int(fld) for fld in lines[lind+3].split()) | ||
| # read fc | ||
| fc3_ijk = np.zeros((3, 3, 3)) | ||
| for n in range(27): | ||
| x, y, z = [int(fld) - 1 for fld in lines[lind+4+n].split()[:3]] | ||
| fc3_ijk[x, y, z] = float(lines[lind+4+n].split()[-1]) | ||
| # append entry | ||
| entry = [i, j, k, cell_pos2, cell_pos3, fc3_ijk] | ||
| fc3_data_shengBTE.append(entry) | ||
| lind += lines_per_fc_block | ||
| return fc3_data_shengBTE | ||
| _ShengEntry = namedtuple('Entry', ['site_0', 'site_1', 'site_2', 'pos_1', | ||
| 'pos_2', 'fc', 'offset_1', 'offset_2']) | ||
| def _raw_to_fancy(raw_sheng, cell): | ||
| """ | ||
| Converts force constants as read from shengBTE file to the namedtuple | ||
| format defined above (_ShengEntry). | ||
| """ | ||
| sheng = [] | ||
| for raw_entry in raw_sheng: | ||
| p1, p2 = raw_entry[3:5] | ||
| offset_1 = np.linalg.solve(cell.T, p1).round(0).astype(int) | ||
| offset_2 = np.linalg.solve(cell.T, p2).round(0).astype(int) | ||
| entry = _ShengEntry(*(i - 1 for i in raw_entry[:3]), *raw_entry[3:], | ||
| offset_1, offset_2) | ||
| sheng.append(entry) | ||
| return sheng | ||
| def _fancy_to_raw(sheng): | ||
| """ | ||
| Converts force constants namedtuple format defined above (_ShengEntry) to | ||
| format used for writing shengBTE files. | ||
| """ | ||
| raw_sheng = [] | ||
| for entry in sheng: | ||
| raw_entry = list(entry[:6]) | ||
| raw_entry[0] += 1 | ||
| raw_entry[1] += 1 | ||
| raw_entry[2] += 1 | ||
| raw_sheng.append(raw_entry) | ||
| return raw_sheng | ||
| def _write_raw_sheng(raw_sheng, filename): | ||
| """ See corresponding read function. """ | ||
| with open(filename, 'w') as f: | ||
| f.write('{}\n\n'.format(len(raw_sheng))) | ||
| for index, fc3_row in enumerate(raw_sheng, start=1): | ||
| i, j, k, cell_pos2, cell_pos3, fc3_ijk = fc3_row | ||
| f.write('{:5d}\n'.format(index)) | ||
| f.write((3*'{:14.10f} '+'\n').format(*cell_pos2)) | ||
| f.write((3*'{:14.10f} '+'\n').format(*cell_pos3)) | ||
| f.write((3*'{:5d}'+'\n').format(i, j, k)) | ||
| for x, y, z in product(range(3), repeat=3): | ||
| f.write((3*' {:}').format(x+1, y+1, z+1)) | ||
| f.write(' {:14.10f}\n'.format(fc3_ijk[x, y, z])) | ||
| f.write('\n') | ||
def _fcs_to_sheng(fcs, prim, symprec, cutoff, fc_tol):
    """ Produces a list containing fcs in shengBTE format

    prim and fcs.supercell must be aligned.

    Parameters
    ----------
    fcs : ForceConstants
        third-order force constants with an associated supercell
    prim : ase.Atoms
        primitive configuration the supercell is based on
    symprec : float
        tolerance when comparing distance vectors
    cutoff : float
        all atoms in a cluster must be within this cutoff
    fc_tol : float
        entries whose largest absolute component is below fc_tol are dropped
    """
    supercell = Supercell(fcs.supercell, prim, symprec)
    assert all(fcs.supercell.pbc) and all(prim.pbc)

    n_atoms = len(supercell)

    # M[i, j] is True if the direct distance vector between atoms i and j
    # equals the minimum-image one (i.e. the pair is unambiguous in this
    # supercell) and the pair is within the cutoff
    D = fcs.supercell.get_all_distances(mic=False, vector=True)
    D_mic = fcs.supercell.get_all_distances(mic=True, vector=True)
    M = np.eye(n_atoms, dtype=bool)
    for i in range(n_atoms):
        for j in range(i + 1, n_atoms):
            M[i, j] = (np.allclose(D[i, j], D_mic[i, j], atol=symprec, rtol=0)
                       and np.linalg.norm(D[i, j]) < cutoff)
            M[j, i] = M[i, j]

    # collect one force constant per (sites, offsets) key; all triplets
    # mapping to the same key must carry the same fc (checked by assert)
    data = {}
    for a0 in supercell:
        for a1 in supercell:
            if not M[a0.index, a1.index]:
                continue
            for a2 in supercell:
                if not (M[a0.index, a2.index] and M[a1.index, a2.index]):
                    continue
                # offsets of atoms 1 and 2 relative to atom 0
                offset_1 = np.subtract(a1.offset, a0.offset)
                offset_2 = np.subtract(a2.offset, a0.offset)
                sites = (a0.site, a1.site, a2.site)
                key = sites + tuple(offset_1) + tuple(offset_2)
                ijk = (a0.index, a1.index, a2.index)
                fc = fcs[ijk]
                if key in data:
                    assert np.allclose(data[key], fc, atol=fc_tol)
                else:
                    data[key] = fc

    # convert the collected entries to _ShengEntry namedtuples, skipping
    # negligible force constants
    sheng = []
    for k, fc in data.items():
        if np.max(np.abs(fc)) < fc_tol:
            continue
        offset_1 = k[3:6]
        pos_1 = np.dot(offset_1, prim.cell)
        offset_2 = k[6:9]
        pos_2 = np.dot(offset_2, prim.cell)
        entry = _ShengEntry(*k[:3], pos_1, pos_2, fc, offset_1, offset_2)
        sheng.append(entry)

    return sheng
def _sheng_to_fcs(sheng, prim, supercell, symprec):
    """ Maps shengBTE entries (_ShengEntry) onto a supercell and returns the
    corresponding ForceConstants object.

    Parameters
    ----------
    sheng : list(_ShengEntry)
        entries as produced by _raw_to_fancy
    prim : ase.Atoms
        primitive cell underlying the entries
    supercell : ase.Atoms
        supercell onto which to map the force constants
    symprec : float
        structural symmetry tolerance
    """
    supercell_map = Supercell(supercell, prim, symprec)
    fc_array = np.zeros((len(supercell),) * 3 + (3,) * 3)
    # keep track of which triplets have been assigned so that conflicting
    # assignments (supercell too small) can be detected
    mapped_clusters = np.zeros((len(supercell),) * 3, dtype=bool)

    for atom in supercell_map:
        i = atom.index
        for entry in sheng:
            if atom.site != entry.site_0:
                continue
            # translate the entry's relative offsets by the current atom's
            # offset to find the partner atoms in the supercell
            j = supercell_map.index(entry.site_1, entry.offset_1 + atom.offset)
            k = supercell_map.index(entry.site_2, entry.offset_2 + atom.offset)
            ijk = (i, j, k)
            if mapped_clusters[ijk]:
                raise Exception('Ambiguous force constant.'
                                ' Supercell is too small')
            fc_array[ijk] = entry.fc
            mapped_clusters[ijk] = True

    fcs = hiphive.force_constants.ForceConstants.from_arrays(
        supercell, fc3_array=fc_array)
    return fcs
| Metadata-Version: 1.2 | ||
| Name: hiphive | ||
| Version: 0.6 | ||
| Version: 0.7 | ||
| Summary: High-order force constants for the masses | ||
@@ -16,3 +16,4 @@ Home-page: http://hiphive.materialsmodeling.org/ | ||
| extensive tutorial can be found in the | ||
| `user guide <https://hiphive.materialsmodeling.org/>`_. | ||
| `user guide <https://hiphive.materialsmodeling.org/>`_. Complete examples of | ||
| using hiphive for force constants extraction can be found at `hiphive examples <https://gitlab.com/materials-modeling/hiphive-examples/>`_.
@@ -83,4 +84,4 @@ **hiPhive** is written in Python, which allows | ||
| Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>` page | ||
| of the documentation for additional references. | ||
| Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>`_ | ||
| page of the documentation for additional references. | ||
@@ -87,0 +88,0 @@ **hiphive** and its development are hosted on |
@@ -5,4 +5,5 @@ ase | ||
| numpy>=1.12 | ||
| sklearn | ||
| scikit-learn | ||
| scipy | ||
| spglib | ||
| sympy | ||
| sympy>=1.1 |
@@ -39,12 +39,14 @@ README.rst | ||
| hiphive/fitting/fit_methods.py | ||
| hiphive/fitting/io.py | ||
| hiphive/fitting/model_selection.py | ||
| hiphive/fitting/oi.py | ||
| hiphive/fitting/optimizer.py | ||
| hiphive/fitting/split_bregman.py | ||
| hiphive/fitting/tools.py | ||
| hiphive/io/__init__.py | ||
| hiphive/io/logging.py | ||
| hiphive/io/phonopy.py | ||
| hiphive/io/pretty_table_prints.py | ||
| hiphive/io/read_write_files.py | ||
| hiphive/io/shengBTE.py | ||
| hiphive/input_output/__init__.py | ||
| hiphive/input_output/gpumd.py | ||
| hiphive/input_output/logging_tools.py | ||
| hiphive/input_output/phonopy.py | ||
| hiphive/input_output/pretty_table_prints.py | ||
| hiphive/input_output/read_write_files.py | ||
| hiphive/input_output/shengBTE.py | ||
| hiphive/md_tools/__init__.py | ||
@@ -51,0 +53,0 @@ hiphive/md_tools/spectral_energy_density.py |
@@ -33,3 +33,3 @@ """ | ||
| 'Paul Erhart'] | ||
| __copyright__ = '2019' | ||
| __copyright__ = '2020' | ||
| __license__ = 'MIT' | ||
@@ -39,3 +39,3 @@ __credits__ = ['Fredrik Eriksson', | ||
| 'Paul Erhart'] | ||
| __version__ = '0.6' | ||
| __version__ = '0.7' | ||
| __all__ = ['ClusterSpace', | ||
@@ -46,4 +46,3 @@ 'StructureContainer', | ||
| 'config', | ||
| 'enforce_rotational_sum_rules', | ||
| 'io'] | ||
| 'enforce_rotational_sum_rules'] | ||
| __maintainer__ = 'The hiPhive developers team' | ||
@@ -50,0 +49,0 @@ __maintainer_email__ = 'hiphive@materialsmodeling.org' |
@@ -5,9 +5,11 @@ """Contains a calculator which given an arbitrary list of clusters and | ||
| """ | ||
| import numpy as np | ||
| import math | ||
| from collections import defaultdict as dd | ||
| from typing import List, Tuple | ||
| import numpy as np | ||
| from ase import Atoms | ||
| from ase.calculators.calculator import Calculator, all_changes | ||
| from hiphive.force_constants import SortedForceConstants | ||
| from ase.calculators.calculator import Calculator, all_changes | ||
| from ase.geometry import find_mic | ||
| from hiphive.utilities import get_displacements | ||
| from .numba_calc import _get_forces | ||
@@ -25,5 +27,5 @@ | ||
| ----------- | ||
| fcs: ForceConstants | ||
| fcs : ForceConstants | ||
| the force constants instance must include the atomic configuration | ||
| max_disp: float | ||
| max_disp : float | ||
| maximum allowed displacement before calculator raises ValueError | ||
@@ -34,3 +36,3 @@ """ | ||
| def __init__(self, fcs, max_disp=3.0): | ||
| def __init__(self, fcs: SortedForceConstants, max_disp: float = 3.0): | ||
| Calculator.__init__(self) | ||
@@ -79,3 +81,5 @@ | ||
| def calculate(self, atoms=None, properties=['energy'], system_changes=all_changes): | ||
| def calculate(self, atoms: Atoms = None, | ||
| properties: List[str] = ['energy'], | ||
| system_changes: List[str] = all_changes) -> None: | ||
| Calculator.calculate(self, atoms, properties, system_changes) | ||
@@ -90,19 +94,15 @@ self._check_atoms() | ||
| def _check_atoms(self): | ||
| """Check that the atomic configuration, with which the calculator is | ||
| def _check_atoms(self) -> None: | ||
| """Checks that the atomic configuration, with which the calculator is | ||
| associated, is compatible with the ideal configuration provided during | ||
| initialization.""" | ||
| if len(self.atoms) != len(self.atoms_ideal): | ||
| raise ValueError('Length of atoms does not match reference atoms') | ||
| raise ValueError('Length of atoms does not match reference structure') | ||
| if not all(self.atoms.numbers == self.atoms_ideal.numbers): | ||
| raise ValueError('Atomic numbers does not match reference atoms') | ||
| raise ValueError('Atomic numbers do not match reference structure') | ||
| def _compute_displacements(self): | ||
| """Evaluate the atomic displacements between the current and the ideal | ||
| def _compute_displacements(self) -> None: | ||
| """Evaluates the atomic displacements between the current and the ideal | ||
| (reference) configuration.""" | ||
| displacements = [] | ||
| for pos, ideal_pos in zip(self.atoms.positions, self.atoms_ideal.positions): | ||
| v_ij = np.array([pos - ideal_pos]) | ||
| displacements.append(find_mic(v_ij, self.atoms.cell, pbc=True)[0][0]) | ||
| self.displacements = np.array(displacements) | ||
| self.displacements = get_displacements(self.atoms, self.atoms_ideal) | ||
@@ -115,4 +115,4 @@ # sanity check that displacements are not too large | ||
| def compute_energy_and_forces(self): | ||
| """Compute energy and forces. | ||
| def compute_energy_and_forces(self) -> Tuple[float, list]: | ||
| """Computes energy and forces. | ||
@@ -140,5 +140,5 @@ Returns | ||
| def __repr__(self): | ||
| def __repr__(self) -> str: | ||
| fc_dict_str = '{{{}: {}, ...}}'.format(self.clusters[0], self.force_constants[0]) | ||
| fcs_str = 'ForceConstants(fc_dict={}, atoms={!r})'.format(fc_dict_str, self.atoms_ideal) | ||
| return 'ForceConstantCalculator({})'.format(fcs_str) |
@@ -15,9 +15,9 @@ """ | ||
| from .core.orbits import Orbit | ||
| from .io.logging import logger | ||
| from .io.pretty_table_prints import print_table | ||
| from .io.read_write_files import (add_items_to_tarfile_pickle, | ||
| add_items_to_tarfile_custom, | ||
| add_list_to_tarfile_custom, | ||
| read_items_pickle, | ||
| read_list_custom) | ||
| from .input_output.logging_tools import logger | ||
| from .input_output.pretty_table_prints import print_table | ||
| from .input_output.read_write_files import (add_items_to_tarfile_pickle, | ||
| add_items_to_tarfile_custom, | ||
| add_list_to_tarfile_custom, | ||
| read_items_pickle, | ||
| read_list_custom) | ||
| from .cutoffs import Cutoffs, CutoffMaximumBody, BaseClusterFilter | ||
@@ -24,0 +24,0 @@ |
@@ -1,2 +0,2 @@ | ||
| from hiphive.io.logging import logger | ||
| from hiphive.input_output.logging_tools import logger | ||
@@ -3,0 +3,0 @@ |
@@ -10,3 +10,3 @@ """ | ||
| import numpy as np | ||
| from ..io.logging import logger | ||
| from ..input_output.logging_tools import logger | ||
@@ -116,6 +116,3 @@ # TODO: Rename logger | ||
| try: | ||
| pickle.dump(data, f) | ||
| except Exception: | ||
| raise Exception('Failed writing to file.') | ||
| pickle.dump(data, f) | ||
@@ -135,6 +132,3 @@ @staticmethod | ||
| """ | ||
| try: | ||
| data = pickle.load(f) | ||
| except Exception: | ||
| raise Exception('Failed loading from file.') | ||
| data = pickle.load(f) | ||
| atoms = aseAtoms(numbers=data['numbers'], | ||
@@ -141,0 +135,0 @@ scaled_positions=data['basis'], |
@@ -10,3 +10,3 @@ import itertools | ||
| from .orbits import get_orbits | ||
| from ..io.logging import logger | ||
| from ..input_output.logging_tools import logger | ||
| from .eigentensors import create_eigentensors as _create_eigentensors | ||
@@ -117,3 +117,2 @@ from .tensors import rotate_tensor_precalc, rotation_tensor_as_matrix | ||
| orders = cs.cutoffs.orders | ||
| permutations = BiMap() | ||
@@ -123,3 +122,2 @@ for order in orders: | ||
| permutations.append(permutation) | ||
| cs._permutations = permutations | ||
@@ -130,4 +128,2 @@ | ||
| # object | ||
| # TODO: Add kw prim to CS init. To use the supplied primitive cell instead of | ||
| # having spglib return a funky rotated cell | ||
| # TODO: Does the basis check need to be done? There might not be a problem that | ||
@@ -154,4 +150,3 @@ # it is close to 1 instead of 0 anymore. If thats the case it is better to keep | ||
| # TODO: is symprec the best tolerance to use for volume check? | ||
| cell_volume_match = np.isclose(spg_cell_volume, prototype_cell_volume, | ||
| atol=cs.symprec, rtol=0) | ||
| cell_volume_match = np.isclose(spg_cell_volume, prototype_cell_volume, atol=cs.symprec, rtol=0) | ||
@@ -163,13 +158,8 @@ if numbers_match and cell_volume_match: | ||
| basis = spgPrim[1] | ||
| if np.any(spgPrim[1] > (1 - cs.symprec)): | ||
| logger.debug('Found basis close to 1:\n' + | ||
| ' {}'.format(str(spgPrim[1]))) | ||
| basis = spgPrim[1].round(8) % 1 # TODO | ||
| logger.debug('Wrapping to:\n' + | ||
| ' {}'.format(str(basis))) | ||
| if np.any(basis > (1 - cs.symprec)): | ||
| logger.debug('Found basis close to 1:\n {}'.format(str(basis))) | ||
| basis = basis.round(8) % 1 # TODO | ||
| logger.debug('Wrapping to:\n {}'.format(str(basis))) | ||
| prim = Atoms(cell=spgPrim[0], scaled_positions=basis, numbers=spgPrim[2], pbc=True) | ||
| prim = Atoms(cell=spgPrim[0], | ||
| scaled_positions=basis, | ||
| numbers=spgPrim[2], pbc=True) | ||
| # log primitive cell information | ||
@@ -183,8 +173,6 @@ logger.info('Primitive cell:') | ||
| for symbol, spos in zip(prim.get_chemical_symbols(), prim.basis): | ||
| logger.info(' {:2} [{:9.5f} {:9.5f} {:9.5f}]'.format( | ||
| symbol, *spos)) | ||
| logger.info(' {:2} [{:9.5f} {:9.5f} {:9.5f}]'.format(symbol, *spos)) | ||
| else: | ||
| for sym, spos in zip(prim[:3].get_chemical_symbols(), prim[:3].basis): | ||
| logger.info(' {:2} [{:9.5f} {:9.5f} {:9.5f}]'.format( | ||
| sym, *spos)) | ||
| logger.info(' {:2} [{:9.5f} {:9.5f} {:9.5f}]'.format(sym, *spos)) | ||
| logger.info(' ...') | ||
@@ -202,5 +190,3 @@ logger.info('') | ||
| prim = cs._prim | ||
| symmetry_dataset = spg.get_symmetry_dataset(prim, symprec=cs.symprec) | ||
| cs._symmetry_dataset = symmetry_dataset | ||
@@ -210,6 +196,4 @@ | ||
| logger.info(' Spacegroup: {}'.format(cs.spacegroup)) | ||
| logger.info(' Unique atoms: {}'.format( | ||
| len(set(cs.wyckoff_sites)))) | ||
| logger.info(' Symmetry operations: {}'.format( | ||
| len(cs.rotation_matrices))) | ||
| logger.info(' Unique site: {}'.format(len(set(cs.wyckoff_sites)))) | ||
| logger.info(' Symmetry operations: {}'.format(len(cs.rotation_matrices))) | ||
| logger.info(' symprec: {:.2e}'.format(cs.symprec)) | ||
@@ -227,3 +211,2 @@ logger.info('') | ||
| tol = cs.symprec | ||
| atom_list = BiMap() | ||
@@ -237,7 +220,7 @@ | ||
| logger.info(' Maximum cutoff: {}'.format(cs.cutoffs.max_cutoff)) | ||
| # Find all the atoms which is neighbors to the atoms in the center cell | ||
| # The pair cutoff should be larger or equal than the others | ||
| cutoffs = [(cs.cutoffs.max_cutoff - tol) / 2] * len(cs._prim) | ||
| nl = NeighborList(cutoffs=cutoffs, skin=0, self_interaction=True, | ||
| bothways=True) | ||
| nl = NeighborList(cutoffs=cutoffs, skin=0, self_interaction=True, bothways=True) | ||
| nl.update(cs._prim) | ||
@@ -251,5 +234,4 @@ for i in range(len(cs._prim)): | ||
| nl = NeighborList( | ||
| cutoffs=[(cs.cutoffs.max_cutoff + tol) / 2] * | ||
| len(cs._prim), skin=0, self_interaction=True, | ||
| bothways=True) | ||
| cutoffs=[(cs.cutoffs.max_cutoff + tol) / 2] * len(cs._prim), | ||
| skin=0, self_interaction=True, bothways=True) | ||
| nl.update(cs._prim) | ||
@@ -268,4 +250,3 @@ distance_from_cutoff = tol | ||
| if distance_from_cutoff != tol: | ||
| msg = 'Maximum cutoff close to neighbor shell, change cutoff' | ||
| raise Exception(msg) | ||
| raise Exception('Maximum cutoff close to neighbor shell, change cutoff') | ||
@@ -288,10 +269,7 @@ msg = ' Found {} center atom{} with {} images totaling {} atoms'.format( | ||
| numbers = [cs._prim.numbers[a.site] for a in cs._atom_list] | ||
| # Make an atoms object out of the scaled positions | ||
| atoms = Atoms(cell=cs._prim.cell, | ||
| scaled_positions=spos, | ||
| numbers=numbers, | ||
| pbc=False) | ||
| atoms = Atoms(cell=cs._prim.cell, scaled_positions=spos, numbers=numbers, pbc=False) | ||
| cs._cluster_filter.setup(atoms) | ||
| cs._cluster_list = get_clusters(atoms, cs.cutoffs, len(cs._prim)) | ||
@@ -302,4 +280,3 @@ | ||
| logger.info(' Clusters: {}'.format(dict(counter))) | ||
| logger.info(' Total number of clusters: {}\n' | ||
| .format(sum(counter.values()))) | ||
| logger.info(' Total number of clusters: {}\n'.format(sum(counter.values()))) | ||
@@ -323,3 +300,2 @@ | ||
| cs._dropped_orbits = [] | ||
| for i in range(len(cs.orbits)): | ||
@@ -335,4 +311,3 @@ if i in orbits_to_drop: | ||
| logger.info(' Orbits: {}'.format(dict(counter))) | ||
| logger.info(' Total number of orbits: {}\n' | ||
| .format(sum(counter.values()))) | ||
| logger.info(' Total number of orbits: {}\n'.format(sum(counter.values()))) | ||
@@ -358,12 +333,9 @@ | ||
| for order in cs.cutoffs.orders: | ||
| n_ets[order] = sum(len(orb.eigentensors) for orb in cs.orbits | ||
| if orb.order == order) | ||
| n_ets[order] = sum(len(orb.eigentensors) for orb in cs.orbits if orb.order == order) | ||
| logger.info(' Eigentensors: {}'.format(n_ets)) | ||
| logger.info(' Total number of parameters: {}' | ||
| .format(sum(n_ets.values()))) | ||
| logger.info(' Total number of parameters: {}'.format(sum(n_ets.values()))) | ||
| if len(cs._dropped_orbits) > 0: | ||
| logger.info(' Discarded orbits:') | ||
| for orb in cs._dropped_orbits: | ||
| logger.info(' {}' | ||
| .format(cs.cluster_list[orb.prototype_index])) | ||
| logger.info(' {}'.format(cs.cluster_list[orb.prototype_index])) | ||
| logger.info('') | ||
@@ -370,0 +342,0 @@ |
@@ -8,3 +8,3 @@ # Contains the get_clusters function which generates clusters | ||
| from .utilities import BiMap | ||
| from ..io.logging import logger | ||
| from ..input_output.logging_tools import logger | ||
@@ -11,0 +11,0 @@ logger = logger.getChild('get_clusters') |
@@ -5,3 +5,3 @@ import numpy as np | ||
| from .tensors import rotate_tensor | ||
| from ..io.logging import logger | ||
| from ..input_output.logging_tools import logger | ||
| from .utilities import SparseMatrix | ||
@@ -8,0 +8,0 @@ |
@@ -10,9 +10,9 @@ """ | ||
| from .atoms import Atom | ||
| from ..io.logging import logger | ||
| from ..io.read_write_files import (add_items_to_tarfile_hdf5, | ||
| add_items_to_tarfile_pickle, | ||
| add_list_to_tarfile_custom, | ||
| read_items_hdf5, | ||
| read_items_pickle, | ||
| read_list_custom) | ||
| from ..input_output.logging_tools import logger | ||
| from ..input_output.read_write_files import (add_items_to_tarfile_hdf5, | ||
| add_items_to_tarfile_pickle, | ||
| add_list_to_tarfile_custom, | ||
| read_items_hdf5, | ||
| read_items_pickle, | ||
| read_list_custom) | ||
| logger = logger.getChild('orbits') | ||
@@ -335,6 +335,3 @@ | ||
| """ | ||
| try: | ||
| pickle.dump(self, f) | ||
| except Exception: | ||
| raise Exception('Failed writing to file.') | ||
| pickle.dump(self, f) | ||
@@ -354,6 +351,3 @@ @staticmethod | ||
| """ | ||
| try: | ||
| return pickle.load(f) | ||
| except Exception: | ||
| raise Exception('Failed loading from file.') | ||
| return pickle.load(f) | ||
@@ -360,0 +354,0 @@ |
@@ -10,3 +10,3 @@ """ | ||
| def enforce_rotational_sum_rules(cs, parameters, sum_rules, **kwargs): | ||
| def enforce_rotational_sum_rules(cs, parameters, sum_rules=None, alpha=1e-6): | ||
| """ Enforces rotational sum rules by projecting parameters. | ||
@@ -30,6 +30,2 @@ | ||
| i.e. less correction but higher stability [default: 1e-6] | ||
| iterations : int | ||
| number of iterations to run the projection since each step projects | ||
| the solution down to each nullspace in serial; keyword argument | ||
| passed to the optimizer [default: 10] | ||
@@ -55,27 +51,50 @@ Returns | ||
| fcp = ForceConstantPotential(cs, new_params) | ||
| """ | ||
| all_sum_rules = ['Huang', 'Born-Huang'] | ||
| # setup | ||
| parameters = parameters.copy() | ||
| # handle optional kwargs input | ||
| iterations = kwargs.pop('iterations', 10) | ||
| ridge_alpha = kwargs.pop('ridge_alpha', 1e-6) | ||
| if len(kwargs) != 0: | ||
| raise ValueError('Unrecognised kwargs: {}'.format(kwargs)) | ||
| if sum_rules is None: | ||
| sum_rules = all_sum_rules | ||
| # get constraint-matrix | ||
| M = get_rotational_constraint_matrix(cs, sum_rules) | ||
| # before fit | ||
| d = M.dot(parameters) | ||
| delta = np.linalg.norm(d) | ||
| print('Rotational sum-rules before, ||Ax|| = {:20.15f}'.format(delta)) | ||
| # fitting | ||
| ridge = Ridge(alpha=alpha, fit_intercept=False, solver='sparse_cg') | ||
| ridge.fit(M, d) | ||
| parameters -= ridge.coef_ | ||
| # after fit | ||
| d = M.dot(parameters) | ||
| delta = np.linalg.norm(d) | ||
| print('Rotational sum-rules after, ||Ax|| = {:20.15f}'.format(delta)) | ||
| return parameters | ||
| def get_rotational_constraint_matrix(cs, sum_rules=None): | ||
| all_sum_rules = ['Huang', 'Born-Huang'] | ||
| if sum_rules is None: | ||
| sum_rules = all_sum_rules | ||
| # setup | ||
| assert len(sum_rules) > 0 | ||
| for s in sum_rules: | ||
| if s not in all_sum_rules: | ||
| raise ValueError('Sum rule {} not allowed, select from {}'.format(s, all_sum_rules)) | ||
| # make orbit-parameter index map | ||
| params = [] | ||
| n = 0 | ||
| for orbit_index, orbit in enumerate(cs.orbits): | ||
| number_params_in_orbit = len(orbit.eigentensors) | ||
| params.append(list(range(n, n + number_params_in_orbit))) | ||
| n += number_params_in_orbit | ||
| params = _get_orbit_parameter_map(cs) | ||
| lookup = _get_fc_lookup(cs) | ||
| # create lookuptable for force constants | ||
| lookup = {} | ||
| for orbit_index, orbit in enumerate(cs.orbits): | ||
| for of in orbit.orientation_families: | ||
| for cluster_index, perm_index in zip(of.cluster_indices, of.permutation_indices): | ||
| cluster = cs._cluster_list[cluster_index] | ||
| perm = cs._permutations[perm_index] | ||
| lookup[tuple(cluster)] = [et.transpose(perm) for et in of.eigentensors], orbit_index | ||
| # append the sum rule matrices | ||
@@ -89,7 +108,5 @@ Ms = [] | ||
| Ms.append(_create_Born_Huang_constraint(*args)) | ||
| else: | ||
| raise ValueError('At least one unknown sum rule: {}'.format(sum_rule)) | ||
| # transform and stack matrices | ||
| cvs_trans = cs._cvs | ||
| for i, M in enumerate(Ms): | ||
@@ -102,14 +119,32 @@ row, col, data = [], [], [] | ||
| M = coo_matrix((data, (row, col)), shape=M.shape) | ||
| Ms[i] = M.dot(cvs_trans) | ||
| M = M.dot(cvs_trans) | ||
| M = M.toarray() | ||
| Ms[i] = M | ||
| for _ in range(iterations): | ||
| for M in Ms: | ||
| clf = Ridge(alpha=ridge_alpha) | ||
| d = M.dot(parameters) | ||
| clf.fit(M, d) | ||
| parameters -= clf.coef_ # subtract the correction | ||
| return np.vstack(Ms) | ||
| return parameters | ||
| def _get_orbit_parameter_map(cs): | ||
| # make orbit-parameter index map | ||
| params = [] | ||
| n = 0 | ||
| for orbit_index, orbit in enumerate(cs.orbits): | ||
| n_params_in_orbit = len(orbit.eigentensors) | ||
| params.append(list(range(n, n + n_params_in_orbit))) | ||
| n += n_params_in_orbit | ||
| return params | ||
| def _get_fc_lookup(cs): | ||
| # create lookuptable for force constants | ||
| lookup = {} | ||
| for orbit_index, orbit in enumerate(cs.orbits): | ||
| for of in orbit.orientation_families: | ||
| for cluster_index, perm_index in zip(of.cluster_indices, of.permutation_indices): | ||
| cluster = cs._cluster_list[cluster_index] | ||
| perm = cs._permutations[perm_index] | ||
| lookup[tuple(cluster)] = [et.transpose(perm) for et in of.eigentensors], orbit_index | ||
| return lookup | ||
| def _create_Huang_constraint(lookup, parameter_map, atom_list, prim): | ||
@@ -116,0 +151,0 @@ |
@@ -5,3 +5,3 @@ import itertools | ||
| from . import atoms as atoms_module | ||
| from ..io.logging import logger | ||
| from ..input_output.logging_tools import logger | ||
@@ -8,0 +8,0 @@ logger = logger.getChild('relate_structures') |
| import numpy as np | ||
| from ase import Atoms | ||
@@ -113,6 +112,2 @@ | ||
| def to_atoms(self): | ||
| return Atoms(cell=self.cell, positions=self.pos, | ||
| numbers=self.numbers, pbc=True) | ||
| def __len__(self): | ||
@@ -129,10 +124,2 @@ return len(self._spos) | ||
| @property | ||
| def pos(self): | ||
| return np.dot(self._spos, self._cell) | ||
| @property | ||
| def numbers(self): | ||
| return self._numbers | ||
| def __getitem__(self, index): | ||
@@ -187,5 +174,1 @@ if index >= len(self): | ||
| return len(self._supercell) | ||
| @property | ||
| def cell(self): | ||
| return self._supercell.cell |
@@ -9,3 +9,3 @@ """ | ||
| from ..io.logging import logger | ||
| from ..input_output.logging_tools import logger | ||
| from .utilities import SparseMatrix, BiMap | ||
@@ -20,4 +20,2 @@ from .orbits import get_orbits | ||
| logger.debug('Starting constructing constraint matrices') | ||
| # First we need to know the global index of a eigentensor given which | ||
@@ -40,2 +38,33 @@ # orbit it belongs to. | ||
| M = get_translational_constraint_matrix(cs) | ||
| nullspace = M.nullspace() | ||
| logger.debug('Assembling solutions...') | ||
| row = [] | ||
| col = [] | ||
| data = [] | ||
| for c, vec in enumerate(nullspace): | ||
| for r, _, v in vec.row_list(): | ||
| row.append(r) | ||
| col.append(c) | ||
| data.append(np.float64(v)) | ||
| # This is the final product, the constraint map (or constraint vectors) | ||
| cs._cvs = coo_matrix((data, (row, col)), shape=(len(vec), len(nullspace))) | ||
| logger.debug('{} degrees of freedom'.format(cs._cvs.shape[1])) | ||
| logger.debug('Finished constructing constraint vectors') | ||
| def get_translational_constraint_matrix(cs): | ||
| logger.debug('Starting constructing constraint matrices') | ||
| # First we need to know the global index of a eigentensor given which | ||
| # orbit it belongs to. | ||
| params = [] | ||
| n = 0 | ||
| for orbit_index, orbit in enumerate(cs.orbits): | ||
| nParams_in_orbit = len(orbit.eigentensors) | ||
| params.append(list(range(n, n + nParams_in_orbit))) | ||
| n += nParams_in_orbit | ||
| # The lookup is a fast way to get the eigentensors given a cluster Also the | ||
@@ -107,18 +136,3 @@ # orbit index is bundled to quickly find the correct parameter indices | ||
| logger.debug('Finished constructing constraint matrices') | ||
| logger.debug('Starting constructing constraint vectors') | ||
| nullspace = M.nullspace() | ||
| logger.debug('Assembling solutions...') | ||
| row = [] | ||
| col = [] | ||
| data = [] | ||
| for c, vec in enumerate(nullspace): | ||
| for r, _, v in vec.row_list(): | ||
| row.append(r) | ||
| col.append(c) | ||
| data.append(np.float64(v)) | ||
| # This is the final product, the constraint map (or constraint vectors) | ||
| cs._cvs = coo_matrix((data, (row, col)), shape=(len(vec), len(nullspace))) | ||
| logger.debug('{} degrees of freedom'.format(cs._cvs.shape[1])) | ||
| logger.debug('Finished constructing constraint vectors') | ||
| return M |
| """ | ||
| The ``utilities`` module contains various support functions and classes. | ||
| """ | ||
| import sympy | ||
| import numpy as np | ||
| from scipy.sparse import coo_matrix | ||
| from collections import defaultdict | ||
| from ..io.logging import Progress | ||
| from sympy.core import S | ||
| import sympy | ||
| from ..input_output.logging_tools import Progress | ||
| __all__ = ['Progress'] | ||
@@ -113,3 +117,13 @@ | ||
| def to_array(self): | ||
| """ Cast SparseMatrix instance to numpy array """ | ||
| row, col, data = [], [], [] | ||
| for r, c, v in self.row_list(): | ||
| row.append(r) | ||
| col.append(c) | ||
| data.append(np.float64(v)) | ||
| M = coo_matrix((data, (row, col)), shape=self.shape) | ||
| return M.toarray() | ||
| class BiMap: | ||
@@ -146,3 +160,2 @@ """Simple list like structure with fast dict-lookup | ||
| except KeyError: | ||
| print(self._dict) | ||
| raise ValueError('{} is not in list'.format(value)) | ||
@@ -149,0 +162,0 @@ |
+4
-38
| import pickle | ||
| import numpy as np | ||
| from ase.neighborlist import NeighborList | ||
| from hiphive.io.pretty_table_prints import table_array_to_string | ||
| from hiphive.input_output.pretty_table_prints import table_array_to_string | ||
@@ -27,8 +27,6 @@ | ||
| try: | ||
| self._cutoff_matrix = np.array(cutoff_matrix, dtype=np.float) | ||
| except Exception: | ||
| raise Exception('Please specify cutoffs as a matrix of numbers') | ||
| self._cutoff_matrix = np.array(cutoff_matrix, dtype=np.float) | ||
| if len(self._cutoff_matrix.shape) != 2: | ||
| raise TypeError('Please specify cutoff matrix as a 2D array') | ||
| raise ValueError('Please specify cutoff matrix as a 2D array') | ||
| for i, row in enumerate(self._cutoff_matrix): | ||
@@ -287,33 +285,1 @@ if np.any(row[i:] < 0): | ||
| return True | ||
| class MaxTripletDistance(BaseClusterFilter): | ||
| """Rejects large three-body clusters. | ||
| If two or all pairs within the cluster are further away from each | ||
| other than the specified distance the cluster is | ||
| rejected. Alternatively, only one pair distance can exceed the | ||
| specified cutoff. | ||
| The filter only rejects for three-body clusters. All other | ||
| clusters are accepted. | ||
| Parameters | ||
| ---------- | ||
| max_dist : float | ||
| maximum distance allowed for the two closest pairs | ||
| """ | ||
| def __init__(self, max_dist): | ||
| self._max_dist = max_dist | ||
| def __call__(self, cluster): | ||
| cluster = tuple(set(cluster)) | ||
| if len(cluster) != 3: | ||
| return True | ||
| i, j, k = cluster | ||
| ij = self._atoms.get_distance(i, j) < self._max_dist | ||
| ik = self._atoms.get_distance(i, k) < self._max_dist | ||
| jk = self._atoms.get_distance(j, k) < self._max_dist | ||
| return (ij and (ik or jk)) or (ik and jk) |
@@ -5,3 +5,3 @@ from .optimizer import Optimizer | ||
| from .fit_methods import fit, available_fit_methods | ||
| from .io import _read_pickle as read_summary | ||
| from .oi import _read_pickle as read_summary | ||
@@ -8,0 +8,0 @@ __all__ = ['fit', |
@@ -9,3 +9,3 @@ """ | ||
| from .fit_methods import available_fit_methods | ||
| from .io import _write_pickle | ||
| from .oi import _write_pickle | ||
@@ -29,4 +29,4 @@ | ||
| method to be used for training; possible choice are | ||
| "least-squares", "lasso", "elasticnet", "bayesian-ridge", "ardr", | ||
| "rfe", "split-bregman" | ||
| "ardr", "bayesian-ridge", "elasticnet", "lasso", "least-squares", | ||
| "omp", "rfe", "ridge", "split-bregman" | ||
| standardize : bool | ||
@@ -157,7 +157,8 @@ if True the fit matrix and target values are standardized before fitting, | ||
| def __str__(self) -> str: | ||
| info = self.summary | ||
| width = 54 | ||
| s = [] | ||
| s.append(' {} '.format(self.__class__.__name__).center(width, '=')) | ||
| for key in sorted(self.summary.keys()): | ||
| value = self.summary[key] | ||
| for key in info.keys(): | ||
| value = info[key] | ||
| if isinstance(value, (str, int, np.integer)): | ||
@@ -164,0 +165,0 @@ s.append('{:30} : {}'.format(key, value)) |
@@ -7,2 +7,3 @@ """ | ||
| from sklearn.model_selection import KFold, ShuffleSplit | ||
| from sklearn.metrics import r2_score | ||
| from typing import Any, Dict, Tuple | ||
@@ -13,2 +14,3 @@ from .base_optimizer import BaseOptimizer | ||
| from .tools import ScatterData | ||
| from .model_selection import get_model_metrics | ||
@@ -40,3 +42,3 @@ | ||
| ---------- | ||
| fit_data : tupe(numpy.ndarray, numpy.ndarray) | ||
| fit_data : tuple(numpy.ndarray, numpy.ndarray) | ||
| the first element of the tuple represents the fit matrix `A` | ||
@@ -49,4 +51,4 @@ (`N, M` array) while the second element represents the vector | ||
| method to be used for training; possible choice are | ||
| "least-squares", "lasso", "elasticnet", "bayesian-ridge", "ardr", | ||
| "rfe", "split-bregman" | ||
| "ardr", "bayesian-ridge", "elasticnet", "lasso", "least-squares", | ||
| "omp", "rfe", "ridge", "split-bregman" | ||
| standardize : bool | ||
@@ -91,4 +93,3 @@ if True the fit matrix and target values are standardized before fitting, | ||
| super().__init__(fit_data, fit_method, standardize, check_condition, | ||
| seed) | ||
| super().__init__(fit_data, fit_method, standardize, check_condition, seed) | ||
@@ -103,3 +104,2 @@ if validation_method not in validation_methods.keys(): | ||
| self._n_splits = n_splits | ||
| self._set_kwargs(kwargs) | ||
@@ -118,3 +118,3 @@ | ||
| self._rmse_valid_splits = None | ||
| self._rmse_train_final = None | ||
| self.model_metrics = {} | ||
@@ -126,4 +126,11 @@ def train(self) -> None: | ||
| **self._fit_kwargs) | ||
| self._rmse_train_final = self.compute_rmse(self._A, self._y) | ||
| y_train_predicted = np.dot(self._A, self.parameters) | ||
| metrics = get_model_metrics(self._A, self.parameters, self._y, y_train_predicted) | ||
| # finalize metrics | ||
| self.model_metrics['rmse_train_final'] = metrics['rmse_train'] | ||
| self.model_metrics['R2_train'] = metrics['R2_train'] | ||
| self.model_metrics['AIC'] = metrics['AIC'] | ||
| self.model_metrics['BIC'] = metrics['BIC'] | ||
| def validate(self) -> None: | ||
@@ -160,2 +167,5 @@ """ Runs validation. """ | ||
| self.model_metrics['rmse_validation'] = np.sqrt(np.mean(self._rmse_valid_splits**2)) | ||
| self.model_metrics['R2_validation'] = r2_score(valid_target, valid_predicted) | ||
| def _set_kwargs(self, kwargs: dict) -> None: | ||
@@ -186,2 +196,3 @@ """ | ||
| """ comprehensive information about the optimizer """ | ||
| info = super().summary | ||
@@ -192,3 +203,2 @@ | ||
| info['n_splits'] = self.n_splits | ||
| info['rmse_train_final'] = self.rmse_train_final | ||
| info['rmse_train'] = self.rmse_train | ||
@@ -201,2 +211,5 @@ info['rmse_train_splits'] = self.rmse_train_splits | ||
| # add metrics | ||
| info = {**info, **self.model_metrics} | ||
| # add kwargs used for fitting and splitting | ||
@@ -244,3 +257,5 @@ info = {**info, **self._fit_kwargs, **self._split_kwargs} | ||
| """ | ||
| return self._rmse_train_final | ||
| if 'rmse_train_final' not in self.model_metrics: | ||
| return None | ||
| return self.model_metrics['rmse_train_final'] | ||
@@ -247,0 +262,0 @@ @property |
@@ -39,4 +39,4 @@ """ | ||
| method to be used for training; possible choice are | ||
| "least-squares", "lasso", "elasticnet", "bayesian-ridge", "ardr", | ||
| "rfe", "split-bregman" | ||
| "ardr", "bayesian-ridge", "elasticnet", "lasso", "least-squares", | ||
| "omp", "rfe", "ridge", "split-bregman" | ||
| standardize : bool | ||
@@ -73,4 +73,3 @@ if True the fit matrix and target values are standardized before fitting, | ||
| super().__init__(fit_data, fit_method, standardize, check_condition, | ||
| seed) | ||
| super().__init__(fit_data, fit_method, standardize, check_condition, seed) | ||
@@ -90,3 +89,3 @@ # set training size | ||
| self._test_set_list = None | ||
| self._parameter_vectors = None | ||
| self._parameters_splits = None | ||
| self._parameters_std = None | ||
@@ -113,4 +112,3 @@ self._rmse_train_ensemble = None | ||
| self.train_size, replace=self.bootstrap) | ||
| test_set = np.setdiff1d( | ||
| range(self.n_target_values), train_set) | ||
| test_set = np.setdiff1d(range(self.n_target_values), train_set) | ||
@@ -127,11 +125,7 @@ # train | ||
| # collect data from each fit | ||
| self._parameter_vectors = np.array( | ||
| [opt.parameters for opt in optimizers]) | ||
| self._parameters_splits = np.array([opt.parameters for opt in optimizers]) | ||
| self._train_set_list = [opt.train_set for opt in optimizers] | ||
| self._test_set_list = [opt.test_set for opt in optimizers] | ||
| self._rmse_train_ensemble = np.array( | ||
| [opt.rmse_train for opt in optimizers]) | ||
| self._rmse_test_ensemble = np.array( | ||
| [opt.rmse_test for opt in optimizers]) | ||
| self._rmse_train_ensemble = np.array([opt.rmse_train for opt in optimizers]) | ||
| self._rmse_test_ensemble = np.array([opt.rmse_test for opt in optimizers]) | ||
@@ -142,9 +136,6 @@ def _construct_final_model(self) -> None: | ||
| """ | ||
| self._fit_results['parameters'] = np.mean( | ||
| self.parameter_vectors, axis=0) | ||
| self._parameters_std = np.std(self.parameter_vectors, axis=0) | ||
| self._fit_results['parameters'] = np.mean(self.parameters_splits, axis=0) | ||
| self._parameters_std = np.std(self.parameters_splits, axis=0) | ||
| def predict(self, | ||
| A: np.ndarray, | ||
| return_std: bool = False) \ | ||
| def predict(self, A: np.ndarray, return_std: bool = False) \ | ||
| -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]: | ||
@@ -172,3 +163,3 @@ """ | ||
| if return_std: | ||
| predictions = np.dot(A, self.parameter_vectors.T) | ||
| predictions = np.dot(A, self.parameters_splits.T) | ||
| if len(predictions.shape) == 1: # shape is (N, ) | ||
@@ -188,6 +179,6 @@ std = np.std(predictions) | ||
| """ | ||
| if self.parameter_vectors is None: | ||
| if self.parameters_splits is None: | ||
| return None | ||
| error_matrix = np.zeros((self._n_rows, self.ensemble_size)) | ||
| for i, parameters in enumerate(self.parameter_vectors): | ||
| for i, parameters in enumerate(self.parameters_splits): | ||
| error_matrix[:, i] = np.dot(self._A, parameters) - self._y | ||
@@ -232,5 +223,5 @@ return error_matrix | ||
| @property | ||
| def parameter_vectors(self) -> List[np.ndarray]: | ||
| """ all parameter vectors in the ensemble """ | ||
| return self._parameter_vectors | ||
| def parameters_splits(self) -> List[np.ndarray]: | ||
| """ all parameters vectors in the ensemble """ | ||
| return self._parameters_splits | ||
@@ -237,0 +228,0 @@ @property |
@@ -22,2 +22,4 @@ """ | ||
| Ridge, | ||
| OrthogonalMatchingPursuit, | ||
| OrthogonalMatchingPursuitCV, | ||
| RidgeCV, | ||
@@ -32,3 +34,4 @@ ElasticNet, | ||
| from typing import Any, Dict, List, Union | ||
| from ..io.logging import logger | ||
| from ..input_output.logging_tools import logger | ||
| from .split_bregman import fit_split_bregman | ||
@@ -76,3 +79,3 @@ | ||
| if check_condition: | ||
| if check_condition and X.shape[0] >= X.shape[1]: | ||
| cond = np.linalg.cond(X) | ||
@@ -207,2 +210,99 @@ if cond > 1e10: | ||
| def _fit_adaptive_lasso(X, y, stop_tol=1e-5, n_lasso_iters=30, alpha=None, **kwargs): | ||
| """ | ||
| Returns the solution `a` to the linear problem `Xa=y` obtained by | ||
| using an adaptive LASSO method. | ||
| Parameters | ||
| ---------- | ||
| X | ||
| fit matrix | ||
| y | ||
| target array | ||
| alpha | ||
| alpha value | ||
| lasso_iter | ||
| number of LASSO fits to carry out | ||
| """ | ||
| def update_weights(x): | ||
| return 1 / (2 * np.sqrt(np.abs(x)) + 1e-12) | ||
| # line scan | ||
| if alpha is None: | ||
| return _fit_adaptive_lasso_line_scan( | ||
| X, y, n_lasso_iters=n_lasso_iters, stop_tol=stop_tol, **kwargs) | ||
| # adaptive-lasso fit | ||
| n_cols = X.shape[1] | ||
| weights = np.ones(n_cols) | ||
| x = np.zeros(n_cols) | ||
| for it in range(n_lasso_iters): | ||
| X_w = X / weights.reshape(1, -1) | ||
| lasso = Lasso(alpha=alpha, fit_intercept=False, **kwargs) | ||
| lasso.fit(X_w, y) | ||
| x_new = lasso.coef_ / weights | ||
| dx = np.linalg.norm(x_new - x) | ||
| x = x_new | ||
| weights = update_weights(x) | ||
| print('adaptive-lasso iter {:3d}, |x|: {:11.8f}, |dx|: {:11.8f}, |dx|/|x|: {:11.8f}' | ||
| .format(it, np.linalg.norm(x), dx, dx/np.linalg.norm(x))) | ||
| if dx < stop_tol: | ||
| break | ||
| results = dict(parameters=x, weights=weights) | ||
| return results | ||
| def _fit_adaptive_lasso_line_scan(X: np.ndarray, | ||
| y: np.ndarray, | ||
| cv_splits: int = 5, | ||
| alphas: List[float] = None, | ||
| **kwargs) -> Dict[str, Any]: | ||
| """Adaptive-lasso with line-scan for optimal alpha. | ||
| Parameters | ||
| ----------- | ||
| X | ||
| fit matrix | ||
| y | ||
| target array | ||
| alphas | ||
| list of alpha values to be evaluated. The optimal | ||
| alpha found will be used in the final fit. | ||
| cv_splits | ||
| how many CV splits to carry out when evaluating each lambda value. | ||
| """ | ||
| from .cross_validation import CrossValidationEstimator | ||
| # default lambda values to scan | ||
| if alphas is None: | ||
| alphas = np.logspace(-7, -1, 15) | ||
| # run lin-scan of lambda values | ||
| cv_data = [] | ||
| for alpha in alphas: | ||
| cve = CrossValidationEstimator((X, y), fit_method='adaptive-lasso', | ||
| validation_method='shuffle-split', | ||
| alpha=alpha, test_size=0.1, train_size=0.9, | ||
| n_splits=cv_splits, **kwargs) | ||
| cve.validate() | ||
| print('adaptive-lasso alpha {}, cv-rmse {:11.8f}'.format(alpha, cve.rmse_validation)) | ||
| cv_data.append([alpha, cve.rmse_validation]) | ||
| # select best lambda | ||
| cv_data = np.array(cv_data) | ||
| optimal_ind = cv_data[:, 1].argmin() | ||
| alpha_optimal = cv_data[optimal_ind, 0] | ||
| # final fit with optimal lambda | ||
| results = _fit_adaptive_lasso(X, y, alpha=alpha_optimal, **kwargs) | ||
| results['alpha_optimal'] = alpha_optimal | ||
| return results | ||
| def _fit_ridge(X, y, alpha=None, fit_intercept=False, **kwargs): | ||
@@ -399,3 +499,2 @@ results = dict() | ||
| """ | ||
| from .cross_validation import CrossValidationEstimator | ||
@@ -451,14 +550,18 @@ | ||
| def _get_tags(self): | ||
| from sklearn.base import _DEFAULT_TAGS | ||
| return _DEFAULT_TAGS | ||
| def fit_rfe(X: np.ndarray, | ||
| y: np.ndarray, | ||
| n_features: int = None, | ||
| step: Union[int, float] = 0.04, | ||
| estimator: str = 'least-squares', | ||
| final_estimator: str = None, | ||
| estimator_kwargs: dict = {}, | ||
| final_estimator_kwargs: dict = {}, | ||
| cv_splits: int = 5, | ||
| n_jobs: int = -1, | ||
| **rfe_kwargs): | ||
| def _fit_rfe(X: np.ndarray, | ||
| y: np.ndarray, | ||
| n_features: int = None, | ||
| step: Union[int, float] = 0.04, | ||
| estimator: str = 'least-squares', | ||
| final_estimator: str = None, | ||
| estimator_kwargs: dict = {}, | ||
| final_estimator_kwargs: dict = {}, | ||
| cv_splits: int = 5, | ||
| n_jobs: int = -1, | ||
| **rfe_kwargs): | ||
| """ | ||
@@ -526,2 +629,45 @@ Returns the solution `a` to the linear problem `Xa=y` obtained by | ||
| def _fit_omp(X: np.ndarray, y: np.ndarray, | ||
| n_nonzero_coefs: int = None, fit_intercept: bool = False, | ||
| cv_splits: int = 5, | ||
| n_jobs: int = -1, | ||
| **kwargs) -> Dict[str, Any]: | ||
| """ | ||
| Fitting using Orthogonal Matching Pursuit (OMP) see | ||
| https://scikit-learn.org/stable/modules/linear_model.html#omp | ||
| Parameters | ||
| ----------- | ||
| X | ||
| fit matrix | ||
| y | ||
| target array | ||
| n_nonzero_coefs | ||
| number of non-zero parameters to obtain, if None will find the | ||
| optimal number via CV | ||
| cv_splits | ||
| number of cv-splits to carry out if finding optimal n_features | ||
| n_jobs | ||
| number of cores to use during the cross validation. | ||
| -1 means using all processors. | ||
| """ | ||
| if n_nonzero_coefs is None: | ||
| n_cols = X.shape[1] | ||
| cv = ShuffleSplit(train_size=0.9, test_size=0.1, n_splits=cv_splits) | ||
| ompcv = OrthogonalMatchingPursuitCV( | ||
| fit_intercept=fit_intercept, max_iter=n_cols, cv=cv, n_jobs=n_jobs, **kwargs) | ||
| ompcv.fit(X, y) | ||
| parameters = ompcv.coef_ | ||
| else: | ||
| omp = OrthogonalMatchingPursuit( | ||
| n_nonzero_coefs=n_nonzero_coefs, fit_intercept=fit_intercept, **kwargs) | ||
| omp.fit(X, y) | ||
| parameters = omp.coef_ | ||
| results = dict() | ||
| results['parameters'] = parameters | ||
| return results | ||
| fit_methods = OrderedDict([ | ||
@@ -531,2 +677,3 @@ ('least-squares', _fit_least_squares), | ||
| ('ridge', _fit_ridge), | ||
| ('omp', _fit_omp), | ||
| ('bayesian-ridge', _fit_bayesian_ridge), | ||
@@ -536,4 +683,6 @@ ('elasticnet', _fit_elasticnet), | ||
| ('ardr', _fit_ardr), | ||
| ('rfe', fit_rfe), | ||
| ('adaptive-lasso', _fit_adaptive_lasso), | ||
| ('rfe', _fit_rfe), | ||
| ]) | ||
| available_fit_methods = sorted(fit_methods.keys()) |
@@ -6,2 +6,4 @@ """ | ||
| from sklearn.model_selection import train_test_split | ||
| from sklearn.metrics import r2_score | ||
| from .model_selection import get_model_metrics | ||
| from typing import Any, Dict, List, Tuple, Union | ||
@@ -39,4 +41,4 @@ from .base_optimizer import BaseOptimizer | ||
| method to be used for training; possible choice are | ||
| "least-squares", "lasso", "elasticnet", "bayesian-ridge", "ardr", | ||
| "rfe", "split-bregman" | ||
| "ardr", "bayesian-ridge", "elasticnet", "lasso", "least-squares", | ||
| "omp", "rfe", "ridge", "split-bregman" | ||
| standardize : bool | ||
@@ -77,3 +79,3 @@ if True the fit matrix and target values are standardized before fitting, | ||
| standardize: bool = True, | ||
| train_size: Union[int, float] = 0.75, | ||
| train_size: Union[int, float] = 0.9, | ||
| test_size: Union[int, float] = None, | ||
@@ -95,8 +97,5 @@ train_set: Union[Tuple[int], List[int]] = None, | ||
| # will be populate once running train | ||
| self._rmse_train = None | ||
| self._rmse_test = None | ||
| self._contributions_train = None | ||
| self._contributions_test = None | ||
| self.train_scatter_data = None | ||
| self.test_scatter_data = None | ||
| self.model_metrics = {} | ||
@@ -114,5 +113,8 @@ def train(self) -> None: | ||
| **self._kwargs) | ||
| self._rmse_train = self.compute_rmse(A_train, y_train) | ||
| self.train_scatter_data = ScatterData(y_train, self.predict(A_train)) | ||
| y_train_predicted = self.predict(A_train) | ||
| self.train_scatter_data = ScatterData(y_train, y_train_predicted) | ||
| # get training metrics | ||
| metrics = get_model_metrics(A_train, self.parameters, y_train, y_train_predicted) | ||
| # perform testing | ||
@@ -122,8 +124,19 @@ if self.test_set is not None: | ||
| y_test = self._y[self.test_set] | ||
| self._rmse_test = self.compute_rmse(A_test, y_test) | ||
| self.test_scatter_data = ScatterData(y_test, self.predict(A_test)) | ||
| y_test_predicted = self.predict(A_test) | ||
| rmse_test = self.compute_rmse(A_test, y_test) | ||
| r2_test = r2_score(y_test, y_test_predicted) | ||
| self.test_scatter_data = ScatterData(y_test, y_test_predicted) | ||
| else: | ||
| self._rmse_test = None | ||
| rmse_test = None | ||
| r2_test = None | ||
| self.test_scatter_data = None | ||
| # finalize metrics | ||
| self.model_metrics['rmse_train'] = metrics['rmse_train'] | ||
| self.model_metrics['rmse_test'] = rmse_test | ||
| self.model_metrics['R2_train'] = metrics['R2_train'] | ||
| self.model_metrics['R2_test'] = r2_test | ||
| self.model_metrics['AIC'] = metrics['AIC'] | ||
| self.model_metrics['BIC'] = metrics['BIC'] | ||
| def _setup_rows(self, | ||
@@ -207,5 +220,6 @@ train_size: Union[int, float], | ||
| # add metrics | ||
| info = {**info, **self.model_metrics} | ||
| # Add class specific data | ||
| info['rmse_train'] = self.rmse_train | ||
| info['rmse_test'] = self.rmse_test | ||
| info['train_size'] = self.train_size | ||
@@ -237,3 +251,5 @@ info['train_set'] = self.train_set | ||
| """ root mean squared error for training set """ | ||
| return self._rmse_train | ||
| if 'rmse_train' not in self.model_metrics: | ||
| return None | ||
| return self.model_metrics['rmse_train'] | ||
@@ -243,17 +259,7 @@ @property | ||
| """ root mean squared error for test set """ | ||
| return self._rmse_test | ||
| if 'rmse_test' not in self.model_metrics: | ||
| return None | ||
| return self.model_metrics['rmse_test'] | ||
| @property | ||
| def contributions_train(self) -> np.ndarray: | ||
| """ average contribution to the predicted values for | ||
| the train set from each parameter """ | ||
| return self._contributions_train | ||
| @property | ||
| def contributions_test(self) -> np.ndarray: | ||
| """ average contribution to the predicted values for | ||
| the test set from each parameter """ | ||
| return self._contributions_test | ||
| @property | ||
| def train_set(self) -> List[int]: | ||
@@ -260,0 +266,0 @@ """ indices of rows included in the training set """ |
@@ -5,16 +5,139 @@ """ | ||
| doi:10.1137/080725891 | ||
| This implementation was based on the CSLD code written by Fei Zhou (周非), | ||
| Weston Nielson, Yi Xia, and Vidvuds Ozoliņš and makes use of a right | ||
| preconditioner for increased efficiency. | ||
| The preconditioning algorithm is described in Compressive sensing lattice | ||
| dynamics. I. General formalism Fei Zhou (周非), Weston Nielson, Yi Xia, | ||
| and Vidvuds Ozoliņš Phys. Rev. B 100, 184308 | ||
| """ | ||
| import numpy as np | ||
| from scipy.optimize import minimize | ||
| from typing import Any, Dict | ||
| import scipy.linalg | ||
| from typing import Dict, Optional, Any, List, Union | ||
| from hiphive.input_output.logging_tools import logger | ||
| logger = logger.getChild(__name__) | ||
| def fit_split_bregman(A: np.ndarray, | ||
| y: np.ndarray, | ||
| mu: float = 1e-3, | ||
| lmbda: float = 100, | ||
| mu: Optional[Union[float, List[float]]] = None, | ||
| lmbda: float = 3., | ||
| n_iters: int = 1000, | ||
| tol: float = 1e-6) -> Dict[str, Any]: | ||
| tol: float = 1e-4, | ||
| cg_tol: float = 1e-1, | ||
| cg_n_iters: Optional[int] = None, | ||
| iprint: Optional[int] = None, | ||
| cv_splits: int = 5, | ||
| ) -> Dict[str, Any]: | ||
| r""" | ||
| Determines the solution :math:`\\boldsymbol{x}` to the linear | ||
| problem :math:`\\boldsymbol{A}\\boldsymbol{x}=\\boldsymbol{y}` using | ||
| the split-Bregman algorithm described in T. Goldstein and S. Osher, | ||
| SIAM J. Imaging Sci. 2, 323 (2009); doi:10.1137/080725891. | ||
| The obtained parameters are returned in the form of a | ||
| dictionary with a key named `parameters`. | ||
| In compressive sensing, the solution vector :math:`\boldsymbol{x}` is given | ||
| by | ||
| .. math:: | ||
| \boldsymbol{x} = | ||
| \arg\min_{\boldsymbol{x}} \left\Vert\boldsymbol{x}\right\Vert_1 | ||
| + \mu \left\Vert\boldsymbol{A}\boldsymbol{x} | ||
| - \boldsymbol{y}\right\Vert^2_2, | ||
| where :math:`\mu` and :math:`\lambda` are hyperparameters that control the | ||
| sparseness of the solution and the efficiency of the algorithm. | ||
| This implementation was based on the CSLD code written by Fei Zhou (周非), | ||
| Weston Nielson, Yi Xia, and Vidvuds Ozoliņš and makes use of a right | ||
| preconditioner for increased efficiency. | ||
| The preconditioning algorithm is described in Compressive sensing lattice | ||
| dynamics. I. General formalism Fei Zhou (周非), Weston Nielson, Yi Xia, | ||
| and Vidvuds Ozoliņš Phys. Rev. B 100, 184308 | ||
| Parameters | ||
| ---------- | ||
| A | ||
| fit matrix | ||
| y | ||
| target array | ||
| mu | ||
| sparseness parameter, can be given as a single value, a list of | ||
| values or None. If a list of values is given, cross validation will be | ||
| used to determine the optimal value. If None is given, a default set of | ||
| values will be tested using cross validation. | ||
| lmbda | ||
| weight of additional L2-norm in split-Bregman | ||
| n_iters | ||
| maximal number of split-Bregman iterations | ||
| tol | ||
| convergence criterion iterative minimization | ||
| cg_n_iters | ||
| maximum number of conjugate gradient iterations. If ``None``, a | ||
| reasonable guess will be made based on the number of free parameters | ||
| cg_tol | ||
| relative tolerance for converging the conjugate gradient step. | ||
| iprint | ||
| controls the frequency of logging output. ``iprint=None`` means no | ||
| output; ``iprint = n`` means print the status of the minimization every | ||
| n steps | ||
| cv_splits | ||
| if mu is None or multiple mu's provided, this controls how many CV | ||
| splits to carry out when evaluating each mu value. | ||
| """ | ||
| if mu is None: | ||
| mu = np.logspace(2, 6, 5) | ||
| if isinstance(mu, (float, int, np.float, np.int)): | ||
| results = _split_bregman(A, y, mu, lmbda, n_iters, tol, cg_tol, cg_n_iters, iprint) | ||
| else: | ||
| # multiple mu values given, use CV to select the best one | ||
| from .cross_validation import CrossValidationEstimator | ||
| cv_data = [] | ||
| for mu_value in mu: | ||
| cve = CrossValidationEstimator( | ||
| (A, y), fit_method='split-bregman', | ||
| n_splits=cv_splits, mu=mu_value, lmbda=lmbda, n_iters=n_iters, | ||
| tol=tol, cg_tol=cg_tol, cg_n_iters=cg_n_iters, iprint=iprint) | ||
| cve.validate() | ||
| rmse = cve.rmse_validation | ||
| cv_data.append([mu_value, rmse]) | ||
| if iprint: | ||
| logger.info("mu: {}, rmse: {:.4f}".format(mu_value, rmse)) | ||
| # select best lambda | ||
| cv_data = np.array(cv_data) | ||
| optimal_ind = cv_data[:, 1].argmin() | ||
| mu_optimal = cv_data[optimal_ind, 0] | ||
| # final fit with optimal lambda | ||
| results = _split_bregman(A, y, mu_optimal, lmbda, | ||
| n_iters, tol, cg_tol, | ||
| cg_n_iters, iprint) | ||
| results['mu_optimal'] = mu_optimal | ||
| return results | ||
| def _split_bregman(A: np.ndarray, | ||
| y: np.ndarray, | ||
| mu: float, | ||
| lmbda: float, | ||
| n_iters: int, | ||
| tol: float, | ||
| cg_tol: float, | ||
| cg_n_iters: int, | ||
| iprint: Optional[int], | ||
| ) -> Dict[str, np.ndarray]: | ||
| """ | ||
| Determines the solution :math:`\\boldsymbol{x}` to the linear | ||
@@ -27,2 +150,10 @@ problem :math:`\\boldsymbol{A}\\boldsymbol{x}=\\boldsymbol{y}` using | ||
| This implementation was based on the CSLD code written by Fei Zhou (周非), | ||
| Weston Nielson, Yi Xia, and Vidvuds Ozoliņš and makes use of a right | ||
| preconditioner for increased efficiency. | ||
| The preconditioning algorithm is described in Compressive sensing lattice | ||
| dynamics. I. General formalism Fei Zhou (周非), Weston Nielson, Yi Xia, | ||
| and Vidvuds Ozoliņš Phys. Rev. B 100, 184308 | ||
| Parameters | ||
@@ -42,5 +173,14 @@ ---------- | ||
| convergence criterion iterative minimization | ||
| cg_n_iters | ||
| maximum number of conjugate gradient iterations. If ``None``, a | ||
| reasonable guess will be made based on the number of free parameters | ||
| cg_tol | ||
| relative tolerance for converging the conjugate gradient step. | ||
| iprint | ||
| controls the frequency of logging output. ``iprint=None`` means no | ||
| output; ``iprint = n`` means print the status of the minimization every | ||
| n steps | ||
| """ | ||
| def _shrink(y: np.ndarray, alpha: float) -> np.ndarray: | ||
| def _shrink(y: np.ndarray) -> np.ndarray: | ||
| """ | ||
@@ -50,4 +190,11 @@ Shrinkage operator as defined in Eq. (11) of the paper by Nelson | ||
| """ | ||
| return np.sign(y) * np.maximum(np.abs(y) - alpha, 0.0) | ||
| return np.sign(y) * np.maximum(np.abs(y) - 1 / lmbda, 0.0) | ||
| if not cg_n_iters: | ||
| cg_n_iters = int(np.max([10, A.shape[1] / 2.])) | ||
| preconditioner = _get_right_preconditioner(A, 1 / mu, lmbda) | ||
| A = np.dot(A, preconditioner) | ||
| n_cols = A.shape[1] | ||
@@ -57,40 +204,44 @@ d = np.zeros(n_cols) | ||
| x = np.zeros(n_cols) | ||
| dx_norm = 1. | ||
| old_norm = 0.0 | ||
| for k in range(n_iters): | ||
| x_prev = x | ||
| # Precompute for speed. | ||
| AtA = np.dot(A.conj().transpose(), A) | ||
| ftA = np.dot(y.conj().transpose(), A) | ||
| ii = 0 | ||
| for i in range(n_iters): | ||
| args = (A, y, mu, lmbda, d, b, AtA, ftA) | ||
| res = minimize(_objective_function, x, args, method='BFGS', | ||
| options={'disp': False}, | ||
| jac=_objective_function_derivative) | ||
| x = res.x | ||
| # The next two lines cover equations 18, 21, and 20 in the Nelson paper. | ||
| # except we apply mu to the L2 term instead of the L1 term | ||
| x = _minimize_cg(x, A, y, d, b, mu, lmbda, cg_n_iters, cg_tol * dx_norm) | ||
| d = _shrink(x + b) | ||
| b = b + x - d | ||
| d = _shrink(mu*x + b, 1.0/lmbda) | ||
| b = b + mu*x - d | ||
| dx_norm = np.linalg.norm(x - x_prev) / np.linalg.norm(x) | ||
| new_norm = np.linalg.norm(x) | ||
| ii = ii + 1 | ||
| if iprint and (k + 1) % iprint == 1: | ||
| logger.info("Split Bregman it={}, |dU|/|U|={:.6}".format(k + 1, | ||
| dx_norm)) | ||
| if abs(new_norm-old_norm) < tol: | ||
| if dx_norm <= tol: | ||
| break | ||
| old_norm = new_norm | ||
| if dx_norm > tol: | ||
| logger.warning("Split Bregman did not converge: |dU|/|U|={:.6} " | ||
| "Try increasing mu".format(dx_norm)) | ||
| fit_results = {'parameters': x} | ||
| fit_results = {'parameters': np.dot(preconditioner, x)} | ||
| return fit_results | ||
| def _objective_function(x: np.ndarray, A: np.ndarray, y: np.ndarray, | ||
| mu: float, lmbda: float, d: np.ndarray, b: np.ndarray, | ||
| AtA: np.ndarray, ftA: np.ndarray) -> np.ndarray: | ||
| def _minimize_cg(x, A, y, d, b, mu, lmbda, n_iters, gtol): | ||
| """ | ||
| Returns the objective function to be minimized. | ||
| Minimises the objective function using conjugate gradient. | ||
| The conjugate gradient algorithm is described in S. Boyd and | ||
| L. Vandenberghe, "Convex Optimization" (Cambridge University Press, 2004). | ||
| This implementation was based on the CSLD code written by Fei Zhou (周非), | ||
| Weston Nielson, Yi Xia, and Vidvuds Ozoliņš. | ||
| Parameters | ||
| ----------- | ||
| X | ||
| x | ||
| fit matrix | ||
@@ -104,63 +255,61 @@ y | ||
| d | ||
| same notation as Nelson et al. paper | ||
| same notation as Nelson, Hart paper | ||
| b | ||
| same notation as Nelson et al. paper | ||
| AtA | ||
| sensing matrix transpose times sensing matrix. | ||
| ftA | ||
| np.dot(y.conj().transpose(), A) | ||
| same notation as Nelson, Hart paper | ||
| n_iters | ||
| maximum number of iterations | ||
| gtol | ||
| gradient tolerance for convergence | ||
| """ | ||
| error_vector = np.matmul(A, x) - y | ||
| r = -(mu * np.matmul(A.T, error_vector) - lmbda * (d - b - x)) | ||
| p = r | ||
| delta = np.dot(r, r) | ||
| error_vector = np.dot(A, x) - y | ||
| for k in range(n_iters): | ||
| error_vector = np.matmul(A, p) | ||
| rp = mu * np.matmul(A.T, error_vector) + lmbda * p | ||
| alpha = delta / np.dot(p, rp) | ||
| obj_function = 0.5*np.vdot(error_vector, error_vector) | ||
| x = x + alpha * p | ||
| r = r - alpha * rp | ||
| if obj_function.imag > 0.0: | ||
| raise RuntimeError( | ||
| 'Objective function contains non-zero imaginary part.)') | ||
| delta_prev = delta | ||
| delta = np.dot(r, r) | ||
| sparseness_correction = d - b - mu*x | ||
| obj_function += 0.5*lmbda * \ | ||
| np.vdot(sparseness_correction, sparseness_correction) | ||
| if np.sqrt(delta) < gtol: | ||
| break | ||
| if obj_function.imag > 0.0: | ||
| raise RuntimeError( | ||
| 'Objective function contains non-zero imaginary part.)') | ||
| beta = delta / delta_prev | ||
| p = beta * p + r | ||
| return obj_function | ||
| sdelta = np.sqrt(delta) | ||
| if sdelta > gtol: | ||
| logger.warning("unconverged gradient, CGmin = {:12.6}".format(sdelta)) | ||
| return x | ||
| def _objective_function_derivative(x: np.ndarray, | ||
| A: np.ndarray, | ||
| y: np.ndarray, | ||
| mu: float, | ||
| lmbda: float, | ||
| d: np.ndarray, | ||
| b: np.ndarray, | ||
| AtA: np.ndarray, | ||
| ftA: np.ndarray) -> np.ndarray: | ||
| def _get_right_preconditioner(A, mu, lmbda): | ||
| """ | ||
| Returns the derivative of the objective function. | ||
| Returns the preconditioning matrix. | ||
| Calculates equation 34 of "Compressive sensing lattice dynamics. I. General | ||
| formalism Fei Zhou (周非), Weston Nielson, Yi Xia, and Vidvuds Ozoliņš | ||
| Phys. Rev. B 100, 184308" | ||
| Parameters | ||
| ----------- | ||
| X | ||
| A | ||
| fit matrix | ||
| y | ||
| target array | ||
| mu | ||
| the parameter that adjusts sparseness. | ||
| preconditioner mu parameter. | ||
| lmbda | ||
| Split Bregman parameter | ||
| d | ||
| same notation as Nelson, Hart paper | ||
| b | ||
| same notation as Nelson, Hart paper | ||
| AtA | ||
| sensing matrix transpose times sensing matrix. | ||
| ftA | ||
| np.dot(y.conj().transpose(), A) | ||
| preconditioner lmbda value | ||
| """ | ||
| ret = np.squeeze(np.dot(x[np.newaxis, :], AtA) - | ||
| ftA - lmbda*mu*(d - mu * x - b)) | ||
| return ret | ||
| ata = np.matmul(A.T, A) | ||
| ata = (ata.T + ata) / 2 | ||
| eig_val, eig_vec = scipy.linalg.eigh(ata) | ||
| over_sq_q = 1 / np.sqrt(eig_val + mu * lmbda) | ||
| return np.matmul(eig_vec, np.matmul(np.diag(over_sq_q), eig_vec.T)) |
@@ -17,3 +17,3 @@ """ | ||
| from .force_constants import SortedForceConstants | ||
| from .io.logging import logger | ||
| from .input_output.logging_tools import logger | ||
| from .calculators.numba_calc import (clusters_force_contribution, | ||
@@ -94,3 +94,3 @@ cluster_force_contribution) | ||
| return new_cluster | ||
| logger.info('Populating orbits') | ||
| logger.debug('Populating orbits') | ||
| bar = Progress(len(cs.orbits)) | ||
@@ -97,0 +97,0 @@ scR_tensormatrix_lookup = dict() |
@@ -7,6 +7,5 @@ """ | ||
| import pickle | ||
| import numpy as np | ||
| from collections import Counter | ||
| import numpy as np | ||
| from .force_constant_model import ForceConstantModel | ||
@@ -101,9 +100,12 @@ from .core.orbits import Orbit | ||
| # get cell | ||
| if hasattr(_prim, '_cell'): # 3.17 | ||
| cell = _prim._cell | ||
| pbc = _prim._pbc | ||
| else: # 3.18 | ||
| cell = _prim.cell[:] | ||
| pbc = _prim.pbc | ||
| # assume PBC True (as it has to be True in hiphive) | ||
| pbc = [True, True, True] | ||
| # finalize | ||
| new_prim = Atoms( | ||
@@ -148,4 +150,4 @@ symbols=_prim.symbols, positions=_prim.positions, cell=cell, pbc=pbc) | ||
| def orbit_data(self): | ||
| """ list : list of dictionaries containing detailed information for | ||
| each orbit, e.g. cluster radius and force constant | ||
| """ List[dict] : list of dictionaries containing detailed information for | ||
| each orbit, e.g. cluster radius and force constant | ||
| """ | ||
@@ -152,0 +154,0 @@ data = [] |
+126
-110
@@ -11,9 +11,12 @@ """ | ||
| from string import ascii_lowercase, ascii_uppercase | ||
| from typing import List, Tuple | ||
| from scipy.linalg import eig | ||
| from ase import units | ||
| from ase import Atoms, units | ||
| from .io import shengBTE as shengBTEIO | ||
| from .io import phonopy as phonopyIO | ||
| from .io.read_write_files import add_items_to_tarfile_pickle, read_items_pickle,\ | ||
| from .input_output import shengBTE as shengBTEIO | ||
| from .input_output import phonopy as phonopyIO | ||
| from .input_output.read_write_files import add_items_to_tarfile_pickle, read_items_pickle,\ | ||
| add_ase_atoms_to_tarfile, read_ase_atoms_from_tarfile | ||
| from .input_output import gpumd as gpumdIO | ||
@@ -24,3 +27,3 @@ | ||
| def __init__(self, supercell): | ||
| def __init__(self, supercell: Atoms): | ||
| if not all(supercell.pbc): | ||
@@ -35,21 +38,21 @@ raise ValueError('configuration must have periodic boundary conditions') | ||
| @property | ||
| def n_atoms(self): | ||
| """ int : number of atoms """ | ||
| def n_atoms(self) -> int: | ||
| """ number of atoms """ | ||
| return len(self.supercell) | ||
| @property | ||
| def supercell(self): | ||
| """ ase.Atoms : supercell associated with force constants """ | ||
| def supercell(self) -> Atoms: | ||
| """ supercell associated with force constants """ | ||
| return self._supercell.copy() | ||
| @property | ||
| def clusters(self): | ||
| """ list : sorted list of clusters """ | ||
| def clusters(self) -> list: | ||
| """ sorted list of clusters """ | ||
| return sorted(self._fc_dict.keys(), key=lambda c: (len(c), c)) | ||
| def __len__(self): | ||
| def __len__(self) -> int: | ||
| """ int : number of clusters (or force constants) """ | ||
| return len(self._fc_dict) | ||
| def __add__(self, fcs_other): | ||
| def __add__(self, fcs_other) -> None: | ||
| """ ForceConstants : addition of two force constants """ | ||
@@ -71,3 +74,3 @@ if type(self) != type(fcs_other): | ||
| def get_fc_dict(self, order=None): | ||
| def get_fc_dict(self, order: int = None) -> dict: | ||
| """ Returns force constant dictionary for one specific order. | ||
@@ -80,3 +83,3 @@ | ||
| ---------- | ||
| order : int | ||
| order | ||
| force constants returned for this order | ||
@@ -86,5 +89,4 @@ | ||
| ------- | ||
| dict | ||
| dictionary with keys corresponding to clusters and values to | ||
| respective force constant | ||
| dictionary with keys corresponding to clusters and values to | ||
| respective force constant | ||
| """ | ||
@@ -103,3 +105,3 @@ if order is None: | ||
| def get_fc_array(self, order, format='phonopy'): | ||
| def get_fc_array(self, order: int, format: str = 'phonopy') -> np.ndarray: | ||
| """ Returns force constants in array format for specified order. | ||
@@ -109,5 +111,5 @@ | ||
| ---------- | ||
| order : int | ||
| order | ||
| force constants for this order will be returned | ||
| format : str | ||
| format | ||
| specify which format (shape) the NumPy array should have, | ||
@@ -118,5 +120,4 @@ possible values are `phonopy` and `ase` | ||
| ------- | ||
| numpy.ndarray | ||
| NumPy array with shape `(N,)*order + (3,)*order` where `N` is | ||
| the number of atoms | ||
| NumPy array with shape `(N,)*order + (3,)*order` where `N` is | ||
| the number of atoms | ||
| """ | ||
@@ -141,10 +142,5 @@ if order not in self.orders: | ||
| def compute_gamma_frequencies(self): | ||
| """ Computes gamma frequencies using the second-order force constants. | ||
| Returns | ||
| ------- | ||
| gamma_frequencies : numpy.ndarray | ||
| Gamma frequencies in THz | ||
| """ | ||
| def compute_gamma_frequencies(self) -> np.ndarray: | ||
| """ Returns the Gamma frequencies in THz using the second-order force | ||
| constants. """ | ||
| fc2_array = self.get_fc_array(order=2) | ||
@@ -154,10 +150,10 @@ masses = self.supercell.get_masses() | ||
| def assert_acoustic_sum_rules(self, order=None, tol=1e-6): | ||
| """ Asserts that acoustic sum rules are enforced for force constants. | ||
| def assert_acoustic_sum_rules(self, order: int = None, tol: float = 1e-6): | ||
| """ Asserts that force constants obey acoustic sum rules. | ||
| Parameters | ||
| ---------- | ||
| order : int | ||
| order | ||
| specifies which order to check, if None all are checked | ||
| tol : float | ||
| tol | ||
| numeric tolerance for checking sum rules | ||
@@ -168,3 +164,3 @@ | ||
| AssertionError | ||
| if acoustic sum rules are not enforced | ||
| if acoustic sum rules are violated | ||
| """ | ||
@@ -182,3 +178,3 @@ | ||
| for order in orders: | ||
| assert_msg = 'Acoustic sum rule order {} not enforced for atom' | ||
| assert_msg = 'Acoustic sum rule for order {} violated for atom' | ||
| assert_msg += ' {}' * (order - 1) + ' x' | ||
@@ -190,6 +186,5 @@ for ijk in product(atomic_indices, repeat=order-1): | ||
| fc_sum_ijk += self[cluster] | ||
| assert np.linalg.norm(fc_sum_ijk) < tol, \ | ||
| assert_msg.format(order, *ijk) | ||
| assert np.linalg.norm(fc_sum_ijk) < tol, assert_msg.format(order, *ijk) | ||
| def print_force_constant(self, cluster): | ||
| def print_force_constant(self, cluster: Tuple[int]) -> None: | ||
| """ | ||
@@ -200,3 +195,3 @@ Prints force constants for a cluster in a nice format. | ||
| ---------- | ||
| cluster : tuple(int) | ||
| cluster | ||
| sites belonging to the cluster | ||
@@ -230,3 +225,3 @@ """ | ||
| def __str__(self): | ||
| def __str__(self) -> str: | ||
| s = [] | ||
@@ -248,3 +243,3 @@ s.append(' ForceConstants '.center(54, '=')) | ||
| def __repr__(self): | ||
| def __repr__(self) -> str: | ||
| fc_dict_str = '{{{}: {}, ..., {}: {}}}'.format( | ||
@@ -256,3 +251,4 @@ self.clusters[0], self[self.clusters[0]].round(5), | ||
| def _repr_fc(self, cluster, precision=5, suppress=True): | ||
| def _repr_fc(self, cluster: Tuple[int], | ||
| precision: float = 5, suppress: bool = True) -> str: | ||
| """ | ||
@@ -263,3 +259,3 @@ Representation for single cluster and its force constant. | ||
| ---------- | ||
| cluster : tuple | ||
| cluster | ||
| tuple of ints indicating the sites belonging to the cluster | ||
@@ -276,3 +272,3 @@ """ | ||
| def _sanity_check_dict(self, fc_dict): | ||
| def _sanity_check_dict(self, fc_dict: dict) -> None: | ||
| """ Checks that all indices in clusters are between 0 and number of | ||
@@ -285,3 +281,4 @@ atoms. """ | ||
| @classmethod | ||
| def from_arrays(cls, supercell, fc2_array=None, fc3_array=None): | ||
| def from_arrays(cls, supercell: Atoms, | ||
| fc2_array: np.ndarray = None, fc3_array: np.ndarray = None): | ||
| """ Constructs FCs from numpy arrays. | ||
@@ -293,7 +290,7 @@ | ||
| ---------- | ||
| supercell : ase.Atoms | ||
| supercell | ||
| supercell structure | ||
| fc2_array : np.ndarray | ||
| fc2_array | ||
| second-order force constant in phonopy format, i.e. must have shape (N, N, 3, 3) | ||
| fc3_array : np.ndarray | ||
| fc3_array | ||
| third-order force constant in phonopy format, i.e. must have shape (N, N, N, 3, 3, 3) | ||
@@ -323,3 +320,3 @@ """ | ||
| @classmethod | ||
| def from_sparse_dict(cls, fc_dict, supercell): | ||
| def from_sparse_dict(cls, fc_dict: dict, supercell: Atoms): | ||
| """ Assumes label symmetries, meaning only one cluster for each | ||
@@ -330,5 +327,6 @@ permutation should be included | ||
| ---------- | ||
| fc_dict : dict | ||
| fc_dict | ||
| keys corresponding to clusters and values to the force constants | ||
| supercell : ase.Atoms | ||
| supercell | ||
| atomic configuration | ||
| """ | ||
@@ -338,3 +336,3 @@ return SortedForceConstants(fc_dict, supercell=supercell) | ||
| @classmethod | ||
| def from_dense_dict(cls, fc_dict, supercell): | ||
| def from_dense_dict(cls, fc_dict: dict, supercell: Atoms): | ||
| """ All permutations of clusters that are not zero must be listed, | ||
@@ -345,5 +343,6 @@ if label symmetries are fulfilled will return a SortedForceConstants | ||
| ---------- | ||
| fc_dict : dict | ||
| fc_dict | ||
| keys corresponding to clusters and values to the force constants | ||
| supercell : ase.Atoms | ||
| supercell | ||
| atomic configuration | ||
| """ | ||
@@ -357,3 +356,3 @@ if check_label_symmetries(fc_dict): | ||
| @classmethod | ||
| def read_phonopy(cls, supercell, fname, format=None): | ||
| def read_phonopy(cls, supercell: Atoms, fname: str, format: str = None): | ||
| """ Reads force constants from a phonopy calculation. | ||
@@ -363,8 +362,9 @@ | ||
| ---------- | ||
| supercell : ase.Atoms | ||
| supercell | ||
| supercell structure (`SPOSCAR`) | ||
| fname : str | ||
| fname | ||
| name of second-order force constant file | ||
| format : str | ||
| format for second-order force constants | ||
| format | ||
| format for second-order force constants; | ||
| possible values: "text", "hdf5" | ||
| """ | ||
@@ -375,3 +375,3 @@ fc2_array = phonopyIO.read_phonopy_fc2(fname, format=format) | ||
| @classmethod | ||
| def read_phono3py(cls, supercell, fname): | ||
| def read_phono3py(cls, supercell: Atoms, fname: str): | ||
| """ Reads force constants from a phono3py calculation. | ||
@@ -381,5 +381,5 @@ | ||
| ---------- | ||
| supercell : ase.Atoms | ||
| supercell | ||
| supercell structure (`SPOSCAR`) | ||
| fname : str | ||
| fname | ||
| name of third-order force constant file | ||
@@ -391,3 +391,3 @@ """ | ||
| @classmethod | ||
| def read_shengBTE(cls, supercell, fname, prim): | ||
| def read_shengBTE(cls, supercell: Atoms, fname: str, prim: Atoms): | ||
| """ Reads third order force constants from a shengBTE calculation. | ||
@@ -399,7 +399,7 @@ | ||
| ---------- | ||
| supercell : str | ||
| supercell | ||
| supercell structure | ||
| fname : str | ||
| fname | ||
| name of third-order force constant file | ||
| prim : ase.Atoms | ||
| prim | ||
| primitive configuration (must be equivalent to structure used in | ||
@@ -412,3 +412,3 @@ the shengBTE calculation) | ||
| @classmethod | ||
| def read(cls, fname): | ||
| def read(cls, fname: str): | ||
| """ Reads ForceConstants from file. | ||
@@ -418,3 +418,3 @@ | ||
| ---------- | ||
| fname : str | ||
| fname | ||
| name of file from which to read | ||
@@ -435,3 +435,3 @@ """ | ||
| def write(self, fname): | ||
| def write(self, fname: str) -> None: | ||
| """ Writes entire ForceConstants object to file. | ||
@@ -441,3 +441,3 @@ | ||
| ---------- | ||
| fname : str | ||
| fname | ||
| name of file to which to write | ||
@@ -451,3 +451,3 @@ """ | ||
| def write_to_phonopy(self, fname, format=None): | ||
| def write_to_phonopy(self, fname: str, format: str = None) -> None: | ||
| """ | ||
@@ -458,10 +458,11 @@ Writes force constants in phonopy format. | ||
| ---------- | ||
| fname : str | ||
| fname | ||
| name of file to which to write second-order force constant | ||
| format : str | ||
| format for second-order force constants | ||
| format | ||
| format for second-order force constants; | ||
| possible values: "text", "hdf5" | ||
| """ | ||
| phonopyIO.write_phonopy_fc2(fname, self, format=format) | ||
| def write_to_phono3py(self, fname): | ||
| def write_to_phono3py(self, fname: str) -> None: | ||
| """ | ||
@@ -472,3 +473,3 @@ Writes force constants in phono3py format. | ||
| ---------- | ||
| fname : str | ||
| fname | ||
| name of file to which to write third-order force constant | ||
@@ -478,3 +479,3 @@ """ | ||
| def write_to_shengBTE(self, fname, prim, **kwargs): | ||
| def write_to_shengBTE(self, fname: str, prim: Atoms, **kwargs) -> None: | ||
| """ | ||
@@ -485,5 +486,5 @@ Writes third order force constants in shengBTE format. | ||
| ---------- | ||
| fname : str | ||
| fname | ||
| name of file to which to write third-order force constant | ||
| prim : ase.Atoms | ||
| prim | ||
| primitive configuration (must be equivalent to structure used in | ||
@@ -506,3 +507,3 @@ the shengBTE calculation) | ||
| def __init__(self, fc_dict, supercell): | ||
| def __init__(self, fc_dict: dict, supercell: Atoms) -> None: | ||
| super().__init__(supercell) | ||
@@ -513,3 +514,3 @@ self._sanity_check_dict(fc_dict) | ||
| def __getitem__(self, cluster): | ||
| def __getitem__(self, cluster: Tuple[int]): | ||
| sorted_cluster = tuple(sorted(cluster)) | ||
@@ -526,7 +527,7 @@ | ||
| @property | ||
| def orders(self): | ||
| """ list : orders for which force constants exist """ | ||
| def orders(self) -> List[int]: | ||
| """ orders for which force constants exist """ | ||
| return self._orders.copy() | ||
| def _sanity_check_dict(self, fc_dict): | ||
| def _sanity_check_dict(self, fc_dict: dict) -> None: | ||
| super()._sanity_check_dict(fc_dict) | ||
@@ -539,3 +540,21 @@ | ||
| def write_to_GPUMD(self, fname_fc, fname_clusters, order, tol=1e-10): | ||
| """ | ||
| Writes force constants of the specified order in GPUMD format. | ||
| Parameters | ||
| ---------- | ||
| fname_fc : str | ||
| name of file which contains the lookup force constants | ||
| fname_clusters : str | ||
| name of file which contains the clusters and the fc lookup index | ||
| order : int | ||
| force constants for this order will be written to file | ||
| tol : float | ||
| if the norm of a force constant is less than tol then it is not written. | ||
| if two force constants are within tol of each other, they are considered equal. | ||
| """ | ||
| gpumdIO.write_fcs_gpumd(fname_fc, fname_clusters, self, order, tol) | ||
| class RawForceConstants(ForceConstants): | ||
@@ -552,3 +571,3 @@ """ Force constants without label symmetries. | ||
| def __init__(self, fc_dict, supercell): | ||
| def __init__(self, fc_dict: dict, supercell: Atoms) -> None: | ||
| super().__init__(supercell) | ||
@@ -559,3 +578,3 @@ self._sanity_check_dict(fc_dict) | ||
| def __getitem__(self, cluster): | ||
| def __getitem__(self, cluster: Tuple[int]): | ||
| if cluster not in self._fc_dict.keys(): | ||
@@ -566,4 +585,4 @@ return np.zeros((3,)*len(cluster)) | ||
| @property | ||
| def orders(self): | ||
| """ list : orders for which force constants exist """ | ||
| def orders(self) -> List[int]: | ||
| """ orders for which force constants exist """ | ||
| return self._orders.copy() | ||
@@ -577,3 +596,3 @@ | ||
| def array_to_dense_dict(fc_array, fc_tol=1e-10): | ||
| def array_to_dense_dict(fc_array: np.ndarray, fc_tol: float = 1e-10) -> dict: | ||
| """ Constructs a dense dict from an fc array in phonopy format. | ||
@@ -586,5 +605,5 @@ | ||
| ---------- | ||
| fc_array : np.ndarray | ||
| fc_array | ||
| force constant array in phonopy format | ||
| fc_tol : float | ||
| fc_tol | ||
| tolerance for considering force constants zero or not | ||
@@ -608,3 +627,3 @@ """ | ||
| def check_label_symmetries(fc_dict): | ||
| def check_label_symmetries(fc_dict: dict) -> bool: | ||
| """ Checks label symmetries for dense fc dict. | ||
@@ -618,3 +637,3 @@ | ||
| ---------- | ||
| fc_dict : dict | ||
| fc_dict | ||
| keys corresponding to clusters and values to the force constants | ||
@@ -632,3 +651,3 @@ """ | ||
| def dense_dict_to_sparse_dict(fc_dict): | ||
| def dense_dict_to_sparse_dict(fc_dict: dict) -> dict: | ||
| """ Converts dense dict to sparse dict. | ||
@@ -641,3 +660,3 @@ | ||
| ---------- | ||
| fc_dict : dict | ||
| fc_dict | ||
| keys corresponding to clusters and values to the force constants | ||
@@ -652,3 +671,3 @@ """ | ||
| def symbolize_force_constant(fc, tol=1e-10): | ||
| def symbolize_force_constant(fc: np.ndarray, tol: float = 1e-10) -> np.ndarray: | ||
| """Carries out a symbolic symmetrization of a force constant tensor. | ||
@@ -658,5 +677,5 @@ | ||
| ---------- | ||
| fc : numpy.ndarray | ||
| fc | ||
| force constant tensor | ||
| tol : float | ||
| tol | ||
| tolerance used to decide whether two elements are identical | ||
@@ -666,4 +685,3 @@ | ||
| ------- | ||
| numpy.ndarray | ||
| symbolic representation of force constant matrix | ||
| symbolic representation of force constant matrix | ||
| """ | ||
@@ -687,5 +705,4 @@ fc_int = np.round(fc / tol).astype(int) | ||
| def _compute_gamma_frequencies(fc2, masses): | ||
| """ Computes Gamma frequencies using second-order force constants | ||
| def _compute_gamma_frequencies(fc2: np.ndarray, masses: List[float]) -> np.ndarray: | ||
| """ Computes Gamma frequencies using second-order force constants. | ||
| Assumes fc2 is in units of eV/A2. | ||
@@ -695,5 +712,5 @@ | ||
| ---------- | ||
| fc2 : numpy.ndarray | ||
| fc2 | ||
| second-order force constants in phonopy format | ||
| masses : list(float) | ||
| masses | ||
| mass of each atom | ||
@@ -703,4 +720,3 @@ | ||
| ------- | ||
| gamma_frequencies : numpy.ndarray | ||
| Gamma frequencies in THz | ||
| Gamma frequencies in THz | ||
| """ | ||
@@ -707,0 +723,0 @@ |
@@ -8,3 +8,3 @@ import numpy as np | ||
| generate_phonon_rattled_structures | ||
| from hiphive.io.logging import set_config | ||
| from hiphive.input_output.logging_tools import set_config | ||
@@ -67,5 +67,4 @@ | ||
| print('Creating initial model') | ||
| rattled_structures = generate_rattled_structures( | ||
| atoms_ideal, n_structures, 0.03) | ||
| prepare_structures(rattled_structures, atoms_ideal, calc) | ||
| rattled_structures = generate_rattled_structures(atoms_ideal, n_structures, 0.03) | ||
| rattled_structures = prepare_structures(rattled_structures, atoms_ideal, calc) | ||
| for structure in rattled_structures: | ||
@@ -89,3 +88,3 @@ sc.add_structure(structure) | ||
| atoms_ideal, fc2, n_structures, T) | ||
| prepare_structures(phonon_rattled_structures, atoms_ideal, calc) | ||
| phonon_rattled_structures = prepare_structures(phonon_rattled_structures, atoms_ideal, calc) | ||
@@ -92,0 +91,0 @@ # fit new model |
@@ -11,13 +11,13 @@ """ | ||
| from .io.read_write_files import (add_items_to_tarfile_hdf5, | ||
| add_items_to_tarfile_pickle, | ||
| add_items_to_tarfile_custom, | ||
| add_list_to_tarfile_custom, | ||
| read_items_hdf5, | ||
| read_items_pickle, | ||
| read_list_custom) | ||
| from .input_output.read_write_files import (add_items_to_tarfile_hdf5, | ||
| add_items_to_tarfile_pickle, | ||
| add_items_to_tarfile_custom, | ||
| add_list_to_tarfile_custom, | ||
| read_items_hdf5, | ||
| read_items_pickle, | ||
| read_list_custom) | ||
| from .cluster_space import ClusterSpace | ||
| from .force_constant_model import ForceConstantModel | ||
| from .io.logging import logger | ||
| from .input_output.logging_tools import logger | ||
| logger = logger.getChild('structure_container') | ||
@@ -181,3 +181,3 @@ | ||
| logger.info('Adding structure') | ||
| logger.debug('Adding structure') | ||
| M = self._compute_fit_matrix(atoms) | ||
@@ -260,2 +260,4 @@ structure = FitStructure(atoms_copy, M, **meta_data) | ||
| s += ['{:22} : {}'.format('Total number of structures', len(self))] | ||
| _, target_forces = self.get_fit_data() | ||
| s += ['{:22} : {}'.format('Number of force components', len(target_forces))] | ||
| s.append(''.center(n, '-')) | ||
@@ -273,8 +275,8 @@ s.append(str_structure(-1, dummy)) | ||
| """ Compute fit matrix for a single atoms object """ | ||
| logger.info('Computing fit matrix') | ||
| logger.debug('Computing fit matrix') | ||
| if atoms != getattr(self._previous_fcm, 'atoms', None): | ||
| logger.info(' Building new FCM object') | ||
| logger.debug(' Building new FCM object') | ||
| self._previous_fcm = ForceConstantModel(atoms, self._cs) | ||
| else: | ||
| logger.info(' Reusing old FCM object') | ||
| logger.debug(' Reusing old FCM object') | ||
| return self._previous_fcm.get_fit_matrix(atoms.get_array('displacements')) | ||
@@ -281,0 +283,0 @@ |
| import numpy as np | ||
| from ase.units import kB | ||
| from hiphive.io.logging import logger | ||
| from hiphive.input_output.logging_tools import logger | ||
@@ -29,3 +29,3 @@ logger = logger.getChild(__name__) | ||
| See: West and Estreicher PRL 96, 115504 (2006) | ||
| See: West and Estreicher, Physical Review Letters **96**, 115504 (2006) | ||
@@ -60,3 +60,3 @@ Parameters | ||
| """ Thermal excitation of phonon modes as described by West and | ||
| Estreicher PRL 96, 115504 (2006). | ||
| Estreicher, Physical Review Letters **96**, 115504 (2006). | ||
@@ -63,0 +63,0 @@ _s is a mode index |
@@ -58,3 +58,3 @@ """ | ||
| accepted with a probability determined from the minimum | ||
| interatomic distance :math:`d_{ij}`. If :math`\\min(d_{ij})` is | ||
| interatomic distance :math:`d_{ij}`. If :math:`\\min(d_{ij})` is | ||
| smaller than :math:`d_{min}` the move is only accepted with a low | ||
@@ -88,3 +88,3 @@ probability. | ||
| rattle amplitude (standard deviation in normal distribution); | ||
| note this value is not *directly* connected to the final | ||
| note this value is not connected to the final | ||
| average displacement for the structures | ||
@@ -97,2 +97,4 @@ d_min : float | ||
| generated | ||
| n_iter : int | ||
| number of Monte Carlo cycles | ||
@@ -99,0 +101,0 @@ Returns |
+124
-42
@@ -5,12 +5,13 @@ """ | ||
| from typing import List | ||
| from typing import List, Tuple | ||
| import numpy as np | ||
| from ase import Atoms | ||
| from ase.calculators.calculator import Calculator | ||
| from ase.calculators.singlepoint import SinglePointCalculator | ||
| from ase.geometry import find_mic | ||
| from ase.geometry import get_distances | ||
| from hiphive import ClusterSpace, ForceConstants | ||
| from hiphive.force_constant_model import ForceConstantModel | ||
| from .io.logging import logger | ||
| from ase.neighborlist import neighbor_list | ||
| from .cluster_space import ClusterSpace | ||
| from .force_constants import ForceConstants | ||
| from .input_output.logging_tools import logger | ||
@@ -21,3 +22,5 @@ | ||
| def get_displacements(atoms: Atoms, atoms_ideal: Atoms) -> np.ndarray: | ||
| def get_displacements(atoms: Atoms, | ||
| atoms_ideal: Atoms, | ||
| cell_tol: float = 1e-4) -> np.ndarray: | ||
| """Returns the the smallest possible displacements between a | ||
@@ -38,45 +41,106 @@ displaced configuration relative to an ideal (reference) | ||
| ideal configuration relative to which displacements are computed | ||
| cell_tol | ||
| cell tolerance; if the cells mismatch by more than this tolerance a ValueError is raised | ||
| """ | ||
| if not np.array_equal(atoms.numbers, atoms_ideal.numbers): | ||
| raise ValueError('Atomic numbers do not match.') | ||
| if np.linalg.norm(atoms.cell - atoms_ideal.cell) > cell_tol: | ||
| raise ValueError('Cells do not match.') | ||
| displacements = [] | ||
| for pos, ideal_pos in zip(atoms.positions, atoms_ideal.positions): | ||
| v_ij = np.array([pos - ideal_pos]) | ||
| displacements.append(find_mic(v_ij, atoms_ideal.cell, pbc=True)[0][0]) | ||
| return np.array(displacements) | ||
| raw_position_diff = atoms.positions - atoms_ideal.positions | ||
| wrapped_mic_displacements = find_mic(raw_position_diff, atoms_ideal.cell, pbc=True)[0] | ||
| return wrapped_mic_displacements | ||
| def prepare_structures(structures: List[Atoms], atoms_ideal: Atoms, | ||
| calc: Calculator) -> None: | ||
| """Prepares a set of structures in the format suitable for a | ||
| :class:`StructureContainer <hiphive.StructureContainer>`. This | ||
| includes retrieving the forces using the provided calculator, | ||
| resetting the positions to the ideal configuration, and adding | ||
| arrays with forces and displacements. | ||
| def prepare_structure(atoms: Atoms, | ||
| atoms_ideal: Atoms, | ||
| calc: SinglePointCalculator = None) -> Atoms: | ||
| """Prepare a structure in the format suitable for a | ||
| :class:`StructureContainer <hiphive.StructureContainer>`. | ||
| Notes | ||
| ----- | ||
| Changes the structures in place. | ||
| Parameters | ||
| ---------- | ||
| atoms | ||
| input structure | ||
| atoms_ideal | ||
| reference structure relative to which displacements are computed | ||
| calc | ||
| ASE calculator used for computing forces | ||
| Returns | ||
| ------- | ||
| ASE atoms object | ||
| prepared ASE atoms object with forces and displacements as arrays | ||
| """ | ||
| # get forces | ||
| if 'forces' in atoms.arrays: | ||
| forces = atoms.get_array('forces') | ||
| elif calc is not None: | ||
| atoms_tmp = atoms.copy() | ||
| atoms_tmp.calc = calc | ||
| forces = atoms_tmp.get_forces() | ||
| elif isinstance(atoms.calc, SinglePointCalculator): | ||
| forces = atoms.get_forces() | ||
| # setup new atoms | ||
| perm = find_permutation(atoms, atoms_ideal) | ||
| atoms_new = atoms.copy() | ||
| atoms_new = atoms_new[perm] | ||
| atoms_new.arrays['forces'] = forces[perm] | ||
| disps = get_displacements(atoms_new, atoms_ideal) | ||
| atoms_new.arrays['displacements'] = disps | ||
| atoms_new.positions = atoms_ideal.positions | ||
| return atoms_new | ||
| def prepare_structures(structures: List[Atoms], | ||
| atoms_ideal: Atoms, | ||
| calc: SinglePointCalculator = None) -> List[Atoms]: | ||
| """Prepares a set of structures in the format suitable for adding them to | ||
| a :class:`StructureContainer <hiphive.StructureContainer>`. | ||
| `structures` should represent a list of supercells with displacements | ||
| while `atoms_ideal` should provide the ideal reference structure (without | ||
| displacements) for the given structures. | ||
| The structures that are returned will have their positions reset to the | ||
| ideal structures. Displacements and forces will be added as arrays to the | ||
| atoms objects. | ||
| If no calculator is provided, then there must be an ASE | ||
| `SinglePointCalculator <ase.calculators.singlepoint>` object attached to | ||
| the structures or the forces should already be attached as | ||
| arrays to the structures. | ||
| If a calculator is provided then it will be used to compute the forces for | ||
| all structures. | ||
| Example | ||
| ------- | ||
| >>> db = connect('dft_training_structures.db') | ||
| >>> training_structures = [row.toatoms() for row in db.select()] | ||
| >>> training_structures = prepare_structures(training_structures, atoms_ideal) | ||
| >>> for s in training_structures: | ||
| >>> sc.add_structure(s) | ||
| Parameters | ||
| ---------- | ||
| structures | ||
| list of input configurations | ||
| list of input displaced structures | ||
| atoms_ideal | ||
| reference configuration relative to which displacements | ||
| are computed | ||
| reference structure relative to which displacements are computed | ||
| calc | ||
| ASE calculator used for computing forces | ||
| Returns | ||
| ------- | ||
| list of prepared structures with forces and displacements as arrays | ||
| """ | ||
| for atoms in structures: | ||
| atoms.set_calculator(calc) | ||
| forces = atoms.get_forces() | ||
| displacements = get_displacements(atoms, atoms_ideal) | ||
| atoms.positions = atoms_ideal.get_positions() | ||
| atoms.new_array('forces', forces) | ||
| atoms.new_array('displacements', displacements) | ||
| return [prepare_structure(s, atoms_ideal, calc) for s in structures] | ||
| def find_permutation(atoms: Atoms, atoms_ref: Atoms) -> List[int]: | ||
| def find_permutation(atoms: Atoms, | ||
| atoms_ref: Atoms) -> List[int]: | ||
| """ Returns the best permutation of atoms for mapping one | ||
@@ -131,3 +195,6 @@ configuration onto another. | ||
| def __init__(self, types, distance, count=0): | ||
| def __init__(self, | ||
| types: List[str], | ||
| distance: float, | ||
| count: int = 0): | ||
| self.types = types | ||
@@ -144,3 +211,5 @@ self.distance = distance | ||
| def get_neighbor_shells(atoms: Atoms, cutoff: float, dist_tol: float = 1e-5) -> List[Shell]: | ||
| def get_neighbor_shells(atoms: Atoms, | ||
| cutoff: float, | ||
| dist_tol: float = 1e-5) -> List[Shell]: | ||
| """ Returns a list of neighbor shells. | ||
@@ -168,9 +237,2 @@ | ||
| # TODO: Remove this once this feature have been in ASE long enough | ||
| try: | ||
| from ase.neighborlist import neighbor_list | ||
| except ImportError: | ||
| raise ImportError('get_neighbor_shells uses new functionality from ASE' | ||
| ', please update ASE in order to use this function.') | ||
| # get distances | ||
@@ -206,5 +268,8 @@ ijd = neighbor_list('ijd', atoms, cutoff) | ||
| def extract_parameters(fcs: ForceConstants, cs: ClusterSpace): | ||
| def extract_parameters(fcs: ForceConstants, | ||
| cs: ClusterSpace) -> Tuple[np.ndarray, np.ndarray, int, np.ndarray]: | ||
| """ Extracts parameters from force constants. | ||
| TODO: Rename this function with more explanatory name? | ||
| This function can be used to extract parameters to create a | ||
@@ -221,4 +286,21 @@ ForceConstantPotential from a known set of force constants. | ||
| cluster space | ||
| Returns | ||
| ------- | ||
| x : {(N,), (N, K)} ndarray | ||
| Least-squares solution. If `b` is two-dimensional, | ||
| the solutions are in the `K` columns of `x`. | ||
| residuals : {(1,), (K,), (0,)} ndarray | ||
| Sums of residuals; squared Euclidean 2-norm for each column in | ||
| ``b - a*x``. | ||
| If the rank of `a` is < N or M <= N, this is an empty array. | ||
| If `b` is 1-dimensional, this is a (1,) shape array. | ||
| Otherwise the shape is (K,). | ||
| rank : int | ||
| Rank of matrix `a`. | ||
| s : (min(M, N),) ndarray | ||
| Singular values of `a`. | ||
| """ | ||
| from .force_constant_model import ForceConstantModel | ||
| fcm = ForceConstantModel(fcs.supercell, cs) | ||
| return np.linalg.lstsq(*fcm.get_fcs_sensing(fcs), rcond=None)[0] |
+5
-4
| Metadata-Version: 1.2 | ||
| Name: hiphive | ||
| Version: 0.6 | ||
| Version: 0.7 | ||
| Summary: High-order force constants for the masses | ||
@@ -16,3 +16,4 @@ Home-page: http://hiphive.materialsmodeling.org/ | ||
| extensive tutorial can be found in the | ||
| `user guide <https://hiphive.materialsmodeling.org/>`_. | ||
| `user guide <https://hiphive.materialsmodeling.org/>`_. Complete examples of | ||
| using hiphive for force constants extraction can be found at `hiphive examples <https://gitlab.com/materials-modeling/hiphive-examples/>`_. | ||
@@ -83,4 +84,4 @@ **hiPhive** is written in Python, which allows | ||
| Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>` page | ||
| of the documentation for additional references. | ||
| Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>`_ | ||
| page of the documentation for additional references. | ||
@@ -87,0 +88,0 @@ **hiphive** and its development are hosted on |
+4
-3
@@ -8,3 +8,4 @@ hiPhive | ||
| extensive tutorial can be found in the | ||
| `user guide <https://hiphive.materialsmodeling.org/>`_. | ||
| `user guide <https://hiphive.materialsmodeling.org/>`_. Complete examples of | ||
| using hiphive for force constants extraction can be found at `hiphive examples <https://gitlab.com/materials-modeling/hiphive-examples/>`_. | ||
@@ -75,4 +76,4 @@ **hiPhive** is written in Python, which allows | ||
| Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>` page | ||
| of the documentation for additional references. | ||
| Also consult the `Credits <https://hiphive.materialsmodeling.org/credits>`_ | ||
| page of the documentation for additional references. | ||
@@ -79,0 +80,0 @@ **hiphive** and its development are hosted on |
+3
-2
@@ -38,5 +38,6 @@ #!/usr/bin/env python3 | ||
| 'numpy>=1.12', | ||
| 'sklearn', | ||
| 'scipy', | ||
| 'scikit-learn', | ||
| 'spglib', | ||
| 'sympy'], | ||
| 'sympy>=1.1'], | ||
| packages=find_packages(), | ||
@@ -43,0 +44,0 @@ classifiers=[ |
| import pickle | ||
def _write_pickle(fname, data):
    """ Serialize data and store it in the file named fname. """
    with open(fname, 'wb') as fobj:
        # highest protocol: compact and fast; files are only read back by hiphive
        pickle.dump(data, fobj, protocol=pickle.HIGHEST_PROTOCOL)
def _read_pickle(fname):
    """ Load and return the contents of the pickle file named fname. """
    with open(fname, 'rb') as fobj:
        return pickle.load(fobj)
| """ This module contains functions and variables to control hiPhive's logging | ||
| * `logger` - the module logger | ||
| * set_config - function to control level and logfile | ||
| """ | ||
| import logging | ||
| import sys | ||
| from timeit import default_timer as timer | ||
# This is the root logger of hiPhive; all submodules log through children
# of this logger (logger.getChild(...)).
logger = logging.getLogger('hiphive')
# Will process all levels of INFO or higher
logger.setLevel(logging.INFO)
# Keep hiphive records out of the root logger by default.
# If you know what you are doing you may set this to True
logger.propagate = False
# The hiPhive logger will collect events from its children and the default
# behaviour is to print them directly to stdout
ch = logging.StreamHandler(sys.stdout)
logger.addHandler(ch)
# Module-level switch: when True the Progress helper continuously rewrites
# a progress line on stdout; toggled via set_config(continuous=...).
continuous_logging = False
| # TODO: use Context management protocol instead | ||
class Progress:
    """ Progress bar like functionality.

    Parameters
    ----------
    tot : int
        total number of expected ticks; if None the total is unknown
    mode : str
        display mode; NOTE(review): this parameter is currently never read
    estimate_remaining : bool
        if True an estimate of the remaining time is printed;
        requires tot to be known
    """

    def __init__(self, tot=None, mode='frac', estimate_remaining=True):
        if tot is None:
            self._tot = '?'
            # cannot estimate remaining time without knowing the total
            assert not estimate_remaining
        else:
            self._tot = tot
        self._progress = 0
        self._estimate_remaining = estimate_remaining
        self._last_update = 0
        self._start = timer()

    def tick(self):
        """ Advance the counter by one; if continuous logging is enabled,
        refresh the progress line on stdout at most every 2 seconds. """
        self._progress += 1
        delta = timer() - self._last_update
        if continuous_logging and delta > 2:
            self._last_update = timer()
            # clear the current line before rewriting it
            print('\r' + ' ' * 70 + '\r', end='', flush=True)
            if self._tot == '?':
                # bugfix: with an unknown total the ratio below would raise
                # TypeError (int divided by str); show only the raw count
                print('{}/?'.format(self._progress), end='', flush=True)
            else:
                print('{}/{}={:.3%}'.format(self._progress, self._tot,
                                            self._progress / self._tot),
                      end='', flush=True)
                if self._estimate_remaining:
                    # linear extrapolation from the elapsed time so far
                    remaining_time = (self._tot - self._progress) * (
                        timer() - self._start) / self._progress
                    print(' time remaining: {:.3}'.format(remaining_time),
                          end='', flush=True)

    def close(self):
        """ Finish the progress report and log the total elapsed time. """
        if continuous_logging:
            print('\r' + ' ' * 70 + '\r', end='', flush=True)
        s = timer() - self._start
        d, remainder = divmod(s, 60 * 60 * 24)
        h, remainder = divmod(remainder, 60 * 60)
        m, s = divmod(remainder, 60)
        logger.info('Done in {}d {}h {}m {:.3}s'
                    .format(int(d), int(h), int(m), s))
def set_config(filename=None, level=None, continuous=None):
    """
    Alters the way logging is handled.

    Parameters
    ----------
    filename : str
        name of file the log should be sent to
    level : int
        verbosity level; see `Python logging library
        <https://docs.python.org/3/library/logging.html>`_ for details
    continuous : bool
        if True the progress will be continuously updated
    """
    global continuous_logging

    # mirror the log to a file in addition to the stdout handler
    if filename is not None:
        logger.addHandler(logging.FileHandler(filename))

    # adjust the verbosity threshold of the hiphive root logger
    if level is not None:
        logger.setLevel(level)

    if continuous is not None:
        continuous_logging = continuous
| """ | ||
| This module provides functions for reading and writing data files | ||
| in phonopy and phono3py formats. | ||
| """ | ||
| import warnings | ||
| import os | ||
| import numpy as np | ||
| from itertools import product | ||
| import hiphive | ||
| from ..io.logging import logger | ||
| with warnings.catch_warnings(): | ||
| warnings.filterwarnings('ignore', category=FutureWarning) | ||
| import h5py | ||
| logger = logger.getChild('io_phonopy') | ||
def _filename_to_format(filename):
    """ Tries to guess the format from the filename.

    Returns 'text' for phonopy FORCE_CONSTANTS files and 'hdf5' for
    files with an hdf5 extension; raises ValueError otherwise.
    """
    basename = os.path.basename(filename)
    if basename == 'FORCE_CONSTANTS':
        return 'text'
    # sep is non-empty only if the basename actually contains a dot
    _, sep, extension = basename.rpartition('.')
    if sep and extension == 'hdf5':
        return 'hdf5'
    raise ValueError('Could not guess file format')
def read_phonopy_fc2(filename, format=None):
    """Parses a second-order force constant file in phonopy format.

    Parameters
    ----------
    filename : str
        input file name
    format : str
        specify the file-format, if None tries to guess the format from filename

    Returns
    -------
    numpy.ndarray
        second-order force constant matrix (`N,N,3,3` array)
        where `N` is the number of atoms
    """
    # dispatch table from format name to the matching reader
    readers = {'text': _read_phonopy_fc2_text,
               'hdf5': _read_phonopy_fc2_hdf5}
    if format is None:
        format = _filename_to_format(filename)
    if format not in readers:
        raise ValueError('Did not recognize format {}'.format(format))
    return readers[format](filename)
def write_phonopy_fc2(filename, fc2, format=None):
    """Writes second-order force constant matrix in phonopy format.

    Parameters
    ----------
    filename : str
        output file name
    fc2 : ForceConstants or numpy.ndarray
        second-order force constant matrix
    format : str
        specify the file-format, if None try to guess the format from filename
    """
    # dispatch table from format name to the matching writer
    writers = {'text': _write_phonopy_fc2_text,
               'hdf5': _write_phonopy_fc2_hdf5}

    # accept either a ForceConstants object or a plain numpy array
    if isinstance(fc2, hiphive.force_constants.ForceConstants):
        fc2_array = fc2.get_fc_array(order=2)
    elif isinstance(fc2, np.ndarray):
        fc2_array = fc2
    else:
        raise TypeError('fc2 should be ForceConstants or NumPy array')

    # sanity check: phonopy layout is (N, N, 3, 3)
    n_atoms = fc2_array.shape[0]
    if fc2_array.shape != (n_atoms, n_atoms, 3, 3):
        raise ValueError('fc2 has wrong shape')

    if format is None:
        format = _filename_to_format(filename)
    if format not in writers:
        raise ValueError('Did not recognize format {}'.format(format))
    writers[format](filename, fc2_array)
def read_phonopy_fc3(filename):
    """Parses a third order force constant file in phonopy hdf5 format.

    Parameters
    ----------
    filename : str
        input file name

    Returns
    -------
    numpy.ndarray
        third order force constant matrix
    """
    with h5py.File(filename, 'r') as hf:
        if 'fc3' not in hf.keys():
            raise IOError('Could not find fc3 in file {}'.format(filename))
        # [:] materializes the hdf5 dataset as a numpy array
        fc3 = hf['fc3'][:]
    return fc3
def write_phonopy_fc3(filename, fc3):
    """Writes third order force constant matrix in phonopy hdf5 format.

    Parameters
    ----------
    filename : str
        output file name
    fc3 : ForceConstants or numpy.ndarray
        third order force constant matrix
    """
    # accept either a ForceConstants object or a plain numpy array
    if isinstance(fc3, hiphive.force_constants.ForceConstants):
        fc3_array = fc3.get_fc_array(order=3)
    elif isinstance(fc3, np.ndarray):
        fc3_array = fc3
    else:
        raise TypeError('fc3 should be ForceConstants or NumPy array')

    # sanity check: phono3py layout is (N, N, N, 3, 3, 3)
    n_atoms = fc3_array.shape[0]
    if fc3_array.shape != (n_atoms, n_atoms, n_atoms, 3, 3, 3):
        raise ValueError('fc3 has wrong shape')

    with h5py.File(filename, 'w') as hf:
        hf.create_dataset('fc3', data=fc3_array, compression='gzip')
        hf.flush()
| def _read_phonopy_fc2_text(filename): | ||
| """ Reads phonopy-fc2 file in text format. """ | ||
| with open(filename, 'r') as f: | ||
| # read shape of force constants | ||
| line = f.readline() | ||
| line_ints = [int(x) for x in line.split()] | ||
| if len(line_ints) == 1: | ||
| n_atoms = line_ints[0] | ||
| elif len(line_ints) == 2: | ||
| assert line_ints[0] == line_ints[1] | ||
| n_atoms = line_ints[0] | ||
| else: | ||
| raise ValueError('Unsupported or incorrect phonopy format') | ||
| fc2 = np.full((n_atoms, n_atoms, 3, 3), np.nan) | ||
| # read force constants | ||
| lines = f.readlines() | ||
| for n, line in enumerate(lines): | ||
| flds = line.split() | ||
| if len(flds) == 2: | ||
| i = int(flds[0]) - 1 # phonopy index starts with 1 | ||
| j = int(flds[1]) - 1 | ||
| for x in range(3): | ||
| fc_row = lines[n + x + 1].split() | ||
| for y in range(3): | ||
| fc2[i][j][x][y] = float(fc_row[y]) | ||
| return fc2 | ||
def _read_phonopy_fc2_hdf5(filename):
    """ Reads phonopy-fc2 file in hdf5 format. """
    with h5py.File(filename, 'r') as hf:
        # phonopy has used both dataset names over time; try them in order
        for key in ('force_constants', 'fc2'):
            if key in hf.keys():
                return hf[key][:]
    raise IOError('Could not find fc2 in file {}'.format(filename))
| def _write_phonopy_fc2_text(filename, fc2): | ||
| """ Writes second-order force constants to file in text format. """ | ||
| n_atoms = fc2.shape[0] | ||
| with open(filename, 'w') as f: | ||
| f.write('{:5d} {:5d}\n'.format(n_atoms, n_atoms)) | ||
| for i, j in product(range(n_atoms), repeat=2): | ||
| fc2_ij = fc2[(i, j)] | ||
| if fc2_ij.shape != (3, 3): | ||
| raise ValueError('Invalid shape for fc2[({},{})]'.format(i, j)) | ||
| f.write('{:-5d}{:5d}\n'.format(i + 1, j + 1)) | ||
| for row in fc2_ij: | ||
| f.write((3*' {:22.15f}'+'\n').format(*tuple(row))) | ||
def _write_phonopy_fc2_hdf5(filename, fc2):
    """ Writes second-order force constants to file in hdf5 format. """
    hf = h5py.File(filename, 'w')
    try:
        # dataset name 'fc2' matches what the reader looks for
        hf.create_dataset('fc2', data=fc2, compression='gzip')
        hf.flush()
    finally:
        hf.close()
| from itertools import product | ||
| import numpy as np | ||
| def _obj2str(a, none_char='-'): | ||
| """ Casts object a to str. """ | ||
| if isinstance(a, float): | ||
| # if the float is 2.49999999 then round | ||
| if str(a)[::-1].find('.') > 5: | ||
| return '{:.5f}'.format(a) | ||
| elif a is None: | ||
| return none_char | ||
| return str(a) | ||
# vectorized variant of _obj2str: maps an object array elementwise to str
_array2str = np.vectorize(_obj2str)
def print_table(matrix, sum_=False):
    """ Prints matrix data in a nice table format.

    The matrix element matrix[i][j] should correspond to information about
    order j+2 and n-body i+1.

    Example
    --------
    >> matrix = numpy.array([[None, None], [4.0, 3.0]])
    >> print_table(matrix)
    body/order | 2 | 3
    ------------------------
    1 | - | -
    2 | 4.0 | 3.0

    Parameters
    ----------
    matrix : numpy.ndarray
        matrix to be printed
    sum_ : bool
        whether or not to print the sum along each row and column
    """
    print(table_array_to_string(matrix, sum_))
def table_array_to_string(matrix, sum_=False):
    """ Generate nice table string from a numpy array with floats/ints """
    # build the framed table, stringify every cell, then render it
    frame = _generate_table_array(matrix, sum_)
    return _generate_table_str(_array2str(frame))
def _generate_table_array(table_array, sum_=False):
    """ Generate table in numpy array format """
    n_rows, n_cols = table_array.shape

    # frame with header row/column (and optionally a sum row/column)
    A = _build_table_frame(order=n_cols + 1, nbody=n_rows, sum_=sum_)

    # copy the data into the frame; cells with nbody > order stay empty
    for nbody in range(1, n_rows + 1):
        for order in range(2, n_cols + 2):
            if nbody <= order:
                A[nbody, order - 1] = table_array[nbody - 1, order - 2]

    if sum_:
        # row and column sums over the filled (non-None) cells
        inner = A[1:-1, 1:-1]
        for i in range(1, n_rows + 1):
            A[i, -1] = sum(v for v in inner[i - 1] if v is not None)
        for j in range(1, inner.shape[1] + 1):
            A[-1, j] = sum(v for v in inner[:, j - 1] if v is not None)
        A[-1, -1] = ''
    return A
| def _generate_table_str(table_array): | ||
| """ Generate a string from a numpy array of strings """ | ||
| table_str = [] | ||
| n_rows, n_cols = table_array.shape | ||
| # find maximum widths for each column | ||
| widths = [] | ||
| for i in range(n_cols): | ||
| widths.append(max(len(val) for val in table_array[:, i])+2) | ||
| # formatting str for each row | ||
| row_format = '|'.join('{:^'+str(width)+'}' for width in widths) | ||
| # finalize | ||
| for i in range(n_rows): | ||
| if i == 1: | ||
| table_str.append('-' * (sum(widths)+n_cols-1)) | ||
| table_str.append(row_format.format(*table_array[i, :])) | ||
| table_str = '\n'.join(table_str) | ||
| return table_str | ||
| def _build_table_frame(order, nbody, sum_=False): | ||
| """ Builds/initializes table/array. """ | ||
| if sum_: | ||
| A = np.empty((nbody+2, order+1), dtype='object') | ||
| A[0, -1] = 'sum' | ||
| A[-1, 0] = 'sum' | ||
| else: | ||
| A = np.empty((nbody+1, order), dtype='object') | ||
| A[0][0] = 'body/order' | ||
| A[0, 1:order] = range(2, order+1) | ||
| A[1:nbody+1, 0] = range(1, nbody+1) | ||
| return A | ||
| def _print_table(table): | ||
| """ Prints 2D array with str elements. """ | ||
| n_rows, n_cols = table.shape | ||
| # find maximum widths for each column | ||
| widths = [] | ||
| for i in range(n_cols): | ||
| widths.append(max(len(val) for val in table[:, i])+2) | ||
| # formatting str for each row | ||
| row_format = '|'.join('{:^'+str(width)+'}' for width in widths) | ||
| for i in range(n_rows): | ||
| if i == 1: | ||
| print('-'*(sum(widths)+n_rows-1)) | ||
| print(row_format.format(*table[i, :])) | ||
# demo of the table-printing utilities in this module
if __name__ == "__main__":
    # input dummy cutoff table
    # insert row for nbody=1 (entries are None, i.e. no cutoff defined)
    cutoffs = np.array([[None, None, None, None, None],
                        [6.0, 6.0, 6.0, 3.7, 3.7],
                        [5.0, 5.0, 5.0, 3.0, 3.0],
                        [3.7, 3.7, 3.7, 0.0, 0.0]])
    # input dummy cluster count table
    cluster_counts = np.array([[1, 3, 5, 5, 2],
                               [12, 22, 39, 42, 58],
                               [19, 41, 123, 421, 912],
                               [42, 112, 410, 617, 3271]])
    # print the tables, the last one including row/column sums
    print_table(cutoffs)
    print('\n')
    print_table(cluster_counts, sum_=False)
    print('\n')
    print_table(cluster_counts, sum_=True)
| """ | ||
| Helper functions for reading and writing objects to tar files | ||
| """ | ||
| import pickle | ||
| import tempfile | ||
| import warnings | ||
| import ase.io as aseIO | ||
| with warnings.catch_warnings(): | ||
| warnings.filterwarnings('ignore', category=FutureWarning) | ||
| import h5py | ||
def add_ase_atoms_to_tarfile(tar_file, atoms, arcname, format='json'):
    """ Adds an ase.Atoms object to tar_file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive (write mode) to which the structure is added
    atoms : ase.Atoms
        structure to serialize
    arcname : str
        name of the member inside the archive
    format : str
        ase.io format used for serialization
    """
    # ase.io.write needs a file name, so go through a named temporary file;
    # the context manager closes/removes it (previously it was leaked and
    # only cleaned up by garbage collection)
    with tempfile.NamedTemporaryFile() as temp_file:
        aseIO.write(temp_file.name, atoms, format=format)
        temp_file.seek(0)
        tar_info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
        tar_file.addfile(tar_info, temp_file)
def read_ase_atoms_from_tarfile(tar_file, arcname, format='json'):
    """ Reads ase.Atoms from tar file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive (read mode)
    arcname : str
        name of the member inside the archive
    format : str
        ase.io format used for deserialization

    Returns
    -------
    ase.Atoms
        the structure stored under arcname
    """
    # extract into a named temporary file since ase.io reads from a file
    # name; the context manager removes the file afterwards (previously it
    # was leaked and only cleaned up by garbage collection)
    with tempfile.NamedTemporaryFile() as temp_file:
        temp_file.write(tar_file.extractfile(arcname).read())
        temp_file.seek(0)
        atoms = aseIO.read(temp_file.name, format=format)
    return atoms
def add_items_to_tarfile_hdf5(tar_file, items, arcname):
    """ Add items to one hdf5 file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive (write mode)
    items : dict
        mapping from dataset name to array-like data
    arcname : str
        name of the hdf5 member inside the archive
    """
    with tempfile.NamedTemporaryFile() as temp_file:
        # pass the mode explicitly: h5py.File without a mode is an error in
        # h5py >= 3 (the old implicit default 'a' was deprecated/removed)
        with h5py.File(temp_file.name, 'w') as hf:
            for key, value in items.items():
                hf.create_dataset(key, data=value, compression='gzip')
        temp_file.seek(0)
        tar_info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
        tar_file.addfile(tar_info, temp_file)
def add_items_to_tarfile_pickle(tar_file, items, arcname):
    """ Add items by pickling them """
    # dump to a temporary file so tarfile can stat it for the member header
    with tempfile.TemporaryFile() as temp_file:
        pickle.dump(items, temp_file)
        temp_file.seek(0)
        tar_info = tar_file.gettarinfo(arcname=arcname, fileobj=temp_file)
        tar_file.addfile(tar_info, temp_file)
def add_items_to_tarfile_custom(tar_file, items):
    """ Add items assuming they have a custom write function """
    # each dict key becomes the archive member name for its value
    for arcname, obj in items.items():
        with tempfile.TemporaryFile() as temp_file:
            obj.write(temp_file)
            temp_file.seek(0)
            tar_info = tar_file.gettarinfo(arcname=arcname,
                                           fileobj=temp_file)
            tar_file.addfile(tar_info, temp_file)
def add_list_to_tarfile_custom(tar_file, objects, arcname):
    """ Add list of objects assuming they have a custom write function """
    # members are named '<arcname>_0', '<arcname>_1', ... in list order
    for index, obj in enumerate(objects):
        member_name = '{}_{}'.format(arcname, index)
        with tempfile.TemporaryFile() as temp_file:
            obj.write(temp_file)
            temp_file.seek(0)
            tar_info = tar_file.gettarinfo(arcname=member_name,
                                           fileobj=temp_file)
            tar_file.addfile(tar_info, temp_file)
def read_items_hdf5(tar_file, arcname):
    """ Read items from hdf5 file inside tar_file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive (read mode)
    arcname : str
        name of the hdf5 member inside the archive

    Returns
    -------
    dict
        mapping from dataset name to the data read from the hdf5 file
    """
    # extract the member into a named temporary file so h5py can open it by
    # name; context managers close both handles (the temporary file was
    # previously leaked and only cleaned up by garbage collection)
    with tempfile.NamedTemporaryFile() as temp_file:
        temp_file.write(tar_file.extractfile(arcname).read())
        temp_file.seek(0)
        with h5py.File(temp_file.name, 'r') as hf:
            items = {key: value[:] for key, value in hf.items()}
    return items
def read_items_pickle(tar_file, arcname):
    """ Read pickled items from tar_file.

    Parameters
    ----------
    tar_file : tarfile.TarFile
        open tar archive (read mode)
    arcname : str
        name of the pickled member inside the archive

    Returns
    -------
    dict
        the unpickled items
    """
    # the previous `items = dict()` pre-assignment was dead code and has
    # been removed; pickle.load returns the stored object directly.
    # NOTE: unpickling is only safe for archives from trusted sources.
    return pickle.load(tar_file.extractfile(arcname))
def read_list_custom(tar_file, arcname, read_function, **kwargs):
    """ Read a list of objects written by add_list_to_tarfile_custom.

    Members named '<arcname>_0', '<arcname>_1', ... are read with
    read_function until no further member is found.
    """
    objects = []
    index = 0
    while True:
        try:
            fileobj = tar_file.extractfile('{}_{}'.format(arcname, index))
            objects.append(read_function(fileobj, **kwargs))
            fileobj.close()
        except KeyError:
            # no more members in the sequence
            break
        index += 1
    return objects
| """ | ||
| The ``io.shengBTE`` module provides functions for reading and writing data | ||
| files in shengBTE format. | ||
| """ | ||
| import numpy as np | ||
| from itertools import product | ||
| from collections import namedtuple | ||
| from ..core.structures import Supercell | ||
| import hiphive | ||
def read_shengBTE_fc3(filename, prim, supercell, symprec=1e-5):
    """Reads third-order force constants file in shengBTE format.

    Parameters
    ----------
    filename : str
        input file name
    prim : ase.Atoms
        primitive cell for the force constants
    supercell : ase.Atoms
        supercell onto which to map force constants
    symprec : float
        structural symmetry tolerance

    Returns
    -------
    fcs : ForceConstants
        third order force constants for the specified supercell
    """
    # parse file -> convert to _ShengEntry records -> map onto supercell
    raw = _read_raw_sheng(filename)
    entries = _raw_to_fancy(raw, prim.cell)
    return _sheng_to_fcs(entries, prim, supercell, symprec)
def write_shengBTE_fc3(filename, fcs, prim, symprec=1e-5, cutoff=np.inf,
                       fc_tol=1e-8):
    """Writes third-order force constants file in shengBTE format.

    Parameters
    -----------
    filename : str
        output file name
    fcs : ForceConstants
        force constants; the supercell associated with this object must be
        based on prim
    prim : ase.Atoms
        primitive configuration (must be equivalent to structure used in the
        shengBTE calculation)
    symprec : float
        structural symmetry tolerance
    cutoff : float
        all atoms in cluster must be within this cutoff
    fc_tol : float
        if the absolute value of the largest entry in a force constant is less
        than fc_tol it will not be written
    """
    # convert to _ShengEntry records, strip the offsets and write to file
    entries = _fcs_to_sheng(fcs, prim, symprec, cutoff, fc_tol)
    _write_raw_sheng(_fancy_to_raw(entries), filename)
| def _read_raw_sheng(filename): | ||
| """ Read shengBTE fc3 file. | ||
| Parameters | ||
| ---------- | ||
| filename : str | ||
| input file name | ||
| Returns | ||
| ------- | ||
| list | ||
| list with shengBTE block entries, where an entry consist of | ||
| [i, j, k, cell_pos2, cell_pos3, fc3] | ||
| """ | ||
| lines_per_fc_block = 32 | ||
| fc3_data_shengBTE = [] | ||
| with open(filename, 'r') as f: | ||
| lines = f.readlines() | ||
| num_fcs = int(lines[0]) | ||
| lind = 2 | ||
| for i in range(1, num_fcs+1): | ||
| # sanity check | ||
| assert int(lines[lind]) == i, (int(lines[lind]), i) | ||
| # read cell poses | ||
| cell_pos2 = np.array([float(fld) for fld in lines[lind+1].split()]) | ||
| cell_pos3 = np.array([float(fld) for fld in lines[lind+2].split()]) | ||
| # read basis indices | ||
| i, j, k = (int(fld) for fld in lines[lind+3].split()) | ||
| # read fc | ||
| fc3_ijk = np.zeros((3, 3, 3)) | ||
| for n in range(27): | ||
| x, y, z = [int(fld) - 1 for fld in lines[lind+4+n].split()[:3]] | ||
| fc3_ijk[x, y, z] = float(lines[lind+4+n].split()[-1]) | ||
| # append entry | ||
| entry = [i, j, k, cell_pos2, cell_pos3, fc3_ijk] | ||
| fc3_data_shengBTE.append(entry) | ||
| lind += lines_per_fc_block | ||
| return fc3_data_shengBTE | ||
| _ShengEntry = namedtuple('Entry', ['site_0', 'site_1', 'site_2', 'pos_1', | ||
| 'pos_2', 'fc', 'offset_1', 'offset_2']) | ||
| def _raw_to_fancy(raw_sheng, cell): | ||
| """ | ||
| Converts force constants as read from shengBTE file to the namedtuple | ||
| format defined above (_ShengEntry). | ||
| """ | ||
| sheng = [] | ||
| for raw_entry in raw_sheng: | ||
| p1, p2 = raw_entry[3:5] | ||
| offset_1 = np.linalg.solve(cell.T, p1).round(0).astype(int) | ||
| offset_2 = np.linalg.solve(cell.T, p2).round(0).astype(int) | ||
| entry = _ShengEntry(*(i - 1 for i in raw_entry[:3]), *raw_entry[3:], | ||
| offset_1, offset_2) | ||
| sheng.append(entry) | ||
| return sheng | ||
| def _fancy_to_raw(sheng): | ||
| """ | ||
| Converts force constants namedtuple format defined above (_ShengEntry) to | ||
| format used for writing shengBTE files. | ||
| """ | ||
| raw_sheng = [] | ||
| for entry in sheng: | ||
| raw_entry = list(entry[:6]) | ||
| raw_entry[0] += 1 | ||
| raw_entry[1] += 1 | ||
| raw_entry[2] += 1 | ||
| raw_sheng.append(raw_entry) | ||
| return raw_sheng | ||
| def _write_raw_sheng(raw_sheng, filename): | ||
| """ See corresponding read function. """ | ||
| with open(filename, 'w') as f: | ||
| f.write('{}\n\n'.format(len(raw_sheng))) | ||
| for index, fc3_row in enumerate(raw_sheng, start=1): | ||
| i, j, k, cell_pos2, cell_pos3, fc3_ijk = fc3_row | ||
| f.write('{:5d}\n'.format(index)) | ||
| f.write((3*'{:14.10f} '+'\n').format(*cell_pos2)) | ||
| f.write((3*'{:14.10f} '+'\n').format(*cell_pos3)) | ||
| f.write((3*'{:5d}'+'\n').format(i, j, k)) | ||
| for x, y, z in product(range(3), repeat=3): | ||
| f.write((3*' {:}').format(x+1, y+1, z+1)) | ||
| f.write(' {:14.10f}\n'.format(fc3_ijk[x, y, z])) | ||
| f.write('\n') | ||
def _fcs_to_sheng(fcs, prim, symprec, cutoff, fc_tol):
    """ Produces a list containing fcs in shengBTE format

    prim and fcs.supercell must be aligned.

    Parameters
    ----------
    fcs : ForceConstants
        third-order force constants defined on fcs.supercell
    prim : ase.Atoms
        primitive cell the force constants are expressed relative to
    symprec : float
        structural symmetry tolerance
    cutoff : float
        all atoms in a cluster must be within this cutoff
    fc_tol : float
        entries whose largest absolute value is below this are dropped

    Returns
    -------
    list of _ShengEntry
        one entry per distinct (sites, offsets) triplet
    """
    # NOTE(review): Supercell presumably maps each supercell atom to a
    # primitive site plus a cell offset — confirm against core.structures
    supercell = Supercell(fcs.supercell, prim, symprec)
    assert all(fcs.supercell.pbc) and all(prim.pbc)
    n_atoms = len(supercell)

    # M[i, j] is True when the direct i->j vector equals the minimum-image
    # vector (the pair is unambiguous in this supercell) and the pair
    # distance is below the cutoff
    D = fcs.supercell.get_all_distances(mic=False, vector=True)
    D_mic = fcs.supercell.get_all_distances(mic=True, vector=True)
    M = np.eye(n_atoms, dtype=bool)
    for i in range(n_atoms):
        for j in range(i + 1, n_atoms):
            M[i, j] = (np.allclose(D[i, j], D_mic[i, j], atol=symprec, rtol=0)
                       and np.linalg.norm(D[i, j]) < cutoff)
            M[j, i] = M[i, j]

    # collect one force constant per distinct (sites, offsets) key; if the
    # same key occurs for several supercell triplets the force constants
    # must agree to within fc_tol
    data = {}
    for a0 in supercell:
        for a1 in supercell:
            if not M[a0.index, a1.index]:
                continue
            for a2 in supercell:
                if not (M[a0.index, a2.index] and M[a1.index, a2.index]):
                    continue
                # offsets of atoms 1 and 2 relative to atom 0
                offset_1 = np.subtract(a1.offset, a0.offset)
                offset_2 = np.subtract(a2.offset, a0.offset)
                sites = (a0.site, a1.site, a2.site)
                key = sites + tuple(offset_1) + tuple(offset_2)
                ijk = (a0.index, a1.index, a2.index)
                fc = fcs[ijk]
                if key in data:
                    assert np.allclose(data[key], fc, atol=fc_tol)
                else:
                    data[key] = fc

    # build the entries, skipping negligible force constants
    sheng = []
    for k, fc in data.items():
        if np.max(np.abs(fc)) < fc_tol:
            continue
        offset_1 = k[3:6]
        pos_1 = np.dot(offset_1, prim.cell)
        offset_2 = k[6:9]
        pos_2 = np.dot(offset_2, prim.cell)
        entry = _ShengEntry(*k[:3], pos_1, pos_2, fc, offset_1, offset_2)
        sheng.append(entry)
    return sheng
def _sheng_to_fcs(sheng, prim, supercell, symprec):
    """ Maps shengBTE entries onto a supercell and builds ForceConstants.

    Parameters
    ----------
    sheng : list of _ShengEntry
        primitive-cell based force constant entries
    prim : ase.Atoms
        primitive cell for the force constants
    supercell : ase.Atoms
        supercell onto which to map the force constants
    symprec : float
        structural symmetry tolerance

    Returns
    -------
    ForceConstants
        third order force constants for the given supercell
    """
    supercell_map = Supercell(supercell, prim, symprec)
    # full (n, n, n, 3, 3, 3) array; mapped_clusters guards against the
    # same cluster being written twice (ambiguous mapping)
    fc_array = np.zeros((len(supercell),) * 3 + (3,) * 3)
    mapped_clusters = np.zeros((len(supercell),) * 3, dtype=bool)

    for atom in supercell_map:
        i = atom.index
        for entry in sheng:
            if atom.site != entry.site_0:
                continue
            # translate the entry's relative offsets to supercell indices
            j = supercell_map.index(entry.site_1, entry.offset_1 + atom.offset)
            k = supercell_map.index(entry.site_2, entry.offset_2 + atom.offset)
            ijk = (i, j, k)
            if mapped_clusters[ijk]:
                raise Exception('Ambiguous force constant.'
                                ' Supercell is too small')
            fc_array[ijk] = entry.fc
            mapped_clusters[ijk] = True

    fcs = hiphive.force_constants.ForceConstants.from_arrays(
        supercell, fc3_array=fc_array)
    return fcs
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
338180
7.14%59
3.51%7855
6.62%