Skip to content

feat(os/gevent): Add gevent package for event-driven architecture support #4365

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

LanceAdd
Copy link
Contributor

实现了GoFrame框架的事件总线模块(gevent),提供了一种解耦组件间通信的机制。该模块支持事件优先级、并行处理、错误处理策略等。

主要特性

  1. 事件总线(Event Bus):

    • 实现了顺序事件总线(SeqEventBus)
    • 支持多个订阅者订阅同一主题
    • 提供默认事件总线实例
  2. 事件(Event):

    • 基础事件接口和实现
    • 支持自定义事件工厂函数
    • 包含主题、数据、错误处理模式和执行模式等属性
  3. 优先级处理:

    • 5个优先级级别(PriorityImmediate, PriorityUrgent, PriorityHigh, PriorityNormal, PriorityLow)
    • 相同优先级按订阅顺序处理
  4. 执行模式:

    • 顺序执行(Seq)
    • 并行执行(Parallel)
  5. 错误处理:

    • Stop模式: 出错时停止后续处理
    • Ignore模式: 忽略错误继续处理
  6. 订阅管理:

    • 支持订阅和取消订阅
    • 自动清理无订阅者的主题
  7. 其他特性:

    • 支持自定义事件工厂函数
    • 线程安全
    • 优雅关闭机制

使用示例

基础用法

 可以使用DefaultEventBusy也可以使用gevent.New()创建自己的EventBus
// 订阅事件
subscriber, err := gevent.DefaultEventBus.Subscribe("user.registered", func(e gevent.Event) error {
    fmt.Println("处理用户注册事件:", e.GetData())
    return nil
}, nil, nil)

// 发布事件
params := map[string]any{"userId": 12345, "username": "john"}
ok, err := gevent.DefaultEventBus.Publish("user.registered", params, gevent.Ignore, gevent.Seq)

并行处理

// 订阅多个处理器
gevent.DefaultEventBus.Subscribe("order.created", func(e gevent.Event) error {
    // 发送邮件
    return nil
}, nil, nil)

gevent.DefaultEventBus.Subscribe("order.created", func(e gevent.Event) error {
    // 更新库存
    return nil
}, nil, nil)

// 并行执行
params := map[string]any{"orderId": "ORD-12345"}
gevent.DefaultEventBus.Publish("order.created", params, gevent.Ignore, gevent.Parallel)

优先级处理

// 高优先级处理器
gevent.DefaultEventBus.Subscribe("payment.success", handler1, nil, nil, gevent.PriorityHigh)

// 低优先级处理器
gevent.DefaultEventBus.Subscribe("payment.success", handler2, nil, nil, gevent.PriorityLow)

- 新增 gevent 包,提供事件总线功能
- 实现了事件的发布、订阅、取消订阅等基本功能
- 支持事件的顺序和并行执行模式
- 提供自定义事件工厂函数支持
- 增加错误处理机制,支持停止和忽略错误模式
- 添加了完整的单元测试用例验证功能正确性
- 调整了 handlerProcessor 中信号处理的顺序,确保 wg.Done() 在所有处理完成后执行
- 优化了 topicProcessor 的初始化逻辑,将 asyncProcess 的启动移至初始化之后
- 调整了 handlerProcessor 中信号处理的顺序,确保 wg.Done() 在所有处理完成后执行
- 优化了 topicProcessor 的初始化逻辑,将 asyncProcess 的启动移至初始化之后
- 为 SeqEventBusOption 结构体添加 CloneEvent 字段,用于控制是否克隆事件
- 实现 cloneEvent 方法,根据配置决定是否克隆事件- 修改事件处理流程,支持事件克隆功能
- 调整 DefaultEventBus 的 QueueSize从 1000 改为 100
- 添加 TopicSize 方法,用于获取主题数量
- 添加 Topics 方法,用于获取所有主题列表- 添加 ContainsTopic 方法,用于检查主题是否存在
- 添加 TopicSubscriberSize 方法,用于获取特定主题的订阅者数量
- 新增 asyncProcessAll 方法,用于处理所有事件后关闭通道
- 添加 quitChan通道,用于通知事件总线关闭
- 重构 asyncProcess 方法,支持根据配置选择不同的处理策略- 增加 WaitExit配置项,决定是否等待所有事件处理完成再退出
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant