MQ接收队列消息到本地文件的Java代码详解 | 编程语言

MQ接收队列到本地文件

 

MQFileReceiver.java     

package com.mq.dpca.file; 
 
import java.io.File; 
import java.io.FileOutputStream; 
 
import com.ibm.mq.MQEnvironment; 
import com.ibm.mq.MQException; 
import com.ibm.mq.MQGetMessageOptions; 
import com.ibm.mq.MQMessage; 
import com.ibm.mq.MQQueue; 
import com.ibm.mq.MQQueueManager; 
import com.ibm.mq.constants.MQConstants; 
import com.mq.dpca.msg.MQConfig; 
import com.mq.dpca.util.ReadCmdLine; 
import com.mq.dpca.util.RenameUtil; 
 
/**
 * Receives a file that was sent as a group of MQ messages and writes it to a
 * local file.
 *
 * <p>The queue is polled actively: one run connects, reads messages (waiting a
 * bounded time for each one), writes their payloads to a single file, then
 * commits and disconnects. All gets are done under syncpoint, so a failed run
 * is backed out instead of committed.
 */
public class MQFileReceiver {
	private MQQueueManager qmgr; // connection to the queue manager

	private MQQueue inQueue; // queue the file messages are read from

	private String queueName = ""; // queue name

	private String host = ""; // host name used for the client connection

	private int port = 1414; // listener port

	private String channel = ""; // channel name

	private String qmgrName = ""; // queue manager name

	private MQMessage inMsg; // message buffer, reused for every get

	private MQGetMessageOptions gmo; // get-message options

	// Value of the "fileName" property of the message that started the file.
	private static String fileName = null;

	// NOTE(review): read from the configuration in initproperty() but never
	// applied to the connection anywhere in this class - confirm whether it
	// should be.
	private int ccsid = 0;

	// Prefix that is string-concatenated in front of the target file name.
	private static String file_dir = null;

	/**
	 * Program entry point: loads the configuration and performs one receive run.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		MQFileReceiver mfs = new MQFileReceiver();
		// Load the connection settings.
		mfs.initproperty();
		// Receive the file.
		mfs.runGoupReceiver();
		// A disabled follow-up step used to live here: look up a shell script
		// name with MQConfig.getValueByKey(fileName) and, if one is configured,
		// run it through ReadCmdLine.callShell(...); otherwise only the file is
		// received.
	}

	/**
	 * Performs one complete receive run: connect, receive, commit, disconnect.
	 *
	 * <p>If anything fails, the stack trace is printed, the unit of work is
	 * backed out (when a connection exists), the connection is released and the
	 * JVM exits with status 2. On success the method returns normally after the
	 * connection has been released.
	 */
	public void runGoupReceiver() {
		boolean failed = false;
		try {
			init();
			// Use the throwing variant so that a failure can never fall
			// through to commit() and discard the consumed messages.
			receiveGroup();
			qmgr.commit();
			System.out.println("\n Messages successfully Receive ");
		} catch (Exception e) {
			e.printStackTrace();
			failed = true;
		}
		if (failed) {
			// Decide the outcome of the unit of work explicitly before
			// disconnecting.
			backOutTransaction();
		}
		releaseConnection();
		// Exit only after cleanup: System.exit() inside the catch block would
		// have skipped it.
		if (failed) {
			System.exit(2);
		}
	}

	/**
	 * Backs out the current unit of work, if a queue manager connection exists.
	 * Failures are printed and otherwise ignored because the caller is already
	 * on its error path.
	 */
	private void backOutTransaction() {
		if (qmgr == null) {
			// init() failed before the connection was established.
			return;
		}
		try {
			System.out.println("\n Backing out Transaction ");
			qmgr.backout();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Closes the queue and disconnects from the queue manager, whichever of the
	 * two exist. Failures are printed and do not prevent the remaining cleanup.
	 */
	private void releaseConnection() {
		if (inQueue != null) {
			try {
				inQueue.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		if (qmgr != null) {
			try {
				qmgr.disconnect();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Connects to the queue manager and opens the input queue.
	 *
	 * @throws Exception if the connection or the queue open fails
	 */
	private void init() throws Exception {
		/* Client-connection settings are passed through MQEnvironment. */
		MQEnvironment.hostname = host;
		MQEnvironment.channel = channel;
		MQEnvironment.port = port;

		/* Connect to the queue manager. */
		qmgr = new MQQueueManager(qmgrName);

		/* Open options: input as defined by the queue, fail if quiescing. */
		int opnOptn = MQConstants.MQOO_INPUT_AS_Q_DEF
				| MQConstants.MQOO_FAIL_IF_QUIESCING;

		/* Open the queue for input. */
		inQueue = qmgr.accessQueue(queueName, opnOptn, null, null, null);
	}

	/**
	 * Receives messages from the queue and writes their payloads to a file.
	 *
	 * <p>This entry point keeps its historical contract of never throwing: any
	 * failure is reported by printing the exception message. Callers that must
	 * know whether the receive succeeded cannot tell from this method;
	 * {@link #runGoupReceiver()} therefore uses an internal throwing variant.
	 */
	public void getGroupMessages() {
		try {
			receiveGroup();
		} catch (Exception e) {
			System.out.println(e.getMessage());
		}
	}

	/**
	 * Reads messages until the last message of a group has been written or no
	 * further message arrives within the wait interval.
	 *
	 * <p>The first message that carries a {@code fileName} property determines
	 * the target file; messages received before that are skipped. The payload
	 * of that message and of every following one is appended to the file.
	 *
	 * @throws Exception if a get fails for any reason other than "no message
	 *                   available", or if the file cannot be opened or written
	 */
	private void receiveGroup() throws Exception {
		gmo = new MQGetMessageOptions();
		gmo.options = MQConstants.MQGMO_FAIL_IF_QUIESCING
				| MQConstants.MQGMO_SYNCPOINT
				/* wait for a message ... */
				| MQConstants.MQGMO_WAIT
				/* ... but only hand out a group once all its messages are there ... */
				| MQConstants.MQGMO_ALL_MSGS_AVAILABLE
				/* ... and return the messages in logical order */
				| MQConstants.MQGMO_LOGICAL_ORDER;
		/* Upper bound for each wait, in milliseconds. */
		gmo.waitInterval = 5000;
		gmo.matchOptions = MQConstants.MQMO_MATCH_GROUP_ID;

		inMsg = new MQMessage();
		FileOutputStream fos = null;
		try {
			while (true) {
				try {
					inQueue.get(inMsg, gmo);
				} catch (MQException mqe) {
					if (mqe.reasonCode == MQConstants.MQRC_NO_MSG_AVAILABLE) {
						// Normal end of a poll: nothing (more) to read.
						break;
					}
					throw mqe;
				}

				if (fos == null) {
					fos = openTargetFile();
					if (fos == null) {
						System.out
								.println("Receiver the message without property,do nothing!");
						inMsg.clearMessage();
						continue;
					}
				}

				int msgLength = inMsg.getMessageLength();
				byte[] buffer = new byte[msgLength];
				inMsg.readFully(buffer);
				fos.write(buffer, 0, msgLength);

				/* Stop once the last message of the group has been written. */
				if (gmo.groupStatus == MQConstants.MQGS_LAST_MSG_IN_GROUP) {
					System.out.println("Last Msg in Group");
					break;
				}
				inMsg.clearMessage();
			}
		} finally {
			if (fos != null) {
				fos.close();
			}
		}
	}

	/**
	 * Opens the target file named by the {@code fileName} property of the
	 * message currently held in {@link #inMsg}.
	 *
	 * @return the opened stream, or {@code null} if the property could not be
	 *         read from the message
	 * @throws Exception if the file name cannot be processed or the file cannot
	 *                   be opened
	 */
	private FileOutputStream openTargetFile() throws Exception {
		try {
			fileName = inMsg.getStringProperty("fileName");
		} catch (MQException propertyUnavailable) {
			// Deliberately not an error: such messages are skipped by the caller.
			return null;
		}
		// NOTE(review): the name comes from the message and RenameUtil.rename()
		// is the only transformation applied before it is appended to file_dir.
		// Verify that it cannot yield a path outside the target directory.
		String fullName = file_dir + RenameUtil.rename(fileName);
		return new FileOutputStream(new File(fullName));
	}

	/**
	 * Copies the connection settings and the target directory from
	 * {@link MQConfig} into this receiver. Nothing is changed when the
	 * configuration has no queue manager name.
	 */
	public void initproperty() {
		MQConfig config = new MQConfig().getInstance();
		if (config.getMQ_MANAGER() != null) {
			qmgrName = config.getMQ_MANAGER();
			queueName = config.getMQ_QUEUE_NAME();
			channel = config.getMQ_CHANNEL();
			host = config.getMQ_HOST_NAME();
			port = Integer.valueOf(config.getMQ_PROT());
			ccsid = Integer.valueOf(config.getMQ_CCSID());
			file_dir = config.getFILE_DIR();
		}
	}
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/10377.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论