--[[ Deposit module.

This module takes care of the complete deposit process (except for the back
end storage, which is called here but defined in the `repo` module).

The deposit process is carried out in several steps:

- SIP generation (`generate_sip()`): scans the laundry list CSV and builds a
  temporary data structure with the found metadata; generates unique IDs for
  resources; infers some implicit relationships from the position of the CSV
  rows and the folder layout; adds system-controlled metadata.
- File staging (`deposit()`): scans through the generated SIP, identifies the
  files, calculates their checksums, and moves them to temporary storage;
  adds the checksums to the metadata. TODO allow user-provided metadata and
  validation
- Graph generation: generates an RDF graph for each resource in the SIP.
- Permanent storage: pushes each RDF graph to the permanent store (via
  functions in the `repo` module), which includes content model validation;
  if this succeeds, related files are also moved from the staging area to the
  archival store.
- Cleanup (optional): if requested, the laundry list and resource folders are
  deleted from their original location.
--]]
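--[[ Example usage (a sketch: assumes this module is loadable as
`pocket_archive.submission` and that a laundry list CSV sits next to the
resource folders it describes):

    local sub = require "pocket_archive.submission"
    sub.deposit("/deposits/batch1/ll.csv", true)  -- deposit, then clean up sources
--]]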
local io = io

local csv = require "ftcsv"
local dir = require "pl.dir"
local file = require "pl.file"
local libmagic = require "libmagic"
local path = require "pl.path"
local pp = require "pl.pretty"
local term = require "volksdata.term"
local triple = require "volksdata.triple"
local graph = require "volksdata.graph"

local pkar = require "pocket_archive"
local model = require "pocket_archive.model"
local mc = require "pocket_archive.monocypher"
local repo = require "pocket_archive.repo"
local validator = require "pocket_archive.validator"

local logger = pkar.logger
local dbg = require "debugger"
-- "nil" table - for missing key fallback in chaining.
local NT = {}

-- Local path to URI mapping. For linking between newly created resources.
local path_to_uri

-- Initialize the libmagic database.
local magic = libmagic.open(libmagic.MIME_TYPE, libmagic.NO_CHECK_COMPRESS)
assert(magic:load())
-- For idgen(). Makes a 62-character pool with ~5.95 bits of entropy per char.
local chpool = {}
for i = 48, 57 do table.insert(chpool, i) end -- 0-9
for i = 65, 90 do table.insert(chpool, i) end -- A-Z
for i = 97, 122 do table.insert(chpool, i) end -- a-z

--[[
Generate a random, reader-friendly ID.

A 16-character ID drawn from the above #chpool of 62 symbols has an entropy
of ~95.3 bits (16 x log2(62)), which should be plenty for a medium-sized
repository.
]]
local function idgen(len)
    local charlist = {}
    for i = 1, (len or pkar.config.id.len) do
        table.insert(charlist, string.char(chpool[math.random(1, #chpool)]))
    end
    return table.concat(charlist)
end
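-- E.g. idgen(8) might return "k7RvQx2M" (illustrative only: output is random).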
local function generate_sip(ll_path)
    local sip = {root_path = path.dirname(ll_path)}
    path_to_uri = {}

    local tn_dir = path.join(sip.root_path, "proc", "tn")
    dir.makepath(tn_dir)

    local prev_path
    local i = 0
    for row_n, row in csv.parseLine(ll_path) do
        local has_content
        for k, v in pairs(row) do
            -- Change "" to nil.
            if v == "" then row[k] = nil
            else has_content = true end
        end
        -- Skip empty lines.
        if not has_content then goto skip end

        logger:debug("Row path: ", row.source_path or "")
        logger:debug("Parsing row:", pp.write(row))
        if row.source_path then
            i = i + 1
            logger:info(
                    ("Processing LL resource #%d at row #%d.")
                    :format(i, row_n))
            prev_path = row.source_path
            -- New row.
            sip[i] = {id = "par:" .. idgen()}
            -- Add to the path-to-URI map for later referencing.
            path_to_uri[row.source_path] = sip[i].id
            for k, v in pairs(row) do
                if not v then goto cont1 end  -- Skip empty values.
                if pkar.config.md.single_values[k] then sip[i][k] = v
                -- Multi-values are ordered in the SIP for further processing.
                else sip[i][k] = {v} end
                ::cont1::
            end
        else
            -- Continuation of values from a previous row.
            if i < 1 then
                error("First row MUST have a path value.", 2)
            elseif not prev_path then
                error(("No path information at row %d"):format(row_n), 2)
            else
                for k, v in pairs(row) do
                    if not v then goto cont2 end  -- Skip empty values.
                    if pkar.config.md.single_values[k] then
                        -- Overwriting a single value from a continuation row
                        -- is an error.
                        error(
                                ("On CSV row #%d: field %s is single-valued.")
                                :format(row_n, k))
                    else
                        logger:debug("Value: ", v)
                        logger:debug("Inserting into resource #", i)
                        table.insert(sip[i][k], v)
                    end
                    ::cont2::
                end
                row.source_path = prev_path
            end
        end
        ::skip::
    end
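    -- E.g. (illustrative paths): a row with source_path "obj1" followed by
    -- rows "obj1/p1.tif" and "obj1/p2.tif" gets those two rows as members;
    -- only direct children (one path level down) are linked.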
    -- Infer structure from paths and row ordering.
    for i, v in ipairs(sip) do
        local rmod = model.types[v.content_type]
        dbg.assert(v.source_path)
        local fpath = path.join(sip.root_path, v.source_path)
        --dbg.assert(rmod)
        v.has_member = v.has_member or {}
        -- Create an implicit file member from a single-file artifact.
        if rmod.types.artifact and path.isfile(fpath) then
            local file_id = "par:" .. idgen()
            -- Insert the file resource and move the path info onto it.
            table.insert(sip, {
                content_type = rmod.default_fmodel or "file",
                id = file_id,
                label = path.basename(v.source_path),
                source_path = v.source_path,
            })
            v.has_file = file_id
            v.pref_rep = file_id
            v.source_path = nil
            goto skip
        end
        for j = i + 1, #sip do
            if sip[j].source_path:match("^" .. pkar.escape_ptn(v.source_path))
            then
                local rel_path = sip[j].source_path:sub(#v.source_path + 2)
                logger:debug("rel_path: " .. rel_path)
                if not rel_path:match("/") then
                    logger:debug(("Adding member %s to %s"):format(
                            rel_path, v.source_path))
                    table.insert(v.has_member, sip[j].id)
                end
            end
        end
        ::skip::
    end

    return sip
end
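-- Illustrative shape of the returned SIP (a sketch; actual keys depend on the
-- laundry list columns and the configured model):
-- sip = {
--     root_path = "/deposits/batch1",
--     [1] = {id = "par:…", content_type = "artifact", has_file = "par:…"},
--     [2] = {id = "par:…", content_type = "file", source_path = "obj1/p1.tif"},
-- }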
--[[ Convert a SIP resource table to an in-memory Volksdata graph.
--]]
local function rsrc_to_graph(rsrc)
    local rmod = model.types[rsrc.content_type]
    logger:info("Updating resource md: ", pp.write(rsrc))
    local s = term.new_iriref_ns(rsrc.id)
    local gr = graph.new(nil)
    local it = gr:add_init()

    for prop, v in pairs(rsrc) do
        if prop == "id" then goto skip end
        logger:debug(("Adding attribute: %s = %s"):format(prop, pp.write(v)))
        local p = model.id_to_uri[prop]
        if not p then
            logger:warn(
                    ("Term %s has no URI mapped. Assigning `pas:%s`.")
                    :format(prop, prop))
            p = term.new_iriref_ns("pas:" .. prop)
        end
        local pconf = (rmod.properties or NT)[prop] or NT
        local rdf_type_str = pkar.config.md.datatypes[pconf.type]
        local rdf_type
        if rdf_type_str then
            rdf_type = term.new_iriref_ns(rdf_type_str).data
        end
        -- Force all fields to be multi-valued.
        if type(v) ~= "table" then v = {v} end
        -- Convert values to URIs.
        local o
        --if prop == "has_member" then dbg() end
        for i, vv in ipairs(v) do
            if prop == "content_type" then
                o = term.new_iriref_ns(rmod.uri)
            elseif pconf.type == "resource" then
                if not vv:match("^[a-z]*:") then
                    -- Convert local paths to URIs.
                    v[i] = assert(path_to_uri[vv])
                end
                o = term.new_iriref_ns(v[i])
            elseif pconf.type == "ext_resource" then
                o = term.new_iriref(vv)
            else o = term.new_lit(vv, rdf_type) end
            it:add_iter(triple.new(s, p, o))
        end
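        -- Ordered members are rendered as a linked list of proxy resources
        -- ("bricks"): <s> first <brick1>; <brick1> ref <member1>; <brick1>
        -- next <brick2>; and so on (predicate names per model.id_to_uri;
        -- the spelled-out triples here are illustrative).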
        -- Create implicit bricks for the "has_member" property.
        if prop == "has_member" then
            local proxy_s
            for i, vv in ipairs(v) do
                -- Add linked list proxies.
                local brick_uri = term.new_iriref_ns("par:" .. idgen())
                if i == 1 then
                    proxy_s = s
                    it:add_iter(triple.new(
                            proxy_s, model.id_to_uri.first, brick_uri))
                    -- Add the "has member" property.
                    it:add_iter(triple.new(
                            s,
                            term.new_iriref_ns(pconf.uri),
                            term.new_iriref_ns(vv)))
                else
                    it:add_iter(triple.new(proxy_s, model.id_to_uri.next, brick_uri))
                end
                -- Add basic type triples for the brick.
                for t in pairs(model.types.brick.types) do
                    it:add_iter(triple.new(
                            brick_uri,
                            pkar.RDF_TYPE,
                            model.id_to_uri[t]))
                end
                it:add_iter(triple.new(
                        brick_uri,
                        model.id_to_uri.content_type,
                        term.new_iriref_ns("pas:Brick")))
                -- Add the reference to the member resource.
                it:add_iter(triple.new(
                        brick_uri,
                        term.new_iriref_ns("pas:ref"),
                        term.new_iriref_ns(vv)))
                proxy_s = brick_uri
            end
            goto skip
        end
        ::skip::
    end
    for _, m in ipairs(rmod.lineage) do
        it:add_iter(triple.new(
                s, pkar.RDF_TYPE,
                term.new_iriref_ns(model.types[m].uri)))
    end
    it:add_done()

    return gr, s
end
-- Submission module.
local M = {}

M.deposit = function(ll_path, cleanup)
    local sip = generate_sip(ll_path)

    for i, rsrc in ipairs(sip) do
        -- TODO Wrap this chunk into a txn. Each row is atomic.
        logger:debug(("Processing resource #%d of %d: %s"):format(
                i, #sip, rsrc.id))
        local in_path, fext
        if not rsrc.source_path then goto continue end

        in_path = path.join(sip.root_path, rsrc.source_path)
        fext = path.extension(in_path)
        -- If it's a directory, skip file processing.
        if not path.isfile(in_path) then goto continue end

        do
            local tmp_dir = path.join(pkar.config.fs.ores_path, "tmp/")
            local tmp_path = tmp_dir .. rsrc.id .. fext
            dir.makepath(tmp_dir)

            local ifh = assert(io.open(in_path, "r"))
            rsrc.format = {magic:filehandle(ifh)}
            -- Rewind, in case the MIME detection advanced the file position.
            ifh:seek("set")

            -- Stream the file to the staging area, hashing it on the way.
            local hash_it = mc.new_blake2b()
            local fsize = 0
            logger:debug("Hashing ", in_path)
            local ofh = assert(io.open(tmp_path, "w"))
            while true do
                local chunk = ifh:read(pkar.config.fs.stream_chunk_size)
                if not chunk then break end
                hash_it:update(chunk)
                ofh:write(chunk)
                fsize = fsize + #chunk
            end
            local checksum = hash_it:final(true)
            rsrc.checksum = {"urn:blake2:" .. checksum}
            rsrc.size = fsize

            ofh:close()
            ifh:close()
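            -- The permanent layout (next) is content-addressed, sharded by
            -- digest prefix: e.g. a digest starting with "ab01" lands under
            -- <ores_path>/ab/01/ (digits illustrative, assuming a hex digest).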
            -- Move the file from the staging area to its permanent location.
            local out_dir, out_path
            out_dir = path.join(
                    pkar.config.fs.ores_path,
                    checksum:sub(1, 2),
                    checksum:sub(3, 4))
            out_path = path.join(out_dir, checksum:sub(1, 32) .. fext)
            dir.makepath(out_dir)
            logger:debug(("Moving file %s to %s"):format(tmp_path, out_path))
            dir.movefile(tmp_path, out_path)
            rsrc.archive_path = out_path

            -- Move the thumbnail, if one exists.
            if rsrc.thumbnail then
                local src_path = rsrc.thumbnail
                out_path = path.join(out_dir, path.basename(src_path))
                logger:debug(("Moving file %s to %s"):format(src_path, out_path))
                dir.movefile(src_path, out_path)
                rsrc.thumbnail = out_path
            end
        end
        ::continue::
        local tstamp = os.date("!%Y-%m-%dT%TZ")
        rsrc.submitted = tstamp
        rsrc.last_modified = tstamp

        local tmp_gr, s = rsrc_to_graph(rsrc)

        local val_report = validator.validate(tmp_gr, s)
        if val_report.max_level == "ERROR" then error(
                "Validation raised errors: " .. pp.write(val_report))
        elseif val_report.max_level == "WARN" then logger:warn(
                "Validation raised warnings: " .. pp.write(val_report))
        elseif val_report.max_level == "NOTICE" then logger:info(
                "Validation raised notices: " .. pp.write(val_report)) end

        repo.store_updates(tmp_gr, s)
    end
    -- Remove the processing directory.
    dir.rmtree(path.join(sip.root_path, "proc"))

    if cleanup then
        -- Gather all top-level directories and delete them.
        local rsrc_paths = {}
        for _, rsrc in ipairs(sip) do
            -- Implicit file resources may have had their source path removed.
            if rsrc.source_path then
                rsrc_paths[rsrc.source_path:match("[^/]+")] = true
            end
        end
        for tlf in pairs(rsrc_paths) do
            local target = path.join(sip.root_path, tlf)
            logger:info("Cleaning up: " .. target)
            if path.isdir(target) then dir.rmtree(target)
            elseif path.isfile(target) then file.delete(target) end
        end
        logger:info("Cleaning up: " .. ll_path)
        file.delete(ll_path)
    end
end
return M