ActiveMQ 정리
ActiveMQ 설치
다운로드 URL : http://activemq.apache.org/activemq-5152-release.html
설치
- 5.10.0 이하는 java 1.6 가능
- 그 이상은 java 1.7 가능
- .zip 다운로드 하여 압축품
- 경로 ~/bin/
- 실행 및 종료 ./activemq start : 백그라운드 실행 ./activemq status : 상태 확인 ./activemq stop : 종료 ./activemq console : 포어그라운드 실행
콘솔 정보(기본) port : 8161 is/pw : admin/admin
JMS(Java Message Service) 사용
###activeMQ
- Apache ActiveMQ 는 대중적이고 강력한 오픈 소스 메시징 그리고 통함 패턴 서버 입니다.
- Apache ActiveMQ는 빠르며, 다양한 언어간의 클라이언트 및 프로토콜을 지원하고, 사용하기 쉬운 엔터프라이즈 통합 패턴 및 많은 고급 기능을 제공하면서 JMS 1.1 및 J2EE 1.4를 완벽하게 지원합니다.
- MOM(메시지 지향 미들웨어)입니다.
- ActiveMQ는 JMS를 지원하는 클라이언트를 포함하는 브로커, 자바 뿐만 아니라 다양한 언어를이용하는시스템간의 통신을할수있게해줍니다. 또한 클러스터링기능 및 DB 그리고 FileSystem을 통해 각 시스템간의 일관성 및 지속성을 유지 시켜줍니다.
- 간단히 정의하면 클라이언트 간 메시지를 송수신 할 수 있는 오픈 소스 Broker(JMS 서버)입니다.
###JMS
- JMS 는 자바 기반의 MOM(메시지 지향 미들웨어) API 이며 둘 이상의 클라이언트 간의 메시지를 보냅니다.
- JMS 는 자바 플랫폼, 엔터프라이즈 에디션(EE) 기반이며, 메시지 생성, 송수신, 읽기를 합니다. 또한 비동기적이며 신뢰할 만하고 느슨하게 연결된 서로 다른 분산 어플리케이션 컴포넌트 간의 통신을 허용합니다.
- JMS의 핵심 개념은 Message Broker 와 Destination 입니다.
Message Broker : 목적지에 안전하게 메시지를 건네주는 중개자 역할 Destination : 목적지에 배달될 2가지 메시지 모델 QUEUE, TOPIC
QUEUE : Point to Point (Consumer 는 메시지를 받기 위해 경쟁합니다.) TOPIC : Publish to Subscribe
ActiveMQ 메시지 처리 구조
Producer(생산자) > Broker > Consumer(소비자) : Producer가 Message를 Queue/Topic에 넣어두면 Consumer가 Message를 가져와 처리하는 방식
사용
Maven 추가
<!-- ActiveMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
발송 예
// MsgSender - Producer 사용
package com.soon.msg;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MsgSender {
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
// default broker URL is : tcp://localhost:61616
private static String subject = "SOON_QUEUE";
public static void main(String[] args) throws JMSException {
// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
// /Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home
// Creating a non transactional seesion to send/receive JMS message
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Destination represents here our queue 'SOON_QUEUE' on the JMS server
// The queue will be created automatically on the server
Destination destination = session.createQueue(subject);
// MessageProducer is used for sending messages to the queue
MessageProducer producer = session.createProducer(destination);
// We will send a small text message saying 'Hello World'
TextMessage message = session.createTextMessage("Hello World SOON");
producer.send(message);
System.out.println("SOON QUEUE : " + message.getText());
connection.close();
}
}
리시버 예
// MsgReceiver - Consumer 사용
package com.soon.msg;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MsgReceiver {
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private static String subject = "SOON_QUEUE";
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(subject);
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received msg : " + textMessage.getText());
}
connection.close();
}
}
MDP(Message Driven POJO)로 JMS를 구현
방법
- javax.jms.MessageListener를 구현하여 onMessage메소드를 만듬
- Spring은 MessageListenerAdapter 를 이용하여 보다 청아한?(깨끗한?) POJO를 만듬
구현구조 Queue/Topic > Message Listener Container > Message Listener Adapter (handleMessage) > POJO
설정
<!-- A JMS connection factory for ActiveMQ -->
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://localhost:61616"/>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SOON_QUEUE" />
</bean>
<bean id="defaultMessageDelegate" class="com.soon.delegate.DefaultMessageDelegate" />
<!--
MessageListenerAdapter의 메시지 처리 메소드의 이름은 기본적으로 'handleMessage' 이다
-->
<bean id="messageListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="defaultMessageDelegate" />
<property name="defaultListenerMethod" value="handleMessage" />
</bean>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener" />
</bean>
MDP : Message-Driven POJO
참고 : http://activemq.apache.org/spring-support.html http://yunsunghan.tistory.com/126
JMSConfig.xml 예
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- A JMS connection factory for ActiveMQ -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"/>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SOON_QUEUE" />
</bean>
<bean id="defaultMessageDelegate" class="com.soon.delegate.DefaultMessageDelegate" />
<!-- A POJO that implements the JMS message listener -->
<!--
<bean id="messageListener" class="com.soon.listener.SimpleMessageListener" />
-->
<!--
MessageListenerAdapter의 메시지 처리 메소드의 이름은 기본적으로 'handleMessage' 이다
delegat에 전달 받을 객체로 변환해주기 위해서 Converter를 등록함
-->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="defaultMessageDelegate" />
<property name="defaultListenerMethod" value="handleMessage" />
</bean>
<!--
JMS 메시지를 전달 받고 MessageListener 에 전달시에 쓰레드로 구동하기 위한 Executer를 하나 만든다.
이것을 세팅하지 않은 경우 org.springframework.core.task.SimpleAsyncTaskExecutor 가 사용된다.
-->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener" />
<property name="concurrentConsumers" value="1" />
<property name="maxConcurrentConsumers" value="5" />
<property name="taskExecutor" ref="listenerThreadPoolTaskExecutor" />
</bean>
<bean id="listenerThreadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="30" />
<property name="queueCapacity" value="0" />
<property name="keepAliveSeconds" value="10" />
<property name="threadNamePrefix" value="SOON_QUE-" />
</bean>
</beans>
메시지 처리 Delegate 예
// 인터페이스
package com.soon.delegate;
import java.io.Serializable;
import java.util.Map;
import javax.jms.TextMessage;
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
구현
// 클래스
package com.soon.delegate;
import java.io.Serializable;
import java.util.Map;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
public class DefaultMessageDelegate implements MessageDelegate{
private static final Logger log = Logger.getLogger(DefaultMessageDelegate.class);
@Override
public void handleMessage(String message) {
// TODO Auto-generated method stub
log.info("1");
log.info("handleMessage : {}" + message);
}
@Override
public void handleMessage(Map message) {
// TODO Auto-generated method stub
log.info("2");
}
@Override
public void handleMessage(byte[] message) {
// TODO Auto-generated method stub
log.info("3");
}
@Override
public void handleMessage(Serializable message) {
// TODO Auto-generated method stub
log.info("4");
}
}
'공부 > Spring' 카테고리의 다른 글
[Boot] @WebMvcTest (0) | 2019.02.15 |
---|---|
[Boot] @SpringBootTest (0) | 2019.02.15 |
[Boot] 환경설정 관련 (0) | 2019.02.10 |
[JPA] 스프링 부트 + jPA 사용시 참고.... (0) | 2019.02.09 |
[Boot] DataSource 설정이 없이 실행하고 싶다. (0) | 2019.02.08 |