% This LaTeX document was generated using the LaTeX backend of PlDoc, % The SWI-Prolog documentation system \section{library(redis_streams): Using Redis streams} \label{sec:redisstreams} \begin{tags} \tag{See also} \url{https://redis.io/topics/streams-intro} \end{tags} A Redis stream is a set of messages consisting of key-value pairs that are identified by a time and sequence number. Streams are powerful objects that can roughly be used for three purposes: \begin{itemize} \item Maintain and query a log of events, i.e., a \textit{timeline}. \item Provide an alternative to Redis' publish/subscribe API that ensures messages get delivered by all clients even if they are offline at the moment an event is published. \item Distribute messages over a group of clients. This mode assigns messages to clients in a round-robin fashion. Clients confirm a specific message is handled. Living clients can inspect the stream for possibly dead clients and migrate the pending messages to other clients. \end{itemize} This library abstracts the latter two scenarios. The main predicates are \begin{itemize} \item \predref{xadd}{4} to add to a stream \item \predref{xlisten}{3} to read and broadcast messages from a stream \item \predref{xlisten_group}{5} to act as a \textit{consumer} in a consumer group. \end{itemize} \vspace{0.7cm} \begin{description} \predicate{xstream_set}{3}{+Redis, +Key, +Option} Set an option on for \arg{Key} on \arg{Redis}. Currently supports: \begin{description} \termitem{maxlen}{+Count} Make \predref{xadd}{4} add a \verb$MAXLEN ~$ \arg{Count} option to the \verb$XADD$ command, capping the length of the stream. See also \arg{Redis} as a message brokering system (\secref{redis-brokering}) \end{description} \predicate[det]{xadd}{4}{+Redis, +Key, ?Id, +Data:dict} Add a message to a the stream \arg{Key} on \arg{Redis}. The length of the stream can be capped using the \predref{xstream_set}{3} option \verb$maxlen(Count)$. If \arg{Id} is unbound, generating the id is left to the server and \arg{Id} is unified with the returned id. The returned id is a string consisting of the time stamp in milliseconds and a sequence number. See \arg{Redis} docs for details. \predicate{xlisten}{3}{+Redis, +Streams, +Options} Listen using \verb$XREAD$ on one or more \arg{Streams} on the server \arg{Redis}. For each message that arrives, call \predref{broadcast}{1}, where Data is a dict representing the message. \begin{code} broadcast(redis(Redis, Stream, Id, Data)) \end{code} \arg{Options}: \begin{description} \termitem{count}{+Count} Process at most \arg{Count} messages per stream for each request. \termitem{start}{+Start} Normally either \verb$0$ to start get all messages from the epoch or \verb|$| to get messages starting with the last. Default is \verb|$|. \termitem{starts}{+List} May be used as an alternative to the \predref{start}{1} option to specify the start for each stream. This may be used to restart listening if the application remembers the last processed id. \end{description} Note that this predicate does \textbf{not terminate}. It is normally executed in a thread. The following call listens to the streams \const{key1} and \const{key2} on the default \arg{Redis} server. Using \verb$reconnect(true)$, the client will try to re-establish a connection if the collection got lost. \begin{code} ?- redis_connect(default, C, [reconnect(true)]), thread_create(xlisten(C, [key1, key2], [start($)]), _, [detached(true)]). \end{code} \begin{arguments} \arg{Redis} & is either a \arg{Redis} server name (see \predref{redis_server}{3}) or an open connection. If it is a server name, a new connection is opened that is closed if \predref{xlisten}{3} completes. \\ \end{arguments} \begin{tags} \tag{See also} \predref{redis_subscribe}{2} implements the classical pub/sub system of \arg{Redis} that does not have any memory. \end{tags} \predicate{xlisten_group}{5}{+Redis, +Group, +Consumer, +Streams, +Options} Listen as \arg{Consumer} to \arg{Group}. This is similar to \predref{xlisten}{3}, with the following differences: \begin{itemize} \item Instead of using \predref{broadcast}{1}, \predref{broadcast_request}{1} is used and the message is only considered processed if \predref{broadcast_request}{1} succeeds. If the message is handled with success, an \verb$XACK$ is sent to the server. \end{itemize} \arg{Options} processed: \begin{description} \termitem{block}{+Seconds} Causes \verb$XREADGROUP$ to return with timeout when no messages arrive within \arg{Seconds}. On a timeout, \predref{xidle_group}{5} is called which will try to handle messages to other consumers pending longer than \arg{Seconds}. Choosing the time depends on the application. Notably: \begin{itemize} \item Using a time shorter than the required processing time will make the job migrate from consumer to consumer until \verb$max_deliveries(Count)$ is exceeded. Note that the original receiver does not notice that the job is claimed and thus multiple consumers may ultimately answer the message. \item Using a too long time causes an unnecessarily long delay if a node fails. \end{itemize} \termitem{max_deliveries}{+Count} Re-deliver (using \verb$XCLAIM$) a message max \arg{Count} times. Exceeding this calls \predref{xhook}{2}. Default \arg{Count} is \verb$3$. \termitem{max_claim}{+Count} Do not claim more than \arg{Count} messages during a single idle action. Default is \verb$10$. \end{description} \predicate{xconsumer_stop}{1}{+Leave} May be called from a consumer listener to stop the consumer. This predicate throws the exception \verb$redis(stop(Leave))$, which is caught by \predref{xlisten_group}{5}. \predicate[multifile]{xhook}{2}{+Stream, +Event} This multifile predicate is called on certain stream events. Defined events are: \begin{description} \termitem{delivery_failed}{Id, Group, Delivered} A message was delivered more than specified by \predref{max_deliveries}{1} of \predref{xlisten_group}{5}. \arg{Id} is the message id, \arg{Group} the group and \arg{Delivered} the current delivery count. If the hooks fails, the message is acknowledged using \verb$XACK$. From \href{https://redis.io/topics/streams-intro}{introduction to streams}: \begin{quote} "So once the deliveries counter reaches a given large number that you chose, it is probably wiser to put such messages in another stream and send a notification to the system administrator. This is basically the way that Redis streams implement the concept of the dead letter." \end{quote} \end{description} \end{description}