Android BLE 蓝牙开发进阶:RxAndroidBle框架下的设备连接与数据通信实战
1. RxAndroidBle框架入门从零搭建BLE开发环境第一次接触RxAndroidBle时我被它简洁的API设计惊艳到了。相比Android原生的BLE接口这个基于RxJava的框架让异步操作变得像搭积木一样简单。记得当时为了赶项目进度我用原生API写了300多行的设备连接代码而改用RxAndroidBle后同样的功能不到50行就搞定了。要在项目中引入RxAndroidBle首先需要在build.gradle中添加依赖。这里有个小技巧建议同时引入RxJava和Lifecycle组件可以更好地管理订阅生命周期dependencies { // RxAndroidBle核心库 implementation com.polidea.rxandroidble3:rxandroidble:1.17.2 // RxJava3支持 implementation io.reactivex.rxjava3:rxjava:3.1.6 implementation io.reactivex.rxjava3:rxandroid:3.0.2 // 生命周期管理可选但推荐 implementation androidx.lifecycle:lifecycle-runtime-ktx:2.6.2 }权限配置是BLE开发的第一道坎。从Android 12开始蓝牙权限体系有了重大变化需要特别注意版本兼容性。我在实际项目中遇到过不少因为权限问题导致的崩溃后来总结出了这个工具类object BlePermissionHelper { /** * 检查当前是否具备BLE操作权限 */ SuppressLint(InlinedApi) fun hasPermissions(context: Context): Boolean { return if (Build.VERSION.SDK_INT Build.VERSION_CODES.S) { ContextCompat.checkSelfPermission(context, Manifest.permission.BLUETOOTH_SCAN) PackageManager.PERMISSION_GRANTED ContextCompat.checkSelfPermission(context, Manifest.permission.BLUETOOTH_CONNECT) PackageManager.PERMISSION_GRANTED } else { ContextCompat.checkSelfPermission(context, Manifest.permission.ACCESS_FINE_LOCATION) PackageManager.PERMISSION_GRANTED } } /** * 获取需要申请的权限数组 */ fun getRequiredPermissions(): ArrayString { return if (Build.VERSION.SDK_INT Build.VERSION_CODES.S) { arrayOf( Manifest.permission.BLUETOOTH_SCAN, Manifest.permission.BLUETOOTH_CONNECT ) } else { arrayOf(Manifest.permission.ACCESS_FINE_LOCATION) } } }初始化RxBleClient时有个性能优化点应该使用ApplicationContext而不是ActivityContext避免内存泄漏。我习惯在自定义Application类中初始化class MyApp : Application() { val rxBleClient by lazy { RxBleClient.create(this) } }2. 设备扫描与发现的实战技巧扫描BLE设备看似简单但要做好用户体验却有不少门道。首先要注意扫描模式的选取不同的模式对功耗和响应速度影响很大。经过多次测试我总结出这些模式的使用场景SCAN_MODE_LOW_POWER适合后台持续扫描间隔约5秒上报一次结果SCAN_MODE_BALANCED平衡模式约2秒上报一次适合大多数场景SCAN_MODE_LOW_LATENCY高频率扫描适合需要实时响应的场景SCAN_MODE_OPPORTUNISTIC被动模式依赖其他应用的扫描结果一个健壮的扫描器实现应该包含以下功能点class BleScanner(private val context: Context) { private val client (context.applicationContext as MyApp).rxBleClient private var scanDisposable: Disposable? null private val devices ConcurrentHashMapString, ScanResult() fun startScan( scanMode: Int ScanSettings.SCAN_MODE_LOW_LATENCY, callbackType: Int ScanSettings.CALLBACK_TYPE_ALL_MATCHES ): ObservableScanResult { if (!BlePermissionHelper.hasPermissions(context)) { return Observable.error(SecurityException(缺少蓝牙扫描权限)) } stopScan() // 先停止已有扫描 val settings ScanSettings.Builder() .setScanMode(scanMode) .setCallbackType(callbackType) .build() return client.scanBleDevices(settings) .doOnSubscribe { scanDisposable it } .doOnNext { result - devices[result.bleDevice.macAddress] result } .doOnDispose { devices.clear() } } fun stopScan() { scanDisposable?.dispose() scanDisposable null } fun getDeviceList(): ListScanResult { return devices.values.toList() } }在实际项目中我发现设备去重和信号强度过滤是两个高频需求。RxAndroidBle已经帮我们处理了设备去重但信号强度过滤需要自己实现。这里分享一个实用的RSSI过滤技巧fun filterByRssi(minRssi: Int -80, maxRssi: Int 0): ObservableScanResult { return scanResults .filter { it.rssi in minRssi..maxRssi } .distinctUntilChanged { old, new - old.bleDevice.macAddress new.bleDevice.macAddress abs(old.rssi - new.rssi) 3 // 只有当信号强度变化超过3dB才触发更新 } }3. 设备连接与状态管理设备连接是BLE开发中最容易出问题的环节。经过多次踩坑我总结出一个稳定的连接流程应该包含以下步骤建立物理连接发现服务启用通知/指示错误处理和重连机制RxAndroidBle的连接操作非常简洁fun connectDevice(macAddress: String): ObservableRxBleConnection { return client.getBleDevice(macAddress) .establishConnection(false) // 参数表示是否自动连接 .flatMap { connection - connection.discoverServices().map { services - connection to services } } .retryWhen { errors - errors.flatMap { error - if (error is BleDisconnectedException) { Observable.timer(2, TimeUnit.SECONDS) } else { Observable.error(error) } } } }连接状态管理是个容易被忽视但非常重要的功能。我在项目中实现了一个状态机来管理连接状态enum class ConnectionState { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING, ERROR } class ConnectionManager { private val stateSubject BehaviorSubject.createConnectionState() val stateObservable: ObservableConnectionState stateSubject.distinctUntilChanged() fun connect(device: RxBleDevice) { stateSubject.onNext(ConnectionState.CONNECTING) device.establishConnection(false) .subscribe( { stateSubject.onNext(ConnectionState.CONNECTED) }, { error - stateSubject.onNext(ConnectionState.ERROR) stateSubject.onNext(ConnectionState.DISCONNECTED) } ) } }对于需要长时间保持连接的设备建议实现自动重连机制。这里有个实用技巧指数退避算法可以避免频繁重连导致的电量消耗fun autoReconnect(device: RxBleDevice): ObservableRxBleConnection { return device.establishConnection(false) .retryWhen { errors - errors.zipWith( Observable.range(1, 5), { error, retryCount - Pair(error, retryCount) } ).flatMap { (error, retryCount) - if (error is BleDisconnectedException) { Observable.timer(Math.pow(2.0, retryCount.toDouble()).toLong(), TimeUnit.SECONDS) } else { Observable.error(error) } } } }4. 数据通信的进阶实践BLE通信的核心是Characteristic的操作主要包括读取、写入和订阅通知。RxAndroidBle为这些操作提供了统一的API风格// 读取Characteristic fun readCharacteristic( connection: RxBleConnection, serviceUuid: UUID, charUuid: UUID ): ObservableByteArray { return connection.readCharacteristic(serviceUuid, charUuid) } // 写入Characteristic带响应 fun writeCharacteristic( connection: RxBleConnection, serviceUuid: UUID, charUuid: UUID, data: ByteArray ): ObservableByteArray { return connection.writeCharacteristic(serviceUuid, charUuid, data) } // 订阅通知 fun setupNotification( connection: RxBleConnection, serviceUuid: UUID, charUuid: UUID ): ObservableByteArray { return connection.setupNotification(serviceUuid, charUuid) .flatMap { it } // 将ObservableObservableByteArray展平 }在实际项目中数据分包是个常见需求。特别是当传输的数据超过20字节BLE单次传输的最大限制时需要实现分段传输。这是我常用的一个分段写入工具类object BlePacketHelper { private const val MTU 20 // 标准BLE MTU大小 fun splitPacket(data: ByteArray): ListByteArray { return data.toList().chunked(MTU).map { it.toByteArray() } } fun writeLargeData( connection: RxBleConnection, serviceUuid: UUID, charUuid: UUID, data: ByteArray ): ObservableBoolean { val packets splitPacket(data) return Observable.fromIterable(packets) .concatMap { packet - connection.writeCharacteristic(serviceUuid, charUuid, packet) .map { true } } .reduce { acc, _ - acc true } } }对于需要双向通信的场景建议使用写请求通知响应的模式。下面是一个完整的命令交互示例fun executeCommand( connection: RxBleConnection, command: ByteArray ): ObservableByteArray { val writeCharUuid UUID.fromString(0000ffe1-0000-1000-8000-00805f9b34fb) val notifyCharUuid UUID.fromString(0000ffe2-0000-1000-8000-00805f9b34fb) val serviceUuid UUID.fromString(0000ffe0-0000-1000-8000-00805f9b34fb) return connection.setupNotification(serviceUuid, notifyCharUuid) .flatMap { notificationObservable - connection.writeCharacteristic(serviceUuid, writeCharUuid, command) .ignoreElements() .andThen(notificationObservable) .take(1) // 只取第一个响应 } }5. 性能优化与疑难解答在复杂场景下使用BLE时性能优化至关重要。以下是几个经过验证的优化技巧连接参数优化适当调整connectionInterval7.5ms - 4000ms根据场景选择slaveLatency0-499合理设置supervisionTimeout100ms - 32000msfun requestConnectionParameters( connection: RxBleConnection, interval: Int, latency: Int, timeout: Int ): ObservableBoolean { return connection.requestConnectionPriority(interval, latency, timeout) .map { true } .onErrorReturn { false } }MTU协商 在Android 5.0上可以请求更大的MTU最大512字节显著提高传输效率fun negotiateMtu(connection: RxBleConnection, mtu: Int): ObservableInt { return connection.requestMtu(mtu) .onErrorReturn { connection.mtu } }多设备管理 当需要同时连接多个设备时要注意资源分配。我通常使用连接池模式class BleConnectionPool(private val maxConnections: Int) { private val connections ConcurrentHashMapString, RxBleConnection() fun getConnection(device: RxBleDevice): ObservableRxBleConnection { return if (connections.size maxConnections) { Observable.error(IllegalStateException(连接数已达上限)) } else { device.establishConnection(false) .doOnNext { connections[device.macAddress] it } .doOnDispose { connections.remove(device.macAddress) } } } }常见问题排查指南连接不稳定检查设备距离、干扰源适当调整连接参数数据传输失败确认MTU大小检查分包逻辑高延迟减少同时操作的Characteristic数量优化通知间隔功耗过高降低扫描频率增加连接间隔6. 实战案例健康设备数据采集系统去年我开发过一个健康监测系统需要同时连接心率带、血氧仪和体温计三个BLE设备。这个项目让我积累了不少实战经验下面分享关键实现首先定义设备管理器统一管理多个设备连接class HealthDeviceManager { private val heartRateDevice: RxBleDevice private val spo2Device: RxBleDevice private val tempDevice: RxBleDevice private val disposables CompositeDisposable() fun startMonitoring(): ObservableHealthData { val heartRateObs setupHeartRateMonitoring() val spo2Obs setupSpo2Monitoring() val tempObs setupTemperatureMonitoring() return Observable.merge(heartRateObs, spo2Obs, tempObs) } private fun setupHeartRateMonitoring(): ObservableHealthData { return heartRateDevice.establishConnection(false) .flatMap { connection - connection.setupNotification(HEART_RATE_SERVICE, HEART_RATE_CHAR) .flatMap { it } .map { parseHeartRate(it) } } .doOnSubscribe { disposables.add(it) } } // 其他设备监控方法类似... fun stop() { disposables.clear() } }数据解析是健康设备开发的关键环节。以心率数据为例需要按照规范解析fun parseHeartRate(data: ByteArray): HeartRateData { val flags data[0].toInt() and 0xff var offset 1 val is16bit (flags and 0x01) ! 0 val hasEnergy (flags and 0x08) ! 0 val heartRate if (is16bit) { val value (data[offset 1].toInt() and 0xff shl 8) or (data[offset].toInt() and 0xff) offset 2 value } else { data[offset].toInt() and 0xff offset 1 } val energyExpended if (hasEnergy) { val value (data[offset 1].toInt() and 0xff shl 8) or (data[offset].toInt() and 0xff) offset 2 value } else null return HeartRateData(heartRate, energyExpended) }对于需要实时显示数据的应用建议使用RxJava的背压策略避免UI卡顿fun observeRealTimeData(): FlowableHealthData { return healthDeviceManager.startMonitoring() .toFlowable(BackpressureStrategy.LATEST) // 只处理最新数据 .observeOn(AndroidSchedulers.mainThread()) }7. 测试与调试技巧可靠的测试方案是保证BLE应用质量的关键。我通常采用分层测试策略单元测试使用RxAndroidBle的mock功能RunWith(MockitoJUnitRunner::class) class BleManagerTest { Mock lateinit var mockDevice: RxBleDevice Mock lateinit var mockConnection: RxBleConnection Test fun testConnection() { val manager BleManager(mockDevice) when(mockDevice.establishConnection(anyBoolean())) .thenReturn(Observable.just(mockConnection)) manager.connect().test() .assertComplete() .assertNoErrors() } }集成测试使用真实设备自动化测试LargeTest RunWith(AndroidJUnit4::class) class BleIntegrationTest { get:Rule val permissionRule GrantPermissionRule.grant( Manifest.permission.BLUETOOTH_SCAN, Manifest.permission.BLUETOOTH_CONNECT ) Test fun testDeviceCommunication() { val device RxBleClient.create(InstrumentationRegistry.getInstrumentation().targetContext) .getBleDevice(00:11:22:33:44:55) val tester BleDeviceTester(device) tester.runBasicTest() .blockingAwait() } }性能测试关注关键指标连接建立时间数据传输速率功耗表现内存占用调试BLE应用时我常用的工具组合nRF Connect查看广播数据和服务列表WiresharkBLE嗅探器抓包分析通信过程Android Studio Profiler监控应用性能RxJava的doOn操作符在数据流中插入调试点fun debugFlow(connection: RxBleConnection): ObservableByteArray { return connection.setupNotification(SERVICE_UUID, CHAR_UUID) .flatMap { it } .doOnNext { Log.d(BLE, 收到数据: ${it.toHexString()}) } .doOnError { Log.e(BLE, 发生错误, it) } .doOnSubscribe { Log.i(BLE, 开始订阅) } .doOnDispose { Log.w(BLE, 订阅被释放) } }8. 生产环境的最佳实践在将BLE应用发布到生产环境前有几个关键点需要特别注意后台连接策略使用Foreground Service保持连接合理设置扫描间隔处理Doze模式限制class BleBackgroundService : Service() { private val binder LocalBinder() private lateinit var bleManager: BleManager inner class LocalBinder : Binder() { fun getService(): BleBackgroundService thisBleBackgroundService } override fun onBind(intent: Intent): IBinder binder override fun onCreate() { super.onCreate() startForeground(NOTIFICATION_ID, createNotification()) bleManager BleManager(applicationContext) bleManager.startBackgroundScan() } private fun createNotification(): Notification { val channelId if (Build.VERSION.SDK_INT Build.VERSION_CODES.O) { createNotificationChannel() } else return NotificationCompat.Builder(this, channelId) .setContentTitle(BLE服务运行中) .setSmallIcon(R.drawable.ic_ble) .build() } RequiresApi(Build.VERSION_CODES.O) private fun createNotificationChannel(): String { val channelId ble_background val channel NotificationChannel( channelId, BLE服务, NotificationManager.IMPORTANCE_LOW ) getSystemService(NotificationManager::class.java) .createNotificationChannel(channel) return channelId } }设备兼容性处理不同厂商的BLE实现有差异准备备用通信方案实现自动降级机制安全策略使用加密通信如LE Secure Connection实现配对绑定敏感数据本地加密fun setupSecureConnection(device: RxBleDevice): ObservableRxBleConnection { return device.establishConnection(false) .flatMap { connection - if (Build.VERSION.SDK_INT Build.VERSION_CODES.M) { connection.requestConnectionPriority( RxBleConnection.CONNECTION_PRIORITY_HIGH ).ignoreElements() .andThen(Observable.just(connection)) } else { Observable.just(connection) } } .flatMap { connection - connection.requestMtu(247) .onErrorReturnItem(connection.mtu) .flatMap { mtu - if (connection.supportsL2cap()) { connection.setupL2capChannel() .map { channel - SecureConnection(connection, channel) } } else { Observable.just(SecureConnection(connection)) } } } }用户体验优化清晰的连接状态指示自动重连提示数据传输进度显示错误恢复指导在开发商业级BLE应用时建议建立完善的监控体系连接成功率统计数据传输错误率设备兼容性报告性能指标监控class BleAnalytics { fun logConnectionEvent(device: RxBleDevice, success: Boolean, duration: Long) { val params bundleOf( mac to device.macAddress, success to success, duration_ms to duration ) Firebase.analytics.logEvent(ble_connection, params) } fun logDataTransfer( characteristic: UUID, size: Int, time: Long, error: String? null ) { val params bundleOf( char_uuid to characteristic.toString(), data_size to size, transfer_time to time, error to (error ?: none) ) Firebase.analytics.logEvent(ble_data_transfer, params) } }