/**
*
*/
package activemq.service;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class Producer implements Runnable {
private static Logger logger = LoggerFactory.getLogger(Producer.class);//slf4j logging
public Producer(String name){
Thread thread = new Thread(this);
thread.setName(Producer.class.getName() + "-" + name);
thread.start();
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
this.init();
int i = 10;
while( i-- > 0 ){
try {
this.sendText("hello," + i);
} catch (JMSException e) {
e.printStackTrace();
} finally{
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
this.close();
}
/**
* @param args
*/
public static void main(String[] args) {
new Producer("" + new Random().nextInt());
}
/**
* 初始化相关参数
*/
private void init(){
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
try {
connection = factory.createConnection();
logger.info("start connection");
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// destination = session.createTopic(TOPIC_NAME);
destination = session.createQueue(QUEUE_NAME);
producer = session.createProducer(destination);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void close(){
try {
logger.info("close connection");
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void sendText(String msg) throws JMSException{
TextMessage message = session.createTextMessage(msg);
message.setStringProperty("headname", "remoteB");
logger.info("send:" + msg);
producer.send(message);
}
private Destination destination;
private MessageProducer producer;
private Session session;
private Connection connection;
private static final String url = "tcp://localhost:61616";
private static final String QUEUE_NAME = "FOO.BA";
private static final String TOPIC_NAME = "FOO.TOPIC";
protected String expectedBody = "<hello>world!</hello>";
}
|
/**
*
*/
package com.topsci.xxxxxx.provider.service;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
class TestClient {
private static Logger logger = LoggerFactory.getLogger(TestClient.class);//slf4j logging
// The host:port combination to connect to
private InetAddress hostAddress;
private int port;
private SocketChannel client;
protected Charset charset = Charset.forName("UTF-8");
protected CharsetEncoder charsetEncoder = charset.newEncoder();
protected CharsetDecoder charsetDecoder = charset.newDecoder();
public TestClient(InetAddress hostAddress, int port) throws IOException{
this.hostAddress = hostAddress;
this.port = port;
// Create a non-blocking socket channel
client = SocketChannel.open();
// client.configureBlocking(false);
// client.register(this.selector, SelectionKey.OP_CONNECT);
// Kick off connection establishment
client.connect(new InetSocketAddress(this.hostAddress, this.port));
}
private void request(String msg){
ByteBuffer buffer = ByteBuffer.allocate(100);
try {
this.client.write(charsetEncoder.encode(CharBuffer.wrap(msg)));
int len = this.client.read(buffer);
if (len > 0){
buffer.flip();
logger.info(charsetDecoder.decode(buffer).toString());
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @param args
* @throws IOException
* @throws UnknownHostException
*/
public static void main(String[] args) throws UnknownHostException, IOException {
// TODO Auto-generated method stub
TestClient client = new TestClient(InetAddress.getByName("localhost"), 9999);
Random random = new Random();
for (int i = 5; i > 0; i--){
String req = "请求编号 " + random.nextInt();
client.request(req);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
|
/**
*
*/
package com.topsci.xxxxxx.provider.service;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* 监听客户端请求
*/
class ListenServer implements Runnable {
private static Logger logger = LoggerFactory.getLogger(ListenServer.class);//slf4j logging
/**
* 检查当前运行状态
* @return
*/
public boolean isRunning(){
return this.isRunning;
}
/**
* 设置是否继续运行
* @param run
*/
public void setRun(boolean run){
this.canRun = run;
}
/**
*
* 构造函数
*/
public ListenServer(int port,ListenServerHandler handler) throws IOException {
activeSockets = 0;
this.port = port;// 初始化服务器Channel绑定的端口
this.handler = handler;
selector = Selector.open();// 初始化Selector对象
server = ServerSocketChannel.open();// 初始化服务器Channel对象
ServerSocket socket = server.socket();// 获取服务器Channel对应的//ServerSocket对象
socket.bind(new InetSocketAddress(this.port));// 把Socket绑定到监听端口9999上
server.configureBlocking(false);// 将服务器Channel设置为非阻塞模式
server.register(selector, SelectionKey.OP_ACCEPT); // 将服务器Channel注册到Selector对象,并指出服务器Channel所感兴趣的事件为可接受请求操作
}
/**
* 判断客户端状态处理客户请求
*/
public void run() {
logger.info("listening...");
while (this.canRun) {
this.isRunning = true;
try {
/**
* 应用Select机制轮循是否有用户感兴趣的新的网络事件发生,当没有
* 新的网络事件发生时,此方法会阻塞,直到有新的网络事件发生为止
*/
selector.select();
} catch (IOException e) {
logger.warn(e.getMessage());
continue;// 当有异常发生时,继续进行循环操作
}
/**
* 得到活动的网络连接选择键的集合
*/
Set<SelectionKey> keys = selector.selectedKeys();
activeSockets = keys.size();// 获取活动连接的数目
/**
* 应用For—Each循环遍历整个选择键集合
*/
for (SelectionKey key : keys) {
if (this.handleEvent(key)) keys.remove(key);//成功处理则移除键
}
}
this.isRunning = false;
logger.info("finish listen");
}
/**
* 根据请求类型进行相应处理
* @param key
* @return 成功处理返回true,否则返回false
*/
private boolean handleEvent(SelectionKey key){
boolean rt = false;
if (key.isAcceptable()) {//如果关键字状态是为可接受,则接受连接,注册通道,以接受更多的* 事件,进行相关的服务器程序处理
logger.debug("accept a client:" + key.toString());
rt = doServerSocketEvent(key);
} else if (key.isReadable()) {//如果关键字状态为可读,则说明Channel是一个客户端的连接通道, 进行相应的读取客户端数据的操作
rt = doClientReadEvent(key);
}else if (key.isWritable()) { //如果关键字状态为可写,则也说明Channel是一个客户端的连接通道, 进行相应的向客户端写数据的操作
// logger.debug("can write");
// rt = doClinetWriteEvent(key);
}
return rt;
}
/**
* 处理服务器事件操作
*
* @param key
* 服务器选择键对象
*/
private boolean doServerSocketEvent(SelectionKey key) {
SocketChannel client = null;
try {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
client = server.accept();
if (client != null){
client.configureBlocking(false);// 将客户端Channel设置为非阻塞型
//将客户端Channel注册到Selector对象上,并且指出客户端Channel所感 兴趣的事件为可读和可写
client.register(selector, SelectionKey.OP_READ);
}
} catch (IOException e) {
try {
client.close();
} catch (IOException e1) {
logger.warn(e1.getMessage());
}
}
return true;
}
/**
* 进行读取客户端请求或数据信息
*
* @param key
* 客户端选择键对象
*/
private boolean doClientReadEvent(SelectionKey key) {
SocketChannel client = (SocketChannel) key.channel();
//读取数据
String clientMsg = null;
if (client != null){
ByteBuffer buffer = ByteBuffer.allocate(100);
try {
int len = client.read(buffer);
if (len > 0){
buffer.flip();
clientMsg = charsetDecoder.decode(buffer).toString();
logger.debug("read client:" + clientMsg);
}
} catch (IOException e) {
try {
client.close();
} catch (IOException e1) {
logger.error(e1.getMessage());
}
logger.warn(e.getMessage());
}
}
//数据交由handler处理并返回处理结果数据
if (clientMsg != null){
Map<String, Object> response = this.handler.handle(clientMsg);
this.doResponse(client, response, key);
}
return true;
}
/**
* 根据handler返回的结果进行处理
* @param client 客户
* @param response 结果
* @param key 键
*/
private void doResponse(SocketChannel client, Map<String, Object> response, SelectionKey key){
//判断结果并做响应
switch ((ListenServerHandlerFlag)response.get(ListenServerHandler.RESULT_FLAG)){
case refuse:{//拒绝此客户,断开与其连接
try {
client.close();
} catch (IOException e) {
logger.error(e.getMessage());
}
key.cancel();
logger.debug("refuse this client");
}
case error:{//出现异常
logger.debug("exception occur");
this.responseErrorDataToClient(response, client);
}
case ok:{//成功处理,将rt返回给client
byte[] toClient = (byte[]) response.get(ListenServerHandler.RESULT_RT);
if (toClient != null){
this.responseDataToClient(toClient, client);
}
logger.debug("request handle ok");
}
}
}
/**
* 将数据返回给客户
* @param data
* @param client
*/
private void responseDataToClient(byte[] data, SocketChannel client){
try {
client.write(ByteBuffer.wrap(data));
} catch (CharacterCodingException e) {
logger.warn(e.getMessage());
} catch (IOException e) {
logger.warn(e.getMessage());
try {
client.close();
} catch (IOException e1) {
logger.error(e1.getMessage());
}
}
}
/**
* 将错误消息返回给客户
* @param data
* @param client
*/
private void responseErrorDataToClient(Map<String, Object> response, SocketChannel client){
// TODO 暂时未实
String errorMsg = (String)response.get(ListenServerHandler.RESULT_ERROR_MSG);
if (errorMsg != null){
}
}
/**
* 进行向客户端写数据操作
*
* @param key
* 客户端选择键对象
*/
private boolean doClinetWriteEvent(SelectionKey key) {
logger.info("do nothing");
return true;
}
/**
* 服务器Channel对象,负责接受用户连接
*/
private ServerSocketChannel server;
/**
* Selector对象,负责监控所有的连接到服务器的网络事件的发生
*/
private Selector selector;
/**
* 总的活动连接数
*/
private int activeSockets;
/**
* 服务器Channel绑定的端口号
*/
private int port;
private ListenServerHandler handler;//消息处理器
protected Charset charset = Charset.forName("UTF-8");
protected CharsetEncoder charsetEncoder = charset.newEncoder();
protected CharsetDecoder charsetDecoder = charset.newDecoder();
private boolean canRun;//是否继续运行的标志
private boolean isRunning;//是否正在运行的标志
public static void main(String [] argu) throws IOException{
ListenServer server = new ListenServer(9999,new ListenServerHandler(Charset.forName("UTF-8")));
Thread t1 = new Thread(server);
// t1.setDaemon(true);
t1.start();
}
}
|
/**
* 方法调用代理
* 负责采用反射方式调用指定序号的模型识别函数recogniseItemModelX,其中X为序号
* @param i 方法序号
* @param item 方法执行时的参数
* @return 能够识别则写入识别的结果(成功或错误消息);否则返回null
*/
private List<Map<String,Object>> invokeRecogniseItemModelMethod(int i, String param){
Object result = null;
try {
Class<?> clazz = this.getClass();
Method method = clazz.getDeclaredMethod("recogniseItemModel" + i, String.class);
result = method.invoke(this, new Object[]{param});
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
}
return (List<Map<String,Object>>) result;
}
/**
* 根据bean的类型获取bean实例
* @param type 类型
* @return 获取失败返回null
*/
public <T> T getBean(T type){
T rt = null;
try {
String className = this.contextPath + ".ObjectFactory";
Class clazz = Class.forName(className);
String methodName = type.toString();
//处理方法名称
Pattern p = Pattern.compile("(?<=[.])[A-Z]\\w*$");
Matcher m = p.matcher(methodName);
if (m.find()) methodName = "create" + m.group();
else throw new NoSuchMethodError();
//获取创建bean的方法
Method method = clazz.getMethod(methodName);
Object obj = method.invoke(clazz.newInstance());
rt = (T) obj;
} catch (Exception e) {
e.printStackTrace();
}
return rt;
}
|