diff --git a/doc/StepFunctionsWorkflowExecutionPolicy.json b/doc/StepFunctionsWorkflowExecutionPolicy.json index b9fbb54..dd4562c 100644 --- a/doc/StepFunctionsWorkflowExecutionPolicy.json +++ b/doc/StepFunctionsWorkflowExecutionPolicy.json @@ -20,6 +20,9 @@ "sagemaker:DeleteEndpoint", "sagemaker:UpdateEndpoint", "sagemaker:ListTags", + "sagemaker:CreateProcessingJob", + "sagemaker:DescribeProcessingJob", + "sagemaker:StopProcessingJob", "lambda:InvokeFunction", "sqs:SendMessage", "sns:Publish", @@ -63,6 +66,7 @@ "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule", "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule", "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule", + "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerProcessingJobsRule", "arn:aws:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule", "arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule" ] diff --git a/doc/sagemaker.rst b/doc/sagemaker.rst index d2d8296..b4db56e 100644 --- a/doc/sagemaker.rst +++ b/doc/sagemaker.rst @@ -14,3 +14,5 @@ This module provides classes to build steps that integrate with Amazon SageMaker .. autoclass:: stepfunctions.steps.sagemaker.EndpointConfigStep .. autoclass:: stepfunctions.steps.sagemaker.EndpointStep + +.. autoclass:: stepfunctions.steps.sagemaker.ProcessingStep \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e345d0a..464d5b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -sagemaker>=1.42.8 +sagemaker>=1.71.0 boto3>=1.9.213 pyyaml diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index 42c763c..05d7c64 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -16,7 +16,7 @@ from stepfunctions.steps.states import Pass, Succeed, Fail, Wait, Choice, Parallel, Map, Task, Chain, Retry, Catch from stepfunctions.steps.states import Graph, FrozenGraph -from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep +from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep, ProcessingStep from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep diff --git a/src/stepfunctions/steps/sagemaker.py b/src/stepfunctions/steps/sagemaker.py index edaa64c..6321563 100644 --- a/src/stepfunctions/steps/sagemaker.py +++ b/src/stepfunctions/steps/sagemaker.py @@ -17,7 +17,7 @@ from stepfunctions.steps.fields import Field from stepfunctions.steps.utils import tags_dict_to_kv_list -from sagemaker.workflow.airflow import training_config, transform_config, model_config, tuning_config +from sagemaker.workflow.airflow import training_config, transform_config, model_config, tuning_config, processing_config from sagemaker.model import Model, FrameworkModel from sagemaker.model_monitor import DataCaptureConfig @@ -356,3 +356,58 @@ def __init__(self, state_id, tuner, job_name, data, wait_for_completion=True, ta kwargs[Field.Parameters.value] = parameters super(TuningStep, self).__init__(state_id, **kwargs) + + +class ProcessingStep(Task): + + """ + Creates a Task State to execute a SageMaker Processing Job. 
+ """ + + def __init__(self, state_id, processor, job_name, inputs=None, outputs=None, experiment_config=None, container_arguments=None, container_entrypoint=None, kms_key_id=None, wait_for_completion=True, tags=None, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + processor (sagemaker.processing.Processor): The processor for the processing step. + job_name (str or Placeholder): Specify a processing job name, this is required for the processing job to run. We recommend to use :py:class:`~stepfunctions.inputs.ExecutionInput` placeholder collection to pass the value dynamically in each execution. + inputs (list[:class:`~sagemaker.processing.ProcessingInput`]): Input files for + the processing job. These must be provided as + :class:`~sagemaker.processing.ProcessingInput` objects (default: None). + outputs (list[:class:`~sagemaker.processing.ProcessingOutput`]): Outputs for + the processing job. These can be specified as either path strings or + :class:`~sagemaker.processing.ProcessingOutput` objects (default: None). + experiment_config (dict, optional): Specify the experiment config for the processing. (Default: None) + container_arguments ([str]): The arguments for a container used to run a processing job. + container_entrypoint ([str]): The entrypoint for a container used to run a processing job. + kms_key_id (str): The AWS Key Management Service (AWS KMS) key that Amazon SageMaker + uses to encrypt the processing job output. KmsKeyId can be an ID of a KMS key, + ARN of a KMS key, alias of a KMS key, or alias of a KMS key. + The KmsKeyId is applied to all outputs. + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the processing job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the processing job and proceed to the next step. (default: True) + tags (list[dict], optional): `List to tags `_ to associate with the resource. 
+ """ + if wait_for_completion: + kwargs[Field.Resource.value] = 'arn:aws:states:::sagemaker:createProcessingJob.sync' + else: + kwargs[Field.Resource.value] = 'arn:aws:states:::sagemaker:createProcessingJob' + + if isinstance(job_name, str): + parameters = processing_config(processor=processor, inputs=inputs, outputs=outputs, container_arguments=container_arguments, container_entrypoint=container_entrypoint, kms_key_id=kms_key_id, job_name=job_name) + else: + parameters = processing_config(processor=processor, inputs=inputs, outputs=outputs, container_arguments=container_arguments, container_entrypoint=container_entrypoint, kms_key_id=kms_key_id) + + if isinstance(job_name, (ExecutionInput, StepInput)): + parameters['ProcessingJobName'] = job_name + + if experiment_config is not None: + parameters['ExperimentConfig'] = experiment_config + + if tags: + parameters['Tags'] = tags_dict_to_kv_list(tags) + + if 'S3Operations' in parameters: + del parameters['S3Operations'] + + kwargs[Field.Parameters.value] = parameters + + super(ProcessingStep, self).__init__(state_id, **kwargs) diff --git a/tests/data/sklearn_processing/preprocessor.py b/tests/data/sklearn_processing/preprocessor.py new file mode 100644 index 0000000..e5047a6 --- /dev/null +++ b/tests/data/sklearn_processing/preprocessor.py @@ -0,0 +1,75 @@ +import argparse +import os +import warnings + +import pandas as pd +import numpy as np +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelBinarizer, KBinsDiscretizer +from sklearn.preprocessing import PolynomialFeatures +from sklearn.compose import make_column_transformer + +from sklearn.exceptions import DataConversionWarning +warnings.filterwarnings(action='ignore', category=DataConversionWarning) + + +columns = ['age', 'education', 'major industry code', 'class of worker', 'num persons worked for employer', + 'capital gains', 'capital losses', 'dividends from stocks', 'income'] +class_labels = [' - 50000.', ' 50000+.'] + +def print_shape(df): + negative_examples, positive_examples = np.bincount(df['income']) + print('Data shape: {}, {} positive examples, {} negative examples'.format(df.shape, positive_examples, negative_examples)) + +if __name__=='__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--train-test-split-ratio', type=float, default=0.3) + args, _ = parser.parse_known_args() + + print('Received arguments {}'.format(args)) + + input_data_path = os.path.join('/opt/ml/processing/input', 'census-income.csv') + + print('Reading input data from {}'.format(input_data_path)) + df = pd.read_csv(input_data_path) + df = pd.DataFrame(data=df, columns=columns) + df.dropna(inplace=True) + df.drop_duplicates(inplace=True) + df.replace(class_labels, [0, 1], inplace=True) + + negative_examples, positive_examples = np.bincount(df['income']) + print('Data after cleaning: {}, {} positive examples, {} negative examples'.format(df.shape, positive_examples, negative_examples)) + + split_ratio = args.train_test_split_ratio + print('Splitting data into train and test sets with ratio {}'.format(split_ratio)) + X_train, X_test, y_train, y_test = train_test_split(df.drop('income', axis=1), df['income'], test_size=split_ratio, random_state=0) + + preprocess = make_column_transformer( + (['age', 'num persons worked for employer'], KBinsDiscretizer(encode='onehot-dense', n_bins=10)), + (['capital gains', 'capital losses', 'dividends from stocks'], StandardScaler()), + (['education', 'major industry code', 'class 
of worker'], OneHotEncoder(sparse=False)) + ) + print('Running preprocessing and feature engineering transformations') + train_features = preprocess.fit_transform(X_train) + test_features = preprocess.transform(X_test) + + print('Train data shape after preprocessing: {}'.format(train_features.shape)) + print('Test data shape after preprocessing: {}'.format(test_features.shape)) + + train_features_output_path = os.path.join('/opt/ml/processing/train', 'train_features.csv') + train_labels_output_path = os.path.join('/opt/ml/processing/train', 'train_labels.csv') + + test_features_output_path = os.path.join('/opt/ml/processing/test', 'test_features.csv') + test_labels_output_path = os.path.join('/opt/ml/processing/test', 'test_labels.csv') + + print('Saving training features to {}'.format(train_features_output_path)) + pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False) + + print('Saving test features to {}'.format(test_features_output_path)) + pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False) + + print('Saving training labels to {}'.format(train_labels_output_path)) + y_train.to_csv(train_labels_output_path, header=False, index=False) + + print('Saving test labels to {}'.format(test_labels_output_path)) + y_test.to_csv(test_labels_output_path, header=False, index=False) \ No newline at end of file diff --git a/tests/integ/conftest.py b/tests/integ/conftest.py index 55a74c3..394838e 100644 --- a/tests/integ/conftest.py +++ b/tests/integ/conftest.py @@ -20,6 +20,7 @@ import pickle from sagemaker import Session from sagemaker.amazon import pca +from sagemaker.sklearn.processing import SKLearnProcessor from tests.integ import DATA_DIR @pytest.fixture(scope="session") @@ -58,6 +59,17 @@ def pca_estimator_fixture(sagemaker_role_arn): ) return estimator +@pytest.fixture(scope="session") +def sklearn_processor_fixture(sagemaker_role_arn): + processor = SKLearnProcessor( + framework_version="0.20.0", + role=sagemaker_role_arn, + instance_type="ml.m5.xlarge", + instance_count=1, + max_runtime_in_seconds=300 + ) + return processor + @pytest.fixture(scope="session") def train_set(): data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz") diff --git a/tests/integ/test_sagemaker_steps.py b/tests/integ/test_sagemaker_steps.py index 36309e8..36f2940 100644 --- a/tests/integ/test_sagemaker_steps.py +++ b/tests/integ/test_sagemaker_steps.py @@ -27,9 +27,10 @@ from sagemaker.utils import unique_name_from_base from sagemaker.parameter import IntegerParameter, CategoricalParameter from sagemaker.tuner import HyperparameterTuner +from sagemaker.processing import ProcessingInput, ProcessingOutput from stepfunctions.steps import Chain -from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, TuningStep +from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, TuningStep, ProcessingStep from stepfunctions.workflow import Workflow from tests.integ import DATA_DIR, DEFAULT_TIMEOUT_MINUTES @@ -297,3 +298,56 @@ def test_tuning_step(sfn_client, record_set_for_hyperparameter_tuning, sagemaker # Cleanup state_machine_delete_wait(sfn_client, workflow.state_machine_arn) # End of Cleanup + +def test_processing_step(sklearn_processor_fixture, sagemaker_session, sfn_client, sfn_role_arn): + region = boto3.session.Session().region_name + input_data = 
's3://sagemaker-sample-data-{}/processing/census/census-income.csv'.format(region) + + input_s3 = sagemaker_session.upload_data( + path=os.path.join(DATA_DIR, 'sklearn_processing'), + bucket=sagemaker_session.default_bucket(), + key_prefix='integ-test-data/sklearn_processing/code' + ) + + output_s3 = 's3://' + sagemaker_session.default_bucket() + '/integ-test-data/sklearn_processing' + + inputs = [ + ProcessingInput(source=input_data, destination='/opt/ml/processing/input', input_name='input-1'), + ProcessingInput(source=input_s3 + '/preprocessor.py', destination='/opt/ml/processing/input/code', input_name='code'), + ] + + outputs = [ + ProcessingOutput(source='/opt/ml/processing/train', destination=output_s3 + '/train_data', output_name='train_data'), + ProcessingOutput(source='/opt/ml/processing/test', destination=output_s3 + '/test_data', output_name='test_data'), + ] + + job_name = generate_job_name() + processing_step = ProcessingStep('create_processing_job_step', + processor=sklearn_processor_fixture, + job_name=job_name, + inputs=inputs, + outputs=outputs, + container_arguments=['--train-test-split-ratio', '0.2'], + container_entrypoint=['python3', '/opt/ml/processing/input/code/preprocessor.py'], + ) + workflow_graph = Chain([processing_step]) + + with timeout(minutes=DEFAULT_TIMEOUT_MINUTES): + # Create workflow and check definition + workflow = create_workflow_and_check_definition( + workflow_graph=workflow_graph, + workflow_name=unique_name_from_base("integ-test-processing-step-workflow"), + sfn_client=sfn_client, + sfn_role_arn=sfn_role_arn + ) + + # Execute workflow + execution = workflow.execute() + execution_output = execution.get_output(wait=True) + + # Check workflow output + assert execution_output.get("ProcessingJobStatus") == "Completed" + + # Cleanup + state_machine_delete_wait(sfn_client, workflow.state_machine_arn) + # End of Cleanup diff --git a/tests/unit/test_sagemaker_steps.py b/tests/unit/test_sagemaker_steps.py index 095f9f8..bdc7a57 100644 --- a/tests/unit/test_sagemaker_steps.py +++ b/tests/unit/test_sagemaker_steps.py @@ -22,9 +22,11 @@ from sagemaker.pipeline import PipelineModel from sagemaker.model_monitor import DataCaptureConfig from sagemaker.debugger import Rule, rule_configs, DebuggerHookConfig, CollectionConfig +from sagemaker.sklearn.processing import SKLearnProcessor +from sagemaker.processing import ProcessingInput, ProcessingOutput from unittest.mock import MagicMock, patch -from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep +from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, ProcessingStep from stepfunctions.steps.sagemaker import tuning_config from tests.unit.utils import mock_boto_api_call @@ -156,6 +158,22 @@ def tensorflow_estimator(): return estimator +@pytest.fixture +def sklearn_processor(): + sagemaker_session = MagicMock() + sagemaker_session.boto_region_name = 'us-east-1' + sagemaker_session._default_bucket = 'sagemaker' + + processor = SKLearnProcessor( + framework_version="0.20.0", + role=EXECUTION_ROLE, + instance_type="ml.m5.xlarge", + instance_count=1, + sagemaker_session=sagemaker_session + ) + + return processor + @patch('botocore.client.BaseClient._make_api_call', new=mock_boto_api_call) def test_training_step_creation(pca_estimator): step = TrainingStep('Training', @@ -566,3 +584,72 @@ def test_endpoint_step_creation(pca_model): 'Resource': 'arn:aws:states:::sagemaker:updateEndpoint', 'End': True } 
+ +def test_processing_step_creation(sklearn_processor): + inputs = [ProcessingInput(source='dataset.csv', destination='/opt/ml/processing/input')] + outputs = [ + ProcessingOutput(source='/opt/ml/processing/output/train'), + ProcessingOutput(source='/opt/ml/processing/output/validation'), + ProcessingOutput(source='/opt/ml/processing/output/test') + ] + step = ProcessingStep('Feature Transformation', sklearn_processor, 'MyProcessingJob', inputs=inputs, outputs=outputs) + assert step.to_dict() == { + 'Type': 'Task', + 'Parameters': { + 'AppSpecification': { + 'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3' + }, + 'ProcessingInputs': [ + { + 'InputName': None, + 'S3Input': { + 'LocalPath': '/opt/ml/processing/input', + 'S3CompressionType': 'None', + 'S3DataDistributionType': 'FullyReplicated', + 'S3DataType': 'S3Prefix', + 'S3InputMode': 'File', + 'S3Uri': 'dataset.csv' + } + } + ], + 'ProcessingOutputConfig': { + 'Outputs': [ + { + 'OutputName': None, + 'S3Output': { + 'LocalPath': '/opt/ml/processing/output/train', + 'S3UploadMode': 'EndOfJob', + 'S3Uri': None + } + }, + { + 'OutputName': None, + 'S3Output': { + 'LocalPath': '/opt/ml/processing/output/validation', + 'S3UploadMode': 'EndOfJob', + 'S3Uri': None + } + }, + { + 'OutputName': None, + 'S3Output': { + 'LocalPath': '/opt/ml/processing/output/test', + 'S3UploadMode': 'EndOfJob', + 'S3Uri': None + } + } + ] + }, + 'ProcessingResources': { + 'ClusterConfig': { + 'InstanceCount': 1, + 'InstanceType': 'ml.m5.xlarge', + 'VolumeSizeInGB': 30 + } + }, + 'ProcessingJobName': 'MyProcessingJob', + 'RoleArn': EXECUTION_ROLE + }, + 'Resource': 'arn:aws:states:::sagemaker:createProcessingJob.sync', + 'End': True + }
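For reviewers, a minimal usage sketch of the new ProcessingStep, modeled on the integration test in this change set. The S3 URIs, IAM role ARNs, workflow name, and job name below are illustrative placeholders, not values taken from this repository.

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

from stepfunctions.steps import Chain, ProcessingStep
from stepfunctions.workflow import Workflow

# Processor configured like the sklearn_processor_fixture above (role ARN is a placeholder).
processor = SKLearnProcessor(
    framework_version="0.20.0",
    role="arn:aws:iam::123456789012:role/SageMakerExecutionRole",
    instance_type="ml.m5.xlarge",
    instance_count=1,
)

# Task state that submits the processing job and, by default, waits for completion
# (resource arn:aws:states:::sagemaker:createProcessingJob.sync).
processing_step = ProcessingStep(
    "preprocess-data",
    processor=processor,
    job_name="my-processing-job",  # placeholder; an ExecutionInput placeholder also works
    inputs=[ProcessingInput(source="s3://my-bucket/raw", destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(source="/opt/ml/processing/train", destination="s3://my-bucket/train")],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocessor.py"],
)

# Wire the step into a Step Functions workflow (role ARN is a placeholder).
workflow = Workflow(
    name="processing-workflow",
    definition=Chain([processing_step]),
    role="arn:aws:iam::123456789012:role/StepFunctionsWorkflowExecutionRole",
)
workflow.create()
workflow.execute()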