애플리케이션 서버와 RabbitMQ 간에 여러 논리적인 채널을 다중화할 목적으로 물리적인 네트워크 연결을 설정해야 한다. 채널 생성과는 달리 연결을 만드는 작업에는 데이터 베이스 연결과 유사하게 비용이 많이 든다. 보통 데이터베이스 연결은 풀에서 관리되며 단일 실행 스레드가 각 인스턴스 풀을 사용한다. 하지만 AMQP는 다중화된 채널을 통해 여러 스레드가 단일 연결을 사용할 수 있다는 점에서 다르다.
현재로서는 CCM(가상회사)이 단일 연결부터 시작
리치 인터넷 애플리케이션(Rich Internet Application)이 자바로 작성되었으므로 클라이언트 API를 먼저 알아볼 것이다.
ConnctionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
connection connection = factory.newConnection();
CCM(가상회사) 목표
- 애플리케이션은 RabbitMQ에 연결 확립 여부를 책임져야 한다.
- RabbitMQ에 연결이 중단되면 자체적으로 다시 연결한다.
- 연결이 중단되면 메시지를 송수신하는 작업을 적절하게 처리해야 한다.
-> 이미 CCM이 목표로 하는 기능을 지원하는 몇몇 라이브러리가 존재한다. Spring AMQP, Mule AMQP, Beetle 등이 있다.
public class RabbitMqManager implements ShutdownListener
{
private final static Logger LOGGER = Logger.getLogger(RabbitMqManager.class.getName());
private final ConnectionFactory factory;
private final ScheduleExecutorService executor;
public RabbitMqManager (final ConnectionFactory factory)
{
this.factory = factory;
exector = Exectors.newSingleThreadScheduleExecutor();
connection = null;
}
}
RabbitMqManager 클래스의 목적은 RabbitMQ의 단일 연결을 관리하는 데 집중하는 것이다. 따라서 연결되지 않았음을 의미하는 NULL 값으로 Connection 인스턴스에 단일 참조를 유지한다. 재연결 시도는 비동기 방식으로 이뤄지며, 이때 주요 애플리케이션 스레드가 동원되는 것을 피하기 위해 executor를 생성하여 비동기 작업을 수행할 수 있도록 volatile 키워드로 선언한다.
public void start(){
try{
connection = factory.newConnection();
connection.addShutdownListener(this);
LOGGER.info("Connected to " + factory.getHost() + ":" + factory.getPort());
} catch(final Exception e) {
LOGGER.log(Level.SEVERE, "Failed to connect to "+factory.getHost() + ":" + factory.getPort(), e);
asyncWaitAndReconnect();
}
}
start 함수를 호출한 경우에만 연결을 시도한다. RabbitMqManager 클래스 자체를 연결 셧다운 이벤트를 위한 리스너로 등록한다는 점이다. 이는 연결 시 예기치 않은 일이 발생했을 때 shutdownCompleted 함수가 호출될 수 있도록 하기 위함이다. 또한 시작 시에 연결에 실패하면 asyncWaitAndReconnect 함수를 호출해서 이를 처리한다.
protected void asyncWateAndReconnet(){
executor.schedule(new Runnable()
{
@Override
public void run()
{
start();
}
}, 15, TimeUnit.SECONDS);
}
해당 함수는 단순히 RabbitMqManager 클래스를 15초 후에 재시작하도록 스케줄링한다. 15초인 이유는 재연결을 시도할 때 스래싱(thrashing)을 피하기 위함이다.
-> 스래싱(thrashing) : 운영체제에서 빈번하게 페이지 부재(page fault)가 발생하는 현상으로, 프로세스 수행 시간보다 페이지 교환에 소요되는 시간이 더 많다는 것을 의미
너무 빠르게 재연결을 시도할 필요는 없다. 실제로 지수적 백오프(exponential back-off) 전략을 이전 코드에 손쉽게 접목시킬 수 있다.
@Override
public void shutdownCompleted(final ShutdownSignalException cause)
{
// 예기치 않은 문제가 발생할 때만 재연결한다.
if(!cause.isInitiatedByAppication()){
LOGGER.log(Level.SEVERE, "Lost connection to " + factory.getHost() + ":" + factory.getPort(), cause);
connection = null;
asyncWaitAndReconnect();
}
}
이 함수는 연결 작업이 틀어졌을 때 RabbitMQ 자바 클라이언트가 호출하는 함수이다.
여기서 중요한 점은 정상적인 애플리케이션을 종료할때 발생하는 연결 중단 작업을 수행하지 않았을 때만 재연결을 시도한다.
또한 shutdownCompleted 함수라 불리는 RabbitMQ 클라이언트 스레드의 동원을 피하기 위해 비동기 방식으로 재연결한다는 점도 중요하다.
public void stop(){
executor.shutdownNow();
if(connection == null){
return;
}
try{
connection.close();
} catch(final Exception e) {
LOGGER.log(Level.SEVERE, "Failed to close connection", e);
} finally {
connection = null;
}
}
재연결 시도를 담당하는 executor의 종료 후에 자바의 번거롭지만 필수적인 예외 처리 방법으로 연결 자체를 깔끔하게 처리한다.
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("ccm-dev");
factory.setPassword("coney123");
factory.setVirtualHost("ccom-dev-vhost");
factory.setHost("localhost");
factory.setPort(5672);
RabbitMqManager connectionManager = new RabbitMqManager(factory);
connectionManager.start();
연결설정은 RabbitMQ와 특정 작업을 할 수 있는 기반이 되지만 실제 작업은 채널에서 발생한다.
채널과 작업
Channel 인스턴스는 Connection 객체로 생성 가능하다.
public Channel createChannel(){
try{
return connection == null ? null : connection.createChannel();
} catch (final Exception e) {
LOGGER.log(Level.SEVERE, "Failed to create channel", e);
return null;
}
}
채널 생성이 잘못되면 함수는 널 값을 반환한다. 이는 애플리케이션을 RabbitMQ와 관련된 어떠한 오류로부터 보호하기 위함이다. 메시징 서브시스템에서 발생하는 예외를 처리하기보다 잠재적으로 널 값으로 처리하는 것이다. 같은 논리로 채널을 닫는 작업도 잠재적인 예외로 처리하는 함수에 위임한다.
public void closeChannel (final Channel channel){
// isOpen 함수를 전적으로 신뢰할 수 없다!
if ((channel == null) || (!channel.isOpen())){
return;
}
try{
channel.close();
} catch (final Exception e) {
LOGGER.log(Leveel.SEVERE, "Failed to close channel : " + channel, e);
}
}
isOpen 함수를 전적으로 신뢰할수 없다. 그 이유는 채널이 열렸는지 검사한 후에 다른 스레드가 채널을 닫을 수도 있기 때문이다.
참고로 채널 인스턴스는 기술적으로 스레드로부터 안전(thread safe)하지만, 동시에 같은 채널을 사용하는 다수의 스레드를 갖지 않도록 구현할 것을 강력히 권장한다.
코드에서 자주 발생할 수 있는 '채널을 열고, 채널과 무언가를 하고, 채널을 닫는다.'
이 같은 패턴을 사용하기 위해 규악을 정의한 인터페이스를 생성한다.
public interface ChannelCallable<T>{
String getDescription();
T call(Channel channel) throws IOException;
}
그리고 나서 ChannelCallable 인스턴스를 실행하기 위해 RabbitMqManager 함수에 이를 추가한다.
public <T> T call (final ChannelCallable<T> callable)
{
final Channel channel = createChannel();
if(channel != null){
try{
retrun callable.call(channel);
} catch (final Exception e) {
LOGGER.log(Level.SEVERE, " Failed to run : " + callable.getDescription() + " on channel : " + channel, e);
} finally {
closeChannel(channel);
}
}
return null;
}
call 함수의 호출자는 메시징 단계에서 발생할 수 있는 어떠한 오류에도 보호받을 것이며, 뭔가 잘못되면 널 값을 받을 것이다.
'WEB > RabbitMQ' 카테고리의 다른 글
Rabbitmq 설치초기 (0) | 2020.01.08 |
---|---|
4. 애플리케이션 수신함(3) (0) | 2019.12.20 |
3. 애플리케이션 수신함(2) (0) | 2019.12.14 |
1. 메시징 개념 (0) | 2019.12.13 |