Workflow¶
- Archive
- Cache
- Callback
- Colt
- Constants
- Executor
- Format
- Graph
- JSON Schema
- Organizer
- Settings
- Step
- Utils
formed.workflow.archive
¶
Archive data structures for workflow execution persistence.
This module defines NamedTuple structures for serializing workflow executions to JSON format. These archives capture all metadata needed to restore past executions, including step fingerprints, source code hashes, and dependency information.
WorkflowStepArchive
¶
Bases: NamedTuple
Archived snapshot of a WorkflowStep's execution-time metadata.
This structure captures all information needed to:
- Look up cached results (`fingerprint`, `format_identifier`)
- Understand what ran (`version`, `source_hash`, `config`)
- Reconstruct dependency references (`dependency_fingerprints`)
All steps are stored flat in WorkflowGraphArchive.steps, and dependencies
are referenced by fingerprint rather than nested recursively.
json
¶
json()
Convert to JSON-serializable dict.
Source code in src/formed/workflow/archive.py
from_json
classmethod
¶
from_json(data)
Create from JSON-deserialized dict.
Source code in src/formed/workflow/archive.py
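The `json()`/`from_json()` pair amounts to a NamedTuple round-trip. A minimal sketch under stated assumptions — the class name, field types, and method bodies here are illustrative, not the actual implementation; only the field names follow the documented attributes:

```python
from typing import NamedTuple

# Hypothetical stand-in for WorkflowStepArchive; field names follow the
# docs above, everything else is an assumption.
class StepArchive(NamedTuple):
    fingerprint: str
    format_identifier: str
    version: str
    source_hash: str
    config: dict
    dependency_fingerprints: tuple

    def json(self) -> dict:
        # NamedTuple._asdict gives a JSON-serializable mapping
        return self._asdict()

    @classmethod
    def from_json(cls, data: dict) -> "StepArchive":
        return cls(**data)

step = StepArchive("abc123", "pickle", "0.1.0", "d34db33f", {}, ())
restored = StepArchive.from_json(step.json())
```

Because dependencies are referenced by fingerprint rather than nested, a full graph archive is just a flat collection of such records.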
WorkflowGraphArchive
¶
Bases: NamedTuple
Archived snapshot of a WorkflowGraph's execution-time state.
All steps are stored flat here (not nested). Dependencies between steps
are represented by fingerprints in WorkflowStepArchive.dependency_fingerprints.
json
¶
json()
Convert to JSON-serializable dict.
Source code in src/formed/workflow/archive.py
from_json
classmethod
¶
from_json(data)
Create from JSON-deserialized dict.
Source code in src/formed/workflow/archive.py
WorkflowExecutionArchive
¶
Bases: NamedTuple
Complete execution snapshot saved to execution.json.
This is the top-level structure that organizers save and restore. State is NOT included here - it's saved separately as it's mutable.
json
¶
json()
Convert to JSON-serializable dict.
Source code in src/formed/workflow/archive.py
from_json
classmethod
¶
from_json(data)
Create from JSON-deserialized dict.
Source code in src/formed/workflow/archive.py
formed.workflow.cache
¶
Workflow step result caching implementations.
This module provides caching backends for storing and retrieving workflow step results. Multiple cache implementations are available for different use cases.
Available Caches
- EmptyWorkflowCache: No-op cache that never stores results
- MemoryWorkflowCache: In-memory cache for development/testing
- FilesystemWorkflowCache: Persistent file-based cache (default)
Examples:
>>> from formed.workflow.cache import FilesystemWorkflowCache
>>>
>>> # Create filesystem cache
>>> cache = FilesystemWorkflowCache(".formed/cache")
>>>
>>> # Check if step result is cached
>>> if step_info in cache:
... result = cache[step_info]
... else:
... result = execute_step()
... cache[step_info] = result
WorkflowCache
¶
Bases: Registrable
Abstract base class for workflow step result caching.
WorkflowCache provides a dict-like interface for storing and retrieving step execution results, keyed by WorkflowStepInfo (which includes the step's fingerprint).
Subclasses implement different storage backends (memory, filesystem, etc.).
Examples:
>>> # Implement a custom cache backed by a plain dict
>>> class MyCache(WorkflowCache):
...     def __init__(self):
...         self._store = {}
...     def __getitem__(self, step_info):
...         # Retrieve from custom backend
...         return self._store[step_info.fingerprint]
...     def __setitem__(self, step_info, value):
...         # Store to custom backend
...         self._store[step_info.fingerprint] = value
...     def __contains__(self, step_info):
...         # Check if cached
...         return step_info.fingerprint in self._store
...     def __delitem__(self, step_info):
...         # Remove from cache
...         del self._store[step_info.fingerprint]
Note
- Cache keys are WorkflowStepInfo instances
- Fingerprints uniquely identify step configurations
- Thread-safety depends on implementation
EmptyWorkflowCache
¶
Bases: WorkflowCache
No-op cache that never stores results.
EmptyWorkflowCache disables caching entirely. All `__contains__` checks return False and all `__getitem__` lookups raise KeyError, forcing steps to always re-execute.
This is useful for debugging or when caching is undesirable.
Examples:
>>> cache = EmptyWorkflowCache()
>>> cache[step_info] = result # Does nothing
>>> step_info in cache # Always returns False
MemoryWorkflowCache
¶
MemoryWorkflowCache()
Bases: WorkflowCache
In-memory cache for workflow step results.
MemoryWorkflowCache stores results in a Python dictionary, providing fast access but no persistence across process restarts. Useful for development, testing, or when results don't need to survive process boundaries.
Examples:
>>> cache = MemoryWorkflowCache()
>>> cache[step_info] = result
>>> if step_info in cache:
... result = cache[step_info]
>>> print(len(cache)) # Number of cached steps
Note
- Cache is lost when process ends
- Not suitable for production workflows
- No size limit - can grow unbounded
Source code in src/formed/workflow/cache.py
FilesystemWorkflowCache
¶
FilesystemWorkflowCache(directory)
Bases: WorkflowCache
Persistent file-based cache for workflow step results.
FilesystemWorkflowCache stores step results in a directory structure organized by fingerprint. Each step's result is serialized using its configured Format and written to a subdirectory. File locking ensures thread-safe concurrent access.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `_directory` | Root directory for cache storage. |
Examples:
>>> cache = FilesystemWorkflowCache(".formed/cache")
>>> cache[step_info] = result # Writes to .formed/cache/<fingerprint>/
>>> if step_info in cache:
... result = cache[step_info] # Reads from disk
>>> del cache[step_info] # Removes cached result
Note
- Results persist across process restarts
- Thread-safe via file locking
- Cache directory structure: {cache_dir}/{fingerprint}/
- Each step uses its Format for serialization
- Suitable for production workflows
Initialize filesystem cache.
| PARAMETER | DESCRIPTION |
|---|---|
| `directory` | Root directory for cache storage. Created if it doesn't exist. |

Source code in src/formed/workflow/cache.py
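The `{cache_dir}/{fingerprint}/` layout described above can be sketched as follows. This is a hedged illustration, not the real implementation: the artifact file name and the use of pickle are assumptions, since the actual cache delegates serialization to each step's Format:

```python
import pickle
from pathlib import Path

# Sketch of a fingerprint-keyed directory cache (hypothetical layout).
def store(cache_dir: str, fingerprint: str, result) -> None:
    d = Path(cache_dir) / fingerprint
    d.mkdir(parents=True, exist_ok=True)
    (d / "artifact.pkl").write_bytes(pickle.dumps(result))

def load(cache_dir: str, fingerprint: str):
    path = Path(cache_dir) / fingerprint / "artifact.pkl"
    return pickle.loads(path.read_bytes())
```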
formed.workflow.callback
¶
Callback system for workflow execution monitoring.
This module provides a callback interface for monitoring and responding to workflow execution events. Callbacks can be used for logging, metrics collection, checkpointing, or custom workflow orchestration logic.
Key Components:
- `WorkflowCallback`: Abstract base class for all callbacks
- `EmptyWorkflowCallback`: No-op callback
- `MultiWorkflowCallback`: Combines multiple callbacks
Features
- Hook points at execution and step start/end
- Access to execution and step contexts
- Composable callback system
- Registrable for configuration-based instantiation
Examples:
>>> from formed.workflow import WorkflowCallback
>>>
>>> @WorkflowCallback.register("custom")
... class CustomCallback(WorkflowCallback):
... def on_step_start(self, step_context, execution_context):
... print(f"Starting step: {step_context.info.name}")
...
... def on_step_end(self, step_context, execution_context):
... print(f"Finished step: {step_context.info.name}")
WorkflowCallback
¶
Bases: Registrable
Abstract base class for workflow execution callbacks.
Callbacks provide hooks to execute custom logic at various points during workflow execution. Subclasses can override hook methods to implement custom monitoring, logging, or orchestration behavior.
Hook execution order
- on_execution_start - once at workflow start
- on_step_start - before each step execution
- on_step_end - after each step execution
- on_execution_end - once at workflow end
Examples:
>>> class LoggingCallback(WorkflowCallback):
... def on_execution_start(self, execution_context):
... print("Workflow started")
...
... def on_step_end(self, step_context, execution_context):
... print(f"Step {step_context.info.name} completed")
on_execution_start
¶
on_execution_start(execution_context)
Called once at the start of workflow execution.
| PARAMETER | DESCRIPTION |
|---|---|
| `execution_context` | Context containing execution metadata and state. |

Source code in src/formed/workflow/callback.py
on_execution_end
¶
on_execution_end(execution_context)
Called once at the end of workflow execution.
| PARAMETER | DESCRIPTION |
|---|---|
| `execution_context` | Context containing execution metadata and state. |

Source code in src/formed/workflow/callback.py
on_step_start
¶
on_step_start(step_context, execution_context)
Called before each step execution.
| PARAMETER | DESCRIPTION |
|---|---|
| `step_context` | Context for the step about to execute. |
| `execution_context` | Context for the overall execution. |

Source code in src/formed/workflow/callback.py
on_step_end
¶
on_step_end(step_context, execution_context)
Called after each step execution.
| PARAMETER | DESCRIPTION |
|---|---|
| `step_context` | Context for the step that just executed. |
| `execution_context` | Context for the overall execution. |

Source code in src/formed/workflow/callback.py
EmptyWorkflowCallback
¶
Bases: WorkflowCallback
No-op callback that does nothing.
This callback can be used as a placeholder or default when no callback behavior is needed.
on_execution_start
¶
on_execution_start(execution_context)
Called once at the start of workflow execution.
| PARAMETER | DESCRIPTION |
|---|---|
| `execution_context` | Context containing execution metadata and state. |

Source code in src/formed/workflow/callback.py
on_execution_end
¶
on_execution_end(execution_context)
Called once at the end of workflow execution.
| PARAMETER | DESCRIPTION |
|---|---|
| `execution_context` | Context containing execution metadata and state. |

Source code in src/formed/workflow/callback.py
on_step_start
¶
on_step_start(step_context, execution_context)
Called before each step execution.
| PARAMETER | DESCRIPTION |
|---|---|
| `step_context` | Context for the step about to execute. |
| `execution_context` | Context for the overall execution. |

Source code in src/formed/workflow/callback.py
on_step_end
¶
on_step_end(step_context, execution_context)
Called after each step execution.
| PARAMETER | DESCRIPTION |
|---|---|
| `step_context` | Context for the step that just executed. |
| `execution_context` | Context for the overall execution. |

Source code in src/formed/workflow/callback.py
MultiWorkflowCallback
¶
MultiWorkflowCallback(callbacks)
Bases: WorkflowCallback
Callback that executes multiple callbacks in sequence.
This callback allows composing multiple callbacks together, calling each one in order for every hook.
| PARAMETER | DESCRIPTION |
|---|---|
| `callbacks` | Sequence of callbacks to execute. |
Examples:
>>> callback1 = LoggingCallback()
>>> callback2 = MetricsCallback()
>>> multi = MultiWorkflowCallback([callback1, callback2])
>>> # Both callbacks will be called for each hook
Source code in src/formed/workflow/callback.py
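The fan-out behavior — each hook invoked on every child callback, in order — can be sketched like this. The class and the single hook shown are illustrative stand-ins, not the formed API:

```python
# Minimal sketch of sequential hook fan-out (hypothetical names).
class FanOut:
    def __init__(self, callbacks):
        self._callbacks = list(callbacks)

    def on_step_start(self, step_context, execution_context):
        # Each child callback sees the hook, in registration order.
        for cb in self._callbacks:
            cb.on_step_start(step_context, execution_context)

calls = []

class Recorder:
    def __init__(self, name):
        self.name = name

    def on_step_start(self, step_context, execution_context):
        calls.append(self.name)

FanOut([Recorder("a"), Recorder("b")]).on_step_start(None, None)
```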
on_execution_start
¶
on_execution_start(execution_context)
Source code in src/formed/workflow/callback.py
on_execution_end
¶
on_execution_end(execution_context)
Source code in src/formed/workflow/callback.py
on_step_start
¶
on_step_start(step_context, execution_context)
Source code in src/formed/workflow/callback.py
on_step_end
¶
on_step_end(step_context, execution_context)
Source code in src/formed/workflow/callback.py
formed.workflow.colt
¶
COLT_BUILDER
module-attribute
¶
COLT_BUILDER = ColtBuilder(
typekey=COLT_TYPEKEY,
argskey=COLT_ARGSKEY,
callback=MultiCallback(
DatetimeCallback(), RefCallback()
),
)
WorkflowRef
¶
WorkflowRef(
annotation, path, step_name, config, field_name=None
)
Bases: Generic[_T], Placeholder[_T]
Source code in src/formed/workflow/colt.py
is_ref
staticmethod
¶
is_ref(builder, config)
Source code in src/formed/workflow/colt.py
match_type_hint
¶
match_type_hint(annotation)
Source code in src/formed/workflow/colt.py
RefCallback
¶
Bases: ColtCallback
Replace ref configs with WorkflowRef instances as placeholders.
on_build
¶
on_build(path, config, builder, context, annotation=None)
Source code in src/formed/workflow/colt.py
DatetimeCallback
¶
Bases: ColtCallback
on_build
¶
on_build(path, config, builder, context, annotation=None)
Source code in src/formed/workflow/colt.py
formed.workflow.constants
¶
WORKFLOW_DEFAULT_DIRECTORY
module-attribute
¶
WORKFLOW_DEFAULT_DIRECTORY = (
DEFAULT_FORMED_DIRECTORY / "workflow"
)
WORKFLOW_DEFAULT_SETTINGS_PATH
module-attribute
¶
WORKFLOW_DEFAULT_SETTINGS_PATH = (
DEFAULT_WORKING_DIRECTORY / "formed.yml"
)
WORKFLOW_INTEGRATION_DIRECTORY
module-attribute
¶
WORKFLOW_INTEGRATION_DIRECTORY = (
DEFAULT_FORMED_DIRECTORY / "integrations"
)
WORKFLOW_WORKSPACE_DIRECTORY
module-attribute
¶
WORKFLOW_WORKSPACE_DIRECTORY = (
DEFAULT_FORMED_DIRECTORY / "workspaces"
)
formed.workflow.executor
¶
Workflow execution engine and context management.
This module provides the execution engine for workflows, coordinating step execution, caching, callbacks, and state management.
Key Components
- `WorkflowExecutor`: Abstract base for execution engines
- `DefaultWorkflowExecutor`: Default sequential execution implementation
- `WorkflowExecutionContext`: Runtime context for workflow execution
- `WorkflowExecutionInfo`: Metadata about workflow execution
Features
- Sequential step execution with dependency resolution
- Cache integration for step results
- Callback hooks for monitoring and logging
- Execution state tracking
- Git and environment metadata capture
Examples:
>>> from formed.workflow import WorkflowGraph, DefaultWorkflowExecutor
>>> from formed.workflow.cache import FilesystemWorkflowCache
>>>
>>> # Load workflow and create executor
>>> graph = WorkflowGraph.from_jsonnet("workflow.jsonnet")
>>> executor = DefaultWorkflowExecutor()
>>> cache = FilesystemWorkflowCache(".formed/cache")
>>>
>>> # Execute workflow
>>> with executor:
... context = executor(graph, cache=cache)
>>> print(context.state.status) # "completed"
WorkflowExecutorT
module-attribute
¶
WorkflowExecutorT = TypeVar(
"WorkflowExecutorT", bound="WorkflowExecutor"
)
WorkflowExecutionStatus
¶
Bases: str, Enum
WorkflowExecutionState
dataclass
¶
WorkflowExecutionState(
execution_id=None,
status=PENDING,
started_at=None,
finished_at=None,
)
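The state fields above suggest a simple lifecycle. A hedged sketch follows — only `PENDING` appears in the documented signature; the other status members and the transition methods are assumptions for illustration:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

# Hypothetical status members; the real WorkflowExecutionStatus may differ.
class Status(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class ExecutionState:
    execution_id: Optional[str] = None
    status: Status = Status.PENDING
    started_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None

    def start(self, execution_id: str) -> None:
        self.execution_id = execution_id
        self.status = Status.RUNNING
        self.started_at = datetime.now(timezone.utc)

    def finish(self, ok: bool = True) -> None:
        self.status = Status.COMPLETED if ok else Status.FAILED
        self.finished_at = datetime.now(timezone.utc)
```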
WorkflowExecutionMetadata
dataclass
¶
WorkflowExecutionMetadata(
version=version("formed"),
git=get_git_info(),
environment=dict(),
required_modules=list(),
dependent_packages=get_installed_packages(),
)
WorkflowExecutionInfo
dataclass
¶
WorkflowExecutionInfo(
graph, id=None, metadata=WorkflowExecutionMetadata()
)
metadata
class-attribute
instance-attribute
¶
metadata = field(default_factory=WorkflowExecutionMetadata)
json
¶
json()
Convert to JSON-serializable dict.
Returns a dict that may contain types requiring WorkflowJSONEncoder
for proper serialization (e.g., datetime, git info).
Examples:
>>> data = execution_info.json()
>>> json.dump(data, file, cls=WorkflowJSONEncoder)
Source code in src/formed/workflow/executor.py
from_json
classmethod
¶
from_json(data)
Reconstruct from JSON-serializable dict.
Validates format version and reconstructs all fields.
Examples:
>>> data = json.load(file, cls=WorkflowJSONDecoder)
>>> execution_info = WorkflowExecutionInfo.from_json(data)
Source code in src/formed/workflow/executor.py
WorkflowExecutionContext
dataclass
¶
WorkflowExecutionContext(
info,
state,
cache=EmptyWorkflowCache(),
callback=EmptyWorkflowCallback(),
)
callback
class-attribute
instance-attribute
¶
callback = field(default_factory=EmptyWorkflowCallback)
WorkflowExecutor
¶
Bases: Registrable
DefaultWorkflowExecutor
¶
Bases: WorkflowExecutor
use_execution_context
¶
use_execution_context()
Source code in src/formed/workflow/executor.py
formed.workflow.format
¶
Serialization formats for workflow step results.
This module provides format abstractions for serializing and deserializing workflow step results. Multiple formats support different data types and use cases.
Key Components
- `Format`: Abstract base class for all formats
- `PickleFormat`: Universal format using cloudpickle
- `JsonFormat`: JSON format for JSON-serializable data
- `MappingFormat`: Format for mappings/dicts stored as subdirectories
- `AutoFormat`: Automatically selects appropriate format
Features
- Support for iterators and streaming data
- Type-safe serialization/deserialization
- Format auto-detection based on data type
- Extensible via registration system
Examples:
>>> from formed.workflow import JsonFormat, PickleFormat
>>>
>>> # JSON format for simple data
>>> json_format = JsonFormat()
>>> json_format.write({"key": "value"}, directory)
>>> data = json_format.read(directory)
>>>
>>> # Pickle format for complex objects
>>> pickle_format = PickleFormat()
>>> pickle_format.write(my_model, directory)
>>> model = pickle_format.read(directory)
Format
¶
Bases: Generic[_T], Registrable
Abstract base class for serialization formats.
Formats handle serialization and deserialization of workflow step results to/from disk. Each format is identified by a unique identifier and can indicate if it's the default format for a given type.
| CLASS TYPE PARAMETER | DESCRIPTION |
|---|---|
| `_T` | Type of data this format can serialize/deserialize. |
identifier
property
¶
identifier
Get the unique identifier for this format.
| RETURNS | DESCRIPTION |
|---|---|
| `str` | Format identifier string. |
write
¶
write(artifact, directory)
Write artifact to directory.
| PARAMETER | DESCRIPTION |
|---|---|
| `artifact` | Data to serialize. |
| `directory` | Directory to write to. |

Source code in src/formed/workflow/format.py
read
¶
read(directory)
Read artifact from directory.
| PARAMETER | DESCRIPTION |
|---|---|
| `directory` | Directory to read from. |

| RETURNS | DESCRIPTION |
|---|---|
| `_T` | Deserialized data. |

Source code in src/formed/workflow/format.py
is_default_of
classmethod
¶
is_default_of(obj)
Check if this format is the default for the given object type.
| PARAMETER | DESCRIPTION |
|---|---|
| `obj` | Object to check. |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if this format should be used by default for this type. |

Source code in src/formed/workflow/format.py
PickleFormat
¶
Bases: Format[_T], Generic[_T]
Universal serialization format using cloudpickle.
This format can serialize almost any Python object, including functions, classes, and complex nested structures. It also supports streaming iterators.
Examples:
>>> format = PickleFormat()
>>> format.write(my_object, directory)
>>> obj = format.read(directory)
>>>
>>> # For iterators
>>> format.write(iter(range(1000)), directory)
>>> iterator = format.read(directory) # Returns iterator
Note
This is the fallback format when no other format applies.
identifier
property
¶
identifier
Get the unique identifier for this format.
| RETURNS | DESCRIPTION |
|---|---|
| `str` | Format identifier string. |
write
¶
write(artifact, directory)
Source code in src/formed/workflow/format.py
read
¶
read(directory)
Source code in src/formed/workflow/format.py
is_default_of
classmethod
¶
is_default_of(obj)
Check if this format is the default for the given object type.
| PARAMETER | DESCRIPTION |
|---|---|
| `obj` | Object to check. |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if this format should be used by default for this type. |

Source code in src/formed/workflow/format.py
JsonFormat
¶
Bases: Format[_JsonFormattableT], Generic[_JsonFormattableT]
JSON-based serialization format for JSON-compatible data.
This format serializes data to JSON files (.json for single objects, .jsonl for iterators). It supports all JSON-serializable types plus dataclasses, named tuples, and Pydantic models.
Features
- Human-readable format
- Support for iterators via JSON Lines format
- Automatic type reconstruction using metadata
- Custom JSON encoder/decoder for extended types
| CLASS TYPE PARAMETER | DESCRIPTION |
|---|---|
| `_JsonFormattableT` | JSON-compatible type (primitives, containers, dataclasses, etc.) |
Examples:
>>> format = JsonFormat()
>>>
>>> # Single object
>>> format.write({"key": "value"}, directory)
>>> data = format.read(directory)
>>>
>>> # Iterator (uses JSONL)
>>> format.write(iter([{"a": 1}, {"a": 2}]), directory)
>>> iterator = format.read(directory)
Note
This is the default format for JSON-serializable types. Objects are reconstructed with their original type using metadata.
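The single-object vs. iterator split described above can be sketched as follows. This is an illustration under stated assumptions: the file names `artifact.json`/`artifact.jsonl` follow the docs, but the `metadata.json` type-reconstruction step is omitted and the function name is hypothetical:

```python
import json
from pathlib import Path

# Sketch: iterators stream to JSON Lines, everything else to one file.
def write_json_artifact(artifact, directory: Path) -> None:
    if hasattr(artifact, "__next__"):  # iterator -> JSON Lines
        with open(directory / "artifact.jsonl", "w") as f:
            for item in artifact:
                f.write(json.dumps(item) + "\n")
    else:
        (directory / "artifact.json").write_text(json.dumps(artifact))
```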
identifier
property
¶
identifier
Get the unique identifier for this format.
| RETURNS | DESCRIPTION |
|---|---|
| `str` | Format identifier string. |
write
¶
write(artifact, directory)
Write JSON-serializable artifact to directory.
Writes artifact.json for single objects or artifact.jsonl for
iterators. Also writes metadata.json containing type information
for reconstruction.
| PARAMETER | DESCRIPTION |
|---|---|
| `artifact` | Data to serialize (single object or iterator). |
| `directory` | Directory to write to. |

Source code in src/formed/workflow/format.py
read
¶
read(directory)
Read JSON artifact from directory.
Reads artifact.json or artifact.jsonl and reconstructs the
original type using metadata.json if available.
| PARAMETER | DESCRIPTION |
|---|---|
| `directory` | Directory to read from. |

| RETURNS | DESCRIPTION |
|---|---|
| `_JsonFormattableT` | Deserialized data with original type. |

Source code in src/formed/workflow/format.py
is_default_of
classmethod
¶
is_default_of(obj)
Check if JSON format is default for object type.
| PARAMETER | DESCRIPTION |
|---|---|
| `obj` | Object to check. |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True for JSON-serializable types (primitives, containers, dataclasses, named tuples, Pydantic models). |

Source code in src/formed/workflow/format.py
MappingFormat
¶
MappingFormat(format)
Bases: Format[Mapping[str, _T]], Generic[_T]
Format for mappings using subdirectories for values.
This format stores each mapping entry as a subdirectory, with the key as the directory name and the value serialized using a nested format. This allows mappings of complex objects to be stored in an organized directory structure.
| CLASS TYPE PARAMETER | DESCRIPTION |
|---|---|
| `_T` | Type of mapping values. |

| PARAMETER | DESCRIPTION |
|---|---|
| `format` | Format to use for serializing mapping values. |
Examples:
>>> # Mapping of strings to dataframes
>>> inner_format = PickleFormat()
>>> format = MappingFormat(inner_format)
>>>
>>> data = {
... "train": train_df,
... "test": test_df,
... }
>>> format.write(data, directory)
>>> # Creates: directory/train/artifact.pkl
>>> # directory/test/artifact.pkl
>>>
>>> loaded = format.read(directory)
Note
Keys must be valid directory names (no special characters).
Source code in src/formed/workflow/format.py
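The per-key subdirectory layout described above can be sketched as a pair of helpers. These are illustrative stand-ins: `write_value`/`read_value` play the role of the nested Format's `write()`/`read()`, and the names are hypothetical:

```python
from pathlib import Path

# Sketch: one subdirectory per key, value serialized by a nested format.
def write_mapping(mapping, directory: Path, write_value) -> None:
    for key, value in mapping.items():
        sub = directory / key
        sub.mkdir(parents=True, exist_ok=True)
        write_value(value, sub)

def read_mapping(directory: Path, read_value) -> dict:
    return {sub.name: read_value(sub)
            for sub in directory.iterdir() if sub.is_dir()}
```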
identifier
property
¶
identifier
Get the unique identifier for this format.
| RETURNS | DESCRIPTION |
|---|---|
| `str` | Format identifier string. |
write
¶
write(artifact, directory)
Write mapping to subdirectories.
Each mapping entry is written to a subdirectory named after the key, with the value serialized using the nested format.
| PARAMETER | DESCRIPTION |
|---|---|
| `artifact` | Mapping to serialize. |
| `directory` | Directory to write to. |

Source code in src/formed/workflow/format.py
read
¶
read(directory)
Read mapping from subdirectories.
Reconstructs the mapping by reading each subdirectory as a key-value pair.
| PARAMETER | DESCRIPTION |
|---|---|
| `directory` | Directory to read from. |

| RETURNS | DESCRIPTION |
|---|---|
| `Mapping[str, _T]` | Reconstructed mapping. |

Source code in src/formed/workflow/format.py
is_default_of
classmethod
¶
is_default_of(obj)
Check if this format is the default for the given object type.
| PARAMETER | DESCRIPTION |
|---|---|
| `obj` | Object to check. |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if this format should be used by default for this type. |

Source code in src/formed/workflow/format.py
DatasetFormat
¶
Bases: Format[Dataset[_T]], Generic[_T]
identifier
property
¶
identifier
Get the unique identifier for this format.
| RETURNS | DESCRIPTION |
|---|---|
| `str` | Format identifier string. |
write
¶
write(artifact, directory)
Source code in src/formed/workflow/format.py
read
¶
read(directory)
Source code in src/formed/workflow/format.py
is_default_of
classmethod
¶
is_default_of(obj)
Source code in src/formed/workflow/format.py
AutoFormat
¶
Bases: Format[_T]
Automatic format selection based on object type.
This format automatically selects the most appropriate format for
an object by checking each registered format's is_default_of()
method. It stores the chosen format name in metadata for correct
deserialization.
Selection priority
- Last registered format that claims the type (most specific)
- Falls back to pickle format if no format claims the type
Examples:
>>> format = AutoFormat()
>>>
>>> # Automatically uses JsonFormat for dict
>>> format.write({"key": "value"}, directory)
>>>
>>> # Automatically uses PickleFormat for custom objects
>>> format.write(my_custom_object, directory)
>>>
>>> # Reads with the same format used during write
>>> obj = format.read(directory)
Note
This is the recommended format for most use cases as it provides optimal serialization for each type.
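The selection rule above (last registered format that claims the type wins, with a pickle-like fallback) can be sketched as a small function. Everything here is a hypothetical stand-in, not the formed implementation:

```python
# Sketch of last-registered-wins format selection with a fallback.
def select_format(obj, registered, fallback):
    for fmt in reversed(registered):
        if fmt.is_default_of(obj):
            return fmt
    return fallback

# Dummy formats for illustration only.
class DictFormat:
    @staticmethod
    def is_default_of(obj):
        return isinstance(obj, dict)

class AnyFormat:
    @staticmethod
    def is_default_of(obj):
        return False
```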
identifier
property
¶
identifier
Get the unique identifier for this format.
| RETURNS | DESCRIPTION |
|---|---|
| `str` | Format identifier string. |
write
¶
write(artifact, directory)
Write artifact using automatically selected format.
Selects the appropriate format, writes the artifact, and stores the format name in metadata for deserialization.
| PARAMETER | DESCRIPTION |
|---|---|
| `artifact` | Data to serialize. |
| `directory` | Directory to write to. |

Source code in src/formed/workflow/format.py
read
¶
read(directory)
Read artifact using the format recorded in metadata.
Reads the format metadata and uses the same format that was used during writing.
| PARAMETER | DESCRIPTION |
|---|---|
| `directory` | Directory to read from. |

| RETURNS | DESCRIPTION |
|---|---|
| `_T` | Deserialized data. |

Source code in src/formed/workflow/format.py
is_default_of
classmethod
¶
is_default_of(obj)
Check if this format is the default for the given object type.
| PARAMETER | DESCRIPTION |
|---|---|
| `obj` | Object to check. |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if this format should be used by default for this type. |

Source code in src/formed/workflow/format.py
formed.workflow.graph
¶
Workflow graph construction and dependency resolution.
This module provides the WorkflowGraph class which parses workflow configurations and builds directed acyclic graphs (DAGs) of workflow steps with dependency tracking.
Key Features
- Parse Jsonnet workflow configurations
- Automatic dependency detection via references
- Topological sorting for execution order
- Cycle detection in dependencies
- DAG-based workflow representation
Examples:
>>> from formed.workflow import WorkflowGraph
>>>
>>> # Load workflow from Jsonnet config
>>> graph = WorkflowGraph.from_jsonnet("workflow.jsonnet")
>>>
>>> # Access steps in topological order
>>> for step_info in graph:
... print(f"Step: {step_info.name}")
... print(f"Dependencies: {step_info.dependencies}")
>>>
>>> # Get specific step
>>> preprocess_step = graph["preprocess"]
>>> print(preprocess_step.fingerprint)
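The topological ordering and cycle detection listed above can be illustrated with the standard library's `graphlib`; this is a sketch of the guarantee, not how WorkflowGraph is implemented, and the step names are made up:

```python
from graphlib import TopologicalSorter

# Dependency map: each step names the steps it depends on.
deps = {
    "preprocess": set(),
    "train": {"preprocess"},
    "evaluate": {"train"},
}
order = list(TopologicalSorter(deps).static_order())
# Dependencies always come before their dependents; a cyclic map
# would raise graphlib.CycleError instead.
```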
WorkflowGraph
¶
WorkflowGraph(steps)
Bases: FromJsonnet
Source code in src/formed/workflow/graph.py
get_subgraph
¶
get_subgraph(step_name)
Get a subgraph containing a step and all its dependencies.
Only works with live (non-archived) graphs. For archived graphs, dependencies are already resolved in the archive.
Source code in src/formed/workflow/graph.py
visualize
¶
visualize(*, output=stdout, additional_info={})
Source code in src/formed/workflow/graph.py
from_config
classmethod
¶
from_config(config)
Source code in src/formed/workflow/graph.py
to_archive
¶
to_archive()
Convert graph to archive format for serialization.
This captures the execution-time state of all steps in a flat structure. Only works with live graphs.
Source code in src/formed/workflow/graph.py
from_archive
classmethod
¶
from_archive(archive)
Reconstruct graph from archive.
Handles all dependency resolution internally using a two-pass approach. Organizers don't need to know about the complexity.
Source code in src/formed/workflow/graph.py
from_jsonnet
classmethod
¶
from_jsonnet(filename, ext_vars=None, overrides=None)
Source code in src/formed/common/jsonnet.py
json
¶
json()
Source code in src/formed/common/jsonnet.py
from_json
classmethod
¶
from_json(o)
Source code in src/formed/common/jsonnet.py
formed.workflow.jsonschema
¶
generate_workflow_schema
¶
generate_workflow_schema(title='formed Workflow Graph')
Source code in src/formed/workflow/jsonschema.py
formed.workflow.organizer
¶
Workflow organizers for managing execution state and artifacts.
This module provides organizers that coordinate workflow execution, managing caches, callbacks, and persistent storage of execution state.
Key Components
- `WorkflowOrganizer`: Abstract base class for organizers
- `MemoryWorkflowOrganizer`: In-memory organizer for testing
- `FilesystemWorkflowOrganizer`: Persistent filesystem-based organizer
Features
- Execution lifecycle management
- Integration with caches and callbacks
- Persistent storage of execution metadata and results
- Thread-safe execution with file locking
- Automatic log capture and organization
Examples:
>>> from formed.workflow import FilesystemWorkflowOrganizer, DefaultWorkflowExecutor
>>>
>>> # Create filesystem organizer
>>> organizer = FilesystemWorkflowOrganizer(
... directory=".formed",
... callbacks=[my_callback]
... )
>>>
>>> # Run workflow
>>> executor = DefaultWorkflowExecutor()
>>> context = organizer.run(executor, workflow_graph)
>>>
>>> # Access results
>>> print(context.state)
>>> print(context.results)
T_WorkflowOrganizer
module-attribute
¶
T_WorkflowOrganizer = TypeVar(
"T_WorkflowOrganizer", bound="WorkflowOrganizer"
)
WorkflowOrganizer
¶
WorkflowOrganizer(cache, callbacks)
Bases: Registrable
Source code in src/formed/workflow/organizer.py, lines 71–80
run
¶
run(executor, execution)
Source code in src/formed/workflow/organizer.py, lines 82–92
get
¶
get(execution_id)
Source code in src/formed/workflow/organizer.py, lines 94–95
exists
¶
exists(execution_id)
Source code in src/formed/workflow/organizer.py, lines 97–98
remove
¶
remove(execution_id)
Source code in src/formed/workflow/organizer.py, lines 100–101
MemoryWorkflowOrganizer
¶
MemoryWorkflowOrganizer(callbacks=None)
Bases: WorkflowOrganizer
Source code in src/formed/workflow/organizer.py, lines 106–110
run
¶
run(executor, execution)
Source code in src/formed/workflow/organizer.py, lines 82–92
get
¶
get(execution_id)
Source code in src/formed/workflow/organizer.py, lines 94–95
exists
¶
exists(execution_id)
Source code in src/formed/workflow/organizer.py, lines 97–98
remove
¶
remove(execution_id)
Source code in src/formed/workflow/organizer.py, lines 100–101
FilesystemWorkflowOrganizer
¶
FilesystemWorkflowOrganizer(directory=None, callbacks=None)
Bases: WorkflowOrganizer
Source code in src/formed/workflow/organizer.py, lines 272–283
get
¶
get(execution_id)
Source code in src/formed/workflow/organizer.py, lines 297–318
exists
¶
exists(execution_id)
Source code in src/formed/workflow/organizer.py, lines 320–322
remove
¶
remove(execution_id)
Source code in src/formed/workflow/organizer.py, lines 324–329
run
¶
run(executor, execution)
Source code in src/formed/workflow/organizer.py, lines 82–92
formed.workflow.settings
¶
WorkflowSettings
dataclass
¶
WorkflowSettings(
executor=_default_executor(),
organizer=_default_organizer(),
)
formed.workflow.step
¶
StepFunctionT
module-attribute
¶
StepFunctionT = TypeVar(
"StepFunctionT", bound=Callable[..., Any]
)
WorkflowStepArgFlag
¶
Bases: str, Enum
WorkflowStepResultFlag
¶
Bases: str, Enum
get_flags
classmethod
¶
get_flags(step_or_annotation)
Source code in src/formed/workflow/step.py, lines 55–64
WorkflowStepStatus
¶
Bases: str, Enum
WorkflowStepState
dataclass
¶
WorkflowStepState(
fingerprint,
status=PENDING,
started_at=None,
finished_at=None,
)
WorkflowStepContext
dataclass
¶
WorkflowStepContext(info, state)
WorkflowStep
¶
WorkflowStep(*args, **kwargs)
Bases: Generic[OutputT], Registrable
Source code in src/formed/workflow/step.py, lines 96–98
get_output_type
classmethod
¶
get_output_type(field=None)
Source code in src/formed/workflow/step.py, lines 111–120
from_callable
classmethod
¶
from_callable(
func,
*,
version=None,
deterministic=True,
cacheable=None,
format=None,
)
Source code in src/formed/workflow/step.py, lines 122–160
get_source
classmethod
¶
get_source()
Source code in src/formed/workflow/step.py, lines 162–164
get_normalized_source
classmethod
¶
get_normalized_source()
Source code in src/formed/workflow/step.py, lines 166–168
get_ignore_args
classmethod
¶
get_ignore_args()
Source code in src/formed/workflow/step.py, lines 170–173
WorkflowStepInfo
dataclass
¶
WorkflowStepInfo(name, step, dependencies, fieldref=None)
Bases: Generic[WorkflowStepT]
Unified step info that works for both live and archived steps.
The step field determines the mode:
- Lazy[WorkflowStepT]: Live mode - can be constructed and executed
- WorkflowStepArchive: Archived mode - immutable snapshot from past execution

Live mode (before execution):
- step: a Lazy[WorkflowStepT] that can be constructed
- dependencies: references to other live WorkflowStepInfo objects
- Properties computed from step_class

Archived mode (after execution):
- step: a WorkflowStepArchive with pre-computed metadata
- dependencies: references to other archived WorkflowStepInfo objects
- Properties returned from the archive
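The dual-mode design can be sketched with stand-in types; the class and field names below are assumptions for illustration, not formed's actual definitions. The mode is determined purely by the runtime type of the step field:

```python
from dataclasses import dataclass
from typing import NamedTuple, Union


class StepArchive(NamedTuple):
    # Stand-in for WorkflowStepArchive: pre-computed, immutable metadata.
    fingerprint: str
    should_be_cached: bool


class LazyStep:
    # Stand-in for a live Lazy[WorkflowStep] that has not run yet.
    def __init__(self, cacheable: bool = True):
        self.cacheable = cacheable


@dataclass
class StepInfo:
    name: str
    step: Union[LazyStep, StepArchive]

    def is_live(self) -> bool:
        return not isinstance(self.step, StepArchive)

    def is_archived(self) -> bool:
        return isinstance(self.step, StepArchive)

    @property
    def should_be_cached(self) -> bool:
        # Archived mode returns the value recorded at execution time;
        # live mode derives it from the step itself.
        if isinstance(self.step, StepArchive):
            return self.step.should_be_cached
        return self.step.cacheable
```

A caller never branches on the mode itself; properties like should_be_cached hide the distinction.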
should_be_cached
cached
property
¶
should_be_cached
Check if step should be cached. Works in both modes.
is_live
¶
is_live()
Check if this is a live step.
Source code in src/formed/workflow/step.py, lines 205–207
is_archived
¶
is_archived()
Check if this is an archived step.
Source code in src/formed/workflow/step.py, lines 209–211
to_archive
¶
to_archive()
Convert to archive format for serialization.
This is called at execution time to capture the current state. Only works in live mode.
Source code in src/formed/workflow/step.py, lines 297–349
from_archive
classmethod
¶
from_archive(archive, dependency_map)
Reconstruct WorkflowStepInfo from an archive.
| PARAMETER | DESCRIPTION |
|---|---|
| archive | The archived step metadata |
| dependency_map | Map from fingerprint to WorkflowStepInfo |

| RETURNS | DESCRIPTION |
|---|---|
| WorkflowStepInfo | |
Source code in src/formed/workflow/step.py, lines 351–400
json
¶
json()
Convert to dict for JSON serialization (legacy compatibility).
Source code in src/formed/workflow/step.py, lines 402–419
step
¶
step(
name: str,
*,
version: Optional[str] = ...,
deterministic: bool = ...,
cacheable: Optional[bool] = ...,
exist_ok: bool = ...,
format: Optional[Union[str, Format]] = ...,
) -> Callable[[StepFunctionT], StepFunctionT]
step(
name: StepFunctionT,
*,
version: Optional[str] = ...,
deterministic: bool = ...,
cacheable: Optional[bool] = ...,
exist_ok: bool = ...,
format: Optional[Union[str, Format]] = ...,
) -> StepFunctionT
step(
*,
version: Optional[str] = ...,
deterministic: bool = ...,
cacheable: Optional[bool] = ...,
exist_ok: bool = ...,
format: Optional[Union[str, Format]] = ...,
) -> Callable[[StepFunctionT], StepFunctionT]
step(
name=None,
*,
version=None,
deterministic=True,
cacheable=None,
exist_ok=False,
format=None,
)
Source code in src/formed/workflow/step.py, lines 473–507
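The three overload forms above correspond to the standard "decorator with optional arguments" pattern: bare @step, @step("name"), and @step(keyword=...). A self-contained sketch of that pattern follows; the registration logic is omitted and the attached attribute names (step_name, version) are hypothetical, not formed's internals.

```python
import functools
from typing import Any, Callable, Optional, Union


def step(name: Union[str, Callable, None] = None, *,
         version: Optional[str] = None,
         deterministic: bool = True,
         cacheable: Optional[bool] = None):
    """Decorator usable with or without arguments."""
    def register(func: Callable, step_name: Optional[str]) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return func(*args, **kwargs)
        # Attach metadata; real registration would happen here instead.
        wrapper.step_name = step_name or func.__name__
        wrapper.version = version
        return wrapper

    if callable(name):
        # Bare @step: `name` is actually the decorated function.
        return register(name, None)
    # @step("name") or @step(keyword=...): return the real decorator.
    return lambda func: register(func, name)


@step
def load():
    return [1, 2]


@step("train-model", version="1")
def train():
    return "model"
```

The overloads exist so static type checkers can distinguish the three call shapes; at runtime, one callable() check suffices.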
use_step_context
¶
use_step_context()
Source code in src/formed/workflow/step.py, lines 510–511
use_step_logger
¶
use_step_logger(default: Union[str, Logger]) -> Logger
use_step_logger(default: None = ...) -> Optional[Logger]
use_step_logger(default=None)
Source code in src/formed/workflow/step.py, lines 522–530
use_step_workdir
¶
use_step_workdir()
Source code in src/formed/workflow/step.py, lines 533–539
get_step_logger_from_info
¶
get_step_logger_from_info(info)
Source code in src/formed/workflow/step.py, lines 542–543
load_artifact
¶
load_artifact(path, format)
Source code in src/formed/workflow/step.py, lines 551–554
formed.workflow.utils
¶
WorkflowJSONEncoder
¶
Bases: JSONEncoder
default
¶
default(o)
Source code in src/formed/workflow/utils.py, lines 50–123
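A JSONEncoder subclass extends serialization by overriding default(), which the stock encoder calls only for objects it cannot handle natively. An illustrative encoder in the same spirit; the specific types handled here are assumptions, not the set WorkflowJSONEncoder actually supports:

```python
import dataclasses
import datetime
import json
from typing import Any


class WorkflowishEncoder(json.JSONEncoder):
    """Example encoder: converts a few non-JSON types to JSON-friendly forms."""

    def default(self, o: Any) -> Any:
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        if isinstance(o, (set, frozenset)):
            # Sort for a deterministic rendering.
            return sorted(o)
        if isinstance(o, datetime.datetime):
            return o.isoformat()
        # Fall through to the base class, which raises TypeError.
        return super().default(o)
```

Usage is the standard json.dumps(obj, cls=WorkflowishEncoder); built-in types never reach default(), so the override adds no overhead for plain dicts and lists.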
WorkflowJSONDecoder
¶
WorkflowJSONDecoder()
Bases: JSONDecoder
Source code in src/formed/workflow/utils.py, lines 127–128
object_fingerprint
¶
object_fingerprint(obj)
Source code in src/formed/workflow/utils.py, lines 25–29
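A common approach to object fingerprinting is to hash a canonical JSON rendering of the object, so that dict key order cannot change the result. This sketch assumes that approach and may differ from formed's actual implementation:

```python
import hashlib
import json
from typing import Any


def fingerprint(obj: Any) -> str:
    """Stable short fingerprint of a JSON-serializable object.

    sort_keys makes the rendering canonical; compact separators keep
    whitespace out of the hash; repr() is a crude fallback for
    non-JSON types (a real implementation would be stricter).
    """
    canonical = json.dumps(obj, sort_keys=True, separators=(",", ":"), default=repr)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
```

Two structurally equal configs then fingerprint identically regardless of insertion order, which is what makes fingerprints usable as cache keys.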
as_jsonvalue
¶
as_jsonvalue(value)
Source code in src/formed/workflow/utils.py, lines 32–33
from_jsonvalue
¶
from_jsonvalue(value)
Source code in src/formed/workflow/utils.py, lines 36–37