Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions internal/apps/payment/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func InitiatePayment(ctx context.Context, p *project.Project, payer *oauth.User,
return err
}
itemID = reservedItemID
logger.InfoF(ctx, "Reserved item %d for project %d and payer %d", itemID, p.ID, payer.ID)
logger.InfoF(ctx, "Reserved item %d for project %s and payer %d", itemID, p.ID, payer.ID)

outTradeNo := genOutTradeNo()
order := PaymentOrder{
Expand Down Expand Up @@ -315,14 +315,14 @@ func HandleNotify(ctx context.Context, q map[string]string) (bool, string) {
refundErr := doEpayRefund(ctx, cfg.ClientID, secret, order.TradeNo, moneyString(order.Amount))
if refundErr == nil {
tNow := time.Now()
if updateErr := db.DB(ctx).
Model(&PaymentOrder{}).
Where("out_trade_no = ?", outTradeNo).
Updates(map[string]any{"status": OrderStatusRefunded, "refunded_at": &tNow}).
Error; updateErr != nil {
processed, updateErr := markOrderRefundedAndReturnItem(ctx, &order, map[string]any{"status": OrderStatusRefunded, "refunded_at": &tNow}, OrderStatusRefunding)
if updateErr != nil {
logger.ErrorF(ctx, "payment refund retry: failed to mark order %s refunded: %v", outTradeNo, updateErr)
return false, "update order status failed"
}
db.Redis.RPush(ctx, project.ProjectItemsKey(order.ProjectID), order.ItemID)
if !processed {
return true, "idempotent"
}
return true, "refund retry ok"
}
return false, "refund retry failed"
Expand Down Expand Up @@ -359,13 +359,18 @@ func HandleNotify(ctx context.Context, q map[string]string) (bool, string) {
tNow := time.Now()
updates["status"] = OrderStatusRefunded
updates["refunded_at"] = &tNow
// 把 item 推回 Redis 队列恢复库存
db.Redis.RPush(ctx, project.ProjectItemsKey(order.ProjectID), order.ItemID)
if _, updateErr := markOrderRefundedAndReturnItem(ctx, &order, updates, OrderStatusPaid); updateErr != nil {
logger.ErrorF(ctx, "payment refund: failed to mark order %s refunded: %v", outTradeNo, updateErr)
return false, "update order status failed"
}
} else {
updates["status"] = OrderStatusRefunding
if updateErr := db.DB(ctx).Model(&PaymentOrder{}).
Where("out_trade_no = ? AND status = ?", outTradeNo, OrderStatusPaid).Updates(updates).Error; updateErr != nil {
logger.ErrorF(ctx, "payment refund: failed to mark order %s refunding: %v", outTradeNo, updateErr)
return false, "update order status failed"
}
}
db.DB(ctx).Model(&PaymentOrder{}).
Where("out_trade_no = ?", outTradeNo).Updates(updates)
// 让 epay 重试,再次进入时因状态非 PENDING 会回到 idempotent 分支
return false, fmt.Sprintf("fulfill failed: %v", err)
}
Expand Down
35 changes: 26 additions & 9 deletions internal/apps/payment/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
"time"

"github.com/hibiken/asynq"
"github.com/linux-do/cdk/internal/apps/project"
"github.com/linux-do/cdk/internal/db"
"github.com/linux-do/cdk/internal/logger"
"gorm.io/gorm"
)

// HandleExpireStaleOrders 清理长时间未付款的 PENDING 订单。
Expand Down Expand Up @@ -61,20 +61,37 @@ func HandleExpireStaleOrders(ctx context.Context, _ *asynq.Task) error {
// expireOrder 通过 CAS 将单笔 PENDING 订单置为 FAILED,并把预占的 item 归还 Redis。
// 使用 CAS (WHERE status=PENDING) 保证幂等:
// - 若 notify 回调恰好在此同时到达并推进了状态,CAS 得 0 行,安全跳过。
//
// 修复:归还 item 后,如果项目之前被标记为已完成,现在有库存了,则重置 IsCompleted。
func expireOrder(ctx context.Context, order *PaymentOrder) {
rows := db.DB(ctx).Model(&PaymentOrder{}).
Where("out_trade_no = ? AND status = ?", order.OutTradeNo, OrderStatusPending).
Update("status", OrderStatusFailed).
RowsAffected
processed := false
if err := db.DB(ctx).Transaction(func(tx *gorm.DB) error {
result := tx.Model(&PaymentOrder{}).
Where("out_trade_no = ? AND status = ?", order.OutTradeNo, OrderStatusPending).
Update("status", OrderStatusFailed)
if result.Error != nil {
return result.Error
}
if result.RowsAffected == 0 {
return nil
}

// 另一协程(notify 回调)已处理,跳过
if rows == 0 {
if err := returnReservedItem(ctx, tx, order.ProjectID, order.ItemID); err != nil {
return err
}

processed = true
return nil
}); err != nil {
logger.ErrorF(ctx, "payment cleanup: failed to expire order %s: %v", order.OutTradeNo, err)
return
}

if !processed {
logger.InfoF(ctx, "payment cleanup: order %s already processed, skipping", order.OutTradeNo)
return
}

// 归还预占的 item,恢复项目库存
db.Redis.RPush(ctx, project.ProjectItemsKey(order.ProjectID), order.ItemID)
logger.InfoF(ctx, "payment cleanup: returned item %d to project %s stock", order.ItemID, order.ProjectID)
logger.InfoF(ctx, "payment cleanup: order %s expired and marked as FAILED", order.OutTradeNo)
}
73 changes: 73 additions & 0 deletions internal/apps/payment/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* MIT License
*
* Copyright (c) 2025 linux.do
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package payment

import (
"context"
"fmt"

"github.com/linux-do/cdk/internal/apps/project"
"github.com/linux-do/cdk/internal/db"
"gorm.io/gorm"
)

// returnReservedItem 把支付预占的 item 归还 Redis,并在同一个数据库事务中重置项目完成状态。
// Redis RPush 或项目状态更新失败时返回 error,由调用方触发外层数据库事务回滚。
func returnReservedItem(ctx context.Context, tx *gorm.DB, projectID string, itemID uint64) error {
var proj project.Project
if err := tx.Where("id = ?", projectID).First(&proj).Error; err != nil {
return fmt.Errorf("load project %s: %w", projectID, err)
}
if err := db.Redis.RPush(ctx, project.ProjectItemsKey(projectID), itemID).Err(); err != nil {
return fmt.Errorf("return item %d to project %s stock: %w", itemID, projectID, err)
}
if err := proj.ResetCompletedStatusIfHasStock(ctx, tx); err != nil {
return fmt.Errorf("reset completed status for project %s: %w", projectID, err)
}
return nil
}

func markOrderRefundedAndReturnItem(ctx context.Context, order *PaymentOrder, updates map[string]any, expectedStatus OrderStatus) (bool, error) {
processed := false
err := db.DB(ctx).Transaction(func(tx *gorm.DB) error {
result := tx.Model(&PaymentOrder{}).
Where("out_trade_no = ? AND status = ?", order.OutTradeNo, expectedStatus).
Updates(updates)
if result.Error != nil {
return result.Error
}
if result.RowsAffected == 0 {
return nil
}

if err := returnReservedItem(ctx, tx, order.ProjectID, order.ItemID); err != nil {
return err
}

processed = true
return nil
})
return processed, err
}
21 changes: 21 additions & 0 deletions internal/apps/project/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,27 @@ func (p *Project) GetReceivedItem(ctx context.Context, userID uint64) (*ProjectI
return item, nil
}

// ResetCompletedStatusIfHasStock 在调用方归还库存时重置项目完成状态。
// 适用场景:
// - 付费订单退款成功后,item 被归还到 Redis 队列,需检查并重置项目完成状态
// - 付费订单超时过期后,预占的 item 被归还,需检查并重置项目完成状态
//
// 调用方应在归还 item 的同一个数据库事务中传入 tx;
// 若 Redis RPush 或数据库更新失败,错误会返回给外层事务处理。
func (p *Project) ResetCompletedStatusIfHasStock(ctx context.Context, tx *gorm.DB) error {
hasStock, err := p.HasStock(ctx)
if err != nil {
return err
}
if !hasStock {
return nil
}

return tx.Model(&Project{}).
Where("id = ? AND is_completed = ?", p.ID, true).
Update("is_completed", false).Error
}

type ProjectReport struct {
ID uint64 `json:"id" gorm:"primaryKey,autoIncrement"`
ProjectID string `json:"project_id" gorm:"size:64;index;uniqueIndex:idx_project_reporter"`
Expand Down
Loading