dph-prim-par: add tracing to distributed operations
authorBen Lippmeier <benl@ouroborus.net>
Mon, 30 Jul 2012 01:54:43 +0000 (11:54 +1000)
committerBen Lippmeier <benl@ouroborus.net>
Mon, 30 Jul 2012 03:48:35 +0000 (13:48 +1000)
16 files changed:
dph-prim-par/Data/Array/Parallel/Unlifted.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Distributed/Arrays.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Distributed/Data/USSegd/Split.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Distributed/Data/USegd/Split.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Distributed/Primitive/DistST.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Distributed/Primitive/Gang.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Distributed/Primitive/Operators.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Distributed/What.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Parallel.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Parallel/Extracts.hs
dph-prim-par/Data/Array/Parallel/Unlifted/Parallel/UPVSegd.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/Extracts.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/USSegd.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/USegd.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Sequential/UVSegd.hs
dph-prim-seq/Data/Array/Parallel/Unlifted/Stream/Segments.hs

index 02972a8..394fe3a 100644 (file)
@@ -111,7 +111,7 @@ enumFromStepLenEach n starts steps lens
 length          = Seq.length
 index           = Seq.index
 indexs          = indexsFromVector
-indexs_avs      = indexsFromVectorsUPVSegd
+indexs_avs      = indexsFromVectorsUPVSegdP
 
 extract arr i n
         =  tracePrim (TraceExtract (Seq.length arr) i n)
index 8d6f42b..7ca98a8 100644 (file)
@@ -163,6 +163,21 @@ splitD_impl g !arr
 {-# INLINE_DIST splitD_impl #-}
 
 
+-- SplitJoin ------------------------------------------------------------------
+-- | Split a vector over a gang, run a distributed computation, then
+--   join the pieces together again.
+splitJoinD
+        :: (Unbox a, Unbox b)
+        => Gang
+        -> (Dist (Vector a) -> Dist (Vector b))
+        -> Vector a
+        -> Vector b
+splitJoinD g f !xs 
+  = joinD_impl g (f (splitD_impl g xs))
+{-# INLINE_DIST splitJoinD #-}
+
+
+-- Join -----------------------------------------------------------------------
 -- | Join a distributed array.
 --   Join sums up the array lengths of each chunk, allocates a new result array, 
 --   and copies each chunk into the result.
@@ -176,45 +191,37 @@ joinD g _ darr  = joinD_impl g darr
 
 
 joinD_impl :: forall a. Unbox a => Gang -> Dist (Vector a) -> Vector a
-joinD_impl g !darr 
-  = checkGangD (here "joinD") g darr 
-  $ Seq.new n (\ma -> zipWithDST_ g (copy ma) di darr)
-  where
-    (!di,!n)    = scanD (What "joinD_impl") g (+) 0 
-                $ lengthD darr
-
-    copy :: forall s. MVector s a -> Int -> Vector a -> DistST s ()
-    copy ma i arr 
-        = stToDistST (Seq.copy (Seq.mslice i (Seq.length arr) ma) arr)
-{-# INLINE_DIST joinD_impl #-}
+joinD_impl gang !darr 
+ = let  -- Determine where each thread's local chunk should go
+        -- in the result vector, and count the total number of elements.
+        (!di,!n) = scanD (What "joinD_impl/count") gang (+) 0 
+                 $ lengthD darr
 
+        copy :: forall s. MVector s a -> Int -> Vector a -> DistST s ()
+        copy ma i arr 
+         = stToDistST (Seq.copy (Seq.mslice i (Seq.length arr) ma) arr)
+        {-# INLINE copy #-}
 
--- | Split a vector over a gang, run a distributed computation, then
---   join the pieces together again.
-splitJoinD
-        :: (Unbox a, Unbox b)
-        => Gang
-        -> (Dist (Vector a) -> Dist (Vector b))
-        -> Vector a
-        -> Vector b
-splitJoinD g f !xs 
-  = joinD_impl g (f (splitD_impl g xs))
-{-# INLINE_DIST splitJoinD #-}
-
+   in   Seq.new n $ \ma 
+         -> zipWithDST_ 
+                (WhatJoinCopy n) 
+                gang (copy ma) di darr
+{-# INLINE_DIST joinD_impl #-}
 
 
 -- | Join a distributed array, yielding a mutable global array
 joinDM :: Unbox a => Gang -> Dist (Vector a) -> ST s (MVector s a)
-joinDM g darr 
- = checkGangD (here "joinDM") g darr 
+joinDM gang darr 
+ = checkGangD (here "joinDM") gang darr 
  $ do   marr <- Seq.newM n
-        zipWithDST_ g (copy marr) di darr
+        zipWithDST_ (WhatJoinCopy n) gang (copy marr) di darr
         return marr
  where
-        (!di,!n) = scanD (What "joinDM") g (+) 0 
+        (!di,!n) = scanD (What "joinDM/count") gang (+) 0 
                  $ lengthD darr
 
-        copy ma i arr   = stToDistST (Seq.copy (Seq.mslice i (Seq.length arr) ma) arr)
+        copy ma i arr   
+                 = stToDistST (Seq.copy (Seq.mslice i (Seq.length arr) ma) arr)
 {-# INLINE joinDM #-}
 
 
@@ -266,7 +273,7 @@ permuteD
         :: forall a. Unbox a 
         => Gang -> Dist (Vector a) -> Dist (Vector Int) -> Vector a
 permuteD g darr dis 
-  = Seq.new n (\ma -> zipWithDST_ g (permute ma) darr dis)
+  = Seq.new n (\ma -> zipWithDST_ (What "permuteD") g (permute ma) darr dis)
   where
     n = joinLengthD g darr
 
@@ -297,7 +304,7 @@ atomicUpdateD :: forall a. Unbox a
 atomicUpdateD g darr upd 
  = runST 
  $ do   marr <- joinDM g darr
-        mapDST_ g (update marr) upd
+        mapDST_ (What "atomicUpdateD") g (update marr) upd
         Seq.unsafeFreeze marr
  where
         update :: forall s. MVector s a -> Vector (Int,a) -> DistST s ()
index a9eba19..a4661ab 100644 (file)
@@ -18,6 +18,7 @@ import Data.Array.Parallel.Unlifted.Distributed.Data.USSegd.DT          ()
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd          as USegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd         as USSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.Vector         as Seq
+import Debug.Trace
 
 here :: String -> String
 here s = "Data.Array.Parallel.Unlifted.Distributed.USSegd." ++ s
@@ -55,8 +56,9 @@ here s = "Data.Array.Parallel.Unlifted.Distributed.USSegd." ++ s
 splitSSegdOnElemsD :: Gang -> USSegd -> Dist ((USSegd,Int),Int)
 splitSSegdOnElemsD g !segd 
   = {-# SCC "splitSSegdOnElemsD" #-}
-    imapD (What "UPSSegd.splitSSegdOnElems/splitLenIx") g mk 
-        (splitLenIdxD g (USegd.takeElements $ USSegd.takeUSegd segd))
+    traceEvent ("dph-prim-par: USSegd.splitSSegdOnElems")
+  $ imapD (What "UPSSegd.splitSSegdOnElems/splitLenIx") g mk 
+          (splitLenIdxD g (USegd.takeElements $ USSegd.takeUSegd segd))
   where 
         -- Number of threads in gang.
         !nThreads = gangSize g
index 4f71d6d..3625552 100644 (file)
@@ -21,6 +21,7 @@ import Data.Bits                                                        (shiftR)
 import Control.Monad                                                    (when)
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd          as USegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.Vector         as Seq
+import Debug.Trace
 
 here :: String -> String
 here s = "Data.Array.Parallel.Unlifted.Distributed.USegd." ++ s
@@ -55,7 +56,8 @@ here s = "Data.Array.Parallel.Unlifted.Distributed.USegd." ++ s
 --
 splitSegdOnSegsD :: Gang -> USegd -> Dist USegd
 splitSegdOnSegsD g !segd 
-  = mapD (What "USegd.splitSegdOnSegds/fromLengths") g USegd.fromLengths
+  = traceEvent ("dph-prim-par: USegd.splitSegdOnSegsD")
+  $ mapD (What "USegd.splitSegdOnSegds/fromLengths") g USegd.fromLengths
   $ splitAsD g d lens
   where
     !d   = snd
@@ -113,8 +115,9 @@ splitSegdOnSegsD g !segd
 splitSegdOnElemsD :: Gang -> USegd -> Dist ((USegd,Int),Int)
 splitSegdOnElemsD g !segd 
   = {-# SCC "splitSegdOnElemsD" #-} 
-    imapD (What "USegd.splitSegdOnElemsD/splitLenIdx") g mk 
-        (splitLenIdxD g (USegd.takeElements segd))
+    traceEvent ("dph-prim-par: USegd.splitSegdOnElemsD")
+  $ imapD      (What "USegd.splitSegdOnElemsD/splitLenIdx") 
+        g mk (splitLenIdxD g (USegd.takeElements segd))
   where 
         -- Number of threads in gang.
         !nThreads = gangSize g
index 489d8b9..a08b6d0 100644 (file)
@@ -19,7 +19,6 @@ module Data.Array.Parallel.Unlifted.Distributed.Primitive.DistST
         , stToDistST
         , distST_, distST
         , runDistST, runDistST_seq
-        , traceDistST
         , myIndex
         , myD
         , readMyMD, writeMyMD
@@ -27,14 +26,13 @@ module Data.Array.Parallel.Unlifted.Distributed.Primitive.DistST
           -- * Monadic combinators
         , mapDST_, mapDST, zipWithDST_, zipWithDST)
 where
-import Data.Array.Parallel.Base (ST, runST)
+import Data.Array.Parallel.Unlifted.Distributed.What
 import Data.Array.Parallel.Unlifted.Distributed.Primitive.DT
 import Data.Array.Parallel.Unlifted.Distributed.Primitive.Gang
 import Data.Array.Parallel.Unlifted.Distributed.Data.Tuple
+import Data.Array.Parallel.Base (ST, runST)
 import Control.Monad (liftM)
 
-here s = "Data.Array.Parallel.Unlifted.Distributed.DistST." ++ s
-
 
 -- | Data-parallel computations.
 --   When applied to a thread gang, the computation implicitly knows the index
@@ -88,34 +86,24 @@ writeMyMD mdt x
 {-# NOINLINE writeMyMD #-}
 
 
--- | Execute a data-parallel computation on a 'Gang'.
---   The same DistST comutation runs on each thread.
-distST_ :: Gang -> DistST s () -> ST s ()
-distST_ g = gangST g . unDistST
-{-# INLINE distST_ #-}
-
-
--- | Execute a data-parallel computation, yielding the distributed result.
-distST :: DT a => Gang -> DistST s a -> ST s (Dist a)
-distST g p 
- = do   md <- newMD g
-        distST_ g $ writeMyMD md =<< p
-        unsafeFreezeMD md
-{-# INLINE distST #-}
-
 
+-- Running --------------------------------------------------------------------
 -- | Run a data-parallel computation, yielding the distributed result.
-runDistST :: DT a => Gang -> (forall s. DistST s a) -> Dist a
-runDistST g p = runST (distST g p)
+runDistST :: DT a => Comp -> Gang -> (forall s. DistST s a) -> Dist a
+runDistST comp g p 
+ = runST $ distST comp g p
 {-# NOINLINE runDistST #-}
 
 
-runDistST_seq :: forall a. DT a => Gang -> (forall s. DistST s a) -> Dist a
-runDistST_seq g p = runST (
-  do
+runDistST_seq 
+        :: forall a. DT a 
+        => Gang -> (forall s. DistST s a) -> Dist a
+runDistST_seq g p 
+ = runST 
+ $ do
      md <- newMD g
      go md 0
-     unsafeFreezeMD md)                           
+     unsafeFreezeMD md
   where
     !n = gangSize g
     go :: forall s. MDist a s -> Int -> ST s ()
@@ -126,12 +114,42 @@ runDistST_seq g p = runST (
 {-# NOINLINE runDistST_seq #-}
 
 
-traceDistST :: String -> DistST s ()
-traceDistST s 
-        = DistST $ \n -> traceGangST ("Worker " ++ show n ++ ": " ++ s)
-{-# INLINE traceDistST #-}
+-- | Execute a data-parallel computation, yielding the distributed result.
+distST  :: DT a 
+        => Comp -> Gang 
+        -> DistST s a -> ST s (Dist a)
+distST comp g p 
+ = do   md <- newMD g
+
+        distST_ comp g 
+         $ writeMyMD md =<< p
+
+        unsafeFreezeMD md
+{-# INLINE distST #-}
 
 
+-- | Execute a data-parallel computation on a 'Gang'.
+--   The same DistST comutation runs on each thread.
+distST_ :: Comp -> Gang -> DistST s () -> ST s ()
+distST_ comp gang proc
+        = gangST gang 
+                (show comp) 
+                (workloadOfComp comp)
+        $ unDistST proc
+{-# INLINE distST_ #-}
+
+workloadOfComp :: Comp -> Workload
+workloadOfComp cc
+ = case cc of
+        CompDist w              -> workloadOfWhat w
+        _                       -> WorkUnknown
+
+workloadOfWhat :: What -> Workload
+workloadOfWhat ww
+ = case ww of
+        WhatJoinCopy elems      -> WorkCopy elems 
+        _                       -> WorkUnknown
+
 -- Combinators ----------------------------------------------------------------
 -- Versions that work on DistST -----------------------------------------------
 -- NOTE: The following combinators must be strict in the Dists because if they
@@ -139,43 +157,49 @@ traceDistST s
 -- the current computation which, again, is parallel. This would break our
 -- model andlead to a deadlock. Hence the bangs.
 
-mapDST_ :: DT a => Gang -> (a -> DistST s ()) -> Dist a -> ST s ()
-mapDST_ g p d 
- = mapDST_' g (\x -> x `deepSeqD` p x) d
+mapDST  :: (DT a, DT b) 
+        => What -> Gang -> (a -> DistST s b) -> Dist a -> ST s (Dist b)
+mapDST what g p !d 
+ = mapDST' what g (\x -> x `deepSeqD` p x) d
+{-# INLINE mapDST #-}
+
+
+mapDST_ :: DT a => What -> Gang -> (a -> DistST s ()) -> Dist a -> ST s ()
+mapDST_ what g p !d 
+ = mapDST_' what g (\x -> x `deepSeqD` p x) d
 {-# INLINE mapDST_ #-}
 
 
-mapDST_' :: DT a => Gang -> (a -> DistST s ()) -> Dist a -> ST s ()
-mapDST_' g p !d 
- = checkGangD (here "mapDST_") g d 
- $ distST_ g (myD d >>= p)
+mapDST' :: (DT a, DT b) => What -> Gang -> (a -> DistST s b) -> Dist a -> ST s (Dist b)
+mapDST' what g p !d 
+ = distST (CompDist what) g (myD d >>= p)
+{-# INLINE mapDST' #-}
 
 
-mapDST :: (DT a, DT b) => Gang -> (a -> DistST s b) -> Dist a -> ST s (Dist b)
-mapDST g p !d = mapDST' g (\x -> x `deepSeqD` p x) d
-{-# INLINE mapDST #-}
+mapDST_' 
+        :: DT a 
+        => What -> Gang -> (a -> DistST s ()) -> Dist a -> ST s ()
+mapDST_' what g p !d 
+ = distST_ (CompDist what) g (myD d >>= p)
+{-# INLINE mapDST_' #-}
 
 
-mapDST' :: (DT a, DT b) => Gang -> (a -> DistST s b) -> Dist a -> ST s (Dist b)
-mapDST' g p !d 
- = checkGangD (here "mapDST_") g d 
- $ distST g (myD d >>= p)
-{-# INLINE mapDST' #-}
+zipWithDST 
+        :: (DT a, DT b, DT c)
+        => What 
+        -> Gang
+        -> (a -> b -> DistST s c) -> Dist a -> Dist b -> ST s (Dist c)
+zipWithDST what g p !dx !dy 
+ = mapDST what g (uncurry p) (zipD dx dy)
+{-# INLINE zipWithDST #-}
 
 
 zipWithDST_ 
         :: (DT a, DT b)
-        => Gang -> (a -> b -> DistST s ()) -> Dist a -> Dist b -> ST s ()
-zipWithDST_ g p !dx !dy 
- = mapDST_ g (uncurry p) (zipD dx dy)
+        => What -> Gang -> (a -> b -> DistST s ()) -> Dist a -> Dist b -> ST s ()
+zipWithDST_ what g p !dx !dy 
+ = mapDST_ what g (uncurry p) (zipD dx dy)
 {-# INLINE zipWithDST_ #-}
 
 
-zipWithDST 
-        :: (DT a, DT b, DT c)
-        => Gang
-        -> (a -> b -> DistST s c) -> Dist a -> Dist b -> ST s (Dist c)
-zipWithDST g p !dx !dy 
- = mapDST g (uncurry p) (zipD dx dy)
-{-# INLINE zipWithDST #-}
 
index fd43c8f..4c0c860 100644 (file)
@@ -6,16 +6,16 @@
 #define SEQ_IF_GANG_BUSY 1
 
 -- Trace all work requests sent to the gang.
-#define TRACE_GANG 0
+#define TRACE_GANG 1
 
 -- | Gang primitives.
 module Data.Array.Parallel.Unlifted.Distributed.Primitive.Gang
         ( Gang
+        , Workload      (..)
         , seqGang
         , forkGang
         , gangSize
-        , gangIO, gangST
-        , traceGang, traceGangST )
+        , gangIO, gangST)
 where
 import GHC.IO
 import GHC.ST
@@ -83,25 +83,32 @@ seqGang (Gang n _ mv) = Gang n [] mv
 --   The threads blocks on the MVar waiting for a work request.
 gangWorker :: Int -> MVar Req -> IO ()
 gangWorker threadId varReq
- = do   traceGang $ "Worker " ++ show threadId ++ " waiting for request."
+ = do   traceWorker threadId $ "ready."
         req     <- takeMVar varReq
         
         case req of
          ReqDo action varDone
-          -> do traceGang $ "Worker " ++ show threadId ++ " begin"
+          -> do traceWorker threadId $ " begin."
                 start   <- getGangTime
                 action threadId
                 end     <- getGangTime
-                traceGang $ "Worker " ++ show threadId 
-                          ++ " end (" ++ diffTime start end ++ ")"
+                traceWorker threadId $ " end (" ++ diffTime start end ++ ")."
                 
                 putMVar varDone ()
                 gangWorker threadId varReq
 
          ReqShutdown varDone
-          -> do traceGang $ "Worker " ++ show threadId ++ " shutting down."
+          -> do traceWorker threadId $ " shutting down."
                 putMVar varDone ()
 
+traceWorker :: Int -> String -> IO ()
+traceWorker threadId str
+ = traceGang 
+        $ "Worker " ++ show threadId 
+        ++ " "
+        ++ replicate (threadId * 10) ' '
+        ++ str
+
 
 -- | Finaliser for worker threads.
 --   We want to shutdown the corresponding thread when it's MVar becomes
@@ -153,68 +160,103 @@ gangSize :: Gang -> Int
 gangSize (Gang n _ _) = n
 
 
+-------------------------------------------------------------------------------
+data Workload
+        -- | Unknown workload. Just run it in parallel.
+        = WorkUnknown
+
+        -- | Memory bound copy-like workload, 
+        --   of the given number of bytes.
+        | WorkCopy      Int
+        deriving (Eq, Show)
+
+
+-- | Decide whether a workload is too small to bother running in parallel.
+--   TODO: We want to determine this based on similar workloads that 
+--         we have run before. The gang should know what its minumum latency is.
+workloadIsSmall :: Workload -> Bool
+workloadIsSmall ww
+ = case ww of
+        WorkUnknown     -> False
+        WorkCopy bytes  -> bytes < 1000
+
+
+
 -- | Issue work requests for the 'Gang' and wait until they have been executed.
 --   If the gang is already busy then just run the action in the requesting
 --   thread. 
 gangIO  :: Gang
+        -> String 
+        -> Workload
         -> (Int -> IO ())
         -> IO ()
 
-gangIO (Gang n [] _)  p 
+-- Hrm. Gang hasn't been created yet. 
+-- Just run the requests in the main thread.
+gangIO (Gang n [] _) _what _workload p 
  = mapM_ p [0 .. n-1]
 
 #if SEQ_IF_GANG_BUSY
-gangIO (Gang n mvs busy) p 
- = do   traceGang   "gangIO: issuing work requests (SEQ_IF_GANG_BUSY)"
-        b <- swapMVar busy True
-
-        traceGang $ "gangIO: gang is currently " ++ (if b then "busy" else "idle")
-        if b
-         then mapM_ p [0 .. n-1]
+gangIO (Gang n mvs busy) what workload p 
+ = do   let !small      = workloadIsSmall workload
+        if small 
+         then do
+                traceGang $ "Issuing  small " ++ what
+                mapM_ p [0 .. n-1]
          else do
-                parIO n mvs p
-                _ <- swapMVar busy False
-                return ()
+                isBusy          <- swapMVar busy True
+                if isBusy 
+                 then do 
+                        traceGang $ "WARNING: Gang was already busy, running sequentially "
+                        mapM_ p [0 .. n-1]
+                 else do
+                        traceGang $ "Issuing  par   " ++ what
+                        parIO what n mvs p
+                        _ <- swapMVar busy False
+                        return ()
 #else
-gangIO (Gang n mvs busy) p = parIO n mvs p
+gangIO (Gang n mvs busy) what _workload p 
+        = parIO n mvs p
 #endif
 
 
 -- | Issue some requests to the worker threads and wait for them to complete.
-parIO   :: Int                  -- ^ Number of threads in the gang.
+parIO   :: String
+        -> Int                  -- ^ Number of threads in the gang.
         -> [MVar Req]           -- ^ Request vars for worker threads.
         -> (Int -> IO ())       -- ^ Action to run in all the workers, it's
                                 --   given the ix of the particular worker
                                 ---  thread it's running on.
         -> IO ()
 
-parIO n mvs p 
- = do   traceGang "parIO: begin"
-
-        start   <- getGangTime
+parIO what n mvs p 
+ = do   start   <- getGangTime
         reqs    <- sequence . replicate n $ newReq p
 
-        traceGang "parIO: issuing requests"
         zipWithM_ putMVar mvs reqs
 
-        traceGang "parIO: waiting for requests to complete"
+        traceGang $ "Running."
         mapM_ waitReq reqs
         end     <- getGangTime
 
-        traceGang $ "parIO: end " ++ diffTime start end
+        traceGang $ "Complete par   " ++ what ++ " in " ++ diffTime start end ++ "us."
 
 
 -- | Same as 'gangIO' but in the 'ST' monad.
-gangST :: Gang -> (Int -> ST s ()) -> ST s ()
-gangST g p = unsafeIOToST . gangIO g $ unsafeSTToIO . p
+gangST :: Gang -> String -> Workload -> (Int -> ST s ()) -> ST s ()
+gangST gang what workload p 
+        = unsafeIOToST 
+        $ gangIO gang what workload
+        $ unsafeSTToIO . p
 
 
 -- Tracing -------------------------------------------------------------------
 #if TRACE_GANG
 getGangTime :: IO Integer
 getGangTime
- = do   TOD sec pico <- getClockTime
-        return (pico + sec * 1000000000000)
+ = do   TOD sec pico    <- getClockTime
+        let !micro      = pico `div` 1000000
+        return (micro + sec * 1000000)
 
 diffTime :: Integer -> Integer -> String
 diffTime x y = show (y-x)
@@ -222,8 +264,7 @@ diffTime x y = show (y-x)
 -- | Emit a GHC event for debugging.
 traceGang :: String -> IO ()
 traceGang s
- = do   t <- getGangTime
-        traceEventIO $ show t ++ " @ " ++ s
+ = do   traceEventIO $ "GANG " ++ s
 
 #else
 getGangTime :: IO ()
@@ -236,8 +277,3 @@ diffTime _ _ = ""
 traceGang :: String -> IO ()
 traceGang _ = return ()
 #endif
-
-
--- | Emit a GHC event for debugging, in the `ST` monad.
-traceGangST :: String -> ST s ()
-traceGangST s = unsafeIOToST (traceGang s)
index 70c9be9..7b527b3 100644 (file)
@@ -35,9 +35,10 @@ generateD
         -> (Int -> a) 
         -> Dist a
 
-generateD what g f 
-        = traceEvent (show $ CompGenerate False what) 
-        $ runDistST g (myIndex >>= return . f)
+generateD what gang f 
+ = runDistST (CompGen False what) 
+        gang 
+        (myIndex >>= return . f)
 {-# NOINLINE generateD #-}
 
 
@@ -56,7 +57,7 @@ generateD_cheap
         -> Dist a
 
 generateD_cheap what g f 
-        = traceEvent (show $ CompGenerate True what) 
+        = traceEvent (show $ CompGen True what) 
         $ runDistST_seq g (myIndex >>= return . f)
 {-# NOINLINE generateD_cheap #-}
 
@@ -67,11 +68,12 @@ generateD_cheap what g f
 imapD'  :: (DT a, DT b) 
         => What -> Gang -> (Int -> a -> b) -> Dist a -> Dist b
 imapD' what gang f !d 
-  = traceEvent (show (CompMap $ what))
-  $ runDistST gang 
-        (do i <- myIndex
-            x <- myD d
-            return (f i x))
+  = runDistST (CompMap what) gang 
+  $ do  i               <- myIndex
+        x               <- myD d
+        let result      = f i x
+        deepSeqD result (return ())
+        return result
 {-# NOINLINE imapD' #-}
 
 
@@ -113,3 +115,13 @@ scanD what gang f z !d
                 scan md (i+1) (f x $ indexD (here "scanD") d i)
 {-# NOINLINE scanD #-}
 
+
+-- | Emit a GHC event for debugging, in the `ST` monad.
+{-
+traceGangST :: String -> ST s ()
+traceGangST s = unsafeIOToST (traceGang s)
+
+traceDistIO :: String -> IO ()
+traceDistIO s
+ = do   traceEventIO $ "DIST " ++ s
+-}
index 1aece2a..18ae9c7 100644 (file)
@@ -6,14 +6,15 @@ where
         
 
 
--- | What computation we are doing.
+-- | What distributed computation we are doing.
 data Comp
-        = CompGenerate  { compCheap     :: Bool
+        = CompGen       { compCheap     :: Bool
                         , compWhat      :: What}
 
         | CompMap       { compWhat      :: What }
         | CompFold      { compWhat      :: What }
         | CompScan      { compWhat      :: What }
+        | CompDist      What
         deriving Show
 
 -- | What sort of thing is being computed.
@@ -26,6 +27,9 @@ data What
         | WhatLengthIdx
         | WhatBpermute
 
+        -- Copy due to a join instruction.
+        | WhatJoinCopy  { whatElems     :: Int }
+
         | WhatFusedMapMap What What
         | WhatFusedMapGen What What
         | WhatFusedZipMap What What
index f5955f0..5e68d0b 100644 (file)
@@ -50,9 +50,13 @@ module Data.Array.Parallel.Unlifted.Parallel
           -- * Index and Extracts
         , indexsFromVector
         , indexsFromVectorsUPVSegd
+        , indexsFromVectorsUPVSegdP
+
         , extractsFromNestedUPSSegd
         , extractsFromVectorsUPSSegd
+
         , extractsFromVectorsUPVSegd
+        , extractsFromVectorsUPVSegdP
 
           -- * Subarrays
         , dropUP
index 13411ba..7dbf78d 100644 (file)
@@ -5,17 +5,23 @@
 module Data.Array.Parallel.Unlifted.Parallel.Extracts 
         ( -- * Scattered indexing
           indexsFromVector
+        , indexsFromVectorsUPVSegdP
         , indexsFromVectorsUPVSegd
 
           -- * Scattered extracts
         , extractsFromNestedUPSSegd
         , extractsFromVectorsUPSSegd
-        , extractsFromVectorsUPVSegd)
+
+        , extractsFromVectorsUPVSegdP
+        , extractsFromVectorsUPVSegd
+        , extractsFromVectorsUPSSegdSegmap)
 where
+import Data.Array.Parallel.Unlifted.Distributed
+import Data.Array.Parallel.Unlifted.Distributed.What
 import Data.Array.Parallel.Unlifted.Parallel.UPSSegd                    (UPSSegd)
 import Data.Array.Parallel.Unlifted.Parallel.UPVSegd                    (UPVSegd)
-import Data.Array.Parallel.Unlifted.Sequential.Vector                   as Seq
 import Data.Array.Parallel.Unlifted.Vectors                             (Vectors)
+import Data.Array.Parallel.Unlifted.Sequential.Vector                   as Seq
 import qualified Data.Array.Parallel.Unlifted.Parallel.UPSSegd          as UPSSegd
 import qualified Data.Array.Parallel.Unlifted.Parallel.UPVSegd          as UPVSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.UVSegd         as UVSegd
@@ -23,7 +29,8 @@ import qualified Data.Array.Parallel.Unlifted.Vectors                   as US
 import qualified Data.Array.Parallel.Unlifted.Stream                    as US
 import qualified Data.Array.Parallel.Unlifted.Sequential                as Seq
 import qualified Data.Vector                                            as V
-
+import Debug.Trace
+import Prelude  as P
 
 -- Indexvs --------------------------------------------------------------------
 -- | Lookup elements from a `Vector`.
@@ -41,6 +48,19 @@ indexsFromVector = Seq.indexsFromVector
 --
 --   TODO: make this parallel.
 --
+indexsFromVectorsUPVSegdP 
+        :: (Unbox a, US.Unboxes a)
+        => Vectors a -> UPVSegd -> Vector (Int, Int) -> Vector a
+
+indexsFromVectorsUPVSegdP vectors upvsegd vsrcixs
+ = splitJoinD theGang 
+        (mapD   (What "indexsFromVectorsUPVSegdP") theGang
+                (indexsFromVectorsUPVSegd vectors upvsegd))
+        vsrcixs
+{-# INLINE_UP indexsFromVectorsUPVSegdP #-}
+
+
+-- | Lookup elements from some Vectors through a `UPVSegd`
 indexsFromVectorsUPVSegd 
         :: (Unbox a, US.Unboxes a)
         => Vectors a -> UPVSegd -> Vector (Int, Int) -> Vector a
@@ -51,7 +71,7 @@ indexsFromVectorsUPVSegd vectors upvsegd vsrcixs
         !vsegids  = UPVSegd.takeVSegidsRedundant upvsegd
         !upssegd  = UPVSegd.takeUPSSegdRedundant upvsegd
         !ussegd   = UPSSegd.takeUSSegd upssegd
-   in   Seq.unstream
+   in  Seq.unstream
          $ US.streamElemsFromVectors     vectors
          $ US.streamSrcIxsThroughUSSegd  ussegd
          $ US.streamSrcIxsThroughVSegids vsegids
@@ -72,7 +92,6 @@ extractsFromNestedUPSSegd upssegd vectors
                 (UPSSegd.takeUSSegd upssegd)
 {-# INLINE_U extractsFromNestedUPSSegd #-}
 
-
 -- | TODO: make this parallel.
 extractsFromVectorsUPSSegd
         :: (Unbox a, US.Unboxes a)
@@ -87,7 +106,36 @@ extractsFromVectorsUPSSegd upssegd vectors
 {-# INLINE_UP extractsFromVectorsUPSSegd #-}
 
 
--- | TODO: make this parallel.
+
+-- From UPVSegd ---------------------------------------------------------------
+-- | Parallel extracts from UPVSegd and Segmap
+--   TODO: This just distributes the segmap over the gang, and will be unbalanced
+--         if there aren't many segments, or they have varying sizes.
+extractsFromVectorsUPVSegdP
+        :: (Unbox a, US.Unboxes a)
+        => UPVSegd
+        -> Vectors a
+        -> Vector a
+
+extractsFromVectorsUPVSegdP upvsegd vectors
+ =      splitJoinD theGang 
+                (mapD   (what upvsegd)
+                        theGang
+                        (extractsFromVectorsUPSSegdSegmap 
+                                (UPVSegd.takeUPSSegdRedundant upvsegd)
+                                vectors))
+                (UPVSegd.takeVSegidsRedundant upvsegd)
+
+ where  what upvsegd
+         = let  lens    = UPVSegd.takeLengths upvsegd
+           in   (What $ "dph-prim-par: extractsFromVectorsUPVSegdP." 
+                      P.++ show (UPVSegd.takeLengths upvsegd))
+        {-# NOINLINE what #-}
+
+{-# INLINE_UP extractsFromVectorsUPVSegdP #-}
+
+
+-- | Sequential extracts from UPVSegd.
 extractsFromVectorsUPVSegd
         :: (Unbox a, US.Unboxes a)
         => UPVSegd
@@ -102,3 +150,18 @@ extractsFromVectorsUPVSegd upvsegd vectors
                 (UPSSegd.takeUSSegd $ UPVSegd.takeUPSSegdRedundant upvsegd)
 {-# INLINE_UP extractsFromVectorsUPVSegd #-}
 
+
+-- | Sequential extracts from USSegd and Segmap
+extractsFromVectorsUPSSegdSegmap
+        :: (Unbox a, US.Unboxes a)
+        => UPSSegd
+        -> Vectors a
+        -> Vector Int
+        -> Vector a
+
+extractsFromVectorsUPSSegdSegmap upssegd vectors segmap
+        = Seq.unstream 
+        $ US.streamSegsFromVectorsUSSegdSegmap vectors
+                (UPSSegd.takeUSSegd upssegd)
+                segmap
+{-# INLINE_UP extractsFromVectorsUPSSegdSegmap #-}
index 50995f7..345ad50 100644 (file)
@@ -115,7 +115,7 @@ instance PprPhysical UPVSegd where
 -- | O(1). Check the internal consistency of a virutal segmentation descriptor.
 ---
 --   * TODO: this doesn't do any checks yet.
---\b
+--
 valid :: UPVSegd -> Bool
 valid UPVSegd{} = True
 {-# NOINLINE valid #-}
index a2c79bd..86033a5 100644 (file)
@@ -10,7 +10,8 @@ module Data.Array.Parallel.Unlifted.Sequential.Extracts
           -- * Scattered extracts
         , extractsFromNestedUSSegd
         , extractsFromVectorsUSSegd
-        , extractsFromVectorsUVSegd)
+        , extractsFromVectorsUVSegd
+        , extractsFromVectorsUSSegdSegmap)
 where
 import Data.Array.Parallel.Unlifted.Stream                      as US
 import Data.Array.Parallel.Unlifted.Vectors                     as US
@@ -76,3 +77,16 @@ extractsFromVectorsUVSegd
 extractsFromVectorsUVSegd uvsegd vectors
         = U.unstream  $ US.streamSegsFromVectorsUVSegd vectors uvsegd
 {-# INLINE_U extractsFromVectorsUVSegd #-}
+
+
+-- | Copy segments defined by a segmap and `USSegd` into a new array.
+extractsFromVectorsUSSegdSegmap
+        :: (Unbox a, US.Unboxes a)
+        => USSegd
+        -> Vector  Int
+        -> Vectors a
+        -> Vector  a
+
+extractsFromVectorsUSSegdSegmap ussegd segmap vectors
+        = U.unstream $ US.streamSegsFromVectorsUSSegdSegmap vectors ussegd segmap
+
index a66abca..7368dd3 100644 (file)
@@ -36,6 +36,7 @@ import Data.Array.Parallel.Pretty                               hiding (empty)
 import Prelude                                                  hiding (length)
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd  as USegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.Vector as U
+import Debug.Trace
 
 here :: String -> String 
 here s = "Data.Array.Parallel.Unlifted.Sequential.USSegd." ++ s
@@ -215,10 +216,13 @@ appendWith
 appendWith
         (USSegd _ starts1 srcs1 usegd1) pdatas1
         (USSegd _ starts2 srcs2 usegd2) _
-        = USSegd False
-                 (starts1  U.++  starts2)
-                 (srcs1    U.++  U.map (+ pdatas1) srcs2)
-                 (USegd.append usegd1 usegd2)
+ = traceEvent 
+        (  "dph-prim-seq: USSegd.appendWith."
+        ++ " length(result) = " ++ show (U.length starts1 + U.length starts2))
+ $ USSegd False
+        (starts1  U.++  starts2)
+        (srcs1    U.++  U.map (+ pdatas1) srcs2)
+        (USegd.append usegd1 usegd2)
 {-# NOINLINE appendWith #-}
 --  NOINLINE because we're worried about code explosion. Might be useful though.
 
@@ -230,7 +234,10 @@ appendWith
 cullOnVSegids :: Vector Int -> USSegd -> (Vector Int, USSegd)
 cullOnVSegids vsegids (USSegd _ starts sources usegd)
  = {-# SCC "cullOnVSegids" #-}
- let    -- Determine which of the psegs are still reachable from the vsegs.
+   traceEvent 
+        (  "dph-prim-seq: USSegd.cullOnVSegids."
+        ++ " length(segmap) = " ++ show (U.length vsegids))
+ $ let  -- Determine which of the psegs are still reachable from the vsegs.
         -- This produces an array of flags, 
         --    with reachable   psegs corresponding to 1
         --    and  unreachable psegs corresponding to 0
@@ -239,7 +246,7 @@ cullOnVSegids vsegids (USSegd _ starts sources usegd)
         --   => psegids_used:   [1 1 0 1 0 1 1]
         --  
         --  Note that psegids '2' and '4' are not in vsegids_packed.
-        psegids_used
+        !psegids_used
          = U.bpermuteDft (USegd.length usegd)
                          (const False)
                          (U.zip vsegids (U.replicate (U.length vsegids) True))
@@ -247,7 +254,7 @@ cullOnVSegids vsegids (USSegd _ starts sources usegd)
         -- Produce an array of used psegs.
         --  eg  psegids_used:   [1 1 0 1 0 1 1]
         --      psegids_packed: [0 1 3 5 6]
-        psegids_packed
+        !psegids_packed
          = U.pack (U.enumFromTo 0 (U.length psegids_used)) psegids_used
 
         -- Produce an array that maps psegids in the source array onto
@@ -260,7 +267,7 @@ cullOnVSegids vsegids (USSegd _ starts sources usegd)
         --  eg  psegids_packed: [0 1 3 5 6]
         --                      [0 1 2 3 4]
         --      psegids_map:    [0 1 -1 2 -1 3 4]
-        psegids_map
+        !psegids_map
          = U.bpermuteDft (USegd.length usegd)
                          (const (-1))
                          (U.zip psegids_packed (U.enumFromTo 0 (U.length psegids_packed - 1)))
@@ -273,18 +280,18 @@ cullOnVSegids vsegids (USSegd _ starts sources usegd)
         -- 
         --      vsegids':       [0 1 1 2 3 3 4 4]
         --
-        vsegids'  = U.map (U.index (here "cullOnVSegids") psegids_map) vsegids
+        !vsegids'  = U.map (U.index (here "cullOnVSegids") psegids_map) vsegids
 
         -- Rebuild the usegd.
-        starts'   = U.pack starts  psegids_used
-        sources'  = U.pack sources psegids_used
+        !starts'   = U.pack starts  psegids_used
+        !sources'  = U.pack sources psegids_used
 
-        lengths'  = U.pack (USegd.takeLengths usegd) psegids_used
-        usegd'    = USegd.fromLengths lengths'
+        !lengths'  = U.pack (USegd.takeLengths usegd) psegids_used
+        !usegd'    = USegd.fromLengths lengths'
         
-        ussegd'   = USSegd False starts' sources' usegd'
+        !ussegd'   = USSegd False starts' sources' usegd'
 
-     in  (vsegids', ussegd')
+     in (vsegids', ussegd')
 
 {-# NOINLINE cullOnVSegids #-}
 --  NOINLINE because it's complicated and won't fuse with anything
index 6e73c6b..4572f14 100644 (file)
@@ -27,6 +27,7 @@ import qualified Data.Array.Parallel.Unlifted.Sequential.Vector as U
 import Data.Array.Parallel.Unlifted.Sequential.Vector           (Vector)
 import Data.Array.Parallel.Pretty                               hiding (empty)
 import Prelude                                                  hiding (length)
+import Debug.Trace
 
 here :: String -> String 
 here s = "Data.Array.Parallel.Unlifted.Sequential.USegd." ++ s
@@ -138,11 +139,12 @@ getSeg (USegd lengths indices _ ) ix
 --   two arrays.
 append :: USegd -> USegd -> USegd
 append (USegd lengths1 indices1 elems1)
-            (USegd lengths2 indices2 elems2)
- = USegd (lengths1 U.++ lengths2)
+       (USegd lengths2 indices2 elems2)
+ = traceEvent ("dph-prim-seq: USegd.append")
+ $ USegd (lengths1 U.++ lengths2)
          (indices1 U.++ U.map (+ elems1) indices2)
          (elems1 + elems2)
-{-# INLINE_U append #-}
+{-# NOINLINE append #-}
 
 
 -- | O(segs) Extract a slice of a segment descriptor, avoiding copying where possible.
index c6b1c5b..4f3bbcd 100644 (file)
@@ -49,6 +49,7 @@ import Prelude                                                  hiding (length)
 import qualified Data.Array.Parallel.Unlifted.Sequential.Vector as U
 import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd as USSegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd  as USegd
+import Debug.Trace
 
 here :: String -> String 
 here s = "Data.Array.Parallel.Unlifted.Sequential.UVSegd." ++ s
@@ -307,9 +308,13 @@ getSeg uvsegd ix
 -- 
 unsafeDemoteToUSSegd :: UVSegd -> USSegd
 unsafeDemoteToUSSegd uvsegd
- | uvsegd_manifest uvsegd       = uvsegd_ussegd_culled uvsegd           -- TODO: take the redundant ones
- | otherwise
- = let  vsegids         = uvsegd_vsegids_culled uvsegd
+ = traceEvent 
+        (  "dph-prim-seq: UVSegd.unsafeDemoteToUSSSegd"
+        ++ " length(segmap) = " ++ show (U.length $ takeVSegids uvsegd))
+ $ if uvsegd_manifest uvsegd    
+    then uvsegd_ussegd_culled uvsegd           -- TODO: take the redundant ones
+    else let 
+        vsegids         = uvsegd_vsegids_culled uvsegd
         ussegd          = uvsegd_ussegd_culled  uvsegd
         starts'         = U.bpermute (USSegd.takeStarts  ussegd) vsegids
         sources'        = U.bpermute (USSegd.takeSources ussegd) vsegids
@@ -333,7 +338,10 @@ unsafeDemoteToUSSegd uvsegd
 --
 unsafeDemoteToUSegd :: UVSegd -> USegd
 unsafeDemoteToUSegd (UVSegd _ _ vsegids _ ussegd)
-        = USegd.fromLengths
+ = traceEvent
+        (  "dph-prim-seq: UVSegd.unsafeDemoteToUSegd"
+        ++ " length(segmap) = " ++ show (U.length vsegids))
+ $ USegd.fromLengths
         $ U.bpermute (USSegd.takeLengths ussegd) vsegids
 {-# NOINLINE unsafeDemoteToUSegd #-}
 --  NOINLINE because it won't fuse with anything.
@@ -433,7 +441,11 @@ appendWith
         (UVSegd _ _ vsegids1 _ ussegd1) pdatas1
         (UVSegd _ _ vsegids2 _ ussegd2) pdatas2
 
- = let  -- vsegids releative to appended psegs
+ = traceEvent 
+        (  "dph-prim-seq: UVSegd.appendWith"
+        ++ "length(result) = " ++ (show $ U.length vsegids1 + U.length vsegids2))
+ $ let  
+        -- vsegids releative to appended psegs
         vsegids1' = vsegids1
         vsegids2' = U.map (+ USSegd.length ussegd1) vsegids2
         
@@ -446,7 +458,7 @@ appendWith
                                 ussegd2 pdatas2
                                  
    in   UVSegd False vsegids' vsegids' ussegd' ussegd'
-{-# INLINE_U appendWith #-}
+{-# NOINLINE appendWith #-}
 
 
 -- combine --------------------------------------------------------------------
@@ -484,16 +496,20 @@ appendWith
 combine2
         :: USel2       -- ^ Selector for the combine operation.
         -> UVSegd      -- ^ Descriptor of first array.
-        -> Int          -- ^ Number of flat physical arrays for first descriptor.
+        -> Int         -- ^ Number of flat physical arrays for first descriptor.
         -> UVSegd      -- ^ Descriptor of second array.
-        -> Int          -- ^ Number of flat physical arrays for second descriptor.
+        -> Int         -- ^ Number of flat physical arrays for second descriptor.
         -> UVSegd
         
 combine2  usel2
         (UVSegd _ _ vsegids1 _ ussegd1) pdatas1
         (UVSegd _ _ vsegids2 _ ussegd2) pdatas2
 
- = let  -- vsegids relative to combined psegs
+ = traceEvent
+        (  "dph-prim-seq: UVSegd.combine2"
+        ++ "length(result) = " ++ show (U.length $ tagsUSel2 usel2))
+ $ let  
+        -- vsegids relative to combined psegs
         vsegids1' = vsegids1
         vsegids2' = U.map (+ (U.length vsegids1)) vsegids2
 
@@ -507,4 +523,5 @@ combine2  usel2
                                 ussegd2 pdatas2
                                   
    in   UVSegd False vsegids' vsegids' ussegd' ussegd'
-{-# INLINE_U combine2 #-}
+{-# NOINLINE combine2 #-}
+
index 48a0b0b..8496630 100644 (file)
@@ -3,7 +3,8 @@
 module Data.Array.Parallel.Unlifted.Stream.Segments
         ( streamSegsFromNestedUSSegd
         , streamSegsFromVectorsUSSegd
-        , streamSegsFromVectorsUVSegd)
+        , streamSegsFromVectorsUVSegd
+        , streamSegsFromVectorsUSSegdSegmap)
 where
 import Data.Vector.Fusion.Stream.Size
 import Data.Vector.Fusion.Stream.Monadic
@@ -14,8 +15,7 @@ import Data.Array.Parallel.Unlifted.Sequential.UVSegd           (UVSegd(..))
 import qualified Data.Array.Parallel.Unlifted.Vectors           as US
 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd  as USegd
 import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd as USSegd
-import qualified Data.Array.Parallel.Unlifted.Sequential.UVSegd as UVSegd
-import qualified Data.Vector.Unboxed                            as U
+import qualified Data.Array.Parallel.Unlifted.Sequential.Vector as U
 import qualified Data.Vector                                    as V
 import qualified Data.Primitive.ByteArray                       as P
 import System.IO.Unsafe
@@ -55,7 +55,7 @@ streamSegsFromNestedUSSegd
          = return $ Done
          
          -- Current pseg is done
-         | ix   >= pseglens `U.unsafeIndex` pseg 
+         | ix   >= U.index here pseglens pseg 
          = return $ Skip (pseg + 1, 0)
 
          -- Stream an element from this pseg
@@ -164,14 +164,29 @@ streamSegsFromVectorsUVSegd
 
 streamSegsFromVectorsUVSegd
         vectors
-        uvsegd@(UVSegd _ _ vsegids _ (USSegd _ segStarts segSources usegd) )
- = segStarts `seq` segSources `seq` uvsegd `seq` vectors `seq`
+        (UVSegd _ _ segmap _ ussegd)
+ = streamSegsFromVectorsUSSegdSegmap vectors ussegd segmap
+{-# INLINE_STREAM streamSegsFromVectorsUVSegd #-}
+
+
+streamSegsFromVectorsUSSegdSegmap
+        :: (Unboxes a, Monad m)
+        => Vectors a            -- ^ Vectors holding source data.
+        -> USSegd               -- ^ Scattered segment descriptor
+        -> Vector Int           -- ^ Segmap
+        -> Stream m a
+
+streamSegsFromVectorsUSSegdSegmap
+        vectors ussegd@(USSegd _ segStarts segSources usegd) segmap
+ = segStarts `seq` segSources `seq` usegd `seq` segmap `seq`
    let  here            = "stremSegsFromVectorsUVSegd"
 
-        !elemsTotal     = U.sum $ UVSegd.takeLengths uvsegd
+        -- Total number of elements to be streamed
+        !lengths        = USSegd.takeLengths ussegd
+        !elemsTotal     = U.sum $ U.map (U.index here lengths) segmap
 
         -- Total number of segments.
-        !segsTotal      = UVSegd.length uvsegd
+        !segsTotal      = U.length segmap
  
         -- Length of each physical segment.
         !segLens        = USegd.takeLengths usegd
@@ -188,7 +203,7 @@ streamSegsFromVectorsUVSegd
                   
                        -- Move to the next seg.
                   else let ixSeg'       = ixSeg + 1
-                           ixPSeg       = index here vsegids    ixSeg'
+                           ixPSeg       = index here segmap     ixSeg'
                            sourceSeg    = index here segSources ixPSeg
                            startSeg     = index here segStarts  ixPSeg
                            lenSeg       = index here segLens    ixPSeg
@@ -217,6 +232,6 @@ streamSegsFromVectorsUVSegd
         -- It's important that we set the result stream size, so Data.Vector
         -- doesn't need to add code to grow the result when it overflows.
    in   Stream fnSeg initState (Exact elemsTotal)
-{-# INLINE_STREAM streamSegsFromVectorsUVSegd #-}
+{-# INLINE_STREAM streamSegsFromVectorsUSSegdSegmap #-}