Introduction to Retrofit and RxJava

Fabio Collini

Introduction to Retrofit and RxJava

AppDays 2015 Pordenone

Ego slide

Retrofit

RxJava

RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for composing asynchronous and event-based programs by using observable sequences.

github.com/ReactiveX/RxJava

RxJava is not simple…

Demo project

github.com/fabioCollini/IntroToRetrofitRxJava

HTTP request definition

public interface StackOverflowService {   @GET("/users")  UserResponse getTopUsers(); }

HTTP request definition

public interface StackOverflowService {   @GET("/users")  UserResponse getTopUsers(); } public class UserResponse {  private List<User> items;  public List<User> getItems() {    return items;  }}

Service creation

RestAdapter restAdapter =   new RestAdapter.Builder()

Service creation

RestAdapter restAdapter =   new RestAdapter.Builder()  .setEndpoint("http://api.stackexchange.com/2.2/")

Service creation

RestAdapter restAdapter =   new RestAdapter.Builder()  .setEndpoint("http://api.stackexchange.com/2.2/")  .build();

Service creation

RestAdapter restAdapter =   new RestAdapter.Builder()  .setEndpoint("http://api.stackexchange.com/2.2/")  .build();StackOverflowService service =   restAdapter.create(StackOverflowService.class);

Service creation

RestAdapter restAdapter =   new RestAdapter.Builder()  .setEndpoint("http://api.stackexchange.com/2.2/")  .setRequestInterceptor(request -> {    request.addQueryParam("site", "stackoverflow");    request.addQueryParam("key", "...");  })  .build();StackOverflowService service =  restAdapter.create(StackOverflowService.class);

Synchronous request

private List<User> loadItemsSync() {  List<User> users =     service.getTopUsers().getItems();  if (users.size() > 5) {    users = users.subList(0, 5);  }  return users;}

Request parameters

@GET("/users/{userId}/top-tags") TagResponse getTags(@Path("userId") int userId);  @GET("/users/{userId}/badges") BadgeResponse getBadges(@Path("userId") int userId);

Request parameters

@GET("/users/{userId}/top-tags") TagResponse getTags(@Path("userId") int userId);  @GET("/users/{userId}/badges") BadgeResponse getBadges(@Path("userId") int userId);

service.getTags(12345);

/users/12345/top-tags?site=stackoverflow&key=…

Other annotations

Composition

List<User> users = service.getTopUsers().getItems();if (users.size() > 5) {  users = users.subList(0, 5);}List<UserStats> statsList = new ArrayList<>();for (User user : users) {  TagResponse tags =     service.getTags(user.getId());  BadgeResponse badges =     service.getBadges(user.getId());  statsList.add(new UserStats(user,     tags.getItems(), badges.getItems()));}return statsList;

AsyncTask

new AsyncTask<Void, Void, List<User>>() {  @Override   protected List<User> doInBackground(Void... p) {    try {      return loadItemsSync();    } catch (Exception e) {      return null;    }  }  @Override  protected void onPostExecute(List<User> users) {    if (users != null) {      adapter.addAll(users);    } else {      showError();    }  }}.execute();

Synchronous request

public interface StackOverflowService {   @GET("/users")  UserResponse getTopUsers(); }

Callbacks

public interface StackOverflowService {   @GET("/users")   void getTopUsers(Callback<UserResponse> callback); }

Callbacks in action

service.getTopUsers(new Callback<UserResponse>() {  @Override public void success(      UserResponse userResponse, Response r) {    List<User> users = userResponse.getItems();    if (users.size() > 5)      users = users.subList(0, 5);    adapter.addAll(users);  }  @Override public void failure(RetrofitError e) {    showError();  }});

Callbacks in action

service.getTopUsers(new Callback<UserResponse>() {  @Override public void success(      UserResponse userResponse, Response r) {    List<User> users = userResponse.getItems();    if (users.size() > 5)      users = users.subList(0, 5);    adapter.addAll(users);  }  @Override public void failure(RetrofitError e) {    showError();  }});

Callback hell

service.getBadges(userId, new Callback<BadgeResponse>() {  @Override public void success(BadgeResponse badges, Response r) {    service.getTags(userId, new Callback<TagResponse>() {      @Override public void success(TagResponse tags, Response r) {        callback.success(new UserStats(user,           tags.getItems(), badges.getItems()), r);      }       @Override public void failure(RetrofitError error) {        callback.failure(error);      }    });  }   @Override public void failure(RetrofitError error) {    callback.failure(error);  }});

Retrofit

public interface StackOverflowService {   @GET("/users")   void getTopUsers(Callback<UserResponse> callback); }

Retrofit + RxJava

public interface StackOverflowService {   @GET("/users")  Observable<UserResponse> getTopUsers(); }

RxJava in action

service.getTopUsers()  .subscribe(new Action1<UserResponse>() {    @Override public void call(UserResponse r) {      List<User> users = r.getItems();      if (users.size() > 5)        users = users.subList(0, 5);      adapter.addAll(users);    }  }, new Action1<Throwable>() {    @Override public void call(Throwable t) {      showError();    }  });

Java 8 / Retrolambda

service.getTopUsers()    .subscribe(      r -> {        List<User> users = r.getItems();        if (users.size() > 5)          users = users.subList(0, 5);        adapter.addAll(users);      },       t -> showError()    );

Threading

service    .getTopUsers()    .subscribeOn(Schedulers.io())    .observeOn(AndroidSchedulers.mainThread())    .subscribe(      r -> {        List<User> users = r.getItems()        if (users.size() > 5)          users = users.subList(0, 5);        adapter.addAll(users);      },       t -> showError()    );

subscribe

public final Subscription subscribe(  final Action1<? super T> onNext,   final Action1<Throwable> onError) {    //...}

onNext | onError

marble

marble

onNext* (onComplete | onError)?

marble

marble

Observable creation

Observable.just(1, 2, 3);

Observable creation

Observable.just(1, 2, 3);Observable.from(Arrays.asList("A", "B", "C", "D"));

Observable creation

Observable.just(1, 2, 3);Observable.from(Arrays.asList("A", "B", "C", "D"));Observable.error(new IOException());

Observable creation

Observable.just(1, 2, 3);Observable.from(Arrays.asList("A", "B", "C", "D"));Observable.error(new IOException());Observable.interval(5, TimeUnit.SECONDS);

Observable creation

Observable.just(1, 2, 3);Observable.from(Arrays.asList("A", "B", "C", "D"));Observable.error(new IOException());Observable.interval(5, TimeUnit.SECONDS);Observable.create(subscriber -> {  try {    subscriber.onNext(createFirstValue());    subscriber.onNext(createSecondValue());    subscriber.onCompleted();  } catch (Throwable t) {    subscriber.onError(t);  }});

Observable in action

public Subscription subscribe(  Action1<? super T> onNext,   Action1<Throwable> onError,   Action0 onComplete);

Observable in action

public Subscription subscribe(  Action1<? super T> onNext,   Action1<Throwable> onError,   Action0 onComplete); Observable.just(1, 2, 3).subscribe(    System.out::println,     Throwable::printStackTrace,     () -> System.out.println("Completed")  );

Observer

Observable.just(1, 2, 3)  .subscribe(new Observer<Integer>() {      @Override public void onNext(Integer i) {          System.out.println(i);      }      @Override public void onError(Throwable t) {          t.printStackTrace();      }      @Override public void onCompleted() {          System.out.println("Completed");      }  });

map

marble

map

service.getTopUsers()  .subscribe(    r -> {      List<User> users = r.getItems()      if (users.size() > 5)        users = users.subList(0, 5);      adapter.addAll(users);    },     t -> showError()  );

map

service.getTopUsers()  .map(r -> r.getItems())  .subscribe(    users -> {      if (users.size() > 5)        users = users.subList(0, 5);      adapter.addAll(users);    },     t -> showError()  );

map

service.getTopUsers()  .map(r -> r.getItems())  .map(users -> users.size() > 5 ?       users.subList(0, 5) : users)  .subscribe(    users -> adapter.addAll(users),     t -> showError()  );

map

service.getTopUsers()  .map(UserResponse::getItems)  .map(users -> users.size() > 5 ?      users.subList(0, 5) : users)  .subscribe(    adapter::addAll,     t -> showError()  );

zip

marble

zip

Observable.zip(    service.getTags(user.getId()),    service.getBadges(user.getId()),    (tags, badges) ->       new UserStats(        user, tags.getItems(), badges.getItems())      ));        

zip

Observable.zip(    service.getTags(user.getId()),    service.getBadges(user.getId()),    (tags, badges) ->       new UserStats(        user, tags.getItems(), badges.getItems())      ));        

zip

Observable.zip(    service.getTags(user.getId()),    service.getBadges(user.getId()),    (tags, badges) ->       new UserStats(        user, tags.getItems(), badges.getItems())      ));        

zip

Observable.zip(    service.getTags(user.getId()),    service.getBadges(user.getId()),    (tags, badges) ->       new UserStats(        user, tags.getItems(), badges.getItems())      ));        

zip

Observable.zip(    service.getTags(user.getId()),    service.getBadges(user.getId()),    (tags, badges) ->       new UserStats(        user, tags.getItems(), badges.getItems()      ));        

zip

Observable.zip(    service.getTags(user.getId())      .map(TagResponse::getItems),    service.getBadges(user.getId())      .map(BadgeResponse::getItems),    (tags, badges) ->       new UserStats(user, tags, badges));        

Multi value map

    Observable.just(1, 2, 3).map(        i -> Observable.just(i * 10, i * 10 + 1)    );

Multi value map

Observable<Observable<Integer>> observable =    Observable.just(1, 2, 3).map(        i -> Observable.just(i * 10, i * 10 + 1)    );

Multi value map

Observable<Observable<Integer>> observable =    Observable.just(1, 2, 3).map(        i -> Observable.just(i * 10, i * 10 + 1)    ); [1, 2, 3] [[10, 11], [20, 21], [30, 31]]

flatMap

marble

flatMap

Observable<Integer> observable =    Observable.just(1, 2, 3).flatMap(        i -> Observable.just(i * 10, i * 10 + 1)    ); [1, 2, 3] [10, 11, 20, 21, 30, 31]

flatMap

Observable<Profile> observable =   service.login(userName, password)    .flatMap(token -> service.getProfile(token));

flatMap

Observable<Profile> observable =   service.login(userName, password)    .flatMap(token -> service.getProfile(token)); Observable<Profile> observable =   service.login(userName, password)    .flatMap(service::getProfile);

flatMap

service  .getTopUsers()//1<UserResponse>

flatMap

service  .getTopUsers()//1<UserResponse>  .flatMap(r -> Observable.from(r.getItems()))  //20<User>

flatMap

service  .getTopUsers()//1<UserResponse>  .flatMapIterable(UserResponse::getItems)//20<User>

flatMap

service  .getTopUsers()//1<UserResponse>  .flatMapIterable(UserResponse::getItems)//20<User>  .limit(5)//5<User>

flatMap

service  .getTopUsers()//1<UserResponse>  .flatMapIterable(UserResponse::getItems)//20<User>  .limit(5)//5<User>  .flatMap(this::loadUserStats)//5<UserStats>

flatMap

service  .getTopUsers()//1<UserResponse>  .flatMapIterable(UserResponse::getItems)//20<User>  .limit(5)//5<User>  .flatMap(this::loadUserStats)//5<UserStats>  .toList();//1<List<UserStats>>

Order is not preserved

flatMap source code

public final <R> Observable<R> flatMap(      Func1<        ? super T,         ? extends Observable<? extends R>      > func) {  return merge(map(func));}

concatMap

marble

concatMap source code

public final <R> Observable<R> concatMap(      Func1<        ? super T,         ? extends Observable<? extends R>      > func) {    return concat(map(func));}

concatMap

service    .getTopUsers()    .flatMapIterable(UserResponse::getItems)    .limit(5)    .concatMap(this::loadUserStats)    .toList();

timeout

service    .getTopUsers()    .flatMapIterable(UserResponse::getItems)    .limit(5)    .concatMap(this::loadRepoStats)    .toList()    .timeout(20, TimeUnit.SECONDS);

retry

service    .getTopUsers()    .retry(2)    .flatMapIterable(UserResponse::getItems)    .limit(5)    .concatMap(this::loadRepoStats)    .toList()    .timeout(20, TimeUnit.SECONDS)    .retry(1);

Cache

public class Cache {  private List<UserStats> items;  public void save(List<UserStats> users) {    items = users;  }  public Observable<List<UserStats>> load(      Throwable t) {    if (items == null)      return Observable.error(t);    else      return Observable.just(items);  }}

doOnNext / onErrorResumeNext

service.getTopUsers()    .retry(2)    .flatMapIterable(UserResponse::getItems)    .limit(5)    .concatMap(this::loadRepoStats)    .toList()    .timeout(20, TimeUnit.SECONDS)    .retry(1)    .doOnNext(cache::save)    .onErrorResumeNext(cache::load);

Subscription

Observable  .interval(1, TimeUnit.SECONDS)  .timestamp()  .subscribe(System.out::println);

Subscription

Subscription subscription = Observable  .interval(1, TimeUnit.SECONDS)  .timestamp()  .subscribe(System.out::println); Thread.sleep(2500); subscription.unsubscribe();

Subscription

Subscription subscription = Observable  .interval(1, TimeUnit.SECONDS)  .timestamp()  .subscribe(System.out::println); Thread.sleep(2500); subscription.unsubscribe();

Timestamped(timestampMillis = 1429360406807, value = 0) Timestamped(timestampMillis = 1429360407805, value = 1)

How many requests?

Observable<UserResponse> observable =   service.getTopUsers(); Subscription s1 = observable.subscribe(  System.out::println, Throwable::printStackTrace);Subscription s2 = observable.subscribe(  System.out::println, Throwable::printStackTrace);

How many requests?

Observable<UserResponse> observable =   service.getTopUsers(); Subscription s1 = observable.subscribe(  System.out::println, Throwable::printStackTrace);Subscription s2 = observable.subscribe(  System.out::println, Throwable::printStackTrace);

Hot observables

Observable<UserResponse> observable =   service.getTopUsers(); ConnectableObservable<UserResponse> replayObservable  = observable.replay(1); Subscription s1 = replayObservable.subscribe(  System.out::println, Throwable::printStackTrace);Subscription s2 = replayObservable.subscribe(  System.out::println, Throwable::printStackTrace); Subscription s3 = replayObservable.connect();

Activity lifecycle

@Override public View onCreateView(...) {  ...  retainedFragment = RetainedFragment    .getOrCreate(getActivity());   if (retainedFragment.get() == null) {    Observable<List<T>> observable = loadItems()        .subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread());    retainedFragment.bind(observable.replay(1));  }  ...}

Activity lifecycle

@Override public void onResume() {  super.onResume();  subscription = retainedFragment.get()    .subscribe(      this::showDataInList,       t -> showError()    );} @Override public void onPause() {  super.onPause();  subscription.unsubscribe();}

RetainedFragment

public class RetainedFragment<T> extends Fragment {  private Subscription connectableSubscription = Subscriptions.empty();  private ConnectableObservable<T> observable;  public RetainedFragment() {      setRetainInstance(true);  }  public void bind(ConnectableObservable<T> observable) {      this.observable = observable;      connectableSubscription = observable.connect();  }  @Override public void onDestroy() {      super.onDestroy();      connectableSubscription.unsubscribe();  }  public Observable<T> get() {      return observable;  }}    

RetainedFragment

public class RetainedFragment<T> extends Fragment {  private Subscription connectableSubscription = Subscriptions.empty();  private ConnectableObservable<T> observable;  public RetainedFragment() {      setRetainInstance(true);  }  public void bind(ConnectableObservable<T> observable) {      this.observable = observable;      connectableSubscription = observable.connect();  }  @Override public void onDestroy() {      super.onDestroy();      connectableSubscription.unsubscribe();  }  public Observable<T> get() {      return observable;  }}    

RetainedFragment

public class RetainedFragment<T> extends Fragment {  private Subscription connectableSubscription = Subscriptions.empty();  private ConnectableObservable<T> observable;  public RetainedFragment() {      setRetainInstance(true);  }  public void bind(ConnectableObservable<T> observable) {      this.observable = observable;      connectableSubscription = observable.connect();  }  @Override public void onDestroy() {      super.onDestroy();      connectableSubscription.unsubscribe();  }  public Observable<T> get() {      return observable;  }}    

Thanks for your attention!