1. 消息队列
1.1. 消息说明
Entity的CUD事件发生后,将产生事件消息
两种方式获取消息
- 通过SQS队列轮训消息 发送消息的对象采用白名单机制,订阅对象才会发送事件消息到SQS队列,需要订阅对象请联系管理员
- 通过webhook接收消息 需要联系管理员设置webhook地址
- 消息内容
- tenantId: 租户ID
- objectName: 对象名称
- objectId: 对象的ID
- operation: 操作,值为create, update, delete
- billTypeId: 单据类型ID,仅在单据时有值
- billTypeCode: 单据类型编码, 仅在单据时有值
- billStatus: 单据的状态(完全状态)
返回内容为:
{
"tenantId":"XXXX",
"objectName":"Reimburse",
"objectId":"KBJG]03V",
"operation":"create",
"billTypeId":"EV2VP160CXE000B",
"billTypeCode":"EX052",
"billStatus":"BillStatus.draft"
}
- UserTask对象消息会额外返回data,data包含以下信息(仅UserTask会返回data)
- isActed: 是否已执行
- taskName: 任务名称
- objectId: 任务关联的对象id
- objectType: 任务关联的对象名称
- taskUsers: 任务通知人列表
- actionName: 执行的动作名称,仅在审批后有值
- actionValue: 执行的动作value,仅在审批后有值
- isDeleted: 是否已删除,已删除的任务不需要再执行
- isActed: 是否已完成
- userId: 用户id
- userName: 用户名称
- userCode: 用户编码
- userExternalObjectId: 用户外部系统id
UserTask返回消息内容为:
{
"tenantId": "1",
"objectName": "UserTask", // 对象名称
"objectId": "HNM71P5056V0008", // 对象id
"operation": "update", // 操作
"data": { // 消息数据(仅UserTask会返回data)
"isActed": true, // 是否已执行
"taskName": "凭证审核", // 任务名称
"objectId": "EMM71P5056V0002", // 任务关联的对象id
"objectType": "Voucher", // 任务关联的对象名称
"taskUsers": [ // 任务通知人列表
{
"actionName": "同意", // 执行的动作名称
"actionValue": "userAgree", // 执行的动作value
"isDeleted": false, // 是否已删除,已删除的任务不需要再执行
"isActed": true, // 是否已完成
"userId": "Q4V6B05DW0001", // 用户id
"userName": "王XX", // 用户名称
"userCode": "05", // 用户编码
"userExternalObjectId": "" // 用户外部系统id
}
]
}
}
1.2. SQS 使用说明
1.2.1. 在工程中引用sdk, 两者选其一即可
引入官方sqs sdk
<dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>version</version> </dependency>
引入openapi sdk
since 1.1.0
<dependency> <groupId>com.q7link.openapi</groupId> <artifactId>openapi-client</artifactId> <version>version</version> </dependency>
1.2.2. 支持语言
- 提供sdk的语言, 文档地址:https://docs.amazonaws.cn/
- Java
- .NET
- Python
- PHP
- Ruby
- 浏览器
1.2.3. java sdk使用示例 aws sdk 版本一
package com.q7link.openapi.example;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.opensdk.BaseRequest;
import com.amazonaws.opensdk.SdkRequestConfig;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.q7link.openapi.Openapi;
import com.q7link.openapi.model.GetQueueRequest;
import com.q7link.openapi.model.Queue;
import java.util.List;
/**
* sqs request example
*
* @author yankunjie
*/
public class AmazonSqsReceiveMessage {
private static Regions regions = Regions.CN_NORTH_1;
private static AmazonSQS amazonSQS;
private static Openapi openapi;
private static final String ACCESS_KEY_ID = "";
private static final String SECRET_KEY = "";
private static final String OPEN_ID = "";
public static void main(String[] args) {
// 通过openapi queue接口获取queueUrl
Queue queue = getQueue();
String queueUrl = queue.getQueueUrl();
while (true) {
// 接收消息
List<Message> messages = receiveMessages(queueUrl);
for (Message message : messages) {
// 消息体
String body = message.getBody();
// do something
System.out.println(body);
// 删除消息
amazonSQS().deleteMessage(queueUrl, message.getReceiptHandle());
}
}
}
/**
* 接收消息,推荐使用长轮询
* <pre>
* 1.最大等待时间:20秒 值范围:1-20秒
* 2.最大接收消息数:10 值范围:1-10
* 3.默认可见性超时:60秒 值范围:0-12天
* 根据业务需求适当调整可见性超时时间
* </pre>
*
* @param queueUrl 队列url
* @return 消息列表
*/
private static List<Message> receiveMessages(String queueUrl) {
ReceiveMessageRequest request = new ReceiveMessageRequest()
// 队列url
.withQueueUrl(queueUrl)
// 可见性超时:默认60秒,收到消息在可见性超时时间内不会再次收到该消息。注意:可见性超时无法保证不会接收消息两次
.withVisibilityTimeout(10)
// 最大等待时间:20秒 值范围:1-20秒
.withWaitTimeSeconds(5)
// 最大接收消息数:10 值范围:1-10
.withMaxNumberOfMessages(10);
return amazonSQS().receiveMessage(request).getMessages();
}
/**
* 获取队列信息
*
* @return 队列对象
*/
private static Queue getQueue() {
GetQueueRequest request = new GetQueueRequest();
request.sdkRequestConfig(getSdkRequestConfig(request));
return openapi().getQueue(request).getQueue();
}
private static SdkRequestConfig getSdkRequestConfig(BaseRequest request) {
return request.sdkRequestConfig().copyBuilder()
.customHeader("Content-Type", "application/json")
.customHeader("Access-Key-Id", ACCESS_KEY_ID)
.customHeader("Open-Id", OPEN_ID)
.build();
}
public static Openapi openapi() {
if (openapi == null) {
String accessKey = ACCESS_KEY_ID;
String accessSecret = SECRET_KEY;
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, accessSecret);
openapi = Openapi.builder()
.iamCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.build();
}
return openapi;
}
public static AmazonSQS amazonSQS() {
if (amazonSQS == null) {
String accessKey = ACCESS_KEY_ID;
String accessSecret = SECRET_KEY;
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, accessSecret);
amazonSQS = AmazonSQSClientBuilder.standard()
.withRegion(regions)
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.build();
}
return amazonSQS;
}
}
SQS官方文档:https://docs.amazonaws.cn/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html
1.3. Webhook 说明
系统产生消息后,发送POST请求到设置的Webhook地址,收到请求后处理header和body;header中的signature用于验证消息来源,body为加密的消息内容,需要解密后才能获取到消息内容。
验证签名 数据组成: callbackUrl + sorted(排除signature的headers) + encryptBody + verifyToken
哈希方法: 使用 SHA-1 进行哈希计算。
签名比较: 将计算得到的 signature 与 header 中的 signature 进行比较,以验证签名的正确性。
注意:验签的header不是收到的所有header,是设置webhook时双方约定的header
解密 body 步骤:
将 body 转换为字节数组(通过 hexToBytes 方法)。
使用 encryptKey 进行解密,解密算法为 AES/CBC/NoPadding。
将解密得到的数据解析为 JSON 格式。
1.3.1. java代码示例
验签工具
import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.TreeMap;
public class SignatureVerifier {
private static final Gson gson = new Gson();
public static boolean verifySignature(String callbackUrl, Map<String, String> headers, String encryptBody, String verifyToken) throws Exception {
// 创建一个 TreeMap 来对 headers 进行排序
Map<String, String> sortedHeaders = new TreeMap<>(headers);
sortedHeaders.remove("signature"); // 移除 signature 字段
// 将排序后的 map 转换为 JSON 字符串
String headersJson = gson.toJson(sortedHeaders);
// 拼接数据
String dataToHash = callbackUrl + headersJson + encryptBody + verifyToken;
// 生成 SHA-1 哈希
MessageDigest digest = MessageDigest.getInstance("SHA-1");
byte[] hash = digest.digest(dataToHash.getBytes(StandardCharsets.UTF_8));
// 获取 header 中的签名
String signatureFromHeader = headers.get("signature");
// 将计算得到的签名与 header 中的签名进行比较
return signatureFromHeader != null && signatureFromHeader.equals(bytesToHex(hash));
}
private static String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02x", b));
}
return sb.toString();
}
}
解密工具
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.UnsupportedEncodingException;
public class AESUtils {
// 偏移量
public static final String VIPARA = "5928772605893626";
// 编码方式
public static final String CODE_TYPE = "UTF-8";
// 填充类型
public static final String AES_TYPE = "AES/CBC/NoPadding";
// 填充字符集
private static final String[] consult = new String[]{"0","1","2","3","4","5","6","7","8","9","A","B","C","D","E","F"};
/**
* AES解密
* @param encryptText 需解密字符串(十六进制)
* @param aesKey AES密钥(需为16字节)
* @return 解密后的字符串
* @throws Exception
*/
public static String decrypt(String encryptText, String aesKey) throws Exception {
byte[] bytes = hexToBytes(encryptText);
IvParameterSpec zeroIv = new IvParameterSpec(VIPARA.getBytes());
// 密钥填充至16位
if(aesKey.length() < 16) aesKey = completionCodeFor16Bytes(aesKey);
SecretKeySpec key = new SecretKeySpec(aesKey.getBytes(), "AES");
Cipher cipher = Cipher.getInstance(AES_TYPE);
cipher.init(Cipher.DECRYPT_MODE, key, zeroIv);
byte[] decryptedData = cipher.doFinal(bytes);
return restoreData(new String(decryptedData, CODE_TYPE));
}
/**
* 字符填充(加密前处理)
* @param str 需填充的字符串
* @return 填充后的字符串
* @throws UnsupportedEncodingException
*/
public static String completionCodeFor16Bytes(String str) throws UnsupportedEncodingException {
int len = str.getBytes(CODE_TYPE).length;
int index = len % 16;
int coverCnt = 16 - index; // 需填充字符数量
String coverVal = consult[coverCnt - 1]; // 填充值
StringBuffer sb = new StringBuffer(str);
// 补位填充
for (int i = 0; i < coverCnt; i++) sb.append(coverVal);
return sb.toString();
}
/**
* 还原字符串(去填充)
* @param str 需还原的字符串
* @return 去填充后的字符串
*/
public static String restoreData(String str) {
// 获取最后的字符串值
int num = 0;
String markStr = str.substring(str.length() - 1);
// 获取需截取字符长度
for (int i = 0; i < consult.length; i++) {
if (consult[i].equals(markStr)) {
num = i + 1;
break;
}
}
// 还原字符
str = str.substring(0, str.length() - num);
return str;
}
/**
* byte数组转16进制值字符串
* @param buf byte数组
* @return 16进制字符串
*/
public static String toHex(byte[] buf) {
if (buf != null && buf.length != 0) {
StringBuilder out = new StringBuilder();
for (int i = 0; i < buf.length; ++i) {
out.append(consult[buf[i] >> 4 & 15]).append(consult[buf[i] & 15]);
}
return out.toString();
} else {
return "";
}
}
/**
* 字符串转byte数组
* @param str 16进制字符串
* @return byte数组
*/
public static byte[] hexToBytes(String str) {
if (str == null) {
return null;
} else {
char[] hex = str.toCharArray();
int length = hex.length / 2;
byte[] raw = new byte[length];
for (int i = 0; i < length; ++i) {
int high = Character.digit(hex[i * 2], 16);
int low = Character.digit(hex[i * 2 + 1], 16);
int value = high << 4 | low;
if (value > 127) {
value -= 256;
}
raw[i] = (byte) value;
}
return raw;
}
}
}
在之前提供的代码中,gson 是一个 Gson 类的实例,用于将排序后的 headers Map 转换为 JSON 字符串。你需要在项目中添加 Gson 库作为依赖,以便使用 Gson 类。
如何添加 Gson 依赖,如果使用 Maven 在 pom.xml 文件中添加 Gson 依赖:
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
使用示例
public static void main(String[] args) throws Exception {
// 假设你有如下数据
String callbackUrl = "http://example.com/callback";
Map<String, String> headers = MapBuilder.of(
"Tenant-Id", "your Tenant-Id",
"signature", "your signature"
);
String encryptBody = "your encryptBody";
String verifyToken = "your verifyToken";
String encryptKey = "your encryptKey";
// 验证签名
boolean isSignatureValid = SignatureVerifier.verifySignature(callbackUrl, headers, encryptBody, verifyToken);
if (isSignatureValid) {
// 验签通过
}
// 解密 body
String decryptedBody = AESUtils.decrypt(encryptBody, encryptKey);
// decryptedBody 为解密后的消息内容
}
1.3.2. Nodejs代码示例
依赖安装
首先,确保你已经安装了必要的 Node.js 模块。你可以使用以下命令来安装它们:
npm install crypto
验签
const crypto = require('crypto');
// 排除 signature 字段,并根据键排序
function sortHeaders(headers) {
const sortedHeaders = {};
Object.keys(headers)
.filter(key => key.toLowerCase() !== 'signature') // 排除 signature
.sort() // 排序
.forEach(key => {
sortedHeaders[key] = headers[key];
});
return sortedHeaders;
}
// 计算签名
function calculateSignature(callbackUrl, headers, encryptBody, verifyToken) {
const sortedHeaders = sortHeaders(headers);
const sortedHeadersJson = JSON.stringify(sortedHeaders);
// 构建签名字符串
const signatureString = callbackUrl + sortedHeadersJson + encryptBody + verifyToken;
// 计算 SHA-1 签名
const sha1 = crypto.createHash('sha1');
sha1.update(signatureString, 'utf8');
const calculatedSignature = sha1.digest('hex');
return calculatedSignature;
}
// 验证签名
function verifySignature(calculatedSignature, receivedSignature) {
return calculatedSignature === receivedSignature;
}
// 示例使用
const callbackUrl = 'http://example.com/callback';
const headers = {
'Header1': 'Value1',
'Header2': 'Value2',
'signature': 'signatureToExclude' // 注意:此字段会被排除
};
const encryptBody = 'encryptedBodyExample';
const verifyToken = 'yourVerifyToken';
const receivedSignature = 'expectedSignature'; // 接收到的签名
// 计算签名
const calculatedSignature = calculateSignature(callbackUrl, headers, encryptBody, verifyToken);
// 验证签名
const isSignatureValid = verifySignature(calculatedSignature, receivedSignature);
console.log('Calculated Signature:', calculatedSignature);
console.log('Is Signature Valid:', isSignatureValid);
消息解密
const crypto = require('crypto');
// 偏移量
const VIPARA = '5928772605893626';
// 填充字符集
const consult = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"];
/**
* AES解密
* @param {string} encryptText - 需解密的十六进制字符串
* @param {string} aesKey - AES密钥(需为16字节)
* @returns {string} - 解密后的字符串
*/
function decrypt(encryptText, aesKey) {
const bytes = hexToBytes(encryptText);
// 密钥填充至16字节
if (aesKey.length < 16) {
aesKey = completionCodeFor16Bytes(aesKey);
}
// 创建解密器
const decipher = crypto.createDecipheriv('aes-128-cbc', Buffer.from(aesKey, 'utf8'), Buffer.from(VIPARA, 'utf8'));
// 设置 NoPadding
decipher.setAutoPadding(false);
let decrypted = decipher.update(bytes);
decrypted = Buffer.concat([decrypted, decipher.final()]);
return restoreData(decrypted.toString('utf8'));
}
/**
* 字符填充(加密前处理)
* @param {string} str - 需填充的字符串
* @returns {string} - 填充后的字符串
*/
function completionCodeFor16Bytes(str) {
const len = Buffer.from(str, 'utf8').length;
const index = len % 16;
const coverCnt = 16 - index; // 需填充字符数量
const coverVal = consult[coverCnt - 1]; // 填充值
return str + coverVal.repeat(coverCnt);
}
/**
* 还原字符串(去填充)
* @param {string} str - 需还原的字符串
* @returns {string} - 去填充后的字符串
*/
function restoreData(str) {
// 获取最后的字符值
const markStr = str.slice(-1);
let num = 0;
// 获取需截取字符长度
for (let i = 0; i < consult.length; i++) {
if (consult[i] === markStr) {
num = i + 1;
break;
}
}
// 还原字符
return str.slice(0, -num);
}
/**
* 16进制字符串转byte数组
* @param {string} str - 16进制字符串
* @returns {Buffer} - byte数组
*/
function hexToBytes(str) {
return Buffer.from(str, 'hex');
}
// 示例使用
const encryptedHex = 'yourEncryptedHexString'; // 你的已加密的十六进制字符串
const aesKey = 'your16ByteKeyHere'; // 你的 AES 密钥
// 解密数据
try {
const decryptedData = decrypt(encryptedHex, aesKey);
console.log('Decrypted Data:', decryptedData);
} catch (error) {
console.error('Decryption Error:', error.message);
}