Accessing a generator created using asyncio


I am trying to follow this [tutorial][1], but using asyncio instead. I am stuck on the part of the main function where I put the tasks in a list and then gather them into the results variable; I imagine this holds each of the pages. I want to use the following parse function to extract all the item information from each page:


from typing import Any
import asyncio
import httpx

from selectolax.parser import HTMLParser
from urllib.parse import urljoin


async def fetch_html(client: httpx.AsyncClient, url: str, **kwargs) -> Any:
    headers = {
        "User-Agent": **"Add yours or you will get a 403"**
    }

    if kwargs.get("page"):
        resp = await client.get(
            url=url + str(kwargs.get("page")), follow_redirects=True, headers=headers
        )

    else:
        resp = await client.get(url=url, follow_redirects=True, headers=headers)

    try:
        resp.raise_for_status()
    except httpx.HTTPStatusError as exc:
        print(
            f"Error response {exc.response.status_code} while requesting {exc.request.url!r}. Page limit exceeded"
        )
        return False

    html = HTMLParser(html=resp.text)
    return html


def extract_text(html: HTMLParser, sel: str):
    try:
        text = html.css_first(sel).text()
        return text
    except AttributeError:
        return None


def parse_search_page(html: HTMLParser):
    products = html.css("li.VcGDfKKy_dvNbxUqm29K")
    for product in products:
        yield urljoin("https://www.rei.com/", product.css_first("a").attributes["href"])


async def main():
    async with httpx.AsyncClient(http2=True) as client:
        tasks = []
        rei_url = "https://www.rei.com/c/camping-and-hiking/f/scd-deals?page="
        for i in range(1, 2):
            tasks.append(asyncio.create_task(fetch_html(client, rei_url, page=i)))
        results = await asyncio.gather(*tasks)  # printing this shows [<HTMLParser chars=1437412>], so I thought it was an HTMLParser object that I can access in the following function

    links = parse_search_page(results) # Argument of type "list[Unknown]" cannot be assigned to parameter "html" of type "HTMLParser" in function "parse_search_page"
    for link in links:
        print(link)


if __name__ == "__main__":
    asyncio.run(main())

I am getting an AttributeError. Ideally, results would hold the data and I would be able to loop through it with the parse function (which is itself a generator that I can loop through); that is my current mental model. I have tried putting both in create_task and making the parse function async, but that didn't seem to make sense. I am not …

[1]: https://youtu.be/DHvzCVLv_FA?si=9qJeqLmI02iYSCkA
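To illustrate where I think the mismatch is, here is a stripped-down sketch (fake_fetch is just a hypothetical stand-in for fetch_html, not real code from the project): asyncio.gather hands back a list with one entry per task, so calling anything that only exists on HTMLParser raises the AttributeError:

import asyncio


async def fake_fetch(page: int) -> str:
    # hypothetical stand-in for fetch_html; pretend this returns a parsed page
    return f"<page {page}>"


async def demo():
    tasks = [asyncio.create_task(fake_fetch(i)) for i in range(1, 3)]
    results = await asyncio.gather(*tasks)
    print(type(results))  # <class 'list'> -- one element per task, not a single parser
    # results.css("li")   # same shape as my error: a list has no .css()
    for page in results:  # iterating yields one page at a time
        print(page)


asyncio.run(demo())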


1 Answer

Answered by Adrian Fletcher:

parse_search_page returns a generator, which is an iterable object. You just need to iterate through it and store the results somewhere (this is shown with the two lists in the main function). Note that asyncio.gather gives you a list of HTMLParser objects, so you parse each page individually rather than passing the whole list in. I also pulled some logic out of the main function so that it is more reusable. If there's a cleaner way to do it, that would be greatly appreciated.

from collections.abc import Iterable
from time import perf_counter
from typing import Any
import asyncio
import httpx
from dataclasses import dataclass, asdict, fields

from selectolax.parser import HTMLParser
from urllib.parse import urljoin


@dataclass
class Item:
    name: str | None
    item_num: str | None
    price: str | None
    rating: str | None  # extract_text returns raw text, so this stays a string
    image_link: str | None


async def fetch_html(client: httpx.AsyncClient, url: str, **kwargs) -> Any:
    headers = {
        "User-Agent": **Add your own headers**
    }

    if kwargs.get("page"):
        resp = await client.get(
            url=url + str(kwargs.get("page")), follow_redirects=True, headers=headers
        )

    else:
        resp = await client.get(url=url, follow_redirects=True, headers=headers)

    try:
        resp.raise_for_status()
    except httpx.HTTPStatusError as exc:
        print(
            f"Error response {exc.response.status_code} while requesting {exc.request.url!r}. Page limit exceeded"
        )
        return False

    html = HTMLParser(html=resp.text)
    return html


def parse_search_page(html: HTMLParser):
    products = html.css("li.VcGDfKKy_dvNbxUqm29K")
    for product in products:
        yield urljoin("https://www.rei.com/", product.css_first("a").attributes["href"])


def parse_item_page(html: HTMLParser):
    new_item = Item(
        name=extract_text(html, "h1#product-page-title"),
        item_num=extract_text(html, "span#product-item-number"),
        price=extract_text(html, "span#buy-box-product-price"),
        rating=extract_text(html, "span.cdr-rating__number_13-5-3"),
        image_link=html.css_first("img").attributes["src"],
    )
    return asdict(new_item)


def extract_text(html: HTMLParser, sel: str) -> Any:
    try:
        text = html.css_first(sel).text()
        return text
    except AttributeError:
        return None


async def fetch_all_pages(
    client: httpx.AsyncClient, url: str, page_range: Iterable[int], **kwargs
):
    tasks = []
    for page_num in page_range:
        tasks.append(asyncio.create_task(fetch_html(client, url, page=page_num)))
    results = await asyncio.gather(*tasks)
    return results


async def main():
    async with httpx.AsyncClient(http2=True) as client:
        item_page_links = []
        item_desc = []

        page_range = range(1, 2)
        rei_url = "https://www.rei.com/c/camping-and-hiking/f/scd-deals?page="

        start_time = perf_counter()
        top_level_html = await fetch_all_pages(client, rei_url, page_range)

        for page in top_level_html:
            # extend rather than assign, so links from every page are kept
            item_page_links.extend(parse_search_page(page))

        for link in item_page_links:
            print(link)
            item_html = await fetch_html(client, link)
            item_desc.append(parse_item_page(item_html))

        print(item_desc)
        end_time = perf_counter()
        print(f"Total time to scrape:{end_time - start_time} ")


if __name__ == "__main__":
    asyncio.run(main())
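Since cleaner alternatives were invited: the second loop in main still awaits one item request at a time, so the item pages could also be fetched concurrently, mirroring fetch_all_pages. A minimal sketch under that assumption (fetch_all_items is a name introduced here, not part of the original code):

async def fetch_all_items(client: httpx.AsyncClient, links: Iterable[str]) -> list[dict]:
    # fetch every item page concurrently instead of one await per loop iteration
    tasks = [asyncio.create_task(fetch_html(client, link)) for link in links]
    pages = await asyncio.gather(*tasks)
    # fetch_html returns False on an HTTP error, so skip those pages before parsing
    return [parse_item_page(page) for page in pages if page]

With that helper, the second loop in main collapses to item_desc = await fetch_all_items(client, item_page_links).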