프로그래밍/RabbitMQ
RabbitMQ Client를 활용한 Worker Service 예제(Ver. BasicConsume)
흑우마스터
2023. 4. 17. 12:07
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace MyWorkerService
{
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private IConnection _connection;
private IModel _channel;
private EventingBasicConsumer _consumer;
public Worker(ILogger<Worker> logger)
{
_logger = logger;
InitRabbitMQ();
}
private void InitRabbitMQ()
{
var factory = new ConnectionFactory()
{
HostName = "호스트 주소",
Port = 포트번호
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "큐 이름",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
_channel.BasicQos(0, 1, false);
_consumer = new EventingBasicConsumer(_channel);
_consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// Call your REST API here with the received message
// Acknowledge the message
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
//처리가 되어야지만 Ack를 해줌
};
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
_channel.BasicConsume(queue: "큐 이름",
autoAck: false, //수신하자마자 ack 처리 하지 않도록
consumer: _consumer);
while (!stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
await Task.Delay(1000, stoppingToken);
}
}
public override void Dispose()
{
_channel.Close();
_connection.Close();
base.Dispose();
}
}
}
1초에 한번 큐에서 남아있는 큐에서 가져오는 건 동일하지만 BasicConsume을 쓰게 되면 주기는 같아도 가져오는 수량(남아있던 큐 메시지)이 다르고 BasicGet의 경우에는 딱 한 개씩 처리가 끝나는 경우에만 가져온다.
만약 복수의 Consumer 클라이언트가 존재한다면 BasicConsume을 쓰는 개체가 한번에 큐를 가져가기 때문에 BasicGet을 써서 나눠서 가져가는 것이 낫지 않을까 생각