Ethan Bray

Posted on • Originally published at ethanbray.com

Streaming Buffered PHP Output in Zend Framework

I was recently working on a feature that required me to retrieve large amounts of data (7 figure document counts) from Elasticsearch, modify the data and then output a CSV file. This feature needed to be performant and not have an adverse effect on the memory usage of the application's EC2 instance.

The original implementation of the feature retrieved all of the documents, stored them in memory and then created a file on disk to be served to the user. This resulted in memory exhaustion, as well as taking up large amounts of disk space on production instances.

This pseudocode represents the initial implementation we had:

class ExportController extends AbstractActionController {
    public function exportAction(): Response {
        $exportQuery = $this->params()->fromPost('query');
        $this->dataExporter->export($exportQuery);

        // Read the generated file back into memory so it can be attached to the response
        $fileContents = file_get_contents('/tmp/data.csv');

        $response        = $this->getResponse();
        $responseHeaders = $response
            ->getHeaders()
            ->addHeaders([
                'Content-Description'       => 'File Transfer',
                'Content-Type'              => 'text/csv',
                'Content-Transfer-Encoding' => 'binary',
                'Expires'                   => 0,
                'Cache-Control'             => 'must-revalidate',
                'Pragma'                    => 'public',
            ]);
        $response->setHeaders($responseHeaders);
        $response->setContent($fileContents);
        return $response;
    }
}

class DataExporter {
    public function export($exportQuery)
    {
        // Get all of the results
        $results = $this->elasticsearchService->getData($exportQuery);

        // Iterate through data and output to CSV file
        $filePointer = fopen('/tmp/data.csv', 'wb');
        foreach ($results as $result) {
            fputcsv($filePointer, $result, ',');
        }
        fclose($filePointer);
    }
}

fputcsv takes a resource as its first parameter, and that resource can point to a PHP I/O stream such as php://output. By replacing /tmp/data.csv with php://output, we remove the need to create CSV files on the server.
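
In isolation, the change is just the target passed to fopen; a minimal sketch, reusing $results from the example above:

// Write each row directly to PHP's output stream instead of a file on disk
$outputPointer = fopen('php://output', 'wb');
foreach ($results as $result) {
    fputcsv($outputPointer, $result, ',');
}
fclose($outputPointer);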

Now that we're writing the data straight to the output stream, we need to send the correct headers before we flush the buffer:

class ExportController extends AbstractActionController {
    public function exportAction(): void {
        $exportQuery = $this->params()->fromPost('query');
        $this->dataExporter->export($exportQuery);
    }
}

class DataExporter {
    public function export($exportQuery): void
    {
        // Get all of the results
        $results = $this->elasticsearchService->getData($exportQuery);

        // Iterate through data and output to CSV file
        ob_start();
        $outputPointer = fopen('php://output', 'wb');
        foreach ($results as $result) {
            fputcsv($outputPointer, $result, ',');
        }

        // Close our file pointer, send the file headers and flush our output buffer
        fclose($outputPointer);
        header('Content-Description: File Transfer');
        header('Content-Type: text/csv');
        header('Content-Transfer-Encoding: binary');
        header('Expires: 0');
        header('Cache-Control: must-revalidate');
        header('Pragma: public');
        ob_end_flush();
        exit();
    }
}

We use exit() to avoid a 'headers already sent' error, as Zend Framework will otherwise try to send its own response, and more headers, to the user.

If we were to write a test for the export method, we'd swiftly run into the issue that our use of exit() ends the PHPUnit process. Short of monkey patching exit(), there's not much we can do about that, other than removing exit() altogether. Instead of flushing the buffer in our export method, we can create a Stream response and return it to our controller. The controller then returns the Stream to Zend Framework, which sends it on to the user.

class ExportController extends AbstractActionController {
    public function exportAction(): Stream {
        $exportQuery = $this->params()->fromPost('query');
        return $this->dataExporter->export($exportQuery);
    }
}

class DataExporter {
    public function export($exportQuery): Stream
    {
        // Get all of the results
        $results = $this->elasticsearchService->getData($exportQuery);

        // Create our Stream
        $outputPointer = fopen('php://output', 'wb');
        $stream = new Stream();
        $stream->setStream($outputPointer);
        $stream->setHeaders((new Headers())->addHeaders([
            'Content-Description'       => 'File Transfer',
            'Content-Type'              => 'text/csv',
            'Content-Transfer-Encoding' => 'binary',
            'Expires'                   => '0',
            'Cache-Control'             => 'must-revalidate',
            'Pragma'                    => 'public',
        ]));

        // Iterate through data and output to CSV file
        foreach ($results as $result) {
            fputcsv($outputPointer, $result, ',');
        }

        return $stream;
    }
}
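
With export() now returning a Stream instead of calling exit(), writing a test becomes straightforward. A minimal sketch, assuming PHPUnit, that DataExporter takes its Elasticsearch service as a constructor dependency, and an illustrative ElasticsearchService class name and fixture rows:

class DataExporterTest extends \PHPUnit\Framework\TestCase
{
    public function testExportWritesCsvAndReturnsStream(): void
    {
        // Stub the Elasticsearch service to return two fixed rows (illustrative data)
        $elasticsearchService = $this->createMock(ElasticsearchService::class);
        $elasticsearchService->method('getData')->willReturn([
            ['id' => 1, 'name' => 'foo'],
            ['id' => 2, 'name' => 'bar'],
        ]);

        $exporter = new DataExporter($elasticsearchService);

        // php://output goes through PHP's output buffering, so we can capture the CSV
        ob_start();
        $stream = $exporter->export('some-query');
        $csv    = ob_get_clean();

        self::assertInstanceOf(Stream::class, $stream);
        self::assertSame("1,foo\n2,bar\n", $csv);
    }
}

Because php://output is routed through output buffering, the CSV written during export() can be captured with ob_start() and ob_get_clean() rather than ending up in the test runner's output; previously, exit() would have torn the PHPUnit process down before any assertion ran.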

When a response is returned by a controller, Zend MVC triggers a SendResponseEvent that is listened to by several Zend Framework listeners; one is registered for each type of response we may return:

  • Zend\Mvc\ResponseSender\PhpEnvironmentResponseSender with a priority of -1000
  • Zend\Mvc\ResponseSender\ConsoleResponseSender with a priority of -2000
  • Zend\Mvc\ResponseSender\SimpleStreamResponseSender with a priority of -3000

In our case, the SimpleStreamResponseSender is responsible for sending our response onwards:
class SimpleStreamResponseSender extends AbstractResponseSender
{
    public function sendStream(SendResponseEvent $event)
    {
        if ($event->contentSent()) {
            return $this;
        }
        $response = $event->getResponse();
        $stream   = $response->getStream();
        fpassthru($stream);
        $event->setContentSent();
    }

    public function __invoke(SendResponseEvent $event)
    {
        $response = $event->getResponse();
        if (! $response instanceof Stream) {
            return $this;
        }
        $this->sendHeaders($event);
        $this->sendStream($event);
        $event->stopPropagation(true);
        return $this;
    }
}

Unfortunately, fpassthru does not support resources opened on php://output: attempting to use the SimpleStreamResponseSender results in an error about an invalid resource. Replacing fpassthru with ob_end_flush is all that is required to make it work for our case.

We'll therefore need to create our own response sender to handle Stream responses backed by php://output, as well as a way to differentiate them from a standard Stream response.

class BufferedStream extends Stream {
    public function __construct()
    {
        $this->setHeaders((new Headers())->addHeaders([
            'Content-Description'       => 'File Transfer',
            'Content-Type'              => 'text/csv',
            'Content-Transfer-Encoding' => 'binary',
            'Expires'                   => '0',
            'Cache-Control'             => 'must-revalidate',
            'Pragma'                    => 'public',
        ]));
    }
}

class BufferedStreamResponseSender extends AbstractResponseSender
{
    public function sendStream(SendResponseEvent $event): self
    {
        if ($event->contentSent()) {
            return $this;
        }

        // fpassthru does not work with php://output so we have to flush the buffer instead
        ob_end_flush();
        $event->setContentSent();
        return $this;
    }

    public function __invoke(SendResponseEvent $event)
    {
        $response = $event->getResponse();
        if (! $response instanceof BufferedStream) {
            return $this;
        }

        $this->sendHeaders($event);
        $this->sendStream($event);
        $event->stopPropagation(true);
        return $this;
    }
}

Now that we have our listener and our new BufferedStream, we simply need to update our DataExporter to return the correct response type and attach our listener to the SendResponseEvent.

class ExportController extends AbstractActionController {
    public function exportAction(): BufferedStream {
        $exportQuery = $this->params()->fromPost('query');
        return $this->dataExporter->export($exportQuery);
    }
}

class DataExporter {
    public function export($exportQuery): BufferedStream
    {
        // Get all of the results
        $results = $this->elasticsearchService->getData($exportQuery);

        // Create our Stream
        $outputPointer = fopen('php://output', 'wb');
        $stream = new BufferedStream();
        $stream->setStream($outputPointer);

        // Iterate through data and output to CSV file
        foreach ($results as $result) {
            fputcsv($outputPointer, $result, ',');
        }

        return $stream;
    }
}

class Module implements ConfigProviderInterface, BootstrapListenerInterface
{
    /**
     * @inheritdoc
     */
    public function getConfig()
    {
        return include __DIR__ . '/../config/module.config.php';
    }

    /**
     * @inheritdoc
     */
    public function onBootstrap(EventInterface $e)
    {
        /** @var EventManager $eventManager */
        $eventManager  = $e->getApplication()->getEventManager();
        $eventManager->attach(SendResponseEvent::EVENT_SEND_RESPONSE, new BufferedStreamResponseSender(), 1);
    }
}

We've now removed any need to process data into an intermediary file. Data is processed and then streamed straight to PHP's buffered output.

Elasticsearch offers the Scroll API for processing large amounts of data. It operates in a similar way to a cursor on a traditional database. By replacing our single getData call with a loop utilising the Scroll API, we can minimise the amount of data we have stored in memory at one time.

class DataExporter {
    public function export($exportQuery): BufferedStream
    {
        // Create our Stream
        $outputPointer = fopen('php://output', 'wb');
        $stream = new BufferedStream();
        $stream->setStream($outputPointer);

        // Start our scroll
        $resultSet = $this->elasticsearchService->startScroll($exportQuery);
        $this->writeDataToResource($outputPointer, $resultSet);

        // Continue the scroll till we have no further results
        while (count($resultSet->getResults()) !== 0) {
            $resultSet = $this->elasticsearchService->continueScroll($resultSet);
            $this->writeDataToResource($outputPointer, $resultSet);
        }

        return $stream;
    }

    private function writeDataToResource($outputPointer, ResultSet $resultSet): void
    {
        $results = $resultSet->getResults();
        foreach ($results as $result) {
            fputcsv($outputPointer, $result, ',');
        }
    }
}
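
The startScroll and continueScroll calls above presumably wrap an Elasticsearch client. With the official elasticsearch-php client, the equivalent raw calls might look roughly like this; the index name, page size and scroll timeout are placeholders, $exportQuery and $outputPointer are the variables from the examples above, and depending on the client version the scroll_id may need to be passed in the request body instead:

use Elasticsearch\ClientBuilder;

$client = ClientBuilder::create()->build();

// Open the scroll: an ordinary search with a scroll keep-alive
$response = $client->search([
    'index'  => 'documents',              // placeholder index name
    'scroll' => '1m',                     // keep the scroll context alive between pages
    'size'   => 1000,                     // documents per page
    'body'   => ['query' => $exportQuery], // whatever shape the export query takes
]);

// Process a page, then ask for the next one until a page comes back empty
while (count($response['hits']['hits']) > 0) {
    foreach ($response['hits']['hits'] as $hit) {
        fputcsv($outputPointer, $hit['_source'], ',');
    }

    $response = $client->scroll([
        'scroll_id' => $response['_scroll_id'],
        'scroll'    => '1m',
    ]);
}

Only one page of hits is ever held in memory at a time, which is what keeps the export's memory footprint flat regardless of the document count.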

By utilising streams and the Scroll API, we reduced our memory usage and removed our file usage completely.
