수신함 구축
- 라우팅 전략은 메시지가 라우팅될 큐(혹은 다수의 큐)를 결정한다.
라우팅 전략은 자유로운 형식의 문자열로 구성된 라우팅 키와 잠재적인 메시지 메타 정보에 근거로 두고 있다.
여기에서 고려하는 사용자 간 메시징 시스템(user-to-user messaging system)에서는, 메시지를 수신자의 수신함 역할을 담당하는 큐로 라우팅해야 한다.
따라서 사용해야 하는 익스체인지 라우팅 전략은 다이렉트 방식이다.
그림과 같이 목적지 큐의 이름을 메시지 생성 시 사용했던 라우팅 키와 짝을 맞추는 방식이다.
애플리케이션에 메시징 로직을 접목시키기 위해 자바스크립트 프론트엔드와 자바 백엔드 사이에 이미 존재하는 폴링(Polling)방식을 따를 것이다. 그러나 이는 가장 효율적인 접근법은 아니지만 추후 변경예정이다.
AJAX 방식으로 호췰되며 메시지는 JSON 객체로 표시된다.
메시지 텍스트 내용 상단에 타임스탬프(timestamp), 송신자(sender), 수신자 아이디(receiver ID)같은 메타 정보를 포함할것이다.
// UserMessageManager class
public class UserMessageManager{
static final String USER_INBOXES_EXCHANGE = "user-inboxes";
@Inject
RabbitMqManager rabbitMqManager;
public void onApplicationStart(){
rabbitMqManager.call(new ChannelCallable<DeclareOk>() {
@Override
public String getDescription(){
return "Declaring direct exchange: "+USER_INBOXES_EXCHANGE;
}
@Override
public DeclareOk call(final Channel channel) throws IOException {
String exchange = USER_INBOXES_EXCHANGE;
String type = "direct";
boolean durable = true;
boolean autoDelete = false;
Map<String, Object> arguments = null;
return channel.exchangeDeclare(exchange, type, durable, autoDelete, arguments);
}
});
}
}
의존성을 주입을 토해 RabbitMqManager 인스턴스를 받은 후에 onApplicationStart 함수를 생성한다.
onApplicationStart 는 애플리케이션 서버를 시작할때마다 매번 호출한다. 이 함수는 사용자 간에 메시지를 발행하는 익스체인지를 선언하는 일이 전부다.
-> 익스체인지 선언은 속성이 동일하면 멱등성(idempotent)을 지닌다. 따라서 다른 속성으로 이미 존재하는 익스체인지를 선언하려고 하면 실패할 것이다. 항상 익스체인지 선언에는 동일한 속성을 사용해야 한다. 속성을 변경하고자 한다면 새로운 속성으로 익스체인지를 선언하기 전에 기존의 익스체인지를 삭제해야 한다. 이러한 규칙은 큐 선언에도 적용된다.
// 수신함큐를 생성하고 익스체인지에 결합
public void onUserLogin(final long userID){
final String queue = getUserInboxQueue(userID);
rabbitMqManager.call(new ChannelCallable<BindOk>(){
@Override
public String getDescription(){
return "Declaring user queue: "+queue+", binding it to exchange: "+USER_INBOXES_EXCHANGE;
}
@Override
public BindOk call(final Channel channel) throws IOException{
boolean durable = true;
boolean autoDelete = false;
boolean exclusive = false;
Map<String, Object> arguments = null;
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
String routingKey = queue;
retrun channel.queueBind(queue, USER_INBOXES_EXCHANGE, routingKey);
}
});
}
사용자가 매번 시스템에 로그인할 때마다 애플리케이션은 onUserLogin을 호출한다.
getUserInboxQueue에서 수신자의 큐 이름을 얻은 후에 declareUserMessageQueue를 호출한다.
declareUserMessageQueue 함수에서는 큐는 익스체인지와 거의 유사한 방식으로 선언하는데, 다음과 같이 약간 다른 속성을 갖는다.
- durable : 브로커를 재식작한 후에도 큐를 선언한 사애톨 유지하기 위해 true로 설정한다.
- autoDelete : 큐에서 소비할 대상이 없더라도 큐를 유지하기 위해 fasle로 설정한다.
- exclusive : 다른 연결에서 큐를 사용하기 위해 false로 설정한다.
- arguments : 사용자 정의 큐를 구성할 필요가 없으므로 NULL로 설정한다.
그리고 나서 큐는 다이렉트 라우팅 전략이 메시지를 큐로 전송시킬 수 있도록 라우팅 키처럼 자신의 큐 이름을 사용해 익스체인지에 큐를 결합한다. 이작업이 완료되면 user-inboxes 익스체인지에 메시지를 발행해서, 실제로 발행된 라우팅 키와 일치하는 이름을 가진 사용자 큐에 메시지를 전달할 수 있다.
익스 체인지에 결합된 큐가 없거나 라우팅 전략이 일치하는 목적지 큐를 찾지 못할 경우 익스체인지에 발행된 메시지는 아무 통보 없이 폐기될것이다. 라우팅되지 않은 메시지가 폐기될 때 선택사항으로 통지를 받을 수도 있다. 이것은 다음장에 살펴볼것이다. |
사용자에게 미시지 보내기
static final String MESSAGE_CONTENT_TYPE = "application/vnd.ccm.pmsg.vl+json";
static final String MESSAGE_ENCODING = "UTF-8";
public String sendUserMessage(final long userId, final String jsonMessage){
return rabbitMqManager.call(new ChannelCallable<String>(){
@Override
public String getDescription(){
return "Sending message to user: " + userId;
}
@Override
public String call (final Channel channel) throws IOException{
String queue = getUserInboxQueue(userId);
// 큐가 존재하지 않으면 선언한다.
declareUserMessageQueue(queue, channel);
String messageId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder()
.contentType(MESSAGE_CONTENT_TYPE)
.contentEncoding(MESSAGE_ENCODING)
.messageId(messageId)
.deliveryMode(2)
.build();
String routingKey = queue;
// 다이렉트 익스체인지에 메시지를 발행한다.
channel.basicPublish(USER_INBOXES_EXCHANGE, routingKey, props, jsonMessage.getByte(MESSAGE_ENCODING));
return messageId;
}
});
}
이제 declareUserMessageQueue 함수가 onUserLogin에서 분리된 이유를 알아보자.
사용자가 다른 사용자에게 메시지 보낼 때 마다 sendUserMessage를 호출하고 있다.
수신이 언제나 시스템에 로그인되어 있음을 보장할 수 없으니 송신자에 한해서는 목적지 큐가 확실하게 존재하는지 확인하는것은 불가능하다. 따라서 가장 안전한 행위는 메시지를 보낼때마다 큐를 선언하는 거싱며, 이때 연산은 멱등성을 가진다는 점에 유의해야 한다. 그래서 큐가 존재한다면 아무 일도 하지 않을 것이다.
이상할수 있지만, 송신자는 메시지 손실을 방지하기 위해 큐가 존재하는지 확인해야 할 책임을 갖는다.
이는 AMQP의 일반적인 패턴이다. 이벤트 간 앞서 발생(happens before)하는 관계에 강제성이 없다면, 멱등성을 이용하여 재선언하는 것이 최선의 방법이다. 반대로 확인하고 행동(check then act)하는 패턴은 권하지 않는다. AMQP를 사용하는 전형적인 분산 환경에서 익스체인지나 큐의 존재 여부를 확인하는 작업은 어떠한 보장도 할 수 없기 때문이다. |
메시지를 발행하는 방법은 매우 간단하다.
다이렉트 라우팅처럼 큐 이름을 라우팅 키로 사용해서 user0inboxes 익스체인지로 basicPublish 함수를 호출한다.
호출 시엔 실제 메시지 페이로드 나타내는 바이트 배열과 일부 메시지 속성을 선택적으로 사용한다.
메시지 관련 메시지 속성을 상세히 살펴보자
- contentType : 발행된 메시지는 바이트(byte) 배열로 소비되는데, 어떤 것도 이들 바이트가 정말로 나타내는 것이 무엇인지는 말해주지 않는다.
- contentEncoding : 문자열 메시지 발행을 위해 문자열을 바이트 배열로 직렬화할 때 명시적으로 UTF-8 인코딩을 사용한다. 메시지가 추가 설명 없이도 스스로 해독 가능하게 하기 위해 메시지를 판독할 수 있는 필요한 모든 메타 정보를 제공한다.
- messageID : 메시지 식별자는 메시징과 분산 애플리케이션에서 추적성(traceability)을 위한 중요한 요소다. 우선 지금은 메시지마다 고유 식별자를 갖길 원한다고 가정하고, 식별자처럼 생성할 수 있는 UUID를 사용한다.
- deliveryMode : 속성 값을 단순히 숫자 2로 설정했기 때문에 아마 가장 이해하기 어려웠을 것이다. AMQP 명세서는 해당 속성 값을 다음과 같이 정의하고 있다. 비영속성(Non-persistent)은 1로 설정하고 영속성(Persistent)은 2로 설정한다.
사실, 뭐든 상관없이 메시지가 소실되지 않도록 RabbitMQ 브로커가 디스크에 메시지를 기록할 것이라는 보장을 원한다.
익스체인지와 큐의 내구성을 메시지 역속성과 혼동하지 말자. 내구성을 가진 큐에 저장된 비영속성 메시지는 브로커 재시작 후에 텅 빈 큐만 남겨놓고 사라질것이다. |
그런데 예를 들어 RabiitMQ와 연결이 끓어지고 사용자의 메시지 전송이 실패하면 어떤 일이 벌어질까?
이 경우 sendUserMessage 클래스는 널 값을 반환해서 호출자에게 이 문제를 처리하도록 위임한다.
예제에서는 단순히 사용자에게 메시징 애플리케이션이 직므 문제를 겪고 있는 중이라고 알리면 된다.
비영속성 전달 모드를 사용해야 하는 이유가 무엇일까? 메시지가 손실되지 않도록 보장하는 것이 RabiitMQ 같은 메시지 브로커의 핵임이 아닌가? 맞는 말이긴 하지만 보장성이 유연해질 수 있는 상황이 존재한다. 발행자를 소방 호스라 생각하고 별로 중요하지 않은 메시지를 브로커에게 홍수처럼 퍼붓는 시나리오를 생각해보자. 이 때 비영속적인 전달 방법을 사용하면 높은 성능의 결과로 호스트 컴퓨터 디스크에 접근하는 것을 줄일수 있다. |
'WEB > RabbitMQ' 카테고리의 다른 글
Rabbitmq 설치초기 (0) | 2020.01.08 |
---|---|
4. 애플리케이션 수신함(3) (0) | 2019.12.20 |
2. 애플리케이션 수신함 (0) | 2019.12.14 |
1. 메시징 개념 (0) | 2019.12.13 |