Array of inner subscriptions with RxJS


I'm trying to improve the flattening and reduce the chaining in my RxJS code.

My service's getAllItems() REST call returns the following Item[]:

[{
  id: 101,
  name: 'Car',
  owner: 1
},
{
  id: 102,
  name: 'Table',
  owner: 2
}]

I have another endpoint, getOwnerInfo(id: number), which returns information based on the "owner" id. I want to combine that information with each item, so the final result should look like:

[{
  id: 101,
  name: 'Car',
  owner: {
    id: 1,
    username: 'Mikelinin',
    name: 'Mikel',
  },
},
{
  id: 102,
  name: 'Table',
  owner: {
    id: 2,
    username: 'Thominin',
    name: 'Thomas',
  },
}]

My current implementation subscribes once to the getAllItems() call and then, inside the subscription body, iterates over the elements and subscribes to getOwnerInfo() for each one.

The other flattening examples I've looked at all "break" the array into individual emissions and never put it back together. I need the output to be an array.

I've tried from(), concatMap(), and mergeMap(), but I haven't been able to combine the two requests properly.


There are 2 answers

Daniel Gimenez

The following uses from to turn a de-duplicated array of owner ids into a stream of owner ids, and concatMap to fetch each owner in turn. toArray then collects the emitted owners back into a single array, and the final map replaces each item's owner id with the matching owner object.

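this.getAllItems().pipe(
  switchMap(items => {
    // de-duplicate the owner ids so each owner is requested only once
    const ownerIds = Array.from(new Set(items.map(x => x.owner)));
    return from(ownerIds).pipe(
      // fetch the owners one at a time, in order
      concatMap(x => this.getOwnerInfo(x)),
      // collect every owner into a single array once all requests complete
      toArray(),
      // swap each item's owner id for the matching owner object
      map(owners => items.map(x => ({ ...x, owner: owners.find(y => y.id === x.owner) })))
    )
  })
)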
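
To see the de-duplication at work, here is a minimal self-contained sketch of the same pipeline. The stubbed getAllItems and getOwnerInfo functions, the extra 'Chair' item, the ownerCalls counter, and the interface names are all assumptions made for illustration. They stand in for the real REST calls and are not part of the original service.

```typescript
import { Observable, from, of } from 'rxjs';
import { concatMap, map, switchMap, toArray } from 'rxjs/operators';

interface Item { id: number; name: string; owner: number; }
interface Owner { id: number; username: string; name: string; }
interface ItemWithOwner { id: number; name: string; owner: Owner | undefined; }

// Hypothetical in-memory data standing in for the owner endpoint.
const owners: Owner[] = [
  { id: 1, username: 'Mikelinin', name: 'Mikel' },
  { id: 2, username: 'Thominin', name: 'Thomas' },
];

// Counts how many times the (stubbed) owner endpoint is hit.
let ownerCalls = 0;

// Stub for the items endpoint; the third item is added so two items share an owner.
function getAllItems(): Observable<Item[]> {
  return of([
    { id: 101, name: 'Car', owner: 1 },
    { id: 102, name: 'Table', owner: 2 },
    { id: 103, name: 'Chair', owner: 1 },
  ]);
}

// Stub for the owner endpoint.
function getOwnerInfo(id: number): Observable<Owner> {
  ownerCalls++;
  return of(owners.find(o => o.id === id)!);
}

const combined$: Observable<ItemWithOwner[]> = getAllItems().pipe(
  switchMap(items => {
    // unique owner ids: [1, 2] for the three items above
    const ownerIds = Array.from(new Set(items.map(x => x.owner)));
    return from(ownerIds).pipe(
      concatMap(id => getOwnerInfo(id)),
      toArray(),
      map(found => items.map(x => ({ ...x, owner: found.find(o => o.id === x.owner) })))
    );
  })
);

// The stubs emit synchronously, so result is populated as soon as subscribe returns.
let result: ItemWithOwner[] = [];
combined$.subscribe(value => (result = value));
```

With these stubs, three items come back with their owner objects attached while getOwnerInfo is only called twice, because owner 1 is shared by two items. With real HTTP calls the emission would be asynchronous, so the result should be read inside the subscribe callback instead.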
Mrk Sef

Map your array of items into an array of observables that each emit the updated item. Then join the array of observables together.

getAllItems().pipe(
  
  // turn item[] into observable<updatedItem>[]
  map(items => items.map(item => getOwnerInfo(item.owner).pipe(
    map(owner => ({...item, owner}))
  ))),

  // join observable<updatedItem>[] into updatedItem[]
  switchMap(itemCalls => forkJoin(itemCalls))

).subscribe(console.log);

Or you can do the mapping and joining in one step:

getAllItems().pipe(

  // source item[], do stuff, emit updatedItem[]
  switchMap(items => forkJoin(
    items.map(item => getOwnerInfo(item.owner).pipe(
      map(owner => ({...item, owner}))
    ))
  ))
  
).subscribe(console.log);
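
One thing to watch for with forkJoin: when it is given an empty array it completes immediately without emitting, so an empty item list would produce no output at all. Below is a minimal sketch of the one-step version with a guard for that case. The withOwners helper, the stubbed getOwnerInfo, and the interface names are assumptions made for illustration, not part of the original service.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

interface Item { id: number; name: string; owner: number; }
interface Owner { id: number; username: string; name: string; }
interface ItemWithOwner { id: number; name: string; owner: Owner; }

// Hypothetical stub for the owner endpoint; it fabricates an owner from the id.
function getOwnerInfo(id: number): Observable<Owner> {
  return of({ id, username: `user${id}`, name: `Name ${id}` });
}

// Hypothetical helper: turns a stream of Item[] into a stream of ItemWithOwner[].
function withOwners(items$: Observable<Item[]>): Observable<ItemWithOwner[]> {
  const empty: ItemWithOwner[] = [];
  return items$.pipe(
    switchMap(items =>
      items.length === 0
        // forkJoin([]) would complete without emitting, so emit [] explicitly
        ? of(empty)
        // one request per item, run in parallel, results kept in item order
        : forkJoin(items.map(item =>
            getOwnerInfo(item.owner).pipe(map(owner => ({ ...item, owner })))
          ))
    )
  );
}

// Usage: collect what each call emits (the stubs emit synchronously).
const outputs: ItemWithOwner[][] = [];
const someItems: Item[] = [
  { id: 101, name: 'Car', owner: 1 },
  { id: 102, name: 'Table', owner: 2 },
];
const noItems: Item[] = [];

withOwners(of(someItems)).subscribe(value => outputs.push(value));
withOwners(of(noItems)).subscribe(value => outputs.push(value));
```

Unlike the concatMap approach, this issues one request per item rather than per distinct owner, and forkJoin subscribes to all of them at once.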