본문 바로가기

공부/Spring

[AvtiveMQ] 정리

ActiveMQ 정리

ActiveMQ 설치

다운로드 URL : http://activemq.apache.org/activemq-5152-release.html

설치

  • 5.10.0 이하는 java 1.6 가능
  • 그 이상은 java 1.7 가능
  1. .zip 다운로드 하여 압축품
  2. 경로 ~/bin/
  3. 실행 및 종료 ./activemq start : 백그라운드 실행 ./activemq status : 상태 확인 ./activemq stop : 종료 ./activemq console : 포어그라운드 실행

콘솔 정보(기본) port : 8161 is/pw : admin/admin

JMS(Java Message Service) 사용

###activeMQ

  • Apache ActiveMQ 는 대중적이고 강력한 오픈 소스 메시징 그리고 통함 패턴 서버 입니다.
  • Apache ActiveMQ는 빠르며, 다양한 언어간의 클라이언트 및 프로토콜을 지원하고, 사용하기 쉬운 엔터프라이즈 통합 패턴 및 많은 고급 기능을 제공하면서 JMS 1.1 및 J2EE 1.4를 완벽하게 지원합니다.
  • MOM(메시지 지향 미들웨어)입니다.
  • ActiveMQ는 JMS를 지원하는 클라이언트를 포함하는 브로커, 자바 뿐만 아니라 다양한 언어를이용하는시스템간의 통신을할수있게해줍니다. 또한 클러스터링기능 및 DB 그리고 FileSystem을 통해 각 시스템간의 일관성 및 지속성을 유지 시켜줍니다.
  • 간단히 정의하면 클라이언트 간 메시지를 송수신 할 수 있는 오픈 소스 Broker(JMS 서버)입니다.

###JMS

  • JMS 는 자바 기반의 MOM(메시지 지향 미들웨어) API 이며 둘 이상의 클라이언트 간의 메시지를 보냅니다.
  • JMS 는 자바 플랫폼, 엔터프라이즈 에디션(EE) 기반이며, 메시지 생성, 송수신, 읽기를 합니다. 또한 비동기적이며 신뢰할 만하고 느슨하게 연결된 서로 다른 분산 어플리케이션 컴포넌트 간의 통신을 허용합니다.
  • JMS의 핵심 개념은 Message Broker 와 Destination 입니다.

Message Broker : 목적지에 안전하게 메시지를 건네주는 중개자 역할 Destination : 목적지에 배달될 2가지 메시지 모델 QUEUE, TOPIC

QUEUE : Point to Point (Consumer 는 메시지를 받기 위해 경쟁합니다.) TOPIC : Publish to Subscribe

ActiveMQ 메시지 처리 구조

Producer(생산자) > Broker > Consumer(소비자) : Producer가 Message를 Queue/Topic에 넣어두면 Consumer가 Message를 가져와 처리하는 방식

사용

Maven 추가

<!-- ActiveMQ  -->
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.2</version>
</dependency>

발송 예

// MsgSender - Producer 사용
package com.soon.msg;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgSender {

	private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	// default broker URL is : tcp://localhost:61616

	private static String subject = "SOON_QUEUE";

	public static void main(String[] args) throws JMSException {

		// Getting JMS connection from the server and starting it
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
		Connection connection = connectionFactory.createConnection();
		connection.start();
		// /Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home
		// Creating a non transactional seesion to send/receive JMS message
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		// Destination represents here our queue 'SOON_QUEUE' on the JMS server
		// The queue will be created automatically on the server
		Destination destination = session.createQueue(subject);

		// MessageProducer is used for sending messages to the queue
		MessageProducer producer = session.createProducer(destination);

		// We will send a small text message saying 'Hello World'
		TextMessage message = session.createTextMessage("Hello World SOON");

		producer.send(message);

		System.out.println("SOON QUEUE : " + message.getText());

		connection.close();
	  }
}

리시버 예

    // MsgReceiver - Consumer 사용
    package com.soon.msg;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class MsgReceiver {
    	private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    	private static String subject = "SOON_QUEUE";

    	public static void main(String[] args) throws JMSException {
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

    		Connection connection = connectionFactory.createConnection();

    		connection.start();

    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    		Destination destination = session.createQueue(subject);

    		MessageConsumer consumer = session.createConsumer(destination);

    		Message message = consumer.receive();

    		if(message instanceof TextMessage) {
    			TextMessage textMessage = (TextMessage) message;
    			System.out.println("Received msg : " + textMessage.getText());
    		}
    		connection.close();

    	}
    }

MDP(Message Driven POJO)로 JMS를 구현

방법

  1. javax.jms.MessageListener를 구현하여 onMessage메소드를 만듬
  2. Spring은 MessageListenerAdapter 를 이용하여 보다 청아한?(깨끗한?) POJO를 만듬

구현구조 Queue/Topic > Message Listener Container > Message Listener Adapter (handleMessage) > POJO

설정

<!-- A JMS connection factory for ActiveMQ -->
<bean id="connectionFactory"
  class="org.apache.activemq.ActiveMQConnectionFactory"    
  p:brokerURL="tcp://localhost:61616"/>

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg value="SOON_QUEUE" />
</bean>

<bean id="defaultMessageDelegate" class="com.soon.delegate.DefaultMessageDelegate" />

<!--
MessageListenerAdapter의 메시지 처리 메소드의 이름은 기본적으로 'handleMessage' 이다
-->
<bean id="messageListener"
  class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  <property name="delegate" ref="defaultMessageDelegate" />
  <property name="defaultListenerMethod" value="handleMessage" />
</bean>

<bean id="jmsContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="destination" ref="destination"/>
  <property name="messageListener" ref="messageListener" />
</bean>

MDP : Message-Driven POJO

참고 : http://activemq.apache.org/spring-support.html http://yunsunghan.tistory.com/126

JMSConfig.xml 예

  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:jms="http://www.springframework.org/schema/jms"
         xmlns:p="http://www.springframework.org/schema/p"
         xsi:schemaLocation="
         	http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  		http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

  	<!-- A JMS connection factory for ActiveMQ -->
  	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"/>

  	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
  		<constructor-arg value="SOON_QUEUE" />
  	</bean>

  	<bean id="defaultMessageDelegate" class="com.soon.delegate.DefaultMessageDelegate" />

  	<!-- A POJO that implements the JMS message listener -->
  	<!--
  	<bean id="messageListener" class="com.soon.listener.SimpleMessageListener" />
  	-->

  	<!--
  		MessageListenerAdapter의 메시지 처리 메소드의 이름은 기본적으로 'handleMessage' 이다
  		delegat에 전달 받을 객체로 변환해주기 위해서 Converter를 등록함
  	 -->
  	<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  		<property name="delegate" ref="defaultMessageDelegate" />
  		<property name="defaultListenerMethod" value="handleMessage" />
    	</bean>

  	<!--
  		JMS	메시지를 전달 받고 MessageListener 에 전달시에 쓰레드로 구동하기 위한 Executer를 하나 만든다.
  		이것을 세팅하지 않은 경우 org.springframework.core.task.SimpleAsyncTaskExecutor 가 사용된다.  
  	 -->
  	<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="connectionFactory" ref="connectionFactory"/>
    		<property name="destination" ref="destination"/>
    		<property name="messageListener" ref="messageListener" />
    		<property name="concurrentConsumers" value="1" />
    		<property name="maxConcurrentConsumers" value="5" />
    		<property name="taskExecutor" ref="listenerThreadPoolTaskExecutor" />
    	</bean>

  	<bean id="listenerThreadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  		<property name="corePoolSize" value="10" />
  		<property name="maxPoolSize" value="30" />
  		<property name="queueCapacity" value="0" />
  		<property name="keepAliveSeconds" value="10" />
  		<property name="threadNamePrefix" value="SOON_QUE-" />
  	</bean>


  </beans>

메시지 처리 Delegate 예

// 인터페이스    
package com.soon.delegate;

import java.io.Serializable;
import java.util.Map;

import javax.jms.TextMessage;

public interface MessageDelegate {
	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}

구현

// 클래스
package com.soon.delegate;

import java.io.Serializable;
import java.util.Map;

import javax.jms.TextMessage;

import org.apache.log4j.Logger;

public class DefaultMessageDelegate implements MessageDelegate{

	private static final Logger log = Logger.getLogger(DefaultMessageDelegate.class);

	@Override
	public void handleMessage(String message) {
		// TODO Auto-generated method stub
		log.info("1");

		log.info("handleMessage : {}" + message);
	}

	@Override
	public void handleMessage(Map message) {
		// TODO Auto-generated method stub
		log.info("2");
	}

	@Override
	public void handleMessage(byte[] message) {
		// TODO Auto-generated method stub
		log.info("3");
	}

	@Override
	public void handleMessage(Serializable message) {
		// TODO Auto-generated method stub
		log.info("4");
	}

}

'공부 > Spring' 카테고리의 다른 글

[Boot] @WebMvcTest  (0) 2019.02.15
[Boot] @SpringBootTest  (0) 2019.02.15
[Boot] 환경설정 관련  (0) 2019.02.10
[JPA] 스프링 부트 + jPA 사용시 참고....  (0) 2019.02.09
[Boot] DataSource 설정이 없이 실행하고 싶다.  (0) 2019.02.08