--[[ Deposit module.

This module takes care of the complete deposit process (except for the back
end storage, which is called here but defined in the repo module). The
deposit process is carried out in several steps:

- SIP generation (`generate_sip()`): scans the laundry list CSV and builds a
  temporary data structure with the found metadata; generates unique IDs for
  resources; infers some implicit relationships from the position of the CSV
  rows and folder layout; adds system-controlled metadata.
- File staging (`deposit()`): scan through the generated SIP, identifies the
  files, calculates their checksums, and moves them to temporary storage;
  adds checksums to the metadata.
  TODO allow user-provided metadata and validation
- Graph generation: generate an RDF graph for each resource in the SIP.
- Permanent storage: push the RDF graph to permanent store (via functions in
  the `repo` module), which includes content model validation; if this
  succeeds, related files are also moved from the staging area to the
  archival store.
- Cleanup (optional): if requested, the laundry list and resource folder are
  deleted from their original location.
--]]

local io = io

local csv = require "ftcsv"
local dir = require "pl.dir"
local file = require "pl.file"
local libmagic = require "libmagic"
local path = require "pl.path"
local pp = require "pl.pretty"
local term = require "volksdata.term"
local triple = require "volksdata.triple"
local graph = require "volksdata.graph"

local pkar = require "pocket_archive"
local model = require "pocket_archive.model"
local mc = require "pocket_archive.monocypher"
local repo = require "pocket_archive.repo"
local validator = require "pocket_archive.validator"

local logger = pkar.logger
-- NOTE(review): debug-only dependency used in production code paths below
-- (`dbg.assert`); loading this module fails if the debugger rock is absent.
local dbg = require "debugger"

-- "nil" table - for missing key fallback in chaining.
local NT = {}

-- Local path to URI mapping, for linking between newly created resources.
-- Reset on every `generate_sip()` call; read later by `rsrc_to_graph()`.
local path_to_uri

-- Initialize libmagic database (MIME type detection for staged files).
local magic = libmagic.open(libmagic.MIME_TYPE, libmagic.NO_CHECK_COMPRESS )
assert(magic:load())

-- For idgen(). Makes a 62-character pool (0-9, A-Z, a-z) with ~5.95 bits of
-- entropy per character.
local chpool = {}
for i = 48, 57 do table.insert(chpool, i) end  -- 0-9
for i = 65, 90 do table.insert(chpool, i) end  -- A-Z
for i = 97, 122 do table.insert(chpool, i) end  -- a-z

--[[ Generate a random, reader-friendly ID.

A 16-character ID with the above defined #chpool of 62 symbols has an
entropy of ~95 bits, which should be plenty for a medium-sized repository.

@param len (number, optional) ID length; defaults to `pkar.config.id.len`.
@return (string) Generated ID.
]]
local function idgen(len)
    local charlist = {}
    for i = 1, (len or pkar.config.id.len) do
        table.insert(charlist, string.char(chpool[math.random(1, #chpool)]))
    end

    return table.concat(charlist)
end


--[[ Scan the laundry list CSV and build the SIP data structure.

Each CSV row with a `source_path` starts a new resource; rows without one
continue (append multi-values to) the previous resource. After parsing,
structure is inferred from paths and row ordering: single-file artifacts get
an implicit file resource appended to the SIP, and direct children of a
folder become its members.

Side effects: creates the `proc/tn` working directory under the SIP root and
(re)populates the module-level `path_to_uri` map.

@param ll_path (string) Path to the laundry list CSV.
@return (table) SIP: array of resource tables plus a `root_path` field.
--]]
local function generate_sip(ll_path)
    local sip = {root_path = path.dirname(ll_path)}
    path_to_uri = {}

    local tn_dir = path.join(sip.root_path, "proc", "tn")
    dir.makepath(tn_dir)

    local prev_path
    local i = 0
    for row_n, row in csv.parseLine(ll_path) do
        local has_content
        for k, v in pairs(row) do
            -- Change "" to nil.
            if v == "" then row[k] = nil
            else has_content = true end
        end
        -- Skip empty lines.
        if not has_content then goto skip end

        logger:debug("Row path: ", row.source_path or "")
        logger:debug("Parsing row:", pp.write(row))
        if row.source_path then
            i = i + 1
            logger:info(
                    ("Processing LL resource #%d at row #%d.")
                    :format(i, row_n))
            prev_path = row.source_path
            -- New row.
            sip[i] = {id = "par:" .. idgen()}
            -- Add to path to URI map for later referencing.
            path_to_uri[row.source_path] = sip[i].id
            for k, v in pairs(row) do
                if not v then goto cont1 end  -- skip empty strings.
                if pkar.config.md.single_values[k] then sip[i][k] = v
                -- Multi-values are ordered in the SIP for further processing.
                else sip[i][k] = {v} end
                ::cont1::
            end
        else
            -- Continuation of values from a previous row.
            if i < 1 then
                error("First row MUST have a path value.", 2)
            elseif not prev_path then
                -- FIX: report the CSV row number (was the resource index).
                error(("No path information at row %d"):format(row_n), 2)
            else
                for k, v in pairs(row) do
                    if not v then goto cont2 end  -- skip empty strings.
                    if pkar.config.md.single_values[k] then
                        -- It doesn't make much sense to overwrite, maybe
                        -- throw an error?
                        error(
                                ("On CSV row #%d: field %s is single-valued.")
                                :format(row_n, k))
                    else
                        logger:debug("Value: ", v)
                        logger:debug("Inserting at resource #", i)
                        -- FIX: the field may not have appeared on the
                        -- resource's first row; create the value list on
                        -- demand instead of indexing nil.
                        local vals = sip[i][k]
                        if not vals then
                            vals = {}
                            sip[i][k] = vals
                        end
                        table.insert(vals, v)
                    end
                    ::cont2::
                end
                row.source_path = prev_path
            end
        end
        ::skip::
        -- NOTE: a dead `row_n = row_n + 1` was removed here; the loop
        -- variable is reassigned by the iterator on every step.
    end

    -- Infer structure from paths and row ordering.
    for i, v in ipairs(sip) do
        local rmod = model.types[v.content_type]
        dbg.assert(v.source_path)
        local fpath = path.join(sip.root_path, v.source_path)
        -- FIX: fail with a clear message instead of a nil-index error on
        -- `rmod.types` below (guard was previously commented out).
        assert(rmod, ("Unknown content type: %s")
                :format(tostring(v.content_type)))
        v.has_member = v.has_member or {}
        -- Create implicit members from single-file artifact.
        if rmod.types.artifact and path.isfile(fpath) then
            local file_id = "par:" .. idgen()
            -- Insert file resource and move it into a new sub-folder.
            -- NOTE: appending while iterating with ipairs means the new
            -- entry is itself visited later in this loop.
            table.insert(sip, {
                content_type = rmod.default_fmodel or "file",
                id = file_id,
                label = path.basename(v.source_path),
                source_path = v.source_path,
            })
            sip[i].has_file = file_id
            sip[i].pref_rep = file_id
            sip[i].source_path = nil

            goto skip
        end
        for j = i + 1, #sip do
            -- NOTE(review): plain prefix match — "a/b" also matches "a/bc";
            -- confirm whether the pattern should be anchored with a
            -- trailing "/" before changing.
            if sip[j].source_path:match(
                    "^" .. pkar.escape_ptn(v.source_path)) then
                local rel_path = sip[j].source_path:sub(#v.source_path + 2)
                logger:debug("rel_path: " .. rel_path)
                -- Direct children (no further "/" in the relative path)
                -- become members of this resource.
                if not rel_path:match("/") then
                    logger:debug(("Adding member %s to %s"):format(
                            rel_path, v.source_path))
                    table.insert(v.has_member, sip[j].id)
                end
            end
        end
        ::skip::
    end

    return sip
end


--[[ Convert a SIP resource table to an in-memory Volksdata graph.
@param rsrc (table) SIP resource entry as produced by `generate_sip()`.
@return (graph, term) In-memory graph, and the resource's subject IRI term.
--]]
local function rsrc_to_graph(rsrc)
    local rmod = model.types[rsrc.content_type]
    logger:info("Updating resource md: ", pp.write(rsrc))
    local s = term.new_iriref_ns(rsrc.id)
    local gr = graph.new(nil)
    -- FIX: was an accidental global.
    local it = gr:add_init()
    for prop, v in pairs(rsrc) do
        -- `id` is encoded in the subject IRI, not as a property.
        if prop == "id" then goto skip end
        logger:debug(("Adding attribute: %s = %s"):format(prop, pp.write(v)))
        local p = model.id_to_uri[prop]
        -- Unmapped terms get an ad-hoc `pas:` URI.
        if not p then
            logger:warn(
                    ("Term %s has no URI mapped. Assigning `pas:%s`.")
                    :format(prop, prop))
            p = term.new_iriref_ns("pas:" .. prop)
        end
        local pconf = (rmod.properties or NT)[prop] or NT
        local rdf_type_str = pkar.config.md.datatypes[pconf.type]
        local rdf_type
        if rdf_type_str then
            rdf_type = term.new_iriref_ns(rdf_type_str).data
        end
        -- Force all fields to be multi-valued.
        if type(v) ~= "table" then v = {v} end
        -- Convert values to URIs or literals depending on property config.
        local o
        for i, vv in ipairs(v) do
            if prop == "content_type" then
                o = term.new_iriref_ns(rmod.uri)
            elseif pconf.type == "resource" then
                if not vv:match("^[a-z]*:") then
                    -- Convert local path to URIs.
                    v[i] = assert(path_to_uri[vv])
                end
                o = term.new_iriref_ns(v[i])
            elseif pconf.type == "ext_resource" then
                o = term.new_iriref(vv)
            else
                o = term.new_lit(vv, rdf_type)
            end
            it:add_iter(triple.new(s, p, o))
        end
        -- Create implicit bricks (linked-list proxies) for "has_member".
        if prop == "has_member" then
            local proxy_s
            for i, vv in ipairs(v) do
                -- Add linked list proxies.
                local brick_uri = term.new_iriref_ns("par:" .. idgen())
                if i == 1 then
                    proxy_s = s
                    it:add_iter(triple.new(
                            proxy_s, model.id_to_uri.first, brick_uri))
                    -- Add the "has member" property.
                    -- NOTE(review): this direct membership triple is only
                    -- emitted for the first member; later members are
                    -- reachable via `pas:ref` on the bricks — confirm this
                    -- is intended.
                    it:add_iter(triple.new(
                            s, term.new_iriref_ns(pconf.uri),
                            term.new_iriref_ns(vv)))
                else
                    it:add_iter(triple.new(
                            proxy_s, model.id_to_uri.next, brick_uri))
                end
                -- Add basic triples for the brick.
                for t in pairs(model.types.brick.types) do
                    it:add_iter(triple.new(
                        brick_uri, pkar.RDF_TYPE, model.id_to_uri[t]
                    ))
                end
                it:add_iter(triple.new(
                        brick_uri, model.id_to_uri.content_type,
                        term.new_iriref_ns("pas:Brick")))
                -- Add the reference to the member resource.
                it:add_iter(triple.new(
                        brick_uri,
                        term.new_iriref_ns("pas:ref"),
                        term.new_iriref_ns(vv)))
                proxy_s = brick_uri
            end
            goto skip
        end
        ::skip::
    end
    -- Assert all RDF types in the model's lineage.
    for i, m in ipairs(rmod.lineage) do
        it:add_iter(triple.new(
                s, pkar.RDF_TYPE, term.new_iriref_ns(model.types[m].uri)))
    end
    it:add_done()

    return gr, s
end


-- Submission module.
local M = {}


--[[ Deposit a laundry list: stage files, validate, and store.

For each SIP resource: stream-copies its file to temporary storage while
hashing it (BLAKE2b), records format/checksum/size, moves the file into the
checksum-sharded archival layout, builds and validates its RDF graph, and
pushes it to the repo. Validation errors abort with `error()`; warnings and
notices are logged.

@param ll_path (string) Path to the laundry list CSV.
@param cleanup (boolean, optional) If truthy, delete the laundry list and
  the top-level source folders after a successful deposit.
--]]
M.deposit = function(ll_path, cleanup)
    local sip = generate_sip(ll_path)

    for i, rsrc in ipairs(sip) do
        -- TODO Wrap this chunk into a txn. Each row is atomic.
        logger:debug(("Processing resource #%d of %d: %s"):format(
                i, #sip, rsrc.id))
        -- Declared before the first goto so `goto continue` does not jump
        -- into their scope.
        local in_path, fext
        if not rsrc.source_path then goto continue end
        in_path = path.join(sip.root_path, rsrc.source_path)
        fext = path.extension(in_path)
        -- If it's a directory, skip file processing.
        if not path.isfile(in_path) then goto continue end
        do
            local tmp_dir = path.join(pkar.config.fs.ores_path, "tmp/")
            -- FIX: reuse `fext`; a second, redundant extension was
            -- previously computed via path.splitext().
            local tmp_path = tmp_dir .. rsrc.id .. fext
            dir.makepath(tmp_dir)
            local ifh = assert(io.open(in_path, "r"))

            -- Detect MIME type from the open handle.
            rsrc.format = {magic:filehandle(ifh)}

            -- Copy the file to staging while hashing and sizing it.
            local hash_it = mc.new_blake2b()
            local fsize = 0
            logger:debug("Hashing ", in_path)
            local ofh = assert(io.open(tmp_path, "w"))
            while true do
                -- FIX: was an accidental global.
                local chunk = ifh:read(pkar.config.fs.stream_chunk_size)
                if not chunk then break end
                hash_it:update(chunk)
                ofh:write(chunk)
                fsize = fsize + #chunk
            end
            local checksum = hash_it:final(true)
            rsrc.checksum = {"urn:blake2:" .. checksum}
            rsrc.size = fsize

            ofh:close()
            ifh:close()

            -- Move the staged file into the checksum-sharded archive layout:
            -- <ores>/<cc>/<cc>/<checksum[1..32]><ext>.
            local out_dir, out_path
            out_dir = path.join(
                    pkar.config.fs.ores_path,
                    checksum:sub(1, 2),
                    checksum:sub(3, 4))
            out_path = path.join(out_dir, checksum:sub(1, 32) .. fext)
            dir.makepath(out_dir)
            logger:debug(("Moving file %s to %s"):format(tmp_path, out_path))
            dir.movefile(tmp_path, out_path)
            rsrc.archive_path = out_path

            -- Move thumbnail if existing.
            if rsrc.thumbnail then
                -- FIX: was an accidental global.
                local src_path = rsrc.thumbnail
                out_path = path.join(out_dir, path.basename(src_path))
                logger:debug(("Moving file %s to %s"):format(
                        src_path, out_path))
                dir.movefile(src_path, out_path)
                rsrc.thumbnail = out_path
            end
        end
        ::continue::
        -- System-controlled timestamps (UTC, ISO 8601).
        -- FIX: `tstamp` was an accidental global.
        local tstamp = os.date("!%Y-%m-%dT%TZ")
        rsrc.submitted = tstamp
        rsrc.last_modified = tstamp

        local tmp_gr, s = rsrc_to_graph(rsrc)
        local val_report = validator.validate(tmp_gr, s)
        if val_report.max_level == "ERROR" then
            error("Validation raised errors: " .. pp.write(val_report))
        elseif val_report.max_level == "WARN" then
            logger:warn(
                    "Validation raised warnings: " .. pp.write(val_report))
        elseif val_report.max_level == "NOTICE" then
            logger:warn(
                    "Validation raised notices: " .. pp.write(val_report))
        end
        repo.store_updates(tmp_gr, s)
    end
    -- Remove processing directory.
    dir.rmtree(path.join(sip.root_path, "proc"))

    if cleanup then
        -- Gather all top-level directories and delete them.
        -- FIX: `rsrc_paths` was an accidental global.
        local rsrc_paths = {}
        for i, rsrc in ipairs(sip) do
            -- FIX: guard against nil — single-file artifacts have their
            -- `source_path` removed by generate_sip().
            if rsrc.source_path then
                rsrc_paths[rsrc.source_path:match("[^/]+")] = true
            end
        end
        for tlf in pairs(rsrc_paths) do
            local target = path.join(sip.root_path, tlf)
            logger:info("Cleaning up: " .. target)
            if path.isdir(target) then dir.rmtree(target)
            elseif path.isfile(target) then file.delete(target) end
        end
        logger:info("Cleaning up: " .. ll_path)
        file.delete(ll_path)
    end
end

return M