using Admin.Core.Common; using Autofac; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Polly; using Polly.Retry; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using System; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; namespace Admin.Core.EventBus { /// /// 基于RabbitMQ的事件总线 /// public class EventBusRabbitMQ : IEventBus, IDisposable { const string BROKER_NAME = "Admincore_event_bus"; private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly ILifetimeScope _autofac; private readonly string AUTOFAC_SCOPE_NAME = "Admincore_event_bus"; private readonly int _retryCount; private IModel _consumerChannel; private string _queueName; /// /// RabbitMQ事件总线 /// /// RabbitMQ持久连接 /// 日志 /// autofac容器 /// 事件总线订阅管理器 /// 队列名称 /// 重试次数 public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _queueName = queueName; _consumerChannel = CreateConsumerChannel(); _autofac = autofac; _retryCount = retryCount; _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; } /// /// 订阅管理器事件 /// /// /// private void SubsManager_OnEventRemoved(object sender, string eventName) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); if (_subsManager.IsEmpty) { _queueName = string.Empty; _consumerChannel.Close(); } } } /// /// 发布 /// /// 事件模型 public void Publish(IntegrationEvent @event) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var policy = RetryPolicy.Handle() .Or() .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message); }); var eventName = @event.GetType().Name; _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName); using (var channel = _persistentConnection.CreateModel()) { _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); policy.Execute(() => { var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; // persistent _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id); channel.BasicPublish( exchange: BROKER_NAME, routingKey: eventName, mandatory: true, basicProperties: properties, body: body); }); } } /// /// 订阅 /// 动态 /// /// 事件处理器 /// 事件名 public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); DoInternalSubscription(eventName); _subsManager.AddDynamicSubscription(eventName); StartBasicConsume(); } /// /// 订阅 /// /// 约束:事件模型 /// 约束:事件处理器<事件模型> public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = _subsManager.GetEventKey(); DoInternalSubscription(eventName); _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); _subsManager.AddSubscription(); StartBasicConsume(); } private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } } /// /// 取消订阅 /// /// /// public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = _subsManager.GetEventKey(); _logger.LogInformation("Unsubscribing from event {EventName}", eventName); _subsManager.RemoveSubscription(); } public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { _subsManager.RemoveDynamicSubscription(eventName); } public void Dispose() { if (_consumerChannel != null) { _consumerChannel.Dispose(); } _subsManager.Clear(); } /// /// 开始基本消费 /// private void StartBasicConsume() { _logger.LogTrace("Starting RabbitMQ basic consume"); if (_consumerChannel != null) { var consumer = new AsyncEventingBasicConsumer(_consumerChannel); consumer.Received += Consumer_Received; _consumerChannel.BasicConsume( queue: _queueName, autoAck: false, consumer: consumer); } else { _logger.LogError("StartBasicConsume can't call on _consumerChannel == null"); } } /// /// 消费者接受到 /// /// /// /// private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs) { var eventName = eventArgs.RoutingKey; var message = Encoding.UTF8.GetString(eventArgs.Body.Span); try { if (message.ToLowerInvariant().Contains("throw-fake-exception")) { throw new InvalidOperationException($"Fake exception requested: \"{message}\""); } await ProcessEvent(eventName, message); } catch (Exception ex) { _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message); } // Even on exception we take the message off the queue. // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX). // For more information see: https://www.rabbitmq.com/dlx.html _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); } /// /// 创造消费通道 /// /// private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } _logger.LogTrace("Creating RabbitMQ consumer channel"); var channel = _persistentConnection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.CallbackException += (sender, ea) => { _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel"); _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); StartBasicConsume(); }; return channel; } private async Task ProcessEvent(string eventName, string message) { _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName); if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; if (handler == null) continue; dynamic eventData = JObject.Parse(message); await Task.Yield(); await handler.Handle(eventData); } else { var handler = scope.ResolveOptional(subscription.HandlerType); if (handler == null) continue; var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await Task.Yield(); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } } else { _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName); } } } }