2.9 KiB
2.9 KiB
RxJava原理
目录
RxJava概念
定义
// RxJava:响应式编程库
// - 基于观察者模式
// - 支持异步操作
// - 链式调用
核心类
// Observable:被观察者
// Observer:观察者
// Subscriber:订阅者
// Scheduler:调度器
观察者模式
基本使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
操作符
常用操作符
// map:转换
Observable.just(1, 2, 3)
.map(i -> i * 2)
.subscribe(System.out::println);
// filter:过滤
Observable.just(1, 2, 3, 4, 5)
.filter(i -> i % 2 == 0)
.subscribe(System.out::println);
// flatMap:扁平化
Observable.just(1, 2, 3)
.flatMap(i -> Observable.just(i * 2))
.subscribe(System.out::println);
线程调度
Scheduler
// subscribeOn:指定上游线程
// observeOn:指定下游线程
Observable.create(...)
.subscribeOn(Schedulers.io()) // 在 IO 线程执行
.observeOn(AndroidSchedulers.mainThread()) // 在主线程观察
.subscribe(...);
背压
背压问题
// 当生产速度 > 消费速度时,产生背压
// 可能导致内存溢出
解决方案
// 使用 Flowable
Flowable.create(...)
.onBackpressureBuffer() // 缓冲
.subscribe(...);
RxJava源码分析
关键类
// Observable:被观察者
// Observer:观察者
// Scheduler:调度器
// Operator:操作符
RxJava最佳实践
1. 及时取消订阅
Disposable disposable = observable.subscribe(...);
disposable.dispose(); // 取消订阅
2. 使用合适的操作符
// 根据需求选择操作符
// map、filter、flatMap 等
面试常见问题
Q1: RxJava 的原理?
答案:
- 基于观察者模式
- 支持链式调用
- 支持线程调度
Q2: subscribeOn 和 observeOn 的区别?
答案:
- subscribeOn:指定上游线程
- observeOn:指定下游线程
Q3: RxJava 的背压?
答案:
- 生产速度 > 消费速度时产生
- 使用 Flowable 处理
最后更新:2024年