xdat
Advanced tools
+1
-1
| Metadata-Version: 2.4 | ||
| Name: xdat | ||
| Version: 0.1.298 | ||
| Version: 0.1.299 | ||
| Summary: eXtended Data Analysis Toolkit | ||
@@ -5,0 +5,0 @@ Home-page: https://bitbucket.org/hermetric/xdat/ |
| Metadata-Version: 2.4 | ||
| Name: xdat | ||
| Version: 0.1.298 | ||
| Version: 0.1.299 | ||
| Summary: eXtended Data Analysis Toolkit | ||
@@ -5,0 +5,0 @@ Home-page: https://bitbucket.org/hermetric/xdat/ |
+1
-1
@@ -1,1 +0,1 @@ | ||
| 0.1.298 | ||
| 0.1.299 |
+74
-40
@@ -13,2 +13,4 @@ import hashlib | ||
| from concurrent.futures import ThreadPoolExecutor | ||
| from functools import partial | ||
| import pickle | ||
@@ -30,2 +32,3 @@ import portalocker | ||
| import imgaug.augmenters as iaa | ||
| from tqdm import tqdm | ||
@@ -253,2 +256,5 @@ | ||
| if mode == 'predict': | ||
| mode = 'test' | ||
| assert mode in ['fit', 'val', 'test', 'prod'], mode | ||
@@ -275,3 +281,3 @@ | ||
| self.mode = mode | ||
| self.n_jobs = n_jobs | ||
| self.n_jobs = self.n_jobs_orig = n_jobs | ||
| self.n_jobs_batch_size = n_jobs_batch_size | ||
@@ -419,22 +425,11 @@ self.dtype = dtype | ||
| # | ||
| # prepare generator | ||
| self.clear_cache('__calc_XYw_', warn_if_missing=False) | ||
| def iterate_samples(): | ||
| for idx, row in enumerate(iterrows(df)): | ||
| sample = self.cached('__calc_XYw_', self._get_sample_wrapper, row, _as_pickle=True) | ||
| if sample is not None: | ||
| yield idx, sample | ||
| # | ||
| # getting sample0 for shape information | ||
| sample0 = None | ||
| for _, sample0 in iterate_samples(): | ||
| break | ||
| for _, row in enumerate(iterrows(df)): | ||
| sample0 = self._get_sample_wrapper(row) | ||
| if sample0 is not None: | ||
| break | ||
| assert sample0 is not None, "At least one sample should be DataSample" | ||
| self.log('- calculating results size') | ||
| indices_df = [idx for idx, _ in iterate_samples()] | ||
| arr_size = len(indices_df) | ||
| # | ||
@@ -465,34 +460,73 @@ # initialize data format: | ||
| # | ||
| # build data structures | ||
| def stack_input_from_gen(gen, shape): | ||
| arr = np.empty((arr_size, *shape), dtype=self.dtype) | ||
| self.log('- preallocating arrays') | ||
| max_size = len(df) | ||
| for idx, a in enumerate(gen): | ||
| arr[idx] = a | ||
| return arr | ||
| final_inputs = [] | ||
| final_inputs = [np.empty((max_size, *shape), dtype=self.dtype) for shape in self._input_shapes] | ||
| final_outputs = None | ||
| final_weights = None | ||
| self.log(' + stacking inputs') | ||
| for idx, shape in enumerate(self._input_shapes): | ||
| arr = stack_input_from_gen((s.inputs[idx] for _, s in iterate_samples()), shape) | ||
| # arr = arr.reshape([arr_size]+shape) | ||
| final_inputs.append(arr) | ||
| self.log(' + stacking outputs') | ||
| if self._with_outputs: | ||
| final_outputs = [] | ||
| for idx, shape in enumerate(self._output_shapes): | ||
| arr = stack_input_from_gen((s.outputs[idx] for _, s in iterate_samples()), shape) | ||
| # arr = arr.reshape([arr_size]+shape) | ||
| final_outputs.append(arr) | ||
| final_outputs = [np.empty((max_size, *shape), dtype=self.dtype) for shape in self._output_shapes] | ||
| self.log(' + preparing weights') | ||
| if self._with_weight: | ||
| final_weights = np.array([s.weight for _, s in iterate_samples()]) | ||
| # keep same behavior as before (weights array dtype tied to self.dtype) | ||
| final_weights = np.empty((max_size,), dtype=self.dtype) | ||
| self.clear_cache('__calc_XYw_') | ||
| indices_df = [] | ||
| write_i = 0 | ||
| self.log(' + filling arrays (skipping None samples)') | ||
| def _calc_one(self, args): | ||
| idx, row = args | ||
| sample = self._get_sample_wrapper(row) | ||
| if sample is None: | ||
| return None | ||
| return idx, sample | ||
| # too many workers uses up A LOT of memory for some reason.. so we want between 1..4 workers | ||
| max_workers = max(1, min(4, self.n_jobs_orig if self.n_jobs_orig > 0 else os.cpu_count())) | ||
| with ThreadPoolExecutor(max_workers=max_workers) as ex: | ||
| # executor.map preserves input order and does NOT require materializing all rows | ||
| it = ex.map(partial(_calc_one, self), enumerate(iterrows(df))) | ||
| for out in tqdm(it, total=len(df), desc="preparing arrays"): | ||
| if out is None: | ||
| continue | ||
| idx, sample = out | ||
| indices_df.append(idx) | ||
| # inputs (copy into preallocated buffers) | ||
| for j in range(len(final_inputs)): | ||
| final_inputs[j][write_i] = sample.inputs[j] | ||
| # outputs | ||
| if self._with_outputs: | ||
| for j in range(len(final_outputs)): | ||
| final_outputs[j][write_i] = sample.outputs[j] | ||
| # weights | ||
| if self._with_weight: | ||
| final_weights[write_i] = sample.weight | ||
| write_i += 1 | ||
| kept = write_i | ||
| self.log(f' + shrinking arrays to kept={kept}') | ||
| # zero-copy views | ||
| final_inputs = [a[:kept] for a in final_inputs] | ||
| if self._with_outputs: | ||
| final_outputs = [a[:kept] for a in final_outputs] | ||
| else: | ||
| final_outputs = None | ||
| if self._with_weight: | ||
| final_weights = final_weights[:kept] | ||
| else: | ||
| final_weights = None | ||
| gc.collect() | ||
@@ -499,0 +533,0 @@ return final_inputs, final_outputs, final_weights, indices_df |
@@ -83,3 +83,3 @@ import pandas as pd | ||
| def x_on_iter_as_gen(llist, calc_func, total=None, backend=None, n_jobs=-1): | ||
| def x_on_iter_as_gen(llist, calc_func, total=None, backend=None, batch_size='auto', n_jobs=-1): | ||
| """ | ||
@@ -89,4 +89,2 @@ Special case, instead of returning results, it yields results | ||
| assert total is not None | ||
| if n_jobs == 1: | ||
@@ -99,3 +97,3 @@ for i in tqdm(llist, total=total): | ||
| try: | ||
| for v in Parallel(backend=backend, n_jobs=n_jobs)(delayed(calc_func)(i) for i in xtqdm): | ||
| for v in Parallel(backend=backend, batch_size=batch_size, n_jobs=n_jobs)(delayed(calc_func)(i) for i in xtqdm): | ||
| yield v | ||
@@ -102,0 +100,0 @@ |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
933226
0.12%9068
0.23%