Pārlūkot izejas kodu

Allow benchmarking ingestion via Python API.

Stefano Cossu 6 gadi atpakaļ
vecāks
revīzija
3e9fa03460
1 mainītis faili ar 106 papildinājumiem un 29 dzēšanām
  1. 106 29
      lakesuperior/util/benchmark.py

+ 106 - 29
lakesuperior/util/benchmark.py

@@ -1,11 +1,14 @@
 #!/usr/bin/env python3
 
+import logging
 import sys
 
+from os import path
 from uuid import uuid4
 
 import arrow
 import click
+import rdflib
 import requests
 
 from matplotlib import pyplot as plt
@@ -17,16 +20,32 @@ __doc__ = '''
 Benchmark script to measure write performance.
 '''
 
+def_mode = 'ldp'
 def_endpoint = 'http://localhost:8000/ldp'
 def_ct = 10000
 def_parent = '/pomegranate'
 def_gr_size = 200
 
+logging.disable(logging.WARN)
+
 
 @click.command()
+@click.option(
+    '--mode', '-m', default=def_mode,
+    help=(
+        'Mode of ingestion. One of `ldp`, `python`. With the former, the '
+        'HTTP/LDP web server is used. With the latter, the Python API is '
+        'used, in which case the server need not be running. '
+        f'Default: {def_endpoint}'
+    )
+)
 @click.option(
     '--endpoint', '-e', default=def_endpoint,
-    help=f'LDP endpoint. Default: {def_endpoint}')
+    help=(
+        'LDP endpoint. Only meaningful with `ldp` mode. '
+        f'Default: {def_endpoint}'
+    )
+)
 @click.option(
     '--count', '-c', default=def_ct,
     help='Number of resources to ingest. Default: {def_ct}')
@@ -40,9 +59,12 @@ def_gr_size = 200
     help='Delete container resource and its children if already existing. By '
     'default, the container is not deleted and new resources are added to it.')
 @click.option(
-    '--method', '-m', default='put',
-    help='HTTP method to use. Case insensitive. Either PUT '
-    f'or POST. Default: PUT')
+    '--method', '-X', default='put',
+    help=(
+        'HTTP method to use. Case insensitive. Either PUT or POST. '
+        'Default: PUT'
+    )
+)
 @click.option(
     '--graph-size', '-s', default=def_gr_size,
     help=f'Number of triples in each graph. Default: {def_gr_size}')
@@ -52,47 +74,73 @@ def_gr_size = 200
     '`n` (only  LDP-NR, i.e. binaries), or `b` (50/50% of both). '
     'Default: r')
 @click.option(
-    '--graph', '-g', is_flag=True, help='Plot a graph of ingest timings. '
+    '--plot', '-P', is_flag=True, help='Plot a graph of ingest timings. '
     'The graph figure is displayed on screen with basic manipulation and save '
     'options.')
 
 def run(
-        endpoint, count, parent, method, delete_container,
-        graph_size, resource_type, graph):
-
-    container_uri = endpoint + parent
+    mode, endpoint, count, parent, method, delete_container,
+    graph_size, resource_type, plot
+):
+    """
+    Run the benchmark.
+    """
 
     method = method.lower()
     if method not in ('post', 'put'):
-        raise ValueError(f'HTTP method not supported: {method}')
+        raise ValueError(f'Insertion method not supported: {method}')
+
+    mode = mode.lower()
+    if mode == 'ldp':
+        parent = '{}/{}'.format(endpoint.strip('/'), parent.strip('/'))
+
+        if delete_container:
+            requests.delete(parent, headers={'prefer': 'no-tombstone'})
+        requests.put(parent)
 
-    if delete_container:
-        requests.delete(container_uri, headers={'prefer': 'no-tombstone'})
-    requests.put(container_uri)
+    elif mode == 'python':
+        from lakesuperior import env_setup
+        from lakesuperior.api import resource as rsrc_api
+
+        if delete_container:
+            rsrc_api.delete(parent, soft=False)
+        rsrc_api.create_or_replace(parent)
+    else:
+        raise ValueError(f'Mode not supported: {mode}')
 
-    print(f'Inserting {count} children under {container_uri}.')
 
     # URI used to establish an in-repo relationship. This is set to
     # the most recently created resource in each loop.
-    ref = container_uri
+    ref = parent
+
+    print(f'Inserting {count} children under {parent}.')
 
     wclock_start = arrow.utcnow()
-    if graph:
+    if plot:
         print('Results will be plotted.')
         # Plot coordinates: X is request count, Y is request timing.
         px = []
         py = []
         plt.xlabel('Requests')
         plt.ylabel('ms per request')
-        plt.title('FCREPO Benchmark')
+        plt.title('Lakesuperior / FCREPO Benchmark')
 
     try:
         for i in range(1, count + 1):
-            url = '{}/{}'.format(container_uri, uuid4()) if method == 'put' \
-                    else container_uri
+            #import pdb; pdb.set_trace()
+            if mode == 'ldp':
+                dest = (
+                    f'{parent}/{uuid4()}' if method == 'put'
+                    else parent
+                )
+            else:
+                dest = (
+                    path.join(parent, str(uuid4()))
+                    if method == 'put' else parent
+                )
 
             if resource_type == 'r' or (resource_type == 'b' and i % 2 == 0):
-                data = random_graph(graph_size, ref).serialize(format='ttl')
+                data = random_graph(graph_size, ref)
                 headers = {'content-type': 'text/turtle'}
             else:
                 img = random_image(name=uuid4(), ts=16, ims=512)
@@ -103,19 +151,21 @@ def run(
                         'content-disposition': 'attachment; filename="{}"'
                             .format(uuid4())}
 
-            #import pdb; pdb.set_trace()
             # Start timing after generating the data.
             ckpt = arrow.utcnow()
             if i == 1:
                 tcounter = ckpt - ckpt
                 prev_tcounter = tcounter
 
-            rsp = requests.request(method, url, data=data, headers=headers)
-            tdelta = arrow.utcnow() - ckpt
-            tcounter += tdelta
+            ref = (
+                _ingest_graph_ldp(
+                    method, dest, data.serialize(format='ttl'), headers, ref
+                )
+                if mode == 'ldp'
+                else _ingest_graph_py(method, dest, data, ref)
+            )
+            tcounter += (arrow.utcnow() - ckpt)
 
-            rsp.raise_for_status()
-            ref = rsp.headers['location']
             if i % 10 == 0:
                 avg10 = (tcounter - prev_tcounter) / 10
                 print(
@@ -123,7 +173,7 @@ def run(
                     f'Per resource: {avg10}')
                 prev_tcounter = tcounter
 
-                if graph:
+                if plot:
                     px.append(i)
                     # Divide by 1000 for µs → ms
                     py.append(avg10.microseconds // 1000)
@@ -136,7 +186,7 @@ def run(
     print(f'Total time spent ingesting resources: {tcounter}')
     print(f'Average time per resource: {tcounter.total_seconds()/i}')
 
-    if graph:
+    if plot:
         if resource_type == 'r':
             type_label = 'LDP-RS'
         elif resource_type == 'n':
@@ -144,12 +194,39 @@ def run(
         else:
             type_label = 'LDP-RS + LDP-NR'
         label = (
-            f'{container_uri}; {method.upper()}; {graph_size} trp/graph; '
+            f'{parent}; {method.upper()}; {graph_size} trp/graph; '
             f'{type_label}')
         plt.plot(px, py, label=label)
         plt.legend()
         plt.show()
 
 
+def _ingest_graph_ldp(method, uri, data, headers, ref):
+    """
+    Ingest the graph via HTTP/LDP.
+    """
+    rsp = requests.request(method, uri, data=data, headers=headers)
+    rsp.raise_for_status()
+    return rsp.headers['location']
+
+
+def _ingest_graph_py(method, dest, data, ref):
+    from lakesuperior.api import resource as rsrc_api
+
+    kwargs = {}
+    if isinstance(data, rdflib.Graph):
+        kwargs['graph'] = data
+    else:
+        kwargs['stream'] = data
+        kwargs['mimetype'] = 'image/png'
+
+    if method == 'put':
+        _, rsrc = rsrc_api.create_or_replace(dest, **kwargs)
+    else:
+        _, rsrc = rsrc_api.create(dest, **kwargs)
+
+    return rsrc.uid
+
+
 if __name__ == '__main__':
     run()