diff --git a/core/src/main/scala/scala/collection/parallel/ParIterableLike.scala b/core/src/main/scala/scala/collection/parallel/ParIterableLike.scala
index 4d739537..343fc0e4 100644
--- a/core/src/main/scala/scala/collection/parallel/ParIterableLike.scala
+++ b/core/src/main/scala/scala/collection/parallel/ParIterableLike.scala
@@ -359,7 +359,7 @@ self =>
    * if this $coll is empty.
    */
   def reduce[U >: T](op: (U, U) => U): U = {
-    tasksupport.executeAndWaitResult(new Reduce(op, splitter) mapResult { _.get })
+    tasksupport.executeAndWaitResult(new Reduce(op, splitter)).get
   }
 
   /** Optionally reduces the elements of this sequence using the specified associative binary operator.
@@ -464,11 +464,11 @@ self =>
   }
 
   def min[U >: T](implicit ord: Ordering[U]): T = {
-    tasksupport.executeAndWaitResult(new Min(ord, splitter) mapResult { _.get }).asInstanceOf[T]
+    tasksupport.executeAndWaitResult(new Min(ord, splitter)).get.asInstanceOf[T]
   }
 
   def max[U >: T](implicit ord: Ordering[U]): T = {
-    tasksupport.executeAndWaitResult(new Max(ord, splitter) mapResult { _.get }).asInstanceOf[T]
+    tasksupport.executeAndWaitResult(new Max(ord, splitter)).get.asInstanceOf[T]
   }
 
   def maxBy[S](f: T => S)(implicit cmp: Ordering[S]): T = {
@@ -484,15 +484,15 @@ self =>
   }
 
   def map[S](f: T => S): CC[S] = {
-    tasksupport.executeAndWaitResult(new Map[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter) mapResult { _.resultWithTaskSupport })
+    tasksupport.executeAndWaitResult(new Map[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter)).resultWithTaskSupport
   }
 
   def collect[S](pf: PartialFunction[T, S]): CC[S] = {
-    tasksupport.executeAndWaitResult(new Collect[S, CC[S]](pf, combinerFactory(() => companion.newCombiner[S]), splitter) mapResult { _.resultWithTaskSupport })
+    tasksupport.executeAndWaitResult(new Collect[S, CC[S]](pf, combinerFactory(() => companion.newCombiner[S]), splitter)).resultWithTaskSupport
   }
 
   def flatMap[S](f: T => IterableOnce[S]): CC[S] = {
-    tasksupport.executeAndWaitResult(new FlatMap[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter) mapResult { _.resultWithTaskSupport })
+    tasksupport.executeAndWaitResult(new FlatMap[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter)).resultWithTaskSupport
   }
 
   /** Tests whether a predicate holds for all elements of this $coll.
@@ -572,11 +572,11 @@ self =>
   def withFilter(pred: T => Boolean): Repr = filter(pred)
 
   def filter(pred: T => Boolean): Repr = {
-    tasksupport.executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
+    tasksupport.executeAndWaitResult(new Filter(pred, combinerFactory, splitter)).resultWithTaskSupport
   }
 
   def filterNot(pred: T => Boolean): Repr = {
-    tasksupport.executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
+    tasksupport.executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter)).resultWithTaskSupport
   }
 
   def ++[U >: T](that: IterableOnce[U]): CC[U] = that match {
@@ -588,10 +588,8 @@ self =>
         val othtask = new other.Copy(cfactory, other.splitter)
         tasksupport.executeAndWaitResult(othtask)
       }
-      val task = (copythis parallel copythat) { _ combine _ } mapResult {
-        _.resultWithTaskSupport
-      }
-      tasksupport.executeAndWaitResult(task)
+      val task = (copythis parallel copythat) { _ combine _ }
+      tasksupport.executeAndWaitResult(task).resultWithTaskSupport
     case _ =>
       // println("case parallel builder, `that` not parallel")
       val copythis = new Copy(combinerFactory(() => companion.newCombiner[U]), splitter)
@@ -600,15 +598,13 @@ self =>
         cb ++= that
         cb
       }
-      tasksupport.executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.resultWithTaskSupport })
+      tasksupport.executeAndWaitResult((copythis parallel copythat) { _ combine _ }).resultWithTaskSupport
   }
 
   def partition(pred: T => Boolean): (Repr, Repr) = {
-    tasksupport.executeAndWaitResult(
-      new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult {
-        p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
-      }
-    )
+    val result = tasksupport.executeAndWaitResult(
+      new Partition(pred, combinerFactory, combinerFactory, splitter))
+    (result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
   }
 
   def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = {
@@ -621,9 +617,7 @@ self =>
   def take(n: Int): Repr = {
     val actualn = if (size > n) n else size
     if (actualn < MIN_FOR_COPY) take_sequential(actualn)
-    else tasksupport.executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult {
-      _.resultWithTaskSupport
-    })
+    else tasksupport.executeAndWaitResult(new Take(actualn, combinerFactory, splitter)).resultWithTaskSupport
   }
 
   private def take_sequential(n: Int) = {
@@ -641,7 +635,7 @@ self =>
   def drop(n: Int): Repr = {
     val actualn = if (size > n) n else size
     if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn)
-    else tasksupport.executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
+    else tasksupport.executeAndWaitResult(new Drop(actualn, combinerFactory, splitter)).resultWithTaskSupport
   }
 
   private def drop_sequential(n: Int) = {
@@ -656,7 +650,7 @@ self =>
     val from = unc_from min size max 0
     val until = unc_until min size max from
     if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until)
-    else tasksupport.executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
+    else tasksupport.executeAndWaitResult(new Slice(from, until, combinerFactory, splitter)).resultWithTaskSupport
   }
 
   private def slice_sequential(from: Int, until: Int): Repr = {
@@ -671,11 +665,9 @@ self =>
   }
 
   def splitAt(n: Int): (Repr, Repr) = {
-    tasksupport.executeAndWaitResult(
-      new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult {
-        p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
-      }
-    )
+    val result = tasksupport.executeAndWaitResult(
+      new SplitAt(n, combinerFactory, combinerFactory, splitter))
+    (result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
   }
 
   /** Computes a prefix scan of the elements of the collection.
@@ -689,9 +681,7 @@ self =>
    */
   def scan[U >: T](z: U)(op: (U, U) => U): CC[U] = {
     if (size > 0) tasksupport.executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult {
-      tree => tasksupport.executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => companion.newCombiner[U])) mapResult {
-        cb => cb.resultWithTaskSupport
-      })
+      tree => tasksupport.executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => companion.newCombiner[U]))).resultWithTaskSupport
     }) else setTaskSupport((companion.newCombiner[U] += z).result(), tasksupport)
   }
 
@@ -711,15 +701,12 @@ self =>
     val cbf = combinerFactory
     if (cbf.doesShareCombiners) {
       val parseqspan = toSeq.takeWhile(pred)
-      tasksupport.executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult {
-        _.resultWithTaskSupport
-      })
+      tasksupport.executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter)).resultWithTaskSupport
     } else {
       val cntx = new DefaultSignalling with AtomicIndexFlag
      cntx.setIndexFlag(Int.MaxValue)
-      tasksupport.executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult {
-        _._1.resultWithTaskSupport
-      })
+      val result = tasksupport.executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx))
+      result._1.resultWithTaskSupport
     }
   }
 
@@ -736,18 +723,18 @@ self =>
     val cbf = combinerFactory
     if (cbf.doesShareCombiners) {
       val (xs, ys) = toSeq.span(pred)
-      val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.resultWithTaskSupport }
-      val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.resultWithTaskSupport }
+      val copyxs = new Copy(combinerFactory, xs.splitter)
+      val copyys = new Copy(combinerFactory, ys.splitter)
       val copyall = (copyxs parallel copyys) {
         (xr, yr) => (xr, yr)
       }
-      tasksupport.executeAndWaitResult(copyall)
+      val result = tasksupport.executeAndWaitResult(copyall)
+      (result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
     } else {
       val cntx = new DefaultSignalling with AtomicIndexFlag
       cntx.setIndexFlag(Int.MaxValue)
-      tasksupport.executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
-        p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
-      })
+      val result = tasksupport.executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx))
+      (result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
     }
   }
 
@@ -765,10 +752,7 @@ self =>
     val cntx = new DefaultSignalling with AtomicIndexFlag
     cntx.setIndexFlag(Int.MaxValue)
     tasksupport.executeAndWaitResult(
-      new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
-        _._2.resultWithTaskSupport
-      }
-    )
+      new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx))._2.resultWithTaskSupport
   }
 
   def copyToArray[U >: T](xs: Array[U]): Unit = copyToArray(xs, 0)
@@ -785,7 +769,7 @@ self =>
   def zip[U >: T, S](that: ParIterable[S]): CC[(U, S)] = {
     that match {
       case thatseq: ParSeq[S] =>
-        tasksupport.executeAndWaitResult(new Zip(combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter) mapResult { _.resultWithTaskSupport })
+        tasksupport.executeAndWaitResult(new Zip(combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter)).resultWithTaskSupport
       case _ =>
         (companion.newBuilder[(U, S)] ++= setTaskSupport(seq.zip(that.seq), tasksupport)).result()
     }
@@ -806,18 +790,16 @@ self =>
   def zipAll[S, U >: T](that: ParIterable[S], thisElem: U, thatElem: S): CC[(U, S)] = {
     val thatseq = that.toSeq
     tasksupport.executeAndWaitResult(
-      new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter) mapResult {
-        _.resultWithTaskSupport
-      }
-    )
+      new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter)
+    ).resultWithTaskSupport
   }
 
   protected def toParCollection[U >: T, That](cbf: () => Combiner[U, That]): That = {
-    tasksupport.executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.resultWithTaskSupport })
+    tasksupport.executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter)).resultWithTaskSupport
   }
 
   protected def toParMap[K, V, That](cbf: () => Combiner[(K, V), That])(implicit ev: T <:< (K, V)): That = {
-    tasksupport.executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.resultWithTaskSupport })
+    tasksupport.executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev)).resultWithTaskSupport
   }
 
   def toArray[U >: T: ClassTag]: Array[U] = {
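
Note on the pattern applied throughout this patch: every hunk replaces a task composed with `mapResult { f }` by running the unmodified task through `tasksupport.executeAndWaitResult` and applying `f` at the call site instead (`.get`, `.resultWithTaskSupport`, or a tuple mapping). The sketch below is a minimal, self-contained model of that equivalence; `Task`, `mapResult`, and `executeAndWaitResult` here are simplified stand-ins written for illustration, not the actual scala.collection.parallel task types.

object MapResultSketch {
  // Simplified stand-in for the library's task abstraction: a task produces a
  // value of type R when run, and mapResult fuses a post-processing step into it.
  trait Task[R] { outer =>
    def run(): R
    def mapResult[R1](f: R => R1): Task[R1] = new Task[R1] {
      def run(): R1 = f(outer.run())
    }
  }

  // Stand-in for tasksupport.executeAndWaitResult: run the task, return its result.
  def executeAndWaitResult[R](t: Task[R]): R = t.run()

  def main(args: Array[String]): Unit = {
    // A toy "Reduce"-like task whose raw result is an Option, as in the real Reduce task.
    val reduceTask: Task[Option[Int]] = new Task[Option[Int]] {
      def run(): Option[Int] = Some(List(1, 2, 3).sum)
    }

    // Before: the post-processing step is fused into the task via mapResult.
    val before: Int = executeAndWaitResult(reduceTask.mapResult(_.get))

    // After (the form this patch uses): run the plain task, post-process the returned value.
    val after: Int = executeAndWaitResult(reduceTask).get

    assert(before == after)
    println(s"before = $before, after = $after") // both print 6
  }
}

Either form yields the same value; the refactoring only moves the post-processing out of the task object and onto the result returned by the scheduler, which shortens the call sites shown in the diff.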