ModularPipelineBlocks¶
ModularPipelineBlocks is the basic building block of a ModularPipeline. It defines the components, inputs/outputs, and computation a block should perform for a specific step in a pipeline. A ModularPipelineBlocks connects with other blocks, using state, to enable the modular construction of workflows.

A ModularPipelineBlocks on its own can't be executed. It is a blueprint for what a step should do in a pipeline. To actually run and execute a pipeline, the ModularPipelineBlocks needs to be converted into a ModularPipeline.
This guide will show you how to create a ModularPipelineBlocks.
Inputs and outputs¶
Tip
Refer to the States guide if you aren't familiar with how state works in Modular Diffusers.
A ModularPipelineBlocks defines `inputs`, `intermediate_inputs`, and `intermediate_outputs`.

- `inputs` are values provided by a user and retrieved from the `PipelineState`. This is useful because some workflows resize an image, but the original image is still required. The `PipelineState` maintains the original image.

    Use `InputParam` to define `inputs`.

    ```py
    from mindone.diffusers.modular_pipelines import InputParam

    user_inputs = [
        InputParam(name="image", type_hint="PIL.Image", description="raw input image to process")
    ]
    ```

- `intermediate_inputs` are values typically created by a previous block, but they can also be provided directly if no preceding block generates them. Unlike `inputs`, `intermediate_inputs` can be modified.

    Use `InputParam` to define `intermediate_inputs`.

    ```py
    user_intermediate_inputs = [
        InputParam(name="processed_image", type_hint="torch.Tensor", description="image that has been preprocessed and normalized"),
    ]
    ```

- `intermediate_outputs` are new values created by a block and added to the `PipelineState`. The `intermediate_outputs` are available as `intermediate_inputs` for subsequent blocks or as the final output from running the pipeline.

    Use `OutputParam` to define `intermediate_outputs`.

    ```py
    from mindone.diffusers.modular_pipelines import OutputParam

    user_intermediate_outputs = [
        OutputParam(name="image_latents", description="latents representing the image")
    ]
    ```
The intermediate inputs and outputs share data to connect blocks. They are accessible at any point, allowing you to track the workflow's progress.
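To make this data flow concrete, here is a minimal plain-Python sketch. This is not the mindone.diffusers API: a dict stands in for `PipelineState`, and the blocks are plain functions. The "encoding" arithmetic is invented purely for illustration.

```python
# Conceptual sketch only: a dict mocks PipelineState, functions mock blocks.

def encode_block(state):
    # Reads the user-provided input and writes an intermediate output.
    image = state["image"]                             # user input
    state["image_latents"] = [p * 0.5 for p in image]  # fake "encoding"

def denoise_block(state):
    # Consumes the previous block's intermediate output as its own
    # intermediate input, and adds a new intermediate output.
    latents = state["image_latents"]
    state["denoised"] = [l + 1.0 for l in latents]

state = {"image": [2.0, 4.0]}  # state seeded with user inputs
for block in (encode_block, denoise_block):
    block(state)

# The original "image" is still in state, untouched, alongside every
# intermediate value produced along the way.
print(state["denoised"])  # [2.0, 3.0]
```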
Computation logic¶
The computation a block performs is defined in the `__call__` method, and it follows a specific structure.

1. Retrieve the `BlockState` to get a local view of the `inputs` and `intermediate_inputs`.
2. Implement the computation logic on the `inputs` and `intermediate_inputs`.
3. Update the `PipelineState` to push changes from the local `BlockState` back to the global `PipelineState`.
4. Return the components and state, which become available to the next block.
```py
def __call__(self, components, state):
    # Get a local view of the state variables this block needs
    block_state = self.get_block_state(state)

    # Your computation logic here
    # block_state contains all your inputs and intermediate_inputs
    # Access them like: block_state.image, block_state.processed_image

    # Update the pipeline state with your updated block_state
    self.set_block_state(state, block_state)
    return components, state
```
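The four steps can be sketched end to end with mocked state helpers. Assumptions: the global state is a plain dict and `BlockState` is a `SimpleNamespace`; the real `get_block_state`/`set_block_state` are more involved, but the shape of the contract is the same.

```python
from types import SimpleNamespace

class ToyBlock:
    # Toy stand-in for a ModularPipelineBlocks subclass.
    inputs = ["image"]

    def get_block_state(self, state):
        # Step 1: local view containing only the variables this block declares.
        return SimpleNamespace(**{k: state[k] for k in self.inputs})

    def set_block_state(self, state, block_state):
        # Step 3: push local changes back to the global state.
        state.update(vars(block_state))

    def __call__(self, components, state):
        block_state = self.get_block_state(state)
        # Step 2: computation logic produces a new intermediate value.
        block_state.processed_image = [v * 2 for v in block_state.image]
        self.set_block_state(state, block_state)
        # Step 4: hand components and state to the next block.
        return components, state

state = {"image": [1, 2, 3]}
_, state = ToyBlock()(None, state)
print(state["processed_image"])  # [2, 4, 6]
```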
Components and configs¶
The components and pipeline-level configs a block needs are specified in ComponentSpec and ConfigSpec.

- `ComponentSpec` declares the components a block expects. Provide the `name` of the component and ideally a `type_hint` that specifies exactly what the component is.
- `ConfigSpec` contains pipeline-level settings that control behavior across all blocks.
```py
from mindone.diffusers import (
    ComponentSpec,
    ConfigSpec,
    EulerDiscreteScheduler,
    UNet2DConditionModel,
)

expected_components = [
    ComponentSpec(name="unet", type_hint=UNet2DConditionModel),
    ComponentSpec(name="scheduler", type_hint=EulerDiscreteScheduler)
]

expected_config = [
    ConfigSpec("force_zeros_for_empty_prompt", True)
]
```
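As a toy illustration of how a pipeline-level config can steer behavior inside every block, here is a sketch with the config modeled as a plain dict rather than the real ConfigSpec machinery; `encode_prompt` and its 4-element embeddings are invented stand-ins for a text-encoder step.

```python
# Toy pipeline-level config mirroring the ConfigSpec declared above.
config = {"force_zeros_for_empty_prompt": True}

def encode_prompt(prompt, config):
    # When the prompt is empty and the flag is set, return zeroed
    # embeddings instead of encoding the empty string.
    if prompt == "" and config["force_zeros_for_empty_prompt"]:
        return [0.0] * 4
    return [1.0] * 4  # stand-in for a real text-encoder forward pass

print(encode_prompt("", config))       # [0.0, 0.0, 0.0, 0.0]
print(encode_prompt("a cat", config))  # [1.0, 1.0, 1.0, 1.0]
```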
When the blocks are converted into a pipeline, the components become available to the block as the first argument of `__call__`.
```py
def __call__(self, components, state):
    # Access components using dot notation
    unet = components.unet
    vae = components.vae
    scheduler = components.scheduler
```
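To show the dot-notation access end to end, here is a minimal plain-Python sketch; the components container is modeled as a `SimpleNamespace` with a fake UNet, not the real mindone.diffusers components manager.

```python
from types import SimpleNamespace

class ToyDenoiseBlock:
    # Toy stand-in for a ModularPipelineBlocks subclass: pulls its model
    # out of the components container passed as the first argument.
    def __call__(self, components, state):
        unet = components.unet  # dot-notation lookup by ComponentSpec name
        state["noise_pred"] = unet(state["latents"])
        return components, state

# Fake components keyed by the names declared via ComponentSpec.
components = SimpleNamespace(
    unet=lambda latents: [x * 2 for x in latents],  # fake UNet forward pass
    scheduler=object(),
)

state = {"latents": [1, 2]}
_, state = ToyDenoiseBlock()(components, state)
print(state["noise_pred"])  # [2, 4]
```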