Is it possible to save a peewee SQL query as a string or JSON and then load it back into a ModelSelect object?

68 views Asked by At

I created a universal keyboard with pagination for a Telegram bot. It works on a peewee query object (ModelSelect), but I can't save that object to Redis or any other storage. For now I keep it in memory, so on every restart the object is lost and the keyboard stops working.

I tried:

from playhouse.shortcuts import model_to_dict, dict_to_model
from peewee import * 

# SQLite database stored in the file 'test.db'
db = SqliteDatabase('test.db')

class User(Model): 
    # type hint only -- no field is assigned here; the SQL printed by the
    # script still selects an "id" column (presumably peewee's implicit
    # primary key -- confirm in the peewee docs)
    id:   int 
    name: str = TextField()
    
    class Meta:
        # bind the model to the database created above
        database = db

# create User table
User.create_table()

# add new User
User(name = "new_user").save()

# select User with id = 1
# NOTE: `item` is a ModelSelect (a query object), not a User instance
item = User.select().where(User.id == 1)
print(f"SQL: {item.sql()}")

# try to convert ModelSelect to dict
# NOTE: this is the failing call -- model_to_dict reads `model._meta`, which a
# ModelSelect does not have, so it raises
# AttributeError: 'ModelSelect' object has no attribute '_meta'
_dict = model_to_dict(item)
print(_dict)

# try to convert dict to ModelSelect
# NOTE: never reached because of the error above; dict_to_model is given the
# User model class together with the dict
_item = dict_to_model(User, _dict)
print(_item)

But I got this error:

SQL: ('SELECT "t1"."id", "t1"."name" FROM "user" AS "t1" WHERE ("t1"."id" = ?)', [1])
Traceback (most recent call last):
  File "C:\testfile.py", line 27, in <module>
    _dict = model_to_dict(item)
  File "C:\Python310\lib\site-packages\playhouse\shortcuts.py", line 74, in model_to_dict
    for field in model._meta.sorted_fields:
AttributeError: 'ModelSelect' object has no attribute '_meta'
2

There are 2 answers

2
coleifer On

This is the wrong idea. Serializing model instances to dicts is perfectly fine, and they can be de-serialized back to model instances. You cannot reasonably expect to serialize a query object, though. The query just represents the SQL and, when executed, contains a reference to a cursor object.

If you want to serialize an object, you would:

# select User with id = 1 -- note the call to .get()!!
# With .get() `item` is a model instance rather than a query object,
# which is what model_to_dict can serialize.
item = User.select().where(User.id == 1).get()
data = model_to_dict(item)
2
abuztrade On

I have written a serializer for the ModelSelect object.

from __future__ import annotations
from peewee import * 
from typing import *
import peewee

# create database (SQLite, ':memory:')
db = SqliteDatabase(':memory:')

class User(Model): 
    """User model with a text `name`; referenced by `Post.author`."""
    # type hint only -- no field is assigned here
    id:   int 
    name: str = TextField()
    # type hint only -- matches the backref name ('posts') declared on Post.author
    posts: List[Post]

    class Meta:
        # bind the model to the database created above
        database = db

class Post(Model):
    """Post model with a text `title` and a foreign key to its author."""
    # type hint only -- no field is assigned here
    id: int
    title: str = TextField()
    # foreign key to User; backref = 'posts' is the name used for the reverse side
    author: User = ForeignKeyField(User, backref = 'posts')
    # type hint only -- `Post.author_id` is used in the join condition of the demo query
    author_id: int

    class Meta:
        # bind the model to the database created above
        database = db

# create the User and Post tables
User.create_table()
Post.create_table()

# add new User
user = User(name = "new_user")
user.save()

# add new Post and set user as author
Post(title = "first_post", author = user).save()

class Serializer:
    """
    ## class for serializing and deserializing `peewee.ModelSelect` object

    The query is walked through its `__dict__` and converted to plain dicts,
    lists, tuples and primitives:

    - objects that have a `__name__` (e.g. model classes) become `"MODEL <name>"`
    - objects that have a `model` attribute (e.g. model fields) become
      `"MODEL_PROPERTY <model name>.<field name>"`
    - `peewee.Join` / `peewee.Expression` nodes become
      `{"value_type": <class name>, "value": <serialized __dict__>}`

    The `_database` attribute and attributes equal to `None` are not stored.

    :param main_model_class: Main class from which all models are inherited.
        All of its direct and indirect subclasses can be restored by name. (Optional)

    ### Usage:

    ```python
    import peewee

    # create database
    db = peewee.SqliteDatabase(':memory:')

    class User(peewee.Model): 
        id:   int 
        name: str = peewee.TextField()

        class Meta:
            database = db

    # create User table
    User.create_table()

    # add new User
    User(name = "first user").save()

    # create query
    query = User.select().where(User.id == 1).limit(1)
    print(query.sql())
    # ('SELECT "t1"."id", "t1"."name" FROM "user" AS "t1" WHERE ("t1"."id" = ?) LIMIT ?', [1, 1])
    print(list(query))
    # [<User: 1>]
    
    # serialize query to dict
    serialized = Serializer(main_model_class = peewee.Model).serialize(query)
    print(serialized)
    # {'model': 'MODEL User', '_join_ctx': ..., ...}
    # Now you can save serialized dict

    deserialized = Serializer(main_model_class = peewee.Model).deserialize(serialized)
    print(deserialized.sql())
    # ('SELECT "t1"."id", "t1"."name" FROM "user" AS "t1" WHERE ("t1"."id" = ?) LIMIT ?', [1, 1])
    print(list(deserialized))
    # [<User: 1>]
    ```

    """
    # key added to the top-level dict of a serialized ModelSelect
    mark_serialized_key = "__serialized__"
    # string prefixes used for named objects and model fields; the trailing
    # space is part of the marker, so "MODELING" is NOT treated as a marker
    model_prefix = "MODEL "
    model_property_prefix = "MODEL_PROPERTY "
    # peewee node classes that are stored as {"value_type", "value"} dicts
    reserved_objects: tuple[type, ...] = (
        peewee.Join, peewee.Expression
    )
    reserved_objects_names: tuple[str, ...] = tuple(
        x.__name__ 
        for x in reserved_objects
    )

    def __init__(self, main_model_class: Optional[peewee.Model] = peewee.Model):
        # the annotation allows None, so treat it as "use the default base class"
        if main_model_class is None:
            main_model_class = peewee.Model

        self.model_class = main_model_class
        # reserved node classes first, then every known model class
        self.models = list(self.reserved_objects) + self._collect_subclasses(main_model_class)

    @staticmethod
    def _collect_subclasses(base: type) -> list:
        """
        Collect all direct and indirect subclasses of `base`

        Direct subclasses come first (in `__subclasses__()` order), then their
        subclasses, level by level, so a name lookup prefers the class closest
        to `base`.

        :param base: class whose subclass tree is walked
        """
        found = []
        seen = set()
        level = list(base.__subclasses__())

        while level:
            next_level = []
            for cls in level:
                # a class can be reached twice through multiple inheritance
                if cls in seen:
                    continue
                seen.add(cls)
                found.append(cls)
                next_level.extend(cls.__subclasses__())
            level = next_level

        return found
        
    def _get_model_by_name(self, name: str) -> Union[peewee.Model, Any, None]:
        """
        Restore string name of object to real object

        :param name: name of object
        :return: first registered class whose `__name__` equals `name`, or `None`
        """
        return next(
            (model for model in self.models if model.__name__ == name),
            None
        )
    
    # # # # # # # # # # # # #
    #                       #
    #    Serialize part     #
    #                       #
    # # # # # # # # # # # # #

    def serialize(self, obj: Union[peewee.ModelSelect, Any, dict]) -> dict:
        """
        Serialize `peewee.ModelSelect` object to dict

        Also used recursively for plain dicts and for the `__dict__` of
        `peewee.Join` / `peewee.Expression` nodes.

        :param obj: `peewee.ModelSelect` object (or a dict / node during recursion)
        :return: dict of serialized attributes; for a `peewee.ModelSelect` the
            `mark_serialized_key` entry is added
        """
        result = {}
        obj_dict = obj if isinstance(obj, dict) else obj.__dict__

        for key, value in obj_dict.items():
            # the database reference and empty attributes are not stored
            if key == '_database' or value is None:
                continue

            # keys are not always strings (dicts inside a query can be keyed by model classes)
            key = self._serialize_object(value = key)

            # primitives are stored as they are
            if not isinstance(value, (str, int, float, bool)):
                value = self._serialize_object(value)

            result[key] = value
        
        if isinstance(obj, peewee.ModelSelect):
            result[self.mark_serialized_key] = True

        return result

    def _serialize_array(self, value: Union[list, tuple]) -> Union[list, tuple]:
        """
        Serialize list or tuple 

        :param value: list or tuple
        :return: container of the same kind (list or tuple) with serialized items
        """
        result = [
            self._serialize_object(item)
            for item in value
        ]

        if isinstance(value, tuple):
            return tuple(result)
        
        return result
        
    def _serialize_object(self, value: Union[peewee.Model, str, dict, list, tuple, Any]) -> Union[str, dict, Any]:
        """
        Serialize object

        :param value: object
        :return: serialized form of `value`; objects that match none of the
            known kinds are returned unchanged
        """
        if isinstance(value, dict):
            return self.serialize(value)
        
        if isinstance(value, (list, tuple)):
            return self._serialize_array(value)
        
        if isinstance(value, self.reserved_objects):
            return dict(
                value_type = value.__class__.__name__,
                value      = self.serialize(value)
            )
        
        # anything with a __name__ (e.g. a model class) is stored by name
        if hasattr(value, '__name__'):
            return f"MODEL {value.__name__}"
        
        # anything with a `model` attribute (e.g. a model field) is stored as "<model>.<name>"
        if hasattr(value, 'model'):
            return f"MODEL_PROPERTY {value.model.__name__}.{value.name}"
        
        return value

    # # # # # # # # # # # # # #
    #                         #
    #    Deserialize part     #
    #                         #
    # # # # # # # # # # # # # #

    def deserialize(self, data: dict) -> peewee.ModelSelect:
        """
        Deserialize dict to `peewee.ModelSelect` object

        Also used recursively for nested dicts, in which case a plain dict of
        deserialized keys and values is returned.

        :param data: dict
        :return: `peewee.ModelSelect` when `data` carries `mark_serialized_key`,
            otherwise a dict
        :raises ValueError: when `data` is a serialized query but its main
            model is missing or is not known to this serializer
        """
        is_query = bool(data.get(self.mark_serialized_key))

        if is_query:
            main_model: peewee.Model = self._deserialize_object(data.get('model'))
            # fail with a clear message instead of "'NoneType' object has no attribute 'select'"
            if main_model is None or isinstance(main_model, str):
                raise ValueError(
                    f"cannot restore query: model {data.get('model')!r} is not a known "
                    f"subclass of {self.model_class.__name__}"
                )

        result = {}

        for key, value in data.items():
            if key == self.mark_serialized_key: 
                continue

            key = self._deserialize_object(key)
            value = self._deserialize_object(value)

            # the condition is stored under the attribute name `_on`, but the
            # node is rebuilt through keyword arguments, where it is called `on`
            if isinstance(value, self.reserved_objects) and key == '_on':
                key = 'on'

            result[key] = value

        if not is_query:
            return result

        # start from an empty query on the main model and restore its state
        selection = main_model.select()

        for k, v in result.items():
            setattr(selection, k, v)
        
        return selection

    def _deserialize_array(self, value: Union[list, tuple]) -> Union[list, tuple]:
        """
        Deserialize list or tuple

        :param value: list or tuple
        :return: container of the same kind (list or tuple) with deserialized items
        """
        result = [
            self._deserialize_object(item)
            for item in value
        ]

        if isinstance(value, tuple):
            return tuple(result)
        
        return result

    def _deserialize_property(self, value: str) -> Any:
        """
        Restore a `"MODEL_PROPERTY <model>.<name>"` string to the real attribute

        :param value: string that starts with `model_property_prefix`
        :raises AttributeError: when the model or its attribute cannot be found
        """
        path = value[len(self.model_property_prefix):]
        model_name, _, property_name = path.partition('.')
        model = self._get_model_by_name(model_name)

        if model is None:
            raise AttributeError(f"cannot resolve {value!r}: unknown model {model_name!r}")

        return getattr(model, property_name)

    def _deserialize_object(self, value: Any) -> Any:
        """
        Deserialize object

        :param value: object
        :return: restored object; values that are not recognised as serialized
            markers are returned unchanged
        """
        if isinstance(value, dict):
            # {"value_type", "value"} dicts are rebuilt through the node constructor
            if value.get("value_type") in self.reserved_objects_names:
                obj = self._get_model_by_name(value["value_type"])
                return obj(**self.deserialize(value["value"]))

            return self.deserialize(value)
        
        if isinstance(value, (list, tuple)):
            return self._deserialize_array(value)

        if isinstance(value, str):
            if value.startswith(self.model_property_prefix):
                return self._deserialize_property(value)
            
            if value.startswith(self.model_prefix):
                model = self._get_model_by_name(value[len(self.model_prefix):])
                # plain strings are stored unescaped, so a query parameter may
                # itself start with "MODEL "; keep it instead of turning it into None
                return value if model is None else model
        
        return value

# shortcuts
def modelselect_to_dict(item: ModelSelect) -> dict:
    """Export a `peewee.ModelSelect` query to a dict using a default `Serializer`."""
    serializer = Serializer()
    exported = serializer.serialize(item)
    return exported

def dict_to_modelselect(data: dict, main_model_class: Optional[peewee.Model] = peewee.Model) -> ModelSelect:
    """Rebuild a `peewee.ModelSelect` from a dict, looking models up under `main_model_class`."""
    serializer = Serializer(main_model_class)
    restored = serializer.deserialize(data)
    return restored

# build a query that uses join, where, limit, offset and group_by
item = (
    User
    .select()
    .join(Post, on=(User.id == Post.author_id))
    .where((User.id == 1) & (User.name == "new_user"))
    .limit(1)
    .offset(0)
    .group_by(User.id)
)

# ModelSelect -> dict
dicted = modelselect_to_dict(item)
print("Exported as dict:\n", dicted, "\n")

# dict -> ModelSelect
usered = dict_to_modelselect(dicted)

# SQL of the original and of the restored query
print("Initial SQL:\n", item.sql())
# ('SELECT "t1"."id", "t1"."name" FROM "user" AS "t1" INNER JOIN "post" AS "t2" ON ("t1"."id" = "t2"."author_id") WHERE (("t1"."id" = ?) AND ("t1"."name" = ?)) GROUP BY "t1"."id" LIMIT ? OFFSET ?', [1, 'new_user', 1, 0])
print("Restored SQL:\n", usered.sql(), "\n")
# ('SELECT "t1"."id", "t1"."name" FROM "user" AS "t1" INNER JOIN "post" AS "t2" ON ("t1"."id" = "t2"."author_id") WHERE (("t1"."id" = ?) AND ("t1"."name" = ?)) GROUP BY "t1"."id" LIMIT ? OFFSET ?', [1, 'new_user', 1, 0])

# rows returned by the original and by the restored query
print("Initial query result:\n", list(item))
print("Restored query result:\n", list(usered))

# the round trip is correct when both the SQL and the rows are the same
queries_match = item.sql() == usered.sql() and list(item) == list(usered)
print("\nEqual:", queries_match, "\n")