Improving parallel MC use-case example #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged: 2 commits, Mar 22, 2023
32 changes: 31 additions & 1 deletion examples/README.md
@@ -17,10 +17,40 @@ This data is post-processed as necessary for the application.
The code estimates the probability that 3 segments, obtained by splitting a unit stick
in two randomly chosen places, can be sides of a triangle. This probability is known in closed form to be $\frac{1}{4}$.
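
For intuition, here is a minimal serial sketch of the same experiment (an illustration only, not part of the example code; the function name and the use of plain `numpy` are assumptions): the three pieces form a triangle exactly when the longest piece is shorter than $\frac{1}{2}$.

```python
import numpy as np

def triangle_prob_serial(n_samples, seed=1234):
    """Serial Monte-Carlo estimate of the stick-triangle probability."""
    rng = np.random.RandomState(seed)
    # two independent uniform cut points on the unit stick, sorted per sample
    cuts = np.sort(rng.uniform(size=(n_samples, 2)), axis=1)
    pieces = np.column_stack((cuts[:, 0],
                              cuts[:, 1] - cuts[:, 0],
                              1.0 - cuts[:, 1]))
    # the three pieces form a triangle iff the longest one is shorter than 1/2
    return np.mean(pieces.max(axis=1) < 0.5)

# triangle_prob_serial(10**6) is approximately 0.25
```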

Run the Python script `stick_triangle.py` to estimate this probability using a parallel Monte-Carlo algorithm:

```
> python stick_triangle.py
Parallel Monte-Carlo estimation of stick triangle probability
Parameters: n_workers=12, batch_size=262144, n_batches=10000, seed=77777

Monte-Carlo estimate of probability: 0.250000
Population estimate of the estimator's standard deviation: 0.000834
Expected standard deviation of the estimator: 0.000846
Execution time: 64.043 seconds

```
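
A quick sanity check on these numbers (my reading, not text from the README): each batch estimate is a binomial proportion over a batch of size 262144, so its predicted spread is

$$\sqrt{\frac{p(1-p)}{262144}} = \sqrt{\frac{0.25 \cdot 0.75}{262144}} \approx 0.000846,$$

which is the "Expected standard deviation" line; the "Population estimate" of 0.000834 is the spread actually observed across the 10000 batch estimates.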

## Stick tetrahedron problem

The code estimates the probability that 6 segments, obtained by splitting a unit stick in
5 randomly chosen places, can be sides of a tetrahedron.

The probability is not known in closed form. See
[math.stackexchange.com/questions/351913](https://math.stackexchange.com/questions/351913/probability-that-a-stick-randomly-broken-in-five-places-can-form-a-tetrahedron) for more details.
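
As background (not part of the repository text): a standard criterion for six lengths, assigned to the edges $d_{ij}$ of a labeled tetrahedron on vertices $1,\dots,4$, to be realizable as a non-degenerate tetrahedron is that every face satisfies the triangle inequality and the Cayley-Menger determinant is positive,

$$288\,V^2 = \begin{vmatrix} 0 & 1 & 1 & 1 & 1 \\ 1 & 0 & d_{12}^2 & d_{13}^2 & d_{14}^2 \\ 1 & d_{12}^2 & 0 & d_{23}^2 & d_{24}^2 \\ 1 & d_{13}^2 & d_{23}^2 & 0 & d_{34}^2 \\ 1 & d_{14}^2 & d_{24}^2 & d_{34}^2 & 0 \end{vmatrix} > 0.$$

Because the six stick pieces are unordered, a Monte-Carlo trial also has to consider how pieces are assigned to edges; that logic is presumed to live in `mc_six_piece_stick_tetrahedron_prob`, which the script imports from `sticky_math`.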

```
> python stick_tetrahedron.py -s 1274 -p 4 -n 8096
Parallel Monte-Carlo estimation of stick tetrahedron probability
Input parameters: -s 1274 -b 65536 -n 8096 -p 4 -d 0

Monte-Carlo estimate of probability: 0.01257113
Population estimate of the estimator's standard deviation: 0.00000488
Expected standard deviation of the estimator: 0.00000484
Total MC size: 530579456

Bayesian posterior beta distribution parameters: (6669984, 523909472)

Execution time: 30.697 seconds

```
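
For context on the last line of that output (an explanation added here, not text from the PR): with a conjugate $\mathrm{Beta}(a, b)$ prior on the unknown probability and $k$ events observed in $N$ trials, the posterior is

$$p \mid k \sim \mathrm{Beta}(a + k,\; b + N - k).$$

The two printed parameters sum to the total MC size (6669984 + 523909472 = 530579456), so up to whatever prior the script uses they are the raw event and non-event counts; note also that $6669984 / 530579456 \approx 0.0125711$, matching the printed probability estimate.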
28 changes: 15 additions & 13 deletions examples/parallel_mc.py
```diff
@@ -1,21 +1,23 @@
 import multiprocessing as mp
+from functools import partial
 
 __all__ = ['parallel_mc_run']
 
 def worker_compute(w_id):
     "Worker function executed on the spawned slave process"
-    # global _local_rs
-    return _worker_mc_compute(_local_rs)
+    global _local_rs, _worker_mc_compute_func
+    return _worker_mc_compute_func(_local_rs)
 
 
-def assign_worker_rs(w_rs):
+def init_worker(w_rs, mc_compute_func=None, barrier=None):
     """Assign process local random state variable `rs` the given value"""
     assert not '_local_rs' in globals(), "Here comes trouble. Process is not expected to have global variable `_local_rs`"
 
-    global _local_rs
+    global _local_rs, _worker_mc_compute_func
     _local_rs = w_rs
+    _worker_mc_compute_func = mc_compute_func
     # wait to ensure that the assignment takes place for each worker
-    b.wait()
+    barrier.wait()
 
 def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
     """
@@ -25,15 +27,15 @@ def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
     and has access to worker-local global variable `rs`, containing worker's random states.
     """
     # use of Barrier ensures that every worker gets one
-    global b, _worker_mc_compute
-    b = mp.Barrier(n_workers)
 
+    with mp.Manager() as manager:
+        b = manager.Barrier(n_workers)
+
-    _worker_mc_compute = mc_func
-    with mp.Pool(processes=n_workers) as pool:
-        # 1. map over every worker once to distribute RandomState instances
-        pool.map(assign_worker_rs, random_states, chunksize=1)
-        # 2. Perform computations on workers
-        r = pool.map(worker_compute, range(n_batches), chunksize=1)
+        with mp.Pool(processes=n_workers) as pool:
+            # 1. map over every worker once to distribute RandomState instances
+            pool.map(partial(init_worker, mc_compute_func=mc_func, barrier=b), random_states, chunksize=1)
+            # 2. Perform computations on workers
+            r = pool.map(worker_compute, range(n_batches), chunksize=1)
 
     return r
```
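
To show how the reworked interface is meant to be called (a hypothetical sketch, not part of the pull request; the kernel function and the use of `numpy.random.RandomState` are stand-ins for the MT2203 streams used by the examples):

```python
import numpy as np
from parallel_mc import parallel_mc_run

def mc_batch_mean(rs, batch_size=100000):
    # toy MC kernel: fraction of uniform draws below 0.25
    return np.mean(rs.uniform(size=batch_size) < 0.25)

if __name__ == '__main__':
    n_workers, n_batches = 4, 64
    # one random state per worker (the examples build independent MT2203 streams instead)
    states = [np.random.RandomState(seed) for seed in range(n_workers)]
    results = parallel_mc_run(states, n_workers, n_batches, mc_batch_mean)
    print(np.mean(results))  # should be close to 0.25
```

Since the compute callable and the manager-backed `Barrier` now travel to each worker through `init_worker`, the caller no longer has to set module-level globals before spawning the pool.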
16 changes: 9 additions & 7 deletions examples/stick_tetrahedron.py
```diff
@@ -4,7 +4,7 @@
 from sticky_math import mc_six_piece_stick_tetrahedron_prob
 from arg_parsing import parse_arguments
 
-def mc_runner(rs):
+def mc_runner(rs, batch_size=None):
     return mc_six_piece_stick_tetrahedron_prob(rs, batch_size)
 
 def aggregate_mc_counts(counts, n_batches, batch_size):
@@ -40,6 +40,7 @@ def print_result(p_est, p_std, mc_size):
     from itertools import repeat
     from timeit import default_timer as timer
     import sys
+    from functools import partial
 
     args = parse_arguments()
 
@@ -51,23 +52,24 @@ def print_result(p_est, p_std, mc_size):
     batch_size = args.batch_size
     batches = args.batch_count
     id0 = args.id_offset
+    print("Parallel Monte-Carlo estimation of stick tetrahedron probability")
+    print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
+        seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
+    print("")
 
     t0 = timer()
 
     rss = build_MT2203_random_states(seed, id0, n_workers)
-    r = parallel_mc_run(rss, n_workers, batches, mc_runner)
-    # r = sequential_mc_run(rss, n_workers, batches, mc_runner)
-
+    r = parallel_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))
+    # r = sequential_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))
+
     # retrieve values of estimates into numpy array
     counts = np.fromiter(r, dtype=np.double)
     p_est, p_std, event_count, nonevent_count = aggregate_mc_counts(counts, batches, batch_size)
 
     t1 = timer()
 
 
-    print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
-        seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
-    print("")
     print_result(p_est, p_std, batches * batch_size)
     print("")
     print("Bayesian posterior beta distribution parameters: ({0}, {1})".format(event_count, nonevent_count))
```
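
The helper `build_MT2203_random_states` is imported from a module that is not part of this diff. Judging from the pattern used in `stick_triangle.py` below, it presumably builds one independent MT2203 stream per worker, along these lines (a sketch; the package alias `mkl_random` and the exact signature are assumptions):

```python
import mkl_random as rnd

def build_MT2203_random_states(seed, id0, n_workers):
    # one member of the MT2203 family per worker yields independent streams
    return [rnd.RandomState(seed, brng=('MT2203', id0 + idx))
            for idx in range(n_workers)]
```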
27 changes: 16 additions & 11 deletions examples/fancy.py → examples/stick_triangle.py
```diff
@@ -39,41 +39,46 @@ def mc_dist(rs, n):
     return mc_prob
 
 
-def assign_worker_rs(w_rs):
+def init_worker(w_rs, barrier=None):
     """Assign process local random state variable `rs` the given value"""
     assert not 'rs' in globals(), "Here comes trouble. Process is not expected to have global variable `rs`"
 
     global rs
     rs = w_rs
     # wait to ensure that the assignment takes place for each worker
-    b.wait()
+    barrier.wait()
 
 
-def worker_compute(w_id):
+def worker_compute(w_id, batch_size=None):
     return mc_dist(rs, batch_size)
 
 
 if __name__ == '__main__':
     import multiprocessing as mp
     from itertools import repeat
     from timeit import default_timer as timer
+    from functools import partial
 
     seed = 77777
     n_workers = 12
     batch_size = 1024 * 256
     batches = 10000
+    print("Parallel Monte-Carlo estimation of stick triangle probability")
+    print(f"Parameters: n_workers={n_workers}, batch_size={batch_size}, n_batches={batches}, seed={seed}")
+    print("")
 
     t0 = timer()
     # Create instances of RandomState for each worker process from MT2203 family of generators
     rss = [ rnd.RandomState(seed, brng=('MT2203', idx)) for idx in range(n_workers) ]
-    # use of Barrier ensures that every worker gets one
-    b = mp.Barrier(n_workers)
-
-    with mp.Pool(processes=n_workers) as pool:
-        # map over every worker once to distribute RandomState instances
-        pool.map(assign_worker_rs, rss, chunksize=1)
-        # Perform computations on workers
-        r = pool.map(worker_compute, range(batches), chunksize=1)
+    with mp.Manager() as manager:
+        # use of Barrier ensures that every worker gets one
+        b = manager.Barrier(n_workers)
+
+        with mp.Pool(processes=n_workers) as pool:
+            # map over every worker once to distribute RandomState instances
+            pool.map(partial(init_worker, barrier=b), rss, chunksize=1)
+            # Perform computations on workers
+            r = pool.map(partial(worker_compute, batch_size=batch_size), range(batches), chunksize=1)
 
     # retrieve values of estimates into numpy array
     ps = np.fromiter(r, dtype=np.double)
```
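
The diff is truncated here. For completeness, a sketch of the reduction step that produces the figures shown in the README output (an illustration consistent with those numbers, not code from the PR): the overall estimate is the mean of the batch estimates, the "population" standard deviation is their observed spread, and the "expected" standard deviation is the spread a binomial model predicts for a single batch.

```python
import numpy as np

def summarize_batches(ps, batch_size):
    """Reduce per-batch MC estimates to the quantities printed by the script."""
    p_est = ps.mean()    # overall probability estimate
    p_std = ps.std()     # observed spread of the batch estimates
    # spread predicted for one batch of `batch_size` Bernoulli trials
    p_std_expected = np.sqrt(p_est * (1.0 - p_est) / batch_size)
    return p_est, p_std, p_std_expected
```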