javascript - rxjs/redux-observer: observable.retry to re-establish connection -
i using elixir phoenix websocket in application i'm building , have epic looks this:
const socketobservable = observable.create((observer: object) => { const socket = new socket(`${getwebsocketurl()}/socket`, { params: { token: readsession(), } }); socket.connect(); socket.onopen(() => observer.next({ type: socket_connected, socket }), ); socket.onerror((error) => observer.error({ type: websocket_error, error }), ); return () => { // socket.disconnect(); }; }); const connecttosocket = ( action$: object, ) => action$.oftype(connect_to_socket) .switchmap(() => socketobservable .catch((error) => observable.of(error)), ) .retry(); export default connecttosocket;
what happen user notified when network connection goes away emitting { type: websocket_error, error }
, have notification removed when connection reestablished emitting { type: socket_connected, socket }
. got first part working, when re-connection happens, { type: socket_connected, socket }
never dispatches. using redux-saga, able make work using following code:
const connecttosocket = (): object => eventchannel((emitter: (object) => mixed) => { const socket = new socket(`${getwebsocketurl()}/socket`, { params: { token: readsession(), } }); socket.connect(); socket.onopen(() => emitter({ socket })); socket.onerror((error) => { emitter({ error }); }); return () => { // socket.disconnect(); }; }); export function* callconnecttosocket(): generator<ioeffect, *, *> { const chan = yield call(connecttosocket); while (true) { const { socket, error } = yield take(chan); if (socket) { yield put({ type: socket_connected, socket }); } else { yield put({ error, type: websocket_error }); } } } export function* watchconnecttosocket(): generator<ioeffect, *, *> { yield takelatest(connect_to_socket, callconnecttosocket); }
for rxjs code, thought tacking .retry()
@ end of chain supposed trigger retry of source observable if en error emitted per documentation rxjs observable.retry, may don't understand retry
supposed or how use properly. may can achieve want.
for retry
operator take effect, source observable has produce error. , seems in example error notification never reaches retry
gets swallowed catch
operator recovers error.
to make work, may try make catch
operator return observable first emits action , produces error:
const connecttosocket = action$ => actions$.oftype(connect_to_socket) .switchmap(() => socketobservable .catch(error => observable.of(error).concat(observable.throw(error))) ) .retry();
update:
i think worth mentioning rx
follows grammar next* (complete|error)?
, meaning next()
calls come after error()
on same observer have no effect. so, if socket
recovers error , executes onopen
callback after has executed onerror
, socket_connected
notification not reach consumer.
this possibly handled either replacing error
next
notifications or restarting socketobservable
each time error occurs, meaning new socket
instance created (but not want).
here's runnable code sample demonstrating how retry
might work:
const { createstore, applymiddleware } = redux; const { createepicmiddleware } = reduxobservable; const socketobservable = rx.observable.create(observer => { const t1 = settimeout(() => observer.next({ type: "socket_connected" }), 200); const t2 = settimeout(() => observer.error({ type: "socket_error" }), 400); return () => { cleartimeout(t1); cleartimeout(t2); }; }) const connecttosocket = action$ => action$ .do(action => console.log(action)) .oftype("connect_to_socket") .switchmap(() => socketobservable .catch(error => rx.observable.of(error).concat(rx.observable.throw(error))) // make 2 attempts re-connect, i.e. restart socketobservable .retry(2) ) // recover in case if both attempts reconnect have failed .retry(); const store = createstore( (state, action) => state, applymiddleware(createepicmiddleware(connecttosocket))); // dispatch connect_to_socket 2 times rx.observable.interval(2000) .take(2) .subscribe(x => store.dispatch({ type: "connect_to_socket" }));
<script src="https://unpkg.com/rxjs@5.4.2/bundles/rx.min.js"></script> <script src="https://unpkg.com/redux@3.7.2/dist/redux.min.js"></script> <script src="https://unpkg.com/redux-observable@0.14.1/dist/redux-observable.min.js"></script>
Comments
Post a Comment