import threading


class PipeIO(object):
    """Thread-safe pipe with blocking reads and writes
    """
    MAX_BUFFER_SIZE = 4096

    def __init__(self):
        self.lock = threading.RLock()
        self.bytes_consumed = threading.Condition(self.lock)
        self.bytes_produced = threading.Condition(self.lock)

        self.buffer = bytearray()
        self.read_pos = 0

        self._closed = False

    def _bytes_available(self):
        return self.read_pos < len(self.buffer)

    def _reset_buffer(self):
        self.buffer = bytearray()
        self.read_pos = 0

    def read(self, sz):
        """Reads `sz` bytes at most

        Blocks until some data is available in buffer.

        :param sz: the maximum count of bytes to read
        :return: bytes read
        """
        self.lock.acquire()
        try:
            while not self._bytes_available():
                if self._closed:
                    return bytes()

                self.bytes_produced.wait()

            read_until_pos = min(self.read_pos + sz, len(self.buffer))
            result = bytes(self.buffer[self.read_pos:read_until_pos])

            self.read_pos = read_until_pos
            self.bytes_consumed.notify_all()

            return result
        finally:
            self.lock.release()

    def write(self, buf):
        """Writes `buf` content

        Blocks until all `buf` written.

        :param buf: bytes to write
        :return: None
        """
        self.lock.acquire()
        try:
            buf_pos = 0
            while True:
                if buf_pos == len(buf):
                    break

                if len(self.buffer) == self.MAX_BUFFER_SIZE:
                    while self.read_pos < self.MAX_BUFFER_SIZE:
                        self.bytes_consumed.wait()

                    self._reset_buffer()

                bytes_to_write = min(len(buf) - buf_pos, self.MAX_BUFFER_SIZE - len(self.buffer))
                new_buf_pos = buf_pos + bytes_to_write

                self.buffer.extend(buf[buf_pos:new_buf_pos])
                self.bytes_produced.notify_all()

                buf_pos = new_buf_pos
        finally:
            self.lock.release()

    def close(self):
        """Gracefully closes the `PipeIO`

        Allows to read remaining bytes from this `PipeIO`.

        Note that `close()` is expected to be invoked from the same thread as
        `write()`.
        """
        self.lock.acquire()
        try:
            self._closed = True
            # wake up the reader to let him find out that no more bytes will be
            # available
            self.bytes_produced.notify_all()
        finally:
            self.lock.release()


def readall(read_fn, sz):
    """Reads `sz` bytes using `read_fn`

    Raises `EOFError` if `read_fn` returned the empty byte array while reading
    all `sz` bytes.
    """
    buff = b''
    have = 0
    while have < sz:
        chunk = read_fn(sz - have)
        have += len(chunk)
        buff += chunk

        if len(chunk) == 0:
            raise EOFError

    return buff
