

formed.workflow.archive

Archive data structures for workflow execution persistence.

This module defines NamedTuple structures for serializing workflow executions to JSON format. These archives capture all metadata needed to restore past executions, including step fingerprints, source code hashes, and dependency information.

WorkflowStepArchive

Bases: NamedTuple

Archived snapshot of a WorkflowStep's execution-time metadata.

This structure captures all information needed to:

  1. Look up cached results (fingerprint, format_identifier)
  2. Understand what ran (version, source_hash, config)
  3. Reconstruct dependency references (dependency_fingerprints)

All steps are stored flat in WorkflowGraphArchive.steps, and dependencies are referenced by fingerprint rather than nested recursively.
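The NamedTuple round trip via `_asdict` can be sketched with a stand-in class (the field set is abbreviated; `StepArchive` here is illustrative, not the library class, which also carries version, source hash, config, and cache flags):

```python
from typing import NamedTuple


class StepArchive(NamedTuple):
    # Abbreviated stand-in for WorkflowStepArchive.
    name: str
    fingerprint: str
    dependency_fingerprints: tuple[str, ...]

    def json(self) -> dict:
        # NamedTuple._asdict yields a JSON-friendly mapping
        return self._asdict()

    @classmethod
    def from_json(cls, data: dict) -> "StepArchive":
        return cls(**data)


archive = StepArchive("train", "abc123", ("def456",))
assert StepArchive.from_json(archive.json()) == archive
```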

name instance-attribute

name

step_type instance-attribute

step_type

fingerprint instance-attribute

fingerprint

format_identifier instance-attribute

format_identifier

version instance-attribute

version

source_hash instance-attribute

source_hash

config instance-attribute

config

deterministic instance-attribute

deterministic

cacheable instance-attribute

cacheable

should_be_cached instance-attribute

should_be_cached

dependency_fingerprints instance-attribute

dependency_fingerprints

fieldref class-attribute instance-attribute

fieldref = None

json

json()

Convert to JSON-serializable dict.

Source code in src/formed/workflow/archive.py
def json(self) -> dict[str, JsonValue]:
    """Convert to JSON-serializable dict."""
    return self._asdict()

from_json classmethod

from_json(data)

Create from JSON-deserialized dict.

Source code in src/formed/workflow/archive.py
@classmethod
def from_json(cls, data: JsonValue) -> Self:
    """Create from JSON-deserialized dict."""
    assert isinstance(data, dict)
    return cls(**cast(dict, data))

WorkflowGraphArchive

Bases: NamedTuple

Archived snapshot of a WorkflowGraph's execution-time state.

All steps are stored flat here (not nested). Dependencies between steps are represented by fingerprints in WorkflowStepArchive.dependency_fingerprints.
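The flat layout can be illustrated with plain dicts (step contents abbreviated, names hypothetical): resolving a dependency is a fingerprint lookup, not a recursive walk.

```python
# Steps stored flat, keyed by name; edges carried as fingerprint strings.
steps = {
    "load": {"fingerprint": "fp-load", "dependency_fingerprints": []},
    "train": {"fingerprint": "fp-train", "dependency_fingerprints": ["fp-load"]},
}
execution_order = ["load", "train"]

# Dependency resolution is a lookup by fingerprint:
name_by_fp = {info["fingerprint"]: name for name, info in steps.items()}
deps_of_train = [name_by_fp[fp] for fp in steps["train"]["dependency_fingerprints"]]
assert deps_of_train == ["load"]
```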

steps instance-attribute

steps

execution_order instance-attribute

execution_order

json

json()

Convert to JSON-serializable dict.

Source code in src/formed/workflow/archive.py
def json(self) -> dict[str, JsonValue]:
    """Convert to JSON-serializable dict."""
    return {
        "steps": {name: step.json() for name, step in self.steps.items()},
        "execution_order": cast(list, self.execution_order),
    }

from_json classmethod

from_json(data)

Create from JSON-deserialized dict.

Source code in src/formed/workflow/archive.py
@classmethod
def from_json(cls, data: dict[str, JsonValue]) -> Self:
    """Create from JSON-deserialized dict."""
    steps = {
        name: WorkflowStepArchive.from_json(step_data) for name, step_data in cast(dict, data["steps"]).items()
    }
    execution_order = data["execution_order"]
    return cls(steps=steps, execution_order=cast(list, execution_order))

WorkflowExecutionArchive

Bases: NamedTuple

Complete execution snapshot saved to execution.json.

This is the top-level structure that organizers save and restore. State is NOT included here; it is saved separately because it is mutable.
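A hypothetical `execution.json` following the fields documented below (the `id` and `metadata` values are made up; `format_version` "2.0" matches the version checked by `from_json`):

```python
import json

# Hypothetical contents of execution.json.
execution = {
    "format_version": "2.0",
    "id": "exec-20240601-0001",
    "graph": {"steps": {}, "execution_order": []},
    "metadata": {"host": "ci-runner"},
}

# The structure is plain JSON, so it survives a serialization round trip.
restored = json.loads(json.dumps(execution))
assert restored == execution
```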

format_version instance-attribute

format_version

id instance-attribute

id

graph instance-attribute

graph

metadata instance-attribute

metadata

json

json()

Convert to JSON-serializable dict.

Source code in src/formed/workflow/archive.py
def json(self) -> dict[str, JsonValue]:
    """Convert to JSON-serializable dict."""
    return {
        "format_version": self.format_version,
        "id": self.id,
        "graph": self.graph.json(),
        "metadata": self.metadata,
    }

from_json classmethod

from_json(data)

Create from JSON-deserialized dict.

Source code in src/formed/workflow/archive.py
@classmethod
def from_json(cls, data: dict[str, JsonValue]) -> Self:
    """Create from JSON-deserialized dict."""
    graph = WorkflowGraphArchive.from_json(cast(dict, data["graph"]))
    return cls(
        format_version=cast(Literal["2.0"], data["format_version"]),
        id=cast(str, data["id"]),
        graph=graph,
        metadata=cast(dict, data["metadata"]),
    )

formed.workflow.cache

Workflow step result caching implementations.

This module provides caching backends for storing and retrieving workflow step results. Multiple cache implementations are available for different use cases.

Available Caches
  • EmptyWorkflowCache: No-op cache that never stores results
  • MemoryWorkflowCache: In-memory cache for development/testing
  • FilesystemWorkflowCache: Persistent file-based cache (default)

Examples:

>>> from formed.workflow.cache import FilesystemWorkflowCache
>>>
>>> # Create filesystem cache
>>> cache = FilesystemWorkflowCache(".formed/cache")
>>>
>>> # Check if step result is cached
>>> if step_info in cache:
...     result = cache[step_info]
... else:
...     result = execute_step()
...     cache[step_info] = result

WorkflowCache

Bases: Registrable

Abstract base class for workflow step result caching.

WorkflowCache provides a dict-like interface for storing and retrieving step execution results, keyed by WorkflowStepInfo (which includes the step's fingerprint).

Subclasses implement different storage backends (memory, filesystem, etc.).

Examples:

>>> # Implement custom cache
>>> class MyCache(WorkflowCache):
...     def __getitem__(self, step_info):
...         # Retrieve from custom backend
...         pass
...     def __setitem__(self, step_info, value):
...         # Store to custom backend
...         pass
...     def __contains__(self, step_info):
...         # Check if cached
...         pass
...     def __delitem__(self, step_info):
...         # Remove from cache
...         pass
Note
  • Cache keys are WorkflowStepInfo instances
  • Fingerprints uniquely identify step configurations
  • Thread-safety depends on implementation

EmptyWorkflowCache

Bases: WorkflowCache

No-op cache that never stores results.

EmptyWorkflowCache disables caching entirely. All `__contains__` checks return False and all `__getitem__` calls raise KeyError, forcing steps to always re-execute.

This is useful for debugging or when caching is undesirable.

Examples:

>>> cache = EmptyWorkflowCache()
>>> cache[step_info] = result  # Does nothing
>>> step_info in cache  # Always returns False

MemoryWorkflowCache

MemoryWorkflowCache()

Bases: WorkflowCache

In-memory cache for workflow step results.

MemoryWorkflowCache stores results in a Python dictionary, providing fast access but no persistence across process restarts. Useful for development, testing, or when results don't need to survive process boundaries.

Examples:

>>> cache = MemoryWorkflowCache()
>>> cache[step_info] = result
>>> if step_info in cache:
...     result = cache[step_info]
>>> print(len(cache))  # Number of cached steps
Note
  • Cache is lost when process ends
  • Not suitable for production workflows
  • No size limit - can grow unbounded
Source code in src/formed/workflow/cache.py
def __init__(self) -> None:
    self._cache: dict["WorkflowStepInfo", Any] = {}

FilesystemWorkflowCache

FilesystemWorkflowCache(directory)

Bases: WorkflowCache

Persistent file-based cache for workflow step results.

FilesystemWorkflowCache stores step results in a directory structure organized by fingerprint. Each step's result is serialized using its configured Format and written to a subdirectory. File locking ensures thread-safe concurrent access.

ATTRIBUTE DESCRIPTION
_directory

Root directory for cache storage.

Examples:

>>> cache = FilesystemWorkflowCache(".formed/cache")
>>> cache[step_info] = result  # Writes to .formed/cache/<fingerprint>/
>>> if step_info in cache:
...     result = cache[step_info]  # Reads from disk
>>> del cache[step_info]  # Removes cached result
Note
  • Results persist across process restarts
  • Thread-safe via file locking
  • Cache directory structure: {cache_dir}/{fingerprint}/
  • Each step uses its Format for serialization
  • Suitable for production workflows

Initialize filesystem cache.

PARAMETER DESCRIPTION
directory

Root directory for cache storage. Created if it doesn't exist.

TYPE: str | PathLike

Source code in src/formed/workflow/cache.py
def __init__(self, directory: str | PathLike) -> None:
    """Initialize filesystem cache.

    Args:
        directory: Root directory for cache storage. Created if it doesn't exist.

    """
    self._directory = Path(directory)
    self._directory.mkdir(parents=True, exist_ok=True)

formed.workflow.callback

Callback system for workflow execution monitoring.

This module provides a callback interface for monitoring and responding to workflow execution events. Callbacks can be used for logging, metrics collection, checkpointing, or custom workflow orchestration logic.

Key Components:

- `WorkflowCallback`: Abstract base class for all callbacks
- `EmptyWorkflowCallback`: No-op callback
- `MultiWorkflowCallback`: Combines multiple callbacks
Features
  • Hook points at execution and step start/end
  • Access to execution and step contexts
  • Composable callback system
  • Registrable for configuration-based instantiation

Examples:

>>> from formed.workflow import WorkflowCallback
>>>
>>> @WorkflowCallback.register("custom")
... class CustomCallback(WorkflowCallback):
...     def on_step_start(self, step_context, execution_context):
...         print(f"Starting step: {step_context.info.name}")
...
...     def on_step_end(self, step_context, execution_context):
...         print(f"Finished step: {step_context.info.name}")

WorkflowCallback

Bases: Registrable

Abstract base class for workflow execution callbacks.

Callbacks provide hooks to execute custom logic at various points during workflow execution. Subclasses can override hook methods to implement custom monitoring, logging, or orchestration behavior.

Hook execution order
  1. on_execution_start - once at workflow start
  2. on_step_start - before each step execution
  3. on_step_end - after each step execution
  4. on_execution_end - once at workflow end

Examples:

>>> class LoggingCallback(WorkflowCallback):
...     def on_execution_start(self, execution_context):
...         print("Workflow started")
...
...     def on_step_end(self, step_context, execution_context):
...         print(f"Step {step_context.info.name} completed")

on_execution_start

on_execution_start(execution_context)

Called once at the start of workflow execution.

PARAMETER DESCRIPTION
execution_context

Context containing execution metadata and state.

TYPE: WorkflowExecutionContext

Source code in src/formed/workflow/callback.py
def on_execution_start(
    self,
    execution_context: "WorkflowExecutionContext",
) -> None:
    """Called once at the start of workflow execution.

    Args:
        execution_context: Context containing execution metadata and state.

    """
    pass

on_execution_end

on_execution_end(execution_context)

Called once at the end of workflow execution.

PARAMETER DESCRIPTION
execution_context

Context containing execution metadata and state.

TYPE: WorkflowExecutionContext

Source code in src/formed/workflow/callback.py
def on_execution_end(
    self,
    execution_context: "WorkflowExecutionContext",
) -> None:
    """Called once at the end of workflow execution.

    Args:
        execution_context: Context containing execution metadata and state.

    """
    pass

on_step_start

on_step_start(step_context, execution_context)

Called before each step execution.

PARAMETER DESCRIPTION
step_context

Context for the step about to execute.

TYPE: WorkflowStepContext

execution_context

Context for the overall execution.

TYPE: WorkflowExecutionContext

Source code in src/formed/workflow/callback.py
def on_step_start(
    self,
    step_context: "WorkflowStepContext",
    execution_context: "WorkflowExecutionContext",
) -> None:
    """Called before each step execution.

    Args:
        step_context: Context for the step about to execute.
        execution_context: Context for the overall execution.

    """
    pass

on_step_end

on_step_end(step_context, execution_context)

Called after each step execution.

PARAMETER DESCRIPTION
step_context

Context for the step that just executed.

TYPE: WorkflowStepContext

execution_context

Context for the overall execution.

TYPE: WorkflowExecutionContext

Source code in src/formed/workflow/callback.py
def on_step_end(
    self,
    step_context: "WorkflowStepContext",
    execution_context: "WorkflowExecutionContext",
) -> None:
    """Called after each step execution.

    Args:
        step_context: Context for the step that just executed.
        execution_context: Context for the overall execution.

    """
    pass

EmptyWorkflowCallback

Bases: WorkflowCallback

No-op callback that does nothing.

This callback can be used as a placeholder or default when no callback behavior is needed.

on_execution_start

on_execution_start(execution_context)

Called once at the start of workflow execution.

PARAMETER DESCRIPTION
execution_context

Context containing execution metadata and state.

TYPE: WorkflowExecutionContext

Source code in src/formed/workflow/callback.py
def on_execution_start(
    self,
    execution_context: "WorkflowExecutionContext",
) -> None:
    """Called once at the start of workflow execution.

    Args:
        execution_context: Context containing execution metadata and state.

    """
    pass

on_execution_end

on_execution_end(execution_context)

Called once at the end of workflow execution.

PARAMETER DESCRIPTION
execution_context

Context containing execution metadata and state.

TYPE: WorkflowExecutionContext

Source code in src/formed/workflow/callback.py
def on_execution_end(
    self,
    execution_context: "WorkflowExecutionContext",
) -> None:
    """Called once at the end of workflow execution.

    Args:
        execution_context: Context containing execution metadata and state.

    """
    pass

on_step_start

on_step_start(step_context, execution_context)

Called before each step execution.

PARAMETER DESCRIPTION
step_context

Context for the step about to execute.

TYPE: WorkflowStepContext

execution_context

Context for the overall execution.

TYPE: WorkflowExecutionContext

Source code in src/formed/workflow/callback.py
def on_step_start(
    self,
    step_context: "WorkflowStepContext",
    execution_context: "WorkflowExecutionContext",
) -> None:
    """Called before each step execution.

    Args:
        step_context: Context for the step about to execute.
        execution_context: Context for the overall execution.

    """
    pass

on_step_end

on_step_end(step_context, execution_context)

Called after each step execution.

PARAMETER DESCRIPTION
step_context

Context for the step that just executed.

TYPE: WorkflowStepContext

execution_context

Context for the overall execution.

TYPE: WorkflowExecutionContext

Source code in src/formed/workflow/callback.py
def on_step_end(
    self,
    step_context: "WorkflowStepContext",
    execution_context: "WorkflowExecutionContext",
) -> None:
    """Called after each step execution.

    Args:
        step_context: Context for the step that just executed.
        execution_context: Context for the overall execution.

    """
    pass

MultiWorkflowCallback

MultiWorkflowCallback(callbacks)

Bases: WorkflowCallback

Callback that executes multiple callbacks in sequence.

This callback allows composing multiple callbacks together, calling each one in order for every hook.

PARAMETER DESCRIPTION
callbacks

Sequence of callbacks to execute.

TYPE: Sequence[WorkflowCallback]

Examples:

>>> callback1 = LoggingCallback()
>>> callback2 = MetricsCallback()
>>> multi = MultiWorkflowCallback([callback1, callback2])
>>> # Both callbacks will be called for each hook
Source code in src/formed/workflow/callback.py
def __init__(self, callbacks: Sequence["WorkflowCallback"]) -> None:
    self._callbacks = callbacks

on_execution_start

on_execution_start(execution_context)
Source code in src/formed/workflow/callback.py
def on_execution_start(
    self,
    execution_context: "WorkflowExecutionContext",
) -> None:
    for callback in self._callbacks:
        callback.on_execution_start(execution_context)

on_execution_end

on_execution_end(execution_context)
Source code in src/formed/workflow/callback.py
def on_execution_end(
    self,
    execution_context: "WorkflowExecutionContext",
) -> None:
    for callback in self._callbacks:
        callback.on_execution_end(execution_context)

on_step_start

on_step_start(step_context, execution_context)
Source code in src/formed/workflow/callback.py
def on_step_start(
    self,
    step_context: "WorkflowStepContext",
    execution_context: "WorkflowExecutionContext",
) -> None:
    for callback in self._callbacks:
        callback.on_step_start(step_context, execution_context)

on_step_end

on_step_end(step_context, execution_context)
Source code in src/formed/workflow/callback.py
def on_step_end(
    self,
    step_context: "WorkflowStepContext",
    execution_context: "WorkflowExecutionContext",
) -> None:
    for callback in self._callbacks:
        callback.on_step_end(step_context, execution_context)

formed.workflow.colt

COLT_BUILDER module-attribute

COLT_BUILDER = ColtBuilder(
    typekey=COLT_TYPEKEY,
    argskey=COLT_ARGSKEY,
    callback=MultiCallback(
        DatetimeCallback(), RefCallback()
    ),
)

WorkflowRef

WorkflowRef(
    annotation, path, step_name, config, field_name=None
)

Bases: Generic[_T], Placeholder[_T]

Source code in src/formed/workflow/colt.py
def __init__(
    self,
    annotation: _T,
    path: tuple[int | str, ...],
    step_name: str,
    config: Any,
    field_name: str | None = None,
) -> None:
    super().__init__(annotation)
    self._path = path
    self._step_name = step_name
    self._config = config
    self._field_name = field_name

path property

path

step_name property

step_name

config property

config

field_name property

field_name

is_ref staticmethod

is_ref(builder, config)
Source code in src/formed/workflow/colt.py
@staticmethod
def is_ref(builder: "ColtBuilder", config: Any) -> bool:
    return (
        isinstance(config, Mapping)
        and set(config) == {builder.typekey, WORKFLOW_REFKEY}
        and config[builder.typekey] == WORKFLOW_REFTYPE
        and isinstance(config[WORKFLOW_REFKEY], str)
    )
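Given `WORKFLOW_REFKEY = 'ref'` and `WORKFLOW_REFTYPE = 'ref'` (see formed.workflow.constants), a ref config is a two-key mapping. A standalone sketch of the same check, assuming the builder's typekey is `"type"` (the actual `COLT_TYPEKEY` value may differ):

```python
from collections.abc import Mapping

TYPEKEY = "type"        # assumed; the real value comes from COLT_TYPEKEY
WORKFLOW_REFKEY = "ref"
WORKFLOW_REFTYPE = "ref"


def is_ref(config) -> bool:
    # Same shape as WorkflowRef.is_ref above, with the typekey fixed.
    return (
        isinstance(config, Mapping)
        and set(config) == {TYPEKEY, WORKFLOW_REFKEY}
        and config[TYPEKEY] == WORKFLOW_REFTYPE
        and isinstance(config[WORKFLOW_REFKEY], str)
    )


assert is_ref({"type": "ref", "ref": "train"})
assert not is_ref({"type": "ref", "ref": "train", "extra": 1})  # extra keys rejected
assert not is_ref({"type": "ref", "ref": 42})                   # ref must be a string
```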

match_type_hint

match_type_hint(annotation)
Source code in src/formed/workflow/colt.py
def match_type_hint(self, annotation: Any) -> bool:
    if self._annotation is Any:
        return True  # Allow Any to match any type hint for flexibility
    return super().match_type_hint(annotation)

RefCallback

Bases: ColtCallback

Replace ref configs with WorkflowRef instances as placeholders

on_build

on_build(path, config, builder, context, annotation=None)
Source code in src/formed/workflow/colt.py
def on_build(
    self,
    path: ParamPath,
    config: Any,
    builder: ColtBuilder,
    context: ColtContext,
    annotation: type[_T] | Callable[..., _T] | None = None,
) -> Any:
    from .graph import WorkflowGraph

    annotation = remove_optional(annotation)
    if isinstance(annotation, type) and issubclass(annotation, WorkflowGraph):
        if not isinstance(config, Mapping):
            raise ConfigurationError(f"[{get_path_name(path)}] Expected a mapping, got {config}")
        self._register_step_types(builder, path, config, context)
        return config

    if WorkflowRef.is_ref(builder, config):
        step_name, field_name = WorkflowRef._parse_ref(config[WORKFLOW_REFKEY])
        step_type = self._find_step_type(path, step_name, context)
        step_output_annotation = step_type.get_output_type()
        if field_name is not None:
            try:
                step_output_annotation = typing.get_type_hints(step_output_annotation).get(field_name, Any)
            except TypeError:
                step_output_annotation = Any
        return WorkflowRef(
            annotation=step_output_annotation,
            path=path,
            step_name=step_name,
            config=config,
            field_name=field_name,
        )

    raise SkipCallback

DatetimeCallback

Bases: ColtCallback

on_build

on_build(path, config, builder, context, annotation=None)
Source code in src/formed/workflow/colt.py
def on_build(
    self,
    path: ParamPath,
    config: Any,
    builder: ColtBuilder,
    context: ColtContext,
    annotation: type[_T] | Callable[..., _T] | None = None,
) -> Any:
    del path, builder, context
    if isinstance(config, str) and annotation is datetime.datetime:
        return datetime.datetime.fromisoformat(config)
    raise SkipCallback
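The callback only fires when the config value is a string and the annotation is `datetime.datetime`; the conversion it performs is exactly the standard ISO-8601 parse:

```python
import datetime

# What DatetimeCallback applies to a string config annotated as datetime:
value = datetime.datetime.fromisoformat("2024-06-01T12:30:00")
assert value == datetime.datetime(2024, 6, 1, 12, 30)
```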

formed.workflow.constants

WORKFLOW_REFKEY module-attribute

WORKFLOW_REFKEY = 'ref'

WORKFLOW_REFTYPE module-attribute

WORKFLOW_REFTYPE = 'ref'

WORKFLOW_DEFAULT_DIRECTORY module-attribute

WORKFLOW_DEFAULT_DIRECTORY = (
    DEFAULT_FORMED_DIRECTORY / "workflow"
)

WORKFLOW_DEFAULT_SETTINGS_PATH module-attribute

WORKFLOW_DEFAULT_SETTINGS_PATH = (
    DEFAULT_WORKING_DIRECTORY / "formed.yml"
)

WORKFLOW_INTEGRATION_DIRECTORY module-attribute

WORKFLOW_INTEGRATION_DIRECTORY = (
    DEFAULT_FORMED_DIRECTORY / "integrations"
)

WORKFLOW_WORKSPACE_DIRECTORY module-attribute

WORKFLOW_WORKSPACE_DIRECTORY = (
    DEFAULT_FORMED_DIRECTORY / "workspaces"
)

formed.workflow.executor

Workflow execution engine and context management.

This module provides the execution engine for workflows, coordinating step execution, caching, callbacks, and state management.

Key Components
  • WorkflowExecutor: Abstract base for execution engines
  • DefaultWorkflowExecutor: Default sequential execution implementation
  • WorkflowExecutionContext: Runtime context for workflow execution
  • WorkflowExecutionInfo: Metadata about workflow execution
Features
  • Sequential step execution with dependency resolution
  • Cache integration for step results
  • Callback hooks for monitoring and logging
  • Execution state tracking
  • Git and environment metadata capture

Examples:

>>> from formed.workflow import WorkflowGraph, DefaultWorkflowExecutor
>>> from formed.workflow.cache import FilesystemWorkflowCache
>>>
>>> # Load workflow and create executor
>>> graph = WorkflowGraph.from_jsonnet("workflow.jsonnet")
>>> executor = DefaultWorkflowExecutor()
>>> cache = FilesystemWorkflowCache(".formed/cache")
>>>
>>> # Execute workflow
>>> with executor:
...     context = executor(graph, cache=cache)
>>> print(context.state.status)  # "completed"

logger module-attribute

logger = getLogger(__name__)

T module-attribute

T = TypeVar('T')

WorkflowExecutorT module-attribute

WorkflowExecutorT = TypeVar(
    "WorkflowExecutorT", bound="WorkflowExecutor"
)

WorkflowExecutionID module-attribute

WorkflowExecutionID = NewType('WorkflowExecutionID', str)

WorkflowExecutionStatus

Bases: str, Enum

PENDING class-attribute instance-attribute

PENDING = 'pending'

RUNNING class-attribute instance-attribute

RUNNING = 'running'

FAILURE class-attribute instance-attribute

FAILURE = 'failure'

CANCELED class-attribute instance-attribute

CANCELED = 'canceled'

COMPLETED class-attribute instance-attribute

COMPLETED = 'completed'
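Because the enum mixes in `str`, status values compare directly against their string forms, which is convenient when reading serialized state. A stand-in sketch mirroring the documented values (not the library class itself):

```python
from enum import Enum


class Status(str, Enum):
    # Mirror of the documented WorkflowExecutionStatus values.
    PENDING = "pending"
    RUNNING = "running"
    FAILURE = "failure"
    CANCELED = "canceled"
    COMPLETED = "completed"


# The str mixin makes comparisons against plain strings direct:
assert Status.COMPLETED == "completed"
assert Status("running") is Status.RUNNING
```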

WorkflowExecutionState dataclass

WorkflowExecutionState(
    execution_id=None,
    status=PENDING,
    started_at=None,
    finished_at=None,
)

execution_id class-attribute instance-attribute

execution_id = None

status class-attribute instance-attribute

status = PENDING

started_at class-attribute instance-attribute

started_at = None

finished_at class-attribute instance-attribute

finished_at = None

WorkflowExecutionMetadata dataclass

WorkflowExecutionMetadata(
    version=version("formed"),
    git=get_git_info(),
    environment=dict(),
    required_modules=list(),
    dependent_packages=get_installed_packages(),
)

version class-attribute instance-attribute

version = version('formed')

git class-attribute instance-attribute

git = field(default_factory=get_git_info)

environment class-attribute instance-attribute

environment = field(default_factory=dict)

required_modules class-attribute instance-attribute

required_modules = field(default_factory=list)

dependent_packages class-attribute instance-attribute

dependent_packages = field(
    default_factory=get_installed_packages
)

WorkflowExecutionInfo dataclass

WorkflowExecutionInfo(
    graph, id=None, metadata=WorkflowExecutionMetadata()
)

graph instance-attribute

graph

id class-attribute instance-attribute

id = None

metadata class-attribute instance-attribute

metadata = field(default_factory=WorkflowExecutionMetadata)

json

json()

Convert to JSON-serializable dict.

Returns a dict that may contain types requiring WorkflowJSONEncoder for proper serialization (e.g., datetime, git info).

Examples:

>>> data = execution_info.json()
>>> json.dump(data, file, cls=WorkflowJSONEncoder)
Source code in src/formed/workflow/executor.py
def json(self) -> dict[str, JsonValue]:
    """Convert to JSON-serializable dict.

    Returns a dict that may contain types requiring `WorkflowJSONEncoder`
    for proper serialization (e.g., datetime, git info).

    Examples:
        >>> data = execution_info.json()
        >>> json.dump(data, file, cls=WorkflowJSONEncoder)
    """
    return {
        "format_version": "2.0",
        "id": self.id or "",
        "graph": self.graph.to_archive().json(),
        "metadata": as_jsonvalue(self.metadata),
    }

from_json classmethod

from_json(data)

Reconstruct from JSON-serializable dict.

Validates format version and reconstructs all fields.

Examples:

>>> data = json.load(file, cls=WorkflowJSONDecoder)
>>> execution_info = WorkflowExecutionInfo.from_json(data)
Source code in src/formed/workflow/executor.py
@classmethod
def from_json(cls, data: dict[str, JsonValue], /) -> Self:
    """Reconstruct from JSON-serializable dict.

    Validates format version and reconstructs all fields.

    Examples:
        >>> data = json.load(file, cls=WorkflowJSONDecoder)
        >>> execution_info = WorkflowExecutionInfo.from_json(data)
    """

    if data.get("format_version") != "2.0":
        raise ValueError(f"Unsupported format version '{data.get('format_version')}'. Expected '2.0'.")

    execution_id = WorkflowExecutionID(cast(str, data["id"])) if data["id"] else None
    graph = WorkflowGraph.from_archive(WorkflowGraphArchive.from_json(cast(dict[str, JsonValue], data["graph"])))
    metadata: WorkflowExecutionMetadata = from_jsonvalue(data["metadata"])

    assert isinstance(metadata, WorkflowExecutionMetadata)

    return cls(
        graph=graph,
        id=execution_id,
        metadata=metadata,
    )

WorkflowExecutionContext dataclass

WorkflowExecutionContext(
    info,
    state,
    cache=EmptyWorkflowCache(),
    callback=EmptyWorkflowCallback(),
)

info instance-attribute

info

state instance-attribute

state

cache class-attribute instance-attribute

cache = field(default_factory=EmptyWorkflowCache)

callback class-attribute instance-attribute

callback = field(default_factory=EmptyWorkflowCallback)

WorkflowExecutor

Bases: Registrable

DefaultWorkflowExecutor

use_execution_context

use_execution_context()
Source code in src/formed/workflow/executor.py
def use_execution_context() -> Optional[WorkflowExecutionContext]:
    return _EXECUTION_CONTEXT.get()

formed.workflow.format

Serialization formats for workflow step results.

This module provides format abstractions for serializing and deserializing workflow step results. Multiple formats support different data types and use cases.

Key Components
  • Format: Abstract base class for all formats
  • PickleFormat: Universal format using cloudpickle
  • JsonFormat: JSON format for JSON-serializable data
  • MappingFormat: JSON format for mappings/dicts
  • AutoFormat: Automatically selects appropriate format
Features
  • Support for iterators and streaming data
  • Type-safe serialization/deserialization
  • Format auto-detection based on data type
  • Extensible via registration system

Examples:

>>> from formed.workflow import JsonFormat, PickleFormat
>>>
>>> # JSON format for simple data
>>> json_format = JsonFormat()
>>> json_format.write({"key": "value"}, directory)
>>> data = json_format.read(directory)
>>>
>>> # Pickle format for complex objects
>>> pickle_format = PickleFormat()
>>> pickle_format.write(my_model, directory)
>>> model = pickle_format.read(directory)

Format

Bases: Generic[_T], Registrable

Abstract base class for serialization formats.

Formats handle serialization and deserialization of workflow step results to/from disk. Each format is identified by a unique identifier and can indicate if it's the default format for a given type.

CLASS TYPE PARAMETER DESCRIPTION
_T

Type of data this format can serialize/deserialize.
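A minimal, self-contained sketch of a class honoring the identifier/write/read contract (`TextFormat` is hypothetical; it omits the real `Format` base class, registration, and `is_default_of`):

```python
import tempfile
from pathlib import Path


class TextFormat:
    # Hypothetical format storing a string artifact in a single file.
    @property
    def identifier(self) -> str:
        return "text"

    def write(self, artifact: str, directory: Path) -> None:
        (directory / "artifact.txt").write_text(artifact)

    def read(self, directory: Path) -> str:
        return (directory / "artifact.txt").read_text()


with tempfile.TemporaryDirectory() as d:
    fmt = TextFormat()
    fmt.write("hello", Path(d))
    assert fmt.read(Path(d)) == "hello"
```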

identifier property

identifier

Get the unique identifier for this format.

RETURNS DESCRIPTION
str

Format identifier string.

write

write(artifact, directory)

Write artifact to directory.

PARAMETER DESCRIPTION
artifact

Data to serialize.

TYPE: _T

directory

Directory to write to.

TYPE: Path

Source code in src/formed/workflow/format.py
def write(self, artifact: _T, directory: Path) -> None:
    """Write artifact to directory.

    Args:
        artifact: Data to serialize.
        directory: Directory to write to.

    """
    raise NotImplementedError

read

read(directory)

Read artifact from directory.

PARAMETER DESCRIPTION
directory

Directory to read from.

TYPE: Path

RETURNS DESCRIPTION
_T

Deserialized data.

Source code in src/formed/workflow/format.py
def read(self, directory: Path) -> _T:
    """Read artifact from directory.

    Args:
        directory: Directory to read from.

    Returns:
        Deserialized data.

    """
    raise NotImplementedError

is_default_of classmethod

is_default_of(obj)

Check if this format is the default for the given object type.

PARAMETER DESCRIPTION
obj

Object to check.

TYPE: Any

RETURNS DESCRIPTION
bool

True if this format should be used by default for this type.

Source code in src/formed/workflow/format.py
@classmethod
def is_default_of(cls, obj: Any) -> bool:
    """Check if this format is the default for the given object type.

    Args:
        obj: Object to check.

    Returns:
        True if this format should be used by default for this type.

    """
    return False

PickleFormat

Bases: Format[_T], Generic[_T]

Universal serialization format using cloudpickle.

This format can serialize almost any Python object, including functions, classes, and complex nested structures. It also supports streaming iterators.

Examples:

>>> format = PickleFormat()
>>> format.write(my_object, directory)
>>> obj = format.read(directory)
>>>
>>> # For iterators
>>> format.write(iter(range(1000)), directory)
>>> iterator = format.read(directory)  # Returns iterator
Note

This is the fallback format when no other format applies.

identifier property

identifier

Get the unique identifier for this format.

RETURNS DESCRIPTION
str

Format identifier string.

write

write(artifact, directory)
Source code in src/formed/workflow/format.py
def write(self, artifact: _T, directory: Path) -> None:
    artifact_path = self._get_artifact_path(directory)
    with open(artifact_path, "wb") as f:
        if isinstance(artifact, Iterator):
            cloudpickle.dump(True, f)
            for item in artifact:
                cloudpickle.dump(item, f)
        else:
            cloudpickle.dump(False, f)
            cloudpickle.dump(artifact, f)

read

read(directory)
Source code in src/formed/workflow/format.py
def read(self, directory: Path) -> _T:
    artifact_path = self._get_artifact_path(directory)
    with open(artifact_path, "rb") as f:
        is_iterator = cloudpickle.load(f)
        if is_iterator:
            return cast(_T, self._IteratorWrapper(artifact_path))
        return cast(_T, cloudpickle.load(f))

is_default_of classmethod

is_default_of(obj)

Check if this format is the default for the given object type.

PARAMETER DESCRIPTION
obj

Object to check.

TYPE: Any

RETURNS DESCRIPTION
bool

True if this format should be used by default for this type.

Source code in src/formed/workflow/format.py
@classmethod
def is_default_of(cls, obj: Any) -> bool:
    """Check if this format is the default for the given object type.

    Args:
        obj: Object to check.

    Returns:
        True if this format should be used by default for this type.

    """
    return False

JsonFormat

Bases: Format[_JsonFormattableT], Generic[_JsonFormattableT]

JSON-based serialization format for JSON-compatible data.

This format serializes data to JSON files (.json for single objects, .jsonl for iterators). It supports all JSON-serializable types plus dataclasses, named tuples, and Pydantic models.

Features
  • Human-readable format
  • Support for iterators via JSON Lines format
  • Automatic type reconstruction using metadata
  • Custom JSON encoder/decoder for extended types
CLASS TYPE PARAMETER DESCRIPTION
_JsonFormattableT

JSON-compatible type (primitives, containers, dataclasses, etc.)

Examples:

>>> format = JsonFormat()
>>>
>>> # Single object
>>> format.write({"key": "value"}, directory)
>>> data = format.read(directory)
>>>
>>> # Iterator (uses JSONL)
>>> format.write(iter([{"a": 1}, {"a": 2}]), directory)
>>> iterator = format.read(directory)
Note

This is the default format for JSON-serializable types. Objects are reconstructed with their original type using metadata.

identifier property

identifier

Get the unique identifier for this format.

RETURNS DESCRIPTION
str

Format identifier string.

write

write(artifact, directory)

Write JSON-serializable artifact to directory.

Writes artifact.json for single objects or artifact.jsonl for iterators. Also writes metadata.json containing type information for reconstruction.

PARAMETER DESCRIPTION
artifact

Data to serialize (single object or iterator).

TYPE: _JsonFormattableT

directory

Directory to write to.

TYPE: Path

Source code in src/formed/workflow/format.py
def write(self, artifact: _JsonFormattableT, directory: Path) -> None:
    """Write JSON-serializable artifact to directory.

    Writes `artifact.json` for single objects or `artifact.jsonl` for
    iterators. Also writes `metadata.json` containing type information
    for reconstruction.

    Args:
        artifact: Data to serialize (single object or iterator).
        directory: Directory to write to.

    """
    artifact_class: type[_JsonFormattableT] | None = None
    if isinstance(artifact, Iterator):
        artifact_path = directory / "artifact.jsonl"
        with open(artifact_path, "w") as f:
            for item in artifact:
                artifact_class = cast(
                    type[_JsonFormattableT],
                    artifact_class or type(item),
                )
                json.dump(item, f, cls=WorkflowJSONEncoder, ensure_ascii=False)
                f.write("\n")
    else:
        artifact_class = type(artifact)
        artifact_path = directory / "artifact.json"
        with open(artifact_path, "w") as f:
            json.dump(artifact, f, cls=WorkflowJSONEncoder, ensure_ascii=False)
    if artifact_class is not None:
        metadata = {
            "module": artifact_class.__module__,
            "class": artifact_class.__name__,
        }
        metadata_path = directory / "metadata.json"
        with open(metadata_path, "w") as f:
            json.dump(metadata, f, ensure_ascii=False)

read

read(directory)

Read JSON artifact from directory.

Reads artifact.json or artifact.jsonl and reconstructs the original type using metadata.json if available.

PARAMETER DESCRIPTION
directory

Directory to read from.

TYPE: Path

RETURNS DESCRIPTION
_JsonFormattableT

Deserialized data with original type.

Source code in src/formed/workflow/format.py
def read(self, directory: Path) -> _JsonFormattableT:
    """Read JSON artifact from directory.

    Reads `artifact.json` or `artifact.jsonl` and reconstructs the
    original type using `metadata.json` if available.

    Args:
        directory: Directory to read from.

    Returns:
        Deserialized data with original type.

    """
    metadata_path = directory / "metadata.json"
    artifact_class: type[_JsonFormattableT] | None = None
    if metadata_path.exists():
        with open(metadata_path) as f:
            metadata = json.load(f, cls=WorkflowJSONDecoder)
        module = importlib.import_module(metadata["module"])
        artifact_class = getattr(module, metadata["class"])

    is_iterator = (directory / "artifact.jsonl").exists()
    if is_iterator:
        artifact_path = directory / "artifact.jsonl"
        return cast(_JsonFormattableT, self._IteratorWrapper(artifact_path, artifact_class))

    artifact_path = directory / "artifact.json"
    with open(artifact_path) as f:
        data = json.load(f, cls=WorkflowJSONDecoder)
        if artifact_class is not None:
            return colt.build(data, artifact_class)
        return cast(_JsonFormattableT, data)

is_default_of classmethod

is_default_of(obj)

Check if JSON format is default for object type.

PARAMETER DESCRIPTION
obj

Object to check.

TYPE: Any

RETURNS DESCRIPTION
bool

True for JSON-serializable types (primitives, containers,

bool

dataclasses, named tuples, Pydantic models).

Source code in src/formed/workflow/format.py
@classmethod
def is_default_of(cls, obj: Any) -> bool:
    """Check if JSON format is default for object type.

    Args:
        obj: Object to check.

    Returns:
        True for JSON-serializable types (primitives, containers,
        dataclasses, named tuples, Pydantic models).

    """
    return isinstance(
        obj,
        (
            int,
            float,
            str,
            bool,
            dict,
            list,
            tuple,
            IDataclass,
            INamedTuple,
            IPydanticModel,
        ),
    )
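The metadata mechanism used by `read` above can be sketched without formed: the payload is plain JSON, and `metadata.json` records which class to rebuild it into via `importlib`. This example uses `collections.Counter` as a stand-in artifact class; the real implementation builds the object with `colt.build` and a custom decoder rather than direct construction:

```python
import importlib
import json
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    directory = Path(tmp)
    # Writing: store the JSON payload plus type metadata side by side.
    (directory / "artifact.json").write_text(json.dumps({"a": 2, "b": 1}))
    (directory / "metadata.json").write_text(
        json.dumps({"module": "collections", "class": "Counter"})
    )

    # Reading mirrors JsonFormat.read: import the recorded class, then rebuild.
    metadata = json.loads((directory / "metadata.json").read_text())
    artifact_class = getattr(importlib.import_module(metadata["module"]), metadata["class"])
    data = json.loads((directory / "artifact.json").read_text())
    restored = artifact_class(data)
```

After reading, `restored` is a `Counter` rather than a plain `dict`, which is the "automatic type reconstruction" the feature list refers to.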

MappingFormat

MappingFormat(format)

Bases: Format[Mapping[str, _T]], Generic[_T]

Format for mappings using subdirectories for values.

This format stores each mapping entry as a subdirectory, with the key as the directory name and the value serialized using a nested format. This allows mappings of complex objects to be stored in an organized directory structure.

CLASS TYPE PARAMETER DESCRIPTION
_T

Type of mapping values.

PARAMETER DESCRIPTION
format

Format to use for serializing mapping values.

TYPE: Format[_T]

Examples:

>>> # Mapping of strings to dataframes
>>> inner_format = PickleFormat()
>>> format = MappingFormat(inner_format)
>>>
>>> data = {
...     "train": train_df,
...     "test": test_df,
... }
>>> format.write(data, directory)
>>> # Creates: directory/train/artifact.pkl
>>> #          directory/test/artifact.pkl
>>>
>>> loaded = format.read(directory)
Note

Keys must be valid directory names (no special characters).

Source code in src/formed/workflow/format.py
def __init__(self, format: Format[_T]) -> None:
    self._format = format

identifier property

identifier

Get the unique identifier for this format.

RETURNS DESCRIPTION
str

Format identifier string.

write

write(artifact, directory)

Write mapping to subdirectories.

Each mapping entry is written to a subdirectory named after the key, with the value serialized using the nested format.

PARAMETER DESCRIPTION
artifact

Mapping to serialize.

TYPE: Mapping[str, _T]

directory

Directory to write to.

TYPE: Path

Source code in src/formed/workflow/format.py
def write(self, artifact: Mapping[str, _T], directory: Path) -> None:
    """Write mapping to subdirectories.

    Each mapping entry is written to a subdirectory named after
    the key, with the value serialized using the nested format.

    Args:
        artifact: Mapping to serialize.
        directory: Directory to write to.

    """
    for key, value in artifact.items():
        subdir = directory / key
        subdir.mkdir(parents=True)
        self._format.write(value, subdir)

read

read(directory)

Read mapping from subdirectories.

Reconstructs the mapping by reading each subdirectory as a key-value pair.

PARAMETER DESCRIPTION
directory

Directory to read from.

TYPE: Path

RETURNS DESCRIPTION
Mapping[str, _T]

Reconstructed mapping.

Source code in src/formed/workflow/format.py
def read(self, directory: Path) -> Mapping[str, _T]:
    """Read mapping from subdirectories.

    Reconstructs the mapping by reading each subdirectory as a
    key-value pair.

    Args:
        directory: Directory to read from.

    Returns:
        Reconstructed mapping.

    """
    artifact: dict[str, _T] = {}
    for subdir in directory.glob("*"):
        artifact[subdir.name] = self._format.read(subdir)
    return artifact

is_default_of classmethod

is_default_of(obj)

Check if this format is the default for the given object type.

PARAMETER DESCRIPTION
obj

Object to check.

TYPE: Any

RETURNS DESCRIPTION
bool

True if this format should be used by default for this type.

Source code in src/formed/workflow/format.py
@classmethod
def is_default_of(cls, obj: Any) -> bool:
    """Check if this format is the default for the given object type.

    Args:
        obj: Object to check.

    Returns:
        True if this format should be used by default for this type.

    """
    return False

DatasetFormat

Bases: Format[Dataset[_T]], Generic[_T]

identifier property

identifier

Get the unique identifier for this format.

RETURNS DESCRIPTION
str

Format identifier string.

write

write(artifact, directory)
Source code in src/formed/workflow/format.py
def write(self, artifact: Dataset[_T], directory: Path) -> None:
    import shutil

    shutil.copytree(artifact.path, directory / "dataset")

read

read(directory)
Source code in src/formed/workflow/format.py
def read(self, directory: Path) -> Dataset[_T]:
    return Dataset[_T].from_path(directory / "dataset")

is_default_of classmethod

is_default_of(obj)
Source code in src/formed/workflow/format.py
@classmethod
def is_default_of(cls, obj: Any) -> bool:
    return isinstance(obj, Dataset)

AutoFormat

Bases: Format[_T]

Automatic format selection based on object type.

This format automatically selects the most appropriate format for an object by checking each registered format's is_default_of() method. It stores the chosen format name in metadata for correct deserialization.

Selection priority
  1. Last registered format that claims the type (most specific)
  2. Falls back to pickle format if no format claims the type

Examples:

>>> format = AutoFormat()
>>>
>>> # Automatically uses JsonFormat for dict
>>> format.write({"key": "value"}, directory)
>>>
>>> # Automatically uses PickleFormat for custom objects
>>> format.write(my_custom_object, directory)
>>>
>>> # Reads with the same format used during write
>>> obj = format.read(directory)
Note

This is the recommended format for most use cases as it provides optimal serialization for each type.

identifier property

identifier

Get the unique identifier for this format.

RETURNS DESCRIPTION
str

Format identifier string.

write

write(artifact, directory)

Write artifact using automatically selected format.

Selects the appropriate format, writes the artifact, and stores the format name in metadata for deserialization.

PARAMETER DESCRIPTION
artifact

Data to serialize.

TYPE: _T

directory

Directory to write to.

TYPE: Path

Source code in src/formed/workflow/format.py
def write(self, artifact: _T, directory: Path) -> None:
    """Write artifact using automatically selected format.

    Selects the appropriate format, writes the artifact, and stores
    the format name in metadata for deserialization.

    Args:
        artifact: Data to serialize.
        directory: Directory to write to.

    """
    format_name = self._get_default_format_name(artifact)
    format = cast(type[Format[_T]], Format.by_name(format_name))()
    format.write(artifact, directory)
    (directory / self._FORMAT_FILENAME).write_text(json.dumps({"name": format_name}))

read

read(directory)

Read artifact using the format recorded in metadata.

Reads the format metadata and uses the same format that was used during writing.

PARAMETER DESCRIPTION
directory

Directory to read from.

TYPE: Path

RETURNS DESCRIPTION
_T

Deserialized data.

Source code in src/formed/workflow/format.py
def read(self, directory: Path) -> _T:
    """Read artifact using the format recorded in metadata.

    Reads the format metadata and uses the same format that was
    used during writing.

    Args:
        directory: Directory to read from.

    Returns:
        Deserialized data.

    """
    format_metadata = json.loads((directory / self._FORMAT_FILENAME).read_text())
    format_name = format_metadata["name"]
    format = cast(type[Format[_T]], Format.by_name(format_name))()
    return format.read(directory)
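The selection rule ("last registered format that claims the type, falling back to pickle") can be sketched with a plain registry list. The names here (`JsonLike`, `PickleLike`, `REGISTRY`, `select_format`) are illustrative and not formed's actual registry API:

```python
class JsonLike:
    identifier = "json"

    @classmethod
    def is_default_of(cls, obj) -> bool:
        return isinstance(obj, (dict, list, str, int, float, bool))


class PickleLike:
    identifier = "pickle"

    @classmethod
    def is_default_of(cls, obj) -> bool:
        return False  # never claims a type; used only as the fallback


REGISTRY = [PickleLike, JsonLike]  # registration order; later entries win


def select_format(obj):
    # Walk the registry in reverse so the last registered match wins,
    # falling back to pickle when no format claims the type.
    for fmt in reversed(REGISTRY):
        if fmt.is_default_of(obj):
            return fmt
    return PickleLike
```

With this registry, a `dict` selects `JsonLike`, while an arbitrary object falls back to `PickleLike` — the same behavior AutoFormat records in its metadata file for later reads.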

is_default_of classmethod

is_default_of(obj)

Check if this format is the default for the given object type.

PARAMETER DESCRIPTION
obj

Object to check.

TYPE: Any

RETURNS DESCRIPTION
bool

True if this format should be used by default for this type.

Source code in src/formed/workflow/format.py
@classmethod
def is_default_of(cls, obj: Any) -> bool:
    """Check if this format is the default for the given object type.

    Args:
        obj: Object to check.

    Returns:
        True if this format should be used by default for this type.

    """
    return False

formed.workflow.graph

Workflow graph construction and dependency resolution.

This module provides the WorkflowGraph class, which parses workflow configurations and builds directed acyclic graphs (DAGs) of workflow steps with dependency tracking.

Key Features
  • Parse Jsonnet workflow configurations
  • Automatic dependency detection via references
  • Topological sorting for execution order
  • Cycle detection in dependencies
  • DAG-based workflow representation

Examples:

>>> from formed.workflow import WorkflowGraph
>>>
>>> # Load workflow from Jsonnet config
>>> graph = WorkflowGraph.from_jsonnet("workflow.jsonnet")
>>>
>>> # Access steps in topological order
>>> for step_info in graph:
...     print(f"Step: {step_info.name}")
...     print(f"Dependencies: {step_info.dependencies}")
>>>
>>> # Get specific step
>>> preprocess_step = graph["preprocess"]
>>> print(preprocess_step.fingerprint)

WorkflowGraphConfig

Bases: TypedDict

steps instance-attribute

steps

WorkflowGraph

WorkflowGraph(steps)

Bases: FromJsonnet

Source code in src/formed/workflow/graph.py
def __init__(
    self,
    steps: Mapping[str, Lazy[WorkflowStep]],
) -> None:
    self._step_info = self._build_step_info(steps)

get_subgraph

get_subgraph(step_name)

Get a subgraph containing a step and all its dependencies.

Only works with live (non-archived) graphs. For archived graphs, dependencies are already resolved in the archive.

Source code in src/formed/workflow/graph.py
def get_subgraph(self, step_name: str) -> "WorkflowGraph":
    """Get a subgraph containing a step and all its dependencies.

    Only works with live (non-archived) graphs. For archived graphs,
    dependencies are already resolved in the archive.
    """
    if step_name not in self._step_info:
        raise ValueError(f"Step {step_name} not found in the graph")
    step_info = self._step_info[step_name]

    # Type narrowing: ensure all steps are live
    if not isinstance(step_info.step, Lazy):
        raise TypeError(
            f"Cannot create subgraph from archived step '{step_name}'. "
            f"Subgraph extraction only works with live workflows."
        )

    subgraph_steps: dict[str, Lazy[WorkflowStep]] = {step_name: step_info.step}
    for _, dependant_step_info in step_info.dependencies:
        if not isinstance(dependant_step_info.step, Lazy):
            raise TypeError(f"Cannot create subgraph: dependency '{dependant_step_info.name}' is archived.")
        for sub_step_info in self.get_subgraph(dependant_step_info.name):
            if not isinstance(sub_step_info.step, Lazy):
                raise TypeError(f"Cannot create subgraph: nested dependency '{sub_step_info.name}' is archived.")
            subgraph_steps[sub_step_info.name] = sub_step_info.step
    return WorkflowGraph(subgraph_steps)
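The recursion above effectively collects the transitive closure of a step's dependencies. A standalone sketch of that traversal over a plain adjacency mapping (illustrative names, no formed imports):

```python
def collect_subgraph(deps: dict[str, set[str]], step: str) -> dict[str, set[str]]:
    """Return the sub-mapping containing `step` and everything it depends on."""
    # deps maps each step name to the names of steps it depends on.
    if step not in deps:
        raise ValueError(f"Step {step} not found in the graph")
    visited: set[str] = set()
    stack = [step]
    while stack:
        name = stack.pop()
        if name in visited:
            continue
        visited.add(name)
        # Follow dependency edges; unvisited names are explored next.
        stack.extend(deps[name])
    return {name: deps[name] for name in visited}
```

For a graph `load <- clean <- train`, `collect_subgraph(deps, "train")` keeps `train`, `clean`, and `load` while dropping unrelated steps.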

visualize

visualize(*, output=stdout, additional_info={})
Source code in src/formed/workflow/graph.py
def visualize(
    self,
    *,
    output: TextIO = sys.stdout,
    additional_info: Mapping[str, str] = {},
) -> None:
    def get_node(name: str) -> str:
        if name in additional_info:
            return f"{name}: {additional_info[name]}"
        return name

    dag = DAG(
        {
            get_node(name): {get_node(dep.name) for _, dep in info.dependencies}
            for name, info in self._step_info.items()
        }
    )

    dag.visualize(output=output)

from_config classmethod

from_config(config)
Source code in src/formed/workflow/graph.py
@classmethod
def from_config(cls, config: WorkflowGraphConfig) -> "WorkflowGraph":
    return cls.__COLT_BUILDER__(config, WorkflowGraph)

to_archive

to_archive()

Convert graph to archive format for serialization.

This captures the execution-time state of all steps in a flat structure. Only works with live graphs.

Source code in src/formed/workflow/graph.py
def to_archive(self) -> WorkflowGraphArchive:
    """Convert graph to archive format for serialization.

    This captures the execution-time state of all steps in a flat structure.
    Only works with live graphs.
    """
    # Ensure all steps are live before archiving
    for step_info in self:
        if not isinstance(step_info.step, Lazy):
            raise TypeError(
                f"Cannot archive graph containing archived step '{step_info.name}'. "
                f"Only live graphs can be converted to archives."
            )

    # Convert all steps to archives
    steps: dict[str, WorkflowStepArchive] = {}
    for step_info in self:
        steps[step_info.name] = step_info.to_archive()

    # Compute execution order (topological sort)
    execution_order = [step_info.name for step_info in self]

    return WorkflowGraphArchive(
        steps=steps,
        execution_order=execution_order,
    )

from_archive classmethod

from_archive(archive)

Reconstruct graph from archive.

Resolves all step dependencies internally using a two-pass approach, so organizers do not need to handle this resolution themselves.

Source code in src/formed/workflow/graph.py
@classmethod
def from_archive(cls, archive: WorkflowGraphArchive) -> "WorkflowGraph":
    """Reconstruct graph from archive.

    Resolves all step dependencies internally using a two-pass
    approach, so organizers do not need to handle this resolution themselves.
    """
    # First pass: Create all step infos without dependencies
    # This builds a fingerprint -> WorkflowStepInfo map
    fingerprint_to_info: dict[str, WorkflowStepInfo] = {}

    for step_name in archive.execution_order:
        step_archive = archive.steps[step_name]
        # Create WorkflowStepInfo with archive but no dependencies yet
        step_info = WorkflowStepInfo(
            name=step_archive.name,
            step=step_archive,
            dependencies=frozenset(),
            fieldref=step_archive.fieldref,
        )
        fingerprint_to_info[step_archive.fingerprint] = step_info

    # Second pass: Resolve dependencies using from_archive
    step_name_to_info: dict[str, WorkflowStepInfo] = {}
    for step_name in archive.execution_order:
        step_archive = archive.steps[step_name]
        step_info = WorkflowStepInfo.from_archive(step_archive, fingerprint_to_info)
        step_name_to_info[step_name] = step_info

    # Create graph directly by setting _step_info
    graph = cls.__new__(cls)
    graph._step_info = step_name_to_info
    return graph
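The two-pass reconstruction above (index every step by fingerprint first, then resolve dependency references through that index) can be sketched with plain NamedTuples. The `StepArchive`/`StepInfo`/`restore` names are simplified stand-ins for the real classes:

```python
from typing import NamedTuple


class StepArchive(NamedTuple):
    name: str
    fingerprint: str
    dependency_fingerprints: tuple[str, ...]


class StepInfo(NamedTuple):
    name: str
    dependencies: frozenset


def restore(archives: dict[str, StepArchive], order: list[str]) -> dict[str, StepInfo]:
    # First pass: index a placeholder info for every step by fingerprint,
    # so forward references can be resolved regardless of ordering.
    by_fp: dict[str, StepInfo] = {}
    for name in order:
        a = archives[name]
        by_fp[a.fingerprint] = StepInfo(name=a.name, dependencies=frozenset())
    # Second pass: rebuild each step with its dependencies looked up
    # through the fingerprint index.
    result: dict[str, StepInfo] = {}
    for name in order:
        a = archives[name]
        result[name] = StepInfo(
            name=a.name,
            dependencies=frozenset(by_fp[fp] for fp in a.dependency_fingerprints),
        )
    return result
```

Because steps reference each other by fingerprint rather than by nesting, the archive stays flat and the second pass never needs to recurse.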

from_jsonnet classmethod

from_jsonnet(filename, ext_vars=None, overrides=None)
Source code in src/formed/common/jsonnet.py
@classmethod
def from_jsonnet(
    cls,
    filename: Union[str, PathLike],
    ext_vars: Optional[Mapping[str, Any]] = None,
    overrides: Optional[str] = None,
) -> Self:
    json_config = load_jsonnet(filename, ext_vars=ext_vars, overrides=overrides)
    return cls.from_json(json_config)

json

json()
Source code in src/formed/common/jsonnet.py
def json(self) -> JsonValue:
    if not hasattr(self, "__json_config__"):
        raise RuntimeError(f"{self.__class__.__name__} instance has no JSON config")
    return self.__json_config__

from_json classmethod

from_json(o)
Source code in src/formed/common/jsonnet.py
@classmethod
def from_json(cls, o: JsonValue, /) -> Self:
    obj = cls.__COLT_BUILDER__(cls.__pre_init__(o), cls)
    obj.__json_config__ = o
    return obj

formed.workflow.jsonschema

generate_workflow_schema

generate_workflow_schema(title='formed Workflow Graph')
Source code in src/formed/workflow/jsonschema.py
def generate_workflow_schema(
    title: str = "formed Workflow Graph",
) -> dict[str, Any]:
    WorkflowGraphSchema = TypedDict("WorkflowGraphSchema", {"steps": dict[str, WorkflowStep]})
    definitions = {"__ref__": _REF_SCHEMA}
    generator = JsonSchemaGenerator(
        callback=_ref_callback,
        typekey=COLT_TYPEKEY,
        argskey=COLT_ARGSKEY,
    )
    schema = generator(
        WorkflowGraphSchema,
        definitions=definitions,
        title=title,
    )
    if defs := schema.get("$defs", None):
        for key, defschema in defs.items():
            defschema = _remove_ref_schema(defschema)
            defschema = _remove_importable_schema(defschema)
            defs[key] = defschema
    return schema

formed.workflow.organizer

Workflow organizers for managing execution state and artifacts.

This module provides organizers that coordinate workflow execution, managing caches, callbacks, and persistent storage of execution state.

Key Components
  • WorkflowOrganizer: Abstract base class for organizers
  • MemoryWorkflowOrganizer: In-memory organizer for testing
  • FilesystemWorkflowOrganizer: Persistent filesystem-based organizer
Features
  • Execution lifecycle management
  • Integration with caches and callbacks
  • Persistent storage of execution metadata and results
  • Thread-safe execution with file locking
  • Automatic log capture and organization

Examples:

>>> from formed.workflow import FilesystemWorkflowOrganizer, DefaultWorkflowExecutor
>>>
>>> # Create filesystem organizer
>>> organizer = FilesystemWorkflowOrganizer(
...     directory=".formed",
...     callbacks=[my_callback]
... )
>>>
>>> # Run workflow
>>> executor = DefaultWorkflowExecutor()
>>> context = organizer.run(executor, workflow_graph)
>>>
>>> # Access results
>>> print(context.state)
>>> print(context.results)

logger module-attribute

logger = getLogger(__name__)

T_WorkflowOrganizer module-attribute

T_WorkflowOrganizer = TypeVar(
    "T_WorkflowOrganizer", bound="WorkflowOrganizer"
)

WorkflowOrganizer

WorkflowOrganizer(cache, callbacks)

Bases: Registrable

Source code in src/formed/workflow/organizer.py
def __init__(
    self,
    cache: "WorkflowCache",
    callbacks: Optional[Union[WorkflowCallback, Sequence[WorkflowCallback]]],
) -> None:
    if isinstance(callbacks, WorkflowCallback):
        callbacks = [callbacks]

    self.cache = cache
    self.callback = MultiWorkflowCallback(callbacks or [])

cache instance-attribute

cache = cache

callback instance-attribute

callback = MultiWorkflowCallback(callbacks or [])

run

run(executor, execution)
Source code in src/formed/workflow/organizer.py
def run(
    self,
    executor: "WorkflowExecutor",
    execution: Union[WorkflowGraph, WorkflowExecutionInfo],
) -> WorkflowExecutionContext:
    with executor:
        return executor(
            execution,
            cache=self.cache,
            callback=self.callback,
        )

get

get(execution_id)
Source code in src/formed/workflow/organizer.py
def get(self, execution_id: WorkflowExecutionID) -> Optional[WorkflowExecutionContext]:
    return None

exists

exists(execution_id)
Source code in src/formed/workflow/organizer.py
def exists(self, execution_id: WorkflowExecutionID) -> bool:
    return self.get(execution_id) is not None

remove

remove(execution_id)
Source code in src/formed/workflow/organizer.py
def remove(self, execution_id: WorkflowExecutionID) -> None:
    pass

MemoryWorkflowOrganizer

MemoryWorkflowOrganizer(callbacks=None)

Bases: WorkflowOrganizer

Source code in src/formed/workflow/organizer.py
def __init__(
    self,
    callbacks: Optional[Union[WorkflowCallback, Sequence[WorkflowCallback]]] = None,
) -> None:
    super().__init__(MemoryWorkflowCache(), callbacks)

cache instance-attribute

cache = cache

callback instance-attribute

callback = MultiWorkflowCallback(callbacks or [])

run

run(executor, execution)
Source code in src/formed/workflow/organizer.py
def run(
    self,
    executor: "WorkflowExecutor",
    execution: Union[WorkflowGraph, WorkflowExecutionInfo],
) -> WorkflowExecutionContext:
    with executor:
        return executor(
            execution,
            cache=self.cache,
            callback=self.callback,
        )

get

get(execution_id)
Source code in src/formed/workflow/organizer.py
def get(self, execution_id: WorkflowExecutionID) -> Optional[WorkflowExecutionContext]:
    return None

exists

exists(execution_id)
Source code in src/formed/workflow/organizer.py
def exists(self, execution_id: WorkflowExecutionID) -> bool:
    return self.get(execution_id) is not None

remove

remove(execution_id)
Source code in src/formed/workflow/organizer.py
def remove(self, execution_id: WorkflowExecutionID) -> None:
    pass

FilesystemWorkflowOrganizer

FilesystemWorkflowOrganizer(directory=None, callbacks=None)

Bases: WorkflowOrganizer

Source code in src/formed/workflow/organizer.py
def __init__(
    self,
    directory: Optional[Union[str, PathLike]] = None,
    callbacks: Optional[Union[WorkflowCallback, Sequence[WorkflowCallback]]] = None,
) -> None:
    self._directory = Path(directory or self._DEFAULT_DIRECTORY).expanduser().resolve().absolute()

    if isinstance(callbacks, WorkflowCallback):
        callbacks = [callbacks]
    callbacks = [self._Callback(self)] + list(callbacks or [])

    super().__init__(FilesystemWorkflowCache(self.cache_directory), callbacks)

directory property

directory

cache_directory property

cache_directory

executions_directory property

executions_directory

cache instance-attribute

cache = cache

callback instance-attribute

callback = MultiWorkflowCallback(callbacks or [])

get

get(execution_id)
Source code in src/formed/workflow/organizer.py
def get(self, execution_id: WorkflowExecutionID) -> Optional[WorkflowExecutionContext]:
    execution_directory = self.executions_directory / execution_id
    if not execution_directory.exists():
        return None

    execution_path = execution_directory / self._Callback._EXECUTION_FILENAME
    if not execution_path.exists():
        return None

    # Load execution info from its JSON file
    execution_info = WorkflowExecutionInfo.from_json(json.loads(execution_path.read_text()))

    # Load execution state if a state file is present
    state_path = execution_directory / self._Callback._STATE_FILENAME
    if state_path.exists():
        with state_path.open("r") as jsonfile:
            execution_state = json.load(jsonfile, cls=WorkflowJSONDecoder)
    else:
        logger.warning(f"State file not found for execution {execution_id}")
        execution_state = WorkflowExecutionState(execution_id=execution_info.id)

    return WorkflowExecutionContext(execution_info, execution_state, self.cache, self.callback)

exists

exists(execution_id)
Source code in src/formed/workflow/organizer.py
def exists(self, execution_id: WorkflowExecutionID) -> bool:
    execution_directory = self.executions_directory / execution_id
    return execution_directory.exists()

remove

remove(execution_id)
Source code in src/formed/workflow/organizer.py
def remove(self, execution_id: WorkflowExecutionID) -> None:
    execution_directory = self.executions_directory / execution_id
    if not execution_directory.exists():
        return

    shutil.rmtree(execution_directory)

run

run(executor, execution)
Source code in src/formed/workflow/organizer.py
def run(
    self,
    executor: "WorkflowExecutor",
    execution: Union[WorkflowGraph, WorkflowExecutionInfo],
) -> WorkflowExecutionContext:
    with executor:
        return executor(
            execution,
            cache=self.cache,
            callback=self.callback,
        )

formed.workflow.settings

WorkflowSettings dataclass

WorkflowSettings(
    executor=_default_executor(),
    organizer=_default_organizer(),
)

executor class-attribute instance-attribute

executor = field(default_factory=_default_executor)

organizer class-attribute instance-attribute

organizer = field(default_factory=_default_organizer)

formed.workflow.step

T module-attribute

T = TypeVar('T')

OutputT module-attribute

OutputT = TypeVar('OutputT')

StepFunctionT module-attribute

StepFunctionT = TypeVar(
    "StepFunctionT", bound=Callable[..., Any]
)

WorkflowStepT module-attribute

WorkflowStepT = TypeVar(
    "WorkflowStepT", bound="WorkflowStep"
)

WorkflowStepArgFlag

Bases: str, Enum

IGNORE class-attribute instance-attribute

IGNORE = 'ignore'

WorkflowStepResultFlag

Bases: str, Enum

METRICS class-attribute instance-attribute

METRICS = 'metrics'

get_flags classmethod

get_flags(step_or_annotation)
Source code in src/formed/workflow/step.py
@classmethod
def get_flags(cls, step_or_annotation: Any) -> frozenset["WorkflowStepResultFlag"]:
    if isinstance(step_or_annotation, WorkflowStepInfo):
        step_or_annotation = step_or_annotation.step_class.get_output_type()
    if isinstance(step_or_annotation, WorkflowStep):
        step_or_annotation = step_or_annotation.get_output_type()
    origin = typing.get_origin(step_or_annotation)
    if origin is not Annotated:
        return frozenset()
    return frozenset(a for a in typing.get_args(step_or_annotation) if isinstance(a, WorkflowStepResultFlag))
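Flags travel in `Annotated[...]` metadata on a step's return annotation. The extraction pattern can be reproduced with the stdlib alone; the `ResultFlag` enum and `evaluate` function below are illustrative stand-ins, not the library's classes:

```python
import typing
from enum import Enum
from typing import Annotated, Any


class ResultFlag(str, Enum):
    # Stand-in for WorkflowStepResultFlag
    METRICS = "metrics"


def get_flags(annotation: Any) -> frozenset[ResultFlag]:
    # Flags only live in Annotated[...] metadata; plain types carry none.
    if typing.get_origin(annotation) is not Annotated:
        return frozenset()
    return frozenset(a for a in typing.get_args(annotation) if isinstance(a, ResultFlag))


def evaluate() -> Annotated[dict, ResultFlag.METRICS]:
    return {"accuracy": 0.9}


# include_extras=True is required, or get_type_hints strips the Annotated
# wrapper (and the flags with it).
flags = get_flags(typing.get_type_hints(evaluate, include_extras=True)["return"])
```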

WorkflowStepStatus

Bases: str, Enum

PENDING class-attribute instance-attribute

PENDING = 'pending'

RUNNING class-attribute instance-attribute

RUNNING = 'running'

FAILURE class-attribute instance-attribute

FAILURE = 'failure'

CANCELED class-attribute instance-attribute

CANCELED = 'canceled'

COMPLETED class-attribute instance-attribute

COMPLETED = 'completed'

WorkflowStepState dataclass

WorkflowStepState(
    fingerprint,
    status=PENDING,
    started_at=None,
    finished_at=None,
)

fingerprint instance-attribute

fingerprint

status class-attribute instance-attribute

status = PENDING

started_at class-attribute instance-attribute

started_at = None

finished_at class-attribute instance-attribute

finished_at = None

WorkflowStepContext dataclass

WorkflowStepContext(info, state)

info instance-attribute

info

state instance-attribute

state

WorkflowStep

WorkflowStep(*args, **kwargs)

Bases: Generic[OutputT], Registrable

Source code in src/formed/workflow/step.py
def __init__(self, *args: Any, **kwargs: Any):
    self._args = args
    self._kwargs = kwargs

VERSION class-attribute

VERSION = None

DETERMINISTIC class-attribute

DETERMINISTIC = True

CACHEABLE class-attribute

CACHEABLE = None

FORMAT instance-attribute

FORMAT

FUNCTION instance-attribute

FUNCTION

get_output_type classmethod

get_output_type(field=None)
Source code in src/formed/workflow/step.py
@classmethod
def get_output_type(cls, field: Optional[str] = None) -> type[OutputT]:
    return_annotation = cls.FUNCTION.__annotations__.get("return", Any)
    if field is not None:
        return_annotation = typing.get_type_hints(return_annotation).get(field, Any)
    if getattr(return_annotation, "__parameters__", None):
        # This is a workaround for generic steps to skip the type checking.
        # We need to infer the output type from the configuration.
        return cast(type[OutputT], TypeVar("T"))
    return cast(type[OutputT], return_annotation)
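The lookup starts from the wrapped function's return annotation and, when a `field` is given, drills into that field's own type hints. A minimal self-contained sketch of the same two-step lookup (`output_type` and `TrainResult` are illustrative names, not part of formed):

```python
import typing
from typing import Any, Callable, Optional, NamedTuple


class TrainResult(NamedTuple):
    model_path: str
    loss: float


def train() -> TrainResult:
    ...


def output_type(func: Callable[..., Any], field: Optional[str] = None) -> Any:
    # Start from the return annotation, then optionally drill into
    # one field's type hint (e.g. a NamedTuple attribute).
    annotation = func.__annotations__.get("return", Any)
    if field is not None:
        annotation = typing.get_type_hints(annotation).get(field, Any)
    return annotation
```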

from_callable classmethod

from_callable(
    func,
    *,
    version=None,
    deterministic=True,
    cacheable=None,
    format=None,
)
Source code in src/formed/workflow/step.py
@classmethod
def from_callable(
    cls,
    func: Callable[..., OutputT],
    *,
    version: Optional[str] = None,
    deterministic: bool = True,
    cacheable: Optional[bool] = None,
    format: Optional[Union[str, Format[OutputT]]] = None,
) -> type["WorkflowStep[OutputT]"]:
    if isinstance(format, str):
        format = cast(type[Format[OutputT]], Format.by_name(format))()
    if version is None:
        version = object_fingerprint(normalize_source(inspect.getsource(func)))

    class WrapperStep(WorkflowStep):
        VERSION = version
        DETERMINISTIC = deterministic
        CACHEABLE = cacheable
        FUNCTION = func
        FORMAT = format or AutoFormat()

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            super().__init__(*args, **kwargs)

    signature = inspect.signature(func)
    annotations = typing.get_type_hints(func)
    init_annotations = {k: v for k, v in annotations.items() if k != "return"}
    setattr(WrapperStep, "__name__", func.__name__)
    setattr(WrapperStep, "__qualname__", func.__qualname__)
    setattr(WrapperStep, "__doc__", func.__doc__)
    setattr(getattr(WrapperStep, "__init__"), "__annotations__", init_annotations)
    setattr(
        getattr(WrapperStep, "__init__"),
        "__signature__",
        signature.replace(return_annotation=annotations.get("return", inspect.Signature.empty)),
    )

    return WrapperStep
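`from_callable` manufactures a subclass whose name, docstring, and `__init__` signature mirror the wrapped function, so introspection and configuration tooling see the function's interface. The renaming trick can be sketched with stdlib tools alone (`make_wrapper` is an illustrative reduction, not formed's API):

```python
import inspect
import typing
from typing import Any, Callable


def make_wrapper(func: Callable[..., Any]) -> type:
    # Build a class that carries the callable and presents the
    # original function's name, doc, and signature to introspection.
    class Wrapper:
        FUNCTION = staticmethod(func)

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            self._args, self._kwargs = args, kwargs

    annotations = typing.get_type_hints(func)
    init_annotations = {k: v for k, v in annotations.items() if k != "return"}
    Wrapper.__name__ = func.__name__
    Wrapper.__qualname__ = func.__qualname__
    Wrapper.__doc__ = func.__doc__
    Wrapper.__init__.__annotations__ = init_annotations
    Wrapper.__init__.__signature__ = inspect.signature(func)
    return Wrapper


def tokenize(text: str) -> list[str]:
    """Split text on whitespace."""
    return text.split()


TokenizeStep = make_wrapper(tokenize)
```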

get_source classmethod

get_source()
Source code in src/formed/workflow/step.py
@classmethod
def get_source(cls) -> str:
    return inspect.getsource(cls.FUNCTION)

get_normalized_source classmethod

get_normalized_source()
Source code in src/formed/workflow/step.py
@classmethod
def get_normalized_source(cls) -> str:
    return normalize_source(cls.get_source())

get_ignore_args classmethod

get_ignore_args()
Source code in src/formed/workflow/step.py
@classmethod
def get_ignore_args(cls) -> frozenset[str]:
    annotations = cls.FUNCTION.__annotations__
    return frozenset(k for k, v in annotations.items() if WorkflowStepArgFlag.IGNORE in typing.get_args(v))

WorkflowStepInfo dataclass

WorkflowStepInfo(name, step, dependencies, fieldref=None)

Bases: Generic[WorkflowStepT]

Unified step info that works for both live and archived steps.

The step field determines the mode:

  • Lazy[WorkflowStepT]: Live mode - can be constructed and executed
  • WorkflowStepArchive: Archived mode - immutable snapshot from past execution

Live mode (before execution):

  • step: Lazy[WorkflowStepT] that can be constructed
  • dependencies: references to other live WorkflowStepInfo objects
  • Properties computed from step_class

Archived mode (after execution):

  • step: WorkflowStepArchive with pre-computed metadata
  • dependencies: references to other archived WorkflowStepInfo objects
  • Properties returned from archive

name instance-attribute

name

step instance-attribute

step

dependencies instance-attribute

dependencies

fieldref class-attribute instance-attribute

fieldref = None

step_class cached property

step_class

Get the step class. Only works in live mode.

archive cached property

archive

Get the archive. Only works in archived mode.

format cached property

format

Get the format. Works in both modes.

version cached property

version

Get the version. Works in both modes.

deterministic cached property

deterministic

Get deterministic flag. Works in both modes.

cacheable cached property

cacheable

Get cacheable flag. Works in both modes.

should_be_cached cached property

should_be_cached

Check if step should be cached. Works in both modes.

fingerprint cached property

fingerprint

Get fingerprint. Works in both modes.

is_live

is_live()

Check if this is a live step.

Source code in src/formed/workflow/step.py
def is_live(self) -> bool:
    """Check if this is a live step."""
    return isinstance(self.step, Lazy)

is_archived

is_archived()

Check if this is an archived step.

Source code in src/formed/workflow/step.py
def is_archived(self) -> bool:
    """Check if this is an archived step."""
    return isinstance(self.step, WorkflowStepArchive)

to_archive

to_archive()

Convert to archive format for serialization.

This is called at execution time to capture the current state. Only works in live mode.

Source code in src/formed/workflow/step.py
def to_archive(self) -> WorkflowStepArchive:
    """Convert to archive format for serialization.

    This is called at execution time to capture the current state.
    Only works in live mode.
    """
    if isinstance(self.step, WorkflowStepArchive):
        # Already archived, return as-is
        return self.step

    if not isinstance(self.step, Lazy):
        raise TypeError("Step must be either Lazy or WorkflowStepArchive")

    # Capture source code if available
    normalized_source: str | None = None
    try:
        normalized_source = self.step_class.get_normalized_source()
    except (OSError, TypeError):
        # Built-in functions or C extensions don't have source
        pass

    # Build dependency fingerprint map with fieldrefs
    dependency_fingerprints: dict[str, dict[str, Any]] = {}
    for path, dep_info in self.dependencies:
        param_path_str = ".".join(str(p) for p in (path if isinstance(path, tuple) else (path,)))
        dependency_fingerprints[param_path_str] = {
            "fingerprint": dep_info.fingerprint,
            "fieldref": dep_info.fieldref,
        }

    # Get step type name (the registered name in Colt)
    step_type: str
    if hasattr(self.step.constructor, "__registered_name__"):
        step_type = self.step.constructor.__registered_name__  # type: ignore
    else:
        # Fallback to class name
        step_type = self.step.constructor.__name__  # type: ignore

    # Build the archive using NamedTuple constructor
    return WorkflowStepArchive(
        name=self.name,
        step_type=step_type,
        fingerprint=self.fingerprint,
        format_identifier=self.format.identifier,
        version=self.version,
        source_hash=object_fingerprint(normalized_source) if normalized_source else "",
        config=dict(self.step.config) if isinstance(self.step.config, Mapping) else {},
        deterministic=self.deterministic,
        cacheable=self.cacheable,
        should_be_cached=self.should_be_cached,
        dependency_fingerprints=dependency_fingerprints,
        fieldref=self.fieldref,
    )

from_archive classmethod

from_archive(archive, dependency_map)

Reconstruct WorkflowStepInfo from an archive.

PARAMETER DESCRIPTION
archive

The archived step metadata

TYPE: WorkflowStepArchive

dependency_map

Map from fingerprint to WorkflowStepInfo for all dependencies

TYPE: Mapping[str, WorkflowStepInfo]

RETURNS DESCRIPTION
WorkflowStepInfo

WorkflowStepInfo in archived mode (step field is WorkflowStepArchive)

Source code in src/formed/workflow/step.py
@classmethod
def from_archive(
    cls,
    archive: WorkflowStepArchive,
    dependency_map: Mapping[str, "WorkflowStepInfo"],
) -> "WorkflowStepInfo":
    """Reconstruct WorkflowStepInfo from an archive.

    Args:
        archive: The archived step metadata
        dependency_map: Map from fingerprint to `WorkflowStepInfo` for all dependencies

    Returns:
        `WorkflowStepInfo` in archived mode (step field is `WorkflowStepArchive`)
    """
    # Reconstruct dependencies using the dependency_map
    dependencies: set[tuple[StrictParamPath, WorkflowStepInfo]] = set()
    for param_path_str, dep_data in archive.dependency_fingerprints.items():
        dep_fingerprint = dep_data["fingerprint"]
        assert isinstance(dep_fingerprint, str)

        dep_fieldref = dep_data["fieldref"] if "fieldref" in dep_data else None
        assert dep_fieldref is None or isinstance(dep_fieldref, str)

        if dep_fingerprint not in dependency_map:
            raise ValueError(
                f"Dependency with fingerprint {dep_fingerprint} not found in dependency_map for step {archive.name}"
            )
        path_parts = tuple(param_path_str.split("."))
        base_dep_info = dependency_map[dep_fingerprint]

        # If the dependency has a fieldref, create a new WorkflowStepInfo with fieldref set
        if dep_fieldref:
            dep_info = WorkflowStepInfo(
                name=base_dep_info.name,
                step=base_dep_info.step,
                dependencies=base_dep_info.dependencies,
                fieldref=dep_fieldref,
            )
        else:
            dep_info = base_dep_info

        dependencies.add((path_parts, dep_info))

    return cls(
        name=archive.name,
        step=archive,  # Store the archive directly, not a Lazy
        dependencies=frozenset(dependencies),
        fieldref=archive.fieldref,
    )

json

json()

Convert to dict for JSON serialization (legacy compatibility).

Source code in src/formed/workflow/step.py
def json(self) -> dict[str, JsonValue]:
    """Convert to dict for JSON serialization (legacy compatibility)."""
    if isinstance(self.step, WorkflowStepArchive):
        config = self.step.config
    elif isinstance(self.step, Lazy):
        config = self.step.config
    else:
        config = {}

    return {
        "name": self.name,
        "version": self.version,
        "format": self.format.identifier,
        "deterministic": self.deterministic,
        "cacheable": self.cacheable,
        "fingerprint": self.fingerprint,
        "config": config,
    }

step

step(
    name: str,
    *,
    version: Optional[str] = ...,
    deterministic: bool = ...,
    cacheable: Optional[bool] = ...,
    exist_ok: bool = ...,
    format: Optional[Union[str, Format]] = ...,
) -> Callable[[StepFunctionT], StepFunctionT]
step(
    name: StepFunctionT,
    *,
    version: Optional[str] = ...,
    deterministic: bool = ...,
    cacheable: Optional[bool] = ...,
    exist_ok: bool = ...,
    format: Optional[Union[str, Format]] = ...,
) -> StepFunctionT
step(
    *,
    version: Optional[str] = ...,
    deterministic: bool = ...,
    cacheable: Optional[bool] = ...,
    exist_ok: bool = ...,
    format: Optional[Union[str, Format]] = ...,
) -> Callable[[StepFunctionT], StepFunctionT]
step(
    name=None,
    *,
    version=None,
    deterministic=True,
    cacheable=None,
    exist_ok=False,
    format=None,
)
Source code in src/formed/workflow/step.py
def step(
    name: Optional[Union[str, StepFunctionT]] = None,
    *,
    version: Optional[str] = None,
    deterministic: bool = True,
    cacheable: Optional[bool] = None,
    exist_ok: bool = False,
    format: Optional[Union[str, Format]] = None,
) -> Union[StepFunctionT, Callable[[StepFunctionT], StepFunctionT]]:
    def register(name: str, func: StepFunctionT) -> None:
        step_class = WorkflowStep[Any].from_callable(
            func,
            version=version,
            deterministic=deterministic,
            cacheable=cacheable,
            format=format,
        )
        WorkflowStep.register(name, exist_ok=exist_ok)(step_class)

    def decorator(func: StepFunctionT) -> StepFunctionT:
        nonlocal name
        name = name or func.__name__
        assert isinstance(name, str)
        register(name, func)
        return func

    if name is None:
        return decorator

    if not isinstance(name, str):
        func = name
        register(func.__name__, func)
        return func

    return decorator

use_step_context

use_step_context()
Source code in src/formed/workflow/step.py
def use_step_context() -> Optional[WorkflowStepContext]:
    return _STEP_CONTEXT.get()

use_step_logger

use_step_logger(default: Union[str, Logger]) -> Logger
use_step_logger(default: None = ...) -> Optional[Logger]
use_step_logger(default=None)
Source code in src/formed/workflow/step.py
def use_step_logger(default: Optional[Union[str, Logger]] = None) -> Optional[Logger]:
    context = use_step_context()
    if context is not None:
        return get_step_logger_from_info(context.info)
    if default is None:
        return None
    if isinstance(default, str):
        return getLogger(default)
    return default

use_step_workdir

use_step_workdir()
Source code in src/formed/workflow/step.py
def use_step_workdir() -> Path:
    context = use_step_context()
    if context is None:
        raise RuntimeError("No step context found")
    workdir = WORKFLOW_WORKSPACE_DIRECTORY / context.info.fingerprint
    workdir.mkdir(parents=True, exist_ok=True)
    return workdir
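Each step gets a working directory keyed by its fingerprint, created lazily on first access. A sketch of that layout under a temporary root (`WORKSPACE` is a stand-in for `WORKFLOW_WORKSPACE_DIRECTORY`):

```python
import tempfile
from pathlib import Path

# Temporary stand-in for the real workspace root.
WORKSPACE = Path(tempfile.mkdtemp())


def step_workdir(fingerprint: str) -> Path:
    # One directory per step fingerprint, created on first use;
    # exist_ok=True makes repeated calls idempotent.
    workdir = WORKSPACE / fingerprint
    workdir.mkdir(parents=True, exist_ok=True)
    return workdir
```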

get_step_logger_from_info

get_step_logger_from_info(info)
Source code in src/formed/workflow/step.py
def get_step_logger_from_info(info: WorkflowStepInfo) -> Logger:
    return getLogger(f"formed.workflow.step.{info.name}.{info.fingerprint[:8]}")

load_artifact

load_artifact(path, format)
Source code in src/formed/workflow/step.py
@step("formed::load_artifact", cacheable=False)
def load_artifact(path: str | PathLike, format: Format):
    path = minato.cached_path(path)
    return format.read(path)

formed.workflow.utils

WorkflowJSONEncoder

Bases: JSONEncoder

default

default(o)
Source code in src/formed/workflow/utils.py
def default(self, o: Any) -> Any:
    if isinstance(o, IJsonCompatible):
        return {
            _PYTHON_DATA_TYPE_KEY: _JSONDataType.CONTAINER,
            _PYTHON_DATA_VALUE_KEY: o.json(),
            _PYTHON_DATA_CONTAINER_KEY: f"{o.__class__.__module__}.{o.__class__.__qualname__}",
        }
    if isinstance(o, IJsonSerializable):
        return o.json()
    if isinstance(o, datetime.datetime):
        return {_PYTHON_DATA_TYPE_KEY: _JSONDataType.DATETIME, _PYTHON_DATA_VALUE_KEY: o.isoformat()}
    if is_namedtuple(o):
        return {
            _PYTHON_DATA_TYPE_KEY: _JSONDataType.CONTAINER,
            _PYTHON_DATA_VALUE_KEY: o._asdict(),
            _PYTHON_DATA_CONTAINER_KEY: f"{o.__class__.__module__}.{o.__class__.__qualname__}",
        }
    if isinstance(o, tuple):
        return {
            _PYTHON_DATA_TYPE_KEY: _JSONDataType.CONTAINER,
            _PYTHON_DATA_VALUE_KEY: [self.default(i) for i in o],
            _PYTHON_DATA_CONTAINER_KEY: f"{o.__class__.__module__}.{o.__class__.__qualname__}",
        }
    if isinstance(o, (set, frozenset)):
        return {
            _PYTHON_DATA_TYPE_KEY: _JSONDataType.CONTAINER,
            _PYTHON_DATA_VALUE_KEY: sorted((self.default(i) for i in o), key=lambda x: (hash(x), str(x))),
            _PYTHON_DATA_CONTAINER_KEY: f"{o.__class__.__module__}.{o.__class__.__qualname__}",
        }
    if dataclasses.is_dataclass(o) and not isinstance(o, type):
        return {
            _PYTHON_DATA_TYPE_KEY: _JSONDataType.CONTAINER,
            _PYTHON_DATA_VALUE_KEY: {
                field.name: self.default(getattr(o, field.name))
                for field in dataclasses.fields(o)
                if hasattr(o, field.name)
            },
            _PYTHON_DATA_CONTAINER_KEY: f"{o.__class__.__module__}.{o.__class__.__qualname__}",
        }
    if isinstance(o, IPydanticModel):
        return {
            _PYTHON_DATA_TYPE_KEY: _JSONDataType.CONTAINER,
            _PYTHON_DATA_VALUE_KEY: o.model_dump(mode="json"),
            _PYTHON_DATA_CONTAINER_KEY: f"{o.__class__.__module__}.{o.__class__.__qualname__}",
        }
    if isinstance(o, type):
        return {
            _PYTHON_DATA_TYPE_KEY: _JSONDataType.CLASS,
            _PYTHON_DATA_VALUE_KEY: f"{o.__module__}.{o.__qualname__}",
        }
    if isinstance(o, Counter):
        return {
            _PYTHON_DATA_TYPE_KEY: _JSONDataType.COUNTER,
            _PYTHON_DATA_VALUE_KEY: dict(o),
        }
    if isinstance(o, numpy.ndarray):
        return {
            _PYTHON_DATA_TYPE_KEY: _JSONDataType.NDARRAY,
            _PYTHON_DATA_VALUE_KEY: {
                "dtype": str(o.dtype),
                "shape": o.shape,
                "data": base64.b85encode(o.tobytes()).decode(),
            },
        }
    if isinstance(o, list):
        return [self.default(i) for i in o]
    if isinstance(o, dict):
        return {k: self.default(v) for k, v in o.items()}
    if isinstance(o, (bool, int, float, str)) or o is None:
        return o
    return {
        _PYTHON_DATA_TYPE_KEY: _JSONDataType.PICKLE,
        _PYTHON_DATA_VALUE_KEY: base64.b85encode(cloudpickle.dumps(o)).decode(),
    }
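The encoder's strategy is to wrap every non-JSON-native value in a small tagged envelope so the decoder can restore the original type. A trimmed-down sketch of the same idea (the `__type__`/`__value__` keys here are illustrative; the module's actual key constants are private):

```python
import base64
import datetime
import json
from typing import Any

TYPE_KEY, VALUE_KEY = "__type__", "__value__"  # illustrative tag names


class TaggedEncoder(json.JSONEncoder):
    # Encode non-JSON types as {"__type__": ..., "__value__": ...}
    # envelopes, mirroring the tagged-container approach above.
    def default(self, o: Any) -> Any:
        if isinstance(o, datetime.datetime):
            return {TYPE_KEY: "datetime", VALUE_KEY: o.isoformat()}
        if isinstance(o, (set, frozenset)):
            # Sort for deterministic output, as the real encoder does.
            return {TYPE_KEY: "set", VALUE_KEY: sorted(o, key=str)}
        if isinstance(o, bytes):
            return {TYPE_KEY: "bytes", VALUE_KEY: base64.b85encode(o).decode()}
        return super().default(o)


encoded = json.dumps(
    {"when": datetime.datetime(2024, 1, 1), "tags": {"b", "a"}},
    cls=TaggedEncoder,
)
```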

WorkflowJSONDecoder

WorkflowJSONDecoder()

Bases: JSONDecoder

Source code in src/formed/workflow/utils.py
def __init__(self) -> None:
    super().__init__(object_hook=self._reconstruct)

object_fingerprint

object_fingerprint(obj)
Source code in src/formed/workflow/utils.py
def object_fingerprint(obj: Any) -> str:
    with suppress(TypeError, ValueError):
        # This is a workaround for fingerprint consistency.
        obj = json.loads(json.dumps(obj, cls=WorkflowJSONEncoder, sort_keys=True))
    return b58encode(hash_object_bytes(obj)).decode()
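The round-trip through `json.dumps(..., sort_keys=True)` canonicalizes mappings so key order cannot perturb the fingerprint. A hedged sketch of that property, using `sha256` in place of the library's `hash_object_bytes`/base58 pipeline:

```python
import hashlib
import json
from typing import Any


def fingerprint(obj: Any) -> str:
    # Canonicalize through JSON with sorted keys so dict ordering
    # cannot change the hash; formed additionally base58-encodes
    # the digest of a dedicated object-hashing routine.
    payload = json.dumps(obj, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()
```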

as_jsonvalue

as_jsonvalue(value)
Source code in src/formed/workflow/utils.py
def as_jsonvalue(value: Any) -> JsonValue:
    return cast(JsonValue, json.loads(json.dumps(value, cls=WorkflowJSONEncoder)))

from_jsonvalue

from_jsonvalue(value)
Source code in src/formed/workflow/utils.py
def from_jsonvalue(value: JsonValue) -> Any:
    return WorkflowJSONDecoder._reconstruct(value)