# bps分布式事务
# 功能描述
BPS提供了客户端的流程操作API,通过客户端API操作流程时,每一次调用都会立即在引擎端执行并使流程数据入库。客户端分布式事务是为了保证流程批量操作的原子性。
# 接口描述
方法名 | 功能描述 |
---|---|
begin() | 开启分布式事务 |
commit() | 提交分布式事务 |
rollback() | 回滚分布式事务 |
beginWithUT(javax.transaction.UserTransaction userTransaction) | 开启分布式事务,同时将该事务与用户事务关联 |
rollbackWithCompensate(com.primeton.workflow.api2.client.gtx. IWFClientTransactionCompensate compensate) | 回滚分布式事务,并回调补偿接口 |
getStatus() | 获取当前分布式事务的状态 |
beginWithUTAndTimeout(javax.transaction.UserTransaction userTransaction, int timeoutSeconds) | 开启分布式事务,同时将该事务与用户事务关联,并指定过期时间 |
beginWithTimeout(int timeoutSeconds) | 开启分布式事务,同时指定过期时间 |
# 支持分布式事务的API
只有流程操作相关的API支持分布式事务,具体接口如下表:
接口名 | 接口描述 |
---|---|
com.eos.workflow.api.IWFProcessInstManager | 流程实例管理 |
com.eos.workflow.api.IWFActivityInstManager | 活动实例管理 |
com.eos.workflow.api.IWFWorkItemManager | 工作项管理 |
com.eos.workflow.api.IWFRelativeDataManager | 相关数据管理 |
com.eos.workflow.api.IWFWorkItemDrawbackManager | 工作项撤回管理 |
说明: 即使使用了列表中的接口,也不是所有的方法都支持分布式事务,以get、find、query、generate开头的查询类方法就不支持分布式事务,如IWFWorkItemManager.queryWorkItemUrl()、IWFRelativeDataManager.getRelativeData()等。
# 示例
业务系统需要自己管理业务事务,在业务操作和流程操作都做完之后,需要先提交业务事务,再提交分布式事务。如果在提交业务事务时失败,应该立即回滚业务事务,并同时回滚分布式事务。
// 获取BPS流程服务客户端
IWFOprApiClient oprClient = getOprApiClient(bpsClientID, tenantId);
// 获取分布式全局事务管理器
IWFClientTransactionManager transactionManager = oprClient.getClientTransactionManager();
Connection conn = null;
try {
// 获取用户业务系统的数据库连接并设置为不自动提交事务,即自己管理事务
conn = DBUtil.getConnection();
conn.setAutoCommit(false);
// 开启分布式事务
transactionManager.begin();
// 流程操作1,创建并启动流程实例
long procInstId = oprClient.getWFProcessInstApi().createProcessInstance("test-tx1", "test-tx1-8", "test");
// 业务操作1
conn.createStatement().executeUpdate(sql1);
// 流程操作2,创建并启动活动实例
oprClient.getWFProcessInstApi().createProcessInstance("test-tx1", "test-tx1-9", "test");
// 业务操作2
conn.createStatement().executeUpdate(sql2);
try {
conn.commit(); // 提交业务系统事务
} catch (Exception e) {
conn.rollback(); // 回滚业务系统事务
throw new Exception(e); // 抛出异常以便阻止分布式事务的提交
}
// 提交分布式事务,批量执行流程操作
transactionManager.commit();
System.out.println("业务系统事务和流程分布式事务全部提交成功");
} catch (Throwable t) { // 出现异常,可能是业务提交失败,也可能是业务提交成功但流程提交失败
if(transactionManager.getStatus() == Status.STATUS_ROLLING_BACK) {
// 如果分布式事务的状态为STATUS_ROLLING_BACK,说明业务提交成功但流程提交失败,则需要回滚时做业务补偿
transactionManager.rollbackWithCompensate(() -> {
// 当分布式事务提交失败时,由于业务事务已提交,这里需要做业务补偿动作
// doCompensate();
});
} else {
// 其他状态,说明是业务提交失败,只需要回滚分布式事务,不需要做业务补偿
transactionManager.rollback();
}
} finally {
// 关闭业务系统数据库资源
DBUtil.close(conn);
}
//获取BPS操作客户端
public static IWFOprApiClient getOprApiClient(String clientID, String tenantId) {
try {
WFApiServerUri serverUri = new WFApiServerUri("http://" + clientID + "/bps");
String userId = ""; //客户端自定义获取userId
String userName = ""; //客户端自定义获取userName
WFApiClientRequestUser requestUser = null;
if (isMultiTenantMode(clientID)) {
if (StringUtils.isEmpty(tenantId)) {
tenantId = "default"; //默认为 default
}
String token = getToken(clientID, tenantId);
requestUser = new WFApiClientRequestUser(clientID, userId, userName, tenantId, token);
} else {
requestUser = new WFApiClientRequestUser(clientID, userId, userName);
}
WFApiClientFactory.getRequestUserManager().setCurrentUser(requestUser);
oprApiClient = WFApiClientFactory.getOprApiClient(serverUri);
} catch (WFApiException e) {
throw new RuntimeException("Bps Client[" + clientID + "] is not existed!");
}
return oprApiClient;
}
//获取token
public static String getToken(String clientID, String tenantId) {
WFApiServerUri serverUri = new WFApiServerUri("http://" + clientID + "/bps");
String userId = ""; //客户端自定义获取userId
String userName = ""; //客户端自定义获取userName
WFApiClientRequestUser requestUser = new WFApiClientRequestUser(clientID, userId, userName);
WFApiClientFactory.getRequestUserManager().setCurrentUser(requestUser);
return WFApiClientFactory.getMgrApiClient(serverUri).getWFMultiTenancyApi().getTenantIdentityToken(tenantId);
}
说明:
客户端需要用到bps分布式事务功能,需在客户端pom文件引入bps相关的依赖才能实现功能,如下所示:
<dependency>
<groupId>com.primeton.bps</groupId>
<artifactId>bps-server-api2-client-resttemplate</artifactId>
<version>${bps-version}</version>
</dependency>
<dependency>
<groupId>com.primeton.bps</groupId>
<artifactId>bps-server-api2-client-localjvm</artifactId>
<version>${bps-version}</version>
</dependency>
<dependency>
# bps分布式事务接口提交方式
业务系统在调用分布式全局事务管理器的commit方法时,bps会调用com.primeton.workflow.api2.client.gtx.impl.WFClientTransaction#commit,该方法中会针对业务系统调用的多个支持事务的bps方法批量执行调用提交。
public void commit() {
try {
status = Status.STATUS_COMMITTING;
if (userTransaction != null) {
userTransaction.commit();
}
if (invokeInfos != null && invokeInfos.size() > 0) {
wfClientTxManager.getOprApiClient().getWFCommonApi().batchExecuteApis(invokeInfos);
}
status = Status.STATUS_COMMITTED;
} catch (Throwable e) {
status = Status.STATUS_MARKED_ROLLBACK;
throw WFApiExceptionUtils.wrapException(e);
}
}
其中batchExecuteApis方法是对bps拦截到支持事务的api做批量调用提交。
# 分布式事务的超时
当业务系统开启了BPS分布式事务后,必须在一定的时间范围内(默认为5分钟)提交事务,否则事务会超时,事务中所有的流程调用都会自动回滚。
# 超时配置
客户端分布式事务的超时机制默认是开启的,业务系统可以根据实际情况手动调整超时时间。
- 客户端分布式事务超时配置
目前超时配置不支持客户端自定义配置,需手动调用相关接口并传入超时时间,具体示例代码如下:
transactionManager.beginWithTimout(10); //参数为超时时间
//或者调用这个接口
transactionManager.beginWithUTAndTimeout(UserTransaction userTransaction, int timeoutSeconds);