信号
信号是一种机制,每当 ormar 中发生给定类型的事件时,就会触发您的代码(函数/方法)。
为此,您需要为所选型号的给定类型信号注册接收器。
定义接收者
给出如下示例模型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | import databases
import sqlalchemy
import ormar
base_ormar_config = ormar.OrmarConfig(
database=databases.Database("sqlite:///db.sqlite"),
metadata=sqlalchemy.MetaData(),
)
class Album(ormar.Model):
ormar_config = base_ormar_config.copy()
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
is_best_seller: bool = ormar.Boolean(default=False)
play_count: int = ormar.Integer(default=0)
|
例如,您可以定义一个触发器,如果该专辑的播放次数超过 50 次,则该触发器将设置 album.is_best_seller 状态。
导入 pre_update 装饰器,查看下面当前可用装饰器/信号的列表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 | import asyncio
import databases
import ormar
import sqlalchemy
from examples import create_drop_database
from ormar import pre_update
DATABASE_URL = "sqlite:///test.db"
ormar_base_config = ormar.OrmarConfig(
database=databases.Database(DATABASE_URL),
metadata=sqlalchemy.MetaData(),
)
class Album(ormar.Model):
ormar_config = ormar_base_config.copy(
tablename="albums",
)
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
is_best_seller: bool = ormar.Boolean(default=False)
play_count: int = ormar.Integer(default=0)
@pre_update(Album)
async def before_update(sender, instance, **kwargs):
if instance.play_count > 50 and not instance.is_best_seller:
instance.is_best_seller = True
@create_drop_database(base_config=ormar_base_config)
async def run_query():
# here album.play_count ans is_best_seller get default values
album = await Album.objects.create(name="Venice")
assert not album.is_best_seller
assert album.play_count == 0
album.play_count = 30
# here a trigger is called but play_count is too low
await album.update()
assert not album.is_best_seller
album.play_count = 60
await album.update()
assert album.is_best_seller
asyncio.run(run_query())
|
定义你的函数。
请注意,每个接收器功能:
- 必须是可调用的
- 必须接受接收发送对象类的第一个发送者参数
- 必须接受 **kwargs 参数作为每个 ormar 中发送的参数。信号可以随时更改,因此您的函数必须为它们提供服务。
- 必须是异步的,因为回调被收集并等待。
pre_update 当前除了发送者之外仅发送一个参数,并且它是实例一。
请注意 pre_update 装饰器如何接受 senders 参数,该参数可以是您要为其运行信号接收器的单个模型或模型列表。
目前,如果不显式地将所有模型全部传递到接收器的注册中,则无法立即为所有模型设置信号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 | import asyncio
import databases
import ormar
import sqlalchemy
from examples import create_drop_database
from ormar import pre_update
DATABASE_URL = "sqlite:///test.db"
ormar_base_config = ormar.OrmarConfig(
database=databases.Database(DATABASE_URL),
metadata=sqlalchemy.MetaData(),
)
class Album(ormar.Model):
ormar_config = ormar_base_config.copy(
tablename="albums",
)
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
is_best_seller: bool = ormar.Boolean(default=False)
play_count: int = ormar.Integer(default=0)
@pre_update(Album)
async def before_update(sender, instance, **kwargs):
if instance.play_count > 50 and not instance.is_best_seller:
instance.is_best_seller = True
@create_drop_database(base_config=ormar_base_config)
async def run_query():
# here album.play_count ans is_best_seller get default values
album = await Album.objects.create(name="Venice")
assert not album.is_best_seller
assert album.play_count == 0
album.play_count = 30
# here a trigger is called but play_count is too low
await album.update()
assert not album.is_best_seller
album.play_count = 60
await album.update()
assert album.is_best_seller
asyncio.run(run_query())
|
!!!note 请注意,接收器是在类级别 -> 上定义的,因此即使您通过实例连接/断开函数,它也会运行/停止运行该 ormar.Model 类上的所有操作。
请注意,我们新创建的函数具有实例和实例的类,因此如果您愿意,您可以轻松地在接收器内运行数据库查询。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 | import asyncio
import databases
import ormar
import sqlalchemy
from examples import create_drop_database
from ormar import pre_update
DATABASE_URL = "sqlite:///test.db"
ormar_base_config = ormar.OrmarConfig(
database=databases.Database(DATABASE_URL),
metadata=sqlalchemy.MetaData(),
)
class Album(ormar.Model):
ormar_config = ormar_base_config.copy(
tablename="albums",
)
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
is_best_seller: bool = ormar.Boolean(default=False)
play_count: int = ormar.Integer(default=0)
@pre_update(Album)
async def before_update(sender, instance, **kwargs):
if instance.play_count > 50 and not instance.is_best_seller:
instance.is_best_seller = True
@create_drop_database(base_config=ormar_base_config)
async def run_query():
# here album.play_count ans is_best_seller get default values
album = await Album.objects.create(name="Venice")
assert not album.is_best_seller
assert album.play_count == 0
album.play_count = 30
# here a trigger is called but play_count is too low
await album.update()
assert not album.is_best_seller
album.play_count = 60
await album.update()
assert album.is_best_seller
asyncio.run(run_query())
|
您可以通过将模型列表传递给信号装饰器来一次为多个模型定义相同的接收器。
| # define a dummy debug function
@pre_update([Album, Track])
async def before_update(sender, instance, **kwargs):
print(f"{sender.get_name()}: {instance.model_dump_json()}: {kwargs}")
|
当然,您也可以为同一信号和模型创建多个函数。他们每个人都会在每个信号时运行。
| @pre_update(Album)
async def before_update(sender, instance, **kwargs):
print(f"{sender.get_name()}: {instance.model_dump_json()}: {kwargs}")
@pre_update(Album)
async def before_update2(sender, instance, **kwargs):
print(f'About to update {sender.get_name()} with pk: {instance.pk}')
|
请注意,ormar 装饰器是语法糖,您可以直接连接给定模型的给定信号的函数或方法。 Connect 仅接受一个参数 - 您的接收器函数/方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | class AlbumAuditor:
def __init__(self):
self.event_type = "ALBUM_INSTANCE"
async def before_save(self, sender, instance, **kwargs):
await AuditLog(
event_type=f"{self.event_type}_SAVE", event_log=instance.model_dump_json()
).save()
auditor = AlbumAuditor()
pre_save(Album)(auditor.before_save)
# call above has same result like the one below
Album.ormar_config.signals.pre_save.connect(auditor.before_save)
# signals are also exposed on instance
album = Album(name='Miami')
album.signals.pre_save.connect(auditor.before_save)
|
!!!警告请注意,信号保留对接收器的引用(而不是弱引用),因此请记住这一点以避免循环引用。
断开接收器的连接
要断开接收器并停止其运行给定模型,您需要断开它。
| @pre_update(Album)
async def before_update(sender, instance, **kwargs):
if instance.play_count > 50 and not instance.is_best_seller:
instance.is_best_seller = True
# disconnect given function from signal for given Model
Album.ormar_config.signals.pre_save.disconnect(before_save)
# signals are also exposed on instance
album = Album(name='Miami')
album.signals.pre_save.disconnect(before_save)
|
可用信号
!!!警告 请注意,不会发送信号:
| * bulk operations (`QuerySet.bulk_create` and `QuerySet.bulk_update`) as they are designed for speed.
* queryset table level operations (`QuerySet.update` and `QuerySet.delete`) as they run on the underlying tables
(more like raw sql update/delete operations) and do not have specific instance.
|
预保存
pre_save(sender: Type["Model"], instance: "Model")
发送 Model.save() 和 Model.objects.create() 方法。
sender 是一个 ormar.Model 类,instance 是要保存的模型。
保存后
post_save(sender: Type["Model"], instance: "Model")
发送 Model.save() 和 Model.objects.create() 方法。
sender 是一个 ormar.Model 类,instance 是保存的模型。
预更新
pre_update(sender: Type["Model"], instance: "Model")
发送给 Model.update() 方法。
sender 是一个 ormar.Model 类,instance 是要更新的模型。
更新后
post_update(sender: Type["Model"], instance: "Model")
发送给 Model.update() 方法。
sender 是一个 ormar.Model 类,instance 是已更新的模型。
预删除
pre_delete(sender: Type["Model"], instance: "Model")
发送 Model.save() 和 Model.objects.create() 方法。
sender 是一个 ormar.Model 类,instance 是要删除的模型。
删除后
post_delete(sender: Type["Model"], instance: "Model")
发送给 Model.update() 方法。
sender 是一个 ormar.Model 类,instance 是被删除的模型。
预关系添加
pre_relation_add(sender: Type["Model"], instance: "Model", child: "Model", relation_name: str, passed_args: Dict)
发送多对多关系的 Model.relation_name.add() 方法和外键关系的反向。
sender - 发送者类,instance - 添加相关模型的实例,child - 正在添加的模型,relation_name - 添加子项的关系的名称,对于添加信号也传递_kwargs - 传递给 add() 的 kwargs 的字典
后关系添加
post_relation_add(sender: Type["Model"], instance: "Model", child: "Model", relation_name: str, passed_args: Dict)
发送多对多关系的 Model.relation_name.add() 方法和外键关系的反向。
sender - 发送者类,instance - 添加相关模型的实例,child - 正在添加的模型,relation_name - 添加子项的关系的名称,对于添加信号也传递_kwargs - 传递给 add() 的 kwargs 的字典
预关系删除
pre_relation_remove(sender: Type["Model"], instance: "Model", child: "Model", relation_name: str)
发送多对多关系的 Model.relation_name.remove() 方法和外键关系的反向。
sender - 发送者类,instance - 添加相关模型的实例,child - 正在添加的模型,relation_name - 添加子项的关系的名称。
删除关系后
post_relation_remove(sender: Type["Model"], instance: "Model", child: "Model", relation_name: str, passed_args: Dict)
发送多对多关系的 Model.relation_name.remove() 方法和外键关系的反向。
sender - 发送者类,instance - 添加相关模型的实例,child - 正在添加的模型,relation_name - 添加子项的关系的名称。
批量更新后
post_bulk_update(sender: Type["Model"], instances: List["Model"], **kwargs)
, 发送给 Model.objects.bulk_update(List[objects])
方法。
定义您自己的信号
请注意,您可以创建自己的信号,尽管您必须在代码或子类 ormar.Model 中手动发送它们并在那里触发信号。
创建新信号非常简单。以下示例将设置一个名为 your_custom_signal 的新信号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | import databases
import sqlalchemy
import ormar
base_ormar_config = ormar.OrmarConfig(
database=databases.Database("sqlite:///db.sqlite"),
metadata=sqlalchemy.MetaData(),
)
class Album(ormar.Model):
ormar_config = base_ormar_config.copy()
id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
is_best_seller: bool = ormar.Boolean(default=False)
play_count: int = ormar.Integer(default=0)
Album.ormar_config.signals.your_custom_signal = ormar.Signal()
Album.ormar_config.signals.your_custom_signal.connect(your_receiver_name)
|
实际上,信号的底层是一个 SignalEmitter 实例,它保存已知信号的字典,并允许您将它们作为属性访问。当您尝试访问不存在的信号时,SignalEmitter 将为您创建一个信号。
所以上面的例子可以简化为。将为您创建信号。
| Album.ormar_config.signals.your_custom_signal.connect(your_receiver_name)
|
现在要触发此信号,您需要调用信号的发送方法。
| await Album.ormar_config.signals.your_custom_signal.send(sender=Album)
|
请注意,sender 是唯一必需的参数,它应该是 ormar Model 类。
附加参数必须作为关键字参数传递。
| await Album.ormar_config.signals.your_custom_signal.send(sender=Album, my_param=True)
|