
Simplify resource creation in migration.

* Remove the dependency on the `disable_checks` configuration flag
* Use a lower-level function to insert raw triples without checks
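
A minimal sketch of the pattern the migrator switches to (names as in the diff below: `rdfly` is `env.app_globals.rdfly`; the import paths, UID and Turtle payload are illustrative assumptions, not part of this commit):

    from rdflib import Graph

    # Assumed import paths for this codebase; adjust if the modules differ.
    from lakesuperior.env import env
    from lakesuperior.store.ldp_rs.lmdb_store import TxnManager

    rdfly = env.app_globals.rdfly
    uid = '/migrated_resource'                      # hypothetical UID
    gr = Graph().parse(data=turtle_data, format='turtle')

    # One write transaction; triples are stored exactly as parsed, with no
    # server-managed triples, referential integrity or other checks applied.
    with TxnManager(rdfly.store, True):
        rdfly.modify_rsrc(uid, add_trp=set(gr))
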
Stefano Cossu, 6 years ago
Commit 8279ffa015

+ 0 - 1
lakesuperior/api/resource.py

@@ -79,7 +79,6 @@ def transaction(write=False):
             if len(env.app_globals.changelog):
                 job = Thread(target=process_queue)
                 job.start()
-            logger.debug('Deleting timestamp: {}'.format(getattr(env, 'timestamp')))
             delattr(env, 'timestamp')
             delattr(env, 'timestamp_term')
             return ret

+ 28 - 44
lakesuperior/migrator.py

@@ -3,7 +3,7 @@ import shutil
 
 from io import BytesIO
 from contextlib import ContextDecorator
-from os import path
+from os import makedirs, path
 from urllib.parse import urldefrag
 
 import lmdb
@@ -109,10 +109,6 @@ class Migrator:
         orig_config = parse_config(self.config_dir)
         orig_config['application']['store']['ldp_rs']['location'] = self.dbpath
         orig_config['application']['store']['ldp_nr']['path'] = self.fpath
-        # This sets a "hidden" configuration property that bypasses all server
-        # management on resource load: referential integrity, server-managed
-        # triples, etc. This will be removed at the end of the migration.
-        orig_config['application']['store']['ldp_rs']['disable_checks'] = True
 
         with open('{}/application.yml'.format(self.config_dir), 'w') \
                 as config_file:
@@ -121,20 +117,18 @@ class Migrator:
         env.config = parse_config(self.config_dir)
         env.app_globals = AppGlobals(env.config)
 
+        self.rdfly = env.app_globals.rdfly
+        self.nonrdfly = env.app_globals.nonrdfly
+
         with TxnManager(env.app_globals.rdf_store, write=True) as txn:
-            env.app_globals.rdfly.bootstrap()
-            env.app_globals.rdfly.store.close()
+            self.rdfly.bootstrap()
+            self.rdfly.store.close()
         env.app_globals.nonrdfly.bootstrap()
 
         self.src = src.rstrip('/')
         self.zero_binaries = zero_binaries
         self.skip_errors = skip_errors
 
-        from lakesuperior.api import resource as rsrc_api
-        self.rsrc_api = rsrc_api
-        print('Environment: {}'.format(env))
-        print('Resource API Environment: {}'.format(self.rsrc_api.env))
-
 
 
     def migrate(self, start_pts=None, list_file=None):
@@ -151,7 +145,7 @@ class Migrator:
         one per line.
         """
         self._ct = 0
-        with StoreWrapper(env.app_globals.rdfly.store):
+        with StoreWrapper(self.rdfly.store):
             if start_pts:
                 for start in start_pts:
                     if not start.startswith('/'):
@@ -164,7 +158,6 @@ class Migrator:
                 with open(list_file, 'r') as fp:
                     for uri in fp:
                         self._crawl(uri.strip().replace(self.src, ''))
-        self._remove_temp_options()
         logger.info('Dumped {} resources.'.format(self._ct))
 
         return self._ct
@@ -224,17 +217,16 @@ class Migrator:
 
         data = get_rsp.content.replace(
                 self.src.encode('utf-8'), ibase.encode('utf-8'))
-        #logger.debug('Localized data: {}'.format(data.decode('utf-8')))
         gr = Graph(identifier=iuri).parse(data=data, format='turtle')
+        # Store raw graph data. No checks.
+        with TxnManager(self.rdfly.store, True):
+            self.rdfly.modify_rsrc(uid, add_trp=set(gr))
 
         # Grab binary and set new resource parameters.
         if ldp_type == 'ldp_nr':
             provided_imr = gr.resource(URIRef(iuri))
             if self.zero_binaries:
-                data = b'\x00'
-                mimetype = str(provided_imr.value(
-                        nsc['ebucore'].hasMimeType,
-                        default='application/octet-stream'))
+                data = b''
             else:
                 bin_rsp = requests.get(uri)
                 if not self.skip_errors:
@@ -243,22 +235,18 @@ class Migrator:
                     print('Error retrieving resource {} body: {} {}'.format(
                         uri, bin_rsp.status_code, bin_rsp.text))
                 data = bin_rsp.content
-                mimetype = bin_rsp.headers.get('content-type')
-
-            self.rsrc_api.create_or_replace(
-                    uid, mimetype=mimetype, provided_imr=provided_imr,
-                    stream=BytesIO(data))
-        else:
-            mimetype = 'text/turtle'
-            # @TODO This can be improved by creating a resource API method for
-            # creating a resource from an RDFLib graph. Here we had to deserialize
-            # the RDF data to gather information but have to pass the original
-            # serialized stream, which has to be deserialized again in the model.
-            self.rsrc_api.create_or_replace(
-                    uid, mimetype=mimetype, stream=BytesIO(data))
+            #import pdb; pdb.set_trace()
+            uuid = str(gr.value(
+                URIRef(iuri), nsc['premis'].hasMessageDigest)).split(':')[-1]
+            fpath = self.nonrdfly.local_path(
+                    self.nonrdfly.config['path'], uuid)
+            makedirs(path.dirname(fpath), exist_ok=True)
+            with open(fpath, 'wb') as fh:
+                fh.write(data)
+
 
         self._ct += 1
-        if self._ct % 10 ==0:
+        if self._ct % 10 == 0:
             print('{} resources processed so far.'.format(self._ct))
 
         # Now, crawl through outbound links.
@@ -266,20 +254,16 @@ class Migrator:
         for pred, obj in gr.predicate_objects():
             #import pdb; pdb.set_trace()
             obj_uid = obj.replace(ibase, '')
-            if (
+            with TxnManager(self.rdfly.store, True):
+                conditions = bool(
                     isinstance(obj, URIRef)
                     and obj.startswith(iuri)
+                    # Avoid ∞ loop with fragment URIs.
                     and str(urldefrag(obj).url) != str(iuri)
-                    and not self.rsrc_api.exists(obj_uid) # Avoid ∞ loop
+                    # Avoid ∞ loop with circular references.
+                    and not self.rdfly.ask_rsrc_exists(obj_uid)
                     and pred not in self.ignored_preds
-            ):
+                )
+            if conditions:
                 print('Object {} will be crawled.'.format(obj_uid))
                 self._crawl(urldefrag(obj_uid).url)
-
-
-    def _remove_temp_options(self):
-        """Remove temporary options in configuration."""
-        del(env.config['application']['store']['ldp_rs']['disable_checks'])
-        with open('{}/application.yml'.format(self.config_dir), 'w') \
-                as config_file:
-            config_file.write(yaml.dump(env.config['application']))
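
Similarly, LDP-NR payloads no longer go through the resource API: the file is written straight into the non-RDF store, at a path derived from the resource's premis:hasMessageDigest value. A rough sketch of that step (assuming a 'urn:sha1:<hex>' digest form; `nonrdfly`, `gr`, `iuri`, `nsc` and `data` are as in the diff above):

    from os import makedirs, path

    # Assumed digest form, e.g. 'urn:sha1:<hex>'; only the final token is used.
    digest = str(gr.value(URIRef(iuri), nsc['premis'].hasMessageDigest))
    checksum = digest.split(':')[-1]

    # local_path() maps the checksum onto the non-RDF store's directory layout;
    # the parent directories may not exist yet, hence makedirs().
    fpath = nonrdfly.local_path(nonrdfly.config['path'], checksum)
    makedirs(path.dirname(fpath), exist_ok=True)
    with open(fpath, 'wb') as fh:
        fh.write(data)
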

+ 8 - 11
lakesuperior/model/ldp_factory.py

@@ -90,7 +90,6 @@ class LdpFactory:
         @param **kwargs Arguments passed to the LDP class constructor.
         '''
         uri = nsc['fcres'][uid]
-        disable_checks = rdfly.config.get('disable_checks', False)
 
         if not stream:
             # Create empty LDPC.
@@ -118,13 +117,12 @@ class LdpFactory:
 
             inst = cls(uid, provided_imr=provided_imr, **kwargs)
 
-            if not disable_checks:
-                # Make sure we are not updating an LDP-RS with an LDP-NR.
-                if inst.is_stored and LDP_NR_TYPE in inst.ldp_types:
-                    raise IncompatibleLdpTypeError(uid, mimetype)
+            # Make sure we are not updating an LDP-RS with an LDP-NR.
+            if inst.is_stored and LDP_NR_TYPE in inst.ldp_types:
+                raise IncompatibleLdpTypeError(uid, mimetype)
 
-                if kwargs.get('handling', 'strict') != 'none':
-                    inst._check_mgd_terms(inst.provided_imr.graph)
+            if kwargs.get('handling', 'strict') != 'none':
+                inst._check_mgd_terms(inst.provided_imr.graph)
 
         else:
             # Create a LDP-NR and equip it with the binary file provided.
@@ -134,10 +132,9 @@ class LdpFactory:
             inst = LdpNr(uid, stream=stream, mimetype=mimetype,
                     provided_imr=provided_imr, **kwargs)
 
-            if not disable_checks:
-                # Make sure we are not updating an LDP-NR with an LDP-RS.
-                if inst.is_stored and LDP_RS_TYPE in inst.ldp_types:
-                    raise IncompatibleLdpTypeError(uid, mimetype)
+            # Make sure we are not updating an LDP-NR with an LDP-RS.
+            if inst.is_stored and LDP_RS_TYPE in inst.ldp_types:
+                raise IncompatibleLdpTypeError(uid, mimetype)
 
         logger.info('Creating resource of type: {}'.format(
                 inst.__class__.__name__))

+ 16 - 26
lakesuperior/model/ldpr.py

@@ -122,7 +122,6 @@ class Ldpr(metaclass=ABCMeta):
         self.provided_imr = provided_imr
 
         # Disable all internal checks e.g. for raw I/O.
-        self.disable_checks = rdfly.config.get('disable_checks', False)
 
 
     @property
@@ -349,31 +348,23 @@ class Ldpr(metaclass=ABCMeta):
         '''
         create = create_only or not self.is_stored
 
-        if not self.disable_checks:
-            ev_type = RES_CREATED if create else RES_UPDATED
-            self._add_srv_mgd_triples(create)
-            ref_int = rdfly.config['referential_integrity']
-            if ref_int:
-                self._check_ref_int(ref_int)
+        ev_type = RES_CREATED if create else RES_UPDATED
+        self._add_srv_mgd_triples(create)
+        ref_int = rdfly.config['referential_integrity']
+        if ref_int:
+            self._check_ref_int(ref_int)
 
-            # Delete existing triples if replacing.
-            if not create:
-                rdfly.truncate_rsrc(self.uid)
+        # Delete existing triples if replacing.
+        if not create:
+            rdfly.truncate_rsrc(self.uid)
 
-            remove_trp = {
-                (self.uri, nsc['fcrepo'].lastModified, None),
-                (self.uri, nsc['fcrepo'].lastModifiedBy, None),
-            }
-            add_trp = (
-                    set(self.provided_imr.graph)
-                    | self._containment_rel(create))
-        else:
-            try:
-                remove_trp = set(self.imr.graph)
-            except ResourceNotExistsError:
-                remove_trp = set()
-            add_trp = set(self.provided_imr.graph)
-            ev_type = None
+        remove_trp = {
+            (self.uri, nsc['fcrepo'].lastModified, None),
+            (self.uri, nsc['fcrepo'].lastModifiedBy, None),
+        }
+        add_trp = (
+                set(self.provided_imr.graph)
+                | self._containment_rel(create))
 
         self._modify_rsrc(ev_type, remove_trp, add_trp)
         new_gr = Graph()
@@ -593,8 +584,7 @@ class Ldpr(metaclass=ABCMeta):
 
         if (
                 ev_type is not None
-                and env.config['application'].get('messaging')
-                and not self.disable_checks):
+                and env.config['application'].get('messaging')):
             logger.debug('Enqueuing message for {}'.format(self.uid))
             self._enqueue_msg(ev_type, remove_trp, add_trp)
 

+ 2 - 1
lakesuperior/store/ldp_rs/rsrc_centric_layout.py

@@ -495,7 +495,8 @@ class RsrcCentricLayout:
             # Add metadata.
             meta_gr.set(
                     (gr_uri, nsc['foaf'].primaryTopic, nsc['fcres'][uid]))
-            meta_gr.set((gr_uri, nsc['fcrepo'].created, env.timestamp_term))
+            ts = getattr(env, 'timestamp_term', Literal(arrow.utcnow()))
+            meta_gr.set((gr_uri, nsc['fcrepo'].created, ts))
             if historic:
                 # @FIXME Ugly reverse engineering.
                 ver_uid = uid.split(VERS_CONT_LABEL)[1].lstrip('/')