diff --git a/src/seq/seqPvt.h b/src/seq/seqPvt.h index 073680cc9e97d50571a4a61d72b37f9abadeccf3..282a68171189a8c1c24afb8072935fca8303287e 100644 --- a/src/seq/seqPvt.h +++ b/src/seq/seqPvt.h @@ -150,10 +150,8 @@ struct state_set epicsEventId syncSem; /* semaphore for event sync */ epicsEventId dead; /* event to signal state set exit done */ /* these are arrays, one for each channel */ - epicsEventId *getSem; /* semaphores for async get */ - epicsEventId *putSem; /* semaphores for async put */ - PVREQ **getReq; /* get requests */ - PVREQ **putReq; /* put requests */ + PVREQ **getReq; /* currently pending get requests */ + PVREQ **putReq; /* currently pending put requests */ PVMETA *metaData; /* meta data (safe mode) */ /* safe mode */ boolean *dirty; /* array of flags, one for each channel */ diff --git a/src/seq/seq_ca.c b/src/seq/seq_ca.c index 2107eaa93cd4fa7c63c6c0363545cbdbc989efe5..0ae845ba016777a5976264125a772b6802c17a42 100644 --- a/src/seq/seq_ca.c +++ b/src/seq/seq_ca.c @@ -308,28 +308,30 @@ static void proc_db_events( epicsMutexMustLock(sp->lock); /* Signal completion */ - if (ss) + switch (evtype) { - switch (evtype) - { - case pvEventGet: - epicsEventSignal(ss->getSem[chNum(ch)]); - break; - case pvEventPut: - epicsEventSignal(ss->putSem[chNum(ch)]); - break; - default: + case pvEventPut: + ss->putReq[chNum(ch)] = NULL; + epicsEventSignal(ss->syncSem); + break; + case pvEventGet: + ss->getReq[chNum(ch)] = NULL; + epicsEventSignal(ss->syncSem); + if (optTest(sp, OPT_SAFE)) break; - } + /* else: fall through */ + case pvEventMonitor: + /* Wake up each state set that uses this channel in a when condition. */ + /* In safe mode this is only necessary for monitor events, since the + effects of get events are local to the state set. */ + ss_wakeup(sp, ch->eventNum); + break; } /* If there's an event flag associated with this channel, set it */ if (ch->syncedTo) seq_efSet(sp->ss, ch->syncedTo); - /* Wake up each state set that uses this channel in an event */ - ss_wakeup(sp, ch->eventNum); - epicsMutexUnlock(sp->lock); } @@ -436,12 +438,14 @@ void seq_conn_handler(int connected, void *arg) seq_camonitor(ch, FALSE); } /* terminate outstanding requests that wait for completion */ + /* TODO: can there be a race condition with pvPut/pvGet? */ for (nss = 0; nss < sp->numSS; nss++) { - sp->ss[nss].getReq[chNum(ch)] = NULL; - epicsEventSignal(sp->ss[nss].getSem[chNum(ch)]); - sp->ss[nss].putReq[chNum(ch)] = NULL; - epicsEventSignal(sp->ss[nss].putSem[chNum(ch)]); + SSCB *ss = sp->ss + nss; + + ss->getReq[chNum(ch)] = NULL; + ss->putReq[chNum(ch)] = NULL; + epicsEventSignal(ss->syncSem); } } else diff --git a/src/seq/seq_if.c b/src/seq/seq_if.c index e5850c02dd48ec2d6bf420c7e8946694915354b7..84edcbad2a73f0aaaf64674bdd9a5709956d5685 100644 --- a/src/seq/seq_if.c +++ b/src/seq/seq_if.c @@ -48,6 +48,84 @@ static pvStat check_connected(DBCHAN *dbch, PVMETA *meta) } } +static pvStat check_pending( + pvEventType evtype, + SS_ID ss, + PVREQ **req, + const char *varName, + DBCHAN *dbch, + PVMETA *meta, + enum compType compType, + double tmo) +{ + const char *call = evtype == pvEventGet ? "pvGet" : "pvPut"; + const char *op = evtype == pvEventGet ? "get" : "put"; + const char *tmo_msg = evtype == pvEventGet ? "get completion timeout" : "put completion timeout"; + const char *err_msg = evtype == pvEventGet ? "get completion failure" : "put completion failure"; + + assert(evtype != pvEventMonitor); + if (compType == SYNC) + { + if (tmo <= 0.0) + { + errlogSevPrintf(errlogMajor, + "%s(%s,SYNC,%f): user error (timeout must be positive)\n", + call, varName, tmo); + return pvStatERROR; + } + while (*req) + { + /* a request is already pending (must be an async request) */ + double before, after; + pvStat status; + + pvTimeGetCurrentDouble(&before); + switch (epicsEventWaitWithTimeout(ss->syncSem, tmo)) + { + case epicsEventWaitOK: + status = check_connected(dbch, meta); + if (status != pvStatOK) + return status; + pvTimeGetCurrentDouble(&after); + tmo -= (after - before); + if (tmo > 0.0) + break; + /* else: fall through to timeout */ + case epicsEventWaitTimeout: + errlogSevPrintf(errlogMajor, + "%s(ss %s, var %s, pv %s): failed (timeout " + "waiting for other %s requests to finish)\n", + call, ss->ssName, varName, dbch->dbName, op + ); + meta->status = pvStatTIMEOUT; + meta->severity = pvSevrERROR; + meta->message = tmo_msg; + return meta->status; + case epicsEventWaitError: + errlogSevPrintf(errlogFatal, + "%s: epicsEventWaitWithTimeout() failure\n", call); + meta->status = pvStatERROR; + meta->severity = pvSevrERROR; + meta->message = err_msg; + return pvStatERROR; + } + } + } + else if (compType == ASYNC) + { + if (*req) { + errlogSevPrintf(errlogMajor, + "%s(ss %s, var %s, pv %s): user error " + "(there is already a %s pending for this variable/" + "state set combination)\n", + call, ss->ssName, varName, dbch->dbName, op + ); + return pvStatERROR; + } + } + return pvStatOK; +} + /* * Get value from a channel. * TODO: add optional timeout argument. @@ -58,7 +136,6 @@ epicsShareFunc pvStat seq_pvGet(SS_ID ss, VAR_ID varId, enum compType compType, CHAN *ch = sp->chan + varId; pvStat status; PVREQ *req; - epicsEventId getSem = ss->getSem[varId]; DBCHAN *dbch = ch->dbch; PVMETA *meta = metaPtr(ch,ss); @@ -86,74 +163,10 @@ epicsShareFunc pvStat seq_pvGet(SS_ID ss, VAR_ID varId, enum compType compType, compType = optTest(sp, OPT_ASYNC) ? ASYNC : SYNC; } - if (compType == SYNC) - { - double before, after; - pvTimeGetCurrentDouble(&before); - if (tmo <= 0.0) - { - errlogSevPrintf(errlogMajor, - "pvGet(%s,SYNC,%f): user error (timeout must be positive)\n", - ch->varName, tmo); - return pvStatERROR; - } - switch (epicsEventWaitWithTimeout(getSem, tmo)) - { - case epicsEventWaitOK: - status = check_connected(dbch, meta); - if (status != pvStatOK) return epicsEventSignal(getSem), status; - pvTimeGetCurrentDouble(&after); - tmo -= (after - before); - if (tmo <= 0.0) - tmo = 0.001; - break; - case epicsEventWaitTimeout: - errlogSevPrintf(errlogMajor, - "pvGet(ss %s, var %s, pv %s): failed (timeout " - "waiting for other get requests to finish)\n", - ss->ssName, ch->varName, dbch->dbName - ); - return pvStatERROR; - case epicsEventWaitError: - /* try to recover */ - ss->getReq[varId] = NULL; - epicsEventSignal(getSem); - errlogSevPrintf(errlogFatal, - "pvGet: epicsEventWaitWithTimeout() failure\n"); - return pvStatERROR; - } - } - else if (compType == ASYNC) - { - switch (epicsEventTryWait(getSem)) - { - case epicsEventWaitOK: - if (ss->getReq[varId] != NULL) - { - /* previous request timed out but user - did not call pvGetComplete */ - ss->getReq[varId] = NULL; - } - status = check_connected(dbch, meta); - if (status != pvStatOK) return epicsEventSignal(getSem), status; - break; - case epicsEventWaitTimeout: - errlogSevPrintf(errlogMajor, - "pvGet(ss %s, var %s, pv %s): user error " - "(there is already a get pending for this variable/" - "state set combination)\n", - ss->ssName, ch->varName, dbch->dbName - ); - return pvStatERROR; - case epicsEventWaitError: - /* try to recover */ - ss->getReq[varId] = NULL; - epicsEventSignal(getSem); - errlogSevPrintf(errlogFatal, - "pvGet: epicsEventTryWait() failure\n"); - return pvStatERROR; - } - } + status = check_pending(pvEventGet, ss, ss->getReq + varId, ch->varName, + dbch, meta, compType, tmo); + if (status != pvStatOK) + return status; /* Allocate and initialize a pv request */ req = (PVREQ *)freeListMalloc(sp->pvReqPool); @@ -175,11 +188,11 @@ epicsShareFunc pvStat seq_pvGet(SS_ID ss, VAR_ID varId, enum compType compType, meta->status = pvStatERROR; meta->severity = pvSevrERROR; meta->message = "get failure"; - errlogSevPrintf(errlogFatal, "pvGet(var %s, pv %s): pvVarGetCallback() failure: %s\n", + errlogSevPrintf(errlogFatal, + "pvGet(var %s, pv %s): pvVarGetCallback() failure: %s\n", ch->varName, dbch->dbName, pvVarGetMess(dbch->pvid)); - ss->getReq[varId] = NULL; + ss->getReq[varId] = NULL; /* cancel the request */ freeListFree(sp->pvReqPool, req); - epicsEventSignal(getSem); check_connected(dbch, meta); return status; } @@ -187,32 +200,35 @@ epicsShareFunc pvStat seq_pvGet(SS_ID ss, VAR_ID varId, enum compType compType, /* Synchronous: wait for completion */ if (compType == SYNC) { - epicsEventWaitStatus event_status; - - pvSysFlush(sp->pvSys); - event_status = epicsEventWaitWithTimeout(getSem, tmo); - ss->getReq[varId] = NULL; - epicsEventSignal(getSem); - switch (event_status) + while (ss->getReq[varId]) { - case epicsEventWaitOK: - status = check_connected(dbch, meta); - if (status != pvStatOK) return status; - if (optTest(sp, OPT_SAFE)) - /* Copy regardless of whether dirty flag is set or not */ - ss_read_buffer(ss, ch, FALSE); - break; - case epicsEventWaitTimeout: - meta->status = pvStatTIMEOUT; - meta->severity = pvSevrERROR; - meta->message = "get completion timeout"; - return meta->status; - case epicsEventWaitError: - meta->status = pvStatERROR; - meta->severity = pvSevrERROR; - meta->message = "get completion failure"; - return meta->status; + pvSysFlush(sp->pvSys); + switch (epicsEventWaitWithTimeout(ss->syncSem, tmo)) + { + case epicsEventWaitOK: + break; + case epicsEventWaitTimeout: + ss->getReq[varId] = NULL; /* cancel the request */ + meta->status = pvStatTIMEOUT; + meta->severity = pvSevrERROR; + meta->message = "get completion timeout"; + return meta->status; + case epicsEventWaitError: + errlogSevPrintf(errlogFatal, + "pvGet: epicsEventWaitWithTimeout() failure\n"); + ss->getReq[varId] = NULL; /* cancel the request */ + meta->status = pvStatERROR; + meta->severity = pvSevrERROR; + meta->message = "get completion failure"; + return meta->status; + } } + status = check_connected(dbch, meta); + if (status != pvStatOK) + return status; + if (optTest(sp, OPT_SAFE)) + /* Copy regardless of whether dirty flag is set or not */ + ss_read_buffer(ss, ch, FALSE); } return pvStatOK; @@ -235,10 +251,8 @@ epicsShareFunc boolean seq_pvGetComplete( for (n = 0; n < length; n++) { - epicsEventId getSem = ss->getSem[varId+n]; boolean done = FALSE; CHAN *ch = sp->chan + varId + n; - pvStat status; if (!ch->dbch) { @@ -251,37 +265,15 @@ epicsShareFunc boolean seq_pvGetComplete( } else if (!ss->getReq[varId+n]) { - errlogSevPrintf(errlogMinor, - "pvGetComplete(%s): no pending get request for this variable\n", - ch->varName); - done = TRUE; - } - else - { - switch (epicsEventTryWait(getSem)) + pvStat status = check_connected(ch->dbch, metaPtr(ch,ss)); + if (status == pvStatOK && optTest(sp, OPT_SAFE)) { - case epicsEventWaitOK: - ss->getReq[varId+n] = NULL; - epicsEventSignal(getSem); - status = check_connected(ch->dbch, metaPtr(ch,ss)); - if (status == pvStatOK && optTest(sp, OPT_SAFE)) - { - /* In safe mode, copy value and meta data from shared buffer - to ss local buffer. */ - /* Copy regardless of whether dirty flag is set or not */ - ss_read_buffer(ss, ch, FALSE); - } - done = TRUE; - break; - case epicsEventWaitTimeout: - break; - case epicsEventWaitError: - ss->getReq[varId+n] = NULL; - epicsEventSignal(getSem); - errlogSevPrintf(errlogFatal, "pvGetComplete(%s): " - "epicsEventTryWait(getSem[%d]) failure\n", ch->varName, varId); - break; + /* In safe mode, copy value and meta data from shared buffer + to ss local buffer. */ + /* Copy regardless of whether dirty flag is set or not */ + ss_read_buffer(ss, ch, FALSE); } + done = TRUE; } anyDone = anyDone || done; @@ -291,7 +283,7 @@ epicsShareFunc boolean seq_pvGetComplete( { complete[n] = done; } - else if (any && done) + else if (any == done) { break; } @@ -316,7 +308,6 @@ epicsShareFunc void seq_pvGetCancel( for (n = 0; n < length; n++) { - epicsEventId getSem = ss->getSem[varId+n]; CHAN *ch = ss->prog->chan + varId + n; if (!ch->dbch) @@ -328,8 +319,7 @@ epicsShareFunc void seq_pvGetCancel( } else { - ss->getReq[varId+n] = NULL; - epicsEventSignal(getSem); + ss->getReq[varId+n] = NULL; /* cancel the request */ } } } @@ -408,7 +398,6 @@ epicsShareFunc pvStat seq_pvPut(SS_ID ss, VAR_ID varId, enum compType compType, PVREQ *req; DBCHAN *dbch = ch->dbch; PVMETA *meta = metaPtr(ch,ss); - epicsEventId putSem = ss->putSem[varId]; DEBUG("pvPut: pv name=%s, var=%p\n", dbch ? dbch->dbName : "<anonymous>", var); @@ -433,75 +422,13 @@ epicsShareFunc pvStat seq_pvPut(SS_ID ss, VAR_ID varId, enum compType compType, /* Determine whether to perform synchronous, asynchronous, or plain put ((+a) option was never honored for put, so DEFAULT - means non-blocking and therefore implicitly asynchronous) */ - if (compType == SYNC) - { - double before, after; - pvTimeGetCurrentDouble(&before); - if (tmo <= 0.0) - { - errlogSevPrintf(errlogMajor, - "pvPut(%s,SYNC,%f): user error (timeout must be positive)\n", - ch->varName, tmo); - return pvStatERROR; - } - switch (epicsEventWaitWithTimeout(putSem, tmo)) - { - case epicsEventWaitOK: - pvTimeGetCurrentDouble(&after); - tmo -= (after - before); - if (tmo <= 0.0) - tmo = 0.001; - break; - case epicsEventWaitTimeout: - errlogSevPrintf(errlogMajor, - "pvPut(ss %s, var %s, pv %s): failed (timeout " - "waiting for other put requests to finish)\n", - ss->ssName, ch->varName, dbch->dbName - ); - return pvStatERROR; - case epicsEventWaitError: - /* try to recover */ - ss->putReq[varId] = NULL; - epicsEventSignal(putSem); - errlogSevPrintf(errlogFatal, - "pvPut: epicsEventWaitWithTimeout() failure\n"); - return pvStatERROR; - } - } - else if (compType == ASYNC) - { - switch (epicsEventTryWait(putSem)) - { - case epicsEventWaitOK: - if (ss->putReq[varId] != NULL) - { - /* previous request timed out but user - did not call pvPutComplete */ - ss->putReq[varId] = NULL; - } - break; - case epicsEventWaitTimeout: - meta->status = pvStatERROR; - meta->severity = pvSevrERROR; - meta->message = "already one put pending"; - status = meta->status; - errlogSevPrintf(errlogMajor, - "pvPut(ss %s, var %s, pv %s): user error " - "(there is already a put pending for this variable/" - "state set combination)\n", - ss->ssName, ch->varName, dbch->dbName - ); - return pvStatERROR; - case epicsEventWaitError: - /* try to recover */ - ss->putReq[varId] = NULL; - epicsEventSignal(putSem); - errlogSevPrintf(errlogFatal, - "pvPut: epicsEventTryWait() failure\n"); - return pvStatERROR; - } - } + means fire-and-forget) */ + status = check_pending(pvEventPut, ss, ss->putReq + varId, ch->varName, + dbch, meta, compType, tmo); + if (status != pvStatOK) + return status; + + assert(ss->putReq[varId] == NULL); /* Determine number of elements to put (don't try to put more than db count) */ @@ -541,43 +468,44 @@ epicsShareFunc pvStat seq_pvPut(SS_ID ss, VAR_ID varId, enum compType compType, req); /* user arg */ if (status != pvStatOK) { - ss->putReq[varId] = NULL; errlogSevPrintf(errlogFatal, "pvPut(var %s, pv %s): pvVarPutCallback() failure: %s\n", ch->varName, dbch->dbName, pvVarGetMess(dbch->pvid)); + ss->putReq[varId] = NULL; /* cancel the request */ freeListFree(sp->pvReqPool, req); - epicsEventSignal(putSem); check_connected(dbch, meta); return status; } } - /* Synchronous: wait for completion (10s timeout) */ + /* Synchronous: wait for completion */ if (compType == SYNC) { - epicsEventWaitStatus event_status; - - pvSysFlush(sp->pvSys); - event_status = epicsEventWaitWithTimeout(putSem, tmo); - ss->putReq[varId] = NULL; - epicsEventSignal(putSem); - switch (event_status) + while (ss->putReq[varId]) { - case epicsEventWaitOK: - status = check_connected(dbch, meta); - if (status != pvStatOK) return status; - break; - case epicsEventWaitTimeout: - meta->status = pvStatTIMEOUT; - meta->severity = pvSevrERROR; - meta->message = "put completion timeout"; - return meta->status; - break; - case epicsEventWaitError: - meta->status = pvStatERROR; - meta->severity = pvSevrERROR; - meta->message = "put completion failure"; - return meta->status; - break; + pvSysFlush(sp->pvSys); + switch (epicsEventWaitWithTimeout(ss->syncSem, tmo)) + { + case epicsEventWaitOK: + status = check_connected(dbch, meta); + if (status != pvStatOK) return status; + break; + case epicsEventWaitTimeout: + ss->putReq[varId] = NULL; /* cancel the request */ + meta->status = pvStatTIMEOUT; + meta->severity = pvSevrERROR; + meta->message = "put completion timeout"; + return meta->status; + break; + case epicsEventWaitError: + errlogSevPrintf(errlogFatal, + "pvPut: epicsEventWaitWithTimeout() failure\n"); + ss->putReq[varId] = NULL; /* cancel the request */ + meta->status = pvStatERROR; + meta->severity = pvSevrERROR; + meta->message = "put completion failure"; + return meta->status; + break; + } } } @@ -600,7 +528,6 @@ epicsShareFunc boolean seq_pvPutComplete( for (n = 0; n < length; n++) { - epicsEventId putSem = ss->putSem[varId+n]; boolean done = FALSE; CHAN *ch = sp->chan + varId + n; @@ -615,31 +542,9 @@ epicsShareFunc boolean seq_pvPutComplete( } else if (!ss->putReq[varId+n]) { - errlogSevPrintf(errlogMinor, - "pvPutComplete(%s): no pending put request for this variable\n", - ch->varName); + check_connected(ch->dbch, metaPtr(ch,ss)); done = TRUE; } - else - { - switch (epicsEventTryWait(putSem)) - { - case epicsEventWaitOK: - ss->putReq[varId+n] = NULL; - epicsEventSignal(putSem); - check_connected(ch->dbch, metaPtr(ch,ss)); - done = TRUE; - break; - case epicsEventWaitTimeout: - break; - case epicsEventWaitError: - ss->putReq[varId+n] = NULL; - epicsEventSignal(putSem); - errlogSevPrintf(errlogFatal, "pvPutComplete(%s): " - "epicsEventTryWait(putSem[%d]) failure\n", ch->varName, varId); - break; - } - } anyDone = anyDone || done; allDone = allDone && done; @@ -648,7 +553,7 @@ epicsShareFunc boolean seq_pvPutComplete( { complete[n] = done; } - else if (any && done) + else if (any == done) { break; } @@ -673,7 +578,6 @@ epicsShareFunc void seq_pvPutCancel( for (n = 0; n < length; n++) { - epicsEventId putSem = ss->putSem[varId+n]; CHAN *ch = ss->prog->chan + varId + n; if (!ch->dbch) @@ -685,8 +589,7 @@ epicsShareFunc void seq_pvPutCancel( } else { - ss->putReq[varId+n] = NULL; - epicsEventSignal(putSem); + ss->putReq[varId+n] = NULL; /* cancel the request */ } } } diff --git a/src/seq/seq_main.c b/src/seq/seq_main.c index 155ac5130f4d8c539356e5948d73dfeffdb48d38..63ee5e38fd03e92b6729330d3024a91df7d75e18 100644 --- a/src/seq/seq_main.c +++ b/src/seq/seq_main.c @@ -246,8 +246,6 @@ static boolean init_sprog(PROG *sp, seqProgram *seqProg) */ static boolean init_sscb(PROG *sp, SSCB *ss, seqSS *seqSS) { - unsigned nch; - /* Fill in SSCB */ ss->ssName = seqSS->ssName; ss->numStates = seqSS->numStates; @@ -269,18 +267,6 @@ static boolean init_sscb(PROG *sp, SSCB *ss, seqSS *seqSS) if (sp->numChans > 0) { - ss->getSem = newArray(epicsEventId, sp->numChans); - if (!ss->getSem) - { - errlogSevPrintf(errlogFatal, "init_sscb: calloc failed\n"); - return FALSE; - } - ss->putSem = newArray(epicsEventId, sp->numChans); - if (!ss->putSem) - { - errlogSevPrintf(errlogFatal, "init_sscb: calloc failed\n"); - return FALSE; - } ss->getReq = newArray(PVREQ*, sp->numChans); if (!ss->getReq) { @@ -303,22 +289,7 @@ static boolean init_sscb(PROG *sp, SSCB *ss, seqSS *seqSS) } } } - for (nch = 0; nch < sp->numChans; nch++) - { - ss->getSem[nch] = epicsEventCreate(epicsEventFull); - if (!ss->getSem[nch]) - { - errlogSevPrintf(errlogFatal, "init_sscb: epicsEventCreate failed\n"); - return FALSE; - } - ss->putSem[nch] = epicsEventCreate(epicsEventFull); - if (!ss->putSem[nch]) - { - errlogSevPrintf(errlogFatal, "init_sscb: epicsEventCreate failed\n"); - return FALSE; - } - /* note: do not pre-allocate request structures */ - } + /* note: do not pre-allocate request structures */ ss->dead = epicsEventCreate(epicsEventEmpty); if (!ss->dead) { @@ -530,14 +501,7 @@ void seq_free(PROG *sp) SSCB *ss = sp->ss + nss; epicsEventDestroy(ss->syncSem); - for (nch = 0; nch < sp->numChans; nch++) - { - epicsEventDestroy(ss->getSem[nch]); - epicsEventDestroy(ss->putSem[nch]); - } free(ss->metaData); - free(ss->getSem); - free(ss->putSem); epicsEventDestroy(ss->dead);