001package org.opengion.fukurou.queue; 002 003import java.util.ArrayList; 004import java.util.List; 005 006import javax.jms.JMSException; 007// import javax.jms.Message; 008import javax.jms.MessageListener; 009import javax.jms.Queue; 010import javax.jms.QueueConnection; 011import javax.jms.QueueConnectionFactory; 012import javax.jms.QueueReceiver; 013import javax.jms.QueueSession; 014import javax.jms.TextMessage; 015import javax.naming.Context; 016import javax.naming.InitialContext; 017 018import org.apache.activemq.ActiveMQConnectionFactory; 019 020/** 021 * MQメッセージ受信用クラス。 022 * 023 * @og.group メッセージ連携 024 * 025 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 026 * 027 * @version 5 028 * @author oota 029 * @since JDK7 030 */ 031public class QueueReceive_MQ implements QueueReceive{ 032 033 private QueueConnection connection ; 034 private QueueSession session ; 035 private QueueReceiver receiver ; 036// List<QueueReceiver> listReceiver ; 037 private List<QueueReceiver> listReceiver ; // 7.2.9.4 (2020/11/20) 038 private boolean batch ; 039 040 /** 041 * 接続処理 042 * メッセージキューサーバに接続します。 043 * 044 * @param jmsServer jsmサーバ 045 * @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません) 046 * @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません) 047 */ 048 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 049 connect(jmsServer); 050 } 051 052 /** 053 * 接続処理 054 * jmsServerに接続します。 055 * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。 056 * SQSの場合は最大受信件数の10件の処理を行います。 057 * 058 * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL 059 */ 060 private void connect(final String jmsServer) { 061 try { 062 if(batch) { 063 // バッチ用 064 final String mqUserId = System.getProperty("mqUserId"); 065 final String mqPassword = System.getProperty("mqPassword"); 066 final QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer); 067 connection = factory.createQueueConnection(mqUserId, mqPassword); 068 }else { 069 // jndi接続用 070 final Context ctx = new InitialContext(); 071 final QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer); 072 connection = factory.createQueueConnection(); 073 } 074 075 connection.start(); 076 077 // Receiveの作成 078 session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); 079 080 // 初期化 081 listReceiver = new ArrayList<QueueReceiver>(); 082// }catch(Exception e) { // 8.0.0.0 (2021/07/31) 083// throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage()); 084 } catch( final Throwable th ) { 085 throw new RuntimeException("MQサーバの接続に失敗しました。:",th ); 086 } 087 } 088 089 /** 090 * 受信処理 091 * メッセージキューの受信の処理を行います。 092 * 093 * @param queueName キューの名前 094 * @return キュー情報格納クラス 095 */ 096 @Override 097 public QueueInfo receive(final String queueName) { 098 QueueInfo queueInfo = null; 099 100 try { 101 final Queue queue = session.createQueue(queueName); 102 receiver = session.createReceiver(queue); 103 104 final TextMessage msg = (TextMessage)receiver.receive(1000); 105 106 if(msg != null) { 107 // メッセージ受信の確認応答 108 msg.acknowledge(); 109 110 // メッセージの設定 111 queueInfo = new QueueInfo(); 112 queueInfo.setMessage(msg.getText()); 113 } 114// }catch(Exception e) { // 8.0.0.0 (2021/07/31) 115// throw new RuntimeException(e.getMessage()); 116 } catch( final Throwable th ) { 117 throw new RuntimeException( th ); 118 }finally { 119 try { 120 receiver.close(); 121// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 122 } catch( final Throwable th ) { 123 System.out.println("receiverのクローズに失敗しました。"); 124 } 125 } 126 127 return queueInfo; 128 } 129 130 /** 131 * リスナーの起動 132 * 指定したキュー名に対して、 133 * MessageListenerのリスナーを設定します。 134 * 135 * @param queueName キュー名 136 * @param listener MessageListerを実装したクラス 137 */ 138 @Override 139 public void setListener(final String queueName, final MessageListener listener) { 140 QueueReceiver receiver = null; 141 try { 142 final Queue queue = session.createQueue(queueName); 143 receiver = session.createReceiver(queue); 144 receiver.setMessageListener(listener); 145 146 // リスナーの起動 147 listReceiver.add(receiver); 148 }catch(JMSException ex) { 149// throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage()); 150 throw new RuntimeException("リスナーの起動に失敗しました。" , ex); // 8.0.0.0 (2021/07/31) original stack trace may be lost 151 } 152 } 153 154 /** 155 * クローズリスナー 156 * レシーバーをクローズすることで、 157 * リスナーの処理を終了します。 158 */ 159 public void closeListener() { 160 for(final QueueReceiver receiver: listReceiver) { 161 try { 162 receiver.close(); 163// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 164 } catch( final Throwable th ) { 165 System.out.println("receiverのクローズに失敗しました。"); 166 } 167 } 168 169 // 初期化 170 listReceiver = null; 171 listReceiver = new ArrayList<QueueReceiver>(); 172 } 173 174 /** 175 * クローズ処理 176 * クローズ処理を行います。 177 * 178 * @og.rev 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block 179 */ 180 @Override 181 public void close() { 182 if(receiver != null) { 183 try { 184 receiver.close(); 185// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 186 } catch( final Throwable th ) { 187 System.out.println("receiverのクローズに失敗しました。"); 188 } 189 } 190 if(session != null) { 191 try { 192 session.close(); 193// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 194 } catch( final Throwable th ) { 195 System.out.println("sessionのクローズに失敗しました。"); 196 } 197 } 198 if(connection != null) { 199 try { 200 connection.close(); 201// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 202 } catch( final Throwable th ) { 203 System.out.println("connectionのクローズに失敗しました。"); 204 } 205 } 206 } 207 208 /** 209 * バッチ処理判定フラグを設定します。 210 * 211 * @param batchFlg バッチ処理判定フラグ 212 */ 213 public void setBatchFlg(final Boolean batchFlg) { 214 batch = batchFlg; 215 } 216 217 /** 218 * 検証用メソッド 219 * テスト用のメソッドです。 220 * 221 * @param args 引数 222 */ 223 public static void main(final String[] args) { 224 final QueueReceive receive = new QueueReceive_MQ(); 225 final String jmsServer = "tcp://localhost:61616"; 226 227 // バッチフラグにtrueを設定 228 // 未設定の場合は、tomcatのjndi接続処理が実行されます。 229 receive.setBatchFlg(true); 230 231 // 認証情報の設定 232 System.setProperty("mqUserId", "admin"); 233 System.setProperty("mqPassword", "admin"); 234 235 // 接続 236 receive.connect(jmsServer, null, null); 237 238 // 処理対象のキュー名 239 final String queueName = "queue01"; 240 241 242 // ** 1件受信する場合 243 final QueueInfo queueInfo = receive.receive(queueName); 244 if(queueInfo != null) { 245 System.out.println("message:" + queueInfo.getMessage()); 246 }else { 247 System.out.println("キューが登録されていません。"); 248 } 249 250// // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ) 251// // MessageListerを実装した、QueueReceiveListenerクラスを作成します。 252// MessageListener listener = new QueueReceiveListener(); 253// receive.setListener(queueName, listener); 254// // 複数のキューにリスナーを設定することも可能です。 255// receive.setListener("queue02", listener); 256// 257// try { 258// // 1分間リスナーを起動しておく場合の、プロセス待機処理 259// Thread.sleep(60 * 1000); 260// }catch(InterruptedException e) { 261// throw new RuntimeException(e.getMessage()); 262// } 263 264 // リスナー利用時は、closeListenerを実行して、解放してください。 265 receive.closeListener(); 266 267 // 終了処理 268 receive.close(); 269 } 270 271// /** 272// * QueueReceiveリスナークラス 273// * リスナー用のクラスです。 274// * MQに設定することで、メッセージが受信されると、 275// * 自動的にonMessageメソッドが実行されます。 276// * 277// */ 278// static class QueueReceiveListener implements MessageListener { 279// /** 280// * メッセージ受信処理 281// * MQサーバにメッセージが受信されると、 282// * メソッドの処理が行われます。 283// * 284// * @param message 受信メッセージ 285// */ 286// @Override 287// public void onMessage(final Message message) { 288// 289// // メッセージ受信 290// TextMessage msg = (TextMessage) message; 291// String msgText = ""; 292// 293// try { 294// // キューサーバのメッセージを取得 295// msgText = msg.getText(); 296// // メーッセージの受信応答を返します。 297// msg.acknowledge(); 298// 299// System.out.println("message:" + msgText); 300// 301// } catch (JMSException e) { 302// throw new RuntimeException(e.getMessage()); 303// } 304// } 305// } 306 307}