zhangbin_1M的个人空间 https://blog.eetop.cn/ross1m [收藏] [复制] [分享] [RSS]

空间首页 动态 记录 日志 相册 主题 分享 留言板 个人资料

日志

A simple message queue in Ventoux

已有 878 次阅读| 2008-5-5 09:43 |个人分类:Ventoux

The messageq module provides simplified, very lightweight message queueing (requiring single thread polling)
No multithread locking is required because single thread is assumed.

Message sending and receiving is formed using very short, inline functions.

Each message queue has a controlling structure of type

    struct messageqQ

The actual message queue buffer is pointed to from the messageQ struct as indicated by initial setup using:

    void messageqQueueInit(
        struct messageqQ *qp,       // control struct to init
        const char *qname,             // for debug purposes
        int msgnbytes,                      // size of each message
        int nq,                                     // number of messages in qbuf
        void *qbuf                              // memory for msgs of size msgnbytes*nq
        );

Adding messages to a queue begins with a call to

    void *messageqSendStart(qp)

which returns a block of memory of size msgnbytes (per initialization).
The caller should then compose the message into this memory, and call

    messageqSendFinish(qp)

to commit the message (before any other message is added or removed).
Alternately, the message may be abandoned without having to make any other calls.

Similarly, a message is received by calling

    void *messageqReceiveStart(qp)

(which returns NULL if no more messages), and then calling

    messageqReceiveFinish(qp).
   

Miscellaneous Functions:

    messageqDestroy(qp) -- call if memory is to be recycled
    messageqPurge(qp) -- throwaway any messages in the message queue
    messageqPrint(void) -- print information about all message queues

====================================================================

// messageq.h -- Simplified message queueing (requiring single thread polling)
//      No multithread locking is required because single thread is assumed.
//

struct messageqQ
{
    int msgnbytes;              // no. of bytes per message in qbuf
    int maxq;                   // max messages in qbuf (capacity of qbuf)
    int nq;                     // no. of messages q'd, 0 to max
    int maxnq;                  // worst case messages q'd (for debug)
    unsigned char *qbuf;        // base address of buffer for q of messages
    unsigned char *qtop;        // == qbuf+maxq*msgnbytes
    unsigned char *qwr;         // where in qbuf to write next message
    unsigned char *qrd;         // where in qbuf to read next message
    struct messageqQ *nextp;    // for linked list for debug purposes
    struct messageqQ *prevp;    // for linked list for debug purposes
    const char *qname;          // for debug purposes
};

//--- messageqQueueInit -- one time initialization of message queue
//      It is recommended that you define a data structure (perhaps using
//      unions) that describes all possible messages for your queue,
//      and define an array:  struct mydatastruct myqueue[nq],
//      use sizeof(datastruct) for the msgnbytes arg.
void messageqQueueInit(
    struct messageqQ *qp,       // control struct to init
    const char *qname,          // for debug purposes
    int msgnbytes,              // size of each message
    int nq,                     // number of messages in qbuf
    void *qbuf                  // memory for msgs of size msgnbytes*nq
    );

// messageqPurge -- throwaway any messages in the message queue
void messageqPurge(
    struct messageqQ *qp        // control struct
    );

// messageqDestroy -- unlink from global list and may zero the messageq
void messageqDestroy(
    struct messageqQ *qp        // control struct
    );

// messageqOverflow -- internal diagnostic routine called when app wants to add
//      a message to a queue that is full.
void messageqOverflow(
    struct messageqQ *qp
    );

//--- messageqSendStart -- obtain next message queue entry location.
//      Returns NULL if no more room... be sure to check...
//      this is not normal, but may be hard to entirely avoid.
//      Assuming non-NULL return, which should be cast to the appropriate
//      data type pointer per application,
//      caller can then fill out the message and call messageqSendFinish()
//      or can abandon it at any time without calling messageSendFinish().
static inline void *messageqSendStart(struct messageqQ *qp)
{
    if ( qp->nq >= qp->maxq )
    {
        messageqOverflow(qp);   // for diagnostics...
        return NULL;
    }
    memset(qp->qwr, 0, qp->msgnbytes);  // optional! just in case
    return qp->qwr;
}

//--- messageqSendFinish -- commit message started with messageqSendStart
static inline void messageqSendFinish(struct messageqQ *qp)
{
    int nq = (++qp->nq);
    // Note: Q room should have been checked at messageqSendStart...
    if ( qp->maxnq < nq ) qp->maxnq = nq;       // for debugging
    unsigned char *qwr = qp->qwr + qp->msgnbytes;
    if ( qwr >= qp->qtop ) qwr = qp->qbuf;
    qp->qwr = qwr;
}

//--- messageqReceiveStart -- get address of message to read in queue.
//      Returns NULL if no more messages to look at -- be sure to check...
//      it is normal to run out of messages.
//      After consuming the message, call messageqReceiveFinish.
static inline void *messageqReceiveStart(struct messageqQ *qp)
{
    if ( qp->nq <= 0 ) return NULL;
    return qp->qrd;
}

//--- messageqReceiveFinish -- dispose of message to be read from queue.
static inline void messageqReceiveFinish(struct messageqQ *qp)
{
    --qp->nq;   // assume noone underflows...
    unsigned char *qrd = qp->qrd + qp->msgnbytes;
    if ( qrd >= qp->qtop ) qrd = qp->qbuf;
    qp->qrd = qrd;
}

// messageqPrint -- printf info about all messages queues
void messageqPrint(void);

// messageqPrintIncremental -- print i'th line of info re. all messages queues
int messageqPrintIncremental(int Increment);

// messageqClearStats -- clear statistics from all linked message queues
void messageqClearStats(void);

-----------------------------------------------------------------------------------------------------------------------

// messageq.c -- Simplified message queueing (requiring single thread polling)
//      No multithread locking is required because single thread is assumed.

#include <stdio.h>
#include <debuglog.h>

#include <messageq.h>

// messageqHead -- for linked list of all queues
//      The only reason to have such a linked list is for debug purposes;
//      otherwise they stand alone.
struct messageqHead
{
    int isinit;                         // nonzero after head is init'd
    struct messageqQ head;          // dummy queue to head linked list
} messageqHead;

static void messageqInitInternal(void)
{
    messageqHead.head.qname = "dummyhead";
    messageqHead.head.nextp = messageqHead.head.prevp =
        &messageqHead.head;
    messageqHead.isinit = 1;
}


void messageqQueueInit(
    struct messageqQ *qp,        // control struct to init
    const char *qname,          // for debug purposes
    int msgnbytes,              // size of each message
    int maxq,                   // number of messages in qbuf
    void *qbuf                  // memory for msgs of size msgnbytes*maxq
    )
{
    if ( ! messageqHead.isinit )
    {
        messageqInitInternal();
    }
    memset(qp, 0, sizeof(*qp));         // to be sure
    qp->msgnbytes = msgnbytes;
    qp->maxq = maxq;
    // in messageqReset: qp->nq = 0;
    qp->maxnq = 0;
    qp->qbuf = qbuf;
    qp->qtop = qbuf + maxq*msgnbytes;
    // in messageqReset: qp->qwr = qbuf;
    // in messageqReset: qp->qrd = qbuf;
    qp->qname = qname;
    // Link into list for debugging
    qp->nextp = &messageqHead.head;
    qp->prevp = messageqHead.head.prevp;
    messageqHead.head.prevp = qp;
    qp->prevp->nextp = qp;

    messageqPurge(qp);          // share code
}

// messageqPurge -- throwaway any messages in the message queue
void messageqPurge(
    struct messageqQ *qp        // control struct
    )
{
    qp->nq = 0;
    qp->qwr = qp->qbuf;
    qp->qrd = qp->qbuf;
}


// messageqDestroy -- unlink from global list and may zero the messageq
void messageqDestroy(
    struct messageqQ *qp        // control struct
    )
{
    qp->nextp->prevp = qp->prevp;
    qp->prevp->nextp = qp->nextp;
    qp->maxnq = 0;
    memset(qp, 0, sizeof(*qp)); // help catch bugs
}

// messageqOverflow -- internal routine called when app wants to add
//      a message to a queue that is full.
void messageqOverflow(
    struct messageqQ *qp
    )
{
    if ( qp->qname )
    {
        printf("WARNING: Messageq %s attempted overflow!\n", qp->qname );
        debugf("WARNING: Messageq %s attempted overflow!\n", qp->qname );
    }
    else
    {
        printf("WARNING: Messageq (no name; qp == %p) attempted overflow!\n",
            qp );
        debugf("WARNING: Messageq (no name; qp == %p) attempted overflow!\n",
            qp );
    }
}

// messageqPrintIncremental -- print i'th line of info re. all messages queues
int messageqPrintIncremental(int Increment)
{
    if ( ! messageqHead.isinit )
    {
        messageqInitInternal();
    }
    int Decrement = Increment++;
    struct messageqQ *qp = &messageqHead.head;
    if ( Decrement-- <= 0 )
    {
        if ( ! messageqHead.isinit )
        {
            messageqInitInternal();
        }
        return Increment;
    }
    if ( Decrement-- <= 0 )
    {
        printf("------------------------------------------------------\n");
        printf("------------- Message Queues: ------------------------\n");
        printf("Full:worst/max   Msgsz QName\n");
        // printf("---------------- ----- -------------------------------\n");
        return Increment;
    }
    for ( qp = qp->nextp; qp != &messageqHead.head; qp = qp->nextp )
    {
        if ( Decrement-- <= 0 )
        {
            printf("     %5d/%5d %5d %s\n",
                qp->maxnq, qp->maxq, qp->msgnbytes, qp->qname );
            return Increment;
        }
    }
    if ( Decrement-- <= 0 )
    {
        printf("------------------------------------------------------\n");
        return Increment;
    }
    return 0;
}

// messageqPrint -- printf info about all messages queues
void messageqPrint(void)
{
    int Increment = 0;
    while ( (Increment = messageqPrintIncremental(Increment)) != 0 ) {;}
}

void messageqClearStats(void)
{
    if ( ! messageqHead.isinit )
    {
        messageqInitInternal();
    }
    struct messageqQ *qp = &messageqHead.head;
    for ( qp = qp->nextp; qp != &messageqHead.head; qp = qp->nextp )
    {
        qp->maxnq = 0;
    }
}


点赞

评论 (0 个评论)

facelist

您需要登录后才可以评论 登录 | 注册

  • 关注TA
  • 加好友
  • 联系TA
  • 0

    周排名
  • 0

    月排名
  • 0

    总排名
  • 0

    关注
  • 1

    粉丝
  • 0

    好友
  • 0

    获赞
  • 5

    评论
  • 445

    访问数
关闭

站长推荐 上一条 /1 下一条

小黑屋| 关于我们| 联系我们| 在线咨询| 隐私声明| EETOP 创芯网
( 京ICP备:10050787号 京公网安备:11010502037710 )

GMT+8, 2024-5-30 08:41 , Processed in 0.027044 second(s), 15 queries , Gzip On, Redis On.

eetop公众号 创芯大讲堂 创芯人才网
返回顶部