Introdução ao RX Java com Kotlin

Nelson Glauber
13 min readFeb 22, 2019

--

Olá povo,

Tirando a teia de aranha aqui do blog, resolvi fazer um post introdutório sobre RXJava no Android. Na verdade, eu tinha feito uma apresentação há um bom tempo atrás e acabou que não usei essa apresentação em canto nenhum. Então resolvi transcrevê-la aqui para o blog. São apenas anotações minhas e que provavelmente não estão na melhor ordem, mas pode ser que seja útil pra alguém.

O que é o RX Java?

De uma maneira bem simplista, RxJava é uma biblioteca que permite representar de forma declarativa, qualquer operação como um fluxo assíncrono de dados, que pode ser criado por qualquer thread, e consumido por múltiplos objetos em (opcionalmente) threads diferentes.

Onde usar?

Existe uma infinidade de cenários onde o RX Java pode ser utilizados. Mas de uma forma geral, ele é utilizado em chamadas assíncronas, tais como:

  • Acesso à rede, tais como chamadas HTTP;
  • Leitura e escrita no sistema de arquivos e banco de dados;
  • Controle de eventos de UI que devem disparar ações;
  • Tratamento de dados dos sensores do aparelho;
  • Envio de mensagens internas da aplicação;
  • etc...

Usos comuns

No desenvolvimento de aplicações Android, o RX Java é muito utilizado para substituir ou fornecer uma forma mais elegante para resolver problemas que podem ser solucionados com algumas bibliotecas de terceiros e/ou APIs nativas do Android:

Elementos fundamentais

O RX Java tem os seguintes elementos principais:

  • Observables
  • Observers
  • Schedulers
  • Operators

Veremos cada um deles nas próximas seções.

Observable

Um Observable é o um tipo de emissor que emite um fluxo de dados de forma sequencial (por padrão). Esse fluxo normalmente é consumido por um Observer.

Um Observable é similar ao Iterable do Java, a principal diferença entre Observable e o Iterable é que o primeiro provê acesso assíncrono ao fluxo de dados enquanto que o no segundo, o acesso é síncrono.

Observer

Os Observers são os objetos que consomem os dados emitidos por um Observable.

Podem haver múltiplos Observers registrados para receber dados emitidos por um Observable.

Observers podem (e devem) tratar quaisquer erros que venham a ocorrer enquanto o Observable está emitindo dados. Ele também saberá quando não houver mais itens a serem consumidos.

Observable + Observer

O RX Java implementa o padrão Observer. Quando um Observable emite itens, as seguintes ações são chamadas no Observer:

  • onNext é chamado para cada item emitido pelo Observable;
  • Quando todos os itens forem emitidos, o onComplete será invocado;
  • Caso algum erro ocorra durante a emissão de algum item, o onError será chamado e a sequência será interrompida (por padrão).

Adicionando a dependência do RX Java

Para testar os exemplos a seguir, crie um novo projeto no Android Studio e adicione as seguintes dependências no build.gradle do módulo app.

dependencies {
...
implementation "io.reactivex.rxjava2:rxjava:2.2.6"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
}

A primeira dependência é do RX Java em si, enquanto que a segunda é necessária para podemos exibir os resultados emitidos pelo Observable na main thread do Android. Falaremos sobre isso na seção sobre "Schedulers".

Por ser totalmente independente do framework do Android, podemos testar o RX Java na própria JVM, por isso, o código a seguir pode ser executado na classe de teste que criada automaticamente quando o projeto Android é criado (<projeto>/app/src/test/java/<pacote.do.app>/ExampleUnitTest).

Criando um Observable

Existem várias maneiras de criar um Observable, algumas delas estão listadas abaixo:

just

Cria um observable que emite apenas um item.

Observable.just("Hello RX")
.subscribe { valor -> println(valor) }
Resultado
---------------------------
Hello RX

Ao utilizar o método just foi criado um objeto Observable que emite apenas a String "Hello RX". No método subscribe é registrado um observer/consumer/receiver dos dados que serão emitidos. Com o Kotlin, isso é feito com uma simples instrução lambda, onde o valor emitido é recebido como parâmetro.

from*

Existem algumas variações do método from para criar um Observable. Abaixo estão sendo listadas duas delas:

Observable.fromArray("Google", "Microsoft", "Apple")
.subscribe { println(it) }
Resultado
---------------------------
Google
Microsoft
Apple
Observable.fromIterable(listOf("Google", "Microsoft", "Apple"))
.subscribe(
{ println(it) }, // onNext
{ e -> println("Erro") }, // onError
{ println("Complete") } // onComplete
)
Resultado
---------------------------
Google
Microsoft
Apple
Complete

Os nomes dos métodos são auto-explicativos. Perceba que o lambda passado para no subscribe passado para o Observable criado pelo método fromArray é chamado 3 vezes (uma para cada item emitido). Outro detalhe desse exemplo é que foram passados três lambdas para o Observable criado com o fromIterable que representam respectivamente os métodos: onNext, onError e onComplete.

create

Permite criar um Observable “do zero”. Assim, cabe a você chamar cada método do Observer: onNext para cada item emitido; onComplete ao terminar a sequência; e onError em caso de erro.

fun getObservableFromList(lista: List<String>) =
Observable.create<String> { emitter ->
lista.forEach { nome ->
if (nome == "") {
emitter.onError(Exception("Valor em branco"))
}
emitter.onNext(nome)
}
emitter.onComplete()
}

A função acima está criando um Observable que emitirá os itens de uma lista. Caso algum item da lista seja vazio, um erro será gerado e a sequência será interrompida. Perceba que o parâmetro emitter será responsável por chamar os métodos onNext, onComplete e eventualmente o onError.

getObservableFromList(listOf("Nelson", "Glauber", "Leal"))
.subscribe { println(it) }
Resultado
---------------------------
Nelson
Glauber
Leal
getObservableFromList(listOf("Nelson", "", "Leal"))
.subscribe(
{ println(it) },
{ e -> println("Deu erro!") }
)
Resultado
---------------------------
Nelson
Deu erro!

Perceba que na primeira chamada, como a lista não possui nenhum item vazio, todo seu conteúdo é exibido. Já no segundo caso, apenas o primeiro item é mostrado, uma vez que o segundo item é vazio, a sequência é interrompida e a mensagem de erro é exibida.

interval

O método interval retorna um Observable que emite uma sequência crescente de inteiros em um intervalo de tempo constante e configurável.

Observable.intervalRange(
10L, // start
5L, // count
0L, // initial delay
1L, // period
TimeUnit.SECONDS
).subscribe { println("Emitido no segundo $it") }
Resultado
---------------------------
Emitido no segundo 10
Emitido no segundo 11
Emitido no segundo 12
Emitido no segundo 13
Emitido no segundo 14

Nesse exemplo, a emissão é feita a cada um segundo. Se você executar esse código na classe de teste como foi mencionado no inicio do artigo, apenas o primeiro item será emitido. Nesse caso você deverá adicionar um Thread.sleep(6_000) que fará com que o teste aguarde 6 segundos (1 a mais que o nosso Observable precisa.

Tipos de emissores

Até agora falamos apenas do tipo de emissor Observable mas o RX possui outros tipos:

  • Flowable (onNext, onError e onComplete) igual ao Observable, mas suporta o conceito de Back pressure.
  • Maybe (onSuccess ou onError ou onComplete). Esse emissor é usado quando se deseja retornar um único valor opcional. Seus métodos são mutuamente exclusivos, ou seja, apenas um deles é chamado. Se o dado existir, o onSuccess será chamado; se não existir, o onComplete será invocado; ou se um erro ocorrer, o onError é disparado.
  • Single (onSuccess ou onError). Quando apenas um único valor obrigatório precisa ser retornado, deve-se usar o emissor Single. Com ele, se o valor existir, o método onSuccess será chamado, caso contrário, o método onError deve ser utilizado.
  • Completable (onComplete ou onError). O Completable não emite dados. ele é utilizado apenas saber se uma operação foi concluída com sucesso ou não. Um exemplo muito comum no uso do Completable, é o acesso a uma REST API onde o servidor pode retornar o status HTTP 204 (No content) indicando que a operação foi bem sucedida, mas que não há nada para ser retornado para o cliente. Nesse caso, o onComplete seria chamado. Caso algum erro aconteça, o onError será disparado.
Flowable.create<String> ({ emitter ->
emitter.onNext("Hello Flowable")
emitter.onComplete()
emitter.onError(Exception())
}, BackpressureStrategy.BUFFER)

Maybe.create<String> { emitter ->
emitter.onSuccess("Hello Maybe")
emitter.onComplete()
emitter.onError(Exception())
}

Single.create<String> { emitter ->
emitter.onSuccess("Hello Single")
emitter.onError(Exception())
}

Completable.create { emitter ->
emitter.onComplete()
emitter.onError(Exception())
}

doOnSubscribe, doOnNext, doOnError, doOnComplete, …

Esses métodos são chamados quando as respectivas ações são chamadas no Observable.

Observable.just("Hello")
.doOnSubscribe { println("Subscribed!") }
.doOnNext { s -> println("Emitiu: $s") }
.doOnError { e -> println("Deu Erro: $e") }
.doOnComplete { println("Complete!") }
.subscribe { println("Subscribe") }
Resultado
---------------------------
Subscribed!
Emitiu: Hello
Subscribe
Complete!

Existem ainda outros métodos que podem ser úteis, tais como: doAfterNext, doFinally, doOnDispose, entre outros.

Schedulers

Schedulers determinam a thread em que o emissor emitirá seu fluxo de dados e a thread em que os Observers consumirão essas informações.

Os Schedulers são passados respectivamente nos métodos subscribeOn e observeOn. Os principais Schedulers são:

  • Main Thread (AndrdoiSchedulers.mainThread()
  • Computation (Schedulers.computation())
  • I/O ( Schedulers.io())
  • Nova thread (Schedulers.newThread())
Observable.create {
...
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

Com o código acima, toda a operação realizada pelo Scheduler de I/O e o resultado é postado na main thread do Android.

Os Schedulers io() e computation() são capazes de gerenciar um pool de thread automaticamente

Transformers

Para evitar ter que utilizar várias vezes o subscribeOn e o observeOn, podemos criar um ObservableTransformer e utiliza-lo com o método compose.

fun <T> applyObservableAsync(): ObservableTransformer<T, T> {
return ObservableTransformer { observable ->
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
}
...Observable.just("a", "b", "c")
.compose(applyObservableAsync())
.subscribe { letra-> print(letra) }

Operadores

Os operadores manipulam os itens entre o produtor (Observable) e o consumidor (Observer). O RX Java tem uma infinidade de operadores, vou listar apenas alguns principais aqui.

map

Transforma os itens emitidos por um Observable aplicando uma função para cada item.

Observable.fromArray("Google", "Microsoft", "Apple")
.map { it to it.length }
.subscribe { pair ->
val (name, length) = pair
println("$name - $length")
}
Resultado
---------------------------
Google - 6
Microsoft - 9
Apple - 5

O Observable está emitindo o nome das empresas, mas o método map transformou cada String emitida em um Pair (utilizando a extension function to) onde a chave é o nome da empresa e o valor é a quantidade de caracteres do nome da empresa.

flatMap

O flatMap é bem similar ao map, a diferença principal é que a função utilizada retorna um novo Observable (e não o dado modificado como no map). Isso é particularmente útil ao precisar modificar o dado emitido utilizando um outro Observable.

fun getSO(company: String) =
Observable.create<String> { emitter ->
emitter.onNext(
when (company) {
"Apple" -> "iOS"
"Google" -> "Android"
"Microsoft" -> "Windows"
else -> "Desconhecido"
}
)
emitter.onComplete()
}

Observable.just("Apple", "Google", "Microsoft")
.flatMap { s ->
getSO(s)
}
.subscribe { println(it) }
Resultado
---------------------------
iOS
Android
Windows

Perceba que nesse exemplo foi utilizado um outro Observable que emite o nome do sistema operacional de acordo com o nome do fabricante. Imagine que estivéssemos acessando um web service (ou o sistema de arquivos, por exemplo) para obter a lista de empresas e outro para obter o nome do sistema operacional.

Um detalhe importante é que o flatMap não mantém a ordem dos valores emitidos.

filter

Realiza uma filtragem para emitir apenas os itens que satisfaçam uma determinada condição.

Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8)
.filter { i -> i % 2 == 0 }
.subscribe { i -> print("$i, ") }
Resultado
---------------------------
2, 4, 6, 8,

zip

O operador zip é um dos que mais demonstra o poder o RX Java. Ele combina as emissões de dois ou mais Observables por meio de uma função e emite um item para cada combinação desses Observables.

Observable.zip(
Observable.just(
"Joey", "Ross", "Chandler", "Monica", "Rachel", "Phoebe"),
Observable.just(
"Barney", "Ted", "Marshall", "Lily", "Robin"),
BiFunction<String, String, String> { friend, himym ->
"$friend = $himym"
}
)
.subscribe { t -> println(t) }
Resultado
---------------------------
Joey = Barney
Ross = Ted
Chandler = Marshall
Monica = Lily
Rachel = Robin

A BiFunction é triplamente tipada <String, String, String>, sendo os tipos, respectivamente: o do primeiro Observable, o do segundo Observable e o tipo de retorno da função. Perceba que o valor "Phoebe" não foi emitido porque não houve uma emissão correspondente no segundo Observable (e porque eu não gosto dela ☺).

Caso precise de mais Observables, ao invés da BiFunction, você pode usar Function3, Function4, …, Function9.

concat x merge

Ambos combinam as emissões. Como o nome diz, o concat concatena em ordem duas ou mais emissões. Então um Observable só executará assim que seu antecessor terminar de emitir os itens.

Já o merge interpola as duas emissões, ou seja não mantém a ordem dos itens emitidos.

fun getRandomDelay() = (Math.random() * 3).toLong() * 1000L

val cidades = Observable.create<String> { emitter ->
listOf(
"Tóquio", "Rio", "Berlim", "Denver",
"Moscou", "Nairobi", "Helsinque", "Oslo"
).forEach {
Thread.sleep(getRandomDelay())
emitter.onNext(it)
}
emitter.onComplete()
}.subscribeOn(Schedulers.newThread())

val bb = Observable.create<String> { emitter ->
listOf("Walt", "Jesse", "Skyler", "Saul", "Hank")
.forEach {
Thread.sleep(getRandomDelay())
emitter.onNext(it)
}
emitter.onComplete()
}.subscribeOn(Schedulers.newThread())

Basicamente temos dois Observable emitindo seus dados com intervalos randômicos. Perceba que foi utilizado Schedulers.newThread() pois se ambos os Observables estiverem na mesma thread, o merge e o concat funcionarão da mesma forma.

Usando o concat um resultado constante, primeiros os itens do Observable cidades seria emitido, e só então os itens do Observable bb.

Observable.concat(cidades, bb)
.subscribe { t -> print("$t, ") }
Resultado
---------------------------
Tóquio, Rio, Berlim, Denver, Moscou, Nairobi, Helsinque, Oslo, Walt, Jesse, Skyler, Saul, Hank,

Com o merge teríamos resultados variados, já os itens dos observables são chamados de forma intercalada.

Observable.merge(cidades, bb)
.subscribe { t -> print("$t, ") }
Resultado (variável)
---------------------------
Walt, Jesse, Skyler, Tóquio, Rio, Berlim, Denver, Saul, Moscou, Nairobi, Helsinque, Hank, Oslo,

take

Consome apenas os primeiros N itens emitidos por um Observable.

Vejamos um exemplo simples.

Observable.just(1, 2, 3)
.take(2)
.subscribe { print("$it ") }
Resultado
---------------------------
1 2

Agora um exemplo mais real.

fun getRandomDelay() = (Math.random() * 5).toLong() * 1000L

val cache =
Observable.create<String> {
Thread.sleep(getRandomDelay())
it.onNext("Cache")
it.onComplete()
}.subscribeOn(Schedulers.newThread())

val network =
Observable.create<String> {
Thread.sleep(getRandomDelay())
it.onNext("Network")
it.onComplete()
}.subscribeOn(Schedulers.newThread())

Observable.merge(cache, network)
.take(1)
.subscribe { s -> println(s) }

Resultado
---------------------------
Depende. Cache ou Network

O código acima simula uma situação bem comum. Imagine que você tem uma informação na web e a mesma informação pode estar salva localmente no cache. Você pode realizar ambas as requisições (com o operador merge) solicitando o mesmo dado, e só pegar a primeira que foi retornada com o take(1). Note que novamente foi utilizado o Schedulers.newThread().

debounce

Somente emite um item de um Observable se um determinado tempo passou sem emitir um outro item.

Observable.create<Int> { emitter ->
emitter.onNext(1)
Thread.sleep(400) // 400 < 500, descarta o 1
emitter.onNext(2)
Thread.sleep(600) // 600 > 500, emite o 2
emitter.onNext(3)
Thread.sleep(100) // 100 < 500, descarta o 3
emitter.onNext(4)
Thread.sleep(600) // 600 > 500, emite o 4
emitter.onNext(5)
Thread.sleep(600) // 600 > 500, emite o 5
emitter.onComplete()
}
.debounce(500, TimeUnit.MILLISECONDS)
.subscribe { i -> println(i.toString()) }
Resultado
---------------------------
2
4
5

repeat

Cria um Observable que emite um item múltiplas vezes.

Observable.just("A", "B", "C")
.repeat(2)
.subscribe { print(it) }
Resultado
---------------------------
ABCABC

retry & retryWhen

Se um observable emitir um erro, é possível tentar novamente na esperança que ele complete sem erros.

Observable.create<String> { emitter ->
val numero = (Math.random() * 10).toInt()
if (numero < 5) {
emitter.onNext("Sucesso!")
emitter.onComplete()
} else {
emitter.onError(RuntimeException("Falhou! Tente de novo"))
}
}.retry(2)
.subscribe(
{ s -> println("TERMINEI! $s") },
{ e -> println("Erro") }
)
Resultado
(Varia de acordo com o número gerado)
---------------------------
TERMINEI! Sucesso!

Se o número gerado for maior ou igual a 5, um erro será gerado, mas o onError não será chamado imediatamente, pois estamos informando para o Observable tentar novamente por até duas vezes (com retry(2) ). O onError só será chamado se as duas chamadas seguintes não gerarem um número menor que 5.

Usamos esse exemplo, mas imagine que você está fazendo uma requisição a um servidor web, e por alguma razão a primeira chamada falhou. Você pode fazer isso usando o retry, mas o retryWhen permite tentar novamente apenas se uma determinada condição for satisfeita, inclusive permitindo um delay para a nova tentativa.

Observable.create<String> { emitter ->
val numero = (Math.random() * 10).toInt()
if (numero < 5) {
emitter.onNext("Sucesso!")
emitter.onComplete()
} else {
emitter.onError(RuntimeException("Falhou! Tente de novo"))
}
}.retryWhen { completed ->
completed.delay(2, TimeUnit.SECONDS)

}.subscribe(
{ s -> println("TERMINEI! $s") },
{ e -> println("Erro") }
)

Igual ao exemplo anterior, mas a nova tentativa só ocorre após 2 segundos.

Disposable

Ao realizar uma subscription em um Observable, uma instância de Disposable é retornada. Isso permite liberar os recursos e threads alocadas para o Observer por meio do método dispose.

Se estiver realizando várias subscriptions, podemos utilizar a classe CompositeDisposable, adicionar os objetos Disposable a ele, e liberar todos invocando o método clear. Isso é muito importante no Android para evitar leaks de memória.

val disposable = Observable.just("Hello").subscribe { println(it) } 
disposable.dispose()

val obs1 = Observable.just("A").subscribe { println(it) }
val obs2 = Observable.just("B").subscribe { println(it) }

val compositeDisposable = CompositeDisposable()
compositeDisposable.add(obs1)
compositeDisposable.add(obs2)
compositeDisposable.clear()

PublishSubject

A classe PublishSubject é uma subclasse de Subject e funciona como Observable e Observer ao mesmo tempo. Pois além receber dados (como um Observer) ele também emite (como um Observable). Desta forma, um Subject possui acesso aos métodos onNext, onComplete e onError (no Observer esses métodos estão disponíveis apenas internamente).

class MainActivity : AppCompatActivity() {
val source = PublishSubject.create<String>()
val disposables = CompositeDisposable()
var count = 0

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
disposables.add(
source.subscribe(
{ dado -> textView.text = dado },
{ e -> textView.text = "Erro" }
)

)
button.setOnClickListener {
source.onNext("Mensagem ${++count}")
if (count == 10) {
source.onComplete()
}
}
}
override fun onDestroy() {
super.onDestroy()
disposables.clear()
}
}

Nesse exemplo, quando o botão é clicado, um novo dado é emitido pelo PublishSubject e a variável count é incrementada. Com isso, o Observer registrado no método subscribe é chamado e o texto do TextView é atualizado. Quando count chegar a 10, onComplete é chamado e a sequência termina.

ConnectableObservable

Por padrão, cada vez que um Observer se registra em um Observable, a sequência é enviada novamente. É possível fazer com que a mesma emissão seja enviada para Observables diferentes utilizando a classe ConnectableObservable.

val observable = Observable.just("Event")
.map { s ->
println("Expensive operation for $s")
s
}
.publish()
.autoConnect(2)

observable.subscribe { s -> println("Sub1 got: $s") }
observable.subscribe { s -> println("Sub2 got: $s") }

Resultado

---------------------------
Expensive operation for Event
Sub1 received: Event
Sub2 received: Event

O método publish() retorna um ConnectableObservable no qual os Observers podem se registrar. Foi utilizado o autoConnect(2) para determinar que as emissões podem começar após dois observables se registrarem. Outra forma seria utilizar o método connect() para que o envio seja realizado para todos os Observers.

Fonte: https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/

Conclusão

RX Java é uma biblioteca importantíssima no desenvolvimento de aplicações Android e muito utilizada pela comunidade. Por isso, o seu conhecimento é essencial por parte dos desenvolvedores. Esse post é apenas um conjunto de anotações que eu tinha feito quando estava estudando RX. Espero que seja útil de alguma forma para alguém.

Referências

Nesse outro post eu coloquei um monte de links sobre RX Java.

Meus agradecimentos a Diego Nascimento e Ubiratan Soares pela revisão desse artigo!

Qualquer dúvida, deixe seus comentários. =)

4br4ç05

--

--

Nelson Glauber
Nelson Glauber

Written by Nelson Glauber

Android Developer. Google Developer Expert for Android/Kotlin. Author of “Dominando o Android”.