Faster stable sort (#492)
authorDonnacha Oisín Kidney <oisdk@users.noreply.github.com>
Sat, 20 Jan 2018 20:56:57 +0000 (20:56 +0000)
committerDavid Feuer <David.Feuer@gmail.com>
Sat, 20 Jan 2018 20:56:57 +0000 (15:56 -0500)
Use a new, faster implementation for stable sort

The old stable sort simply called Data.List.sort: here, we construct a pairing heap, but tag each element with its original position in the sequence. Then we perform heap sort, the same sort as used in `unstableSort`.

Data/Sequence/Internal.hs
Data/Sequence/sorting.md

index f80c165..e58ae2b 100644 (file)
@@ -4476,10 +4476,6 @@ zipWith4 f s1 s2 s3 s4 = zipWith' ($) (zipWith3' f s1' s2' s3') s4'
 ------------------------------------------------------------------------
 -- Sorting
 --
--- sort and sortBy are implemented by simple deforestations of
---      \ xs -> fromList2 (length xs) . Data.List.sortBy cmp . toList
--- which does not get deforested automatically, it would appear.
---
 -- Unstable sorting is performed by a heap sort implementation based on
 -- pairing heaps.  Because the internal structure of sequences is quite
 -- varied, it is difficult to get blocks of elements of roughly the same
@@ -4549,23 +4545,66 @@ zipWith4 f s1 s2 s3 s4 = zipWith' ($) (zipWith3' f s1' s2' s3') s4'
 --
 -- mail@doisinkidney.com, 4/30/17
 ------------------------------------------------------------------------
+-- The sort and sortBy functions are implemented by tagging each element
+-- in the input sequence with its position, and using that to
+-- discriminate between elements which are equivalent according to the
+-- comparator. This makes the sort stable.
+--
+-- The algorithm is effectively the same as the unstable sorts, except
+-- the queue is constructed while giving each element a tag.
+--
+-- It's quicker than the old implementation (which used Data.List.sort)
+-- in the general case (all times are on sequences of length 50000):
+--
+-- Times (ms)            min    est    max  std dev   r²
+-- to/from list:        64.23  64.50  64.81  0.432  1.000
+-- 1/11/18 stable heap: 38.87  39.40  40.09  0.457  0.999
+--
+-- Slightly slower in the case of already sorted lists:
+--
+-- Times (ms)            min    est    max  std dev   r²
+-- to/from list:        6.806  6.861  6.901  0.234  1.000
+-- 1/11/18 stable heap: 8.211  8.268  8.328  0.111  1.000
+--
+-- And quicker in the case of lists sorted in reverse:
+--
+-- Times (ms)            min    est    max  std dev   r²
+-- to/from list:        26.79  28.34  30.55  1.219  0.988
+-- 1/11/18 stable heap: 9.405  10.13  10.91  0.670  0.977
+--
+-- Interestingly, the stable sort is now competitive with the unstable:
+--
+-- Times (ms)            min    est    max  std dev   r²
+-- unstable:            34.71  35.10  35.38  0.845  1.000
+-- stable:              38.84  39.22  39.59  0.564  0.999
+--
+-- And even beats it in the case of already-sorted lists:
+--
+-- Times (ms)            min    est    max  std dev   r²
+-- unstable:            8.457  8.499  8.536  0.069  1.000
+-- stable:              8.160  8.230  8.334  0.158  0.999
+--
+-- mail@doisinkidney.com, 1/11/18
+------------------------------------------------------------------------
 -- Further notes are available in the file sorting.md (in this
 -- directory).
 ------------------------------------------------------------------------
 
 -- | \( O(n \log n) \).  'sort' sorts the specified 'Seq' by the natural
--- ordering of its elements.  The sort is stable.
--- If stability is not required, 'unstableSort' can be considerably
--- faster, and in particular uses less memory.
+-- ordering of its elements.  The sort is stable.  If stability is not
+-- required, 'unstableSort' can be slightly faster.
 sort :: Ord a => Seq a -> Seq a
 sort = sortBy compare
 
 -- | \( O(n \log n) \).  'sortBy' sorts the specified 'Seq' according to the
--- specified comparator.  The sort is stable.
--- If stability is not required, 'unstableSortBy' can be considerably
--- faster, and in particular uses less memory.
+-- specified comparator.  The sort is stable.  If stability is not required,
+-- 'unstableSortBy' can be slightly faster.
 sortBy :: (a -> a -> Ordering) -> Seq a -> Seq a
-sortBy cmp xs = fromList2 (length xs) (Data.List.sortBy cmp (toList xs))
+sortBy cmp (Seq xs) =
+    maybe
+        (Seq EmptyT)
+        (execState (replicateA (size xs) (popMinS cmp)))
+        (toPQS cmp (Seq xs))
 
 -- | \( O(n \log n) \).  'unstableSort' sorts the specified 'Seq' by
 -- the natural ordering of its elements, but the sort is not stable.
@@ -4582,8 +4621,10 @@ unstableSort = unstableSortBy compare
 -- uses less memory than 'sortBy'.
 unstableSortBy :: (a -> a -> Ordering) -> Seq a -> Seq a
 unstableSortBy cmp (Seq xs) =
-    maybe (Seq EmptyT) (execState (replicateA (size xs) (popMin cmp))) $
-    toPQ cmp (\(Elem x) -> PQueue x Nil) xs
+    maybe
+        (Seq EmptyT)
+        (execState (replicateA (size xs) (popMin cmp)))
+        (toPQ cmp (Seq xs))
 
 -- | fromList2, given a list and its length, constructs a completely
 -- balanced Seq whose elements are that list using the replicateA
@@ -4641,24 +4682,121 @@ popMin cmp = State unrollPQ'
     (<+>) = mergePQ cmp
 
 -- | 'toPQ', given an ordering function and a mechanism for queueifying
--- elements, converts a 'FingerTree' to a 'PQueue'.
-toPQ :: (e -> e -> Ordering) -> (a -> PQueue e) -> FingerTree a -> Maybe (PQueue e)
-toPQ _ _ EmptyT = Nothing
-toPQ _ f (Single x) = Just (f x)
-toPQ cmp f (Deep _ pr m sf) = Just (maybe (pr' <+> sf') ((pr' <+> sf') <+>) (toPQ cmp fNode m))
+-- elements, converts a 'Seq' to a 'PQueue'.
+toPQ :: (e -> e -> Ordering) -> Seq e -> Maybe (PQueue e)
+toPQ cmp' (Seq xs') = toPQTree cmp' (\(Elem a) -> PQueue a Nil) xs'
   where
-    fDigit digit = case fmap f digit of
-        One a           -> a
-        Two a b         -> a <+> b
-        Three a b c     -> a <+> b <+> c
-        Four a b c d    -> (a <+> b) <+> (c <+> d)
-    (<+>) = mergePQ cmp
-    fNode = fDigit . nodeToDigit
-    pr' = fDigit pr
-    sf' = fDigit sf
+    toPQTree :: (b -> b -> Ordering) -> (a -> PQueue b) -> FingerTree a -> Maybe (PQueue b)
+    toPQTree _ _ EmptyT = Nothing
+    toPQTree _ f (Single xs) = Just (f xs)
+    toPQTree cmp f (Deep _ pr m sf) =
+        Just (maybe (pr' <+> sf') ((pr' <+> sf') <+>) m')
+      where
+        pr' = toPQDigit cmp f pr
+        sf' = toPQDigit cmp f sf
+        m' = toPQTree cmp (toPQNode cmp f) m
+        (<+>) = mergePQ cmp
+    toPQDigit :: (b -> b -> Ordering) -> (a -> PQueue b) -> Digit a -> PQueue b
+    toPQDigit cmp f dig =
+        case dig of
+            One a -> f a
+            Two a b -> f a <+> f b
+            Three a b c -> f a <+> f b <+> f c
+            Four a b c d -> (f a <+> f b) <+> (f c <+> f d)
+      where
+        (<+>) = mergePQ cmp
+    toPQNode :: (b -> b -> Ordering) -> (a -> PQueue b) -> Node a -> PQueue b
+    toPQNode cmp f node =
+        case node of
+            Node2 _ a b -> f a <+> f b
+            Node3 _ a b c -> f a <+> f b <+> f c
+      where
+        (<+>) = mergePQ cmp
 
 -- | 'mergePQ' merges two 'PQueue's.
 mergePQ :: (a -> a -> Ordering) -> PQueue a -> PQueue a -> PQueue a
 mergePQ cmp q1@(PQueue x1 ts1) q2@(PQueue x2 ts2)
   | cmp x1 x2 == GT     = PQueue x2 (q1 :& ts2)
   | otherwise           = PQueue x1 (q2 :& ts1)
+
+-- | A pairing heap tagged with the original position of elements,
+-- to allow for stable sorting.
+data PQS e = PQS {-# UNPACK #-} !Int e (PQSL e)
+data PQSL e = Nl | {-# UNPACK #-} !(PQS e) :&& PQSL e
+
+infixr 8 :&&
+
+-- | 'popMinS', given an ordering function, constructs a stateful action
+-- which pops the smallest elements from a queue. This action will fail
+-- on empty queues.
+popMinS :: (e -> e -> Ordering) -> State (PQS e) e
+popMinS cmp = State unrollPQ'
+  where
+    {-# INLINE unrollPQ' #-}
+    unrollPQ' (PQS _ x ts) = (mergePQs ts, x)
+    mergePQs (t :&& Nl) = t
+    mergePQs (t1 :&& t2 :&& Nl) = t1 <+> t2
+    mergePQs (t1 :&& t2 :&& ts) = (t1 <+> t2) <+> mergePQs ts
+    mergePQs Nl = error "popMin: tried to pop from empty queue"
+    (<+>) = mergePQS cmp
+
+-- | 'toPQS', given an ordering function, converts a 'Seq' to a
+-- 'PQS'.
+toPQS :: (e -> e -> Ordering) -> Seq e -> Maybe (PQS e)
+toPQS cmp' (Seq xs') = toPQSTree cmp' (\s (Elem a) -> PQS s a Nl) 0 xs'
+  where
+    {-# SPECIALISE toPQSTree :: (b -> b -> Ordering) -> (Int -> Elem y -> PQS b) -> Int -> FingerTree (Elem y) -> Maybe (PQS b) #-}
+    {-# SPECIALISE toPQSTree :: (b -> b -> Ordering) -> (Int -> Node y -> PQS b) -> Int -> FingerTree (Node y) -> Maybe (PQS b) #-}
+    toPQSTree :: Sized a => (b -> b -> Ordering) -> (Int -> a -> PQS b) -> Int -> FingerTree a -> Maybe (PQS b)
+    toPQSTree _ _ !_s EmptyT = Nothing
+    toPQSTree _ f s (Single xs) = Just (f s xs)
+    toPQSTree cmp f s (Deep _ pr m sf) =
+        Just (maybe (pr' <+> sf') ((pr' <+> sf') <+>) m')
+      where
+        pr' = toPQSDigit cmp f s pr
+        sf' = toPQSDigit cmp f sPsprm sf
+        m' = toPQSTree cmp (toPQSNode cmp f) sPspr m
+        !sPspr = s + size pr
+        !sPsprm = sPspr + size m
+        (<+>) = mergePQS cmp
+    {-# SPECIALISE toPQSDigit :: (b -> b -> Ordering) -> (Int -> Elem y -> PQS b) -> Int -> Digit (Elem y) -> PQS b #-}
+    {-# SPECIALISE toPQSDigit :: (b -> b -> Ordering) -> (Int -> Node y -> PQS b) -> Int -> Digit (Node y) -> PQS b #-}
+    toPQSDigit :: Sized a => (b -> b -> Ordering) -> (Int -> a -> PQS b) -> Int -> Digit a -> PQS b
+    toPQSDigit _ f !s (One a) = f s a
+    toPQSDigit cmp f s (Two a b) = f s a <+> f sPsa b
+      where
+        !sPsa = s + size a
+        (<+>) = mergePQS cmp
+    toPQSDigit cmp f s (Three a b c) = f s a <+> f sPsa b <+> f sPsab c
+      where
+        !sPsa = s + size a
+        !sPsab = sPsa + size b
+        (<+>) = mergePQS cmp
+    toPQSDigit cmp f s (Four a b c d) =
+        (f s a <+> f sPsa b) <+> (f sPsab c <+> f sPsabc d)
+      where
+        !sPsa = s + size a
+        !sPsab = sPsa + size b
+        !sPsabc = sPsab + size c
+        (<+>) = mergePQS cmp
+    {-# SPECIALISE toPQSNode :: (b -> b -> Ordering) -> (Int -> Elem y -> PQS b) -> Int -> Node (Elem y) -> PQS b #-}
+    {-# SPECIALISE toPQSNode :: (b -> b -> Ordering) -> (Int -> Node y -> PQS b) -> Int -> Node (Node y) -> PQS b #-}
+    toPQSNode :: Sized a => (b -> b -> Ordering) -> (Int -> a -> PQS b) -> Int -> Node a -> PQS b
+    toPQSNode cmp f s (Node2 _ a b) = f s a <+> f sPsa b
+      where
+        !sPsa = s + size a
+        (<+>) = mergePQS cmp
+    toPQSNode cmp f s (Node3 _ a b c) = f s a <+> f sPsa b <+> f sPsab c
+      where
+        !sPsa = s + size a
+        !sPsab = sPsa + size b
+        (<+>) = mergePQS cmp
+
+-- | 'mergePQS' merges two PQS, taking into account the original
+-- position of the elements.
+mergePQS :: (a -> a -> Ordering) -> PQS a -> PQS a -> PQS a
+mergePQS cmp q1@(PQS i1 x1 ts1) q2@(PQS i2 x2 ts2) =
+    case cmp x1 x2 of
+        LT -> PQS i1 x1 (q2 :&& ts1)
+        EQ | i1 <= i2 -> PQS i1 x1 (q2 :&& ts1)
+        _ -> PQS i2 x2 (q1 :&& ts2)
index 86f53a2..d493e40 100644 (file)
@@ -98,3 +98,11 @@ mean                 238.8 ms   (231.3 ms .. 241.4 ms)
 std dev              5.006 ms   (269.0 μs .. 6.151 ms)
 variance introduced by outliers: 16% (moderately inflated)
 ```
+
+## Stable Sorting
+
+Stable sorting was previously accomplished by converting to a list, applying Data.List.sort, and rebuilding the sequence. Data.List.sort is designed to maximize laziness, which doesn't apply for Data.Sequence, and it can't take advantage of the structure of the finger tree. As a result, simply tagging each element with its position, then applying the unstable sort (using the tag to discriminate between elements for which the comparator is equal) is faster. The current implementation doesn't use the actual `unstableSort`: to perform the building of the queue and tagging in one pass, a specialized version is used.
+
+Times (ms)            min    est    max  std dev   r²
+to/from list:        64.23  64.50  64.81  0.432  1.000
+1/11/18 stable heap: 38.87  39.40  40.09  0.457  0.999