본문 바로가기

공부/기타

ActiveMQ 정리

ActiveMQ 정리

ActiveMQ 설치

다운로드 URL : http://activemq.apache.org/activemq-5152-release.html

설치 - 5.10.0 이하는 java 1.6 가능 - 그 이상은 java 1.7 가능

  1. .zip 다운로드 하여 압축품
  2. 경로 ~/bin/
  3. 실행 및 종료 ./activemq start : 백그라운드 실행 ./activemq status : 상태 확인 ./activemq stop : 종료 ./activemq console : 포어그라운드 실행

콘솔 정보(기본) port : 8161 is/pw : admin/admin

JMS(Java Message Service) 사용

activeMQ

  • Apache ActiveMQ 는 대중적이고 강력한 오픈 소스 메시징 그리고 통함 패턴 서버 입니다.
  • Apache ActiveMQ는 빠르며, 다양한 언어간의 클라이언트 및 프로토콜을 지원하고, 사용하기 쉬운 엔터프라이즈 통합 패턴 및 많은 고급 기능을 제공하면서 JMS 1.1 및 J2EE 1.4를 완벽하게 지원합니다.
  • MOM(메시지 지향 미들웨어)입니다.
  • ActiveMQ는 JMS를 지원하는 클라이언트를 포함하는 브로커, 자바 뿐만 아니라 다양한 언어를이용하는시스템간의 통신을할수있게해줍니다. 또한 클러스터링기능 및 DB 그리고 FileSystem을 통해 각 시스템간의 일관성 및 지속성을 유지 시켜줍니다.
  • 간단히 정의하면 클라이언트 간 메시지를 송수신 할 수 있는 오픈 소스 Broker(JMS 서버)입니다.

JMS

  • JMS 는 자바 기반의 MOM(메시지 지향 미들웨어) API 이며 둘 이상의 클라이언트 간의 메시지를 보냅니다.
  • JMS 는 자바 플랫폼, 엔터프라이즈 에디션(EE) 기반이며, 메시지 생성, 송수신, 읽기를 합니다. 또한 비동기적이며 신뢰할 만하고 느슨하게 연결된 서로 다른 분산 어플리케이션 컴포넌트 간의 통신을 허용합니다.
  • JMS의 핵심 개념은 Message Broker 와 Destination 입니다.

Message Broker : 목적지에 안전하게 메시지를 건네주는 중개자 역할 Destination : 목적지에 배달될 2가지 메시지 모델 QUEUE, TOPIC

QUEUE : Point to Point (Consumer 는 메시지를 받기 위해 경쟁합니다.) TOPIC : Publish to Subscribe

ActiveMQ 메시지 처리 구조

Producer(생산자) > Broker > Consumer(소비자) : Producer가 Message를 Queue/Topic에 넣어두면 Consumer가 Message를 가져와 처리하는 방식

사용

Maven 추가

<!-- ActiveMQ  -->
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.2</version>
</dependency>

발송 예

// MsgSender - Producer 사용
package com.soon.msg;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgSender {

    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    // default broker URL is : tcp://localhost:61616

    private static String subject = "SOON_QUEUE";

    public static void main(String[] args) throws JMSException {

        // Getting JMS connection from the server and starting it
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // /Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home
        // Creating a non transactional seesion to send/receive JMS message
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Destination represents here our queue 'SOON_QUEUE' on the JMS server
        // The queue will be created automatically on the server
        Destination destination = session.createQueue(subject);

        // MessageProducer is used for sending messages to the queue
        MessageProducer producer = session.createProducer(destination);

        // We will send a small text message saying 'Hello World'
        TextMessage message = session.createTextMessage("Hello World SOON");

        producer.send(message);

        System.out.println("SOON QUEUE : " + message.getText());

        connection.close();
      }
}

리시버 예

    // MsgReceiver - Consumer 사용
    package com.soon.msg;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class MsgReceiver {
        private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

        private static String subject = "SOON_QUEUE";

        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

            Connection connection = connectionFactory.createConnection();

            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createQueue(subject);

            MessageConsumer consumer = session.createConsumer(destination);

            Message message = consumer.receive();

            if(message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received msg : " + textMessage.getText());
            }
            connection.close();

        }
    }

MDP(Message Driven POJO)로 JMS를 구현

방법

  1. javax.jms.MessageListener를 구현하여 onMessage메소드를 만듬
  2. Spring은 MessageListenerAdapter 를 이용하여 보다 청아한?(깨끗한?) POJO를 만듬

구현구조 Queue/Topic > Message Listener Container > Message Listener Adapter (handleMessage) > POJO

설정

<!-- A JMS connection factory for ActiveMQ -->
<bean id="connectionFactory"
  class="org.apache.activemq.ActiveMQConnectionFactory"    
  p:brokerURL="tcp://localhost:61616"/>

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg value="SOON_QUEUE" />
</bean>

<bean id="defaultMessageDelegate" class="com.soon.delegate.DefaultMessageDelegate" />

<!--
MessageListenerAdapter의 메시지 처리 메소드의 이름은 기본적으로 'handleMessage' 이다
-->
<bean id="messageListener"
  class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  <property name="delegate" ref="defaultMessageDelegate" />
  <property name="defaultListenerMethod" value="handleMessage" />
</bean>

<bean id="jmsContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="destination" ref="destination"/>
  <property name="messageListener" ref="messageListener" />
</bean>

MDP : Message-Driven POJO

참고 : http://activemq.apache.org/spring-support.html http://yunsunghan.tistory.com/126

JMSConfig.xml 예

  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:jms="http://www.springframework.org/schema/jms"
         xmlns:p="http://www.springframework.org/schema/p"
         xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- A JMS connection factory for ActiveMQ -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"/>

    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="SOON_QUEUE" />
    </bean>

    <bean id="defaultMessageDelegate" class="com.soon.delegate.DefaultMessageDelegate" />

    <!-- A POJO that implements the JMS message listener -->
    <!--
    <bean id="messageListener" class="com.soon.listener.SimpleMessageListener" />
    -->

    <!--
        MessageListenerAdapter의 메시지 처리 메소드의 이름은 기본적으로 'handleMessage' 이다
        delegat에 전달 받을 객체로 변환해주기 위해서 Converter를 등록함
     -->
    <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="defaultMessageDelegate" />
        <property name="defaultListenerMethod" value="handleMessage" />
        </bean>

    <!--
        JMS 메시지를 전달 받고 MessageListener 에 전달시에 쓰레드로 구동하기 위한 Executer를 하나 만든다.
        이것을 세팅하지 않은 경우 org.springframework.core.task.SimpleAsyncTaskExecutor 가 사용된다.  
     -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="destination"/>
            <property name="messageListener" ref="messageListener" />
            <property name="concurrentConsumers" value="1" />
            <property name="maxConcurrentConsumers" value="5" />
            <property name="taskExecutor" ref="listenerThreadPoolTaskExecutor" />
        </bean>

    <bean id="listenerThreadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="30" />
        <property name="queueCapacity" value="0" />
        <property name="keepAliveSeconds" value="10" />
        <property name="threadNamePrefix" value="SOON_QUE-" />
    </bean>


  </beans>

메시지 처리 Delegate 예

// 인터페이스    
package com.soon.delegate;

import java.io.Serializable;
import java.util.Map;

import javax.jms.TextMessage;

public interface MessageDelegate {
    void handleMessage(String message);

    void handleMessage(Map message);

    void handleMessage(byte[] message);

    void handleMessage(Serializable message);
}

구현

// 클래스
package com.soon.delegate;

import java.io.Serializable;
import java.util.Map;

import javax.jms.TextMessage;

import org.apache.log4j.Logger;

public class DefaultMessageDelegate implements MessageDelegate{

    private static final Logger log = Logger.getLogger(DefaultMessageDelegate.class);

    @Override
    public void handleMessage(String message) {
        // TODO Auto-generated method stub
        log.info("1");

        log.info("handleMessage : {}" + message);
    }

    @Override
    public void handleMessage(Map message) {
        // TODO Auto-generated method stub
        log.info("2");
    }

    @Override
    public void handleMessage(byte[] message) {
        // TODO Auto-generated method stub
        log.info("3");
    }

    @Override
    public void handleMessage(Serializable message) {
        // TODO Auto-generated method stub
        log.info("4");
    }

}

'공부 > 기타' 카테고리의 다른 글

mysql 타임존 설정  (0) 2019.04.15
RabbitMQ 학습  (0) 2019.04.09
MAC에서 RabbitMQ 설치  (0) 2019.04.03
APACHE 2.4 설치하기  (0) 2019.02.08
메일(SMTP) 프로토콜 사용??  (0) 2019.02.08