javascript - RxJS/redux-observable: Observable.retry to re-establish connection


I am using an Elixir Phoenix WebSocket in an application I'm building, and I have an epic that looks like this:

    const socketObservable = Observable.create((observer: Object) => {
      const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
        token: readSession(),
      } });

      socket.connect();

      socket.onOpen(() =>
        observer.next({ type: SOCKET_CONNECTED, socket }),
      );

      socket.onError((error) =>
        observer.error({ type: WEBSOCKET_ERROR, error }),
      );

      return () => {
        // socket.disconnect();
      };
    });

    const connectToSocket = (
      action$: Object,
    ) => action$.ofType(CONNECT_TO_SOCKET)
      .switchMap(() =>
        socketObservable
          .catch((error) => Observable.of(error)),
      )
      .retry();

    export default connectToSocket;

What I want to happen is for the user to be notified when the network connection goes away, by emitting { type: WEBSOCKET_ERROR, error }, and then to have the notification removed when the connection is reestablished, by emitting { type: SOCKET_CONNECTED, socket }. I got the first part working, but when the re-connection happens, { type: SOCKET_CONNECTED, socket } never dispatches. Using redux-saga, I was able to make this work with the following code:

    const connectToSocket = (): Object =>
      eventChannel((emitter: (Object) => mixed) => {
        const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
          token: readSession(),
        } });

        socket.connect();

        socket.onOpen(() => emitter({ socket }));

        socket.onError((error) => {
          emitter({ error });
        });

        return () => {
          // socket.disconnect();
        };
      });

    export function* callConnectToSocket(): Generator<IOEffect, *, *> {
      const chan = yield call(connectToSocket);
      while (true) {
        const { socket, error } = yield take(chan);
        if (socket) {
          yield put({ type: SOCKET_CONNECTED, socket });
        } else {
          yield put({ error, type: WEBSOCKET_ERROR });
        }
      }
    }

    export function* watchConnectToSocket(): Generator<IOEffect, *, *> {
      yield takeLatest(CONNECT_TO_SOCKET, callConnectToSocket);
    }

For the RxJS code, I thought that tacking .retry() onto the end of the chain was supposed to trigger a retry of the source observable if an error is emitted, per the documentation for RxJS Observable.retry. Maybe I don't understand what retry is supposed to do or how to use it properly, or maybe there is another way to achieve what I want.

For the retry operator to take effect, the source observable has to produce an error. In your example, it seems the error notification never reaches retry, because it gets swallowed by the catch operator, which recovers from the error.

To make this work, you may try to make the catch operator return an observable that first emits the action and then produces an error:

    const connectToSocket = action$ =>
        action$.ofType(CONNECT_TO_SOCKET)
            .switchMap(() => socketObservable
                .catch(error => Observable.of(error).concat(Observable.throw(error)))
            )
            .retry();
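To see why rethrowing matters, here is a minimal sketch that does not use RxJS at all. The helpers `catchWith`, `retry`, `of`, and `ofThenThrow` are hypothetical stand-ins written only for this illustration (an "observable" is just a function that takes an observer), so treat it as a model of the behaviour rather than the library's implementation. It counts how often the failing source is subscribed to in each case.

```javascript
// Hypothetical stand-ins for Rx operators, NOT the RxJS API.
// An "observable" here is a plain function: (observer) => void.

// A source that fails immediately and counts its subscriptions.
const failing = (counter) => (observer) => {
  counter.subscriptions += 1;
  observer.error({ type: 'WEBSOCKET_ERROR' });
};

// Like catch: on error, continue with the observable returned by handler.
const catchWith = (source, handler) => (observer) =>
  source({
    next: (v) => observer.next(v),
    error: (e) => handler(e)(observer),
    complete: () => observer.complete(),
  });

// Like retry(count): on error, resubscribe to the source up to count times.
const retry = (source, count) => (observer) => {
  let attemptsLeft = count;
  const subscribe = () =>
    source({
      next: (v) => observer.next(v),
      error: (e) => {
        if (attemptsLeft > 0) { attemptsLeft -= 1; subscribe(); }
        else observer.error(e);
      },
      complete: () => observer.complete(),
    });
  subscribe();
};

// Emits one value and completes (recovers from the error).
const of = (value) => (observer) => { observer.next(value); observer.complete(); };
// Emits one value and then errors (lets the error through).
const ofThenThrow = (value) => (observer) => { observer.next(value); observer.error(value); };

function run(handler) {
  const counter = { subscriptions: 0 };
  const emitted = [];
  retry(catchWith(failing(counter), handler), 2)({
    next: (v) => emitted.push(v.type),
    error: () => emitted.push('error'),
    complete: () => emitted.push('complete'),
  });
  return { subscriptions: counter.subscriptions, emitted };
}

const swallowed = run((error) => of(error));          // retry never fires
const rethrown = run((error) => ofThenThrow(error));  // retry resubscribes twice
console.log(swallowed.subscriptions, rethrown.subscriptions);
```

In the first run the source is subscribed to once and the stream completes; in the second it is subscribed to three times (the initial attempt plus two retries) before the error finally reaches the consumer.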

Update:

I think it is worth mentioning that Rx follows the grammar next* (complete|error)?, meaning that any next() calls that come after error() on the same observer have no effect. So, if the socket recovers from an error and executes the onOpen callback after it has executed onError, the SOCKET_CONNECTED notification will not reach the consumer.
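The effect of that grammar can be sketched with a small guard around an observer. The `guard` function below is a hypothetical illustration of the contract, not RxJS code; it simply closes the observer on the first error() or complete() call.

```javascript
// Hypothetical guard that mimics the Rx contract: once error() or
// complete() has been called, the observer is closed and ignores everything.
function guard(observer) {
  let closed = false;
  return {
    next(value) { if (!closed) observer.next(value); },
    error(err) { if (!closed) { closed = true; observer.error(err); } },
    complete() { if (!closed) { closed = true; observer.complete(); } },
  };
}

const delivered = [];
const guarded = guard({
  next: (action) => delivered.push(action.type),
  error: (action) => delivered.push(action.type),
  complete: () => delivered.push('complete'),
});

// Simulated socket lifecycle: connect, lose the connection, reconnect.
guarded.next({ type: 'SOCKET_CONNECTED' });
guarded.error({ type: 'WEBSOCKET_ERROR' });
guarded.next({ type: 'SOCKET_CONNECTED' }); // dropped: observer already errored

console.log(delivered);
```

Only the first two notifications are delivered, which matches what you are seeing: the second SOCKET_CONNECTED is silently ignored.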

This could possibly be handled either by replacing error notifications with next notifications, or by restarting socketObservable each time an error occurs, meaning that a new Socket instance will be created (which you may not want).
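A rough sketch of the first option follows. `FakeSocket` is a hypothetical stand-in for the Phoenix Socket that only stores the callbacks so they can be fired by hand, and `subscribeToSocket` is an assumed name for a subscribe function of the shape passed to Observable.create. The only real change is that onError calls next() instead of error(), so the stream stays open.

```javascript
// FakeSocket is a hypothetical stand-in for the Phoenix Socket: it only
// stores the callbacks so that this example can trigger them manually.
class FakeSocket {
  onOpen(cb) { this.open = cb; }
  onError(cb) { this.fail = cb; }
}

// Subscribe function in the shape that Observable.create expects.
const subscribeToSocket = (socket) => (observer) => {
  socket.onOpen(() => observer.next({ type: 'SOCKET_CONNECTED', socket }));
  // next() instead of error(): the stream is not terminated by a failure.
  socket.onError((error) => observer.next({ type: 'WEBSOCKET_ERROR', error }));
  return () => {};
};

const fakeSocket = new FakeSocket();
const actions = [];
subscribeToSocket(fakeSocket)({ next: (action) => actions.push(action.type) });

// Connect, lose the connection, reconnect on the same socket instance.
fakeSocket.open();
fakeSocket.fail(new Error('network down'));
fakeSocket.open();

console.log(actions);
```

With this shape, the same socket instance can report the reconnection, and neither catch nor retry is needed for this particular case.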

Here's a runnable code sample demonstrating how retry might work:

    const { createStore, applyMiddleware } = Redux;
    const { createEpicMiddleware } = ReduxObservable;

    const socketObservable = Rx.Observable.create(observer => {
        const t1 = setTimeout(() => observer.next({ type: "SOCKET_CONNECTED" }), 200);
        const t2 = setTimeout(() => observer.error({ type: "SOCKET_ERROR" }), 400);

        return () => {
            clearTimeout(t1);
            clearTimeout(t2);
        };
    });

    const connectToSocket = action$ => action$
        .do(action => console.log(action))
        .ofType("CONNECT_TO_SOCKET")
        .switchMap(() => socketObservable
            .catch(error => Rx.Observable.of(error).concat(Rx.Observable.throw(error)))
            // make 2 attempts to re-connect, i.e. restart socketObservable
            .retry(2)
        )
        // recover in case both attempts to reconnect have failed
        .retry();

    const store = createStore(
        (state, action) => state,
        applyMiddleware(createEpicMiddleware(connectToSocket)));

    // dispatch CONNECT_TO_SOCKET 2 times
    Rx.Observable.interval(2000)
        .take(2)
        .subscribe(x => store.dispatch({ type: "CONNECT_TO_SOCKET" }));
    <script src="https://unpkg.com/rxjs@5.4.2/bundles/Rx.min.js"></script>
    <script src="https://unpkg.com/redux@3.7.2/dist/redux.min.js"></script>
    <script src="https://unpkg.com/redux-observable@0.14.1/dist/redux-observable.min.js"></script>

