ModularPipelineBlocks¶
ModularPipelineBlocks
is the basic block for building a ModularPipeline
. It defines what components, inputs/outputs, and computation a block should perform for a specific step in a pipeline. A ModularPipelineBlocks
connects with other blocks, using state, to enable the modular construction of workflows.
A ModularPipelineBlocks
on it's own can't be executed. It is a blueprint for what a step should do in a pipeline. To actually run and execute a pipeline, the ModularPipelineBlocks
needs to be converted into a [ModularPipeline
].
This guide will show you how to create a ModularPipelineBlocks
.
Inputs and outputs¶
Tip
Refer to the States guide if you aren't familiar with how state works in Modular Diffusers.
A ModularPipelineBlocks
requires inputs
, and intermediate_outputs
.
-
inputs
are values provided by a user and retrieved from thePipelineState
. This is useful because some workflows resize an image, but the original image is still required. ThePipelineState
maintains the original image.Use
InputParam
to defineinputs
.from mindone.diffusers.modular_pipelines import InputParam user_inputs = [ InputParam(name="image", type_hint="PIL.Image", description="raw input image to process") ]
-
intermediate_inputs
are values typically created from a previous block but it can also be directly provided if no preceding block generates them. Unlikeinputs
,intermediate_inputs
can be modified.Use
InputParam
to defineintermediate_inputs
.user_intermediate_inputs = [ InputParam(name="processed_image", type_hint="torch.Tensor", description="image that has been preprocessed and normalized"), ]
-
intermediate_outputs
are new values created by a block and added to thePipelineState
. Theintermediate_outputs
are available asintermediate_inputs
for subsequent blocks or available as the final output from running the pipeline.Use
OutputParam
to defineintermediate_outputs
.from mindone.diffusers.modular_pipelines import OutputParam user_intermediate_outputs = [ OutputParam(name="image_latents", description="latents representing the image") ]
The intermediate inputs and outputs share data to connect blocks. They are accessible at any point, allowing you to track the workflow's progress.
Computation logic¶
The computation a block performs is defined in the __call__
method and it follows a specific structure.
- Retrieve the
BlockState
to get a local view of theinputs
andintermediate_inputs
. - Implement the computation logic on the
inputs
andintermediate_inputs
. - Update
PipelineState
to push changes from the localBlockState
back to the globalPipelineState
. - Return the components and state which becomes available to the next block.
def __call__(self, components, state):
# Get a local view of the state variables this block needs
block_state = self.get_block_state(state)
# Your computation logic here
# block_state contains all your inputs and intermediate_inputs
# Access them like: block_state.image, block_state.processed_image
# Update the pipeline state with your updated block_states
self.set_block_state(state, block_state)
return components, state
Components and configs¶
The components and pipeline-level configs a block needs are specified in ComponentSpec
and ConfigSpec
.
ComponentSpec
contains the expected components used by a block. You need thename
of the component and ideally atype_hint
that specifies exactly what the component is.ConfigSpec
contains pipeline-level settings that control behavior across all blocks.
from mindone.diffusers import ComponentSpec, ConfigSpec
expected_components = [
ComponentSpec(name="unet", type_hint=UNet2DConditionModel),
ComponentSpec(name="scheduler", type_hint=EulerDiscreteScheduler)
]
expected_config = [
ConfigSpec("force_zeros_for_empty_prompt", True)
]
When the blocks are converted into a pipeline, the components become available to the block as the first argument in __call__
.
def __call__(self, components, state):
# Access components using dot notation
unet = components.unet
vae = components.vae
scheduler = components.scheduler