# 组件开发
自定义协议扩展框架
ESB提供了一套扩展框架,专门用于实现定制的Transport或Endpoint。其核心是提供了两个基础抽象类供定制化实现扩展,它们是:
com.primeton.esb.adapter.framework.component.AbstractTransportComponent
用于Transport扩展,定制的Transport需直接或间接地继承此类,并重写其中的几个抽象方法。
com.primeton.esb.adapter.framework.component.AbstractEndpointComponent
用于Endpoint扩展,定制的Endpoint需直接或间接地继承此类,并重写其中的几个抽象方法。
说明:为保证在ESB工程中能找到该实现类,建议直接在ESB Studio中的ESB工程中创建并实现类,而不要用其他开发工具。(实际上,由于ESB Studio是基于Eclipse开发的,
其创建并实现Java类的方法与Eclipse是基本一样的)1.1 Transport实现要点
在transport实现类中,需要重写的几个重要方法如下:
void init() transport初始化时被调用一次。通常在此应读取相关配置参数,初始化运行环境。 void start() transport启动时被调用。此后应开始处理客户端的服务请求。 void stop() transport停止时被调用。此后应停止处理客户端的服务请求。 void handleReply(INormalizedMessage replyMsg) 当ESB向transport返回应答消息时被调用,其中replyMsg参数代表应答消息。此时应根据应答消息的内容向客户端返回应答报文,并结束本次服务请求。 void handleError(INormalizedMessage errMsg) 当ESB向transport返回错误消息时被调用。其中errMsg参数代表错误消息。此时应根据错误消息的内容向客户端返回错误报文,并结束本次服务请求。 void onChange(IComponentConfig cfg) 当transport配置参数发生变化时(如系统管理员在console中改变了参数值并执行热更新)被调用,其中cfg参数代表更新后的配置。此时应根据新的配置内容调整transport的行为。 此外,transport实现中还需调用如下的方法: void handleRequest(INormalizedMessage requestMsg) 将请求消息发送到请求通道。
需要注意的是,为保证ESB系统的性能,通常transport应具备多线程并发处理能力,且ESB的异步处理机制决定了请求和应答会在不同的线程中处理,具体说就是:transport通过调用handleRequest将请求消息发给ESB,而在应答消息返回时,ESB会(通常在另一个线程中)回调handleReply()或handleError()。 因此,如果需要请求和应答处理之间保持特定的状态信息,transport实现需自行实现一套上下文的挂起及恢复机制(某些通信框架本身提供类似机制,可直接利用,如Jetty和JavaNIO)。此外,ESB为每个服务请求生成唯一的ContinuationID标识符字串(可在handleRequest()后从系统头中取得),可以此作为上下文挂起及恢复的依据。
1.2 Endpoint实现要点在Endpoint实现类中,需要重写的几个重要方法如下:
void init() endpoint初始化时被调用一次。通常在此应读取相关配置参数,初始化运行环境。 void start() endpoint启动时被调用。此后应开始处理来自ESB的服务请求。 void stop() endpoint停止时被调用。此后应停止处理来自ESB的服务请求。 void onChange(IComponentConfig cfg) 当endpoint配置参数发生变化时(如系统管理员在console中改变了参数值并执行热更新)被调用,其中cfg参数代表更新后的配置。此时应根据新的配置内容调整endpoint的行为。 void handleRequest(INormalizedMessage requestMsg) 当ESB调用外部服务(即代理服务或中介服务调用业务服务)时被调用。其中requestMsg代表服务请求消息,此时应根据请求消息的内容执行对目标服务器的访问,并根据访问结果生成应答消息或错误消息,进而调用handleReply或handleError返回给ESB。 此外,endpoint实现中可以调用如下方法: void handleReply(INormalizedMessage replyMsg) 将应答消息发送到应答通道。 void handleError(INormalizedMessage errMsg) 将错误消息发送到错误通道。
需要注意的是,为保证ESB系统的性能,通常endpoint应具备多线程并发处理能力,且在请求和应答处理间应采用异步机制,具体说就是:endpoint在处理handleRequest()期间,不宜使线程阻塞于IO访问,而宜采用异步IO的处理方式,尤其在向服务器发送请求之后等待应答的这段时间。 因此,handleReply()或handleError()的调用通常会与handleRequest()的调用处于不同的线程,如果此期间需要保持特定的状态信息,endpoint需自行实现一套上下文的挂起及恢复机制(某些通信框架本身提供类似机制,可直接利用,如apache-httpclient和JavaNIO)。 当然,如果采用简单的同步IO处理,则不需要这些复杂的机制,功能上也可以实现,只是对ESB的整体性能不利。
无论是否采用异步IO处理,向ESB返回的消息(应答消息或错误消息)的系统头中都必须包含如下字段,且内容必须与请求消息中的一致:键值常量 键值字串 值类型 说明 CONTINUATION_ID $tip.continuationId String 服务请求ID。 MEDIA_RETURN_XPATH_NAME $tip.media.returnXpath String 中介服务返回值变量名。在由中介服务调用业务服务时需要,否则为空。 MEDIA_UUID_NAME $tip.media.uuid String 中介服务上下文ID。在由中介服务调用业务服务时需要,否则为空。 REPLY_CHANNEL $tip.reply.channel String 应答通道名。 TCP协议扩展示例
2.1 场景概述本示例基于一个模拟服务端程序及一个模拟客户端程序,分别模拟实施场景中的服务提供者和服务使用者。二者间基于TCP协议通信,其中:
模拟服务端在8888端口监听,当收到客户端连接请求后即建立并处理连接。针对每一个连接,先接收请求报文,再返回应答报文,最后关闭连接。
模拟客户端则主动连接服务器,发送请求报文,接收应答报文,最后关闭连接。
无论是请求报文还是应答报文,均遵从如下格式:
字段 字节长度 说明 内容长度 4 整数,网络(大端)字节序。表示后续报文内容的长度。 内容 由内容长度决定 报文内容,UTF-8编码。 本示例模拟在一个TCP协议环境下由ESB实现穿透服务的过程,即把模拟服务端提供的(基于TCP协议接口的)服务集成到ESB上,并重新发布(同样是基于TCP协议接口的)服务。而模拟客户端不再直接调用模拟服务端,而是调用ESB提供的TCP服务,且除了IP地址和端口相应变化外,其余功能实现不需改变。
2.2 ESB的配置及扩展实现为实现示例场景,需要配置一个ESB穿透服务,具体步骤如下:
- 配置一个TCP协议的Endpoint,连接模拟服务端。
- 配置一个TCP协议的业务服务,基于TCP Endpoint调用模拟服务端的服务。
- 配置一个中介服务。在中介服务流程中调用TCP业务服务,并在调用业务服务前后输出请求和应答报文内容。
- 配置一个TCP协议的Transport,接收并处理模拟客户端的连接。
- 配置一个TCP协议的代理服务,基于TCP Transport处理客户端的服务请求,并调用中介服务处理该请求
以上配置中所用到的TCP协议的transport和endpoint在ESB产品的默认安装包中是不存在的,需要做定制化的扩展实现。其中:
TCP协议的transport需要模仿模拟服务端的工作方式,即以短连接方式处理来自模拟客户端的连接请求。即针对每个连接,先接收请求报文,生成请求消息发送到请求通道;之后在应答消息从应答通道返回(或错误消息从错误通道返回)时,向客户端返回应答报文,并关闭连接。
TCP协议的endpoint需要模仿模拟客户端的工作方式,即针对每个来自ESB的请求消息,建立到模拟服务端的连接,根据请求消息的内容发送请求报文,接收应答报文,再根据应答报文内容生成应答消息并发送到应答通道。如此期间发生错误,则生成错误消息并发送到错误通道。
2.3 补充说明
本示例的目的在于说明ESB协议接入的扩展机制,而非TCP协议本身,故在TCP协议层面的处理比较简单。实际应用中基于TCP协议的通信机制可能更复杂(如长连接等),但相关内容的讨论不属于本文的范围。
此外,在TCP处理层面,出于简单考虑,没有使用JavaNIO的机制,而是采用传统的Socket机制,因此在endpoint实现中未能做到真正的异步处理,在实际应用中,这种处理方式可能会影响ESB的整体性能。
以下将创建一个简单的自定义协议服务,从而熟悉自定义协议服务的开发过程。
# 新建ESB项目
打开ESB Studio,进入ESB开发平台。点击ESB Studio工具条上的【文件->新建->项目】,在弹出的对话框中选择"ESB->ESB项目",如下图所示:
点击【下一步】,填写项目名称"custdemo",
点击【下一步】,显示项目的引用库信息。点击【完成】,在"资源管理器"视图中显示出custdemo项目。custdemo项目下默认没有创建模块。
# 新建公共Module
首先我们需要在custdemo项目中创建公共Module,选中custdemo项目,在右键菜单中选择"创建->公共Module",在弹出的对话框中设置Module名称, 如下图所示:
点击【完成】,新建的公共Module目录结构,如下图所示:
该Module用来配置HTTP服务、JMS服务、Web Service及自定义协议服务需要的公共信息:
- ransport:为代理服务实现通信接入。
- ndpoint:为业务服务实现通信接出。
- ariable:可创建变量、变量分类等。
- Java:可创建标准Java类。
# 新建业务Module
在ESB项目中开发各种服务,需要另外创建业务Module。选中custdemo项目,在右键菜单中选择"创建->业务Module",在弹出的对话框中设置Module名称, 如下图所示:
点击【完成】,在项目中生成业务Module:com.primeton.esb.custdemo.restaurant
,其中包含代理服务、中介服务、业务服务、Variable及Java。
一个ESB项目可以有多个业务Module。
- 代理服务:需关联到一个业务服务(穿透)或中介服务,即由这个业务服务或中介服务完成所代理的服务请求。直接关联业务服务时,协议类型需匹配。
- 中介服务:由中介流描述的处理逻辑,实现代理服务的业务处理。中介流中可包含一个或多个业务服务,以及相关的处理逻辑、数据转换等。中介服务中可以对JAVA方法、子中介服务、WS业务服务、JMS业务服务、HTTP业务服务、CUST业务服务、TCP业务服务、UDP业务服务、FTP业务服务、EJB业务服务、EMAIL业务服务等进行编排。
- 业务服务:是连接中介服务与Endpoint的数据处理层,根据不同的协议分为HTTP、JMS、WS、EJB、EMAIL、FTP、TCP、UDP、CUST等。
- Variable:可创建变量、变量分类等。
- Java:可创建标准Java类。
# 创建自定义Transport
在公共Module中选中"Transport",点右键,选择"创建->CUST类型Transport",在弹出的对话框中,配置相关参数,如下图所示:
点击【完成】,在资源管理器中产生"transport_cust.tansport_custx",并在中间编辑器中显示出具体的信息。配置如下图所示,填写必要信息并保存 。
- 协议类型:所要开发的协议的名称,根据实际情况填写。这里我们以TCP为协议类型。
- 扩展实现: Transport端组件实现类,负责接收请求并调用Transport发送请求。
- 扩展属性:自定义组件的所有相关属性配置,扩展属性的属性值支持变量配置。本例则设置一个port属性,表示TCP监听端口,值为9999。
- 响应线程池配置信息:为提升ESB消息处理能力,在消息接收处配置了线程池机制,是性能调优的核心配置之一。本教程案例统一采用全局配置,有关这部分的介绍可以参见产品联机帮助《ESB开发指南-使用指南-运行参数配置-线程池配置》。
输入协议类型为TCP,点击"扩展实现*"链接,打开类创建窗口,填写类名称TCPTransport,此类必须继承com.primeton.esb.adapter.framework.component.AbstractTransportComponent
类,如下图所示:
点击【完成】后,自动转到TCP Transport类源代码界面进行开发。源码详见TCPTransport.java。
package com.primeton.esb.custdemo.common;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.primeton.esb.adapter.framework.component.AbstractTransportComponent;
import com.primeton.esb.adapter.framework.config.IComponentConfig;
import com.primeton.esb.adapter.framework.exception.ESBAdapterException;
import com.primeton.esb.adapter.framework.messaging.INormalizedMessage;
import com.primeton.esb.adapter.framework.messaging.NormalizedMessageBuilder;
import com.primeton.esb.model.Constants;
public class TCPTransport extends AbstractTransportComponent {
private int port;
private ServerSocket serverSock;
private Thread listenerThread;
private ConcurrentHashMap<String, TCPTransport_Context> ctxMap = new ConcurrentHashMap<String, TCPTransport_Context>();
public void handleError(INormalizedMessage errMsg) throws ESBAdapterException {
// TODO 自动生成方法存根
Socket sock = null;
try {
String cid = (String) errMsg.getSystemHeader(Constants.CONTINUATION_ID);
TCPTransport_Context ctx = ctxMap.remove(cid);
if (ctx == null)
throw new Exception("resume context fail. cid='" + cid + "'");
String err = (String) errMsg.getMessagePayload();
sock = ctx.sock;
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
TCPUtil.writeString(out, err);
} catch (Exception e) {
e.printStackTrace();
throw new ESBAdapterException("handle error-reply error", e);
} finally {
if (sock != null) {
try {
sock.close(); //确保关闭了连接
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public void handleReply(INormalizedMessage replyMsg) throws ESBAdapterException {
// TODO 自动生成方法存根
Socket sock = null;
try {
//从应答消息中取回cid
String cid = (String) replyMsg.getSystemHeader(Constants.CONTINUATION_ID);
//恢复上下文
TCPTransport_Context ctx = ctxMap.remove(cid);
if (ctx == null)
throw new Exception("resume context fail. cid='" + cid + "'");
//取得应答消息文本。(另外可从系统头、协议头中取得其他应答信息)
String reply = (String) replyMsg.getMessagePayload();
//构造应答消息并返回客户端(注意:此处用到了上下文中保存的socket连接及请求消息)
sock = ctx.sock;
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
TCPUtil.writeString(out, reply);
} catch (Exception e) {
e.printStackTrace();
throw new ESBAdapterException("handle reply error", e);
} finally {
if (sock != null) {
try {
sock.close(); //确保关闭了连接
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public void init() throws ESBAdapterException {
// TODO 自动生成方法存根
//从配置的扩展属性中取得配置的端口号。
String portStr = this.config.getExtProperty("port");
this.port = Integer.parseInt(portStr);
}
public void onChange(IComponentConfig arg0) throws ESBAdapterException {
// TODO 自动生成方法存根
//配置发生变化时的处理。(用于支持热更新)
}
public void start() throws ESBAdapterException {
// TODO 自动生成方法存根
if (this.serverSock != null)
throw new ESBAdapterException("tcp server is in running.");
try {
this.serverSock = new ServerSocket(this.port);
} catch (Exception e) {
throw new ESBAdapterException("open listener at port " + this.port + " error.", e);
}
TCPTransport_ListenerRunnable listener = new TCPTransport_ListenerRunnable(serverSock);
this.listenerThread = new Thread(listener);
this.listenerThread.start();
System.out.println("TCP SERVER: init listener at port " + this.port);
}
public void stop() throws ESBAdapterException {
// TODO 自动生成方法存根
if (this.listenerThread != null) {
this.listenerThread.interrupt();
}
this.listenerThread = null;
if (this.serverSock != null) {
try {
this.serverSock.close();
} catch (Exception e) {
e.printStackTrace();
}
}
this.serverSock = null;
}
//=================================================
class TCPTransport_ListenerRunnable implements Runnable {
private ServerSocket serverSock;
public TCPTransport_ListenerRunnable(ServerSocket serverSock) {
this.serverSock = serverSock;
}
public void run() {
System.out.println("TCP SERVER: listener start !");
Socket clientSock = null;
while (true) {
//accept客户端连接
try {
clientSock = this.serverSock.accept();
} catch (Exception e) {
//accept出错,则停止整个TCP服务器,关闭服务端连接。
System.err.println("TCP SERVER: accept socket error.");
e.printStackTrace();
try {
this.serverSock.close();
} catch (Exception e1) {
e1.printStackTrace();
}
break;
}
//启动worker线程,处理客户端连接。
try {
TCPTransport_WorkerRunnable workerRunner = new TCPTransport_WorkerRunnable(clientSock);
Thread workerThread = new Thread(workerRunner);
workerThread.start();
} catch (Exception e) {
//启动workder线程出错,只关闭该请求的连接,不影响TCP服务器继续运行。
System.err.println("TCP SERVER: start worker error.");
e.printStackTrace();
try {
clientSock.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
}
class TCPTransport_WorkerRunnable implements Runnable {
private Socket sock;
public TCPTransport_WorkerRunnable(Socket sock) {
this.sock = sock;
}
public void run() {
String cid = null;
try {
//读取客户端请求数据
DataInputStream in = new DataInputStream(this.sock.getInputStream());
String request = TCPUtil.readString(in);
System.err.println("TCP SERVER: req=" + request);
//构造esb请求消息
NormalizedMessageBuilder<String> msgBuilder = NormalizedMessageBuilder.withPayload(request);
msgBuilder.setRouteName("default"); //设置代理服务的路由ID(用于路由到指定的代理服务)
//设置消息头(此处用一个空的map做示例,可以根据需要改用其他类型)
Map<String, String> messageHeaders = new HashMap<String, String>();
msgBuilder.setMessageHeaders(messageHeaders);
//设置协议头(此处用一个空的map做示例,可以根据需要改用其他类型)
Map<String, String> protocolHeaders = new HashMap<String, String>();
msgBuilder.setProtocolHeaders(protocolHeaders);
INormalizedMessage msg = msgBuilder.build();
//将请求消息发送到request-channel
handleRequest(msg);
//从请求消息中取回cid(由底层框架生成,唯一标识一次服务请求)
cid = (String) msg.getSystemHeader(Constants.CONTINUATION_ID);
//用上下文保存当前状态
TCPTransport_Context ctx = new TCPTransport_Context();
ctx.cid = cid;
ctx.sock = this.sock;
ctx.request = request;
//将上下文挂起,等待异步回调处理
ctxMap.put(ctx.cid, ctx);
System.err.println("TCP SERVER: end request. cid=" + ctx.cid);
} catch (Exception e) {
System.err.println("TCP SERVER: execute worker error.");
e.printStackTrace();
try {
this.sock.close();
} catch (Exception e1) {
e1.printStackTrace();
}
if (cid != null) {
ctxMap.remove(cid); //出错时,确保删除了上下文,避免内存泄露。
}
}
}
}
class TCPTransport_Context {
String cid;
Socket sock;
String request;
}
}
TCPTransport主要实现方法介绍
方法 | 说明 |
---|---|
init() | 组件初始化,ESB启动时会调用此方法 |
start() | 组件启动,ESB启动时会调用此方法 |
stop() | 组件停止,当通过Console卸载时会调用此方法 |
handleReply(INormalizedMessage arg0) | 处理消息响应,当发出请求后,对于ESB 响应消息进行处理 |
handleError(INormalizedMessage arg0) | 处理消息响应,当发出请求后,对于接收的ESB 错误响应消息进行处理 |
onChange(IComponentConfig arg0) | 热更新,当通过Console修改配置参数时,会调用次方法 |
NormalizeMessage(规格化消息)为组件框架与ESBServer交互的消息,通过NormalizedMessageBuilder类来构建,创建方式如下:
NormalizedMessageBuilder<String> msgBuilder =
NormalizedMessageBuilder.withPayload("message");
//此值必须与代理服务扩展属性中的routeName值保持一致
msgBuilder.setRouteName("default");
// 添加系统头
msgBuilder.addSystemHeader("", "");
// 设置消息头
Map<String, String> messageHeaders = new HashMap<String, String>();
msgBuilder.setMessageHeaders(messageHeaders);
// 设置协议头
Map<String, String> protocolHeaders = new HashMap<String, String>();
msgBuilder.setProtocolHeaders(protocolHeaders);
INormalizedMessage nm = msgBuilder.build();
//调用transport发送请求
handleRequest(nm);
构建NormalizedMessage消息时routeName值必须与扩展属性配置中的routeName属性值保持一致,否则Transport发送请求时找不到对应的代理服务出现异常。NormalizedMessageBuilder |
---|
NormalizedMessageBuilder主要方法介绍:
方法 | 说明 |
---|---|
withPayload(T payload) | 创建NormalizedMessageBuilder,并设置NormalizedMessage消息体 |
addSystemHeader(String headerName, Object headerValue) | ESB内部系统消息头 |
setProtocolHeaders(Object headerValue) | 协议头,根据实际设置 |
setMessageHeaders(Object headerValue) | 消息头,根据实际设置 |
setMessagePayload(T messagePayload) | 设置NormalizedMessage的消息体 |
setRouteName(String routeName) | 标识消息由Transport路由到哪个代理服务,此值必须与代理服务扩展属性中的routeName值保持一致 |
在Transport面板点击+按钮,可进行扩展属性的添加,属性标识和属性值为必填项,扩展属性配置如下图所示:
ESB Server启动后,扩展属性值会被加载到com.primeton.esb.adapter.framework.config.IComponentConfig
类中,获取方式如下所示:
public void init() throws ESBAdapterException {
//从配置的扩展属性中取得配置的端口号。
String portStr = this.config.getExtProperty("port");
...
}
配置完成后的Transport如下图所示:
# 创建自定义Endpoint
在公共Module中展开"Endpoint",选中其下面的"CUST",点右键,选择"创建->CUST类型Endpoint",在弹出的对话框中,配置相关参数,如下图所示:
点击【完成】,在资源管理器中产生"endpoint_cust.endpoint_custx",并在中间编辑器中显示出具体的信息。配置如下图所示,填写配置信息并保存。
CUST Endpoint配置:
- 协议类型:所要开发的协议的名称,根据实际情况填写。这里我们以TCP为协议类型。
- 扩展实现:Endpoint端组件实现,主要由Endpoint端调用处理请求返回响应结果。
- 扩展属性:自定义组件的所有相关属性配置。本例则设置两个属性:host属性,表示endpoint所连接的目标主机地址;port属性,表示目标主机的TCP监听端口。本例中两个值分别为"127.0.0.1"和"8888"。
- 请求线程池配置信息:为提升ESB消息处理能力,在消息接收处配置了线程池机制,是性能调优的核心配置之一。本教程案例统一采用全局配置,有关这部分的介绍可以参见产品联机帮助《ESB开发指南-使用指南-运行参数配置-线程池配置》。
点击"扩展实现*"按钮,打开类创建窗口,填写类名称TCPEndpoint,此类必须继承com.primeton.esb.adapter.framework.component.AbstractEndpointComponent类,如下图所示:
点击【完成】后,自动转到TCP Endpoint类源代码界面进行开发。源码详见TCPEndpoint.java。
package com.primeton.esb.custdemo.common;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import com.primeton.esb.adapter.framework.component.AbstractEndpointComponent;
import com.primeton.esb.adapter.framework.config.IComponentConfig;
import com.primeton.esb.adapter.framework.exception.ESBAdapterException;
import com.primeton.esb.adapter.framework.messaging.INormalizedMessage;
import com.primeton.esb.adapter.framework.messaging.impl.NormalizedMessage;
import com.primeton.esb.model.Constants;
public class TCPEndpoint extends AbstractEndpointComponent {
private String host;
private int port;
public void handleRequest(INormalizedMessage requestMsg)
throws ESBAdapterException {
// TODO 自动生成方法存根
//取得请求消息文本。(另外可从系统头、协议头中取得其他请求信息)
String request = (String) requestMsg.getMessagePayload();
//建立到目标主机的短连接
SocketAddress addr = new InetSocketAddress(this.host, this.port);
Socket sock = new Socket();
try {
sock.connect(addr);
} catch (Exception e) {
throw new ESBAdapterException("connect to server " + this.host + ":" + this.port + "' error", e);
}
System.out.println("TCPEndpoint: connect to server " + this.host + ":" + this.port);
//向主机发送请求报文,并取得应答报文。(同步处理)
String reply = null;
try {
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
TCPUtil.writeString(out, request);
DataInputStream in = new DataInputStream(sock.getInputStream());
reply = TCPUtil.readString(in);
} catch (Exception e) {
throw new ESBAdapterException("handle request error", e);
} finally {
try {
sock.close();
} catch (Exception e) {
e.printStackTrace();
}
}
for (String key : requestMsg.getSystemHeaders().keySet()) {
Object value = requestMsg.getSystemHeaders().get(key);
System.out.println("----->sys-header: " + key + "=" + value);
}
/*
----->sys-header: $tip.continuationId=0d2e06c1-2e6d-11e3-8280-24fd52ce693f
----->sys-header: $history=[{timestamp=1381053165884, componentName=com.primeton.esb.cust_prj.common.transport.transport_cust.outgoing.request.channel, componentType=channel}, {timestamp=1381053165884, componentName=com.primeton.esb.cust_prj.cust_demo.ps.tcp.proxyService_tcp.default.incoming.request.channel, componentType=channel}, {timestamp=1381053165884, componentName=com.primeton.esb.cust_prj.cust_demo.ps.tcp.proxyService_tcp.default.outgoing.request.channel, componentType=channel}, {timestamp=1381053165884, componentName=com.primeton.esb.cust_prj.cust_demo.ms.tcp.mediaService.incoming.request.channel, componentType=channel}, {timestamp=1381053165962, componentName=com.primeton.esb.cust_prj.cust_demo.ms.tcp.mediaService.outgoing.request.channel, componentType=channel}, {timestamp=1381053165962, componentName=com.primeton.esb.cust_prj.cust_demo.bs.tcp.bizService_tcp.default.incoming.request.channel, componentType=channel}, {timestamp=1381053165962, componentName=com.primeton.esb.cust_prj.cust_demo.bs.tcp.bizService_tcp.default.outgoing.request.channel, componentType=channel}, {timestamp=1381053165962, componentName=com.primeton.esb.cust_prj.common.endpoint.endpoint_tcp.incoming.request.channel, componentType=channel}]
----->sys-header: $tip.media.uuid=9446a716-3706-4206-a6c0-b3520fe37f66
----->sys-header: $id=0d3c5ea1-2e6d-11e3-8280-24fd52ce693f
----->sys-header: $tip.reply.channel=com.primeton.esb.cust_prj.cust_demo.bs.tcp.bizService_tcp.default.incoming.reply.channel
----->sys-header: $tip.error.channel=com.primeton.esb.cust_prj.cust_demo.bs.tcp.bizService_tcp.default.incoming.error.channel
----->sys-header: $timestamp=1381053165884
----->sys-header: $tip.media.returnXpath=outMessage
----->sys-header: $tip.request.channel=com.primeton.esb.cust_prj.common.endpoint.endpoint_tcp.incoming.request.channel
----->sys-header: $tip_message_invoke_type=3
----
--->const Constants.CONTINUATION_ID='$tip.continuationId'
--->const Constants.REPLY_CHANNEL='$tip.reply.channel'
--->const Constants.MEDIA_UUID_NAME='$tip.media.uuid'
--->const Constants.MEDIA_RETURN_XPATH_NAME='$tip.media.returnXpath'
*/
String cid = (String) requestMsg.getSystemHeader(Constants.CONTINUATION_ID);
//String msgId = (String)requestMsg.getSystemHeader(MessageHeaders.ID);
String replyCh = (String) requestMsg.getSystemHeader(Constants.REPLY_CHANNEL);
//Integer invokeType = (Integer)requestMsg.getSystemHeader("$tip_message_invoke_type");
String uuid = (String) requestMsg.getSystemHeader("$tip.media.uuid"); //RouterHandler.java:50
String xpath = (String) requestMsg.getSystemHeader("$tip.media.returnXpath"); //RouterHandler.java:50
//Integer invokeType = (Integer)requestMsg.getSystemHeader("$tip_message_invoke_type");
//返回应答消息。
NormalizedMessage replyMsg = new NormalizedMessage();
//replyMsg.getSystemHeaders().putAll(requestMsg.getSystemHeaders());
replyMsg.getSystemHeaders().put(Constants.CONTINUATION_ID, cid);
replyMsg.getSystemHeaders().put(Constants.REPLY_CHANNEL, replyCh); //RouterHandler.java:50
//replyMsg.getSystemHeaders().put("$tip.media.uuid", uuid); //MediationService.java:158
replyMsg.getSystemHeaders().put(Constants.MEDIA_UUID_NAME, uuid); //MediationService.java:158
//replyMsg.getSystemHeaders().put("$tip.media.returnXpath", xpath);
replyMsg.getSystemHeaders().put(Constants.MEDIA_RETURN_XPATH_NAME, xpath);
System.out.println("--->const Constants.CONTINUATION_ID='" + Constants.CONTINUATION_ID + "'");
System.out.println("--->const Constants.REPLY_CHANNEL='" + Constants.REPLY_CHANNEL + "'");
System.out.println("--->const Constants.MEDIA_UUID_NAME='" + Constants.MEDIA_UUID_NAME + "'");
System.out.println("--->const Constants.MEDIA_RETURN_XPATH_NAME='" + Constants.MEDIA_RETURN_XPATH_NAME + "'");
//BaseDataContextImpl.java:290
//replyMsg.getSystemHeaders().put(MessageHeaders.ID, msgId);
//replyMsg.getSystemHeaders().put("$tip_message_invoke_type", invokeType);
replyMsg.setMessagePayload(reply);
this.handleReply(replyMsg);
}
public void init() throws ESBAdapterException {
// TODO 自动生成方法存根
//从配置的扩展属性中取得配置的端口号。
this.host = this.config.getExtProperty("host");
String portStr = this.config.getExtProperty("port");
this.port = Integer.parseInt(portStr);
}
public void onChange(IComponentConfig arg0) throws ESBAdapterException {
// TODO 自动生成方法存根
//配置发生变化时的处理。(用于支持热更新)
}
public void start() throws ESBAdapterException {
// TODO 自动生成方法存根
}
public void stop() throws ESBAdapterException {
// TODO 自动生成方法存根
}
//===============================================
class TCPEndpoint_ReplyReader {
}
}
TCP Endpoint主要实现方法介绍:
方法 | 说明 |
---|---|
init() | 组件初始化,ESB启动时会调用此方法 |
start() | 组件启动,ESB启动时会调用此方法 |
stop() | 组件停止,当通过Console卸载时会调用此方法 |
handleRequest(INormalizedMessage arg0) | 处理请求,Endpoint接收到请求后会调用到此方法 |
onChange(IComponentConfig arg0) | 热更新,当通过Console修改配置参数时,会调用此方法 |
在Endpoint面板点击+按钮,可进行扩展属性的添加,属性标识和属性值为必填项,扩展属性如下图所示:
配置完成后的Endpoint如下图所示:
# 创建自定义业务服务
在业务Module(com.primeton.esb.custdemo.restaurant)中点击"业务服务",在右键菜单中选择"创建->CUST业务服务",在弹出的对话框中,输入类别cust,类别名称cust,如下图所示:
- 类别:用于cust业务服务的业务化分类,便于业务服务的管理、查找。可以使用已经存在的类型,也可以填写新的类型。
- 文件名称:存储业务服务配置信息的文件名称。该文件名称要求本module中唯一,不推荐长文件名。
- 显示名称:业务服务的显示名称。显示名称可以为中文,推荐具有业务含义的显示名,便于管理、查找。
点击【完成】,在资源管理器中产生"bizService_cust.outbound_custx",并在中间编辑器中显示出具体的信息。点击"Endpoint*"后的【选择...】,选择刚创建的CUST类型的Endpoint:com.primeton.esb.custdemo.common.endpoint.endpoint_cust
,如下图所示:
- 协议类型:所要开发的协议的名称,根据实际情况填写。这里我们以TCP为协议类型。
- 扩展属性:自定义组件的相关属性配置,支持变量
- 请求线程池配置信息:为提升ESB消息处理能力,在消息接收处配置了线程池机制,是性能调优的核心配置之一。本教程案例统一采用全局配置,有关这部分的介绍可以参见产品联机帮助《ESB开发指南-使用指南-运行参数配置-线程池配置》。
# 创建自定义代理服务
在业务Module(com.primeton.esb.custdemo.restaurant
)中点击"代理服务",在右键菜单中选择"创建->CUST代理服务",在弹出的对话框中,输入类别cust,类别名称cust,设置文件名和显示名称,如下图所示:
点击【完成】,在资源管理器中产生"proxyService_cust.inbound_custx",并在中间编辑器中显示出具体的信息。"Transport"关联公共Module中的CUST类型Transport, 点击"Transport*"后的【选择...】,选择刚创建的CUST类型的Transport:com.primeton.esb.custdemo.common.transport.transport_cust, "关联服务类型"选择"业务服务", 点击"服务名称"对应的【选择...】,在弹出的对话框中选择CUST业务服务"com.primeton.esb.custdemo.restaurant.bs.cust.bizService_cust.default",填写相关信息并保存,如下图所示:
- 协议类型:所要开发的协议的名称,根据实际情况填写。这里我们以TCP为协议类型。
- 扩展属性:自定义组件的所有相关属性配置,支持变量。
- 线程池配置信息:为提升ESB消息处理能力,在消息接收处配置了线程池机制,是性能调优的核心配置之一。本教程案例统一采用全局配置,有关这部分的介绍可以参见产品联机帮助《ESB开发指南-使用指南-运行参数配置-线程池配置》。
扩展属性配置中的routeName属性值必须要填写,构建NormalizedMessage消息时routeName值要保持一致,否则Transport发送请求时找不到对应的代理服务出现异常。
NormalizedMessageBuilder<String> msgBuilder = NormalizedMessageBuilder.withPayload("message");
msgBuilder.setRouteName("default");
配置如下图所示:
# 第三方jar包配置
在studio下开发扩展类实现,如果用到相应的第三方jar包,需要将相应jar包配置到classpath中。在custdemo项目打开右键菜单,选择属性, 在打开的配置面板中点击左边导航栏中"【Java构建路径】"按钮,然后选中"【库】"选项卡,点击"【添加外部JAR(X)】",将第三方jar包配置到classpath。
# 测试运行
在开发环境中安装的ESB Studio内置了ESB Server,服务开发完后可以自动以文件形式部署到ESB Server中,直接可以测试运行。 运行时需要将第三方拷贝至esb安装目录\server\libs下,选择server1(custdemo)运行。如下图所示:
启动成功后,运行结果下图所示:
通过TCPTestClient向9999端口发送消息,经过ESB转发后,通过TCPTestServer可以监听到8888端口的消息,如下图所示:
# 其他示例代码
其他可能用到的源码
package com.primeton.esb.custdemo.common;
import java.io.DataInputStream;
import java.io.DataOutputStream;
public class TCPUtil {
private static final int MAX_DATALEN = 8192;
private static final String DEFAULT_ENCODING = "UTF-8";
public static byte[] readData(DataInputStream in) throws Exception {
int len = in.readInt();
if (len < 0 || len > MAX_DATALEN)
throw new Exception("read data len error: " + len);
byte[] buf = new byte[len];
in.read(buf);
return buf;
}
public static void writeData(DataOutputStream out, byte[] buf) throws Exception {
if (buf.length > MAX_DATALEN)
throw new Exception("write data len error: " + buf.length);
out.writeInt(buf.length);
out.write(buf);
}
public static String readString(DataInputStream in, String encoding) throws Exception {
byte[] buf = readData(in);
String str = new String(buf, encoding);
return str;
}
public static void writeString(DataOutputStream out, String str, String encoding) throws Exception {
byte[] buf = str.getBytes(encoding);
writeData(out, buf);
}
public static String readString(DataInputStream in) throws Exception {
return readString(in, DEFAULT_ENCODING);
}
public static void writeString(DataOutputStream out, String str) throws Exception {
writeString(out, str, DEFAULT_ENCODING);
}
}
package com.primeton.esb.custdemo.common;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class TCPTestServer {
public static void main(String[] args) throws Exception {
int port = 8888;
ServerSocket sock = new ServerSocket(port);
System.err.println("TEST SERVER: start listen ...");
try {
while (true) {
Socket clientSock = sock.accept();
startWorker(clientSock);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void startWorker(final Socket clientSock) {
Runnable runner = new Runnable() {
public void run() {
try {
while (true) {
DataInputStream in = new DataInputStream(clientSock.getInputStream());
DataOutputStream out = new DataOutputStream(clientSock.getOutputStream());
String request = TCPUtil.readString(in);
String reply = "reply for '" + request + "'";
TCPUtil.writeString(out, reply);
System.out.println("recv&send request='" + request + "', reply='" + reply + "'");
}
} catch (java.io.EOFException e) {
System.out.println("client closed. ");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
clientSock.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
Thread t = new Thread(runner);
t.start();
}
}
package com.primeton.esb.custdemo.common;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
public class TCPTestClient {
private static String host = "localhost";
//private static int port = 8888;
private static int port = 9999;
public static void main(String[] args) throws Exception {
int threadNum = 3, queryNum = 5;
//int threadNum=1, queryNum=1;
for (int i = 0; i < threadNum; i++) {
startQueryThread(host, port, queryNum, i);
}
}
private static void startQueryThread(final String host, final int port,
final int queryNum, final int threadIndex) {
Runnable query = new Runnable() {
public void run() {
for (int i = 0; i < queryNum; i++) {
try {
doQuery(host, port, threadIndex, i);
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
};
Thread t = new Thread(query);
t.start();
}
private static void doQuery(String host, int port, int threadIndex, int queryIndex) throws Exception {
SocketAddress addr = new InetSocketAddress(host, port);
Socket sock = new Socket();
try {
sock.connect(addr);
} catch (Exception e) {
throw new Exception("connect to server error.", e);
}
try {
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
DataInputStream in = new DataInputStream(sock.getInputStream());
String request = "request at (" + threadIndex + "," + queryIndex + ")";
System.out.println("send request: " + request);
TCPUtil.writeString(out, request);
String reply = TCPUtil.readString(in);
System.out.println("recv reply: " + reply);
} catch (Exception e) {
throw new Exception("connect to server error.", e);
} finally {
sock.close();
}
}
}