/* Part of SWISH Author: Jan Wielemaker E-mail: J.Wielemaker@cs.vu.nl WWW: http://www.swi-prolog.org Copyright (C): 2017, VU University Amsterdam CWI Amsterdam All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ :- module(swish_data_source, [ data_source/2, % :Id, +Source data_record/2, % :Id, -Record record/2, % :Id, -Record data_property/2, % :Id, ?Property data_row/2, % :Id, -Row data_row/4, % :Id, +Range, +Header, -Row data_dump/3, % :Id, +Range, -Row data_flush/1, % +Hash 'data assert'/1, % +Term 'data materialized'/3, % +Hash, +Signature, +SourceID 'data failed'/2 % +Hash, +Signature ]). :- use_module(library(error)). :- use_module(library(lists)). :- use_module(library(settings)). :- use_module(library(solution_sequences)). :- use_module(library(pengines)). 
:- setting(max_memory, integer, 8000,
           "Max memory used for cached data store (Mb)").

/** <module> Cached data access

External data is made available by caching it as a Prolog predicate.
The cached facts live in a global data module, so they are maintained
over a SWISH Pengine invocation.
*/

:- meta_predicate
    data_source(:, +),
    data_record(:, -),
    record(:, -),
    data_row(:, -),
    data_row(:, +, +, -),
    data_dump(:, +, -),
    data_property(:, -).

:- multifile
    source/2.                           % +Term, -Goal


                 /*******************************
                 *          ADMIN DATA          *
                 *******************************/

:- dynamic
    data_source_db/3,                   % Hash, Goal, Lock
    data_signature_db/2,                % Hash, Signature
    data_materialized/5,                % Hash, Materialized, SourceID, CPU, Wall
    data_last_access/3.                 % Hash, Time, Updates

%!  'data assert'(+Term) is det.
%
%   Add Term as the last clause of its predicate.

'data assert'(Fact) :-
    assertz(Fact).

%!  'data materialized'(+Hash, +Signature, +SourceVersion) is semidet.
%
%   Called by a data plugin to indicate that loading the data has
%   finished.  Records the signature together with the CPU and wall
%   time elapsed since the start values stored in the global variable
%   `$data_source_materalize`.  Fails if that variable is not set.
%
%   @arg Hash is the hash of the original data source
%   @arg Signature is a term Hash(Arg1, Arg2, ...), where `Arg1`, ...
%   are atoms or small integers that indicate the field names.
%   @arg SourceVersion is a term that indicates the identity of the
%   source.  This is typically a dict containing e.g., a time stamp,
%   content hash, HTTP =Etag= value, etc.

'data materialized'(Hash, Signature, SourceVersion) :-
    nb_current('$data_source_materalize', stats(WallStart, CPUStart)),
    statistics(cputime, CPUNow),
    get_time(WallNow),
    CPUUsed is CPUNow - CPUStart,
    WallUsed is WallNow - WallStart,
    assertz(data_signature_db(Hash, Signature)),
    assertz(data_materialized(Hash, WallNow, SourceVersion,
                              CPUUsed, WallUsed)).

%!  'data failed'(+Hash, +Signature) is det.
%
%   Remove all facts that have the same name and arity as Signature.
%   Hash is not used.

'data failed'(_Hash, Signature) :-
    functor(Signature, Pred, ColCount),
    functor(AnyRow, Pred, ColCount),
    retractall(AnyRow).

%!  data_source(:Id, +Source) is det.
%
%   Create a data source Id from the source definition Source. Source
%   definitions are plugin files loaded from swish(data).
data_source(Module:Id, Source) :-
    variant_sha1(Source, Hash),
    data_source_db(Hash, Source, _Lock),        % source already registered
    !,
    Binding = Module:'$data'(Id, Hash),
    (   clause(Binding, true)
    ->  true
    ;   assertz(Binding)
    ).
data_source(Module:Id, Source) :-
    valid_source(Source),
    variant_sha1(Source, Hash),
    mutex_create(Lock),
    assertz(data_source_db(Hash, Source, Lock)),
    assertz(Module:'$data'(Id, Hash)).

%!  record(:Id, -Record) is nondet.
%!  data_record(:Id, -Record) is nondet.
%
%   True when Record is a dict representing a row in the dataset
%   identified by Id.
%
%   @deprecated record/2 is deprecated.  New code should use
%   data_record/2.

record(Id, Record) :-
    data_record(Id, Record).

data_record(Module:Id, Record) :-
    data_hash(Module:Id, Hash),
    materialize(Hash),
    data_signature_db(Hash, Signature),
    data_record(Signature, Id, Record, RowGoal),
    call(RowGoal).

%   data_record(+Signature, ?Tag, -Record, -Head)
%
%   Record is a dict with tag Tag whose keys are the arguments of
%   Signature.  Head has the name of Signature and, as arguments, the
%   (shared) values of Record.

data_record(Signature, Tag, Record, Head) :-
    Signature =.. [Pred|Columns],
    pairs_keys_values(ColumnValues, Columns, Cells),
    Head =.. [Pred|Cells],
    dict_pairs(Record, Tag, ColumnValues).

%   data_hash(:Id, -Hash)
%
%   Hash is the hash registered for Id in its module.
%
%   @error existence_error(dataset, Id) if Id is not registered.

data_hash(Module:Id, Hash) :-
    (   clause(Module:'$data'(Id, Hash), true)
    ->  true
    ;   existence_error(dataset, Id)
    ).

%!  data_row(:Id, -Row) is nondet.
%!  data_row(:Id, +Range, +Header, -Row) is nondet.
%
%   True when Row is a term Id(Arg, ...), where the first row contains
%   the column names.
%
%   @arg Header If `true`, include a header row.
%   @see data_dump/3 to return a table and for a description of Range.

data_row(Id, Row) :-
    data_row(Id, all, true, Row).

data_row(Module:Id, Range, Header, Row) :-
    must_be(boolean, Header),
    data_hash(Module:Id, Hash),
    materialize(Hash),
    data_signature_db(Hash, Signature),
    Signature =.. [_|Columns],
    same_length(Columns, Cells),
    Fetch =.. [Hash|Cells],
    Row =.. [Id|Cells],
    (   Header == true,
        Cells = Columns                 % header row: the column names
    ;   range(Range, Module:Id, Fetch)
    ).

%   range(+Range, :Id, :Goal)
%
%   Enumerate the solutions of Goal selected by Range.

range(all, _Id, Goal) :-
    !,
    call(Goal).
range(From-To, _Id, Goal) :-
    !,
    Skip is From - 1,
    Count is To - Skip,
    limit(Count, offset(Skip, call(Goal))).
range(Limit, _Id, Goal) :-
    Limit >= 0,
    !,
    limit(Limit, call(Goal)).
range(Limit, Id, Goal) :-
    Limit < 0,
    data_property(Id, rows(Rows)),
    % Clamp at 0: asking for more trailing rows than the table holds
    % must yield all rows; offset/2 expects a non-negative count.
    Skip is max(0, Rows+Limit),
    offset(Skip, call(Goal)).

%!  data_dump(:Id, +Range, -Table) is det.
%
%   Table is a list of rows in the indicated range. This cooperates with
%   the table rendering to produce a data table.  The first element of
%   Table is the header row.  Range is one of:
%
%     - all
%       All rows from the data are included.  Be careful if these
%       are many as it is likely to make your browser very slow.
%     - From-To
%       List the (1-based) rows From to To
%     - Count
%       If Count >= 0, list the _first_, else list the _last_
%       Count rows.  If the table has fewer rows than requested,
%       all rows are listed.

data_dump(Id, Range, Table) :-
    findall(Row, data_row(Id, Range, true, Row), Table).

%!  data_property(:Id, ?Property) is nondet.
%
%   True when Property is a known property about the data source Id.
%   The data source is materialized before properties are retrieved.
%   Defined properties are:
%
%     - columns(-Count)
%       Number of columns in the table.
%     - column_names(-Names)
%       Names is a list of the column names as they appear in the
%       data.
%     - rows(-Rows)
%       Number of rows in the table
%     - hash(-Hash)
%       Get the internal (hashed) identifier for the data source
%     - source_version(-SourceVersion)
%       A term (often a dict) that provides version information
%       about the source.  Details depend on the source.
%     - materialized(-TimeStamp)
%       The data source was materialized at TimeStamp.
%     - source(-Term)
%       Description of the original source term used to declare
%       the data source

data_property(M:Id, Property) :-
    data_hash(M:Id, Hash),
    materialize(Hash),
    property(Property),
    property(Property, Hash).

%   property(?Property)
%
%   Enumerates the templates of all supported properties.

property(columns(_)).
property(column_names(_)).
property(rows(_)).
property(hash(_)).
property(source_version(_)).
property(materialized(_)).
property(source(_)).

%   property(?Property, +Hash)
%
%   Property holds for the materialized data set Hash.

property(columns(Count), Hash) :-
    data_signature_db(Hash, Signature),
    functor(Signature, _, Count).
property(column_names(Names), Hash) :-
    data_signature_db(Hash, Signature),
    Signature =.. [_|Names].
property(rows(Count), Hash) :-
    data_signature_db(Hash, Signature),
    predicate_property(Signature, number_of_clauses(Count)).
property(hash(Hash), Hash).
property(source_version(Version), Hash) :-
    data_materialized(Hash, _, Version, _, _).
property(materialized(Stamp), Hash) :-
    data_materialized(Hash, Stamp, _, _, _).
property(source(Source), Hash) :-
    data_source_db(Hash, Source, _).

%!  swish:goal_expansion(+Dict, -DataGoal)
%
%   Translate a Dict where the tag is the identifier of a data source
%   and the keys are columns of this source into a goal on the data.
%   Note that the data itself is represented as a Prolog predicate,
%   representing each row as a fact and each column as an argument.

:- multifile
    swish:goal_expansion/2.

swish:goal_expansion(Dict, swish_data_source:RowGoal) :-
    is_dict(Dict, Tag),
    prolog_load_context(module, Module),
    clause(Module:'$data'(Tag, Hash), true),
    materialize(Hash),
    data_signature_db(Hash, Signature),
    data_record(Signature, Tag, FullRecord, RowGoal),
    Dict :< FullRecord.


                 /*******************************
                 *        DATA MANAGEMENT       *
                 *******************************/

%   valid_source(+Source)
%
%   True when some source/2 plugin clause accepts Source.
%
%   @error instantiation_error if Source is unbound.
%   @error existence_error(data_source, Source) if no plugin matches.

valid_source(Source) :-
    must_be(nonvar, Source),
    (   source(Source, _Goal)
    ->  true
    ;   existence_error(data_source, Source)
    ).

%!  materialize(+Hash)
%
%   Materialise the data identified by Hash.  The materialization goal
%   should
%
%     - Call 'data assert'/1 using a term Hash(Arg, ...) for each term
%       to add to the database.
%     - Call 'data materialized'(Hash, Signature, SourceVersion) on
%       completion, where `Signature` is a term Hash(ArgName, ...) and
%       `SourceVersion` indicates the version info provided by the
%       source.  Use `-` if this information is not available.
%     - OR call `data failed`(+Hash, +Signature) if materialization
%       fails after some data has been asserted.

materialize(Hash) :-
    must_be(atom, Hash),
    (   data_materialized(Hash, _, _, _, _)
    ->  update_last_access(Hash)
    ;   data_source_db(Hash, Source, Lock),
        update_last_access(Hash),
        gc_data,
        % Run the loader while holding the lock stored with the source.
        with_mutex(Lock, materialize_sync(Hash, Source))
    ).

% Nothing to do if the data is already materialized when we get here.
materialize_sync(Hash, _Source) :-
    data_materialized(Hash, _, _, _, _),
    !.
% Run the plugin goal for Source.  The start time and CPU time are made
% available in the global variable `$data_source_materalize` while the
% goal runs, so 'data materialized'/3 can compute the elapsed times.
% Afterwards the data predicate is declared public.
materialize_sync(Hash, Source) :-
    source(Source, Goal),
    get_time(Time0),
    statistics(cputime, CPU0),
    setup_call_cleanup(
        b_setval('$data_source_materalize', stats(Time0, CPU0)),
        call(Goal, Hash),
        nb_delete('$data_source_materalize')),
    data_signature_db(Hash, Head),
    functor(Head, Name, Arity),
    public(Name/Arity).


                 /*******************************
                 *              GC              *
                 *******************************/

%!  update_last_access(+Hash) is det.
%
%   Update the last known access time.  The value is rounded down to 1
%   minute to reduce database updates.  The access counter is
%   incremented each time the rounded time stamp changes.

update_last_access(Hash) :-
    get_time(Now),
    Rounded is floor(Now/60)*60,
    (   data_last_access(Hash, Rounded, _)
    ->  true                            % already recorded this minute
    ;   clause(data_last_access(Hash, _, C0), true, Old)
    ->  C is C0+1,
        asserta(data_last_access(Hash, Rounded, C)),
        erase(Old)
    ;   asserta(data_last_access(Hash, Rounded, 1))
    ).

%   gc_stats(?Hash, -Stats:dict)
%
%   Stats describes the materialized data set Hash.  The `bytes` key
%   is an estimate computed from the number of clauses and the arity.

gc_stats(Hash, _{ hash:Hash,
                  materialized:When,
                  cpu:CPU,
                  wall:Wall,
                  bytes:Size,
                  last_accessed_ago:Ago,
                  access_frequency:AccessCount
                }) :-
    data_materialized(Hash, When, _From, CPU, Wall),
    data_signature_db(Hash, Signature),
    data_last_access(Hash, Last, AccessCount),
    get_time(Now),
    Ago is floor(Now/60)*60-Last,
    predicate_property(Signature, number_of_clauses(Count)),
    functor(Signature, _, Arity),
    Size is (88+(16*Arity))*Count.

%!  gc_data is det.
%!  gc_data(+MaxSize) is det.
%
%   Remove the least recently accessed data sets until the program
%   size of this module drops below MaxSize (bytes).  The predicate
%   gc_data/0 takes the limit from the setting `max_memory` and is
%   called before materializing a data source.

gc_data :-
    setting(max_memory, MB),
    Bytes is MB*1024*1024,
    gc_data(Bytes),
    set_module(program_space(Bytes)).

gc_data(MaxSize) :-
    module_property(swish_data_source, program_size(Size)),
    Size < MaxSize,
    !.
gc_data(MaxSize) :-
    findall(Stat, gc_stats(_, Stat), Stats),
    sort(last_accessed_ago, >=, Stats, ByTime),   % longest unused first
    % Failure driven loop: flush one data set at a time until we are
    % below the limit.  Note that the hash must be taken from the
    % selected Stat dict; ByTime is a list.
    member(Stat, ByTime),
        data_flush(Stat.hash),
        module_property(swish_data_source, program_size(Size)),
        Size < MaxSize,
    !.
gc_data(_).
%!  data_flush(+Hash) is semidet.
%
%   Drop the data associated with Hash: the cached rows, the
%   signature, the materialization record and the access statistics.
%   Fails if no signature is known for Hash.

data_flush(Hash) :-
    data_signature_db(Hash, Signature),
    data_record(Signature, _Tag, _Dict, RowPattern),
    retractall(RowPattern),
    retractall(data_signature_db(Hash, RowPattern)),
    retractall(data_materialized(Hash, _, _, _, _)),
    retractall(data_last_access(Hash, _, _)).


                 /*******************************
                 *            SANDBOX           *
                 *******************************/

:- multifile
    sandbox:safe_meta/2.

% The data access predicates are safe if the data source identifier
% is safe, see safe_id/1.
sandbox:safe_meta(swish_data_source:Goal, []) :-
    data_goal_id(Goal, Id),
    safe_id(Id).

%   data_goal_id(?Goal, ?Id)
%
%   Goal is a call to one of the sandboxed data access predicates
%   and Id is its data source identifier argument.

data_goal_id(data_source(Id, _),      Id).
data_goal_id(data_record(Id, _),      Id).
data_goal_id(record(Id, _),           Id).
data_goal_id(data_row(Id, _),         Id).
data_goal_id(data_row(Id, _, _, _),   Id).
data_goal_id(data_dump(Id, _, _),     Id).
data_goal_id(data_property(Id, _),    Id).

%   safe_id(+Id)
%
%   A module qualified Id is only safe if the module is the current
%   pengine.  Any other Id is safe.

safe_id(Id) :-
    (   Id = Module:_
    ->  pengine_self(Module)
    ;   true
    ).