This note demonstrates how to use QNX Neutrino's asynchronous messaging. It includes:
Asynchronous messaging is an experimental feature; for information about the use of experimental software, see the Commercial Software License Agreement (CSLA) or Partner Software License Agreement (PSLA) in the Licensing area of our website, http://www.qnx.com/legal/licensing/. |
Asynchronous messaging is a communication model that relies on a store-and-forward technique. Compared to regular reply-based communication in client-server systems, the messaging infrastructure ensures delivery, even if a participant is temporarily offline, busy, or unobtainable. In this technique, the sender doesn't need a response from the recipient. This provides more flexibility and scalability as the senders and receivers are decoupled and are no longer required to execute in lockstep.
In some real-world scenarios, the client doesn't have to wait in a blocking state for the server to finish servicing its requests; rather, it can improve its performance by carrying out some tasks while waiting. Multiple asynchronous messages can also be sent out within a short period of time. Buffering those messages and sending them out as a bunch can reduce the frequency of entering the kernel, and thus improve system performance.
QNX Neutrino's asynchronous messaging provides an event-driven system model that nicely complements the traditional QNX message-passing architecture of synchronous messaging. Its main feature is high throughput and nonblocking message-passing. You can now design systems where QNX Neutrino can react in a timely way to asynchronous events generated externally or internally. Asynchronous messaging also provides a convenient and efficient mechanism to deliver asynchronous events and their associated data between system components.
|
The following figure shows a user process (a producer) that collects data from some input devices (e.g. sensors), buffers it, and sends it to another user process (a consumer) for processing. The basic technique in asynchronous messaging is “batch delivery,” which enhances throughput and reduces overhead. The data is sent as messages that have defined boundaries.
You might want to use asynchronous messaging instead of Send-Receive-Reply when:
You might want to use Send-Receive-Reply instead of asynchronous messaging when:
QNX Neutrino provides several APIs or interfaces for asynchronous messaging. You can use these interfaces in your own applications to experiment with asynchronous messaging.
To: | Use: |
---|---|
Create an asynchronous message channel | asyncmsg_channel_create() |
Destroy an asynchronous message channel | asyncmsg_channel_destroy() |
Establish a connection between a calling process and a channel | asyncmsg_connect_attach() |
Detach the connection | asyncmsg_connect_detach() |
Flush all the messages | asyncmsg_flush() |
Return the original connection attributes | asyncmsg_connect_attr() |
Send message(s) from a buffer or an array of buffers | asyncmsg_put()/asyncmsg_putv() |
Receive an asynchronous message from the channel | asyncmsg_get() |
Free a message buffer | asyncmsg_free() |
Allocate a message buffer for sending | asyncmsg_malloc() |
Here are several code examples to show how asynchronous messaging works:
This is the most simple example; it demonstrates the sending and receiving of one message. The trigger number, which defines the number of messages the sender wants to send at once, is set to 1.
#include <stdio.h> #include <process.h> #include <unistd.h> #include <string.h> #include <sys/asyncmsg.h> char *msg = "AsyncMsg Passing"; int callback(int err, void *cmsg, unsigned handle) { printf("Callback: err = %d, msg = %p (%p), handle = %d\n", err, cmsg, msg, handle); return 0; } /* * Simple test: Create a channel, connect to it, put a message, * get it, done. */ int main() { int chid, coid; struct _asyncmsg_connection_attr aca; struct _asyncmsg_get_header *agh, *agh1; if ((chid = asyncmsg_channel_create(0, 0666, 2048, 5, NULL, NULL)) == -1) { perror("channel_create"); return -1; } memset(&aca, 0, sizeof(aca)); aca.buffer_size = 2048; aca.max_num_buffer = 5; aca.trigger_num_msg = 1; if ((coid = asyncmsg_connect_attach(0, 0, chid, 0, 0, &aca)) == -1) { perror("connect_attach"); return -1; } if ((asyncmsg_put(coid, msg, strlen(msg) + 1, 1234, callback)) == -1) { perror("put"); return -1; } if ((agh = asyncmsg_get(chid)) == NULL) { perror("get"); return -1; } printf("Got message(s): \n\n"); while ((agh1 = agh)) { agh = agh1->next; printf("from process: %d (%d)\n", agh1->info.pid, getpid()); printf("msglen: %d (%d)\n", agh1->info.msglen, strlen(msg) + 1); printf("srclen: %d\n", agh1->info.srcmsglen); printf("err: %d\n", agh1->err); printf("parts: %d\n", agh1->parts); printf("msg: %s\n\n", (char *)agh1->iov->iov_base); asyncmsg_free(agh1); } /* give the callback a chance to run */ sleep(1); if (asyncmsg_connect_detach(coid) == -1) { perror("connect_detach"); return -1; } if (asyncmsg_channel_destroy(chid) == -1) { perror("channel_detach"); return -1; } return 0; }
The following program demonstrates how you can send more than one message from the sender, but use asyncmsg_get() only once to get all these messages together. You must set the appropriate trigger number (3 in this example) in the attributes that you pass to asyncmsg_connect_attach() to trigger the copying of the messages. The trigger number defines the number of messages the sender predetermines to send together. This increases throughput.
#include <stdio.h> #include <process.h> #include <unistd.h> #include <string.h> #include <sys/asyncmsg.h> int callback(int err, void *cmsg, unsigned handle) { printf("Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle); return 0; } /* * Multiple put, then see if get can get all of them. */ int main() { int chid, coid, i; struct _asyncmsg_connection_attr aca; struct _asyncmsg_get_header *agh, *agh1; char msg[3][80]; if ((chid = asyncmsg_channel_create(_NTO_CHF_SENDER_LEN, 0666, 2048, 5, NULL, NULL)) == -1) { perror("channel_create"); return -1; } memset(&aca, 0, sizeof(aca)); aca.buffer_size = 2048; aca.max_num_buffer = 5; aca.trigger_num_msg = 3; if ((coid = asyncmsg_connect_attach(0, 0, chid, 0, 0, &aca)) == -1) { perror("connect_attach"); return -1; } for (i = 0; i < 3; i++) { sprintf(msg[i], "Async Message Passing (msgid %d)\n", i); if ((asyncmsg_put(coid, msg[i], strlen(msg[i]) + 1, 1234, callback)) == -1) { perror("put"); return -1; } } if ((agh = asyncmsg_get(chid)) == NULL) { perror("get"); return -1; } printf("Got message(s): \n\n"); while ((agh1 = agh)) { agh = agh1->next; printf("from process: %d (%d)\n", agh1->info.pid, getpid()); printf("msglen: %d (%d)\n", agh1->info.msglen, strlen(*msg) + 1); printf("srclen: %d\n", agh1->info.srcmsglen); printf("err: %d\n", agh1->err); printf("parts: %d\n", agh1->parts); printf("msg: %s\n\n", (char *)agh1->iov->iov_base); asyncmsg_free(agh1); } sleep(1); if (asyncmsg_connect_detach(coid) == -1) { perror("connect_detach"); return -1; } if (asyncmsg_channel_destroy(chid) == -1) { perror("channel_detach"); return -1; } return 0; }
The following program demonstrates the use of asyncmsg_flush(), which you can use to flush all messages; you don't have to set the trigger number. In fact, the receiver gets an event notification from the kernel as soon as messages are ready in the sender. The receiver then promptly picks up all the messages together.
#include <stdio.h> #include <errno.h> #include <process.h> #include <unistd.h> #include <string.h> #include <sys/asyncmsg.h> int num_callback_run = 0; int chid; int callback(int err, void *cmsg, unsigned handle) { num_callback_run++; printf("Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle); return 0; } void * thread_for_get(void *arg) { int pchid = (int)arg; struct _pulse pulse; struct _asyncmsg_get_header *agh, *agh1; /* waiting for the event */ if (MsgReceivePulse(pchid, &pulse, sizeof(pulse), NULL) == -1) { perror("MsgReceivePulse"); return NULL; } if ((agh = asyncmsg_get(chid)) == NULL) { perror("get"); return NULL; } printf("Got message(s): \n\n"); while ((agh1 = agh)) { agh = agh1->next; printf("from process: %d (%d)\n", agh1->info.pid, getpid()); printf("msglen: %d\n", agh1->info.msglen); printf("srclen: %d\n", agh1->info.srcmsglen); printf("err: %d\n", agh1->err); printf("parts: %d\n", agh1->parts); printf("msg: %s\n\n", (char *)agh1->iov->iov_base); asyncmsg_free(agh1); } return 0; } /* * No trigger for put, but we force flush() and see if all * messages can be pushed out. */ int main() { int coid, pchid, i; struct sigevent gev; struct _asyncmsg_connection_attr aca; char msg[4][80]; /* prepare the event */ if ((pchid = ChannelCreate(0)) == -1) { perror("ChannelCreate"); return -1; } if ((errno = pthread_create(0, 0, thread_for_get, (void *)pchid)) != EOK) { perror("pthread_create"); return -1; } if ((gev.sigev_coid = ConnectAttach(0, 0, pchid, _NTO_SIDE_CHANNEL,0)) == -1) { perror("ConnectAttach"); return -1; } gev.sigev_notify = SIGEV_PULSE; gev.sigev_priority = SIGEV_PULSE_PRIO_INHERIT; /* async channel */ if ((chid = asyncmsg_channel_create(_NTO_CHF_SENDER_LEN, 0666, 2048, 5, &gev, NULL)) == -1) { perror("channel_create"); return -1; } memset(&aca, 0, sizeof(aca)); aca.buffer_size = 2048; aca.max_num_buffer = 5; aca.trigger_num_msg = 0; aca.call_back = callback; if ((coid = asyncmsg_connect_attach(0, 0, chid, 0, 0, &aca)) == -1) { perror("connect_attach"); return -1; } /* put up 4 messages */ for (i = 0; i < 4; i++) { sprintf(msg[i], "Async Message Passing (msgid %d)\n", i); if ((asyncmsg_put(coid, msg[i], strlen(msg[i]) + 1, 1234, callback)) == -1) { perror("put"); return -1; } } asyncmsg_flush(coid, 0); /* give the put callback a chance to run */ while (num_callback_run < 2) delay(500); if (asyncmsg_connect_detach(coid) == -1) { perror("connect_detach"); return -1; } if (asyncmsg_channel_destroy(chid) == -1) { perror("channel_detach"); return -1; } return 0; }
This program demonstrates how to use asyncmsg_connect_attach() to set the event that's used to trigger copying by the receiver.
#include <stdio.h> #include <process.h> #include <unistd.h> #include <string.h> #include <sys/asyncmsg.h> char *msg = "AsyncMsg Passing"; int callback(int err, void *cmsg, unsigned handle) { printf("Callback: err = %d, msg = %p (%p), handle = %d\n", err, cmsg, msg, handle); return 0; } /* * Simple test. Create a channel, connect to it, put a message, * get them, done */ int main() { int chid, coid, pchid; struct sigevent gev; struct _pulse pulse; struct _asyncmsg_connection_attr aca; struct _asyncmsg_get_header *agh, *agh1; /* prepare the event */ if ((pchid = ChannelCreate(0)) == -1) { perror("ChannelCreate"); return -1; } if ((gev.sigev_coid = ConnectAttach(0, 0, pchid, _NTO_SIDE_CHANNEL,0)) == -1) { perror("ConnectAttach"); return -1; } gev.sigev_notify = SIGEV_PULSE; gev.sigev_priority = SIGEV_PULSE_PRIO_INHERIT; /* async channel */ if ((chid = asyncmsg_channel_create(0, 0666, 2048, 5, &gev, NULL)) == -1) { perror("channel_create"); return -1; } memset(&aca, 0, sizeof(aca)); aca.buffer_size = 2048; aca.max_num_buffer = 5; aca.trigger_num_msg = 1; aca.call_back = callback; if ((coid = asyncmsg_connect_attach(0, 0, chid, 0, 0, &aca)) == -1) { perror("connect_attach"); return -1; } if ((asyncmsg_put(coid, msg, strlen(msg) + 1, 0, 0)) == -1) { perror("put"); return -1; } /* waiting for the event */ if (MsgReceivePulse(pchid, &pulse, sizeof(pulse), NULL) == -1) { perror("MsgReceivePulse"); return -1; } if ((agh = asyncmsg_get(chid)) == NULL) { perror("get"); return -1; } printf("Got message(s): \n\n"); while ((agh1 = agh)) { agh = agh1->next; printf("from process: %d (%d)\n", agh1->info.pid, getpid()); printf("msglen: %d (%d)\n", agh1->info.msglen, strlen(msg) + 1); printf("srclen: %d\n", agh1->info.srcmsglen); printf("err: %d\n", agh1->err); printf("parts: %d\n", agh1->parts); printf("msg: %s\n\n", (char *)agh1->iov->iov_base); asyncmsg_free(agh1); } /* give the callback a chance to run */ sleep(1); if (asyncmsg_connect_detach(coid) == -1) { perror("connect_detach"); return -1; } if (asyncmsg_channel_destroy(chid) == -1) { perror("channel_detach"); return -1; } return 0; }
In this example, you use time to trigger the copying by the receiver. Here, a timer is set for a certain period of time (for example, 2 seconds). After the timer expires, the receiver gets the notification and starts copying the messages.
#include <stdio.h> #include <process.h> #include <unistd.h> #include <string.h> #include <sys/asyncmsg.h> int callback(int err, void *cmsg, unsigned handle) { printf("Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle); return 0; } /* * Set the get trigger to 2 seconds, put up 4 messages, see we could get a * notification for get after 2 seconds. */ int main() { int chid, coid, pchid, i; struct sigevent gev; struct _pulse pulse; struct _asyncmsg_connection_attr aca; struct _asyncmsg_get_header *agh, *agh1; char msg[4][80]; /* prepare the event */ if ((pchid = ChannelCreate(0)) == -1) { perror("ChannelCreate"); return -1; } if ((gev.sigev_coid = ConnectAttach(0, 0, pchid, _NTO_SIDE_CHANNEL, 0)) == -1) { perror("ConnectAttach"); return -1; } gev.sigev_notify = SIGEV_PULSE; gev.sigev_priority = SIGEV_PULSE_PRIO_INHERIT; /* async channel */ if ((chid = asyncmsg_channel_create(_NTO_CHF_SENDER_LEN, 0666, 2048, 5, &gev, NULL)) == -1) { perror("channel_create"); return -1; } memset(&aca, 0, sizeof(aca)); aca.buffer_size = 2048; aca.max_num_buffer = 5; aca.trigger_num_msg = 0; aca.trigger_time.nsec = 2000000000L; aca.trigger_time.interval_nsec = 0; aca.call_back = callback; if ((coid = asyncmsg_connect_attach(0, 0, chid, 0, 0, &aca)) == -1) { perror("connect_attach"); return -1; } /* put up 4 messages */ for (i = 0; i < 4; i++) { sprintf(msg[i], "Async Message Passing (msgid %d)\n", i); if ((asyncmsg_put(coid, msg[i], strlen(msg[i]) + 1, 1234, callback)) == -1) { perror("put"); return -1; } } /* waiting for the event */ if (MsgReceivePulse(pchid, &pulse, sizeof(pulse), NULL) == -1) { perror("MsgReceivePulse"); return -1; } if ((agh = asyncmsg_get(chid)) == NULL) { perror("get"); return -1; } printf("Got message(s): \n\n"); while ((agh1 = agh)) { agh = agh1->next; printf("from process: %d (%d)\n", agh1->info.pid, getpid()); printf("msglen: %d (%d)\n", agh1->info.msglen, strlen(*msg) + 1); printf("srclen: %d\n", agh1->info.srcmsglen); printf("err: %d\n", agh1->err); printf("parts: %d\n", agh1->parts); printf("msg: %s\n\n", (char *)agh1->iov->iov_base); asyncmsg_free(agh1); } /* give the callback a chance to run */ sleep(1); if (asyncmsg_connect_detach(coid) == -1) { perror("connect_detach"); return -1; } if (asyncmsg_channel_destroy(chid) == -1) { perror("channel_detach"); return -1; } return 0; }
This is a two-process example demonstrating simple asynchronous message delivery between a client and server process.
Both asynch_client.c and asynch_server.c files share the same header, asynch_server.h, to find each other using the registered name, and a defined message type for the query:
#define RECV_NAME "ASYNCH_RECEIVER" #define GET_ACHID 2 // message type for async chid query: no data; reply is 32-bit chid
The following file (asynch_server.c) is the server code that demonstrates asynchronous messaging. It registers a name with name_attach() so that the client(s) can find it, and answers one query from a client to get its asynchronous channel ID.
This server uses a MsgReceive() as a blocking point, and gets a pulse from the kernel whenever there are messages available to receive on the asynchronous channel, allowing the server to handle both synchronous and asynchronous messages easily.
#include <stdlib.h> #include <stdio.h> #include <errno.h> #include <sys/siginfo.h> #include <sys/neutrino.h> #include <sys/dispatch.h> #include <sys/asyncmsg.h> #include <unistd.h> #include <sys/trace.h> #include "asynch_server.h" #define PROGNAME "asynch_server: " /* structure for receive buffer for MsgReceive * currently only query message or pulse. */ union recv_msgs { struct _pulse pulse; uint16_t type; } recv_buf; /* pulse code for asynch message notification from kernel * this is a user pulse code, chosen here -- it need not * be this value or any particular value */ #define PULSE_ASYNCH_EVENT (_PULSE_CODE_MINAVAIL + 5) int main(int argc, char *argv[]) { name_attach_t *att; int rcvid; struct _msg_info msg_info; struct sigevent sigev; int self_coid; int achid; struct _asyncmsg_get_header *agh, *agh1; /* register my name so client can find me */ att = name_attach(NULL,RECV_NAME, 0 ); if (NULL == att ) { perror(PROGNAME "name_attach()"); exit(EXIT_FAILURE); } /* create a connection to the synchronous channel created by the * name_attach() call. Will use this to specify where the pulses * flagging async data available should be delivered. */ self_coid = ConnectAttach( 0, 0, att->chid, _NTO_SIDE_CHANNEL, 0 ); if( -1 == self_coid ) { perror(PROGNAME "ConnectAttach"); exit( EXIT_FAILURE ); } /* and fill in the event structure to describe a priority 10 pulse * to be delivered to my synchronous channel */ SIGEV_PULSE_INIT( &sigev, self_coid, 10, PULSE_ASYNCH_EVENT, 0 ); /* create an asynchronous channel with automatic buffering * it will not block an asyncmsg_get() call if there are no messages available * it will receive up to 10 messages of up to 1024 bytes each at once * I will get a pulse notification when the queue of available messages goes * from empty to non-empty */ achid = asyncmsg_channel_create( _NTO_CHF_ASYNC_NONBLOCK, 0666, 1024, 10, &sigev, NULL ); if( -1 == achid ) { perror( "asyncmsg_channel_create"); exit( EXIT_FAILURE ); } while(1) { rcvid = MsgReceive( att->chid, &recv_buf, sizeof (recv_buf), &msg_info ); if( -1 == rcvid ) { perror(PROGNAME "MsgReceive failed"); continue; } if ( 0 == rcvid ) { /* we received a pulse */ printf("got a pulse\n"); switch( recv_buf.pulse.code ) { /* system disconnect pulse */ case _PULSE_CODE_DISCONNECT: ConnectDetach( recv_buf.pulse.scoid ); printf(PROGNAME "disconnect from a client %X\n", recv_buf.pulse.scoid); break; /* our pulse - we've got one or more messages */ case PULSE_ASYNCH_EVENT: /* get one or more messages from our channel */ agh = asyncmsg_get( achid ); if (NULL == agh ) { perror("went to get a message, but nothing was there"); } else { /* the async receive header is, actually, a linked list of headers * if multiple messages have been received at once, so we need to * walk the list, looking at each header and message in turn */ while( agh ) { printf("the message came from %d in %d parts\n", agh->info.pid, agh->parts); printf("the data is '%s'\n", (char *)(agh->iov->iov_base)); agh1 = agh; agh = agh1->next; /* free this message */ asyncmsg_free( agh1 ); } } break; default: printf(PROGNAME "unexpected pulse code: %d\n", recv_buf.pulse.code ); break; } continue; } /* not an error, not a pulse, therefor a message */ if ( recv_buf.type == _IO_CONNECT ) { /* _IO_CONNECT because someone did a name_open() to us, must EOK it.*/ MsgReply( rcvid, EOK, NULL, 0 ); continue; } if ( recv_buf.type > _IO_BASE && recv_buf.type <= _IO_MAX ) { /* unexpected system message,probably qconn, error it */ MsgError( rcvid, ENOSYS ); continue; } switch( recv_buf.type ) { /* here our client asked for our asynchronous channel id * reply with it */ case GET_ACHID: printf("got request for my achid\n"); MsgReply(rcvid, 0, &achid, sizeof(achid )); break; default: /* some other expect message */ printf(PROGNAME "expect message type: %d\n", recv_buf.type); MsgError(rcvid, ENOSYS ); break; } } }
The following file, asynch_client.c, first locates the server using name_locate(), then sends a query to get the server's asynchronous channel ID. Next, it creates an asynchronous connection, and sends a message every second, with a callback attached to print out that the message was sent.
#include <stdlib.h> #include <stdio.h> #include <errno.h> #include <sys/siginfo.h> #include <sys/neutrino.h> #include <sys/dispatch.h> #include <sys/asyncmsg.h> #include <unistd.h> #include <sys/trace.h> #include "asynch_server.h" #define PROGNAME "asynch_client: " /* callback is run after each message has been delivered to the server. It is * registered at connect time, and called by a thread created during the * connect processing. */ int callback(int err, void *cmsg, unsigned handle) { static int num_callback_run = 0; ++num_callback_run; printf("Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle); printf("run the %dth time by %d tid\n", num_callback_run, pthread_self()); return 0; } int main(int argc, char *argv[]) { int server_coid, a_coid; short msg; int a_chid; struct _asyncmsg_connection_attr aca; struct _server_info sinfo; int count = 1; /* look for server */ server_coid = name_open( RECV_NAME, 0 ); while( server_coid == -1 ) { sleep(1); server_coid = name_open( RECV_NAME, 0 ); } /* need server's pid, use this to get server info including pid */ ConnectServerInfo( 0, server_coid, &sinfo ); /* send message to server request the asynchronous channel id */ msg = GET_ACHID; if (MsgSend( server_coid, &msg, sizeof( msg ), &a_chid, sizeof(a_chid) )) { perror(PROGNAME "MsgSend"); exit( EXIT_FAILURE ); } /* setup array of send buffers */ memset(&aca, 0, sizeof(aca)); aca.buffer_size = 2048; aca.max_num_buffer = 5; /* don't locally accumulate more than 5 buffers */ aca.trigger_num_msg = 1; /* deliver each message as it is sent */ aca.call_back = callback; /* call this function after messages are sent */ /* create my asynchronous connection to the server */ a_coid = asyncmsg_connect_attach( 0, sinfo.pid, a_chid, 0, 0, &aca ); if( -1 == a_coid ) { perror( "async connect"); printf("server pid is %d\n", sinfo.pid ); exit( EXIT_FAILURE ); } while(1) { int ret, len; char buf[80]; len = sprintf(buf, "message #%d", count)+1; printf("sending %dth time\n", count ); /* deliver message to server, the count value will be passed into * my callback as the "handle" */ ret = asyncmsg_put( a_coid, buf, len, count, NULL ); if( -1 == ret ) { perror( "put"); exit( EXIT_FAILURE ); } count++; sleep(1); } }