| |
The messageq module provides simplified, very lightweight message queueing (requiring single thread polling)
No multithread locking is required because single thread is assumed.
Message sending and receiving are performed using very short, inline functions.
Each message queue has a controlling structure of type
struct messageqQ
The actual message queue buffer is pointed to from the messageqQ struct, as established by the initial setup call:
void messageqQueueInit(
struct messageqQ *qp, // control struct to init
const char *qname, // for debug purposes
int msgnbytes, // size of each message
int nq, // number of messages in qbuf
void *qbuf // memory for msgs of size msgnbytes*nq
);
Adding messages to a queue begins with a call to
void *messageqSendStart(qp)
which returns a block of memory of size msgnbytes (per initialization), or NULL if the queue is full.
The caller should then compose the message into this memory, and call
messageqSendFinish(qp)
to commit the message (before any other message is added or removed).
Alternatively, the message may be abandoned without having to make any other calls.
Similarly, a message is received by calling
void *messageqReceiveStart(qp)
(which returns NULL if no more messages), and then calling
messageqReceiveFinish(qp).
Miscellaneous Functions:
messageqDestroy(qp) -- call if memory is to be recycled
messageqPurge(qp) -- throw away any messages in the message queue
messageqPrint(void) -- print information about all message queues
====================================================================
// messageq.h -- Simplified message queueing (requiring single thread polling)
// No multithread locking is required because single thread is assumed.
//
// struct messageqQ -- control structure for one message queue.
// The message storage is caller-supplied memory (qbuf) treated as maxq
// fixed-size slots of msgnbytes bytes each. qwr and qrd step through the
// slots and wrap from qtop back to qbuf; nq counts the committed messages.
struct messageqQ
{
int msgnbytes; // no. of bytes per message slot in qbuf
int maxq; // max messages in qbuf (capacity of qbuf)
int nq; // no. of messages currently queued, 0 to maxq
int maxnq; // highest value nq has reached (for debug)
unsigned char *qbuf; // base address of buffer for queue of messages
unsigned char *qtop; // one past the end of the buffer: qbuf+maxq*msgnbytes
unsigned char *qwr; // slot in qbuf where the next message is written
unsigned char *qrd; // slot in qbuf where the next message is read
struct messageqQ *nextp; // next queue in the debug linked list
struct messageqQ *prevp; // previous queue in the debug linked list
const char *qname; // name for debug output (pointer is stored, not copied)
};
//--- messageqQueueInit -- one-time initialization of a message queue.
// Zeroes the control struct, records the buffer geometry, links the queue
// into the global debug list and leaves the queue empty.
// It is recommended that you define a data structure (perhaps using
// unions) that describes all possible messages for your queue,
// define an array: struct mydatastruct myqueue[nq],
// and use sizeof(struct mydatastruct) for the msgnbytes arg.
void messageqQueueInit(
struct messageqQ *qp, // control struct to init
const char *qname, // name for debug output (pointer is stored, not copied)
int msgnbytes, // size of each message in bytes
int nq, // capacity: max number of messages qbuf can hold
void *qbuf // caller-supplied memory of at least msgnbytes*nq bytes
);
// messageqPurge -- throw away any messages in the message queue.
// The queue stays initialized and linked; only its contents are dropped.
void messageqPurge(
struct messageqQ *qp // control struct of the queue to empty
);
// messageqDestroy -- unlink the queue from the global debug list and zero
// its control struct. The message buffer itself is not touched; it remains
// owned by the caller.
void messageqDestroy(
struct messageqQ *qp // control struct of the queue to retire
);
// messageqOverflow -- internal diagnostic routine, called by
// messageqSendStart when the app wants to add a message to a queue that
// is already full. It only reports the condition.
void messageqOverflow(
struct messageqQ *qp // control struct of the full queue
);
//--- messageqSendStart -- reserve the slot for the next outgoing message.
// Returns a pointer to a zero-filled slot of msgnbytes bytes, or NULL when
// the queue is already full (after reporting it via messageqOverflow).
// A full queue is not normal but may be hard to avoid entirely, so always
// check the result. Cast a non-NULL result to the application's message
// type, fill the message in, then call messageqSendFinish() to commit it.
// Nothing is committed until then, so the message can simply be abandoned
// without calling messageqSendFinish().
// NOTE(review): memset needs <string.h>, which this header does not
// include in the visible source -- confirm includers provide it.
static inline void *messageqSendStart(struct messageqQ *qp)
{
    if ( qp->nq < qp->maxq )
    {
        unsigned char *slot = qp->qwr;

        memset(slot, 0, qp->msgnbytes); // optional: hand out a clean slot
        return slot;
    }

    messageqOverflow(qp); // diagnostics only
    return NULL;
}
//--- messageqSendFinish -- commit the message begun with messageqSendStart.
// Counts the message, updates the high-water mark, and advances the write
// position by one slot, wrapping to the start of the buffer at the end.
// No room check is made here: messageqSendStart must have returned
// non-NULL for this message.
static inline void messageqSendFinish(struct messageqQ *qp)
{
    const int queued = qp->nq + 1;
    unsigned char *next = qp->qwr + qp->msgnbytes;

    qp->nq = queued;
    if ( queued > qp->maxnq )
    {
        qp->maxnq = queued; // worst-case depth, for debugging
    }

    qp->qwr = ( next < qp->qtop ) ? next : qp->qbuf;
}
//--- messageqReceiveStart -- get the address of the oldest queued message.
// Returns NULL when the queue is empty; running out of messages is normal,
// so always check the result. The message stays in the queue until
// messageqReceiveFinish is called after it has been consumed.
static inline void *messageqReceiveStart(struct messageqQ *qp)
{
    return ( qp->nq > 0 ) ? qp->qrd : NULL;
}
//--- messageqReceiveFinish -- release the message obtained from
// messageqReceiveStart. Decrements the message count and advances the read
// position by one slot, wrapping to the start of the buffer at the end.
// No underflow check is made: only call this after messageqReceiveStart
// returned non-NULL.
static inline void messageqReceiveFinish(struct messageqQ *qp)
{
    unsigned char *next = qp->qrd + qp->msgnbytes;

    qp->nq -= 1; // caller guarantees the queue was not empty
    qp->qrd = ( next < qp->qtop ) ? next : qp->qbuf;
}
// messageqPrint -- printf a report (header, one line per queue, footer)
// covering all initialized message queues.
void messageqPrint(void);
// messageqPrintIncremental -- print one line of the report on all message
// queues per call. Pass 0 on the first call, then pass back the value
// returned by the previous call; a return of 0 means the report is done.
int messageqPrintIncremental(int Increment);
// messageqClearStats -- clear statistics (the worst-case queue depth,
// maxnq) of every linked message queue.
void messageqClearStats(void);
-----------------------------------------------------------------------------------------------------------------------
// messageq.c -- Simplified message queueing (requiring single thread polling)
// No multithread locking is required because single thread is assumed.
#include <stdio.h>
#include <string.h>
#include <debuglog.h>
#include <messageq.h>
// messageqHead -- anchor for the linked list of all queues.
// The only reason to have such a linked list is for debug purposes
// (printing and clearing statistics); otherwise the queues stand alone.
// The list is circular and doubly linked through nextp/prevp, with the
// dummy queue below acting as its sentinel.
struct messageqHead
{
int isinit; // nonzero once messageqInitInternal has set up the list
struct messageqQ head; // dummy queue that heads the linked list
} messageqHead;
static void messageqInitInternal(void)
{
messageqHead.head.qname = "dummyhead";
messageqHead.head.nextp = messageqHead.head.prevp =
&messageqHead.head;
messageqHead.isinit = 1;
}
// messageqQueueInit -- one-time initialization of a message queue.
// Zeroes the control struct, records the buffer geometry, appends the queue
// to the tail of the global debug list, and leaves the queue empty.
// The buffer and the name string are referenced, not copied, so both must
// outlive the queue.
void messageqQueueInit(
    struct messageqQ *qp,   // control struct to init
    const char *qname,      // for debug purposes
    int msgnbytes,          // size of each message
    int maxq,               // number of messages in qbuf
    void *qbuf              // memory for msgs of size msgnbytes*maxq
    )
{
    if ( ! messageqHead.isinit )
    {
        messageqInitInternal();
    }

    memset(qp, 0, sizeof(*qp)); // to be sure
    qp->msgnbytes = msgnbytes;
    qp->maxq = maxq;
    qp->maxnq = 0;
    qp->qbuf = qbuf;
    // Offset from the typed byte pointer: arithmetic on void * is not
    // ISO C, and the size is computed in size_t so the product cannot
    // overflow int for large buffers.
    qp->qtop = qp->qbuf + (size_t)maxq * (size_t)msgnbytes;
    qp->qname = qname;

    // Link at the tail of the circular list for debugging.
    qp->nextp = &messageqHead.head;
    qp->prevp = messageqHead.head.prevp;
    messageqHead.head.prevp = qp;
    qp->prevp->nextp = qp;

    // Shared code: sets nq = 0 and points qwr/qrd at the buffer base.
    messageqPurge(qp);
}
// messageqPurge -- throw away any messages in the message queue.
// Resets the message count to zero and rewinds both the read and the
// write position to the start of the buffer. Buffer contents and the
// maxnq statistic are left untouched.
void messageqPurge(
    struct messageqQ *qp    // control struct
    )
{
    unsigned char *const base = qp->qbuf;

    qp->qrd = base;
    qp->qwr = base;
    qp->nq = 0;
}
// messageqDestroy -- unlink the queue from the global debug list and zero
// its control struct. The message buffer is not touched; it stays owned by
// the caller.
// Safe to call on an already-destroyed (or zeroed, never-initialized)
// control struct: its list pointers are NULL, so the unlink is skipped.
void messageqDestroy(
    struct messageqQ *qp    // control struct
    )
{
    if ( qp->nextp != NULL && qp->prevp != NULL )
    {
        qp->nextp->prevp = qp->prevp;
        qp->prevp->nextp = qp->nextp;
    }

    // Zero everything so later use of a destroyed queue is easier to catch.
    memset(qp, 0, sizeof(*qp));
}
// messageqOverflow -- internal routine called when the app wants to add
// a message to a queue that is full. Reports the event through both printf
// and debugf, identifying the queue by name when it has one and by the
// address of its control struct otherwise.
void messageqOverflow(
    struct messageqQ *qp
    )
{
    if ( qp->qname )
    {
        printf("WARNING: Messageq %s attempted overflow!\n", qp->qname );
        debugf("WARNING: Messageq %s attempted overflow!\n", qp->qname );
    }
    else
    {
        // %p requires a void * argument, hence the explicit casts.
        printf("WARNING: Messageq (no name; qp == %p) attempted overflow!\n",
               (void *)qp );
        debugf("WARNING: Messageq (no name; qp == %p) attempted overflow!\n",
               (void *)qp );
    }
}
// messageqPrintIncremental -- print i'th line of info re. all messages queues
int messageqPrintIncremental(int Increment)
{
if ( ! messageqHead.isinit )
{
messageqInitInternal();
}
int Decrement = Increment++;
struct messageqQ *qp = &messageqHead.head;
if ( Decrement-- <= 0 )
{
if ( ! messageqHead.isinit )
{
messageqInitInternal();
}
return Increment;
}
if ( Decrement-- <= 0 )
{
printf("------------------------------------------------------\n");
printf("------------- Message Queues: ------------------------\n");
printf("Full:worst/max Msgsz QName\n");
// printf("---------------- ----- -------------------------------\n");
return Increment;
}
for ( qp = qp->nextp; qp != &messageqHead.head; qp = qp->nextp )
{
if ( Decrement-- <= 0 )
{
printf(" %5d/%5d %5d %s\n",
qp->maxnq, qp->maxq, qp->msgnbytes, qp->qname );
return Increment;
}
}
if ( Decrement-- <= 0 )
{
printf("------------------------------------------------------\n");
return Increment;
}
return 0;
}
// messageqPrint -- printf info about all message queues.
// Drives messageqPrintIncremental from step 0, feeding each returned step
// back in until it reports completion by returning 0.
void messageqPrint(void)
{
    int step = 0;

    do
    {
        step = messageqPrintIncremental(step);
    } while ( step != 0 );
}
// messageqClearStats -- clear statistics from all linked message queues.
// Walks the global list and resets each queue's worst-case depth (maxnq)
// to zero; the dummy list head itself is skipped.
void messageqClearStats(void)
{
    if ( ! messageqHead.isinit )
    {
        messageqInitInternal();
    }

    struct messageqQ *const head = &messageqHead.head;
    struct messageqQ *qp = head->nextp;

    while ( qp != head )
    {
        qp->maxnq = 0;
        qp = qp->nextp;
    }
}