Skip to content
Snippets Groups Projects
Commit 9d8e1bbe authored by benjamin.franksen's avatar benjamin.franksen
Browse files

seq: fix problem with synchronous pvPut/pvGet

The problem was that that once a synchronous request times out, the
variable would never recover from the timeout state. This was because
the semaphores would not be signalled on timeout. However, in order
to correctly discard events (i.e. callbacks from CA) that belong to
timed out requests, we have to remember them. This is what the getReq
and putReq arrays in struct state_set are for.

The fix assumes that CA will eventually call the callback for each
accepted request, even if the channel disconnects. If the assumption is
wrong then we have a memory leak.
parent 3435504a
No related branches found
No related tags found
No related merge requests found
......@@ -158,6 +158,8 @@ struct state_set
/* these are arrays, one for each channel */
epicsEventId *getSemId; /* semaphores for async get */
epicsEventId *putSemId; /* semaphores for async put */
PVREQ **getReq; /* get requests */
PVREQ **putReq; /* put requests */
/* safe mode */
boolean *dirty; /* array of flags, one for each channel */
SEQ_VARS *var; /* variable value block */
......
......@@ -157,14 +157,16 @@ pvStat seq_connect(SPROG *sp, boolean wait)
void seq_get_handler(
void *var, pvType type, unsigned count, pvValue *value, void *arg, pvStat status)
{
PVREQ *rQ = (PVREQ *)arg;
CHAN *ch = rQ->ch;
PVREQ *rq = (PVREQ *)arg;
CHAN *ch = rq->ch;
SSCB *ss = rq->ss;
SPROG *sp = ch->sprog;
assert(ch->dbch != NULL);
freeListFree(sp->pvReqPool, arg);
/* Process event handling in each state set */
proc_db_events(value, type, ch, rQ->ss, GET_COMPLETE, status);
/* ignore callback if not expected, e.g. already timed out */
if (ss->getReq[chNum(ch)] == rq)
proc_db_events(value, type, ch, ss, GET_COMPLETE, status);
}
/*
......@@ -174,14 +176,16 @@ void seq_get_handler(
void seq_put_handler(
void *var, pvType type, unsigned count, pvValue *value, void *arg, pvStat status)
{
PVREQ *rQ = (PVREQ *)arg;
CHAN *ch = rQ->ch;
PVREQ *rq = (PVREQ *)arg;
CHAN *ch = rq->ch;
SSCB *ss = rq->ss;
SPROG *sp = ch->sprog;
assert(ch->dbch != NULL);
freeListFree(sp->pvReqPool, arg);
/* Process event handling in each state set */
proc_db_events(value, type, ch, rQ->ss, PUT_COMPLETE, status);
/* ignore callback if not expected, e.g. already timed out */
if (ss->putReq[chNum(ch)] == rq)
proc_db_events(value, type, ch, ss, PUT_COMPLETE, status);
}
/*
......
......@@ -107,6 +107,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT
);
return pvStatERROR;
case epicsEventWaitError:
/* try to recover */
ss->getReq[varId] = NULL;
epicsEventSignal(getSem);
errlogSevPrintf(errlogFatal,
"pvGet: epicsEventWaitWithTimeout() failure\n");
return pvStatERROR;
......@@ -129,6 +132,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT
);
return pvStatERROR;
case epicsEventWaitError:
/* try to recover */
ss->getReq[varId] = NULL;
epicsEventSignal(getSem);
errlogSevPrintf(errlogFatal,
"pvGet: epicsEventTryWait() failure\n");
return pvStatERROR;
......@@ -140,6 +146,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT
req->ss = ss;
req->ch = ch;
assert(ss->getReq[varId] == NULL);
ss->getReq[varId] = req;
/* Perform the PV get operation with a callback routine specified.
Requesting more than db channel has available is ok. */
status = pvVarGetCallback(
......@@ -155,6 +164,7 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT
meta->message = "get failure";
errlogSevPrintf(errlogFatal, "pvGet(var %s, pv %s): pvVarGetCallback() failure: %s\n",
ch->varName, dbch->dbName, pvVarGetMess(dbch->pvid));
ss->getReq[varId] = NULL;
freeListFree(sp->pvReqPool, req);
epicsEventSignal(getSem);
check_connected(dbch, meta);
......@@ -164,11 +174,15 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT
/* Synchronous: wait for completion */
if (compType == SYNC)
{
epicsEventWaitStatus event_status;
pvSysFlush(sp->pvSys);
switch (epicsEventWaitWithTimeout(getSem, tmo))
event_status = epicsEventWaitWithTimeout(getSem, tmo);
ss->getReq[varId] = NULL;
epicsEventSignal(getSem);
switch (event_status)
{
case epicsEventWaitOK:
epicsEventSignal(getSem);
status = check_connected(dbch, meta);
if (status) return status;
if (sp->options & OPT_SAFE)
......@@ -222,6 +236,7 @@ epicsShareFunc boolean epicsShareAPI seq_pvGetComplete(SS_ID ss, VAR_ID varId)
switch (epicsEventTryWait(getSem))
{
case epicsEventWaitOK:
ss->getReq[varId] = NULL;
epicsEventSignal(getSem);
status = check_connected(ch->dbch, metaPtr(ch,ss));
/* TODO: returning either TRUE or FALSE here seems wrong. We return TRUE,
......@@ -235,11 +250,14 @@ epicsShareFunc boolean epicsShareAPI seq_pvGetComplete(SS_ID ss, VAR_ID varId)
ss_read_buffer(ss, ch, FALSE);
return TRUE;
case epicsEventWaitTimeout:
epicsEventSignal(getSem);
return FALSE;
case epicsEventWaitError:
ss->getReq[varId] = NULL;
epicsEventSignal(getSem);
errlogSevPrintf(errlogFatal, "pvGetComplete: "
"epicsEventTryWait(getSemId[%d]) failure\n", varId);
default:
default: /* pacify gcc which does not understand the we checked all possibilities */
return FALSE;
}
}
......@@ -254,7 +272,7 @@ static void *putq_cp(void *dest, const void *src, size_t elemSize)
struct putq_cp_arg *arg = (struct putq_cp_arg *)src;
CHAN *ch = arg->ch;
return memcpy(pv_value_ptr(dest, ch->type->getType),
return memcpy(pv_value_ptr(dest, ch->type->getType), /*BUG? should that be putType?*/
arg->var, ch->type->size * ch->count);
}
......@@ -265,7 +283,7 @@ static void anonymous_put(SS_ID ss, CHAN *ch)
if (ch->queue)
{
QUEUE queue = ch->queue;
pvType type = ch->type->getType;
pvType type = ch->type->getType; /*BUG? should that be putType?*/
size_t size = ch->type->size;
boolean full;
struct putq_cp_arg arg = {ch, var};
......@@ -361,6 +379,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT
);
return pvStatERROR;
case epicsEventWaitError:
/* try to recover */
ss->putReq[varId] = NULL;
epicsEventSignal(putSem);
errlogSevPrintf(errlogFatal,
"pvPut: epicsEventWaitWithTimeout() failure\n");
return pvStatERROR;
......@@ -385,6 +406,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT
);
return pvStatERROR;
case epicsEventWaitError:
/* try to recover */
ss->putReq[varId] = NULL;
epicsEventSignal(putSem);
errlogSevPrintf(errlogFatal,
"pvPut: epicsEventTryWait() failure\n");
return pvStatERROR;
......@@ -418,6 +442,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT
req->ss = ss;
req->ch = ch;
assert(ss->putReq[varId] == NULL);
ss->putReq[varId] = req;
status = pvVarPutCallback(
dbch->pvid, /* PV id */
ch->type->putType, /* data type */
......@@ -427,6 +454,7 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT
req); /* user arg */
if (status != pvStatOK)
{
ss->putReq[varId] = NULL;
errlogSevPrintf(errlogFatal, "pvPut(var %s, pv %s): pvVarPutCallback() failure: %s\n",
ch->varName, dbch->dbName, pvVarGetMess(dbch->pvid));
freeListFree(sp->pvReqPool, req);
......@@ -439,11 +467,15 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT
/* Synchronous: wait for completion (10s timeout) */
if (compType == SYNC)
{
epicsEventWaitStatus event_status;
pvSysFlush(sp->pvSys);
switch (epicsEventWaitWithTimeout(putSem, tmo))
event_status = epicsEventWaitWithTimeout(putSem, tmo);
ss->putReq[varId] = NULL;
epicsEventSignal(putSem);
switch (event_status)
{
case epicsEventWaitOK:
epicsEventSignal(putSem);
status = check_connected(dbch, meta);
if (status) return status;
break;
......@@ -487,6 +519,7 @@ epicsShareFunc boolean epicsShareAPI seq_pvPutComplete(
switch (epicsEventTryWait(putSem))
{
case epicsEventWaitOK:
ss->putReq[varId] = NULL;
epicsEventSignal(putSem);
check_connected(ch->dbch, metaPtr(ch,ss));
/* TODO: returning either TRUE or FALSE here seems wrong. We return TRUE,
......@@ -494,10 +527,14 @@ epicsShareFunc boolean epicsShareAPI seq_pvPutComplete(
status by calling pvStatus and/or pvMessage. */
done = TRUE;
break;
case epicsEventWaitTimeout:
epicsEventSignal(putSem);
break;
case epicsEventWaitError:
ss->putReq[varId] = NULL;
epicsEventSignal(putSem);
errlogSevPrintf(errlogFatal, "pvPutComplete(%s): "
"epicsEventTryWait(putSem[%d]) failure\n", ch->varName, varId);
case epicsEventWaitTimeout:
break;
}
......
......@@ -309,6 +309,18 @@ static boolean init_sscb(SPROG *sp, SSCB *ss, seqSS *seqSS)
errlogSevPrintf(errlogFatal, "init_sscb: calloc failed\n");
return FALSE;
}
ss->getReq = newArray(PVREQ*, sp->numChans);
if (!ss->getReq)
{
errlogSevPrintf(errlogFatal, "init_sscb: calloc failed\n");
return FALSE;
}
ss->putReq = newArray(PVREQ*, sp->numChans);
if (!ss->putReq)
{
errlogSevPrintf(errlogFatal, "init_sscb: calloc failed\n");
return FALSE;
}
}
for (nch = 0; nch < sp->numChans; nch++)
{
......@@ -324,6 +336,7 @@ static boolean init_sscb(SPROG *sp, SSCB *ss, seqSS *seqSS)
errlogSevPrintf(errlogFatal, "init_sscb: epicsEventCreate failed\n");
return FALSE;
}
/* note: do not pre-allocate request structures */
}
ss->dead = epicsEventCreate(epicsEventEmpty);
if (!ss->dead)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment