fused
Advanced tools
+70
-19
@@ -302,7 +302,9 @@ import datetime | ||
| submit_kwargs["instance_type"] = "large" | ||
| if submit_kwargs.get("n_processes_per_worker", None) is None: | ||
| if "n_processes_per_worker" not in submit_kwargs: | ||
| print( | ||
| "-- Starting job with n_processes_per_worker=32 by default. You can optimise this for better resource utilization." | ||
| "-- Starting a job with worker_concurrency=4 by default. You can " | ||
| "optimise this for better resource utilization (increase it if the " | ||
| "instance has enough memory or decrease it when running out of memory)" | ||
| ) | ||
| submit_kwargs["n_processes_per_worker"] = 32 | ||
| submit_kwargs["n_processes_per_worker"] = 4 | ||
| result = fused.submit( | ||
@@ -313,2 +315,12 @@ udf, arg_list, **run_kwargs, collect=False, **submit_kwargs | ||
| # otherwise run once with user provided configuration | ||
| if ( | ||
| submit_kwargs.get("instance_type", "realtime") != "realtime" | ||
| and "n_processes_per_worker" not in submit_kwargs | ||
| ): | ||
| print( | ||
| "-- Starting a job with worker_concurrency=4 by default. You can " | ||
| "optimise this for better resource utilization (increase it if the " | ||
| "instance has enough memory or decrease it when running out of memory)" | ||
| ) | ||
| submit_kwargs["n_processes_per_worker"] = 4 | ||
| result = result = fused.submit( | ||
@@ -510,7 +522,7 @@ udf, arg_list, **run_kwargs, collect=False, **submit_kwargs | ||
| steps: list[str] | None = None, | ||
| extract_instance_type: str | None = None, | ||
| extract_engine: str | None = None, | ||
| extract_max_workers: int = 256, | ||
| partition_instance_type: str | None = None, | ||
| partition_engine: str | None = None, | ||
| partition_max_workers: int = 256, | ||
| overview_instance_type: str | None = None, | ||
| overview_engine: str | None = None, | ||
| overview_max_workers: int = 256, | ||
@@ -598,11 +610,11 @@ **kwargs, | ||
| steps are run. | ||
| extract_instance_type (str): The instance type to use for the extract step. | ||
| extract_engine (str): The engine to use for the extract step. | ||
| By default, tries first with "realtime" and then falls back to "large". | ||
| extract_max_workers (int): The maximum number of workers to use for the extract step. | ||
| Defaults to 256. | ||
| partition_instance_type (str): The instance type to use for the partition step. | ||
| partition_engine (str): The engine to use for the partition step. | ||
| By default, tries first with "realtime" and then falls back to "large". | ||
| partition_max_workers (int): The maximum number of workers to use for the partition step. | ||
| Defaults to 256. | ||
| overview_instance_type (str): The instance type to use for the overview step. | ||
| overview_engine (str): The engine to use for the overview step. | ||
| By default, tries first with "realtime" and then falls back to "large". | ||
@@ -615,3 +627,5 @@ overview_max_workers (int): The maximum number of workers to use for the overview step. | ||
| Typical keywords include `engine`, `instance_type`, `max_workers`, | ||
| `n_processes_per_worker` and `max_retry`. | ||
| `worker_concurrency` and `max_retry`. | ||
| For `worker_concurrency`, if not specified, a default of 4 is | ||
| used when running on batch instances. | ||
@@ -622,6 +636,6 @@ The extract, partition and overview steps are run in parallel using | ||
| You can override this behavior by specifying the `engine`, `instance_type`, | ||
| `max_workers`, `n_processes_per_worker`, etc parameters as additional | ||
| You can override this behavior by specifying the `engine`, `max_workers`, | ||
| `worker_concurrency`, etc parameters as additional | ||
| keyword arguments to this function (`**kwargs`). If you want to specify | ||
| those per step, use `extract_instance_type`, `partition_instance_type`, etc. | ||
| those per step, use `extract_engine`, `partition_engine`, etc. | ||
| For example, to run everything locally on the same machine where this | ||
@@ -734,3 +748,3 @@ function runs, use: | ||
| warnings.warn( | ||
| "`extract_kwargs` is deprecated, use `extract_instance_type` and `extract_max_workers` instead.", | ||
| "`extract_kwargs` is deprecated, use `extract_engine` and `extract_max_workers` instead.", | ||
| FusedDeprecationWarning, | ||
@@ -742,3 +756,3 @@ stacklevel=2, | ||
| warnings.warn( | ||
| "`partition_kwargs` is deprecated, use `partition_instance_type`, `partition_max_workers` instead.", | ||
| "`partition_kwargs` is deprecated, use `partition_engine`, `partition_max_workers` instead.", | ||
| FusedDeprecationWarning, | ||
@@ -750,3 +764,3 @@ stacklevel=2, | ||
| warnings.warn( | ||
| "`overview_kwargs` is deprecated, use `overview_instance_type` and `overview_max_workers` instead.", | ||
| "`overview_kwargs` is deprecated, use `overview_engine` and `overview_max_workers` instead.", | ||
| FusedDeprecationWarning, | ||
@@ -756,12 +770,36 @@ stacklevel=2, | ||
| if "extract_instance_type" in kwargs: | ||
| warnings.warn( | ||
| "`extract_instance_type` is deprecated, use `extract_engine` instead.", | ||
| FusedDeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| extract_engine = kwargs.pop("extract_instance_type") | ||
| if "partition_instance_type" in kwargs: | ||
| warnings.warn( | ||
| "`partition_instance_type` is deprecated, use `partition_engine` instead.", | ||
| FusedDeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| partition_engine = kwargs.pop("partition_instance_type") | ||
| if "overview_instance_type" in kwargs: | ||
| warnings.warn( | ||
| "`overview_instance_type` is deprecated, use `overview_engine` instead.", | ||
| FusedDeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| overview_engine = kwargs.pop("overview_instance_type") | ||
| extract_kwargs: dict[str, Any] = { | ||
| "instance_type": extract_instance_type, | ||
| "instance_type": extract_engine, | ||
| "max_workers": extract_max_workers, | ||
| } | kwargs.pop("extract_kwargs", {}) | ||
| partition_kwargs: dict[str, Any] = { | ||
| "instance_type": partition_instance_type, | ||
| "instance_type": partition_engine, | ||
| "max_workers": partition_max_workers, | ||
| } | kwargs.pop("partition_kwargs", {}) | ||
| overview_kwargs: dict[str, Any] = { | ||
| "instance_type": overview_instance_type, | ||
| "instance_type": overview_engine, | ||
| "max_workers": overview_max_workers, | ||
@@ -774,4 +812,17 @@ } | kwargs.pop("overview_kwargs", {}) | ||
| # by retrying | ||
| # - n_processes_per_worker=4: when running on a (typically large) batch | ||
| # instance, the default to use as many processes as cores will almost | ||
| # always result in out-of-memory issues -> use a conservative default of 4 | ||
| # instead. User can still increase this if they know the instance has enough | ||
| # memory | ||
| # (n_processes_per_worker is set later per step inside _submit_with_fallback) | ||
| kwargs = {"max_retry": 0} | kwargs | ||
| # TMP map new keywords to old style keywords for run/submit | ||
| if "engine" in kwargs and kwargs.get("engine") not in ("local", "remote"): | ||
| kwargs["instance_type"] = kwargs.pop("engine") | ||
| if "worker_concurrency" in kwargs: | ||
| kwargs["n_processes_per_worker"] = kwargs.pop("worker_concurrency") | ||
| ########################################################################### | ||
@@ -778,0 +829,0 @@ # Step one: extracting pixel values and converting to hex divided in chunks |
@@ -258,3 +258,3 @@ import os | ||
| verbose_udf_runs: StrictBool = True | ||
| verbose_udf_runs: StrictBool = False | ||
| """Whether to print logs from UDF runs by default""" | ||
@@ -261,0 +261,0 @@ |
@@ -1,1 +0,1 @@ | ||
| __version__ = "2.0.4" | ||
| __version__ = "2.1.0" |
@@ -268,6 +268,6 @@ """Thread-safe utilities for concurrent execution with isolated logging.""" | ||
| stdout_file = stack.enter_context( | ||
| SyncingFileWrapper(open(log_dir / "stdout", "w", buffering=1)) | ||
| SyncingFileWrapper(open(log_dir / "stdout.log", "w", buffering=1)) | ||
| ) | ||
| stderr_file = stack.enter_context( | ||
| SyncingFileWrapper(open(log_dir / "stderr", "w", buffering=1)) | ||
| SyncingFileWrapper(open(log_dir / "stderr.log", "w", buffering=1)) | ||
| ) | ||
@@ -274,0 +274,0 @@ |
@@ -365,2 +365,3 @@ from __future__ import annotations | ||
| cache: Set to False as a shortcut for `cache_max_age='0s'` to disable caching. (Default True) | ||
| verbose: Set to True to print stdout and stderr from the UDF execution. | ||
| **kwargs: Keyword arguments to pass to the UDF. | ||
@@ -445,6 +446,6 @@ | ||
| Note that retries will only be attempted if the object is waited on, | ||
| e.g. with `pool.wait()`, `pool.tail()`, or `pool.collect()`. | ||
| e.g. with `pool.wait()`, `pool.tail()`, or `pool.df()`. | ||
| Returns: | ||
| A JobPool object. Call `.collect()` to get the results. | ||
| A JobPool object. Call `.df()` to get the results. | ||
@@ -457,3 +458,3 @@ Example: | ||
| >>> pool = my_udf.map([1, 2, 3]) | ||
| >>> results = pool.collect() | ||
| >>> results = pool.df() | ||
| >>> print(results) | ||
@@ -513,3 +514,3 @@ [2, 3, 4] | ||
| Note that retries will only be attempted if the object is waited on, | ||
| e.g. with `pool.wait()`, `pool.tail()`, or `pool.collect()`. | ||
| e.g. with `pool.wait()`, `pool.tail()`, or `pool.df()`. | ||
@@ -519,3 +520,3 @@ Note worker_concurrency is not supported for async map. | ||
| Returns: | ||
| An AsyncJobPool object. Call `.collect()` to get the results. | ||
| An AsyncJobPool object. Call `.df()` to get the results. | ||
@@ -528,3 +529,3 @@ Example: | ||
| >>> pool = my_udf.map_async([1, 2, 3]) | ||
| >>> results = pool.collect() | ||
| >>> results = pool.df() | ||
| >>> print(results) | ||
@@ -531,0 +532,0 @@ [2, 3, 4] |
+1
-1
| Metadata-Version: 2.4 | ||
| Name: fused | ||
| Version: 2.0.4 | ||
| Version: 2.1.0 | ||
| Project-URL: Homepage, https://www.fused.io | ||
@@ -5,0 +5,0 @@ Project-URL: Documentation, https://docs.fused.io |
Sorry, the diff of this file is too big to display
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
937451
0.28%22434
0.25%