Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 95 additions & 51 deletions scst/src/scst_dlm.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ static struct scst_dlm_rem_ua *scst_dlm_pr_reg_recv_rem_ua(struct scst_device *d
dlm_lockspace_t *ls,
struct scst_dev_registrant *reg,
int index);
static void scst_dlm_remove_lock(dlm_lockspace_t *ls, struct scst_lksb *lksb,
const char *name);

static inline void compile_time_size_checks(void)
{
Expand Down Expand Up @@ -141,29 +143,58 @@ static int scst_dlm_lock_wait(dlm_lockspace_t *ls, int mode,
}

/*
* scst_dlm_unlock_wait - Discard a DLM lock
* scst_dlm_force_unlock_and_drain - Destroy the lkb backing @lksb and wait
* until the destroying CAST has been observed.
*
* After this returns, DLM no longer holds a reference to @lksb: no further
* AST can fire for it, so the containing storage may be freed.
*
* DLM_LKF_FORCEUNLOCK guarantees that a destroying CAST with sb_status
* -DLM_EUNLOCK will arrive even if an earlier operation is still in flight
* or a peer is departing (recovery synthesizes the reply in that case via
* dlm_recover_waiters_pre()). Other (non-destroying) CASTs that may have
* been pending — e.g. a stale reply for a canceled convert — are logged
* and the wait continues.
*
* The 60s watchdog logs progress but never returns early. The only
* "hang forever" case is a permanently broken lockspace; md-cluster and
* gfs2 accept the same constraint.
*/
static int scst_dlm_unlock_wait(dlm_lockspace_t *ls, struct scst_lksb *lksb)
static void scst_dlm_force_unlock_and_drain(dlm_lockspace_t *ls,
struct scst_lksb *lksb,
const char *name)
{
unsigned long start = jiffies;
u32 lkid = lksb->lksb.sb_lkid;
int res;

sBUG_ON(!ls);

init_completion(&lksb->compl);
res = dlm_unlock(ls, lksb->lksb.sb_lkid, 0, &lksb->lksb, lksb);
if (res < 0)
goto out;
res = wait_for_completion_timeout(&lksb->compl, 60 * HZ);
if (res > 0) {
res = lksb->lksb.sb_status;
if (res == -DLM_EUNLOCK || res == -DLM_ECANCEL)
res = 0;
} else if (res == 0) {
res = -ETIMEDOUT;
res = dlm_unlock(ls, lkid, DLM_LKF_FORCEUNLOCK, &lksb->lksb, lksb);
if (res < 0) {
PRINT_ERROR("dlm_unlock(FORCEUNLOCK, lkid=%08x, name=%s) submission failed: %d",
lkid, name ? : "?", res);
return;
}

out:
return res;
while (true) {
long wait = wait_for_completion_timeout(&lksb->compl, 60 * HZ);

if (wait > 0) {
int status = lksb->lksb.sb_status;

if (status == -DLM_EUNLOCK)
return;

PRINT_WARNING("stale CAST (lkid=%08x name=%s status=%d %ums)",
lkid, name ? : "?", status,
jiffies_to_msecs(jiffies - start));
continue;
}

PRINT_WARNING("draining FORCEUNLOCK (lkid=%08x name=%s, %ums)",
lkid, name ? : "?",
jiffies_to_msecs(jiffies - start));
}
}

/* Number of persistent reservation registrants. */
Expand Down Expand Up @@ -197,8 +228,6 @@ static void scst_dlm_pr_rm_reg_ls(dlm_lockspace_t *ls,
struct scst_dev_registrant *reg,
bool include_rem_ua)
{
int res;

if (include_rem_ua) {
struct scst_dlm_rem_ua *ua, *tmp;

Expand All @@ -217,10 +246,7 @@ static void scst_dlm_pr_rm_reg_ls(dlm_lockspace_t *ls,

if (!reg->lksb.lksb.sb_lkid)
return;
res = scst_dlm_unlock_wait(ls, &reg->lksb);
WARN(res < 0, "scst_dlm_unlock_wait(%08x) failed (%d)",
reg->lksb.lksb.sb_lkid, res);
reg->lksb.lksb.sb_lkid = 0;
scst_dlm_remove_lock(ls, &reg->lksb, NULL);
reg->dlm_idx = -1;
}

Expand Down Expand Up @@ -393,9 +419,12 @@ static int scst_copy_from_dlm(struct scst_device *dev, dlm_lockspace_t *ls,
lvb->pr_type);
}
} else {
PRINT_ERROR("pr_add_registrant %s." PR_REG_LOCK
" failed\n", dev->virt_name, i);
scst_dlm_unlock_wait(ls, &reg_lksb[i]);
char reg_name[32];

snprintf(reg_name, sizeof(reg_name), PR_REG_LOCK, i);
PRINT_ERROR("pr_add_registrant %s.%s failed\n",
dev->virt_name, reg_name);
scst_dlm_remove_lock(ls, &reg_lksb[i], reg_name);
continue;
}

Expand Down Expand Up @@ -444,8 +473,7 @@ static int scst_copy_from_dlm(struct scst_device *dev, dlm_lockspace_t *ls,

cancel:
for (i = 0; i < nr_registrants; i++)
if (reg_lksb[i].lksb.sb_lkid)
scst_dlm_unlock_wait(ls, &reg_lksb[i]);
scst_dlm_remove_lock(ls, &reg_lksb[i], NULL);

goto out;
}
Expand Down Expand Up @@ -811,21 +839,43 @@ static void scst_pr_toggle_lock(struct scst_pr_dlm_data *pr_dlm,
pr_dlm->dev->virt_name, lock_name, res);
if (!lksb.lksb.sb_lkid)
continue;
scst_dlm_lock_wait(ls, DLM_LOCK_NL, &lksb,
DLM_LKF_CONVERT, lock_name, NULL);
scst_dlm_unlock_wait(ls, &lksb);
scst_dlm_remove_lock(ls, &lksb, lock_name);
}
}

/* Remove a lock from the local DLM lockspace instance. */
/*
* scst_dlm_remove_lock - Remove a lock from the local DLM lockspace instance.
*
* Contract: when this function returns, the lkb has been destroyed and DLM
* no longer holds a reference to @lksb. No further AST can fire for it, so
* the containing storage may be freed.
*
* The clean-release idiom for a lock that may be held in EX or PW is to
* convert to NL first (signals "I'm done writing; LVB state is settled")
* and then unlock. The convert is best-effort: on failure we proceed
* to a FORCEUNLOCK regardless, so the contract is still met. The final
* scst_dlm_force_unlock_and_drain() waits for the destroying CAST before
* returning.
*/
static void scst_dlm_remove_lock(dlm_lockspace_t *ls, struct scst_lksb *lksb,
const char *name)
{
unsigned long start;
int res;

if (!lksb->lksb.sb_lkid)
return;
scst_dlm_lock_wait(ls, DLM_LOCK_NL, lksb, DLM_LKF_CONVERT, name,
NULL);
scst_dlm_unlock_wait(ls, lksb);

start = jiffies;

res = scst_dlm_lock_wait(ls, DLM_LOCK_NL, lksb, DLM_LKF_CONVERT, name,
NULL);
if (res < 0)
PRINT_WARNING("convert-to-NL failed (lkid=%08x, name=%s, res=%d, %ums)",
lksb->lksb.sb_lkid, name ? : "?", res,
jiffies_to_msecs(jiffies - start));

scst_dlm_force_unlock_and_drain(ls, lksb, name);
lksb->lksb.sb_lkid = 0;
}

Expand Down Expand Up @@ -1066,7 +1116,7 @@ static dlm_lockspace_t *get_lockspace(struct scst_device *dev)
else if (modified_lvb)
scst_trigger_reread_lvb(pr_dlm, ls);

scst_dlm_unlock_wait(ls, &pr_lksb);
scst_dlm_remove_lock(ls, &pr_lksb, PR_LOCK);

/*
* Only store the lockspace pointer in pr_dlm->ls after the lockspace
Expand Down Expand Up @@ -1153,7 +1203,7 @@ static void scst_dlm_pr_write_unlock(struct scst_device *dev,
DLM_LKF_CONVERT | DLM_LKF_VALBLK, PR_DATA_LOCK,
NULL);
scst_pr_toggle_lock(pr_dlm, ls, PR_POST_UPDATE_LOCK);
scst_dlm_unlock_wait(ls, pr_lksb);
scst_dlm_remove_lock(ls, pr_lksb, PR_LOCK);
}

static bool scst_dlm_reserved(struct scst_device *dev)
Expand Down Expand Up @@ -1214,7 +1264,7 @@ static void scst_dlm_res_unlock(struct scst_device *dev,
scst_pr_toggle_lock(pr_dlm, ls, PR_PRE_UPDATE_LOCK);
scst_pr_toggle_lock(pr_dlm, ls, PR_POST_UPDATE_LOCK);
}
scst_dlm_unlock_wait(ls, pr_lksb);
scst_dlm_remove_lock(ls, pr_lksb, PR_LOCK);
}

static bool scst_dlm_is_rsv_holder(struct scst_device *dev,
Expand Down Expand Up @@ -1438,8 +1488,7 @@ static void scst_reread_lvb_work(struct work_struct *work)
NULL);
if (res >= 0)
scst_trigger_reread_lvb(pr_dlm, ls);
if (pr_lksb.lksb.sb_lkid)
scst_dlm_unlock_wait(ls, &pr_lksb);
scst_dlm_remove_lock(ls, &pr_lksb, PR_LOCK);

unlock_ls:
mutex_unlock(&pr_dlm->ls_mutex);
Expand All @@ -1462,8 +1511,7 @@ static void scst_lvb_upd_work(struct work_struct *work)
res = scst_dlm_lock_wait(ls, DLM_LOCK_EX, &lksb, 0, PR_LOCK, NULL);
if (res >= 0)
scst_trigger_lvb_update(pr_dlm, ls);
if (lksb.lksb.sb_lkid)
scst_dlm_unlock_wait(ls, &lksb);
scst_dlm_remove_lock(ls, &lksb, PR_LOCK);

unlock_ls:
mutex_unlock(&pr_dlm->ls_mutex);
Expand Down Expand Up @@ -1588,7 +1636,7 @@ static void scst_pr_dlm_cleanup(struct scst_device *dev)
mutex_lock(&pr_dlm->ls_mutex);
scst_dlm_lock_wait(ls, DLM_LOCK_EX, &pr_lksb, 0, PR_LOCK, NULL);
scst_dlm_remove_locks(pr_dlm, ls);
scst_dlm_unlock_wait(ls, &pr_lksb);
scst_dlm_remove_lock(ls, &pr_lksb, PR_LOCK);
pr_dlm->ls = NULL;
mutex_unlock(&pr_dlm->ls_mutex);

Expand Down Expand Up @@ -1735,18 +1783,15 @@ static void scst_dlm_pr_reg_send_rem_ua(struct scst_device *dev, dlm_lockspace_t
}

/*
* scst_dlm_rm_rem_ua_ls - Unlock the lock associated with a remote unit attention
* scst_dlm_rm_rem_ua_ls - Remove the DLM lock associated with a remote unit
* attention. After this returns, no further AST can fire for &ua->lksb, so
* the rem_ua may be freed.
*/
static void scst_dlm_rm_rem_ua_ls(dlm_lockspace_t *ls, struct scst_dlm_rem_ua *ua)
{
int res;

if (!ua->lksb.lksb.sb_lkid)
return;
res = scst_dlm_unlock_wait(ls, &ua->lksb);
WARN(res < 0, "scst_dlm_unlock_wait(%08x) failed (%d)",
ua->lksb.lksb.sb_lkid, res);
ua->lksb.lksb.sb_lkid = 0;
scst_dlm_remove_lock(ls, &ua->lksb, NULL);
}

/*
Expand Down Expand Up @@ -1805,8 +1850,7 @@ static struct scst_dlm_rem_ua *scst_dlm_pr_reg_recv_rem_ua(struct scst_device *d
return ua;

unlock_cancel:
if (ua->lksb.lksb.sb_lkid)
scst_dlm_unlock_wait(ls, &ua->lksb);
scst_dlm_rm_rem_ua_ls(ls, ua);

cancel:
scst_dlm_free_rem_ua(ua);
Expand Down
Loading