Shared memory data transfer between Functions Host and Python worker #816


Merged
merged 81 commits into dev from gochaudh/shared_mem_data_transfer on Mar 24, 2021

Commits (81)
c83c7fe
Basic structure laid out for reading parameters from shared memory be…
Oct 14, 2020
c96eba9
Writing output from worker to Shared Memory
Oct 15, 2020
65107aa
Put output from worker into Shared Memory
Oct 15, 2020
d071bd8
Free shared memory resources after use
Oct 16, 2020
81c565c
Removed control flag from mmap header
Oct 19, 2020
d4ab3b0
Proto change
Oct 20, 2020
7de45f3
Working for blob shared memory data transfer; needs clean up, comment…
Oct 22, 2020
0f6ccde
Changing message for closing mmaps
Oct 22, 2020
c736890
Support for string datatype for shared memory data transfer
Oct 26, 2020
2dba4e7
Change to oneof
Nov 11, 2020
925cf55
Use oneof in .proto
Nov 11, 2020
997a1af
Refactoring mmap_handler
Dec 15, 2020
10648c0
Refactoring, cleaning up and adding docstrings
Dec 16, 2020
3ac5ef2
Updating CloseSharedMemoryResourcesResponse usage
Dec 16, 2020
6f2160f
Fixing accidental changes to tests/*
Dec 16, 2020
9398165
Addressing comments
Dec 17, 2020
ded4998
Basic structure laid out for reading parameters from shared memory be…
Oct 14, 2020
38c5502
Writing output from worker to Shared Memory
Oct 15, 2020
9a15243
Put output from worker into Shared Memory
Oct 15, 2020
fa0931d
Free shared memory resources after use
Oct 16, 2020
b469f2a
Removed control flag from mmap header
Oct 19, 2020
be814e2
Proto change
Oct 20, 2020
a1d5d98
Working for blob shared memory data transfer; needs clean up, comment…
Oct 22, 2020
af9faeb
Changing message for closing mmaps
Oct 22, 2020
ea3fbae
Support for string datatype for shared memory data transfer
Oct 26, 2020
eb33701
Change to oneof
Nov 11, 2020
e0fd812
Refactoring mmap_handler
Dec 15, 2020
beb1d23
Refactoring, cleaning up and adding docstrings
Dec 16, 2020
b5a7e20
Updating CloseSharedMemoryResourcesResponse usage
Dec 16, 2020
b0e645e
Fixing accidental changes to tests/*
Dec 16, 2020
a04982d
Addressing comments
Dec 17, 2020
84f08a2
Cleaning up, addressing comments
Feb 18, 2021
72b4446
Following same class structure as the shared memory changes made for …
Feb 18, 2021
afa115b
Moving shared memory data transfer related changes into separate dire…
Feb 19, 2021
26e2919
Input error checks for shared memory map ctor
Feb 19, 2021
6001f6c
Rebase fix
Feb 19, 2021
289255c
Trying to make lint happy
Feb 19, 2021
dca1c2c
Making flake8 happy
Feb 19, 2021
320eead
Merge branch 'dev' into gochaudh/shared_mem_data_transfer
Feb 19, 2021
ad4def0
Fixing more lint issues
Feb 19, 2021
4be1170
Removing the use of __annotations__ to be compatible with Python 3.6,…
Feb 19, 2021
bc10c1d
Added tests for FileAccessor, SharedMemoryMap, FileAccessorFactory
Feb 19, 2021
b07cf6a
Adding shared memory test setup for Unix
Feb 19, 2021
292b980
Adding tests for SharedMemoryManager
Feb 19, 2021
cfa943e
Adding tests to ensure the dispatcher can invoke the function and sen…
Feb 20, 2021
2825e11
More tests for worker/dispatcher's use of shared memory
Feb 23, 2021
62deedc
Using truncate instead of writing 0x00 byte manually upon creating mm…
Feb 23, 2021
66d53e1
Addressing comments
Feb 23, 2021
c4665db
Adding missing tests and doc strings for tests and their classes
Feb 23, 2021
dc84cf5
assertEqual -> assertTrue in test_dispose_without_delete_file
Feb 23, 2021
47cffb9
Tried with blob trigger function - removed TODO
Feb 23, 2021
52a8701
Addressing comments
Mar 3, 2021
db950a9
Merge branch 'dev' into gochaudh/shared_mem_data_transfer
Mar 3, 2021
848de3d
Minor const fix
Mar 4, 2021
cd75b85
Addressing comments
Mar 8, 2021
691a774
Fixed test - removed unused test functions
Mar 8, 2021
a399f2e
Addressing comments; caching list of valid directories for later use …
Mar 10, 2021
914bca4
Merge branch 'dev' into gochaudh/shared_mem_data_transfer
Mar 10, 2021
33aab5e
Whitespace fix
Mar 10, 2021
3c0c8a9
Cleanup
Mar 10, 2021
9467170
Adding AppSetting to override shared memory directory for Unix - test…
Mar 13, 2021
0fbca31
Logging subprocess result in Exception
Mar 13, 2021
050399d
Changed Exception -> IOError
Mar 13, 2021
dd425c5
Check shared memory directory AppSetting only for Darwin tests
Mar 13, 2021
3c75b31
Fix test cleanup for Darwin
Mar 13, 2021
4563697
Only split AppSetting list of directories if the AppSetting was found
Mar 13, 2021
da436e3
Fix consts.UNIX_TEMP_DIRS
Mar 13, 2021
ced9275
Typo fix
Mar 13, 2021
36797fa
Adding throughput tests for blob input/output using shared memory
Mar 15, 2021
598acf4
Changing branch hardcode to instead use current branch
Mar 15, 2021
4c38ae0
Creating tests URL from current branch name
Mar 15, 2021
9bffdde
Whitespace fixes
Mar 15, 2021
a43b3d3
Using tests from local dir
Mar 15, 2021
87ee40e
Removing the need to use a URL for tests; just use from local directory
Mar 15, 2021
04dba67
Rename env variable to follow convention from host of using FUNCTIONS_
Mar 15, 2021
f7df639
Addressing comments
Mar 19, 2021
00b36ed
Fixes for running tests on macOS
Mar 19, 2021
af03f2e
Merge branch 'dev' into gochaudh/shared_mem_data_transfer
Mar 19, 2021
ab7d488
Log which allowed directories for shared memory are being used
Mar 19, 2021
0e56751
Change assert -> raise
Mar 19, 2021
f60ae73
Merge branch 'dev' into gochaudh/shared_mem_data_transfer
Hazhzeng Mar 23, 2021
5 changes: 3 additions & 2 deletions .ci/perf_tests/dockerfiles/perf_tests.Dockerfile
@@ -1,6 +1,6 @@
ARG PYTHON_VERSION=3.8

-FROM mcr.microsoft.com/azure-functions/python:3.0.14492-python$PYTHON_VERSION
FROM mcr.microsoft.com/azure-functions/python:3.0.15418-python$PYTHON_VERSION

# Mounting local machines azure-functions-python-worker and azure-functions-python-library onto it
RUN rm -rf /azure-functions-host/workers/python/${PYTHON_VERSION}/LINUX/X64/azure_functions_worker
@@ -11,7 +11,8 @@ VOLUME ["/azure-functions-host/workers/python/${PYTHON_VERSION}/LINUX/X64/azure_
ENV AzureWebJobsScriptRoot=/home/site/wwwroot \
    AzureFunctionsJobHost__Logging__Console__IsEnabled=true \
    FUNCTIONS_WORKER_PROCESS_COUNT=1 \
-   AZURE_FUNCTIONS_ENVIRONMENT=Development
    AZURE_FUNCTIONS_ENVIRONMENT=Development \
    FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED=1

RUN apt-get --quiet update && \
    apt-get install --quiet -y git && \

@@ -0,0 +1,65 @@
import { check } from "k6";
import { Rate } from "k6/metrics";
import http from "k6/http";

var HOSTNAME = __ENV.HOSTNAME || 'localhost';
var PORT = __ENV.PORT || '80';
var PROTOCOL = __ENV.PROTOCOL || (PORT === '80' ? 'http' : 'https');
var INPUT_FILENAME = 'Input_256MB';
var CONTENT_SIZE = 1024 * 1024 * 256; // 256 MB

// A custom metric to track failure rates
var failureRate = new Rate("check_failure_rate");

// Options
export let options = {
    stages: [
        // Linearly ramp up from 1 to 20 VUs during first minute
        { target: 20, duration: "1m" },
        // Hold at 20 VUs for the next 3 minutes and 45 seconds
        { target: 20, duration: "3m45s" },
        // Linearly ramp down from 20 to 0 VUs over the last 15 seconds
        { target: 0, duration: "15s" }
        // Total execution time will be ~5 minutes
    ],
    thresholds: {
        // We want the 95th percentile of all HTTP request durations to be less than 40s
        "http_req_duration": ["p(95)<40000"],
        // Thresholds based on the custom metric we defined and use to track application failures
        "check_failure_rate": [
            // Global failure rate should be less than 1%
            "rate<0.01",
            // Abort the test early if it climbs over 5%
            { threshold: "rate<=0.05", abortOnFail: true },
        ],
    },
};

// Setup function
// This will create a blob which will later be used as an input binding
export function setup() {
    let no_random_input = true;
    let url = `${PROTOCOL}://${HOSTNAME}:${PORT}/api/SyncPutBlobAsBytesReturnHttpResponse?content_size=${CONTENT_SIZE}&no_random_input=${no_random_input}&outfile=${INPUT_FILENAME}`;
    let response = http.get(url);

    // check() returns false if any of the specified conditions fail
    let checkRes = check(response, {
        "status is 200": (r) => r.status === 200,
        "content_size matches": (r) => r.json().content_size === CONTENT_SIZE,
    });
}

// Main function
export default function () {
    let url = `${PROTOCOL}://${HOSTNAME}:${PORT}/api/SyncGetBlobAsBytesReturnHttpResponse?infile=${INPUT_FILENAME}`;
    let response = http.get(url);

    // check() returns false if any of the specified conditions fail
    let checkRes = check(response, {
        "status is 200": (r) => r.status === 200,
        "content_size matches": (r) => r.json().content_size === CONTENT_SIZE,
    });

    // We reverse the check() result since we want to count the failures
    failureRate.add(!checkRes);
}
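
For context, here is a minimal sketch of the kind of function this script drives. Only the route name and the content_size response field come from the script above; the binding names, signature, and body are assumptions for illustration.

import json

import azure.functions as func


# Hypothetical shape of SyncGetBlobAsBytesReturnHttpResponse: an HTTP trigger
# with a blob input binding. With shared memory transfer enabled, the blob
# bytes reach the worker through a memory map rather than the gRPC channel.
def main(req: func.HttpRequest, file: func.InputStream) -> func.HttpResponse:
    content = file.read()  # bytes supplied by the blob input binding
    return func.HttpResponse(
        body=json.dumps({'content_size': len(content)}),
        mimetype='application/json')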

@@ -0,0 +1,53 @@
import { check } from "k6";
import { Rate } from "k6/metrics";
import http from "k6/http";
import { randomIntBetween } from "https://jslib.k6.io/k6-utils/1.0.0/index.js";

var HOSTNAME = __ENV.HOSTNAME || 'localhost';
var PORT = __ENV.PORT || '80';
var PROTOCOL = __ENV.PROTOCOL || (PORT === '80' ? 'http' : 'https');

// A custom metric to track failure rates
var failureRate = new Rate("check_failure_rate");

// Options
export let options = {
    stages: [
        // Linearly ramp up from 1 to 50 VUs during first minute
        { target: 50, duration: "1m" },
        // Hold at 50 VUs for the next 3 minutes and 45 seconds
        { target: 50, duration: "3m45s" },
        // Linearly ramp down from 50 to 0 VUs over the last 15 seconds
        { target: 0, duration: "15s" }
        // Total execution time will be ~5 minutes
    ],
    thresholds: {
        // We want the 95th percentile of all HTTP request durations to be less than 40s
        "http_req_duration": ["p(95)<40000"],
        // Thresholds based on the custom metric we defined and use to track application failures
        "check_failure_rate": [
            // Global failure rate should be less than 1%
            "rate<0.01",
            // Abort the test early if it climbs over 5%
            { threshold: "rate<=0.05", abortOnFail: true },
        ],
    },
};

// Main function
export default function () {
    let content_size = 1024 * 1024 * 256; // 256 MB
    let no_random_input = true;
    let outfile = randomIntBetween(1, 500000);
    let url = `${PROTOCOL}://${HOSTNAME}:${PORT}/api/SyncPutBlobAsBytesReturnHttpResponse?content_size=${content_size}&no_random_input=${no_random_input}&outfile=${outfile}`;
    let response = http.get(url);

    // check() returns false if any of the specified conditions fail
    let checkRes = check(response, {
        "status is 200": (r) => r.status === 200,
        "content_size matches": (r) => r.json().content_size === content_size,
    });

    // We reverse the check() result since we want to count the failures
    failureRate.add(!checkRes);
}
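
The write-side counterpart would look roughly like the sketch below. Only the query parameters and the content_size response field come from the script above; the output binding and body are assumptions for illustration.

import json
import os

import azure.functions as func


# Hypothetical shape of SyncPutBlobAsBytesReturnHttpResponse: an HTTP trigger
# with a blob output binding. Setting the Out[bytes] value hands the payload
# to the worker, which may send it to the host over shared memory.
def main(req: func.HttpRequest, file: func.Out[bytes]) -> func.HttpResponse:
    content_size = int(req.params['content_size'])
    if req.params.get('no_random_input'):
        content = b'\x00' * content_size  # deterministic payload
    else:
        content = os.urandom(content_size)  # random payload
    file.set(content)
    return func.HttpResponse(
        body=json.dumps({'content_size': len(content)}),
        mimetype='application/json')
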
8 changes: 4 additions & 4 deletions .github/workflows/perf-testing-setup.yml
@@ -6,7 +6,7 @@ on:
    branches: [ dev ]

env:
-  PERF_TESTS_LINK: "https://raw.githubusercontent.com/Azure/azure-functions-python-worker/dev/.ci/perf_tests/k6scripts/"
  TESTS_DIR_PATH: ".ci/perf_tests/k6scripts/"
  PYTHON_VERSION: "3.8"
  PORT: 8000

@@ -16,7 +16,7 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
-       test_to_run: [ SyncHttpTriggerHelloWorld, SyncHttpTriggerWithSyncRequests, AsyncHttpTriggerWithAsyncRequest, SyncHttpTriggerCPUIntensive ]
        test_to_run: [ SyncHttpTriggerHelloWorld, SyncHttpTriggerWithSyncRequests, AsyncHttpTriggerWithAsyncRequest, SyncHttpTriggerCPUIntensive, SyncPutBlobAsBytesReturnHttpResponse, SyncGetBlobAsBytesReturnHttpResponse ]
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python ${{ env.PYTHON_VERSION }}
@@ -44,12 +44,12 @@ jobs:
      - name: Build and Run the Docker image
        run: |
          docker build --build-arg PYTHON_VERSION=${{ env.PYTHON_VERSION }} --file .ci/perf_tests/dockerfiles/perf_tests.Dockerfile --tag perfimage:latest .
-         docker run -d --env FUNCTIONS_WORKER_RUNTIME_VERSION=${{ env.PYTHON_VERSION }} -p ${PORT}:80 -v $GITHUB_WORKSPACE/azure_functions_worker:/azure-functions-host/workers/python/${{ env.PYTHON_VERSION }}/LINUX/X64/azure_functions_worker perfimage:latest
          docker run -d --shm-size="2g" --env FUNCTIONS_WORKER_RUNTIME_VERSION=${{ env.PYTHON_VERSION }} -p ${PORT}:80 -v $GITHUB_WORKSPACE/azure_functions_worker:/azure-functions-host/workers/python/${{ env.PYTHON_VERSION }}/LINUX/X64/azure_functions_worker perfimage:latest
          sleep 10 # host needs some time to start.
      - name: Validate if the functions are now running
        run: |
          curl --get http://localhost:${PORT}/api/${{ matrix.test_to_run }}
      - name: Run Throughput tests
        run: |
          chmod 755 .ci/perf_tests/run-perftests.sh
-         .ci/perf_tests/run-perftests.sh localhost $PORT $PERF_TESTS_LINK ${{ matrix.test_to_run }}
          .ci/perf_tests/run-perftests.sh localhost $PORT ${{ env.TESTS_DIR_PATH }} ${{ matrix.test_to_run }}
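
Two details in this workflow change are easy to miss. The new matrix entries wire up the blob throughput scripts shown above, and the docker run step adds --shm-size="2g" because Docker caps a container's /dev/shm at 64 MB by default, far too small for the 256 MB payloads these tests move through memory maps. The worker side of the feature is switched on by the FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED setting from the Dockerfile earlier in this diff; a minimal sketch of how such a flag check could look in Python (the helper name is an assumption, not the worker's actual API):

import os

# Illustrative helper, not the worker's actual API: reads the AppSetting that
# perf_tests.Dockerfile sets to opt the worker into shared memory transfer.
def is_shared_memory_transfer_enabled() -> bool:
    flag = os.environ.get(
        'FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED', '')
    return flag.lower() in ('1', 'true')
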
4 changes: 3 additions & 1 deletion azure_functions_worker/bindings/__init__.py
@@ -6,7 +6,8 @@
from .meta import check_output_type_annotation
from .meta import has_implicit_output
from .meta import is_trigger_binding
-from .meta import from_incoming_proto, to_outgoing_proto
from .meta import from_incoming_proto, to_outgoing_proto, \
    to_outgoing_param_binding
from .out import Out


@@ -16,4 +17,5 @@
    'check_input_type_annotation', 'check_output_type_annotation',
    'has_implicit_output',
    'from_incoming_proto', 'to_outgoing_proto', 'TraceContext',
    'to_outgoing_param_binding'
)
78 changes: 77 additions & 1 deletion azure_functions_worker/bindings/datumdef.py
@@ -1,9 +1,10 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

-from typing import Any
from typing import Any, Optional
import json
from .. import protos
from ..logging import logger


class Datum:
@@ -92,6 +93,81 @@ def from_typed_data(cls, td: protos.TypedData):

        return cls(val, tt)

    @classmethod
    def from_rpc_shared_memory(
            cls,
            shmem: protos.RpcSharedMemory,
            shmem_mgr) -> Optional['Datum']:
        """
        Reads the specified shared memory region and converts the read data
        into a datum object of the corresponding type.
        """
        if shmem is None:
            logger.warning('Cannot read from shared memory. '
                           'RpcSharedMemory is None.')
            return None

        mem_map_name = shmem.name
        offset = shmem.offset
        count = shmem.count
        data_type = shmem.type
        ret_val = None

        if data_type == protos.RpcDataType.bytes:
            val = shmem_mgr.get_bytes(mem_map_name, offset, count)
            if val is not None:
                ret_val = cls(val, 'bytes')
        elif data_type == protos.RpcDataType.string:
            val = shmem_mgr.get_string(mem_map_name, offset, count)
            if val is not None:
                ret_val = cls(val, 'string')

        if ret_val is not None:
            logger.info(
                f'Read {count} bytes from memory map {mem_map_name} '
                f'for data type {data_type}')
            return ret_val
        return None

    @classmethod
    def to_rpc_shared_memory(
            cls,
            datum: 'Datum',
            shmem_mgr) -> Optional[protos.RpcSharedMemory]:
        """
        Writes the given value to shared memory and returns the corresponding
        RpcSharedMemory object which can be sent back to the functions host
        over RPC.
        """
        if datum.type == 'bytes':
            value = datum.value
            shared_mem_meta = shmem_mgr.put_bytes(value)
            data_type = protos.RpcDataType.bytes
        elif datum.type == 'string':
            value = datum.value
            shared_mem_meta = shmem_mgr.put_string(value)
            data_type = protos.RpcDataType.string
        else:
            raise NotImplementedError(
                f'Unsupported datum type ({datum.type}) for shared memory'
            )

        if shared_mem_meta is None:
            logger.warning('Cannot write to shared memory for type: '
                           f'{datum.type}')
            return None

        shmem = protos.RpcSharedMemory(
            name=shared_mem_meta.mem_map_name,
            offset=0,
            count=shared_mem_meta.count_bytes,
            type=data_type)

        logger.info(
            f'Wrote {shared_mem_meta.count_bytes} bytes to memory map '
            f'{shared_mem_meta.mem_map_name} for data type {data_type}')
        return shmem


def datum_as_proto(datum: Datum) -> protos.TypedData:
    if datum.type == 'string':
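
Taken together, the two classmethods above give a symmetric round-trip for bytes and string payloads through a memory map. A minimal sketch of the expected behavior, assuming an initialized shared memory manager shmem_mgr exposing the put_bytes/get_bytes API referenced in this file:

# Sketch only: shmem_mgr is assumed to be a ready shared memory manager
# instance; Datum is the class defined above.
datum = Datum(b'some large payload', 'bytes')

# Write the value into a memory map, described by an RpcSharedMemory message.
shmem = Datum.to_rpc_shared_memory(datum, shmem_mgr)
if shmem is not None:
    # Read the same region back and verify the payload survived the trip.
    restored = Datum.from_rpc_shared_memory(shmem, shmem_mgr)
    assert restored is not None and restored.value == datum.value
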
64 changes: 55 additions & 9 deletions azure_functions_worker/bindings/meta.py
@@ -8,6 +8,10 @@
from . import datumdef
from . import generic

PB_TYPE = 'rpc_data'
PB_TYPE_DATA = 'data'
PB_TYPE_RPC_SHARED_MEMORY = 'rpc_shared_memory'


def get_binding_registry():
    func = sys.modules.get('azure.functions')
@@ -55,13 +59,11 @@ def has_implicit_output(bind_name: str) -> bool:

def from_incoming_proto(
        binding: str,
-       val: protos.TypedData, *,
        pb: protos.ParameterBinding, *,
        pytype: typing.Optional[type],
-       trigger_metadata: typing.Optional[typing.Dict[str, protos.TypedData]])\
-       -> typing.Any:
        trigger_metadata: typing.Optional[typing.Dict[str, protos.TypedData]],
        shmem_mgr) -> typing.Any:
    binding = get_binding(binding)
-   datum = datumdef.Datum.from_typed_data(val)
    if trigger_metadata:
        metadata = {
            k: datumdef.Datum.from_typed_data(v)
@@ -70,22 +72,34 @@ def from_incoming_proto(
    else:
        metadata = {}

    pb_type = pb.WhichOneof(PB_TYPE)
    if pb_type == PB_TYPE_DATA:
        val = pb.data
        datum = datumdef.Datum.from_typed_data(val)
    elif pb_type == PB_TYPE_RPC_SHARED_MEMORY:
        # Data was sent over shared memory, attempt to read
        datum = datumdef.Datum.from_rpc_shared_memory(pb.rpc_shared_memory,
                                                      shmem_mgr)
    else:
        raise TypeError(f'Unknown ParameterBindingType: {pb_type}')

    try:
        return binding.decode(datum, trigger_metadata=metadata)
    except NotImplementedError:
        # Binding does not support the data.
        dt = val.WhichOneof('data')

        raise TypeError(
            f'unable to decode incoming TypedData: '
            f'unsupported combination of TypedData field {dt!r} '
            f'and expected binding type {binding}')


-def to_outgoing_proto(binding: str, obj: typing.Any, *,
-                      pytype: typing.Optional[type]) -> protos.TypedData:
def get_datum(binding: str, obj: typing.Any,
              pytype: typing.Optional[type]) -> datumdef.Datum:
    """
    Convert an object to a datum with the specified type.
    """
    binding = get_binding(binding)

    try:
        datum = binding.encode(obj, expected_type=pytype)
    except NotImplementedError:
@@ -94,5 +108,37 @@ def to_outgoing_proto(binding: str, obj: typing.Any, *,
            f'unable to encode outgoing TypedData: '
            f'unsupported type "{binding}" for '
            f'Python type "{type(obj).__name__}"')
    return datum


def to_outgoing_proto(binding: str, obj: typing.Any, *,
                      pytype: typing.Optional[type]) -> protos.TypedData:
    datum = get_datum(binding, obj, pytype)
    return datumdef.datum_as_proto(datum)


def to_outgoing_param_binding(binding: str, obj: typing.Any, *,
                              pytype: typing.Optional[type],
                              out_name: str,
                              shmem_mgr) \
        -> protos.ParameterBinding:
    datum = get_datum(binding, obj, pytype)
    shared_mem_value = None
    # If shared memory is enabled and supported for the given datum, try to
    # transfer to host over shared memory as a default
    if shmem_mgr.is_enabled() and shmem_mgr.is_supported(datum):
        shared_mem_value = datumdef.Datum.to_rpc_shared_memory(datum, shmem_mgr)
    # Check if data was written into shared memory
    if shared_mem_value is not None:
        # If it was, then use the rpc_shared_memory field in response message
        return protos.ParameterBinding(
            name=out_name,
            rpc_shared_memory=shared_mem_value)
    else:
        # If not, send it as part of the response message over RPC
        rpc_val = datumdef.datum_as_proto(datum)
        if rpc_val is None:
            raise TypeError('Cannot convert datum to rpc_val')
        return protos.ParameterBinding(
            name=out_name,
            data=rpc_val)
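
A hypothetical call site for the new helper, with 'blob', output_bytes, and '$return' as illustrative placeholders: the oneof in the returned ParameterBinding tells the host whether the payload travelled over shared memory or as in-band RPC data.

# Hypothetical dispatcher-side usage; falls back to in-band RPC data
# automatically when shared memory is disabled or the type is unsupported.
param_binding = to_outgoing_param_binding(
    'blob', output_bytes,
    pytype=bytes,
    out_name='$return',
    shmem_mgr=shmem_mgr)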