From 11aeef259c3e45b3d4dea60589b7f7eaec830465 Mon Sep 17 00:00:00 2001
From: Oleksandr Pavlyk
Date: Wed, 22 Mar 2023 10:41:28 -0500
Subject: [PATCH 1/2] Fix parallel MC example to run on Windows

On Windows one needs to use multiprocessing.Manager to create the barrier
that is shared across workers, and pass it to the worker initialization
routine as an argument.

Also passed batch_size to the routines as a keyword argument, and in
parallel_mc.py passed mc_compute_func to the worker initialization routine
as a keyword argument, which assigns it to a process-global variable for
reuse by subsequent MC computation tasks.
---
 examples/parallel_mc.py                  | 28 +++++++++++++++-------------
 examples/stick_tetrahedron.py            | 16 +++++++++-------
 examples/{fancy.py => stick_triangle.py} | 27 ++++++++++++++++-----------
 3 files changed, 40 insertions(+), 31 deletions(-)
 rename examples/{fancy.py => stick_triangle.py} (77%)

diff --git a/examples/parallel_mc.py b/examples/parallel_mc.py
index b0cc73a..04f11fd 100644
--- a/examples/parallel_mc.py
+++ b/examples/parallel_mc.py
@@ -1,21 +1,23 @@
 import multiprocessing as mp
+from functools import partial
 
 __all__ = ['parallel_mc_run']
 
 def worker_compute(w_id):
     "Worker function executed on the spawned slave process"
-    # global _local_rs
-    return _worker_mc_compute(_local_rs)
+    global _local_rs, _worker_mc_compute_func
+    return _worker_mc_compute_func(_local_rs)
 
 
-def assign_worker_rs(w_rs):
+def init_worker(w_rs, mc_compute_func=None, barrier=None):
     """Assign process local random state variable `rs` the given value"""
     assert not '_local_rs' in globals(), "Here comes trouble. Process is not expected to have global variable `_local_rs`"
-    global _local_rs
+    global _local_rs, _worker_mc_compute_func
     _local_rs = w_rs
+    _worker_mc_compute_func = mc_compute_func
     # wait to ensure that the assignment takes place for each worker
-    b.wait()
+    barrier.wait()
 
 
 def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
     """
@@ -25,15 +27,15 @@ def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
     and has access to worker-local global variable `rs`, containing worker's random states.
     """
     # use of Barrier ensures that every worker gets one
-    global b, _worker_mc_compute
-    b = mp.Barrier(n_workers)
+
+    with mp.Manager() as manager:
+        b = manager.Barrier(n_workers)
 
-    _worker_mc_compute = mc_func
-    with mp.Pool(processes=n_workers) as pool:
-        # 1. map over every worker once to distribute RandomState instances
-        pool.map(assign_worker_rs, random_states, chunksize=1)
-        # 2. Perform computations on workers
-        r = pool.map(worker_compute, range(n_batches), chunksize=1)
+        with mp.Pool(processes=n_workers) as pool:
+            # 1. map over every worker once to distribute RandomState instances
+            pool.map(partial(init_worker, mc_compute_func=mc_func, barrier=b), random_states, chunksize=1)
+            # 2. Perform computations on workers
+            r = pool.map(worker_compute, range(n_batches), chunksize=1)
 
     return r
 
diff --git a/examples/stick_tetrahedron.py b/examples/stick_tetrahedron.py
index 5a73ec1..c01cf6b 100644
--- a/examples/stick_tetrahedron.py
+++ b/examples/stick_tetrahedron.py
@@ -4,7 +4,7 @@
 from sticky_math import mc_six_piece_stick_tetrahedron_prob
 from arg_parsing import parse_arguments
 
-def mc_runner(rs):
+def mc_runner(rs, batch_size=None):
     return mc_six_piece_stick_tetrahedron_prob(rs, batch_size)
 
 def aggregate_mc_counts(counts, n_batches, batch_size):
@@ -40,6 +40,7 @@ def print_result(p_est, p_std, mc_size):
     from itertools import repeat
     from timeit import default_timer as timer
     import sys
+    from functools import partial
 
     args = parse_arguments()
 
@@ -51,12 +52,17 @@ def print_result(p_est, p_std, mc_size):
     batch_size = args.batch_size
     batches = args.batch_count
     id0 = args.id_offset
+    print("Parallel Monte-Carlo estimation of stick tetrahedron probability")
+    print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
+        seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
+    print("")
 
     t0 = timer()
 
     rss = build_MT2203_random_states(seed, id0, n_workers)
-    r = parallel_mc_run(rss, n_workers, batches, mc_runner)
-    # r = sequential_mc_run(rss, n_workers, batches, mc_runner)
+
+    r = parallel_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))
+    # r = sequential_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))
 
     # retrieve values of estimates into numpy array
     counts = np.fromiter(r, dtype=np.double)
@@ -64,10 +70,6 @@ def print_result(p_est, p_std, mc_size):
 
     t1 = timer()
 
-
-    print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
-        seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
-    print("")
     print_result(p_est, p_std, batches * batch_size)
     print("")
     print("Bayesian posterior beta distribution parameters: ({0}, {1})".format(event_count, nonevent_count))
diff --git a/examples/fancy.py b/examples/stick_triangle.py
similarity index 77%
rename from examples/fancy.py
rename to examples/stick_triangle.py
index 8137cec..865fa9a 100644
--- a/examples/fancy.py
+++ b/examples/stick_triangle.py
@@ -39,17 +39,17 @@ def mc_dist(rs, n):
     return mc_prob
 
 
-def assign_worker_rs(w_rs):
+def init_worker(w_rs, barrier=None):
     """Assign process local random state variable `rs` the given value"""
     assert not 'rs' in globals(), "Here comes trouble. Process is not expected to have global variable `rs`"
 
     global rs
     rs = w_rs
     # wait to ensure that the assignment takes place for each worker
-    b.wait()
+    barrier.wait()
 
 
-def worker_compute(w_id):
+def worker_compute(w_id, batch_size=None):
     return mc_dist(rs, batch_size)
 
 
@@ -57,23 +57,28 @@ def worker_compute(w_id):
     import multiprocessing as mp
     from itertools import repeat
     from timeit import default_timer as timer
+    from functools import partial
 
     seed = 77777
     n_workers = 12
     batch_size = 1024 * 256
     batches = 10000
+    print("Parallel Monte-Carlo estimation of stick triangle probability")
+    print(f"Parameters: n_workers={n_workers}, batch_size={batch_size}, n_batches={batches}, seed={seed}")
+    print("")
 
     t0 = timer()
     # Create instances of RandomState for each worker process from MT2203 family of generators
     rss = [ rnd.RandomState(seed, brng=('MT2203', idx)) for idx in range(n_workers) ]
-    # use of Barrier ensures that every worker gets one
-    b = mp.Barrier(n_workers)
-
-    with mp.Pool(processes=n_workers) as pool:
-        # map over every worker once to distribute RandomState instances
-        pool.map(assign_worker_rs, rss, chunksize=1)
-        # Perform computations on workers
-        r = pool.map(worker_compute, range(batches), chunksize=1)
+    with mp.Manager() as manager:
+        # use of Barrier ensures that every worker gets one
+        b = manager.Barrier(n_workers)
+
+        with mp.Pool(processes=n_workers) as pool:
+            # map over every worker once to distribute RandomState instances
+            pool.map(partial(init_worker, barrier=b), rss, chunksize=1)
+            # Perform computations on workers
+            r = pool.map(partial(worker_compute, batch_size=batch_size), range(batches), chunksize=1)
 
     # retrieve values of estimates into numpy array
     ps = np.fromiter(r, dtype=np.double)

From a9db09fe8e553210ffda9f7179f9d43734bdb944 Mon Sep 17 00:00:00 2001
From: Oleksandr Pavlyk
Date: Wed, 22 Mar 2023 10:58:22 -0500
Subject: [PATCH 2/2] Updated README.md to identify which script to run

---
 examples/README.md | 32 +++++++++++++++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)

diff --git a/examples/README.md b/examples/README.md
index 8c6092e..845b1d6 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -17,10 +17,40 @@
 This data is post-processed as necessary for the application.
 
 Code is tested to estimate the probability that 3 segments, obtained by splitting a unit stick in two randomly chosen
 places, can be sides of a triangle. This probability is known in closed form to be $\frac{1}{4}$.
 
+Run the Python script "stick_triangle.py" to estimate this probability using a parallel Monte-Carlo algorithm:
+
+```
+> python stick_triangle.py
+Parallel Monte-Carlo estimation of stick triangle probability
+Parameters: n_workers=12, batch_size=262144, n_batches=10000, seed=77777
+
+Monte-Carlo estimate of probability: 0.250000
+Population estimate of the estimator's standard deviation: 0.000834
+Expected standard deviation of the estimator: 0.000846
+Execution time: 64.043 seconds
+
+```
+
 ## Stick tetrahedron problem
 
 Code is used to estimate the probability that 6 segments, obtained by splitting a unit stick in 5 random chosen places,
 can be sides of a tetrahedron. The probability is not known in closed form. See
-[math.stackexchange.com/questions/351913](https://math.stackexchange.com/questions/351913/probability-that-a-stick-randomly-broken-in-five-places-can-form-a-tetrahedron) for more details.
\ No newline at end of file
+[math.stackexchange.com/questions/351913](https://math.stackexchange.com/questions/351913/probability-that-a-stick-randomly-broken-in-five-places-can-form-a-tetrahedron) for more details.
+
+```
+> python stick_tetrahedron.py -s 1274 -p 4 -n 8096
+Parallel Monte-Carlo estimation of stick tetrahedron probability
+Input parameters: -s 1274 -b 65536 -n 8096 -p 4 -d 0
+
+Monte-Carlo estimate of probability: 0.01257113
+Population estimate of the estimator's standard deviation: 0.00000488
+Expected standard deviation of the estimator: 0.00000484
+Total MC size: 530579456
+
+Bayesian posterior beta distribution parameters: (6669984, 523909472)
+
+Execution time: 30.697 seconds
+
+```
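
Note for reviewers (not part of the patches above): the sketch below is a minimal, self-contained illustration of the pattern PATCH 1/2 relies on. The `init_worker`/`worker_compute` bodies and the toy pi estimator are hypothetical stand-ins for the stick examples; only the mechanics are the point: `multiprocessing.Manager().Barrier()` returns a picklable proxy that can be shipped to pool workers through `functools.partial`, which is what lets the code run under the `spawn` start method used on Windows, where a plain `multiprocessing.Barrier` cannot be passed through `Pool.map`.

```python
# Hypothetical stand-alone sketch (not taken from the patched repository).
import multiprocessing as mp
import random
from functools import partial


def init_worker(w_seed, barrier=None):
    # Stash per-worker state in a process-global variable; the barrier keeps
    # every worker parked here until all of them have received exactly one
    # initialization task.
    global _local_rng
    _local_rng = random.Random(w_seed)
    barrier.wait()


def worker_compute(w_id, batch_size=None):
    # Toy Monte-Carlo batch: estimate pi from `batch_size` random points.
    rng = _local_rng
    hits = sum(rng.random() ** 2 + rng.random() ** 2 < 1.0 for _ in range(batch_size))
    return 4.0 * hits / batch_size


if __name__ == "__main__":
    n_workers, n_batches, batch_size = 4, 16, 100000
    with mp.Manager() as manager:
        # A Manager-backed Barrier is a proxy object, so it survives pickling
        # under the "spawn" start method used on Windows.
        b = manager.Barrier(n_workers)
        with mp.Pool(processes=n_workers) as pool:
            # 1. one initialization task per worker, serialized by the barrier
            pool.map(partial(init_worker, barrier=b), range(n_workers), chunksize=1)
            # 2. subsequent MC tasks reuse the per-process state set above
            r = pool.map(partial(worker_compute, batch_size=batch_size),
                         range(n_batches), chunksize=1)
    print("pi estimate:", sum(r) / len(r))
```

With `chunksize=1`, no pool process can finish its initialization task, and therefore pick up a second one, until all `n_workers` processes are waiting on the barrier, so each process ends up holding exactly one worker-local state; this is the same property the patched `parallel_mc.py` relies on ("use of Barrier ensures that every worker gets one").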