Writing scraped items to db

I am new to Scrapy and I am scraping a site to learn. I have set up a spider, items, pipelines and a process script, and the spider is run from CrawlerProcess. When I run the spider under a debugger, the items are populated successfully, but the pipelines are never called, so the tables are neither created nor updated.

My directory tree looks like this.

pvoutput
└───pvoutput
    ├───spiders
    │   └───country_spider.py
    ├───items.py
    ├───pipelines.py
    ├───process.py
    └───settings.py
country_spider.py

import re
from contextlib import suppress

import scrapy

from pvoutput.items import CountryItem, SystemItem


class CountrySystemsSpider(scrapy.Spider):
    name = "country_systems_spider"

    def __init__(self, id, country, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.id = id
        self.country = country.lower()
        self.systems = []

    def start_requests(self):
        yield scrapy.Request(f"https://pvoutput.org/ladder.jsp?f=1&country={self.id}")

    def parse(self, response):
        country_item = CountryItem()
        system_item = SystemItem()
        country = None
        table_rows = response.css(".e2, .o2")
        for row in table_rows:
            country = row.xpath(".//td[4]//text()").get()
            country = country.strip().lower()
            if country != self.country:
                continue
            tag = row.css("a:first-child")
            name = row.css("a:first-child::text").get()
            link = tag.xpath("@href").get()
            with suppress(AttributeError, IndexError):
                sid = re.search(r"sid=(\d+)", link).group(1)
                id = re.search(r"id=(\d+)", link).group(1)
                self.systems.append({"country": self.country, "name": name,
                                     "sid": sid, "id": id})

        # Follow pagination to the last page, then yield the collected systems.
        next_link = response.xpath("//a[contains(text(), 'Next')]")
        if next_link:
            next_href = next_link[0].attrib["href"]
            next_href = f"https://pvoutput.org/ladder.jsp{next_href}"
            yield scrapy.Request(next_href, self.parse)
        else:
            country_item["sid"] = self.id
            country_item["name"] = country
            for system in self.systems:
                for key in system.keys():
                    system_item[key] = system[key]
                yield system_item

items.py

from scrapy import Item, Field


class CountryItem(Item):
    sid = Field()
    name = Field()
    system_ids = Field()


class SystemItem(Item):
    country = Field()
    name = Field()
    id = Field()
    sid = Field()

pipelines.py

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound

from pvoutput.items import CountryItem, SystemItem

Base = declarative_base()


class Country(Base):
    __tablename__ = "country"

    sid = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    systems = Column(Integer, ForeignKey("country.sid"), nullable=False)


def create_tables(engine):
    Base.metadata.create_all(engine, checkfirst=True)


def db_connect(db_url):
    return create_engine(db_url)


def create_instance_with_sid(model, **kwargs):
    sid = kwargs.get("sid")
    if not sid:
        raise KeyError("sid not in kwargs.")
    return model(**kwargs)


def get_one_or_create(
    session,
    model,
    create_method="",
    create_method_kwargs=None,
    **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), False
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        # created = create_instance_with_sid(model, **kwargs)
        created = getattr(model, create_method, model)(**kwargs)
        try:
            session.add(created)
            session.flush()
            return created, True
        except IntegrityError:
            session.rollback()
            return session.query(model).filter_by(**kwargs).one(), False

class PostgresPipeline:
    def __init__(self, db_url):
        self.DB_URL = db_url
        engine = db_connect(self.DB_URL)
        self.session = sessionmaker(bind=engine)()
        create_tables(engine)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(db_url=crawler.settings.get("DATABASE_URL"))

    def open_spider(self, spider):
        engine = db_connect(self.DB_URL)
        self.session = sessionmaker(bind=engine)()

    def close_spider(self, spider):
        self.session.close()

    def process_item(self, item, spider):
        if isinstance(item, CountryItem):
            model = Country
        elif isinstance(item, SystemItem):
            model = System  # SQLAlchemy model for systems (definition not shown here)
        sid = item["sid"]
        try:
            # Fetch the existing row (or create it), then copy the item fields onto it.
            instance, created = get_one_or_create(self.session, model, sid=sid)
            for attr, val in item.items():
                setattr(instance, attr, val)
            self.session.add(instance)
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
        finally:
            self.session.close()
        return item

process.py

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from pvoutput.spiders.country_spider import CountrySystemsSpider


def get_systems_in_country(country):
    # NOTE: the country argument is currently unused; the spider is started with fixed values.
    process = CrawlerProcess(get_project_settings())
    process.crawl(CountrySystemsSpider, id=224, country="Switzerland")
    process.start()
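
For completeness, I call this from a small script, roughly like this:

if __name__ == "__main__":
    get_systems_in_country("Switzerland")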

I have set ITEM_PIPELINES = {"projectname.pipelines.PostgresPipeline": 500} in settings.py, and the database URL is defined there as well.
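
The relevant part of settings.py looks roughly like this (the connection string shown here is a placeholder, not the real one):

# settings.py (excerpt)
ITEM_PIPELINES = {
    "projectname.pipelines.PostgresPipeline": 500,
}

# placeholder; the real value points at my local Postgres database
DATABASE_URL = "postgresql://user:password@localhost:5432/pvoutput"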

I have double-checked my project settings to be sure I am not missing anything. I can write to CSV directly from the spider class, but the pipeline just does not seem to run.
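
The CSV export from the spider looks roughly like this (a simplified sketch, not the exact code; the filename and the closed() hook are just how I approximate it here):

import csv

class CountrySystemsSpider(scrapy.Spider):
    ...

    def closed(self, reason):
        # dump the rows collected in self.systems when the spider finishes
        with open(f"{self.country}_systems.csv", "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["country", "name", "sid", "id"])
            writer.writeheader()
            writer.writerows(self.systems)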

What am I missing?
