ApolloをJMSで使ってみる
ActiveMQのApolloがよさげ。
http://activemq.apache.org/apollo/
stomp推奨なのかな。
http://stomp.github.com/
stompで使ってみたかったんだけど・・・
Javaのクライアントの貧弱っぷりに泣けた。
とりあえずってことでJMSで使ってみることにする。
Apolloのインストール
基本的にはダウンロードして展開するだけ。
cd /usr/local/src/ # download先は適宜。 wget http://ftp.riken.jp/net/apache/activemq/activemq-apollo/1.1/apache-apollo-1.1-unix-distro.tar.gz tar xvzf apache-apollo-1.1-unix-distro.tar.gz -C /usr/local/ ln -s /usr/local/apache-apollo-1.1 /usr/local/apollo
Javaの準備
Javaがインストールされてある必要がある。
$JAVA_HOMEを指定するか、個別にパス通すか。
ちなみに$JAVA_HOME/bin/javaだけでなく、$JAVA_HOME/bin/keytoolも必要。
brokerの作成
場所は任意。
cd /var/lib/ /usr/local/apollo/bin/apollo create mybroker # broker名は任意。ここでkeytool使います。
起動スクリプトを準備
broker作ったらメッセージに出るんですけど、
一緒に生成されてるのでリンクはるだけ。
ln -s "/var/lib/mybroker/bin/apollo-broker-service" /etc/init.d/
config
そのままでもいいんだけど、デフォルトだとWebUIが127.0.0.1にバインドされて
リモートだとみれないので変更する
vi /var/lib/mybroker/etc/apllo.xml
下記を任意に変更。
<web_admin bind="http://127.0.0.1:61680"/>
起動
/etc/init.d/apollo-broker-service start
Javaからキューイング
package jp.ne.hatena.adorechic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.HashMap; import java.util.Map; public class Producer { private QueueConnectionFactory factory; public Producer( { FACTORY = new ActiveMQConnectionFactory("admin","password","tcp://192.168.0.1:61613"); } public void queue(Map<String,String> param, String queueName) { QueueConnection connection = null; QueueSession session = null; QueueSender sender = null; try { QueueConnectionFactory factory = new ActiveMQConnectionFactory("admin","password","tcp://192.168.0.1:61613"); connection = FACTORY.createQueueConnection(); session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); sender = session.createSender(queue); connection.start(); MapMessage msg = session.createMapMessage(); for(Map.Entry<String,String> entry: param.entrySet()) { msg.setString(entry.getKey(), entry.getValue()); } sender.send(msg); } catch (JMSException e) { throw new RuntimeException("queue:"+queueName+"->"+param, e); } finally { try { if (sender != null) sender.close(); if (session != null) session.close(); if (connection != null) connection.close(); } catch (JMSException e) { throw new RuntimeException("close failed queue:"+queueName+"->"+param, e); } } } public static void main(String[] args) { Map<String,String> map = new HashMap<String,String>(); map.put("testKey", "testVal"); new Producer().queue(map, "TestQueue"); } }
依存関係も一応。バージョンは適宜。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.4</version> </dependency>
Javaでレシーブ
SpringJMSを使うと手っ取り早い。
Consumer
package jp.ne.hatena.adorechic; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; public class Consumer implements MessageListener{ public void onMessage(Message message) { MapMessage map = (MapMessage) message; try { String value = map.getString("testKey"); System.out.println("receive->" + value); } catch (JMSException e) { e.printStackTrace(); } } }
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.0.1:61613"/> <property name="userName" value="admin" /> <property name="password" value="password" /> </bean> <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="TestQueue"/> </bean> <bean id="testQueueListener" class="jp.ne.hatena.adorechic.Consumer"/> <bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="receiveTimeout" value="10000" /> <property name="concurrentConsumers" value="1"/> <property name="maxConcurrentConsumers" value="5"/> <property name="connectionFactory" ref="connectionFactory"/> <property name="messageListener" ref="testQueueListener"/> <property name="destination" ref="testQueue" /> </bean> </beans>
Launcher
package jp.ne.hatena.adorechic; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.listener.DefaultMessageListenerContainer; public class Launcher { public static void main(String[] args) { final ApplicationContext context = new ClassPathXmlApplicationContext("context.xml"); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run(){ System.out.println("shutdown start"); DefaultMessageListenerContainer container = (DefaultMessageListenerContainer)context.getBean("container"); container.shutdown(); } }); } }
依存関係
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>3.0.5.RELEASE</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.4</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.4</version> </dependency>