resource.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. from functools import wraps
  2. from multiprocessing import Process
  3. from threading import Lock, Thread
  4. from flask import (
  5. Blueprint, current_app, g, make_response, render_template,
  6. request, send_file)
  7. from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
  8. def transaction(write=False):
  9. '''
  10. Handle atomic operations in a store.
  11. This wrapper ensures that a write operation is performed atomically. It
  12. also takes care of sending a message for each resource changed in the
  13. transaction.
  14. '''
  15. def _transaction_deco(fn):
  16. @wraps(fn)
  17. def _wrapper(*args, **kwargs):
  18. if not hasattr(g, 'changelog'):
  19. g.changelog = []
  20. store = current_app.rdfly.store
  21. with TxnManager(store, write=write) as txn:
  22. ret = fn(*args, **kwargs)
  23. if len(g.changelog):
  24. job = Thread(target=process_queue)
  25. job.start()
  26. return ret
  27. return _wrapper
  28. return _transaction_deco
  29. def process_queue():
  30. '''
  31. Process the message queue on a separate thread.
  32. '''
  33. lock = Lock()
  34. lock.acquire()
  35. while len(g.changelog):
  36. send_event_msg(g.changelog.pop())
  37. lock.release()
  38. def send_event_msg(remove_trp, add_trp, metadata):
  39. '''
  40. Break down delta triples, find subjects and send event message.
  41. '''
  42. remove_grp = groupby(remove_trp, lambda x : x[0])
  43. remove_dict = { k[0] : k[1] for k in remove_grp }
  44. add_grp = groupby(add_trp, lambda x : x[0])
  45. add_dict = { k[0] : k[1] for k in add_grp }
  46. subjects = set(remove_dict.keys()) | set(add_dict.keys())
  47. for rsrc_uri in subjects:
  48. self._logger.info('subject: {}'.format(rsrc_uri))
  49. #current_app.messenger.send