-- submission.lua
--[[ Deposit module.

This module takes care of the complete deposit process (except for the back
end storage, which is called here but defined in the repo module).

The deposit process is carried out in several steps:

- SIP generation (`generate_sip()`): scans the laundry list CSV and builds a
  temporary data structure with the found metadata; generates unique IDs for
  resources; infers some implicit relationships from the position of the CSV
  rows and folder layout; adds system-controlled metadata.
- File staging (`deposit()`): scans through the generated SIP, identifies the
  files, calculates their checksums, and moves them to temporary storage; adds
  checksums to the metadata. TODO allow user-provided metadata and validation
- Graph generation: generates an RDF graph for each resource in the SIP.
- Permanent storage: pushes the RDF graph to the permanent store (via
  functions in the `repo` module), which includes content model validation; if
  this succeeds, related files are also moved from the staging area to the
  archival store.
- Cleanup (optional): if requested, the laundry list and resource folder are
  deleted from their original location.
--]]
-- Localize the io library (faster access, explicit dependency).
local io = io

-- Third-party and project dependencies.
local csv = require "ftcsv"
local dir = require "pl.dir"
local file = require "pl.file"
local libmagic = require "libmagic"
local path = require "pl.path"
local pp = require "pl.pretty"
local term = require "volksdata.term"
local triple = require "volksdata.triple"
local graph = require "volksdata.graph"
local pkar = require "pocket_archive"
local model = require "pocket_archive.model"
local mc = require "pocket_archive.monocypher"
local repo = require "pocket_archive.repo"
local validator = require "pocket_archive.validator"

local logger = pkar.logger

-- "nil" table - for missing key fallback in chaining.
local NT = {}

-- Local path to URI mapping. For linking between newly created resources.
-- Reset on every `generate_sip()` call; module-level submission state.
local path_to_uri
-- Track IDs in SIP to validate links created in a submission.
local sip_ids
-- Submission ID and name. Set by `generate_sip()`, read by `deposit()`.
local sub_id, sub_name

-- Initialize libmagic database (for MIME type sniffing of staged files).
local magic = libmagic.open(libmagic.MIME_TYPE, libmagic.NO_CHECK_COMPRESS )
assert(magic:load())

-- For idgen(). Makes a 62-character pool (10 + 26 + 26 symbols) with
-- ~5.95 bits of entropy per char.
local chpool = {}
for i = 48, 57 do table.insert(chpool, i) end -- 0-9
for i = 65, 90 do table.insert(chpool, i) end -- A-Z
for i = 97, 122 do table.insert(chpool, i) end -- a-z
  52. --[[
  53. Generate a random, reader-friendly ID.
  54. A 16-character ID with the above defined #chpool of 60 smybols has an entropy
  55. of 94.5 bits, which should be plenty for a medium-sized repository.
  56. ]]
  57. local function idgen(len)
  58. local charlist = {}
  59. for i = 1, (len or pkar.config.id.len) do
  60. table.insert(charlist, string.char(chpool[math.random(1, #chpool)]))
  61. end
  62. return table.concat(charlist)
  63. end
--[[ Build a SIP data structure from a laundry list CSV.

Scans the CSV at `ll_path` row by row, groups multi-valued continuation rows
into their parent resource, then infers parent/member structure from the
source paths and row ordering. Sets the module-level `sub_id`, `sub_name`,
`path_to_uri` and `sip_ids` as side effects.

@tparam string ll_path Path to the laundry list CSV file.
@treturn table Array of resource tables, plus a `root_path` field.
@raise If `ll_path` is not a file, or on malformed rows.
]]
local function generate_sip(ll_path)
    if not path.isfile(ll_path) then error(ll_path .. " is not a file.", 2) end
    -- Submission ID sticks to all the resources.
    sub_id = "sub:" .. idgen()
    -- Derive a human-readable name from the file name, e.g.
    -- "pkar_submission-myname.csv" -> "myname". nil if it doesn't match.
    sub_name = ll_path:match("pkar_submission[%-_%.](.*)%.csv")
    local sip = {root_path = path.dirname(ll_path)}
    -- Reset per-submission module state.
    path_to_uri = {}
    sip_ids = {}
    -- Processing area for thumbnails, removed at the end of deposit().
    local tn_dir = path.join(sip.root_path, "proc", "tn")
    dir.makepath(tn_dir)
    local prev_id
    local i = 0
    -- assumes ftcsv.parseLine yields (row number, row table) — TODO confirm
    -- against the installed ftcsv version.
    for row_n, row in csv.parseLine(ll_path) do
        local has_content
        for k, v in pairs(row) do
            -- Change "" to nil.
            if v == "" then row[k] = nil
            else has_content = true end
        end
        -- Skip empty lines.
        if not has_content then goto skip end
        logger:debug("Parsing row:", pp.write(row))
        -- content_type is the only real mandatory entry.
        if row.content_type then
            i = i + 1
            -- New row.
            logger:info(
                    ("Processing LL resource #%d at row #%d.")
                    :format(i, row_n))
            sip[i] = {
                -- Normalize provided ID or generate random ID if not provided.
                id = "par:" .. (row.id or idgen()),
                sub_id = sub_id,
            }
            -- NOTE(review): this stores the *raw* CSV id, which is nil when
            -- the ID was auto-generated above; continuation rows following
            -- such a row will hit the "No path information" error — confirm
            -- whether `sip[i].id` was intended here.
            prev_id = row.id
            sip_ids[sip[i].id] = true -- Add to common sip ID set.
            for k, v in pairs(row) do
                if not v or k == "id" then goto cont1 end -- skip empty strings.
                if pkar.config.md.single_values[k] then sip[i][k] = v
                -- Multi-values are ordered in the SIP for further processing.
                else sip[i][k] = {v} end
                ::cont1::
            end
            -- Add to path to URI map for later referencing.
            path_to_uri[row.source_path] = sip[i].id
        else
            -- Continuation of values from a previous row.
            if i < 1 then
                error("First row MUST have a path value.", 2)
            elseif not prev_id then
                error(("No path information at row %d"):format(i), 2)
            else
                for k, v in pairs(row) do
                    if not v then goto cont2 end -- skip empty strings.
                    if k == "id" or pkar.config.md.single_values[k] then
                        error(
                                ("On CSV row #%d: field %s is single-valued.")
                                :format(row_n, k))
                    else
                        logger:debug("Value: ", v)
                        logger:debug("Inserting at row ", i - 1)
                        -- Append continuation value to the multi-value list.
                        table.insert(sip[i][k], v)
                    end
                    ::cont2::
                end
                row.id = prev_id
            end
        end
        ::skip::
        -- NOTE(review): reassigning a generic-for control variable has no
        -- effect on the iteration in Lua; this line is a no-op.
        row_n = row_n + 1
    end
    -- Infer structure from paths and row ordering.
    for i, v in ipairs(sip) do
        local rmod = model.types[v.content_type]
        --require "debugger".assert(rmod)
        local fpath = path.join(sip.root_path, v.source_path)
        --dbg.assert(rmod)
        v.has_member = v.has_member or {}
        -- Create implicit members from single-file artifact.
        if rmod.types.artifact and path.isfile(fpath) then
            local file_id = "par:" .. idgen()
            sip_ids[file_id] = true
            -- Insert file resource and move it into a new sub-folder.
            table.insert(sip, {
                content_type = rmod.default_fmodel or "file",
                id = file_id,
                sub_id = sub_id,
                label = path.basename(v.source_path),
                source_path = v.source_path,
            })
            sip[i].has_file = file_id
            sip[i].pref_rep = file_id
            -- The artifact's path now belongs to the implicit file resource.
            sip[i].source_path = nil
            goto skip
        end
        -- A later row whose path is directly under this row's path is a
        -- member of this resource (exactly one level deeper, no "/" left
        -- in the relative path).
        for j = i + 1, #sip do
            if sip[j].source_path:match("^" .. pkar.escape_ptn(v.source_path))
            then
                local rel_path = sip[j].source_path:sub(#v.source_path + 2)
                logger:debug("rel_path: " .. rel_path)
                if not rel_path:match("/") then
                    logger:debug(("Adding member %s to %s"):format(
                            rel_path, v.source_path))
                    table.insert(v.has_member, sip[j].id)
                end
            end
        end
        ::skip::
    end
    logger:debug("Parsed SIP: ", pp.write(sip))
    return sip
end
  176. --[[ Convert a SIP resource table to an in-memory Volksdata graph.
  177. --]]
  178. local function rsrc_to_graph(rsrc)
  179. local rmod = model.types[rsrc.content_type]
  180. logger:debug("Updating resource md: ", pp.write(rsrc))
  181. local s = term.new_iriref_ns(rsrc.id)
  182. local gr = graph.new(nil)
  183. it = gr:add_init()
  184. for prop, v in pairs(rsrc) do
  185. if prop == "id" then goto skip end
  186. logger:debug(("Adding attribute: %s = %s"):format(prop, pp.write(v)))
  187. local p = model.id_to_uri[prop]
  188. if not p then
  189. logger:warn(
  190. ("Term %s has no URI mapped. Assigning `pas:%s`.")
  191. :format(prop, prop))
  192. p = term.new_iriref_ns("pas:" .. prop)
  193. end
  194. local pconf = (rmod.properties or NT)[prop] or NT
  195. local rdf_type_str = pkar.config.md.datatypes[pconf.type]
  196. local rdf_type
  197. if rdf_type_str then
  198. rdf_type = term.new_iriref_ns(rdf_type_str).data
  199. end
  200. -- Force all fields to be multi-valued.
  201. if type(v) ~= "table" then v = {v} end
  202. -- Convert values to URIs.
  203. local o
  204. --if prop == "has_member" then dbg() end
  205. for i, vv in ipairs(v) do
  206. if prop == "content_type" then
  207. o = term.new_iriref_ns(rmod.uri)
  208. elseif prop == "sub_id" then
  209. o = term.new_iriref_ns(vv)
  210. elseif pconf.type == "resource" then
  211. -- "par:" could have been added previously.
  212. local rel_id = "par:" .. vv:gsub("^par:", "")
  213. if
  214. not sip_ids[rel_id]
  215. and not repo.gr:contains(triple.new(
  216. term.new_iriref_ns(rel_id),
  217. pkar.RDF_TYPE,
  218. term.new_iriref_ns("pas:Anything")
  219. ))
  220. then
  221. -- Convert local path to URIs.
  222. local uri = path_to_uri[vv]
  223. if not uri then error(
  224. ("Not a valid path: %s for property: %s on res: %s")
  225. :format(vv, prop, rsrc.id))
  226. end
  227. v[i] = uri
  228. logger:debug("Converted path ".. vv .. " to URI: " .. uri)
  229. else v[i] = rel_id
  230. end
  231. --if not v[i]:find("^par:") then dbg() end
  232. o = term.new_iriref_ns(v[i])
  233. elseif pconf.type == "ext_resource" then
  234. o = term.new_iriref(vv)
  235. else o = term.new_lit(vv, rdf_type)
  236. end
  237. it:add_iter(triple.new(s, p, o))
  238. end
  239. -- Create implicit bricks for "has_member" property.
  240. if prop == "has_member" then
  241. local proxy_s
  242. for i, vv in ipairs(v) do
  243. -- Add linked list proxies.
  244. local brick_id = "par:" .. idgen()
  245. local brick_uri = term.new_iriref_ns(brick_id)
  246. sip_ids[brick_id] = true
  247. if i == 1 then
  248. proxy_s = s
  249. it:add_iter(triple.new(
  250. proxy_s, model.id_to_uri.first, brick_uri))
  251. -- Add the "has member" property.
  252. it:add_iter(triple.new(
  253. s,
  254. term.new_iriref_ns(pconf.uri),
  255. term.new_iriref_ns(vv)))
  256. else
  257. it:add_iter(triple.new(proxy_s, model.id_to_uri.next, brick_uri))
  258. end
  259. -- Add the reference.
  260. -- Add basic triples.
  261. for t in pairs(model.types.brick.types) do
  262. it:add_iter(triple.new(
  263. brick_uri,
  264. pkar.RDF_TYPE,
  265. model.id_to_uri[t]
  266. ))
  267. end
  268. it:add_iter(triple.new(
  269. brick_uri,
  270. model.id_to_uri.content_type,
  271. term.new_iriref_ns("pas:Brick")))
  272. -- Add reference.
  273. it:add_iter(triple.new(
  274. brick_uri,
  275. term.new_iriref_ns("pas:ref"),
  276. term.new_iriref_ns(vv)))
  277. proxy_s = brick_uri
  278. end
  279. goto skip
  280. end
  281. ::skip::
  282. end
  283. -- Add resource lineage triples.
  284. for i, m in ipairs(rmod.lineage) do
  285. it:add_iter(triple.new(
  286. s, pkar.RDF_TYPE,
  287. term.new_iriref_ns(model.types[m].uri)))
  288. end
  289. it:add_done()
  290. return gr, s
  291. end
  292. -- Submission module.
  293. local M = {
  294. idgen = idgen,
  295. reset_ores = function()
  296. if path.isdir(pkar.config.fs.ores_path) then
  297. logger:warn("Removing existing opaque resource store.")
  298. dir.rmtree(pkar.config.fs.ores_path)
  299. end
  300. dir.makepath(pkar.config.fs.ores_path)
  301. end,
  302. }
  303. M.deposit = function(ll_path, cleanup)
  304. local sip = generate_sip(ll_path)
  305. local tstamp
  306. for i, rsrc in ipairs(sip) do
  307. -- TODO Wrap this chunk into a txn. Each row is atomic.
  308. logger:debug(("Processing resource #%d of %d: %s"):format(
  309. i, #sip, rsrc.id))
  310. local in_path, fext
  311. if not rsrc.source_path then goto continue end
  312. in_path = path.join(sip.root_path, rsrc.source_path)
  313. fext = path.extension(in_path)
  314. -- If it's a directory, skip file processing.
  315. if not path.isfile(in_path) then goto continue end
  316. do
  317. local tmp_dir = path.join(pkar.config.fs.ores_path, "tmp/")
  318. local file_ext
  319. _, file_ext = path.splitext(in_path)
  320. local tmp_path = tmp_dir .. rsrc.id .. file_ext
  321. dir.makepath(tmp_dir)
  322. local ifh = assert(io.open(in_path, "r"))
  323. rsrc.format = {magic:filehandle(ifh)}
  324. local hash_it = mc.new_blake2b()
  325. local fsize = 0
  326. logger:debug("Hashing ", in_path)
  327. local ofh = assert(io.open(tmp_path, "w"))
  328. while true do
  329. chunk = ifh:read(pkar.config.fs.stream_chunk_size)
  330. if not chunk then break end
  331. hash_it:update(chunk)
  332. ofh:write(chunk)
  333. fsize = fsize + #chunk
  334. end
  335. local checksum = hash_it:final(true)
  336. rsrc.checksum = {"blake2:" .. checksum}
  337. rsrc.size = fsize
  338. ofh:close()
  339. ifh:close()
  340. -- Copy file and calculate checksum.
  341. local out_dir, out_path
  342. out_dir = path.join(
  343. pkar.config.fs.ores_path,
  344. checksum:sub(1, 2),
  345. checksum:sub(3, 4))
  346. out_path = path.join(out_dir, checksum:sub(1,32) .. fext)
  347. dir.makepath(out_dir)
  348. logger:debug(("Moving file %s to %s"):format(tmp_path, out_path))
  349. dir.movefile(tmp_path, out_path)
  350. rsrc.archive_path = out_path
  351. -- Copy thumbnail if existing.
  352. if rsrc.thumbnail then
  353. src_path = rsrc.thumbnail
  354. out_path = path.join(
  355. out_dir, path.basename(src_path))
  356. logger:debug(("Moving file %s to %s"):format(src_path, out_path))
  357. dir.movefile(src_path, out_path)
  358. rsrc.thumbnail = out_path
  359. end
  360. end
  361. ::continue::
  362. tstamp = os.date("!%Y-%m-%dT%TZ")
  363. rsrc.submitted = tstamp
  364. rsrc.last_modified = tstamp
  365. local tmp_gr, s
  366. tmp_gr, s = rsrc_to_graph(rsrc)
  367. local val_report = validator.validate(tmp_gr, s)
  368. if val_report.max_level == "ERROR" then error(
  369. "Validation raised errors: " .. pp.write(val_report))
  370. elseif val_report.max_level == "WARN" then logger:warn(
  371. "Validation raised warnings: " .. pp.write(val_report))
  372. elseif val_report.max_level == "NOTICE" then logger:warn(
  373. "Validation raised notices: " .. pp.write(val_report)) end
  374. repo.store_updates(tmp_gr, s)
  375. logger:info("Stored: ", s.data)
  376. end
  377. -- Add triples for submission metadata directly to the stored graph.
  378. local it = repo.gr:add_init()
  379. local sub_uri = term.new_iriref_ns(sub_id)
  380. it:add_iter(triple.new(
  381. sub_uri,
  382. pkar.RDF_TYPE,
  383. term.new_iriref_ns("par:Submission")
  384. ))
  385. if sub_name then
  386. it:add_iter(triple.new(
  387. sub_uri,
  388. term.new_iriref_ns("rdfs:label"),
  389. term.new_lit(sub_name)
  390. ))
  391. end
  392. tstamp = os.date("!%Y-%m-%dT%TZ")
  393. it:add_iter(triple.new(
  394. sub_uri,
  395. model.id_to_uri.submitted,
  396. term.new_lit(tstamp, "xsd:dateTime", nil, true)
  397. ))
  398. it:add_iter(triple.new(
  399. sub_uri,
  400. model.id_to_uri.last_modified,
  401. term.new_lit(tstamp, "xsd:dateTime", nil, true)
  402. ))
  403. it:add_done()
  404. -- Remove processing directory.
  405. local proc_dir = path.join(sip.root_path, "proc")
  406. if path.isdir(proc_dir) then dir.rmtree(proc_dir) end
  407. if cleanup then
  408. -- Gather all top-level directories and delete them.
  409. rsrc_paths = {}
  410. for i, rsrc in ipairs(sip) do
  411. rsrc_paths[rsrc.source_path:match("[^/]+")] = true
  412. end
  413. for tlf in pairs(rsrc_paths) do
  414. local target = path.join(sip.root_path, tlf)
  415. logger:info("Cleaning up: " .. target)
  416. if path.isdir(target) then dir.rmtree(target)
  417. elseif path.isfile(target) then file.delete(target) end
  418. end
  419. logger:info("Cleaning up: " .. ll_path)
  420. file.delete(ll_path)
  421. end
  422. end
  423. return M