Sophie

Sophie

distrib > Mageia > 4 > x86_64 > by-pkgid > 4726f970c4b56b9a0ebb9a03a0b6522e > files > 17

python-tables-doc-3.0.0-4.mga4.noarch.rpm

# Benchmark four methods of using PyTables with multiple processes, where data
# is read from a PyTables file in one process and then sent to another:
#
# 1. using multiprocessing.Pipe
# 2. using a memory mapped file that's shared between two processes, passed as
#    out argument to tables.Array.read.
# 3. using a Unix domain socket (this uses the "abstract namespace" and will
#    work only on Linux).
# 4. using an IPv4 socket
#
# In all four cases, an array is loaded from a file in one process, sent to
# another, and then modified by incrementing each array element.  This is meant
# to simulate retrieving data and then modifying it.

from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import multiprocessing
import os
import random
import select
import socket
import tempfile
import time

import numpy as np
import tables


# create a PyTables file with a single int64 array with the specified number of
# elements
def create_file(array_size):
    """Write ``test.h5`` holding a single int64 array filled with ones.

    The array has ``array_size`` elements and is stored as the node ``test``
    directly under the root group.  The on-disk size of the stored array is
    printed in megabytes.
    """
    ones = np.ones(array_size, dtype='i8')
    with tables.open_file('test.h5', 'w') as h5file:
        node = h5file.create_array('/', 'test', ones)
        size_mb = node.size_on_disk / 1e6
        print('file created, size: {0} MB'.format(size_mb))


# process to receive an array using a multiprocessing.Pipe connection
class PipeReceive(multiprocessing.Process):
    """Process that receives an array through a multiprocessing.Pipe.

    ``receiver_pipe`` is the connection the array arrives on and
    ``result_send`` is the connection used to report the measured timestamps.
    """

    def __init__(self, receiver_pipe, result_send):
        super(PipeReceive, self).__init__()
        self.result_send = result_send
        self.receiver_pipe = receiver_pipe

    def run(self):
        """Receive the array, add one to every element and report timings."""
        # recv() blocks until the sender has put an object on the pipe
        received = self.receiver_pipe.recv()
        received_at = time.time()
        # the "work" being timed: an in-place increment of every element
        received += 1
        finished_at = time.time()
        # the sender transmits an array of ones, so everything must now be 2
        assert np.all(received == 2)
        # report (receive time, finish time) to the originating process
        timings = (received_at, finished_at)
        self.result_send.send(timings)


def read_and_send_pipe(send_type, array_size):
    """Time sending an array to another process over a multiprocessing.Pipe.

    Reads the first ``array_size`` elements of the ``test`` node in
    ``test.h5``, sends them to a ``PipeReceive`` process and prints the
    timings under the label ``send_type``.
    """
    # two one-way pipes: one carries the array to the receiver, the other
    # carries the receiver's timestamps back
    array_recv, array_send = multiprocessing.Pipe(duplex=False)
    result_recv, result_send = multiprocessing.Pipe(duplex=False)
    receiver = PipeReceive(array_recv, result_send)
    receiver.start()
    # give the receiving process a moment to start up before timing begins
    time.sleep(0.15)
    with tables.open_file('test.h5', 'r') as h5file:
        node = h5file.get_node('/', 'test')
        start_timestamp = time.time()
        # the timed section covers both the PyTables read and the transfer
        data = node.read(0, array_size, 1)
        array_send.send(data)
        assert np.all(data + 1 == 2)
        # block until the receiver reports its timestamps
        timestamps = result_recv.recv()
    recv_timestamp, finish_timestamp = timestamps
    print_results(send_type, start_timestamp, recv_timestamp, finish_timestamp)
    receiver.join()


# process to receive an array using a shared memory mapped file
# for real use, this would require creating some protocol to specify the array's
# data type and shape
class MemmapReceive(multiprocessing.Process):
    """Process that accesses an array through a shared memory mapped file.

    Only the file path travels over ``path_recv``; the array data itself is
    reached by mapping that file.  The file is always interpreted as a flat
    int64 array, so nothing describing dtype or shape is exchanged.
    """

    def __init__(self, path_recv, result_send):
        super(MemmapReceive, self).__init__()
        self.result_send = result_send
        self.path_recv = path_recv

    def run(self):
        """Map the announced file, add one to every element, report timings."""
        # recv() blocks until the other process has sent the file path
        memmap_path = self.path_recv.recv()
        # 'r+' opens the existing file for reading and writing
        shared = np.memmap(memmap_path, dtype='i8', mode='r+')
        received_at = time.time()
        # the "work" being timed: an in-place increment of every element
        shared += 1
        finished_at = time.time()
        assert np.all(shared == 2)
        # report (receive time, finish time) to the other process
        timings = (received_at, finished_at)
        self.result_send.send(timings)


def read_and_send_memmap(send_type, array_size):
    """Time sharing an array with another process via a memory mapped file.

    Reads the first ``array_size`` elements of the ``test`` node in
    ``test.h5`` straight into a memory mapped file, sends the file's path to
    a ``MemmapReceive`` process and prints the timings under the label
    ``send_type``.

    The backing file is a uniquely named temporary file that is removed
    before returning, so concurrent runs cannot clobber each other's data and
    no file is left behind.
    """
    # create a multiprocessing Pipe that will be used to send the memmap
    # file path to the receiving process
    path_recv, path_send = multiprocessing.Pipe(False)
    result_recv, result_send = multiprocessing.Pipe(False)
    # start the receiving process and pause to allow it to start up
    recv_process = MemmapReceive(path_recv, result_send)
    recv_process.start()
    time.sleep(0.15)
    # reserve a unique backing file; only the path is needed here because
    # np.memmap opens the file itself
    handle, memmap_path = tempfile.mkstemp(prefix='array', suffix='.memmap')
    os.close(handle)
    output = None
    try:
        with tables.open_file('test.h5', 'r') as fobj:
            array = fobj.get_node('/', 'test')
            start_timestamp = time.time()
            # memmap the file as a NumPy array in 'overwrite' mode
            output = np.memmap(memmap_path, 'i8', 'w+', shape=(array_size, ))
            # read an array from a PyTables file into the memory mapped array
            array.read(0, array_size, 1, out=output)
            # use a multiprocessing.Pipe to send the file's path to the
            # receiving process
            path_send.send(memmap_path)
            # receive the timestamps from the other process
            recv_timestamp, finish_timestamp = result_recv.recv()
            # because 'output' is shared between processes, all elements
            # should now be equal to 2
            assert(np.all(output == 2))
        print_results(send_type, start_timestamp, recv_timestamp,
                      finish_timestamp)
        recv_process.join()
    finally:
        # drop this process' reference to the mapping before deleting the
        # backing file
        del output
        os.remove(memmap_path)


# process to receive an array using a socket
# for real use, this would require creating some protocol to specify the array's
# data type and shape
class SocketReceive(multiprocessing.Process):
    """Process that receives an array's raw bytes over a stream socket.

    The process binds a socket of family ``socket_family`` to ``address``,
    accepts one connection and reads exactly ``array_nbytes`` bytes from it.
    The bytes are interpreted as a flat int64 array.  The measured timestamps
    are reported through ``result_send``.
    """

    def __init__(self, socket_family, address, result_send, array_nbytes):
        super(SocketReceive, self).__init__()
        self.socket_family = socket_family
        self.address = address
        self.result_send = result_send
        self.array_nbytes = array_nbytes

    def run(self):
        """Accept one connection, receive the array and report timings.

        Raises:
            EOFError: if the peer closes the connection before
                ``array_nbytes`` bytes have arrived.
        """
        sock = socket.socket(self.socket_family, socket.SOCK_STREAM)
        try:
            # listen for a connection and use select to block until one is
            # made
            sock.bind(self.address)
            sock.listen(1)
            select.select([sock], [], [])
            connection = sock.accept()[0]
            try:
                # read the sent data straight into a preallocated bytearray
                recv_buffer = bytearray(self.array_nbytes)
                view = memoryview(recv_buffer)
                bytes_recv = 0
                while bytes_recv < self.array_nbytes:
                    chunk_size = connection.recv_into(view[bytes_recv:])
                    if chunk_size == 0:
                        # a zero-byte read means the peer closed the
                        # connection; without this check the loop would
                        # spin forever
                        raise EOFError(
                            'connection closed after {0} of {1} bytes'.format(
                                bytes_recv, self.array_nbytes))
                    bytes_recv += chunk_size
                # convert the bytearray into a NumPy array
                array = np.frombuffer(recv_buffer, dtype='i8')
                recv_timestamp = time.time()
                # perform an operation on the received array
                array += 1
                finish_timestamp = time.time()
                assert(np.all(array == 2))
                # send the timestamps back to the originating process
                self.result_send.send((recv_timestamp, finish_timestamp))
            finally:
                connection.close()
        finally:
            sock.close()


def unix_socket_address():
    """Return a random Unix domain socket address in the abstract namespace.

    The address is a NUL byte followed by five random bytes.  The leading NUL
    selects the abstract namespace, which is only available on Linux.
    """
    random_suffix = os.urandom(5)
    return b''.join((b'\x00', random_suffix))


def ipv4_socket_address():
    """Return a loopback ``(host, port)`` pair with a randomly chosen port.

    The port is drawn from 9000 to 10000, both ends included.
    """
    # randrange excludes its upper bound, hence 10001
    port = random.randrange(9000, 10001)
    return ('127.0.0.1', port)


def read_and_send_socket(send_type, array_size, array_bytes, address_func,
                         socket_family):
    """Time sending an array's raw bytes to another process over a socket.

    Reads the first ``array_size`` elements of the ``test`` node in
    ``test.h5``, streams their ``array_bytes`` bytes to a ``SocketReceive``
    process and prints the timings under the label ``send_type``.

    ``address_func`` is called with no arguments to obtain the address the
    receiver binds to; ``socket_family`` is the matching socket family.
    """
    address = address_func()
    # start the receiving process and pause to allow it to start up
    result_recv, result_send = multiprocessing.Pipe(False)
    recv_process = SocketReceive(socket_family, address, result_send,
                                 array_bytes)
    recv_process.start()
    time.sleep(0.15)
    with tables.open_file('test.h5', 'r') as fobj:
        array = fobj.get_node('/', 'test')
        start_timestamp = time.time()
        # connect to the receiving process' socket
        sock = socket.socket(socket_family, socket.SOCK_STREAM)
        try:
            sock.connect(address)
            # read the array from the PyTables file and send its
            # data buffer to the receiving process
            output = array.read(0, array_size, 1)
            # sendall keeps writing until the whole buffer is out; a plain
            # send() may stop after a partial write, leaving the receiver
            # waiting forever for the remainder
            sock.sendall(output.data)
            assert(np.all(output + 1 == 2))
            # receive the timestamps from the other process
            recv_timestamp, finish_timestamp = result_recv.recv()
        finally:
            # release the socket even when connecting or sending fails
            sock.close()
    print_results(send_type, start_timestamp, recv_timestamp, finish_timestamp)
    recv_process.join()


def print_results(send_type, start_timestamp, recv_timestamp, finish_timestamp):
    """Print one line of timing results for the transfer ``send_type``.

    Three durations are derived from the timestamps: start to receipt
    ("receive"), receipt to finish ("add") and start to finish ("total").
    """
    receive_time = recv_timestamp - start_timestamp
    add_time = finish_timestamp - recv_timestamp
    total_time = finish_timestamp - start_timestamp
    template = 'type: {0}\t receive: {1:5.5f}, add:{2:5.5f}, total: {3:5.5f}'
    line = template.format(send_type, receive_time, add_time, total_time)
    print(line)


if __name__ == '__main__':

    # seed the generator used to pick the IPv4 port
    random.seed(os.urandom(2))
    # payload sizes in bytes: 1e5, 1e6, 1e7 and 1e8
    array_num_bytes = [10 ** exponent for exponent in range(5, 9)]

    for array_bytes in array_num_bytes:
        # every element is an 8-byte integer
        array_size = array_bytes // 8

        create_file(array_size)
        read_and_send_pipe('multiproc.Pipe', array_size)
        read_and_send_memmap('memmap     ', array_size)
        # comment out this call to run on an OS other than Linux
        read_and_send_socket(
            'Unix socket', array_size, array_bytes,
            unix_socket_address, socket.AF_UNIX)
        read_and_send_socket(
            'IPv4 socket', array_size, array_bytes,
            ipv4_socket_address, socket.AF_INET)
        # blank line between the result groups of successive sizes
        print()