事件总线抽象接口
发布订阅接口
public interface IEventBus
{
void Publish<T>(string topic, T data);
Task PublishAsync<T>(string topic, T data);
void Subscribe(params string[] topics);
}
统一调用事件处理
public interface ICallEventHandler
{
Task Handle(string topic, byte[] value);
}
事件处理
泛型事件处理
动态类型事件处理
事件基类
订阅特性
订阅管理
基于AspNetCore Channel 实现进程内事件总线
- 安装
Install-Package IEventBus.AspNetCore
dotnet add package IEventBus.AspNetCore
- 注册服务
//version 1.0.0
builder.Services.AddAspNetCoreEventBus();
//version 2.0.0
builder.Services.AddEventBus
(
options => { options.UseAspNetCore(); }
);
- 配置管道
app.UseEventBus();
- 定义事件类
//如果事件处理类继承IntegrationEventHandler<> 需要继承IntegrationEvent
public class Test
{
public string Msg { get; set; }
}
- 定义事件处理类重写HandleAsync
//订阅主题
[Subscribe("Test")]
public class TestHandler : IntegrationEventHandler<Test>//Or DynamicIntegrationEventHandler
{
public override async Task HandleAsync(string topic, Test? value)
{
Console.WriteLine(value?.Msg);
await Task.Yield();
}
}
- 生产者通过IEventBus.PublishAsync 生产数据
[Route("api/[controller]")]
[ApiController]
public class EventBusController : ControllerBase
{
private readonly IEventBus _eventBus;
public EventBusController(IEventBus eventBus)
{
_eventBus = eventBus;
}
[HttpGet]
public async Task Test()
{
await _eventBus.PublishAsync("mc", new Test { Msg = "zxc" });
}
}
基于 Confluent.Kafka 实现事件总线
- 安装
//version 1.0.0
Install-Package Confluent.Kafka.EventBus.AspNetCore
//version 2.0.0
Install-Package IEventBus.Confluent.Kafka
//version 1.0.0
dotnet add package Confluent.Kafka.EventBus.AspNetCore
//version 2.0.0
dotnet add package IEventBus.Confluent.Kafka
- 注册服务
//version 1.0.0
builder.Services.AddConfluentKafkaEventBus();
//version 2.0.0
builder.Services.AddEventBus
(
options => { options.UseKafka(builder.Configuration); }
);
- 配置管道
app.UseEventBus();
- 定义事件类
//如果事件处理类继承IntegrationEventHandler<> 需要继承IntegrationEvent
public class Test
{
public string Msg { get; set; }
}
- 定义事件处理类重写HandleAsync
//订阅主题
[Subscribe("Test")]
public class TestHandler : IntegrationEventHandler<Test>//Or DynamicIntegrationEventHandler
{
public override async Task HandleAsync(string topic, Test? value)
{
Console.WriteLine(value?.Msg);
await Task.Yield();
}
}
- 生产者通过IEventBus.PublishAsync 生产数据
[Route("api/[controller]")]
[ApiController]
public class EventBusController : ControllerBase
{
private readonly IEventBus _eventBus;
public EventBusController(IEventBus eventBus)
{
_eventBus = eventBus;
}
[HttpGet]
public async Task Test()
{
await _eventBus.PublishAsync("mc", new Test { Msg = "zxc" });
}
}