--[[ Deposit module.

This module takes care of the complete deposit process (except for the
back end storage, which is called here but defined in the repo module).

The deposit process is carried out in several steps:

- SIP generation (`generate_sip()`): scans the laundry list CSV and builds a
  temporary data structure with the found metadata; generates unique IDs for
  resources; infers some implicit relationships from the position of the CSV
  rows and the folder layout; adds system-controlled metadata.
- File staging (`deposit()`): scans through the generated SIP, identifies
  the files, calculates their checksums, and moves them to temporary
  storage; adds checksums to the metadata.
  TODO allow user-provided metadata and validation
- Graph generation: generates an RDF graph for each resource in the SIP.
- Permanent storage: pushes the RDF graph to the permanent store (via
  functions in the `repo` module), which includes content model validation;
  if this succeeds, related files are also moved from the staging area to
  the archival store.
- Cleanup (optional): if requested, the laundry list and resource folder are
  deleted from their original location.

A usage sketch is included at the end of this file.
--]]

local io = io

local csv = require "ftcsv"
local dir = require "pl.dir"
local file = require "pl.file"
local libmagic = require "libmagic"
local path = require "pl.path"
local pp = require "pl.pretty"
local term = require "volksdata.term"
local triple = require "volksdata.triple"
local graph = require "volksdata.graph"

local pkar = require "pocket_archive"
local model = require "pocket_archive.model"
local mc = require "pocket_archive.monocypher"
local repo = require "pocket_archive.repo"
local validator = require "pocket_archive.validator"

local logger = pkar.logger

-- "nil" table - for missing key fallback in chaining.
local NT = {}

-- Local path to URI mapping. For linking between newly created resources.
local path_to_uri
-- Track IDs in SIP to validate links created in a submission.
local sip_ids
-- Submission ID and name.
local sub_id, sub_name

-- Initialize libmagic database.
local magic = libmagic.open(libmagic.MIME_TYPE, libmagic.NO_CHECK_COMPRESS)
assert(magic:load())

-- For idgen(). Makes a 62-character pool with ~5.95 bits of entropy per char.
local chpool = {}
for i = 48, 57 do table.insert(chpool, i) end -- 0-9
for i = 65, 90 do table.insert(chpool, i) end -- A-Z
for i = 97, 122 do table.insert(chpool, i) end -- a-z

--[[ Generate a random, reader-friendly ID.

A 16-character ID drawn from the 62-symbol pool defined above has an entropy
of ~95 bits, which should be plenty for a medium-sized repository.
--]]
local function idgen(len)
    local charlist = {}
    for i = 1, (len or pkar.config.id.len) do
        table.insert(charlist, string.char(chpool[math.random(1, #chpool)]))
    end

    return table.concat(charlist)
end


local function generate_sip(ll_path)
    if not path.isfile(ll_path) then
        error(ll_path .. " is not a file.", 2)
    end

    -- Submission ID sticks to all the resources.
    sub_id = "sub:" .. idgen()
    sub_name = ll_path:match("pkar_submission[%-_%.](.*)%.csv")
    local sip = {root_path = path.dirname(ll_path)}
    path_to_uri = {}
    sip_ids = {}

    local tn_dir = path.join(sip.root_path, "proc", "tn")
    dir.makepath(tn_dir)

    local prev_id
    local i = 0
    for row_n, row in csv.parseLine(ll_path) do
        local has_content
        for k, v in pairs(row) do
            -- Change "" to nil.
            if v == "" then row[k] = nil
            else has_content = true end
        end
        -- Skip empty lines.
        if not has_content then goto skip end

        logger:debug("Parsing row:", pp.write(row))
        -- content_type is the only real mandatory entry.
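        -- A row with a content_type value starts a new resource; a row
        -- without one only adds values to multi-valued fields of the
        -- resource opened by the most recent typed row.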
        if row.content_type then
            i = i + 1  -- New row.
            logger:info(
                    ("Processing LL resource #%d at row #%d.")
                    :format(i, row_n))
            sip[i] = {
                -- Normalize provided ID or generate random ID if not provided.
                id = "par:" .. (row.id or idgen()),
                sub_id = sub_id,
            }
            prev_id = row.id
            sip_ids[sip[i].id] = true  -- Add to common SIP ID set.
            for k, v in pairs(row) do
                if not v or k == "id" then goto cont1 end  -- Skip empty strings.
                if pkar.config.md.single_values[k] then sip[i][k] = v
                -- Multi-values are ordered in the SIP for further processing.
                else sip[i][k] = {v} end
                ::cont1::
            end
            -- Add to path-to-URI map for later referencing.
            path_to_uri[row.source_path] = sip[i].id
        else
            -- Continuation of values from a previous row.
            if i < 1 then
                error("First row MUST have a path value.", 2)
            elseif not prev_id then
                error(("No path information at row %d"):format(i), 2)
            else
                for k, v in pairs(row) do
                    if not v then goto cont2 end  -- Skip empty strings.
                    if k == "id" or pkar.config.md.single_values[k] then
                        error(
                                ("On CSV row #%d: field %s is single-valued.")
                                :format(row_n, k))
                    else
                        logger:debug("Value: ", v)
                        logger:debug("Inserting at row ", i - 1)
                        table.insert(sip[i][k], v)
                    end
                    ::cont2::
                end
                row.id = prev_id
            end
        end
        ::skip::
    end

    -- Infer structure from paths and row ordering.
    for i, v in ipairs(sip) do
        local rmod = model.types[v.content_type]
        local fpath = path.join(sip.root_path, v.source_path)
        v.has_member = v.has_member or {}

        -- Create implicit members from a single-file artifact.
        if rmod.types.artifact and path.isfile(fpath) then
            local file_id = "par:" .. idgen()
            sip_ids[file_id] = true
            -- Insert file resource and move it into a new sub-folder.
            table.insert(sip, {
                content_type = rmod.default_fmodel or "file",
                id = file_id,
                sub_id = sub_id,
                label = path.basename(v.source_path),
                source_path = v.source_path,
            })
            sip[i].has_file = file_id
            sip[i].pref_rep = file_id
            sip[i].source_path = nil

            goto skip
        end
        for j = i + 1, #sip do
            if sip[j].source_path:match("^" .. pkar.escape_ptn(v.source_path)) then
                local rel_path = sip[j].source_path:sub(#v.source_path + 2)
                logger:debug("rel_path: " .. rel_path)
                if not rel_path:match("/") then
                    logger:debug(("Adding member %s to %s"):format(
                            rel_path, v.source_path))
                    table.insert(v.has_member, sip[j].id)
                end
            end
        end
        ::skip::
    end
    logger:debug("Parsed SIP: ", pp.write(sip))

    return sip
end


--[[ Convert a SIP resource table to an in-memory Volksdata graph.
--]]
local function rsrc_to_graph(rsrc)
    local rmod = model.types[rsrc.content_type]
    logger:debug("Updating resource md: ", pp.write(rsrc))

    local s = term.new_iriref_ns(rsrc.id)
    local gr = graph.new(nil)
    local it = gr:add_init()
    for prop, v in pairs(rsrc) do
        if prop == "id" then goto skip end
        logger:debug(("Adding attribute: %s = %s"):format(prop, pp.write(v)))
        local p = model.id_to_uri[prop]
        if not p then
            logger:warn(
                    ("Term %s has no URI mapped. Assigning `pas:%s`.")
                    :format(prop, prop))
            p = term.new_iriref_ns("pas:" .. prop)
        end
        local pconf = (rmod.properties or NT)[prop] or NT
        local rdf_type_str = pkar.config.md.datatypes[pconf.type]
        local rdf_type
        if rdf_type_str then rdf_type = term.new_iriref_ns(rdf_type_str).data end
        -- Force all fields to be multi-valued.
        if type(v) ~= "table" then v = {v} end

        -- Convert values to URIs.
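        -- Each value becomes an RDF term: content types map to the model
        -- URI, resource references stay as (or are resolved to) "par:" IRIs,
        -- local paths are resolved through path_to_uri, external resources
        -- become plain IRIs, and anything else becomes a (typed) literal.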
        local o
        for i, vv in ipairs(v) do
            if prop == "content_type" then
                o = term.new_iriref_ns(rmod.uri)
            elseif prop == "sub_id" then
                o = term.new_iriref_ns(vv)
            elseif pconf.type == "resource" then
                -- "par:" could have been added previously.
                local rel_id = "par:" .. vv:gsub("^par:", "")
                if not sip_ids[rel_id] and not repo.gr:contains(triple.new(
                    term.new_iriref_ns(rel_id),
                    pkar.RDF_TYPE,
                    term.new_iriref_ns("pas:Anything")
                )) then
                    -- Convert local path to URI.
                    local uri = path_to_uri[vv]
                    if not uri then
                        error(
                                ("Not a valid path: %s for property: %s on res: %s")
                                :format(vv, prop, rsrc.id))
                    end
                    v[i] = uri
                    logger:debug("Converted path " .. vv .. " to URI: " .. uri)
                else
                    v[i] = rel_id
                end
                o = term.new_iriref_ns(v[i])
            elseif pconf.type == "ext_resource" then
                o = term.new_iriref(vv)
            else
                o = term.new_lit(vv, rdf_type)
            end
            it:add_iter(triple.new(s, p, o))
        end

        -- Create implicit bricks for the "has_member" property.
        if prop == "has_member" then
            local proxy_s
            for i, vv in ipairs(v) do
                -- Add linked list proxies.
                local brick_id = "par:" .. idgen()
                local brick_uri = term.new_iriref_ns(brick_id)
                sip_ids[brick_id] = true
                if i == 1 then
                    proxy_s = s
                    it:add_iter(triple.new(
                            proxy_s, model.id_to_uri.first, brick_uri))
                    -- Add the "has member" property.
                    it:add_iter(triple.new(
                            s, term.new_iriref_ns(pconf.uri),
                            term.new_iriref_ns(vv)))
                else
                    it:add_iter(triple.new(
                            proxy_s, model.id_to_uri.next, brick_uri))
                end
                -- Add basic type triples.
                for t in pairs(model.types.brick.types) do
                    it:add_iter(triple.new(
                            brick_uri, pkar.RDF_TYPE, model.id_to_uri[t]))
                end
                it:add_iter(triple.new(
                        brick_uri, model.id_to_uri.content_type,
                        term.new_iriref_ns("pas:Brick")))
                -- Add the reference to the member.
                it:add_iter(triple.new(
                        brick_uri, term.new_iriref_ns("pas:ref"),
                        term.new_iriref_ns(vv)))
                proxy_s = brick_uri
            end
            goto skip
        end
        ::skip::
    end
    -- Add resource lineage triples.
    for i, m in ipairs(rmod.lineage) do
        it:add_iter(triple.new(
                s, pkar.RDF_TYPE, term.new_iriref_ns(model.types[m].uri)))
    end
    it:add_done()

    return gr, s
end


-- Submission module.
local M = {
    idgen = idgen,

    reset_ores = function()
        if path.isdir(pkar.config.fs.ores_path) then
            logger:warn("Removing existing opaque resource store.")
            dir.rmtree(pkar.config.fs.ores_path)
        end
        dir.makepath(pkar.config.fs.ores_path)
    end,
}

M.deposit = function(ll_path, cleanup)
    local sip = generate_sip(ll_path)
    local tstamp

    for i, rsrc in ipairs(sip) do
        -- TODO Wrap this chunk into a txn. Each row is atomic.
        logger:debug(("Processing resource #%d of %d: %s"):format(
                i, #sip, rsrc.id))
        local in_path, fext
        if not rsrc.source_path then goto continue end
        in_path = path.join(sip.root_path, rsrc.source_path)
        fext = path.extension(in_path)
        -- If it's a directory, skip file processing.
        if not path.isfile(in_path) then goto continue end

        do
            local tmp_dir = path.join(pkar.config.fs.ores_path, "tmp/")
            local _, file_ext = path.splitext(in_path)
            local tmp_path = tmp_dir .. rsrc.id .. file_ext
            dir.makepath(tmp_dir)

            local ifh = assert(io.open(in_path, "r"))
            rsrc.format = {magic:filehandle(ifh)}

            -- Stream the file to the staging area while hashing it.
            local hash_it = mc.new_blake2b()
            local fsize = 0
            logger:debug("Hashing ", in_path)
            local ofh = assert(io.open(tmp_path, "w"))
            while true do
                local chunk = ifh:read(pkar.config.fs.stream_chunk_size)
                if not chunk then break end
                hash_it:update(chunk)
                ofh:write(chunk)
                fsize = fsize + #chunk
            end
            local checksum = hash_it:final(true)
            rsrc.checksum = {"blake2:" .. checksum}
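            -- The checksum drives the content-addressed layout below: files
            -- are filed under <ores_path>/<chars 1-2>/<chars 3-4> of the
            -- checksum and named after its first 32 characters.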
            rsrc.size = fsize

            ofh:close()
            ifh:close()

            -- Move the staged file to its permanent, content-addressed
            -- location.
            local out_dir, out_path
            out_dir = path.join(
                    pkar.config.fs.ores_path,
                    checksum:sub(1, 2),
                    checksum:sub(3, 4))
            out_path = path.join(out_dir, checksum:sub(1, 32) .. fext)
            dir.makepath(out_dir)
            logger:debug(("Moving file %s to %s"):format(tmp_path, out_path))
            dir.movefile(tmp_path, out_path)
            rsrc.archive_path = out_path

            -- Move thumbnail if present.
            if rsrc.thumbnail then
                local src_path = rsrc.thumbnail
                out_path = path.join(out_dir, path.basename(src_path))
                logger:debug(("Moving file %s to %s"):format(src_path, out_path))
                dir.movefile(src_path, out_path)
                rsrc.thumbnail = out_path
            end
        end

        ::continue::
        tstamp = os.date("!%Y-%m-%dT%TZ")
        rsrc.submitted = tstamp
        rsrc.last_modified = tstamp

        local tmp_gr, s = rsrc_to_graph(rsrc)
        local val_report = validator.validate(tmp_gr, s)
        if val_report.max_level == "ERROR" then
            error("Validation raised errors: " .. pp.write(val_report))
        elseif val_report.max_level == "WARN" then
            logger:warn("Validation raised warnings: " .. pp.write(val_report))
        elseif val_report.max_level == "NOTICE" then
            logger:warn("Validation raised notices: " .. pp.write(val_report))
        end
        repo.store_updates(tmp_gr, s)
        logger:info("Stored: ", s.data)
    end

    -- Add triples for submission metadata directly to the stored graph.
    local it = repo.gr:add_init()
    local sub_uri = term.new_iriref_ns(sub_id)
    it:add_iter(triple.new(
            sub_uri, pkar.RDF_TYPE, term.new_iriref_ns("par:Submission")))
    if sub_name then
        it:add_iter(triple.new(
                sub_uri, term.new_iriref_ns("rdfs:label"),
                term.new_lit(sub_name)))
    end
    tstamp = os.date("!%Y-%m-%dT%TZ")
    it:add_iter(triple.new(
            sub_uri, model.id_to_uri.submitted,
            term.new_lit(tstamp, "xsd:dateTime", nil, true)))
    it:add_iter(triple.new(
            sub_uri, model.id_to_uri.last_modified,
            term.new_lit(tstamp, "xsd:dateTime", nil, true)))
    it:add_done()

    -- Remove processing directory.
    local proc_dir = path.join(sip.root_path, "proc")
    if path.isdir(proc_dir) then dir.rmtree(proc_dir) end

    if cleanup then
        -- Gather all top-level directories and delete them.
        local rsrc_paths = {}
        for i, rsrc in ipairs(sip) do
            -- Artifact rows whose file was split into an implicit file
            -- resource have source_path set to nil.
            if rsrc.source_path then
                rsrc_paths[rsrc.source_path:match("[^/]+")] = true
            end
        end
        for tlf in pairs(rsrc_paths) do
            local target = path.join(sip.root_path, tlf)
            logger:info("Cleaning up: " .. target)
            if path.isdir(target) then dir.rmtree(target)
            elseif path.isfile(target) then file.delete(target) end
        end
        logger:info("Cleaning up: " .. ll_path)
        file.delete(ll_path)
    end
end

return M
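
--[[ Usage sketch (for illustration only; the require path and file locations
are assumptions, not part of this module):

    local deposit = require "pocket_archive.deposit"

    -- Stage, validate, and permanently store everything described by the
    -- laundry list CSV; `true` also deletes the source files afterwards.
    deposit.deposit("/mnt/drop/pkar_submission-spring_batch.csv", true)

    -- Wipe and recreate the opaque resource store (destructive).
    deposit.reset_ores()
--]]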