% This LaTeX document was generated using the LaTeX backend of PlDoc,
% The SWI-Prolog documentation system

\section{library(stomp): STOMP client.}
\label{sec:stomp}

\begin{tags}
    \tag{author}
Hongxin Liang and Jan Wielemaker
    \mtag{See also}- \url{http://stomp.github.io/index.html} \\- \url{https://github.com/jasonrbriggs/stomp.py}
    \tag{license}
BSD-2
    \tag{To be done}
TLS support
\end{tags}

This module provides a STOMP (Simple (or Streaming) Text Orientated Messaging Protocol) client. This client is based on work by Hongxin Liang. The current version is a major rewrite, changing both the API and the low-level STOMP frame (de)serialization.

The predicate \predref{stomp_connection}{5} is used to register a connection. The connection is established by \predref{stomp_connect}{1}, which is lazily called from any of the predicates that send a STOMP frame. After the connection is established, two threads are created. One receives STOMP frames and the other manages and watches the \textit{heart beat}.

\subsection{Threading}
\label{sec:stomp-threading}

Upon receiving a frame, the callback registered with \predref{stomp_connection}{5} is called in the context of the receiving thread. More demanding applications may decide to send incoming frames to a SWI-Prolog message queue and have one or more \textit{worker threads} process the input. Alternatively, frames may be inspected by the receiving thread and either processed immediately or dispatched to new or running threads. The best scenario depends on the message rate, processing time and concurrency of the Prolog application.

All message sending predicates of this library are \textit{thread safe}. If two threads send a frame to the same connection, the library ensures that both frames are properly serialized.
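
The message queue scenario described above could be sketched as follows. This is a minimal illustration, not part of the library: the predicates \verb$start_workers/2$, \verb$on_frame/5$, \verb$worker/1$ and \verb$handle_message/3$ are hypothetical names, and the callback is assumed to be registered as the closure \verb$on_frame(Queue)$ so that the queue is passed as its first argument.

\begin{code}
:- use_module(library(stomp)).

% Hypothetical helper: create a queue and N detached worker threads.
start_workers(N, Queue) :-
    message_queue_create(Queue),
    forall(between(1, N, _),
           thread_create(worker(Queue), _, [detached(true)])).

% Callback, to be registered as on_frame(Queue). It runs in the
% receiving thread and only enqueues message frames.
on_frame(Queue, message, Connection, Header, Body) :-
    !,
    thread_send_message(Queue, frame(Connection, Header, Body)).
on_frame(_Queue, _Command, _Connection, _Header, _Body).

% Worker loop: block on the queue and process one frame at a time.
% Failures and exceptions in the handler do not stop the worker.
worker(Queue) :-
    thread_get_message(Queue, frame(Connection, Header, Body)),
    ignore(catch(handle_message(Connection, Header, Body),
                 Error,
                 print_message(error, Error))),
    worker(Queue).

% Placeholder for the application specific processing.
handle_message(_Connection, _Header, Body) :-
    format("Received: ~p~n", [Body]).
\end{code}

With this layout the receiving thread stays responsive, while the number of workers can be tuned to the message rate and processing time.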
\subsection{Reconnecting}
\label{sec:stomp-reconnecting}

By default this library tries to establish the connection and notifies the user by means of a \const{disconnected} pseudo frame if the connection is lost. Using the \arg{Options} argument of \predref{stomp_connection}{6}, the system can be configured to keep trying to connect if the server is not available and to reconnect if the connection is lost. See the \file{pong.pl} example.\vspace{0.7cm}

\begin{description}
    \predicate[det]{stomp_connection}{5}{+Address, +Host, +Headers, :Callback, -Connection}
\nodescription
    \predicate[det]{stomp_connection}{6}{+Address, +Host, +Headers, :Callback, -Connection, +Options}
Create a connection reference. The connection is not set up yet by this predicate. \arg{Callback} is called on any received frame except for \textit{heart beat} frames as below.

\begin{code}
call(Callback, Command, Connection, Header, Body)
\end{code}

Here, \arg{Command} is one of the commands below. \arg{Header} is a dict holding the STOMP frame header, where all values are strings except for the \verb$'content-length'$ key value, which is passed as an integer.

\arg{Body} is a string or, if the data is of the type \verb$application/json$, a dict.

\begin{description}
    \termitem{connected}{}
A connection was established. \arg{Connection} and \arg{Header} are valid.
    \termitem{disconnected}{}
The connection was lost. Only \arg{Connection} is valid.
    \termitem{message}{}
A message arrived. All three arguments are valid. \arg{Body} is a dict if the \verb$content-type$ of the message is \verb$application/json$ and a string otherwise.
    \termitem{heartbeat}{}
A heartbeat was received. Only \arg{Connection} is valid. This callback is also called for each newline that follows a frame. These newlines can be a heartbeat, but can also be additional newlines that follow a frame.
    \termitem{heartbeat_timeout}{}
No heartbeat was received. Only \arg{Connection} is valid.
    \termitem{error}{}
An error happened.
All three arguments are valid and handled as \const{message}.
\end{description}

Note that \predref{stomp_teardown}{1} causes the receiving and heartbeat threads to be signalled with \predref{abort}{0}.

\arg{Options} processed:

\begin{description}
    \termitem{reconnect}{+Bool}
Try to reestablish the connection to the server if the connection is lost. Default is \const{false}.
    \termitem{connect_timeout}{+Seconds}
Maximum time to try and reestablish a connection. The default is \verb$600$ (10 minutes).
    \termitem{json_options}{+Options}
\arg{Options} passed to \predref{json_read_dict}{3} to translate \verb$application/json$ content to Prolog. Default is \verb$[]$.
\end{description}

\begin{arguments}
\arg{Address} & is a valid address for \predref{tcp_connect}{3}. Normally a term \arg{Host}:Port, e.g., \verb$localhost:32772$. \\
\arg{Host} & is a path denoting the STOMP host. Often just \verb$/$. \\
\arg{Headers} & is a dict with STOMP headers used for the \verb$CONNECT$ request. \\
\arg{Connection} & is an opaque ground term that identifies the connection. \\
\end{arguments}

    \predicate[nondet]{stomp_connection_property}{2}{?Connection, ?Property}
True when \arg{Property} is a property of \arg{Connection}. Defined properties are:

\begin{description}
    \termitem{address}{Address}
    \termitem{callback}{Callback}
    \termitem{host}{Host}
    \termitem{headers}{Headers}
    \termitem{reconnect}{Bool}
    \termitem{connect_timeout}{Seconds}
All the above properties result from the \predref{stomp_connection}{6} registration.
    \termitem{receiver_thread_id}{Thread}
    \termitem{stream}{Stream}
    \termitem{heartbeat_thread_id}{Thread}
    \termitem{received_heartbeat}{TimeStamp}
These describe an active STOMP connection.
\end{description}

    \predicate{stomp_destroy_connection}{1}{+Connection}
Destroy a connection. If it is active, first use \predref{stomp_teardown}{1} to disconnect.

    \predicate[det]{stomp_setup}{2}{+Connection, +Options}
Set up the actual socket connection and start the receiving thread.
This is a no-op if the connection has already been created. The \arg{Options} processed are below. Other options are passed to \predref{tcp_connect}{3}.

\begin{description}
    \termitem{timeout}{+Seconds}
If \predref{tcp_connect}{3} fails, retry until the timeout is reached. If \arg{Seconds} is \const{inf} or \const{infinite}, keep retrying forever.
\end{description}

    \predicate[semidet]{stomp_teardown}{1}{+Connection}
Tear down the socket connection and stop the receiving thread and heartbeat thread (if applicable). The registration of the connection as created by \predref{stomp_connection}{5} is preserved and the connection may be reconnected using \predref{stomp_connect}{1}.

    \predicate[det]{stomp_reconnect}{1}{+Connection}
Tear down the connection and try to reconnect.

    \predicate[det]{stomp_connect}{1}{+Connection}
\nodescription
    \predicate[det]{stomp_connect}{2}{+Connection, +Options}
Set up the connection. First ensures a socket connection and, if this is new, sends a \verb$CONNECT$ frame. Protocol version and heartbeat negotiation are handled. The \verb$STOMP$ frame is not used, for backward compatibility.

This predicate waits for the connection handshake to complete. Currently it waits for a maximum of 10 seconds after establishing the socket for the server reply.

Calling this on an established connection has no effect.

\begin{tags}
    \tag{Errors}
\verb$stomp_error(connect, Connection, Detail)$ if no connection could be established.
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#CONNECT_or_STOMP_Frame}
\end{tags}

    \predicate[det]{stomp_send}{4}{+Connection, +Destination, +Headers, +Body}
Send a \verb$SEND$ frame. If \verb$content-type$ is not provided, \verb$text/plain$ will be used. \verb$content-length$ will be filled in automatically.

\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#SEND}
\end{tags}

    \predicate[det]{stomp_send_json}{4}{+Connection, +Destination, +Headers, +JSON}
Send a \verb$SEND$ frame.
\arg{JSON} can be either a JSON term or a dict. \verb$content-type$ is filled in automatically as \verb$application/json$ and \verb$content-length$ will be filled in automatically as well.

\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#SEND}
\end{tags}

    \predicate[det]{stomp_subscribe}{4}{+Connection, +Destination, +Id, +Headers}
Send a \verb$SUBSCRIBE$ frame.

\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#SUBSCRIBE}
\end{tags}

    \predicate[det]{stomp_unsubscribe}{2}{+Connection, +Id}
Send an \verb$UNSUBSCRIBE$ frame.

\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#UNSUBSCRIBE}
\end{tags}

    \predicate[det]{stomp_ack}{3}{+Connection, +MessageId, +Headers}
Send an \verb$ACK$ frame. See \predref{stomp_ack}{2} for simply passing the header received with the message we acknowledge.

\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#ACK}
\end{tags}

    \predicate[det]{stomp_nack}{3}{+Connection, +MessageId, +Headers}
Send a \verb$NACK$ frame. See \predref{stomp_nack}{2} for simply passing the header received with the message we negatively acknowledge.

\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#NACK}
\end{tags}

    \predicate[det]{stomp_ack}{2}{+Connection, +MsgHeader}
\nodescription
    \predicate[det]{stomp_nack}{2}{+Connection, +MsgHeader}
Reply with an ACK or NACK based on the received message header. On a STOMP 1.2 message we get an \const{ack} field in the header and reply with an \const{id}. For STOMP 1.1 we reply with the \verb$message-id$ and \const{subscription} that we received with the message.

    \predicate[det]{stomp_begin}{2}{+Connection, +Transaction}
Send a \verb$BEGIN$ frame.

\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#BEGIN}
\end{tags}

    \predicate[det]{stomp_commit}{2}{+Connection, +Transaction}
Send a \verb$COMMIT$ frame.
\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#COMMIT}
\end{tags}

    \predicate[det]{stomp_abort}{2}{+Connection, +Transaction}
Send an \verb$ABORT$ frame.

\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#ABORT}
\end{tags}

    \predicate[semidet]{stomp_transaction}{2}{+Connection, :Goal}
Run \arg{Goal} as \predref{once}{1}, tagging all \verb$SEND$ messages inside the transaction with the transaction id. If \arg{Goal} fails or raises an exception, the transaction is aborted using \predref{stomp_abort}{2}, after which the result is forwarded.

    \predicate[det]{stomp_disconnect}{2}{+Connection, +Headers}
Send a \verb$DISCONNECT$ frame. If the connection has the \const{reconnect} property set to \const{true}, this will be reset to \const{disconnected} to avoid reconnecting. A subsequent \predref{stomp_connect}{2} resets the reconnect status.

\begin{tags}
    \tag{See also}
\url{http://stomp.github.io/stomp-specification-1.2.html\#DISCONNECT}
\end{tags}
\end{description}
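
The predicates documented above might be combined as in the sketch below. It has not been run against a broker and rests on several assumptions: a server listening on \verb$localhost:61613$ that accepts the credentials \verb$guest$/\verb$guest$, a destination named \verb$/queue/demo$, the subscription id \verb$0$ and the \verb$ack$ header value \verb$client$. The names \verb$run_demo/0$ and \verb$on_frame/4$ are hypothetical.

\begin{code}
:- use_module(library(stomp)).

% Register a connection that reconnects when lost, connect,
% subscribe and send a few frames.  Address, credentials and
% destination are assumptions for illustration only.
run_demo :-
    stomp_connection(localhost:61613, '/',
                     _{login: guest, passcode: guest},
                     on_frame, Connection,
                     [ reconnect(true),
                       connect_timeout(60)
                     ]),
    stomp_connect(Connection),
    stomp_subscribe(Connection, '/queue/demo', 0, _{ack: client}),
    stomp_send_json(Connection, '/queue/demo', _{},
                    _{greeting: "hello"}),
    % The two SEND frames below are tagged with one transaction id.
    stomp_transaction(Connection,
                      ( stomp_send(Connection, '/queue/demo', _{}, "one"),
                        stomp_send(Connection, '/queue/demo', _{}, "two")
                      )).

% Callback: called in the receiving thread as
% call(Callback, Command, Connection, Header, Body).
on_frame(message, Connection, Header, Body) :-
    !,
    format("Received: ~p~n", [Body]),
    stomp_ack(Connection, Header).
on_frame(disconnected, _Connection, _Header, _Body) :-
    !,
    format("Connection lost~n").
on_frame(_Command, _Connection, _Header, _Body).
\end{code}

Because the subscription asks for client acknowledgement, the callback replies with \predref{stomp_ack}{2}, passing the header of the received message unchanged.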