dph-lifted-vseg: push new segment streamers into lifted library
authorBen Lippmeier <benl@ouroborus.net>
Mon, 5 Dec 2011 05:45:34 +0000 (16:45 +1100)
committerBen Lippmeier <benl@ouroborus.net>
Mon, 5 Dec 2011 05:45:34 +0000 (16:45 +1100)
18 files changed:
dph-lifted-vseg/Data/Array/Parallel/PArray/PData/Double.hs
dph-lifted-vseg/Data/Array/Parallel/PArray/PData/Int.hs
dph-lifted-vseg/Data/Array/Parallel/PArray/PData/Nested.hs
dph-lifted-vseg/Data/Array/Parallel/PArray/PData/Sum2.hs
dph-lifted-vseg/Data/Array/Parallel/PArray/PData/Word8.hs
dph-prim-interface/Data/Array/Parallel/Unlifted.hs
dph-prim-interface/interface/DPH_Header.h
dph-prim-interface/interface/DPH_Interface.h
dph-prim-par/Data/Array/Parallel/Unlifted.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Parallel.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Parallel/Segmented.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Parallel/UPSSegd.hs
dph-prim-seq/Data/Array/Parallel/Unlifted.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Combinators.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/USSegd.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Vectors.hs
dph-prim-seq/dph-prim-seq.cabal

index c685430..cfe1a49 100644 (file)
@@ -92,7 +92,7 @@ instance PR Double where
 
   {-# INLINE_PDATA extractsPR #-}
   extractsPR (PDoubles arrs) ssegd
-        = PDouble (U.unsafeExtract_ss ssegd arrs)
+        = PDouble (U.unsafeExtracts_ass ssegd arrs)
                 
 
   -- Pack and Combine ---------------------------
index 93609cb..da5ba43 100644 (file)
@@ -79,7 +79,7 @@ instance PR Int where
 
   {-# INLINE_PDATA extractsPR #-}
   extractsPR (PInts arrs) ssegd
-        = PInt $ U.unsafeExtract_ss ssegd arrs
+        = PInt $ U.unsafeExtracts_ass ssegd arrs
 
 
 
index edd8cbd..82591be 100644 (file)
@@ -357,7 +357,7 @@ instance PR a => PR (PArray a) where
      let segsrcs        = U.sourcesOfSSegd ussegd
          seglens        = U.lengthsOfSSegd ussegd
 
-         vsegids_src    = U.unsafeExtract_vs ussegd (V.map pnested_vsegids arrs)
+         vsegids_src    = U.unsafeExtracts_nss ussegd (V.map pnested_vsegids arrs)
          srcids'        = U.replicate_s (U.lengthsToSegd seglens) segsrcs
 
          -- See Note: psrcoffset
index 06ebf56..472791d 100644 (file)
@@ -256,7 +256,7 @@ instance (PR a, PR b) => PR (Sum2 a b)  where
          --  and rebuild the result selector indices based on these tags.
          -- tags'       = [1     1     0     0     1     1     0      0     1]
          -- sel'        = [0     1     0     1     2     3     2      3     4]   
-         tags'          = U.unsafeExtract_vs ssegd tagss
+         tags'          = U.unsafeExtracts_nss ssegd tagss
          sel'           = U.tagsToSel2 tags'
 
          -- Extract the indices of the data elements we want.
@@ -264,7 +264,7 @@ instance (PR a, PR b) => PR (Sum2 a b)  where
          -- (result)      [R 60, R 70, L 80, L 20, R 30, R 90, L 100, L 40, R 50]
          --                ----------------0 ----------1 -----------2 ----3 ----4
          -- indices'    = [  1     2     0     0     0     3     1      1     0 ]
-         indices'       = U.unsafeExtract_vs ssegd (V.map U.indicesSel2 sels)
+         indices'       = U.unsafeExtracts_nss ssegd (V.map U.indicesSel2 sels)
 
          -- Count the number of L and R elements for each segment,
          --  then scan them to produce the starting index of each segment in the
index 4d2ef82..9271489 100644 (file)
@@ -88,7 +88,7 @@ instance PR Word8 where
 
   {-# INLINE_PDATA extractsPR #-}
   extractsPR (PWord8s arrs) ssegd
-        = PWord8 $ U.unsafeExtract_ss ssegd arrs
+        = PWord8 $ U.unsafeExtracts_ass ssegd arrs
 
 
   -- Pack and Combine ---------------------------
index 342a29b..26aacaa 100644 (file)
@@ -89,9 +89,10 @@ length          = P.length
 (!:)            = (P.!!)
 unsafeIndex     = (P.!!)
 
-extract xs i n   = P.take n (P.drop i xs)
-unsafeExtract_ss = notImplemented "unsafeExtract_ss"
-unsafeExtract_vs = notImplemented "unsafeExtract_vs"
+extract xs i n     = P.take n (P.drop i xs)
+unsafeExtracts_nss = notImplemented "unsafeExtract_nss"
+unsafeExtracts_ass = notImplemented "unsafeExtract_ass"
+unsafeExtracts_avs = notImplemented "unsafeExtract_avs"
 
 drop            = P.drop
 
index 0643ecf..6da5338 100644 (file)
@@ -21,7 +21,10 @@ module Data.Array.Parallel.Unlifted (
   -- * Projections
   length,
   (!:),     unsafeIndex,
-  extract,  unsafeExtract_ss,  unsafeExtract_vs,
+  extract,
+  unsafeExtracts_nss,
+  unsafeExtracts_ass,
+  unsafeExtracts_avs,
   drop,
   
   -- * Update
index 4259c81..9cf228e 100644 (file)
@@ -216,24 +216,33 @@ extract :: Elt a
 {-# INLINE_BACKEND extract #-}
 
 
+-- | O(n). Segmented extract, from a vector of arrays.
+--   TODO: This is a transitory interface, we are refactoring code to always
+--         use the previous form.
+unsafeExtracts_nss
+        :: Elt a
+        => SSegd
+        -> VV.Vector (Array a)
+        -> Array a
+{-# INLINE_BACKEND unsafeExtracts_nss #-}
+
+
 -- | O(n). Segmented extract.
-unsafeExtract_ss 
+unsafeExtracts_ass
         :: (Elt a, Elts a)
         => SSegd        -- ^ `SSegd` defining the slices to extract.
         -> Arrays a     -- ^ Source arrays.
         -> Array a
-{-# INLINE_BACKEND unsafeExtract_ss #-}
+{-# INLINE_BACKEND unsafeExtracts_ass #-}
 
 
--- | O(n). Segmented extract, from a vector of arrays.
---   TODO: This is a transitory interface, we are refactoring code to always
---         use the previous form.
-unsafeExtract_vs 
-        :: Elt a
-        => SSegd
-        -> VV.Vector (Array a)
+-- | O(n). Segmented extract.
+unsafeExtracts_avs
+        :: (Elt a, Elts a)
+        => VSegd        -- ^ `VSegd` defining the slices to extract.
+        -> Arrays a     -- ^ Source arrays.
         -> Array a
-{-# INLINE_BACKEND unsafeExtract_vs #-}
+{-# INLINE_BACKEND unsafeExtracts_avs #-}
 
 
 -- | O(n). Drop some elements from the front of an array, 
index 12470ac..b5f4a7d 100644 (file)
@@ -113,8 +113,9 @@ extract arr i n
         =  tracePrim (TraceExtract (Seq.length arr) i n)
         $! Seq.extract arr i n
 
-unsafeExtract_ss        = UPSSegd.unsafeExtractsWithP
-unsafeExtract_vs        = extractsUP
+unsafeExtracts_nss      = unsafeExtractsFromNestedWithUPSSegd
+unsafeExtracts_ass      = unsafeExtractsFromVectorsWithUPSSegd
+unsafeExtracts_avs      = unsafeExtractsFromVectorsWithUPVSegd
 
 drop n arr
         =  tracePrim (TraceDrop n (Seq.length arr))
index b64a57f..248a9f5 100644 (file)
 --         Parallel modules, and the names are in the same order as in those
 --         modules.
 --
-module Data.Array.Parallel.Unlifted.Parallel (
-  -- * Basics
-  lengthUP,
-  nullUP,
-  emptyUP,
-  indexedUP,
-  replicateUP,
-  repeatUP,
-  interleaveUP,
+module Data.Array.Parallel.Unlifted.Parallel 
+        ( -- * Basics
+          lengthUP
+        , nullUP
+        , emptyUP
+        , indexedUP
+        , replicateUP
+        , repeatUP
+        , interleaveUP
   
-  -- * Combinators
-  mapUP,
-  filterUP,
-  packUP,
-  combineUP,  combine2UP,
-  zipWithUP,
-  foldUP,     fold1UP,
-  foldlUP,    foldl1UP,
-  scanUP,
-  extractsUP,
+          -- * Combinators
+        , mapUP
+        , filterUP
+        , packUP
+        , combineUP,  combine2UP
+        , zipWithUP
+        , foldUP,     fold1UP
+        , foldlUP,    foldl1UP
+        , scanUP
   
-  -- * Enum
-  enumFromToUP,
-  enumFromThenToUP,
-  enumFromStepLenUP,
-  enumFromStepLenEachUP,
+          -- * Enum
+        , enumFromToUP
+        , enumFromThenToUP
+        , enumFromStepLenUP
+        , enumFromStepLenEachUP
   
-  -- * Permute
-  bpermuteUP,
-  updateUP,
-
-  -- * Segmented
-  replicateRSUP,
-  appendSUP,
-  foldRUP,
-  sumRUP,
-
-  -- * Subarrays
-  dropUP,
+          -- * Permute
+        , bpermuteUP
+        , updateUP
+
+          -- * Segmented
+        , replicateRSUP
+        , appendSUP
+        , foldRUP
+        , sumRUP
+        , unsafeExtractsFromNestedWithUPSSegd
+        , unsafeExtractsFromVectorsWithUPSSegd
+        , unsafeExtractsFromVectorsWithUPVSegd
+
+          -- * Subarrays
+        , dropUP
   
-  -- * Sums
-  andUP,
-  orUP,
-  allUP,     anyUP,
-  sumUP,     productUP,
-  maximumUP, maximumByUP,
-  maximumIndexByUP
-where
+          -- * Sums
+        , andUP
+        , orUP
+        , allUP,     anyUP
+        , sumUP,     productUP
+        , maximumUP, maximumByUP
+        , maximumIndexByUP)
+where
 import Data.Array.Parallel.Unlifted.Parallel.Basics
 import Data.Array.Parallel.Unlifted.Parallel.Combinators
 import Data.Array.Parallel.Unlifted.Parallel.Enum
@@ -72,49 +74,3 @@ import qualified Data.Array.Parallel.Unlifted.Parallel.UPSegd   as UPSegd
 import qualified Data.Array.Parallel.Unlifted.Parallel.UPSSegd  as UPSSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd as USSegd
 import qualified Data.Vector                                    as VS
-
--- | O(n). Segmented extract.
-
--- TODO: make a uvsegd version that backs off to this one if the uvsegd is manifest.
---
--- TODO: zip srcids ixBase and startsix before calling replicate_s
---       don't want to replicate_s multiple times on same segd.
---
--- TODO: pass in a projection function to get the correct array from the vector, 
---       to avoid unpackig all the arrays from PDatas with a big map traversal.
---
-{-# INLINE_UP extractsUP #-}
-extractsUP :: Unbox a => UPSSegd -> VS.Vector (Vector a) -> Vector a
-extractsUP !ssegd !arrs
- = let  !segd      = UPSegd.fromLengths $ UPSSegd.takeLengths ssegd
-        -- source array ids to load from
-        !srcids'   = UPSegd.replicateWithP segd $ UPSSegd.takeSources ssegd
-
-        -- base indices in the source array to load from
-        !baseixs   = UPSegd.replicateWithP segd $ UPSSegd.takeStarts  ssegd
-        
-        -- starting indices for each of the segments in the result
-        !startixs' = UPSegd.replicateWithP segd $ UPSSegd.takeIndices ssegd
-
-        {-# INLINE get #-}
-        get (ixDst, ixSegDst) (ixSegSrcBase, srcid)
-         = let  !arr    = arrs `VS.unsafeIndex` srcid
-                !ix     = ixDst - ixSegDst + ixSegSrcBase
-           in   arr `US.unsafeIndex` ix
-       
-       -- total length of the result
-        !dstLen    = UPSSegd.takeElements ssegd
-         
-   in   zipWithUP get
-                (US.zip (enumFromToUP 0 (dstLen - 1)) startixs')
-                (US.zip baseixs                       srcids')
-
-
-
-
-
-
-
-
-
index 160d763..b45eafc 100644 (file)
@@ -3,24 +3,37 @@
 #include "fusion-phases.h"
 
 -- | Parallel combinators for segmented unboxed arrays
-module Data.Array.Parallel.Unlifted.Parallel.Segmented (
-  replicateRSUP, appendSUP,
-  foldRUP,
-  sumRUP
-) where
+module Data.Array.Parallel.Unlifted.Parallel.Segmented 
+        ( replicateRSUP
+        , appendSUP
+        , foldRUP
+        , sumRUP
+        , unsafeExtractsFromNestedWithUPSSegd
+        , unsafeExtractsFromVectorsWithUPSSegd
+        , unsafeExtractsFromVectorsWithUPVSegd)
+where
 import Data.Array.Parallel.Unlifted.Distributed
 import Data.Array.Parallel.Unlifted.Parallel.Basics
 import Data.Array.Parallel.Unlifted.Parallel.UPSegd                     (UPSegd)
+import Data.Array.Parallel.Unlifted.Parallel.UPSSegd                    (UPSSegd)
+import Data.Array.Parallel.Unlifted.Parallel.UPVSegd                    (UPVSegd)
 import Data.Array.Parallel.Unlifted.Sequential.USegd                    (USegd)
+import Data.Array.Parallel.Unlifted.Sequential.USSegd                   (USSegd)
 import Data.Array.Parallel.Unlifted.Sequential.Vector                   as Seq
+import Data.Array.Parallel.Unlifted.Sequential.Vectors                  (Vectors)
 import qualified Data.Array.Parallel.Unlifted.Parallel.UPSegd           as UPSegd
+import qualified Data.Array.Parallel.Unlifted.Parallel.UPSSegd          as UPSSegd
+import qualified Data.Array.Parallel.Unlifted.Parallel.UPVSegd          as UPVSegd
+import qualified Data.Array.Parallel.Unlifted.Sequential.Vectors        as US
+import qualified Data.Array.Parallel.Unlifted.Sequential.Streams        as US
 import qualified Data.Array.Parallel.Unlifted.Sequential                as Seq
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd          as USegd
+import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd         as USSegd
 
 import Data.Vector.Fusion.Stream.Monadic ( Stream(..), Step(..) )
 import Data.Vector.Fusion.Stream.Size    ( Size(..) )
-import qualified Data.Vector.Fusion.Stream              as S
-
+import qualified Data.Vector.Fusion.Stream                              as S
+import qualified Data.Vector                                            as V
 
 -- replicate ------------------------------------------------------------------
 
@@ -132,3 +145,46 @@ sumRUP :: (Num e, Unbox e) => Int -> Vector e -> Vector e
 sumRUP = foldRUP (+) 0
 
 
+-- Extracts -------------------------------------------------------------------
+-- | Copy segments from a nested vectors and concatenate them into a new array.
+unsafeExtractsFromNestedWithUPSSegd
+        :: Unbox a
+        => UPSSegd -> V.Vector (Vector a) -> Vector a
+
+unsafeExtractsFromNestedWithUPSSegd upssegd vectors
+        = Seq.unstream 
+        $ US.unsafeStreamSegsFromNested 
+                (UPSSegd.takeUSSegd upssegd)
+                vectors
+{-# INLINE_U unsafeExtractsFromNestedWithUPSSegd #-}
+
+
+-- | TODO: make this parallel.
+unsafeExtractsFromVectorsWithUPSSegd
+        :: (Unbox a, US.Unboxes a)
+        => UPSSegd
+        -> Vectors a
+        -> Vector a
+
+unsafeExtractsFromVectorsWithUPSSegd upssegd vectors
+        = US.unsafeExtractsFromVectorsWithUSSegd
+                (UPSSegd.takeUSSegd upssegd) 
+                vectors
+{-# INLINE_UP unsafeExtractsFromVectorsWithUPSSegd #-}
+
+
+-- | TODO: make this parallel.
+unsafeExtractsFromVectorsWithUPVSegd
+        :: (Unbox a, US.Unboxes a)
+        => UPVSegd
+        -> Vectors a
+        -> Vector a
+
+unsafeExtractsFromVectorsWithUPVSegd upvsegd vectors
+        = Seq.unstream 
+        $ US.unsafeStreamSegsFromVectors 
+                (Just (UPVSegd.takeVSegidsRedundant upvsegd))
+                (UPSSegd.takeUSSegd $ UPVSegd.takeUPSSegd upvsegd)
+                vectors
+{-# INLINE_UP unsafeExtractsFromVectorsWithUPVSegd #-}
+
index 717edff..29ddfe7 100644 (file)
@@ -31,10 +31,7 @@ module Data.Array.Parallel.Unlifted.Parallel.UPSSegd (
   foldWithP,
   fold1WithP,
   sumWithP,
-  foldSegsWithP,
-  
-  -- * Segmented Projections
-  unsafeExtractsWithP
+  foldSegsWithP,  
 ) where
 import Data.Array.Parallel.Pretty                                       hiding (empty)
 import Data.Array.Parallel.Unlifted.Distributed
@@ -331,15 +328,3 @@ fixupFold f !mrs !dcarry = go 1
                            go (i + 1)
       where
         (k,c) = indexD dcarry i
-
--- Extracts -------------------------------------------------------------------
--- | TODO: make this parallel.
-{-# INLINE_UP unsafeExtractsWithP #-}
-unsafeExtractsWithP 
-        :: (Unbox a, Unboxes a)
-        => UPSSegd
-        -> Vectors a
-        -> Vector a
-
-unsafeExtractsWithP upssegd vectors
-        = Seq.unsafeExtracts (takeUSSegd upssegd) vectors
index 5e71a93..f7f3f16 100644 (file)
@@ -7,7 +7,6 @@
 --   @dph-prim-par@ and @dph-prim-seq@ really do export the same symbols.
 #include "DPH_Header.h"
 
-import Data.Array.Parallel.Unlifted.Sequential.Vector (Unbox, Vector)
 import Data.Array.Parallel.Unlifted.Sequential.USel
 import Data.Array.Parallel.Unlifted.Sequential.Basics
 import Data.Array.Parallel.Unlifted.Sequential.Combinators
@@ -18,6 +17,7 @@ import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd  as USSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.UVSegd  as UVSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.Vector  as U
 import qualified Data.Array.Parallel.Unlifted.Sequential.Vectors as US
+import qualified Data.Array.Parallel.Unlifted.Sequential.Streams as US
 
 #include "DPH_Interface.h"
 
@@ -52,8 +52,9 @@ length                  = U.length
 (!:)                    = (U.!)
 unsafeIndex             = U.unsafeIndex
 extract                 = U.extract
-unsafeExtract_ss        = US.unsafeExtracts
-unsafeExtract_vs        = extractsSU
+unsafeExtracts_nss      = US.unsafeExtractsFromNestedWithUSSegd
+unsafeExtracts_ass      = US.unsafeExtractsFromVectorsWithUSSegd
+unsafeExtracts_avs      = US.unsafeExtractsFromVectorsWithUVSegd
 drop                    = U.drop
 
 
index c53696f..99f4f27 100644 (file)
@@ -27,9 +27,6 @@ module Data.Array.Parallel.Unlifted.Sequential
         , maximumSU
         , minimumSU
 
-        -- * Projections
-        , extractsSU
-        
         -- * Pack and Combine
         , combineSU)
 where
@@ -44,46 +41,3 @@ import Data.Array.Parallel.Unlifted.Sequential.Vector           as U
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd  as USegd
 import qualified Data.Vector                                    as V
 import Prelude hiding (zip)
-
--- | O(n). Segmented extract.
---
---   * Currently broken, and will just `error`.
-
--- TODO: This isn't finished because we don't have a sequential version of
---       USegd.replicateWith / segmented replicate. There is a corresponding
---       version of extractsSU in the parallel prim library.
-{-# INLINE_U extractsSU #-}
-extractsSU :: Unbox a => USSegd -> V.Vector (Vector a) -> Vector a
-extractsSU = error "Data.Array.Parallel.Unlifted.Sequential.extractsSU: not implemented"
-
-{-
-extractsSU arrs srcids ixBase lens 
- = let -- total length of the result
-        dstLen    = sumSU lens
-        segd      = USegd.fromLengths lens
-    
-        -- source array ids to load from
-        srcids'   = USegd.replicateWithP segd srcids
-
-        -- base indices in the source array to load from
-        baseixs   = USegd.replicateWithP segd ixBase
-        
-        -- starting indices for each of the segments
-        startixs  = U.scanl (+) 0 lens
-          
-        -- starting indices for each of the segments in the result
-        startixs' = USegd.replicateWithP segd startixs
-
-        {-# INLINE get #-}
-        get (ixDst, ixSegDst) (ixSegSrcBase, srcid)
-         = let  !arr    = arrs V.! srcid                        -- TODO: use unsafeIndex
-                !ix     = ixDst - ixSegDst + ixSegSrcBase
-           in   arr U.! ix                                      -- TODO unsafe unsafeIndex
-         
-        result    = U.zipWith get
-                        (zip (U.enumFromTo 0 (dstLen - 1))
-                                 startixs')
-                        (zip baseixs
-                                 srcids')
-   in result
--}
\ No newline at end of file
index 7253604..83311c3 100644 (file)
@@ -11,6 +11,7 @@ module Data.Array.Parallel.Unlifted.Sequential.Combinators (
   combineSU
 ) where
 import Data.Array.Parallel.Stream
+import Data.Array.Parallel.Unlifted.Sequential.Streams
 import Data.Array.Parallel.Unlifted.Sequential.Vector           as U
 import Data.Array.Parallel.Unlifted.Sequential.Vectors          as US
 import Data.Array.Parallel.Unlifted.Sequential.USSegd           (USSegd)
@@ -39,7 +40,7 @@ foldlSSU :: (Unbox a, Unboxes a, Unbox b)
 foldlSSU f z ssegd xss
         = unstream
         $ foldSS f z    (stream (USSegd.takeLengths ssegd))
-                        (US.unsafeStreamVectors ssegd xss)
+                        (unsafeStreamSegsFromVectors Nothing ssegd xss)
 
 
 -- fold -----------------------------------------------------------------------
@@ -78,7 +79,7 @@ foldl1SSU :: (Unbox a, Unboxes a)
 foldl1SSU f ssegd xxs
         = unstream
         $ fold1SS f     (stream (USSegd.takeLengths ssegd))
-                        (US.unsafeStreamVectors ssegd xxs)
+                        (unsafeStreamSegsFromVectors Nothing ssegd xxs)
 
 
 -- fold1 ----------------------------------------------------------------------
index e338111..fbd29d4 100644 (file)
@@ -26,22 +26,15 @@ module Data.Array.Parallel.Unlifted.Sequential.USSegd (
         
         -- * Operators
         appendWith,
-        cullOnVSegids,
-        
-        -- * Streams
-        streamSegs
+        cullOnVSegids
 ) where
 import Data.Array.Parallel.Unlifted.Sequential.USegd            (USegd)
-import Data.Array.Parallel.Unlifted.Sequential.Vector           (Vector, Unbox)
+import Data.Array.Parallel.Unlifted.Sequential.Vector           (Vector)
 import Data.Array.Parallel.Pretty                               hiding (empty)
 import Prelude                                                  hiding (length)
 
-import qualified Data.Array.Parallel.Unlifted.Sequential.USegd  as USegd
-import qualified Data.Array.Parallel.Unlifted.Sequential.Vector as U
-import qualified Data.Vector                                    as V
-import qualified Data.Vector.Fusion.Stream                      as S
-import qualified Data.Vector.Fusion.Stream.Size                 as S
-import qualified Data.Vector.Fusion.Stream.Monadic              as M
+import qualified Data.Array.Parallel.Unlifted.Sequential.USegd   as USegd
+import qualified Data.Array.Parallel.Unlifted.Sequential.Vector  as U
 
 
 -- USSegd ---------------------------------------------------------------------
@@ -221,7 +214,8 @@ getSeg (USSegd _ starts sources usegd) ix
 {-# INLINE_U getSeg #-}
 
 
--- Operators ------------------------------------------------------------------
+-- Operators ==================================================================
+
 -- | O(n). Produce a segment descriptor that describes the result of appending
 --   two arrays.
 appendWith
@@ -309,50 +303,3 @@ cullOnVSegids vsegids (USSegd _ starts sources usegd)
 --  This can also be expensive and we want to see the SCC in profiling builds.
 
 
--- Stream Functions -----------------------------------------------------------
--- | Stream some physical segments from many data arrays.
--- 
---   * TODO: make this more efficient, and fix fusion.
---           We should be able to eliminate a lot of the indexing happening in the 
---           inner loop by being cleverer about the loop state.
---
---   * TODO: If this is contiguous then we can stream the lot without worrying 
---           about jumping between segments. EXCEPT that this information must be
---           statically visible else streamSegs won't fuse, so we can't have an 
---           ifThenElse checking the manifest flag.
-
-streamSegs
-        :: Unbox a
-        => USSegd               -- ^ Segment descriptor defining segments base
-                                --   on source vectors.
-        -> V.Vector (Vector a)  -- ^ Source arrays.
-        -> S.Stream a
-
-{-# INLINE_STREAM streamSegs #-}
-streamSegs ussegd@(USSegd _ starts sources usegd) pdatas
- = let  
-        -- length of each segment
-        pseglens        = USegd.takeLengths usegd
-        -- We've finished streaming this pseg
-        {-# INLINE_INNER fn #-}
-        fn (pseg, ix)
-         -- All psegs are done.
-         | pseg >= length ussegd
-         = return $ S.Done
-         
-         -- Current pseg is done
-         | ix   >= pseglens `U.unsafeIndex` pseg 
-         = return $ S.Skip (pseg + 1, 0)
-
-         -- Stream an element from this pseg
-         | otherwise
-         = let  !srcid   = sources `U.unsafeIndex` pseg
-                !pdata   = pdatas  `V.unsafeIndex` srcid
-                !start   = starts  `U.unsafeIndex` pseg
-                !result  = pdata   `U.unsafeIndex` (start + ix)
-           in   return $ S.Yield result (pseg, ix + 1)
-
-   in   M.Stream fn (0, 0) S.Unknown
-
-
index 21fdff4..795fc0b 100644 (file)
@@ -21,10 +21,7 @@ module Data.Array.Parallel.Unlifted.Sequential.Vectors
         , unsafeIndexUnpack
         , append
         , fromVector
-        , toVector
-        
-        , unsafeExtracts
-        , unsafeStreamVectors)
+        , toVector)
 where
 import qualified Data.Primitive.ByteArray                       as P
 import qualified Data.Primitive.Array                           as P
@@ -226,73 +223,3 @@ packUVector ba start len
 {-# INLINE_U packUVector #-}
 
 
--- | Copy segments from a `Vectors` and concatenate them into a new array.
-unsafeExtracts
-        :: (Unboxes a, U.Unbox a)
-        => USSegd -> Vectors a -> U.Vector a
-
-unsafeExtracts ussegd vectors
-        = G.unstream $ unsafeStreamVectors ussegd vectors
-{-# INLINE_U unsafeExtracts #-}
-
-
--- Stream -----------------------------------------------------------------------------------------
--- | Stream segments from a `Vectors`.
--- 
---   * There must be at least one segment in the `USSegd`, but this is not checked.
--- 
---   * No bounds checking is done for the `USSegd`.
-unsafeStreamVectors :: Unboxes a => USSegd -> Vectors a -> S.Stream a
-unsafeStreamVectors ussegd@(USSegd _ segStarts segSources usegd) vectors
- = segStarts `seq` segSources `seq` usegd `seq` vectors `seq`
-   let  -- Length of each segment
-        !segLens        = USegd.takeLengths usegd
-
-        -- Total number of segments.
-        !segsTotal      = USSegd.length ussegd
-        -- Total number of elements to stream.
-        !elements       = USegd.takeElements usegd
-        -- seg, ix of that seg in usegd, length of seg, elem in seg
-        {-# INLINE_INNER fnSeg #-}
-        fnSeg (ixSeg, baSeg, ixEnd, ixElem)
-         = ixSeg `seq` baSeg `seq`
-           if ixElem >= ixEnd                   -- Was that the last elem in the current seg?
-            then if ixSeg + 1 >= segsTotal      -- Was that last seg?
-
-                       -- That was the last seg, we're done.
-                  then return $ S.Done
-                  
-                       -- Move to the next seg.
-                  else let ixSeg'       = ixSeg + 1
-                           sourceSeg    = U.unsafeIndex segSources ixSeg'
-                           startSeg     = U.unsafeIndex segStarts  ixSeg'
-                           lenSeg       = U.unsafeIndex segLens    ixSeg'
-                           (arr, startArr, lenArr) = unsafeIndexUnpack vectors sourceSeg
-                       in  return $ S.Skip
-                                  ( ixSeg'
-                                  , arr
-                                  , startArr + startSeg + lenSeg
-                                  , startArr + startSeg)
-
-                 -- Stream the next element from the segment.
-            else let !result  = P.indexByteArray baSeg ixElem
-                 in  return   $ S.Yield result (ixSeg, baSeg, ixEnd, ixElem + 1)
-
-        -- Starting state of the stream.
-        !initState
-         = let  sourceSeg       = U.unsafeIndex segSources 0
-                startSeg        = U.unsafeIndex segStarts  0
-                lenSeg          = U.unsafeIndex segLens    0
-                (arr, startArr, lenArr) = unsafeIndexUnpack vectors sourceSeg
-           in   ( 0                              -- starting segment id
-                , arr                            -- starting segment data
-                , startArr + startSeg + lenSeg   -- segment end
-                , startArr + startSeg)           -- segment start ix
-
-        -- It's important that we set the result stream size, so Data.Vector
-        -- doesn't need to add code to grow the result when it overflows.
-   in   M.Stream fnSeg initState (S.Exact elements) 
-{-# INLINE_STREAM unsafeStreamVectors #-}
-
index 4e0b5d0..7ae5d75 100644 (file)
@@ -19,6 +19,7 @@ Library
         Data.Array.Parallel.Unlifted.Sequential.UVSegd
         Data.Array.Parallel.Unlifted.Sequential.Vector
         Data.Array.Parallel.Unlifted.Sequential.Vectors
+        Data.Array.Parallel.Unlifted.Sequential.Streams
         Data.Array.Parallel.Unlifted.Sequential
         Data.Array.Parallel.Unlifted