Streaming via a C API presents a conundrum in Haskell: on the one hand, the C code is side-effecting; on the other, we would like to provide a lazy API.

If you are not familiar with the problem of side effects and laziness, have a look at the paper by Wadler (in particular the array updates example) and perhaps the source for the IO monad.

Consider the problem of streaming compression via a C library. We wish to ensure that effects of calling these C functions take place in the correct order (like IO), and we wish to ensure that specific effects take place before values are read; unlike IO we do not care if some effects are not performed if their results are not needed. A function of type decompress :: BSL.ByteString -> IO BSL.ByteString would force all side effects (i.e. allocations) to occur even if we only need part of the result. Morally, we would like the work to occur based on (lazy) evaluation rather than control flow.
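As a minimal sketch of "ordered effects, performed only on demand", here is a toy example that uses nothing but base. The name counterStream is made up for illustration. Every element comes from a read-then-increment on the same STRef, so the effects have to happen in order, yet the stream is infinite and only the demanded prefix is ever computed.

```haskell
import Control.Monad.ST.Lazy (runST)
import Data.STRef.Lazy (modifySTRef, newSTRef, readSTRef)

-- An infinite stream of counter values (hypothetical example).
-- Each element is produced by reading and then bumping one STRef,
-- so the effects are ordered, but they only run as far as the
-- consumer forces the list.
counterStream :: [Int]
counterStream = runST $ do
    ref <- newSTRef 0
    let loop = do
            n <- readSTRef ref
            modifySTRef ref (+ 1)
            (n :) <$> loop
    loop

main :: IO ()
main = print (take 5 counterStream) -- prints [0,1,2,3,4]
```

With the strict ST monad from Control.Monad.ST the same loop would never return, since every effect would have to run before the list became available.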

Concretely, consider decompress from my lz4-hs package:

    decompress :: BSL.ByteString -> BSL.ByteString
    decompress bs = runST $ do -- runST from Control.Monad.ST.Lazy
        let bss = BSL.toChunks bs
        -- Allocate the decompression context and the output buffer once.
        (ctx, buf) <- LazyST.unsafeIOToST $ do
            (err, preCtx) <- lZ4FCreateDecompressionContext lZ4FGetVersion
            -- A decompression context must be freed by the decompression finalizer.
            ctx <- castForeignPtr <$> newForeignPtr lZ4FFreeDecompressionContext (castPtr preCtx)
            check err
            dstBuf <- mallocForeignPtrBytes bufSz
            pure (ctx, dstBuf)
        BSL.fromChunks <$> loop ctx buf bss

        where bufSz :: Integral a => a
              bufSz = 32 * 1024

              loop :: LzDecompressionCtxPtr -> ForeignPtr a -> [BS.ByteString] -> LazyST.ST s [BS.ByteString]
              loop _ _ [] = pure []
              loop ctx buf (b:bs') = do
                  (nxt, res) <- stepChunk ctx buf b
                  case nxt of
                      -- The whole input chunk was consumed; move on to the next one.
                      Nothing   -> (res:) <$> loop ctx buf bs'
                      -- Part of the input chunk is left over; feed it back in.
                      Just next -> (res:) <$> loop ctx buf (next:bs')

              stepChunk :: LzDecompressionCtxPtr -> ForeignPtr a -> BS.ByteString -> LazyST.ST s (Maybe BS.ByteString, BS.ByteString)
              stepChunk !ctx !dst b = LazyST.unsafeIOToST $
                  BS.unsafeUseAsCStringLen b $ \(buf, sz) ->
                      withForeignPtr dst $ \d ->
                          alloca $ \dSzPtr ->
                              alloca $ \szPtr -> do
                                  poke dSzPtr (fromIntegral bufSz)
                                  poke szPtr (fromIntegral sz)
                                  res <- lZ4FDecompress ctx d dSzPtr buf szPtr nullPtr
                                  check res
                                  bRead <- peek szPtr
                                  bWritten <- peek dSzPtr
                                  -- Copy the output out of the shared buffer before the next step overwrites it.
                                  outBs <- BS.packCStringLen (castPtr d, fromIntegral bWritten)
                                  let remBs = if fromIntegral bRead == sz
                                          then Nothing
                                          else Just (BS.drop (fromIntegral bRead) b)
                                  pure (remBs, outBs)

This relies on the following c2hs code:

    {# fun pure LZ4F_getVersion as ^ {} -> `CUInt' #}

    type LZ4FErrorCode = CSize
    {# typedef LZ4F_errorCode_t LZ4FErrorCode #}

    data LzDecompressionCtx

    {# pointer *LZ4F_dctx as LzDecompressionCtxPtr foreign finalizer LZ4F_freeDecompressionContext as ^ -> LzDecompressionCtx #}

    {# fun LZ4F_createDecompressionContext as ^ { alloca- `Ptr LzDecompressionCtx' peek*, `CUInt' } -> `LZ4FErrorCode' #}

    data LzDecompressOptions

    {# pointer *LZ4F_decompressOptions_t as LzDecompressOptionsPtr -> LzDecompressOptions #}

    {# fun LZ4F_decompress as ^ { `LzDecompressionCtxPtr', castPtr `Ptr a', castPtr `Ptr CSize', castPtr `Ptr b', castPtr `Ptr CSize', `LzDecompressOptionsPtr' } -> `CSize' coerce #}

corresponding to the following C header:

    typedef size_t LZ4F_errorCode_t;

    LZ4FLIB_API unsigned LZ4F_getVersion(void);

    typedef struct LZ4F_dctx_s LZ4F_dctx;
    typedef LZ4F_dctx* LZ4F_decompressionContext_t;

    typedef struct {
      unsigned stableDst;
      unsigned reserved[3];
    } LZ4F_decompressOptions_t;

    LZ4FLIB_API LZ4F_errorCode_t LZ4F_createDecompressionContext(LZ4F_dctx** dctxPtr, unsigned version);
    LZ4FLIB_API LZ4F_errorCode_t LZ4F_freeDecompressionContext(LZ4F_dctx* dctx);

    LZ4FLIB_API size_t LZ4F_decompress(LZ4F_dctx* dctx,
                                       void* dstBuffer, size_t* dstSizePtr,
                                       const void* srcBuffer, size_t* srcSizePtr,
                                       const LZ4F_decompressOptions_t* dOptPtr);

Such APIs are common in C: a stateful object LZ4F_decompressionContext_t is behind a pointer; we perform a series of steps that always has the same result, but we need to track side effects since the data pointed to by LzDecompressionCtxPtr is mutating throughout the computation (recall the example of array updates in the Wadler paper).

The effects within each stepChunk need to be bunched together: we must perform lZ4FDecompress before BS.packCStringLen. At the same time, IO [BS.ByteString] is not precisely what we want: it would mean lZ4FDecompress had to be called on every chunk of the input before the first chunk of the output could be read, failing to live up to the promise of laziness.
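To make the cost of the IO-typed version concrete, here is a small sketch with a hypothetical step function standing in for one call to the C library. It only bumps a counter and doubles its input; none of these names come from lz4-hs.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for one decompression step: record the call
-- and return a transformed "chunk".
step :: IORef Int -> Int -> IO Int
step calls x = do
    modifyIORef' calls (+ 1)
    pure (x * 2)

-- IO-style streaming: mapM performs every step before returning the list.
processAll :: IORef Int -> [Int] -> IO [Int]
processAll calls = mapM (step calls)

-- The first output chunk, paired with the number of steps that had to
-- run before it became available.
firstChunkIO :: IO ([Int], Int)
firstChunkIO = do
    calls <- newIORef 0
    out <- processAll calls [1 .. 1000]
    n <- readIORef calls
    pure (take 1 out, n)

main :: IO ()
main = firstChunkIO >>= print -- prints ([2],1000)
```

Only one output chunk is used, but all 1000 steps have already run by the time it can be inspected.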

We resolve this by wrapping the body of stepChunk in unsafeIOToST; that is, each step is lifted into the lazy ST monad, so stepChunk only does work when a new chunk of the resulting BSL.ByteString is demanded.
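The effect of lifting each step into lazy ST can be checked with a toy counter. This is a sketch rather than the lz4-hs code: step is a hypothetical stand-in for a call into C, and in current versions of base unsafeIOToST for lazy ST lives in Control.Monad.ST.Lazy.Unsafe.

```haskell
import Control.Exception (evaluate)
import qualified Control.Monad.ST.Lazy as LazyST
import qualified Control.Monad.ST.Lazy.Unsafe as LazyST (unsafeIOToST)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for one decompression step: record the call
-- and return a transformed "chunk".
step :: IORef Int -> Int -> IO Int
step calls x = do
    modifyIORef' calls (+ 1)
    pure (x * 2)

-- Each step is lifted into lazy ST, so it runs only when its output
-- (or the output of a later step) is forced.
lazyProcess :: IORef Int -> [Int] -> [Int]
lazyProcess calls xs = LazyST.runST (loop xs)
  where
    loop :: [Int] -> LazyST.ST s [Int]
    loop [] = pure []
    loop (y : ys) = do
        r <- LazyST.unsafeIOToST (step calls y)
        (r :) <$> loop ys

-- Force the first two outputs, then report how many steps actually ran.
demo :: IO ([Int], Int)
demo = do
    calls <- newIORef 0
    let firstTwo = take 2 (lazyProcess calls [1 .. 1000])
    _ <- evaluate (sum firstTwo)
    n <- readIORef calls
    pure (firstTwo, n)

main :: IO ()
main = demo >>= print -- prints ([2,4],2)
```

The steps still run in order, because each one depends on the state token of the one before it, but only two of the 1000 are performed.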

In fact, if decompress is not lazy, we get pathological memory overuse. Consider the following program:

    module Main (main) where

    import           Codec.Lz4
    import qualified Data.ByteString.Lazy as BSL
    import           System.FilePath      ((</>))
    import           System.IO.Temp       (withSystemTempDirectory)

    main :: IO ()
    main = sequence_ [ compressDump, decompressDump ]

    decompressDump :: IO ()
    decompressDump = withSystemTempDirectory "lz4" $ \fp ->
        BSL.writeFile (fp </> "valgrind-3.15.0.tar") =<<
            (decompress <$> BSL.readFile "valgrind-3.15.0.tar.lz4")

    compressDump :: IO ()
    compressDump = withSystemTempDirectory "lz4" $ \fp ->
        BSL.writeFile (fp </> "valgrind-3.15.0.tar.lz4") =<<
            (compress <$> BSL.readFile "valgrind-3.15.0.tar")

With the lazy ST monad, we get the following heap profile:

Had we used the strict ST monad:

This shows that laziness is necessary to have sensible memory use. Contrary to superstition, laziness is not synonymous with worse performance; some space leaks are strictness-induced.

Moreover, I think this provides a superior API. C libraries handle streaming compression in various ways (a stateful decoder, callbacks); in Haskell, we use a familiar lazy linked list. The streaming and non-streaming cases are essentially the same in Haskell (have a look at lz4-hs), which is not the case in C (see lz4frame.h and lz4.h).

Technique

I haven't seen this technique put forward or evaluated anywhere before; in fact, I only figured the above out by reading Herbert Valerio Riedel's source code for lzma.

So, for reference, if you want to use this technique:

  1. Use ForeignPtr over Ptr. Edward Z Yang has a series on c2hs, and you can look at the c2hs wiki as well.

    I have not had any success getting lazy streaming to work with ordinary Ptrs and explicit frees.

  2. Each step of the loop should return a LazyST.ST s a rather than an IO a; one gets a LazyST.ST s a from an IO a via LazyST.unsafeIOToST.

    There is no need to use unsafeIOToST more than once per iteration.

Also of note, I have not had any success using unsafeInterleaveIO between steps as zstd does.
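The two steps listed above can be put together in a skeleton that needs no C library. This is only a sketch under stated assumptions: Ctx, stepChunk and runningSum are hypothetical names, and the "library state" is a single byte behind a ForeignPtr, holding a running sum modulo 256 that carries over from one chunk to the next.

```haskell
import qualified Control.Monad.ST.Lazy as LazyST
import qualified Control.Monad.ST.Lazy.Unsafe as LazyST (unsafeIOToST)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Word (Word8)
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtr, withForeignPtr)
import Foreign.Storable (peek, poke)

-- Hypothetical stand-in for a C library's stateful context: one byte of
-- mutable state behind a ForeignPtr, released by the garbage collector.
type Ctx = ForeignPtr Word8

-- One step: replace each byte with the running sum so far, then store
-- the updated sum back into the context.
stepChunk :: Ctx -> BS.ByteString -> IO BS.ByteString
stepChunk ctx b = withForeignPtr ctx $ \p -> do
    acc <- peek p
    let (acc', out) = BS.mapAccumL (\a w -> let a' = a + w in (a', a')) acc b
    poke p acc'
    pure out

-- Lazy streaming: one unsafeIOToST per iteration, all inside lazy ST.
runningSum :: BSL.ByteString -> BSL.ByteString
runningSum bs = LazyST.runST $ do
    ctx <- LazyST.unsafeIOToST $ do
        fp <- mallocForeignPtr :: IO Ctx
        withForeignPtr fp (\p -> poke p 0)
        pure fp
    BSL.fromChunks <$> loop ctx (BSL.toChunks bs)
  where
    loop :: Ctx -> [BS.ByteString] -> LazyST.ST s [BS.ByteString]
    loop _ [] = pure []
    loop ctx (c : cs) = do
        out <- LazyST.unsafeIOToST (stepChunk ctx c)
        (out :) <$> loop ctx cs

main :: IO ()
main = do
    -- State carries across chunk boundaries: prints [1,3,6,10,15]
    print (BSL.unpack (runningSum (BSL.fromChunks [BS.pack [1, 2, 3], BS.pack [4, 5]])))
    -- Works on infinite input because steps run on demand: prints [1,2,3,4]
    print (BSL.unpack (BSL.take 4 (runningSum (BSL.cycle (BSL.pack [1])))))
```

The second call is the interesting one: the input never ends, so an IO or strict ST version of runningSum could not return at all.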