Sfoglia il codice sorgente

First complete dump.

* Dump full graphs into LMDB.
* Dump binaries following path conventions.
* Make local_path a static method.
Stefano Cossu 6 anni fa
parent
commit
6cfb06364f

+ 51 - 34
lakesuperior/api/admin.py

@@ -1,4 +1,5 @@
 import logging
+import os
 
 import click_log
 from contextlib import ExitStack
@@ -9,9 +10,12 @@ import requests
 
 from rdflib import Graph, URIRef
 
+import lakesuperior.env_setup
+
 from lakesuperior.dictionaries.namespaces import ns_collection as nsc
 from lakesuperior.env import env
 from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
+from lakesuperior.store.ldp_nr.default_layout import DefaultLayout as FileLayout
 
 __doc__ = '''
 Admin API.
@@ -22,6 +26,11 @@ This module contains maintenance utilities and stats.
 logger = logging.getLogger(__name__)
 app_globals = env.app_globals
 
+_ignore_list = (
+    nsc['fcrepo'].hasParent,
+    nsc['fcrepo'].hasTransactionProvider,
+)
+
 
 def stats():
     '''
@@ -36,7 +45,6 @@ def stats():
     return repo_stats
 
 
-@click_log.simple_verbosity_option(logger)
 def dump(
         src, dest, start=('/',), binary_handling='include',
         compact_uris=False):
@@ -47,9 +55,9 @@ def dump(
     correspond to the LDP root node (for Fedora it can be e.g.
     `http://localhost:8080fcrepo/rest/`) and is used to determine if URIs
     retrieved are managed by this repository.
-    @param dest (rdflib.URIRef) Base URI of the destination. This can be any
-    container in a LAKEsuperior server. If the resource exists, it must be an
-    LDP container. If it does not exist, it will be created.
+    @param dest (str) Local path of the destination. If the location exists it
+    must be a writable directory. It will be deleted and recreated. If it does
+    not exist, it will be created along with its parents if missing.
     @param start (tuple|list) List of starting points to retrieve resources
     from. It would typically be the repository root in case of a full dump
     or one or more resources in the repository for a partial one.
@@ -63,38 +71,35 @@ def dump(
     server that proxies the WSGI endpoint.
     '''
     # 1. Retrieve list of resources.
-    if not isinstance(start, list) and not isinstance(start, tuple):
-        start = (start,)
-    _gather_resources(src, start)
+    start_pts = (
+            (start,)
+            if not isinstance(start, list) and not isinstance(start, tuple)
+            else start)
 
-
-def _gather_resources(webroot, start_pts):
-    '''
-    Gather all resources recursively and save them to temporary store.
-
-    Resource UIDs (without the repository webroot) are saved as unique keys
-    in a temporary store.
-
-    @param webroot (string) Base URI of the repository.
-    @param start_pts (tuple|list) Starting points to gather.
-    '''
-    dbpath = '/var/tmp/fcrepo_migration_data'
+    dbpath = '{}/ldprs_store'.format(dest)
     rmtree(dbpath, ignore_errors=True)
+    os.makedirs(dbpath)
+    fpath = '{}/ldpnr_store'.format(dest)
+    rmtree(fpath, ignore_errors=True)
+    os.makedirs(fpath)
+
     with lmdb.open(
             dbpath, 1024 ** 4, metasync=False, readahead=False,
             meminit=False) as db:
-        #import pdb; pdb.set_trace()
         for start in start_pts:
             if not start.startswith('/'):
                 raise ValueError(
                         'Starting point {} does not begin with a slash.'
                         .format(start))
 
-            _gather_refs(db, webroot, start)
+            _gather_refs(db, src, start, dest)
+        entries = db.stat()['entries']
+        logger.info('Dumped {} resources.'.format(entries))
+
+    return entries
 
 
-@click_log.simple_verbosity_option(logger)
-def _gather_refs(db, base, path):
+def _gather_refs(db, base, path, dest):
     '''
     Get the UID of a resource and its relationships recursively.
 
@@ -103,13 +108,14 @@ def _gather_refs(db, base, path):
 
     @param base (string) Base URL of repository. This is used to determine
     whether encountered URI terms are repository-managed.
-    @param base (string) Path, relative to base URL, of the resource to gather.
+    @param path (string) Path, relative to base URL, of the resource to gather.
+    @param dest (string) Local path for RDF database and non-RDF files.
     '''
     pfx = base.rstrip('/')
     # Public URI of source repo.
     uri = pfx + path
     # Internal URI of destination.
-    iuri = uri.replace(pfx, nsc['fcres'])
+    iuri = URIRef(uri.replace(pfx, nsc['fcres']))
     ibase = base.replace(pfx, nsc['fcres'])
 
     rsp = requests.head(uri)
@@ -124,15 +130,12 @@ def _gather_refs(db, base, path):
             ldp_type = 'ldp_rs'
             break
 
-    if ldp_type == 'ldp_rs':
-        # Get the whole RDF document now because we have to know all outbound
-        # links.
-        get_uri = uri
-    else:
-        get_uri = uri + '/fcr:metadata'
-
+    # Get the whole RDF document now because we have to know all outbound
+    # links.
+    get_uri = uri if ldp_type == 'ldp_rs' else '{}/fcr:metadata'.format(uri)
     get_req = requests.get(get_uri)
     get_req.raise_for_status()
+
     data = get_req.content.replace(base.encode('utf-8'), ibase.encode('utf-8'))
     logger.debug('Localized data: {}'.format(data.decode('utf-8')))
     gr = Graph(identifier=iuri).parse(data=data, format='turtle')
@@ -148,16 +151,30 @@ def _gather_refs(db, base, path):
             if not cur.set_key(iuri.encode('utf-8')):
                 cur.put(uri.encode('utf-8'), data)
 
+    # Grab binary.
+    if ldp_type == 'ldp_nr':
+        bin_resp = requests.get('{}/fcr:content'.format(uri))
+        bin_resp.raise_for_status()
+
+        # @FIXME Use a more robust checking mechanism. Maybe offer the option
+        # to verify the content checksum.
+        cnt_hash = gr.value(iuri, nsc['premis'].hasMessageDigest).replace(
+                'urn:sha1:', '')
+        fpath = FileLayout.local_path('{}/ldpnr_store'.format(dest), cnt_hash)
+        os.makedirs(os.path.dirname(fpath), exist_ok=True)
+        with open(fpath, 'wb') as f:
+            f.write(bin_resp.content)
+
     # Now, crawl through outbound links.
     # LDP-NR fcr:metadata must be checked too.
     for pred, obj in gr.predicate_objects():
         if (
                 isinstance(obj, URIRef)
                 and obj.startswith(iuri)
-                and pred != nsc['fcrepo'].hasParent):
+                and pred not in _ignore_list):
             with db.begin() as txn:
                 with txn.cursor() as cur:
                     # Avoid ∞
                     if cur.set_key(obj.encode('utf-8')):
                         continue
-            _gather_refs(db, base, obj.replace(ibase, ''))
+            _gather_refs(db, base, obj.replace(ibase, ''), dest)

+ 3 - 3
lakesuperior/endpoints/ldp.py

@@ -119,9 +119,9 @@ def get_resource(uid, out_fmt=None):
 
     @param uid (string) UID of resource to retrieve. The repository root has
     an empty string for UID.
-    @param force_rdf (boolean) Whether to retrieve RDF even if the resource is
+    @param out_fmt (string) Force output to RDF or non-RDF if the resource is
     a LDP-NR. This is not available in the API but is used e.g. by the
-    `*/fcr:metadata` endpoint. The default is False.
+    `*/fcr:metadata` and `*/fcr:content` endpoints. The default is False.
     '''
     logger.info('UID: {}'.format(uid))
     out_headers = std_headers
@@ -154,7 +154,7 @@ def get_resource(uid, out_fmt=None):
             if not getattr(rsrc, 'filename', False):
                 return ('{} has no binary content.'.format(rsrc.uid), 404)
 
-            logger.info('Streaming out binary content.')
+            logger.debug('Streaming out binary content.')
             rsp = make_response(send_file(
                     rsrc.local_path, as_attachment=True,
                     attachment_filename=rsrc.filename,

+ 2 - 1
lakesuperior/model/ldp_nr.py

@@ -63,7 +63,8 @@ class LdpNr(Ldpr):
     def local_path(self):
         cksum_term = self.imr.value(nsc['premis'].hasMessageDigest)
         cksum = str(cksum_term.identifier.replace('urn:sha1:',''))
-        return nonrdfly.local_path(cksum)
+        return nonrdfly.__class__.local_path(
+                nonrdfly.root, cksum, nonrdfly.bl, nonrdfly.bc)
 
 
     def create_or_replace(self, create_only=False):

+ 1 - 1
lakesuperior/model/ldpr.py

@@ -249,7 +249,7 @@ class Ldpr(metaclass=ABCMeta):
             if (
                 # Exclude digest hash and version information.
                 t[1] not in {
-                    nsc['premis'].hasMessageDigest,
+                    #nsc['premis'].hasMessageDigest,
                     nsc['fcrepo'].hasVersion,
                 }
             ) and (

+ 33 - 27
lakesuperior/store/ldp_nr/default_layout.py

@@ -15,6 +15,36 @@ class DefaultLayout(BaseNonRdfLayout):
     '''
     Default file layout.
     '''
+    @staticmethod
+    def local_path(root, uuid, bl=4, bc=4):
+        '''
+        Generate the resource path splitting the resource checksum according to
+        configuration parameters.
+
+        @param uuid (string) The resource UUID. This corresponds to the content
+        checksum.
+        '''
+        logger.debug('Generating path from uuid: {}'.format(uuid))
+        term = len(uuid) if bc == 0 else min(bc * bl, len(uuid))
+
+        path = [uuid[i : i + bl] for i in range(0, term, bl)]
+
+        if bc > 0:
+            path.append(uuid[term :])
+        path.insert(0, root)
+
+        return '/'.join(path)
+
+
+    def __init__(self, *args, **kwargs):
+        '''
+        Set up path segmentation parameters.
+        '''
+        super().__init__(*args, **kwargs)
+
+        self.bl = self.config['pairtree_branch_length']
+        self.bc = self.config['pairtree_branches']
+
 
     ## INTERFACE METHODS ##
 
@@ -60,11 +90,11 @@ class DefaultLayout(BaseNonRdfLayout):
             os.unlink(tmp_file)
             raise
         if size == 0:
-            logger.warn('Zero-file size received.')
+            logger.warn('Zero-length file received.')
 
         # Move temp file to final destination.
         uuid = hash.hexdigest()
-        dst = self.local_path(uuid)
+        dst = __class__.local_path(self.root, uuid, self.bl, self.bc)
         logger.debug('Saving file to disk: {}'.format(dst))
         if not os.access(os.path.dirname(dst), os.X_OK):
             os.makedirs(os.path.dirname(dst))
@@ -84,28 +114,4 @@ class DefaultLayout(BaseNonRdfLayout):
         '''
         See BaseNonRdfLayout.delete.
         '''
-        os.unlink(self.local_path(uuid))
-
-
-    ## PROTECTED METHODS ##
-
-    def local_path(self, uuid):
-        '''
-        Generate the resource path splitting the resource checksum according to
-        configuration parameters.
-
-        @param uuid (string) The resource UUID. This corresponds to the content
-        checksum.
-        '''
-        logger.debug('Generating path from uuid: {}'.format(uuid))
-        bl = self.config['pairtree_branch_length']
-        bc = self.config['pairtree_branches']
-        term = len(uuid) if bc==0 else min(bc*bl, len(uuid))
-
-        path = [ uuid[i:i+bl] for i in range(0, term, bl) ]
-
-        if bc > 0:
-            path.append(uuid[term:])
-        path.insert(0, self.root)
-
-        return '/'.join(path)
+        os.unlink(__class__.local_path(self.root, uuid, self.bl, self.bc))

+ 2 - 1
lsup-admin

@@ -143,7 +143,8 @@ def dump(src, dest, start, binaries):
     another LDP-compatible implementation.
     '''
     logger.info('Dumping database.')
-    return admin_api.dump(src, dest, start, binaries)
+    entries = admin_api.dump(src, dest, start, binaries)
+    logger.info('Dumped {} resources.'.format(entries))
 
 
 @click.command()