My new company is building an AI-related product, so I did some technical research into the tricky parts; this post is a code record of that work.
Streaming text chat
Notes:
Give the client a generous timeout: streaming responses can stay open for quite a while (a sketch of such a client follows below).
The "typing" animation of the text is actually driven by the server side; the client only needs to append each piece of text as it arrives and does not need to add any animation of its own.
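The clientForSteam used by postWithSteam below is not shown in this post; here is a minimal sketch of what it could look like, assuming OkHttp, with the long read timeout mentioned above (the concrete timeout values are placeholders):
import okhttp3.OkHttpClient
import java.util.concurrent.TimeUnit

// Streaming responses keep the connection open while the model generates text,
// so the read timeout is deliberately long (placeholder values).
val clientForSteam: OkHttpClient = OkHttpClient.Builder()
    .connectTimeout(30, TimeUnit.SECONDS)
    .readTimeout(5, TimeUnit.MINUTES)
    .writeTimeout(30, TimeUnit.SECONDS)
    .build()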
suspend fun postWithSteam(
path: String,
body: String,
headers: Map<String, String>? = null,
mediaType: String = "application/json; charset=utf-8",
): BufferedSource = withContext(Dispatchers.IO) {
val requestBody = body.toRequestBody(mediaType.toMediaType())
val requestBuilder = Request.Builder().url(fullUrl(path)).withToken()
headers?.forEach { (k, v) -> requestBuilder.addHeader(k, v) }
val request = requestBuilder.post(requestBody).build()
clientForSteam.newCall(request).await().let { response ->
if (!response.isSuccessful) throw IOException("Unexpected code $response")
response.body?.source() ?: throw IOException("Empty response body")
}
}
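The Call.await() used above is not part of OkHttp itself; it is assumed to come from a coroutine adapter library or a small hand-rolled extension along these lines:
import kotlinx.coroutines.suspendCancellableCoroutine
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Response
import java.io.IOException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Suspends until the call completes and cancels the OkHttp call if the coroutine is cancelled.
suspend fun Call.await(): Response = suspendCancellableCoroutine { cont ->
    enqueue(object : Callback {
        override fun onResponse(call: Call, response: Response) {
            cont.resume(response)
        }
        override fun onFailure(call: Call, e: IOException) {
            if (!cont.isCancelled) cont.resumeWithException(e)
        }
    })
    cont.invokeOnCancellation { cancel() }
}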
/**
 * Streaming chat API
 * @param onValueChange called with each parsed chunk of text data to append
 */
suspend fun chatStream(
question: String,
modelName: String,
conversationId: String = "None",
onValueChange: (ChatResp) -> Unit,
) = withContext(Dispatchers.IO) {
val chatRequest = ChatRequest.build(question, modelName, conversationId)
val body = chatRequest.toJsonString()
val source = postWithSteam(PATH_CHAT_STREAM, body)
runCatching {
source.use {
val buffer = StringBuilder()
while (!source.exhausted()) {
val chunk = source.readUtf8(88)
buffer.append(chunk)
// Scan for complete JSON fragments. (If the server delimited messages with \n this would be much simpler: just iterate line by line. Unfortunately ours doesn't...)
var start = buffer.indexOf("{")
var end = buffer.indexOf("}", start)
while (start != -1 && end != -1) {
val jsonStr = buffer.substring(start, end + 1)
withContext(Dispatchers.Main) {
// Deserialize the JSON fragment into the entity class
val data = jsonStr.fromJson<ChatResp>()
onValueChange(data)
}
buffer.delete(0, end + 1)
start = buffer.indexOf("{")
end = buffer.indexOf("}", start)
}
}
}
}.exceptionOrNull()?.also {
Log.d("ttt", "chatStream: 错误 ${it.message}", it)
}
}
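A rough usage sketch from a ViewModel, per the note above that the UI only needs to append text. The content field on ChatResp is a hypothetical name; substitute whatever your actual response model exposes, and call chatStream through whatever object it is declared on:
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch

class ChatViewModel : ViewModel() {
    private val _answer = MutableStateFlow("")
    val answer: StateFlow<String> = _answer // collected by the Compose UI

    fun ask(question: String) {
        viewModelScope.launch {
            _answer.value = ""
            // onValueChange is already invoked on the main dispatcher, so appending here is safe.
            chatStream(question = question, modelName = "your-model-name") { resp ->
                _answer.value += resp.content // `content` is a hypothetical field name
            }
        }
    }
}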
Real-time chat
Note: the code below is only an excerpt; copying it as-is will not run. You will need to adapt it yourself, and in practice you must call the corresponding model API to obtain the corresponding key.
Common utility wrappers
AiChatBase is a generic abstract wrapper, mainly meant to be called from the UI pages.
It exposes two states, isConnected and isRemoteAudioPlaying:
isConnected: the audio connection has been established successfully
isRemoteAudioPlaying: the remote (AI) audio is currently playing (mainly used to drive the UI)
The hold-to-talk / release logic works by enabling or disabling the audio track via the corresponding method: allowRecordVoice.
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
abstract class AiChatBase {
/**
* Whether the connection has been established (WebRTC or WebSocket)
*/
protected val _isConnected = MutableStateFlow(false)
val isConnected: StateFlow<Boolean> = _isConnected
/**
* Whether an error occurred during initialization
*/
protected val _isError = MutableStateFlow(false)
val isError: StateFlow<Boolean> = _isError
/**
* Whether the AI is speaking (i.e. whether remote audio is playing)
*/
protected val _isRemoteAudioPlaying = MutableStateFlow(false)
val isRemoteAudioPlaying: StateFlow<Boolean> = _isRemoteAudioPlaying
abstract suspend fun initModel(modelName: String = "", voice: String)
abstract fun allowRecordVoice(flag: Boolean)
/**
* Close the connection and release resources
*/
abstract fun release()
}
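A few app-level helpers are referenced throughout the classes below but are not shown in this post: myCo (an app-wide CoroutineScope), the logd() / loge() logging extensions, toast(), and MediaPlaylistManager. Minimal assumption-level sketches of the first few (the names match the post, the bodies are guesses):
import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob

// Assumed: an application-scoped CoroutineScope that outlives individual screens.
val myCo: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Main)

// Assumed: tiny String logging extensions matching the call sites "msg".logd() / "msg".loge(throwable).
fun String.logd(tag: String = "AiChat") { Log.d(tag, this) }
fun String.loge(tr: Throwable? = null, tag: String = "AiChat") { Log.e(tag, this, tr) }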
AudioTaskPlayer
(utility class that plays the AI's audio; the server actually returns Base64-encoded PCM data):
class AudioTaskPlayer(){
private val sampleRate = 24000
private val audioTrack by lazy {
val channelConfig = AudioFormat.CHANNEL_OUT_MONO
val audioFormat = AudioFormat.ENCODING_PCM_16BIT
val minBufferSize = AudioTrack.getMinBufferSize(sampleRate, channelConfig, audioFormat)
AudioTrack(
AudioManager.STREAM_MUSIC,
sampleRate,
channelConfig,
audioFormat,
minBufferSize,
AudioTrack.MODE_STREAM
)
}
private val channelCount = 1
private val bytesPerSample = 2 // 16bit = 2 bytes
private var totalDurationSec = 0.0
private var lastAppendDurationSec = 0.0
fun append(base64String:String){
val audioBytes: ByteArray = Base64.decode(base64String, Base64.NO_WRAP)
audioTrack.play()
audioTrack.write(audioBytes, 0, audioBytes.size)
// Compute the duration of this appended chunk
lastAppendDurationSec = audioBytes.size.toDouble() / (sampleRate * channelCount * bytesPerSample)
totalDurationSec += lastAppendDurationSec
Log.d("AudioTaskPlayer", "本次追加音频时长: ${lastAppendDurationSec}s, 总时长: ${totalDurationSec}s")
}
fun getLastAppendDurationSec(): Double {
return lastAppendDurationSec
}
fun getTotalDurationSec(): Double {
return totalDurationSec
}
/**
* Pause playback and flush the buffered data
*
*/
fun pauseAndFlush(){
audioTrack.pause()
audioTrack.flush()
}
fun release() {
audioTrack.release()
}
}
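As a quick sanity check of the duration math in append(): at 24 kHz, mono, 16-bit PCM, one second of audio is 24000 × 1 × 2 = 48 000 bytes, so a 12 000-byte chunk adds 12000 / 48000 = 0.25 s to totalDurationSec.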
ManualAudioRecorder
(utility class for recording audio):
import android.annotation.SuppressLint
import android.content.Context
import android.media.AudioFormat
import android.media.AudioRecord
import android.media.MediaRecorder
import android.util.Base64
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.io.ByteArrayOutputStream
/**
* Utility class for recording audio
*
*/
@SuppressLint("MissingPermission")
class ManualAudioRecorder() {
private val sampleRate = 16000
private val channelConfig = AudioFormat.CHANNEL_IN_MONO
private val audioFormat = AudioFormat.ENCODING_PCM_16BIT
private val minBufferSize = AudioRecord.getMinBufferSize(sampleRate, channelConfig, audioFormat)
private val audioRecord by lazy {
AudioRecord(
MediaRecorder.AudioSource.MIC,
sampleRate,
channelConfig,
audioFormat,
minBufferSize
)
}
private var recordingJob: Job? = null
private var outputStream: ByteArrayOutputStream? = null
fun startRecording() {
outputStream = ByteArrayOutputStream()
audioRecord?.startRecording()
recordingJob = CoroutineScope(Dispatchers.IO).launch {
val buffer = ByteArray(minBufferSize)
while (isActive && audioRecord?.recordingState == AudioRecord.RECORDSTATE_RECORDING) {
val read = audioRecord?.read(buffer, 0, buffer.size) ?: 0
if (read > 0) {
outputStream?.write(buffer, 0, read)
}
}
}
}
/**
* Stop recording and split the recorded data into 8 KB chunks so each fits in a single WebSocket frame
*
* @return list of Base64 strings (each item carries at most 8 KB of raw audio data)
*/
fun stopRecordingList(): List<String> {
recordingJob?.cancel()
audioRecord?.stop()
val audioBytes = outputStream?.toByteArray()
outputStream?.close()
outputStream = null
val chunkSize = 8 * 1024 // 8KB
val result = mutableListOf<String>()
if (audioBytes != null) {
var offset = 0
while (offset < audioBytes.size) {
val end = minOf(offset + chunkSize, audioBytes.size)
val chunk = audioBytes.copyOfRange(offset, end)
val base64Chunk = Base64.encodeToString(chunk, Base64.NO_WRAP)
result.add(base64Chunk)
offset = end
}
}
return result
}
/**
* Stop recording and return the recording as a Base64 string
*
* @return
*/
fun stopRecording(): String? {
recordingJob?.cancel()
audioRecord?.stop()
val audioBytes = outputStream?.toByteArray()
outputStream?.close()
outputStream = null
return if (audioBytes != null) {
Base64.encodeToString(audioBytes, Base64.NO_WRAP)
} else {
null
}
}
/**
* Release resources
*
*/
fun release() {
// stop() throws IllegalStateException if recording was never started, so guard it
runCatching { audioRecord.stop() }
audioRecord.release()
}
}
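For reference, at the recorder's 16 kHz mono 16-bit format one second of PCM is 16000 × 2 = 32 000 bytes, so each 8 KB chunk from stopRecordingList() holds roughly 8192 / 32000 ≈ 0.26 s of audio, and after Base64 encoding each item is about 8192 × 4 / 3 ≈ 11 KB of text, comfortably under the 262 144-character frame limit checked later in sendVoiceToServer().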
Real-time voice chat (OpenAI)
Under the hood this uses the WebRTC protocol.
Add the dependency:
// WebRTC https://github.com/GetStream/webrtc-android
implementation("io.getstream:stream-webrtc-android:1.3.8")
The AI wrapper utility class:
import android.media.AudioManager
import com.redteamobile.aicamera.api.MyApi
import com.redteamobile.aicamera.application
import com.redteamobile.aicamera.myCo
import com.redteamobile.aicamera.utils.logd
import com.redteamobile.aicamera.utils.loge
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.logging.HttpLoggingInterceptor
import org.webrtc.AudioTrack
import org.webrtc.DataChannel
import org.webrtc.IceCandidate
import org.webrtc.MediaConstraints
import org.webrtc.MediaStream
import org.webrtc.PeerConnection
import org.webrtc.PeerConnection.IceServer
import org.webrtc.PeerConnectionFactory
import org.webrtc.RtpReceiver
import org.webrtc.SdpObserver
import org.webrtc.SessionDescription
import org.webrtc.audio.JavaAudioDeviceModule
import java.util.concurrent.TimeUnit
/**
* OpenAI real-time conversation utility class
*/
class OpenAiChatUtil: AiChatBase() {
private var API_KEY = "YOUR_API_KEY"
private val BASE_URL = "https://api.openai.com/v1/realtime"
private val MODEL = "gpt-4o-realtime-preview-2024-12-17"
private val client by lazy {
OkHttpClient.Builder()
.addInterceptor(HttpLoggingInterceptor().apply {
level = HttpLoggingInterceptor.Level.BODY
})
.addNetworkInterceptor { chain ->
val oriRequest = chain.request()
val newRequest =
oriRequest.newBuilder().header("Content-Type", "application/sdp").build()
chain.proceed(newRequest)
}
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build()
}
private var audioManager: AudioManager? = null
// Common audio constraint configuration
private fun createAudioConstraints() = MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("googEchoCancellation", "true"))
mandatory.add(MediaConstraints.KeyValuePair("googNoiseSuppression", "true"))
mandatory.add(MediaConstraints.KeyValuePair("googHighpassFilter", "true"))
}
suspend fun sendOffer(sdp: SessionDescription) = withContext(Dispatchers.IO) {
val requestBody = sdp.description.toRequestBody("application/sdp".toMediaType())
val request = Request.Builder()
.url("${BASE_URL}?model=${MODEL}")
.post(requestBody)
.addHeader("Authorization", "Bearer ${API_KEY}")
.addHeader("Content-Type", "application/sdp")
.build()
client.newCall(request).execute()
}
private var remoteAudioTrack: AudioTrack? = null
// Parameters for detecting whether remote audio is playing
private var lastAudioTimestamp = 0L
private val SILENCE_THRESHOLD = 500 // amplitude threshold
private val AUDIO_TIMEOUT = 100L // treat more than 100 ms of silence as playback stopped
private var peerConnection: PeerConnection? = null
private lateinit var audioTrack: AudioTrack
private lateinit var factory: PeerConnectionFactory
init {
runCatching {
// Route audio to the speakerphone
audioManager = application.getSystemService(AudioManager::class.java)
audioManager?.apply {
mode = AudioManager.MODE_IN_COMMUNICATION
isSpeakerphoneOn = true
}
// Initialize the PeerConnectionFactory
val options = PeerConnectionFactory.InitializationOptions.builder(application)
.setEnableInternalTracer(true)
.createInitializationOptions()
PeerConnectionFactory.initialize(options)
// Create the audio device module
val audioDeviceModule = JavaAudioDeviceModule.builder(application)
.setUseHardwareAcousticEchoCanceler(true)
.setUseHardwareNoiseSuppressor(true)
.createAudioDeviceModule()
factory = PeerConnectionFactory.builder()
.setAudioDeviceModule(audioDeviceModule)
.createPeerConnectionFactory()
// Create the PeerConnection configuration
val iceServers = listOf(
IceServer.builder("stun:stun.l.google.com:19302").createIceServer()
)
val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
bundlePolicy = PeerConnection.BundlePolicy.MAXBUNDLE
rtcpMuxPolicy = PeerConnection.RtcpMuxPolicy.REQUIRE
}
val audioSource = factory.createAudioSource(createAudioConstraints())
audioTrack = factory.createAudioTrack("audio0", audioSource).apply {
// Disable the audio track initially; hold-to-talk enables it via allowRecordVoice()
setEnabled(false)
}
// Create the PeerConnection and attach the Observer
peerConnection = factory.createPeerConnection(rtcConfig, object : PeerConnection.Observer {
override fun onConnectionChange(newState: PeerConnection.PeerConnectionState) {
// Update the current connection state
_isConnected.update { newState == PeerConnection.PeerConnectionState.CONNECTED }
when (newState) {
PeerConnection.PeerConnectionState.NEW -> "WebRTC state: NEW - initial state".logd()
PeerConnection.PeerConnectionState.CONNECTING -> "WebRTC state: CONNECTING - establishing connection".logd()
PeerConnection.PeerConnectionState.CONNECTED -> {
"WebRTC state: CONNECTED - connected, audio transmission can begin".logd()
_isConnected.update { true }
_isError.update { false }
}
PeerConnection.PeerConnectionState.DISCONNECTED -> {
"WebRTC state: DISCONNECTED - connection dropped, possibly a network issue".logd()
_isConnected.update { false }
}
PeerConnection.PeerConnectionState.FAILED -> {
"WebRTC state: FAILED - connection failed, needs to be re-established".logd()
_isConnected.update { false }
}
PeerConnection.PeerConnectionState.CLOSED -> {
"WebRTC state: CLOSED - connection closed".logd()
_isConnected.update { false }
}
}
}
override fun onSignalingChange(p0: PeerConnection.SignalingState?) {
"信令状态改变: $p0".logd()
}
override fun onIceConnectionChange(p0: PeerConnection.IceConnectionState?) {
"ICE连接状态改变: $p0".logd()
}
override fun onIceConnectionReceivingChange(p0: Boolean) {
"ICE接收状态改变: $p0".logd()
}
override fun onIceGatheringChange(p0: PeerConnection.IceGatheringState?) {
"ICE收集状态改变: $p0".logd()
}
override fun onIceCandidate(p0: IceCandidate?) {}
override fun onIceCandidatesRemoved(p0: Array<out IceCandidate>?) {}
override fun onAddStream(p0: MediaStream?) {
"收到远程音频流".logd()
}
override fun onRemoveStream(p0: MediaStream?) {
"远程音频流移除".logd()
}
override fun onDataChannel(p0: DataChannel?) {
"数据通道创建: ${p0?.label()}".logd()
}
override fun onRenegotiationNeeded() {
"需要重新协商".logd()
}
override fun onAddTrack(p0: RtpReceiver?, p1: Array<out MediaStream>?) {
"收到远程Track".logd()
p0?.track()?.let { track ->
if (track.kind() == "audio") {
remoteAudioTrack = track as AudioTrack
remoteAudioTrack?.addSink { audioData, channelCount, sampleRate, samplesPerChannel, bitsPerSample, timestamp ->
if (audioData == null || !audioData.hasRemaining()) return@addSink
// Compute the average amplitude of the audio data
var sum = 0.0
val buffer = ShortArray(audioData.remaining() / 2)
audioData.asShortBuffer().get(buffer)
for (sample in buffer) {
sum += Math.abs(sample.toDouble())
}
val averageAmplitude = sum / buffer.size
val currentTime = System.currentTimeMillis()
val hasSound = averageAmplitude > SILENCE_THRESHOLD
if (hasSound) {
lastAudioTimestamp = currentTime
} else if (currentTime - lastAudioTimestamp > AUDIO_TIMEOUT) {
myCo.launch(Dispatchers.Main) {
_isRemoteAudioPlaying.update { false }
}
return@addSink
}
myCo.launch(Dispatchers.Main) {
_isRemoteAudioPlaying.update { hasSound }
}
}
}
}
}
})
// Use addTrack instead of addStream
peerConnection?.addTrack(audioTrack)
}.onFailure {
"初始化WebRTC失败: ${it.message}".loge(it)
_isConnected.update { false }
_isError.update { true }
}
}
override suspend fun initModel(modelName: String, voice: String) = withContext(Dispatchers.IO) {
val mModel = modelName.takeIf { it.isNotBlank() } ?: MODEL
val mVoice = voice.takeIf { it.isNotBlank() } ?: "shimmer"
// 1. Obtain the key (ephemeral session secret)
val resp = MyApi.createRealtimeSession(mModel, mVoice)
API_KEY = resp.client_secret.value
// 2. Start establishing the connection:
// create and send the offer
createAndSendOffer()
}
// Creates and sends the offer
private fun createAndSendOffer() {
try {
"Creating offer".logd()
val offerConstraints = MediaConstraints().apply {
mandatory.add(MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true"))
}
peerConnection?.createOffer(object : SdpObserver {
override fun onCreateSuccess(sdp: SessionDescription) {
"offer创建成功".logd()
peerConnection?.setLocalDescription(object : SdpObserver {
override fun onSetSuccess() {
"本地SDP设置成功,准备发送到服务器".logd()
sdp.description?.let { offerSdp ->
myCo.launch {
try {
val response = sendOffer(sdp)
if (response.isSuccessful) {
val answerSdp = response.body?.string()
// "收到服务器answer响应: $answerSdp".logd()
answerSdp?.let { sdpStr ->
val answer = SessionDescription(
SessionDescription.Type.ANSWER,
sdpStr
)
peerConnection?.setRemoteDescription(object :
SdpObserver {
override fun onSetSuccess() {
"远程SDP设置成功".logd()
}
override fun onSetFailure(error: String?) {
"设置远程SDP失败: $error".loge()
_isConnected.update { false }
_isError.update { true }
}
override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, answer)
}
} else {
"服务器响应错误: ${response.code}".loge()
_isError.update { true }
_isConnected.update { false }
}
} catch (e: Exception) {
"网络请求失败: ${e.message}".loge(e)
_isConnected.update { false }
_isError.update { true }
}
}
}
}
override fun onSetFailure(error: String?) {
"设置本地SDP失败: $error".loge()
_isConnected.update { false }
_isError.update { true }
}
override fun onCreateSuccess(p0: SessionDescription?) {}
override fun onCreateFailure(p0: String?) {}
}, sdp)
}
override fun onCreateFailure(error: String?) {
"创建offer失败: $error".loge()
_isConnected.update { false }
_isError.update { true }
}
override fun onSetSuccess() {}
override fun onSetFailure(p0: String?) {}
}, offerConstraints)
} catch (e: Exception) {
"创建offer过程发生异常: ${e.message}".loge(e)
_isConnected.update { false }
_isError.update { true }
}
}
/**
* Whether to allow recording (enables or disables the local audio track)
*
* @param flag
*/
override fun allowRecordVoice(flag: Boolean){
audioTrack.setEnabled(flag)
}
override fun release() {
runCatching {
// Restore audio settings
audioManager?.apply {
mode = AudioManager.MODE_NORMAL
isSpeakerphoneOn = false
}
audioManager = null
peerConnection?.close()
peerConnection = null
_isConnected.update { false }
remoteAudioTrack?.dispose()
remoteAudioTrack = null
_isRemoteAudioPlaying.update { false }
}
}
/**
* Switch the audio output device
* @param useSpeaker true for speakerphone, false for earpiece
*/
fun switchAudioOutput(useSpeaker: Boolean) {
audioManager?.isSpeakerphoneOn = useSpeaker
}
/**
* Whether the speakerphone is currently in use
* @return true if the speakerphone is in use, false if the earpiece is
*/
fun isSpeakerphoneOn(): Boolean {
return audioManager?.isSpeakerphoneOn ?: false
}
}
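MyApi.createRealtimeSession(model, voice) used in initModel is the app's own backend call and is not shown here; the client code only relies on resp.client_secret.value as the ephemeral key. A hypothetical sketch of the response shape it is assumed to return:
// Hypothetical response model for MyApi.createRealtimeSession();
// only client_secret.value is actually read by the code in this post.
data class RealtimeSessionResp(
    val client_secret: ClientSecret
)

data class ClientSecret(
    val value: String // the ephemeral key used as the Bearer token / API key
)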
Real-time voice chat (Alibaba Bailian)
Note: depends on OkHttp.
AliChat (integration with the Alibaba Bailian models):
/**
*
* Alibaba Bailian large model (WebSocket protocol)
*
* Docs: [https://help.aliyun.com/zh/model-studio/realtime?spm=a2c4g.11186623.0.0.753a2c71zI1ocS#729529f3da40n](https://help.aliyun.com/zh/model-studio/realtime?spm=a2c4g.11186623.0.0.753a2c71zI1ocS#729529f3da40n)
*
* The server returns the audio as multiple Base64-encoded segments; the client has to manage playback itself
*
* AI real-time voice conversation utility class built on WebSocket (wss)
* Assumes the audio format is PCM, 16 kHz, mono, 16-bit
*/
@SuppressLint("MissingPermission")
class AliChat() : AiChatBase(){
private var apiKey = "YOUR_API_KEY"
private val wsUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime"
private val MODEL = "qwen-omni-turbo-realtime"
private val client by lazy {
OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build()
}
private var webSocket: WebSocket? = null
// Whether a response is currently in progress (avoids mixing audio from different conversation turns)
var hasStratResp = false
// Job that periodically updates the playing state
var updatePlayingStateJob: Job? = null
// Recorder utility
val recordUtil by lazy {
ManualAudioRecorder()
}
// Audio playback utility
val audioTaskPlayer by lazy {
AudioTaskPlayer()
}
init {
// isRemoteAudioPlaying: the total playback duration is accumulated from each appended audio chunk; this coroutine compares elapsed time against it to decide whether the AI is still speaking
updatePlayingStateJob = myCo.launch {
var time = 0L
while (true){
if(isRemoteAudioPlaying.value.not()){
time = 0L
delay(1000)
}else{
if (time==0L){
time = System.currentTimeMillis()
}
val cTime = System.currentTimeMillis()
if((cTime-time)>=audioTaskPlayer.getTotalDurationSec()*1000){
// playback is considered finished
_isRemoteAudioPlaying.value = false
}else{
delay(1000)
}
}
}
}
}
override suspend fun initModel(modelName: String, voice: String) = withContext(Dispatchers.IO) {
val mModelName = modelName.takeIf { it.isNotBlank() } ?: MODEL
// TODO: the voice parameter is not actually passed through yet
val mVoice = "Chelsie"
// Supported voices:
// the commercial models accept ["Cherry", "Serena", "Ethan", "Chelsie"], the open-source models accept ["Ethan", "Chelsie"]
val resp = MyApi.createRealtimeSession(mModelName, mVoice)
apiKey = resp.client_secret.value
connect()
}
/**
* Establish the WebSocket connection
*/
private fun connect() {
val request = Request.Builder()
.url("$wsUrl?model=$MODEL")
.addHeader("Authorization", "Bearer $apiKey")
.build()
webSocket = client.newWebSocket(request, object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
_isConnected.value = true
_isError.value = false
Log.d("AiChatUtilNew", "WebSocket 连接成功")
}
override fun onMessage(webSocket: WebSocket, text: String) {
// Optional: handle text messages
Log.d("AiChatUtilNew", "Received text message: $text")
val jsonObject = JSONObject(text)
val type = jsonObject.getString("type")
when (type.lowercase()) {
"session.created" -> { // once the session is created, turn off automatic voice activity detection
val str = """
{
"event_id": "event_${System.currentTimeMillis()}",
"type": "session.update",
"session": {
"turn_detection": null
}
}
""".trimIndent()
webSocket.send(str)
}
"response.created"->{
hasStratResp = true
}
"response.audio.delta"->{//获取到音频数据,追加到播放列表中并触发播放
val base64 = jsonObject.getString("delta")
if (hasStratResp){
playAudio(base64)
}
}
"response.done"->{
hasStratResp = false
}
}
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
_isConnected.value = false
Log.d("AiChatUtilNew", "WebSocket 正在关闭: $reason")
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
_isConnected.value = false
Log.d("AiChatUtilNew", "WebSocket 已关闭: $reason")
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
_isConnected.value = false
_isError.value = true
Log.e("AiChatUtilNew", "WebSocket 连接失败: ${t.message}")
}
})
}
/**
* Start the voice conversation (capture microphone audio and send it)
*/
override fun allowRecordVoice(flag: Boolean){
if (flag){
// reset the response-in-progress flag
hasStratResp = false
// stop the AI's audio playback first, then start recording
MediaPlaylistManager.getInstance().stop()
MediaPlaylistManager.getInstance().clearPlaylist()
startVoiceChat()
}else{
stopVoiceChat()
}
}
@SuppressLint("MissingPermission")
private fun startVoiceChat() {
if (!_isConnected.value) return
audioTaskPlayer.pauseAndFlush()
recordUtil.startRecording()
}
/**
* Stop the voice conversation (stop capturing and send the audio)
*/
private fun stopVoiceChat() {
val tempVoiceBase64 = recordUtil.stopRecordingList()
sendVoiceToServer(tempVoiceBase64)
}
private fun sendVoiceToServer(voiceBase64List:List<String>){
// append to the input audio buffer
voiceBase64List.forEach {voiceBase64->
val str ="""
{
"event_id": "event_cc${System.currentTimeMillis()}",
"type": "input_audio_buffer.append",
"audio": "${voiceBase64}"
}
""".trimIndent()
// WebSocket single-frame size limit
if (str.length>=262144){
toast("Sorry,somethings is wrong.Please retry.")
return
}
webSocket?.send(str).also { Log.d("AiChatUtilNew", "sendVoiceToServer: appended audio chunk to buffer") }
}
// commit the buffered audio
val s = """
{
"event_id": "event_jk${System.currentTimeMillis()}",
"type": "input_audio_buffer.commit"
}
""".trimIndent()
webSocket?.send(s).also { Log.d("AiChatUtilNew", "sendVoiceToServer: committed audio data") }
val s1 = """
{
"type": "response.create",
"response": {
"modalities": ["text", "audio"]
}
}
""".trimIndent()
webSocket?.send(s1).also { Log.d("AiChatUtilNew", "sendVoiceToServer: requested a server response") }
}
/**
* Play the audio returned by the AI
*/
private fun playAudio(audioData: String) {
// handling of the first chunk is already done inside append()
audioTaskPlayer.append(audioData)
_isRemoteAudioPlaying.value = true
}
/**
* Close the connection and release resources
*/
override fun release() {
// release the recorder
recordUtil.release()
// clear the audio player's playback queue
MediaPlaylistManager.Companion.getInstance().clearPlaylist()
audioTaskPlayer.pauseAndFlush()
audioTaskPlayer.release()
updatePlayingStateJob?.cancel()
webSocket?.close(1000, "closed by the user")
webSocket = null
_isConnected.value = false
}
}
Real-time chat (Zhipu AI)
import android.annotation.SuppressLint
import android.util.Log
import com.blankj.utilcode.util.PathUtils
import com.redteamobile.aicamera.api.MyApi
import com.redteamobile.aicamera.myCo
import com.redteamobile.aicamera.utils.ManualAudioRecorder
import com.redteamobile.aicamera.utils.MediaPlaylistManager
import com.redteamobile.aicamera.utils.toast
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import org.json.JSONObject
import java.io.File
import java.util.concurrent.TimeUnit
/**
* Zhipu AI real-time conversation (WebSocket protocol)
*
* Integration gotcha: the recording we produce is raw PCM, not WAV
* Docs: [https://bigmodel.cn/dev/api/rtav/GLM-Realtime](https://bigmodel.cn/dev/api/rtav/GLM-Realtime)
*
*/
class ZhipuAiChat: AiChatBase(){
private var apiKey = "YOUR_API_KEY"
private val wsUrl = "wss://open.bigmodel.cn/api/paas/v4/realtime"
private val MODEL = "glm-realtime-flash"
private val client by lazy {
OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build()
}
private var webSocket: WebSocket? = null
// Whether a response is currently in progress (avoids mixing audio from different conversation turns)
var hasStratResp = false
// Job that periodically updates the playing state
var updatePlayingStateJob: Job? = null
// Recorder utility
val recordUtil by lazy {
ManualAudioRecorder()
}
// Audio playback utility
val audioTaskPlayer by lazy {
AudioTaskPlayer()
}
init {
// isRemoteAudioPlaying: the total playback duration is accumulated from each appended audio chunk; this coroutine compares elapsed time against it to decide whether the AI is still speaking
updatePlayingStateJob = myCo.launch {
var time = 0L
while (true){
if(isRemoteAudioPlaying.value.not()){
time = 0L
delay(1000)
}else{
if (time==0L){
time = System.currentTimeMillis()
}
val cTime = System.currentTimeMillis()
if((cTime-time)>=audioTaskPlayer.getTotalDurationSec()*1000){
// playback is considered finished
_isRemoteAudioPlaying.value = false
}else{
delay(1000)
}
}
}
}
}
override suspend fun initModel(modelName: String, voice: String) = withContext(Dispatchers.IO) {
val mModelName = modelName.takeIf { it.isNotBlank() } ?: MODEL
// Female: tongtong
// Male: xiaochen
// Default female: tongtong
// Sweet female: female-tianmei
// Young college student (male): male-qn-daxuesheng
// Elite young man: male-qn-jingying
// Cute little girl: lovely_girl
// Young girl: female-shaonv
// TODO: the voice parameter is not actually passed through yet
// The voice is actually changed via "voice":"female-shaonv" in the session.update below
val mVoice = "female-shaonv"
val resp = MyApi.createRealtimeSession(mModelName, mVoice)
apiKey = resp.client_secret.value
connect()
}
/**
* Establish the WebSocket connection
*/
private fun connect() {
val request = Request.Builder()
.url("$wsUrl?model=$MODEL")
.addHeader("Authorization", "Bearer $apiKey")
.build()
webSocket = client.newWebSocket(request, object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
_isConnected.value = true
_isError.value = false
Log.d("ZhipuAiChat", "WebSocket 连接成功")
}
override fun onMessage(webSocket: WebSocket, text: String) {
// Optional: handle text messages
Log.d("ZhipuAiChat", "Received text message: $text")
val jsonObject = JSONObject(text)
val type = jsonObject.getString("type")
when (type.lowercase()) {
"session.created" -> { // once the session is created, disable server-side voice detection (use client_vad) and configure the audio formats
val str = """
{
"event_id": "event_${System.currentTimeMillis()}",
"type": "session.update",
"session": {
"voice":"female-shaonv",
"input_audio_format": "pcm",
"output_audio_format": "pcm",
"turn_detection": {
"type": "client_vad"
},
"beta_fields": {
"chat_mode": "audio"
}
}
}
""".trimIndent()
webSocket.send(str)
}
"response.created"->{
hasStratResp = true
}
"response.audio.delta"->{//获取到音频数据,追加到播放列表中并触发播放
val base64 = jsonObject.getString("delta")
if (hasStratResp){
playAudio(base64)
}
}
"response.done"->{
hasStratResp = false
}
}
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
_isConnected.value = false
Log.d("ZhipuAiChat", "WebSocket 正在关闭: $reason")
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
_isConnected.value = false
Log.d("ZhipuAiChat", "WebSocket 已关闭: $reason")
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
_isConnected.value = false
_isError.value = true
Log.e("ZhipuAiChat", "WebSocket 连接失败: ${t.message}")
}
})
}
/**
* Start the voice conversation (capture microphone audio and send it)
*/
override fun allowRecordVoice(flag: Boolean){
if (flag){
// reset the response-in-progress flag
hasStratResp = false
// stop the AI's audio playback first, then start recording
MediaPlaylistManager.Companion.getInstance().stop()
MediaPlaylistManager.Companion.getInstance().clearPlaylist()
startVoiceChat()
}else{
stopVoiceChat()
}
}
@SuppressLint("MissingPermission")
private fun startVoiceChat() {
if (!_isConnected.value) return
audioTaskPlayer.pauseAndFlush()
recordUtil.startRecording()
}
/**
* Stop the voice conversation (stop capturing and send the audio)
*/
private fun stopVoiceChat() {
val tempVoiceBase64 = recordUtil.stopRecordingList()
sendVoiceToServer(tempVoiceBase64)
}
private fun sendVoiceToServer(voiceBase64List:List<String>){
voiceBase64List.forEach {voiceBase64->
// append to the input audio buffer
val str ="""
{
"type": "input_audio_buffer.append",
"audio": "${voiceBase64}",
"client_timestamp":${System.currentTimeMillis()}
}
""".trimIndent()
// WebSocket single-frame size limit
if (str.length>=262144){
toast("Sorry,somethings is wrong.Please retry.")
return
}
webSocket?.send(str).also { Log.d("ZhipuAiChat", "sendVoiceToServer: appended audio chunk to buffer") }
}
// commit the buffered audio
val s = """
{
"client_timestamp": ${System.currentTimeMillis()}",
"type": "input_audio_buffer.commit"
}
""".trimIndent()
webSocket?.send(s).also { Log.d("ZhipuAiChat", "sendVoiceToServer: committed audio data") }
val s1 = """
{
"type": "response.create"
}
""".trimIndent()
webSocket?.send(s1).also { Log.d("ZhipuAiChat", "sendVoiceToServer: requested a server response") }
}
/**
* Play the audio returned by the AI
*/
private fun playAudio(audioData: String) {
// handling of the first chunk is already done inside append()
audioTaskPlayer.append(audioData)
_isRemoteAudioPlaying.value = true
}
/**
* Close the connection and release resources
*/
override fun release() {
// release the recorder
recordUtil.release()
// clear the audio player's playback queue
MediaPlaylistManager.Companion.getInstance().clearPlaylist()
audioTaskPlayer.pauseAndFlush()
audioTaskPlayer.release()
updatePlayingStateJob?.cancel()
webSocket?.close(1000, "closed by the user")
webSocket = null
_isConnected.value = false
}
}
The real-time chat page
Effect:
Roughly: hold the button to talk, release it to trigger the AI's answer; while the AI is speaking, an animated "speaking" bubble is shown at its top-right corner.
Source code:
import android.Manifest
import android.content.pm.PackageManager
import androidx.activity.compose.rememberLauncherForActivityResult
import androidx.activity.result.contract.ActivityResultContracts
import androidx.compose.foundation.Image
import androidx.compose.foundation.background
import androidx.compose.foundation.gestures.detectTapGestures
import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Box
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.height
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.layout.size
import androidx.compose.foundation.shape.RoundedCornerShape
import androidx.compose.material3.Text
import androidx.compose.runtime.Composable
import androidx.compose.runtime.DisposableEffect
import androidx.compose.runtime.LaunchedEffect
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.produceState
import androidx.compose.runtime.remember
import androidx.compose.runtime.rememberCoroutineScope
import androidx.compose.runtime.setValue
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.draw.clip
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.input.pointer.pointerInput
import androidx.compose.ui.layout.ContentScale
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.res.painterResource
import androidx.compose.ui.unit.dp
import androidx.compose.ui.unit.sp
import androidx.constraintlayout.compose.ConstraintLayout
import androidx.constraintlayout.compose.Dimension
import androidx.core.content.ContextCompat
import com.redtea.hybott.ui.space.MySpaceRight
import com.redteamobile.aicamera.R
import com.redteamobile.aicamera.pages.LocalNavController
import com.redteamobile.aicamera.pages.Pages
import com.redteamobile.aicamera.pages.customTextStyleExtraBold
import com.redteamobile.aicamera.ui.controls.BackIcon
import com.redteamobile.aicamera.ui.controls.GifImage
import com.redteamobile.aicamera.ui.controls.dialog.LoadingDialog
import com.redteamobile.aicamera.ui.controls.dialog.rememberApiErrorDialog
import com.redteamobile.aicamera.ui.theme.AccentColor
import com.redteamobile.aicamera.interfaces.AliChat
import com.redteamobile.aicamera.interfaces.ZhipuAiChat
import com.redteamobile.aicamera.utils.JsonCacheUtil
import com.redteamobile.aicamera.utils.logd
import com.redteamobile.aicamera.utils.loge
import com.redteamobile.aicamera.utils.toast
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
@Composable
fun ChatPage(route: Pages.Chat) {
val context = LocalContext.current
// Null check on the role data (it should normally never be null!)
val id = route.roleId
val item = roleInfoList.find { it.id == id }
if (item == null) {
return
}
// Permission state
var hasAudioPermission by remember { mutableStateOf(false) }
var isInit by remember { mutableStateOf(false) }
// Permission request launcher
val permissionLauncher = rememberLauncherForActivityResult(
ActivityResultContracts.RequestPermission()
) { isGranted: Boolean ->
hasAudioPermission = isGranted
}
// Check the permission
LaunchedEffect(Unit) {
when (PackageManager.PERMISSION_GRANTED) {
ContextCompat.checkSelfPermission(
context,
Manifest.permission.RECORD_AUDIO
),
-> {
hasAudioPermission = true
}
else -> {
permissionLauncher.launch(Manifest.permission.RECORD_AUDIO)
}
}
}
// val aiChatUtil = remember { AiChatUtil() }
// val aiChatUtil = remember { AliChat() }
val aiChatUtil = remember { ZhipuAiChat() }
// whether the AI chat is connected
val isConnected by aiChatUtil.isConnected.collectAsState()
// whether the button is currently pressed (i.e. recording)
var isRecording by remember { mutableStateOf(false) }
// whether the AI is speaking
val isSpeaking by aiChatUtil.isRemoteAudioPlaying.collectAsState()
// Function that stops recording
fun stopRecording() {
try {
"停止录音".logd()
// 禁用音频轨道
aiChatUtil.allowRecordVoice(false)
isRecording = false
} catch (e: Exception) {
"停止录音失败: ${e.message}".logd()
}
}
// Function that starts recording
fun startRecording() {
if (!hasAudioPermission) {
"需要录音权限:".loge()
return
}
try {
"开始录音".logd()
// 启用音频轨道
aiChatUtil.allowRecordVoice(true)
isRecording = true
} catch (e: Exception) {
"录音失败了: ${e.message}".loge(e)
stopRecording()
}
}
val navController = LocalNavController.current
var showErrorDialog by rememberApiErrorDialog {
navController.popBackStack()
}
LaunchedEffect(Unit) {
aiChatUtil.isError.collectLatest { showErrorDialog = it }
}
// loading dialog
val showLoading by produceState(true,isConnected) {
if (value.not()) {
return@produceState
} else {
value = when {
isConnected && showErrorDialog.not()-> false
else -> true
}
}
}
if (showLoading) {
LoadingDialog(tip = "wait for init...", showCancel = true) {
navController.popBackStack()
}
}
val scope = rememberCoroutineScope()
LaunchedEffect(hasAudioPermission,isInit) {
// only start once the permission is granted
if (hasAudioPermission && isInit.not()) {
isInit = true
scope.launch {
runCatching {
val item = JsonCacheUtil.getAiChapterConfigJson()
.also {
"${it.size}".logd()
}
.find { it.id == route.roleId }
if (item == null) {
aiChatUtil.initModel("","")
} else {
aiChatUtil.initModel("",voice = item.voice)
}
}.onFailure {
"初始化ai聊天角色失败: ${it.message}".loge(it)
showErrorDialog = true
toast("chat init error: ${it.message}")
}
}
}
}
LaunchedEffect(isConnected) {
if (isConnected) {
// TODO: the AI should say something when the chat starts
// toast("Connected!")
}
}
DisposableEffect(Unit) { // release resources
onDispose {
aiChatUtil.release()
}
}
Box(modifier = Modifier.fillMaxSize()) {
Image(
painterResource(R.drawable.bg_star_sky),
null,
modifier = Modifier.fillMaxSize(),
contentScale = ContentScale.FillBounds
)
Column(
modifier = Modifier
.padding(top = 40.dp)
.align(alignment = Alignment.Center),
horizontalAlignment = Alignment.CenterHorizontally,
verticalArrangement = Arrangement.Center
) {
ConstraintLayout(modifier = Modifier.fillMaxWidth()) {
val (img, bubule, text) = createRefs()
Image(
painterResource(item.talkIcon),
null,
modifier = Modifier
.size(168.dp)
.constrainAs(img) {
start.linkTo(parent.start)
end.linkTo(parent.end)
}
)
if (isSpeaking) {
Image(
painterResource(R.drawable.bg_bubble),
null,
modifier = Modifier
.size(84.dp)
.constrainAs(bubule) {
start.linkTo(img.end, 4.dp)
top.linkTo(img.top)
}
)
GifImage(
modifier = Modifier.constrainAs(text) {
start.linkTo(bubule.start)
top.linkTo(bubule.top)
end.linkTo(bubule.end)
bottom.linkTo(bubule.bottom)
width = Dimension.fillToConstraints
height = Dimension.fillToConstraints
}, R.drawable.voice_inputing, contentScale = ContentScale.FillBounds
)
}
}
Box(modifier = Modifier.height(40.dp), contentAlignment = Alignment.Center) {
if (isRecording) {
Text(
"Release To Send",
fontSize = 16.sp,
style = customTextStyleExtraBold,
color = Color.White
)
}
}
// the long hold-to-talk button at the bottom
Row(
modifier = Modifier
.clip(RoundedCornerShape(20.dp))
.background(
AccentColor,
RoundedCornerShape(20.dp)
)
.pointerInput(Unit) {
detectTapGestures(
onPress = {
startRecording()
try {
awaitRelease()
} finally {
stopRecording()
}
}
)
}
.size(500.dp, 56.dp),
verticalAlignment = Alignment.CenterVertically,
horizontalArrangement = Arrangement.Center
) {
if (isRecording) {
GifImage(modifier = Modifier.size(237.dp, 44.dp), R.drawable.voice_input_long)
} else {
Image(
painterResource(R.drawable.icon_mico),
null,
modifier = Modifier.size(32.dp)
)
MySpaceRight(8.dp)
Text(
"Hold To Talk",
style = customTextStyleExtraBold,
fontSize = 18.sp,
color = Color.White
)
}
}
}
BackIcon()
}
}