Skip to content

AudioLDM 2

AudioLDM 2 was proposed in AudioLDM 2: Learning Holistic Audio Generation with Self-supervised Pretraining by Haohe Liu et al. AudioLDM 2 takes a text prompt as input and predicts the corresponding audio. It can generate text-conditional sound effects, human speech and music.

Inspired by Stable Diffusion, AudioLDM 2 is a text-to-audio latent diffusion model (LDM) that learns continuous audio representations from text embeddings. Two text encoder models are used to compute the text embeddings from a prompt input: the text-branch of CLAP and the encoder of Flan-T5. These text embeddings are then projected to a shared embedding space by an AudioLDM2ProjectionModel. A GPT2 language model (LM) is used to auto-regressively predict eight new embedding vectors, conditional on the projected CLAP and Flan-T5 embeddings. The generated embedding vectors and Flan-T5 text embeddings are used as cross-attention conditioning in the LDM. The UNet of AudioLDM 2 is unique in the sense that it takes two cross-attention embeddings, as opposed to one cross-attention conditioning, as in most other LDMs.

The abstract of the paper is the following:

Although audio generation shares commonalities across different types of audio, such as speech, music, and sound effects, designing models for each type requires careful consideration of specific objectives and biases that can significantly differ from those of other types. To bring us closer to a unified perspective of audio generation, this paper proposes a framework that utilizes the same learning method for speech, music, and sound effect generation. Our framework introduces a general representation of audio, called "language of audio" (LOA). Any audio can be translated into LOA based on AudioMAE, a self-supervised pre-trained representation learning model. In the generation process, we translate any modalities into LOA by using a GPT-2 model, and we perform self-supervised audio generation learning with a latent diffusion model conditioned on LOA. The proposed framework naturally brings advantages such as in-context learning abilities and reusable self-supervised pretrained AudioMAE and latent diffusion models. Experiments on the major benchmarks of text-to-audio, text-to-music, and text-to-speech demonstrate state-of-the-art or competitive performance against previous approaches. Our code, pretrained model, and demo are available at this https URL.

This pipeline was contributed by sanchit-gandhi and Nguyễn Công Tú Anh. The original codebase can be found at haoheliu/audioldm2.

Tips

Choosing a checkpoint

AudioLDM2 comes in three variants. Two of these checkpoints are applicable to the general task of text-to-audio generation. The third checkpoint is trained exclusively on text-to-music generation.

All checkpoints share the same model size for the text encoders and VAE. They differ in the size and depth of the UNet. See table below for details on the three checkpoints:

Checkpoint Task UNet Model Size Total Model Size Training Data / h
audioldm2 Text-to-audio 350M 1.1B 1150k
audioldm2-large Text-to-audio 750M 1.5B 1150k
audioldm2-music Text-to-music 350M 1.1B 665k
audioldm2-gigaspeech Text-to-speech 350M 1.1B 10k
audioldm2-ljspeech Text-to-speech 350M 1.1B

Constructing a prompt

  • Descriptive prompt inputs work best: use adjectives to describe the sound (e.g. "high quality" or "clear") and make the prompt context specific (e.g. "water stream in a forest" instead of "stream").
  • It's best to use general terms like "cat" or "dog" instead of specific names or abstract objects the model may not be familiar with.
  • Using a negative prompt can significantly improve the quality of the generated waveform, by guiding the generation away from terms that correspond to poor quality audio. Try using a negative prompt of "Low quality."

Controlling inference

  • The quality of the predicted audio sample can be controlled by the num_inference_steps argument; higher steps give higher quality audio at the expense of slower inference.
  • The length of the predicted audio sample can be controlled by varying the audio_length_in_s argument.

Evaluating generated waveforms:

  • The quality of the generated waveforms can vary significantly based on the seed. Try generating with different seeds until you find a satisfactory generation.
  • Multiple waveforms can be generated in one go: set num_waveforms_per_prompt to a value greater than 1. Automatic scoring will be performed between the generated waveforms and prompt text, and the audios ranked from best to worst accordingly.

The following example demonstrates how to construct good music and speech generation using the aforementioned tips: example.

Tip

Make sure to check out the Schedulers guide to learn how to explore the tradeoff between scheduler speed and quality, and see the reuse components across pipelines section to learn how to efficiently load the same components into multiple pipelines.

mindone.diffusers.AudioLDM2Pipeline

Bases: DiffusionPipeline

Pipeline for text-to-audio generation using AudioLDM2.

This model inherits from [DiffusionPipeline]. Check the superclass documentation for the generic methods implemented for all pipelines (downloading, saving etc.).

PARAMETER DESCRIPTION
vae

Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.

TYPE: [`AutoencoderKL`]

text_encoder

First frozen text-encoder. AudioLDM2 uses the joint audio-text embedding model CLAP, specifically the laion/clap-htsat-unfused variant. The text branch is used to encode the text prompt to a prompt embedding. The full audio-text model is used to rank generated waveforms against the text prompt by computing similarity scores.

TYPE: [`~transformers.ClapModel`]

text_encoder_2

Second frozen text-encoder. AudioLDM2 uses the encoder of T5, specifically the google/flan-t5-large variant. Second frozen text-encoder use for TTS. AudioLDM2 uses the encoder of Vits.

TYPE: [`~transformers.T5EncoderModel`, `~transformers.VitsModel`]

projection_model

A trained model used to linearly project the hidden-states from the first and second text encoder models and insert learned SOS and EOS token embeddings. The projected hidden-states from the two text encoders are concatenated to give the input to the language model. A Learned Position Embedding for the Vits hidden-states

TYPE: [`AudioLDM2ProjectionModel`]

language_model

An auto-regressive language model used to generate a sequence of hidden-states conditioned on the projected outputs from the two text encoders.

TYPE: [`~transformers.GPT2Model`]

tokenizer

Tokenizer to tokenize text for the first frozen text-encoder.

TYPE: [`~transformers.RobertaTokenizer`]

tokenizer_2

Tokenizer to tokenize text for the second frozen text-encoder.

TYPE: [`~transformers.T5Tokenizer`, `~transformers.VitsTokenizer`]

feature_extractor

Feature extractor to pre-process generated audio waveforms to log-mel spectrograms for automatic scoring.

TYPE: [`~transformers.ClapFeatureExtractor`]

unet

A UNet2DConditionModel to denoise the encoded audio latents.

TYPE: [`UNet2DConditionModel`]

scheduler

A scheduler to be used in combination with unet to denoise the encoded audio latents. Can be one of [DDIMScheduler], [LMSDiscreteScheduler], or [PNDMScheduler].

TYPE: [`SchedulerMixin`]

vocoder

Vocoder of class SpeechT5HifiGan to convert the mel-spectrogram latents to the final audio waveform.

TYPE: [`~transformers.SpeechT5HifiGan`]

Source code in mindone/diffusers/pipelines/audioldm2/pipeline_audioldm2.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
class AudioLDM2Pipeline(DiffusionPipeline):
    r"""
    Pipeline for text-to-audio generation using AudioLDM2.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
    implemented for all pipelines (downloading, saving etc.).

    Args:
        vae ([`AutoencoderKL`]):
            Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
        text_encoder ([`~transformers.ClapModel`]):
            First frozen text-encoder. AudioLDM2 uses the joint audio-text embedding model
            [CLAP](https://huggingface.co/docs/transformers/model_doc/clap#transformers.CLAPTextModelWithProjection),
            specifically the [laion/clap-htsat-unfused](https://huggingface.co/laion/clap-htsat-unfused) variant. The
            text branch is used to encode the text prompt to a prompt embedding. The full audio-text model is used to
            rank generated waveforms against the text prompt by computing similarity scores.
        text_encoder_2 ([`~transformers.T5EncoderModel`, `~transformers.VitsModel`]):
            Second frozen text-encoder. AudioLDM2 uses the encoder of
            [T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel), specifically the
            [google/flan-t5-large](https://huggingface.co/google/flan-t5-large) variant. Second frozen text-encoder use
            for TTS. AudioLDM2 uses the encoder of
            [Vits](https://huggingface.co/docs/transformers/model_doc/vits#transformers.VitsModel).
        projection_model ([`AudioLDM2ProjectionModel`]):
            A trained model used to linearly project the hidden-states from the first and second text encoder models
            and insert learned SOS and EOS token embeddings. The projected hidden-states from the two text encoders are
            concatenated to give the input to the language model. A Learned Position Embedding for the Vits
            hidden-states
        language_model ([`~transformers.GPT2Model`]):
            An auto-regressive language model used to generate a sequence of hidden-states conditioned on the projected
            outputs from the two text encoders.
        tokenizer ([`~transformers.RobertaTokenizer`]):
            Tokenizer to tokenize text for the first frozen text-encoder.
        tokenizer_2 ([`~transformers.T5Tokenizer`, `~transformers.VitsTokenizer`]):
            Tokenizer to tokenize text for the second frozen text-encoder.
        feature_extractor ([`~transformers.ClapFeatureExtractor`]):
            Feature extractor to pre-process generated audio waveforms to log-mel spectrograms for automatic scoring.
        unet ([`UNet2DConditionModel`]):
            A `UNet2DConditionModel` to denoise the encoded audio latents.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the encoded audio latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
        vocoder ([`~transformers.SpeechT5HifiGan`]):
            Vocoder of class `SpeechT5HifiGan` to convert the mel-spectrogram latents to the final audio waveform.
    """

    def __init__(
        self,
        vae: AutoencoderKL,
        text_encoder: ClapModel,
        text_encoder_2: Union[T5EncoderModel, VitsModel],
        projection_model: AudioLDM2ProjectionModel,
        language_model: GPT2Model,
        tokenizer: Union[RobertaTokenizer, RobertaTokenizerFast],
        tokenizer_2: Union[T5Tokenizer, T5TokenizerFast, VitsTokenizer],
        feature_extractor: ClapFeatureExtractor,
        unet: AudioLDM2UNet2DConditionModel,
        scheduler: KarrasDiffusionSchedulers,
        vocoder: SpeechT5HifiGan,
    ):
        super().__init__()

        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            text_encoder_2=text_encoder_2,
            projection_model=projection_model,
            language_model=language_model,
            tokenizer=tokenizer,
            tokenizer_2=tokenizer_2,
            feature_extractor=feature_extractor,
            unet=unet,
            scheduler=scheduler,
            vocoder=vocoder,
        )
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)

    # Copied from mindone.diffusers.pipelines.pipeline_utils.StableDiffusionMixin.enable_vae_slicing
    def enable_vae_slicing(self):
        r"""
        Enable sliced VAE decoding. When this option is enabled, the VAE will split the input tensor in slices to
        compute decoding in several steps. This is useful to save some memory and allow larger batch sizes.
        """
        self.vae.enable_slicing()

    # Copied from mindone.diffusers.pipelines.pipeline_utils.StableDiffusionMixin.disable_vae_slicing
    def disable_vae_slicing(self):
        r"""
        Disable sliced VAE decoding. If `enable_vae_slicing` was previously enabled, this method will go back to
        computing decoding in one step.
        """
        self.vae.disable_slicing()

    def enable_model_cpu_offload(self):
        r"""
        Offloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Compared
        to `enable_sequential_cpu_offload`, this method moves one whole model at a time to the GPU when its `forward`
        method is called, and the model remains in GPU until the next model runs. Memory savings are lower than with
        `enable_sequential_cpu_offload`, but performance is much better due to the iterative execution of the `unet`.
        """
        hook = None

        # We'll offload the last model manually.
        self.final_offload_hook = hook

    def generate_language_model(
        self,
        inputs_embeds: mindspore.tensor = None,
        max_new_tokens: int = 8,
        **model_kwargs,
    ):
        """

        Generates a sequence of hidden-states from the language model, conditioned on the embedding inputs.

        Parameters:
            inputs_embeds (`mindspore.tensor` of shape `(batch_size, sequence_length, hidden_size)`):
                The sequence used as a prompt for the generation.
            max_new_tokens (`int`):
                Number of new tokens to generate.
            model_kwargs (`Dict[str, Any]`, *optional*):
                Ad hoc parametrization of additional model-specific kwargs that will be forwarded to the `forward`
                function of the model.

        Return:
            `inputs_embeds (`mindspore.tensor` of shape `(batch_size, sequence_length, hidden_size)`):
                The sequence of generated hidden-states.
        """
        self.language_model._supports_dynamic_input = True
        max_new_tokens = max_new_tokens if max_new_tokens is not None else self.language_model.config.max_new_tokens
        model_kwargs = self.language_model._get_initial_cache_position(inputs_embeds, model_kwargs)
        for _ in range(max_new_tokens):
            # prepare model inputs
            model_inputs = prepare_inputs_for_generation(inputs_embeds, **model_kwargs)

            # forward pass to get next hidden states
            output = self.language_model(**model_inputs, return_dict=True)

            next_hidden_states = output.last_hidden_state

            # Update the model input
            inputs_embeds = mint.cat([inputs_embeds, next_hidden_states[:, -1:, :]], dim=1)

            # Update generated hidden states, model inputs, and length for next step
            model_kwargs = self.language_model._update_model_kwargs_for_generation(output, model_kwargs)

        return inputs_embeds[:, -max_new_tokens:, :]

    def encode_prompt(
        self,
        prompt,
        num_waveforms_per_prompt,
        do_classifier_free_guidance,
        transcription=None,
        negative_prompt=None,
        prompt_embeds: Optional[mindspore.tensor] = None,
        negative_prompt_embeds: Optional[mindspore.tensor] = None,
        generated_prompt_embeds: Optional[mindspore.tensor] = None,
        negative_generated_prompt_embeds: Optional[mindspore.tensor] = None,
        attention_mask: Optional[mindspore.tensor] = None,
        negative_attention_mask: Optional[mindspore.tensor] = None,
        max_new_tokens: Optional[int] = None,
    ):
        r"""
        Encodes the prompt into text encoder hidden states.

        Args:
            prompt (`str` or `List[str]`, *optional*):
                prompt to be encoded
            transcription (`str` or `List[str]`):
                transcription of text to speech
            num_waveforms_per_prompt (`int`):
                number of waveforms that should be generated per prompt
            do_classifier_free_guidance (`bool`):
                whether to use classifier free guidance or not
            negative_prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts not to guide the audio generation. If not defined, one has to pass
                `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is
                less than `1`).
            prompt_embeds (`mindspore.tensor`, *optional*):
                Pre-computed text embeddings from the Flan T5 model. Can be used to easily tweak text inputs, *e.g.*
                prompt weighting. If not provided, text embeddings will be computed from `prompt` input argument.
            negative_prompt_embeds (`mindspore.tensor`, *optional*):
                Pre-computed negative text embeddings from the Flan T5 model. Can be used to easily tweak text inputs,
                *e.g.* prompt weighting. If not provided, negative_prompt_embeds will be computed from
                `negative_prompt` input argument.
            generated_prompt_embeds (`mindspore.tensor`, *optional*):
                Pre-generated text embeddings from the GPT2 langauge model. Can be used to easily tweak text inputs,
                 *e.g.* prompt weighting. If not provided, text embeddings will be generated from `prompt` input
                 argument.
            negative_generated_prompt_embeds (`mindspore.tensor`, *optional*):
                Pre-generated negative text embeddings from the GPT2 language model. Can be used to easily tweak text
                inputs, *e.g.* prompt weighting. If not provided, negative_prompt_embeds will be computed from
                `negative_prompt` input argument.
            attention_mask (`mindspore.tensor`, *optional*):
                Pre-computed attention mask to be applied to the `prompt_embeds`. If not provided, attention mask will
                be computed from `prompt` input argument.
            negative_attention_mask (`mindspore.tensor`, *optional*):
                Pre-computed attention mask to be applied to the `negative_prompt_embeds`. If not provided, attention
                mask will be computed from `negative_prompt` input argument.
            max_new_tokens (`int`, *optional*, defaults to None):
                The number of new tokens to generate with the GPT2 language model.
        Returns:
            prompt_embeds (`mindspore.tensor`):
                Text embeddings from the Flan T5 model.
            attention_mask (`mindspore.tensor`):
                Attention mask to be applied to the `prompt_embeds`.
            generated_prompt_embeds (`mindspore.tensor`):
                Text embeddings generated from the GPT2 langauge model.

        Example:

        ```python
        >>> import scipy
        >>> from mindone.diffusers import AudioLDM2Pipeline

        >>> repo_id = "cvssp/audioldm2"
        >>> pipe = AudioLDM2Pipeline.from_pretrained(repo_id, torch_dtype=mindspore.float16)


        >>> # Get text embedding vectors
        >>> prompt_embeds, attention_mask, generated_prompt_embeds = pipe.encode_prompt(
        ...     prompt="Techno music with a strong, upbeat tempo and high melodic riffs",
        ...     do_classifier_free_guidance=True,
        ... )

        >>> # Pass text embeddings to pipeline for text-conditional audio generation
        >>> audio = pipe(
        ...     prompt_embeds=prompt_embeds,
        ...     attention_mask=attention_mask,
        ...     generated_prompt_embeds=generated_prompt_embeds,
        ...     num_inference_steps=200,
        ...     audio_length_in_s=10.0,
        ... ).audios[0]

        >>> # save generated audio sample
        >>> scipy.io.wavfile.write("techno.wav", rate=16000, data=audio)
        ```"""
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        # Define tokenizers and text encoders
        tokenizers = [self.tokenizer, self.tokenizer_2]
        is_vits_text_encoder = isinstance(self.text_encoder_2, VitsModel)

        if is_vits_text_encoder:
            text_encoders = [self.text_encoder, self.text_encoder_2.text_encoder]
        else:
            text_encoders = [self.text_encoder, self.text_encoder_2]

        if prompt_embeds is None:
            prompt_embeds_list = []
            attention_mask_list = []

            for tokenizer, text_encoder in zip(tokenizers, text_encoders):
                use_prompt = isinstance(
                    tokenizer, (RobertaTokenizer, RobertaTokenizerFast, T5Tokenizer, T5TokenizerFast)
                )
                text_inputs = tokenizer(
                    prompt if use_prompt else transcription,
                    padding="max_length"
                    if isinstance(tokenizer, (RobertaTokenizer, RobertaTokenizerFast, VitsTokenizer))
                    else True,
                    max_length=tokenizer.model_max_length,
                    truncation=True,
                    return_tensors="np",
                )
                text_input_ids = mindspore.tensor(text_inputs.input_ids)
                attention_mask = mindspore.tensor(text_inputs.attention_mask)
                untruncated_ids = mindspore.tensor(tokenizer(prompt, padding="longest", return_tensors="np").input_ids)

                if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not mint.equal(
                    text_input_ids, untruncated_ids
                ):
                    removed_text = tokenizer.batch_decode(untruncated_ids[:, tokenizer.model_max_length - 1 : -1])
                    logger.warning(
                        f"The following part of your input was truncated because {text_encoder.config.model_type} can "
                        f"only handle sequences up to {tokenizer.model_max_length} tokens: {removed_text}"
                    )

                text_input_ids = text_input_ids
                attention_mask = attention_mask

                if text_encoder.config.model_type == "clap":
                    prompt_embeds = text_encoder.get_text_features(
                        text_input_ids,
                        attention_mask=attention_mask,
                    )
                    # append the seq-len dim: (bs, hidden_size) -> (bs, seq_len, hidden_size)
                    prompt_embeds = prompt_embeds[:, None, :]
                    # make sure that we attend to this single hidden-state
                    attention_mask = attention_mask.new_ones((batch_size, 1))
                elif is_vits_text_encoder:
                    # Add end_token_id and attention mask in the end of sequence phonemes
                    for text_input_id, text_attention_mask in zip(text_input_ids, attention_mask):
                        for idx, phoneme_id in enumerate(text_input_id):
                            if phoneme_id == 0:
                                text_input_id[idx] = 182
                                text_attention_mask[idx] = 1
                                break
                    prompt_embeds = text_encoder(
                        text_input_ids, attention_mask=attention_mask, padding_mask=attention_mask.unsqueeze(-1)
                    )
                    prompt_embeds = prompt_embeds[0]
                else:
                    prompt_embeds = text_encoder(
                        text_input_ids,
                        attention_mask=attention_mask,
                    )
                    prompt_embeds = prompt_embeds[0]

                prompt_embeds_list.append(prompt_embeds)
                attention_mask_list.append(attention_mask)

            projection_output = self.projection_model(
                hidden_states=prompt_embeds_list[0],
                hidden_states_1=prompt_embeds_list[1],
                attention_mask=attention_mask_list[0],
                attention_mask_1=attention_mask_list[1],
            )
            projected_prompt_embeds = projection_output.hidden_states
            projected_attention_mask = projection_output.attention_mask

            generated_prompt_embeds = self.generate_language_model(
                projected_prompt_embeds,
                attention_mask=projected_attention_mask,
                max_new_tokens=max_new_tokens,
            )

        prompt_embeds = prompt_embeds.to(dtype=self.text_encoder_2.dtype)
        attention_mask = (
            attention_mask if attention_mask is not None else mint.ones(prompt_embeds.shape[:2], dtype=mindspore.int64)
        )
        generated_prompt_embeds = generated_prompt_embeds.to(dtype=self.language_model.dtype)

        bs_embed, seq_len, hidden_size = prompt_embeds.shape
        # duplicate text embeddings for each generation per prompt, using mps friendly method
        prompt_embeds = prompt_embeds.tile((1, num_waveforms_per_prompt, 1))
        prompt_embeds = prompt_embeds.view(bs_embed * num_waveforms_per_prompt, seq_len, hidden_size)

        # duplicate attention mask for each generation per prompt
        attention_mask = attention_mask.tile((1, num_waveforms_per_prompt))
        attention_mask = attention_mask.view(bs_embed * num_waveforms_per_prompt, seq_len)

        bs_embed, seq_len, hidden_size = generated_prompt_embeds.shape
        # duplicate generated embeddings for each generation per prompt, using mps friendly method
        generated_prompt_embeds = generated_prompt_embeds.tile((1, num_waveforms_per_prompt, 1))
        generated_prompt_embeds = generated_prompt_embeds.view(
            bs_embed * num_waveforms_per_prompt, seq_len, hidden_size
        )

        # get unconditional embeddings for classifier free guidance
        if do_classifier_free_guidance and negative_prompt_embeds is None:
            uncond_tokens: List[str]
            if negative_prompt is None:
                uncond_tokens = [""] * batch_size
            elif type(prompt) is not type(negative_prompt):
                raise TypeError(
                    f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
                    f" {type(prompt)}."
                )
            elif isinstance(negative_prompt, str):
                uncond_tokens = [negative_prompt]
            elif batch_size != len(negative_prompt):
                raise ValueError(
                    f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
                    f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
                    " the batch size of `prompt`."
                )
            else:
                uncond_tokens = negative_prompt

            negative_prompt_embeds_list = []
            negative_attention_mask_list = []
            max_length = prompt_embeds.shape[1]
            for tokenizer, text_encoder in zip(tokenizers, text_encoders):
                uncond_input = tokenizer(
                    uncond_tokens,
                    padding="max_length",
                    max_length=tokenizer.model_max_length
                    if isinstance(tokenizer, (RobertaTokenizer, RobertaTokenizerFast, VitsTokenizer))
                    else max_length,
                    truncation=True,
                    return_tensors="np",
                )

                uncond_input_ids = mindspore.tensor(uncond_input.input_ids)
                negative_attention_mask = mindspore.tensor(uncond_input.attention_mask)

                if text_encoder.config.model_type == "clap":
                    negative_prompt_embeds = text_encoder.get_text_features(
                        uncond_input_ids,
                        attention_mask=negative_attention_mask,
                    )
                    # append the seq-len dim: (bs, hidden_size) -> (bs, seq_len, hidden_size)
                    negative_prompt_embeds = negative_prompt_embeds[:, None, :]
                    # make sure that we attend to this single hidden-state
                    negative_attention_mask = negative_attention_mask.new_ones((batch_size, 1))
                elif is_vits_text_encoder:
                    negative_prompt_embeds = mint.zeros(
                        batch_size,
                        tokenizer.model_max_length,
                        text_encoder.config.hidden_size,
                    ).to(dtype=self.text_encoder_2.dtype)
                    negative_attention_mask = mint.zeros(batch_size, tokenizer.model_max_length).to(
                        dtype=self.text_encoder_2.dtype
                    )
                else:
                    negative_prompt_embeds = text_encoder(
                        uncond_input_ids,
                        attention_mask=negative_attention_mask,
                    )
                    negative_prompt_embeds = negative_prompt_embeds[0]

                negative_prompt_embeds_list.append(negative_prompt_embeds)
                negative_attention_mask_list.append(negative_attention_mask)

            projection_output = self.projection_model(
                hidden_states=negative_prompt_embeds_list[0],
                hidden_states_1=negative_prompt_embeds_list[1],
                attention_mask=negative_attention_mask_list[0],
                attention_mask_1=negative_attention_mask_list[1],
            )
            negative_projected_prompt_embeds = projection_output.hidden_states
            negative_projected_attention_mask = projection_output.attention_mask

            negative_generated_prompt_embeds = self.generate_language_model(
                negative_projected_prompt_embeds,
                attention_mask=negative_projected_attention_mask,
                max_new_tokens=max_new_tokens,
            )

        if do_classifier_free_guidance:
            seq_len = negative_prompt_embeds.shape[1]

            negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder_2.dtype)
            negative_attention_mask = (
                negative_attention_mask
                if negative_attention_mask is not None
                else mint.ones(negative_prompt_embeds.shape[:2], dtype=mindspore.int64)
            )
            negative_generated_prompt_embeds = negative_generated_prompt_embeds.to(dtype=self.language_model.dtype)

            # duplicate unconditional embeddings for each generation per prompt, using mps friendly method
            negative_prompt_embeds = negative_prompt_embeds.tile((1, num_waveforms_per_prompt, 1))
            negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_waveforms_per_prompt, seq_len, -1)

            # duplicate unconditional attention mask for each generation per prompt
            negative_attention_mask = negative_attention_mask.tile((1, num_waveforms_per_prompt))
            negative_attention_mask = negative_attention_mask.view(batch_size * num_waveforms_per_prompt, seq_len)

            # duplicate unconditional generated embeddings for each generation per prompt
            seq_len = negative_generated_prompt_embeds.shape[1]
            negative_generated_prompt_embeds = negative_generated_prompt_embeds.tile((1, num_waveforms_per_prompt, 1))
            negative_generated_prompt_embeds = negative_generated_prompt_embeds.view(
                batch_size * num_waveforms_per_prompt, seq_len, -1
            )

            # For classifier free guidance, we need to do two forward passes.
            # Here we concatenate the unconditional and text embeddings into a single batch
            # to avoid doing two forward passes
            prompt_embeds = mint.cat([negative_prompt_embeds, prompt_embeds])
            attention_mask = mint.cat([negative_attention_mask, attention_mask])
            generated_prompt_embeds = mint.cat([negative_generated_prompt_embeds, generated_prompt_embeds])

        return prompt_embeds, attention_mask, generated_prompt_embeds

    # Copied from mindone.diffusers.pipelines.audioldm.pipeline_audioldm.AudioLDMPipeline.mel_spectrogram_to_waveform
    def mel_spectrogram_to_waveform(self, mel_spectrogram):
        if mel_spectrogram.dim() == 4:
            mel_spectrogram = mel_spectrogram.squeeze(1)

        waveform = self.vocoder(mel_spectrogram)
        # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
        waveform = waveform.float()
        return waveform

    def score_waveforms(self, text, audio, num_waveforms_per_prompt, dtype):
        inputs = self.tokenizer(text, return_tensors="np", padding=True)
        inputs["input_ids"] = mindspore.tensor(inputs["input_ids"])
        inputs["attention_mask"] = mindspore.tensor(inputs["attention_mask"])
        resampled_audio = librosa.resample(
            audio.numpy(), orig_sr=self.vocoder.config.sampling_rate, target_sr=self.feature_extractor.sampling_rate
        )
        inputs["input_features"] = mindspore.tensor(
            self.feature_extractor(
                list(resampled_audio), return_tensors="np", sampling_rate=self.feature_extractor.sampling_rate
            ).input_features
        )
        # compute the audio-text similarity score using the CLAP model
        logits_per_text = self.text_encoder(**inputs).logits_per_text
        # sort by the highest matching generations per prompt
        indices = mint.argsort(logits_per_text, dim=1, descending=True)[:, :num_waveforms_per_prompt]
        audio = mint.index_select(audio, 0, indices.reshape(-1))
        return audio

    # Copied from mindone.diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs
    def prepare_extra_step_kwargs(self, generator, eta):
        # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
        # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
        # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
        # and should be between [0, 1]

        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # check if the scheduler accepts generator
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        return extra_step_kwargs

    def check_inputs(
        self,
        prompt,
        audio_length_in_s,
        vocoder_upsample_factor,
        callback_steps,
        transcription=None,
        negative_prompt=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
        generated_prompt_embeds=None,
        negative_generated_prompt_embeds=None,
        attention_mask=None,
        negative_attention_mask=None,
    ):
        min_audio_length_in_s = vocoder_upsample_factor * self.vae_scale_factor
        if audio_length_in_s < min_audio_length_in_s:
            raise ValueError(
                f"`audio_length_in_s` has to be a positive value greater than or equal to {min_audio_length_in_s}, but "
                f"is {audio_length_in_s}."
            )

        if self.vocoder.config.model_in_dim % self.vae_scale_factor != 0:
            raise ValueError(
                f"The number of frequency bins in the vocoder's log-mel spectrogram has to be divisible by the "
                f"VAE scale factor, but got {self.vocoder.config.model_in_dim} bins and a scale factor of "
                f"{self.vae_scale_factor}."
            )

        if (callback_steps is None) or (
            callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
        ):
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )

        if prompt is not None and prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        elif prompt is None and (prompt_embeds is None or generated_prompt_embeds is None):
            raise ValueError(
                "Provide either `prompt`, or `prompt_embeds` and `generated_prompt_embeds`. Cannot leave "
                "`prompt` undefined without specifying both `prompt_embeds` and `generated_prompt_embeds`."
            )
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        if negative_prompt is not None and negative_prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )
        elif negative_prompt_embeds is not None and negative_generated_prompt_embeds is None:
            raise ValueError(
                "Cannot forward `negative_prompt_embeds` without `negative_generated_prompt_embeds`. Ensure that"
                "both arguments are specified"
            )

        if prompt_embeds is not None and negative_prompt_embeds is not None:
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )
            if attention_mask is not None and attention_mask.shape != prompt_embeds.shape[:2]:
                raise ValueError(
                    "`attention_mask should have the same batch size and sequence length as `prompt_embeds`, but got:"
                    f"`attention_mask: {attention_mask.shape} != `prompt_embeds` {prompt_embeds.shape}"
                )

        if transcription is None:
            if self.text_encoder_2.config.model_type == "vits":
                raise ValueError("Cannot forward without transcription. Please make sure to" " have transcription")
        elif transcription is not None and (not isinstance(transcription, str) and not isinstance(transcription, list)):
            raise ValueError(f"`transcription` has to be of type `str` or `list` but is {type(transcription)}")

        if generated_prompt_embeds is not None and negative_generated_prompt_embeds is not None:
            if generated_prompt_embeds.shape != negative_generated_prompt_embeds.shape:
                raise ValueError(
                    "`generated_prompt_embeds` and `negative_generated_prompt_embeds` must have the same shape when "
                    f"passed directly, but got: `generated_prompt_embeds` {generated_prompt_embeds.shape} != "
                    f"`negative_generated_prompt_embeds` {negative_generated_prompt_embeds.shape}."
                )
            if (
                negative_attention_mask is not None
                and negative_attention_mask.shape != negative_prompt_embeds.shape[:2]
            ):
                raise ValueError(
                    "`attention_mask should have the same batch size and sequence length as `prompt_embeds`, but got:"
                    f"`attention_mask: {negative_attention_mask.shape} != `prompt_embeds` {negative_prompt_embeds.shape}"
                )

    # Copied from mindone.diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents \
    # with width->self.vocoder.config.model_in_dim
    def prepare_latents(self, batch_size, num_channels_latents, height, dtype, generator, latents=None):
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(self.vocoder.config.model_in_dim) // self.vae_scale_factor,
        )
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        if latents is None:
            latents = randn_tensor(shape, generator=generator, dtype=dtype)
        else:
            latents = latents

        # scale the initial noise by the standard deviation required by the scheduler
        latents = latents * self.scheduler.init_noise_sigma
        return latents

    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        transcription: Union[str, List[str]] = None,
        audio_length_in_s: Optional[float] = None,
        num_inference_steps: int = 200,
        guidance_scale: float = 3.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_waveforms_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[np.random.Generator, List[np.random.Generator]]] = None,
        latents: Optional[mindspore.tensor] = None,
        prompt_embeds: Optional[mindspore.tensor] = None,
        negative_prompt_embeds: Optional[mindspore.tensor] = None,
        generated_prompt_embeds: Optional[mindspore.tensor] = None,
        negative_generated_prompt_embeds: Optional[mindspore.tensor] = None,
        attention_mask: Optional[mindspore.tensor] = None,
        negative_attention_mask: Optional[mindspore.tensor] = None,
        max_new_tokens: Optional[int] = None,
        return_dict: bool = False,
        callback: Optional[Callable[[int, int, mindspore.tensor], None]] = None,
        callback_steps: Optional[int] = 1,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        output_type: Optional[str] = "np",
    ):
        r"""
        The call function to the pipeline for generation.

        Args:
            prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts to guide audio generation. If not defined, you need to pass `prompt_embeds`.
            transcription (`str` or `List[str]`, *optional*):\
                The transcript for text to speech.
            audio_length_in_s (`int`, *optional*, defaults to 10.24):
                The length of the generated audio sample in seconds.
            num_inference_steps (`int`, *optional*, defaults to 200):
                The number of denoising steps. More denoising steps usually lead to a higher quality audio at the
                expense of slower inference.
            guidance_scale (`float`, *optional*, defaults to 3.5):
                A higher guidance scale value encourages the model to generate audio that is closely linked to the text
                `prompt` at the expense of lower sound quality. Guidance scale is enabled when `guidance_scale > 1`.
            negative_prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts to guide what to not include in audio generation. If not defined, you need to
                pass `negative_prompt_embeds` instead. Ignored when not using guidance (`guidance_scale < 1`).
            num_waveforms_per_prompt (`int`, *optional*, defaults to 1):
                The number of waveforms to generate per prompt. If `num_waveforms_per_prompt > 1`, then automatic
                scoring is performed between the generated outputs and the text prompt. This scoring ranks the
                generated waveforms based on their cosine similarity with the text input in the joint text-audio
                embedding space.
            eta (`float`, *optional*, defaults to 0.0):
                Corresponds to parameter eta (η) from the [DDIM](https://arxiv.org/abs/2010.02502) paper. Only applies
                to the [`~schedulers.DDIMScheduler`], and is ignored in other schedulers.
            generator (`np.random.Generator` or `List[np.random.Generator]`, *optional*):
                A [`np.random.Generator`](https://pytorch.org/docs/stable/generated/np.random.Generator.html) to make
                generation deterministic.
            latents (`mindspore.tensor`, *optional*):
                Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for spectrogram
                generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
                tensor is generated by sampling using the supplied random `generator`.
            prompt_embeds (`mindspore.tensor`, *optional*):
                Pre-generated text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not
                provided, text embeddings are generated from the `prompt` input argument.
            negative_prompt_embeds (`mindspore.tensor`, *optional*):
                Pre-generated negative text embeddings. Can be used to easily tweak text inputs (prompt weighting). If
                not provided, `negative_prompt_embeds` are generated from the `negative_prompt` input argument.
            generated_prompt_embeds (`mindspore.tensor`, *optional*):
                Pre-generated text embeddings from the GPT2 langauge model. Can be used to easily tweak text inputs,
                 *e.g.* prompt weighting. If not provided, text embeddings will be generated from `prompt` input
                 argument.
            negative_generated_prompt_embeds (`mindspore.tensor`, *optional*):
                Pre-generated negative text embeddings from the GPT2 language model. Can be used to easily tweak text
                inputs, *e.g.* prompt weighting. If not provided, negative_prompt_embeds will be computed from
                `negative_prompt` input argument.
            attention_mask (`mindspore.tensor`, *optional*):
                Pre-computed attention mask to be applied to the `prompt_embeds`. If not provided, attention mask will
                be computed from `prompt` input argument.
            negative_attention_mask (`mindspore.tensor`, *optional*):
                Pre-computed attention mask to be applied to the `negative_prompt_embeds`. If not provided, attention
                mask will be computed from `negative_prompt` input argument.
            max_new_tokens (`int`, *optional*, defaults to None):
                Number of new tokens to generate with the GPT2 language model. If not provided, number of tokens will
                be taken from the config of the model.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
                plain tuple.
            callback (`Callable`, *optional*):
                A function that calls every `callback_steps` steps during inference. The function is called with the
                following arguments: `callback(step: int, timestep: int, latents: mindspore.tensor)`.
            callback_steps (`int`, *optional*, defaults to 1):
                The frequency at which the `callback` function is called. If not specified, the callback is called at
                every step.
            cross_attention_kwargs (`dict`, *optional*):
                A kwargs dictionary that if specified is passed along to the [`AttentionProcessor`] as defined in
                [`self.processor`](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
            output_type (`str`, *optional*, defaults to `"np"`):
                The output format of the generated audio. Choose between `"np"` to return a NumPy `np.ndarray` or
                `"pt"` to return a PyTorch `mindspore.tensor` object. Set to `"latent"` to return the latent diffusion
                model (LDM) output.

        Examples:

        Returns:
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
                If `return_dict` is `True`, [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] is returned,
                otherwise a `tuple` is returned where the first element is a list with the generated audio.
        """
        # 0. Convert audio input length from seconds to spectrogram height
        vocoder_upsample_factor = np.prod(self.vocoder.config.upsample_rates) / self.vocoder.config.sampling_rate

        if audio_length_in_s is None:
            audio_length_in_s = self.unet.config.sample_size * self.vae_scale_factor * vocoder_upsample_factor

        height = int(audio_length_in_s / vocoder_upsample_factor)

        original_waveform_length = int(audio_length_in_s * self.vocoder.config.sampling_rate)
        if height % self.vae_scale_factor != 0:
            height = int(np.ceil(height / self.vae_scale_factor)) * self.vae_scale_factor
            logger.info(
                f"Audio length in seconds {audio_length_in_s} is increased to {height * vocoder_upsample_factor} "
                f"so that it can be handled by the model. It will be cut to {audio_length_in_s} after the "
                f"denoising process."
            )

        # 1. Check inputs. Raise error if not correct
        self.check_inputs(
            prompt,
            audio_length_in_s,
            vocoder_upsample_factor,
            callback_steps,
            transcription,
            negative_prompt,
            prompt_embeds,
            negative_prompt_embeds,
            generated_prompt_embeds,
            negative_generated_prompt_embeds,
            attention_mask,
            negative_attention_mask,
        )

        # 2. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
        # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # corresponds to doing no classifier free guidance.
        do_classifier_free_guidance = guidance_scale > 1.0

        # 3. Encode input prompt
        prompt_embeds, attention_mask, generated_prompt_embeds = self.encode_prompt(
            prompt,
            num_waveforms_per_prompt,
            do_classifier_free_guidance,
            transcription,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            generated_prompt_embeds=generated_prompt_embeds,
            negative_generated_prompt_embeds=negative_generated_prompt_embeds,
            attention_mask=attention_mask,
            negative_attention_mask=negative_attention_mask,
            max_new_tokens=max_new_tokens,
        )

        # 4. Prepare timesteps
        self.scheduler.set_timesteps(num_inference_steps)
        timesteps = self.scheduler.timesteps

        # 5. Prepare latent variables
        num_channels_latents = self.unet.config.in_channels
        latents = self.prepare_latents(
            batch_size * num_waveforms_per_prompt,
            num_channels_latents,
            height,
            prompt_embeds.dtype,
            generator,
            latents,
        )

        # 6. Prepare extra step kwargs
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 7. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # expand the latents if we are doing classifier free guidance
                latent_model_input = mint.cat([latents] * 2) if do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                # predict the noise residual
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=generated_prompt_embeds,
                    encoder_hidden_states_1=prompt_embeds,
                    encoder_attention_mask_1=attention_mask,
                    return_dict=False,
                )[0]

                # perform guidance
                if do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

                # compute the previous noisy sample x_t -> x_t-1
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs)[0]

                # call the callback, if provided
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        step_idx = i // getattr(self.scheduler, "order", 1)
                        callback(step_idx, t, latents)

        # 8. Post-processing
        if not output_type == "latent":
            latents = 1 / self.vae.config.scaling_factor * latents
            mel_spectrogram = self.vae.decode(latents)[0]
        else:
            return AudioPipelineOutput(audios=latents)

        audio = self.mel_spectrogram_to_waveform(mel_spectrogram)

        audio = audio[:, :original_waveform_length]

        # 9. Automatic scoring
        if num_waveforms_per_prompt > 1 and prompt is not None:
            audio = self.score_waveforms(
                text=prompt,
                audio=audio,
                num_waveforms_per_prompt=num_waveforms_per_prompt,
                dtype=prompt_embeds.dtype,
            )

        if output_type == "np":
            audio = audio.numpy()

        if not return_dict:
            return (audio,)

        return AudioPipelineOutput(audios=audio)

mindone.diffusers.AudioLDM2Pipeline.__call__(prompt=None, transcription=None, audio_length_in_s=None, num_inference_steps=200, guidance_scale=3.5, negative_prompt=None, num_waveforms_per_prompt=1, eta=0.0, generator=None, latents=None, prompt_embeds=None, negative_prompt_embeds=None, generated_prompt_embeds=None, negative_generated_prompt_embeds=None, attention_mask=None, negative_attention_mask=None, max_new_tokens=None, return_dict=False, callback=None, callback_steps=1, cross_attention_kwargs=None, output_type='np')

The call function to the pipeline for generation.

PARAMETER DESCRIPTION
prompt

The prompt or prompts to guide audio generation. If not defined, you need to pass prompt_embeds.

TYPE: `str` or `List[str]`, *optional* DEFAULT: None

transcription

\ The transcript for text to speech.

TYPE: `str` or `List[str]`, *optional* DEFAULT: None

audio_length_in_s

The length of the generated audio sample in seconds.

TYPE: `int`, *optional*, defaults to 10.24 DEFAULT: None

num_inference_steps

The number of denoising steps. More denoising steps usually lead to a higher quality audio at the expense of slower inference.

TYPE: `int`, *optional*, defaults to 200 DEFAULT: 200

guidance_scale

A higher guidance scale value encourages the model to generate audio that is closely linked to the text prompt at the expense of lower sound quality. Guidance scale is enabled when guidance_scale > 1.

TYPE: `float`, *optional*, defaults to 3.5 DEFAULT: 3.5

negative_prompt

The prompt or prompts to guide what to not include in audio generation. If not defined, you need to pass negative_prompt_embeds instead. Ignored when not using guidance (guidance_scale < 1).

TYPE: `str` or `List[str]`, *optional* DEFAULT: None

num_waveforms_per_prompt

The number of waveforms to generate per prompt. If num_waveforms_per_prompt > 1, then automatic scoring is performed between the generated outputs and the text prompt. This scoring ranks the generated waveforms based on their cosine similarity with the text input in the joint text-audio embedding space.

TYPE: `int`, *optional*, defaults to 1 DEFAULT: 1

eta

Corresponds to parameter eta (η) from the DDIM paper. Only applies to the [~schedulers.DDIMScheduler], and is ignored in other schedulers.

TYPE: `float`, *optional*, defaults to 0.0 DEFAULT: 0.0

generator

A np.random.Generator to make generation deterministic.

TYPE: `np.random.Generator` or `List[np.random.Generator]`, *optional* DEFAULT: None

latents

Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for spectrogram generation. Can be used to tweak the same generation with different prompts. If not provided, a latents tensor is generated by sampling using the supplied random generator.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

prompt_embeds

Pre-generated text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not provided, text embeddings are generated from the prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

negative_prompt_embeds

Pre-generated negative text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not provided, negative_prompt_embeds are generated from the negative_prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

generated_prompt_embeds

Pre-generated text embeddings from the GPT2 langauge model. Can be used to easily tweak text inputs, e.g. prompt weighting. If not provided, text embeddings will be generated from prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

negative_generated_prompt_embeds

Pre-generated negative text embeddings from the GPT2 language model. Can be used to easily tweak text inputs, e.g. prompt weighting. If not provided, negative_prompt_embeds will be computed from negative_prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

attention_mask

Pre-computed attention mask to be applied to the prompt_embeds. If not provided, attention mask will be computed from prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

negative_attention_mask

Pre-computed attention mask to be applied to the negative_prompt_embeds. If not provided, attention mask will be computed from negative_prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

max_new_tokens

Number of new tokens to generate with the GPT2 language model. If not provided, number of tokens will be taken from the config of the model.

TYPE: `int`, *optional*, defaults to None DEFAULT: None

return_dict

Whether or not to return a [~pipelines.stable_diffusion.StableDiffusionPipelineOutput] instead of a plain tuple.

TYPE: `bool`, *optional*, defaults to `True` DEFAULT: False

callback

A function that calls every callback_steps steps during inference. The function is called with the following arguments: callback(step: int, timestep: int, latents: mindspore.tensor).

TYPE: `Callable`, *optional* DEFAULT: None

callback_steps

The frequency at which the callback function is called. If not specified, the callback is called at every step.

TYPE: `int`, *optional*, defaults to 1 DEFAULT: 1

cross_attention_kwargs

A kwargs dictionary that if specified is passed along to the [AttentionProcessor] as defined in self.processor.

TYPE: `dict`, *optional* DEFAULT: None

output_type

The output format of the generated audio. Choose between "np" to return a NumPy np.ndarray or "pt" to return a PyTorch mindspore.tensor object. Set to "latent" to return the latent diffusion model (LDM) output.

TYPE: `str`, *optional*, defaults to `"np"` DEFAULT: 'np'

RETURNS DESCRIPTION

[~pipelines.stable_diffusion.StableDiffusionPipelineOutput] or tuple: If return_dict is True, [~pipelines.stable_diffusion.StableDiffusionPipelineOutput] is returned, otherwise a tuple is returned where the first element is a list with the generated audio.

Source code in mindone/diffusers/pipelines/audioldm2/pipeline_audioldm2.py
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
def __call__(
    self,
    prompt: Union[str, List[str]] = None,
    transcription: Union[str, List[str]] = None,
    audio_length_in_s: Optional[float] = None,
    num_inference_steps: int = 200,
    guidance_scale: float = 3.5,
    negative_prompt: Optional[Union[str, List[str]]] = None,
    num_waveforms_per_prompt: Optional[int] = 1,
    eta: float = 0.0,
    generator: Optional[Union[np.random.Generator, List[np.random.Generator]]] = None,
    latents: Optional[mindspore.tensor] = None,
    prompt_embeds: Optional[mindspore.tensor] = None,
    negative_prompt_embeds: Optional[mindspore.tensor] = None,
    generated_prompt_embeds: Optional[mindspore.tensor] = None,
    negative_generated_prompt_embeds: Optional[mindspore.tensor] = None,
    attention_mask: Optional[mindspore.tensor] = None,
    negative_attention_mask: Optional[mindspore.tensor] = None,
    max_new_tokens: Optional[int] = None,
    return_dict: bool = False,
    callback: Optional[Callable[[int, int, mindspore.tensor], None]] = None,
    callback_steps: Optional[int] = 1,
    cross_attention_kwargs: Optional[Dict[str, Any]] = None,
    output_type: Optional[str] = "np",
):
    r"""
    The call function to the pipeline for generation.

    Args:
        prompt (`str` or `List[str]`, *optional*):
            The prompt or prompts to guide audio generation. If not defined, you need to pass `prompt_embeds`.
        transcription (`str` or `List[str]`, *optional*):\
            The transcript for text to speech.
        audio_length_in_s (`int`, *optional*, defaults to 10.24):
            The length of the generated audio sample in seconds.
        num_inference_steps (`int`, *optional*, defaults to 200):
            The number of denoising steps. More denoising steps usually lead to a higher quality audio at the
            expense of slower inference.
        guidance_scale (`float`, *optional*, defaults to 3.5):
            A higher guidance scale value encourages the model to generate audio that is closely linked to the text
            `prompt` at the expense of lower sound quality. Guidance scale is enabled when `guidance_scale > 1`.
        negative_prompt (`str` or `List[str]`, *optional*):
            The prompt or prompts to guide what to not include in audio generation. If not defined, you need to
            pass `negative_prompt_embeds` instead. Ignored when not using guidance (`guidance_scale < 1`).
        num_waveforms_per_prompt (`int`, *optional*, defaults to 1):
            The number of waveforms to generate per prompt. If `num_waveforms_per_prompt > 1`, then automatic
            scoring is performed between the generated outputs and the text prompt. This scoring ranks the
            generated waveforms based on their cosine similarity with the text input in the joint text-audio
            embedding space.
        eta (`float`, *optional*, defaults to 0.0):
            Corresponds to parameter eta (η) from the [DDIM](https://arxiv.org/abs/2010.02502) paper. Only applies
            to the [`~schedulers.DDIMScheduler`], and is ignored in other schedulers.
        generator (`np.random.Generator` or `List[np.random.Generator]`, *optional*):
            A [`np.random.Generator`](https://pytorch.org/docs/stable/generated/np.random.Generator.html) to make
            generation deterministic.
        latents (`mindspore.tensor`, *optional*):
            Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for spectrogram
            generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
            tensor is generated by sampling using the supplied random `generator`.
        prompt_embeds (`mindspore.tensor`, *optional*):
            Pre-generated text embeddings. Can be used to easily tweak text inputs (prompt weighting). If not
            provided, text embeddings are generated from the `prompt` input argument.
        negative_prompt_embeds (`mindspore.tensor`, *optional*):
            Pre-generated negative text embeddings. Can be used to easily tweak text inputs (prompt weighting). If
            not provided, `negative_prompt_embeds` are generated from the `negative_prompt` input argument.
        generated_prompt_embeds (`mindspore.tensor`, *optional*):
            Pre-generated text embeddings from the GPT2 langauge model. Can be used to easily tweak text inputs,
             *e.g.* prompt weighting. If not provided, text embeddings will be generated from `prompt` input
             argument.
        negative_generated_prompt_embeds (`mindspore.tensor`, *optional*):
            Pre-generated negative text embeddings from the GPT2 language model. Can be used to easily tweak text
            inputs, *e.g.* prompt weighting. If not provided, negative_prompt_embeds will be computed from
            `negative_prompt` input argument.
        attention_mask (`mindspore.tensor`, *optional*):
            Pre-computed attention mask to be applied to the `prompt_embeds`. If not provided, attention mask will
            be computed from `prompt` input argument.
        negative_attention_mask (`mindspore.tensor`, *optional*):
            Pre-computed attention mask to be applied to the `negative_prompt_embeds`. If not provided, attention
            mask will be computed from `negative_prompt` input argument.
        max_new_tokens (`int`, *optional*, defaults to None):
            Number of new tokens to generate with the GPT2 language model. If not provided, number of tokens will
            be taken from the config of the model.
        return_dict (`bool`, *optional*, defaults to `True`):
            Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
            plain tuple.
        callback (`Callable`, *optional*):
            A function that calls every `callback_steps` steps during inference. The function is called with the
            following arguments: `callback(step: int, timestep: int, latents: mindspore.tensor)`.
        callback_steps (`int`, *optional*, defaults to 1):
            The frequency at which the `callback` function is called. If not specified, the callback is called at
            every step.
        cross_attention_kwargs (`dict`, *optional*):
            A kwargs dictionary that if specified is passed along to the [`AttentionProcessor`] as defined in
            [`self.processor`](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
        output_type (`str`, *optional*, defaults to `"np"`):
            The output format of the generated audio. Choose between `"np"` to return a NumPy `np.ndarray` or
            `"pt"` to return a PyTorch `mindspore.tensor` object. Set to `"latent"` to return the latent diffusion
            model (LDM) output.

    Examples:

    Returns:
        [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
            If `return_dict` is `True`, [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] is returned,
            otherwise a `tuple` is returned where the first element is a list with the generated audio.
    """
    # 0. Convert audio input length from seconds to spectrogram height
    vocoder_upsample_factor = np.prod(self.vocoder.config.upsample_rates) / self.vocoder.config.sampling_rate

    if audio_length_in_s is None:
        audio_length_in_s = self.unet.config.sample_size * self.vae_scale_factor * vocoder_upsample_factor

    height = int(audio_length_in_s / vocoder_upsample_factor)

    original_waveform_length = int(audio_length_in_s * self.vocoder.config.sampling_rate)
    if height % self.vae_scale_factor != 0:
        height = int(np.ceil(height / self.vae_scale_factor)) * self.vae_scale_factor
        logger.info(
            f"Audio length in seconds {audio_length_in_s} is increased to {height * vocoder_upsample_factor} "
            f"so that it can be handled by the model. It will be cut to {audio_length_in_s} after the "
            f"denoising process."
        )

    # 1. Check inputs. Raise error if not correct
    self.check_inputs(
        prompt,
        audio_length_in_s,
        vocoder_upsample_factor,
        callback_steps,
        transcription,
        negative_prompt,
        prompt_embeds,
        negative_prompt_embeds,
        generated_prompt_embeds,
        negative_generated_prompt_embeds,
        attention_mask,
        negative_attention_mask,
    )

    # 2. Define call parameters
    if prompt is not None and isinstance(prompt, str):
        batch_size = 1
    elif prompt is not None and isinstance(prompt, list):
        batch_size = len(prompt)
    else:
        batch_size = prompt_embeds.shape[0]

    # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
    # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
    # corresponds to doing no classifier free guidance.
    do_classifier_free_guidance = guidance_scale > 1.0

    # 3. Encode input prompt
    prompt_embeds, attention_mask, generated_prompt_embeds = self.encode_prompt(
        prompt,
        num_waveforms_per_prompt,
        do_classifier_free_guidance,
        transcription,
        negative_prompt,
        prompt_embeds=prompt_embeds,
        negative_prompt_embeds=negative_prompt_embeds,
        generated_prompt_embeds=generated_prompt_embeds,
        negative_generated_prompt_embeds=negative_generated_prompt_embeds,
        attention_mask=attention_mask,
        negative_attention_mask=negative_attention_mask,
        max_new_tokens=max_new_tokens,
    )

    # 4. Prepare timesteps
    self.scheduler.set_timesteps(num_inference_steps)
    timesteps = self.scheduler.timesteps

    # 5. Prepare latent variables
    num_channels_latents = self.unet.config.in_channels
    latents = self.prepare_latents(
        batch_size * num_waveforms_per_prompt,
        num_channels_latents,
        height,
        prompt_embeds.dtype,
        generator,
        latents,
    )

    # 6. Prepare extra step kwargs
    extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

    # 7. Denoising loop
    num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
    with self.progress_bar(total=num_inference_steps) as progress_bar:
        for i, t in enumerate(timesteps):
            # expand the latents if we are doing classifier free guidance
            latent_model_input = mint.cat([latents] * 2) if do_classifier_free_guidance else latents
            latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

            # predict the noise residual
            noise_pred = self.unet(
                latent_model_input,
                t,
                encoder_hidden_states=generated_prompt_embeds,
                encoder_hidden_states_1=prompt_embeds,
                encoder_attention_mask_1=attention_mask,
                return_dict=False,
            )[0]

            # perform guidance
            if do_classifier_free_guidance:
                noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            # compute the previous noisy sample x_t -> x_t-1
            latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs)[0]

            # call the callback, if provided
            if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                progress_bar.update()
                if callback is not None and i % callback_steps == 0:
                    step_idx = i // getattr(self.scheduler, "order", 1)
                    callback(step_idx, t, latents)

    # 8. Post-processing
    if not output_type == "latent":
        latents = 1 / self.vae.config.scaling_factor * latents
        mel_spectrogram = self.vae.decode(latents)[0]
    else:
        return AudioPipelineOutput(audios=latents)

    audio = self.mel_spectrogram_to_waveform(mel_spectrogram)

    audio = audio[:, :original_waveform_length]

    # 9. Automatic scoring
    if num_waveforms_per_prompt > 1 and prompt is not None:
        audio = self.score_waveforms(
            text=prompt,
            audio=audio,
            num_waveforms_per_prompt=num_waveforms_per_prompt,
            dtype=prompt_embeds.dtype,
        )

    if output_type == "np":
        audio = audio.numpy()

    if not return_dict:
        return (audio,)

    return AudioPipelineOutput(audios=audio)

mindone.diffusers.AudioLDM2Pipeline.disable_vae_slicing()

Disable sliced VAE decoding. If enable_vae_slicing was previously enabled, this method will go back to computing decoding in one step.

Source code in mindone/diffusers/pipelines/audioldm2/pipeline_audioldm2.py
202
203
204
205
206
207
def disable_vae_slicing(self):
    r"""
    Disable sliced VAE decoding. If `enable_vae_slicing` was previously enabled, this method will go back to
    computing decoding in one step.
    """
    self.vae.disable_slicing()

mindone.diffusers.AudioLDM2Pipeline.enable_model_cpu_offload()

Offloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Compared to enable_sequential_cpu_offload, this method moves one whole model at a time to the GPU when its forward method is called, and the model remains in GPU until the next model runs. Memory savings are lower than with enable_sequential_cpu_offload, but performance is much better due to the iterative execution of the unet.

Source code in mindone/diffusers/pipelines/audioldm2/pipeline_audioldm2.py
209
210
211
212
213
214
215
216
217
218
219
def enable_model_cpu_offload(self):
    r"""
    Offloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Compared
    to `enable_sequential_cpu_offload`, this method moves one whole model at a time to the GPU when its `forward`
    method is called, and the model remains in GPU until the next model runs. Memory savings are lower than with
    `enable_sequential_cpu_offload`, but performance is much better due to the iterative execution of the `unet`.
    """
    hook = None

    # We'll offload the last model manually.
    self.final_offload_hook = hook

mindone.diffusers.AudioLDM2Pipeline.enable_vae_slicing()

Enable sliced VAE decoding. When this option is enabled, the VAE will split the input tensor in slices to compute decoding in several steps. This is useful to save some memory and allow larger batch sizes.

Source code in mindone/diffusers/pipelines/audioldm2/pipeline_audioldm2.py
194
195
196
197
198
199
def enable_vae_slicing(self):
    r"""
    Enable sliced VAE decoding. When this option is enabled, the VAE will split the input tensor in slices to
    compute decoding in several steps. This is useful to save some memory and allow larger batch sizes.
    """
    self.vae.enable_slicing()

mindone.diffusers.AudioLDM2Pipeline.encode_prompt(prompt, num_waveforms_per_prompt, do_classifier_free_guidance, transcription=None, negative_prompt=None, prompt_embeds=None, negative_prompt_embeds=None, generated_prompt_embeds=None, negative_generated_prompt_embeds=None, attention_mask=None, negative_attention_mask=None, max_new_tokens=None)

Encodes the prompt into text encoder hidden states.

PARAMETER DESCRIPTION
prompt

prompt to be encoded

TYPE: `str` or `List[str]`, *optional*

transcription

transcription of text to speech

TYPE: `str` or `List[str]` DEFAULT: None

num_waveforms_per_prompt

number of waveforms that should be generated per prompt

TYPE: `int`

do_classifier_free_guidance

whether to use classifier free guidance or not

TYPE: `bool`

negative_prompt

The prompt or prompts not to guide the audio generation. If not defined, one has to pass negative_prompt_embeds instead. Ignored when not using guidance (i.e., ignored if guidance_scale is less than 1).

TYPE: `str` or `List[str]`, *optional* DEFAULT: None

prompt_embeds

Pre-computed text embeddings from the Flan T5 model. Can be used to easily tweak text inputs, e.g. prompt weighting. If not provided, text embeddings will be computed from prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

negative_prompt_embeds

Pre-computed negative text embeddings from the Flan T5 model. Can be used to easily tweak text inputs, e.g. prompt weighting. If not provided, negative_prompt_embeds will be computed from negative_prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

generated_prompt_embeds

Pre-generated text embeddings from the GPT2 langauge model. Can be used to easily tweak text inputs, e.g. prompt weighting. If not provided, text embeddings will be generated from prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

negative_generated_prompt_embeds

Pre-generated negative text embeddings from the GPT2 language model. Can be used to easily tweak text inputs, e.g. prompt weighting. If not provided, negative_prompt_embeds will be computed from negative_prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

attention_mask

Pre-computed attention mask to be applied to the prompt_embeds. If not provided, attention mask will be computed from prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

negative_attention_mask

Pre-computed attention mask to be applied to the negative_prompt_embeds. If not provided, attention mask will be computed from negative_prompt input argument.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

max_new_tokens

The number of new tokens to generate with the GPT2 language model.

TYPE: `int`, *optional*, defaults to None DEFAULT: None

Example:

>>> import scipy
>>> from mindone.diffusers import AudioLDM2Pipeline

>>> repo_id = "cvssp/audioldm2"
>>> pipe = AudioLDM2Pipeline.from_pretrained(repo_id, torch_dtype=mindspore.float16)


>>> # Get text embedding vectors
>>> prompt_embeds, attention_mask, generated_prompt_embeds = pipe.encode_prompt(
...     prompt="Techno music with a strong, upbeat tempo and high melodic riffs",
...     do_classifier_free_guidance=True,
... )

>>> # Pass text embeddings to pipeline for text-conditional audio generation
>>> audio = pipe(
...     prompt_embeds=prompt_embeds,
...     attention_mask=attention_mask,
...     generated_prompt_embeds=generated_prompt_embeds,
...     num_inference_steps=200,
...     audio_length_in_s=10.0,
... ).audios[0]

>>> # save generated audio sample
>>> scipy.io.wavfile.write("techno.wav", rate=16000, data=audio)
Source code in mindone/diffusers/pipelines/audioldm2/pipeline_audioldm2.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
def encode_prompt(
    self,
    prompt,
    num_waveforms_per_prompt,
    do_classifier_free_guidance,
    transcription=None,
    negative_prompt=None,
    prompt_embeds: Optional[mindspore.tensor] = None,
    negative_prompt_embeds: Optional[mindspore.tensor] = None,
    generated_prompt_embeds: Optional[mindspore.tensor] = None,
    negative_generated_prompt_embeds: Optional[mindspore.tensor] = None,
    attention_mask: Optional[mindspore.tensor] = None,
    negative_attention_mask: Optional[mindspore.tensor] = None,
    max_new_tokens: Optional[int] = None,
):
    r"""
    Encodes the prompt into text encoder hidden states.

    Args:
        prompt (`str` or `List[str]`, *optional*):
            prompt to be encoded
        transcription (`str` or `List[str]`):
            transcription of text to speech
        num_waveforms_per_prompt (`int`):
            number of waveforms that should be generated per prompt
        do_classifier_free_guidance (`bool`):
            whether to use classifier free guidance or not
        negative_prompt (`str` or `List[str]`, *optional*):
            The prompt or prompts not to guide the audio generation. If not defined, one has to pass
            `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is
            less than `1`).
        prompt_embeds (`mindspore.tensor`, *optional*):
            Pre-computed text embeddings from the Flan T5 model. Can be used to easily tweak text inputs, *e.g.*
            prompt weighting. If not provided, text embeddings will be computed from `prompt` input argument.
        negative_prompt_embeds (`mindspore.tensor`, *optional*):
            Pre-computed negative text embeddings from the Flan T5 model. Can be used to easily tweak text inputs,
            *e.g.* prompt weighting. If not provided, negative_prompt_embeds will be computed from
            `negative_prompt` input argument.
        generated_prompt_embeds (`mindspore.tensor`, *optional*):
            Pre-generated text embeddings from the GPT2 langauge model. Can be used to easily tweak text inputs,
             *e.g.* prompt weighting. If not provided, text embeddings will be generated from `prompt` input
             argument.
        negative_generated_prompt_embeds (`mindspore.tensor`, *optional*):
            Pre-generated negative text embeddings from the GPT2 language model. Can be used to easily tweak text
            inputs, *e.g.* prompt weighting. If not provided, negative_prompt_embeds will be computed from
            `negative_prompt` input argument.
        attention_mask (`mindspore.tensor`, *optional*):
            Pre-computed attention mask to be applied to the `prompt_embeds`. If not provided, attention mask will
            be computed from `prompt` input argument.
        negative_attention_mask (`mindspore.tensor`, *optional*):
            Pre-computed attention mask to be applied to the `negative_prompt_embeds`. If not provided, attention
            mask will be computed from `negative_prompt` input argument.
        max_new_tokens (`int`, *optional*, defaults to None):
            The number of new tokens to generate with the GPT2 language model.
    Returns:
        prompt_embeds (`mindspore.tensor`):
            Text embeddings from the Flan T5 model.
        attention_mask (`mindspore.tensor`):
            Attention mask to be applied to the `prompt_embeds`.
        generated_prompt_embeds (`mindspore.tensor`):
            Text embeddings generated from the GPT2 langauge model.

    Example:

    ```python
    >>> import scipy
    >>> from mindone.diffusers import AudioLDM2Pipeline

    >>> repo_id = "cvssp/audioldm2"
    >>> pipe = AudioLDM2Pipeline.from_pretrained(repo_id, torch_dtype=mindspore.float16)


    >>> # Get text embedding vectors
    >>> prompt_embeds, attention_mask, generated_prompt_embeds = pipe.encode_prompt(
    ...     prompt="Techno music with a strong, upbeat tempo and high melodic riffs",
    ...     do_classifier_free_guidance=True,
    ... )

    >>> # Pass text embeddings to pipeline for text-conditional audio generation
    >>> audio = pipe(
    ...     prompt_embeds=prompt_embeds,
    ...     attention_mask=attention_mask,
    ...     generated_prompt_embeds=generated_prompt_embeds,
    ...     num_inference_steps=200,
    ...     audio_length_in_s=10.0,
    ... ).audios[0]

    >>> # save generated audio sample
    >>> scipy.io.wavfile.write("techno.wav", rate=16000, data=audio)
    ```"""
    if prompt is not None and isinstance(prompt, str):
        batch_size = 1
    elif prompt is not None and isinstance(prompt, list):
        batch_size = len(prompt)
    else:
        batch_size = prompt_embeds.shape[0]

    # Define tokenizers and text encoders
    tokenizers = [self.tokenizer, self.tokenizer_2]
    is_vits_text_encoder = isinstance(self.text_encoder_2, VitsModel)

    if is_vits_text_encoder:
        text_encoders = [self.text_encoder, self.text_encoder_2.text_encoder]
    else:
        text_encoders = [self.text_encoder, self.text_encoder_2]

    if prompt_embeds is None:
        prompt_embeds_list = []
        attention_mask_list = []

        for tokenizer, text_encoder in zip(tokenizers, text_encoders):
            use_prompt = isinstance(
                tokenizer, (RobertaTokenizer, RobertaTokenizerFast, T5Tokenizer, T5TokenizerFast)
            )
            text_inputs = tokenizer(
                prompt if use_prompt else transcription,
                padding="max_length"
                if isinstance(tokenizer, (RobertaTokenizer, RobertaTokenizerFast, VitsTokenizer))
                else True,
                max_length=tokenizer.model_max_length,
                truncation=True,
                return_tensors="np",
            )
            text_input_ids = mindspore.tensor(text_inputs.input_ids)
            attention_mask = mindspore.tensor(text_inputs.attention_mask)
            untruncated_ids = mindspore.tensor(tokenizer(prompt, padding="longest", return_tensors="np").input_ids)

            if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not mint.equal(
                text_input_ids, untruncated_ids
            ):
                removed_text = tokenizer.batch_decode(untruncated_ids[:, tokenizer.model_max_length - 1 : -1])
                logger.warning(
                    f"The following part of your input was truncated because {text_encoder.config.model_type} can "
                    f"only handle sequences up to {tokenizer.model_max_length} tokens: {removed_text}"
                )

            text_input_ids = text_input_ids
            attention_mask = attention_mask

            if text_encoder.config.model_type == "clap":
                prompt_embeds = text_encoder.get_text_features(
                    text_input_ids,
                    attention_mask=attention_mask,
                )
                # append the seq-len dim: (bs, hidden_size) -> (bs, seq_len, hidden_size)
                prompt_embeds = prompt_embeds[:, None, :]
                # make sure that we attend to this single hidden-state
                attention_mask = attention_mask.new_ones((batch_size, 1))
            elif is_vits_text_encoder:
                # Add end_token_id and attention mask in the end of sequence phonemes
                for text_input_id, text_attention_mask in zip(text_input_ids, attention_mask):
                    for idx, phoneme_id in enumerate(text_input_id):
                        if phoneme_id == 0:
                            text_input_id[idx] = 182
                            text_attention_mask[idx] = 1
                            break
                prompt_embeds = text_encoder(
                    text_input_ids, attention_mask=attention_mask, padding_mask=attention_mask.unsqueeze(-1)
                )
                prompt_embeds = prompt_embeds[0]
            else:
                prompt_embeds = text_encoder(
                    text_input_ids,
                    attention_mask=attention_mask,
                )
                prompt_embeds = prompt_embeds[0]

            prompt_embeds_list.append(prompt_embeds)
            attention_mask_list.append(attention_mask)

        projection_output = self.projection_model(
            hidden_states=prompt_embeds_list[0],
            hidden_states_1=prompt_embeds_list[1],
            attention_mask=attention_mask_list[0],
            attention_mask_1=attention_mask_list[1],
        )
        projected_prompt_embeds = projection_output.hidden_states
        projected_attention_mask = projection_output.attention_mask

        generated_prompt_embeds = self.generate_language_model(
            projected_prompt_embeds,
            attention_mask=projected_attention_mask,
            max_new_tokens=max_new_tokens,
        )

    prompt_embeds = prompt_embeds.to(dtype=self.text_encoder_2.dtype)
    attention_mask = (
        attention_mask if attention_mask is not None else mint.ones(prompt_embeds.shape[:2], dtype=mindspore.int64)
    )
    generated_prompt_embeds = generated_prompt_embeds.to(dtype=self.language_model.dtype)

    bs_embed, seq_len, hidden_size = prompt_embeds.shape
    # duplicate text embeddings for each generation per prompt, using mps friendly method
    prompt_embeds = prompt_embeds.tile((1, num_waveforms_per_prompt, 1))
    prompt_embeds = prompt_embeds.view(bs_embed * num_waveforms_per_prompt, seq_len, hidden_size)

    # duplicate attention mask for each generation per prompt
    attention_mask = attention_mask.tile((1, num_waveforms_per_prompt))
    attention_mask = attention_mask.view(bs_embed * num_waveforms_per_prompt, seq_len)

    bs_embed, seq_len, hidden_size = generated_prompt_embeds.shape
    # duplicate generated embeddings for each generation per prompt, using mps friendly method
    generated_prompt_embeds = generated_prompt_embeds.tile((1, num_waveforms_per_prompt, 1))
    generated_prompt_embeds = generated_prompt_embeds.view(
        bs_embed * num_waveforms_per_prompt, seq_len, hidden_size
    )

    # get unconditional embeddings for classifier free guidance
    if do_classifier_free_guidance and negative_prompt_embeds is None:
        uncond_tokens: List[str]
        if negative_prompt is None:
            uncond_tokens = [""] * batch_size
        elif type(prompt) is not type(negative_prompt):
            raise TypeError(
                f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
                f" {type(prompt)}."
            )
        elif isinstance(negative_prompt, str):
            uncond_tokens = [negative_prompt]
        elif batch_size != len(negative_prompt):
            raise ValueError(
                f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
                f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
                " the batch size of `prompt`."
            )
        else:
            uncond_tokens = negative_prompt

        negative_prompt_embeds_list = []
        negative_attention_mask_list = []
        max_length = prompt_embeds.shape[1]
        for tokenizer, text_encoder in zip(tokenizers, text_encoders):
            uncond_input = tokenizer(
                uncond_tokens,
                padding="max_length",
                max_length=tokenizer.model_max_length
                if isinstance(tokenizer, (RobertaTokenizer, RobertaTokenizerFast, VitsTokenizer))
                else max_length,
                truncation=True,
                return_tensors="np",
            )

            uncond_input_ids = mindspore.tensor(uncond_input.input_ids)
            negative_attention_mask = mindspore.tensor(uncond_input.attention_mask)

            if text_encoder.config.model_type == "clap":
                negative_prompt_embeds = text_encoder.get_text_features(
                    uncond_input_ids,
                    attention_mask=negative_attention_mask,
                )
                # append the seq-len dim: (bs, hidden_size) -> (bs, seq_len, hidden_size)
                negative_prompt_embeds = negative_prompt_embeds[:, None, :]
                # make sure that we attend to this single hidden-state
                negative_attention_mask = negative_attention_mask.new_ones((batch_size, 1))
            elif is_vits_text_encoder:
                negative_prompt_embeds = mint.zeros(
                    batch_size,
                    tokenizer.model_max_length,
                    text_encoder.config.hidden_size,
                ).to(dtype=self.text_encoder_2.dtype)
                negative_attention_mask = mint.zeros(batch_size, tokenizer.model_max_length).to(
                    dtype=self.text_encoder_2.dtype
                )
            else:
                negative_prompt_embeds = text_encoder(
                    uncond_input_ids,
                    attention_mask=negative_attention_mask,
                )
                negative_prompt_embeds = negative_prompt_embeds[0]

            negative_prompt_embeds_list.append(negative_prompt_embeds)
            negative_attention_mask_list.append(negative_attention_mask)

        projection_output = self.projection_model(
            hidden_states=negative_prompt_embeds_list[0],
            hidden_states_1=negative_prompt_embeds_list[1],
            attention_mask=negative_attention_mask_list[0],
            attention_mask_1=negative_attention_mask_list[1],
        )
        negative_projected_prompt_embeds = projection_output.hidden_states
        negative_projected_attention_mask = projection_output.attention_mask

        negative_generated_prompt_embeds = self.generate_language_model(
            negative_projected_prompt_embeds,
            attention_mask=negative_projected_attention_mask,
            max_new_tokens=max_new_tokens,
        )

    if do_classifier_free_guidance:
        seq_len = negative_prompt_embeds.shape[1]

        negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder_2.dtype)
        negative_attention_mask = (
            negative_attention_mask
            if negative_attention_mask is not None
            else mint.ones(negative_prompt_embeds.shape[:2], dtype=mindspore.int64)
        )
        negative_generated_prompt_embeds = negative_generated_prompt_embeds.to(dtype=self.language_model.dtype)

        # duplicate unconditional embeddings for each generation per prompt, using mps friendly method
        negative_prompt_embeds = negative_prompt_embeds.tile((1, num_waveforms_per_prompt, 1))
        negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_waveforms_per_prompt, seq_len, -1)

        # duplicate unconditional attention mask for each generation per prompt
        negative_attention_mask = negative_attention_mask.tile((1, num_waveforms_per_prompt))
        negative_attention_mask = negative_attention_mask.view(batch_size * num_waveforms_per_prompt, seq_len)

        # duplicate unconditional generated embeddings for each generation per prompt
        seq_len = negative_generated_prompt_embeds.shape[1]
        negative_generated_prompt_embeds = negative_generated_prompt_embeds.tile((1, num_waveforms_per_prompt, 1))
        negative_generated_prompt_embeds = negative_generated_prompt_embeds.view(
            batch_size * num_waveforms_per_prompt, seq_len, -1
        )

        # For classifier free guidance, we need to do two forward passes.
        # Here we concatenate the unconditional and text embeddings into a single batch
        # to avoid doing two forward passes
        prompt_embeds = mint.cat([negative_prompt_embeds, prompt_embeds])
        attention_mask = mint.cat([negative_attention_mask, attention_mask])
        generated_prompt_embeds = mint.cat([negative_generated_prompt_embeds, generated_prompt_embeds])

    return prompt_embeds, attention_mask, generated_prompt_embeds

mindone.diffusers.AudioLDM2Pipeline.generate_language_model(inputs_embeds=None, max_new_tokens=8, **model_kwargs)

Generates a sequence of hidden-states from the language model, conditioned on the embedding inputs.

PARAMETER DESCRIPTION
inputs_embeds

The sequence used as a prompt for the generation.

TYPE: `mindspore.tensor` of shape `(batch_size, sequence_length, hidden_size)` DEFAULT: None

max_new_tokens

Number of new tokens to generate.

TYPE: `int` DEFAULT: 8

model_kwargs

Ad hoc parametrization of additional model-specific kwargs that will be forwarded to the forward function of the model.

TYPE: `Dict[str, Any]`, *optional* DEFAULT: {}

Return

inputs_embeds (mindspore.tensorof shape(batch_size, sequence_length, hidden_size)`): The sequence of generated hidden-states.

Source code in mindone/diffusers/pipelines/audioldm2/pipeline_audioldm2.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def generate_language_model(
    self,
    inputs_embeds: mindspore.tensor = None,
    max_new_tokens: int = 8,
    **model_kwargs,
):
    """

    Generates a sequence of hidden-states from the language model, conditioned on the embedding inputs.

    Parameters:
        inputs_embeds (`mindspore.tensor` of shape `(batch_size, sequence_length, hidden_size)`):
            The sequence used as a prompt for the generation.
        max_new_tokens (`int`):
            Number of new tokens to generate.
        model_kwargs (`Dict[str, Any]`, *optional*):
            Ad hoc parametrization of additional model-specific kwargs that will be forwarded to the `forward`
            function of the model.

    Return:
        `inputs_embeds (`mindspore.tensor` of shape `(batch_size, sequence_length, hidden_size)`):
            The sequence of generated hidden-states.
    """
    self.language_model._supports_dynamic_input = True
    max_new_tokens = max_new_tokens if max_new_tokens is not None else self.language_model.config.max_new_tokens
    model_kwargs = self.language_model._get_initial_cache_position(inputs_embeds, model_kwargs)
    for _ in range(max_new_tokens):
        # prepare model inputs
        model_inputs = prepare_inputs_for_generation(inputs_embeds, **model_kwargs)

        # forward pass to get next hidden states
        output = self.language_model(**model_inputs, return_dict=True)

        next_hidden_states = output.last_hidden_state

        # Update the model input
        inputs_embeds = mint.cat([inputs_embeds, next_hidden_states[:, -1:, :]], dim=1)

        # Update generated hidden states, model inputs, and length for next step
        model_kwargs = self.language_model._update_model_kwargs_for_generation(output, model_kwargs)

    return inputs_embeds[:, -max_new_tokens:, :]

mindone.diffusers.AudioLDM2ProjectionModel

Bases: ModelMixin, ConfigMixin

A simple linear projection model to map two text embeddings to a shared latent space. It also inserts learned embedding vectors at the start and end of each text embedding sequence respectively. Each variable appended with _1 refers to that corresponding to the second text encoder. Otherwise, it is from the first.

PARAMETER DESCRIPTION
text_encoder_dim

Dimensionality of the text embeddings from the first text encoder (CLAP).

TYPE: `int`

text_encoder_1_dim

Dimensionality of the text embeddings from the second text encoder (T5 or VITS).

TYPE: `int`

langauge_model_dim

Dimensionality of the text embeddings from the language model (GPT2).

TYPE: `int`

Source code in mindone/diffusers/pipelines/audioldm2/modeling_audioldm2.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class AudioLDM2ProjectionModel(ModelMixin, ConfigMixin):
    """
    A simple linear projection model to map two text embeddings to a shared latent space. It also inserts learned
    embedding vectors at the start and end of each text embedding sequence respectively. Each variable appended with
    `_1` refers to that corresponding to the second text encoder. Otherwise, it is from the first.

    Args:
        text_encoder_dim (`int`):
            Dimensionality of the text embeddings from the first text encoder (CLAP).
        text_encoder_1_dim (`int`):
            Dimensionality of the text embeddings from the second text encoder (T5 or VITS).
        langauge_model_dim (`int`):
            Dimensionality of the text embeddings from the language model (GPT2).
    """

    @register_to_config
    def __init__(
        self,
        text_encoder_dim,
        text_encoder_1_dim,
        langauge_model_dim,
        use_learned_position_embedding=None,
        max_seq_length=None,
    ):
        super().__init__()
        # additional projection layers for each text encoder
        self.projection = mint.nn.Linear(text_encoder_dim, langauge_model_dim)
        self.projection_1 = mint.nn.Linear(text_encoder_1_dim, langauge_model_dim)

        # learnable SOS / EOS token embeddings for each text encoder
        self.sos_embed = Parameter(mint.ones(langauge_model_dim))
        self.eos_embed = Parameter(mint.ones(langauge_model_dim))

        self.sos_embed_1 = Parameter(mint.ones(langauge_model_dim))
        self.eos_embed_1 = Parameter(mint.ones(langauge_model_dim))

        self.use_learned_position_embedding = use_learned_position_embedding

        # learable positional embedding for vits encoder
        if self.use_learned_position_embedding is not None:
            self.learnable_positional_embedding = Parameter(mint.zeros((1, text_encoder_1_dim, max_seq_length)))

    def construct(
        self,
        hidden_states: Optional[mindspore.tensor] = None,
        hidden_states_1: Optional[mindspore.tensor] = None,
        attention_mask: Optional[mindspore.tensor] = None,
        attention_mask_1: Optional[mindspore.tensor] = None,
    ):
        hidden_states = self.projection(hidden_states)
        hidden_states, attention_mask = add_special_tokens(
            hidden_states, attention_mask, sos_token=self.sos_embed, eos_token=self.eos_embed
        )

        # Add positional embedding for Vits hidden state
        if self.use_learned_position_embedding is not None:
            hidden_states_1 = (hidden_states_1.permute(0, 2, 1) + self.learnable_positional_embedding).permute(0, 2, 1)

        hidden_states_1 = self.projection_1(hidden_states_1)
        hidden_states_1, attention_mask_1 = add_special_tokens(
            hidden_states_1, attention_mask_1, sos_token=self.sos_embed_1, eos_token=self.eos_embed_1
        )

        # concatenate clap and t5 text encoding
        hidden_states = mint.cat([hidden_states, hidden_states_1], dim=1)

        # concatenate attention masks
        if attention_mask is None and attention_mask_1 is not None:
            attention_mask = attention_mask_1.new_ones((hidden_states[:2]))
        elif attention_mask is not None and attention_mask_1 is None:
            attention_mask_1 = attention_mask.new_ones((hidden_states_1[:2]))

        if attention_mask is not None and attention_mask_1 is not None:
            attention_mask = mint.cat([attention_mask, attention_mask_1], dim=-1)
        else:
            attention_mask = None

        return AudioLDM2ProjectionModelOutput(
            hidden_states=hidden_states,
            attention_mask=attention_mask,
        )

mindone.diffusers.AudioLDM2UNet2DConditionModel

Bases: ModelMixin, ConfigMixin, UNet2DConditionLoadersMixin

A conditional 2D UNet model that takes a noisy sample, conditional state, and a timestep and returns a sample shaped output. Compared to the vanilla [UNet2DConditionModel], this variant optionally includes an additional self-attention layer in each Transformer block, as well as multiple cross-attention layers. It also allows for up to two cross-attention embeddings, encoder_hidden_states and encoder_hidden_states_1.

This model inherits from [ModelMixin]. Check the superclass documentation for it's generic methods implemented for all models (such as downloading or saving).

PARAMETER DESCRIPTION
sample_size

Height and width of input/output sample.

TYPE: `int` or `Tuple[int, int]`, *optional*, defaults to `None` DEFAULT: None

in_channels

Number of channels in the input sample.

TYPE: `int`, *optional*, defaults to 4 DEFAULT: 4

out_channels

Number of channels in the output.

TYPE: `int`, *optional*, defaults to 4 DEFAULT: 4

flip_sin_to_cos

Whether to flip the sin to cos in the time embedding.

TYPE: `bool`, *optional*, defaults to `False` DEFAULT: True

freq_shift

The frequency shift to apply to the time embedding.

TYPE: `int`, *optional*, defaults to 0 DEFAULT: 0

down_block_types

The tuple of downsample blocks to use.

TYPE: `Tuple[str]`, *optional*, defaults to `("CrossAttnDownBlock2D", "CrossAttnDownBlock2D", "CrossAttnDownBlock2D", "DownBlock2D")` DEFAULT: ('CrossAttnDownBlock2D', 'CrossAttnDownBlock2D', 'CrossAttnDownBlock2D', 'DownBlock2D')

mid_block_type

Block type for middle of UNet, it can only be UNetMidBlock2DCrossAttn for AudioLDM2.

TYPE: `str`, *optional*, defaults to `"UNetMidBlock2DCrossAttn"` DEFAULT: 'UNetMidBlock2DCrossAttn'

up_block_types

The tuple of upsample blocks to use.

TYPE: `Tuple[str]`, *optional*, defaults to `("UpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D")` DEFAULT: ('UpBlock2D', 'CrossAttnUpBlock2D', 'CrossAttnUpBlock2D', 'CrossAttnUpBlock2D')

only_cross_attention

Whether to include self-attention in the basic transformer blocks, see [~models.attention.BasicTransformerBlock].

TYPE: `bool` or `Tuple[bool]`, *optional*, default to `False` DEFAULT: False

block_out_channels

The tuple of output channels for each block.

TYPE: `Tuple[int]`, *optional*, defaults to `(320, 640, 1280, 1280)` DEFAULT: (320, 640, 1280, 1280)

layers_per_block

The number of layers per block.

TYPE: `int`, *optional*, defaults to 2 DEFAULT: 2

downsample_padding

The padding to use for the downsampling convolution.

TYPE: `int`, *optional*, defaults to 1 DEFAULT: 1

mid_block_scale_factor

The scale factor to use for the mid block.

TYPE: `float`, *optional*, defaults to 1.0 DEFAULT: 1

act_fn

The activation function to use.

TYPE: `str`, *optional*, defaults to `"silu"` DEFAULT: 'silu'

norm_num_groups

The number of groups to use for the normalization. If None, normalization and activation layers is skipped in post-processing.

TYPE: `int`, *optional*, defaults to 32 DEFAULT: 32

norm_eps

The epsilon to use for the normalization.

TYPE: `float`, *optional*, defaults to 1e-5 DEFAULT: 1e-05

cross_attention_dim

The dimension of the cross attention features.

TYPE: `int` or `Tuple[int]`, *optional*, defaults to 1280 DEFAULT: 1280

transformer_layers_per_block

The number of transformer blocks of type [~models.attention.BasicTransformerBlock]. Only relevant for [~models.unet_2d_blocks.CrossAttnDownBlock2D], [~models.unet_2d_blocks.CrossAttnUpBlock2D], [~models.unet_2d_blocks.UNetMidBlock2DCrossAttn].

TYPE: `int` or `Tuple[int]`, *optional*, defaults to 1 DEFAULT: 1

attention_head_dim

The dimension of the attention heads.

TYPE: `int`, *optional*, defaults to 8 DEFAULT: 8

num_attention_heads

The number of attention heads. If not defined, defaults to attention_head_dim

TYPE: `int`, *optional* DEFAULT: None

resnet_time_scale_shift

Time scale shift config for ResNet blocks (see [~models.resnet.ResnetBlock2D]). Choose from default or scale_shift.

TYPE: `str`, *optional*, defaults to `"default"` DEFAULT: 'default'

class_embed_type

The type of class embedding to use which is ultimately summed with the time embeddings. Choose from None, "timestep", "identity", "projection", or "simple_projection".

TYPE: `str`, *optional*, defaults to `None` DEFAULT: None

num_class_embeds

Input dimension of the learnable embedding matrix to be projected to time_embed_dim, when performing class conditioning with class_embed_type equal to None.

TYPE: `int`, *optional*, defaults to `None` DEFAULT: None

time_embedding_type

The type of position embedding to use for timesteps. Choose from positional or fourier.

TYPE: `str`, *optional*, defaults to `positional` DEFAULT: 'positional'

time_embedding_dim

An optional override for the dimension of the projected time embedding.

TYPE: `int`, *optional*, defaults to `None` DEFAULT: None

time_embedding_act_fn

Optional activation function to use only once on the time embeddings before they are passed to the rest of the UNet. Choose from silu, mish, gelu, and swish.

TYPE: `str`, *optional*, defaults to `None` DEFAULT: None

timestep_post_act

The second activation function to use in timestep embedding. Choose from silu, mish and gelu.

TYPE: `str`, *optional*, defaults to `None` DEFAULT: None

time_cond_proj_dim

The dimension of cond_proj layer in the timestep embedding.

TYPE: `int`, *optional*, defaults to `None` DEFAULT: None

conv_in_kernel

The kernel size of conv_in layer.

TYPE: `int`, *optional*, default to `3` DEFAULT: 3

conv_out_kernel

The kernel size of conv_out layer.

TYPE: `int`, *optional*, default to `3` DEFAULT: 3

projection_class_embeddings_input_dim

The dimension of the class_labels input when class_embed_type="projection". Required when class_embed_type="projection".

TYPE: `int`, *optional* DEFAULT: None

class_embeddings_concat

Whether to concatenate the time embeddings with the class embeddings.

TYPE: `bool`, *optional*, defaults to `False` DEFAULT: False

Source code in mindone/diffusers/pipelines/audioldm2/modeling_audioldm2.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
class AudioLDM2UNet2DConditionModel(ModelMixin, ConfigMixin, UNet2DConditionLoadersMixin):
    r"""
    A conditional 2D UNet model that takes a noisy sample, conditional state, and a timestep and returns a sample
    shaped output. Compared to the vanilla [`UNet2DConditionModel`], this variant optionally includes an additional
    self-attention layer in each Transformer block, as well as multiple cross-attention layers. It also allows for up
    to two cross-attention embeddings, `encoder_hidden_states` and `encoder_hidden_states_1`.

    This model inherits from [`ModelMixin`]. Check the superclass documentation for it's generic methods implemented
    for all models (such as downloading or saving).

    Parameters:
        sample_size (`int` or `Tuple[int, int]`, *optional*, defaults to `None`):
            Height and width of input/output sample.
        in_channels (`int`, *optional*, defaults to 4): Number of channels in the input sample.
        out_channels (`int`, *optional*, defaults to 4): Number of channels in the output.
        flip_sin_to_cos (`bool`, *optional*, defaults to `False`):
            Whether to flip the sin to cos in the time embedding.
        freq_shift (`int`, *optional*, defaults to 0): The frequency shift to apply to the time embedding.
        down_block_types (`Tuple[str]`, *optional*, defaults to `("CrossAttnDownBlock2D", "CrossAttnDownBlock2D", "CrossAttnDownBlock2D", "DownBlock2D")`):
            The tuple of downsample blocks to use.
        mid_block_type (`str`, *optional*, defaults to `"UNetMidBlock2DCrossAttn"`):
            Block type for middle of UNet, it can only be `UNetMidBlock2DCrossAttn` for AudioLDM2.
        up_block_types (`Tuple[str]`, *optional*, defaults to `("UpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D")`):
            The tuple of upsample blocks to use.
        only_cross_attention (`bool` or `Tuple[bool]`, *optional*, default to `False`):
            Whether to include self-attention in the basic transformer blocks, see
            [`~models.attention.BasicTransformerBlock`].
        block_out_channels (`Tuple[int]`, *optional*, defaults to `(320, 640, 1280, 1280)`):
            The tuple of output channels for each block.
        layers_per_block (`int`, *optional*, defaults to 2): The number of layers per block.
        downsample_padding (`int`, *optional*, defaults to 1): The padding to use for the downsampling convolution.
        mid_block_scale_factor (`float`, *optional*, defaults to 1.0): The scale factor to use for the mid block.
        act_fn (`str`, *optional*, defaults to `"silu"`): The activation function to use.
        norm_num_groups (`int`, *optional*, defaults to 32): The number of groups to use for the normalization.
            If `None`, normalization and activation layers is skipped in post-processing.
        norm_eps (`float`, *optional*, defaults to 1e-5): The epsilon to use for the normalization.
        cross_attention_dim (`int` or `Tuple[int]`, *optional*, defaults to 1280):
            The dimension of the cross attention features.
        transformer_layers_per_block (`int` or `Tuple[int]`, *optional*, defaults to 1):
            The number of transformer blocks of type [`~models.attention.BasicTransformerBlock`]. Only relevant for
            [`~models.unet_2d_blocks.CrossAttnDownBlock2D`], [`~models.unet_2d_blocks.CrossAttnUpBlock2D`],
            [`~models.unet_2d_blocks.UNetMidBlock2DCrossAttn`].
        attention_head_dim (`int`, *optional*, defaults to 8): The dimension of the attention heads.
        num_attention_heads (`int`, *optional*):
            The number of attention heads. If not defined, defaults to `attention_head_dim`
        resnet_time_scale_shift (`str`, *optional*, defaults to `"default"`): Time scale shift config
            for ResNet blocks (see [`~models.resnet.ResnetBlock2D`]). Choose from `default` or `scale_shift`.
        class_embed_type (`str`, *optional*, defaults to `None`):
            The type of class embedding to use which is ultimately summed with the time embeddings. Choose from `None`,
            `"timestep"`, `"identity"`, `"projection"`, or `"simple_projection"`.
        num_class_embeds (`int`, *optional*, defaults to `None`):
            Input dimension of the learnable embedding matrix to be projected to `time_embed_dim`, when performing
            class conditioning with `class_embed_type` equal to `None`.
        time_embedding_type (`str`, *optional*, defaults to `positional`):
            The type of position embedding to use for timesteps. Choose from `positional` or `fourier`.
        time_embedding_dim (`int`, *optional*, defaults to `None`):
            An optional override for the dimension of the projected time embedding.
        time_embedding_act_fn (`str`, *optional*, defaults to `None`):
            Optional activation function to use only once on the time embeddings before they are passed to the rest of
            the UNet. Choose from `silu`, `mish`, `gelu`, and `swish`.
        timestep_post_act (`str`, *optional*, defaults to `None`):
            The second activation function to use in timestep embedding. Choose from `silu`, `mish` and `gelu`.
        time_cond_proj_dim (`int`, *optional*, defaults to `None`):
            The dimension of `cond_proj` layer in the timestep embedding.
        conv_in_kernel (`int`, *optional*, default to `3`): The kernel size of `conv_in` layer.
        conv_out_kernel (`int`, *optional*, default to `3`): The kernel size of `conv_out` layer.
        projection_class_embeddings_input_dim (`int`, *optional*): The dimension of the `class_labels` input when
            `class_embed_type="projection"`. Required when `class_embed_type="projection"`.
        class_embeddings_concat (`bool`, *optional*, defaults to `False`): Whether to concatenate the time
            embeddings with the class embeddings.
    """

    _supports_gradient_checkpointing = True

    @register_to_config
    def __init__(
        self,
        sample_size: Optional[int] = None,
        in_channels: int = 4,
        out_channels: int = 4,
        flip_sin_to_cos: bool = True,
        freq_shift: int = 0,
        down_block_types: Tuple[str] = (
            "CrossAttnDownBlock2D",
            "CrossAttnDownBlock2D",
            "CrossAttnDownBlock2D",
            "DownBlock2D",
        ),
        mid_block_type: Optional[str] = "UNetMidBlock2DCrossAttn",
        up_block_types: Tuple[str] = ("UpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D"),
        only_cross_attention: Union[bool, Tuple[bool]] = False,
        block_out_channels: Tuple[int] = (320, 640, 1280, 1280),
        layers_per_block: Union[int, Tuple[int]] = 2,
        downsample_padding: int = 1,
        mid_block_scale_factor: float = 1,
        act_fn: str = "silu",
        norm_num_groups: Optional[int] = 32,
        norm_eps: float = 1e-5,
        cross_attention_dim: Union[int, Tuple[int]] = 1280,
        transformer_layers_per_block: Union[int, Tuple[int]] = 1,
        attention_head_dim: Union[int, Tuple[int]] = 8,
        num_attention_heads: Optional[Union[int, Tuple[int]]] = None,
        use_linear_projection: bool = False,
        class_embed_type: Optional[str] = None,
        num_class_embeds: Optional[int] = None,
        upcast_attention: bool = False,
        resnet_time_scale_shift: str = "default",
        time_embedding_type: str = "positional",
        time_embedding_dim: Optional[int] = None,
        time_embedding_act_fn: Optional[str] = None,
        timestep_post_act: Optional[str] = None,
        time_cond_proj_dim: Optional[int] = None,
        conv_in_kernel: int = 3,
        conv_out_kernel: int = 3,
        projection_class_embeddings_input_dim: Optional[int] = None,
        class_embeddings_concat: bool = False,
    ):
        super().__init__()

        self.sample_size = sample_size

        if num_attention_heads is not None:
            raise ValueError(
                "At the moment it is not possible to define the number of attention heads via `num_attention_heads` because of a naming issue \
                as described in https://github.com/huggingface/diffusers/issues/2011#issuecomment-1547958131. \
                Passing `num_attention_heads` will only be supported in diffusers v0.19."
            )

        # If `num_attention_heads` is not defined (which is the case for most models)
        # it will default to `attention_head_dim`. This looks weird upon first reading it and it is.
        # The reason for this behavior is to correct for incorrectly named variables that were introduced
        # Changing `attention_head_dim` to `num_attention_heads` for 40,000+ configurations is too backwards breaking
        # which is why we correct for the naming here.
        num_attention_heads = num_attention_heads or attention_head_dim

        # Check inputs
        if len(down_block_types) != len(up_block_types):
            raise ValueError(
                f"Must provide the same number of `down_block_types` as `up_block_types`. `down_block_types`: \
                    {down_block_types}. `up_block_types`: {up_block_types}."
            )

        if len(block_out_channels) != len(down_block_types):
            raise ValueError(
                f"Must provide the same number of `block_out_channels` as `down_block_types`. `block_out_channels`:\
                      {block_out_channels}. `down_block_types`: {down_block_types}."
            )

        if not isinstance(only_cross_attention, bool) and len(only_cross_attention) != len(down_block_types):
            raise ValueError(
                f"Must provide the same number of `only_cross_attention` as `down_block_types`. `only_cross_attention`: \
                    {only_cross_attention}. `down_block_types`: {down_block_types}."
            )

        if not isinstance(num_attention_heads, int) and len(num_attention_heads) != len(down_block_types):
            raise ValueError(
                f"Must provide the same number of `num_attention_heads` as `down_block_types`. `num_attention_heads`: \
                    {num_attention_heads}. `down_block_types`: {down_block_types}."
            )

        if not isinstance(attention_head_dim, int) and len(attention_head_dim) != len(down_block_types):
            raise ValueError(
                f"Must provide the same number of `attention_head_dim` as `down_block_types`. `attention_head_dim`: \
                    {attention_head_dim}. `down_block_types`: {down_block_types}."
            )

        if isinstance(cross_attention_dim, list) and len(cross_attention_dim) != len(down_block_types):
            raise ValueError(
                f"Must provide the same number of `cross_attention_dim` as `down_block_types`. `cross_attention_dim`: \
                    {cross_attention_dim}. `down_block_types`: {down_block_types}."
            )

        if not isinstance(layers_per_block, int) and len(layers_per_block) != len(down_block_types):
            raise ValueError(
                f"Must provide the same number of `layers_per_block` as `down_block_types`. `layers_per_block`: \
                    {layers_per_block}. `down_block_types`: {down_block_types}."
            )

        # input
        conv_in_padding = (conv_in_kernel - 1) // 2
        self.conv_in = mint.nn.Conv2d(
            in_channels,
            block_out_channels[0],
            kernel_size=conv_in_kernel,
            padding=conv_in_padding,
        )

        # time
        if time_embedding_type == "positional":
            time_embed_dim = time_embedding_dim or block_out_channels[0] * 4

            self.time_proj = Timesteps(block_out_channels[0], flip_sin_to_cos, freq_shift)
            timestep_input_dim = block_out_channels[0]
        else:
            raise ValueError(f"{time_embedding_type} does not exist. Please make sure to use `positional`.")

        self.time_embedding = TimestepEmbedding(
            timestep_input_dim,
            time_embed_dim,
            act_fn=act_fn,
            post_act_fn=timestep_post_act,
            cond_proj_dim=time_cond_proj_dim,
        )

        # class embedding
        if class_embed_type is None and num_class_embeds is not None:
            self.class_embedding = nn.Embedding(num_class_embeds, time_embed_dim)
        elif class_embed_type == "timestep":
            self.class_embedding = TimestepEmbedding(timestep_input_dim, time_embed_dim, act_fn=act_fn)
        elif class_embed_type == "identity":
            self.class_embedding = nn.Identity(time_embed_dim, time_embed_dim)
        elif class_embed_type == "projection":
            if projection_class_embeddings_input_dim is None:
                raise ValueError(
                    "`class_embed_type`: 'projection' requires `projection_class_embeddings_input_dim` be set"
                )
            # The projection `class_embed_type` is the same as the timestep `class_embed_type` except
            # 1. the `class_labels` inputs are not first converted to sinusoidal embeddings
            # 2. it projects from an arbitrary input dimension.
            #
            # Note that `TimestepEmbedding` is quite general, being mainly linear layers and activations.
            # When used for embedding actual timesteps, the timesteps are first converted to sinusoidal embeddings.
            # As a result, `TimestepEmbedding` can be passed arbitrary vectors.
            self.class_embedding = TimestepEmbedding(projection_class_embeddings_input_dim, time_embed_dim)
        elif class_embed_type == "simple_projection":
            if projection_class_embeddings_input_dim is None:
                raise ValueError(
                    "`class_embed_type`: 'simple_projection' requires `projection_class_embeddings_input_dim` be set"
                )
            self.class_embedding = mint.nn.Linear(projection_class_embeddings_input_dim, time_embed_dim)
        else:
            self.class_embedding = None

        if time_embedding_act_fn is None:
            self.time_embed_act = None
        else:
            self.time_embed_act = get_activation(time_embedding_act_fn)

        down_blocks = []
        up_blocks = []

        if isinstance(only_cross_attention, bool):
            only_cross_attention = [only_cross_attention] * len(down_block_types)

        if isinstance(num_attention_heads, int):
            num_attention_heads = (num_attention_heads,) * len(down_block_types)

        if isinstance(cross_attention_dim, int):
            cross_attention_dim = (cross_attention_dim,) * len(down_block_types)

        if isinstance(layers_per_block, int):
            layers_per_block = [layers_per_block] * len(down_block_types)

        if isinstance(transformer_layers_per_block, int):
            transformer_layers_per_block = [transformer_layers_per_block] * len(down_block_types)

        if class_embeddings_concat:
            # The time embeddings are concatenated with the class embeddings. The dimension of the
            # time embeddings passed to the down, middle, and up blocks is twice the dimension of the
            # regular time embeddings
            blocks_time_embed_dim = time_embed_dim * 2
        else:
            blocks_time_embed_dim = time_embed_dim

        # down
        output_channel = block_out_channels[0]
        for i, down_block_type in enumerate(down_block_types):
            input_channel = output_channel
            output_channel = block_out_channels[i]
            is_final_block = i == len(block_out_channels) - 1

            down_block = get_down_block(
                down_block_type,
                num_layers=layers_per_block[i],
                transformer_layers_per_block=transformer_layers_per_block[i],
                in_channels=input_channel,
                out_channels=output_channel,
                temb_channels=blocks_time_embed_dim,
                add_downsample=not is_final_block,
                resnet_eps=norm_eps,
                resnet_act_fn=act_fn,
                resnet_groups=norm_num_groups,
                cross_attention_dim=cross_attention_dim[i],
                num_attention_heads=num_attention_heads[i],
                downsample_padding=downsample_padding,
                use_linear_projection=use_linear_projection,
                only_cross_attention=only_cross_attention[i],
                upcast_attention=upcast_attention,
                resnet_time_scale_shift=resnet_time_scale_shift,
            )
            down_blocks.append(down_block)
        self.down_blocks = nn.CellList(down_blocks)

        # mid
        if mid_block_type == "UNetMidBlock2DCrossAttn":
            self.mid_block = UNetMidBlock2DCrossAttn(
                transformer_layers_per_block=transformer_layers_per_block[-1],
                in_channels=block_out_channels[-1],
                temb_channels=blocks_time_embed_dim,
                resnet_eps=norm_eps,
                resnet_act_fn=act_fn,
                output_scale_factor=mid_block_scale_factor,
                resnet_time_scale_shift=resnet_time_scale_shift,
                cross_attention_dim=cross_attention_dim[-1],
                num_attention_heads=num_attention_heads[-1],
                resnet_groups=norm_num_groups,
                use_linear_projection=use_linear_projection,
                upcast_attention=upcast_attention,
            )
        else:
            raise ValueError(
                f"unknown mid_block_type : {mid_block_type}. Should be `UNetMidBlock2DCrossAttn` for AudioLDM2."
            )

        # count how many layers upsample the images
        self.num_upsamplers = 0

        # up
        reversed_block_out_channels = list(reversed(block_out_channels))
        reversed_num_attention_heads = list(reversed(num_attention_heads))
        reversed_layers_per_block = list(reversed(layers_per_block))
        reversed_cross_attention_dim = list(reversed(cross_attention_dim))
        reversed_transformer_layers_per_block = list(reversed(transformer_layers_per_block))
        only_cross_attention = list(reversed(only_cross_attention))

        output_channel = reversed_block_out_channels[0]
        for i, up_block_type in enumerate(up_block_types):
            is_final_block = i == len(block_out_channels) - 1

            prev_output_channel = output_channel
            output_channel = reversed_block_out_channels[i]
            input_channel = reversed_block_out_channels[min(i + 1, len(block_out_channels) - 1)]

            # add upsample block for all BUT final layer
            if not is_final_block:
                add_upsample = True
                self.num_upsamplers += 1
            else:
                add_upsample = False

            up_block = get_up_block(
                up_block_type,
                num_layers=reversed_layers_per_block[i] + 1,
                transformer_layers_per_block=reversed_transformer_layers_per_block[i],
                in_channels=input_channel,
                out_channels=output_channel,
                prev_output_channel=prev_output_channel,
                temb_channels=blocks_time_embed_dim,
                add_upsample=add_upsample,
                resnet_eps=norm_eps,
                resnet_act_fn=act_fn,
                resnet_groups=norm_num_groups,
                cross_attention_dim=reversed_cross_attention_dim[i],
                num_attention_heads=reversed_num_attention_heads[i],
                use_linear_projection=use_linear_projection,
                only_cross_attention=only_cross_attention[i],
                upcast_attention=upcast_attention,
                resnet_time_scale_shift=resnet_time_scale_shift,
            )
            up_blocks.append(up_block)
            prev_output_channel = output_channel
        self.up_blocks = nn.CellList(up_blocks)

        # out
        if norm_num_groups is not None:
            self.conv_norm_out = mint.nn.GroupNorm(
                num_channels=block_out_channels[0], num_groups=norm_num_groups, eps=norm_eps
            )

            self.conv_act = get_activation(act_fn)

        else:
            self.conv_norm_out = None
            self.conv_act = None

        conv_out_padding = (conv_out_kernel - 1) // 2
        self.conv_out = mint.nn.Conv2d(
            block_out_channels[0],
            out_channels,
            kernel_size=conv_out_kernel,
            padding=conv_out_padding,
        )

    @property
    # Copied from diffusers.models.unets.unet_2d_condition.UNet2DConditionModel.attn_processors
    def attn_processors(self) -> Dict[str, AttentionProcessor]:
        r"""
        Returns:
            `dict` of attention processors: A dictionary containing all attention processors used in the model with
            indexed by its weight name.
        """
        # set recursively
        processors = {}

        def fn_recursive_add_processors(name: str, module: nn.Cell, processors: Dict[str, AttentionProcessor]):
            if hasattr(module, "get_processor"):
                processors[f"{name}.processor"] = module.get_processor()

            for sub_name, child in module.named_children():
                fn_recursive_add_processors(f"{name}.{sub_name}", child, processors)

            return processors

        for name, module in self.named_children():
            fn_recursive_add_processors(name, module, processors)

        return processors

    # Copied from diffusers.models.unets.unet_2d_condition.UNet2DConditionModel.set_attn_processor
    def set_attn_processor(self, processor: Union[AttentionProcessor, Dict[str, AttentionProcessor]]):
        r"""
        Sets the attention processor to use to compute attention.

        Parameters:
            processor (`dict` of `AttentionProcessor` or only `AttentionProcessor`):
                The instantiated processor class or a dictionary of processor classes that will be set as the processor
                for **all** `Attention` layers.

                If `processor` is a dict, the key needs to define the path to the corresponding cross attention
                processor. This is strongly recommended when setting trainable attention processors.

        """
        count = len(self.attn_processors.keys())

        if isinstance(processor, dict) and len(processor) != count:
            raise ValueError(
                f"A dict of processors was passed, but the number of processors {len(processor)} does not match the"
                f" number of attention layers: {count}. Please make sure to pass {count} processor classes."
            )

        def fn_recursive_attn_processor(name: str, module: nn.Cell, processor):
            if hasattr(module, "set_processor"):
                if not isinstance(processor, dict):
                    module.set_processor(processor)
                else:
                    module.set_processor(processor.pop(f"{name}.processor"))

            for sub_name, child in module.named_children():
                fn_recursive_attn_processor(f"{name}.{sub_name}", child, processor)

        for name, module in self.named_children():
            fn_recursive_attn_processor(name, module, processor)

    # Copied from diffusers.models.unets.unet_2d_condition.UNet2DConditionModel.set_default_attn_processor
    def set_default_attn_processor(self):
        """
        Disables custom attention processors and sets the default attention implementation.
        """
        if all(proc.__class__ in ADDED_KV_ATTENTION_PROCESSORS for proc in self.attn_processors.values()):
            processor = AttnAddedKVProcessor()
        elif all(proc.__class__ in CROSS_ATTENTION_PROCESSORS for proc in self.attn_processors.values()):
            processor = AttnProcessor()
        else:
            raise ValueError(
                f"Cannot call `set_default_attn_processor` when attention processors are of type {next(iter(self.attn_processors.values()))}"
            )

        self.set_attn_processor(processor)

    # Copied from diffusers.models.unets.unet_2d_condition.UNet2DConditionModel.set_attention_slice
    def set_attention_slice(self, slice_size):
        r"""
        Enable sliced attention computation.

        When this option is enabled, the attention module splits the input tensor in slices to compute attention in
        several steps. This is useful for saving some memory in exchange for a small decrease in speed.

        Args:
            slice_size (`str` or `int` or `list(int)`, *optional*, defaults to `"auto"`):
                When `"auto"`, input to the attention heads is halved, so attention is computed in two steps. If
                `"max"`, maximum amount of memory is saved by running only one slice at a time. If a number is
                provided, uses as many slices as `attention_head_dim // slice_size`. In this case, `attention_head_dim`
                must be a multiple of `slice_size`.
        """
        sliceable_head_dims = []

        def fn_recursive_retrieve_sliceable_dims(module: nn.Cell):
            if hasattr(module, "set_attention_slice"):
                sliceable_head_dims.append(module.sliceable_head_dim)

            for child in module.children():
                fn_recursive_retrieve_sliceable_dims(child)

        # retrieve number of attention layers
        for module in self.children():
            fn_recursive_retrieve_sliceable_dims(module)

        num_sliceable_layers = len(sliceable_head_dims)

        if slice_size == "auto":
            # half the attention head size is usually a good trade-off between
            # speed and memory
            slice_size = [dim // 2 for dim in sliceable_head_dims]
        elif slice_size == "max":
            # make smallest slice possible
            slice_size = num_sliceable_layers * [1]

        slice_size = num_sliceable_layers * [slice_size] if not isinstance(slice_size, list) else slice_size

        if len(slice_size) != len(sliceable_head_dims):
            raise ValueError(
                f"You have provided {len(slice_size)}, but {self.config} has {len(sliceable_head_dims)} different"
                f" attention layers. Make sure to match `len(slice_size)` to be {len(sliceable_head_dims)}."
            )

        for i in range(len(slice_size)):
            size = slice_size[i]
            dim = sliceable_head_dims[i]
            if size is not None and size > dim:
                raise ValueError(f"size {size} has to be smaller or equal to {dim}.")

        # Recursively walk through all the children.
        # Any children which exposes the set_attention_slice method
        # gets the message
        def fn_recursive_set_attention_slice(module: nn.Cell, slice_size: List[int]):
            if hasattr(module, "set_attention_slice"):
                module.set_attention_slice(slice_size.pop())

            for child in module.children():
                fn_recursive_set_attention_slice(child, slice_size)

        reversed_slice_size = list(reversed(slice_size))
        for module in self.children():
            fn_recursive_set_attention_slice(module, reversed_slice_size)

    # Copied from diffusers.models.unets.unet_2d_condition.UNet2DConditionModel._set_gradient_checkpointing
    def _set_gradient_checkpointing(self, module, value=False):
        if hasattr(module, "gradient_checkpointing"):
            module.gradient_checkpointing = value

    def construct(
        self,
        sample: mindspore.tensor,
        timestep: Union[mindspore.tensor, float, int],
        encoder_hidden_states: mindspore.tensor,
        class_labels: Optional[mindspore.tensor] = None,
        timestep_cond: Optional[mindspore.tensor] = None,
        attention_mask: Optional[mindspore.tensor] = None,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        encoder_attention_mask: Optional[mindspore.tensor] = None,
        return_dict: bool = True,
        encoder_hidden_states_1: Optional[mindspore.tensor] = None,
        encoder_attention_mask_1: Optional[mindspore.tensor] = None,
    ) -> Union[UNet2DConditionOutput, Tuple]:
        r"""
        The [`AudioLDM2UNet2DConditionModel`] forward method.

        Args:
            sample (`mindspore.tensor`):
                The noisy input tensor with the following shape `(batch, channel, height, width)`.
            timestep (`mindspore.tensor` or `float` or `int`): The number of timesteps to denoise an input.
            encoder_hidden_states (`mindspore.tensor`):
                The encoder hidden states with shape `(batch, sequence_length, feature_dim)`.
            encoder_attention_mask (`mindspore.tensor`):
                A cross-attention mask of shape `(batch, sequence_length)` is applied to `encoder_hidden_states`. If
                `True` the mask is kept, otherwise if `False` it is discarded. Mask will be converted into a bias,
                which adds large negative values to the attention scores corresponding to "discard" tokens.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] instead of a plain
                tuple.
            cross_attention_kwargs (`dict`, *optional*):
                A kwargs dictionary that if specified is passed along to the [`AttnProcessor`].
            encoder_hidden_states_1 (`mindspore.tensor`, *optional*):
                A second set of encoder hidden states with shape `(batch, sequence_length_2, feature_dim_2)`. Can be
                used to condition the model on a different set of embeddings to `encoder_hidden_states`.
            encoder_attention_mask_1 (`mindspore.tensor`, *optional*):
                A cross-attention mask of shape `(batch, sequence_length_2)` is applied to `encoder_hidden_states_1`.
                If `True` the mask is kept, otherwise if `False` it is discarded. Mask will be converted into a bias,
                which adds large negative values to the attention scores corresponding to "discard" tokens.

        Returns:
            [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] or `tuple`:
                If `return_dict` is True, an [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] is returned,
                otherwise a `tuple` is returned where the first element is the sample tensor.
        """
        # By default samples have to be AT least a multiple of the overall upsampling factor.
        # The overall upsampling factor is equal to 2 ** (# num of upsampling layers).
        # However, the upsampling interpolation output size can be forced to fit any upsampling size
        # on the fly if necessary.
        # default_overall_up_factor = 2**self.num_upsamplers

        # upsample size should be forwarded when sample is not a multiple of `default_overall_up_factor`
        upsample_size = None
        logger.info("Forward upsample size to force interpolation output size.")
        forward_upsample_size = True

        # ensure attention_mask is a bias, and give it a singleton query_tokens dimension
        # expects mask of shape:
        #   [batch, key_tokens]
        # adds singleton query_tokens dimension:
        #   [batch,                    1, key_tokens]
        # this helps to broadcast it as a bias over attention scores, which will be in one of the following shapes:
        #   [batch,  heads, query_tokens, key_tokens] (e.g. mint sdp attn)
        #   [batch * heads, query_tokens, key_tokens] (e.g. xformers or classic attn)
        if attention_mask is not None:
            # assume that mask is expressed as:
            #   (1 = keep,      0 = discard)
            # convert mask into a bias that can be added to attention scores:
            #       (keep = +0,     discard = -10000.0)
            attention_mask = (1 - attention_mask.to(sample.dtype)) * -10000.0
            attention_mask = attention_mask.unsqueeze(1)

        # convert encoder_attention_mask to a bias the same way we do for attention_mask
        if encoder_attention_mask is not None:
            encoder_attention_mask = (1 - encoder_attention_mask.to(sample.dtype)) * -10000.0
            encoder_attention_mask = encoder_attention_mask.unsqueeze(1)

        if encoder_attention_mask_1 is not None:
            encoder_attention_mask_1 = (1 - encoder_attention_mask_1.to(sample.dtype)) * -10000.0
            encoder_attention_mask_1 = encoder_attention_mask_1.unsqueeze(1)

        # 1. time
        timesteps = timestep
        if not mindspore.is_tensor(timesteps):
            # TODO: this requires sync between CPU and GPU. So try to pass timesteps as tensors if you can
            # This would be a good case for the `match` statement (Python 3.10+)
            if isinstance(timestep, float):
                dtype = mindspore.float64
            else:
                dtype = mindspore.int64
            timesteps = mindspore.tensor([timesteps], dtype=dtype)
        elif len(timesteps.shape) == 0:
            timesteps = timesteps[None]

        # broadcast to batch dimension in a way that's compatible with ONNX/Core ML
        timesteps = timesteps.broadcast_to((sample.shape[0],))

        t_emb = self.time_proj(timesteps)

        # `Timesteps` does not contain any weights and will always return f32 tensors
        # but time_embedding might actually be running in fp16. so we need to cast here.
        # there might be better ways to encapsulate this.
        t_emb = t_emb.to(dtype=sample.dtype)

        emb = self.time_embedding(t_emb, timestep_cond)
        aug_emb = None

        if self.class_embedding is not None:
            if class_labels is None:
                raise ValueError("class_labels should be provided when num_class_embeds > 0")

            if self.config.class_embed_type == "timestep":
                class_labels = self.time_proj(class_labels)

                # `Timesteps` does not contain any weights and will always return f32 tensors
                # there might be better ways to encapsulate this.
                class_labels = class_labels.to(dtype=sample.dtype)

            class_emb = self.class_embedding(class_labels).to(dtype=sample.dtype)

            if self.config.class_embeddings_concat:
                emb = mint.cat([emb, class_emb], dim=-1)
            else:
                emb = emb + class_emb

        emb = emb + aug_emb if aug_emb is not None else emb

        if self.time_embed_act is not None:
            emb = self.time_embed_act(emb)

        # 2. pre-process
        sample = self.conv_in(sample)

        # 3. down
        down_block_res_samples = (sample,)
        for downsample_block in self.down_blocks:
            if hasattr(downsample_block, "has_cross_attention") and downsample_block.has_cross_attention:
                sample, res_samples = downsample_block(
                    hidden_states=sample,
                    temb=emb,
                    encoder_hidden_states=encoder_hidden_states,
                    attention_mask=attention_mask,
                    cross_attention_kwargs=cross_attention_kwargs,
                    encoder_attention_mask=encoder_attention_mask,
                    encoder_hidden_states_1=encoder_hidden_states_1,
                    encoder_attention_mask_1=encoder_attention_mask_1,
                )
            else:
                sample, res_samples = downsample_block(hidden_states=sample, temb=emb)

            down_block_res_samples += res_samples

        # 4. mid
        if self.mid_block is not None:
            sample = self.mid_block(
                sample,
                emb,
                encoder_hidden_states=encoder_hidden_states,
                attention_mask=attention_mask,
                cross_attention_kwargs=cross_attention_kwargs,
                encoder_attention_mask=encoder_attention_mask,
                encoder_hidden_states_1=encoder_hidden_states_1,
                encoder_attention_mask_1=encoder_attention_mask_1,
            )

        # 5. up
        for i, upsample_block in enumerate(self.up_blocks):
            is_final_block = i == len(self.up_blocks) - 1

            res_samples = down_block_res_samples[-len(upsample_block.resnets) :]
            down_block_res_samples = down_block_res_samples[: -len(upsample_block.resnets)]

            # if we have not reached the final block and need to forward the
            # upsample size, we do it here
            if not is_final_block and forward_upsample_size:
                upsample_size = down_block_res_samples[-1].shape[2:]

            if hasattr(upsample_block, "has_cross_attention") and upsample_block.has_cross_attention:
                sample = upsample_block(
                    hidden_states=sample,
                    temb=emb,
                    res_hidden_states_tuple=res_samples,
                    encoder_hidden_states=encoder_hidden_states,
                    cross_attention_kwargs=cross_attention_kwargs,
                    upsample_size=upsample_size,
                    attention_mask=attention_mask,
                    encoder_attention_mask=encoder_attention_mask,
                    encoder_hidden_states_1=encoder_hidden_states_1,
                    encoder_attention_mask_1=encoder_attention_mask_1,
                )
            else:
                sample = upsample_block(
                    hidden_states=sample, temb=emb, res_hidden_states_tuple=res_samples, upsample_size=upsample_size
                )

        # 6. post-process
        if self.conv_norm_out:
            sample = self.conv_norm_out(sample)
            sample = self.conv_act(sample)
        sample = self.conv_out(sample)

        if not return_dict:
            return (sample,)

        return UNet2DConditionOutput(sample=sample)

mindone.diffusers.AudioLDM2UNet2DConditionModel.attn_processors: Dict[str, AttentionProcessor] property

RETURNS DESCRIPTION
Dict[str, AttentionProcessor]

dict of attention processors: A dictionary containing all attention processors used in the model with

Dict[str, AttentionProcessor]

indexed by its weight name.

mindone.diffusers.AudioLDM2UNet2DConditionModel.construct(sample, timestep, encoder_hidden_states, class_labels=None, timestep_cond=None, attention_mask=None, cross_attention_kwargs=None, encoder_attention_mask=None, return_dict=True, encoder_hidden_states_1=None, encoder_attention_mask_1=None)

The [AudioLDM2UNet2DConditionModel] forward method.

PARAMETER DESCRIPTION
sample

The noisy input tensor with the following shape (batch, channel, height, width).

TYPE: `mindspore.tensor`

timestep

The number of timesteps to denoise an input.

TYPE: `mindspore.tensor` or `float` or `int`

encoder_hidden_states

The encoder hidden states with shape (batch, sequence_length, feature_dim).

TYPE: `mindspore.tensor`

encoder_attention_mask

A cross-attention mask of shape (batch, sequence_length) is applied to encoder_hidden_states. If True the mask is kept, otherwise if False it is discarded. Mask will be converted into a bias, which adds large negative values to the attention scores corresponding to "discard" tokens.

TYPE: `mindspore.tensor` DEFAULT: None

return_dict

Whether or not to return a [~models.unets.unet_2d_condition.UNet2DConditionOutput] instead of a plain tuple.

TYPE: `bool`, *optional*, defaults to `True` DEFAULT: True

cross_attention_kwargs

A kwargs dictionary that if specified is passed along to the [AttnProcessor].

TYPE: `dict`, *optional* DEFAULT: None

encoder_hidden_states_1

A second set of encoder hidden states with shape (batch, sequence_length_2, feature_dim_2). Can be used to condition the model on a different set of embeddings to encoder_hidden_states.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

encoder_attention_mask_1

A cross-attention mask of shape (batch, sequence_length_2) is applied to encoder_hidden_states_1. If True the mask is kept, otherwise if False it is discarded. Mask will be converted into a bias, which adds large negative values to the attention scores corresponding to "discard" tokens.

TYPE: `mindspore.tensor`, *optional* DEFAULT: None

RETURNS DESCRIPTION
Union[UNet2DConditionOutput, Tuple]

[~models.unets.unet_2d_condition.UNet2DConditionOutput] or tuple: If return_dict is True, an [~models.unets.unet_2d_condition.UNet2DConditionOutput] is returned, otherwise a tuple is returned where the first element is the sample tensor.

Source code in mindone/diffusers/pipelines/audioldm2/modeling_audioldm2.py
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
def construct(
    self,
    sample: mindspore.tensor,
    timestep: Union[mindspore.tensor, float, int],
    encoder_hidden_states: mindspore.tensor,
    class_labels: Optional[mindspore.tensor] = None,
    timestep_cond: Optional[mindspore.tensor] = None,
    attention_mask: Optional[mindspore.tensor] = None,
    cross_attention_kwargs: Optional[Dict[str, Any]] = None,
    encoder_attention_mask: Optional[mindspore.tensor] = None,
    return_dict: bool = True,
    encoder_hidden_states_1: Optional[mindspore.tensor] = None,
    encoder_attention_mask_1: Optional[mindspore.tensor] = None,
) -> Union[UNet2DConditionOutput, Tuple]:
    r"""
    The [`AudioLDM2UNet2DConditionModel`] forward method.

    Args:
        sample (`mindspore.tensor`):
            The noisy input tensor with the following shape `(batch, channel, height, width)`.
        timestep (`mindspore.tensor` or `float` or `int`): The number of timesteps to denoise an input.
        encoder_hidden_states (`mindspore.tensor`):
            The encoder hidden states with shape `(batch, sequence_length, feature_dim)`.
        encoder_attention_mask (`mindspore.tensor`):
            A cross-attention mask of shape `(batch, sequence_length)` is applied to `encoder_hidden_states`. If
            `True` the mask is kept, otherwise if `False` it is discarded. Mask will be converted into a bias,
            which adds large negative values to the attention scores corresponding to "discard" tokens.
        return_dict (`bool`, *optional*, defaults to `True`):
            Whether or not to return a [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] instead of a plain
            tuple.
        cross_attention_kwargs (`dict`, *optional*):
            A kwargs dictionary that if specified is passed along to the [`AttnProcessor`].
        encoder_hidden_states_1 (`mindspore.tensor`, *optional*):
            A second set of encoder hidden states with shape `(batch, sequence_length_2, feature_dim_2)`. Can be
            used to condition the model on a different set of embeddings to `encoder_hidden_states`.
        encoder_attention_mask_1 (`mindspore.tensor`, *optional*):
            A cross-attention mask of shape `(batch, sequence_length_2)` is applied to `encoder_hidden_states_1`.
            If `True` the mask is kept, otherwise if `False` it is discarded. Mask will be converted into a bias,
            which adds large negative values to the attention scores corresponding to "discard" tokens.

    Returns:
        [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] or `tuple`:
            If `return_dict` is True, an [`~models.unets.unet_2d_condition.UNet2DConditionOutput`] is returned,
            otherwise a `tuple` is returned where the first element is the sample tensor.
    """
    # By default samples have to be AT least a multiple of the overall upsampling factor.
    # The overall upsampling factor is equal to 2 ** (# num of upsampling layers).
    # However, the upsampling interpolation output size can be forced to fit any upsampling size
    # on the fly if necessary.
    # default_overall_up_factor = 2**self.num_upsamplers

    # upsample size should be forwarded when sample is not a multiple of `default_overall_up_factor`
    upsample_size = None
    logger.info("Forward upsample size to force interpolation output size.")
    forward_upsample_size = True

    # ensure attention_mask is a bias, and give it a singleton query_tokens dimension
    # expects mask of shape:
    #   [batch, key_tokens]
    # adds singleton query_tokens dimension:
    #   [batch,                    1, key_tokens]
    # this helps to broadcast it as a bias over attention scores, which will be in one of the following shapes:
    #   [batch,  heads, query_tokens, key_tokens] (e.g. mint sdp attn)
    #   [batch * heads, query_tokens, key_tokens] (e.g. xformers or classic attn)
    if attention_mask is not None:
        # assume that mask is expressed as:
        #   (1 = keep,      0 = discard)
        # convert mask into a bias that can be added to attention scores:
        #       (keep = +0,     discard = -10000.0)
        attention_mask = (1 - attention_mask.to(sample.dtype)) * -10000.0
        attention_mask = attention_mask.unsqueeze(1)

    # convert encoder_attention_mask to a bias the same way we do for attention_mask
    if encoder_attention_mask is not None:
        encoder_attention_mask = (1 - encoder_attention_mask.to(sample.dtype)) * -10000.0
        encoder_attention_mask = encoder_attention_mask.unsqueeze(1)

    if encoder_attention_mask_1 is not None:
        encoder_attention_mask_1 = (1 - encoder_attention_mask_1.to(sample.dtype)) * -10000.0
        encoder_attention_mask_1 = encoder_attention_mask_1.unsqueeze(1)

    # 1. time
    timesteps = timestep
    if not mindspore.is_tensor(timesteps):
        # TODO: this requires sync between CPU and GPU. So try to pass timesteps as tensors if you can
        # This would be a good case for the `match` statement (Python 3.10+)
        if isinstance(timestep, float):
            dtype = mindspore.float64
        else:
            dtype = mindspore.int64
        timesteps = mindspore.tensor([timesteps], dtype=dtype)
    elif len(timesteps.shape) == 0:
        timesteps = timesteps[None]

    # broadcast to batch dimension in a way that's compatible with ONNX/Core ML
    timesteps = timesteps.broadcast_to((sample.shape[0],))

    t_emb = self.time_proj(timesteps)

    # `Timesteps` does not contain any weights and will always return f32 tensors
    # but time_embedding might actually be running in fp16. so we need to cast here.
    # there might be better ways to encapsulate this.
    t_emb = t_emb.to(dtype=sample.dtype)

    emb = self.time_embedding(t_emb, timestep_cond)
    aug_emb = None

    if self.class_embedding is not None:
        if class_labels is None:
            raise ValueError("class_labels should be provided when num_class_embeds > 0")

        if self.config.class_embed_type == "timestep":
            class_labels = self.time_proj(class_labels)

            # `Timesteps` does not contain any weights and will always return f32 tensors
            # there might be better ways to encapsulate this.
            class_labels = class_labels.to(dtype=sample.dtype)

        class_emb = self.class_embedding(class_labels).to(dtype=sample.dtype)

        if self.config.class_embeddings_concat:
            emb = mint.cat([emb, class_emb], dim=-1)
        else:
            emb = emb + class_emb

    emb = emb + aug_emb if aug_emb is not None else emb

    if self.time_embed_act is not None:
        emb = self.time_embed_act(emb)

    # 2. pre-process
    sample = self.conv_in(sample)

    # 3. down
    down_block_res_samples = (sample,)
    for downsample_block in self.down_blocks:
        if hasattr(downsample_block, "has_cross_attention") and downsample_block.has_cross_attention:
            sample, res_samples = downsample_block(
                hidden_states=sample,
                temb=emb,
                encoder_hidden_states=encoder_hidden_states,
                attention_mask=attention_mask,
                cross_attention_kwargs=cross_attention_kwargs,
                encoder_attention_mask=encoder_attention_mask,
                encoder_hidden_states_1=encoder_hidden_states_1,
                encoder_attention_mask_1=encoder_attention_mask_1,
            )
        else:
            sample, res_samples = downsample_block(hidden_states=sample, temb=emb)

        down_block_res_samples += res_samples

    # 4. mid
    if self.mid_block is not None:
        sample = self.mid_block(
            sample,
            emb,
            encoder_hidden_states=encoder_hidden_states,
            attention_mask=attention_mask,
            cross_attention_kwargs=cross_attention_kwargs,
            encoder_attention_mask=encoder_attention_mask,
            encoder_hidden_states_1=encoder_hidden_states_1,
            encoder_attention_mask_1=encoder_attention_mask_1,
        )

    # 5. up
    for i, upsample_block in enumerate(self.up_blocks):
        is_final_block = i == len(self.up_blocks) - 1

        res_samples = down_block_res_samples[-len(upsample_block.resnets) :]
        down_block_res_samples = down_block_res_samples[: -len(upsample_block.resnets)]

        # if we have not reached the final block and need to forward the
        # upsample size, we do it here
        if not is_final_block and forward_upsample_size:
            upsample_size = down_block_res_samples[-1].shape[2:]

        if hasattr(upsample_block, "has_cross_attention") and upsample_block.has_cross_attention:
            sample = upsample_block(
                hidden_states=sample,
                temb=emb,
                res_hidden_states_tuple=res_samples,
                encoder_hidden_states=encoder_hidden_states,
                cross_attention_kwargs=cross_attention_kwargs,
                upsample_size=upsample_size,
                attention_mask=attention_mask,
                encoder_attention_mask=encoder_attention_mask,
                encoder_hidden_states_1=encoder_hidden_states_1,
                encoder_attention_mask_1=encoder_attention_mask_1,
            )
        else:
            sample = upsample_block(
                hidden_states=sample, temb=emb, res_hidden_states_tuple=res_samples, upsample_size=upsample_size
            )

    # 6. post-process
    if self.conv_norm_out:
        sample = self.conv_norm_out(sample)
        sample = self.conv_act(sample)
    sample = self.conv_out(sample)

    if not return_dict:
        return (sample,)

    return UNet2DConditionOutput(sample=sample)

mindone.diffusers.AudioLDM2UNet2DConditionModel.set_attention_slice(slice_size)

Enable sliced attention computation.

When this option is enabled, the attention module splits the input tensor in slices to compute attention in several steps. This is useful for saving some memory in exchange for a small decrease in speed.

PARAMETER DESCRIPTION
slice_size

When "auto", input to the attention heads is halved, so attention is computed in two steps. If "max", maximum amount of memory is saved by running only one slice at a time. If a number is provided, uses as many slices as attention_head_dim // slice_size. In this case, attention_head_dim must be a multiple of slice_size.

TYPE: `str` or `int` or `list(int)`, *optional*, defaults to `"auto"`

Source code in mindone/diffusers/pipelines/audioldm2/modeling_audioldm2.py
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
def set_attention_slice(self, slice_size):
    r"""
    Enable sliced attention computation.

    When this option is enabled, the attention module splits the input tensor in slices to compute attention in
    several steps. This is useful for saving some memory in exchange for a small decrease in speed.

    Args:
        slice_size (`str` or `int` or `list(int)`, *optional*, defaults to `"auto"`):
            When `"auto"`, input to the attention heads is halved, so attention is computed in two steps. If
            `"max"`, maximum amount of memory is saved by running only one slice at a time. If a number is
            provided, uses as many slices as `attention_head_dim // slice_size`. In this case, `attention_head_dim`
            must be a multiple of `slice_size`.
    """
    sliceable_head_dims = []

    def fn_recursive_retrieve_sliceable_dims(module: nn.Cell):
        if hasattr(module, "set_attention_slice"):
            sliceable_head_dims.append(module.sliceable_head_dim)

        for child in module.children():
            fn_recursive_retrieve_sliceable_dims(child)

    # retrieve number of attention layers
    for module in self.children():
        fn_recursive_retrieve_sliceable_dims(module)

    num_sliceable_layers = len(sliceable_head_dims)

    if slice_size == "auto":
        # half the attention head size is usually a good trade-off between
        # speed and memory
        slice_size = [dim // 2 for dim in sliceable_head_dims]
    elif slice_size == "max":
        # make smallest slice possible
        slice_size = num_sliceable_layers * [1]

    slice_size = num_sliceable_layers * [slice_size] if not isinstance(slice_size, list) else slice_size

    if len(slice_size) != len(sliceable_head_dims):
        raise ValueError(
            f"You have provided {len(slice_size)}, but {self.config} has {len(sliceable_head_dims)} different"
            f" attention layers. Make sure to match `len(slice_size)` to be {len(sliceable_head_dims)}."
        )

    for i in range(len(slice_size)):
        size = slice_size[i]
        dim = sliceable_head_dims[i]
        if size is not None and size > dim:
            raise ValueError(f"size {size} has to be smaller or equal to {dim}.")

    # Recursively walk through all the children.
    # Any children which exposes the set_attention_slice method
    # gets the message
    def fn_recursive_set_attention_slice(module: nn.Cell, slice_size: List[int]):
        if hasattr(module, "set_attention_slice"):
            module.set_attention_slice(slice_size.pop())

        for child in module.children():
            fn_recursive_set_attention_slice(child, slice_size)

    reversed_slice_size = list(reversed(slice_size))
    for module in self.children():
        fn_recursive_set_attention_slice(module, reversed_slice_size)

mindone.diffusers.AudioLDM2UNet2DConditionModel.set_attn_processor(processor)

Sets the attention processor to use to compute attention.

PARAMETER DESCRIPTION
processor

The instantiated processor class or a dictionary of processor classes that will be set as the processor for all Attention layers.

If processor is a dict, the key needs to define the path to the corresponding cross attention processor. This is strongly recommended when setting trainable attention processors.

TYPE: `dict` of `AttentionProcessor` or only `AttentionProcessor`

Source code in mindone/diffusers/pipelines/audioldm2/modeling_audioldm2.py
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
def set_attn_processor(self, processor: Union[AttentionProcessor, Dict[str, AttentionProcessor]]):
    r"""
    Sets the attention processor to use to compute attention.

    Parameters:
        processor (`dict` of `AttentionProcessor` or only `AttentionProcessor`):
            The instantiated processor class or a dictionary of processor classes that will be set as the processor
            for **all** `Attention` layers.

            If `processor` is a dict, the key needs to define the path to the corresponding cross attention
            processor. This is strongly recommended when setting trainable attention processors.

    """
    count = len(self.attn_processors.keys())

    if isinstance(processor, dict) and len(processor) != count:
        raise ValueError(
            f"A dict of processors was passed, but the number of processors {len(processor)} does not match the"
            f" number of attention layers: {count}. Please make sure to pass {count} processor classes."
        )

    def fn_recursive_attn_processor(name: str, module: nn.Cell, processor):
        if hasattr(module, "set_processor"):
            if not isinstance(processor, dict):
                module.set_processor(processor)
            else:
                module.set_processor(processor.pop(f"{name}.processor"))

        for sub_name, child in module.named_children():
            fn_recursive_attn_processor(f"{name}.{sub_name}", child, processor)

    for name, module in self.named_children():
        fn_recursive_attn_processor(name, module, processor)

mindone.diffusers.AudioLDM2UNet2DConditionModel.set_default_attn_processor()

Disables custom attention processors and sets the default attention implementation.

Source code in mindone/diffusers/pipelines/audioldm2/modeling_audioldm2.py
604
605
606
607
608
609
610
611
612
613
614
615
616
617
def set_default_attn_processor(self):
    """
    Disables custom attention processors and sets the default attention implementation.
    """
    if all(proc.__class__ in ADDED_KV_ATTENTION_PROCESSORS for proc in self.attn_processors.values()):
        processor = AttnAddedKVProcessor()
    elif all(proc.__class__ in CROSS_ATTENTION_PROCESSORS for proc in self.attn_processors.values()):
        processor = AttnProcessor()
    else:
        raise ValueError(
            f"Cannot call `set_default_attn_processor` when attention processors are of type {next(iter(self.attn_processors.values()))}"
        )

    self.set_attn_processor(processor)

mindone.diffusers.pipelines.AudioPipelineOutput dataclass

Bases: BaseOutput

Output class for audio pipelines.

Source code in mindone/diffusers/pipelines/pipeline_utils.py
100
101
102
103
104
105
106
107
108
109
110
@dataclass
class AudioPipelineOutput(BaseOutput):
    """
    Output class for audio pipelines.

    Args:
        audios (`np.ndarray`)
            List of denoised audio samples of a NumPy array of shape `(batch_size, num_channels, sample_rate)`.
    """

    audios: np.ndarray