15

I am trying to use switchMap to cancel any previous http calls in Angular2. The code is basically

var run = ():Observable<any> => {
        var url = 'http://...'
        return this._http.get(url)
            .map(result => {
                return var xmlData:string = result.text()

            });
    }


    function pollTasks() {
        return Observable.of(1)
            .switchMap(() => run())
            .map(res => res)
    }

    // caller can do subscription and store it as a handle:
    let tasksSubscription =
        pollTasks()
            .subscribe(data => {
                console.log('aa'+data)
            });

and so I call the entire source several times in a row and receive several replies (i.e.: aa+data)

I was under the impression switchMap should cancel the previous calls.

9
  • How do you call it several times? Commented Mar 3, 2016 at 7:13
  • well just for testing I simply run is as in: runCmd(); runCmd(); runCmd(); ...
    – born2net
    Commented Mar 3, 2016 at 16:14
  • 1
    from the answer below I now understand that that it can't UNLESS it comes from the same originating stream... disappointing... but logical, tx!
    – born2net
    Commented Mar 3, 2016 at 16:18
  • 2
    You could merge the input to a single stream, then it should work. Commented Mar 3, 2016 at 16:19
  • any possible code snippet on the merge would be awesome ...
    – born2net
    Commented Mar 3, 2016 at 19:02

4 Answers 4

21

The requests need to originate from the same underlying stream. Here's a factory function that will create an http service instance that should do what you want:

function httpService(url) {
   // httpRequest$ stream that allows us to push new requests
   const httpRequest$ = new Rx.Subject();

   // httpResponse$ stream that allows clients of the httpService
   // to handle our responses (fetch here can be replaced with whatever
   // http library you're using).
   const httpResponse$ = httpRequest$
       .switchMap(() => fetch(url));


   // Expose a single method get() that pushes a new
   // request onto the httpRequest stream. Expose the
   // httpResponse$ stream to handle responses.
   return {
       get: () => httpRequest$.next(),
       httpResponse$
   };
}

And now the client code can use this service like this:

const http = httpService('http://my.api.com/resource');

// Subscribe to any responses
http.httpResponse$.subscribe(resp => console.log(resp));

// Call http.get() a bunch of times: should only get one log!!
http.get();
http.get();
http.get();
6
  • TX so much... saved me some figuring out, appreciate the code (wish I could give u higher score ;) ... small change is just to use next instead of onNext as per rxjs 5.0... works great... and I like the fact you used the subject to both listen to trigger... very nice! I am sure this post will get u a lot of +1's
    – born2net
    Commented Mar 3, 2016 at 17:45
  • one thing I have added was .share() so I will not create new streams from subscriptions and share among listeners...
    – born2net
    Commented Mar 3, 2016 at 18:16
  • Calvin Belden, after further investigation I see that I still have to somehow merge the streams since I am running multiple components which all call the http service, and sure, each one will have only its last call to http be pused through, but still each component will have exactly one call pending, which is not what I want as this will cause a mismatch in data, and so I have to somehow merge all the streams into a single stream and only listen to the absolute last call... mmm....
    – born2net
    Commented Mar 3, 2016 at 19:00
  • Can these different components share the same instance of the httpService? Commented Mar 4, 2016 at 15:13
  • When you call .next() in the subject it calls the fetch method again? or how it works? Commented Jul 28, 2017 at 21:33
7
constructor(private _http:Http, private appStore:AppStore) {

        this.httpRequest$ = new Subject();


        this.httpRequest$
            .map(v=> {
                return v;
        })
        .switchMap((v:any):any => {
            console.log(v);
            if (v.id==-1||v.id=='-1')
                return 'bye, cancel all pending network calls';                
            return this._http.get('example.com)
                .map(result => {
                    var xmlData:string = result.text()
                });
        }).share()
        .subscribe(e => {                
        })
    ...

and to push data in:

this.httpRequest$.next({id: busId});        

this works great and I can now have a single service that I can pipe all network calls through as well as cancel prior calls...

see image below, as new calls come in, prior ones are canceled. note how I set slow network with a delay of 4sec to provide all is working as expected...

enter image description here

1
  • What is busId in next() function? I tried but not working
    – Phuong
    Commented Jan 17, 2018 at 8:55
3

I think when you use the switchMap operator, you can only cancel requests in the current data flow. I mean the events that occur on the same observable chain...

If you call your pollTasks method several times, you won't be able to the cancel previous requests because they won't be in the same data flow... You create an observable chain each time you call the method.

I don't know how you trigger the execution of your requests.

If you want to execute your request each 500ms, you could try this:

pollTasks() {
  return Observable.interval(500)
                .switchMap(() => run())
                .map(res => res.json());
}

In this case, if there are in-progress request after 500ms, they will be canceled to execute the new one

With the approach, you just need to call once the pollTasks method.

You can also trigger the asynchronous processing chain based on user events. For example, when characters are filled in inputs:

var control = new Control();
// The control is attached to an input using the ngFormControl directive
control.valueChanges.switchMap(() => run())
                .map(res => res.json())
                .subscribe(...);

There is a proposal to link / initiate more easily processing chain on DOM events (fromEvent)

See this link:

3
  • Makes sense.. I was wondering how can rxjs magically associate diff calls and cancel the last one out... but I guess now the answer is that it can't UNLESS it comes from the same originating stream... disappointing... but logical, tx!
    – born2net
    Commented Mar 3, 2016 at 16:17
  • I am wondering what do you recommend? as I have a service that I need to all via API (on component load) but this component may be loaded and unloaded a few times a seconds, so what would you think the best approach would be to cancel prior calls... short of checking IDs and canceling out server returns that do not match current selected IDs (yachh...)... tx Sean
    – born2net
    Commented Mar 3, 2016 at 16:21
  • Yes that's the point ;-) but the stream can be created from a dom event (with fromEvent method). This way, each time you click (or type something, ...) the previous in-progress request will be canceled... Commented Mar 3, 2016 at 16:23
0

You can use Subject but if you do so, you have to manage subscription and publish. If you just want a method returning Observable which cancel requests within a interval, here is how I would do that :

observer: Observer<CustomObject>;
debug = 0;

myFunction(someValue) {
    this.observable = new Observable<CustomObject>(observer => {
        if (!this.observer) {
            this.observer = observer;
        }

       // we simulate http response time
        setTimeout(() => {
            this.observer.next(someValue);
            this.debug++;
        }, 1000);
    })
    .debounceTime(3000) // We cancel request withing 3s interval and take only the last one
    .switchMap(fn)
    .do(() => {
        console.log("debug", this.debug);
    });
}

Using the same observer all along let us cancel requests according all we want (debounceTime, distinctUntilChanged, ...).

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.