dph-prim-seq: add stream functions to make indexvs
authorBen Lippmeier <benl@ouroborus.net>
Tue, 6 Dec 2011 05:02:39 +0000 (16:02 +1100)
committerBen Lippmeier <benl@ouroborus.net>
Tue, 6 Dec 2011 05:02:39 +0000 (16:02 +1100)
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Streams.hs

index c7bc63a..9d8657b 100644 (file)
@@ -1,12 +1,25 @@
 {-# LANGUAGE CPP #-}
 #include "fusion-phases.h"
 module Data.Array.Parallel.Unlifted.Sequential.Streams
-        ( unsafeStreamSegsFromNested
-        , unsafeExtractsFromNestedWithUSSegd
+        ( -- * Segment streamers
+          unsafeStreamSegsFromNested
         , unsafeStreamSegsFromVectors
+
+          -- * Element streamers
+        , unsafeStreamElemsFromVectors
+        , unsafeStreamElemsFromVectorsVSegd
+
+          -- * SrcIxs transformers
+        , unsafeStreamSrcIxsThroughVSegids
+        , unsafeStreamSrcIxsThroughUSSegd
+
+          -- * Extracts wrappers
+        , unsafeExtractsFromNestedWithUSSegd
         , unsafeExtractsFromVectorsWithUSSegd
         , unsafeExtractsFromVectorsWithUVSegd)
 where
+import Data.Vector.Fusion.Stream.Size
+import Data.Vector.Fusion.Stream.Monadic
 import Data.Vector.Unboxed                                       (Unbox,   Vector)
 import Data.Array.Parallel.Unlifted.Sequential.Vectors           (Unboxes, Vectors)
 import Data.Array.Parallel.Unlifted.Sequential.USSegd            (USSegd(..))
@@ -15,16 +28,13 @@ import qualified Data.Array.Parallel.Unlifted.Sequential.Vectors as US
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd   as USegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd  as USSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.UVSegd  as UVSegd
-import qualified Data.Vector.Fusion.Stream                       as S
-import qualified Data.Vector.Fusion.Stream.Size                  as S
-import qualified Data.Vector.Fusion.Stream.Monadic               as M
 import qualified Data.Vector.Unboxed                             as U
 import qualified Data.Vector.Generic                             as G
 import qualified Data.Vector                                     as V
 import qualified Data.Primitive.ByteArray                        as P
 
 
--- Nested -----------------------------------------------------------------------------------------
+-- Segment streamers ------------------------------------------------------------------------------
 -- | Stream some physical segments from many data arrays.
 -- 
 --   * TODO: make this more efficient, and fix fusion.
@@ -37,11 +47,11 @@ import qualified Data.Primitive.ByteArray                        as P
 --           ifThenElse checking the manifest flag.
 
 unsafeStreamSegsFromNested
-        :: Unbox a
+        :: (Unbox a, Monad m)
         => USSegd               -- ^ Segment descriptor defining segments base
                                 --   on source vectors.
         -> V.Vector (Vector a)  -- ^ Source arrays.
-        -> S.Stream a
+        -> Stream m a
 
 unsafeStreamSegsFromNested
         ussegd@(USSegd _ starts sources usegd)
@@ -55,11 +65,11 @@ unsafeStreamSegsFromNested
         fn (pseg, ix)
          -- All psegs are done.
          | pseg >= USSegd.length ussegd
-         = return $ S.Done
+         = return $ Done
          
          -- Current pseg is done
          | ix   >= pseglens `U.unsafeIndex` pseg 
-         = return $ S.Skip (pseg + 1, 0)
+         = return $ Skip (pseg + 1, 0)
 
          -- Stream an element from this pseg
          | otherwise
@@ -67,19 +77,11 @@ unsafeStreamSegsFromNested
                 !pdata   = pdatas  `V.unsafeIndex` srcid
                 !start   = starts  `U.unsafeIndex` pseg
                 !result  = pdata   `U.unsafeIndex` (start + ix)
-           in   return $ S.Yield result (pseg, ix + 1)
+           in   return $ Yield result (pseg, ix + 1)
 
-   in   M.Stream fn (0, 0) S.Unknown
+   in   Stream fn (0, 0) Unknown
 {-# INLINE_STREAM unsafeStreamSegsFromNested #-}
 
--- | Copy segments from a `Vectors` and concatenate them into a new array.
-unsafeExtractsFromNestedWithUSSegd
-        :: (U.Unbox a)
-        => USSegd -> V.Vector (Vector a) -> U.Vector a
-
-unsafeExtractsFromNestedWithUSSegd ussegd vectors
-        = G.unstream $ unsafeStreamSegsFromNested ussegd vectors
-{-# INLINE_U unsafeExtractsFromNestedWithUSSegd #-}
 
 
 -- Vectors ----------------------------------------------------------------------------------------
@@ -88,13 +90,16 @@ unsafeExtractsFromNestedWithUSSegd ussegd vectors
 --   * There must be at least one segment in the `USSegd`, but this is not checked.
 -- 
 --   * No bounds checking is done for the `USSegd`.
+--
+--   * TODO: refactor this to take a stream of segment ids and indices.
+--
 unsafeStreamSegsFromVectors
-        :: Unboxes a
+        :: (Unboxes a, Monad m)
         => Maybe (U.Vector Int) -- ^ Virtual segment identifiers
                                 --   if `Nothing` this is assumed to be [0, 1, 2 ... segs - 1]
         -> USSegd               -- ^ Scattered segment descriptor
         -> Vectors a            -- ^ Vectors holding source data.
-        -> S.Stream a
+        -> Stream m a
 
 unsafeStreamSegsFromVectors 
         mVSegids
@@ -125,7 +130,7 @@ unsafeStreamSegsFromVectors
             then if ixSeg + 1 >= segsTotal      -- Was that last seg?
 
                        -- That was the last seg, we're done.
-                  then return $ S.Done
+                  then return $ Done
                   
                        -- Move to the next seg.
                   else let ixSeg'       = ixSeg + 1
@@ -135,7 +140,7 @@ unsafeStreamSegsFromVectors
                            lenSeg       = U.unsafeIndex segLens    ixPSeg'
                            (arr, startArr, lenArr) 
                                         = US.unsafeIndexUnpack vectors sourceSeg
-                       in  return $ S.Skip
+                       in  return $ Skip
                                   ( ixSeg'
                                   , arr
                                   , startArr + startSeg + lenSeg
@@ -143,7 +148,7 @@ unsafeStreamSegsFromVectors
 
                  -- Stream the next element from the segment.
             else let !result  = P.indexByteArray baSeg ixElem
-                 in  return   $ S.Yield result (ixSeg, baSeg, ixEnd, ixElem + 1)
+                 in  return   $ Yield result (ixSeg, baSeg, ixEnd, ixElem + 1)
 
         -- Starting state of the stream.
         !initState
@@ -160,10 +165,107 @@ unsafeStreamSegsFromVectors
 
         -- It's important that we set the result stream size, so Data.Vector
         -- doesn't need to add code to grow the result when it overflows.
-   in   M.Stream fnSeg initState (S.Exact elements) 
+   in   Stream fnSeg initState (Exact elements) 
 {-# INLINE_STREAM unsafeStreamSegsFromVectors #-}
 
 
+-- | Take a stream of chunk and chunk element indices, look them up from
+--   some vectors, and produce a stream of elements.
+unsafeStreamElemsFromVectors 
+        :: (Monad m, Unboxes a) 
+        => Vectors a -> Stream m (Int, Int) -> Stream m a
+
+unsafeStreamElemsFromVectors vectors (Stream mkStep s0 size)
+ = vectors `seq` Stream mkStep' s0 size
+  where
+        {-# INLINE_INNER mkStep' #-}
+        mkStep' s
+         = do   step    <- mkStep s
+                case step of
+                 Yield (ix1, ix2) s' -> return $ Yield (US.unsafeIndex2 vectors ix1 ix2) s'
+                 Skip s'             -> return $ Skip s'
+                 Done                -> return Done
+{-# INLINE_STREAM unsafeStreamElemsFromVectors #-}
+
+
+-- | Take a stream of virtual segment ids and element indices, 
+--   pass them through a `UVSegd` to get physical segment and element indices, 
+--   and produce a stream of elements.
+unsafeStreamElemsFromVectorsVSegd
+        :: (Monad m, Unboxes a)
+        => Vectors a -> UVSegd -> Stream m (Int, Int) -> Stream m a
+
+unsafeStreamElemsFromVectorsVSegd vectors vsegd vsrcixs
+ = let  !vsegids = UVSegd.takeVSegids vsegd
+        !ussegd  = UVSegd.takeUSSegd  vsegd
+   in   unsafeStreamElemsFromVectors        vectors
+         $ unsafeStreamSrcIxsThroughUSSegd  ussegd
+         $ unsafeStreamSrcIxsThroughVSegids vsegids
+         $ vsrcixs
+{-# INLINE_STREAM unsafeStreamElemsFromVectorsVSegd #-}
+
+
+-- VSegd Streamers ------------------------------------------------------------
+-- | Take a stream of virtual segment and segment element indices,
+--   and convert it to a stream of physical segment and segment element indices.
+unsafeStreamSrcIxsThroughVSegids
+        :: Monad m
+        => U.Vector Int -> Stream m (Int, Int) -> Stream m (Int, Int)
+
+unsafeStreamSrcIxsThroughVSegids vsegids (Stream mkStep s0 size)
+ = vsegids `seq` Stream mkStep' s0 size
+ where
+        {-# INLINE_INNER mkStep' #-}
+        mkStep' s
+         = do   step    <- mkStep s
+                case step of
+                 Yield (ix1, ix2) s'
+                  -> let !pseg  = U.unsafeIndex vsegids ix1
+                     in  return $ Yield (pseg, ix2) s'
+                 
+                 Skip s' -> return $ Skip s'
+                 Done    -> return Done
+{-# INLINE_STREAM unsafeStreamSrcIxsThroughVSegids #-}
+
+
+-- SSegd Streamers ------------------------------------------------------------
+-- | Take a stream of segment and segment element indices,
+--   and convert it to a stream of chunk and chunk element indices.
+unsafeStreamSrcIxsThroughUSSegd 
+        :: Monad m
+        => USSegd -> Stream m (Int, Int) -> Stream m (Int, Int)
+        
+unsafeStreamSrcIxsThroughUSSegd ussegd (Stream mkStep s0 size)
+ = ussegd `seq` Stream mkStep' s0 size
+ where
+        !sources = USSegd.takeSources ussegd
+        !starts  = USSegd.takeStarts  ussegd
+   
+        {-# INLINE_INNER mkStep' #-}
+        mkStep' s
+         = do   step    <- mkStep s
+                case step of
+                 Yield (ix1, ix2) s'
+                  -> let !src    = U.unsafeIndex sources ix1
+                         !start  = U.unsafeIndex starts  ix1
+                     in  return $ Yield (src, start + ix2) s'
+                 
+                 Skip s' -> return $ Skip s'
+                 Done    -> return Done
+{-# INLINE_STREAM unsafeStreamSrcIxsThroughUSSegd #-}
+
+
+-- Extracts wrappers ---------------------------------------------------------
+-- | Copy segments from a `Vectors` and concatenate them into a new array.
+unsafeExtractsFromNestedWithUSSegd
+        :: (U.Unbox a)
+        => USSegd -> V.Vector (Vector a) -> U.Vector a
+
+unsafeExtractsFromNestedWithUSSegd ussegd vectors
+        = G.unstream $ unsafeStreamSegsFromNested ussegd vectors
+{-# INLINE_U unsafeExtractsFromNestedWithUSSegd #-}
+
+
 -- | Copy segments from a `Vectors` and concatenate them into a new array.
 unsafeExtractsFromVectorsWithUSSegd
         :: (Unboxes a, U.Unbox a)
@@ -177,6 +279,9 @@ unsafeExtractsFromVectorsWithUSSegd ussegd vectors
 -- | Copy segments from a `Vectors` and concatenate them into a new array.
 --
 --   TODO: avoid creating vsegids if possible.
+--   TODO: we should refactor this to have an intermediate stream of
+--         slice descriptors.
+--
 unsafeExtractsFromVectorsWithUVSegd
         :: (Unboxes a, U.Unbox a)
         => UVSegd -> Vectors a -> U.Vector a
@@ -188,6 +293,3 @@ unsafeExtractsFromVectorsWithUVSegd uvsegd vectors
                 (UVSegd.takeUSSegd  uvsegd)
                 vectors
 {-# INLINE_U unsafeExtractsFromVectorsWithUVSegd #-}
-
-
-