
I am using redux-observable and am having an issue with the timing of my observables. I want to introduce a delay between the emission of values within an array.

The epic that I'm working with looks like this at the moment:

export const fileImportEpic = (
  action$: any,
  state$: any,
  { apiClient }: IDependencies
) =>
  action$.pipe(
    ofType(FILE_IMPORT),
    withLatestFrom(state$),
    mergeMap(([{ payload }]: any) => { // Input is a 2 element array of objects, where the first object contains the payload key
      const { id, file, config = {} } = payload
      const headers = {
        'Content-Type': 'application/zip',
        Filename: file.name,
      }

      return from(
        apiClient.post('import', file, {
          ...config,
          headers,
        })
      ).pipe(
        mapTo(fileSuccessAction(id)),
        catchError((error) => {
          return of(
            apiErrorAction(error.response ? error.response.data : error),
            fileAttrsUpdateAction(id, {
              error: error.response ? error.response.data : error,
            }),
            fileFailAction(id)
          )
        }) // Catch
      ) // Pipe
    }) // MergeMap
  )

The issue is that it basically launches all of the POST requests immediately, and this is causing issues in the backend. What I would like to do is introduce a delay between each POST request (let's say 20ms).

I can't seem to make this work though. Wherever I introduce the delay, it seems to just launch all of the POST requests sooner or later, but never with a delay between each request. How could I go about doing this?

(You could argue that the issue should be fixed in the backend; however, that is not really an option at the moment.)

Edit:

To show another method I have also tried:

export const fileImportEpic = (
  action$: any,
  state$: any,
  { apiClient }: IDependencies
) =>
  action$.pipe(
    ofType(FILE_IMPORT),
    withLatestFrom(state$),

    mergeMap(([{ payload }]: any) => {
      // Input looks like: [{ payload: obj, otherKeys: ... }, { type: val, otherKeys: ... }] (i.e. a 2-element array where the first element contains the payload)

      const { id, file, config = {} } = payload
      const headers = {
        'Content-Type': 'application/zip',
        Filename: file.name,
      }
      const pl = [payload]

      // Try to release each part of the payload with a delay in between, instead of returning a post request promise
      return from(pl).pipe(
        delay(3000),
        mergeMap((payload: any) => {
          const { id, file, config = {} } = payload
          const headers = {
            'Content-Type': 'application/zip',
            Filename: file.name,
          }

          return from(
            apiClient.post('archives/import', file, {
              ...config,
              headers,
            })
          ).pipe(
            mapTo(fileSuccessAction(id)),
            catchError((error) => {
              return of(
                apiErrorAction(error.response ? error.response.data : error),
                fileAttrsUpdateAction(id, {
                  error: error.response ? error.response.data : error,
                }),
                fileFailAction(id)
              )
            }) // Catch
          )
        })
      ) // Pipe
    }) // MergeMap 1
  )

2 Answers


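Change mergeMap to concatMap. mergeMap subscribes to every inner observable as soon as its value arrives, so all of the requests start together. concatMap waits for each inner observable to complete before subscribing to the next one, so the requests run one at a time. In the example below, each request starts only after the previous one has responded and its 1000 ms delay has elapsed.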

const { of, from, delay, concatMap } = rxjs;

const api = (id) => fetch(`https://jsonplaceholder.typicode.com/todos/${id}`).then((res) => res.json());

of(1, 2, 3, 4, 5).pipe(
  concatMap((id) => from(api(id)).pipe(delay(1000)))
).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>

  • Awesome, that did it! Thank you. I'm realizing now though that this is actually not 100% what I want, and I actually just want to delay the creation of the API calls and then let them execute in parallel, instead of waiting for one to finish before we start the other. Do you know how I could achieve that?
    – Tyler
    Commented Sep 6, 2022 at 5:32
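
To make the difference between the two operators concrete, here is a minimal TypeScript sketch. It assumes RxJS is installed, and it runs on a VirtualTimeScheduler so the timings are deterministic and nothing really waits. The function name emissionTimes is made up for illustration and is not part of RxJS or of the epic in the question.

```typescript
import { from, of, VirtualTimeScheduler } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';

// Hypothetical helper: returns the virtual times (in ms) at which each value
// comes out of the flattening operator.
export function emissionTimes(mode: 'merge' | 'concat'): number[] {
  const scheduler = new VirtualTimeScheduler();
  const times: number[] = [];

  // Every inner observable holds its value back for 20 ms of virtual time.
  const delayed = (id: number) => of(id).pipe(delay(20, scheduler));

  // mergeMap subscribes to all inner observables at once;
  // concatMap subscribes to the next one only after the previous one completes.
  const flatten = mode === 'merge' ? mergeMap(delayed) : concatMap(delayed);

  from([1, 2, 3])
    .pipe(flatten)
    .subscribe(() => times.push(scheduler.now()));

  scheduler.flush(); // run every scheduled action in virtual time
  return times;
}
```

With mergeMap the three delays run side by side, so all values come out together at 20 ms. With concatMap the delays are chained, so the values come out at 20, 40 and 60 ms. This is probably why adding delay inside or before mergeMap only shifted the whole batch of requests instead of spacing them out.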

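Use defer (https://rxjs.dev/api/index/function/defer) so that the request is not created until the observable is subscribed to. In the example below, the call is made only after the 10-second delay has elapsed.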

const { of, delay, defer, concatAll } = rxjs;

const api = () => fetch('https://jsonplaceholder.typicode.com/todos/1').then((res) => res.json());
const api$ = defer(api);

of(api$).pipe(delay(10000), concatAll()).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.6/rxjs.umd.min.js"></script>

  • @Tyler like this?
    – Eddy Lin
    Commented Sep 6, 2022 at 9:11
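
For the follow-up in the comments (space out the start of each request, but let the requests themselves run in parallel), one possible approach is to serialise only the delays with concatMap and then start the requests with mergeMap. The sketch below is an assumption about what is wanted, not a confirmed solution. staggeredMergeMap is a hypothetical helper, not an RxJS operator, and fakePost is a stand-in for apiClient.post. The demo uses a VirtualTimeScheduler so the timings can be checked without waiting.

```typescript
import {
  Observable, OperatorFunction, SchedulerLike,
  asyncScheduler, from, of, timer, VirtualTimeScheduler,
} from 'rxjs';
import { concatMap, delay, map, mergeMap } from 'rxjs/operators';

// Hypothetical helper: starts one inner observable per source value, at least
// gapMs apart, without waiting for earlier inner observables to finish.
export function staggeredMergeMap<T, R>(
  gapMs: number,
  project: (value: T) => Observable<R>,
  scheduler: SchedulerLike = asyncScheduler
): OperatorFunction<T, R> {
  return (source$) =>
    source$.pipe(
      // concatMap chains the delays: each value is released gapMs after the previous one.
      concatMap((value) => of(value).pipe(delay(gapMs, scheduler))),
      // mergeMap subscribes as soon as a value is released, so requests overlap.
      mergeMap(project)
    );
}

// Demo in virtual time; real code would leave out the scheduler argument.
export function demo(): string[] {
  const scheduler = new VirtualTimeScheduler();
  const log: string[] = [];

  // Stand-in for apiClient.post: each fake request takes 100 ms of virtual time.
  const fakePost = (id: number) => {
    log.push(`start ${id} @${scheduler.now()}`);
    return timer(100, scheduler).pipe(map(() => id));
  };

  from([1, 2, 3])
    .pipe(staggeredMergeMap(20, fakePost, scheduler))
    .subscribe((id) => log.push(`done ${id} @${scheduler.now()}`));

  scheduler.flush(); // run every scheduled action in virtual time
  return log;
}
```

In this demo the fake requests start at 20, 40 and 60 ms and finish at 120, 140 and 160 ms, so they overlap while their starts stay 20 ms apart. Note that the first request is also held back by gapMs. In the epic from the question, the outer mergeMap could be replaced by something like staggeredMergeMap(20, ...) with the existing request and catchError logic as the project function.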
