일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- 구글지도
- ABLY
- 자마린
- SecureStorage
- 안드로이드
- hot reload
- iOS노치
- ASP.NET Web API
- c#
- 배포
- 망할
- 닷넷
- Cloudflare
- xcode13
- 구글맵
- 개발
- n8n
- 지도
- 흑우마스터
- AWS
- MAUI
- Xamarin Forms
- Xamarin
- 프로그래밍
- aws lambda
- vpc
- 비주얼스튜디오2022
- .net maui
- Android
- v
- Today
- Total
흑우마스터의 마법의 공간
RabbitMQ 시작하기 - Direct Exchange 본문
public class Worker : BackgroundService
{
private readonly IConnection _connection;
private readonly IModel _channel;
public Worker(IRabbitMQConnectionFactory factory)
{
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
// Declare the exchange and queue
_channel.ExchangeDeclare(exchange: "mydirectexchange", type: "direct");
_channel.QueueDeclare(queue: "myqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
// Bind the queue to the exchange with a routing key of "consumer1"
_channel.QueueBind(queue: "myqueue", exchange: "mydirectexchange", routingKey: "consumer1");
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received message: {message}");
// Acknowledge the message
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
_channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer);
return Task.CompletedTask;
}
public override void Dispose()
{
_channel.Close();
_connection.Close();
base.Dispose();
}
}
해당 샘플을 통해 직접 교환(Direct Type Exchange) 방식에 대해서 알아봅니다.
우선 Consumer에 해당하는 .NET Worker Service를 준비합니다. 이름이 mydirectexchange인 Direct Exchange과 이름이 myqueue인 단일 Quere가 있습니다. 또한 consumer1의 라우팅 키를 사용하여 Quere를 Exchange에 바인딩하고 있습니다.
생성자에서 Quere 및 Exchange을 선언하고 지정된 Routing Key를 사용하여 Quere을 Exchange에 바인딩 하는 구조입니다.
다만 1편과 달리 ExchangeDeclare가 등장했는데 QuereDeclare처럼 별도의 Exchange를 지정해야 될 때 사용 됩니다. 여기에서는 Direct 타입으로 선언하고 있습니다.
ExecuteAsync 메서드에서 들어오는 메시지를 처리하는 RabbitMQ 클라이언트 라이브러리에서 제공하는 클래스인EventingBasicConsumer의 새 인스턴스를 만듭니다.
메시지 본문을 문자열로 변환하고 콘솔에 작성하여 들어오는 각 메시지를 처리하는 Received 이벤트 핸들러를 등록하고 있습니다.
또한 Channel에서 BasicAck 메서드를 호출하여 각 메시지를 승인합니다.
마지막으로 Dispose 메서드에서는 Worker Service가 중지될 때 RabbitMQ에 대한 채널과 연결을 닫습니다.
이 구현 샘플에서는 Routing Key가 consumer1인 myqueue 대기열에서 수신하는 Consumer 인스턴스가 하나만 있다고 가정합니다. Routing Key가 서로 다른 Comsumer 인스턴스가 여러 개 있는 경우 각 Routing Key에 대해 별도의 Quere 및 이벤트 핸들러를 생성해야 합니다.
이제 Producer에 해당하는 코드를 살펴봅시다. 이번에는 ASP.NET Core REST API 형태로 준비했습니다
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;
namespace MyApi.Controllers
{
[ApiController]
[Route("[controller]")]
public class MessagesController : ControllerBase
{
private readonly IConnection _connection;
private readonly IModel _channel;
public MessagesController(IConfiguration configuration, IRabbitMQConnectionFactory factory)
{
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
// Declare the exchange
_channel.ExchangeDeclare(exchange: "mydirectexchange", type: "direct");
}
[HttpPost]
public IActionResult Post([FromBody] MessageRequest request)
{
// Get the routing key from the request
var routingKey = request.RoutingKey;
// Create the message body as a byte array
var messageBody = Encoding.UTF8.GetBytes(request.Message);
// Publish the message to the exchange with the specified routing key
_channel.BasicPublish(exchange: "mydirectexchange", routingKey: routingKey, basicProperties: null, body: messageBody);
return Ok();
}
public override void Dispose()
{
_channel.Close();
_connection.Close();
base.Dispose();
}
}
public class MessageRequest
{
public string RoutingKey { get; set; }
public string Message { get; set; }
}
}
코드에서는 Worker Service 샘플 코드와 같은 이름의 Exchange인 mydirectexchange를 사용하고 있습니다. 또한 생성자에 IRabbitMQConnectionFactory 인스턴스를 주입하여 RabbitMQ에 대한 연결 및 채널을 생성합니다.
해당 샘플은 Controller에서 Post 메서드를 구현하고 있는데 API를 호출하는 곳에서 Routing Key와 본문이 포함 된 MessageRequest 객체를 수신합니다.
그 다음 지정 된 Routing Key를 사용하여 Exchange에 메시지를 게시합니다. 미리 정의 된 여러 개의 Routing Key가 존재하는 경우 적절한 분기문을 통해 라우팅을 정의할 수 있도록 하는 샘플입니다.
API에 호출 할 때 Worker Service가 인지 할 수 있도록 Consumer1를 Routing Key로 잡고 메시지를 보내면 해당 메시지와 함께 mydirectexchange로 메시지를 보내고 exchange에서는 Consumer1을 Routing Key로 잡고 있는 Worker Service에게 메시지를 전달합니다.
다만 이 경우 Routing Key가 노출 되어 원치 않는 동작을 하게 될 수 있기 때문에 관련 내용에 대해 보안 설정할 필요가 있습니다.
'프로그래밍 > RabbitMQ' 카테고리의 다른 글
RabbitMQ Client를 활용한 Worker Service 예제(Ver. BasicConsume) (0) | 2023.04.17 |
---|---|
Consumer에 BasicConsume, BasicGet 차이 (0) | 2023.04.17 |
RabbitMQ 시작하기 (0) | 2023.03.23 |