흑우마스터의 마법의 공간

RabbitMQ 시작하기 본문

프로그래밍/RabbitMQ

RabbitMQ 시작하기

흑우마스터 2023. 3. 23. 16:30

RabbitMQ는 응용 프로그램이 메시지 큐를 통해 서로 통신할 수 있도록 하는 메시징 브로커입니다.

AMQP(Advanced Message Queuing Protocol)를 사용하여 애플리케이션 간에 메시지를 전송합니다.

RabbitMQ에서 메시지는 임시 저장 위치인 대기열로 전송됩니다.

그런 다음 소비자는 메시지를 처리할 준비가 되면 대기열에서 메시지를 검색할 수 있습니다.

이를 통해 애플리케이션의 분리가 가능하고 애플리케이션 간의 비동기 통신이 가능합니다.

대기열 외에도 RabbitMQ는 교환도 사용합니다.

Exchange는 생산자로부터 메시지를 받은 다음 일련의 라우팅 규칙에 따라 메시지를 하나 이상의 대기열로 라우팅합니다.

RabbitMQ에는 4가지 유형의 교환이 있습니다.

  1. 직접 교환: 메시지는 특정 라우팅 키를 기반으로 대기열로 라우팅됩니다.
  2. 주제 교환: 메시지는 라우팅 키와 일치하는 패턴을 기반으로 하나 이상의 대기열로 라우팅됩니다.
  3. 팬아웃 교환: 메시지는 교환에 연결된 모든 대기열로 라우팅됩니다.
  4. 헤더 교환: 메시지는 메시지 헤더 값에 따라 하나 이상의 대기열로 라우팅됩니다.

전반적으로 RabbitMQ는 애플리케이션 간에 안정적인 통신을 가능하게 하는 데 사용할 수 있는 강력한 메시징 기술입니다.

대기열과 교환을 사용하여 분산 시스템 구축을 위한 유연하고 확장 가능한 솔루션을 제공합니다.

 

.NET Core에서 구현하기 위해서는 RabbitMQ.Client 를 Nuget 등을 통해 추가하고 다음과 같이 접속 정보을 위한 기본 연결 코드를 작성해야 합니다.

var factory = new ConnectionFactory() 
{ 
    HostName = "도메인명", 
    Port = 포트번호, 
    UserName = "계정명", 
    Password = "패스워드" 
}; 

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

 

메시지를 생성하고 큐에 전달하는 Producer(프로듀서)는 다음과 같은 코드를 작성할 수 있습니다 

여기에서는 Fanout을 다루겠습니다

channel.QueueDeclare(queue: "myqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);

string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "", routingKey: "myqueue", basicProperties: null, body: body);

Console.WriteLine("Sent message: {0}", message);

 

myquere 라는 큐 이름 정의하고 BasicPublish를 통해 메시지를 RabbitMQ 서버로 보내 큐에 적재합니다. 

 

우선 QuereDeclare에 대해 다뤄보겠습니다. 아래와 같은 인자를 보내 큐를 세팅할 수 있습니다.

 

  • queue: 선언할 큐의 이름.
  • durable: 대기열이 브로커 재시작 후에도 유지되어야 하는지 여부. 이 경우 브로커가 다시 시작되면 큐가 삭제된다는 의미인 'false'로 설정했습니다.
  • exclusive: 대기열이 현재 연결에 독점적이어야 하는지 여부. 여러 연결이 대기열에 액세스할 수 있음을 의미하는 false로 설정했습니다.
  • autoDelete: 마지막 소비자가 구독을 취소할 때 대기열을 삭제할지 여부. 'false'로 설정했는데, 이는 소비자가 없더라도 대기열이 남아 있음을 의미합니다.
  • arguments: 대기열을 선언하기 위한 선택적 인수. 이 경우 기본 인수를 사용한다는 의미인 null로 설정했습니다.

 

그리고 BasicPublish에 대해서도 다뤄보겠습니다.

 

string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "", routingKey: "myqueue", basicProperties: null, body: body);

 

  • exchange: 메시지를 게시할 교환의 이름입니다. 이 경우 빈 문자열을 사용하여 메시지를 "myqueue" 대기열로 직접 라우팅하는 기본 교환을 나타냅니다.
  • routingKey: 메시지를 게시할 때 사용할 라우팅 키입니다. 이 경우 게시하려는 대기열의 이름과 일치하는 라우팅 키로 "myqueue"를 사용하고 있습니다.
  • basicProperties: 선택적 메시지 속성입니다. 이 경우 기본 속성을 사용한다는 의미인 null로 설정합니다.
  • body: 바이트 배열로 인코딩된 메시지 본문입니다. 이 경우 "Hello, RabbitMQ!" UTF-8 바이트 배열로 메시지.

 

QuereDeclare를 통해 큐를 정의하고 BasicPublish를 통해 메시지를 발송 하는 것까지 완료 하였습니다. 이제 수신하는 Consumer에 대해 다뤄보겠습니다.

 

channel.QueueDeclare(queue: "myqueue", durable: false, exclusive: false, autoDelete: false, arguments: null);

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine("Received message: {0}", message);
};

channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer);

 

동일한 방법으로 큐를 정의하고 직접 메시지를 처리할 수 있는 기본 이벤트 객체 EventingBasicConsumer 인스턴스를 만듭니다.  인스턴스 consumer는 이벤트를 수신할 수 있는 핸들러이며 메시지가 대기열에 도착할 때마다 트리거 됩니다

 

이벤트 핸들러의 내부 코드에 ea 객체를 통해 메시지 본문을 추출하여 콘솔에 쓸 수 있습니다.

 

여기까지는 Consume가 메시지를 수신 할 수 있도록 하는 정의 단계였고 이제 그 큐에서 메시지에 대한 핸들러가 동작할 수 있도록 하기 위해 BasicConsume 메서드를 작성해줍니다.

 

autoAck가 true로 정의되어 있기 때문에 메시지가 수신 될 때마다 위에서 작성 된 Received 이벤트로 메시지를 받을 수 있습니다.

channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer);

 

대시보드에서 확인하면 아래와 같이 확인할 수 있으며 각 항목에 대해서도 알려 드립니다

  • Ready :  현재 소비자가 대기열에서 검색할 수 있는 메시지 수를 나타냅니다. 소비자에게 아직 전달되지 않았거나 소비자가 확인하지 않았기 때문에 대기열로 반환된 메시지입니다.
  • Unacked :  unacknowledged의 줄임말로서 소비자에게 전달되었지만 아직 확인되지 않은 메시지 수를 나타냅니다. 소비자가 메시지를 받으면 메시지를 성공적으로 처리했음을 나타내는 확인 메시지를 브로커에 다시 보내야 합니다. 일정 시간 내에 승인을 받지 못하면 메시지가 다른 소비자에게 다시 전달됩니다.
  • Total  : ReadyUnacked 메시지를 모두 포함하여 큐에 있는 총 메시지 수를 나타냅니다.