ApolloをJMSで使ってみる

ActiveMQApolloがよさげ。
http://activemq.apache.org/apollo/

stomp推奨なのかな。
http://stomp.github.com/

stompで使ってみたかったんだけど・・・
Javaのクライアントの貧弱っぷりに泣けた。

とりあえずってことでJMSで使ってみることにする。

Apolloのインストール

基本的にはダウンロードして展開するだけ。

cd /usr/local/src/
# download先は適宜。
wget http://ftp.riken.jp/net/apache/activemq/activemq-apollo/1.1/apache-apollo-1.1-unix-distro.tar.gz
tar xvzf apache-apollo-1.1-unix-distro.tar.gz -C /usr/local/
ln -s /usr/local/apache-apollo-1.1 /usr/local/apollo

Javaの準備

Javaがインストールされてある必要がある。
$JAVA_HOMEを指定するか、個別にパス通すか。
ちなみに$JAVA_HOME/bin/javaだけでなく、$JAVA_HOME/bin/keytoolも必要。

brokerの作成

場所は任意。

cd /var/lib/
/usr/local/apollo/bin/apollo create mybroker  # broker名は任意。ここでkeytool使います。

起動スクリプトを準備

broker作ったらメッセージに出るんですけど、
一緒に生成されてるのでリンクはるだけ。

ln -s "/var/lib/mybroker/bin/apollo-broker-service" /etc/init.d/

config

そのままでもいいんだけど、デフォルトだとWebUIが127.0.0.1にバインドされて
リモートだとみれないので変更する

vi /var/lib/mybroker/etc/apllo.xml

下記を任意に変更。

<web_admin bind="http://127.0.0.1:61680"/>

起動

/etc/init.d/apollo-broker-service start

Javaからキューイング

package jp.ne.hatena.adorechic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.HashMap;
import java.util.Map;

public class Producer {
	private QueueConnectionFactory factory;

	public Producer( {
		FACTORY = new ActiveMQConnectionFactory("admin","password","tcp://192.168.0.1:61613");
	}


	public void queue(Map<String,String> param, String queueName) {

		QueueConnection connection = null;
		QueueSession session = null;
		QueueSender sender = null;

		try {
			QueueConnectionFactory factory 
				= new ActiveMQConnectionFactory("admin","password","tcp://192.168.0.1:61613");
			connection = FACTORY.createQueueConnection();

			session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
			Queue queue = session.createQueue(queueName);
			sender = session.createSender(queue);

			connection.start();

			MapMessage msg = session.createMapMessage();
			for(Map.Entry<String,String> entry: param.entrySet()) {
				msg.setString(entry.getKey(), entry.getValue());
			}

			sender.send(msg);

		} catch (JMSException e) {
			throw new RuntimeException("queue:"+queueName+"->"+param, e);
		} finally {
			try {
				if (sender != null) sender.close();
				if (session != null) session.close();
				if (connection != null) connection.close();
			} catch (JMSException e) {
				throw new RuntimeException("close failed queue:"+queueName+"->"+param, e);
			}
		}
	}

	public static void main(String[] args) {
		Map<String,String> map = new HashMap<String,String>();
		map.put("testKey", "testVal");
		new Producer().queue(map, "TestQueue");
	}
}

依存関係も一応。バージョンは適宜。

		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.5.0</version>
		</dependency>

		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.0.1</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.4</version>
		</dependency>

Javaでレシーブ

SpringJMSを使うと手っ取り早い。

Consumer
package jp.ne.hatena.adorechic;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

public class Consumer implements MessageListener{

	public void onMessage(Message message) {
		MapMessage map = (MapMessage) message;
		try {
			String value = map.getString("testKey");
			System.out.println("receive->" + value);
		} catch (JMSException e) {
			e.printStackTrace(); 
		}
	}
}
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xmlns:amq="http://activemq.apache.org/schema/core"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://activemq.apache.org/schema/core
                           http://activemq.apache.org/schema/core/activemq-core.xsd">


	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://192.168.0.1:61613"/>
		<property name="userName" value="admin" />
		<property name="password" value="password" />
	</bean>


	<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="TestQueue"/>
	</bean>
	<bean id="testQueueListener" class="jp.ne.hatena.adorechic.Consumer"/>

	<bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="receiveTimeout" value="10000" />
		<property name="concurrentConsumers" value="1"/>
		<property name="maxConcurrentConsumers" value="5"/>
		<property name="connectionFactory" ref="connectionFactory"/>
		<property name="messageListener" ref="testQueueListener"/>
		<property name="destination" ref="testQueue" />
	</bean>

</beans>
Launcher
package jp.ne.hatena.adorechic;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class Launcher {
	public static void main(String[] args) {
		final ApplicationContext context = new ClassPathXmlApplicationContext("context.xml");
		Runtime.getRuntime().addShutdownHook(new Thread(){
				@Override
				public void run(){
					System.out.println("shutdown start");
					DefaultMessageListenerContainer container =
							(DefaultMessageListenerContainer)context.getBean("container");
					container.shutdown();
				}
		});
	}
}
依存関係
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>3.0.5.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.apache.xbean</groupId>
			<artifactId>xbean-spring</artifactId>
			<version>3.4</version>
		</dependency>

		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.5.0</version>
		</dependency>

		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.0.1</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.4</version>
		</dependency>