default_layout.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import hashlib
  2. import logging
  3. import os
  4. import shutil
  5. from uuid import uuid4
  6. from lakesuperior import env
  7. from lakesuperior.store.ldp_nr.base_non_rdf_layout import BaseNonRdfLayout
  8. from lakesuperior.exceptions import ChecksumValidationError
  9. logger = logging.getLogger(__name__)
  10. class DefaultLayout(BaseNonRdfLayout):
  11. """
  12. Default file layout.
  13. This is a simple filesystem layout that stores binaries in pairtree folders
  14. in a local filesystem. Parameters can be specified for the
  15. """
  16. @staticmethod
  17. def local_path(root, uuid, bl=4, bc=4):
  18. """
  19. Generate the resource path splitting the resource checksum according to
  20. configuration parameters.
  21. :param str uuid: The resource UUID. This corresponds to the content
  22. checksum.
  23. """
  24. logger.debug('Generating path from uuid: {}'.format(uuid))
  25. term = len(uuid) if bc == 0 else min(bc * bl, len(uuid))
  26. path = [uuid[i : i + bl] for i in range(0, term, bl)]
  27. if bc > 0:
  28. path.append(uuid[term :])
  29. path.insert(0, root)
  30. return '/'.join(path)
  31. def __init__(self, *args, **kwargs):
  32. """Set up path segmentation parameters."""
  33. super().__init__(*args, **kwargs)
  34. self.bl = self.config['pairtree_branch_length']
  35. self.bc = self.config['pairtree_branches']
  36. ## INTERFACE METHODS ##
  37. def bootstrap(self):
  38. """Initialize binary file store."""
  39. try:
  40. shutil.rmtree(self.root)
  41. except FileNotFoundError:
  42. pass
  43. os.makedirs(self.root + '/tmp')
  44. def persist(
  45. self, uid, stream, bufsize=8192, prov_cksum=None,
  46. prov_cksum_algo=None):
  47. r"""
  48. Store the stream in the file system.
  49. This method handles the file in chunks. for each chunk it writes to a
  50. temp file and adds to a checksum. Once the whole file is written out
  51. to disk and hashed, the temp file is moved to its final location which
  52. is determined by the hash value.
  53. :param str uid: UID of the resource.
  54. :param IOstream stream: file-like object to persist.
  55. :param int bufsize: Chunk size. 2\*\*12 to 2\*\*15 is a good range.
  56. :param str prov_cksum: Checksum provided by the client to verify
  57. that the content received matches what has been sent. If None (the
  58. default) no verification will take place.
  59. :param str prov_cksum_algo: Verification algorithm to validate the
  60. integrity of the user-provided data. If this is different from
  61. the default hash algorithm set in the application configuration,
  62. which is used to calclate the checksum of the file for storing
  63. purposes, a separate hash is calculated specifically for validation
  64. purposes. Clearly it's more efficient to use the same algorithm and
  65. avoid a second checksum calculation.
  66. """
  67. # The temp file is created on the destination filesystem to minimize
  68. # time and risk of moving it to its final destination.
  69. tmp_fname = f'{self.root}/tmp/{uuid4()}'
  70. default_hash_algo = \
  71. env.app_globals.config['application']['uuid']['algo']
  72. if prov_cksum_algo is None:
  73. prov_cksum_algo = default_hash_algo
  74. try:
  75. with open(tmp_fname, 'wb') as f:
  76. logger.debug(f'Writing temp file to {tmp_fname}.')
  77. store_hash = hashlib.new(default_hash_algo)
  78. verify_hash = (
  79. store_hash if prov_cksum_algo == default_hash_algo
  80. else hashlib.new(prov_cksum_algo))
  81. size = 0
  82. while True:
  83. buf = stream.read(bufsize)
  84. if not buf:
  85. break
  86. store_hash.update(buf)
  87. if verify_hash is not store_hash:
  88. verify_hash.update(buf)
  89. f.write(buf)
  90. size += len(buf)
  91. if prov_cksum and verify_hash.hexdigest() != prov_cksum:
  92. raise ChecksumValidationError(
  93. uid, prov_cksum, verify_hash.hexdigest())
  94. except:
  95. logger.exception(f'File write failed on {tmp_fname}.')
  96. os.unlink(tmp_fname)
  97. raise
  98. if size == 0:
  99. logger.warn('Zero-length file received.')
  100. # If the file exists already, don't bother rewriting it.
  101. dst = __class__.local_path(
  102. self.root, store_hash.hexdigest(), self.bl, self.bc)
  103. if os.path.exists(dst):
  104. logger.info(f'File exists on {dst}. Not overwriting.')
  105. # Move temp file to final destination.
  106. logger.debug(f'Saving file to disk: {dst}')
  107. if not os.access(os.path.dirname(dst), os.X_OK):
  108. os.makedirs(os.path.dirname(dst))
  109. os.rename(tmp_fname, dst)
  110. return store_hash.hexdigest(), size
  111. def delete(self, uuid):
  112. """See BaseNonRdfLayout.delete."""
  113. os.unlink(__class__.local_path(self.root, uuid, self.bl, self.bc))