RxJava trong Android -Android Mastery
RxJava trong Android -Android Mastery

Android Thread: Kiến Thức RxJava

Có nhiều cách để thao tác với Thread trong Android: Thread, ThreadPoolExecutor, HandlerThead hoặc RxJava hoặc Kotlin Coroutines. Trong chuỗi bài viết này mình sẽ mang đến đầy đủ 3 mảng kiến thức để làm rõ các khái niệm, cách sử dụng, ưu nhược điểm của 3 phương pháp này cho bạn đọc.

RxJava là gì?

Khái niệm

RxJava là thư viện viết bởi ngôn ngữ Java, với mục đích đơn giản hóa việc tiếp nhận và xử lý logic bất đồng bộ nhiều Thread trong chương trình. RxJava nhìn nhận logic chúng xử lý là 1 data stream (luồng dữ liệu). Trên các data stream này RxJava cung cấp các hỗ trợ đắc lực để thao tác, gọi là các operator.

Bạn có thể gọi các Data Stream có thể được gọi là các Observable Object (đối tượng mà có thể được theo dõi sự thay đổi dữ liệu)

Các loại Observable Object trong RxJava

Trong lập trình có 2 khái niệm Hot Stream, Cold Stream dùng để chỉ các kiểu luồng dữ liệu với các đặc tính riêng.

Việc quy hoạch các loại Observable Object trong RxJava cũng sẽ được mình phân loại trên 1 trong 2 loại này. Các tiêu chuẩn phân loại dựa trên sự khác biệt của thời điểm mà object đó emit item, và cách chúng quản lý các Observer Object.

TypeSpecsVí dụ
Cold Observable (Cold Stream)– Chỉ emit data khi có subscriber.
– Mỗi subscriber của Cold Observable sẽ nhận một data stream độc lập riêng biệt.
– Query data base theo các điều kiện có sẵn
– Gọi và xử lý REST API
– Một task vụ long run, và yêu cầu kết quả trả về (Scale Image Bitmap, Clean cache, long run calculation, ..)
Hot Observable (Hot Stream)– Có thể emit data ngay cả khi không có subscriber nào.
– Các Subcribers chia sẻ chung một source data stream, điều này đồng nghĩa với các subscriber đăng ký nhận data sau sẽ có khả năng bị thiếu data
– Đăng ký newsletter cho blog, người đăng ký sau sẽ không thể nhận đủ các newsletter đã có trước đó.
– Live stream game / bóng đá / phát sóng tivi, người join sau sẽ phải xem tiếp tục chương trình đã có sẵn.
Phân loại Observable Object trong RxJava
Observable ObjectTính năng đặc biệt
SingleKhi chạy sẽ emit duy nhất 1 item, hoặc 1 callback onError. Kiểu của Item sẽ phụ thuộc vào định nghĩa ban đầu của Single<T>
CompletableKhông emit item nào, nhưng sẽ trả về callback onComplete, hoặc onError khi chạy xong.
ObservableCó thể emit nhiều hơn 1 item, vẫn có callback onError giống các Observable khác. (Giống Single, nhưng có thể emit nhiều hơn 1 item)
FlowableGiống với Observable, nhưng có hỗ trợ các option để xử lý Backpressure
Các Cold Observable trong RxJava
Observable ObjectTính năng đặc biệt
PublishSubjectLà một stream có thể có nhiều Subscriber cùng 1 lúc. PublishSubject emit dữ liệu theo thời gian thực, các Subscriber tham gia vào luồng dữ liệu sau sẽ không nhận được các item đã emit trước đó.
BehaviorSubjectLà 1 stream tương tự PublishSubject nhưng đặc biệt hơn ở chỗ các Subscriber tham gia vào luồng dữ liệu sau sẽ nhận được 1 item gần nhất đã emit trước đó trước khi nhận các emit item mới.
Các Hot Observable trong RxJava

Các loại Operator trong RxJava

RxJava hỗ trợ nhiều operator cho việc thao tác, xử lý. Việc học thuộc hết cách sử dụng RxJava operator là vô nghĩa. Hãy tiếp thu, tham khảo và vận dụng chúng phù hợp theo từng bài toán cụ thể của bạn!

RxJava operatorÝ nghĩaExample
mapmap item (được emit) của Observable thành một kiểu dữ liệu mới.
.map { it -> it * it }
Dùng để thay đổi kiểu dữ liệu trước khi Subscriber nhận được kết quả. Thường dùng trong các logic lấy dữ liệu từ Raw Datasource (Database, Network, calculation, ..) trước khi đưa lên View.
flatMapCũng là map, nhưng đối tượng trả về ở đây bắt buộc là một Observable Object. Có thể hiểu rằng bạn đang dùng kết quả trả về của 1 emission trước đó cho một Observable tiếp theo (chaining).
.flatMap { it -> Observable.just(it * 2) }
Dùng để chaining nhiều Observable liên tiếp nhau, kết quả emission của Observable này là input của Observable tiếp theo. Khiến cho logic và code dễ đọc, dễ hiểu + debug.
Ví dụ điển hình: Bạn đang có danh sách 100 image urls được lưu trữ trong local database, và cần download chúng một cách tuần tự. Giải pháp lúc này là chúng ta có thể lấy ra danh sách 100 urls này, và cho đi download lần lượt thông qua toán tử flatMap
filterLà một toán tử để lọc các item được emit theo 1 điều kiện nhất định trước khi được subscriber nhận và xử lý.Trong một vài logic cụ thể bạn cần lọc các kết quả trước khi xử lý để giảm tải logic ở đầu subscriber thì có thể dùng filter.
Ví dụ bài toán: là ở đầu network trả về liên tục các reactions, comments trong 1 phiên live stream. Tuy nhiên vì số lượng quá nhiều, bạn chỉ muốn filter các reaction thả tim để hiển thị lên màn hình user. Thì đây là 1 toán tử phù hợp để sử dụng.
distinctCũng tương tự filter nhưng gắt gao, khắt khe hơn. Distinct có nhiều biến thể, về cơ bản là nó filter để cho đầu subscriber không phải nhận những item bị trùng lắp nhau.Tự search nha fen. Quá dễ rồi.
distinctUntilChangedTương tự distinct, nhưng nó không cho phép 2 value trùng nhau được nhận và xử lý liên tục ở đầu subscriber (down stream).
throttleFirstthrottle = bóp cổ
first = đầu tiên

Việc throttle sẽ được trigger ngay sau khi nhận item đầu tiên (sẽ không có delay cho item đầu tiên)
Lười viết quá, khi nào code khắc biết.
Thường dùng trong các xử lý UI Event, hoặc cũng có thể là data stream.
throttleLastthrottle = bóp cổ
last = cuối cùng
—-
Việc throttle sẽ được trigger ngay từ đầu (sẽ luôn có delay)
Thích thì dùng, không thì thôi cũng được. Tùy chiến thuật mà mình chọn.
debouncedebouce = bật ngược ra / từ chối
——
Khi có 1 item được emit, debouce sẽ chưa truyền nó xuống down stream (subscriber) ngay mà vẫn hold tại đó, tiếp tục đợi hết thời gian debouce.
Nếu trong thời gian đợi debouce mà lại tiếp tục có item được emit thì sẽ drop (loại bỏ) item đang hold, giữ item mới và tiếp tục đợi 1 khoảng debouce time.
Cứ thể lặp mãi cho đến khi trong debouce time không có item nào được emit nữa thì down stream (subscriber) mới được nhận và xử lý.
Xài cái này thì phải cẩn thận kẻo không bao giờ nhận được item luôn.
Kinh nghiệm của mình có thể dùng nó trong search typing text, hoặc xử lý các strictly button event để tránh spam.
Bảng mô tả ngắn gọn 1 số operator mà mình hay dùng trong quá trình làm việc với RxJava – Android

Backpressure trong RxJava?

Cái này để dành viết riêng 1 bài nha. Kiến thức của nó dài, ảo diệu và khó hình dung.

RxAndroid là gì?

RxAndroid là một extension của RxJava dành riêng cho Android Platform. Nó cung cấp các tích hợp để RxJava hoạt động tốt, ổn định trên hệ điều hành Android.

Câu hỏi: Vì sao cần RxAndroid? Nếu không có RxAndroid thì có dùng được RxJava trong Android không?

Trả lời: Cần RxAndroid để RxJava có thể hoạt động được với các Android Component. Nếu không có RxAndroid, RxJava vẫn có thể hoạt động được. Nhưng không thể tích hợp được với Android Component (Fragment, Activity, UI Thread …)

usersObservable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread()) // RxAndroid extension
    .subscribe(users -> {
        // Update the UI with the list of users
    });

Một số điểm mạnh của RxJava so với cách lập trình cũ

RxJava dễ sử dụng, dễ hiểu với người có nền tảng tốt về Threading, lập trình Multi Threads.

  • RxJava sử dụng ThreadPool và được quy hoạch trong các Schedulers, giúp cho code trở nên sạch và tường minh hơn. Schedulers.io, Schedulers.computation. Điều này cũng giúp cho phần mềm viết ra tận dụng được tốt tài nguyên trong chương trình.
  • RxJava quản lý việc subscribe lên các Observable Object bằng các Disposable Object. Điều này đồng nghĩa lập trình viên có thể chủ động quản lý lifecycle của các data stream. Chủ động tránh memory leaks.
  • RxJava cung cấp giải pháp quản lý Error thông qua callback onError cho subscriber. Chương trình viết với RxJava sẽ an toàn hơn với các lỗi crash trong quá trình xử lý ở upstream.
  • RxJava cung cấp giải pháp switch thread một cách dễ hiểu, dễ sử dụng thông qua 2 phương thức subscribeOn, observeOn

Một số drawback của RxJava

Gọi là drawback của RxJava mà không phải disadvantages vì đây sẽ luôn là vấn đề của multi threading. Cái mình muốn truyền đạt ở đây là chỉ ra vấn đề còn tồn đọng để tránh khi làm việc. Thật ra không thể có giải pháp hoàn hảo dù chúng ta dùng công cụ nào cho multi threading programming.

  • RxJava sử dụng ThreadPool, nhưng nếu tất cả các Thread trong ThreadPool đều đang được tận dụng thì RxJava sẽ tạo thêm Thread. Điều này vô tình vẫn gây ra overhead cho ứng dụng. Xin nhấn mạnh rằng đây không phải là nhược điểm, đây là sự thật hiện hữu trong lập trình. Mình chỉ ra để chúng ta hiểu RxJava không phải là thuốc giải cho mọi vấn đề trong multi thread Java / Android.
  • Callback hells – đương nhiên rồi vì multi thread là callback hells. It’s what it’s
  • Khi nào nghĩ ra mình sẽ viết vào đây tiếp!