AppDays 2015 Pordenone
RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for composing asynchronous and event-based programs by using observable sequences.
/**
 * Service interface with a single method bound to a GET on the
 * {@code /users} path.
 */
public interface StackOverflowService {

    /**
     * Declared as {@code GET /users}.
     *
     * @return the response as a {@code UserResponse} (no callback argument)
     */
    @GET("/users")
    UserResponse getTopUsers();
}
/**
 * Service interface with a single method bound to a GET on the
 * {@code /users} path.
 */
public interface StackOverflowService {

    /**
     * Declared as {@code GET /users}.
     *
     * @return the response as a {@code UserResponse} (no callback argument)
     */
    @GET("/users")
    UserResponse getTopUsers();
}
/** Response wrapper exposing the list of {@code User} entries held in {@code items}. */
public class UserResponse {

    /** Backing list; nothing inside this class assigns it. */
    private List<User> items;

    /**
     * @return the {@code items} list exactly as stored (no copy is made)
     */
    public List<User> getItems() {
        return this.items;
    }
}
// Progressive build-up of the adapter set-up, one step per slide.
// Step 1: start a RestAdapter.Builder (statement not yet terminated).
RestAdapter restAdapter =
new RestAdapter.Builder()
// Step 2: add the endpoint URL (statement still not terminated).
RestAdapter restAdapter =
new RestAdapter.Builder()
.setEndpoint("http://api.stackexchange.com/2.2/")
// Step 3: finish the chain with build().
RestAdapter restAdapter =
new RestAdapter.Builder()
.setEndpoint("http://api.stackexchange.com/2.2/")
.build();
// Step 4: ask the adapter for a StackOverflowService via create(...).
RestAdapter restAdapter =
new RestAdapter.Builder()
.setEndpoint("http://api.stackexchange.com/2.2/")
.build();
StackOverflowService service =
restAdapter.create(StackOverflowService.class);
// Adapter set-up with a request interceptor that adds the "site" and "key"
// query parameters to the request it is handed.
RestAdapter restAdapter = new RestAdapter.Builder()
        .setEndpoint("http://api.stackexchange.com/2.2/")
        .setRequestInterceptor(facade -> {
            facade.addQueryParam("site", "stackoverflow");
            facade.addQueryParam("key", "...");
        })
        .build();

StackOverflowService service = restAdapter.create(StackOverflowService.class);
/**
 * Fetches the top users with a blocking call and keeps at most the first five.
 *
 * @return the full list when it has five entries or fewer, otherwise a
 *         {@code subList(0, 5)} view of it
 */
private List<User> loadItemsSync() {
    List<User> all = service.getTopUsers().getItems();
    return all.size() > 5 ? all.subList(0, 5) : all;
}
/** Declared as {@code GET /users/{userId}/top-tags}; {@code userId} fills the path placeholder. */
@GET("/users/{userId}/top-tags")
TagResponse getTags(@Path("userId") int userId);
/** Declared as {@code GET /users/{userId}/badges}; {@code userId} fills the path placeholder. */
@GET("/users/{userId}/badges")
BadgeResponse getBadges(@Path("userId") int userId);
/** Declared as {@code GET /users/{userId}/top-tags}; {@code userId} fills the path placeholder. */
@GET("/users/{userId}/top-tags")
TagResponse getTags(@Path("userId") int userId);
/** Declared as {@code GET /users/{userId}/badges}; {@code userId} fills the path placeholder. */
@GET("/users/{userId}/badges")
BadgeResponse getBadges(@Path("userId") int userId);
// Example call with a literal user id.
service.getTags(12345);
/users/12345/top-tags?site=stackoverflow&key=…
// Blocking variant: keep at most five top users, then make one tags call and
// one badges call per user and collect the results as UserStats.
List<User> topUsers = service.getTopUsers().getItems();
if (topUsers.size() > 5) {
    topUsers = topUsers.subList(0, 5);
}

List<UserStats> collected = new ArrayList<>();
for (User current : topUsers) {
    TagResponse tagResponse = service.getTags(current.getId());
    BadgeResponse badgeResponse = service.getBadges(current.getId());
    UserStats stats =
            new UserStats(current, tagResponse.getItems(), badgeResponse.getItems());
    collected.add(stats);
}
return collected;
// Calls loadItemsSync() from doInBackground; a null result is the failure
// signal that onPostExecute turns into showError().
new AsyncTask<Void, Void, List<User>>() {

    @Override
    protected List<User> doInBackground(Void... params) {
        List<User> loaded;
        try {
            loaded = loadItemsSync();
        } catch (Exception e) {
            // Every exception is mapped to null; onPostExecute handles it.
            loaded = null;
        }
        return loaded;
    }

    @Override
    protected void onPostExecute(List<User> users) {
        if (users == null) {
            showError();
            return;
        }
        adapter.addAll(users);
    }
}.execute();
/**
 * Service interface with a single method bound to a GET on the
 * {@code /users} path.
 */
public interface StackOverflowService {

    /**
     * Declared as {@code GET /users}.
     *
     * @return the response as a {@code UserResponse} (no callback argument)
     */
    @GET("/users")
    UserResponse getTopUsers();
}
/**
 * Callback flavour of the service: the {@code /users} GET takes a
 * {@code Callback<UserResponse>} and returns nothing.
 */
public interface StackOverflowService {

    /**
     * Declared as {@code GET /users}.
     *
     * @param callback receiver typed for {@code UserResponse}
     */
    @GET("/users")
    void getTopUsers(Callback<UserResponse> callback);
}
// Callback-based request: success adds at most the first five users to the
// adapter, failure calls showError().
service.getTopUsers(new Callback<UserResponse>() {

    @Override
    public void success(UserResponse userResponse, Response r) {
        List<User> all = userResponse.getItems();
        List<User> shown = all.size() > 5 ? all.subList(0, 5) : all;
        adapter.addAll(shown);
    }

    @Override
    public void failure(RetrofitError e) {
        showError();
    }
});
// Callback-based request: success adds at most the first five users to the
// adapter, failure calls showError().
service.getTopUsers(new Callback<UserResponse>() {

    @Override
    public void success(UserResponse userResponse, Response r) {
        List<User> all = userResponse.getItems();
        List<User> shown = all.size() > 5 ? all.subList(0, 5) : all;
        adapter.addAll(shown);
    }

    @Override
    public void failure(RetrofitError e) {
        showError();
    }
});
// Nested callbacks: the tags request is started from inside the badges
// success handler; either failure is passed straight to the outer callback.
service.getBadges(userId, new Callback<BadgeResponse>() {

    @Override
    public void success(BadgeResponse badges, Response badgesRaw) {
        service.getTags(userId, new Callback<TagResponse>() {

            @Override
            public void success(TagResponse tags, Response tagsRaw) {
                UserStats stats =
                        new UserStats(user, tags.getItems(), badges.getItems());
                // The Response of the inner (tags) call is the one forwarded.
                callback.success(stats, tagsRaw);
            }

            @Override
            public void failure(RetrofitError tagsError) {
                callback.failure(tagsError);
            }
        });
    }

    @Override
    public void failure(RetrofitError badgesError) {
        callback.failure(badgesError);
    }
});
/**
 * Callback flavour of the service: the {@code /users} GET takes a
 * {@code Callback<UserResponse>} and returns nothing.
 */
public interface StackOverflowService {

    /**
     * Declared as {@code GET /users}.
     *
     * @param callback receiver typed for {@code UserResponse}
     */
    @GET("/users")
    void getTopUsers(Callback<UserResponse> callback);
}
/**
 * Observable flavour of the service: the {@code /users} GET returns an
 * {@code Observable<UserResponse>}.
 */
public interface StackOverflowService {

    /**
     * Declared as {@code GET /users}.
     *
     * @return an {@code Observable} typed for {@code UserResponse}
     */
    @GET("/users")
    Observable<UserResponse> getTopUsers();
}
// Subscribes with two anonymous Action1 instances: the first adds at most
// five users to the adapter, the second calls showError().
service.getTopUsers()
        .subscribe(
                new Action1<UserResponse>() {
                    @Override
                    public void call(UserResponse r) {
                        List<User> all = r.getItems();
                        List<User> shown = all.size() > 5 ? all.subList(0, 5) : all;
                        adapter.addAll(shown);
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable t) {
                        showError();
                    }
                });
service.getTopUsers()
.subscribe(
r -> {
List<User> users = r.getItems();
if (users.size() > 5)
users = users.subList(0, 5);
adapter.addAll(users);
},
t -> showError()
);
// Same subscription with explicit schedulers: subscribeOn(Schedulers.io())
// and observeOn(AndroidSchedulers.mainThread()) are applied before the
// callbacks. The first lambda adds at most five users to the adapter, the
// second calls showError().
service
        .getTopUsers()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                r -> {
                    List<User> users = r.getItems();
                    if (users.size() > 5) {
                        users = users.subList(0, 5);
                    }
                    adapter.addAll(users);
                },
                t -> showError()
        );
/**
 * Signature of the two-callback {@code subscribe} overload as shown on the
 * slide; the body is elided there.
 *
 * @param onNext  action typed {@code Action1<? super T>}
 * @param onError action typed {@code Action1<Throwable>}
 * @return a {@code Subscription}
 */
public final Subscription subscribe(
final Action1<? super T> onNext,
final Action1<Throwable> onError) {
//...
}
// Progressive list of Observable factory calls, one more per slide.
// Step 1: just(...) with three literal values.
Observable.just(1, 2, 3);
// Step 2: adds from(...) over a list of four strings.
Observable.just(1, 2, 3);
Observable.from(Arrays.asList("A", "B", "C", "D"));
// Step 3: adds error(...) with an IOException.
Observable.just(1, 2, 3);
Observable.from(Arrays.asList("A", "B", "C", "D"));
Observable.error(new IOException());
// Step 4: adds interval(...) with a 5-second period.
Observable.just(1, 2, 3);
Observable.from(Arrays.asList("A", "B", "C", "D"));
Observable.error(new IOException());
Observable.interval(5, TimeUnit.SECONDS);
// Step 5: the same four calls again, shown together.
Observable.just(1, 2, 3);
Observable.from(Arrays.asList("A", "B", "C", "D"));
Observable.error(new IOException());
Observable.interval(5, TimeUnit.SECONDS);
// Hand-written source: pushes two values and then signals completion;
// anything thrown along the way is handed to onError instead.
Observable.create(observer -> {
    try {
        observer.onNext(createFirstValue());
        observer.onNext(createSecondValue());
        observer.onCompleted();
    } catch (Throwable failure) {
        observer.onError(failure);
    }
});
/**
 * Three-callback {@code subscribe} declaration as shown on the slide (no body).
 *
 * @param onNext     action typed {@code Action1<? super T>}
 * @param onError    action typed {@code Action1<Throwable>}
 * @param onComplete action typed {@code Action0}
 * @return a {@code Subscription}
 */
public Subscription subscribe(
Action1<? super T> onNext,
Action1<Throwable> onError,
Action0 onComplete
);
/**
 * Three-callback {@code subscribe} declaration as shown on the slide (no body).
 *
 * @param onNext     action typed {@code Action1<? super T>}
 * @param onError    action typed {@code Action1<Throwable>}
 * @param onComplete action typed {@code Action0}
 * @return a {@code Subscription}
 */
public Subscription subscribe(
Action1<? super T> onNext,
Action1<Throwable> onError,
Action0 onComplete
);
// Three callbacks: print each value, print the stack trace of an error,
// and print "Completed" from the third callback.
Observable.just(1, 2, 3)
        .subscribe(
                System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("Completed"));
// Same three reactions written as one anonymous Observer<Integer>.
Observable.just(1, 2, 3)
        .subscribe(new Observer<Integer>() {

            @Override
            public void onCompleted() {
                System.out.println("Completed");
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onNext(Integer value) {
                System.out.println(value);
            }
        });
// Starting point for the operator refactoring: the first lambda extracts the
// items, trims them to at most five, and adds them to the adapter; the second
// calls showError().
service.getTopUsers()
        .subscribe(
                r -> {
                    List<User> users = r.getItems();
                    if (users.size() > 5) {
                        users = users.subList(0, 5);
                    }
                    adapter.addAll(users);
                },
                t -> showError()
        );
service.getTopUsers()
.map(r -> r.getItems())
.subscribe(
users -> {
if (users.size() > 5)
users = users.subList(0, 5);
adapter.addAll(users);
},
t -> showError()
);
// Both steps expressed as map() calls: extract the items, then keep at most
// the first five; the subscriber only adds them to the adapter.
service.getTopUsers()
        .map(response -> response.getItems())
        .map(all -> all.size() <= 5 ? all : all.subList(0, 5))
        .subscribe(
                shown -> adapter.addAll(shown),
                error -> showError()
        );
// Method-reference version: extract the items, keep at most the first five,
// and add them to the adapter.
service.getTopUsers()
        .map(UserResponse::getItems)
        .map(all -> all.size() <= 5 ? all : all.subList(0, 5))
        .subscribe(
                adapter::addAll,
                error -> showError()
        );
// Zips the tags and badges observables for one user; the combiner builds a
// UserStats from the user and the items of both responses.
Observable.zip(
        service.getTags(user.getId()),
        service.getBadges(user.getId()),
        (tags, badges) ->
                new UserStats(user, tags.getItems(), badges.getItems())
);
// Zips the tags and badges observables for one user; the combiner builds a
// UserStats from the user and the items of both responses.
Observable.zip(
        service.getTags(user.getId()),
        service.getBadges(user.getId()),
        (tags, badges) ->
                new UserStats(user, tags.getItems(), badges.getItems())
);
// Zips the tags and badges observables for one user; the combiner builds a
// UserStats from the user and the items of both responses.
Observable.zip(
        service.getTags(user.getId()),
        service.getBadges(user.getId()),
        (tags, badges) ->
                new UserStats(user, tags.getItems(), badges.getItems())
);
// Zips the tags and badges observables for one user; the combiner builds a
// UserStats from the user and the items of both responses.
Observable.zip(
        service.getTags(user.getId()),
        service.getBadges(user.getId()),
        (tags, badges) ->
                new UserStats(user, tags.getItems(), badges.getItems())
);
// Zips the tags and badges observables for one user; the combiner builds a
// UserStats from the user and the items of both responses.
Observable.zip(
        service.getTags(user.getId()),
        service.getBadges(user.getId()),
        (tagResponse, badgeResponse) ->
                new UserStats(user, tagResponse.getItems(), badgeResponse.getItems())
);
// Variant where each source is mapped to its items first, so the combiner
// receives the two item lists directly.
Observable.zip(
        service.getTags(user.getId()).map(TagResponse::getItems),
        service.getBadges(user.getId()).map(BadgeResponse::getItems),
        (tagItems, badgeItems) -> new UserStats(user, tagItems, badgeItems)
);
// map() with a function that itself returns an Observable; the declarations
// below type the result as Observable<Observable<Integer>>.
Observable.just(1, 2, 3)
        .map(n -> Observable.just(n * 10, n * 10 + 1));

Observable<Observable<Integer>> observable =
        Observable.just(1, 2, 3)
                .map(n -> Observable.just(n * 10, n * 10 + 1));

Observable<Observable<Integer>> observable =
        Observable.just(1, 2, 3)
                .map(n -> Observable.just(n * 10, n * 10 + 1));
[1, 2, 3]
[[10, 11], [20, 21], [30, 31]]
// Same inner function passed to flatMap(); the result is declared as a flat
// Observable<Integer>.
Observable<Integer> observable =
        Observable.just(1, 2, 3)
                .flatMap(n -> Observable.just(n * 10, n * 10 + 1));
[1, 2, 3]
[10, 11, 20, 21, 30, 31]
// Chained calls: the value produced by login(...) is passed to getProfile(...)
// through flatMap. Shown twice with a lambda, then with a method reference.
Observable<Profile> observable =
        service.login(userName, password)
                .flatMap(authToken -> service.getProfile(authToken));

Observable<Profile> observable =
        service.login(userName, password)
                .flatMap(authToken -> service.getProfile(authToken));

Observable<Profile> observable =
        service.login(userName, password)
                .flatMap(service::getProfile);
// Progressive build-up of the top-users pipeline; the trailing comments on
// each line are the slide's own element-count annotations. Only the last
// step is a terminated statement.
// Step 1: the request itself.
service
.getTopUsers()//1<UserResponse>
// Step 2: flatMap over Observable.from(r.getItems()).
service
.getTopUsers()//1<UserResponse>
.flatMap(r -> Observable.from(r.getItems()))
//20<User>
// Step 3: the same step written with flatMapIterable.
service
.getTopUsers()//1<UserResponse>
.flatMapIterable(UserResponse::getItems)//20<User>
// Step 4: limit(5).
service
.getTopUsers()//1<UserResponse>
.flatMapIterable(UserResponse::getItems)//20<User>
.limit(5)//5<User>
// Step 5: flatMap each user through loadUserStats.
service
.getTopUsers()//1<UserResponse>
.flatMapIterable(UserResponse::getItems)//20<User>
.limit(5)//5<User>
.flatMap(this::loadUserStats)//5<UserStats>
// Step 6: toList() ends the chain.
service
.getTopUsers()//1<UserResponse>
.flatMapIterable(UserResponse::getItems)//20<User>
.limit(5)//5<User>
.flatMap(this::loadUserStats)//5<UserStats>
.toList();//1<List<UserStats>>
/**
 * As shown on the slide: applies {@code func} through {@code map} and passes
 * the result to {@code merge}.
 *
 * @param func function from an element to an {@code Observable} of results
 * @return the value returned by {@code merge(map(func))}
 */
public final <R> Observable<R> flatMap(
Func1<
? super T,
? extends Observable<? extends R>
> func) {
return merge(map(func));
}
/**
 * As shown on the slide: applies {@code func} through {@code map} and passes
 * the result to {@code concat}.
 *
 * @param func function from an element to an {@code Observable} of results
 * @return the value returned by {@code concat(map(func))}
 */
public final <R> Observable<R> concatMap(
Func1<
? super T,
? extends Observable<? extends R>
> func) {
return concat(map(func));
}
// Top-users pipeline using concatMap for the per-user stats step: expand the
// response into its items, keep five, load stats per user, collect to a list.
service.getTopUsers()
        .flatMapIterable(UserResponse::getItems)
        .limit(5)
        .concatMap(this::loadUserStats)
        .toList();
// Top-users pipeline with a 20-second timeout applied after toList().
// Per-user stats go through loadUserStats, the method reference the other
// top-users pipelines in this file use.
service
        .getTopUsers()
        .flatMapIterable(UserResponse::getItems)
        .limit(5)
        .concatMap(this::loadUserStats)
        .toList()
        .timeout(20, TimeUnit.SECONDS);
// Top-users pipeline with two retry points: retry(2) directly on the request
// and retry(1) after the 20-second timeout on the whole chain.
// Per-user stats go through loadUserStats, the method reference the other
// top-users pipelines in this file use.
service
        .getTopUsers()
        .retry(2)
        .flatMapIterable(UserResponse::getItems)
        .limit(5)
        .concatMap(this::loadUserStats)
        .toList()
        .timeout(20, TimeUnit.SECONDS)
        .retry(1);
/**
 * Holds the last saved list of {@code UserStats} in a single field and offers
 * it back as an {@code Observable}.
 */
public class Cache {

    /** Last list passed to {@link #save}; {@code null} until the first call. */
    private List<UserStats> items;

    /**
     * Stores the reference to {@code users} (no copy is made).
     *
     * @param users list to remember
     */
    public void save(List<UserStats> users) {
        this.items = users;
    }

    /**
     * @param t error to return when nothing has been saved
     * @return {@code Observable.just(items)} when a list is stored,
     *         otherwise {@code Observable.error(t)}
     */
    public Observable<List<UserStats>> load(Throwable t) {
        if (items != null) {
            return Observable.just(items);
        }
        return Observable.error(t);
    }
}
// Retrying top-users pipeline with a cache hooked in at the end: doOnNext
// passes each resulting list to cache::save and onErrorResumeNext passes the
// error to cache::load.
// The stats step uses loadUserStats so the list reaching cache::save is a
// List<UserStats>, which is the parameter type Cache.save declares.
service.getTopUsers()
        .retry(2)
        .flatMapIterable(UserResponse::getItems)
        .limit(5)
        .concatMap(this::loadUserStats)
        .toList()
        .timeout(20, TimeUnit.SECONDS)
        .retry(1)
        .doOnNext(cache::save)
        .onErrorResumeNext(cache::load);
// interval(1, SECONDS) piped through timestamp(); every item is printed.
Observable.interval(1, TimeUnit.SECONDS)
        .timestamp()
        .subscribe(System.out::println);
// Keeps the Subscription returned by subscribe(), sleeps 2500 ms, then
// unsubscribes.
Subscription ticker =
        Observable.interval(1, TimeUnit.SECONDS)
                .timestamp()
                .subscribe(System.out::println);

Thread.sleep(2500);
ticker.unsubscribe();
// Keeps the Subscription returned by subscribe(), sleeps 2500 ms, then
// unsubscribes.
Subscription ticker =
        Observable.interval(1, TimeUnit.SECONDS)
                .timestamp()
                .subscribe(System.out::println);

Thread.sleep(2500);
ticker.unsubscribe();
Timestamped(timestampMillis = 1429360406807, value = 0)
Timestamped(timestampMillis = 1429360407805, value = 1)
// One Observable from getTopUsers(), subscribed to twice with the same pair
// of callbacks (print the value / print the stack trace).
Observable<UserResponse> observable = service.getTopUsers();

Subscription first =
        observable.subscribe(System.out::println, Throwable::printStackTrace);
Subscription second =
        observable.subscribe(System.out::println, Throwable::printStackTrace);
// One Observable from getTopUsers(), subscribed to twice with the same pair
// of callbacks (print the value / print the stack trace).
Observable<UserResponse> observable = service.getTopUsers();

Subscription first =
        observable.subscribe(System.out::println, Throwable::printStackTrace);
Subscription second =
        observable.subscribe(System.out::println, Throwable::printStackTrace);
// Wraps the request in replay(1), attaches two subscribers to the resulting
// ConnectableObservable, and only then calls connect().
Observable<UserResponse> observable = service.getTopUsers();
ConnectableObservable<UserResponse> replayObservable = observable.replay(1);

Subscription first =
        replayObservable.subscribe(System.out::println, Throwable::printStackTrace);
Subscription second =
        replayObservable.subscribe(System.out::println, Throwable::printStackTrace);

// connect() returns its own Subscription.
Subscription connection = replayObservable.connect();
// Slide excerpt: the parameter list and surrounding statements are elided ("...").
// Looks up (or creates) the RetainedFragment for the activity and, only when it
// holds no observable yet, builds the load pipeline and binds its replay(1).
@Override public View onCreateView(...) {
...
retainedFragment = RetainedFragment
.getOrCreate(getActivity());
// get() == null is used as the "nothing bound yet" check.
if (retainedFragment.get() == null) {
Observable<List<T>> observable = loadItems()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
retainedFragment.bind(observable.replay(1));
}
...
}
/**
 * Subscribes to the observable held by {@code retainedFragment} and stores
 * the resulting subscription in {@code subscription}. Items go to
 * {@code showDataInList}; an error triggers {@code showError()}.
 */
@Override
public void onResume() {
    super.onResume();
    subscription = retainedFragment.get().subscribe(
            this::showDataInList,
            error -> showError());
}
/** Unsubscribes the subscription stored in {@code subscription}. */
@Override
public void onPause() {
    super.onPause();
    subscription.unsubscribe();
}
/**
 * Fragment that marks itself as retained and holds a
 * {@code ConnectableObservable} together with the subscription returned by
 * connecting it.
 *
 * @param <T> element type of the held observable
 */
public class RetainedFragment<T> extends Fragment {

    /**
     * Subscription of the current connection. Starts as an empty subscription
     * so it can be unsubscribed even when {@link #bind} was never called.
     */
    private Subscription connectableSubscription = Subscriptions.empty();

    /** Observable passed to {@link #bind}; {@code null} until then. */
    private ConnectableObservable<T> observable;

    /** Requests instance retention via {@code setRetainInstance(true)}. */
    public RetainedFragment() {
        setRetainInstance(true);
    }

    /**
     * Stores {@code observable} and connects it.
     *
     * @param observable the connectable observable to hold and connect
     */
    public void bind(ConnectableObservable<T> observable) {
        // Release any earlier connection first; otherwise a second bind()
        // would overwrite the only handle to it and leave it connected.
        connectableSubscription.unsubscribe();
        this.observable = observable;
        connectableSubscription = observable.connect();
    }

    /** Unsubscribes the connection when the fragment is destroyed. */
    @Override
    public void onDestroy() {
        super.onDestroy();
        connectableSubscription.unsubscribe();
    }

    /**
     * @return the bound observable, or {@code null} when {@link #bind} has
     *         not been called yet
     */
    public Observable<T> get() {
        return observable;
    }
}
/**
 * Fragment that marks itself as retained and holds a
 * {@code ConnectableObservable} together with the subscription returned by
 * connecting it.
 *
 * @param <T> element type of the held observable
 */
public class RetainedFragment<T> extends Fragment {

    /**
     * Subscription of the current connection. Starts as an empty subscription
     * so it can be unsubscribed even when {@link #bind} was never called.
     */
    private Subscription connectableSubscription = Subscriptions.empty();

    /** Observable passed to {@link #bind}; {@code null} until then. */
    private ConnectableObservable<T> observable;

    /** Requests instance retention via {@code setRetainInstance(true)}. */
    public RetainedFragment() {
        setRetainInstance(true);
    }

    /**
     * Stores {@code observable} and connects it.
     *
     * @param observable the connectable observable to hold and connect
     */
    public void bind(ConnectableObservable<T> observable) {
        // Release any earlier connection first; otherwise a second bind()
        // would overwrite the only handle to it and leave it connected.
        connectableSubscription.unsubscribe();
        this.observable = observable;
        connectableSubscription = observable.connect();
    }

    /** Unsubscribes the connection when the fragment is destroyed. */
    @Override
    public void onDestroy() {
        super.onDestroy();
        connectableSubscription.unsubscribe();
    }

    /**
     * @return the bound observable, or {@code null} when {@link #bind} has
     *         not been called yet
     */
    public Observable<T> get() {
        return observable;
    }
}
/**
 * Fragment that marks itself as retained and holds a
 * {@code ConnectableObservable} together with the subscription returned by
 * connecting it.
 *
 * @param <T> element type of the held observable
 */
public class RetainedFragment<T> extends Fragment {

    /**
     * Subscription of the current connection. Starts as an empty subscription
     * so it can be unsubscribed even when {@link #bind} was never called.
     */
    private Subscription connectableSubscription = Subscriptions.empty();

    /** Observable passed to {@link #bind}; {@code null} until then. */
    private ConnectableObservable<T> observable;

    /** Requests instance retention via {@code setRetainInstance(true)}. */
    public RetainedFragment() {
        setRetainInstance(true);
    }

    /**
     * Stores {@code observable} and connects it.
     *
     * @param observable the connectable observable to hold and connect
     */
    public void bind(ConnectableObservable<T> observable) {
        // Release any earlier connection first; otherwise a second bind()
        // would overwrite the only handle to it and leave it connected.
        connectableSubscription.unsubscribe();
        this.observable = observable;
        connectableSubscription = observable.connect();
    }

    /** Unsubscribes the connection when the fragment is destroyed. */
    @Override
    public void onDestroy() {
        super.onDestroy();
        connectableSubscription.unsubscribe();
    }

    /**
     * @return the bound observable, or {@code null} when {@link #bind} has
     *         not been called yet
     */
    public Observable<T> get() {
        return observable;
    }
}