001package org.opengion.plugin.daemon;
002
003import java.io.File;
004import java.util.Date;
005import java.util.Locale;                                                                                        // 7.2.9.4 (2020/11/20)
006
007import javax.jms.JMSException;
008import javax.jms.Message;
009import javax.jms.MessageListener;
010import javax.jms.TextMessage;
011
012// import org.opengion.fukurou.util.BizUtil;
013import org.opengion.fukurou.business.BizUtil;
014import org.opengion.fukurou.queue.QueueInfo;
015import org.opengion.fukurou.queue.QueueReceive;
016import org.opengion.fukurou.queue.QueueReceiveFactory;
017import org.opengion.fukurou.util.HybsTimerTask;
018import org.opengion.fukurou.util.StringUtil;
019import org.opengion.hayabusa.common.HybsSystem;
020import org.opengion.hayabusa.common.HybsSystemException;
021import org.opengion.hayabusa.queue.DBAccessQueue;
022
023/**
024 * メッセージキュー受信 メッセージキューの受信処理を行います。
025 *
026 * @og.group メッセージ連携
027 *
028 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
029 *
030 * @version 5.0
031 * @author oota
032 * @since JDK7
033 *
034 */
035public class Daemon_QueueReceive extends HybsTimerTask {
036        /** このプログラムのVERSION文字列を設定します。   {@value} */
037        private static final String VERSION = "7.2.9.4 (2020/11/20)" ;
038
039        private int loopCnt ;
040        private QueueReceive queueReceive ;
041
042        private static final int LOOP_COUNTER = 24;
043        private static final char FPSC = File.pathSeparatorChar ;                                       // 7.2.9.4 (2020/11/20) システムに依存するパス区切り文字
044
045        private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY");
046        private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY");
047        private final String MQ_QUEUE_TYPE;
048        private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL");
049        private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER");
050
051        private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
052        private final String USER_ID   = "CYYYYY";
053        private final String PG_ID;
054        private final String DMN_NAME  = "QueueReceiveDMN";
055        private final DBAccessQueue dbAccessQueue;
056
057        private final String REAL_PATH = HybsSystem.sys("REAL_PATH");                           // 7.2.9.4 (2020/11/20)
058
059        /**
060         * コンストラクター
061         * 初期処理を行います。
062         *
063         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
064         */
065        public Daemon_QueueReceive() {
066                super();
067
068                // パラメータの設定
069                // 7.2.9.4 (2020/11/20) PMD:Avoid if (x != y) ..; else ..;
070                if(StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
071                        throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
072                }else {
073//                      MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
074                        MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
075                        PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
076                }
077
078                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
079
080//              // パラメータの設定
081//              if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
082////                    MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
083//                      MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
084//                      PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
085//              }else {
086//                      throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
087//              }
088//
089//              dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
090        }
091
092        /**
093         * 初期処理 MQサーバに接続します。
094         *
095         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
096         */
097        @Override
098        public void initDaemon() {
099                // 開始ログO
100                final StringBuilder errMsg = new StringBuilder();
101                if (MQ_QUEUE_TYPE == null) {
102                        errMsg.append("MQ_QUEUE_TYPE");
103                }
104                if (MQ_QURUE_SERVER_URL == null) {
105                        errMsg.append(" MQ_QUEUE_SERVER_URL");
106                }
107
108                if (errMsg.length() > 0) {
109                        errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。");
110                        throw new HybsSystemException(errMsg.toString());
111                }
112
113//              final String queueType = MQ_QUEUE_TYPE.toUpperCase();
114                final String queueType = MQ_QUEUE_TYPE.toUpperCase( Locale.JAPAN );     // 7.2.9.4 (2020/11/20)
115
116                // 開始ログ
117                System.out.println("MQキュータイプ:" + queueType);
118                System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL);
119
120                queueReceive = QueueReceiveFactory.newQueueReceive(queueType);
121
122                queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
123        }
124
125        /**
126         * 開始処理 タイマータスクのデーモン処理の開始ポイントです。
127         */
128        @Override
129        protected void startDaemon() {
130                if (loopCnt % LOOP_COUNTER == 0) {
131                        loopCnt = 1;
132                        System.out.println();
133                        System.out.print(toString() + " " + new Date() + " ");
134                } else {
135                        // 対象 キュー名(グループ名)とbizlogic名の取得処理
136                        final String[][] ge67vals = dbAccessQueue.setlectGE67();
137                        // キュー情報登録チェック
138                        if (ge67vals.length == 0) {
139                                final String errMsg = "GE67にキュー情報が登録されていません。";
140                                throw new RuntimeException(errMsg);
141                        }
142                        // MQとSQSで処理を分岐
143                        // MQ:指定キューIDからキューメッセージを取得
144                        // SQS:キューメッセージを取得してからキューID(グループID)を取得
145                        switch (MQ_QUEUE_TYPE) {
146                                case "MQ":
147                                        processMq(ge67vals);
148                                        break;
149                                case "SQS":
150                                        processSqs(ge67vals);
151                                        break;
152                                default:
153                                        final String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE;
154                                        throw new RuntimeException(errMsg);
155                        }
156
157                        loopCnt++;
158                }
159        }
160
161        /**
162         * MQ用の処理
163         * GE67に登録されているキューIDの、
164         * メッセージキューを取得して処理を行います。
165         *
166         * @param ge67vals GE67の配列データ
167         */
168        private void processMq(final String[][] ge67vals) {
169                boolean listenerMode = false;
170
171                if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) {
172                        listenerMode = true;
173                }
174
175                if(listenerMode) {
176                        // リスナーの初期化
177                        queueReceive.closeListener();
178                }
179
180                // ge67のキューリスト分繰り返します
181                for (int row = 0; row < ge67vals.length; row++) {
182                        final String queueId = ge67vals[row][0];
183                        final String bizLogicId = ge67vals[row][1];
184
185                        if(listenerMode) {
186                                // リスナーを設定して、動的な受信処理(MQ専用)
187                                final QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId);
188                                queueReceive.setListener(queueId, listener);
189                        }else {
190                                // 1件の受信処理
191                                final QueueInfo queueInfo = queueReceive.receive(queueId);
192                                if (queueInfo != null) {
193                                        processMessage(queueId, bizLogicId, queueInfo.getMessage());
194                                        // 1件処理を行ったら処理を終了します。
195                                        break;
196                                }
197                        }
198                }
199        }
200
201        /**
202         * SQS用の処理
203         * SQSはグループIDを指定して、キューを取得することはできず、
204         * 任意のキューを1つ取得してから、
205         * 判定処理を行います。
206         * GE67に登録されていないグループIDのキューが取得された場合は、
207         * GE68にエラーレコードを登録します。
208         *
209         * @param ge67vals GE67の配列データ
210         */
211        private void processSqs(final String[][] ge67vals) {
212                // 下記はSQSの場合(キューを1件取得して処理)
213                final QueueInfo queueInfo = queueReceive.receive(null);
214
215                // キューが未取得の場合
216                if(queueInfo == null) {
217                        return;
218                }
219
220                // 受信したキューを処理
221                final String groupId = queueInfo.getSqsFifoGroupId();
222                Boolean existsFlg = false;
223                // valsにグループIDのレコードが存在するか検索
224                for (int row = 0; row < ge67vals.length; row++) {
225                        final String queueId = ge67vals[row][0];
226
227                        if (groupId != null && groupId.equals(queueId)) {
228                                // 該当レコードあり
229                                final String bizLogicId = ge67vals[row][1];
230                                processMessage(queueId, bizLogicId, queueInfo.getMessage());
231
232                                existsFlg = true;
233                                break;
234                        }
235                }
236
237                if (!existsFlg) {
238                        // 該当groupIdの未登録エラー
239                        // 処理番号生成
240                        final String syoriNo = dbAccessQueue.generateSyoriNo();
241                        dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage());
242                        dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。");
243                }
244        }
245
246        /**
247         * キャンセル処理
248         * タイマータスクのデーモン処理の終了ポイントです。
249         *
250         * @return キャンセルできれば、true
251         */
252        @Override
253        public boolean cancel() {
254                if (queueReceive != null) {
255                        queueReceive.close();
256                }
257
258                return super.cancel();
259        }
260
261        /**
262         * メッセージの処理
263         *  受信したメッセージをbizLogicに渡して、
264         *  処理を実行します。
265         *
266         * @param queueId キューID
267         * @param bizLogicId ビズロジックID
268         * @param msgText 受信メッセージ
269         */
270        private void processMessage(final String queueId, final String bizLogicId, final String msgText) {
271                String syoriNo = "";
272                try {
273                        // 処理番号生成
274                        syoriNo = dbAccessQueue.generateSyoriNo();
275
276                        // 管理テーブル登録
277                        dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText);
278
279                        // bizLogicの処理を実行
280                        callActBizLogic(SYSTEM_ID, bizLogicId, msgText);
281
282                        // 管理テーブル更新(完了)
283                        dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END);
284
285                } catch (Throwable te) {
286                        // bizLogicでのエラーはログの未出力して、処理を継続します。
287                        // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。
288                        String errMessage = null;
289                        if (te.getCause() != null) {
290                                // causeが設定されている場合のエラー情報
291                                errMessage = te.getCause().getMessage();
292                        } else {
293                                // causeが未設定の場合のエラー情報
294                                errMessage = te.getMessage();
295                        }
296                        System.out.println(errMessage);
297                        try {
298                                // エラーテーブルに登録
299                                dbAccessQueue.updateGE68Error(syoriNo, errMessage);
300                        //      8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block
301//                      } catch (Exception e) {
302//                              // ここでのエラーはスルーします。
303//                              System.out.println("管理テーブル登録エラー:" + e.getMessage());
304                        } catch( final Throwable th ) {
305                                // ここでのエラーはスルーします。
306                                System.out.println("管理テーブル登録エラー:" + th.getMessage());
307                        }
308                }
309        }
310
311        /**
312         * bizLogic処理の呼び出し
313         * 必要なパス情報をリソースから取得して、
314         * BizUtil.actBizLogicにパス情報を渡すことで、
315         * bizLogicの処理を行います。
316         *
317         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している
318         *
319         * @param systemId  システムID
320         * @param logicName ロジックファイル名
321         * @param msgText   メッセージ
322         * @throws Throwable エラー情報
323         */
324        private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable {
325                // 対象 クラスパスの生成
326                // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
327                // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
328                // bizLogicTag.javaのコードを移植
329                final String classDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_CLASS_PATH" );            // bizの下のパス
330                final String webIinf  = REAL_PATH + "WEB-INF" + File.separator ;
331
332                final StringBuilder sb = new StringBuilder().append('.').append(FPSC);
333
334                final File lib = new File( webIinf + "lib");
335                final File[] libFiles = lib.listFiles();
336                if( libFiles != null ) {
337                        // 7.2.9.4 (2020/11/20) PMD:This for loop can be replaced by a foreach loop
338                        for( final File file : libFiles ) {
339                                sb.append( file.getAbsolutePath() ).append(FPSC);
340                        }
341//                      for (int i = 0; i < libFiles.length; i++) {
342//                              sb.append( libFiles[i].getAbsolutePath() ).append(FPSC);
343//                      }
344                }
345
346                // 上記で生成したクラスパスをclassPathに格納
347                final String classPath =
348                        sb.append( webIinf ).append( "classes" ).append(FPSC)
349                          .append( classDir ).append(FPSC)              // bizの下のパス
350                          .toString();
351
352                // ソースパス情報の生成
353                final String  srcDir        = REAL_PATH + HybsSystem.sys( "BIZLOGIC_SRC_PATH" );
354                final boolean isAutoCompile = HybsSystem.sysBool( "BIZLOGIC_AUTO_COMPILE" );
355                final boolean isHotDeploy   = HybsSystem.sysBool( "BIZLOGIC_HOT_DEPLOY" );
356
357                // bizLogicに渡すパラメータ
358                final String[] keys = new String[] { "message" };
359                final String[] vals = new String[] { msgText };
360
361                // bizLogic処理の実行
362                BizUtil.actBizLogic( srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals );
363        }
364
365//      7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している
366//      private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable {
367//              // 対象 クラスパスの生成
368//              // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
369//              // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
370//              // bizLogicTag.javaのコードを移植
371//              final StringBuilder sb = new StringBuilder();
372//              sb.append('.').append(File.pathSeparatorChar);
373//              final File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib");
374//              final File[] libFiles = lib.listFiles();
375//              for (int i = 0; i < libFiles.length; i++) {
376//                      sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar);
377//              }
378//              sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar);
379//              // bizの下のパス
380//              sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar);
381//              // 上記で生成したクラスパスをclassPathに格納
382//              final String classPath = sb.toString();
383//
384//              // ソースパス情報の生成
385//              final String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH");
386//              final String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH");
387//              final boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE");
388//              final boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY");
389//
390//              // bizLogicに渡すパラメータ
391//              final String[] keys = new String[] { "message" };
392//              final String[] vals = new String[] { msgText };
393//
394//              // bizLogic処理の実行
395//              BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals);
396//      }
397
398        /**
399         * 受信処理リスナー用のインナークラス
400         * QueueReceiveリスナークラス リスナー用のクラスです。
401         *  MQに設定することで、メッセージが受信されると、
402         * onMessageメソッドが実行されます。
403         *
404         * @og.rev 7.2.9.4 (2020/11/20) private final 追加
405         */
406//      class QueueReceiveListener implements MessageListener {
407        private final class QueueReceiveListener implements MessageListener {
408//              private String queueId = "";
409//              private String bizLogicId = "";
410                private final String queueId ;
411                private final String bizLogicId ;
412
413                /**
414                 * コンストラクター 初期処理を行います。
415                 *
416                 * @param quId  キューID
417                 * @param bizId ビズロジックID
418                 */
419                public QueueReceiveListener(final String quId, final String bizId) {
420                        queueId    = quId;
421                        bizLogicId = bizId;
422                }
423
424                /**
425                 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。
426                 *
427                 * @param message 受信メッセージ
428                 */
429                @Override
430                public void onMessage(final Message message) {
431                        // 要求番号 : ここでは使用していません。
432                        final String ykno = "";
433
434                        // メッセージ受信
435                        final TextMessage msg = (TextMessage) message;
436                        String msgText = "";
437
438                        try {
439                                // キューサーバのメッセージを取得
440                                msgText = msg.getText();
441
442                                // メーッセージの受信応答を返します。
443                                msg.acknowledge();
444
445                                processMessage(queueId, bizLogicId, msgText);
446
447                        } catch (JMSException jmse) {
448                                try {
449                                        // 管理テーブル更新
450                                        // 管理テーブル更新(エラー)
451                                        dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR);
452                                //      8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block
453//                              } catch (Exception e) {
454//                                      // ここでのエラーはスルーします。
455//                                      System.out.println("管理テーブル登録エラー:" + e.getMessage());
456                                } catch( final Throwable th ) {
457                                        // ここでのエラーはスルーします。
458                                        System.out.println("管理テーブル登録エラー:" + th.getMessage());
459                                }
460
461                                throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage());
462                        }
463                }
464        }
465}