leniu-java-mq

leniu-tengyun-core 消息队列规范

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "leniu-java-mq" with this command: npx skills add xu-cell/ai-engineering-init/xu-cell-ai-engineering-init-leniu-java-mq

leniu-tengyun-core 消息队列规范

项目特征

特征 说明

包名前缀 net.xnzn.core.*

JDK 版本 21

消息工具 MqUtil

消费者注解 @MQMessageListener(group, topic, tag)

消费者接口 implements MQListener<MqPayload<String>>

主题常量 LeMqConstant.Topic

延迟枚举 LeMqConstant.DelayDuration

JSON工具 JacksonUtil

异常类 LeException

核心架构:三层分工

MsgSend(静态工具类)→ Listener(接收消息)→ Handler(分发处理)

层 职责 示例

XxxMessageSend 静态工具类,封装消息发送逻辑 OrderMessageSend

XxxMqListenerYyy 消费者,接收 MQ 消息并分发到 Handler OrderMqListenerAsyncSave

XxxMqHandler 业务处理,统一处理各类消息 OrderMqHandler

消息发送

三种发送方式

import net.xnzn.core.common.mq.MqUtil; import net.xnzn.core.common.constant.LeMqConstant; import net.xnzn.core.common.utils.JacksonUtil;

// 1. 普通消息(立即发送) MqUtil.send(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC);

// 2. 事务消息(在 DB 事务提交后发送,保证一致性) MqUtil.sendByTxEnd(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC);

// 3. 延迟消息(指定延迟时长后触发) MqUtil.sendDelay(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC, LeMqConstant.DelayDuration.ONE_MINUTE);

关键点:

  • 消息体必须用 JacksonUtil.writeValueAsString() 序列化为 String 再发送

  • 事务消息用 sendByTxEnd() (如 DB 事务回滚则不发送)

  • 延迟时间用 LeMqConstant.DelayDuration 枚举(不用 Duration.ofMinutes() )

DelayDuration 枚举(常用值)

LeMqConstant.DelayDuration.ONE_MINUTE // 1分钟 LeMqConstant.DelayDuration.THIRTY_MINUTES // 30分钟 // 其他枚举值参见 LeMqConstant.DelayDuration

消息发送类(静态工具类模式)

/**

  • 订单消息发送

  • 关键特征:

    1. 不是 @Component,是纯静态工具类
    1. 私有构造器
    1. PO 中包含 traceId 和 tenantId(用于跨线程追踪) */ @Slf4j public class XxxMessageSend {

    private XxxMessageSend() {} // 禁止实例化

    private static final String MQ_ERROR_LOG = "发送MQ消息失败";

    /**

    • 普通消息(适用于非事务性发送) */ public static void sendXxxEvent(XxxPO po) { log.info("[XxxMQ]发送xxx事件"); po.setTraceId(LogUtil.getCurrentTraceId()); po.setTenantId(TenantContextHolder.getTenantId()); MqUtil.send(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC); }

    /**

    • 事务消息(在 @Transactional 方法中使用,事务提交后才发送) */ public static void sendXxxEventByTx(XxxPO po) { log.info("[XxxMQ]发送xxx事务消息"); po.setTraceId(LogUtil.getCurrentTraceId()); po.setTenantId(TenantContextHolder.getTenantId()); try { MqUtil.sendByTxEnd(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC); } catch (Exception e) { log.error(MQ_ERROR_LOG, e); } }

    /**

    • 延迟消息(超时取消等场景) */ public static void sendXxxDelay(XxxPO po, LeMqConstant.DelayDuration delayDuration) { log.info("[XxxMQ]发送xxx延迟消息"); po.setTraceId(LogUtil.getCurrentTraceId()); po.setTenantId(TenantContextHolder.getTenantId()); MqUtil.sendDelay(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC, delayDuration); } }

PO 消息体规范

import lombok.Data;

/**

  • MQ 消息 PO(Message Payload Object)

  • 必须包含 traceId 和 tenantId 字段 */ @Data public class XxxPO {

    /** 链路追踪ID */ private String traceId;

    /** 租户ID */ private String tenantId;

    /** 业务字段 */ private Long orderId; private String outTradeNo; // 其他字段... }

消息消费

Listener 类(真实代码模式)

import lombok.extern.slf4j.Slf4j; import net.xnzn.core.common.mq.MqPayload; import net.xnzn.framework.mq.MQListener; import net.xnzn.framework.mq.MQMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy;

/**

  • MQ 消费者 Listener

  • @see LeMqConstant.Topic#XXX_TOPIC */ @Slf4j @MQMessageListener( group = "module-xxx-topic-name", // 消费组,格式:模块名-topic-tag topic = "xxx", // Topic 名称 tag = "xxx-topic-name" // Tag 名称(对应 LeMqConstant.Topic) ) public class XxxMqListenerYyy implements MQListener<MqPayload<String>> {

    @Autowired @Lazy // ⚠️ 必须 @Lazy,避免循环依赖 private XxxMqHandler xxxMqHandler;

    @Override public void onMessage(MqPayload<String> payload) { // 委托给 Handler 处理,使用方法引用 xxxMqHandler.handleMessage(payload, XxxPO.class, XxxMqHandler::handleXxx); } }

Handler 类(统一处理消息)

import cn.hutool.core.text.CharSequenceUtil; import com.pig4cloud.pigx.common.core.exception.LeException; import lombok.extern.slf4j.Slf4j; import net.xnzn.core.common.export.util.I18nUtil; import net.xnzn.core.common.mq.MqPayload; import net.xnzn.core.common.utils.JacksonUtil; import net.xnzn.framework.data.tenant.TenantContextHolder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service;

import java.util.function.BiConsumer;

@Slf4j @Service public class XxxMqHandler {

@Lazy
@Autowired
private XxxService xxxService;

/**
 * 统一处理调用(核心模板方法)
 * 负责:反序列化、设置租户上下文、异常兜底
 */
public &#x3C;T> void handleMessage(MqPayload&#x3C;String> payload, Class&#x3C;T> clz, BiConsumer&#x3C;XxxMqHandler, T> handleFunc) {
    I18nUtil.loadDefaultLocale();
    try {
        log.info("[Xxx消息]收到消息 {}", payload);
        T payloadData = JacksonUtil.readValue(payload.getData(), clz);
        if (payloadData != null) {
            TenantContextHolder.setTenantId(payload.getTenantId());  // 设置租户上下文
            handleFunc.accept(this, payloadData);
        } else {
            log.error("[Xxx消息]解析失败");
        }
    } catch (Exception e) {
        log.error("[Xxx消息]处理异常", e);
    }
}

/**
 * 处理 xxx 事件
 */
public void handleXxx(XxxPO payload) {
    try {
        log.info("[Xxx事件]MQ消费:开始");
        xxxService.processXxx(payload);
        log.info("[Xxx事件]MQ消费:消息消费完成");
    } catch (Exception e) {
        log.error("[Xxx事件]MQ消费:处理异常", e);
    }
}

}

常见场景

场景1:事务消息(下单后通知)

@Transactional(rollbackFor = Exception.class) public void createOrder(OrderParam param) { // 1. 保存订单 OrderInfo order = OrderInfo.newDefaultInstance(); order.setCanteenId(param.getCanteenId()); orderMapper.insert(order);

// 2. 事务提交后发送消息(保证一致性)
OrderPlacedPO po = new OrderPlacedPO();
po.setOrderInfo(order);
OrderMessageSend.sendOrderPlacedByTx(po);  // 内部使用 sendByTxEnd

log.info("订单创建成功,orderId:{}", order.getId());

}

场景2:延迟消息(订单超时取消)

public static LocalDateTime sendOrderTimeout(String macOrderId, LeMqConstant.DelayDuration delayDuration) { log.info("[订单MQv3]发送未支付订单异步支付超时通知"); OrderCancelPO po = new OrderCancelPO(); po.setMacOrderId(macOrderId); po.setTenantId(TenantContextHolder.getTenantId()); po.setTraceId(LogUtil.getCurrentTraceId());

// 延迟发送
MqUtil.sendDelay(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.ORDER_V3_ASYNC_TIMEOUT, delayDuration);

// 返回预计触发时间
return LocalDateTime.now().plusSeconds(delayDuration.getMilliseconds() / 1000);

}

场景3:带 Redisson 分布式锁的 MQ 消费

public void orderAsyncSave(OrderSavePO payload) { // 消费时加分布式锁(防止并发处理同一订单) RLock lock = RedisUtil.getLock(OrderCacheConstants.orderCacheSaveLockKey(payload.getMacOrderId())); lock.lock(); try { log.info("[订单异步保存]MQ消费:开始"); doSaveOrder(payload); log.info("[订单异步保存]MQ消费:消息消费完成"); } catch (Exception e) { log.error("[订单异步保存]MQ消费:处理异常", e); } finally { // 安全释放锁 try { if (lock.isHeldByCurrentThread() && lock.isLocked()) { lock.unlock(); } } catch (Exception e) { log.error("解锁异常", e); } } }

日志规范

// 发送端日志格式 log.info("[模块MQv3]发送xxx事件");

// 消费端日志格式 log.info("[xxx事件]MQ消费:开始"); log.info("[xxx事件]MQ消费:消息消费完成"); log.error("[xxx事件]MQ消费:处理异常", e);

常见错误

错误写法 正确写法 说明

MqUtil.send(dto, topic) 直接传对象 MqUtil.send(JacksonUtil.writeValueAsString(dto), topic)

必须先序列化为 String

@MqConsumer(topic = ...)

@MQMessageListener(group, topic, tag)

  • implements MQListener<MqPayload<String>>

实际框架注解不同

Duration.ofMinutes(30)

LeMqConstant.DelayDuration.THIRTY_MINUTES

延迟枚举不是 Duration

忘记在 PO 中设置 traceId /tenantId

po.setTraceId(LogUtil.getCurrentTraceId())

多租户追踪必须设置

消费方法直接 @Autowired 服务 @Autowired @Lazy

避免循环依赖

在 MQ 发送类上加 @Component

纯静态工具类(私有构造器,不注入 Spring) 发送类是静态工具

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

scheduled-jobs

No summary provided by upstream source.

Repository SourceNeeds Review
General

loki-log-query

No summary provided by upstream source.

Repository SourceNeeds Review
General

progress

No summary provided by upstream source.

Repository SourceNeeds Review