AppDays 2015 Pordenone
RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for composing asynchronous and event-based programs by using observable sequences.

public interface StackOverflowService {@GET("/users")UserResponse getTopUsers();}
public interface StackOverflowService {@GET("/users")UserResponse getTopUsers();}public class UserResponse {private List<User> items;public List<User> getItems() {return items;}}
RestAdapter restAdapter =new RestAdapter.Builder()
RestAdapter restAdapter =new RestAdapter.Builder().setEndpoint("http://api.stackexchange.com/2.2/")
RestAdapter restAdapter =new RestAdapter.Builder().setEndpoint("http://api.stackexchange.com/2.2/").build();
RestAdapter restAdapter =new RestAdapter.Builder().setEndpoint("http://api.stackexchange.com/2.2/").build();StackOverflowService service =restAdapter.create(StackOverflowService.class);
RestAdapter restAdapter =new RestAdapter.Builder().setEndpoint("http://api.stackexchange.com/2.2/").setRequestInterceptor(request -> {request.addQueryParam("site", "stackoverflow");request.addQueryParam("key", "...");}).build();StackOverflowService service =restAdapter.create(StackOverflowService.class);
private List<User> loadItemsSync() {List<User> users =service.getTopUsers().getItems();if (users.size() > 5) {users = users.subList(0, 5);}return users;}
@GET("/users/{userId}/top-tags")TagResponse getTags(@Path("userId") int userId);@GET("/users/{userId}/badges")BadgeResponse getBadges(@Path("userId") int userId);
@GET("/users/{userId}/top-tags")TagResponse getTags(@Path("userId") int userId);@GET("/users/{userId}/badges")BadgeResponse getBadges(@Path("userId") int userId);
service.getTags(12345);
/users/12345/top-tags?site=stackoverflow&key=…
List<User> users = service.getTopUsers().getItems();if (users.size() > 5) {users = users.subList(0, 5);}List<UserStats> statsList = new ArrayList<>();for (User user : users) {TagResponse tags =service.getTags(user.getId());BadgeResponse badges =service.getBadges(user.getId());statsList.add(new UserStats(user,tags.getItems(), badges.getItems()));}return statsList;
new AsyncTask<Void, Void, List<User>>() {@Overrideprotected List<User> doInBackground(Void... p) {try {return loadItemsSync();} catch (Exception e) {return null;}}@Overrideprotected void onPostExecute(List<User> users) {if (users != null) {adapter.addAll(users);} else {showError();}}}.execute();
public interface StackOverflowService {@GET("/users")UserResponse getTopUsers();}
public interface StackOverflowService {@GET("/users")void getTopUsers(Callback<UserResponse> callback);}
service.getTopUsers(new Callback<UserResponse>() {@Override public void success(UserResponse userResponse, Response r) {List<User> users = userResponse.getItems();if (users.size() > 5)users = users.subList(0, 5);adapter.addAll(users);}@Override public void failure(RetrofitError e) {showError();}});
service.getTopUsers(new Callback<UserResponse>() {@Override public void success(UserResponse userResponse, Response r) {List<User> users = userResponse.getItems();if (users.size() > 5)users = users.subList(0, 5);adapter.addAll(users);}@Override public void failure(RetrofitError e) {showError();}});
service.getBadges(userId, new Callback<BadgeResponse>() {@Override public void success(BadgeResponse badges, Response r) {service.getTags(userId, new Callback<TagResponse>() {@Override public void success(TagResponse tags, Response r) {callback.success(new UserStats(user,tags.getItems(), badges.getItems()), r);}@Override public void failure(RetrofitError error) {callback.failure(error);}});}@Override public void failure(RetrofitError error) {callback.failure(error);}});
public interface StackOverflowService {@GET("/users")void getTopUsers(Callback<UserResponse> callback);}
public interface StackOverflowService {@GET("/users")Observable<UserResponse> getTopUsers();}
service.getTopUsers().subscribe(new Action1<UserResponse>() {@Override public void call(UserResponse r) {List<User> users = r.getItems();if (users.size() > 5)users = users.subList(0, 5);adapter.addAll(users);}}, new Action1<Throwable>() {@Override public void call(Throwable t) {showError();}});
service.getTopUsers().subscribe(r -> {List<User> users = r.getItems();if (users.size() > 5)users = users.subList(0, 5);adapter.addAll(users);},t -> showError());
service.getTopUsers().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(r -> {List<User> users = r.getItems()if (users.size() > 5)users = users.subList(0, 5);adapter.addAll(users);},t -> showError());
public final Subscription subscribe(final Action1<? super T> onNext,final Action1<Throwable> onError) {//...}




Observable.just(1, 2, 3);
Observable.just(1, 2, 3);Observable.from(Arrays.asList("A", "B", "C", "D"));
Observable.just(1, 2, 3);Observable.from(Arrays.asList("A", "B", "C", "D"));Observable.error(new IOException());
Observable.just(1, 2, 3);Observable.from(Arrays.asList("A", "B", "C", "D"));Observable.error(new IOException());Observable.interval(5, TimeUnit.SECONDS);
Observable.just(1, 2, 3);Observable.from(Arrays.asList("A", "B", "C", "D"));Observable.error(new IOException());Observable.interval(5, TimeUnit.SECONDS);Observable.create(subscriber -> {try {subscriber.onNext(createFirstValue());subscriber.onNext(createSecondValue());subscriber.onCompleted();} catch (Throwable t) {subscriber.onError(t);}});
public Subscription subscribe(Action1<? super T> onNext,Action1<Throwable> onError,Action0 onComplete);
public Subscription subscribe(Action1<? super T> onNext,Action1<Throwable> onError,Action0 onComplete);Observable.just(1, 2, 3).subscribe(System.out::println,Throwable::printStackTrace,() -> System.out.println("Completed"));
Observable.just(1, 2, 3).subscribe(new Observer<Integer>() {@Override public void onNext(Integer i) {System.out.println(i);}@Override public void onError(Throwable t) {t.printStackTrace();}@Override public void onCompleted() {System.out.println("Completed");}});

service.getTopUsers().subscribe(r -> {List<User> users = r.getItems()if (users.size() > 5)users = users.subList(0, 5);adapter.addAll(users);},t -> showError());
service.getTopUsers().map(r -> r.getItems()).subscribe(users -> {if (users.size() > 5)users = users.subList(0, 5);adapter.addAll(users);},t -> showError());
service.getTopUsers().map(r -> r.getItems()).map(users -> users.size() > 5 ?users.subList(0, 5) : users).subscribe(users -> adapter.addAll(users),t -> showError());
service.getTopUsers().map(UserResponse::getItems).map(users -> users.size() > 5 ?users.subList(0, 5) : users).subscribe(adapter::addAll,t -> showError());

Observable.zip(service.getTags(user.getId()),service.getBadges(user.getId()),(tags, badges) ->new UserStats(user, tags.getItems(), badges.getItems())));
Observable.zip(service.getTags(user.getId()),service.getBadges(user.getId()),(tags, badges) ->new UserStats(user, tags.getItems(), badges.getItems())));
Observable.zip(service.getTags(user.getId()),service.getBadges(user.getId()),(tags, badges) ->new UserStats(user, tags.getItems(), badges.getItems())));
Observable.zip(service.getTags(user.getId()),service.getBadges(user.getId()),(tags, badges) ->new UserStats(user, tags.getItems(), badges.getItems())));
Observable.zip(service.getTags(user.getId()),service.getBadges(user.getId()),(tags, badges) ->new UserStats(user, tags.getItems(), badges.getItems()));
Observable.zip(service.getTags(user.getId()).map(TagResponse::getItems),service.getBadges(user.getId()).map(BadgeResponse::getItems),(tags, badges) ->new UserStats(user, tags, badges));
Observable.just(1, 2, 3).map(i -> Observable.just(i * 10, i * 10 + 1));
Observable<Observable<Integer>> observable =Observable.just(1, 2, 3).map(i -> Observable.just(i * 10, i * 10 + 1));
Observable<Observable<Integer>> observable =Observable.just(1, 2, 3).map(i -> Observable.just(i * 10, i * 10 + 1));[1, 2, 3][[10, 11], [20, 21], [30, 31]]

Observable<Integer> observable =Observable.just(1, 2, 3).flatMap(i -> Observable.just(i * 10, i * 10 + 1));[1, 2, 3][10, 11, 20, 21, 30, 31]
Observable<Profile> observable =service.login(userName, password).flatMap(token -> service.getProfile(token));
Observable<Profile> observable =service.login(userName, password).flatMap(token -> service.getProfile(token));Observable<Profile> observable =service.login(userName, password).flatMap(service::getProfile);
service.getTopUsers()//1<UserResponse>
service.getTopUsers()//1<UserResponse>.flatMap(r -> Observable.from(r.getItems()))//20<User>
service.getTopUsers()//1<UserResponse>.flatMapIterable(UserResponse::getItems)//20<User>
service.getTopUsers()//1<UserResponse>.flatMapIterable(UserResponse::getItems)//20<User>.limit(5)//5<User>
service.getTopUsers()//1<UserResponse>.flatMapIterable(UserResponse::getItems)//20<User>.limit(5)//5<User>.flatMap(this::loadUserStats)//5<UserStats>
service.getTopUsers()//1<UserResponse>.flatMapIterable(UserResponse::getItems)//20<User>.limit(5)//5<User>.flatMap(this::loadUserStats)//5<UserStats>.toList();//1<List<UserStats>>

public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func) {return merge(map(func));}

public final <R> Observable<R> concatMap(Func1<? super T,? extends Observable<? extends R>> func) {return concat(map(func));}
service.getTopUsers().flatMapIterable(UserResponse::getItems).limit(5).concatMap(this::loadUserStats).toList();
service.getTopUsers().flatMapIterable(UserResponse::getItems).limit(5).concatMap(this::loadRepoStats).toList().timeout(20, TimeUnit.SECONDS);
service.getTopUsers().retry(2).flatMapIterable(UserResponse::getItems).limit(5).concatMap(this::loadRepoStats).toList().timeout(20, TimeUnit.SECONDS).retry(1);
public class Cache {private List<UserStats> items;public void save(List<UserStats> users) {items = users;}public Observable<List<UserStats>> load(Throwable t) {if (items == null)return Observable.error(t);elsereturn Observable.just(items);}}
service.getTopUsers().retry(2).flatMapIterable(UserResponse::getItems).limit(5).concatMap(this::loadRepoStats).toList().timeout(20, TimeUnit.SECONDS).retry(1).doOnNext(cache::save).onErrorResumeNext(cache::load);
Observable.interval(1, TimeUnit.SECONDS).timestamp().subscribe(System.out::println);
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS).timestamp().subscribe(System.out::println);Thread.sleep(2500);subscription.unsubscribe();
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS).timestamp().subscribe(System.out::println);Thread.sleep(2500);subscription.unsubscribe();
Timestamped(timestampMillis = 1429360406807, value = 0) Timestamped(timestampMillis = 1429360407805, value = 1)
Observable<UserResponse> observable =service.getTopUsers();Subscription s1 = observable.subscribe(System.out::println, Throwable::printStackTrace);Subscription s2 = observable.subscribe(System.out::println, Throwable::printStackTrace);
Observable<UserResponse> observable =service.getTopUsers();Subscription s1 = observable.subscribe(System.out::println, Throwable::printStackTrace);Subscription s2 = observable.subscribe(System.out::println, Throwable::printStackTrace);
Observable<UserResponse> observable =service.getTopUsers();ConnectableObservable<UserResponse> replayObservable= observable.replay(1);Subscription s1 = replayObservable.subscribe(System.out::println, Throwable::printStackTrace);Subscription s2 = replayObservable.subscribe(System.out::println, Throwable::printStackTrace);Subscription s3 = replayObservable.connect();
@Override public View onCreateView(...) {...retainedFragment = RetainedFragment.getOrCreate(getActivity());if (retainedFragment.get() == null) {Observable<List<T>> observable = loadItems().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());retainedFragment.bind(observable.replay(1));}...}
@Override public void onResume() {super.onResume();subscription = retainedFragment.get().subscribe(this::showDataInList,t -> showError());}@Override public void onPause() {super.onPause();subscription.unsubscribe();}
public class RetainedFragment<T> extends Fragment {private Subscription connectableSubscription = Subscriptions.empty();private ConnectableObservable<T> observable;public RetainedFragment() {setRetainInstance(true);}public void bind(ConnectableObservable<T> observable) {this.observable = observable;connectableSubscription = observable.connect();}@Override public void onDestroy() {super.onDestroy();connectableSubscription.unsubscribe();}public Observable<T> get() {return observable;}}
public class RetainedFragment<T> extends Fragment {private Subscription connectableSubscription = Subscriptions.empty();private ConnectableObservable<T> observable;public RetainedFragment() {setRetainInstance(true);}public void bind(ConnectableObservable<T> observable) {this.observable = observable;connectableSubscription = observable.connect();}@Override public void onDestroy() {super.onDestroy();connectableSubscription.unsubscribe();}public Observable<T> get() {return observable;}}
public class RetainedFragment<T> extends Fragment {private Subscription connectableSubscription = Subscriptions.empty();private ConnectableObservable<T> observable;public RetainedFragment() {setRetainInstance(true);}public void bind(ConnectableObservable<T> observable) {this.observable = observable;connectableSubscription = observable.connect();}@Override public void onDestroy() {super.onDestroy();connectableSubscription.unsubscribe();}public Observable<T> get() {return observable;}}