001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016 017package org.opengion.fukurou.queue; 018 019import javax.jms.Connection; 020import javax.jms.JMSException; 021import javax.jms.MessageProducer; 022import javax.jms.Queue; 023import javax.jms.QueueConnectionFactory; 024import javax.jms.QueueSession; 025import javax.jms.Session; 026import javax.jms.TextMessage; 027import javax.naming.Context; 028import javax.naming.InitialContext; 029import javax.naming.NamingException; 030 031import org.apache.activemq.ActiveMQConnection; 032import org.apache.activemq.ActiveMQConnectionFactory; 033// import org.opengion.hayabusa.common.HybsSystemException; 034 035// import com.sun.star.uno.RuntimeException; 036 037/** 038 * MQサーバへのメッセージキュー送信用クラス 039 * 040 * MQサーバへのメッセージキュー送信用のクラスです。 041 * Apache ActiveMQとAmazonMQへの送信が可能です。 042 * tomcatからの送信(JNDI利用)と、 043 * バッチ処理(urlを指定し接続)の2通りが可能です。 044 * 045 * ※Apache ActiveMQとAmazonMQの切り替えは、 046 * jmsServerの接続先URLを変更するのみで接続の変更が可能です。 047 * (proxy環境からAmazonMqへの接続は行えない場合があります) 048 * 049 * @og.group メッセージ連携 050 * 051 * @og.rev 5.10.14.0 (2019/08/01) 新規作成 052 * 053 * @version 5 054 * @author oota 055 * @since JDK7 056 * 057 */ 058public class QueueSend_MQ implements QueueSend { 059 private Connection connection; 060 private Session session; 061 private MessageProducer sender; 062 private Context ctx; 063 // バッチ用フィールド 064 private boolean batch; 065 private String mqUserId = ""; 066 private String mqPassword = ""; 067 068 /** 069 * 接続処理 070 * MQサーバに接続を行います。 071 * 072 * @param jmsServer jmsサーバ接続名(バッチの場合はurl) 073 */ 074 public void connect(final String jmsServer) { 075 try { 076 ctx = new InitialContext(); 077 // 1. Connectionの作成s 078// QueueConnectionFactory factory = null; 079 final QueueConnectionFactory factory; 080 if (batch) { 081 // バッチ処理の場合。URL指定で、ユーザIDとパスワードを指定して接続。 082 mqUserId = System.getProperty("mqUserId"); 083 mqPassword = System.getProperty("mqPassword"); 084 factory = new ActiveMQConnectionFactory(jmsServer); 085 connection = (ActiveMQConnection)factory.createConnection(mqUserId, mqPassword); 086 } else { 087 // tomcat接続の場合。JNDIを利用して接続。 088 factory = (QueueConnectionFactory) ctx.lookup("java:comp/env/" + jmsServer); 089 connection = (ActiveMQConnection)factory.createConnection(); 090 } 091 092 // 2. Connectioの開始 093 connection.start(); 094 095 } catch (final JMSException jmse) { 096 throwErrMsg("MQサーバーの接続に失敗しました。" + jmse.getMessage()); 097 } catch (final NamingException ne) { 098 throwErrMsg("名前解決に失敗しました。" + ne.getMessage()); 099 } 100 } 101 102 /** 103 * 接続処理 104 * MQサーバに接続します。 105 * connect(String jmsServer)と同じ処理になります。 106 * 107 * @og.rev 5.10.15.0 (2019/08/30) 引数追加対応 108 * 109 * @param jmsServer jmsサーバ情報 110 * @param sqsAccessKey アクセスキー(MQサーバでは未使用) 111 * @param sqsSecretKey シークレットキー(MQサーバでは未使用) 112 */ 113 @Override 114 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 115 // MQではsqsAccessKeyとsqsSecretKeyは利用しません。 116 connect(jmsServer); 117 } 118 119 /** 120 * エラーメッセージ送信。 121 * 122 * @og.rev 5.10.15.0 (2019/08/30) Hybs除外 123 * 124 * @param errMsg エラーメッセージ 125 */ 126 public void throwErrMsg(final String errMsg) { 127 throw new RuntimeException( errMsg ); 128// if (batch) { 129// // バッチ用エラー 130// throw new RuntimeException(errMsg); 131// } else { 132// // 画面用エラー 133// throw new HybsSystemException(errMsg); 134// } 135 } 136 137 /** 138 * メッセージ送信 139 * MQサーバにメッセージを送信します。 140 * 141 * @param queueInfo 送信キュー情報 142 */ 143 @Override 144 public void sendMessage(final QueueInfo queueInfo) { 145 try { 146 // 初期チェック 147 if (connection == null) { 148 throwErrMsg("MQサーバに接続されていません。"); 149 } 150 151 // 1. QueueSessionの作成 152 session = connection.createSession(queueInfo.isMqTransacted(), queueInfo.getMqAcknowledgeMode()); 153 if (session == null) { 154 throwErrMsg("キューセッションの生成に失敗しました。"); 155 } 156 157 // 2. Queueの作成 158// Queue queue = null; 159// queue = session.createQueue(queueInfo.getMqQueueName()); 160 final Queue queue = session.createQueue(queueInfo.getMqQueueName()); 161 sender = session.createProducer(queue); 162 163 // 3. テキストメッセージの作成 164 final TextMessage msg = session.createTextMessage(queueInfo.getMessage()); 165 166 // 4. 送信処理 167 sender.send(msg); 168 169 } catch (JMSException e) { 170 throwErrMsg("キューの送信処理に失敗しました。" + e.getMessage()); 171 } 172 } 173 174 /** 175 * クローズ処理 176 * MQサーバとの接続をクローズします。 177 * 178 * @og.rev 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block 179 */ 180 @Override 181 public void close() { 182 if (ctx != null) { 183 try { 184 ctx.close(); 185// } catch (Exception e) { // 8.0.0.0 (2021/07/31) 186 } catch( final Throwable th ) { 187 System.out.println("ctxのクローズに失敗しました。"); 188 } 189 } 190 // 1. sender,session,connectionのクローズ処理 191 if (sender != null) { 192 try { 193 sender.close(); 194// } catch (Exception e) { // 8.0.0.0 (2021/07/31) 195 } catch( final Throwable th ) { 196 System.out.println("senderのクローズに失敗しました。"); 197 } 198 } 199 if (session != null) { 200 try { 201 session.close(); 202// } catch (Exception e) { // 8.0.0.0 (2021/07/31) 203 } catch( final Throwable th ) { 204 System.out.println("sessionのクローズに失敗しました。"); 205 } 206 } 207 if (connection != null) { 208 try { 209 connection.close(); 210// } catch (Exception e) { // 8.0.0.0 (2021/07/31) 211 } catch( final Throwable th ) { 212 System.out.println("connectionのクローズに失敗しました。"); 213 } 214 } 215 } 216 217 /** 218 * バッチ処理判定フラグを設定します。 219 * バッチ処理の場合は引数で接続先情報を与えます。 220 * それ以外の場合(Tomcat)ではJNDIより情報を取得します。 221 * 222 * @param batchFlg バッチ処理判定フラグ 223 */ 224 @Override 225 public void setBatchFlg(final Boolean batchFlg) { 226 batch = batchFlg; 227 } 228 229 /** 230 * テスト用メソッド 231 * テスト実行用です。 232 * 233 * @param args 引数 234 */ 235 public static void main(final String[] args) { 236 System.out.println("main start"); 237 // 送信情報の設定 238 final String url = "tcp://localhost:61616"; 239 final String queueName = "test01"; 240 final String msg = "送信メッセージ"; 241 242 final QueueInfo queueInfo = new QueueInfo(); 243 queueInfo.setMqQueueName(queueName); 244 queueInfo.setMqTransacted(false); 245 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 246 queueInfo.setMessage(msg); 247 248 final QueueSend queueSend = new QueueSend_MQ(); 249 queueSend.setBatchFlg(true); 250 251 try { 252 queueSend.connect(url,null,null); 253// queueSend.connect(url); 254 queueSend.sendMessage(queueInfo); 255// } catch (final Exception e) { // 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block 256// System.out.println(e.getMessage()); 257 } catch( final Throwable th ) { 258 System.out.println(th.getMessage()); 259 } finally { 260 queueSend.close(); 261 } 262 263 System.out.println("main end"); 264 } 265}