migrator.py

import logging
import shutil

from io import BytesIO
from contextlib import ContextDecorator
from os import path

import lmdb
import requests
import yaml

from rdflib import Graph, URIRef

from lakesuperior.dictionaries.namespaces import ns_collection as nsc
from lakesuperior.env import env
from lakesuperior.globals import AppGlobals
from lakesuperior.config_parser import parse_config
from lakesuperior.store.ldp_rs.lmdb_store import TxnManager


logger = logging.getLogger(__name__)

class StoreWrapper(ContextDecorator):
    '''
    Open and close a store.
    '''
    def __init__(self, store):
        self.store = store

    def __enter__(self):
        self.store.open(
                env.config['application']['store']['ldp_rs'])

    def __exit__(self, *exc):
        self.store.close()
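
# Since StoreWrapper subclasses ContextDecorator, it can wrap either a
# ``with`` block or an entire function. A minimal usage sketch follows;
# ``some_store`` is an illustrative name for an LMDB-backed store instance,
# and ``env.config`` must already be populated:
#
#     with StoreWrapper(some_store):
#         ...  # the store is open inside this block and closed on exit
#
#     @StoreWrapper(some_store)
#     def dump_all():
#         ...  # the store is opened before the call and closed afterwards
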
class Migrator:
    """
    Class to handle a database migration.

    This class holds the state of progress and shared variables as it crawls
    through linked resources in an LDP server.

    Since a repository migration can be a very long operation, and it is
    impossible to know the number of resources to gather by LDP interaction
    alone, a progress ticker outputs the number of processed resources at
    regular intervals.
    """

    """
    LMDB database parameters.

    See :meth:`lmdb.Environment.__init__`
    """
    db_params = {
        'map_size': 1024 ** 4,  # Maximum map size: 1 TiB.
        'metasync': False,      # Do not flush the metadata page on commit.
        'readahead': False,     # Disable OS readahead for large databases.
        'meminit': False,       # Do not zero-initialize write buffers.
    }

  47. """List of predicates to ignore when looking for links."""
  48. ignored_preds = (
  49. nsc['fcrepo'].hasParent,
  50. nsc['fcrepo'].hasTransactionProvider,
  51. )
    def __init__(
            self, src, dest, start_pts, zero_binaries=False,
            compact_uris=False):
        """
        Set up base paths and clean up existing directories.

        :param src: (URIRef) Webroot of the source repository. This must
        correspond to the LDP root node (for Fedora it can be e.g.
        ``http://localhost:8080/fcrepo/rest/``) and is used to determine
        whether retrieved URIs are managed by this repository.
        :param dest: (str) Destination repository path. If the location
        exists, it must be a writable directory; it will be deleted and
        recreated. If it does not exist, it will be created along with its
        parents if missing.
        :param start_pts: (tuple|list) List of starting points to retrieve
        resources from. This would typically be the repository root for a
        full dump, or one or more resources in the repository for a partial
        one.
        :param zero_binaries: (bool) Whether to skip dumping the content of
        binary (LDP-NR) resources.
        :param compact_uris: (bool) NOT IMPLEMENTED. Whether the process
        should attempt to compact URIs generated with broken-up path
        segments. If the UID matches a pattern such as ``/12/34/56/123456...``
        it is converted to ``/123456...``. This would remove a lot of cruft
        caused by the pairtree segments. Note that this will change the
        publicly exposed URIs. If durability is a concern, a rewrite directive
        can be added to the HTTP server that proxies the WSGI endpoint.
        """
        # Set up repo folder structure and copy default configuration to
        # destination file.
        cur_dir = path.dirname(path.dirname(path.abspath(__file__)))
        self.dbpath = '{}/data/ldprs_store'.format(dest)
        self.fpath = '{}/data/ldpnr_store'.format(dest)
        self.config_dir = '{}/etc'.format(dest)

        shutil.rmtree(dest, ignore_errors=True)
        shutil.copytree(
                '{}/etc.defaults'.format(cur_dir), self.config_dir)

        # Modify and overwrite destination configuration.
        orig_config = parse_config(self.config_dir)
        orig_config['application']['store']['ldp_rs']['location'] = self.dbpath
        orig_config['application']['store']['ldp_nr']['path'] = self.fpath
        # This sets a "hidden" configuration property that bypasses all server
        # management on resource load: referential integrity, server-managed
        # triples, etc. This will be removed at the end of the migration.
        orig_config['application']['store']['ldp_rs']['disable_checks'] = True

        with open('{}/application.yml'.format(self.config_dir), 'w') \
                as config_file:
            config_file.write(yaml.dump(orig_config['application']))

        env.config = parse_config(self.config_dir)
        env.app_globals = AppGlobals(env.config)

        with TxnManager(env.app_globals.rdf_store, write=True) as txn:
            env.app_globals.rdfly.bootstrap()
            env.app_globals.rdfly.store.close()
        env.app_globals.nonrdfly.bootstrap()

        self.src = src.rstrip('/')
        self.start_pts = start_pts

        from lakesuperior.api import resource as rsrc_api
        self.rsrc_api = rsrc_api
        print('Environment: {}'.format(env))
        print('Resource API Environment: {}'.format(self.rsrc_api.env))

    def migrate(self):
        """
        Migrate the database.

        This method creates a fully functional and configured LAKEsuperior
        environment contained in a folder from an LDP repository.
        """
        self._ct = 0
        with StoreWrapper(env.app_globals.rdfly.store):
            for start in self.start_pts:
                if not start.startswith('/'):
                    raise ValueError(
                        'Starting point {} does not begin with a slash.'
                        .format(start))

                self._crawl(start)
        self._remove_temp_options()
        logger.info('Dumped {} resources.'.format(self._ct))

        return self._ct

    def _crawl(self, uid):
        """
        Get the contents of a resource and its relationships recursively.

        This method recurses into itself each time a reference to a resource
        managed by the repository is encountered.

        @param uid (string) The path relative to the source server webroot
        pointing to the resource to crawl, effectively the resource UID.
        """
        ibase = str(nsc['fcres'])
        # Public URI of source repo.
        uri = self.src + uid
        # Internal URI of destination.
        iuri = ibase + uid

        rsp = requests.head(uri)
        rsp.raise_for_status()

        # Determine LDP type.
        ldp_type = 'ldp_nr'
        try:
            for link in requests.utils.parse_header_links(
                    rsp.headers.get('link')):
                if (
                        link.get('rel') == 'type'
                        and link.get('url') == str(nsc['ldp'].RDFSource)):
                    # Resource is an LDP-RS.
                    ldp_type = 'ldp_rs'
                    break
        except TypeError:
            raise ValueError('URI {} is not an LDP resource.'.format(uri))

        # Get the whole RDF document now because we have to know all outbound
        # links.
        get_uri = (
                uri if ldp_type == 'ldp_rs' else '{}/fcr:metadata'.format(uri))
        get_req = requests.get(get_uri)
        get_req.raise_for_status()

        data = get_req.content.replace(
                self.src.encode('utf-8'), ibase.encode('utf-8'))
        #logger.debug('Localized data: {}'.format(data.decode('utf-8')))
        gr = Graph(identifier=iuri).parse(data=data, format='turtle')

        # Grab binary and set new resource parameters.
        if ldp_type == 'ldp_nr':
            bin_resp = requests.get('{}/fcr:content'.format(uri))
            bin_resp.raise_for_status()
            data = bin_resp.content
            provided_imr = gr.resource(URIRef(iuri))
            mimetype = bin_resp.headers.get('content-type')

            self.rsrc_api.create_or_replace(
                    uid, mimetype=mimetype, provided_imr=provided_imr,
                    stream=BytesIO(data))
        else:
            mimetype = 'text/turtle'
            # @TODO This can be improved by creating a resource API method for
            # creating a resource from an RDFLib graph. Here we had to
            # deserialize the RDF data to gather information but have to pass
            # the original serialized stream, which has to be deserialized
            # again in the model.
            self.rsrc_api.create_or_replace(
                    uid, mimetype=mimetype, stream=BytesIO(data))

        self._ct += 1
        if self._ct % 10 == 0:
            print('{} resources processed.'.format(self._ct))

        # Now, crawl through outbound links.
        # LDP-NR fcr:metadata must be checked too.
        for pred, obj in gr.predicate_objects():
            uid = obj.replace(ibase, '')
            if (
                    isinstance(obj, URIRef)
                    and obj.startswith(iuri)
                    and not self.rsrc_api.exists(uid)  # Avoid ∞ loop
                    and pred not in self.ignored_preds):
                self._crawl(uid)

    def _remove_temp_options(self):
        """Remove temporary options in configuration."""
        del env.config['application']['store']['ldp_rs']['disable_checks']
        with open('{}/application.yml'.format(self.config_dir), 'w') \
                as config_file:
            config_file.write(yaml.dump(env.config['application']))
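

# A minimal usage sketch. The source URL, destination path and starting
# points below are illustrative assumptions, not values mandated by this
# module; running it requires a reachable LDP (e.g. Fedora) repository and a
# writable destination directory, which will be wiped and recreated.
if __name__ == '__main__':
    migrator = Migrator(
            src='http://localhost:8080/fcrepo/rest',
            dest='/tmp/lsup_migration',
            start_pts=('/',))
    # Crawl from the starting points and report the number of dumped resources.
    print('Migrated {} resources.'.format(migrator.migrate()))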