消息队列
消息说明
Entity的CUD事件发生后,将产生事件消息
两种方式获取消息
- 通过SQS队列轮训消息 发送消息的对象采用白名单机制,订阅对象才会发送事件消息到SQS队列,需要订阅对象请联系管理员
- 通过webhook接收消息 需要联系管理员设置webhook地址
- 消息内容
- tenantId: 租户ID
- objectName: 对象名称
- objectId: 对象的ID
- operation: 操作,值为create, update, delete
- billTypeId: 单据类型ID,仅在单据时有值
- billTypeCode: 单据类型编码, 仅在单据时有值
- billStatus: 单据的状态(完全状态)
返回内容为:
{
"tenantId":"XXXX",
"objectName":"Reimburse",
"objectId":"KBJG]03V",
"operation":"create",
"billTypeId":"EV2VP160CXE000B",
"billTypeCode":"EX052",
"billStatus":"BillStatus.draft"
}
- UserTask对象消息会额外返回data,data包含以下信息(仅UserTask会返回data)
- isActed: 是否已执行
- taskName: 任务名称
- objectId: 任务关联的对象id
- objectType: 任务关联的对象名称
- taskUsers: 任务通知人列表
- actionName: 执行的动作名称,仅在审批后有值
- actionValue: 执行的动作value,仅在审批后有值
- isDeleted: 是否已删除,已删除的任务不需要再执行
- isActed: 是否已完成
- userId: 用户id
- userName: 用户名称
- userCode: 用户编码
- userExternalObjectId: 用户外部系统id
UserTask返回消息内容为:
{
"tenantId": "1",
"objectName": "UserTask", // 对象名称
"objectId": "HNM71P5056V0008", // 对象id
"operation": "update", // 操作
"data": { // 消息数据(仅UserTask会返回data)
"isActed": true, // 是否已执行
"taskName": "凭证审核", // 任务名称
"objectId": "EMM71P5056V0002", // 任务关联的对象id
"objectType": "Voucher", // 任务关联的对象名称
"taskUsers": [ // 任务通知人列表
{
"actionName": "同意", // 执行的动作名称
"actionValue": "userAgree", // 执行的动作value
"isDeleted": false, // 是否已删除,已删除的任务不需要再执行
"isActed": true, // 是否已完成
"userId": "Q4V6B05DW0001", // 用户id
"userName": "王XX", // 用户名称
"userCode": "05", // 用户编码
"userExternalObjectId": "" // 用户外部系统id
}
]
}
}
SQS 使用说明
在工程中引用sdk, 两者选其一即可
- 引入官方sqs sdk
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>version</version>
</dependency>
- 引入openapi sdk
since 1.1.0
<dependency>
<groupId>com.q7link.openapi</groupId>
<artifactId>openapi-client</artifactId>
<version>version</version>
</dependency>
支持语言
- 提供sdk的语言, 文档地址:https://docs.amazonaws.cn/
- Java
- .NET
- Python
- PHP
- Ruby
- 浏览器
java sdk使用示例 aws sdk 版本一
package com.q7link.openapi.example;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.opensdk.BaseRequest;
import com.amazonaws.opensdk.SdkRequestConfig;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.q7link.openapi.Openapi;
import com.q7link.openapi.model.GetQueueRequest;
import com.q7link.openapi.model.Queue;
import java.util.List;
/**
* sqs request example
*
* @author yankunjie
*/
public class AmazonSqsReceiveMessage {
private static Regions regions = Regions.CN_NORTH_1;
private static AmazonSQS amazonSQS;
private static Openapi openapi;
private static final String ACCESS_KEY_ID = "";
private static final String SECRET_KEY = "";
private static final String OPEN_ID = "";
public static void main(String[] args) {
// 通过openapi queue接口获取queueUrl
Queue queue = getQueue();
String queueUrl = queue.getQueueUrl();
while (true) {
// 接收消息
List<Message> messages = receiveMessages(queueUrl);
for (Message message : messages) {
// 消息体
String body = message.getBody();
// do something
System.out.println(body);
// 删除消息
amazonSQS().deleteMessage(queueUrl, message.getReceiptHandle());
}
}
}
/**
* 接收消息,推荐使用长轮询
* <pre>
* 1.最大等待时间:20秒 值范围:1-20秒
* 2.最大接收消息数:10 值范围:1-10
* 3.默认可见性超时:60秒 值范围:0-12天
* 根据业务需求适当调整可见性超时时间
* </pre>
*
* @param queueUrl 队列url
* @return 消息列表
*/
private static List<Message> receiveMessages(String queueUrl) {
ReceiveMessageRequest request = new ReceiveMessageRequest()
// 队列url
.withQueueUrl(queueUrl)
// 可见性超时:默认60秒,收到消息在可见性超时时间内不会再次收到该消息。注意:可见性超时无法保证不会接收消息两次
.withVisibilityTimeout(10)
// 最大等待时间:20秒 值范围:1-20秒
.withWaitTimeSeconds(5)
// 最大接收消息数:10 值范围:1-10
.withMaxNumberOfMessages(10);
return amazonSQS().receiveMessage(request).getMessages();
}
/**
* 获取队列信息
*
* @return 队列对象
*/
private static Queue getQueue() {
GetQueueRequest request = new GetQueueRequest();
request.sdkRequestConfig(getSdkRequestConfig(request));
return openapi().getQueue(request).getQueue();
}
private static SdkRequestConfig getSdkRequestConfig(BaseRequest request) {
return request.sdkRequestConfig().copyBuilder()
.customHeader("Content-Type", "application/json")
.customHeader("Access-Key-Id", ACCESS_KEY_ID)
.customHeader("Open-Id", OPEN_ID)
.build();
}
public static Openapi openapi() {
if (openapi == null) {
String accessKey = ACCESS_KEY_ID;
String accessSecret = SECRET_KEY;
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, accessSecret);
openapi = Openapi.builder()
.iamCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.build();
}
return openapi;
}
public static AmazonSQS amazonSQS() {
if (amazonSQS == null) {
String accessKey = ACCESS_KEY_ID;
String accessSecret = SECRET_KEY;
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, accessSecret);
amazonSQS = AmazonSQSClientBuilder.standard()
.withRegion(regions)
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.build();
}
return amazonSQS;
}
}
SQS官方文档:https://docs.amazonaws.cn/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html
Webhook 说明
系统产生消 息后,发送POST请求到设置的Webhook地址,收到请求后处理header和body;header中的signature用于验证消息来源,body为加密的消息内容,需要解密后才能获取到消息内容。
-
验证签名 数据组成: callbackUrl + sorted(排除signature的headers) + encryptBody + verifyToken
哈希方法: 使用 SHA-1 进行哈希计算。
签名比较: 将计算得到的 signature 与 header 中的 signature 进行比较,以验证签名的正确性。
注意:验签的header不是收到的所有header,是设置webhook时双方约定的header
-
解密 body 步骤:
将 body 转换为字节数组(通过 hexToBytes 方法)。
使用 encryptKey 进行解密,解密算法为 AES/CBC/NoPadding。
将解密得到的数据解析为 JSON 格式。
java代码示例
验签工具
import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.TreeMap;
public class SignatureVerifier {
private static final Gson gson = new Gson();
public static boolean verifySignature(String callbackUrl, Map<String, String> headers, String encryptBody, String verifyToken) throws Exception {
// 创建一个 TreeMap 来对 headers 进行排序
Map<String, String> sortedHeaders = new TreeMap<>(headers);
sortedHeaders.remove("signature"); // 移除 signature 字段
// 将排序后的 map 转换为 JSON 字符串
String headersJson = gson.toJson(sortedHeaders);
// 拼接数据
String dataToHash = callbackUrl + headersJson + encryptBody + verifyToken;
// 生成 SHA-1 哈希
MessageDigest digest = MessageDigest.getInstance("SHA-1");
byte[] hash = digest.digest(dataToHash.getBytes(StandardCharsets.UTF_8));
// 获取 header 中的签名
String signatureFromHeader = headers.get("signature");
// 将计算得到的签名与 header 中的签名进行比较
return signatureFromHeader != null && signatureFromHeader.equals(bytesToHex(hash));
}
private static String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02x", b));
}
return sb.toString();
}
}
解密工具
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.UnsupportedEncodingException;
public class AESUtils {
// 偏移量
public static final String VIPARA = "5928772605893626";
// 编码方式
public static final String CODE_TYPE = "UTF-8";
// 填充类型
public static final String AES_TYPE = "AES/CBC/NoPadding";
// 填充字符集
private static final String[] consult = new String[]{"0","1","2","3","4","5","6","7","8","9","A","B","C","D","E","F"};
/**
* AES解密
* @param encryptText 需解密字符串(十六进制)
* @param aesKey AES密钥(需为16字节)
* @return 解密后的字符串
* @throws Exception
*/
public static String decrypt(String encryptText, String aesKey) throws Exception {
byte[] bytes = hexToBytes(encryptText);
IvParameterSpec zeroIv = new IvParameterSpec(VIPARA.getBytes());
// 密钥填充至16位
if(aesKey.length() < 16) aesKey = completionCodeFor16Bytes(aesKey);
SecretKeySpec key = new SecretKeySpec(aesKey.getBytes(), "AES");
Cipher cipher = Cipher.getInstance(AES_TYPE);
cipher.init(Cipher.DECRYPT_MODE, key, zeroIv);
byte[] decryptedData = cipher.doFinal(bytes);
return restoreData(new String(decryptedData, CODE_TYPE));
}
/**
* 字符填充(加密前处理)
* @param str 需填充的字符串
* @return 填充后的字符串
* @throws UnsupportedEncodingException
*/
public static String completionCodeFor16Bytes(String str) throws UnsupportedEncodingException {
int len = str.getBytes(CODE_TYPE).length;
int index = len % 16;
int coverCnt = 16 - index; // 需填充字符数量
String coverVal = consult[coverCnt - 1]; // 填充值
StringBuffer sb = new StringBuffer(str);
// 补位填充
for (int i = 0; i < coverCnt; i++) sb.append(coverVal);
return sb.toString();
}
/**
* 还原字符串(去填充)
* @param str 需还原的字符串
* @return 去填充后的字符串
*/
public static String restoreData(String str) {
// 获取最后的字符串值
int num = 0;
String markStr = str.substring(str.length() - 1);
// 获取需截取字符长度
for (int i = 0; i < consult.length; i++) {
if (consult[i].equals(markStr)) {
num = i + 1;
break;
}
}
// 还原字符
str = str.substring(0, str.length() - num);
return str;
}
/**
* byte数组转16进制值字符串
* @param buf byte数组
* @return 16进制字符串
*/
public static String toHex(byte[] buf) {
if (buf != null && buf.length != 0) {
StringBuilder out = new StringBuilder();
for (int i = 0; i < buf.length; ++i) {
out.append(consult[buf[i] >> 4 & 15]).append(consult[buf[i] & 15]);
}
return out.toString();
} else {
return "";
}
}
/**
* 字符串转byte数组
* @param str 16进制字符串
* @return byte数组
*/
public static byte[] hexToBytes(String str) {
if (str == null) {
return null;
} else {
char[] hex = str.toCharArray();
int length = hex.length / 2;
byte[] raw = new byte[length];
for (int i = 0; i < length; ++i) {
int high = Character.digit(hex[i * 2], 16);
int low = Character.digit(hex[i * 2 + 1], 16);
int value = high << 4 | low;
if (value > 127) {
value -= 256;
}
raw[i] = (byte) value;
}
return raw;
}
}
}
在之前提供的代码中,gson 是一个 Gson 类的实例,用于将排序后的 headers Map 转换为 JSON 字符串。你需要在项目中添加 Gson 库作为依赖,以便使用 Gson 类。
如何添加 Gson 依赖,如果使用 Maven 在 pom.xml 文件中添加 Gson 依赖:
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>