# migrator.py
import logging
import shutil

from contextlib import ContextDecorator
from os import makedirs, path
from urllib.parse import urldefrag

import requests
import yaml

from rdflib import Graph, URIRef

from lakesuperior import env, basedir
from lakesuperior.dictionaries.namespaces import ns_collection as nsc
from lakesuperior.exceptions import InvalidResourceError
from lakesuperior.globals import AppGlobals, ROOT_UID
from lakesuperior.config_parser import parse_config
from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
  16. class StoreWrapper(ContextDecorator):
  17. """
  18. Open and close a store.
  19. """
  20. def __init__(self, store):
  21. self.store = store
  22. def __enter__(self):
  23. self.store.open(env.app_globals.rdfly.config)
  24. def __exit__(self, *exc):
  25. self.store.close()
  26. class Migrator:
  27. """
  28. Class to handle a database migration.
  29. This class holds state of progress and shared variables as it crawls
  30. through linked resources in an LDP server.
  31. Since a repository migration can be a very long operation but it is
  32. impossible to know the number of the resources to gather by LDP interaction
  33. alone, a progress ticker outputs the number of processed resources at
  34. regular intervals.
  35. """
  36. db_params = {
  37. 'map_size': 1024 ** 4,
  38. 'metasync': False,
  39. 'readahead': False,
  40. 'meminit': False,
  41. }
  42. """
  43. LMDB database parameters.
  44. See :meth:`lmdb.Environment.__init__`
  45. """
  46. ignored_preds = (
  47. nsc['fcrepo'].hasParent,
  48. nsc['fcrepo'].hasTransactionProvider,
  49. nsc['fcrepo'].hasFixityService,
  50. )
  51. """List of predicates to ignore when looking for links."""
  52. def __init__(
  53. self, src, dest, clear=False, zero_binaries=False,
  54. compact_uris=False, skip_errors=False):
  55. """
  56. Set up base paths and clean up existing directories.
  57. :param rdflib.URIRef src: Webroot of source repository. This must
  58. correspond to the LDP root node (for Fedora it can be e.g.
  59. ``http://localhost:8080fcrepo/rest/``) and is used to determine if
  60. URIs retrieved are managed by this repository.
  61. :param str dest: Destination repository path. If the location exists
  62. it must be a writable directory. It will be deleted and recreated.
  63. If it does not exist, it will be created along with its parents if
  64. missing.
  65. :param bool clear: Whether to clear any pre-existing data at the
  66. locations indicated.
  67. :param bool zero_binaries: Whether to create zero-byte binary files
  68. rather than copy the sources.
  69. :param bool compact_uris: NOT IMPLEMENTED. Whether the process should
  70. attempt to compact URIs generated with broken up path segments. If
  71. the UID matches a pattern such as ``/12/34/56/123456...`` it is
  72. converted to ``/123456...``. This would remove a lot of cruft
  73. caused by the pairtree segments. Note that this will change the
  74. publicly exposed URIs. If durability is a concern, a rewrite
  75. directive can be added to the HTTP server that proxies the WSGI
  76. endpoint.
  77. """
  78. # Set up repo folder structure and copy default configuration to
  79. # destination file.
  80. self.dbpath = '{}/data/ldprs_store'.format(dest)
  81. self.fpath = '{}/data/ldpnr_store'.format(dest)
  82. self.config_dir = '{}/etc'.format(dest)
  83. if clear:
  84. shutil.rmtree(dest, ignore_errors=True)
  85. if not path.isdir(self.config_dir):
  86. shutil.copytree(
  87. '{}/etc.defaults'.format(basedir), self.config_dir)
  88. # Modify and overwrite destination configuration.
  89. orig_config = parse_config(self.config_dir)
  90. orig_config['application']['store']['ldp_rs']['location'] = self.dbpath
  91. orig_config['application']['store']['ldp_nr']['path'] = self.fpath
  92. if clear:
  93. with open('{}/application.yml'.format(self.config_dir), 'w') \
  94. as config_file:
  95. config_file.write(yaml.dump(orig_config['application']))
  96. env.app_globals = AppGlobals(parse_config(self.config_dir))
  97. self.rdfly = env.app_globals.rdfly
  98. self.nonrdfly = env.app_globals.nonrdfly
  99. if clear:
  100. with TxnManager(env.app_globals.rdf_store, write=True) as txn:
  101. self.rdfly.bootstrap()
  102. self.rdfly.store.close()
  103. env.app_globals.nonrdfly.bootstrap()
  104. self.src = src.rstrip('/')
  105. self.zero_binaries = zero_binaries
  106. self.skip_errors = skip_errors
  107. def migrate(self, start_pts=None, list_file=None):
  108. """
  109. Migrate the database.
  110. This method creates a fully functional and configured LAKEsuperior
  111. data set contained in a folder from an LDP repository.
  112. :param start_pts: List of starting points to retrieve
  113. resources from. It would typically be the repository root in case
  114. of a full dump or one or more resources in the repository for a
  115. partial one.
  116. :type start_pts: tuple or list
  117. :param str list_file: path to a local file containing a list of URIs,
  118. one per line.
  119. """
  120. from lakesuperior.api import resource as rsrc_api
  121. self._ct = 0
  122. with StoreWrapper(self.rdfly.store):
  123. if start_pts:
  124. for start in start_pts:
  125. if not start.startswith('/'):
  126. raise ValueError(
  127. 'Starting point {} does not begin with a slash.'
  128. .format(start))
  129. if not rsrc_api.exists(start):
  130. # Create the full hierarchy with link to the parents.
  131. rsrc_api.create_or_replace(start)
  132. # Then populate the new resource and crawl for more
  133. # relationships.
  134. self._crawl(start)
  135. elif list_file:
  136. with open(list_file, 'r') as fp:
  137. for uri in fp:
  138. uid = uri.strip().replace(self.src, '')
  139. if not rsrc_api.exists(uid):
  140. try:
  141. rsrc_api.create_or_replace(uid)
  142. except InvalidResourceError:
  143. pass
  144. self._crawl(uid)
  145. logger.info('Dumped {} resources.'.format(self._ct))
  146. return self._ct
  147. def _crawl(self, uid):
  148. """
  149. Get the contents of a resource and its relationships recursively.
  150. This method recurses into itself each time a reference to a resource
  151. managed by the repository is encountered.
  152. :param str uid: The path relative to the source server webroot
  153. pointing to the resource to crawl, effectively the resource UID.
  154. """
  155. ibase = str(nsc['fcres'])
  156. # Public URI of source repo.
  157. uri = self.src + uid
  158. # Internal URI of destination.
  159. iuri = ibase + uid
  160. try:
  161. rsp = requests.head(uri)
  162. except:
  163. logger.warn('Error retrieving resource {}'.format(uri))
  164. return
  165. if rsp:
  166. if not self.skip_errors:
  167. rsp.raise_for_status()
  168. elif rsp.status_code > 399:
  169. print('Error retrieving resource {} headers: {} {}'.format(
  170. uri, rsp.status_code, rsp.text))
  171. # Determine LDP type.
  172. ldp_type = 'ldp_nr'
  173. try:
  174. for link in requests.utils.parse_header_links(
  175. rsp.headers.get('link')):
  176. if (
  177. link.get('rel') == 'type'
  178. and (
  179. link.get('url') == str(nsc['ldp'].RDFSource)
  180. or link.get('url') == str(nsc['ldp'].Container))
  181. ):
  182. # Resource is an LDP-RS.
  183. ldp_type = 'ldp_rs'
  184. break
  185. except TypeError:
  186. ldp_type = 'ldp_rs'
  187. #raise ValueError('URI {} is not an LDP resource.'.format(uri))
  188. # Get the whole RDF document now because we have to know all outbound
  189. # links.
  190. get_uri = (
  191. uri if ldp_type == 'ldp_rs' else '{}/fcr:metadata'.format(uri))
  192. try:
  193. get_rsp = requests.get(get_uri)
  194. except:
  195. logger.warn('Error retrieving resource {}'.format(get_uri))
  196. return
  197. if get_rsp:
  198. if not self.skip_errors:
  199. get_rsp.raise_for_status()
  200. elif get_rsp.status_code > 399:
  201. print('Error retrieving resource {} body: {} {}'.format(
  202. uri, get_rsp.status_code, get_rsp.text))
  203. data = get_rsp.content.replace(
  204. self.src.encode('utf-8'), ibase.encode('utf-8'))
  205. gr = Graph(identifier=iuri).parse(data=data, format='turtle')
  206. # Store raw graph data. No checks.
  207. with TxnManager(self.rdfly.store, True):
  208. self.rdfly.modify_rsrc(uid, add_trp=set(gr))
  209. # Grab binary and set new resource parameters.
  210. if ldp_type == 'ldp_nr':
  211. provided_imr = gr.resource(URIRef(iuri))
  212. if self.zero_binaries:
  213. data = b''
  214. else:
  215. bin_rsp = requests.get(uri)
  216. if not self.skip_errors:
  217. bin_rsp.raise_for_status()
  218. elif bin_rsp.status_code > 399:
  219. print('Error retrieving resource {} body: {} {}'.format(
  220. uri, bin_rsp.status_code, bin_rsp.text))
  221. data = bin_rsp.content
  222. #import pdb; pdb.set_trace()
  223. uuid = str(gr.value(
  224. URIRef(iuri), nsc['premis'].hasMessageDigest)).split(':')[-1]
  225. fpath = self.nonrdfly.local_path(
  226. self.nonrdfly.config['path'], uuid)
  227. makedirs(path.dirname(fpath), exist_ok=True)
  228. with open(fpath, 'wb') as fh:
  229. fh.write(data)
  230. self._ct += 1
  231. if self._ct % 10 == 0:
  232. print('{} resources processed so far.'.format(self._ct))
  233. # Now, crawl through outbound links.
  234. # LDP-NR fcr:metadata must be checked too.
  235. for pred, obj in gr.predicate_objects():
  236. #import pdb; pdb.set_trace()
  237. obj_uid = obj.replace(ibase, '')
  238. with TxnManager(self.rdfly.store, True):
  239. conditions = bool(
  240. isinstance(obj, URIRef)
  241. and obj.startswith(iuri)
  242. # Avoid ∞ loop with fragment URIs.
  243. and str(urldefrag(obj).url) != str(iuri)
  244. # Avoid ∞ loop with circular references.
  245. and not self.rdfly.ask_rsrc_exists(obj_uid)
  246. and pred not in self.ignored_preds
  247. )
  248. if conditions:
  249. print('Object {} will be crawled.'.format(obj_uid))
  250. self._crawl(urldefrag(obj_uid).url)