001package org.opengion.fukurou.queue;
002
003import java.util.ArrayList;
004import java.util.List;
005
006import javax.jms.JMSException;
007// import javax.jms.Message;
008import javax.jms.MessageListener;
009import javax.jms.Queue;
010import javax.jms.QueueConnection;
011import javax.jms.QueueConnectionFactory;
012import javax.jms.QueueReceiver;
013import javax.jms.QueueSession;
014import javax.jms.TextMessage;
015import javax.naming.Context;
016import javax.naming.InitialContext;
017
018import org.apache.activemq.ActiveMQConnectionFactory;
019
020/**
021 * MQメッセージ受信用クラス。
022 *
023 * @og.group メッセージ連携
024 *
025 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
026 *
027 * @version 5
028 * @author oota
029 * @since JDK7
030 */
031public class QueueReceive_MQ implements QueueReceive{
032
033        private QueueConnection connection ;
034        private QueueSession session ;
035        private QueueReceiver receiver ;
036//      List<QueueReceiver> listReceiver ;
037        private List<QueueReceiver> listReceiver ;              // 7.2.9.4 (2020/11/20)
038        private boolean batch ;
039
040        /**
041         * 接続処理
042         * メッセージキューサーバに接続します。
043         *
044         *  @param jmsServer jsmサーバ
045         *  @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません)
046         *  @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません)
047         */
048        public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) {
049                connect(jmsServer);
050        }
051
052        /**
053         * 接続処理
054         * jmsServerに接続します。
055         * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。
056         * SQSの場合は最大受信件数の10件の処理を行います。
057         *
058         * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL
059         */
060        private void connect(final String jmsServer) {
061                try {
062                        if(batch) {
063                                // バッチ用
064                                final String mqUserId = System.getProperty("mqUserId");
065                                final String mqPassword = System.getProperty("mqPassword");
066                                final QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer);
067                                connection = factory.createQueueConnection(mqUserId,  mqPassword);
068                        }else {
069                                // jndi接続用
070                                final Context ctx = new InitialContext();
071                                final QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer);
072                                connection = factory.createQueueConnection();
073                        }
074
075                        connection.start();
076
077                        // Receiveの作成
078                        session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
079
080                        // 初期化
081                        listReceiver = new ArrayList<QueueReceiver>();
082//              }catch(Exception e) {                                           // 8.0.0.0 (2021/07/31)
083//                      throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage());
084                } catch( final Throwable th ) {
085                        throw new RuntimeException("MQサーバの接続に失敗しました。:",th );
086                }
087        }
088
089        /**
090         * 受信処理
091         * メッセージキューの受信の処理を行います。
092         *
093         * @param queueName キューの名前
094         * @return キュー情報格納クラス
095         */
096        @Override
097        public QueueInfo receive(final String queueName) {
098                QueueInfo queueInfo = null;
099
100                try {
101                        final Queue queue = session.createQueue(queueName);
102                        receiver = session.createReceiver(queue);
103
104                        final TextMessage msg = (TextMessage)receiver.receive(1000);
105
106                        if(msg != null) {
107                                // メッセージ受信の確認応答
108                                msg.acknowledge();
109
110                                // メッセージの設定
111                                queueInfo = new QueueInfo();
112                                queueInfo.setMessage(msg.getText());
113                        }
114//              }catch(Exception e) {                                                           // 8.0.0.0 (2021/07/31)
115//                      throw new RuntimeException(e.getMessage());
116                } catch( final Throwable th ) {
117                        throw new RuntimeException( th );
118                }finally {
119                        try {
120                                receiver.close();
121//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
122                        } catch( final Throwable th ) {
123                                System.out.println("receiverのクローズに失敗しました。");
124                        }
125                }
126
127                return queueInfo;
128        }
129
130        /**
131         * リスナーの起動
132         * 指定したキュー名に対して、
133         * MessageListenerのリスナーを設定します。
134         *
135         * @param queueName キュー名
136         * @param listener MessageListerを実装したクラス
137         */
138        @Override
139        public void setListener(final String queueName, final MessageListener listener) {
140                QueueReceiver receiver = null;
141                try {
142                        final Queue queue = session.createQueue(queueName);
143                        receiver = session.createReceiver(queue);
144                        receiver.setMessageListener(listener);
145
146                        // リスナーの起動
147                        listReceiver.add(receiver);
148                }catch(JMSException ex) {
149//                      throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage());
150                        throw new RuntimeException("リスナーの起動に失敗しました。" , ex);             // 8.0.0.0 (2021/07/31) original stack trace may be lost
151                }
152        }
153
154        /**
155         * クローズリスナー
156         * レシーバーをクローズすることで、
157         * リスナーの処理を終了します。
158         */
159        public void closeListener() {
160                for(final QueueReceiver receiver: listReceiver) {
161                        try {
162                                receiver.close();
163//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
164                        } catch( final Throwable th ) {
165                                System.out.println("receiverのクローズに失敗しました。");
166                        }
167                }
168
169                // 初期化
170                listReceiver = null;
171                listReceiver = new ArrayList<QueueReceiver>();
172        }
173
174        /**
175         * クローズ処理
176         * クローズ処理を行います。
177         *
178         * @og.rev 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block
179         */
180        @Override
181        public void close() {
182                if(receiver != null) {
183                        try {
184                                receiver.close();
185//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
186                        } catch( final Throwable th ) {
187                                System.out.println("receiverのクローズに失敗しました。");
188                        }
189                }
190                if(session != null) {
191                        try {
192                                session.close();
193//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
194                        } catch( final Throwable th ) {
195                                System.out.println("sessionのクローズに失敗しました。");
196                        }
197                }
198                if(connection != null) {
199                        try {
200                                connection.close();
201//                      }catch(Exception e) { ; }                                               // 8.0.0.0 (2021/07/31)
202                        } catch( final Throwable th ) {
203                                System.out.println("connectionのクローズに失敗しました。");
204                        }
205                }
206        }
207
208        /**
209         * バッチ処理判定フラグを設定します。
210         *
211         * @param batchFlg バッチ処理判定フラグ
212         */
213        public void setBatchFlg(final Boolean batchFlg) {
214                batch = batchFlg;
215        }
216
217        /**
218         * 検証用メソッド
219         * テスト用のメソッドです。
220         *
221         * @param args 引数
222         */
223        public static void main(final String[] args) {
224                final QueueReceive receive = new QueueReceive_MQ();
225                final String jmsServer = "tcp://localhost:61616";
226
227                // バッチフラグにtrueを設定
228                // 未設定の場合は、tomcatのjndi接続処理が実行されます。
229                receive.setBatchFlg(true);
230
231                // 認証情報の設定
232                System.setProperty("mqUserId", "admin");
233                System.setProperty("mqPassword", "admin");
234
235                // 接続
236                receive.connect(jmsServer, null, null);
237
238                // 処理対象のキュー名
239                final String queueName = "queue01";
240
241
242                // ** 1件受信する場合
243                final QueueInfo queueInfo = receive.receive(queueName);
244                if(queueInfo != null) {
245                        System.out.println("message:" + queueInfo.getMessage());
246                }else {
247                        System.out.println("キューが登録されていません。");
248                }
249
250//              // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ)
251//              // MessageListerを実装した、QueueReceiveListenerクラスを作成します。
252//              MessageListener listener = new QueueReceiveListener();
253//              receive.setListener(queueName, listener);
254//              // 複数のキューにリスナーを設定することも可能です。
255//              receive.setListener("queue02", listener);
256//
257//              try {
258//                      // 1分間リスナーを起動しておく場合の、プロセス待機処理
259//                      Thread.sleep(60 * 1000);
260//              }catch(InterruptedException e) {
261//                      throw new RuntimeException(e.getMessage());
262//              }
263
264                // リスナー利用時は、closeListenerを実行して、解放してください。
265                receive.closeListener();
266
267                // 終了処理
268                receive.close();
269        }
270
271//      /**
272//       * QueueReceiveリスナークラス
273//       * リスナー用のクラスです。
274//       * MQに設定することで、メッセージが受信されると、
275//       * 自動的にonMessageメソッドが実行されます。
276//       *
277//       */
278//      static class QueueReceiveListener implements MessageListener {
279//              /**
280//               * メッセージ受信処理
281//               * MQサーバにメッセージが受信されると、
282//               * メソッドの処理が行われます。
283//               *
284//               * @param message 受信メッセージ
285//               */
286//              @Override
287//              public void onMessage(final Message message) {
288//
289//                      // メッセージ受信
290//                      TextMessage msg = (TextMessage) message;
291//                      String msgText = "";
292//
293//                      try {
294//                              // キューサーバのメッセージを取得
295//                              msgText = msg.getText();
296//                              // メーッセージの受信応答を返します。
297//                              msg.acknowledge();
298//
299//                              System.out.println("message:" + msgText);
300//
301//                      } catch (JMSException e) {
302//                              throw new RuntimeException(e.getMessage());
303//                      }
304//              }
305//      }
306
307}