跳转到主要内容

支持模型

以下为 MiniMax 提供的语音模型及其特性说明。
模型特性
speech-2.5-hd-preview最新的 HD 模型,韵律表现出色,复刻相似度极佳
speech-2.5-turbo-preview最新的 Turbo 模型,支持 40 个语种
speech-02-hd拥有出色的韵律、稳定性和复刻相似度,音质表现突出
speech-02-turbo拥有出色的韵律和稳定性,小语种能力加强,性能表现出色
speech-01-hd拥有超高的复刻相似度,音质表现突出
speech-01-turbo在出色的生成效果基础上有更快的生成速度

支持语言

MiniMax 的语音合成模型具备卓越的跨语言能力,全面支持 40 种全球广泛使用的语言。我们致力于打破语言壁垒,构建真正意义上的全球通用人工智能模型。 目前支持的语言包含:
支持语种
1. 中文(Chinese)15. 土耳其语(Turkish)28. 马来语(Malay)
2. 粤语(Cantonese)16. 荷兰语(Dutch)29. 波斯语(Persian)
3. 英语(English)17. 乌克兰语(Ukrainian)30. 斯洛伐克语(Slovak)
4. 西班牙语(Spanish)18. 泰语(Thai)31. 瑞典语(Swedish)
5. 法语(French)19. 波兰语(Polish)32. 克罗地亚语(Croatian)
6. 俄语(Russian)20. 罗马尼亚语(Romanian)33. 菲律宾语(Filipino)
7. 德语(German)21. 希腊语(Greek)34. 匈牙利语(Hungarian)
8. 葡萄牙语(Portuguese)22. 捷克语(Czech)35. 挪威语(Norwegian)
9. 阿拉伯语(Arabic)23. 芬兰语(Finnish)36. 斯洛文尼亚语(Slovenian)
10. 意大利语(Italian)24. 印地语(Hindi)37. 加泰罗尼亚语(Catalan)
11. 日语(Japanese)25. 保加利亚语(Bulgarian)38. 尼诺斯克语(Nynorsk)
12. 韩语(Korean)26. 丹麦语(Danish)39. 泰米尔语(Tamil)
13. 印尼语(Indonesian)27. 希伯来语(Hebrew)40. 阿非利卡语(Afrikaans)
14. 越南语(Vietnamese)

流式请求示例

本指南指导,流式播放返回的音频文件,并保存完整音频文件。 ⚠️ 注意,为实时播放音频流,需要先安装 mpv 播放器。并且,需要先将密钥信息写入环境变量 MINIMAX_API_KEY 请求示例

import asyncio
import websockets
import json
import ssl
import subprocess
import os

model = "speech-2.5-hd-preview"
file_format = "mp3"

class StreamAudioPlayer:
def __init__(self):
self.mpv_process = None

    def start_mpv(self):
        """Start MPV player process"""
        try:
            mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
            self.mpv_process = subprocess.Popen(
                mpv_command,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            print("MPV player started")
            return True
        except FileNotFoundError:
            print("Error: mpv not found. Please install mpv")
            return False
        except Exception as e:
            print(f"Failed to start mpv: {e}")
            return False

    def play_audio_chunk(self, hex_audio):
        """Play audio chunk"""
        try:
            if self.mpv_process and self.mpv_process.stdin:
                audio_bytes = bytes.fromhex(hex_audio)
                self.mpv_process.stdin.write(audio_bytes)
                self.mpv_process.stdin.flush()
                return True
        except Exception as e:
            print(f"Play failed: {e}")
            return False
        return False

    def stop(self):
        """Stop player"""
        if self.mpv_process:
            if self.mpv_process.stdin and not self.mpv_process.stdin.closed:
                self.mpv_process.stdin.close()
            try:
                self.mpv_process.wait(timeout=20)
            except subprocess.TimeoutExpired:
                self.mpv_process.terminate()

async def establish_connection(api_key):
"""Establish WebSocket connection"""
url = "wss://api.minimaxi.com/ws/v1/t2a_v2"
headers = {"Authorization": f"Bearer {api_key}"}

    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    try:
        ws = await websockets.connect(url, additional_headers=headers, ssl=ssl_context)
        connected = json.loads(await ws.recv())
        if connected.get("event") == "connected_success":
            print("Connection successful")
            return ws
        return None
    except Exception as e:
        print(f"Connection failed: {e}")
        return None

async def start_task(websocket):
"""Send task start request"""
start_msg = {
"event": "task_start",
"model": model,
"voice_setting": {
"voice_id": "male-qn-qingse",
"speed": 1,
"vol": 1,
"pitch": 0,
"english_normalization": False
},
"audio_setting": {
"sample_rate": 32000,
"bitrate": 128000,
"format": file_format,
"channel": 1
}
}
await websocket.send(json.dumps(start_msg))
response = json.loads(await websocket.recv())
return response.get("event") == "task_started"

async def continue_task_with_stream_play(websocket, text, player):
"""Send continue request and stream play audio"""
await websocket.send(json.dumps({
"event": "task_continue",
"text": text
}))

    chunk_counter = 1
    total_audio_size = 0
    audio_data = b""

    while True:
        try:
            response = json.loads(await websocket.recv())

            if "data" in response and "audio" in response["data"]:
                audio = response["data"]["audio"]
                if audio:
                    print(f"Playing chunk #{chunk_counter}")
                    audio_bytes = bytes.fromhex(audio)
                    if player.play_audio_chunk(audio):
                        total_audio_size += len(audio_bytes)
                        audio_data += audio_bytes
                        chunk_counter += 1

            if response.get("is_final"):
                print(f"Audio synthesis completed: {chunk_counter-1} chunks")
                if player.mpv_process and player.mpv_process.stdin:
                    player.mpv_process.stdin.close()

                # Save audio to file
                with open(f"output.{file_format}", "wb") as f:
                    f.write(audio_data)
                print(f"Audio saved as output.{file_format}")

                estimated_duration = total_audio_size * 0.0625 / 1000
                wait_time = max(estimated_duration + 5, 10)
                return wait_time

        except Exception as e:
            print(f"Error: {e}")
            break

    return 10

async def close_connection(websocket):
"""Close connection"""
if websocket:
try:
await websocket.send(json.dumps({"event": "task_finish"}))
await websocket.close()
except Exception:
pass

async def main():
API_KEY = os.getenv("MINIMAX_API_KEY")
TEXT = "真正的危险不是计算机开始像人一样思考,而是人开始像计算机一样思考。计算机只是可以帮我们处理一些简单事务。"

    player = StreamAudioPlayer()

    try:
        if not player.start_mpv():
            return

        ws = await establish_connection(API_KEY)
        if not ws:
            return

        if not await start_task(ws):
            print("Task startup failed")
            return

        wait_time = await continue_task_with_stream_play(ws, TEXT, player)
        await asyncio.sleep(wait_time)

    except Exception as e:
        print(f"Error: {e}")
    finally:
        player.stop()
        if 'ws' in locals():
            await close_connection(ws)

if name == "__main__":
asyncio.run(main())
I