I have an observable enriched_sns$ which I'm enriching by performing operations on each emitted item using concatMap, and then combining the results into an array using toArray(). However, I'm curious if there's an alternative approach to achieve the same result without using toArray().
Here's my current code:
private enriched_sns$ = toObservable(this.sn_results).pipe(
concatMap((results) => results as StreamNotificationResult[]),
concatMap((notification) => {
const user_id = notification.activities[0].actor.id;
return zip(of(notification), this.read_user_data_service.readUserData(user_id));
}),
map(([notification, user]) => {
return {
notification,
user: user,
};
}),
toArray(),
);
Is there a way to obtain an array of emitted items at the end of the observable without resorting to toArray()? Any help or alternative approaches would be greatly appreciated. Thank you!
The reason you need to use
toArray()in your example is because you are taking the emitted array from your signal and then emitting each element individually.You could use the
forkJoinfunction to handle subscribing to an array of observables, rather than usingconcatMapto emit each element one at a time:There is one important difference
forkJoinwill subscribe to all observables at the same time, not one at a time. If you need to limit concurrency, you could check out therxjs-etcpackage that has aforkJoinConcurrentfunction which allows you to specify concurrency:Note, use of
zip/ofis not necessary at all. You can simply add a.pipeto your inner observable and map the emission to your desired shape: