흑우마스터의 마법의 공간

RabbitMQ 시작하기 - Direct Exchange 본문

프로그래밍/RabbitMQ

RabbitMQ 시작하기 - Direct Exchange

흑우마스터 2023. 3. 23. 18:18
 public class Worker : BackgroundService
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;

        public Worker(IRabbitMQConnectionFactory factory)
        {
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();

            // Declare the exchange and queue
            _channel.ExchangeDeclare(exchange: "mydirectexchange", type: "direct");
            _channel.QueueDeclare(queue: "myqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
            
            // Bind the queue to the exchange with a routing key of "consumer1"
            _channel.QueueBind(queue: "myqueue", exchange: "mydirectexchange", routingKey: "consumer1");
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            stoppingToken.ThrowIfCancellationRequested();

            var consumer = new EventingBasicConsumer(_channel);

            consumer.Received += (sender, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);

                Console.WriteLine($"Received message: {message}");

                // Acknowledge the message
                _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            _channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer);

            return Task.CompletedTask;
        }

        public override void Dispose()
        {
            _channel.Close();
            _connection.Close();

            base.Dispose();
        }
    }

해당 샘플을 통해 직접 교환(Direct Type Exchange) 방식에 대해서 알아봅니다.

 

우선 Consumer에 해당하는 .NET Worker Service를 준비합니다. 이름이 mydirectexchange인 Direct Exchange과 이름이 myqueue인 단일 Quere가 있습니다. 또한  consumer1의 라우팅 키를 사용하여 Quere를 Exchange에 바인딩하고 있습니다.

생성자에서 Quere 및 Exchange을 선언하고 지정된 Routing Key를 사용하여 Quere을 Exchange에 바인딩 하는 구조입니다.

 

다만 1편과 달리 ExchangeDeclare가 등장했는데 QuereDeclare처럼 별도의 Exchange를 지정해야 될 때 사용 됩니다. 여기에서는 Direct 타입으로 선언하고 있습니다.

 

ExecuteAsync 메서드에서 들어오는 메시지를 처리하는 RabbitMQ 클라이언트 라이브러리에서 제공하는 클래스인EventingBasicConsumer의 새 인스턴스를 만듭니다.

 

메시지 본문을 문자열로 변환하고 콘솔에 작성하여 들어오는 각 메시지를 처리하는 Received 이벤트 핸들러를 등록하고 있습니다.

 

또한 Channel에서 BasicAck 메서드를 호출하여 각 메시지를 승인합니다.

 

마지막으로 Dispose 메서드에서는 Worker Service가 중지될 때 RabbitMQ에 대한 채널과 연결을 닫습니다.

 

이 구현 샘플에서는 Routing Key가 consumer1myqueue 대기열에서 수신하는 Consumer 인스턴스가 하나만 있다고 가정합니다. Routing Key가 서로 다른 Comsumer 인스턴스가 여러 개 있는 경우 각 Routing Key에 대해 별도의 Quere 및 이벤트 핸들러를 생성해야 합니다.

 

 

이제 Producer에 해당하는 코드를 살펴봅시다. 이번에는 ASP.NET Core REST API 형태로 준비했습니다

 

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;

namespace MyApi.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class MessagesController : ControllerBase
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;

        public MessagesController(IConfiguration configuration, IRabbitMQConnectionFactory factory)
        {
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();

            // Declare the exchange
            _channel.ExchangeDeclare(exchange: "mydirectexchange", type: "direct");
        }

        [HttpPost]
        public IActionResult Post([FromBody] MessageRequest request)
        {
            // Get the routing key from the request
            var routingKey = request.RoutingKey;

            // Create the message body as a byte array
            var messageBody = Encoding.UTF8.GetBytes(request.Message);

            // Publish the message to the exchange with the specified routing key
            _channel.BasicPublish(exchange: "mydirectexchange", routingKey: routingKey, basicProperties: null, body: messageBody);

            return Ok();
        }

        public override void Dispose()
        {
            _channel.Close();
            _connection.Close();

            base.Dispose();
        }
    }

    public class MessageRequest
    {
        public string RoutingKey { get; set; }
        public string Message { get; set; }
    }
}

 

코드에서는 Worker Service 샘플 코드와 같은 이름의 Exchange인 mydirectexchange를 사용하고 있습니다.  또한 생성자에 IRabbitMQConnectionFactory 인스턴스를 주입하여 RabbitMQ에 대한 연결 및 채널을 생성합니다.

 

해당 샘플은 Controller에서 Post 메서드를 구현하고 있는데 API를 호출하는 곳에서 Routing Key와 본문이 포함 된 MessageRequest 객체를 수신합니다.

 

그 다음 지정 된 Routing Key를 사용하여 Exchange에 메시지를 게시합니다. 미리 정의 된 여러 개의 Routing Key가 존재하는 경우 적절한 분기문을 통해 라우팅을 정의할 수 있도록 하는 샘플입니다.

 

API에 호출 할 때 Worker Service가 인지 할 수 있도록 Consumer1를 Routing Key로 잡고 메시지를 보내면 해당 메시지와 함께 mydirectexchange로 메시지를 보내고 exchange에서는 Consumer1을 Routing Key로 잡고 있는 Worker Service에게 메시지를 전달합니다. 

 

다만 이 경우 Routing Key가 노출 되어 원치 않는 동작을 하게 될 수 있기 때문에 관련 내용에 대해 보안 설정할 필요가 있습니다.