<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> <html> <head> <meta http-equiv="Content-Type" content="text/html;charset=utf-8"> <meta http-equiv="cache-control" content="no-cache"> <title>Genivia - The mq plugin for inbound message queueing and message replay</title> <link href="genivia_tabs.css" rel="stylesheet" type="text/css"/> <script type="text/javascript" src="jquery.js"></script> <script type="text/javascript" src="dynsections.js"></script> <link href="doxygen.css" rel="stylesheet" type="text/css"> <link href="genivia_content.css" rel="stylesheet" type="text/css"> </head> <body> <div id="top"> <div id="titlearea"> <table height="72px" width="100%" cellspacing="0" cellpadding="0"> <tbody> <tr> <td width="10%"> </td> <td width="175px"><a href="https://www.genivia.com"><img alt="Genivia" src="GeniviaLogo2_trans_noslogan.png"/></a></td> <td class="tab_home"><a href="https://www.genivia.com">Home</a></td> <td class="tab_home"><a href="https://www.genivia.com/docs.html">Documentation</a></td> <td> <div style="float: right; font-size: 18px; font-weight: bold;">The mq plugin for inbound message queueing and message replay</div> <br> <div style="float: right; font-size: 10px;">updated Mon Dec 19 2016</div> </td> <td width="10%"> </td> </tr> </tbody> </table> </div> <!-- Generated by Doxygen 1.8.11 --> <div id="navrow1" class="tabs"> <ul class="tablist"> <li><a href="index.html"><span>Main Page</span></a></li> <li class="current"><a href="pages.html"><span>Related Pages</span></a></li> <li><a href="annotated.html"><span>Classes</span></a></li> <li><a href="files.html"><span>Files</span></a></li> </ul> </div> </div><!-- top --> <div class="header"> <div class="headertitle"> <div class="title">The mq plugin for inbound message queueing and message replay </div> </div> </div><!--header--> <div class="contents"> <div class="textblock"><p>The inbound message queueing plugin can be used to queue messages that should not be discarded with the WS-RM 
protocol's NoDiscard behavior. Messages that arrive out of sequence according to the WS-RM protocol, and that should be handled by one thread (or a thread pool), should be queued for later replay and service operation invocation. If an unlimited number of threads is available, the simplest way to implement the WS-RM NoDiscard behavior is to start a thread for each inbound message and let the thread block in a call to <a class="el" href="wsrmapi_8h.html#af8e440bcc087dbd943b30a112f476193" title="Receiver (server)-side check for the presence of WS-Addressing and WS-RM header blocks in the SOAP he...">soap_wsrm_check_and_wait()</a> or <a class="el" href="wsrmapi_8h.html#a8641e119a993564112e0a0e4a1000d0d" title="Receiver (server)-side check for the presence of WS-Addressing and WS-RM header blocks in the SOAP he...">soap_wsrm_check_send_empty_response_and_wait()</a>. However, that approach is inefficient with HTTP keep-alive, because while the thread blocks, the next messages on the same keep-alive socket cannot be processed. 
This plugin is designed to process messages on an HTTP keep-alive socket even when operations block.</p> <h1><a class="anchor" id="mq_1"></a> Server-Side Queueing of One-Way Messages</h1> <p>To queue one-way messages for internal replay, use the message queueing plugin to queue the inbound messages received on a single socket and then replay them in the order in which they were received from the socket:</p> <div class="fragment"><div class="line"><span class="preprocessor">#include "<a class="code" href="mq_8h.html">mq.h</a>"</span></div><div class="line"></div><div class="line"> <span class="keyword">struct </span>soap *soap = soap_new1(SOAP_IO_KEEPALIVE);</div><div class="line"> soap_register_plugin(soap, <a class="code" href="mq_8h.html#a96bf20fae13f6f4318f1592b9d0adfba">soap_mq</a>);</div><div class="line"> ...</div><div class="line"> <span class="comment">// port bind etc</span></div><div class="line"> ...</div><div class="line"> <span class="keywordflow">while</span> (soap_valid_socket(soap_accept(soap)))</div><div class="line"> {</div><div class="line"> <span class="comment">// queue all messages on this socket (socket is HTTP keep-alive)</span></div><div class="line"> <span class="comment">// for each message received, we send HTTP 202 Accepted</span></div><div class="line"> <span class="keyword">struct </span>ms_queue *queue = <a class="code" href="mq_8h.html#abffc350d4d0892c678b294badd572f11">soap_mq_queue</a>(soap);</div><div class="line"> <span class="keyword">struct </span>ms_msg *msg;</div><div class="line"> <span class="keywordflow">while</span> (<a class="code" href="mq_8h.html#a762d9aa6281902622b846135918b897f">soap_mq_get</a>(soap, queue))</div><div class="line"> soap_send_empty_response(soap, 202); <span class="comment">// 202 Accepted</span></div><div class="line"></div><div class="line"> <span class="comment">// we now internally replay all messages to invoke services</span></div><div class="line"> <span class="comment">// 
services are assumed to NOT send a response message back</span></div><div class="line"> <span class="comment">// i.e. one-way operations</span></div><div class="line"> <span class="keywordflow">for</span> (msg = <a class="code" href="mq_8h.html#a5efe3ffa00498c5b01ccdb5d447cffa2">soap_mq_begin</a>(queue); msg; msg = <a class="code" href="mq_8h.html#aa7609d8ff2851cb2b0237710216039aa">soap_mq_next</a>(msg))</div><div class="line"> soap_serve(&msg->soap);</div><div class="line"></div><div class="line"> <span class="comment">// delete all queued messages; this also calls the following on each queued message context:</span></div><div class="line"> <span class="comment">// soap_destroy(&msg->soap);</span></div><div class="line"> <span class="comment">// soap_end(&msg->soap);</span></div><div class="line"> <span class="comment">// soap_done(&msg->soap);</span></div><div class="line"> <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, NULL);</div><div class="line"></div><div class="line"> <span class="comment">// delete the queue (allocated in current context)</span></div><div class="line"> soap_destroy(soap);</div><div class="line"> soap_end(soap);</div><div class="line"> }</div></div><!-- fragment --><p>Alternatively, call soap_mq_del(queue, msg) right after soap_serve(&msg->soap) to delete each message as soon as it has been processed. Calling soap_mq_next(msg) afterwards in the loop is still valid.</p> <h1><a class="anchor" id="mq_2"></a> WS-RM Server-Side Message Queueing for NoDiscard Behavior with Callback Services</h1> <p>When messages are controlled by the WS-ReliableMessaging (WS-RM) protocol, we can keep WS-RM messages that were received out of order in a queue until the order is restored and the queued messages can be dispatched. This behavior is desirable with WS-RM NoDiscard. 
To implement this approach, we use an inbound message queue for each socket accepted and processed by a thread.</p> <div class="fragment"><div class="line"><span class="preprocessor">#include "<a class="code" href="wsaapi_8h.html">wsaapi.h</a>"</span></div><div class="line"><span class="preprocessor">#include "<a class="code" href="wsrmapi_8h.html">wsrmapi.h</a>"</span></div><div class="line"><span class="preprocessor">#include "<a class="code" href="mq_8h.html">mq.h</a>"</span></div><div class="line"><span class="preprocessor">#include "threads.h"</span></div><div class="line"></div><div class="line"> <span class="keyword">struct </span>soap *soap = soap_new1(SOAP_IO_KEEPALIVE);</div><div class="line"> soap_register_plugin(soap, <a class="code" href="wsaapi_8h.html#aa013e3760b97c2efcc71d29b57394501">soap_wsa</a>);</div><div class="line"> soap_register_plugin(soap, <a class="code" href="wsrmapi_8h.html#a3ca1614f5da3589a41957cb2f93394dc">soap_wsrm</a>);</div><div class="line"> soap_register_plugin(soap, <a class="code" href="mq_8h.html#a96bf20fae13f6f4318f1592b9d0adfba">soap_mq</a>);</div><div class="line"> ...</div><div class="line"> <span class="comment">// port bind etc</span></div><div class="line"> ...</div><div class="line"> <span class="keywordflow">while</span> (soap_valid_socket(soap_accept(soap)))</div><div class="line"> {</div><div class="line"> THREAD_TYPE tid;</div><div class="line"> <span class="keyword">struct </span>soap *tsoap = soap_copy(soap);</div><div class="line"> <span class="keywordflow">if</span> (!tsoap)</div><div class="line"> {</div><div class="line"> soap_closesock(soap);</div><div class="line"> <span class="keywordflow">continue</span>;</div><div class="line"> }</div><div class="line"> THREAD_CREATE(&tid, (<span class="keywordtype">void</span>*(*)(<span class="keywordtype">void</span>*))process_request, (<span class="keywordtype">void</span>*)tsoap);</div><div class="line"> }</div><div class="line"></div><div class="line"><span 
class="keywordtype">void</span> *process_request(<span class="keywordtype">void</span> *tsoap)</div><div class="line">{</div><div class="line"> <span class="keyword">struct </span>soap *soap = (<span class="keyword">struct </span>soap*)tsoap;</div><div class="line"> <span class="keyword">struct </span>ms_queue *queue = <a class="code" href="mq_8h.html#abffc350d4d0892c678b294badd572f11">soap_mq_queue</a>(soap);</div><div class="line"> <span class="keyword">struct </span>ms_msg *msg;</div><div class="line"> <span class="keyword">struct </span>soap ctx;</div><div class="line"> <span class="keywordflow">while</span> ((msg = <a class="code" href="mq_8h.html#a762d9aa6281902622b846135918b897f">soap_mq_get</a>(soap, queue)) != NULL)</div><div class="line"> {</div><div class="line"> <span class="comment">// parse the message headers, if NoDiscard then keep message in queue to retry later</span></div><div class="line"> <span class="comment">// copy the context, since we want to preserve the original to retry later</span></div><div class="line"> soap_copy_context(&ctx, &msg->soap);</div><div class="line"></div><div class="line"> <span class="keywordflow">if</span> (soap_begin_serve(&ctx))</div><div class="line"> {</div><div class="line"> soap_send_fault(&ctx); <span class="comment">// send fault, close socket</span></div><div class="line"> <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg); <span class="comment">// delete message from queue</span></div><div class="line"> }</div><div class="line"> <span class="keywordflow">else</span> <span class="keywordflow">if</span> (!ctx.header || !ctx.header->wsrm__Sequence)</div><div class="line"> {</div><div class="line"> <span class="comment">// this is not a WS-RM message, so serve immediately</span></div><div class="line"> soap_serve(&msg->soap); <span class="comment">// service operations</span></div><div class="line"> <a class="code" 
href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg); <span class="comment">// delete message from queue</span></div><div class="line"> }</div><div class="line"> <span class="keywordflow">else</span> <span class="keywordflow">if</span> (!<a class="code" href="wsrmapi_8h.html#a0304e1219b99929fa576cf12114206eb">soap_wsrm_check</a>(&ctx))</div><div class="line"> {</div><div class="line"> <span class="comment">// check is OK, process this WS-RM message now</span></div><div class="line"> soap_serve(&msg->soap); <span class="comment">// service operations SHOULD NOT call soap_wsrm_check()</span></div><div class="line"> <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg); <span class="comment">// delete message from queue</span></div><div class="line"> }</div><div class="line"> <span class="keywordflow">else</span> <span class="keywordflow">if</span> (ctx.error != SOAP_STOP)</div><div class="line"> {</div><div class="line"> <span class="comment">// check failed, not a WS-RM message or other WS-RM error</span></div><div class="line"> soap_send_fault(&ctx); <span class="comment">// send fault, close socket</span></div><div class="line"> <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg); <span class="comment">// delete message from queue</span></div><div class="line"> }</div><div class="line"> soap_destroy(&ctx);</div><div class="line"> soap_end(&ctx);</div><div class="line"> soap_done(&ctx);</div><div class="line"> }</div><div class="line"> <span class="comment">// as long as the queue is not empty and WS-RM sequence(s) not terminated, keep trying</span></div><div class="line"> <span class="keywordflow">while</span> ((msg = <a class="code" href="mq_8h.html#a5efe3ffa00498c5b01ccdb5d447cffa2">soap_mq_begin</a>(queue)) != NULL)</div><div class="line"> {</div><div class="line"> <span class="comment">// process queued WS-RM messages</span></div><div class="line"> 
<span class="keywordflow">for</span> (; msg != NULL; msg = <a class="code" href="mq_8h.html#aa7609d8ff2851cb2b0237710216039aa">soap_mq_next</a>(msg))</div><div class="line"> {</div><div class="line"> <span class="comment">// try next message in queue</span></div><div class="line"> soap_copy_context(&ctx, &msg->soap);</div><div class="line"> <span class="keywordflow">if</span> (!soap_begin_serve(&ctx) && !<a class="code" href="wsrmapi_8h.html#a0304e1219b99929fa576cf12114206eb">soap_wsrm_check</a>(&ctx))</div><div class="line"> {</div><div class="line"> <span class="comment">// check is OK, process message</span></div><div class="line"> soap_serve(&msg->soap);</div><div class="line"> <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg);</div><div class="line"> }</div><div class="line"> <span class="keywordflow">else</span> <span class="keywordflow">if</span> (ctx.error != SOAP_STOP)</div><div class="line"> <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg); <span class="comment">// check failed with an error, delete message from queue</span></div><div class="line"> soap_destroy(&ctx);</div><div class="line"> soap_end(&ctx);</div><div class="line"> soap_done(&ctx);</div><div class="line"> }</div><div class="line"> sleep(1); <span class="comment">// sleep for a second before we go around again</span></div><div class="line"> }</div><div class="line"> <span class="comment">// delete the queue (allocated in this context) and free the copied context</span></div><div class="line"> soap_destroy(soap);</div><div class="line"> soap_end(soap);</div><div class="line"> soap_free(soap);</div><div class="line"> <span class="keywordflow">return</span> NULL;</div><div class="line">}</div></div><!-- fragment --><p>The first loop runs over the messages received on the same keep-alive socket. A message is processed and its service operation dispatched immediately when it is not a WS-RM message or when the WS-RM check succeeds. 
This check is performed in the server dispatch loop as shown, which means that WS-RM-based service operations SHOULD NOT call <a class="el" href="wsrmapi_8h.html#a0304e1219b99929fa576cf12114206eb" title="Receiver (server)-side check for the presence of WS-Addressing and WS-RM header blocks in the SOAP he...">soap_wsrm_check()</a> again. WS-RM messages that cannot be processed yet because they are out of sequence order remain in the queue.</p> <p>The second loop over the queued messages retries dispatching the service operations in the WS-RM message order, as required by the WS-RM NoDiscard sequence behavior. The loop runs until the queue is empty or the WS-RM sequences are closed or terminated.</p> </div></div><!-- contents --> <hr class="footer"> <address class="footer"> Copyright (C) 2016, Robert van Engelen, Genivia Inc., All Rights Reserved. </address> <address class="footer"><small> Converted on Mon Dec 19 2016 18:22:50 by <a target="_blank" href="http://www.doxygen.org/index.html">Doxygen</a> 1.8.11</small></address> <br> <div style="height: 246px; background: #DBDBDB;"></div> </body> </html>