123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402 |
--[[ Deposit module.

This module takes care of the complete deposit process (except for the back
end storage, which is called here but defined in the repo module).

The deposit process is carried out in several steps:

- SIP generation (`generate_sip()`): scans the laundry list CSV and builds a
  temporary data structure with the found metadata; generates unique IDs for
  resources; infers some implicit relationships from the position of the CSV
  rows and the folder layout; adds system-controlled metadata.
- File staging (`deposit()`): scans through the generated SIP, identifies the
  files, calculates their checksums, and moves them to temporary storage;
  adds checksums to the metadata. TODO allow user-provided metadata and
  validation.
- Graph generation: generates an RDF graph for each resource in the SIP.
- Permanent storage: pushes the RDF graph to the permanent store (via
  functions in the `repo` module), which includes content model validation;
  if this succeeds, related files are also moved from the staging area to the
  archival store.
- Cleanup (optional): if requested, the laundry list and resource folder are
  deleted from their original location.
--]]
- local io = io
- local csv = require "ftcsv"
- local dir = require "pl.dir"
- local file = require "pl.file"
- local libmagic = require "libmagic"
- local path = require "pl.path"
- local pp = require "pl.pretty"
- local term = require "volksdata.term"
- local triple = require "volksdata.triple"
- local graph = require "volksdata.graph"
- local pkar = require "pocket_archive"
- local model = require "pocket_archive.model"
- local mc = require "pocket_archive.monocypher"
- local repo = require "pocket_archive.repo"
- local validator = require "pocket_archive.validator"
- local logger = pkar.logger
- local dbg = require "debugger"
-- "nil" table: safe fallback for missing keys when chaining lookups.
local NT = {}

-- Local path to URI mapping, used for linking between newly created
-- resources. (Re)initialized at the start of each SIP generation run.
local path_to_uri

-- Initialize the libmagic database for MIME type detection.
local magic = libmagic.open(libmagic.MIME_TYPE, libmagic.NO_CHECK_COMPRESS)
assert(magic:load())
-- Character pool for idgen(): digits plus upper- and lowercase ASCII
-- letters. 62 symbols yield ~5.95 bits of entropy per character.
-- (The original comment claimed 60 symbols / 5.9 bits; 10 + 26 + 26 = 62.)
local chpool = {}
for i = 48, 57 do table.insert(chpool, i) end -- 0-9
for i = 65, 90 do table.insert(chpool, i) end -- A-Z
for i = 97, 122 do table.insert(chpool, i) end -- a-z

--[[
Generate a random, reader-friendly ID.

A 16-character ID drawn from the 62-symbol pool above has an entropy of
~95.3 bits, which should be plenty for a medium-sized repository.

@tparam number len Length of the generated ID; defaults to
  `pkar.config.id.len` when omitted.
@treturn string The generated ID.
]]
local function idgen(len)
    local charlist = {}
    for i = 1, (len or pkar.config.id.len) do
        table.insert(charlist, string.char(chpool[math.random(1, #chpool)]))
    end
    return table.concat(charlist)
end
--[[ Build a SIP data structure from a laundry list CSV.

Scans the CSV at `ll_path` row by row: a row with a `source_path` value
starts a new resource (which gets a freshly minted `par:` ID), while a row
without one adds extra multi-valued metadata to the previous resource.
After scanning, membership relationships are inferred from the file system
layout of the source paths, and single-file artifacts get an implicit
file resource appended to the SIP.

Side effect: resets the module-level `path_to_uri` map.

@tparam string ll_path Path to the laundry list CSV file.
@treturn table SIP: an array of resource tables, plus a `root_path` key.
]]
local function generate_sip(ll_path)
    local sip = {root_path = path.dirname(ll_path)}
    path_to_uri = {}

    -- Scratch area for generated thumbnails.
    local tn_dir = path.join(sip.root_path, "proc", "tn")
    dir.makepath(tn_dir)

    local prev_path
    local i = 0  -- Index of the resource currently being assembled.
    for row_n, row in csv.parseLine(ll_path) do
        local has_content
        for k, v in pairs(row) do
            -- Normalize "" to nil.
            if v == "" then row[k] = nil
            else has_content = true end
        end
        -- Skip empty lines.
        if not has_content then goto skip end

        logger:debug("Row path: ", row.source_path or "")
        logger:debug("Parsing row:", pp.write(row))
        if row.source_path then
            -- A path value starts a new resource.
            i = i + 1
            logger:info(
                    ("Processing LL resource #%d at row #%d.")
                    :format(i, row_n))
            prev_path = row.source_path
            sip[i] = {id = "par:" .. idgen()}
            -- Add to path to URI map for later referencing.
            path_to_uri[row.source_path] = sip[i].id
            for k, v in pairs(row) do
                if not v then goto cont1 end -- skip empty values.
                if pkar.config.md.single_values[k] then sip[i][k] = v
                -- Multi-values are ordered in the SIP for further processing.
                else sip[i][k] = {v} end
                ::cont1::
            end
        else
            -- Continuation of values from a previous row.
            if i < 1 then
                error("First row MUST have a path value.", 2)
            elseif not prev_path then
                -- FIX: report the CSV row number, not the resource counter.
                error(("No path information at row %d"):format(row_n), 2)
            else
                for k, v in pairs(row) do
                    if not v then goto cont2 end -- skip empty values.
                    if pkar.config.md.single_values[k] then
                        -- Continuation rows may only extend multi-valued
                        -- fields; repeating a single value is a user error.
                        error(
                                ("On CSV row #%d: field %s is single-valued.")
                                :format(row_n, k))
                    else
                        logger:debug("Value: ", v)
                        logger:debug("Inserting at row ", i - 1)
                        table.insert(sip[i][k], v)
                    end
                    ::cont2::
                end
                row.source_path = prev_path
            end
        end
        ::skip::
        -- NOTE: the original incremented `row_n` here; that was dead code
        -- (the loop variable is reassigned by the iterator each pass).
    end

    -- Infer structure from paths and row ordering.
    for i, v in ipairs(sip) do
        local rmod = model.types[v.content_type]
        -- FIX: replaced a leftover `dbg.assert` (debugger breakpoint) with a
        -- plain assert carrying a diagnostic message.
        assert(v.source_path,
                ("Resource #%d (%s) has no source path."):format(i, v.id))
        local fpath = path.join(sip.root_path, v.source_path)
        v.has_member = v.has_member or {}
        -- Create implicit members from single-file artifact.
        if rmod.types.artifact and path.isfile(fpath) then
            local file_id = "par:" .. idgen()
            -- Insert file resource and move it into a new sub-folder.
            table.insert(sip, {
                content_type = rmod.default_fmodel or "file",
                id = file_id,
                label = path.basename(v.source_path),
                source_path = v.source_path,
            })
            sip[i].has_file = file_id
            sip[i].pref_rep = file_id
            sip[i].source_path = nil
            goto skip
        end
        -- Resources exactly one path level below this one become members.
        for j = i + 1, #sip do
            if sip[j].source_path:match("^" .. pkar.escape_ptn(v.source_path))
            then
                local rel_path = sip[j].source_path:sub(#v.source_path + 2)
                logger:debug("rel_path: " .. rel_path)
                if not rel_path:match("/") then
                    logger:debug(("Adding member %s to %s"):format(
                            rel_path, v.source_path))
                    table.insert(v.has_member, sip[j].id)
                end
            end
        end
        ::skip::
    end

    return sip
end
--[[ Convert a SIP resource table to an in-memory Volksdata graph.

Maps every resource attribute to triples, converting local paths to URIs
via `path_to_uri`, and builds an implicit linked list of "brick" proxies
for the ordered `has_member` values.

@tparam table rsrc SIP resource entry as built by `generate_sip()`.
@treturn graph In-memory graph holding all triples for the resource.
@treturn term Subject IRI term of the resource.
--]]
local function rsrc_to_graph(rsrc)
    local rmod = model.types[rsrc.content_type]
    logger:info("Updating resource md: ", pp.write(rsrc))
    local s = term.new_iriref_ns(rsrc.id)
    local gr = graph.new(nil)
    -- FIX: `it` was an accidental global; keep the iterator local.
    local it = gr:add_init()
    for prop, v in pairs(rsrc) do
        if prop == "id" then goto skip end
        logger:debug(("Adding attribute: %s = %s"):format(prop, pp.write(v)))
        local p = model.id_to_uri[prop]
        if not p then
            logger:warn(
                    ("Term %s has no URI mapped. Assigning `pas:%s`.")
                    :format(prop, prop))
            p = term.new_iriref_ns("pas:" .. prop)
        end
        local pconf = (rmod.properties or NT)[prop] or NT
        local rdf_type_str = pkar.config.md.datatypes[pconf.type]
        local rdf_type
        if rdf_type_str then
            rdf_type = term.new_iriref_ns(rdf_type_str).data
        end
        -- Force all fields to be multi-valued.
        if type(v) ~= "table" then v = {v} end
        -- Convert values to URIs.
        local o
        for i, vv in ipairs(v) do
            if prop == "content_type" then
                o = term.new_iriref_ns(rmod.uri)
            elseif pconf.type == "resource" then
                if not vv:match("^[a-z]*:") then
                    -- Convert local path to URIs.
                    v[i] = assert(path_to_uri[vv]) end
                o = term.new_iriref_ns(v[i])
            elseif pconf.type == "ext_resource" then
                o = term.new_iriref(vv)
            else o = term.new_lit(vv, rdf_type) end
            it:add_iter(triple.new(s, p, o))
        end
        -- Create implicit bricks for "has_member" property.
        if prop == "has_member" then
            local proxy_s
            for i, vv in ipairs(v) do
                -- Add linked list proxies.
                local brick_uri = term.new_iriref_ns("par:" .. idgen())
                if i == 1 then
                    proxy_s = s
                    it:add_iter(triple.new(
                            proxy_s, model.id_to_uri.first, brick_uri))
                    -- Add the "has member" property.
                    -- NOTE(review): this triple is only emitted for the
                    -- first member; confirm that is intended.
                    it:add_iter(triple.new(
                            s,
                            term.new_iriref_ns(pconf.uri),
                            term.new_iriref_ns(vv)))
                else
                    it:add_iter(triple.new(
                            proxy_s, model.id_to_uri.next, brick_uri))
                end
                -- Add basic type triples for the brick.
                for t in pairs(model.types.brick.types) do
                    it:add_iter(triple.new(
                            brick_uri,
                            pkar.RDF_TYPE,
                            model.id_to_uri[t]
                    ))
                end
                it:add_iter(triple.new(
                        brick_uri,
                        model.id_to_uri.content_type,
                        term.new_iriref_ns("pas:Brick")))
                -- Add reference to the member resource.
                it:add_iter(triple.new(
                        brick_uri,
                        term.new_iriref_ns("pas:ref"),
                        term.new_iriref_ns(vv)))
                proxy_s = brick_uri
            end
            goto skip
        end
        ::skip::
    end
    -- Add the full type lineage of the resource model.
    for i, m in ipairs(rmod.lineage) do
        it:add_iter(triple.new(
                s, pkar.RDF_TYPE,
                term.new_iriref_ns(model.types[m].uri)))
    end
    it:add_done()

    return gr, s
end
-- Submission module.
local M = {}

--[[ Deposit a SIP into the repository.

Generates a SIP from the laundry list at `ll_path`; then, for each
resource: stages and hashes its file (if any) into content-addressed
storage, stamps submission timestamps, converts it to a graph, validates
it, and stores it via the repo module. Optionally deletes the submission
sources afterwards.

@tparam string ll_path Path to the laundry list CSV file.
@tparam boolean cleanup If truthy, the laundry list and the top-level
  resource folders are deleted after a successful deposit.
]]
M.deposit = function(ll_path, cleanup)
    local sip = generate_sip(ll_path)
    for i, rsrc in ipairs(sip) do
        -- TODO Wrap this chunk into a txn. Each row is atomic.
        logger:debug(("Processing resource #%d of %d: %s"):format(
                i, #sip, rsrc.id))
        local in_path, fext
        if not rsrc.source_path then goto continue end
        in_path = path.join(sip.root_path, rsrc.source_path)
        fext = path.extension(in_path)
        -- If it's a directory, skip file processing.
        if not path.isfile(in_path) then goto continue end
        do
            local tmp_dir = path.join(pkar.config.fs.ores_path, "tmp/")
            -- `path.extension` already returns the dotted extension; the
            -- original recomputed it with `path.splitext` for no gain.
            local tmp_path = tmp_dir .. rsrc.id .. fext
            dir.makepath(tmp_dir)
            local ifh = assert(io.open(in_path, "r"))
            rsrc.format = {magic:filehandle(ifh)}
            local hash_it = mc.new_blake2b()
            local fsize = 0
            logger:debug("Hashing ", in_path)
            -- Stream the file into the staging area, hashing as we go.
            local ofh = assert(io.open(tmp_path, "w"))
            while true do
                -- FIX: `chunk` was an accidental global.
                local chunk = ifh:read(pkar.config.fs.stream_chunk_size)
                if not chunk then break end
                hash_it:update(chunk)
                ofh:write(chunk)
                fsize = fsize + #chunk
            end
            local checksum = hash_it:final(true)
            rsrc.checksum = {"urn:blake2:" .. checksum}
            rsrc.size = fsize
            ofh:close()
            ifh:close()
            -- Move the staged file to its content-addressed location,
            -- sharded by the first two checksum byte pairs.
            local out_dir = path.join(
                    pkar.config.fs.ores_path,
                    checksum:sub(1, 2),
                    checksum:sub(3, 4))
            local out_path = path.join(out_dir, checksum:sub(1, 32) .. fext)
            dir.makepath(out_dir)
            logger:debug(("Moving file %s to %s"):format(tmp_path, out_path))
            dir.movefile(tmp_path, out_path)
            rsrc.archive_path = out_path
            -- Move thumbnail if existing.
            if rsrc.thumbnail then
                -- FIX: `src_path` was an accidental global.
                local src_path = rsrc.thumbnail
                out_path = path.join(out_dir, path.basename(src_path))
                logger:debug(
                        ("Moving file %s to %s"):format(src_path, out_path))
                dir.movefile(src_path, out_path)
                rsrc.thumbnail = out_path
            end
        end
        ::continue::
        -- FIX: `tstamp` was an accidental global.
        local tstamp = os.date("!%Y-%m-%dT%TZ")
        rsrc.submitted = tstamp
        rsrc.last_modified = tstamp
        local tmp_gr, s = rsrc_to_graph(rsrc)
        local val_report = validator.validate(tmp_gr, s)
        if val_report.max_level == "ERROR" then error(
                "Validation raised errors: " .. pp.write(val_report))
        elseif val_report.max_level == "WARN" then logger:warn(
                "Validation raised warnings: " .. pp.write(val_report))
        elseif val_report.max_level == "NOTICE" then logger:warn(
                "Validation raised notices: " .. pp.write(val_report)) end
        repo.store_updates(tmp_gr, s)
    end

    -- Remove processing directory.
    dir.rmtree(path.join(sip.root_path, "proc"))

    if cleanup then
        -- Gather all top-level directories and delete them.
        -- FIX: `rsrc_paths` was an accidental global.
        local rsrc_paths = {}
        for i, rsrc in ipairs(sip) do
            -- FIX: single-file artifacts have their source_path cleared
            -- during SIP generation; indexing nil would crash here.
            if rsrc.source_path then
                rsrc_paths[rsrc.source_path:match("[^/]+")] = true
            end
        end
        for tlf in pairs(rsrc_paths) do
            local target = path.join(sip.root_path, tlf)
            logger:info("Cleaning up: " .. target)
            if path.isdir(target) then dir.rmtree(target)
            elseif path.isfile(target) then file.delete(target) end
        end
        logger:info("Cleaning up: " .. ll_path)
        file.delete(ll_path)
    end
end

return M
|