dffdb3f30129ac5ad875db6ce0fb8d3d32a2d8a8
[packages/dph.git] / dph-prim-par / Data / Array / Parallel / Unlifted / Parallel / Segmented.hs
1 {-# LANGUAGE CPP #-}
2 #include "fusion-phases.h"
3
4 -- | Parallel combinators for segmented unboxed arrays
5 module Data.Array.Parallel.Unlifted.Parallel.Segmented
6 ( replicateRSUP
7 , appendSUP
8 , appendSUP_old
9 , appendSUPV
10 , foldRUP
11 , sumRUP)
12 where
13 import Data.Array.Parallel.Unlifted.Distributed
14 import Data.Array.Parallel.Unlifted.Parallel.Basics
15 import Data.Array.Parallel.Unlifted.Parallel.UPSegd (UPSegd)
16 import Data.Array.Parallel.Unlifted.Sequential.USegd (USegd)
17 import Data.Array.Parallel.Unlifted.Sequential.Vector as Seq
18 import qualified Data.Array.Parallel.Unlifted.Vectors as Vs
19 import qualified Data.Array.Parallel.Unlifted.Parallel.UPSegd as UPSegd
20 import qualified Data.Array.Parallel.Unlifted.Sequential as Seq
21 import qualified Data.Array.Parallel.Unlifted.Sequential.USegd as USegd
22 import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd as USSegd
23 import qualified Data.Array.Parallel.Unlifted.Parallel.UPVSegd as UPVSegd
24 import qualified Data.Array.Parallel.Unlifted.Parallel.UPSSegd as UPSSegd
25 import Data.Vector.Fusion.Stream.Monadic ( Stream(..), Step(..) )
26 import Data.Vector.Fusion.Stream.Size ( Size(..) )
27 import qualified Data.Vector.Fusion.Stream as S
28
29 import GHC.Exts -- for unboxed primops
30
31 here :: String -> String
32 here s = "Data.Array.Parallel.Unlifted.Parallel.Segmented." Prelude.++ s
33
34
35 -- replicate ------------------------------------------------------------------
36 -- | Segmented replication.
37 -- Each element in the vector is replicated the given number of times.
38 --
39 -- @replicateRSUP 2 [1, 2, 3, 4, 5] = [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]@
40 --
41
42 -- TODO: make this efficient
43 replicateRSUP :: Unbox a => Int -> Vector a -> Vector a
44 replicateRSUP n xs
45 = UPSegd.replicateWithP (UPSegd.fromLengths (replicateUP (Seq.length xs) n)) xs
46 {-# INLINE_UP replicateRSUP #-}
47
48
49 -- Append ---------------------------------------------------------------------
50 -- | Segmented append.
51 appendSUP
52 :: Unbox a
53 => UPSegd -- ^ segment descriptor of result
54 -> UPSegd -- ^ left-hand segd
55 -> Vector a -- ^ left-hand data
56 -> UPSegd -- ^ right-hand segd
57 -> Vector a -- ^ right-hand data
58 -> Vector a
59
60 appendSUP segd !xd !xs !yd !ys
61 = joinD theGang balanced
62 . mapD theGang append
63 $ UPSegd.takeDistributed segd
64 where append ((segd',seg_off),el_off)
65 = Seq.unstream
66 $ appendSegS (UPSegd.takeUSegd xd) xs
67 (UPSegd.takeUSegd yd) ys
68 (USegd.takeElements segd')
69 seg_off el_off
70 {-# INLINE_UP appendSUP #-}
71
72
73 -- append ---------------------------------------------------------------------
74 appendSegS
75 :: Unbox a
76 => USegd -- ^ Segment descriptor of first array.
77 -> Vector a -- ^ Data of first array
78 -> USegd -- ^ Segment descriptor of second array.
79 -> Vector a -- ^ Data of second array.
80 -> Int -- ^ How many elements to return
81 -> Int -- ^ Segment offset
82 -> Int -- ^ Element offset
83 -> S.Stream a
84
85 appendSegS !xd !xs !yd !ys !n seg_off el_off
86 = Stream next state (Exact n)
87 where
88 !xlens = USegd.takeLengths xd
89 !ylens = USegd.takeLengths yd
90
91 -- Two index functions because of monomorphism restriction
92 {-# INLINE index1 #-}
93 index1 = Seq.index (here "appendSegS")
94
95 {-# INLINE index2 #-}
96 index2 = Seq.index (here "appendSegS")
97
98 {-# INLINE unbox #-}
99 unbox (I# i) = i
100
101 state
102 -- Nothing to return
103 | n == 0 = ASDo
104 { as_takefrom = 0#
105 , as_seg_off = 0#
106 , as_xs_index = 0#
107 , as_ys_index = 0#
108 , as_next_swap= 0#
109 , as_remain = 0# }
110
111 -- Start returning data from xs
112 | el_off < xlens `index1` seg_off
113 = let xi = (USegd.takeIndices xd `index1` seg_off) + el_off
114 yi = USegd.takeIndices yd `index1` seg_off
115 swap = (USegd.takeLengths xd `index1` seg_off) - el_off
116 in ASDo
117 -- start reading from xs, then read from ys at end of this xs segment
118 { as_takefrom = 0#
119 , as_seg_off = unbox seg_off
120 , as_xs_index = unbox xi
121 , as_ys_index = unbox yi
122 , as_next_swap= unbox swap
123 , as_remain = unbox n }
124
125 -- Start with ys
126 | otherwise
127 = let -- NOTE: *not* indicesUSegd xd ! (seg_off+1) since seg_off+1
128 -- might be out of bounds
129 xi = (USegd.takeIndices xd `index1` seg_off) + (USegd.takeLengths xd `index1` seg_off)
130 el_off' = el_off - USegd.takeLengths xd `index1` seg_off
131 yi = (USegd.takeIndices yd `index1` seg_off) + el_off'
132 swap = (USegd.takeLengths yd `index1` seg_off) - el_off'
133 in ASDo
134 { as_takefrom = 1#
135 , as_seg_off = unbox seg_off
136 , as_xs_index = unbox xi
137 , as_ys_index = unbox yi
138 , as_next_swap= unbox swap
139 , as_remain = unbox n }
140
141 {-# INLINE next #-}
142 next ASDo{as_remain=0#} = return Done
143
144 -- Reading from xs
145 next s@ASDo{as_takefrom=0#}
146 -- Done reading xs, so read the rest of this segment from ys.
147 | as_next_swap s ==# 0#
148 = return $ Skip (s{as_takefrom=1#, as_next_swap= unbox (ylens `index1` I# (as_seg_off s))})
149
150 -- Grab a value from xs
151 | otherwise = return $ Yield (xs `index2` I# (as_xs_index s)) (inc s)
152
153 -- Reading from ys; takefrom nonzero
154 next s
155 -- Done reading ys, so we need to look at the next segment's xs
156 | as_next_swap s ==# 0#
157 = let seg' = as_seg_off s +# 1#
158 in return $ Skip (s {as_takefrom=0#, as_seg_off=seg', as_next_swap= unbox (xlens `index1` I# seg')})
159
160 -- Grab a value from ys
161 | otherwise = return $ Yield (ys `index2` I# (as_ys_index s)) (inc s)
162
163 {-# INLINE inc #-}
164 -- Move data pointer forward, and decrease remaining and swap
165 inc s@ASDo{as_takefrom=0#, as_xs_index=xi, as_next_swap=swap, as_remain=n'}
166 = s{as_xs_index=xi +# 1#, as_next_swap=swap -# 1#, as_remain=n' -# 1#}
167
168 -- Takefrom is nonzero: reading from ys
169 inc s@ASDo{as_ys_index=yi, as_next_swap=swap, as_remain=n'}
170 = s{as_ys_index=yi +# 1#, as_next_swap=swap -# 1#, as_remain=n' -# 1#}
171 {-# INLINE_STREAM appendSegS #-}
172
173 data AppendState
174 = ASDo
175 { as_takefrom :: Int# -- ^ 0 = xs, nonzero = ys
176 , as_seg_off :: Int# -- ^ current segment
177 , as_xs_index :: Int# -- ^ pointer into xs data
178 , as_ys_index :: Int# -- ^ pointer into ys data
179 , as_next_swap:: Int# -- ^ toggle takefrom in this many elements
180 , as_remain :: Int# -- ^ how many left
181 }
182
183 -- virtual scattered append
184 appendSUPV
185 :: (Vs.Unboxes a, Unbox a)
186 => UPSegd -- ^ segment descriptor of result
187 -> UPVSegd.UPVSegd -- ^ left-hand segd
188 -> Vs.Vectors a -- ^ left-hand data
189 -> UPVSegd.UPVSegd -- ^ right-hand segd
190 -> Vs.Vectors a -- ^ right-hand data
191 -> Vector a
192
193 appendSUPV segd !xd !xs !yd !ys
194 = joinD theGang balanced
195 . mapD theGang append
196 $ UPSegd.takeDistributed segd
197 where append ((segd',seg_off),el_off)
198 = Seq.unstream
199 $ appendUPVSegS xd xs
200 yd ys
201 (USegd.takeElements segd')
202 seg_off el_off
203 {-# INLINE_UP appendSUPV #-}
204 appendUPVSegS
205 :: Vs.Unboxes a
206 => UPVSegd.UPVSegd -- ^ Segment descriptor of first array.
207 -> Vs.Vectors a -- ^ Data of first array
208 -> UPVSegd.UPVSegd -- ^ Segment descriptor of second array.
209 -> Vs.Vectors a -- ^ Data of second array.
210 -> Int -- ^ How many elements to return
211 -> Int -- ^ Segment offset
212 -> Int -- ^ Element offset
213 -> S.Stream a
214
215 appendUPVSegS !xd !xs !yd !ys !n seg_off el_off
216 = Stream next state (Exact n)
217 where
218 !xvsegs= UPVSegd.takeVSegidsRedundant xd
219 !yvsegs= UPVSegd.takeVSegidsRedundant yd
220
221 !xssegd= UPSSegd.takeUSSegd $ UPVSegd.takeUPSSegdRedundant xd
222 !yssegd= UPSSegd.takeUSSegd $ UPVSegd.takeUPSSegdRedundant yd
223
224 !xsegd = USSegd.takeUSegd xssegd
225 !ysegd = USSegd.takeUSegd yssegd
226
227 -- get physical segment id
228 {-#INLINE xpseg #-}
229 xpseg s = index1 xvsegs "xpseg" s
230 {-#INLINE ypseg #-}
231 ypseg s = index1 yvsegs "ypseg" s
232
233 !xseglens = USegd.takeLengths xsegd
234 !yseglens = USegd.takeLengths ysegd
235
236 !xsrc = USSegd.takeSources xssegd
237 !ysrc = USSegd.takeSources yssegd
238
239 !xstrt = USSegd.takeStarts xssegd
240 !ystrt = USSegd.takeStarts yssegd
241
242 -- physical lengths
243 {-#INLINE xplen #-}
244 xplen s = index1 xseglens "xplen1" (xpseg s)
245 {-#INLINE yplen #-}
246 yplen s = index1 yseglens "yplen1" (ypseg s)
247
248 -- get actual data
249 {-# INLINE gdata #-}
250 gdata gs st
251 = let !src = avs_ssrc st
252 !strt = avs_sstart st
253 !ix = avs_index st
254 in index2 gs (I# src) (I# (strt +# ix))
255
256 -- get scattered segment source and starts
257 {-# INLINE getscatter #-}
258 getscatter gpseg gsrcs gstrts segid
259 = let !phys = gpseg segid in
260 let !src = index1 gsrcs "src" phys in
261 let !strt = index1 gstrts "strt" phys in
262 (src, strt)
263
264 {-# INLINE index1 #-}
265 --index1 v i = Seq.index (here "appendUVSegS") v i
266
267 index1 v h i = Seq.index (here $ "appendUVSegS:" Prelude.++ h) v i
268
269 {-# INLINE index2 #-}
270 index2 v i1 i2 = Vs.index2 (here "appendUVSegS") v i1 i2
271
272
273 {-# INLINE unbox #-}
274 unbox (I# i) = i
275
276 state
277 -- Nothing to return
278 | n == 0 = ASUPVDo
279 { avs_takefrom = 0#
280 , avs_seg_off = 0#
281 , avs_index = 0#
282 , avs_next_swap= 0#
283 , avs_remain = 0#
284 , avs_sstart = 0#
285 , avs_ssrc = 0# }
286
287 -- Start returning data from xs
288 | el_off < xplen seg_off
289 = let (src,strt) = getscatter xpseg xsrc xstrt seg_off
290 swap = (xplen seg_off) - el_off
291 in ASUPVDo
292 -- start reading from xs, then read from ys at end of this xs segment
293 { avs_takefrom = 0#
294 , avs_seg_off = unbox seg_off
295 , avs_index = unbox el_off
296 , avs_next_swap= unbox swap
297 , avs_remain = unbox n
298 , avs_sstart = unbox strt
299 , avs_ssrc = unbox src }
300
301 -- Start with ys
302 | otherwise
303 = let (src,strt) = getscatter ypseg ysrc ystrt seg_off
304 el_off' = el_off - xplen seg_off
305 swap = (yplen seg_off) - el_off'
306 in ASUPVDo
307 { avs_takefrom = 1#
308 , avs_seg_off = unbox seg_off
309 , avs_index = unbox el_off'
310 , avs_next_swap= unbox swap
311 , avs_remain = unbox n
312 , avs_sstart = unbox strt
313 , avs_ssrc = unbox src }
314
315 {-# INLINE next #-}
316 next ASUPVDo{avs_remain=0#} = return Done
317
318 -- Reading from xs
319 next s@ASUPVDo{avs_takefrom=0#}
320 -- Done reading xs, so read the rest of this segment from ys.
321 | avs_next_swap s ==# 0# =
322 let seg' = I# (avs_seg_off s)
323 (src,strt) = getscatter ypseg ysrc ystrt seg'
324 in return $ Skip $
325 s {
326 avs_takefrom = 1#
327 , avs_index = 0#
328 , avs_next_swap = unbox (yplen seg')
329 , avs_sstart = unbox strt
330 , avs_ssrc = unbox src }
331 -- Grab a value from xs
332 | otherwise = return $ Yield (gdata xs s) (inc s)
333
334 -- Reading from ys, so avs_takefrom=1#
335 next s
336 -- Done reading ys, so we need to look at the next segment's xs
337 | avs_next_swap s ==# 0#
338 = let seg' = I# (avs_seg_off s +# 1#)
339 (src,strt) = getscatter xpseg xsrc xstrt seg'
340 in return $ Skip $
341 s {
342 avs_takefrom = 0#
343 , avs_seg_off = unbox seg'
344 , avs_index = 0#
345 , avs_next_swap = unbox (xplen seg')
346 , avs_sstart = unbox strt
347 , avs_ssrc = unbox src }
348
349 -- Grab a value from ys
350 | otherwise = return $ Yield (gdata ys s) (inc s)
351
352 {-# INLINE inc #-}
353 -- Move data pointer forward and decrease remaining and swap
354 inc s@ASUPVDo{avs_index=ix, avs_next_swap=swap, avs_remain=n'}
355 = s{avs_index=ix +# 1#, avs_next_swap=swap -# 1#, avs_remain=n' -# 1#}
356 {-# INLINE_STREAM appendUPVSegS #-}
357
358 data AppendUPVState
359 = ASUPVDo
360 { avs_takefrom :: Int# -- ^ 0 = xs, 1/else = ys
361 , avs_seg_off :: Int# -- ^ current segment
362 , avs_index :: Int# -- ^ index into current segment
363 , avs_next_swap:: Int# -- ^ toggle takefrom in this many elements
364 , avs_remain :: Int# -- ^ how many left
365 , avs_sstart :: Int# -- ^ scattered segment start
366 , avs_ssrc :: Int# -- ^ scattered segment source
367 }
368
369 -- Append ---------------------------------------------------------------------
370 -- | Segmented append.
371 -- -old
372 appendSUP_old
373 :: Unbox a
374 => UPSegd
375 -> UPSegd -> Vector a
376 -> UPSegd -> Vector a
377 -> Vector a
378
379 appendSUP_old segd !xd !xs !yd !ys
380 = joinD theGang balanced
381 . mapD theGang append
382 $ UPSegd.takeDistributed segd
383 where append ((segd',seg_off),el_off)
384 = Seq.unstream
385 $ appendSegS_old (UPSegd.takeUSegd xd) xs
386 (UPSegd.takeUSegd yd) ys
387 (USegd.takeElements segd')
388 seg_off el_off
389 {-# INLINE_UP appendSUP_old #-}
390
391
392 -- append ---------------------------------------------------------------------
393 appendSegS_old
394 :: Unbox a
395 => USegd -- ^ Segment descriptor of first array.
396 -> Vector a -- ^ Data of first array
397 -> USegd -- ^ Segment descriptor of second array.
398 -> Vector a -- ^ Data of second array.
399 -> Int
400 -> Int
401 -> Int
402 -> S.Stream a
403
404 appendSegS_old !xd !xs !yd !ys !n seg_off el_off
405 = Stream next state (Exact n)
406 where
407 !xlens = USegd.takeLengths xd
408 !ylens = USegd.takeLengths yd
409
410 {-# INLINE index1 #-}
411 index1 = Seq.index (here "appendSegS")
412
413 {-# INLINE index2 #-}
414 index2 = Seq.index (here "appendSegS")
415
416 state
417 | n == 0 = Nothing
418 | el_off < xlens `index1` seg_off
419 = let i = (USegd.takeIndices xd `index1` seg_off) + el_off
420 j = USegd.takeIndices yd `index1` seg_off
421 k = (USegd.takeLengths xd `index1` seg_off) - el_off
422 in Just (False, seg_off, i, j, k, n)
423
424 | otherwise
425 = let -- NOTE: *not* indicesUSegd xd ! (seg_off+1) since seg_off+1
426 -- might be out of bounds
427 i = (USegd.takeIndices xd `index1` seg_off) + (USegd.takeLengths xd `index1` seg_off)
428 el_off' = el_off - USegd.takeLengths xd `index1` seg_off
429 j = (USegd.takeIndices yd `index1` seg_off) + el_off'
430 k = (USegd.takeLengths yd `index1` seg_off) - el_off'
431 in Just (True, seg_off, i, j, k, n)
432
433 {-# INLINE next #-}
434 next Nothing = return Done
435
436 next (Just (False, seg, i, j, k, n'))
437 | n' == 0 = return Done
438 | k == 0 = return $ Skip (Just (True, seg, i, j, ylens `index1` seg, n'))
439 | otherwise = return $ Yield (xs `index2` i) (Just (False, seg, i+1, j, k-1, n'-1))
440
441 next (Just (True, seg, i, j, k, n'))
442 | n' == 0 = return Done
443 | k == 0
444 = let !seg' = seg+1
445 in return $ Skip (Just (False, seg', i, j, xlens `index1` seg', n'))
446
447 | otherwise = return $ Yield (ys `index2` j) (Just (True, seg, i, j+1, k-1, n'-1))
448 {-# INLINE_STREAM appendSegS_old #-}
449
450
451 -- foldR ----------------------------------------------------------------------
452 -- | Regular segmented fold.
453 foldRUP :: (Unbox a, Unbox b) => (b -> a -> b) -> b -> Int -> Vector a -> Vector b
454 foldRUP f z !segSize xs
455 = joinD theGang unbalanced
456 (mapD theGang
457 (Seq.foldlRU f z segSize)
458 (splitAsD theGang (mapD theGang (*segSize) dlen) xs))
459 where
460 noOfSegs = Seq.length xs `div` segSize
461 dlen = splitLenD theGang noOfSegs
462 {-# INLINE_UP foldRUP #-}
463
464
465 -- sumR -----------------------------------------------------------------------
466 -- | Regular segmented sum.
467 sumRUP :: (Num e, Unbox e) => Int -> Vector e -> Vector e
468 sumRUP = foldRUP (+) 0
469 {-# INLINE_UP sumRUP #-}
470
471