From 27d5792106e8e29f01d356662917ca8dd66c6448 Mon Sep 17 00:00:00 2001 From: Shiv Lakshminarayan Date: Thu, 17 Jun 2021 09:53:21 -0700 Subject: [PATCH 1/4] chore: refresh contributing guide to reference the main branch which is the default branch now --- CONTRIBUTING.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b014669..5fb3e01 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -42,7 +42,7 @@ Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that: -* You are working against the latest source on the *master* branch. +* You are working against the latest source on the *main* branch. * You check the existing open and recently merged pull requests to make sure someone else hasn't already addressed the problem. * You open an issue to discuss any significant work - we would hate for your time to be wasted. @@ -146,6 +146,6 @@ If you discover a potential security issue in this project we ask that you notif ## Licensing -See the [LICENSE](https://github.com/aws/aws-step-functions-data-science-sdk-python/blob/master/LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution. +See the [LICENSE](https://github.com/aws/aws-step-functions-data-science-sdk-python/blob/main/LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution. We may ask you to sign a [Contributor License Agreement (CLA)](http://en.wikipedia.org/wiki/Contributor_License_Agreement) for larger changes. From 07f0cb255483ddaad3ba08d7904e5f7826b75134 Mon Sep 17 00:00:00 2001 From: Shiv Lakshminarayan Date: Thu, 17 Jun 2021 11:10:32 -0700 Subject: [PATCH 2/4] feat: add support for EventBridge service integration Added EventBridgePutEvents step, which can be leveraged to create a task for sending a custom event to an event bus. Closes #141 --- doc/services.rst | 26 +++++++----- src/stepfunctions/steps/__init__.py | 4 +- src/stepfunctions/steps/service.py | 48 ++++++++++++++++++++- tests/unit/test_service_steps.py | 65 +++++++++++++++++++++++++++++ 4 files changed, 130 insertions(+), 13 deletions(-) diff --git a/doc/services.rst b/doc/services.rst index 1ac3738..405ccf4 100644 --- a/doc/services.rst +++ b/doc/services.rst @@ -8,12 +8,14 @@ This module provides classes to build steps that integrate with Amazon DynamoDB, - `Amazon DynamoDB <#amazon-dynamodb>`__ +- `Amazon EMR <#amazon-emr>`__ + +- `Amazon EventBridge <#amazon-eventbridge>`__ + - `Amazon SNS <#amazon-sns>`__ - `Amazon SQS <#amazon-sqs>`__ -- `Amazon EMR <#amazon-emr>`__ - Amazon DynamoDB ---------------- @@ -25,14 +27,6 @@ Amazon DynamoDB .. autoclass:: stepfunctions.steps.service.DynamoDBUpdateItemStep -Amazon SNS ------------ -.. autoclass:: stepfunctions.steps.service.SnsPublishStep - -Amazon SQS ------------ -.. autoclass:: stepfunctions.steps.service.SqsSendMessageStep - Amazon EMR ----------- .. autoclass:: stepfunctions.steps.service.EmrCreateClusterStep @@ -48,3 +42,15 @@ Amazon EMR .. autoclass:: stepfunctions.steps.service.EmrModifyInstanceFleetByNameStep .. autoclass:: stepfunctions.steps.service.EmrModifyInstanceGroupByNameStep + +Amazon EventBridge +----------- +.. autoclass:: stepfunctions.steps.service.EventBridgePutEventsStep + +Amazon SNS +----------- +.. autoclass:: stepfunctions.steps.service.SnsPublishStep + +Amazon SQS +----------- +.. autoclass:: stepfunctions.steps.service.SqsSendMessageStep diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index 92598f1..b27d411 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -19,6 +19,6 @@ from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep, TuningStep, ProcessingStep from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep -from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep - +from stepfunctions.steps.service import EventBridgePutEventsStep +from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 6bf155f..5c32a88 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -18,9 +18,11 @@ from stepfunctions.steps.integration_resources import IntegrationPattern, get_service_integration_arn DYNAMODB_SERVICE_NAME = "dynamodb" +ELASTICMAPREDUCE_SERVICE_NAME = "elasticmapreduce" +EVENTBRIDGE_SERVICE_NAME = "events" SNS_SERVICE_NAME = "sns" SQS_SERVICE_NAME = "sqs" -ELASTICMAPREDUCE_SERVICE_NAME = "elasticmapreduce" + class DynamoDBApi(Enum): @@ -48,6 +50,10 @@ class ElasticMapReduceApi(Enum): ModifyInstanceGroupByName = "modifyInstanceGroupByName" +class EventBridgeApi(Enum): + PutEvents = "putEvents" + + class DynamoDBGetItemStep(Task): """ Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions `_ for more details. @@ -77,6 +83,46 @@ def __init__(self, state_id, **kwargs): super(DynamoDBGetItemStep, self).__init__(state_id, **kwargs) +class EventBridgePutEventsStep(Task): + + """ + Creates a Task to send custom events to Amazon EventBridge. See`Call EventBridge with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_callback=False, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + + if wait_for_callback: + """ + Example resource arn: arn:aws:states:::events:putEvents.waitForTaskToken + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EVENTBRIDGE_SERVICE_NAME, + EventBridgeApi.PutEvents, + IntegrationPattern.WaitForTaskToken) + else: + """ + Example resource arn: arn:aws:states:::events:putEvents + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EVENTBRIDGE_SERVICE_NAME, + EventBridgeApi.PutEvents) + + super(EventBridgePutEventsStep, self).__init__(state_id, **kwargs) + + class DynamoDBPutItemStep(Task): """ diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 6576aaf..3d1640d 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -19,6 +19,7 @@ from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep +from stepfunctions.steps.service import EventBridgePutEventsStep @patch.object(boto3.session.Session, 'region_name', 'us-east-1') @@ -98,6 +99,70 @@ def test_sqs_send_message_step_creation(): 'End': True } +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eventbridge_put_events_step_creation(): + step = EventBridgePutEventsStep('Send to EventBridge', parameters={ + "Entries": [ + { + "Detail": { + "Message": "MyMessage" + }, + "DetailType": "MyDetailType", + "EventBusName": "MyEventBus", + "Source": "my.source" + } + ] + }) + + assert step.to_dict() == { + "Type": "Task", + "Resource": 'arn:aws:states:::events:putEvents', + "Parameters": { + "Entries": [ + { + "Detail": { + "Message": "MyMessage" + }, + "DetailType": "MyDetailType", + "EventBusName": "MyEventBus", + "Source": "my.source" + } + ] + }, + "End": True + } + + step = EventBridgePutEventsStep('Send to EventBridge', wait_for_callback=True, parameters={ + "Entries": [ + { + "Detail": { + "Message.$": "$.MyMessage" + }, + "DetailType": "MyDetailType", + "EventBusName": "MyEventBus", + "Source": "my.source" + } + ] + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::events:putEvents.waitForTaskToken', + 'Parameters': { + "Entries": [ + { + "Detail": { + "Message.$": "$.MyMessage" + }, + "DetailType": "MyDetailType", + "EventBusName": "MyEventBus", + "Source": "my.source" + } + ] + }, + 'End': True + } + @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_get_item_step_creation(): From f26056b0ace6c56af84abe7901e150d19968841c Mon Sep 17 00:00:00 2001 From: Shiv Lakshminarayan Date: Thu, 17 Jun 2021 11:13:06 -0700 Subject: [PATCH 3/4] Revert "chore: refresh contributing guide to reference the main branch which is the default branch now" This reverts commit 27d5792106e8e29f01d356662917ca8dd66c6448. --- CONTRIBUTING.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 5fb3e01..b014669 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -42,7 +42,7 @@ Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that: -* You are working against the latest source on the *main* branch. +* You are working against the latest source on the *master* branch. * You check the existing open and recently merged pull requests to make sure someone else hasn't already addressed the problem. * You open an issue to discuss any significant work - we would hate for your time to be wasted. @@ -146,6 +146,6 @@ If you discover a potential security issue in this project we ask that you notif ## Licensing -See the [LICENSE](https://github.com/aws/aws-step-functions-data-science-sdk-python/blob/main/LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution. +See the [LICENSE](https://github.com/aws/aws-step-functions-data-science-sdk-python/blob/master/LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution. We may ask you to sign a [Contributor License Agreement (CLA)](http://en.wikipedia.org/wiki/Contributor_License_Agreement) for larger changes. From 2d6b57b22f708706086d38b19f2ca73e90989ce0 Mon Sep 17 00:00:00 2001 From: Shiv Lakshminarayan Date: Thu, 17 Jun 2021 11:17:37 -0700 Subject: [PATCH 4/4] style changes --- tests/unit/test_service_steps.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 3d1640d..53039c7 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -146,9 +146,9 @@ def test_eventbridge_put_events_step_creation(): }) assert step.to_dict() == { - 'Type': 'Task', - 'Resource': 'arn:aws:states:::events:putEvents.waitForTaskToken', - 'Parameters': { + "Type": "Task", + "Resource": "arn:aws:states:::events:putEvents.waitForTaskToken", + "Parameters": { "Entries": [ { "Detail": { @@ -160,7 +160,7 @@ def test_eventbridge_put_events_step_creation(): } ] }, - 'End': True + "End": True }