Net 手写 事件总线 发布订阅消息
前言
今晚打老虎
事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦
的目的。(项目的解耦很重要)
参考链接:https://blog.csdn.net/ZhaoHuaQiao_FL/article/details/118733737
第一步: 定义一个事件总线扩展类 EventBusExtensions
AppDomain.CurrentDomain.GetAssemblies().SelectMany(a => a.GetTypes()).ToList()
首先定义一个事件总线扩展类EventBusExtensions
通常获取获取AppDomain中所有的程序集
public static class EventBusExtensions
{
public static IServiceCollection AddSimpleEventBus(this IServiceCollection services)
{
var typeMethods = AppDomain.CurrentDomain.GetAssemblies().SelectMany(a => a.GetTypes()).ToList()
.Where(m => m.IsClass && !m.IsAbstract && !m.IsInterface && typeof(ISubscribeHandler).IsAssignableFrom(m))
.SelectMany(n => n.GetMethods(BindingFlags.Public | BindingFlags.Instance)
.Where(x => x.IsDefined(typeof(SubscribeMessageAttribute), false)
&& x.GetParameters().Length == 2
&& x.GetParameters()[0].ParameterType == typeof(string)
&& x.GetParameters()[1].ParameterType == typeof(object))
.GroupBy(m => m.DeclaringType));
if (!typeMethods.Any()) return services;
foreach (var item in typeMethods)
{
if (!item.Any()) continue;
var typeInstance = Activator.CreateInstance(item.Key);
foreach (var method in item)
{
var isAsyncMethod = false;
var action = Delegate.CreateDelegate(isAsyncMethod ? typeof(Func<string, object, Task>) : typeof(Action<string, object>), typeInstance, method.Name);
var subscribeMessageAttributes = method.GetCustomAttributes<SubscribeMessageAttribute>();
foreach (var attr in subscribeMessageAttributes)
{
if (string.IsNullOrWhiteSpace(attr.MessageId)) continue;
if (isAsyncMethod)
{
InternalMessageCenter.Instance.Subscribe(item.Key, attr.MessageId, (Func<string, object, Task>)action);
}
else
{
InternalMessageCenter.Instance.Subscribe(item.Key, attr.MessageId, (Action<string, object>)action);
}
}
}
}
return services;
}
}
第二步 :定义发布接口与订阅接口的中间类 InternalMessageCenter
然后在写一个定义发布接口与订阅接口的中间类 InternalMessageCenter
internal sealed class InternalMessageCenter
{
private readonly ConcurrentDictionary<string, Func<string, object, Task>[]> MessageHandlerQueues = new ConcurrentDictionary<string, Func<string, object, Task>[]>();
private readonly ConcurrentBag<string> TypeMessageIdsRegisterTable = new ConcurrentBag<string>();
private InternalMessageCenter()
{
}
private static readonly Lazy<InternalMessageCenter> lazy = new Lazy<InternalMessageCenter>(() => new InternalMessageCenter());
private static readonly Lazy<InternalMessageCenter> InstanceLock = lazy;
internal static InternalMessageCenter Instance => InstanceLock.Value;
internal void Subscribe<T>(string messageId, params Action<string, object>[] messageHandlers)
{
Subscribe(typeof(T), messageId, messageHandlers);
}
internal void Subscribe<T>(string messageId, params Func<string, object, Task>[] messageHandlers)
{
Subscribe(typeof(T), messageId, messageHandlers);
}
internal void Unsubscribe(string messageId)
{
_ = MessageHandlerQueues.TryRemove(messageId, out _);
}
internal void Send(string messageId, object payload = null, bool isSync = false)
{
if (MessageHandlerQueues.TryGetValue(messageId, out var messageHandlers))
{
foreach (var eventHandler in messageHandlers)
{
if (isSync)
{
eventHandler(messageId, payload).GetAwaiter().GetResult();
}
else
{
eventHandler(messageId, payload);
}
}
}
}
internal void Subscribe(Type t, string messageId, params Action<string, object>[] messageHandlers)
{
if (messageHandlers == null || messageHandlers.Length == 0) return;
var handlers = messageHandlers.Select(u =>
{
Func<string, object, Task> handler = async (m, o) =>
{
u(m, o);
await Task.CompletedTask;
};
return handler;
});
Subscribe(t, messageId, handlers.ToArray());
}
internal void Subscribe(Type t, string messageId, params Func<string, object, Task>[] messageHandlers)
{
if (messageHandlers == null || messageHandlers.Length == 0) return;
var uniqueMessageId = $"{t.FullName}_{messageId}";
if (!TypeMessageIdsRegisterTable.Contains(uniqueMessageId))
{
TypeMessageIdsRegisterTable.Add(uniqueMessageId);
}
var isAdded = MessageHandlerQueues.TryAdd(messageId, messageHandlers);
if (!isAdded)
{
MessageHandlerQueues[messageId] = MessageHandlerQueues[messageId].Concat(messageHandlers).ToArray();
}
}
}
第三步:定义消息订阅特性
定义一个消息订阅特性SubscribeMessageAttribute
,用于指定此方法已经订阅,上面的扩展类中拿到此特性进行处理了的
[AttributeUsage(AttributeTargets.Method, AllowMultiple = true)]
public class SubscribeMessageAttribute : Attribute
{
public SubscribeMessageAttribute(string messageId)
{
MessageId = messageId;
}
public string MessageId { get; set; }
}
第五步:定义一个handler接口
最后在定义一个handler接口,里面什么都不用写,用于标注此类是被订阅的类
public interface ISubscribeHandler
{
}
第六步:定义消息发布
目前是静态的,不太喜欢注入的方式
public static class MessageCenter
{
public static void Subscribe<T>(string messageId, params Action<string, object>[] messageHandlers)
{
InternalMessageCenter.Instance.Subscribe<T>(messageId, messageHandlers);
}
public static void Subscribe<T>(string messageId, params Func<string, object, Task>[] messageHandlers)
{
InternalMessageCenter.Instance.Subscribe<T>(messageId, messageHandlers);
}
public static void Send(string messageId, object payload = null, bool isSync = false)
{
InternalMessageCenter.Instance.Send(messageId, payload, isSync);
}
public static void Unsubscribe(string messageId)
{
InternalMessageCenter.Instance.Unsubscribe(messageId);
}
}
订阅消息
public class LogSubscribeHandler : ISubscribeHandler
{
[SubscribeMessage("update:logininfo")]
public void UpdateUserLoginInfo(string eventId, object payload)
{
}
[SubscribeMessage("create:oplog")]
public void CreateOpLog(string eventId, object payload)
{
Console.WriteLine(payload.ToString());
}
[SubscribeMessage("create:exlog")]
public void CreateExLog(string eventId, object payload)
{
}
}
最后注册服务
public void ConfigureServices(IServiceCollection services)
{
services.AddSimpleEventBus();
}
测试


MessageCenter.Send("create:oplog","hello word!");
https://gitee.com/Pridejoy/Bing