MQ发送文件到队列
mqfilesend.java
package com.mq.dpca.file;

import java.io.File;
import java.io.FileInputStream;

import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;

/**
 * Sends a file to an MQ queue.
 *
 * <p>The file is read in chunks of at most {@link #BUFFER_LEN} bytes and each
 * chunk is put on the queue as a separate message.
 *
 * <p>NOTE(review): messages are put with default {@link MQPutMessageOptions};
 * confirm whether {@code MQPMO_SYNCPOINT} is needed for the
 * {@code commit()}/{@code backout()} calls in {@link #runGoupSender()} to have
 * any effect.
 */
public class mqfilesend {

    /** Maximum number of file bytes carried by a single message. */
    static final int BUFFER_LEN = 1024 * 1024;

    /** Connection to the queue manager. */
    private MQQueueManager qmgr;
    /** Queue the messages are put on. */
    private MQQueue outQueue;
    /** Name of the target queue. */
    private String queueName = "aa";
    /** Host name of the MQ server. */
    private String host = "127.0.0.1";
    /** Port of the MQ listener. */
    private int port = 1414;
    /** Name of the client connection channel. */
    private String channel = "SYSTEM.BKR.CONFIG";
    /** Name of the queue manager. */
    private String qmgrName = "mm";
    /** Message object reused for every chunk. */
    private MQMessage outMsg;
    /** Options used for every put. */
    private MQPutMessageOptions pmo;
    /** Path of the file to send. */
    private String fileName = "D://log.txt";

    /**
     * Configures the client connection, connects to the queue manager and
     * opens the target queue for output.
     *
     * @throws Exception if connecting or opening the queue fails
     */
    private void init() throws Exception {
        // MQEnvironment holds the client connection settings used by
        // the MQQueueManager constructor below.
        MQEnvironment.hostname = host;
        MQEnvironment.channel = channel;
        MQEnvironment.port = port;

        qmgr = new MQQueueManager(qmgrName);

        int openOptions = MQConstants.MQOO_OUTPUT | MQConstants.MQOO_FAIL_IF_QUIESCING;
        outQueue = qmgr.accessQueue(queueName, openOptions, null, null, null);
    }

    /**
     * Reads the configured file chunk by chunk and puts one message per chunk
     * on the open queue. An empty file produces no messages.
     *
     * @throws Exception if the file cannot be read or a put fails
     */
    public void sendMessages() throws Exception {
        pmo = new MQPutMessageOptions();
        outMsg = new MQMessage();
        byte[] buffer = new byte[BUFFER_LEN];

        // try-with-resources closes the file even when a put fails mid-way.
        try (FileInputStream fis = new FileInputStream(new File(fileName))) {
            int count;
            while ((count = fis.read(buffer, 0, BUFFER_LEN)) != -1) {
                // Write only the bytes actually read; writing the whole buffer
                // would pad a short chunk with stale data from the previous read.
                outMsg.write(buffer, 0, count);
                if (count < BUFFER_LEN) {
                    System.out.println("aaa");
                }
                outQueue.put(outMsg, pmo);
                outMsg.clearMessage();
            }
        }
    }

    /**
     * Connects, sends the file, commits and releases the MQ resources.
     *
     * <p>On an {@link MQException} a backout is attempted. On any failure the
     * stack trace is printed and the JVM is terminated with exit status 2.
     */
    public void runGoupSender() {
        boolean failed = false;
        try {
            init();
            sendMessages();
            qmgr.commit();
            System.out.println("\n Messages successfully Send ");
        } catch (MQException mqe) {
            failed = true;
            mqe.printStackTrace();
            backOut();
        } catch (Exception e) {
            failed = true;
            e.printStackTrace();
        } finally {
            release();
        }
        if (failed) {
            System.exit(2);
        }
    }

    /** Attempts to back out the current unit of work, printing any failure. */
    private void backOut() {
        try {
            System.out.println("\n Backing out Transaction ");
            // qmgr is still null when the connection itself failed in init().
            if (qmgr != null) {
                qmgr.backout();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Closes the queue and disconnects from the queue manager, if open. */
    private void release() {
        try {
            if (outQueue != null) {
                outQueue.close();
            }
        } catch (MQException e) {
            e.printStackTrace();
        } finally {
            outQueue = null;
        }
        try {
            if (qmgr != null) {
                qmgr.disconnect();
            }
        } catch (MQException e) {
            e.printStackTrace();
        } finally {
            qmgr = null;
        }
    }

    /**
     * Program entry point: runs the sender once.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int runs = 1;
        mqfilesend sender = new mqfilesend();
        for (int i = 1; i <= runs; i++) {
            System.out.println("消息记录" + i);
            sender.runGoupSender();
        }
    }
}
MQConfig.java
package com.mq.dpca.msg;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.PropertyResourceBundle;
import java.util.ResourceBundle;

import com.mq.dpca.util.RenameUtil;

/**
 * Holds the MQ access parameters and provides factory methods that read them
 * from configuration files.
 */
public class MQConfig {

    /** Classpath resource holding the MQ and server configuration. */
    private static final String ACTIONPATH = "config.properties";

    private static MQConfig instance = null;

    private String MQ_MANAGER = null;
    private String MQ_HOST_NAME = null;
    private String MQ_CHANNEL = null;
    private String MQ_QUEUE_NAME = null;
    private String MQ_PROT = null;
    private String MQ_CCSID = null;
    private String MQ_QUEUE_SUB = null;
    private String FILE_DIR = null;

    public String getFILE_DIR() {
        return FILE_DIR;
    }

    public void setFILE_DIR(String fILE_DIR) {
        FILE_DIR = fILE_DIR;
    }

    public String getMQ_MANAGER() {
        return MQ_MANAGER;
    }

    public void setMQ_MANAGER(String mq_manager) {
        MQ_MANAGER = mq_manager;
    }

    public String getMQ_HOST_NAME() {
        return MQ_HOST_NAME;
    }

    public void setMQ_HOST_NAME(String mq_host_name) {
        MQ_HOST_NAME = mq_host_name;
    }

    public String getMQ_CHANNEL() {
        return MQ_CHANNEL;
    }

    public void setMQ_CHANNEL(String mq_channel) {
        MQ_CHANNEL = mq_channel;
    }

    public String getMQ_QUEUE_NAME() {
        return MQ_QUEUE_NAME;
    }

    public void setMQ_QUEUE_NAME(String mq_queue_name) {
        MQ_QUEUE_NAME = mq_queue_name;
    }

    public String getMQ_PROT() {
        return MQ_PROT;
    }

    public void setMQ_PROT(String mq_prot) {
        MQ_PROT = mq_prot;
    }

    public String getMQ_CCSID() {
        return MQ_CCSID;
    }

    public void setMQ_CCSID(String mq_ccsid) {
        MQ_CCSID = mq_ccsid;
    }

    public void setMQ_QUEUE_SUB(String mQ_QUEUE_SUB) {
        MQ_QUEUE_SUB = mQ_QUEUE_SUB;
    }

    public String getMQ_QUEUE_SUB() {
        return MQ_QUEUE_SUB;
    }

    /**
     * Returns the shared configuration, loading it with
     * {@link #getNewDbConfig()} on first use.
     *
     * @return the shared instance
     */
    public static MQConfig getInstance() {
        if (instance == null) {
            instance = getNewDbConfig();
        }
        return instance;
    }

    /**
     * Opens the MQ configuration file whose path is stored under the
     * {@code mqcfgPath} key of the {@code config} resource bundle.
     *
     * @return the bundle read from that file, or {@code null} if the file
     *         cannot be read (the stack trace is printed)
     */
    public static ResourceBundle getCFG() {
        ResourceBundle pathCfg = ResourceBundle.getBundle("config");
        String cfgPath = RenameUtil.getParameter(pathCfg, "mqcfgPath");
        // The bundle is fully populated by the constructor, so the stream
        // can be closed as soon as it returns.
        try (InputStream in = new BufferedInputStream(new FileInputStream(cfgPath))) {
            return new PropertyResourceBundle(in);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Builds a configuration from the un-prefixed keys of the classpath
     * resource {@code config.properties}.
     *
     * @return a new configuration; all values are {@code null} if the
     *         resource cannot be read (the stack trace is printed)
     */
    public static MQConfig getNewDbConfig() {
        try {
            return fromProperties(loadClasspathProperties(), "");
        } catch (IOException e) {
            e.printStackTrace();
            return new MQConfig();
        }
    }

    /**
     * Builds a configuration from the bundle returned by {@link #getCFG()}.
     *
     * @return a new configuration; values that could not be read stay
     *         {@code null} (the stack trace is printed)
     */
    public static MQConfig MqConfig() {
        MQConfig dc = new MQConfig();
        ResourceBundle rb = getCFG();
        try {
            dc.MQ_CCSID = RenameUtil.getParameter(rb, "MQ_CCSID");
            dc.MQ_CHANNEL = RenameUtil.getParameter(rb, "MQ_CHANNEL");
            dc.MQ_HOST_NAME = RenameUtil.getParameter(rb, "MQ_HOST_NAME");
            dc.MQ_MANAGER = RenameUtil.getParameter(rb, "MQ_MANAGER");
            dc.MQ_PROT = RenameUtil.getParameter(rb, "MQ_PROT");
            dc.MQ_QUEUE_NAME = RenameUtil.getParameter(rb, "MQ_QUEUE_NAME");
            dc.MQ_QUEUE_SUB = RenameUtil.getParameter(rb, "MQ_QUEUE_SUB");
            dc.FILE_DIR = RenameUtil.getParameter(rb, "FILE_DIR");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return dc;
    }

    /**
     * Builds a configuration from the keys of {@code config.properties} that
     * are prefixed with {@code key + "_"}, e.g. {@code 002_MQ_MANAGER}.
     *
     * @param key prefix selecting the group of settings
     * @return a new configuration, or {@code null} if the resource cannot be
     *         read
     */
    public static MQConfig getNewDbConfigFromKey(String key) {
        try {
            return fromProperties(loadClasspathProperties(), key + "_");
        } catch (IOException e) {
            return null;
        }
    }

    /**
     * Looks up a single value in the classpath resource
     * {@code config.properties}.
     *
     * @param key property name
     * @return the property value, or {@code null} if the key is absent or the
     *         resource cannot be read (the stack trace is printed)
     */
    public static String getValueByKey(String key) {
        try {
            return loadClasspathProperties().getProperty(key);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String args[]) throws Exception {
        MQConfig mc = MQConfig.getNewDbConfigFromKey("002");
        System.out.println(mc);
    }

    /** Loads {@code config.properties} from the classpath and closes the stream. */
    private static Properties loadClasspathProperties() throws IOException {
        try (InputStream in = MQConfig.class.getClassLoader().getResourceAsStream(ACTIONPATH)) {
            // getResourceAsStream signals a missing resource with null, which
            // Properties.load would turn into a NullPointerException.
            if (in == null) {
                throw new FileNotFoundException(ACTIONPATH + " not found on the classpath");
            }
            Properties prop = new Properties();
            prop.load(in);
            return prop;
        }
    }

    /** Creates a configuration from the properties named {@code prefix + <field name>}. */
    private static MQConfig fromProperties(Properties prop, String prefix) {
        MQConfig dc = new MQConfig();
        dc.MQ_MANAGER = prop.getProperty(prefix + "MQ_MANAGER");
        dc.MQ_CCSID = prop.getProperty(prefix + "MQ_CCSID");
        dc.MQ_CHANNEL = prop.getProperty(prefix + "MQ_CHANNEL");
        dc.MQ_HOST_NAME = prop.getProperty(prefix + "MQ_HOST_NAME");
        dc.MQ_PROT = prop.getProperty(prefix + "MQ_PROT");
        dc.MQ_QUEUE_NAME = prop.getProperty(prefix + "MQ_QUEUE_NAME");
        dc.MQ_QUEUE_SUB = prop.getProperty(prefix + "MQ_QUEUE_SUB");
        dc.FILE_DIR = prop.getProperty(prefix + "FILE_DIR");
        return dc;
    }
}
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/10150.html