Skip to content

Pipeline blocks

mindone.diffusers.modular_pipelines.modular_pipeline.ModularPipelineBlocks

Bases: ConfigMixin, PushToHubMixin

Base class for all Pipeline Blocks: PipelineBlock, AutoPipelineBlocks, SequentialPipelineBlocks, LoopSequentialPipelineBlocks

[ModularPipelineBlocks] provides methods to load and save the definition of pipeline blocks.

This is an experimental feature and is likely to change in the future.

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
class ModularPipelineBlocks(ConfigMixin, PushToHubMixin):
    """
    Base class for all Pipeline Blocks: PipelineBlock, AutoPipelineBlocks, SequentialPipelineBlocks,
    LoopSequentialPipelineBlocks

    [`ModularPipelineBlocks`] provides methods to load and save the definition of pipeline blocks.

    <Tip warning={true}>

        This is an experimental feature and is likely to change in the future.

    </Tip>
    """

    # File name used by `ConfigMixin` when (de)serializing this block's configuration.
    config_name = "modular_config.json"
    # Model family identifier; subclasses set this so `init_pipeline` can pick a pipeline class.
    model_name = None

    @classmethod
    def _get_signature_keys(cls, obj):
        """Split `obj.__init__`'s parameters into required and optional names.

        Returns:
            Tuple[set, set]: `(expected_modules, optional_parameters)` where
            `expected_modules` are the names of parameters without a default
            (minus `self`) and `optional_parameters` are names with a default.
        """
        parameters = inspect.signature(obj.__init__).parameters
        required_parameters = {k: v for k, v in parameters.items() if v.default == inspect._empty}
        optional_parameters = set({k for k, v in parameters.items() if v.default != inspect._empty})
        expected_modules = set(required_parameters.keys()) - {"self"}

        return expected_modules, optional_parameters

    def __init__(self):
        # Ordered mapping of sub-block name -> sub-block instance; empty for leaf blocks.
        self.sub_blocks = InsertableDict()

    @property
    def description(self) -> str:
        """Description of the block. Must be implemented by subclasses."""
        return ""

    @property
    def expected_components(self) -> List[ComponentSpec]:
        """Component specs this block needs; subclasses override."""
        return []

    @property
    def expected_configs(self) -> List[ConfigSpec]:
        """Config specs this block needs; subclasses override."""
        return []

    @property
    def inputs(self) -> List[InputParam]:
        """List of input parameters. Must be implemented by subclasses."""
        return []

    def _get_required_inputs(self):
        # Names of inputs flagged `required=True`.
        input_names = []
        for input_param in self.inputs:
            if input_param.required:
                input_names.append(input_param.name)

        return input_names

    @property
    def required_inputs(self) -> List[InputParam]:
        """Names of the inputs that must be provided to run this block."""
        return self._get_required_inputs()

    @property
    def intermediate_outputs(self) -> List[OutputParam]:
        """List of intermediate output parameters. Must be implemented by subclasses."""
        return []

    def _get_outputs(self):
        return self.intermediate_outputs

    @property
    def outputs(self) -> List[OutputParam]:
        """Outputs of the block; currently an alias for `intermediate_outputs`."""
        return self._get_outputs()

    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path: str,
        trust_remote_code: Optional[bool] = None,
        **kwargs,
    ):
        """Load a pipeline-blocks class from a repo that ships custom code.

        The repository's config must contain an `auto_map` entry for this class,
        pointing at `"<module>.<class_name>"`. Because arbitrary code is imported
        and executed, `trust_remote_code` must resolve to True.

        Args:
            pretrained_model_name_or_path: Hub repo id or local path.
            trust_remote_code: Whether to allow executing the repo's custom code.
            **kwargs: Hub download options plus constructor kwargs for the block class.

        Raises:
            ValueError: If the repo has no custom code entry or remote code is not trusted.
        """
        hub_kwargs_names = [
            "cache_dir",
            "force_download",
            "local_files_only",
            "proxies",
            "resume_download",
            "revision",
            "subfolder",
            "token",
        ]
        # Separate hub download options from the kwargs meant for the block constructor.
        hub_kwargs = {name: kwargs.pop(name) for name in hub_kwargs_names if name in kwargs}

        config = cls.load_config(pretrained_model_name_or_path)
        has_remote_code = "auto_map" in config and cls.__name__ in config["auto_map"]
        trust_remote_code = resolve_trust_remote_code(trust_remote_code, pretrained_model_name_or_path, has_remote_code)
        if not (has_remote_code and trust_remote_code):
            raise ValueError(
                "Selected model repository does not appear to have any custom code or does not have a valid `config.json` file."
            )

        class_ref = config["auto_map"][cls.__name__]
        module_file, class_name = class_ref.split(".")
        module_file = module_file + ".py"
        block_cls = get_class_from_dynamic_module(
            pretrained_model_name_or_path,
            module_file=module_file,
            class_name=class_name,
            **hub_kwargs,
            **kwargs,
        )
        # Forward only kwargs the dynamically loaded class actually accepts.
        expected_kwargs, optional_kwargs = block_cls._get_signature_keys(block_cls)
        block_kwargs = {name: kwargs.pop(name) for name in kwargs if name in expected_kwargs or name in optional_kwargs}

        return block_cls(**block_kwargs)

    def save_pretrained(self, save_directory, push_to_hub=False, **kwargs):
        """Save this block's config (with an `auto_map` entry) so it can be reloaded
        with `from_pretrained`, optionally pushing to the Hub."""
        # TODO: factor out this logic.
        cls_name = self.__class__.__name__

        # Strip the dynamic-module marker so auto_map points at the plain module name.
        full_mod = type(self).__module__
        module = full_mod.rsplit(".", 1)[-1].replace("__dynamic__", "")
        parent_module = self.save_pretrained.__func__.__qualname__.split(".", 1)[0]
        auto_map = {f"{parent_module}": f"{module}.{cls_name}"}

        self.register_to_config(auto_map=auto_map)
        self.save_config(save_directory=save_directory, push_to_hub=push_to_hub, **kwargs)
        # Re-freeze the config so the in-memory state matches what was written out.
        config = dict(self.config)
        self._internal_dict = FrozenDict(config)

    def init_pipeline(
        self,
        pretrained_model_name_or_path: Optional[Union[str, os.PathLike]] = None,
        components_manager: Optional[ComponentsManager] = None,
        collection: Optional[str] = None,
    ) -> "ModularPipeline":
        """
        create a ModularPipeline, optionally accept modular_repo to load from hub.
        """
        # Resolve the pipeline class for this model family, defaulting to ModularPipeline.
        pipeline_class_name = MODULAR_PIPELINE_MAPPING.get(self.model_name, ModularPipeline.__name__)
        diffusers_module = importlib.import_module("mindone.diffusers")
        pipeline_class = getattr(diffusers_module, pipeline_class_name)

        # Deep-copy the blocks so the pipeline owns an independent definition.
        modular_pipeline = pipeline_class(
            blocks=deepcopy(self),
            pretrained_model_name_or_path=pretrained_model_name_or_path,
            components_manager=components_manager,
            collection=collection,
        )
        return modular_pipeline

    def get_block_state(self, state: PipelineState) -> dict:
        """Get all inputs and intermediates in one dictionary"""
        data = {}
        state_inputs = self.inputs

        # Check inputs
        for input_param in state_inputs:
            if input_param.name:
                value = state.get(input_param.name)
                if input_param.required and value is None:
                    raise ValueError(f"Required input '{input_param.name}' is missing")
                elif value is not None or (value is None and input_param.name not in data):
                    data[input_param.name] = value

            elif input_param.kwargs_type:
                # if kwargs_type is provided, get all inputs with matching kwargs_type
                if input_param.kwargs_type not in data:
                    data[input_param.kwargs_type] = {}
                inputs_kwargs = state.get_by_kwargs(input_param.kwargs_type)
                if inputs_kwargs:
                    # Expose each value both flat and grouped under the kwargs_type key.
                    for k, v in inputs_kwargs.items():
                        if v is not None:
                            data[k] = v
                            data[input_param.kwargs_type][k] = v

        return BlockState(**data)

    def set_block_state(self, state: PipelineState, block_state: BlockState):
        """Write a block's outputs (and any modified inputs) back into the pipeline state.

        Raises:
            ValueError: If a declared intermediate output is missing from `block_state`.
        """
        for output_param in self.intermediate_outputs:
            if not hasattr(block_state, output_param.name):
                raise ValueError(f"Intermediate output '{output_param.name}' is missing in block state")
            param = getattr(block_state, output_param.name)
            state.set(output_param.name, param, output_param.kwargs_type)

        for input_param in self.inputs:
            if input_param.name and hasattr(block_state, input_param.name):
                param = getattr(block_state, input_param.name)
                # Only add if the value is different from what's in the state
                current_value = state.get(input_param.name)
                if current_value is not param:  # Using identity comparison to check if object was modified
                    state.set(input_param.name, param, input_param.kwargs_type)

            elif input_param.kwargs_type:
                # if it is a kwargs type, e.g. "guider_input_fields", it is likely to be a list of parameters
                # we need to first find out which inputs are and loop through them.
                intermediate_kwargs = state.get_by_kwargs(input_param.kwargs_type)
                for param_name, current_value in intermediate_kwargs.items():
                    if param_name is None:
                        continue

                    if not hasattr(block_state, param_name):
                        continue

                    param = getattr(block_state, param_name)
                    if current_value is not param:  # Using identity comparison to check if object was modified
                        state.set(param_name, param, input_param.kwargs_type)

    @staticmethod
    def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> List[InputParam]:
        """
        Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if
        current default value is None and new default value is not None. Warns if multiple non-None default values
        exist for the same input.

        Args:
            named_input_lists: List of tuples containing (block_name, input_param_list) pairs

        Returns:
            List[InputParam]: Combined list of unique InputParam objects
        """
        combined_dict = {}  # name -> InputParam
        value_sources = {}  # name -> block_name

        for block_name, inputs in named_input_lists:
            for input_param in inputs:
                # Anonymous kwargs-style inputs are keyed by "*_<kwargs_type>".
                if input_param.name is None and input_param.kwargs_type is not None:
                    input_name = "*_" + input_param.kwargs_type
                else:
                    input_name = input_param.name
                if input_name in combined_dict:
                    current_param = combined_dict[input_name]
                    if (
                        current_param.default is not None
                        and input_param.default is not None
                        and current_param.default != input_param.default
                    ):
                        warnings.warn(
                            f"Multiple different default values found for input '{input_name}': "
                            f"{current_param.default} (from block '{value_sources[input_name]}') and "
                            f"{input_param.default} (from block '{block_name}'). Using {current_param.default}."
                        )
                    if current_param.default is None and input_param.default is not None:
                        combined_dict[input_name] = input_param
                        value_sources[input_name] = block_name
                else:
                    combined_dict[input_name] = input_param
                    value_sources[input_name] = block_name

        return list(combined_dict.values())

    @staticmethod
    def combine_outputs(*named_output_lists: List[Tuple[str, List[OutputParam]]]) -> List[OutputParam]:
        """
        Combines multiple lists of OutputParam objects from different blocks. For duplicate outputs, keeps the first
        occurrence of each output name.

        Args:
            named_output_lists: List of tuples containing (block_name, output_param_list) pairs

        Returns:
            List[OutputParam]: Combined list of unique OutputParam objects
        """
        combined_dict = {}  # name -> OutputParam

        for block_name, outputs in named_output_lists:
            for output_param in outputs:
                # First occurrence wins, unless a later duplicate adds a kwargs_type
                # that the kept one lacks.
                if (output_param.name not in combined_dict) or (
                    combined_dict[output_param.name].kwargs_type is None and output_param.kwargs_type is not None
                ):
                    combined_dict[output_param.name] = output_param

        return list(combined_dict.values())

    @property
    def input_names(self) -> List[str]:
        """Names of all declared inputs."""
        return [input_param.name for input_param in self.inputs]

    @property
    def intermediate_output_names(self) -> List[str]:
        """Names of all declared intermediate outputs."""
        return [output_param.name for output_param in self.intermediate_outputs]

    @property
    def output_names(self) -> List[str]:
        """Names of all outputs."""
        return [output_param.name for output_param in self.outputs]

    @property
    def doc(self):
        """Auto-generated documentation string for this block."""
        return make_doc_string(
            self.inputs,
            self.outputs,
            self.description,
            class_name=self.__class__.__name__,
            expected_components=self.expected_components,
            expected_configs=self.expected_configs,
        )

mindone.diffusers.modular_pipelines.modular_pipeline.ModularPipelineBlocks.description property

Description of the block. Must be implemented by subclasses.

mindone.diffusers.modular_pipelines.modular_pipeline.ModularPipelineBlocks.inputs property

List of input parameters. Must be implemented by subclasses.

mindone.diffusers.modular_pipelines.modular_pipeline.ModularPipelineBlocks.intermediate_outputs property

List of intermediate output parameters. Must be implemented by subclasses.

mindone.diffusers.modular_pipelines.modular_pipeline.ModularPipelineBlocks.combine_inputs(*named_input_lists) staticmethod

Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if current default value is None and new default value is not None. Warns if multiple non-None default values exist for the same input.

PARAMETER DESCRIPTION
named_input_lists

List of tuples containing (block_name, input_param_list) pairs

TYPE: List[Tuple[str, List[InputParam]]] DEFAULT: ()

RETURNS DESCRIPTION
List[InputParam]

List[InputParam]: Combined list of unique InputParam objects

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
@staticmethod
def combine_inputs(*named_input_lists: List[Tuple[str, List[InputParam]]]) -> List[InputParam]:
    """
    Combines multiple lists of InputParam objects from different blocks. For duplicate inputs, updates only if
    current default value is None and new default value is not None. Warns if multiple non-None default values
    exist for the same input.

    Args:
        named_input_lists: List of tuples containing (block_name, input_param_list) pairs

    Returns:
        List[InputParam]: Combined list of unique InputParam objects
    """
    combined_dict = {}  # name -> InputParam
    value_sources = {}  # name -> block_name

    for block_name, inputs in named_input_lists:
        for input_param in inputs:
            if input_param.name is None and input_param.kwargs_type is not None:
                input_name = "*_" + input_param.kwargs_type
            else:
                input_name = input_param.name
            if input_name in combined_dict:
                current_param = combined_dict[input_name]
                if (
                    current_param.default is not None
                    and input_param.default is not None
                    and current_param.default != input_param.default
                ):
                    warnings.warn(
                        f"Multiple different default values found for input '{input_name}': "
                        f"{current_param.default} (from block '{value_sources[input_name]}') and "
                        f"{input_param.default} (from block '{block_name}'). Using {current_param.default}."
                    )
                if current_param.default is None and input_param.default is not None:
                    combined_dict[input_name] = input_param
                    value_sources[input_name] = block_name
            else:
                combined_dict[input_name] = input_param
                value_sources[input_name] = block_name

    return list(combined_dict.values())

mindone.diffusers.modular_pipelines.modular_pipeline.ModularPipelineBlocks.combine_outputs(*named_output_lists) staticmethod

Combines multiple lists of OutputParam objects from different blocks. For duplicate outputs, keeps the first occurrence of each output name.

PARAMETER DESCRIPTION
named_output_lists

List of tuples containing (block_name, output_param_list) pairs

TYPE: List[Tuple[str, List[OutputParam]]] DEFAULT: ()

RETURNS DESCRIPTION
List[OutputParam]

List[OutputParam]: Combined list of unique OutputParam objects

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
@staticmethod
def combine_outputs(*named_output_lists: List[Tuple[str, List[OutputParam]]]) -> List[OutputParam]:
    """
    Combines multiple lists of OutputParam objects from different blocks. For duplicate outputs, keeps the first
    occurrence of each output name.

    Args:
        named_output_lists: List of tuples containing (block_name, output_param_list) pairs

    Returns:
        List[OutputParam]: Combined list of unique OutputParam objects
    """
    combined_dict = {}  # name -> OutputParam

    for block_name, outputs in named_output_lists:
        for output_param in outputs:
            if (output_param.name not in combined_dict) or (
                combined_dict[output_param.name].kwargs_type is None and output_param.kwargs_type is not None
            ):
                combined_dict[output_param.name] = output_param

    return list(combined_dict.values())

mindone.diffusers.modular_pipelines.modular_pipeline.ModularPipelineBlocks.get_block_state(state)

Get all inputs and intermediates in one dictionary

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def get_block_state(self, state: PipelineState) -> dict:
    """Get all inputs and intermediates in one dictionary"""
    collected = {}

    for param in self.inputs:
        if param.name:
            value = state.get(param.name)
            # Required inputs must be present in the pipeline state.
            if param.required and value is None:
                raise ValueError(f"Required input '{param.name}' is missing")
            if value is not None or param.name not in collected:
                collected[param.name] = value

        elif param.kwargs_type:
            # Named-group input: gather every state entry tagged with this kwargs_type,
            # exposing each value both flat and grouped under the kwargs_type key.
            group = collected.setdefault(param.kwargs_type, {})
            matches = state.get_by_kwargs(param.kwargs_type)
            if matches:
                for key, val in matches.items():
                    if val is not None:
                        collected[key] = val
                        group[key] = val

    return BlockState(**collected)

mindone.diffusers.modular_pipelines.modular_pipeline.ModularPipelineBlocks.init_pipeline(pretrained_model_name_or_path=None, components_manager=None, collection=None)

create a ModularPipeline, optionally accept modular_repo to load from hub.

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def init_pipeline(
    self,
    pretrained_model_name_or_path: Optional[Union[str, os.PathLike]] = None,
    components_manager: Optional[ComponentsManager] = None,
    collection: Optional[str] = None,
) -> "ModularPipeline":
    """
    create a ModularPipeline, optionally accept modular_repo to load from hub.
    """
    # Resolve the concrete pipeline class for this model family, falling back
    # to the generic ModularPipeline.
    class_name = MODULAR_PIPELINE_MAPPING.get(self.model_name, ModularPipeline.__name__)
    pipeline_cls = getattr(importlib.import_module("mindone.diffusers"), class_name)

    # Deep-copy the blocks so the new pipeline owns an independent definition.
    return pipeline_cls(
        blocks=deepcopy(self),
        pretrained_model_name_or_path=pretrained_model_name_or_path,
        components_manager=components_manager,
        collection=collection,
    )

mindone.diffusers.modular_pipelines.modular_pipeline.SequentialPipelineBlocks

Bases: ModularPipelineBlocks

A Pipeline Blocks that combines multiple pipeline block classes into one. When called, it will call each block in sequence.

This class inherits from [ModularPipelineBlocks]. Check the superclass documentation for the generic methods the library implements for all the pipeline blocks (such as loading or saving etc.)

This is an experimental feature and is likely to change in the future.

ATTRIBUTE DESCRIPTION
block_classes

List of block classes to be used

block_names

List of prefixes for each block

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
class SequentialPipelineBlocks(ModularPipelineBlocks):
    """
    A Pipeline Blocks that combines multiple pipeline block classes into one. When called, it will call each block in
    sequence.

    This class inherits from [`ModularPipelineBlocks`]. Check the superclass documentation for the generic methods the
    library implements for all the pipeline blocks (such as loading or saving etc.)

    <Tip warning={true}>

        This is an experimental feature and is likely to change in the future.

    </Tip>

    Attributes:
        block_classes: List of block classes to be used
        block_names: List of prefixes for each block
    """

    # Parallel lists (same length, same order) that subclasses override:
    # the block classes to compose, and the name assigned to each sub-block.
    block_classes = []
    block_names = []

    @property
    def description(self):
        """Human-readable description of this composed block (empty by default)."""
        return ""

    @property
    def model_name(self):
        # Adopt the model name of the first sub-block that declares one.
        for sub_block in self.sub_blocks.values():
            if sub_block.model_name is not None:
                return sub_block.model_name
        return None

    @property
    def expected_components(self):
        # Union of component specs across sub-blocks, first occurrence wins,
        # order of first appearance preserved.
        collected = []
        for sub_block in self.sub_blocks.values():
            for spec in sub_block.expected_components:
                if spec not in collected:
                    collected.append(spec)
        return collected

    @property
    def expected_configs(self):
        # Union of config specs across sub-blocks, duplicates dropped while
        # keeping first-seen order.
        collected = []
        for sub_block in self.sub_blocks.values():
            for spec in sub_block.expected_configs:
                if spec not in collected:
                    collected.append(spec)
        return collected

    @classmethod
    def from_blocks_dict(cls, blocks_dict: Dict[str, Any]) -> "SequentialPipelineBlocks":
        """Creates a SequentialPipelineBlocks instance from a dictionary of blocks.

        Args:
            blocks_dict: Dictionary mapping block names to block classes or instances

        Returns:
            A new SequentialPipelineBlocks instance
        """
        instance = cls()

        # Instantiate any entries passed as classes so sub_blocks holds instances only.
        sub_blocks = InsertableDict()
        for name, block_or_cls in blocks_dict.items():
            sub_blocks[name] = block_or_cls() if inspect.isclass(block_or_cls) else block_or_cls

        instance.block_classes = [b.__class__ for b in sub_blocks.values()]
        instance.block_names = list(sub_blocks.keys())
        instance.sub_blocks = sub_blocks
        return instance

    def __init__(self):
        # Instantiate each configured block class under its configured name,
        # preserving the declared order.
        blocks = InsertableDict()
        for name, block_cls in zip(self.block_names, self.block_classes):
            blocks[name] = block_cls()
        self.sub_blocks = blocks

    def _get_inputs(self):
        """Collect the sequence's external inputs: anything a sub-block reads that
        no earlier (non-skippable) sub-block has already produced."""
        collected = []
        produced = set()

        for sub_block in self.sub_blocks.values():
            seen = {p.name for p in collected}
            for param in sub_block.inputs:
                if param.name not in produced and param.name not in seen:
                    collected.append(param)
                    seen.add(param.name)

            # A conditional (auto) block without a default (None) branch may be
            # skipped entirely, so its outputs cannot satisfy later inputs.
            skippable = hasattr(sub_block, "block_trigger_inputs") and None not in sub_block.block_trigger_inputs
            if not skippable:
                produced.update(out.name for out in sub_block.intermediate_outputs)

        return collected

    # YiYi TODO: add test for this
    @property
    def inputs(self) -> List[Tuple[str, Any]]:
        """External input parameters required to run the whole sequence."""
        return self._get_inputs()

    @property
    def required_inputs(self) -> List[str]:
        """An input is required for the sequence if any sub-block requires it."""
        block_iter = iter(self.sub_blocks.values())
        # Seed with the first block's requirements, then fold in the rest.
        required_by_any = set(getattr(next(block_iter), "required_inputs", set()))
        for sub_block in block_iter:
            required_by_any.update(getattr(sub_block, "required_inputs", set()))
        return list(required_by_any)

    @property
    def intermediate_outputs(self) -> List[str]:
        # Merge every sub-block's intermediate outputs with combine_outputs
        # (first occurrence wins unless a later duplicate adds a kwargs_type).
        named_outputs = []
        for name, block in self.sub_blocks.items():
            inp_names = {inp.name for inp in block.inputs}
            # so we only need to list new variables as intermediate_outputs,
            # but if user wants to list these they modified it's still fine (a.k.a we don't enforce)
            # filter out them here so they do not end up as intermediate_outputs
            # NOTE(review): this compares the sub-block *name* against the block's
            # *input parameter* names, so it only skips a block whose name happens
            # to collide with one of its own inputs — confirm this is intended.
            if name not in inp_names:
                named_outputs.append((name, block.intermediate_outputs))
        combined_outputs = self.combine_outputs(*named_outputs)
        return combined_outputs

    # YiYi TODO: I think we can remove the outputs property
    @property
    def outputs(self) -> List[str]:
        """Alias for `intermediate_outputs`; kept for interface compatibility."""
        # return next(reversed(self.sub_blocks.values())).intermediate_outputs
        return self.intermediate_outputs

    def __call__(self, pipeline, state: PipelineState) -> PipelineState:
        """Run every sub-block in order, threading (pipeline, state) through each;
        logs and re-raises any failure with the offending block's name."""
        for name, sub_block in self.sub_blocks.items():
            try:
                pipeline, state = sub_block(pipeline, state)
            except Exception as e:
                logger.error(
                    f"\nError in block: ({name}, {sub_block.__class__.__name__})\n"
                    f"Error details: {str(e)}\n"
                    f"Traceback:\n{traceback.format_exc()}"
                )
                raise
        return pipeline, state

    def _get_trigger_inputs(self):
        """
        Returns a set of all unique trigger input values found in the blocks. Returns: Set[str] containing all unique
        block_trigger_inputs values
        """

        def collect(blocks):
            found = set()
            if blocks is None:
                return found
            for sub_block in blocks.values():
                # An auto block advertises its triggers via block_trigger_inputs.
                triggers = getattr(sub_block, "block_trigger_inputs", None)
                if triggers is not None:
                    found.update(t for t in triggers if t is not None)
                # Recurse into nested composite blocks.
                if sub_block.sub_blocks:
                    found |= collect(sub_block.sub_blocks)
            return found

        return collect(self.sub_blocks)

    @property
    def trigger_inputs(self):
        """Every trigger-input name declared anywhere in this block tree."""
        return self._get_trigger_inputs()

    def _traverse_trigger_blocks(self, trigger_inputs):
        """Resolve which leaf blocks would actually execute for the given triggers.

        Walks the block tree, choosing the matching branch of every conditional
        (auto) block based on the active triggers, and returns an OrderedDict
        mapping dotted block path -> leaf PipelineBlock. Outputs of selected
        blocks are added to the active triggers as traversal proceeds.
        """
        # Convert trigger_inputs to a set for easier manipulation
        active_triggers = set(trigger_inputs)

        def fn_recursive_traverse(block, block_name, active_triggers):
            result_blocks = OrderedDict()

            # sequential(include loopsequential) or PipelineBlock
            if not hasattr(block, "block_trigger_inputs"):
                if block.sub_blocks:
                    # sequential or LoopSequentialPipelineBlocks (keep traversing)
                    for sub_block_name, sub_block in block.sub_blocks.items():
                        # Bug fix: this recursive call was previously duplicated,
                        # doing the whole traversal twice and discarding the first result.
                        blocks_to_update = fn_recursive_traverse(sub_block, sub_block_name, active_triggers)
                        blocks_to_update = {f"{block_name}.{k}": v for k, v in blocks_to_update.items()}
                        result_blocks.update(blocks_to_update)
                else:
                    # PipelineBlock
                    result_blocks[block_name] = block
                    # Add this block's output names to active triggers if defined
                    if hasattr(block, "outputs"):
                        active_triggers.update(out.name for out in block.outputs)
                return result_blocks

            # auto
            else:
                # Find first block_trigger_input that matches any value in our active_triggers
                this_block = None
                for trigger_input in block.block_trigger_inputs:
                    if trigger_input is not None and trigger_input in active_triggers:
                        this_block = block.trigger_to_block_map[trigger_input]
                        break

                # If no matches found, try to get the default (None) block
                if this_block is None and None in block.block_trigger_inputs:
                    this_block = block.trigger_to_block_map[None]

                if this_block is not None:
                    # sequential/auto (keep traversing)
                    if this_block.sub_blocks:
                        result_blocks.update(fn_recursive_traverse(this_block, block_name, active_triggers))
                    else:
                        # PipelineBlock
                        result_blocks[block_name] = this_block
                        # Add this block's output names to active triggers if defined
                        # YiYi TODO: do we need outputs here? can it just be intermediate_outputs? can we get rid of outputs attribute?
                        if hasattr(this_block, "outputs"):
                            active_triggers.update(out.name for out in this_block.outputs)

            return result_blocks

        all_blocks = OrderedDict()
        for block_name, block in self.sub_blocks.items():
            blocks_to_update = fn_recursive_traverse(block, block_name, active_triggers)
            all_blocks.update(blocks_to_update)
        return all_blocks

    def get_execution_blocks(self, *trigger_inputs):
        """Return a `SequentialPipelineBlocks` of the blocks that would actually run.

        Args:
            *trigger_inputs: Names of inputs assumed to be provided at runtime.
                Unsupported names are ignored with a warning. When no (valid)
                names remain, a default is chosen: ``None`` if the pipeline
                defines a default block, otherwise an arbitrary known trigger
                (``trigger_inputs`` is collected into a set, so there is no
                defined "first" element).

        Returns:
            A `SequentialPipelineBlocks` assembled from the selected blocks.
        """
        trigger_inputs_all = self.trigger_inputs

        # `*trigger_inputs` always binds a tuple, never None, so test emptiness.
        # (The previous `is not None` / `is None` checks made the default-selection
        # branch below unreachable dead code.)
        if trigger_inputs:
            invalid_inputs = [x for x in trigger_inputs if x not in trigger_inputs_all]
            if invalid_inputs:
                logger.warning(
                    f"The following trigger inputs will be ignored as they are not supported: {invalid_inputs}"
                )
                trigger_inputs = [x for x in trigger_inputs if x in trigger_inputs_all]

        if not trigger_inputs:
            if None in trigger_inputs_all:
                trigger_inputs = [None]
            else:
                # trigger_inputs_all is a set: indexing (`[0]`) would raise
                # TypeError, so take an arbitrary member instead; fall back to
                # [None] when there are no triggers at all.
                trigger_inputs = [next(iter(trigger_inputs_all), None)]
        blocks_triggered = self._traverse_trigger_blocks(trigger_inputs)
        return SequentialPipelineBlocks.from_blocks_dict(blocks_triggered)

    def __repr__(self):
        """Return a multi-line, human-readable summary of this composite block.

        The summary contains: a class/base-class header, an optional banner about
        runtime-selected (trigger) blocks, the description, the expected
        components and configs, and one entry per sub-block with its description.
        """
        class_name = self.__class__.__name__
        base_class = self.__class__.__bases__[0].__name__
        # Only show the "Class:" line when the direct base class is informative.
        header = (
            f"{class_name}(\n  Class: {base_class}\n" if base_class and base_class != "object" else f"{class_name}(\n"
        )

        if self.trigger_inputs:
            header += "\n"
            header += "  " + "=" * 100 + "\n"
            header += "  This pipeline contains blocks that are selected at runtime based on inputs.\n"
            header += f"  Trigger Inputs: {[inp for inp in self.trigger_inputs if inp is not None]}\n"
            # Get first trigger input as example
            # NOTE(review): `next` without a default raises StopIteration if
            # trigger_inputs is truthy but contains only None — presumably a
            # non-None trigger always exists whenever this branch runs; confirm.
            example_input = next(t for t in self.trigger_inputs if t is not None)
            header += f"  Use `get_execution_blocks()` with input names to see selected blocks (e.g. `get_execution_blocks('{example_input}')`).\n"
            header += "  " + "=" * 100 + "\n\n"

        # Format description with proper indentation
        desc_lines = self.description.split("\n")
        desc = []
        # First line with "Description:" label
        desc.append(f"  Description: {desc_lines[0]}")
        # Subsequent lines with proper indentation
        if len(desc_lines) > 1:
            desc.extend(f"      {line}" for line in desc_lines[1:])
        desc = "\n".join(desc) + "\n"

        # Components section - focus only on expected components
        expected_components = getattr(self, "expected_components", [])
        components_str = format_components(expected_components, indent_level=2, add_empty_lines=False)

        # Configs section - use format_configs with add_empty_lines=False
        expected_configs = getattr(self, "expected_configs", [])
        configs_str = format_configs(expected_configs, indent_level=2, add_empty_lines=False)

        # Blocks section - moved to the end with simplified format
        blocks_str = "  Sub-Blocks:\n"
        for i, (name, block) in enumerate(self.sub_blocks.items()):
            # Get trigger input for this block
            trigger = None
            # `block_to_trigger_map` only exists on AutoPipelineBlocks-style
            # containers; its presence selects the bullet-point format below.
            if hasattr(self, "block_to_trigger_map"):
                trigger = self.block_to_trigger_map.get(name)
                # Format the trigger info
                if trigger is None:
                    trigger_str = "[default]"
                elif isinstance(trigger, (list, tuple)):
                    trigger_str = f"[trigger: {', '.join(str(t) for t in trigger)}]"
                else:
                    trigger_str = f"[trigger: {trigger}]"
                # For AutoPipelineBlocks, add bullet points
                blocks_str += f"    • {name} {trigger_str} ({block.__class__.__name__})\n"
            else:
                # For SequentialPipelineBlocks, show execution order
                blocks_str += f"    [{i}] {name} ({block.__class__.__name__})\n"

            # Add block description
            desc_lines = block.description.split("\n")
            indented_desc = desc_lines[0]
            if len(desc_lines) > 1:
                indented_desc += "\n" + "\n".join("                   " + line for line in desc_lines[1:])
            blocks_str += f"       Description: {indented_desc}\n\n"

        # Build the representation with conditional sections
        result = f"{header}\n{desc}"

        # Only add components section if it has content
        if components_str.strip():
            result += f"\n\n{components_str}"

        # Only add configs section if it has content
        if configs_str.strip():
            result += f"\n\n{configs_str}"

        # Always add blocks section
        result += f"\n\n{blocks_str})"

        return result

    @property
    def doc(self):
        """Auto-generated documentation string describing this block's interface."""
        return make_doc_string(
            self.inputs,
            self.outputs,
            self.description,
            class_name=type(self).__name__,
            expected_components=self.expected_components,
            expected_configs=self.expected_configs,
        )

mindone.diffusers.modular_pipelines.modular_pipeline.SequentialPipelineBlocks.from_blocks_dict(blocks_dict) classmethod

Creates a SequentialPipelineBlocks instance from a dictionary of blocks.

PARAMETER DESCRIPTION
blocks_dict

Dictionary mapping block names to block classes or instances

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
SequentialPipelineBlocks

A new SequentialPipelineBlocks instance

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
@classmethod
def from_blocks_dict(cls, blocks_dict: Dict[str, Any]) -> "SequentialPipelineBlocks":
    """Creates a SequentialPipelineBlocks instance from a dictionary of blocks.

    Args:
        blocks_dict: Dictionary mapping block names to block classes or instances

    Returns:
        A new SequentialPipelineBlocks instance
    """
    instance = cls()

    # Normalize entries: anything passed as a class is instantiated here.
    sub_blocks = InsertableDict()
    for name, candidate in blocks_dict.items():
        sub_blocks[name] = candidate() if inspect.isclass(candidate) else candidate

    instance.block_classes = [type(b) for b in sub_blocks.values()]
    instance.block_names = list(sub_blocks.keys())
    instance.sub_blocks = sub_blocks
    return instance

mindone.diffusers.modular_pipelines.modular_pipeline.LoopSequentialPipelineBlocks

Bases: ModularPipelineBlocks

A pipeline block that combines multiple pipeline block classes into a for loop. When called, it calls each block in sequence.

This class inherits from [ModularPipelineBlocks]. Check the superclass documentation for the generic methods the library implements for all the pipeline blocks (such as loading or saving etc.)

This is an experimental feature and is likely to change in the future.

ATTRIBUTE DESCRIPTION
block_classes

List of block classes to be used

block_names

List of prefixes for each block

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
class LoopSequentialPipelineBlocks(ModularPipelineBlocks):
    """
    A Pipeline blocks that combines multiple pipeline block classes into a For Loop. When called, it will call each
    block in sequence.

    This class inherits from [`ModularPipelineBlocks`]. Check the superclass documentation for the generic methods the
    library implements for all the pipeline blocks (such as loading or saving etc.)

    <Tip warning={true}>

        This is an experimental feature and is likely to change in the future.

    </Tip>

    Attributes:
        block_classes: List of block classes to be used
        block_names: List of prefixes for each block
    """

    model_name = None
    block_classes = []
    block_names = []

    @property
    def description(self) -> str:
        """Description of the block. Must be implemented by subclasses."""
        raise NotImplementedError("description method must be implemented in subclasses")

    @property
    def loop_expected_components(self) -> List[ComponentSpec]:
        """Components required by the loop itself, in addition to its sub-blocks'."""
        return []

    @property
    def loop_expected_configs(self) -> List[ConfigSpec]:
        """Configs required by the loop itself, in addition to its sub-blocks'."""
        return []

    @property
    def loop_inputs(self) -> List[InputParam]:
        """List of input parameters. Must be implemented by subclasses."""
        return []

    @property
    def loop_required_inputs(self) -> List[str]:
        """Names of the loop-level inputs that are marked as required."""
        input_names = []
        for input_param in self.loop_inputs:
            if input_param.required:
                input_names.append(input_param.name)
        return input_names

    @property
    def loop_intermediate_outputs(self) -> List[OutputParam]:
        """List of intermediate output parameters. Must be implemented by subclasses."""
        return []

    # modified from SequentialPipelineBlocks to include loop_expected_components
    @property
    def expected_components(self):
        """Deduplicated components expected by all sub-blocks plus the loop itself."""
        expected_components = []
        for block in self.sub_blocks.values():
            for component in block.expected_components:
                if component not in expected_components:
                    expected_components.append(component)
        for component in self.loop_expected_components:
            if component not in expected_components:
                expected_components.append(component)
        return expected_components

    # modified from SequentialPipelineBlocks to include loop_expected_configs
    @property
    def expected_configs(self):
        """Deduplicated configs expected by all sub-blocks plus the loop itself."""
        expected_configs = []
        for block in self.sub_blocks.values():
            for config in block.expected_configs:
                if config not in expected_configs:
                    expected_configs.append(config)
        for config in self.loop_expected_configs:
            if config not in expected_configs:
                expected_configs.append(config)
        return expected_configs

    def _get_inputs(self):
        """Collect the loop's own inputs plus sub-block inputs not produced earlier.

        Also updates each collected InputParam's `required` flag (in place) to
        reflect the combined required-ness across the loop and its sub-blocks.
        """
        inputs = []
        inputs.extend(self.loop_inputs)
        outputs = set()

        for name, block in self.sub_blocks.items():
            # Add inputs that aren't in outputs yet
            for inp in block.inputs:
                if inp.name not in outputs and inp not in inputs:
                    inputs.append(inp)

            # Only add outputs if the block cannot be skipped
            # (an auto block without a default `None` trigger may not run at all)
            should_add_outputs = True
            if hasattr(block, "block_trigger_inputs") and None not in block.block_trigger_inputs:
                should_add_outputs = False

            if should_add_outputs:
                # Add this block's outputs
                block_intermediate_outputs = [out.name for out in block.intermediate_outputs]
                outputs.update(block_intermediate_outputs)

        # Compute the (property-backed) required set once instead of once per input.
        required = set(self.required_inputs)
        for input_param in inputs:
            input_param.required = input_param.name in required

        return inputs

    @property
    # Copied from diffusers.modular_pipelines.modular_pipeline.SequentialPipelineBlocks.inputs
    def inputs(self):
        return self._get_inputs()

    # modified from SequentialPipelineBlocks: any additional input required by the loop is also required
    @property
    def required_inputs(self) -> List[str]:
        """Union of required-input names across the loop and all sub-blocks.

        Note: built from a set, so the returned order is unspecified.
        """
        required_by_any = set(getattr(self, "loop_required_inputs", set()))
        # Union with required inputs from every sub-block. Iterating all blocks
        # uniformly also avoids the StopIteration the previous version raised
        # on an empty sub_blocks mapping.
        for block in self.sub_blocks.values():
            required_by_any.update(set(getattr(block, "required_inputs", set())))

        return list(required_by_any)

    # YiYi TODO: this need to be thought about more
    # modified from SequentialPipelineBlocks to include loop_intermediate_outputs
    @property
    def intermediate_outputs(self) -> List[str]:
        """Combined intermediate outputs of sub-blocks plus loop-level ones (deduplicated by name)."""
        named_outputs = [(name, block.intermediate_outputs) for name, block in self.sub_blocks.items()]
        combined_outputs = self.combine_outputs(*named_outputs)
        # Maintain the name set incrementally instead of rebuilding it per iteration.
        seen_names = {out.name for out in combined_outputs}
        for output in self.loop_intermediate_outputs:
            if output.name not in seen_names:
                combined_outputs.append(output)
                seen_names.add(output.name)
        return combined_outputs

    # YiYi TODO: this need to be thought about more
    @property
    def outputs(self) -> List[str]:
        """Intermediate outputs of the last sub-block in the loop."""
        return next(reversed(self.sub_blocks.values())).intermediate_outputs

    def __init__(self):
        # Instantiate one sub-block per (name, class) pair declared on the class.
        sub_blocks = InsertableDict()
        for block_name, block_cls in zip(self.block_names, self.block_classes):
            sub_blocks[block_name] = block_cls()
        self.sub_blocks = sub_blocks

    @classmethod
    def from_blocks_dict(cls, blocks_dict: Dict[str, Any]) -> "LoopSequentialPipelineBlocks":
        """
        Creates a LoopSequentialPipelineBlocks instance from a dictionary of blocks.

        Args:
            blocks_dict: Dictionary mapping block names to block classes or instances

        Returns:
            A new LoopSequentialPipelineBlocks instance
        """
        instance = cls()

        # Create instances if classes are provided
        sub_blocks = InsertableDict()
        for name, block in blocks_dict.items():
            if inspect.isclass(block):
                sub_blocks[name] = block()
            else:
                sub_blocks[name] = block

        # Fix: use the normalized `sub_blocks` mapping (classes instantiated).
        # The previous code assigned the raw `blocks_dict`, which could leave
        # classes in `sub_blocks` and `type` objects in `block_classes` whenever
        # classes were passed in, diverging from SequentialPipelineBlocks.
        instance.block_classes = [block.__class__ for block in sub_blocks.values()]
        instance.block_names = list(sub_blocks.keys())
        instance.sub_blocks = sub_blocks
        return instance

    def loop_step(self, components, state: PipelineState, **kwargs):
        """Run every sub-block once, in order, threading components/state through.

        Any exception from a sub-block is logged with its name, class, and
        traceback, then re-raised.
        """
        for block_name, block in self.sub_blocks.items():
            try:
                components, state = block(components, state, **kwargs)
            except Exception as e:
                error_msg = (
                    f"\nError in block: ({block_name}, {block.__class__.__name__})\n"
                    f"Error details: {str(e)}\n"
                    f"Traceback:\n{traceback.format_exc()}"
                )
                logger.error(error_msg)
                raise
        return components, state

    def __call__(self, components, state: PipelineState) -> PipelineState:
        raise NotImplementedError("`__call__` method needs to be implemented by the subclass")

    @property
    def doc(self):
        """Auto-generated documentation string describing this loop block's interface."""
        return make_doc_string(
            self.inputs,
            self.outputs,
            self.description,
            class_name=self.__class__.__name__,
            expected_components=self.expected_components,
            expected_configs=self.expected_configs,
        )

    # modified from SequentialPipelineBlocks,
    # (does not need trigger_inputs related part so removed them,
    # do not need to support auto block for loop blocks)
    def __repr__(self):
        """Return a multi-line, human-readable summary of this loop block."""
        class_name = self.__class__.__name__
        base_class = self.__class__.__bases__[0].__name__
        # Only show the "Class:" line when the direct base class is informative.
        header = (
            f"{class_name}(\n  Class: {base_class}\n" if base_class and base_class != "object" else f"{class_name}(\n"
        )

        # Format description with proper indentation
        desc_lines = self.description.split("\n")
        desc = []
        # First line with "Description:" label
        desc.append(f"  Description: {desc_lines[0]}")
        # Subsequent lines with proper indentation
        if len(desc_lines) > 1:
            desc.extend(f"      {line}" for line in desc_lines[1:])
        desc = "\n".join(desc) + "\n"

        # Components section - focus only on expected components
        expected_components = getattr(self, "expected_components", [])
        components_str = format_components(expected_components, indent_level=2, add_empty_lines=False)

        # Configs section - use format_configs with add_empty_lines=False
        expected_configs = getattr(self, "expected_configs", [])
        configs_str = format_configs(expected_configs, indent_level=2, add_empty_lines=False)

        # Blocks section - moved to the end with simplified format
        blocks_str = "  Sub-Blocks:\n"
        for i, (name, block) in enumerate(self.sub_blocks.items()):
            # For SequentialPipelineBlocks, show execution order
            blocks_str += f"    [{i}] {name} ({block.__class__.__name__})\n"

            # Add block description
            desc_lines = block.description.split("\n")
            indented_desc = desc_lines[0]
            if len(desc_lines) > 1:
                indented_desc += "\n" + "\n".join("                   " + line for line in desc_lines[1:])
            blocks_str += f"       Description: {indented_desc}\n\n"

        # Build the representation with conditional sections
        result = f"{header}\n{desc}"

        # Only add components section if it has content
        if components_str.strip():
            result += f"\n\n{components_str}"

        # Only add configs section if it has content
        if configs_str.strip():
            result += f"\n\n{configs_str}"

        # Always add blocks section
        result += f"\n\n{blocks_str})"

        return result

    def progress_bar(self, iterable=None, total=None):
        """Return a tqdm progress bar configured via `set_progress_bar_config`.

        Exactly one of `iterable` or `total` must be provided.
        """
        if not hasattr(self, "_progress_bar_config"):
            self._progress_bar_config = {}
        elif not isinstance(self._progress_bar_config, dict):
            raise ValueError(
                f"`self._progress_bar_config` should be of type `dict`, but is {type(self._progress_bar_config)}."
            )

        if iterable is not None:
            return tqdm(iterable, **self._progress_bar_config)
        elif total is not None:
            return tqdm(total=total, **self._progress_bar_config)
        else:
            raise ValueError("Either `total` or `iterable` has to be defined.")

    def set_progress_bar_config(self, **kwargs):
        """Store keyword arguments forwarded to tqdm by `progress_bar`."""
        self._progress_bar_config = kwargs

mindone.diffusers.modular_pipelines.modular_pipeline.LoopSequentialPipelineBlocks.description property

Description of the block. Must be implemented by subclasses.

mindone.diffusers.modular_pipelines.modular_pipeline.LoopSequentialPipelineBlocks.loop_inputs property

List of input parameters. Must be implemented by subclasses.

mindone.diffusers.modular_pipelines.modular_pipeline.LoopSequentialPipelineBlocks.loop_intermediate_outputs property

List of intermediate output parameters. Must be implemented by subclasses.

mindone.diffusers.modular_pipelines.modular_pipeline.LoopSequentialPipelineBlocks.from_blocks_dict(blocks_dict) classmethod

Creates a LoopSequentialPipelineBlocks instance from a dictionary of blocks.

PARAMETER DESCRIPTION
blocks_dict

Dictionary mapping block names to block instances

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
LoopSequentialPipelineBlocks

A new LoopSequentialPipelineBlocks instance

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
@classmethod
def from_blocks_dict(cls, blocks_dict: Dict[str, Any]) -> "LoopSequentialPipelineBlocks":
    """
    Creates a LoopSequentialPipelineBlocks instance from a dictionary of blocks.

    Args:
        blocks_dict: Dictionary mapping block names to block classes or instances

    Returns:
        A new LoopSequentialPipelineBlocks instance
    """
    instance = cls()

    # Create instances if classes are provided
    sub_blocks = InsertableDict()
    for name, block in blocks_dict.items():
        if inspect.isclass(block):
            sub_blocks[name] = block()
        else:
            sub_blocks[name] = block

    # Fix: use the normalized `sub_blocks` mapping (classes instantiated). The
    # previous code assigned the raw `blocks_dict`, which could leave classes
    # in `sub_blocks` and `type` objects in `block_classes` whenever classes
    # were passed in.
    instance.block_classes = [block.__class__ for block in sub_blocks.values()]
    instance.block_names = list(sub_blocks.keys())
    instance.sub_blocks = sub_blocks
    return instance

mindone.diffusers.modular_pipelines.modular_pipeline.AutoPipelineBlocks

Bases: ModularPipelineBlocks

A pipeline block that automatically selects a block to run based on the inputs.

This class inherits from [ModularPipelineBlocks]. Check the superclass documentation for the generic methods the library implements for all the pipeline blocks (such as loading or saving etc.)

This is an experimental feature and is likely to change in the future.

ATTRIBUTE DESCRIPTION
block_classes

List of block classes to be used

block_names

List of prefixes for each block

block_trigger_inputs

List of input names that trigger specific blocks, with None for default

Source code in mindone/diffusers/modular_pipelines/modular_pipeline.py
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
class AutoPipelineBlocks(ModularPipelineBlocks):
    """
    A Pipeline Blocks that automatically selects a block to run based on the inputs.

    This class inherits from [`ModularPipelineBlocks`]. Check the superclass documentation for the generic methods the
    library implements for all the pipeline blocks (such as loading or saving etc.)

    <Tip warning={true}>

        This is an experimental feature and is likely to change in the future.

    </Tip>

    Attributes:
        block_classes: List of block classes to be used
        block_names: List of prefixes for each block
        block_trigger_inputs: List of input names that trigger specific blocks, with None for default
    """

    # Subclasses populate these three parallel lists; __init__ validates that
    # they all have the same length.
    block_classes = []
    block_names = []
    block_trigger_inputs = []

    def __init__(self):
        # Instantiate every block class and register it under its name,
        # preserving declaration order.
        sub_blocks = InsertableDict()
        for block_name, block_cls in zip(self.block_names, self.block_classes):
            sub_blocks[block_name] = block_cls()
        self.sub_blocks = sub_blocks
        if not (len(self.block_classes) == len(self.block_names) == len(self.block_trigger_inputs)):
            raise ValueError(
                f"In {self.__class__.__name__}, the number of block_classes, block_names, and block_trigger_inputs must be the same."
            )
        default_blocks = [t for t in self.block_trigger_inputs if t is None]
        # can only have 1 or 0 default block, and has to put in the last
        # the order of blocks matters here because the first block with matching trigger will be dispatched
        # e.g. blocks = [inpaint, img2img] and block_trigger_inputs = ["mask", "image"]
        # as long as mask is provided, it is inpaint; if only image is provided, it is img2img
        if len(default_blocks) > 1 or (len(default_blocks) == 1 and self.block_trigger_inputs[-1] is not None):
            raise ValueError(
                f"In {self.__class__.__name__}, exactly one None must be specified as the last element "
                "in block_trigger_inputs."
            )

        # Map trigger inputs to block objects
        # (a None key, when present, maps to the default block).
        self.trigger_to_block_map = dict(zip(self.block_trigger_inputs, self.sub_blocks.values()))
        self.trigger_to_block_name_map = dict(zip(self.block_trigger_inputs, self.sub_blocks.keys()))
        self.block_to_trigger_map = dict(zip(self.sub_blocks.keys(), self.block_trigger_inputs))

    @property
    def model_name(self):
        # Delegates to the first sub-block; presumably all sub-blocks share the
        # same model name — confirm against subclass definitions.
        return next(iter(self.sub_blocks.values())).model_name

    @property
    def description(self):
        # Auto blocks have no description of their own; subclasses may override.
        return ""

    @property
    def expected_components(self):
        """De-duplicated union of `expected_components` over all sub-blocks, preserving first-seen order."""
        expected_components = []
        for block in self.sub_blocks.values():
            for component in block.expected_components:
                if component not in expected_components:
                    expected_components.append(component)
        return expected_components

    @property
    def expected_configs(self):
        """De-duplicated union of `expected_configs` over all sub-blocks, preserving first-seen order."""
        expected_configs = []
        for block in self.sub_blocks.values():
            for config in block.expected_configs:
                if config not in expected_configs:
                    expected_configs.append(config)
        return expected_configs

    @property
    def required_inputs(self) -> List[str]:
        """Inputs required by *every* sub-block.

        When there is no default (None-trigger) block, the whole auto block may
        be skipped at runtime (see `__call__`), so nothing is strictly required
        and an empty list is returned.
        """
        if None not in self.block_trigger_inputs:
            return []
        first_block = next(iter(self.sub_blocks.values()))
        required_by_all = set(getattr(first_block, "required_inputs", set()))

        # Intersect with required inputs from all other blocks
        for block in list(self.sub_blocks.values())[1:]:
            block_required = set(getattr(block, "required_inputs", set()))
            required_by_all.intersection_update(block_required)

        return list(required_by_all)

    # YiYi TODO: add test for this
    @property
    def inputs(self) -> List[Tuple[str, Any]]:
        """Combined inputs of all sub-blocks; an input is marked required only if every sub-block requires it."""
        named_inputs = [(name, block.inputs) for name, block in self.sub_blocks.items()]
        combined_inputs = self.combine_inputs(*named_inputs)
        # mark Required inputs only if that input is required by all the blocks
        for input_param in combined_inputs:
            if input_param.name in self.required_inputs:
                input_param.required = True
            else:
                input_param.required = False
        return combined_inputs

    @property
    def intermediate_outputs(self) -> List[str]:
        """Combined intermediate outputs of all sub-blocks."""
        named_outputs = [(name, block.intermediate_outputs) for name, block in self.sub_blocks.items()]
        combined_outputs = self.combine_outputs(*named_outputs)
        return combined_outputs

    @property
    def outputs(self) -> List[str]:
        """Combined final outputs of all sub-blocks."""
        named_outputs = [(name, block.outputs) for name, block in self.sub_blocks.items()]
        combined_outputs = self.combine_outputs(*named_outputs)
        return combined_outputs

    def __call__(self, pipeline, state: PipelineState) -> PipelineState:
        """Dispatch to the first sub-block whose trigger input is present in `state`.

        Falls back to the default (None-trigger) block when no trigger matches;
        if there is no default either, the auto block is skipped with a warning.

        NOTE(review): the skip path returns a `(pipeline, state)` tuple despite
        the `-> PipelineState` annotation — presumably the sub-block `__call__`
        returns the same tuple shape; confirm against PipelineBlock.__call__.
        """
        # Find default block first (if any)

        block = self.trigger_to_block_map.get(None)
        for input_name in self.block_trigger_inputs:
            # First matching trigger wins — ordering of block_trigger_inputs matters.
            if input_name is not None and state.get(input_name) is not None:
                block = self.trigger_to_block_map[input_name]
                break

        if block is None:
            logger.warning(f"skipping auto block: {self.__class__.__name__}")
            return pipeline, state

        try:
            logger.info(f"Running block: {block.__class__.__name__}, trigger: {input_name}")
            return block(pipeline, state)
        except Exception as e:
            # Log full context before re-raising so nested auto blocks surface
            # the failing leaf block in the logs.
            error_msg = (
                f"\nError in block: {block.__class__.__name__}\n"
                f"Error details: {str(e)}\n"
                f"Traceback:\n{traceback.format_exc()}"
            )
            logger.error(error_msg)
            raise

    def _get_trigger_inputs(self):
        """
        Returns a set of all unique trigger input values found in the blocks. Returns: Set[str] containing all unique
        block_trigger_inputs values
        """

        def fn_recursive_get_trigger(blocks):
            # Walk the (possibly nested) sub-block tree collecting every
            # non-None trigger input name.
            trigger_values = set()

            if blocks is not None:
                for name, block in blocks.items():
                    # Check if current block has trigger inputs(i.e. auto block)
                    if hasattr(block, "block_trigger_inputs") and block.block_trigger_inputs is not None:
                        # Add all non-None values from the trigger inputs list
                        trigger_values.update(t for t in block.block_trigger_inputs if t is not None)

                    # If block has sub_blocks, recursively check them
                    if block.sub_blocks:
                        nested_triggers = fn_recursive_get_trigger(block.sub_blocks)
                        trigger_values.update(nested_triggers)

            return trigger_values

        # Include this block's own triggers (None included, when present) plus
        # all triggers found recursively in sub-blocks.
        trigger_inputs = set(self.block_trigger_inputs)
        trigger_inputs.update(fn_recursive_get_trigger(self.sub_blocks))

        return trigger_inputs

    @property
    def trigger_inputs(self):
        """Set of all trigger input names, including those of nested auto blocks."""
        return self._get_trigger_inputs()

    def __repr__(self):
        """Human-readable summary: trigger inputs, description, components, configs, and sub-blocks."""
        class_name = self.__class__.__name__
        base_class = self.__class__.__bases__[0].__name__
        header = (
            f"{class_name}(\n  Class: {base_class}\n" if base_class and base_class != "object" else f"{class_name}(\n"
        )

        if self.trigger_inputs:
            header += "\n"
            header += "  " + "=" * 100 + "\n"
            header += "  This pipeline contains blocks that are selected at runtime based on inputs.\n"
            header += f"  Trigger Inputs: {[inp for inp in self.trigger_inputs if inp is not None]}\n"
            header += "  " + "=" * 100 + "\n\n"

        # Format description with proper indentation
        desc_lines = self.description.split("\n")
        desc = []
        # First line with "Description:" label
        desc.append(f"  Description: {desc_lines[0]}")
        # Subsequent lines with proper indentation
        if len(desc_lines) > 1:
            desc.extend(f"      {line}" for line in desc_lines[1:])
        desc = "\n".join(desc) + "\n"

        # Components section - focus only on expected components
        expected_components = getattr(self, "expected_components", [])
        components_str = format_components(expected_components, indent_level=2, add_empty_lines=False)

        # Configs section - use format_configs with add_empty_lines=False
        expected_configs = getattr(self, "expected_configs", [])
        configs_str = format_configs(expected_configs, indent_level=2, add_empty_lines=False)

        # Blocks section - moved to the end with simplified format
        blocks_str = "  Sub-Blocks:\n"
        for i, (name, block) in enumerate(self.sub_blocks.items()):
            # Get trigger input for this block
            trigger = None
            if hasattr(self, "block_to_trigger_map"):
                trigger = self.block_to_trigger_map.get(name)
                # Format the trigger info
                if trigger is None:
                    trigger_str = "[default]"
                elif isinstance(trigger, (list, tuple)):
                    trigger_str = f"[trigger: {', '.join(str(t) for t in trigger)}]"
                else:
                    trigger_str = f"[trigger: {trigger}]"
                # For AutoPipelineBlocks, add bullet points
                blocks_str += f"    • {name} {trigger_str} ({block.__class__.__name__})\n"
            else:
                # For SequentialPipelineBlocks, show execution order
                blocks_str += f"    [{i}] {name} ({block.__class__.__name__})\n"

            # Add block description
            desc_lines = block.description.split("\n")
            indented_desc = desc_lines[0]
            if len(desc_lines) > 1:
                indented_desc += "\n" + "\n".join("                   " + line for line in desc_lines[1:])
            blocks_str += f"       Description: {indented_desc}\n\n"

        # Build the representation with conditional sections
        result = f"{header}\n{desc}"

        # Only add components section if it has content
        if components_str.strip():
            result += f"\n\n{components_str}"

        # Only add configs section if it has content
        if configs_str.strip():
            result += f"\n\n{configs_str}"

        # Always add blocks section
        result += f"\n\n{blocks_str})"

        return result

    @property
    def doc(self):
        """Generated API doc string covering inputs, outputs, components and configs."""
        return make_doc_string(
            self.inputs,
            self.outputs,
            self.description,
            class_name=self.__class__.__name__,
            expected_components=self.expected_components,
            expected_configs=self.expected_configs,
        )