Introdução ao RX Java com Kotlin
Olá povo,
Tirando a teia de aranha aqui do blog, resolvi fazer um post introdutório sobre RXJava no Android. Na verdade, eu tinha feito uma apresentação há um bom tempo atrás e acabou que não usei essa apresentação em canto nenhum. Então resolvi transcrevê-la aqui para o blog. São apenas anotações minhas e que provavelmente não estão na melhor ordem, mas pode ser que seja útil pra alguém.
O que é o RX Java?
De uma maneira bem simplista, RxJava é uma biblioteca que permite representar de forma declarativa, qualquer operação como um fluxo assíncrono de dados, que pode ser criado por qualquer thread, e consumido por múltiplos objetos em (opcionalmente) threads diferentes.
Onde usar?
Existe uma infinidade de cenários onde o RX Java pode ser utilizados. Mas de uma forma geral, ele é utilizado em chamadas assíncronas, tais como:
- Acesso à rede, tais como chamadas HTTP;
- Leitura e escrita no sistema de arquivos e banco de dados;
- Controle de eventos de UI que devem disparar ações;
- Tratamento de dados dos sensores do aparelho;
- Envio de mensagens internas da aplicação;
- etc...
Usos comuns
No desenvolvimento de aplicações Android, o RX Java é muito utilizado para substituir ou fornecer uma forma mais elegante para resolver problemas que podem ser solucionados com algumas bibliotecas de terceiros e/ou APIs nativas do Android:
- AsyncTask (Observable + Observer)
- Chaining callbacks (callbacks encadeados)
- Handler / TimerTask
- Retrofit. Call pattern? (Adapter-RxJava)
- EventBus (PublishSubject)
Elementos fundamentais
O RX Java tem os seguintes elementos principais:
- Observables
- Observers
- Schedulers
- Operators
Veremos cada um deles nas próximas seções.
Observable
Um Observable é o um tipo de emissor que emite um fluxo de dados de forma sequencial (por padrão). Esse fluxo normalmente é consumido por um Observer.
Um Observable é similar ao Iterable
do Java, a principal diferença entre Observable
e o Iterable
é que o primeiro provê acesso assíncrono ao fluxo de dados enquanto que o no segundo, o acesso é síncrono.
Observer
Os Observers são os objetos que consomem os dados emitidos por um Observable.
Podem haver múltiplos Observers registrados para receber dados emitidos por um Observable.
Observers podem (e devem) tratar quaisquer erros que venham a ocorrer enquanto o Observable está emitindo dados. Ele também saberá quando não houver mais itens a serem consumidos.
Observable + Observer
O RX Java implementa o padrão Observer. Quando um Observable emite itens, as seguintes ações são chamadas no Observer:
onNext
é chamado para cada item emitido pelo Observable;- Quando todos os itens forem emitidos, o
onComplete
será invocado; - Caso algum erro ocorra durante a emissão de algum item, o
onError
será chamado e a sequência será interrompida (por padrão).
Adicionando a dependência do RX Java
Para testar os exemplos a seguir, crie um novo projeto no Android Studio e adicione as seguintes dependências no build.gradle do módulo app
.
dependencies {
...
implementation "io.reactivex.rxjava2:rxjava:2.2.6"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
}
A primeira dependência é do RX Java em si, enquanto que a segunda é necessária para podemos exibir os resultados emitidos pelo Observable na main thread do Android. Falaremos sobre isso na seção sobre "Schedulers".
Por ser totalmente independente do framework do Android, podemos testar o RX Java na própria JVM, por isso, o código a seguir pode ser executado na classe de teste que criada automaticamente quando o projeto Android é criado (<projeto>/app/src/test/java/<pacote.do.app>/ExampleUnitTest
).
Criando um Observable
Existem várias maneiras de criar um Observable, algumas delas estão listadas abaixo:
just
Cria um observable que emite apenas um item.
Observable.just("Hello RX")
.subscribe { valor -> println(valor) }Resultado
---------------------------
Hello RX
Ao utilizar o método just
foi criado um objeto Observable
que emite apenas a String "Hello RX". No método subscribe
é registrado um observer/consumer/receiver dos dados que serão emitidos. Com o Kotlin, isso é feito com uma simples instrução lambda, onde o valor emitido é recebido como parâmetro.
from*
Existem algumas variações do método from
para criar um Observable
. Abaixo estão sendo listadas duas delas:
Observable.fromArray("Google", "Microsoft", "Apple")
.subscribe { println(it) }Resultado
---------------------------
Microsoft
AppleObservable.fromIterable(listOf("Google", "Microsoft", "Apple"))
.subscribe(
{ println(it) }, // onNext
{ e -> println("Erro") }, // onError
{ println("Complete") } // onComplete
)Resultado
---------------------------
Microsoft
Apple
Complete
Os nomes dos métodos são auto-explicativos. Perceba que o lambda passado para no subscribe
passado para o Observable criado pelo método fromArray
é chamado 3 vezes (uma para cada item emitido). Outro detalhe desse exemplo é que foram passados três lambdas para o Observable criado com o fromIterable
que representam respectivamente os métodos: onNext
, onError
e onComplete
.
create
Permite criar um Observable “do zero”. Assim, cabe a você chamar cada método do Observer: onNext
para cada item emitido; onComplete
ao terminar a sequência; e onError
em caso de erro.
fun getObservableFromList(lista: List<String>) =
Observable.create<String> { emitter ->
lista.forEach { nome ->
if (nome == "") {
emitter.onError(Exception("Valor em branco"))
}
emitter.onNext(nome)
}
emitter.onComplete()
}
A função acima está criando um Observable que emitirá os itens de uma lista. Caso algum item da lista seja vazio, um erro será gerado e a sequência será interrompida. Perceba que o parâmetro emitter
será responsável por chamar os métodos onNext
, onComplete
e eventualmente o onError
.
getObservableFromList(listOf("Nelson", "Glauber", "Leal"))
.subscribe { println(it) }Resultado
---------------------------
Nelson
Glauber
LealgetObservableFromList(listOf("Nelson", "", "Leal"))
.subscribe(
{ println(it) },
{ e -> println("Deu erro!") }
)Resultado
---------------------------
Nelson
Deu erro!
Perceba que na primeira chamada, como a lista não possui nenhum item vazio, todo seu conteúdo é exibido. Já no segundo caso, apenas o primeiro item é mostrado, uma vez que o segundo item é vazio, a sequência é interrompida e a mensagem de erro é exibida.
interval
O método interval
retorna um Observable que emite uma sequência crescente de inteiros em um intervalo de tempo constante e configurável.
Observable.intervalRange(
10L, // start
5L, // count
0L, // initial delay
1L, // period
TimeUnit.SECONDS
).subscribe { println("Emitido no segundo $it") }Resultado
---------------------------
Emitido no segundo 10
Emitido no segundo 11
Emitido no segundo 12
Emitido no segundo 13
Emitido no segundo 14
Nesse exemplo, a emissão é feita a cada um segundo. Se você executar esse código na classe de teste como foi mencionado no inicio do artigo, apenas o primeiro item será emitido. Nesse caso você deverá adicionar um Thread.sleep(6_000)
que fará com que o teste aguarde 6 segundos (1 a mais que o nosso Observable precisa.
Tipos de emissores
Até agora falamos apenas do tipo de emissor Observable
mas o RX possui outros tipos:
- Flowable (
onNext
,onError
eonComplete
) igual ao Observable, mas suporta o conceito de Back pressure. - Maybe (
onSuccess
ouonError
ouonComplete
). Esse emissor é usado quando se deseja retornar um único valor opcional. Seus métodos são mutuamente exclusivos, ou seja, apenas um deles é chamado. Se o dado existir, oonSuccess
será chamado; se não existir, oonComplete
será invocado; ou se um erro ocorrer, oonError
é disparado. - Single (
onSuccess
ouonError
). Quando apenas um único valor obrigatório precisa ser retornado, deve-se usar o emissor Single. Com ele, se o valor existir, o métodoonSuccess
será chamado, caso contrário, o métodoonError
deve ser utilizado. - Completable (
onComplete
ouonError
). O Completable não emite dados. ele é utilizado apenas saber se uma operação foi concluída com sucesso ou não. Um exemplo muito comum no uso doCompletable
, é o acesso a uma REST API onde o servidor pode retornar o status HTTP 204 (No content) indicando que a operação foi bem sucedida, mas que não há nada para ser retornado para o cliente. Nesse caso, oonComplete
seria chamado. Caso algum erro aconteça, oonError
será disparado.
Flowable.create<String> ({ emitter ->
emitter.onNext("Hello Flowable")
emitter.onComplete()
emitter.onError(Exception())
}, BackpressureStrategy.BUFFER)
Maybe.create<String> { emitter ->
emitter.onSuccess("Hello Maybe")
emitter.onComplete()
emitter.onError(Exception())
}
Single.create<String> { emitter ->
emitter.onSuccess("Hello Single")
emitter.onError(Exception())
}
Completable.create { emitter ->
emitter.onComplete()
emitter.onError(Exception())
}
doOnSubscribe, doOnNext, doOnError, doOnComplete, …
Esses métodos são chamados quando as respectivas ações são chamadas no Observable.
Observable.just("Hello")
.doOnSubscribe { println("Subscribed!") }
.doOnNext { s -> println("Emitiu: $s") }
.doOnError { e -> println("Deu Erro: $e") }
.doOnComplete { println("Complete!") }
.subscribe { println("Subscribe") }Resultado
---------------------------
Subscribed!
Emitiu: Hello
Subscribe
Complete!
Existem ainda outros métodos que podem ser úteis, tais como: doAfterNext
, doFinally
, doOnDispose
, entre outros.
Schedulers
Schedulers determinam a thread em que o emissor emitirá seu fluxo de dados e a thread em que os Observers consumirão essas informações.
Os Schedulers são passados respectivamente nos métodos subscribeOn
e observeOn
. Os principais Schedulers são:
- Main Thread (
AndrdoiSchedulers.mainThread()
- Computation (
Schedulers.computation()
) - I/O (
Schedulers.io()
) - Nova thread (
Schedulers.newThread()
)
Observable.create {
...
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
Com o código acima, toda a operação realizada pelo Scheduler de I/O e o resultado é postado na main thread do Android.
Os Schedulers io()
e computation()
são capazes de gerenciar um pool de thread automaticamente
Transformers
Para evitar ter que utilizar várias vezes o subscribeOn
e o observeOn
, podemos criar um ObservableTransformer
e utiliza-lo com o método compose
.
fun <T> applyObservableAsync(): ObservableTransformer<T, T> {
return ObservableTransformer { observable ->
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
}...Observable.just("a", "b", "c")
.compose(applyObservableAsync())
.subscribe { letra-> print(letra) }
Operadores
Os operadores manipulam os itens entre o produtor (Observable) e o consumidor (Observer). O RX Java tem uma infinidade de operadores, vou listar apenas alguns principais aqui.
map
Transforma os itens emitidos por um Observable aplicando uma função para cada item.
Observable.fromArray("Google", "Microsoft", "Apple")
.map { it to it.length }
.subscribe { pair ->
val (name, length) = pair
println("$name - $length")
}Resultado
---------------------------
Google - 6
Microsoft - 9
Apple - 5
O Observable está emitindo o nome das empresas, mas o método map
transformou cada String emitida em um Pair
(utilizando a extension function to
) onde a chave é o nome da empresa e o valor é a quantidade de caracteres do nome da empresa.
flatMap
O flatMap
é bem similar ao map
, a diferença principal é que a função utilizada retorna um novo Observable (e não o dado modificado como no map
). Isso é particularmente útil ao precisar modificar o dado emitido utilizando um outro Observable.
fun getSO(company: String) =
Observable.create<String> { emitter ->
emitter.onNext(
when (company) {
"Apple" -> "iOS"
"Google" -> "Android"
"Microsoft" -> "Windows"
else -> "Desconhecido"
}
)
emitter.onComplete()
}
Observable.just("Apple", "Google", "Microsoft")
.flatMap { s ->
getSO(s)
}
.subscribe { println(it) }Resultado
---------------------------
iOS
Android
Windows
Perceba que nesse exemplo foi utilizado um outro Observable que emite o nome do sistema operacional de acordo com o nome do fabricante. Imagine que estivéssemos acessando um web service (ou o sistema de arquivos, por exemplo) para obter a lista de empresas e outro para obter o nome do sistema operacional.
Um detalhe importante é que o flatMap
não mantém a ordem dos valores emitidos.
filter
Realiza uma filtragem para emitir apenas os itens que satisfaçam uma determinada condição.
Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8)
.filter { i -> i % 2 == 0 }
.subscribe { i -> print("$i, ") }Resultado
---------------------------
2, 4, 6, 8,
zip
O operador zip
é um dos que mais demonstra o poder o RX Java. Ele combina as emissões de dois ou mais Observables por meio de uma função e emite um item para cada combinação desses Observables.
Observable.zip(
Observable.just(
"Joey", "Ross", "Chandler", "Monica", "Rachel", "Phoebe"),
Observable.just(
"Barney", "Ted", "Marshall", "Lily", "Robin"),
BiFunction<String, String, String> { friend, himym ->
"$friend = $himym"
}
)
.subscribe { t -> println(t) }Resultado
---------------------------
Joey = Barney
Ross = Ted
Chandler = Marshall
Monica = Lily
Rachel = Robin
A BiFunction
é triplamente tipada <String, String, String>
, sendo os tipos, respectivamente: o do primeiro Observable, o do segundo Observable e o tipo de retorno da função. Perceba que o valor "Phoebe" não foi emitido porque não houve uma emissão correspondente no segundo Observable (e porque eu não gosto dela ☺).
Caso precise de mais Observables, ao invés da BiFunction
, você pode usar Function3
, Function4
, …, Function9
.
concat x merge
Ambos combinam as emissões. Como o nome diz, o concat
concatena em ordem duas ou mais emissões. Então um Observable só executará assim que seu antecessor terminar de emitir os itens.
Já o merge
interpola as duas emissões, ou seja não mantém a ordem dos itens emitidos.
fun getRandomDelay() = (Math.random() * 3).toLong() * 1000L
val cidades = Observable.create<String> { emitter ->
listOf(
"Tóquio", "Rio", "Berlim", "Denver",
"Moscou", "Nairobi", "Helsinque", "Oslo"
).forEach {
Thread.sleep(getRandomDelay())
emitter.onNext(it)
}
emitter.onComplete()
}.subscribeOn(Schedulers.newThread())
val bb = Observable.create<String> { emitter ->
listOf("Walt", "Jesse", "Skyler", "Saul", "Hank")
.forEach {
Thread.sleep(getRandomDelay())
emitter.onNext(it)
}
emitter.onComplete()
}.subscribeOn(Schedulers.newThread())
Basicamente temos dois Observable emitindo seus dados com intervalos randômicos. Perceba que foi utilizado Schedulers.newThread()
pois se ambos os Observables estiverem na mesma thread, o merge
e o concat
funcionarão da mesma forma.
Usando o concat
um resultado constante, primeiros os itens do Observable cidades
seria emitido, e só então os itens do Observable bb
.
Observable.concat(cidades, bb)
.subscribe { t -> print("$t, ") }Resultado
---------------------------
Tóquio, Rio, Berlim, Denver, Moscou, Nairobi, Helsinque, Oslo, Walt, Jesse, Skyler, Saul, Hank,
Com o merge
teríamos resultados variados, já os itens dos observables são chamados de forma intercalada.
Observable.merge(cidades, bb)
.subscribe { t -> print("$t, ") }Resultado (variável)
---------------------------
Walt, Jesse, Skyler, Tóquio, Rio, Berlim, Denver, Saul, Moscou, Nairobi, Helsinque, Hank, Oslo,
take
Consome apenas os primeiros N itens emitidos por um Observable.
Vejamos um exemplo simples.
Observable.just(1, 2, 3)
.take(2)
.subscribe { print("$it ") }Resultado
---------------------------
1 2
Agora um exemplo mais real.
fun getRandomDelay() = (Math.random() * 5).toLong() * 1000L
val cache =
Observable.create<String> {
Thread.sleep(getRandomDelay())
it.onNext("Cache")
it.onComplete()
}.subscribeOn(Schedulers.newThread())
val network =
Observable.create<String> {
Thread.sleep(getRandomDelay())
it.onNext("Network")
it.onComplete()
}.subscribeOn(Schedulers.newThread())
Observable.merge(cache, network)
.take(1)
.subscribe { s -> println(s) }
Resultado
---------------------------
Depende. Cache ou Network
O código acima simula uma situação bem comum. Imagine que você tem uma informação na web e a mesma informação pode estar salva localmente no cache. Você pode realizar ambas as requisições (com o operador merge
) solicitando o mesmo dado, e só pegar a primeira que foi retornada com o take(1)
. Note que novamente foi utilizado o Schedulers.newThread()
.
debounce
Somente emite um item de um Observable se um determinado tempo passou sem emitir um outro item.
Observable.create<Int> { emitter ->
emitter.onNext(1)
Thread.sleep(400) // 400 < 500, descarta o 1
emitter.onNext(2)
Thread.sleep(600) // 600 > 500, emite o 2
emitter.onNext(3)
Thread.sleep(100) // 100 < 500, descarta o 3
emitter.onNext(4)
Thread.sleep(600) // 600 > 500, emite o 4
emitter.onNext(5)
Thread.sleep(600) // 600 > 500, emite o 5
emitter.onComplete()
}
.debounce(500, TimeUnit.MILLISECONDS)
.subscribe { i -> println(i.toString()) }Resultado
---------------------------
2
4
5
repeat
Cria um Observable que emite um item múltiplas vezes.
Observable.just("A", "B", "C")
.repeat(2)
.subscribe { print(it) }Resultado
---------------------------
ABCABC
retry & retryWhen
Se um observable emitir um erro, é possível tentar novamente na esperança que ele complete sem erros.
Observable.create<String> { emitter ->
val numero = (Math.random() * 10).toInt()
if (numero < 5) {
emitter.onNext("Sucesso!")
emitter.onComplete()
} else {
emitter.onError(RuntimeException("Falhou! Tente de novo"))
}
}.retry(2)
.subscribe(
{ s -> println("TERMINEI! $s") },
{ e -> println("Erro") }
)Resultado
(Varia de acordo com o número gerado)
---------------------------
TERMINEI! Sucesso!
Se o número gerado for maior ou igual a 5, um erro será gerado, mas o onError
não será chamado imediatamente, pois estamos informando para o Observable tentar novamente por até duas vezes (com retry(2)
). O onError
só será chamado se as duas chamadas seguintes não gerarem um número menor que 5.
Usamos esse exemplo, mas imagine que você está fazendo uma requisição a um servidor web, e por alguma razão a primeira chamada falhou. Você pode fazer isso usando o retry
, mas o retryWhen
permite tentar novamente apenas se uma determinada condição for satisfeita, inclusive permitindo um delay para a nova tentativa.
Observable.create<String> { emitter ->
val numero = (Math.random() * 10).toInt()
if (numero < 5) {
emitter.onNext("Sucesso!")
emitter.onComplete()
} else {
emitter.onError(RuntimeException("Falhou! Tente de novo"))
}
}.retryWhen { completed ->
completed.delay(2, TimeUnit.SECONDS)
}.subscribe(
{ s -> println("TERMINEI! $s") },
{ e -> println("Erro") }
)
Igual ao exemplo anterior, mas a nova tentativa só ocorre após 2 segundos.
Disposable
Ao realizar uma subscription em um Observable, uma instância de Disposable
é retornada. Isso permite liberar os recursos e threads alocadas para o Observer por meio do método dispose
.
Se estiver realizando várias subscriptions, podemos utilizar a classe CompositeDisposable
, adicionar os objetos Disposable
a ele, e liberar todos invocando o método clear
. Isso é muito importante no Android para evitar leaks de memória.
val disposable = Observable.just("Hello").subscribe { println(it) }
disposable.dispose()
val obs1 = Observable.just("A").subscribe { println(it) }
val obs2 = Observable.just("B").subscribe { println(it) }
val compositeDisposable = CompositeDisposable()
compositeDisposable.add(obs1)
compositeDisposable.add(obs2)
compositeDisposable.clear()
PublishSubject
A classe PublishSubject
é uma subclasse de Subject
e funciona como Observable e Observer ao mesmo tempo. Pois além receber dados (como um Observer) ele também emite (como um Observable). Desta forma, um Subject
possui acesso aos métodos onNext
, onComplete
e onError
(no Observer esses métodos estão disponíveis apenas internamente).
class MainActivity : AppCompatActivity() {
val source = PublishSubject.create<String>()
val disposables = CompositeDisposable()
var count = 0
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
disposables.add(
source.subscribe(
{ dado -> textView.text = dado },
{ e -> textView.text = "Erro" }
)
)
button.setOnClickListener {
source.onNext("Mensagem ${++count}")
if (count == 10) {
source.onComplete()
}
}
}
override fun onDestroy() {
super.onDestroy()
disposables.clear()
}
}
Nesse exemplo, quando o botão é clicado, um novo dado é emitido pelo PublishSubject
e a variável count
é incrementada. Com isso, o Observer registrado no método subscribe
é chamado e o texto do TextView é atualizado. Quando count
chegar a 10, onComplete
é chamado e a sequência termina.
ConnectableObservable
Por padrão, cada vez que um Observer se registra em um Observable, a sequência é enviada novamente. É possível fazer com que a mesma emissão seja enviada para Observables diferentes utilizando a classe ConnectableObservable
.
val observable = Observable.just("Event")
.map { s ->
println("Expensive operation for $s")
s
}
.publish()
.autoConnect(2)
observable.subscribe { s -> println("Sub1 got: $s") }
observable.subscribe { s -> println("Sub2 got: $s") }
Resultado
---------------------------
Expensive operation for Event
Sub1 received: Event
Sub2 received: Event
O método publish()
retorna um ConnectableObservable
no qual os Observers podem se registrar. Foi utilizado o autoConnect(2)
para determinar que as emissões podem começar após dois observables se registrarem. Outra forma seria utilizar o método connect()
para que o envio seja realizado para todos os Observers.
Fonte: https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/
Conclusão
RX Java é uma biblioteca importantíssima no desenvolvimento de aplicações Android e muito utilizada pela comunidade. Por isso, o seu conhecimento é essencial por parte dos desenvolvedores. Esse post é apenas um conjunto de anotações que eu tinha feito quando estava estudando RX. Espero que seja útil de alguma forma para alguém.
Referências
Nesse outro post eu coloquei um monte de links sobre RX Java.
Meus agradecimentos a Diego Nascimento e Ubiratan Soares pela revisão desse artigo!
Qualquer dúvida, deixe seus comentários. =)
4br4ç05