Skip to content

Commit 4953486

Browse files
hetangmodi-crestartemrys
authored andcommitted
feat: moved methods from splunktalib (#415)
[ADDON-73693](https://splunk.atlassian.net/browse/ADDON-73693) Migrated all the methods of `splunktalib` which were used in `splunktaucclib` to `solnlib`
1 parent d46cd21 commit 4953486

File tree

8 files changed

+1090
-0
lines changed

8 files changed

+1090
-0
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#
2+
# Copyright 2024 Splunk Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
"""Concurrent executor provides concurrent executing function either in a
18+
thread pool or a process pool."""
19+
20+
import solnlib.concurrent.process_pool as pp
21+
import solnlib.concurrent.thread_pool as tp
22+
23+
24+
class ConcurrentExecutor:
25+
def __init__(self, config):
26+
"""
27+
:param config: dict like object, contains thread_min_size (int),
28+
thread_max_size (int), daemonize_thread (bool),
29+
process_size (int)
30+
"""
31+
32+
self._io_executor = tp.ThreadPool(
33+
config.get("thread_min_size", 0),
34+
config.get("thread_max_size", 0),
35+
config.get("task_queue_size", 1024),
36+
config.get("daemonize_thread", True),
37+
)
38+
self._compute_executor = None
39+
if config.get("process_size", 0):
40+
self._compute_executor = pp.ProcessPool(config.get("process_size", 0))
41+
42+
def start(self):
43+
self._io_executor.start()
44+
45+
def tear_down(self):
46+
self._io_executor.tear_down()
47+
if self._compute_executor is not None:
48+
self._compute_executor.tear_down()
49+
50+
def run_io_func_sync(self, func, args=(), kwargs=None):
51+
"""
52+
:param func: callable
53+
:param args: free params
54+
:param kwargs: named params
55+
:return whatever the func returns
56+
"""
57+
58+
return self._io_executor.apply(func, args, kwargs)
59+
60+
def run_io_func_async(self, func, args=(), kwargs=None, callback=None):
61+
"""
62+
:param func: callable
63+
:param args: free params
64+
:param kwargs: named params
65+
:calllback: when func is done and without exception, call the callback
66+
:return whatever the func returns
67+
"""
68+
69+
return self._io_executor.apply_async(func, args, kwargs, callback)
70+
71+
def enqueue_io_funcs(self, funcs, block=True):
72+
"""run jobs in a fire and forget way, no result will be handled over to
73+
clients.
74+
75+
:param funcs: tuple/list-like or generator like object, func shall be
76+
callable
77+
"""
78+
79+
return self._io_executor.enqueue_funcs(funcs, block)
80+
81+
def run_compute_func_sync(self, func, args=(), kwargs={}):
82+
"""
83+
:param func: callable
84+
:param args: free params
85+
:param kwargs: named params
86+
:return whatever the func returns
87+
"""
88+
89+
assert self._compute_executor is not None
90+
return self._compute_executor.apply(func, args, kwargs)
91+
92+
def run_compute_func_async(self, func, args=(), kwargs={}, callback=None):
93+
"""
94+
:param func: callable
95+
:param args: free params
96+
:param kwargs: named params
97+
:calllback: when func is done and without exception, call the callback
98+
:return whatever the func returns
99+
"""
100+
101+
assert self._compute_executor is not None
102+
return self._compute_executor.apply_async(func, args, kwargs, callback)

solnlib/concurrent/process_pool.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#
2+
# Copyright 2024 Splunk Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
"""A wrapper of multiprocessing.pool."""
18+
19+
import multiprocessing
20+
21+
import logging
22+
23+
24+
class ProcessPool:
25+
"""A simple wrapper of multiprocessing.pool."""
26+
27+
def __init__(self, size=0, maxtasksperchild=10000):
28+
if size <= 0:
29+
size = multiprocessing.cpu_count()
30+
self.size = size
31+
self._pool = multiprocessing.Pool(
32+
processes=size, maxtasksperchild=maxtasksperchild
33+
)
34+
self._stopped = False
35+
36+
def tear_down(self):
37+
"""Tear down the pool."""
38+
39+
if self._stopped:
40+
logging.info("ProcessPool has already stopped.")
41+
return
42+
self._stopped = True
43+
44+
self._pool.close()
45+
self._pool.join()
46+
logging.info("ProcessPool stopped.")
47+
48+
def apply(self, func, args=(), kwargs={}):
49+
"""
50+
:param func: callable
51+
:param args: free params
52+
:param kwargs: named params
53+
:return whatever the func returns
54+
"""
55+
56+
if self._stopped:
57+
logging.info("ProcessPool has already stopped.")
58+
return None
59+
60+
return self._pool.apply(func, args, kwargs)
61+
62+
def apply_async(self, func, args=(), kwargs={}, callback=None):
63+
"""
64+
:param func: callable
65+
:param args: free params
66+
:param kwargs: named params
67+
:callback: when func is done without exception, call this callack
68+
:return whatever the func returns
69+
"""
70+
71+
if self._stopped:
72+
logging.info("ProcessPool has already stopped.")
73+
return None
74+
75+
return self._pool.apply_async(func, args, kwargs, callback)

0 commit comments

Comments
 (0)