Add a note about unreachable branches
[packages/dph.git] / dph-base / Data / Array / Parallel / Stream.hs
1 -----------------------------------------------------------------------------
2 -- |
3 -- Module : Data.Array.Parallel.Stream
4 -- Copyright : (c) 2010 Roman Leshchinskiy
5 -- License : see libraries/ndp/LICENSE
6 --
7 -- Maintainer : Roman Leshchinskiy <rl@cse.unsw.edu.au>
8 -- Stability : internal
9 -- Portability : non-portable (existentials)
10 --
11 -- Stream functions not implemented in vector
12 --
13 -- TODO: The use of INLINE pragmas in some of these function isn't consistent.
14 -- for indexedS and combine2ByTagS, there is an INLINE_INNER on the 'next'
15 -- function, but replicateEachS uses a plain INLINE and fold1SS uses
16 -- a hard INLINE [0]. Can we make a rule that all top-level stream functions
17 -- in this module have INLINE_STREAM, and all 'next' functions have
18 -- INLINE_INNER? If not we should document the reasons for the special cases.
19 --
20 -- TODO: The behavour of indicesSS looks suspiciously inconsistent.
21 --
22 --
23 -- Note: [NEVER ENTERED]
24 -- ~~~~~~~~~~~~~~~~~~~~~
25 -- Cases marked NEVER ENTERED should be unreachable, assuming there are no
26 -- bugs elsewhere in the library. We used to throw an error when these
27 -- branches were entered, but this was confusing the simplifier. It would be
28 -- better if we could put the errors back, but we'll need to check that
29 -- performance does not regress when we do so.
30 --
31
32 #include "fusion-phases.h"
33
34 module Data.Array.Parallel.Stream (
35
36 -- * Flat stream operators
37 indexedS, replicateEachS, replicateEachRS,
38 interleaveS, combine2ByTagS,
39 enumFromToEachS, enumFromStepLenEachS,
40
41 -- * Segmented stream operators
42 foldSS, fold1SS, combineSS, appendSS,
43 foldValuesR,
44 indicesSS
45 ) where
46
47 import Data.Array.Parallel.Base ( Tag )
48
49 import qualified Data.Vector.Fusion.Stream as S
50 import Data.Vector.Fusion.Stream.Monadic ( Stream(..), Step(..) )
51 import Data.Vector.Fusion.Stream.Size ( Size(..) )
52
53 -- | Tag each element of an stream with its index in that stream.
54 --
55 -- @
56 -- indexed [42,93,13]
57 -- = [(0,42), (1,93), (2,13)]
58 -- @
59 indexedS :: S.Stream a -> S.Stream (Int,a)
60 {-# INLINE_STREAM indexedS #-}
61 indexedS (Stream next s n) = Stream next' (0,s) n
62 where
63 {-# INLINE_INNER next' #-}
64 next' (i,s) = do
65 r <- next s
66 case r of
67 Yield x s' -> return $ Yield (i,x) (i+1,s')
68 Skip s' -> return $ Skip (i,s')
69 Done -> return Done
70
71
72 -- | Given a stream of pairs containing a count an an element,
73 -- replicate element the number of times given by the count.
74 --
75 -- The first parameter sets the size hint of the resulting stream.
76 --
77 -- @
78 -- replicateEach 10 [(2,10), (5,20), (3,30)]
79 -- = [10,10,20,20,20,20,20,30,30,30]
80 -- @
81 replicateEachS :: Int -> S.Stream (Int,a) -> S.Stream a
82 {-# INLINE_STREAM replicateEachS #-}
83 replicateEachS n (Stream next s _) =
84 Stream next' (0,Nothing,s) (Exact n)
85 where
86 {-# INLINE next' #-}
87 next' (0, _, s) =
88 do
89 r <- next s
90 case r of
91 Done -> return Done
92 Skip s' -> return $ Skip (0, Nothing, s')
93 Yield (k,x) s' -> return $ Skip (k, Just x,s')
94 next' (k,Nothing,s) = return Done -- NEVER ENTERED (See Note)
95 next' (k,Just x,s) = return $ Yield x (k-1,Just x,s)
96
97
98 -- | Repeat each element in the stream the given number of times.
99 --
100 -- @
101 -- replicateEach 2 [10,20,30]
102 -- = [10,10,20,20,30,30]
103 -- @
104 --
105 replicateEachRS :: Int -> S.Stream a -> S.Stream a
106 {-# INLINE_STREAM replicateEachRS #-}
107 replicateEachRS !n (Stream next s sz)
108 = Stream next' (0,Nothing,s) (sz `multSize` n)
109 where
110 next' (0,_,s) =
111 do
112 r <- next s
113 case r of
114 Done -> return Done
115 Skip s' -> return $ Skip (0,Nothing,s')
116 Yield x s' -> return $ Skip (n,Just x,s')
117 next' (i,Nothing,s) = return Done -- NEVER ENTERED (See Note)
118 next' (i,Just x,s) = return $ Yield x (i-1,Just x,s)
119
120
121 -- | Multiply a size hint by a scalar.
122 multSize :: Size -> Int -> Size
123 multSize (Exact n) k = Exact (n*k)
124 multSize (Max n) k = Max (n*k)
125 multSize Unknown _ = Unknown
126
127
128 -- | Interleave the elements of two streams. We alternate between the first
129 -- and second streams, stopping when we can't find a matching element.
130 --
131 -- @
132 -- interleave [2,3,4] [10,20,30] = [2,10,3,20,4,30]
133 -- interleave [2,3] [10,20,30] = [2,10,3,20]
134 -- interleave [2,3,4] [10,20] = [2,10,3,20,4]
135 -- @
136 --
137 interleaveS :: S.Stream a -> S.Stream a -> S.Stream a
138 {-# INLINE_STREAM interleaveS #-}
139 interleaveS (Stream next1 s1 n1) (Stream next2 s2 n2)
140 = Stream next (False,s1,s2) (n1+n2)
141 where
142 {-# INLINE next #-}
143 next (False,s1,s2) =
144 do
145 r <- next1 s1
146 case r of
147 Yield x s1' -> return $ Yield x (True ,s1',s2)
148 Skip s1' -> return $ Skip (False,s1',s2)
149 Done -> return Done
150
151 next (True,s1,s2) =
152 do
153 r <- next2 s2
154 case r of
155 Yield x s2' -> return $ Yield x (False,s1,s2')
156 Skip s2' -> return $ Skip (True ,s1,s2')
157 Done -> return Done -- NEVER ENTERED (See Note)
158
159
160 -- | Combine two streams, using a tag stream to tell us which of the data
161 -- streams to take the next element from.
162 --
163 -- If there are insufficient elements in the data strams for the provided
164 -- tag stream then `error`.
165 --
166 -- @
167 -- combine2ByTag [0,1,1,0,0,1] [1,2,3] [4,5,6]
168 -- = [1,4,5,2,3,6]
169 -- @
170 --
171 combine2ByTagS :: S.Stream Tag -> S.Stream a -> S.Stream a -> S.Stream a
172 {-# INLINE_STREAM combine2ByTagS #-}
173 combine2ByTagS (Stream next_tag s m) (Stream next0 s0 _)
174 (Stream next1 s1 _)
175 = Stream next (Nothing,s,s0,s1) m
176 where
177 {-# INLINE_INNER next #-}
178 next (Nothing,s,s0,s1)
179 = do
180 r <- next_tag s
181 case r of
182 Done -> return Done
183 Skip s' -> return $ Skip (Nothing,s',s0,s1)
184 Yield t s' -> return $ Skip (Just t, s',s0,s1)
185
186 next (Just 0,s,s0,s1)
187 = do
188 r <- next0 s0
189 case r of
190 Done -> error "combine2ByTagS: stream 1 too short"
191 Skip s0' -> return $ Skip (Just 0, s,s0',s1)
192 Yield x s0' -> return $ Yield x (Nothing,s,s0',s1)
193
194 next (Just t,s,s0,s1)
195 = do
196 r <- next1 s1
197 case r of
198 Done -> error "combine2ByTagS: stream 2 too short"
199 Skip s1' -> return $ Skip (Just t, s,s0,s1')
200 Yield x s1' -> return $ Yield x (Nothing,s,s0,s1')
201
202
203 -- | Create a stream of integer ranges. The pairs in the input stream
204 -- give the first and last value of each range.
205 --
206 -- The first parameter gives the size hint for the resulting stream.
207 --
208 -- @
209 -- enumFromToEach 11 [(2,5), (10,16), (20,22)]
210 -- = [2,3,4,5,10,11,12,13,14,15,16,20,21,22]
211 -- @
212 --
213 enumFromToEachS :: Int -> S.Stream (Int,Int) -> S.Stream Int
214 {-# INLINE_STREAM enumFromToEachS #-}
215 enumFromToEachS n (Stream next s _)
216 = Stream next' (Nothing,s) (Exact n)
217 where
218 {-# INLINE_INNER next' #-}
219 next' (Nothing,s)
220 = do
221 r <- next s
222 case r of
223 Yield (k,m) s' -> return $ Skip (Just (k,m),s')
224 Skip s' -> return $ Skip (Nothing, s')
225 Done -> return Done
226
227 next' (Just (k,m),s)
228 | k > m = return $ Skip (Nothing, s)
229 | otherwise = return $ Yield k (Just (k+1,m),s)
230
231
232 -- | Create a stream of integer ranges. The triples in the input stream
233 -- give the first value, increment, length of each range.
234 --
235 -- The first parameter gives the size hint for the resulting stream.
236 --
237 -- @
238 -- enumFromStepLenEach [(1,1,5), (10,2,4), (20,3,5)]
239 -- = [1,2,3,4,5,10,12,14,16,20,23,26,29,32]
240 -- @
241 --
242 enumFromStepLenEachS :: Int -> S.Stream (Int,Int,Int) -> S.Stream Int
243 {-# INLINE_STREAM enumFromStepLenEachS #-}
244 enumFromStepLenEachS len (Stream next s _)
245 = Stream next' (Nothing,s) (Exact len)
246 where
247 {-# INLINE_INNER next' #-}
248 next' (Nothing,s)
249 = do
250 r <- next s
251 case r of
252 Yield (from,step,len) s' -> return $ Skip (Just (from,step,len),s')
253 Skip s' -> return $ Skip (Nothing,s')
254 Done -> return Done
255
256 next' (Just (from,step,0),s) = return $ Skip (Nothing,s)
257 next' (Just (from,step,n),s)
258 = return $ Yield from (Just (from+step,step,n-1),s)
259
260
261 -- | Segmented Stream fold. Take segments from the given stream and fold each
262 -- using the supplied function and initial element.
263 --
264 -- @
265 -- foldSS (+) 0 [2, 3, 2] [10, 20, 30, 40, 50, 60, 70]
266 -- = [30,120,130]
267 -- @
268 --
269 foldSS :: (a -> b -> a) -- ^ function to perform the fold
270 -> a -- ^ initial element of each fold
271 -> S.Stream Int -- ^ stream of segment lengths
272 -> S.Stream b -- ^ stream of input data
273 -> S.Stream a -- ^ stream of fold results
274
275 {-# INLINE_STREAM foldSS #-}
276 foldSS f z (Stream nexts ss sz) (Stream nextv vs _) =
277 Stream next (Nothing,z,ss,vs) sz
278 where
279 {-# INLINE next #-}
280 next (Nothing,x,ss,vs) =
281 do
282 r <- nexts ss
283 case r of
284 Done -> return Done
285 Skip ss' -> return $ Skip (Nothing,x, ss', vs)
286 Yield n ss' -> return $ Skip (Just n, z, ss', vs)
287
288 next (Just 0,x,ss,vs) =
289 return $ Yield x (Nothing,z,ss,vs)
290 next (Just n,x,ss,vs) =
291 do
292 r <- nextv vs
293 case r of
294 Done -> return Done -- NEVER ENTERED (See Note)
295 Skip vs' -> return $ Skip (Just n,x,ss,vs')
296 Yield y vs' -> let r = f x y
297 in r `seq` return (Skip (Just (n-1), r, ss, vs'))
298
299
300 -- | Like `foldSS`, but use the first member of each chunk as the initial
301 -- element for the fold.
302 fold1SS :: (a -> a -> a) -> S.Stream Int -> S.Stream a -> S.Stream a
303 {-# INLINE_STREAM fold1SS #-}
304 fold1SS f (Stream nexts ss sz) (Stream nextv vs _) =
305 Stream next (Nothing,Nothing,ss,vs) sz
306 where
307 {-# INLINE [0] next #-}
308 next (Nothing,Nothing,ss,vs) =
309 do
310 r <- nexts ss
311 case r of
312 Done -> return Done
313 Skip ss' -> return $ Skip (Nothing,Nothing,ss',vs)
314 Yield n ss' -> return $ Skip (Just n ,Nothing,ss',vs)
315
316 next (Just !n,Nothing,ss,vs) =
317 do
318 r <- nextv vs
319 case r of
320 Done -> return Done -- NEVER ENTERED (See Note)
321 Skip vs' -> return $ Skip (Just n, Nothing,ss,vs')
322 Yield x vs' -> return $ Skip (Just (n-1),Just x, ss,vs')
323
324 next (Just 0,Just x,ss,vs) =
325 return $ Yield x (Nothing,Nothing,ss,vs)
326
327 next (Just n,Just x,ss,vs) =
328 do
329 r <- nextv vs
330 case r of
331 Done -> return Done -- NEVER ENTERED (See Note)
332 Skip vs' -> return $ Skip (Just n ,Just x ,ss,vs')
333 Yield y vs' -> let r = f x y
334 in r `seq` return (Skip (Just (n-1),Just r,ss,vs'))
335
336
337 -- | Segmented Stream combine. Like `combine2ByTagS`, except that the tags select
338 -- entire segments of each data stream, instead of selecting one element at a time.
339 --
340 -- @
341 -- combineSS [True, True, False, True, False, False]
342 -- [2,1,3] [10,20,30,40,50,60]
343 -- [1,2,3] [11,22,33,44,55,66]
344 -- = [10,20,30,11,40,50,60,22,33,44,55,66]
345 -- @
346 --
347 -- This says take two elements from the first stream, then another one element
348 -- from the first stream, then one element from the second stream, then three
349 -- elements from the first stream...
350 --
351 combineSS
352 :: S.Stream Bool -- ^ tag values
353 -> S.Stream Int -- ^ segment lengths for first data stream
354 -> S.Stream a -- ^ first data stream
355 -> S.Stream Int -- ^ segment lengths for second data stream
356 -> S.Stream a -- ^ second data stream
357 -> S.Stream a
358
359 {-# INLINE_STREAM combineSS #-}
360 combineSS (Stream nextf sf _)
361 (Stream nexts1 ss1 _) (Stream nextv1 vs1 nv1)
362 (Stream nexts2 ss2 _) (Stream nextv2 vs2 nv2)
363 = Stream next (Nothing,True,sf,ss1,vs1,ss2,vs2)
364 (nv1+nv2)
365 where
366 {-# INLINE next #-}
367 next (Nothing,f,sf,ss1,vs1,ss2,vs2) =
368 do
369 r <- nextf sf
370 case r of
371 Done -> return Done
372 Skip sf' -> return $ Skip (Nothing,f,sf',ss1,vs1,ss2,vs2)
373 Yield c sf'
374 | c ->
375 do
376 r <- nexts1 ss1
377 case r of
378 Done -> return Done
379 Skip ss1' -> return $ Skip (Nothing,f,sf,ss1',vs1,ss2,vs2)
380 Yield n ss1' -> return $ Skip (Just n,c,sf',ss1',vs1,ss2,vs2)
381
382 | otherwise ->
383 do
384 r <- nexts2 ss2
385 case r of
386 Done -> return Done
387 Skip ss2' -> return $ Skip (Nothing,f,sf,ss1,vs1,ss2',vs2)
388 Yield n ss2' -> return $ Skip (Just n,c,sf',ss1,vs1,ss2',vs2)
389
390 next (Just 0,_,sf,ss1,vs1,ss2,vs2) =
391 return $ Skip (Nothing,True,sf,ss1,vs1,ss2,vs2)
392
393 next (Just n,True,sf,ss1,vs1,ss2,vs2) =
394 do
395 r <- nextv1 vs1
396 case r of
397 Done -> return Done
398 Skip vs1' -> return $ Skip (Just n,True,sf,ss1,vs1',ss2,vs2)
399 Yield x vs1' -> return $ Yield x (Just (n-1),True,sf,ss1,vs1',ss2,vs2)
400
401 next (Just n,False,sf,ss1,vs1,ss2,vs2) =
402 do
403 r <- nextv2 vs2
404 case r of
405 Done -> return Done
406 Skip vs2' -> return $ Skip (Just n,False,sf,ss1,vs1,ss2,vs2')
407 Yield x vs2' -> return $ Yield x (Just (n-1),False,sf,ss1,vs1,ss2,vs2')
408
409
410 -- | Segmented Strem append. Append corresponding segments from each stream.
411 --
412 -- @
413 -- appendSS [2, 1, 3] [10, 20, 30, 40, 50, 60]
414 -- [1, 3, 2] [11, 22, 33, 44, 55, 66]
415 -- = [10,20,11,30,22,33,44,40,50,60,55,66]
416 -- @
417 --
418 appendSS
419 :: S.Stream Int -- ^ segment lengths for first data stream
420 -> S.Stream a -- ^ first data stream
421 -> S.Stream Int -- ^ segment lengths for second data stream
422 -> S.Stream a -- ^ second data stream
423 -> S.Stream a
424
425 {-# INLINE_STREAM appendSS #-}
426 appendSS (Stream nexts1 ss1 ns1) (Stream nextv1 sv1 nv1)
427 (Stream nexts2 ss2 ns2) (Stream nextv2 sv2 nv2)
428 = Stream next (True,Nothing,ss1,sv1,ss2,sv2) (nv1 + nv2)
429 where
430 {-# INLINE next #-}
431 next (True,Nothing,ss1,sv1,ss2,sv2) =
432 do
433 r <- nexts1 ss1
434 case r of
435 Done -> return $ Done
436 Skip ss1' -> return $ Skip (True,Nothing,ss1',sv1,ss2,sv2)
437 Yield n ss1' -> return $ Skip (True,Just n ,ss1',sv1,ss2,sv2)
438
439 next (True,Just 0,ss1,sv1,ss2,sv2)
440 = return $ Skip (False,Nothing,ss1,sv1,ss2,sv2)
441
442 next (True,Just n,ss1,sv1,ss2,sv2) =
443 do
444 r <- nextv1 sv1
445 case r of
446 Done -> return Done -- NEVER ENTERED (See Note)
447 Skip sv1' -> return $ Skip (True,Just n,ss1,sv1',ss2,sv2)
448 Yield x sv1' -> return $ Yield x (True,Just (n-1),ss1,sv1',ss2,sv2)
449
450 next (False,Nothing,ss1,sv1,ss2,sv2) =
451 do
452 r <- nexts2 ss2
453 case r of
454 Done -> return Done -- NEVER ENTERED (See Note)
455 Skip ss2' -> return $ Skip (False,Nothing,ss1,sv1,ss2',sv2)
456 Yield n ss2' -> return $ Skip (False,Just n,ss1,sv1,ss2',sv2)
457
458 next (False,Just 0,ss1,sv1,ss2,sv2)
459 = return $ Skip (True,Nothing,ss1,sv1,ss2,sv2)
460
461 next (False,Just n,ss1,sv1,ss2,sv2) =
462 do
463 r <- nextv2 sv2
464 case r of
465 Done -> return Done -- NEVER ENTERED (See Note)
466 Skip sv2' -> return $ Skip (False,Just n,ss1,sv1,ss2,sv2')
467 Yield x sv2' -> return $ Yield x (False,Just (n-1),ss1,sv1,ss2,sv2')
468
469
470 -- | Segmented Stream fold, with a fixed segment length.
471 --
472 -- Like `foldSS` but use a fixed length for each segment.
473 --
474 foldValuesR
475 :: (a -> b -> a) -- ^ function to perform the fold
476 -> a -- ^ initial element for fold
477 -> Int -- ^ length of each segment
478 -> S.Stream b -- ^ data stream
479 -> S.Stream a
480
481 {-# INLINE_STREAM foldValuesR #-}
482 foldValuesR f z segSize (Stream nextv vs nv) =
483 Stream next (segSize,z,vs) (nv `divSize` segSize)
484 where
485 {-# INLINE next #-}
486 next (0,x,vs) = return $ Yield x (segSize,z,vs)
487
488 next (n,x,vs) =
489 do
490 r <- nextv vs
491 case r of
492 Done -> return Done
493 Skip vs' -> return $ Skip (n,x,vs')
494 Yield y vs' -> let r = f x y
495 in r `seq` return (Skip ((n-1),r,vs'))
496
497
498 -- | Divide a size hint by a scalar.
499 divSize :: Size -> Int -> Size
500 divSize (Exact n) k = Exact (n `div` k)
501 divSize (Max n) k = Max (n `div` k)
502 divSize Unknown _ = Unknown
503
504
505 -- | Segmented Stream indices.
506 --
507 -- @
508 -- indicesSS 15 4 [3, 5, 7]
509 -- = [4,5,6,0,1,2,3,4,0,1,2,3,4,5,6]
510 -- @
511 --
512 -- TODO: Is that correct? Why does the first segment in the result start from 4,
513 -- unlike the others?
514 --
515 indicesSS
516 :: Int
517 -> Int
518 -> S.Stream Int
519 -> S.Stream Int
520
521 {-# INLINE_STREAM indicesSS #-}
522 indicesSS n i (Stream next s _) =
523 Stream next' (i,Nothing,s) (Exact n)
524 where
525 {-# INLINE next' #-}
526 next' (i,Nothing,s) =
527 do
528 r <- next s
529 case r of
530 Done -> return Done
531 Skip s' -> return $ Skip (i,Nothing,s')
532 Yield k s' -> return $ Skip (i,Just k,s')
533
534 next' (i,Just k,s)
535 | k > 0 = return $ Yield i (i+1,Just (k-1),s)
536 | otherwise = return $ Skip (0 ,Nothing ,s)
537