# databricks-python-sdk

Installs: 82
Rank: #9630

## Install

```bash
npx skills add https://github.com/databricks-solutions/ai-dev-kit --skill databricks-python-sdk
```

## Databricks Development Guide

This skill provides guidance for the Databricks SDK, Databricks Connect, the Databricks CLI, and the REST API.

- SDK documentation: https://databricks-sdk-py.readthedocs.io/en/latest/
- GitHub repository: https://github.com/databricks/databricks-sdk-py

### Environment Setup

- Use the existing virtual environment at `.venv`, or create one with `uv`
- For Spark operations: `uv pip install databricks-connect`
- For SDK operations: `uv pip install databricks-sdk`
- The Databricks CLI version should be 0.278.0 or higher

### Configuration

- Default profile name: `DEFAULT`
- Config file: `~/.databrickscfg`
- Environment variables: `DATABRICKS_HOST`, `DATABRICKS_TOKEN`

### Databricks Connect (Spark Operations)

Use databricks-connect to run Spark code locally against a Databricks cluster.

```python
from databricks.connect import DatabricksSession

# Auto-detects the 'DEFAULT' profile from ~/.databrickscfg
spark = DatabricksSession.builder.getOrCreate()

# With an explicit profile
spark = DatabricksSession.builder.profile("MY_PROFILE").getOrCreate()

# Use spark as normal
df = spark.sql("SELECT * FROM catalog.schema.table")
df.show()
```

IMPORTANT: Do NOT set `.master("local[*]")` - this will cause issues with Databricks Connect.
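Since Databricks Connect returns ordinary Spark DataFrames, small result sets can be pulled back to the local machine as usual; a minimal sketch (the table name and `LIMIT` are placeholders):

```python
# Bring a bounded result set back to the driver as a pandas DataFrame
pdf = spark.sql("SELECT * FROM catalog.schema.table LIMIT 1000").toPandas()
print(pdf.head())
```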

### Direct REST API Access

For operations not yet in the SDK, or ones that are overly complex through it, call the REST API directly:

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Direct API call using the authenticated client
response = w.api_client.do(method="GET", path="/api/2.0/clusters/list")

# POST with a body
response = w.api_client.do(method="POST", path="/api/2.0/jobs/run-now", body={"job_id": 123})
```

When to use: prefer SDK methods when available. Use `api_client.do` for:

- New API endpoints not yet in the SDK
- Complex operations where the SDK abstraction is problematic
- Debugging/testing raw API responses

### Databricks CLI

```bash
# Check version (should be >= 0.278.0)
databricks --version

# Use a specific profile
databricks --profile MY_PROFILE clusters list

# Common commands
databricks clusters list
databricks jobs list
databricks workspace ls /Users/me
```

## SDK Documentation Architecture

The SDK documentation follows a predictable URL pattern (a small helper that builds these URLs is sketched after the table below):

- Base: https://databricks-sdk-py.readthedocs.io/en/latest/
- Workspace APIs: `/workspace/{category}/{service}.html`
- Account APIs: `/account/{category}/{service}.html`
- Authentication: `/authentication.html`
- DBUtils: `/dbutils.html`

### Workspace API Categories

| Category | Services |
| --- | --- |
| compute | clusters, cluster_policies, command_execution, instance_pools, libraries |
| catalog | catalogs, schemas, tables, volumes, functions, storage_credentials, external_locations |
| jobs | jobs |
| sql | warehouses, statement_execution, queries, alerts, dashboards |
| serving | serving_endpoints |
| vectorsearch | vector_search_indexes, vector_search_endpoints |
| pipelines | pipelines |
| workspace | repos, secrets, workspace, git_credentials |
| files | files, dbfs |
| ml | experiments, model_registry |
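As a concrete illustration of the URL pattern above, a tiny helper (hypothetical, not part of the SDK) that builds workspace-API doc links:

```python
BASE = "https://databricks-sdk-py.readthedocs.io/en/latest"

def doc_url(category: str, service: str) -> str:
    # e.g. doc_url("compute", "clusters") -> .../workspace/compute/clusters.html
    return f"{BASE}/workspace/{category}/{service}.html"
```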

## Authentication

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html

### Environment Variables

```bash
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com
DATABRICKS_TOKEN=dapi...  # Personal Access Token
```

### Code Patterns

```python
from databricks.sdk import WorkspaceClient

# Auto-detect credentials from the environment
w = WorkspaceClient()

# Explicit token auth
w = WorkspaceClient(host="https://your-workspace.cloud.databricks.com", token="dapi...")

# Azure Service Principal
w = WorkspaceClient(
    host="https://adb-xxx.azuredatabricks.net",
    azure_workspace_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.Databricks/workspaces/...",
    azure_tenant_id="tenant-id",
    azure_client_id="client-id",
    azure_client_secret="secret",
)

# Use a named profile from ~/.databrickscfg
w = WorkspaceClient(profile="MY_PROFILE")
```
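For reference, a minimal `~/.databrickscfg` backing the profile examples above might look like this (hosts and tokens are placeholders):

```ini
[DEFAULT]
host  = https://your-workspace.cloud.databricks.com
token = dapi...

[MY_PROFILE]
host  = https://other-workspace.cloud.databricks.com
token = dapi...
```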

## Core API Reference

### Clusters API

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html

```python
from datetime import timedelta

# List all clusters
for cluster in w.clusters.list():
    print(f"{cluster.cluster_name}: {cluster.state}")

# Get cluster details
cluster = w.clusters.get(cluster_id="0123-456789-abcdef")

# Create a cluster (returns a Wait object)
wait = w.clusters.create(
    cluster_name="my-cluster",
    spark_version=w.clusters.select_spark_version(latest=True),
    node_type_id=w.clusters.select_node_type(local_disk=True),
    num_workers=2,
)
cluster = wait.result()  # Wait for the cluster to be running

# Or use create_and_wait for a blocking call
cluster = w.clusters.create_and_wait(
    cluster_name="my-cluster",
    spark_version="14.3.x-scala2.12",
    node_type_id="i3.xlarge",
    num_workers=2,
    timeout=timedelta(minutes=30),
)

# Start/stop/delete
w.clusters.start(cluster_id="...").result()
w.clusters.stop(cluster_id="...")
w.clusters.delete(cluster_id="...")
```

### Jobs API

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs/jobs.html

```python
from databricks.sdk.service.jobs import Task, NotebookTask

# List jobs
for job in w.jobs.list():
    print(f"{job.job_id}: {job.settings.name}")

# Create a job
created = w.jobs.create(
    name="my-job",
    tasks=[
        Task(
            task_key="main",
            notebook_task=NotebookTask(notebook_path="/Users/me/notebook"),
            existing_cluster_id="0123-456789-abcdef",
        )
    ],
)

# Run a job now
run = w.jobs.run_now_and_wait(job_id=created.job_id)
print(f"Run completed: {run.state.result_state}")

# Get run output
output = w.jobs.get_run_output(run_id=run.run_id)
```

### SQL Statement Execution

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/statement_execution.html

```python
from databricks.sdk.service.sql import StatementState

# Execute a SQL query
response = w.statement_execution.execute_statement(
    warehouse_id="abc123",
    statement="SELECT * FROM catalog.schema.table LIMIT 10",
    wait_timeout="30s",
)

# Check status and get results
if response.status.state == StatementState.SUCCEEDED:
    for row in response.result.data_array:
        print(row)

# For large results, fetch chunks
chunk = w.statement_execution.get_statement_result_chunk_n(
    statement_id=response.statement_id,
    chunk_index=0,
)
```
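If the statement has not finished within `wait_timeout`, the response comes back in a non-terminal state. Continuing the example above, a minimal polling sketch (the two-second interval is arbitrary; tune as needed):

```python
import time

# Poll until the statement reaches a terminal state
while response.status.state in (StatementState.PENDING, StatementState.RUNNING):
    time.sleep(2)
    response = w.statement_execution.get_statement(statement_id=response.statement_id)
```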

### SQL Warehouses

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/warehouses.html

```python
# List warehouses
for wh in w.warehouses.list():
    print(f"{wh.name}: {wh.state}")

# Get a warehouse
warehouse = w.warehouses.get(id="abc123")

# Create a warehouse
created = w.warehouses.create_and_wait(
    name="my-warehouse",
    cluster_size="Small",
    max_num_clusters=1,
    auto_stop_mins=15,
)

# Start/stop
w.warehouses.start(id="abc123").result()
w.warehouses.stop(id="abc123").result()
```

### Unity Catalog - Tables

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/tables.html

```python
# List tables in a schema
for table in w.tables.list(catalog_name="main", schema_name="default"):
    print(f"{table.full_name}: {table.table_type}")

# Get table info
table = w.tables.get(full_name="main.default.my_table")
print(f"Columns: {[c.name for c in table.columns]}")

# Check if a table exists
exists = w.tables.exists(full_name="main.default.my_table")
```

### Unity Catalog - Catalogs & Schemas

Doc (Catalogs): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html
Doc (Schemas): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html

```python
# List catalogs
for catalog in w.catalogs.list():
    print(catalog.name)

# Create a catalog
w.catalogs.create(name="my_catalog", comment="Description")

# List schemas
for schema in w.schemas.list(catalog_name="main"):
    print(schema.name)

# Create a schema
w.schemas.create(name="my_schema", catalog_name="main")
```

### Volumes

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/volumes.html

```python
from databricks.sdk.service.catalog import VolumeType

# List volumes
for vol in w.volumes.list(catalog_name="main", schema_name="default"):
    print(f"{vol.full_name}: {vol.volume_type}")

# Create a managed volume
w.volumes.create(
    catalog_name="main",
    schema_name="default",
    name="my_volume",
    volume_type=VolumeType.MANAGED,
)

# Read volume info
vol = w.volumes.read(name="main.default.my_volume")
```

### Files API

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/files/files.html

```python
# Upload a file to a volume
with open("local_file.csv", "rb") as f:
    w.files.upload(file_path="/Volumes/main/default/my_volume/data.csv", contents=f)

# Download a file (the response exposes the stream as .contents)
response = w.files.download(file_path="/Volumes/main/default/my_volume/data.csv")
content = response.contents.read()

# List directory contents
for entry in w.files.list_directory_contents("/Volumes/main/default/my_volume/"):
    print(f"{entry.name}: {entry.is_directory}")

# Upload/download with progress (parallel)
w.files.upload_from(
    file_path="/Volumes/main/default/my_volume/large.parquet",
    source_path="/local/path/large.parquet",
    use_parallel=True,
)
w.files.download_to(
    file_path="/Volumes/main/default/my_volume/large.parquet",
    destination="/local/output/",
    use_parallel=True,
)
```

### Serving Endpoints (Model Serving)

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/serving/serving_endpoints.html

```python
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole

# List endpoints
for ep in w.serving_endpoints.list():
    print(f"{ep.name}: {ep.state}")

# Get an endpoint
endpoint = w.serving_endpoints.get(name="my-endpoint")

# Query an endpoint
response = w.serving_endpoints.query(name="my-endpoint", inputs={"prompt": "Hello, world!"})

# For chat/completions endpoints (messages are typed ChatMessage objects)
response = w.serving_endpoints.query(
    name="my-chat-endpoint",
    messages=[ChatMessage(role=ChatMessageRole.USER, content="Hello!")],
)

# Get an OpenAI-compatible client
openai_client = w.serving_endpoints.get_open_ai_client()
```
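A short sketch of using that client; it assumes "my-chat-endpoint" serves an OpenAI-compatible chat model (the endpoint name is a placeholder):

```python
completion = openai_client.chat.completions.create(
    model="my-chat-endpoint",  # the serving endpoint name
    messages=[{"role": "user", "content": "Hello!"}],
)
print(completion.choices[0].message.content)
```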

### Vector Search

Doc (Indexes): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_indexes.html
Doc (Endpoints): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_endpoints.html

```python
# List vector search indexes
for idx in w.vector_search_indexes.list_indexes(endpoint_name="my-vs-endpoint"):
    print(idx.name)

# Query an index
results = w.vector_search_indexes.query_index(
    index_name="main.default.my_index",
    columns=["id", "text", "embedding"],
    query_text="search query",
    num_results=10,
)
for doc in results.result.data_array:
    print(doc)
```

### Pipelines (Delta Live Tables)

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html

```python
# List pipelines
for pipeline in w.pipelines.list_pipelines():
    print(f"{pipeline.name}: {pipeline.state}")

# Get a pipeline
pipeline = w.pipelines.get(pipeline_id="abc123")

# Start a pipeline update
w.pipelines.start_update(pipeline_id="abc123")

# Stop a pipeline
w.pipelines.stop_and_wait(pipeline_id="abc123")
```

### Secrets

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/workspace/secrets.html

```python
# List secret scopes
for scope in w.secrets.list_scopes():
    print(scope.name)

# Create a scope
w.secrets.create_scope(scope="my-scope")

# Put a secret
w.secrets.put_secret(scope="my-scope", key="api-key", string_value="secret123")

# Get a secret (returns a GetSecretResponse with the value)
secret = w.secrets.get_secret(scope="my-scope", key="api-key")

# List secrets in a scope (metadata only, not values)
for s in w.secrets.list_secrets(scope="my-scope"):
    print(s.key)
```
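One caveat worth flagging (verify against the doc above): the underlying REST API returns secret values base64-encoded, so `GetSecretResponse.value` typically needs decoding:

```python
import base64

secret = w.secrets.get_secret(scope="my-scope", key="api-key")
plaintext = base64.b64decode(secret.value).decode("utf-8")  # value arrives base64-encoded
```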

### DBUtils

Doc: https://databricks-sdk-py.readthedocs.io/en/latest/dbutils.html

```python
# Access dbutils through the WorkspaceClient
dbutils = w.dbutils

# File system operations
files = dbutils.fs.ls("/")
dbutils.fs.cp("dbfs:/source", "dbfs:/dest")
dbutils.fs.rm("dbfs:/path", recurse=True)

# Secrets (same as w.secrets, but via the dbutils interface)
value = dbutils.secrets.get(scope="my-scope", key="my-key")
```

## Common Patterns

### CRITICAL: Async Applications (FastAPI, etc.)

The Databricks SDK is fully synchronous: every call blocks the thread. In async applications (FastAPI, asyncio), you MUST wrap SDK calls with `asyncio.to_thread()` to avoid blocking the event loop.

```python
import asyncio

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# WRONG - blocks the event loop
async def get_clusters_bad():
    return list(w.clusters.list())  # BLOCKS!

# CORRECT - runs in a thread pool
async def get_clusters_good():
    return await asyncio.to_thread(lambda: list(w.clusters.list()))

# CORRECT - for simple calls
async def get_cluster(cluster_id: str):
    return await asyncio.to_thread(w.clusters.get, cluster_id)

# CORRECT - FastAPI endpoints
from fastapi import FastAPI

app = FastAPI()

@app.get("/clusters")
async def list_clusters():
    clusters = await asyncio.to_thread(lambda: list(w.clusters.list()))
    return [{"id": c.cluster_id, "name": c.cluster_name} for c in clusters]

@app.post("/query")
async def run_query(sql: str, warehouse_id: str):
    # Wrap the blocking SDK call
    response = await asyncio.to_thread(
        w.statement_execution.execute_statement,
        statement=sql,
        warehouse_id=warehouse_id,
        wait_timeout="30s",
    )
    return response.result.data_array
```

Note: `WorkspaceClient().config.host` is NOT a network call - it just reads config. There is no need to wrap property access.

### Wait for Long-Running Operations

```python
from datetime import timedelta

# Pattern 1: Use *_and_wait methods
cluster = w.clusters.create_and_wait(
    cluster_name="test",
    spark_version="14.3.x-scala2.12",
    node_type_id="i3.xlarge",
    num_workers=2,
    timeout=timedelta(minutes=30),
)

# Pattern 2: Use the Wait object
wait = w.clusters.create(...)
cluster = wait.result()  # Blocks until ready

# Pattern 3: Manual polling with a callback
def progress(cluster):
    print(f"State: {cluster.state}")

cluster = w.clusters.wait_get_cluster_running(
    cluster_id="...",
    timeout=timedelta(minutes=30),
    callback=progress,
)
```

### Pagination

```python
# All list methods return iterators that handle pagination automatically
for job in w.jobs.list():  # Fetches all pages
    print(job.settings.name)

# Passing limit tunes the page size; the iterator still follows page tokens
response = w.jobs.list(limit=10)
for job in response:
    print(job)
```

### Error Handling

```python
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceAlreadyExists

try:
    cluster = w.clusters.get(cluster_id="invalid-id")
except NotFound:
    print("Cluster not found")
except PermissionDenied:
    print("Access denied")
```

## When Uncertain

If I'm unsure about a method, I should check the documentation URL pattern:
`https://databricks-sdk-py.readthedocs.io/en/latest/workspace/{category}/{service}.html`

Common categories:

- Clusters: /workspace/compute/clusters.html
- Jobs: /workspace/jobs/jobs.html
- Tables: /workspace/catalog/tables.html
- Warehouses: /workspace/sql/warehouses.html
- Serving: /workspace/serving/serving_endpoints.html

Fetch and verify before providing guidance on parameters or return types.

## Quick Reference Links

| API | Documentation URL |
| --- | --- |
| Authentication | https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html |
| Clusters | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html |
| Jobs | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs/jobs.html |
| SQL Warehouses | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/warehouses.html |
| Statement Execution | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/statement_execution.html |
| Tables | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/tables.html |
| Catalogs | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html |
| Schemas | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html |
| Volumes | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/volumes.html |
| Files | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/files/files.html |
| Serving Endpoints | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/serving/serving_endpoints.html |
| Vector Search | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_indexes.html |
| Pipelines | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html |
| Secrets | https://databricks-sdk-py.readthedocs.io/en/latest/workspace/workspace/secrets.html |
| DBUtils | https://databricks-sdk-py.readthedocs.io/en/latest/dbutils.html |
