ios - How to wait for all observable within a mutable array to finish in RxSwift -
my expectation add observables on-the-fly (eg: images upload), let them start, and, when finished dynamically enqueueing everything, wait observable finished.
here class :
open class instantobservables<t> { lazy var disposebag = disposebag() public init() { } lazy var observables: [observable<t>] = [] lazy var disposables: [disposable] = [] open func enqueue(observable: observable<t>) { observables.append(observable) let disposable = observable .subscribe() disposables.append(disposable) disposable .adddisposableto(disposebag) } open func removeandstop(atindex index: int) { guard observables.indices.contains(index) && disposables.indices.contains(index) else { return } let disposable = disposables.remove(at: index) disposable.dispose() _ = observables.remove(at: index) } open func waitforallobservablestobefinished() -> observable<[t]> { let multipleobservable = observable.zip(observables) observables.removeall() disposables.removeall() return multipleobservable } open func cancelobservables() { disposebag = disposebag() } }
but when subscribe observable sent waitforallobservablestobefinished()
, of them re-executed (which logic, regarding how rx works).
how warranty each executed once, whatever number of subscription ?
while writing question, got answer ! altering observable through sharereplay(1)
, , enqueuing , subscribing altered observable.. works !
here updated code :
open class instantobservables<t> { lazy var disposebag = disposebag() public init() { } lazy var observables: [observable<t>] = [] lazy var disposables: [disposable] = [] open func enqueue(observable: observable<t>) { let shared = observable.sharereplay(1) observables.append(shared) let disposable = shared .subscribe() disposables.append(disposable) disposable .adddisposableto(disposebag) } open func removeandstop(atindex index: int) { guard observables.indices.contains(index) && disposables.indices.contains(index) else { return } let disposable = disposables.remove(at: index) disposable.dispose() _ = observables.remove(at: index) } open func waitforallobservablestobefinished() -> observable<[t]> { let multipleobservable = observable.zip(observables) observables.removeall() disposables.removeall() return multipleobservable } open func cancelobservables() { disposebag = disposebag() } }
Comments
Post a Comment