秒杀、预约活动项目中如何高效地保证下单交易成功?保证redis(四)

msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {//实现库存真正到数据库内扣减的逻辑Message msg = msgs.get(0);String jsonString = new String(msg.getBody());Map map = JSON.parseObject(jsonString, Map.class);Integer itemId = (Integer) map.get("itemId");Integer amount = (Integer) map.get("amount");itemStockDOMapper.decreaseStock(itemId,amount);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}
@Componentpublic class MqProducer {private DefaultMQProducer producer;//为了保证MQ发送一定成功所以需要使用rocketmq中的事务private TransactionMQProducer transactionMQProducer;@Value("${mq.nameserver.addr}")private String nameAddr;@Value("${mq.topicname}")private String topicName;@Resourceprivate OrderService orderService;@Resourceprivate StockLogDOMapper stockLogDOMapper;@PostConstructpublic void init() throws MQClientException {//做mq producer的初始化producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr(nameAddr);producer.start();transactionMQProducer = new TransactionMQProducer("transaction_producer_group");transactionMQProducer.setNamesrvAddr(nameAddr);transactionMQProducer.start();transactionMQProducer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object arg) {//真正要做的事情,就是创建订单这步就是MQ事务中本地事务部分Integer itemId = (Integer) ((Map)arg).get("itemId");Integer promoId = (Integer) ((Map)arg).get("promoId");Integer userId = (Integer) ((Map)arg).get("userId");Integer amount = (Integer) ((Map)arg).get("amount");String stockLogId = (String) ((Map)arg).get("stockLogId");try {orderService.createOrder(userId,itemId,promoId,amount,stockLogId);} catch (BusinessException e) {e.printStackTrace();//设置对应的stockLog为回滚状态StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);stockLogDO.setStatus(3);stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);return LocalTransactionState.ROLLBACK_MESSAGE;}return LocalTransactionState.COMMIT_MESSAGE;}/*** 假设在创建订单createOrder这个过程发生了很长的时间,创建订单在数据库压力比较大的情况下可能用了* 10s多 。这样MQ就发现本地事务一直没有说提交成功还是提交失败回滚,处于一个UNKNOW的状态* 于是就需要checkLocalTransaction回调判断是否下单是否是成功的*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {//根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是继续UNKNOWNString jsonString= new String(msg.getBody());Map map = JSON.parseObject(jsonString, Map.class);Integer itemId = (Integer) map.get("itemId");Integer amount = (Integer) 
map.get("amount");String stockLogId = (String) map.get("stockLogId");StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);if(stockLogDO == null){return LocalTransactionState.UNKNOW;}if(stockLogDO.getStatus().intValue() == 2){return LocalTransactionState.COMMIT_MESSAGE;}else if(stockLogDO.getStatus().intValue() == 1){return LocalTransactionState.UNKNOW;}return LocalTransactionState.ROLLBACK_MESSAGE;}});}//事务同步扣减库存方法//事务怎么体现的?只要数据库中的数据提交了,对应的一个消息是必定发送成功的,数据库内消息回滚了,那么消息必定不发送 。public boolean transactionAsyncReduceStock(Integer userId,Integer itemId,Integer promoId, Integer amount,String stockLogId){HashMap bodyMap = new HashMap<>();bodyMap.put("itemId",itemId);bodyMap.put("amount",amount);bodyMap.put("stockLogId",stockLogId);HashMap argsMap = new HashMap<>();argsMap.put("itemId",itemId);argsMap.put("amount",amount);argsMap.put("userId",userId);argsMap.put("promoId",promoId);argsMap.put("stockLogId",stockLogId);Message message = new Message(topicName,"increase",JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));TransactionSendResult sendResult = null;try {//sendMessageInTransaction第二个参数argsMap就是后面会被本地事务中的executeLocalTransaction方法接收到sendResult = transactionMQProducer.sendMessageInTransaction(message,argsMap);} catch (MQClientException e) {e.printStackTrace();return false;}if(sendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE){return false;}else if(sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE){return true;}else{return false;}}}