186 lines
2.9 KiB
Markdown
186 lines
2.9 KiB
Markdown
# RxJava原理
|
||
|
||
## 目录
|
||
- [RxJava概念](#rxjava概念)
|
||
- [观察者模式](#观察者模式)
|
||
- [操作符](#操作符)
|
||
- [线程调度](#线程调度)
|
||
- [背压](#背压)
|
||
- [RxJava源码分析](#rxjava源码分析)
|
||
- [RxJava最佳实践](#rxjava最佳实践)
|
||
- [面试常见问题](#面试常见问题)
|
||
|
||
---
|
||
|
||
## RxJava概念
|
||
|
||
### 定义
|
||
|
||
```java
|
||
// RxJava:响应式编程库
|
||
// - 基于观察者模式
|
||
// - 支持异步操作
|
||
// - 链式调用
|
||
```
|
||
|
||
### 核心类
|
||
|
||
```java
|
||
// Observable:被观察者
|
||
// Observer:观察者
|
||
// Subscriber:订阅者
|
||
// Scheduler:调度器
|
||
```
|
||
|
||
---
|
||
|
||
## 观察者模式
|
||
|
||
### 基本使用
|
||
|
||
```java
|
||
Observable.create(new ObservableOnSubscribe<String>() {
|
||
@Override
|
||
public void subscribe(ObservableEmitter<String> emitter) {
|
||
emitter.onNext("Hello");
|
||
emitter.onNext("World");
|
||
emitter.onComplete();
|
||
}
|
||
})
|
||
.subscribe(new Observer<String>() {
|
||
@Override
|
||
public void onSubscribe(Disposable d) {
|
||
}
|
||
|
||
@Override
|
||
public void onNext(String s) {
|
||
System.out.println(s);
|
||
}
|
||
|
||
@Override
|
||
public void onError(Throwable e) {
|
||
}
|
||
|
||
@Override
|
||
public void onComplete() {
|
||
}
|
||
});
|
||
```
|
||
|
||
---
|
||
|
||
## 操作符
|
||
|
||
### 常用操作符
|
||
|
||
```java
|
||
// map:转换
|
||
Observable.just(1, 2, 3)
|
||
.map(i -> i * 2)
|
||
.subscribe(System.out::println);
|
||
|
||
// filter:过滤
|
||
Observable.just(1, 2, 3, 4, 5)
|
||
.filter(i -> i % 2 == 0)
|
||
.subscribe(System.out::println);
|
||
|
||
// flatMap:扁平化
|
||
Observable.just(1, 2, 3)
|
||
.flatMap(i -> Observable.just(i * 2))
|
||
.subscribe(System.out::println);
|
||
```
|
||
|
||
---
|
||
|
||
## 线程调度
|
||
|
||
### Scheduler
|
||
|
||
```java
|
||
// subscribeOn:指定上游线程
|
||
// observeOn:指定下游线程
|
||
|
||
Observable.create(...)
|
||
.subscribeOn(Schedulers.io()) // 在 IO 线程执行
|
||
.observeOn(AndroidSchedulers.mainThread()) // 在主线程观察
|
||
.subscribe(...);
|
||
```
|
||
|
||
---
|
||
|
||
## 背压
|
||
|
||
### 背压问题
|
||
|
||
```java
|
||
// 当生产速度 > 消费速度时,产生背压
|
||
// 可能导致内存溢出
|
||
```
|
||
|
||
### 解决方案
|
||
|
||
```java
|
||
// 使用 Flowable
|
||
Flowable.create(...)
|
||
.onBackpressureBuffer() // 缓冲
|
||
.subscribe(...);
|
||
```
|
||
|
||
---
|
||
|
||
## RxJava源码分析
|
||
|
||
### 关键类
|
||
|
||
```java
|
||
// Observable:被观察者
|
||
// Observer:观察者
|
||
// Scheduler:调度器
|
||
// Operator:操作符
|
||
```
|
||
|
||
---
|
||
|
||
## RxJava最佳实践
|
||
|
||
### 1. 及时取消订阅
|
||
|
||
```java
|
||
Disposable disposable = observable.subscribe(...);
|
||
disposable.dispose(); // 取消订阅
|
||
```
|
||
|
||
### 2. 使用合适的操作符
|
||
|
||
```java
|
||
// 根据需求选择操作符
|
||
// map、filter、flatMap 等
|
||
```
|
||
|
||
---
|
||
|
||
## 面试常见问题
|
||
|
||
### Q1: RxJava 的原理?
|
||
|
||
**答案:**
|
||
- 基于观察者模式
|
||
- 支持链式调用
|
||
- 支持线程调度
|
||
|
||
### Q2: subscribeOn 和 observeOn 的区别?
|
||
|
||
**答案:**
|
||
- **subscribeOn**:指定上游线程
|
||
- **observeOn**:指定下游线程
|
||
|
||
### Q3: RxJava 的背压?
|
||
|
||
**答案:**
|
||
- 生产速度 > 消费速度时产生
|
||
- 使用 Flowable 处理
|
||
|
||
---
|
||
|
||
*最后更新:2024年*
|