Kaynağa Gözat

Initial binary file storage.

Stefano Cossu 7 yıl önce
ebeveyn
işleme
4918816be7

+ 34 - 24
lakesuperior/endpoints/ldp.py

@@ -1,8 +1,10 @@
 import logging
 
 from collections import defaultdict
+from uuid import uuid4
 
 from flask import Blueprint, request
+from werkzeug.datastructures import FileStorage
 
 from lakesuperior.exceptions import InvalidResourceError, \
         ResourceExistsError, ResourceNotExistsError, \
@@ -98,29 +100,7 @@ def post_resource(parent):
     except KeyError:
         slug = None
 
-    logger.debug('Content type: {}'.format(request.mimetype))
-    logger.debug('files: {}'.format(request.files))
-    #logger.debug('stream: {}'.format(request.stream))
-    #logger.debug('form: {}'.format(request.form))
-    #logger.debug('data: {}'.format(request.data))
-    if request.mimetype in accept_post_rdf:
-        cls = Ldpc
-        data = request.data.decode('utf-8')
-    else:
-        cls = LdpNr
-        if request.mimetype == 'multipart/form-data':
-            # This seems the "right" way to upload a binary file, with a multipart/
-            # form-data MIME type and the file in the `file` field. This however is
-            # not supported by FCREPO4.
-            data = request.files.get('file')
-        else:
-            # This is a less clean way, with the file in the form body and the
-            # request as application/x-www-form-urlencoded.
-            # This is how FCREPO4 accepts binary uploads.
-            data = request.data
-
-    logger.info('POSTing resource of type: {}'.format(cls.__name__))
-    #logger.info('POST data: {}'.format(data))
+    cls, data = class_from_req_body()
 
     try:
        rsrc = cls.inst_for_post(parent, slug)
@@ -148,7 +128,10 @@ def put_resource(uuid):
     '''
     logger.info('Request headers: {}'.format(request.headers))
     rsp_headers = std_headers
-    rsrc = Ldpc(uuid)
+
+    cls, data = class_from_req_body()
+
+    rsrc = cls(uuid)
 
     logger.debug('form: {}'.format(request.form))
     # Parse headers.
@@ -209,3 +192,30 @@ def delete_resource(uuid):
     return '', 204, headers
 
 
+def class_from_req_body():
+    logger.debug('Content type: {}'.format(request.mimetype))
+    #logger.debug('files: {}'.format(request.files))
+    logger.debug('stream: {}'.format(request.stream))
+    if request.mimetype in accept_post_rdf:
+        cls = Ldpc
+        # Parse out the RDF string.
+        data = request.data.decode('utf-8')
+    else:
+        cls = LdpNr
+        if request.mimetype == 'multipart/form-data':
+            # This seems the "right" way to upload a binary file, with a
+            # multipart/form-data MIME type and the file in the `file` field.
+            # This however is not supported by FCREPO4.
+            data = request.files.get('file').stream
+        else:
+            # This is a less clean way, with the file in the form body and the
+            # request as application/x-www-form-urlencoded.
+            # This is how FCREPO4 accepts binary uploads.
+            data = request.stream
+
+    logger.info('POSTing resource of type: {}'.format(cls.__name__))
+    #logger.info('POST data: {}'.format(data))
+
+    return cls, data
+
+

+ 7 - 18
lakesuperior/model/ldp_nr.py

@@ -26,35 +26,24 @@ class LdpNr(Ldpr):
         raise NotImplementedError()
 
 
-    def post(self, data):
+    def post(self, stream):
+        '''
+        Create a new binary resource with a corresponding RDF representation.
+
+        @param stream (Stream) A stream resource representing the uploaded file.
+        '''
         #self._logger.debug('Data: {}'.format(data[:256]))
         metadata_rsrc = Resource(Graph(), self.urn)
 
         for t in self.base_types:
             metadata_rsrc.add(RDF.type, t)
 
-        cksum = Digest.non_rdf_cksum(data)
+        cksum = self.nonrdfly.persist(stream)
         cksum_term = URIRef('urn:sha1:{}'.format(cksum))
         metadata_rsrc.add(nsc['premis'].hasMessageDigest, cksum_term)
 
-        self._store_binary(data, cksum)
-
-
 
     def put(self, data):
         raise NotImplementedError()
 
 
-
-    ## PROTECTED METHODS ##
-
-    def _store_binary(self, data, cksum):
-        '''
-        Move a binary file to persistent storage.
-
-        @param data (bytes) Binary data to store.
-        @param cksum (string) Digest of the data. This is used to determine the
-        file location.
-        '''
-        pass
-

+ 5 - 17
lakesuperior/store_layouts/non_rdf/base_non_rdf_layout.py

@@ -21,23 +21,11 @@ class BaseNonRdfLayout(metaclass=ABCMeta):
         self.root = self._conf['path']
 
 
-    ## PROTECTED METHODS ##
+    ## INTERFACE METHODS ##
 
-    def _path(self, hash):
+    @abstractmethod
+    def persist(self, file):
         '''
-        Generate the resource path splitting the resource checksum according to
-        configuration parameters.
-
-        @param hash (string) The resource hash.
+        Store the stream in the designated persistence layer for this layout.
         '''
-        bl = self._conf['pairtree_branch_length']
-        bc = self._conf['pairtree_branches']
-        term = len(hash) if bc==0 else min(bc*bl, len(hash))
-
-        path = [ hash[i:i+bl] for i in range(0, term, bl) ]
-
-        if bc > 0:
-            path.append(hash[:term])
-        path.insert(0, self.root)
-
-        return '/'.join(path)
+        pass

+ 78 - 1
lakesuperior/store_layouts/non_rdf/default_layout.py

@@ -1,3 +1,9 @@
+import os
+
+from hashlib import sha1
+from shutil import copyfileobj
+from uuid import uuid4
+
 from lakesuperior.store_layouts.non_rdf.base_non_rdf_layout import \
         BaseNonRdfLayout
 
@@ -6,4 +12,75 @@ class DefaultLayout(BaseNonRdfLayout):
     This is momentarily a stub until more non-RDF layouts use cases are
     gathered.
     '''
-    pass
+
+    ## INTERFACE METHODS ##
+
+    def persist(self, stream, bufsize=8192):
+        '''
+        Store the stream in the file system.
+
+        This method handles the file in chunks. For each chunk it writes to a
+        temp file and adds to a checksum. Once the whole file is written out
+        to disk and hashed, the temp file is moved to its final location which
+        is determined by the hash value.
+
+        @param stream (IOstream): file-like object to persist.
+        @param bufsize (int) Chunk size. 2**12 to 2**15 is a good range.
+        '''
+        tmp_file = '{}/tmp/{}'.format(self.root, uuid4())
+        try:
+            with open(tmp_file, 'wb') as f:
+                #if hasattr(stream, 'seek'):
+                #    stream.seek(0)
+                self._logger.debug('Writing temp file to {}.'.format(tmp_file))
+
+                hash = sha1()
+                while True:
+                    buf = stream.read(bufsize)
+                    if not buf:
+                        break
+                    hash.update(buf)
+                    f.write(buf)
+        except:
+            self._logger.exception('File write failed on {}.'.format(tmp_file))
+            os.unlink(tmp_file)
+            raise
+
+        # Move temp file to final destination.
+
+        digest = hash.hexdigest()
+        dst = self._path(digest)
+        self._logger.debug('Saving file to disk: {}'.format(dst))
+        if not os.access(os.path.dirname(dst), os.X_OK):
+            os.makedirs(os.path.dirname(dst))
+        # If the file exists already, don't bother rewriting it.
+        if os.path.exists(dst):
+            self._logger.info('File exists on {}. Not overwriting.'.format(dst))
+            os.unlink(tmp_file)
+        else:
+            os.rename(tmp_file, dst)
+
+        return digest
+
+
+    ## PROTECTED METHODS ##
+
+    def _path(self, digest):
+        '''
+        Generate the resource path splitting the resource checksum according to
+        configuration parameters.
+
+        @param digest (string) The resource digest.
+        '''
+        self._logger.debug('Generating path from digest: {}'.format(digest))
+        bl = self._conf['pairtree_branch_length']
+        bc = self._conf['pairtree_branches']
+        term = len(digest) if bc==0 else min(bc*bl, len(digest))
+
+        path = [ digest[i:i+bl] for i in range(0, term, bl) ]
+
+        if bc > 0:
+            path.append(digest[term:])
+        path.insert(0, self.root)
+
+        return '/'.join(path)

+ 0 - 52
lakesuperior/util/digest.py

@@ -1,52 +0,0 @@
-import pickle
-
-from hashlib import sha1
-
-from rdflib.term import Literal, URIRef, Variable
-
-from lakesuperior.dictionaries.namespaces import ns_collection as nsc
-
-
-class Digest:
-    '''
-    Various digest functions. May be merged into something more generic later.
-    '''
-    @staticmethod
-    def rdf_cksum(g):
-        '''
-        Generate a checksum for a graph.
-
-        This is not straightforward because a graph is derived from an
-        unordered data structure (RDF).
-
-        What this method does is ordering the graph by subject, predicate,
-        object, then creating a pickle string and a checksum of it.
-
-        N.B. The context of the triples is ignored, so isomorphic graphs would
-        have the same checksum regardless of the context(s) they are found in.
-
-        @TODO This can be later reworked to use a custom hashing algorithm.
-
-        @param rdflib.Graph g The graph to be hashed.
-
-        @return string SHA1 checksum.
-        '''
-        # Remove the messageDigest property, which very likely reflects the
-        # previous state of the resource.
-        g.remove((Variable('s'), nsc['premis'].messageDigest, Variable('o')))
-
-        ord_g = sorted(list(g), key=lambda x : (x[0], x[1], x[2]))
-        hash = sha1(pickle.dumps(ord_g)).hexdigest()
-
-        return hash
-
-
-    @staticmethod
-    def non_rdf_cksum(data):
-        '''
-        Generate a checksum of non-RDF content.
-
-        @TODO This can be later reworked to use a custom hashing algorithm.
-        '''
-        return sha1(data).hexdigest()
-

+ 5 - 1
server.py

@@ -23,6 +23,11 @@ fcrepo.register_blueprint(ldp, url_prefix='/ldp')
 fcrepo.register_blueprint(ldp, url_prefix='/rest')
 fcrepo.register_blueprint(query, url_prefix='/query')
 
+# Initialize temporary folders.
+tmp_path = config['application']['store']['ldp_nr']['path'] + '/tmp'
+if not os.path.exists(tmp_path):
+    os.makedirs(tmp_path)
+
 
 ## ROUTES ##
 
@@ -41,4 +46,3 @@ def debug():
     '''
     raise RuntimeError()
 
-