`
maimode
  • 浏览: 412383 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论
收藏列表
标题 标签 来源
ActiveMQ consumer
/**
 * 
 */
package activemq.service;

import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Demo JMS consumer that polls an ActiveMQ topic from its own thread.
 *
 * <p>The constructor starts a thread which connects to the broker at
 * {@code tcp://localhost:61616}, subscribes to {@code FOO.TOPIC}, polls for
 * a message up to 100 times (5 seconds apart) and then closes the connection.
 *
 * <p>NOTE(review): the {@code Producer} snippet that accompanies this class
 * sends to the queue {@code FOO.BA}, not to this topic, so the two will not
 * exchange messages unless one of them is switched — confirm which
 * destination is intended.
 */
public class Consumer implements Runnable {
	private static final Logger logger = LoggerFactory.getLogger(Consumer.class);//slf4j logging
	
	/**
	 * Creates the consumer and immediately starts its worker thread.
	 *
	 * @param name suffix appended to the worker thread's name
	 */
	public Consumer(String name){
		Thread thread = new Thread(this);
		thread.setName(Consumer.class.getName() + "-" + name);
		thread.start();
	}
	
	/**
	 * Connects, polls for messages and always closes the connection afterwards.
	 *
	 * @see java.lang.Runnable#run()
	 */
	@Override
	public void run() {
		try {
			// Without a usable consumer every later call would fail with a
			// NullPointerException, so give up early.
			if (!this.init()) {
				return;
			}
			int i = 100;
			while( i-- > 0 ){
				this.consumeTextMessage();
				try {
					Thread.sleep(1000 * 5);
				} catch (InterruptedException e) {
					// Restore the flag and stop: an interrupt is a request to shut down.
					Thread.currentThread().interrupt();
					logger.warn("interrupted while waiting, stop consuming");
					break;
				}
			}
		} finally {
			this.close();
		}
	}
	
	/**
	 * Starts a consumer whose thread name carries a random suffix.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		new Consumer("" + new Random().nextInt());
	}
	
	/**
	 * Waits up to one second for a message and hands it to {@link #onMessage(Message)}.
	 */
	private void consumeTextMessage(){
		try {
			// receive(timeout) returns null when nothing arrived in time;
			// onMessage ignores null because of its instanceof check.
			Message message = consumer.receive(1000);
			this.onMessage(message);
		} catch (JMSException e) {
			logger.error("failed to receive message", e);
		}
	}
	
	/**
	 * Logs the body of a text message; any other message (or null) is ignored.
	 *
	 * @param message the received message, may be null
	 */
	private void onMessage(Message message){
		try {
			if (message instanceof TextMessage) {
				TextMessage txtMsg = (TextMessage) message;
				String msg = txtMsg.getText();
				logger.info("receive:" + msg);
			}
		} catch (JMSException e) {
			logger.error("failed to read message body", e);
		}
	}
	
	/**
	 * Opens the connection, session and topic consumer.
	 *
	 * @return true when the consumer is ready, false when any JMS call failed
	 */
	private boolean init(){
		ConnectionFactory factory = new ActiveMQConnectionFactory(url);
		try {
			connection = factory.createConnection();
			logger.info("start connection");
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// To consume from the queue instead, use session.createQueue(QUEUE_NAME).
			topic = session.createTopic(TOPIC_NAME);
			destination = topic;
			consumer = session.createConsumer(destination);
			return true;
		} catch (JMSException e) {
			logger.error("failed to initialise JMS consumer", e);
			return false;
		}
	}
	
	/**
	 * Closes the connection if one was opened.
	 */
	private void close(){
		if (connection == null) {
			// init() failed before a connection existed; nothing to release.
			return;
		}
		try {
			logger.info("close connection");
			connection.close();
		} catch (JMSException e) {
			logger.error("failed to close connection", e);
		}
	}
	
	private Topic topic;
	private Destination destination;
	private MessageConsumer consumer;
	private Session session;
	private Connection connection;
	
	private static final String url = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "FOO.BA";
	private static final String TOPIC_NAME = "FOO.TOPIC";
}
ActiveMQ producer
/**
 * 
 */
package activemq.service;

import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Demo JMS producer that sends text messages to an ActiveMQ queue from its
 * own thread.
 *
 * <p>The constructor starts a thread which connects to the broker at
 * {@code tcp://localhost:61616}, sends ten messages to the queue
 * {@code FOO.BA} (10 seconds apart) and then closes the connection.
 */
public class Producer implements Runnable {
	private static final Logger logger = LoggerFactory.getLogger(Producer.class);//slf4j logging
	
	/**
	 * Creates the producer and immediately starts its worker thread.
	 *
	 * @param name suffix appended to the worker thread's name
	 */
	public Producer(String name){
		Thread thread = new Thread(this);
		thread.setName(Producer.class.getName() + "-" + name);
		thread.start();
	}
	
	
	/**
	 * Connects, sends the messages and always closes the connection afterwards.
	 *
	 * @see java.lang.Runnable#run()
	 */
	@Override
	public void run() {
		try {
			// Without a session/producer sendText would fail with a
			// NullPointerException, so give up early.
			if (!this.init()) {
				return;
			}
			int i = 10;
			while( i-- > 0 ){
				try {
					this.sendText("hello," + i);
				} catch (JMSException e) {
					logger.error("failed to send message", e);
				}
				// Pause after every attempt, whether or not the send succeeded.
				try {
					Thread.sleep(1000 * 10);
				} catch (InterruptedException e) {
					// Restore the flag and stop: an interrupt is a request to shut down.
					Thread.currentThread().interrupt();
					logger.warn("interrupted while waiting, stop producing");
					break;
				}
			}
		} finally {
			this.close();
		}
	}
	
	/**
	 * Starts a producer whose thread name carries a random suffix.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		new Producer("" + new Random().nextInt());
	}
	
	/**
	 * Opens the connection, session and queue producer.
	 *
	 * @return true when the producer is ready, false when any JMS call failed
	 */
	private boolean init(){
		ConnectionFactory factory = new ActiveMQConnectionFactory(url);
		try {
			connection = factory.createConnection();
			logger.info("start connection");
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// To publish to the topic instead, use session.createTopic(TOPIC_NAME).
			destination = session.createQueue(QUEUE_NAME);
			producer = session.createProducer(destination);
			return true;
		} catch (JMSException e) {
			logger.error("failed to initialise JMS producer", e);
			return false;
		}
	}
	
	/**
	 * Closes the connection if one was opened.
	 */
	private void close(){
		if (connection == null) {
			// init() failed before a connection existed; nothing to release.
			return;
		}
		try {
			logger.info("close connection");
			connection.close();
		} catch (JMSException e) {
			logger.error("failed to close connection", e);
		}
	}
	
	/**
	 * Sends one text message carrying the string property {@code headname=remoteB}.
	 *
	 * @param msg message body
	 * @throws JMSException if the message cannot be created or sent
	 */
	private void sendText(String msg) throws JMSException{
		TextMessage message = session.createTextMessage(msg);
		message.setStringProperty("headname", "remoteB");
		logger.info("send:" + msg);
		producer.send(message);
	}
	
	private Destination destination;
	private MessageProducer producer;
	private Session session;
	private Connection connection;
	
	private static final String url = "tcp://localhost:61616";
	private static final String QUEUE_NAME = "FOO.BA";
	private static final String TOPIC_NAME = "FOO.TOPIC";
	protected String expectedBody = "<hello>world!</hello>";
}
NIO client
/**
 * 
 */
package com.topsci.xxxxxx.provider.service;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Random;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Minimal blocking socket client used to exercise the listen server: it
 * sends a UTF-8 text request and logs whatever the server answers.
 */
class TestClient {
	private static final Logger logger = LoggerFactory.getLogger(TestClient.class);//slf4j logging

	// The host:port combination to connect to
	private InetAddress hostAddress;
	private int         port;
	private SocketChannel client;

	protected Charset charset = Charset.forName("UTF-8");
	protected CharsetEncoder charsetEncoder = charset.newEncoder();
	protected CharsetDecoder charsetDecoder = charset.newDecoder();

	/**
	 * Opens a channel and connects it to the given endpoint.
	 *
	 * @param hostAddress server address
	 * @param port server port
	 * @throws IOException if the channel cannot be opened or connected
	 */
	public TestClient(InetAddress hostAddress, int port) throws IOException{
		this.hostAddress = hostAddress;
		this.port = port;

		// The channel stays in its default blocking mode, so connect()
		// returns only once the connection is established.
		client = SocketChannel.open();
		boolean connected = false;
		try {
			client.connect(new InetSocketAddress(this.hostAddress, this.port));
			connected = true;
		} finally {
			// Do not leak the channel when the connection attempt fails.
			if (!connected) {
				this.close();
			}
		}
	}

	/**
	 * Closes the underlying channel; a failure to close is only logged.
	 */
	public void close(){
		try {
			client.close();
		} catch (IOException e) {
			logger.warn("failed to close channel", e);
		}
	}

	/**
	 * Sends one request and logs the response obtained from a single read.
	 *
	 * @param msg request text, encoded as UTF-8
	 */
	private void request(String msg){
		// NOTE(review): only one read of at most 100 bytes is performed, so a
		// longer or fragmented response is logged only in part.
		ByteBuffer buffer = ByteBuffer.allocate(100);
		try {
			this.client.write(charsetEncoder.encode(CharBuffer.wrap(msg)));
			int len = this.client.read(buffer);
			if (len > 0){
				buffer.flip();
				logger.info(charsetDecoder.decode(buffer).toString());
			} else if (len < 0){
				// read() returns -1 once the peer has closed the connection.
				logger.warn("connection closed by server");
			}
		} catch (IOException e) {
			logger.error("request failed", e);
		}
	}


	/**
	 * Sends five numbered requests to localhost:9999, five seconds apart.
	 *
	 * @param args unused
	 * @throws IOException if the connection cannot be established
	 * @throws UnknownHostException if localhost cannot be resolved
	 */
	public static void main(String[] args) throws UnknownHostException, IOException {
		TestClient client = new TestClient(InetAddress.getByName("localhost"), 9999);
		Random random = new Random();

		try {
			for (int i = 5; i > 0; i--){
				String req = "请求编号 " + random.nextInt();
				client.request(req);
				try {
					Thread.sleep(5000);
				} catch (InterruptedException e) {
					// Restore the flag and stop sending.
					Thread.currentThread().interrupt();
					break;
				}
			}
		} finally {
			client.close();
		}
	}
}
NIO server
/**
 * 
 */
package com.topsci.xxxxxx.provider.service;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Selector-based server that listens for client requests.
 *
 * <p>Accepted clients are registered for read events; each chunk read from a
 * client is decoded as UTF-8, passed to the {@link ListenServerHandler} and
 * the handler's result decides whether to answer, report an error or drop
 * the client.
 */
class ListenServer implements Runnable {
	private static final Logger logger = LoggerFactory.getLogger(ListenServer.class);//slf4j logging
	
	/**
	 * Reports whether the listen loop is currently executing.
	 *
	 * @return true while {@link #run()} is inside its loop
	 */
	public boolean isRunning(){
		return this.isRunning;
	}
	
	/**
	 * Sets whether the listen loop should keep running.
	 *
	 * @param run false asks the loop to stop; true allows it to run
	 */
	public void setRun(boolean run){
		this.canRun = run;
		if (!run){
			// The loop is usually blocked in select(); wake it so that it
			// notices the flag instead of waiting for the next network event.
			selector.wakeup();
		}
	}
		
	/**
	 * Opens the selector and a non-blocking server channel bound to the port.
	 *
	 * @param port port to listen on
	 * @param handler processor for decoded client messages
	 * @throws IOException if the selector or server channel cannot be set up
	 */
	public ListenServer(int port,ListenServerHandler handler) throws IOException {
		activeSockets = 0;
		this.port = port;// port the server channel is bound to
		this.handler = handler;
		selector = Selector.open();
		server = ServerSocketChannel.open();
		ServerSocket socket = server.socket();// ServerSocket backing the server channel
		socket.bind(new InetSocketAddress(this.port));
		server.configureBlocking(false);
		// The server channel is only interested in incoming connections.
		server.register(selector, SelectionKey.OP_ACCEPT);
	}

	/**
	 * Runs the select loop until {@link #setRun(boolean) setRun(false)} is called.
	 */
	public void run() {
		logger.info("listening...");
		try {
			while (this.canRun) {
				this.isRunning = true;
				try {
					// Blocks until at least one registered channel is ready
					// or wakeup() is called.
					selector.select();
				} catch (IOException e) {
					logger.warn(e.getMessage());
					continue;// keep looping after a failed select
				}
				
				Set<SelectionKey> keys = selector.selectedKeys();
				activeSockets = keys.size();// number of keys ready in this pass
				
				// Remove through the iterator: removing from the set inside a
				// for-each loop throws ConcurrentModificationException as soon
				// as more than one key is ready. Every key is removed, because
				// a key left in the selected set would be dispatched again on
				// the next pass.
				java.util.Iterator<SelectionKey> it = keys.iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();
					if (key.isValid()) {
						this.handleEvent(key);
					}
				}
			}
		} finally {
			this.isRunning = false;
		}
		logger.info("finish listen");
	}
	
	/**
	 * Dispatches a ready key according to its ready operation.
	 *
	 * @param key the selected key
	 * @return true when the event was handled, false otherwise
	 */
	private boolean handleEvent(SelectionKey key){
		boolean rt = false;
		if (key.isAcceptable()) {// a new client is waiting to be accepted
			logger.debug("accept a client:" + key.toString());
			rt = doServerSocketEvent(key);
		} else if (key.isReadable()) {// a client channel has data to read
			rt = doClientReadEvent(key);
		}else if (key.isWritable()) { // write events are currently not handled
//			logger.debug("can write");
//			rt = doClinetWriteEvent(key);
		}
		return rt;
	}

	/**
	 * Accepts a pending connection and registers it for read events.
	 *
	 * @param key selection key of the server channel
	 * @return always true
	 */
	private boolean doServerSocketEvent(SelectionKey key) {
		SocketChannel client = null;
		try {
			ServerSocketChannel server = (ServerSocketChannel) key.channel();
			client = server.accept();
			if (client != null){
				client.configureBlocking(false);
				client.register(selector, SelectionKey.OP_READ);
			}
		} catch (IOException e) {
			logger.warn(e.getMessage());
			// accept() itself may have failed, in which case there is no
			// client channel to close.
			if (client != null){
				this.closeClient(client);
			}
		}
		return true;
	}
	
	/**
	 * Reads one chunk from the client, lets the handler process it and
	 * responds according to the handler's result.
	 *
	 * @param key selection key of the client channel
	 * @return always true
	 */
	private boolean doClientReadEvent(SelectionKey key) {
		SocketChannel client = (SocketChannel) key.channel();
		String clientMsg = null;
		if (client != null){
			// NOTE(review): a message longer than 100 bytes, or a multi-byte
			// character split across two reads, is not reassembled here.
			ByteBuffer buffer = ByteBuffer.allocate(100);
			try {
				int len = client.read(buffer);
				if (len > 0){
					buffer.flip();  
					clientMsg = charsetDecoder.decode(buffer).toString(); 
					logger.debug("read client:" + clientMsg);
				} else if (len < 0){
					// -1 means the peer closed the connection. The channel
					// must be closed, otherwise the selector reports it as
					// readable on every pass and the loop spins.
					this.closeClient(client);
					key.cancel();
				}
			} catch (IOException e) {
				this.closeClient(client);
				logger.warn(e.getMessage());
			}
		} 
		
		// Hand the message to the handler and act on its result.
		if (clientMsg != null){
			Map<String, Object> response = this.handler.handle(clientMsg);
			this.doResponse(client, response, key);
		}
		
		return true;
	}
	
	/**
	 * Acts on the result map returned by the handler.
	 *
	 * @param client the client channel
	 * @param response handler result; its flag selects the action
	 * @param key selection key of the client channel
	 */
	private void doResponse(SocketChannel client, Map<String, Object> response, SelectionKey key){
		Object flag = response == null ? null : response.get(ListenServerHandler.RESULT_FLAG);
		if (!(flag instanceof ListenServerHandlerFlag)){
			logger.warn("handler returned no result flag");
			return;
		}
		// Each case ends with break: without it a refused client would also
		// run the error and ok branches and be written to after being closed.
		switch ((ListenServerHandlerFlag) flag){
		case refuse:{// reject this client and drop its connection
			this.closeClient(client);
			key.cancel();
			logger.debug("refuse this client");
			break;
		}
		case error:{// the handler reported a failure
			logger.debug("exception occur");
			this.responseErrorDataToClient(response, client);
			break;
		}
		case ok:{// handled successfully, send the result bytes back
			byte[] toClient = (byte[]) response.get(ListenServerHandler.RESULT_RT);
			if (toClient != null){
				this.responseDataToClient(toClient, client);	
			}
			logger.debug("request handle ok");
			break;
		}
		default:
			break;
		}
	}
	
	/**
	 * Writes the given bytes to the client; the client is closed when the
	 * write fails.
	 *
	 * @param data bytes to send
	 * @param client the client channel
	 */
	private void responseDataToClient(byte[] data, SocketChannel client){
		try {
			// NOTE(review): the channel is non-blocking, so a single write may
			// send only part of the data; the remainder is not retried.
			client.write(ByteBuffer.wrap(data));
		} catch (IOException e) {
			logger.warn(e.getMessage());
			this.closeClient(client);
		}
	}
	
	/**
	 * Sends an error message to the client. Not implemented yet: the message
	 * is looked up but nothing is written.
	 *
	 * @param response handler result that may carry an error message
	 * @param client the client channel
	 */
	private void responseErrorDataToClient(Map<String, Object> response, SocketChannel client){
		// TODO not implemented yet
		String errorMsg = (String)response.get(ListenServerHandler.RESULT_ERROR_MSG);
		if (errorMsg != null){
			
		}
	}

	/**
	 * Placeholder for writing to a client on a write-ready event; currently
	 * only logs.
	 *
	 * @param key selection key of the client channel
	 * @return always true
	 */
	private boolean doClinetWriteEvent(SelectionKey key) {
		logger.info("do nothing");
		return true;
	}
	
	/**
	 * Closes a client channel; a failure to close is only logged.
	 *
	 * @param client the channel to close
	 */
	private void closeClient(SocketChannel client){
		try {
			client.close();
		} catch (IOException e) {
			logger.error(e.getMessage());
		}
	}
	
	/**
	 * Server channel that accepts client connections.
	 */
	private ServerSocketChannel server;
	/**
	 * Selector watching the server channel and all accepted client channels.
	 */
	private Selector selector;
	/**
	 * Number of selected keys seen in the most recent select pass.
	 */
	private int activeSockets;
	/**
	 * Port the server channel is bound to.
	 */
	private int port;
	private ListenServerHandler handler;// message processor
	
	protected Charset charset = Charset.forName("UTF-8");  
	protected CharsetEncoder charsetEncoder = charset.newEncoder();  
	protected CharsetDecoder charsetDecoder = charset.newDecoder(); 
	
	// Both flags are shared between the listen thread and callers of
	// setRun()/isRunning(), hence volatile. canRun starts as true so that a
	// freshly constructed server actually listens when its thread is started.
	private volatile boolean canRun = true;// whether the loop may keep running
	private volatile boolean isRunning;// whether the loop is executing right now
	
	
	/**
	 * Starts a server on port 9999 with a UTF-8 handler.
	 *
	 * @param argu unused
	 * @throws IOException if the server cannot be set up
	 */
	public static void main(String [] argu) throws IOException{
		ListenServer server = new ListenServer(9999,new ListenServerHandler(Charset.forName("UTF-8")));
		Thread t1 = new Thread(server);
//		t1.setDaemon(true);
		t1.start();
	}
}
反射代码
	/**
	 * Method-invocation proxy.
	 * Uses reflection to call the model recognition method
	 * {@code recogniseItemModelX} declared on this object's class, where X is
	 * the given index.
	 *
	 * @param i index of the method to call
	 * @param param argument passed to the method
	 * @return the value returned by the invoked method, or null when the
	 *         method could not be found or the call failed
	 */
	@SuppressWarnings("unchecked")
	private List<Map<String,Object>> invokeRecogniseItemModelMethod(int i, String param){
		Object result = null;
		try {
			Class<?> clazz = this.getClass();
			Method method = clazz.getDeclaredMethod("recogniseItemModel" + i, String.class);
			result = method.invoke(this, new Object[]{param});
		} catch (Exception e) {
			// Pass the exception itself to the logger: the message of an
			// InvocationTargetException is normally null, so logging only
			// getMessage() would hide the real cause.
			logger.error("failed to invoke recogniseItemModel" + i, e);
		}
		// Unchecked: the target methods are expected to return this list type.
		return (List<Map<String,Object>>) result;
	}




        /**
	 * Obtains a bean instance through the {@code ObjectFactory} class found
	 * under {@code contextPath}.
	 *
	 * <p>The factory method name is {@code "create"} followed by the trailing
	 * simple name in {@code type.toString()}: the part after the last dot,
	 * which must start with an upper-case letter.
	 *
	 * <p>NOTE(review): the result of the factory method is cast to the same
	 * type as the {@code type} argument itself — confirm that callers pass a
	 * value for which that holds.
	 *
	 * @param type value whose string form names the bean type
	 * @return the created bean, or null when it could not be obtained
	 */
	@SuppressWarnings("unchecked")
	public <T> T getBean(T type){
		T rt = null;
		try {
			String className = this.contextPath + ".ObjectFactory";
			Class<?> clazz = Class.forName(className);
			String methodName = type.toString();
			// Derive the factory method name from the trailing simple name.
			Pattern p = Pattern.compile("(?<=[.])[A-Z]\\w*$");
			Matcher m = p.matcher(methodName);
			if (!m.find()) {
				// Must be an Exception, not NoSuchMethodError: an Error would
				// escape the catch below instead of producing a null result.
				throw new NoSuchMethodException("no bean name found in: " + methodName);
			}
			methodName = "create" + m.group();
			// Look up and call the factory method on a new factory instance.
			Method method = clazz.getMethod(methodName);
			Object factory = clazz.getDeclaredConstructor().newInstance();
			Object obj = method.invoke(factory);
			rt = (T) obj;
		} catch (Exception e) {
			e.printStackTrace();
		} 

		return rt;
	}
Global site tag (gtag.js) - Google Analytics