1. 消息队列
1.1. 消息说明
Entity的CUD事件发生后,将消息发送到SQS对象。
发送消息的对象采用白名单机制,订阅对象才会发送事件消息到SQS队列,需要订阅对象请联系管理员
- 消息内容
- tenantId: 租户ID
- objectName: 对象名称
- objectId: 对象的ID
- operation: 操作,值为create, update, delete
- billTypeId: 单据类型ID,仅在单据时有值
- billTypeCode: 单据类型编码, 仅在单据时有值
- billStatus: 单据的状态(完全状态)
返回内容为:
{
"tenantId":"XXXX",
"objectName":"Reimburse",
"objectId":"KBJG]03V",
"operation":"create",
"billTypeId":"EV2VP160CXE000B",
"billTypeCode":"EX052",
"billStatus":"BillStatus.draft"
}
- UserTask对象消息会额外返回data,data包含以下信息(仅UserTask会返回data)
- isActed: 是否已执行
- taskName: 任务名称
- objectId: 任务关联的对象id
- objectType: 任务关联的对象名称
- taskUsers: 任务通知人列表
- actionName: 执行的动作名称,仅在审批后有值
- actionValue: 执行的动作value,仅在审批后有值
- isDeleted: 是否已删除,已删除的任务不需要再执行
- isActed: 是否已完成
- userId: 用户id
- userName: 用户名称
- userCode: 用户编码
- userExternalObjectId: 用户外部系统id
UserTask返回消息内容为:
{
"tenantId": "1",
"objectName": "UserTask", // 对象名称
"objectId": "HNM71P5056V0008", // 对象id
"operation": "update", // 操作
"data": { // 消息数据(仅UserTask会返回data)
"isActed": true, // 是否已执行
"taskName": "凭证审核", // 任务名称
"objectId": "EMM71P5056V0002", // 任务关联的对象id
"objectType": "Voucher", // 任务关联的对象名称
"taskUsers": [ // 任务通知人列表
{
"actionName": "同意", // 执行的动作名称
"actionValue": "userAgree", // 执行的动作value
"isDeleted": false, // 是否已删除,已删除的任务不需要再执行
"isActed": true, // 是否已完成
"userId": "Q4V6B05DW0001", // 用户id
"userName": "王XX", // 用户名称
"userCode": "05", // 用户编码
"userExternalObjectId": "" // 用户外部系统id
}
]
}
}
1.2. SQS 使用说明
1.2.1. 在工程中引用sdk, 两者选其一即可
引入官方sqs sdk
<dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>version</version> </dependency>
引入openapi sdk
since 1.1.0
<dependency> <groupId>com.q7link.openapi</groupId> <artifactId>openapi-client</artifactId> <version>version</version> </dependency>
1.2.2. 支持语言
- 提供sdk的语言, 文档地址:https://docs.amazonaws.cn/
- Java
- .NET
- Python
- PHP
- Ruby
- 浏览器
1.2.3. java sdk使用示例 aws sdk 版本一
package com.q7link.openapi.example;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.opensdk.BaseRequest;
import com.amazonaws.opensdk.SdkRequestConfig;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.q7link.openapi.Openapi;
import com.q7link.openapi.model.GetQueueRequest;
import com.q7link.openapi.model.Queue;
import java.util.List;
/**
* sqs request example
*
* @author yankunjie
*/
public class AmazonSqsReceiveMessage {
private static Regions regions = Regions.CN_NORTH_1;
private static AmazonSQS amazonSQS;
private static Openapi openapi;
private static final String ACCESS_KEY_ID = "";
private static final String SECRET_KEY = "";
private static final String OPEN_ID = "";
public static void main(String[] args) {
// 通过openapi queue接口获取queueUrl
Queue queue = getQueue();
String queueUrl = queue.getQueueUrl();
while (true) {
// 接收消息
List<Message> messages = receiveMessages(queueUrl);
for (Message message : messages) {
// 消息体
String body = message.getBody();
// do something
System.out.println(body);
// 删除消息
amazonSQS().deleteMessage(queueUrl, message.getReceiptHandle());
}
}
}
/**
* 接收消息,推荐使用长轮询
* <pre>
* 1.最大等待时间:20秒 值范围:1-20秒
* 2.最大接收消息数:10 值范围:1-10
* 3.默认可见性超时:60秒 值范围:0-12天
* 根据业务需求适当调整可见性超时时间
* </pre>
*
* @param queueUrl 队列url
* @return 消息列表
*/
private static List<Message> receiveMessages(String queueUrl) {
ReceiveMessageRequest request = new ReceiveMessageRequest()
// 队列url
.withQueueUrl(queueUrl)
// 可见性超时:默认60秒,收到消息在可见性超时时间内不会再次收到该消息。注意:可见性超时无法保证不会接收消息两次
.withVisibilityTimeout(10)
// 最大等待时间:20秒 值范围:1-20秒
.withWaitTimeSeconds(5)
// 最大接收消息数:10 值范围:1-10
.withMaxNumberOfMessages(10);
return amazonSQS().receiveMessage(request).getMessages();
}
/**
* 获取队列信息
*
* @return 队列对象
*/
private static Queue getQueue() {
GetQueueRequest request = new GetQueueRequest();
request.sdkRequestConfig(getSdkRequestConfig(request));
return openapi().getQueue(request).getQueue();
}
private static SdkRequestConfig getSdkRequestConfig(BaseRequest request) {
return request.sdkRequestConfig().copyBuilder()
.customHeader("Content-Type", "application/json")
.customHeader("Access-Key-Id", ACCESS_KEY_ID)
.customHeader("Open-Id", OPEN_ID)
.build();
}
public static Openapi openapi() {
if (openapi == null) {
String accessKey = ACCESS_KEY_ID;
String accessSecret = SECRET_KEY;
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, accessSecret);
openapi = Openapi.builder()
.iamCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.build();
}
return openapi;
}
public static AmazonSQS amazonSQS() {
if (amazonSQS == null) {
String accessKey = ACCESS_KEY_ID;
String accessSecret = SECRET_KEY;
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, accessSecret);
amazonSQS = AmazonSQSClientBuilder.standard()
.withRegion(regions)
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.build();
}
return amazonSQS;
}
}
SQS官方文档:https://docs.amazonaws.cn/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html