Cloud SQL Python Connector
The Cloud SQL Python Connector is a Cloud SQL connector designed for use with the
Python language. Using a Cloud SQL connector provides a native alternative to the
Cloud SQL Auth Proxy while
providing the following benefits:
- IAM Authorization: uses IAM permissions to control who/what can connect to
your Cloud SQL instances
- Improved Security: uses robust, updated TLS 1.3 encryption and
identity verification between the client connector and the server-side proxy,
independent of the database protocol.
- Convenience: removes the requirement to use and distribute SSL
certificates, as well as manage firewalls or source/destination IP addresses.
- (optionally) IAM DB Authentication: provides support for
Cloud SQL’s automatic IAM DB AuthN feature.
The Cloud SQL Python Connector is a package to be used alongside a database driver.
Currently supported drivers are:
Installation
You can install this library with pip install
, specifying the driver
based on your database dialect.
MySQL
pip install "cloud-sql-python-connector[pymysql]"
Postgres
There are two different database drivers that are supported for the Postgres dialect:
pg8000
pip install "cloud-sql-python-connector[pg8000]"
asyncpg
pip install "cloud-sql-python-connector[asyncpg]"
SQL Server
pip install "cloud-sql-python-connector[pytds]"
APIs and Services
This package requires the following to successfully make Cloud SQL Connections:
- IAM principal (user, service account, etc.) with the
Cloud SQL Client role. This IAM principal will be used for
credentials.
- The Cloud SQL Admin API to be enabled within your Google Cloud
Project. By default, the API will be called in the project associated with
the IAM principal.
Credentials
This library uses the Application Default Credentials (ADC) strategy for
resolving credentials. Please see these instructions for how to set your ADC
(Google Cloud Application vs Local Development, IAM user vs service account credentials),
or consult the google.auth package.
To explicitly set a specific source for the credentials, see
Configuring the Connector below.
Usage
This package provides several functions for authorizing and encrypting
connections. These functions are used with your database driver to connect to
your Cloud SQL instance.
The instance connection name for your Cloud SQL instance is always in the
format "project:region:instance".
How to use this Connector
To connect to Cloud SQL using the connector, inititalize a Connector
object and call its connect
method with the proper input parameters.
The Connector
itself creates connection objects by calling its connect
method but does not manage database connection pooling. For this reason, it is recommended to use the connector alongside a library that can create connection pools, such as SQLAlchemy. This will allow for connections to remain open and be reused, reducing connection overhead and the number of connections needed.
In the Connector's connect
method below, input your connection string as the first positional argument and the name of the database driver for the second positional argument. Insert the rest of your connection keyword arguments like user, password and database. You can also set the optional timeout
or ip_type
keyword arguments.
To use this connector with SQLAlchemy, use the creator
argument for sqlalchemy.create_engine
:
from google.cloud.sql.connector import Connector
import sqlalchemy
connector = Connector()
def getconn() -> pymysql.connections.Connection:
conn: pymysql.connections.Connection = connector.connect(
"project:region:instance",
"pymysql",
user="my-user",
password="my-password",
db="my-db-name"
)
return conn
pool = sqlalchemy.create_engine(
"mysql+pymysql://",
creator=getconn,
)
The returned connection pool engine can then be used to query and modify the database.
insert_stmt = sqlalchemy.text(
"INSERT INTO my_table (id, title) VALUES (:id, :title)",
)
with pool.connect() as db_conn:
db_conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})
result = db_conn.execute(sqlalchemy.text("SELECT * from my_table")).fetchall()
db_conn.commit()
for row in result:
print(row)
To close the Connector
object's background resources, call its close()
method as follows:
connector.close()
[!NOTE]
For more examples of using SQLAlchemy to manage connection pooling with the connector,
please see Cloud SQL SQLAlchemy Samples.
Configuring the Connector
If you need to customize something about the connector, or want to specify
defaults for each connection to make, you can initialize a
Connector
object as follows:
from google.cloud.sql.connector import Connector
connector = Connector(
ip_type="public",
enable_iam_auth=False,
timeout=30,
credentials=custom_creds,
refresh_strategy="lazy",
)
Using Connector as a Context Manager
The Connector
object can also be used as a context manager in order to
automatically close and cleanup resources, removing the need for explicit
calls to connector.close()
.
Connector as a context manager:
from google.cloud.sql.connector import Connector
import pymysql
import sqlalchemy
def init_connection_pool(connector: Connector) -> sqlalchemy.engine.Engine:
def getconn() -> pymysql.connections.Connection:
conn = connector.connect(
"project:region:instance",
"pymysql",
user="my-user",
password="my-password",
db="my-db-name"
)
return conn
pool = sqlalchemy.create_engine(
"mysql+pymysql://",
creator=getconn,
)
return pool
with Connector() as connector:
pool = init_connection_pool(connector)
insert_stmt = sqlalchemy.text(
"INSERT INTO my_table (id, title) VALUES (:id, :title)",
)
with pool.connect() as db_conn:
db_conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})
db_conn.commit()
result = db_conn.execute(sqlalchemy.text("SELECT * from my_table")).fetchall()
for row in result:
print(row)
Configuring a Lazy Refresh (Cloud Run, Cloud Functions etc.)
The Connector's refresh_strategy
argument can be set to "lazy"
to configure
the Python Connector to retrieve connection info lazily and as-needed.
Otherwise, a background refresh cycle runs to retrive the connection info
periodically. This setting is useful in environments where the CPU may be
throttled outside of a request context, e.g., Cloud Run, Cloud Functions, etc.
To set the refresh strategy, set the refresh_strategy
keyword argument when
initializing a Connector
:
connector = Connector(refresh_strategy="lazy")
Specifying IP Address Type
The Cloud SQL Python Connector can be used to connect to Cloud SQL instances
using both public and private IP addresses, as well as
Private Service Connect (PSC). To specify which IP address type to connect
with, set the ip_type
keyword argument when initializing a Connector()
or when
calling connector.connect()
.
Possible values for ip_type
are "public"
(default value),
"private"
, and "psc"
.
Example:
conn = connector.connect(
"project:region:instance",
"pymysql",
ip_type="private"
... insert other kwargs ...
)
[!IMPORTANT]
If specifying Private IP or Private Service Connect (PSC), your application must be
attached to the proper VPC network to connect to your Cloud SQL instance. For most
applications this will require the use of a VPC Connector.
Automatic IAM Database Authentication
Connections using Automatic IAM database authentication are supported when using Postgres or MySQL drivers.
First, make sure to configure your Cloud SQL Instance to allow IAM authentication
and add an IAM database user.
Now, you can connect using user or service account credentials instead of a password.
In the call to connect, set the enable_iam_auth
keyword argument to true and the user
argument to the appropriately formatted IAM principal.
Postgres: For an IAM user account, this is the user's email address. For a service account, it is the service account's email without the .gserviceaccount.com
domain suffix.
MySQL: For an IAM user account, this is the user's email address, without the @ or domain name. For example, for test-user@gmail.com
, set the user
argument to test-user
. For a service account, this is the service account's email address without the @project-id.iam.gserviceaccount.com
suffix.
Example:
conn = connector.connect(
"project:region:instance",
"pg8000",
user="postgres-iam-user@gmail.com",
db="my-db-name",
enable_iam_auth=True,
)
SQL Server (MSSQL)
[!IMPORTANT]
If your SQL Server instance is set to enforce SSL connections,
you need to download the CA certificate for your instance and include cafile={path to downloaded certificate}
and validate_host=False
. This is a workaround for a known issue.
Active Directory Authentication
Active Directory authentication for SQL Server instances is currently only supported on Windows.
First, make sure to follow these steps
to set up a Managed AD domain and join your Cloud SQL instance to the domain.
See here for more info on Cloud SQL Active Directory integration.
Once you have followed the steps linked above, you can run the following code to return a connection object:
conn = connector.connect(
"project:region:instance",
"pytds",
db="my-db-name",
active_directory_auth=True,
server_name="public.[instance].[location].[project].cloudsql.[domain]",
)
Or, if using Private IP:
conn = connector.connect(
"project:region:instance",
"pytds",
db="my-db-name",
active_directory_auth=True,
server_name="private.[instance].[location].[project].cloudsql.[domain]",
ip_type="private"
)
Using the Python Connector with Python Web Frameworks
The Python Connector can be used alongside popular Python web frameworks such
as Flask, FastAPI, etc, to integrate Cloud SQL databases within your
web applications.
[!NOTE]
For serverless environments such as Cloud Functions, Cloud Run, etc, it may be
beneficial to initialize the Connector
with the lazy refresh strategy.
i.e. Connector(refresh_strategy="lazy")
See Configuring a Lazy Refresh
Flask-SQLAlchemy
Flask-SQLAlchemy
is an extension for Flask
that adds support for SQLAlchemy to your
application. It aims to simplify using SQLAlchemy with Flask by providing
useful defaults and extra helpers that make it easier to accomplish
common tasks.
You can configure Flask-SQLAlchemy to connect to a Cloud SQL database from
your web application through the following:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from google.cloud.sql.connector import Connector
connector = Connector()
def getconn():
conn = connector.connect(
"project:region:instance-name",
"pg8000",
user="my-user",
password="my-password",
db="my-database",
ip_type="public"
)
return conn
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "postgresql+pg8000://"
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
"creator": getconn
}
db = SQLAlchemy()
db.init_app(app)
For more details on how to use Flask-SQLAlchemy, check out the
Flask-SQLAlchemy Quickstarts
FastAPI
FastAPI is a modern, fast (high-performance),
web framework for building APIs with Python based on standard Python type hints.
You can configure FastAPI to connect to a Cloud SQL database from
your web application using SQLAlchemy ORM
through the following:
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from google.cloud.sql.connector import Connector
def init_connection_pool(connector: Connector) -> Engine:
def getconn():
conn = connector.connect(
"project:region:instance-name",
"pg8000",
user="my-user",
password="my-password",
db="my-database",
ip_type="public"
)
return conn
SQLALCHEMY_DATABASE_URL = "postgresql+pg8000://"
engine = create_engine(
SQLALCHEMY_DATABASE_URL , creator=getconn
)
return engine
connector = Connector()
engine = init_connection_pool(connector)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
To learn more about integrating a database into your FastAPI application,
follow along the FastAPI SQL Database guide.
Async Driver Usage
The Cloud SQL Connector is compatible with
asyncio to improve the speed
and efficiency of database connections through concurrency. You can use all
non-asyncio drivers through the Connector.connect_async
function, in addition
to the following asyncio database drivers:
The Cloud SQL Connector has a helper create_async_connector
function that is
recommended for asyncio database connections. It returns a Connector
object that uses the current thread's running event loop. This is different
than Connector()
which by default initializes a new event loop in a
background thread.
The create_async_connector
allows all the same input arguments as the
Connector object.
Once a Connector
object is returned by create_async_connector
you can call
its connect_async
method, just as you would the connect
method:
Asyncpg Connection Pool
import asyncpg
from google.cloud.sql.connector import Connector, create_async_connector
async def main():
connector = create_async_connector()
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect_async(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
**kwargs,
)
pool = await asyncpg.create_pool(
"my-project:my-region:my-instance", connect=getconn
)
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")
await connector.close_async()
SQLAlchemy Async Engine
import asyncpg
import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.sql.connector import Connector, create_async_connector
async def init_connection_pool(connector: Connector) -> AsyncEngine:
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance",
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
)
return conn
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
)
return pool
async def main():
connector = await create_async_connector()
pool = await init_connection_pool(connector)
async with pool.connect() as conn:
await conn.execute(sqlalchemy.text("SELECT NOW()"))
await connector.close_async()
await pool.dispose()
For more details on additional database arguments with an asyncpg.Connection
, please visit the
official documentation.
Async Context Manager
An alternative to using the create_async_connector
function is initializing
a Connector
as an async context manager, removing the need for explicit
calls to connector.close_async()
to cleanup resources.
[!NOTE]
This alternative requires that the running event loop be
passed in as the loop
argument to Connector()
.
Asyncpg Connection Pool
import asyncpg
from google.cloud.sql.connector import Connector, create_async_connector
async def main():
loop = asyncio.get_running_loop()
async with Connector(loop=loop) as connector:
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect_async(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
**kwargs,
)
pool = await asyncpg.create_pool(
"my-project:my-region:my-instance", connect=getconn
)
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")
SQLAlchemy Async Engine
import asyncio
import asyncpg
import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.sql.connector import Connector
async def init_connection_pool(connector: Connector) -> AsyncEngine:
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect_async(
"project:region:instance",
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
)
return conn
pool = create_async_engine(
"postgresql+asyncpg://",
async_creator=getconn,
)
return pool
async def main():
loop = asyncio.get_running_loop()
async with Connector(loop=loop) as connector:
pool = await init_connection_pool(connector)
async with pool.connect() as conn:
await conn.execute(sqlalchemy.text("SELECT NOW()"))
await pool.dispose()
Debug Logging
The Cloud SQL Python Connector uses the standard Python logging module
for debug logging support.
Add the below code to your application to enable debug logging with the Cloud SQL
Python Connector:
import logging
logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
logger = logging.getLogger(name="google.cloud.sql.connector")
logger.setLevel(logging.DEBUG)
For more details on configuring logging, please refer to the
Python logging docs.
Support policy
Major version lifecycle
This project uses semantic versioning, and uses the
following lifecycle regarding support for a major version:
Active - Active versions get all new features and security fixes (that
wouldn’t otherwise introduce a breaking change). New major versions are
guaranteed to be "active" for a minimum of 1 year.
Deprecated - Deprecated versions continue to receive security and critical
bug fixes, but do not receive new features. Deprecated versions will be publicly
supported for 1 year.
Unsupported - Any major version that has been deprecated for >=1 year is
considered publicly unsupported.
Supported Python Versions
We follow the Python Version Support Policy used by Google Cloud
Libraries for Python. Changes in supported Python versions will be
considered a minor change, and will be listed in the release notes.
Release cadence
This project aims for a minimum monthly release cadence. If no new
features or fixes have been added, a new PATCH version with the latest
dependencies is released.
Contributing
We welcome outside contributions. Please see our
Contributing Guide for details on how best to contribute.