SI: Implement put operations #67

Merged · 41 commits · Dec 30, 2022
28c3a59
Basic PUT operation. Currently this never executes because the server
Nov 15, 2022
1b245b1
Bump Spark CLI service protocol version being used.
Nov 15, 2022
1239def
Log when attempting a staging operation
Nov 15, 2022
b605cce
Fix failing unit tests since function signature for ExecuteResponse c…
Nov 16, 2022
3ed84d8
Add e2e test for put.
Nov 16, 2022
57b8a34
Bail on tests if staging_ingestion_user is not set
Nov 16, 2022
7812278
Black client.py
Nov 16, 2022
6b76439
Add unit test that sanity checks _handle_staging_operation is called
Nov 17, 2022
3df7c89
Fix imports so that this module can be run independently:
Nov 17, 2022
8f0a02e
Implement GET operation
Nov 17, 2022
55525cb
Refactor client.py into distinct methods for each ingestion command type
Nov 23, 2022
157ac3d
Update pypoetry so I can develop on Python 3.10
Nov 23, 2022
0739ccc
Applied PR feedback around explicit response codes.
Nov 23, 2022
d3a3651
Applying PR feedback
Nov 23, 2022
72f917e
PR feedback
Nov 23, 2022
fba64b7
Black client.py
Nov 23, 2022
c27a3d6
Refactor e2e test to use a single test for PUT, GET, and REMOVE
Nov 23, 2022
19ca706
Make REMOVE command work
Nov 23, 2022
0167bd9
These methods don't need to know the `operation`
Nov 23, 2022
85e4d7c
Remove single quote that broke query
Nov 23, 2022
713002d
Remove unneeded argument
Nov 23, 2022
fc06ef8
Expect operation to succeed
Nov 23, 2022
cafa17d
Black PySQLStagingIngestionTestSuite only
Nov 23, 2022
a508a1c
Tidy up comments in e2e test
Nov 23, 2022
ce80df0
Basic e2e test scaffolded in. Currently fails.
Nov 23, 2022
36885a4
Only allow ingestion commands when base_uploads_path is specified
Nov 23, 2022
c0c09d4
Restrict local file operations to descendants of uploads_base_path
Nov 23, 2022
f612795
Remove per PR feedback
Dec 20, 2022
e609ef3
Add check for null local_file per PR feedback
Dec 20, 2022
cdbe2d6
Open output stream _after_ successful HTTP request
Dec 20, 2022
34a0362
Resolve relative paths before comparing row.localFile to uploads_base…
Dec 20, 2022
c8a64c7
Add test that PUT fails if file exists in staging location and OVERWR…
Dec 20, 2022
d48d3f3
Add tests: operations fail to modify another user's staging location
Dec 20, 2022
e0037e0
Add test that ingestion command fails if local file is blank
Dec 20, 2022
3fa5d84
Add test that invalid staging path will fail at server
Dec 20, 2022
4824b68
Basic usage example (needs tweaking)
Dec 22, 2022
469f35f
Add samples of GET and REMOVE
Dec 22, 2022
bdb948a
Refactor to allow uploads_base_path to be either a single string object
Dec 28, 2022
0261b7a
Refactor uploads_base_path to staging_allowed_local_path
Dec 29, 2022
00d8a49
Fix mypy static type failures
Dec 30, 2022
7a602e6
Black src files
Dec 30, 2022
7 changes: 7 additions & 0 deletions CONTRIBUTING.md
@@ -112,6 +112,7 @@ export access_token=""
There are several e2e test suites available:
- `PySQLCoreTestSuite`
- `PySQLLargeQueriesSuite`
- `PySQLStagingIngestionTestSuite`
- `PySQLRetryTestSuite.HTTP503Suite` **[not documented]**
- `PySQLRetryTestSuite.HTTP429Suite` **[not documented]**
- `PySQLUnityCatalogTestSuite` **[not documented]**
@@ -122,6 +123,12 @@ To execute the core test suite:
poetry run python -m pytest tests/e2e/driver_tests.py::PySQLCoreTestSuite
```

The `PySQLCoreTestSuite` namespace contains tests for all of the connector's basic features and behaviours. This is the default namespace where tests should be written unless they require specially configured clusters or take an especially long time to execute by design.

The `PySQLLargeQueriesSuite` namespace contains long-running query tests and is kept separate. In general, if the `PySQLCoreTestSuite` passes then these tests will pass as well.

The `PySQLStagingIngestionTestSuite` namespace requires a cluster running DBR version > 12.x which supports staging ingestion commands.

The suites marked `[not documented]` require additional configuration which will be documented at a later time.
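As a sketch, running the staging ingestion suite could look like the following. Only `access_token` appears in the CONTRIBUTING.md excerpt above and `staging_ingestion_user` comes from the commit messages in this PR; the other variable names and all values here are placeholders for your own environment:

```shell
# Placeholder values -- substitute your own workspace details.
export host="your-workspace.cloud.databricks.com"
export http_path="/sql/1.0/endpoints/1234567890abcdef"
export access_token="dapi..."
# Account email the staging ingestion tests upload as; the suite bails if unset.
export staging_ingestion_user="[email protected]"

poetry run python -m pytest tests/e2e/driver_tests.py::PySQLStagingIngestionTestSuite
```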
### Code formatting

3 changes: 2 additions & 1 deletion examples/README.md
@@ -35,4 +35,5 @@ To run all of these examples you can clone the entire repository to your disk. O
- **`interactive_oauth.py`** shows the simplest example of authenticating by OAuth (no need for a PAT generated in the DBSQL UI) while Bring Your Own IDP is in public preview. When you run the script it will open a browser window so you can authenticate. Afterward, the script fetches some sample data from Databricks and prints it to the screen. For this script, the OAuth token is not persisted which means you need to authenticate every time you run the script.
- **`persistent_oauth.py`** shows a more advanced example of authenticating by OAuth while Bring Your Own IDP is in public preview. In this case, it shows how to use a subclass of `OAuthPersistence` to reuse an OAuth token across script executions.
- **`set_user_agent.py`** shows how to customize the user agent header used for Thrift commands. In
this example the string `ExamplePartnerTag` will be added to the user agent on every request.
- **`staging_ingestion.py`** shows how the connector handles Databricks' experimental staging ingestion commands `GET`, `PUT`, and `REMOVE`.
87 changes: 87 additions & 0 deletions examples/staging_ingestion.py
@@ -0,0 +1,87 @@
from databricks import sql
import os

"""
Databricks experimentally supports data ingestion of local files via a cloud staging location.
Ingestion commands work on DBR 12 and above, and you must include a staging_allowed_local_path kwarg when
calling sql.connect().
Use databricks-sql-connector to PUT files into the staging location where Databricks can access them:
PUT '/path/to/local/data.csv' INTO 'stage://tmp/[email protected]/salesdata/september.csv' OVERWRITE
Files in a staging location can also be retrieved with a GET command
GET 'stage://tmp/[email protected]/salesdata/september.csv' TO 'data.csv'
and deleted with a REMOVE command:
REMOVE 'stage://tmp/[email protected]/salesdata/september.csv'
Ingestion queries are passed to cursor.execute() like any other query. For GET and PUT commands, a local file
will be read or written. For security, this local file must be contained within, or descended from, a
staging_allowed_local_path of the connection.
Additionally, the connection can only manipulate files within the cloud storage location of the authenticated user.
To run this script:
1. Set the INGESTION_USER constant to the account email address of the authenticated user
2. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes it's a CSV file)
3. Run this file
Note: staging_allowed_local_path can be either a Pathlike object or a list of Pathlike objects.
"""

INGESTION_USER = "[email protected]"
FILEPATH = "example.csv"

# FILEPATH can be relative to the current directory.
# Resolve it into an absolute path
_complete_path = os.path.realpath(FILEPATH)

if not os.path.exists(_complete_path):
    # It's easiest to save a file in the same directory as this script. But any path to a file will work.
    raise Exception(
        "You need to set FILEPATH in this script to a file that actually exists."
    )

# Set staging_allowed_local_path equal to the directory that contains FILEPATH
staging_allowed_local_path = os.path.split(_complete_path)[0]

with sql.connect(
    server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path=os.getenv("DATABRICKS_HTTP_PATH"),
    access_token=os.getenv("DATABRICKS_TOKEN"),
    staging_allowed_local_path=staging_allowed_local_path,
) as connection:

    with connection.cursor() as cursor:

        # Ingestion commands are executed like any other SQL.
        # Here's a sample PUT query. You can remove OVERWRITE at the end to avoid silently overwriting data.
        query = f"PUT '{_complete_path}' INTO 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' OVERWRITE"

        print(f"Uploading {FILEPATH} to staging location")
        cursor.execute(query)
        print("Upload was successful")

        temp_fp = os.path.realpath("temp.csv")

        # Here's a sample GET query. Note that `temp_fp` must also be contained within, or descended from,
        # the staging_allowed_local_path.
        query = (
            f"GET 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' TO '{temp_fp}'"
        )

        print("Fetching from staging location into new file called temp.csv")
        cursor.execute(query)
        print("Download was successful")

        # Here's a sample REMOVE query. It cleans up the demo.csv created in our first query.
        query = f"REMOVE 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv'"

        print("Removing demo.csv from staging location")
        cursor.execute(query)
        print("Remove was successful")
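Several commits in this PR restrict local file operations to descendants of the allowed base path, resolving relative paths before comparing them. A minimal sketch of that containment check, for illustration only (the helper name `is_allowed_local_path` is hypothetical, not the connector's actual implementation):

```python
import os

def is_allowed_local_path(staging_allowed_local_path: str, local_file: str) -> bool:
    """Return True only if local_file is the allowed base path or a descendant of it."""
    # Resolve both paths first so relative segments like ".." cannot escape the base.
    base = os.path.realpath(staging_allowed_local_path)
    target = os.path.realpath(local_file)
    # commonpath is a component-wise containment test: "/tmp/uploads-other"
    # does not pass even though it shares a string prefix with "/tmp/uploads".
    return os.path.commonpath([base, target]) == base

print(is_allowed_local_path("/tmp/uploads", "/tmp/uploads/data.csv"))       # True
print(is_allowed_local_path("/tmp/uploads", "/tmp/uploads/../etc/passwd"))  # False
```

Resolving with `os.path.realpath` before comparing is what makes the `..` traversal case fail, mirroring the "Resolve relative paths before comparing" commit above.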
48 changes: 42 additions & 6 deletions poetry.lock


4 changes: 4 additions & 0 deletions pyproject.toml
@@ -16,6 +16,10 @@ pyarrow = "^9.0.0"
lz4 = "^4.0.2"
requests=">2.18.1"
oauthlib=">=3.1.0"
numpy = [
    {version = "1.21.1", python = ">=3.7,<3.8"},
    {version = "1.23.4", python = ">=3.8"}
]

[tool.poetry.dev-dependencies]
pytest = "^7.1.2"