:- use_module(library(tipc/tipc)). :- use_module(library(unix)). /* * An example of a connectionless transaction oriented service */ ping :- tipc_socket(S, dgram), tipc_bind(S, name_seq(18888,17,17), scope(zone)), ping_loop(S). ping_loop(S) :- tipc_receive(S, Data, From, [as(codes)]), % format('from: ~w data: ~s~n', [From, Data]), tipc_send(S, Data, From, []), ping_loop(S). test :- tipc_service_exists(name_seq(18888,17,17)), format('sending 1 million pings...~n', []), tipc_socket(S, dgram), tipc_send(S, "now is the time for all good men...aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", name(18888,17,0), []), tipc_receive(S, Data, From, [as(codes)]), % From, will be his port-id. Use it from now on forall(between(1,1000000, _X), ( tipc_send(S, Data, From, []), tipc_receive(S, Data, From, [as(codes)]) )), tipc_close_socket(S). test :- format('you must do a "start_ping", first~n'), !, fail. start_ping :- thread_create(ping, _, [alias(ping_server), detached(true)]). /* * An example of a connection oriented byte stream stream service */ server(S, In, Out) :- tipc_socket(S, stream), tipc_bind(S, name(20000,21,0), scope(zone)), tipc_listen(S, 5), tipc_open_socket(S, Accept, _), call_cleanup(dispatch(Accept, In, Out), tipc_close_socket(S)). dispatch(Accept, In, Out) :- tipc_accept(Accept, Socket, Peer), format('peer: ~w~n', [Peer]), tipc_open_socket(Socket, In, Out), ( loopback(In, Out); writeln('dispatch exiting...')). loopback(In, Out) :- catch(\+at_end_of_stream(In), Error, (writeln(Error), fail)), read_pending_codes(In, Codes, []), format(Out, '~s', [Codes]), flush_output(Out), loopback(In, Out). /* * everything that you write on Out will appear as input on In */ client(S, In,Out) :- tipc_service_exists(name(20000,21,0)), format('everything that you write on Out will appear on In~n', []), tipc_socket(S, stream), tipc_connect(S, name(20000,21,0)), tipc_open_socket(S, In, Out). client(_S, _In, _Out) :- format('you must do a "start_server/0", first~n'), fail. start_server :- thread_create(server(_S, _In, _Out), _, [alias(loopback_server), detached(true)]). /* * An example of a connectionless datagram service using a multi-cast * service topology */ server_mcast(Lower, Upper) :- tipc_socket(S, rdm), tipc_bind(S,name_seq(20001, Lower, Upper), scope(zone)), call_cleanup(forall(repeat, ( tipc_receive(S, Data, From, [as(codes)]), thread_self(Self), format('thread: ~w rcvd: ~s from: ~w~n', [Self, Data, From]) )), tipc_close_socket(S)). start_mcast :- thread_create(server_mcast(0,99), _, [alias(first), detached(true)]), thread_create(server_mcast(100,199), _, [alias(second), detached(true)]), thread_create(server_mcast(200,299), _, [alias(third), detached(true)]), thread_create(server_mcast(300,399), _, [alias(fourth), detached(true)]). /* * Specify which servers are to receive the message by varying Lower * and Upper. */ client_mcast(Lower, Upper) :- tipc_service_exists(name_seq(20001, 0, 400)), tipc_socket(S, rdm), format(codes(Msg), 'message to {~d, ~d, ~d}', [18888, Lower, Upper]), tipc_send(S, Msg, mcast(20001, Lower, Upper), []), tipc_close_socket(S), !. client_mcast( _Lower, _Upper) :- format('you must start the servers via "start_mcast/0", first.~n'), fail. /* * Here's a more complex usage of the subscription service * A collection of connection-oriented byte-stream services is started, * then a client uses the topology server to talk to each one, in turn. */ bind_server(name_seq(Service, Lower, Upper), _Timeout) :- tipc_socket(S, seqpacket), tipc_bind(S, name_seq(Service, Lower, Upper), scope(node)), tipc_get_name(S, PortId), thread_self(Myself), format('~w: ~w~n', [Myself, PortId]), tipc_listen(S, 5), tipc_open_socket(S, AcceptFd, _), tipc_accept(AcceptFd, S1, From), tipc_open_socket(S1, In, Out), format('~w: connected and waiting for data~n', [Myself]), \+at_end_of_stream(In), read_pending_codes(In, Data, []), format('~w: received: "~s" from: ~w~n', [Myself, Data, From]), format(Out, 'goodbye from ~w', [Myself]), close(In), close(Out), tipc_close_socket(S). new_server(_Alias, NameSeq, Timeout) :- thread_create(bind_server(NameSeq, Timeout), _, [ detached(true)]). start_subscription_servers :- new_server(server1, name_seq(20002, 6, 53), 20), new_server(server2, name_seq(20002, 3, 5), 25), new_server(server3, name_seq(20002, 54, 55), 30), new_server(server4, name_seq(20002, 56,60), 35), format('now use "tipc-config -nt=20002" to see the name table~n'). send_greetings(PortId) :- tipc_socket(S1, seqpacket), tipc_connect(S1, PortId), tipc_open_socket(S1, In1, Out1), format(Out1, "hello", []), flush_output(Out1), \+at_end_of_stream(In1), read_pending_codes(In1, Codes, []), format('client rvcd: ~s~n', [Codes]), close(In1), close(Out1). % % greetEm(published(_NameSeq, PortId)) :- send_greetings(PortId). greetEm(X) :- writeln(X). subscribe_demo :- % new hotness tipc_service_exists(name_seq(20002, 0, 100)), tipc_service_port_monitor([name_seq(20002, 0, 100)], greetEm, 0.5), !. subscribe_demo :- format('you must start the servers using "start_subscription_servers/0", first.~n'), fail. /* */ % TIPC quicksort by Rosenwald, from Keysey, from Bratko, (with % apologies to Hoare!) % % A really good example of how NOT to do this sort of thing. It does % however, have the advantage of creating lots of worker % processes, allowing us to observe TIPC's distribution of client % requests among a collection of servers. lesser(X, Y) :- Y @< X. /* * The original algorithm */ quicksort([], []). quicksort([Pivot | More], Sorted) :- partition(lesser(Pivot), More, Smalls, Bigs), quicksort(Smalls, SortedSmalls), quicksort(Bigs, SortedBigs), append(SortedSmalls, [Pivot | SortedBigs], Sorted). /* * This is the same algorithm, decomposed into a multi-server * parallel form. * * CAUTION: This is a toy application that can be broken quite easily. */ portray(term(X)) :- writeq(X), writeln(.). tipc_quicksort([], []) :- !. tipc_quicksort(Data, Sorted) :- % tipc_service_exists(name(20003,23,0)), tipc_socket(S, stream), catch(tipc_connect(S, name(20003,23,0)), Err, (writeln(Err), tipc_close_socket(S), !, fail)), tipc_open_socket(S, In, Out), print(Out, term(Data)), % output a readable term close(Out), read(In, Sorted), close(In), !. tipc_quicksort(_, _) :- format('you must start the servers using "start_quicksort_server/0", first~n'), fail. /* */ start_quicksort_server :- tipc_service_exists(name(20003,23,0)), format('service already exists~n'), !. start_quicksort_server :- forall(between(1,16, _), thread_create(tipc_quicksort_server, _, [detached(true)])), format('do a "tipc-config -nt=20003", and you will see 16 servers bound to name(20003,23,0)~n'). tipc_quicksort_server :- tipc_socket(S, stream), tipc_bind(S, name(20003,23,0), scope(node)), tipc_listen(S, 5), tipc_open_socket(S, AcceptFd, _), call_cleanup(dispatch(AcceptFd), % doesn't exit, except on error tipc_close_socket(S)). dispatch(AcceptFd) :- tipc_accept(AcceptFd, S1, _Peer), tipc_qs_daemon(S1), dispatch(AcceptFd). tipc_qs_daemon(S1) :- tipc_open_socket(S1, In, Out), read(In, [Pivot | Data]), close(In), partition(lesser(Pivot), Data, Smalls, Bigs), tipc_quicksort(Smalls, SortedSmalls), tipc_quicksort(Bigs, SortedBigs), append(SortedSmalls, [Pivot | SortedBigs], Sorted), print(Out, term(Sorted)), close(Out). /* * */ server_half(S) :- tipc_receive(S, Data, From, [as(codes)]), format('server received: ~s from: ~w~n', [Data, From]), sleep(1), tipc_send(S, "goodbye!", From, []), tipc_close_socket(S). child_half(S1) :- tipc_get_name(S1, PortId), tipc_close_socket(S1), tipc_socket(S, rdm), tipc_send(S, "hello", PortId, []), tipc_receive(S, Data, From, [as(codes)]), format('child_received: ~s from: ~w~n', [Data, From]), tipc_close_socket(S), halt. two_way_fork_demo(ExitStatus) :- tipc_socket(S, rdm), fork(Pid), ( (Pid == child) -> child_half(S); (server_half(S), wait(Pid, ExitStatus))). server_half1(S) :- repeat, tipc_receive(S, Data, _From, [as(codes)]), format('server receives: ~s~n', [Data]), Data == "end_of_file", tipc_close_socket(S). child_half1(S) :- tipc_get_name(S, PortId), forall(between(1,2000, X), ( format(codes(Data), 'message ~d', [X]), tipc_send(S, Data, PortId, []) )), tipc_send(S, "end_of_file", PortId, []), tipc_close_socket(S), halt. one_way_fork_demo(ExitStatus) :- tipc_socket(S, dgram), fork(Pid), ( (Pid == child) -> child_half1(S); (process_wait(Pid, ExitStatus), server_half1(S))), !.