Xử lý Backpressure với RxJava - Android Mastery Dan Tech

Android Thread: Kiến Thức RxJava Backpressure

Ở bài viết trước chúng ta đã hiểu về RxJava và 1 số hỗ trợ của nó trong việc lập trình phần mềm Java / Kotlin. Tiếp tục ở bài này chúng ta nói về 1 khái niệm thú vị trong lập trình Backpressure.

Backpressure là gì?

Backpressure là hiện tượng Cung nhiều hơn Cầu.

Vài ví dụ trong thực tiễn

Ví dụ 1: Tại rạp chiếu phim ABC chỉ có một cổng bán vé. Hôm nay có bộ phim mới rất Hot ra mắt khiến cho hàng trăm người đến xếp hàng mua vé. Cổng bán vé có khả năng xử lý yêu cầu của 1 khách trong vòng 3 phút, hiện tại hàng còn 200 người đang đợi mua vé và chỉ còn 45 phút nữa là phim chiếu.

Lúc này cho dù cổng bán vé làm việc hết công suất thì đến lúc phim chiếu cũng chỉ có thể xử lý cho khoảng 15 khách nữa, số còn lại sẽ không vào rạp kịp để xem phim đúng giờ. -> Backpressure lên hệ thống xử lý bán vé thủ công của rạp chiếu phim

Ví dụ 2: Năm 2069 lượng mưa trên thượng nguồn quá lớn làm cho lưu lượng nước chảy xuống các đập thủy điện nhanh và rất bất ngờ. Các van điều nước của đập mặc dù đã hoạt động hết công suất nhưng vẫn không thể điều phối lưu lượng nước 1 cách hiệu quả, áp lực nước càng ngày càng tăng lên hệ thống đập. Các giám đốc, lãnh đạo của đơn vị đập thủy điện buộc phải đưa ra quyết định mở đập xả lũ trong vòng 12h tiếp theo để giảm áp lực lên hệ thống điều phối nước. -> Backpressure lên hệ thống điều phối nước của đập thủy điện, buộc phải có phương án nhất thời để bảo vệ sức chịu tải của đập.

Vài ví dụ trong lập trình

Ví dụ 1: Vào ngày BackFriday lượng khách hàng tham gia mua hàng của các sàn thương mại điện tử gia tăng đột biến trong khung giờ 11h sáng và 11h đêm. Với hệ thống hiện tại không thể xử lý 1 cách mượt mà các giao dịch của khách hàng. -> Backpressure lên hệ thống đăng nhập, thanh toán của ứng dụng mua hàng.

Ví dụ 2: Bạn là một Mobile Engineer, thực hiện mở một Socket Connection để nhận về dữ liệu Real time Crypto Market. Socket sẽ trả về dữ liệu cho bạn khoảng 3 – 5s 1 lần. Trong khoảng thời gian này Mobile đủ sức để nhận, xử lý và Render trên ứng dụng. Tuy nhiên, 1 hôm Elon Musk đã đăng một dòng trạng thái trên X.com để shill Doge coin, và rồi mọi thứ điên rồ xuất hiện khi các Trader vào, ra lệnh liên tục làm cho hệ thống Crypto Market phản hồi liên tục, và trả về dữ liệu Market cho bạn với tần suất 500ms 1 lần. Lúc này thiết bị di động của bạn đã bị đơ thì quá nhiều phản hồi từ Socket -> UI Render -> trải nghiệm khó chịu cho khách hàng -> Backpressure lên hệ thống xử lý & Render của ứng dụng Mobile.

Khái niệm backpressure trong lập trình

Backpressure là hiện tượng xảy ra sự chênh lệch giữa tốc độ xử lý của ConsumerProducer theo hướng tốc độ Emit 1 Item của Producer nhanh hơn tốc độ xử lý 1 Item của Consumer.

Backpressure is a situation where there is an imbalance between the consumer and the producer. In this case, the producer generates items faster than the consumer can process them.

Learn this, win the Interview in 30s

Backpressure trong RxJava xảy ra khi nào

Trong RxJava chúng ta có các Observable Object có khả năng Emit items, và các Subscriber sẽ nhận các Item này và xử lý theo từng nhu cầu riêng.

Backpressure xảy ra khi tốc độ gọi onNext của emitter nhanh hơn tốc độ xử lý ở đầu subscriber

Backpressure trong Observable

Observable.create {
    var i = 0L
    // dummy call to emit items
    while (true) {
        i += 1
        it.onNext(LongArray(DummyDataSize) { i }) // simulate large data
        Thread.sleep(10) // emit every 10 ms
    }
} .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.computation())
  .subscribe({
      Thread.sleep(100) // simulate computation
      Log.d("BackpressureViewModel", "onRxJavaObservableBackpressure: ${it[0]}")
  }, {
      Log.d("BackpressureViewModel", "onRxJavaObservableBackpressure: Error: $it")
  })

Xem ví dụ trên đây bạn có thể thấy, ở trong scope của hàm create, chúng ta emit các item với tốc độ 10ms 1 lần. Nhưng ở đầu subscribe chúng ta xử lý mỗi item tốn tới 100ms. Như vậy xử lý xong 1 item đã có 10 items đang đợi sẵn để được xử lý tiếp; xử lý 10 items thì đã có 90 items đang đợi nữa rồi và con số sẽ tăng lên mãi không ngừng đến khi Resource của thiết bị không thể đáp ứng được sẽ xảy ra Crash.

Các Crash trong trường hợp này thường là OutOfMemory Exception. Ở đây hệ thống không ghi nhận bạn code sai, vì code thật sự không sai. Chỉ là đang quản lý Memory chưa được tốt nên xảy ra tình trạng này.

Backpressure trong Subject

PublishSubject.create()
   .subscribeOn(Schedulers.io())
   .observeOn(Schedulers.computation())
   .subscribe({
      Thread.sleep(100) // simulate computation
      Log.d("BackpressureViewModel", "onRxJavaSubjectBackpressure: ${it[0]}")
  }, {
      Log.d("BackpressureViewModel", "onRxJavaSubjectBackpressure: Error: $it")
  })
  
// dummy call to emit items
var i = 0L
while (true) {
    i += 1
    subject.onNext(LongArray(DummyDataSize) { i }) // simulate large data
    Thread.sleep(10) // emit every 1 ms
}

Subject có cách hoạt động khác 1 chút so với Observable nên bạn có thể thấy ta được quyền chủ động emit item cho Subject ở một nơi bất kỳ, không cần thiết phải trong scope của create giống như Observable.

Trong 2 ví dụ trên, khi bạn set trị số cho DummyDataSize càng lớn, thì Backpressure xảy ra càng nhanh, OutOfMemory xảy ra càng nhanh.

Hãy checkout source code tại GitHub của mình để tìm hiểu thêm nhé: https://github.com/dantech0xff/AndroidConcurrencyExamples

Các hướng tiếp cận Backpressure trong RxJava

99% Backpressure sẽ gây ra OutOfMemory. Ngoài OutOfMemory Exception, trước khi nó xảy ra User đã phải trải qua 1 số trải nghiệm tệ trên phần mềm: đơ, lag, chậm, hoặc nóng máy.

Để xử lý Backpressure chỉ có 1 cách, là đừng để nó xảy ra. Để làm được điều đó bạn có thể tham khảo 3 hướng dưới đây

Throttle

Throttle – dịch trắng nghĩa là bóp cổ.

Throttle trong RxJava để phân tán đầu ra của data stream sao cho chúng cách đều nhau ít nhất 1 khoảng thời gian (t), trong lúc phân tán ta có thể sẽ loại bỏ bớt 1 số item bị thừa. Có 3 kiểu Throttle bạn có thể tìm hiểu

Throttle First

RxJava Throttle First mechanic – Source
observable.throttleFirst(1000, TimeUnit.MILLISECONDS)

Throttle First là một giải pháp, loại bỏ bớt các emitted items dư thừa nếu chúng được emit trong một khoảng thời gian nhất định.

Giả sử item đầu tiên của throttleFirst(1000, TimeUnit.MILLISECONDS) nhận vào lúc timestampMillis = t, thì trong khoảng thời gian t + 1000 sẽ không có bất cứ item nào được subscriber xử lý nữa. Item đầu tiên được emit từ mốc thời gian t + 1000 trở đi sẽ được subscriber xử lý, và mốc t được reset lại như mới.

Câu hỏi phỏng vấn: Throttle First có nhận được đầy đủ Item đầu tiên và cuối cùng của Observable không?

Trả lời: Throttle First nhận được Item đầu tiên từ Observable. Tuy nhiên trường hợp Last item được emit trong khoảng thời gian bị throttle (bị drop) thì subscriber sẽ không thể nhận được, dẫn đến không có last item.

Throttle Last

RxJava Throttle Last mechanic – Source
throttleLast(1000, TimeUnit.MILLISECONDS)

Tương tự với cách hoạt động của Throttle First, tuy nhiên ở cách xử lý này chúng ta sẽ chỉ nhận Item được emit cuối cùng trong khoảng thời gian t + 1000 với t là thời điểm subscribe, 1000 là deltatime của throttleLast.

Câu hỏi phỏng vấn: Throttle Last có nhận đủ Item đầu tiên và cuối cùng từ Observable không?

Trả lời: Throttle Last có thể sẽ không nhận được Item đầu tiên từ Observable, nếu có một item được emit ngay sau đó, và đảm bảo vẫn đang trong khoảng thời gian Throttle. Throttle Last cũng có thể không nhận được Item cuối cùng từ Observable, nếu nó được emit trong 1 khung thời gian Throttle, và liền sau đó là một onComplete.

Throttle Latest

RxJava Throttle Latest mechanic – Source

Throttle Latest là hướng tiếp cận giúp Subscriber nhận được Item đầu tiên từ Observable giống như Throttle First, từ Item thứ 2 trở đi sẽ được xử lý giống như Throttle Last.

Ứng dụng Throttle

Throttle có thể được dùng để Comsumer phân tán tần suất nhận dữ liệu một cách đều đặn. Có thể ví dụ: Game Event, Video Streaming, Web Socket Event, …

Debounce

RxJava Debounce mechanic – Source

Debounce là một hướng tiếp cận hoàn toàn khác với Throttle. Trong khi Throttle sẽ chặn việc emit Item theo 1 khung thời gian, thì Debounce lại linh hoạt kéo dãn cái khung thời gian đó ra (có thể đến vô tận nếu thời gian giữa 2 lần emit < debounce time)

Debounce có thể được sử dụng trong các xử lý ta cần hạn chế spam event. Có thể ví dụ: Action Click Event, Search Text, . ..

Flowable

Flowable không phải là một operator trong RxJava. Flowable là một Alternative của Observable với các hỗ trợ xử lý Backpressure mạnh mẽ và tiện dụng hơn.

Flowable hỗ trợ 5 chiến lược xử lý Backpressure

Chiến lược BackpressureGiải thích
MISSINGTrả ra một Error MissingBackpressureException nếu Consumer không thể xử lý kịp Backpressure (Không crash)
ERRORTương tự như MISSING, trả ra một MissingBackpressureException để báo hiệu Consumer không thể xử lý kịp Backpressure (Không crash)
BUFFERConsumer sẽ cố gắng xử lý nhiều nhất có thể những dữ liệu được gửi xuống từ Flowable, trong trường hợp OutOfMemory sẽ Crash app
DROPTrong trường hợp Consumer không thể xử lý kịp các Emit từ Producer, và Buffer đã đầy thì Producer sẽ chủ động DROP (bỏ bớt) Item đang đợi, khi nào Buffer được trống, Producer sẽ push Item mới nhất vào trong Buffer để cho Consumer xử lý.
LATESTLATEST cũng tương tự với DROP nhưng behavior khác.
Trong trường hợp Buffer bị đầy, Producer với chiến lược LATEST cache 1 Item cuối cùng nó đang giữ. Khi nào Buffer available trở lại, Producer sẽ push Item này vào Buffer để đưa đến Consumer. -> Cách làm này giúp tránh mất data của Latest Item.
Bảng mô tả các chiến lược xử lý Backpressure của RxJava Flowable

Mặc dù Flowable đã có giải pháp để xử lý Backpressure, tuy nhiên ta vẫn có thể gặp OOM Crash trong trường hợp Emit Item có dung lượng quá lớn. Lúc này tuy Buffer chưa được đầy, nhưng bộ nhớ máy đã bị chiếm gần hết dẫn đến OutOfMemory Exception. Khi gặp các trường hợp như thế này ta buộc phải quay lại tìm hiểu vấn đề Memory xảy ra ở đâu, và tìm cách tối ưu chúng.

Happy Coding!