Skip to content

Add support for mTLS #382

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/devnotes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Decision Points
Testing Notes
-------------

When running tests on new code, you are advised to run 'test_default' first, then 'test_regression', then finally 'test_security'.
When running tests on new code, you are advised to run 'test_default' first, then 'test_regression', then finally 'test_ldap' and/or 'test_mtls'.
Because of the way errors are propagated you may have code failures which cause a teardown which then fails because of security controls, which can then obscure the original error.


Expand Down
11 changes: 8 additions & 3 deletions nipyapi/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,13 +1106,14 @@ def create_controller(parent_pg, controller, name=None):
return out


def list_all_controllers(pg_id='root', descendants=True):
def list_all_controllers(pg_id='root', descendants=True, include_reporting_tasks=False):
"""
Lists all controllers under a given Process Group, defaults to Root
Optionally recurses all child Process Groups as well
Args:
pg_id (str): String of the ID of the Process Group to list from
descendants (bool): True to recurse child PGs, False to not
include_reporting_tasks (bool): True to include Reporting Tasks, False to not

Returns:
None, ControllerServiceEntity, or list(ControllerServiceEntity)
Expand Down Expand Up @@ -1145,6 +1146,9 @@ def list_all_controllers(pg_id='root', descendants=True):
pg_id,
include_descendant_groups=descendants
).controller_services
if include_reporting_tasks:
mgmt_handle = nipyapi.nifi.FlowApi()
out += mgmt_handle.get_controller_services_from_controller().controller_services
return out


Expand Down Expand Up @@ -1274,7 +1278,7 @@ def _schedule_controller_state(cont_id, tgt_state):
raise ValueError("Scheduling request timed out")


def get_controller(identifier, identifier_type='name', bool_response=False):
def get_controller(identifier, identifier_type='name', bool_response=False, include_reporting_tasks=True):
"""
Retrieve a given Controller

Expand All @@ -1283,6 +1287,7 @@ def get_controller(identifier, identifier_type='name', bool_response=False):
identifier_type (str): 'id' or 'name', defaults to name
bool_response (bool): If True, will return False if the Controller is
not found - useful when testing for deletion completion
include_reporting_tasks (bool): If True, will include Reporting Tasks in the search

Returns:

Expand All @@ -1295,7 +1300,7 @@ def get_controller(identifier, identifier_type='name', bool_response=False):
if identifier_type == 'id':
out = handle.get_controller_service(identifier)
else:
obj = list_all_controllers()
obj = list_all_controllers(include_reporting_tasks=include_reporting_tasks)
out = nipyapi.utils.filter_obj(obj, identifier, identifier_type)
except nipyapi.nifi.rest.ApiException as e:
if bool_response:
Expand Down
4 changes: 3 additions & 1 deletion nipyapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@
default_nifi_password = "password"
default_registry_username = "nobel"
default_registry_password = "password"
# Identity to be used for mTLS authentication
default_mtls_identity = "CN=user1, OU=nifi"
# Identity to be used in the Registry Client Proxy setup
# If called for during policy setup, particularly bootstrap_policies
default_proxy_user = "CN=localhost, OU=nifi"
default_proxy_user = "CN=user1, OU=nifi"

# Auth handling
# If set, NiPyAPI will always include the Basic Authorization header
Expand Down
131 changes: 106 additions & 25 deletions nipyapi/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,12 +729,13 @@ def set_service_ssl_context(
client_key_file=None,
client_key_password=None,
check_hostname=None,
purpose=None,
):
"""
Create an SSLContext for connecting over https to a secured NiFi or
NiFi-Registry instance.

This method can be used to create an SSLContext for
This method can be used to create an SSLContext for
two-way TLS in which a client cert is used by the service to authenticate
the client.

Expand All @@ -757,15 +758,16 @@ def set_service_ssl_context(
containing the client's secret key
client_key_password (str): The password to decrypt the client_key_file
check_hostname (bool): Enable or Disable hostname checking
purpose (ssl.Purpose): The purpose of the SSLContext

Returns:
(None)
"""
assert service in ["nifi", "registry"]
if client_key_file is None:
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
else:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context = ssl.create_default_context(
purpose=purpose or ssl.Purpose.SERVER_AUTH
)
if client_cert_file is not None and client_key_file is not None:
try:
ssl_context.load_cert_chain(
certfile=client_cert_file,
Expand Down Expand Up @@ -806,38 +808,45 @@ def set_service_ssl_context(


# pylint: disable=W0702,R0912
def bootstrap_security_policies(service, user_identity=None,
group_identity=None):
"""
Creates a default security context within NiFi or Nifi-Registry
def bootstrap_security_policies(service, user_identity=None, group_identity=None):
"""Creates a default security context within NiFi or Nifi-Registry.

Args:
service (str): 'nifi' or 'registry' to indicate which service
user_identity: a service user to establish in the security context
group_identity: a service group to establish in the security context
service (str): The service to configure security for ('nifi' or 'registry')
user_identity (nipyapi.nifi.UserEntity or nipyapi.registry.User, optional): User identity to apply policies to
group_identity (nipyapi.nifi.UserGroupEntity or nipyapi.registry.UserGroup, optional): Group identity to apply policies to

Returns:
None

"""
assert service in _valid_services, "service not in %s" % _valid_services
valid_ident_obj = [nipyapi.nifi.UserEntity, nipyapi.registry.User]
if user_identity is not None:
assert type(user_identity) in valid_ident_obj

if "nifi" in service:
rpg_id = nipyapi.canvas.get_root_pg_id()
if user_identity is None and group_identity is None:
# Try to find user by certificate DN if using mTLS
nifi_user_identity = nipyapi.security.get_service_user(
nipyapi.config.default_nifi_username, service="nifi"
nipyapi.config.default_mtls_identity, service="nifi"
)
# Fall back to default username if not found
if not nifi_user_identity:
nifi_user_identity = nipyapi.security.get_service_user(
nipyapi.config.default_nifi_username, service="nifi"
)
else:
nifi_user_identity = user_identity

access_policies = [
("write", "process-groups", rpg_id),
("read", "process-groups", rpg_id),
("write", "data/process-groups", rpg_id),
("read", "data/process-groups", rpg_id),
("read", "system", None),
("read", "system-diagnostics", None),
("read", "policies", None),
]
for pol in access_policies:
ap = nipyapi.security.get_access_policy_for_resource(
Expand Down Expand Up @@ -868,12 +877,19 @@ def bootstrap_security_policies(service, user_identity=None,
)
else:
if user_identity is None and group_identity is None:
# Try to find user by certificate DN if using mTLS
reg_user_identity = nipyapi.security.get_service_user(
nipyapi.config.default_registry_username,
service="registry"
nipyapi.config.default_mtls_identity, service="registry"
)
# Fall back to default username if not found
if not reg_user_identity:
reg_user_identity = nipyapi.security.get_service_user(
nipyapi.config.default_registry_username,
service="registry"
)
else:
reg_user_identity = user_identity

all_buckets_access_policies = [
("read", "/buckets"),
("write", "/buckets"),
Expand All @@ -887,20 +903,27 @@ def bootstrap_security_policies(service, user_identity=None,
auto_create=True
)
if reg_user_identity is None:
nipyapi.security.add_user_group_to_access_policy(
user_group=group_identity,
policy=pol,
service="registry"
)
if group_identity: # Only try to add group if it exists
nipyapi.security.add_user_group_to_access_policy(
user_group=group_identity,
policy=pol,
service="registry"
)
else:
nipyapi.security.add_user_to_access_policy(
user=reg_user_identity, policy=pol,
user=reg_user_identity,
policy=pol,
service="registry",
strict=False
)
# get the identity of the user as a string
if isinstance(reg_user_identity, nipyapi.registry.User):
reg_user_ident_str = reg_user_identity.identity
else:
reg_user_ident_str = reg_user_identity
# Setup Proxy Access
nifi_proxy = nipyapi.security.create_service_user(
identity=nipyapi.config.default_proxy_user,
nifi_proxy_user = nipyapi.security.create_service_user(
identity=reg_user_ident_str,
service="registry",
strict=False
)
Expand All @@ -917,8 +940,66 @@ def bootstrap_security_policies(service, user_identity=None,
auto_create=True
)
nipyapi.security.add_user_to_access_policy(
user=nifi_proxy,
user=nifi_proxy_user,
policy=pol,
service="registry",
strict=False
)


def create_ssl_context_controller_service(
parent_pg, name, keystore_file, keystore_password, truststore_file, truststore_password,
key_password=None, keystore_type=None, truststore_type=None, ssl_protocol=None, ssl_service_type=None):
"""
Creates and configures an SSL Context Service for secure client connections.
Note that once created it can be listed and deleted using the standard canvas functions.

Args:
parent_pg (ProcessGroupEntity): The Process Group to create the service in
name (str): Name for the SSL Context Service
keystore_file (str): Path to the client certificate/keystore file
keystore_password (str): Password for the keystore
truststore_file (str): Path to the truststore file
truststore_password (str): Password for the truststore
key_password (Optional[str]): Password for the key, defaults to keystore_password if not set
keystore_type (Optional[str]): Type of keystore (JKS, PKCS12), defaults to JKS
truststore_type (Optional[str]): Type of truststore (JKS, PKCS12), defaults to JKS
ssl_protocol (Optional[str]): SSL protocol version, defaults to TLS
ssl_service_type (Optional[str]): SSL service type, defaults to StandardRestrictedSSLContextService

Returns:
(ControllerServiceEntity): The configured SSL Context Service
"""
assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity)
assert isinstance(name, six.string_types)
assert isinstance(keystore_file, six.string_types)
assert isinstance(keystore_password, six.string_types)
assert isinstance(truststore_file, six.string_types)
assert isinstance(truststore_password, six.string_types)
assert key_password is None or isinstance(key_password, six.string_types)
assert keystore_type is None or isinstance(keystore_type, six.string_types)
assert truststore_type is None or isinstance(truststore_type, six.string_types)
assert ssl_protocol is None or isinstance(ssl_protocol, six.string_types)

with nipyapi.utils.rest_exceptions():
return nipyapi.nifi.ControllerApi().create_controller_service(
body=nipyapi.nifi.ControllerServiceEntity(
revision=nipyapi.nifi.RevisionDTO(
version=0
),
component=nipyapi.nifi.ControllerServiceDTO(
type= ssl_service_type or 'org.apache.nifi.ssl.StandardRestrictedSSLContextService',
name=name,
properties={
'Keystore Filename': keystore_file,
'Keystore Password': keystore_password,
'key-password': key_password or keystore_password,
'Keystore Type': keystore_type or 'JKS',
'Truststore Filename': truststore_file,
'Truststore Password': truststore_password,
'Truststore Type': truststore_type or 'JKS',
'SSL Protocol': ssl_protocol or 'TLS'
}
)
)
)
46 changes: 24 additions & 22 deletions nipyapi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,25 +295,17 @@ def is_endpoint_up(endpoint_url):
return False


def set_endpoint(endpoint_url, ssl=False, login=False,
username=None, password=None):
"""
EXPERIMENTAL

Sets the endpoint when switching between instances of NiFi or other
projects. Not tested extensively with secured instances.
def set_endpoint(endpoint_url, ssl=False, login=False, username=None, password=None):
"""Sets the endpoint when switching between instances of NiFi or other projects.

Args:
endpoint_url (str): The URL to set as the endpoint. Auto-detects the
relevant service e.g. 'http://localhost:18080/nifi-registry-api'
ssl (bool): Whether to use the default security context in
nipyapi.config to authenticate if a secure URL is detected
login (bool): Whether to attempt login using default cred in config
requires ssl to be set
username (str): The username to use for login, if specified
password (str): The password to use for login, if specified
endpoint_url (str): The URL to set as the endpoint
ssl (bool): Whether to use SSL context for HTTPS connections
login (bool): Whether to attempt token-based login
username (str): The username to use for login
password (str): The password to use for login

Returns (bool): True for success, False for not
Returns (bool): True for success
"""
log.info("Called set_endpoint with args %s", locals())
if 'nifi-api' in endpoint_url:
Expand All @@ -324,28 +316,38 @@ def set_endpoint(endpoint_url, ssl=False, login=False,
service = 'registry'
else:
raise ValueError("Endpoint not recognised")

log.info("Setting %s endpoint to %s", service, endpoint_url)
if configuration.api_client:
# Running controlled logout procedure
nipyapi.security.service_logout(service)
# Resetting API client so it recreates from config.host
configuration.api_client = None

# remove any trailing slash to avoid hard to spot errors
configuration.host = endpoint_url.rstrip('/')
if 'https://' in endpoint_url and ssl:
if not login:
nipyapi.security.set_service_ssl_context(
service=service,
**nipyapi.config.default_ssl_context
)

# Set up SSL context if using HTTPS
if ssl and 'https://' in endpoint_url:
if login:
# Username/password auth with basic SSL
nipyapi.security.set_service_ssl_context(
service=service,
ca_file=nipyapi.config.default_ssl_context['ca_file']
)
nipyapi.security.service_login(
service, username=username, password=password
)
else:
# mTLS auth with client certificates
nipyapi.security.set_service_ssl_context(
service=service,
ca_file=nipyapi.config.default_ssl_context['ca_file'],
client_cert_file=nipyapi.config.default_ssl_context['client_cert_file'],
client_key_file=nipyapi.config.default_ssl_context['client_key_file'],
client_key_password=nipyapi.config.default_ssl_context['client_key_password']
)

return True


Expand Down
Loading