lyrieek

Create an Observable (named observable below) that issues a network request.

Retrofit retrofit = new Retrofit.Builder()
        .baseUrl(Urls.baseUrl)
        // Parse JSON responses with Gson
        .addConverterFactory(GsonConverterFactory.create())
        // Let service methods return RxJava 2 types
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .build();
GetRequest request = retrofit.create(GetRequest.class);
Observable<Translation> observable = request.getCall1();

The GetRequest interface

public interface GetRequest  {

    @GET(Urls.test)
    Observable<Translation> getCall();

    @GET(Urls.test1)
    Observable<Translation> getCall1();
}

The Translation class

public class Translation {

    private int status;
    // The field keeps the name "content" so Gson still maps the JSON key of the same name
    private Content content;
    private static class Content {
        private String from;
        private String to;
        private String vendor;
        private String out;
        private int errNo;
    }
    // Logs and returns the translated text
    public String show(){
        Log.e("yzh",content.out);
        return content.out;
    }
}

If you are not familiar with Retrofit, read an introductory Retrofit tutorial first.

1 Simulating repeated polling with RxJava's interval operator
// The outermost stream is the polling loop
  Observable.interval(2,1, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
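                        // Logs "poll number N" (the string is kept in Chinese to match the log output)
                        Log.e("yzh","第"+aLong+"次轮询");
                        // The request is executed here; note the thread switch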
                        observable.subscribeOn(Schedulers.io())
                                .observeOn(AndroidSchedulers.mainThread())
                                .subscribe(new Observer<Translation>() {
                                    @Override
                                    public void onSubscribe(Disposable d) {

                                    }

                                    @Override
                                    public void onNext(Translation translation) {
                                        translation.show();
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        Log.e("yzh","request failed");
                                    }

                                    @Override
                                    public void onComplete() {

                                    }
                                });
                    }
                }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {

            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh","handling the Error event");
            }

            @Override
            public void onComplete() {
                Log.e("yzh","handling the Complete event");
            }
        });

Log output ("第N次轮询" means "poll number N"; "嗨世界" is the text returned by the server)

//        03-12 14:56:16.528 8733-8760/com.example.issuser.rxtest E/yzh: 第0次轮询
//        03-12 14:56:16.886 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
//        03-12 14:56:17.528 8733-8760/com.example.issuser.rxtest E/yzh: 第1次轮询
//        03-12 14:56:17.702 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
//        03-12 14:56:18.529 8733-8760/com.example.issuser.rxtest E/yzh: 第2次轮询
//        03-12 14:56:18.675 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
//        03-12 14:56:19.528 8733-8760/com.example.issuser.rxtest E/yzh: 第3次轮询
//        03-12 14:56:19.674 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
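To see the timing semantics of interval(2, 1, TimeUnit.SECONDS) without Android or RxJava, here is a minimal sketch that uses the JDK's ScheduledExecutorService as a stand-in: the first tick fires after the initial delay, and later ticks follow at a fixed period. The class name IntervalSketch, the poll method, and the millisecond values are assumptions chosen so the demo finishes quickly. It is an analogy, not a description of how RxJava implements interval.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Observable.interval(initialDelay, period, unit)
final class IntervalSketch {

    /** Collects the first `count` tick numbers (0, 1, 2, ...) and then stops the timer. */
    static List<Long> poll(int count, long initialDelayMs, long periodMs) {
        List<Long> ticks = Collections.synchronizedList(new ArrayList<>());
        AtomicLong next = new AtomicLong();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            timer.scheduleAtFixedRate(() -> {
                // Ignore ticks that arrive after the requested number was collected
                if (done.getCount() > 0) {
                    ticks.add(next.getAndIncrement());
                    done.countDown();
                }
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            // Wait for the ticks, with a generous timeout so the demo cannot hang
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return new ArrayList<>(ticks);
    }

    public static void main(String[] args) {
        // 20 ms initial delay, 10 ms period, three polls
        System.out.println(poll(3, 20, 10));
    }
}
```

In the RxJava version each tick would start a network request. Here a tick only records its number, which is the same value that doOnNext receives as aLong.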
2 Conditional polling with RxJava's repeatWhen

The Observable for the network request is the same as above.

observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Object o) throws Exception {
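                        // Polling condition: i is a counter field (declared elsewhere) that onNext increments.
                        // It is read here on a background thread and written on the main thread,
                        // so a volatile field or an AtomicInteger is the safer choice.
                        if(i>3){
                            // "轮询结束" means "polling finished"; the error ends the stream
                            return Observable.error(new Throwable("轮询结束"));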
                        }else{
                            return Observable.just(1).delay(2, TimeUnit.SECONDS);
                        }

                    }
                });
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Translation>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Translation translation) {
                        translation.show();
                        i++;
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError--"+e.toString());
                    }

                    @Override
                    public void onComplete() {

                    }
                });

Log output ("轮询结束" means "polling finished")

        03-12 15:25:00.524 12910-12910/com.example.issuser.rxtest E/yzh: onSubscribe
        03-12 15:25:00.715 12910-12910/com.example.issuser.rxtest E/yzh: hi china
        03-12 15:25:02.852 12910-12910/com.example.issuser.rxtest E/yzh: hi china
        03-12 15:25:05.092 12910-12910/com.example.issuser.rxtest E/yzh: hi china
        03-12 15:25:07.244 12910-12910/com.example.issuser.rxtest E/yzh: hi china
        03-12 15:25:09.382 12910-12910/com.example.issuser.rxtest E/yzh: hi china
        03-12 15:25:09.395 12910-12910/com.example.issuser.rxtest E/yzh: onError--java.lang.Throwable: 轮询结束
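The repeat-until-condition logic can also be written as a plain loop, which makes the counting easier to follow. This is a minimal single-threaded sketch, not RxJava code: RepeatWhenSketch, pollWhile, and the Supplier standing in for the network request are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical single-threaded analogue of the repeatWhen handler
final class RepeatWhenSketch {

    /**
     * Calls request, delivers the result, then checks the condition:
     * repeats while the number of delivered results is <= limit.
     */
    static List<String> pollWhile(Supplier<String> request, int limit, long delayMs) {
        List<String> results = new ArrayList<>();
        int i = 0; // plays the role of the counter field i
        while (true) {
            results.add(request.get()); // corresponds to onNext
            i++;
            if (i > limit) {            // same test as if(i>3)
                break;                  // the RxJava version signals onError here
            }
            sleep(delayMs);             // stands in for delay(2, TimeUnit.SECONDS)
        }
        return results;
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollWhile(() -> "hi china", 3, 1).size());
    }
}
```

In this sketch the counter is always updated before the check, so the request runs four times. The log above shows five responses, which is consistent with the check running on the background thread before onNext has incremented i on the main thread. That explanation is a reading of the log, not something the original code states.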
3 Reconnecting after a network error
This is a common case, but it is hard to simulate, so no log output is shown.

It uses RxJava's retryWhen.

observable.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
           @Override
           public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
               return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                   @Override
                   public ObservableSource<?> apply(Throwable throwable) throws Exception {
                       Log.e("yzh","exception occurred=="+throwable.toString());
                       if(throwable instanceof IOException){
                           Log.e("yzh","I/O exception, retrying");
                           // currentRetryCount, maxConnectCount and waitRetryTime are fields declared elsewhere
                           if(currentRetryCount<maxConnectCount){
                               currentRetryCount++;
                               Log.e("yzh","retry count--"+currentRetryCount);
                               // Wait longer after each failure: 2 s, 3 s, 4 s, ...
                               waitRetryTime=1000+currentRetryCount*1000;
                               Log.e("yzh","wait time=="+waitRetryTime);
                               return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
                           }else{
                               return Observable.error(new Throwable("retry count exceeded the limit = " +currentRetryCount  + ", not retrying again"));

                           }
                       }else{
                           return Observable.error(new Throwable("a non-network (non-I/O) exception occurred"));
                       }

                   }
               });
           }
       }).subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Observer<Translation>() {
                   @Override
                   public void onSubscribe(Disposable d) {

                   }

                   @Override
                   public void onNext(Translation translation) {
                       Log.e("yzh",  "request succeeded");
                       translation.show();
                   }

                   @Override
                   public void onError(Throwable e) {
                        Log.e("yzh",e.toString());
                   }

                   @Override
                   public void onComplete() {

                   }
               });
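Because the network failure is hard to reproduce on a device, the retry policy can be exercised in isolation. The sketch below is plain Java with a fake request that fails a fixed number of times. RetrySketch, IoCall, and callWithRetry are hypothetical names. Delays are recorded in a list instead of actually sleeping, which is an assumption made to keep the demo fast.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java analogue of the retryWhen policy
final class RetrySketch {

    /** A request that may fail with an I/O error. */
    interface IoCall<T> {
        T call() throws IOException;
    }

    /** Delay before retry number n (1-based): 1000 + n * 1000 ms, as in the code above. */
    static long waitMillis(int retryCount) {
        return 1000L + retryCount * 1000L;
    }

    /**
     * Runs the call and retries on IOException up to maxRetries times.
     * Each planned delay is appended to `delays` instead of sleeping.
     * Exceptions other than IOException propagate immediately.
     */
    static <T> T callWithRetry(IoCall<T> call, int maxRetries, List<Long> delays) {
        int retries = 0;
        while (true) {
            try {
                return call.call();
            } catch (IOException e) {
                if (retries >= maxRetries) {
                    throw new IllegalStateException("retry limit reached: " + retries, e);
                }
                retries++;
                delays.add(waitMillis(retries));
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        List<Long> delays = new ArrayList<>();
        // Fails twice, then succeeds
        String result = callWithRetry(() -> {
            if (attempts[0]++ < 2) {
                throw new IOException("network down");
            }
            return "ok";
        }, 3, delays);
        System.out.println(result + " after delays " + delays);
    }
}
```

The growing delay gives a flaky connection more time to recover before each new attempt, and the retry limit keeps a permanently broken connection from retrying forever.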
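4 Processing the result of a network request with a second request, using RxJava's flatMap

A common use case is registering and then logging in immediately. Pay attention to the thread switches.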

observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Translation>() {
                    @Override
                    public void accept(Translation translation) throws Exception {
                        Log.e("yzh","doOnNextThread--"+Thread.currentThread().getName());
                        // Logs "first network request succeeded" (kept in Chinese to match the log output)
                        Log.e("yzh","第一次网络请求成功");
                        translation.show();
                    }
                })
                //Switch the observer to an io thread for the second request
                .observeOn(Schedulers.io())
                .flatMap(new Function<Translation, ObservableSource<Translation>>() {
                    @Override
                    public ObservableSource<Translation> apply(Translation translation) throws Exception {
                        Log.e("yzh","flatMapThread--"+Thread.currentThread().getName());
                        Log.e("yzh","flatmap--");
                        translation.show();
                        //observable1 is another network request; its details are omitted here
                        return observable1;
                    }
                })
                //Switch the observer back to the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Translation>() {
                    @Override
                    public void accept(Translation translation) throws Exception {
                        Log.e("yzh","acceptThread--"+Thread.currentThread().getName());
                        Log.e("yzh","accept--");
                        translation.show();
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                    }
                });

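Log output. Note how the observer thread switches for the second request ("第一次网络请求成功" means "first network request succeeded")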

03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: doOnNextThread--main
03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: 第一次网络请求成功
03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: 嗨世界
03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: flatMapThread--RxCachedThreadScheduler-2
03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: flatmap--
03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: 嗨世界
03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: acceptThread--main
03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: accept--
03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: hi china
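The same "second request depends on the first" shape exists in the JDK as CompletableFuture.thenCompose, which plays a role similar to flatMap here. The sketch below is an analogy under that assumption, not RxJava code. ChainSketch, register, login, and the returned strings are hypothetical stand-ins for the two network calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical register-then-login chain built on CompletableFuture
final class ChainSketch {

    // Stand-in for the first network request
    static CompletableFuture<String> register(String user, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> "registered:" + user, io);
    }

    // Stand-in for the second request, which needs the first result
    static CompletableFuture<String> login(String registration, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> "token-for-" + registration, io);
    }

    /** Runs both requests in sequence on a background pool and returns the final result. */
    static String registerThenLogin(String user) {
        ExecutorService io = Executors.newCachedThreadPool();
        try {
            return register(user, io)
                    // Like flatMap: map the first result to a second asynchronous call
                    .thenCompose(registration -> login(registration, io))
                    .join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(registerThenLogin("yzh"));
    }
}
```

As in the RxJava chain, the second call starts only after the first has produced a value, and a failure in either step skips the rest and surfaces at the end (here as an exception from join).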
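5 Validating several conditions together, for forms such as login or registration where multiple fields must be filled in

a. RxTextView.textChanges() observes text changes in a widget; it requires the dependency: compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
b. skip(1) skips the initial emission, when the widget has no input yet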

Observable<CharSequence> nameObservable = RxTextView.textChanges(et_name).skip(1);
Observable<CharSequence> ageObservable = RxTextView.textChanges(et_age).skip(1);
Observable<CharSequence> jobObservable = RxTextView.textChanges(et_job).skip(1);
Observable.combineLatest(nameObservable, ageObservable, jobObservable, new Function3<CharSequence, CharSequence, CharSequence, Boolean>() {
            @Override
            public Boolean apply(CharSequence name, CharSequence age, CharSequence job) throws Exception {
                // Each argument is the latest text of the corresponding field
                boolean isUserNameValid = !TextUtils.isEmpty(name);
                boolean isUserAgeValid = !TextUtils.isEmpty(age);
                boolean isUserJobValid = !TextUtils.isEmpty(job);

                return isUserNameValid && isUserAgeValid && isUserJobValid;
            }
        }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.e("yzh","submit button enabled--"+aBoolean);
            }
        });
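One detail of combineLatest is easy to miss: nothing is emitted until every source has emitted at least once. Combined with skip(1), this means the validity check first runs only after all three fields have been edited. The sketch below models that rule in plain Java. CombineLatestSketch and onTextChanged are hypothetical names, and the model is simplified to three String sources.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of combineLatest over three text fields
final class CombineLatestSketch {

    // Latest value per field; null means the field has not emitted yet
    private final String[] latest = new String[3];
    // Every combined result, in emission order
    private final List<Boolean> emitted = new ArrayList<>();

    /** Records a new value from field `index`; emits once all three fields have a value. */
    void onTextChanged(int index, String text) {
        latest[index] = text;
        for (String value : latest) {
            if (value == null) {
                return; // still waiting for at least one field
            }
        }
        // The form is valid only when every field is non-empty
        emitted.add(!latest[0].isEmpty() && !latest[1].isEmpty() && !latest[2].isEmpty());
    }

    List<Boolean> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CombineLatestSketch form = new CombineLatestSketch();
        form.onTextChanged(0, "Tom"); // no emission yet
        form.onTextChanged(1, "20");  // no emission yet
        form.onTextChanged(2, "dev"); // first emission
        form.onTextChanged(1, "");    // age cleared, second emission
        System.out.println(form.emitted());
    }
}
```

If the submit button should start out disabled, it is therefore safer to disable it explicitly in the layout or in onCreate rather than to rely on a first emission.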
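6 Avoiding memory leaks with RxJava, for example when a page is finished while an asynchronous operation is still running
//        Prevents problems after the activity is finished
          private final CompositeDisposable disposables = new CompositeDisposable();
          // Add the Disposable of each subscription (returned by subscribe(...) or received in onSubscribe), not the observer itself
          disposables.add(disposable);
          @Override
          protected void onDestroy() {
              super.onDestroy();
              // Dispose all subscriptions
              disposables.clear();
          }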
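The idea behind CompositeDisposable, collecting every handle to running work and cancelling them all in one place, can be sketched with JDK Futures. This is an analogy only: CompositeCancelSketch and its methods are hypothetical names, and Future.cancel stands in for Disposable.dispose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical container that cancels all tracked tasks at once
final class CompositeCancelSketch {

    private final List<Future<?>> tasks = new ArrayList<>();

    /** Tracks a running task so that it can be cancelled later. */
    synchronized void add(Future<?> task) {
        tasks.add(task);
    }

    /** Cancels every tracked task and forgets it; the container can be reused afterwards. */
    synchronized void clear() {
        for (Future<?> task : tasks) {
            task.cancel(true);
        }
        tasks.clear();
    }

    synchronized int size() {
        return tasks.size();
    }

    /** Starts a long-running task, cancels it through the container, and reports the result. */
    static boolean demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompositeCancelSketch composite = new CompositeCancelSketch();
        Future<?> slow = pool.submit(() -> {
            try {
                Thread.sleep(60_000); // pretend to be a slow request
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        composite.add(slow);
        composite.clear();  // what onDestroy would do
        pool.shutdownNow();
        return slow.isCancelled() && composite.size() == 0;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In RxJava 2, clear() disposes the current subscriptions and leaves the container usable, while dispose() also makes the container reject later additions, so clear() fits a container that lives as long as the activity object.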