You're Invited: Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26. RSVP
Socket
Book a Demo | Sign in
Socket

xdat

Package Overview
Dependencies
Maintainers
1
Versions
285
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

xdat - pypi Package Compare versions

Comparing version
0.1.298
to
0.1.299
+1
-1
PKG-INFO
Metadata-Version: 2.4
Name: xdat
Version: 0.1.298
Version: 0.1.299
Summary: eXtended Data Analysis Toolkit

@@ -5,0 +5,0 @@ Home-page: https://bitbucket.org/hermetric/xdat/

Metadata-Version: 2.4
Name: xdat
Version: 0.1.298
Version: 0.1.299
Summary: eXtended Data Analysis Toolkit

@@ -5,0 +5,0 @@ Home-page: https://bitbucket.org/hermetric/xdat/

@@ -1,1 +0,1 @@

0.1.298
0.1.299

@@ -13,2 +13,4 @@ import hashlib

from concurrent.futures import ThreadPoolExecutor
from functools import partial
import pickle

@@ -30,2 +32,3 @@ import portalocker

import imgaug.augmenters as iaa
from tqdm import tqdm

@@ -253,2 +256,5 @@

if mode == 'predict':
mode = 'test'
assert mode in ['fit', 'val', 'test', 'prod'], mode

@@ -275,3 +281,3 @@

self.mode = mode
self.n_jobs = n_jobs
self.n_jobs = self.n_jobs_orig = n_jobs
self.n_jobs_batch_size = n_jobs_batch_size

@@ -419,22 +425,11 @@ self.dtype = dtype

#
# prepare generator
self.clear_cache('__calc_XYw_', warn_if_missing=False)
def iterate_samples():
for idx, row in enumerate(iterrows(df)):
sample = self.cached('__calc_XYw_', self._get_sample_wrapper, row, _as_pickle=True)
if sample is not None:
yield idx, sample
#
# getting sample0 for shape information
sample0 = None
for _, sample0 in iterate_samples():
break
for _, row in enumerate(iterrows(df)):
sample0 = self._get_sample_wrapper(row)
if sample0 is not None:
break
assert sample0 is not None, "At least one sample should be DataSample"
self.log('- calculating results size')
indices_df = [idx for idx, _ in iterate_samples()]
arr_size = len(indices_df)
#

@@ -465,34 +460,73 @@ # initialize data format:

#
# build data structures
def stack_input_from_gen(gen, shape):
arr = np.empty((arr_size, *shape), dtype=self.dtype)
self.log('- preallocating arrays')
max_size = len(df)
for idx, a in enumerate(gen):
arr[idx] = a
return arr
final_inputs = []
final_inputs = [np.empty((max_size, *shape), dtype=self.dtype) for shape in self._input_shapes]
final_outputs = None
final_weights = None
self.log(' + stacking inputs')
for idx, shape in enumerate(self._input_shapes):
arr = stack_input_from_gen((s.inputs[idx] for _, s in iterate_samples()), shape)
# arr = arr.reshape([arr_size]+shape)
final_inputs.append(arr)
self.log(' + stacking outputs')
if self._with_outputs:
final_outputs = []
for idx, shape in enumerate(self._output_shapes):
arr = stack_input_from_gen((s.outputs[idx] for _, s in iterate_samples()), shape)
# arr = arr.reshape([arr_size]+shape)
final_outputs.append(arr)
final_outputs = [np.empty((max_size, *shape), dtype=self.dtype) for shape in self._output_shapes]
self.log(' + preparing weights')
if self._with_weight:
final_weights = np.array([s.weight for _, s in iterate_samples()])
# keep same behavior as before (weights array dtype tied to self.dtype)
final_weights = np.empty((max_size,), dtype=self.dtype)
self.clear_cache('__calc_XYw_')
indices_df = []
write_i = 0
self.log(' + filling arrays (skipping None samples)')
def _calc_one(self, args):
idx, row = args
sample = self._get_sample_wrapper(row)
if sample is None:
return None
return idx, sample
# too many workers uses up A LOT of memory for some reason.. so we want between 1..4 workers
max_workers = max(1, min(4, self.n_jobs_orig if self.n_jobs_orig > 0 else os.cpu_count()))
with ThreadPoolExecutor(max_workers=max_workers) as ex:
# executor.map preserves input order and does NOT require materializing all rows
it = ex.map(partial(_calc_one, self), enumerate(iterrows(df)))
for out in tqdm(it, total=len(df), desc="preparing arrays"):
if out is None:
continue
idx, sample = out
indices_df.append(idx)
# inputs (copy into preallocated buffers)
for j in range(len(final_inputs)):
final_inputs[j][write_i] = sample.inputs[j]
# outputs
if self._with_outputs:
for j in range(len(final_outputs)):
final_outputs[j][write_i] = sample.outputs[j]
# weights
if self._with_weight:
final_weights[write_i] = sample.weight
write_i += 1
kept = write_i
self.log(f' + shrinking arrays to kept={kept}')
# zero-copy views
final_inputs = [a[:kept] for a in final_inputs]
if self._with_outputs:
final_outputs = [a[:kept] for a in final_outputs]
else:
final_outputs = None
if self._with_weight:
final_weights = final_weights[:kept]
else:
final_weights = None
gc.collect()

@@ -499,0 +533,0 @@ return final_inputs, final_outputs, final_weights, indices_df

@@ -83,3 +83,3 @@ import pandas as pd

def x_on_iter_as_gen(llist, calc_func, total=None, backend=None, n_jobs=-1):
def x_on_iter_as_gen(llist, calc_func, total=None, backend=None, batch_size='auto', n_jobs=-1):
"""

@@ -89,4 +89,2 @@ Special case, instead of returning results, it yields results

assert total is not None
if n_jobs == 1:

@@ -99,3 +97,3 @@ for i in tqdm(llist, total=total):

try:
for v in Parallel(backend=backend, n_jobs=n_jobs)(delayed(calc_func)(i) for i in xtqdm):
for v in Parallel(backend=backend, batch_size=batch_size, n_jobs=n_jobs)(delayed(calc_func)(i) for i in xtqdm):
yield v

@@ -102,0 +100,0 @@