https://www.rabbitmq.com/tutorials/tutorial-five-spring-amqp.html
https://stackoverflow.com/questions/50234800/rabbitmq-queue-and-routing-key
RabbitMQ reference sites:
https://skibis.tistory.com/310
https://blog.outsider.ne.kr/985
https://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq/
https://velog.io/@minholee_93/RabbitMQ-Multiple-Consumer-for-Queue-n3k4ryt43h
https://jack-vanlightly.com/blog/2017/12/4/rabbitmq-vs-kafka-part-1-messaging-topologies
https://velog.io/@minholee_93/RabbitMQ-Retry-Mechanism-nik4tel6hs
RabbitMQ
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
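#spring.rabbitmq.addresses= comma-separated list of broker addresses the client can connect to
#spring.rabbitmq.connection-timeout= maximum time to wait when connecting; 0 means no limit
#spring.rabbitmq.host= RabbitMQ server host, default localhost
#spring.rabbitmq.port= port, default 5672
#spring.rabbitmq.username= user name used to connect, default guest
#spring.rabbitmq.password= password, default guest
#spring.rabbitmq.virtual-host= virtual host to use when connecting to the broker
# Spring Boot automatically configures a RabbitTemplate when a single ConnectionFactory exists.
# This template is used to send messages (to an exchange, which routes them to queues).
#
#spring.rabbitmq.template.exchange= name of the default exchange for send operations, default: none (empty)
#spring.rabbitmq.template.routing-key= default routing key for send operations, default: none (empty)
#spring.rabbitmq.template.receive-timeout= maximum time to wait in receive operations; the default 0 returns immediately when no message is available, a negative value blocks indefinitely
#spring.rabbitmq.template.reply-timeout= maximum time to wait for a reply in send-and-receive operations, default 5 seconds
#spring.rabbitmq.template.retry.enabled= whether publishing retries are enabled, default false (disabled)
#spring.rabbitmq.template.retry.max-attempts= maximum number of attempts to send a message, default 3
#spring.rabbitmq.template.retry.initial-interval= time between the first and the second attempt, default 1 second
#spring.rabbitmq.template.retry.max-interval= maximum time between attempts, default 10 seconds
#spring.rabbitmq.template.retry.multiplier= factor applied to the previous retry interval on each further attempt, default 1.0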
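How the retry settings above interact can be sketched with a few lines of plain Java. This is only an illustration of an exponential back-off under the assumption that max-attempts counts the first attempt as well; the class and method names (RetryBackoffSketch, waitsMillis) are hypothetical and not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Spring class: lists the waits between send attempts
// for given initial-interval, multiplier and max-interval values (milliseconds).
final class RetryBackoffSketch {

    static List<Long> waitsMillis(int maxAttempts, long initialInterval,
                                  double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        long next = initialInterval;
        // Assumption: maxAttempts includes the first attempt,
        // so there are maxAttempts - 1 waits in total.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min(next, maxInterval)); // never wait longer than max-interval
            next = (long) (next * multiplier);      // grow the interval for the next retry
        }
        return waits;
    }

    public static void main(String[] args) {
        // Defaults listed above: 3 attempts, 1 s, multiplier 1.0, 10 s cap.
        System.out.println(waitsMillis(3, 1000, 1.0, 10000)); // [1000, 1000]
        // A doubling back-off that hits the 10 s cap on the last wait.
        System.out.println(waitsMillis(6, 1000, 2.0, 10000)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```

With the default multiplier of 1.0 every wait is the same, so max-interval only matters once the multiplier is greater than 1.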
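To send a message to RabbitMQ, the message payload must be converted to a byte[].
For a String it is enough to call String.getBytes. Sending an object, however, is a little more involved.
By default the object must implement the Serializable interface, and Java serialization is used to convert it to a byte[].
Java serialization is, of course, a poor choice when the message goes to a system that does not run on the Java platform.
Spring AMQP's RabbitTemplate delegates message creation to a MessageConverter.
By default it uses SimpleMessageConverter, which implements the approach described above.
In practice, however, an implementation that supports formats such as XML or JSON is usually used to create the payload instead of Java serialization.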
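The two conversions described above (String.getBytes for text, Java serialization for objects) can be tried out with the JDK alone. The following is a minimal sketch, not the actual SimpleMessageConverter code; PayloadBytesSketch and its Note payload type are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class PayloadBytesSketch {

    // Hypothetical payload type; it must be Serializable for Java serialization.
    static final class Note implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;
        Note(String text) { this.text = text; }
    }

    // Text payload: the string's bytes in an explicit charset.
    static byte[] fromString(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Object payload: Java serialization into an in-memory buffer.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reverse direction; only another JVM that has the same class can do this.
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fromString("Hello Message!").length); // 14
        byte[] bytes = serialize(new Note("Hello Message!"));
        Note copy = (Note) deserialize(bytes);
        System.out.println(copy.text); // Hello Message!
    }
}
```

The serialized form is a Java-specific binary stream, which is why a JSON or XML converter is the safer choice when non-Java consumers are involved.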
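#spring.rabbitmq.listener.type= listener container type; either direct or simple, default simple
#spring.rabbitmq.listener.simple.acknowledge-mode= acknowledge mode of the container; no value is set by default
#spring.rabbitmq.listener.simple.prefetch= maximum number of unacknowledged messages that can be outstanding at each consumer; no value is set by default
#spring.rabbitmq.listener.simple.default-requeue-rejected= whether rejected deliveries are put back on the queue
#spring.rabbitmq.listener.simple.concurrency= minimum number of threads used to run the listener
#spring.rabbitmq.listener.simple.max-concurrency= maximum number of threads used to run the listener
#spring.rabbitmq.listener.direct.prefetch= maximum number of unacknowledged messages that can be outstanding at each consumer; no value is set by default
#spring.rabbitmq.listener.direct.default-requeue-rejected= whether rejected deliveries are put back on the queue
#spring.rabbitmq.listener.direct.consumers-per-queue= number of consumers per queue, default 1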
spring.activemq.broker-url=tcp://localhost:61616
spring.jms.template.receive-timeout=500ms
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
package com.example.demo.activemqApp;

public class CustomMessage {

    private String text;
    private int priority;
    private boolean secret;

    public CustomMessage() {
    }

    public CustomMessage(String text, int priority, boolean secret) {
        this.text = text;
        this.priority = priority;
        this.secret = secret;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public boolean isSecret() {
        return secret;
    }

    public void setSecret(boolean secret) {
        this.secret = secret;
    }

    @Override
    public String toString() {
        return "CustomMessage{" +
                "text='" + text + '\'' +
                ", priority=" + priority +
                ", secret=" + secret +
                '}';
    }
}
package com.example.demo.activemqApp;

import java.util.Date;

// Sent as JSON through Jackson2JsonMessageConverter, so it does not need to implement Serializable.
public class Order {

    private String id;
    private Date timestamp;

    public Order() {
    }

    public Order(String id, Date timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Date getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Order{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
package com.example.demo.activemqApp;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfiguration {

    private static final String queueName = "spring-boot.q";
    private static final String topicExchangeName = "spring-boot-exchange.e";

    @Bean("queue1")
    Queue queue1() {
        // Create the queue (second argument: durable = false)
        return new Queue(queueName, false);
    }

    @Bean("exchange1")
    TopicExchange exchange1() {
        // Create the TopicExchange
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding bindingA(@Qualifier("exchange1") TopicExchange exchange, @Qualifier("queue1") Queue queue) {
        // Bind the queue to the exchange;
        // routing keys matching the pattern foo.bar.# are delivered to the queue
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                  MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        return rabbitTemplate;
    }

    // Makes the template use Jackson2JsonMessageConverter instead of SimpleMessageConverter
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
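The binding above uses the topic pattern foo.bar.#. In a topic exchange the routing key is a list of words separated by dots, where * stands for exactly one word and # for zero or more words. The matching rule can be sketched in plain Java as follows; this is a simplified illustration, not the broker's implementation, and the class name TopicPatternSketch is hypothetical.

```java
// Hypothetical illustration of topic-exchange pattern matching; not RabbitMQ code.
final class TopicPatternSketch {

    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words, or one word and then tries again.
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("foo.bar.#", "foo.bar.baz")); // true
        System.out.println(matches("foo.bar.#", "foo.bar"));     // true
        System.out.println(matches("foo.bar.#", "foo.baz.x"));   // false
        System.out.println(matches("new-order", "new-order"));   // true
    }
}
```

A pattern without wildcards, such as new-order, behaves like an exact match, which is how the orders.e bindings in the listener are used.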
package com.example.demo.activemqApp;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

import java.util.Objects;

@Service
@Slf4j
public class CustomMessageListener {

    // Consumes everything routed to spring-boot.q and branches on the converted payload type.
    @RabbitListener(queues = "spring-boot.q")
    public void receiveMessage(final Message message) {
        if (Objects.equals(message.getPayload().getClass(), CustomMessage.class)) {
            CustomMessage customMessage = (CustomMessage) message.getPayload();
            log.info("customMessage = {}, {}, {}", customMessage.getText(), customMessage.getPriority(), customMessage.isSecret());
        } else if (Objects.equals(message.getPayload().getClass(), Order.class)) {
            Order order = (Order) message.getPayload();
            log.info("Order.tosring = {}, {}", order.getId(), order.getTimestamp());
        } else {
            log.info("message.tostring = {}", message.toString());
        }
    }

    // Declares the exchange, the queue and the binding directly on the listener.
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(name = "orders.e", type = ExchangeTypes.TOPIC),
            value = @Queue(name = "incoiming-orders.q"),
            key = "new-order"
    ))
    public void Order(final Message message) {
        log.info("Order = {} ", message.toString());
    }

    // A second queue bound with the same key, so each order is delivered to both queues.
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(name = "orders.e", type = ExchangeTypes.TOPIC),
            value = @Queue(name = "incoiming-orders2.q"),
            key = "new-order"
    ))
    public void Order2(final Message message) {
        log.info("Order2 = {}", message.toString());
    }
}
package com.example.demo.activemqApp;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class Run implements ApplicationRunner {

    private static final String topicExchange = "spring-boot-exchange.e";
    private static final String topicExchangeOrder = "orders.e";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Plain String payload; "foo.bar.baz" matches the foo.bar.# binding
        rabbitTemplate.convertAndSend(topicExchange, "foo.bar.baz", "Hello Message!");

        // Object payloads are converted to JSON by the configured MessageConverter
        CustomMessage message = new CustomMessage("Hello Message!!!", 1, true);
        rabbitTemplate.convertAndSend(topicExchange, "foo.bar.baz", message);

        Order order = new Order("id-001", new Date());
        rabbitTemplate.convertAndSend(topicExchange, "foo.bar.baz", order);
        rabbitTemplate.convertAndSend(topicExchangeOrder, "new-order", order);

        // A MessagePostProcessor can modify the message (here: add a header) before it is sent
        Order order2 = new Order("id-002", new Date());
        rabbitTemplate.convertAndSend(topicExchangeOrder, "new-order", order2, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("type", "10");
                return message;
            }
        });

        // Give the listeners time to consume, then stop the demo (exit status -1)
        TimeUnit.SECONDS.sleep(3);
        System.exit(-1);
    }
}
http://localhost:15672/#/exchanges
http://localhost:15672/#/queues
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.4.RELEASE)

2020-02-18 00:34:34.889 INFO 17992 --- [ restartedMain] com.example.demo.DemoApplication : Starting DemoApplication on DESKTOP-6HPEM1U with PID 17992 (D:\example\springboot2_DB_example1\target\classes started by k in D:\example\springboot2_DB_example1)
2020-02-18 00:34:34.892 INFO 17992 --- [ restartedMain] com.example.demo.DemoApplication : No active profile set, falling back to default profiles: default
2020-02-18 00:34:34.937 INFO 17992 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2020-02-18 00:34:34.937 INFO 17992 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2020-02-18 00:34:35.792 INFO 17992 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2020-02-18 00:34:35.802 INFO 17992 --- [ restartedMain] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-02-18 00:34:35.803 INFO 17992 --- [ restartedMain] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.30]
2020-02-18 00:34:35.890 INFO 17992 --- [ restartedMain] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-02-18 00:34:35.891 INFO 17992 --- [ restartedMain] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 954 ms
2020-02-18 00:34:36.090 INFO 17992 --- [ restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-02-18 00:34:36.229 INFO 17992 --- [ restartedMain] o.s.b.d.a.OptionalLiveReloadServer : LiveReload server is running on port 35729
2020-02-18 00:34:36.284 INFO 17992 --- [ restartedMain] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2020-02-18 00:34:36.314 INFO 17992 --- [ restartedMain] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#899b9fb:0/SimpleConnection@d8cc3d2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 12928]
2020-02-18 00:34:36.317 INFO 17992 --- [ restartedMain] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (spring-boot.q) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2020-02-18 00:34:36.378 INFO 17992 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2020-02-18 00:34:36.381 INFO 17992 --- [ restartedMain] com.example.demo.DemoApplication : Started DemoApplication in 1.777 seconds (JVM running for 2.643)
2020-02-18 00:34:36.406 INFO 17992 --- [ntContainer#0-1] c.e.d.activemqApp.CustomMessageListener : message.tostring = GenericMessage [payload=Hello Message!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=spring-boot-exchange.e, amqp_deliveryTag=1, amqp_consumerQueue=spring-boot.q, amqp_redelivered=false, amqp_receivedRoutingKey=foo.bar.baz, amqp_contentEncoding=UTF-8, id=a9a6d287-f2db-c037-d3ab-1bcf46271b7e, amqp_consumerTag=amq.ctag-0fVzm9FbC2JBTeRmlTzIcA, amqp_lastInBatch=false, contentType=application/json, __TypeId__=java.lang.String, timestamp=1581953676404}]
2020-02-18 00:34:36.430 INFO 17992 --- [ntContainer#0-1] c.e.d.activemqApp.CustomMessageListener : customMessage = Hello Message!!!, 1, true
2020-02-18 00:34:36.430 INFO 17992 --- [ntContainer#0-1] c.e.d.activemqApp.CustomMessageListener : Order.tosring = id-001, Tue Feb 18 00:34:36 KST 2020
2020-02-18 00:34:36.437 INFO 17992 --- [ntContainer#1-1] c.e.d.activemqApp.CustomMessageListener : Order2 = GenericMessage [payload=Order{id='id-001', timestamp=Tue Feb 18 00:34:36 KST 2020}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=orders.e, amqp_deliveryTag=1, amqp_consumerQueue=incoiming-orders2.q, amqp_redelivered=false, amqp_receivedRoutingKey=new-order, amqp_contentEncoding=UTF-8, id=a73d7b5f-267e-c926-99a9-331199ff3e84, amqp_consumerTag=amq.ctag-9xrEGeykecKWhRKOKfbHGQ, amqp_lastInBatch=false, contentType=application/json, __TypeId__=com.example.demo.activemqApp.Order, timestamp=1581953676430}]
2020-02-18 00:34:36.437 INFO 17992 --- [ntContainer#2-1] c.e.d.activemqApp.CustomMessageListener : Order = GenericMessage [payload=Order{id='id-001', timestamp=Tue Feb 18 00:34:36 KST 2020}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=orders.e, amqp_deliveryTag=1, amqp_consumerQueue=incoiming-orders.q, amqp_redelivered=false, amqp_receivedRoutingKey=new-order, amqp_contentEncoding=UTF-8, id=3c2b9e99-d0c4-2f9f-ac1e-9d81839cf880, amqp_consumerTag=amq.ctag-8JkaNuw0cpf9Z8KBX2Evzg, amqp_lastInBatch=false, contentType=application/json, __TypeId__=com.example.demo.activemqApp.Order, timestamp=1581953676430}]
2020-02-18 00:34:36.437 INFO 17992 --- [ntContainer#2-1] c.e.d.activemqApp.CustomMessageListener : Order = GenericMessage [payload=Order{id='id-002', timestamp=Tue Feb 18 00:34:36 KST 2020}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=orders.e, amqp_deliveryTag=2, amqp_consumerQueue=incoiming-orders.q, amqp_redelivered=false, type=10, amqp_receivedRoutingKey=new-order, amqp_contentEncoding=UTF-8, id=6f2efcd9-f2c2-5af4-0d33-44c3cdfa117e, amqp_consumerTag=amq.ctag-8JkaNuw0cpf9Z8KBX2Evzg, amqp_lastInBatch=false, contentType=application/json, __TypeId__=com.example.demo.activemqApp.Order, timestamp=1581953676437}]
2020-02-18 00:34:36.437 INFO 17992 --- [ntContainer#1-1] c.e.d.activemqApp.CustomMessageListener : Order2 = GenericMessage [payload=Order{id='id-002', timestamp=Tue Feb 18 00:34:36 KST 2020}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=orders.e, amqp_deliveryTag=2, amqp_consumerQueue=incoiming-orders2.q, amqp_redelivered=false, type=10, amqp_receivedRoutingKey=new-order, amqp_contentEncoding=UTF-8, id=8a9119c5-8be7-f28d-681a-b66e5852c918, amqp_consumerTag=amq.ctag-9xrEGeykecKWhRKOKfbHGQ, amqp_lastInBatch=false, contentType=application/json, __TypeId__=com.example.demo.activemqApp.Order, timestamp=1581953676437}]
2020-02-18 00:34:39.427 INFO 17992 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2020-02-18 00:34:39.437 INFO 17992 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2020-02-18 00:34:39.438 INFO 17992 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2020-02-18 00:34:39.440 INFO 17992 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2020-02-18 00:34:39.441 INFO 17992 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2020-02-18 00:34:40.441 INFO 17992 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2020-02-18 00:34:40.453 INFO 17992 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Shutdown ignored - container is not active already
2020-02-18 00:34:40.453 INFO 17992 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Shutdown ignored - container is not active already
2020-02-18 00:34:40.454 INFO 17992 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Shutdown ignored - container is not active already
2020-02-18 00:34:40.458 INFO 17992 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'

Process finished with exit code -1