123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- import logging
- import shutil
- from io import BytesIO
- from contextlib import ContextDecorator
- from os import path
- import lmdb
- import requests
- import yaml
- from rdflib import Graph, URIRef
- from lakesuperior.dictionaries.namespaces import ns_collection as nsc
- from lakesuperior.env import env
- from lakesuperior.globals import AppGlobals
- from lakesuperior.config_parser import parse_config
- from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
- logger = logging.getLogger(__name__)
class StoreWrapper(ContextDecorator):
    """
    Open a store on entry and close it on exit.

    Usable as a ``with`` context manager and, through
    :class:`contextlib.ContextDecorator`, as a function decorator.
    """

    def __init__(self, store):
        """
        :param store: Store to manage. It must expose ``open(config)`` and
            ``close()`` methods.
        """
        self.store = store

    def __enter__(self):
        """
        Open the store with the ``ldp_rs`` store section of the application
        configuration.

        :return: The wrapped store, so that it can be bound with ``as``.
        """
        self.store.open(env.config['application']['store']['ldp_rs'])
        return self.store

    def __exit__(self, *exc):
        """
        Close the store.

        Nothing is returned, so an exception raised inside the managed block
        is not suppressed.
        """
        self.store.close()
class Migrator:
    """
    Class to handle a database migration.

    This class holds state of progress and shared variables as it crawls
    through linked resources in an LDP server.

    Since a repository migration can be a very long operation but it is
    impossible to know the number of the resources to gather by LDP
    interaction alone, a progress ticker outputs the number of processed
    resources at regular intervals.
    """

    # LMDB database parameters.
    # See :meth:`lmdb.Environment.__init__`
    db_params = {
        'map_size': 1024 ** 4,
        'metasync': False,
        'readahead': False,
        'meminit': False,
    }

    # Predicates whose objects are not followed when looking for links.
    ignored_preds = (
        nsc['fcrepo'].hasParent,
        nsc['fcrepo'].hasTransactionProvider,
    )

    def __init__(
            self, src, dest, start_pts, binary_handling='include',
            compact_uris=False):
        """
        Set up base paths and clean up existing directories.

        :param src: (URIRef) Webroot of source repository. This must
            correspond to the LDP root node (for Fedora it can be e.g.
            ``http://localhost:8080/fcrepo/rest/``) and is used to determine
            if URIs retrieved are managed by this repository.
        :param dest: (str) Destination repository path. If the location
            exists it must be a writable directory. It will be deleted and
            recreated. If it does not exist, it will be created along with
            its parents if missing.
        :param start_pts: (tuple|list) List of starting points to retrieve
            resources from. It would typically be the repository root in
            case of a full dump or one or more resources in the repository
            for a partial one.
        :param binary_handling: (string) One of ``include``, ``truncate`` or
            ``split``. The value is stored but not acted upon by this class.
        :param compact_uris: (bool) NOT IMPLEMENTED. Whether the process
            should attempt to compact URIs generated with broken up path
            segments. If the UID matches a pattern such as
            `/12/34/56/123456...` it is converted to `/123456...`. This
            would remove a lot of cruft caused by the pairtree segments.
            Note that this will change the publicly exposed URIs. If
            durability is a concern, a rewrite directive can be added to the
            HTTP server that proxies the WSGI endpoint.
        """
        # Set up repo folder structure and copy default configuration to
        # destination file.
        cur_dir = path.dirname(path.dirname(path.abspath(__file__)))
        self.dbpath = '{}/data/ldprs_store'.format(dest)
        self.fpath = '{}/data/ldpnr_store'.format(dest)
        # Kept on the instance so that the temporary options can be removed
        # from the same file once the migration is over.
        self.config_dir = '{}/etc'.format(dest)

        shutil.rmtree(dest, ignore_errors=True)
        shutil.copytree('{}/etc.defaults'.format(cur_dir), self.config_dir)

        # Modify and overwrite destination configuration.
        orig_config = parse_config(self.config_dir)
        store_config = orig_config['application']['store']
        store_config['ldp_rs']['location'] = self.dbpath
        store_config['ldp_nr']['path'] = self.fpath
        # This sets a "hidden" configuration property that bypasses all server
        # management on resource load: referential integrity, server-managed
        # triples, etc. This will be removed at the end of the migration.
        store_config['ldp_rs']['disable_checks'] = True
        self._app_config = orig_config['application']
        self._write_app_config()

        env.config = parse_config(self.config_dir)
        env.app_globals = AppGlobals(env.config)

        # NOTE(review): the store is closed while the transaction context is
        # still active -- confirm that this ordering is intended.
        with TxnManager(env.app_globals.rdf_store, write=True):
            env.app_globals.rdfly.bootstrap()
            env.app_globals.rdfly.store.close()
        env.app_globals.nonrdfly.bootstrap()

        self.src = src.rstrip('/')
        self.start_pts = start_pts
        self.binary_handling = binary_handling
        self.compact_uris = compact_uris

        # Imported here rather than at module level; presumably the resource
        # API has to be loaded after ``env`` has been configured above.
        from lakesuperior.api import resource as rsrc_api
        self.rsrc_api = rsrc_api
        print('Environment: {}'.format(env))
        print('Resource API Environment: {}'.format(self.rsrc_api.env))

    def migrate(self):
        """
        Migrate the database.

        This method creates a fully functional and configured LAKEsuperior
        environment contained in a folder from an LDP repository.

        :return: (int) Number of resources processed.
        :raise ValueError: If a starting point does not begin with a slash.
            Starting points listed before the offending one have already
            been crawled when this is raised.
        """
        self._ct = 0
        with StoreWrapper(env.app_globals.rdfly.store):
            for start in self.start_pts:
                if not start.startswith('/'):
                    raise ValueError(
                        'Starting point {} does not begin with a slash.'
                        .format(start))
                self._crawl(start)
        self._remove_temp_options()
        logger.info('Dumped {} resources.'.format(self._ct))

        return self._ct

    def _get_ldp_type(self, uri):
        """
        Determine whether a source resource is an LDP-RS or an LDP-NR.

        :param uri: (string) Public URI of the resource in the source
            repository.
        :return: (string) ``ldp_rs`` if a ``Link`` header with ``rel="type"``
            points to the LDP RDFSource class, ``ldp_nr`` otherwise.
        :raise ValueError: If the response carries no ``Link`` header.
        """
        rsp = requests.head(uri)
        rsp.raise_for_status()

        link_header = rsp.headers.get('link')
        # Test for the missing header explicitly instead of relying on the
        # kind of exception the header parser raises when handed ``None``.
        if link_header is None:
            raise ValueError('URI {} is not an LDP resource.'.format(uri))

        rdf_source = str(nsc['ldp'].RDFSource)
        for link in requests.utils.parse_header_links(link_header):
            if link.get('rel') == 'type' and link.get('url') == rdf_source:
                return 'ldp_rs'

        return 'ldp_nr'

    def _crawl(self, uid):
        """
        Get the contents of a resource and its relationships recursively.

        This method recurses into itself each time a reference to a resource
        managed by the repository is encountered.

        :param uid: (string) The path relative to the source server webroot
            pointing to the resource to crawl, effectively the resource UID.
        :raise ValueError: If the resource has no ``Link`` header.
        """
        ibase = str(nsc['fcres'])
        # Public URI of source repo.
        uri = self.src + uid
        # Internal URI of destination.
        iuri = ibase + uid

        ldp_type = self._get_ldp_type(uri)

        # Get the whole RDF document now because we have to know all outbound
        # links.
        get_uri = (
            uri if ldp_type == 'ldp_rs' else '{}/fcr:metadata'.format(uri))
        get_req = requests.get(get_uri)
        get_req.raise_for_status()

        # Rewrite source URIs to internal ones in the serialized document.
        data = get_req.content.replace(
            self.src.encode('utf-8'), ibase.encode('utf-8'))
        gr = Graph(identifier=iuri).parse(data=data, format='turtle')

        # Grab binary and set new resource parameters.
        if ldp_type == 'ldp_nr':
            bin_resp = requests.get('{}/fcr:content'.format(uri))
            bin_resp.raise_for_status()
            self.rsrc_api.create_or_replace(
                uid, mimetype=bin_resp.headers.get('content-type'),
                provided_imr=gr.resource(URIRef(iuri)),
                stream=BytesIO(bin_resp.content))
        else:
            # @TODO This can be improved by creating a resource API method for
            # creating a resource from an RDFLib graph. Here we had to
            # deserialize the RDF data to gather information but have to pass
            # the original serialized stream, which has to be deserialized
            # again in the model.
            self.rsrc_api.create_or_replace(
                uid, mimetype='text/turtle', stream=BytesIO(data))

        self._ct += 1
        if self._ct % 10 == 0:
            print('{} resources processed.'.format(self._ct))

        # Now, crawl through outbound links.
        # LDP-NR fcr:metadata must be checked too.
        for pred, obj in gr.predicate_objects():
            # Cheap checks first; only URIs nested under this resource's own
            # internal URI are followed.
            if not isinstance(obj, URIRef) or pred in self.ignored_preds:
                continue
            if not obj.startswith(iuri):
                continue

            obj_uid = obj.replace(ibase, '')
            # Resources that already exist are skipped to avoid an infinite
            # loop on circular references.
            if not self.rsrc_api.exists(obj_uid):
                self._crawl(obj_uid)

    def _write_app_config(self):
        """Write the application configuration to the destination folder."""
        with open(
                '{}/application.yml'.format(self.config_dir), 'w'
        ) as config_file:
            config_file.write(yaml.dump(self._app_config))

    def _remove_temp_options(self):
        """Remove temporary options in configuration."""
        # ``pop`` with a default keeps this safe to call more than once.
        self._app_config['store']['ldp_rs'].pop('disable_checks', None)
        self._write_app_config()
|