fused 2.0.1 → 2.0.3
@@ -208,3 +208,2 @@ from __future__ import annotations
     <ul style="margin-top: 0; margin-bottom: 0px;">
-    <li>{copyable_text(udf._generate_code()[0], show_text=False)} Copy code</li>
     {saved_udf_toolkit_str}
+21 -14
@@ -268,8 +268,12 @@ import datetime
         submit_kwargs.get("engine", None) == "local"
-        or submit_kwargs.get("instance_type", "realtime") == "realtime"
+        or submit_kwargs.get("instance_type", None) is not None
     ):
-        submit_kwargs["instance_type"] = "realtime"
+        # default logic: try realtime and fallback to large instance
+        submit_kwargs_realtime = submit_kwargs.copy()
+        submit_kwargs_realtime["instance_type"] = "realtime"
+        # for realtime attempt, always with one run per instance (i.e. never use
+        # ChunkedJobPool), otherwise the current fallback logic does not work correctly
+        submit_kwargs_realtime["n_processes_per_worker"] = 1
         result = fused.submit(
-            udf, arg_list, **run_kwargs, collect=False, **submit_kwargs
+            udf, arg_list, **run_kwargs, collect=False, **submit_kwargs_realtime
         )
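The added lines implement the first half of a try-realtime-then-fall-back-to-large strategy: the realtime attempt is pinned to one run per instance so that failed runs can later be identified and resubmitted individually. A minimal sketch of the overall pattern, assuming a hypothetical failed_args helper for collecting the arguments of failed runs (the actual fallback code lives outside this hunk and may differ):

    import fused

    def submit_with_fallback(udf, arg_list, **submit_kwargs):
        # Attempt 1: realtime instances, one run per instance (no ChunkedJobPool).
        rt_kwargs = submit_kwargs.copy()
        rt_kwargs["instance_type"] = "realtime"
        rt_kwargs["n_processes_per_worker"] = 1
        pool = fused.submit(udf, arg_list, collect=False, **rt_kwargs)
        pool.wait()
        # Attempt 2: resubmit only the failed runs on a "large" batch instance,
        # as the updated docstrings below describe.
        retry_args = failed_args(pool, arg_list)  # hypothetical helper
        if retry_args:
            large_kwargs = {**submit_kwargs, "instance_type": "large"}
            fused.submit(udf, retry_args, collect=False, **large_kwargs).wait()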
@@ -505,7 +509,7 @@ result.wait()
     steps: list[str] | None = None,
-    extract_instance_type: str = "realtime",
+    extract_instance_type: str | None = None,
     extract_max_workers: int = 256,
-    partition_instance_type: str = "realtime",
+    partition_instance_type: str | None = None,
     partition_max_workers: int = 256,
-    overview_instance_type: str = "realtime",
+    overview_instance_type: str | None = None,
     overview_max_workers: int = 256,
@@ -594,11 +598,11 @@ **kwargs,
         extract_instance_type (str): The instance type to use for the extract step.
-            Defaults to "realtime".
+            By default, tries first with "realtime" and then falls back to "large".
         extract_max_workers (int): The maximum number of workers to use for the extract step.
             Defaults to 256.
         partition_instance_type (str): The instance type to use for the partition step.
-            Defaults to "realtime".
+            By default, tries first with "realtime" and then falls back to "large".
         partition_max_workers (int): The maximum number of workers to use for the partition step.
             Defaults to 256.
         overview_instance_type (str): The instance type to use for the overview step.
-            Defaults to "realtime".
+            By default, tries first with "realtime" and then falls back to "large".
         overview_max_workers (int): The maximum number of workers to use for the overview step.
@@ -616,9 +620,11 @@ Defaults to 256.
     You can override this behavior by specifying the `engine`, `instance_type`,
     `max_workers`, `n_processes_per_worker`, etc parameters as additional
     keyword arguments to this function (`**kwargs`). If you want to specify
     those per step, use `extract_instance_type`, `partition_instance_type`, etc.
     For example, to run everything locally on the same machine where this
     function runs, use:
         run_ingest_raster_to_h3(..., engine="local")
+    In contrast to `fused.submit` itself, the ingestion sets `n_processes_per_worker=1`
+    by default to avoid out-of-memory issues on batch instances. You can
+    increase this if you know the instance has enough memory to process multiple
+    chunks in parallel.
     """
@@ -761,2 +767,3 @@ try: | ||
| # of time outs or out-of-memory errors, which are unlikely to be resolved | ||
| # by retrying | ||
| kwargs = {"max_retry": 0} | kwargs | ||
+9 -0
@@ -23,2 +23,3 @@ import asyncio
 from fused._options import options as OPTIONS
+from fused.core._udf_ops import get_collection_name_with_fallback
@@ -135,5 +136,9 @@ from . import load_async
     elif isinstance(udf, UdfJobStepConfig):
+        udf.udf.collection_name = get_collection_name_with_fallback(
+            udf.udf.collection_name
+        )
         return ResolvedLocalJobStepUdf(job_step=udf, udf=udf.udf)
     elif isinstance(udf, Udf):
+        udf.collection_name = get_collection_name_with_fallback(udf.collection_name)
         job_step = UdfJobStepConfig(udf=udf, name=udf.name)
@@ -181,5 +186,9 @@ return ResolvedLocalJobStepUdf(job_step=job_step, udf=udf)
     elif isinstance(udf, UdfJobStepConfig):
+        udf.udf.collection_name = get_collection_name_with_fallback(
+            udf.udf.collection_name
+        )
         return ResolvedLocalJobStepUdf(job_step=udf, udf=udf.udf)
     elif isinstance(udf, Udf):
+        udf.collection_name = get_collection_name_with_fallback(udf.collection_name)
         job_step = UdfJobStepConfig(udf=udf, name=udf.name)
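Both resolution branches now normalize collection_name through the same helper before the job step is built. The helper itself is not part of this diff; the sketch below is a purely hypothetical reading of its contract, and default_udf_collection is an invented attribute used only for illustration (OPTIONS itself is imported in the changed module, as the hunk above shows):

    def get_collection_name_with_fallback(name):
        # Keep an explicitly set collection name as-is; otherwise fall back
        # to a configured default (hypothetical option attribute).
        if name is not None:
            return name
        return OPTIONS.default_udf_collection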
@@ -1,1 +1,1 @@
-__version__ = "2.0.1"
+__version__ = "2.0.3"
@@ -12,5 +12,3 @@ import json
-from .udf import structure_params
 class CustomJobConfig(FusedBaseModel):
@@ -95,38 +93,12 @@ name: Optional[str] = None
     """Structure text for README.md."""
-    job_names = [f"job_{step.udf.name}" for step in job.steps]
-    jobs = []
-    for step in job.steps:
-        jobs.append(
-            f"job_{step.udf.name} = {step.udf.name}({structure_params(step._generate_job_params())})"
-        )
-    str_udf_imports = "\n".join(
-        [f"from udf_{step.udf.name} import {step.udf.name}" for step in job.steps]
-    )
-    str_run_local = "job.run_local(file_id=0, chunk_id=0)"
-    str_run_remote = "job.run_remote(output_table='output_table_name')"
-    str_job = "\n".join(jobs)
-    str_multijob = f"job = fused.experimental.job([{', '.join(job_names)}])"
-    src = f"""
-# Fused Multi-Step Job
-
-## Get started
-
-```python
-# Import UDFs
-{str_udf_imports}
-
-# Instantiate individual jobs
-{str_job}
-
-# Instantiate multi-step job
-{str_multijob}
-
-# Run locally
-{str_run_local}
-
-# Run remotely
-{str_run_remote}
-```
+    from fused.models.udf.base_udf import METADATA_FUSED_DESCRIPTION
+
+    job_names = ", ".join([f"job_{step.udf.name}" for step in job.steps])
+    src = f"""# {job_names}
 """
+    for step in job.steps:
+        if step.udf.metadata:
+            src += f"""{step.udf.metadata.get(METADATA_FUSED_DESCRIPTION, "")}\n\n"""
     return src
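Reading only the plus side, the rewritten generate_readme reduces to roughly the following (blank lines, the exact closing of the f-string, and the return annotation are guesses): the README is now just a heading with the comma-joined job names, followed by each UDF's stored description.

    def generate_readme(job) -> str:
        """Structure text for README.md."""
        from fused.models.udf.base_udf import METADATA_FUSED_DESCRIPTION

        job_names = ", ".join([f"job_{step.udf.name}" for step in job.steps])
        src = f"""# {job_names}
    """
        for step in job.steps:
            if step.udf.metadata:
                src += f"""{step.udf.metadata.get(METADATA_FUSED_DESCRIPTION, "")}\n\n"""
        return src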
+0 -107
@@ -37,6 +37,4 @@ from __future__ import annotations
     create_directory_and_zip,
-    extract_parameters,
     generate_meta_json,
     generate_readme,
-    stringify_named_params,
     structure_params,
@@ -225,65 +223,2 @@ )
-    def _generate_job_params(self) -> list[str]:
-        # Derive parameters
-        positional_parameters, named_parameters = extract_parameters(self.udf.code)
-        # Replace default code params with params passed to job
-        named_parameters.update(self.udf.parameters)
-        # Remove positional parameters that already have values in the job
-        positional_parameters = [
-            positional_parameter
-            for positional_parameter in positional_parameters
-            if positional_parameter not in named_parameters.keys()
-        ]
-        if self.type == "udf":
-            if self.input:
-                positional_parameters[0] = f"arg_list={repr(self.input)}"
-        else:
-            warnings.warn(
-                FusedTypeWarning(f'Rendering of type "{self.type}" may be incomplete.')
-            )
-        # After replacing the inputs, look for any parameters with no values
-        # that match the reserved parameter names. These should not be included
-        # in instantiating the UDF because they are provided by the system.
-        positional_parameters = [
-            param
-            for param in positional_parameters
-            if param not in SYSTEM_PARAMETER_NAMES
-        ]
-        _params_fn_instance = positional_parameters + stringify_named_params(
-            named_parameters
-        )
-        output_config = []
-        if hasattr(self, "output"):
-            output_str = repr(self.output)
-            if output_str is not None:
-                output_config.append(f"output_table={output_str}")
-        return _params_fn_instance + output_config
-
-    def _generate_code(self, headerfile=False):
-        # String: Job instantiation
-        str_job_inst = f"job = {self.udf.entrypoint}._to_job_step({structure_params(self._generate_job_params())})"
-        # String: Job execution
-        str_job_exec = "job._run_local()"
-        # String: UDF
-        str_udf, header_cells = self.udf._generate_code(
-            include_imports=False, headerfile=headerfile
-        )
-        # Structure cell
-        src = f"""
-{STR_IMPORTS}
-{str_udf}
-{str_job_inst}
-{str_job_exec}
-"""
-        return src, header_cells
-
-    def render(self, headerfile=False):
-        _render(self, headerfile=headerfile)
-
     def export(
@@ -999,5 +934,2 @@ self,
-    def render(self, headerfile=False):
-        _render(self, headerfile=headerfile)
-
     def get_sample(self):
@@ -1031,3 +963,2 @@ raise NotImplementedError(
         "meta.json": generate_meta_json(job),
-        # "multijob.py": obj._generate_code(headerfile=True),
         "README.md": generate_readme(job),
@@ -1057,40 +988,2 @@ }
-def _render(job, headerfile=False):
-    from IPython import get_ipython
-    from IPython.core.inputsplitter import IPythonInputSplitter
-
-    def create_new_cell(contents):
-        """Similar to ipython.set_next_input but allows multiple cells to be created at once."""
-        from IPython.core.getipython import get_ipython
-
-        shell = get_ipython()
-        payload = dict(
-            source="set_next_input",
-            text=contents,
-            replace=False,
-        )
-        shell.payload_manager.write_payload(payload, single=False)
-
-    # Get the current IPython instance.
-    ipython = get_ipython()
-    if ipython is None:
-        raise RuntimeError("This function can only be used in a Jupyter Notebook.")
-    # Create an instance of IPythonInputSplitter.
-    splitter = IPythonInputSplitter()
-    # Generate code string and split into lines.
-    src_udf, header_cells = job._generate_code(headerfile=headerfile)
-    lines = src_udf.strip().split("\n")
-    # Set the content of the subsequent cell with.
-    create_new_cell(splitter.transform_cell("\n".join(lines)))
-    # Headers
-    if headerfile:
-        for header in header_cells:
-            create_new_cell(header)
-
 class DatasetInputBase(BaseModel):
 from __future__ import annotations
-import ast
 import difflib
@@ -14,3 +13,2 @@ import json
 from pathlib import PurePath
-from textwrap import dedent, indent
 from typing import (
@@ -48,8 +46,2 @@ TYPE_CHECKING,
-from .._codegen import (
-    extract_parameters,
-    stringify_headers,
-    stringify_named_params,
-    structure_params,
-)
 from .._inplace import _maybe_inplace
@@ -770,105 +762,2 @@
-    def _generate_code(
-        self, include_imports=True, headerfile=False
-    ) -> tuple[str, Sequence[str]]:
-        def _extract_fn(src: str) -> ast.FunctionDef | ast.AsyncFunctionDef:
-            # TODO: handle Header objects in header param
-            parsed_ast = ast.parse(src)
-            # Find all function definitions in the AST
-            function_defs = [
-                node
-                for node in ast.walk(parsed_ast)
-                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
-            ]
-            # first function (assume it's the target function)
-            return function_defs[0]
-
-        def _extract_fn_body(
-            function_def: ast.FunctionDef | ast.AsyncFunctionDef, src: str
-        ) -> str:
-            target_function_body = function_def.body
-            # Reconstruct the source code of the function body
-            line_start = target_function_body[0].lineno - 1
-            line_end = target_function_body[-1].end_lineno
-            target_function_body_lines = [
-                line for line in src.splitlines()[line_start:line_end]
-            ]
-            target_function_body_str = "\n".join(target_function_body_lines)
-            return target_function_body_str
-
-        # Derive parameters
-        positional_parameters, named_parameters = extract_parameters(self.code)
-        _params_fn_original = positional_parameters + stringify_named_params(
-            named_parameters
-        )
-        # String: Imports
-        str_imports = (
-            "\n".join(
-                [
-                    "import fused",
-                    "from fused.models.udf import Header",
-                ]
-            )
-            + "\n\n"
-        )
-        # String: UDF header - Replace Header with file reference
-        header_files = []
-        processed_headers = []
-        # TODO: do this conversion as part of header attribute
-        for header in self.headers:
-            if isinstance(header, Header):
-                if headerfile:
-                    filename = header.module_name + ".py"
-                    processed_headers.append(filename)
-                else:
-                    processed_headers.append(header)
-                # Create a file magic string
-                header_files.append(header._generate_cell_code())
-            else:
-                processed_headers.append(header)
-        _headers = (
-            stringify_headers(processed_headers) if headerfile else processed_headers
-        )
-        params_decorator = [f"\n headers={_headers}\n"]
-        str_udf_header = (
-            f"@fused.udf({structure_params(params_decorator, separator=',')})"
-        )
-        # String: Function header
-        fn = _extract_fn(self.code)
-        str_async = "async " if isinstance(fn, ast.AsyncFunctionDef) else ""
-        str_fn_header = f"{str_async}def {self.entrypoint}({structure_params(_params_fn_original)}):"
-        # String: Function body
-        str_fn_body = dedent(_extract_fn_body(fn, src=self.code))
-        str_udf = f"""
-{str_udf_header}
-{str_fn_header}
-{indent(str_fn_body, " " * 4)}
-"""
-        if include_imports:
-            str_udf = str_imports + str_udf
-        return str_udf.strip("\n"), header_files
-
-    def render(self):
-        from IPython import get_ipython
-
-        # Get the current IPython instance.
-        ipython = get_ipython()
-        if ipython is None:
-            raise RuntimeError("This function can only be used in a Jupyter Notebook.")
-        # Generate code string and split into lines.
-        code = self._generate_code()[0].strip()
-        transformed = ipython.input_transformer_manager.transform_cell(code)
-        # Set the content of the subsequent cell with.
-        ipython.set_next_input(transformed)
-
     @property
+1 -1
 Metadata-Version: 2.4
 Name: fused
-Version: 2.0.1
+Version: 2.0.3
 Project-URL: Homepage, https://www.fused.io