Latest Threat Research: SANDWORM_MODE — Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains. Details
Socket
Book a Demo · Install · Sign in
Socket

fused

Package Overview
Dependencies
Maintainers
2
Versions
104
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fused - npm Package Compare versions

Comparing version
2.0.1
to
2.0.3
+0
-1
fused/_formatter/udf.py

@@ -208,3 +208,2 @@ from __future__ import annotations

<ul style="margin-top: 0; margin-bottom: 0px;">
<li>{copyable_text(udf._generate_code()[0], show_text=False)} Copy code</li>
{saved_udf_toolkit_str}

@@ -211,0 +210,0 @@ </ul>

+21
-14

@@ -268,8 +268,12 @@ import datetime

submit_kwargs.get("engine", None) == "local"
or submit_kwargs.get("instance_type", "realtime") == "realtime"
or submit_kwargs.get("instance_type", None) is not None
):
# default logic: try realtime and fallback to large instance
submit_kwargs["instance_type"] = "realtime"
submit_kwargs_realtime = submit_kwargs.copy()
submit_kwargs_realtime["instance_type"] = "realtime"
# for realtime attempt, always with one run per instance (i.e. never use
# ChunkedJobPool), otherwise the current fallback logic does not work correctly
submit_kwargs_realtime["n_processes_per_worker"] = 1
result = fused.submit(
udf, arg_list, **run_kwargs, collect=False, **submit_kwargs
udf, arg_list, **run_kwargs, collect=False, **submit_kwargs_realtime
)

@@ -505,7 +509,7 @@ result.wait()

steps: list[str] | None = None,
extract_instance_type: str = "realtime",
extract_instance_type: str | None = None,
extract_max_workers: int = 256,
partition_instance_type: str = "realtime",
partition_instance_type: str | None = None,
partition_max_workers: int = 256,
overview_instance_type: str = "realtime",
overview_instance_type: str | None = None,
overview_max_workers: int = 256,

@@ -594,11 +598,11 @@ **kwargs,

extract_instance_type (str): The instance type to use for the extract step.
Defaults to "realtime".
By default, tries first with "realtime" and then falls back to "large".
extract_max_workers (int): The maximum number of workers to use for the extract step.
Defaults to 256.
partition_instance_type (str): The instance type to use for the partition step.
Defaults to "realtime".
By default, tries first with "realtime" and then falls back to "large".
partition_max_workers (int): The maximum number of workers to use for the partition step.
Defaults to 256.
overview_instance_type (str): The instance type to use for the overview step.
Defaults to "realtime".
By default, tries first with "realtime" and then falls back to "large".
overview_max_workers (int): The maximum number of workers to use for the overview step.

@@ -616,9 +620,11 @@ Defaults to 256.

You can override this behavior by specifying the `engine`, `instance_type`,
`max_workers`, `n_processes_per_worker`, etc parameters as additional
keyword arguments to this function (`**kwargs`). If you want to specify
those per step, use `extract_instance_type`, `partition_instance_type`, etc.
For example, to run everything locally on the same machine where this
function runs, use:
run_ingest_raster_to_h3(..., engine="local")
In contrast to `fused.submit` itself, the ingestion sets `n_processes_per_worker=1`
by default to avoid out-of-memory issues on batch instances. You can
increase this if you know the instance has enough memory to process multiple
chunks in parallel.
"""

@@ -761,2 +767,3 @@ try:

# of time outs or out-of-memory errors, which are unlikely to be resolved
# by retrying
kwargs = {"max_retry": 0} | kwargs

@@ -763,0 +770,0 @@

@@ -23,2 +23,3 @@ import asyncio

from fused._options import options as OPTIONS
from fused.core._udf_ops import get_collection_name_with_fallback

@@ -135,5 +136,9 @@ from . import load_async

elif isinstance(udf, UdfJobStepConfig):
udf.udf.collection_name = get_collection_name_with_fallback(
udf.udf.collection_name
)
return ResolvedLocalJobStepUdf(job_step=udf, udf=udf.udf)
elif isinstance(udf, Udf):
udf.collection_name = get_collection_name_with_fallback(udf.collection_name)
job_step = UdfJobStepConfig(udf=udf, name=udf.name)

@@ -181,5 +186,9 @@ return ResolvedLocalJobStepUdf(job_step=job_step, udf=udf)

elif isinstance(udf, UdfJobStepConfig):
udf.udf.collection_name = get_collection_name_with_fallback(
udf.udf.collection_name
)
return ResolvedLocalJobStepUdf(job_step=udf, udf=udf.udf)
elif isinstance(udf, Udf):
udf.collection_name = get_collection_name_with_fallback(udf.collection_name)
job_step = UdfJobStepConfig(udf=udf, name=udf.name)

@@ -186,0 +195,0 @@ return ResolvedLocalJobStepUdf(job_step=job_step, udf=udf)

@@ -1,1 +0,1 @@

__version__ = "2.0.1"
__version__ = "2.0.3"

@@ -12,5 +12,3 @@ import json

from .udf import structure_params
class CustomJobConfig(FusedBaseModel):

@@ -95,38 +93,12 @@ name: Optional[str] = None

"""Structure text for README.md."""
job_names = [f"job_{step.udf.name}" for step in job.steps]
from fused.models.udf.base_udf import METADATA_FUSED_DESCRIPTION
jobs = []
for step in job.steps:
jobs.append(
f"job_{step.udf.name} = {step.udf.name}({structure_params(step._generate_job_params())})"
)
job_names = ", ".join([f"job_{step.udf.name}" for step in job.steps])
str_udf_imports = "\n".join(
[f"from udf_{step.udf.name} import {step.udf.name}" for step in job.steps]
)
str_run_local = "job.run_local(file_id=0, chunk_id=0)"
str_run_remote = "job.run_remote(output_table='output_table_name')"
str_job = "\n".join(jobs)
str_multijob = f"job = fused.experimental.job([{', '.join(job_names)}])"
src = f"""
# Fused Multi-Step Job
src = f"""# {job_names}
## Get started
```python
# Import UDFs
{str_udf_imports}
# Instantiate individual jobs
{str_job}
# Instantiate multi-step job
{str_multijob}
# Run locally
{str_run_local}
# Run remotely
{str_run_remote}
```
"""
for step in job.steps:
if step.udf.metadata:
src += f"""{step.udf.metadata.get(METADATA_FUSED_DESCRIPTION, "")}\n\n"""
return src

@@ -133,0 +105,0 @@

@@ -37,6 +37,4 @@ from __future__ import annotations

create_directory_and_zip,
extract_parameters,
generate_meta_json,
generate_readme,
stringify_named_params,
structure_params,

@@ -225,65 +223,2 @@ )

def _generate_job_params(self) -> list[str]:
    """Build the stringified parameter list used to instantiate this job step.

    Merges the UDF's positional/named parameters (parsed from its source code)
    with job-level overrides, removes system-provided parameters, and appends
    an ``output_table=`` entry when this step has an ``output`` attribute.

    Returns:
        List of parameter strings ready for ``structure_params``.
    """
    # Derive parameters
    positional_parameters, named_parameters = extract_parameters(self.udf.code)
    # Replace default code params with params passed to job
    named_parameters.update(self.udf.parameters)
    # Remove positional parameters that already have values in the job
    positional_parameters = [
        positional_parameter
        for positional_parameter in positional_parameters
        if positional_parameter not in named_parameters.keys()
    ]
    if self.type == "udf":
        if self.input:
            # NOTE(review): assumes at least one positional parameter remains
            # when an input is set — would raise IndexError otherwise; confirm
            # against callers.
            positional_parameters[0] = f"arg_list={repr(self.input)}"
    else:
        # Non-"udf" step types are not fully supported by this renderer.
        warnings.warn(
            FusedTypeWarning(f'Rendering of type "{self.type}" may be incomplete.')
        )
    # After replacing the inputs, look for any parameters with no values
    # that match the reserved parameter names. These should not be included
    # in instantiating the UDF because they are provided by the system.
    positional_parameters = [
        param
        for param in positional_parameters
        if param not in SYSTEM_PARAMETER_NAMES
    ]
    _params_fn_instance = positional_parameters + stringify_named_params(
        named_parameters
    )
    output_config = []
    if hasattr(self, "output"):
        output_str = repr(self.output)
        # NOTE(review): repr() never returns None, so this guard always passes;
        # kept as-is to preserve behavior.
        if output_str is not None:
            output_config.append(f"output_table={output_str}")
    return _params_fn_instance + output_config
def _generate_code(self, headerfile=False):
    """Generate runnable source for this job step.

    Concatenates shared imports, the UDF's own source, a job-instantiation
    line, and a local-run call into a single cell-ready string.

    Args:
        headerfile: Forwarded to the UDF's ``_generate_code``; when true,
            headers are emitted as separate cells.

    Returns:
        Tuple ``(src, header_cells)`` of the combined source string and any
        extra header cells produced by the UDF.
    """
    # String: Job instantiation
    str_job_inst = f"job = {self.udf.entrypoint}._to_job_step({structure_params(self._generate_job_params())})"
    # String: Job execution
    str_job_exec = "job._run_local()"
    # String: UDF
    str_udf, header_cells = self.udf._generate_code(
        include_imports=False, headerfile=headerfile
    )
    # Structure cell
    src = f"""
{STR_IMPORTS}
{str_udf}
{str_job_inst}
{str_job_exec}
"""
    return src, header_cells
def render(self, headerfile=False):
    """Insert this job step's generated code into new Jupyter notebook cells."""
    _render(self, headerfile=headerfile)
def export(

@@ -999,5 +934,2 @@ self,

def render(self, headerfile=False):
    """Insert this object's generated code into new Jupyter notebook cells."""
    _render(self, headerfile=headerfile)
def get_sample(self):

@@ -1031,3 +963,2 @@ raise NotImplementedError(

"meta.json": generate_meta_json(job),
# "multijob.py": obj._generate_code(headerfile=True),
"README.md": generate_readme(job),

@@ -1057,40 +988,2 @@ }

def _render(job, headerfile=False):
    """Insert the generated code for *job* into new Jupyter notebook cells.

    Args:
        job: Object exposing ``_generate_code(headerfile=...)``.
        headerfile: When true, also emit one extra cell per header file.

    Raises:
        RuntimeError: If not running inside an IPython/Jupyter session.
    """
    from IPython import get_ipython
    from IPython.core.inputsplitter import IPythonInputSplitter

    def create_new_cell(contents):
        """Similar to ipython.set_next_input but allows multiple cells to be created at once."""
        from IPython.core.getipython import get_ipython

        shell = get_ipython()
        # "set_next_input" with replace=False queues a new cell rather than
        # overwriting the current one, so repeated calls stack cells.
        payload = dict(
            source="set_next_input",
            text=contents,
            replace=False,
        )
        shell.payload_manager.write_payload(payload, single=False)

    # Get the current IPython instance.
    ipython = get_ipython()
    if ipython is None:
        raise RuntimeError("This function can only be used in a Jupyter Notebook.")
    # Create an instance of IPythonInputSplitter.
    splitter = IPythonInputSplitter()
    # Generate code string and split into lines.
    src_udf, header_cells = job._generate_code(headerfile=headerfile)
    lines = src_udf.strip().split("\n")
    # Set the content of the subsequent cell with.
    create_new_cell(splitter.transform_cell("\n".join(lines)))
    # Headers
    if headerfile:
        for header in header_cells:
            create_new_cell(header)
class DatasetInputBase(BaseModel):

@@ -1097,0 +990,0 @@ type: Literal[None, "v2"]

from __future__ import annotations
import ast
import difflib

@@ -14,3 +13,2 @@ import json

from pathlib import PurePath
from textwrap import dedent, indent
from typing import (

@@ -48,8 +46,2 @@ TYPE_CHECKING,

from .._codegen import (
extract_parameters,
stringify_headers,
stringify_named_params,
structure_params,
)
from .._inplace import _maybe_inplace

@@ -770,105 +762,2 @@

def _generate_code(
    self, include_imports=True, headerfile=False
) -> tuple[str, Sequence[str]]:
    """Generate standalone source code for this UDF.

    Rebuilds an ``@fused.udf``-decorated function definition from the UDF's
    stored source (``self.code``) via AST parsing.

    Args:
        include_imports: Prepend ``import fused`` / ``Header`` import lines.
        headerfile: Emit ``Header`` objects as ``.py`` file references and
            collect their cell code for separate notebook cells.

    Returns:
        Tuple of (UDF source string, list of header cell code strings).
    """

    def _extract_fn(src: str) -> ast.FunctionDef | ast.AsyncFunctionDef:
        # Return the first function definition found in *src*.
        # TODO: handle Header objects in header param
        parsed_ast = ast.parse(src)
        # Find all function definitions in the AST
        function_defs = [
            node
            for node in ast.walk(parsed_ast)
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        ]
        # first function (assume it's the target function)
        return function_defs[0]

    def _extract_fn_body(
        function_def: ast.FunctionDef | ast.AsyncFunctionDef, src: str
    ) -> str:
        # Slice the raw source lines covered by the function body, using the
        # AST's 1-based line numbers (hence the -1 on the start).
        target_function_body = function_def.body
        # Reconstruct the source code of the function body
        line_start = target_function_body[0].lineno - 1
        line_end = target_function_body[-1].end_lineno
        target_function_body_lines = [
            line for line in src.splitlines()[line_start:line_end]
        ]
        target_function_body_str = "\n".join(target_function_body_lines)
        return target_function_body_str

    # Derive parameters
    positional_parameters, named_parameters = extract_parameters(self.code)
    _params_fn_original = positional_parameters + stringify_named_params(
        named_parameters
    )
    # String: Imports
    str_imports = (
        "\n".join(
            [
                "import fused",
                "from fused.models.udf import Header",
            ]
        )
        + "\n\n"
    )
    # String: UDF header - Replace Header with file reference
    header_files = []
    processed_headers = []
    # TODO: do this conversion as part of header attribute
    for header in self.headers:
        if isinstance(header, Header):
            if headerfile:
                filename = header.module_name + ".py"
                processed_headers.append(filename)
            else:
                processed_headers.append(header)
            # Create a file magic string
            header_files.append(header._generate_cell_code())
        else:
            processed_headers.append(header)
    _headers = (
        stringify_headers(processed_headers) if headerfile else processed_headers
    )
    params_decorator = [f"\n headers={_headers}\n"]
    str_udf_header = (
        f"@fused.udf({structure_params(params_decorator, separator=',')})"
    )
    # String: Function header
    fn = _extract_fn(self.code)
    str_async = "async " if isinstance(fn, ast.AsyncFunctionDef) else ""
    str_fn_header = f"{str_async}def {self.entrypoint}({structure_params(_params_fn_original)}):"
    # String: Function body
    str_fn_body = dedent(_extract_fn_body(fn, src=self.code))
    str_udf = f"""
{str_udf_header}
{str_fn_header}
{indent(str_fn_body, " " * 4)}
"""
    if include_imports:
        str_udf = str_imports + str_udf
    return str_udf.strip("\n"), header_files
def render(self):
    """Insert this UDF's generated code into the next Jupyter notebook cell.

    Raises:
        RuntimeError: If not running inside an IPython/Jupyter session.
    """
    from IPython import get_ipython

    # Get the current IPython instance.
    ipython = get_ipython()
    if ipython is None:
        raise RuntimeError("This function can only be used in a Jupyter Notebook.")
    # Generate code string and split into lines.
    code = self._generate_code()[0].strip()
    transformed = ipython.input_transformer_manager.transform_cell(code)
    # Set the content of the subsequent cell with.
    ipython.set_next_input(transformed)
@property

@@ -875,0 +764,0 @@ def utils(self):

Metadata-Version: 2.4
Name: fused
Version: 2.0.1
Version: 2.0.3
Project-URL: Homepage, https://www.fused.io

@@ -5,0 +5,0 @@ Project-URL: Documentation, https://docs.fused.io