iOS - How to wait for all observables within a mutable array to finish in RxSwift


My expectation is to add observables on the fly (e.g. image uploads), let them start, and, once I have finished dynamically enqueueing everything, wait for all of the observables to finish.

Here is my class:

/// Collects observables that are started as soon as they are enqueued and that
/// can later be awaited together.
///
/// NOTE: `enqueue(observable:)` subscribes to the observable it is given, and
/// `waitForAllObservablesToBeFinished()` zips those same stored observables.
/// Subscribing to the zipped result therefore subscribes to each of them a
/// second time; for an observable that performs its work on subscription, that
/// work runs again.
open class InstantObservables<T> {
    /// Bag that owns every subscription created by `enqueue(observable:)`.
    lazy var disposeBag = DisposeBag()

    public init() { }

    /// Enqueued observables, in enqueue order.
    lazy var observables: [Observable<T>] = []
    /// Subscriptions created by `enqueue(observable:)`, index-aligned with `observables`.
    lazy var disposables: [Disposable] = []

    /// Stores `observable` and subscribes to it immediately so that it starts right away.
    ///
    /// - Parameter observable: The observable to start and keep track of.
    open func enqueue(observable: Observable<T>) {
        observables.append(observable)

        let disposable = observable
            .subscribe()

        // Kept in the array for per-index cancellation, and in the bag for bulk cancellation.
        disposables.append(disposable)

        disposable
            .addDisposableTo(disposeBag)
    }

    /// Disposes the subscription at `index` and forgets the matching observable.
    ///
    /// Does nothing when `index` is out of range for either array.
    ///
    /// - Parameter index: Position of the entry, in enqueue order.
    open func removeAndStop(atIndex index: Int) {
        guard observables.indices.contains(index)
            && disposables.indices.contains(index) else {
            return
        }
        let disposable = disposables.remove(at: index)
        disposable.dispose()

        _ = observables.remove(at: index)
    }

    /// Returns the zip of every observable enqueued so far, then empties the queue.
    ///
    /// The subscriptions already created stay in `disposeBag`; only the arrays
    /// are cleared.
    ///
    /// - Returns: An observable built with `Observable.zip` over the enqueued observables.
    open func waitForAllObservablesToBeFinished() -> Observable<[T]> {
        let multipleObservable = Observable.zip(observables)
        observables.removeAll()
        disposables.removeAll()
        return multipleObservable
    }

    /// Replaces the dispose bag with a fresh one.
    ///
    /// NOTE(review): relies on the previous `DisposeBag` disposing its
    /// subscriptions when it is released - confirm against the RxSwift version in use.
    open func cancelObservables() {
        disposeBag = DisposeBag()
    }
}

But when I subscribe to the observable sent back by waitforallobservablestobefinished(), all of them are re-executed (which is logical, given how Rx works).

How can I guarantee that each one is executed only once, whatever the number of subscriptions?

While writing this question, I found the answer! By altering the observable through sharereplay(1), then enqueuing and subscribing to this altered observable, it works!

Here is the updated code:

/// Collects observables that are started as soon as they are enqueued and that
/// can later be awaited together without being executed a second time.
///
/// Each enqueued observable is wrapped with `shareReplay(1)` before it is
/// stored and subscribed to, so the subscription made by
/// `waitForAllObservablesToBeFinished()` shares the one already running instead
/// of starting the work again.
open class InstantObservables<T> {
    /// Bag that owns every subscription created by `enqueue(observable:)`.
    lazy var disposeBag = DisposeBag()

    public init() { }

    /// Shared versions of the enqueued observables, in enqueue order.
    lazy var observables: [Observable<T>] = []
    /// Subscriptions created by `enqueue(observable:)`, index-aligned with `observables`.
    lazy var disposables: [Disposable] = []

    /// Wraps `observable` with `shareReplay(1)`, stores it and subscribes to it
    /// immediately so that it starts right away.
    ///
    /// - Parameter observable: The observable to start and keep track of.
    open func enqueue(observable: Observable<T>) {
        // The shared wrapper is what gets stored and zipped later, so later
        // subscribers attach to this running subscription.
        let shared = observable.shareReplay(1)
        observables.append(shared)

        let disposable = shared
            .subscribe()

        // Kept in the array for per-index cancellation, and in the bag for bulk cancellation.
        disposables.append(disposable)

        disposable
            .addDisposableTo(disposeBag)
    }

    /// Disposes the subscription at `index` and forgets the matching observable.
    ///
    /// Does nothing when `index` is out of range for either array.
    ///
    /// - Parameter index: Position of the entry, in enqueue order.
    open func removeAndStop(atIndex index: Int) {
        guard observables.indices.contains(index)
            && disposables.indices.contains(index) else {
            return
        }
        let disposable = disposables.remove(at: index)
        disposable.dispose()

        observables.remove(at: index)
    }

    /// Returns the zip of every observable enqueued so far, then empties the queue.
    ///
    /// The subscriptions already created stay in `disposeBag`; only the arrays
    /// are cleared.
    ///
    /// - Returns: An observable built with `Observable.zip` over the enqueued observables.
    open func waitForAllObservablesToBeFinished() -> Observable<[T]> {
        let multipleObservable = Observable.zip(observables)
        observables.removeAll()
        disposables.removeAll()
        return multipleObservable
    }

    /// Cancels everything that is currently enqueued.
    ///
    /// Replaces the dispose bag with a fresh one and forgets the enqueued
    /// entries.
    ///
    /// NOTE(review): relies on the previous `DisposeBag` disposing its
    /// subscriptions when it is released - confirm against the RxSwift version in use.
    open func cancelObservables() {
        disposeBag = DisposeBag()

        // Forget the cancelled entries as well. Otherwise a later
        // `waitForAllObservablesToBeFinished()` would zip them, and so
        // subscribe again to work that was just cancelled.
        observables.removeAll()
        disposables.removeAll()
    }
}

Comments

Popular posts from this blog

php - Vagrant up error - Uncaught Reflection Exception: Class DOMDocument does not exist -

vue.js - Create hooks for automated testing -

Add new key value to json node in java -