  1. import logging
  2. import os
  3. import click_log
  4. from contextlib import ExitStack
  5. from shutil import rmtree
  6. import lmdb
  7. import requests
  8. from rdflib import Graph, URIRef
  9. import lakesuperior.env_setup
  10. from lakesuperior.dictionaries.namespaces import ns_collection as nsc
  11. from lakesuperior.env import env
  12. from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
  13. from lakesuperior.store.ldp_nr.default_layout import DefaultLayout as FileLayout
__doc__ = '''
Admin API.
This module contains maintenance utilities and stats.
'''

logger = logging.getLogger(__name__)

# Shortcut to the application-wide globals kept on the process environment.
app_globals = env.app_globals

# Server-managed predicates that the dump crawler (`_gather_refs`) must not
# follow when traversing outbound references.
_ignore_list = (
    nsc['fcrepo'].hasParent,
    nsc['fcrepo'].hasTransactionProvider,
)
  24. def stats():
  25. '''
  26. Get repository statistics.
  27. @return dict Store statistics, resource statistics.
  28. '''
  29. repo_stats = {'rsrc_stats': env.app_globals.rdfly.count_rsrc()}
  30. with TxnManager(env.app_globals.rdf_store) as txn:
  31. repo_stats['store_stats'] = env.app_globals.rdf_store.stats()
  32. return repo_stats
  33. def dump(
  34. src, dest, start=('/',), binary_handling='include',
  35. compact_uris=False):
  36. '''
  37. Dump a whole LDP repository or parts of it to disk.
  38. @param src (rdflib.term.URIRef) Webroot of source repository. This must
  39. correspond to the LDP root node (for Fedora it can be e.g.
  40. `http://localhost:8080fcrepo/rest/`) and is used to determine if URIs
  41. retrieved are managed by this repository.
  42. @param dest (str) Local path of the destination. If the location exists it
  43. must be a writable directory. It will be deleted and recreated. If it does
  44. not exist, it will be created along with its parents if missing.
  45. @param start (tuple|list) List of starting points to retrieve resources
  46. from. It would typically be the repository root in case of a full dump
  47. or one or more resources in the repository for a partial one.
  48. @param binary_handling (string) One of 'include', 'truncate' or 'split'.
  49. @param compact_uris (bool) NOT IMPLEMENTED. Whether the process should
  50. attempt to compact URIs generated with broken up path segments. If the UID
  51. matches a pattern such as `/12/34/56/123456...` it is converted to
  52. `/123456...`. This would remove a lot of cruft caused by the pairtree
  53. segments. Note that this will change the publicly exposed URIs. If
  54. durability is a concern, a rewrite directive can be added to the HTTP
  55. server that proxies the WSGI endpoint.
  56. '''
  57. # 1. Retrieve list of resources.
  58. start_pts = (
  59. (start,)
  60. if not isinstance(start, list) and not isinstance(start, tuple)
  61. else start)
  62. dbpath = '{}/ldprs_store'.format(dest)
  63. rmtree(dbpath, ignore_errors=True)
  64. os.makedirs(dbpath)
  65. fpath = '{}/ldpnr_store'.format(dest)
  66. rmtree(fpath, ignore_errors=True)
  67. os.makedirs(fpath)
  68. with lmdb.open(
  69. dbpath, 1024 ** 4, metasync=False, readahead=False,
  70. meminit=False) as db:
  71. for start in start_pts:
  72. if not start.startswith('/'):
  73. raise ValueError(
  74. 'Starting point {} does not begin with a slash.'
  75. .format(start))
  76. _gather_refs(db, src, start, dest)
  77. entries = db.stat()['entries']
  78. logger.info('Dumped {} resources.'.format(entries))
  79. return entries
  80. def _gather_refs(db, base, path, dest):
  81. '''
  82. Get the UID of a resource and its relationships recursively.
  83. This method recurses into itself each time a reference to a resource
  84. managed by the repository is encountered.
  85. @param base (string) Base URL of repository. This is used to determine
  86. whether encountered URI terms are repository-managed.
  87. @param path (string) Path, relative to base URL, of the resource to gather.
  88. @param dest (string) Local path for RDF database and non-RDF files.
  89. '''
  90. pfx = base.rstrip('/')
  91. # Public URI of source repo.
  92. uri = pfx + path
  93. # Internal URI of destination.
  94. iuri = URIRef(uri.replace(pfx, nsc['fcres']))
  95. ibase = base.replace(pfx, nsc['fcres'])
  96. rsp = requests.head(uri)
  97. rsp.raise_for_status()
  98. # Determine LDP type.
  99. ldp_type = 'ldp_nr'
  100. for link in requests.utils.parse_header_links(rsp.headers.get('link')):
  101. if (
  102. link.get('rel') == 'type'
  103. and link.get('url') == str(nsc['ldp'].RDFSource)):
  104. ldp_type = 'ldp_rs'
  105. break
  106. # Get the whole RDF document now because we have to know all outbound
  107. # links.
  108. get_uri = uri if ldp_type == 'ldp_rs' else '{}/fcr:metadata'.format(uri)
  109. get_req = requests.get(get_uri)
  110. get_req.raise_for_status()
  111. data = get_req.content.replace(base.encode('utf-8'), ibase.encode('utf-8'))
  112. logger.debug('Localized data: {}'.format(data.decode('utf-8')))
  113. gr = Graph(identifier=iuri).parse(data=data, format='turtle')
  114. # First store the resource, so when we recurse, a resource referring back
  115. # to this resource will skip it as already existing and avoid an infinite
  116. # loop.
  117. #
  118. # The RDF data stream inserted is the turtle-serialized bytestring as it
  119. # comes from the request.
  120. with db.begin(write=True) as txn:
  121. with txn.cursor() as cur:
  122. if not cur.set_key(iuri.encode('utf-8')):
  123. cur.put(uri.encode('utf-8'), data)
  124. # Grab binary.
  125. if ldp_type == 'ldp_nr':
  126. bin_resp = requests.get('{}/fcr:content'.format(uri))
  127. bin_resp.raise_for_status()
  128. # @FIXME Use a more robust checking mechanism. Maybe offer the option
  129. # to verify the content checksum.
  130. cnt_hash = gr.value(iuri, nsc['premis'].hasMessageDigest).replace(
  131. 'urn:sha1:', '')
  132. fpath = FileLayout.local_path('{}/ldpnr_store'.format(dest), cnt_hash)
  133. os.makedirs(os.path.dirname(fpath), exist_ok=True)
  134. with open(fpath, 'wb') as f:
  135. f.write(bin_resp.content)
  136. # Now, crawl through outbound links.
  137. # LDP-NR fcr:metadata must be checked too.
  138. for pred, obj in gr.predicate_objects():
  139. if (
  140. isinstance(obj, URIRef)
  141. and obj.startswith(iuri)
  142. and pred not in _ignore_list):
  143. with db.begin() as txn:
  144. with txn.cursor() as cur:
  145. # Avoid ∞
  146. if cur.set_key(obj.encode('utf-8')):
  147. continue
  148. _gather_refs(db, base, obj.replace(ibase, ''), dest)