Technical Works

ASC Technical support site

*

Active MQ紹介

   

1.Message Queueとは

まず、メッセージとはアプリケーションにとって意味のあるバイト文字列を意味します。メッセージはアプリケーション間または同じアプリケーション内の異なる部分の間で情報を転送するために使用されます。キューはメッセージを保存するために使用されるデータ構造体です。それで、メッセージキューを利用することで、送信側のソフトは相手の都合を考えずに、キューにデータを格納するだけで、確実にメッセージを送り届けることができます。メッセージキューはアプリケーション間の相互運用性が向上し、システム全体の耐障害性も向上するし、「待ち時間」の節約や、バッファリングを行うための資源の節約により、パフォーマンスが向上する場合もあります。

主要なメッセージングモデルにポイント・ツー・ポイント(PTP)およびパブリッシュ/サブスクライブ(Pub/Sub)があります。PTPはメッセージキューのコンセプトに基づいており、1つのあて先に対して1つの送信元からメッセージが送信されます。Pub/Subは階層化されたノードに定義された複数の購読者に1つの発信者からメッセージを配信します。

①Point-To-Pointメッセージ

送信側はキューにメッセージを送り、受信側はキューからメッセージを取り出します。受信側はメッセージ取得後、成功した旨をキュー(セッション)に通知を送ります。

point-to-point

②Publish/Subscribeメッセージ

購読者が発信者からメッセージを受け取るには、購読者が購読申し込み(サブスクライブ)する必要があります。一度購読申し込みを行うと、購読者はトピックからメッセージを取りに行く必要がなくなります。発信者からトピックにメッセージが送られるとトピックは購読申し込みを行っている購読者すべてにメッセージを配信します。

pub_sub

2.Active MQの設定

①ダウンロード

以下のApacheサイトから最新モジュールをダウンロードします。現時点では最新版は「apache-activemq-5.9.1」です。

amq-download

ダウンロードサイト:http://activemq.apache.org/download-archives.html

②設定

上記のサイトからダウンロードしたバイナリファイル(例:apache-activemq-5.9.1-bin.zip)を展開します。
展開したディレクトリから「bin」ディレクトリ配下に移動します。
コマンド例:C:\Dev\apache-activemq-5.9.1\bin
次に、Javaホームディレクトリが設定されている場合は不要ですが、設定されてない場合は、コマンドラインで、「JAVA_HOME」環境変数を設定します。
コマンド例:set JAVA_HOME=C:\Dev\Java\jdk1.7.0_72
最後に、起動スクリプト(Windowsの場合例:activemq.bat)を実行します。
コマンド例:activemq.bat

amq-startup

正しく起動されたか確認するために、以下のサイトにアクセスしてみます。アクセスする際、ユーザ認証画面が表示されます。ユーザ名とパスワードは初期設定で、「admin/admin」となっています。

アクセスサイト:http://localhost:8161/admin

amq-login

3.サンプルソース

以下のソースはメッセージキューの基本機能を示す簡単なサンプルです。基本的にメッセージを生成し、QueueもしくとTopicにメッセージ送信し、もう一方でメッセージを受信するイメージとなっています。以下のサンプルでは、テキストメッセージ(Hello World)を送信し、メッセージを受信後に画面に表示するもととなっております。Queueの場合は、メッセージを送信→メッセージ受信の順になりますが、Topicの場合は、まず、メッセージ受信起動→メッセージ送信→メッセージ受信の順になります。
Javaソースのコンパイル時、「javax.jms」関連パッケージがないと言うエラーメッセージが出力される場合は、J2EEに含まれている「jms.jar」ファイルをクラスパスに追加してください。

compile_run

①Point-To-Pointの例

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestActiveMQ {

    public static void main(String[] args) throws Exception {

        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        Thread.sleep(1000);
    }

    public static void thread(Runnable runnable, boolean daemon) {

        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }

    public static class HelloWorldProducer implements Runnable {

        public void run() {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

                Connection connection = connectionFactory.createConnection();
                connection.start();

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                Destination destination = session.createQueue("TEST.QUEUE");

                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);

                System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);

                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    public static class HelloWorldConsumer implements Runnable {

        public void run() {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

                Connection connection = connectionFactory.createConnection();
                connection.start();

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                Destination destination = session.createQueue("TEST.QUEUE");

                MessageConsumer consumer = session.createConsumer(destination);

                Message message = consumer.receive(1000);

                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }

                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

}

 

②Publish/Subscribeの例

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestActiveMQ2 {

    public static void main(String[] args) throws Exception {

        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(2000);
    }

    public static void thread(Runnable runnable, boolean daemon) {

        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }

    public static class HelloWorldProducer implements Runnable {

        public void run() {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

                Connection connection = connectionFactory.createConnection();
                connection.start();

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                Topic topic = session.createTopic("TEST.TOPIC");

                MessageProducer producer = session.createProducer(topic);

                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);

                System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);

                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    public static class HelloWorldConsumer implements Runnable {

        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;

        public void run() {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

                connection = connectionFactory.createConnection();
                connection.start();

                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                Topic topic = session.createTopic("TEST.TOPIC");

                consumer = session.createConsumer(topic);

                MessageListener listner = new MessageListener() {

                    public void onMessage(Message message) {
                        try {
                            if (message instanceof TextMessage) {
                                TextMessage textMessage = (TextMessage) message;
                                String text = textMessage.getText();
                                System.out.println("Received: " + text);
                            } else {
                                System.out.println("Received: " + message);
                            }
                        } catch (JMSException e) {
                            System.out.println("Caught:" + e);
                            e.printStackTrace();
                        }
                    }
                };
                consumer.setMessageListener(listner);

            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

}

 

The following two tabs change content below.

最新記事 by 李 (全て見る)

 - Java, JBoss

Loading Facebook Comments ...

Message

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

CAPTCHA


*

  関連記事

eclipse tips

今回は開発に役に立つ便利なeclipseのショートカットキーを紹介したいと思いま …

Infinispanの紹介

はじめに Infinispanはオープンソースデータグリッドプラットフォームで、 …

Jbossを利用したWebアプリケーションローカル環境構築手順(1/3)

Webアプリケーションの、ちょっとした動作確認や 自由に利用できる自分専用のデー …

Drools(Java Rules Engine)の紹介

ルール・エンジンとは ルール・エンジンとは、ビジネス上のルールといった「分岐処理 …

Infinispanの応用

今回はInfinispanを利用した応用編となります。 今回紹介する機能は以下の …