Push scattered segmented fold fns into Unlifted packages
authorBen Lippmeier <benl@ouroborus.net>
Mon, 3 Oct 2011 08:09:00 +0000 (19:09 +1100)
committerBen Lippmeier <benl@ouroborus.net>
Tue, 4 Oct 2011 02:45:22 +0000 (13:45 +1100)
13 files changed:
dph-common-vseg/Data/Array/Parallel/Lifted/Combinators.hs
dph-common-vseg/Data/Array/Parallel/PArray/Stream.hs [deleted file]
dph-common-vseg/Data/Array/Parallel/PArray/Sums.hs [new file with mode: 0644]
dph-common-vseg/dph-common-vseg.cabal
dph-prim-interface/Data/Array/Parallel/Unlifted.hs
dph-prim-interface/interface/DPH_Header.h
dph-prim-par/Data/Array/Parallel/Unlifted.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Parallel.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Parallel/Segmented.hs
dph-prim-seq/Data/Array/Parallel/Unlifted.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Segmented.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Segmented/Combinators.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Segmented/USSegd.hs

index e311fac..1431b61 100644 (file)
@@ -31,7 +31,7 @@ where
 import Data.Array.Parallel.Lifted.Closure
 import Data.Array.Parallel.PArray.PData
 import Data.Array.Parallel.PArray.PRepr
-import Data.Array.Parallel.PArray.Stream
+import Data.Array.Parallel.PArray.Sums
 import Data.Array.Parallel.PArray
 import qualified Data.Array.Parallel.Unlifted   as U
 import qualified Data.Vector                    as V
@@ -169,7 +169,8 @@ slicePA_l _ sliceStarts sliceLens arrs
 -- concat ---------------------------------------------------------------------
 {-# INLINE_PA concatPP #-}
 concatPP :: PA a => PArray (PArray a) :-> PArray a
-concatPP        = closure1 concatPA concatPA_l
+concatPP
+        = closure1 concatPA concatPA_l
 
 
 {-# INLINE_PA concatPA_l #-}
@@ -182,7 +183,7 @@ concatPA_l _ darr
 -- unzip ----------------------------------------------------------------------
 {-# INLINE_PA unzipPP #-}
 unzipPP :: (PA a, PA b) => PArray (a, b) :-> (PArray a, PArray b)
-unzipPP         = closure1 unzipPA unzipPA_l
+unzipPP = closure1 unzipPA unzipPA_l
 
 
 -- Scalar =====================================================================
@@ -206,8 +207,9 @@ intOfBool False = 0
 
 -- plus -----------------------------------------------------------------------
 {-# INLINE_PA plusPP_int #-}
-plusPP_int       :: Int :-> Int :-> Int
-plusPP_int       = closure2 (+) plusPA_int_l
+plusPP_int      :: Int :-> Int :-> Int
+plusPP_int
+        = closure2 (+) plusPA_int_l
 
 
 {-# INLINE_PA plusPA_int_l #-}
@@ -219,7 +221,8 @@ plusPA_int_l _ (PInt arr1) (PInt arr2)
 -- mult -----------------------------------------------------------------------
 {-# INLINE_PA multPP_double #-}
 multPP_double   :: Double :-> Double :-> Double
-multPP_double = closure2 (*) multPA_double_l
+multPP_double
+        = closure2 (*) multPA_double_l
 
 
 {-# INLINE_PA multPA_double_l #-}
@@ -231,7 +234,8 @@ multPA_double_l _ (PDouble arr1) (PDouble arr2)
 -- div -----------------------------------------------------------------------
 {-# INLINE_PA divPP_int #-}
 divPP_int   :: Int :-> Int :-> Int
-divPP_int = closure2 div divPA_int_l
+divPP_int
+        = closure2 div divPA_int_l
 
 
 {-# INLINE_PA divPA_int_l #-}
@@ -243,22 +247,11 @@ divPA_int_l _ (PInt arr1) (PInt arr2)
 -- sum ------------------------------------------------------------------------
 {-# INLINE_PA sumPP_double #-}
 sumPP_double :: PArray Double :-> Double
-sumPP_double    = closure1 sumPA_double sumPA_l_double
-
-
-{-# INLINE_PA sumPA_double #-}
-sumPA_double   :: PArray Double -> Double
-sumPA_double (PArray _ (PDouble xs))
-        = U.sum xs
+sumPP_double
+        = closure1 sumPA_double sumPA_l_double
 
 
 {-# INLINE_PA sumPP_int #-}
 sumPP_int :: PArray Int :-> Int
-sumPP_int    = closure1 sumPA_int sumPA_l_int
-
-
-{-# INLINE_PA sumPA_int #-}
-sumPA_int   :: PArray Int  -> Int
-sumPA_int (PArray _ (PInt xs))
-        = U.sum xs
-
+sumPP_int
+        = closure1 sumPA_int sumPA_l_int
diff --git a/dph-common-vseg/Data/Array/Parallel/PArray/Stream.hs b/dph-common-vseg/Data/Array/Parallel/PArray/Stream.hs
deleted file mode 100644 (file)
index 7b8ca95..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-
-module Data.Array.Parallel.PArray.Stream where
-
-import Data.Array.Parallel.Stream
-import Data.Array.Parallel.PArray.PData
-import qualified Data.Vector                            as V
-import qualified Data.Vector.Generic                    as G
-import qualified Data.Vector.Fusion.Stream              as S
-import qualified Data.Vector.Fusion.Stream.Size         as S
-import qualified Data.Vector.Fusion.Stream.Monadic      as M
-import qualified Data.Array.Parallel.Unlifted           as U
-import qualified Data.Array.Parallel.Unlifted.Sequential.Vector as SU
-
-
-
--- | Lifted sum for Doubles
-sumPA_l_double :: Int
-        -> PData (PArray Double)
-        -> PData Double
-        
-sumPA_l_double _ arr
- = let  psegResults
-         = G.unstream 
-         $ foldSS (+) 0
-                (SU.stream (pnested_pseglens arr))
-                (streamPSegsOfNested arr)
-        
-        vsegResults
-         = U.bpermute psegResults (pnested_vsegids arr) 
-                        
-   in   PDouble vsegResults
-
-
--- | Lifted sum for Ints
-sumPA_l_int :: Int
-        -> PData (PArray Int)
-        -> PData Int
-        
-sumPA_l_int _ arr
- = let  psegResults
-         = G.unstream 
-         $ foldSS (+) 0
-                (SU.stream (pnested_pseglens arr))
-                (streamPSegsOfNested arr)
-
-        vsegResults
-         = U.bpermute psegResults (pnested_vsegids arr) 
-                        
-   in   PInt vsegResults
-
-
--- | Stream all the psegs from an array.
-streamPSegsOfNested :: PR a => PData (PArray a) -> S.Stream a
-streamPSegsOfNested arr
-        = streamPSegs 
-                (pnested_pseglens   arr)
-                (pnested_psegstarts arr)
-                (pnested_psegsrcids arr)
-                (pnested_psegdata   arr)        
-
-
--- | Stream some physical segments from many data arrays.
---   TODO: make this more efficient, and fix fusion.
---         We should be able to eliminate a lot of the indexing happening in the 
---         inner loop by being cleverer about the loop state.
-streamPSegs 
-        :: PR a
-        => U.Array Int          -- ^ length of segments
-        -> U.Array Int          -- ^ starting index of segment
-        -> U.Array Int          -- ^ source id of segment
-        -> V.Vector (PData a)   -- ^ data arrays
-        -> S.Stream a
-        
-streamPSegs pseglens psegstarts psegsrcs psegdata
- = let  
-        -- We've finished streaming this pseg
-        fn (pseg, ix)
-         -- All psegs are done.
-         | pseg >= U.length pseglens
-         = return $ S.Done
-         
-         -- Current pseg is done
-         | ix   >= pseglens U.!: pseg 
-         = fn (pseg + 1, 0)
-
-         -- Stream an element from this pseg
-         | otherwise
-         = let  srcid   = psegsrcs   U.!: pseg
-                pdata   = psegdata   V.!  srcid
-                start   = psegstarts U.!: pseg
-                result  = pdata `indexPR` (start + ix)
-           in   return $ S.Yield result
-                                 (pseg, ix + 1)
-
-   in   M.Stream fn (0, 0) S.Unknown
diff --git a/dph-common-vseg/Data/Array/Parallel/PArray/Sums.hs b/dph-common-vseg/Data/Array/Parallel/PArray/Sums.hs
new file mode 100644 (file)
index 0000000..18d9afc
--- /dev/null
@@ -0,0 +1,60 @@
+
+module Data.Array.Parallel.PArray.Sums
+        ( sumPA_double, sumPA_l_double
+        , sumPA_int,    sumPA_l_int)
+where
+import Data.Array.Parallel.PArray.PData
+import qualified Data.Vector                            as V
+import qualified Data.Array.Parallel.Unlifted           as U
+
+
+-- Double ---------------------------------------------------------------------
+{-# INLINE_PA sumPA_double #-}
+sumPA_double   :: PArray Double -> Double
+sumPA_double (PArray _ (PDouble xs))
+        = U.sum xs
+
+
+-- | Lifted sum for Doubles
+sumPA_l_double 
+        :: Int
+        -> PData (PArray Double)
+        -> PData Double
+{-# INLINE sumPA_l_double #-}        
+sumPA_l_double _ (PNested vsegd datas)
+ = let  -- Grab all the flat source vectors.
+        pdatas          = V.map toUArrayPR datas
+
+        -- Sum up each physical segment individually.
+        psegResults     = U.fold_ss (+) 0 (U.ssegdVSegd vsegd) pdatas
+
+        -- Replicate the physical results according to the vsegids.
+        vsegResults     = U.bpermute psegResults (U.vsegidsVSegd vsegd) 
+
+   in   PDouble vsegResults
+
+
+
+-- Int ------------------------------------------------------------------------
+{-# INLINE_PA sumPA_int #-}
+sumPA_int   :: PArray Int  -> Int
+sumPA_int (PArray _ (PInt xs))
+        = U.sum xs
+
+
+sumPA_l_int
+        :: Int
+        -> PData (PArray Int)
+        -> PData Int
+{-# INLINE sumPA_l_int #-}
+sumPA_l_int _ (PNested vsegd datas)
+ = let  -- Grab all the flat source vectors.
+        pdatas          = V.map toUArrayPR datas
+
+        -- Sum up each physical segment individually.
+        psegResults     = U.fold_ss (+) 0 (U.ssegdVSegd vsegd) pdatas
+
+        -- Replicate the physical results according to the vsegids.
+        vsegResults     = U.bpermute psegResults (U.vsegidsVSegd vsegd) 
+
+   in   PInt vsegResults
index 2781137..ebc08e6 100644 (file)
@@ -20,7 +20,9 @@ Library
         Data.Array.Parallel.PArray.PData.Unit
         Data.Array.Parallel.PArray.PData.Tuple
         Data.Array.Parallel.PArray.PData.Closure
+        Data.Array.Parallel.PArray.PData
         Data.Array.Parallel.PArray.PRepr
+        Data.Array.Parallel.PArray.Sums
         Data.Array.Parallel.Lifted.Closure
         Data.Array.Parallel.Lifted.Combinators
         Data.Array.Parallel.PArray
index ed5ee4a..8db428e 100644 (file)
@@ -152,6 +152,11 @@ sum_r segSize xs
         = notImplemented "sum_r"
 
 
+-- Scattered Segmented Folds --------------------------------------------------
+fold_ss  = notImplemented "fold_ss"
+fold1_ss = notImplemented "fold1_ss"
+
+
 -- Segment Descriptors --------------------------------------------------------
 data Segd 
         = Segd 
index fb18a6d..312f281 100644 (file)
@@ -53,6 +53,9 @@ module Data.Array.Parallel.Unlifted (
   -- * Segmented Folds
   fold_s, fold1_s, fold_r, sum_s,  sum_r,
   
+  -- * Scattered Segmented Folds
+  fold_ss, fold1_ss,
+  
   -- * Segment Descriptors
   Segd, mkSegd, validSegd,
   emptySegd, singletonSegd,
index 465ec0a..21cedc1 100644 (file)
@@ -1,4 +1,4 @@
-{-# LANGUAGE PackageImports, CPP #-}
+{-# LANGUAGE PackageImports, CPP, NoMonomorphismRestriction #-}
 
 -- | Primitive parallel combinators that work on flat, unlifted arrays.
 --   Some of them don't actually have parallel implementations, so we bail out
@@ -221,6 +221,11 @@ sum_r x arr
         $! sumRUP x arr
 
 
+-- Scattered Segmented Folds --------------------------------------------------
+fold_ss                 = foldSSUP
+fold1_ss                = fold1SSUP
+
+
 -- Segment Descriptors --------------------------------------------------------
 type Segd               = UPSegd
 mkSegd                  = mkUPSegd
index 03c57e4..6c7971f 100644 (file)
@@ -36,7 +36,11 @@ module Data.Array.Parallel.Unlifted.Parallel (
  
   -- * Segmented operators
   replicateSUP, replicateRSUP, appendSUP, indicesSUP,
-  foldSUP, foldRUP, fold1SUP, sumSUP, sumRUP,
+  foldSUP,  foldSSUP,
+  fold1SUP, fold1SSUP,
+  foldRUP,
+  sumSUP,
+  sumRUP,
 
   indexedUP, replicateUP, repeatUP, interleaveUP,
 
index 03b26dd..a5e8184 100644 (file)
@@ -5,16 +5,27 @@
 -- | Parallel combinators for segmented unboxed arrays
 module Data.Array.Parallel.Unlifted.Parallel.Segmented (
   replicateSUP, replicateRSUP, appendSUP, indicesSUP,
-  foldSUP, foldRUP, fold1SUP, sumSUP, sumRUP
+  foldSUP,      foldSSUP,
+  fold1SUP,     fold1SSUP,
+  foldRUP,
+  sumSUP,
+  sumRUP
 ) where
 import Data.Array.Parallel.Unlifted.Sequential.Vector as Seq
 import Data.Array.Parallel.Unlifted.Sequential.Segmented
 import Data.Array.Parallel.Unlifted.Distributed
 import Data.Array.Parallel.Unlifted.Parallel.Basics
+import Data.Array.Parallel.Unlifted.Parallel.Combinators (mapUP, zipWithUP, packUP, combineUP)
+import Data.Array.Parallel.Unlifted.Parallel.Sums        (sumUP )
+import Data.Array.Parallel.Unlifted.Parallel.Basics      (replicateUP, repeatUP)
+import Data.Array.Parallel.Unlifted.Parallel.Enum
+import Data.Array.Parallel.Unlifted.Parallel.Permute     (bpermuteUP)
 import Data.Array.Parallel.Unlifted.Parallel.UPSegd
-import qualified Data.Vector.Fusion.Stream as S
+import Data.Array.Parallel.Unlifted.Parallel.UPSSegd
+import qualified Data.Vector.Fusion.Stream              as S
 import Data.Vector.Fusion.Stream.Monadic ( Stream(..), Step(..) )
 import Data.Vector.Fusion.Stream.Size    ( Size(..) )
+import qualified Data.Vector                            as V
 import Control.Monad.ST ( ST, runST )
 
 
@@ -164,26 +175,43 @@ folds fElem fSeg segd xs
            in  ((k, Seq.take n rs), Seq.drop n rs)
 
 
-foldSUP :: Unbox a => (a -> a -> a) -> a -> UPSegd -> Vector a -> Vector a
+-- fold -----------------------------------------------------------------------
+foldSUP :: Unbox a
+        => (a -> a -> a) -> a -> UPSegd -> Vector a -> Vector a
 {-# INLINE foldSUP #-}
 foldSUP f !z = folds f (foldlSU f z)
 
 
-fold1SUP :: Unbox a => (a -> a -> a) -> UPSegd -> Vector a -> Vector a
+-- TODO: make this parallel
+foldSSUP :: Unbox a
+         => (a -> a -> a) -> a -> UPSSegd -> V.Vector (Vector a) -> Vector a
+{-# INLINE foldSSUP #-}
+foldSSUP f z upssegd xss
+        = foldSSU f z (ssegdUPSSegd upssegd) xss
+
+
+-- fold1 ----------------------------------------------------------------------
+fold1SUP :: Unbox a
+         => (a -> a -> a) -> UPSegd -> Vector a -> Vector a
 {-# INLINE fold1SUP #-}
 fold1SUP f = folds f (fold1SU f)
 
 
+-- TODO: Make this parallel
+fold1SSUP :: Unbox a
+          => (a -> a -> a) -> UPSSegd -> V.Vector (Vector a) -> Vector a
+{-# INLINE fold1SSUP #-}
+fold1SSUP f upssegd xss
+        = fold1SSU f (ssegdUPSSegd upssegd) xss
+
+
+-- sumS -----------------------------------------------------------------------
 sumSUP :: (Num e, Unbox e) => UPSegd -> Vector e -> Vector e
 {-# INLINE sumSUP #-}
 sumSUP = foldSUP (+) 0
 
 
-sumRUP :: (Num e, Unbox e) => Int -> Vector e -> Vector e
-{-# INLINE sumRUP #-}
-sumRUP = foldRUP (+) 0
-
-
+-- foldR ----------------------------------------------------------------------
 foldRUP :: (Unbox a, Unbox b) => (b -> a -> b) -> b -> Int -> Vector a -> Vector b
 {-# INLINE foldRUP #-}
 foldRUP f z !segSize xs = 
@@ -196,6 +224,13 @@ foldRUP f z !segSize xs =
     dlen = splitLenD theGang noOfSegs
 
 
+-- sumR -----------------------------------------------------------------------
+sumRUP :: (Num e, Unbox e) => Int -> Vector e -> Vector e
+{-# INLINE sumRUP #-}
+sumRUP = foldRUP (+) 0
+
+
+
 -- indices --------------------------------------------------------------------
 indicesSUP :: UPSegd -> Vector Int
 {-# INLINE_UP indicesSUP #-}
index df76ae0..29bc1d2 100644 (file)
@@ -103,6 +103,11 @@ fold_r                  = foldlRU
 sum_r                   = sumRU
 
 
+-- Scattered Segmented Folds --------------------------------------------------
+fold_ss                 = foldSSU
+fold1_ss                = fold1SSU
+
+
 -- Segment Descriptors --------------------------------------------------------
 type Segd               = USegd
 mkSegd                  = mkUSegd
index e3b3ead..784314b 100644 (file)
@@ -4,7 +4,9 @@ module Data.Array.Parallel.Unlifted.Sequential.Segmented (
 
   replicateSU, replicateRSU, appendSU, indicesSU, indicesSU',
 
-  foldlSU, foldSU, fold1SU,
+  foldlSU,
+  foldSU,   foldSSU,
+  fold1SU,  fold1SSU,
   foldlRU,
   combineSU,
 
index 489070b..68df889 100644 (file)
 
 -- | Standard combinators for segmented unlifted arrays.
 module Data.Array.Parallel.Unlifted.Sequential.Segmented.Combinators (
-  foldlSU, foldSU, foldl1SU, fold1SU, {-scanSU,-} {-scan1SU,-}
+  foldlSU,      foldlSSU,
+  foldSU,       foldSSU,
+  foldl1SU,     foldl1SSU,
+  fold1SU,      fold1SSU,
   foldlRU,
   combineSU
 ) where
-import Data.Array.Parallel.Stream (
-  foldSS, fold1SS, combineSS, foldValuesR )
-import Data.Array.Parallel.Unlifted.Sequential.Vector as V
+import Data.Array.Parallel.Stream
+import Data.Array.Parallel.Unlifted.Sequential.Vector           as U
 import Data.Array.Parallel.Unlifted.Sequential.Segmented.USegd
+import Data.Array.Parallel.Unlifted.Sequential.Segmented.USSegd
+import qualified Data.Vector                                    as V
 import Debug.Trace
 
 
+-- foldl ----------------------------------------------------------------------
 -- | Segmented array reduction proceeding from the left
-foldlSU :: (Unbox a, Unbox b) => (b -> a -> b) -> b -> USegd -> Vector a -> Vector b
+foldlSU  :: (Unbox a, Unbox b)
+         => (b -> a -> b) -> b -> USegd -> Vector a -> Vector b
 {-# INLINE_U foldlSU #-}
 foldlSU f z segd xs 
         = unstream
-         $ foldSS f z (stream (lengthsUSegd segd)) (stream xs)
+        $ foldSS f z    (stream (lengthsUSegd segd))
+                        (stream xs)
+
+-- | Segmented array reduction proceeding from the left.
+--   For scattered segments.
+foldlSSU :: (Unbox a, Unbox b)
+         => (b -> a -> b) -> b -> USSegd -> V.Vector (Vector a) -> Vector b
+{-# INLINE_U foldlSSU #-}
+foldlSSU f z ssegd xss
+        = unstream
+        $ foldSS f z    (stream (lengthsUSSegd ssegd))
+                        (streamSegsFromUSSegd ssegd xss)
 
 
+-- fold -----------------------------------------------------------------------
 -- | Segmented array reduction that requires an associative combination
 --   function with its unit
-foldSU :: Unbox a => (a -> a -> a) -> a -> USegd -> Vector a -> Vector a
+foldSU  :: Unbox a
+        => (a -> a -> a) -> a -> USegd -> Vector a -> Vector a
+{-# INLINE_U foldSU #-}
 foldSU = foldlSU
 
 
+-- | Segmented array reduction that requires an associative combination
+--   function with its unit. For scattered segments.
+foldSSU :: Unbox a
+        => (a -> a -> a) -> a -> USSegd -> V.Vector (Vector a) -> Vector a
+{-# INLINE_U foldSSU #-}
+foldSSU = foldlSSU       
+
+
+-- foldl1 ---------------------------------------------------------------------
 -- | Segmented array reduction from left to right with non-empty subarrays only
-foldl1SU :: Unbox a => (a -> a -> a) -> USegd -> Vector a -> Vector a
+foldl1SU :: Unbox a
+         => (a -> a -> a) -> USegd -> Vector a -> Vector a
 {-# INLINE_U foldl1SU #-}
 foldl1SU f segd xs 
         = unstream
-        $ fold1SS f (stream (lengthsUSegd segd)) (stream xs)
+        $ fold1SS f     (stream (lengthsUSegd segd))
+                        (stream xs)
+
 
+-- | Segmented array reduction from left to right with non-empty subarrays only.
+--   For scattered segments.
+foldl1SSU :: Unbox a
+          => (a -> a -> a) -> USSegd -> V.Vector (Vector a) -> Vector a
+{-# INLINE_U foldl1SSU #-}
+foldl1SSU f ssegd xxs
+        = unstream
+        $ fold1SS f     (stream (lengthsUSSegd ssegd))
+                        (streamSegsFromUSSegd ssegd xxs)
 
+
+-- fold1 ----------------------------------------------------------------------
 -- | Segmented array reduction with non-empty subarrays and an associative
---   combination function
-fold1SU :: Unbox a => (a -> a -> a) -> USegd -> Vector a -> Vector a
+--   combination function.
+fold1SU :: Unbox a
+        => (a -> a -> a) -> USegd -> Vector a -> Vector a
+{-# INLINE_U fold1SU #-}
 fold1SU = foldl1SU
 
 
+-- | Segmented array reduction with non-empty subarrays and an associative
+--   combination function. For scattered segments.
+fold1SSU :: Unbox a
+        => (a -> a -> a) -> USSegd -> V.Vector (Vector a) -> Vector a
+{-# INLINE_U fold1SSU #-}
+fold1SSU = foldl1SSU
+
+
+
+-- foldlR ---------------------------------------------------------------------
+-- | Regular arrar reduction 
+foldlRU :: (Unbox a, Unbox b) => (b -> a -> b) -> b -> Int -> Vector a -> Vector b
+{-# INLINE_U foldlRU #-}
+foldlRU f z segSize
+        = unstream . foldValuesR f z segSize . stream
+
+
 -- | Merge two segmented arrays according to flag array
 combineSU :: Unbox a => Vector Bool -> USegd -> Vector a -> USegd -> Vector a -> Vector a
 {-# INLINE_U combineSU #-}
@@ -51,10 +113,3 @@ combineSU bs xd xs yd ys
                     (stream (lengthsUSegd xd)) (stream xs)
                     (stream (lengthsUSegd yd)) (stream ys)
 
-
--- | Regular arrar reduction 
-foldlRU :: (Unbox a, Unbox b) => (b -> a -> b) -> b -> Int -> Vector a -> Vector b
-{-# INLINE_U foldlRU #-}
-foldlRU f z segSize
-        = unstream . foldValuesR f z segSize . stream
-
index 4417f92..652a0e0 100644 (file)
@@ -21,10 +21,17 @@ module Data.Array.Parallel.Unlifted.Sequential.Segmented.USSegd (
         
         -- * Operators
         appendUSSegd,
-        cullUSSegdOnVSegids
+        cullUSSegdOnVSegids,
+        
+        -- * Streams
+        streamSegsFromUSSegd
 ) where
 import Data.Array.Parallel.Unlifted.Sequential.Segmented.USegd
-import Data.Array.Parallel.Unlifted.Sequential.Vector as V
+import Data.Array.Parallel.Unlifted.Sequential.Vector           as U
+import qualified Data.Vector                                    as V
+import qualified Data.Vector.Fusion.Stream                      as S
+import qualified Data.Vector.Fusion.Stream.Size                 as S
+import qualified Data.Vector.Fusion.Stream.Monadic              as M
 import Data.Array.Parallel.Pretty
 
 
@@ -60,8 +67,8 @@ instance PprPhysical USSegd where
   = vcat
   [ text "USSegd" 
         $$ (nest 7 $ vcat
-                [ text "starts:  " <+> (text $ show $ V.toList starts)
-                , text "srcids:  " <+> (text $ show $ V.toList srcids) ])
+                [ text "starts:  " <+> (text $ show $ U.toList starts)
+                , text "srcids:  " <+> (text $ show $ U.toList srcids) ])
   , pprp ssegd ]
 
 
@@ -84,15 +91,15 @@ mkUSSegd = USSegd
 validUSSegd :: USSegd -> Bool
 {-# INLINE validUSSegd #-}
 validUSSegd (USSegd starts srcids usegd)
-        =  (V.length starts == lengthUSegd usegd)
-        && (V.length srcids == lengthUSegd usegd)
+        =  (U.length starts == lengthUSegd usegd)
+        && (U.length srcids == lengthUSegd usegd)
 
 
 -- | O(1).
 --  Yield an empty segment descriptor, with no elements or segments.
 emptyUSSegd :: USSegd
 {-# INLINE emptyUSSegd #-}
-emptyUSSegd = USSegd V.empty V.empty emptyUSegd
+emptyUSSegd = USSegd U.empty U.empty emptyUSegd
 
 
 -- | O(1).
@@ -102,7 +109,7 @@ emptyUSSegd = USSegd V.empty V.empty emptyUSegd
 singletonUSSegd :: Int -> USSegd
 {-# INLINE singletonUSSegd #-}
 singletonUSSegd n 
-        = USSegd (V.singleton 0) (V.singleton 0) (singletonUSegd n)
+        = USSegd (U.singleton 0) (U.singleton 0) (singletonUSegd n)
 
 
 -- | O(segs). 
@@ -112,7 +119,7 @@ promoteUSegdToUSSegd :: USegd -> USSegd
 {-# INLINE promoteUSegdToUSSegd #-}
 promoteUSegdToUSSegd usegd
         = USSegd (indicesUSegd usegd)
-                 (V.replicate (lengthUSegd usegd) 0)
+                 (U.replicate (lengthUSegd usegd) 0)
                  usegd
                  
 
@@ -167,8 +174,8 @@ getSegOfUSSegd (USSegd starts sources usegd) ix
  = let  (len, index) = getSegOfUSegd usegd ix
    in   ( len
         , index
-        , starts  V.! ix
-        , sources V.! ix)
+        , starts  U.! ix
+        , sources U.! ix)
 
 
 -- Operators ------------------------------------------------------------------
@@ -182,8 +189,8 @@ appendUSSegd
 {-# INLINE appendUSSegd #-}
 appendUSSegd (USSegd starts1 srcs1 usegd1) pdatas1
              (USSegd starts2 srcs2 usegd2) _
-        = USSegd (starts1  V.++  starts2)
-                 (srcs1    V.++  V.map (+ pdatas1) srcs2)
+        = USSegd (starts1  U.++  starts2)
+                 (srcs1    U.++  U.map (+ pdatas1) srcs2)
                  (appendUSegd usegd1 usegd2)
 
 
@@ -206,15 +213,15 @@ cullUSSegdOnVSegids vsegids (USSegd starts sources usegd)
         --  
         --  Note that psegids '2' and '4' are not in vsegids_packed.
         psegids_used
-         = V.bpermuteDft (lengthUSegd usegd)
+         = U.bpermuteDft (lengthUSegd usegd)
                          (const False)
-                         (V.zip vsegids (V.replicate (V.length vsegids) True))
+                         (U.zip vsegids (U.replicate (U.length vsegids) True))
 
         -- Produce an array of used psegs.
         --  eg  psegids_used:   [1 1 0 1 0 1 1]
         --      psegids_packed: [0 1 3 5 6]
         psegids_packed
-         = V.pack (V.enumFromTo 0 (V.length psegids_used)) psegids_used
+         = U.pack (U.enumFromTo 0 (U.length psegids_used)) psegids_used
 
         -- Produce an array that maps psegids in the source array onto
         -- psegids in the result array. If a particular pseg isn't present
@@ -227,9 +234,9 @@ cullUSSegdOnVSegids vsegids (USSegd starts sources usegd)
         --                      [0 1 2 3 4]
         --      psegids_map:    [0 1 -1 2 -1 3 4]
         psegids_map
-         = V.bpermuteDft (lengthUSegd usegd)
+         = U.bpermuteDft (lengthUSegd usegd)
                          (const (-1))
-                         (V.zip psegids_packed (V.enumFromTo 0 (V.length psegids_packed - 1)))
+                         (U.zip psegids_packed (U.enumFromTo 0 (U.length psegids_packed - 1)))
 
         -- Use the psegids_map to rewrite the packed vsegids to point to the 
         -- corresponding psegs in the result.
@@ -239,13 +246,13 @@ cullUSSegdOnVSegids vsegids (USSegd starts sources usegd)
         -- 
         --      vsegids':       [0 1 1 2 3 3 4 4]
         --
-        vsegids'  = V.map (psegids_map V.!) vsegids
+        vsegids'  = U.map (psegids_map U.!) vsegids
 
         -- Rebuild the usegd.
-        starts'   = V.pack starts  psegids_used
-        sources'  = V.pack sources psegids_used
+        starts'   = U.pack starts  psegids_used
+        sources'  = U.pack sources psegids_used
 
-        lengths'  = V.pack (lengthsUSegd usegd) psegids_used
+        lengths'  = U.pack (lengthsUSegd usegd) psegids_used
         usegd'    = lengthsToUSegd lengths'
         
         ussegd'   = USSegd starts' sources' usegd'
@@ -254,4 +261,41 @@ cullUSSegdOnVSegids vsegids (USSegd starts sources usegd)
 
 
 
+-- | Stream some physical segments from many data arrays.
+--   TODO: make this more efficient, and fix fusion.
+--         We should be able to eliminate a lot of the indexing happening in the 
+--         inner loop by being cleverer about the loop state.
+streamSegsFromUSSegd
+        :: Unbox a
+        => USSegd               -- ^ Segment descriptor defining segments based on source vectors.
+        -> V.Vector (Vector a)  -- ^ Source vectors.
+        -> S.Stream a
+        
+streamSegsFromUSSegd ussegd@(USSegd starts sources usegd) pdatas
+ = let  
+        -- length of each segment
+        pseglens        = lengthsUSegd usegd
+        -- We've finished streaming this pseg
+        {-# INLINE fn #-}
+        fn (pseg, ix)
+         -- All psegs are done.
+         | pseg >= lengthUSSegd ussegd
+         = return $ S.Done
+         
+         -- Current pseg is done
+         | ix   >= pseglens U.! pseg 
+         = fn (pseg + 1, 0)
+
+         -- Stream an element from this pseg
+         | otherwise
+         = let  srcid   = sources    U.! pseg
+                pdata   = pdatas     V.!  srcid
+                start   = starts     U.! pseg
+                result  = pdata U.! (start + ix)
+           in   return $ S.Yield result 
+                                (pseg, ix + 1)
+
+   in   M.Stream fn (0, 0) S.Unknown
+