dph-prim-seq: shift streamers into their own modules and cleanup
authorBen Lippmeier <benl@ouroborus.net>
Fri, 9 Dec 2011 04:40:56 +0000 (15:40 +1100)
committerBen Lippmeier <benl@ouroborus.net>
Fri, 9 Dec 2011 04:40:56 +0000 (15:40 +1100)
12 files changed:
dph-prim-seq/Data/Array/Parallel/Unlifted.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Combinators.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Streams.hs [deleted file]
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/USel.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Vector.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Stream.hs [new file with mode: 0644]
dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Elems.hs [new file with mode: 0644]
dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Ixs.hs [new file with mode: 0644]
dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Segments.hs [new file with mode: 0644]
dph-prim-seq/Data/Array/Parallel/Unlifted/Vectors.hs [moved from dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Vectors.hs with 84% similarity]
dph-prim-seq/dph-prim-seq.cabal

index ef997f1..07ffd1d 100644 (file)
@@ -11,13 +11,11 @@ import Data.Array.Parallel.Unlifted.Sequential.USel
 import Data.Array.Parallel.Unlifted.Sequential.Basics
 import Data.Array.Parallel.Unlifted.Sequential.Combinators
 import Data.Array.Parallel.Unlifted.Sequential.Sums
-import Data.Array.Parallel.Unlifted.Sequential
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd   as USegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd  as USSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.UVSegd  as UVSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.Vector  as U
-import qualified Data.Array.Parallel.Unlifted.Sequential.Vectors as US
-import qualified Data.Array.Parallel.Unlifted.Sequential.Streams as US
+import qualified Data.Array.Parallel.Unlifted.Vectors            as US
 
 #include "DPH_Interface.h"
 
@@ -54,9 +52,9 @@ unsafeIndex             = U.unsafeIndex
 unsafeIndexs_avs        = Prelude.error "dph-prim-seq: unsafeIndex_avs wrapper not defined"
 
 extract                 = U.extract
-unsafeExtracts_nss      = US.unsafeExtractsFromNestedWithUSSegd
-unsafeExtracts_ass      = US.unsafeExtractsFromVectorsWithUSSegd
-unsafeExtracts_avs      = US.unsafeExtractsFromVectorsWithUVSegd
+unsafeExtracts_nss      = unsafeExtractsFromNestedUSSegd
+unsafeExtracts_ass      = unsafeExtractsFromVectorsUSSegd
+unsafeExtracts_avs      = Prelude.error "dph-prim-seq: unsafeExtracts_avs: not done"
 drop                    = U.drop
 
 
index 99f4f27..33ef421 100644 (file)
@@ -28,16 +28,15 @@ module Data.Array.Parallel.Unlifted.Sequential
         , minimumSU
 
         -- * Pack and Combine
-        , combineSU)
+        , combineSU
+        
+        , unsafeExtractsFromNestedUSSegd
+        , unsafeExtractsFromVectorsUSSegd)
 where
 import Data.Array.Parallel.Unlifted.Sequential.Basics
 import Data.Array.Parallel.Unlifted.Sequential.Combinators
 import Data.Array.Parallel.Unlifted.Sequential.Sums
 import Data.Array.Parallel.Unlifted.Sequential.USegd            ()
 import Data.Array.Parallel.Unlifted.Sequential.USel             ()
-import Data.Array.Parallel.Unlifted.Sequential.USSegd           (USSegd)
 import Data.Array.Parallel.Unlifted.Sequential.UVSegd           ()
-import Data.Array.Parallel.Unlifted.Sequential.Vector           as U
-import qualified Data.Array.Parallel.Unlifted.Sequential.USegd  as USegd
-import qualified Data.Vector                                    as V
 import Prelude hiding (zip)
index 83311c3..5326907 100644 (file)
@@ -8,18 +8,21 @@ module Data.Array.Parallel.Unlifted.Sequential.Combinators (
   foldl1SU,     foldl1SSU,
   fold1SU,      fold1SSU,
   foldlRU,
-  combineSU
+  combineSU,
+  
+  unsafeExtractsFromNestedUSSegd,
+  unsafeExtractsFromVectorsUSSegd
 ) where
 import Data.Array.Parallel.Stream
-import Data.Array.Parallel.Unlifted.Sequential.Streams
+import Data.Array.Parallel.Unlifted.Stream
+import Data.Array.Parallel.Unlifted.Vectors                     as US
 import Data.Array.Parallel.Unlifted.Sequential.Vector           as U
-import Data.Array.Parallel.Unlifted.Sequential.Vectors          as US
 import Data.Array.Parallel.Unlifted.Sequential.USSegd           (USSegd)
 import Data.Array.Parallel.Unlifted.Sequential.USegd            (USegd)
 import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd as USSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd  as USegd
 import qualified Data.Vector                                    as V
-import Debug.Trace
+import qualified Data.Vector.Generic                            as G
 
 
 -- foldl ----------------------------------------------------------------------
@@ -40,7 +43,7 @@ foldlSSU :: (Unbox a, Unboxes a, Unbox b)
 foldlSSU f z ssegd xss
         = unstream
         $ foldSS f z    (stream (USSegd.takeLengths ssegd))
-                        (unsafeStreamSegsFromVectors Nothing ssegd xss)
+                        (unsafeStreamSegsFromVectorsUSSegd xss ssegd)
 
 
 -- fold -----------------------------------------------------------------------
@@ -79,7 +82,7 @@ foldl1SSU :: (Unbox a, Unboxes a)
 foldl1SSU f ssegd xxs
         = unstream
         $ fold1SS f     (stream (USSegd.takeLengths ssegd))
-                        (unsafeStreamSegsFromVectors Nothing ssegd xxs)
+                        (unsafeStreamSegsFromVectorsUSSegd xxs ssegd)
 
 
 -- fold1 ----------------------------------------------------------------------
@@ -117,3 +120,25 @@ combineSU bs xd xs yd ys
                     (stream (USegd.takeLengths xd)) (stream xs)
                     (stream (USegd.takeLengths yd)) (stream ys)
 
+
+
+-- Extracts wrappers ---------------------------------------------------------
+-- | Copy segments from a `Vectors` and concatenate them into a new array.
+unsafeExtractsFromNestedUSSegd
+        :: (U.Unbox a)
+        => USSegd -> V.Vector (Vector a) -> U.Vector a
+
+unsafeExtractsFromNestedUSSegd ussegd vectors
+        = G.unstream $ unsafeStreamSegsFromNestedUSSegd vectors ussegd
+{-# INLINE_U unsafeExtractsFromNestedUSSegd #-}
+
+
+-- | Copy segments from a `Vectors` and concatenate them into a new array.
+unsafeExtractsFromVectorsUSSegd
+        :: (Unboxes a, U.Unbox a)
+        => USSegd -> Vectors a -> U.Vector a
+
+unsafeExtractsFromVectorsUSSegd ussegd vectors
+        = G.unstream $ unsafeStreamSegsFromVectorsUSSegd vectors ussegd
+{-# INLINE_U unsafeExtractsFromVectorsUSSegd #-}
+
diff --git a/dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Streams.hs b/dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Streams.hs
deleted file mode 100644 (file)
index 9d8657b..0000000
+++ /dev/null
@@ -1,295 +0,0 @@
-{-# LANGUAGE CPP #-}
-#include "fusion-phases.h"
-module Data.Array.Parallel.Unlifted.Sequential.Streams
-        ( -- * Segment streamers
-          unsafeStreamSegsFromNested
-        , unsafeStreamSegsFromVectors
-
-          -- * Element streamers
-        , unsafeStreamElemsFromVectors
-        , unsafeStreamElemsFromVectorsVSegd
-
-          -- * SrcIxs transformers
-        , unsafeStreamSrcIxsThroughVSegids
-        , unsafeStreamSrcIxsThroughUSSegd
-
-          -- * Extracts wrappers
-        , unsafeExtractsFromNestedWithUSSegd
-        , unsafeExtractsFromVectorsWithUSSegd
-        , unsafeExtractsFromVectorsWithUVSegd)
-where
-import Data.Vector.Fusion.Stream.Size
-import Data.Vector.Fusion.Stream.Monadic
-import Data.Vector.Unboxed                                       (Unbox,   Vector)
-import Data.Array.Parallel.Unlifted.Sequential.Vectors           (Unboxes, Vectors)
-import Data.Array.Parallel.Unlifted.Sequential.USSegd            (USSegd(..))
-import Data.Array.Parallel.Unlifted.Sequential.UVSegd            (UVSegd(..))
-import qualified Data.Array.Parallel.Unlifted.Sequential.Vectors as US
-import qualified Data.Array.Parallel.Unlifted.Sequential.USegd   as USegd
-import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd  as USSegd
-import qualified Data.Array.Parallel.Unlifted.Sequential.UVSegd  as UVSegd
-import qualified Data.Vector.Unboxed                             as U
-import qualified Data.Vector.Generic                             as G
-import qualified Data.Vector                                     as V
-import qualified Data.Primitive.ByteArray                        as P
-
-
--- Segment streamers ------------------------------------------------------------------------------
--- | Stream some physical segments from many data arrays.
--- 
---   * TODO: make this more efficient, and fix fusion.
---           We should be able to eliminate a lot of the indexing happening in the 
---           inner loop by being cleverer about the loop state.
---
---   * TODO: If this is contiguous then we can stream the lot without worrying 
---           about jumping between segments. EXCEPT that this information must be
---           statically visible else streamSegs won't fuse, so we can't have an 
---           ifThenElse checking the manifest flag.
-
-unsafeStreamSegsFromNested
-        :: (Unbox a, Monad m)
-        => USSegd               -- ^ Segment descriptor defining segments base
-                                --   on source vectors.
-        -> V.Vector (Vector a)  -- ^ Source arrays.
-        -> Stream m a
-
-unsafeStreamSegsFromNested
-        ussegd@(USSegd _ starts sources usegd)
-        pdatas
- = let  
-        -- length of each segment
-        pseglens        = USegd.takeLengths usegd
-        -- We've finished streaming this pseg
-        {-# INLINE_INNER fn #-}
-        fn (pseg, ix)
-         -- All psegs are done.
-         | pseg >= USSegd.length ussegd
-         = return $ Done
-         
-         -- Current pseg is done
-         | ix   >= pseglens `U.unsafeIndex` pseg 
-         = return $ Skip (pseg + 1, 0)
-
-         -- Stream an element from this pseg
-         | otherwise
-         = let  !srcid   = sources `U.unsafeIndex` pseg
-                !pdata   = pdatas  `V.unsafeIndex` srcid
-                !start   = starts  `U.unsafeIndex` pseg
-                !result  = pdata   `U.unsafeIndex` (start + ix)
-           in   return $ Yield result (pseg, ix + 1)
-
-   in   Stream fn (0, 0) Unknown
-{-# INLINE_STREAM unsafeStreamSegsFromNested #-}
-
-
-
--- Vectors ----------------------------------------------------------------------------------------
--- | Stream segments from a `Vectors`.
--- 
---   * There must be at least one segment in the `USSegd`, but this is not checked.
--- 
---   * No bounds checking is done for the `USSegd`.
---
---   * TODO: refactor this to take a stream of segment ids and indices.
---
-unsafeStreamSegsFromVectors
-        :: (Unboxes a, Monad m)
-        => Maybe (U.Vector Int) -- ^ Virtual segment identifiers
-                                --   if `Nothing` this is assumed to be [0, 1, 2 ... segs - 1]
-        -> USSegd               -- ^ Scattered segment descriptor
-        -> Vectors a            -- ^ Vectors holding source data.
-        -> Stream m a
-
-unsafeStreamSegsFromVectors 
-        mVSegids
-        ussegd@(USSegd _ segStarts segSources usegd) 
-        vectors
- = segStarts `seq` segSources `seq` usegd `seq` vectors `seq`
-   let  -- Length of each segment
-        !segLens        = USegd.takeLengths usegd
-
-        -- Total number of segments.
-        !segsTotal      = USSegd.length ussegd
-        -- Total number of elements to stream.
-        !elements       = USegd.takeElements usegd
-        -- Convert a virtual segment id to a physical one.
-        {-# INLINE toPSeg #-}
-        toPSeg segid
-         = case mVSegids of
-                 Nothing        -> segid
-                 Just vsegids   -> U.unsafeIndex vsegids segid
-        -- seg, ix of that seg in usegd, length of seg, elem in seg
-        {-# INLINE_INNER fnSeg #-}
-        fnSeg (ixSeg, baSeg, ixEnd, ixElem)
-         = ixSeg `seq` baSeg `seq`
-           if ixElem >= ixEnd                   -- Was that the last elem in the current seg?
-            then if ixSeg + 1 >= segsTotal      -- Was that last seg?
-
-                       -- That was the last seg, we're done.
-                  then return $ Done
-                  
-                       -- Move to the next seg.
-                  else let ixSeg'       = ixSeg + 1
-                           ixPSeg'      = toPSeg ixSeg'
-                           sourceSeg    = U.unsafeIndex segSources ixPSeg'
-                           startSeg     = U.unsafeIndex segStarts  ixPSeg'
-                           lenSeg       = U.unsafeIndex segLens    ixPSeg'
-                           (arr, startArr, lenArr) 
-                                        = US.unsafeIndexUnpack vectors sourceSeg
-                       in  return $ Skip
-                                  ( ixSeg'
-                                  , arr
-                                  , startArr + startSeg + lenSeg
-                                  , startArr + startSeg)
-
-                 -- Stream the next element from the segment.
-            else let !result  = P.indexByteArray baSeg ixElem
-                 in  return   $ Yield result (ixSeg, baSeg, ixEnd, ixElem + 1)
-
-        -- Starting state of the stream.
-        !initState
-         = let  ixPSeg    = toPSeg 0
-                sourceSeg = U.unsafeIndex segSources ixPSeg
-                startSeg  = U.unsafeIndex segStarts  ixPSeg
-                lenSeg    = U.unsafeIndex segLens    ixPSeg
-                (arr, startArr, lenArr) 
-                          = US.unsafeIndexUnpack vectors sourceSeg
-           in   ( 0                              -- starting segment id
-                , arr                            -- starting segment data
-                , startArr + startSeg + lenSeg   -- segment end
-                , startArr + startSeg)           -- segment start ix
-
-        -- It's important that we set the result stream size, so Data.Vector
-        -- doesn't need to add code to grow the result when it overflows.
-   in   Stream fnSeg initState (Exact elements) 
-{-# INLINE_STREAM unsafeStreamSegsFromVectors #-}
-
-
--- | Take a stream of chunk and chunk element indices, look them up from
---   some vectors, and produce a stream of elements.
-unsafeStreamElemsFromVectors 
-        :: (Monad m, Unboxes a) 
-        => Vectors a -> Stream m (Int, Int) -> Stream m a
-
-unsafeStreamElemsFromVectors vectors (Stream mkStep s0 size)
- = vectors `seq` Stream mkStep' s0 size
-  where
-        {-# INLINE_INNER mkStep' #-}
-        mkStep' s
-         = do   step    <- mkStep s
-                case step of
-                 Yield (ix1, ix2) s' -> return $ Yield (US.unsafeIndex2 vectors ix1 ix2) s'
-                 Skip s'             -> return $ Skip s'
-                 Done                -> return Done
-{-# INLINE_STREAM unsafeStreamElemsFromVectors #-}
-
-
--- | Take a stream of virtual segment ids and element indices, 
---   pass them through a `UVSegd` to get physical segment and element indices, 
---   and produce a stream of elements.
-unsafeStreamElemsFromVectorsVSegd
-        :: (Monad m, Unboxes a)
-        => Vectors a -> UVSegd -> Stream m (Int, Int) -> Stream m a
-
-unsafeStreamElemsFromVectorsVSegd vectors vsegd vsrcixs
- = let  !vsegids = UVSegd.takeVSegids vsegd
-        !ussegd  = UVSegd.takeUSSegd  vsegd
-   in   unsafeStreamElemsFromVectors        vectors
-         $ unsafeStreamSrcIxsThroughUSSegd  ussegd
-         $ unsafeStreamSrcIxsThroughVSegids vsegids
-         $ vsrcixs
-{-# INLINE_STREAM unsafeStreamElemsFromVectorsVSegd #-}
-
-
--- VSegd Streamers ------------------------------------------------------------
--- | Take a stream of virtual segment and segment element indices,
---   and convert it to a stream of physical segment and segment element indices.
-unsafeStreamSrcIxsThroughVSegids
-        :: Monad m
-        => U.Vector Int -> Stream m (Int, Int) -> Stream m (Int, Int)
-
-unsafeStreamSrcIxsThroughVSegids vsegids (Stream mkStep s0 size)
- = vsegids `seq` Stream mkStep' s0 size
- where
-        {-# INLINE_INNER mkStep' #-}
-        mkStep' s
-         = do   step    <- mkStep s
-                case step of
-                 Yield (ix1, ix2) s'
-                  -> let !pseg  = U.unsafeIndex vsegids ix1
-                     in  return $ Yield (pseg, ix2) s'
-                 
-                 Skip s' -> return $ Skip s'
-                 Done    -> return Done
-{-# INLINE_STREAM unsafeStreamSrcIxsThroughVSegids #-}
-
-
--- SSegd Streamers ------------------------------------------------------------
--- | Take a stream of segment and segment element indices,
---   and convert it to a stream of chunk and chunk element indices.
-unsafeStreamSrcIxsThroughUSSegd 
-        :: Monad m
-        => USSegd -> Stream m (Int, Int) -> Stream m (Int, Int)
-        
-unsafeStreamSrcIxsThroughUSSegd ussegd (Stream mkStep s0 size)
- = ussegd `seq` Stream mkStep' s0 size
- where
-        !sources = USSegd.takeSources ussegd
-        !starts  = USSegd.takeStarts  ussegd
-   
-        {-# INLINE_INNER mkStep' #-}
-        mkStep' s
-         = do   step    <- mkStep s
-                case step of
-                 Yield (ix1, ix2) s'
-                  -> let !src    = U.unsafeIndex sources ix1
-                         !start  = U.unsafeIndex starts  ix1
-                     in  return $ Yield (src, start + ix2) s'
-                 
-                 Skip s' -> return $ Skip s'
-                 Done    -> return Done
-{-# INLINE_STREAM unsafeStreamSrcIxsThroughUSSegd #-}
-
-
--- Extracts wrappers ---------------------------------------------------------
--- | Copy segments from a `Vectors` and concatenate them into a new array.
-unsafeExtractsFromNestedWithUSSegd
-        :: (U.Unbox a)
-        => USSegd -> V.Vector (Vector a) -> U.Vector a
-
-unsafeExtractsFromNestedWithUSSegd ussegd vectors
-        = G.unstream $ unsafeStreamSegsFromNested ussegd vectors
-{-# INLINE_U unsafeExtractsFromNestedWithUSSegd #-}
-
-
--- | Copy segments from a `Vectors` and concatenate them into a new array.
-unsafeExtractsFromVectorsWithUSSegd
-        :: (Unboxes a, U.Unbox a)
-        => USSegd -> Vectors a -> U.Vector a
-
-unsafeExtractsFromVectorsWithUSSegd ussegd vectors
-        = G.unstream $ unsafeStreamSegsFromVectors Nothing ussegd vectors
-{-# INLINE_U unsafeExtractsFromVectorsWithUSSegd #-}
-
-
--- | Copy segments from a `Vectors` and concatenate them into a new array.
---
---   TODO: avoid creating vsegids if possible.
---   TODO: we should refactor this to have an intermediate stream of
---         slice descriptors.
---
-unsafeExtractsFromVectorsWithUVSegd
-        :: (Unboxes a, U.Unbox a)
-        => UVSegd -> Vectors a -> U.Vector a
-
-unsafeExtractsFromVectorsWithUVSegd uvsegd vectors
-        = G.unstream 
-        $ unsafeStreamSegsFromVectors 
-                (Just (UVSegd.takeVSegids uvsegd))
-                (UVSegd.takeUSSegd  uvsegd)
-                vectors
-{-# INLINE_U unsafeExtractsFromVectorsWithUVSegd #-}
index 0241fde..702f94b 100644 (file)
@@ -111,8 +111,8 @@ tagsToIndices2 tags
 
 
 mapAccumS :: (acc -> a -> (acc,b)) -> acc -> S.Stream a -> S.Stream b
-mapAccumS f acc (Stream step s n)
-  = Stream step' (acc,s) n
+mapAccumS f acc0 (Stream step s0 n)
+  = Stream step' (acc0,s0) n
   where
    {-# INLINE_INNER step' #-}
    step' (acc,s) 
index a68484a..7bb6fee 100644 (file)
@@ -1,5 +1,5 @@
 {-# LANGUAGE ScopedTypeVariables, BangPatterns, CPP #-}
-
+{-# OPTIONS  -w #-}     -- TODO: enable warnings
 #include "fusion-phases.h"
 
 -- | Wrappers for primitives defined in @Data.Vector@.
diff --git a/dph-prim-seq/Data/Array/Parallel/Unlifted/Stream.hs b/dph-prim-seq/Data/Array/Parallel/Unlifted/Stream.hs
new file mode 100644 (file)
index 0000000..830f963
--- /dev/null
@@ -0,0 +1,17 @@
+
+module Data.Array.Parallel.Unlifted.Stream
+        ( -- * Index streamers.
+          unsafeStreamSrcIxsThroughVSegids
+        , unsafeStreamSrcIxsThroughUSSegd
+
+          -- * Element streamers.
+        , unsafeStreamElemsFromVectors
+        , unsafeStreamElemsFromVectorsVSegd
+
+          -- * Segment streamers.
+        , unsafeStreamSegsFromNestedUSSegd
+        , unsafeStreamSegsFromVectorsUSSegd)
+where
+import Data.Array.Parallel.Unlifted.Stream.Ixs
+import Data.Array.Parallel.Unlifted.Stream.Elems
+import Data.Array.Parallel.Unlifted.Stream.Segments
diff --git a/dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Elems.hs b/dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Elems.hs
new file mode 100644 (file)
index 0000000..ab8bb42
--- /dev/null
@@ -0,0 +1,48 @@
+{-# LANGUAGE CPP, NoMonomorphismRestriction #-}
+#include "fusion-phases.h"
+module Data.Array.Parallel.Unlifted.Stream.Elems
+        ( unsafeStreamElemsFromVectors
+        , unsafeStreamElemsFromVectorsVSegd)
+where
+import Data.Array.Parallel.Unlifted.Stream.Ixs
+import Data.Vector.Fusion.Stream.Monadic
+import Data.Array.Parallel.Unlifted.Vectors                      (Unboxes, Vectors)
+import Data.Array.Parallel.Unlifted.Sequential.UVSegd            (UVSegd(..))
+import qualified Data.Array.Parallel.Unlifted.Vectors            as US
+import qualified Data.Array.Parallel.Unlifted.Sequential.UVSegd  as UVSegd
+
+
+-- | Take a stream of chunk and chunk element indices, look them up from
+--   some vectors, and produce a stream of elements.
+unsafeStreamElemsFromVectors 
+        :: (Monad m, Unboxes a) 
+        => Vectors a -> Stream m (Int, Int) -> Stream m a
+
+unsafeStreamElemsFromVectors vectors (Stream mkStep s0 size0)
+ = vectors `seq` Stream mkStep' s0 size0
+  where
+        {-# INLINE_INNER mkStep' #-}
+        mkStep' s
+         = do   step    <- mkStep s
+                case step of
+                 Yield (ix1, ix2) s' -> return $ Yield (US.unsafeIndex2 vectors ix1 ix2) s'
+                 Skip s'             -> return $ Skip s'
+                 Done                -> return Done
+{-# INLINE_STREAM unsafeStreamElemsFromVectors #-}
+
+
+-- | Take a stream of virtual segment ids and element indices, 
+--   pass them through a `UVSegd` to get physical segment and element indices, 
+--   and produce a stream of elements.
+unsafeStreamElemsFromVectorsVSegd
+        :: (Monad m, Unboxes a)
+        => Vectors a -> UVSegd -> Stream m (Int, Int) -> Stream m a
+
+unsafeStreamElemsFromVectorsVSegd vectors vsegd vsrcixs
+ = let  !vsegids = UVSegd.takeVSegids vsegd
+        !ussegd  = UVSegd.takeUSSegd  vsegd
+   in   unsafeStreamElemsFromVectors        vectors
+         $ unsafeStreamSrcIxsThroughUSSegd  ussegd
+         $ unsafeStreamSrcIxsThroughVSegids vsegids
+         $ vsrcixs
+{-# INLINE_STREAM unsafeStreamElemsFromVectorsVSegd #-}
diff --git a/dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Ixs.hs b/dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Ixs.hs
new file mode 100644 (file)
index 0000000..557a876
--- /dev/null
@@ -0,0 +1,60 @@
+{-# LANGUAGE CPP, NoMonomorphismRestriction #-}
+#include "fusion-phases.h"
+module Data.Array.Parallel.Unlifted.Stream.Ixs
+        ( unsafeStreamSrcIxsThroughVSegids
+        , unsafeStreamSrcIxsThroughUSSegd)
+where
+import Data.Vector.Fusion.Stream.Monadic
+import Data.Array.Parallel.Unlifted.Sequential.USSegd            (USSegd(..))
+import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd  as USSegd
+import qualified Data.Vector.Unboxed                             as U
+
+
+-- VSegd Streamers ------------------------------------------------------------
+-- | Take a stream of virtual segment and segment element indices,
+--   and convert it to a stream of physical segment and segment element indices.
+unsafeStreamSrcIxsThroughVSegids
+        :: Monad m
+        => U.Vector Int -> Stream m (Int, Int) -> Stream m (Int, Int)
+
+unsafeStreamSrcIxsThroughVSegids vsegids (Stream mkStep s0 size0)
+ = vsegids `seq` Stream mkStep' s0 size0
+ where
+        {-# INLINE_INNER mkStep' #-}
+        mkStep' s
+         = do   step    <- mkStep s
+                case step of
+                 Yield (ix1, ix2) s'
+                  -> let !pseg  = U.unsafeIndex vsegids ix1
+                     in  return $ Yield (pseg, ix2) s'
+                 
+                 Skip s' -> return $ Skip s'
+                 Done    -> return Done
+{-# INLINE_STREAM unsafeStreamSrcIxsThroughVSegids #-}
+
+
+-- SSegd Streamers ------------------------------------------------------------
+-- | Take a stream of segment and segment element indices,
+--   and convert it to a stream of chunk and chunk element indices.
+unsafeStreamSrcIxsThroughUSSegd 
+        :: Monad m
+        => USSegd -> Stream m (Int, Int) -> Stream m (Int, Int)
+        
+unsafeStreamSrcIxsThroughUSSegd ussegd (Stream mkStep s0 size0)
+ = ussegd `seq` Stream mkStep' s0 size0
+ where
+        !sources = USSegd.takeSources ussegd
+        !starts  = USSegd.takeStarts  ussegd
+   
+        {-# INLINE_INNER mkStep' #-}
+        mkStep' s
+         = do   step    <- mkStep s
+                case step of
+                 Yield (ix1, ix2) s'
+                  -> let !src    = U.unsafeIndex sources ix1
+                         !start  = U.unsafeIndex starts  ix1
+                     in  return $ Yield (src, start + ix2) s'
+                 
+                 Skip s' -> return $ Skip s'
+                 Done    -> return Done
+{-# INLINE_STREAM unsafeStreamSrcIxsThroughUSSegd #-}
diff --git a/dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Segments.hs b/dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Segments.hs
new file mode 100644 (file)
index 0000000..499b25a
--- /dev/null
@@ -0,0 +1,137 @@
+{-# LANGUAGE CPP, NoMonomorphismRestriction #-}
+#include "fusion-phases.h"
+module Data.Array.Parallel.Unlifted.Stream.Segments
+        ( unsafeStreamSegsFromNestedUSSegd
+        , unsafeStreamSegsFromVectorsUSSegd)
+where
+import Data.Vector.Fusion.Stream.Size
+import Data.Vector.Fusion.Stream.Monadic
+import Data.Vector.Unboxed                                       (Unbox,   Vector)
+import Data.Array.Parallel.Unlifted.Vectors                      (Unboxes, Vectors)
+import Data.Array.Parallel.Unlifted.Sequential.USSegd            (USSegd(..))
+import qualified Data.Array.Parallel.Unlifted.Vectors            as US
+import qualified Data.Array.Parallel.Unlifted.Sequential.USegd   as USegd
+import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd  as USSegd
+import qualified Data.Vector.Unboxed                             as U
+import qualified Data.Vector                                     as V
+import qualified Data.Primitive.ByteArray                        as P
+
+
+-- Nested -----------------------------------------------------------------------------------------
+-- | Stream some physical segments from many data arrays.
+-- 
+--   * TODO: make this more efficient, and fix fusion.
+--           We should be able to eliminate a lot of the indexing happening in the 
+--           inner loop by being cleverer about the loop state.
+--
+--   * TODO: If this is contiguous then we can stream the lot without worrying 
+--           about jumping between segments. EXCEPT that this information must be
+--           statically visible else streamSegs won't fuse, so we can't have an 
+--           ifThenElse checking the manifest flag.
+
+unsafeStreamSegsFromNestedUSSegd
+        :: (Unbox a, Monad m)
+        => V.Vector (Vector a)  -- ^ Source arrays.
+        -> USSegd               -- ^ Segment descriptor defining segments base on source vectors.
+        -> Stream m a
+
+unsafeStreamSegsFromNestedUSSegd
+        pdatas
+        ussegd@(USSegd _ starts sources usegd)
+ = let  
+        -- length of each segment
+        pseglens        = USegd.takeLengths usegd
+        -- We've finished streaming this pseg
+        {-# INLINE_INNER fn #-}
+        fn (pseg, ix)
+         -- All psegs are done.
+         | pseg >= USSegd.length ussegd
+         = return $ Done
+         
+         -- Current pseg is done
+         | ix   >= pseglens `U.unsafeIndex` pseg 
+         = return $ Skip (pseg + 1, 0)
+
+         -- Stream an element from this pseg
+         | otherwise
+         = let  !srcid   = sources `U.unsafeIndex` pseg
+                !pdata   = pdatas  `V.unsafeIndex` srcid
+                !start   = starts  `U.unsafeIndex` pseg
+                !result  = pdata   `U.unsafeIndex` (start + ix)
+           in   return $ Yield result (pseg, ix + 1)
+
+   in   Stream fn (0, 0) Unknown
+{-# INLINE_STREAM unsafeStreamSegsFromNestedUSSegd #-}
+
+
+-- Vectors ----------------------------------------------------------------------------------------
+-- | Stream segments from a `Vectors`.
+-- 
+--   * There must be at least one segment in the `USSegd`, but this is not checked.
+-- 
+--   * No bounds checking is done for the `USSegd`.
+--
+unsafeStreamSegsFromVectorsUSSegd
+        :: (Unboxes a, Monad m)
+        => Vectors a            -- ^ Vectors holding source data.
+        -> USSegd               -- ^ Scattered segment descriptor
+        -> Stream m a
+
+unsafeStreamSegsFromVectorsUSSegd
+        vectors
+        ussegd@(USSegd _ segStarts segSources usegd) 
+ = segStarts `seq` segSources `seq` usegd `seq` vectors `seq`
+   let  -- Length of each segment
+        !segLens        = USegd.takeLengths usegd
+
+        -- Total number of segments.
+        !segsTotal      = USSegd.length ussegd
+        -- Total number of elements to stream.
+        !elements       = USegd.takeElements usegd
+        -- seg, ix of that seg in usegd, length of seg, elem in seg
+        {-# INLINE_INNER fnSeg #-}
+        fnSeg (ixSeg, baSeg, ixEnd, ixElem)
+         = ixSeg `seq` baSeg `seq`
+           if ixElem >= ixEnd                   -- Was that the last elem in the current seg?
+            then if ixSeg + 1 >= segsTotal      -- Was that last seg?
+
+                       -- That was the last seg, we're done.
+                  then return $ Done
+                  
+                       -- Move to the next seg.
+                  else let ixSeg'       = ixSeg + 1
+                           sourceSeg    = U.unsafeIndex segSources ixSeg'
+                           startSeg     = U.unsafeIndex segStarts  ixSeg'
+                           lenSeg       = U.unsafeIndex segLens    ixSeg'
+                           (arr, startArr, _) 
+                                        = US.unsafeIndexUnpack vectors sourceSeg
+                       in  return $ Skip
+                                  ( ixSeg'
+                                  , arr
+                                  , startArr + startSeg + lenSeg
+                                  , startArr + startSeg)
+
+                 -- Stream the next element from the segment.
+            else let !result  = P.indexByteArray baSeg ixElem
+                 in  return   $ Yield result (ixSeg, baSeg, ixEnd, ixElem + 1)
+
+        -- Starting state of the stream.
+        !initState
+         = let  sourceSeg = U.unsafeIndex segSources 0
+                startSeg  = U.unsafeIndex segStarts  0
+                lenSeg    = U.unsafeIndex segLens    0
+                (arr, startArr, _) 
+                          = US.unsafeIndexUnpack vectors sourceSeg
+           in   ( 0                              -- starting segment id
+                , arr                            -- starting segment data
+                , startArr + startSeg + lenSeg   -- segment end
+                , startArr + startSeg)           -- segment start ix
+
+        -- It's important that we set the result stream size, so Data.Vector
+        -- doesn't need to add code to grow the result when it overflows.
+   in   Stream fnSeg initState (Exact elements) 
+{-# INLINE_STREAM unsafeStreamSegsFromVectorsUSSegd #-}
+
@@ -10,7 +10,7 @@
 --   * TODO: We currently only allow primitive types to be in a Vectors, but 
 --           in future we'll want `Vectors` of tuples etc.
 --
-module Data.Array.Parallel.Unlifted.Sequential.Vectors 
+module Data.Array.Parallel.Unlifted.Vectors 
         ( Vectors(..)
         , Unboxes
         , empty
@@ -28,22 +28,15 @@ import qualified Data.Primitive.Array                           as P
 import qualified Data.Primitive.Types                           as P
 
 import qualified Data.Vector.Generic                            as G
-import qualified Data.Vector.Fusion.Stream                      as S
-import qualified Data.Vector.Fusion.Stream.Size                 as S
-import qualified Data.Vector.Fusion.Stream.Monadic              as M
 import qualified Data.Vector.Primitive                          as R
 import qualified Data.Vector.Unboxed                            as U
 import qualified Data.Vector                                    as V
 import Data.Vector.Unboxed                                      (Unbox)
-
-import qualified Data.Array.Parallel.Unlifted.Sequential.USegd  as USegd
-import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd as USSegd
-import Data.Array.Parallel.Unlifted.Sequential.USSegd           (USSegd(..))
 import System.IO.Unsafe
 import Prelude  hiding (length)
-import Debug.Trace
 import Data.Word
 
+
 -- | Class of element types that can be used in a `Vectors`
 class R.Prim a => Unboxes a
 instance Unboxes Int
@@ -84,11 +77,11 @@ singleton vec
  $ do   R.MVector start len mbaData <- R.unsafeThaw $ G.convert vec
         baData  <- P.unsafeFreezeByteArray mbaData
         
-        mbaStarts       <- P.newByteArray 1
+        mbaStarts       <- P.newByteArray 4
         P.writeByteArray mbaStarts 0 start
         baStarts        <- P.unsafeFreezeByteArray mbaStarts
         
-        mbaLengths      <- P.newByteArray 1
+        mbaLengths      <- P.newByteArray 4
         P.writeByteArray mbaLengths 0 len
         baLengths       <- P.unsafeFreezeByteArray mbaLengths
         
@@ -122,15 +115,15 @@ unsafeIndex (Vectors _ starts lens arrs) ix
 -- | Retrieve a single element from a `Vectors`, 
 --   given the outer and inner indices.
 unsafeIndex2 :: Unboxes a => Vectors a -> Int -> Int -> a
-unsafeIndex2 (Vectors _ starts lens arrs) ix1 ix2
- = (arrs `P.indexArray` ix1) `P.indexByteArray` (starts `P.indexByteArray` ix1 + ix2)
+unsafeIndex2 (Vectors _ starts _ arrs) ix1 ix2
+ = (arrs `P.indexArray` ix1) `P.indexByteArray` ((starts `P.indexByteArray` ix1) + ix2)
 {-# INLINE_U unsafeIndex2 #-}
 
 
 -- | Retrieve an inner array from a `Vectors`, returning the array data, 
 --   starting index in the data, and vector length.
 unsafeIndexUnpack :: Unboxes a => Vectors a -> Int -> (P.ByteArray, Int, Int)
-unsafeIndexUnpack (Vectors n starts lens arrs) ix
+unsafeIndexUnpack (Vectors _ starts lens arrs) ix
  =      ( arrs   `P.indexArray` ix
         , starts `P.indexByteArray` ix
         , lens   `P.indexByteArray` ix)
@@ -141,7 +134,7 @@ unsafeIndexUnpack (Vectors n starts lens arrs) ix
 --
 --   * Important: appending two `Vectors` involes work proportional to
 --     the length of the outer arrays, not the size of the inner ones.
-append :: Unboxes a => Vectors a -> Vectors a -> Vectors a
+append :: (Unboxes a, Unbox a, Show a) => Vectors a -> Vectors a -> Vectors a
 append  (Vectors len1 starts1 lens1 chunks1)
         (Vectors len2 starts2 lens2 chunks2)
  = unsafePerformIO
@@ -169,7 +162,9 @@ append  (Vectors len1 starts1 lens1 chunks1)
         P.copyArray     maChunks len1       chunks2   0 len2
         chunks'         <- P.unsafeFreezeArray maChunks
         
-        return  $ Vectors len' starts' lens' chunks'
+        
+        let result      = Vectors len' starts' lens' chunks'
+        return  $ result
 {-# INLINE_U append #-}
 
 
@@ -178,7 +173,7 @@ fromVector :: (Unboxes a, Unbox a) => V.Vector (U.Vector a) -> Vectors a
 fromVector vecs
  = unsafePerformIO
  $ do   let len     = V.length vecs
-        let (barrs, vstarts, vlens)     = V.unzip3 $ V.map unpackUVector vecs
+        let (_, vstarts, vlens) = V.unzip3 $ V.map unpackUVector vecs
         let (baStarts, _, _)    = unpackUVector $ V.convert vstarts
         let (baLens,   _, _)    = unpackUVector $ V.convert vlens
         mchunks                 <- P.newArray len (error "Vectors: fromVector argh!")
@@ -212,14 +207,3 @@ unpackUVector vec
         return  (ba, start, len)
 {-# INLINE_U unpackUVector #-}
 
-
--- | Pack some array data, starting index and vector length unto an unboxed vector.
-packUVector :: (Unbox a, P.Prim a) => P.ByteArray -> Int -> Int -> U.Vector a
-packUVector ba start len
- = unsafePerformIO
- $ do   mba             <- P.unsafeThawByteArray ba
-        pvec            <- R.unsafeFreeze $ R.MVector start len mba
-        return $ G.convert pvec
-{-# INLINE_U packUVector #-}
-
-
index 32feeb9..b46ac3b 100644 (file)
@@ -18,9 +18,12 @@ Library
         Data.Array.Parallel.Unlifted.Sequential.USSegd
         Data.Array.Parallel.Unlifted.Sequential.UVSegd
         Data.Array.Parallel.Unlifted.Sequential.Vector
-        Data.Array.Parallel.Unlifted.Sequential.Vectors
-        Data.Array.Parallel.Unlifted.Sequential.Streams
         Data.Array.Parallel.Unlifted.Sequential
+        Data.Array.Parallel.Unlifted.Stream.Ixs
+        Data.Array.Parallel.Unlifted.Stream.Elems
+        Data.Array.Parallel.Unlifted.Stream.Segments
+        Data.Array.Parallel.Unlifted.Stream
+        Data.Array.Parallel.Unlifted.Vectors
         Data.Array.Parallel.Unlifted
         
   Other-Modules:
@@ -35,7 +38,7 @@ Library
               BangPatterns, MagicHash, UnboxedTuples, TypeOperators,
               NoMonomorphismRestriction
   GHC-Options:
-        -Odph
+        -Odph -Wall
         -funbox-strict-fields -fcpr-off
 
   Build-Depends: