들어가기
GitHub - AmorGakCo/Backend: 모두 모여 각자 코딩 플랫폼 [Backend]
모두 모여 각자 코딩 플랫폼 [Backend]. Contribute to AmorGakCo/Backend development by creating an account on GitHub.
github.com
모각코 모집 플랫폼을 개발하면서 FCM을 이용한 WebPush와 SMS 알림 발송을 구현했습니다.
알림전송의 딜레이를 줄이기 위해 비동기 프로세스가 필요했고 안전한 비동기 프로세스 구축을 위해 RabbitMQ를 선택했습니다.
이 글에서는 알림 발송에 실패한 경우 재시도 전략과 데드레터 처리 과정을 정리해보려 합니다.
AMQP
RabbitMQ의 특징으로는 가장 먼저 AMQP기반의 메세징 미들웨어라고 할 수 있습니다.
AMQP는 시스템이 효율적으로 메세지를 주고 받기 위한 메세징 프로토콜입니다. HTTP가 웹 통신 규약이듯 AMQP는 메세지 통신 규약입니다.
그렇기 때문에 HTTP와 마찬가지로 정해진 규격이 존재하고 그 규격에 맞춰 통신하게 됩니다.
또한 RabbitMQ는 구성 요소로 Exchange, Binding, Queue, Routing, Publisher, Consumer 이 다섯가지가 가장 중요합니다.
해당 개념들은
AMQP 0-9-1 Model Explained | RabbitMQ
<!--
www.rabbitmq.com
이 곳에 아주 잘 정리돼 있습니다.
서버 구조
우선 GCP에 임시로 배포한 서버 구조부터 살펴 보겠습니다.
메인 서버에서 각종 알림 요청을 받고 RabbitMQ에 메세지를 생산하고 Notification Server에서 소비하는 전형적인 생산자/소비자 구조입니다.
RabbitMQ 구성
RabbitMQ의 Exchange, Queue, Binding 구조를 그림으로 살펴보겠습니다.
전체적인 구조는 위와 같습니다.
1. Main Server의 Producer가 Notification Exchange에 Routing Key를 지정해 메세지를 발행, Notification Exchange는 Direct Exchange 타입으로 FCM Queue에 라우팅
2. Consumer는 메세지를 소비합니다.
3. 예외 상황이 발생하면 메세지를 거부합니다.
4. 거부된 메세지는 FCMQueue의 DLX(Dead Letter Exchange)를통해 FCM Dead Letter Queue에 라우팅됩니다.
5. FCM Dead Letter Queue 에서는 재시도 회수를 체크하여 임계값을 넘었다면 Slack에 알림을 주고 임계값을 넘지 않았다면 FCM Dealy Queue에 전달합니다.
6. FCM Delay Queue에서는 정해진 TTL만큼 기다린뒤에 TTL이 만료되면 FCM Delay Queue의 데드레터 큐인 FCM Queue에 전달돼 재시도를 수행합니다.
Producer 설정 (Spring AMQP)
먼저 Exchange, Queue, Binding 설정부터 살펴보겠습니다.
@EnableRabbit
@Configuration
@RequiredArgsConstructor
@Profile("!test")
public class RabbitMQConfig {
private final RabbitMQProperties properties;
private static final int FCM_DELAY_TTL = 1000;
@Bean
public RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) {
final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.declareQueue(fcmQueue());
rabbitAdmin.declareQueue(fcmDelayQueue());
rabbitAdmin.declareQueue(fcmDeadLetterQueue());
rabbitAdmin.declareExchange(notificationExchange());
rabbitAdmin.declareExchange(delayExchange());
rabbitAdmin.declareExchange(deadLetterExchange());
rabbitAdmin.declareBinding(bindingFcmQueue());
rabbitAdmin.declareBinding(bindingFcmDeadLetterQueue());
rabbitAdmin.declareBinding(bindingFcmDelayQueue());
return rabbitAdmin;
}
@Bean
public Queue fcmQueue() {
return QueueBuilder.durable(QueueName.FCM_QUEUE.getName())
.deadLetterRoutingKey(RoutingKey.NOTIFICATION_FCM_DEAD_LETTER.getKey())
.deadLetterExchange(ExchangeName.NOTIFICATION_DEAD_LETTER.getName())
.build();
}
@Bean
public Queue fcmDelayQueue(){
return QueueBuilder.durable(QueueName.FCM_DELAY_QUEUE.getName())
.deadLetterExchange(ExchangeName.NOTIFICATION.getName())
.deadLetterRoutingKey(RoutingKey.NOTIFICATION_FCM.getKey())
.ttl(FCM_DELAY_TTL)
.build();
}
@Bean
public Queue fcmDeadLetterQueue(){
return new Queue(QueueName.FCM_DEAD_LETTER_QUEUE.getName(),true);
}
@Bean
public DirectExchange notificationExchange() {
return new DirectExchange(ExchangeName.NOTIFICATION.getName());
}
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(ExchangeName.NOTIFICATION_DELAY.getName());
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(ExchangeName.NOTIFICATION_DEAD_LETTER.getName());
}
@Bean
public Binding bindingFcmQueue() {
return BindingBuilder.bind(fcmQueue())
.to(notificationExchange())
.with(RoutingKey.NOTIFICATION_FCM.getKey());
}
@Bean
public Binding bindingFcmDelayQueue() {
return BindingBuilder.bind(fcmDelayQueue())
.to(delayExchange())
.with(RoutingKey.NOTIFICATION_FCM_DELAY.getKey());
}
@Bean
public Binding bindingFcmDeadLetterQueue() {
return BindingBuilder.bind(fcmDeadLetterQueue())
.to(deadLetterExchange())
.with(RoutingKey.NOTIFICATION_FCM_DEAD_LETTER.getKey());
}
@Bean
public RabbitTemplate rabbitTemplate(
final ConnectionFactory connectionFactory, final MessageConverter messageConverter) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
@Bean
public ConnectionFactory connectionFactory() {
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(properties.host());
connectionFactory.setPort(properties.port());
connectionFactory.setUsername(properties.username());
connectionFactory.setPassword(properties.password());
return connectionFactory;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
위 설정에서 가장 중요한 부분은 FCM Delay Queue의 데드레터로 FCM Queue를 설정해준 부분이고 FCM Queue의 데드레터로 FCM Dead Letter Queue를 지정한 부분입니다.
데드레터로 이동하는 기준은 3가지입니다.
- TTL 만료
- Baisc.nack , Basic.reject 로 메세지 거부
- 큐에 설정된 메세지의 Max Length를 초과할 경우
@Component
@RequiredArgsConstructor
public class FcmPublisher implements Publisher {
private final RabbitTemplate rabbitTemplate;
private final FcmTokenRepository fcmTokenRepository;
@Override
public void publish(final Notification notification) {
final Member receiver = notification.getReceiver();
fcmTokenRepository
.findById(receiver.getId().toString())
.ifPresent(
token -> convertAndSend(notification, token.getToken()));
}
private void convertAndSend(final Notification notification, final String token) {
rabbitTemplate.convertAndSend(
ExchangeName.NOTIFICATION.getName(),
RoutingKey.NOTIFICATION_FCM.getKey(),
FcmMessageRequest.builder()
.title(notification.getTitle())
.notificationId(notification.getId())
.content(notification.getContent())
.token(token)
.build());
}
}
Producer는 fcm 토큰을 Redis에서 조회한 뒤에 DTO를 생성해 메세지 브로커로 전송하게됩니다.
Consumer 설정
@Configuration
@EnableRabbit
@RequiredArgsConstructor
public class RabbitMQConfig {
private final RabbitMQProperties properties;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
final SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(0);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(5);
return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(properties.host());
connectionFactory.setPort(properties.port());
connectionFactory.setUsername(properties.username());
connectionFactory.setPassword(properties.password());
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(
final ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
@Component
@RequiredArgsConstructor
@Slf4j
public class FcmConsumer {
private final ObjectMapper objectMapper;
@RabbitListener(queues = "fcm")
public void send(
String fcmMessage,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
final Channel channel)
throws IOException, FirebaseMessagingException {
FcmMessageRequest fcmMessageRequest =
objectMapper.readValue(fcmMessage, FcmMessageRequest.class);
final Message message = createFcmMessage(fcmMessageRequest);
try{
FirebaseMessaging.getInstance().send(message);
}catch (FirebaseMessagingException e){
log.info("RabbitMQ Nacked FCM Notification : {}",fcmMessage);
channel.basicNack(deliveryTag,false,false);
}
}
public Message createFcmMessage(final FcmMessageRequest request) {
return Message.builder()
.setWebpushConfig(
WebpushConfig.builder()
.setNotification(
WebpushNotification.builder()
.setTitle(request.getTitle())
.setBody(request.getContent())
.build())
.build())
.setToken(request.getToken())
.build();
}
}
JSON 메세지를 수신하여 DTO로 변환한 뒤에 실제 FCM에 메세지를 전송하는 과정입니다.
FCM Message를 생성하여 전송하고 만약 실패할 경우 channel에 Nack을 전송하게 됩니다.
Channel 이란?
RabbitMQ는 기본적으로 물리적인 TCP Connection을 맺고 하나의 커넥션 안에 Channel이라는 논리커넥션을 사용하게 됩니다.
TCP Connection이 맺고 끊을 때 비용이 많이 발생하기 때문이고 심지어 CachingConnectionFactory는 이 Channel을 재사용하게됩니다. 마치 DB 커넥션을 커넥션 풀에 모아놓고 사용하듯이 채널도 채널 풀에 모아놓고 사용하게됩니다.
FCM Delay Queue Consumer
@RequiredArgsConstructor
@Component
@Slf4j
public class FcmDeadLetterConsumer {
private static final Integer RETRY_THRESHOLD = 2;
private final RabbitTemplate rabbitTemplate;
private final SlackSender slackSender;
private final ObjectMapper objectMapper;
@RabbitListener(queues = "fcm.deadletter")
public void send(Message fcmMessage) throws IOException {
FcmMessageRequest fcmMessageRequest =
objectMapper.readValue(fcmMessage.getBody(), FcmMessageRequest.class);
Integer retryCount = getRetryCount(fcmMessage);
if (retryCount > RETRY_THRESHOLD) {
sendSlack(fcmMessageRequest, retryCount);
} else {
fcmMessage.getMessageProperties().getHeaders().put("x-retries-count", ++retryCount);
rabbitTemplate.convertAndSend(
ExchangeName.NOTIFICATION_DELAY.getName(),
RoutingKey.NOTIFICATION_FCM_DELAY.getKey(),
fcmMessage);
}
}
private Integer getRetryCount(final Message fcmMessage) {
return Optional.ofNullable(
(Integer)
fcmMessage
.getMessageProperties()
.getHeaders()
.get("x-retries-count"))
.orElse(1);
}
public void sendSlack(final FcmMessageRequest failedMessage, final Integer retryCount) {
slackSender.sendFailedMessage(failedMessage.toString(), retryCount);
}
}
테스트
예외가 발생할 경우 데드레터 큐와 딜레이 큐를 잘 거치는지 확인하기 위해서 FCM Consumer를 약간 수정했습니다.
@RabbitListener(queues = "fcm")
public void send(
String fcmMessage,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
final Channel channel)
throws IOException, FirebaseMessagingException {
FcmMessageRequest fcmMessageRequest =
objectMapper.readValue(fcmMessage, FcmMessageRequest.class);
final Message message = createFcmMessage(fcmMessageRequest);
if (fcmMessageRequest.getNotificationId() % 2 == 0) {
try {
throw new NotificationException();
} catch (NotificationException e) {
channel.basicNack(deliveryTag, false, false);
}
} else {
FirebaseMessaging.getInstance().send(message);
}
}
NotificationId가 짝수일 때 의도적으로 예외를 발생시켜 메세지를 nack하겠습니다.
정상 수신
비정상 수신 [Consumer basic.nack]
3회 시도 끝에 실패하게되면 Slack으로 알림이 잘 오는 것을 볼 수 있습니다.
이 방식이 최선인가?
우선 재시도 전략을 구축할 때 가장 먼저 생각해야할 부분은 "과연 재시도가 필요한 로직인가?"를 먼저 고민해야할 것 같습니다.
위 방식의 재시도 매커니즘은 Delay Queue와 Dead Letter Queue가 별도로 필요합니다. 그만큼 관리 포인트도 늘어나고 리소스를 소모하게 됩니다.
때론 예외가 발생하면 적절한 예외 응답을 내려주는 것이 좋을 수 있습니다.
또한 재시도 전략엔 위 방식만 존재하는 것은 아닙니다. 위 방식은 큐 자체에 TTL를 걸기 때문에 TTL을 변경하고싶다면 큐를 다시 선언해야하는 단점이 있습니다. 또한 큐에 TTL을 걸기때문에 메세지 별로 TTL을 걸거나 TTL에 세밀한 변화를 주기 어렵습니다.
Requeue = true
가장 간단한 방법으로는 basic.nack 이후에 requeue 플래그를 true로 주어 바로 FCM Queue에 메세지를 다시 집어 넣을 수도 있습니다.
다만 지속적인 requeue가 발생해 FCM Queue에 메세지가 지속적으로 쌓일 수 있기 때문에 주의해야합니다.
channel.basicNack(deliveryTag, false, true); // 3번째 파라미터가 requeue 파라미터
메세지 자체에 TTL 걸기
메세지 자체에 TTL을 걸면 재시도 횟수가 늘어나면 Exponential하게 TTL을 늘리는 등 세밀한 제어가 가능할 수 있습니다.
기존에 그렸던 도식과 조금 차이가 있습니다.
Consumer가 메세지를 basic.nack하지 않고 Delay Queue에 메세지에 TTL을 걸고 Publish하는 방식입니다.
TTL이 만료되면 Delay Queue의 데드레터인 FCM Queue에 들어가게됩니다.
다만 메세지 별로 expiration 를 걸게 됐을 때 문제가 발생할 수 있는데요.
공식문서에 따르면 만료된 메세지는 큐의 헤드에 다달아야 삭제된다고 합니다.
즉 만료된 메세지가 존재한다 하더라도 해당 메세지 앞에 만료를 기다리는 메세지가 존재하면 기다려야한다는 것입니다.
반면 큐 자체에 TTL을 걸게되면 만료 그 즉시 삭제를 시도합니다.