admin.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. import logging
  2. import click_log
  3. from contextlib import ExitStack
  4. from shutil import rmtree
  5. import lmdb
  6. import requests
  7. from rdflib import Graph, URIRef
  8. from lakesuperior.dictionaries.namespaces import ns_collection as nsc
  9. from lakesuperior.env import env
  10. from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
# Module documentation, assigned explicitly because this statement comes
# after the imports.
__doc__ = '''
Admin API.
This module contains maintenance utilities and stats.
'''
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Module-level reference to the application globals exposed by `env`,
# resolved once at import time.
# NOTE(review): the functions visible in this file read `env.app_globals`
# directly rather than this alias — confirm whether it is used elsewhere.
app_globals = env.app_globals
  17. def stats():
  18. '''
  19. Get repository statistics.
  20. @return dict Store statistics, resource statistics.
  21. '''
  22. repo_stats = {'rsrc_stats': env.app_globals.rdfly.count_rsrc()}
  23. with TxnManager(env.app_globals.rdf_store) as txn:
  24. repo_stats['store_stats'] = env.app_globals.rdf_store.stats()
  25. return repo_stats
  26. @click_log.simple_verbosity_option(logger)
  27. def dump(
  28. src, dest, start=('/',), binary_handling='include',
  29. compact_uris=False):
  30. '''
  31. Dump a whole LDP repository or parts of it to disk.
  32. @param src (rdflib.term.URIRef) Webroot of source repository. This must
  33. correspond to the LDP root node (for Fedora it can be e.g.
  34. `http://localhost:8080fcrepo/rest/`) and is used to determine if URIs
  35. retrieved are managed by this repository.
  36. @param dest (rdflib.URIRef) Base URI of the destination. This can be any
  37. container in a LAKEsuperior server. If the resource exists, it must be an
  38. LDP container. If it does not exist, it will be created.
  39. @param start (tuple|list) List of starting points to retrieve resources
  40. from. It would typically be the repository root in case of a full dump
  41. or one or more resources in the repository for a partial one.
  42. @param binary_handling (string) One of 'include', 'truncate' or 'split'.
  43. @param compact_uris (bool) NOT IMPLEMENTED. Whether the process should
  44. attempt to compact URIs generated with broken up path segments. If the UID
  45. matches a pattern such as `/12/34/56/123456...` it is converted to
  46. `/123456...`. This would remove a lot of cruft caused by the pairtree
  47. segments. Note that this will change the publicly exposed URIs. If
  48. durability is a concern, a rewrite directive can be added to the HTTP
  49. server that proxies the WSGI endpoint.
  50. '''
  51. # 1. Retrieve list of resources.
  52. if not isinstance(start, list) and not isinstance(start, tuple):
  53. start = (start,)
  54. _gather_resources(src, start)
  55. def _gather_resources(webroot, start_pts):
  56. '''
  57. Gather all resources recursively and save them to temporary store.
  58. Resource UIDs (without the repository webroot) are saved as unique keys
  59. in a temporary store.
  60. @param webroot (string) Base URI of the repository.
  61. @param start_pts (tuple|list) Starting points to gather.
  62. '''
  63. dbpath = '/var/tmp/fcrepo_migration_data'
  64. rmtree(dbpath, ignore_errors=True)
  65. with lmdb.open(
  66. dbpath, 1024 ** 4, metasync=False, readahead=False,
  67. meminit=False) as db:
  68. #import pdb; pdb.set_trace()
  69. for start in start_pts:
  70. if not start.startswith('/'):
  71. raise ValueError(
  72. 'Starting point {} does not begin with a slash.'
  73. .format(start))
  74. _gather_refs(db, webroot, start)
  75. @click_log.simple_verbosity_option(logger)
  76. def _gather_refs(db, base, path):
  77. '''
  78. Get the UID of a resource and its relationships recursively.
  79. This method recurses into itself each time a reference to a resource
  80. managed by the repository is encountered.
  81. @param base (string) Base URL of repository. This is used to determine
  82. whether encountered URI terms are repository-managed.
  83. @param base (string) Path, relative to base URL, of the resource to gather.
  84. '''
  85. pfx = base.rstrip('/')
  86. # Public URI of source repo.
  87. uri = pfx + path
  88. # Internal URI of destination.
  89. iuri = uri.replace(pfx, nsc['fcres'])
  90. ibase = base.replace(pfx, nsc['fcres'])
  91. rsp = requests.head(uri)
  92. rsp.raise_for_status()
  93. # Determine LDP type.
  94. ldp_type = 'ldp_nr'
  95. for link in requests.utils.parse_header_links(rsp.headers.get('link')):
  96. if (
  97. link.get('rel') == 'type'
  98. and link.get('url') == str(nsc['ldp'].RDFSource)):
  99. ldp_type = 'ldp_rs'
  100. break
  101. if ldp_type == 'ldp_rs':
  102. # Get the whole RDF document now because we have to know all outbound
  103. # links.
  104. get_uri = uri
  105. else:
  106. get_uri = uri + '/fcr:metadata'
  107. get_req = requests.get(get_uri)
  108. get_req.raise_for_status()
  109. data = get_req.content.replace(base.encode('utf-8'), ibase.encode('utf-8'))
  110. logger.debug('Localized data: {}'.format(data.decode('utf-8')))
  111. gr = Graph(identifier=iuri).parse(data=data, format='turtle')
  112. # First store the resource, so when we recurse, a resource referring back
  113. # to this resource will skip it as already existing and avoid an infinite
  114. # loop.
  115. #
  116. # The RDF data stream inserted is the turtle-serialized bytestring as it
  117. # comes from the request.
  118. with db.begin(write=True) as txn:
  119. with txn.cursor() as cur:
  120. if not cur.set_key(iuri.encode('utf-8')):
  121. cur.put(uri.encode('utf-8'), data)
  122. # Now, crawl through outbound links.
  123. # LDP-NR fcr:metadata must be checked too.
  124. for pred, obj in gr.predicate_objects():
  125. if (
  126. isinstance(obj, URIRef)
  127. and obj.startswith(iuri)
  128. and pred != nsc['fcrepo'].hasParent):
  129. with db.begin() as txn:
  130. with txn.cursor() as cur:
  131. # Avoid ∞
  132. if cur.set_key(obj.encode('utf-8')):
  133. continue
  134. _gather_refs(db, base, obj.replace(ibase, ''))