From 9d8e1bbee78d22733d8910f0183aac21ceba1f05 Mon Sep 17 00:00:00 2001 From: "benjamin.franksen" <benjamin.franksen@helmholtz-berlin.de> Date: Wed, 3 Jul 2013 01:10:40 +0000 Subject: [PATCH] seq: fix problem with synchronous pvPut/pvGet The problem was that that once a synchronous request times out, the variable would never recover from the timeout state. This was because the semaphores would not be signalled on timeout. However, in order to correctly discard events (i.e. callbacks from CA) that belong to timed out requests, we have to remember them. This is what the getReq and putReq arrays in struct state_set are for. The fix assumes that CA will eventually call the callback for each accepted request, even if the channel disconnects. If the assumption is wrong then we have a memory leak. --- src/seq/seqPvt.h | 2 ++ src/seq/seq_ca.c | 20 ++++++++++------- src/seq/seq_if.c | 53 +++++++++++++++++++++++++++++++++++++++------- src/seq/seq_main.c | 13 ++++++++++++ 4 files changed, 72 insertions(+), 16 deletions(-) diff --git a/src/seq/seqPvt.h b/src/seq/seqPvt.h index 9176f831..6f2cead9 100644 --- a/src/seq/seqPvt.h +++ b/src/seq/seqPvt.h @@ -158,6 +158,8 @@ struct state_set /* these are arrays, one for each channel */ epicsEventId *getSemId; /* semaphores for async get */ epicsEventId *putSemId; /* semaphores for async put */ + PVREQ **getReq; /* get requests */ + PVREQ **putReq; /* put requests */ /* safe mode */ boolean *dirty; /* array of flags, one for each channel */ SEQ_VARS *var; /* variable value block */ diff --git a/src/seq/seq_ca.c b/src/seq/seq_ca.c index 3986b650..676d0d9d 100644 --- a/src/seq/seq_ca.c +++ b/src/seq/seq_ca.c @@ -157,14 +157,16 @@ pvStat seq_connect(SPROG *sp, boolean wait) void seq_get_handler( void *var, pvType type, unsigned count, pvValue *value, void *arg, pvStat status) { - PVREQ *rQ = (PVREQ *)arg; - CHAN *ch = rQ->ch; + PVREQ *rq = (PVREQ *)arg; + CHAN *ch = rq->ch; + SSCB *ss = rq->ss; SPROG *sp = ch->sprog; assert(ch->dbch != NULL); freeListFree(sp->pvReqPool, arg); - /* Process event handling in each state set */ - proc_db_events(value, type, ch, rQ->ss, GET_COMPLETE, status); + /* ignore callback if not expected, e.g. already timed out */ + if (ss->getReq[chNum(ch)] == rq) + proc_db_events(value, type, ch, ss, GET_COMPLETE, status); } /* @@ -174,14 +176,16 @@ void seq_get_handler( void seq_put_handler( void *var, pvType type, unsigned count, pvValue *value, void *arg, pvStat status) { - PVREQ *rQ = (PVREQ *)arg; - CHAN *ch = rQ->ch; + PVREQ *rq = (PVREQ *)arg; + CHAN *ch = rq->ch; + SSCB *ss = rq->ss; SPROG *sp = ch->sprog; assert(ch->dbch != NULL); freeListFree(sp->pvReqPool, arg); - /* Process event handling in each state set */ - proc_db_events(value, type, ch, rQ->ss, PUT_COMPLETE, status); + /* ignore callback if not expected, e.g. already timed out */ + if (ss->putReq[chNum(ch)] == rq) + proc_db_events(value, type, ch, ss, PUT_COMPLETE, status); } /* diff --git a/src/seq/seq_if.c b/src/seq/seq_if.c index 31b76e1e..894b5318 100644 --- a/src/seq/seq_if.c +++ b/src/seq/seq_if.c @@ -107,6 +107,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT ); return pvStatERROR; case epicsEventWaitError: + /* try to recover */ + ss->getReq[varId] = NULL; + epicsEventSignal(getSem); errlogSevPrintf(errlogFatal, "pvGet: epicsEventWaitWithTimeout() failure\n"); return pvStatERROR; @@ -129,6 +132,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT ); return pvStatERROR; case epicsEventWaitError: + /* try to recover */ + ss->getReq[varId] = NULL; + epicsEventSignal(getSem); errlogSevPrintf(errlogFatal, "pvGet: epicsEventTryWait() failure\n"); return pvStatERROR; @@ -140,6 +146,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT req->ss = ss; req->ch = ch; + assert(ss->getReq[varId] == NULL); + ss->getReq[varId] = req; + /* Perform the PV get operation with a callback routine specified. Requesting more than db channel has available is ok. */ status = pvVarGetCallback( @@ -155,6 +164,7 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT meta->message = "get failure"; errlogSevPrintf(errlogFatal, "pvGet(var %s, pv %s): pvVarGetCallback() failure: %s\n", ch->varName, dbch->dbName, pvVarGetMess(dbch->pvid)); + ss->getReq[varId] = NULL; freeListFree(sp->pvReqPool, req); epicsEventSignal(getSem); check_connected(dbch, meta); @@ -164,11 +174,15 @@ epicsShareFunc pvStat epicsShareAPI seq_pvGet(SS_ID ss, VAR_ID varId, enum compT /* Synchronous: wait for completion */ if (compType == SYNC) { + epicsEventWaitStatus event_status; + pvSysFlush(sp->pvSys); - switch (epicsEventWaitWithTimeout(getSem, tmo)) + event_status = epicsEventWaitWithTimeout(getSem, tmo); + ss->getReq[varId] = NULL; + epicsEventSignal(getSem); + switch (event_status) { case epicsEventWaitOK: - epicsEventSignal(getSem); status = check_connected(dbch, meta); if (status) return status; if (sp->options & OPT_SAFE) @@ -222,6 +236,7 @@ epicsShareFunc boolean epicsShareAPI seq_pvGetComplete(SS_ID ss, VAR_ID varId) switch (epicsEventTryWait(getSem)) { case epicsEventWaitOK: + ss->getReq[varId] = NULL; epicsEventSignal(getSem); status = check_connected(ch->dbch, metaPtr(ch,ss)); /* TODO: returning either TRUE or FALSE here seems wrong. We return TRUE, @@ -235,11 +250,14 @@ epicsShareFunc boolean epicsShareAPI seq_pvGetComplete(SS_ID ss, VAR_ID varId) ss_read_buffer(ss, ch, FALSE); return TRUE; case epicsEventWaitTimeout: + epicsEventSignal(getSem); return FALSE; case epicsEventWaitError: + ss->getReq[varId] = NULL; + epicsEventSignal(getSem); errlogSevPrintf(errlogFatal, "pvGetComplete: " "epicsEventTryWait(getSemId[%d]) failure\n", varId); - default: + default: /* pacify gcc which does not understand the we checked all possibilities */ return FALSE; } } @@ -254,7 +272,7 @@ static void *putq_cp(void *dest, const void *src, size_t elemSize) struct putq_cp_arg *arg = (struct putq_cp_arg *)src; CHAN *ch = arg->ch; - return memcpy(pv_value_ptr(dest, ch->type->getType), + return memcpy(pv_value_ptr(dest, ch->type->getType), /*BUG? should that be putType?*/ arg->var, ch->type->size * ch->count); } @@ -265,7 +283,7 @@ static void anonymous_put(SS_ID ss, CHAN *ch) if (ch->queue) { QUEUE queue = ch->queue; - pvType type = ch->type->getType; + pvType type = ch->type->getType; /*BUG? should that be putType?*/ size_t size = ch->type->size; boolean full; struct putq_cp_arg arg = {ch, var}; @@ -361,6 +379,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT ); return pvStatERROR; case epicsEventWaitError: + /* try to recover */ + ss->putReq[varId] = NULL; + epicsEventSignal(putSem); errlogSevPrintf(errlogFatal, "pvPut: epicsEventWaitWithTimeout() failure\n"); return pvStatERROR; @@ -385,6 +406,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT ); return pvStatERROR; case epicsEventWaitError: + /* try to recover */ + ss->putReq[varId] = NULL; + epicsEventSignal(putSem); errlogSevPrintf(errlogFatal, "pvPut: epicsEventTryWait() failure\n"); return pvStatERROR; @@ -418,6 +442,9 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT req->ss = ss; req->ch = ch; + assert(ss->putReq[varId] == NULL); + ss->putReq[varId] = req; + status = pvVarPutCallback( dbch->pvid, /* PV id */ ch->type->putType, /* data type */ @@ -427,6 +454,7 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT req); /* user arg */ if (status != pvStatOK) { + ss->putReq[varId] = NULL; errlogSevPrintf(errlogFatal, "pvPut(var %s, pv %s): pvVarPutCallback() failure: %s\n", ch->varName, dbch->dbName, pvVarGetMess(dbch->pvid)); freeListFree(sp->pvReqPool, req); @@ -439,11 +467,15 @@ epicsShareFunc pvStat epicsShareAPI seq_pvPut(SS_ID ss, VAR_ID varId, enum compT /* Synchronous: wait for completion (10s timeout) */ if (compType == SYNC) { + epicsEventWaitStatus event_status; + pvSysFlush(sp->pvSys); - switch (epicsEventWaitWithTimeout(putSem, tmo)) + event_status = epicsEventWaitWithTimeout(putSem, tmo); + ss->putReq[varId] = NULL; + epicsEventSignal(putSem); + switch (event_status) { case epicsEventWaitOK: - epicsEventSignal(putSem); status = check_connected(dbch, meta); if (status) return status; break; @@ -487,6 +519,7 @@ epicsShareFunc boolean epicsShareAPI seq_pvPutComplete( switch (epicsEventTryWait(putSem)) { case epicsEventWaitOK: + ss->putReq[varId] = NULL; epicsEventSignal(putSem); check_connected(ch->dbch, metaPtr(ch,ss)); /* TODO: returning either TRUE or FALSE here seems wrong. We return TRUE, @@ -494,10 +527,14 @@ epicsShareFunc boolean epicsShareAPI seq_pvPutComplete( status by calling pvStatus and/or pvMessage. */ done = TRUE; break; + case epicsEventWaitTimeout: + epicsEventSignal(putSem); + break; case epicsEventWaitError: + ss->putReq[varId] = NULL; + epicsEventSignal(putSem); errlogSevPrintf(errlogFatal, "pvPutComplete(%s): " "epicsEventTryWait(putSem[%d]) failure\n", ch->varName, varId); - case epicsEventWaitTimeout: break; } diff --git a/src/seq/seq_main.c b/src/seq/seq_main.c index d776edf7..f9506d11 100644 --- a/src/seq/seq_main.c +++ b/src/seq/seq_main.c @@ -309,6 +309,18 @@ static boolean init_sscb(SPROG *sp, SSCB *ss, seqSS *seqSS) errlogSevPrintf(errlogFatal, "init_sscb: calloc failed\n"); return FALSE; } + ss->getReq = newArray(PVREQ*, sp->numChans); + if (!ss->getReq) + { + errlogSevPrintf(errlogFatal, "init_sscb: calloc failed\n"); + return FALSE; + } + ss->putReq = newArray(PVREQ*, sp->numChans); + if (!ss->putReq) + { + errlogSevPrintf(errlogFatal, "init_sscb: calloc failed\n"); + return FALSE; + } } for (nch = 0; nch < sp->numChans; nch++) { @@ -324,6 +336,7 @@ static boolean init_sscb(SPROG *sp, SSCB *ss, seqSS *seqSS) errlogSevPrintf(errlogFatal, "init_sscb: epicsEventCreate failed\n"); return FALSE; } + /* note: do not pre-allocate request structures */ } ss->dead = epicsEventCreate(epicsEventEmpty); if (!ss->dead) -- GitLab