/* This file is part of ClioPatria. Author: HTTP: http://e-culture.multimedian.nl/ GITWEB: http://gollem.science.uva.nl/git/ClioPatria.git GIT: git://gollem.science.uva.nl/home/git/ClioPatria.git GIT: http://gollem.science.uva.nl/home/git/ClioPatria.git Copyright: 2007, E-Culture/MultimediaN ClioPatria is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version. ClioPatria is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with ClioPatria. If not, see . */ :- module(http_replay, [ http_replay/2 % +Log, +Options ]). :- use_module(http_cookie). :- use_module(library(debug)). :- use_module(library(time)). :- use_module(library(gensym)). :- use_module(library(aggregate)). :- use_module(library(option)). /** Replay HTTP logfiles to stress-test the server @tbd Provide more concurrency @tbd Manage 'not modified' */ %% http_replay(+LogFile, +Options) is det. % % Replay requests from LogFile. Options include % % * host(Server) % Replay on the indicated server instead of the host named % in the request % % * port(Port) % Port on which to access the server. % % * prefix(Prefix) % Remove prefix from paths. Same as prefix(Prefix, ''). % % * prefix(Old, New) % Replace path prefix Old by New % % * concurrent(Count) % Concurrency level (default: 1) % % * count(+Count) % Process (at most) Count records. http_replay(Log, Options) :- flag(http_processed, _, 0), flag(http_bytes, _, 0), option(count(Count), Options, -1), setup_call_cleanup( start_dispatchers(Options), setup_call_cleanup( open(Log, read, In, [encoding(utf8)]), ( read(In, T0), replay(T0, In, Count) ), close(In)), join_dispatchers). replay(end_of_file, _, _) :- !. replay(_, _, 0) :- !. replay(Term, In, Count0) :- ( dispatch(Term) -> true ; format(user_error, 'FAILED: Replay ~q~n', [Term]) ), read_log(In, Term2), Count1 is Count0 - 1, replay(Term2, In, Count1). read_log(In, Term) :- read_term(In, Term, [syntax_errors(dec10)]). /******************************* * DISPATCHERS * *******************************/ :- dynamic dispatcher/2, % ThreadID, Queue session_on/2, % Session --> ThreadID id_on/2. % RequestID --> ThreadID queue_size(10). dispatch(Term) :- dispatcher_for(Term, Thread, Id, Why), ( Thread == all -> forall(dispatcher(_,Q), thread_send_message(Q, Term)) ; Thread == none -> true ; dispatcher(Thread, Queue), get_time(T0), thread_send_message(Queue, Term), get_time(T1), T is T1-T0, debug(replay, 'Sending ~D to ~w (~w; waited ~3f sec)', [Id, Thread, Why, T]) ). dispatcher_for(quit, all, -, quit) :- !. dispatcher_for(server(_,_), all, -, server) :- !, retractall(session_on(_,_)), retractall(id_on(_,_)). dispatcher_for(request(Id, _Time, Request), Target, Id, session) :- memberchk(session(Session), Request), session_on(Session, Target), !, asserta(id_on(Id, Target)). dispatcher_for(request(Id, _Time, _Request), Target, Id, new) :- !, get_time(T0), State = s(nowait), repeat, aggregate(min(Waiting, Target), waiting(Target, Waiting), min(Waiting, Target)), ( queue_size(Max), Waiting >= Max -> debug(replay_drain, 'All queues are full; waiting', []), sleep(0.01), nb_setarg(1, State, wait), fail ; !, ( arg(1, State, wait) -> get_time(T1), T is T1 - T0, debug(replay, 'Waited ~3f sec for queues to drain', [T]) ; true ) ), asserta(id_on(Id, Target)). dispatcher_for(completed(Id, _TimeUsed, _Bytes, _Code, _Reply), Target, Id, completed) :- retract(id_on(Id, Target)), !. dispatcher_for(_, none, -, none). waiting(Target, Waiting) :- dispatcher(Target, Queue), message_queue_property(Queue, size(Waiting)). start_dispatchers(Options) :- option(concurrent(N), Options, 1), forall(between(1, N, I), ( atom_concat(dispatcher_, I, Id), message_queue_create(Queue, [max_size(10000)]), thread_create(process_event(Queue, Options), _, [alias(Id)]), assertz(dispatcher(Id, Queue)) )). process_event(Queue, Options) :- repeat, thread_get_message(Queue, Message), replay_term(Message, Options), Message == quit, !. join_dispatchers :- dispatch(quit), forall(retract(dispatcher(Id, Queue)), ( thread_join(Id, _), message_queue_destroy(Queue))). replay_term(server(_StartStop, _Time), _Options) :- !, join_all. replay_term(request(Id, _Time, Request), Options) :- !, request(Id, Request, Options). replay_term(completed(Id, _TimeUsed, _Bytes, _Code, Reply), Options) :- !, completed(Id, Reply, Options). replay_term(Term, _Options) :- debug(replay, 'Unknown log term: ~p~n', [Term]). /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Restrictions * If requests are in paralel, run them in parallel - Run request in thread - If `completed', do a join on the thread * Manage session cookies - Use session-id as client-id - Run all - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ %% request(+Id, +Request, +Options) is det. % % Re-sent a request to the server. request(Id, Request, Options) :- memberchk(method(Method), Request), ok_method(Method), !, ( memberchk(session(Session), Request) -> true ; Session = (-) ), url_parts(Request, Parts, Options), request_options(Request, ROptions), thread_create(make_request(Id, Session, Method, Parts, ROptions), TID, []), assert(thread_map(Id, TID)). request(Id, Request, _Options) :- memberchk(method(Method), Request), format(user_error, 'Request ~w using method ~q is not supported~n', [Id, Method]). ok_method(get). ok_method(head). :- thread_local session_map/2, % LogSession, Client thread_map/2. % RequestID, Thread make_request(Id, Session, Method, Parts, Options) :- call_with_time_limit( 300, make_request2(Id, Session, Method, Parts, Options)). make_request2(Id, Session, Method, Parts, Options) :- ( session_map(Session, ClientId) -> IsNew = old ; IsNew = new, gensym(client, ClientId) ), memberchk(path(Path), Parts), debug(replay, 'Request ~w for ~q on ~w client ~w', [Id, Path, IsNew, ClientId]), get_time(Now), open_null_stream(Dest), call_cleanup(http_get(ClientId, Parts, _Reply, [ to(stream(Dest)), method(Method), cert_verify_hook(http_replay:ssl_verify) | Options ]), Reason, done(Path, Reason, Now, Dest)), ( IsNew == new, http_current_cookie(ClientId, swipl_session, Session, _) -> debug(replay, 'Using client ~w on session ~w~n', [ClientId, Session]), assert(session_map(Session, ClientId)) ; true ). ssl_verify(_SSL, _ProblemCertificate, _AllCertificates, _FirstCertificate, _Error). done(Path, Reason, T0, Dest) :- get_time(Now), Time is Now-T0, byte_count(Dest, Count), progress(Count), close(Dest), debug(replay, '~w: (~w) got ~D bytes in ~3f sec', [Path, Reason, Count, Time]). progress(Count) :- flag(http_processed, Requests, Requests+1), flag(http_bytes, Bytes, Bytes+Count), ( Count mod 1000 =:= 0 -> format(user_error, '\rProcessed: ~`.t ~D~25|~t~D~40|', [Requests, Bytes]) ; true ). %% url_parts(+Request, -Parts, +Options) is det. % % Create a new request from the log-entry and Options. url_parts(Request, [ method(Method), host(Host), port(Port), path(Path), scheme(Proto) | Parts ], Options) :- option(scheme(Proto), Options, http), memberchk(method(Method), Request), memberchk(path(Path0), Request), map_path(Path0, Path, Options), ( memberchk(host(Host), Options) -> true ; memberchk(host(Host), Request) ), ( memberchk(port(Port), Options) -> true ; memberchk(port(Port), Request) -> true ; Port = 80 ), more_parts(Request, Parts). more_parts([], []). more_parts([H|T0], [H|T]) :- cp_part(H), !, more_parts(T0, T). more_parts([_|T0], T) :- more_parts(T0, T). cp_part(search(_)). map_path(Path0, Path, Options) :- memberchk(prefix(Prefix), Options), atom_concat(Prefix, Path, Path0), !. map_path(Path0, Path, Options) :- memberchk(prefix(Old, New), Options), atom_concat(Old, Path1, Path0), !, atom_concat(New, Path1, Path). map_path(Path, Path, _). %% request_options(+Request, -Options) is det. % % Extract additional options for the query from the request. % Currently, this extracts possible range-options. Future versions % may also pass the Accept options. request_options(Request, [range(Range)]) :- memberchk(range(Range), Request), !. request_options(_, []). %% completed(+Id, +Reply, +Options) % % Wait for the completion of request Id. completed(Id, _Reply, _Options) :- retract(thread_map(Id, TID)), ( catch(thread_join(TID, State), _, fail) -> debug(replay, 'Request ~d ended: ~w', [Id, State]) ; debug(replay, 'Skipped ~d', [Id]) ). %% join_all is det. % % Join all pending threads. join_all :- current_thread(TID, _State), retract(thread_map(Id, TID)), !, thread_join(TID, State), debug(replay, 'Request ~w ended: ~w', [Id, State]), join_all. join_all.