SI: Implement put operations #67
Merged
41 commits
28c3a59
Basic PUT operation. Currently this never executes because the server
1b245b1
Bump Spark CLI service protocol version being used.
1239def
Log when attempting a staging operation
b605cce
Fix failing unit tests since function signature for ExecuteResponse c…
3ed84d8
Add e2e test for put.
57b8a34
Bail on tests if staging_ingestion_user is not set
7812278
Black client.py
6b76439
Add unit test that sanity checks _handle_staging_operation is called
3df7c89
Fix imports so that this module can be run independently:
8f0a02e
Implement GET operation
55525cb
Refactor client.py into distinct methods for each ingestion command type
157ac3d
Update pypoetry so I can develop on Python 3.10
0739ccc
Applied PR feedback around explicit response codes.
d3a3651
Applying PR feedback
72f917e
PR feedback
fba64b7
Black client.py
c27a3d6
Refactor e2e test to use a single teste for PUT, GET, and REMOVE
19ca706
Make REMOVE command work
0167bd9
These methods don't need to know the `operation`
85e4d7c
Remove single quote that broke query
713002d
Remove unneeded argument
fc06ef8
Expect operation to succeed
cafa17d
Black PySQLStagingIngestionTestSuite only
a508a1c
Tidy up comments in e2e test
ce80df0
Basic e2e test scaffolded in. Currently fails.
36885a4
Only allow ingestion commands when base_uploads_path is specified
c0c09d4
Restrict local file operations to descendents of uploads_base_path
f612795
Remove per PR feedback
e609ef3
Add check for null local_file per PR feedback
cdbe2d6
Open output stream _after_ successful HTTP request
34a0362
Resolve relative paths before comparing row.localFile to uploads_base…
c8a64c7
Add test that PUT fails if file exists in staging location and OVERWR…
d48d3f3
Add tests: operations fail to modify another user's staging location
e0037e0
Add test that ingestion command fails if local file is blank
3fa5d84
Add test that invalid staging path will fail at server
4824b68
Basic usage example (needs tweaking)
469f35f
Add samples of GET and REMOVE
bdb948a
Refactor to allow uploads_base_path to be either a single string object
0261b7a
Refactor uploads_base_path to staging_allowed_local_path
00d8a49
Fix mypy static type failures
7a602e6
Black src files
@@ -0,0 +1,87 @@
from databricks import sql
import os

"""
Databricks experimentally supports data ingestion of local files via a cloud staging location.
Ingestion commands will work on DBR >12. And you must include a staging_allowed_local_path kwarg when
calling sql.connect().

Use databricks-sql-connector to PUT files into the staging location where Databricks can access them:

    PUT '/path/to/local/data.csv' INTO 'stage://tmp/[email protected]/salesdata/september.csv' OVERWRITE

Files in a staging location can also be retrieved with a GET command:

    GET 'stage://tmp/[email protected]/salesdata/september.csv' TO 'data.csv'

and deleted with a REMOVE command:

    REMOVE 'stage://tmp/[email protected]/salesdata/september.csv'

Ingestion queries are passed to cursor.execute() like any other query. For GET and PUT commands, a local file
will be read or written. For security, this local file must be contained within, or descended from, a
staging_allowed_local_path of the connection.

Additionally, the connection can only manipulate files within the cloud storage location of the authenticated user.

To run this script:

1. Set the INGESTION_USER constant to the account email address of the authenticated user
2. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes it's a CSV file)
3. Run this file

Note: staging_allowed_local_path can be either a Pathlike object or a list of Pathlike objects.
"""

INGESTION_USER = "[email protected]"
FILEPATH = "example.csv"

# FILEPATH can be relative to the current directory.
# Resolve it into an absolute path
_complete_path = os.path.realpath(FILEPATH)

if not os.path.exists(_complete_path):

    # It's easiest to save a file in the same directory as this script. But any path to a file will work.
    raise Exception(
        "You need to set FILEPATH in this script to a file that actually exists."
    )

# Set staging_allowed_local_path equal to the directory that contains FILEPATH
staging_allowed_local_path = os.path.split(_complete_path)[0]

with sql.connect(
    server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path=os.getenv("DATABRICKS_HTTP_PATH"),
    access_token=os.getenv("DATABRICKS_TOKEN"),
    staging_allowed_local_path=staging_allowed_local_path,
) as connection:

    with connection.cursor() as cursor:

        # Ingestion commands are executed like any other SQL.
        # Here's a sample PUT query. You can remove OVERWRITE at the end to avoid silently overwriting data.
        query = f"PUT '{_complete_path}' INTO 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' OVERWRITE"

        print(f"Uploading {FILEPATH} to staging location")
        cursor.execute(query)
        print("Upload was successful")

        temp_fp = os.path.realpath("temp.csv")

        # Here's a sample GET query. Note that `temp_fp` must also be contained within, or descended from,
        # the staging_allowed_local_path.
        query = (
            f"GET 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' TO '{temp_fp}'"
        )

        print("Fetching from staging location into new file called temp.csv")
        cursor.execute(query)
        print("Download was successful")

        # Here's a sample REMOVE query. It cleans up the demo.csv created in our first query
        query = f"REMOVE 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv'"

        print("Removing demo.csv from staging location")
        cursor.execute(query)
        print("Remove was successful")
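For readers of this example: the docstring says the local file must be contained within, or descended from, a staging_allowed_local_path, and that this value may be a single Pathlike object or a list of them. The sketch below shows one way such a check could be written. It is not the connector's actual implementation. `is_local_file_allowed` is a hypothetical helper name introduced here for illustration, and the sketch assumes both sides are resolved to absolute paths before comparison, in line with the "Resolve relative paths before comparing" commit in this PR.

```python
import os
from typing import List, Union

# Hypothetical type alias: a single path-like value or a list of them.
PathLike = Union[str, os.PathLike]


def is_local_file_allowed(
    local_file: PathLike,
    staging_allowed_local_path: Union[PathLike, List[PathLike]],
) -> bool:
    """Return True if local_file sits inside any allowed base directory.

    Illustrative only: the name and signature are assumptions, not part of
    databricks-sql-connector.
    """
    # Normalise a single path into a one-element list.
    if isinstance(staging_allowed_local_path, (str, os.PathLike)):
        allowed = [staging_allowed_local_path]
    else:
        allowed = list(staging_allowed_local_path)

    # Resolve symlinks and relative segments such as ".." before comparing.
    target = os.path.realpath(os.fspath(local_file))

    for base in allowed:
        base_real = os.path.realpath(os.fspath(base))
        try:
            # commonpath compares whole path components, so "/data/up"
            # does not count as a parent of "/data/uploads/x.csv".
            if os.path.commonpath([base_real, target]) == base_real:
                return True
        except ValueError:
            # Raised on Windows when the paths are on different drives.
            continue
    return False
```

Comparing with `os.path.commonpath` rather than a plain string prefix avoids accepting sibling directories whose names merely share a prefix with an allowed path.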