Python 观察者模式
观察者模式是一种行为设计模式,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。当主题对象状态发生变化时,它的所有观察者都会收到通知并自动更新。
生活中的观察者模式
想象一下报纸订阅的场景:
- 报社(主题)发布新闻
- 订阅者(观察者)订阅报纸
- 当有新报纸出版时,所有订阅者都会自动收到报纸
- 订阅者可以随时取消订阅
这种"发布-订阅"的机制就是观察者模式的典型应用。
观察者模式的核心组件
观察者模式包含四个关键角色:
1. 主题 (Subject)
- 维护观察者列表
- 提供注册和注销观察者的方法
- 通知所有观察者状态变化
2. 具体主题 (Concrete Subject)
- 存储具体状态信息
- 状态改变时通知观察者
3. 观察者 (Observer)
- 定义更新接口,用于接收主题通知
4. 具体观察者 (Concrete Observer)
- 实现更新接口,保持与主题状态一致
观察者模式的实现
让我们通过一个完整的示例来理解观察者模式的实现。
基础实现
实例
from abc import ABC, abstractmethod
from typing import List
# 观察者接口
class Observer(ABC):
@abstractmethod
def update(self, message: str):
"""观察者更新方法"""
pass
# 主题接口
class Subject(ABC):
@abstractmethod
def register_observer(self, observer: Observer):
"""注册观察者"""
pass
@abstractmethod
def remove_observer(self, observer: Observer):
"""移除观察者"""
pass
@abstractmethod
def notify_observers(self):
"""通知所有观察者"""
pass
# 具体主题 - 新闻发布器
class NewsPublisher(Subject):
def __init__(self):
self._observers: List[Observer] = []
self._latest_news = ""
def register_observer(self, observer: Observer):
"""注册观察者"""
if observer not in self._observers:
self._observers.append(observer)
print(f"观察者 {observer.__class__.__name__} 注册成功")
def remove_observer(self, observer: Observer):
"""移除观察者"""
if observer in self._observers:
self._observers.remove(observer)
print(f"观察者 {observer.__class__.__name__} 移除成功")
def notify_observers(self):
"""通知所有观察者"""
print(f"\n发布新消息: {self._latest_news}")
for observer in self._observers:
observer.update(self._latest_news)
def publish_news(self, news: str):
"""发布新闻"""
self._latest_news = news
self.notify_observers()
def get_observer_count(self) -> int:
"""获取观察者数量"""
return len(self._observers)
# 具体观察者 - 邮件订阅者
class EmailSubscriber(Observer):
def __init__(self, name: str):
self.name = name
def update(self, message: str):
"""接收更新并发送邮件"""
print(f"[邮件] {self.name} 收到消息: {message}")
# 具体观察者 - 短信订阅者
class SMSSubscriber(Observer):
def __init__(self, name: str, phone: str):
self.name = name
self.phone = phone
def update(self, message: str):
"""接收更新并发送短信"""
print(f"[短信] {self.phone} - {self.name} 收到消息: {message}")
# 具体观察者 - 应用推送订阅者
class AppPushSubscriber(Observer):
def __init__(self, username: str):
self.username = username
def update(self, message: str):
"""接收更新并发送应用推送"""
print(f"[应用推送] 用户 {self.username} 收到推送: {message}")
from typing import List
# 观察者接口
class Observer(ABC):
@abstractmethod
def update(self, message: str):
"""观察者更新方法"""
pass
# 主题接口
class Subject(ABC):
@abstractmethod
def register_observer(self, observer: Observer):
"""注册观察者"""
pass
@abstractmethod
def remove_observer(self, observer: Observer):
"""移除观察者"""
pass
@abstractmethod
def notify_observers(self):
"""通知所有观察者"""
pass
# 具体主题 - 新闻发布器
class NewsPublisher(Subject):
def __init__(self):
self._observers: List[Observer] = []
self._latest_news = ""
def register_observer(self, observer: Observer):
"""注册观察者"""
if observer not in self._observers:
self._observers.append(observer)
print(f"观察者 {observer.__class__.__name__} 注册成功")
def remove_observer(self, observer: Observer):
"""移除观察者"""
if observer in self._observers:
self._observers.remove(observer)
print(f"观察者 {observer.__class__.__name__} 移除成功")
def notify_observers(self):
"""通知所有观察者"""
print(f"\n发布新消息: {self._latest_news}")
for observer in self._observers:
observer.update(self._latest_news)
def publish_news(self, news: str):
"""发布新闻"""
self._latest_news = news
self.notify_observers()
def get_observer_count(self) -> int:
"""获取观察者数量"""
return len(self._observers)
# 具体观察者 - 邮件订阅者
class EmailSubscriber(Observer):
def __init__(self, name: str):
self.name = name
def update(self, message: str):
"""接收更新并发送邮件"""
print(f"[邮件] {self.name} 收到消息: {message}")
# 具体观察者 - 短信订阅者
class SMSSubscriber(Observer):
def __init__(self, name: str, phone: str):
self.name = name
self.phone = phone
def update(self, message: str):
"""接收更新并发送短信"""
print(f"[短信] {self.phone} - {self.name} 收到消息: {message}")
# 具体观察者 - 应用推送订阅者
class AppPushSubscriber(Observer):
def __init__(self, username: str):
self.username = username
def update(self, message: str):
"""接收更新并发送应用推送"""
print(f"[应用推送] 用户 {self.username} 收到推送: {message}")
使用示例
实例
def demo_basic_observer():
"""基础观察者模式演示"""
print("=== 观察者模式基础演示 ===\n")
# 创建新闻发布器
publisher = NewsPublisher()
# 创建不同类型的订阅者
email_user = EmailSubscriber("张三")
sms_user = SMSSubscriber("李四", "13800138000")
app_user = AppPushSubscriber("王五")
# 注册订阅者
publisher.register_observer(email_user)
publisher.register_observer(sms_user)
publisher.register_observer(app_user)
print(f"\n当前订阅者数量: {publisher.get_observer_count()}")
# 发布新闻
publisher.publish_news("Python 3.12 正式发布!")
# 移除一个订阅者
publisher.remove_observer(sms_user)
print(f"\n移除后订阅者数量: {publisher.get_observer_count()}")
# 再次发布新闻
publisher.publish_news("观察者模式学习指南上线!")
# 运行演示
if __name__ == "__main__":
demo_basic_observer()
"""基础观察者模式演示"""
print("=== 观察者模式基础演示 ===\n")
# 创建新闻发布器
publisher = NewsPublisher()
# 创建不同类型的订阅者
email_user = EmailSubscriber("张三")
sms_user = SMSSubscriber("李四", "13800138000")
app_user = AppPushSubscriber("王五")
# 注册订阅者
publisher.register_observer(email_user)
publisher.register_observer(sms_user)
publisher.register_observer(app_user)
print(f"\n当前订阅者数量: {publisher.get_observer_count()}")
# 发布新闻
publisher.publish_news("Python 3.12 正式发布!")
# 移除一个订阅者
publisher.remove_observer(sms_user)
print(f"\n移除后订阅者数量: {publisher.get_observer_count()}")
# 再次发布新闻
publisher.publish_news("观察者模式学习指南上线!")
# 运行演示
if __name__ == "__main__":
demo_basic_observer()
运行结果:
=== 观察者模式基础演示 === 观察者 EmailSubscriber 注册成功 观察者 SMSSubscriber 注册成功 观察者 AppPushSubscriber 注册成功 当前订阅者数量: 3 发布新消息: Python 3.12 正式发布! [邮件] 张三 收到消息: Python 3.12 正式发布! [短信] 13800138000 - 李四 收到消息: Python 3.12 正式发布! [应用推送] 用户 王五 收到推送: Python 3.12 正式发布! 观察者 SMSSubscriber 移除成功 移除后订阅者数量: 2 发布新消息: 观察者模式学习指南上线! [邮件] 张三 收到消息: 观察者模式学习指南上线! [应用推送] 用户 王五 收到推送: 观察者模式学习指南上线!
高级应用:股票价格监控系统
让我们通过一个更实用的例子来加深理解。
实例
# 股票监控系统示例
class Stock:
"""股票类"""
def __init__(self, symbol: str, price: float):
self.symbol = symbol # 股票代码
self._price = price # 当前价格
self._history = [price] # 价格历史
@property
def price(self) -> float:
return self._price
@price.setter
def price(self, value: float):
"""设置价格并记录历史"""
old_price = self._price
self._price = value
self._history.append(value)
return old_price, value
def get_price_change(self) -> float:
"""获取价格变化百分比"""
if len(self._history) < 2:
return 0.0
return ((self._history[-1] - self._history[-2]) / self._history[-2]) * 100
class StockMarket(Subject):
"""股票市场 - 具体主题"""
def __init__(self):
self._observers: List[Observer] = []
self._stocks = {} # 股票字典: {代码: Stock对象}
def register_observer(self, observer: Observer):
if observer not in self._observers:
self._observers.append(observer)
def remove_observer(self, observer: Observer):
if observer in self._observers:
self._observers.remove(observer)
def notify_observers(self, stock: Stock):
"""通知观察者特定股票的变化"""
change = stock.get_price_change()
message = f"{stock.symbol}: ${stock.price:.2f} ({change:+.2f}%)"
for observer in self._observers:
observer.update(message)
def add_stock(self, symbol: str, initial_price: float):
"""添加股票到市场"""
self._stocks[symbol] = Stock(symbol, initial_price)
def update_stock_price(self, symbol: str, new_price: float):
"""更新股票价格并通知观察者"""
if symbol in self._stocks:
stock = self._stocks[symbol]
old_price, current_price = stock.price, new_price
stock.price = new_price
# 只有价格真正变化时才通知
if old_price != current_price:
self.notify_observers(stock)
class Investor(Observer):
"""投资者 - 具体观察者"""
def __init__(self, name: str, interest_stocks: List[str] = None):
self.name = name
self.interest_stocks = interest_stocks or []
self.notifications = []
def update(self, message: str):
"""接收股票更新"""
stock_symbol = message.split(":")[0]
# 只关注感兴趣的股票
if not self.interest_stocks or stock_symbol in self.interest_stocks:
self.notifications.append(message)
print(f"投资者 {self.name} 收到: {message}")
def show_notifications(self):
"""显示所有通知"""
print(f"\n=== {self.name} 的通知记录 ===")
for notification in self.notifications:
print(f" - {notification}")
def demo_stock_market():
"""股票市场演示"""
print("\n=== 股票市场监控系统演示 ===\n")
# 创建股票市场
market = StockMarket()
# 添加股票
market.add_stock("AAPL", 150.0)
market.add_stock("GOOGL", 2800.0)
market.add_stock("TSLA", 250.0)
# 创建投资者
investor1 = Investor("技术投资者", ["AAPL", "GOOGL"])
investor2 = Investor("新能源投资者", ["TSLA"])
investor3 = Investor("全市场投资者")
# 注册投资者
market.register_observer(investor1)
market.register_observer(investor2)
market.register_observer(investor3)
# 模拟股票价格变化
print("开始模拟股票交易...")
market.update_stock_price("AAPL", 152.5) # AAPL 上涨
market.update_stock_price("TSLA", 245.0) # TSLA 下跌
market.update_stock_price("GOOGL", 2820.0) # GOOGL 上涨
market.update_stock_price("AAPL", 151.0) # AAPL 小幅下跌
# 显示投资者通知记录
investor1.show_notifications()
investor2.show_notifications()
investor3.show_notifications()
# 运行股票市场演示
demo_stock_market()
class Stock:
"""股票类"""
def __init__(self, symbol: str, price: float):
self.symbol = symbol # 股票代码
self._price = price # 当前价格
self._history = [price] # 价格历史
@property
def price(self) -> float:
return self._price
@price.setter
def price(self, value: float):
"""设置价格并记录历史"""
old_price = self._price
self._price = value
self._history.append(value)
return old_price, value
def get_price_change(self) -> float:
"""获取价格变化百分比"""
if len(self._history) < 2:
return 0.0
return ((self._history[-1] - self._history[-2]) / self._history[-2]) * 100
class StockMarket(Subject):
"""股票市场 - 具体主题"""
def __init__(self):
self._observers: List[Observer] = []
self._stocks = {} # 股票字典: {代码: Stock对象}
def register_observer(self, observer: Observer):
if observer not in self._observers:
self._observers.append(observer)
def remove_observer(self, observer: Observer):
if observer in self._observers:
self._observers.remove(observer)
def notify_observers(self, stock: Stock):
"""通知观察者特定股票的变化"""
change = stock.get_price_change()
message = f"{stock.symbol}: ${stock.price:.2f} ({change:+.2f}%)"
for observer in self._observers:
observer.update(message)
def add_stock(self, symbol: str, initial_price: float):
"""添加股票到市场"""
self._stocks[symbol] = Stock(symbol, initial_price)
def update_stock_price(self, symbol: str, new_price: float):
"""更新股票价格并通知观察者"""
if symbol in self._stocks:
stock = self._stocks[symbol]
old_price, current_price = stock.price, new_price
stock.price = new_price
# 只有价格真正变化时才通知
if old_price != current_price:
self.notify_observers(stock)
class Investor(Observer):
"""投资者 - 具体观察者"""
def __init__(self, name: str, interest_stocks: List[str] = None):
self.name = name
self.interest_stocks = interest_stocks or []
self.notifications = []
def update(self, message: str):
"""接收股票更新"""
stock_symbol = message.split(":")[0]
# 只关注感兴趣的股票
if not self.interest_stocks or stock_symbol in self.interest_stocks:
self.notifications.append(message)
print(f"投资者 {self.name} 收到: {message}")
def show_notifications(self):
"""显示所有通知"""
print(f"\n=== {self.name} 的通知记录 ===")
for notification in self.notifications:
print(f" - {notification}")
def demo_stock_market():
"""股票市场演示"""
print("\n=== 股票市场监控系统演示 ===\n")
# 创建股票市场
market = StockMarket()
# 添加股票
market.add_stock("AAPL", 150.0)
market.add_stock("GOOGL", 2800.0)
market.add_stock("TSLA", 250.0)
# 创建投资者
investor1 = Investor("技术投资者", ["AAPL", "GOOGL"])
investor2 = Investor("新能源投资者", ["TSLA"])
investor3 = Investor("全市场投资者")
# 注册投资者
market.register_observer(investor1)
market.register_observer(investor2)
market.register_observer(investor3)
# 模拟股票价格变化
print("开始模拟股票交易...")
market.update_stock_price("AAPL", 152.5) # AAPL 上涨
market.update_stock_price("TSLA", 245.0) # TSLA 下跌
market.update_stock_price("GOOGL", 2820.0) # GOOGL 上涨
market.update_stock_price("AAPL", 151.0) # AAPL 小幅下跌
# 显示投资者通知记录
investor1.show_notifications()
investor2.show_notifications()
investor3.show_notifications()
# 运行股票市场演示
demo_stock_market()
观察者模式的优缺点
优点
| 优点 | 说明 |
|---|---|
| 松耦合 | 主题和观察者之间依赖关系松散,可以独立变化 |
| 可扩展性 | 可以轻松添加新的观察者,无需修改主题代码 |
| 广播通信 | 支持一对多的通信方式,消息可以广播给多个对象 |
| 开放封闭原则 | 对扩展开放,对修改封闭 |
缺点
| 缺点 | 说明 | 解决方案 |
|---|---|---|
| 内存泄漏 | 观察者忘记注销可能导致内存泄漏 | 使用弱引用或确保及时注销 |
| 更新顺序 | 观察者更新顺序不确定 | 如果需要顺序,可以添加优先级机制 |
| 性能问题 | 观察者数量多时,通知可能影响性能 | 使用异步通知或批量更新 |
实际应用场景
1. GUI 事件处理
实例
# 伪代码示例
button.on_click(handler1) # 注册点击事件观察者
button.on_click(handler2) # 注册另一个观察者
# 点击按钮时,所有注册的处理器都会被调用
button.on_click(handler1) # 注册点击事件观察者
button.on_click(handler2) # 注册另一个观察者
# 点击按钮时,所有注册的处理器都会被调用
2. 消息队列系统
- 生产者发布消息
- 多个消费者订阅并处理消息
3. 数据监控系统
- 数据源状态变化
- 多个监控服务接收通知并采取行动
4. 游戏开发
- 游戏事件系统
- UI 元素响应游戏状态变化
最佳实践和注意事项
1. 使用弱引用避免内存泄漏
实例
import weakref
class SafeSubject(Subject):
def __init__(self):
self._observers = weakref.WeakSet() # 使用弱引用集合
def register_observer(self, observer: Observer):
self._observers.add(observer)
class SafeSubject(Subject):
def __init__(self):
self._observers = weakref.WeakSet() # 使用弱引用集合
def register_observer(self, observer: Observer):
self._observers.add(observer)
2. 异步通知提高性能
实例
import asyncio
class AsyncObserver(ABC):
@abstractmethod
async def update_async(self, message: str):
pass
class AsyncSubject:
async def notify_observers_async(self):
tasks = [observer.update_async(self._message)
for observer in self._observers]
await asyncio.gather(*tasks)
class AsyncObserver(ABC):
@abstractmethod
async def update_async(self, message: str):
pass
class AsyncSubject:
async def notify_observers_async(self):
tasks = [observer.update_async(self._message)
for observer in self._observers]
await asyncio.gather(*tasks)
3. 添加错误处理
实例
def safe_notify(self):
for observer in self._observers.copy(): # 使用副本避免修改问题
try:
observer.update(self._message)
except Exception as e:
print(f"观察者 {observer} 更新失败: {e}")
for observer in self._observers.copy(): # 使用副本避免修改问题
try:
observer.update(self._message)
except Exception as e:
print(f"观察者 {observer} 更新失败: {e}")
总结
观察者模式是 Python 中非常重要的设计模式,它提供了一种优雅的方式来实现对象间的松耦合通信。通过本文的学习,你应该能够:
- 理解观察者模式的核心概念和适用场景
- 掌握观察者模式的基本实现方法
- 在实际项目中应用观察者模式解决问题
- 避免观察者模式的常见陷阱
点我分享笔记