Skip to content

gh-89545: Adds internal _wmi module on Windows for directly querying OS properties #96289

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Include/internal/pycore_global_strings.h
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(protocol)
STRUCT_FOR_ID(ps1)
STRUCT_FOR_ID(ps2)
STRUCT_FOR_ID(query)
STRUCT_FOR_ID(quotetabs)
STRUCT_FOR_ID(r)
STRUCT_FOR_ID(raw)
Expand Down
7 changes: 7 additions & 0 deletions Include/internal/pycore_runtime_init_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

148 changes: 105 additions & 43 deletions Lib/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,34 +309,52 @@ def _syscmd_ver(system='', release='', version='',
version = _norm_version(version)
return system, release, version

_WIN32_CLIENT_RELEASES = {
(5, 0): "2000",
(5, 1): "XP",
# Strictly, 5.2 client is XP 64-bit, but platform.py historically
# has always called it 2003 Server
(5, 2): "2003Server",
(5, None): "post2003",

(6, 0): "Vista",
(6, 1): "7",
(6, 2): "8",
(6, 3): "8.1",
(6, None): "post8.1",

(10, 0): "10",
(10, None): "post10",
}

# Server release name lookup will default to client names if necessary
_WIN32_SERVER_RELEASES = {
(5, 2): "2003Server",

(6, 0): "2008Server",
(6, 1): "2008ServerR2",
(6, 2): "2012Server",
(6, 3): "2012ServerR2",
(6, None): "post2012ServerR2",
}
try:
import _wmi
except ImportError:
def _wmi_query(*keys):
raise OSError("not supported")
else:
def _wmi_query(table, *keys):
table = {
"OS": "Win32_OperatingSystem",
"CPU": "Win32_Processor",
}[table]
data = _wmi.exec_query("SELECT {} FROM {}".format(
",".join(keys),
table,
)).split("\0")
split_data = (i.partition("=") for i in data)
dict_data = {i[0]: i[2] for i in split_data}
return (dict_data[k] for k in keys)


_WIN32_CLIENT_RELEASES = [
((10, 1, 0), "post11"),
((10, 0, 22000), "11"),
((6, 4, 0), "10"),
((6, 3, 0), "8.1"),
((6, 2, 0), "8"),
((6, 1, 0), "7"),
((6, 0, 0), "Vista"),
((5, 2, 3790), "XP64"),
((5, 2, 0), "XPMedia"),
((5, 1, 0), "XP"),
((5, 0, 0), "2000"),
]

_WIN32_SERVER_RELEASES = [
((10, 1, 0), "post2022Server"),
((10, 0, 20348), "2022Server"),
((10, 0, 17763), "2019Server"),
((6, 4, 0), "2016Server"),
((6, 3, 0), "2012ServerR2"),
((6, 2, 0), "2012Server"),
((6, 1, 0), "2008ServerR2"),
((6, 0, 0), "2008Server"),
((5, 2, 0), "2003Server"),
((5, 0, 0), "2000Server"),
]

def win32_is_iot():
return win32_edition() in ('IoTUAP', 'NanoServer', 'WindowsCoreHeadless', 'IoTEdgeOS')
Expand All @@ -359,22 +377,40 @@ def win32_edition():

return None

def win32_ver(release='', version='', csd='', ptype=''):
def _win32_ver(version, csd, ptype):
# Try using WMI first, as this is the canonical source of data
try:
(version, product_type, ptype, spmajor, spminor) = _wmi_query(
'OS',
'Version',
'ProductType',
'BuildType',
'ServicePackMajorVersion',
'ServicePackMinorVersion',
)
is_client = (int(product_type) == 1)
if spminor and spminor != '0':
csd = f'SP{spmajor}.{spminor}'
else:
csd = f'SP{spmajor}'
return version, csd, ptype, is_client
except OSError:
pass

# Fall back to a combination of sys.getwindowsversion and "ver"
try:
from sys import getwindowsversion
except ImportError:
return release, version, csd, ptype
return version, csd, ptype, True

winver = getwindowsversion()
is_client = (getattr(winver, 'product_type', 1) == 1)
try:
major, minor, build = map(int, _syscmd_ver()[2].split('.'))
version = _syscmd_ver()[2]
major, minor, build = map(int, version.split('.'))
except ValueError:
major, minor, build = winver.platform_version or winver[:3]
version = '{0}.{1}.{2}'.format(major, minor, build)

release = (_WIN32_CLIENT_RELEASES.get((major, minor)) or
_WIN32_CLIENT_RELEASES.get((major, None)) or
release)
version = '{0}.{1}.{2}'.format(major, minor, build)

# getwindowsversion() reflect the compatibility mode Python is
# running under, and so the service pack value is only going to be
Expand All @@ -386,12 +422,6 @@ def win32_ver(release='', version='', csd='', ptype=''):
if csd[:13] == 'Service Pack ':
csd = 'SP' + csd[13:]

# VER_NT_SERVER = 3
if getattr(winver, 'product_type', None) == 3:
release = (_WIN32_SERVER_RELEASES.get((major, minor)) or
_WIN32_SERVER_RELEASES.get((major, None)) or
release)

try:
try:
import winreg
Expand All @@ -407,6 +437,18 @@ def win32_ver(release='', version='', csd='', ptype=''):
except OSError:
pass

return version, csd, ptype, is_client

def win32_ver(release='', version='', csd='', ptype=''):
is_client = False

version, csd, ptype, is_client = _win32_ver(version, csd, ptype)

if version:
intversion = tuple(map(int, version.split('.')))
releases = _WIN32_CLIENT_RELEASES if is_client else _WIN32_SERVER_RELEASES
release = next((r for v, r in releases if v <= intversion), release)

return release, version, csd, ptype


Expand Down Expand Up @@ -725,6 +767,21 @@ def _get_machine_win32():
# http://www.geocities.com/rick_lively/MANUALS/ENV/MSWIN/PROCESSI.HTM

# WOW64 processes mask the native architecture
try:
arch, = _wmi_query('CPU', 'Architecture')
except OSError:
pass
else:
try:
arch = ['x86', 'MIPS', 'Alpha', 'PowerPC', None,
'ARM', 'ia64', None, None,
'AMD64', None, None, 'ARM64',
][int(arch)]
except (ValueError, IndexError):
pass
else:
if arch:
return arch
return (
os.environ.get('PROCESSOR_ARCHITEW6432', '') or
os.environ.get('PROCESSOR_ARCHITECTURE', '')
Expand All @@ -738,7 +795,12 @@ def get(cls):
return func() or ''

def get_win32():
return os.environ.get('PROCESSOR_IDENTIFIER', _get_machine_win32())
try:
manufacturer, caption = _wmi_query('CPU', 'Manufacturer', 'Caption')
except OSError:
return os.environ.get('PROCESSOR_IDENTIFIER', _get_machine_win32())
else:
return f'{caption}, {manufacturer}'

def get_OpenVMS():
try:
Expand Down
11 changes: 11 additions & 0 deletions Lib/test/audit-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,17 @@ def hook(event, args):
sys._getframe()


def test_wmi_exec_query():
import _wmi

def hook(event, args):
if event.startswith("_wmi."):
print(event, args[0])

sys.addaudithook(hook)
_wmi.exec_query("SELECT * FROM Win32_OperatingSystem")


if __name__ == "__main__":
from test.support import suppress_msvcrt_asserts

Expand Down
15 changes: 15 additions & 0 deletions Lib/test/test_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,20 @@ def test_sys_getframe(self):

self.assertEqual(actual, expected)


def test_wmi_exec_query(self):
import_helper.import_module("_wmi")
returncode, events, stderr = self.run_python("test_wmi_exec_query")
if returncode:
self.fail(stderr)

if support.verbose:
print(*events, sep='\n')
actual = [(ev[0], ev[2]) for ev in events]
expected = [("_wmi.exec_query", "SELECT * FROM Win32_OperatingSystem")]

self.assertEqual(actual, expected)


if __name__ == "__main__":
unittest.main()
21 changes: 21 additions & 0 deletions Lib/test/test_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ def test_uname(self):
self.assertEqual(res[-1], res.processor)
self.assertEqual(len(res), 6)

@unittest.skipUnless(sys.platform.startswith('win'), "windows only test")
def test_uname_win32_without_wmi(self):
old_wmi_query = platform._wmi_query
def raises_oserror(*a):
raise OSError()
platform._wmi_query = raises_oserror

try:
self.test_uname()
finally:
platform._wmi_query = old_wmi_query

def test_uname_cast_to_tuple(self):
res = platform.uname()
expected = (
Expand Down Expand Up @@ -289,6 +301,14 @@ def test_uname_win32_ARCHITEW6432(self):
# on 64 bit Windows: if PROCESSOR_ARCHITEW6432 exists we should be
# using it, per
# http://blogs.msdn.com/david.wang/archive/2006/03/26/HOWTO-Detect-Process-Bitness.aspx

# We also need to suppress WMI checks, as those are reliable and
# overrule the environment variables
old_wmi_query = platform._wmi_query
def raises_oserror(*a):
raise OSError()
platform._wmi_query = raises_oserror

try:
with os_helper.EnvironmentVarGuard() as environ:
if 'PROCESSOR_ARCHITEW6432' in environ:
Expand All @@ -303,6 +323,7 @@ def test_uname_win32_ARCHITEW6432(self):
self.assertEqual(machine, 'bar')
finally:
platform._uname_cache = None
platform._wmi_query = old_wmi_query

def test_java_ver(self):
res = platform.java_ver()
Expand Down
54 changes: 54 additions & 0 deletions Lib/test/test_wmi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Test the internal _wmi module on Windows
# This is used by the platform module, and potentially others

import re
import sys
import unittest
from test.support import import_helper


# Do this first so test will be skipped if module doesn't exist
_wmi = import_helper.import_module('_wmi', required_on=['win'])


class WmiTests(unittest.TestCase):
def test_wmi_query_os_version(self):
r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
self.assertEqual(1, len(r))
k, eq, v = r[0].partition("=")
self.assertEqual("=", eq, r[0])
self.assertEqual("Version", k, r[0])
# Best we can check for the version is that it's digits, dot, digits, anything
# Otherwise, we are likely checking the result of the query against itself
self.assertTrue(re.match(r"\d+\.\d+.+$", v), r[0])

def test_wmi_query_repeated(self):
# Repeated queries should not break
for _ in range(10):
self.test_wmi_query_os_version()

def test_wmi_query_error(self):
# Invalid queries fail with OSError
try:
_wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName")
except OSError as ex:
if ex.winerror & 0xFFFFFFFF == 0x80041010:
# This is the expected error code. All others should fail the test
return
self.fail("Expected OSError")

def test_wmi_query_repeated_error(self):
for _ in range(10):
self.test_wmi_query_error()

def test_wmi_query_not_select(self):
# Queries other than SELECT are blocked to avoid potential exploits
with self.assertRaises(ValueError):
_wmi.exec_query("not select, just in case someone tries something")

def test_wmi_query_overflow(self):
# Ensure very big queries fail
# Test multiple times to ensure consistency
for _ in range(2):
with self.assertRaises(OSError):
_wmi.exec_query("SELECT * FROM CIM_DataFile")
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Updates :mod:`platform` to use native WMI queries to determine OS version, type, and architecture.
Loading