Subject 是一種橋樑或代理,在 ReactiveX 的某些實作中可用,它同時扮演觀察者 (observer) 和 Observable 的角色。由於它是觀察者,它可以訂閱一個或多個 Observable;由於它是 Observable,它可以重新發射它觀察到的項目,並且也可以發射新的項目。
因為 Subject 訂閱了一個 Observable,它會觸發該 Observable 開始發射項目(如果該 Observable 是「冷的」,也就是說,它會在訂閱後才開始發射項目)。這可能會使最終的 Subject 成為原始「冷的」Observable 的「熱的」Observable 變體。
在大多數 ReactiveX 實作中,尤其是在可以多執行緒環境中執行的實作中,subject 在其觀察者端 不 被視為執行緒安全。但是,observable 端,即 Subscribe()
永遠是執行緒安全的。
這表示從多個執行緒呼叫 OnNext
、OnError
或 OnCompleted
可能會導致未定義的狀態。
因此,大多數 ReactiveX 實作都提供一個特殊運算子,使觀察者端也具有執行緒安全。尋找 ToSerialized
運算子。
有四種 Subject
專為特定用例而設計。並非所有實作都提供所有這些種類,並且有些實作使用其他命名慣例(例如,在 RxScala 中,這裡所謂的「PublishSubject」簡稱為「Subject」)。
AsyncSubject
會發射來源 Observable 發射的最後一個值(而且只發射最後一個值),並且只有在來源 Observable 完成後才會發射。(如果來源 Observable 沒有發射任何值,則 AsyncSubject
也會在不發射任何值的情況下完成。)
它也會將相同的最終值發射給任何後續的觀察者。但是,如果來源 Observable 以錯誤終止,則 AsyncSubject
不會發射任何項目,而只會傳遞來自來源 Observable 的錯誤通知。
當觀察者訂閱 BehaviorSubject
時,它會先發射來源 Observable 最近發射的項目(如果尚未發射任何項目,則發射 seed/預設值),然後繼續發射來源 Observable 後續發射的任何其他項目。
但是,如果來源 Observable 以錯誤終止,則 BehaviorSubject
不會向後續的觀察者發射任何項目,而只會傳遞來自來源 Observable 的錯誤通知。
PublishSubject
只會向觀察者發射在訂閱時間之後來源 Observable 發射的項目。
請注意,PublishSubject
可能會在建立後立即開始發射項目(除非您已採取措施防止這種情況),因此可能會在建立 Subject 和觀察者訂閱它之間遺失一或多個項目。如果您需要保證來源 Observable 發射的所有項目都交付,您需要使用 Create
建立該 Observable,以便您可以手動重新引入「冷的」Observable 行為(在開始發射項目之前檢查是否所有觀察者都已訂閱),或切換為使用 ReplaySubject
。
如果來源 Observable 以錯誤終止,則 PublishSubject
不會向後續的觀察者發射任何項目,而只會傳遞來自來源 Observable 的錯誤通知。
ReplaySubject
會向任何觀察者發射來源 Observable 發射的所有項目,無論觀察者何時訂閱。
還有一些 ReplaySubject
的版本會在重播緩衝區威脅到超出一定大小或自項目最初發射以來經過指定的時間間隔後,丟棄舊項目。
如果您將 ReplaySubject
用作觀察者,請注意不要從多個執行緒呼叫其 onNext
方法(或其其他 on
方法),因為這可能會導致同時(非循序)呼叫,這違反了 Observable 合約,並在最終的 Subject 中產生關於哪個項目或通知應先重播的歧義。
在某些 ReactiveX 風格和版本中,例如 RxJava 3.x,還有一些可用的 Subject 類型,它們滿足一些額外的常見角色。
一個 Subject 會將事件排隊,直到單個觀察者訂閱它,將這些事件重播給它直到觀察者趕上,然後切換為將事件即時轉發給這個單個觀察者,直到這個 UnicastSubject 終止或觀察者處置。
表示一個熱的類 Single 來源和事件消費者,類似於 Subject。由於 Single 只能發射一個項目或錯誤,因此 SingleSubject 隱含地是一個類重播 Subject。
表示一個熱的類 Maybe 來源和事件消費者,類似於 Subject。由於 Maybe 只能發射一個項目、一個錯誤或變為已完成,因此 MaybeSubject 隱含地是一個類重播 Subject。
表示一個熱的類 Completable 來源和事件消費者,類似於 Subject。由於 Completable 只能完成或保留一個錯誤,因此 CompletableSubject 隱含地是一個類重播 Subject。
RxJava 2.x 和 RxJava 3.x 將具有背壓感知 (backpressure-aware) 的 Subject 定義為 Processor,其命名與上面的其他 Subject 非常相似。它們的行為幾乎相同,唯一的例外是如果訂閱者沒有請求更多項目,它們不會溢出訂閱者。一般來說,這些 Subject 不會在訂閱者之間協調,如果他們跟不上,可能會單獨使他們失敗。
上面圖示的一個特殊處理器 MulticastProcessor
會在其訂閱者之間協調背壓。
SingleSubject
、MaybeSubject
和 CompletableSubject
Subject 類型沒有處理器變體,因為它們不需要支援背壓,並且始終最多只能保留一個元素。
處理器也實作了 Reactive Streams Processor 介面,因此它們與 Java 中的 Reactive Streams 生態系統相容。
待定
待定
如果您有一個 Subject
,並且想要將其傳遞給其他代理,而無需公開其 Subscriber
介面,您可以使用呼叫其 asObservable
方法來遮罩它,這將以純 Observable
的形式傳回 Subject。
AsyncSubject
BehaviorSubject
PublishSubject
ReplaySubject
如果您有一個 Subject
,並且想要將其傳遞給其他代理,而無需公開其 Subscriber
介面,您可以使用呼叫其 asObservable
方法來遮罩它,這將以純 Observable
的形式傳回 Subject。
AsyncSubject
BehaviorSubject
PublishSubject
ReplaySubject
如果您有一個 Subject
,並且想要將其傳遞給其他代理,而無需公開其 Observer
介面,您可以使用呼叫其 hide
方法來遮罩它,這將以純 Observable
的形式傳回 Subject。
AsyncSubject
BehaviorSubject
PublishSubject
ReplaySubject
UnicastSubject
CompletableSubject
MaybeSubject
SingleSubject
待定
待定
AsyncSubject
BehaviorSubject
ReplaySubject
待定
待定
待定