Python 观察者模式

观察者模式是一种行为设计模式,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。当主题对象状态发生变化时,它的所有观察者都会收到通知并自动更新。

生活中的观察者模式

想象一下报纸订阅的场景:

  • 报社(主题)发布新闻
  • 订阅者(观察者)订阅报纸
  • 当有新报纸出版时,所有订阅者都会自动收到报纸
  • 订阅者可以随时取消订阅

这种"发布-订阅"的机制就是观察者模式的典型应用。


观察者模式的核心组件

观察者模式包含四个关键角色:

1. 主题 (Subject)

  • 维护观察者列表
  • 提供注册和注销观察者的方法
  • 通知所有观察者状态变化

2. 具体主题 (Concrete Subject)

  • 存储具体状态信息
  • 状态改变时通知观察者

3. 观察者 (Observer)

  • 定义更新接口,用于接收主题通知

4. 具体观察者 (Concrete Observer)

  • 实现更新接口,保持与主题状态一致

观察者模式的实现

让我们通过一个完整的示例来理解观察者模式的实现。

基础实现

实例

from abc import ABC, abstractmethod
from typing import List

# 观察者接口
class Observer(ABC):
    @abstractmethod
    def update(self, message: str):
        """观察者更新方法"""
        pass

# 主题接口
class Subject(ABC):
    @abstractmethod
    def register_observer(self, observer: Observer):
        """注册观察者"""
        pass
   
    @abstractmethod
    def remove_observer(self, observer: Observer):
        """移除观察者"""
        pass
   
    @abstractmethod
    def notify_observers(self):
        """通知所有观察者"""
        pass

# 具体主题 - 新闻发布器
class NewsPublisher(Subject):
    def __init__(self):
        self._observers: List[Observer] = []
        self._latest_news = ""
   
    def register_observer(self, observer: Observer):
        """注册观察者"""
        if observer not in self._observers:
            self._observers.append(observer)
            print(f"观察者 {observer.__class__.__name__} 注册成功")
   
    def remove_observer(self, observer: Observer):
        """移除观察者"""
        if observer in self._observers:
            self._observers.remove(observer)
            print(f"观察者 {observer.__class__.__name__} 移除成功")
   
    def notify_observers(self):
        """通知所有观察者"""
        print(f"\n发布新消息: {self._latest_news}")
        for observer in self._observers:
            observer.update(self._latest_news)
   
    def publish_news(self, news: str):
        """发布新闻"""
        self._latest_news = news
        self.notify_observers()
   
    def get_observer_count(self) -> int:
        """获取观察者数量"""
        return len(self._observers)

# 具体观察者 - 邮件订阅者
class EmailSubscriber(Observer):
    def __init__(self, name: str):
        self.name = name
   
    def update(self, message: str):
        """接收更新并发送邮件"""
        print(f"[邮件] {self.name} 收到消息: {message}")

# 具体观察者 - 短信订阅者
class SMSSubscriber(Observer):
    def __init__(self, name: str, phone: str):
        self.name = name
        self.phone = phone
   
    def update(self, message: str):
        """接收更新并发送短信"""
        print(f"[短信] {self.phone} - {self.name} 收到消息: {message}")

# 具体观察者 - 应用推送订阅者
class AppPushSubscriber(Observer):
    def __init__(self, username: str):
        self.username = username
   
    def update(self, message: str):
        """接收更新并发送应用推送"""
        print(f"[应用推送] 用户 {self.username} 收到推送: {message}")

使用示例

实例

def demo_basic_observer():
    """基础观察者模式演示"""
    print("=== 观察者模式基础演示 ===\n")
   
    # 创建新闻发布器
    publisher = NewsPublisher()
   
    # 创建不同类型的订阅者
    email_user = EmailSubscriber("张三")
    sms_user = SMSSubscriber("李四", "13800138000")
    app_user = AppPushSubscriber("王五")
   
    # 注册订阅者
    publisher.register_observer(email_user)
    publisher.register_observer(sms_user)
    publisher.register_observer(app_user)
   
    print(f"\n当前订阅者数量: {publisher.get_observer_count()}")
   
    # 发布新闻
    publisher.publish_news("Python 3.12 正式发布!")
   
    # 移除一个订阅者
    publisher.remove_observer(sms_user)
   
    print(f"\n移除后订阅者数量: {publisher.get_observer_count()}")
   
    # 再次发布新闻
    publisher.publish_news("观察者模式学习指南上线!")

# 运行演示
if __name__ == "__main__":
    demo_basic_observer()

运行结果:

=== 观察者模式基础演示 ===

观察者 EmailSubscriber 注册成功
观察者 SMSSubscriber 注册成功
观察者 AppPushSubscriber 注册成功

当前订阅者数量: 3

发布新消息: Python 3.12 正式发布!
[邮件] 张三 收到消息: Python 3.12 正式发布!
[短信] 13800138000 - 李四 收到消息: Python 3.12 正式发布!
[应用推送] 用户 王五 收到推送: Python 3.12 正式发布!
观察者 SMSSubscriber 移除成功

移除后订阅者数量: 2

发布新消息: 观察者模式学习指南上线!
[邮件] 张三 收到消息: 观察者模式学习指南上线!
[应用推送] 用户 王五 收到推送: 观察者模式学习指南上线!

高级应用:股票价格监控系统

让我们通过一个更实用的例子来加深理解。

实例

# 股票监控系统示例
class Stock:
    """股票类"""
    def __init__(self, symbol: str, price: float):
        self.symbol = symbol  # 股票代码
        self._price = price   # 当前价格
        self._history = [price]  # 价格历史
   
    @property
    def price(self) -> float:
        return self._price
   
    @price.setter
    def price(self, value: float):
        """设置价格并记录历史"""
        old_price = self._price
        self._price = value
        self._history.append(value)
        return old_price, value
   
    def get_price_change(self) -> float:
        """获取价格变化百分比"""
        if len(self._history) < 2:
            return 0.0
        return ((self._history[-1] - self._history[-2]) / self._history[-2]) * 100

class StockMarket(Subject):
    """股票市场 - 具体主题"""
    def __init__(self):
        self._observers: List[Observer] = []
        self._stocks = {}  # 股票字典: {代码: Stock对象}
   
    def register_observer(self, observer: Observer):
        if observer not in self._observers:
            self._observers.append(observer)
   
    def remove_observer(self, observer: Observer):
        if observer in self._observers:
            self._observers.remove(observer)
   
    def notify_observers(self, stock: Stock):
        """通知观察者特定股票的变化"""
        change = stock.get_price_change()
        message = f"{stock.symbol}: ${stock.price:.2f} ({change:+.2f}%)"
       
        for observer in self._observers:
            observer.update(message)
   
    def add_stock(self, symbol: str, initial_price: float):
        """添加股票到市场"""
        self._stocks[symbol] = Stock(symbol, initial_price)
   
    def update_stock_price(self, symbol: str, new_price: float):
        """更新股票价格并通知观察者"""
        if symbol in self._stocks:
            stock = self._stocks[symbol]
            old_price, current_price = stock.price, new_price
            stock.price = new_price
           
            # 只有价格真正变化时才通知
            if old_price != current_price:
                self.notify_observers(stock)

class Investor(Observer):
    """投资者 - 具体观察者"""
    def __init__(self, name: str, interest_stocks: List[str] = None):
        self.name = name
        self.interest_stocks = interest_stocks or []
        self.notifications = []
   
    def update(self, message: str):
        """接收股票更新"""
        stock_symbol = message.split(":")[0]
       
        # 只关注感兴趣的股票
        if not self.interest_stocks or stock_symbol in self.interest_stocks:
            self.notifications.append(message)
            print(f"投资者 {self.name} 收到: {message}")
   
    def show_notifications(self):
        """显示所有通知"""
        print(f"\n=== {self.name} 的通知记录 ===")
        for notification in self.notifications:
            print(f"  - {notification}")

def demo_stock_market():
    """股票市场演示"""
    print("\n=== 股票市场监控系统演示 ===\n")
   
    # 创建股票市场
    market = StockMarket()
   
    # 添加股票
    market.add_stock("AAPL", 150.0)
    market.add_stock("GOOGL", 2800.0)
    market.add_stock("TSLA", 250.0)
   
    # 创建投资者
    investor1 = Investor("技术投资者", ["AAPL", "GOOGL"])
    investor2 = Investor("新能源投资者", ["TSLA"])
    investor3 = Investor("全市场投资者")
   
    # 注册投资者
    market.register_observer(investor1)
    market.register_observer(investor2)
    market.register_observer(investor3)
   
    # 模拟股票价格变化
    print("开始模拟股票交易...")
    market.update_stock_price("AAPL", 152.5)   # AAPL 上涨
    market.update_stock_price("TSLA", 245.0)   # TSLA 下跌
    market.update_stock_price("GOOGL", 2820.0) # GOOGL 上涨
    market.update_stock_price("AAPL", 151.0)   # AAPL 小幅下跌
   
    # 显示投资者通知记录
    investor1.show_notifications()
    investor2.show_notifications()
    investor3.show_notifications()

# 运行股票市场演示
demo_stock_market()

观察者模式的优缺点

优点

优点 说明
松耦合 主题和观察者之间依赖关系松散,可以独立变化
可扩展性 可以轻松添加新的观察者,无需修改主题代码
广播通信 支持一对多的通信方式,消息可以广播给多个对象
开放封闭原则 对扩展开放,对修改封闭

缺点

缺点 说明 解决方案
内存泄漏 观察者忘记注销可能导致内存泄漏 使用弱引用或确保及时注销
更新顺序 观察者更新顺序不确定 如果需要顺序,可以添加优先级机制
性能问题 观察者数量多时,通知可能影响性能 使用异步通知或批量更新

实际应用场景

1. GUI 事件处理

实例

# 伪代码示例
button.on_click(handler1)  # 注册点击事件观察者
button.on_click(handler2)  # 注册另一个观察者
# 点击按钮时,所有注册的处理器都会被调用

2. 消息队列系统

  • 生产者发布消息
  • 多个消费者订阅并处理消息

3. 数据监控系统

  • 数据源状态变化
  • 多个监控服务接收通知并采取行动

4. 游戏开发

  • 游戏事件系统
  • UI 元素响应游戏状态变化

最佳实践和注意事项

1. 使用弱引用避免内存泄漏

实例

import weakref

class SafeSubject(Subject):
    def __init__(self):
        self._observers = weakref.WeakSet()  # 使用弱引用集合
   
    def register_observer(self, observer: Observer):
        self._observers.add(observer)

2. 异步通知提高性能

实例

import asyncio

class AsyncObserver(ABC):
    @abstractmethod
    async def update_async(self, message: str):
        pass

class AsyncSubject:
    async def notify_observers_async(self):
        tasks = [observer.update_async(self._message)
                for observer in self._observers]
        await asyncio.gather(*tasks)

3. 添加错误处理

实例

def safe_notify(self):
    for observer in self._observers.copy():  # 使用副本避免修改问题
        try:
            observer.update(self._message)
        except Exception as e:
            print(f"观察者 {observer} 更新失败: {e}")

总结

观察者模式是 Python 中非常重要的设计模式,它提供了一种优雅的方式来实现对象间的松耦合通信。通过本文的学习,你应该能够:

  1. 理解观察者模式的核心概念和适用场景
  2. 掌握观察者模式的基本实现方法
  3. 在实际项目中应用观察者模式解决问题
  4. 避免观察者模式的常见陷阱