threading_poc.pyx.disabled 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. # distutils: include_dirs = ../include
  2. # distutils: library_dirs = ../lib, ../include
  3. import multiprocessing
  4. import threading
  5. import time
  6. cimport cylmdb as lmdb
  7. from cython.parallel cimport prange, parallel
  # Module-level LMDB handles. run() fills both in by passing their
  # addresses to mdb_env_create() and mdb_dbi_open().
  cdef lmdb.MDB_env *env
  cdef lmdb.MDB_dbi dbi
  cdef void _check(int rc) except *:
      """
      Turn an LMDB return code into a Python exception.

      :param int rc: Return code of an LMDB C call.

      :raise RuntimeError: If ``rc`` is anything other than ``MDB_SUCCESS``.
          The message carries the numeric code and the text returned by
          ``mdb_strerror``.
      """
      if rc == lmdb.MDB_SUCCESS:
          return

      reason = lmdb.mdb_strerror(rc).decode()
      raise RuntimeError('LMDB Error ({}): {}'.format(rc, reason))
  cpdef void get_() except *:
      """
      Open the test environment, hold a read-only transaction for one
      second, then release everything and report which thread and process
      did the work.

      The environment handle is local to this call, so every caller (thread
      or process) opens and closes its own.

      :raise RuntimeError: If any LMDB call returns a code other than
          ``MDB_SUCCESS`` (see ``_check``).
      """
      cdef:
          unsigned int flags = 0
          lmdb.MDB_txn *txn = NULL
          lmdb.MDB_val key_v, data_v
          lmdb.MDB_env *env = NULL
          size_t txn_addr = 0
          int rc

      _check(lmdb.mdb_env_create(&env))
      try:
          _check(lmdb.mdb_env_open(env, '/tmp/test_mp', flags, 0o644))
          _check(lmdb.mdb_txn_begin(env, NULL, lmdb.MDB_RDONLY, &txn))

          # Report the address only once the handle exists, and remember it:
          # the handle is freed by the commit below.
          txn_addr = <size_t>txn
          print('Transaction address: {:x}'.format(txn_addr))

          key_v.mv_data = b'a'
          key_v.mv_size = 1
          # The actual lookup is disabled; this PoC only exercises
          # transaction handling.
          #_check(lmdb.mdb_get(txn, dbi, &key_v, &data_v))
          #print((<unsigned char *>data_v.mv_data)[:data_v.mv_size])

          # Keep the transaction open long enough for concurrent callers
          # to overlap.
          time.sleep(1)

          # mdb_txn_commit() frees the handle whether or not it succeeds, so
          # forget the pointer before checking the result.
          rc = lmdb.mdb_txn_commit(txn)
          txn = NULL
          _check(rc)
      finally:
          # A transaction still open here means a step above failed.
          if txn != NULL:
              lmdb.mdb_txn_abort(txn)
          # Required even when mdb_env_open() failed.
          lmdb.mdb_env_close(env)

      print('Txn {:x} in thread {} in process {} done.'.format(
          txn_addr,
          threading.current_thread().name,
          multiprocessing.current_process().name))
  def run():
      """
      Seed the test database, then start 10 processes that each call
      ``get_``.

      The environment at ``/tmp/test_mp`` is opened, a single record with
      key ``b'a'`` and the current timestamp as value is written to the
      unnamed database, and the environment is closed again before any
      process is started.

      :raise RuntimeError: If any LMDB call returns a code other than
          ``MDB_SUCCESS`` (see ``_check``).
      """
      cdef:
          # Alternative flags tried with this PoC:
          #unsigned int flags = lmdb.MDB_NOLOCK
          #unsigned int flags = lmdb.MDB_NOTLS
          unsigned int flags = 0
          lmdb.MDB_txn *wtxn = NULL
          lmdb.MDB_val key_v, data_v
          int rc

      # Set up environment.
      _check(lmdb.mdb_env_create(&env))
      try:
          #_check(lmdb.mdb_env_set_maxreaders(env, 128))
          _check(lmdb.mdb_env_open(env, '/tmp/test_mp', flags, 0o644))

          # Create DB.
          _check(lmdb.mdb_txn_begin(env, NULL, 0, &wtxn))
          _check(lmdb.mdb_dbi_open(wtxn, NULL, lmdb.MDB_CREATE, &dbi))

          # Write something. `ts` stays referenced until after the put, so
          # the buffer data_v points at remains valid.
          key_v.mv_data = b'a'
          key_v.mv_size = 1
          ts = str(time.time()).encode()
          data_v.mv_data = <unsigned char *>ts
          data_v.mv_size = len(ts)
          _check(lmdb.mdb_put(wtxn, dbi, &key_v, &data_v, 0))

          # mdb_txn_commit() frees the handle whether or not it succeeds, so
          # forget the pointer before checking the result.
          rc = lmdb.mdb_txn_commit(wtxn)
          wtxn = NULL
          _check(rc)
      finally:
          # A transaction still open here means a step above failed.
          if wtxn != NULL:
              lmdb.mdb_txn_abort(wtxn)
          # Required even when mdb_env_open() failed.
          lmdb.mdb_env_close(env)

      # Threaded variant, currently disabled:
      #print('Threaded jobs:')
      #for i in range(100):
      #    threading.Thread(target=get_).start()

      print('Multiprocess jobs:')
      for _ in range(10):
          multiprocessing.Process(target=get_).start()