What is RxJava in Korean

RxJava 와 Android

Reactive Programming 은 최근에 큰 화두가 최근 2년간 유행어처럼 번져 나간 최고의 화두입니다. 서버 백엔드부터 Front 클라이언트들까지 두루두루 그 영향이 미치지 않은 곳이 없다 싶을 정도로 많은 곳에서 회자되고 있습니다.

Reactive Programming 은 위키에서는 다음과 같이 정의되어 있습니다.

컴퓨팅으로써 반응형 프로그래밍은 데이터의 흐름과 변화에 대한 전달을 기반으로 하는 프로그래밍 패러다임이다.

끊임없이 요청/변경되는 데이터에 반응하기 위해 나온 Reactive Programming 은 데이터를 처리함에 있어서 비동기적으로 데이터를 처리할 때 효율적으로 할 수 있도록 하기 위한 방법입니다.

다음의 그림을 보시면 더 쉽게 이해하실 수 있습니다. Source : LightBend Reactive Programming vs Reactive Stream

일반적인 비동기 데이터 처리가 처리가 끝날때까지 쓰레드를 대기시키거나 콜백을 받아서 처리하기 때문에 불필요한 컴퓨팅 리소스 사용이 발생하게 됩니다. 반면 Messaging 기반의 Reactive Programming 에서는 필요한 경우에만 쓰레드를 생성후 메세지 형태로 전달하기 때문에 더 효율적으로 컴퓨팅 리소스를 사용할 수 있습니다.

RxJava 란 무엇인가?

MS 에서 .NET 의 Reactive Extension 의 Java 버전으로 데이터베이스의 데이터 처리 과정에서 비동기 처리를 효율적으로 하기 위해서 만든 뒤 큰 호응을 얻어 차차 다른 언어로 개발되어 지금에 이르러 RxSwift, RxJS, RxJava 등등 다양한 언어로 사용할 수 있게 된 것입니다.

RxJava 의 역사와 로드맵은 다음과 같습니다.

  • 2014년 11월 - 1.0 배포

  • 2016년 10월 - 2.0 배포

  • 2017년 07월 - 1.0 기능개발 중단, 버그 수정만 함

  • 2018년 03월 - 1.0 유지보수 종료.

2017년 여름이면 1.0 은 더이상 추가개발을 하지 않을 것이며 이는 공식 문서상에 2.0이 배포된 후에 공개된 내용입니다.
앞으로 새롭게 RxJava 를 배울 분들은 RxJava 2.0 을 배우시길 권장합니다.

본 포스팅에서는 RxJava 를 경험하지 못한 분들이 최대한 빨리, 쉽게 RxJava 를 쓸 수 있도록 사용 예시를 위주로 설명하도록 하겠습니다.

RxJava 의 기초 이해하기

RxJava 는 데이터를 가공/변형/처리를 하는 라이브러리입니다. RxJava 에서 처리할 수 있는 데이터의 형태는 Nothing, One, More, Unlimited Datas 를 처리할 수 있습니다. 그에 따라서 Completable, Maybe, Single, Observable, Processor 로 각각 처리할 수 있습니다.

데이터의 형태에 따라 다음과 같이 분류할 수 있습니다.

Completable Maybe Single Observable Subject
Nothing O
One O O O O
More O O
Unlimited O O

일반적으로 사용하는 Observable 보도록 하겠습니다.

기존 코드를 RxJava 로 바꿔보기

다음과 같이 예시를 프로그래밍하도록 해보겠습니다.

List<Integer> datas = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

0~10까지의 Int 형 배열이 있습니다.
Int 형 배열을 a 부터 j 까지 변경하여 화면에 출력하도록 하겠습니다.

일반적인 처리는 다음과 같이 예상할 수 있습니다.

for (int data : datas) {
  char value = (char) data + 'a';
  print(value);
}

위의 코드를 RxJava 로 변경하면 다음과 같이 바꿀 수 있습니다.

Observable.fromIteratable(datas)
  .subscribe(data -> {
    char value = (char) data + 'a';
    print(value);
  });

위의 RxJava 코드는 기존의 코드에 비해서 특이점을 찾기 어렵습니다.

RxJava 의 map 이라는 처리 기능을 활용해보도록 하겠습니다.

Observable.fromIteratable(datas)
  .map(data -> (char) data + 'a')
  .subscribe(value -> print(value));

map 은 최초에 들어온 데이터를 다른 형태로 변경 시킬 수 있는 처리 기능입니다.


이제 기존의 요구사항에 1개가 더 추가되었습니다.

0~10까지의 Int 형 배열이 있습니다.
Int 형 배열을 a 부터 j 까지 변경하여 화면에 출력하도록 하겠습니다.
추가 1 : 짝수에 해당하는 숫자는 출력하지 말 것.

기존의 코드를 수정해보도록 하겠습니다.

for (int data : datas) {
  if ((data  % 2) != 0) {
    char value = (char) data + 'a';
    print(value);
  }
}

RxJava 로 변경하면 filter 라는 처리 기능을 추가적으로 사용할 수 있습니다.

Observable.fromIteratable(datas)
  .filter(data -> (data % 2) != 0)
  .map(data -> (char) data + 'a')
  .subscribe(value -> print(value));

filter 는 반환이 true 이면 값을 다음 단계로 전달 할 수 있고 false 이면 전달하지 않습니다.


다시 요구사항에 더 추가되었습니다.

0~10까지의 Int 형 배열이 있습니다.
Int 형 배열을 a 부터 j 까지 변경하여 화면에 출력하도록 하겠습니다.
추가 1 : 짝수에 해당하는 숫자는 출력하지 말 것.
추가 2 : 각 문자는 10번씩 반복되도록 한꺼번에 출력되도록 할 것.

for 형태의 코드를 수정하면 다음과 같이 될 것입니다.

List<Character> chars = new ArrayList<>();
for (int data : datas) {
  if ((data  % 2) != 0) {
    char value = (char) data + 'a';
    for (int idx = 0; idx < 10; idx++) {
      chars.add(value);
    }
  }
}

print(chars);

RxJava 는 다음과 같습니다.

Observable.fromIteratable(datas)
  .filter(data -> (data % 2) != 0)
  .map(data -> (char) data + 'a')
  .flatMap(value -> Observable.range(0, 10).map(index -> value))
  .collect(() -> new ArrayList<Character>(), (chars, value) -> chars.add(value))
  .subscribe(values -> print(values));

flatmap 은 1개의 데이터에 대해 n 개의 데이터로 변형할 수 있는 처리 기능입니다.
collect 는 주입된 데이터를 특정 Collection 에 담을 수 있도록 하는 처리 기능입니다.

Observable.range 는 (n, m) 에 대해 n 부터 n + m -1 만큼 int 형 데이터를 호출해주는 데이터로 Observable.range(0, 10).map(index -> value)) 는 value 라는 값을 10번 반복하도록 하였습니다.

Subject 알아보기

여태까지 알아본 Observable 은 정해진 데이터를 전달하는 경우에 사용되고 있습니다.

하지만 데이터가 언제 어떻게 시작될지 모르는 경우도 있습니다.
가장 일반적인 예시로 사용자가 화면을 터치하는 것은 언제 얼마나 할 지 알 수 없습니다.

이런 경우에는 Subject 를 사용할 수 있습니다.

앞서 Observable 에 맞춰서 예시를 작성해보겠습니다.

PublishSubject<Integer> subject = PublishSubject.<Integer>create();
subject
  .filter(data -> (data % 2) != 0)
  .map(data -> (char) data + 'a')
  .flatMap(value -> Observable.range(0, 10).map(index -> value))
  .collect(() -> new ArrayList<Character>(), (chars, value) -> chars.add(value))
  .subscribe(values -> print(values));

위와 같이 데이터가 들어왔을 때 동작할 것을 미리 정의를 한 후 다음과 같이 데이터를 넣으면 됩니다.

subject.onNext(0);
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
// 이하 생략...

Observable 과 Subject 의 가장 큰 차이는 내부에서 데이터를 주입하느냐 외부에서 주입받느냐의 차이로 이해하시면 됩니다.

비동기 처리 하기

RxJava 를 사용하는 가장 큰 이유는 비동기처리에 굉장히 유연하고 쉽게 사용할 수 있다는 것에 있습니다.

RxJava 에서 주로 쓰는 비동기 쓰레드 선언은 다음과 같습니다.

  • Schedulers.io()
    • Network IO 나 File IO 등을 처리하기 위한 쓰레드
  • Schedulers.computation()
    • 단순 연산로직등에 사용되며 Event Looper 에 의해 동작하는 쓰레드
  • AndroidSchedulers.mainThread()
    • Android 에서 UI Thread 에서 처리하기 위한 쓰레드

이 외에도 trampoline(), immediately() 등이 있으나 주로 쓰는건 위의 3개로 이해하시면 됩니다.

RxJava 에서 비동기 쓰레드 선언은 데이터 주입과 데이터의 처리 시점을 위하여 있습니다.

  • 데이터 주입 : subscribeOn(쓰레드)
    • 데이터를 주입하는 시점에 대한 쓰레드 선언이며 모든 stream 내에서 최종적으로 선언한 쓰레드가 할당됩니다.
  • 데이터 처리 : observeOn(쓰레드)
    • 쓰레드를 선언한 다음부터 새로운 쓰레드가 선언되기 전까지 데이터 처리에 동작할 쓰레드를 할당합니다.

이제 비동기 처리된 코드를 보도록 하겠습니다.

Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onNext(4);
    emitter.onComplete();
  })
  .subscribeOn(Schedulers.computation())
  .observeOn(Schedulers.io())
  .filter(data -> (data % 2) != 0)
  .map(data -> (char) data + 'a')
  .observeOn(Schedulers.computation())
  .flatMap(value -> Observable.range(0, 10).map(index -> value))
  .collect(() -> new ArrayList<Character>(), (chars, value) -> chars.add(value))
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(values -> print(values));

위의 코드를 각각 기능을 나누어서 동작할 쓰레드를 구분하면 다음과 같습니다.

  • Observable.create() - Schedulers.computation()
  • filter() - Schedulers.io()
  • map() - Schedulers.io()
  • flatMap() - Schedulers.computation()
  • collect() - Schedulers.computation()
  • subscribe() - AndroidSchedulers.mainThread()

Wrap Up

RxJava 는 데이터 비동기 처리를 효율적으로 하기 위해 나온 라이브러리입니다.
데이터 단위로 처리해야 할 기능을 선언하고 비동기 선언이 쉽게 될 수 있도록 지원하고 있습니다.

Observable 은 1개 이상의 무제한 데이터를 처리하는 기능을 지원합니다.
Subject 는 1개 이상의 무제한 데이터를 처리할 수 있지만 Observable 과의 차이는 외부에서 데이터를 주입할 수 있습니다.

쓰레드 선언은 subscribeOnobserveOn 으로 나뉘며 각각 데이터 주입과 데이터 처리 시점을 위해 선언됩니다.
주로 사용되는 쓰레드는 Schedulers.io()Schedulers.computation() 입니다.
Android 의 UI Thread 는 AndroidSchedulers.mainThread() 를 사용하시면 됩니다.

RxJava 는 다양한 처리 기능을 제공합니다.
본문 예로 든 filter, map, flatMap 등은 극히 예시의 일부이며 자세한 정보는 Rx 의 문서를 참고하시길 바랍니다.

0 articles

results for ""

    No results matching ""